From 3496e7772d4947d35a02367cf91613cb888fcc7e Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Thu, 3 Mar 2016 10:17:56 -0500 Subject: [PATCH] librbd: own the lifecycle of the journaler's threads This is the first step in merging the journal threads into the librbd singleton thread pool. Signed-off-by: Jason Dillaman --- src/librbd/Journal.cc | 58 +++++++++++++++++++++++++++- src/librbd/Journal.h | 6 +++ src/test/librbd/test_mock_Journal.cc | 7 ++++ src/tools/rbd/action/Journal.cc | 8 ++-- 4 files changed, 73 insertions(+), 6 deletions(-) diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index c07ba02b9f7..21fc12262b3 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -13,6 +13,8 @@ #include "journal/Journaler.h" #include "journal/ReplayEntry.h" #include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" #define dout_subsys ceph_subsys_rbd #undef dout_prefix @@ -20,6 +22,38 @@ namespace librbd { +namespace { + +// TODO: once journaler is 100% async, remove separate threads and +// reuse ImageCtx's thread pool +class ThreadPoolSingleton : public ThreadPool { +public: + explicit ThreadPoolSingleton(CephContext *cct) + : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) { + start(); + } + virtual ~ThreadPoolSingleton() { + stop(); + } +}; + +class SafeTimerSingleton : public SafeTimer { +public: + Mutex lock; + + explicit SafeTimerSingleton(CephContext *cct) + : SafeTimer(cct, lock, true), + lock("librbd::Journal::SafeTimerSingleton::lock") { + init(); + } + virtual ~SafeTimerSingleton() { + Mutex::Locker locker(lock); + shutdown(); + } +}; + +} // anonymous namespace + using util::create_async_context_callback; using util::create_context_callback; @@ -72,11 +106,30 @@ Journal::Journal(I &image_ctx) m_event_lock("Journal::m_event_lock"), m_event_tid(0), m_blocking_writes(false), m_journal_replay(NULL) { - ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl; + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl; + + ThreadPoolSingleton *thread_pool_singleton; + cct->lookup_or_create_singleton_object( + thread_pool_singleton, "librbd::journal::thread_pool"); + m_work_queue = new ContextWQ("librbd::journal::work_queue", + cct->_conf->rbd_op_thread_timeout, + thread_pool_singleton); + + SafeTimerSingleton *safe_timer_singleton; + cct->lookup_or_create_singleton_object( + safe_timer_singleton, "librbd::journal::safe_timer"); + m_timer = safe_timer_singleton; + m_timer_lock = &safe_timer_singleton->lock; } template Journal::~Journal() { + if (m_work_queue != nullptr) { + m_work_queue->drain(); + delete m_work_queue; + } + assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED); assert(m_journaler == NULL); assert(m_journal_replay == NULL); @@ -560,7 +613,8 @@ void Journal::create_journaler() { assert(m_journaler == NULL); transition_state(STATE_INITIALIZING, 0); - m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "", + m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock, + m_image_ctx.md_ctx, m_image_ctx.id, "", m_image_ctx.journal_commit_age); m_journaler->init(create_async_context_callback( m_image_ctx, create_context_callback< diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index a9f376763f0..2b8aa5f9ecc 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -20,6 +20,8 @@ #include class Context; +class ContextWQ; +class SafeTimer; namespace journal { class Journaler; } @@ -238,6 +240,10 @@ private: } }; + ContextWQ *m_work_queue = nullptr; + SafeTimer *m_timer = nullptr; + Mutex *m_timer_lock = nullptr; + Journaler *m_journaler; mutable Mutex m_lock; State m_state; diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 5c506199d57..706145e6aeb 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -109,6 +109,13 @@ struct MockJournalerProxy { MockJournaler::get_instance().construct(); } + template + MockJournalerProxy(ContextWQ *work_queue, SafeTimer *safe_timer, + Mutex *timer_lock, IoCtxT &header_ioctx, + const std::string &, const std::string &, double) { + MockJournaler::get_instance().construct(); + } + int exists(bool *header_exists) const { return -EINVAL; } diff --git a/src/tools/rbd/action/Journal.cc b/src/tools/rbd/action/Journal.cc index 2c556a6d505..299568484d4 100644 --- a/src/tools/rbd/action/Journal.cc +++ b/src/tools/rbd/action/Journal.cc @@ -205,8 +205,8 @@ public: return 0; } - int shutdown() { - ::journal::Journaler::shutdown(); + int shut_down() { + ::journal::Journaler::shut_down(); int r = unregister_client(); if (r < 0) { @@ -250,7 +250,7 @@ public: } } - r = m_journaler.shutdown(); + r = m_journaler.shut_down(); if (r < 0 && m_r == 0) { m_r = r; } @@ -679,7 +679,7 @@ public: if (r1 < 0 && r == 0) { r = r1; } - r1 = m_journaler.shutdown(); + r1 = m_journaler.shut_down(); if (r1 < 0 && r == 0) { r = r1; } -- 2.39.5