--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/ceph_json.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "include/stringify.h"
+#include "msg/Messenger.h"
+#include "aio_utils.h"
+#include "MirrorWatcher.h"
+#include "Types.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::MirrorWatcher " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+MirrorWatcher::MirrorWatcher(librados::IoCtx &ioctx, std::string_view addrs,
+ ContextWQ *work_queue)
+ : Watcher(ioctx, CEPHFS_MIRROR_OBJECT, work_queue),
+ m_ioctx(ioctx),
+ m_addrs(addrs),
+ m_work_queue(work_queue),
+ m_lock(ceph::make_mutex("cephfs::mirror::mirror_watcher")),
+ m_instance_id(stringify(m_ioctx.get_instance_id())) {
+}
+
+MirrorWatcher::~MirrorWatcher() {
+}
+
+void MirrorWatcher::init(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_on_init_finish == nullptr);
+ m_on_init_finish = new LambdaContext([this, on_finish](int r) {
+ on_finish->complete(r);
+ if (m_on_shutdown_finish != nullptr) {
+ m_on_shutdown_finish->complete(0);
+ }
+ });
+ }
+
+ register_watcher();
+}
+
+void MirrorWatcher::shutdown(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_on_shutdown_finish == nullptr);
+ if (m_on_init_finish != nullptr) {
+ dout(10) << ": delaying shutdown -- init in progress" << dendl;
+ m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
+ m_on_shutdown_finish = nullptr;
+ shutdown(on_finish);
+ });
+ return;
+ }
+
+ m_on_shutdown_finish = on_finish;
+ }
+
+ unregister_watcher();
+}
+
+void MirrorWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) {
+ dout(20) << dendl;
+
+ JSONFormatter f;
+ f.dump_string("addr", m_addrs);
+
+ bufferlist outbl;
+ f.flush(outbl);
+ acknowledge_notify(notify_id, handle, outbl);
+}
+
+void MirrorWatcher::register_watcher() {
+ dout(20) << dendl;
+
+ std::scoped_lock locker(m_lock);
+ Context *on_finish = new C_CallbackAdapter<
+ MirrorWatcher, &MirrorWatcher::handle_register_watcher>(this);
+ register_watch(on_finish);
+}
+
+void MirrorWatcher::handle_register_watcher(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_init_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ std::swap(on_init_finish, m_on_init_finish);
+ }
+
+ on_init_finish->complete(r);
+}
+
+void MirrorWatcher::unregister_watcher() {
+ dout(20) << dendl;
+
+ std::scoped_lock locker(m_lock);
+ Context *on_finish = new C_CallbackAdapter<
+ MirrorWatcher, &MirrorWatcher::handle_unregister_watcher>(this);
+ unregister_watch(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+}
+
+void MirrorWatcher::handle_unregister_watcher(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ Context *on_shutdown_finish = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ std::swap(on_shutdown_finish, m_on_shutdown_finish);
+ }
+
+ on_shutdown_finish->complete(r);
+}
+
+} // namespace mirror
+} // namespace cephfs
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_MIRROR_WATCHER_H
+#define CEPHFS_MIRROR_MIRROR_WATCHER_H
+
+#include <string_view>
+
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "Watcher.h"
+
+class ContextWQ;
+class Messenger;
+
+namespace cephfs {
+namespace mirror {
+
+// watch for notifications via cephfs_mirror object (in metadata
+// pool). this is used sending keepalived with keepalive payload
+// being the rados instance address (used by the manager module
+// to blacklist when needed).
+
+class MirrorWatcher : public Watcher {
+public:
+ static MirrorWatcher *create(librados::IoCtx &ioctx, std::string_view addrs,
+ ContextWQ *work_queue) {
+ return new MirrorWatcher(ioctx, addrs, work_queue);
+ }
+
+ MirrorWatcher(librados::IoCtx &ioctx, std::string_view addrs,
+ ContextWQ *work_queue);
+ ~MirrorWatcher();
+
+ void init(Context *on_finish);
+ void shutdown(Context *on_finish);
+
+ void handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) override;
+
+private:
+ librados::IoCtx &m_ioctx;
+ std::string m_addrs;
+ ContextWQ *m_work_queue;
+
+ ceph::mutex m_lock;
+ std::string m_instance_id;
+
+ Context *m_on_init_finish = nullptr;
+ Context *m_on_shutdown_finish = nullptr;
+
+ void register_watcher();
+ void handle_register_watcher(int r);
+
+ void unregister_watcher();
+ void handle_unregister_watcher(int r);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_MIRROR_WATCHER_H