From: Jason Dillaman Date: Thu, 24 Mar 2016 19:56:46 +0000 (-0400) Subject: rbd-mirror: framework for replay allocating tags in local journal X-Git-Tag: v10.1.1~64^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f42e7610fb67dced9d290c9b01c3fd22a19a3e72;p=ceph.git rbd-mirror: framework for replay allocating tags in local journal Signed-off-by: Jason Dillaman --- diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 07334f979895..14564cf206f9 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -9,7 +9,6 @@ #include "common/Timer.h" #include "common/WorkQueue.h" #include "journal/Journaler.h" -#include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" @@ -62,22 +61,6 @@ struct ReplayHandler : public ::journal::ReplayHandler { } }; -template -struct C_ReplayCommitted : public Context { - typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; - - ImageReplayer *replayer; - ReplayEntry replay_entry; - - C_ReplayCommitted(ImageReplayer *replayer, - ReplayEntry &&replay_entry) - : replayer(replayer), replay_entry(std::move(replay_entry)) { - } - virtual void finish(int r) { - replayer->handle_replay_committed(&replay_entry, r); - } -}; - class ImageReplayerAdminSocketCommand { public: virtual ~ImageReplayerAdminSocketCommand() {} @@ -194,7 +177,6 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot m_local_replay(nullptr), m_remote_journaler(nullptr), m_replay_handler(nullptr), - m_on_finish(nullptr), m_asok_hook(nullptr) { } @@ -206,7 +188,8 @@ ImageReplayer::~ImageReplayer() assert(m_local_replay == nullptr); assert(m_remote_journaler == nullptr); assert(m_replay_handler == nullptr); - + assert(m_on_start_finish == nullptr); + assert(m_on_stop_finish == nullptr); delete m_asok_hook; } @@ -214,17 +197,16 @@ template void ImageReplayer::start(Context *on_finish, const BootstrapParams *bootstrap_params) { - dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish - << dendl; + assert(m_on_start_finish == nullptr); + assert(m_on_stop_finish == nullptr); + dout(20) << "on_finish=" << on_finish << dendl; { Mutex::Locker locker(m_lock); assert(is_stopped_()); m_state = STATE_STARTING; - - assert(m_on_finish == nullptr); - m_on_finish = on_finish; + m_on_start_finish = on_finish; } int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx); @@ -344,18 +326,18 @@ void ImageReplayer::start_replay() { Context *on_finish(nullptr); { Mutex::Locker locker(m_lock); - if (m_state == STATE_STOPPING) { + if (m_stop_requested) { on_start_fail_start(-EINTR); return; } assert(m_state == STATE_STARTING); m_state = STATE_REPLAYING; - std::swap(m_on_finish, on_finish); + std::swap(m_on_start_finish, on_finish); } dout(20) << "start succeeded" << dendl; - if (on_finish) { + if (on_finish != nullptr) { dout(20) << "on finish complete, r=" << r << dendl; on_finish->complete(r); } @@ -408,25 +390,31 @@ void ImageReplayer::on_start_fail_finish(int r) m_local_ioctx.close(); m_remote_ioctx.close(); - Context *on_finish(nullptr); - + Context *on_start_finish(nullptr); + Context *on_stop_finish(nullptr); { Mutex::Locker locker(m_lock); - if (m_state == STATE_STOPPING) { + if (m_stop_requested) { assert(r == -EINTR); dout(20) << "start interrupted" << dendl; m_state = STATE_STOPPED; + m_stop_requested = false; } else { assert(m_state == STATE_STARTING); dout(20) << "start failed" << dendl; m_state = STATE_UNINITIALIZED; } - std::swap(m_on_finish, on_finish); + std::swap(m_on_start_finish, on_start_finish); + std::swap(m_on_stop_finish, on_stop_finish); } - if (on_finish) { - dout(20) << "on finish complete, r=" << r << dendl; - on_finish->complete(r); + if (on_start_finish != nullptr) { + dout(20) << "on start finish complete, r=" << r << dendl; + on_start_finish->complete(r); + } + if (on_stop_finish != nullptr) { + dout(20) << "on stop finish complete, r=" << r << dendl; + on_stop_finish->complete(0); } } @@ -434,13 +422,11 @@ template bool ImageReplayer::on_start_interrupted() { Mutex::Locker locker(m_lock); - - if (m_state == STATE_STARTING) { + assert(m_state == STATE_STARTING); + if (m_on_stop_finish == nullptr) { return false; } - assert(m_state == STATE_STOPPING); - on_start_fail_start(-EINTR); return true; } @@ -448,48 +434,32 @@ bool ImageReplayer::on_start_interrupted() template void ImageReplayer::stop(Context *on_finish) { - dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish - << dendl; + dout(20) << "on_finish=" << on_finish << dendl; - Mutex::Locker locker(m_lock); - assert(is_running_()); - - if (m_state == STATE_STARTING) { - dout(20) << "interrupting start" << dendl; - - if (on_finish) { - Context *on_start_finish = m_on_finish; - FunctionContext *ctx = new FunctionContext( - [this, on_start_finish, on_finish](int r) { - if (on_start_finish) { - on_start_finish->complete(r); - } - on_finish->complete(0); - }); - - m_on_finish = ctx; - } - } else if (m_state == STATE_FLUSHING_REPLAY) { - dout(20) << "interrupting flush" << dendl; - - if (on_finish) { - Context *on_flush_finish = m_on_finish; - FunctionContext *ctx = new FunctionContext( - [this, on_flush_finish, on_finish](int r) { - if (on_flush_finish) { - on_flush_finish->complete(r); - } - on_finish->complete(0); - }); - - m_on_finish = ctx; + bool shut_down_replay = false; + { + Mutex::Locker locker(m_lock); + assert(is_running_()); + + if (!is_stopped_()) { + if (m_state == STATE_STARTING) { + dout(20) << "interrupting start" << dendl; + } else { + dout(20) << "interrupting replay" << dendl; + shut_down_replay = true; + } + + assert(m_on_stop_finish == nullptr); + std::swap(m_on_stop_finish, on_finish); + m_stop_requested = true; } - } else { - assert(m_on_finish == nullptr); - m_on_finish = on_finish; + } + + if (shut_down_replay) { on_stop_journal_replay_shut_down_start(); + } else if (on_finish != nullptr) { + on_finish->complete(0); } - m_state = STATE_STOPPING; } template @@ -502,20 +472,32 @@ void ImageReplayer::on_stop_journal_replay_shut_down_start() on_stop_journal_replay_shut_down_finish(r); }); - m_local_replay->shut_down(false, ctx); + { + Mutex::Locker locker(m_lock); + + // as we complete in-flight records, we might receive multiple stop requests + if (m_state != STATE_REPLAYING) { + return; + } + m_state = STATE_STOPPING; + m_local_replay->shut_down(false, ctx); + } } template void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r) { dout(20) << "r=" << r << dendl; - if (r < 0) { derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; } - m_local_image_ctx->journal->stop_external_replay(); - m_local_replay = nullptr; + { + Mutex::Locker locker(m_lock); + assert(m_state == STATE_STOPPING); + m_local_image_ctx->journal->stop_external_replay(); + m_local_replay = nullptr; + } on_stop_local_image_close_start(); } @@ -561,13 +543,13 @@ void ImageReplayer::on_stop_local_image_close_finish(int r) assert(m_state == STATE_STOPPING); m_state = STATE_STOPPED; - - std::swap(m_on_finish, on_finish); + m_stop_requested = false; + std::swap(m_on_stop_finish, on_finish); } dout(20) << "stop complete" << dendl; - if (on_finish) { + if (on_finish != nullptr) { dout(20) << "on finish complete, r=" << r << dendl; on_finish->complete(r); } @@ -583,20 +565,16 @@ template void ImageReplayer::handle_replay_ready() { dout(20) << "enter" << dendl; - - ReplayEntry replay_entry; - if (!m_remote_journaler->try_pop_front(&replay_entry)) { + if (on_replay_interrupted()) { return; } - dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl; + if (!m_remote_journaler->try_pop_front(&m_replay_entry)) { + return; + } - bufferlist data = replay_entry.get_data(); - bufferlist::iterator it = data.begin(); - Context *on_ready = create_context_callback< - ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this); - Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry)); - m_local_replay->process(&it, on_ready, on_commit); + // TODO + process_entry(); } template @@ -604,127 +582,89 @@ void ImageReplayer::flush(Context *on_finish) { dout(20) << "enter" << dendl; - bool start_flush = false; - { Mutex::Locker locker(m_lock); - - if (m_state == STATE_REPLAYING) { - assert(m_on_finish == nullptr); - m_on_finish = on_finish; - - m_state = STATE_FLUSHING_REPLAY; - - start_flush = true; + if (m_state == STATE_REPLAYING || m_state == STATE_REPLAYING) { + Context *ctx = new FunctionContext( + [on_finish](int r) { + if (on_finish != nullptr) { + on_finish->complete(r); + } + }); + on_flush_local_replay_flush_start(ctx); } } - if (start_flush) { - on_flush_local_replay_flush_start(); - } else if (on_finish) { + if (on_finish) { on_finish->complete(0); } } template -void ImageReplayer::on_flush_local_replay_flush_start() +void ImageReplayer::on_flush_local_replay_flush_start(Context *on_flush) { dout(20) << "enter" << dendl; - FunctionContext *ctx = new FunctionContext( - [this](int r) { - on_flush_local_replay_flush_finish(r); + [this, on_flush](int r) { + on_flush_local_replay_flush_finish(on_flush, r); }); + assert(m_lock.is_locked()); + assert(m_state == STATE_REPLAYING); m_local_replay->flush(ctx); } template -void ImageReplayer::on_flush_local_replay_flush_finish(int r) +void ImageReplayer::on_flush_local_replay_flush_finish(Context *on_flush, + int r) { dout(20) << "r=" << r << dendl; - if (r < 0) { derr << "error flushing local replay: " << cpp_strerror(r) << dendl; - } - - if (on_flush_interrupted()) { + on_flush->complete(r); return; } - on_flush_flush_commit_position_start(r); + on_flush_flush_commit_position_start(on_flush); } template -void ImageReplayer::on_flush_flush_commit_position_start(int last_r) +void ImageReplayer::on_flush_flush_commit_position_start(Context *on_flush) { - FunctionContext *ctx = new FunctionContext( - [this, last_r](int r) { - on_flush_flush_commit_position_finish(last_r, r); + [this, on_flush](int r) { + on_flush_flush_commit_position_finish(on_flush, r); }); m_remote_journaler->flush_commit_position(ctx); } template -void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r) +void ImageReplayer::on_flush_flush_commit_position_finish(Context *on_flush, + int r) { if (r < 0) { derr << "error flushing remote journal commit position: " << cpp_strerror(r) << dendl; - } else { - r = last_r; - } - - Context *on_finish(nullptr); - - { - Mutex::Locker locker(m_lock); - if (m_state == STATE_STOPPING) { - r = -EINTR; - } else { - assert(m_state == STATE_FLUSHING_REPLAY); - - m_state = STATE_REPLAYING; - } - std::swap(m_on_finish, on_finish); } dout(20) << "flush complete, r=" << r << dendl; - - if (on_finish) { - dout(20) << "on finish complete, r=" << r << dendl; - on_finish->complete(r); - } + on_flush->complete(r); } template -bool ImageReplayer::on_flush_interrupted() +bool ImageReplayer::on_replay_interrupted() { - Context *on_finish(nullptr); - + bool shut_down; { Mutex::Locker locker(m_lock); - - if (m_state == STATE_FLUSHING_REPLAY) { - return false; - } - - assert(m_state == STATE_STOPPING); - - std::swap(m_on_finish, on_finish); + shut_down = m_stop_requested; } - dout(20) << "flush interrupted" << dendl; - - if (on_finish) { - int r = -EINTR; - dout(20) << "on finish complete, r=" << r << dendl; - on_finish->complete(r); + if (shut_down) { + on_stop_journal_replay_shut_down_start(); } - - return true; + return shut_down; } template @@ -746,37 +686,99 @@ void ImageReplayer::print_status(Formatter *f, stringstream *ss) } template -void ImageReplayer::handle_replay_process_ready(int r) +void ImageReplayer::handle_replay_complete(int r) { - // journal::Replay is ready for more events -- attempt to pop another - - dout(20) << "enter" << dendl; - + dout(20) << "r=" << r << dendl; if (r < 0) { - derr << "error replaying journal entry: " << cpp_strerror(r) - << dendl; - // TODO: handle error + derr << "replay encountered an error: " << cpp_strerror(r) << dendl; } - assert(r == 0); - handle_replay_ready(); + { + Mutex::Locker locker(m_lock); + m_stop_requested = true; + } + on_replay_interrupted(); } template -void ImageReplayer::handle_replay_complete(int r) -{ - dout(20) "r=" << r << dendl; +void ImageReplayer::replay_flush() { + dout(20) << dendl; - //m_remote_journaler->stop_replay(); + // TODO } template -void ImageReplayer::handle_replay_committed(ReplayEntry *replay_entry, int r) -{ - dout(20) << "commit_tid=" << replay_entry->get_commit_tid() << ", r=" << r +void ImageReplayer::handle_replay_flush(int r) { + dout(20) << "r=" << r << dendl; + + // TODO +} + +template +void ImageReplayer::get_remote_tag() { + dout(20) << dendl; + + // TODO +} + +template +void ImageReplayer::handle_get_remote_tag(int r) { + dout(20) << "r=" << r << dendl; + + // TODO +} + +template +void ImageReplayer::allocate_local_tag() { + dout(20) << dendl; + + // TODO +} + +template +void ImageReplayer::handle_allocate_local_tag(int r) { + dout(20) << "r=" << r << dendl; + + // TODO +} + +template +void ImageReplayer::process_entry() { + dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() + << dendl; + + bufferlist data = m_replay_entry.get_data(); + bufferlist::iterator it = data.begin(); + + Context *on_ready = create_context_callback< + ImageReplayer, &ImageReplayer::handle_process_entry_ready>(this); + Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry)); + m_local_replay->process(&it, on_ready, on_commit); +} + +template +void ImageReplayer::handle_process_entry_ready(int r) { + dout(20) << dendl; + assert(r == 0); + + // attempt to process the next event + handle_replay_ready(); +} + +template +void ImageReplayer::handle_process_entry_safe(const ReplayEntry& replay_entry, + int r) { + dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r << dendl; - m_remote_journaler->committed(*replay_entry); + if (r < 0) { + derr << "failed to commit journal event: " << cpp_strerror(r) << dendl; + + handle_replay_complete(r); + return; + } + + m_remote_journaler->committed(replay_entry); } template @@ -799,8 +801,6 @@ std::string ImageReplayer::to_string(const State state) { return "Starting"; case ImageReplayer::STATE_REPLAYING: return "Replaying"; - case ImageReplayer::STATE_FLUSHING_REPLAY: - return "FlushingReplay"; case ImageReplayer::STATE_STOPPING: return "Stopping"; case ImageReplayer::STATE_STOPPED: diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index ffff469954d4..9073a5b69ede 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -12,6 +12,7 @@ #include "common/WorkQueue.h" #include "include/rados/librados.hpp" #include "cls/journal/cls_journal_types.h" +#include "journal/ReplayEntry.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" #include "types.h" @@ -49,7 +50,6 @@ public: STATE_UNINITIALIZED, STATE_STARTING, STATE_REPLAYING, - STATE_FLUSHING_REPLAY, STATE_STOPPING, STATE_STOPPED, }; @@ -67,7 +67,7 @@ public: }; ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, - const std::string &mirror_uuid, int64_t local_pool_id, + const std::string &mirror_uuid, int64_t local_pool_id, int64_t remote_pool_id, const std::string &remote_image_id, const std::string &global_image_id); virtual ~ImageReplayer(); @@ -88,11 +88,8 @@ public: void print_status(Formatter *f, stringstream *ss); virtual void handle_replay_ready(); - virtual void handle_replay_process_ready(int r); virtual void handle_replay_complete(int r); - virtual void handle_replay_committed(ReplayEntry* replay_entry, int r); - inline int64_t get_remote_pool_id() const { return m_remote_pool_id; } @@ -103,32 +100,56 @@ protected: /** * @verbatim * (error) - * <------------------------ FAIL - * | ^ - * v * - * * - * | * - * v (error) * - * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * - * | * - * v (error) * - * INIT_REMOTE_JOURNALER * * * * * * * * * * * - * | * - * v (error) * - * START_REPLAY * * * * * * * * * * * * * * * * + * <------------------------------------ FAIL + * | ^ + * v * + * * + * | * + * v (error) * + * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * START_REPLAY * * * * * * * * * * * * * * * * * * * * * * + * | + * | /--------------------------------------------\ + * | | | + * v v (asok flush) | + * REPLAYING -------------> LOCAL_REPLAY_FLUSH | + * | \ | | + * | | v | + * | | FLUSH_COMMIT_POSITION | + * | | | | + * | | \--------------------/| + * | | | + * | | (entries available) | + * | \-----------> REPLAY_READY | + * | | | + * | | (skip if not | + * | v needed) (error) + * | REPLAY_FLUSH * * * * * * * * * + * | | | * + * | | (skip if not | * + * | v needed) (error) * + * | GET_REMOTE_TAG * * * * * * * * + * | | | * + * | | (skip if not | * + * | v needed) (error) * + * | ALLOCATE_LOCAL_TAG * * * * * * + * | | | * + * | v (error) * + * | PROCESS_ENTRY * * * * * * * * * + * | | | * + * | \---------------------/ * + * v * + * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * * | - * | /-------------------------------------------\ - * | | | - * v v | - * --------------> | - * | | | - * v v | - * LOCAL_REPLAY_FLUSH | - * | | | - * v v | - * JOURNAL_REPLAY_SHUT_DOWN FLUSH_COMMIT_POSITION | - * | | | - * v \-------------------/ + * v + * JOURNAL_REPLAY_SHUT_DOWN + * | + * v * LOCAL_IMAGE_CLOSE * | * v @@ -146,11 +167,12 @@ protected: virtual void on_stop_local_image_close_start(); virtual void on_stop_local_image_close_finish(int r); - virtual void on_flush_local_replay_flush_start(); - virtual void on_flush_local_replay_flush_finish(int r); - virtual void on_flush_flush_commit_position_start(int last_r); - virtual void on_flush_flush_commit_position_finish(int last_r, int r); - virtual bool on_flush_interrupted(); + virtual void on_flush_local_replay_flush_start(Context *on_flush); + virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r); + virtual void on_flush_flush_commit_position_start(Context *on_flush); + virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r); + + bool on_replay_interrupted(); void close_local_image(Context *on_finish); // for tests @@ -171,11 +193,30 @@ private: librbd::journal::Replay *m_local_replay; Journaler* m_remote_journaler; ::journal::ReplayHandler *m_replay_handler; - Context *m_on_finish; + + Context *m_on_start_finish = nullptr; + Context *m_on_stop_finish = nullptr; + bool m_stop_requested = false; + AdminSocketHook *m_asok_hook; librbd::journal::MirrorPeerClientMeta m_client_meta; + ReplayEntry m_replay_entry; + + struct C_ReplayCommitted : public Context { + ImageReplayer *replayer; + ReplayEntry replay_entry; + + C_ReplayCommitted(ImageReplayer *replayer, + ReplayEntry &&replay_entry) + : replayer(replayer), replay_entry(std::move(replay_entry)) { + } + virtual void finish(int r) { + replayer->handle_process_entry_safe(replay_entry, r); + } + }; + static std::string to_string(const State state); State get_state_() const { return m_state; } @@ -192,6 +233,20 @@ private: void handle_init_remote_journaler(int r); void start_replay(); + + void replay_flush(); + void handle_replay_flush(int r); + + void get_remote_tag(); + void handle_get_remote_tag(int r); + + void allocate_local_tag(); + void handle_allocate_local_tag(int r); + + void process_entry(); + void handle_process_entry_ready(int r); + void handle_process_entry_safe(const ReplayEntry& replay_entry, int r); + }; } // namespace mirror