]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror HA: pool replayer should be started/stopped when lock acquired/released
authorMykola Golub <mgolub@mirantis.com>
Sun, 15 Jan 2017 18:15:14 +0000 (19:15 +0100)
committerMykola Golub <mgolub@mirantis.com>
Wed, 1 Feb 2017 09:55:03 +0000 (10:55 +0100)
Fixes: http://tracker.ceph.com/issues/17020
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index e14f10c3d4ded719641779c12dfc985b14fa2cb2..e9ef90694f1b6cb3a850a11dac860c43e4485f9a 100644 (file)
@@ -104,6 +104,19 @@ private:
   Mirror *mirror;
 };
 
+class LeaderReleaseCommand : public MirrorAdminSocketCommand {
+public:
+  explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->release_leader();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
 } // anonymous namespace
 
 class MirrorAdminSocketHook : public AdminSocketHook {
@@ -147,6 +160,13 @@ public:
     if (r == 0) {
       commands[command] = new FlushCommand(mirror);
     }
+
+    command = "rbd mirror leader release";
+    r = admin_socket->register_command(command, command, this,
+                                      "release rbd mirror leader");
+    if (r == 0) {
+      commands[command] = new LeaderReleaseCommand(mirror);
+    }
   }
 
   ~MirrorAdminSocketHook() {
@@ -356,6 +376,21 @@ void Mirror::flush()
   }
 }
 
+void Mirror::release_leader()
+{
+  dout(20) << "enter" << dendl;
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+    auto &replayer = it->second;
+    replayer->release_leader();
+  }
+}
+
 void Mirror::update_replayers(const PoolPeers &pool_peers)
 {
   dout(20) << "enter" << dendl;
index f7a4d02894dae6c7d82691670b4cc5698bcd8d2e..7c39fe9d6de09c43b9e7f572f490c828f8ff687f 100644 (file)
@@ -45,6 +45,7 @@ public:
   void stop();
   void restart();
   void flush();
+  void release_leader();
 
 private:
   typedef ClusterWatcher::PoolPeers PoolPeers;
index 5223aa2c29c65dff3c4911f3b104e6ca4c014fd5..2da5d86bfe6fdd46dbc688e853848e7967e1a6f6 100644 (file)
 #include "include/stringify.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "global/global_context.h"
+#include "librbd/Utils.h"
 #include "librbd/Watcher.h"
 #include "librbd/internal.h"
+#include "LeaderWatcher.h"
 #include "Replayer.h"
 #include "Threads.h"
 
@@ -31,6 +33,7 @@ using std::unique_ptr;
 using std::vector;
 
 using librbd::cls_client::dir_get_name;
+using librbd::util::create_async_context_callback;
 
 namespace rbd {
 namespace mirror {
@@ -108,6 +111,19 @@ private:
   Replayer *replayer;
 };
 
+class LeaderReleaseCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit LeaderReleaseCommand(Replayer *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->release_leader();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
 } // anonymous namespace
 
 class ReplayerAdminSocketHook : public AdminSocketHook {
@@ -152,6 +168,13 @@ public:
     if (r == 0) {
       commands[command] = new FlushCommand(replayer);
     }
+
+    command = "rbd mirror leader release " + name;
+    r = admin_socket->register_command(command, command, this,
+                                       "release rbd mirror leader " + name);
+    if (r == 0) {
+      commands[command] = new LeaderReleaseCommand(replayer);
+    }
   }
 
   ~ReplayerAdminSocketHook() {
@@ -181,55 +204,6 @@ private:
   Commands commands;
 };
 
-class MirrorStatusWatchCtx {
-public:
-  MirrorStatusWatchCtx(librados::IoCtx &ioctx, ContextWQ *work_queue) {
-    m_ioctx.dup(ioctx);
-    m_watcher = new Watcher(m_ioctx, work_queue);
-  }
-
-  ~MirrorStatusWatchCtx() {
-    delete m_watcher;
-  }
-
-  int register_watch() {
-    C_SaferCond cond;
-    m_watcher->register_watch(&cond);
-    return cond.wait();
-  }
-
-  int unregister_watch() {
-    C_SaferCond cond;
-    m_watcher->unregister_watch(&cond);
-    return cond.wait();
-  }
-
-  std::string get_oid() const {
-    return m_watcher->get_oid();
-  }
-
-private:
-  class Watcher : public librbd::Watcher {
-  public:
-    Watcher(librados::IoCtx &ioctx, ContextWQ *work_queue) :
-      librbd::Watcher(ioctx, work_queue, RBD_MIRRORING) {
-    }
-
-    virtual std::string get_oid() const {
-      return RBD_MIRRORING;
-    }
-
-    virtual void handle_notify(uint64_t notify_id, uint64_t handle,
-                               uint64_t notifier_id, bufferlist &bl) {
-      bufferlist out;
-      acknowledge_notify(notify_id, handle, out);
-    }
-  };
-
-  librados::IoCtx m_ioctx;
-  Watcher *m_watcher;
-};
-
 Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
                    ImageSyncThrottlerRef<> image_sync_throttler,
                    int64_t local_pool_id, const peer_t &peer,
@@ -242,7 +216,8 @@ Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter
   m_args(args),
   m_local_pool_id(local_pool_id),
   m_asok_hook(nullptr),
-  m_replayer_thread(this)
+  m_replayer_thread(this),
+  m_leader_listener(this)
 {
 }
 
@@ -258,6 +233,9 @@ Replayer::~Replayer()
   if (m_replayer_thread.is_started()) {
     m_replayer_thread.join();
   }
+  if (m_leader_watcher) {
+    m_leader_watcher->shut_down();
+  }
 }
 
 bool Replayer::is_blacklisted() const {
@@ -265,6 +243,11 @@ bool Replayer::is_blacklisted() const {
   return m_blacklisted;
 }
 
+bool Replayer::is_leader() const {
+  Mutex::Locker locker(m_lock);
+  return m_leader_watcher && m_leader_watcher->is_leader();
+}
+
 int Replayer::init()
 {
   dout(20) << "replaying for " << m_peer << dendl;
@@ -301,6 +284,14 @@ int Replayer::init()
 
   dout(20) << "connected to " << m_peer << dendl;
 
+  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
+                                             &m_leader_listener));
+  r = m_leader_watcher->init();
+  if (r < 0) {
+    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
   // Bootstrap existing mirroring images
   init_local_mirroring_images();
 
@@ -451,7 +442,7 @@ void Replayer::run()
     if (m_pool_watcher->is_blacklisted()) {
       m_blacklisted = true;
       m_stopping.set(1);
-    } else if (!m_manual_stop) {
+    } else if (!m_manual_stop && m_leader_watcher->is_leader()) {
       set_sources(m_pool_watcher->get_images());
     }
 
@@ -465,7 +456,7 @@ void Replayer::run()
 
   ImageIds empty_sources;
   while (true) {
-    Mutex::Locker l(m_lock);
+    Mutex::Locker locker(m_lock);
     set_sources(empty_sources);
     if (m_image_replayers.empty()) {
       break;
@@ -484,6 +475,7 @@ void Replayer::print_status(Formatter *f, stringstream *ss)
     f->open_object_section("replayer_status");
     f->dump_string("pool", m_local_io_ctx.get_pool_name());
     f->dump_stream("peer") << m_peer;
+    f->dump_bool("leader", m_leader_watcher->is_leader());
     f->open_array_section("image_replayers");
   };
 
@@ -571,13 +563,27 @@ void Replayer::flush()
   }
 }
 
+void Replayer::release_leader()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read() || !m_leader_watcher) {
+    return;
+  }
+
+  m_leader_watcher->release_leader();
+}
+
 void Replayer::set_sources(const ImageIds &image_ids)
 {
   dout(20) << "enter" << dendl;
 
   assert(m_lock.is_locked());
 
-  if (!m_init_images.empty()) {
+  if (!m_init_images.empty() && !m_stopping.read() &&
+      m_leader_watcher->is_leader()) {
     dout(20) << "scanning initial local image set" << dendl;
     for (auto &remote_image : image_ids) {
       auto it = m_init_images.find(InitImageInfo(remote_image.global_id));
@@ -598,7 +604,6 @@ void Replayer::set_sources(const ImageIds &image_ids)
   }
 
   // shut down replayers for non-mirrored images
-  bool existing_image_replayers = !m_image_replayers.empty();
   for (auto image_it = m_image_replayers.begin();
        image_it != m_image_replayers.end();) {
     if (image_ids.find(ImageId(image_it->first)) == image_ids.end()) {
@@ -615,9 +620,6 @@ void Replayer::set_sources(const ImageIds &image_ids)
   }
 
   if (image_ids.empty()) {
-    if (existing_image_replayers && m_image_replayers.empty()) {
-      mirror_image_status_shut_down();
-    }
     return;
   }
 
@@ -639,14 +641,6 @@ void Replayer::set_sources(const ImageIds &image_ids)
     return;
   }
 
-  if (m_image_replayers.empty() && !existing_image_replayers) {
-    // create entry for pool if it doesn't exist
-    r = mirror_image_status_init();
-    if (r < 0) {
-      return;
-    }
-  }
-
   for (auto &image_id : image_ids) {
     auto it = m_image_replayers.find(image_id.id);
     if (it == m_image_replayers.end()) {
@@ -665,47 +659,6 @@ void Replayer::set_sources(const ImageIds &image_ids)
   }
 }
 
-int Replayer::mirror_image_status_init() {
-  assert(!m_status_watcher);
-
-  uint64_t instance_id = librados::Rados(m_local_io_ctx).get_instance_id();
-  dout(20) << "pool_id=" << m_local_pool_id << ", "
-           << "instance_id=" << instance_id << dendl;
-
-  librados::ObjectWriteOperation op;
-  librbd::cls_client::mirror_image_status_remove_down(&op);
-  int r = m_local_io_ctx.operate(RBD_MIRRORING, &op);
-  if (r < 0) {
-    derr << "error initializing " << RBD_MIRRORING << "object: "
-        << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  unique_ptr<MirrorStatusWatchCtx> watch_ctx(
-    new MirrorStatusWatchCtx(m_local_io_ctx, m_threads->work_queue));
-
-  r = watch_ctx->register_watch();
-  if (r < 0) {
-    derr << "error registering watcher for " << watch_ctx->get_oid()
-        << " object: " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  m_status_watcher = std::move(watch_ctx);
-  return 0;
-}
-
-void Replayer::mirror_image_status_shut_down() {
-  assert(m_status_watcher);
-
-  int r = m_status_watcher->unregister_watch();
-  if (r < 0) {
-    derr << "error unregistering watcher for " << m_status_watcher->get_oid()
-        << " object: " << cpp_strerror(r) << dendl;
-  }
-  m_status_watcher.reset();
-}
-
 void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer,
                                     const std::string &image_id,
                                     const boost::optional<std::string>& image_name)
@@ -761,7 +714,7 @@ bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
     m_image_deleter->cancel_waiter(m_local_pool_id,
                                    image_replayer->get_global_image_id());
 
-    if (!m_stopping.read()) {
+    if (!m_stopping.read() && m_leader_watcher->is_leader()) {
       dout(20) << "scheduling delete" << dendl;
       m_image_deleter->schedule_image_delete(
         m_local_rados,
@@ -777,7 +730,7 @@ bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
     }
     FunctionContext *ctx = new FunctionContext(
         [&image_replayer, this] (int r) {
-          if (!m_stopping.read() && r >= 0) {
+          if (!m_stopping.read() && m_leader_watcher->is_leader() && r >= 0) {
             m_image_deleter->schedule_image_delete(
               m_local_rados,
               image_replayer->get_local_pool_id(),
@@ -793,5 +746,37 @@ bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
   return false;
 }
 
+void Replayer::handle_post_acquire_leader(Context *on_finish) {
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    m_cond.Signal();
+  }
+
+  on_finish->complete(0);
+}
+
+void Replayer::handle_pre_release_leader(Context *on_finish) {
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    set_sources(ImageIds());
+    if (!m_image_replayers.empty()) {
+      Mutex::Locker timer_locker(m_threads->timer_lock);
+      Context *task = create_async_context_callback(
+        m_threads->work_queue, new FunctionContext(
+          [this, on_finish](int r) {
+            handle_pre_release_leader(on_finish);
+          }));
+      m_threads->timer->add_event_after(1, task);
+      return;
+    }
+  }
+
+  on_finish->complete(0);
+}
+
 } // namespace mirror
 } // namespace rbd
index 10d5522f151073297f8286289c3b7930ff20ebbb..ddf99d662452ecda55a75c8709ab08654cb09411 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "ClusterWatcher.h"
 #include "ImageReplayer.h"
+#include "LeaderWatcher.h"
 #include "PoolWatcher.h"
 #include "ImageDeleter.h"
 #include "types.h"
@@ -26,7 +27,6 @@ namespace mirror {
 
 struct Threads;
 class ReplayerAdminSocketHook;
-class MirrorStatusWatchCtx;
 
 /**
  * Controls mirroring for a single remote cluster.
@@ -42,6 +42,7 @@ public:
   Replayer& operator=(const Replayer&) = delete;
 
   bool is_blacklisted() const;
+  bool is_leader() const;
 
   int init();
   void run();
@@ -51,6 +52,7 @@ public:
   void stop(bool manual);
   void restart();
   void flush();
+  void release_leader();
 
 private:
   typedef PoolWatcher::ImageId ImageId;
@@ -64,12 +66,12 @@ private:
                             const boost::optional<std::string>& image_name);
   bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
 
-  int mirror_image_status_init();
-  void mirror_image_status_shut_down();
-
   int init_rados(const std::string &cluster_name, const std::string &client_name,
                  const std::string &description, RadosRef *rados_ref);
 
+  void handle_post_acquire_leader(Context *on_finish);
+  void handle_pre_release_leader(Context *on_finish);
+
   Threads *m_threads;
   std::shared_ptr<ImageDeleter> m_image_deleter;
   ImageSyncThrottlerRef<> m_image_sync_throttler;
@@ -92,7 +94,6 @@ private:
 
   std::unique_ptr<PoolWatcher> m_pool_watcher;
   std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers;
-  std::unique_ptr<MirrorStatusWatchCtx> m_status_watcher;
 
   std::string m_asok_hook_name;
   ReplayerAdminSocketHook *m_asok_hook;
@@ -126,6 +127,26 @@ private:
       return 0;
     }
   } m_replayer_thread;
+
+  class LeaderListener : public LeaderWatcher<>::Listener {
+  public:
+    LeaderListener(Replayer *replayer) : m_replayer(replayer) {
+    }
+
+  protected:
+    virtual void post_acquire_handler(Context *on_finish) {
+      m_replayer->handle_post_acquire_leader(on_finish);
+    }
+
+    virtual void pre_release_handler(Context *on_finish) {
+      m_replayer->handle_pre_release_leader(on_finish);
+    }
+
+  private:
+    Replayer *m_replayer;
+  } m_leader_listener;
+
+  std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
 };
 
 } // namespace mirror