using librbd::util::create_context_callback;
using namespace rbd::mirror::image_replayer;
+std::ostream &operator<<(std::ostream &os, const ImageReplayer::State &state);
+
namespace {
struct ReplayHandler : public ::journal::ReplayHandler {
explicit StatusCommand(ImageReplayer *replayer) : replayer(replayer) {}
bool call(Formatter *f, stringstream *ss) {
- if (f) {
- f->open_object_section("status");
- f->dump_stream("state") << replayer->get_state();
- f->close_section();
- f->flush(*ss);
- } else {
- *ss << "state: " << replayer->get_state();
- }
+ replayer->print_status(f, ss);
return true;
}
m_remote_pool_id(remote_pool_id),
m_local_pool_id(local_pool_id),
m_remote_image_id(remote_image_id),
+ m_name(stringify(remote_pool_id) + "/" + remote_image_id),
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
m_state(STATE_UNINITIALIZED),
dout(20) << "enter" << dendl;
if (!m_asok_hook) {
+ Mutex::Locker locker(m_lock);
+
+ m_name = m_local_ioctx.get_pool_name() + "/" + m_local_image_ctx->name;
+
CephContext *cct = static_cast<CephContext *>(m_local->cct());
- std::string name = m_local_ioctx.get_pool_name() + "/" +
- m_local_image_ctx->name;
- m_asok_hook = new ImageReplayerAdminSocketHook(cct, name, this);
+ m_asok_hook = new ImageReplayerAdminSocketHook(cct, m_name, this);
}
FunctionContext *ctx = new FunctionContext(
return true;
}
+void ImageReplayer::print_status(Formatter *f, stringstream *ss)
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (f) {
+ f->open_object_section("image_replayer");
+ f->dump_string("name", m_name);
+ f->dump_stream("state") << m_state;
+ f->close_section();
+ f->flush(*ss);
+ } else {
+ *ss << m_name << ": state: " << m_state;
+ }
+}
+
void ImageReplayer::handle_replay_process_ready(int r)
{
// journal::Replay is ready for more events -- attempt to pop another
bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
+ std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
+
void start(Context *on_finish = nullptr,
const BootstrapParams *bootstrap_params = nullptr);
void stop(Context *on_finish = nullptr);
void flush(Context *on_finish = nullptr);
+ void print_status(Formatter *f, stringstream *ss);
+
virtual void handle_replay_ready();
virtual void handle_replay_process_ready(int r);
virtual void handle_replay_complete(int r);
std::string m_client_id;
int64_t m_remote_pool_id, m_local_pool_id;
std::string m_remote_image_id, m_local_image_id;
+ std::string m_name;
Mutex m_lock;
State m_state;
std::string m_local_pool_name, m_remote_pool_name;
#include <boost/range/adaptor/map.hpp>
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "Mirror.h"
namespace rbd {
namespace mirror {
+namespace {
+
+class MirrorAdminSocketCommand {
+public:
+ virtual ~MirrorAdminSocketCommand() {}
+ virtual bool call(Formatter *f, stringstream *ss) = 0;
+};
+
+class StatusCommand : public MirrorAdminSocketCommand {
+public:
+ explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
+
+ bool call(Formatter *f, stringstream *ss) {
+ mirror->print_status(f, ss);
+ return true;
+ }
+
+private:
+ Mirror *mirror;
+};
+
+class FlushCommand : public MirrorAdminSocketCommand {
+public:
+ explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
+
+ bool call(Formatter *f, stringstream *ss) {
+ mirror->flush();
+ return true;
+ }
+
+private:
+ Mirror *mirror;
+};
+
+} // anonymous namespace
+
+class MirrorAdminSocketHook : public AdminSocketHook {
+public:
+ MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
+ admin_socket(cct->get_admin_socket()) {
+ std::string command;
+ int r;
+
+ command = "rbd mirror status";
+ r = admin_socket->register_command(command, command, this,
+ "get status for rbd mirror");
+ if (r == 0) {
+ commands[command] = new StatusCommand(mirror);
+ }
+
+ command = "rbd mirror flush";
+ r = admin_socket->register_command(command, command, this,
+ "flush rbd mirror");
+ if (r == 0) {
+ commands[command] = new FlushCommand(mirror);
+ }
+ }
+
+ ~MirrorAdminSocketHook() {
+ for (Commands::const_iterator i = commands.begin(); i != commands.end();
+ ++i) {
+ (void)admin_socket->unregister_command(i->first);
+ delete i->second;
+ }
+ }
+
+ bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+ bufferlist& out) {
+ Commands::const_iterator i = commands.find(command);
+ assert(i != commands.end());
+ Formatter *f = Formatter::create(format);
+ stringstream ss;
+ bool r = i->second->call(f, &ss);
+ delete f;
+ out.append(ss);
+ return r;
+ }
+
+private:
+ typedef std::map<std::string, MirrorAdminSocketCommand*> Commands;
+
+ AdminSocket *admin_socket;
+ Commands commands;
+};
+
Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
m_cct(cct),
m_args(args),
m_lock("rbd::mirror::Mirror"),
- m_local(new librados::Rados())
+ m_local(new librados::Rados()),
+ m_asok_hook(new MirrorAdminSocketHook(cct, this))
{
cct->lookup_or_create_singleton_object<Threads>(m_threads,
"rbd_mirror::threads");
}
+Mirror::~Mirror()
+{
+ delete m_asok_hook;
+}
+
void Mirror::handle_signal(int signum)
{
m_stopping.set(1);
dout(20) << "return" << dendl;
}
+void Mirror::print_status(Formatter *f, stringstream *ss)
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read()) {
+ return;
+ }
+
+ if (f) {
+ f->open_object_section("mirror_status");
+ f->open_array_section("replayers");
+ };
+
+ for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+ auto &replayer = it->second;
+ replayer->print_status(f, ss);
+ }
+
+ if (f) {
+ f->close_section();
+ f->close_section();
+ f->flush(*ss);
+ }
+}
+
+void Mirror::flush()
+{
+ dout(20) << "enter" << dendl;
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read()) {
+ return;
+ }
+
+ for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+ auto &replayer = it->second;
+ replayer->flush();
+ }
+}
+
void Mirror::update_replayers(const map<peer_t, set<int64_t> > &peer_configs)
{
dout(20) << "enter" << dendl;
namespace mirror {
struct Threads;
+class MirrorAdminSocketHook;
/**
* Contains the main loop and overall state for rbd-mirror.
Mirror(CephContext *cct, const std::vector<const char*> &args);
Mirror(const Mirror&) = delete;
Mirror& operator=(const Mirror&) = delete;
+ ~Mirror();
int init();
void run();
void handle_signal(int signum);
+ void print_status(Formatter *f, stringstream *ss);
+ void flush();
+
private:
void refresh_peers(const set<peer_t> &peers);
void update_replayers(const map<peer_t, set<int64_t> > &peer_configs);
std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
std::map<peer_t, std::unique_ptr<Replayer> > m_replayers;
atomic_t m_stopping;
+ MirrorAdminSocketHook *m_asok_hook;
};
} // namespace mirror
#include <boost/bind.hpp>
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
namespace rbd {
namespace mirror {
+namespace {
+
+class ReplayerAdminSocketCommand {
+public:
+ virtual ~ReplayerAdminSocketCommand() {}
+ virtual bool call(Formatter *f, stringstream *ss) = 0;
+};
+
+class StatusCommand : public ReplayerAdminSocketCommand {
+public:
+ explicit StatusCommand(Replayer *replayer) : replayer(replayer) {}
+
+ bool call(Formatter *f, stringstream *ss) {
+ replayer->print_status(f, ss);
+ return true;
+ }
+
+private:
+ Replayer *replayer;
+};
+
+class FlushCommand : public ReplayerAdminSocketCommand {
+public:
+ explicit FlushCommand(Replayer *replayer) : replayer(replayer) {}
+
+ bool call(Formatter *f, stringstream *ss) {
+ replayer->flush();
+ return true;
+ }
+
+private:
+ Replayer *replayer;
+};
+
+} // anonymous namespace
+
+class ReplayerAdminSocketHook : public AdminSocketHook {
+public:
+ ReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+ Replayer *replayer) :
+ admin_socket(cct->get_admin_socket()) {
+ std::string command;
+ int r;
+
+ command = "rbd mirror status " + name;
+ r = admin_socket->register_command(command, command, this,
+ "get status for rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StatusCommand(replayer);
+ }
+
+ command = "rbd mirror flush " + name;
+ r = admin_socket->register_command(command, command, this,
+ "flush rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new FlushCommand(replayer);
+ }
+ }
+
+ ~ReplayerAdminSocketHook() {
+ for (Commands::const_iterator i = commands.begin(); i != commands.end();
+ ++i) {
+ (void)admin_socket->unregister_command(i->first);
+ delete i->second;
+ }
+ }
+
+ bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+ bufferlist& out) {
+ Commands::const_iterator i = commands.find(command);
+ assert(i != commands.end());
+ Formatter *f = Formatter::create(format);
+ stringstream ss;
+ bool r = i->second->call(f, &ss);
+ delete f;
+ out.append(ss);
+ return r;
+ }
+
+private:
+ typedef std::map<std::string, ReplayerAdminSocketCommand*> Commands;
+
+ AdminSocket *admin_socket;
+ Commands commands;
+};
+
Replayer::Replayer(Threads *threads, RadosRef local_cluster,
const peer_t &peer, const std::vector<const char*> &args) :
m_threads(threads),
m_args(args),
m_local(local_cluster),
m_remote(new librados::Rados),
+ m_asok_hook(nullptr),
m_replayer_thread(this)
{
+ CephContext *cct = static_cast<CephContext *>(m_local->cct());
+ m_asok_hook = new ReplayerAdminSocketHook(cct, m_peer.cluster_name, this);
}
Replayer::~Replayer()
{
+ delete m_asok_hook;
+
m_stopping.set(1);
{
Mutex::Locker l(m_lock);
}
}
+void Replayer::print_status(Formatter *f, stringstream *ss)
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (f) {
+ f->open_object_section("replayer_status");
+ f->dump_stream("peer") << m_peer;
+ f->open_array_section("image_replayers");
+ };
+
+ for (auto it = m_images.begin(); it != m_images.end(); it++) {
+ auto &pool_images = it->second;
+ for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+ auto &image_replayer = i->second;
+ image_replayer->print_status(f, ss);
+ }
+ }
+
+ if (f) {
+ f->close_section();
+ f->close_section();
+ f->flush(*ss);
+ }
+}
+
+void Replayer::flush()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read()) {
+ return;
+ }
+
+ for (auto it = m_images.begin(); it != m_images.end(); it++) {
+ auto &pool_images = it->second;
+ for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+ auto &image_replayer = i->second;
+ image_replayer->flush();
+ }
+ }
+}
+
void Replayer::set_sources(const map<int64_t, set<string> > &images)
{
dout(20) << "enter" << dendl;
namespace mirror {
struct Threads;
+class ReplayerAdminSocketHook;
/**
* Controls mirroring for a single remote cluster.
int init();
void run();
- void shutdown();
+
+ void print_status(Formatter *f, stringstream *ss);
+ void flush();
private:
void set_sources(const std::map<int64_t, std::set<std::string> > &images);
// when a pool's configuration changes
std::map<int64_t, std::map<std::string,
std::unique_ptr<ImageReplayer> > > m_images;
+ ReplayerAdminSocketHook *m_asok_hook;
class ReplayerThread : public Thread {
Replayer *m_replayer;