From: Mykola Golub Date: Sat, 18 Mar 2017 17:09:44 +0000 (+0100) Subject: rbd-mirror A/A: separate ImageReplayer handling from Replayer X-Git-Tag: v12.0.2~138^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=4931e31ec172312b2c00d9019e7382e48827711e;p=ceph.git rbd-mirror A/A: separate ImageReplayer handling from Replayer Fixes: http://tracker.ceph.com/issues/18785 Signed-off-by: Mykola Golub --- diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 953db35f63ed5..f178af50989a9 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -17,6 +17,7 @@ #include "librbd/Utils.h" #include "librbd/Watcher.h" #include "librbd/api/Mirror.h" +#include "InstanceReplayer.h" #include "InstanceWatcher.h" #include "LeaderWatcher.h" #include "Replayer.h" @@ -258,6 +259,9 @@ Replayer::~Replayer() if (m_instance_watcher) { m_instance_watcher->shut_down(); } + if (m_instance_replayer) { + m_instance_replayer->shut_down(); + } assert(!m_pool_watcher); } @@ -297,6 +301,15 @@ int Replayer::init() return r; } + std::string local_mirror_uuid; + r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, + &local_mirror_uuid); + if (r < 0) { + derr << "failed to retrieve local mirror uuid from pool " + << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; + return r; + } + r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(), m_remote_io_ctx); if (r < 0) { @@ -304,10 +317,15 @@ int Replayer::init() << ": " << cpp_strerror(r) << dendl; return r; } - m_remote_pool_id = m_remote_io_ctx.get_id(); dout(20) << "connected to " << m_peer << dendl; + m_instance_replayer.reset( + InstanceReplayer<>::create(m_threads, m_image_deleter, + m_image_sync_throttler, m_local_rados, + local_mirror_uuid, m_local_pool_id)); + m_instance_replayer->init(); + m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, &m_leader_listener)); r = m_leader_watcher->init(); @@ -422,27 +440,7 @@ 
void Replayer::run() break; } - for (auto image_it = m_image_replayers.begin(); - image_it != m_image_replayers.end(); ) { - if (image_it->second->remote_images_empty()) { - if (stop_image_replayer(image_it->second)) { - image_it = m_image_replayers.erase(image_it); - continue; - } - } else { - start_image_replayer(image_it->second); - } - ++image_it; - } - - m_cond.WaitInterval(m_lock, - utime_t(g_ceph_context->_conf-> - rbd_mirror_image_state_check_interval, 0)); - } - - Mutex::Locker locker(m_lock); - while (!m_image_replayers.empty()) { - stop_image_replayers(); + m_cond.WaitInterval(m_lock, utime_t(1, 0)); } } @@ -471,14 +469,9 @@ void Replayer::print_status(Formatter *f, stringstream *ss) } f->close_section(); } - f->open_array_section("image_replayers"); - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->print_status(f, ss); - } + m_instance_replayer->print_status(f, ss); - f->close_section(); f->close_section(); f->flush(*ss); } @@ -493,12 +486,7 @@ void Replayer::start() return; } - m_manual_stop = false; - - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->start(nullptr, true); - } + m_instance_replayer->start(); } void Replayer::stop(bool manual) @@ -514,11 +502,7 @@ void Replayer::stop(bool manual) return; } - m_manual_stop = true; - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->stop(nullptr, true); - } + m_instance_replayer->stop(); } void Replayer::restart() @@ -531,12 +515,7 @@ void Replayer::restart() return; } - m_manual_stop = false; - - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->restart(); - } + m_instance_replayer->restart(); } void Replayer::flush() @@ -549,10 +528,7 @@ void Replayer::flush() return; } - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->flush(); - } + m_instance_replayer->flush(); } void Replayer::release_leader() 
@@ -582,6 +558,8 @@ void Replayer::handle_update(const std::string &mirror_uuid, return; } + m_instance_replayer->set_peers({{mirror_uuid, m_remote_io_ctx}}); + // first callback will be a full directory -- so see if we need to remove // any local images that no longer exist on the remote side if (!m_init_image_ids.empty()) { @@ -603,193 +581,27 @@ void Replayer::handle_update(const std::string &mirror_uuid, m_init_image_ids.clear(); } - // shut down replayers for non-mirrored images - for (auto &image_id : removed_image_ids) { - auto image_it = m_image_replayers.find(image_id.global_id); - if (image_it != m_image_replayers.end()) { - image_it->second->remove_remote_image(mirror_uuid, image_id.id); - - if (image_it->second->is_running()) { - dout(20) << "stop image replayer for remote image " - << image_id.id << " (" << image_id.global_id << ")" - << dendl; - } - - if (image_it->second->remote_images_empty() && - stop_image_replayer(image_it->second)) { - // no additional remotes registered for this image - m_image_replayers.erase(image_it); - } - } - } - - // prune previously stopped image replayers - for (auto image_it = m_image_replayers.begin(); - image_it != m_image_replayers.end(); ) { - if (image_it->second->remote_images_empty() && - stop_image_replayer(image_it->second)) { - image_it = m_image_replayers.erase(image_it); - } else { - ++image_it; - } - } + m_update_op_tracker.start_op(); + Context *ctx = new FunctionContext([this](int r) { + dout(20) << "complete handle_update: r=" << r << dendl; + m_update_op_tracker.finish_op(); + }); - if (added_image_ids.empty()) { - return; - } + C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); - std::string local_mirror_uuid; - int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, - &local_mirror_uuid); - if (r < 0 || local_mirror_uuid.empty()) { - derr << "failed to retrieve local mirror uuid from pool " - << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; - return; + for (auto 
&image_id : removed_image_ids) { + m_instance_replayer->release_image(image_id.global_id, + {{mirror_uuid, image_id.id}}, true, + gather_ctx->new_sub()); } - // start replayers for newly added remote image sources for (auto &image_id : added_image_ids) { - auto it = m_image_replayers.find(image_id.global_id); - if (it == m_image_replayers.end()) { - unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>( - m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados, - local_mirror_uuid, m_local_pool_id, image_id.global_id)); - if (m_manual_stop) { - image_replayer->stop(nullptr, true); - } - - it = m_image_replayers.insert( - std::make_pair(image_id.global_id, std::move(image_replayer))).first; - } - - it->second->add_remote_image(mirror_uuid, image_id.id, - m_remote_io_ctx); - if (!it->second->is_running()) { - dout(20) << "starting image replayer for remote image " - << image_id.global_id << dendl; - } - start_image_replayer(it->second); - } -} - -void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer) -{ - assert(m_lock.is_locked()); - if (!image_replayer->is_stopped() || image_replayer->remote_images_empty()) { - return; - } else if (image_replayer->is_blacklisted()) { - derr << "blacklisted detected during image replay" << dendl; - m_blacklisted = true; - m_stopping.set(1); - return; - } - - std::string global_image_id = image_replayer->get_global_image_id(); - dout(20) << "global_image_id=" << global_image_id << dendl; - - FunctionContext *ctx = new FunctionContext( - [this, global_image_id] (int r) { - dout(20) << "image deleter result: r=" << r << ", " - << "global_image_id=" << global_image_id << dendl; - if (r == -ESTALE || r == -ECANCELED) { - return; - } - - Mutex::Locker locker(m_lock); - auto it = m_image_replayers.find(global_image_id); - if (it == m_image_replayers.end()) { - return; - } - - auto &image_replayer = it->second; - if (r >= 0) { - image_replayer->start(); - } else { - start_image_replayer(image_replayer); - } - } - ); - -
m_image_deleter->wait_for_scheduled_deletion( - m_local_pool_id, image_replayer->get_global_image_id(), ctx, false); -} - -bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer) -{ - assert(m_lock.is_locked()); - dout(20) << "global_image_id=" << image_replayer->get_global_image_id() - << dendl; - - // TODO: check how long it is stopping and alert if it is too long. - if (image_replayer->is_stopped()) { - m_image_deleter->cancel_waiter(m_local_pool_id, - image_replayer->get_global_image_id()); - - if (!m_stopping.read() && m_leader_watcher->is_leader()) { - dout(20) << "scheduling delete" << dendl; - m_image_deleter->schedule_image_delete( - m_local_rados, - image_replayer->get_local_pool_id(), - image_replayer->get_local_image_id(), - image_replayer->get_global_image_id()); - } - return true; - } else { - if (!m_stopping.read()) { - dout(20) << "scheduling delete after image replayer stopped" << dendl; - } - FunctionContext *ctx = new FunctionContext( - [&image_replayer, this] (int r) { - if (!m_stopping.read() && m_leader_watcher->is_leader() && r >= 0) { - m_image_deleter->schedule_image_delete( - m_local_rados, - image_replayer->get_local_pool_id(), - image_replayer->get_local_image_id(), - image_replayer->get_global_image_id()); - } - } - ); - image_replayer->stop(ctx); - } - - return false; -} - -void Replayer::stop_image_replayers() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - for (auto image_it = m_image_replayers.begin(); - image_it != m_image_replayers.end();) { - if (stop_image_replayer(image_it->second)) { - image_it = m_image_replayers.erase(image_it); - continue; - } - ++image_it; - } -} - -void Replayer::stop_image_replayers(Context *on_finish) { - dout(20) << dendl; - - { - Mutex::Locker locker(m_lock); - stop_image_replayers(); - - if (!m_image_replayers.empty()) { - Context *ctx = new FunctionContext([this, on_finish](int r) { - assert(r == 0); - stop_image_replayers(on_finish); - }); - ctx =
create_async_context_callback(m_threads->work_queue, ctx); - - Mutex::Locker timer_locker(m_threads->timer_lock); - m_threads->timer->add_event_after(1, ctx); - return; - } + m_instance_replayer->acquire_image(image_id.global_id, + {{mirror_uuid, image_id.id}}, + gather_ctx->new_sub()); } - on_finish->complete(0); + gather_ctx->activate(); } void Replayer::handle_post_acquire_leader(Context *on_finish) { @@ -871,8 +683,29 @@ void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) { assert(m_pool_watcher); m_pool_watcher.reset(); } + wait_for_update_ops(on_finish); +} - stop_image_replayers(on_finish); +void Replayer::wait_for_update_ops(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_wait_for_update_ops(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + m_update_op_tracker.wait_for_ops(ctx); +} + +void Replayer::handle_wait_for_update_ops(int r, Context *on_finish) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Mutex::Locker locker(m_lock); + m_instance_replayer->release_all(on_finish); } } // namespace mirror diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index 6eb3753aa3b96..b6649fb89a51a 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -9,6 +9,7 @@ #include <set> #include <string> +#include "common/AsyncOpTracker.h" #include "common/Cond.h" #include "common/Mutex.h" #include "common/WorkQueue.h" @@ -16,7 +17,6 @@ #include "include/rados/librados.hpp" #include "ClusterWatcher.h" -#include "ImageReplayer.h" #include "LeaderWatcher.h" #include "PoolWatcher.h" #include "ImageDeleter.h" @@ -30,6 +30,7 @@ namespace rbd { namespace mirror { template <typename> struct Threads; +template <typename> class InstanceReplayer; template <typename> class InstanceWatcher; /** @@ -79,11 +80,6 @@ private: const ImageIds &added_image_ids, const ImageIds &removed_image_ids); - void
start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer); - bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer); - void stop_image_replayers(); - void stop_image_replayers(Context *on_finish); - int init_rados(const std::string &cluster_name, const std::string &client_name, const std::string &description, RadosRef *rados_ref); @@ -99,6 +95,9 @@ private: void shut_down_pool_watcher(Context *on_finish); void handle_shut_down_pool_watcher(int r, Context *on_finish); + void wait_for_update_ops(Context *on_finish); + void handle_wait_for_update_ops(int r, Context *on_finish); + Threads<librbd::ImageCtx> *m_threads; std::shared_ptr<ImageDeleter> m_image_deleter; ImageSyncThrottlerRef<> m_image_sync_throttler; @@ -117,12 +116,11 @@ private: librados::IoCtx m_remote_io_ctx; int64_t m_local_pool_id = -1; - int64_t m_remote_pool_id = -1; PoolWatcherListener m_pool_watcher_listener; std::unique_ptr<PoolWatcher<> > m_pool_watcher; - std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers; + std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer; std::string m_asok_hook_name; AdminSocketHook *m_asok_hook; @@ -159,6 +157,7 @@ private: std::unique_ptr<LeaderWatcher<> > m_leader_watcher; std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher; + AsyncOpTracker m_update_op_tracker; }; } // namespace mirror