]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados_test_stub: watch/notify now behaves similar to librados 7409/head
authorJason Dillaman <dillaman@redhat.com>
Thu, 28 Jan 2016 19:35:54 +0000 (14:35 -0500)
committerJason Dillaman <dillaman@redhat.com>
Fri, 29 Jan 2016 02:55:35 +0000 (21:55 -0500)
Notifications are executed via the same librados AIO callback
thread, so it's now possible to catch deadlock.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/librados_test_stub/TestRadosClient.cc
src/test/librados_test_stub/TestWatchNotify.cc
src/test/librados_test_stub/TestWatchNotify.h

index 1a9792c6e0209f3ae4eae839192830ba44d00274..c72aa28af8f5f7f7a0ec85813cf74ff5344ae212 100644 (file)
@@ -84,7 +84,8 @@ private:
 
 TestRadosClient::TestRadosClient(CephContext *cct)
   : m_cct(cct->get()),
-    m_watch_notify(m_cct),
+    m_aio_finisher(new Finisher(m_cct)),
+    m_watch_notify(m_cct, m_aio_finisher),
     m_transaction_lock("TestRadosClient::m_transaction_lock")
 {
   get();
@@ -97,7 +98,6 @@ TestRadosClient::TestRadosClient(CephContext *cct)
   }
 
   // replicate AIO callback processing
-  m_aio_finisher = new Finisher(m_cct);
   m_aio_finisher->start();
 }
 
index ef8f5378fae9c8b88c6bcd620f1c8611426d173a..769e00f183d4f7e11db56543159c6fb5bbf2a489 100644 (file)
@@ -9,40 +9,28 @@
 
 namespace librados {
 
-TestWatchNotify::TestWatchNotify(CephContext *cct)
-  : m_cct(cct), m_finisher(new Finisher(cct)), m_handle(), m_notify_id(),
-    m_file_watcher_lock("librados::TestWatchNotify::m_file_watcher_lock"),
+TestWatchNotify::TestWatchNotify(CephContext *cct, Finisher *finisher)
+  : m_cct(cct), m_finisher(finisher), m_handle(), m_notify_id(),
+    m_lock("librados::TestWatchNotify::m_lock"),
     m_pending_notifies(0) {
   m_cct->get();
-  m_finisher->start();
 }
 
 TestWatchNotify::~TestWatchNotify() {
-  m_finisher->stop();
-  delete m_finisher;
   m_cct->put();
 }
 
-TestWatchNotify::NotifyHandle::NotifyHandle()
-  : pbl(NULL), pending_responses(),
-    lock("TestWatchNotify::NotifyHandle::lock") {
-}
-
-TestWatchNotify::Watcher::Watcher()
-  : lock("TestWatchNotify::Watcher::lock") {
-}
-
 void TestWatchNotify::flush() {
-  Mutex::Locker file_watcher_locker(m_file_watcher_lock);
+  Mutex::Locker locker(m_lock);
   while (m_pending_notifies > 0) {
-    m_file_watcher_cond.Wait(m_file_watcher_lock);
+    m_file_watcher_cond.Wait(m_lock);
   }
 }
 
 int TestWatchNotify::list_watchers(const std::string& o,
                                    std::list<obj_watch_t> *out_watchers) {
+  Mutex::Locker lock(m_lock);
   SharedWatcher watcher = get_watcher(o);
-  RWLock::RLocker l(watcher->lock);
 
   out_watchers->clear();
   for (TestWatchNotify::WatchHandles::iterator it =
@@ -50,7 +38,7 @@ int TestWatchNotify::list_watchers(const std::string& o,
        it != watcher->watch_handles.end(); ++it) {
     obj_watch_t obj;
     strcpy(obj.addr, ":/0");
-    obj.watcher_id = static_cast<int64_t>(it->second.instance_id);
+    obj.watcher_id = static_cast<int64_t>(it->second.gid);
     obj.cookie = it->second.handle;
     obj.timeout_seconds = 30;
     out_watchers->push_back(obj);
@@ -61,20 +49,24 @@ int TestWatchNotify::list_watchers(const std::string& o,
 void TestWatchNotify::aio_notify(const std::string& oid, bufferlist& bl,
                                  uint64_t timeout_ms, bufferlist *pbl,
                                  Context *on_notify) {
-  SharedWatcher watcher = get_watcher(oid);
-  RWLock::WLocker watcher_locker(watcher->lock);
-  Mutex::Locker file_watcher_lock(m_file_watcher_lock);
+  Mutex::Locker lock(m_lock);
   ++m_pending_notifies;
   uint64_t notify_id = ++m_notify_id;
 
+  SharedWatcher watcher = get_watcher(oid);
+
   SharedNotifyHandle notify_handle(new NotifyHandle());
   notify_handle->pbl = pbl;
-
+  notify_handle->on_notify = on_notify;
+  for (auto &watch_handle_pair : watcher->watch_handles) {
+    WatchHandle &watch_handle = watch_handle_pair.second;
+    notify_handle->pending_watcher_ids.insert(std::make_pair(
+      watch_handle.gid, watch_handle.handle));
+  }
   watcher->notify_handles[notify_id] = notify_handle;
 
   FunctionContext *ctx = new FunctionContext(
-      boost::bind(&TestWatchNotify::execute_notify, this,
-                  oid, bl, notify_id, on_notify));
+    boost::bind(&TestWatchNotify::execute_notify, this, oid, bl, notify_id));
   m_finisher->queue(ctx);
 }
 
@@ -88,32 +80,20 @@ int TestWatchNotify::notify(const std::string& oid, bufferlist& bl,
 void TestWatchNotify::notify_ack(const std::string& o, uint64_t notify_id,
                                  uint64_t handle, uint64_t gid,
                                  bufferlist& bl) {
-  SharedWatcher watcher = get_watcher(o);
-
-  RWLock::RLocker l(watcher->lock);
-  NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
-  if (it == watcher->notify_handles.end()) {
-    return;
-  }
-
-  bufferlist response;
-  response.append(bl);
-
-  SharedNotifyHandle notify_handle = it->second;
-  Mutex::Locker l2(notify_handle->lock);
-  --notify_handle->pending_responses;
-  notify_handle->notify_responses[std::make_pair(gid, handle)] = response;
-  notify_handle->cond.Signal();
+  Mutex::Locker lock(m_lock);
+  WatcherID watcher_id = std::make_pair(gid, handle);
+  ack_notify(o, notify_id, watcher_id, bl);
+  finish_notify(o, notify_id);
 }
 
-int TestWatchNotify::watch(const std::string& o, uint64_t instance_id,
+int TestWatchNotify::watch(const std::string& o, uint64_t gid,
                            uint64_t *handle, librados::WatchCtx *ctx,
                            librados::WatchCtx2 *ctx2) {
+  Mutex::Locker lock(m_lock);
   SharedWatcher watcher = get_watcher(o);
 
-  RWLock::WLocker l(watcher->lock);
   WatchHandle watch_handle;
-  watch_handle.instance_id = instance_id;
+  watch_handle.gid = gid;
   watch_handle.handle = ++m_handle;
   watch_handle.watch_ctx = ctx;
   watch_handle.watch_ctx2 = ctx2;
@@ -124,35 +104,26 @@ int TestWatchNotify::watch(const std::string& o, uint64_t instance_id,
 }
 
 int TestWatchNotify::unwatch(uint64_t handle) {
-
-  SharedWatcher watcher;
-  {
-    Mutex::Locker l(m_file_watcher_lock);
-    for (FileWatchers::iterator it = m_file_watchers.begin();
-         it != m_file_watchers.end(); ++it) {
-      if (it->second->watch_handles.find(handle) !=
-            it->second->watch_handles.end()) {
-        watcher = it->second;
-        break;
+  Mutex::Locker locker(m_lock);
+  for (FileWatchers::iterator it = m_file_watchers.begin();
+       it != m_file_watchers.end(); ++it) {
+    SharedWatcher watcher = it->second;
+
+    WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
+    if (w_it != watcher->watch_handles.end()) {
+      watcher->watch_handles.erase(w_it);
+      if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+        m_file_watchers.erase(it);
       }
+      break;
     }
   }
-
-  if (watcher) {
-    RWLock::WLocker l(watcher->lock);
-    watcher->watch_handles.erase(handle);
-  }
   return 0;
 }
 
 TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
     const std::string& oid) {
-  Mutex::Locker l(m_file_watcher_lock);
-  return _get_watcher(oid);
-}
-
-TestWatchNotify::SharedWatcher TestWatchNotify::_get_watcher(
-    const std::string& oid) {
+  assert(m_lock.is_locked());
   SharedWatcher &watcher = m_file_watchers[oid];
   if (!watcher) {
     watcher.reset(new Watcher());
@@ -161,58 +132,101 @@ TestWatchNotify::SharedWatcher TestWatchNotify::_get_watcher(
 }
 
 void TestWatchNotify::execute_notify(const std::string &oid,
-                                     bufferlist &bl, uint64_t notify_id,
-                                     Context *on_notify) {
-  WatchHandles watch_handles;
-  SharedNotifyHandle notify_handle;
-
-  {
-    SharedWatcher watcher = get_watcher(oid);
-    RWLock::RLocker l(watcher->lock);
-
-    NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
-    if (n_it == watcher->notify_handles.end()) {
-      return;
-    }
+                                     bufferlist &bl, uint64_t notify_id) {
+  Mutex::Locker lock(m_lock);
+  SharedWatcher watcher = get_watcher(oid);
+  WatchHandles &watch_handles = watcher->watch_handles;
 
-    watch_handles = watcher->watch_handles;
-    notify_handle = n_it->second;
+  NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
+  if (n_it == watcher->notify_handles.end()) {
+    return;
   }
 
-  utime_t timeout;
-  timeout.set_from_double(ceph_clock_now(m_cct) + 15);
-
-  for (WatchHandles::iterator w_it = watch_handles.begin();
-       w_it != watch_handles.end(); ++w_it) {
-    WatchHandle &watch_handle = w_it->second;
+  SharedNotifyHandle notify_handle = n_it->second;
+  WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
+  for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
+       w_id_it != watcher_ids.end(); ++w_id_it) {
+    WatcherID watcher_id = *w_id_it;
+    WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
+    if (w_it == watch_handles.end()) {
+      // client disconnected before notification processed
+      notify_handle->pending_watcher_ids.erase(watcher_id);
+    } else {
+      WatchHandle &watch_handle = w_it->second;
+      assert(watch_handle.gid == watcher_id.first);
+      assert(watch_handle.handle == watcher_id.second);
+
+      bufferlist notify_bl;
+      notify_bl.append(bl);
+
+      m_lock.Unlock();
+      if (watch_handle.watch_ctx2 != NULL) {
+        watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
+                                               notify_bl);
+      } else if (watch_handle.watch_ctx != NULL) {
+        watch_handle.watch_ctx->notify(0, 0, notify_bl);
+      }
+      m_lock.Lock();
 
-    bufferlist notify_bl;
-    notify_bl.append(bl);
-    if (watch_handle.watch_ctx2 != NULL) {
-      {
-        Mutex::Locker l2(notify_handle->lock);
-        ++notify_handle->pending_responses;
+      if (watch_handle.watch_ctx2 == NULL) {
+        // auto ack old-style watch/notify clients
+        ack_notify(oid, notify_id, watcher_id, bufferlist());
       }
-      watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
-                                             notify_bl);
-    } else if (watch_handle.watch_ctx != NULL) {
-      watch_handle.watch_ctx->notify(0, 0, notify_bl);
     }
   }
 
-  {
-    Mutex::Locker l2(notify_handle->lock);
-    while (notify_handle->pending_responses > 0) {
-      notify_handle->cond.WaitUntil(notify_handle->lock, timeout);
-    }
-    if (notify_handle->pbl != NULL) {
-      ::encode(notify_handle->notify_responses, *notify_handle->pbl);
-    }
+  finish_notify(oid, notify_id);
+}
+
+void TestWatchNotify::ack_notify(const std::string &oid,
+                                 uint64_t notify_id,
+                                 const WatcherID &watcher_id,
+                                 const bufferlist &bl) {
+  assert(m_lock.is_locked());
+  SharedWatcher watcher = get_watcher(oid);
+
+  NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
+  if (it == watcher->notify_handles.end()) {
+    return;
+  }
+
+  bufferlist response;
+  response.append(bl);
+
+  SharedNotifyHandle notify_handle = it->second;
+  notify_handle->notify_responses[watcher_id] = response;
+  notify_handle->pending_watcher_ids.erase(watcher_id);
+}
+
+void TestWatchNotify::finish_notify(const std::string &oid,
+                                    uint64_t notify_id) {
+  assert(m_lock.is_locked());
+  SharedWatcher watcher = get_watcher(oid);
+
+  NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
+  if (it == watcher->notify_handles.end()) {
+    return;
   }
 
-  on_notify->complete(0);
+  SharedNotifyHandle notify_handle = it->second;
+  if (!notify_handle->pending_watcher_ids.empty()) {
+    return;
+  }
+
+  if (notify_handle->pbl != NULL) {
+    ::encode(notify_handle->notify_responses, *notify_handle->pbl);
+    ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
+  }
+
+  m_lock.Unlock();
+  notify_handle->on_notify->complete(0);
+  m_lock.Lock();
+
+  watcher->notify_handles.erase(notify_id);
+  if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+    m_file_watchers.erase(oid);
+  }
 
-  Mutex::Locker file_watcher_locker(m_file_watcher_lock);
   if (--m_pending_notifies == 0) {
     m_file_watcher_cond.Signal();
   }
index 6f99704784eb1809a87b89ac1cd7a4f2e8f14ffd..a40f560765b4e95be8a88ea8fc6c983a4256530c 100644 (file)
@@ -1,4 +1,4 @@
-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
 #ifndef CEPH_TEST_WATCH_NOTIFY_H
@@ -7,7 +7,6 @@
 #include "include/rados/librados.hpp"
 #include "common/Cond.h"
 #include "common/Mutex.h"
-#include "common/RWLock.h"
 #include <boost/noncopyable.hpp>
 #include <boost/shared_ptr.hpp>
 #include <list>
@@ -21,21 +20,21 @@ namespace librados {
 
 class TestWatchNotify : boost::noncopyable {
 public:
+  typedef std::pair<uint64_t, uint64_t> WatcherID;
+  typedef std::set<WatcherID> WatcherIDs;
   typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> NotifyResponses;
 
   struct NotifyHandle {
-    NotifyHandle();
+    WatcherIDs pending_watcher_ids;
     NotifyResponses notify_responses;
-    bufferlist *pbl;
-    size_t pending_responses;
-    Mutex lock;
-    Cond cond;
+    bufferlist *pbl = nullptr;
+    Context *on_notify = nullptr;
   };
   typedef boost::shared_ptr<NotifyHandle> SharedNotifyHandle;
   typedef std::map<uint64_t, SharedNotifyHandle> NotifyHandles;
 
   struct WatchHandle {
-    uint64_t instance_id;
+    uint64_t gid;
     uint64_t handle;
     librados::WatchCtx* watch_ctx;
     librados::WatchCtx2* watch_ctx2;
@@ -44,14 +43,12 @@ public:
   typedef std::map<uint64_t, WatchHandle> WatchHandles;
 
   struct Watcher {
-    Watcher();
     WatchHandles watch_handles;
     NotifyHandles notify_handles;
-    RWLock lock;
   };
   typedef boost::shared_ptr<Watcher> SharedWatcher;
 
-  TestWatchNotify(CephContext *cct);
+  TestWatchNotify(CephContext *cct, Finisher *finisher);
   ~TestWatchNotify();
 
   void flush();
@@ -63,7 +60,7 @@ public:
              uint64_t timeout_ms, bufferlist *pbl);
   void notify_ack(const std::string& o, uint64_t notify_id,
                   uint64_t handle, uint64_t gid, bufferlist& bl);
-  int watch(const std::string& o, uint64_t instance_id, uint64_t *handle,
+  int watch(const std::string& o, uint64_t gid, uint64_t *handle,
             librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2);
   int unwatch(uint64_t handle);
 
@@ -77,17 +74,19 @@ private:
   uint64_t m_handle;
   uint64_t m_notify_id;
 
-  Mutex m_file_watcher_lock;
-  Cond m_file_watcher_cond;
+  Mutex m_lock;
   uint64_t m_pending_notifies;
 
+  Cond m_file_watcher_cond;
   FileWatchers m_file_watchers;
 
   SharedWatcher get_watcher(const std::string& oid);
-  SharedWatcher _get_watcher(const std::string& oid);
-  void execute_notify(const std::string &oid, bufferlist &bl,
-                      uint64_t notify_id, Context *on_notify);
 
+  void execute_notify(const std::string &oid, bufferlist &bl,
+                      uint64_t notify_id);
+  void ack_notify(const std::string &oid, uint64_t notify_id,
+                  const WatcherID &watcher_id, const bufferlist &bl);
+  void finish_notify(const std::string &oid, uint64_t notify_id);
 };
 
 } // namespace librados