LibrbdWriteback.cc
MirroringWatcher.cc
ObjectMap.cc
- ObjectWatcher.cc
Operations.cc
Utils.cc
cache/ImageWriteback.cc
namespace librbd {
using namespace mirroring_watcher;
+using namespace watcher;
namespace {
template <typename I>
MirroringWatcher<I>::MirroringWatcher(librados::IoCtx &io_ctx,
- ContextWQT *work_queue)
- : ObjectWatcher<I>(io_ctx, work_queue) {
+ ContextWQ *work_queue)
+ : Watcher(io_ctx, work_queue, RBD_MIRRORING) {
}
template <typename I>
-std::string MirroringWatcher<I>::get_oid() const {
- return RBD_MIRRORING;
+int MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
+ cls::rbd::MirrorMode mirror_mode) {
+ C_SaferCond ctx;
+ notify_mode_updated(io_ctx, mirror_mode, &ctx);
+ return ctx.wait();
}
template <typename I>
-int MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
- cls::rbd::MirrorMode mirror_mode) {
+void MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
+ cls::rbd::MirrorMode mirror_mode,
+ Context *on_finish) {
CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
ldout(cct, 20) << dendl;
bufferlist bl;
::encode(NotifyMessage{ModeUpdatedPayload{mirror_mode}}, bl);
- int r = io_ctx.notify2(RBD_MIRRORING, bl, NOTIFY_TIMEOUT_MS, nullptr);
- if (r < 0) {
- lderr(cct) << ": error encountered sending mode updated notification: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- return 0;
+ librados::AioCompletion *comp = util::create_rados_ack_callback(on_finish);
+ int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
+ nullptr);
+ assert(r == 0);
+ comp->release();
}
template <typename I>
int MirroringWatcher<I>::notify_image_updated(
librados::IoCtx &io_ctx, cls::rbd::MirrorImageState mirror_image_state,
const std::string &image_id, const std::string &global_image_id) {
- C_SaferCond cond;
+ C_SaferCond ctx;
notify_image_updated(io_ctx, mirror_image_state, image_id, global_image_id,
- &cond);
- return cond.wait();
+ &ctx);
+ return ctx.wait();
}
template <typename I>
librados::IoCtx &io_ctx, cls::rbd::MirrorImageState mirror_image_state,
const std::string &image_id, const std::string &global_image_id,
Context *on_finish) {
+
CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
ldout(cct, 20) << dendl;
bufferlist bl;
- ::encode(NotifyMessage{ImageUpdatedPayload{mirror_image_state, image_id,
- global_image_id}},
- bl);
+ ::encode(NotifyMessage{ImageUpdatedPayload{
+ mirror_image_state, image_id, global_image_id}}, bl);
+
librados::AioCompletion *comp = util::create_rados_ack_callback(on_finish);
int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
nullptr);
assert(r == 0);
comp->release();
+
}
template <typename I>
ldout(cct, 15) << ": notify_id=" << notify_id << ", "
<< "handle=" << handle << dendl;
- Context *ctx = new typename ObjectWatcher<I>::C_NotifyAck(this, notify_id,
- handle);
NotifyMessage notify_message;
try {
} catch (const buffer::error &err) {
lderr(cct) << ": error decoding image notification: " << err.what()
<< dendl;
+ Context *ctx = new C_NotifyAck(this, notify_id, handle);
ctx->complete(0);
return;
}
- apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
+ apply_visitor(HandlePayloadVisitor<MirroringWatcher<I>>(this, notify_id,
+ handle),
+ notify_message.payload);
}
template <typename I>
-void MirroringWatcher<I>::handle_payload(const ModeUpdatedPayload &payload,
+bool MirroringWatcher<I>::handle_payload(const ModeUpdatedPayload &payload,
Context *on_notify_ack) {
CephContext *cct = this->m_cct;
ldout(cct, 20) << ": mode updated: " << payload.mirror_mode << dendl;
handle_mode_updated(payload.mirror_mode, on_notify_ack);
+ return true;
}
template <typename I>
-void MirroringWatcher<I>::handle_payload(const ImageUpdatedPayload &payload,
+bool MirroringWatcher<I>::handle_payload(const ImageUpdatedPayload &payload,
Context *on_notify_ack) {
CephContext *cct = this->m_cct;
ldout(cct, 20) << ": image state updated" << dendl;
handle_image_updated(payload.mirror_image_state, payload.image_id,
payload.global_image_id, on_notify_ack);
+ return true;
}
template <typename I>
-void MirroringWatcher<I>::handle_payload(const UnknownPayload &payload,
+bool MirroringWatcher<I>::handle_payload(const UnknownPayload &payload,
Context *on_notify_ack) {
- on_notify_ack->complete(0);
+ return true;
}
} // namespace librbd
#include "include/int_types.h"
#include "cls/rbd/cls_rbd_types.h"
#include "librbd/ImageCtx.h"
-#include "librbd/ObjectWatcher.h"
+#include "librbd/Watcher.h"
#include "librbd/mirroring_watcher/Types.h"
namespace librados {
namespace librbd {
+namespace watcher {
+template <typename> struct HandlePayloadVisitor;
+}
+
template <typename ImageCtxT = librbd::ImageCtx>
-class MirroringWatcher : public ObjectWatcher<ImageCtxT> {
-public:
- typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
+class MirroringWatcher : public Watcher {
+ friend struct watcher::HandlePayloadVisitor<MirroringWatcher<ImageCtxT>>;
- MirroringWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
+public:
+ MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue);
static int notify_mode_updated(librados::IoCtx &io_ctx,
cls::rbd::MirrorMode mirror_mode);
+ static void notify_mode_updated(librados::IoCtx &io_ctx,
+ cls::rbd::MirrorMode mirror_mode,
+ Context *on_finish);
+
static int notify_image_updated(librados::IoCtx &io_ctx,
cls::rbd::MirrorImageState mirror_image_state,
const std::string &image_id,
const std::string &global_image_id,
Context *on_ack) = 0;
-protected:
- virtual std::string get_oid() const;
-
- virtual void handle_notify(uint64_t notify_id, uint64_t handle,
- bufferlist &bl);
-
private:
- struct HandlePayloadVisitor : public boost::static_visitor<void> {
- MirroringWatcher *mirroring_watcher;
- Context *on_notify_ack;
-
- HandlePayloadVisitor(MirroringWatcher *mirroring_watcher,
- Context *on_notify_ack)
- : mirroring_watcher(mirroring_watcher), on_notify_ack(on_notify_ack) {
- }
-
- template <typename Payload>
- inline void operator()(const Payload &payload) const {
- mirroring_watcher->handle_payload(payload, on_notify_ack);
- }
- };
-
- void handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload,
+ bool handle_payload(const mirroring_watcher::ModeUpdatedPayload &payload,
Context *on_notify_ack);
- void handle_payload(const mirroring_watcher::ImageUpdatedPayload &payload,
+ bool handle_payload(const mirroring_watcher::ImageUpdatedPayload &payload,
Context *on_notify_ack);
- void handle_payload(const mirroring_watcher::UnknownPayload &payload,
+ bool handle_payload(const mirroring_watcher::UnknownPayload &payload,
Context *on_notify_ack);
+ virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &bl);
};
} // namespace librbd
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "librbd/ObjectWatcher.h"
-#include "include/Context.h"
-#include "common/errno.h"
-#include "common/WorkQueue.h"
-#include "librbd/Utils.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::ObjectWatcher: " << get_oid() << ": " \
- << __func__
-
-namespace librbd {
-
-using util::create_context_callback;
-using util::create_rados_safe_callback;
-
-namespace {
-
-struct C_UnwatchAndFlush : public Context {
- librados::Rados rados;
- Context *on_finish;
- bool flushing = false;
- int ret_val = 0;
-
- C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
- : rados(io_ctx), on_finish(on_finish) {
- }
-
- virtual void complete(int r) override {
- if (ret_val == 0 && r < 0) {
- ret_val = r;
- }
-
- if (!flushing) {
- flushing = true;
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
- r = rados.aio_watch_flush(aio_comp);
- assert(r == 0);
- aio_comp->release();
- } else {
- Context::complete(ret_val);
- }
- }
-
- virtual void finish(int r) override {
- on_finish->complete(r);
- }
-};
-
-} // anonymous namespace
-
-template <typename I>
-ObjectWatcher<I>::ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue)
- : m_io_ctx(io_ctx), m_cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
- m_work_queue(work_queue),
- m_watch_lock(util::unique_lock_name("librbd::ObjectWatcher::m_watch_lock", this)),
- m_watch_ctx(this) {
-}
-
-template <typename I>
-ObjectWatcher<I>::~ObjectWatcher() {
- RWLock::RLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_UNREGISTERED);
-}
-
-template <typename I>
-void ObjectWatcher<I>::register_watch(Context *on_finish) {
- ldout(m_cct, 5) << dendl;
-
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(on_finish != nullptr);
- assert(m_on_register_watch == nullptr);
- assert(m_watch_state == WATCH_STATE_UNREGISTERED);
-
- m_watch_state = WATCH_STATE_REGISTERING;
- m_on_register_watch = on_finish;
- }
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback<
- ObjectWatcher<I>, &ObjectWatcher<I>::handle_register_watch>(this);
- int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
- &m_watch_ctx);
- assert(r == 0);
- aio_comp->release();
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_register_watch(int r) {
- ldout(m_cct, 20) << ": r=" << r << dendl;
-
- Context *on_register_watch = nullptr;
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_REGISTERING);
-
- std::swap(on_register_watch, m_on_register_watch);
- if (r < 0) {
- lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r)
- << dendl;
-
- m_watch_state = WATCH_STATE_UNREGISTERED;
- m_watch_handle = 0;
- } else {
- m_watch_state = WATCH_STATE_REGISTERED;
- }
- }
- on_register_watch->complete(r);
-}
-
-template <typename I>
-void ObjectWatcher<I>::unregister_watch(Context *on_finish) {
- ldout(m_cct, 5) << dendl;
-
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(on_finish != nullptr);
- assert(m_on_unregister_watch == nullptr);
- assert(m_watch_state != WATCH_STATE_UNREGISTERED &&
- m_watch_state != WATCH_STATE_REGISTERING);
-
- m_on_unregister_watch = on_finish;
- if (m_watch_state == WATCH_STATE_REGISTERED) {
- unregister_watch_();
- }
-}
-
-template <typename I>
-void ObjectWatcher<I>::unregister_watch_() {
- assert(m_watch_lock.is_wlocked());
- assert(m_on_unregister_watch != nullptr);
- assert(m_watch_state == WATCH_STATE_REGISTERED);
- m_watch_state = WATCH_STATE_UNREGISTERING;
-
- Context *ctx = create_context_callback<
- ObjectWatcher<I>, &ObjectWatcher<I>::handle_unregister_watch>(this);
- librados::AioCompletion *aio_comp = create_rados_safe_callback(
- new C_UnwatchAndFlush(m_io_ctx, ctx));
- int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
- assert(r == 0);
- aio_comp->release();
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_unregister_watch(int r) {
- ldout(m_cct, 20) << ": r=" << r << dendl;
-
- Context *on_unregister_watch = nullptr;
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_UNREGISTERING);
-
- if (r < 0) {
- lderr(m_cct) << ": error encountered unregister watch: "
- << cpp_strerror(r) << dendl;
- }
-
- m_watch_state = WATCH_STATE_UNREGISTERED;
- m_watch_handle = 0;
- std::swap(on_unregister_watch, m_on_unregister_watch);
- }
-
- on_unregister_watch->complete(r);
-}
-
-template <typename I>
-void ObjectWatcher<I>::pre_unwatch(Context *on_finish) {
- ldout(m_cct, 20) << dendl;
-
- on_finish->complete(0);
-}
-
-template <typename I>
-void ObjectWatcher<I>::post_rewatch(Context *on_finish) {
- ldout(m_cct, 20) << dendl;
-
- on_finish->complete(0);
-}
-
-template <typename I>
-void ObjectWatcher<I>::acknowledge_notify(uint64_t notify_id, uint64_t handle,
- bufferlist &out) {
- ldout(m_cct, 15) << ": notify_id=" << notify_id << ", "
- << "handle=" << handle << dendl;
- m_io_ctx.notify_ack(get_oid(), notify_id, handle, out);
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_error(uint64_t handle, int err) {
- lderr(m_cct) << ": handle=" << handle << ", " << "err=" << err << dendl;
-
- RWLock::WLocker watch_locker(m_watch_lock);
- if (m_watch_state != WATCH_STATE_REGISTERED) {
- return;
- }
-
- m_watch_state = WATCH_STATE_REREGISTERING;
- Context *pre_unwatch_ctx = new FunctionContext([this](int r) {
- assert(r == 0);
- Context *ctx = create_context_callback<
- ObjectWatcher<I>, &ObjectWatcher<I>::handle_pre_unwatch>(this);
- pre_unwatch(ctx);
- });
- m_work_queue->queue(pre_unwatch_ctx, 0);
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_pre_unwatch(int r) {
- ldout(m_cct, 20) << dendl;
-
- assert(r == 0);
- unwatch();
-}
-
-template <typename I>
-void ObjectWatcher<I>::unwatch() {
- ldout(m_cct, 20) << dendl;
-
- {
- RWLock::RLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_REREGISTERING);
- }
-
- Context *ctx = create_context_callback<
- ObjectWatcher<I>, &ObjectWatcher<I>::handle_unwatch>(this);
- librados::AioCompletion *aio_comp = create_rados_safe_callback(
- new C_UnwatchAndFlush(m_io_ctx, ctx));
- int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
- assert(r == 0);
- aio_comp->release();
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_unwatch(int r) {
- ldout(m_cct, 20) << ": r=" << r << dendl;
-
- if (r < 0) {
- lderr(m_cct) << ": error encountered during unwatch: " << cpp_strerror(r)
- << dendl;
- }
-
- // handling pending unregister (if any)
- if (pending_unregister_watch(r)) {
- return;
- }
-
- rewatch();
-}
-
-template <typename I>
-void ObjectWatcher<I>::rewatch() {
- ldout(m_cct, 20) << dendl;
-
- {
- RWLock::RLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_REREGISTERING);
- }
-
- librados::AioCompletion *aio_comp = create_rados_safe_callback<
- ObjectWatcher<I>, &ObjectWatcher<I>::handle_rewatch>(this);
- int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
- &m_watch_ctx);
- assert(r == 0);
- aio_comp->release();
-
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_rewatch(int r) {
- ldout(m_cct, 20) << ": r=" << r << dendl;
-
- if (r < 0) {
- lderr(m_cct) << ": error encountered during re-watch: " << cpp_strerror(r)
- << dendl;
- m_watch_handle = 0;
-
- if (!pending_unregister_watch(0)) {
- rewatch();
- }
- return;
- }
-
- Context *ctx = create_context_callback<
- ObjectWatcher<I>, &ObjectWatcher<I>::handle_post_watch>(this);
- post_rewatch(ctx);
-}
-
-template <typename I>
-void ObjectWatcher<I>::handle_post_watch(int r) {
- ldout(m_cct, 20) << dendl;
-
- assert(r == 0);
-
- RWLock::WLocker watch_locker(m_watch_lock);
- m_watch_state = WATCH_STATE_REGISTERED;
-
- // handling pending unregister (if any)
- if (m_on_unregister_watch != nullptr) {
- unregister_watch_();
- return;
- }
-}
-
-template <typename I>
-bool ObjectWatcher<I>::pending_unregister_watch(int r) {
- Context *on_unregister_watch = nullptr;
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- assert(m_watch_state == WATCH_STATE_REREGISTERING);
-
- if (m_on_unregister_watch != nullptr) {
- m_watch_state = WATCH_STATE_UNREGISTERED;
- std::swap(on_unregister_watch, m_on_unregister_watch);
- }
- }
-
- if (on_unregister_watch != nullptr) {
- on_unregister_watch->complete(r);
- return true;
- }
-
- return false;
-}
-
-template <typename I>
-ObjectWatcher<I>::C_NotifyAck::C_NotifyAck(ObjectWatcher *object_watcher,
- uint64_t notify_id, uint64_t handle)
- : object_watcher(object_watcher), notify_id(notify_id), handle(handle) {
- CephContext *cct = object_watcher->m_cct;
- ldout(cct, 10) << ": C_NotifyAck start: id=" << notify_id << ", "
- << "handle=" << handle << dendl;
-}
-
-template <typename I>
-void ObjectWatcher<I>::C_NotifyAck::finish(int r) {
- assert(r == 0);
- CephContext *cct = object_watcher->m_cct;
- ldout(cct, 10) << ": C_NotifyAck finish: id=" << notify_id << ", "
- << "handle=" << handle << dendl;
- object_watcher->acknowledge_notify(notify_id, handle, out);
-}
-
-} // namespace librbd
-
-template class librbd::ObjectWatcher<librbd::ImageCtx>;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_LIBRBD_OBJECT_WATCHER_H
-#define CEPH_LIBRBD_OBJECT_WATCHER_H
-
-#include "include/rados/librados.hpp"
-#include "common/RWLock.h"
-#include "librbd/ImageCtx.h"
-#include <string>
-#include <type_traits>
-
-class Context;
-
-namespace librbd {
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class ObjectWatcher {
-public:
- typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
-
- ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
- virtual ~ObjectWatcher();
-
- ObjectWatcher(const ObjectWatcher&) = delete;
- ObjectWatcher& operator= (const ObjectWatcher&) = delete;
-
- void register_watch(Context *on_finish);
- virtual void unregister_watch(Context *on_finish);
-
-protected:
- struct C_NotifyAck : public Context {
- ObjectWatcher *object_watcher;
- uint64_t notify_id;
- uint64_t handle;
- bufferlist out;
-
- C_NotifyAck(ObjectWatcher *object_watcher, uint64_t notify_id,
- uint64_t handle);
- virtual void finish(int r);
-
- std::string get_oid() const {
- return object_watcher->get_oid();
- }
- };
-
- librados::IoCtx &m_io_ctx;
- CephContext *m_cct;
-
- virtual std::string get_oid() const = 0;
-
- virtual void handle_notify(uint64_t notify_id, uint64_t handle,
- bufferlist &bl) = 0;
- void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
-
- virtual void pre_unwatch(Context *on_finish);
- virtual void post_rewatch(Context *on_finish);
-
-private:
- /**
- * @verbatim
- *
- * <start>
- * |
- * v
- * REGISTER_WATCH
- * |
- * | /-------------------------------------\
- * | | |
- * v v (watch error) |
- * REGISTERED * * * * * * * > PRE_UNWATCH |
- * | | |
- * | v |
- * | UNWATCH |
- * | | |
- * | v |
- * | REWATCH |
- * | | |
- * | v |
- * | POST_REWATCH |
- * | | |
- * v \---------------/
- * UNREGISTER_WATCH
- * |
- * v
- * UNREGISTERED
- * |
- * v
- * <finish>
- *
- * @endverbatim
- */
-
- struct WatchCtx : public librados::WatchCtx2 {
- ObjectWatcher *object_watcher;
-
- WatchCtx(ObjectWatcher *object_watcher) : object_watcher(object_watcher) {
- }
-
- virtual void handle_notify(uint64_t notify_id,
- uint64_t handle,
- uint64_t notifier_id,
- bufferlist& bl) {
- object_watcher->handle_notify(notify_id, handle, bl);
- }
-
- virtual void handle_error(uint64_t handle, int err) {
- object_watcher->handle_error(handle, err);
- }
- };
-
- enum WatchState {
- WATCH_STATE_UNREGISTERED,
- WATCH_STATE_REGISTERING,
- WATCH_STATE_REGISTERED,
- WATCH_STATE_UNREGISTERING,
- WATCH_STATE_REREGISTERING
- };
-
- ContextWQT* m_work_queue;
-
- mutable RWLock m_watch_lock;
- WatchCtx m_watch_ctx;
- uint64_t m_watch_handle = 0;
- WatchState m_watch_state = WATCH_STATE_UNREGISTERED;
-
- Context *m_on_register_watch = nullptr;
- Context *m_on_unregister_watch = nullptr;
-
- void handle_register_watch(int r);
-
- void unregister_watch_();
- void handle_unregister_watch(int r);
-
- void handle_error(uint64_t handle, int err);
-
- void handle_pre_unwatch(int r);
-
- void unwatch();
- void handle_unwatch(int r);
-
- void rewatch();
- void handle_rewatch(int r);
-
- void handle_post_watch(int r);
-
- bool pending_unregister_watch(int r);
-
-};
-
-} // namespace librbd
-
-extern template class librbd::ObjectWatcher<librbd::ImageCtx>;
-
-#endif // CEPH_LIBRBD_OBJECT_WATCHER_H
// vim: ts=8 sw=2 smarttab
#include "librbd/mirroring_watcher/Types.h"
+#include "librbd/watcher/Types.h"
#include "include/assert.h"
#include "include/stringify.h"
#include "common/Formatter.h"
namespace {
-class EncodePayloadVisitor : public boost::static_visitor<void> {
-public:
- explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
-
- template <typename Payload>
- inline void operator()(const Payload &payload) const {
- ::encode(static_cast<uint32_t>(Payload::NOTIFY_OP), m_bl);
- payload.encode(m_bl);
- }
-
-private:
- bufferlist &m_bl;
-};
-
-class DecodePayloadVisitor : public boost::static_visitor<void> {
-public:
- DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
- : m_version(version), m_iter(iter) {}
-
- template <typename Payload>
- inline void operator()(Payload &payload) const {
- payload.decode(m_version, m_iter);
- }
-
-private:
- __u8 m_version;
- bufferlist::iterator &m_iter;
-};
-
class DumpPayloadVisitor : public boost::static_visitor<void> {
public:
explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {}
void NotifyMessage::encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- boost::apply_visitor(EncodePayloadVisitor(bl), payload);
+ boost::apply_visitor(watcher::EncodePayloadVisitor(bl), payload);
ENCODE_FINISH(bl);
}
break;
}
- apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
+ apply_visitor(watcher::DecodePayloadVisitor(struct_v, iter), payload);
DECODE_FINISH(iter);
}
#include "librbd/Watcher.h"
#include "common/dout.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/MirroringWatcher.h"
+
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::Watcher: "
} // namespace watcher
} // namespace librbd
+
+template struct librbd::watcher::HandlePayloadVisitor<
+ librbd::MirroringWatcher<librbd::ImageCtx>>;
test_mock_ExclusiveLock.cc
test_mock_Journal.cc
test_mock_ObjectMap.cc
- test_mock_ObjectWatcher.cc
exclusive_lock/test_mock_AcquireRequest.cc
exclusive_lock/test_mock_ReacquireRequest.cc
exclusive_lock/test_mock_ReleaseRequest.cc
};
TEST_F(TestMirroringWatcher, ModeUpdated) {
- EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _))
- .WillRepeatedly(WithArg<1>(Invoke([](Context *on_finish) {
- on_finish->complete(0);
- })));
-
- ASSERT_EQ(0, MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED));
+ EXPECT_CALL(*m_image_watcher, handle_mode_updated(cls::rbd::MIRROR_MODE_DISABLED, _));
+ C_SaferCond ctx;
+ MockMirroringWatcher::notify_mode_updated(m_ioctx, cls::rbd::MIRROR_MODE_DISABLED, &ctx);
+ ASSERT_EQ(0, ctx.wait());
}
TEST_F(TestMirroringWatcher, ImageStatusUpdated) {
EXPECT_CALL(*m_image_watcher,
handle_image_updated(cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
StrEq("image id"), StrEq("global image id"),
- _))
- .WillRepeatedly(WithArg<3>(Invoke([](Context *on_finish) {
- on_finish->complete(0);
- })));
-
- ASSERT_EQ(0, MockMirroringWatcher::notify_image_updated(m_ioctx,
- cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
- "image id",
- "global image id"));
+ _));
+
+ C_SaferCond ctx;
+ MockMirroringWatcher::notify_image_updated(m_ioctx,
+ cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
+ "image id", "global image id", &ctx);
+ ASSERT_EQ(0, ctx.wait());
}
} // namespace librbd
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
-#include "librbd/ObjectWatcher.h"
+#include "librbd/Watcher.h"
#include "librbd/internal.h"
#include "Replayer.h"
#include "Threads.h"
}
private:
- class Watcher : public librbd::ObjectWatcher<> {
+ class Watcher : public librbd::Watcher {
public:
Watcher(librados::IoCtx &ioctx, ContextWQ *work_queue) :
- ObjectWatcher<>(ioctx, work_queue) {
+ librbd::Watcher(ioctx, work_queue, RBD_MIRRORING) {
}
virtual std::string get_oid() const {
}
virtual void handle_notify(uint64_t notify_id, uint64_t handle,
- bufferlist &bl) {
+ bufferlist &bl) {
bufferlist out;
acknowledge_notify(notify_id, handle, out);
}