git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror A/A: separate ImageReplayer handling from Replayer 13803/head
author Mykola Golub <mgolub@mirantis.com>
Sat, 18 Mar 2017 17:09:44 +0000 (18:09 +0100)
committer Mykola Golub <mgolub@mirantis.com>
Fri, 7 Apr 2017 13:58:52 +0000 (15:58 +0200)
Fixes: http://tracker.ceph.com/issues/18785
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index 953db35f63ed530060fedc9adcd6dfdec418c01d..f178af50989a95d61caa8d11f78f31f906741f14 100644 (file)
@@ -17,6 +17,7 @@
 #include "librbd/Utils.h"
 #include "librbd/Watcher.h"
 #include "librbd/api/Mirror.h"
+#include "InstanceReplayer.h"
 #include "InstanceWatcher.h"
 #include "LeaderWatcher.h"
 #include "Replayer.h"
@@ -258,6 +259,9 @@ Replayer::~Replayer()
   if (m_instance_watcher) {
     m_instance_watcher->shut_down();
   }
+  if (m_instance_replayer) {
+    m_instance_replayer->shut_down();
+  }
 
   assert(!m_pool_watcher);
 }
@@ -297,6 +301,15 @@ int Replayer::init()
     return r;
   }
 
+  std::string local_mirror_uuid;
+  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
+                                          &local_mirror_uuid);
+  if (r < 0) {
+    derr << "failed to retrieve local mirror uuid from pool "
+         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
   r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                    m_remote_io_ctx);
   if (r < 0) {
@@ -304,10 +317,15 @@ int Replayer::init()
          << ": " << cpp_strerror(r) << dendl;
     return r;
   }
-  m_remote_pool_id = m_remote_io_ctx.get_id();
 
   dout(20) << "connected to " << m_peer << dendl;
 
+  m_instance_replayer.reset(
+    InstanceReplayer<>::create(m_threads, m_image_deleter,
+                               m_image_sync_throttler, m_local_rados,
+                               local_mirror_uuid, m_local_pool_id));
+  m_instance_replayer->init();
+
   m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                              &m_leader_listener));
   r = m_leader_watcher->init();
@@ -422,27 +440,7 @@ void Replayer::run()
       break;
     }
 
-    for (auto image_it = m_image_replayers.begin();
-         image_it != m_image_replayers.end(); ) {
-      if (image_it->second->remote_images_empty()) {
-        if (stop_image_replayer(image_it->second)) {
-          image_it = m_image_replayers.erase(image_it);
-          continue;
-        }
-      } else {
-        start_image_replayer(image_it->second);
-      }
-      ++image_it;
-    }
-
-    m_cond.WaitInterval(m_lock,
-                       utime_t(g_ceph_context->_conf->
-                                  rbd_mirror_image_state_check_interval, 0));
-  }
-
-  Mutex::Locker locker(m_lock);
-  while (!m_image_replayers.empty()) {
-    stop_image_replayers();
+    m_cond.WaitInterval(m_lock, utime_t(1, 0));
   }
 }
 
@@ -471,14 +469,9 @@ void Replayer::print_status(Formatter *f, stringstream *ss)
     }
     f->close_section();
   }
-  f->open_array_section("image_replayers");
 
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->print_status(f, ss);
-  }
+  m_instance_replayer->print_status(f, ss);
 
-  f->close_section();
   f->close_section();
   f->flush(*ss);
 }
@@ -493,12 +486,7 @@ void Replayer::start()
     return;
   }
 
-  m_manual_stop = false;
-
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->start(nullptr, true);
-  }
+  m_instance_replayer->start();
 }
 
 void Replayer::stop(bool manual)
@@ -514,11 +502,7 @@ void Replayer::stop(bool manual)
     return;
   }
 
-  m_manual_stop = true;
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->stop(nullptr, true);
-  }
+  m_instance_replayer->stop();
 }
 
 void Replayer::restart()
@@ -531,12 +515,7 @@ void Replayer::restart()
     return;
   }
 
-  m_manual_stop = false;
-
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->restart();
-  }
+  m_instance_replayer->restart();
 }
 
 void Replayer::flush()
@@ -549,10 +528,7 @@ void Replayer::flush()
     return;
   }
 
-  for (auto &kv : m_image_replayers) {
-    auto &image_replayer = kv.second;
-    image_replayer->flush();
-  }
+  m_instance_replayer->flush();
 }
 
 void Replayer::release_leader()
@@ -582,6 +558,8 @@ void Replayer::handle_update(const std::string &mirror_uuid,
     return;
   }
 
+  m_instance_replayer->set_peers({{mirror_uuid, m_remote_io_ctx}});
+
   // first callback will be a full directory -- so see if we need to remove
   // any local images that no longer exist on the remote side
   if (!m_init_image_ids.empty()) {
@@ -603,193 +581,27 @@ void Replayer::handle_update(const std::string &mirror_uuid,
     m_init_image_ids.clear();
   }
 
-  // shut down replayers for non-mirrored images
-  for (auto &image_id : removed_image_ids) {
-    auto image_it = m_image_replayers.find(image_id.global_id);
-    if (image_it != m_image_replayers.end()) {
-      image_it->second->remove_remote_image(mirror_uuid, image_id.id);
-
-      if (image_it->second->is_running()) {
-        dout(20) << "stop image replayer for remote image "
-                 << image_id.id << " (" << image_id.global_id << ")"
-                 << dendl;
-      }
-
-      if (image_it->second->remote_images_empty() &&
-          stop_image_replayer(image_it->second)) {
-        // no additional remotes registered for this image
-        m_image_replayers.erase(image_it);
-      }
-    }
-  }
-
-  // prune previously stopped image replayers
-  for (auto image_it = m_image_replayers.begin();
-       image_it != m_image_replayers.end(); ) {
-    if (image_it->second->remote_images_empty() &&
-        stop_image_replayer(image_it->second)) {
-      image_it = m_image_replayers.erase(image_it);
-    } else {
-      ++image_it;
-    }
-  }
+  m_update_op_tracker.start_op();
+  Context *ctx = new FunctionContext([this](int r) {
+      dout(20) << "complete handle_update: r=" << r << dendl;
+      m_update_op_tracker.finish_op();
+    });
 
-  if (added_image_ids.empty()) {
-    return;
-  }
+  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
 
-  std::string local_mirror_uuid;
-  int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
-                                              &local_mirror_uuid);
-  if (r < 0 || local_mirror_uuid.empty()) {
-    derr << "failed to retrieve local mirror uuid from pool "
-         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-    return;
+  for (auto &image_id : removed_image_ids) {
+    m_instance_replayer->release_image(image_id.global_id,
+                                       {{mirror_uuid, image_id.id}}, true,
+                                       gather_ctx->new_sub());
   }
 
-  // start replayers for newly added remote image sources
   for (auto &image_id : added_image_ids) {
-    auto it = m_image_replayers.find(image_id.global_id);
-    if (it == m_image_replayers.end()) {
-      unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
-        m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
-        local_mirror_uuid, m_local_pool_id, image_id.global_id));
-      if (m_manual_stop) {
-        image_replayer->stop(nullptr, true);
-      }
-
-      it = m_image_replayers.insert(
-        std::make_pair(image_id.global_id, std::move(image_replayer))).first;
-    }
-
-    it->second->add_remote_image(mirror_uuid, image_id.id,
-                                 m_remote_io_ctx);
-    if (!it->second->is_running()) {
-      dout(20) << "starting image replayer for remote image "
-               << image_id.global_id << dendl;
-    }
-    start_image_replayer(it->second);
-  }
-}
-
-void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
-{
-  assert(m_lock.is_locked());
-  if (!image_replayer->is_stopped() || image_replayer->remote_images_empty()) {
-    return;
-  } else if (image_replayer->is_blacklisted()) {
-    derr << "blacklisted detected during image replay" << dendl;
-    m_blacklisted = true;
-    m_stopping.set(1);
-    return;
-  }
-
-  std::string global_image_id = image_replayer->get_global_image_id();
-  dout(20) << "global_image_id=" << global_image_id << dendl;
-
-  FunctionContext *ctx = new FunctionContext(
-      [this, global_image_id] (int r) {
-        dout(20) << "image deleter result: r=" << r << ", "
-                 << "global_image_id=" << global_image_id << dendl;
-        if (r == -ESTALE || r == -ECANCELED) {
-          return;
-        }
-
-        Mutex::Locker locker(m_lock);
-        auto it = m_image_replayers.find(global_image_id);
-        if (it == m_image_replayers.end()) {
-          return;
-        }
-
-        auto &image_replayer = it->second;
-        if (r >= 0) {
-          image_replayer->start();
-        } else {
-          start_image_replayer(image_replayer);
-        }
-     }
-  );
-
-  m_image_deleter->wait_for_scheduled_deletion(
-    m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
-}
-
-bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
-{
-  assert(m_lock.is_locked());
-  dout(20) << "global_image_id=" << image_replayer->get_global_image_id()
-           << dendl;
-
-  // TODO: check how long it is stopping and alert if it is too long.
-  if (image_replayer->is_stopped()) {
-    m_image_deleter->cancel_waiter(m_local_pool_id,
-                                   image_replayer->get_global_image_id());
-
-    if (!m_stopping.read() && m_leader_watcher->is_leader()) {
-      dout(20) << "scheduling delete" << dendl;
-      m_image_deleter->schedule_image_delete(
-        m_local_rados,
-        image_replayer->get_local_pool_id(),
-        image_replayer->get_local_image_id(),
-        image_replayer->get_global_image_id());
-    }
-    return true;
-  } else {
-    if (!m_stopping.read()) {
-      dout(20) << "scheduling delete after image replayer stopped" << dendl;
-    }
-    FunctionContext *ctx = new FunctionContext(
-        [&image_replayer, this] (int r) {
-          if (!m_stopping.read() && m_leader_watcher->is_leader() && r >= 0) {
-            m_image_deleter->schedule_image_delete(
-              m_local_rados,
-              image_replayer->get_local_pool_id(),
-              image_replayer->get_local_image_id(),
-              image_replayer->get_global_image_id());
-          }
-        }
-    );
-    image_replayer->stop(ctx);
-  }
-
-  return false;
-}
-
-void Replayer::stop_image_replayers() {
-  dout(20) << dendl;
-
-  assert(m_lock.is_locked());
-  for (auto image_it = m_image_replayers.begin();
-       image_it != m_image_replayers.end();) {
-    if (stop_image_replayer(image_it->second)) {
-      image_it = m_image_replayers.erase(image_it);
-      continue;
-    }
-    ++image_it;
-  }
-}
-
-void Replayer::stop_image_replayers(Context *on_finish) {
-  dout(20) << dendl;
-
-  {
-    Mutex::Locker locker(m_lock);
-    stop_image_replayers();
-
-    if (!m_image_replayers.empty()) {
-      Context *ctx = new FunctionContext([this, on_finish](int r) {
-          assert(r == 0);
-          stop_image_replayers(on_finish);
-        });
-      ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
-      Mutex::Locker timer_locker(m_threads->timer_lock);
-      m_threads->timer->add_event_after(1, ctx);
-      return;
-    }
+    m_instance_replayer->acquire_image(image_id.global_id,
+                                       {{mirror_uuid, image_id.id}},
+                                       gather_ctx->new_sub());
   }
 
-  on_finish->complete(0);
+  gather_ctx->activate();
 }
 
 void Replayer::handle_post_acquire_leader(Context *on_finish) {
@@ -871,8 +683,29 @@ void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
     assert(m_pool_watcher);
     m_pool_watcher.reset();
   }
+  wait_for_update_ops(on_finish);
+}
 
-  stop_image_replayers(on_finish);
+void Replayer::wait_for_update_ops(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  Context *ctx = new FunctionContext([this, on_finish](int r) {
+      handle_wait_for_update_ops(r, on_finish);
+    });
+  ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+  m_update_op_tracker.wait_for_ops(ctx);
+}
+
+void Replayer::handle_wait_for_update_ops(int r, Context *on_finish) {
+  dout(20) << "r=" << r << dendl;
+
+  assert(r == 0);
+
+  Mutex::Locker locker(m_lock);
+  m_instance_replayer->release_all(on_finish);
 }
 
 } // namespace mirror
index 6eb3753aa3b963015588fa0dc33bd0622eef734b..b6649fb89a51a35c961f9b5f87561bd54ce257d2 100644 (file)
@@ -9,6 +9,7 @@
 #include <set>
 #include <string>
 
+#include "common/AsyncOpTracker.h"
 #include "common/Cond.h"
 #include "common/Mutex.h"
 #include "common/WorkQueue.h"
@@ -16,7 +17,6 @@
 #include "include/rados/librados.hpp"
 
 #include "ClusterWatcher.h"
-#include "ImageReplayer.h"
 #include "LeaderWatcher.h"
 #include "PoolWatcher.h"
 #include "ImageDeleter.h"
@@ -30,6 +30,7 @@ namespace rbd {
 namespace mirror {
 
 template <typename> struct Threads;
+template <typename> class InstanceReplayer;
 template <typename> class InstanceWatcher;
 
 /**
@@ -79,11 +80,6 @@ private:
                      const ImageIds &added_image_ids,
                      const ImageIds &removed_image_ids);
 
-  void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
-  bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
-  void stop_image_replayers();
-  void stop_image_replayers(Context *on_finish);
-
   int init_rados(const std::string &cluster_name,
                  const std::string &client_name,
                  const std::string &description, RadosRef *rados_ref);
@@ -99,6 +95,9 @@ private:
   void shut_down_pool_watcher(Context *on_finish);
   void handle_shut_down_pool_watcher(int r, Context *on_finish);
 
+  void wait_for_update_ops(Context *on_finish);
+  void handle_wait_for_update_ops(int r, Context *on_finish);
+
   Threads<librbd::ImageCtx> *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
   ImageSyncThrottlerRef<> m_image_sync_throttler;
@@ -117,12 +116,11 @@ private:
   librados::IoCtx m_remote_io_ctx;
 
   int64_t m_local_pool_id = -1;
-  int64_t m_remote_pool_id = -1;
 
   PoolWatcherListener m_pool_watcher_listener;
   std::unique_ptr<PoolWatcher<> > m_pool_watcher;
 
-  std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers;
+  std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
 
   std::string m_asok_hook_name;
   AdminSocketHook *m_asok_hook;
@@ -159,6 +157,7 @@ private:
 
   std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
   std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
+  AsyncOpTracker m_update_op_tracker;
 };
 
 } // namespace mirror