InstanceWatcher.cc
MirrorWatcher.cc
PeerReplayer.cc
+ ServiceDaemon.cc
Types.cc
Utils.cc
Watcher.cc
#include "mon/MonClient.h"
#include "ClusterWatcher.h"
+#include "ServiceDaemon.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_cephfs_mirror
namespace cephfs {
namespace mirror {
-ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener)
+ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon,
+ Listener &listener)
: Dispatcher(cct),
m_monc(monc),
+ m_service_daemon(service_daemon),
m_listener(listener) {
}
dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled="
<< mirroring_disabled << dendl;
for (auto &fs : mirroring_enabled) {
+ m_service_daemon->add_filesystem(fs.fscid, fs.fs_name);
m_listener.handle_mirroring_enabled(FilesystemSpec(fs, fs_metadata_pools.at(fs)));
}
for (auto &fs : mirroring_disabled) {
+ m_service_daemon->remove_filesystem(fs.fscid);
m_listener.handle_mirroring_disabled(fs);
}
for (auto &[fs, peers] : peers_added) {
for (auto &peer : peers) {
+ m_service_daemon->add_peer(fs.fscid, peer);
m_listener.handle_peers_added(fs, peer);
}
}
for (auto &[fs, peers] : peers_removed) {
for (auto &peer : peers) {
+ m_service_daemon->remove_peer(fs.fscid, peer);
m_listener.handle_peers_removed(fs, peer);
}
}
namespace cephfs {
namespace mirror {
+class ServiceDaemon;
+
// watch peer changes for filesystems via FSMap updates
class ClusterWatcher : public Dispatcher {
virtual void handle_peers_removed(const Filesystem &filesystem, const Peer &peer) = 0;
};
- ClusterWatcher(CephContext *cct, MonClient *monc, Listener &listener);
+ ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon,
+ Listener &listener);
~ClusterWatcher();
bool ms_can_fast_dispatch_any() const override {
private:
ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::cluster_watcher");
MonClient *m_monc;
+ ServiceDaemon *m_service_daemon;
Listener &m_listener;
std::map<Filesystem, Peers> m_filesystem_peers;
#include "FSMirror.h"
#include "PeerReplayer.h"
#include "aio_utils.h"
+#include "ServiceDaemon.h"
#include "Utils.h"
#include "common/Cond.h"
namespace mirror {
namespace {
+
+const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
+const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");
+
class MirrorAdminSocketCommand {
public:
virtual ~MirrorAdminSocketCommand() {
};
FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
- std::vector<const char*> args, ContextWQ *work_queue)
+ ServiceDaemon *service_daemon, std::vector<const char*> args,
+ ContextWQ *work_queue)
: m_cct(cct),
m_filesystem(filesystem),
m_pool_id(pool_id),
+ m_service_daemon(service_daemon),
m_args(args),
m_work_queue(work_queue),
m_snap_listener(this),
m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
+ m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
+ (uint64_t)0);
}
FSMirror::~FSMirror() {
// called when this mirror instance acquires responsibility for a directory:
// register it with each peer replayer and publish the updated directory_count
void FSMirror::handle_acquire_directory(string_view dir_path) {
dout(5) << ": dir_path=" << dir_path << dendl;
- std::scoped_lock locker(m_lock);
- m_directories.emplace(dir_path);
- for (auto &[peer, peer_replayer] : m_peer_replayers) {
- dout(10) << ": peer=" << peer << dendl;
- peer_replayer->add_directory(dir_path);
+ {
+ std::scoped_lock locker(m_lock);
+ m_directories.emplace(dir_path);
+ m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
+ m_directories.size());
+
+ for (auto &[peer, peer_replayer] : m_peer_replayers) {
+ dout(10) << ": peer=" << peer << dendl;
+ peer_replayer->add_directory(dir_path);
+ }
}
}
// called when this mirror instance releases a directory: unregister it from
// each peer replayer and publish the updated directory_count
void FSMirror::handle_release_directory(string_view dir_path) {
dout(5) << ": dir_path=" << dir_path << dendl;
- std::scoped_lock locker(m_lock);
- auto it = m_directories.find(dir_path);
- if (it != m_directories.end()) {
- m_directories.erase(it);
- for (auto &[peer, peer_replayer] : m_peer_replayers) {
- dout(10) << ": peer=" << peer << dendl;
- peer_replayer->remove_directory(dir_path);
+ {
+ std::scoped_lock locker(m_lock);
+ auto it = m_directories.find(dir_path);
+ if (it != m_directories.end()) {
+ m_directories.erase(it);
+ m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
+ m_directories.size());
+ for (auto &[peer, peer_replayer] : m_peer_replayers) {
+ dout(10) << ": peer=" << peer << dendl;
+ peer_replayer->remove_directory(dir_path);
+ }
}
}
}
}
auto replayer = std::make_unique<PeerReplayer>(
- m_cct, this, m_filesystem, peer, m_directories, m_mount);
+ m_cct, this, m_filesystem, peer, m_directories, m_mount, m_service_daemon);
int r = init_replayer(replayer.get());
if (r < 0) {
+ m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer,
+ SERVICE_DAEMON_PEER_INIT_FAILED_KEY,
+ true);
return;
}
m_peer_replayers.emplace(peer, std::move(replayer));
class MirrorAdminSocketHook;
class PeerReplayer;
+class ServiceDaemon;
// handle mirroring for a filesystem to a set of peers
class FSMirror {
public:
FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
- std::vector<const char*> args, ContextWQ *work_queue);
+ ServiceDaemon *service_daemon, std::vector<const char*> args,
+ ContextWQ *work_queue);
~FSMirror();
void init(Context *on_finish);
CephContext *m_cct;
Filesystem m_filesystem;
uint64_t m_pool_id;
+ ServiceDaemon *m_service_daemon;
std::vector<const char *> m_args;
ContextWQ *m_work_queue;
namespace {
+const std::string SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY("mirroring_failed");
+
class SafeTimerSingleton : public SafeTimer {
public:
ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
m_monc(monc),
m_msgr(msgr),
m_listener(this),
- m_last_blocklist_check(ceph_clock_now()) {
+ m_last_blocklist_check(ceph_clock_now()),
+ m_local(new librados::Rados()) {
auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
"cephfs::mirror::thread_pool", false, cct));
auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
dout(20) << dendl;
std::scoped_lock locker(m_lock);
- int r = init_mon_client();
+
+ int r = m_local->init_with_context(m_cct);
+ if (r < 0) {
+    derr << ": could not initialize rados handle" << dendl;
+ return r;
+ }
+
+ r = m_local->connect();
+ if (r < 0) {
+ derr << ": error connecting to local cluster" << dendl;
+ return r;
+ }
+
+ m_service_daemon = std::make_unique<ServiceDaemon>(m_cct, m_local);
+ r = m_service_daemon->init();
+ if (r < 0) {
+ derr << ": error registering service daemon: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ r = init_mon_client();
if (r < 0) {
return r;
}
if (r < 0) {
derr << ": failed to initialize FSMirror for filesystem=" << filesystem
<< ": " << cpp_strerror(r) << dendl;
+ m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
+ SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
+ true);
return;
}
if (r < 0) {
derr << ": failed to initialize FSMirror for filesystem=" << filesystem
<< ": " << cpp_strerror(r) << dendl;
+ m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
+ SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
+ true);
return;
}
mirror_action.action_in_progress = true;
mirror_action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id,
- m_args, m_work_queue);
+ m_service_daemon.get(), m_args, m_work_queue);
mirror_action.fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
}
dout(20) << dendl;
std::unique_lock locker(m_lock);
- m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_listener));
+ m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_service_daemon.get(), m_listener));
m_msgr->add_dispatcher_tail(m_cluster_watcher.get());
m_cluster_watcher->init();
#include "mds/FSMap.h"
#include "ClusterWatcher.h"
#include "FSMirror.h"
+#include "ServiceDaemon.h"
#include "Types.h"
class Messenger;
std::map<Filesystem, MirrorAction> m_mirror_actions;
utime_t m_last_blocklist_check;
+ RadosRef m_local;
+ std::unique_ptr<ServiceDaemon> m_service_daemon;
int init_mon_client();
PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
const Filesystem &filesystem, const Peer &peer,
const std::set<std::string, std::less<>> &directories,
- MountRef mount)
+ MountRef mount, ServiceDaemon *service_daemon)
: m_cct(cct),
m_fs_mirror(fs_mirror),
+ m_filesystem(filesystem),
m_peer(peer),
m_directories(directories.begin(), directories.end()),
m_local_mount(mount),
+ m_service_daemon(service_daemon),
m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+ // reset sync stats sent via service daemon
+ m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+ SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
+ m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+ SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY, (uint64_t)0);
}
PeerReplayer::~PeerReplayer() {
#include "common/Formatter.h"
#include "common/Thread.h"
#include "mds/FSMap.h"
+#include "ServiceDaemon.h"
#include "Types.h"
namespace cephfs {
PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
const Filesystem &filesystem, const Peer &peer,
const std::set<std::string, std::less<>> &directories,
- MountRef mount);
+ MountRef mount, ServiceDaemon *service_daemon);
~PeerReplayer();
// initialize replayer for a peer
private:
inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id";
+ inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count";
+ inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count";
+
bool is_stopping() {
return m_stopping;
}
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;
+ // stats sent to service daemon
+ struct ServiceDaemonStats {
+ uint64_t failed_dir_count = 0;
+ uint64_t recovered_dir_count = 0;
+ };
+
struct SnapSyncStat {
uint64_t nr_failures = 0; // number of consecutive failures
    boost::optional<time> last_failed; // last failed timestamp
"cephfs_mirror_max_consecutive_failures_per_directory");
auto &sync_stat = m_snap_sync_stats.at(dir_path);
sync_stat.last_failed = clock::now();
- if (++sync_stat.nr_failures >= max_failures) {
+ if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
sync_stat.failed = true;
+ ++m_service_daemon_stats.failed_dir_count;
+ m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+ SERVICE_DAEMON_FAILED_DIR_COUNT_KEY,
+ m_service_daemon_stats.failed_dir_count);
}
}
void _reset_failed_count(const std::string &dir_path) {
auto &sync_stat = m_snap_sync_stats.at(dir_path);
+ if (sync_stat.failed) {
+ ++m_service_daemon_stats.recovered_dir_count;
+ m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
+ SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY,
+ m_service_daemon_stats.recovered_dir_count);
+ }
sync_stat.nr_failures = 0;
sync_stat.failed = false;
sync_stat.last_failed = boost::none;
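
  // Editor's note: illustrative walk-through of the two helpers above,
  // assuming cephfs_mirror_max_consecutive_failures_per_directory = 10:
  //
  //   _inc_failed_count("/d") x10 -> failed = true,  failure_count 0 -> 1
  //   _inc_failed_count("/d") x5  -> failed already set, failure_count stays 1
  //   _reset_failed_count("/d")   -> recovery_count 0 -> 1, failed = false
  //
  // the `!sync_stat.failed` / `sync_stat.failed` guards ensure a directory
  // contributes at most one failure (and one matching recovery) per episode
  // to the counters published through the service daemon.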
CephContext *m_cct;
FSMirror *m_fs_mirror;
+ Filesystem m_filesystem;
Peer m_peer;
  // probably need to be encapsulated when supporting cancellations
std::map<std::string, DirRegistry> m_registered;
std::vector<std::string> m_directories;
std::map<std::string, SnapSyncStat> m_snap_sync_stats;
MountRef m_local_mount;
+ ServiceDaemon *m_service_daemon;
PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
ceph::mutex m_lock;
bool m_stopping = false;
SnapshotReplayers m_replayers;
+ ServiceDaemonStats m_service_daemon_stats;
+
void run(SnapshotReplayerThread *replayer);
boost::optional<std::string> pick_directory();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "include/stringify.h"
+#include "ServiceDaemon.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::ServiceDaemon: " << this << " " \
+ << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+struct AttributeDumpVisitor : public boost::static_visitor<void> {
+ ceph::Formatter *f;
+ std::string name;
+
+ AttributeDumpVisitor(ceph::Formatter *f, std::string_view name)
+ : f(f), name(name) {
+ }
+
+ void operator()(bool val) const {
+ f->dump_bool(name.c_str(), val);
+ }
+ void operator()(uint64_t val) const {
+ f->dump_unsigned(name.c_str(), val);
+ }
+ void operator()(const std::string &val) const {
+ f->dump_string(name.c_str(), val);
+ }
+};
+
+} // anonymous namespace
+
+ServiceDaemon::ServiceDaemon(CephContext *cct, RadosRef rados)
+ : m_cct(cct),
+ m_rados(rados),
+ m_timer(new SafeTimer(cct, m_timer_lock, true)) {
+ m_timer->init();
+}
+
+ServiceDaemon::~ServiceDaemon() {
+ dout(10) << dendl;
+ {
+ std::scoped_lock timer_lock(m_timer_lock);
+ if (m_timer_ctx != nullptr) {
+ dout(5) << ": canceling timer task=" << m_timer_ctx << dendl;
+ m_timer->cancel_event(m_timer_ctx);
+ }
+ m_timer->shutdown();
+ }
+
+ delete m_timer;
+}
+
+int ServiceDaemon::init() {
+ dout(20) << dendl;
+
+ std::string id = m_cct->_conf->name.get_id();
+ if (id.find(CEPHFS_MIRROR_AUTH_ID_PREFIX) == 0) {
+ id = id.substr(CEPHFS_MIRROR_AUTH_ID_PREFIX.size());
+ }
+ std::string instance_id = stringify(m_rados->get_instance_id());
+
+ std::map<std::string, std::string> service_metadata = {{"id", id},
+ {"instance_id", instance_id}};
+ int r = m_rados->service_daemon_register("cephfs-mirror", instance_id,
+ service_metadata);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
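
// Editor's note: a minimal, hedged sketch of the registration performed by
// init() above, shown standalone against the public librados C++ API. It
// assumes a reachable cluster and a client.admin keyring; "mirror-a" is a
// hypothetical metadata value. Once registered, the daemon should appear in
// `ceph service dump` under the "cephfs-mirror" service.
//
//   #include <map>
//   #include <string>
//   #include "include/rados/librados.hpp"
//
//   int register_sketch() {
//     librados::Rados rados;
//     int r = rados.init("admin");        // connect as client.admin
//     if (r < 0) return r;
//     r = rados.conf_read_file(nullptr);  // default ceph.conf search path
//     if (r < 0) return r;
//     r = rados.connect();
//     if (r < 0) return r;
//     std::map<std::string, std::string> metadata = {{"id", "mirror-a"}};
//     // service name + per-instance name + free-form metadata, as above
//     return rados.service_daemon_register(
//         "cephfs-mirror", std::to_string(rados.get_instance_id()), metadata);
//   }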
+
+void ServiceDaemon::add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name) {
+ dout(10) << ": fscid=" << fscid << ", fs_name=" << fs_name << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ m_filesystems.emplace(fscid, Filesystem(fs_name));
+ }
+ schedule_update_status();
+}
+
+void ServiceDaemon::remove_filesystem(fs_cluster_id_t fscid) {
+ dout(10) << ": fscid=" << fscid << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ m_filesystems.erase(fscid);
+ }
+ schedule_update_status();
+}
+
+void ServiceDaemon::add_peer(fs_cluster_id_t fscid, const Peer &peer) {
+ dout(10) << ": peer=" << peer << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ auto fs_it = m_filesystems.find(fscid);
+ if (fs_it == m_filesystems.end()) {
+ return;
+ }
+ fs_it->second.peer_attributes.emplace(peer, Attributes{});
+ }
+ schedule_update_status();
+}
+
+void ServiceDaemon::remove_peer(fs_cluster_id_t fscid, const Peer &peer) {
+ dout(10) << ": peer=" << peer << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ auto fs_it = m_filesystems.find(fscid);
+ if (fs_it == m_filesystems.end()) {
+ return;
+ }
+ fs_it->second.peer_attributes.erase(peer);
+ }
+ schedule_update_status();
+}
+
+void ServiceDaemon::add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key,
+ AttributeValue value) {
+ dout(10) << ": fscid=" << fscid << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ auto fs_it = m_filesystems.find(fscid);
+ if (fs_it == m_filesystems.end()) {
+ return;
+ }
+
+ fs_it->second.fs_attributes[std::string(key)] = value;
+ }
+ schedule_update_status();
+}
+
+void ServiceDaemon::add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer,
+ std::string_view key, AttributeValue value) {
+ dout(10) << ": fscid=" << fscid << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ auto fs_it = m_filesystems.find(fscid);
+ if (fs_it == m_filesystems.end()) {
+ return;
+ }
+
+ auto peer_it = fs_it->second.peer_attributes.find(peer);
+ if (peer_it == fs_it->second.peer_attributes.end()) {
+ return;
+ }
+
+ peer_it->second[std::string(key)] = value;
+ }
+ schedule_update_status();
+}
+
+void ServiceDaemon::schedule_update_status() {
+ dout(10) << dendl;
+
+ std::scoped_lock timer_lock(m_timer_lock);
+ if (m_timer_ctx != nullptr) {
+ return;
+ }
+
+ m_timer_ctx = new LambdaContext([this] {
+ m_timer_ctx = nullptr;
+ update_status();
+ });
+ m_timer->add_event_after(1, m_timer_ctx);
+}
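
// Editor's note: m_timer_ctx doubles as a pending-update flag; any number of
// filesystem/peer/attribute updates landing within the one-second window
// above are coalesced into a single update_status() round trip.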
+
+void ServiceDaemon::update_status() {
+ dout(20) << ": " << m_filesystems.size() << " filesystem(s)" << dendl;
+
+ ceph::JSONFormatter f;
+ {
+ std::scoped_lock locker(m_lock);
+ f.open_object_section("filesystems");
+ for (auto &[fscid, filesystem] : m_filesystems) {
+ f.open_object_section(stringify(fscid).c_str());
+ f.dump_string("name", filesystem.fs_name);
+ for (auto &[attr_name, attr_value] : filesystem.fs_attributes) {
+ AttributeDumpVisitor visitor(&f, attr_name);
+ boost::apply_visitor(visitor, attr_value);
+ }
+ f.open_object_section("peers");
+ for (auto &[peer, attributes] : filesystem.peer_attributes) {
+ f.open_object_section(peer.uuid);
+ f.dump_object("remote", peer.remote);
+ f.open_object_section("stats");
+ for (auto &[attr_name, attr_value] : attributes) {
+ AttributeDumpVisitor visitor(&f, attr_name);
+ boost::apply_visitor(visitor, attr_value);
+ }
+ f.close_section(); // stats
+ f.close_section(); // peer.uuid
+ }
+ f.close_section(); // peers
+ f.close_section(); // fscid
+ }
+ f.close_section(); // filesystems
+ }
+
+ std::stringstream ss;
+ f.flush(ss);
+
+ int r = m_rados->service_daemon_update_status({{"status_json", ss.str()}});
+ if (r < 0) {
+ derr << ": failed to update service daemon status: " << cpp_strerror(r)
+ << dendl;
+ }
+}
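
// Editor's note: for one filesystem (fscid 1, name "a") with a single peer,
// the sections opened above nest as sketched below (values hypothetical; the
// exact rendering of the outermost "filesystems" section name is left to
// JSONFormatter):
//
//   "filesystems": {
//     "1": {
//       "name": "a",
//       "directory_count": 2,              // filesystem attribute
//       "peers": {
//         "<peer uuid>": {
//           "remote": { ... },
//           "stats": {                     // peer attributes
//             "failure_count": 0,
//             "recovery_count": 0
//           }
//         }
//       }
//     }
//   }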
+
+} // namespace mirror
+} // namespace cephfs
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_SERVICE_DAEMON_H
+#define CEPHFS_MIRROR_SERVICE_DAEMON_H
+
+#include "common/ceph_mutex.h"
+#include "mds/FSMap.h"
+#include "Types.h"
+
+class SafeTimer;
+
+namespace cephfs {
+namespace mirror {
+
+class ServiceDaemon {
+public:
+ ServiceDaemon(CephContext *cct, RadosRef rados);
+ ~ServiceDaemon();
+
+ int init();
+
+ void add_filesystem(fs_cluster_id_t fscid, std::string_view fs_name);
+ void remove_filesystem(fs_cluster_id_t fscid);
+
+ void add_peer(fs_cluster_id_t fscid, const Peer &peer);
+ void remove_peer(fs_cluster_id_t fscid, const Peer &peer);
+
+ void add_or_update_fs_attribute(fs_cluster_id_t fscid, std::string_view key,
+ AttributeValue value);
+ void add_or_update_peer_attribute(fs_cluster_id_t fscid, const Peer &peer,
+ std::string_view key, AttributeValue value);
+
+private:
+ struct Filesystem {
+ std::string fs_name;
+ Attributes fs_attributes;
+ std::map<Peer, Attributes> peer_attributes;
+
+ Filesystem(std::string_view fs_name)
+ : fs_name(fs_name) {
+ }
+ };
+
+ const std::string CEPHFS_MIRROR_AUTH_ID_PREFIX = "cephfs-mirror.";
+
+ CephContext *m_cct;
+ RadosRef m_rados;
+ SafeTimer *m_timer;
+ ceph::mutex m_timer_lock = ceph::make_mutex("cephfs::mirror::ServiceDaemon");
+
+ ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::service_daemon");
+ Context *m_timer_ctx = nullptr;
+ std::map<fs_cluster_id_t, Filesystem> m_filesystems;
+
+ void schedule_update_status();
+ void update_status();
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_SERVICE_DAEMON_H
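
// Editor's note: a hedged usage sketch of the interface above, mirroring the
// call sites added in Mirror.cc, ClusterWatcher.cc and FSMirror.cc (fscid,
// attribute names and values are hypothetical):
//
//   auto svc = std::make_unique<cephfs::mirror::ServiceDaemon>(cct, rados);
//   if (svc->init() < 0) { /* registration with the cluster failed */ }
//   svc->add_filesystem(1, "a");                        // mirroring enabled
//   svc->add_peer(1, peer);                             // peer added
//   svc->add_or_update_fs_attribute(1, "directory_count", (uint64_t)2);
//   svc->add_or_update_peer_attribute(1, peer, "failure_count", (uint64_t)1);
//   svc->remove_peer(1, peer);                          // peer removed
//   svc->remove_filesystem(1);                          // mirroring disabled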
static const std::string CEPHFS_MIRROR_OBJECT("cephfs_mirror");
+typedef boost::variant<bool, uint64_t, std::string> AttributeValue;
+typedef std::map<std::string, AttributeValue> Attributes;
+
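// Editor's note (illustrative): an AttributeValue holds exactly one of its
// three alternatives at a time; AttributeDumpVisitor in ServiceDaemon.cc
// dispatches on whichever alternative is stored, e.g.:
//
//   Attributes attrs;
//   attrs["directory_count"] = (uint64_t)2;   // uint64_t alternative
//   attrs["peer_init_failed"] = true;         // bool alternative
//   boost::apply_visitor(AttributeDumpVisitor(&f, "directory_count"),
//                        attrs["directory_count"]);
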
// distinct filesystem identifier
struct Filesystem {
fs_cluster_id_t fscid;