]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: make all maintenance op notifications async
authorMykola Golub <mgolub@suse.com>
Fri, 6 Nov 2020 14:19:07 +0000 (14:19 +0000)
committerMykola Golub <mgolub@suse.com>
Wed, 25 Nov 2020 10:13:35 +0000 (10:13 +0000)
The plan is to serialize maintenance operations by type, so
potentially any operation may run for long time and we want to
avoid sending duplicates.

Signed-off-by: Mykola Golub <mgolub@suse.com>
src/librbd/ImageWatcher.cc
src/librbd/ImageWatcher.h
src/librbd/Operations.cc
src/librbd/WatchNotifyTypes.cc
src/librbd/WatchNotifyTypes.h
src/test/librbd/test_ImageWatcher.cc

index 13ab9d4541ea56ac5331ed51649a51ee45904978..a19ddabd7eec223589ec52c226de8c77ad023789 100644 (file)
@@ -201,7 +201,7 @@ void ImageWatcher<I>::notify_resize(uint64_t request_id, uint64_t size,
   AsyncRequestId async_request_id(get_client_id(), request_id);
 
   notify_async_request(async_request_id,
-                       new ResizePayload(size, allow_shrink, async_request_id),
+                       new ResizePayload(async_request_id, size, allow_shrink),
                        prog_ctx, on_finish);
 }
 
@@ -225,51 +225,68 @@ void ImageWatcher<I>::notify_snap_create(uint64_t request_id,
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_rename(const snapid_t &src_snap_id,
+void ImageWatcher<I>::notify_snap_rename(uint64_t request_id,
+                                         const snapid_t &src_snap_id,
                                         const std::string &dst_snap_name,
                                         Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new SnapRenamePayload(src_snap_id, dst_snap_name),
-                    on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapRenamePayload(async_request_id, src_snap_id, dst_snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_remove(const cls::rbd::SnapshotNamespace &snap_namespace,
-                                        const std::string &snap_name,
-                                         Context *on_finish) {
+void ImageWatcher<I>::notify_snap_remove(
+    uint64_t request_id, const cls::rbd::SnapshotNamespace &snap_namespace,
+    const std::string &snap_name, Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new SnapRemovePayload(snap_namespace, snap_name),
-                    on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapRemovePayload(async_request_id, snap_namespace, snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_protect(const cls::rbd::SnapshotNamespace &snap_namespace,
-                                         const std::string &snap_name,
-                                          Context *on_finish) {
+void ImageWatcher<I>::notify_snap_protect(
+    uint64_t request_id, const cls::rbd::SnapshotNamespace &snap_namespace,
+    const std::string &snap_name, Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new SnapProtectPayload(snap_namespace, snap_name),
-                    on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapProtectPayload(async_request_id, snap_namespace, snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_snap_unprotect(const cls::rbd::SnapshotNamespace &snap_namespace,
-                                           const std::string &snap_name,
-                                            Context *on_finish) {
+void ImageWatcher<I>::notify_snap_unprotect(
+    uint64_t request_id, const cls::rbd::SnapshotNamespace &snap_namespace,
+    const std::string &snap_name, Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new SnapUnprotectPayload(snap_namespace, snap_name),
-                    on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new SnapUnprotectPayload(async_request_id, snap_namespace, snap_name),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
@@ -288,23 +305,33 @@ void ImageWatcher<I>::notify_rebuild_object_map(uint64_t request_id,
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_rename(const std::string &image_name,
+void ImageWatcher<I>::notify_rename(uint64_t request_id,
+                                    const std::string &image_name,
                                     Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new RenamePayload(image_name), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(async_request_id,
+                       new RenamePayload(async_request_id, image_name),
+                       m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_update_features(uint64_t features, bool enabled,
+void ImageWatcher<I>::notify_update_features(uint64_t request_id,
+                                             uint64_t features, bool enabled,
                                              Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new UpdateFeaturesPayload(features, enabled), on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(async_request_id,
+      new UpdateFeaturesPayload(async_request_id, features, enabled),
+      m_no_op_prog_ctx, on_finish);
 }
 
 template <typename I>
@@ -443,27 +470,37 @@ void ImageWatcher<I>::notify_unquiesce(uint64_t request_id, Context *on_finish)
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_metadata_set(const std::string &key,
+void ImageWatcher<I>::notify_metadata_set(uint64_t request_id,
+                                          const std::string &key,
                                           const std::string &value,
                                           Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new MetadataUpdatePayload(key,
-                    std::optional<std::string>{value}),
-                    on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new MetadataUpdatePayload(async_request_id, key,
+                                std::optional<std::string>{value}),
+      m_no_op_prog_ctx,  on_finish);
 }
 
 template <typename I>
-void ImageWatcher<I>::notify_metadata_remove(const std::string &key,
+void ImageWatcher<I>::notify_metadata_remove(uint64_t request_id,
+                                             const std::string &key,
                                              Context *on_finish) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
   ceph_assert(m_image_ctx.exclusive_lock &&
               !m_image_ctx.exclusive_lock->is_lock_owner());
 
-  notify_lock_owner(new MetadataUpdatePayload(key, std::nullopt),
-                    on_finish);
+  AsyncRequestId async_request_id(get_client_id(), request_id);
+
+  notify_async_request(
+      async_request_id,
+      new MetadataUpdatePayload(async_request_id, key, std::nullopt),
+      m_no_op_prog_ctx,  on_finish);
 }
 
 template <typename I>
@@ -1099,12 +1136,8 @@ bool ImageWatcher<I>::handle_payload(const SnapCreatePayload &payload,
         complete = true;
       } else {
         new_request = true;
-        prog_ctx = new NoOpProgressContext();
-        ctx = new LambdaContext(
-          [prog_ctx, on_finish=new C_ResponseMessage(ack_ctx)](int r) {
-            delete prog_ctx;
-            on_finish->complete(r);
-          });
+        prog_ctx = &m_no_op_prog_ctx;
+        ctx = new C_ResponseMessage(ack_ctx);
         complete = false;
       }
       if (r == 0 && new_request) {
@@ -1135,14 +1168,28 @@ bool ImageWatcher<I>::handle_payload(const SnapRenamePayload &payload,
     int r;
     if (m_image_ctx.exclusive_lock->accept_request(
           exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_rename request: "
-                                << payload.snap_id << " to "
-                                << payload.snap_name << dendl;
-
-      m_image_ctx.operations->execute_snap_rename(payload.snap_id,
-                                                  payload.snap_name,
-                                                  new C_ResponseMessage(ack_ctx));
-      return false;
+      bool new_request;
+      Context *ctx;
+      bool complete;
+      if (payload.async_request_id) {
+        r = prepare_async_request(payload.async_request_id, &new_request,
+                                  &ctx, nullptr);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
+      } else {
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        complete = false;
+      }
+      if (r == 0 && new_request) {
+        ldout(m_image_ctx.cct, 10) << this << " remote snap_rename request: "
+                                   << payload.snap_id << " to "
+                                   << payload.snap_name << dendl;
+
+        m_image_ctx.operations->execute_snap_rename(payload.snap_id,
+                                                    payload.snap_name, ctx);
+      }
+      return complete;
     } else if (r < 0) {
       encode(ResponseMessage(r), ack_ctx->out);
     }
@@ -1162,13 +1209,27 @@ bool ImageWatcher<I>::handle_payload(const SnapRemovePayload &payload,
     }
     int r;
     if (m_image_ctx.exclusive_lock->accept_request(request_type, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_remove request: "
-                                << payload.snap_name << dendl;
+      bool new_request;
+      Context *ctx;
+      bool complete;
+      if (payload.async_request_id) {
+        r = prepare_async_request(payload.async_request_id, &new_request,
+                                  &ctx, nullptr);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
+      } else {
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        complete = false;
+      }
+      if (r == 0 && new_request) {
+        ldout(m_image_ctx.cct, 10) << this << " remote snap_remove request: "
+                                   << payload.snap_name << dendl;
 
-      m_image_ctx.operations->execute_snap_remove(payload.snap_namespace,
-                                                 payload.snap_name,
-                                                  new C_ResponseMessage(ack_ctx));
-      return false;
+        m_image_ctx.operations->execute_snap_remove(payload.snap_namespace,
+                                                    payload.snap_name, ctx);
+      }
+      return complete;
     } else if (r < 0) {
       encode(ResponseMessage(r), ack_ctx->out);
     }
@@ -1184,13 +1245,27 @@ bool ImageWatcher<I>::handle_payload(const SnapProtectPayload& payload,
     int r;
     if (m_image_ctx.exclusive_lock->accept_request(
           exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_protect request: "
-                                 << payload.snap_name << dendl;
+      bool new_request;
+      Context *ctx;
+      bool complete;
+      if (payload.async_request_id) {
+        r = prepare_async_request(payload.async_request_id, &new_request,
+                                  &ctx, nullptr);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
+      } else {
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        complete = false;
+      }
+      if (r == 0 && new_request) {      
+        ldout(m_image_ctx.cct, 10) << this << " remote snap_protect request: "
+                                   << payload.snap_name << dendl;
 
-      m_image_ctx.operations->execute_snap_protect(payload.snap_namespace,
-                                                  payload.snap_name,
-                                                   new C_ResponseMessage(ack_ctx));
-      return false;
+        m_image_ctx.operations->execute_snap_protect(payload.snap_namespace,
+                                                     payload.snap_name, ctx);
+      }
+      return complete;
     } else if (r < 0) {
       encode(ResponseMessage(r), ack_ctx->out);
     }
@@ -1206,13 +1281,27 @@ bool ImageWatcher<I>::handle_payload(const SnapUnprotectPayload& payload,
     int r;
     if (m_image_ctx.exclusive_lock->accept_request(
           exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote snap_unprotect request: "
-                                 << payload.snap_name << dendl;
+      bool new_request;
+      Context *ctx;
+      bool complete;
+      if (payload.async_request_id) {
+        r = prepare_async_request(payload.async_request_id, &new_request,
+                                  &ctx, nullptr);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
+      } else {
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        complete = false;
+      }
+      if (r == 0 && new_request) {      
+        ldout(m_image_ctx.cct, 10) << this << " remote snap_unprotect request: "
+                                   << payload.snap_name << dendl;
 
-      m_image_ctx.operations->execute_snap_unprotect(payload.snap_namespace,
-                                                    payload.snap_name,
-                                                     new C_ResponseMessage(ack_ctx));
-      return false;
+        m_image_ctx.operations->execute_snap_unprotect(payload.snap_namespace,
+                                                       payload.snap_name, ctx);
+      }
+      return complete;
     } else if (r < 0) {
       encode(ResponseMessage(r), ack_ctx->out);
     }
@@ -1256,12 +1345,26 @@ bool ImageWatcher<I>::handle_payload(const RenamePayload& payload,
     int r;
     if (m_image_ctx.exclusive_lock->accept_request(
           exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote rename request: "
-                                 << payload.image_name << dendl;
+      bool new_request;
+      Context *ctx;
+      bool complete;
+      if (payload.async_request_id) {
+        r = prepare_async_request(payload.async_request_id, &new_request,
+                                  &ctx, nullptr);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
+      } else {
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        complete = false;
+      }
+      if (r == 0 && new_request) {      
+        ldout(m_image_ctx.cct, 10) << this << " remote rename request: "
+                                   << payload.image_name << dendl;
 
-      m_image_ctx.operations->execute_rename(payload.image_name,
-                                             new C_ResponseMessage(ack_ctx));
-      return false;
+        m_image_ctx.operations->execute_rename(payload.image_name, ctx);
+      }
+      return complete;
     } else if (r < 0) {
       encode(ResponseMessage(r), ack_ctx->out);
     }
@@ -1277,14 +1380,30 @@ bool ImageWatcher<I>::handle_payload(const UpdateFeaturesPayload& payload,
     int r;
     if (m_image_ctx.exclusive_lock->accept_request(
           exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      ldout(m_image_ctx.cct, 10) << this << " remote update_features request: "
-                                 << payload.features << " "
-                                 << (payload.enabled ? "enabled" : "disabled")
-                                 << dendl;
-
-      m_image_ctx.operations->execute_update_features(
-        payload.features, payload.enabled, new C_ResponseMessage(ack_ctx), 0);
-      return false;
+      bool new_request;
+      Context *ctx;
+      bool complete;
+      if (payload.async_request_id) {
+        r = prepare_async_request(payload.async_request_id, &new_request,
+                                  &ctx, nullptr);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
+      } else {
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        complete = false;
+      }
+      if (r == 0 && new_request) {      
+        ldout(m_image_ctx.cct, 10) << this << " remote update_features request: "
+                                   << payload.features << " "
+                                   << (payload.enabled ? "enabled" : "disabled")
+                                   << dendl;
+
+        m_image_ctx.operations->execute_update_features(payload.features,
+                                                        payload.enabled, ctx,
+                                                        0);
+      }
+      return complete;
     } else if (r < 0) {
       encode(ResponseMessage(r), ack_ctx->out);
     }
@@ -1382,22 +1501,35 @@ bool ImageWatcher<I>::handle_payload(const MetadataUpdatePayload &payload,
     int r;
     if (m_image_ctx.exclusive_lock->accept_request(
           exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
-      if (payload.value) {
-        ldout(m_image_ctx.cct, 10) << this << " remote metadata_set request: key="
-                                   << payload.key << ", value="
-                                   << *payload.value << dendl;
-
-        m_image_ctx.operations->execute_metadata_set(payload.key, *payload.value,
-                                                     new C_ResponseMessage(ack_ctx));
-        return false;
+      bool new_request;
+      Context *ctx;
+      bool complete;
+      if (payload.async_request_id) {
+        r = prepare_async_request(payload.async_request_id, &new_request,
+                                  &ctx, nullptr);
+        encode(ResponseMessage(r), ack_ctx->out);
+        complete = true;
       } else {
-        ldout(m_image_ctx.cct, 10) << this << " remote metadata_remove request: key="
-                                   << payload.key << dendl;
-
-        m_image_ctx.operations->execute_metadata_remove(payload.key,
-                                                        new C_ResponseMessage(ack_ctx));
-        return false;
+        new_request = true;
+        ctx = new C_ResponseMessage(ack_ctx);
+        complete = false;
       }
+      if (r == 0 && new_request) {      
+        if (payload.value) {
+          ldout(m_image_ctx.cct, 10) << this << " remote metadata_set request: "
+                                     << "key=" << payload.key << ", value="
+                                     << *payload.value << dendl;
+
+          m_image_ctx.operations->execute_metadata_set(payload.key,
+                                                       *payload.value, ctx);
+        } else {
+          ldout(m_image_ctx.cct, 10) << this << " remote metadata_remove request: "
+                                     << "key=" << payload.key << dendl;
+
+          m_image_ctx.operations->execute_metadata_remove(payload.key, ctx);
+        }
+      }
+      return complete;
     } else if (r < 0) {
       encode(ResponseMessage(r), ack_ctx->out);
     }
index c445974c79ae24ae6e639f1d39716ca79abf1fd4..8bd6e07a1a81918a2b3e3eaf3d21a9589b763970 100644 (file)
@@ -11,6 +11,7 @@
 #include "include/rbd/librbd.hpp"
 #include "librbd/Watcher.h"
 #include "librbd/WatchNotifyTypes.h"
+#include "librbd/internal.h"
 #include <set>
 #include <string>
 #include <utility>
@@ -41,23 +42,29 @@ public:
                           uint64_t flags,
                           ProgressContext &prog_ctx,
                          Context *on_finish);
-  void notify_snap_rename(const snapid_t &src_snap_id,
+  void notify_snap_rename(uint64_t request_id,
+                          const snapid_t &src_snap_id,
                           const std::string &dst_snap_name,
                           Context *on_finish);
-  void notify_snap_remove(const cls::rbd::SnapshotNamespace &snap_namespace,
+  void notify_snap_remove(uint64_t request_id,
+                          const cls::rbd::SnapshotNamespace &snap_namespace,
                          const std::string &snap_name,
                          Context *on_finish);
-  void notify_snap_protect(const cls::rbd::SnapshotNamespace &snap_namespace,
+  void notify_snap_protect(uint64_t request_id,
+                           const cls::rbd::SnapshotNamespace &snap_namespace,
                           const std::string &snap_name,
                           Context *on_finish);
-  void notify_snap_unprotect(const cls::rbd::SnapshotNamespace &snap_namespace,
+  void notify_snap_unprotect(uint64_t request_id,
+                             const cls::rbd::SnapshotNamespace &snap_namespace,
                             const std::string &snap_name,
                             Context *on_finish);
   void notify_rebuild_object_map(uint64_t request_id,
                                  ProgressContext &prog_ctx, Context *on_finish);
-  void notify_rename(const std::string &image_name, Context *on_finish);
+  void notify_rename(uint64_t request_id,
+                     const std::string &image_name, Context *on_finish);
 
-  void notify_update_features(uint64_t features, bool enabled,
+  void notify_update_features(uint64_t request_id,
+                              uint64_t features, bool enabled,
                               Context *on_finish);
 
   void notify_migrate(uint64_t request_id, ProgressContext &prog_ctx,
@@ -78,9 +85,11 @@ public:
                       Context *on_finish);
   void notify_unquiesce(uint64_t request_id, Context *on_finish);
 
-  void notify_metadata_set(const std::string &key, const std::string &value,
+  void notify_metadata_set(uint64_t request_id,
+                           const std::string &key, const std::string &value,
                            Context *on_finish);
-  void notify_metadata_remove(const std::string &key, Context *on_finish);
+  void notify_metadata_remove(uint64_t request_id,
+                              const std::string &key, Context *on_finish);
 
 private:
   enum TaskCode {
@@ -182,6 +191,8 @@ private:
 
   AsyncOpTracker m_async_op_tracker;
 
+  NoOpProgressContext m_no_op_prog_ctx;
+
   void handle_register_watch(int r);
 
   void schedule_cancel_async_requests();
index 7db2c0361d320269bc4927b79824d39eeeafa1c0..47e6187c8c7a09daa526b0dab91d8bb5bd32e75d 100644 (file)
@@ -538,14 +538,15 @@ int Operations<I>::rename(const char *dstname) {
   }
 
   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
+    uint64_t request_id = ++m_async_request_seq;
     r = invoke_async_request("rename",
                              exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
                              true,
                              boost::bind(&Operations<I>::execute_rename, this,
                                          dstname, _1),
                              boost::bind(&ImageWatcher<I>::notify_rename,
-                                         m_image_ctx.image_watcher, dstname,
-                                         _1));
+                                         m_image_ctx.image_watcher, request_id,
+                                         dstname, _1));
     if (r < 0 && r != -EEXIST) {
       return r;
     }
@@ -915,6 +916,7 @@ void Operations<I>::snap_remove(const cls::rbd::SnapshotNamespace& snap_namespac
   m_image_ctx.image_lock.unlock_shared();
 
   if (proxy_op) {
+    uint64_t request_id = ++m_async_request_seq;
     auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
     if (cls::rbd::get_snap_namespace_type(snap_namespace) ==
         cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
@@ -922,9 +924,11 @@ void Operations<I>::snap_remove(const cls::rbd::SnapshotNamespace& snap_namespac
     }
     C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(
       m_image_ctx, "snap_remove", request_type, true,
-      boost::bind(&Operations<I>::execute_snap_remove, this, snap_namespace, snap_name, _1),
-      boost::bind(&ImageWatcher<I>::notify_snap_remove, m_image_ctx.image_watcher,
-                  snap_namespace, snap_name, _1),
+      boost::bind(&Operations<I>::execute_snap_remove, this, snap_namespace,
+                  snap_name, _1),
+      boost::bind(&ImageWatcher<I>::notify_snap_remove,
+                  m_image_ctx.image_watcher, request_id, snap_namespace,
+                  snap_name, _1),
       {-ENOENT}, on_finish);
     req->send();
   } else {
@@ -1012,14 +1016,15 @@ int Operations<I>::snap_rename(const char *srcname, const char *dstname) {
   }
 
   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
+    uint64_t request_id = ++m_async_request_seq;
     r = invoke_async_request("snap_rename",
                              exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
                              true,
                              boost::bind(&Operations<I>::execute_snap_rename,
                                          this, snap_id, dstname, _1),
                              boost::bind(&ImageWatcher<I>::notify_snap_rename,
-                                         m_image_ctx.image_watcher, snap_id,
-                                         dstname, _1));
+                                         m_image_ctx.image_watcher, request_id,
+                                         snap_id, dstname, _1));
     if (r < 0 && r != -EEXIST) {
       return r;
     }
@@ -1113,13 +1118,14 @@ int Operations<I>::snap_protect(const cls::rbd::SnapshotNamespace& snap_namespac
   }
 
   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
+    uint64_t request_id = ++m_async_request_seq;
     r = invoke_async_request("snap_protect",
                              exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
                              true,
                              boost::bind(&Operations<I>::execute_snap_protect,
                                          this, snap_namespace, snap_name, _1),
                              boost::bind(&ImageWatcher<I>::notify_snap_protect,
-                                         m_image_ctx.image_watcher,
+                                         m_image_ctx.image_watcher, request_id,
                                         snap_namespace, snap_name, _1));
     if (r < 0 && r != -EBUSY) {
       return r;
@@ -1210,13 +1216,14 @@ int Operations<I>::snap_unprotect(const cls::rbd::SnapshotNamespace& snap_namesp
   }
 
   if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
+    uint64_t request_id = ++m_async_request_seq;
     r = invoke_async_request("snap_unprotect",
                              exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
                              true,
                              boost::bind(&Operations<I>::execute_snap_unprotect,
                                          this, snap_namespace, snap_name, _1),
                              boost::bind(&ImageWatcher<I>::notify_snap_unprotect,
-                                         m_image_ctx.image_watcher,
+                                         m_image_ctx.image_watcher, request_id,
                                         snap_namespace, snap_name, _1));
     if (r < 0 && r != -EINVAL) {
       return r;
@@ -1410,14 +1417,15 @@ int Operations<I>::update_features(uint64_t features, bool enabled) {
 
     r = cond_ctx.wait();
   } else {
+    uint64_t request_id = ++m_async_request_seq;
     r = invoke_async_request("update_features",
                              exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
                              false,
                              boost::bind(&Operations<I>::execute_update_features,
                                          this, features, enabled, _1, 0),
                              boost::bind(&ImageWatcher<I>::notify_update_features,
-                                         m_image_ctx.image_watcher, features,
-                                         enabled, _1));
+                                         m_image_ctx.image_watcher, request_id,
+                                         features, enabled, _1));
   }
   ldout(cct, 2) << "update_features finished" << dendl;
   return r;
@@ -1484,13 +1492,14 @@ int Operations<I>::metadata_set(const std::string &key,
     return -EROFS;
   }
 
+  uint64_t request_id = ++m_async_request_seq;
   r = invoke_async_request("metadata_set",
                            exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
                            false,
                            boost::bind(&Operations<I>::execute_metadata_set,
                                        this, key, value, _1),
                            boost::bind(&ImageWatcher<I>::notify_metadata_set,
-                                       m_image_ctx.image_watcher,
+                                       m_image_ctx.image_watcher, request_id,
                                        key, value, _1));
 
   if (config_override && r >= 0) {
@@ -1543,13 +1552,15 @@ int Operations<I>::metadata_remove(const std::string &key) {
   if(r < 0)
     return r;
 
+  uint64_t request_id = ++m_async_request_seq;
   r = invoke_async_request("metadata_remove",
                            exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
                            false,
                            boost::bind(&Operations<I>::execute_metadata_remove,
                                        this, key, _1),
                            boost::bind(&ImageWatcher<I>::notify_metadata_remove,
-                                       m_image_ctx.image_watcher, key, _1));
+                                       m_image_ctx.image_watcher, request_id,
+                                       key, _1));
   if (r == -ENOENT) {
     r = 0;
   }
index 2f145955977e844925b09b6dff75fe084af42734..413983f3e6377de1be9640ee977eb226018db1f6 100644 (file)
@@ -168,15 +168,16 @@ void ResizePayload::decode(__u8 version, bufferlist::const_iterator &iter) {
 }
 
 void ResizePayload::dump(Formatter *f) const {
+  AsyncRequestPayloadBase::dump(f);
   f->dump_unsigned("size", size);
   f->dump_bool("allow_shrink", allow_shrink);
-  AsyncRequestPayloadBase::dump(f);
 }
 
 void SnapPayloadBase::encode(bufferlist &bl) const {
   using ceph::encode;
   encode(snap_name, bl);
   encode(snap_namespace, bl);
+  encode(async_request_id, bl);
 }
 
 void SnapPayloadBase::decode(__u8 version, bufferlist::const_iterator &iter) {
@@ -185,9 +186,13 @@ void SnapPayloadBase::decode(__u8 version, bufferlist::const_iterator &iter) {
   if (version >= 6) {
     decode(snap_namespace, iter);
   }
+  if (version >= 7) {
+    decode(async_request_id, iter);
+  }
 }
 
 void SnapPayloadBase::dump(Formatter *f) const {
+  AsyncRequestPayloadBase::dump(f);
   f->dump_string("snap_name", snap_name);
   snap_namespace.dump(f);
 }
@@ -195,7 +200,6 @@ void SnapPayloadBase::dump(Formatter *f) const {
 void SnapCreatePayload::encode(bufferlist &bl) const {
   using ceph::encode;
   SnapPayloadBase::encode(bl);
-  encode(async_request_id, bl);
   encode(flags, bl);
 }
 
@@ -206,15 +210,11 @@ void SnapCreatePayload::decode(__u8 version, bufferlist::const_iterator &iter) {
     decode(snap_namespace, iter);
   }
   if (version >= 7) {
-    decode(async_request_id, iter);
     decode(flags, iter);
   }
 }
 
 void SnapCreatePayload::dump(Formatter *f) const {
-  f->open_object_section("async_request_id");
-  async_request_id.dump(f);
-  f->close_section();
   SnapPayloadBase::dump(f);
   f->dump_unsigned("flags", flags);
 }
@@ -232,21 +232,26 @@ void SnapRenamePayload::decode(__u8 version, bufferlist::const_iterator &iter) {
 }
 
 void SnapRenamePayload::dump(Formatter *f) const {
-  f->dump_unsigned("src_snap_id", snap_id);
   SnapPayloadBase::dump(f);
+  f->dump_unsigned("src_snap_id", snap_id);
 }
 
 void RenamePayload::encode(bufferlist &bl) const {
   using ceph::encode;
   encode(image_name, bl);
+  encode(async_request_id, bl);
 }
 
 void RenamePayload::decode(__u8 version, bufferlist::const_iterator &iter) {
   using ceph::decode;
   decode(image_name, iter);
+  if (version >= 7) {
+    decode(async_request_id, iter);
+  }
 }
 
 void RenamePayload::dump(Formatter *f) const {
+  AsyncRequestPayloadBase::dump(f);
   f->dump_string("image_name", image_name);
 }
 
@@ -254,15 +259,20 @@ void UpdateFeaturesPayload::encode(bufferlist &bl) const {
   using ceph::encode;
   encode(features, bl);
   encode(enabled, bl);
+  encode(async_request_id, bl);
 }
 
 void UpdateFeaturesPayload::decode(__u8 version, bufferlist::const_iterator &iter) {
   using ceph::decode;
   decode(features, iter);
   decode(enabled, iter);
+  if (version >= 7) {
+    decode(async_request_id, iter);
+  }
 }
 
 void UpdateFeaturesPayload::dump(Formatter *f) const {
+  AsyncRequestPayloadBase::dump(f);
   f->dump_unsigned("features", features);
   f->dump_bool("enabled", enabled);
 }
@@ -288,15 +298,20 @@ void MetadataUpdatePayload::encode(bufferlist &bl) const {
   using ceph::encode;
   encode(key, bl);
   encode(value, bl);
+  encode(async_request_id, bl);
 }
 
 void MetadataUpdatePayload::decode(__u8 version, bufferlist::const_iterator &iter) {
   using ceph::decode;
   decode(key, iter);
   decode(value, iter);
+  if (version >= 7) {
+    decode(async_request_id, iter);
+  }
 }
 
 void MetadataUpdatePayload::dump(Formatter *f) const {
+  AsyncRequestPayloadBase::dump(f);
   f->dump_string("key", key);
   f->dump_string("value", *value);
 }
@@ -415,21 +430,26 @@ void NotifyMessage::generate_test_instances(std::list<NotifyMessage *> &o) {
   o.push_back(new NotifyMessage(new AsyncProgressPayload(AsyncRequestId(ClientId(0, 1), 2), 3, 4)));
   o.push_back(new NotifyMessage(new AsyncCompletePayload(AsyncRequestId(ClientId(0, 1), 2), 3)));
   o.push_back(new NotifyMessage(new FlattenPayload(AsyncRequestId(ClientId(0, 1), 2))));
-  o.push_back(new NotifyMessage(new ResizePayload(123, true, AsyncRequestId(ClientId(0, 1), 2))));
+  o.push_back(new NotifyMessage(new ResizePayload(AsyncRequestId(ClientId(0, 1), 2), 123, true)));
   o.push_back(new NotifyMessage(new SnapCreatePayload(AsyncRequestId(ClientId(0, 1), 2),
                                                       cls::rbd::UserSnapshotNamespace(),
                                                       "foo", 1)));
-  o.push_back(new NotifyMessage(new SnapRemovePayload(cls::rbd::UserSnapshotNamespace(), "foo")));
-  o.push_back(new NotifyMessage(new SnapProtectPayload(cls::rbd::UserSnapshotNamespace(), "foo")));
-  o.push_back(new NotifyMessage(new SnapUnprotectPayload(cls::rbd::UserSnapshotNamespace(), "foo")));
+  o.push_back(new NotifyMessage(new SnapRemovePayload(AsyncRequestId(ClientId(0, 1), 2),
+                                                      cls::rbd::UserSnapshotNamespace(), "foo")));
+  o.push_back(new NotifyMessage(new SnapProtectPayload(AsyncRequestId(ClientId(0, 1), 2),
+                                                       cls::rbd::UserSnapshotNamespace(), "foo")));
+  o.push_back(new NotifyMessage(new SnapUnprotectPayload(AsyncRequestId(ClientId(0, 1), 2),
+                                                         cls::rbd::UserSnapshotNamespace(), "foo")));
   o.push_back(new NotifyMessage(new RebuildObjectMapPayload(AsyncRequestId(ClientId(0, 1), 2))));
-  o.push_back(new NotifyMessage(new RenamePayload("foo")));
-  o.push_back(new NotifyMessage(new UpdateFeaturesPayload(1, true)));
+  o.push_back(new NotifyMessage(new RenamePayload(AsyncRequestId(ClientId(0, 1), 2), "foo")));
+  o.push_back(new NotifyMessage(new UpdateFeaturesPayload(AsyncRequestId(ClientId(0, 1), 2),
+                                                          1, true)));
   o.push_back(new NotifyMessage(new MigratePayload(AsyncRequestId(ClientId(0, 1), 2))));
   o.push_back(new NotifyMessage(new SparsifyPayload(AsyncRequestId(ClientId(0, 1), 2), 1)));
   o.push_back(new NotifyMessage(new QuiescePayload(AsyncRequestId(ClientId(0, 1), 2))));
   o.push_back(new NotifyMessage(new UnquiescePayload(AsyncRequestId(ClientId(0, 1), 2))));
-  o.push_back(new NotifyMessage(new MetadataUpdatePayload("foo", std::optional<std::string>{"xyz"})));
+  o.push_back(new NotifyMessage(new MetadataUpdatePayload(AsyncRequestId(ClientId(0, 1), 2),
+                                                          "foo", std::optional<std::string>{"xyz"})));
 }
 
 void ResponseMessage::encode(bufferlist& bl) const {
index b8101f45413017cb3db0e9a390b6c5d804fe14ac..ca0b40f28f0c80df39e065b7fe806880496b479e 100644 (file)
@@ -226,7 +226,7 @@ struct ResizePayload : public AsyncRequestPayloadBase {
   bool allow_shrink = true;
 
   ResizePayload() {}
-  ResizePayload(uint64_t size, bool allow_shrink, const AsyncRequestId &id)
+  ResizePayload(const AsyncRequestId &id, uint64_t size, bool allow_shrink)
     : AsyncRequestPayloadBase(id), size(size), allow_shrink(allow_shrink) {}
 
   NotifyOp get_notify_op() const override {
@@ -241,7 +241,7 @@ struct ResizePayload : public AsyncRequestPayloadBase {
   void dump(Formatter *f) const override;
 };
 
-struct SnapPayloadBase : public Payload {
+struct SnapPayloadBase : public AsyncRequestPayloadBase {
 public:
   cls::rbd::SnapshotNamespace snap_namespace;
   std::string snap_name;
@@ -256,21 +256,22 @@ public:
 
 protected:
   SnapPayloadBase() {}
-  SnapPayloadBase(const cls::rbd::SnapshotNamespace& snap_namespace,
+  SnapPayloadBase(const AsyncRequestId &id,
+                  const cls::rbd::SnapshotNamespace& snap_namespace,
                  const std::string &name)
-    : snap_namespace(snap_namespace), snap_name(name) {}
+    : AsyncRequestPayloadBase(id), snap_namespace(snap_namespace),
+      snap_name(name) {
+  }
 };
 
 struct SnapCreatePayload : public SnapPayloadBase {
-  AsyncRequestId async_request_id;
   uint64_t flags = 0;
 
   SnapCreatePayload() {}
   SnapCreatePayload(const AsyncRequestId &id,
                     const cls::rbd::SnapshotNamespace &snap_namespace,
                    const std::string &name, uint64_t flags)
-    : SnapPayloadBase(snap_namespace, name), async_request_id(id),
-      flags(flags) {
+    : SnapPayloadBase(id, snap_namespace, name), flags(flags) {
   }
 
   NotifyOp get_notify_op() const override {
@@ -286,9 +287,12 @@ struct SnapRenamePayload : public SnapPayloadBase {
   uint64_t snap_id = 0;
 
   SnapRenamePayload() {}
-  SnapRenamePayload(const uint64_t &src_snap_id,
+  SnapRenamePayload(const AsyncRequestId &id,
+                    const uint64_t &src_snap_id,
                    const std::string &dst_name)
-    : SnapPayloadBase(cls::rbd::UserSnapshotNamespace(), dst_name), snap_id(src_snap_id) {}
+    : SnapPayloadBase(id, cls::rbd::UserSnapshotNamespace(), dst_name),
+      snap_id(src_snap_id) {
+  }
 
   NotifyOp get_notify_op() const override {
     return NOTIFY_OP_SNAP_RENAME;
@@ -301,9 +305,11 @@ struct SnapRenamePayload : public SnapPayloadBase {
 
 struct SnapRemovePayload : public SnapPayloadBase {
   SnapRemovePayload() {}
-  SnapRemovePayload(const cls::rbd::SnapshotNamespace& snap_namespace,
+  SnapRemovePayload(const AsyncRequestId &id,
+                    const cls::rbd::SnapshotNamespace& snap_namespace,
                    const std::string &name)
-    : SnapPayloadBase(snap_namespace, name) {}
+    : SnapPayloadBase(id, snap_namespace, name) {
+  }
 
   NotifyOp get_notify_op() const override {
     return NOTIFY_OP_SNAP_REMOVE;
@@ -312,9 +318,11 @@ struct SnapRemovePayload : public SnapPayloadBase {
 
 struct SnapProtectPayload : public SnapPayloadBase {
   SnapProtectPayload() {}
-  SnapProtectPayload(const cls::rbd::SnapshotNamespace& snap_namespace,
+  SnapProtectPayload(const AsyncRequestId &id,
+                     const cls::rbd::SnapshotNamespace& snap_namespace,
                     const std::string &name)
-    : SnapPayloadBase(snap_namespace, name) {}
+    : SnapPayloadBase(id, snap_namespace, name) {
+  }
 
   NotifyOp get_notify_op() const override {
     return NOTIFY_OP_SNAP_PROTECT;
@@ -323,9 +331,11 @@ struct SnapProtectPayload : public SnapPayloadBase {
 
 struct SnapUnprotectPayload : public SnapPayloadBase {
   SnapUnprotectPayload() {}
-  SnapUnprotectPayload(const cls::rbd::SnapshotNamespace& snap_namespace,
+  SnapUnprotectPayload(const AsyncRequestId &id,
+                       const cls::rbd::SnapshotNamespace& snap_namespace,
                       const std::string &name)
-    : SnapPayloadBase(snap_namespace, name) {}
+    : SnapPayloadBase(id, snap_namespace, name) {
+  }
 
   NotifyOp get_notify_op() const override {
     return NOTIFY_OP_SNAP_UNPROTECT;
@@ -345,11 +355,13 @@ struct RebuildObjectMapPayload : public AsyncRequestPayloadBase {
   }
 };
 
-struct RenamePayload : public Payload {
+struct RenamePayload : public AsyncRequestPayloadBase {
   std::string image_name;
 
   RenamePayload() {}
-  RenamePayload(const std::string _image_name) : image_name(_image_name) {}
+  RenamePayload(const AsyncRequestId &id, const std::string _image_name)
+    : AsyncRequestPayloadBase(id), image_name(_image_name) {
+  }
 
   NotifyOp get_notify_op() const override {
     return NOTIFY_OP_RENAME;
@@ -363,13 +375,15 @@ struct RenamePayload : public Payload {
   void dump(Formatter *f) const;
 };
 
-struct UpdateFeaturesPayload : public Payload {
+struct UpdateFeaturesPayload : public AsyncRequestPayloadBase {
   uint64_t features = 0;
   bool enabled = false;
 
   UpdateFeaturesPayload() {}
-  UpdateFeaturesPayload(uint64_t features, bool enabled)
-    : features(features), enabled(enabled) {}
+  UpdateFeaturesPayload(const AsyncRequestId &id, uint64_t features,
+                        bool enabled)
+    : AsyncRequestPayloadBase(id), features(features), enabled(enabled) {
+  }
 
   NotifyOp get_notify_op() const override {
     return NOTIFY_OP_UPDATE_FEATURES;
@@ -439,12 +453,14 @@ struct UnquiescePayload : public AsyncRequestPayloadBase {
   }
 };
 
-struct MetadataUpdatePayload : public Payload {
+struct MetadataUpdatePayload : public AsyncRequestPayloadBase {
   std::string key;
   std::optional<std::string> value;
   MetadataUpdatePayload() {}
-  MetadataUpdatePayload(std::string key, std::optional<std::string> value)
-    : key(key), value(value) {}
+  MetadataUpdatePayload(const AsyncRequestId &id, std::string key,
+                        std::optional<std::string> value)
+    : AsyncRequestPayloadBase(id), key(key), value(value) {
+  }
 
   NotifyOp get_notify_op() const override {
     return NOTIFY_OP_METADATA_UPDATE;
index a3cdfdb5e3410ce19a7f9f4fdb65dee48461bf02..f02c7b37b814621c4113b4877bb7375220522f20 100644 (file)
@@ -175,6 +175,41 @@ public:
         *id = payload.async_request_id;
       }
       return true;
+    case NOTIFY_OP_SNAP_RENAME:
+      {
+        SnapRenamePayload payload;
+        payload.decode(7, iter);
+        *id = payload.async_request_id;
+      }
+      return true;
+    case NOTIFY_OP_SNAP_REMOVE:
+      {
+        SnapRemovePayload payload;
+        payload.decode(7, iter);
+        *id = payload.async_request_id;
+      }
+      return true;
+    case NOTIFY_OP_SNAP_PROTECT:
+      {
+        SnapProtectPayload payload;
+        payload.decode(7, iter);
+        *id = payload.async_request_id;
+      }
+      return true;
+    case NOTIFY_OP_SNAP_UNPROTECT:
+      {
+        SnapUnprotectPayload payload;
+        payload.decode(7, iter);
+        *id = payload.async_request_id;
+      }
+      return true;
+    case NOTIFY_OP_RENAME:
+      {
+        RenamePayload payload;
+        payload.decode(7, iter);
+        *id = payload.async_request_id;
+      }
+      return true;
     case NOTIFY_OP_REBUILD_OBJECT_MAP:
       {
         RebuildObjectMapPayload payload;
@@ -293,7 +328,90 @@ struct SnapCreateTask {
     C_SaferCond ctx;
     ictx->image_watcher->notify_snap_create(0, cls::rbd::UserSnapshotNamespace(),
                                             "snap", 0, *progress_context, &ctx);
-    ASSERT_EQ(0, ctx.wait());
+    result = ctx.wait();
+  }
+};
+
+struct SnapRenameTask {
+  librbd::ImageCtx *ictx;
+  int result = 0;
+
+  SnapRenameTask(librbd::ImageCtx *ictx)
+    : ictx(ictx) {
+  }
+
+  void operator()() {
+    std::shared_lock l{ictx->owner_lock};
+    C_SaferCond ctx;
+    ictx->image_watcher->notify_snap_rename(0, 1, "snap-rename", &ctx);
+    result = ctx.wait();
+  }
+};
+
+struct SnapRemoveTask {
+  librbd::ImageCtx *ictx;
+  int result = 0;
+
+  SnapRemoveTask(librbd::ImageCtx *ictx)
+    : ictx(ictx) {
+  }
+
+  void operator()() {
+    std::shared_lock l{ictx->owner_lock};
+    C_SaferCond ctx;
+    ictx->image_watcher->notify_snap_remove(
+        0, cls::rbd::UserSnapshotNamespace(), "snap", &ctx);
+    result = ctx.wait();
+  }
+};
+
+struct SnapProtectTask {
+  librbd::ImageCtx *ictx;
+  int result = 0;
+
+  SnapProtectTask(librbd::ImageCtx *ictx)
+    : ictx(ictx) {
+  }
+
+  void operator()() {
+    std::shared_lock l{ictx->owner_lock};
+    C_SaferCond ctx;
+    ictx->image_watcher->notify_snap_protect(
+        0, cls::rbd::UserSnapshotNamespace(), "snap", &ctx);
+    result = ctx.wait();
+  }
+};
+
+struct SnapUnprotectTask {
+  librbd::ImageCtx *ictx;
+  int result = 0;
+
+  SnapUnprotectTask(librbd::ImageCtx *ictx)
+    : ictx(ictx) {
+  }
+
+  void operator()() {
+    std::shared_lock l{ictx->owner_lock};
+    C_SaferCond ctx;
+    ictx->image_watcher->notify_snap_unprotect(
+        0, cls::rbd::UserSnapshotNamespace(), "snap", &ctx);
+    result = ctx.wait();
+  }
+};
+
+struct RenameTask {
+  librbd::ImageCtx *ictx;
+  int result = 0;
+
+  RenameTask(librbd::ImageCtx *ictx)
+    : ictx(ictx) {
+  }
+
+  void operator()() {
+    std::shared_lock l{ictx->owner_lock};
+    C_SaferCond ctx;
+    ictx->image_watcher->notify_rename(0, "new_name", &ctx);
+    result = ctx.wait();
   }
 };
 
@@ -505,14 +623,23 @@ TEST_F(TestImageWatcher, NotifySnapRename) {
 
   m_notify_acks = {{NOTIFY_OP_SNAP_RENAME, create_response_message(0)}};
 
-  std::shared_lock l{ictx->owner_lock};
-  C_SaferCond notify_ctx;
-  ictx->image_watcher->notify_snap_rename(1, "snap-rename", &notify_ctx);
-  ASSERT_EQ(0, notify_ctx.wait());
+  SnapRenameTask snap_rename_task(ictx);
+  boost::thread thread(boost::ref(snap_rename_task));
+
+  ASSERT_TRUE(wait_for_notifies(*ictx));
 
   NotifyOps expected_notify_ops;
   expected_notify_ops += NOTIFY_OP_SNAP_RENAME;
   ASSERT_EQ(expected_notify_ops, m_notifies);
+
+  AsyncRequestId async_request_id;
+  ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_SNAP_RENAME,
+                                       &async_request_id));
+
+  ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, 0));
+
+  ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+  ASSERT_EQ(0, snap_rename_task.result);
 }
 
 TEST_F(TestImageWatcher, NotifySnapRenameError) {
@@ -529,7 +656,7 @@ TEST_F(TestImageWatcher, NotifySnapRenameError) {
 
   std::shared_lock l{ictx->owner_lock};
   C_SaferCond notify_ctx;
-  ictx->image_watcher->notify_snap_rename(1, "snap-rename", &notify_ctx);
+  ictx->image_watcher->notify_snap_rename(0, 1, "snap-rename", &notify_ctx);
   ASSERT_EQ(-EEXIST, notify_ctx.wait());
 
   NotifyOps expected_notify_ops;
@@ -549,16 +676,23 @@ TEST_F(TestImageWatcher, NotifySnapRemove) {
 
   m_notify_acks = {{NOTIFY_OP_SNAP_REMOVE, create_response_message(0)}};
 
-  std::shared_lock l{ictx->owner_lock};
-  C_SaferCond notify_ctx;
-  ictx->image_watcher->notify_snap_remove(cls::rbd::UserSnapshotNamespace(),
-                                         "snap",
-                                         &notify_ctx);
-  ASSERT_EQ(0, notify_ctx.wait());
+  SnapRemoveTask snap_remove_task(ictx);
+  boost::thread thread(boost::ref(snap_remove_task));
+
+  ASSERT_TRUE(wait_for_notifies(*ictx));
 
   NotifyOps expected_notify_ops;
   expected_notify_ops += NOTIFY_OP_SNAP_REMOVE;
   ASSERT_EQ(expected_notify_ops, m_notifies);
+
+  AsyncRequestId async_request_id;
+  ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_SNAP_REMOVE,
+                                       &async_request_id));
+
+  ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, 0));
+
+  ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+  ASSERT_EQ(0, snap_remove_task.result);
 }
 
 TEST_F(TestImageWatcher, NotifySnapProtect) {
@@ -573,16 +707,23 @@ TEST_F(TestImageWatcher, NotifySnapProtect) {
 
   m_notify_acks = {{NOTIFY_OP_SNAP_PROTECT, create_response_message(0)}};
 
-  std::shared_lock l{ictx->owner_lock};
-  C_SaferCond notify_ctx;
-  ictx->image_watcher->notify_snap_protect(cls::rbd::UserSnapshotNamespace(),
-                                          "snap",
-                                          &notify_ctx);
-  ASSERT_EQ(0, notify_ctx.wait());
+  SnapProtectTask snap_protect_task(ictx);
+  boost::thread thread(boost::ref(snap_protect_task));
+
+  ASSERT_TRUE(wait_for_notifies(*ictx));
 
   NotifyOps expected_notify_ops;
   expected_notify_ops += NOTIFY_OP_SNAP_PROTECT;
   ASSERT_EQ(expected_notify_ops, m_notifies);
+
+  AsyncRequestId async_request_id;
+  ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_SNAP_PROTECT,
+                                       &async_request_id));
+
+  ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, 0));
+
+  ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+  ASSERT_EQ(0, snap_protect_task.result);
 }
 
 TEST_F(TestImageWatcher, NotifySnapUnprotect) {
@@ -597,16 +738,23 @@ TEST_F(TestImageWatcher, NotifySnapUnprotect) {
 
   m_notify_acks = {{NOTIFY_OP_SNAP_UNPROTECT, create_response_message(0)}};
 
-  std::shared_lock l{ictx->owner_lock};
-  C_SaferCond notify_ctx;
-  ictx->image_watcher->notify_snap_unprotect(cls::rbd::UserSnapshotNamespace(),
-                                            "snap",
-                                            &notify_ctx);
-  ASSERT_EQ(0, notify_ctx.wait());
+  SnapUnprotectTask snap_unprotect_task(ictx);
+  boost::thread thread(boost::ref(snap_unprotect_task));
+
+  ASSERT_TRUE(wait_for_notifies(*ictx));
 
   NotifyOps expected_notify_ops;
   expected_notify_ops += NOTIFY_OP_SNAP_UNPROTECT;
   ASSERT_EQ(expected_notify_ops, m_notifies);
+
+  AsyncRequestId async_request_id;
+  ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_SNAP_UNPROTECT,
+                                       &async_request_id));
+
+  ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, 0));
+
+  ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+  ASSERT_EQ(0, snap_unprotect_task.result);
 }
 
 TEST_F(TestImageWatcher, NotifyRename) {
@@ -621,14 +769,23 @@ TEST_F(TestImageWatcher, NotifyRename) {
 
   m_notify_acks = {{NOTIFY_OP_RENAME, create_response_message(0)}};
 
-  std::shared_lock l{ictx->owner_lock};
-  C_SaferCond notify_ctx;
-  ictx->image_watcher->notify_rename("new_name", &notify_ctx);
-  ASSERT_EQ(0, notify_ctx.wait());
+  RenameTask rename_task(ictx);
+  boost::thread thread(boost::ref(rename_task));
+
+  ASSERT_TRUE(wait_for_notifies(*ictx));
 
   NotifyOps expected_notify_ops;
   expected_notify_ops += NOTIFY_OP_RENAME;
   ASSERT_EQ(expected_notify_ops, m_notifies);
+
+  AsyncRequestId async_request_id;
+  ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_RENAME,
+                                       &async_request_id));
+
+  ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, 0));
+
+  ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+  ASSERT_EQ(0, rename_task.result);
 }
 
 TEST_F(TestImageWatcher, NotifyAsyncTimedOut) {