]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: renamed Replayer to PoolReplayer
authorJason Dillaman <dillaman@redhat.com>
Wed, 19 Apr 2017 20:23:15 +0000 (16:23 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 21 Apr 2017 02:38:18 +0000 (22:38 -0400)
This is a stepping stone to support multiple peers within a single
pool.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/PoolReplayer.cc [new file with mode: 0644]
src/tools/rbd_mirror/PoolReplayer.h [new file with mode: 0644]
src/tools/rbd_mirror/Replayer.cc [deleted file]
src/tools/rbd_mirror/Replayer.h [deleted file]

index 4f1acbcbd4f2e2732f8792fbbfd1919401e2180e..3fb0536c762a4ecae2f84e06980b4b191a1dca01 100644 (file)
@@ -14,8 +14,8 @@ set(rbd_mirror_internal
   LeaderWatcher.cc
   Mirror.cc
   MirrorStatusWatcher.cc
+  PoolReplayer.cc
   PoolWatcher.cc
-  Replayer.cc
   Threads.cc
   types.cc
   image_replayer/BootstrapRequest.cc
index 0424edd91ddb200604a2225a3abd95efbe5af423..86c6939182f5561b0b6363fb7ae2e80d31dca9e9 100644 (file)
@@ -254,18 +254,17 @@ void Mirror::run()
     m_local_cluster_watcher->refresh_pools();
     Mutex::Locker l(m_lock);
     if (!m_manual_stop) {
-      update_replayers(m_local_cluster_watcher->get_pool_peers());
+      update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
     }
     m_cond.WaitInterval(
       m_lock,
       utime_t(m_cct->_conf->rbd_mirror_pool_replayers_refresh_interval, 0));
   }
 
-  // stop all replayers in parallel
+  // stop all pool replayers in parallel
   Mutex::Locker locker(m_lock);
-  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
-    auto &replayer = it->second;
-    replayer->stop(false);
+  for (auto &pool_replayer : m_pool_replayers) {
+    pool_replayer.second->stop(false);
   }
   dout(20) << "return" << dendl;
 }
@@ -282,12 +281,11 @@ void Mirror::print_status(Formatter *f, stringstream *ss)
 
   if (f) {
     f->open_object_section("mirror_status");
-    f->open_array_section("replayers");
+    f->open_array_section("pool_replayers");
   };
 
-  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
-    auto &replayer = it->second;
-    replayer->print_status(f, ss);
+  for (auto &pool_replayer : m_pool_replayers) {
+    pool_replayer.second->print_status(f, ss);
   }
 
   if (f) {
@@ -322,9 +320,8 @@ void Mirror::start()
 
   m_manual_stop = false;
 
-  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
-    auto &replayer = it->second;
-    replayer->start();
+  for (auto &pool_replayer : m_pool_replayers) {
+    pool_replayer.second->start();
   }
 }
 
@@ -339,9 +336,8 @@ void Mirror::stop()
 
   m_manual_stop = true;
 
-  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
-    auto &replayer = it->second;
-    replayer->stop(true);
+  for (auto &pool_replayer : m_pool_replayers) {
+    pool_replayer.second->stop(true);
   }
 }
 
@@ -356,9 +352,8 @@ void Mirror::restart()
 
   m_manual_stop = false;
 
-  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
-    auto &replayer = it->second;
-    replayer->restart();
+  for (auto &pool_replayer : m_pool_replayers) {
+    pool_replayer.second->restart();
   }
 }
 
@@ -371,9 +366,8 @@ void Mirror::flush()
     return;
   }
 
-  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
-    auto &replayer = it->second;
-    replayer->flush();
+  for (auto &pool_replayer : m_pool_replayers) {
+    pool_replayer.second->flush();
   }
 }
 
@@ -386,30 +380,29 @@ void Mirror::release_leader()
     return;
   }
 
-  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
-    auto &replayer = it->second;
-    replayer->release_leader();
+  for (auto &pool_replayer : m_pool_replayers) {
+    pool_replayer.second->release_leader();
   }
 }
 
-void Mirror::update_replayers(const PoolPeers &pool_peers)
+void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
 {
   dout(20) << "enter" << dendl;
   assert(m_lock.is_locked());
 
-  // remove stale replayers before creating new replayers
-  for (auto it = m_replayers.begin(); it != m_replayers.end();) {
+  // remove stale pool replayers before creating new pool replayers
+  for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
     auto &peer = it->first.second;
     auto pool_peer_it = pool_peers.find(it->first.first);
     if (it->second->is_blacklisted()) {
-      derr << "removing blacklisted replayer for " << peer << dendl;
+      derr << "removing blacklisted pool replayer for " << peer << dendl;
       // TODO: make async
-      it = m_replayers.erase(it);
+      it = m_pool_replayers.erase(it);
     } else if (pool_peer_it == pool_peers.end() ||
                pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
-      dout(20) << "removing replayer for " << peer << dendl;
+      dout(20) << "removing pool replayer for " << peer << dendl;
       // TODO: make async
-      it = m_replayers.erase(it);
+      it = m_pool_replayers.erase(it);
     } else {
       ++it;
     }
@@ -418,19 +411,22 @@ void Mirror::update_replayers(const PoolPeers &pool_peers)
   for (auto &kv : pool_peers) {
     for (auto &peer : kv.second) {
       PoolPeer pool_peer(kv.first, peer);
-      if (m_replayers.find(pool_peer) == m_replayers.end()) {
-        dout(20) << "starting replayer for " << peer << dendl;
-        unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
-                                                   m_image_sync_throttler,
-                                                   kv.first, peer, m_args));
-        // TODO: make async, and retry connecting within replayer
-        int r = replayer->init();
+      if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
+        dout(20) << "starting pool replayer for " << peer << dendl;
+        unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
+         m_threads, m_image_deleter, m_image_sync_throttler, kv.first, peer,
+         m_args));
+
+        // TODO: make async, and retry connecting within pool replayer
+        int r = pool_replayer->init();
         if (r < 0) {
          continue;
         }
-        m_replayers.insert(std::make_pair(pool_peer, std::move(replayer)));
+        m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
       }
     }
+
+    // TODO currently only support a single peer
   }
 }
 
index 59e0575358b385e1494a6504faf2a8da5247d511..4ff9d512399aeb51f8212d5d37c44ff1fb4f906a 100644 (file)
@@ -13,7 +13,7 @@
 #include "include/atomic.h"
 #include "include/rados/librados.hpp"
 #include "ClusterWatcher.h"
-#include "Replayer.h"
+#include "PoolReplayer.h"
 #include "ImageDeleter.h"
 #include "types.h"
 
@@ -53,7 +53,7 @@ private:
   typedef ClusterWatcher::PoolPeers PoolPeers;
   typedef std::pair<int64_t, peer_t> PoolPeer;
 
-  void update_replayers(const PoolPeers &pool_peers);
+  void update_pool_replayers(const PoolPeers &pool_peers);
 
   CephContext *m_cct;
   std::vector<const char*> m_args;
@@ -66,7 +66,7 @@ private:
   std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
   std::shared_ptr<ImageDeleter> m_image_deleter;
   ImageSyncThrottlerRef<> m_image_sync_throttler;
-  std::map<PoolPeer, std::unique_ptr<Replayer> > m_replayers;
+  std::map<PoolPeer, std::unique_ptr<PoolReplayer> > m_pool_replayers;
   atomic_t m_stopping;
   bool m_manual_stop = false;
   MirrorAdminSocketHook *m_asok_hook;
diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc
new file mode 100644 (file)
index 0000000..6747ddc
--- /dev/null
@@ -0,0 +1,726 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "PoolReplayer.h"
+#include <boost/bind.hpp>
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/ceph_argparse.h"
+#include "common/code_environment.h"
+#include "common/common_init.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "include/stringify.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "global/global_context.h"
+#include "librbd/internal.h"
+#include "librbd/Utils.h"
+#include "librbd/Watcher.h"
+#include "librbd/api/Mirror.h"
+#include "InstanceReplayer.h"
+#include "InstanceWatcher.h"
+#include "LeaderWatcher.h"
+#include "Threads.h"
+#include "pool_watcher/RefreshImagesRequest.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
+                           << this << " " << __func__ << ": "
+
+using std::chrono::seconds;
+using std::map;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using librbd::cls_client::dir_get_name;
+using librbd::util::create_async_context_callback;
+
+namespace rbd {
+namespace mirror {
+
+namespace {
+
+class PoolReplayerAdminSocketCommand {
+public:
+  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
+    : pool_replayer(pool_replayer) {
+  }
+  virtual ~PoolReplayerAdminSocketCommand() {}
+  virtual bool call(Formatter *f, stringstream *ss) = 0;
+protected:
+  PoolReplayer *pool_replayer;
+};
+
+class StatusCommand : public PoolReplayerAdminSocketCommand {
+public:
+  explicit StatusCommand(PoolReplayer *pool_replayer)
+    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  }
+
+  bool call(Formatter *f, stringstream *ss) override {
+    pool_replayer->print_status(f, ss);
+    return true;
+  }
+};
+
+class StartCommand : public PoolReplayerAdminSocketCommand {
+public:
+  explicit StartCommand(PoolReplayer *pool_replayer)
+    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  }
+
+  bool call(Formatter *f, stringstream *ss) override {
+    pool_replayer->start();
+    return true;
+  }
+};
+
+class StopCommand : public PoolReplayerAdminSocketCommand {
+public:
+  explicit StopCommand(PoolReplayer *pool_replayer)
+    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  }
+
+  bool call(Formatter *f, stringstream *ss) override {
+    pool_replayer->stop(true);
+    return true;
+  }
+};
+
+class RestartCommand : public PoolReplayerAdminSocketCommand {
+public:
+  explicit RestartCommand(PoolReplayer *pool_replayer)
+    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  }
+
+  bool call(Formatter *f, stringstream *ss) override {
+    pool_replayer->restart();
+    return true;
+  }
+};
+
+class FlushCommand : public PoolReplayerAdminSocketCommand {
+public:
+  explicit FlushCommand(PoolReplayer *pool_replayer)
+    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  }
+
+  bool call(Formatter *f, stringstream *ss) override {
+    pool_replayer->flush();
+    return true;
+  }
+};
+
+class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
+public:
+  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
+    : PoolReplayerAdminSocketCommand(pool_replayer) {
+  }
+
+  bool call(Formatter *f, stringstream *ss) override {
+    pool_replayer->release_leader();
+    return true;
+  }
+};
+
+class PoolReplayerAdminSocketHook : public AdminSocketHook {
+public:
+  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+                               PoolReplayer *pool_replayer)
+    : admin_socket(cct->get_admin_socket()) {
+    std::string command;
+    int r;
+
+    command = "rbd mirror status " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "get status for rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StatusCommand(pool_replayer);
+    }
+
+    command = "rbd mirror start " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "start rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StartCommand(pool_replayer);
+    }
+
+    command = "rbd mirror stop " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "stop rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StopCommand(pool_replayer);
+    }
+
+    command = "rbd mirror restart " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "restart rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new RestartCommand(pool_replayer);
+    }
+
+    command = "rbd mirror flush " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "flush rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new FlushCommand(pool_replayer);
+    }
+
+    command = "rbd mirror leader release " + name;
+    r = admin_socket->register_command(command, command, this,
+                                       "release rbd mirror leader " + name);
+    if (r == 0) {
+      commands[command] = new LeaderReleaseCommand(pool_replayer);
+    }
+  }
+
+  ~PoolReplayerAdminSocketHook() override {
+    for (Commands::const_iterator i = commands.begin(); i != commands.end();
+        ++i) {
+      (void)admin_socket->unregister_command(i->first);
+      delete i->second;
+    }
+  }
+
+  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+           bufferlist& out) override {
+    Commands::const_iterator i = commands.find(command);
+    assert(i != commands.end());
+    Formatter *f = Formatter::create(format);
+    stringstream ss;
+    bool r = i->second->call(f, &ss);
+    delete f;
+    out.append(ss);
+    return r;
+  }
+
+private:
+  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
+} // anonymous namespace
+
+struct PoolReplayer::C_RefreshLocalImages : public Context {
+  PoolReplayer *pool_replayer;
+  Context *on_finish;
+  ImageIds image_ids;
+
+  C_RefreshLocalImages(PoolReplayer *pool_replayer, Context *on_finish)
+    : pool_replayer(pool_replayer), on_finish(on_finish) {
+  }
+
+  void finish(int r) override {
+    pool_replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish);
+  }
+};
+
+PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
+                          std::shared_ptr<ImageDeleter> image_deleter,
+                          ImageSyncThrottlerRef<> image_sync_throttler,
+                          int64_t local_pool_id, const peer_t &peer,
+                          const std::vector<const char*> &args) :
+  m_threads(threads),
+  m_image_deleter(image_deleter),
+  m_image_sync_throttler(image_sync_throttler),
+  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+  m_peer(peer),
+  m_args(args),
+  m_local_pool_id(local_pool_id),
+  m_pool_watcher_listener(this),
+  m_asok_hook(nullptr),
+  m_pool_replayer_thread(this),
+  m_leader_listener(this)
+{
+}
+
+PoolReplayer::~PoolReplayer()
+{
+  delete m_asok_hook;
+
+  m_stopping.set(1);
+  {
+    Mutex::Locker l(m_lock);
+    m_cond.Signal();
+  }
+  if (m_pool_replayer_thread.is_started()) {
+    m_pool_replayer_thread.join();
+  }
+  if (m_leader_watcher) {
+    m_leader_watcher->shut_down();
+  }
+  if (m_instance_watcher) {
+    m_instance_watcher->shut_down();
+  }
+  if (m_instance_replayer) {
+    m_instance_replayer->shut_down();
+  }
+
+  assert(!m_pool_watcher);
+}
+
+bool PoolReplayer::is_blacklisted() const {
+  Mutex::Locker locker(m_lock);
+  return m_blacklisted;
+}
+
+bool PoolReplayer::is_leader() const {
+  Mutex::Locker locker(m_lock);
+  return m_leader_watcher && m_leader_watcher->is_leader();
+}
+
+int PoolReplayer::init()
+{
+  dout(20) << "replaying for " << m_peer << dendl;
+
+  int r = init_rados(g_ceph_context->_conf->cluster,
+                     g_ceph_context->_conf->name.to_str(),
+                     "local cluster", &m_local_rados);
+  if (r < 0) {
+    return r;
+  }
+
+  r = init_rados(m_peer.cluster_name, m_peer.client_name,
+                 std::string("remote peer ") + stringify(m_peer),
+                 &m_remote_rados);
+  if (r < 0) {
+    return r;
+  }
+
+  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
+  if (r < 0) {
+    derr << "error accessing local pool " << m_local_pool_id << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  std::string local_mirror_uuid;
+  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
+                                          &local_mirror_uuid);
+  if (r < 0) {
+    derr << "failed to retrieve local mirror uuid from pool "
+         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
+                                   m_remote_io_ctx);
+  if (r < 0) {
+    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  dout(20) << "connected to " << m_peer << dendl;
+
+  m_instance_replayer.reset(
+    InstanceReplayer<>::create(m_threads, m_image_deleter,
+                               m_image_sync_throttler, m_local_rados,
+                               local_mirror_uuid, m_local_pool_id));
+  m_instance_replayer->init();
+  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
+
+  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
+                                                     m_threads->work_queue,
+                                                     m_instance_replayer.get()));
+  r = m_instance_watcher->init();
+  if (r < 0) {
+    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
+                                             &m_leader_listener));
+  r = m_leader_watcher->init();
+  if (r < 0) {
+    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  m_pool_replayer_thread.create("pool replayer");
+
+  return 0;
+}
+
+int PoolReplayer::init_rados(const std::string &cluster_name,
+                            const std::string &client_name,
+                            const std::string &description,
+                            RadosRef *rados_ref) {
+  rados_ref->reset(new librados::Rados());
+
+  // NOTE: manually bootstrap a CephContext here instead of via
+  // the librados API to avoid mixing global singletons between
+  // the librados shared library and the daemon
+  // TODO: eliminate intermingling of global singletons within Ceph APIs
+  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
+  if (client_name.empty() || !iparams.name.from_str(client_name)) {
+    derr << "error initializing cluster handle for " << description << dendl;
+    return -EINVAL;
+  }
+
+  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
+                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+  cct->_conf->cluster = cluster_name;
+
+  // librados::Rados::conf_read_file
+  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
+  if (r < 0) {
+    derr << "could not read ceph conf for " << description << ": "
+        << cpp_strerror(r) << dendl;
+    cct->put();
+    return r;
+  }
+  cct->_conf->parse_env();
+
+  // librados::Rados::conf_parse_env
+  std::vector<const char*> args;
+  env_to_vec(args, nullptr);
+  r = cct->_conf->parse_argv(args);
+  if (r < 0) {
+    derr << "could not parse environment for " << description << ":"
+         << cpp_strerror(r) << dendl;
+    cct->put();
+    return r;
+  }
+
+  if (!m_args.empty()) {
+    // librados::Rados::conf_parse_argv
+    args = m_args;
+    r = cct->_conf->parse_argv(args);
+    if (r < 0) {
+      derr << "could not parse command line args for " << description << ": "
+          << cpp_strerror(r) << dendl;
+      cct->put();
+      return r;
+    }
+  }
+
+  // disable unnecessary librbd cache
+  cct->_conf->set_val_or_die("rbd_cache", "false");
+  cct->_conf->apply_changes(nullptr);
+  cct->_conf->complain_about_parse_errors(cct);
+
+  r = (*rados_ref)->init_with_context(cct);
+  assert(r == 0);
+  cct->put();
+
+  r = (*rados_ref)->connect();
+  if (r < 0) {
+    derr << "error connecting to " << description << ": "
+        << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+void PoolReplayer::run()
+{
+  dout(20) << "enter" << dendl;
+
+  while (!m_stopping.read()) {
+    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
+                                 m_peer.cluster_name;
+    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
+      m_asok_hook_name = asok_hook_name;
+      delete m_asok_hook;
+
+      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
+                                                   m_asok_hook_name, this);
+    }
+
+    Mutex::Locker locker(m_lock);
+    if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
+      m_blacklisted = true;
+      m_stopping.set(1);
+      break;
+    }
+
+    m_cond.WaitInterval(m_lock, utime_t(1, 0));
+  }
+}
+
+void PoolReplayer::print_status(Formatter *f, stringstream *ss)
+{
+  dout(20) << "enter" << dendl;
+
+  if (!f) {
+    return;
+  }
+
+  Mutex::Locker l(m_lock);
+
+  f->open_object_section("pool_replayer_status");
+  f->dump_string("pool", m_local_io_ctx.get_pool_name());
+  f->dump_stream("peer") << m_peer;
+  f->dump_string("instance_id", m_instance_watcher->get_instance_id());
+
+  std::string leader_instance_id;
+  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
+  f->dump_string("leader_instance_id", leader_instance_id);
+
+  bool leader = m_leader_watcher->is_leader();
+  f->dump_bool("leader", leader);
+  if (leader) {
+    std::vector<std::string> instance_ids;
+    m_leader_watcher->list_instances(&instance_ids);
+    f->open_array_section("instances");
+    for (auto instance_id : instance_ids) {
+      f->dump_string("instance_id", instance_id);
+    }
+    f->close_section();
+  }
+
+  m_instance_replayer->print_status(f, ss);
+
+  f->close_section();
+  f->flush(*ss);
+}
+
+void PoolReplayer::start()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  m_instance_replayer->start();
+}
+
+void PoolReplayer::stop(bool manual)
+{
+  dout(20) << "enter: manual=" << manual << dendl;
+
+  Mutex::Locker l(m_lock);
+  if (!manual) {
+    m_stopping.set(1);
+    m_cond.Signal();
+    return;
+  } else if (m_stopping.read()) {
+    return;
+  }
+
+  m_instance_replayer->stop();
+}
+
+void PoolReplayer::restart()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  m_instance_replayer->restart();
+}
+
+void PoolReplayer::flush()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read() || m_manual_stop) {
+    return;
+  }
+
+  m_instance_replayer->flush();
+}
+
+void PoolReplayer::release_leader()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read() || !m_leader_watcher) {
+    return;
+  }
+
+  m_leader_watcher->release_leader();
+}
+
+void PoolReplayer::handle_update(const std::string &mirror_uuid,
+                                const ImageIds &added_image_ids,
+                                const ImageIds &removed_image_ids) {
+  assert(!mirror_uuid.empty());
+  if (m_stopping.read()) {
+    return;
+  }
+
+  dout(10) << dendl;
+  Mutex::Locker locker(m_lock);
+  if (!m_leader_watcher->is_leader()) {
+    return;
+  }
+
+  if (m_peer.uuid != mirror_uuid) {
+    m_instance_replayer->remove_peer(m_peer.uuid);
+    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
+    m_peer.uuid = mirror_uuid;
+  }
+
+  // first callback will be a full directory -- so see if we need to remove
+  // any local images that no longer exist on the remote side
+  if (!m_init_image_ids.empty()) {
+    dout(20) << "scanning initial local image set" << dendl;
+    for (auto &image_id : added_image_ids) {
+      auto it = m_init_image_ids.find(image_id);
+      if (it != m_init_image_ids.end()) {
+        m_init_image_ids.erase(it);
+      }
+    }
+
+    // the remaining images in m_init_image_ids must be deleted
+    for (auto &image_id : m_init_image_ids) {
+      dout(20) << "scheduling the deletion of init image: "
+               << image_id.global_id << " (" << image_id.id << ")" << dendl;
+      m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id,
+                                             image_id.id, image_id.global_id);
+    }
+    m_init_image_ids.clear();
+  }
+
+  m_update_op_tracker.start_op();
+  Context *ctx = new FunctionContext([this](int r) {
+      dout(20) << "complete handle_update: r=" << r << dendl;
+      m_update_op_tracker.finish_op();
+    });
+
+  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+
+  for (auto &image_id : removed_image_ids) {
+    // for now always send to myself (the leader)
+    std::string &instance_id = m_instance_watcher->get_instance_id();
+    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
+                                             mirror_uuid, image_id.id, true,
+                                             gather_ctx->new_sub());
+  }
+
+  for (auto &image_id : added_image_ids) {
+    // for now always send to myself (the leader)
+    std::string &instance_id = m_instance_watcher->get_instance_id();
+    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
+                                             mirror_uuid, image_id.id,
+                                             gather_ctx->new_sub());
+  }
+
+  gather_ctx->activate();
+}
+
+void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
+  dout(20) << dendl;
+  refresh_local_images(on_finish);
+}
+
+void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
+  dout(20) << dendl;
+  shut_down_pool_watcher(on_finish);
+}
+
+void PoolReplayer::refresh_local_images(Context *on_finish) {
+  dout(20) << dendl;
+
+  // ensure the initial set of local images is up-to-date
+  // after acquiring the leader role
+  auto ctx = new C_RefreshLocalImages(this, on_finish);
+  auto req = pool_watcher::RefreshImagesRequest<>::create(
+    m_local_io_ctx, &ctx->image_ids, ctx);
+  req->send();
+}
+
+void PoolReplayer::handle_refresh_local_images(int r, ImageIds &&image_ids,
+                                              Context *on_finish) {
+  dout(20) << "r=" << r << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    m_init_image_ids = std::move(image_ids);
+  }
+
+  if (r < 0) {
+    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  init_pool_watcher(on_finish);
+}
+
+void PoolReplayer::init_pool_watcher(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(!m_pool_watcher);
+  m_pool_watcher.reset(new PoolWatcher<>(
+    m_threads, m_remote_io_ctx, m_pool_watcher_listener));
+  m_pool_watcher->init(create_async_context_callback(
+    m_threads->work_queue, on_finish));
+
+  m_cond.Signal();
+}
+
+void PoolReplayer::shut_down_pool_watcher(Context *on_finish) {
+  dout(20) << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_pool_watcher) {
+      Context *ctx = new FunctionContext([this, on_finish](int r) {
+          handle_shut_down_pool_watcher(r, on_finish);
+      });
+      ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+      m_pool_watcher->shut_down(ctx);
+      return;
+    }
+  }
+
+  on_finish->complete(0);
+}
+
+void PoolReplayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
+  dout(20) << "r=" << r << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_pool_watcher);
+    m_pool_watcher.reset();
+  }
+  wait_for_update_ops(on_finish);
+}
+
+void PoolReplayer::wait_for_update_ops(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  Context *ctx = new FunctionContext([this, on_finish](int r) {
+      handle_wait_for_update_ops(r, on_finish);
+    });
+  ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+  m_update_op_tracker.wait_for_ops(ctx);
+}
+
+void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
+  dout(20) << "r=" << r << dendl;
+
+  assert(r == 0);
+
+  Mutex::Locker locker(m_lock);
+  m_instance_replayer->release_all(on_finish);
+}
+
+} // namespace mirror
+} // namespace rbd
diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h
new file mode 100644 (file)
index 0000000..7de10a5
--- /dev/null
@@ -0,0 +1,171 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H
+#define CEPH_RBD_MIRROR_POOL_REPLAYER_H
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+
+#include "common/AsyncOpTracker.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/WorkQueue.h"
+#include "include/atomic.h"
+#include "include/rados/librados.hpp"
+
+#include "ClusterWatcher.h"
+#include "LeaderWatcher.h"
+#include "PoolWatcher.h"
+#include "ImageDeleter.h"
+#include "types.h"
+
+class AdminSocketHook;
+
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+template <typename> struct Threads;
+template <typename> class InstanceReplayer;
+template <typename> class InstanceWatcher;
+
+/**
+ * Controls mirroring for a single remote cluster.
+ */
+class PoolReplayer {
+public:
+  PoolReplayer(Threads<librbd::ImageCtx> *threads,
+              std::shared_ptr<ImageDeleter> image_deleter,
+              ImageSyncThrottlerRef<> image_sync_throttler,
+              int64_t local_pool_id, const peer_t &peer,
+              const std::vector<const char*> &args);
+  ~PoolReplayer();
+  PoolReplayer(const PoolReplayer&) = delete;
+  PoolReplayer& operator=(const PoolReplayer&) = delete;
+
+  bool is_blacklisted() const;
+  bool is_leader() const;
+
+  int init();
+  void run();
+
+  void print_status(Formatter *f, stringstream *ss);
+  void start();
+  void stop(bool manual);
+  void restart();
+  void flush();
+  void release_leader();
+
+private:
+  struct PoolWatcherListener : public PoolWatcher<>::Listener {
+    PoolReplayer *pool_replayer;
+
+    PoolWatcherListener(PoolReplayer *pool_replayer)
+      : pool_replayer(pool_replayer) {
+    }
+
+    void handle_update(const std::string &mirror_uuid,
+                       const ImageIds &added_image_ids,
+                       const ImageIds &removed_image_ids) override {
+      pool_replayer->handle_update(mirror_uuid, added_image_ids,
+                                  removed_image_ids);
+    }
+  };
+
+  struct C_RefreshLocalImages;
+
+  void handle_update(const std::string &mirror_uuid,
+                     const ImageIds &added_image_ids,
+                     const ImageIds &removed_image_ids);
+
+  int init_rados(const std::string &cluster_name,
+                 const std::string &client_name,
+                 const std::string &description, RadosRef *rados_ref);
+
+  void handle_post_acquire_leader(Context *on_finish);
+  void handle_pre_release_leader(Context *on_finish);
+
+  void refresh_local_images(Context *on_finish);
+  void handle_refresh_local_images(int r, ImageIds &&image_ids,
+                                   Context *on_finish);
+
+  void init_pool_watcher(Context *on_finish);
+  void shut_down_pool_watcher(Context *on_finish);
+  void handle_shut_down_pool_watcher(int r, Context *on_finish);
+
+  void wait_for_update_ops(Context *on_finish);
+  void handle_wait_for_update_ops(int r, Context *on_finish);
+
+  Threads<librbd::ImageCtx> *m_threads;
+  std::shared_ptr<ImageDeleter> m_image_deleter;
+  ImageSyncThrottlerRef<> m_image_sync_throttler;
+  mutable Mutex m_lock;
+  Cond m_cond;
+  atomic_t m_stopping;
+  bool m_manual_stop = false;
+  bool m_blacklisted = false;
+
+  peer_t m_peer;
+  std::vector<const char*> m_args;
+  RadosRef m_local_rados;
+  RadosRef m_remote_rados;
+
+  librados::IoCtx m_local_io_ctx;
+  librados::IoCtx m_remote_io_ctx;
+
+  int64_t m_local_pool_id = -1;
+
+  PoolWatcherListener m_pool_watcher_listener;
+  std::unique_ptr<PoolWatcher<> > m_pool_watcher;
+
+  std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
+
+  std::string m_asok_hook_name;
+  AdminSocketHook *m_asok_hook;
+
+  std::set<ImageId> m_init_image_ids;
+
+  class PoolReplayerThread : public Thread {
+    PoolReplayer *m_pool_replayer;
+  public:
+    PoolReplayerThread(PoolReplayer *pool_replayer)
+      : m_pool_replayer(pool_replayer) {
+    }
+    void *entry() override {
+      m_pool_replayer->run();
+      return 0;
+    }
+  } m_pool_replayer_thread;
+
+  class LeaderListener : public LeaderWatcher<>::Listener {
+  public:
+    LeaderListener(PoolReplayer *pool_replayer)
+      : m_pool_replayer(pool_replayer) {
+    }
+
+  protected:
+    void post_acquire_handler(Context *on_finish) override {
+      m_pool_replayer->handle_post_acquire_leader(on_finish);
+    }
+
+    void pre_release_handler(Context *on_finish) override {
+      m_pool_replayer->handle_pre_release_leader(on_finish);
+    }
+
+  private:
+    PoolReplayer *m_pool_replayer;
+  } m_leader_listener;
+
+  std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
+  std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
+  AsyncOpTracker m_update_op_tracker;
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H
diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc
deleted file mode 100644 (file)
index 7f8f9e2..0000000
+++ /dev/null
@@ -1,727 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include <boost/bind.hpp>
-
-#include "common/Formatter.h"
-#include "common/admin_socket.h"
-#include "common/ceph_argparse.h"
-#include "common/code_environment.h"
-#include "common/common_init.h"
-#include "common/debug.h"
-#include "common/errno.h"
-#include "include/stringify.h"
-#include "cls/rbd/cls_rbd_client.h"
-#include "global/global_context.h"
-#include "librbd/internal.h"
-#include "librbd/Utils.h"
-#include "librbd/Watcher.h"
-#include "librbd/api/Mirror.h"
-#include "InstanceReplayer.h"
-#include "InstanceWatcher.h"
-#include "LeaderWatcher.h"
-#include "Replayer.h"
-#include "Threads.h"
-#include "pool_watcher/RefreshImagesRequest.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::Replayer: " \
-                           << this << " " << __func__ << ": "
-
-using std::chrono::seconds;
-using std::map;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
-using librbd::cls_client::dir_get_name;
-using librbd::util::create_async_context_callback;
-
-namespace rbd {
-namespace mirror {
-
-namespace {
-
-class ReplayerAdminSocketCommand {
-public:
-  virtual ~ReplayerAdminSocketCommand() {}
-  virtual bool call(Formatter *f, stringstream *ss) = 0;
-};
-
-class StatusCommand : public ReplayerAdminSocketCommand {
-public:
-  explicit StatusCommand(Replayer *replayer) : replayer(replayer) {}
-
-  bool call(Formatter *f, stringstream *ss) override {
-    replayer->print_status(f, ss);
-    return true;
-  }
-
-private:
-  Replayer *replayer;
-};
-
-class StartCommand : public ReplayerAdminSocketCommand {
-public:
-  explicit StartCommand(Replayer *replayer) : replayer(replayer) {}
-
-  bool call(Formatter *f, stringstream *ss) override {
-    replayer->start();
-    return true;
-  }
-
-private:
-  Replayer *replayer;
-};
-
-class StopCommand : public ReplayerAdminSocketCommand {
-public:
-  explicit StopCommand(Replayer *replayer) : replayer(replayer) {}
-
-  bool call(Formatter *f, stringstream *ss) override {
-    replayer->stop(true);
-    return true;
-  }
-
-private:
-  Replayer *replayer;
-};
-
-class RestartCommand : public ReplayerAdminSocketCommand {
-public:
-  explicit RestartCommand(Replayer *replayer) : replayer(replayer) {}
-
-  bool call(Formatter *f, stringstream *ss) override {
-    replayer->restart();
-    return true;
-  }
-
-private:
-  Replayer *replayer;
-};
-
-class FlushCommand : public ReplayerAdminSocketCommand {
-public:
-  explicit FlushCommand(Replayer *replayer) : replayer(replayer) {}
-
-  bool call(Formatter *f, stringstream *ss) override {
-    replayer->flush();
-    return true;
-  }
-
-private:
-  Replayer *replayer;
-};
-
-class LeaderReleaseCommand : public ReplayerAdminSocketCommand {
-public:
-  explicit LeaderReleaseCommand(Replayer *replayer) : replayer(replayer) {}
-
-  bool call(Formatter *f, stringstream *ss) override {
-    replayer->release_leader();
-    return true;
-  }
-
-private:
-  Replayer *replayer;
-};
-
-class ReplayerAdminSocketHook : public AdminSocketHook {
-public:
-  ReplayerAdminSocketHook(CephContext *cct, const std::string &name,
-                         Replayer *replayer) :
-    admin_socket(cct->get_admin_socket()) {
-    std::string command;
-    int r;
-
-    command = "rbd mirror status " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "get status for rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new StatusCommand(replayer);
-    }
-
-    command = "rbd mirror start " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "start rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new StartCommand(replayer);
-    }
-
-    command = "rbd mirror stop " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "stop rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new StopCommand(replayer);
-    }
-
-    command = "rbd mirror restart " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "restart rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new RestartCommand(replayer);
-    }
-
-    command = "rbd mirror flush " + name;
-    r = admin_socket->register_command(command, command, this,
-                                      "flush rbd mirror " + name);
-    if (r == 0) {
-      commands[command] = new FlushCommand(replayer);
-    }
-
-    command = "rbd mirror leader release " + name;
-    r = admin_socket->register_command(command, command, this,
-                                       "release rbd mirror leader " + name);
-    if (r == 0) {
-      commands[command] = new LeaderReleaseCommand(replayer);
-    }
-  }
-
-  ~ReplayerAdminSocketHook() override {
-    for (Commands::const_iterator i = commands.begin(); i != commands.end();
-        ++i) {
-      (void)admin_socket->unregister_command(i->first);
-      delete i->second;
-    }
-  }
-
-  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-           bufferlist& out) override {
-    Commands::const_iterator i = commands.find(command);
-    assert(i != commands.end());
-    Formatter *f = Formatter::create(format);
-    stringstream ss;
-    bool r = i->second->call(f, &ss);
-    delete f;
-    out.append(ss);
-    return r;
-  }
-
-private:
-  typedef std::map<std::string, ReplayerAdminSocketCommand*> Commands;
-
-  AdminSocket *admin_socket;
-  Commands commands;
-};
-
-} // anonymous namespace
-
-struct Replayer::C_RefreshLocalImages : public Context {
-  Replayer *replayer;
-  Context *on_finish;
-  ImageIds image_ids;
-
-  C_RefreshLocalImages(Replayer *replayer, Context *on_finish)
-    : replayer(replayer), on_finish(on_finish) {
-  }
-
-  void finish(int r) override {
-    replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish);
-  }
-};
-
-Replayer::Replayer(Threads<librbd::ImageCtx> *threads,
-                   std::shared_ptr<ImageDeleter> image_deleter,
-                   ImageSyncThrottlerRef<> image_sync_throttler,
-                   int64_t local_pool_id, const peer_t &peer,
-                   const std::vector<const char*> &args) :
-  m_threads(threads),
-  m_image_deleter(image_deleter),
-  m_image_sync_throttler(image_sync_throttler),
-  m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
-  m_peer(peer),
-  m_args(args),
-  m_local_pool_id(local_pool_id),
-  m_pool_watcher_listener(this),
-  m_asok_hook(nullptr),
-  m_replayer_thread(this),
-  m_leader_listener(this)
-{
-}
-
-Replayer::~Replayer()
-{
-  delete m_asok_hook;
-
-  m_stopping.set(1);
-  {
-    Mutex::Locker l(m_lock);
-    m_cond.Signal();
-  }
-  if (m_replayer_thread.is_started()) {
-    m_replayer_thread.join();
-  }
-  if (m_leader_watcher) {
-    m_leader_watcher->shut_down();
-  }
-  if (m_instance_watcher) {
-    m_instance_watcher->shut_down();
-  }
-  if (m_instance_replayer) {
-    m_instance_replayer->shut_down();
-  }
-
-  assert(!m_pool_watcher);
-}
-
-bool Replayer::is_blacklisted() const {
-  Mutex::Locker locker(m_lock);
-  return m_blacklisted;
-}
-
-bool Replayer::is_leader() const {
-  Mutex::Locker locker(m_lock);
-  return m_leader_watcher && m_leader_watcher->is_leader();
-}
-
-int Replayer::init()
-{
-  dout(20) << "replaying for " << m_peer << dendl;
-
-  int r = init_rados(g_ceph_context->_conf->cluster,
-                     g_ceph_context->_conf->name.to_str(),
-                     "local cluster", &m_local_rados);
-  if (r < 0) {
-    return r;
-  }
-
-  r = init_rados(m_peer.cluster_name, m_peer.client_name,
-                 std::string("remote peer ") + stringify(m_peer),
-                 &m_remote_rados);
-  if (r < 0) {
-    return r;
-  }
-
-  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
-  if (r < 0) {
-    derr << "error accessing local pool " << m_local_pool_id << ": "
-         << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  std::string local_mirror_uuid;
-  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
-                                          &local_mirror_uuid);
-  if (r < 0) {
-    derr << "failed to retrieve local mirror uuid from pool "
-         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
-                                   m_remote_io_ctx);
-  if (r < 0) {
-    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
-         << ": " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  dout(20) << "connected to " << m_peer << dendl;
-
-  m_instance_replayer.reset(
-    InstanceReplayer<>::create(m_threads, m_image_deleter,
-                               m_image_sync_throttler, m_local_rados,
-                               local_mirror_uuid, m_local_pool_id));
-  m_instance_replayer->init();
-  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
-
-  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
-                                                     m_threads->work_queue,
-                                                     m_instance_replayer.get()));
-  r = m_instance_watcher->init();
-  if (r < 0) {
-    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
-                                             &m_leader_listener));
-  r = m_leader_watcher->init();
-  if (r < 0) {
-    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  m_replayer_thread.create("replayer");
-
-  return 0;
-}
-
-int Replayer::init_rados(const std::string &cluster_name,
-                         const std::string &client_name,
-                         const std::string &description, RadosRef *rados_ref) {
-  rados_ref->reset(new librados::Rados());
-
-  // NOTE: manually bootstrap a CephContext here instead of via
-  // the librados API to avoid mixing global singletons between
-  // the librados shared library and the daemon
-  // TODO: eliminate intermingling of global singletons within Ceph APIs
-  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
-  if (client_name.empty() || !iparams.name.from_str(client_name)) {
-    derr << "error initializing cluster handle for " << description << dendl;
-    return -EINVAL;
-  }
-
-  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
-                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
-  cct->_conf->cluster = cluster_name;
-
-  // librados::Rados::conf_read_file
-  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
-  if (r < 0) {
-    derr << "could not read ceph conf for " << description << ": "
-        << cpp_strerror(r) << dendl;
-    cct->put();
-    return r;
-  }
-  cct->_conf->parse_env();
-
-  // librados::Rados::conf_parse_env
-  std::vector<const char*> args;
-  env_to_vec(args, nullptr);
-  r = cct->_conf->parse_argv(args);
-  if (r < 0) {
-    derr << "could not parse environment for " << description << ":"
-         << cpp_strerror(r) << dendl;
-    cct->put();
-    return r;
-  }
-
-  if (!m_args.empty()) {
-    // librados::Rados::conf_parse_argv
-    args = m_args;
-    r = cct->_conf->parse_argv(args);
-    if (r < 0) {
-      derr << "could not parse command line args for " << description << ": "
-          << cpp_strerror(r) << dendl;
-      cct->put();
-      return r;
-    }
-  }
-
-  // disable unnecessary librbd cache
-  cct->_conf->set_val_or_die("rbd_cache", "false");
-  cct->_conf->apply_changes(nullptr);
-  cct->_conf->complain_about_parse_errors(cct);
-
-  r = (*rados_ref)->init_with_context(cct);
-  assert(r == 0);
-  cct->put();
-
-  r = (*rados_ref)->connect();
-  if (r < 0) {
-    derr << "error connecting to " << description << ": "
-        << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  return 0;
-}
-
-void Replayer::run()
-{
-  dout(20) << "enter" << dendl;
-
-  while (!m_stopping.read()) {
-    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
-                                 m_peer.cluster_name;
-    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
-      m_asok_hook_name = asok_hook_name;
-      delete m_asok_hook;
-
-      m_asok_hook = new ReplayerAdminSocketHook(g_ceph_context,
-                                                m_asok_hook_name, this);
-    }
-
-    Mutex::Locker locker(m_lock);
-    if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
-      m_blacklisted = true;
-      m_stopping.set(1);
-      break;
-    }
-
-    m_cond.WaitInterval(m_lock, utime_t(1, 0));
-  }
-}
-
-void Replayer::print_status(Formatter *f, stringstream *ss)
-{
-  dout(20) << "enter" << dendl;
-
-  if (!f) {
-    return;
-  }
-
-  Mutex::Locker l(m_lock);
-
-  f->open_object_section("replayer_status");
-  f->dump_string("pool", m_local_io_ctx.get_pool_name());
-  f->dump_stream("peer") << m_peer;
-  f->dump_string("instance_id", m_instance_watcher->get_instance_id());
-
-  std::string leader_instance_id;
-  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
-  f->dump_string("leader_instance_id", leader_instance_id);
-
-  bool leader = m_leader_watcher->is_leader();
-  f->dump_bool("leader", leader);
-  if (leader) {
-    std::vector<std::string> instance_ids;
-    m_leader_watcher->list_instances(&instance_ids);
-    f->open_array_section("instances");
-    for (auto instance_id : instance_ids) {
-      f->dump_string("instance_id", instance_id);
-    }
-    f->close_section();
-  }
-
-  m_instance_replayer->print_status(f, ss);
-
-  f->close_section();
-  f->flush(*ss);
-}
-
-void Replayer::start()
-{
-  dout(20) << "enter" << dendl;
-
-  Mutex::Locker l(m_lock);
-
-  if (m_stopping.read()) {
-    return;
-  }
-
-  m_instance_replayer->start();
-}
-
-void Replayer::stop(bool manual)
-{
-  dout(20) << "enter: manual=" << manual << dendl;
-
-  Mutex::Locker l(m_lock);
-  if (!manual) {
-    m_stopping.set(1);
-    m_cond.Signal();
-    return;
-  } else if (m_stopping.read()) {
-    return;
-  }
-
-  m_instance_replayer->stop();
-}
-
-void Replayer::restart()
-{
-  dout(20) << "enter" << dendl;
-
-  Mutex::Locker l(m_lock);
-
-  if (m_stopping.read()) {
-    return;
-  }
-
-  m_instance_replayer->restart();
-}
-
-void Replayer::flush()
-{
-  dout(20) << "enter" << dendl;
-
-  Mutex::Locker l(m_lock);
-
-  if (m_stopping.read() || m_manual_stop) {
-    return;
-  }
-
-  m_instance_replayer->flush();
-}
-
-void Replayer::release_leader()
-{
-  dout(20) << "enter" << dendl;
-
-  Mutex::Locker l(m_lock);
-
-  if (m_stopping.read() || !m_leader_watcher) {
-    return;
-  }
-
-  m_leader_watcher->release_leader();
-}
-
-void Replayer::handle_update(const std::string &mirror_uuid,
-                             const ImageIds &added_image_ids,
-                             const ImageIds &removed_image_ids) {
-  assert(!mirror_uuid.empty());
-  if (m_stopping.read()) {
-    return;
-  }
-
-  dout(10) << dendl;
-  Mutex::Locker locker(m_lock);
-  if (!m_leader_watcher->is_leader()) {
-    return;
-  }
-
-  if (m_peer.uuid != mirror_uuid) {
-    m_instance_replayer->remove_peer(m_peer.uuid);
-    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
-    m_peer.uuid = mirror_uuid;
-  }
-
-  // first callback will be a full directory -- so see if we need to remove
-  // any local images that no longer exist on the remote side
-  if (!m_init_image_ids.empty()) {
-    dout(20) << "scanning initial local image set" << dendl;
-    for (auto &image_id : added_image_ids) {
-      auto it = m_init_image_ids.find(image_id);
-      if (it != m_init_image_ids.end()) {
-        m_init_image_ids.erase(it);
-      }
-    }
-
-    // the remaining images in m_init_image_ids must be deleted
-    for (auto &image_id : m_init_image_ids) {
-      dout(20) << "scheduling the deletion of init image: "
-               << image_id.global_id << " (" << image_id.id << ")" << dendl;
-      m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id,
-                                             image_id.id, image_id.global_id);
-    }
-    m_init_image_ids.clear();
-  }
-
-  m_update_op_tracker.start_op();
-  Context *ctx = new FunctionContext([this](int r) {
-      dout(20) << "complete handle_update: r=" << r << dendl;
-      m_update_op_tracker.finish_op();
-    });
-
-  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
-
-  for (auto &image_id : removed_image_ids) {
-    // for now always send to myself (the leader)
-    std::string &instance_id = m_instance_watcher->get_instance_id();
-    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
-                                             mirror_uuid, image_id.id, true,
-                                             gather_ctx->new_sub());
-  }
-
-  for (auto &image_id : added_image_ids) {
-    // for now always send to myself (the leader)
-    std::string &instance_id = m_instance_watcher->get_instance_id();
-    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
-                                             mirror_uuid, image_id.id,
-                                             gather_ctx->new_sub());
-  }
-
-  gather_ctx->activate();
-}
-
-void Replayer::handle_post_acquire_leader(Context *on_finish) {
-  dout(20) << dendl;
-  refresh_local_images(on_finish);
-}
-
-void Replayer::handle_pre_release_leader(Context *on_finish) {
-  dout(20) << dendl;
-  shut_down_pool_watcher(on_finish);
-}
-
-void Replayer::refresh_local_images(Context *on_finish) {
-  dout(20) << dendl;
-
-  // ensure the initial set of local images is up-to-date
-  // after acquiring the leader role
-  auto ctx = new C_RefreshLocalImages(this, on_finish);
-  auto req = pool_watcher::RefreshImagesRequest<>::create(
-    m_local_io_ctx, &ctx->image_ids, ctx);
-  req->send();
-}
-
-void Replayer::handle_refresh_local_images(int r, ImageIds &&image_ids,
-                                           Context *on_finish) {
-  dout(20) << "r=" << r << dendl;
-
-  {
-    Mutex::Locker locker(m_lock);
-    m_init_image_ids = std::move(image_ids);
-  }
-
-  if (r < 0) {
-    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
-    on_finish->complete(r);
-    return;
-  }
-
-  init_pool_watcher(on_finish);
-}
-
-void Replayer::init_pool_watcher(Context *on_finish) {
-  dout(20) << dendl;
-
-  Mutex::Locker locker(m_lock);
-  assert(!m_pool_watcher);
-  m_pool_watcher.reset(new PoolWatcher<>(
-    m_threads, m_remote_io_ctx, m_pool_watcher_listener));
-  m_pool_watcher->init(create_async_context_callback(
-    m_threads->work_queue, on_finish));
-
-  m_cond.Signal();
-}
-
-void Replayer::shut_down_pool_watcher(Context *on_finish) {
-  dout(20) << dendl;
-
-  {
-    Mutex::Locker locker(m_lock);
-    if (m_pool_watcher) {
-      Context *ctx = new FunctionContext([this, on_finish](int r) {
-          handle_shut_down_pool_watcher(r, on_finish);
-      });
-      ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
-      m_pool_watcher->shut_down(ctx);
-      return;
-    }
-  }
-
-  on_finish->complete(0);
-}
-
-void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
-  dout(20) << "r=" << r << dendl;
-
-  {
-    Mutex::Locker locker(m_lock);
-    assert(m_pool_watcher);
-    m_pool_watcher.reset();
-  }
-  wait_for_update_ops(on_finish);
-}
-
-void Replayer::wait_for_update_ops(Context *on_finish) {
-  dout(20) << dendl;
-
-  Mutex::Locker locker(m_lock);
-
-  Context *ctx = new FunctionContext([this, on_finish](int r) {
-      handle_wait_for_update_ops(r, on_finish);
-    });
-  ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
-  m_update_op_tracker.wait_for_ops(ctx);
-}
-
-void Replayer::handle_wait_for_update_ops(int r, Context *on_finish) {
-  dout(20) << "r=" << r << dendl;
-
-  assert(r == 0);
-
-  Mutex::Locker locker(m_lock);
-  m_instance_replayer->release_all(on_finish);
-}
-
-} // namespace mirror
-} // namespace rbd
diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h
deleted file mode 100644 (file)
index b6649fb..0000000
+++ /dev/null
@@ -1,166 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_RBD_MIRROR_REPLAYER_H
-#define CEPH_RBD_MIRROR_REPLAYER_H
-
-#include <map>
-#include <memory>
-#include <set>
-#include <string>
-
-#include "common/AsyncOpTracker.h"
-#include "common/Cond.h"
-#include "common/Mutex.h"
-#include "common/WorkQueue.h"
-#include "include/atomic.h"
-#include "include/rados/librados.hpp"
-
-#include "ClusterWatcher.h"
-#include "LeaderWatcher.h"
-#include "PoolWatcher.h"
-#include "ImageDeleter.h"
-#include "types.h"
-
-class AdminSocketHook;
-
-namespace librbd { class ImageCtx; }
-
-namespace rbd {
-namespace mirror {
-
-template <typename> struct Threads;
-template <typename> class InstanceReplayer;
-template <typename> class InstanceWatcher;
-
-/**
- * Controls mirroring for a single remote cluster.
- */
-class Replayer {
-public:
-  Replayer(Threads<librbd::ImageCtx> *threads,
-           std::shared_ptr<ImageDeleter> image_deleter,
-           ImageSyncThrottlerRef<> image_sync_throttler,
-           int64_t local_pool_id, const peer_t &peer,
-           const std::vector<const char*> &args);
-  ~Replayer();
-  Replayer(const Replayer&) = delete;
-  Replayer& operator=(const Replayer&) = delete;
-
-  bool is_blacklisted() const;
-  bool is_leader() const;
-
-  int init();
-  void run();
-
-  void print_status(Formatter *f, stringstream *ss);
-  void start();
-  void stop(bool manual);
-  void restart();
-  void flush();
-  void release_leader();
-
-private:
-  struct PoolWatcherListener : public PoolWatcher<>::Listener {
-    Replayer *replayer;
-
-    PoolWatcherListener(Replayer *replayer) : replayer(replayer) {
-    }
-
-    void handle_update(const std::string &mirror_uuid,
-                       const ImageIds &added_image_ids,
-                       const ImageIds &removed_image_ids) override {
-      replayer->handle_update(mirror_uuid, added_image_ids, removed_image_ids);
-    }
-  };
-
-  struct C_RefreshLocalImages;
-
-  void handle_update(const std::string &mirror_uuid,
-                     const ImageIds &added_image_ids,
-                     const ImageIds &removed_image_ids);
-
-  int init_rados(const std::string &cluster_name,
-                 const std::string &client_name,
-                 const std::string &description, RadosRef *rados_ref);
-
-  void handle_post_acquire_leader(Context *on_finish);
-  void handle_pre_release_leader(Context *on_finish);
-
-  void refresh_local_images(Context *on_finish);
-  void handle_refresh_local_images(int r, ImageIds &&image_ids,
-                                   Context *on_finish);
-
-  void init_pool_watcher(Context *on_finish);
-  void shut_down_pool_watcher(Context *on_finish);
-  void handle_shut_down_pool_watcher(int r, Context *on_finish);
-
-  void wait_for_update_ops(Context *on_finish);
-  void handle_wait_for_update_ops(int r, Context *on_finish);
-
-  Threads<librbd::ImageCtx> *m_threads;
-  std::shared_ptr<ImageDeleter> m_image_deleter;
-  ImageSyncThrottlerRef<> m_image_sync_throttler;
-  mutable Mutex m_lock;
-  Cond m_cond;
-  atomic_t m_stopping;
-  bool m_manual_stop = false;
-  bool m_blacklisted = false;
-
-  peer_t m_peer;
-  std::vector<const char*> m_args;
-  RadosRef m_local_rados;
-  RadosRef m_remote_rados;
-
-  librados::IoCtx m_local_io_ctx;
-  librados::IoCtx m_remote_io_ctx;
-
-  int64_t m_local_pool_id = -1;
-
-  PoolWatcherListener m_pool_watcher_listener;
-  std::unique_ptr<PoolWatcher<> > m_pool_watcher;
-
-  std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
-
-  std::string m_asok_hook_name;
-  AdminSocketHook *m_asok_hook;
-
-  std::set<ImageId> m_init_image_ids;
-
-  class ReplayerThread : public Thread {
-    Replayer *m_replayer;
-  public:
-    ReplayerThread(Replayer *replayer) : m_replayer(replayer) {}
-    void *entry() override {
-      m_replayer->run();
-      return 0;
-    }
-  } m_replayer_thread;
-
-  class LeaderListener : public LeaderWatcher<>::Listener {
-  public:
-    LeaderListener(Replayer *replayer) : m_replayer(replayer) {
-    }
-
-  protected:
-    void post_acquire_handler(Context *on_finish) override {
-      m_replayer->handle_post_acquire_leader(on_finish);
-    }
-
-    void pre_release_handler(Context *on_finish) override {
-      m_replayer->handle_pre_release_leader(on_finish);
-    }
-
-  private:
-    Replayer *m_replayer;
-  } m_leader_listener;
-
-  std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
-  std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
-  AsyncOpTracker m_update_op_tracker;
-};
-
-} // namespace mirror
-} // namespace rbd
-
-#endif // CEPH_RBD_MIRROR_REPLAYER_H