]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: register mirror daemon with service manager
authorVenky Shankar <vshankar@redhat.com>
Wed, 27 Jan 2021 04:53:29 +0000 (23:53 -0500)
committerVenky Shankar <vshankar@redhat.com>
Fri, 5 Mar 2021 06:00:01 +0000 (01:00 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
12 files changed:
src/tools/cephfs_mirror/CMakeLists.txt
src/tools/cephfs_mirror/ClusterWatcher.cc
src/tools/cephfs_mirror/ClusterWatcher.h
src/tools/cephfs_mirror/FSMirror.cc
src/tools/cephfs_mirror/FSMirror.h
src/tools/cephfs_mirror/Mirror.cc
src/tools/cephfs_mirror/Mirror.h
src/tools/cephfs_mirror/PeerReplayer.cc
src/tools/cephfs_mirror/PeerReplayer.h
src/tools/cephfs_mirror/ServiceDaemon.cc [new file with mode: 0644]
src/tools/cephfs_mirror/ServiceDaemon.h [new file with mode: 0644]
src/tools/cephfs_mirror/Types.h

index d922615713e55ff83477c37adc28b80763771f48..4b6dea7a160672304aa5c34aa05bdb6da00cfcc2 100644 (file)
@@ -5,6 +5,7 @@ set(cephfs_mirror_internal
   InstanceWatcher.cc
   MirrorWatcher.cc
   PeerReplayer.cc
+  ServiceDaemon.cc
   Types.cc
   Utils.cc
   Watcher.cc
index c40ada88c1f5eeef9755bdd87302c4a46d1bf44b..a5d04717a97bf5c7cf1e7c77fb99f2f45f547680 100644 (file)
@@ -10,6 +10,7 @@
 #include "mon/MonClient.h"
 
 #include "ClusterWatcher.h"
+#include "ServiceDaemon.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_cephfs_mirror
 namespace cephfs {
 namespace mirror {
 
-ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener)
+ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon,
+                               Listener &listener)
   : Dispatcher(cct),
     m_monc(monc),
+    m_service_daemon(service_daemon),
     m_listener(listener) {
 }
 
@@ -141,9 +144,11 @@ void ClusterWatcher::handle_fsmap(const cref_t<MFSMap> &m) {
   dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled="
           << mirroring_disabled << dendl;
   for (auto &fs : mirroring_enabled) {
+    m_service_daemon->add_filesystem(fs.fscid, fs.fs_name);
     m_listener.handle_mirroring_enabled(FilesystemSpec(fs, fs_metadata_pools.at(fs)));
   }
   for (auto &fs : mirroring_disabled) {
+    m_service_daemon->remove_filesystem(fs.fscid);
     m_listener.handle_mirroring_disabled(fs);
   }
 
@@ -151,11 +156,13 @@ void ClusterWatcher::handle_fsmap(const cref_t<MFSMap> &m) {
 
   for (auto &[fs, peers] : peers_added) {
     for (auto &peer : peers) {
+      m_service_daemon->add_peer(fs.fscid, peer);
       m_listener.handle_peers_added(fs, peer);
     }
   }
   for (auto &[fs, peers] : peers_removed) {
     for (auto &peer : peers) {
+      m_service_daemon->remove_peer(fs.fscid, peer);
       m_listener.handle_peers_removed(fs, peer);
     }
   }
index a234eb8b13de3cc96c9b7833c65729c2655033f0..e3bf6298b507cee9705e8cee5993801fbc14f898 100644 (file)
@@ -17,6 +17,8 @@ class MonClient;
 namespace cephfs {
 namespace mirror {
 
+class ServiceDaemon;
+
 // watch peer changes for filesystems via FSMap updates
 
 class ClusterWatcher : public Dispatcher {
@@ -32,7 +34,8 @@ public:
     virtual void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) = 0;
   };
 
-  ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener);
+  ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon,
+                 Listener &listener);
   ~ClusterWatcher();
 
   bool ms_can_fast_dispatch_any() const override {
@@ -59,6 +62,7 @@ public:
 private:
   ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::cluster_watcher");
   MonClient *m_monc;
+  ServiceDaemon *m_service_daemon;
   Listener &m_listener;
 
   std::map<Filesystem, Peers> m_filesystem_peers;
index 33c3a311c8f1c13ce98868688a88623c44049fef..b960d60cd7950d8e37939c04f825f4562a3e510d 100644 (file)
@@ -13,6 +13,7 @@
 #include "FSMirror.h"
 #include "PeerReplayer.h"
 #include "aio_utils.h"
+#include "ServiceDaemon.h"
 #include "Utils.h"
 
 #include "common/Cond.h"
@@ -26,6 +27,10 @@ namespace cephfs {
 namespace mirror {
 
 namespace {
+
+const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
+const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");
+
 class MirrorAdminSocketCommand {
 public:
   virtual ~MirrorAdminSocketCommand() {
@@ -87,14 +92,18 @@ private:
 };
 
 FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
-                   std::vector<const char*> args, ContextWQ *work_queue)
+                   ServiceDaemon *service_daemon, std::vector<const char*> args,
+                   ContextWQ *work_queue)
   : m_cct(cct),
     m_filesystem(filesystem),
     m_pool_id(pool_id),
+    m_service_daemon(service_daemon),
     m_args(args),
     m_work_queue(work_queue),
     m_snap_listener(this),
     m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
+  m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
+                                               (uint64_t)0);
 }
 
 FSMirror::~FSMirror() {
@@ -321,24 +330,33 @@ void FSMirror::handle_shutdown_instance_watcher(int r) {
 void FSMirror::handle_acquire_directory(string_view dir_path) {
   dout(5) << ": dir_path=" << dir_path << dendl;
 
-  std::scoped_lock locker(m_lock);
-  m_directories.emplace(dir_path);
-  for (auto &[peer, peer_replayer] : m_peer_replayers) {
-    dout(10) << ": peer=" << peer << dendl;
-    peer_replayer->add_directory(dir_path);
+  {
+    std::scoped_lock locker(m_lock);
+    m_directories.emplace(dir_path);
+    m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
+                                                 m_directories.size());
+
+    for (auto &[peer, peer_replayer] : m_peer_replayers) {
+      dout(10) << ": peer=" << peer << dendl;
+      peer_replayer->add_directory(dir_path);
+    }
   }
 }
 
 void FSMirror::handle_release_directory(string_view dir_path) {
   dout(5) << ": dir_path=" << dir_path << dendl;
 
-  std::scoped_lock locker(m_lock);
-  auto it = m_directories.find(dir_path);
-  if (it != m_directories.end()) {
-    m_directories.erase(it);
-    for (auto &[peer, peer_replayer] : m_peer_replayers) {
-      dout(10) << ": peer=" << peer << dendl;
-      peer_replayer->remove_directory(dir_path);
+  {
+    std::scoped_lock locker(m_lock);
+    auto it = m_directories.find(dir_path);
+    if (it != m_directories.end()) {
+      m_directories.erase(it);
+      m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
+                                                   m_directories.size());
+      for (auto &[peer, peer_replayer] : m_peer_replayers) {
+        dout(10) << ": peer=" << peer << dendl;
+        peer_replayer->remove_directory(dir_path);
+      }
     }
   }
 }
@@ -353,9 +371,12 @@ void FSMirror::add_peer(const Peer &peer) {
   }
 
   auto replayer = std::make_unique<PeerReplayer>(
-    m_cct, this, m_filesystem, peer, m_directories, m_mount);
+    m_cct, this, m_filesystem, peer, m_directories, m_mount, m_service_daemon);
   int r = init_replayer(replayer.get());
   if (r < 0) {
+    m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer,
+                                                   SERVICE_DAEMON_PEER_INIT_FAILED_KEY,
+                                                   true);
     return;
   }
   m_peer_replayers.emplace(peer, std::move(replayer));
index 25aa97e863f9b2f7d25b5119d3d5c7a3c13d405e..53af92e5a69b8179e4f7832e1356ab58f7fd8e26 100644 (file)
@@ -18,13 +18,15 @@ namespace mirror {
 
 class MirrorAdminSocketHook;
 class PeerReplayer;
+class ServiceDaemon;
 
 // handle mirroring for a filesystem to a set of peers
 
 class FSMirror {
 public:
   FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
-           std::vector<const char*> args, ContextWQ *work_queue);
+           ServiceDaemon *service_daemon, std::vector<const char*> args,
+           ContextWQ *work_queue);
   ~FSMirror();
 
   void init(Context *on_finish);
@@ -93,6 +95,7 @@ private:
   CephContext *m_cct;
   Filesystem m_filesystem;
   uint64_t m_pool_id;
+  ServiceDaemon *m_service_daemon;
   std::vector<const char *> m_args;
   ContextWQ *m_work_queue;
 
index 3496118ca3cc47860bc5854fc617b8afe3776989..6d28263adec279717f8bc25e68e250a7b478b2fc 100644 (file)
@@ -25,6 +25,8 @@ namespace mirror {
 
 namespace {
 
+const std::string SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY("mirroring_failed");
+
 class SafeTimerSingleton : public SafeTimer {
 public:
   ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
@@ -193,7 +195,8 @@ Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
     m_monc(monc),
     m_msgr(msgr),
     m_listener(this),
-    m_last_blocklist_check(ceph_clock_now()) {
+    m_last_blocklist_check(ceph_clock_now()),
+    m_local(new librados::Rados()) {
   auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
                          "cephfs::mirror::thread_pool", false, cct));
   auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
@@ -250,7 +253,27 @@ int Mirror::init(std::string &reason) {
   dout(20) << dendl;
 
   std::scoped_lock locker(m_lock);
-  int r = init_mon_client();
+
+  int r = m_local->init_with_context(m_cct);
+  if (r < 0) {
+    derr << ": could not initialize rados handler" << dendl;
+    return r;
+  }
+
+  r = m_local->connect();
+  if (r < 0) {
+    derr << ": error connecting to local cluster" << dendl;
+    return r;
+  }
+
+  m_service_daemon = std::make_unique<ServiceDaemon>(m_cct, m_local);
+  r = m_service_daemon->init();
+  if (r < 0) {
+    derr << ": error registering service daemon: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  r = init_mon_client();
   if (r < 0) {
     return r;
   }
@@ -286,6 +309,9 @@ void Mirror::handle_enable_mirroring(const Filesystem &filesystem,
   if (r < 0) {
     derr << ": failed to initialize FSMirror for filesystem=" << filesystem
          << ": " << cpp_strerror(r) << dendl;
+    m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
+                                                 SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
+                                                 true);
     return;
   }
 
@@ -308,6 +334,9 @@ void Mirror::handle_enable_mirroring(const Filesystem &filesystem, int r) {
   if (r < 0) {
     derr << ": failed to initialize FSMirror for filesystem=" << filesystem
          << ": " << cpp_strerror(r) << dendl;
+    m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
+                                                 SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
+                                                 true);
     return;
   }
 
@@ -332,7 +361,7 @@ void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_
 
   mirror_action.action_in_progress = true;
   mirror_action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id,
-                                                       m_args, m_work_queue);
+                                                       m_service_daemon.get(), m_args, m_work_queue);
   mirror_action.fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
 }
 
@@ -500,7 +529,7 @@ void Mirror::run() {
   dout(20) << dendl;
 
   std::unique_lock locker(m_lock);
-  m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_listener));
+  m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_service_daemon.get(), m_listener));
   m_msgr->add_dispatcher_tail(m_cluster_watcher.get());
 
   m_cluster_watcher->init();
index 933b04ac3467e858a6a497514069f7b81bea9b53..3d6dde7301055e865619c21c662f4749481fa145 100644 (file)
@@ -13,6 +13,7 @@
 #include "mds/FSMap.h"
 #include "ClusterWatcher.h"
 #include "FSMirror.h"
+#include "ServiceDaemon.h"
 #include "Types.h"
 
 class Messenger;
@@ -100,6 +101,8 @@ private:
   std::map<Filesystem, MirrorAction> m_mirror_actions;
 
   utime_t m_last_blocklist_check;
+  RadosRef m_local;
+  std::unique_ptr<ServiceDaemon> m_service_daemon;
 
   int init_mon_client();
 
index b3813584bc751bdda0c13fdf9f34a211a8ecbfca..8defdf56ba7d975f30bb3ccb2ab8f1bb407ba262 100644 (file)
@@ -118,14 +118,21 @@ private:
 PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
                            const Filesystem &filesystem, const Peer &peer,
                            const std::set<std::string, std::less<>> &directories,
-                           MountRef mount)
+                           MountRef mount, ServiceDaemon *service_daemon)
   : m_cct(cct),
     m_fs_mirror(fs_mirror),
+    m_filesystem(filesystem),
     m_peer(peer),
     m_directories(directories.begin(), directories.end()),
     m_local_mount(mount),
+    m_service_daemon(service_daemon),
     m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
     m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+  // reset sync stats sent via service daemon
+  m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                 SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
+  m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                 SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0);
 }
 
 PeerReplayer::~PeerReplayer() {
index 4b99e7360e72917ad9f1d5b3b3af261fbdd95acb..3c8743ed1f9cdc67632a22915c8f58183d30ff3d 100644 (file)
@@ -7,6 +7,7 @@
 #include "common/Formatter.h"
 #include "common/Thread.h"
 #include "mds/FSMap.h"
+#include "ServiceDaemon.h"
 #include "Types.h"
 
 namespace cephfs {
@@ -20,7 +21,7 @@ public:
   PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
                const Filesystem &filesystem, const Peer &peer,
                const std::set<std::string, std::less<>> &directories,
-               MountRef mount);
+               MountRef mount, ServiceDaemon *service_daemon);
   ~PeerReplayer();
 
   // initialize replayer for a peer
@@ -41,6 +42,9 @@ public:
 private:
   inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id";
 
+  inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count";
+  inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count";
+
   bool is_stopping() {
     return m_stopping;
   }
@@ -101,6 +105,12 @@ private:
   using clock = ceph::coarse_mono_clock;
   using time = ceph::coarse_mono_time;
 
+  // stats sent to service daemon
+  struct ServiceDaemonStats {
+    uint64_t failed_dir_count = 0;
+    uint64_t recovered_dir_count = 0;
+  };
+
   struct SnapSyncStat {
     uint64_t nr_failures = 0; // number of consecutive failures
     boost::optional<time> last_failed; // lat failed timestamp
@@ -119,12 +129,22 @@ private:
     "cephfs_mirror_max_consecutive_failures_per_directory");
     auto &sync_stat = m_snap_sync_stats.at(dir_path);
     sync_stat.last_failed = clock::now();
-    if (++sync_stat.nr_failures >= max_failures) {
+    if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
       sync_stat.failed = true;
+      ++m_service_daemon_stats.failed_dir_count;
+      m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                     SERVICE_DAEMON_FAILED_DIR_COUNT_KEY,
+                                                     m_service_daemon_stats.failed_dir_count);
     }
   }
   void _reset_failed_count(const std::string &dir_path) {
     auto &sync_stat = m_snap_sync_stats.at(dir_path);
+    if (sync_stat.failed) {
+      ++m_service_daemon_stats.recovered_dir_count;
+      m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+                                                     SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY,
+                                                     m_service_daemon_stats.recovered_dir_count);
+    }
     sync_stat.nr_failures = 0;
     sync_stat.failed = false;
     sync_stat.last_failed = boost::none;
@@ -199,12 +219,14 @@ private:
 
   CephContext *m_cct;
   FSMirror *m_fs_mirror;
+  Filesystem m_filesystem;
   Peer m_peer;
   // probably need to be encapsulated when supporting cancelations
   std::map<std::string, DirRegistry> m_registered;
   std::vector<std::string> m_directories;
   std::map<std::string, SnapSyncStat> m_snap_sync_stats;
   MountRef m_local_mount;
+  ServiceDaemon *m_service_daemon;
   PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
 
   ceph::mutex m_lock;
@@ -214,6 +236,8 @@ private:
   bool m_stopping = false;
   SnapshotReplayers m_replayers;
 
+  ServiceDaemonStats m_service_daemon_stats;
+
   void run(SnapshotReplayerThread *replayer);
 
   boost::optional<std::string> pick_directory();
diff --git a/src/tools/cephfs_mirror/ServiceDaemon.cc b/src/tools/cephfs_mirror/ServiceDaemon.cc
new file mode 100644 (file)
index 0000000..f66dd46
--- /dev/null
@@ -0,0 +1,225 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "include/stringify.h"
+#include "ServiceDaemon.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::ServiceDaemon: " << this << " " \
+                           << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+struct AttributeDumpVisitor : public boost::static_visitor<void> {
+  ceph::Formatter *f;
+  std::string name;
+
+  AttributeDumpVisitor(ceph::Formatter *f, std::string_view name)
+    : f(f), name(name) {
+  }
+
+  void operator()(bool val) const {
+    f->dump_bool(name.c_str(), val);
+  }
+  void operator()(uint64_t val) const {
+    f->dump_unsigned(name.c_str(), val);
+  }
+  void operator()(const std::string &val) const {
+    f->dump_string(name.c_str(), val);
+  }
+};
+
+} // anonymous namespace
+
+ServiceDaemon::ServiceDaemon(CephContext *cct, RadosRef rados)
+  : m_cct(cct),
+    m_rados(rados),
+    m_timer(new SafeTimer(cct, m_timer_lock, true)) {
+  m_timer->init();
+}
+
+ServiceDaemon::~ServiceDaemon() {
+  dout(10) << dendl;
+  {
+    std::scoped_lock timer_lock(m_timer_lock);
+    if (m_timer_ctx != nullptr) {
+      dout(5) << ": canceling timer task=" << m_timer_ctx << dendl;
+      m_timer->cancel_event(m_timer_ctx);
+    }
+    m_timer->shutdown();
+  }
+
+  delete m_timer;
+}
+
+int ServiceDaemon::init() {
+  dout(20) << dendl;
+
+  std::string id = m_cct->_conf->name.get_id();
+  if (id.find(CEPHFS_MIRROR_AUTH_ID_PREFIX) == 0) {
+    id = id.substr(CEPHFS_MIRROR_AUTH_ID_PREFIX.size());
+  }
+  std::string instance_id = stringify(m_rados->get_instance_id());
+
+  std::map<std::string, std::string> service_metadata = {{"id", id},
+                                                         {"instance_id", instance_id}};
+  int r = m_rados->service_daemon_register("cephfs-mirror", instance_id,
+                                           service_metadata);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+void ServiceDaemon::add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name) {
+  dout(10) << ": fscid=" << fscid << ", fs_name=" << fs_name << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    m_filesystems.emplace(fscid, Filesystem(fs_name));
+  }
+  schedule_update_status();
+}
+
+void ServiceDaemon::remove_filesystem(fs_cluster_id_t fscid) {
+  dout(10) << ": fscid=" << fscid << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    m_filesystems.erase(fscid);
+  }
+  schedule_update_status();
+}
+
+void ServiceDaemon::add_peer(fs_cluster_id_t fscid, const Peer &peer) {
+  dout(10) << ": peer=" << peer << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    auto fs_it = m_filesystems.find(fscid);
+    if (fs_it == m_filesystems.end()) {
+      return;
+    }
+    fs_it->second.peer_attributes.emplace(peer, Attributes{});
+  }
+  schedule_update_status();
+}
+
+void ServiceDaemon::remove_peer(fs_cluster_id_t fscid, const Peer &peer) {
+  dout(10) << ": peer=" << peer << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    auto fs_it = m_filesystems.find(fscid);
+    if (fs_it == m_filesystems.end()) {
+      return;
+    }
+    fs_it->second.peer_attributes.erase(peer);
+  }
+  schedule_update_status();
+}
+
+void ServiceDaemon::add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key,
+                                               AttributeValue value) {
+  dout(10) << ": fscid=" << fscid << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    auto fs_it = m_filesystems.find(fscid);
+    if (fs_it == m_filesystems.end()) {
+      return;
+    }
+
+    fs_it->second.fs_attributes[std::string(key)] = value;
+  }
+  schedule_update_status();
+}
+
+void ServiceDaemon::add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer,
+                                                 std::string_view key, AttributeValue value) {
+  dout(10) << ": fscid=" << fscid << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    auto fs_it = m_filesystems.find(fscid);
+    if (fs_it == m_filesystems.end()) {
+      return;
+    }
+
+    auto peer_it = fs_it->second.peer_attributes.find(peer);
+    if (peer_it == fs_it->second.peer_attributes.end()) {
+      return;
+    }
+
+    peer_it->second[std::string(key)] = value;
+  }
+  schedule_update_status();
+}
+
+void ServiceDaemon::schedule_update_status() {
+  dout(10) << dendl;
+
+  std::scoped_lock timer_lock(m_timer_lock);
+  if (m_timer_ctx != nullptr) {
+    return;
+  }
+
+  m_timer_ctx = new LambdaContext([this] {
+                                    m_timer_ctx = nullptr;
+                                    update_status();
+                                  });
+  m_timer->add_event_after(1, m_timer_ctx);
+}
+
+void ServiceDaemon::update_status() {
+  dout(20) << ": " << m_filesystems.size() << " filesystem(s)" << dendl;
+
+  ceph::JSONFormatter f;
+  {
+    std::scoped_lock locker(m_lock);
+    f.open_object_section("filesystems");
+    for (auto &[fscid, filesystem] : m_filesystems) {
+      f.open_object_section(stringify(fscid).c_str());
+      f.dump_string("name", filesystem.fs_name);
+      for (auto &[attr_name, attr_value] : filesystem.fs_attributes) {
+            AttributeDumpVisitor visitor(&f, attr_name);
+            boost::apply_visitor(visitor, attr_value);
+      }
+      f.open_object_section("peers");
+      for (auto &[peer, attributes] : filesystem.peer_attributes) {
+        f.open_object_section(peer.uuid);
+        f.dump_object("remote", peer.remote);
+        f.open_object_section("stats");
+        for (auto &[attr_name, attr_value] : attributes) {
+            AttributeDumpVisitor visitor(&f, attr_name);
+            boost::apply_visitor(visitor, attr_value);
+        }
+        f.close_section(); // stats
+        f.close_section(); // peer.uuid
+      }
+      f.close_section(); // peers
+      f.close_section(); // fscid
+    }
+    f.close_section(); // filesystems
+  }
+
+  std::stringstream ss;
+  f.flush(ss);
+
+  int r = m_rados->service_daemon_update_status({{"status_json", ss.str()}});
+  if (r < 0) {
+    derr << ": failed to update service daemon status: " << cpp_strerror(r)
+         << dendl;
+  }
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/ServiceDaemon.h b/src/tools/cephfs_mirror/ServiceDaemon.h
new file mode 100644 (file)
index 0000000..393c7d5
--- /dev/null
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_SERVICE_DAEMON_H
+#define CEPHFS_MIRROR_SERVICE_DAEMON_H
+
+#include "common/ceph_mutex.h"
+#include "mds/FSMap.h"
+#include "Types.h"
+
+class SafeTimer;
+
+namespace cephfs {
+namespace mirror {
+
+class ServiceDaemon {
+public:
+  ServiceDaemon(CephContext *cct, RadosRef rados);
+  ~ServiceDaemon();
+
+  int init();
+
+  void add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name);
+  void remove_filesystem(fs_cluster_id_t fscid);
+
+  void add_peer(fs_cluster_id_t fscid, const Peer &peer);
+  void remove_peer(fs_cluster_id_t fscid, const Peer &peer);
+
+  void add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key,
+                                  AttributeValue value);
+  void add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer,
+                                    std::string_view key, AttributeValue value);
+
+private:
+  struct Filesystem {
+    std::string fs_name;
+    Attributes fs_attributes;
+    std::map<Peer, Attributes> peer_attributes;
+
+    Filesystem(std::string_view fs_name)
+      : fs_name(fs_name) {
+    }
+  };
+
+  const std::string CEPHFS_MIRROR_AUTH_ID_PREFIX = "cephfs-mirror.";
+
+  CephContext *m_cct;
+  RadosRef m_rados;
+  SafeTimer *m_timer;
+  ceph::mutex m_timer_lock = ceph::make_mutex("cephfs::mirror::ServiceDaemon");
+
+  ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::service_daemon");
+  Context *m_timer_ctx = nullptr;
+  std::map<fs_cluster_id_t, Filesystem> m_filesystems;
+
+  void schedule_update_status();
+  void update_status();
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_SERVICE_DAEMON_H
index 811d150eca8f5a258676db3ad7db9cdc2e221bf5..016a8dc860c5bcdbf20bc80542fe4a6476cb5e48 100644 (file)
@@ -17,6 +17,9 @@ namespace mirror {
 
 static const std::string CEPHFS_MIRROR_OBJECT("cephfs_mirror");
 
+typedef boost::variant<bool, uint64_t, std::string> AttributeValue;
+typedef std::map<std::string, AttributeValue> Attributes;
+
 // distinct filesystem identifier
 struct Filesystem {
   fs_cluster_id_t fscid;