]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: preserve pool replayer instance after errors
authorJason Dillaman <dillaman@redhat.com>
Sat, 15 Jul 2017 01:43:11 +0000 (21:43 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 18 Jul 2017 14:28:14 +0000 (10:28 -0400)
This will help reduce ping-ponging of any potential callout error
notifications.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/PoolReplayer.cc
src/tools/rbd_mirror/PoolReplayer.h

index 20036be055c1a66cf6ea1b6bd9feb28fd6e56179..7852a4c088a7958e789e563b52ae56179c0a6120 100644 (file)
@@ -387,14 +387,11 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
   for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
     auto &peer = it->first.second;
     auto pool_peer_it = pool_peers.find(it->first.first);
-    if (it->second->is_blacklisted()) {
-      derr << "removing blacklisted pool replayer for " << peer << dendl;
-      // TODO: make async
-      it = m_pool_replayers.erase(it);
-    } else if (pool_peer_it == pool_peers.end() ||
-               pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
+    if (pool_peer_it == pool_peers.end() ||
+        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
       dout(20) << "removing pool replayer for " << peer << dendl;
       // TODO: make async
+      it->second->shut_down();
       it = m_pool_replayers.erase(it);
     } else {
       ++it;
@@ -404,17 +401,29 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
   for (auto &kv : pool_peers) {
     for (auto &peer : kv.second) {
       PoolPeer pool_peer(kv.first, peer);
-      if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
+
+      auto pool_replayers_it = m_pool_replayers.find(pool_peer);
+      if (pool_replayers_it != m_pool_replayers.end()) {
+        auto& pool_replayer = pool_replayers_it->second;
+        if (pool_replayer->is_blacklisted()) {
+          derr << "restarting blacklisted pool replayer for " << peer << dendl;
+          // TODO: make async
+          pool_replayer->shut_down();
+          pool_replayer->init();
+        } else if (!pool_replayer->is_running()) {
+          derr << "restarting failed pool replayer for " << peer << dendl;
+          // TODO: make async
+          pool_replayer->shut_down();
+          pool_replayer->init();
+        }
+      } else {
         dout(20) << "starting pool replayer for " << peer << dendl;
         unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
          m_threads, &m_service_daemon, m_image_deleter.get(), kv.first,
           peer, m_args));
 
-        // TODO: make async, and retry connecting within pool replayer
-        int r = pool_replayer->init();
-        if (r < 0) {
-         continue;
-        }
+        // TODO: make async
+        pool_replayer->init();
         m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
       }
     }
index ede06fae76bbea61ebb90bf4f2b665e0b7ab5571..3f18a364eda821ca1a07fffce8ed71d1c1abb04b 100644 (file)
@@ -213,13 +213,12 @@ PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
   m_threads(threads),
   m_service_daemon(service_daemon),
   m_image_deleter(image_deleter),
-  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+  m_local_pool_id(local_pool_id),
   m_peer(peer),
   m_args(args),
-  m_local_pool_id(local_pool_id),
+  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
   m_local_pool_watcher_listener(this, true),
   m_remote_pool_watcher_listener(this, false),
-  m_asok_hook(nullptr),
   m_pool_replayer_thread(this),
   m_leader_listener(this)
 {
@@ -228,27 +227,7 @@ PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
 PoolReplayer::~PoolReplayer()
 {
   delete m_asok_hook;
-
-  m_stopping = true;
-  {
-    Mutex::Locker l(m_lock);
-    m_cond.Signal();
-  }
-  if (m_pool_replayer_thread.is_started()) {
-    m_pool_replayer_thread.join();
-  }
-  if (m_leader_watcher) {
-    m_leader_watcher->shut_down();
-  }
-  if (m_instance_watcher) {
-    m_instance_watcher->shut_down();
-  }
-  if (m_instance_replayer) {
-    m_instance_replayer->shut_down();
-  }
-
-  assert(!m_local_pool_watcher);
-  assert(!m_remote_pool_watcher);
+  shut_down();
 }
 
 bool PoolReplayer::is_blacklisted() const {
@@ -261,29 +240,38 @@ bool PoolReplayer::is_leader() const {
   return m_leader_watcher && m_leader_watcher->is_leader();
 }
 
-int PoolReplayer::init()
+bool PoolReplayer::is_running() const {
+  return m_pool_replayer_thread.is_started();
+}
+
+void PoolReplayer::init()
 {
-  dout(20) << "replaying for " << m_peer << dendl;
+  assert(!m_pool_replayer_thread.is_started());
 
+  // reset state
+  m_stopping = false;
+  m_blacklisted = false;
+
+  dout(20) << "replaying for " << m_peer << dendl;
   int r = init_rados(g_ceph_context->_conf->cluster,
                      g_ceph_context->_conf->name.to_str(),
                      "local cluster", &m_local_rados);
   if (r < 0) {
-    return r;
+    return;
   }
 
   r = init_rados(m_peer.cluster_name, m_peer.client_name,
                  std::string("remote peer ") + stringify(m_peer),
                  &m_remote_rados);
   if (r < 0) {
-    return r;
+    return;
   }
 
   r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
   if (r < 0) {
     derr << "error accessing local pool " << m_local_pool_id << ": "
          << cpp_strerror(r) << dendl;
-    return r;
+    return;
   }
 
   std::string local_mirror_uuid;
@@ -292,7 +280,7 @@ int PoolReplayer::init()
   if (r < 0) {
     derr << "failed to retrieve local mirror uuid from pool "
          << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-    return r;
+    return;
   }
 
   r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
@@ -300,7 +288,7 @@ int PoolReplayer::init()
   if (r < 0) {
     derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
          << ": " << cpp_strerror(r) << dendl;
-    return r;
+    return;
   }
 
   dout(20) << "connected to " << m_peer << dendl;
@@ -311,27 +299,48 @@ int PoolReplayer::init()
   m_instance_replayer->init();
   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
 
-  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
-                                                     m_threads->work_queue,
-                                                     m_instance_replayer.get()));
+  m_instance_watcher.reset(InstanceWatcher<>::create(
+    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
   r = m_instance_watcher->init();
   if (r < 0) {
     derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
-    return r;
+    return;
   }
 
   m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                              &m_leader_listener));
-
   r = m_leader_watcher->init();
   if (r < 0) {
     derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
-    return r;
+    return;
   }
 
   m_pool_replayer_thread.create("pool replayer");
+}
 
-  return 0;
+void PoolReplayer::shut_down() {
+  m_stopping = true;
+  {
+    Mutex::Locker l(m_lock);
+    m_cond.Signal();
+  }
+  if (m_pool_replayer_thread.is_started()) {
+    m_pool_replayer_thread.join();
+  }
+  if (m_leader_watcher) {
+    m_leader_watcher->shut_down();
+  }
+  if (m_instance_watcher) {
+    m_instance_watcher->shut_down();
+  }
+  if (m_instance_replayer) {
+    m_instance_replayer->shut_down();
+  }
+
+  assert(!m_local_pool_watcher);
+  assert(!m_remote_pool_watcher);
+  m_local_rados.reset();
+  m_remote_rados.reset();
 }
 
 int PoolReplayer::init_rados(const std::string &cluster_name,
index d23b0d66a7443ef5d1c32fe609fca1b4542b5917..dca0a1f332668c91ff91400f68cb9f04f2dc6a08 100644 (file)
@@ -50,8 +50,11 @@ public:
 
   bool is_blacklisted() const;
   bool is_leader() const;
+  bool is_running() const;
+
+  void init();
+  void shut_down();
 
-  int init();
   void run();
 
   void print_status(Formatter *f, stringstream *ss);
@@ -106,22 +109,22 @@ private:
   Threads<librbd::ImageCtx> *m_threads;
   ServiceDaemon<librbd::ImageCtx>* m_service_daemon;
   ImageDeleter<>* m_image_deleter;
+  int64_t m_local_pool_id = -1;
+  peer_t m_peer;
+  std::vector<const char*> m_args;
+
   mutable Mutex m_lock;
   Cond m_cond;
   std::atomic<bool> m_stopping = { false };
   bool m_manual_stop = false;
   bool m_blacklisted = false;
 
-  peer_t m_peer;
-  std::vector<const char*> m_args;
   RadosRef m_local_rados;
   RadosRef m_remote_rados;
 
   librados::IoCtx m_local_io_ctx;
   librados::IoCtx m_remote_io_ctx;
 
-  int64_t m_local_pool_id = -1;
-
   PoolWatcherListener m_local_pool_watcher_listener;
   std::unique_ptr<PoolWatcher<> > m_local_pool_watcher;
 
@@ -131,7 +134,7 @@ private:
   std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
 
   std::string m_asok_hook_name;
-  AdminSocketHook *m_asok_hook;
+  AdminSocketHook *m_asok_hook = nullptr;
 
   std::map<std::string, ImageIds> m_initial_mirror_image_ids;