]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/librados_test_stub: simplied aio_notify processing
authorJason Dillaman <dillaman@redhat.com>
Thu, 12 Apr 2018 14:36:23 +0000 (10:36 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 12 Apr 2018 17:02:30 +0000 (13:02 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/test/librados_test_stub/TestWatchNotify.cc
src/test/librados_test_stub/TestWatchNotify.h

index a2f2f69e24a6662197322f488206fd6d884c0513..38a5343c4a09b73823f0de1eca5e5070daf62932 100644 (file)
@@ -31,10 +31,9 @@ void TestWatchNotify::flush(TestRadosClient *rados_client) {
 
   ldout(cct, 20) << "enter" << dendl;
   // block until we know no additional async notify callbacks will occur
-  Mutex::Locker locker(m_lock);
-  while (m_pending_notifies > 0) {
-    m_file_watcher_cond.Wait(m_lock);
-  }
+  C_SaferCond ctx;
+  m_async_op_tracker.wait_for_ops(&ctx);
+  ctx.wait();
 }
 
 int TestWatchNotify::list_watchers(const std::string& o,
@@ -99,38 +98,17 @@ void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
 }
 
 void TestWatchNotify::aio_notify(TestRadosClient *rados_client,
-                                 const std::string& oid, bufferlist& bl,
+                                 const std::string& oid, const bufferlist& bl,
                                  uint64_t timeout_ms, bufferlist *pbl,
                                  Context *on_notify) {
-  CephContext *cct = rados_client->cct();
-
-  Mutex::Locker lock(m_lock);
-  ++m_pending_notifies;
-  uint64_t notify_id = ++m_notify_id;
-
-  ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
-
-  SharedWatcher watcher = get_watcher(oid);
-
-  SharedNotifyHandle notify_handle(new NotifyHandle());
-  notify_handle->rados_client = rados_client;
-  notify_handle->pbl = pbl;
-  notify_handle->on_notify = on_notify;
-  for (auto &watch_handle_pair : watcher->watch_handles) {
-    WatchHandle &watch_handle = watch_handle_pair.second;
-    notify_handle->pending_watcher_ids.insert(std::make_pair(
-      watch_handle.gid, watch_handle.handle));
-  }
-  watcher->notify_handles[notify_id] = notify_handle;
-
-  FunctionContext *ctx = new FunctionContext(
-    boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl,
-                notify_id));
+  auto ctx = new FunctionContext([=](int) {
+      execute_notify(rados_client, oid, bl, pbl, on_notify);
+    });
   rados_client->get_aio_finisher()->queue(ctx);
 }
 
 int TestWatchNotify::notify(TestRadosClient *rados_client,
-                            const std::string& oid, bufferlist& bl,
+                            const std::string& oid, const bufferlist& bl,
                             uint64_t timeout_ms, bufferlist *pbl) {
   C_SaferCond cond;
   aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond);
@@ -215,62 +193,54 @@ TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
 
 void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
                                      const std::string &oid,
-                                     bufferlist &bl, uint64_t notify_id) {
+                                     const bufferlist &bl, bufferlist *pbl,
+                                     Context *on_notify) {
   CephContext *cct = rados_client->cct();
 
-  ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
+  m_lock.Lock();
+  uint64_t notify_id = ++m_notify_id;
+
+  ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
 
-  Mutex::Locker lock(m_lock);
   SharedWatcher watcher = get_watcher(oid);
+
+  SharedNotifyHandle notify_handle(new NotifyHandle());
+  notify_handle->rados_client = rados_client;
+  notify_handle->pbl = pbl;
+  notify_handle->on_notify = on_notify;
+
   WatchHandles &watch_handles = watcher->watch_handles;
+  for (auto &watch_handle_pair : watch_handles) {
+    WatchHandle &watch_handle = watch_handle_pair.second;
+    notify_handle->pending_watcher_ids.insert(std::make_pair(
+      watch_handle.gid, watch_handle.handle));
 
-  NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
-  if (n_it == watcher->notify_handles.end()) {
-    ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
-                 << ": not found" << dendl;
-    return;
-  }
+    m_async_op_tracker.start_op();
+    uint64_t notifier_id = rados_client->get_instance_id();
+    watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
+      [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
+        bufferlist notify_bl;
+        notify_bl.append(bl);
+
+        if (watch_handle.watch_ctx2 != NULL) {
+          watch_handle.watch_ctx2->handle_notify(notify_id,
+                                                 watch_handle.handle,
+                                                 notifier_id, notify_bl);
+        } else if (watch_handle.watch_ctx != NULL) {
+          watch_handle.watch_ctx->notify(0, 0, notify_bl);
+
+          // auto ack old-style watch/notify clients
+          ack_notify(watch_handle.rados_client, oid, notify_id,
+                     {watch_handle.gid, watch_handle.handle}, bufferlist());
+        }
 
-  SharedNotifyHandle notify_handle = n_it->second;
-  WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
-  for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
-       w_id_it != watcher_ids.end(); ++w_id_it) {
-    WatcherID watcher_id = *w_id_it;
-    WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
-    if (w_it == watch_handles.end()) {
-      // client disconnected before notification processed
-      notify_handle->pending_watcher_ids.erase(watcher_id);
-    } else {
-      WatchHandle watch_handle = w_it->second;
-      assert(watch_handle.gid == watcher_id.first);
-      assert(watch_handle.handle == watcher_id.second);
-
-      uint64_t notifier_id = rados_client->get_instance_id();
-      watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
-        [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
-          bufferlist notify_bl;
-          notify_bl.append(bl);
-
-          if (watch_handle.watch_ctx2 != NULL) {
-            watch_handle.watch_ctx2->handle_notify(notify_id,
-                                                   watch_handle.handle,
-                                                   notifier_id, notify_bl);
-          } else if (watch_handle.watch_ctx != NULL) {
-            watch_handle.watch_ctx->notify(0, 0, notify_bl);
-
-            // auto ack old-style watch/notify clients
-            ack_notify(watch_handle.rados_client, oid, notify_id,
-                       {watch_handle.gid, watch_handle.handle}, bufferlist());
-          }
-        }));
-    }
+        m_async_op_tracker.finish_op();
+      }));
   }
+  watcher->notify_handles[notify_id] = notify_handle;
 
   finish_notify(rados_client, oid, notify_id);
-
-  if (--m_pending_notifies == 0) {
-    m_file_watcher_cond.Signal();
-  }
+  m_lock.Unlock();
 }
 
 void TestWatchNotify::ack_notify(TestRadosClient *rados_client,
index 04d37c2358d58135c87a7e51dbb5f88f7f0f13aa..02be14a3c36d440f1e2a7b3f4a661a1bc3e3b5f7 100644 (file)
@@ -5,7 +5,7 @@
 #define CEPH_TEST_WATCH_NOTIFY_H
 
 #include "include/rados/librados.hpp"
-#include "common/Cond.h"
+#include "common/AsyncOpTracker.h"
 #include "common/Mutex.h"
 #include <boost/noncopyable.hpp>
 #include <boost/shared_ptr.hpp>
@@ -65,12 +65,12 @@ public:
   void aio_unwatch(TestRadosClient *rados_client, uint64_t handle,
                    Context *on_finish);
   void aio_notify(TestRadosClient *rados_client, const std::string& oid,
-                  bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl,
+                  const bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl,
                   Context *on_notify);
 
   void flush(TestRadosClient *rados_client);
   int notify(TestRadosClient *rados_client, const std::string& o,
-             bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl);
+             const bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl);
   void notify_ack(TestRadosClient *rados_client, const std::string& o,
                   uint64_t notify_id, uint64_t handle, uint64_t gid,
                   bufferlist& bl);
@@ -89,9 +89,8 @@ private:
   uint64_t m_notify_id = 0;
 
   Mutex m_lock;
-  uint64_t m_pending_notifies = 0;
+  AsyncOpTracker m_async_op_tracker;
 
-  Cond m_file_watcher_cond;
   FileWatchers m_file_watchers;
 
   SharedWatcher get_watcher(const std::string& oid);
@@ -105,7 +104,8 @@ private:
                        Context *on_finish);
 
   void execute_notify(TestRadosClient *rados_client, const std::string &oid,
-                      bufferlist &bl, uint64_t notify_id);
+                      const bufferlist &bl, bufferlist *pbl,
+                      Context *on_notify);
   void ack_notify(TestRadosClient *rados_client, const std::string &oid,
                   uint64_t notify_id, const WatcherID &watcher_id,
                   const bufferlist &bl);