From 215f2984bd9b38172794241ec6b407ca6f00f622 Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Wed, 8 Feb 2017 14:40:24 +0100 Subject: [PATCH] rbd-mirror A/A: InstanceWatcher watch/notify stub for leader/follower RPC Fixes: http://tracker.ceph.com/issues/18783 Signed-off-by: Mykola Golub --- src/include/rbd_types.h | 8 +- src/test/rbd_mirror/CMakeLists.txt | 2 + src/test/rbd_mirror/test_InstanceWatcher.cc | 133 +++++ src/test/rbd_mirror/test_main.cc | 10 +- .../rbd_mirror/test_mock_InstanceWatcher.cc | 247 +++++++++ src/tools/rbd_mirror/CMakeLists.txt | 3 +- src/tools/rbd_mirror/InstanceWatcher.cc | 501 ++++++++++++++++++ src/tools/rbd_mirror/InstanceWatcher.h | 130 +++++ src/tools/rbd_mirror/Replayer.cc | 13 +- src/tools/rbd_mirror/Replayer.h | 4 + 10 files changed, 1041 insertions(+), 10 deletions(-) create mode 100644 src/test/rbd_mirror/test_InstanceWatcher.cc create mode 100644 src/test/rbd_mirror/test_mock_InstanceWatcher.cc create mode 100644 src/tools/rbd_mirror/InstanceWatcher.cc create mode 100644 src/tools/rbd_mirror/InstanceWatcher.h diff --git a/src/include/rbd_types.h b/src/include/rbd_types.h index a81da8bb6bc65..845a94e1ebd03 100644 --- a/src/include/rbd_types.h +++ b/src/include/rbd_types.h @@ -65,12 +65,12 @@ */ #define RBD_MIRRORING "rbd_mirroring" - /** - * rbd_mirror_leader object is used for pool-level coordination - * between rbd-mirror daemons. + * rbd_mirror_leader and rbd_mirror_instance. objects are used + * for pool-level coordination between rbd-mirror daemons. */ -#define RBD_MIRROR_LEADER "rbd_mirror_leader" +#define RBD_MIRROR_LEADER "rbd_mirror_leader" +#define RBD_MIRROR_INSTANCE_PREFIX "rbd_mirror_instance." #define RBD_MAX_OBJ_NAME_SIZE 96 #define RBD_MAX_BLOCK_NAME_SIZE 24 diff --git a/src/test/rbd_mirror/CMakeLists.txt b/src/test/rbd_mirror/CMakeLists.txt index b815780650a32..369508eaf32b4 100644 --- a/src/test/rbd_mirror/CMakeLists.txt +++ b/src/test/rbd_mirror/CMakeLists.txt @@ -4,6 +4,7 @@ set(rbd_mirror_test_srcs test_ImageReplayer.cc test_ImageDeleter.cc test_ImageSync.cc + test_InstanceWatcher.cc test_LeaderWatcher.cc test_fixture.cc ) @@ -17,6 +18,7 @@ add_executable(unittest_rbd_mirror test_mock_ImageReplayer.cc test_mock_ImageSync.cc test_mock_ImageSyncThrottler.cc + test_mock_InstanceWatcher.cc test_mock_LeaderWatcher.cc image_replayer/test_mock_BootstrapRequest.cc image_replayer/test_mock_CreateImageRequest.cc diff --git a/src/test/rbd_mirror/test_InstanceWatcher.cc b/src/test/rbd_mirror/test_InstanceWatcher.cc new file mode 100644 index 0000000000000..cbf8cf015dc41 --- /dev/null +++ b/src/test/rbd_mirror/test_InstanceWatcher.cc @@ -0,0 +1,133 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/rados/librados.hpp" +#include "include/stringify.h" +#include "cls/rbd/cls_rbd_types.h" +#include "cls/rbd/cls_rbd_client.h" +#include "librbd/Utils.h" +#include "librbd/internal.h" +#include "test/rbd_mirror/test_fixture.h" +#include "tools/rbd_mirror/InstanceWatcher.h" +#include "tools/rbd_mirror/Threads.h" + +#include "test/librados/test.h" +#include "gtest/gtest.h" + +using rbd::mirror::InstanceWatcher; + +void register_test_instance_watcher() { +} + +class TestInstanceWatcher : public ::rbd::mirror::TestFixture { +public: + std::string m_instance_id; + std::string m_oid; + + void SetUp() override { + TestFixture::SetUp(); + m_local_io_ctx.remove(RBD_MIRROR_LEADER); + EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true)); + + m_instance_id = stringify(m_local_io_ctx.get_instance_id()); + m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id; + } + + void get_instances(std::vector *instance_ids) { + instance_ids->clear(); + C_SaferCond on_get; + InstanceWatcher<>::get_instances(m_local_io_ctx, instance_ids, &on_get); + EXPECT_EQ(0, on_get.wait()); + } +}; + +TEST_F(TestInstanceWatcher, InitShutdown) +{ + InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue); + std::vector instance_ids; + get_instances(&instance_ids); + ASSERT_EQ(0U, instance_ids.size()); + + uint64_t size; + ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr)); + + // Init + ASSERT_EQ(0, instance_watcher.init()); + + get_instances(&instance_ids); + ASSERT_EQ(1U, instance_ids.size()); + ASSERT_EQ(m_instance_id, instance_ids[0]); + + ASSERT_EQ(0, m_local_io_ctx.stat(m_oid, &size, nullptr)); + std::list watchers; + ASSERT_EQ(0, m_local_io_ctx.list_watchers(m_oid, &watchers)); + ASSERT_EQ(1U, watchers.size()); + ASSERT_EQ(m_instance_id, stringify(watchers.begin()->watcher_id)); + + get_instances(&instance_ids); + ASSERT_EQ(1U, instance_ids.size()); + + // Shutdown + instance_watcher.shut_down(); + + ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr)); + get_instances(&instance_ids); + ASSERT_EQ(0U, instance_ids.size()); +} + +TEST_F(TestInstanceWatcher, Remove) +{ + std::string instance_id = "instance_id"; + std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id; + + std::vector instance_ids; + get_instances(&instance_ids); + ASSERT_EQ(0U, instance_ids.size()); + + uint64_t size; + ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(oid, &size, nullptr)); + + librados::Rados cluster; + librados::IoCtx io_ctx; + ASSERT_EQ("", connect_cluster_pp(cluster)); + ASSERT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx)); + InstanceWatcher<> instance_watcher(io_ctx, m_threads->work_queue, + instance_id); + // Init + ASSERT_EQ(0, instance_watcher.init()); + + get_instances(&instance_ids); + ASSERT_EQ(1U, instance_ids.size()); + ASSERT_EQ(instance_id, instance_ids[0]); + + ASSERT_EQ(0, m_local_io_ctx.stat(oid, &size, nullptr)); + std::list watchers; + ASSERT_EQ(0, m_local_io_ctx.list_watchers(oid, &watchers)); + ASSERT_EQ(1U, watchers.size()); + + get_instances(&instance_ids); + ASSERT_EQ(1U, instance_ids.size()); + + // Remove + C_SaferCond on_remove; + InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue, + "instance_id", &on_remove); + ASSERT_EQ(0, on_remove.wait()); + + ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(oid, &size, nullptr)); + get_instances(&instance_ids); + ASSERT_EQ(0U, instance_ids.size()); + + // Shutdown + instance_watcher.shut_down(); + + ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr)); + get_instances(&instance_ids); + ASSERT_EQ(0U, instance_ids.size()); + + // Remove NOENT + C_SaferCond on_remove_noent; + InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue, + instance_id, &on_remove_noent); + ASSERT_EQ(0, on_remove_noent.wait()); +} diff --git a/src/test/rbd_mirror/test_main.cc b/src/test/rbd_mirror/test_main.cc index d0d577e056e45..aae4b143787a1 100644 --- a/src/test/rbd_mirror/test_main.cc +++ b/src/test/rbd_mirror/test_main.cc @@ -9,20 +9,22 @@ #include extern void register_test_cluster_watcher(); +extern void register_test_image_sync(); +extern void register_test_instance_watcher(); +extern void register_test_leader_watcher(); extern void register_test_pool_watcher(); extern void register_test_rbd_mirror(); extern void register_test_rbd_mirror_image_deleter(); -extern void register_test_image_sync(); -extern void register_test_leader_watcher(); int main(int argc, char **argv) { register_test_cluster_watcher(); + register_test_image_sync(); + register_test_instance_watcher(); + register_test_leader_watcher(); register_test_pool_watcher(); register_test_rbd_mirror(); register_test_rbd_mirror_image_deleter(); - register_test_image_sync(); - register_test_leader_watcher(); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc new file mode 100644 index 0000000000000..f2f451a52980e --- /dev/null +++ b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc @@ -0,0 +1,247 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/ManagedLock.h" +#include "test/librbd/mock/MockImageCtx.h" +#include "test/librados_test_stub/MockTestMemIoCtxImpl.h" +#include "test/rbd_mirror/test_mock_fixture.h" +#include "tools/rbd_mirror/InstanceWatcher.h" +#include "tools/rbd_mirror/Threads.h" + +namespace librbd { + +namespace { + +struct MockTestImageCtx : public MockImageCtx { + MockTestImageCtx(librbd::ImageCtx &image_ctx) + : librbd::MockImageCtx(image_ctx) { + } +}; + +} // anonymous namespace + +template <> +struct ManagedLock { + static ManagedLock* s_instance; + + static ManagedLock *create(librados::IoCtx& ioctx, ContextWQ *work_queue, + const std::string& oid, librbd::Watcher *watcher, + managed_lock::Mode mode, + bool blacklist_on_break_lock, + uint32_t blacklist_expire_seconds) { + assert(s_instance != nullptr); + return s_instance; + } + + ManagedLock() { + assert(s_instance == nullptr); + s_instance = this; + } + + ~ManagedLock() { + assert(s_instance == this); + s_instance = nullptr; + } + + MOCK_METHOD0(destroy, void()); + MOCK_METHOD1(shut_down, void(Context *)); + MOCK_METHOD1(acquire_lock, void(Context *)); + MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *)); + MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *)); +}; + +ManagedLock *ManagedLock::s_instance = nullptr; + +} // namespace librbd + +// template definitions +#include "tools/rbd_mirror/InstanceWatcher.cc" +template class rbd::mirror::InstanceWatcher; + +namespace rbd { +namespace mirror { + +using ::testing::_; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::StrEq; +using ::testing::WithArg; + +class TestMockInstanceWatcher : public TestMockFixture { +public: + typedef librbd::ManagedLock MockManagedLock; + typedef InstanceWatcher MockInstanceWatcher; + + std::string m_instance_id; + std::string m_oid; + + void SetUp() override { + TestFixture::SetUp(); + m_local_io_ctx.remove(RBD_MIRROR_LEADER); + EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true)); + + m_instance_id = stringify(m_local_io_ctx.get_instance_id()); + m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id; + } + + void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) { + EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _)); + } + + void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) { + EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _)); + } + + void expect_register_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx, + int r) { + EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"), + StrEq("mirror_instances_add"), _, _, _)) + .WillOnce(Return(r)); + } + + void expect_unregister_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx, + int r) { + EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"), + StrEq("mirror_instances_remove"), _, _, _)) + .WillOnce(Return(r)); + } + + void expect_acquire_lock(MockManagedLock &mock_managed_lock, int r) { + EXPECT_CALL(mock_managed_lock, acquire_lock(_)) + .WillOnce(CompleteContext(r)); + } + + void expect_release_lock(MockManagedLock &mock_managed_lock, int r) { + EXPECT_CALL(mock_managed_lock, shut_down(_)).WillOnce(CompleteContext(r)); + } + + void expect_destroy_lock(MockManagedLock &mock_managed_lock) { + EXPECT_CALL(mock_managed_lock, destroy()); + } + + void expect_get_locker(MockManagedLock &mock_managed_lock, + const librbd::managed_lock::Locker &locker, int r) { + EXPECT_CALL(mock_managed_lock, get_locker(_, _)) + .WillOnce(Invoke([r, locker](librbd::managed_lock::Locker *out, + Context *ctx) { + if (r == 0) { + *out = locker; + } + ctx->complete(r); + })); + } + + void expect_break_lock(MockManagedLock &mock_managed_lock, + const librbd::managed_lock::Locker &locker, int r) { + EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _)) + .WillOnce(WithArg<2>(CompleteContext(r))); + } +}; + +TEST_F(TestMockInstanceWatcher, InitShutdown) { + MockManagedLock mock_managed_lock; + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); + + auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx, + m_threads->work_queue); + InSequence seq; + + // Init + expect_register_instance(mock_io_ctx, 0); + expect_register_watch(mock_io_ctx); + expect_acquire_lock(mock_managed_lock, 0); + ASSERT_EQ(0, instance_watcher->init()); + + // Shutdown + expect_release_lock(mock_managed_lock, 0); + expect_unregister_watch(mock_io_ctx); + expect_unregister_instance(mock_io_ctx, 0); + instance_watcher->shut_down(); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher; +} + +TEST_F(TestMockInstanceWatcher, InitError) { + MockManagedLock mock_managed_lock; + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); + + auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx, + m_threads->work_queue); + InSequence seq; + + expect_register_instance(mock_io_ctx, 0); + expect_register_watch(mock_io_ctx); + expect_acquire_lock(mock_managed_lock, -EINVAL); + expect_unregister_watch(mock_io_ctx); + expect_unregister_instance(mock_io_ctx, 0); + + ASSERT_EQ(-EINVAL, instance_watcher->init()); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher; +} + +TEST_F(TestMockInstanceWatcher, ShutdownError) { + MockManagedLock mock_managed_lock; + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); + + auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx, + m_threads->work_queue); + InSequence seq; + + // Init + expect_register_instance(mock_io_ctx, 0); + expect_register_watch(mock_io_ctx); + expect_acquire_lock(mock_managed_lock, 0); + ASSERT_EQ(0, instance_watcher->init()); + + // Shutdown + expect_release_lock(mock_managed_lock, -EINVAL); + expect_unregister_watch(mock_io_ctx); + expect_unregister_instance(mock_io_ctx, 0); + instance_watcher->shut_down(); + + expect_destroy_lock(mock_managed_lock); + delete instance_watcher; +} + + +TEST_F(TestMockInstanceWatcher, Remove) { + MockManagedLock mock_managed_lock; + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); + librbd::managed_lock::Locker + locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123}; + + InSequence seq; + + expect_get_locker(mock_managed_lock, locker, 0); + expect_break_lock(mock_managed_lock, locker, 0); + expect_unregister_instance(mock_io_ctx, 0); + expect_destroy_lock(mock_managed_lock); + + C_SaferCond on_remove; + MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue, + "instance_id", &on_remove); + ASSERT_EQ(0, on_remove.wait()); +} + +TEST_F(TestMockInstanceWatcher, RemoveNoent) { + MockManagedLock mock_managed_lock; + librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); + + InSequence seq; + + expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), -ENOENT); + expect_unregister_instance(mock_io_ctx, 0); + expect_destroy_lock(mock_managed_lock); + + C_SaferCond on_remove; + MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue, + "instance_id", &on_remove); + ASSERT_EQ(0, on_remove.wait()); +} + +} // namespace mirror +} // namespace rbd diff --git a/src/tools/rbd_mirror/CMakeLists.txt b/src/tools/rbd_mirror/CMakeLists.txt index 10d6c8918d5d5..9224199e5331c 100644 --- a/src/tools/rbd_mirror/CMakeLists.txt +++ b/src/tools/rbd_mirror/CMakeLists.txt @@ -3,10 +3,11 @@ add_library(rbd_mirror_types STATIC set(rbd_mirror_internal ClusterWatcher.cc - ImageReplayer.cc ImageDeleter.cc + ImageReplayer.cc ImageSync.cc ImageSyncThrottler.cc + InstanceWatcher.cc LeaderWatcher.cc Mirror.cc MirrorStatusWatcher.cc diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc new file mode 100644 index 0000000000000..c07100b4648f6 --- /dev/null +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -0,0 +1,501 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "InstanceWatcher.h" +#include "include/stringify.h" +#include "common/debug.h" +#include "common/errno.h" +#include "cls/rbd/cls_rbd_client.h" +#include "librbd/ManagedLock.h" +#include "librbd/Utils.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \ + << this << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; +using librbd::util::create_rados_ack_callback; + +namespace { + +struct C_GetInstances : public Context { + std::vector *instance_ids; + Context *on_finish; + bufferlist out_bl; + + C_GetInstances(std::vector *instance_ids, Context *on_finish) + : instance_ids(instance_ids), on_finish(on_finish) { + } + + void finish(int r) override { + if (r == 0) { + bufferlist::iterator it = out_bl.begin(); + r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids); + } else if (r == -ENOENT) { + r = 0; + } + on_finish->complete(r); + } +}; + +template +struct RemoveInstanceRequest : public Context { + InstanceWatcher instance_watcher; + Context *on_finish; + + RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue, + const std::string &instance_id, Context *on_finish) + : instance_watcher(io_ctx, work_queue, instance_id), on_finish(on_finish) { + } + + void send() { + instance_watcher.remove(this); + } + + void finish(int r) override { + assert(r == 0); + + on_finish->complete(r); + } +}; + +} // anonymous namespace + +template +void InstanceWatcher::get_instances(librados::IoCtx &io_ctx, + std::vector *instance_ids, + Context *on_finish) { + librados::ObjectReadOperation op; + librbd::cls_client::mirror_instances_list_start(&op); + C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish); + librados::AioCompletion *aio_comp = create_rados_ack_callback(ctx); + + int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl); + assert(r == 0); + aio_comp->release(); +} + +template +void InstanceWatcher::remove_instance(librados::IoCtx &io_ctx, + ContextWQ *work_queue, + const std::string &instance_id, + Context *on_finish) { + auto req = new RemoveInstanceRequest(io_ctx, work_queue, instance_id, + on_finish); + req->send(); +} + +template +InstanceWatcher::InstanceWatcher(librados::IoCtx &io_ctx, + ContextWQ *work_queue, + const boost::optional &id) + : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + + (id ? *id : stringify(io_ctx.get_instance_id()))), + m_instance_id(id ? *id : stringify(io_ctx.get_instance_id())), + m_lock("rbd::mirror::InstanceWatcher " + io_ctx.get_pool_name()), + m_instance_lock(librbd::ManagedLock::create( + m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true, + m_cct->_conf->rbd_blacklist_expire_seconds)) { +} + +template +InstanceWatcher::~InstanceWatcher() { + m_instance_lock->destroy(); +} + +template +int InstanceWatcher::init() { + C_SaferCond init_ctx; + init(&init_ctx); + return init_ctx.wait(); +} + +template +void InstanceWatcher::init(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + m_ret_val = 0; + + register_instance(); +} + +template +void InstanceWatcher::shut_down() { + C_SaferCond shut_down_ctx; + shut_down(&shut_down_ctx); + int r = shut_down_ctx.wait(); + assert(r == 0); +} + +template +void InstanceWatcher::shut_down(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + m_ret_val = 0; + + release_lock(); +} + +template +void InstanceWatcher::remove(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + m_ret_val = 0; + m_removing = true; + + get_instance_locker(); +} + +template +void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist &bl) { + dout(20) << dendl; + + bufferlist out; + acknowledge_notify(notify_id, handle, out); +} + +template +void InstanceWatcher::register_instance() { + assert(m_lock.is_locked()); + + dout(20) << dendl; + + librados::ObjectWriteOperation op; + librbd::cls_client::mirror_instances_add(&op, m_instance_id); + librados::AioCompletion *aio_comp = create_rados_ack_callback< + InstanceWatcher, &InstanceWatcher::handle_register_instance>(this); + + int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); + assert(r == 0); + aio_comp->release(); +} + +template +void InstanceWatcher::handle_register_instance(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r == 0) { + create_instance_object(); + return; + } + + derr << "error registering instance: " << cpp_strerror(r) << dendl; + + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + + +template +void InstanceWatcher::create_instance_object() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + librados::ObjectWriteOperation op; + op.create(true); + + librados::AioCompletion *aio_comp = create_rados_ack_callback< + InstanceWatcher, + &InstanceWatcher::handle_create_instance_object>(this); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + assert(r == 0); + aio_comp->release(); +} + +template +void InstanceWatcher::handle_create_instance_object(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error creating " << m_oid << " object: " << cpp_strerror(r) + << dendl; + + m_ret_val = r; + unregister_instance(); + return; + } + + register_watch(); +} + +template +void InstanceWatcher::register_watch() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + InstanceWatcher, &InstanceWatcher::handle_register_watch>(this)); + + librbd::Watcher::register_watch(ctx); +} + +template +void InstanceWatcher::handle_register_watch(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error registering instance watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + + m_ret_val = r; + remove_instance_object(); + return; + } + + acquire_lock(); +} + +template +void InstanceWatcher::acquire_lock() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + InstanceWatcher, &InstanceWatcher::handle_acquire_lock>(this)); + + m_instance_lock->acquire_lock(ctx); +} + +template +void InstanceWatcher::handle_acquire_lock(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r < 0) { + + derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl; + + m_ret_val = r; + unregister_watch(); + return; + } + + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + +template +void InstanceWatcher::release_lock() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + InstanceWatcher, &InstanceWatcher::handle_release_lock>(this)); + + m_instance_lock->shut_down(ctx); +} + +template +void InstanceWatcher::handle_release_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error releasing instance lock: " << cpp_strerror(r) << dendl; + } + + unregister_watch(); +} + +template +void InstanceWatcher::unregister_watch() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + InstanceWatcher, &InstanceWatcher::handle_unregister_watch>(this)); + + librbd::Watcher::unregister_watch(ctx); +} + +template +void InstanceWatcher::handle_unregister_watch(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "error unregistering instance watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + } + + Mutex::Locker locker(m_lock); + remove_instance_object(); +} + +template +void InstanceWatcher::remove_instance_object() { + assert(m_lock.is_locked()); + + dout(20) << dendl; + + librados::ObjectWriteOperation op; + op.remove(); + + librados::AioCompletion *aio_comp = create_rados_ack_callback< + InstanceWatcher, + &InstanceWatcher::handle_remove_instance_object>(this); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + assert(r == 0); + aio_comp->release(); +} + +template +void InstanceWatcher::handle_remove_instance_object(int r) { + dout(20) << "r=" << r << dendl; + + if (m_removing && r == -ENOENT) { + r = 0; + } + + if (r < 0) { + derr << "error removing " << m_oid << " object: " << cpp_strerror(r) + << dendl; + } + + Mutex::Locker locker(m_lock); + unregister_instance(); +} + +template +void InstanceWatcher::unregister_instance() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + librados::ObjectWriteOperation op; + librbd::cls_client::mirror_instances_remove(&op, m_instance_id); + librados::AioCompletion *aio_comp = create_rados_ack_callback< + InstanceWatcher, &InstanceWatcher::handle_unregister_instance>(this); + + int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); + assert(r == 0); + aio_comp->release(); +} + +template +void InstanceWatcher::handle_unregister_instance(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "error unregistering instance: " << cpp_strerror(r) << dendl; + } + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + std::swap(on_finish, m_on_finish); + r = m_ret_val; + + if (m_removing) { + m_removing = false; + } + } + on_finish->complete(r); +} + +template +void InstanceWatcher::get_instance_locker() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + InstanceWatcher, &InstanceWatcher::handle_get_instance_locker>(this)); + + m_instance_lock->get_locker(&m_instance_locker, ctx); +} + +template +void InstanceWatcher::handle_get_instance_locker(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + if (r != -ENOENT) { + derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl; + } + remove_instance_object(); + return; + } + + break_instance_lock(); +} + +template +void InstanceWatcher::break_instance_lock() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + InstanceWatcher, &InstanceWatcher::handle_break_instance_lock>(this)); + + m_instance_lock->break_lock(m_instance_locker, true, ctx); +} + +template +void InstanceWatcher::handle_break_instance_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + if (r != -ENOENT) { + derr << "error breaking instance lock: " << cpp_strerror(r) << dendl; + } + remove_instance_object(); + return; + } + + remove_instance_object(); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::InstanceWatcher; diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h new file mode 100644 index 0000000000000..d89e40f0a5a5b --- /dev/null +++ b/src/tools/rbd_mirror/InstanceWatcher.h @@ -0,0 +1,130 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H +#define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H + +#include +#include +#include + +#include "librbd/Watcher.h" +#include "librbd/managed_lock/Types.h" + +namespace librbd { + class ImageCtx; + template class ManagedLock; +} + +namespace rbd { +namespace mirror { + +template +class InstanceWatcher : protected librbd::Watcher { +public: + static void get_instances(librados::IoCtx &io_ctx, + std::vector *instance_ids, + Context *on_finish); + static void remove_instance(librados::IoCtx &io_ctx, ContextWQ *work_queue, + const std::string &instance_id, + Context *on_finish); + + static InstanceWatcher *create( + librados::IoCtx &io_ctx, ContextWQ *work_queue, + const boost::optional &id = boost::none) { + return new InstanceWatcher(io_ctx, work_queue, id); + } + void destroy() { + delete this; + } + + InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue, + const boost::optional &id = boost::none); + ~InstanceWatcher() override; + + int init(); + void shut_down(); + + void init(Context *on_finish); + void shut_down(Context *on_finish); + void remove(Context *on_finish); + +protected: + void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id, + bufferlist &bl) override; + +private: + /** + * @verbatim + * + * BREAK_INSTANCE_LOCK -------\ + * ^ | + * | (error) | + * GET_INSTANCE_LOCKER * * *>| + * ^ (remove) | + * | | + * <----------------+--------\ + * | (init) ^ | | + * v (error) * | | + * REGISTER_INSTANCE * * * * * *|* *> UNREGISTER_INSTANCE + * | * | ^ + * v (error) * v | + * CREATE_INSTANCE_OBJECT * * * * * *> REMOVE_INSTANCE_OBJECT + * | * ^ + * v (error) * | + * REGISTER_WATCH * * * * * * * * * *> UNREGISTER_WATCH + * | * ^ + * v (error) * | + * ACQUIRE_LOCK * * * * * * * * * * * RELEASE_LOCK + * | ^ + * v (shut_down) | + * -------------------------------/ + * + * @endverbatim + */ + + bool m_owner; + std::string m_instance_id; + + mutable Mutex m_lock; + librbd::ManagedLock *m_instance_lock; + Context *m_on_finish = nullptr; + int m_ret_val = 0; + bool m_removing = false; + librbd::managed_lock::Locker m_instance_locker; + + void register_instance(); + void handle_register_instance(int r); + + void create_instance_object(); + void handle_create_instance_object(int r); + + void register_watch(); + void handle_register_watch(int r); + + void acquire_lock(); + void handle_acquire_lock(int r); + + void release_lock(); + void handle_release_lock(int r); + + void unregister_watch(); + void handle_unregister_watch(int r); + + void remove_instance_object(); + void handle_remove_instance_object(int r); + + void unregister_instance(); + void handle_unregister_instance(int r); + + void get_instance_locker(); + void handle_get_instance_locker(int r); + + void break_instance_lock(); + void handle_break_instance_lock(int r); +}; + +} // namespace mirror +} // namespace rbd + +#endif // CEPH_RBD_MIRROR_INSTANCE_WATCHER_H diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 59092404359ff..0b9cf4780578f 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -16,7 +16,7 @@ #include "librbd/Utils.h" #include "librbd/Watcher.h" #include "librbd/internal.h" -#include "LeaderWatcher.h" +#include "InstanceWatcher.h" #include "Replayer.h" #include "Threads.h" @@ -236,6 +236,9 @@ Replayer::~Replayer() if (m_leader_watcher) { m_leader_watcher->shut_down(); } + if (m_instance_watcher) { + m_instance_watcher->shut_down(); + } } bool Replayer::is_blacklisted() const { @@ -292,6 +295,14 @@ int Replayer::init() return r; } + m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx, + m_threads->work_queue)); + r = m_instance_watcher->init(); + if (r < 0) { + derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; + return r; + } + // Bootstrap existing mirroring images init_local_mirroring_images(); diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index 36dc074320f41..286b70dddbfee 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -22,11 +22,14 @@ #include "ImageDeleter.h" #include "types.h" +namespace librbd { class ImageCtx; } + namespace rbd { namespace mirror { struct Threads; class ReplayerAdminSocketHook; +template class InstanceWatcher; /** * Controls mirroring for a single remote cluster. @@ -124,6 +127,7 @@ private: } m_leader_listener; std::unique_ptr > m_leader_watcher; + std::unique_ptr > m_instance_watcher; }; } // namespace mirror -- 2.47.3