]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: asok commands to get status and flush on Mirror and Replayer level 8235/head
authorMykola Golub <mgolub@mirantis.com>
Sun, 20 Mar 2016 19:05:36 +0000 (21:05 +0200)
committerMykola Golub <mgolub@mirantis.com>
Mon, 21 Mar 2016 19:44:31 +0000 (21:44 +0200)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index 2daf3c590da01545d0d444a78757978a0552f68b..11a7b22ab0473ae7bd512bd8725195889f229abb 100644 (file)
@@ -41,6 +41,8 @@ namespace mirror {
 using librbd::util::create_context_callback;
 using namespace rbd::mirror::image_replayer;
 
+std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state);
+
 namespace {
 
 struct ReplayHandler : public ::journal::ReplayHandler {
@@ -81,14 +83,7 @@ public:
   explicit StatusCommand(ImageReplayer *replayer) : replayer(replayer) {}
 
   bool call(Formatter *f, stringstream *ss) {
-    if (f) {
-      f->open_object_section("status");
-      f->dump_stream("state") << replayer->get_state();
-      f->close_section();
-      f->flush(*ss);
-    } else {
-      *ss << "state: " << replayer->get_state();
-    }
+    replayer->print_status(f, ss);
     return true;
   }
 
@@ -179,6 +174,7 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
   m_remote_pool_id(remote_pool_id),
   m_local_pool_id(local_pool_id),
   m_remote_image_id(remote_image_id),
+  m_name(stringify(remote_pool_id) + "/" + remote_image_id),
   m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
         remote_image_id),
   m_state(STATE_UNINITIALIZED),
@@ -452,11 +448,13 @@ void ImageReplayer::on_start_wait_for_local_journal_ready_start()
   dout(20) << "enter" << dendl;
 
   if (!m_asok_hook) {
+    Mutex::Locker locker(m_lock);
+
+    m_name = m_local_ioctx.get_pool_name() + "/" + m_local_image_ctx->name;
+
     CephContext *cct = static_cast<CephContext *>(m_local->cct());
-    std::string name = m_local_ioctx.get_pool_name() + "/" +
-      m_local_image_ctx->name;
 
-    m_asok_hook = new ImageReplayerAdminSocketHook(cct, name, this);
+    m_asok_hook = new ImageReplayerAdminSocketHook(cct, m_name, this);
   }
 
   FunctionContext *ctx = new FunctionContext(
@@ -875,6 +873,23 @@ bool ImageReplayer::on_flush_interrupted()
   return true;
 }
 
+void ImageReplayer::print_status(Formatter *f, stringstream *ss)
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (f) {
+    f->open_object_section("image_replayer");
+    f->dump_string("name", m_name);
+    f->dump_stream("state") << m_state;
+    f->close_section();
+    f->flush(*ss);
+  } else {
+    *ss << m_name << ": state: " << m_state;
+  }
+}
+
 void ImageReplayer::handle_replay_process_ready(int r)
 {
   // journal::Replay is ready for more events -- attempt to pop another
index 19a87c5262be9d282fdcbc2ef96a2ce67a4a7796..9b6d03aa043390a035c68ff165e192fc0c2d51fc 100644 (file)
@@ -81,11 +81,15 @@ public:
   bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
   bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
 
+  std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
+
   void start(Context *on_finish = nullptr,
             const BootstrapParams *bootstrap_params = nullptr);
   void stop(Context *on_finish = nullptr);
   void flush(Context *on_finish = nullptr);
 
+  void print_status(Formatter *f, stringstream *ss);
+
   virtual void handle_replay_ready();
   virtual void handle_replay_process_ready(int r);
   virtual void handle_replay_complete(int r);
@@ -190,6 +194,7 @@ private:
   std::string m_client_id;
   int64_t m_remote_pool_id, m_local_pool_id;
   std::string m_remote_image_id, m_local_image_id;
+  std::string m_name;
   Mutex m_lock;
   State m_state;
   std::string m_local_pool_name, m_remote_pool_name;
index c5c38bec36a1d12bb89951ac8da40742075bbfbb..3af5e1c6743b71f53707b372b83e3f64a17bfa46 100644 (file)
@@ -3,6 +3,8 @@
 
 #include <boost/range/adaptor/map.hpp>
 
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
 #include "common/debug.h"
 #include "common/errno.h"
 #include "Mirror.h"
@@ -27,16 +29,107 @@ using librbd::mirror_peer_t;
 namespace rbd {
 namespace mirror {
 
+namespace {
+
+class MirrorAdminSocketCommand {
+public:
+  virtual ~MirrorAdminSocketCommand() {}
+  virtual bool call(Formatter *f, stringstream *ss) = 0;
+};
+
+class StatusCommand : public MirrorAdminSocketCommand {
+public:
+  explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->print_status(f, ss);
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
+class FlushCommand : public MirrorAdminSocketCommand {
+public:
+  explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->flush();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
+} // anonymous namespace
+
+class MirrorAdminSocketHook : public AdminSocketHook {
+public:
+  MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
+    admin_socket(cct->get_admin_socket()) {
+    std::string command;
+    int r;
+
+    command = "rbd mirror status";
+    r = admin_socket->register_command(command, command, this,
+                                      "get status for rbd mirror");
+    if (r == 0) {
+      commands[command] = new StatusCommand(mirror);
+    }
+
+    command = "rbd mirror flush";
+    r = admin_socket->register_command(command, command, this,
+                                      "flush rbd mirror");
+    if (r == 0) {
+      commands[command] = new FlushCommand(mirror);
+    }
+  }
+
+  ~MirrorAdminSocketHook() {
+    for (Commands::const_iterator i = commands.begin(); i != commands.end();
+        ++i) {
+      (void)admin_socket->unregister_command(i->first);
+      delete i->second;
+    }
+  }
+
+  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+           bufferlist& out) {
+    Commands::const_iterator i = commands.find(command);
+    assert(i != commands.end());
+    Formatter *f = Formatter::create(format);
+    stringstream ss;
+    bool r = i->second->call(f, &ss);
+    delete f;
+    out.append(ss);
+    return r;
+  }
+
+private:
+  typedef std::map<std::string, MirrorAdminSocketCommand*> Commands;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
 Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
   m_cct(cct),
   m_args(args),
   m_lock("rbd::mirror::Mirror"),
-  m_local(new librados::Rados())
+  m_local(new librados::Rados()),
+  m_asok_hook(new MirrorAdminSocketHook(cct, this))
 {
   cct->lookup_or_create_singleton_object<Threads>(m_threads,
                                                   "rbd_mirror::threads");
 }
 
+Mirror::~Mirror()
+{
+  delete m_asok_hook;
+}
+
 void Mirror::handle_signal(int signum)
 {
   m_stopping.set(1);
@@ -75,6 +168,48 @@ void Mirror::run()
   dout(20) << "return" << dendl;
 }
 
+void Mirror::print_status(Formatter *f, stringstream *ss)
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  if (f) {
+    f->open_object_section("mirror_status");
+    f->open_array_section("replayers");
+  };
+
+  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+    auto &replayer = it->second;
+    replayer->print_status(f, ss);
+  }
+
+  if (f) {
+    f->close_section();
+    f->close_section();
+    f->flush(*ss);
+  }
+}
+
+void Mirror::flush()
+{
+  dout(20) << "enter" << dendl;
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+    auto &replayer = it->second;
+    replayer->flush();
+  }
+}
+
 void Mirror::update_replayers(const map<peer_t, set<int64_t> > &peer_configs)
 {
   dout(20) << "enter" << dendl;
index 6b6cc97f1be6bc37df6165bbe3fb291b7230bc34..a23c448e2ba0294b52d91836e9c236a8b9a870c8 100644 (file)
@@ -20,6 +20,7 @@ namespace rbd {
 namespace mirror {
 
 struct Threads;
+class MirrorAdminSocketHook;
 
 /**
  * Contains the main loop and overall state for rbd-mirror.
@@ -32,11 +33,15 @@ public:
   Mirror(CephContext *cct, const std::vector<const char*> &args);
   Mirror(const Mirror&) = delete;
   Mirror& operator=(const Mirror&) = delete;
+  ~Mirror();
 
   int init();
   void run();
   void handle_signal(int signum);
 
+  void print_status(Formatter *f, stringstream *ss);
+  void flush();
+
 private:
   void refresh_peers(const set<peer_t> &peers);
   void update_replayers(const map<peer_t, set<int64_t> > &peer_configs);
@@ -52,6 +57,7 @@ private:
   std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
   std::map<peer_t, std::unique_ptr<Replayer> > m_replayers;
   atomic_t m_stopping;
+  MirrorAdminSocketHook *m_asok_hook;
 };
 
 } // namespace mirror
index 620a6a848ef554bf1551df29409ed6d9a7eab809..9a5d8989e1d80eee515a71ec57f96de0dffe4f9f 100644 (file)
@@ -3,6 +3,8 @@
 
 #include <boost/bind.hpp>
 
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
 #include "common/debug.h"
 #include "common/errno.h"
 #include "include/stringify.h"
@@ -21,6 +23,92 @@ using std::vector;
 namespace rbd {
 namespace mirror {
 
+namespace {
+
+class ReplayerAdminSocketCommand {
+public:
+  virtual ~ReplayerAdminSocketCommand() {}
+  virtual bool call(Formatter *f, stringstream *ss) = 0;
+};
+
+class StatusCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit StatusCommand(Replayer *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->print_status(f, ss);
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
+class FlushCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit FlushCommand(Replayer *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->flush();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
+} // anonymous namespace
+
+class ReplayerAdminSocketHook : public AdminSocketHook {
+public:
+  ReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+                         Replayer *replayer) :
+    admin_socket(cct->get_admin_socket()) {
+    std::string command;
+    int r;
+
+    command = "rbd mirror status " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "get status for rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StatusCommand(replayer);
+    }
+
+    command = "rbd mirror flush " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "flush rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new FlushCommand(replayer);
+    }
+  }
+
+  ~ReplayerAdminSocketHook() {
+    for (Commands::const_iterator i = commands.begin(); i != commands.end();
+        ++i) {
+      (void)admin_socket->unregister_command(i->first);
+      delete i->second;
+    }
+  }
+
+  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+           bufferlist& out) {
+    Commands::const_iterator i = commands.find(command);
+    assert(i != commands.end());
+    Formatter *f = Formatter::create(format);
+    stringstream ss;
+    bool r = i->second->call(f, &ss);
+    delete f;
+    out.append(ss);
+    return r;
+  }
+
+private:
+  typedef std::map<std::string, ReplayerAdminSocketCommand*> Commands;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
 Replayer::Replayer(Threads *threads, RadosRef local_cluster,
                    const peer_t &peer, const std::vector<const char*> &args) :
   m_threads(threads),
@@ -29,12 +117,17 @@ Replayer::Replayer(Threads *threads, RadosRef local_cluster,
   m_args(args),
   m_local(local_cluster),
   m_remote(new librados::Rados),
+  m_asok_hook(nullptr),
   m_replayer_thread(this)
 {
+  CephContext *cct = static_cast<CephContext *>(m_local->cct());
+  m_asok_hook = new ReplayerAdminSocketHook(cct, m_peer.cluster_name, this);
 }
 
 Replayer::~Replayer()
 {
+  delete m_asok_hook;
+
   m_stopping.set(1);
   {
     Mutex::Locker l(m_lock);
@@ -129,6 +222,52 @@ void Replayer::run()
   }
 }
 
+void Replayer::print_status(Formatter *f, stringstream *ss)
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (f) {
+    f->open_object_section("replayer_status");
+    f->dump_stream("peer") << m_peer;
+    f->open_array_section("image_replayers");
+  };
+
+  for (auto it = m_images.begin(); it != m_images.end(); it++) {
+    auto &pool_images = it->second;
+    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+      auto &image_replayer = i->second;
+      image_replayer->print_status(f, ss);
+    }
+  }
+
+  if (f) {
+    f->close_section();
+    f->close_section();
+    f->flush(*ss);
+  }
+}
+
+void Replayer::flush()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  for (auto it = m_images.begin(); it != m_images.end(); it++) {
+    auto &pool_images = it->second;
+    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+      auto &image_replayer = i->second;
+      image_replayer->flush();
+    }
+  }
+}
+
 void Replayer::set_sources(const map<int64_t, set<string> > &images)
 {
   dout(20) << "enter" << dendl;
index 83748b94e747da5de2e8b235368b2580db222c32..cd123a39772b61aa6d86e5ef4c1a9f56ac735ef8 100644 (file)
@@ -24,6 +24,7 @@ namespace rbd {
 namespace mirror {
 
 struct Threads;
+class ReplayerAdminSocketHook;
 
 /**
  * Controls mirroring for a single remote cluster.
@@ -38,7 +39,9 @@ public:
 
   int init();
   void run();
-  void shutdown();
+
+  void print_status(Formatter *f, stringstream *ss);
+  void flush();
 
 private:
   void set_sources(const std::map<int64_t, std::set<std::string> > &images);
@@ -60,6 +63,7 @@ private:
   // when a pool's configuration changes
   std::map<int64_t, std::map<std::string,
                             std::unique_ptr<ImageReplayer> > > m_images;
+  ReplayerAdminSocketHook *m_asok_hook;
 
   class ReplayerThread : public Thread {
     Replayer *m_replayer;