--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_context.h"
+#include "common/common_init.h"
+#include "common/Cond.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "include/types.h"
+#include "json_spirit/json_spirit.h"
+#include "mon/MonClient.h"
+#include "msg/Messenger.h"
+#include "aio_utils.h"
+#include "Mirror.h"
+
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+class SafeTimerSingleton : public SafeTimer {
+public:
+ SafeTimer *timer;
+ ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
+
+ explicit SafeTimerSingleton(CephContext *cct)
+ : SafeTimer(cct, timer_lock, true) {
+ init();
+ }
+
+ ~SafeTimerSingleton() {
+ std::scoped_lock locker(timer_lock);
+ shutdown();
+ }
+};
+
// Process-wide worker thread pool (one thread) plus the context work queue
// Mirror uses to dispatch asynchronous completions.
class ThreadPoolSingleton : public ThreadPool {
public:
  // owned: created in the ctor, drained and deleted in the dtor
  ContextWQ *work_queue = nullptr;

  explicit ThreadPoolSingleton(CephContext *cct)
    : ThreadPool(cct, "Mirror::thread_pool", "tp_mirror", 1) {
    // 60-second queue timeout (semantics defined by ContextWQ)
    work_queue = new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this);

    start();
  }

  ~ThreadPoolSingleton() override {
    // NOTE: order matters -- flush every queued context, release the queue,
    // then stop the pool threads.
    work_queue->drain();
    delete work_queue;

    stop();
  }
};
+
+} // anonymous namespace
+
// Construct the mirror daemon. Looks up (or lazily creates) the
// per-CephContext thread pool and timer singletons and caches the
// work queue / timer pointers they publish. Performs no I/O; network
// bring-up happens in init().
Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
               MonClient *monc, Messenger *msgr)
  : m_cct(cct),
    m_args(args),
    m_monc(monc),
    m_msgr(msgr),
    m_listener(this) {
  auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
    "cephfs::mirror::thread_pool", false, cct));
  auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
    "cephfs::mirror::safe_timer", false, cct));
  m_work_queue = thread_pool->work_queue;
  // NOTE(review): relies on SafeTimerSingleton initializing its `timer`
  // member in its constructor -- verify.
  m_timer = safe_timer->timer;
}
+
// Destructor: logging only. The thread pool and timer singletons are
// registered with the CephContext, not owned here.
Mirror::~Mirror() {
  dout(10) << dendl;
}
+
+int Mirror::init_mon_client() {
+ dout(20) << dendl;
+
+ m_monc->set_messenger(m_msgr);
+ m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON);
+
+ int r = m_monc->init();
+ if (r < 0) {
+ derr << ": failed to init mon client: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ r = m_monc->authenticate(m_cct->_conf->client_mount_timeout);
+ if (r < 0) {
+ derr << ": failed to authenticate to monitor: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ client_t me = m_monc->get_global_id();
+ m_msgr->set_myname(entity_name_t::CLIENT(me.v));
+ return 0;
+}
+
+int Mirror::init(std::string &reason) {
+ dout(20) << dendl;
+
+ std::scoped_lock locker(m_lock);
+ int r = init_mon_client();
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+void Mirror::shutdown() {
+ dout(20) << dendl;
+
+ std::unique_lock locker(m_lock);
+ if (m_fs_mirrors.empty()) {
+ return;
+ }
+
+ m_stopping = true;
+ m_cond.notify_all();
+ m_cond.wait(locker, [this] {return m_stopped;});
+}
+
// Signal entry point (SIGTERM/SIGINT only): perform a synchronous
// shutdown, then terminate the process.
void Mirror::handle_signal(int signum) {
  dout(10) << ": signal=" << signum << dendl;
  ceph_assert(signum == SIGTERM || signum == SIGINT);
  shutdown();
  ::exit(0);
}
+
+void Mirror::handle_mirroring_enabled(const std::string &fs_name, int r) {
+ dout(20) << ": fs_name=" << fs_name << ", r=" << r << dendl;
+
+ std::scoped_lock locker(m_lock);
+ if (r < 0) {
+ derr << ": failed to initialize FSMirror for filesystem=" << fs_name
+ << ": " << cpp_strerror(r) << dendl;
+ if (!m_stopping) {
+ m_fs_mirrors.erase(fs_name);
+ }
+
+ return;
+ }
+
+ dout(10) << ": Initialized FSMirror for filesystem=" << fs_name << dendl;
+}
+
+void Mirror::mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id) {
+ dout(10) << ": fs_name=" << fs_name << ", pool_id=" << local_pool_id << dendl;
+
+ std::scoped_lock locker(m_lock);
+ if (m_stopping) {
+ return;
+ }
+
+ // TODO: handle consecutive overlapping enable/disable calls
+ ceph_assert(m_fs_mirrors.find(fs_name) == m_fs_mirrors.end());
+
+ dout(10) << ": starting FSMirror: fs_name=" << fs_name << dendl;
+ std::unique_ptr<FSMirror> fs_mirror(new FSMirror(m_cct, fs_name, local_pool_id,
+ m_args, m_work_queue));
+ Context *on_finish = new LambdaContext([this, fs_name](int r) {
+ handle_mirroring_enabled(fs_name, r);
+ });
+ fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+ m_fs_mirrors.emplace(fs_name, std::move(fs_mirror));
+}
+
+void Mirror::mirroring_disabled(const std::string &fs_name) {
+ dout(10) << ": fs_name=" << fs_name << dendl;
+
+ std::scoped_lock locker(m_lock);
+ if (!m_fs_mirrors.count(fs_name)) {
+ dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
+ << dendl;
+ return;
+ }
+
+ if (m_stopping) {
+ dout(5) << "shutting down" << dendl;
+ return;
+ }
+
+ auto &fs_mirror = m_fs_mirrors.at(fs_name);
+ if (!fs_mirror->is_stopping()) {
+ Context *on_finish = new LambdaContext([this, fs_name](int r) {
+ handle_shutdown(fs_name, r);
+ });
+ fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+ }
+}
+
+void Mirror::peer_added(const std::string &fs_name, const Peer &peer) {
+ dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+
+ std::scoped_lock locker(m_lock);
+ if (!m_fs_mirrors.count(fs_name)) {
+ dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
+ << dendl;
+ return;
+ }
+
+ if (m_stopping) {
+ dout(5) << "shutting down" << dendl;
+ return;
+ }
+
+ auto &fs_mirror = m_fs_mirrors.at(fs_name);
+ fs_mirror->add_peer(peer);
+}
+
+void Mirror::peer_removed(const std::string &fs_name, const Peer &peer) {
+ dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+
+ std::scoped_lock locker(m_lock);
+ if (!m_fs_mirrors.count(fs_name)) {
+ dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
+ << dendl;
+ return;
+ }
+
+ if (m_stopping) {
+ dout(5) << "shutting down" << dendl;
+ return;
+ }
+
+ auto &fs_mirror = m_fs_mirrors.at(fs_name);
+ fs_mirror->remove_peer(peer);
+}
+
// Completion callback for FSMirror::shutdown(): remove the mirror from
// the map and wake waiters (run() waits for the map to drain).
void Mirror::handle_shutdown(const std::string &fs_name, int r) {
  dout(10) << ": fs_name=" << fs_name << ", r=" << r << dendl;

  std::scoped_lock locker(m_lock);
  m_fs_mirrors.erase(fs_name);
  m_cond.notify_all();
}
+
+void Mirror::run() {
+ dout(20) << dendl;
+
+ std::unique_lock locker(m_lock);
+ m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_listener));
+ m_msgr->add_dispatcher_tail(m_cluster_watcher.get());
+
+ m_cluster_watcher->init();
+ m_cond.wait(locker, [this]{return m_stopping;});
+
+ for (auto &[fs_name, fs_mirror] : m_fs_mirrors) {
+ dout(10) << ": shutting down mirror for fs_name=" << fs_name << dendl;
+ if (fs_mirror->is_stopping()) {
+ dout(10) << ": fs_name=" << fs_name << " is under shutdown" << dendl;
+ continue;
+ }
+
+ Context *on_finish = new LambdaContext([this, fs_name](int r) {
+ handle_shutdown(fs_name, r);
+ });
+ fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+ }
+
+ m_cond.wait(locker, [this] {return m_fs_mirrors.empty();});
+
+ m_stopped = true;
+ m_cond.notify_all();
+}
+
+} // namespace mirror
+} // namespace cephfs
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_H
+#define CEPHFS_MIRROR_H
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "common/ceph_mutex.h"
+#include "mds/FSMap.h"
+#include "ClusterWatcher.h"
+#include "FSMirror.h"
+#include "Types.h"
+
+class CephContext;
+class Messenger;
+class MonClient;
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// this wraps up ClusterWatcher and FSMirrors to implement mirroring
+// for ceph filesystems.
+
+class Mirror {
+public:
+ Mirror(CephContext *cct, const std::vector<const char*> &args,
+ MonClient *monc, Messenger *msgr);
+ ~Mirror();
+
+ int init(std::string &reason);
+ void shutdown();
+ void run();
+
+ void handle_signal(int signum);
+
+private:
+ static constexpr std::string_view MIRRORING_MODULE = "mirroring";
+
+ struct ClusterListener : ClusterWatcher::Listener {
+ Mirror *mirror;
+
+ ClusterListener(Mirror *mirror)
+ : mirror(mirror) {
+ }
+
+ void handle_mirroring_enabled(const FilesystemSpec &spec) override {
+ mirror->mirroring_enabled(spec.fs_name, spec.pool_id);
+ }
+
+ void handle_mirroring_disabled(const std::string &fs_name) override {
+ mirror->mirroring_disabled(fs_name);
+ }
+
+ void handle_peers_added(const std::string &fs_name, const Peer &peer) override {
+ mirror->peer_added(fs_name, peer);
+ }
+
+ void handle_peers_removed(const std::string &fs_name, const Peer &peer) override {
+ mirror->peer_removed(fs_name, peer);
+ }
+ };
+
+ ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::Mirror");
+ ceph::condition_variable m_cond;
+
+ CephContext *m_cct;
+ std::vector<const char *> m_args;
+ MonClient *m_monc;
+ Messenger *m_msgr;
+ ClusterListener m_listener;
+
+ ContextWQ *m_work_queue = nullptr;
+ SafeTimer *m_timer = nullptr;
+
+ bool m_stopping = false;
+ bool m_stopped = false;
+ std::unique_ptr<ClusterWatcher> m_cluster_watcher;
+ std::map<std::string, std::unique_ptr<FSMirror>> m_fs_mirrors;
+
+ int init_mon_client();
+
+ void handle_mirroring_enabled(const std::string &fs_name, int r);
+ void mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id);
+ void mirroring_disabled(const std::string &fs_name);
+
+ void peer_added(const std::string &fs_name, const Peer &peer);
+ void peer_removed(const std::string &fs_name, const Peer &peer);
+
+ void handle_shutdown(const std::string &fs_name, int r);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/async/context_pool.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "mon/MonClient.h"
+#include "msg/Messenger.h"
+#include "Mirror.h"
+
+#include <vector>
+
// Print command-line help for cephfs-mirror, then the generic
// server options shared by all Ceph daemons.
void usage() {
  std::cout << "usage: cephfs-mirror [options...]" << std::endl;
  std::cout << "options:\n";
  std::cout << "  --remote/-r FILE remote cluster configuration\n";
  std::cout << "  --keyring=<path> path to keyring for remote cluster\n";
  std::cout << "  --log-file=<logfile> file to log debug output\n";
  std::cout << "  --debug-cephfs-mirror=<log-level>/<memory-level> set cephfs-sync debug level\n";
  generic_server_usage();
}
+
// Global daemon instance; assigned in main(). May still be nullptr when a
// signal arrives during startup, hence the guard below.
cephfs::mirror::Mirror *mirror = nullptr;

// Async-signal trampoline: forwards SIGINT/SIGTERM to the daemon object.
static void handle_signal(int signum) {
  if (mirror) {
    mirror->handle_signal(signum);
  }
}
+
+int main(int argc, const char **argv) {
+ std::vector<const char*> args;
+ argv_to_vec(argc, argv, args);
+ if (args.empty()) {
+ cerr << argv[0] << ": -h or --help for usage" << std::endl;
+ ::exit(1);
+ }
+
+ if (ceph_argparse_need_usage(args)) {
+ usage();
+ ::exit(0);
+ }
+
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+
+ if (g_conf()->daemonize) {
+ global_init_daemonize(g_ceph_context);
+ }
+
+ common_init_finish(g_ceph_context);
+
+ init_async_signal_handler();
+ register_async_signal_handler_oneshot(SIGINT, handle_signal);
+ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
+
+ std::vector<const char*> cmd_args;
+ argv_to_vec(argc, argv, cmd_args);
+
+ Messenger *msgr = Messenger::create_client_messenger(g_ceph_context, "client");
+ msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+
+ std::string reason;
+ ceph::async::io_context_pool ctxpool(1);
+ MonClient monc(MonClient(g_ceph_context, ctxpool));
+ int r = monc.build_initial_monmap();
+ if (r < 0) {
+ cerr << "failed to generate initial monmap" << std::endl;
+ goto cleanup_messenger;
+ }
+
+ msgr->start();
+
+ mirror = new cephfs::mirror::Mirror(g_ceph_context, cmd_args, &monc, msgr);
+ r = mirror->init(reason);
+ if (r < 0) {
+ std::cerr << "failed to initialize cephfs-mirror: " << reason << std::endl;
+ goto cleanup;
+ }
+
+ mirror->run();
+
+cleanup:
+ monc.shutdown();
+cleanup_messenger:
+ msgr->shutdown();
+ delete mirror;
+
+ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
+}