From 344979a521c788b7e0b3f892a5a5f45a7c34fa43 Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Thu, 18 Jun 2020 00:55:57 -0400 Subject: [PATCH] cephfs-mirror: Watcher class to receive/ack watch notifications Signed-off-by: Venky Shankar --- src/tools/cephfs_mirror/Watcher.cc | 285 ++++++++++++++++++ src/tools/cephfs_mirror/Watcher.h | 102 +++++++ src/tools/cephfs_mirror/aio_utils.h | 53 ++++ .../cephfs_mirror/watcher/RewatchRequest.cc | 102 +++++++ .../cephfs_mirror/watcher/RewatchRequest.h | 60 ++++ 5 files changed, 602 insertions(+) create mode 100644 src/tools/cephfs_mirror/Watcher.cc create mode 100644 src/tools/cephfs_mirror/Watcher.h create mode 100644 src/tools/cephfs_mirror/aio_utils.h create mode 100644 src/tools/cephfs_mirror/watcher/RewatchRequest.cc create mode 100644 src/tools/cephfs_mirror/watcher/RewatchRequest.h diff --git a/src/tools/cephfs_mirror/Watcher.cc b/src/tools/cephfs_mirror/Watcher.cc new file mode 100644 index 0000000000000..a228d91107781 --- /dev/null +++ b/src/tools/cephfs_mirror/Watcher.cc @@ -0,0 +1,285 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "include/stringify.h" +#include "aio_utils.h" +#include "watcher/RewatchRequest.h" +#include "Watcher.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::Watcher " << __func__ + +using cephfs::mirror::watcher::RewatchRequest; + +namespace cephfs { +namespace mirror { + +namespace { + +struct C_UnwatchAndFlush : public Context { + librados::Rados rados; + Context *on_finish; + bool flushing = false; + int ret_val = 0; + + C_UnwatchAndFlush(librados::IoCtx &ioctx, Context *on_finish) + : rados(ioctx), on_finish(on_finish) { + } + + void complete(int r) override { + if (ret_val == 0 && r < 0) { + ret_val = r; + } + + if (!flushing) { + flushing = true; + + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion( + this, &rados_callback); + r = rados.aio_watch_flush(aio_comp); + + ceph_assert(r == 0); + aio_comp->release(); + return; + } + + // ensure our reference to the RadosClient is released prior + // to completing the callback to avoid racing an explicit + // librados shutdown + Context *ctx = on_finish; + r = ret_val; + delete this; + + ctx->complete(r); + } + + void finish(int r) override { + } +}; + +} // anonymous namespace + +Watcher::Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue) + : m_oid(oid), + m_ioctx(ioctx), + m_work_queue(work_queue), + m_lock(ceph::make_shared_mutex("cephfs::mirror::snap_watcher")), + m_state(STATE_IDLE), + m_watch_ctx(*this) { +} + +Watcher::~Watcher() { +} + +void Watcher::register_watch(Context *on_finish) { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + m_state = STATE_REGISTERING; + + on_finish = new C_RegisterWatch(this, on_finish); + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion(on_finish, &rados_callback); + int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx); + ceph_assert(r == 0); + aio_comp->release(); +} + +void Watcher::handle_register_watch(int r, Context *on_finish) { + dout(20) << ": r=" << r << dendl; + + bool watch_error = false; + Context *unregister_watch_ctx = nullptr; + { + std::scoped_lock locker(m_lock); + ceph_assert(m_state == STATE_REGISTERING); + + m_state = STATE_IDLE; + if (r < 0) { + derr << ": failed to register watch: " << cpp_strerror(r) << dendl; + m_watch_handle = 0; + } + + if (m_unregister_watch_ctx != nullptr) { + std::swap(unregister_watch_ctx, m_unregister_watch_ctx); + } else if (r == 0 && m_watch_error) { + derr << ": re-registering after watch error" << dendl; + m_state = STATE_REGISTERING; + watch_error = true; + } else { + m_watch_blacklisted = (r == -EBLACKLISTED); + } + } + + on_finish->complete(r); + if (unregister_watch_ctx != nullptr) { + unregister_watch_ctx->complete(0); + } else if (watch_error) { + rewatch(); + } +} + +void Watcher::unregister_watch(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + if (m_state != STATE_IDLE) { + dout(10) << ": delaying unregister -- watch register in progress" << dendl; + ceph_assert(m_unregister_watch_ctx == nullptr); + m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) { + unregister_watch(on_finish); + }); + return; + } else if (is_registered()) { + // watch is registered -- unwatch + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion(new C_UnwatchAndFlush(m_ioctx, on_finish), + &rados_callback); + int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp); + ceph_assert(r == 0); + aio_comp->release(); + m_watch_handle = 0; + m_watch_blacklisted = false; + return; + } + } + + on_finish->complete(0); +} + +void Watcher::handle_error(uint64_t handle, int err) { + derr << ": handle=" << handle << ": " << cpp_strerror(err) << dendl; + + std::scoped_lock locker(m_lock); + m_watch_error = true; + + if (is_registered()) { + m_state = STATE_REWATCHING; + if (err == -EBLACKLISTED) { + m_watch_blacklisted = true; + } + m_work_queue->queue(new LambdaContext([this] { + rewatch(); + }), 0); + } +} + +void Watcher::rewatch() { + dout(20) << dendl; + + Context *unregister_watch_ctx = nullptr; + { + std::unique_lock locker(m_lock); + ceph_assert(m_state == STATE_REWATCHING); + + if (m_unregister_watch_ctx != nullptr) { + m_state = STATE_IDLE; + std::swap(unregister_watch_ctx, m_unregister_watch_ctx); + } else { + m_watch_error = false; + Context *ctx = new C_CallbackAdapter(this); + auto req = RewatchRequest::create(m_ioctx, m_oid, m_lock, + &m_watch_ctx, &m_watch_handle, ctx); + req->send(); + return; + } + } + + unregister_watch_ctx->complete(0); +} + +void Watcher::handle_rewatch(int r) { + dout(20) << ": r=" << r << dendl; + + bool watch_error = false; + Context *unregister_watch_ctx = nullptr; + { + std::scoped_lock locker(m_lock); + ceph_assert(m_state == STATE_REWATCHING); + + m_watch_blacklisted = false; + if (m_unregister_watch_ctx != nullptr) { + dout(10) << ": skipping rewatch -- unregistering" << dendl; + m_state = STATE_IDLE; + std::swap(unregister_watch_ctx, m_unregister_watch_ctx); + } else if (r == -EBLACKLISTED) { + m_watch_blacklisted = true; + derr << ": client blacklisted" << dendl; + } else if (r == -ENOENT) { + dout(5) << ": object " << m_oid << " does not exist" << dendl; + } else if (r < 0) { + derr << ": failed to rewatch: " << cpp_strerror(r) << dendl; + watch_error = true; + } else if (m_watch_error) { + derr << ": re-registering watch after error" << dendl; + watch_error = true; + } + } + + if (unregister_watch_ctx != nullptr) { + unregister_watch_ctx->complete(0); + return; + } else if (watch_error) { + rewatch(); + return; + } + + Context *ctx = new C_CallbackAdapter(this); + m_work_queue->queue(ctx, r); +} + +void Watcher::handle_rewatch_callback(int r) { + dout(10) << ": r=" << r << dendl; + handle_rewatch_complete(r); + + bool watch_error = false; + Context *unregister_watch_ctx = nullptr; + { + std::scoped_lock locker(m_lock); + ceph_assert(m_state == STATE_REWATCHING); + + if (m_unregister_watch_ctx != nullptr) { + m_state = STATE_IDLE; + std::swap(unregister_watch_ctx, m_unregister_watch_ctx); + } else if (r == -EBLACKLISTED || r == -ENOENT) { + m_state = STATE_IDLE; + } else if (r < 0 || m_watch_error) { + watch_error = true; + } else { + m_state = STATE_IDLE; + } + } + + if (unregister_watch_ctx != nullptr) { + unregister_watch_ctx->complete(0); + } else if (watch_error) { + rewatch(); + } +} + +void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl) { + m_ioctx.notify_ack(m_oid, notify_id, handle, bl); +} + +void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) { + dout(20) << ": notify_id=" << notify_id << ", handle=" << handle + << ", notifier_id=" << notifier_id << dendl; + watcher.handle_notify(notify_id, handle, notifier_id, bl); +} + +void Watcher::WatchCtx::handle_error(uint64_t handle, int err) { + dout(20) << dendl; + watcher.handle_error(handle, err); +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/Watcher.h b/src/tools/cephfs_mirror/Watcher.h new file mode 100644 index 0000000000000..860bfc6f5a84f --- /dev/null +++ b/src/tools/cephfs_mirror/Watcher.h @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_WATCHER_H +#define CEPHFS_MIRROR_WATCHER_H + +#include + +#include "common/ceph_mutex.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" + +class ContextWQ; + +namespace cephfs { +namespace mirror { + +// generic watcher class -- establish watch on a given rados object +// and invoke handle_notify() when notified. On notify error, try +// to re-establish the watch. Errors during rewatch are notified via +// handle_rewatch_complete(). + +class Watcher { +public: + Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue); + virtual ~Watcher(); + + void register_watch(Context *on_finish); + void unregister_watch(Context *on_finish); + +protected: + std::string m_oid; + + void acknowledge_notify(uint64_t notify_if, uint64_t handle, bufferlist &bl); + + bool is_registered() const { + return m_state == STATE_IDLE && m_watch_handle != 0; + } + bool is_unregistered() const { + return m_state == STATE_IDLE && m_watch_handle == 0; + } + + virtual void handle_rewatch_complete(int r) { } + +private: + enum State { + STATE_IDLE, + STATE_REGISTERING, + STATE_REWATCHING + }; + + struct WatchCtx : public librados::WatchCtx2 { + Watcher &watcher; + + WatchCtx(Watcher &parent) : watcher(parent) {} + + void handle_notify(uint64_t notify_id, + uint64_t handle, + uint64_t notifier_id, + bufferlist& bl) override; + void handle_error(uint64_t handle, int err) override; + }; + + struct C_RegisterWatch : public Context { + Watcher *watcher; + Context *on_finish; + + C_RegisterWatch(Watcher *watcher, Context *on_finish) + : watcher(watcher), + on_finish(on_finish) { + } + + void finish(int r) override { + watcher->handle_register_watch(r, on_finish); + } + }; + + librados::IoCtx &m_ioctx; + ContextWQ *m_work_queue; + + mutable ceph::shared_mutex m_lock; + State m_state; + bool m_watch_error = false; + bool m_watch_blacklisted = false; + uint64_t m_watch_handle; + WatchCtx m_watch_ctx; + Context *m_unregister_watch_ctx = nullptr; + + virtual void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) = 0; + void handle_error(uint64_t handle, int err); + + void rewatch(); + void handle_rewatch(int r); + void handle_rewatch_callback(int r); + void handle_register_watch(int r, Context *on_finish); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_WATCHER_H diff --git a/src/tools/cephfs_mirror/aio_utils.h b/src/tools/cephfs_mirror/aio_utils.h new file mode 100644 index 0000000000000..43f356381ccf8 --- /dev/null +++ b/src/tools/cephfs_mirror/aio_utils.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_AIO_UTILS_H +#define CEPHFS_MIRROR_AIO_UTILS_H + +#include "include/rados/librados.hpp" + +namespace cephfs { +namespace mirror { + +template +void rados_callback(rados_completion_t c, void *arg) { + T *obj = reinterpret_cast(arg); + int r = rados_aio_get_return_value(c); + (obj->*MF)(r); +} + +template +class C_CallbackAdapter : public Context { + T *obj; +public: + C_CallbackAdapter(T *obj) + : obj(obj) { + } + +protected: + void finish(int r) override { + (obj->*MF)(r); + } +}; + +template +struct C_AsyncCallback : public Context { + WQ *op_work_queue; + Context *on_finish; + + C_AsyncCallback(WQ *op_work_queue, Context *on_finish) + : op_work_queue(op_work_queue), on_finish(on_finish) { + } + ~C_AsyncCallback() override { + delete on_finish; + } + void finish(int r) override { + op_work_queue->queue(on_finish, r); + on_finish = nullptr; + } +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_AIO_UTILS_H diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.cc b/src/tools/cephfs_mirror/watcher/RewatchRequest.cc new file mode 100644 index 0000000000000..5d12bfc4c24db --- /dev/null +++ b/src/tools/cephfs_mirror/watcher/RewatchRequest.cc @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_mutex.h" +#include "common/debug.h" +#include "common/errno.h" +#include "include/Context.h" +#include "tools/cephfs_mirror/aio_utils.h" +#include "RewatchRequest.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::watcher:RewatchRequest " << __func__ + +namespace cephfs { +namespace mirror { +namespace watcher { + +RewatchRequest::RewatchRequest(librados::IoCtx &ioctx, const std::string &oid, + ceph::shared_mutex &watch_lock, + librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish) + : m_ioctx(ioctx), m_oid(oid), m_lock(watch_lock), + m_watch_ctx(watch_ctx), m_watch_handle(watch_handle), + m_on_finish(on_finish) { +} + +void RewatchRequest::send() { + unwatch(); +} + +void RewatchRequest::unwatch() { + ceph_assert(ceph_mutex_is_wlocked(m_lock)); + if (*m_watch_handle == 0) { + rewatch(); + return; + } + + dout(10) << dendl; + + uint64_t watch_handle = 0; + std::swap(*m_watch_handle, watch_handle); + + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion( + this, &rados_callback); + int r = m_ioctx.aio_unwatch(watch_handle, aio_comp); + ceph_assert(r == 0); + aio_comp->release(); +} + +void RewatchRequest::handle_unwatch(int r) { + dout(20) << ": r=" << r << dendl; + + if (r == -EBLACKLISTED) { + derr << ": client blacklisted" << dendl; + finish(r); + return; + } else if (r < 0) { + derr << ": failed to unwatch: " << cpp_strerror(r) << dendl; + } + + rewatch(); +} + +void RewatchRequest::rewatch() { + dout(20) << dendl; + + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion( + this, &rados_callback); + int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx); + ceph_assert(r == 0); + aio_comp->release(); +} + +void RewatchRequest::handle_rewatch(int r) { + dout(20) << ": r=" << r << dendl; + + if (r < 0) { + derr << ": failed to watch object: " << cpp_strerror(r) << dendl; + m_rewatch_handle = 0; + } + + { + std::unique_lock locker(m_lock); + *m_watch_handle = m_rewatch_handle; + } + + finish(r); +} + +void RewatchRequest::finish(int r) { + dout(20) << ": r=" << r << dendl; + m_on_finish->complete(r); + delete this; +} + +} // namespace watcher +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.h b/src/tools/cephfs_mirror/watcher/RewatchRequest.h new file mode 100644 index 0000000000000..453fcb2195710 --- /dev/null +++ b/src/tools/cephfs_mirror/watcher/RewatchRequest.h @@ -0,0 +1,60 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H +#define CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H + +#include "common/ceph_mutex.h" +#include "include/int_types.h" +#include "include/rados/librados.hpp" + +struct Context; + +namespace cephfs { +namespace mirror { +namespace watcher { + +// Rewatch an existing watch -- the watch can be in an operatioal +// or error state. + +class RewatchRequest { +public: + + static RewatchRequest *create(librados::IoCtx &ioctx, const std::string &oid, + ceph::shared_mutex &watch_lock, + librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish) { + return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle, + on_finish); + } + + RewatchRequest(librados::IoCtx &ioctx, const std::string &oid, + ceph::shared_mutex &watch_lock, librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish); + + void send(); + +private: + librados::IoCtx& m_ioctx; + std::string m_oid; + ceph::shared_mutex &m_lock; + librados::WatchCtx2 *m_watch_ctx; + uint64_t *m_watch_handle; + Context *m_on_finish; + + uint64_t m_rewatch_handle = 0; + + void unwatch(); + void handle_unwatch(int r); + + void rewatch(); + void handle_rewatch(int r); + + void finish(int r); +}; + +} // namespace watcher +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H -- 2.39.5