]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: ClusterWatcher class for watching peer changes
authorVenky Shankar <vshankar@redhat.com>
Thu, 18 Jun 2020 04:41:50 +0000 (00:41 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 1 Sep 2020 10:58:10 +0000 (06:58 -0400)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/cephfs_mirror/ClusterWatcher.cc [new file with mode: 0644]
src/tools/cephfs_mirror/ClusterWatcher.h [new file with mode: 0644]

diff --git a/src/tools/cephfs_mirror/ClusterWatcher.cc b/src/tools/cephfs_mirror/ClusterWatcher.cc
new file mode 100644 (file)
index 0000000..895324a
--- /dev/null
@@ -0,0 +1,156 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <mutex>
+#include <vector>
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "mon/MonClient.h"
+
+#include "ClusterWatcher.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::ClusterWatcher " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener)
+  : Dispatcher(cct),
+    m_monc(monc),
+    m_listener(listener) {
+}
+
+ClusterWatcher::~ClusterWatcher() {
+}
+
+bool ClusterWatcher::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
+  return m->get_type() == CEPH_MSG_FS_MAP;
+}
+
+void ClusterWatcher::ms_fast_dispatch2(const ref_t<Message> &m) {
+  bool handled = ms_dispatch2(m);
+  ceph_assert(handled);
+}
+
+bool ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
+  if (m->get_type() == CEPH_MSG_FS_MAP) {
+    if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+      handle_fsmap(ref_cast<MFSMap>(m));
+    }
+    return true;
+  }
+
+  return false;
+}
+
+int ClusterWatcher::init() {
+  dout(20) << dendl;
+
+  bool sub = m_monc->sub_want("fsmap", 0, 0);
+  if (!sub) {
+    derr << ": failed subscribing to FSMap" << dendl;
+    return -1;
+  }
+
+  m_monc->renew_subs();
+  dout(10) << ": subscribed to FSMap" << dendl;
+  return 0;
+}
+
+void ClusterWatcher::shutdown() {
+  dout(20) << dendl;
+  std::scoped_lock locker(m_lock);
+  m_monc->sub_unwant("fsmap");
+}
+
+void ClusterWatcher::handle_fsmap(const cref_t<MFSMap> &m) {
+  dout(20) << dendl;
+
+  auto fsmap = m->get_fsmap();
+  auto filesystems = fsmap.get_filesystems();
+
+  std::vector<std::string> mirroring_enabled;
+  std::vector<std::string> mirroring_disabled;
+  std::map<std::string, Peers> peers_added;
+  std::map<std::string, Peers> peers_removed;
+  std::map<std::string, uint64_t> fs_metadata_pools;
+  {
+    std::scoped_lock locker(m_lock);
+    for (auto &filesystem : filesystems) {
+      auto fs_name = filesystem->mds_map.get_fs_name();
+      auto pool_id = filesystem->mds_map.get_metadata_pool();
+      auto &mirror_info = filesystem->mirror_info;
+
+      if (!mirror_info.is_mirrored()) {
+        auto it = m_filesystem_peers.find(fs_name);
+        if (it != m_filesystem_peers.end()) {
+          mirroring_disabled.emplace_back(fs_name);
+          m_filesystem_peers.erase(it);
+        }
+      } else {
+        auto [fspeersit, enabled] = m_filesystem_peers.emplace(fs_name, Peers{});
+        auto &peers = fspeersit->second;
+
+        if (enabled) {
+          mirroring_enabled.emplace_back(fs_name);
+          fs_metadata_pools.emplace(fs_name, pool_id);
+        }
+
+        // peers added
+        Peers added;
+        std::set_difference(mirror_info.peers.begin(), mirror_info.peers.end(),
+                            peers.begin(), peers.end(), std::inserter(added, added.end()));
+
+        // peers removed
+        Peers removed;
+        std::set_difference(peers.begin(), peers.end(),
+                            mirror_info.peers.begin(), mirror_info.peers.end(),
+                            std::inserter(removed, removed.end()));
+
+        // update set
+        if (!added.empty()) {
+          peers_added.emplace(fs_name, added);
+          peers.insert(added.begin(), added.end());
+        }
+        if (!removed.empty()) {
+          peers_removed.emplace(fs_name, removed);
+          for (auto &p : removed) {
+            peers.erase(p);
+          }
+        }
+      }
+    }
+  }
+
+  dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled="
+          << mirroring_disabled << dendl;
+  for (auto &fs_name : mirroring_enabled) {
+    m_listener.handle_mirroring_enabled(FilesystemSpec(fs_name, fs_metadata_pools.at(fs_name)));
+  }
+  for (auto &fs_name : mirroring_disabled) {
+    m_listener.handle_mirroring_disabled(fs_name);
+  }
+
+  dout(5) << ": peers added=" << peers_added << ", peers removed=" << peers_removed << dendl;
+
+  for (auto &[fs_name, peers] : peers_added) {
+    for (auto &peer : peers) {
+      m_listener.handle_peers_added(fs_name, peer);
+    }
+  }
+  for (auto &[fs_name, peers] : peers_removed) {
+    for (auto &peer : peers) {
+      m_listener.handle_peers_removed(fs_name, peer);
+    }
+  }
+
+  m_monc->sub_got("fsmap", fsmap.get_epoch());
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/ClusterWatcher.h b/src/tools/cephfs_mirror/ClusterWatcher.h
new file mode 100644 (file)
index 0000000..ac95b56
--- /dev/null
@@ -0,0 +1,80 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_CLUSTER_WATCHER_H
+#define CEPHFS_MIRROR_CLUSTER_WATCHER_H
+
+#include <map>
+
+#include "common/ceph_mutex.h"
+#include "common/async/context_pool.h"
+#include "messages/MFSMap.h"
+#include "msg/Dispatcher.h"
+#include "Types.h"
+
+class CephContext;
+class MonClient;
+
+namespace cephfs {
+namespace mirror {
+
+// watch peer changes for filesystems via FSMap updates
+
+class ClusterWatcher : public Dispatcher {
+public:
+  struct Listener {
+    virtual ~Listener() {
+    }
+
+    virtual void handle_mirroring_enabled(const FilesystemSpec &spec) = 0;
+    virtual void handle_mirroring_disabled(const std::string &fs_name) = 0;
+
+    virtual void handle_peers_added(const std::string &fs_name, const Peer &peer) = 0;
+    virtual void handle_peers_removed(const std::string &fs_name, const Peer &peer) = 0;
+  };
+
+  ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener);
+  ~ClusterWatcher();
+
+  bool ms_can_fast_dispatch_any() const override {
+    return true;
+  }
+  bool ms_can_fast_dispatch2(const cref_t<Message> &m) const override;
+  void ms_fast_dispatch2(const ref_t<Message> &m) override;
+  bool ms_dispatch2(const ref_t<Message> &m) override;
+
+  void ms_handle_connect(Connection *c) override {
+  }
+  bool ms_handle_reset(Connection *c) override {
+    return false;
+  }
+  void ms_handle_remote_reset(Connection *c) override {
+  }
+  bool ms_handle_refused(Connection *c) override {
+    return false;
+  }
+
+  int init();
+  void shutdown();
+
+private:
+  struct StringCmp {
+    using is_transparent = void;
+    bool operator()(std::string_view a, std::string_view b) const {
+      return a < b;
+    }
+  };
+
+  ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::cluster_watcher");
+  MonClient *m_monc;
+  Listener &m_listener;
+
+  std::map<std::string, Peers, StringCmp> m_filesystem_peers;
+
+  void handle_fsmap(const cref_t<MFSMap> &m);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_CLUSTER_WATCHER_H