TEST_F(TestInstanceWatcher, InitShutdown)
{
- InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue);
+ InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue,
+ nullptr, m_instance_id);
std::vector<std::string> instance_ids;
get_instances(&instance_ids);
ASSERT_EQ(0U, instance_ids.size());
librados::IoCtx io_ctx;
ASSERT_EQ("", connect_cluster_pp(cluster));
ASSERT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx));
- InstanceWatcher<> instance_watcher(io_ctx, m_threads->work_queue,
- instance_id);
+ InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue,
+ nullptr, "instance_id");
// Init
ASSERT_EQ(0, instance_watcher.init());
ASSERT_EQ(0, m_local_io_ctx.list_watchers(oid, &watchers));
ASSERT_EQ(1U, watchers.size());
- get_instances(&instance_ids);
- ASSERT_EQ(1U, instance_ids.size());
-
// Remove
C_SaferCond on_remove;
InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue,
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include "librados/AioCompletionImpl.h"
#include "librbd/ManagedLock.h"
-#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librados/test.h"
#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/InstanceReplayer.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
} // namespace librbd
+namespace rbd {
+namespace mirror {
+
+template <>
+struct Threads<librbd::MockTestImageCtx> {
+ Mutex &timer_lock;
+ SafeTimer *timer;
+ ContextWQ *work_queue;
+
+ Threads(Threads<librbd::ImageCtx> *threads)
+ : timer_lock(threads->timer_lock), timer(threads->timer),
+ work_queue(threads->work_queue) {
+ }
+};
+
+template <>
+struct InstanceReplayer<librbd::MockTestImageCtx> {
+ MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &,
+ const std::string &, Context *));
+ MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
+ const std::string &, bool, Context *));
+};
+
+} // namespace mirror
+} // namespace rbd
+
// template definitions
#include "tools/rbd_mirror/InstanceWatcher.cc"
class TestMockInstanceWatcher : public TestMockFixture {
public:
typedef librbd::ManagedLock<librbd::MockTestImageCtx> MockManagedLock;
+ typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
+ typedef Threads<librbd::MockTestImageCtx> MockThreads;
std::string m_instance_id;
std::string m_oid;
+ MockThreads *m_mock_threads;
void SetUp() override {
TestFixture::SetUp();
m_instance_id = stringify(m_local_io_ctx.get_instance_id());
m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
+
+ m_mock_threads = new MockThreads(m_threads);
+ }
+
+ void TearDown() override {
+ delete m_mock_threads;
+ TestMockFixture::TearDown();
}
void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
}
+ void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx,
+ const std::string &instance_id) {
+ std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id;
+ EXPECT_CALL(mock_io_ctx, aio_watch(oid, _, _, _));
+ }
+
void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
}
MockManagedLock mock_managed_lock;
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
- auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
- m_threads->work_queue);
+ auto instance_watcher = new MockInstanceWatcher(
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
InSequence seq;
// Init
MockManagedLock mock_managed_lock;
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
- auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
- m_threads->work_queue);
+ auto instance_watcher = new MockInstanceWatcher(
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
InSequence seq;
expect_register_instance(mock_io_ctx, 0);
MockManagedLock mock_managed_lock;
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
- auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
- m_threads->work_queue);
+ auto instance_watcher = new MockInstanceWatcher(
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
InSequence seq;
// Init
expect_destroy_lock(mock_managed_lock, &on_destroy);
C_SaferCond on_remove;
- MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+ MockInstanceWatcher::remove_instance(m_local_io_ctx,
+ m_mock_threads->work_queue,
"instance_id", &on_remove);
ASSERT_EQ(0, on_remove.wait());
ASSERT_EQ(0, on_destroy.wait());
expect_destroy_lock(mock_managed_lock, &on_destroy);
C_SaferCond on_remove;
- MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+ MockInstanceWatcher::remove_instance(m_local_io_ctx,
+ m_mock_threads->work_queue,
"instance_id", &on_remove);
ASSERT_EQ(0, on_remove.wait());
ASSERT_EQ(0, on_destroy.wait());
}
+TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) {
+ MockManagedLock mock_managed_lock;
+
+ librados::IoCtx& io_ctx1 = m_local_io_ctx;
+ std::string instance_id1 = m_instance_id;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+ MockInstanceReplayer mock_instance_replayer1;
+ auto instance_watcher1 = MockInstanceWatcher::create(
+ io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
+
+ librados::Rados cluster;
+ librados::IoCtx io_ctx2;
+ EXPECT_EQ("", connect_cluster_pp(cluster));
+ EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
+ std::string instance_id2 = stringify(io_ctx2.get_instance_id());
+ librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+ MockInstanceReplayer mock_instance_replayer2;
+ auto instance_watcher2 = MockInstanceWatcher::create(
+ io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
+
+ InSequence seq;
+
+ // Init instance watcher 1
+ expect_register_instance(mock_io_ctx1, 0);
+ expect_register_watch(mock_io_ctx1, instance_id1);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher1->init());
+
+ // Init instance watcher 2
+ expect_register_instance(mock_io_ctx2, 0);
+ expect_register_watch(mock_io_ctx2, instance_id2);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher2->init());
+
+ // Acquire Image on the the same instance
+ EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _))
+ .WillOnce(WithArg<3>(CompleteContext(0)));
+ C_SaferCond on_acquire1;
+ instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id",
+ &on_acquire1);
+ ASSERT_EQ(0, on_acquire1.wait());
+
+ // Acquire Image on the other instance
+ EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _))
+ .WillOnce(WithArg<3>(CompleteContext(0)));
+ C_SaferCond on_acquire2;
+ instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id",
+ &on_acquire2);
+ ASSERT_EQ(0, on_acquire2.wait());
+
+ // Release Image on the the same instance
+ EXPECT_CALL(mock_instance_replayer1, release_image("gid", "uuid", "id", true,
+ _))
+ .WillOnce(WithArg<4>(CompleteContext(0)));
+ C_SaferCond on_release1;
+ instance_watcher1->notify_image_release(instance_id1, "gid", "uuid", "id",
+ true, &on_release1);
+ ASSERT_EQ(0, on_release1.wait());
+
+ // Release Image on the other instance
+ EXPECT_CALL(mock_instance_replayer2, release_image("gid", "uuid", "id", true,
+ _))
+ .WillOnce(WithArg<4>(CompleteContext(0)));
+ C_SaferCond on_release2;
+ instance_watcher1->notify_image_release(instance_id2, "gid", "uuid", "id",
+ true, &on_release2);
+ ASSERT_EQ(0, on_release2.wait());
+
+ // Shutdown instance watcher 1
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx1);
+ expect_unregister_instance(mock_io_ctx1, 0);
+ instance_watcher1->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher1;
+
+ // Shutdown instance watcher 2
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx2);
+ expect_unregister_instance(mock_io_ctx2, 0);
+ instance_watcher2->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher2;
+}
+
+TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) {
+ MockManagedLock mock_managed_lock;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+ auto instance_watcher = new MockInstanceWatcher(
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+ InSequence seq;
+
+ // Init
+ expect_register_instance(mock_io_ctx, 0);
+ expect_register_watch(mock_io_ctx);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher->init());
+
+ // Send Acquire Image and cancel
+ EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
+ .WillOnce(Invoke(
+ [this, instance_watcher, &mock_io_ctx](
+ const std::string& o, librados::AioCompletionImpl *c,
+ bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
+ c->get();
+ auto ctx = new FunctionContext(
+ [instance_watcher, &mock_io_ctx, c, pbl](int r) {
+ instance_watcher->cancel_notify_requests("other");
+ ::encode(librbd::watcher::NotifyResponse(), *pbl);
+ mock_io_ctx.get_mock_rados_client()->
+ finish_aio_completion(c, -ETIMEDOUT);
+ });
+ m_threads->work_queue->queue(ctx, 0);
+ }));
+
+ C_SaferCond on_acquire;
+ instance_watcher->notify_image_acquire("other", "gid", "uuid", "id",
+ &on_acquire);
+ ASSERT_EQ(-ECANCELED, on_acquire.wait());
+
+ // Send Release Image and cancel
+ EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
+ .WillOnce(Invoke(
+ [this, instance_watcher, &mock_io_ctx](
+ const std::string& o, librados::AioCompletionImpl *c,
+ bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
+ c->get();
+ auto ctx = new FunctionContext(
+ [instance_watcher, &mock_io_ctx, c, pbl](int r) {
+ instance_watcher->cancel_notify_requests("other");
+ ::encode(librbd::watcher::NotifyResponse(), *pbl);
+ mock_io_ctx.get_mock_rados_client()->
+ finish_aio_completion(c, -ETIMEDOUT);
+ });
+ m_threads->work_queue->queue(ctx, 0);
+ }));
+
+ C_SaferCond on_release;
+ instance_watcher->notify_image_release("other", "gid", "uuid", "id",
+ true, &on_release);
+ ASSERT_EQ(-ECANCELED, on_release.wait());
+
+ // Shutdown
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx);
+ expect_unregister_instance(mock_io_ctx, 0);
+ instance_watcher->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher;
+}
+
} // namespace mirror
} // namespace rbd
add_library(rbd_mirror_types STATIC
+ instance_watcher/Types.cc
leader_watcher/Types.cc)
set(rbd_mirror_internal
// vim: ts=8 sw=2 smarttab
#include "InstanceWatcher.h"
+#include "include/atomic.h"
#include "include/stringify.h"
#include "common/debug.h"
#include "common/errno.h"
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/ManagedLock.h"
#include "librbd/Utils.h"
+#include "InstanceReplayer.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
- << this << " " << __func__ << ": "
+#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
namespace rbd {
namespace mirror {
+using namespace instance_watcher;
+
using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;
using librbd::util::create_rados_callback;
}
void finish(int r) override {
+ dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
+ << dendl;
+
if (r == 0) {
bufferlist::iterator it = out_bl.begin();
r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
};
template <typename I>
-struct RemoveInstanceRequest : public Context {
+struct C_RemoveInstanceRequest : public Context {
InstanceWatcher<I> instance_watcher;
Context *on_finish;
- RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
- const std::string &instance_id, Context *on_finish)
- : instance_watcher(io_ctx, work_queue, instance_id), on_finish(on_finish) {
+ C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ const std::string &instance_id, Context *on_finish)
+ : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
+ on_finish(on_finish) {
}
void send() {
+ dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
+
instance_watcher.remove(this);
}
void finish(int r) override {
+ dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
+ << r << dendl;
assert(r == 0);
on_finish->complete(r);
} // anonymous namespace
+template <typename I>
+struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
+ InstanceWatcher<I> *instance_watcher;
+ librbd::watcher::Notifier notifier;
+ std::string instance_id;
+ uint64_t request_id;
+ bufferlist bl;
+ Context *on_finish;
+ librbd::watcher::NotifyResponse response;
+ atomic_t canceling;
+
+ C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
+ const std::string &instance_id, uint64_t request_id,
+ bufferlist &&bl, Context *on_finish)
+ : instance_watcher(instance_watcher),
+ notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx,
+ RBD_MIRROR_INSTANCE_PREFIX + instance_id),
+ instance_id(instance_id), request_id(request_id), bl(bl),
+ on_finish(on_finish) {
+ instance_watcher->m_notify_op_tracker.start_op();
+ assert(instance_watcher->m_lock.is_locked());
+ auto result = instance_watcher->m_notify_ops.insert(
+ std::make_pair(instance_id, this)).second;
+ assert(result);
+ }
+
+ void send() {
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
+
+ notifier.notify(bl, &response, this);
+ }
+
+ void cancel() {
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
+
+ canceling.set(1);
+ }
+
+ void finish(int r) override {
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
+ << r << dendl;
+
+ if (r == 0 || r == -ETIMEDOUT) {
+ bool found = false;
+ for (auto &it : response.acks) {
+ auto &bl = it.second;
+ if (it.second.length() == 0) {
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": no payload in ack, ignoring" << dendl;
+ continue;
+ }
+ try {
+ auto iter = bl.begin();
+ NotifyAckPayload ack;
+ ::decode(ack, iter);
+ if (ack.instance_id != instance_watcher->get_instance_id()) {
+ derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": ack instance_id (" << ack.instance_id << ") "
+ << "does not match, ignoring" << dendl;
+ continue;
+ }
+ if (ack.request_id != request_id) {
+ derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": ack request_id (" << ack.request_id << ") "
+ << "does not match, ignoring" << dendl;
+ continue;
+ }
+ r = ack.ret_val;
+ found = true;
+ break;
+ } catch (const buffer::error &err) {
+ derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": failed to decode ack: " << err.what() << dendl;
+ continue;
+ }
+ }
+
+ if (!found) {
+ if (r == -ETIMEDOUT) {
+ if (canceling.read()) {
+ r = -ECANCELED;
+ } else {
+ derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": resending after timeout" << dendl;
+ send();
+ return;
+ }
+ } else {
+ r = -EINVAL;
+ }
+ }
+ }
+
+ instance_watcher->m_notify_op_tracker.finish_op();
+ on_finish->complete(r);
+
+ Mutex::Locker locker(instance_watcher->m_lock);
+ auto result = instance_watcher->m_notify_ops.erase(
+ std::make_pair(instance_id, this));
+ assert(result > 0);
+ delete this;
+ }
+
+ void complete(int r) override {
+ finish(r);
+ }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
+ << this << " " << __func__ << ": "
template <typename I>
void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
std::vector<std::string> *instance_ids,
ContextWQ *work_queue,
const std::string &instance_id,
Context *on_finish) {
- auto req = new RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
- on_finish);
+ auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
+ on_finish);
req->send();
}
+template <typename I>
+InstanceWatcher<I> *InstanceWatcher<I>::create(
+ librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ InstanceReplayer<I> *instance_replayer) {
+ return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
+ stringify(io_ctx.get_instance_id()));
+}
+
template <typename I>
InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
ContextWQ *work_queue,
- const boost::optional<std::string> &id)
- : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX +
- (id ? *id : stringify(io_ctx.get_instance_id()))),
- m_instance_id(id ? *id : stringify(io_ctx.get_instance_id())),
+ InstanceReplayer<I> *instance_replayer,
+ const std::string &instance_id)
+ : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
+ m_instance_replayer(instance_replayer), m_instance_id(instance_id),
m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
m_instance_lock(librbd::ManagedLock<I>::create(
m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
}
template <typename I>
-void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
- uint64_t notifier_id, bufferlist &bl) {
- dout(20) << dendl;
+void InstanceWatcher<I>::notify_image_acquire(
+ const std::string &instance_id, const std::string &global_image_id,
+ const std::string &peer_mirror_uuid, const std::string &peer_image_id,
+ Context *on_notify_ack) {
+ dout(20) << "instance_id=" << instance_id << ", global_image_id="
+ << global_image_id << dendl;
+
+ Mutex::Locker locker(m_lock);
- bufferlist out;
- acknowledge_notify(notify_id, handle, out);
+ assert(m_on_finish == nullptr);
+
+ if (instance_id == m_instance_id) {
+ handle_image_acquire(global_image_id, peer_mirror_uuid, peer_image_id,
+ on_notify_ack);
+ } else {
+ uint64_t request_id = ++m_request_seq;
+ bufferlist bl;
+ ::encode(NotifyMessage{ImageAcquirePayload{
+ request_id, global_image_id, peer_mirror_uuid, peer_image_id}}, bl);
+ auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+ std::move(bl), on_notify_ack);
+ req->send();
+ }
}
+template <typename I>
+void InstanceWatcher<I>::notify_image_release(
+ const std::string &instance_id, const std::string &global_image_id,
+ const std::string &peer_mirror_uuid, const std::string &peer_image_id,
+ bool schedule_delete, Context *on_notify_ack) {
+ dout(20) << "instance_id=" << instance_id << ", global_image_id="
+ << global_image_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_on_finish == nullptr);
+
+ if (instance_id == m_instance_id) {
+ handle_image_release(global_image_id, peer_mirror_uuid, peer_image_id,
+ schedule_delete, on_notify_ack);
+ } else {
+ uint64_t request_id = ++m_request_seq;
+ bufferlist bl;
+ ::encode(NotifyMessage{ImageReleasePayload{
+ request_id, global_image_id, peer_mirror_uuid, peer_image_id,
+ schedule_delete}}, bl);
+ auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+ std::move(bl), on_notify_ack);
+ req->send();
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::cancel_notify_requests(
+ const std::string &instance_id) {
+ dout(20) << "instance_id=" << instance_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ for (auto op : m_notify_ops) {
+ if (op.first == instance_id) {
+ op.second->cancel();
+ }
+ }
+}
+
+
template <typename I>
void InstanceWatcher<I>::register_instance() {
assert(m_lock.is_locked());
std::swap(on_finish, m_on_finish);
}
+
on_finish->complete(r);
}
derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
}
+ Mutex::Locker locker(m_lock);
+ wait_for_notify_ops();
+}
+
+template <typename I>
+void InstanceWatcher<I>::wait_for_notify_ops() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ for (auto op : m_notify_ops) {
+ op.second->cancel();
+ }
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
+
+ m_notify_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ assert(r == 0);
+
Context *on_finish = nullptr;
{
Mutex::Locker locker(m_lock);
+ assert(m_notify_ops.empty());
+
std::swap(on_finish, m_on_finish);
r = m_ret_val;
remove_instance_object();
}
+template <typename I>
+Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
+ uint64_t request_id,
+ C_NotifyAck *on_notify_ack) {
+ dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
+ << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ Context *ctx = nullptr;
+ Request request(instance_id, request_id);
+ auto it = m_requests.find(request);
+
+ if (it != m_requests.end()) {
+ dout(20) << "duplicate for in-progress request" << dendl;
+ delete it->on_notify_ack;
+ m_requests.erase(it);
+ } else {
+ ctx = new FunctionContext(
+ [this, instance_id, request_id] (int r) {
+ C_NotifyAck *on_notify_ack = nullptr;
+ {
+ // update request state in the requests list
+ Mutex::Locker locker(m_lock);
+ Request request(instance_id, request_id);
+ auto it = m_requests.find(request);
+ assert(it != m_requests.end());
+ on_notify_ack = it->on_notify_ack;
+ m_requests.erase(it);
+ }
+
+ ::encode(NotifyAckPayload(instance_id, request_id, r),
+ on_notify_ack->out);
+ on_notify_ack->complete(0);
+ });
+ }
+
+ request.on_notify_ack = on_notify_ack;
+ m_requests.insert(request);
+ return ctx;
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist &bl) {
+ dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
+ << "notifier_id=" << notifier_id << dendl;
+
+ auto ctx = new C_NotifyAck(this, notify_id, handle);
+
+ NotifyMessage notify_message;
+ try {
+ bufferlist::iterator iter = bl.begin();
+ ::decode(notify_message, iter);
+ } catch (const buffer::error &err) {
+ derr << "error decoding image notification: " << err.what() << dendl;
+ ctx->complete(0);
+ return;
+ }
+
+ apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
+ notify_message.payload);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_image_acquire(
+ const std::string &global_image_id, const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id, Context *on_finish) {
+ dout(20) << "global_image_id=" << global_image_id << dendl;
+
+ m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid,
+ peer_image_id, on_finish);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_image_release(
+ const std::string &global_image_id, const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id, bool schedule_delete, Context *on_finish) {
+ dout(20) << "global_image_id=" << global_image_id << dendl;
+
+ m_instance_replayer->release_image(global_image_id, peer_mirror_uuid,
+ peer_image_id, schedule_delete, on_finish);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const ImageAcquirePayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(20) << "image_acquire: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish != nullptr) {
+ handle_image_acquire(payload.global_image_id, payload.peer_mirror_uuid,
+ payload.peer_image_id, on_finish);
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const ImageReleasePayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(20) << "image_release: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish != nullptr) {
+ handle_image_release(payload.global_image_id, payload.peer_mirror_uuid,
+ payload.peer_image_id, payload.schedule_delete,
+ on_finish);
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const UnknownPayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(20) << "unknown: instance_id=" << instance_id << dendl;
+
+ on_notify_ack->complete(0);
+}
+
} // namespace mirror
} // namespace rbd
#ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
#define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
+#include <map>
+#include <set>
#include <string>
#include <vector>
-#include <boost/optional.hpp>
+#include "common/AsyncOpTracker.h"
#include "librbd/Watcher.h"
#include "librbd/managed_lock/Types.h"
+#include "tools/rbd_mirror/instance_watcher/Types.h"
namespace librbd {
- class ImageCtx;
- template <typename> class ManagedLock;
+
+class ImageCtx;
+template <typename> class ManagedLock;
+
}
namespace rbd {
namespace mirror {
+template <typename> class InstanceReplayer;
+template <typename> struct Threads;
+
template <typename ImageCtxT = librbd::ImageCtx>
class InstanceWatcher : protected librbd::Watcher {
public:
static void get_instances(librados::IoCtx &io_ctx,
std::vector<std::string> *instance_ids,
Context *on_finish);
- static void remove_instance(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ static void remove_instance(librados::IoCtx &io_ctx,
+ ContextWQ *work_queue,
const std::string &instance_id,
Context *on_finish);
static InstanceWatcher *create(
librados::IoCtx &io_ctx, ContextWQ *work_queue,
- const boost::optional<std::string> &id = boost::none) {
- return new InstanceWatcher(io_ctx, work_queue, id);
- }
+ InstanceReplayer<ImageCtxT> *instance_replayer);
void destroy() {
delete this;
}
InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
- const boost::optional<std::string> &id = boost::none);
+ InstanceReplayer<ImageCtxT> *instance_replayer,
+ const std::string &instance_id);
~InstanceWatcher() override;
+ inline std::string &get_instance_id() {
+ return m_instance_id;
+ }
+
int init();
void shut_down();
void shut_down(Context *on_finish);
void remove(Context *on_finish);
-protected:
- void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id,
- bufferlist &bl) override;
+ void notify_image_acquire(const std::string &instance_id,
+ const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id,
+ Context *on_notify_ack);
+ void notify_image_release(const std::string &instance_id,
+ const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id,
+ bool schedule_delete, Context *on_notify_ack);
+
+ void cancel_notify_requests(const std::string &instance_id);
private:
/**
* GET_INSTANCE_LOCKER * * *>|
* ^ (remove) |
* | |
- * <uninitialized> <----------------+--------\
- * | (init) ^ | |
+ * <uninitialized> <----------------+---- WAIT_FOR_NOTIFY_OPS
+ * | (init) ^ | ^
* v (error) * | |
* REGISTER_INSTANCE * * * * * *|* *> UNREGISTER_INSTANCE
* | * | ^
* @endverbatim
*/
- bool m_owner;
+ struct C_NotifyInstanceRequest;
+
+ struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ InstanceWatcher *instance_watcher;
+ std::string instance_id;
+ C_NotifyAck *on_notify_ack;
+
+ HandlePayloadVisitor(InstanceWatcher *instance_watcher,
+ const std::string &instance_id,
+ C_NotifyAck *on_notify_ack)
+ : instance_watcher(instance_watcher), instance_id(instance_id),
+ on_notify_ack(on_notify_ack) {
+ }
+
+ template <typename Payload>
+ inline void operator()(const Payload &payload) const {
+ instance_watcher->handle_payload(instance_id, payload, on_notify_ack);
+ }
+ };
+
+ struct Request {
+ std::string instance_id;
+ uint64_t request_id;
+ C_NotifyAck *on_notify_ack = nullptr;
+
+ Request(const std::string &instance_id, uint64_t request_id)
+ : instance_id(instance_id), request_id(request_id) {
+ }
+
+ inline bool operator<(const Request &rhs) const {
+ return instance_id < rhs.instance_id ||
+ (instance_id == rhs.instance_id && request_id < rhs.request_id);
+ }
+ };
+
+ Threads<ImageCtxT> *m_threads;
+ InstanceReplayer<ImageCtxT> *m_instance_replayer;
std::string m_instance_id;
mutable Mutex m_lock;
int m_ret_val = 0;
bool m_removing = false;
librbd::managed_lock::Locker m_instance_locker;
+ std::set<std::pair<std::string, C_NotifyInstanceRequest *>> m_notify_ops;
+ AsyncOpTracker m_notify_op_tracker;
+ uint64_t m_request_seq = 0;
+ std::set<Request> m_requests;
void register_instance();
void handle_register_instance(int r);
void unregister_instance();
void handle_unregister_instance(int r);
+ void wait_for_notify_ops();
+ void handle_wait_for_notify_ops(int r);
+
void get_instance_locker();
void handle_get_instance_locker(int r);
void break_instance_lock();
void handle_break_instance_lock(int r);
+
+ Context *prepare_request(const std::string &instance_id, uint64_t request_id,
+ C_NotifyAck *on_notify_ack);
+
+ void handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist &bl) override;
+
+ void handle_image_acquire(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id,
+ Context *on_finish);
+ void handle_image_release(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id,
+ bool schedule_delete, Context *on_finish);
+
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::ImageAcquirePayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::ImageReleasePayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::UnknownPayload &payload,
+ C_NotifyAck *on_notify_ack);
};
} // namespace mirror
#include "common/debug.h"
#include "common/errno.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "include/stringify.h"
#include "librbd/Utils.h"
#include "librbd/watcher/Types.h"
#include "Threads.h"
}
template <typename I>
-bool LeaderWatcher<I>::is_leader() {
+bool LeaderWatcher<I>::is_leader() const {
Mutex::Locker locker(m_lock);
return is_leader(m_lock);
}
template <typename I>
-bool LeaderWatcher<I>::is_leader(Mutex &lock) {
+bool LeaderWatcher<I>::is_leader(Mutex &lock) const {
assert(m_lock.is_locked());
bool leader = m_leader_lock->is_leader();
return leader;
}
+template <typename I>
+bool LeaderWatcher<I>::is_releasing_leader() const {
+ Mutex::Locker locker(m_lock);
+
+ return is_releasing_leader(m_lock);
+}
+
+template <typename I>
+bool LeaderWatcher<I>::is_releasing_leader(Mutex &lock) const {
+ assert(m_lock.is_locked());
+
+ bool releasing = m_leader_lock->is_releasing_leader();
+ dout(20) << releasing << dendl;
+ return releasing;
+}
+
+template <typename I>
+bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
+ *instance_id = stringify(m_notifier_id);
+ return true;
+ }
+
+ if (!m_locker.cookie.empty()) {
+ *instance_id = stringify(m_locker.entity.num());
+ return true;
+ }
+
+ return false;
+}
+
template <typename I>
void LeaderWatcher<I>::release_leader() {
dout(20) << dendl;
}
}
-
template <typename I>
void LeaderWatcher<I>::cancel_timer_task() {
assert(m_threads->timer_lock.is_locked());
void init(Context *on_finish);
void shut_down(Context *on_finish);
- bool is_leader();
+ bool is_leader() const;
+ bool is_releasing_leader() const;
+ bool get_leader_instance_id(std::string *instance_id) const;
void release_leader();
void list_instances(std::vector<std::string> *instance_ids);
return Parent::is_state_post_acquiring() || Parent::is_state_locked();
}
+ bool is_releasing_leader() const {
+ Mutex::Locker locker(Parent::m_lock);
+ return Parent::is_state_pre_releasing();
+ }
+
protected:
void post_acquire_lock_handler(int r, Context *on_finish) {
if (r == 0) {
Threads<ImageCtxT> *m_threads;
Listener *m_listener;
- Mutex m_lock;
+ mutable Mutex m_lock;
uint64_t m_notifier_id;
LeaderLock *m_leader_lock;
Context *m_on_finish = nullptr;
librbd::watcher::NotifyResponse m_heartbeat_response;
- bool is_leader(Mutex &m_lock);
+ bool is_leader(Mutex &m_lock) const;
+ bool is_releasing_leader(Mutex &m_lock) const;
void cancel_timer_task();
void schedule_timer_task(const std::string &name,
m_instance_replayer->init();
m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
- m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
- &m_leader_listener));
- r = m_leader_watcher->init();
+ m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
+ m_threads->work_queue,
+ m_instance_replayer.get()));
+ r = m_instance_watcher->init();
if (r < 0) {
- derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
+ derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
return r;
}
- m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
- m_threads->work_queue));
- r = m_instance_watcher->init();
+ m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
+ &m_leader_listener));
+ r = m_leader_watcher->init();
if (r < 0) {
- derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+ derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
return r;
}
f->open_object_section("replayer_status");
f->dump_string("pool", m_local_io_ctx.get_pool_name());
f->dump_stream("peer") << m_peer;
+ f->dump_string("instance_id", m_instance_watcher->get_instance_id());
+
+ std::string leader_instance_id;
+ m_leader_watcher->get_leader_instance_id(&leader_instance_id);
+ f->dump_string("leader_instance_id", leader_instance_id);
bool leader = m_leader_watcher->is_leader();
f->dump_bool("leader", leader);
C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
for (auto &image_id : removed_image_ids) {
- m_instance_replayer->release_image(image_id.global_id, mirror_uuid,
- image_id.id, true,
- gather_ctx->new_sub());
+ // for now always send to myself (the leader)
+ std::string &instance_id = m_instance_watcher->get_instance_id();
+ m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
+ mirror_uuid, image_id.id, true,
+ gather_ctx->new_sub());
}
for (auto &image_id : added_image_ids) {
- m_instance_replayer->acquire_image(image_id.global_id, mirror_uuid,
- image_id.id, gather_ctx->new_sub());
+ // for now always send to myself (the leader)
+ std::string &instance_id = m_instance_watcher->get_instance_id();
+ m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
+ mirror_uuid, image_id.id,
+ gather_ctx->new_sub());
}
gather_ctx->activate();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Types.h"
+#include "include/assert.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+
+namespace rbd {
+namespace mirror {
+namespace instance_watcher {
+
+namespace {
+
+class EncodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
+
+ template <typename Payload>
+ inline void operator()(const Payload &payload) const {
+ ::encode(static_cast<uint32_t>(Payload::NOTIFY_OP), m_bl);
+ payload.encode(m_bl);
+ }
+
+private:
+ bufferlist &m_bl;
+};
+
+class DecodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
+ : m_version(version), m_iter(iter) {}
+
+ template <typename Payload>
+ inline void operator()(Payload &payload) const {
+ payload.decode(m_version, m_iter);
+ }
+
+private:
+ __u8 m_version;
+ bufferlist::iterator &m_iter;
+};
+
+class DumpPayloadVisitor : public boost::static_visitor<void> {
+public:
+ explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {}
+
+ template <typename Payload>
+ inline void operator()(const Payload &payload) const {
+ NotifyOp notify_op = Payload::NOTIFY_OP;
+ m_formatter->dump_string("notify_op", stringify(notify_op));
+ payload.dump(m_formatter);
+ }
+
+private:
+ ceph::Formatter *m_formatter;
+};
+
+} // anonymous namespace
+
+void ImagePayloadBase::encode(bufferlist &bl) const {
+ ::encode(request_id, bl);
+ ::encode(global_image_id, bl);
+ ::encode(peer_mirror_uuid, bl);
+ ::encode(peer_image_id, bl);
+}
+
+void ImagePayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
+ ::decode(request_id, iter);
+ ::decode(global_image_id, iter);
+ ::decode(peer_mirror_uuid, iter);
+ ::decode(peer_image_id, iter);
+}
+
+void ImagePayloadBase::dump(Formatter *f) const {
+ f->dump_unsigned("request_id", request_id);
+ f->dump_string("global_image_id", global_image_id);
+ f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
+ f->dump_string("peer_image_id", peer_image_id);
+}
+
+void ImageReleasePayload::encode(bufferlist &bl) const {
+ ImagePayloadBase::encode(bl);
+ ::encode(schedule_delete, bl);
+}
+
+void ImageReleasePayload::decode(__u8 version, bufferlist::iterator &iter) {
+ ImagePayloadBase::decode(version, iter);
+ ::decode(schedule_delete, iter);
+}
+
+void ImageReleasePayload::dump(Formatter *f) const {
+ ImagePayloadBase::dump(f);
+ f->dump_bool("schedule_delete", schedule_delete);
+}
+
+void UnknownPayload::encode(bufferlist &bl) const {
+ assert(false);
+}
+
+void UnknownPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void UnknownPayload::dump(Formatter *f) const {
+}
+
+void NotifyMessage::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ boost::apply_visitor(EncodePayloadVisitor(bl), payload);
+ ENCODE_FINISH(bl);
+}
+
+void NotifyMessage::decode(bufferlist::iterator& iter) {
+ DECODE_START(1, iter);
+
+ uint32_t notify_op;
+ ::decode(notify_op, iter);
+
+ // select the correct payload variant based upon the encoded op
+ switch (notify_op) {
+ case NOTIFY_OP_IMAGE_ACQUIRE:
+ payload = ImageAcquirePayload();
+ break;
+ case NOTIFY_OP_IMAGE_RELEASE:
+ payload = ImageReleasePayload();
+ break;
+ default:
+ payload = UnknownPayload();
+ break;
+ }
+
+ apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
+ DECODE_FINISH(iter);
+}
+
+void NotifyMessage::dump(Formatter *f) const {
+ apply_visitor(DumpPayloadVisitor(f), payload);
+}
+
+void NotifyMessage::generate_test_instances(std::list<NotifyMessage *> &o) {
+ o.push_back(new NotifyMessage(ImageAcquirePayload()));
+ o.push_back(new NotifyMessage(ImageAcquirePayload(1, "gid", "uuid", "id")));
+
+ o.push_back(new NotifyMessage(ImageReleasePayload()));
+ o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id",
+ true)));
+}
+
+std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
+ switch (op) {
+ case NOTIFY_OP_IMAGE_ACQUIRE:
+ out << "ImageAcquire";
+ break;
+ case NOTIFY_OP_IMAGE_RELEASE:
+ out << "ImageRelease";
+ break;
+ default:
+ out << "Unknown (" << static_cast<uint32_t>(op) << ")";
+ break;
+ }
+ return out;
+}
+
+void NotifyAckPayload::encode(bufferlist &bl) const {
+ ::encode(instance_id, bl);
+ ::encode(request_id, bl);
+ ::encode(ret_val, bl);
+}
+
+void NotifyAckPayload::decode(bufferlist::iterator &iter) {
+ ::decode(instance_id, iter);
+ ::decode(request_id, iter);
+ ::decode(ret_val, iter);
+}
+
+void NotifyAckPayload::dump(Formatter *f) const {
+ f->dump_string("instance_id", instance_id);
+ f->dump_unsigned("request_id", request_id);
+ f->dump_int("request_id", ret_val);
+}
+
+} // namespace instance_watcher
+} // namespace mirror
+} // namespace rbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
+#define RBD_MIRROR_INSTANCE_WATCHER_TYPES_H
+
+#include <string>
+#include <set>
+#include <boost/variant.hpp>
+
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include "include/int_types.h"
+
+namespace ceph { class Formatter; }
+
+namespace rbd {
+namespace mirror {
+namespace instance_watcher {
+
+enum NotifyOp {
+ NOTIFY_OP_IMAGE_ACQUIRE = 0,
+ NOTIFY_OP_IMAGE_RELEASE = 1,
+};
+
+struct ImagePayloadBase {
+ uint64_t request_id;
+ std::string global_image_id;
+ std::string peer_mirror_uuid;
+ std::string peer_image_id;
+
+ ImagePayloadBase() : request_id(0) {
+ }
+
+ ImagePayloadBase(uint64_t request_id, const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id)
+ : request_id(request_id), global_image_id(global_image_id),
+ peer_mirror_uuid(peer_mirror_uuid), peer_image_id(peer_image_id) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+ void dump(Formatter *f) const;
+};
+
+struct ImageAcquirePayload : public ImagePayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_IMAGE_ACQUIRE;
+
+ ImageAcquirePayload() : ImagePayloadBase() {
+ }
+
+ ImageAcquirePayload(uint64_t request_id, const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id)
+ : ImagePayloadBase(request_id, global_image_id, peer_mirror_uuid,
+ peer_image_id) {
+ }
+};
+
+struct ImageReleasePayload : public ImagePayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_IMAGE_RELEASE;
+
+ bool schedule_delete;
+
+ ImageReleasePayload() : ImagePayloadBase(), schedule_delete(false) {
+ }
+
+ ImageReleasePayload(uint64_t request_id, const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ const std::string &peer_image_id, bool schedule_delete)
+ : ImagePayloadBase(request_id, global_image_id, peer_mirror_uuid,
+ peer_image_id),
+ schedule_delete(schedule_delete) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+ void dump(Formatter *f) const;
+};
+
+struct UnknownPayload {
+ static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
+
+ UnknownPayload() {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+ void dump(Formatter *f) const;
+};
+
+typedef boost::variant<ImageAcquirePayload,
+ ImageReleasePayload,
+ UnknownPayload> Payload;
+
+struct NotifyMessage {
+ NotifyMessage(const Payload &payload = UnknownPayload()) : payload(payload) {
+ }
+
+ Payload payload;
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<NotifyMessage *> &o);
+};
+
+WRITE_CLASS_ENCODER(NotifyMessage);
+
+std::ostream &operator<<(std::ostream &out, const NotifyOp &op);
+
+struct NotifyAckPayload {
+ std::string instance_id;
+ uint64_t request_id;
+ int ret_val;
+
+ NotifyAckPayload() : request_id(0), ret_val(0) {
+ }
+
+ NotifyAckPayload(const std::string &instance_id, uint64_t request_id,
+ int ret_val)
+ : instance_id(instance_id), request_id(request_id), ret_val(ret_val) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::iterator& it);
+ void dump(Formatter *f) const;
+};
+
+WRITE_CLASS_ENCODER(NotifyAckPayload);
+
+} // namespace instance_watcher
+} // namespace mirror
+} // namespace librbd
+
+using rbd::mirror::instance_watcher::encode;
+using rbd::mirror::instance_watcher::decode;
+
+#endif // RBD_MIRROR_INSTANCE_WATCHER_TYPES_H