From 1e0ac52660b2bdc7093d13acc37398aaafc39b19 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 20 Nov 2015 15:06:43 -0800 Subject: [PATCH] rgw: hook cr managers to admin socket Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 60 ++++++++++++++++++++++++++++++++++++ src/rgw/rgw_coroutine.h | 39 +++++++++++++++++++++-- src/rgw/rgw_data_sync.h | 6 ++-- src/rgw/rgw_period_pusher.cc | 2 +- src/rgw/rgw_rados.cc | 13 ++++++-- src/rgw/rgw_rados.h | 6 ++++ src/rgw/rgw_sync.h | 2 +- 7 files changed, 119 insertions(+), 9 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 25633be668acf..e7cc37ad7b981 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -513,6 +513,66 @@ void RGWCoroutinesManager::dump(Formatter *f) const { f->close_section(); } +string RGWCoroutinesManager::get_id() +{ + if (!id.empty()) { + return id; + } + stringstream ss; + ss << (void *)this; + return ss.str(); +} + +void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr) +{ + RWLock::WLocker wl(lock); + if (managers.find(mgr) == managers.end()) { + managers.insert(mgr); + get(); + } +} + +void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr) +{ + RWLock::WLocker wl(lock); + if (managers.find(mgr) != managers.end()) { + managers.erase(mgr); + put(); + } +} + +int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command) +{ + admin_command = command; + AdminSocket *admin_socket = cct->get_admin_socket(); + int r = admin_socket->register_command(admin_command, admin_command, this, + "dump current coroutines stack state"); + if (r < 0) { + lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl; + return r; + } + return 0; +} + +bool RGWCoroutinesManagerRegistry::call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) { + RWLock::RLocker rl(lock); + stringstream ss; + JSONFormatter f; + ::encode_json("cr_managers", *this, &f); + f.flush(ss); + out.append(ss); + return true; +} + +void RGWCoroutinesManagerRegistry::dump(Formatter *f) const { + f->open_array_section("coroutine_managers"); + for (auto m : managers) { + ::encode_json("entry", *m, f); + } + f->close_section(); +} + void RGWCoroutine::call(RGWCoroutine *op) { stack->call(op); diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 4410d455934ed..353fe704736e7 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -16,6 +16,7 @@ #include "common/RefCountedObj.h" #include "common/debug.h" #include "common/Timer.h" +#include "common/admin_socket.h" #include "rgw_common.h" #include "rgw_boost_asio_coroutine.h" @@ -401,6 +402,27 @@ void RGWConsumerCR::receive(const T& p, bool wakeup) } } +class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook { + CephContext *cct; + + set managers; + RWLock lock; + + string admin_command; + +public: + RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {} + + void add(RGWCoroutinesManager *mgr); + void remove(RGWCoroutinesManager *mgr); + + int hook_to_admin_command(const string& command); + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out); + + void dump(Formatter *f) const; +}; + class RGWCoroutinesManager { CephContext *cct; atomic_t going_down; @@ -413,13 +435,25 @@ class RGWCoroutinesManager { void handle_unblocked_stack(set& context_stacks, list& stacks, RGWCoroutinesStack *stack, int *waiting_count); protected: RGWCompletionManager completion_mgr; + RGWCoroutinesManagerRegistry *cr_registry; int ops_window; + string id; + void put_completion_notifier(RGWAioCompletionNotifier *cn); public: - RGWCoroutinesManager(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesManager::lock"), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {} - virtual ~RGWCoroutinesManager() {} + RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"), + completion_mgr(cct), cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { + if (cr_registry) { + cr_registry->add(this); + } + } + virtual ~RGWCoroutinesManager() { + if (cr_registry) { + cr_registry->remove(this); + } + } int run(list& ops); int run(RGWCoroutine *op); @@ -439,6 +473,7 @@ public: return stack; } + virtual string get_id(); void dump(Formatter *f) const; }; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 929818d0e3d1e..7fba583df8752 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -156,7 +156,7 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { bool initialized; public: - RGWRemoteDataLog(RGWRados *_store, RGWDataSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx()), store(_store), + RGWRemoteDataLog(RGWRados *_store, RGWDataSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), conn(NULL), http_manager(store->ctx(), &completion_mgr), status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), @@ -344,7 +344,7 @@ class RGWRemoteBucketLog : public RGWCoroutinesManager { public: RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm, - RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx()), store(_store), + RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), conn(NULL), shard_id(0), status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager), sync_cr(NULL) {} @@ -388,7 +388,7 @@ class RGWBucketSyncStatusManager { public: RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone, const string& _bucket_name, const string& _bucket_id) : store(_store), - cr_mgr(_store->ctx()), + cr_mgr(_store->ctx(), _store->get_cr_registry()), async_rados(NULL), http_manager(store->ctx(), cr_mgr.get_completion_mgr()), source_zone(_source_zone), diff --git a/src/rgw/rgw_period_pusher.cc b/src/rgw/rgw_period_pusher.cc index adf51f03d1fc5..7669631f10b3f 100644 --- a/src/rgw/rgw_period_pusher.cc +++ b/src/rgw/rgw_period_pusher.cc @@ -133,7 +133,7 @@ class RGWPeriodPusher::CRThread { public: CRThread(CephContext* cct, RGWPeriod&& period, std::map&& conns) - : coroutines(cct), + : coroutines(cct, NULL), http(cct, coroutines.get_completion_mgr()), push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns))), thread([this] { coroutines.run(push_all.get()); }) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index ead4579666207..c34950e1664fb 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2393,7 +2393,7 @@ class RGWMetaNotifierManager : public RGWCoroutinesManager { RGWHTTPManager http_manager; public: - RGWMetaNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx()), store(_store), + RGWMetaNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), http_manager(store->ctx(), &completion_mgr) { http_manager.set_threaded(); } @@ -2420,7 +2420,7 @@ class RGWDataNotifierManager : public RGWCoroutinesManager { RGWHTTPManager http_manager; public: - RGWDataNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx()), store(_store), + RGWDataNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), http_manager(store->ctx(), &completion_mgr) { http_manager.set_threaded(); } @@ -2843,6 +2843,9 @@ void RGWRados::finalize() delete conn; } RGWQuotaHandler::free_handler(quota_handler); + if (cr_registry) { + cr_registry->put(); + } } /** @@ -2880,6 +2883,12 @@ int RGWRados::init_rados() } } + cr_registry = new RGWCoroutinesManagerRegistry(cct); + ret = cr_registry->hook_to_admin_command("cr dump"); + if (ret < 0) { + goto fail; + } + meta_mgr = new RGWMetadataManager(cct, this); data_log = new RGWDataChangesLog(cct, this); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 7d28ea3e4080a..7a5d32084bc0e 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1479,6 +1479,7 @@ WRITE_CLASS_ENCODER(RGWPeriod) class RGWDataChangesLog; class RGWReplicaLogger; +class RGWCoroutinesManagerRegistry; class RGWStateLog { RGWRados *store; @@ -1770,6 +1771,8 @@ protected: RGWQuotaHandler *quota_handler; Finisher *finisher; + + RGWCoroutinesManagerRegistry *cr_registry; RGWZoneGroup zonegroup; RGWZone zone_public_config; /* external zone params, e.g., entrypoints, log flags, etc. */ @@ -1797,6 +1800,7 @@ public: pools_initialized(false), quota_handler(NULL), finisher(NULL), + cr_registry(NULL), has_period_zonegroup(false), has_period_zone(false), rest_master_conn(NULL), @@ -1999,6 +2003,8 @@ public: virtual int list_placement_set(set& names); virtual int create_pools(vector& names, vector& retcodes); + RGWCoroutinesManagerRegistry *get_cr_registry() { return cr_registry; } + class SystemObject { RGWRados *store; RGWObjectCtx& ctx; diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index ea6e28da7bacd..7b68c6f5608a1 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -211,7 +211,7 @@ class RGWRemoteMetaLog : public RGWCoroutinesManager { void init_sync_env(RGWMetaSyncEnv *env); public: - RGWRemoteMetaLog(RGWRados *_store, RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx()), store(_store), + RGWRemoteMetaLog(RGWRados *_store, RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), conn(NULL), async_rados(nullptr), http_manager(store->ctx(), &completion_mgr), status_manager(_sm), meta_sync_cr(NULL), sync_report(_store->ctx()) {} -- 2.39.5