From: Ricardo Dias Date: Fri, 4 Nov 2016 02:22:59 +0000 (+0000) Subject: rbd: Implementation of a generic object watcher class X-Git-Tag: v12.0.0~225^2~2^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1eb2ebcce27ff2d88c2dafa9f96d013531ffb9f1;p=ceph.git rbd: Implementation of a generic object watcher class Signed-off-by: Ricardo Dias --- diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt index 10f57cd73c69..554544e36349 100644 --- a/src/librbd/CMakeLists.txt +++ b/src/librbd/CMakeLists.txt @@ -29,6 +29,9 @@ set(librbd_internal_srcs Utils.cc cache/ImageWriteback.cc cache/PassthroughImageCache.cc + Watcher.cc + watcher/Types.cc + watcher/RewatchRequest.cc exclusive_lock/AcquireRequest.cc exclusive_lock/AutomaticPolicy.cc exclusive_lock/ReacquireRequest.cc @@ -64,7 +67,7 @@ set(librbd_internal_srcs object_map/SnapshotRollbackRequest.cc object_map/UnlockRequest.cc object_map/UpdateRequest.cc - object_watcher/Notifier.cc + watcher/Notifier.cc operation/DisableFeaturesRequest.cc operation/EnableFeaturesRequest.cc operation/FlattenRequest.cc diff --git a/src/librbd/Operations.cc b/src/librbd/Operations.cc index 3d4dd0d479f5..b83e1c6f4e58 100644 --- a/src/librbd/Operations.cc +++ b/src/librbd/Operations.cc @@ -578,6 +578,9 @@ void Operations::execute_rename(const std::string &dest_name, // unregister watch before and register back after rename on_finish = new C_NotifyUpdate(m_image_ctx, on_finish); on_finish = new FunctionContext([this, on_finish](int r) { + if (m_image_ctx.old_format) { + m_image_ctx.image_watcher->set_oid(m_image_ctx.header_oid); + } m_image_ctx.image_watcher->register_watch(on_finish); }); on_finish = new FunctionContext([this, dest_name, on_finish](int r) { diff --git a/src/librbd/Watcher.cc b/src/librbd/Watcher.cc new file mode 100644 index 000000000000..f145317584b6 --- /dev/null +++ b/src/librbd/Watcher.cc @@ -0,0 +1,228 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/Watcher.h" +#include "librbd/watcher/RewatchRequest.h" +#include "librbd/Utils.h" +#include "librbd/TaskFinisher.h" +#include "include/encoding.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::Watcher: " + +namespace librbd { + +using namespace watcher; + +using util::create_context_callback; +using util::create_rados_safe_callback; +using std::string; + +namespace { + +struct C_UnwatchAndFlush : public Context { + librados::Rados rados; + Context *on_finish; + bool flushing = false; + int ret_val = 0; + + C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish) + : rados(io_ctx), on_finish(on_finish) { + } + + virtual void complete(int r) override { + if (ret_val == 0 && r < 0) { + ret_val = r; + } + + if (!flushing) { + flushing = true; + + librados::AioCompletion *aio_comp = create_rados_safe_callback(this); + r = rados.aio_watch_flush(aio_comp); + assert(r == 0); + aio_comp->release(); + return; + } + + // ensure our reference to the RadosClient is released prior + // to completing the callback to avoid racing an explicit + // librados shutdown + Context *ctx = on_finish; + r = ret_val; + delete this; + + ctx->complete(r); + } + + virtual void finish(int r) override { + } +}; + +} // anonymous namespace + +Watcher::Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue, + const string& oid) + : m_cct(reinterpret_cast(ioctx.cct())), + m_watch_lock(util::unique_lock_name("librbd::Watcher::m_watch_lock", this)), + m_watch_handle(0), m_notifier(work_queue, ioctx, oid), + m_watch_state(WATCH_STATE_UNREGISTERED), m_ioctx(ioctx), + m_work_queue(work_queue), m_oid(oid), m_watch_ctx(*this) +{ +} + +Watcher::~Watcher() +{ + RWLock::RLocker l(m_watch_lock); + assert(m_watch_state != WATCH_STATE_REGISTERED); +} + +void Watcher::register_watch(Context *on_finish) { + ldout(m_cct, 10) << this << " registering watcher" << dendl; + + RWLock::RLocker watch_locker(m_watch_lock); + assert(m_watch_state == WATCH_STATE_UNREGISTERED); + librados::AioCompletion *aio_comp = create_rados_safe_callback( + new C_RegisterWatch(this, on_finish)); + int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_watch_handle, &m_watch_ctx); + assert(r == 0); + aio_comp->release(); +} + +void Watcher::handle_register_watch(int r) { + ldout(m_cct, 10) << this << " handle register r=" << r << dendl; + RWLock::WLocker watch_locker(m_watch_lock); + assert(m_watch_state == WATCH_STATE_UNREGISTERED); + if (r < 0) { + lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r) << dendl; + m_watch_handle = 0; + } else if (r >= 0) { + m_watch_state = WATCH_STATE_REGISTERED; + } +} + +void Watcher::unregister_watch(Context *on_finish) { + ldout(m_cct, 10) << this << " unregistering watcher" << dendl; + + RWLock::WLocker watch_locker(m_watch_lock); + if (m_watch_state == WATCH_STATE_REWATCHING) { + ldout(m_cct, 10) << this << " delaying unregister until rewatch completed" + << dendl; + + assert(m_unregister_watch_ctx == nullptr); + m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) { + unregister_watch(on_finish); + }); + return; + } + + if (m_watch_state == WATCH_STATE_REGISTERED || + m_watch_state == WATCH_STATE_ERROR) { + m_watch_state = WATCH_STATE_UNREGISTERED; + + librados::AioCompletion *aio_comp = create_rados_safe_callback( + new C_UnwatchAndFlush(m_ioctx, on_finish)); + int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp); + assert(r == 0); + aio_comp->release(); + } +} + +void Watcher::flush(Context *on_finish) { + m_notifier.flush(on_finish); +} + +void Watcher::set_oid(const string& oid) { + RWLock::WLocker l(m_watch_lock); + assert(m_watch_state == WATCH_STATE_UNREGISTERED); + + m_oid = oid; +} + +void Watcher::handle_error(uint64_t handle, int err) { + lderr(m_cct) << this << " watch failed: " << handle << ", " + << cpp_strerror(err) << dendl; + + RWLock::WLocker l(m_watch_lock); + if (m_watch_state == WATCH_STATE_REGISTERED) { + m_watch_state = WATCH_STATE_ERROR; + + FunctionContext *ctx = new FunctionContext( + boost::bind(&Watcher::rewatch, this)); + m_work_queue->queue(ctx); + } +} + +void Watcher::acknowledge_notify(uint64_t notify_id, uint64_t handle, + bufferlist &out) { + m_ioctx.notify_ack(m_oid, notify_id, handle, out); +} + +void Watcher::rewatch() { + ldout(m_cct, 10) << this << " re-registering watch" << dendl; + + RWLock::WLocker l(m_watch_lock); + if (m_watch_state != WATCH_STATE_ERROR) { + return; + } + m_watch_state = WATCH_STATE_REWATCHING; + + Context *ctx = create_context_callback(this); + RewatchRequest *req = RewatchRequest::create(m_ioctx, m_oid, m_watch_lock, + &m_watch_ctx, + &m_watch_handle, ctx); + req->send(); +} + +void Watcher::handle_rewatch(int r) { + ldout(m_cct, 10) << this << " " << __func__ << ": r=" << r << dendl; + + WatchState next_watch_state = WATCH_STATE_REGISTERED; + if (r < 0) { + // only EBLACKLISTED or ENOENT can be returned + assert(r == -EBLACKLISTED || r == -ENOENT); + next_watch_state = WATCH_STATE_UNREGISTERED; + } + + Context *unregister_watch_ctx = nullptr; + { + RWLock::WLocker watch_locker(m_watch_lock); + assert(m_watch_state == WATCH_STATE_REWATCHING); + m_watch_state = next_watch_state; + + std::swap(unregister_watch_ctx, m_unregister_watch_ctx); + + handle_rewatch_complete(r); + m_work_queue->queue( + create_context_callback(this)); + } + + // wake up pending unregister request + if (unregister_watch_ctx != nullptr) { + unregister_watch_ctx->complete(0); + } +} + +void Watcher::send_notify(bufferlist& payload, bufferlist *out_bl, + Context *on_finish) { + m_notifier.notify(payload, out_bl, on_finish); +} + +void Watcher::WatchCtx::handle_notify(uint64_t notify_id, + uint64_t handle, + uint64_t notifier_id, + bufferlist& bl) { + watcher.handle_notify(notify_id, handle, bl); +} + +void Watcher::WatchCtx::handle_error(uint64_t handle, int err) { + watcher.handle_error(handle, err); +} + +} // namespace librbd diff --git a/src/librbd/Watcher.h b/src/librbd/Watcher.h new file mode 100644 index 000000000000..21a59c886f18 --- /dev/null +++ b/src/librbd/Watcher.h @@ -0,0 +1,143 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_H +#define CEPH_LIBRBD_WATCHER_H + +#include "common/Mutex.h" +#include "common/RWLock.h" +#include "include/rados/librados.hpp" +#include "librbd/watcher/Notifier.h" +#include "librbd/watcher/Types.h" +#include +#include + +class ContextWQ; + +namespace librbd { + +class Watcher { + friend struct watcher::C_NotifyAck; + +public: + Watcher(librados::IoCtx& ioctx, ContextWQ *work_queue, + const std::string& oid); + virtual ~Watcher(); + + void register_watch(Context *on_finish); + void unregister_watch(Context *on_finish); + void flush(Context *on_finish); + + void set_oid(const string& oid); + + uint64_t get_watch_handle() const { + RWLock::RLocker watch_locker(m_watch_lock); + return m_watch_handle; + } + + bool is_registered() const { + RWLock::RLocker locker(m_watch_lock); + return m_watch_state == WATCH_STATE_REGISTERED; + } + +protected: + enum WatchState { + WATCH_STATE_UNREGISTERED, + WATCH_STATE_REGISTERED, + WATCH_STATE_ERROR, + WATCH_STATE_REWATCHING + }; + + CephContext *m_cct; + mutable RWLock m_watch_lock; + uint64_t m_watch_handle; + watcher::Notifier m_notifier; + WatchState m_watch_state; + + void send_notify(bufferlist &payload, bufferlist *out_bl = nullptr, + Context *on_finish = nullptr); + + virtual void handle_notify(uint64_t notify_id, uint64_t handle, + bufferlist &bl) = 0; + + virtual void handle_error(uint64_t cookie, int err); + + void acknowledge_notify(uint64_t notify_id, uint64_t handle, + bufferlist &out); + + virtual void handle_rewatch_complete(int r) { } + +private: + /** + * @verbatim + * + * + * | + * v + * UNREGISTERED + * | + * | (register_watch) + * | + * v (watch error) + * REGISTERED * * * * * * * > ERROR + * | ^ | + * | | | (rewatch) + * | | v + * | | REWATCHING + * | | | + * | | | + * | \---------------------/ + * | + * | (unregister_watch) + * | + * v + * UNREGISTERED + * | + * v + * + * + * @endverbatim + */ + + struct WatchCtx : public librados::WatchCtx2 { + Watcher &watcher; + + WatchCtx(Watcher &parent) : watcher(parent) {} + + virtual void handle_notify(uint64_t notify_id, + uint64_t handle, + uint64_t notifier_id, + bufferlist& bl); + virtual void handle_error(uint64_t handle, int err); + }; + + struct C_RegisterWatch : public Context { + Watcher *watcher; + Context *on_finish; + + C_RegisterWatch(Watcher *watcher, Context *on_finish) + : watcher(watcher), on_finish(on_finish) { + } + virtual void finish(int r) override { + watcher->handle_register_watch(r); + on_finish->complete(r); + } + }; + + librados::IoCtx& m_ioctx; + ContextWQ *m_work_queue; + std::string m_oid; + + WatchCtx m_watch_ctx; + Context *m_unregister_watch_ctx = nullptr; + + void handle_register_watch(int r); + + void rewatch(); + void handle_rewatch(int r); + +}; + +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_H diff --git a/src/librbd/object_watcher/Notifier.cc b/src/librbd/object_watcher/Notifier.cc deleted file mode 100644 index 019c786ee62c..000000000000 --- a/src/librbd/object_watcher/Notifier.cc +++ /dev/null @@ -1,77 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "librbd/object_watcher/Notifier.h" -#include "common/WorkQueue.h" -#include "librbd/ImageCtx.h" -#include "librbd/Utils.h" - -#define dout_subsys ceph_subsys_rbd -#undef dout_prefix -#define dout_prefix *_dout << "librbd::object_watcher::Notifier: " - -namespace librbd { -namespace object_watcher { - -const uint64_t Notifier::NOTIFY_TIMEOUT = 5000; - -Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid) - : m_work_queue(work_queue), m_oid(oid), - m_aio_notify_lock(util::unique_lock_name( - "librbd::object_watcher::Notifier::m_aio_notify_lock", this)) { - m_ioctx.dup(ioctx); - m_cct = reinterpret_cast(m_ioctx.cct()); -} - -Notifier::~Notifier() { - Mutex::Locker aio_notify_locker(m_aio_notify_lock); - assert(m_pending_aio_notifies == 0); -} - -void Notifier::flush(Context *on_finish) { - Mutex::Locker aio_notify_locker(m_aio_notify_lock); - if (m_pending_aio_notifies == 0) { - m_work_queue->queue(on_finish, 0); - return; - } - - m_aio_notify_flush_ctxs.push_back(on_finish); -} - -void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) { - { - Mutex::Locker aio_notify_locker(m_aio_notify_lock); - ++m_pending_aio_notifies; - - ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies - << dendl; - } - - C_AioNotify *ctx = new C_AioNotify(this, on_finish); - librados::AioCompletion *comp = util::create_rados_ack_callback(ctx); - int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl); - assert(r == 0); - comp->release(); -} - -void Notifier::handle_notify(int r, Context *on_finish) { - if (on_finish != nullptr) { - m_work_queue->queue(on_finish, r); - } - - Mutex::Locker aio_notify_locker(m_aio_notify_lock); - assert(m_pending_aio_notifies > 0); - --m_pending_aio_notifies; - - ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies - << dendl; - if (m_pending_aio_notifies == 0) { - for (auto ctx : m_aio_notify_flush_ctxs) { - m_work_queue->queue(ctx, 0); - } - m_aio_notify_flush_ctxs.clear(); - } -} - -} // namespace object_watcher -} // namespace librbd diff --git a/src/librbd/object_watcher/Notifier.h b/src/librbd/object_watcher/Notifier.h deleted file mode 100644 index bfe996eee74d..000000000000 --- a/src/librbd/object_watcher/Notifier.h +++ /dev/null @@ -1,61 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H -#define CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H - -#include "include/int_types.h" -#include "include/buffer_fwd.h" -#include "include/Context.h" -#include "include/rados/librados.hpp" -#include "common/Mutex.h" -#include "common/WorkQueue.h" -#include - -namespace librbd { - -namespace object_watcher { - -class Notifier { -public: - static const uint64_t NOTIFY_TIMEOUT; - - Notifier(ContextWQ *work_queue, librados::IoCtx &ioctx, - const std::string &oid); - ~Notifier(); - - void flush(Context *on_finish); - void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish); - -private: - typedef std::list Contexts; - - struct C_AioNotify : public Context { - Notifier *notifier; - Context *on_finish; - - C_AioNotify(Notifier *notifier, Context *on_finish) - : notifier(notifier), on_finish(on_finish) { - } - virtual void finish(int r) override { - notifier->handle_notify(r, on_finish); - } - }; - - ContextWQ *m_work_queue; - librados::IoCtx m_ioctx; - CephContext *m_cct; - std::string m_oid; - - Mutex m_aio_notify_lock; - size_t m_pending_aio_notifies = 0; - Contexts m_aio_notify_flush_ctxs; - - void handle_notify(int r, Context *on_finish); - -}; - -} // namespace object_watcher -} // namespace librbd - -#endif // CEPH_LIBRBD_OBJECT_WATCHER_NOTIFIER_H diff --git a/src/librbd/watcher/Notifier.cc b/src/librbd/watcher/Notifier.cc new file mode 100644 index 000000000000..318f077a195a --- /dev/null +++ b/src/librbd/watcher/Notifier.cc @@ -0,0 +1,77 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/watcher/Notifier.h" +#include "common/WorkQueue.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::object_watcher::Notifier: " + +namespace librbd { +namespace watcher { + +const uint64_t Notifier::NOTIFY_TIMEOUT = 5000; + +Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid) + : m_work_queue(work_queue), m_oid(oid), + m_aio_notify_lock(util::unique_lock_name( + "librbd::object_watcher::Notifier::m_aio_notify_lock", this)) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); +} + +Notifier::~Notifier() { + Mutex::Locker aio_notify_locker(m_aio_notify_lock); + assert(m_pending_aio_notifies == 0); +} + +void Notifier::flush(Context *on_finish) { + Mutex::Locker aio_notify_locker(m_aio_notify_lock); + if (m_pending_aio_notifies == 0) { + m_work_queue->queue(on_finish, 0); + return; + } + + m_aio_notify_flush_ctxs.push_back(on_finish); +} + +void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) { + { + Mutex::Locker aio_notify_locker(m_aio_notify_lock); + ++m_pending_aio_notifies; + + ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies + << dendl; + } + + C_AioNotify *ctx = new C_AioNotify(this, on_finish); + librados::AioCompletion *comp = util::create_rados_ack_callback(ctx); + int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl); + assert(r == 0); + comp->release(); +} + +void Notifier::handle_notify(int r, Context *on_finish) { + if (on_finish != nullptr) { + m_work_queue->queue(on_finish, r); + } + + Mutex::Locker aio_notify_locker(m_aio_notify_lock); + assert(m_pending_aio_notifies > 0); + --m_pending_aio_notifies; + + ldout(m_cct, 20) << __func__ << ": pending=" << m_pending_aio_notifies + << dendl; + if (m_pending_aio_notifies == 0) { + for (auto ctx : m_aio_notify_flush_ctxs) { + m_work_queue->queue(ctx, 0); + } + m_aio_notify_flush_ctxs.clear(); + } +} + +} // namespace watcher +} // namespace librbd diff --git a/src/librbd/watcher/Notifier.h b/src/librbd/watcher/Notifier.h new file mode 100644 index 000000000000..c45e9ac90f66 --- /dev/null +++ b/src/librbd/watcher/Notifier.h @@ -0,0 +1,61 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_NOTIFIER_H +#define CEPH_LIBRBD_WATCHER_NOTIFIER_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/WorkQueue.h" +#include + +namespace librbd { + +namespace watcher { + +class Notifier { +public: + static const uint64_t NOTIFY_TIMEOUT; + + Notifier(ContextWQ *work_queue, librados::IoCtx &ioctx, + const std::string &oid); + ~Notifier(); + + void flush(Context *on_finish); + void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish); + +private: + typedef std::list Contexts; + + struct C_AioNotify : public Context { + Notifier *notifier; + Context *on_finish; + + C_AioNotify(Notifier *notifier, Context *on_finish) + : notifier(notifier), on_finish(on_finish) { + } + virtual void finish(int r) override { + notifier->handle_notify(r, on_finish); + } + }; + + ContextWQ *m_work_queue; + librados::IoCtx m_ioctx; + CephContext *m_cct; + std::string m_oid; + + Mutex m_aio_notify_lock; + size_t m_pending_aio_notifies = 0; + Contexts m_aio_notify_flush_ctxs; + + void handle_notify(int r, Context *on_finish); + +}; + +} // namespace watcher +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_NOTIFIER_H diff --git a/src/librbd/watcher/RewatchRequest.cc b/src/librbd/watcher/RewatchRequest.cc new file mode 100644 index 000000000000..688106960a99 --- /dev/null +++ b/src/librbd/watcher/RewatchRequest.cc @@ -0,0 +1,114 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/watcher/RewatchRequest.h" +#include "common/RWLock.h" +#include "common/errno.h" +#include "librbd/Utils.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::watcher::RewatchRequest: " \ + << this << ": " << __func__ + +namespace librbd { + +using util::create_context_callback; +using util::create_rados_safe_callback; + +namespace watcher { + +using std::string; + +RewatchRequest::RewatchRequest(librados::IoCtx& ioctx, const string& oid, + RWLock &watch_lock, + librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish) + : m_ioctx(ioctx), m_oid(oid), m_watch_lock(watch_lock), + m_watch_ctx(watch_ctx), m_watch_handle(watch_handle), + m_on_finish(on_finish) { +} + +void RewatchRequest::send() { + unwatch(); +} + +void RewatchRequest::unwatch() { + assert(m_watch_lock.is_wlocked()); + assert(*m_watch_handle != 0); + + CephContext *cct = reinterpret_cast(m_ioctx.cct()); + ldout(cct, 10) << dendl; + + librados::AioCompletion *aio_comp = create_rados_safe_callback< + RewatchRequest, &RewatchRequest::handle_unwatch>(this); + int r = m_ioctx.aio_unwatch(*m_watch_handle, aio_comp); + assert(r == 0); + aio_comp->release(); + + *m_watch_handle = 0; +} + +void RewatchRequest::handle_unwatch(int r) { + CephContext *cct = reinterpret_cast(m_ioctx.cct()); + ldout(cct, 10) << "r=" << r << dendl; + + if (r == -EBLACKLISTED) { + lderr(cct) << "client blacklisted" << dendl; + finish(r); + return; + } else if (r < 0) { + lderr(cct) << "failed to unwatch: " << cpp_strerror(r) << dendl; + } + rewatch(); +} + +void RewatchRequest::rewatch() { + CephContext *cct = reinterpret_cast(m_ioctx.cct()); + ldout(cct, 10) << dendl; + + librados::AioCompletion *aio_comp = create_rados_safe_callback< + RewatchRequest, &RewatchRequest::handle_rewatch>(this); + int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx); + assert(r == 0); + aio_comp->release(); +} + +void RewatchRequest::handle_rewatch(int r) { + CephContext *cct = reinterpret_cast(m_ioctx.cct()); + ldout(cct, 10) << "r=" << r << dendl; + + if (r == -EBLACKLISTED) { + lderr(cct) << "client blacklisted" << dendl; + finish(r); + return; + } else if (r == -ENOENT) { + ldout(cct, 5) << "object deleted" << dendl; + finish(r); + return; + } else if (r < 0) { + lderr(cct) << "failed to watch object: " << cpp_strerror(r) + << dendl; + rewatch(); + return; + } + + { + RWLock::WLocker watch_locker(m_watch_lock); + *m_watch_handle = m_rewatch_handle; + } + + finish(0); +} + +void RewatchRequest::finish(int r) { + CephContext *cct = reinterpret_cast(m_ioctx.cct()); + ldout(cct, 10) << "r=" << r << dendl; + + m_on_finish->complete(r); + delete this; +} + +} // namespace watcher +} // namespace librbd + diff --git a/src/librbd/watcher/RewatchRequest.h b/src/librbd/watcher/RewatchRequest.h new file mode 100644 index 000000000000..d4fc250abec7 --- /dev/null +++ b/src/librbd/watcher/RewatchRequest.h @@ -0,0 +1,75 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H +#define CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H + +#include "include/int_types.h" +#include "include/rados/librados.hpp" + +struct Context; +struct RWLock; + +namespace librbd { + +namespace watcher { + +class RewatchRequest { +public: + + static RewatchRequest *create(librados::IoCtx& ioctx, const std::string& oid, + RWLock &watch_lock, + librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish) { + return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle, + on_finish); + } + + RewatchRequest(librados::IoCtx& ioctx, const std::string& oid, + RWLock &watch_lock, librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish); + + void send(); + +private: + /** + * @verbatim + * + * + * | + * v + * UNWATCH + * | + * | . . . . + * | . . (recoverable error) + * v v . + * REWATCH . . . + * | + * v + * + * + * @endverbatim + */ + + librados::IoCtx& m_ioctx; + std::string m_oid; + RWLock &m_watch_lock; + librados::WatchCtx2 *m_watch_ctx; + uint64_t *m_watch_handle; + Context *m_on_finish; + + uint64_t m_rewatch_handle = 0; + + void unwatch(); + void handle_unwatch(int r); + + void rewatch(); + void handle_rewatch(int r); + + void finish(int r); +}; + +} // namespace watcher +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H diff --git a/src/librbd/watcher/Types.cc b/src/librbd/watcher/Types.cc new file mode 100644 index 000000000000..f107c722fe18 --- /dev/null +++ b/src/librbd/watcher/Types.cc @@ -0,0 +1,31 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/watcher/Types.h" +#include "librbd/Watcher.h" +#include "common/dout.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::Watcher: " + +namespace librbd { +namespace watcher { + +C_NotifyAck::C_NotifyAck(Watcher *watcher, uint64_t notify_id, + uint64_t handle) + : watcher(watcher), cct(watcher->m_cct), notify_id(notify_id), + handle(handle) { + ldout(cct, 10) << this << " C_NotifyAck start: id=" << notify_id << ", " + << "handle=" << handle << dendl; +} + +void C_NotifyAck::finish(int r) { + assert(r == 0); + ldout(cct, 10) << this << " C_NotifyAck finish: id=" << notify_id << ", " + << "handle=" << handle << dendl; + watcher->acknowledge_notify(notify_id, handle, out); +} + +} // namespace watcher +} // namespace librbd diff --git a/src/librbd/watcher/Types.h b/src/librbd/watcher/Types.h new file mode 100644 index 000000000000..d9823ca31d4a --- /dev/null +++ b/src/librbd/watcher/Types.h @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_TYPES_H +#define CEPH_LIBRBD_WATCHER_TYPES_H + +#include "include/buffer_fwd.h" +#include "include/encoding.h" +#include "include/Context.h" + +namespace ceph { +class Formatter; +} + +namespace librbd { + +class Watcher; + +namespace watcher { + +struct C_NotifyAck : public Context { + Watcher *watcher; + CephContext *cct; + uint64_t notify_id; + uint64_t handle; + bufferlist out; + + C_NotifyAck(Watcher *watcher, uint64_t notify_id, uint64_t handle); + void finish(int r); +}; + +template +struct HandlePayloadVisitor : public boost::static_visitor { + Watcher *watcher; + uint64_t notify_id; + uint64_t handle; + + HandlePayloadVisitor(Watcher *watcher_, uint64_t notify_id_, + uint64_t handle_) + : watcher(watcher_), notify_id(notify_id_), handle(handle_) + { + } + + template + inline void operator()(const P &payload) const { + C_NotifyAck *ctx = new C_NotifyAck(watcher, notify_id, handle); + if (watcher->handle_payload(payload, ctx)) { + ctx->complete(0); + } + } +}; + +class EncodePayloadVisitor : public boost::static_visitor { +public: + explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {} + + template + inline void operator()(const P &payload) const { + ::encode(static_cast(P::NOTIFY_OP), m_bl); + payload.encode(m_bl); + } + +private: + bufferlist &m_bl; +}; + +class DecodePayloadVisitor : public boost::static_visitor { +public: + DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter) + : m_version(version), m_iter(iter) {} + + template + inline void operator()(P &payload) const { + payload.decode(m_version, m_iter); + } + +private: + __u8 m_version; + bufferlist::iterator &m_iter; +}; + + +} // namespace watcher +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_TYPES_H diff --git a/src/test/librbd/CMakeLists.txt b/src/test/librbd/CMakeLists.txt index 9a1f3727bfe5..6eb47ef1373c 100644 --- a/src/test/librbd/CMakeLists.txt +++ b/src/test/librbd/CMakeLists.txt @@ -59,6 +59,7 @@ set(unittest_librbd_srcs operation/test_mock_SnapshotRemoveRequest.cc operation/test_mock_SnapshotRollbackRequest.cc operation/test_mock_SnapshotUnprotectRequest.cc + watcher/test_mock_RewatchRequest.cc ) add_executable(unittest_librbd ${unittest_librbd_srcs} diff --git a/src/test/librbd/watcher/test_mock_RewatchRequest.cc b/src/test/librbd/watcher/test_mock_RewatchRequest.cc new file mode 100644 index 000000000000..36356b7421ab --- /dev/null +++ b/src/test/librbd/watcher/test_mock_RewatchRequest.cc @@ -0,0 +1,196 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "test/librbd/test_mock_fixture.h" +#include "include/rados/librados.hpp" +#include "test/librados_test_stub/MockTestMemIoCtxImpl.h" +#include "test/librados_test_stub/MockTestMemRadosClient.h" +#include "test/librbd/test_support.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "librados/AioCompletionImpl.h" +#include "librbd/watcher/RewatchRequest.h" + +namespace librbd { +namespace watcher { + +using ::testing::_; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::WithArg; + +struct TestMockWatcherRewatchRequest : public TestMockFixture { + typedef RewatchRequest MockRewatchRequest; + + TestMockWatcherRewatchRequest() + : m_watch_lock("watch_lock") { + } + + void expect_aio_watch(MockImageCtx &mock_image_ctx, int r) { + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx( + mock_image_ctx.md_ctx)); + + EXPECT_CALL(mock_io_ctx, aio_watch(mock_image_ctx.header_oid, _, _, _)) + .WillOnce(DoAll(WithArg<1>(Invoke([&mock_image_ctx, &mock_io_ctx, r](librados::AioCompletionImpl *c) { + c->get(); + mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) { + mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r); + }), r); + })), + Return(0))); + } + + void expect_aio_unwatch(MockImageCtx &mock_image_ctx, int r) { + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx( + mock_image_ctx.md_ctx)); + + EXPECT_CALL(mock_io_ctx, aio_unwatch(m_watch_handle, _)) + .WillOnce(DoAll(Invoke([&mock_image_ctx, &mock_io_ctx, r](uint64_t handle, + librados::AioCompletionImpl *c) { + c->get(); + mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) { + mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r); + }), r); + }), + Return(0))); + } + + struct WatchCtx : public librados::WatchCtx2 { + virtual void handle_notify(uint64_t, uint64_t, uint64_t, + ceph::bufferlist&) { + assert(false); + } + virtual void handle_error(uint64_t, int) { + assert(false); + } + }; + + RWLock m_watch_lock; + WatchCtx m_watch_ctx; + uint64_t m_watch_handle = 123; +}; + +TEST_F(TestMockWatcherRewatchRequest, Success) { + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockImageCtx mock_image_ctx(*ictx); + + InSequence seq; + expect_aio_unwatch(mock_image_ctx, 0); + expect_aio_watch(mock_image_ctx, 0); + + C_SaferCond ctx; + MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx, + mock_image_ctx.header_oid, + m_watch_lock, + &m_watch_ctx, + &m_watch_handle, + &ctx); + { + RWLock::WLocker watch_locker(m_watch_lock); + req->send(); + } + ASSERT_EQ(0, ctx.wait()); +} + +TEST_F(TestMockWatcherRewatchRequest, UnwatchError) { + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockImageCtx mock_image_ctx(*ictx); + + InSequence seq; + expect_aio_unwatch(mock_image_ctx, -EINVAL); + expect_aio_watch(mock_image_ctx, 0); + + C_SaferCond ctx; + MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx, + mock_image_ctx.header_oid, + m_watch_lock, + &m_watch_ctx, + &m_watch_handle, + &ctx); + { + RWLock::WLocker watch_locker(m_watch_lock); + req->send(); + } + ASSERT_EQ(0, ctx.wait()); +} + +TEST_F(TestMockWatcherRewatchRequest, WatchBlacklist) { + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockImageCtx mock_image_ctx(*ictx); + + InSequence seq; + expect_aio_unwatch(mock_image_ctx, 0); + expect_aio_watch(mock_image_ctx, -EBLACKLISTED); + + C_SaferCond ctx; + MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx, + mock_image_ctx.header_oid, + m_watch_lock, + &m_watch_ctx, + &m_watch_handle, + &ctx); + { + RWLock::WLocker watch_locker(m_watch_lock); + req->send(); + } + ASSERT_EQ(-EBLACKLISTED, ctx.wait()); +} + +TEST_F(TestMockWatcherRewatchRequest, WatchDNE) { + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockImageCtx mock_image_ctx(*ictx); + + InSequence seq; + expect_aio_unwatch(mock_image_ctx, 0); + expect_aio_watch(mock_image_ctx, -ENOENT); + + C_SaferCond ctx; + MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx, + mock_image_ctx.header_oid, + m_watch_lock, + &m_watch_ctx, + &m_watch_handle, + &ctx); + { + RWLock::WLocker watch_locker(m_watch_lock); + req->send(); + } + ASSERT_EQ(-ENOENT, ctx.wait()); +} + +TEST_F(TestMockWatcherRewatchRequest, WatchError) { + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockImageCtx mock_image_ctx(*ictx); + + InSequence seq; + expect_aio_unwatch(mock_image_ctx, 0); + expect_aio_watch(mock_image_ctx, -EINVAL); + expect_aio_watch(mock_image_ctx, 0); + + C_SaferCond ctx; + MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx.md_ctx, + mock_image_ctx.header_oid, + m_watch_lock, + &m_watch_ctx, + &m_watch_handle, + &ctx); + { + RWLock::WLocker watch_locker(m_watch_lock); + req->send(); + } + ASSERT_EQ(0, ctx.wait()); +} + +} // namespace watcher +} // namespace librbd