]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: Watcher class to receive/ack watch notifications
authorVenky Shankar <vshankar@redhat.com>
Thu, 18 Jun 2020 04:55:57 +0000 (00:55 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 1 Sep 2020 10:58:10 +0000 (06:58 -0400)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/cephfs_mirror/Watcher.cc [new file with mode: 0644]
src/tools/cephfs_mirror/Watcher.h [new file with mode: 0644]
src/tools/cephfs_mirror/aio_utils.h [new file with mode: 0644]
src/tools/cephfs_mirror/watcher/RewatchRequest.cc [new file with mode: 0644]
src/tools/cephfs_mirror/watcher/RewatchRequest.h [new file with mode: 0644]

diff --git a/src/tools/cephfs_mirror/Watcher.cc b/src/tools/cephfs_mirror/Watcher.cc
new file mode 100644 (file)
index 0000000..a228d91
--- /dev/null
@@ -0,0 +1,285 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "include/stringify.h"
+#include "aio_utils.h"
+#include "watcher/RewatchRequest.h"
+#include "Watcher.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::Watcher " << __func__
+
+using cephfs::mirror::watcher::RewatchRequest;
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+struct C_UnwatchAndFlush : public Context {
+  librados::Rados rados;
+  Context *on_finish;
+  bool flushing = false;
+  int ret_val = 0;
+
+  C_UnwatchAndFlush(librados::IoCtx &ioctx, Context *on_finish)
+    : rados(ioctx), on_finish(on_finish) {
+  }
+
+  void complete(int r) override {
+    if (ret_val == 0 && r < 0) {
+      ret_val = r;
+    }
+
+    if (!flushing) {
+      flushing = true;
+
+      librados::AioCompletion *aio_comp =
+        librados::Rados::aio_create_completion(
+          this, &rados_callback<Context, &Context::complete>);
+      r = rados.aio_watch_flush(aio_comp);
+
+      ceph_assert(r == 0);
+      aio_comp->release();
+      return;
+    }
+
+    // ensure our reference to the RadosClient is released prior
+    // to completing the callback to avoid racing an explicit
+    // librados shutdown
+    Context *ctx = on_finish;
+    r = ret_val;
+    delete this;
+
+    ctx->complete(r);
+  }
+
+  void finish(int r) override {
+  }
+};
+
+} // anonymous namespace
+
+Watcher::Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue)
+  : m_oid(oid),
+    m_ioctx(ioctx),
+    m_work_queue(work_queue),
+    m_lock(ceph::make_shared_mutex("cephfs::mirror::snap_watcher")),
+    m_state(STATE_IDLE),
+    m_watch_ctx(*this) {
+}
+
+Watcher::~Watcher() {
+}
+
+void Watcher::register_watch(Context *on_finish) {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_state = STATE_REGISTERING;
+
+  on_finish = new C_RegisterWatch(this, on_finish);
+  librados::AioCompletion *aio_comp =
+    librados::Rados::aio_create_completion(on_finish, &rados_callback<Context, &Context::complete>);
+  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+void Watcher::handle_register_watch(int r, Context *on_finish) {
+  dout(20) << ": r=" << r << dendl;
+
+  bool watch_error = false;
+  Context *unregister_watch_ctx = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(m_state == STATE_REGISTERING);
+
+    m_state = STATE_IDLE;
+    if (r < 0) {
+      derr << ": failed to register watch: " << cpp_strerror(r) << dendl;
+      m_watch_handle = 0;
+    }
+
+    if (m_unregister_watch_ctx != nullptr) {
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else if (r == 0 && m_watch_error) {
+      derr << ": re-registering after watch error" << dendl;
+      m_state = STATE_REGISTERING;
+      watch_error = true;
+    } else {
+      m_watch_blacklisted = (r == -EBLACKLISTED);
+    }
+  }
+
+  on_finish->complete(r);
+  if (unregister_watch_ctx != nullptr) {
+    unregister_watch_ctx->complete(0);
+  } else if (watch_error) {
+    rewatch();
+  }
+}
+
+void Watcher::unregister_watch(Context *on_finish) {
+  dout(20) << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    if (m_state != STATE_IDLE) {
+      dout(10) << ": delaying unregister -- watch register in progress" << dendl;
+      ceph_assert(m_unregister_watch_ctx == nullptr);
+      m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
+                                                   unregister_watch(on_finish);
+                                                 });
+      return;
+    } else if (is_registered()) {
+      // watch is registered -- unwatch
+      librados::AioCompletion *aio_comp =
+        librados::Rados::aio_create_completion(new C_UnwatchAndFlush(m_ioctx, on_finish),
+                                               &rados_callback<Context, &Context::complete>);
+      int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
+      ceph_assert(r == 0);
+      aio_comp->release();
+      m_watch_handle = 0;
+      m_watch_blacklisted = false;
+      return;
+    }
+  }
+
+  on_finish->complete(0);
+}
+
+void Watcher::handle_error(uint64_t handle, int err) {
+  derr << ": handle=" << handle << ": " << cpp_strerror(err) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_watch_error = true;
+
+  if (is_registered()) {
+    m_state = STATE_REWATCHING;
+    if (err == -EBLACKLISTED) {
+      m_watch_blacklisted = true;
+    }
+    m_work_queue->queue(new LambdaContext([this] {
+                                            rewatch();
+                                          }), 0);
+  }
+}
+
+void Watcher::rewatch() {
+  dout(20) << dendl;
+
+  Context *unregister_watch_ctx = nullptr;
+  {
+    std::unique_lock locker(m_lock);
+    ceph_assert(m_state == STATE_REWATCHING);
+
+    if (m_unregister_watch_ctx != nullptr) {
+      m_state = STATE_IDLE;
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else {
+      m_watch_error = false;
+      Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch>(this);
+      auto req = RewatchRequest::create(m_ioctx, m_oid, m_lock,
+                                        &m_watch_ctx, &m_watch_handle, ctx);
+      req->send();
+      return;
+    }
+  }
+
+  unregister_watch_ctx->complete(0);
+}
+
+void Watcher::handle_rewatch(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  bool watch_error = false;
+  Context *unregister_watch_ctx = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(m_state == STATE_REWATCHING);
+
+    m_watch_blacklisted = false;
+    if (m_unregister_watch_ctx != nullptr) {
+      dout(10) << ": skipping rewatch -- unregistering" << dendl;
+      m_state = STATE_IDLE;
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else if (r == -EBLACKLISTED) {
+      m_watch_blacklisted = true;
+      derr << ": client blacklisted" << dendl;
+    } else if (r == -ENOENT) {
+      dout(5) << ": object " << m_oid << " does not exist" << dendl;
+    } else if  (r < 0) {
+      derr << ": failed to rewatch: " << cpp_strerror(r) << dendl;
+      watch_error = true;
+    } else if (m_watch_error) {
+      derr << ": re-registering watch after error" << dendl;
+      watch_error = true;
+    }
+  }
+
+  if (unregister_watch_ctx != nullptr) {
+    unregister_watch_ctx->complete(0);
+    return;
+  } else if (watch_error) {
+    rewatch();
+    return;
+  }
+
+  Context *ctx = new C_CallbackAdapter<Watcher, &Watcher::handle_rewatch_callback>(this);
+  m_work_queue->queue(ctx, r);
+}
+
+void Watcher::handle_rewatch_callback(int r) {
+  dout(10) << ": r=" << r << dendl;
+  handle_rewatch_complete(r);
+
+  bool watch_error = false;
+  Context *unregister_watch_ctx = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(m_state == STATE_REWATCHING);
+
+    if (m_unregister_watch_ctx != nullptr) {
+      m_state = STATE_IDLE;
+      std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
+    } else if (r == -EBLACKLISTED || r == -ENOENT) {
+      m_state = STATE_IDLE;
+    } else if (r < 0 || m_watch_error) {
+      watch_error = true;
+    } else {
+      m_state = STATE_IDLE;
+    }
+  }
+
+  if (unregister_watch_ctx != nullptr) {
+    unregister_watch_ctx->complete(0);
+  } else if (watch_error) {
+    rewatch();
+  }
+}
+
+void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl) {
+  m_ioctx.notify_ack(m_oid, notify_id, handle, bl);
+}
+
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
+                                        uint64_t notifier_id, bufferlist& bl) {
+  dout(20) << ": notify_id=" << notify_id << ", handle=" << handle
+           << ", notifier_id=" << notifier_id << dendl;
+  watcher.handle_notify(notify_id, handle, notifier_id, bl);
+}
+
+void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
+  dout(20) << dendl;
+  watcher.handle_error(handle, err);
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/Watcher.h b/src/tools/cephfs_mirror/Watcher.h
new file mode 100644 (file)
index 0000000..860bfc6
--- /dev/null
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_WATCHER_H
+#define CEPHFS_MIRROR_WATCHER_H
+
+#include <string_view>
+
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// generic watcher class -- establish watch on a given rados object
+// and invoke handle_notify() when notified. On notify error, try
+// to re-establish the watch. Errors during rewatch are notified via
+// handle_rewatch_complete().
+
+class Watcher {
+public:
+  Watcher(librados::IoCtx &ioctx, std::string_view oid, ContextWQ *work_queue);
+  virtual ~Watcher();
+
+  void register_watch(Context *on_finish);
+  void unregister_watch(Context *on_finish);
+
+protected:
+  std::string m_oid;
+
+  void acknowledge_notify(uint64_t notify_if, uint64_t handle, bufferlist &bl);
+
+  bool is_registered() const {
+    return m_state == STATE_IDLE && m_watch_handle != 0;
+  }
+  bool is_unregistered() const {
+    return m_state == STATE_IDLE && m_watch_handle == 0;
+  }
+
+  virtual void handle_rewatch_complete(int r) { }
+
+private:
+  enum State {
+    STATE_IDLE,
+    STATE_REGISTERING,
+    STATE_REWATCHING
+  };
+
+  struct WatchCtx : public librados::WatchCtx2 {
+    Watcher &watcher;
+
+    WatchCtx(Watcher &parent) : watcher(parent) {}
+
+    void handle_notify(uint64_t notify_id,
+                       uint64_t handle,
+                       uint64_t notifier_id,
+                       bufferlist& bl) override;
+    void handle_error(uint64_t handle, int err) override;
+  };
+
+  struct C_RegisterWatch : public Context {
+    Watcher *watcher;
+    Context *on_finish;
+
+    C_RegisterWatch(Watcher *watcher, Context *on_finish)
+      : watcher(watcher),
+        on_finish(on_finish) {
+    }
+
+    void finish(int r) override {
+      watcher->handle_register_watch(r, on_finish);
+    }
+  };
+
+  librados::IoCtx &m_ioctx;
+  ContextWQ *m_work_queue;
+
+  mutable ceph::shared_mutex m_lock;
+  State m_state;
+  bool m_watch_error = false;
+  bool m_watch_blacklisted = false;
+  uint64_t m_watch_handle;
+  WatchCtx m_watch_ctx;
+  Context *m_unregister_watch_ctx = nullptr;
+
+  virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+                     uint64_t notifier_id, bufferlist& bl) = 0;
+  void handle_error(uint64_t handle, int err);
+
+  void rewatch();
+  void handle_rewatch(int r);
+  void handle_rewatch_callback(int r);
+  void handle_register_watch(int r, Context *on_finish);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_WATCHER_H
diff --git a/src/tools/cephfs_mirror/aio_utils.h b/src/tools/cephfs_mirror/aio_utils.h
new file mode 100644 (file)
index 0000000..43f3563
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_AIO_UTILS_H
+#define CEPHFS_MIRROR_AIO_UTILS_H
+
+#include "include/rados/librados.hpp"
+
+namespace cephfs {
+namespace mirror {
+
+template <typename T, void(T::*MF)(int)>
+void rados_callback(rados_completion_t c, void *arg) {
+  T *obj = reinterpret_cast<T*>(arg);
+  int r = rados_aio_get_return_value(c);
+  (obj->*MF)(r);
+}
+
+template <typename T, void (T::*MF)(int)>
+class C_CallbackAdapter : public Context {
+  T *obj;
+public:
+  C_CallbackAdapter(T *obj)
+    : obj(obj) {
+  }
+
+protected:
+  void finish(int r) override {
+    (obj->*MF)(r);
+  }
+};
+
+template <typename WQ>
+struct C_AsyncCallback : public Context {
+  WQ *op_work_queue;
+  Context *on_finish;
+
+  C_AsyncCallback(WQ *op_work_queue, Context *on_finish)
+    : op_work_queue(op_work_queue), on_finish(on_finish) {
+  }
+  ~C_AsyncCallback() override {
+    delete on_finish;
+  }
+  void finish(int r) override {
+    op_work_queue->queue(on_finish, r);
+    on_finish = nullptr;
+  }
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_AIO_UTILS_H
diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.cc b/src/tools/cephfs_mirror/watcher/RewatchRequest.cc
new file mode 100644 (file)
index 0000000..5d12bfc
--- /dev/null
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_mutex.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "include/Context.h"
+#include "tools/cephfs_mirror/aio_utils.h"
+#include "RewatchRequest.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::watcher:RewatchRequest " << __func__
+
+namespace cephfs {
+namespace mirror {
+namespace watcher {
+
+RewatchRequest::RewatchRequest(librados::IoCtx &ioctx, const std::string &oid,
+                               ceph::shared_mutex &watch_lock,
+                               librados::WatchCtx2 *watch_ctx,
+                               uint64_t *watch_handle, Context *on_finish)
+  : m_ioctx(ioctx), m_oid(oid), m_lock(watch_lock),
+    m_watch_ctx(watch_ctx), m_watch_handle(watch_handle),
+    m_on_finish(on_finish) {
+}
+
+void RewatchRequest::send() {
+  unwatch();
+}
+
+void RewatchRequest::unwatch() {
+  ceph_assert(ceph_mutex_is_wlocked(m_lock));
+  if (*m_watch_handle == 0) {
+    rewatch();
+    return;
+  }
+
+  dout(10) << dendl;
+
+  uint64_t watch_handle = 0;
+  std::swap(*m_watch_handle, watch_handle);
+
+  librados::AioCompletion *aio_comp =
+    librados::Rados::aio_create_completion(
+      this, &rados_callback<RewatchRequest, &RewatchRequest::handle_unwatch>);
+  int r = m_ioctx.aio_unwatch(watch_handle, aio_comp);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+void RewatchRequest::handle_unwatch(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  if (r == -EBLACKLISTED) {
+    derr << ": client blacklisted" << dendl;
+    finish(r);
+    return;
+  } else if (r < 0) {
+    derr << ": failed to unwatch: " << cpp_strerror(r) << dendl;
+  }
+
+  rewatch();
+}
+
+void RewatchRequest::rewatch() {
+  dout(20) << dendl;
+
+  librados::AioCompletion *aio_comp =
+    librados::Rados::aio_create_completion(
+      this, &rados_callback<RewatchRequest, &RewatchRequest::handle_rewatch>);
+  int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+void RewatchRequest::handle_rewatch(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  if (r < 0) {
+    derr << ": failed to watch object: " << cpp_strerror(r) << dendl;
+    m_rewatch_handle = 0;
+  }
+
+  {
+    std::unique_lock locker(m_lock);
+    *m_watch_handle = m_rewatch_handle;
+  }
+
+  finish(r);
+}
+
+void RewatchRequest::finish(int r) {
+  dout(20) << ": r=" << r << dendl;
+  m_on_finish->complete(r);
+  delete this;
+}
+
+} // namespace watcher
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/watcher/RewatchRequest.h b/src/tools/cephfs_mirror/watcher/RewatchRequest.h
new file mode 100644 (file)
index 0000000..453fcb2
--- /dev/null
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H
+#define CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H
+
+#include "common/ceph_mutex.h"
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+
+struct Context;
+
+namespace cephfs {
+namespace mirror {
+namespace watcher {
+
+// Rewatch an existing watch -- the watch can be in an operatioal
+// or error state.
+
+class RewatchRequest {
+public:
+
+  static RewatchRequest *create(librados::IoCtx &ioctx, const std::string &oid,
+                                ceph::shared_mutex &watch_lock,
+                                librados::WatchCtx2 *watch_ctx,
+                                uint64_t *watch_handle, Context *on_finish) {
+    return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle,
+                              on_finish);
+  }
+
+  RewatchRequest(librados::IoCtx &ioctx, const std::string &oid,
+                 ceph::shared_mutex &watch_lock, librados::WatchCtx2 *watch_ctx,
+                 uint64_t *watch_handle, Context *on_finish);
+
+  void send();
+
+private:
+  librados::IoCtx& m_ioctx;
+  std::string m_oid;
+  ceph::shared_mutex &m_lock;
+  librados::WatchCtx2 *m_watch_ctx;
+  uint64_t *m_watch_handle;
+  Context *m_on_finish;
+
+  uint64_t m_rewatch_handle = 0;
+
+  void unwatch();
+  void handle_unwatch(int r);
+
+  void rewatch();
+  void handle_rewatch(int r);
+
+  void finish(int r);
+};
+
+} // namespace watcher
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_WATCHER_REWATCH_REQUEST_H