]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWPeriodPusher shares periods between zones/groups
authorCasey Bodley <cbodley@redhat.com>
Fri, 9 Oct 2015 16:15:05 +0000 (12:15 -0400)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:35 +0000 (16:13 -0800)
RGWPeriodPusher implements the RGWRealmWatcher interface to get
notifications for new periods.  when it discovers that it needs to push
a period to other zones, it spawns a thread to send them (and keep
retrying) until all of those zones successfully acknowledge the period

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/CMakeLists.txt
src/common/config_opts.h
src/rgw/Makefile.am
src/rgw/rgw_period_pusher.cc [new file with mode: 0644]
src/rgw/rgw_period_pusher.h [new file with mode: 0644]
src/rgw/rgw_rados.cc
src/rgw/rgw_realm_watcher.h
src/rgw/rgw_rest_conn.h

index 472d59ac63d868264073b197683884a49ec94dad..40f6263b7b8d28b3b619edf9bfd9b560ae617709 100644 (file)
@@ -1170,6 +1170,7 @@ if(${WITH_RADOSGW})
     rgw/rgw_sync.cc
     rgw/rgw_data_sync.cc
     rgw/rgw_dencoder.cc
+    rgw/rgw_period_pusher.cc
     rgw/rgw_realm_reloader.cc
     rgw/rgw_realm_watcher.cc
     rgw/rgw_coroutine.cc
index 50458abf9a66ca255c0fefdfe26aa1e92a544346..a36bcc003c83cac774c73561aefa848bbe8e6358 100644 (file)
@@ -1295,6 +1295,8 @@ OPTION(rgw_run_sync_thread, OPT_BOOL, true) // whether radosgw (not radosgw-admi
 OPTION(rgw_sync_lease_period, OPT_INT, 30) // time in second for lease that rgw takes on a specific log (or log shard)
 
 OPTION(rgw_realm_reconfigure_delay, OPT_DOUBLE, 2) // seconds to wait before reloading realm configuration
+OPTION(rgw_period_push_interval, OPT_DOUBLE, 2) // seconds to wait before retrying "period push"
+OPTION(rgw_period_push_interval_max, OPT_DOUBLE, 30) // maximum interval after exponential backoff
 
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
index 5ac4bda8e74cbc050c221a2765eb382354805a39..5d30ff6a347d48735ff07bcc34e616af4be946f5 100644 (file)
@@ -59,6 +59,7 @@ librgw_la_SOURCES =  \
        rgw/rgw_keystone.cc \
        rgw/rgw_quota.cc \
        rgw/rgw_dencoder.cc \
+       rgw/rgw_period_pusher.cc \
        rgw/rgw_realm_reloader.cc \
        rgw/rgw_realm_watcher.cc \
        rgw/rgw_object_expirer_core.cc \
@@ -202,6 +203,7 @@ noinst_HEADERS += \
        rgw/rgw_user.h \
        rgw/rgw_bucket.h \
        rgw/rgw_keystone.h \
+       rgw/rgw_period_pusher.h \
        rgw/rgw_realm_reloader.h \
        rgw/rgw_realm_watcher.h \
        rgw/rgw_civetweb.h \
diff --git a/src/rgw/rgw_period_pusher.cc b/src/rgw/rgw_period_pusher.cc
new file mode 100644 (file)
index 0000000..e9576f3
--- /dev/null
@@ -0,0 +1,285 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <map>
+#include <thread>
+
+#include "rgw_period_pusher.h"
+#include "rgw_cr_rest.h"
+#include "common/errno.h"
+
+#include "rgw_boost_asio_yield.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "rgw period pusher: ")
+
+/// A coroutine to post the period over the given connection.
+using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>;
+
+/// A coroutine that calls PushCR, and retries with backoff until success.
+class PushAndRetryCR : public RGWCoroutine {
+  const std::string& zone;
+  RGWRESTConn *const conn;
+  RGWHTTPManager *const http;
+  RGWPeriod& period;
+  const std::string epoch; //< epoch string for params
+  double timeout; //< current interval between retries
+  const double timeout_max; //< maximum interval between retries
+  uint32_t counter; //< number of failures since backoff increased
+
+ public:
+  PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn,
+                 RGWHTTPManager* http, RGWPeriod& period)
+    : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period),
+      epoch(std::to_string(period.get_epoch())),
+      timeout(cct->_conf->rgw_period_push_interval),
+      timeout_max(cct->_conf->rgw_period_push_interval_max),
+      counter(0)
+  {}
+
+  int operate() override;
+};
+
+int PushAndRetryCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      yield {
+        ldout(cct, 10) << "pushing period " << period.get_id()
+            << " to " << zone << dendl;
+        // initialize the http params
+        rgw_http_param_pair params[] = {
+          { "period", period.get_id().c_str() },
+          { "epoch", epoch.c_str() },
+          { nullptr, nullptr }
+        };
+        call(new PushCR(cct, conn, http, "/admin/realm/period",
+                        params, period, nullptr));
+      }
+
+      // stop on success
+      if (get_ret_status() == 0) {
+        ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
+        return set_cr_done();
+      }
+
+      // try each endpoint in the connection before waiting
+      if (++counter < conn->get_endpoint_count())
+        continue;
+      counter = 0;
+
+      // wait with exponential backoff up to timeout_max
+      yield {
+        utime_t dur;
+        dur.set_from_double(timeout);
+
+        ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
+        wait(dur);
+
+        timeout *= 2;
+        if (timeout > timeout_max)
+          timeout = timeout_max;
+      }
+    }
+  }
+  return 0;
+}
+
+/**
+ * PushAllCR is a coroutine that sends the period over all of the given
+ * connections, retrying until they are all marked as completed.
+ */
+class PushAllCR : public RGWCoroutine {
+  RGWHTTPManager *const http;
+  RGWPeriod period; //< period object to push
+  std::map<std::string, RGWRESTConn> conns; //< zones that need the period
+
+ public:
+  PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period,
+            std::map<std::string, RGWRESTConn>&& conns)
+    : RGWCoroutine(cct), http(http),
+      period(std::move(period)),
+      conns(std::move(conns))
+  {}
+
+  int operate() override;
+};
+
+int PushAllCR::operate()
+{
+  reenter(this) {
+    // spawn a coroutine to push the period over each connection
+    yield {
+      ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl;
+      for (auto& c : conns)
+        spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
+    }
+    // wait for all to complete
+    drain_all();
+    return set_cr_done();
+  }
+  return 0;
+}
+
+/// A background thread to run the PushAllCR coroutine and exit.
+class RGWPeriodPusher::CRThread {
+  RGWCoroutinesManager coroutines;
+  RGWHTTPManager http;
+  boost::intrusive_ptr<PushAllCR> push_all;
+  std::thread thread;
+
+ public:
+  CRThread(CephContext* cct, RGWPeriod&& period,
+           std::map<std::string, RGWRESTConn>&& conns)
+    : coroutines(cct),
+      http(cct, coroutines.get_completion_mgr()),
+      push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns))),
+      thread([this] { coroutines.run(push_all.get()); })
+  {
+    http.set_threaded();
+  }
+  ~CRThread()
+  {
+    push_all.reset();
+    coroutines.stop();
+    if (thread.joinable())
+      thread.join();
+  }
+};
+
+
+RGWPeriodPusher::RGWPeriodPusher(RGWRados* store)
+  : cct(store->ctx()), store(store)
+{}
+
+// destructor is here because CRThread is incomplete in the header
+RGWPeriodPusher::~RGWPeriodPusher() = default;
+
+void RGWPeriodPusher::handle_notify(RGWRealmNotify type,
+                                    bufferlist::iterator& p)
+{
+  // decode the period
+  RGWZonesNeedPeriod info;
+  try {
+    ::decode(info, p);
+  } catch (buffer::error& e) {
+    derr(cct) << "Failed to decode the period: " << e.what() << dendl;
+    return;
+  }
+
+  std::lock_guard<std::mutex> lock(mutex);
+
+  // we can't process this notification without access to our current realm
+  // configuration. queue it until resume()
+  if (store == nullptr) {
+    pending_periods.emplace_back(std::move(info));
+    return;
+  }
+
+  handle_notify(std::move(info));
+}
+
+// expects the caller to hold a lock on mutex
+void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period)
+{
+  if (period.get_id() != period_id) {
+    // new period must follow current period
+    if (period.get_predecessor() != period_id) {
+      ldout(cct, 10) << "current period " << period_id << " is not period "
+          << period.get_id() << "'s predecessor" << dendl;
+      return;
+    }
+  } else if (period.get_epoch() <= period_epoch) {
+    ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer "
+        "than current epoch " << period_epoch << ", discarding update" << dendl;
+    return;
+  }
+
+  // find our zonegroup in the new period
+  auto& zonegroups = period.get_map().zonegroups;
+  auto i = zonegroups.find(store->get_zonegroup().get_id());
+  if (i == zonegroups.end()) {
+    lderr(cct) << "The new period does not contain my zonegroup!" << dendl;
+    return;
+  }
+  auto& my_zonegroup = i->second;
+
+  // if we're not a master zone, we're not responsible for pushing any updates
+  if (my_zonegroup.master_zone != store->get_zone_params().get_id())
+    return;
+
+  // construct a map of the zones that need this period. the map uses the same
+  // keys/ordering as the zone[group] map, so we can use a hint for insertions
+  std::map<std::string, RGWRESTConn> conns;
+  auto hint = conns.end();
+
+  // are we the master zonegroup in this period?
+  if (period.get_map().master_zonegroup == store->get_zonegroup().get_id()) {
+    // update other zonegroup endpoints
+    for (auto& zg : zonegroups) {
+      auto& zonegroup = zg.second;
+      if (zonegroup.get_id() == store->get_zonegroup().get_id())
+        continue;
+      if (zonegroup.endpoints.empty())
+        continue;
+
+      hint = conns.emplace_hint(
+          hint, std::piecewise_construct,
+          std::forward_as_tuple(zonegroup.get_id()),
+          std::forward_as_tuple(cct, store, zonegroup.endpoints));
+    }
+  }
+
+  // update other zone endpoints
+  for (auto& z : my_zonegroup.zones) {
+    auto& zone = z.second;
+    if (zone.id == store->get_zone_params().get_id())
+      continue;
+    if (zone.endpoints.empty())
+      continue;
+
+    hint = conns.emplace_hint(
+        hint, std::piecewise_construct,
+        std::forward_as_tuple(zone.id),
+        std::forward_as_tuple(cct, store, zone.endpoints));
+  }
+
+  if (conns.empty()) {
+    ldout(cct, 4) << "No zones to update" << dendl;
+    return;
+  }
+
+  period_id = period.get_id();
+  period_epoch = period.get_epoch();
+
+  ldout(cct, 4) << "Zone master pushing period " << period_id
+      << " epoch " << period_epoch << " to "
+      << conns.size() << " other zones" << dendl;
+
+  // spawn a new coroutine thread, destroying the previous one
+  cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns)));
+}
+
+void RGWPeriodPusher::pause()
+{
+  ldout(cct, 4) << "paused for realm update" << dendl;
+  std::lock_guard<std::mutex> lock(mutex);
+  store = nullptr;
+}
+
+void RGWPeriodPusher::resume(RGWRados* store)
+{
+  std::lock_guard<std::mutex> lock(mutex);
+  this->store = store;
+
+  ldout(cct, 4) << "resume with " << pending_periods.size()
+      << " periods pending" << dendl;
+
+  // process notification queue
+  for (auto& info : pending_periods) {
+    handle_notify(std::move(info));
+  }
+  pending_periods.clear();
+}
diff --git a/src/rgw/rgw_period_pusher.h b/src/rgw/rgw_period_pusher.h
new file mode 100644 (file)
index 0000000..bf3ca1a
--- /dev/null
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_PERIOD_PUSHER_H
+#define RGW_PERIOD_PUSHER_H
+
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "rgw_realm_reloader.h"
+
+class RGWRados;
+class RGWPeriod;
+
+// RGWRealmNotify payload for push coordination
+using RGWZonesNeedPeriod = RGWPeriod;
+
+/**
+ * RGWPeriodPusher coordinates with other nodes via the realm watcher to manage
+ * the responsibility for pushing period updates to other zones or zonegroups.
+ */
+class RGWPeriodPusher final : public RGWRealmWatcher::Watcher,
+                              public RGWRealmReloader::Pauser {
+ public:
+  RGWPeriodPusher(RGWRados* store);
+  ~RGWPeriodPusher();
+
+  /// respond to realm notifications by pushing new periods to other zones
+  void handle_notify(RGWRealmNotify type, bufferlist::iterator& p) override;
+
+  /// avoid accessing RGWRados while dynamic reconfiguration is in progress.
+  /// notifications will be enqueued until resume()
+  void pause() override;
+
+  /// continue processing notifications with a new RGWRados instance
+  void resume(RGWRados* store) override;
+
+ private:
+  void handle_notify(RGWZonesNeedPeriod&& period);
+
+  CephContext *const cct;
+  RGWRados* store;
+
+  std::mutex mutex;
+  std::string period_id; //< the current period id being sent
+  epoch_t period_epoch; //< the current period epoch being sent
+
+  /// while paused for reconfiguration, we need to queue up notifications
+  std::vector<RGWZonesNeedPeriod> pending_periods;
+
+  class CRThread; //< contains thread, coroutine manager, http manager
+  std::unique_ptr<CRThread> cr_thread; //< thread to run the push coroutines
+};
+
+#endif // RGW_PERIOD_PUSHER_H
index bc379da2f20d970ce82e93d2d0e6460d35942768..7208c6afc3e59f27c230a58a4ef98961ebe6b64f 100644 (file)
@@ -795,6 +795,9 @@ int RGWRealm::notify_zone(bufferlist& bl)
 int RGWRealm::notify_new_period(const RGWPeriod& period)
 {
   bufferlist bl;
+  // push the period to dependent zonegroups/zones
+  ::encode(RGWRealmNotify::ZonesNeedPeriod, bl);
+  ::encode(period, bl);
   // reload the gateway with the new period
   ::encode(RGWRealmNotify::Reload, bl);
 
index d0e87e6b0ea5e6f9abfe1add89746d1bb1b9244a..1325571dcc689bcb178ec5e066546b022f85e755 100644 (file)
@@ -14,6 +14,7 @@ class RGWRealm;
 
 enum class RGWRealmNotify {
   Reload,
+  ZonesNeedPeriod,
 };
 WRITE_RAW_ENCODER(RGWRealmNotify);
 
index 28537cf96d462d502b59dd3b7ab81f4580f70cc4..308d82b6d1fd7436959c3225fc9b434e6b5b2666 100644 (file)
@@ -76,6 +76,7 @@ public:
   CephContext *get_ctx() {
     return cct;
   }
+  size_t get_endpoint_count() const { return endpoints.size(); }
 
   /* sync request */
   int forward(const rgw_user& uid, req_info& info, obj_version *objv, size_t max_response, bufferlist *inbl, bufferlist *outbl);