#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
namespace librbd {
+namespace {
+
+// TODO: once journaler is 100% async, remove separate threads and
+// reuse ImageCtx's thread pool
+class ThreadPoolSingleton : public ThreadPool {
+public:
+ explicit ThreadPoolSingleton(CephContext *cct)
+ : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+ start();
+ }
+ virtual ~ThreadPoolSingleton() {
+ stop();
+ }
+};
+
+class SafeTimerSingleton : public SafeTimer {
+public:
+ Mutex lock;
+
+ explicit SafeTimerSingleton(CephContext *cct)
+ : SafeTimer(cct, lock, true),
+ lock("librbd::Journal::SafeTimerSingleton::lock") {
+ init();
+ }
+ virtual ~SafeTimerSingleton() {
+ Mutex::Locker locker(lock);
+ shutdown();
+ }
+};
+
+} // anonymous namespace
+
using util::create_async_context_callback;
using util::create_context_callback;
m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
m_blocking_writes(false), m_journal_replay(NULL) {
- ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
+
+ ThreadPoolSingleton *thread_pool_singleton;
+ cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+ thread_pool_singleton, "librbd::journal::thread_pool");
+ m_work_queue = new ContextWQ("librbd::journal::work_queue",
+ cct->_conf->rbd_op_thread_timeout,
+ thread_pool_singleton);
+
+ SafeTimerSingleton *safe_timer_singleton;
+ cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+ safe_timer_singleton, "librbd::journal::safe_timer");
+ m_timer = safe_timer_singleton;
+ m_timer_lock = &safe_timer_singleton->lock;
}
template <typename I>
Journal<I>::~Journal() {
+ if (m_work_queue != nullptr) {
+ m_work_queue->drain();
+ delete m_work_queue;
+ }
+
assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
assert(m_journaler == NULL);
assert(m_journal_replay == NULL);
assert(m_journaler == NULL);
transition_state(STATE_INITIALIZING, 0);
- m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
+ m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
+ m_image_ctx.md_ctx, m_image_ctx.id, "",
m_image_ctx.journal_commit_age);
m_journaler->init(create_async_context_callback(
m_image_ctx, create_context_callback<