From 77f02d1bc68ada27bc7d0ba296a990d284a99723 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 14 Jul 2017 21:43:11 -0400 Subject: [PATCH] rbd-mirror: preserve pool replayer instance after errors This will help reduce ping-ponging of any potential callout error notifications. Signed-off-by: Jason Dillaman --- src/tools/rbd_mirror/Mirror.cc | 33 +++++++---- src/tools/rbd_mirror/PoolReplayer.cc | 85 +++++++++++++++------------- src/tools/rbd_mirror/PoolReplayer.h | 15 +++-- 3 files changed, 77 insertions(+), 56 deletions(-) diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 20036be055c1a..7852a4c088a79 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -387,14 +387,11 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers) for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { auto &peer = it->first.second; auto pool_peer_it = pool_peers.find(it->first.first); - if (it->second->is_blacklisted()) { - derr << "removing blacklisted pool replayer for " << peer << dendl; - // TODO: make async - it = m_pool_replayers.erase(it); - } else if (pool_peer_it == pool_peers.end() || - pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { + if (pool_peer_it == pool_peers.end() || + pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { dout(20) << "removing pool replayer for " << peer << dendl; // TODO: make async + it->second->shut_down(); it = m_pool_replayers.erase(it); } else { ++it; @@ -404,17 +401,29 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers) for (auto &kv : pool_peers) { for (auto &peer : kv.second) { PoolPeer pool_peer(kv.first, peer); - if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) { + + auto pool_replayers_it = m_pool_replayers.find(pool_peer); + if (pool_replayers_it != m_pool_replayers.end()) { + auto& pool_replayer = pool_replayers_it->second; + if (pool_replayer->is_blacklisted()) { + derr << "restarting blacklisted pool replayer for " << peer << dendl; + // TODO: make async + pool_replayer->shut_down(); + pool_replayer->init(); + } else if (!pool_replayer->is_running()) { + derr << "restarting failed pool replayer for " << peer << dendl; + // TODO: make async + pool_replayer->shut_down(); + pool_replayer->init(); + } + } else { dout(20) << "starting pool replayer for " << peer << dendl; unique_ptr pool_replayer(new PoolReplayer( m_threads, &m_service_daemon, m_image_deleter.get(), kv.first, peer, m_args)); - // TODO: make async, and retry connecting within pool replayer - int r = pool_replayer->init(); - if (r < 0) { - continue; - } + // TODO: make async + pool_replayer->init(); m_pool_replayers.emplace(pool_peer, std::move(pool_replayer)); } } diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc index ede06fae76bbe..3f18a364eda82 100644 --- a/src/tools/rbd_mirror/PoolReplayer.cc +++ b/src/tools/rbd_mirror/PoolReplayer.cc @@ -213,13 +213,12 @@ PoolReplayer::PoolReplayer(Threads *threads, m_threads(threads), m_service_daemon(service_daemon), m_image_deleter(image_deleter), - m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)), + m_local_pool_id(local_pool_id), m_peer(peer), m_args(args), - m_local_pool_id(local_pool_id), + m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)), m_local_pool_watcher_listener(this, true), m_remote_pool_watcher_listener(this, false), - m_asok_hook(nullptr), m_pool_replayer_thread(this), m_leader_listener(this) { @@ -228,27 +227,7 @@ PoolReplayer::PoolReplayer(Threads *threads, PoolReplayer::~PoolReplayer() { delete m_asok_hook; - - m_stopping = true; - { - Mutex::Locker l(m_lock); - m_cond.Signal(); - } - if (m_pool_replayer_thread.is_started()) { - m_pool_replayer_thread.join(); - } - if (m_leader_watcher) { - m_leader_watcher->shut_down(); - } - if (m_instance_watcher) { - m_instance_watcher->shut_down(); - } - if (m_instance_replayer) { - m_instance_replayer->shut_down(); - } - - assert(!m_local_pool_watcher); - assert(!m_remote_pool_watcher); + shut_down(); } bool PoolReplayer::is_blacklisted() const { @@ -261,29 +240,38 @@ bool PoolReplayer::is_leader() const { return m_leader_watcher && m_leader_watcher->is_leader(); } -int PoolReplayer::init() +bool PoolReplayer::is_running() const { + return m_pool_replayer_thread.is_started(); +} + +void PoolReplayer::init() { - dout(20) << "replaying for " << m_peer << dendl; + assert(!m_pool_replayer_thread.is_started()); + // reset state + m_stopping = false; + m_blacklisted = false; + + dout(20) << "replaying for " << m_peer << dendl; int r = init_rados(g_ceph_context->_conf->cluster, g_ceph_context->_conf->name.to_str(), "local cluster", &m_local_rados); if (r < 0) { - return r; + return; } r = init_rados(m_peer.cluster_name, m_peer.client_name, std::string("remote peer ") + stringify(m_peer), &m_remote_rados); if (r < 0) { - return r; + return; } r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx); if (r < 0) { derr << "error accessing local pool " << m_local_pool_id << ": " << cpp_strerror(r) << dendl; - return r; + return; } std::string local_mirror_uuid; @@ -292,7 +280,7 @@ int PoolReplayer::init() if (r < 0) { derr << "failed to retrieve local mirror uuid from pool " << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; - return r; + return; } r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(), @@ -300,7 +288,7 @@ int PoolReplayer::init() if (r < 0) { derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; - return r; + return; } dout(20) << "connected to " << m_peer << dendl; @@ -311,27 +299,48 @@ int PoolReplayer::init() m_instance_replayer->init(); m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx); - m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx, - m_threads->work_queue, - m_instance_replayer.get())); + m_instance_watcher.reset(InstanceWatcher<>::create( + m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get())); r = m_instance_watcher->init(); if (r < 0) { derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; - return r; + return; } m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, &m_leader_listener)); - r = m_leader_watcher->init(); if (r < 0) { derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; - return r; + return; } m_pool_replayer_thread.create("pool replayer"); +} - return 0; +void PoolReplayer::shut_down() { + m_stopping = true; + { + Mutex::Locker l(m_lock); + m_cond.Signal(); + } + if (m_pool_replayer_thread.is_started()) { + m_pool_replayer_thread.join(); + } + if (m_leader_watcher) { + m_leader_watcher->shut_down(); + } + if (m_instance_watcher) { + m_instance_watcher->shut_down(); + } + if (m_instance_replayer) { + m_instance_replayer->shut_down(); + } + + assert(!m_local_pool_watcher); + assert(!m_remote_pool_watcher); + m_local_rados.reset(); + m_remote_rados.reset(); } int PoolReplayer::init_rados(const std::string &cluster_name, diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h index d23b0d66a7443..dca0a1f332668 100644 --- a/src/tools/rbd_mirror/PoolReplayer.h +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -50,8 +50,11 @@ public: bool is_blacklisted() const; bool is_leader() const; + bool is_running() const; + + void init(); + void shut_down(); - int init(); void run(); void print_status(Formatter *f, stringstream *ss); @@ -106,22 +109,22 @@ private: Threads *m_threads; ServiceDaemon* m_service_daemon; ImageDeleter<>* m_image_deleter; + int64_t m_local_pool_id = -1; + peer_t m_peer; + std::vector m_args; + mutable Mutex m_lock; Cond m_cond; std::atomic m_stopping = { false }; bool m_manual_stop = false; bool m_blacklisted = false; - peer_t m_peer; - std::vector m_args; RadosRef m_local_rados; RadosRef m_remote_rados; librados::IoCtx m_local_io_ctx; librados::IoCtx m_remote_io_ctx; - int64_t m_local_pool_id = -1; - PoolWatcherListener m_local_pool_watcher_listener; std::unique_ptr > m_local_pool_watcher; @@ -131,7 +134,7 @@ private: std::unique_ptr> m_instance_replayer; std::string m_asok_hook_name; - AdminSocketHook *m_asok_hook; + AdminSocketHook *m_asok_hook = nullptr; std::map m_initial_mirror_image_ids; -- 2.39.5