From: Casey Bodley Date: Fri, 9 Oct 2015 16:15:05 +0000 (-0400) Subject: rgw: RGWPeriodPusher shares periods between zones/groups X-Git-Tag: v10.1.0~354^2~197 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=22fca7ebd3c29c51559843e175f10dffdc8f01f1;p=ceph.git rgw: RGWPeriodPusher shares periods between zones/groups RGWPeriodPusher implements the RGWRealmWatcher interface to get notifications for new periods. when it discovers that it needs to push a period to other zones, it spawns a thread to send them (and keep retrying) until all of those zones successfully acknowledge the period Signed-off-by: Casey Bodley --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 472d59ac63d8..40f6263b7b8d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1170,6 +1170,7 @@ if(${WITH_RADOSGW}) rgw/rgw_sync.cc rgw/rgw_data_sync.cc rgw/rgw_dencoder.cc + rgw/rgw_period_pusher.cc rgw/rgw_realm_reloader.cc rgw/rgw_realm_watcher.cc rgw/rgw_coroutine.cc diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 50458abf9a66..a36bcc003c83 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1295,6 +1295,8 @@ OPTION(rgw_run_sync_thread, OPT_BOOL, true) // whether radosgw (not radosgw-admi OPTION(rgw_sync_lease_period, OPT_INT, 30) // time in second for lease that rgw takes on a specific log (or log shard) OPTION(rgw_realm_reconfigure_delay, OPT_DOUBLE, 2) // seconds to wait before reloading realm configuration +OPTION(rgw_period_push_interval, OPT_DOUBLE, 2) // seconds to wait before retrying "period push" +OPTION(rgw_period_push_interval_max, OPT_DOUBLE, 30) // maximum interval after exponential backoff OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index 5ac4bda8e74c..5d30ff6a347d 100644 --- a/src/rgw/Makefile.am +++ 
b/src/rgw/Makefile.am @@ -59,6 +59,7 @@ librgw_la_SOURCES = \ rgw/rgw_keystone.cc \ rgw/rgw_quota.cc \ rgw/rgw_dencoder.cc \ + rgw/rgw_period_pusher.cc \ rgw/rgw_realm_reloader.cc \ rgw/rgw_realm_watcher.cc \ rgw/rgw_object_expirer_core.cc \ @@ -202,6 +203,7 @@ noinst_HEADERS += \ rgw/rgw_user.h \ rgw/rgw_bucket.h \ rgw/rgw_keystone.h \ + rgw/rgw_period_pusher.h \ rgw/rgw_realm_reloader.h \ rgw/rgw_realm_watcher.h \ rgw/rgw_civetweb.h \ diff --git a/src/rgw/rgw_period_pusher.cc b/src/rgw/rgw_period_pusher.cc new file mode 100644 index 000000000000..e9576f39a0ac --- /dev/null +++ b/src/rgw/rgw_period_pusher.cc @@ -0,0 +1,285 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include + +#include "rgw_period_pusher.h" +#include "rgw_cr_rest.h" +#include "common/errno.h" + +#include "rgw_boost_asio_yield.h" + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "rgw period pusher: ") + +/// A coroutine to post the period over the given connection. +using PushCR = RGWPostRESTResourceCR; + +/// A coroutine that calls PushCR, and retries with backoff until success. 
+/// A failed attempt is retried immediately until as many attempts as the
+/// connection has endpoints have failed; only then does the coroutine wait.
+/// The wait starts at rgw_period_push_interval, doubles after every wait and
+/// is capped at rgw_period_push_interval_max.
+class PushAndRetryCR : public RGWCoroutine {
+  // target id, only used in log messages. held by reference, so the caller's
+  // string has to outlive this coroutine
+  const std::string& zone;
+  // connection the period is posted over
+  RGWRESTConn *const conn;
+  // http manager handed to every PushCR
+  RGWHTTPManager *const http;
+  // period to post. held by reference, so it has to outlive this coroutine
+  RGWPeriod& period;
+  const std::string epoch; //< epoch string for params
+  double timeout; //< current interval between retries
+  const double timeout_max; //< maximum interval between retries
+  uint32_t counter; //< number of failures since backoff increased
+
+ public:
+  /// reads the initial and maximum retry interval from the configuration
+  /// and formats the period's epoch once, for use as a request parameter
+  PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn,
+                 RGWHTTPManager* http, RGWPeriod& period)
+    : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period),
+      epoch(std::to_string(period.get_epoch())),
+      timeout(cct->_conf->rgw_period_push_interval),
+      timeout_max(cct->_conf->rgw_period_push_interval_max),
+      counter(0)
+  {}
+
+  int operate() override;
+};
+
+/// Post the period to "/admin/realm/period" in a loop that only ends when a
+/// push reports status 0. Returns the result of set_cr_done() in that case.
+int PushAndRetryCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      yield {
+        ldout(cct, 10) << "pushing period " << period.get_id()
+            << " to " << zone << dendl;
+        // initialize the http params
+        rgw_http_param_pair params[] = {
+          { "period", period.get_id().c_str() },
+          { "epoch", epoch.c_str() },
+          { nullptr, nullptr }
+        };
+        call(new PushCR(cct, conn, http, "/admin/realm/period",
+                        params, period, nullptr));
+      }
+
+      // stop on success
+      // NOTE(review): presumably get_ret_status() reports the result of the
+      // PushCR started by call() above - confirm against RGWCoroutine
+      if (get_ret_status() == 0) {
+        ldout(cct, 10) << "push to " << zone << " succeeded" << dendl;
+        return set_cr_done();
+      }
+
+      // try each endpoint in the connection before waiting
+      // NOTE(review): this only counts attempts. it assumes the connection
+      // moves on to another endpoint for each request - confirm against
+      // RGWRESTConn
+      if (++counter < conn->get_endpoint_count())
+        continue;
+      counter = 0;
+
+      // wait with exponential backoff up to timeout_max
+      yield {
+        utime_t dur;
+        dur.set_from_double(timeout);
+
+        ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl;
+        wait(dur);
+
+        // double the interval for the next wait, without exceeding the cap
+        timeout *= 2;
+        if (timeout > timeout_max)
+          timeout = timeout_max;
+      }
+    }
+  }
+  return 0;
+}
+
+/**
+ * PushAllCR is a coroutine that sends the period over all of the given
+ * connections, retrying until they are all marked as completed.
+ */
+class PushAllCR : public RGWCoroutine {
+  RGWHTTPManager *const http; //< handed to every PushAndRetryCR
+  RGWPeriod period; //< period object to push
+  std::map<std::string, RGWRESTConn> conns; //< zones that need the period
+
+ public:
+  /// takes ownership of the period and of the connection map. every spawned
+  /// PushAndRetryCR refers back into these members (key, connection and
+  /// period), so they are kept alive for as long as this coroutine exists
+  PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period,
+            std::map<std::string, RGWRESTConn>&& conns)
+    : RGWCoroutine(cct), http(http),
+      period(std::move(period)),
+      conns(std::move(conns))
+  {}
+
+  int operate() override;
+};
+
+/// Spawn one PushAndRetryCR per entry in conns, then finish once all of
+/// them have been drained.
+int PushAllCR::operate()
+{
+  reenter(this) {
+    // spawn a coroutine to push the period over each connection
+    yield {
+      ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl;
+      for (auto& c : conns)
+        spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false);
+    }
+    // wait for all to complete
+    drain_all();
+    return set_cr_done();
+  }
+  return 0;
+}
+
+/// A background thread to run the PushAllCR coroutine and exit.
+class RGWPeriodPusher::CRThread {
+  // declaration order matters: http is built from the coroutine manager's
+  // completion manager, and push_all is given a pointer to http
+  RGWCoroutinesManager coroutines;
+  RGWHTTPManager http;
+  boost::intrusive_ptr<PushAllCR> push_all;
+  std::thread thread;
+
+ public:
+  /// build the coroutine for the given period and connections, and start a
+  /// thread that runs it
+  CRThread(CephContext* cct, RGWPeriod&& period,
+           std::map<std::string, RGWRESTConn>&& conns)
+    : coroutines(cct),
+      http(cct, coroutines.get_completion_mgr()),
+      push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
+  {
+    http.set_threaded();
+    // start the worker only after the http manager has been set up. a thread
+    // launched from the initializer list starts running push_all, which
+    // uses http, before set_threaded() has been called
+    thread = std::thread([this] { coroutines.run(push_all.get()); });
+  }
+  /// drop our reference to the coroutine, stop the coroutine manager and
+  /// wait for the thread to exit
+  ~CRThread()
+  {
+    // NOTE(review): push_all is released before the worker is joined.
+    // presumably the coroutine manager holds its own reference while it is
+    // running the coroutine - confirm
+    push_all.reset();
+    coroutines.stop();
+    if (thread.joinable())
+      thread.join();
+  }
+};
+
+
+/// store must not be null: it is dereferenced to obtain the CephContext.
+RGWPeriodPusher::RGWPeriodPusher(RGWRados* store)
+  : cct(store->ctx()), store(store),
+    // handle_notify() compares incoming epochs against period_epoch, so it
+    // must not start out indeterminate
+    period_epoch(0)
+{}
+
+// destructor is here because CRThread is incomplete in the header
+RGWPeriodPusher::~RGWPeriodPusher() = default;
+
+/// Decode a period from the notification payload. The period is queued
+/// while paused (store == nullptr) and processed right away otherwise. A
+/// payload that fails to decode is logged and dropped. The type argument is
+/// not inspected here.
+void RGWPeriodPusher::handle_notify(RGWRealmNotify type,
+                                    bufferlist::iterator& p)
+{
+  // decode the period
+  RGWZonesNeedPeriod info;
+  try {
+    ::decode(info, p);
+  } catch (buffer::error& e) {
+    // same context-taking error macro as the other error message in this file
+    lderr(cct) << "Failed to decode the period: " << e.what() << dendl;
+    return;
+  }
+
+  std::lock_guard<std::mutex> lock(mutex);
+
+  // we can't process this notification without
access to our current realm
+  // configuration. queue it until resume()
+  if (store == nullptr) {
+    pending_periods.emplace_back(std::move(info));
+    return;
+  }
+
+  handle_notify(std::move(info));
+}
+
+// expects the caller to hold a lock on mutex
+//
+// Decide whether this gateway has to push the given period and, if so, start
+// a CRThread that pushes it. The period is ignored when
+//  - its id differs from period_id and its predecessor is not period_id
+//  - its id equals period_id and its epoch is not greater than period_epoch
+//  - it does not contain our zonegroup
+//  - our zone is not the master zone of our zonegroup
+//  - no other zone or zonegroup with endpoints is left to push to
+// period_id and period_epoch are only updated when a push is started.
+void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period)
+{
+  if (period.get_id() != period_id) {
+    // new period must follow current period
+    if (period.get_predecessor() != period_id) {
+      ldout(cct, 10) << "current period " << period_id << " is not period "
+          << period.get_id() << "'s predecessor" << dendl;
+      return;
+    }
+  } else if (period.get_epoch() <= period_epoch) {
+    ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer "
+        "than current epoch " << period_epoch << ", discarding update" << dendl;
+    return;
+  }
+
+  // find our zonegroup in the new period
+  auto& zonegroups = period.get_map().zonegroups;
+  auto i = zonegroups.find(store->get_zonegroup().get_id());
+  if (i == zonegroups.end()) {
+    lderr(cct) << "The new period does not contain my zonegroup!" << dendl;
+    return;
+  }
+  auto& my_zonegroup = i->second;
+
+  // if we're not a master zone, we're not responsible for pushing any updates
+  if (my_zonegroup.master_zone != store->get_zone_params().get_id())
+    return;
+
+  // construct a map of the zones that need this period. the map uses the same
+  // keys/ordering as the zone[group] map, so we can use a hint for insertions
+  std::map<std::string, RGWRESTConn> conns;
+  auto hint = conns.end();
+
+  // are we the master zonegroup in this period?
+  if (period.get_map().master_zonegroup == store->get_zonegroup().get_id()) {
+    // update other zonegroup endpoints
+    for (auto& zg : zonegroups) {
+      auto& zonegroup = zg.second;
+      if (zonegroup.get_id() == store->get_zonegroup().get_id())
+        continue;
+      if (zonegroup.endpoints.empty())
+        continue;
+
+      // construct the connection in place from (cct, store, endpoints)
+      hint = conns.emplace_hint(
+          hint, std::piecewise_construct,
+          std::forward_as_tuple(zonegroup.get_id()),
+          std::forward_as_tuple(cct, store, zonegroup.endpoints));
+    }
+  }
+
+  // update other zone endpoints
+  for (auto& z : my_zonegroup.zones) {
+    auto& zone = z.second;
+    if (zone.id == store->get_zone_params().get_id())
+      continue;
+    if (zone.endpoints.empty())
+      continue;
+
+    hint = conns.emplace_hint(
+        hint, std::piecewise_construct,
+        std::forward_as_tuple(zone.id),
+        std::forward_as_tuple(cct, store, zone.endpoints));
+  }
+
+  if (conns.empty()) {
+    ldout(cct, 4) << "No zones to update" << dendl;
+    return;
+  }
+
+  // remember what is being sent before the period is moved from below
+  period_id = period.get_id();
+  period_epoch = period.get_epoch();
+
+  ldout(cct, 4) << "Zone master pushing period " << period_id
+      << " epoch " << period_epoch << " to "
+      << conns.size() << " other zones" << dendl;
+
+  // spawn a new coroutine thread, destroying the previous one
+  cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns)));
+}
+
+/// Forget the RGWRados instance. Until resume() is called, incoming
+/// notifications are queued in pending_periods instead of being processed.
+void RGWPeriodPusher::pause()
+{
+  ldout(cct, 4) << "paused for realm update" << dendl;
+  std::lock_guard<std::mutex> lock(mutex);
+  store = nullptr;
+}
+
+/// Adopt the new RGWRados instance and process every notification that was
+/// queued while paused, in the order it arrived.
+void RGWPeriodPusher::resume(RGWRados* store)
+{
+  std::lock_guard<std::mutex> lock(mutex);
+  this->store = store;
+
+  ldout(cct, 4) << "resume with " << pending_periods.size()
+      << " periods pending" << dendl;
+
+  // process notification queue
+  for (auto& info : pending_periods) {
+    handle_notify(std::move(info));
+  }
+  // the queued periods have been moved from. drop them
+  pending_periods.clear();
+}
diff --git a/src/rgw/rgw_period_pusher.h b/src/rgw/rgw_period_pusher.h new file mode 100644 index 000000000000..bf3ca1ab1d5e --- /dev/null +++ b/src/rgw/rgw_period_pusher.h @@ -0,0 +1,56 @@ +// -*- mode:C++; tab-width:8;
c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_PERIOD_PUSHER_H
+#define RGW_PERIOD_PUSHER_H
+
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "rgw_realm_reloader.h"
+
+class RGWRados;
+class RGWPeriod;
+
+// RGWRealmNotify payload for push coordination
+using RGWZonesNeedPeriod = RGWPeriod;
+
+/**
+ * RGWPeriodPusher coordinates with other nodes via the realm watcher to manage
+ * the responsibility for pushing period updates to other zones or zonegroups.
+ */
+class RGWPeriodPusher final : public RGWRealmWatcher::Watcher,
+                              public RGWRealmReloader::Pauser {
+ public:
+  /// store must not be null
+  RGWPeriodPusher(RGWRados* store);
+  ~RGWPeriodPusher();
+
+  /// respond to realm notifications by pushing new periods to other zones
+  void handle_notify(RGWRealmNotify type, bufferlist::iterator& p) override;
+
+  /// avoid accessing RGWRados while dynamic reconfiguration is in progress.
+  /// notifications will be enqueued until resume()
+  void pause() override;
+
+  /// continue processing notifications with a new RGWRados instance
+  void resume(RGWRados* store) override;
+
+ private:
+  /// process a decoded period. expects the caller to hold a lock on mutex
+  void handle_notify(RGWZonesNeedPeriod&& period);
+
+  CephContext *const cct;
+  RGWRados* store; //< null while paused for reconfiguration
+
+  std::mutex mutex; //< protects the members below and store
+  std::string period_id; //< the current period id being sent
+  // zero-initialized so that the first comparison against an incoming epoch
+  // does not read an indeterminate value
+  epoch_t period_epoch{0}; //< the current period epoch being sent
+
+  /// while paused for reconfiguration, we need to queue up notifications
+  std::vector<RGWZonesNeedPeriod> pending_periods;
+
+  class CRThread; //< contains thread, coroutine manager, http manager
+  std::unique_ptr<CRThread> cr_thread; //< thread to run the push coroutines
+};
+
+#endif // RGW_PERIOD_PUSHER_H
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index bc379da2f20d..7208c6afc3e5 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -795,6 +795,9 @@ int RGWRealm::notify_zone(bufferlist& bl) int RGWRealm::notify_new_period(const RGWPeriod& period) { bufferlist bl; + // push the period to dependent zonegroups/zones +
::encode(RGWRealmNotify::ZonesNeedPeriod, bl); + ::encode(period, bl); // reload the gateway with the new period ::encode(RGWRealmNotify::Reload, bl); diff --git a/src/rgw/rgw_realm_watcher.h b/src/rgw/rgw_realm_watcher.h index d0e87e6b0ea5..1325571dcc68 100644 --- a/src/rgw/rgw_realm_watcher.h +++ b/src/rgw/rgw_realm_watcher.h @@ -14,6 +14,7 @@ class RGWRealm; enum class RGWRealmNotify { Reload, + ZonesNeedPeriod, }; WRITE_RAW_ENCODER(RGWRealmNotify); diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 28537cf96d46..308d82b6d1fd 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -76,6 +76,7 @@ public: CephContext *get_ctx() { return cct; } + size_t get_endpoint_count() const { return endpoints.size(); } /* sync request */ int forward(const rgw_user& uid, req_info& info, obj_version *objv, size_t max_response, bufferlist *inbl, bufferlist *outbl);