]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: InstanceWatcher class to register mirror instance
authorVenky Shankar <vshankar@redhat.com>
Thu, 18 Jun 2020 05:30:55 +0000 (01:30 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 1 Sep 2020 10:58:10 +0000 (06:58 -0400)
... and watch for notifications via instance object.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/tools/cephfs_mirror/InstanceWatcher.cc [new file with mode: 0644]
src/tools/cephfs_mirror/InstanceWatcher.h [new file with mode: 0644]

diff --git a/src/tools/cephfs_mirror/InstanceWatcher.cc b/src/tools/cephfs_mirror/InstanceWatcher.cc
new file mode 100644 (file)
index 0000000..4596982
--- /dev/null
@@ -0,0 +1,235 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/ceph_json.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "cls/cephfs/cls_cephfs_client.h"
+#include "include/stringify.h"
+#include "aio_utils.h"
+#include "InstanceWatcher.h"
+#include "Types.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::InstanceWatcher " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+std::string instance_oid(const std::string &instance_id) {
+  return CEPHFS_MIRROR_OBJECT + "." + instance_id;
+}
+
+} // anonymous namespace
+
+InstanceWatcher::InstanceWatcher(librados::IoCtx &ioctx,
+                                 Listener &listener, ContextWQ *work_queue)
+  : Watcher(ioctx, instance_oid(stringify(ioctx.get_instance_id())), work_queue),
+    m_ioctx(ioctx),
+    m_listener(listener),
+    m_work_queue(work_queue),
+    m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) {
+}
+
+InstanceWatcher::~InstanceWatcher() {
+}
+
+void InstanceWatcher::init(Context *on_finish) {
+  dout(20) << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(m_on_init_finish == nullptr);
+    m_on_init_finish = new LambdaContext([this, on_finish](int r) {
+                                           on_finish->complete(r);
+                                           if (m_on_shutdown_finish != nullptr) {
+                                             m_on_shutdown_finish->complete(0);
+                                           }
+                                         });
+  }
+
+  create_instance();
+}
+
+void InstanceWatcher::shutdown(Context *on_finish) {
+  dout(20) << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(m_on_shutdown_finish == nullptr);
+    if (m_on_init_finish != nullptr) {
+      dout(10) << ": delaying shutdown -- init in progress" << dendl;
+      m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
+                                                 m_on_shutdown_finish = nullptr;
+                                                 shutdown(on_finish);
+                                               });
+      return;
+    }
+
+    m_on_shutdown_finish = on_finish;
+  }
+
+  unregister_watcher();
+}
+
+void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
+                                    uint64_t notifier_id, bufferlist& bl) {
+  dout(20) << dendl;
+
+  std::string dir_name;
+  std::string mode;
+  try {
+    JSONDecoder jd(bl);
+    JSONDecoder::decode_json("dir_name", dir_name, &jd.parser, true);
+    JSONDecoder::decode_json("mode", mode, &jd.parser, true);
+  } catch (const JSONDecoder::err &e) {
+    derr << ": failed to decode notify json: " << e.what() << dendl;
+  }
+
+  dout(20) << ": notifier_id=" << notifier_id << ", dir_name=" << dir_name
+           << ", mode=" << mode << dendl;
+
+  if (mode == "acquire") {
+    m_listener.acquire_directory(dir_name);
+  } else if (mode == "release") {
+    m_listener.release_directory(dir_name);
+  } else {
+    derr << ": unknown mode" << dendl;
+  }
+
+  bufferlist outbl;
+  acknowledge_notify(notify_id, handle, outbl);
+}
+
+void InstanceWatcher::create_instance() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  librados::ObjectWriteOperation op;
+  op.create(false);
+
+  librados::AioCompletion *aio_comp =
+    librados::Rados::aio_create_completion(
+      this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_create_instance>);
+  int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+void InstanceWatcher::handle_create_instance(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  Context *on_init_finish = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    if (r < 0) {
+      std::swap(on_init_finish, m_on_init_finish);
+    }
+  }
+
+  if (on_init_finish != nullptr) {
+    on_init_finish->complete(r);
+    return;
+  }
+
+  register_watcher();
+}
+
+void InstanceWatcher::register_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  Context *on_finish = new C_CallbackAdapter<
+    InstanceWatcher, &InstanceWatcher::handle_register_watcher>(this);
+  register_watch(on_finish);
+}
+
+void InstanceWatcher::handle_register_watcher(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  Context *on_init_finish = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    if (r == 0) {
+      std::swap(on_init_finish, m_on_init_finish);
+    }
+  }
+
+  if (on_init_finish != nullptr) {
+    on_init_finish->complete(r);
+    return;
+  }
+
+  remove_instance();
+}
+
+void InstanceWatcher::unregister_watcher() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  Context *on_finish = new C_CallbackAdapter<
+    InstanceWatcher, &InstanceWatcher::handle_unregister_watcher>(this);
+  unregister_watch(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
+}
+
+void InstanceWatcher::handle_unregister_watcher(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  Context *on_shutdown_finish = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    if (r < 0) {
+      std::swap(on_shutdown_finish, m_on_shutdown_finish);
+    }
+  }
+
+  if (on_shutdown_finish != nullptr) {
+    on_shutdown_finish->complete(r);
+    return;
+  }
+
+  remove_instance();
+}
+
+void InstanceWatcher::remove_instance() {
+  dout(20) << dendl;
+
+  std::scoped_lock locker(m_lock);
+  librados::ObjectWriteOperation op;
+  op.remove();
+
+  librados::AioCompletion *aio_comp =
+    librados::Rados::aio_create_completion(
+      this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_remove_instance>);
+  int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+  ceph_assert(r == 0);
+  aio_comp->release();
+}
+
+void InstanceWatcher::handle_remove_instance(int r) {
+  dout(20) << ": r=" << r << dendl;
+
+  Context *on_init_finish = nullptr;
+  Context *on_shutdown_finish = nullptr;
+  {
+    std::scoped_lock locker(m_lock);
+    std::swap(on_init_finish, m_on_init_finish);
+    std::swap(on_shutdown_finish, m_on_shutdown_finish);
+  }
+
+  if (on_init_finish != nullptr) {
+    on_init_finish->complete(r);
+  }
+  if (on_shutdown_finish != nullptr) {
+    on_shutdown_finish->complete(r);
+  }
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/InstanceWatcher.h b/src/tools/cephfs_mirror/InstanceWatcher.h
new file mode 100644 (file)
index 0000000..116ecaf
--- /dev/null
@@ -0,0 +1,71 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_INSTANCE_WATCHER_H
+#define CEPHFS_MIRROR_INSTANCE_WATCHER_H
+
+#include <string_view>
+
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "Watcher.h"
+
+class ContextWQ;
+
+namespace cephfs {
+namespace mirror {
+
+// watch directory update notifications via per daemon rados
+// object and invoke listener callback.
+
+class InstanceWatcher : public Watcher {
+public:
+  struct Listener {
+    virtual ~Listener() {
+    }
+
+    virtual void acquire_directory(string_view dir_name) = 0;
+    virtual void release_directory(string_view dir_name) = 0;
+  };
+
+  static InstanceWatcher *create(librados::IoCtx &ioctx,
+                                 Listener &listener, ContextWQ *work_queue) {
+    return new InstanceWatcher(ioctx, listener, work_queue);
+  }
+
+  InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ContextWQ *work_queue);
+  ~InstanceWatcher();
+
+  void init(Context *on_finish);
+  void shutdown(Context *on_finish);
+
+  void handle_notify(uint64_t notify_id, uint64_t handle,
+                     uint64_t notifier_id, bufferlist& bl) override;
+
+private:
+  librados::IoCtx &m_ioctx;
+  Listener &m_listener;
+  ContextWQ *m_work_queue;
+
+  ceph::mutex m_lock;
+  Context *m_on_init_finish = nullptr;
+  Context *m_on_shutdown_finish = nullptr;
+
+  void create_instance();
+  void handle_create_instance(int r);
+
+  void register_watcher();
+  void handle_register_watcher(int r);
+
+  void remove_instance();
+  void handle_remove_instance(int r);
+
+  void unregister_watcher();
+  void handle_unregister_watcher(int r);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_INSTANCE_WATCHER_H