From: Jason Dillaman Date: Tue, 22 Dec 2015 01:27:36 +0000 (-0500) Subject: librbd: asynchronously replay journal events X-Git-Tag: v10.0.3~24^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0fff6830dbcde1eabc7f35e99ac496ef5dc46b75;p=ceph.git librbd: asynchronously replay journal events Properly track when AIO operations are safely committed to disk before marking the journal events are committed. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 6b050556e7497..b30a34a5ef5d5 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -27,23 +27,6 @@ namespace { const std::string CLIENT_DESCRIPTION = "master image"; -template -struct C_ReplayCommitted : public Context { - typedef journal::TypeTraits TypeTraits; - typedef typename TypeTraits::Journaler Journaler; - typedef typename TypeTraits::ReplayEntry ReplayEntry; - - Journaler *journaler; - ReplayEntry replay_entry; - - C_ReplayCommitted(Journaler *journaler, ReplayEntry &&replay_entry) : - journaler(journaler), replay_entry(std::move(replay_entry)) { - } - virtual void finish(int r) { - journaler->committed(replay_entry); - } -}; - } // anonymous namespace template @@ -569,78 +552,100 @@ void Journal::handle_initialized(int r) { template void Journal::handle_replay_ready() { + ReplayEntry replay_entry; + { + Mutex::Locker locker(m_lock); + if (m_state != STATE_REPLAYING) { + return; + } + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + if (!m_journaler->try_pop_front(&replay_entry)) { + return; + } + } + + bufferlist data = replay_entry.get_data(); + bufferlist::iterator it = data.begin(); + Context *on_ready = create_context_callback< + Journal, &Journal::handle_replay_process_ready>(this); + Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry)); + + m_journal_replay->process(&it, on_ready, on_commit); +} + +template +void Journal::handle_replay_complete(int r) { CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << dendl; Mutex::Locker locker(m_lock); if (m_state != STATE_REPLAYING) { return; } - while (true) { - ReplayEntry replay_entry; - if (!m_journaler->try_pop_front(&replay_entry)) { - return; - } - - m_lock.Unlock(); - bufferlist data = replay_entry.get_data(); - bufferlist::iterator it = data.begin(); + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + m_journaler->stop_replay(); + if (r < 0) { + transition_state(STATE_FLUSHING_RESTART, r); - Context *on_commit = new C_ReplayCommitted(m_journaler, - std::move(replay_entry)); - int r = m_journal_replay->process(it, on_commit); - m_lock.Lock(); + m_journal_replay->flush(create_context_callback< + Journal, &Journal::handle_flushing_restart>(this)); + } else { + transition_state(STATE_FLUSHING_REPLAY, 0); - if (r < 0) { - lderr(cct) << "failed to replay journal entry: " << cpp_strerror(r) - << dendl; - delete on_commit; + m_journal_replay->flush(create_context_callback< + Journal, &Journal::handle_flushing_replay>(this)); + } +} - m_journaler->stop_replay(); +template +void Journal::handle_replay_process_ready(int r) { + // journal::Replay is ready for more events -- attempt to pop another + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; - transition_state(STATE_FLUSHING_RESTART, r); - m_journal_replay->flush(create_async_context_callback( - m_image_ctx, create_context_callback< - Journal, &Journal::handle_flushing_restart>(this))); - return; - } - } + assert(r == 0); + handle_replay_ready(); } template -void Journal::handle_replay_complete(int r) { +void Journal::handle_replay_process_safe(ReplayEntry replay_entry, int r) { + Mutex::Locker locker(m_lock); + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + if (r < 0) { + lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r) + << dendl; - { - Mutex::Locker locker(m_lock); - if (m_state != STATE_REPLAYING) { - return; - } + if (m_state == STATE_REPLAYING) { + // abort the replay if we have an error + m_journaler->stop_replay(); + transition_state(STATE_FLUSHING_RESTART, r); - ldout(cct, 20) << this << " " << __func__ << dendl; - m_journaler->stop_replay(); - if (r < 0) { + m_journal_replay->flush(create_context_callback< + Journal, &Journal::handle_flushing_restart>(this)); + return; + } else if (m_state == STATE_FLUSHING_REPLAY) { + // end-of-replay flush in-progress -- we need to restart replay transition_state(STATE_FLUSHING_RESTART, r); - m_journal_replay->flush(create_async_context_callback( - m_image_ctx, create_context_callback< - Journal, &Journal::handle_flushing_restart>(this))); return; } - - transition_state(STATE_FLUSHING_REPLAY, 0); - m_journal_replay->flush(create_async_context_callback( - m_image_ctx, create_context_callback< - Journal, &Journal::handle_flushing_replay>(this))); + } else { + // only commit the entry if written successfully + m_journaler->committed(replay_entry); } } template void Journal::handle_flushing_restart(int r) { + Mutex::Locker locker(m_lock); + CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + ldout(cct, 20) << this << " " << __func__ << dendl; - Mutex::Locker locker(m_lock); + assert(r == 0); assert(m_state == STATE_FLUSHING_RESTART); if (m_close_pending) { destroy_journaler(r); @@ -652,19 +657,19 @@ void Journal::handle_flushing_restart(int r) { template void Journal::handle_flushing_replay(int r) { + Mutex::Locker locker(m_lock); + CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; - Mutex::Locker locker(m_lock); - assert(m_state == STATE_FLUSHING_REPLAY); + assert(r == 0); + assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART); if (m_close_pending) { destroy_journaler(r); return; - } - - if (r < 0) { - lderr(cct) << this << " " << __func__ << ": r=" << r << dendl; - recreate_journaler(r); + } else if (m_state == STATE_FLUSHING_RESTART) { + // failed to replay one-or-more events -- restart + recreate_journaler(0); return; } diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 1740bc6a67616..eda2eb92eed6d 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -12,6 +12,7 @@ #include "include/rados/librados.hpp" #include "common/Mutex.h" #include "journal/Future.h" +#include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" #include #include @@ -21,7 +22,6 @@ class Context; namespace journal { class Journaler; -class ReplayEntry; } namespace librbd { @@ -198,6 +198,18 @@ private: } }; + struct C_ReplayProcessSafe : public Context { + Journal *journal; + ReplayEntry replay_entry; + + C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) : + journal(journal), replay_entry(std::move(replay_entry)) { + } + virtual void finish(int r) { + journal->handle_replay_process_safe(replay_entry, r); + } + }; + struct ReplayHandler : public ::journal::ReplayHandler { Journal *journal; ReplayHandler(Journal *_journal) : journal(_journal) { @@ -250,6 +262,8 @@ private: void handle_replay_ready(); void handle_replay_complete(int r); + void handle_replay_process_ready(int r); + void handle_replay_process_safe(ReplayEntry replay_entry, int r); void handle_flushing_restart(int r); void handle_flushing_replay(int r); diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc index c2b5eabe07412..95a412f519d7f 100644 --- a/src/librbd/journal/Replay.cc +++ b/src/librbd/journal/Replay.cc @@ -2,6 +2,8 @@ // vim: ts=8 sw=2 smarttab #include "librbd/journal/Replay.h" +#include "common/dout.h" +#include "common/errno.h" #include "common/WorkQueue.h" #include "librbd/AioCompletion.h" #include "librbd/AioImageRequest.h" @@ -19,38 +21,45 @@ namespace journal { namespace { +static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32); +static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64); + static NoOpProgressContext no_op_progress_callback; } // anonymous namespace - template Replay::Replay(I &image_ctx) - : m_image_ctx(image_ctx), m_lock("Replay::m_lock"), m_flush_ctx(nullptr), - m_ret_val(0) { + : m_image_ctx(image_ctx), m_lock("Replay::m_lock") { } template Replay::~Replay() { - assert(m_op_contexts.empty() && m_aio_completions.empty()); + assert(m_in_flight_aio == 0); + assert(m_aio_modify_unsafe_contexts.empty()); + assert(m_aio_modify_safe_contexts.empty()); + assert(m_op_contexts.empty()); } template -int Replay::process(bufferlist::iterator it, Context *on_safe) { +void Replay::process(bufferlist::iterator *it, Context *on_ready, + Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; - RWLock::RLocker owner_lock(m_image_ctx.owner_lock); journal::EventEntry event_entry; try { - ::decode(event_entry, it); + ::decode(event_entry, *it); } catch (const buffer::error &err) { lderr(cct) << "failed to decode event entry: " << err.what() << dendl; - return -EINVAL; + on_ready->complete(-EINVAL); + return; } - boost::apply_visitor(EventVisitor(this, on_safe), event_entry.event); - return 0; + Mutex::Locker locker(m_lock); + RWLock::RLocker owner_lock(m_image_ctx.owner_lock); + boost::apply_visitor(EventVisitor(this, on_ready, on_safe), + event_entry.event); } template @@ -65,108 +74,127 @@ void Replay::flush(Context *on_finish) { assert(m_flush_ctx == nullptr); m_flush_ctx = on_finish; - if (!m_op_contexts.empty() || !m_aio_completions.empty()) { + if (m_in_flight_aio != 0) { + flush_aio(); + } + + if (!m_op_contexts.empty() || m_in_flight_aio != 0) { return; } } - on_finish->complete(m_ret_val); + on_finish->complete(0); } template void Replay::handle_event(const journal::AioDiscardEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": AIO discard event" << dendl; - AioCompletion *aio_comp = create_aio_completion(on_safe); + bool flush_required; + AioCompletion *aio_comp = create_aio_modify_completion(on_ready, on_safe, + &flush_required); AioImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset, event.length); + if (flush_required) { + flush_aio(); + } } template void Replay::handle_event(const journal::AioWriteEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": AIO write event" << dendl; bufferlist data = event.data; - AioCompletion *aio_comp = create_aio_completion(on_safe); + bool flush_required; + AioCompletion *aio_comp = create_aio_modify_completion(on_ready, on_safe, + &flush_required); AioImageRequest::aio_write(&m_image_ctx, aio_comp, event.offset, event.length, data.c_str(), 0); + if (flush_required) { + flush_aio(); + } } template void Replay::handle_event(const journal::AioFlushEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl; - AioCompletion *aio_comp = create_aio_completion(on_safe); + AioCompletion *aio_comp = create_aio_flush_completion(on_ready, on_safe); AioImageRequest::aio_flush(&m_image_ctx, aio_comp); } template void Replay::handle_event(const journal::OpFinishEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl; } template void Replay::handle_event(const journal::SnapCreateEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl; + // TODO not-ready until state machine lets us know Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_finish); } template void Replay::handle_event(const journal::SnapRemoveEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl; Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->snap_remove(event.snap_name.c_str(), on_finish); + on_ready->complete(0); } template void Replay::handle_event(const journal::SnapRenameEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl; Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->snap_rename(event.snap_id, event.snap_name.c_str(), on_finish); + on_ready->complete(0); } template void Replay::handle_event(const journal::SnapProtectEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl; Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->snap_protect(event.snap_name.c_str(), on_finish); + on_ready->complete(0); } template void Replay::handle_event(const journal::SnapUnprotectEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event" << dendl; Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), on_finish); + on_ready->complete(0); } template void Replay::handle_event(const journal::SnapRollbackEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event" << dendl; @@ -174,24 +202,27 @@ void Replay::handle_event(const journal::SnapRollbackEvent &event, Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->snap_rollback(event.snap_name.c_str(), no_op_progress_callback, on_finish); + on_ready->complete(0); } template void Replay::handle_event(const journal::RenameEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl; Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->rename(event.image_name.c_str(), on_finish); + on_ready->complete(0); } template void Replay::handle_event(const journal::ResizeEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl; + // TODO not-ready until state machine lets us know Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->resize(event.size, no_op_progress_callback, on_finish); @@ -199,108 +230,186 @@ void Replay::handle_event(const journal::ResizeEvent &event, template void Replay::handle_event(const journal::FlattenEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl; Context *on_finish = create_op_context_callback(on_safe); m_image_ctx.operations->flatten(no_op_progress_callback, on_finish); + on_ready->complete(0); } template void Replay::handle_event(const journal::UnknownEvent &event, - Context *on_safe) { + Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": unknown event" << dendl; + on_ready->complete(0); on_safe->complete(0); } template -Context *Replay::create_op_context_callback(Context *on_safe) { - C_OpOnFinish *on_finish; - { - on_finish = new C_OpOnFinish(this); - m_op_contexts[on_finish] = on_safe; +void Replay::flush_aio() { + assert(m_lock.is_locked()); + + AioCompletion *aio_comp = create_aio_flush_completion(nullptr, nullptr); + AioImageRequest::aio_flush(&m_image_ctx, aio_comp); +} + +template +void Replay::handle_aio_modify_complete(Context *on_safe, int r) { + Mutex::Locker locker(m_lock); + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": on_safe=" << on_safe << ", " + << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "AIO modify op failed: " << cpp_strerror(r) << dendl; + on_safe->complete(r); + return; } - return on_finish; + + // will be completed after next flush operation completes + m_aio_modify_safe_contexts.insert(on_safe); } template -void Replay::handle_op_context_callback(Context *on_op_finish, int r) { - Context *on_safe = nullptr; +void Replay::handle_aio_flush_complete(Context *on_flush_safe, + Contexts &on_safe_ctxs, int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": " << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "AIO flush failed: " << cpp_strerror(r) << dendl; + } + + Context *on_aio_ready = nullptr; Context *on_flush = nullptr; { Mutex::Locker locker(m_lock); - auto it = m_op_contexts.find(on_op_finish); - assert(it != m_op_contexts.end()); + assert(m_in_flight_aio >= on_safe_ctxs.size()); + m_in_flight_aio -= on_safe_ctxs.size(); - if (m_ret_val == 0 && r < 0) { - m_ret_val = r; + std::swap(on_aio_ready, m_on_aio_ready); + if (m_op_contexts.empty() && m_in_flight_aio == 0) { + on_flush = m_flush_ctx; } - on_safe = it->second; - m_op_contexts.erase(it); - if (m_op_contexts.empty() && m_aio_completions.empty()) { - on_flush = m_flush_ctx; + // strip out previously failed on_safe contexts + for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) { + if (m_aio_modify_safe_contexts.erase(*it)) { + ++it; + } else { + it = on_safe_ctxs.erase(it); + } } } - on_safe->complete(r); + if (on_aio_ready != nullptr) { + ldout(cct, 10) << "resuming paused AIO" << dendl; + on_aio_ready->complete(0); + } + for (auto ctx : on_safe_ctxs) { + ctx->complete(r); + } + if (on_flush_safe != nullptr) { + on_flush_safe->complete(r); + } if (on_flush != nullptr) { - on_flush->complete(m_ret_val); + on_flush->complete(r); } } template -AioCompletion *Replay::create_aio_completion(Context *on_safe) { - Mutex::Locker locker(m_lock); - AioCompletion *aio_comp = AioCompletion::create(this, aio_completion_callback, - nullptr); - m_aio_completions.insert(std::pair( - aio_comp, on_safe)); - return aio_comp; +Context *Replay::create_op_context_callback(Context *on_safe) { + assert(m_lock.is_locked()); + + C_OpOnFinish *on_finish; + { + on_finish = new C_OpOnFinish(this); + m_op_contexts[on_finish] = on_safe; + } + return on_finish; } template -void Replay::handle_aio_completion(AioCompletion *aio_comp) { - int r; +void Replay::handle_op_context_callback(Context *on_op_finish, int r) { Context *on_safe = nullptr; Context *on_flush = nullptr; { Mutex::Locker locker(m_lock); - AioCompletions::iterator it = m_aio_completions.find(aio_comp); - assert(it != m_aio_completions.end()); - - r = aio_comp->get_return_value(); - if (m_ret_val == 0 && r < 0) { - m_ret_val = r; - } - - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << ": " - << "aio_comp=" << aio_comp << ", " - << "r=" << r << dendl; + auto it = m_op_contexts.find(on_op_finish); + assert(it != m_op_contexts.end()); on_safe = it->second; - m_aio_completions.erase(it); - if (m_op_contexts.empty() && m_aio_completions.empty()) { + m_op_contexts.erase(it); + if (m_op_contexts.empty() && m_in_flight_aio == 0) { on_flush = m_flush_ctx; } } on_safe->complete(r); if (on_flush != nullptr) { - on_flush->complete(m_ret_val); + on_flush->complete(0); } } template -void Replay::aio_completion_callback(completion_t cb, void *arg) { - Replay *replay = reinterpret_cast(arg); - AioCompletion *aio_comp = reinterpret_cast(cb); +AioCompletion *Replay::create_aio_modify_completion(Context *on_ready, + Context *on_safe, + bool *flush_required) { + CephContext *cct = m_image_ctx.cct; + assert(m_lock.is_locked()); + assert(m_on_aio_ready == nullptr); + + ++m_in_flight_aio; + m_aio_modify_unsafe_contexts.push_back(on_safe); + + // FLUSH if we hit the low-water mark -- on_safe contexts are + // completed by flushes-only so that we don't move the journal + // commit position until safely on-disk + + // when safe, the completion of the next flush will fire the on_safe + // callback + AioCompletion *aio_comp = AioCompletion::create( + new C_AioModifyComplete(this, on_safe)); + + *flush_required = (m_aio_modify_unsafe_contexts.size() == + IN_FLIGHT_IO_LOW_WATER_MARK); + if (*flush_required) { + ldout(cct, 10) << "hit AIO replay low-water mark: scheduling flush" + << dendl; + } - replay->handle_aio_completion(aio_comp); - aio_comp->release(); + // READY for more events if: + // * not at high-water mark for IO + // * in-flight ops are at a consistent point (snap create has IO flushed, + // shrink has adjusted clip boundary, etc) -- should have already been + // flagged not-ready + if (m_in_flight_aio == IN_FLIGHT_IO_HIGH_WATER_MARK) { + ldout(cct, 10) << "hit AIO replay high-water mark: pausing replay" + << dendl; + m_on_aio_ready = on_ready; + } else { + on_ready->complete(0); + } + + return aio_comp; +} + +template +AioCompletion *Replay::create_aio_flush_completion(Context *on_ready, + Context *on_safe) { + // associate all prior write/discard ops to this flush request + AioCompletion *aio_comp = AioCompletion::create( + new C_AioFlushComplete(this, on_safe, + std::move(m_aio_modify_unsafe_contexts))); + m_aio_modify_unsafe_contexts.clear(); + + if (on_ready != nullptr) { + on_ready->complete(0); + } + return aio_comp; } } // namespace journal diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h index 5a8c559866d6f..c71157da04dc0 100644 --- a/src/librbd/journal/Replay.h +++ b/src/librbd/journal/Replay.h @@ -7,12 +7,13 @@ #include "include/int_types.h" #include "include/buffer_fwd.h" #include "include/Context.h" +#include "include/unordered_set.h" #include "include/unordered_map.h" #include "include/rbd/librbd.hpp" #include "common/Mutex.h" #include "librbd/journal/Entries.h" #include -#include +#include namespace librbd { @@ -31,12 +32,13 @@ public: Replay(ImageCtxT &image_ctx); ~Replay(); - int process(bufferlist::iterator it, Context *on_safe = NULL); + void process(bufferlist::iterator *it, Context *on_ready, Context *on_safe); void flush(Context *on_finish); private: + typedef std::list Contexts; + typedef ceph::unordered_set ContextSet; typedef ceph::unordered_map OpContexts; - typedef std::map AioCompletions; struct C_OpOnFinish : public Context { Replay *replay; @@ -47,17 +49,43 @@ private: } }; + struct C_AioModifyComplete : public Context { + Replay *replay; + Context *on_safe; + C_AioModifyComplete(Replay *replay, Context *on_safe) + : replay(replay), on_safe(on_safe) { + } + virtual void finish(int r) { + replay->handle_aio_modify_complete(on_safe, r); + } + }; + + struct C_AioFlushComplete : public Context { + Replay *replay; + Context *on_flush_safe; + Contexts on_safe_ctxs; + C_AioFlushComplete(Replay *replay, Context *on_flush_safe, + Contexts &&on_safe_ctxs) + : replay(replay), on_flush_safe(on_flush_safe), + on_safe_ctxs(on_safe_ctxs) { + } + virtual void finish(int r) { + replay->handle_aio_flush_complete(on_flush_safe, on_safe_ctxs, r); + } + }; + struct EventVisitor : public boost::static_visitor { Replay *replay; + Context *on_ready; Context *on_safe; - EventVisitor(Replay *_replay, Context *_on_safe) - : replay(_replay), on_safe(_on_safe) { + EventVisitor(Replay *_replay, Context *_on_ready, Context *_on_safe) + : replay(_replay), on_ready(_on_ready), on_safe(_on_safe) { } template inline void operator()(const Event &event) const { - replay->handle_event(event, on_safe); + replay->handle_event(event, on_ready, on_safe); } }; @@ -65,33 +93,59 @@ private: Mutex m_lock; + uint64_t m_in_flight_aio = 0; + Contexts m_aio_modify_unsafe_contexts; + ContextSet m_aio_modify_safe_contexts; + OpContexts m_op_contexts; - AioCompletions m_aio_completions; - Context *m_flush_ctx; - int m_ret_val; - - void handle_event(const AioDiscardEvent &event, Context *on_safe); - void handle_event(const AioWriteEvent &event, Context *on_safe); - void handle_event(const AioFlushEvent &event, Context *on_safe); - void handle_event(const OpFinishEvent &event, Context *on_safe); - void handle_event(const SnapCreateEvent &event, Context *on_safe); - void handle_event(const SnapRemoveEvent &event, Context *on_safe); - void handle_event(const SnapRenameEvent &event, Context *on_safe); - void handle_event(const SnapProtectEvent &event, Context *on_safe); - void handle_event(const SnapUnprotectEvent &event, Context *on_safe); - void handle_event(const SnapRollbackEvent &event, Context *on_safe); - void handle_event(const RenameEvent &event, Context *on_safe); - void handle_event(const ResizeEvent &event, Context *on_safe); - void handle_event(const FlattenEvent &event, Context *on_safe); - void handle_event(const UnknownEvent &event, Context *on_safe); + + Context *m_flush_ctx = nullptr; + Context *m_on_aio_ready = nullptr; + + void handle_event(const AioDiscardEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const AioWriteEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const AioFlushEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const OpFinishEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapCreateEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapRemoveEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapRenameEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapProtectEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapUnprotectEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapRollbackEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const RenameEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const ResizeEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const FlattenEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const UnknownEvent &event, Context *on_ready, + Context *on_safe); + + void flush_aio(); + void handle_aio_modify_complete(Context *on_safe, int r); + void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs, + int r); Context *create_op_context_callback(Context *on_safe); void handle_op_context_callback(Context *on_op_finish, int r); - AioCompletion *create_aio_completion(Context *on_safe); + AioCompletion *create_aio_modify_completion(Context *on_ready, + Context *on_safe, + bool *flush_required); + AioCompletion *create_aio_flush_completion(Context *on_ready, + Context *on_safe); void handle_aio_completion(AioCompletion *aio_comp); - static void aio_completion_callback(completion_t cb, void *arg); }; } // namespace journal