From: Venky Shankar Date: Thu, 18 Jun 2020 04:41:50 +0000 (-0400) Subject: cephfs-mirror: ClusterWatcher class for watching peer changes X-Git-Tag: v16.1.0~1247^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d85af8f604cdce21f168c92aebe085db77028e23;p=ceph.git cephfs-mirror: ClusterWatcher class for watching peer changes Signed-off-by: Venky Shankar --- diff --git a/src/tools/cephfs_mirror/ClusterWatcher.cc b/src/tools/cephfs_mirror/ClusterWatcher.cc new file mode 100644 index 000000000000..895324ad9e9c --- /dev/null +++ b/src/tools/cephfs_mirror/ClusterWatcher.cc @@ -0,0 +1,156 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include + +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "mon/MonClient.h" + +#include "ClusterWatcher.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::ClusterWatcher " << __func__ + +namespace cephfs { +namespace mirror { + +ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener) + : Dispatcher(cct), + m_monc(monc), + m_listener(listener) { +} + +ClusterWatcher::~ClusterWatcher() { +} + +bool ClusterWatcher::ms_can_fast_dispatch2(const cref_t &m) const { + return m->get_type() == CEPH_MSG_FS_MAP; +} + +void ClusterWatcher::ms_fast_dispatch2(const ref_t &m) { + bool handled = ms_dispatch2(m); + ceph_assert(handled); +} + +bool ClusterWatcher::ms_dispatch2(const ref_t &m) { + if (m->get_type() == CEPH_MSG_FS_MAP) { + if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) { + handle_fsmap(ref_cast(m)); + } + return true; + } + + return false; +} + +int ClusterWatcher::init() { + dout(20) << dendl; + + bool sub = m_monc->sub_want("fsmap", 0, 0); + if (!sub) { + derr << ": failed subscribing to FSMap" << dendl; + return -1; + } + + m_monc->renew_subs(); + dout(10) << ": subscribed to FSMap" << dendl; + return 0; +} + +void ClusterWatcher::shutdown() { + dout(20) << dendl; + std::scoped_lock locker(m_lock); + m_monc->sub_unwant("fsmap"); +} + +void ClusterWatcher::handle_fsmap(const cref_t &m) { + dout(20) << dendl; + + auto fsmap = m->get_fsmap(); + auto filesystems = fsmap.get_filesystems(); + + std::vector mirroring_enabled; + std::vector mirroring_disabled; + std::map peers_added; + std::map peers_removed; + std::map fs_metadata_pools; + { + std::scoped_lock locker(m_lock); + for (auto &filesystem : filesystems) { + auto fs_name = filesystem->mds_map.get_fs_name(); + auto pool_id = filesystem->mds_map.get_metadata_pool(); + auto &mirror_info = filesystem->mirror_info; + + if (!mirror_info.is_mirrored()) { + auto it = m_filesystem_peers.find(fs_name); + if (it != m_filesystem_peers.end()) { + mirroring_disabled.emplace_back(fs_name); + m_filesystem_peers.erase(it); + } + } else { + auto [fspeersit, enabled] = m_filesystem_peers.emplace(fs_name, Peers{}); + auto &peers = fspeersit->second; + + if (enabled) { + mirroring_enabled.emplace_back(fs_name); + fs_metadata_pools.emplace(fs_name, pool_id); + } + + // peers added + Peers added; + std::set_difference(mirror_info.peers.begin(), mirror_info.peers.end(), + peers.begin(), peers.end(), std::inserter(added, added.end())); + + // peers removed + Peers removed; + std::set_difference(peers.begin(), peers.end(), + mirror_info.peers.begin(), mirror_info.peers.end(), + std::inserter(removed, removed.end())); + + // update set + if (!added.empty()) { + peers_added.emplace(fs_name, added); + peers.insert(added.begin(), added.end()); + } + if (!removed.empty()) { + peers_removed.emplace(fs_name, removed); + for (auto &p : removed) { + peers.erase(p); + } + } + } + } + } + + dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled=" + << mirroring_disabled << dendl; + for (auto &fs_name : mirroring_enabled) { + m_listener.handle_mirroring_enabled(FilesystemSpec(fs_name, fs_metadata_pools.at(fs_name))); + } + for (auto &fs_name : mirroring_disabled) { + m_listener.handle_mirroring_disabled(fs_name); + } + + dout(5) << ": peers added=" << peers_added << ", peers removed=" << peers_removed << dendl; + + for (auto &[fs_name, peers] : peers_added) { + for (auto &peer : peers) { + m_listener.handle_peers_added(fs_name, peer); + } + } + for (auto &[fs_name, peers] : peers_removed) { + for (auto &peer : peers) { + m_listener.handle_peers_removed(fs_name, peer); + } + } + + m_monc->sub_got("fsmap", fsmap.get_epoch()); +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/ClusterWatcher.h b/src/tools/cephfs_mirror/ClusterWatcher.h new file mode 100644 index 000000000000..ac95b563cb6e --- /dev/null +++ b/src/tools/cephfs_mirror/ClusterWatcher.h @@ -0,0 +1,80 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_CLUSTER_WATCHER_H +#define CEPHFS_MIRROR_CLUSTER_WATCHER_H + +#include + +#include "common/ceph_mutex.h" +#include "common/async/context_pool.h" +#include "messages/MFSMap.h" +#include "msg/Dispatcher.h" +#include "Types.h" + +class CephContext; +class MonClient; + +namespace cephfs { +namespace mirror { + +// watch peer changes for filesystems via FSMap updates + +class ClusterWatcher : public Dispatcher { +public: + struct Listener { + virtual ~Listener() { + } + + virtual void handle_mirroring_enabled(const FilesystemSpec &spec) = 0; + virtual void handle_mirroring_disabled(const std::string &fs_name) = 0; + + virtual void handle_peers_added(const std::string &fs_name, const Peer &peer) = 0; + virtual void handle_peers_removed(const std::string &fs_name, const Peer &peer) = 0; + }; + + ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener); + ~ClusterWatcher(); + + bool ms_can_fast_dispatch_any() const override { + return true; + } + bool ms_can_fast_dispatch2(const cref_t &m) const override; + void ms_fast_dispatch2(const ref_t &m) override; + bool ms_dispatch2(const ref_t &m) override; + + void ms_handle_connect(Connection *c) override { + } + bool ms_handle_reset(Connection *c) override { + return false; + } + void ms_handle_remote_reset(Connection *c) override { + } + bool ms_handle_refused(Connection *c) override { + return false; + } + + int init(); + void shutdown(); + +private: + struct StringCmp { + using is_transparent = void; + bool operator()(std::string_view a, std::string_view b) const { + return a < b; + } + }; + + ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::cluster_watcher"); + MonClient *m_monc; + Listener &m_listener; + + std::map m_filesystem_peers; + + void handle_fsmap(const cref_t &m); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_CLUSTER_WATCHER_H