From 1192e9c34334b7fb2501c37170a44b3f57eb116e Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 15 Dec 2015 13:20:48 -0500 Subject: [PATCH] librbd: journal replay flush should be async Signed-off-by: Jason Dillaman --- src/librbd/Journal.cc | 168 +++++++++++++++++++++-------------- src/librbd/Journal.h | 107 ++++++++++------------ src/librbd/journal/Replay.cc | 55 +++++++----- src/librbd/journal/Replay.h | 7 +- 4 files changed, 189 insertions(+), 148 deletions(-) diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 44a37d0db6aa9..54d071c79ea50 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -22,6 +22,9 @@ namespace librbd { +using util::create_async_context_callback; +using util::create_context_callback; + namespace { const std::string CLIENT_DESCRIPTION = "master image"; @@ -47,12 +50,16 @@ struct SetOpRequestTid : public boost::static_visitor { } }; +template struct C_ReplayCommitted : public Context { - ::journal::Journaler *journaler; - ::journal::ReplayEntry replay_entry; + typedef journal::TypeTraits TypeTraits; + typedef typename TypeTraits::Journaler Journaler; + typedef typename TypeTraits::ReplayEntry ReplayEntry; + + Journaler *journaler; + ReplayEntry replay_entry; - C_ReplayCommitted(::journal::Journaler *journaler, - ::journal::ReplayEntry &&replay_entry) : + C_ReplayCommitted(Journaler *journaler, ReplayEntry &&replay_entry) : journaler(journaler), replay_entry(std::move(replay_entry)) { } virtual void finish(int r) { @@ -75,9 +82,15 @@ std::ostream &operator<<(std::ostream &os, case Journal::STATE_REPLAYING: os << "Replaying"; break; + case Journal::STATE_FLUSHING_RESTART: + os << "FlushingRestart"; + break; case Journal::STATE_RESTARTING_REPLAY: os << "RestartingReplay"; break; + case Journal::STATE_FLUSHING_REPLAY: + os << "FlushingReplay"; + break; case Journal::STATE_READY: os << "Ready"; break; @@ -144,8 +157,7 @@ int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id, pool_id = data_io_ctx.get_id(); } - ::journal::Journaler journaler(io_ctx, image_id, "", - cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); int r = journaler.create(order, splay_width, pool_id); if (r < 0) { @@ -166,8 +178,7 @@ int Journal::remove(librados::IoCtx &io_ctx, const std::string &image_id) { CephContext *cct = reinterpret_cast(io_ctx.cct()); ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; - ::journal::Journaler journaler(io_ctx, image_id, "", - cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); bool journal_exists; int r = journaler.exists(&journal_exists); @@ -202,8 +213,7 @@ int Journal::reset(librados::IoCtx &io_ctx, const std::string &image_id) { CephContext *cct = reinterpret_cast(io_ctx.cct()); ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; - ::journal::Journaler journaler(io_ctx, image_id, "", - cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); C_SaferCond cond; journaler.init(&cond); @@ -308,7 +318,7 @@ uint64_t Journal::append_io_event(AioCompletion *aio_comp, bufferlist bl; ::encode(event_entry, bl); - ::journal::Future future; + Future future; uint64_t tid; { Mutex::Locker locker(m_lock); @@ -442,7 +452,7 @@ void Journal::flush_event(uint64_t tid, Context *on_safe) { ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", " << "on_safe=" << on_safe << dendl; - ::journal::Future future; + Future future; { Mutex::Locker event_locker(m_event_lock); future = wait_event(m_lock, tid, on_safe); @@ -464,8 +474,8 @@ void Journal::wait_event(uint64_t tid, Context *on_safe) { } template -::journal::Future Journal::wait_event(Mutex &lock, uint64_t tid, - Context *on_safe) { +typename Journal::Future Journal::wait_event(Mutex &lock, uint64_t tid, + Context *on_safe) { assert(m_event_lock.is_locked()); CephContext *cct = m_image_ctx.cct; @@ -474,7 +484,7 @@ template // journal entry already safe ldout(cct, 20) << "journal entry already safe" << dendl; m_image_ctx.op_work_queue->queue(on_safe, 0); - return ::journal::Future(); + return Future(); } Event &event = it->second; @@ -492,9 +502,11 @@ void Journal::create_journaler() { assert(m_journaler == NULL); transition_state(STATE_INITIALIZING, 0); - m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "", - m_image_ctx.journal_commit_age); - m_journaler->init(new C_InitJournal(this)); + m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "", + m_image_ctx.journal_commit_age); + m_journaler->init(create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_initialized>(this))); } template @@ -508,7 +520,8 @@ void Journal::destroy_journaler(int r) { m_journal_replay = NULL; transition_state(STATE_CLOSING, r); - m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0); + m_image_ctx.op_work_queue->queue(create_context_callback< + Journal, &Journal::handle_journal_destroyed>(this), 0); } template @@ -517,13 +530,15 @@ void Journal::recreate_journaler(int r) { ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; assert(m_lock.is_locked()); - assert(m_state == STATE_REPLAYING); + assert(m_state == STATE_FLUSHING_RESTART || + m_state == STATE_FLUSHING_REPLAY); delete m_journal_replay; m_journal_replay = NULL; transition_state(STATE_RESTARTING_REPLAY, r); - m_image_ctx.op_work_queue->queue(new C_DestroyJournaler(this), 0); + m_image_ctx.op_work_queue->queue(create_context_callback< + Journal, &Journal::handle_journal_destroyed>(this), 0); } template @@ -572,7 +587,7 @@ void Journal::handle_replay_ready() { } while (true) { - ::journal::ReplayEntry replay_entry; + ReplayEntry replay_entry; if (!m_journaler->try_pop_front(&replay_entry)) { return; } @@ -580,21 +595,24 @@ void Journal::handle_replay_ready() { m_lock.Unlock(); bufferlist data = replay_entry.get_data(); bufferlist::iterator it = data.begin(); - int r = m_journal_replay->process( - it, new C_ReplayCommitted(m_journaler, std::move(replay_entry))); + + Context *on_commit = new C_ReplayCommitted(m_journaler, + std::move(replay_entry)); + int r = m_journal_replay->process(it, on_commit); m_lock.Lock(); if (r < 0) { lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r) << dendl; - m_journaler->stop_replay(); + delete on_commit; - if (m_close_pending) { - destroy_journaler(r); - return; - } + m_journaler->stop_replay(); - recreate_journaler(r); + transition_state(STATE_FLUSHING_RESTART, r); + m_journal_replay->flush(create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_flushing_restart>(this))); + return; } } } @@ -611,31 +629,62 @@ void Journal::handle_replay_complete(int r) { ldout(cct, 20) << this << " " << __func__ << dendl; m_journaler->stop_replay(); - - if (r == 0) { - r = m_journal_replay->flush(); - } - if (r < 0) { - lderr(cct) << this << " " << __func__ << ": r=" << r << dendl; - recreate_journaler(r); + transition_state(STATE_FLUSHING_RESTART, r); + m_journal_replay->flush(create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_flushing_restart>(this))); return; } - delete m_journal_replay; - m_journal_replay = NULL; + transition_state(STATE_FLUSHING_REPLAY, 0); + m_journal_replay->flush(create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_flushing_replay>(this))); + } +} - if (m_close_pending) { - destroy_journaler(0); - return; - } +template +void Journal::handle_flushing_restart(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + assert(m_state == STATE_FLUSHING_RESTART); + if (m_close_pending) { + destroy_journaler(r); + return; + } + + recreate_journaler(r); +} - m_error_result = 0; - m_journaler->start_append(m_image_ctx.journal_object_flush_interval, - m_image_ctx.journal_object_flush_bytes, - m_image_ctx.journal_object_flush_age); - transition_state(STATE_READY, 0); +template +void Journal::handle_flushing_replay(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + assert(m_state == STATE_FLUSHING_REPLAY); + if (m_close_pending) { + destroy_journaler(r); + return; } + + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": r=" << r << dendl; + recreate_journaler(r); + return; + } + + delete m_journal_replay; + m_journal_replay = NULL; + + m_error_result = 0; + m_journaler->start_append(m_image_ctx.journal_object_flush_interval, + m_image_ctx.journal_object_flush_bytes, + m_image_ctx.journal_object_flush_age); + transition_state(STATE_READY, 0); } template @@ -730,25 +779,8 @@ void Journal::stop_recording() { transition_state(STATE_STOPPING, 0); m_journaler->stop_append(util::create_async_context_callback( - m_image_ctx, new C_StopRecording(this))); -} - -template -void Journal::block_writes() { - assert(m_lock.is_locked()); - if (!m_blocking_writes) { - m_blocking_writes = true; - m_image_ctx.aio_work_queue->block_writes(); - } -} - -template -void Journal::unblock_writes() { - assert(m_lock.is_locked()); - if (m_blocking_writes) { - m_blocking_writes = false; - m_image_ctx.aio_work_queue->unblock_writes(); - } + m_image_ctx, create_context_callback< + Journal, &Journal::handle_recording_stopped>(this))); } template @@ -780,7 +812,9 @@ bool Journal::is_steady_state() const { case STATE_UNINITIALIZED: case STATE_INITIALIZING: case STATE_REPLAYING: + case STATE_FLUSHING_RESTART: case STATE_RESTARTING_REPLAY: + case STATE_FLUSHING_REPLAY: case STATE_STOPPING: case STATE_CLOSING: break; diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 80c0babbd623c..934b947372b77 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -20,6 +20,7 @@ class Context; namespace journal { class Journaler; +class ReplayEntry; } namespace librbd { @@ -29,9 +30,19 @@ class AioObjectRequest; class ImageCtx; namespace journal { + class EventEntry; template class Replay; -} + +template +struct TypeTraits { + typedef ::journal::Journaler Journaler; + typedef ::journal::Future Future; + typedef ::journal::ReplayEntry ReplayEntry; +}; + +} // namespace journal + template class Journal { @@ -42,21 +53,25 @@ public: * * | * v - * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> READY - * | * . ^ * . | - * | * . | * . | - * | * . | (error) * . . . . | - * | * . | * . | - * | * . | v . v - * | * . | RESTARTING . STOPPING - * | * . | | . | - * | * . | | . | - * | * * * * * * . \-------------/ . | - * | * (error) . . | - * | * . . . . . . . . . . . . . | - * | * . . | - * | v v v | - * | CLOSED <----- CLOSING <-------------------------/ + * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY + * | * . ^ * . * | + * | * . | * . * | + * | * . | (error) * . . . . . . . * | + * | * . | * . * | + * | * . | v . * | + * | * . | FLUSHING_RESTART . * | + * | * . | | . * | + * | * . | | . * | + * | * . | v . * v + * | * . | RESTARTING < * * * * * STOPPING + * | * . | | . | + * | * . | | . | + * | * * * * * * . \-------------/ . | + * | * (error) . . | + * | * . . . . . . . . . . . . . . . . | + * | * . . | + * | v v v | + * | CLOSED <----- CLOSING <---------------------------------------/ * | | * | v * \---> @@ -67,7 +82,9 @@ public: STATE_UNINITIALIZED, STATE_INITIALIZING, STATE_REPLAYING, + STATE_FLUSHING_RESTART, STATE_RESTARTING_REPLAY, + STATE_FLUSHING_REPLAY, STATE_READY, STATE_STOPPING, STATE_CLOSING, @@ -110,11 +127,19 @@ public: void wait_event(uint64_t tid, Context *on_safe); private: + ImageCtxT &m_image_ctx; + + // mock unit testing support + typedef journal::TypeTraits TypeTraits; + typedef typename TypeTraits::Journaler Journaler; + typedef typename TypeTraits::Future Future; + typedef typename TypeTraits::ReplayEntry ReplayEntry; + typedef std::list Contexts; typedef interval_set ExtentInterval; struct Event { - ::journal::Future future; + Future future; AioCompletion *aio_comp; AioObjectRequests aio_object_requests; Contexts on_safe_contexts; @@ -124,7 +149,7 @@ private: Event() : aio_comp(NULL) { } - Event(const ::journal::Future &_future, AioCompletion *_aio_comp, + Event(const Future &_future, AioCompletion *_aio_comp, const AioObjectRequests &_requests, uint64_t offset, size_t length) : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests), safe(false), ret_val(0) { @@ -133,40 +158,8 @@ private: } } }; - typedef ceph::unordered_map Events; - - struct C_InitJournal : public Context { - Journal *journal; - - C_InitJournal(Journal *_journal) : journal(_journal) { - } - - virtual void finish(int r) { - journal->handle_initialized(r); - } - }; - - struct C_StopRecording : public Context { - Journal *journal; - - C_StopRecording(Journal *_journal) : journal(_journal) { - } - - virtual void finish(int r) { - journal->handle_recording_stopped(r); - } - }; - struct C_DestroyJournaler : public Context { - Journal *journal; - - C_DestroyJournaler(Journal *_journal) : journal(_journal) { - } - - virtual void finish(int r) { - journal->handle_journal_destroyed(r); - } - }; + typedef ceph::unordered_map Events; struct C_EventSafe : public Context { Journal *journal; @@ -201,9 +194,7 @@ private: } }; - ImageCtxT &m_image_ctx; - - ::journal::Journaler *m_journaler; + Journaler *m_journaler; mutable Mutex m_lock; State m_state; @@ -221,7 +212,7 @@ private: journal::Replay *m_journal_replay; - ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe); + Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe); void create_journaler(); void destroy_journaler(int r); @@ -234,6 +225,9 @@ private: void handle_replay_ready(); void handle_replay_complete(int r); + void handle_flushing_restart(int r); + void handle_flushing_replay(int r); + void handle_recording_stopped(int r); void handle_journal_destroyed(int r); @@ -242,9 +236,6 @@ private: void stop_recording(); - void block_writes(); - void unblock_writes(); - void transition_state(State state, int r); bool is_steady_state() const; diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc index 01847f02bd9ff..23172179e1804 100644 --- a/src/librbd/journal/Replay.cc +++ b/src/librbd/journal/Replay.cc @@ -16,7 +16,8 @@ namespace journal { template Replay::Replay(I &image_ctx) - : m_image_ctx(image_ctx), m_lock("Replay::m_lock"), m_ret_val(0) { + : m_image_ctx(image_ctx), m_lock("Replay::m_lock"), m_flush_ctx(nullptr), + m_ret_val(0) { } template @@ -43,15 +44,20 @@ int Replay::process(bufferlist::iterator it, Context *on_safe) { } template -int Replay::flush() { +void Replay::flush(Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; - Mutex::Locker locker(m_lock); - while (!m_aio_completions.empty()) { - m_cond.Wait(m_lock); + { + Mutex::Locker locker(m_lock); + assert(m_flush_ctx == nullptr); + m_flush_ctx = on_finish; + + if (!m_aio_completions.empty()) { + return; + } } - return m_ret_val; + on_finish->complete(m_ret_val); } template @@ -179,27 +185,36 @@ AioCompletion *Replay::create_aio_completion(Context *on_safe) { template void Replay::handle_aio_completion(AioCompletion *aio_comp) { - Mutex::Locker locker(m_lock); + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); - AioCompletions::iterator it = m_aio_completions.find(aio_comp); - assert(it != m_aio_completions.end()); + AioCompletions::iterator it = m_aio_completions.find(aio_comp); + assert(it != m_aio_completions.end()); - int r = aio_comp->get_return_value(); + int r = aio_comp->get_return_value(); - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << ": aio_comp=" << aio_comp << ", " - << "r=" << r << dendl; + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": " + << "aio_comp=" << aio_comp << ", " + << "r=" << r << dendl; + + Context *on_safe = it->second; + on_safe->complete(r); - Context *on_safe = it->second; - on_safe->complete(r); + if (r < 0 && m_ret_val == 0) { + m_ret_val = r; + } - if (r < 0 && m_ret_val == 0) { - m_ret_val = r; + m_aio_completions.erase(it); + if (m_aio_completions.empty()) { + on_finish = m_flush_ctx; + } } - m_aio_completions.erase(it); - if (m_aio_completions.empty()) - m_cond.Signal(); + if (on_finish != nullptr) { + on_finish->complete(m_ret_val); + } } template diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h index 64882844bb5d2..3eeb452648ad4 100644 --- a/src/librbd/journal/Replay.h +++ b/src/librbd/journal/Replay.h @@ -7,12 +7,13 @@ #include "include/int_types.h" #include "include/buffer_fwd.h" #include "include/rbd/librbd.hpp" -#include "common/Cond.h" #include "common/Mutex.h" #include "librbd/journal/Entries.h" #include #include +class Context; + namespace librbd { class AioCompletion; @@ -30,7 +31,7 @@ public: ~Replay(); int process(bufferlist::iterator it, Context *on_safe = NULL); - int flush(); + void flush(Context *on_finish); private: typedef std::map AioCompletions; @@ -52,9 +53,9 @@ private: ImageCtxT &m_image_ctx; Mutex m_lock; - Cond m_cond; AioCompletions m_aio_completions; + Context *m_flush_ctx; int m_ret_val; Replay(ImageCtxT &image_ctx); -- 2.39.5