]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: introduced support for blocking watch notifications
authorJason Dillaman <dillaman@redhat.com>
Thu, 18 May 2017 20:25:55 +0000 (16:25 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 19 May 2017 12:43:37 +0000 (08:43 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageWatcher.cc
src/librbd/ImageWatcher.h
src/librbd/Watcher.cc
src/librbd/Watcher.h

index 0e85a6a4c42700095de36645fa79e51ec2ca771f..ca612a9ded86a4278b439b5175d2f4ff8d7bbc42 100644 (file)
@@ -33,6 +33,32 @@ using librbd::watcher::util::HandlePayloadVisitor;
 
 static const double    RETRY_DELAY_SECONDS = 1.0;
 
+template <typename I>
+struct ImageWatcher<I>::C_ProcessPayload : public Context {
+  ImageWatcher *image_watcher;
+  uint64_t notify_id;
+  uint64_t handle;
+  watch_notify::Payload payload;
+
+  C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_,
+                   uint64_t handle_, const watch_notify::Payload &payload)
+    : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
+      payload(payload) {
+  }
+
+  void finish(int r) override {
+    image_watcher->m_async_op_tracker.start_op();
+    if (image_watcher->notifications_blocked()) {
+      // requests are blocked -- just ack the notification
+      bufferlist bl;
+      image_watcher->acknowledge_notify(notify_id, handle, bl);
+    } else {
+      image_watcher->process_payload(notify_id, handle, payload);
+    }
+    image_watcher->m_async_op_tracker.finish_op();
+  }
+};
+
 template <typename I>
 ImageWatcher<I>::ImageWatcher(I &image_ctx)
   : Watcher(image_ctx.md_ctx, image_ctx.op_work_queue, image_ctx.header_oid),
@@ -62,6 +88,18 @@ void ImageWatcher<I>::unregister_watch(Context *on_finish) {
   Watcher::unregister_watch(ctx);
 }
 
+template <typename I>
+void ImageWatcher<I>::block_notifies(Context *on_finish) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 10) << this << " "  << __func__ << dendl;
+
+  on_finish = new FunctionContext([this, on_finish](int r) {
+      cancel_async_requests();
+      on_finish->complete(r);
+    });
+  Watcher::block_notifies(on_finish);
+}
+
 template <typename I>
 void ImageWatcher<I>::schedule_async_progress(const AsyncRequestId &request,
                                              uint64_t offset, uint64_t total) {
@@ -884,15 +922,9 @@ bool ImageWatcher<I>::handle_payload(const UnknownPayload &payload,
 
 template <typename I>
 void ImageWatcher<I>::process_payload(uint64_t notify_id, uint64_t handle,
-                                      const Payload &payload, int r) {
-  if (r < 0) {
-    bufferlist out_bl;
-    this->acknowledge_notify(notify_id, handle, out_bl);
-  } else {
-    apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id,
-                                                       handle),
-                  payload);
-  }
+                                      const Payload &payload) {
+  apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id, handle),
+                payload);
 }
 
 template <typename I>
@@ -919,7 +951,7 @@ void ImageWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
     m_image_ctx.state->refresh(new C_ProcessPayload(this, notify_id, handle,
                                                     notify_message.payload));
   } else {
-    process_payload(notify_id, handle, notify_message.payload, 0);
+    process_payload(notify_id, handle, notify_message.payload);
   }
 }
 
index 838e0e3d0122aedef422e982ca5bd870dcbb3504..5e30c8e5b3beba851ec59e593558c69468a29706 100644 (file)
@@ -36,7 +36,8 @@ public:
   ImageWatcher(ImageCtxT& image_ctx);
   ~ImageWatcher() override;
 
-  void unregister_watch(Context *on_finish);
+  void unregister_watch(Context *on_finish) override;
+  void block_notifies(Context *on_finish) override;
 
   void notify_flatten(uint64_t request_id, ProgressContext &prog_ctx,
                       Context *on_finish);
@@ -145,23 +146,7 @@ private:
     ProgressContext *m_prog_ctx;
   };
 
-  struct C_ProcessPayload : public Context {
-    ImageWatcher *image_watcher;
-    uint64_t notify_id;
-    uint64_t handle;
-    watch_notify::Payload payload;
-
-    C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_,
-                     uint64_t handle_, const watch_notify::Payload &payload)
-      : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
-        payload(payload) {
-    }
-
-    void finish(int r) override {
-      image_watcher->process_payload(notify_id, handle, payload, r);
-    }
-  };
-
+  struct C_ProcessPayload;
   struct C_ResponseMessage : public Context {
     C_NotifyAck *notify_ack;
 
@@ -251,7 +236,7 @@ private:
   bool handle_payload(const watch_notify::UnknownPayload& payload,
                       C_NotifyAck *ctx);
   void process_payload(uint64_t notify_id, uint64_t handle,
-                       const watch_notify::Payload &payload, int r);
+                       const watch_notify::Payload &payload);
 
   void handle_notify(uint64_t notify_id, uint64_t handle,
                      uint64_t notifier_id, bufferlist &bl) override;
index 98a0e7f0be42d4c8a9d1348e83f5bb1d49112072..54a2246f12cff7ea82c41353141b30a0624de84a 100644 (file)
@@ -174,6 +174,30 @@ void Watcher::unregister_watch(Context *on_finish) {
   on_finish->complete(0);
 }
 
+bool Watcher::notifications_blocked() const {
+  RWLock::RLocker locker(m_watch_lock);
+
+  bool blocked = (m_blocked_count > 0);
+  ldout(m_cct, 5) << "blocked=" << blocked << dendl;
+  return blocked;
+}
+
+void Watcher::block_notifies(Context *on_finish) {
+  {
+    RWLock::WLocker locker(m_watch_lock);
+    ++m_blocked_count;
+    ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+  }
+  m_async_op_tracker.wait_for_ops(on_finish);
+}
+
+void Watcher::unblock_notifies() {
+  RWLock::WLocker locker(m_watch_lock);
+  assert(m_blocked_count > 0);
+  --m_blocked_count;
+  ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+}
+
 void Watcher::flush(Context *on_finish) {
   m_notifier.flush(on_finish);
 }
@@ -260,11 +284,18 @@ void Watcher::send_notify(bufferlist& payload,
   m_notifier.notify(payload, response, on_finish);
 }
 
-void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
-                                               uint64_t handle,
-                                               uint64_t notifier_id,
-                                               bufferlist& bl) {
-  watcher.handle_notify(notify_id, handle, notifier_id, bl);
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
+                                      uint64_t notifier_id, bufferlist& bl) {
+  // if notifications are blocked, finish the notification w/o
+  // bubbling the notification up to the derived class
+  watcher.m_async_op_tracker.start_op();
+  if (watcher.notifications_blocked()) {
+    bufferlist bl;
+    watcher.acknowledge_notify(notify_id, handle, bl);
+  } else {
+    watcher.handle_notify(notify_id, handle, notifier_id, bl);
+  }
+  watcher.m_async_op_tracker.finish_op();
 }
 
 void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
index 01de690937d084d458f556634d89fde57c86cb6b..39009027d9ebc4756bd466b0d6f555e53614975b 100644 (file)
@@ -4,6 +4,7 @@
 #ifndef CEPH_LIBRBD_WATCHER_H
 #define CEPH_LIBRBD_WATCHER_H
 
+#include "common/AsyncOpTracker.h"
 #include "common/Mutex.h"
 #include "common/RWLock.h"
 #include "include/rados/librados.hpp"
@@ -36,9 +37,13 @@ public:
   virtual ~Watcher();
 
   void register_watch(Context *on_finish);
-  void unregister_watch(Context *on_finish);
+  virtual void unregister_watch(Context *on_finish);
   void flush(Context *on_finish);
 
+  bool notifications_blocked() const;
+  virtual void block_notifies(Context *on_finish);
+  void unblock_notifies();
+
   std::string get_oid() const;
   void set_oid(const string& oid);
 
@@ -73,6 +78,7 @@ protected:
   uint64_t m_watch_handle;
   watcher::Notifier m_notifier;
   WatchState m_watch_state;
+  AsyncOpTracker m_async_op_tracker;
 
   void send_notify(bufferlist &payload,
                    watcher::NotifyResponse *response = nullptr,
@@ -149,6 +155,8 @@ private:
   WatchCtx m_watch_ctx;
   Context *m_unregister_watch_ctx = nullptr;
 
+  uint32_t m_blocked_count = 0;
+
   void handle_register_watch(int r, Context *on_finish);
 
   void rewatch();