From: Venky Shankar Date: Thu, 18 Jun 2020 05:50:25 +0000 (-0400) Subject: cephfs-mirror: cephfs-mirror daemon X-Git-Tag: v16.1.0~1247^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fbc5df0c9278d5d6f7d60a284697b238a78b2349;p=ceph.git cephfs-mirror: cephfs-mirror daemon Signed-off-by: Venky Shankar --- diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 9f46b5314dae..3c2168b92bf9 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -128,6 +128,7 @@ endif(WITH_TESTS) if(WITH_CEPHFS) add_subdirectory(cephfs) + add_subdirectory(cephfs_mirror) endif(WITH_CEPHFS) if(WITH_RBD) diff --git a/src/tools/cephfs_mirror/CMakeLists.txt b/src/tools/cephfs_mirror/CMakeLists.txt new file mode 100644 index 000000000000..9edabaa90f3c --- /dev/null +++ b/src/tools/cephfs_mirror/CMakeLists.txt @@ -0,0 +1,27 @@ +set(cephfs_mirror_internal + ClusterWatcher.cc + Mirror.cc + FSMirror.cc + InstanceWatcher.cc + MirrorWatcher.cc + Types.cc + Watcher.cc + watcher/RewatchRequest.cc) + +add_executable(cephfs-mirror + main.cc) + +add_library(cephfs_mirror_internal STATIC + ${cephfs_mirror_internal}) + +target_link_libraries(cephfs-mirror + cephfs_mirror_internal + global + ceph-common + cls_cephfs_client + librados + mds + cephfs + ${ALLOC_LIBS}) + +install(TARGETS cephfs-mirror DESTINATION bin) diff --git a/src/tools/cephfs_mirror/Mirror.cc b/src/tools/cephfs_mirror/Mirror.cc new file mode 100644 index 000000000000..5a14fd8d5fbf --- /dev/null +++ b/src/tools/cephfs_mirror/Mirror.cc @@ -0,0 +1,280 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_argparse.h" +#include "common/ceph_context.h" +#include "common/common_init.h" +#include "common/Cond.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "include/types.h" +#include "json_spirit/json_spirit.h" +#include "mon/MonClient.h" +#include "msg/Messenger.h" +#include "aio_utils.h" +#include "Mirror.h" + + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +class SafeTimerSingleton : public SafeTimer { +public: + SafeTimer *timer; + ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock"); + + explicit SafeTimerSingleton(CephContext *cct) + : SafeTimer(cct, timer_lock, true) { + init(); + } + + ~SafeTimerSingleton() { + std::scoped_lock locker(timer_lock); + shutdown(); + } +}; + +class ThreadPoolSingleton : public ThreadPool { +public: + ContextWQ *work_queue = nullptr; + + explicit ThreadPoolSingleton(CephContext *cct) + : ThreadPool(cct, "Mirror::thread_pool", "tp_mirror", 1) { + work_queue = new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this); + + start(); + } + + ~ThreadPoolSingleton() override { + work_queue->drain(); + delete work_queue; + + stop(); + } +}; + +} // anonymous namespace + +Mirror::Mirror(CephContext *cct, const std::vector &args, + MonClient *monc, Messenger *msgr) + : m_cct(cct), + m_args(args), + m_monc(monc), + m_msgr(msgr), + m_listener(this) { + auto thread_pool = &(cct->lookup_or_create_singleton_object( + "cephfs::mirror::thread_pool", false, cct)); + auto safe_timer = &(cct->lookup_or_create_singleton_object( + "cephfs::mirror::safe_timer", false, cct)); + m_work_queue = thread_pool->work_queue; + m_timer = safe_timer->timer; +} + +Mirror::~Mirror() { + dout(10) << dendl; +} + +int Mirror::init_mon_client() { + dout(20) << dendl; + + m_monc->set_messenger(m_msgr); + m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON); + + int r = m_monc->init(); + if (r < 0) { + derr << ": failed to init mon client: " << cpp_strerror(r) << dendl; + return r; + } + + r = m_monc->authenticate(m_cct->_conf->client_mount_timeout); + if (r < 0) { + derr << ": failed to authenticate to monitor: " << cpp_strerror(r) << dendl; + return r; + } + + client_t me = m_monc->get_global_id(); + m_msgr->set_myname(entity_name_t::CLIENT(me.v)); + return 0; +} + +int Mirror::init(std::string &reason) { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + int r = init_mon_client(); + if (r < 0) { + return r; + } + + return 0; +} + +void Mirror::shutdown() { + dout(20) << dendl; + + std::unique_lock locker(m_lock); + if (m_fs_mirrors.empty()) { + return; + } + + m_stopping = true; + m_cond.notify_all(); + m_cond.wait(locker, [this] {return m_stopped;}); +} + +void Mirror::handle_signal(int signum) { + dout(10) << ": signal=" << signum << dendl; + ceph_assert(signum == SIGTERM || signum == SIGINT); + shutdown(); + ::exit(0); +} + +void Mirror::handle_mirroring_enabled(const std::string &fs_name, int r) { + dout(20) << ": fs_name=" << fs_name << ", r=" << r << dendl; + + std::scoped_lock locker(m_lock); + if (r < 0) { + derr << ": failed to initialize FSMirror for filesystem=" << fs_name + << ": " << cpp_strerror(r) << dendl; + if (!m_stopping) { + m_fs_mirrors.erase(fs_name); + } + + return; + } + + dout(10) << ": Initialized FSMirror for filesystem=" << fs_name << dendl; +} + +void Mirror::mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id) { + dout(10) << ": fs_name=" << fs_name << ", pool_id=" << local_pool_id << dendl; + + std::scoped_lock locker(m_lock); + if (m_stopping) { + return; + } + + // TODO: handle consecutive overlapping enable/disable calls + ceph_assert(m_fs_mirrors.find(fs_name) == m_fs_mirrors.end()); + + dout(10) << ": starting FSMirror: fs_name=" << fs_name << dendl; + std::unique_ptr fs_mirror(new FSMirror(m_cct, fs_name, local_pool_id, + m_args, m_work_queue)); + Context *on_finish = new LambdaContext([this, fs_name](int r) { + handle_mirroring_enabled(fs_name, r); + }); + fs_mirror->init(new C_AsyncCallback(m_work_queue, on_finish)); + m_fs_mirrors.emplace(fs_name, std::move(fs_mirror)); +} + +void Mirror::mirroring_disabled(const std::string &fs_name) { + dout(10) << ": fs_name=" << fs_name << dendl; + + std::scoped_lock locker(m_lock); + if (!m_fs_mirrors.count(fs_name)) { + dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name + << dendl; + return; + } + + if (m_stopping) { + dout(5) << "shutting down" << dendl; + return; + } + + auto &fs_mirror = m_fs_mirrors.at(fs_name); + if (!fs_mirror->is_stopping()) { + Context *on_finish = new LambdaContext([this, fs_name](int r) { + handle_shutdown(fs_name, r); + }); + fs_mirror->shutdown(new C_AsyncCallback(m_work_queue, on_finish)); + } +} + +void Mirror::peer_added(const std::string &fs_name, const Peer &peer) { + dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl; + + std::scoped_lock locker(m_lock); + if (!m_fs_mirrors.count(fs_name)) { + dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name + << dendl; + return; + } + + if (m_stopping) { + dout(5) << "shutting down" << dendl; + return; + } + + auto &fs_mirror = m_fs_mirrors.at(fs_name); + fs_mirror->add_peer(peer); +} + +void Mirror::peer_removed(const std::string &fs_name, const Peer &peer) { + dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl; + + std::scoped_lock locker(m_lock); + if (!m_fs_mirrors.count(fs_name)) { + dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name + << dendl; + return; + } + + if (m_stopping) { + dout(5) << "shutting down" << dendl; + return; + } + + auto &fs_mirror = m_fs_mirrors.at(fs_name); + fs_mirror->remove_peer(peer); +} + +void Mirror::handle_shutdown(const std::string &fs_name, int r) { + dout(10) << ": fs_name=" << fs_name << ", r=" << r << dendl; + + std::scoped_lock locker(m_lock); + m_fs_mirrors.erase(fs_name); + m_cond.notify_all(); +} + +void Mirror::run() { + dout(20) << dendl; + + std::unique_lock locker(m_lock); + m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_listener)); + m_msgr->add_dispatcher_tail(m_cluster_watcher.get()); + + m_cluster_watcher->init(); + m_cond.wait(locker, [this]{return m_stopping;}); + + for (auto &[fs_name, fs_mirror] : m_fs_mirrors) { + dout(10) << ": shutting down mirror for fs_name=" << fs_name << dendl; + if (fs_mirror->is_stopping()) { + dout(10) << ": fs_name=" << fs_name << " is under shutdown" << dendl; + continue; + } + + Context *on_finish = new LambdaContext([this, fs_name](int r) { + handle_shutdown(fs_name, r); + }); + fs_mirror->shutdown(new C_AsyncCallback(m_work_queue, on_finish)); + } + + m_cond.wait(locker, [this] {return m_fs_mirrors.empty();}); + + m_stopped = true; + m_cond.notify_all(); +} + +} // namespace mirror +} // namespace cephfs + diff --git a/src/tools/cephfs_mirror/Mirror.h b/src/tools/cephfs_mirror/Mirror.h new file mode 100644 index 000000000000..6bc2f60a52cd --- /dev/null +++ b/src/tools/cephfs_mirror/Mirror.h @@ -0,0 +1,99 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_H +#define CEPHFS_MIRROR_H + +#include +#include +#include + +#include "common/ceph_mutex.h" +#include "mds/FSMap.h" +#include "ClusterWatcher.h" +#include "FSMirror.h" +#include "Types.h" + +class CephContext; +class Messenger; +class MonClient; +class ContextWQ; + +namespace cephfs { +namespace mirror { + +// this wraps up ClusterWatcher and FSMirrors to implement mirroring +// for ceph filesystems. + +class Mirror { +public: + Mirror(CephContext *cct, const std::vector &args, + MonClient *monc, Messenger *msgr); + ~Mirror(); + + int init(std::string &reason); + void shutdown(); + void run(); + + void handle_signal(int signum); + +private: + static constexpr std::string_view MIRRORING_MODULE = "mirroring"; + + struct ClusterListener : ClusterWatcher::Listener { + Mirror *mirror; + + ClusterListener(Mirror *mirror) + : mirror(mirror) { + } + + void handle_mirroring_enabled(const FilesystemSpec &spec) override { + mirror->mirroring_enabled(spec.fs_name, spec.pool_id); + } + + void handle_mirroring_disabled(const std::string &fs_name) override { + mirror->mirroring_disabled(fs_name); + } + + void handle_peers_added(const std::string &fs_name, const Peer &peer) override { + mirror->peer_added(fs_name, peer); + } + + void handle_peers_removed(const std::string &fs_name, const Peer &peer) override { + mirror->peer_removed(fs_name, peer); + } + }; + + ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::Mirror"); + ceph::condition_variable m_cond; + + CephContext *m_cct; + std::vector m_args; + MonClient *m_monc; + Messenger *m_msgr; + ClusterListener m_listener; + + ContextWQ *m_work_queue = nullptr; + SafeTimer *m_timer = nullptr; + + bool m_stopping = false; + bool m_stopped = false; + std::unique_ptr m_cluster_watcher; + std::map> m_fs_mirrors; + + int init_mon_client(); + + void handle_mirroring_enabled(const std::string &fs_name, int r); + void mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id); + void mirroring_disabled(const std::string &fs_name); + + void peer_added(const std::string &fs_name, const Peer &peer); + void peer_removed(const std::string &fs_name, const Peer &peer); + + void handle_shutdown(const std::string &fs_name, int r); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_H diff --git a/src/tools/cephfs_mirror/main.cc b/src/tools/cephfs_mirror/main.cc new file mode 100644 index 000000000000..ccc395e108e3 --- /dev/null +++ b/src/tools/cephfs_mirror/main.cc @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_argparse.h" +#include "common/config.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/async/context_pool.h" +#include "global/global_init.h" +#include "global/signal_handler.h" +#include "mon/MonClient.h" +#include "msg/Messenger.h" +#include "Mirror.h" + +#include + +void usage() { + std::cout << "usage: cephfs-mirror [options...]" << std::endl; + std::cout << "options:\n"; + std::cout << " --remote/-r FILE remote cluster configuration\n"; + std::cout << " --keyring= path to keyring for remote cluster\n"; + std::cout << " --log-file= file to log debug output\n"; + std::cout << " --debug-cephfs-mirror=/ set cephfs-sync debug level\n"; + generic_server_usage(); +} + +cephfs::mirror::Mirror *mirror = nullptr; + +static void handle_signal(int signum) { + if (mirror) { + mirror->handle_signal(signum); + } +} + +int main(int argc, const char **argv) { + std::vector args; + argv_to_vec(argc, argv, args); + if (args.empty()) { + cerr << argv[0] << ": -h or --help for usage" << std::endl; + ::exit(1); + } + + if (ceph_argparse_need_usage(args)) { + usage(); + ::exit(0); + } + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); + + if (g_conf()->daemonize) { + global_init_daemonize(g_ceph_context); + } + + common_init_finish(g_ceph_context); + + init_async_signal_handler(); + register_async_signal_handler_oneshot(SIGINT, handle_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_signal); + + std::vector cmd_args; + argv_to_vec(argc, argv, cmd_args); + + Messenger *msgr = Messenger::create_client_messenger(g_ceph_context, "client"); + msgr->set_default_policy(Messenger::Policy::lossy_client(0)); + + std::string reason; + ceph::async::io_context_pool ctxpool(1); + MonClient monc(MonClient(g_ceph_context, ctxpool)); + int r = monc.build_initial_monmap(); + if (r < 0) { + cerr << "failed to generate initial monmap" << std::endl; + goto cleanup_messenger; + } + + msgr->start(); + + mirror = new cephfs::mirror::Mirror(g_ceph_context, cmd_args, &monc, msgr); + r = mirror->init(reason); + if (r < 0) { + std::cerr << "failed to initialize cephfs-mirror: " << reason << std::endl; + goto cleanup; + } + + mirror->run(); + +cleanup: + monc.shutdown(); +cleanup_messenger: + msgr->shutdown(); + delete mirror; + + return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; +}