]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: replace Finisher/SafeTimer use with facade
authorJason Dillaman <dillaman@redhat.com>
Wed, 25 Feb 2015 04:35:31 +0000 (23:35 -0500)
committerJason Dillaman <dillaman@redhat.com>
Thu, 26 Feb 2015 00:51:07 +0000 (19:51 -0500)
Replace the two Context threading classes used within
ImageWatcher with a facade to orchestrate the scheduling
and canceling of Context task callbacks.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageWatcher.cc
src/librbd/ImageWatcher.h
src/librbd/Makefile.am
src/librbd/TaskFinisher.h [new file with mode: 0644]
src/librbd/WatchNotifyTypes.h

index de30d265b3e8b3030544b2283fe7d87bc96fe20c..de751123681f858ccfb9b1761b70469d70e58527 100644 (file)
@@ -4,12 +4,12 @@
 #include "librbd/AioCompletion.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/ObjectMap.h"
+#include "librbd/TaskFinisher.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/lock/cls_lock_types.h"
 #include "include/encoding.h"
 #include "include/stringify.h"
 #include "common/errno.h"
-#include "common/Timer.h"
 #include <sstream>
 #include <boost/bind.hpp>
 #include <boost/function.hpp>
@@ -35,24 +35,16 @@ ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
     m_watch_ctx(*this), m_watch_handle(0),
     m_watch_state(WATCH_STATE_UNREGISTERED),
     m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
-    m_finisher(new Finisher(image_ctx.cct)),
-    m_timer_lock("librbd::ImageWatcher::m_timer_lock"),
-    m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)),
+    m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
     m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"),
     m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
-    m_retry_aio_context(NULL),
     m_owner_client_id_lock("librbd::ImageWatcher::m_owner_client_id_lock")
 {
-  m_finisher->start();
-  m_timer->init();
 }
 
 ImageWatcher::~ImageWatcher()
 {
-  {
-    Mutex::Locker l(m_timer_lock);
-    m_timer->shutdown();
-  }
+  delete m_task_finisher;
   {
     RWLock::RLocker l(m_watch_lock);
     assert(m_watch_state != WATCH_STATE_REGISTERED);
@@ -61,8 +53,6 @@ ImageWatcher::~ImageWatcher()
     RWLock::RLocker l(m_image_ctx.owner_lock);
     assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
   }
-  m_finisher->stop();
-  delete m_finisher;
 }
 
 bool ImageWatcher::is_lock_supported() const {
@@ -102,7 +92,7 @@ int ImageWatcher::unregister_watch() {
   }
 
   cancel_async_requests();
-  m_finisher->wait_for_empty();
+  m_task_finisher->cancel_all();
 
   int r = 0;
   {
@@ -209,7 +199,7 @@ int ImageWatcher::request_lock(
     // run notify request in finisher to avoid blocking aio path
     FunctionContext *ctx = new FunctionContext(
       boost::bind(&ImageWatcher::notify_request_lock, this));
-    m_finisher->queue(ctx);
+    m_task_finisher->queue(TASK_CODE_REQUEST_LOCK, ctx);
   }
   return 0;
 }
@@ -325,7 +315,7 @@ int ImageWatcher::lock() {
   FunctionContext *ctx = new FunctionContext(
     boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid,
                bl, NOTIFY_TIMEOUT, reinterpret_cast<bufferlist *>(NULL)));
-  m_finisher->queue(ctx);
+  m_task_finisher->queue(TASK_CODE_ACQUIRED_LOCK, ctx);
   return 0;
 }
 
@@ -366,7 +356,7 @@ int ImageWatcher::unlock()
 
   FunctionContext *ctx = new FunctionContext(
     boost::bind(&ImageWatcher::notify_released_lock, this));
-  m_finisher->queue(ctx);
+  m_task_finisher->queue(TASK_CODE_RELEASED_LOCK, ctx);
   return 0;
 }
 
@@ -375,12 +365,19 @@ void ImageWatcher::release_lock()
   ldout(m_image_ctx.cct, 10) << "releasing exclusive lock by request" << dendl;
   {
     RWLock::WLocker l(m_image_ctx.owner_lock);
+    if (!is_lock_owner()) {
+      return;
+    }
     prepare_unlock();
   }
 
   m_image_ctx.cancel_async_requests();
 
   RWLock::WLocker l(m_image_ctx.owner_lock);
+  if (!is_lock_owner()) {
+    return;
+  }
+
   {
     RWLock::WLocker l2(m_image_ctx.md_lock);
     librbd::_flush(&m_image_ctx);
@@ -399,6 +396,14 @@ void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) {
                                   encode_lock_cookie(), WATCHER_LOCK_TAG);
 }
 
+void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
+                                          uint64_t offset, uint64_t total) {
+  FunctionContext *ctx = new FunctionContext(
+    boost::bind(&ImageWatcher::notify_async_progress, this, request, offset,
+                total));
+  m_task_finisher->queue(Task(TASK_CODE_ASYNC_PROGRESS, request), ctx);
+}
+
 int ImageWatcher::notify_async_progress(const AsyncRequestId &request,
                                        uint64_t offset, uint64_t total) {
   ldout(m_image_ctx.cct, 20) << "remote async request progress: "
@@ -409,9 +414,6 @@ int ImageWatcher::notify_async_progress(const AsyncRequestId &request,
   ::encode(NotifyMessage(AsyncProgressPayload(request, offset, total)), bl);
 
   m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
-
-  RWLock::WLocker l(m_async_request_lock);
-  m_async_progress.erase(request);
   return 0;
 }
 
@@ -419,7 +421,7 @@ void ImageWatcher::schedule_async_complete(const AsyncRequestId &request,
                                           int r) {
   FunctionContext *ctx = new FunctionContext(
     boost::bind(&ImageWatcher::notify_async_complete, this, request, r));
-  m_finisher->queue(ctx);
+  m_task_finisher->queue(ctx);
 }
 
 int ImageWatcher::notify_async_complete(const AsyncRequestId &request,
@@ -510,38 +512,18 @@ bool ImageWatcher::decode_lock_cookie(const std::string &tag,
 }
 
 void ImageWatcher::schedule_retry_aio_requests(bool use_timer) {
-  Mutex::Locker l(m_timer_lock);
+  Context *ctx = new FunctionContext(boost::bind(
+    &ImageWatcher::retry_aio_requests, this));
   if (use_timer) {
-    if (m_retry_aio_context == NULL) {
-      m_retry_aio_context = new FunctionContext(boost::bind(
-       &ImageWatcher::finalize_retry_aio_requests, this));
-      m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context);
-    }
+    m_task_finisher->add_event_after(TASK_CODE_RETRY_AIO_REQUESTS,
+                                     RETRY_DELAY_SECONDS, ctx);
   } else {
-    m_timer->cancel_event(m_retry_aio_context);
-    m_retry_aio_context = NULL;
-
-    Context *ctx = new FunctionContext(boost::bind(
-      &ImageWatcher::retry_aio_requests, this));
-    m_finisher->queue(ctx);
+    m_task_finisher->queue(TASK_CODE_RETRY_AIO_REQUESTS, ctx);
   }
 }
 
-void ImageWatcher::cancel_retry_aio_requests() {
-  Mutex::Locker l(m_timer_lock);
-  if (m_retry_aio_context != NULL) {
-    m_timer->cancel_event(m_retry_aio_context);
-    m_retry_aio_context = NULL;
-  }
-}
-
-void ImageWatcher::finalize_retry_aio_requests() {
-  assert(m_timer_lock.is_locked());
-  m_retry_aio_context = NULL;
-  retry_aio_requests();
-}
-
 void ImageWatcher::retry_aio_requests() {
+  m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS);
   std::vector<AioRequest> lock_request_restarts;
   {
     Mutex::Locker l(m_aio_request_lock);
@@ -560,7 +542,7 @@ void ImageWatcher::retry_aio_requests() {
 void ImageWatcher::schedule_cancel_async_requests() {
   FunctionContext *ctx = new FunctionContext(
     boost::bind(&ImageWatcher::cancel_async_requests, this));
-  m_finisher->queue(ctx);
+  m_task_finisher->queue(TASK_CODE_CANCEL_ASYNC_REQUESTS, ctx);
 }
 
 void ImageWatcher::cancel_async_requests() {
@@ -587,7 +569,7 @@ void ImageWatcher::notify_released_lock() {
 
 void ImageWatcher::notify_request_lock() {
   ldout(m_image_ctx.cct, 10) << "notify request lock" << dendl;
-  cancel_retry_aio_requests();
+  m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS);
 
   m_image_ctx.owner_lock.get_read();
   if (try_request_lock()) {
@@ -710,18 +692,6 @@ int ImageWatcher::notify_async_request(const AsyncRequestId &async_request_id,
   return r;
 }
 
-void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
-                                          uint64_t offset, uint64_t total) {
-  RWLock::WLocker l(m_async_request_lock);
-  if (m_async_progress.count(request) == 0) {
-    m_async_progress.insert(request);
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageWatcher::notify_async_progress, this, request, offset,
-                 total));
-    m_finisher->queue(ctx);
-  }
-}
-
 void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
                                  bufferlist *out) {
   ldout(m_image_ctx.cct, 10) << "image header updated" << dendl;
@@ -791,7 +761,7 @@ void ImageWatcher::handle_payload(const RequestLockPayload &payload,
     ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl;
     FunctionContext *ctx = new FunctionContext(
       boost::bind(&ImageWatcher::release_lock, this));
-    m_finisher->queue(ctx);
+    m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
   }
 }
 
@@ -925,7 +895,7 @@ void ImageWatcher::handle_payload(const SnapCreatePayload &payload,
       // cannot notify within a notificiation
       FunctionContext *ctx = new FunctionContext(
        boost::bind(&ImageWatcher::finalize_header_update, this));
-      m_finisher->queue(ctx);
+      m_task_finisher->queue(TASK_CODE_HEADER_UPDATE, ctx);
     }
   }
 }
@@ -975,7 +945,7 @@ void ImageWatcher::handle_error(uint64_t handle, int err) {
 
     FunctionContext *ctx = new FunctionContext(
       boost::bind(&ImageWatcher::reregister_watch, this));
-    m_finisher->queue(ctx);
+    m_task_finisher->queue(TASK_CODE_REREGISTER_WATCH, ctx);
   }
 }
 
@@ -984,13 +954,6 @@ void ImageWatcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
   m_image_ctx.md_ctx.notify_ack(m_image_ctx.header_oid, notify_id, handle, out);
 }
 
-void ImageWatcher::schedule_reregister_watch() {
-  Mutex::Locker l(m_timer_lock);
-  Context *ctx = new FunctionContext(boost::bind(
-    &ImageWatcher::reregister_watch, this));
-  m_timer->add_event_after(RETRY_DELAY_SECONDS, ctx);
-}
-
 void ImageWatcher::reregister_watch() {
   ldout(m_image_ctx.cct, 10) << "re-registering image watch" << dendl;
 
@@ -1015,8 +978,9 @@ void ImageWatcher::reregister_watch() {
                                << cpp_strerror(r) << dendl;
        if (r != -ESHUTDOWN) {
          FunctionContext *ctx = new FunctionContext(boost::bind(
-           &ImageWatcher::schedule_reregister_watch, this));
-         m_finisher->queue(ctx);
+           &ImageWatcher::reregister_watch, this));
+         m_task_finisher->add_event_after(TASK_CODE_REREGISTER_WATCH,
+                                           RETRY_DELAY_SECONDS, ctx);
        }
         return;
       }
index 17573d74daac3d227b47ae1303597ca4212e257f..d159d59a2af4eeda8beb31abb8ccdf8c51cd3d5b 100644 (file)
 #include "include/assert.h"
 
 class entity_name_t;
-class Finisher;
-class SafeTimer;
 
 namespace librbd {
 
   class AioCompletion;
   class ImageCtx;
+  template <typename T> class TaskFinisher;
 
   class ImageWatcher {
   public:
@@ -68,10 +67,44 @@ namespace librbd {
       WATCH_STATE_ERROR
     };
 
+    enum TaskCode {
+      TASK_CODE_ACQUIRED_LOCK,
+      TASK_CODE_REQUEST_LOCK,
+      TASK_CODE_RELEASING_LOCK,
+      TASK_CODE_RELEASED_LOCK,
+      TASK_CODE_RETRY_AIO_REQUESTS,
+      TASK_CODE_CANCEL_ASYNC_REQUESTS,
+      TASK_CODE_HEADER_UPDATE,
+      TASK_CODE_REREGISTER_WATCH,
+      TASK_CODE_ASYNC_REQUEST,
+      TASK_CODE_ASYNC_PROGRESS
+    };
+
     typedef std::pair<Context *, ProgressContext *> AsyncRequest;
     typedef std::pair<boost::function<int(AioCompletion *)>,
                      AioCompletion *> AioRequest;
 
+    class Task {
+    public:
+      Task(TaskCode task_code) : m_task_code(task_code) {}
+      Task(TaskCode task_code, const WatchNotify::AsyncRequestId &id)
+        : m_task_code(task_code), m_async_request_id(id) {}
+
+      inline bool operator<(const Task& rhs) const {
+        if (m_task_code != rhs.m_task_code) {
+          return m_task_code < rhs.m_task_code;
+        } else if ((m_task_code == TASK_CODE_ASYNC_REQUEST ||
+                    m_task_code == TASK_CODE_ASYNC_PROGRESS) &&
+                   m_async_request_id != rhs.m_async_request_id) {
+          return m_async_request_id < rhs.m_async_request_id;
+        }
+        return false;
+      }
+    private:
+      TaskCode m_task_code;
+      WatchNotify::AsyncRequestId m_async_request_id;
+    };
+
     struct WatchCtx : public librados::WatchCtx2 {
       ImageWatcher &image_watcher;
 
@@ -159,19 +192,14 @@ namespace librbd {
 
     LockOwnerState m_lock_owner_state;
 
-    Finisher *m_finisher;
-
-    Mutex m_timer_lock;
-    SafeTimer *m_timer;
+    TaskFinisher<Task> *m_task_finisher;
 
     RWLock m_async_request_lock;
     std::map<WatchNotify::AsyncRequestId, AsyncRequest> m_async_requests;
     std::set<WatchNotify::AsyncRequestId> m_async_pending;
-    std::set<WatchNotify::AsyncRequestId> m_async_progress;
 
     Mutex m_aio_request_lock;
     std::vector<AioRequest> m_aio_requests;
-    Context *m_retry_aio_context;
 
     Mutex m_owner_client_id_lock;
     WatchNotify::ClientId m_owner_client_id;
@@ -187,8 +215,6 @@ namespace librbd {
     void finalize_header_update();
 
     void schedule_retry_aio_requests(bool use_timer);
-    void cancel_retry_aio_requests();
-    void finalize_retry_aio_requests();
     void retry_aio_requests();
 
     void schedule_cancel_async_requests();
@@ -239,7 +265,6 @@ namespace librbd {
     void acknowledge_notify(uint64_t notify_id, uint64_t handle,
                            bufferlist &out);
 
-    void schedule_reregister_watch();
     void reregister_watch();
   };
 
index ffed797760f91b1537bd4ad1d8d8657f5bf8f150..a4b21d97db1313e2c0478e76b3b48426236f243e 100644 (file)
@@ -61,4 +61,5 @@ noinst_HEADERS += \
        librbd/ObjectMap.h \
        librbd/parent_types.h \
        librbd/SnapInfo.h \
+       librbd/TaskFinisher.h \
        librbd/WatchNotifyTypes.h
diff --git a/src/librbd/TaskFinisher.h b/src/librbd/TaskFinisher.h
new file mode 100644 (file)
index 0000000..14dcd30
--- /dev/null
@@ -0,0 +1,141 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef LIBRBD_TASK_FINISHER_H
+#define LIBRBD_TASK_FINISHER_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "common/Finisher.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include <map>
+#include <utility>
+
+class CephContext;
+class Context;
+
+namespace librbd {
+
+template <typename Task>
+class TaskFinisher {
+public:
+  TaskFinisher(CephContext &cct)
+    : m_cct(cct), m_lock("librbd::TaskFinisher::m_lock"),
+      m_finisher(new Finisher(&cct)),
+      m_safe_timer(new SafeTimer(&cct, m_lock, false))
+  {
+    m_finisher->start();
+    m_safe_timer->init();
+  }
+
+  ~TaskFinisher() {
+    {
+      Mutex::Locker l(m_lock);
+      m_safe_timer->shutdown();
+      delete m_safe_timer;
+    }
+
+    m_finisher->stop();
+    delete m_finisher;
+  }
+
+  void cancel(const Task& task) {
+    Mutex::Locker l(m_lock);
+    typename TaskContexts::iterator it = m_task_contexts.find(task);
+    if (it != m_task_contexts.end()) {
+      delete it->second.first;
+      m_task_contexts.erase(it);
+    }
+  }
+
+  void cancel_all() {
+    Mutex::Locker l(m_lock);
+    for (typename TaskContexts::iterator it = m_task_contexts.begin();
+         it != m_task_contexts.end(); ++it) {
+      delete it->second.first;
+    }
+    m_task_contexts.clear();
+  }
+
+  bool add_event_after(const Task& task, double seconds, Context *ctx) {
+    Mutex::Locker l(m_lock);
+    if (m_task_contexts.count(task) != 0) {
+      // task already scheduled on finisher or timer
+      delete ctx;
+      return false;
+    }
+    C_Task *timer_ctx = new C_Task(this, task);
+    m_task_contexts[task] = std::make_pair(ctx, timer_ctx);
+
+    m_safe_timer->add_event_after(seconds, timer_ctx);
+    return true;
+  }
+
+  void queue(Context *ctx) {
+    m_finisher->queue(ctx);
+  }
+
+  bool queue(const Task& task, Context *ctx) {
+    Mutex::Locker l(m_lock);
+    typename TaskContexts::iterator it = m_task_contexts.find(task);
+    if (it != m_task_contexts.end()) {
+      if (it->second.second != NULL) {
+        assert(m_safe_timer->cancel_event(it->second.second));
+        delete it->second.first;
+      } else {
+        // task already scheduled on the finisher
+        delete ctx;
+        return false;
+      }
+    }
+    m_task_contexts[task] = std::make_pair(ctx, reinterpret_cast<Context *>(NULL));
+
+    m_finisher->queue(new C_Task(this, task));
+    return true;
+  }
+
+private:
+  class C_Task : public Context {
+  public:
+    C_Task(TaskFinisher *task_finisher, const Task& task)
+      : m_task_finisher(task_finisher), m_task(task)
+    {
+    }
+  protected:
+    virtual void finish(int r) {
+      m_task_finisher->complete(m_task);
+    }
+  private:
+    TaskFinisher *m_task_finisher;
+    Task m_task;
+  };
+
+  CephContext &m_cct;
+
+  Mutex m_lock;
+  Finisher *m_finisher;
+  SafeTimer *m_safe_timer;
+
+  typedef std::map<Task, std::pair<Context *, Context *> > TaskContexts;
+  TaskContexts m_task_contexts;
+
+  void complete(const Task& task) {
+    Context *ctx = NULL;
+    {
+      Mutex::Locker l(m_lock);
+      typename TaskContexts::iterator it = m_task_contexts.find(task);
+      if (it != m_task_contexts.end()) {
+        ctx = it->second.first;
+        m_task_contexts.erase(it);
+      }
+    }
+
+    if (ctx != NULL) {
+      ctx->complete(0);
+    }
+  }
+};
+
+} // namespace librbd
+
+#endif // LIBRBD_TASK_FINISHER
index 3f54a1c802c1bf753d310158422cb2f7b8871b05..2b3c34b61633ea7e76c1bc0af2ed8466058807db 100644 (file)
@@ -67,6 +67,9 @@ struct AsyncRequestId {
       return request_id < rhs.request_id;
     }
   }
+  inline bool operator!=(const AsyncRequestId &rhs) const {
+    return (client_id != rhs.client_id || request_id != rhs.request_id);
+  }
 };
 
 enum NotifyOp {