]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: cephfs-mirror daemon
authorVenky Shankar <vshankar@redhat.com>
Thu, 18 Jun 2020 05:50:25 +0000 (01:50 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 1 Sep 2020 10:58:10 +0000 (06:58 -0400)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/CMakeLists.txt
src/tools/cephfs_mirror/CMakeLists.txt [new file with mode: 0644]
src/tools/cephfs_mirror/Mirror.cc [new file with mode: 0644]
src/tools/cephfs_mirror/Mirror.h [new file with mode: 0644]
src/tools/cephfs_mirror/main.cc [new file with mode: 0644]

index 9f46b5314dae3469242fb7f5c0cc2b05f5e9a5a3..3c2168b92bf9603efb093be4e3ba7e495f2cf0c0 100644 (file)
@@ -128,6 +128,7 @@ endif(WITH_TESTS)
 
 if(WITH_CEPHFS)
   add_subdirectory(cephfs)
+  add_subdirectory(cephfs_mirror)
 endif(WITH_CEPHFS)
 
 if(WITH_RBD)
diff --git a/src/tools/cephfs_mirror/CMakeLists.txt b/src/tools/cephfs_mirror/CMakeLists.txt
new file mode 100644 (file)
index 0000000..9edabaa
--- /dev/null
@@ -0,0 +1,27 @@
+set(cephfs_mirror_internal
+  ClusterWatcher.cc
+  Mirror.cc
+  FSMirror.cc
+  InstanceWatcher.cc
+  MirrorWatcher.cc
+  Types.cc
+  Watcher.cc
+  watcher/RewatchRequest.cc)
+
+add_executable(cephfs-mirror
+  main.cc)
+
+add_library(cephfs_mirror_internal STATIC
+  ${cephfs_mirror_internal})
+
+target_link_libraries(cephfs-mirror
+  cephfs_mirror_internal
+  global
+  ceph-common
+  cls_cephfs_client
+  librados
+  mds
+  cephfs
+  ${ALLOC_LIBS})
+
+install(TARGETS cephfs-mirror DESTINATION bin)
diff --git a/src/tools/cephfs_mirror/Mirror.cc b/src/tools/cephfs_mirror/Mirror.cc
new file mode 100644 (file)
index 0000000..5a14fd8
--- /dev/null
@@ -0,0 +1,280 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_context.h"
+#include "common/common_init.h"
+#include "common/Cond.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "include/types.h"
+#include "json_spirit/json_spirit.h"
+#include "mon/MonClient.h"
+#include "msg/Messenger.h"
+#include "aio_utils.h"
+#include "Mirror.h"
+
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+class SafeTimerSingleton : public SafeTimer {
+public:
+  SafeTimer *timer;
+  ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
+
+  explicit SafeTimerSingleton(CephContext *cct)
+    : SafeTimer(cct, timer_lock, true) {
+    init();
+  }
+
+  ~SafeTimerSingleton() {
+    std::scoped_lock locker(timer_lock);
+    shutdown();
+  }
+};
+
+class ThreadPoolSingleton : public ThreadPool {
+public:
+  ContextWQ *work_queue = nullptr;
+
+  explicit ThreadPoolSingleton(CephContext *cct)
+    : ThreadPool(cct, "Mirror::thread_pool", "tp_mirror", 1) {
+    work_queue = new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this);
+
+    start();
+  }
+
+  ~ThreadPoolSingleton() override {
+    work_queue->drain();
+    delete work_queue;
+
+    stop();
+  }
+};
+
+} // anonymous namespace
+
+Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
+                MonClient *monc, Messenger *msgr)
+  : m_cct(cct),
+    m_args(args),
+    m_monc(monc),
+    m_msgr(msgr),
+    m_listener(this) {
+  auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+                         "cephfs::mirror::thread_pool", false, cct));
+  auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+                        "cephfs::mirror::safe_timer", false, cct));
+  m_work_queue = thread_pool->work_queue;
+  m_timer = safe_timer->timer;
+}
+
+Mirror::~Mirror() {
+  dout(10) << dendl;
+}
+
+int Mirror::init_mon_client() {
+  dout(20) << dendl;
+
+  m_monc->set_messenger(m_msgr);
+  m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON);
+
+  int r = m_monc->init();
+  if (r < 0) {
+    derr << ": failed to init mon client: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  r = m_monc->authenticate(m_cct->_conf->client_mount_timeout);
+  if (r < 0) {
+    derr << ": failed to authenticate to monitor: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  client_t me = m_monc->get_global_id();
+  m_msgr->set_myname(entity_name_t::CLIENT(me.v));
+  return 0;
+}
+
+int Mirror::init(std::string &reason) {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  int r = init_mon_client();
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+void Mirror::shutdown() {
+  dout(20) << dendl;
+
+  std::unique_lock locker(m_lock);
+  if (m_fs_mirrors.empty()) {
+    return;
+  }
+
+  m_stopping = true;
+  m_cond.notify_all();
+  m_cond.wait(locker, [this] {return m_stopped;});
+}
+
+void Mirror::handle_signal(int signum) {
+  dout(10) << ": signal=" << signum << dendl;
+  ceph_assert(signum == SIGTERM || signum == SIGINT);
+  shutdown();
+  ::exit(0);
+}
+
+void Mirror::handle_mirroring_enabled(const std::string &fs_name, int r) {
+  dout(20) << ": fs_name=" << fs_name << ", r=" << r << dendl;
+
+  std::scoped_lock locker(m_lock);
+  if (r < 0) {
+    derr << ": failed to initialize FSMirror for filesystem=" << fs_name
+         << ": " << cpp_strerror(r) << dendl;
+    if (!m_stopping) {
+      m_fs_mirrors.erase(fs_name);
+    }
+
+    return;
+  }
+
+  dout(10) << ": Initialized FSMirror for filesystem=" << fs_name << dendl;
+}
+
+void Mirror::mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id) {
+  dout(10) << ": fs_name=" << fs_name << ", pool_id=" << local_pool_id << dendl;
+
+  std::scoped_lock locker(m_lock);
+  if (m_stopping) {
+    return;
+  }
+
+  // TODO: handle consecutive overlapping enable/disable calls
+  ceph_assert(m_fs_mirrors.find(fs_name) == m_fs_mirrors.end());
+
+  dout(10) << ": starting FSMirror: fs_name=" << fs_name << dendl;
+  std::unique_ptr<FSMirror> fs_mirror(new FSMirror(m_cct, fs_name, local_pool_id,
+                                                   m_args, m_work_queue));
+  Context *on_finish = new LambdaContext([this, fs_name](int r) {
+                                           handle_mirroring_enabled(fs_name, r);
+                                         });
+  fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+  m_fs_mirrors.emplace(fs_name, std::move(fs_mirror));
+}
+
+void Mirror::mirroring_disabled(const std::string &fs_name) {
+  dout(10) << ": fs_name=" << fs_name << dendl;
+
+  std::scoped_lock locker(m_lock);
+  if (!m_fs_mirrors.count(fs_name)) {
+    dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
+            << dendl;
+    return;
+  }
+
+  if (m_stopping) {
+    dout(5) << "shutting down" << dendl;
+    return;
+  }
+
+  auto &fs_mirror = m_fs_mirrors.at(fs_name);
+  if (!fs_mirror->is_stopping()) {
+    Context *on_finish = new LambdaContext([this, fs_name](int r) {
+                                             handle_shutdown(fs_name, r);
+                                           });
+    fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+  }
+}
+
+void Mirror::peer_added(const std::string &fs_name, const Peer &peer) {
+  dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+
+  std::scoped_lock locker(m_lock);
+  if (!m_fs_mirrors.count(fs_name)) {
+    dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
+            << dendl;
+    return;
+  }
+
+  if (m_stopping) {
+    dout(5) << "shutting down" << dendl;
+    return;
+  }
+
+  auto &fs_mirror = m_fs_mirrors.at(fs_name);
+  fs_mirror->add_peer(peer);
+}
+
+void Mirror::peer_removed(const std::string &fs_name, const Peer &peer) {
+  dout(20) << ": fs_name=" << fs_name << ", peer=" << peer << dendl;
+
+  std::scoped_lock locker(m_lock);
+  if (!m_fs_mirrors.count(fs_name)) {
+    dout(5) << ": fs mirror not found -- init failure(?) for " << fs_name
+            << dendl;
+    return;
+  }
+
+  if (m_stopping) {
+    dout(5) << "shutting down" << dendl;
+    return;
+  }
+
+  auto &fs_mirror = m_fs_mirrors.at(fs_name);
+  fs_mirror->remove_peer(peer);
+}
+
+void Mirror::handle_shutdown(const std::string &fs_name, int r) {
+  dout(10) << ": fs_name=" << fs_name << ", r=" << r << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_fs_mirrors.erase(fs_name);
+  m_cond.notify_all();
+}
+
+void Mirror::run() {
+  dout(20) << dendl;
+
+  std::unique_lock locker(m_lock);
+  m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_listener));
+  m_msgr->add_dispatcher_tail(m_cluster_watcher.get());
+
+  m_cluster_watcher->init();
+  m_cond.wait(locker, [this]{return m_stopping;});
+
+  for (auto &[fs_name, fs_mirror] : m_fs_mirrors) {
+    dout(10) << ": shutting down mirror for fs_name=" << fs_name << dendl;
+    if (fs_mirror->is_stopping()) {
+      dout(10) << ": fs_name=" << fs_name << " is under shutdown" << dendl;
+      continue;
+    }
+
+    Context *on_finish = new LambdaContext([this, fs_name](int r) {
+                                             handle_shutdown(fs_name, r);
+                            });
+    fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+  }
+
+  m_cond.wait(locker, [this] {return m_fs_mirrors.empty();});
+
+  m_stopped = true;
+  m_cond.notify_all();
+}
+
+} // namespace mirror
+} // namespace cephfs
+
diff --git a/src/tools/cephfs_mirror/Mirror.h b/src/tools/cephfs_mirror/Mirror.h
new file mode 100644 (file)
index 0000000..6bc2f60
--- /dev/null
@@ -0,0 +1,99 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_H
+#define CEPHFS_MIRROR_H
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "common/ceph_mutex.h"
+#include "mds/FSMap.h"
+#include "ClusterWatcher.h"
+#include "FSMirror.h"
+#include "Types.h"
+
+class CephContext;
+class Messenger;
+class MonClient;
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// this wraps up ClusterWatcher and FSMirrors to implement mirroring
+// for ceph filesystems.
+
+class Mirror {
+public:
+  Mirror(CephContext *cct, const std::vector<const char*> &args,
+         MonClient *monc, Messenger *msgr);
+  ~Mirror();
+
+  int init(std::string &reason);
+  void shutdown();
+  void run();
+
+  void handle_signal(int signum);
+
+private:
+  static constexpr std::string_view MIRRORING_MODULE = "mirroring";
+
+  struct ClusterListener : ClusterWatcher::Listener {
+    Mirror *mirror;
+
+    ClusterListener(Mirror *mirror)
+      : mirror(mirror) {
+    }
+
+    void handle_mirroring_enabled(const FilesystemSpec &spec) override {
+      mirror->mirroring_enabled(spec.fs_name, spec.pool_id);
+    }
+
+    void handle_mirroring_disabled(const std::string &fs_name) override {
+      mirror->mirroring_disabled(fs_name);
+    }
+
+    void handle_peers_added(const std::string &fs_name, const Peer &peer) override {
+      mirror->peer_added(fs_name, peer);
+    }
+
+    void handle_peers_removed(const std::string &fs_name, const Peer &peer) override {
+      mirror->peer_removed(fs_name, peer);
+    }
+  };
+
+  ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::Mirror");
+  ceph::condition_variable m_cond;
+
+  CephContext *m_cct;
+  std::vector<const char *> m_args;
+  MonClient *m_monc;
+  Messenger *m_msgr;
+  ClusterListener m_listener;
+
+  ContextWQ *m_work_queue = nullptr;
+  SafeTimer *m_timer = nullptr;
+
+  bool m_stopping = false;
+  bool m_stopped = false;
+  std::unique_ptr<ClusterWatcher> m_cluster_watcher;
+  std::map<std::string, std::unique_ptr<FSMirror>> m_fs_mirrors;
+
+  int init_mon_client();
+
+  void handle_mirroring_enabled(const std::string &fs_name, int r);
+  void mirroring_enabled(const std::string &fs_name, uint64_t local_pool_id);
+  void mirroring_disabled(const std::string &fs_name);
+
+  void peer_added(const std::string &fs_name, const Peer &peer);
+  void peer_removed(const std::string &fs_name, const Peer &peer);
+
+  void handle_shutdown(const std::string &fs_name, int r);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_H
diff --git a/src/tools/cephfs_mirror/main.cc b/src/tools/cephfs_mirror/main.cc
new file mode 100644 (file)
index 0000000..ccc395e
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/async/context_pool.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "mon/MonClient.h"
+#include "msg/Messenger.h"
+#include "Mirror.h"
+
+#include <vector>
+
+void usage() {
+  std::cout << "usage: cephfs-mirror [options...]" << std::endl;
+  std::cout << "options:\n";
+  std::cout << "  --remote/-r FILE      remote cluster configuration\n";
+  std::cout << "  --keyring=<path>      path to keyring for remote cluster\n";
+  std::cout << "  --log-file=<logfile>  file to log debug output\n";
+  std::cout << "  --debug-cephfs-mirror=<log-level>/<memory-level>  set cephfs-sync debug level\n";
+  generic_server_usage();
+}
+
+cephfs::mirror::Mirror *mirror = nullptr;
+
+static void handle_signal(int signum) {
+  if (mirror) {
+    mirror->handle_signal(signum);
+  }
+}
+
+int main(int argc, const char **argv) {
+  std::vector<const char*> args;
+  argv_to_vec(argc, argv, args);
+  if (args.empty()) {
+    cerr << argv[0] << ": -h or --help for usage" << std::endl;
+    ::exit(1);
+  }
+
+  if (ceph_argparse_need_usage(args)) {
+    usage();
+    ::exit(0);
+  }
+
+  auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+                         CODE_ENVIRONMENT_DAEMON,
+                         CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+
+  if (g_conf()->daemonize) {
+    global_init_daemonize(g_ceph_context);
+  }
+
+  common_init_finish(g_ceph_context);
+
+  init_async_signal_handler();
+  register_async_signal_handler_oneshot(SIGINT, handle_signal);
+  register_async_signal_handler_oneshot(SIGTERM, handle_signal);
+
+  std::vector<const char*> cmd_args;
+  argv_to_vec(argc, argv, cmd_args);
+
+  Messenger *msgr = Messenger::create_client_messenger(g_ceph_context, "client");
+  msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+
+  std::string reason;
+  ceph::async::io_context_pool ctxpool(1);
+  MonClient monc(MonClient(g_ceph_context, ctxpool));
+  int r = monc.build_initial_monmap();
+  if (r < 0) {
+    cerr << "failed to generate initial monmap" << std::endl;
+    goto cleanup_messenger;
+  }
+
+  msgr->start();
+
+  mirror = new cephfs::mirror::Mirror(g_ceph_context, cmd_args, &monc, msgr);
+  r = mirror->init(reason);
+  if (r < 0) {
+    std::cerr << "failed to initialize cephfs-mirror: " << reason << std::endl;
+    goto cleanup;
+  }
+
+  mirror->run();
+
+cleanup:
+  monc.shutdown();
+cleanup_messenger:
+  msgr->shutdown();
+  delete mirror;
+
+  return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
+}