From: Mykola Golub Date: Sun, 15 Jan 2017 18:15:14 +0000 (+0100) Subject: rbd-mirror HA: pool replayer should be started/stopped when lock acquired/released X-Git-Tag: v12.0.0~38^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=065b513ad7f995c2e8f2211a151b893960362aad;p=ceph.git rbd-mirror HA: pool replayer should be started/stopped when lock acquired/released Fixes: http://tracker.ceph.com/issues/17020 Signed-off-by: Mykola Golub --- diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index e14f10c3d4d..e9ef90694f1 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -104,6 +104,19 @@ private: Mirror *mirror; }; +class LeaderReleaseCommand : public MirrorAdminSocketCommand { +public: + explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) { + mirror->release_leader(); + return true; + } + +private: + Mirror *mirror; +}; + } // anonymous namespace class MirrorAdminSocketHook : public AdminSocketHook { @@ -147,6 +160,13 @@ public: if (r == 0) { commands[command] = new FlushCommand(mirror); } + + command = "rbd mirror leader release"; + r = admin_socket->register_command(command, command, this, + "release rbd mirror leader"); + if (r == 0) { + commands[command] = new LeaderReleaseCommand(mirror); + } } ~MirrorAdminSocketHook() { @@ -356,6 +376,21 @@ void Mirror::flush() } } +void Mirror::release_leader() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { + auto &replayer = it->second; + replayer->release_leader(); + } +} + void Mirror::update_replayers(const PoolPeers &pool_peers) { dout(20) << "enter" << dendl; diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index f7a4d02894d..7c39fe9d6de 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -45,6 +45,7 @@ public: void stop(); void restart(); void flush(); + void release_leader(); private: typedef ClusterWatcher::PoolPeers PoolPeers; diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 5223aa2c29c..2da5d86bfe6 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -13,8 +13,10 @@ #include "include/stringify.h" #include "cls/rbd/cls_rbd_client.h" #include "global/global_context.h" +#include "librbd/Utils.h" #include "librbd/Watcher.h" #include "librbd/internal.h" +#include "LeaderWatcher.h" #include "Replayer.h" #include "Threads.h" @@ -31,6 +33,7 @@ using std::unique_ptr; using std::vector; using librbd::cls_client::dir_get_name; +using librbd::util::create_async_context_callback; namespace rbd { namespace mirror { @@ -108,6 +111,19 @@ private: Replayer *replayer; }; +class LeaderReleaseCommand : public ReplayerAdminSocketCommand { +public: + explicit LeaderReleaseCommand(Replayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->release_leader(); + return true; + } + +private: + Replayer *replayer; +}; + } // anonymous namespace class ReplayerAdminSocketHook : public AdminSocketHook { @@ -152,6 +168,13 @@ public: if (r == 0) { commands[command] = new FlushCommand(replayer); } + + command = "rbd mirror leader release " + name; + r = admin_socket->register_command(command, command, this, + "release rbd mirror leader " + name); + if (r == 0) { + commands[command] = new LeaderReleaseCommand(replayer); + } } ~ReplayerAdminSocketHook() { @@ -181,55 +204,6 @@ private: Commands commands; }; -class MirrorStatusWatchCtx { -public: - MirrorStatusWatchCtx(librados::IoCtx &ioctx, ContextWQ *work_queue) { - m_ioctx.dup(ioctx); - m_watcher = new Watcher(m_ioctx, work_queue); - } - - ~MirrorStatusWatchCtx() { - delete m_watcher; - } - - int register_watch() { - C_SaferCond cond; - m_watcher->register_watch(&cond); - return cond.wait(); - } - - int unregister_watch() { - C_SaferCond cond; - m_watcher->unregister_watch(&cond); - return cond.wait(); - } - - std::string get_oid() const { - return m_watcher->get_oid(); - } - -private: - class Watcher : public librbd::Watcher { - public: - Watcher(librados::IoCtx &ioctx, ContextWQ *work_queue) : - librbd::Watcher(ioctx, work_queue, RBD_MIRRORING) { - } - - virtual std::string get_oid() const { - return RBD_MIRRORING; - } - - virtual void handle_notify(uint64_t notify_id, uint64_t handle, - uint64_t notifier_id, bufferlist &bl) { - bufferlist out; - acknowledge_notify(notify_id, handle, out); - } - }; - - librados::IoCtx m_ioctx; - Watcher *m_watcher; -}; - Replayer::Replayer(Threads *threads, std::shared_ptr image_deleter, ImageSyncThrottlerRef<> image_sync_throttler, int64_t local_pool_id, const peer_t &peer, @@ -242,7 +216,8 @@ Replayer::Replayer(Threads *threads, std::shared_ptr image_deleter m_args(args), m_local_pool_id(local_pool_id), m_asok_hook(nullptr), - m_replayer_thread(this) + m_replayer_thread(this), + m_leader_listener(this) { } @@ -258,6 +233,9 @@ Replayer::~Replayer() if (m_replayer_thread.is_started()) { m_replayer_thread.join(); } + if (m_leader_watcher) { + m_leader_watcher->shut_down(); + } } bool Replayer::is_blacklisted() const { @@ -265,6 +243,11 @@ bool Replayer::is_blacklisted() const { return m_blacklisted; } +bool Replayer::is_leader() const { + Mutex::Locker locker(m_lock); + return m_leader_watcher && m_leader_watcher->is_leader(); +} + int Replayer::init() { dout(20) << "replaying for " << m_peer << dendl; @@ -301,6 +284,14 @@ int Replayer::init() dout(20) << "connected to " << m_peer << dendl; + m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, + &m_leader_listener)); + r = m_leader_watcher->init(); + if (r < 0) { + derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; + return r; + } + // Bootstrap existing mirroring images init_local_mirroring_images(); @@ -451,7 +442,7 @@ void Replayer::run() if (m_pool_watcher->is_blacklisted()) { m_blacklisted = true; m_stopping.set(1); - } else if (!m_manual_stop) { + } else if (!m_manual_stop && m_leader_watcher->is_leader()) { set_sources(m_pool_watcher->get_images()); } @@ -465,7 +456,7 @@ void Replayer::run() ImageIds empty_sources; while (true) { - Mutex::Locker l(m_lock); + Mutex::Locker locker(m_lock); set_sources(empty_sources); if (m_image_replayers.empty()) { break; @@ -484,6 +475,7 @@ void Replayer::print_status(Formatter *f, stringstream *ss) f->open_object_section("replayer_status"); f->dump_string("pool", m_local_io_ctx.get_pool_name()); f->dump_stream("peer") << m_peer; + f->dump_bool("leader", m_leader_watcher->is_leader()); f->open_array_section("image_replayers"); }; @@ -571,13 +563,27 @@ void Replayer::flush() } } +void Replayer::release_leader() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read() || !m_leader_watcher) { + return; + } + + m_leader_watcher->release_leader(); +} + void Replayer::set_sources(const ImageIds &image_ids) { dout(20) << "enter" << dendl; assert(m_lock.is_locked()); - if (!m_init_images.empty()) { + if (!m_init_images.empty() && !m_stopping.read() && + m_leader_watcher->is_leader()) { dout(20) << "scanning initial local image set" << dendl; for (auto &remote_image : image_ids) { auto it = m_init_images.find(InitImageInfo(remote_image.global_id)); @@ -598,7 +604,6 @@ void Replayer::set_sources(const ImageIds &image_ids) } // shut down replayers for non-mirrored images - bool existing_image_replayers = !m_image_replayers.empty(); for (auto image_it = m_image_replayers.begin(); image_it != m_image_replayers.end();) { if (image_ids.find(ImageId(image_it->first)) == image_ids.end()) { @@ -615,9 +620,6 @@ void Replayer::set_sources(const ImageIds &image_ids) } if (image_ids.empty()) { - if (existing_image_replayers && m_image_replayers.empty()) { - mirror_image_status_shut_down(); - } return; } @@ -639,14 +641,6 @@ void Replayer::set_sources(const ImageIds &image_ids) return; } - if (m_image_replayers.empty() && !existing_image_replayers) { - // create entry for pool if it doesn't exist - r = mirror_image_status_init(); - if (r < 0) { - return; - } - } - for (auto &image_id : image_ids) { auto it = m_image_replayers.find(image_id.id); if (it == m_image_replayers.end()) { @@ -665,47 +659,6 @@ void Replayer::set_sources(const ImageIds &image_ids) } } -int Replayer::mirror_image_status_init() { - assert(!m_status_watcher); - - uint64_t instance_id = librados::Rados(m_local_io_ctx).get_instance_id(); - dout(20) << "pool_id=" << m_local_pool_id << ", " - << "instance_id=" << instance_id << dendl; - - librados::ObjectWriteOperation op; - librbd::cls_client::mirror_image_status_remove_down(&op); - int r = m_local_io_ctx.operate(RBD_MIRRORING, &op); - if (r < 0) { - derr << "error initializing " << RBD_MIRRORING << "object: " - << cpp_strerror(r) << dendl; - return r; - } - - unique_ptr watch_ctx( - new MirrorStatusWatchCtx(m_local_io_ctx, m_threads->work_queue)); - - r = watch_ctx->register_watch(); - if (r < 0) { - derr << "error registering watcher for " << watch_ctx->get_oid() - << " object: " << cpp_strerror(r) << dendl; - return r; - } - - m_status_watcher = std::move(watch_ctx); - return 0; -} - -void Replayer::mirror_image_status_shut_down() { - assert(m_status_watcher); - - int r = m_status_watcher->unregister_watch(); - if (r < 0) { - derr << "error unregistering watcher for " << m_status_watcher->get_oid() - << " object: " << cpp_strerror(r) << dendl; - } - m_status_watcher.reset(); -} - void Replayer::start_image_replayer(unique_ptr > &image_replayer, const std::string &image_id, const boost::optional& image_name) @@ -761,7 +714,7 @@ bool Replayer::stop_image_replayer(unique_ptr > &image_replayer) m_image_deleter->cancel_waiter(m_local_pool_id, image_replayer->get_global_image_id()); - if (!m_stopping.read()) { + if (!m_stopping.read() && m_leader_watcher->is_leader()) { dout(20) << "scheduling delete" << dendl; m_image_deleter->schedule_image_delete( m_local_rados, @@ -777,7 +730,7 @@ bool Replayer::stop_image_replayer(unique_ptr > &image_replayer) } FunctionContext *ctx = new FunctionContext( [&image_replayer, this] (int r) { - if (!m_stopping.read() && r >= 0) { + if (!m_stopping.read() && m_leader_watcher->is_leader() && r >= 0) { m_image_deleter->schedule_image_delete( m_local_rados, image_replayer->get_local_pool_id(), @@ -793,5 +746,37 @@ bool Replayer::stop_image_replayer(unique_ptr > &image_replayer) return false; } +void Replayer::handle_post_acquire_leader(Context *on_finish) { + dout(20) << dendl; + + { + Mutex::Locker locker(m_lock); + m_cond.Signal(); + } + + on_finish->complete(0); +} + +void Replayer::handle_pre_release_leader(Context *on_finish) { + dout(20) << dendl; + + { + Mutex::Locker locker(m_lock); + set_sources(ImageIds()); + if (!m_image_replayers.empty()) { + Mutex::Locker timer_locker(m_threads->timer_lock); + Context *task = create_async_context_callback( + m_threads->work_queue, new FunctionContext( + [this, on_finish](int r) { + handle_pre_release_leader(on_finish); + })); + m_threads->timer->add_event_after(1, task); + return; + } + } + + on_finish->complete(0); +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index 10d5522f151..ddf99d66245 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -17,6 +17,7 @@ #include "ClusterWatcher.h" #include "ImageReplayer.h" +#include "LeaderWatcher.h" #include "PoolWatcher.h" #include "ImageDeleter.h" #include "types.h" @@ -26,7 +27,6 @@ namespace mirror { struct Threads; class ReplayerAdminSocketHook; -class MirrorStatusWatchCtx; /** * Controls mirroring for a single remote cluster. @@ -42,6 +42,7 @@ public: Replayer& operator=(const Replayer&) = delete; bool is_blacklisted() const; + bool is_leader() const; int init(); void run(); @@ -51,6 +52,7 @@ public: void stop(bool manual); void restart(); void flush(); + void release_leader(); private: typedef PoolWatcher::ImageId ImageId; @@ -64,12 +66,12 @@ private: const boost::optional& image_name); bool stop_image_replayer(unique_ptr > &image_replayer); - int mirror_image_status_init(); - void mirror_image_status_shut_down(); - int init_rados(const std::string &cluster_name, const std::string &client_name, const std::string &description, RadosRef *rados_ref); + void handle_post_acquire_leader(Context *on_finish); + void handle_pre_release_leader(Context *on_finish); + Threads *m_threads; std::shared_ptr m_image_deleter; ImageSyncThrottlerRef<> m_image_sync_throttler; @@ -92,7 +94,6 @@ private: std::unique_ptr m_pool_watcher; std::map > > m_image_replayers; - std::unique_ptr m_status_watcher; std::string m_asok_hook_name; ReplayerAdminSocketHook *m_asok_hook; @@ -126,6 +127,26 @@ private: return 0; } } m_replayer_thread; + + class LeaderListener : public LeaderWatcher<>::Listener { + public: + LeaderListener(Replayer *replayer) : m_replayer(replayer) { + } + + protected: + virtual void post_acquire_handler(Context *on_finish) { + m_replayer->handle_post_acquire_leader(on_finish); + } + + virtual void pre_release_handler(Context *on_finish) { + m_replayer->handle_pre_release_leader(on_finish); + } + + private: + Replayer *m_replayer; + } m_leader_listener; + + std::unique_ptr > m_leader_watcher; }; } // namespace mirror