From: Venky Shankar Date: Thu, 18 Jun 2020 05:30:55 +0000 (-0400) Subject: cephfs-mirror: InstanceWatcher class to register mirror instance X-Git-Tag: v16.1.0~1247^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c40e5a0478328a99047e52a6f42f582d86cb8ce8;p=ceph.git cephfs-mirror: InstanceWatcher class to register mirror instance ... and watch for notifications via instance object. Signed-off-by: Venky Shankar --- diff --git a/src/tools/cephfs_mirror/InstanceWatcher.cc b/src/tools/cephfs_mirror/InstanceWatcher.cc new file mode 100644 index 000000000000..459698266f10 --- /dev/null +++ b/src/tools/cephfs_mirror/InstanceWatcher.cc @@ -0,0 +1,235 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_context.h" +#include "common/ceph_json.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "cls/cephfs/cls_cephfs_client.h" +#include "include/stringify.h" +#include "aio_utils.h" +#include "InstanceWatcher.h" +#include "Types.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_cephfs_mirror +#undef dout_prefix +#define dout_prefix *_dout << "cephfs::mirror::InstanceWatcher " << __func__ + +namespace cephfs { +namespace mirror { + +namespace { + +std::string instance_oid(const std::string &instance_id) { + return CEPHFS_MIRROR_OBJECT + "." + instance_id; +} + +} // anonymous namespace + +InstanceWatcher::InstanceWatcher(librados::IoCtx &ioctx, + Listener &listener, ContextWQ *work_queue) + : Watcher(ioctx, instance_oid(stringify(ioctx.get_instance_id())), work_queue), + m_ioctx(ioctx), + m_listener(listener), + m_work_queue(work_queue), + m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) { +} + +InstanceWatcher::~InstanceWatcher() { +} + +void InstanceWatcher::init(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + ceph_assert(m_on_init_finish == nullptr); + m_on_init_finish = new LambdaContext([this, on_finish](int r) { + on_finish->complete(r); + if (m_on_shutdown_finish != nullptr) { + m_on_shutdown_finish->complete(0); + } + }); + } + + create_instance(); +} + +void InstanceWatcher::shutdown(Context *on_finish) { + dout(20) << dendl; + + { + std::scoped_lock locker(m_lock); + ceph_assert(m_on_shutdown_finish == nullptr); + if (m_on_init_finish != nullptr) { + dout(10) << ": delaying shutdown -- init in progress" << dendl; + m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) { + m_on_shutdown_finish = nullptr; + shutdown(on_finish); + }); + return; + } + + m_on_shutdown_finish = on_finish; + } + + unregister_watcher(); +} + +void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) { + dout(20) << dendl; + + std::string dir_name; + std::string mode; + try { + JSONDecoder jd(bl); + JSONDecoder::decode_json("dir_name", dir_name, &jd.parser, true); + JSONDecoder::decode_json("mode", mode, &jd.parser, true); + } catch (const JSONDecoder::err &e) { + derr << ": failed to decode notify json: " << e.what() << dendl; + } + + dout(20) << ": notifier_id=" << notifier_id << ", dir_name=" << dir_name + << ", mode=" << mode << dendl; + + if (mode == "acquire") { + m_listener.acquire_directory(dir_name); + } else if (mode == "release") { + m_listener.release_directory(dir_name); + } else { + derr << ": unknown mode" << dendl; + } + + bufferlist outbl; + acknowledge_notify(notify_id, handle, outbl); +} + +void InstanceWatcher::create_instance() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + librados::ObjectWriteOperation op; + op.create(false); + + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion( + this, &rados_callback); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + ceph_assert(r == 0); + aio_comp->release(); +} + +void InstanceWatcher::handle_create_instance(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r < 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + register_watcher(); +} + +void InstanceWatcher::register_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *on_finish = new C_CallbackAdapter< + InstanceWatcher, &InstanceWatcher::handle_register_watcher>(this); + register_watch(on_finish); +} + +void InstanceWatcher::handle_register_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r == 0) { + std::swap(on_init_finish, m_on_init_finish); + } + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + return; + } + + remove_instance(); +} + +void InstanceWatcher::unregister_watcher() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + Context *on_finish = new C_CallbackAdapter< + InstanceWatcher, &InstanceWatcher::handle_unregister_watcher>(this); + unregister_watch(new C_AsyncCallback(m_work_queue, on_finish)); +} + +void InstanceWatcher::handle_unregister_watcher(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_shutdown_finish = nullptr; + { + std::scoped_lock locker(m_lock); + if (r < 0) { + std::swap(on_shutdown_finish, m_on_shutdown_finish); + } + } + + if (on_shutdown_finish != nullptr) { + on_shutdown_finish->complete(r); + return; + } + + remove_instance(); +} + +void InstanceWatcher::remove_instance() { + dout(20) << dendl; + + std::scoped_lock locker(m_lock); + librados::ObjectWriteOperation op; + op.remove(); + + librados::AioCompletion *aio_comp = + librados::Rados::aio_create_completion( + this, &rados_callback); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + ceph_assert(r == 0); + aio_comp->release(); +} + +void InstanceWatcher::handle_remove_instance(int r) { + dout(20) << ": r=" << r << dendl; + + Context *on_init_finish = nullptr; + Context *on_shutdown_finish = nullptr; + { + std::scoped_lock locker(m_lock); + std::swap(on_init_finish, m_on_init_finish); + std::swap(on_shutdown_finish, m_on_shutdown_finish); + } + + if (on_init_finish != nullptr) { + on_init_finish->complete(r); + } + if (on_shutdown_finish != nullptr) { + on_shutdown_finish->complete(r); + } +} + +} // namespace mirror +} // namespace cephfs diff --git a/src/tools/cephfs_mirror/InstanceWatcher.h b/src/tools/cephfs_mirror/InstanceWatcher.h new file mode 100644 index 000000000000..116ecafda053 --- /dev/null +++ b/src/tools/cephfs_mirror/InstanceWatcher.h @@ -0,0 +1,71 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPHFS_MIRROR_INSTANCE_WATCHER_H +#define CEPHFS_MIRROR_INSTANCE_WATCHER_H + +#include + +#include "common/ceph_mutex.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "Watcher.h" + +class ContextWQ; + +namespace cephfs { +namespace mirror { + +// watch directory update notifications via per daemon rados +// object and invoke listener callback. + +class InstanceWatcher : public Watcher { +public: + struct Listener { + virtual ~Listener() { + } + + virtual void acquire_directory(string_view dir_name) = 0; + virtual void release_directory(string_view dir_name) = 0; + }; + + static InstanceWatcher *create(librados::IoCtx &ioctx, + Listener &listener, ContextWQ *work_queue) { + return new InstanceWatcher(ioctx, listener, work_queue); + } + + InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ContextWQ *work_queue); + ~InstanceWatcher(); + + void init(Context *on_finish); + void shutdown(Context *on_finish); + + void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) override; + +private: + librados::IoCtx &m_ioctx; + Listener &m_listener; + ContextWQ *m_work_queue; + + ceph::mutex m_lock; + Context *m_on_init_finish = nullptr; + Context *m_on_shutdown_finish = nullptr; + + void create_instance(); + void handle_create_instance(int r); + + void register_watcher(); + void handle_register_watcher(int r); + + void remove_instance(); + void handle_remove_instance(int r); + + void unregister_watcher(); + void handle_unregister_watcher(int r); +}; + +} // namespace mirror +} // namespace cephfs + +#endif // CEPHFS_MIRROR_INSTANCE_WATCHER_H