From: Yehuda Sadeh Date: Tue, 8 Mar 2016 17:58:32 +0000 (-0800) Subject: rgw: rework completion notifier and manager lifecycle X-Git-Tag: v10.1.0~143^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1a800f60958c22ee902b67d00f4679a99004a837;p=ceph.git rgw: rework completion notifier and manager lifecycle completion manager is now refcounted, and keeps track of all the notifiers. This is needed so that when we shut down we can release all completion notifiers, so that they don't reference the manager anymore. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index a33337d905c7..ef28ae21fa0c 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -22,14 +22,36 @@ RGWCompletionManager::~RGWCompletionManager() timer.shutdown(); } -void RGWCompletionManager::complete(void *user_info) +void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, void *user_info) { Mutex::Locker l(lock); - _complete(user_info); + _complete(cn, user_info); } -void RGWCompletionManager::_complete(void *user_info) +void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn) { + Mutex::Locker l(lock); + if (cn) { + cns.insert(cn); + cn->get(); + } +} + +void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn) +{ + Mutex::Locker l(lock); + if (cn) { + cns.erase(cn); + cn->put(); + } +} + +void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, void *user_info) +{ + if (cn) { + cns.erase(cn); + cn->put(); + } complete_reqs.push_back(user_info); cond.Signal(); } @@ -62,6 +84,9 @@ bool RGWCompletionManager::try_get_next(void **user_info) void RGWCompletionManager::go_down() { Mutex::Locker l(lock); + for (auto cn : cns) { + cn->unregister(); + } going_down.set(1); cond.Signal(); } @@ -86,7 +111,7 @@ void RGWCompletionManager::_wakeup(void *opaque) if (iter != waiters.end()) { void *user_id = iter->second; waiters.erase(iter); - _complete(user_id); + _complete(NULL, user_id); } } @@ -336,7 +361,8 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) ((RGWAioCompletionNotifier *)arg)->cb(); } -RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) { +RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), + user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) { c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL); } @@ -498,7 +524,7 @@ int RGWCoroutinesManager::run(list& stacks) lock.unlock(); RGWCoroutinesStack *blocked_stack; - while (completion_mgr.try_get_next((void **)&blocked_stack)) { + while (completion_mgr->try_get_next((void **)&blocked_stack)) { handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count); } @@ -508,7 +534,7 @@ int RGWCoroutinesManager::run(list& stacks) * these aren't really waiting for IOs */ while (blocked_count - interval_wait_count >= ops_window) { - ret = completion_mgr.get_next((void **)&blocked_stack); + ret = completion_mgr->get_next((void **)&blocked_stack); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; } @@ -520,7 +546,7 @@ int RGWCoroutinesManager::run(list& stacks) while (scheduled_stacks.empty() && blocked_count > 0) { - ret = completion_mgr.get_next((void **)&blocked_stack); + ret = completion_mgr->get_next((void **)&blocked_stack); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; } diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 1815fa8ab0a0..09128576bb1a 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -25,10 +25,12 @@ class RGWCoroutinesStack; class RGWCoroutinesManager; +class RGWAioCompletionNotifier; -class RGWCompletionManager { +class RGWCompletionManager : public RefCountedObject { CephContext *cct; list complete_reqs; + set cns; Mutex lock; Cond cond; @@ -53,12 +55,12 @@ class RGWCompletionManager { protected: void _wakeup(void *opaque); - void _complete(void *user_info); + void _complete(RGWAioCompletionNotifier *cn, void *user_info); public: RGWCompletionManager(CephContext *_cct); ~RGWCompletionManager(); - void complete(void *user_info); + void complete(RGWAioCompletionNotifier *cn, void *user_info); int get_next(void **user_info); bool try_get_next(void **user_info); @@ -69,6 +71,9 @@ public: */ void wait_interval(void *opaque, const utime_t& interval, void *user_info); void wakeup(void *opaque); + + void register_completion_notifier(RGWAioCompletionNotifier *cn); + void unregister_completion_notifier(RGWAioCompletionNotifier *cn); }; /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ @@ -76,19 +81,50 @@ class RGWAioCompletionNotifier : public RefCountedObject { librados::AioCompletion *c; RGWCompletionManager *completion_mgr; void *user_data; + Mutex lock; + bool registered; public: RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data); ~RGWAioCompletionNotifier() { c->release(); + lock.Lock(); + bool need_unregister = registered; + if (registered) { + completion_mgr->get(); + } + registered = false; + lock.Unlock(); + if (need_unregister) { + completion_mgr->unregister_completion_notifier(this); + completion_mgr->put(); + } } librados::AioCompletion *completion() { return c; } + void unregister() { + Mutex::Locker l(lock); + if (!registered) { + return; + } + registered = false; + } + void cb() { - completion_mgr->complete(user_data); + lock.Lock(); + if (!registered) { + lock.Unlock(); + put(); + return; + } + completion_mgr->get(); + registered = false; + lock.Unlock(); + completion_mgr->complete(this, user_data); + completion_mgr->put(); put(); } }; @@ -483,7 +519,7 @@ class RGWCoroutinesManager { void handle_unblocked_stack(set& context_stacks, list& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count); protected: - RGWCompletionManager completion_mgr; + RGWCompletionManager *completion_mgr; RGWCoroutinesManagerRegistry *cr_registry; int ops_window; @@ -493,12 +529,14 @@ protected: void put_completion_notifier(RGWAioCompletionNotifier *cn); public: RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"), - completion_mgr(cct), cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { + cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { + completion_mgr = new RGWCompletionManager(cct); if (cr_registry) { cr_registry->add(this); } } virtual ~RGWCoroutinesManager() { + completion_mgr->put(); if (cr_registry) { cr_registry->remove(this); } @@ -508,13 +546,13 @@ public: int run(RGWCoroutine *op); void stop() { going_down.set(1); - completion_mgr.go_down(); + completion_mgr->go_down(); } virtual void report_error(RGWCoroutinesStack *op); RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); - RGWCompletionManager *get_completion_mgr() { return &completion_mgr; } + RGWCompletionManager *get_completion_mgr() { return completion_mgr; } void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); RGWCoroutinesStack *allocate_stack(); diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 7cf497b9bb73..1574a7fecd25 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -184,7 +184,7 @@ public: RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), async_rados(async_rados), - http_manager(store->ctx(), &completion_mgr), + http_manager(store->ctx(), completion_mgr), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), initialized(false) {} int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger); diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 1046a91b0cfc..22f0bf4224da 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -385,7 +385,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) reqs.erase(iter); } if (completion_mgr) { - completion_mgr->complete(req_data->client->get_user_info()); + completion_mgr->complete(NULL, req_data->client->get_user_info()); } req_data->put(); } diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 027781bb1e98..9a5804ee1dc8 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -203,7 +203,7 @@ public: pinfo->marker = header.max_marker; pinfo->last_update = header.max_time; } - completion_manager->complete(user_info); + completion_manager->complete(NULL, user_info); put(); } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8f45ad7e91de..2424df74868c 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2673,7 +2673,7 @@ class RGWMetaNotifierManager : public RGWCoroutinesManager { public: RGWMetaNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), - http_manager(store->ctx(), &completion_mgr) { + http_manager(store->ctx(), completion_mgr) { http_manager.set_threaded(); } @@ -2700,7 +2700,7 @@ class RGWDataNotifierManager : public RGWCoroutinesManager { public: RGWDataNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), - http_manager(store->ctx(), &completion_mgr) { + http_manager(store->ctx(), completion_mgr) { http_manager.set_threaded(); } diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 2b191201c733..af57b8d15b22 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -167,7 +167,7 @@ public: RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), conn(NULL), async_rados(async_rados), - http_manager(store->ctx(), &completion_mgr), + http_manager(store->ctx(), completion_mgr), status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {} ~RGWRemoteMetaLog();