From: Mykola Golub Date: Wed, 15 Mar 2017 09:47:00 +0000 (+0100) Subject: rbd-mirror A/A: proxy InstanceReplayer APIs via InstanceWatcher RPC X-Git-Tag: v12.0.3~320^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=99c695828862397031136d18ccf461922db52d4d;p=ceph-ci.git rbd-mirror A/A: proxy InstanceReplayer APIs via InstanceWatcher RPC Fixes: http://tracker.ceph.com/issues/18787 Signed-off-by: Mykola Golub --- diff --git a/src/test/rbd_mirror/test_InstanceWatcher.cc b/src/test/rbd_mirror/test_InstanceWatcher.cc index cbf8cf015dc..1924928f0c2 100644 --- a/src/test/rbd_mirror/test_InstanceWatcher.cc +++ b/src/test/rbd_mirror/test_InstanceWatcher.cc @@ -43,7 +43,8 @@ public: TEST_F(TestInstanceWatcher, InitShutdown) { - InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue); + InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue, + nullptr, m_instance_id); std::vector instance_ids; get_instances(&instance_ids); ASSERT_EQ(0U, instance_ids.size()); @@ -91,8 +92,8 @@ TEST_F(TestInstanceWatcher, Remove) librados::IoCtx io_ctx; ASSERT_EQ("", connect_cluster_pp(cluster)); ASSERT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx)); - InstanceWatcher<> instance_watcher(io_ctx, m_threads->work_queue, - instance_id); + InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue, + nullptr, "instance_id"); // Init ASSERT_EQ(0, instance_watcher.init()); @@ -105,9 +106,6 @@ TEST_F(TestInstanceWatcher, Remove) ASSERT_EQ(0, m_local_io_ctx.list_watchers(oid, &watchers)); ASSERT_EQ(1U, watchers.size()); - get_instances(&instance_ids); - ASSERT_EQ(1U, instance_ids.size()); - // Remove C_SaferCond on_remove; InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue, diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc index 42dfee4e04a..5a94e5a65ab 100644 --- a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc +++ b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc @@ -1,10 +1,14 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include "librados/AioCompletionImpl.h" #include "librbd/ManagedLock.h" -#include "test/librbd/mock/MockImageCtx.h" +#include "test/librados/test.h" #include "test/librados_test_stub/MockTestMemIoCtxImpl.h" +#include "test/librados_test_stub/MockTestMemRadosClient.h" +#include "test/librbd/mock/MockImageCtx.h" #include "test/rbd_mirror/test_mock_fixture.h" +#include "tools/rbd_mirror/InstanceReplayer.h" #include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" @@ -54,6 +58,32 @@ ManagedLock *ManagedLock::s_instance = nullp } // namespace librbd +namespace rbd { +namespace mirror { + +template <> +struct Threads { + Mutex &timer_lock; + SafeTimer *timer; + ContextWQ *work_queue; + + Threads(Threads *threads) + : timer_lock(threads->timer_lock), timer(threads->timer), + work_queue(threads->work_queue) { + } +}; + +template <> +struct InstanceReplayer { + MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &, + const std::string &, Context *)); + MOCK_METHOD5(release_image, void(const std::string &, const std::string &, + const std::string &, bool, Context *)); +}; + +} // namespace mirror +} // namespace rbd + // template definitions #include "tools/rbd_mirror/InstanceWatcher.cc" @@ -70,10 +100,13 @@ using ::testing::WithArg; class TestMockInstanceWatcher : public TestMockFixture { public: typedef librbd::ManagedLock MockManagedLock; + typedef InstanceReplayer MockInstanceReplayer; typedef InstanceWatcher MockInstanceWatcher; + typedef Threads MockThreads; std::string m_instance_id; std::string m_oid; + MockThreads *m_mock_threads; void SetUp() override { TestFixture::SetUp(); @@ -82,12 +115,25 @@ public: m_instance_id = stringify(m_local_io_ctx.get_instance_id()); m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id; + + m_mock_threads = new MockThreads(m_threads); + } + + void TearDown() override { + delete m_mock_threads; + TestMockFixture::TearDown(); } void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) { EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _)); } + void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx, + const std::string &instance_id) { + std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id; + EXPECT_CALL(mock_io_ctx, aio_watch(oid, _, _, _)); + } + void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) { EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _)); } @@ -148,8 +194,8 @@ TEST_F(TestMockInstanceWatcher, InitShutdown) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); - auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx, - m_threads->work_queue); + auto instance_watcher = new MockInstanceWatcher( + m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; // Init @@ -172,8 +218,8 @@ TEST_F(TestMockInstanceWatcher, InitError) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); - auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx, - m_threads->work_queue); + auto instance_watcher = new MockInstanceWatcher( + m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; expect_register_instance(mock_io_ctx, 0); @@ -192,8 +238,8 @@ TEST_F(TestMockInstanceWatcher, ShutdownError) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); - auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx, - m_threads->work_queue); + auto instance_watcher = new MockInstanceWatcher( + m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; // Init @@ -228,7 +274,8 @@ TEST_F(TestMockInstanceWatcher, Remove) { expect_destroy_lock(mock_managed_lock, &on_destroy); C_SaferCond on_remove; - MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue, + MockInstanceWatcher::remove_instance(m_local_io_ctx, + m_mock_threads->work_queue, "instance_id", &on_remove); ASSERT_EQ(0, on_remove.wait()); ASSERT_EQ(0, on_destroy.wait()); @@ -246,11 +293,167 @@ TEST_F(TestMockInstanceWatcher, RemoveNoent) { expect_destroy_lock(mock_managed_lock, &on_destroy); C_SaferCond on_remove; - MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue, + MockInstanceWatcher::remove_instance(m_local_io_ctx, + m_mock_threads->work_queue, "instance_id", &on_remove); ASSERT_EQ(0, on_remove.wait()); ASSERT_EQ(0, on_destroy.wait()); } +TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) { + MockManagedLock mock_managed_lock; + + librados::IoCtx& io_ctx1 = m_local_io_ctx; + std::string instance_id1 = m_instance_id; + librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); + MockInstanceReplayer mock_instance_replayer1; + auto instance_watcher1 = MockInstanceWatcher::create( + io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1); + + librados::Rados cluster; + librados::IoCtx io_ctx2; + EXPECT_EQ("", connect_cluster_pp(cluster)); + EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2)); + std::string instance_id2 = stringify(io_ctx2.get_instance_id()); + librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); + MockInstanceReplayer mock_instance_replayer2; + auto instance_watcher2 = MockInstanceWatcher::create( + io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2); + + InSequence seq; + + // Init instance watcher 1 + expect_register_instance(mock_io_ctx1, 0); + expect_register_watch(mock_io_ctx1, instance_id1); + expect_acquire_lock(mock_managed_lock, 0); + ASSERT_EQ(0, instance_watcher1->init()); + + // Init instance watcher 2 + expect_register_instance(mock_io_ctx2, 0); + expect_register_watch(mock_io_ctx2, instance_id2); + expect_acquire_lock(mock_managed_lock, 0); + ASSERT_EQ(0, instance_watcher2->init()); + + // Acquire Image on the the same instance + EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _)) + .WillOnce(WithArg<3>(CompleteContext(0))); + C_SaferCond on_acquire1; + instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id", + &on_acquire1); + ASSERT_EQ(0, on_acquire1.wait()); + + // Acquire Image on the other instance + EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _)) + .WillOnce(WithArg<3>(CompleteContext(0))); + C_SaferCond on_acquire2; + instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id", + &on_acquire2); + ASSERT_EQ(0, on_acquire2.wait()); + + // Release Image on the the same instance + EXPECT_CALL(mock_instance_replayer1, release_image("gid", "uuid", "id", true, + _)) + .WillOnce(WithArg<4>(CompleteContext(0))); + C_SaferCond on_release1; + instance_watcher1->notify_image_release(instance_id1, "gid", "uuid", "id", + true, &on_release1); + ASSERT_EQ(0, on_release1.wait()); + + // Release Image on the other instance + EXPECT_CALL(mock_instance_replayer2, release_image("gid", "uuid", "id", true, + _)) + .WillOnce(WithArg<4>(CompleteContext(0))); + C_SaferCond on_release2; + instance_watcher1->notify_image_release(instance_id2, "gid", "uuid", "id", + true, &on_release2); + ASSERT_EQ(0, on_release2.wait()); + + // Shutdown instance watcher 1 + expect_release_lock(mock_managed_lock, 0); + expect_unregister_watch(mock_io_ctx1); + expect_unregister_instance(mock_io_ctx1, 0); + instance_watcher1->shut_down(); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher1; + + // Shutdown instance watcher 2 + expect_release_lock(mock_managed_lock, 0); + expect_unregister_watch(mock_io_ctx2); + expect_unregister_instance(mock_io_ctx2, 0); + instance_watcher2->shut_down(); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher2; +} + +TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) { + MockManagedLock mock_managed_lock; + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); + + auto instance_watcher = new MockInstanceWatcher( + m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); + InSequence seq; + + // Init + expect_register_instance(mock_io_ctx, 0); + expect_register_watch(mock_io_ctx); + expect_acquire_lock(mock_managed_lock, 0); + ASSERT_EQ(0, instance_watcher->init()); + + // Send Acquire Image and cancel + EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _)) + .WillOnce(Invoke( + [this, instance_watcher, &mock_io_ctx]( + const std::string& o, librados::AioCompletionImpl *c, + bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) { + c->get(); + auto ctx = new FunctionContext( + [instance_watcher, &mock_io_ctx, c, pbl](int r) { + instance_watcher->cancel_notify_requests("other"); + ::encode(librbd::watcher::NotifyResponse(), *pbl); + mock_io_ctx.get_mock_rados_client()-> + finish_aio_completion(c, -ETIMEDOUT); + }); + m_threads->work_queue->queue(ctx, 0); + })); + + C_SaferCond on_acquire; + instance_watcher->notify_image_acquire("other", "gid", "uuid", "id", + &on_acquire); + ASSERT_EQ(-ECANCELED, on_acquire.wait()); + + // Send Release Image and cancel + EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _)) + .WillOnce(Invoke( + [this, instance_watcher, &mock_io_ctx]( + const std::string& o, librados::AioCompletionImpl *c, + bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) { + c->get(); + auto ctx = new FunctionContext( + [instance_watcher, &mock_io_ctx, c, pbl](int r) { + instance_watcher->cancel_notify_requests("other"); + ::encode(librbd::watcher::NotifyResponse(), *pbl); + mock_io_ctx.get_mock_rados_client()-> + finish_aio_completion(c, -ETIMEDOUT); + }); + m_threads->work_queue->queue(ctx, 0); + })); + + C_SaferCond on_release; + instance_watcher->notify_image_release("other", "gid", "uuid", "id", + true, &on_release); + ASSERT_EQ(-ECANCELED, on_release.wait()); + + // Shutdown + expect_release_lock(mock_managed_lock, 0); + expect_unregister_watch(mock_io_ctx); + expect_unregister_instance(mock_io_ctx, 0); + instance_watcher->shut_down(); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher; +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 8086d0e5815..4f1acbcbd4f 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -1,4 +1,5 @@ add_library(rbd_mirror_types STATIC + instance_watcher/Types.cc leader_watcher/Types.cc) set(rbd_mirror_internal diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index b59936a2640..4bedf700906 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -2,22 +2,25 @@ // vim: ts=8 sw=2 smarttab #include "InstanceWatcher.h" +#include "include/atomic.h" #include "include/stringify.h" #include "common/debug.h" #include "common/errno.h" #include "cls/rbd/cls_rbd_client.h" #include "librbd/ManagedLock.h" #include "librbd/Utils.h" +#include "InstanceReplayer.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror #undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \ - << this << " " << __func__ << ": " +#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " namespace rbd { namespace mirror { +using namespace instance_watcher; + using librbd::util::create_async_context_callback; using librbd::util::create_context_callback; using librbd::util::create_rados_callback; @@ -35,6 +38,9 @@ struct C_GetInstances : public Context { } void finish(int r) override { + dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r + << dendl; + if (r == 0) { bufferlist::iterator it = out_bl.begin(); r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids); @@ -46,20 +52,25 @@ struct C_GetInstances : public Context { }; template -struct RemoveInstanceRequest : public Context { +struct C_RemoveInstanceRequest : public Context { InstanceWatcher instance_watcher; Context *on_finish; - RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue, - const std::string &instance_id, Context *on_finish) - : instance_watcher(io_ctx, work_queue, instance_id), on_finish(on_finish) { + C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue, + const std::string &instance_id, Context *on_finish) + : instance_watcher(io_ctx, work_queue, nullptr, instance_id), + on_finish(on_finish) { } void send() { + dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl; + instance_watcher.remove(this); } void finish(int r) override { + dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r=" + << r << dendl; assert(r == 0); on_finish->complete(r); @@ -68,6 +79,117 @@ struct RemoveInstanceRequest : public Context { } // anonymous namespace +template +struct InstanceWatcher::C_NotifyInstanceRequest : public Context { + InstanceWatcher *instance_watcher; + librbd::watcher::Notifier notifier; + std::string instance_id; + uint64_t request_id; + bufferlist bl; + Context *on_finish; + librbd::watcher::NotifyResponse response; + atomic_t canceling; + + C_NotifyInstanceRequest(InstanceWatcher *instance_watcher, + const std::string &instance_id, uint64_t request_id, + bufferlist &&bl, Context *on_finish) + : instance_watcher(instance_watcher), + notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx, + RBD_MIRROR_INSTANCE_PREFIX + instance_id), + instance_id(instance_id), request_id(request_id), bl(bl), + on_finish(on_finish) { + instance_watcher->m_notify_op_tracker.start_op(); + assert(instance_watcher->m_lock.is_locked()); + auto result = instance_watcher->m_notify_ops.insert( + std::make_pair(instance_id, this)).second; + assert(result); + } + + void send() { + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; + + notifier.notify(bl, &response, this); + } + + void cancel() { + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; + + canceling.set(1); + } + + void finish(int r) override { + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r=" + << r << dendl; + + if (r == 0 || r == -ETIMEDOUT) { + bool found = false; + for (auto &it : response.acks) { + auto &bl = it.second; + if (it.second.length() == 0) { + dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": no payload in ack, ignoring" << dendl; + continue; + } + try { + auto iter = bl.begin(); + NotifyAckPayload ack; + ::decode(ack, iter); + if (ack.instance_id != instance_watcher->get_instance_id()) { + derr << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": ack instance_id (" << ack.instance_id << ") " + << "does not match, ignoring" << dendl; + continue; + } + if (ack.request_id != request_id) { + derr << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": ack request_id (" << ack.request_id << ") " + << "does not match, ignoring" << dendl; + continue; + } + r = ack.ret_val; + found = true; + break; + } catch (const buffer::error &err) { + derr << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": failed to decode ack: " << err.what() << dendl; + continue; + } + } + + if (!found) { + if (r == -ETIMEDOUT) { + if (canceling.read()) { + r = -ECANCELED; + } else { + derr << "C_NotifyInstanceRequest: " << this << " " << __func__ + << ": resending after timeout" << dendl; + send(); + return; + } + } else { + r = -EINVAL; + } + } + } + + instance_watcher->m_notify_op_tracker.finish_op(); + on_finish->complete(r); + + Mutex::Locker locker(instance_watcher->m_lock); + auto result = instance_watcher->m_notify_ops.erase( + std::make_pair(instance_id, this)); + assert(result > 0); + delete this; + } + + void complete(int r) override { + finish(r); + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \ + << this << " " << __func__ << ": " template void InstanceWatcher::get_instances(librados::IoCtx &io_ctx, std::vector *instance_ids, @@ -87,18 +209,26 @@ void InstanceWatcher::remove_instance(librados::IoCtx &io_ctx, ContextWQ *work_queue, const std::string &instance_id, Context *on_finish) { - auto req = new RemoveInstanceRequest(io_ctx, work_queue, instance_id, - on_finish); + auto req = new C_RemoveInstanceRequest(io_ctx, work_queue, instance_id, + on_finish); req->send(); } +template +InstanceWatcher *InstanceWatcher::create( + librados::IoCtx &io_ctx, ContextWQ *work_queue, + InstanceReplayer *instance_replayer) { + return new InstanceWatcher(io_ctx, work_queue, instance_replayer, + stringify(io_ctx.get_instance_id())); +} + template InstanceWatcher::InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, - const boost::optional &id) - : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + - (id ? *id : stringify(io_ctx.get_instance_id()))), - m_instance_id(id ? *id : stringify(io_ctx.get_instance_id())), + InstanceReplayer *instance_replayer, + const std::string &instance_id) + : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id), + m_instance_replayer(instance_replayer), m_instance_id(instance_id), m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)), m_instance_lock(librbd::ManagedLock::create( m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true, @@ -166,14 +296,73 @@ void InstanceWatcher::remove(Context *on_finish) { } template -void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, - uint64_t notifier_id, bufferlist &bl) { - dout(20) << dendl; +void InstanceWatcher::notify_image_acquire( + const std::string &instance_id, const std::string &global_image_id, + const std::string &peer_mirror_uuid, const std::string &peer_image_id, + Context *on_notify_ack) { + dout(20) << "instance_id=" << instance_id << ", global_image_id=" + << global_image_id << dendl; + + Mutex::Locker locker(m_lock); - bufferlist out; - acknowledge_notify(notify_id, handle, out); + assert(m_on_finish == nullptr); + + if (instance_id == m_instance_id) { + handle_image_acquire(global_image_id, peer_mirror_uuid, peer_image_id, + on_notify_ack); + } else { + uint64_t request_id = ++m_request_seq; + bufferlist bl; + ::encode(NotifyMessage{ImageAcquirePayload{ + request_id, global_image_id, peer_mirror_uuid, peer_image_id}}, bl); + auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, + std::move(bl), on_notify_ack); + req->send(); + } } +template +void InstanceWatcher::notify_image_release( + const std::string &instance_id, const std::string &global_image_id, + const std::string &peer_mirror_uuid, const std::string &peer_image_id, + bool schedule_delete, Context *on_notify_ack) { + dout(20) << "instance_id=" << instance_id << ", global_image_id=" + << global_image_id << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_finish == nullptr); + + if (instance_id == m_instance_id) { + handle_image_release(global_image_id, peer_mirror_uuid, peer_image_id, + schedule_delete, on_notify_ack); + } else { + uint64_t request_id = ++m_request_seq; + bufferlist bl; + ::encode(NotifyMessage{ImageReleasePayload{ + request_id, global_image_id, peer_mirror_uuid, peer_image_id, + schedule_delete}}, bl); + auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, + std::move(bl), on_notify_ack); + req->send(); + } +} + +template +void InstanceWatcher::cancel_notify_requests( + const std::string &instance_id) { + dout(20) << "instance_id=" << instance_id << dendl; + + Mutex::Locker locker(m_lock); + + for (auto op : m_notify_ops) { + if (op.first == instance_id) { + op.second->cancel(); + } + } +} + + template void InstanceWatcher::register_instance() { assert(m_lock.is_locked()); @@ -309,6 +498,7 @@ void InstanceWatcher::handle_acquire_lock(int r) { std::swap(on_finish, m_on_finish); } + on_finish->complete(r); } @@ -422,10 +612,39 @@ void InstanceWatcher::handle_unregister_instance(int r) { derr << "error unregistering instance: " << cpp_strerror(r) << dendl; } + Mutex::Locker locker(m_lock); + wait_for_notify_ops(); +} + +template +void InstanceWatcher::wait_for_notify_ops() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + for (auto op : m_notify_ops) { + op.second->cancel(); + } + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + InstanceWatcher, &InstanceWatcher::handle_wait_for_notify_ops>(this)); + + m_notify_op_tracker.wait_for_ops(ctx); +} + +template +void InstanceWatcher::handle_wait_for_notify_ops(int r) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + Context *on_finish = nullptr; { Mutex::Locker locker(m_lock); + assert(m_notify_ops.empty()); + std::swap(on_finish, m_on_finish); r = m_ret_val; @@ -496,6 +715,130 @@ void InstanceWatcher::handle_break_instance_lock(int r) { remove_instance_object(); } +template +Context *InstanceWatcher::prepare_request(const std::string &instance_id, + uint64_t request_id, + C_NotifyAck *on_notify_ack) { + dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id + << dendl; + + Mutex::Locker locker(m_lock); + + Context *ctx = nullptr; + Request request(instance_id, request_id); + auto it = m_requests.find(request); + + if (it != m_requests.end()) { + dout(20) << "duplicate for in-progress request" << dendl; + delete it->on_notify_ack; + m_requests.erase(it); + } else { + ctx = new FunctionContext( + [this, instance_id, request_id] (int r) { + C_NotifyAck *on_notify_ack = nullptr; + { + // update request state in the requests list + Mutex::Locker locker(m_lock); + Request request(instance_id, request_id); + auto it = m_requests.find(request); + assert(it != m_requests.end()); + on_notify_ack = it->on_notify_ack; + m_requests.erase(it); + } + + ::encode(NotifyAckPayload(instance_id, request_id, r), + on_notify_ack->out); + on_notify_ack->complete(0); + }); + } + + request.on_notify_ack = on_notify_ack; + m_requests.insert(request); + return ctx; +} + +template +void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist &bl) { + dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", " + << "notifier_id=" << notifier_id << dendl; + + auto ctx = new C_NotifyAck(this, notify_id, handle); + + NotifyMessage notify_message; + try { + bufferlist::iterator iter = bl.begin(); + ::decode(notify_message, iter); + } catch (const buffer::error &err) { + derr << "error decoding image notification: " << err.what() << dendl; + ctx->complete(0); + return; + } + + apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx), + notify_message.payload); +} + +template +void InstanceWatcher::handle_image_acquire( + const std::string &global_image_id, const std::string &peer_mirror_uuid, + const std::string &peer_image_id, Context *on_finish) { + dout(20) << "global_image_id=" << global_image_id << dendl; + + m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid, + peer_image_id, on_finish); +} + +template +void InstanceWatcher::handle_image_release( + const std::string &global_image_id, const std::string &peer_mirror_uuid, + const std::string &peer_image_id, bool schedule_delete, Context *on_finish) { + dout(20) << "global_image_id=" << global_image_id << dendl; + + m_instance_replayer->release_image(global_image_id, peer_mirror_uuid, + peer_image_id, schedule_delete, on_finish); +} + +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const ImageAcquirePayload &payload, + C_NotifyAck *on_notify_ack) { + dout(20) << "image_acquire: instance_id=" << instance_id << ", " + << "request_id=" << payload.request_id << dendl; + + auto on_finish = prepare_request(instance_id, payload.request_id, + on_notify_ack); + if (on_finish != nullptr) { + handle_image_acquire(payload.global_image_id, payload.peer_mirror_uuid, + payload.peer_image_id, on_finish); + } +} + +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const ImageReleasePayload &payload, + C_NotifyAck *on_notify_ack) { + dout(20) << "image_release: instance_id=" << instance_id << ", " + << "request_id=" << payload.request_id << dendl; + + auto on_finish = prepare_request(instance_id, payload.request_id, + on_notify_ack); + if (on_finish != nullptr) { + handle_image_release(payload.global_image_id, payload.peer_mirror_uuid, + payload.peer_image_id, payload.schedule_delete, + on_finish); + } +} + +template +void InstanceWatcher::handle_payload(const std::string &instance_id, + const UnknownPayload &payload, + C_NotifyAck *on_notify_ack) { + dout(20) << "unknown: instance_id=" << instance_id << dendl; + + on_notify_ack->complete(0); +} + } // namespace mirror } // namespace rbd diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h index d89e40f0a5a..a9d287f1306 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.h +++ b/src/tools/rbd_mirror/InstanceWatcher.h @@ -4,44 +4,56 @@ #ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H #define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H +#include +#include #include #include -#include +#include "common/AsyncOpTracker.h" #include "librbd/Watcher.h" #include "librbd/managed_lock/Types.h" +#include "tools/rbd_mirror/instance_watcher/Types.h" namespace librbd { - class ImageCtx; - template class ManagedLock; + +class ImageCtx; +template class ManagedLock; + } namespace rbd { namespace mirror { +template class InstanceReplayer; +template struct Threads; + template class InstanceWatcher : protected librbd::Watcher { public: static void get_instances(librados::IoCtx &io_ctx, std::vector *instance_ids, Context *on_finish); - static void remove_instance(librados::IoCtx &io_ctx, ContextWQ *work_queue, + static void remove_instance(librados::IoCtx &io_ctx, + ContextWQ *work_queue, const std::string &instance_id, Context *on_finish); static InstanceWatcher *create( librados::IoCtx &io_ctx, ContextWQ *work_queue, - const boost::optional &id = boost::none) { - return new InstanceWatcher(io_ctx, work_queue, id); - } + InstanceReplayer *instance_replayer); void destroy() { delete this; } InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, - const boost::optional &id = boost::none); + InstanceReplayer *instance_replayer, + const std::string &instance_id); ~InstanceWatcher() override; + inline std::string &get_instance_id() { + return m_instance_id; + } + int init(); void shut_down(); @@ -49,9 +61,18 @@ public: void shut_down(Context *on_finish); void remove(Context *on_finish); -protected: - void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id, - bufferlist &bl) override; + void notify_image_acquire(const std::string &instance_id, + const std::string &global_image_id, + const std::string &peer_mirror_uuid, + const std::string &peer_image_id, + Context *on_notify_ack); + void notify_image_release(const std::string &instance_id, + const std::string &global_image_id, + const std::string &peer_mirror_uuid, + const std::string &peer_image_id, + bool schedule_delete, Context *on_notify_ack); + + void cancel_notify_requests(const std::string &instance_id); private: /** @@ -63,8 +84,8 @@ private: * GET_INSTANCE_LOCKER * * *>| * ^ (remove) | * | | - * <----------------+--------\ - * | (init) ^ | | + * <----------------+---- WAIT_FOR_NOTIFY_OPS + * | (init) ^ | ^ * v (error) * | | * REGISTER_INSTANCE * * * * * *|* *> UNREGISTER_INSTANCE * | * | ^ @@ -83,7 +104,43 @@ private: * @endverbatim */ - bool m_owner; + struct C_NotifyInstanceRequest; + + struct HandlePayloadVisitor : public boost::static_visitor { + InstanceWatcher *instance_watcher; + std::string instance_id; + C_NotifyAck *on_notify_ack; + + HandlePayloadVisitor(InstanceWatcher *instance_watcher, + const std::string &instance_id, + C_NotifyAck *on_notify_ack) + : instance_watcher(instance_watcher), instance_id(instance_id), + on_notify_ack(on_notify_ack) { + } + + template + inline void operator()(const Payload &payload) const { + instance_watcher->handle_payload(instance_id, payload, on_notify_ack); + } + }; + + struct Request { + std::string instance_id; + uint64_t request_id; + C_NotifyAck *on_notify_ack = nullptr; + + Request(const std::string &instance_id, uint64_t request_id) + : instance_id(instance_id), request_id(request_id) { + } + + inline bool operator<(const Request &rhs) const { + return instance_id < rhs.instance_id || + (instance_id == rhs.instance_id && request_id < rhs.request_id); + } + }; + + Threads *m_threads; + InstanceReplayer *m_instance_replayer; std::string m_instance_id; mutable Mutex m_lock; @@ -92,6 +149,10 @@ private: int m_ret_val = 0; bool m_removing = false; librbd::managed_lock::Locker m_instance_locker; + std::set> m_notify_ops; + AsyncOpTracker m_notify_op_tracker; + uint64_t m_request_seq = 0; + std::set m_requests; void register_instance(); void handle_register_instance(int r); @@ -117,11 +178,39 @@ private: void unregister_instance(); void handle_unregister_instance(int r); + void wait_for_notify_ops(); + void handle_wait_for_notify_ops(int r); + void get_instance_locker(); void handle_get_instance_locker(int r); void break_instance_lock(); void handle_break_instance_lock(int r); + + Context *prepare_request(const std::string &instance_id, uint64_t request_id, + C_NotifyAck *on_notify_ack); + + void handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist &bl) override; + + void handle_image_acquire(const std::string &global_image_id, + const std::string &peer_mirror_uuid, + const std::string &peer_image_id, + Context *on_finish); + void handle_image_release(const std::string &global_image_id, + const std::string &peer_mirror_uuid, + const std::string &peer_image_id, + bool schedule_delete, Context *on_finish); + + void handle_payload(const std::string &instance_id, + const instance_watcher::ImageAcquirePayload &payload, + C_NotifyAck *on_notify_ack); + void handle_payload(const std::string &instance_id, + const instance_watcher::ImageReleasePayload &payload, + C_NotifyAck *on_notify_ack); + void handle_payload(const std::string &instance_id, + const instance_watcher::UnknownPayload &payload, + C_NotifyAck *on_notify_ack); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index 152ec3862c7..0d4957130d9 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -6,6 +6,7 @@ #include "common/debug.h" #include "common/errno.h" #include "cls/rbd/cls_rbd_client.h" +#include "include/stringify.h" #include "librbd/Utils.h" #include "librbd/watcher/Types.h" #include "Threads.h" @@ -240,14 +241,14 @@ void LeaderWatcher::handle_wait_for_tasks() { } template -bool LeaderWatcher::is_leader() { +bool LeaderWatcher::is_leader() const { Mutex::Locker locker(m_lock); return is_leader(m_lock); } template -bool LeaderWatcher::is_leader(Mutex &lock) { +bool LeaderWatcher::is_leader(Mutex &lock) const { assert(m_lock.is_locked()); bool leader = m_leader_lock->is_leader(); @@ -255,6 +256,41 @@ bool LeaderWatcher::is_leader(Mutex &lock) { return leader; } +template +bool LeaderWatcher::is_releasing_leader() const { + Mutex::Locker locker(m_lock); + + return is_releasing_leader(m_lock); +} + +template +bool LeaderWatcher::is_releasing_leader(Mutex &lock) const { + assert(m_lock.is_locked()); + + bool releasing = m_leader_lock->is_releasing_leader(); + dout(20) << releasing << dendl; + return releasing; +} + +template +bool LeaderWatcher::get_leader_instance_id(std::string *instance_id) const { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + if (is_leader(m_lock) || is_releasing_leader(m_lock)) { + *instance_id = stringify(m_notifier_id); + return true; + } + + if (!m_locker.cookie.empty()) { + *instance_id = stringify(m_locker.entity.num()); + return true; + } + + return false; +} + template void LeaderWatcher::release_leader() { dout(20) << dendl; @@ -279,7 +315,6 @@ void LeaderWatcher::list_instances(std::vector *instance_ids) { } } - template void LeaderWatcher::cancel_timer_task() { assert(m_threads->timer_lock.is_locked()); diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h index b3d05122c2f..b4aac4010fa 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.h +++ b/src/tools/rbd_mirror/LeaderWatcher.h @@ -45,7 +45,9 @@ public: void init(Context *on_finish); void shut_down(Context *on_finish); - bool is_leader(); + bool is_leader() const; + bool is_releasing_leader() const; + bool get_leader_instance_id(std::string *instance_id) const; void release_leader(); void list_instances(std::vector *instance_ids); @@ -112,6 +114,11 @@ private: return Parent::is_state_post_acquiring() || Parent::is_state_locked(); } + bool is_releasing_leader() const { + Mutex::Locker locker(Parent::m_lock); + return Parent::is_state_pre_releasing(); + } + protected: void post_acquire_lock_handler(int r, Context *on_finish) { if (r == 0) { @@ -181,7 +188,7 @@ private: Threads *m_threads; Listener *m_listener; - Mutex m_lock; + mutable Mutex m_lock; uint64_t m_notifier_id; LeaderLock *m_leader_lock; Context *m_on_finish = nullptr; @@ -198,7 +205,8 @@ private: librbd::watcher::NotifyResponse m_heartbeat_response; - bool is_leader(Mutex &m_lock); + bool is_leader(Mutex &m_lock) const; + bool is_releasing_leader(Mutex &m_lock) const; void cancel_timer_task(); void schedule_timer_task(const std::string &name, diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 826e78563d4..7f8f9e2a304 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -327,19 +327,20 @@ int Replayer::init() m_instance_replayer->init(); m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx); - m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, - &m_leader_listener)); - r = m_leader_watcher->init(); + m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx, + m_threads->work_queue, + m_instance_replayer.get())); + r = m_instance_watcher->init(); if (r < 0) { - derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; + derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; return r; } - m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx, - m_threads->work_queue)); - r = m_instance_watcher->init(); + m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, + &m_leader_listener)); + r = m_leader_watcher->init(); if (r < 0) { - derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; + derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; return r; } @@ -458,6 +459,11 @@ void Replayer::print_status(Formatter *f, stringstream *ss) f->open_object_section("replayer_status"); f->dump_string("pool", m_local_io_ctx.get_pool_name()); f->dump_stream("peer") << m_peer; + f->dump_string("instance_id", m_instance_watcher->get_instance_id()); + + std::string leader_instance_id; + m_leader_watcher->get_leader_instance_id(&leader_instance_id); + f->dump_string("leader_instance_id", leader_instance_id); bool leader = m_leader_watcher->is_leader(); f->dump_bool("leader", leader); @@ -595,14 +601,19 @@ void Replayer::handle_update(const std::string &mirror_uuid, C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); for (auto &image_id : removed_image_ids) { - m_instance_replayer->release_image(image_id.global_id, mirror_uuid, - image_id.id, true, - gather_ctx->new_sub()); + // for now always send to myself (the leader) + std::string &instance_id = m_instance_watcher->get_instance_id(); + m_instance_watcher->notify_image_release(instance_id, image_id.global_id, + mirror_uuid, image_id.id, true, + gather_ctx->new_sub()); } for (auto &image_id : added_image_ids) { - m_instance_replayer->acquire_image(image_id.global_id, mirror_uuid, - image_id.id, gather_ctx->new_sub()); + // for now always send to myself (the leader) + std::string &instance_id = m_instance_watcher->get_instance_id(); + m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id, + mirror_uuid, image_id.id, + gather_ctx->new_sub()); } gather_ctx->activate(); diff --git a/src/tools/rbd_mirror/instance_watcher/Types.cc b/src/tools/rbd_mirror/instance_watcher/Types.cc new file mode 100644 index 00000000000..a741d1ef913 --- /dev/null +++ b/src/tools/rbd_mirror/instance_watcher/Types.cc @@ -0,0 +1,184 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Types.h" +#include "include/assert.h" +#include "include/stringify.h" +#include "common/Formatter.h" + +namespace rbd { +namespace mirror { +namespace instance_watcher { + +namespace { + +class EncodePayloadVisitor : public boost::static_visitor { +public: + explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {} + + template + inline void operator()(const Payload &payload) const { + ::encode(static_cast(Payload::NOTIFY_OP), m_bl); + payload.encode(m_bl); + } + +private: + bufferlist &m_bl; +}; + +class DecodePayloadVisitor : public boost::static_visitor { +public: + DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter) + : m_version(version), m_iter(iter) {} + + template + inline void operator()(Payload &payload) const { + payload.decode(m_version, m_iter); + } + +private: + __u8 m_version; + bufferlist::iterator &m_iter; +}; + +class DumpPayloadVisitor : public boost::static_visitor { +public: + explicit DumpPayloadVisitor(Formatter *formatter) : m_formatter(formatter) {} + + template + inline void operator()(const Payload &payload) const { + NotifyOp notify_op = Payload::NOTIFY_OP; + m_formatter->dump_string("notify_op", stringify(notify_op)); + payload.dump(m_formatter); + } + +private: + ceph::Formatter *m_formatter; +}; + +} // anonymous namespace + +void ImagePayloadBase::encode(bufferlist &bl) const { + ::encode(request_id, bl); + ::encode(global_image_id, bl); + ::encode(peer_mirror_uuid, bl); + ::encode(peer_image_id, bl); +} + +void ImagePayloadBase::decode(__u8 version, bufferlist::iterator &iter) { + ::decode(request_id, iter); + ::decode(global_image_id, iter); + ::decode(peer_mirror_uuid, iter); + ::decode(peer_image_id, iter); +} + +void ImagePayloadBase::dump(Formatter *f) const { + f->dump_unsigned("request_id", request_id); + f->dump_string("global_image_id", global_image_id); + f->dump_string("peer_mirror_uuid", peer_mirror_uuid); + f->dump_string("peer_image_id", peer_image_id); +} + +void ImageReleasePayload::encode(bufferlist &bl) const { + ImagePayloadBase::encode(bl); + ::encode(schedule_delete, bl); +} + +void ImageReleasePayload::decode(__u8 version, bufferlist::iterator &iter) { + ImagePayloadBase::decode(version, iter); + ::decode(schedule_delete, iter); +} + +void ImageReleasePayload::dump(Formatter *f) const { + ImagePayloadBase::dump(f); + f->dump_bool("schedule_delete", schedule_delete); +} + +void UnknownPayload::encode(bufferlist &bl) const { + assert(false); +} + +void UnknownPayload::decode(__u8 version, bufferlist::iterator &iter) { +} + +void UnknownPayload::dump(Formatter *f) const { +} + +void NotifyMessage::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + boost::apply_visitor(EncodePayloadVisitor(bl), payload); + ENCODE_FINISH(bl); +} + +void NotifyMessage::decode(bufferlist::iterator& iter) { + DECODE_START(1, iter); + + uint32_t notify_op; + ::decode(notify_op, iter); + + // select the correct payload variant based upon the encoded op + switch (notify_op) { + case NOTIFY_OP_IMAGE_ACQUIRE: + payload = ImageAcquirePayload(); + break; + case NOTIFY_OP_IMAGE_RELEASE: + payload = ImageReleasePayload(); + break; + default: + payload = UnknownPayload(); + break; + } + + apply_visitor(DecodePayloadVisitor(struct_v, iter), payload); + DECODE_FINISH(iter); +} + +void NotifyMessage::dump(Formatter *f) const { + apply_visitor(DumpPayloadVisitor(f), payload); +} + +void NotifyMessage::generate_test_instances(std::list &o) { + o.push_back(new NotifyMessage(ImageAcquirePayload())); + o.push_back(new NotifyMessage(ImageAcquirePayload(1, "gid", "uuid", "id"))); + + o.push_back(new NotifyMessage(ImageReleasePayload())); + o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id", + true))); +} + +std::ostream &operator<<(std::ostream &out, const NotifyOp &op) { + switch (op) { + case NOTIFY_OP_IMAGE_ACQUIRE: + out << "ImageAcquire"; + break; + case NOTIFY_OP_IMAGE_RELEASE: + out << "ImageRelease"; + break; + default: + out << "Unknown (" << static_cast(op) << ")"; + break; + } + return out; +} + +void NotifyAckPayload::encode(bufferlist &bl) const { + ::encode(instance_id, bl); + ::encode(request_id, bl); + ::encode(ret_val, bl); +} + +void NotifyAckPayload::decode(bufferlist::iterator &iter) { + ::decode(instance_id, iter); + ::decode(request_id, iter); + ::decode(ret_val, iter); +} + +void NotifyAckPayload::dump(Formatter *f) const { + f->dump_string("instance_id", instance_id); + f->dump_unsigned("request_id", request_id); + f->dump_int("request_id", ret_val); +} + +} // namespace instance_watcher +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/instance_watcher/Types.h b/src/tools/rbd_mirror/instance_watcher/Types.h new file mode 100644 index 00000000000..7299480cf28 --- /dev/null +++ b/src/tools/rbd_mirror/instance_watcher/Types.h @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef RBD_MIRROR_INSTANCE_WATCHER_TYPES_H +#define RBD_MIRROR_INSTANCE_WATCHER_TYPES_H + +#include +#include +#include + +#include "include/buffer_fwd.h" +#include "include/encoding.h" +#include "include/int_types.h" + +namespace ceph { class Formatter; } + +namespace rbd { +namespace mirror { +namespace instance_watcher { + +enum NotifyOp { + NOTIFY_OP_IMAGE_ACQUIRE = 0, + NOTIFY_OP_IMAGE_RELEASE = 1, +}; + +struct ImagePayloadBase { + uint64_t request_id; + std::string global_image_id; + std::string peer_mirror_uuid; + std::string peer_image_id; + + ImagePayloadBase() : request_id(0) { + } + + ImagePayloadBase(uint64_t request_id, const std::string &global_image_id, + const std::string &peer_mirror_uuid, + const std::string &peer_image_id) + : request_id(request_id), global_image_id(global_image_id), + peer_mirror_uuid(peer_mirror_uuid), peer_image_id(peer_image_id) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); + void dump(Formatter *f) const; +}; + +struct ImageAcquirePayload : public ImagePayloadBase { + static const NotifyOp NOTIFY_OP = NOTIFY_OP_IMAGE_ACQUIRE; + + ImageAcquirePayload() : ImagePayloadBase() { + } + + ImageAcquirePayload(uint64_t request_id, const std::string &global_image_id, + const std::string &peer_mirror_uuid, + const std::string &peer_image_id) + : ImagePayloadBase(request_id, global_image_id, peer_mirror_uuid, + peer_image_id) { + } +}; + +struct ImageReleasePayload : public ImagePayloadBase { + static const NotifyOp NOTIFY_OP = NOTIFY_OP_IMAGE_RELEASE; + + bool schedule_delete; + + ImageReleasePayload() : ImagePayloadBase(), schedule_delete(false) { + } + + ImageReleasePayload(uint64_t request_id, const std::string &global_image_id, + const std::string &peer_mirror_uuid, + const std::string &peer_image_id, bool schedule_delete) + : ImagePayloadBase(request_id, global_image_id, peer_mirror_uuid, + peer_image_id), + schedule_delete(schedule_delete) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); + void dump(Formatter *f) const; +}; + +struct UnknownPayload { + static const NotifyOp NOTIFY_OP = static_cast(-1); + + UnknownPayload() { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); + void dump(Formatter *f) const; +}; + +typedef boost::variant Payload; + +struct NotifyMessage { + NotifyMessage(const Payload &payload = UnknownPayload()) : payload(payload) { + } + + Payload payload; + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list &o); +}; + +WRITE_CLASS_ENCODER(NotifyMessage); + +std::ostream &operator<<(std::ostream &out, const NotifyOp &op); + +struct NotifyAckPayload { + std::string instance_id; + uint64_t request_id; + int ret_val; + + NotifyAckPayload() : request_id(0), ret_val(0) { + } + + NotifyAckPayload(const std::string &instance_id, uint64_t request_id, + int ret_val) + : instance_id(instance_id), request_id(request_id), ret_val(ret_val) { + } + + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator& it); + void dump(Formatter *f) const; +}; + +WRITE_CLASS_ENCODER(NotifyAckPayload); + +} // namespace instance_watcher +} // namespace mirror +} // namespace librbd + +using rbd::mirror::instance_watcher::encode; +using rbd::mirror::instance_watcher::decode; + +#endif // RBD_MIRROR_INSTANCE_WATCHER_TYPES_H