librbd_types_la_SOURCES = \
librbd/journal/Types.cc \
+ librbd/mirroring_watcher/Types.cc \
librbd/WatchNotifyTypes.cc
noinst_LTLIBRARIES += librbd_types.la
librbd/Journal.cc \
librbd/LibrbdAdminSocketHook.cc \
librbd/LibrbdWriteback.cc \
+ librbd/MirroringWatcher.cc \
librbd/ObjectMap.cc \
+ librbd/ObjectWatcher.cc \
librbd/Operations.cc \
librbd/Utils.cc \
librbd/exclusive_lock/AcquireRequest.cc \
librbd/Journal.h \
librbd/LibrbdAdminSocketHook.h \
librbd/LibrbdWriteback.h \
+ librbd/MirroringWatcher.h \
librbd/ObjectMap.h \
+ librbd/ObjectWatcher.h \
librbd/Operations.h \
librbd/parent_types.h \
librbd/SnapInfo.h \
librbd/journal/StandardPolicy.h \
librbd/journal/Types.h \
librbd/journal/TypeTraits.h \
+ librbd/mirroring_watcher/Types.h \
librbd/object_map/InvalidateRequest.h \
librbd/object_map/LockRequest.h \
librbd/object_map/Request.h \
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/MirroringWatcher.h"
+#include "include/rbd_types.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::MirroringWatcher: "
+
+namespace librbd {
+
+template <typename I>
+MirroringWatcher<I>::MirroringWatcher(librados::IoCtx &io_ctx,
+ ContextWQT *work_queue)
+ : ObjectWatcher<I>(io_ctx, work_queue) {
+}
+
+template <typename I>
+std::string MirroringWatcher<I>::get_oid() const {
+ return RBD_MIRRORING;
+}
+
+} // namespace librbd
+
+template class librbd::MirroringWatcher<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIRRORING_WATCHER_H
+#define CEPH_LIBRBD_MIRRORING_WATCHER_H
+
+#include "include/int_types.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ObjectWatcher.h"
+#include "librbd/mirroring_watcher/Types.h"
+
+namespace librbd {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class MirroringWatcher : public ObjectWatcher<ImageCtxT> {
+public:
+ typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
+
+ MirroringWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
+
+protected:
+ virtual std::string get_oid() const;
+
+private:
+
+};
+
+} // namespace librbd
+
+extern template class librbd::MirroringWatcher<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIRRORING_WATCHER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/ObjectWatcher.h"
+#include "include/Context.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::ObjectWatcher: " << get_oid() << ": " \
+ << __func__
+
+namespace librbd {
+
+using util::create_context_callback;
+using util::create_rados_safe_callback;
+
+namespace {
+
+struct C_UnwatchAndFlush : public Context {
+ librados::Rados rados;
+ Context *on_finish;
+ bool flushing = false;
+ int ret_val = 0;
+
+ C_UnwatchAndFlush(librados::IoCtx &io_ctx, Context *on_finish)
+ : rados(io_ctx), on_finish(on_finish) {
+ }
+
+ virtual void complete(int r) override {
+ if (ret_val == 0 && r < 0) {
+ ret_val = r;
+ }
+
+ if (!flushing) {
+ flushing = true;
+
+ librados::AioCompletion *aio_comp = create_rados_safe_callback(this);
+ r = rados.aio_watch_flush(aio_comp);
+ assert(r == 0);
+ aio_comp->release();
+ } else {
+ Context::complete(ret_val);
+ }
+ }
+
+ virtual void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+} // anonymous namespace
+
+template <typename I>
+ObjectWatcher<I>::ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue)
+ : m_io_ctx(io_ctx), m_cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
+ m_work_queue(work_queue),
+ m_watch_lock(util::unique_lock_name("librbd::ObjectWatcher::m_watch_lock", this)),
+ m_watch_ctx(this) {
+}
+
+template <typename I>
+ObjectWatcher<I>::~ObjectWatcher() {
+ RWLock::RLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+}
+
+template <typename I>
+void ObjectWatcher<I>::register_watch(Context *on_finish) {
+ ldout(m_cct, 5) << dendl;
+
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(on_finish != nullptr);
+ assert(m_on_register_watch == nullptr);
+ assert(m_watch_state == WATCH_STATE_UNREGISTERED);
+
+ m_watch_state = WATCH_STATE_REGISTERING;
+ m_on_register_watch = on_finish;
+ }
+
+ librados::AioCompletion *aio_comp = create_rados_safe_callback<
+ ObjectWatcher<I>, &ObjectWatcher<I>::handle_register_watch>(this);
+ int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
+ &m_watch_ctx);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_register_watch(int r) {
+ ldout(m_cct, 20) << ": r=" << r << dendl;
+
+ Context *on_register_watch = nullptr;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REGISTERING);
+
+ std::swap(on_register_watch, m_on_register_watch);
+ if (r < 0) {
+ lderr(m_cct) << ": failed to register watch: " << cpp_strerror(r)
+ << dendl;
+
+ m_watch_state = WATCH_STATE_UNREGISTERED;
+ m_watch_handle = 0;
+ } else {
+ m_watch_state = WATCH_STATE_REGISTERED;
+ }
+ }
+ on_register_watch->complete(r);
+}
+
+template <typename I>
+void ObjectWatcher<I>::unregister_watch(Context *on_finish) {
+ ldout(m_cct, 5) << dendl;
+
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(on_finish != nullptr);
+ assert(m_on_unregister_watch == nullptr);
+ assert(m_watch_state != WATCH_STATE_UNREGISTERED &&
+ m_watch_state != WATCH_STATE_REGISTERING);
+
+ m_on_unregister_watch = on_finish;
+ if (m_watch_state == WATCH_STATE_REGISTERED) {
+ unregister_watch_();
+ }
+}
+
+template <typename I>
+void ObjectWatcher<I>::unregister_watch_() {
+ assert(m_watch_lock.is_wlocked());
+ assert(m_on_unregister_watch != nullptr);
+ assert(m_watch_state == WATCH_STATE_REGISTERED);
+ m_watch_state = WATCH_STATE_UNREGISTERING;
+
+ Context *ctx = create_context_callback<
+ ObjectWatcher<I>, &ObjectWatcher<I>::handle_unregister_watch>(this);
+ librados::AioCompletion *aio_comp = create_rados_safe_callback(
+ new C_UnwatchAndFlush(m_io_ctx, ctx));
+ int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_unregister_watch(int r) {
+ ldout(m_cct, 20) << ": r=" << r << dendl;
+
+ Context *on_unregister_watch = nullptr;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_UNREGISTERING);
+
+ if (r < 0) {
+ lderr(m_cct) << ": error encountered unregister watch: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ m_watch_state = WATCH_STATE_UNREGISTERED;
+ m_watch_handle = 0;
+ std::swap(on_unregister_watch, m_on_unregister_watch);
+ }
+
+ on_unregister_watch->complete(r);
+}
+
+template <typename I>
+void ObjectWatcher<I>::pre_unwatch(Context *on_finish) {
+ ldout(m_cct, 20) << dendl;
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+void ObjectWatcher<I>::post_rewatch(Context *on_finish) {
+ ldout(m_cct, 20) << dendl;
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &bl) {
+ ldout(m_cct, 15) << ": notify_id=" << notify_id << ", "
+ << "handle=" << handle << dendl;
+}
+
+template <typename I>
+void ObjectWatcher<I>::acknowledge_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &out) {
+ ldout(m_cct, 15) << ": notify_id=" << notify_id << ", "
+ << "handle=" << handle << dendl;
+ m_io_ctx.notify_ack(get_oid(), notify_id, handle, out);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_error(uint64_t handle, int err) {
+ lderr(m_cct) << ": handle=" << handle << ", " << "err=" << err << dendl;
+
+ RWLock::WLocker watch_locker(m_watch_lock);
+ if (m_watch_state != WATCH_STATE_REGISTERED) {
+ return;
+ }
+
+ m_watch_state = WATCH_STATE_REREGISTERING;
+ Context *pre_unwatch_ctx = new FunctionContext([this](int r) {
+ assert(r == 0);
+ Context *ctx = create_context_callback<
+ ObjectWatcher<I>, &ObjectWatcher<I>::handle_pre_unwatch>(this);
+ pre_unwatch(ctx);
+ });
+ m_work_queue->queue(pre_unwatch_ctx, 0);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_pre_unwatch(int r) {
+ ldout(m_cct, 20) << dendl;
+
+ assert(r == 0);
+ unwatch();
+}
+
+template <typename I>
+void ObjectWatcher<I>::unwatch() {
+ ldout(m_cct, 20) << dendl;
+
+ {
+ RWLock::RLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REREGISTERING);
+ }
+
+ Context *ctx = create_context_callback<
+ ObjectWatcher<I>, &ObjectWatcher<I>::handle_unwatch>(this);
+ librados::AioCompletion *aio_comp = create_rados_safe_callback(
+ new C_UnwatchAndFlush(m_io_ctx, ctx));
+ int r = m_io_ctx.aio_unwatch(m_watch_handle, aio_comp);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_unwatch(int r) {
+ ldout(m_cct, 20) << ": r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(m_cct) << ": error encountered during unwatch: " << cpp_strerror(r)
+ << dendl;
+ }
+
+ // handling pending unregister (if any)
+ if (pending_unregister_watch(r)) {
+ return;
+ }
+
+ rewatch();
+}
+
+template <typename I>
+void ObjectWatcher<I>::rewatch() {
+ ldout(m_cct, 20) << dendl;
+
+ {
+ RWLock::RLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REREGISTERING);
+ }
+
+ librados::AioCompletion *aio_comp = create_rados_safe_callback<
+ ObjectWatcher<I>, &ObjectWatcher<I>::handle_rewatch>(this);
+ int r = m_io_ctx.aio_watch(get_oid(), aio_comp, &m_watch_handle,
+ &m_watch_ctx);
+ assert(r == 0);
+ aio_comp->release();
+
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_rewatch(int r) {
+ ldout(m_cct, 20) << ": r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(m_cct) << ": error encountered during re-watch: " << cpp_strerror(r)
+ << dendl;
+ m_watch_handle = 0;
+
+ if (!pending_unregister_watch(0)) {
+ rewatch();
+ }
+ return;
+ }
+
+ Context *ctx = create_context_callback<
+ ObjectWatcher<I>, &ObjectWatcher<I>::handle_post_watch>(this);
+ post_rewatch(ctx);
+}
+
+template <typename I>
+void ObjectWatcher<I>::handle_post_watch(int r) {
+ ldout(m_cct, 20) << dendl;
+
+ assert(r == 0);
+
+ RWLock::WLocker watch_locker(m_watch_lock);
+ m_watch_state = WATCH_STATE_REGISTERED;
+
+ // handling pending unregister (if any)
+ if (m_on_unregister_watch != nullptr) {
+ unregister_watch_();
+ return;
+ }
+}
+
+template <typename I>
+bool ObjectWatcher<I>::pending_unregister_watch(int r) {
+ Context *on_unregister_watch = nullptr;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ assert(m_watch_state == WATCH_STATE_REREGISTERING);
+
+ if (m_on_unregister_watch != nullptr) {
+ m_watch_state = WATCH_STATE_UNREGISTERED;
+ std::swap(on_unregister_watch, m_on_unregister_watch);
+ }
+ }
+
+ if (on_unregister_watch != nullptr) {
+ on_unregister_watch->complete(r);
+ return true;
+ }
+
+ return false;
+}
+
+} // namespace librbd
+
+template class librbd::ObjectWatcher<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_OBJECT_WATCHER_H
+#define CEPH_LIBRBD_OBJECT_WATCHER_H
+
+#include "include/rados/librados.hpp"
+#include "common/RWLock.h"
+#include "librbd/ImageCtx.h"
+#include <string>
+#include <type_traits>
+
+class Context;
+
+namespace librbd {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ObjectWatcher {
+public:
+ typedef typename std::decay<decltype(*ImageCtxT::op_work_queue)>::type ContextWQT;
+
+ ObjectWatcher(librados::IoCtx &io_ctx, ContextWQT *work_queue);
+ virtual ~ObjectWatcher();
+
+ ObjectWatcher(const ObjectWatcher&) = delete;
+ ObjectWatcher& operator= (const ObjectWatcher&) = delete;
+
+ void register_watch(Context *on_finish);
+ virtual void unregister_watch(Context *on_finish);
+
+protected:
+ librados::IoCtx &m_io_ctx;
+ CephContext *m_cct;
+
+ virtual std::string get_oid() const = 0;
+
+ virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+ bufferlist &bl);
+ void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
+
+ virtual void pre_unwatch(Context *on_finish);
+ virtual void post_rewatch(Context *on_finish);
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * REGISTER_WATCH
+ * |
+ * | /-------------------------------------\
+ * | | |
+ * v v (watch error) |
+ * REGISTERED * * * * * * * > PRE_UNWATCH |
+ * | | |
+ * | v |
+ * | UNWATCH |
+ * | | |
+ * | v |
+ * | REWATCH |
+ * | | |
+ * | v |
+ * | POST_REWATCH |
+ * | | |
+ * v \---------------/
+ * UNREGISTER_WATCH
+ * |
+ * v
+ * UNREGISTERED
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
+ struct WatchCtx : public librados::WatchCtx2 {
+ ObjectWatcher *object_watcher;
+
+ WatchCtx(ObjectWatcher *object_watcher) : object_watcher(object_watcher) {
+ }
+
+ virtual void handle_notify(uint64_t notify_id,
+ uint64_t handle,
+ uint64_t notifier_id,
+ bufferlist& bl) {
+ object_watcher->handle_notify(notify_id, handle, bl);
+ }
+
+ virtual void handle_error(uint64_t handle, int err) {
+ object_watcher->handle_error(handle, err);
+ }
+ };
+
+ enum WatchState {
+ WATCH_STATE_UNREGISTERED,
+ WATCH_STATE_REGISTERING,
+ WATCH_STATE_REGISTERED,
+ WATCH_STATE_UNREGISTERING,
+ WATCH_STATE_REREGISTERING
+ };
+
+ ContextWQT* m_work_queue;
+
+ mutable RWLock m_watch_lock;
+ WatchCtx m_watch_ctx;
+ uint64_t m_watch_handle = 0;
+ WatchState m_watch_state = WATCH_STATE_UNREGISTERED;
+
+ Context *m_on_register_watch = nullptr;
+ Context *m_on_unregister_watch = nullptr;
+
+ void handle_register_watch(int r);
+
+ void unregister_watch_();
+ void handle_unregister_watch(int r);
+
+ void handle_error(uint64_t handle, int err);
+
+ void handle_pre_unwatch(int r);
+
+ void unwatch();
+ void handle_unwatch(int r);
+
+ void rewatch();
+ void handle_rewatch(int r);
+
+ void handle_post_watch(int r);
+
+ bool pending_unregister_watch(int r);
+
+};
+
+} // namespace librbd
+
+extern template class librbd::ObjectWatcher<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_OBJECT_WATCHER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/mirroring_watcher/Types.h"
+
+namespace librbd {
+namespace mirroring_watcher {
+
+} // namespace mirroring_watcher
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIRRORING_WATCHER_TYPES_H
+#define CEPH_LIBRBD_MIRRORING_WATCHER_TYPES_H
+
+#include "include/int_types.h"
+
+namespace librbd {
+namespace mirroring_watcher {
+
+} // namespace mirroring_watcher
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIRRORING_WATCHER_TYPES_H
test/librbd/test_mock_fixture.cc \
test/librbd/test_mock_ExclusiveLock.cc \
test/librbd/test_mock_Journal.cc \
+ test/librbd/test_mock_ObjectWatcher.cc \
test/librbd/exclusive_lock/test_mock_AcquireRequest.cc \
test/librbd/exclusive_lock/test_mock_ReleaseRequest.cc \
test/librbd/image/test_mock_RefreshRequest.cc \
return io_ctx_impl;
}
+ MOCK_METHOD4(aio_watch, int(const std::string& o, AioCompletionImpl *c,
+ uint64_t *handle, librados::WatchCtx2 *ctx));
+ int do_aio_watch(const std::string& o, AioCompletionImpl *c,
+ uint64_t *handle, librados::WatchCtx2 *ctx) {
+ return TestMemIoCtxImpl::aio_watch(o, c, handle, ctx);
+ }
+
+ MOCK_METHOD2(aio_unwatch, int(uint64_t handle, AioCompletionImpl *c));
+ int do_aio_unwatch(uint64_t handle, AioCompletionImpl *c) {
+ return TestMemIoCtxImpl::aio_unwatch(handle, c);
+ }
+
MOCK_METHOD7(exec, int(const std::string& oid,
TestClassHandler *handler,
const char *cls,
return TestMemIoCtxImpl::list_watchers(o, out_watchers);
}
+ MOCK_METHOD4(notify, int(const std::string& o, bufferlist& bl,
+ uint64_t timeout_ms, bufferlist *pbl));
+ int do_notify(const std::string& o, bufferlist& bl,
+ uint64_t timeout_ms, bufferlist *pbl) {
+ return TestMemIoCtxImpl::notify(o, bl, timeout_ms, pbl);
+ }
+
MOCK_METHOD4(read, int(const std::string& oid,
size_t len,
uint64_t off,
void default_to_parent() {
using namespace ::testing;
+ ON_CALL(*this, aio_watch(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_watch));
+ ON_CALL(*this, aio_unwatch(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_aio_unwatch));
ON_CALL(*this, exec(_, _, _, _, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_exec));
ON_CALL(*this, list_snaps(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_list_snaps));
ON_CALL(*this, list_watchers(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_list_watchers));
+ ON_CALL(*this, notify(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_notify));
ON_CALL(*this, read(_, _, _, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_read));
ON_CALL(*this, remove(_, _)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_remove));
ON_CALL(*this, selfmanaged_snap_create(_)).WillByDefault(Invoke(this, &MockTestMemIoCtxImpl::do_selfmanaged_snap_create));
--- /dev/null
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "librados/AioCompletionImpl.h"
+#include "librbd/ObjectWatcher.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include <list>
+
+namespace librbd {
+
+namespace {
+
+struct MockObjectWatcher : public ObjectWatcher<MockImageCtx> {
+ std::string oid;
+
+ MockObjectWatcher(MockImageCtx &mock_image_ctx, const std::string &oid)
+ : ObjectWatcher<MockImageCtx>(mock_image_ctx.md_ctx,
+ mock_image_ctx.op_work_queue),
+ oid(oid) {
+ }
+
+ virtual std::string get_oid() const override {
+ return oid;
+ }
+};
+
+} // anonymous namespace
+
+} // namespace librbd
+
+// template definitions
+#include "librbd/ObjectWatcher.cc"
+template class librbd::ObjectWatcher<librbd::MockImageCtx>;
+
+namespace librbd {
+
+using ::testing::_;
+using ::testing::DoDefault;
+using ::testing::Invoke;
+using ::testing::InSequence;
+using ::testing::Return;
+using ::testing::SaveArg;
+using ::testing::WithArg;
+
+class TestMockObjectWatcher : public TestMockFixture {
+public:
+ TestMockObjectWatcher() : m_lock("TestMockObjectWatcher::m_lock") {
+ }
+
+ virtual void SetUp() {
+ TestMockFixture::SetUp();
+
+ m_oid = get_temp_image_name();
+
+ bufferlist bl;
+ ASSERT_EQ(0, m_ioctx.write_full(m_oid, bl));
+ }
+
+ void expect_aio_watch(MockImageCtx &mock_image_ctx, int r,
+ const std::function<void()> &action = std::function<void()>()) {
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+ mock_image_ctx.md_ctx));
+ librados::MockTestMemRadosClient *mock_rados_client(
+ mock_io_ctx.get_mock_rados_client());
+
+ auto &expect = EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
+ if (r < 0) {
+ expect.WillOnce(DoAll(WithArg<1>(Invoke([this, mock_rados_client, r, action](librados::AioCompletionImpl *c) {
+ if (action) {
+ action();
+ }
+
+ c->get();
+ mock_rados_client->finish_aio_completion(c, r);
+ notify_watch();
+ })),
+ Return(0)));
+ } else {
+ expect.WillOnce(DoAll(SaveArg<3>(&m_watch_ctx),
+ Invoke([this, &mock_io_ctx, action](const std::string& o,
+ librados::AioCompletionImpl *c,
+ uint64_t *handle,
+ librados::WatchCtx2 *ctx) {
+ if (action) {
+ action();
+ }
+
+ mock_io_ctx.do_aio_watch(o, c, handle, ctx);
+ notify_watch();
+ }),
+ Return(0)));
+ }
+ }
+
+ void expect_aio_unwatch(MockImageCtx &mock_image_ctx, int r,
+ const std::function<void()> &action = std::function<void()>()) {
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(
+ mock_image_ctx.md_ctx));
+
+ auto &expect = EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
+ if (r < 0) {
+ expect.WillOnce(DoAll(Invoke([this, &mock_io_ctx, r, action](uint64_t handle,
+ librados::AioCompletionImpl *c) {
+ if (action) {
+ action();
+ }
+
+ librados::AioCompletionImpl *dummy_c = new librados::AioCompletionImpl();
+ mock_io_ctx.do_aio_unwatch(handle, dummy_c);
+ ASSERT_EQ(0, dummy_c->wait_for_complete());
+ dummy_c->release();
+
+ c->get();
+ mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
+ notify_watch();
+ }),
+ Return(0)));
+ } else {
+ expect.WillOnce(DoAll(Invoke([this, &mock_io_ctx, action](uint64_t handle,
+ librados::AioCompletionImpl *c) {
+ if (action) {
+ action();
+ }
+
+ mock_io_ctx.do_aio_unwatch(handle, c);
+ notify_watch();
+ }),
+ Return(0)));
+ }
+ }
+
+ std::string m_oid;
+ librados::WatchCtx2 *m_watch_ctx = nullptr;
+
+ void notify_watch() {
+ Mutex::Locker locker(m_lock);
+ ++m_watch_count;
+ m_cond.Signal();
+ }
+
+ bool wait_for_watch(MockImageCtx &mock_image_ctx, size_t count) {
+ Mutex::Locker locker(m_lock);
+ while (m_watch_count < count) {
+ if (m_cond.WaitInterval(mock_image_ctx.cct, m_lock,
+ utime_t(10, 0)) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ Mutex m_lock;
+ Cond m_cond;
+ size_t m_watch_count = 0;
+};
+
+TEST_F(TestMockObjectWatcher, Success) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ C_SaferCond unregister_ctx;
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, RegisterError) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, -EINVAL);
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(-EINVAL, register_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, UnregisterError) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, -EINVAL);
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ C_SaferCond unregister_ctx;
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ ASSERT_EQ(-EINVAL, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, Reregister) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ assert(m_watch_ctx != nullptr);
+ m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+ // wait for recovery unwatch/watch
+ ASSERT_TRUE(wait_for_watch(mock_image_ctx, 3));
+
+ C_SaferCond unregister_ctx;
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterUnwatchError) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, -EINVAL);
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ assert(m_watch_ctx != nullptr);
+ m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+ // wait for recovery unwatch/watch
+ ASSERT_TRUE(wait_for_watch(mock_image_ctx, 3));
+
+ C_SaferCond unregister_ctx;
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterWatchError) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+ expect_aio_watch(mock_image_ctx, -ESHUTDOWN);
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ assert(m_watch_ctx != nullptr);
+ m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+ // wait for recovery unwatch/watch
+ ASSERT_TRUE(wait_for_watch(mock_image_ctx, 4));
+
+ C_SaferCond unregister_ctx;
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterUnwatchPendingUnregister) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+
+ // inject an unregister
+ C_SaferCond unregister_ctx;
+ expect_aio_unwatch(mock_image_ctx, 0, [&mock_image_watcher, &unregister_ctx]() {
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ });
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ assert(m_watch_ctx != nullptr);
+ m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+ ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterWatchPendingUnregister) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+
+ // inject an unregister
+ C_SaferCond unregister_ctx;
+ expect_aio_watch(mock_image_ctx, -ESHUTDOWN,
+ [&mock_image_watcher, &unregister_ctx]() {
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ });
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ assert(m_watch_ctx != nullptr);
+ m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+ ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+TEST_F(TestMockObjectWatcher, ReregisterPendingUnregister) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockObjectWatcher mock_image_watcher(mock_image_ctx, m_oid);
+
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ expect_aio_watch(mock_image_ctx, 0);
+ expect_aio_unwatch(mock_image_ctx, 0);
+
+ // inject an unregister
+ C_SaferCond unregister_ctx;
+ expect_aio_watch(mock_image_ctx, 0,
+ [&mock_image_watcher, &unregister_ctx]() {
+ mock_image_watcher.unregister_watch(&unregister_ctx);
+ });
+
+ expect_aio_unwatch(mock_image_ctx, 0);
+
+ C_SaferCond register_ctx;
+ mock_image_watcher.register_watch(®ister_ctx);
+ ASSERT_EQ(0, register_ctx.wait());
+
+ assert(m_watch_ctx != nullptr);
+ m_watch_ctx->handle_error(0, -ESHUTDOWN);
+
+ ASSERT_EQ(0, unregister_ctx.wait());
+}
+
+} // namespace librbd