image/SetFlagsRequest.cc
image/SetSnapRequest.cc
image_watcher/NotifyLockOwner.cc
- image_watcher/RewatchRequest.cc
journal/RemoveRequest.cc
journal/CreateRequest.cc
journal/OpenRequest.cc
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/image_watcher/NotifyLockOwner.h"
-#include "librbd/image_watcher/RewatchRequest.h"
#include "include/encoding.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
using util::create_async_context_callback;
using util::create_context_callback;
using util::create_rados_safe_callback;
-
-namespace {
-
-struct C_UnwatchAndFlush : public Context {
- librados::Rados rados;
- Context *on_finish;
- bool flushing = false;
- int ret_val = 0;
-
- C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
- : rados(io_ctx), on_finish(on_finish) {
- }
-
- virtual void complete(int r) override {
- if (ret_val == 0 && r < 0) {
- ret_val = r;
- }
-
- if (!flushing) {
- flushing = true;
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
- r = rados.aio_watch_flush(aio_comp);
- assert(r == 0);
- aio_comp->release();
- return;
- }
-
- // ensure our reference to the RadosClient is released prior
- // to completing the callback to avoid racing an explicit
- // librados shutdown
- Context *ctx = on_finish;
- r = ret_val;
- delete this;
-
- ctx->complete(r);
- }
-
- virtual void finish(int r) override {
- }
-};
-
-} // anonymous namespace
+using librbd::watcher::HandlePayloadVisitor;
+using librbd::watcher::C_NotifyAck;
static const double RETRY_DELAY_SECONDS = 1.0;
template <typename I>
ImageWatcher<I>::ImageWatcher(I &image_ctx)
- : m_image_ctx(image_ctx),
- m_watch_lock(util::unique_lock_name("librbd::ImageWatcher::m_watch_lock", this)),
- m_watch_ctx(*this), m_watch_handle(0),
- m_watch_state(WATCH_STATE_UNREGISTERED),
+ : Watcher(image_ctx.md_ctx, image_ctx.op_work_queue, image_ctx.header_oid),
+ m_image_ctx(image_ctx),
m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
m_async_request_lock(util::unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this)),
- m_owner_client_id_lock(util::unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this)),
- m_notifier(image_ctx.op_work_queue, image_ctx.md_ctx, image_ctx.header_oid)
+ m_owner_client_id_lock(util::unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this))
{
}
ImageWatcher<I>::~ImageWatcher()
{
delete m_task_finisher;
- {
- RWLock::RLocker l(m_watch_lock);
- assert(m_watch_state != WATCH_STATE_REGISTERED);
- }
-}
-
-template <typename I>
-void ImageWatcher<I>::register_watch(Context *on_finish) {
- ldout(m_image_ctx.cct, 10) << this << " registering image watcher" << dendl;
-
- RWLock::RLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_UNREGISTERED);
- librados::AioCompletion *aio_comp = create_rados_safe_callback(
- new C_RegisterWatch(this, on_finish));
- int r = m_image_ctx.md_ctx.aio_watch(m_image_ctx.header_oid, aio_comp,
- &m_watch_handle, &m_watch_ctx);
- assert(r == 0);
- aio_comp->release();
-}
-
-template <typename I>
-void ImageWatcher<I>::handle_register_watch(int r) {
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_UNREGISTERED);
- if (r < 0) {
- m_watch_handle = 0;
- } else if (r >= 0) {
- m_watch_state = WATCH_STATE_REGISTERED;
- }
}
template <typename I>
cancel_async_requests();
- C_Gather *gather_ctx = nullptr;
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REWATCHING) {
- ldout(cct, 10) << this << " delaying unregister until rewatch completed"
- << dendl;
-
- assert(m_unregister_watch_ctx == nullptr);
- m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
- unregister_watch(on_finish);
- });
- return;
- }
-
- gather_ctx = new C_Gather(m_image_ctx.cct, create_async_context_callback(
- m_image_ctx, on_finish));
- if (m_watch_state == WATCH_STATE_REGISTERED ||
- m_watch_state == WATCH_STATE_ERROR) {
- m_watch_state = WATCH_STATE_UNREGISTERED;
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback(
- new C_UnwatchAndFlush(m_image_ctx.md_ctx, gather_ctx->new_sub()));
- int r = m_image_ctx.md_ctx.aio_unwatch(m_watch_handle, aio_comp);
- assert(r == 0);
- aio_comp->release();
- }
- }
-
- assert(gather_ctx != nullptr);
- m_task_finisher->cancel_all(gather_ctx->new_sub());
- gather_ctx->activate();
-}
-
-template <typename I>
-void ImageWatcher<I>::flush(Context *on_finish) {
- m_notifier.flush(on_finish);
+ FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
+ m_task_finisher->cancel_all(on_finish);
+ });
+ Watcher::unregister_watch(ctx);
}
template <typename I>
<< request << " @ " << offset
<< "/" << total << dendl;
- bufferlist bl;
- ::encode(NotifyMessage(AsyncProgressPayload(request, offset, total)), bl);
- m_notifier.notify(bl, nullptr, nullptr);
+ send_notify(AsyncProgressPayload(request, offset, total));
return 0;
}
ldout(m_image_ctx.cct, 20) << this << " remote async request finished: "
<< request << " = " << r << dendl;
- bufferlist bl;
- ::encode(NotifyMessage(AsyncCompletePayload(request, r)), bl);
- m_notifier.notify(bl, nullptr, new FunctionContext(
- boost::bind(&ImageWatcher<I>::handle_async_complete, this, request, r,
- _1)));
+ send_notify(AsyncCompletePayload(request, r),
+ new FunctionContext(boost::bind(&ImageWatcher<I>::handle_async_complete,
+ this, request, r, _1)));
}
template <typename I>
AsyncRequestId async_request_id(get_client_id(), request_id);
- bufferlist bl;
- ::encode(NotifyMessage(FlattenPayload(async_request_id)), bl);
- notify_async_request(async_request_id, std::move(bl), prog_ctx, on_finish);
+ notify_async_request(async_request_id, FlattenPayload(async_request_id),
+ prog_ctx, on_finish);
}
template <typename I>
AsyncRequestId async_request_id(get_client_id(), request_id);
- bufferlist bl;
- ::encode(NotifyMessage(ResizePayload(size, allow_shrink, async_request_id)), bl);
- notify_async_request(async_request_id, std::move(bl), prog_ctx, on_finish);
+ notify_async_request(async_request_id,
+ ResizePayload(size, allow_shrink, async_request_id),
+ prog_ctx, on_finish);
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- bufferlist bl;
- ::encode(NotifyMessage(SnapCreatePayload(snap_name, snap_namespace)), bl);
- notify_lock_owner(std::move(bl), on_finish);
+ notify_lock_owner(SnapCreatePayload(snap_name, snap_namespace), on_finish);
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- bufferlist bl;
- ::encode(NotifyMessage(SnapRenamePayload(src_snap_id, dst_snap_name)), bl);
- notify_lock_owner(std::move(bl), on_finish);
+ notify_lock_owner(SnapRenamePayload(src_snap_id, dst_snap_name), on_finish);
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- bufferlist bl;
- ::encode(NotifyMessage(SnapRemovePayload(snap_name)), bl);
- notify_lock_owner(std::move(bl), on_finish);
+ notify_lock_owner(SnapRemovePayload(snap_name), on_finish);
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- bufferlist bl;
- ::encode(NotifyMessage(SnapProtectPayload(snap_name)), bl);
- notify_lock_owner(std::move(bl), on_finish);
+ notify_lock_owner(SnapProtectPayload(snap_name), on_finish);
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- bufferlist bl;
- ::encode(NotifyMessage(SnapUnprotectPayload(snap_name)), bl);
- notify_lock_owner(std::move(bl), on_finish);
+ notify_lock_owner(SnapUnprotectPayload(snap_name), on_finish);
}
template <typename I>
AsyncRequestId async_request_id(get_client_id(), request_id);
- bufferlist bl;
- ::encode(NotifyMessage(RebuildObjectMapPayload(async_request_id)), bl);
- notify_async_request(async_request_id, std::move(bl), prog_ctx, on_finish);
+ notify_async_request(async_request_id,
+ RebuildObjectMapPayload(async_request_id),
+ prog_ctx, on_finish);
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- bufferlist bl;
- ::encode(NotifyMessage(RenamePayload(image_name)), bl);
- notify_lock_owner(std::move(bl), on_finish);
+ notify_lock_owner(RenamePayload(image_name), on_finish);
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- bufferlist bl;
- ::encode(NotifyMessage(UpdateFeaturesPayload(features, enabled)), bl);
- notify_lock_owner(std::move(bl), on_finish);
+ notify_lock_owner(UpdateFeaturesPayload(features, enabled), on_finish);
}
template <typename I>
ldout(m_image_ctx.cct, 10) << this << ": " << __func__ << dendl;
// supports legacy (empty buffer) clients
- bufferlist bl;
- ::encode(NotifyMessage(HeaderUpdatePayload()), bl);
- m_notifier.notify(bl, nullptr, on_finish);
+ send_notify(HeaderUpdatePayload(), on_finish);
}
template <typename I>
// supports legacy (empty buffer) clients
bufferlist bl;
::encode(NotifyMessage(HeaderUpdatePayload()), bl);
- io_ctx.notify2(oid, bl, object_watcher::Notifier::NOTIFY_TIMEOUT, nullptr);
+ io_ctx.notify2(oid, bl, watcher::Notifier::NOTIFY_TIMEOUT, nullptr);
}
template <typename I>
template <typename I>
ClientId ImageWatcher<I>::get_client_id() {
- RWLock::RLocker l(m_watch_lock);
- return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
+ RWLock::RLocker l(this->m_watch_lock);
+ return ClientId(m_image_ctx.md_ctx.get_instance_id(), this->m_watch_handle);
}
template <typename I>
set_owner_client_id(client_id);
}
- bufferlist bl;
- ::encode(NotifyMessage(AcquiredLockPayload(client_id)), bl);
- m_notifier.notify(bl, nullptr, nullptr);
+ send_notify(AcquiredLockPayload(client_id));
}
template <typename I>
set_owner_client_id(ClientId());
}
- bufferlist bl;
- ::encode(NotifyMessage(ReleasedLockPayload(get_client_id())), bl);
- m_notifier.notify(bl, nullptr, nullptr);
+ send_notify(ReleasedLockPayload(get_client_id()));
}
template <typename I>
assert(m_image_ctx.exclusive_lock &&
!m_image_ctx.exclusive_lock->is_lock_owner());
- RWLock::RLocker watch_locker(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REGISTERED) {
+ RWLock::RLocker watch_locker(this->m_watch_lock);
+ if (this->m_watch_state == Watcher::WATCH_STATE_REGISTERED) {
ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
FunctionContext *ctx = new FunctionContext(
if (timer_delay < 0) {
timer_delay = RETRY_DELAY_SECONDS;
}
- m_task_finisher->add_event_after(TASK_CODE_REQUEST_LOCK, timer_delay,
- ctx);
+ m_task_finisher->add_event_after(TASK_CODE_REQUEST_LOCK,
+ timer_delay, ctx);
} else {
m_task_finisher->queue(TASK_CODE_REQUEST_LOCK, ctx);
}
ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;
- bufferlist bl;
- ::encode(NotifyMessage(RequestLockPayload(get_client_id(), false)), bl);
- notify_lock_owner(std::move(bl), create_context_callback<
- ImageWatcher, &ImageWatcher<I>::handle_request_lock>(this));
+ notify_lock_owner(RequestLockPayload(get_client_id(), false),
+ create_context_callback<
+ ImageWatcher, &ImageWatcher<I>::handle_request_lock>(this));
}
template <typename I>
}
template <typename I>
-void ImageWatcher<I>::notify_lock_owner(bufferlist &&bl, Context *on_finish) {
+void ImageWatcher<I>::notify_lock_owner(const Payload& payload,
+ Context *on_finish) {
assert(on_finish != nullptr);
assert(m_image_ctx.owner_lock.is_locked());
+
+ bufferlist bl;
+ ::encode(NotifyMessage(payload), bl);
+
NotifyLockOwner *notify_lock_owner = NotifyLockOwner::create(
- m_image_ctx, m_notifier, std::move(bl), on_finish);
+ m_image_ctx, this->m_notifier, std::move(bl), on_finish);
notify_lock_owner->send();
}
Task task(TASK_CODE_ASYNC_REQUEST, id);
m_task_finisher->cancel(task);
- m_task_finisher->add_event_after(task, m_image_ctx.request_timed_out_seconds, ctx);
+ m_task_finisher->add_event_after(task, m_image_ctx.request_timed_out_seconds,
+ ctx);
}
template <typename I>
template <typename I>
void ImageWatcher<I>::notify_async_request(const AsyncRequestId &async_request_id,
- bufferlist &&in,
- ProgressContext& prog_ctx,
- Context *on_finish) {
+ const Payload& payload,
+ ProgressContext& prog_ctx,
+ Context *on_finish) {
assert(on_finish != nullptr);
assert(m_image_ctx.owner_lock.is_locked());
}
}
});
+
Context *on_complete = new FunctionContext(
[this, async_request_id, on_finish](int r) {
m_task_finisher->cancel(Task(TASK_CODE_ASYNC_REQUEST, async_request_id));
}
schedule_async_request_timed_out(async_request_id);
- notify_lock_owner(std::move(in), on_notify);
+ notify_lock_owner(payload, on_notify);
}
template <typename I>
const Payload &payload, int r) {
if (r < 0) {
bufferlist out_bl;
- acknowledge_notify(notify_id, handle, out_bl);
+ this->acknowledge_notify(notify_id, handle, out_bl);
} else {
- apply_visitor(HandlePayloadVisitor(this, notify_id, handle), payload);
+ apply_visitor(HandlePayloadVisitor<ImageWatcher<I>>(this, notify_id,
+ handle),
+ payload);
}
}
set_owner_client_id(ClientId());
}
- RWLock::WLocker l(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REGISTERED) {
- m_watch_state = WATCH_STATE_ERROR;
-
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher<I>::rewatch, this));
- m_task_finisher->queue(TASK_CODE_REREGISTER_WATCH, ctx);
- }
-}
-
-template <typename I>
-void ImageWatcher<I>::acknowledge_notify(uint64_t notify_id, uint64_t handle,
- bufferlist &out) {
- m_image_ctx.md_ctx.notify_ack(m_image_ctx.header_oid, notify_id, handle, out);
-}
-
-template <typename I>
-void ImageWatcher<I>::rewatch() {
- ldout(m_image_ctx.cct, 10) << this << " re-registering image watch" << dendl;
-
- RWLock::WLocker l(m_watch_lock);
- if (m_watch_state != WATCH_STATE_ERROR) {
- return;
- }
- m_watch_state = WATCH_STATE_REWATCHING;
-
- Context *ctx = create_context_callback<
- ImageWatcher<I>, &ImageWatcher<I>::handle_rewatch>(this);
- RewatchRequest<I> *req = RewatchRequest<I>::create(m_image_ctx, m_watch_lock,
- &m_watch_ctx,
- &m_watch_handle, ctx);
- req->send();
+ Watcher::handle_error(handle, err);
}
template <typename I>
-void ImageWatcher<I>::handle_rewatch(int r) {
+void ImageWatcher<I>::handle_rewatch_complete(int r) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
- WatchState next_watch_state = WATCH_STATE_REGISTERED;
- if (r < 0) {
- // only EBLACKLISTED or ENOENT can be returned
- assert(r == -EBLACKLISTED || r == -ENOENT);
- next_watch_state = WATCH_STATE_UNREGISTERED;
- }
-
- Context *unregister_watch_ctx = nullptr;
{
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_REWATCHING);
- m_watch_state = next_watch_state;
-
- std::swap(unregister_watch_ctx, m_unregister_watch_ctx);
-
- // image might have been updated while we didn't have active watch
- handle_payload(HeaderUpdatePayload(), nullptr);
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ if (m_image_ctx.exclusive_lock != nullptr) {
+ // update the lock cookie with the new watch handle
+ m_image_ctx.exclusive_lock->reacquire_lock();
+ }
}
- // wake up pending unregister request
- if (unregister_watch_ctx != nullptr) {
- unregister_watch_ctx->complete(0);
- }
+ // image might have been updated while we didn't have active watch
+ handle_payload(HeaderUpdatePayload(), nullptr);
}
template <typename I>
-void ImageWatcher<I>::WatchCtx::handle_notify(uint64_t notify_id,
- uint64_t handle,
- uint64_t notifier_id,
- bufferlist& bl) {
- image_watcher.handle_notify(notify_id, handle, bl);
-}
+void ImageWatcher<I>::send_notify(const Payload &payload, Context *ctx) {
+ bufferlist bl;
-template <typename I>
-void ImageWatcher<I>::WatchCtx::handle_error(uint64_t handle, int err) {
- image_watcher.handle_error(handle, err);
+ ::encode(NotifyMessage(payload), bl);
+ Watcher::send_notify(bl, nullptr, ctx);
}
template <typename I>
m_image_watcher.schedule_async_complete(m_async_request_id, r);
}
-template <typename I>
-ImageWatcher<I>::C_NotifyAck::C_NotifyAck(ImageWatcher *image_watcher,
- uint64_t notify_id, uint64_t handle)
- : image_watcher(image_watcher), notify_id(notify_id), handle(handle) {
- CephContext *cct = image_watcher->m_image_ctx.cct;
- ldout(cct, 10) << this << " C_NotifyAck start: id=" << notify_id << ", "
- << "handle=" << handle << dendl;
-}
-
-template <typename I>
-void ImageWatcher<I>::C_NotifyAck::finish(int r) {
- assert(r == 0);
- CephContext *cct = image_watcher->m_image_ctx.cct;
- ldout(cct, 10) << this << " C_NotifyAck finish: id=" << notify_id << ", "
- << "handle=" << handle << dendl;
-
- image_watcher->acknowledge_notify(notify_id, handle, out);
-}
-
template <typename I>
void ImageWatcher<I>::C_ResponseMessage::finish(int r) {
- CephContext *cct = notify_ack->image_watcher->m_image_ctx.cct;
+ CephContext *cct = notify_ack->cct;
ldout(cct, 10) << this << " C_ResponseMessage: r=" << r << dendl;
::encode(ResponseMessage(r), notify_ack->out);
#include "common/RWLock.h"
#include "include/Context.h"
#include "include/rbd/librbd.hpp"
-#include "librbd/object_watcher/Notifier.h"
+#include "librbd/Watcher.h"
#include "librbd/WatchNotifyTypes.h"
#include <set>
#include <string>
#include <utility>
-#include <boost/variant.hpp>
class entity_name_t;
namespace librbd {
+namespace watcher {
+template <typename> struct HandlePayloadVisitor;
+}
+
class ImageCtx;
-template <typename T> class TaskFinisher;
+template <typename> class TaskFinisher;
template <typename ImageCtxT = ImageCtx>
-class ImageWatcher {
+class ImageWatcher : public Watcher {
+ friend struct watcher::HandlePayloadVisitor<ImageWatcher<ImageCtxT>>;
+
public:
ImageWatcher(ImageCtxT& image_ctx);
- ~ImageWatcher();
+ virtual ~ImageWatcher();
- void register_watch(Context *on_finish);
void unregister_watch(Context *on_finish);
- void flush(Context *on_finish);
void notify_flatten(uint64_t request_id, ProgressContext &prog_ctx,
Context *on_finish);
static void notify_header_update(librados::IoCtx &io_ctx,
const std::string &oid);
- uint64_t get_watch_handle() const {
- RWLock::RLocker watch_locker(m_watch_lock);
- return m_watch_handle;
- }
-
private:
- enum WatchState {
- WATCH_STATE_UNREGISTERED,
- WATCH_STATE_REGISTERED,
- WATCH_STATE_ERROR,
- WATCH_STATE_REWATCHING
- };
-
enum TaskCode {
TASK_CODE_REQUEST_LOCK,
TASK_CODE_CANCEL_ASYNC_REQUESTS,
watch_notify::AsyncRequestId m_async_request_id;
};
- struct WatchCtx : public librados::WatchCtx2 {
- ImageWatcher &image_watcher;
-
- WatchCtx(ImageWatcher &parent) : image_watcher(parent) {}
-
- virtual void handle_notify(uint64_t notify_id,
- uint64_t handle,
- uint64_t notifier_id,
- bufferlist& bl);
- virtual void handle_error(uint64_t handle, int err);
- };
-
class RemoteProgressContext : public ProgressContext {
public:
RemoteProgressContext(ImageWatcher &image_watcher,
ProgressContext *m_prog_ctx;
};
- struct C_RegisterWatch : public Context {
- ImageWatcher *image_watcher;
- Context *on_finish;
-
- C_RegisterWatch(ImageWatcher *image_watcher, Context *on_finish)
- : image_watcher(image_watcher), on_finish(on_finish) {
- }
- virtual void finish(int r) override {
- image_watcher->handle_register_watch(r);
- on_finish->complete(r);
- }
- };
- struct C_NotifyAck : public Context {
- ImageWatcher *image_watcher;
- uint64_t notify_id;
- uint64_t handle;
- bufferlist out;
-
- C_NotifyAck(ImageWatcher *image_watcher, uint64_t notify_id,
- uint64_t handle);
- virtual void finish(int r);
- };
-
- struct C_ResponseMessage : public Context {
- C_NotifyAck *notify_ack;
-
- C_ResponseMessage(C_NotifyAck *notify_ack) : notify_ack(notify_ack) {
- }
- virtual void finish(int r);
- };
-
struct C_ProcessPayload : public Context {
ImageWatcher *image_watcher;
uint64_t notify_id;
}
};
- struct HandlePayloadVisitor : public boost::static_visitor<void> {
- ImageWatcher *image_watcher;
- uint64_t notify_id;
- uint64_t handle;
-
- HandlePayloadVisitor(ImageWatcher *image_watcher_, uint64_t notify_id_,
- uint64_t handle_)
- : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_)
- {
- }
+ struct C_ResponseMessage : public Context {
+ watcher::C_NotifyAck *notify_ack;
- template <typename Payload>
- inline void operator()(const Payload &payload) const {
- C_NotifyAck *ctx = new C_NotifyAck(image_watcher, notify_id,
- handle);
- if (image_watcher->handle_payload(payload, ctx)) {
- ctx->complete(0);
- }
+ C_ResponseMessage(watcher::C_NotifyAck *notify_ack) : notify_ack(notify_ack) {
}
+ virtual void finish(int r);
};
ImageCtxT &m_image_ctx;
- mutable RWLock m_watch_lock;
- WatchCtx m_watch_ctx;
- uint64_t m_watch_handle;
- WatchState m_watch_state;
- Context *m_unregister_watch_ctx = nullptr;
-
TaskFinisher<Task> *m_task_finisher;
RWLock m_async_request_lock;
Mutex m_owner_client_id_lock;
watch_notify::ClientId m_owner_client_id;
- object_watcher::Notifier m_notifier;
-
void handle_register_watch(int r);
void schedule_cancel_async_requests();
void handle_request_lock(int r);
void schedule_request_lock(bool use_timer, int timer_delay = -1);
- void notify_lock_owner(bufferlist &&bl, Context *on_finish);
+ void notify_lock_owner(const watch_notify::Payload& payload,
+ Context *on_finish);
Context *remove_async_request(const watch_notify::AsyncRequestId &id);
void schedule_async_request_timed_out(const watch_notify::AsyncRequestId &id);
void async_request_timed_out(const watch_notify::AsyncRequestId &id);
void notify_async_request(const watch_notify::AsyncRequestId &id,
- bufferlist &&in, ProgressContext& prog_ctx,
+ const watch_notify::Payload &payload,
+ ProgressContext& prog_ctx,
Context *on_finish);
void schedule_async_progress(const watch_notify::AsyncRequestId &id,
ProgressContext** prog_ctx);
bool handle_payload(const watch_notify::HeaderUpdatePayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::AcquiredLockPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::ReleasedLockPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::RequestLockPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::AsyncProgressPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::AsyncCompletePayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::FlattenPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::ResizePayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::SnapCreatePayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::SnapRenamePayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::SnapRemovePayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::SnapProtectPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::SnapUnprotectPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::RebuildObjectMapPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::RenamePayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::UpdateFeaturesPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
bool handle_payload(const watch_notify::UnknownPayload& payload,
- C_NotifyAck *ctx);
+ watcher::C_NotifyAck *ctx);
void process_payload(uint64_t notify_id, uint64_t handle,
- const watch_notify::Payload &payload, int r);
+ const watch_notify::Payload &payload, int r);
+
+ virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &bl);
+ virtual void handle_error(uint64_t cookie, int err);
+ virtual void handle_rewatch_complete(int r);
- void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
- void handle_error(uint64_t cookie, int err);
- void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
+ void send_notify(const watch_notify::Payload& payload,
+ Context *ctx = nullptr);
- void rewatch();
- void handle_rewatch(int r);
};
} // namespace librbd
#include "cls/rbd/cls_rbd_types.h"
#include "librbd/WatchNotifyTypes.h"
+#include "librbd/watcher/Types.h"
#include "include/assert.h"
#include "include/stringify.h"
#include "common/Formatter.h"
}
};
-class EncodePayloadVisitor : public boost::static_visitor<void> {
-public:
- explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
-
- template <typename Payload>
- inline void operator()(const Payload &payload) const {
- ::encode(static_cast<uint32_t>(Payload::NOTIFY_OP), m_bl);
- payload.encode(m_bl);
- }
-
-private:
- bufferlist &m_bl;
-};
-
-class DecodePayloadVisitor : public boost::static_visitor<void> {
-public:
- DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
- : m_version(version), m_iter(iter) {}
-
- template <typename Payload>
- inline void operator()(Payload &payload) const {
- payload.decode(m_version, m_iter);
- }
-
-private:
- __u8 m_version;
- bufferlist::iterator &m_iter;
-};
-
class DumpPayloadVisitor : public boost::static_visitor<void> {
public:
explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {}
ceph::Formatter *m_formatter;
};
-}
+} // anonymous namespace
void ClientId::encode(bufferlist &bl) const {
::encode(gid, bl);
void NotifyMessage::encode(bufferlist& bl) const {
ENCODE_START(5, 1, bl);
- boost::apply_visitor(EncodePayloadVisitor(bl), payload);
+ boost::apply_visitor(watcher::EncodePayloadVisitor(bl), payload);
ENCODE_FINISH(bl);
}
break;
}
- apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
+ apply_visitor(watcher::DecodePayloadVisitor(struct_v, iter), payload);
DECODE_FINISH(iter);
}
int r = m_ioctx.aio_unwatch(m_watch_handle, aio_comp);
assert(r == 0);
aio_comp->release();
+ } else {
+ on_finish->complete(0);
}
}
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/WatchNotifyTypes.h"
-#include "librbd/object_watcher/Notifier.h"
+#include "librbd/watcher/Notifier.h"
#include <map>
#define dout_subsys ceph_subsys_rbd
<< this << " " << __func__
namespace librbd {
+
namespace image_watcher {
using namespace watch_notify;
using util::create_context_callback;
NotifyLockOwner::NotifyLockOwner(ImageCtx &image_ctx,
- object_watcher::Notifier ¬ifier,
+ watcher::Notifier ¬ifier,
bufferlist &&bl, Context *on_finish)
: m_image_ctx(image_ctx), m_notifier(notifier), m_bl(std::move(bl)),
m_on_finish(on_finish) {
struct ImageCtx;
-namespace object_watcher { class Notifier; }
+namespace watcher { class Notifier; }
namespace image_watcher {
class NotifyLockOwner {
public:
static NotifyLockOwner *create(ImageCtx &image_ctx,
- object_watcher::Notifier ¬ifier,
+ watcher::Notifier ¬ifier,
bufferlist &&bl, Context *on_finish) {
return new NotifyLockOwner(image_ctx, notifier, std::move(bl), on_finish);
}
- NotifyLockOwner(ImageCtx &image_ctx, object_watcher::Notifier ¬ifier,
+ NotifyLockOwner(ImageCtx &image_ctx, watcher::Notifier ¬ifier,
bufferlist &&bl, Context *on_finish);
void send();
private:
ImageCtx &m_image_ctx;
- object_watcher::Notifier &m_notifier;
+ watcher::Notifier &m_notifier;
bufferlist m_bl;
bufferlist m_out_bl;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "librbd/image_watcher/RewatchRequest.h"
-#include "common/errno.h"
-#include "librbd/ExclusiveLock.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/Utils.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::image_watcher::RewatchRequest: " \
- << this << ": " << __func__
-
-namespace librbd {
-namespace image_watcher {
-
-using librbd::util::create_context_callback;
-using librbd::util::create_rados_safe_callback;
-
-template <typename I>
-RewatchRequest<I>::RewatchRequest(I &image_ctx, RWLock &watch_lock,
- librados::WatchCtx2 *watch_ctx,
- uint64_t *watch_handle, Context *on_finish)
- : m_image_ctx(image_ctx), m_watch_lock(watch_lock), m_watch_ctx(watch_ctx),
- m_watch_handle(watch_handle), m_on_finish(on_finish) {
-}
-
-template <typename I>
-void RewatchRequest<I>::send() {
- unwatch();
-}
-
-template <typename I>
-void RewatchRequest<I>::unwatch() {
- assert(m_watch_lock.is_wlocked());
- assert(*m_watch_handle != 0);
-
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 10) << dendl;
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback<
- RewatchRequest<I>, &RewatchRequest<I>::handle_unwatch>(this);
- int r = m_image_ctx.md_ctx.aio_unwatch(*m_watch_handle, aio_comp);
- assert(r == 0);
- aio_comp->release();
-
- *m_watch_handle = 0;
-}
-
-template <typename I>
-void RewatchRequest<I>::handle_unwatch(int r) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 10) << "r=" << r << dendl;
-
- if (r == -EBLACKLISTED) {
- lderr(cct) << "client blacklisted" << dendl;
- finish(r);
- return;
- } else if (r < 0) {
- lderr(cct) << "failed to unwatch: " << cpp_strerror(r) << dendl;
- }
- rewatch();
-}
-
-template <typename I>
-void RewatchRequest<I>::rewatch() {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 10) << dendl;
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback<
- RewatchRequest<I>, &RewatchRequest<I>::handle_rewatch>(this);
- int r = m_image_ctx.md_ctx.aio_watch(m_image_ctx.header_oid, aio_comp,
- &m_rewatch_handle, m_watch_ctx);
- assert(r == 0);
- aio_comp->release();
-}
-
-template <typename I>
-void RewatchRequest<I>::handle_rewatch(int r) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 10) << "r=" << r << dendl;
-
- if (r == -EBLACKLISTED) {
- lderr(cct) << "client blacklisted" << dendl;
- finish(r);
- return;
- } else if (r == -ENOENT) {
- ldout(cct, 5) << "image header deleted" << dendl;
- finish(r);
- return;
- } else if (r < 0) {
- lderr(cct) << "failed to watch image header: " << cpp_strerror(r)
- << dendl;
- rewatch();
- return;
- }
-
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- *m_watch_handle = m_rewatch_handle;
- }
-
- {
- RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.exclusive_lock != nullptr) {
- // update the lock cookie with the new watch handle
- m_image_ctx.exclusive_lock->reacquire_lock();
- }
- }
- finish(0);
-}
-
-template <typename I>
-void RewatchRequest<I>::finish(int r) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 10) << "r=" << r << dendl;
-
- m_on_finish->complete(r);
- delete this;
-}
-
-} // namespace image_watcher
-} // namespace librbd
-
-template class librbd::image_watcher::RewatchRequest<librbd::ImageCtx>;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_LIBRBD_IMAGE_WATCHER_REWATCH_REQUEST_H
-#define CEPH_LIBRBD_IMAGE_WATCHER_REWATCH_REQUEST_H
-
-#include "include/int_types.h"
-#include "include/rados/librados.hpp"
-
-struct Context;
-struct RWLock;
-
-namespace librbd {
-
-class ImageCtx;
-
-namespace image_watcher {
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class RewatchRequest {
-public:
-
- static RewatchRequest *create(ImageCtxT &image_ctx, RWLock &watch_lock,
- librados::WatchCtx2 *watch_ctx,
- uint64_t *watch_handle, Context *on_finish) {
- return new RewatchRequest(image_ctx, watch_lock, watch_ctx, watch_handle,
- on_finish);
- }
-
- RewatchRequest(ImageCtxT &image_ctx, RWLock &watch_lock,
- librados::WatchCtx2 *watch_ctx, uint64_t *watch_handle,
- Context *on_finish);
-
- void send();
-
-private:
- /**
- * @verbatim
- *
- * <start>
- * |
- * v
- * UNWATCH
- * |
- * | . . . .
- * | . . (recoverable error)
- * v v .
- * REWATCH . . .
- * |
- * v
- * <finish>
- *
- * @endverbatim
- */
-
- ImageCtxT &m_image_ctx;
- RWLock &m_watch_lock;
- librados::WatchCtx2 *m_watch_ctx;
- uint64_t *m_watch_handle;
- Context *m_on_finish;
-
- uint64_t m_rewatch_handle = 0;
-
- void unwatch();
- void handle_unwatch(int r);
-
- void rewatch();
- void handle_rewatch(int r);
-
- void finish(int r);
-};
-
-} // namespace image_watcher
-} // namespace librbd
-
-extern template class librbd::image_watcher::RewatchRequest<librbd::ImageCtx>;
-
-#endif // CEPH_LIBRBD_IMAGE_WATCHER_REWATCH_REQUEST_H
#include "librbd/ImageCtx.h"
#include "librbd/MirroringWatcher.h"
+#include "librbd/ImageWatcher.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
template struct librbd::watcher::HandlePayloadVisitor<
librbd::MirroringWatcher<librbd::ImageCtx>>;
+
+template struct librbd::watcher::HandlePayloadVisitor<
+ librbd::ImageWatcher<librbd::ImageCtx>>;
exclusive_lock/test_mock_ReacquireRequest.cc
exclusive_lock/test_mock_ReleaseRequest.cc
image/test_mock_RefreshRequest.cc
- image_watcher/test_mock_RewatchRequest.cc
journal/test_mock_OpenRequest.cc
journal/test_mock_PromoteRequest.cc
journal/test_mock_Replay.cc
+++ /dev/null
-// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "test/librbd/test_mock_fixture.h"
-#include "include/rados/librados.hpp"
-#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
-#include "test/librados_test_stub/MockTestMemRadosClient.h"
-#include "test/librbd/test_support.h"
-#include "test/librbd/mock/MockExclusiveLock.h"
-#include "test/librbd/mock/MockImageCtx.h"
-#include "librados/AioCompletionImpl.h"
-#include "librbd/image_watcher/RewatchRequest.h"
-
-namespace librbd {
-namespace {
-
-struct MockTestImageCtx : public MockImageCtx {
- MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
- }
-};
-
-} // anonymous namespace
-} // namespace librbd
-
-#include "librbd/image_watcher/RewatchRequest.cc"
-
-namespace librbd {
-namespace image_watcher {
-
-using ::testing::_;
-using ::testing::DoAll;
-using ::testing::InSequence;
-using ::testing::Invoke;
-using ::testing::Return;
-using ::testing::WithArg;
-
-struct TestMockImageWatcherRewatchRequest : public TestMockFixture {
- typedef RewatchRequest<librbd::MockTestImageCtx> MockRewatchRequest;
-
- TestMockImageWatcherRewatchRequest()
- : m_watch_lock("watch_lock") {
- }
-
- void expect_aio_watch(MockImageCtx &mock_image_ctx, int r) {
- librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
- mock_image_ctx.md_ctx));
-
- EXPECT_CALL(mock_io_ctx, aio_watch(mock_image_ctx.header_oid, _, _, _))
- .WillOnce(DoAll(WithArg<1>(Invoke([&mock_image_ctx, &mock_io_ctx, r](librados::AioCompletionImpl *c) {
- c->get();
- mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
- mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
- }), r);
- })),
- Return(0)));
- }
-
- void expect_aio_unwatch(MockImageCtx &mock_image_ctx, int r) {
- librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
- mock_image_ctx.md_ctx));
-
- EXPECT_CALL(mock_io_ctx, aio_unwatch(m_watch_handle, _))
- .WillOnce(DoAll(Invoke([&mock_image_ctx, &mock_io_ctx, r](uint64_t handle,
- librados::AioCompletionImpl *c) {
- c->get();
- mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
- mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
- }), r);
- }),
- Return(0)));
- }
-
- void expect_reacquire_lock(MockExclusiveLock &mock_exclusive_lock) {
- EXPECT_CALL(mock_exclusive_lock, reacquire_lock());
- }
-
- struct WatchCtx : public librados::WatchCtx2 {
- virtual void handle_notify(uint64_t, uint64_t, uint64_t,
- ceph::bufferlist&) {
- assert(false);
- }
- virtual void handle_error(uint64_t, int) {
- assert(false);
- }
- };
-
- RWLock m_watch_lock;
- WatchCtx m_watch_ctx;
- uint64_t m_watch_handle = 123;
-};
-
-TEST_F(TestMockImageWatcherRewatchRequest, Success) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- InSequence seq;
- expect_aio_unwatch(mock_image_ctx, 0);
- expect_aio_watch(mock_image_ctx, 0);
-
- MockExclusiveLock mock_exclusive_lock;
- if (ictx->test_features(RBD_FEATURE_EXCLUSIVE_LOCK)) {
- mock_image_ctx.exclusive_lock = &mock_exclusive_lock;
- expect_reacquire_lock(mock_exclusive_lock);
- }
-
- C_SaferCond ctx;
- MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
- m_watch_lock,
- &m_watch_ctx,
- &m_watch_handle,
- &ctx);
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- req->send();
- }
- ASSERT_EQ(0, ctx.wait());
-}
-
-TEST_F(TestMockImageWatcherRewatchRequest, UnwatchError) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- InSequence seq;
- expect_aio_unwatch(mock_image_ctx, -EINVAL);
- expect_aio_watch(mock_image_ctx, 0);
-
- C_SaferCond ctx;
- MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
- m_watch_lock,
- &m_watch_ctx,
- &m_watch_handle,
- &ctx);
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- req->send();
- }
- ASSERT_EQ(0, ctx.wait());
-}
-
-TEST_F(TestMockImageWatcherRewatchRequest, WatchBlacklist) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- InSequence seq;
- expect_aio_unwatch(mock_image_ctx, 0);
- expect_aio_watch(mock_image_ctx, -EBLACKLISTED);
-
- C_SaferCond ctx;
- MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
- m_watch_lock,
- &m_watch_ctx,
- &m_watch_handle,
- &ctx);
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- req->send();
- }
- ASSERT_EQ(-EBLACKLISTED, ctx.wait());
-}
-
-TEST_F(TestMockImageWatcherRewatchRequest, WatchDNE) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- InSequence seq;
- expect_aio_unwatch(mock_image_ctx, 0);
- expect_aio_watch(mock_image_ctx, -ENOENT);
-
- C_SaferCond ctx;
- MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
- m_watch_lock,
- &m_watch_ctx,
- &m_watch_handle,
- &ctx);
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- req->send();
- }
- ASSERT_EQ(-ENOENT, ctx.wait());
-}
-
-TEST_F(TestMockImageWatcherRewatchRequest, WatchError) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- InSequence seq;
- expect_aio_unwatch(mock_image_ctx, 0);
- expect_aio_watch(mock_image_ctx, -EINVAL);
- expect_aio_watch(mock_image_ctx, 0);
-
- C_SaferCond ctx;
- MockRewatchRequest *req = MockRewatchRequest::create(mock_image_ctx,
- m_watch_lock,
- &m_watch_ctx,
- &m_watch_handle,
- &ctx);
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- req->send();
- }
- ASSERT_EQ(0, ctx.wait());
-}
-
-} // namespace image_watcher
-} // namespace librbd
namespace librbd {
struct MockImageWatcher {
+ MOCK_METHOD0(is_registered, bool());
MOCK_METHOD0(unregister_watch, void());
MOCK_METHOD1(flush, void(Context *));