static const double RETRY_DELAY_SECONDS = 1.0;
+template <typename I>
+struct ImageWatcher<I>::C_ProcessPayload : public Context {
+ ImageWatcher *image_watcher;
+ uint64_t notify_id;
+ uint64_t handle;
+ watch_notify::Payload payload;
+
+ C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_,
+ uint64_t handle_, const watch_notify::Payload &payload)
+ : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
+ payload(payload) {
+ }
+
+ void finish(int r) override {
+ image_watcher->m_async_op_tracker.start_op();
+ if (image_watcher->notifications_blocked()) {
+ // requests are blocked -- just ack the notification
+ bufferlist bl;
+ image_watcher->acknowledge_notify(notify_id, handle, bl);
+ } else {
+ image_watcher->process_payload(notify_id, handle, payload);
+ }
+ image_watcher->m_async_op_tracker.finish_op();
+ }
+};
+
template <typename I>
ImageWatcher<I>::ImageWatcher(I &image_ctx)
: Watcher(image_ctx.md_ctx, image_ctx.op_work_queue, image_ctx.header_oid),
Watcher::unregister_watch(ctx);
}
+template <typename I>
+void ImageWatcher<I>::block_notifies(Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 10) << this << " " << __func__ << dendl;
+
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ cancel_async_requests();
+ on_finish->complete(r);
+ });
+ Watcher::block_notifies(on_finish);
+}
+
template <typename I>
void ImageWatcher<I>::schedule_async_progress(const AsyncRequestId &request,
uint64_t offset, uint64_t total) {
template <typename I>
void ImageWatcher<I>::process_payload(uint64_t notify_id, uint64_t handle,
- const Payload &payload, int r) {
- if (r < 0) {
- bufferlist out_bl;
- this->acknowledge_notify(notify_id, handle, out_bl);
- } else {
- apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id,
- handle),
- payload);
- }
+ const Payload &payload) {
+ apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id, handle),
+ payload);
}
template <typename I>
m_image_ctx.state->refresh(new C_ProcessPayload(this, notify_id, handle,
notify_message.payload));
} else {
- process_payload(notify_id, handle, notify_message.payload, 0);
+ process_payload(notify_id, handle, notify_message.payload);
}
}
ImageWatcher(ImageCtxT& image_ctx);
~ImageWatcher() override;
- void unregister_watch(Context *on_finish);
+ void unregister_watch(Context *on_finish) override;
+ void block_notifies(Context *on_finish) override;
void notify_flatten(uint64_t request_id, ProgressContext &prog_ctx,
Context *on_finish);
ProgressContext *m_prog_ctx;
};
- struct C_ProcessPayload : public Context {
- ImageWatcher *image_watcher;
- uint64_t notify_id;
- uint64_t handle;
- watch_notify::Payload payload;
-
- C_ProcessPayload(ImageWatcher *image_watcher_, uint64_t notify_id_,
- uint64_t handle_, const watch_notify::Payload &payload)
- : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
- payload(payload) {
- }
-
- void finish(int r) override {
- image_watcher->process_payload(notify_id, handle, payload, r);
- }
- };
-
+ struct C_ProcessPayload;
struct C_ResponseMessage : public Context {
C_NotifyAck *notify_ack;
bool handle_payload(const watch_notify::UnknownPayload& payload,
C_NotifyAck *ctx);
void process_payload(uint64_t notify_id, uint64_t handle,
- const watch_notify::Payload &payload, int r);
+ const watch_notify::Payload &payload);
void handle_notify(uint64_t notify_id, uint64_t handle,
uint64_t notifier_id, bufferlist &bl) override;
on_finish->complete(0);
}
+bool Watcher::notifications_blocked() const {
+ RWLock::RLocker locker(m_watch_lock);
+
+ bool blocked = (m_blocked_count > 0);
+ ldout(m_cct, 5) << "blocked=" << blocked << dendl;
+ return blocked;
+}
+
+void Watcher::block_notifies(Context *on_finish) {
+ {
+ RWLock::WLocker locker(m_watch_lock);
+ ++m_blocked_count;
+ ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+ }
+ m_async_op_tracker.wait_for_ops(on_finish);
+}
+
+void Watcher::unblock_notifies() {
+ RWLock::WLocker locker(m_watch_lock);
+ assert(m_blocked_count > 0);
+ --m_blocked_count;
+ ldout(m_cct, 5) << "blocked_count=" << m_blocked_count << dendl;
+}
+
void Watcher::flush(Context *on_finish) {
m_notifier.flush(on_finish);
}
m_notifier.notify(payload, response, on_finish);
}
-void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
- uint64_t handle,
- uint64_t notifier_id,
- bufferlist& bl) {
- watcher.handle_notify(notify_id, handle, notifier_id, bl);
+void Watcher::WatchCtx::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist& bl) {
+ // if notifications are blocked, finish the notification w/o
+ // bubbling the notification up to the derived class
+ watcher.m_async_op_tracker.start_op();
+ if (watcher.notifications_blocked()) {
+ bufferlist bl;
+ watcher.acknowledge_notify(notify_id, handle, bl);
+ } else {
+ watcher.handle_notify(notify_id, handle, notifier_id, bl);
+ }
+ watcher.m_async_op_tracker.finish_op();
}
void Watcher::WatchCtx::handle_error(uint64_t handle, int err) {
#ifndef CEPH_LIBRBD_WATCHER_H
#define CEPH_LIBRBD_WATCHER_H
+#include "common/AsyncOpTracker.h"
#include "common/Mutex.h"
#include "common/RWLock.h"
#include "include/rados/librados.hpp"
virtual ~Watcher();
void register_watch(Context *on_finish);
- void unregister_watch(Context *on_finish);
+ virtual void unregister_watch(Context *on_finish);
void flush(Context *on_finish);
+ bool notifications_blocked() const;
+ virtual void block_notifies(Context *on_finish);
+ void unblock_notifies();
+
std::string get_oid() const;
void set_oid(const string& oid);
uint64_t m_watch_handle;
watcher::Notifier m_notifier;
WatchState m_watch_state;
+ AsyncOpTracker m_async_op_tracker;
void send_notify(bufferlist &payload,
watcher::NotifyResponse *response = nullptr,
WatchCtx m_watch_ctx;
Context *m_unregister_watch_ctx = nullptr;
+ uint32_t m_blocked_count = 0;
+
void handle_register_watch(int r, Context *on_finish);
void rewatch();