LeaderWatcher.cc
Mirror.cc
MirrorStatusWatcher.cc
+ PoolReplayer.cc
PoolWatcher.cc
- Replayer.cc
Threads.cc
types.cc
image_replayer/BootstrapRequest.cc
m_local_cluster_watcher->refresh_pools();
Mutex::Locker l(m_lock);
if (!m_manual_stop) {
- update_replayers(m_local_cluster_watcher->get_pool_peers());
+ update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
}
m_cond.WaitInterval(
m_lock,
utime_t(m_cct->_conf->rbd_mirror_pool_replayers_refresh_interval, 0));
}
- // stop all replayers in parallel
+ // stop all pool replayers in parallel
Mutex::Locker locker(m_lock);
- for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
- auto &replayer = it->second;
- replayer->stop(false);
+ for (auto &pool_replayer : m_pool_replayers) {
+ pool_replayer.second->stop(false);
}
dout(20) << "return" << dendl;
}
if (f) {
f->open_object_section("mirror_status");
- f->open_array_section("replayers");
+ f->open_array_section("pool_replayers");
};
- for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
- auto &replayer = it->second;
- replayer->print_status(f, ss);
+ for (auto &pool_replayer : m_pool_replayers) {
+ pool_replayer.second->print_status(f, ss);
}
if (f) {
m_manual_stop = false;
- for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
- auto &replayer = it->second;
- replayer->start();
+ for (auto &pool_replayer : m_pool_replayers) {
+ pool_replayer.second->start();
}
}
m_manual_stop = true;
- for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
- auto &replayer = it->second;
- replayer->stop(true);
+ for (auto &pool_replayer : m_pool_replayers) {
+ pool_replayer.second->stop(true);
}
}
m_manual_stop = false;
- for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
- auto &replayer = it->second;
- replayer->restart();
+ for (auto &pool_replayer : m_pool_replayers) {
+ pool_replayer.second->restart();
}
}
return;
}
- for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
- auto &replayer = it->second;
- replayer->flush();
+ for (auto &pool_replayer : m_pool_replayers) {
+ pool_replayer.second->flush();
}
}
return;
}
- for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
- auto &replayer = it->second;
- replayer->release_leader();
+ for (auto &pool_replayer : m_pool_replayers) {
+ pool_replayer.second->release_leader();
}
}
-void Mirror::update_replayers(const PoolPeers &pool_peers)
+void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
{
dout(20) << "enter" << dendl;
assert(m_lock.is_locked());
- // remove stale replayers before creating new replayers
- for (auto it = m_replayers.begin(); it != m_replayers.end();) {
+ // remove stale pool replayers before creating new pool replayers
+ for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
auto &peer = it->first.second;
auto pool_peer_it = pool_peers.find(it->first.first);
if (it->second->is_blacklisted()) {
- derr << "removing blacklisted replayer for " << peer << dendl;
+ derr << "removing blacklisted pool replayer for " << peer << dendl;
// TODO: make async
- it = m_replayers.erase(it);
+ it = m_pool_replayers.erase(it);
} else if (pool_peer_it == pool_peers.end() ||
pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
- dout(20) << "removing replayer for " << peer << dendl;
+ dout(20) << "removing pool replayer for " << peer << dendl;
// TODO: make async
- it = m_replayers.erase(it);
+ it = m_pool_replayers.erase(it);
} else {
++it;
}
for (auto &kv : pool_peers) {
for (auto &peer : kv.second) {
PoolPeer pool_peer(kv.first, peer);
- if (m_replayers.find(pool_peer) == m_replayers.end()) {
- dout(20) << "starting replayer for " << peer << dendl;
- unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
- m_image_sync_throttler,
- kv.first, peer, m_args));
- // TODO: make async, and retry connecting within replayer
- int r = replayer->init();
+ if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
+ dout(20) << "starting pool replayer for " << peer << dendl;
+ unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
+ m_threads, m_image_deleter, m_image_sync_throttler, kv.first, peer,
+ m_args));
+
+ // TODO: make async, and retry connecting within pool replayer
+ int r = pool_replayer->init();
if (r < 0) {
continue;
}
- m_replayers.insert(std::make_pair(pool_peer, std::move(replayer)));
+ m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
}
}
+
+ // TODO currently only support a single peer
}
}
#include "include/atomic.h"
#include "include/rados/librados.hpp"
#include "ClusterWatcher.h"
-#include "Replayer.h"
+#include "PoolReplayer.h"
#include "ImageDeleter.h"
#include "types.h"
typedef ClusterWatcher::PoolPeers PoolPeers;
typedef std::pair<int64_t, peer_t> PoolPeer;
- void update_replayers(const PoolPeers &pool_peers);
+ void update_pool_replayers(const PoolPeers &pool_peers);
CephContext *m_cct;
std::vector<const char*> m_args;
std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<> m_image_sync_throttler;
- std::map<PoolPeer, std::unique_ptr<Replayer> > m_replayers;
+ std::map<PoolPeer, std::unique_ptr<PoolReplayer> > m_pool_replayers;
atomic_t m_stopping;
bool m_manual_stop = false;
MirrorAdminSocketHook *m_asok_hook;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "PoolReplayer.h"
+#include <boost/bind.hpp>
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/ceph_argparse.h"
+#include "common/code_environment.h"
+#include "common/common_init.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "include/stringify.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "global/global_context.h"
+#include "librbd/internal.h"
+#include "librbd/Utils.h"
+#include "librbd/Watcher.h"
+#include "librbd/api/Mirror.h"
+#include "InstanceReplayer.h"
+#include "InstanceWatcher.h"
+#include "LeaderWatcher.h"
+#include "Threads.h"
+#include "pool_watcher/RefreshImagesRequest.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
+ << this << " " << __func__ << ": "
+
+using std::chrono::seconds;
+using std::map;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using librbd::cls_client::dir_get_name;
+using librbd::util::create_async_context_callback;
+
+namespace rbd {
+namespace mirror {
+
+namespace {
+
+class PoolReplayerAdminSocketCommand {
+public:
+ PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
+ : pool_replayer(pool_replayer) {
+ }
+ virtual ~PoolReplayerAdminSocketCommand() {}
+ virtual bool call(Formatter *f, stringstream *ss) = 0;
+protected:
+ PoolReplayer *pool_replayer;
+};
+
+class StatusCommand : public PoolReplayerAdminSocketCommand {
+public:
+ explicit StatusCommand(PoolReplayer *pool_replayer)
+ : PoolReplayerAdminSocketCommand(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ pool_replayer->print_status(f, ss);
+ return true;
+ }
+};
+
+class StartCommand : public PoolReplayerAdminSocketCommand {
+public:
+ explicit StartCommand(PoolReplayer *pool_replayer)
+ : PoolReplayerAdminSocketCommand(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ pool_replayer->start();
+ return true;
+ }
+};
+
+class StopCommand : public PoolReplayerAdminSocketCommand {
+public:
+ explicit StopCommand(PoolReplayer *pool_replayer)
+ : PoolReplayerAdminSocketCommand(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ pool_replayer->stop(true);
+ return true;
+ }
+};
+
+class RestartCommand : public PoolReplayerAdminSocketCommand {
+public:
+ explicit RestartCommand(PoolReplayer *pool_replayer)
+ : PoolReplayerAdminSocketCommand(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ pool_replayer->restart();
+ return true;
+ }
+};
+
+class FlushCommand : public PoolReplayerAdminSocketCommand {
+public:
+ explicit FlushCommand(PoolReplayer *pool_replayer)
+ : PoolReplayerAdminSocketCommand(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ pool_replayer->flush();
+ return true;
+ }
+};
+
+class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
+public:
+ explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
+ : PoolReplayerAdminSocketCommand(pool_replayer) {
+ }
+
+ bool call(Formatter *f, stringstream *ss) override {
+ pool_replayer->release_leader();
+ return true;
+ }
+};
+
+class PoolReplayerAdminSocketHook : public AdminSocketHook {
+public:
+ PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
+ PoolReplayer *pool_replayer)
+ : admin_socket(cct->get_admin_socket()) {
+ std::string command;
+ int r;
+
+ command = "rbd mirror status " + name;
+ r = admin_socket->register_command(command, command, this,
+ "get status for rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StatusCommand(pool_replayer);
+ }
+
+ command = "rbd mirror start " + name;
+ r = admin_socket->register_command(command, command, this,
+ "start rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StartCommand(pool_replayer);
+ }
+
+ command = "rbd mirror stop " + name;
+ r = admin_socket->register_command(command, command, this,
+ "stop rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StopCommand(pool_replayer);
+ }
+
+ command = "rbd mirror restart " + name;
+ r = admin_socket->register_command(command, command, this,
+ "restart rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new RestartCommand(pool_replayer);
+ }
+
+ command = "rbd mirror flush " + name;
+ r = admin_socket->register_command(command, command, this,
+ "flush rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new FlushCommand(pool_replayer);
+ }
+
+ command = "rbd mirror leader release " + name;
+ r = admin_socket->register_command(command, command, this,
+ "release rbd mirror leader " + name);
+ if (r == 0) {
+ commands[command] = new LeaderReleaseCommand(pool_replayer);
+ }
+ }
+
+ ~PoolReplayerAdminSocketHook() override {
+ for (Commands::const_iterator i = commands.begin(); i != commands.end();
+ ++i) {
+ (void)admin_socket->unregister_command(i->first);
+ delete i->second;
+ }
+ }
+
+ bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+ bufferlist& out) override {
+ Commands::const_iterator i = commands.find(command);
+ assert(i != commands.end());
+ Formatter *f = Formatter::create(format);
+ stringstream ss;
+ bool r = i->second->call(f, &ss);
+ delete f;
+ out.append(ss);
+ return r;
+ }
+
+private:
+ typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;
+
+ AdminSocket *admin_socket;
+ Commands commands;
+};
+
+} // anonymous namespace
+
+struct PoolReplayer::C_RefreshLocalImages : public Context {
+ PoolReplayer *pool_replayer;
+ Context *on_finish;
+ ImageIds image_ids;
+
+ C_RefreshLocalImages(PoolReplayer *pool_replayer, Context *on_finish)
+ : pool_replayer(pool_replayer), on_finish(on_finish) {
+ }
+
+ void finish(int r) override {
+ pool_replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish);
+ }
+};
+
+PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
+ std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<> image_sync_throttler,
+ int64_t local_pool_id, const peer_t &peer,
+ const std::vector<const char*> &args) :
+ m_threads(threads),
+ m_image_deleter(image_deleter),
+ m_image_sync_throttler(image_sync_throttler),
+ m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+ m_peer(peer),
+ m_args(args),
+ m_local_pool_id(local_pool_id),
+ m_pool_watcher_listener(this),
+ m_asok_hook(nullptr),
+ m_pool_replayer_thread(this),
+ m_leader_listener(this)
+{
+}
+
+PoolReplayer::~PoolReplayer()
+{
+ delete m_asok_hook;
+
+ m_stopping.set(1);
+ {
+ Mutex::Locker l(m_lock);
+ m_cond.Signal();
+ }
+ if (m_pool_replayer_thread.is_started()) {
+ m_pool_replayer_thread.join();
+ }
+ if (m_leader_watcher) {
+ m_leader_watcher->shut_down();
+ }
+ if (m_instance_watcher) {
+ m_instance_watcher->shut_down();
+ }
+ if (m_instance_replayer) {
+ m_instance_replayer->shut_down();
+ }
+
+ assert(!m_pool_watcher);
+}
+
+bool PoolReplayer::is_blacklisted() const {
+ Mutex::Locker locker(m_lock);
+ return m_blacklisted;
+}
+
+bool PoolReplayer::is_leader() const {
+ Mutex::Locker locker(m_lock);
+ return m_leader_watcher && m_leader_watcher->is_leader();
+}
+
+int PoolReplayer::init()
+{
+ dout(20) << "replaying for " << m_peer << dendl;
+
+ int r = init_rados(g_ceph_context->_conf->cluster,
+ g_ceph_context->_conf->name.to_str(),
+ "local cluster", &m_local_rados);
+ if (r < 0) {
+ return r;
+ }
+
+ r = init_rados(m_peer.cluster_name, m_peer.client_name,
+ std::string("remote peer ") + stringify(m_peer),
+ &m_remote_rados);
+ if (r < 0) {
+ return r;
+ }
+
+ r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
+ if (r < 0) {
+ derr << "error accessing local pool " << m_local_pool_id << ": "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ std::string local_mirror_uuid;
+ r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
+ &local_mirror_uuid);
+ if (r < 0) {
+ derr << "failed to retrieve local mirror uuid from pool "
+ << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
+ m_remote_io_ctx);
+ if (r < 0) {
+ derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
+ << ": " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ dout(20) << "connected to " << m_peer << dendl;
+
+ m_instance_replayer.reset(
+ InstanceReplayer<>::create(m_threads, m_image_deleter,
+ m_image_sync_throttler, m_local_rados,
+ local_mirror_uuid, m_local_pool_id));
+ m_instance_replayer->init();
+ m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
+
+ m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
+ m_threads->work_queue,
+ m_instance_replayer.get()));
+ r = m_instance_watcher->init();
+ if (r < 0) {
+ derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
+ &m_leader_listener));
+ r = m_leader_watcher->init();
+ if (r < 0) {
+ derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ m_pool_replayer_thread.create("pool replayer");
+
+ return 0;
+}
+
+int PoolReplayer::init_rados(const std::string &cluster_name,
+ const std::string &client_name,
+ const std::string &description,
+ RadosRef *rados_ref) {
+ rados_ref->reset(new librados::Rados());
+
+ // NOTE: manually bootstrap a CephContext here instead of via
+ // the librados API to avoid mixing global singletons between
+ // the librados shared library and the daemon
+ // TODO: eliminate intermingling of global singletons within Ceph APIs
+ CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
+ if (client_name.empty() || !iparams.name.from_str(client_name)) {
+ derr << "error initializing cluster handle for " << description << dendl;
+ return -EINVAL;
+ }
+
+ CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+ cct->_conf->cluster = cluster_name;
+
+ // librados::Rados::conf_read_file
+ int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
+ if (r < 0) {
+ derr << "could not read ceph conf for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ cct->_conf->parse_env();
+
+ // librados::Rados::conf_parse_env
+ std::vector<const char*> args;
+ env_to_vec(args, nullptr);
+ r = cct->_conf->parse_argv(args);
+ if (r < 0) {
+ derr << "could not parse environment for " << description << ":"
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+
+ if (!m_args.empty()) {
+ // librados::Rados::conf_parse_argv
+ args = m_args;
+ r = cct->_conf->parse_argv(args);
+ if (r < 0) {
+ derr << "could not parse command line args for " << description << ": "
+ << cpp_strerror(r) << dendl;
+ cct->put();
+ return r;
+ }
+ }
+
+ // disable unnecessary librbd cache
+ cct->_conf->set_val_or_die("rbd_cache", "false");
+ cct->_conf->apply_changes(nullptr);
+ cct->_conf->complain_about_parse_errors(cct);
+
+ r = (*rados_ref)->init_with_context(cct);
+ assert(r == 0);
+ cct->put();
+
+ r = (*rados_ref)->connect();
+ if (r < 0) {
+ derr << "error connecting to " << description << ": "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+void PoolReplayer::run()
+{
+ dout(20) << "enter" << dendl;
+
+ while (!m_stopping.read()) {
+ std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
+ m_peer.cluster_name;
+ if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
+ m_asok_hook_name = asok_hook_name;
+ delete m_asok_hook;
+
+ m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
+ m_asok_hook_name, this);
+ }
+
+ Mutex::Locker locker(m_lock);
+ if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
+ m_blacklisted = true;
+ m_stopping.set(1);
+ break;
+ }
+
+ m_cond.WaitInterval(m_lock, utime_t(1, 0));
+ }
+}
+
+void PoolReplayer::print_status(Formatter *f, stringstream *ss)
+{
+ dout(20) << "enter" << dendl;
+
+ if (!f) {
+ return;
+ }
+
+ Mutex::Locker l(m_lock);
+
+ f->open_object_section("pool_replayer_status");
+ f->dump_string("pool", m_local_io_ctx.get_pool_name());
+ f->dump_stream("peer") << m_peer;
+ f->dump_string("instance_id", m_instance_watcher->get_instance_id());
+
+ std::string leader_instance_id;
+ m_leader_watcher->get_leader_instance_id(&leader_instance_id);
+ f->dump_string("leader_instance_id", leader_instance_id);
+
+ bool leader = m_leader_watcher->is_leader();
+ f->dump_bool("leader", leader);
+ if (leader) {
+ std::vector<std::string> instance_ids;
+ m_leader_watcher->list_instances(&instance_ids);
+ f->open_array_section("instances");
+ for (auto instance_id : instance_ids) {
+ f->dump_string("instance_id", instance_id);
+ }
+ f->close_section();
+ }
+
+ m_instance_replayer->print_status(f, ss);
+
+ f->close_section();
+ f->flush(*ss);
+}
+
+void PoolReplayer::start()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read()) {
+ return;
+ }
+
+ m_instance_replayer->start();
+}
+
+void PoolReplayer::stop(bool manual)
+{
+ dout(20) << "enter: manual=" << manual << dendl;
+
+ Mutex::Locker l(m_lock);
+ if (!manual) {
+ m_stopping.set(1);
+ m_cond.Signal();
+ return;
+ } else if (m_stopping.read()) {
+ return;
+ }
+
+ m_instance_replayer->stop();
+}
+
+void PoolReplayer::restart()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read()) {
+ return;
+ }
+
+ m_instance_replayer->restart();
+}
+
+void PoolReplayer::flush()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read() || m_manual_stop) {
+ return;
+ }
+
+ m_instance_replayer->flush();
+}
+
+void PoolReplayer::release_leader()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read() || !m_leader_watcher) {
+ return;
+ }
+
+ m_leader_watcher->release_leader();
+}
+
+void PoolReplayer::handle_update(const std::string &mirror_uuid,
+ const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids) {
+ assert(!mirror_uuid.empty());
+ if (m_stopping.read()) {
+ return;
+ }
+
+ dout(10) << dendl;
+ Mutex::Locker locker(m_lock);
+ if (!m_leader_watcher->is_leader()) {
+ return;
+ }
+
+ if (m_peer.uuid != mirror_uuid) {
+ m_instance_replayer->remove_peer(m_peer.uuid);
+ m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
+ m_peer.uuid = mirror_uuid;
+ }
+
+ // first callback will be a full directory -- so see if we need to remove
+ // any local images that no longer exist on the remote side
+ if (!m_init_image_ids.empty()) {
+ dout(20) << "scanning initial local image set" << dendl;
+ for (auto &image_id : added_image_ids) {
+ auto it = m_init_image_ids.find(image_id);
+ if (it != m_init_image_ids.end()) {
+ m_init_image_ids.erase(it);
+ }
+ }
+
+ // the remaining images in m_init_image_ids must be deleted
+ for (auto &image_id : m_init_image_ids) {
+ dout(20) << "scheduling the deletion of init image: "
+ << image_id.global_id << " (" << image_id.id << ")" << dendl;
+ m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id,
+ image_id.id, image_id.global_id);
+ }
+ m_init_image_ids.clear();
+ }
+
+ m_update_op_tracker.start_op();
+ Context *ctx = new FunctionContext([this](int r) {
+ dout(20) << "complete handle_update: r=" << r << dendl;
+ m_update_op_tracker.finish_op();
+ });
+
+ C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+
+ for (auto &image_id : removed_image_ids) {
+ // for now always send to myself (the leader)
+ std::string &instance_id = m_instance_watcher->get_instance_id();
+ m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
+ mirror_uuid, image_id.id, true,
+ gather_ctx->new_sub());
+ }
+
+ for (auto &image_id : added_image_ids) {
+ // for now always send to myself (the leader)
+ std::string &instance_id = m_instance_watcher->get_instance_id();
+ m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
+ mirror_uuid, image_id.id,
+ gather_ctx->new_sub());
+ }
+
+ gather_ctx->activate();
+}
+
+void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
+ dout(20) << dendl;
+ refresh_local_images(on_finish);
+}
+
+void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
+ dout(20) << dendl;
+ shut_down_pool_watcher(on_finish);
+}
+
+void PoolReplayer::refresh_local_images(Context *on_finish) {
+ dout(20) << dendl;
+
+ // ensure the initial set of local images is up-to-date
+ // after acquiring the leader role
+ auto ctx = new C_RefreshLocalImages(this, on_finish);
+ auto req = pool_watcher::RefreshImagesRequest<>::create(
+ m_local_io_ctx, &ctx->image_ids, ctx);
+ req->send();
+}
+
+void PoolReplayer::handle_refresh_local_images(int r, ImageIds &&image_ids,
+ Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ m_init_image_ids = std::move(image_ids);
+ }
+
+ if (r < 0) {
+ derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
+ on_finish->complete(r);
+ return;
+ }
+
+ init_pool_watcher(on_finish);
+}
+
+void PoolReplayer::init_pool_watcher(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(!m_pool_watcher);
+ m_pool_watcher.reset(new PoolWatcher<>(
+ m_threads, m_remote_io_ctx, m_pool_watcher_listener));
+ m_pool_watcher->init(create_async_context_callback(
+ m_threads->work_queue, on_finish));
+
+ m_cond.Signal();
+}
+
+void PoolReplayer::shut_down_pool_watcher(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_pool_watcher) {
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_pool_watcher(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ m_pool_watcher->shut_down(ctx);
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
+void PoolReplayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_pool_watcher);
+ m_pool_watcher.reset();
+ }
+ wait_for_update_ops(on_finish);
+}
+
+void PoolReplayer::wait_for_update_ops(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_wait_for_update_ops(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ m_update_op_tracker.wait_for_ops(ctx);
+}
+
+void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+
+ assert(r == 0);
+
+ Mutex::Locker locker(m_lock);
+ m_instance_replayer->release_all(on_finish);
+}
+
+} // namespace mirror
+} // namespace rbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H
+#define CEPH_RBD_MIRROR_POOL_REPLAYER_H
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+
+#include "common/AsyncOpTracker.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/WorkQueue.h"
+#include "include/atomic.h"
+#include "include/rados/librados.hpp"
+
+#include "ClusterWatcher.h"
+#include "LeaderWatcher.h"
+#include "PoolWatcher.h"
+#include "ImageDeleter.h"
+#include "types.h"
+
+class AdminSocketHook;
+
+namespace librbd { class ImageCtx; }
+
+namespace rbd {
+namespace mirror {
+
+template <typename> struct Threads;
+template <typename> class InstanceReplayer;
+template <typename> class InstanceWatcher;
+
+/**
+ * Controls mirroring for a single remote cluster.
+ */
+class PoolReplayer {
+public:
+ PoolReplayer(Threads<librbd::ImageCtx> *threads,
+ std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<> image_sync_throttler,
+ int64_t local_pool_id, const peer_t &peer,
+ const std::vector<const char*> &args);
+ ~PoolReplayer();
+ PoolReplayer(const PoolReplayer&) = delete;
+ PoolReplayer& operator=(const PoolReplayer&) = delete;
+
+ bool is_blacklisted() const;
+ bool is_leader() const;
+
+ int init();
+ void run();
+
+ void print_status(Formatter *f, stringstream *ss);
+ void start();
+ void stop(bool manual);
+ void restart();
+ void flush();
+ void release_leader();
+
+private:
+ struct PoolWatcherListener : public PoolWatcher<>::Listener {
+ PoolReplayer *pool_replayer;
+
+ PoolWatcherListener(PoolReplayer *pool_replayer)
+ : pool_replayer(pool_replayer) {
+ }
+
+ void handle_update(const std::string &mirror_uuid,
+ const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids) override {
+ pool_replayer->handle_update(mirror_uuid, added_image_ids,
+ removed_image_ids);
+ }
+ };
+
+ struct C_RefreshLocalImages;
+
+ void handle_update(const std::string &mirror_uuid,
+ const ImageIds &added_image_ids,
+ const ImageIds &removed_image_ids);
+
+ int init_rados(const std::string &cluster_name,
+ const std::string &client_name,
+ const std::string &description, RadosRef *rados_ref);
+
+ void handle_post_acquire_leader(Context *on_finish);
+ void handle_pre_release_leader(Context *on_finish);
+
+ void refresh_local_images(Context *on_finish);
+ void handle_refresh_local_images(int r, ImageIds &&image_ids,
+ Context *on_finish);
+
+ void init_pool_watcher(Context *on_finish);
+ void shut_down_pool_watcher(Context *on_finish);
+ void handle_shut_down_pool_watcher(int r, Context *on_finish);
+
+ void wait_for_update_ops(Context *on_finish);
+ void handle_wait_for_update_ops(int r, Context *on_finish);
+
+ Threads<librbd::ImageCtx> *m_threads;
+ std::shared_ptr<ImageDeleter> m_image_deleter;
+ ImageSyncThrottlerRef<> m_image_sync_throttler;
+ mutable Mutex m_lock;
+ Cond m_cond;
+ atomic_t m_stopping;
+ bool m_manual_stop = false;
+ bool m_blacklisted = false;
+
+ peer_t m_peer;
+ std::vector<const char*> m_args;
+ RadosRef m_local_rados;
+ RadosRef m_remote_rados;
+
+ librados::IoCtx m_local_io_ctx;
+ librados::IoCtx m_remote_io_ctx;
+
+ int64_t m_local_pool_id = -1;
+
+ PoolWatcherListener m_pool_watcher_listener;
+ std::unique_ptr<PoolWatcher<> > m_pool_watcher;
+
+ std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
+
+ std::string m_asok_hook_name;
+ AdminSocketHook *m_asok_hook;
+
+ std::set<ImageId> m_init_image_ids;
+
+ class PoolReplayerThread : public Thread {
+ PoolReplayer *m_pool_replayer;
+ public:
+ PoolReplayerThread(PoolReplayer *pool_replayer)
+ : m_pool_replayer(pool_replayer) {
+ }
+ void *entry() override {
+ m_pool_replayer->run();
+ return 0;
+ }
+ } m_pool_replayer_thread;
+
+ class LeaderListener : public LeaderWatcher<>::Listener {
+ public:
+ LeaderListener(PoolReplayer *pool_replayer)
+ : m_pool_replayer(pool_replayer) {
+ }
+
+ protected:
+ void post_acquire_handler(Context *on_finish) override {
+ m_pool_replayer->handle_post_acquire_leader(on_finish);
+ }
+
+ void pre_release_handler(Context *on_finish) override {
+ m_pool_replayer->handle_pre_release_leader(on_finish);
+ }
+
+ private:
+ PoolReplayer *m_pool_replayer;
+ } m_leader_listener;
+
+ std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
+ std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
+ AsyncOpTracker m_update_op_tracker;
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include <boost/bind.hpp>
-
-#include "common/Formatter.h"
-#include "common/admin_socket.h"
-#include "common/ceph_argparse.h"
-#include "common/code_environment.h"
-#include "common/common_init.h"
-#include "common/debug.h"
-#include "common/errno.h"
-#include "include/stringify.h"
-#include "cls/rbd/cls_rbd_client.h"
-#include "global/global_context.h"
-#include "librbd/internal.h"
-#include "librbd/Utils.h"
-#include "librbd/Watcher.h"
-#include "librbd/api/Mirror.h"
-#include "InstanceReplayer.h"
-#include "InstanceWatcher.h"
-#include "LeaderWatcher.h"
-#include "Replayer.h"
-#include "Threads.h"
-#include "pool_watcher/RefreshImagesRequest.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::Replayer: " \
- << this << " " << __func__ << ": "
-
-using std::chrono::seconds;
-using std::map;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
-using librbd::cls_client::dir_get_name;
-using librbd::util::create_async_context_callback;
-
-namespace rbd {
-namespace mirror {
-
-namespace {
-
-class ReplayerAdminSocketCommand {
-public:
- virtual ~ReplayerAdminSocketCommand() {}
- virtual bool call(Formatter *f, stringstream *ss) = 0;
-};
-
-class StatusCommand : public ReplayerAdminSocketCommand {
-public:
- explicit StatusCommand(Replayer *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->print_status(f, ss);
- return true;
- }
-
-private:
- Replayer *replayer;
-};
-
-class StartCommand : public ReplayerAdminSocketCommand {
-public:
- explicit StartCommand(Replayer *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->start();
- return true;
- }
-
-private:
- Replayer *replayer;
-};
-
-class StopCommand : public ReplayerAdminSocketCommand {
-public:
- explicit StopCommand(Replayer *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->stop(true);
- return true;
- }
-
-private:
- Replayer *replayer;
-};
-
-class RestartCommand : public ReplayerAdminSocketCommand {
-public:
- explicit RestartCommand(Replayer *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->restart();
- return true;
- }
-
-private:
- Replayer *replayer;
-};
-
-class FlushCommand : public ReplayerAdminSocketCommand {
-public:
- explicit FlushCommand(Replayer *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->flush();
- return true;
- }
-
-private:
- Replayer *replayer;
-};
-
-class LeaderReleaseCommand : public ReplayerAdminSocketCommand {
-public:
- explicit LeaderReleaseCommand(Replayer *replayer) : replayer(replayer) {}
-
- bool call(Formatter *f, stringstream *ss) override {
- replayer->release_leader();
- return true;
- }
-
-private:
- Replayer *replayer;
-};
-
-class ReplayerAdminSocketHook : public AdminSocketHook {
-public:
- ReplayerAdminSocketHook(CephContext *cct, const std::string &name,
- Replayer *replayer) :
- admin_socket(cct->get_admin_socket()) {
- std::string command;
- int r;
-
- command = "rbd mirror status " + name;
- r = admin_socket->register_command(command, command, this,
- "get status for rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StatusCommand(replayer);
- }
-
- command = "rbd mirror start " + name;
- r = admin_socket->register_command(command, command, this,
- "start rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StartCommand(replayer);
- }
-
- command = "rbd mirror stop " + name;
- r = admin_socket->register_command(command, command, this,
- "stop rbd mirror " + name);
- if (r == 0) {
- commands[command] = new StopCommand(replayer);
- }
-
- command = "rbd mirror restart " + name;
- r = admin_socket->register_command(command, command, this,
- "restart rbd mirror " + name);
- if (r == 0) {
- commands[command] = new RestartCommand(replayer);
- }
-
- command = "rbd mirror flush " + name;
- r = admin_socket->register_command(command, command, this,
- "flush rbd mirror " + name);
- if (r == 0) {
- commands[command] = new FlushCommand(replayer);
- }
-
- command = "rbd mirror leader release " + name;
- r = admin_socket->register_command(command, command, this,
- "release rbd mirror leader " + name);
- if (r == 0) {
- commands[command] = new LeaderReleaseCommand(replayer);
- }
- }
-
- ~ReplayerAdminSocketHook() override {
- for (Commands::const_iterator i = commands.begin(); i != commands.end();
- ++i) {
- (void)admin_socket->unregister_command(i->first);
- delete i->second;
- }
- }
-
- bool call(std::string command, cmdmap_t& cmdmap, std::string format,
- bufferlist& out) override {
- Commands::const_iterator i = commands.find(command);
- assert(i != commands.end());
- Formatter *f = Formatter::create(format);
- stringstream ss;
- bool r = i->second->call(f, &ss);
- delete f;
- out.append(ss);
- return r;
- }
-
-private:
- typedef std::map<std::string, ReplayerAdminSocketCommand*> Commands;
-
- AdminSocket *admin_socket;
- Commands commands;
-};
-
-} // anonymous namespace
-
-struct Replayer::C_RefreshLocalImages : public Context {
- Replayer *replayer;
- Context *on_finish;
- ImageIds image_ids;
-
- C_RefreshLocalImages(Replayer *replayer, Context *on_finish)
- : replayer(replayer), on_finish(on_finish) {
- }
-
- void finish(int r) override {
- replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish);
- }
-};
-
-Replayer::Replayer(Threads<librbd::ImageCtx> *threads,
- std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<> image_sync_throttler,
- int64_t local_pool_id, const peer_t &peer,
- const std::vector<const char*> &args) :
- m_threads(threads),
- m_image_deleter(image_deleter),
- m_image_sync_throttler(image_sync_throttler),
- m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
- m_peer(peer),
- m_args(args),
- m_local_pool_id(local_pool_id),
- m_pool_watcher_listener(this),
- m_asok_hook(nullptr),
- m_replayer_thread(this),
- m_leader_listener(this)
-{
-}
-
-Replayer::~Replayer()
-{
- delete m_asok_hook;
-
- m_stopping.set(1);
- {
- Mutex::Locker l(m_lock);
- m_cond.Signal();
- }
- if (m_replayer_thread.is_started()) {
- m_replayer_thread.join();
- }
- if (m_leader_watcher) {
- m_leader_watcher->shut_down();
- }
- if (m_instance_watcher) {
- m_instance_watcher->shut_down();
- }
- if (m_instance_replayer) {
- m_instance_replayer->shut_down();
- }
-
- assert(!m_pool_watcher);
-}
-
-bool Replayer::is_blacklisted() const {
- Mutex::Locker locker(m_lock);
- return m_blacklisted;
-}
-
-bool Replayer::is_leader() const {
- Mutex::Locker locker(m_lock);
- return m_leader_watcher && m_leader_watcher->is_leader();
-}
-
-int Replayer::init()
-{
- dout(20) << "replaying for " << m_peer << dendl;
-
- int r = init_rados(g_ceph_context->_conf->cluster,
- g_ceph_context->_conf->name.to_str(),
- "local cluster", &m_local_rados);
- if (r < 0) {
- return r;
- }
-
- r = init_rados(m_peer.cluster_name, m_peer.client_name,
- std::string("remote peer ") + stringify(m_peer),
- &m_remote_rados);
- if (r < 0) {
- return r;
- }
-
- r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
- if (r < 0) {
- derr << "error accessing local pool " << m_local_pool_id << ": "
- << cpp_strerror(r) << dendl;
- return r;
- }
-
- std::string local_mirror_uuid;
- r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
- &local_mirror_uuid);
- if (r < 0) {
- derr << "failed to retrieve local mirror uuid from pool "
- << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
- m_remote_io_ctx);
- if (r < 0) {
- derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
- << ": " << cpp_strerror(r) << dendl;
- return r;
- }
-
- dout(20) << "connected to " << m_peer << dendl;
-
- m_instance_replayer.reset(
- InstanceReplayer<>::create(m_threads, m_image_deleter,
- m_image_sync_throttler, m_local_rados,
- local_mirror_uuid, m_local_pool_id));
- m_instance_replayer->init();
- m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
-
- m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
- m_threads->work_queue,
- m_instance_replayer.get()));
- r = m_instance_watcher->init();
- if (r < 0) {
- derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
- &m_leader_listener));
- r = m_leader_watcher->init();
- if (r < 0) {
- derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- m_replayer_thread.create("replayer");
-
- return 0;
-}
-
-int Replayer::init_rados(const std::string &cluster_name,
- const std::string &client_name,
- const std::string &description, RadosRef *rados_ref) {
- rados_ref->reset(new librados::Rados());
-
- // NOTE: manually bootstrap a CephContext here instead of via
- // the librados API to avoid mixing global singletons between
- // the librados shared library and the daemon
- // TODO: eliminate intermingling of global singletons within Ceph APIs
- CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
- if (client_name.empty() || !iparams.name.from_str(client_name)) {
- derr << "error initializing cluster handle for " << description << dendl;
- return -EINVAL;
- }
-
- CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
- cct->_conf->cluster = cluster_name;
-
- // librados::Rados::conf_read_file
- int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
- if (r < 0) {
- derr << "could not read ceph conf for " << description << ": "
- << cpp_strerror(r) << dendl;
- cct->put();
- return r;
- }
- cct->_conf->parse_env();
-
- // librados::Rados::conf_parse_env
- std::vector<const char*> args;
- env_to_vec(args, nullptr);
- r = cct->_conf->parse_argv(args);
- if (r < 0) {
- derr << "could not parse environment for " << description << ":"
- << cpp_strerror(r) << dendl;
- cct->put();
- return r;
- }
-
- if (!m_args.empty()) {
- // librados::Rados::conf_parse_argv
- args = m_args;
- r = cct->_conf->parse_argv(args);
- if (r < 0) {
- derr << "could not parse command line args for " << description << ": "
- << cpp_strerror(r) << dendl;
- cct->put();
- return r;
- }
- }
-
- // disable unnecessary librbd cache
- cct->_conf->set_val_or_die("rbd_cache", "false");
- cct->_conf->apply_changes(nullptr);
- cct->_conf->complain_about_parse_errors(cct);
-
- r = (*rados_ref)->init_with_context(cct);
- assert(r == 0);
- cct->put();
-
- r = (*rados_ref)->connect();
- if (r < 0) {
- derr << "error connecting to " << description << ": "
- << cpp_strerror(r) << dendl;
- return r;
- }
-
- return 0;
-}
-
-void Replayer::run()
-{
- dout(20) << "enter" << dendl;
-
- while (!m_stopping.read()) {
- std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
- m_peer.cluster_name;
- if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
- m_asok_hook_name = asok_hook_name;
- delete m_asok_hook;
-
- m_asok_hook = new ReplayerAdminSocketHook(g_ceph_context,
- m_asok_hook_name, this);
- }
-
- Mutex::Locker locker(m_lock);
- if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
- m_blacklisted = true;
- m_stopping.set(1);
- break;
- }
-
- m_cond.WaitInterval(m_lock, utime_t(1, 0));
- }
-}
-
-void Replayer::print_status(Formatter *f, stringstream *ss)
-{
- dout(20) << "enter" << dendl;
-
- if (!f) {
- return;
- }
-
- Mutex::Locker l(m_lock);
-
- f->open_object_section("replayer_status");
- f->dump_string("pool", m_local_io_ctx.get_pool_name());
- f->dump_stream("peer") << m_peer;
- f->dump_string("instance_id", m_instance_watcher->get_instance_id());
-
- std::string leader_instance_id;
- m_leader_watcher->get_leader_instance_id(&leader_instance_id);
- f->dump_string("leader_instance_id", leader_instance_id);
-
- bool leader = m_leader_watcher->is_leader();
- f->dump_bool("leader", leader);
- if (leader) {
- std::vector<std::string> instance_ids;
- m_leader_watcher->list_instances(&instance_ids);
- f->open_array_section("instances");
- for (auto instance_id : instance_ids) {
- f->dump_string("instance_id", instance_id);
- }
- f->close_section();
- }
-
- m_instance_replayer->print_status(f, ss);
-
- f->close_section();
- f->flush(*ss);
-}
-
-void Replayer::start()
-{
- dout(20) << "enter" << dendl;
-
- Mutex::Locker l(m_lock);
-
- if (m_stopping.read()) {
- return;
- }
-
- m_instance_replayer->start();
-}
-
-void Replayer::stop(bool manual)
-{
- dout(20) << "enter: manual=" << manual << dendl;
-
- Mutex::Locker l(m_lock);
- if (!manual) {
- m_stopping.set(1);
- m_cond.Signal();
- return;
- } else if (m_stopping.read()) {
- return;
- }
-
- m_instance_replayer->stop();
-}
-
-void Replayer::restart()
-{
- dout(20) << "enter" << dendl;
-
- Mutex::Locker l(m_lock);
-
- if (m_stopping.read()) {
- return;
- }
-
- m_instance_replayer->restart();
-}
-
-void Replayer::flush()
-{
- dout(20) << "enter" << dendl;
-
- Mutex::Locker l(m_lock);
-
- if (m_stopping.read() || m_manual_stop) {
- return;
- }
-
- m_instance_replayer->flush();
-}
-
-void Replayer::release_leader()
-{
- dout(20) << "enter" << dendl;
-
- Mutex::Locker l(m_lock);
-
- if (m_stopping.read() || !m_leader_watcher) {
- return;
- }
-
- m_leader_watcher->release_leader();
-}
-
-void Replayer::handle_update(const std::string &mirror_uuid,
- const ImageIds &added_image_ids,
- const ImageIds &removed_image_ids) {
- assert(!mirror_uuid.empty());
- if (m_stopping.read()) {
- return;
- }
-
- dout(10) << dendl;
- Mutex::Locker locker(m_lock);
- if (!m_leader_watcher->is_leader()) {
- return;
- }
-
- if (m_peer.uuid != mirror_uuid) {
- m_instance_replayer->remove_peer(m_peer.uuid);
- m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
- m_peer.uuid = mirror_uuid;
- }
-
- // first callback will be a full directory -- so see if we need to remove
- // any local images that no longer exist on the remote side
- if (!m_init_image_ids.empty()) {
- dout(20) << "scanning initial local image set" << dendl;
- for (auto &image_id : added_image_ids) {
- auto it = m_init_image_ids.find(image_id);
- if (it != m_init_image_ids.end()) {
- m_init_image_ids.erase(it);
- }
- }
-
- // the remaining images in m_init_image_ids must be deleted
- for (auto &image_id : m_init_image_ids) {
- dout(20) << "scheduling the deletion of init image: "
- << image_id.global_id << " (" << image_id.id << ")" << dendl;
- m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id,
- image_id.id, image_id.global_id);
- }
- m_init_image_ids.clear();
- }
-
- m_update_op_tracker.start_op();
- Context *ctx = new FunctionContext([this](int r) {
- dout(20) << "complete handle_update: r=" << r << dendl;
- m_update_op_tracker.finish_op();
- });
-
- C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
-
- for (auto &image_id : removed_image_ids) {
- // for now always send to myself (the leader)
- std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
- mirror_uuid, image_id.id, true,
- gather_ctx->new_sub());
- }
-
- for (auto &image_id : added_image_ids) {
- // for now always send to myself (the leader)
- std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
- mirror_uuid, image_id.id,
- gather_ctx->new_sub());
- }
-
- gather_ctx->activate();
-}
-
-void Replayer::handle_post_acquire_leader(Context *on_finish) {
- dout(20) << dendl;
- refresh_local_images(on_finish);
-}
-
-void Replayer::handle_pre_release_leader(Context *on_finish) {
- dout(20) << dendl;
- shut_down_pool_watcher(on_finish);
-}
-
-void Replayer::refresh_local_images(Context *on_finish) {
- dout(20) << dendl;
-
- // ensure the initial set of local images is up-to-date
- // after acquiring the leader role
- auto ctx = new C_RefreshLocalImages(this, on_finish);
- auto req = pool_watcher::RefreshImagesRequest<>::create(
- m_local_io_ctx, &ctx->image_ids, ctx);
- req->send();
-}
-
-void Replayer::handle_refresh_local_images(int r, ImageIds &&image_ids,
- Context *on_finish) {
- dout(20) << "r=" << r << dendl;
-
- {
- Mutex::Locker locker(m_lock);
- m_init_image_ids = std::move(image_ids);
- }
-
- if (r < 0) {
- derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
- on_finish->complete(r);
- return;
- }
-
- init_pool_watcher(on_finish);
-}
-
-void Replayer::init_pool_watcher(Context *on_finish) {
- dout(20) << dendl;
-
- Mutex::Locker locker(m_lock);
- assert(!m_pool_watcher);
- m_pool_watcher.reset(new PoolWatcher<>(
- m_threads, m_remote_io_ctx, m_pool_watcher_listener));
- m_pool_watcher->init(create_async_context_callback(
- m_threads->work_queue, on_finish));
-
- m_cond.Signal();
-}
-
-void Replayer::shut_down_pool_watcher(Context *on_finish) {
- dout(20) << dendl;
-
- {
- Mutex::Locker locker(m_lock);
- if (m_pool_watcher) {
- Context *ctx = new FunctionContext([this, on_finish](int r) {
- handle_shut_down_pool_watcher(r, on_finish);
- });
- ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
- m_pool_watcher->shut_down(ctx);
- return;
- }
- }
-
- on_finish->complete(0);
-}
-
-void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
- dout(20) << "r=" << r << dendl;
-
- {
- Mutex::Locker locker(m_lock);
- assert(m_pool_watcher);
- m_pool_watcher.reset();
- }
- wait_for_update_ops(on_finish);
-}
-
-void Replayer::wait_for_update_ops(Context *on_finish) {
- dout(20) << dendl;
-
- Mutex::Locker locker(m_lock);
-
- Context *ctx = new FunctionContext([this, on_finish](int r) {
- handle_wait_for_update_ops(r, on_finish);
- });
- ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
- m_update_op_tracker.wait_for_ops(ctx);
-}
-
-void Replayer::handle_wait_for_update_ops(int r, Context *on_finish) {
- dout(20) << "r=" << r << dendl;
-
- assert(r == 0);
-
- Mutex::Locker locker(m_lock);
- m_instance_replayer->release_all(on_finish);
-}
-
-} // namespace mirror
-} // namespace rbd
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_RBD_MIRROR_REPLAYER_H
-#define CEPH_RBD_MIRROR_REPLAYER_H
-
-#include <map>
-#include <memory>
-#include <set>
-#include <string>
-
-#include "common/AsyncOpTracker.h"
-#include "common/Cond.h"
-#include "common/Mutex.h"
-#include "common/WorkQueue.h"
-#include "include/atomic.h"
-#include "include/rados/librados.hpp"
-
-#include "ClusterWatcher.h"
-#include "LeaderWatcher.h"
-#include "PoolWatcher.h"
-#include "ImageDeleter.h"
-#include "types.h"
-
-class AdminSocketHook;
-
-namespace librbd { class ImageCtx; }
-
-namespace rbd {
-namespace mirror {
-
-template <typename> struct Threads;
-template <typename> class InstanceReplayer;
-template <typename> class InstanceWatcher;
-
-/**
- * Controls mirroring for a single remote cluster.
- */
-class Replayer {
-public:
- Replayer(Threads<librbd::ImageCtx> *threads,
- std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<> image_sync_throttler,
- int64_t local_pool_id, const peer_t &peer,
- const std::vector<const char*> &args);
- ~Replayer();
- Replayer(const Replayer&) = delete;
- Replayer& operator=(const Replayer&) = delete;
-
- bool is_blacklisted() const;
- bool is_leader() const;
-
- int init();
- void run();
-
- void print_status(Formatter *f, stringstream *ss);
- void start();
- void stop(bool manual);
- void restart();
- void flush();
- void release_leader();
-
-private:
- struct PoolWatcherListener : public PoolWatcher<>::Listener {
- Replayer *replayer;
-
- PoolWatcherListener(Replayer *replayer) : replayer(replayer) {
- }
-
- void handle_update(const std::string &mirror_uuid,
- const ImageIds &added_image_ids,
- const ImageIds &removed_image_ids) override {
- replayer->handle_update(mirror_uuid, added_image_ids, removed_image_ids);
- }
- };
-
- struct C_RefreshLocalImages;
-
- void handle_update(const std::string &mirror_uuid,
- const ImageIds &added_image_ids,
- const ImageIds &removed_image_ids);
-
- int init_rados(const std::string &cluster_name,
- const std::string &client_name,
- const std::string &description, RadosRef *rados_ref);
-
- void handle_post_acquire_leader(Context *on_finish);
- void handle_pre_release_leader(Context *on_finish);
-
- void refresh_local_images(Context *on_finish);
- void handle_refresh_local_images(int r, ImageIds &&image_ids,
- Context *on_finish);
-
- void init_pool_watcher(Context *on_finish);
- void shut_down_pool_watcher(Context *on_finish);
- void handle_shut_down_pool_watcher(int r, Context *on_finish);
-
- void wait_for_update_ops(Context *on_finish);
- void handle_wait_for_update_ops(int r, Context *on_finish);
-
- Threads<librbd::ImageCtx> *m_threads;
- std::shared_ptr<ImageDeleter> m_image_deleter;
- ImageSyncThrottlerRef<> m_image_sync_throttler;
- mutable Mutex m_lock;
- Cond m_cond;
- atomic_t m_stopping;
- bool m_manual_stop = false;
- bool m_blacklisted = false;
-
- peer_t m_peer;
- std::vector<const char*> m_args;
- RadosRef m_local_rados;
- RadosRef m_remote_rados;
-
- librados::IoCtx m_local_io_ctx;
- librados::IoCtx m_remote_io_ctx;
-
- int64_t m_local_pool_id = -1;
-
- PoolWatcherListener m_pool_watcher_listener;
- std::unique_ptr<PoolWatcher<> > m_pool_watcher;
-
- std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
-
- std::string m_asok_hook_name;
- AdminSocketHook *m_asok_hook;
-
- std::set<ImageId> m_init_image_ids;
-
- class ReplayerThread : public Thread {
- Replayer *m_replayer;
- public:
- ReplayerThread(Replayer *replayer) : m_replayer(replayer) {}
- void *entry() override {
- m_replayer->run();
- return 0;
- }
- } m_replayer_thread;
-
- class LeaderListener : public LeaderWatcher<>::Listener {
- public:
- LeaderListener(Replayer *replayer) : m_replayer(replayer) {
- }
-
- protected:
- void post_acquire_handler(Context *on_finish) override {
- m_replayer->handle_post_acquire_leader(on_finish);
- }
-
- void pre_release_handler(Context *on_finish) override {
- m_replayer->handle_pre_release_leader(on_finish);
- }
-
- private:
- Replayer *m_replayer;
- } m_leader_listener;
-
- std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
- std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
- AsyncOpTracker m_update_op_tracker;
-};
-
-} // namespace mirror
-} // namespace rbd
-
-#endif // CEPH_RBD_MIRROR_REPLAYER_H