From: Jason Dillaman Date: Wed, 19 Apr 2017 20:23:15 +0000 (-0400) Subject: rbd-mirror: renamed Replayer to PoolReplayer X-Git-Tag: v12.0.3~284^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9795f125d67cfa414ef59f60a9178ac254684633;p=ceph.git rbd-mirror: renamed Replayer to PoolReplayer This is a stepping stone to support multiple peers within a single pool. Signed-off-by: Jason Dillaman --- diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 4f1acbcbd4f2..3fb0536c762a 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -14,8 +14,8 @@ set(rbd_mirror_internal LeaderWatcher.cc Mirror.cc MirrorStatusWatcher.cc + PoolReplayer.cc PoolWatcher.cc - Replayer.cc Threads.cc types.cc image_replayer/BootstrapRequest.cc diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 0424edd91ddb..86c6939182f5 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -254,18 +254,17 @@ void Mirror::run() m_local_cluster_watcher->refresh_pools(); Mutex::Locker l(m_lock); if (!m_manual_stop) { - update_replayers(m_local_cluster_watcher->get_pool_peers()); + update_pool_replayers(m_local_cluster_watcher->get_pool_peers()); } m_cond.WaitInterval( m_lock, utime_t(m_cct->_conf->rbd_mirror_pool_replayers_refresh_interval, 0)); } - // stop all replayers in parallel + // stop all pool replayers in parallel Mutex::Locker locker(m_lock); - for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { - auto &replayer = it->second; - replayer->stop(false); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->stop(false); } dout(20) << "return" << dendl; } @@ -282,12 +281,11 @@ void Mirror::print_status(Formatter *f, stringstream *ss) if (f) { f->open_object_section("mirror_status"); - f->open_array_section("replayers"); + f->open_array_section("pool_replayers"); }; - for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { - auto &replayer = it->second; - replayer->print_status(f, ss); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->print_status(f, ss); } if (f) { @@ -322,9 +320,8 @@ void Mirror::start() m_manual_stop = false; - for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { - auto &replayer = it->second; - replayer->start(); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->start(); } } @@ -339,9 +336,8 @@ void Mirror::stop() m_manual_stop = true; - for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { - auto &replayer = it->second; - replayer->stop(true); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->stop(true); } } @@ -356,9 +352,8 @@ void Mirror::restart() m_manual_stop = false; - for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { - auto &replayer = it->second; - replayer->restart(); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->restart(); } } @@ -371,9 +366,8 @@ void Mirror::flush() return; } - for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { - auto &replayer = it->second; - replayer->flush(); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->flush(); } } @@ -386,30 +380,29 @@ void Mirror::release_leader() return; } - for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { - auto &replayer = it->second; - replayer->release_leader(); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->release_leader(); } } -void Mirror::update_replayers(const PoolPeers &pool_peers) +void Mirror::update_pool_replayers(const PoolPeers &pool_peers) { dout(20) << "enter" << dendl; assert(m_lock.is_locked()); - // remove stale replayers before creating new replayers - for (auto it = m_replayers.begin(); it != m_replayers.end();) { + // remove stale pool replayers before creating new pool replayers + for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { auto &peer = it->first.second; auto pool_peer_it = pool_peers.find(it->first.first); if (it->second->is_blacklisted()) { - derr << "removing blacklisted replayer for " << peer << dendl; + derr << "removing blacklisted pool replayer for " << peer << dendl; // TODO: make async - it = m_replayers.erase(it); + it = m_pool_replayers.erase(it); } else if (pool_peer_it == pool_peers.end() || pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { - dout(20) << "removing replayer for " << peer << dendl; + dout(20) << "removing pool replayer for " << peer << dendl; // TODO: make async - it = m_replayers.erase(it); + it = m_pool_replayers.erase(it); } else { ++it; } @@ -418,19 +411,22 @@ void Mirror::update_replayers(const PoolPeers &pool_peers) for (auto &kv : pool_peers) { for (auto &peer : kv.second) { PoolPeer pool_peer(kv.first, peer); - if (m_replayers.find(pool_peer) == m_replayers.end()) { - dout(20) << "starting replayer for " << peer << dendl; - unique_ptr replayer(new Replayer(m_threads, m_image_deleter, - m_image_sync_throttler, - kv.first, peer, m_args)); - // TODO: make async, and retry connecting within replayer - int r = replayer->init(); + if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) { + dout(20) << "starting pool replayer for " << peer << dendl; + unique_ptr pool_replayer(new PoolReplayer( + m_threads, m_image_deleter, m_image_sync_throttler, kv.first, peer, + m_args)); + + // TODO: make async, and retry connecting within pool replayer + int r = pool_replayer->init(); if (r < 0) { continue; } - m_replayers.insert(std::make_pair(pool_peer, std::move(replayer))); + m_pool_replayers.emplace(pool_peer, std::move(pool_replayer)); } } + + // TODO currently only support a single peer } } diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index 59e0575358b3..4ff9d512399a 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -13,7 +13,7 @@ #include "include/atomic.h" #include "include/rados/librados.hpp" #include "ClusterWatcher.h" -#include "Replayer.h" +#include "PoolReplayer.h" #include "ImageDeleter.h" #include "types.h" @@ -53,7 +53,7 @@ private: typedef ClusterWatcher::PoolPeers PoolPeers; typedef std::pair PoolPeer; - void update_replayers(const PoolPeers &pool_peers); + void update_pool_replayers(const PoolPeers &pool_peers); CephContext *m_cct; std::vector m_args; @@ -66,7 +66,7 @@ private: std::unique_ptr m_local_cluster_watcher; std::shared_ptr m_image_deleter; ImageSyncThrottlerRef<> m_image_sync_throttler; - std::map > m_replayers; + std::map > m_pool_replayers; atomic_t m_stopping; bool m_manual_stop = false; MirrorAdminSocketHook *m_asok_hook; diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc new file mode 100644 index 000000000000..6747ddc15aac --- /dev/null +++ b/src/tools/rbd_mirror/PoolReplayer.cc @@ -0,0 +1,726 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "PoolReplayer.h" +#include +#include "common/Formatter.h" +#include "common/admin_socket.h" +#include "common/ceph_argparse.h" +#include "common/code_environment.h" +#include "common/common_init.h" +#include "common/debug.h" +#include "common/errno.h" +#include "include/stringify.h" +#include "cls/rbd/cls_rbd_client.h" +#include "global/global_context.h" +#include "librbd/internal.h" +#include "librbd/Utils.h" +#include "librbd/Watcher.h" +#include "librbd/api/Mirror.h" +#include "InstanceReplayer.h" +#include "InstanceWatcher.h" +#include "LeaderWatcher.h" +#include "Threads.h" +#include "pool_watcher/RefreshImagesRequest.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \ + << this << " " << __func__ << ": " + +using std::chrono::seconds; +using std::map; +using std::string; +using std::unique_ptr; +using std::vector; + +using librbd::cls_client::dir_get_name; +using librbd::util::create_async_context_callback; + +namespace rbd { +namespace mirror { + +namespace { + +class PoolReplayerAdminSocketCommand { +public: + PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer) + : pool_replayer(pool_replayer) { + } + virtual ~PoolReplayerAdminSocketCommand() {} + virtual bool call(Formatter *f, stringstream *ss) = 0; +protected: + PoolReplayer *pool_replayer; +}; + +class StatusCommand : public PoolReplayerAdminSocketCommand { +public: + explicit StatusCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->print_status(f, ss); + return true; + } +}; + +class StartCommand : public PoolReplayerAdminSocketCommand { +public: + explicit StartCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->start(); + return true; + } +}; + +class StopCommand : public PoolReplayerAdminSocketCommand { +public: + explicit StopCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->stop(true); + return true; + } +}; + +class RestartCommand : public PoolReplayerAdminSocketCommand { +public: + explicit RestartCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->restart(); + return true; + } +}; + +class FlushCommand : public PoolReplayerAdminSocketCommand { +public: + explicit FlushCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->flush(); + return true; + } +}; + +class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand { +public: + explicit LeaderReleaseCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->release_leader(); + return true; + } +}; + +class PoolReplayerAdminSocketHook : public AdminSocketHook { +public: + PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name, + PoolReplayer *pool_replayer) + : admin_socket(cct->get_admin_socket()) { + std::string command; + int r; + + command = "rbd mirror status " + name; + r = admin_socket->register_command(command, command, this, + "get status for rbd mirror " + name); + if (r == 0) { + commands[command] = new StatusCommand(pool_replayer); + } + + command = "rbd mirror start " + name; + r = admin_socket->register_command(command, command, this, + "start rbd mirror " + name); + if (r == 0) { + commands[command] = new StartCommand(pool_replayer); + } + + command = "rbd mirror stop " + name; + r = admin_socket->register_command(command, command, this, + "stop rbd mirror " + name); + if (r == 0) { + commands[command] = new StopCommand(pool_replayer); + } + + command = "rbd mirror restart " + name; + r = admin_socket->register_command(command, command, this, + "restart rbd mirror " + name); + if (r == 0) { + commands[command] = new RestartCommand(pool_replayer); + } + + command = "rbd mirror flush " + name; + r = admin_socket->register_command(command, command, this, + "flush rbd mirror " + name); + if (r == 0) { + commands[command] = new FlushCommand(pool_replayer); + } + + command = "rbd mirror leader release " + name; + r = admin_socket->register_command(command, command, this, + "release rbd mirror leader " + name); + if (r == 0) { + commands[command] = new LeaderReleaseCommand(pool_replayer); + } + } + + ~PoolReplayerAdminSocketHook() override { + for (Commands::const_iterator i = commands.begin(); i != commands.end(); + ++i) { + (void)admin_socket->unregister_command(i->first); + delete i->second; + } + } + + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) override { + Commands::const_iterator i = commands.find(command); + assert(i != commands.end()); + Formatter *f = Formatter::create(format); + stringstream ss; + bool r = i->second->call(f, &ss); + delete f; + out.append(ss); + return r; + } + +private: + typedef std::map Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + +} // anonymous namespace + +struct PoolReplayer::C_RefreshLocalImages : public Context { + PoolReplayer *pool_replayer; + Context *on_finish; + ImageIds image_ids; + + C_RefreshLocalImages(PoolReplayer *pool_replayer, Context *on_finish) + : pool_replayer(pool_replayer), on_finish(on_finish) { + } + + void finish(int r) override { + pool_replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish); + } +}; + +PoolReplayer::PoolReplayer(Threads *threads, + std::shared_ptr image_deleter, + ImageSyncThrottlerRef<> image_sync_throttler, + int64_t local_pool_id, const peer_t &peer, + const std::vector &args) : + m_threads(threads), + m_image_deleter(image_deleter), + m_image_sync_throttler(image_sync_throttler), + m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)), + m_peer(peer), + m_args(args), + m_local_pool_id(local_pool_id), + m_pool_watcher_listener(this), + m_asok_hook(nullptr), + m_pool_replayer_thread(this), + m_leader_listener(this) +{ +} + +PoolReplayer::~PoolReplayer() +{ + delete m_asok_hook; + + m_stopping.set(1); + { + Mutex::Locker l(m_lock); + m_cond.Signal(); + } + if (m_pool_replayer_thread.is_started()) { + m_pool_replayer_thread.join(); + } + if (m_leader_watcher) { + m_leader_watcher->shut_down(); + } + if (m_instance_watcher) { + m_instance_watcher->shut_down(); + } + if (m_instance_replayer) { + m_instance_replayer->shut_down(); + } + + assert(!m_pool_watcher); +} + +bool PoolReplayer::is_blacklisted() const { + Mutex::Locker locker(m_lock); + return m_blacklisted; +} + +bool PoolReplayer::is_leader() const { + Mutex::Locker locker(m_lock); + return m_leader_watcher && m_leader_watcher->is_leader(); +} + +int PoolReplayer::init() +{ + dout(20) << "replaying for " << m_peer << dendl; + + int r = init_rados(g_ceph_context->_conf->cluster, + g_ceph_context->_conf->name.to_str(), + "local cluster", &m_local_rados); + if (r < 0) { + return r; + } + + r = init_rados(m_peer.cluster_name, m_peer.client_name, + std::string("remote peer ") + stringify(m_peer), + &m_remote_rados); + if (r < 0) { + return r; + } + + r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx); + if (r < 0) { + derr << "error accessing local pool " << m_local_pool_id << ": " + << cpp_strerror(r) << dendl; + return r; + } + + std::string local_mirror_uuid; + r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, + &local_mirror_uuid); + if (r < 0) { + derr << "failed to retrieve local mirror uuid from pool " + << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; + return r; + } + + r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(), + m_remote_io_ctx); + if (r < 0) { + derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name() + << ": " << cpp_strerror(r) << dendl; + return r; + } + + dout(20) << "connected to " << m_peer << dendl; + + m_instance_replayer.reset( + InstanceReplayer<>::create(m_threads, m_image_deleter, + m_image_sync_throttler, m_local_rados, + local_mirror_uuid, m_local_pool_id)); + m_instance_replayer->init(); + m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx); + + m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx, + m_threads->work_queue, + m_instance_replayer.get())); + r = m_instance_watcher->init(); + if (r < 0) { + derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; + return r; + } + + m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, + &m_leader_listener)); + r = m_leader_watcher->init(); + if (r < 0) { + derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; + return r; + } + + m_pool_replayer_thread.create("pool replayer"); + + return 0; +} + +int PoolReplayer::init_rados(const std::string &cluster_name, + const std::string &client_name, + const std::string &description, + RadosRef *rados_ref) { + rados_ref->reset(new librados::Rados()); + + // NOTE: manually bootstrap a CephContext here instead of via + // the librados API to avoid mixing global singletons between + // the librados shared library and the daemon + // TODO: eliminate intermingling of global singletons within Ceph APIs + CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); + if (client_name.empty() || !iparams.name.from_str(client_name)) { + derr << "error initializing cluster handle for " << description << dendl; + return -EINVAL; + } + + CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); + cct->_conf->cluster = cluster_name; + + // librados::Rados::conf_read_file + int r = cct->_conf->parse_config_files(nullptr, nullptr, 0); + if (r < 0) { + derr << "could not read ceph conf for " << description << ": " + << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + cct->_conf->parse_env(); + + // librados::Rados::conf_parse_env + std::vector args; + env_to_vec(args, nullptr); + r = cct->_conf->parse_argv(args); + if (r < 0) { + derr << "could not parse environment for " << description << ":" + << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + + if (!m_args.empty()) { + // librados::Rados::conf_parse_argv + args = m_args; + r = cct->_conf->parse_argv(args); + if (r < 0) { + derr << "could not parse command line args for " << description << ": " + << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + } + + // disable unnecessary librbd cache + cct->_conf->set_val_or_die("rbd_cache", "false"); + cct->_conf->apply_changes(nullptr); + cct->_conf->complain_about_parse_errors(cct); + + r = (*rados_ref)->init_with_context(cct); + assert(r == 0); + cct->put(); + + r = (*rados_ref)->connect(); + if (r < 0) { + derr << "error connecting to " << description << ": " + << cpp_strerror(r) << dendl; + return r; + } + + return 0; +} + +void PoolReplayer::run() +{ + dout(20) << "enter" << dendl; + + while (!m_stopping.read()) { + std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " + + m_peer.cluster_name; + if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) { + m_asok_hook_name = asok_hook_name; + delete m_asok_hook; + + m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context, + m_asok_hook_name, this); + } + + Mutex::Locker locker(m_lock); + if (m_pool_watcher && m_pool_watcher->is_blacklisted()) { + m_blacklisted = true; + m_stopping.set(1); + break; + } + + m_cond.WaitInterval(m_lock, utime_t(1, 0)); + } +} + +void PoolReplayer::print_status(Formatter *f, stringstream *ss) +{ + dout(20) << "enter" << dendl; + + if (!f) { + return; + } + + Mutex::Locker l(m_lock); + + f->open_object_section("pool_replayer_status"); + f->dump_string("pool", m_local_io_ctx.get_pool_name()); + f->dump_stream("peer") << m_peer; + f->dump_string("instance_id", m_instance_watcher->get_instance_id()); + + std::string leader_instance_id; + m_leader_watcher->get_leader_instance_id(&leader_instance_id); + f->dump_string("leader_instance_id", leader_instance_id); + + bool leader = m_leader_watcher->is_leader(); + f->dump_bool("leader", leader); + if (leader) { + std::vector instance_ids; + m_leader_watcher->list_instances(&instance_ids); + f->open_array_section("instances"); + for (auto instance_id : instance_ids) { + f->dump_string("instance_id", instance_id); + } + f->close_section(); + } + + m_instance_replayer->print_status(f, ss); + + f->close_section(); + f->flush(*ss); +} + +void PoolReplayer::start() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + m_instance_replayer->start(); +} + +void PoolReplayer::stop(bool manual) +{ + dout(20) << "enter: manual=" << manual << dendl; + + Mutex::Locker l(m_lock); + if (!manual) { + m_stopping.set(1); + m_cond.Signal(); + return; + } else if (m_stopping.read()) { + return; + } + + m_instance_replayer->stop(); +} + +void PoolReplayer::restart() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + m_instance_replayer->restart(); +} + +void PoolReplayer::flush() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read() || m_manual_stop) { + return; + } + + m_instance_replayer->flush(); +} + +void PoolReplayer::release_leader() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read() || !m_leader_watcher) { + return; + } + + m_leader_watcher->release_leader(); +} + +void PoolReplayer::handle_update(const std::string &mirror_uuid, + const ImageIds &added_image_ids, + const ImageIds &removed_image_ids) { + assert(!mirror_uuid.empty()); + if (m_stopping.read()) { + return; + } + + dout(10) << dendl; + Mutex::Locker locker(m_lock); + if (!m_leader_watcher->is_leader()) { + return; + } + + if (m_peer.uuid != mirror_uuid) { + m_instance_replayer->remove_peer(m_peer.uuid); + m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx); + m_peer.uuid = mirror_uuid; + } + + // first callback will be a full directory -- so see if we need to remove + // any local images that no longer exist on the remote side + if (!m_init_image_ids.empty()) { + dout(20) << "scanning initial local image set" << dendl; + for (auto &image_id : added_image_ids) { + auto it = m_init_image_ids.find(image_id); + if (it != m_init_image_ids.end()) { + m_init_image_ids.erase(it); + } + } + + // the remaining images in m_init_image_ids must be deleted + for (auto &image_id : m_init_image_ids) { + dout(20) << "scheduling the deletion of init image: " + << image_id.global_id << " (" << image_id.id << ")" << dendl; + m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id, + image_id.id, image_id.global_id); + } + m_init_image_ids.clear(); + } + + m_update_op_tracker.start_op(); + Context *ctx = new FunctionContext([this](int r) { + dout(20) << "complete handle_update: r=" << r << dendl; + m_update_op_tracker.finish_op(); + }); + + C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); + + for (auto &image_id : removed_image_ids) { + // for now always send to myself (the leader) + std::string &instance_id = m_instance_watcher->get_instance_id(); + m_instance_watcher->notify_image_release(instance_id, image_id.global_id, + mirror_uuid, image_id.id, true, + gather_ctx->new_sub()); + } + + for (auto &image_id : added_image_ids) { + // for now always send to myself (the leader) + std::string &instance_id = m_instance_watcher->get_instance_id(); + m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id, + mirror_uuid, image_id.id, + gather_ctx->new_sub()); + } + + gather_ctx->activate(); +} + +void PoolReplayer::handle_post_acquire_leader(Context *on_finish) { + dout(20) << dendl; + refresh_local_images(on_finish); +} + +void PoolReplayer::handle_pre_release_leader(Context *on_finish) { + dout(20) << dendl; + shut_down_pool_watcher(on_finish); +} + +void PoolReplayer::refresh_local_images(Context *on_finish) { + dout(20) << dendl; + + // ensure the initial set of local images is up-to-date + // after acquiring the leader role + auto ctx = new C_RefreshLocalImages(this, on_finish); + auto req = pool_watcher::RefreshImagesRequest<>::create( + m_local_io_ctx, &ctx->image_ids, ctx); + req->send(); +} + +void PoolReplayer::handle_refresh_local_images(int r, ImageIds &&image_ids, + Context *on_finish) { + dout(20) << "r=" << r << dendl; + + { + Mutex::Locker locker(m_lock); + m_init_image_ids = std::move(image_ids); + } + + if (r < 0) { + derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + init_pool_watcher(on_finish); +} + +void PoolReplayer::init_pool_watcher(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(!m_pool_watcher); + m_pool_watcher.reset(new PoolWatcher<>( + m_threads, m_remote_io_ctx, m_pool_watcher_listener)); + m_pool_watcher->init(create_async_context_callback( + m_threads->work_queue, on_finish)); + + m_cond.Signal(); +} + +void PoolReplayer::shut_down_pool_watcher(Context *on_finish) { + dout(20) << dendl; + + { + Mutex::Locker locker(m_lock); + if (m_pool_watcher) { + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_shut_down_pool_watcher(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + m_pool_watcher->shut_down(ctx); + return; + } + } + + on_finish->complete(0); +} + +void PoolReplayer::handle_shut_down_pool_watcher(int r, Context *on_finish) { + dout(20) << "r=" << r << dendl; + + { + Mutex::Locker locker(m_lock); + assert(m_pool_watcher); + m_pool_watcher.reset(); + } + wait_for_update_ops(on_finish); +} + +void PoolReplayer::wait_for_update_ops(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_wait_for_update_ops(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + m_update_op_tracker.wait_for_ops(ctx); +} + +void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Mutex::Locker locker(m_lock); + m_instance_replayer->release_all(on_finish); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h new file mode 100644 index 000000000000..7de10a5c6a5e --- /dev/null +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -0,0 +1,171 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H +#define CEPH_RBD_MIRROR_POOL_REPLAYER_H + +#include +#include +#include +#include + +#include "common/AsyncOpTracker.h" +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/WorkQueue.h" +#include "include/atomic.h" +#include "include/rados/librados.hpp" + +#include "ClusterWatcher.h" +#include "LeaderWatcher.h" +#include "PoolWatcher.h" +#include "ImageDeleter.h" +#include "types.h" + +class AdminSocketHook; + +namespace librbd { class ImageCtx; } + +namespace rbd { +namespace mirror { + +template struct Threads; +template class InstanceReplayer; +template class InstanceWatcher; + +/** + * Controls mirroring for a single remote cluster. + */ +class PoolReplayer { +public: + PoolReplayer(Threads *threads, + std::shared_ptr image_deleter, + ImageSyncThrottlerRef<> image_sync_throttler, + int64_t local_pool_id, const peer_t &peer, + const std::vector &args); + ~PoolReplayer(); + PoolReplayer(const PoolReplayer&) = delete; + PoolReplayer& operator=(const PoolReplayer&) = delete; + + bool is_blacklisted() const; + bool is_leader() const; + + int init(); + void run(); + + void print_status(Formatter *f, stringstream *ss); + void start(); + void stop(bool manual); + void restart(); + void flush(); + void release_leader(); + +private: + struct PoolWatcherListener : public PoolWatcher<>::Listener { + PoolReplayer *pool_replayer; + + PoolWatcherListener(PoolReplayer *pool_replayer) + : pool_replayer(pool_replayer) { + } + + void handle_update(const std::string &mirror_uuid, + const ImageIds &added_image_ids, + const ImageIds &removed_image_ids) override { + pool_replayer->handle_update(mirror_uuid, added_image_ids, + removed_image_ids); + } + }; + + struct C_RefreshLocalImages; + + void handle_update(const std::string &mirror_uuid, + const ImageIds &added_image_ids, + const ImageIds &removed_image_ids); + + int init_rados(const std::string &cluster_name, + const std::string &client_name, + const std::string &description, RadosRef *rados_ref); + + void handle_post_acquire_leader(Context *on_finish); + void handle_pre_release_leader(Context *on_finish); + + void refresh_local_images(Context *on_finish); + void handle_refresh_local_images(int r, ImageIds &&image_ids, + Context *on_finish); + + void init_pool_watcher(Context *on_finish); + void shut_down_pool_watcher(Context *on_finish); + void handle_shut_down_pool_watcher(int r, Context *on_finish); + + void wait_for_update_ops(Context *on_finish); + void handle_wait_for_update_ops(int r, Context *on_finish); + + Threads *m_threads; + std::shared_ptr m_image_deleter; + ImageSyncThrottlerRef<> m_image_sync_throttler; + mutable Mutex m_lock; + Cond m_cond; + atomic_t m_stopping; + bool m_manual_stop = false; + bool m_blacklisted = false; + + peer_t m_peer; + std::vector m_args; + RadosRef m_local_rados; + RadosRef m_remote_rados; + + librados::IoCtx m_local_io_ctx; + librados::IoCtx m_remote_io_ctx; + + int64_t m_local_pool_id = -1; + + PoolWatcherListener m_pool_watcher_listener; + std::unique_ptr > m_pool_watcher; + + std::unique_ptr> m_instance_replayer; + + std::string m_asok_hook_name; + AdminSocketHook *m_asok_hook; + + std::set m_init_image_ids; + + class PoolReplayerThread : public Thread { + PoolReplayer *m_pool_replayer; + public: + PoolReplayerThread(PoolReplayer *pool_replayer) + : m_pool_replayer(pool_replayer) { + } + void *entry() override { + m_pool_replayer->run(); + return 0; + } + } m_pool_replayer_thread; + + class LeaderListener : public LeaderWatcher<>::Listener { + public: + LeaderListener(PoolReplayer *pool_replayer) + : m_pool_replayer(pool_replayer) { + } + + protected: + void post_acquire_handler(Context *on_finish) override { + m_pool_replayer->handle_post_acquire_leader(on_finish); + } + + void pre_release_handler(Context *on_finish) override { + m_pool_replayer->handle_pre_release_leader(on_finish); + } + + private: + PoolReplayer *m_pool_replayer; + } m_leader_listener; + + std::unique_ptr > m_leader_watcher; + std::unique_ptr > m_instance_watcher; + AsyncOpTracker m_update_op_tracker; +}; + +} // namespace mirror +} // namespace rbd + +#endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc deleted file mode 100644 index 7f8f9e2a304d..000000000000 --- a/src/tools/rbd_mirror/Replayer.cc +++ /dev/null @@ -1,727 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include - -#include "common/Formatter.h" -#include "common/admin_socket.h" -#include "common/ceph_argparse.h" -#include "common/code_environment.h" -#include "common/common_init.h" -#include "common/debug.h" -#include "common/errno.h" -#include "include/stringify.h" -#include "cls/rbd/cls_rbd_client.h" -#include "global/global_context.h" -#include "librbd/internal.h" -#include "librbd/Utils.h" -#include "librbd/Watcher.h" -#include "librbd/api/Mirror.h" -#include "InstanceReplayer.h" -#include "InstanceWatcher.h" -#include "LeaderWatcher.h" -#include "Replayer.h" -#include "Threads.h" -#include "pool_watcher/RefreshImagesRequest.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd_mirror -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::Replayer: " \ - << this << " " << __func__ << ": " - -using std::chrono::seconds; -using std::map; -using std::string; -using std::unique_ptr; -using std::vector; - -using librbd::cls_client::dir_get_name; -using librbd::util::create_async_context_callback; - -namespace rbd { -namespace mirror { - -namespace { - -class ReplayerAdminSocketCommand { -public: - virtual ~ReplayerAdminSocketCommand() {} - virtual bool call(Formatter *f, stringstream *ss) = 0; -}; - -class StatusCommand : public ReplayerAdminSocketCommand { -public: - explicit StatusCommand(Replayer *replayer) : replayer(replayer) {} - - bool call(Formatter *f, stringstream *ss) override { - replayer->print_status(f, ss); - return true; - } - -private: - Replayer *replayer; -}; - -class StartCommand : public ReplayerAdminSocketCommand { -public: - explicit StartCommand(Replayer *replayer) : replayer(replayer) {} - - bool call(Formatter *f, stringstream *ss) override { - replayer->start(); - return true; - } - -private: - Replayer *replayer; -}; - -class StopCommand : public ReplayerAdminSocketCommand { -public: - explicit StopCommand(Replayer *replayer) : replayer(replayer) {} - - bool call(Formatter *f, stringstream *ss) override { - replayer->stop(true); - return true; - } - -private: - Replayer *replayer; -}; - -class RestartCommand : public ReplayerAdminSocketCommand { -public: - explicit RestartCommand(Replayer *replayer) : replayer(replayer) {} - - bool call(Formatter *f, stringstream *ss) override { - replayer->restart(); - return true; - } - -private: - Replayer *replayer; -}; - -class FlushCommand : public ReplayerAdminSocketCommand { -public: - explicit FlushCommand(Replayer *replayer) : replayer(replayer) {} - - bool call(Formatter *f, stringstream *ss) override { - replayer->flush(); - return true; - } - -private: - Replayer *replayer; -}; - -class LeaderReleaseCommand : public ReplayerAdminSocketCommand { -public: - explicit LeaderReleaseCommand(Replayer *replayer) : replayer(replayer) {} - - bool call(Formatter *f, stringstream *ss) override { - replayer->release_leader(); - return true; - } - -private: - Replayer *replayer; -}; - -class ReplayerAdminSocketHook : public AdminSocketHook { -public: - ReplayerAdminSocketHook(CephContext *cct, const std::string &name, - Replayer *replayer) : - admin_socket(cct->get_admin_socket()) { - std::string command; - int r; - - command = "rbd mirror status " + name; - r = admin_socket->register_command(command, command, this, - "get status for rbd mirror " + name); - if (r == 0) { - commands[command] = new StatusCommand(replayer); - } - - command = "rbd mirror start " + name; - r = admin_socket->register_command(command, command, this, - "start rbd mirror " + name); - if (r == 0) { - commands[command] = new StartCommand(replayer); - } - - command = "rbd mirror stop " + name; - r = admin_socket->register_command(command, command, this, - "stop rbd mirror " + name); - if (r == 0) { - commands[command] = new StopCommand(replayer); - } - - command = "rbd mirror restart " + name; - r = admin_socket->register_command(command, command, this, - "restart rbd mirror " + name); - if (r == 0) { - commands[command] = new RestartCommand(replayer); - } - - command = "rbd mirror flush " + name; - r = admin_socket->register_command(command, command, this, - "flush rbd mirror " + name); - if (r == 0) { - commands[command] = new FlushCommand(replayer); - } - - command = "rbd mirror leader release " + name; - r = admin_socket->register_command(command, command, this, - "release rbd mirror leader " + name); - if (r == 0) { - commands[command] = new LeaderReleaseCommand(replayer); - } - } - - ~ReplayerAdminSocketHook() override { - for (Commands::const_iterator i = commands.begin(); i != commands.end(); - ++i) { - (void)admin_socket->unregister_command(i->first); - delete i->second; - } - } - - bool call(std::string command, cmdmap_t& cmdmap, std::string format, - bufferlist& out) override { - Commands::const_iterator i = commands.find(command); - assert(i != commands.end()); - Formatter *f = Formatter::create(format); - stringstream ss; - bool r = i->second->call(f, &ss); - delete f; - out.append(ss); - return r; - } - -private: - typedef std::map Commands; - - AdminSocket *admin_socket; - Commands commands; -}; - -} // anonymous namespace - -struct Replayer::C_RefreshLocalImages : public Context { - Replayer *replayer; - Context *on_finish; - ImageIds image_ids; - - C_RefreshLocalImages(Replayer *replayer, Context *on_finish) - : replayer(replayer), on_finish(on_finish) { - } - - void finish(int r) override { - replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish); - } -}; - -Replayer::Replayer(Threads *threads, - std::shared_ptr image_deleter, - ImageSyncThrottlerRef<> image_sync_throttler, - int64_t local_pool_id, const peer_t &peer, - const std::vector &args) : - m_threads(threads), - m_image_deleter(image_deleter), - m_image_sync_throttler(image_sync_throttler), - m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)), - m_peer(peer), - m_args(args), - m_local_pool_id(local_pool_id), - m_pool_watcher_listener(this), - m_asok_hook(nullptr), - m_replayer_thread(this), - m_leader_listener(this) -{ -} - -Replayer::~Replayer() -{ - delete m_asok_hook; - - m_stopping.set(1); - { - Mutex::Locker l(m_lock); - m_cond.Signal(); - } - if (m_replayer_thread.is_started()) { - m_replayer_thread.join(); - } - if (m_leader_watcher) { - m_leader_watcher->shut_down(); - } - if (m_instance_watcher) { - m_instance_watcher->shut_down(); - } - if (m_instance_replayer) { - m_instance_replayer->shut_down(); - } - - assert(!m_pool_watcher); -} - -bool Replayer::is_blacklisted() const { - Mutex::Locker locker(m_lock); - return m_blacklisted; -} - -bool Replayer::is_leader() const { - Mutex::Locker locker(m_lock); - return m_leader_watcher && m_leader_watcher->is_leader(); -} - -int Replayer::init() -{ - dout(20) << "replaying for " << m_peer << dendl; - - int r = init_rados(g_ceph_context->_conf->cluster, - g_ceph_context->_conf->name.to_str(), - "local cluster", &m_local_rados); - if (r < 0) { - return r; - } - - r = init_rados(m_peer.cluster_name, m_peer.client_name, - std::string("remote peer ") + stringify(m_peer), - &m_remote_rados); - if (r < 0) { - return r; - } - - r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx); - if (r < 0) { - derr << "error accessing local pool " << m_local_pool_id << ": " - << cpp_strerror(r) << dendl; - return r; - } - - std::string local_mirror_uuid; - r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, - &local_mirror_uuid); - if (r < 0) { - derr << "failed to retrieve local mirror uuid from pool " - << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; - return r; - } - - r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(), - m_remote_io_ctx); - if (r < 0) { - derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name() - << ": " << cpp_strerror(r) << dendl; - return r; - } - - dout(20) << "connected to " << m_peer << dendl; - - m_instance_replayer.reset( - InstanceReplayer<>::create(m_threads, m_image_deleter, - m_image_sync_throttler, m_local_rados, - local_mirror_uuid, m_local_pool_id)); - m_instance_replayer->init(); - m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx); - - m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx, - m_threads->work_queue, - m_instance_replayer.get())); - r = m_instance_watcher->init(); - if (r < 0) { - derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; - return r; - } - - m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, - &m_leader_listener)); - r = m_leader_watcher->init(); - if (r < 0) { - derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; - return r; - } - - m_replayer_thread.create("replayer"); - - return 0; -} - -int Replayer::init_rados(const std::string &cluster_name, - const std::string &client_name, - const std::string &description, RadosRef *rados_ref) { - rados_ref->reset(new librados::Rados()); - - // NOTE: manually bootstrap a CephContext here instead of via - // the librados API to avoid mixing global singletons between - // the librados shared library and the daemon - // TODO: eliminate intermingling of global singletons within Ceph APIs - CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); - if (client_name.empty() || !iparams.name.from_str(client_name)) { - derr << "error initializing cluster handle for " << description << dendl; - return -EINVAL; - } - - CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, - CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); - cct->_conf->cluster = cluster_name; - - // librados::Rados::conf_read_file - int r = cct->_conf->parse_config_files(nullptr, nullptr, 0); - if (r < 0) { - derr << "could not read ceph conf for " << description << ": " - << cpp_strerror(r) << dendl; - cct->put(); - return r; - } - cct->_conf->parse_env(); - - // librados::Rados::conf_parse_env - std::vector args; - env_to_vec(args, nullptr); - r = cct->_conf->parse_argv(args); - if (r < 0) { - derr << "could not parse environment for " << description << ":" - << cpp_strerror(r) << dendl; - cct->put(); - return r; - } - - if (!m_args.empty()) { - // librados::Rados::conf_parse_argv - args = m_args; - r = cct->_conf->parse_argv(args); - if (r < 0) { - derr << "could not parse command line args for " << description << ": " - << cpp_strerror(r) << dendl; - cct->put(); - return r; - } - } - - // disable unnecessary librbd cache - cct->_conf->set_val_or_die("rbd_cache", "false"); - cct->_conf->apply_changes(nullptr); - cct->_conf->complain_about_parse_errors(cct); - - r = (*rados_ref)->init_with_context(cct); - assert(r == 0); - cct->put(); - - r = (*rados_ref)->connect(); - if (r < 0) { - derr << "error connecting to " << description << ": " - << cpp_strerror(r) << dendl; - return r; - } - - return 0; -} - -void Replayer::run() -{ - dout(20) << "enter" << dendl; - - while (!m_stopping.read()) { - std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " + - m_peer.cluster_name; - if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) { - m_asok_hook_name = asok_hook_name; - delete m_asok_hook; - - m_asok_hook = new ReplayerAdminSocketHook(g_ceph_context, - m_asok_hook_name, this); - } - - Mutex::Locker locker(m_lock); - if (m_pool_watcher && m_pool_watcher->is_blacklisted()) { - m_blacklisted = true; - m_stopping.set(1); - break; - } - - m_cond.WaitInterval(m_lock, utime_t(1, 0)); - } -} - -void Replayer::print_status(Formatter *f, stringstream *ss) -{ - dout(20) << "enter" << dendl; - - if (!f) { - return; - } - - Mutex::Locker l(m_lock); - - f->open_object_section("replayer_status"); - f->dump_string("pool", m_local_io_ctx.get_pool_name()); - f->dump_stream("peer") << m_peer; - f->dump_string("instance_id", m_instance_watcher->get_instance_id()); - - std::string leader_instance_id; - m_leader_watcher->get_leader_instance_id(&leader_instance_id); - f->dump_string("leader_instance_id", leader_instance_id); - - bool leader = m_leader_watcher->is_leader(); - f->dump_bool("leader", leader); - if (leader) { - std::vector instance_ids; - m_leader_watcher->list_instances(&instance_ids); - f->open_array_section("instances"); - for (auto instance_id : instance_ids) { - f->dump_string("instance_id", instance_id); - } - f->close_section(); - } - - m_instance_replayer->print_status(f, ss); - - f->close_section(); - f->flush(*ss); -} - -void Replayer::start() -{ - dout(20) << "enter" << dendl; - - Mutex::Locker l(m_lock); - - if (m_stopping.read()) { - return; - } - - m_instance_replayer->start(); -} - -void Replayer::stop(bool manual) -{ - dout(20) << "enter: manual=" << manual << dendl; - - Mutex::Locker l(m_lock); - if (!manual) { - m_stopping.set(1); - m_cond.Signal(); - return; - } else if (m_stopping.read()) { - return; - } - - m_instance_replayer->stop(); -} - -void Replayer::restart() -{ - dout(20) << "enter" << dendl; - - Mutex::Locker l(m_lock); - - if (m_stopping.read()) { - return; - } - - m_instance_replayer->restart(); -} - -void Replayer::flush() -{ - dout(20) << "enter" << dendl; - - Mutex::Locker l(m_lock); - - if (m_stopping.read() || m_manual_stop) { - return; - } - - m_instance_replayer->flush(); -} - -void Replayer::release_leader() -{ - dout(20) << "enter" << dendl; - - Mutex::Locker l(m_lock); - - if (m_stopping.read() || !m_leader_watcher) { - return; - } - - m_leader_watcher->release_leader(); -} - -void Replayer::handle_update(const std::string &mirror_uuid, - const ImageIds &added_image_ids, - const ImageIds &removed_image_ids) { - assert(!mirror_uuid.empty()); - if (m_stopping.read()) { - return; - } - - dout(10) << dendl; - Mutex::Locker locker(m_lock); - if (!m_leader_watcher->is_leader()) { - return; - } - - if (m_peer.uuid != mirror_uuid) { - m_instance_replayer->remove_peer(m_peer.uuid); - m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx); - m_peer.uuid = mirror_uuid; - } - - // first callback will be a full directory -- so see if we need to remove - // any local images that no longer exist on the remote side - if (!m_init_image_ids.empty()) { - dout(20) << "scanning initial local image set" << dendl; - for (auto &image_id : added_image_ids) { - auto it = m_init_image_ids.find(image_id); - if (it != m_init_image_ids.end()) { - m_init_image_ids.erase(it); - } - } - - // the remaining images in m_init_image_ids must be deleted - for (auto &image_id : m_init_image_ids) { - dout(20) << "scheduling the deletion of init image: " - << image_id.global_id << " (" << image_id.id << ")" << dendl; - m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id, - image_id.id, image_id.global_id); - } - m_init_image_ids.clear(); - } - - m_update_op_tracker.start_op(); - Context *ctx = new FunctionContext([this](int r) { - dout(20) << "complete handle_update: r=" << r << dendl; - m_update_op_tracker.finish_op(); - }); - - C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); - - for (auto &image_id : removed_image_ids) { - // for now always send to myself (the leader) - std::string &instance_id = m_instance_watcher->get_instance_id(); - m_instance_watcher->notify_image_release(instance_id, image_id.global_id, - mirror_uuid, image_id.id, true, - gather_ctx->new_sub()); - } - - for (auto &image_id : added_image_ids) { - // for now always send to myself (the leader) - std::string &instance_id = m_instance_watcher->get_instance_id(); - m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id, - mirror_uuid, image_id.id, - gather_ctx->new_sub()); - } - - gather_ctx->activate(); -} - -void Replayer::handle_post_acquire_leader(Context *on_finish) { - dout(20) << dendl; - refresh_local_images(on_finish); -} - -void Replayer::handle_pre_release_leader(Context *on_finish) { - dout(20) << dendl; - shut_down_pool_watcher(on_finish); -} - -void Replayer::refresh_local_images(Context *on_finish) { - dout(20) << dendl; - - // ensure the initial set of local images is up-to-date - // after acquiring the leader role - auto ctx = new C_RefreshLocalImages(this, on_finish); - auto req = pool_watcher::RefreshImagesRequest<>::create( - m_local_io_ctx, &ctx->image_ids, ctx); - req->send(); -} - -void Replayer::handle_refresh_local_images(int r, ImageIds &&image_ids, - Context *on_finish) { - dout(20) << "r=" << r << dendl; - - { - Mutex::Locker locker(m_lock); - m_init_image_ids = std::move(image_ids); - } - - if (r < 0) { - derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl; - on_finish->complete(r); - return; - } - - init_pool_watcher(on_finish); -} - -void Replayer::init_pool_watcher(Context *on_finish) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - assert(!m_pool_watcher); - m_pool_watcher.reset(new PoolWatcher<>( - m_threads, m_remote_io_ctx, m_pool_watcher_listener)); - m_pool_watcher->init(create_async_context_callback( - m_threads->work_queue, on_finish)); - - m_cond.Signal(); -} - -void Replayer::shut_down_pool_watcher(Context *on_finish) { - dout(20) << dendl; - - { - Mutex::Locker locker(m_lock); - if (m_pool_watcher) { - Context *ctx = new FunctionContext([this, on_finish](int r) { - handle_shut_down_pool_watcher(r, on_finish); - }); - ctx = create_async_context_callback(m_threads->work_queue, ctx); - - m_pool_watcher->shut_down(ctx); - return; - } - } - - on_finish->complete(0); -} - -void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) { - dout(20) << "r=" << r << dendl; - - { - Mutex::Locker locker(m_lock); - assert(m_pool_watcher); - m_pool_watcher.reset(); - } - wait_for_update_ops(on_finish); -} - -void Replayer::wait_for_update_ops(Context *on_finish) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - Context *ctx = new FunctionContext([this, on_finish](int r) { - handle_wait_for_update_ops(r, on_finish); - }); - ctx = create_async_context_callback(m_threads->work_queue, ctx); - - m_update_op_tracker.wait_for_ops(ctx); -} - -void Replayer::handle_wait_for_update_ops(int r, Context *on_finish) { - dout(20) << "r=" << r << dendl; - - assert(r == 0); - - Mutex::Locker locker(m_lock); - m_instance_replayer->release_all(on_finish); -} - -} // namespace mirror -} // namespace rbd diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h deleted file mode 100644 index b6649fb89a51..000000000000 --- a/src/tools/rbd_mirror/Replayer.h +++ /dev/null @@ -1,166 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_RBD_MIRROR_REPLAYER_H -#define CEPH_RBD_MIRROR_REPLAYER_H - -#include -#include -#include -#include - -#include "common/AsyncOpTracker.h" -#include "common/Cond.h" -#include "common/Mutex.h" -#include "common/WorkQueue.h" -#include "include/atomic.h" -#include "include/rados/librados.hpp" - -#include "ClusterWatcher.h" -#include "LeaderWatcher.h" -#include "PoolWatcher.h" -#include "ImageDeleter.h" -#include "types.h" - -class AdminSocketHook; - -namespace librbd { class ImageCtx; } - -namespace rbd { -namespace mirror { - -template struct Threads; -template class InstanceReplayer; -template class InstanceWatcher; - -/** - * Controls mirroring for a single remote cluster. - */ -class Replayer { -public: - Replayer(Threads *threads, - std::shared_ptr image_deleter, - ImageSyncThrottlerRef<> image_sync_throttler, - int64_t local_pool_id, const peer_t &peer, - const std::vector &args); - ~Replayer(); - Replayer(const Replayer&) = delete; - Replayer& operator=(const Replayer&) = delete; - - bool is_blacklisted() const; - bool is_leader() const; - - int init(); - void run(); - - void print_status(Formatter *f, stringstream *ss); - void start(); - void stop(bool manual); - void restart(); - void flush(); - void release_leader(); - -private: - struct PoolWatcherListener : public PoolWatcher<>::Listener { - Replayer *replayer; - - PoolWatcherListener(Replayer *replayer) : replayer(replayer) { - } - - void handle_update(const std::string &mirror_uuid, - const ImageIds &added_image_ids, - const ImageIds &removed_image_ids) override { - replayer->handle_update(mirror_uuid, added_image_ids, removed_image_ids); - } - }; - - struct C_RefreshLocalImages; - - void handle_update(const std::string &mirror_uuid, - const ImageIds &added_image_ids, - const ImageIds &removed_image_ids); - - int init_rados(const std::string &cluster_name, - const std::string &client_name, - const std::string &description, RadosRef *rados_ref); - - void handle_post_acquire_leader(Context *on_finish); - void handle_pre_release_leader(Context *on_finish); - - void refresh_local_images(Context *on_finish); - void handle_refresh_local_images(int r, ImageIds &&image_ids, - Context *on_finish); - - void init_pool_watcher(Context *on_finish); - void shut_down_pool_watcher(Context *on_finish); - void handle_shut_down_pool_watcher(int r, Context *on_finish); - - void wait_for_update_ops(Context *on_finish); - void handle_wait_for_update_ops(int r, Context *on_finish); - - Threads *m_threads; - std::shared_ptr m_image_deleter; - ImageSyncThrottlerRef<> m_image_sync_throttler; - mutable Mutex m_lock; - Cond m_cond; - atomic_t m_stopping; - bool m_manual_stop = false; - bool m_blacklisted = false; - - peer_t m_peer; - std::vector m_args; - RadosRef m_local_rados; - RadosRef m_remote_rados; - - librados::IoCtx m_local_io_ctx; - librados::IoCtx m_remote_io_ctx; - - int64_t m_local_pool_id = -1; - - PoolWatcherListener m_pool_watcher_listener; - std::unique_ptr > m_pool_watcher; - - std::unique_ptr> m_instance_replayer; - - std::string m_asok_hook_name; - AdminSocketHook *m_asok_hook; - - std::set m_init_image_ids; - - class ReplayerThread : public Thread { - Replayer *m_replayer; - public: - ReplayerThread(Replayer *replayer) : m_replayer(replayer) {} - void *entry() override { - m_replayer->run(); - return 0; - } - } m_replayer_thread; - - class LeaderListener : public LeaderWatcher<>::Listener { - public: - LeaderListener(Replayer *replayer) : m_replayer(replayer) { - } - - protected: - void post_acquire_handler(Context *on_finish) override { - m_replayer->handle_post_acquire_leader(on_finish); - } - - void pre_release_handler(Context *on_finish) override { - m_replayer->handle_pre_release_leader(on_finish); - } - - private: - Replayer *m_replayer; - } m_leader_listener; - - std::unique_ptr > m_leader_watcher; - std::unique_ptr > m_instance_watcher; - AsyncOpTracker m_update_op_tracker; -}; - -} // namespace mirror -} // namespace rbd - -#endif // CEPH_RBD_MIRROR_REPLAYER_H