]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: rework completion notifier and manager lifecycle
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 8 Mar 2016 17:58:32 +0000 (09:58 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 8 Mar 2016 18:02:55 +0000 (10:02 -0800)
completion manager is now refcounted, and keeps track of all the
notifiers. This is needed so that when we shut down we can release
all completion notifiers, so that they don't reference the manager
anymore.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_data_sync.h
src/rgw/rgw_http_client.cc
src/rgw/rgw_metadata.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_sync.h

index a33337d905c7a92dfacd58af14859394110478d3..ef28ae21fa0ce1e8217d85ba94ca4de35b597861 100644 (file)
@@ -22,14 +22,36 @@ RGWCompletionManager::~RGWCompletionManager()
   timer.shutdown();
 }
 
-void RGWCompletionManager::complete(void *user_info)
+void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, void *user_info)
 {
   Mutex::Locker l(lock);
-  _complete(user_info);
+  _complete(cn, user_info);
 }
 
-void RGWCompletionManager::_complete(void *user_info)
+void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
 {
+  Mutex::Locker l(lock);
+  if (cn) {
+    cns.insert(cn);
+    cn->get();
+  }
+}
+
+void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
+{
+  Mutex::Locker l(lock);
+  if (cn) {
+    cns.erase(cn);
+    cn->put();
+  }
+}
+
+void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, void *user_info)
+{
+  if (cn) {
+    cns.erase(cn);
+    cn->put();
+  }
   complete_reqs.push_back(user_info);
   cond.Signal();
 }
@@ -62,6 +84,9 @@ bool RGWCompletionManager::try_get_next(void **user_info)
 void RGWCompletionManager::go_down()
 {
   Mutex::Locker l(lock);
+  for (auto cn : cns) {
+    cn->unregister();
+  }
   going_down.set(1);
   cond.Signal();
 }
@@ -86,7 +111,7 @@ void RGWCompletionManager::_wakeup(void *opaque)
   if (iter != waiters.end()) {
     void *user_id = iter->second;
     waiters.erase(iter);
-    _complete(user_id);
+    _complete(NULL, user_id);
   }
 }
 
@@ -336,7 +361,8 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
   ((RGWAioCompletionNotifier *)arg)->cb();
 }
 
-RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr),
+                                                                         user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
   c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
 }
 
@@ -498,7 +524,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
     lock.unlock();
 
     RGWCoroutinesStack *blocked_stack;
-    while (completion_mgr.try_get_next((void **)&blocked_stack)) {
+    while (completion_mgr->try_get_next((void **)&blocked_stack)) {
       handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
     }
 
@@ -508,7 +534,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
      * these aren't really waiting for IOs
      */
     while (blocked_count - interval_wait_count >= ops_window) {
-      ret = completion_mgr.get_next((void **)&blocked_stack);
+      ret = completion_mgr->get_next((void **)&blocked_stack);
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
       }
@@ -520,7 +546,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
 
 
     while (scheduled_stacks.empty() && blocked_count > 0) {
-      ret = completion_mgr.get_next((void **)&blocked_stack);
+      ret = completion_mgr->get_next((void **)&blocked_stack);
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
       }
index 1815fa8ab0a003471f9b182812efbd23377193f3..09128576bb1a45b16a2ac10db289b4463e46b5b4 100644 (file)
 
 class RGWCoroutinesStack;
 class RGWCoroutinesManager;
+class RGWAioCompletionNotifier;
 
-class RGWCompletionManager {
+class RGWCompletionManager : public RefCountedObject {
   CephContext *cct;
   list<void *> complete_reqs;
+  set<RGWAioCompletionNotifier *> cns;
 
   Mutex lock;
   Cond cond;
@@ -53,12 +55,12 @@ class RGWCompletionManager {
 
 protected:
   void _wakeup(void *opaque);
-  void _complete(void *user_info);
+  void _complete(RGWAioCompletionNotifier *cn, void *user_info);
 public:
   RGWCompletionManager(CephContext *_cct);
   ~RGWCompletionManager();
 
-  void complete(void *user_info);
+  void complete(RGWAioCompletionNotifier *cn, void *user_info);
   int get_next(void **user_info);
   bool try_get_next(void **user_info);
 
@@ -69,6 +71,9 @@ public:
    */
   void wait_interval(void *opaque, const utime_t& interval, void *user_info);
   void wakeup(void *opaque);
+
+  void register_completion_notifier(RGWAioCompletionNotifier *cn);
+  void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
 };
 
 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
@@ -76,19 +81,50 @@ class RGWAioCompletionNotifier : public RefCountedObject {
   librados::AioCompletion *c;
   RGWCompletionManager *completion_mgr;
   void *user_data;
+  Mutex lock;
+  bool registered;
 
 public:
   RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
   ~RGWAioCompletionNotifier() {
     c->release();
+    lock.Lock();
+    bool need_unregister = registered;
+    if (registered) {
+      completion_mgr->get();
+    }
+    registered = false;
+    lock.Unlock();
+    if (need_unregister) {
+      completion_mgr->unregister_completion_notifier(this);
+      completion_mgr->put();
+    }
   }
 
   librados::AioCompletion *completion() {
     return c;
   }
 
+  void unregister() {
+    Mutex::Locker l(lock);
+    if (!registered) {
+      return;
+    }
+    registered = false;
+  }
+
   void cb() {
-    completion_mgr->complete(user_data);
+    lock.Lock();
+    if (!registered) {
+      lock.Unlock();
+      put();
+      return;
+    }
+    completion_mgr->get();
+    registered = false;
+    lock.Unlock();
+    completion_mgr->complete(this, user_data);
+    completion_mgr->put();
     put();
   }
 };
@@ -483,7 +519,7 @@ class RGWCoroutinesManager {
 
   void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
 protected:
-  RGWCompletionManager completion_mgr;
+  RGWCompletionManager *completion_mgr;
   RGWCoroutinesManagerRegistry *cr_registry;
 
   int ops_window;
@@ -493,12 +529,14 @@ protected:
   void put_completion_notifier(RGWAioCompletionNotifier *cn);
 public:
   RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
-                                                                                        completion_mgr(cct), cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
+                                                                                        cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
+    completion_mgr = new RGWCompletionManager(cct);
     if (cr_registry) {
       cr_registry->add(this);
     }
   }
   virtual ~RGWCoroutinesManager() {
+    completion_mgr->put();
     if (cr_registry) {
       cr_registry->remove(this);
     }
@@ -508,13 +546,13 @@ public:
   int run(RGWCoroutine *op);
   void stop() {
     going_down.set(1);
-    completion_mgr.go_down();
+    completion_mgr->go_down();
   }
 
   virtual void report_error(RGWCoroutinesStack *op);
 
   RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
-  RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
+  RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
 
   void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
   RGWCoroutinesStack *allocate_stack();
index 7cf497b9bb73b22c77b300f2370ce9f0292c6f55..1574a7fecd25815c6c5efcfb9e9f140cb222f27d 100644 (file)
@@ -184,7 +184,7 @@ public:
   RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
       store(_store), async_rados(async_rados),
-      http_manager(store->ctx(), &completion_mgr),
+      http_manager(store->ctx(), completion_mgr),
       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
   int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger);
index 1046a91b0cfc0d7d6523963198f4ce25bc64a2af..22f0bf4224da3c8df3f4a48a1b25f46994870f9c 100644 (file)
@@ -385,7 +385,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
     reqs.erase(iter);
   }
   if (completion_mgr) {
-    completion_mgr->complete(req_data->client->get_user_info());
+    completion_mgr->complete(NULL, req_data->client->get_user_info());
   }
   req_data->put();
 }
index 027781bb1e983408b82c9d7e59e1ee53f72dbc24..9a5804ee1dc84a147b84fc1720bc78f2815f6ab1 100644 (file)
@@ -203,7 +203,7 @@ public:
       pinfo->marker = header.max_marker;
       pinfo->last_update = header.max_time;
     }
-    completion_manager->complete(user_info);
+    completion_manager->complete(NULL, user_info);
     put();
   }
 
index 8f45ad7e91deef5a834a47df7c3ac5baf8fdb8a3..2424df74868ce8efc8d4b487882624e21b8ad822 100644 (file)
@@ -2673,7 +2673,7 @@ class RGWMetaNotifierManager : public RGWCoroutinesManager {
 
 public:
   RGWMetaNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
-                                             http_manager(store->ctx(), &completion_mgr) {
+                                             http_manager(store->ctx(), completion_mgr) {
     http_manager.set_threaded();
   }
 
@@ -2700,7 +2700,7 @@ class RGWDataNotifierManager : public RGWCoroutinesManager {
 
 public:
   RGWDataNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
-                                             http_manager(store->ctx(), &completion_mgr) {
+                                             http_manager(store->ctx(), completion_mgr) {
     http_manager.set_threaded();
   }
 
index 2b191201c733d6431842627e8f300c48f0a744e2..af57b8d15b2283420a77491419880b9ab5bc8e87 100644 (file)
@@ -167,7 +167,7 @@ public:
                    RGWMetaSyncStatusManager *_sm)
     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
       store(_store), conn(NULL), async_rados(async_rados),
-      http_manager(store->ctx(), &completion_mgr),
+      http_manager(store->ctx(), completion_mgr),
       status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {}
 
   ~RGWRemoteMetaLog();