]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: own the lifecycle of the journaler's threads
authorJason Dillaman <dillaman@redhat.com>
Thu, 3 Mar 2016 15:17:56 +0000 (10:17 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 8 Mar 2016 12:11:51 +0000 (07:11 -0500)
This is the first step in merging the journal threads into the
librbd singleton thread pool.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/Journal.cc
src/librbd/Journal.h
src/test/librbd/test_mock_Journal.cc
src/tools/rbd/action/Journal.cc

index c07ba02b9f7d1d239cd854d044ce9aca2fdf2310..21fc12262b375d30e018aa71ebf1cf0fbf4beee5 100644 (file)
@@ -13,6 +13,8 @@
 #include "journal/Journaler.h"
 #include "journal/ReplayEntry.h"
 #include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
 
 namespace librbd {
 
+namespace {
+
+// TODO: once journaler is 100% async, remove separate threads and
+// reuse ImageCtx's thread pool
+class ThreadPoolSingleton : public ThreadPool {
+public:
+  explicit ThreadPoolSingleton(CephContext *cct)
+    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+    start();
+  }
+  virtual ~ThreadPoolSingleton() {
+    stop();
+  }
+};
+
+class SafeTimerSingleton : public SafeTimer {
+public:
+  Mutex lock;
+
+  explicit SafeTimerSingleton(CephContext *cct)
+      : SafeTimer(cct, lock, true),
+        lock("librbd::Journal::SafeTimerSingleton::lock") {
+    init();
+  }
+  virtual ~SafeTimerSingleton() {
+    Mutex::Locker locker(lock);
+    shutdown();
+  }
+};
+
+} // anonymous namespace
+
 using util::create_async_context_callback;
 using util::create_context_callback;
 
@@ -72,11 +106,30 @@ Journal<I>::Journal(I &image_ctx)
     m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
     m_blocking_writes(false), m_journal_replay(NULL) {
 
-  ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
+
+  ThreadPoolSingleton *thread_pool_singleton;
+  cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+    thread_pool_singleton, "librbd::journal::thread_pool");
+  m_work_queue = new ContextWQ("librbd::journal::work_queue",
+                               cct->_conf->rbd_op_thread_timeout,
+                               thread_pool_singleton);
+
+  SafeTimerSingleton *safe_timer_singleton;
+  cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+    safe_timer_singleton, "librbd::journal::safe_timer");
+  m_timer = safe_timer_singleton;
+  m_timer_lock = &safe_timer_singleton->lock;
 }
 
 template <typename I>
 Journal<I>::~Journal() {
+  if (m_work_queue != nullptr) {
+    m_work_queue->drain();
+    delete m_work_queue;
+  }
+
   assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
   assert(m_journaler == NULL);
   assert(m_journal_replay == NULL);
@@ -560,7 +613,8 @@ void Journal<I>::create_journaler() {
   assert(m_journaler == NULL);
 
   transition_state(STATE_INITIALIZING, 0);
-  m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
+  m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
+                              m_image_ctx.md_ctx, m_image_ctx.id, "",
                               m_image_ctx.journal_commit_age);
   m_journaler->init(create_async_context_callback(
     m_image_ctx, create_context_callback<
index a9f376763f0f0049b786e3a62e485f1607cc7112..2b8aa5f9ecc9c8b54e61cbc0267c43df2a7b9999 100644 (file)
@@ -20,6 +20,8 @@
 #include <unordered_map>
 
 class Context;
+class ContextWQ;
+class SafeTimer;
 namespace journal {
 class Journaler;
 }
@@ -238,6 +240,10 @@ private:
     }
   };
 
+  ContextWQ *m_work_queue = nullptr;
+  SafeTimer *m_timer = nullptr;
+  Mutex *m_timer_lock = nullptr;
+
   Journaler *m_journaler;
   mutable Mutex m_lock;
   State m_state;
index 5c506199d570dab81bc06e7f160de148d4c8da03..706145e6aeb924cfca6769f8d354564ba7dbb5a1 100644 (file)
@@ -109,6 +109,13 @@ struct MockJournalerProxy {
     MockJournaler::get_instance().construct();
   }
 
+  template <typename IoCtxT>
+  MockJournalerProxy(ContextWQ *work_queue, SafeTimer *safe_timer,
+                     Mutex *timer_lock, IoCtxT &header_ioctx,
+                     const std::string &, const std::string &, double) {
+    MockJournaler::get_instance().construct();
+  }
+
   int exists(bool *header_exists) const {
     return -EINVAL;
   }
index 2c556a6d50593e5be53473c29b2b7f524d1febc9..299568484d4b7defab9b5792f60b6daed88c8715 100644 (file)
@@ -205,8 +205,8 @@ public:
     return 0;
   }
 
-  int shutdown() {
-    ::journal::Journaler::shutdown();
+  int shut_down() {
+    ::journal::Journaler::shut_down();
 
     int r = unregister_client();
     if (r < 0) {
@@ -250,7 +250,7 @@ public:
       }
     }
 
-    r = m_journaler.shutdown();
+    r = m_journaler.shut_down();
     if (r < 0 && m_r == 0) {
       m_r = r;
     }
@@ -679,7 +679,7 @@ public:
     if (r1 < 0 && r == 0) {
       r = r1;
     }
-    r1 = m_journaler.shutdown();
+    r1 = m_journaler.shut_down();
     if (r1 < 0 && r == 0) {
       r = r1;
     }