From 733a9bcf8a28afbb02716e19b6ae52a5d52ca4de Mon Sep 17 00:00:00 2001
From: Venky Shankar
Date: Wed, 30 Sep 2020 05:41:23 -0400
Subject: [PATCH] cephfs-mirror: switch to using PeerReplayer class

Signed-off-by: Venky Shankar
---
 src/tools/cephfs_mirror/FSMirror.cc | 180 ++++++++++------------------
 src/tools/cephfs_mirror/FSMirror.h  |  43 ++-----
 2 files changed, 71 insertions(+), 152 deletions(-)

diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc
index 9bd054fe6f82a..15aa73f831c3f 100644
--- a/src/tools/cephfs_mirror/FSMirror.cc
+++ b/src/tools/cephfs_mirror/FSMirror.cc
@@ -11,7 +11,9 @@
 #include "include/stringify.h"
 #include "msg/Messenger.h"
 #include "FSMirror.h"
+#include "PeerReplayer.h"
 #include "aio_utils.h"
+#include "Utils.h"
 
 #include "common/Cond.h"
 
@@ -86,7 +88,8 @@ private:
 
 FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
                    std::vector<std::string> args, ContextWQ *work_queue)
-  : m_filesystem(filesystem),
+  : m_cct(cct),
+    m_filesystem(filesystem),
     m_pool_id(pool_id),
     m_args(args),
     m_work_queue(work_queue),
@@ -108,104 +111,20 @@ FSMirror::~FSMirror() {
   delete m_asok_hook;
 }
 
-int FSMirror::connect(std::string_view client_name, std::string_view cluster_name,
-                      RadosRef *cluster) {
-  dout(20) << ": connecting to cluster=" << cluster_name << ", client=" << client_name
-           << dendl;
-
-  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
-  if (client_name.empty() || !iparams.name.from_str(client_name)) {
-    derr << ": error initializing cluster handle for " << cluster_name << dendl;
-    return -EINVAL;
-  }
-
-  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
-                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
-  cct->_conf->cluster = cluster_name;
-
-  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
-  if (r < 0) {
-    derr << ": could not read ceph conf: " << ": " << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  cct->_conf.parse_env(cct->get_module_type());
-
-  std::vector<const char *> args;
-  r = cct->_conf.parse_argv(args);
-  if (r < 0) {
-    derr << ": could not parse environment: " << cpp_strerror(r) << dendl;
-    cct->put();
-    return r;
-  }
-  cct->_conf.parse_env(cct->get_module_type());
-
-  cluster->reset(new librados::Rados());
-
-  r = (*cluster)->init_with_context(cct);
-  ceph_assert(r == 0);
-  cct->put();
-
-  r = (*cluster)->connect();
-  if (r < 0) {
-    derr << ": error connecting to " << cluster_name << ": " << cpp_strerror(r)
-         << dendl;
-    return r;
-  }
-
-  dout(10) << ": connected to cluster=" << cluster_name << " using client="
-           << client_name << dendl;
-
-  return 0;
-}
-
-void FSMirror::run(PeerReplayer *peer_replayer) {
-  dout(20) << dendl;
-
-  std::unique_lock locker(m_lock);
-  while (true) {
-    dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
-    m_cond.wait(locker, [this, peer_replayer]{return m_directories.size() || peer_replayer->is_stopping();});
-    if (peer_replayer->is_stopping()) {
-      dout(5) << ": exiting" << dendl;
-      break;
-    }
-
-    locker.unlock();
-    ::sleep(1);
-    locker.lock();
-  }
-}
-
-void FSMirror::init_replayers(PeerReplayer *peer_replayer) {
+int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
   ceph_assert(ceph_mutex_is_locked(m_lock));
-
-  auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
-    "cephfs_mirror_max_concurrent_directory_syncs");
-  dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
-
-  while (nr_replayers-- > 0) {
-    std::unique_ptr<SnapshotReplayerThread> replayer(
-      new SnapshotReplayerThread(this, peer_replayer));
-    std::string name("replayer-" + stringify(nr_replayers));
-    replayer->create(name.c_str());
-    peer_replayer->replayers.push_back(std::move(replayer));
-  }
+  return peer_replayer->init();
 }
 
-void FSMirror::shutdown_replayers(PeerReplayer *peer_replayer,
-                                  std::unique_lock<ceph::mutex> &locker) {
-  peer_replayer->stopping = true;
-  m_cond.notify_all();
-
-  locker.unlock();
-  // safe to iterate unlocked
-  for (auto &replayer : peer_replayer->replayers) {
-    replayer->join();
-  }
-  locker.lock();
+void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
+  peer_replayer->shutdown();
+}
 
-  peer_replayer->replayers.clear();
+void FSMirror::cleanup() {
+  dout(20) << dendl;
+  ceph_unmount(m_mount);
+  m_ioctx.close();
+  m_cluster.reset();
 }
 
 void FSMirror::init(Context *on_finish) {
@@ -219,17 +138,26 @@ void FSMirror::init(Context *on_finish) {
     return;
   }
 
-  m_addrs = m_cluster->get_addrs();
-  dout(10) << ": rados addrs=" << m_addrs << dendl;
-
   r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
   if (r < 0) {
+    m_cluster.reset();
     derr << ": error accessing local pool (id=" << m_pool_id << "): "
          << cpp_strerror(r) << dendl;
     on_finish->complete(r);
     return;
   }
 
+  r = mount(m_cluster, m_filesystem, true, &m_mount);
+  if (r < 0) {
+    m_ioctx.close();
+    m_cluster.reset();
+    on_finish->complete(r);
+    return;
+  }
+
+  m_addrs = m_cluster->get_addrs();
+  dout(10) << ": rados addrs=" << m_addrs << dendl;
+
   init_instance_watcher(on_finish);
 }
 
@@ -237,12 +165,12 @@ void FSMirror::shutdown(Context *on_finish) {
   dout(20) << dendl;
 
   {
-    std::unique_lock locker(m_lock);
+    std::scoped_lock locker(m_lock);
     m_stopping = true;
-    m_cond.notify_all();
     if (m_on_init_finish != nullptr) {
      dout(10) << ": delaying shutdown -- init in progress" << dendl;
       m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
+          cleanup();
           if (r < 0) {
             on_finish->complete(0);
             return;
           }
@@ -254,14 +182,13 @@ void FSMirror::shutdown(Context *on_finish) {
     }
 
     m_on_shutdown_finish = on_finish;
+  }
 
-    for (auto &[peer, peer_replayer] : m_peer_replayers) {
+  for (auto &[peer, peer_replayer] : m_peer_replayers) {
     dout(5) << ": shutting down replayer for peer=" << peer << dendl;
-    shutdown_replayers(&peer_replayer, locker);
-  }
-  m_peer_replayers.clear();
+    shutdown_replayer(peer_replayer.get());
   }
-
+  m_peer_replayers.clear();
   shutdown_mirror_watcher();
 }
 
@@ -382,7 +309,10 @@ void FSMirror::handle_acquire_directory(string_view dir_path) {
 
   std::scoped_lock locker(m_lock);
   m_directories.emplace(dir_path);
-  m_cond.notify_all();
+  for (auto &[peer, peer_replayer] : m_peer_replayers) {
+    dout(10) << ": peer=" << peer << dendl;
+    peer_replayer->add_directory(dir_path);
+  }
 }
 
 void FSMirror::handle_release_directory(string_view dir_path) {
@@ -392,6 +322,10 @@ void FSMirror::handle_release_directory(string_view dir_path) {
   auto it = m_directories.find(dir_path);
   if (it != m_directories.end()) {
     m_directories.erase(it);
+    for (auto &[peer, peer_replayer] : m_peer_replayers) {
+      dout(10) << ": peer=" << peer << dendl;
+      peer_replayer->remove_directory(dir_path);
+    }
   }
 }
 
@@ -400,23 +334,37 @@ void FSMirror::add_peer(const Peer &peer) {
 
   std::scoped_lock locker(m_lock);
   m_all_peers.emplace(peer);
-  auto p = m_peer_replayers.emplace(peer, PeerReplayer());
-  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
-  if (p.second) {
-    init_replayers(&p.first->second);
+  if (m_peer_replayers.find(peer) != m_peer_replayers.end()) {
+    return;
+  }
+
+  auto replayer = std::make_unique<PeerReplayer>(
+    m_cct, this, m_filesystem, peer, m_directories, m_mount);
+  int r = init_replayer(replayer.get());
+  if (r < 0) {
+    return;
   }
+  m_peer_replayers.emplace(peer, std::move(replayer));
+  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
 }
 
 void FSMirror::remove_peer(const Peer &peer) {
   dout(10) << ": peer=" << peer << dendl;
 
-  std::unique_lock locker(m_lock);
-  m_all_peers.erase(peer);
-  auto it = m_peer_replayers.find(peer);
-  if (it != m_peer_replayers.end()) {
+  std::unique_ptr<PeerReplayer> replayer;
+  {
+    std::scoped_lock locker(m_lock);
+    m_all_peers.erase(peer);
+    auto it = m_peer_replayers.find(peer);
+    if (it != m_peer_replayers.end()) {
+      replayer = std::move(it->second);
+      m_peer_replayers.erase(it);
+    }
+  }
+
+  if (replayer) {
     dout(5) << ": shutting down replayers for peer=" << peer << dendl;
-    shutdown_replayers(&it->second, locker);
-    m_peer_replayers.erase(it);
+    shutdown_replayer(replayer.get());
   }
 }
 
diff --git a/src/tools/cephfs_mirror/FSMirror.h b/src/tools/cephfs_mirror/FSMirror.h
index 7605529ca0f77..f7939c143b790 100644
--- a/src/tools/cephfs_mirror/FSMirror.h
+++ b/src/tools/cephfs_mirror/FSMirror.h
@@ -17,6 +17,7 @@ namespace cephfs {
 namespace mirror {
 
 class MirrorAdminSocketHook;
+class PeerReplayer;
 
 // handle mirroring for a filesystem to a set of peers
@@ -89,45 +90,17 @@ private:
     }
   };
 
-  struct PeerReplayer;
-  class SnapshotReplayerThread : public Thread {
-  public:
-    SnapshotReplayerThread(FSMirror *fs_mirror, PeerReplayer *peer_replayer)
-      : m_fs_mirror(fs_mirror),
-        m_peer_replayer(peer_replayer) {
-    }
-
-    void *entry() override {
-      m_fs_mirror->run(m_peer_replayer);
-      return 0;
-    }
-
-  private:
-    FSMirror *m_fs_mirror;
-    PeerReplayer *m_peer_replayer;
-  };
-
-  typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
-  struct PeerReplayer {
-    SnapshotReplayers replayers;
-    bool stopping = false;
-
-    bool is_stopping() {
-      return stopping;
-    }
-  };
-
+  CephContext *m_cct;
   Filesystem m_filesystem;
   uint64_t m_pool_id;
   std::vector<std::string> m_args;
   ContextWQ *m_work_queue;
 
   ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
-  ceph::condition_variable m_cond;
   SnapListener m_snap_listener;
   std::set<std::string, std::less<>> m_directories;
   Peers m_all_peers;
-  std::map<Peer, PeerReplayer> m_peer_replayers;
+  std::map<Peer, std::unique_ptr<PeerReplayer>> m_peer_replayers;
 
   RadosRef m_cluster;
   std::string m_addrs;
@@ -143,13 +116,11 @@ private:
 
   MirrorAdminSocketHook *m_asok_hook = nullptr;
 
-  void run(PeerReplayer *peer_replayer);
-  void init_replayers(PeerReplayer *peer_replayer);
-  void shutdown_replayers(PeerReplayer *peer_replayer,
-                          std::unique_lock<ceph::mutex> &locker);
+  MountRef m_mount;
 
-  int connect(std::string_view cluster_name, std::string_view client_name,
-              RadosRef *cluster);
+  int init_replayer(PeerReplayer *peer_replayer);
+  void shutdown_replayer(PeerReplayer *peer_replayer);
+  void cleanup();
 
   void init_instance_watcher(Context *on_finish);
   void handle_init_instance_watcher(int r);
-- 
2.39.5
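
The hunks above call into PeerReplayer, but PeerReplayer.h itself is not part of this diff. For orientation, below is a minimal standalone sketch of only the surface FSMirror depends on: the constructor arguments passed from add_peer() and the init()/shutdown()/add_directory()/remove_directory() calls. Every type, member name, and body here is a stand-in inferred from those call sites, not the real declarations in src/tools/cephfs_mirror/PeerReplayer.h.

// Standalone sketch only (C++17) -- NOT the real PeerReplayer declaration.
#include <set>
#include <string>
#include <string_view>

struct CephContext;                  // opaque stand-in, only passed through
class FSMirror;                      // forward declaration, as in FSMirror.h
struct Filesystem { std::string fs_name; };   // stand-in for cephfs::mirror::Filesystem
struct Peer { std::string uuid; };            // stand-in for cephfs::mirror::Peer
using MountRef = void *;             // stand-in for the libcephfs mount handle

class PeerReplayer {
public:
  // Matches the arguments FSMirror::add_peer() passes to std::make_unique<PeerReplayer>().
  PeerReplayer(CephContext *cct, FSMirror *fs_mirror, const Filesystem &filesystem,
               const Peer &peer, const std::set<std::string, std::less<>> &directories,
               MountRef mount)
    : m_cct(cct), m_fs_mirror(fs_mirror), m_filesystem(filesystem), m_peer(peer),
      m_directories(directories), m_mount(mount) {
  }

  int init() { return 0; }           // the real class spawns its replayer threads here
  void shutdown() {}                 // the real class stops and joins them here

  // Called from handle_acquire_directory()/handle_release_directory().
  void add_directory(std::string_view dir_path) {
    m_directories.emplace(std::string(dir_path));
  }
  void remove_directory(std::string_view dir_path) {
    m_directories.erase(std::string(dir_path));
  }

private:
  CephContext *m_cct;
  FSMirror *m_fs_mirror;
  Filesystem m_filesystem;
  Peer m_peer;
  std::set<std::string, std::less<>> m_directories;
  MountRef m_mount;
};

With that shape in mind, the patch reads as a straight ownership transfer: FSMirror keeps one std::unique_ptr<PeerReplayer> per peer and forwards lifecycle and directory events to it, instead of driving its own SnapshotReplayerThread pool and condition variable.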