--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "include/stringify.h"
+#include "aio_utils.h"
+#include "watcher/RewatchRequest.h"
+#include "Watcher.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::Watcher " << __func__
+
+using cephfs::mirror::watcher::RewatchRequest;
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+struct C_UnwatchAndFlush : public Context {
+ librados::Rados rados;
+ Context *on_finish;
+ bool flushing = false;
+ int ret_val = 0;
+
+ C_UnwatchAndFlush(librados::IoCtx &ioctx, Context *on_finish)
+ : rados(ioctx), on_finish(on_finish) {
+ }
+
+ void complete(int r) override {
+ if (ret_val == 0 && r < 0) {
+ ret_val = r;
+ }
+
+ if (!flushing) {
+ flushing = true;
+
+ librados::AioCompletion *aio_comp =
+ librados::Rados::aio_create_completion(
+ this, &rados_callback<Context, &Context::complete>);
+ r = rados.aio_watch_flush(aio_comp);
+
+ ceph_assert(r == 0);
+ aio_comp->release();
+ return;
+ }
+
+ // ensure our reference to the RadosClient is released prior
+ // to completing the callback to avoid racing an explicit
+ // librados shutdown
+ Context *ctx = on_finish;
+ r = ret_val;
+ delete this;
+
+ ctx->complete(r);
+ }
+
+ void finish(int r) override {
+ }
+};
+
+} // anonymous namespace
+
+Watcher::Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue)
+ : m_oid(oid),
+ m_ioctx(ioctx),
+ m_work_queue(work_queue),
+ m_lock(ceph::make_shared_mutex("cephfs::mirror::snap_watcher")),
+ m_state(STATE_IDLE),
+ m_watch_ctx(*this) {
+}
+
+Watcher::~Watcher() {
+}
+
+void Watcher::register_watch(Context *on_finish) {
+ dout(20) << dendl;
+
+ std::scoped_lock locker(m_lock);
+ m_state = STATE_REGISTERING;
+
+ on_finish = new C_RegisterWatch(this, on_finish);
+ librados::AioCompletion *aio_comp =
+ librados::Rados::aio_create_completion(on_finish, &rados_callback<Context, &Context::complete>);
+ int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
+ ceph_assert(r == 0);
+ aio_comp->release();
+}
+
+void Watcher::handle_register_watch(int r, Context *on_finish) {
+ dout(20) << ": r=" << r << dendl;
+
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_state == STATE_REGISTERING);
+
+ m_state = STATE_IDLE;
+ if (r < 0) {
+ derr << ": failed to register watch: " << cpp_strerror(r) << dendl;
+ m_watch_handle = 0;
+ }
+
+ if (m_unregister_watch_ctx != nullptr) {
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == 0 && m_watch_error) {
+ derr << ": re-registering after watch error" << dendl;
+ m_state = STATE_REGISTERING;
+ watch_error = true;
+ } else {
+ m_watch_blacklisted = (r == -EBLACKLISTED);
+ }
+ }
+
+ on_finish->complete(r);
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ } else if (watch_error) {
+ rewatch();
+ }
+}
+
+void Watcher::unregister_watch(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ std::scoped_lock locker(m_lock);
+ if (m_state != STATE_IDLE) {
+ dout(10) << ": delaying unregister -- watch register in progress" << dendl;
+ ceph_assert(m_unregister_watch_ctx == nullptr);
+ m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
+ unregister_watch(on_finish);
+ });
+ return;
+ } else if (is_registered()) {
+ // watch is registered -- unwatch
+ librados::AioCompletion *aio_comp =
+ librados::Rados::aio_create_completion(new C_UnwatchAndFlush(m_ioctx, on_finish),
+ &rados_callback<Context, &Context::complete>);
+ int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
+ ceph_assert(r == 0);
+ aio_comp->release();
+ m_watch_handle = 0;
+ m_watch_blacklisted = false;
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
+void Watcher::handle_error(uint64_t handle, int err) {
+ derr << ": handle=" << handle << ": " << cpp_strerror(err) << dendl;
+
+ std::scoped_lock locker(m_lock);
+ m_watch_error = true;
+
+ if (is_registered()) {
+ m_state = STATE_REWATCHING;
+ if (err == -EBLACKLISTED) {
+ m_watch_blacklisted = true;
+ }
+ m_work_queue->queue(new LambdaContext([this] {
+ rewatch();
+ }), 0);
+ }
+}
+
+void Watcher::rewatch() {
+ dout(20) << dendl;
+
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::unique_lock locker(m_lock);
+ ceph_assert(m_state == STATE_REWATCHING);
+
+ if (m_unregister_watch_ctx != nullptr) {
+ m_state = STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else {
+ m_watch_error = false;
+ Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch>(this);
+ auto req = RewatchRequest::create(m_ioctx, m_oid, m_lock,
+ &m_watch_ctx, &m_watch_handle, ctx);
+ req->send();
+ return;
+ }
+ }
+
+ unregister_watch_ctx->complete(0);
+}
+
+void Watcher::handle_rewatch(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_state == STATE_REWATCHING);
+
+ m_watch_blacklisted = false;
+ if (m_unregister_watch_ctx != nullptr) {
+ dout(10) << ": skipping rewatch -- unregistering" << dendl;
+ m_state = STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == -EBLACKLISTED) {
+ m_watch_blacklisted = true;
+ derr << ": client blacklisted" << dendl;
+ } else if (r == -ENOENT) {
+ dout(5) << ": object " << m_oid << " does not exist" << dendl;
+ } else if (r < 0) {
+ derr << ": failed to rewatch: " << cpp_strerror(r) << dendl;
+ watch_error = true;
+ } else if (m_watch_error) {
+ derr << ": re-registering watch after error" << dendl;
+ watch_error = true;
+ }
+ }
+
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ return;
+ } else if (watch_error) {
+ rewatch();
+ return;
+ }
+
+ Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch_callback>(this);
+ m_work_queue->queue(ctx, r);
+}
+
+void Watcher::handle_rewatch_callback(int r) {
+ dout(10) << ": r=" << r << dendl;
+ handle_rewatch_complete(r);
+
+ bool watch_error = false;
+ Context *unregister_watch_ctx = nullptr;
+ {
+ std::scoped_lock locker(m_lock);
+ ceph_assert(m_state == STATE_REWATCHING);
+
+ if (m_unregister_watch_ctx != nullptr) {
+ m_state = STATE_IDLE;
+ std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+ } else if (r == -EBLACKLISTED || r == -ENOENT) {
+ m_state = STATE_IDLE;
+ } else if (r < 0 || m_watch_error) {
+ watch_error = true;
+ } else {
+ m_state = STATE_IDLE;
+ }
+ }
+
+ if (unregister_watch_ctx != nullptr) {
+ unregister_watch_ctx->complete(0);
+ } else if (watch_error) {
+ rewatch();
+ }
+}
+
+void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl) {
+ m_ioctx.notify_ack(m_oid, notify_id, handle, bl);
+}
+
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) {
+ dout(20) << ": notify_id=" << notify_id << ", handle=" << handle
+ << ", notifier_id=" << notifier_id << dendl;
+ watcher.handle_notify(notify_id, handle, notifier_id, bl);
+}
+
+void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
+ dout(20) << dendl;
+ watcher.handle_error(handle, err);
+}
+
+} // namespace mirror
+} // namespace cephfs
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_WATCHER_H
+#define CEPHFS_MIRROR_WATCHER_H
+
+#include <string_view>
+
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// generic watcher class -- establish watch on a given rados object
+// and invoke handle_notify() when notified. On notify error, try
+// to re-establish the watch. Errors during rewatch are notified via
+// handle_rewatch_complete().
+
+class Watcher {
+public:
+ Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue);
+ virtual ~Watcher();
+
+ void register_watch(Context *on_finish);
+ void unregister_watch(Context *on_finish);
+
+protected:
+ std::string m_oid;
+
+ void acknowledge_notify(uint64_t notify_if, uint64_t handle, bufferlist &bl);
+
+ bool is_registered() const {
+ return m_state == STATE_IDLE && m_watch_handle != 0;
+ }
+ bool is_unregistered() const {
+ return m_state == STATE_IDLE && m_watch_handle == 0;
+ }
+
+ virtual void handle_rewatch_complete(int r) { }
+
+private:
+ enum State {
+ STATE_IDLE,
+ STATE_REGISTERING,
+ STATE_REWATCHING
+ };
+
+ struct WatchCtx : public librados::WatchCtx2 {
+ Watcher &watcher;
+
+ WatchCtx(Watcher &parent) : watcher(parent) {}
+
+ void handle_notify(uint64_t notify_id,
+ uint64_t handle,
+ uint64_t notifier_id,
+ bufferlist& bl) override;
+ void handle_error(uint64_t handle, int err) override;
+ };
+
+ struct C_RegisterWatch : public Context {
+ Watcher *watcher;
+ Context *on_finish;
+
+ C_RegisterWatch(Watcher *watcher, Context *on_finish)
+ : watcher(watcher),
+ on_finish(on_finish) {
+ }
+
+ void finish(int r) override {
+ watcher->handle_register_watch(r, on_finish);
+ }
+ };
+
+ librados::IoCtx &m_ioctx;
+ ContextWQ *m_work_queue;
+
+ mutable ceph::shared_mutex m_lock;
+ State m_state;
+ bool m_watch_error = false;
+ bool m_watch_blacklisted = false;
+ uint64_t m_watch_handle;
+ WatchCtx m_watch_ctx;
+ Context *m_unregister_watch_ctx = nullptr;
+
+ virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) = 0;
+ void handle_error(uint64_t handle, int err);
+
+ void rewatch();
+ void handle_rewatch(int r);
+ void handle_rewatch_callback(int r);
+ void handle_register_watch(int r, Context *on_finish);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_WATCHER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_AIO_UTILS_H
+#define CEPHFS_MIRROR_AIO_UTILS_H
+
+#include "include/rados/librados.hpp"
+
+namespace cephfs {
+namespace mirror {
+
+template <typename T, void(T::*MF)(int)>
+void rados_callback(rados_completion_t c, void *arg) {
+ T *obj = reinterpret_cast<T*>(arg);
+ int r = rados_aio_get_return_value(c);
+ (obj->*MF)(r);
+}
+
+template <typename T, void (T::*MF)(int)>
+class C_CallbackAdapter : public Context {
+ T *obj;
+public:
+ C_CallbackAdapter(T *obj)
+ : obj(obj) {
+ }
+
+protected:
+ void finish(int r) override {
+ (obj->*MF)(r);
+ }
+};
+
+template <typename WQ>
+struct C_AsyncCallback : public Context {
+ WQ *op_work_queue;
+ Context *on_finish;
+
+ C_AsyncCallback(WQ *op_work_queue, Context *on_finish)
+ : op_work_queue(op_work_queue), on_finish(on_finish) {
+ }
+ ~C_AsyncCallback() override {
+ delete on_finish;
+ }
+ void finish(int r) override {
+ op_work_queue->queue(on_finish, r);
+ on_finish = nullptr;
+ }
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_AIO_UTILS_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_mutex.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "include/Context.h"
+#include "tools/cephfs_mirror/aio_utils.h"
+#include "RewatchRequest.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::watcher:RewatchRequest " << __func__
+
+namespace cephfs {
+namespace mirror {
+namespace watcher {
+
+RewatchRequest::RewatchRequest(librados::IoCtx &ioctx, const std::string &oid,
+ ceph::shared_mutex &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish)
+ : m_ioctx(ioctx), m_oid(oid), m_lock(watch_lock),
+ m_watch_ctx(watch_ctx), m_watch_handle(watch_handle),
+ m_on_finish(on_finish) {
+}
+
+void RewatchRequest::send() {
+ unwatch();
+}
+
+void RewatchRequest::unwatch() {
+ ceph_assert(ceph_mutex_is_wlocked(m_lock));
+ if (*m_watch_handle == 0) {
+ rewatch();
+ return;
+ }
+
+ dout(10) << dendl;
+
+ uint64_t watch_handle = 0;
+ std::swap(*m_watch_handle, watch_handle);
+
+ librados::AioCompletion *aio_comp =
+ librados::Rados::aio_create_completion(
+ this, &rados_callback<RewatchRequest, &RewatchRequest::handle_unwatch>);
+ int r = m_ioctx.aio_unwatch(watch_handle, aio_comp);
+ ceph_assert(r == 0);
+ aio_comp->release();
+}
+
+void RewatchRequest::handle_unwatch(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ if (r == -EBLACKLISTED) {
+ derr << ": client blacklisted" << dendl;
+ finish(r);
+ return;
+ } else if (r < 0) {
+ derr << ": failed to unwatch: " << cpp_strerror(r) << dendl;
+ }
+
+ rewatch();
+}
+
+void RewatchRequest::rewatch() {
+ dout(20) << dendl;
+
+ librados::AioCompletion *aio_comp =
+ librados::Rados::aio_create_completion(
+ this, &rados_callback<RewatchRequest, &RewatchRequest::handle_rewatch>);
+ int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx);
+ ceph_assert(r == 0);
+ aio_comp->release();
+}
+
+void RewatchRequest::handle_rewatch(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ if (r < 0) {
+ derr << ": failed to watch object: " << cpp_strerror(r) << dendl;
+ m_rewatch_handle = 0;
+ }
+
+ {
+ std::unique_lock locker(m_lock);
+ *m_watch_handle = m_rewatch_handle;
+ }
+
+ finish(r);
+}
+
+void RewatchRequest::finish(int r) {
+ dout(20) << ": r=" << r << dendl;
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace watcher
+} // namespace mirror
+} // namespace cephfs
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H
+#define CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H
+
+#include "common/ceph_mutex.h"
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+
+struct Context;
+
+namespace cephfs {
+namespace mirror {
+namespace watcher {
+
+// Rewatch an existing watch -- the watch can be in an operatioal
+// or error state.
+
+class RewatchRequest {
+public:
+
+ static RewatchRequest *create(librados::IoCtx &ioctx, const std::string &oid,
+ ceph::shared_mutex &watch_lock,
+ librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish) {
+ return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle,
+ on_finish);
+ }
+
+ RewatchRequest(librados::IoCtx &ioctx, const std::string &oid,
+ ceph::shared_mutex &watch_lock, librados::WatchCtx2 *watch_ctx,
+ uint64_t *watch_handle, Context *on_finish);
+
+ void send();
+
+private:
+ librados::IoCtx& m_ioctx;
+ std::string m_oid;
+ ceph::shared_mutex &m_lock;
+ librados::WatchCtx2 *m_watch_ctx;
+ uint64_t *m_watch_handle;
+ Context *m_on_finish;
+
+ uint64_t m_rewatch_handle = 0;
+
+ void unwatch();
+ void handle_unwatch(int r);
+
+ void rewatch();
+ void handle_rewatch(int r);
+
+ void finish(int r);
+};
+
+} // namespace watcher
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H