From: Mykola Golub Date: Tue, 23 Feb 2016 07:02:00 +0000 (+0200) Subject: rbd-mirror: ImageReplayer async start/stop X-Git-Tag: v10.1.0~133^2^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b61692ba74c27054f1d255830cc3d54398df424a;p=ceph.git rbd-mirror: ImageReplayer async start/stop Signed-off-by: Mykola Golub --- diff --git a/src/test/rbd_mirror/image_replay.cc b/src/test/rbd_mirror/image_replay.cc index 66fcd610856..87d449717a9 100644 --- a/src/test/rbd_mirror/image_replay.cc +++ b/src/test/rbd_mirror/image_replay.cc @@ -132,6 +132,8 @@ int main(int argc, const char **argv) rbd::mirror::RadosRef remote(new librados::Rados()); rbd::mirror::Threads *threads = nullptr; + C_SaferCond start_cond, stop_cond; + int r = local->init_with_context(g_ceph_context); if (r < 0) { derr << "could not initialize rados handle" << dendl; @@ -187,7 +189,8 @@ int main(int argc, const char **argv) local_pool_id, remote_pool_id, remote_image_id); - r = replayer->start(&bootstap_params); + replayer->start(&start_cond, &bootstap_params); + r = start_cond.wait(); if (r < 0) { derr << "failed to start: " << cpp_strerror(r) << dendl; goto cleanup; @@ -201,7 +204,9 @@ int main(int argc, const char **argv) dout(1) << "termination signal received, stopping replay" << dendl; - replayer->stop(); + replayer->stop(&stop_cond); + r = stop_cond.wait(); + assert(r == 0); dout(1) << "shutdown" << dendl; diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 3ab535c55e3..c0bdef56195 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -101,7 +101,6 @@ public: m_threads = new rbd::mirror::Threads(reinterpret_cast( m_local_ioctx.cct())); - m_replayer = new rbd::mirror::ImageReplayer( m_threads, rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), @@ -123,7 +122,9 @@ public: void start(rbd::mirror::ImageReplayer::BootstrapParams *bootstap_params = nullptr) { - ASSERT_EQ(0, m_replayer->start(bootstap_params)); + C_SaferCond cond; + m_replayer->start(&cond, bootstap_params); + ASSERT_EQ(0, cond.wait()); ASSERT_EQ(0U, m_watch_handle); std::string oid = ::journal::Journaler::header_oid(m_remote_image_id); @@ -140,7 +141,9 @@ public: m_watch_handle = 0; } - m_replayer->stop(); + C_SaferCond cond; + m_replayer->stop(&cond); + ASSERT_EQ(0, cond.wait()); } void bootstrap() diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 0afea699fd9..3148b6dc05e 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -65,6 +65,30 @@ struct C_ReplayCommitted : public Context { } }; +class BootstrapThread : public Thread { +public: + explicit BootstrapThread(ImageReplayer *replayer, + const ImageReplayer::BootstrapParams ¶ms, + Context *on_finish) + : replayer(replayer), params(params), on_finish(on_finish) {} + + virtual ~BootstrapThread() {} + +protected: + void *entry() + { + int r = replayer->bootstrap(params); + on_finish->complete(r); + delete this; + return NULL; + } + +private: + ImageReplayer *replayer; + ImageReplayer::BootstrapParams params; + Context *on_finish; +}; + class ImageReplayerAdminSocketCommand { public: virtual ~ImageReplayerAdminSocketCommand() {} @@ -179,7 +203,8 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, m_local_image_ctx(nullptr), m_local_replay(nullptr), m_remote_journaler(nullptr), - m_replay_handler(nullptr) + m_replay_handler(nullptr), + m_on_finish(nullptr) { CephContext *cct = static_cast(m_local->cct()); @@ -198,110 +223,298 @@ ImageReplayer::~ImageReplayer() delete m_asok_hook; } -int ImageReplayer::start(const BootstrapParams *bootstrap_params) +void ImageReplayer::start(Context *on_finish, + const BootstrapParams *bootstrap_params) { - // TODO: make async - - dout(20) << "enter" << dendl; + dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish + << dendl; { Mutex::Locker locker(m_lock); - assert(m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPED); + assert(is_stopped_()); m_state = STATE_STARTING; - } - std::string remote_journal_id = m_remote_image_id; - std::string image_name = ""; - C_SaferCond cond, lock_ctx; - double commit_interval; - bool registered; - int r = 0; + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + } - r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx); + int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx); if (r < 0) { derr << "error opening ioctx for remote pool " << m_remote_pool_id << ": " << cpp_strerror(r) << dendl; - return r; + on_start_fail_start(r); + return; } CephContext *cct = static_cast(m_local->cct()); - commit_interval = cct->_conf->rbd_journal_commit_age; - bool remote_journaler_initialized = false; + double commit_interval = cct->_conf->rbd_journal_commit_age; m_remote_journaler = new ::journal::Journaler(m_threads->work_queue, - m_threads->timer, - &m_threads->timer_lock, - m_remote_ioctx, - remote_journal_id, - m_client_id, commit_interval); - r = get_registered_client_status(®istered); + m_threads->timer, + &m_threads->timer_lock, + m_remote_ioctx, + m_remote_image_id, m_client_id, + commit_interval); + + on_start_get_registered_client_status_start(bootstrap_params); +} + +void ImageReplayer::on_start_get_registered_client_status_start( + const BootstrapParams *bootstrap_params) +{ + dout(20) << "enter" << dendl; + + struct Metadata { + uint64_t minimum_set; + uint64_t active_set; + std::set registered_clients; + BootstrapParams bootstrap_params; + } *m = new Metadata(); + + if (bootstrap_params) { + m->bootstrap_params = *bootstrap_params; + } + + FunctionContext *ctx = new FunctionContext( + [this, m, bootstrap_params](int r) { + on_start_get_registered_client_status_finish(r, m->registered_clients, + m->bootstrap_params); + delete m; + }); + + m_remote_journaler->get_mutable_metadata(&m->minimum_set, &m->active_set, + &m->registered_clients, ctx); +} + +void ImageReplayer::on_start_get_registered_client_status_finish(int r, + const std::set ®istered_clients, + const BootstrapParams &bootstrap_params) +{ + dout(20) << "r=" << r << dendl; + if (r < 0) { derr << "error obtaining registered client status: " << cpp_strerror(r) << dendl; - goto fail; + on_start_fail_start(r); + return; + } + if (on_start_interrupted()) { + return; } - if (registered) { - if (bootstrap_params) { - dout(0) << "ignoring bootsrap params: client already registered" << dendl; - } - } else { - r = bootstrap(bootstrap_params); - if (r < 0) { - derr << "bootstrap failed: " << cpp_strerror(r) << dendl; - goto fail; + for (auto c : registered_clients) { + if (c.id == m_client_id) { + librbd::journal::ClientData client_data; + bufferlist::iterator bl = c.data.begin(); + try { + ::decode(client_data, bl); + } catch (const buffer::error &err) { + derr << "failed to decode client meta data: " << err.what() << dendl; + on_start_fail_start(-EINVAL); + return; + } + librbd::journal::MirrorPeerClientMeta &cm = + boost::get(client_data.client_meta); + m_local_image_id = cm.image_id; + + // TODO: snap name should be transient + if (cm.sync_points.empty()) { + derr << "sync points not found" << dendl; + on_start_fail_start(-ENOENT); + return; + } + m_snap_name = cm.sync_points.front().snap_name; + + dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id=" + << m_local_image_id << ", snap_name=" << m_snap_name << dendl; + + if (!bootstrap_params.empty()) { + dout(0) << "ignoring bootsrap params: client already registered" << dendl; + } + + on_start_bootstrap_finish(0); + return; } } - m_remote_journaler->init(&cond); - r = cond.wait(); + dout(20) << "client not found" << dendl; + + on_start_bootstrap_start(bootstrap_params); +} + +void ImageReplayer::on_start_bootstrap_start(const BootstrapParams ¶ms) +{ + dout(20) << "enter" << dendl; + + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_start_bootstrap_finish(r); + }); + + BootstrapThread *thread = new BootstrapThread(this, params, ctx); + + thread->create("bootstrap"); + thread->detach(); + // TODO: As the bootstrap might take long time it needs some control + // to get current status, interrupt, etc... +} + +void ImageReplayer::on_start_bootstrap_finish(int r) +{ + dout(20) << "r=" << r << dendl; + + if (r < 0) { + on_start_fail_start(r); + return; + } + if (on_start_interrupted()) { + return; + } + + on_start_remote_journaler_init_start(); +} + +void ImageReplayer::on_start_remote_journaler_init_start() +{ + dout(20) << "enter" << dendl; + + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_start_remote_journaler_init_finish(r); + }); + + m_remote_journaler->init(ctx); +} + +void ImageReplayer::on_start_remote_journaler_init_finish(int r) +{ + dout(20) << "r=" << r << dendl; + if (r < 0) { derr << "error initializing journal: " << cpp_strerror(r) << dendl; - goto fail; + on_start_fail_start(r); + return; + } + if (on_start_interrupted()) { + return; } - remote_journaler_initialized = true; r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx); if (r < 0) { derr << "error opening ioctx for local pool " << m_local_pool_id << ": " << cpp_strerror(r) << dendl; - goto fail; + on_start_fail_start(r); + return; } + on_start_local_image_open_start(); +} + +void ImageReplayer::on_start_local_image_open_start() +{ + dout(20) << "enter" << dendl; + m_local_image_ctx = new librbd::ImageCtx("", m_local_image_id, NULL, m_local_ioctx, false); - r = m_local_image_ctx->state->open(); + + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_start_local_image_open_finish(r); + }); + m_local_image_ctx->state->open(ctx); +} + +void ImageReplayer::on_start_local_image_open_finish(int r) +{ + dout(20) << "r=" << r << dendl; + if (r < 0) { derr << "error opening local image " << m_local_image_id << ": " << cpp_strerror(r) << dendl; - delete m_local_image_ctx; - m_local_image_ctx = nullptr; - goto fail; - } - { - RWLock::WLocker owner_locker(m_local_image_ctx->owner_lock); - m_local_image_ctx->exclusive_lock->request_lock(&lock_ctx); + FunctionContext *ctx = new FunctionContext( + [this, r](int r1) { + assert(r1 == 0); + delete m_local_image_ctx; + m_local_image_ctx = nullptr; + on_start_fail_start(r); + }); + + m_threads->work_queue->queue(ctx, 0); + return; } - r = lock_ctx.wait(); + if (on_start_interrupted()) { + return; + } + + on_start_local_image_lock_start(); +} + +void ImageReplayer::on_start_local_image_lock_start() +{ + dout(20) << "enter" << dendl; + + RWLock::WLocker owner_locker(m_local_image_ctx->owner_lock); + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_start_local_image_lock_finish(r); + }); + m_local_image_ctx->exclusive_lock->request_lock(ctx); +} + +void ImageReplayer::on_start_local_image_lock_finish(int r) +{ + dout(20) << "r=" << r << dendl; + if (r < 0) { derr << "error to lock exclusively local image " << m_local_image_id << ": " << cpp_strerror(r) << dendl; - goto fail; + on_start_fail_start(r); + return; } - if (m_local_image_ctx->journal == nullptr) { - derr << "journaling is not enabled on local image " << m_local_image_id - << ": " << cpp_strerror(r) << dendl; - goto fail; + on_start_fail_start(-EINVAL); + return; + } + if (on_start_interrupted()) { + return; + } + + on_start_wait_for_local_journal_ready_start(); +} + +void ImageReplayer::on_start_wait_for_local_journal_ready_start() +{ + dout(20) << "enter" << dendl; + + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_start_wait_for_local_journal_ready_finish(r); + }); + m_local_image_ctx->journal->wait_for_journal_ready(ctx); +} + +void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r) +{ + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "error when waiting for local journal ready: " << cpp_strerror(r) + << dendl; + on_start_fail_start(r); + return; + } + if (on_start_interrupted()) { + return; } r = m_local_image_ctx->journal->start_external_replay(&m_local_replay); if (r < 0) { derr << "error starting external replay on local image " << m_local_image_id << ": " << cpp_strerror(r) << dendl; - goto fail; + on_start_fail_start(r); + return; } m_replay_handler = new ReplayHandler(this); @@ -311,21 +524,51 @@ int ImageReplayer::start(const BootstrapParams *bootstrap_params) dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl; + assert(r == 0); + + Context *on_finish(nullptr); + { Mutex::Locker locker(m_lock); - assert(m_state == STATE_STARTING); + if (m_state == STATE_STOPPING) { + on_start_fail_start(-EINTR); + return; + } + + assert(m_state == STATE_STARTING); m_state = STATE_REPLAYING; + + std::swap(m_on_finish, on_finish); } - return 0; + dout(20) << "start succeeded" << dendl; + + if (on_finish) { + dout(20) << "on finish complete, r=" << r << dendl; + on_finish->complete(r); + } +} -fail: - dout(20) << "fail, r=" << r << dendl; +void ImageReplayer::on_start_fail_start(int r) +{ + dout(20) << "r=" << r << dendl; + + FunctionContext *ctx = new FunctionContext( + [this, r](int r1) { + assert(r1 == 0); + on_start_fail_finish(r); + }); + + m_threads->work_queue->queue(ctx, 0); +} + +void ImageReplayer::on_start_fail_finish(int r) +{ + dout(20) << "r=" << r << dendl; if (m_remote_journaler) { - if (remote_journaler_initialized) { - m_remote_journaler->stop_replay(); + if (m_remote_journaler->is_initialized()) { m_remote_journaler->shut_down(); } delete m_remote_journaler; @@ -333,7 +576,6 @@ fail: } if (m_local_replay) { - Mutex::Locker locker(m_lock); shut_down_journal_replay(true); m_local_image_ctx->journal->stop_external_replay(); m_local_replay = nullptr; @@ -357,34 +599,137 @@ fail: m_local_ioctx.close(); m_remote_ioctx.close(); + Context *on_finish(nullptr); + { Mutex::Locker locker(m_lock); - assert(m_state == STATE_STARTING); + if (m_state == STATE_STOPPING) { + assert(r == -EINTR); + dout(20) << "start interrupted" << dendl; + m_state = STATE_STOPPED; + } else { + assert(m_state == STATE_STARTING); + dout(20) << "start failed" << dendl; + m_state = STATE_UNINITIALIZED; + } + std::swap(m_on_finish, on_finish); + } - m_state = STATE_UNINITIALIZED; + if (on_finish) { + dout(20) << "on finish complete, r=" << r << dendl; + on_finish->complete(r); } +} - return r; +bool ImageReplayer::on_start_interrupted() +{ + Mutex::Locker locker(m_lock); + + if (m_state == STATE_STARTING) { + return false; + } + + assert(m_state == STATE_STOPPING); + + on_start_fail_start(-EINTR); + return true; } -void ImageReplayer::stop() +void ImageReplayer::stop(Context *on_finish) { - dout(20) << "enter" << dendl; + dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish + << dendl; - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_REPLAYING); + Mutex::Locker locker(m_lock); + assert(is_running_()); + + if (m_state == STATE_STARTING) { + dout(20) << "interrupting start" << dendl; - m_state = STATE_STOPPING; + if (on_finish) { + Context *on_start_finish = m_on_finish; + FunctionContext *ctx = new FunctionContext( + [this, on_start_finish, on_finish](int r) { + if (on_start_finish) { + on_start_finish->complete(r); + } + on_finish->complete(0); + }); + + m_on_finish = ctx; + } + } else { + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + on_stop_journal_replay_shut_down_start(); } + m_state = STATE_STOPPING; +} - shut_down_journal_replay(false); +void ImageReplayer::on_stop_journal_replay_shut_down_start() +{ + dout(20) << "enter" << dendl; + + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_stop_journal_replay_shut_down_finish(r); + }); + + m_local_replay->shut_down(false, ctx); +} + +void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r) +{ + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "error flushing journal replay: " << cpp_strerror(r) << dendl; + } m_local_image_ctx->journal->stop_external_replay(); m_local_replay = nullptr; - m_local_image_ctx->state->close(); - m_local_image_ctx = nullptr; + on_stop_local_image_close_start(); +} + +void ImageReplayer::on_stop_local_image_close_start() +{ + dout(20) << "enter" << dendl; + + FunctionContext *ctx = new FunctionContext( + [this](int r) { + on_stop_local_image_close_finish(r); + }); + + m_local_image_ctx->state->close(ctx); +} + +void ImageReplayer::on_stop_local_image_close_finish(int r) +{ + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "error closing local image: " << cpp_strerror(r) << dendl; + } + + on_stop_local_image_delete_start(); +} + +void ImageReplayer::on_stop_local_image_delete_start() +{ + FunctionContext *ctx = new FunctionContext( + [this](int r) { + delete m_local_image_ctx; + m_local_image_ctx = nullptr; + on_stop_local_image_delete_finish(r); + }); + + m_threads->work_queue->queue(ctx, 0); +} + +void ImageReplayer::on_stop_local_image_delete_finish(int r) +{ + assert(r == 0); m_local_ioctx.close(); @@ -398,14 +743,47 @@ void ImageReplayer::stop() m_remote_ioctx.close(); + Context *on_finish(nullptr); + { Mutex::Locker locker(m_lock); assert(m_state == STATE_STOPPING); m_state = STATE_STOPPED; + + std::swap(m_on_finish, on_finish); } - dout(20) << "done" << dendl; + dout(20) << "stop complete" << dendl; + + if (on_finish) { + dout(20) << "on finish complete, r=" << r << dendl; + on_finish->complete(r); + } +} + +void ImageReplayer::close_local_image(Context *on_finish) +{ + m_local_image_ctx->state->close(on_finish); +} + +void ImageReplayer::handle_replay_ready() +{ + dout(20) << "enter" << dendl; + + ::journal::ReplayEntry replay_entry; + if (!m_remote_journaler->try_pop_front(&replay_entry)) { + return; + } + + dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl; + + bufferlist data = replay_entry.get_data(); + bufferlist::iterator it = data.begin(); + Context *on_ready = create_context_callback< + ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this); + Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry)); + m_local_replay->process(&it, on_ready, on_commit); } int ImageReplayer::flush() @@ -451,25 +829,6 @@ int ImageReplayer::flush() return r < 0 ? r : r1; } -void ImageReplayer::handle_replay_ready() -{ - dout(20) << "enter" << dendl; - - ::journal::ReplayEntry replay_entry; - if (!m_remote_journaler->try_pop_front(&replay_entry)) { - return; - } - - dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl; - - bufferlist data = replay_entry.get_data(); - bufferlist::iterator it = data.begin(); - Context *on_ready = create_context_callback< - ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this); - Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry)); - m_local_replay->process(&it, on_ready, on_commit); -} - void ImageReplayer::handle_replay_process_ready(int r) { // journal::Replay is ready for more events -- attempt to pop another @@ -502,60 +861,10 @@ void ImageReplayer::handle_replay_committed( m_remote_journaler->committed(*replay_entry); } -int ImageReplayer::get_registered_client_status(bool *registered) -{ - dout(20) << "enter" << dendl; - - uint64_t minimum_set; - uint64_t active_set; - std::set registered_clients; - C_SaferCond cond; - m_remote_journaler->get_mutable_metadata(&minimum_set, &active_set, - ®istered_clients, &cond); - int r = cond.wait(); - if (r < 0) { - derr << "error retrieving remote journal registered clients: " - << cpp_strerror(r) << dendl; - return r; - } - - for (auto c : registered_clients) { - if (c.id == m_client_id) { - *registered = true; - librbd::journal::ClientData client_data; - bufferlist::iterator bl = c.data.begin(); - try { - ::decode(client_data, bl); - } catch (const buffer::error &err) { - derr << "failed to decode client meta data: " << err.what() << dendl; - return -EINVAL; - } - librbd::journal::MirrorPeerClientMeta &cm = - boost::get(client_data.client_meta); - m_local_image_id = cm.image_id; - - // TODO: snap name should be transient - if (cm.sync_points.empty()) { - return -ENOENT; - } - m_snap_name = cm.sync_points.front().snap_name; - - dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id=" - << m_local_image_id << ", snap_name=" << m_snap_name << dendl; - return 0; - } - } - - dout(20) << "client not found" << dendl; - - *registered = false; - return 0; -} - int ImageReplayer::register_client() { // TODO allocate snap as part of sync process - std::string m_snap_name = ".rbd-mirror." + m_client_id; + m_snap_name = ".rbd-mirror." + m_client_id; dout(20) << "mirror_uuid=" << m_client_id << ", " << "image_id=" << m_local_image_id << ", " @@ -589,7 +898,7 @@ int ImageReplayer::get_bootrstap_params(BootstrapParams *params) return 0; } -int ImageReplayer::bootstrap(const BootstrapParams *bootstrap_params) +int ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params) { // Register client and sync images @@ -598,13 +907,13 @@ int ImageReplayer::bootstrap(const BootstrapParams *bootstrap_params) int r; BootstrapParams params; - if (bootstrap_params) { + if (!bootstrap_params.empty()) { dout(20) << "using external bootstrap params" << dendl; - params = *bootstrap_params; + params = bootstrap_params; } else { r = get_bootrstap_params(¶ms); if (r < 0) { - derr << "error obtaining bootrstap parameters: " + derr << "error obtaining bootstrap parameters: " << cpp_strerror(r) << dendl; return r; } diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 7e9a7e3b216..f3c91768b77 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -11,10 +11,9 @@ #include "common/Mutex.h" #include "common/WorkQueue.h" #include "include/rados/librados.hpp" +#include "cls/journal/cls_journal_types.h" #include "types.h" -class ContextWQ; - namespace journal { class Journaler; @@ -64,6 +63,10 @@ public: const std::string local_image_name) : local_pool_name(local_pool_name), local_image_name(local_image_name) {} + + bool empty() const { + return local_pool_name.empty() && local_image_name.empty(); + } }; public: @@ -74,24 +77,107 @@ public: ImageReplayer(const ImageReplayer&) = delete; ImageReplayer& operator=(const ImageReplayer&) = delete; - State get_state() { return m_state; } - - int start(const BootstrapParams *bootstrap_params = nullptr); - void stop(); + State get_state() { Mutex::Locker l(m_lock); return get_state_(); } + bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); } + bool is_running() { Mutex::Locker l(m_lock); return is_running_(); } + void start(Context *on_finish = nullptr, + const BootstrapParams *bootstrap_params = nullptr); + void stop(Context *on_finish = nullptr); int flush(); + int bootstrap(const BootstrapParams &bootstrap_params); + virtual void handle_replay_ready(); virtual void handle_replay_process_ready(int r); virtual void handle_replay_complete(int r); virtual void handle_replay_committed(::journal::ReplayEntry* replay_entry, int r); +protected: + /** + * @verbatim + * (error) + * <------------------- FAIL + * | ^ + * v | + * | + * | | + * v (error) | + * GET_REGISTERED_CLIENT_STATUS -------->| + * | | + * v (error)| + * BOOTSTRAP (skip if not needed) ------>| + * | | + * v (error) | + * REMOTE_JOURNALER_INIT --------------->| + * | | + * v (error) | + * LOCAL_IMAGE_OPEN -------------------->| + * | | + * v (error) | + * LOCAL_IMAGE_LOCK -------------------->| + * | | + * v (error) | + * WAIT_FOR_LOCAL_JOURNAL_READY -------->/ + * | + * v + * + * | + * v + * + * | + * v + * JOURNAL_REPLAY_SHUT_DOWN + * | + * v + * LOCAL_IMAGE_CLOSE + * | + * v + * LOCAL_IMAGE_DELETE + * | + * v + * + * + * @endverbatim + */ + + virtual void on_start_get_registered_client_status_start( + const BootstrapParams *bootstrap_params); + virtual void on_start_get_registered_client_status_finish(int r, + const std::set ®istered_clients, + const BootstrapParams &bootstrap_params); + virtual void on_start_bootstrap_start(const BootstrapParams ¶ms); + virtual void on_start_bootstrap_finish(int r); + virtual void on_start_remote_journaler_init_start(); + virtual void on_start_remote_journaler_init_finish(int r); + virtual void on_start_local_image_open_start(); + virtual void on_start_local_image_open_finish(int r); + virtual void on_start_local_image_lock_start(); + virtual void on_start_local_image_lock_finish(int r); + virtual void on_start_wait_for_local_journal_ready_start(); + virtual void on_start_wait_for_local_journal_ready_finish(int r); + virtual void on_start_fail_start(int r); + virtual void on_start_fail_finish(int r); + virtual bool on_start_interrupted(); + + virtual void on_stop_journal_replay_shut_down_start(); + virtual void on_stop_journal_replay_shut_down_finish(int r); + virtual void on_stop_local_image_close_start(); + virtual void on_stop_local_image_close_finish(int r); + virtual void on_stop_local_image_delete_start(); + virtual void on_stop_local_image_delete_finish(int r); + + void close_local_image(Context *on_finish); // for tests + private: - int get_registered_client_status(bool *registered); - int register_client(); + State get_state_() const { return m_state; } + bool is_stopped_() const { return m_state == STATE_UNINITIALIZED || + m_state == STATE_STOPPED; } + bool is_running_() const { return !is_stopped_() && m_state != STATE_STOPPING; } + int get_bootrstap_params(BootstrapParams *params); - int bootstrap(const BootstrapParams *bootstrap_params); + int register_client(); int create_local_image(const BootstrapParams &bootstrap_params); int get_image_id(librados::IoCtx &ioctx, const std::string &image_name, std::string *image_id); @@ -108,6 +194,7 @@ private: int64_t m_remote_pool_id, m_local_pool_id; std::string m_remote_image_id, m_local_image_id; std::string m_snap_name; + ContextWQ *m_work_queue; Mutex m_lock; State m_state; std::string m_local_pool_name, m_remote_pool_name; @@ -116,6 +203,7 @@ private: librbd::journal::Replay *m_local_replay; ::journal::Journaler *m_remote_journaler; ::journal::ReplayHandler *m_replay_handler; + Context *m_on_finish; ImageReplayerAdminSocketHook *m_asok_hook; }; diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index ba043b37ec6..f40127b6e89 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -99,6 +99,17 @@ void Replayer::run() set_sources(m_pool_watcher->get_images()); m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30)); } + + // Stopping + map > empty_sources; + while (true) { + Mutex::Locker l(m_lock); + set_sources(empty_sources); + if (m_images.empty()) { + break; + } + m_cond.WaitInterval(g_ceph_context, m_lock, seconds(1)); + } } void Replayer::set_sources(const map > &images) @@ -106,19 +117,28 @@ void Replayer::set_sources(const map > &images) dout(20) << "enter" << dendl; assert(m_lock.is_locked()); - // TODO: make stopping and starting ImageReplayers async for (auto it = m_images.begin(); it != m_images.end();) { int64_t pool_id = it->first; auto &pool_images = it->second; if (images.find(pool_id) == images.end()) { - m_images.erase(it++); + for (auto images_it = pool_images.begin(); + images_it != pool_images.end();) { + if (stop_image_replayer(images_it->second)) { + pool_images.erase(images_it++); + } + } + if (pool_images.empty()) { + m_images.erase(it++); + } continue; } for (auto images_it = pool_images.begin(); images_it != pool_images.end();) { if (images.at(pool_id).find(images_it->first) == images.at(pool_id).end()) { - pool_images.erase(images_it++); + if (stop_image_replayer(images_it->second)) { + pool_images.erase(images_it++); + } } else { ++images_it; } @@ -149,23 +169,46 @@ void Replayer::set_sources(const map > &images) // create entry for pool if it doesn't exist auto &pool_replayers = m_images[pool_id]; for (const auto &image_id : kv.second) { - if (pool_replayers.find(image_id) == pool_replayers.end()) { + auto it = pool_replayers.find(image_id); + if (it == pool_replayers.end()) { unique_ptr image_replayer(new ImageReplayer(m_threads, - m_local, + m_local, m_remote, m_client_id, - local_ioctx.get_id(), + local_ioctx.get_id(), pool_id, image_id)); - int r = image_replayer->start(); - if (r < 0) { - continue; - } - pool_replayers.insert(std::make_pair(image_id, std::move(image_replayer))); + it = pool_replayers.insert( + std::make_pair(image_id, std::move(image_replayer))).first; } + start_image_replayer(it->second); } } } +void Replayer::start_image_replayer(unique_ptr &image_replayer) +{ + if (!image_replayer->is_stopped()) { + return; + } + + image_replayer->start(); +} + +bool Replayer::stop_image_replayer(unique_ptr &image_replayer) +{ + if (image_replayer->is_stopped()) { + return true; + } + + if (image_replayer->is_running()) { + image_replayer->stop(); + } else { + // TODO: check how long it is stopping and alert if it is too long. + } + + return false; +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index 03199b6c8fb..44d699e1cbe 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -42,6 +42,9 @@ public: private: void set_sources(const std::map > &images); + void start_image_replayer(unique_ptr &image_replayer); + bool stop_image_replayer(unique_ptr &image_replayer); + Threads *m_threads; Mutex m_lock; Cond m_cond;