From: Venky Shankar Date: Wed, 27 Jan 2021 04:53:29 +0000 (-0500) Subject: cephfs-mirror: register mirror daemon with service manager X-Git-Tag: v17.1.0~2686^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0cac56020664ad20d6aefd2b2eed1542e06c550e;p=ceph.git cephfs-mirror: register mirror daemon with service manager Signed-off-by: Venky Shankar --- diff --git a/src/tools/cephfs_mirror/CMakeLists.txt b/src/tools/cephfs_mirror/CMakeLists.txt index d922615713e..4b6dea7a160 100644 --- a/src/tools/cephfs_mirror/CMakeLists.txt +++ b/src/tools/cephfs_mirror/CMakeLists.txt @@ -5,6 +5,7 @@ set(cephfs_mirror_internal InstanceWatcher.cc MirrorWatcher.cc PeerReplayer.cc + ServiceDaemon.cc Types.cc Utils.cc Watcher.cc diff --git a/src/tools/cephfs_mirror/ClusterWatcher.cc b/src/tools/cephfs_mirror/ClusterWatcher.cc index c40ada88c1f..a5d04717a97 100644 --- a/src/tools/cephfs_mirror/ClusterWatcher.cc +++ b/src/tools/cephfs_mirror/ClusterWatcher.cc @@ -10,6 +10,7 @@ #include "mon/MonClient.h" #include "ClusterWatcher.h" +#include "ServiceDaemon.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_cephfs_mirror @@ -19,9 +20,11 @@ namespace cephfs { namespace mirror { -ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener) +ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon, + Listener &listener) : Dispatcher(cct), m_monc(monc), + m_service_daemon(service_daemon), m_listener(listener) { } @@ -141,9 +144,11 @@ void ClusterWatcher::handle_fsmap(const cref_t &m) { dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled=" << mirroring_disabled << dendl; for (auto &fs : mirroring_enabled) { + m_service_daemon->add_filesystem(fs.fscid, fs.fs_name); m_listener.handle_mirroring_enabled(FilesystemSpec(fs, fs_metadata_pools.at(fs))); } for (auto &fs : mirroring_disabled) { + m_service_daemon->remove_filesystem(fs.fscid); m_listener.handle_mirroring_disabled(fs); } @@ -151,11 +156,13 @@ void ClusterWatcher::handle_fsmap(const cref_t &m) { for (auto &[fs, peers] : peers_added) { for (auto &peer : peers) { + m_service_daemon->add_peer(fs.fscid, peer); m_listener.handle_peers_added(fs, peer); } } for (auto &[fs, peers] : peers_removed) { for (auto &peer : peers) { + m_service_daemon->remove_peer(fs.fscid, peer); m_listener.handle_peers_removed(fs, peer); } } diff --git a/src/tools/cephfs_mirror/ClusterWatcher.h b/src/tools/cephfs_mirror/ClusterWatcher.h index a234eb8b13d..e3bf6298b50 100644 --- a/src/tools/cephfs_mirror/ClusterWatcher.h +++ b/src/tools/cephfs_mirror/ClusterWatcher.h @@ -17,6 +17,8 @@ class MonClient; namespace cephfs { namespace mirror { +class ServiceDaemon; + // watch peer changes for filesystems via FSMap updates class ClusterWatcher : public Dispatcher { @@ -32,7 +34,8 @@ public: virtual void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) = 0; }; - ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener); + ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon, + Listener &listener); ~ClusterWatcher(); bool ms_can_fast_dispatch_any() const override { @@ -59,6 +62,7 @@ public: private: ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::cluster_watcher"); MonClient *m_monc; + ServiceDaemon *m_service_daemon; Listener &m_listener; std::map m_filesystem_peers; diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc index 33c3a311c8f..b960d60cd79 100644 --- a/src/tools/cephfs_mirror/FSMirror.cc +++ b/src/tools/cephfs_mirror/FSMirror.cc @@ -13,6 +13,7 @@ #include "FSMirror.h" #include "PeerReplayer.h" #include "aio_utils.h" +#include "ServiceDaemon.h" #include "Utils.h" #include "common/Cond.h" @@ -26,6 +27,10 @@ namespace cephfs { namespace mirror { namespace { + +const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count"); +const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed"); + class MirrorAdminSocketCommand { public: virtual ~MirrorAdminSocketCommand() { @@ -87,14 +92,18 @@ private: }; FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id, - std::vector args, ContextWQ *work_queue) + ServiceDaemon *service_daemon, std::vector args, + ContextWQ *work_queue) : m_cct(cct), m_filesystem(filesystem), m_pool_id(pool_id), + m_service_daemon(service_daemon), m_args(args), m_work_queue(work_queue), m_snap_listener(this), m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) { + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + (uint64_t)0); } FSMirror::~FSMirror() { @@ -321,24 +330,33 @@ void FSMirror::handle_shutdown_instance_watcher(int r) { void FSMirror::handle_acquire_directory(string_view dir_path) { dout(5) << ": dir_path=" << dir_path << dendl; - std::scoped_lock locker(m_lock); - m_directories.emplace(dir_path); - for (auto &[peer, peer_replayer] : m_peer_replayers) { - dout(10) << ": peer=" << peer << dendl; - peer_replayer->add_directory(dir_path); + { + std::scoped_lock locker(m_lock); + m_directories.emplace(dir_path); + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + m_directories.size()); + + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(10) << ": peer=" << peer << dendl; + peer_replayer->add_directory(dir_path); + } } } void FSMirror::handle_release_directory(string_view dir_path) { dout(5) << ": dir_path=" << dir_path << dendl; - std::scoped_lock locker(m_lock); - auto it = m_directories.find(dir_path); - if (it != m_directories.end()) { - m_directories.erase(it); - for (auto &[peer, peer_replayer] : m_peer_replayers) { - dout(10) << ": peer=" << peer << dendl; - peer_replayer->remove_directory(dir_path); + { + std::scoped_lock locker(m_lock); + auto it = m_directories.find(dir_path); + if (it != m_directories.end()) { + m_directories.erase(it); + m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, + m_directories.size()); + for (auto &[peer, peer_replayer] : m_peer_replayers) { + dout(10) << ": peer=" << peer << dendl; + peer_replayer->remove_directory(dir_path); + } } } } @@ -353,9 +371,12 @@ void FSMirror::add_peer(const Peer &peer) { } auto replayer = std::make_unique( - m_cct, this, m_filesystem, peer, m_directories, m_mount); + m_cct, this, m_filesystem, peer, m_directories, m_mount, m_service_daemon); int r = init_replayer(replayer.get()); if (r < 0) { + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer, + SERVICE_DAEMON_PEER_INIT_FAILED_KEY, + true); return; } m_peer_replayers.emplace(peer, std::move(replayer)); diff --git a/src/tools/cephfs_mirror/FSMirror.h b/src/tools/cephfs_mirror/FSMirror.h index 25aa97e863f..53af92e5a69 100644 --- a/src/tools/cephfs_mirror/FSMirror.h +++ b/src/tools/cephfs_mirror/FSMirror.h @@ -18,13 +18,15 @@ namespace mirror { class MirrorAdminSocketHook; class PeerReplayer; +class ServiceDaemon; // handle mirroring for a filesystem to a set of peers class FSMirror { public: FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id, - std::vector args, ContextWQ *work_queue); + ServiceDaemon *service_daemon, std::vector args, + ContextWQ *work_queue); ~FSMirror(); void init(Context *on_finish); @@ -93,6 +95,7 @@ private: CephContext *m_cct; Filesystem m_filesystem; uint64_t m_pool_id; + ServiceDaemon *m_service_daemon; std::vector m_args; ContextWQ *m_work_queue; diff --git a/src/tools/cephfs_mirror/Mirror.cc b/src/tools/cephfs_mirror/Mirror.cc index 3496118ca3c..6d28263adec 100644 --- a/src/tools/cephfs_mirror/Mirror.cc +++ b/src/tools/cephfs_mirror/Mirror.cc @@ -25,6 +25,8 @@ namespace mirror { namespace { +const std::string SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY("mirroring_failed"); + class SafeTimerSingleton : public SafeTimer { public: ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock"); @@ -193,7 +195,8 @@ Mirror::Mirror(CephContext *cct, const std::vector &args, m_monc(monc), m_msgr(msgr), m_listener(this), - m_last_blocklist_check(ceph_clock_now()) { + m_last_blocklist_check(ceph_clock_now()), + m_local(new librados::Rados()) { auto thread_pool = &(cct->lookup_or_create_singleton_object( "cephfs::mirror::thread_pool", false, cct)); auto safe_timer = &(cct->lookup_or_create_singleton_object( @@ -250,7 +253,27 @@ int Mirror::init(std::string &reason) { dout(20) << dendl; std::scoped_lock locker(m_lock); - int r = init_mon_client(); + + int r = m_local->init_with_context(m_cct); + if (r < 0) { + derr << ": could not initialize rados handler" << dendl; + return r; + } + + r = m_local->connect(); + if (r < 0) { + derr << ": error connecting to local cluster" << dendl; + return r; + } + + m_service_daemon = std::make_unique(m_cct, m_local); + r = m_service_daemon->init(); + if (r < 0) { + derr << ": error registering service daemon: " << cpp_strerror(r) << dendl; + return r; + } + + r = init_mon_client(); if (r < 0) { return r; } @@ -286,6 +309,9 @@ void Mirror::handle_enable_mirroring(const Filesystem &filesystem, if (r < 0) { derr << ": failed to initialize FSMirror for filesystem=" << filesystem << ": " << cpp_strerror(r) << dendl; + m_service_daemon->add_or_update_fs_attribute(filesystem.fscid, + SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY, + true); return; } @@ -308,6 +334,9 @@ void Mirror::handle_enable_mirroring(const Filesystem &filesystem, int r) { if (r < 0) { derr << ": failed to initialize FSMirror for filesystem=" << filesystem << ": " << cpp_strerror(r) << dendl; + m_service_daemon->add_or_update_fs_attribute(filesystem.fscid, + SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY, + true); return; } @@ -332,7 +361,7 @@ void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_ mirror_action.action_in_progress = true; mirror_action.fs_mirror = std::make_unique(m_cct, filesystem, local_pool_id, - m_args, m_work_queue); + m_service_daemon.get(), m_args, m_work_queue); mirror_action.fs_mirror->init(new C_AsyncCallback(m_work_queue, on_finish)); } @@ -500,7 +529,7 @@ void Mirror::run() { dout(20) << dendl; std::unique_lock locker(m_lock); - m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_listener)); + m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_service_daemon.get(), m_listener)); m_msgr->add_dispatcher_tail(m_cluster_watcher.get()); m_cluster_watcher->init(); diff --git a/src/tools/cephfs_mirror/Mirror.h b/src/tools/cephfs_mirror/Mirror.h index 933b04ac346..3d6dde73010 100644 --- a/src/tools/cephfs_mirror/Mirror.h +++ b/src/tools/cephfs_mirror/Mirror.h @@ -13,6 +13,7 @@ #include "mds/FSMap.h" #include "ClusterWatcher.h" #include "FSMirror.h" +#include "ServiceDaemon.h" #include "Types.h" class Messenger; @@ -100,6 +101,8 @@ private: std::map m_mirror_actions; utime_t m_last_blocklist_check; + RadosRef m_local; + std::unique_ptr m_service_daemon; int init_mon_client(); diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index b3813584bc7..8defdf56ba7 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -118,14 +118,21 @@ private: PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror, const Filesystem &filesystem, const Peer &peer, const std::set> &directories, - MountRef mount) + MountRef mount, ServiceDaemon *service_daemon) : m_cct(cct), m_fs_mirror(fs_mirror), + m_filesystem(filesystem), m_peer(peer), m_directories(directories.begin(), directories.end()), m_local_mount(mount), + m_service_daemon(service_daemon), m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)), m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) { + // reset sync stats sent via service daemon + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0); + m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, + SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0); } PeerReplayer::~PeerReplayer() { diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 4b99e7360e7..3c8743ed1f9 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -7,6 +7,7 @@ #include "common/Formatter.h" #include "common/Thread.h" #include "mds/FSMap.h" +#include "ServiceDaemon.h" #include "Types.h" namespace cephfs { @@ -20,7 +21,7 @@ public: PeerReplayer(CephContext *cct, FSMirror *fs_mirror, const Filesystem &filesystem, const Peer &peer, const std::set> &directories, - MountRef mount); + MountRef mount, ServiceDaemon *service_daemon); ~PeerReplayer(); // initialize replayer for a peer @@ -41,6 +42,9 @@ public: private: inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id"; + inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count"; + inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count"; + bool is_stopping() { return m_stopping; } @@ -101,6 +105,12 @@ private: using clock = ceph::coarse_mono_clock; using time = ceph::coarse_mono_time; + // stats sent to service daemon + struct ServiceDaemonStats { + uint64_t failed_dir_count = 0; + uint64_t recovered_dir_count = 0; + }; + struct SnapSyncStat { uint64_t nr_failures = 0; // number of consecutive failures boost::optional