]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados_test_stub: watch/notify now behaves similar to librados
authorJason Dillaman <dillaman@redhat.com>
Thu, 28 Jan 2016 19:35:54 +0000 (14:35 -0500)
committerJason Dillaman <dillaman@redhat.com>
Thu, 11 Feb 2016 14:41:32 +0000 (09:41 -0500)
Notifications are executed via the same librados AIO callback
thread, so it's now possible to catch deadlock.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 0a3822f1559ba3fe3def6a65883b9c6c7c5a33fe)

src/test/librados_test_stub/TestRadosClient.cc
src/test/librados_test_stub/TestWatchNotify.cc
src/test/librados_test_stub/TestWatchNotify.h

index 46437ac8657e329aba6dd59b48988d3a7010e758..c75f79e286dab65d21740c04c09f1d796cc10398 100644 (file)
@@ -84,7 +84,8 @@ private:
 
 TestRadosClient::TestRadosClient(CephContext *cct)
   : m_cct(cct->get()),
-    m_watch_notify(m_cct),
+    m_aio_finisher(new Finisher(m_cct)),
+    m_watch_notify(m_cct, m_aio_finisher),
     m_transaction_lock("TestRadosClient::m_transaction_lock")
 {
   get();
@@ -97,7 +98,6 @@ TestRadosClient::TestRadosClient(CephContext *cct)
   }
 
   // replicate AIO callback processing
-  m_aio_finisher = new Finisher(m_cct);
   m_aio_finisher->start();
 }
 
index 14a43bc58cb1cf7bc56b3a1201e4995e82a42fce..f40eea835784169861e20e460dbc28723c0bb188 100644 (file)
@@ -9,23 +9,19 @@
 
 namespace librados {
 
-TestWatchNotify::TestWatchNotify(CephContext *cct)
-  : m_cct(cct), m_finisher(new Finisher(cct)), m_handle(), m_notify_id(),
+TestWatchNotify::TestWatchNotify(CephContext *cct, Finisher *finisher)
+  : m_cct(cct), m_finisher(finisher), m_handle(), m_notify_id(),
     m_file_watcher_lock("librados::TestWatchNotify::m_file_watcher_lock"),
     m_pending_notifies(0) {
   m_cct->get();
-  m_finisher->start();
 }
 
 TestWatchNotify::~TestWatchNotify() {
-  m_finisher->stop();
-  delete m_finisher;
   m_cct->put();
 }
 
 TestWatchNotify::NotifyHandle::NotifyHandle()
-  : pbl(NULL), pending_responses(),
-    lock("TestWatchNotify::NotifyHandle::lock") {
+  : lock("TestWatchNotify::NotifyHandle::lock") {
 }
 
 TestWatchNotify::Watcher::Watcher()
@@ -50,7 +46,7 @@ int TestWatchNotify::list_watchers(const std::string& o,
        it != watcher->watch_handles.end(); ++it) {
     obj_watch_t obj;
     strcpy(obj.addr, ":/0");
-    obj.watcher_id = static_cast<int64_t>(it->second.instance_id);
+    obj.watcher_id = static_cast<int64_t>(it->second.gid);
     obj.cookie = it->second.handle;
     obj.timeout_seconds = 30;
     out_watchers->push_back(obj);
@@ -60,35 +56,64 @@ int TestWatchNotify::list_watchers(const std::string& o,
 
 int TestWatchNotify::notify(const std::string& oid, bufferlist& bl,
                             uint64_t timeout_ms, bufferlist *pbl) {
-  Mutex lock("TestRadosClient::watcher_notify::lock");
-  Cond cond;
-  bool done = false;
+  uint64_t notify_id;
+  {
+    Mutex::Locker file_watcher_locker(m_file_watcher_lock);
+    ++m_pending_notifies;
+    notify_id = ++m_notify_id;
+  }
 
+  SharedNotifyHandle notify_handle(new NotifyHandle());
   {
     SharedWatcher watcher = get_watcher(oid);
-    RWLock::WLocker l(watcher->lock);
-    {
-      Mutex::Locker l2(m_file_watcher_lock);
-      ++m_pending_notifies;
-      uint64_t notify_id = ++m_notify_id;
+    RWLock::WLocker watcher_locker(watcher->lock);
+
+    WatchHandles watch_handles = watcher->watch_handles;
+    for (WatchHandles::iterator w_it = watch_handles.begin();
+         w_it != watch_handles.end(); ++w_it) {
+      WatchHandle &watch_handle = w_it->second;
+
+      Mutex::Locker notify_handle_locker(notify_handle->lock);
+      notify_handle->pending_watcher_ids.insert(std::make_pair(
+        watch_handle.gid, watch_handle.handle));
+    }
+
+    watcher->notify_handles[notify_id] = notify_handle;
+
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&TestWatchNotify::execute_notify, this, oid, bl, notify_id));
+    m_finisher->queue(ctx);
+  }
+
+  {
+    utime_t timeout;
+    timeout.set_from_double(ceph_clock_now(m_cct) + (timeout_ms / 1000.0));
 
-      SharedNotifyHandle notify_handle(new NotifyHandle());
-      notify_handle->pbl = pbl;
+    Mutex::Locker notify_locker(notify_handle->lock);
+    while (!notify_handle->pending_watcher_ids.empty()) {
+      notify_handle->cond.WaitUntil(notify_handle->lock, timeout);
+    }
 
-      watcher->notify_handles[notify_id] = notify_handle;
+    if (pbl != NULL) {
+      ::encode(notify_handle->notify_responses, *pbl);
+      ::encode(notify_handle->pending_watcher_ids, *pbl);
+    }
+  }
 
-      FunctionContext *ctx = new FunctionContext(
-          boost::bind(&TestWatchNotify::execute_notify, this,
-                      oid, bl, notify_id, &lock, &cond, &done));
-      m_finisher->queue(ctx);
+  SharedWatcher watcher = get_watcher(oid);
+  Mutex::Locker file_watcher_locker(m_file_watcher_lock);
+  {
+    RWLock::WLocker watcher_locker(watcher->lock);
+
+    watcher->notify_handles.erase(notify_id);
+    if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+      m_file_watchers.erase(oid);
     }
   }
 
-  lock.Lock();
-  while (!done) {
-    cond.Wait(lock);
+  if (--m_pending_notifies == 0) {
+    m_file_watcher_cond.Signal();
   }
-  lock.Unlock();
   return 0;
 }
 
@@ -107,20 +132,24 @@ void TestWatchNotify::notify_ack(const std::string& o, uint64_t notify_id,
   response.append(bl);
 
   SharedNotifyHandle notify_handle = it->second;
-  Mutex::Locker l2(notify_handle->lock);
-  --notify_handle->pending_responses;
-  notify_handle->notify_responses[std::make_pair(gid, handle)] = response;
-  notify_handle->cond.Signal();
+  Mutex::Locker notify_handle_locker(notify_handle->lock);
+
+  WatcherID watcher_id = std::make_pair(gid, handle);
+  notify_handle->notify_responses[watcher_id] = response;
+  notify_handle->pending_watcher_ids.erase(watcher_id);
+  if (notify_handle->pending_watcher_ids.empty()) {
+    notify_handle->cond.Signal();
+  }
 }
 
-int TestWatchNotify::watch(const std::string& o, uint64_t instance_id,
+int TestWatchNotify::watch(const std::string& o, uint64_t gid,
                            uint64_t *handle, librados::WatchCtx *ctx,
                            librados::WatchCtx2 *ctx2) {
   SharedWatcher watcher = get_watcher(o);
 
   RWLock::WLocker l(watcher->lock);
   WatchHandle watch_handle;
-  watch_handle.instance_id = instance_id;
+  watch_handle.gid = gid;
   watch_handle.handle = ++m_handle;
   watch_handle.watch_ctx = ctx;
   watch_handle.watch_ctx2 = ctx2;
@@ -131,24 +160,21 @@ int TestWatchNotify::watch(const std::string& o, uint64_t instance_id,
 }
 
 int TestWatchNotify::unwatch(uint64_t handle) {
-
-  SharedWatcher watcher;
-  {
-    Mutex::Locker l(m_file_watcher_lock);
-    for (FileWatchers::iterator it = m_file_watchers.begin();
-         it != m_file_watchers.end(); ++it) {
-      if (it->second->watch_handles.find(handle) !=
-            it->second->watch_handles.end()) {
-        watcher = it->second;
-        break;
+  Mutex::Locker l(m_file_watcher_lock);
+  for (FileWatchers::iterator it = m_file_watchers.begin();
+       it != m_file_watchers.end(); ++it) {
+    SharedWatcher watcher = it->second;
+    RWLock::WLocker watcher_locker(watcher->lock);
+
+    WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
+    if (w_it != watcher->watch_handles.end()) {
+      watcher->watch_handles.erase(w_it);
+      if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+        m_file_watchers.erase(it);
       }
+      break;
     }
   }
-
-  if (watcher) {
-    RWLock::WLocker l(watcher->lock);
-    watcher->watch_handles.erase(handle);
-  }
   return 0;
 }
 
@@ -168,65 +194,55 @@ TestWatchNotify::SharedWatcher TestWatchNotify::_get_watcher(
 }
 
 void TestWatchNotify::execute_notify(const std::string &oid,
-                                     bufferlist &bl, uint64_t notify_id,
-                                     Mutex *lock, Cond *cond,
-                                     bool *done) {
-  WatchHandles watch_handles;
-  SharedNotifyHandle notify_handle;
-
-  {
-    SharedWatcher watcher = get_watcher(oid);
-    RWLock::RLocker l(watcher->lock);
+                                     bufferlist &bl, uint64_t notify_id) {
+  SharedWatcher watcher = get_watcher(oid);
+  RWLock::RLocker watcher_locker(watcher->lock);
+  WatchHandles &watch_handles = watcher->watch_handles;
 
-    NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
-    if (n_it == watcher->notify_handles.end()) {
-      return;
-    }
-
-    watch_handles = watcher->watch_handles;
-    notify_handle = n_it->second;
+  NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
+  if (n_it == watcher->notify_handles.end()) {
+    return;
   }
 
-  utime_t timeout;
-  timeout.set_from_double(ceph_clock_now(m_cct) + 15);
-
-  for (WatchHandles::iterator w_it = watch_handles.begin();
-       w_it != watch_handles.end(); ++w_it) {
-    WatchHandle &watch_handle = w_it->second;
-
-    bufferlist notify_bl;
-    notify_bl.append(bl);
-    if (watch_handle.watch_ctx2 != NULL) {
-      {
-        Mutex::Locker l2(notify_handle->lock);
-        ++notify_handle->pending_responses;
+  SharedNotifyHandle notify_handle = n_it->second;
+  Mutex::Locker notify_handle_locker(notify_handle->lock);
+
+  WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
+  for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
+       w_id_it != watcher_ids.end(); ++w_id_it) {
+    WatcherID watcher_id = *w_id_it;
+    WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
+    if (w_it == watch_handles.end()) {
+      notify_handle->pending_watcher_ids.erase(watcher_id);
+    } else {
+      WatchHandle &watch_handle = w_it->second;
+      assert(watch_handle.gid == watcher_id.first);
+      assert(watch_handle.handle == watcher_id.second);
+
+      bufferlist notify_bl;
+      notify_bl.append(bl);
+
+      notify_handle->lock.Unlock();
+      watcher->lock.put_read();
+      if (watch_handle.watch_ctx2 != NULL) {
+        watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
+                                               notify_bl);
+      } else if (watch_handle.watch_ctx != NULL) {
+        watch_handle.watch_ctx->notify(0, 0, notify_bl);
       }
-      watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
-                                             notify_bl);
-    } else if (watch_handle.watch_ctx != NULL) {
-      watch_handle.watch_ctx->notify(0, 0, notify_bl);
-    }
-  }
+      watcher->lock.get_read();
+      notify_handle->lock.Lock();
 
-  {
-    Mutex::Locker l2(notify_handle->lock);
-    while (notify_handle->pending_responses > 0) {
-      notify_handle->cond.WaitUntil(notify_handle->lock, timeout);
-    }
-    if (notify_handle->pbl != NULL) {
-      ::encode(notify_handle->notify_responses, *notify_handle->pbl);
+      if (watch_handle.watch_ctx2 == NULL) {
+        // auto ack old-style watch/notify clients
+        notify_handle->notify_responses[watcher_id] = bufferlist();
+        notify_handle->pending_watcher_ids.erase(watcher_id);
+      }
     }
   }
 
-  Mutex::Locker l3(*lock);
-  *done = true;
-  cond->Signal();
-
-  {
-    Mutex::Locker file_watcher_locker(m_file_watcher_lock);
-    if (--m_pending_notifies == 0) {
-      m_file_watcher_cond.Signal();
-    }
+  if (notify_handle->pending_watcher_ids.empty()) {
+    notify_handle->cond.Signal();
   }
 }
 
index 1761302bbf37bde86e16895f4fc1ccb263d22a22..7ff4b1afaaadf17b3e041d293f012cdb7339f612 100644 (file)
@@ -21,13 +21,14 @@ namespace librados {
 
 class TestWatchNotify : boost::noncopyable {
 public:
+  typedef std::pair<uint64_t, uint64_t> WatcherID;
+  typedef std::set<WatcherID> WatcherIDs;
   typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> NotifyResponses;
 
   struct NotifyHandle {
     NotifyHandle();
+    WatcherIDs pending_watcher_ids;
     NotifyResponses notify_responses;
-    bufferlist *pbl;
-    size_t pending_responses;
     Mutex lock;
     Cond cond;
   };
@@ -35,7 +36,7 @@ public:
   typedef std::map<uint64_t, SharedNotifyHandle> NotifyHandles;
 
   struct WatchHandle {
-    uint64_t instance_id;
+    uint64_t gid;
     uint64_t handle;
     librados::WatchCtx* watch_ctx;
     librados::WatchCtx2* watch_ctx2;
@@ -51,7 +52,7 @@ public:
   };
   typedef boost::shared_ptr<Watcher> SharedWatcher;
 
-  TestWatchNotify(CephContext *cct);
+  TestWatchNotify(CephContext *cct, Finisher *finisher);
   ~TestWatchNotify();
 
   void flush();
@@ -61,7 +62,7 @@ public:
              uint64_t timeout_ms, bufferlist *pbl);
   void notify_ack(const std::string& o, uint64_t notify_id,
                   uint64_t handle, uint64_t gid, bufferlist& bl);
-  int watch(const std::string& o, uint64_t instance_id, uint64_t *handle,
+  int watch(const std::string& o, uint64_t gid, uint64_t *handle,
             librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2);
   int unwatch(uint64_t handle);
 
@@ -84,7 +85,7 @@ private:
   SharedWatcher get_watcher(const std::string& oid);
   SharedWatcher _get_watcher(const std::string& oid);
   void execute_notify(const std::string &oid, bufferlist &bl,
-                      uint64_t notify_id, Mutex *lock, Cond *cond, bool *done);
+                      uint64_t notify_id);
 
 };