From: Mykola Golub Date: Sun, 20 Mar 2016 19:05:36 +0000 (+0200) Subject: rbd-mirror: asok commands to get status and flush on Mirror and Replayer level X-Git-Tag: v10.1.1~121^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a67f0cfdfa0bfd3238388c8d69abd773a4943a73;p=ceph.git rbd-mirror: asok commands to get status and flush on Mirror and Replayer level Signed-off-by: Mykola Golub --- diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 2daf3c590da0..11a7b22ab047 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -41,6 +41,8 @@ namespace mirror { using librbd::util::create_context_callback; using namespace rbd::mirror::image_replayer; +std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state); + namespace { struct ReplayHandler : public ::journal::ReplayHandler { @@ -81,14 +83,7 @@ public: explicit StatusCommand(ImageReplayer *replayer) : replayer(replayer) {} bool call(Formatter *f, stringstream *ss) { - if (f) { - f->open_object_section("status"); - f->dump_stream("state") << replayer->get_state(); - f->close_section(); - f->flush(*ss); - } else { - *ss << "state: " << replayer->get_state(); - } + replayer->print_status(f, ss); return true; } @@ -179,6 +174,7 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, m_remote_pool_id(remote_pool_id), m_local_pool_id(local_pool_id), m_remote_image_id(remote_image_id), + m_name(stringify(remote_pool_id) + "/" + remote_image_id), m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " + remote_image_id), m_state(STATE_UNINITIALIZED), @@ -452,11 +448,13 @@ void ImageReplayer::on_start_wait_for_local_journal_ready_start() dout(20) << "enter" << dendl; if (!m_asok_hook) { + Mutex::Locker locker(m_lock); + + m_name = m_local_ioctx.get_pool_name() + "/" + m_local_image_ctx->name; + CephContext *cct = static_cast(m_local->cct()); - std::string name = m_local_ioctx.get_pool_name() + "/" + - m_local_image_ctx->name; - m_asok_hook = new ImageReplayerAdminSocketHook(cct, name, this); + m_asok_hook = new ImageReplayerAdminSocketHook(cct, m_name, this); } FunctionContext *ctx = new FunctionContext( @@ -875,6 +873,23 @@ bool ImageReplayer::on_flush_interrupted() return true; } +void ImageReplayer::print_status(Formatter *f, stringstream *ss) +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (f) { + f->open_object_section("image_replayer"); + f->dump_string("name", m_name); + f->dump_stream("state") << m_state; + f->close_section(); + f->flush(*ss); + } else { + *ss << m_name << ": state: " << m_state; + } +} + void ImageReplayer::handle_replay_process_ready(int r) { // journal::Replay is ready for more events -- attempt to pop another diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 19a87c5262be..9b6d03aa0433 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -81,11 +81,15 @@ public: bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); } bool is_running() { Mutex::Locker l(m_lock); return is_running_(); } + std::string get_name() { Mutex::Locker l(m_lock); return m_name; }; + void start(Context *on_finish = nullptr, const BootstrapParams *bootstrap_params = nullptr); void stop(Context *on_finish = nullptr); void flush(Context *on_finish = nullptr); + void print_status(Formatter *f, stringstream *ss); + virtual void handle_replay_ready(); virtual void handle_replay_process_ready(int r); virtual void handle_replay_complete(int r); @@ -190,6 +194,7 @@ private: std::string m_client_id; int64_t m_remote_pool_id, m_local_pool_id; std::string m_remote_image_id, m_local_image_id; + std::string m_name; Mutex m_lock; State m_state; std::string m_local_pool_name, m_remote_pool_name; diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index c5c38bec36a1..3af5e1c6743b 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -3,6 +3,8 @@ #include +#include "common/Formatter.h" +#include "common/admin_socket.h" #include "common/debug.h" #include "common/errno.h" #include "Mirror.h" @@ -27,16 +29,107 @@ using librbd::mirror_peer_t; namespace rbd { namespace mirror { +namespace { + +class MirrorAdminSocketCommand { +public: + virtual ~MirrorAdminSocketCommand() {} + virtual bool call(Formatter *f, stringstream *ss) = 0; +}; + +class StatusCommand : public MirrorAdminSocketCommand { +public: + explicit StatusCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) { + mirror->print_status(f, ss); + return true; + } + +private: + Mirror *mirror; +}; + +class FlushCommand : public MirrorAdminSocketCommand { +public: + explicit FlushCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) { + mirror->flush(); + return true; + } + +private: + Mirror *mirror; +}; + +} // anonymous namespace + +class MirrorAdminSocketHook : public AdminSocketHook { +public: + MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) : + admin_socket(cct->get_admin_socket()) { + std::string command; + int r; + + command = "rbd mirror status"; + r = admin_socket->register_command(command, command, this, + "get status for rbd mirror"); + if (r == 0) { + commands[command] = new StatusCommand(mirror); + } + + command = "rbd mirror flush"; + r = admin_socket->register_command(command, command, this, + "flush rbd mirror"); + if (r == 0) { + commands[command] = new FlushCommand(mirror); + } + } + + ~MirrorAdminSocketHook() { + for (Commands::const_iterator i = commands.begin(); i != commands.end(); + ++i) { + (void)admin_socket->unregister_command(i->first); + delete i->second; + } + } + + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) { + Commands::const_iterator i = commands.find(command); + assert(i != commands.end()); + Formatter *f = Formatter::create(format); + stringstream ss; + bool r = i->second->call(f, &ss); + delete f; + out.append(ss); + return r; + } + +private: + typedef std::map Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + Mirror::Mirror(CephContext *cct, const std::vector &args) : m_cct(cct), m_args(args), m_lock("rbd::mirror::Mirror"), - m_local(new librados::Rados()) + m_local(new librados::Rados()), + m_asok_hook(new MirrorAdminSocketHook(cct, this)) { cct->lookup_or_create_singleton_object(m_threads, "rbd_mirror::threads"); } +Mirror::~Mirror() +{ + delete m_asok_hook; +} + void Mirror::handle_signal(int signum) { m_stopping.set(1); @@ -75,6 +168,48 @@ void Mirror::run() dout(20) << "return" << dendl; } +void Mirror::print_status(Formatter *f, stringstream *ss) +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + if (f) { + f->open_object_section("mirror_status"); + f->open_array_section("replayers"); + }; + + for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { + auto &replayer = it->second; + replayer->print_status(f, ss); + } + + if (f) { + f->close_section(); + f->close_section(); + f->flush(*ss); + } +} + +void Mirror::flush() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { + auto &replayer = it->second; + replayer->flush(); + } +} + void Mirror::update_replayers(const map > &peer_configs) { dout(20) << "enter" << dendl; diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index 6b6cc97f1be6..a23c448e2ba0 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -20,6 +20,7 @@ namespace rbd { namespace mirror { struct Threads; +class MirrorAdminSocketHook; /** * Contains the main loop and overall state for rbd-mirror. @@ -32,11 +33,15 @@ public: Mirror(CephContext *cct, const std::vector &args); Mirror(const Mirror&) = delete; Mirror& operator=(const Mirror&) = delete; + ~Mirror(); int init(); void run(); void handle_signal(int signum); + void print_status(Formatter *f, stringstream *ss); + void flush(); + private: void refresh_peers(const set &peers); void update_replayers(const map > &peer_configs); @@ -52,6 +57,7 @@ private: std::unique_ptr m_local_cluster_watcher; std::map > m_replayers; atomic_t m_stopping; + MirrorAdminSocketHook *m_asok_hook; }; } // namespace mirror diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 620a6a848ef5..9a5d8989e1d8 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -3,6 +3,8 @@ #include +#include "common/Formatter.h" +#include "common/admin_socket.h" #include "common/debug.h" #include "common/errno.h" #include "include/stringify.h" @@ -21,6 +23,92 @@ using std::vector; namespace rbd { namespace mirror { +namespace { + +class ReplayerAdminSocketCommand { +public: + virtual ~ReplayerAdminSocketCommand() {} + virtual bool call(Formatter *f, stringstream *ss) = 0; +}; + +class StatusCommand : public ReplayerAdminSocketCommand { +public: + explicit StatusCommand(Replayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->print_status(f, ss); + return true; + } + +private: + Replayer *replayer; +}; + +class FlushCommand : public ReplayerAdminSocketCommand { +public: + explicit FlushCommand(Replayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->flush(); + return true; + } + +private: + Replayer *replayer; +}; + +} // anonymous namespace + +class ReplayerAdminSocketHook : public AdminSocketHook { +public: + ReplayerAdminSocketHook(CephContext *cct, const std::string &name, + Replayer *replayer) : + admin_socket(cct->get_admin_socket()) { + std::string command; + int r; + + command = "rbd mirror status " + name; + r = admin_socket->register_command(command, command, this, + "get status for rbd mirror " + name); + if (r == 0) { + commands[command] = new StatusCommand(replayer); + } + + command = "rbd mirror flush " + name; + r = admin_socket->register_command(command, command, this, + "flush rbd mirror " + name); + if (r == 0) { + commands[command] = new FlushCommand(replayer); + } + } + + ~ReplayerAdminSocketHook() { + for (Commands::const_iterator i = commands.begin(); i != commands.end(); + ++i) { + (void)admin_socket->unregister_command(i->first); + delete i->second; + } + } + + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) { + Commands::const_iterator i = commands.find(command); + assert(i != commands.end()); + Formatter *f = Formatter::create(format); + stringstream ss; + bool r = i->second->call(f, &ss); + delete f; + out.append(ss); + return r; + } + +private: + typedef std::map Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + Replayer::Replayer(Threads *threads, RadosRef local_cluster, const peer_t &peer, const std::vector &args) : m_threads(threads), @@ -29,12 +117,17 @@ Replayer::Replayer(Threads *threads, RadosRef local_cluster, m_args(args), m_local(local_cluster), m_remote(new librados::Rados), + m_asok_hook(nullptr), m_replayer_thread(this) { + CephContext *cct = static_cast(m_local->cct()); + m_asok_hook = new ReplayerAdminSocketHook(cct, m_peer.cluster_name, this); } Replayer::~Replayer() { + delete m_asok_hook; + m_stopping.set(1); { Mutex::Locker l(m_lock); @@ -129,6 +222,52 @@ void Replayer::run() } } +void Replayer::print_status(Formatter *f, stringstream *ss) +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (f) { + f->open_object_section("replayer_status"); + f->dump_stream("peer") << m_peer; + f->open_array_section("image_replayers"); + }; + + for (auto it = m_images.begin(); it != m_images.end(); it++) { + auto &pool_images = it->second; + for (auto i = pool_images.begin(); i != pool_images.end(); i++) { + auto &image_replayer = i->second; + image_replayer->print_status(f, ss); + } + } + + if (f) { + f->close_section(); + f->close_section(); + f->flush(*ss); + } +} + +void Replayer::flush() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + for (auto it = m_images.begin(); it != m_images.end(); it++) { + auto &pool_images = it->second; + for (auto i = pool_images.begin(); i != pool_images.end(); i++) { + auto &image_replayer = i->second; + image_replayer->flush(); + } + } +} + void Replayer::set_sources(const map > &images) { dout(20) << "enter" << dendl; diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index 83748b94e747..cd123a39772b 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -24,6 +24,7 @@ namespace rbd { namespace mirror { struct Threads; +class ReplayerAdminSocketHook; /** * Controls mirroring for a single remote cluster. @@ -38,7 +39,9 @@ public: int init(); void run(); - void shutdown(); + + void print_status(Formatter *f, stringstream *ss); + void flush(); private: void set_sources(const std::map > &images); @@ -60,6 +63,7 @@ private: // when a pool's configuration changes std::map > > m_images; + ReplayerAdminSocketHook *m_asok_hook; class ReplayerThread : public Thread { Replayer *m_replayer;