]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: throttle async progress callbacks
authorJason Dillaman <dillaman@redhat.com>
Mon, 19 Jan 2015 22:33:41 +0000 (17:33 -0500)
committerJosh Durgin <jdurgin@redhat.com>
Sat, 24 Jan 2015 23:05:49 +0000 (15:05 -0800)
Ensure that no more than one outstanding progress callback
is queued for notification.  This will allow remote progress
updates to be sent at a rate in which all watch/notify
clients can support.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageWatcher.cc
src/librbd/ImageWatcher.h

index 13cf69bee740a7e2c0b930256d28bb43f57aeb3e..b1fea4af62f9366acdade307f31d51fb3330a2fa 100644 (file)
@@ -59,58 +59,6 @@ enum {
   NOTIFY_OP_SNAP_CREATE    = 8
 };
 
-class RemoteProgressContext : public ProgressContext {
-public:
-  RemoteProgressContext(ImageWatcher &image_watcher, Finisher &finisher,
-                       const RemoteAsyncRequest &remote_async_request)
-    : m_image_watcher(image_watcher), m_finisher(finisher),
-      m_remote_async_request(remote_async_request)
-  {
-  }
-
-  virtual int update_progress(uint64_t offset, uint64_t total) {
-    // TODO: JD throttle notify updates(?)
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageWatcher::notify_async_progress,
-                 &m_image_watcher, m_remote_async_request, offset, total));
-    m_finisher.queue(ctx);
-    return 0;
-  }
-
-private:
-  ImageWatcher &m_image_watcher;
-  Finisher &m_finisher;
-  RemoteAsyncRequest m_remote_async_request;
-};
-
-class RemoteContext : public Context {
-public:
-  RemoteContext(ImageWatcher &image_watcher, Finisher &finisher,
-               const RemoteAsyncRequest &remote_async_request,
-               RemoteProgressContext *prog_ctx)
-    : m_image_watcher(image_watcher), m_finisher(finisher),
-      m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
-  {
-  }
-
-  ~RemoteContext() {
-    delete m_prog_ctx;
-  }
-
-  virtual void finish(int r) {
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageWatcher::notify_async_complete,
-                 &m_image_watcher, m_remote_async_request, r));
-    m_finisher.queue(ctx);
-  }
-
-private:
-  ImageWatcher &m_image_watcher;
-  Finisher &m_finisher;
-  RemoteAsyncRequest m_remote_async_request;
-  RemoteProgressContext *m_prog_ctx;
-};
-
 ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
   : m_image_ctx(image_ctx), m_watch_ctx(*this), m_handle(0),
     m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
@@ -443,6 +391,9 @@ int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request,
   ENCODE_FINISH(bl);
 
   m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+
+  RWLock::WLocker l(m_async_request_lock);
+  m_async_progress.erase(request);
   return 0;
 }
 
@@ -757,6 +708,19 @@ int ImageWatcher::notify_async_request(uint64_t async_request_id,
   return r;
 }
 
+void ImageWatcher::schedule_update_progress(
+    const RemoteAsyncRequest &remote_async_request,
+    uint64_t offset, uint64_t total) {
+  RWLock::WLocker l(m_async_request_lock);
+  if (m_async_progress.count(remote_async_request) == 0) {
+    m_async_progress.insert(remote_async_request);
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&ImageWatcher::notify_async_progress,
+                 this, remote_async_request, offset, total));
+    m_finisher->queue(ctx);
+  }
+}
+
 void ImageWatcher::handle_header_update() {
   ldout(m_image_ctx.cct, 1) << "image header updated" << dendl;
 
@@ -857,13 +821,11 @@ void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) {
     RemoteAsyncRequest request;
     ::decode(request, iter);
 
-    RemoteProgressContext *prog_ctx =
-      new RemoteProgressContext(*this, *m_finisher, request);
-    RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
-                                          prog_ctx);
+    RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
+                                                               request);
+    RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
 
     ldout(m_image_ctx.cct, 20) << "remote flatten request: " << request << dendl;
-
     int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
     if (r < 0) {
       delete ctx;
@@ -884,14 +846,12 @@ void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) {
     RemoteAsyncRequest request;
     ::decode(request, iter);
 
-    RemoteProgressContext *prog_ctx =
-      new RemoteProgressContext(*this, *m_finisher, request);
-    RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
-                                          prog_ctx);
+    RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
+                                                               request);
+    RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
 
     ldout(m_image_ctx.cct, 20) << "remote resize request: " << request
                               << " " << size << dendl;
-
     int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
     if (r < 0) {
       delete ctx;
@@ -1076,4 +1036,11 @@ void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) {
   image_watcher.handle_error(handle, err);
 }
 
+void ImageWatcher::RemoteContext::finish(int r) {
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&ImageWatcher::notify_async_complete,
+                 &m_image_watcher, m_remote_async_request, r));
+    m_image_watcher.m_finisher->queue(ctx);
+}
+
 }
index 1ef02f140f845f0f2645f9841aaadd1c07d7dde5..e2a73a7931d4851823462fa457dd5952e1502a18 100644 (file)
@@ -7,6 +7,8 @@
 #include "common/Mutex.h"
 #include "common/RWLock.h"
 #include "include/rados/librados.hpp"
+#include "include/rbd/librbd.hpp"
+#include <set>
 #include <string>
 #include <utility>
 #include <vector>
@@ -22,7 +24,6 @@ namespace librbd {
 
   class AioCompletion;
   class ImageCtx;
-  class ProgressContext;
 
   struct RemoteAsyncRequest {
     uint64_t gid;
@@ -32,12 +33,21 @@ namespace librbd {
     RemoteAsyncRequest() : gid(), handle(), request_id() {}
     RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_)
       : gid(gid_), handle(handle_), request_id(request_id_) {}
+
+    inline bool operator<(const RemoteAsyncRequest &rhs) const {
+      if (gid != rhs.gid) {
+       return gid < rhs.gid;
+      } else if (handle != rhs.handle) {
+       return handle < rhs.handle;
+      } else {
+       return request_id < request_id;
+      }
+    }
   };
 
   class ImageWatcher {
   public:
 
-
     ImageWatcher(ImageCtx& image_ctx);
     ~ImageWatcher();
 
@@ -97,6 +107,48 @@ namespace librbd {
       virtual void handle_error(uint64_t handle, int err);
     };
 
+    class RemoteProgressContext : public ProgressContext {
+    public:
+      RemoteProgressContext(ImageWatcher &image_watcher,
+                           const RemoteAsyncRequest &remote_async_request)
+        : m_image_watcher(image_watcher),
+          m_remote_async_request(remote_async_request)
+      {
+      }
+
+      virtual int update_progress(uint64_t offset, uint64_t total) {
+       m_image_watcher.schedule_update_progress(
+         m_remote_async_request, offset, total);
+        return 0;
+      }
+
+    private:
+      ImageWatcher &m_image_watcher;
+      RemoteAsyncRequest m_remote_async_request;
+    };
+
+    class RemoteContext : public Context {
+    public:
+      RemoteContext(ImageWatcher &image_watcher,
+                   const RemoteAsyncRequest &remote_async_request,
+                   RemoteProgressContext *prog_ctx)
+        : m_image_watcher(image_watcher),
+          m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
+      {
+      }
+
+      ~RemoteContext() {
+        delete m_prog_ctx;
+      }
+
+      virtual void finish(int r);
+
+    private:
+      ImageWatcher &m_image_watcher;
+      RemoteAsyncRequest m_remote_async_request;
+      RemoteProgressContext *m_prog_ctx;
+    };
+
     ImageCtx &m_image_ctx;
 
     WatchCtx m_watch_ctx;
@@ -115,6 +167,7 @@ namespace librbd {
     RWLock m_async_request_lock;
     uint64_t m_async_request_id;
     std::map<uint64_t, AsyncRequest> m_async_requests;
+    std::set<RemoteAsyncRequest> m_async_progress;
 
     Mutex m_aio_request_lock;
     Cond m_aio_request_cond;
@@ -152,6 +205,9 @@ namespace librbd {
                             ProgressContext& prog_ctx);
     void notify_request_leadership();
 
+    void schedule_update_progress(const RemoteAsyncRequest &remote_async_request,
+                                 uint64_t offset, uint64_t total);
+
     void handle_header_update();
     void handle_acquired_lock();
     void handle_released_lock();