From 95937bcd42abbc7d840bb06dc65cb1de6ef6c5e0 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Thu, 18 May 2017 16:25:55 -0400 Subject: [PATCH] librbd: introduced support for blocking watch notifications Signed-off-by: Jason Dillaman --- src/librbd/ImageWatcher.cc | 52 ++++++++++++++++++++++++++++++-------- src/librbd/ImageWatcher.h | 23 +++-------------- src/librbd/Watcher.cc | 41 ++++++++++++++++++++++++++---- src/librbd/Watcher.h | 10 +++++++- 4 files changed, 91 insertions(+), 35 deletions(-) diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index 0e85a6a4c427..ca612a9ded86 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -33,6 +33,32 @@ using librbd::watcher::util::HandlePayloadVisitor; static const double RETRY_DELAY_SECONDS = 1.0; +template +struct ImageWatcher::C_ProcessPayload : public Context { + ImageWatcher *image_watcher; + uint64_t notify_id; + uint64_t handle; + watch_notify::Payload payload; + + C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_, + uint64_t handle_, const watch_notify::Payload &payload) + : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_), + payload(payload) { + } + + void finish(int r) override { + image_watcher->m_async_op_tracker.start_op(); + if (image_watcher->notifications_blocked()) { + // requests are blocked -- just ack the notification + bufferlist bl; + image_watcher->acknowledge_notify(notify_id, handle, bl); + } else { + image_watcher->process_payload(notify_id, handle, payload); + } + image_watcher->m_async_op_tracker.finish_op(); + } +}; + template ImageWatcher::ImageWatcher(I &image_ctx) : Watcher(image_ctx.md_ctx, image_ctx.op_work_queue, image_ctx.header_oid), @@ -62,6 +88,18 @@ void ImageWatcher::unregister_watch(Context *on_finish) { Watcher::unregister_watch(ctx); } +template +void ImageWatcher::block_notifies(Context *on_finish) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 10) << this << " " << __func__ << dendl; + + on_finish = new FunctionContext([this, on_finish](int r) { + cancel_async_requests(); + on_finish->complete(r); + }); + Watcher::block_notifies(on_finish); +} + template void ImageWatcher::schedule_async_progress(const AsyncRequestId &request, uint64_t offset, uint64_t total) { @@ -884,15 +922,9 @@ bool ImageWatcher::handle_payload(const UnknownPayload &payload, template void ImageWatcher::process_payload(uint64_t notify_id, uint64_t handle, - const Payload &payload, int r) { - if (r < 0) { - bufferlist out_bl; - this->acknowledge_notify(notify_id, handle, out_bl); - } else { - apply_visitor(HandlePayloadVisitor>(this, notify_id, - handle), - payload); - } + const Payload &payload) { + apply_visitor(HandlePayloadVisitor>(this, notify_id, handle), + payload); } template @@ -919,7 +951,7 @@ void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle, m_image_ctx.state->refresh(new C_ProcessPayload(this, notify_id, handle, notify_message.payload)); } else { - process_payload(notify_id, handle, notify_message.payload, 0); + process_payload(notify_id, handle, notify_message.payload); } } diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h index 838e0e3d0122..5e30c8e5b3be 100644 --- a/src/librbd/ImageWatcher.h +++ b/src/librbd/ImageWatcher.h @@ -36,7 +36,8 @@ public: ImageWatcher(ImageCtxT& image_ctx); ~ImageWatcher() override; - void unregister_watch(Context *on_finish); + void unregister_watch(Context *on_finish) override; + void block_notifies(Context *on_finish) override; void notify_flatten(uint64_t request_id, ProgressContext &prog_ctx, Context *on_finish); @@ -145,23 +146,7 @@ private: ProgressContext *m_prog_ctx; }; - struct C_ProcessPayload : public Context { - ImageWatcher *image_watcher; - uint64_t notify_id; - uint64_t handle; - watch_notify::Payload payload; - - C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_, - uint64_t handle_, const watch_notify::Payload &payload) - : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_), - payload(payload) { - } - - void finish(int r) override { - image_watcher->process_payload(notify_id, handle, payload, r); - } - }; - + struct C_ProcessPayload; struct C_ResponseMessage : public Context { C_NotifyAck *notify_ack; @@ -251,7 +236,7 @@ private: bool handle_payload(const watch_notify::UnknownPayload& payload, C_NotifyAck *ctx); void process_payload(uint64_t notify_id, uint64_t handle, - const watch_notify::Payload &payload, int r); + const watch_notify::Payload &payload); void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id, bufferlist &bl) override; diff --git a/src/librbd/Watcher.cc b/src/librbd/Watcher.cc index 98a0e7f0be42..54a2246f12cf 100644 --- a/src/librbd/Watcher.cc +++ b/src/librbd/Watcher.cc @@ -174,6 +174,30 @@ void Watcher::unregister_watch(Context *on_finish) { on_finish->complete(0); } +bool Watcher::notifications_blocked() const { + RWLock::RLocker locker(m_watch_lock); + + bool blocked = (m_blocked_count > 0); + ldout(m_cct, 5) << "blocked=" << blocked << dendl; + return blocked; +} + +void Watcher::block_notifies(Context *on_finish) { + { + RWLock::WLocker locker(m_watch_lock); + ++m_blocked_count; + ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl; + } + m_async_op_tracker.wait_for_ops(on_finish); +} + +void Watcher::unblock_notifies() { + RWLock::WLocker locker(m_watch_lock); + assert(m_blocked_count > 0); + --m_blocked_count; + ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl; +} + void Watcher::flush(Context *on_finish) { m_notifier.flush(on_finish); } @@ -260,11 +284,18 @@ void Watcher::send_notify(bufferlist& payload, m_notifier.notify(payload, response, on_finish); } -void Watcher::WatchCtx::handle_notify(uint64_t notify_id, - uint64_t handle, - uint64_t notifier_id, - bufferlist& bl) { - watcher.handle_notify(notify_id, handle, notifier_id, bl); +void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist& bl) { + // if notifications are blocked, finish the notification w/o + // bubbling the notification up to the derived class + watcher.m_async_op_tracker.start_op(); + if (watcher.notifications_blocked()) { + bufferlist bl; + watcher.acknowledge_notify(notify_id, handle, bl); + } else { + watcher.handle_notify(notify_id, handle, notifier_id, bl); + } + watcher.m_async_op_tracker.finish_op(); } void Watcher::WatchCtx::handle_error(uint64_t handle, int err) { diff --git a/src/librbd/Watcher.h b/src/librbd/Watcher.h index 01de690937d0..39009027d9eb 100644 --- a/src/librbd/Watcher.h +++ b/src/librbd/Watcher.h @@ -4,6 +4,7 @@ #ifndef CEPH_LIBRBD_WATCHER_H #define CEPH_LIBRBD_WATCHER_H +#include "common/AsyncOpTracker.h" #include "common/Mutex.h" #include "common/RWLock.h" #include "include/rados/librados.hpp" @@ -36,9 +37,13 @@ public: virtual ~Watcher(); void register_watch(Context *on_finish); - void unregister_watch(Context *on_finish); + virtual void unregister_watch(Context *on_finish); void flush(Context *on_finish); + bool notifications_blocked() const; + virtual void block_notifies(Context *on_finish); + void unblock_notifies(); + std::string get_oid() const; void set_oid(const string& oid); @@ -73,6 +78,7 @@ protected: uint64_t m_watch_handle; watcher::Notifier m_notifier; WatchState m_watch_state; + AsyncOpTracker m_async_op_tracker; void send_notify(bufferlist &payload, watcher::NotifyResponse *response = nullptr, @@ -149,6 +155,8 @@ private: WatchCtx m_watch_ctx; Context *m_unregister_watch_ctx = nullptr; + uint32_t m_blocked_count = 0; + void handle_register_watch(int r, Context *on_finish); void rewatch(); -- 2.47.3