*/
#define RBD_MIRRORING "rbd_mirroring"
-
/**
- * rbd_mirror_leader object is used for pool-level coordination
- * between rbd-mirror daemons.
+ * rbd_mirror_leader and rbd_mirror_instance.<instance id> objects are used
+ * for pool-level coordination between rbd-mirror daemons.
*/
-#define RBD_MIRROR_LEADER "rbd_mirror_leader"
+#define RBD_MIRROR_LEADER "rbd_mirror_leader"
+#define RBD_MIRROR_INSTANCE_PREFIX "rbd_mirror_instance."
#define RBD_MAX_OBJ_NAME_SIZE 96
#define RBD_MAX_BLOCK_NAME_SIZE 24
test_ImageReplayer.cc
test_ImageDeleter.cc
test_ImageSync.cc
+ test_InstanceWatcher.cc
test_LeaderWatcher.cc
test_fixture.cc
)
test_mock_ImageReplayer.cc
test_mock_ImageSync.cc
test_mock_ImageSyncThrottler.cc
+ test_mock_InstanceWatcher.cc
test_mock_LeaderWatcher.cc
image_replayer/test_mock_BootstrapRequest.cc
image_replayer/test_mock_CreateImageRequest.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/rados/librados.hpp"
+#include "include/stringify.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/Utils.h"
+#include "librbd/internal.h"
+#include "test/rbd_mirror/test_fixture.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+#include "test/librados/test.h"
+#include "gtest/gtest.h"
+
+using rbd::mirror::InstanceWatcher;
+
+void register_test_instance_watcher() {
+}
+
+class TestInstanceWatcher : public ::rbd::mirror::TestFixture {
+public:
+ std::string m_instance_id;
+ std::string m_oid;
+
+ void SetUp() override {
+ TestFixture::SetUp();
+ m_local_io_ctx.remove(RBD_MIRROR_LEADER);
+ EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
+
+ m_instance_id = stringify(m_local_io_ctx.get_instance_id());
+ m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
+ }
+
+ void get_instances(std::vector<std::string> *instance_ids) {
+ instance_ids->clear();
+ C_SaferCond on_get;
+ InstanceWatcher<>::get_instances(m_local_io_ctx, instance_ids, &on_get);
+ EXPECT_EQ(0, on_get.wait());
+ }
+};
+
+TEST_F(TestInstanceWatcher, InitShutdown)
+{
+ InstanceWatcher<> instance_watcher(m_local_io_ctx, m_threads->work_queue);
+ std::vector<std::string> instance_ids;
+ get_instances(&instance_ids);
+ ASSERT_EQ(0U, instance_ids.size());
+
+ uint64_t size;
+ ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr));
+
+ // Init
+ ASSERT_EQ(0, instance_watcher.init());
+
+ get_instances(&instance_ids);
+ ASSERT_EQ(1U, instance_ids.size());
+ ASSERT_EQ(m_instance_id, instance_ids[0]);
+
+ ASSERT_EQ(0, m_local_io_ctx.stat(m_oid, &size, nullptr));
+ std::list<obj_watch_t> watchers;
+ ASSERT_EQ(0, m_local_io_ctx.list_watchers(m_oid, &watchers));
+ ASSERT_EQ(1U, watchers.size());
+ ASSERT_EQ(m_instance_id, stringify(watchers.begin()->watcher_id));
+
+ get_instances(&instance_ids);
+ ASSERT_EQ(1U, instance_ids.size());
+
+ // Shutdown
+ instance_watcher.shut_down();
+
+ ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr));
+ get_instances(&instance_ids);
+ ASSERT_EQ(0U, instance_ids.size());
+}
+
+TEST_F(TestInstanceWatcher, Remove)
+{
+ std::string instance_id = "instance_id";
+ std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id;
+
+ std::vector<std::string> instance_ids;
+ get_instances(&instance_ids);
+ ASSERT_EQ(0U, instance_ids.size());
+
+ uint64_t size;
+ ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(oid, &size, nullptr));
+
+ librados::Rados cluster;
+ librados::IoCtx io_ctx;
+ ASSERT_EQ("", connect_cluster_pp(cluster));
+ ASSERT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx));
+ InstanceWatcher<> instance_watcher(io_ctx, m_threads->work_queue,
+ instance_id);
+ // Init
+ ASSERT_EQ(0, instance_watcher.init());
+
+ get_instances(&instance_ids);
+ ASSERT_EQ(1U, instance_ids.size());
+ ASSERT_EQ(instance_id, instance_ids[0]);
+
+ ASSERT_EQ(0, m_local_io_ctx.stat(oid, &size, nullptr));
+ std::list<obj_watch_t> watchers;
+ ASSERT_EQ(0, m_local_io_ctx.list_watchers(oid, &watchers));
+ ASSERT_EQ(1U, watchers.size());
+
+ get_instances(&instance_ids);
+ ASSERT_EQ(1U, instance_ids.size());
+
+ // Remove
+ C_SaferCond on_remove;
+ InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue,
+ "instance_id", &on_remove);
+ ASSERT_EQ(0, on_remove.wait());
+
+ ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(oid, &size, nullptr));
+ get_instances(&instance_ids);
+ ASSERT_EQ(0U, instance_ids.size());
+
+ // Shutdown
+ instance_watcher.shut_down();
+
+ ASSERT_EQ(-ENOENT, m_local_io_ctx.stat(m_oid, &size, nullptr));
+ get_instances(&instance_ids);
+ ASSERT_EQ(0U, instance_ids.size());
+
+ // Remove NOENT
+ C_SaferCond on_remove_noent;
+ InstanceWatcher<>::remove_instance(m_local_io_ctx, m_threads->work_queue,
+ instance_id, &on_remove_noent);
+ ASSERT_EQ(0, on_remove_noent.wait());
+}
#include <string>
extern void register_test_cluster_watcher();
+extern void register_test_image_sync();
+extern void register_test_instance_watcher();
+extern void register_test_leader_watcher();
extern void register_test_pool_watcher();
extern void register_test_rbd_mirror();
extern void register_test_rbd_mirror_image_deleter();
-extern void register_test_image_sync();
-extern void register_test_leader_watcher();
int main(int argc, char **argv)
{
register_test_cluster_watcher();
+ register_test_image_sync();
+ register_test_instance_watcher();
+ register_test_leader_watcher();
register_test_pool_watcher();
register_test_rbd_mirror();
register_test_rbd_mirror_image_deleter();
- register_test_image_sync();
- register_test_leader_watcher();
::testing::InitGoogleTest(&argc, argv);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/ManagedLock.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/rbd_mirror/test_mock_fixture.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
+#include "tools/rbd_mirror/Threads.h"
+
+namespace librbd {
+
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+ MockTestImageCtx(librbd::ImageCtx &image_ctx)
+ : librbd::MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+
+template <>
+struct ManagedLock<MockTestImageCtx> {
+ static ManagedLock* s_instance;
+
+ static ManagedLock *create(librados::IoCtx& ioctx, ContextWQ *work_queue,
+ const std::string& oid, librbd::Watcher *watcher,
+ managed_lock::Mode mode,
+ bool blacklist_on_break_lock,
+ uint32_t blacklist_expire_seconds) {
+ assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ ManagedLock() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ ~ManagedLock() {
+ assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD0(destroy, void());
+ MOCK_METHOD1(shut_down, void(Context *));
+ MOCK_METHOD1(acquire_lock, void(Context *));
+ MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *));
+ MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *));
+};
+
+ManagedLock<MockTestImageCtx> *ManagedLock<MockTestImageCtx>::s_instance = nullptr;
+
+} // namespace librbd
+
+// template definitions
+#include "tools/rbd_mirror/InstanceWatcher.cc"
+template class rbd::mirror::InstanceWatcher<librbd::MockTestImageCtx>;
+
+namespace rbd {
+namespace mirror {
+
+using ::testing::_;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+using ::testing::StrEq;
+using ::testing::WithArg;
+
+class TestMockInstanceWatcher : public TestMockFixture {
+public:
+ typedef librbd::ManagedLock<librbd::MockTestImageCtx> MockManagedLock;
+ typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
+
+ std::string m_instance_id;
+ std::string m_oid;
+
+ void SetUp() override {
+ TestFixture::SetUp();
+ m_local_io_ctx.remove(RBD_MIRROR_LEADER);
+ EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
+
+ m_instance_id = stringify(m_local_io_ctx.get_instance_id());
+ m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
+ }
+
+ void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
+ EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
+ }
+
+ void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
+ EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
+ }
+
+ void expect_register_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
+ int r) {
+ EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
+ StrEq("mirror_instances_add"), _, _, _))
+ .WillOnce(Return(r));
+ }
+
+ void expect_unregister_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
+ int r) {
+ EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
+ StrEq("mirror_instances_remove"), _, _, _))
+ .WillOnce(Return(r));
+ }
+
+ void expect_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
+ EXPECT_CALL(mock_managed_lock, acquire_lock(_))
+ .WillOnce(CompleteContext(r));
+ }
+
+ void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
+ EXPECT_CALL(mock_managed_lock, shut_down(_)).WillOnce(CompleteContext(r));
+ }
+
+ void expect_destroy_lock(MockManagedLock &mock_managed_lock) {
+ EXPECT_CALL(mock_managed_lock, destroy());
+ }
+
+ void expect_get_locker(MockManagedLock &mock_managed_lock,
+ const librbd::managed_lock::Locker &locker, int r) {
+ EXPECT_CALL(mock_managed_lock, get_locker(_, _))
+ .WillOnce(Invoke([r, locker](librbd::managed_lock::Locker *out,
+ Context *ctx) {
+ if (r == 0) {
+ *out = locker;
+ }
+ ctx->complete(r);
+ }));
+ }
+
+ void expect_break_lock(MockManagedLock &mock_managed_lock,
+ const librbd::managed_lock::Locker &locker, int r) {
+ EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _))
+ .WillOnce(WithArg<2>(CompleteContext(r)));
+ }
+};
+
+TEST_F(TestMockInstanceWatcher, InitShutdown) {
+ MockManagedLock mock_managed_lock;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+ auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
+ m_threads->work_queue);
+ InSequence seq;
+
+ // Init
+ expect_register_instance(mock_io_ctx, 0);
+ expect_register_watch(mock_io_ctx);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher->init());
+
+ // Shutdown
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx);
+ expect_unregister_instance(mock_io_ctx, 0);
+ instance_watcher->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher;
+}
+
+TEST_F(TestMockInstanceWatcher, InitError) {
+ MockManagedLock mock_managed_lock;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+ auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
+ m_threads->work_queue);
+ InSequence seq;
+
+ expect_register_instance(mock_io_ctx, 0);
+ expect_register_watch(mock_io_ctx);
+ expect_acquire_lock(mock_managed_lock, -EINVAL);
+ expect_unregister_watch(mock_io_ctx);
+ expect_unregister_instance(mock_io_ctx, 0);
+
+ ASSERT_EQ(-EINVAL, instance_watcher->init());
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher;
+}
+
+TEST_F(TestMockInstanceWatcher, ShutdownError) {
+ MockManagedLock mock_managed_lock;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+ auto instance_watcher = new MockInstanceWatcher(m_local_io_ctx,
+ m_threads->work_queue);
+ InSequence seq;
+
+ // Init
+ expect_register_instance(mock_io_ctx, 0);
+ expect_register_watch(mock_io_ctx);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher->init());
+
+ // Shutdown
+ expect_release_lock(mock_managed_lock, -EINVAL);
+ expect_unregister_watch(mock_io_ctx);
+ expect_unregister_instance(mock_io_ctx, 0);
+ instance_watcher->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher;
+}
+
+
+TEST_F(TestMockInstanceWatcher, Remove) {
+ MockManagedLock mock_managed_lock;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+ librbd::managed_lock::Locker
+ locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
+
+ InSequence seq;
+
+ expect_get_locker(mock_managed_lock, locker, 0);
+ expect_break_lock(mock_managed_lock, locker, 0);
+ expect_unregister_instance(mock_io_ctx, 0);
+ expect_destroy_lock(mock_managed_lock);
+
+ C_SaferCond on_remove;
+ MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+ "instance_id", &on_remove);
+ ASSERT_EQ(0, on_remove.wait());
+}
+
+TEST_F(TestMockInstanceWatcher, RemoveNoent) {
+ MockManagedLock mock_managed_lock;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+ InSequence seq;
+
+ expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), -ENOENT);
+ expect_unregister_instance(mock_io_ctx, 0);
+ expect_destroy_lock(mock_managed_lock);
+
+ C_SaferCond on_remove;
+ MockInstanceWatcher::remove_instance(m_local_io_ctx, m_threads->work_queue,
+ "instance_id", &on_remove);
+ ASSERT_EQ(0, on_remove.wait());
+}
+
+} // namespace mirror
+} // namespace rbd
set(rbd_mirror_internal
ClusterWatcher.cc
- ImageReplayer.cc
ImageDeleter.cc
+ ImageReplayer.cc
ImageSync.cc
ImageSyncThrottler.cc
+ InstanceWatcher.cc
LeaderWatcher.cc
Mirror.cc
MirrorStatusWatcher.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "InstanceWatcher.h"
+#include "include/stringify.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/ManagedLock.h"
+#include "librbd/Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
+ << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+using librbd::util::create_rados_ack_callback;
+
+namespace {
+
+struct C_GetInstances : public Context {
+ std::vector<std::string> *instance_ids;
+ Context *on_finish;
+ bufferlist out_bl;
+
+ C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
+ : instance_ids(instance_ids), on_finish(on_finish) {
+ }
+
+ void finish(int r) override {
+ if (r == 0) {
+ bufferlist::iterator it = out_bl.begin();
+ r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
+ } else if (r == -ENOENT) {
+ r = 0;
+ }
+ on_finish->complete(r);
+ }
+};
+
+template <typename I>
+struct RemoveInstanceRequest : public Context {
+ InstanceWatcher<I> instance_watcher;
+ Context *on_finish;
+
+ RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ const std::string &instance_id, Context *on_finish)
+ : instance_watcher(io_ctx, work_queue, instance_id), on_finish(on_finish) {
+ }
+
+ void send() {
+ instance_watcher.remove(this);
+ }
+
+ void finish(int r) override {
+ assert(r == 0);
+
+ on_finish->complete(r);
+ }
+};
+
+} // anonymous namespace
+
+template <typename I>
+void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
+ std::vector<std::string> *instance_ids,
+ Context *on_finish) {
+ librados::ObjectReadOperation op;
+ librbd::cls_client::mirror_instances_list_start(&op);
+ C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
+ librados::AioCompletion *aio_comp = create_rados_ack_callback(ctx);
+
+ int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
+ ContextWQ *work_queue,
+ const std::string &instance_id,
+ Context *on_finish) {
+ auto req = new RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
+ on_finish);
+ req->send();
+}
+
+template <typename I>
+InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
+ ContextWQ *work_queue,
+ const boost::optional<std::string> &id)
+ : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX +
+ (id ? *id : stringify(io_ctx.get_instance_id()))),
+ m_instance_id(id ? *id : stringify(io_ctx.get_instance_id())),
+ m_lock("rbd::mirror::InstanceWatcher " + io_ctx.get_pool_name()),
+ m_instance_lock(librbd::ManagedLock<I>::create(
+ m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
+ m_cct->_conf->rbd_blacklist_expire_seconds)) {
+}
+
+template <typename I>
+InstanceWatcher<I>::~InstanceWatcher() {
+ m_instance_lock->destroy();
+}
+
+template <typename I>
+int InstanceWatcher<I>::init() {
+ C_SaferCond init_ctx;
+ init(&init_ctx);
+ return init_ctx.wait();
+}
+
+template <typename I>
+void InstanceWatcher<I>::init(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+ m_ret_val = 0;
+
+ register_instance();
+}
+
+template <typename I>
+void InstanceWatcher<I>::shut_down() {
+ C_SaferCond shut_down_ctx;
+ shut_down(&shut_down_ctx);
+ int r = shut_down_ctx.wait();
+ assert(r == 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::shut_down(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+ m_ret_val = 0;
+
+ release_lock();
+}
+
+template <typename I>
+void InstanceWatcher<I>::remove(Context *on_finish) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+ m_ret_val = 0;
+ m_removing = true;
+
+ get_instance_locker();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
+ uint64_t notifier_id, bufferlist &bl) {
+ dout(20) << dendl;
+
+ bufferlist out;
+ acknowledge_notify(notify_id, handle, out);
+}
+
+template <typename I>
+void InstanceWatcher<I>::register_instance() {
+ assert(m_lock.is_locked());
+
+ dout(20) << dendl;
+
+ librados::ObjectWriteOperation op;
+ librbd::cls_client::mirror_instances_add(&op, m_instance_id);
+ librados::AioCompletion *aio_comp = create_rados_ack_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
+
+ int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_register_instance(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (r == 0) {
+ create_instance_object();
+ return;
+ }
+
+ derr << "error registering instance: " << cpp_strerror(r) << dendl;
+
+ std::swap(on_finish, m_on_finish);
+ }
+ on_finish->complete(r);
+}
+
+
+template <typename I>
+void InstanceWatcher<I>::create_instance_object() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ librados::ObjectWriteOperation op;
+ op.create(true);
+
+ librados::AioCompletion *aio_comp = create_rados_ack_callback<
+ InstanceWatcher<I>,
+ &InstanceWatcher<I>::handle_create_instance_object>(this);
+ int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_create_instance_object(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
+ << dendl;
+
+ m_ret_val = r;
+ unregister_instance();
+ return;
+ }
+
+ register_watch();
+}
+
+template <typename I>
+void InstanceWatcher<I>::register_watch() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
+
+ librbd::Watcher::register_watch(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_register_watch(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error registering instance watcher for " << m_oid << " object: "
+ << cpp_strerror(r) << dendl;
+
+ m_ret_val = r;
+ remove_instance_object();
+ return;
+ }
+
+ acquire_lock();
+}
+
+template <typename I>
+void InstanceWatcher<I>::acquire_lock() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
+
+ m_instance_lock->acquire_lock(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_acquire_lock(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+
+ derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
+
+ m_ret_val = r;
+ unregister_watch();
+ return;
+ }
+
+ std::swap(on_finish, m_on_finish);
+ }
+ on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceWatcher<I>::release_lock() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
+
+ m_instance_lock->shut_down(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_release_lock(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
+ }
+
+ unregister_watch();
+}
+
+template <typename I>
+void InstanceWatcher<I>::unregister_watch() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
+
+ librbd::Watcher::unregister_watch(ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_unregister_watch(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "error unregistering instance watcher for " << m_oid << " object: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ Mutex::Locker locker(m_lock);
+ remove_instance_object();
+}
+
+template <typename I>
+void InstanceWatcher<I>::remove_instance_object() {
+ assert(m_lock.is_locked());
+
+ dout(20) << dendl;
+
+ librados::ObjectWriteOperation op;
+ op.remove();
+
+ librados::AioCompletion *aio_comp = create_rados_ack_callback<
+ InstanceWatcher<I>,
+ &InstanceWatcher<I>::handle_remove_instance_object>(this);
+ int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_remove_instance_object(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ if (m_removing && r == -ENOENT) {
+ r = 0;
+ }
+
+ if (r < 0) {
+ derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
+ << dendl;
+ }
+
+ Mutex::Locker locker(m_lock);
+ unregister_instance();
+}
+
+template <typename I>
+void InstanceWatcher<I>::unregister_instance() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ librados::ObjectWriteOperation op;
+ librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
+ librados::AioCompletion *aio_comp = create_rados_ack_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
+
+ int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
+ assert(r == 0);
+ aio_comp->release();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_unregister_instance(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
+ }
+
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ std::swap(on_finish, m_on_finish);
+ r = m_ret_val;
+
+ if (m_removing) {
+ m_removing = false;
+ }
+ }
+ on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceWatcher<I>::get_instance_locker() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
+
+ m_instance_lock->get_locker(&m_instance_locker, ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_get_instance_locker(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ if (r != -ENOENT) {
+ derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
+ }
+ remove_instance_object();
+ return;
+ }
+
+ break_instance_lock();
+}
+
+template <typename I>
+void InstanceWatcher<I>::break_instance_lock() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ Context *ctx = create_async_context_callback(
+ m_work_queue, create_context_callback<
+ InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
+
+ m_instance_lock->break_lock(m_instance_locker, true, ctx);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_break_instance_lock(int r) {
+ dout(20) << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ if (r != -ENOENT) {
+ derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
+ }
+ remove_instance_object();
+ return;
+ }
+
+ remove_instance_object();
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
+#define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
+
+#include <string>
+#include <vector>
+#include <boost/optional.hpp>
+
+#include "librbd/Watcher.h"
+#include "librbd/managed_lock/Types.h"
+
+namespace librbd {
+ class ImageCtx;
+ template <typename> class ManagedLock;
+}
+
+namespace rbd {
+namespace mirror {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class InstanceWatcher : protected librbd::Watcher {
+public:
+ static void get_instances(librados::IoCtx &io_ctx,
+ std::vector<std::string> *instance_ids,
+ Context *on_finish);
+ static void remove_instance(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ const std::string &instance_id,
+ Context *on_finish);
+
+ static InstanceWatcher *create(
+ librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ const boost::optional<std::string> &id = boost::none) {
+ return new InstanceWatcher(io_ctx, work_queue, id);
+ }
+ void destroy() {
+ delete this;
+ }
+
+ InstanceWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
+ const boost::optional<std::string> &id = boost::none);
+ ~InstanceWatcher() override;
+
+ int init();
+ void shut_down();
+
+ void init(Context *on_finish);
+ void shut_down(Context *on_finish);
+ void remove(Context *on_finish);
+
+protected:
+ void handle_notify(uint64_t notify_id, uint64_t handle, uint64_t notifier_id,
+ bufferlist &bl) override;
+
+private:
+ /**
+ * @verbatim
+ *
+ * BREAK_INSTANCE_LOCK -------\
+ * ^ |
+ * | (error) |
+ * GET_INSTANCE_LOCKER * * *>|
+ * ^ (remove) |
+ * | |
+ * <uninitialized> <----------------+--------\
+ * | (init) ^ | |
+ * v (error) * | |
+ * REGISTER_INSTANCE * * * * * *|* *> UNREGISTER_INSTANCE
+ * | * | ^
+ * v (error) * v |
+ * CREATE_INSTANCE_OBJECT * * * * * *> REMOVE_INSTANCE_OBJECT
+ * | * ^
+ * v (error) * |
+ * REGISTER_WATCH * * * * * * * * * *> UNREGISTER_WATCH
+ * | * ^
+ * v (error) * |
+ * ACQUIRE_LOCK * * * * * * * * * * * RELEASE_LOCK
+ * | ^
+ * v (shut_down) |
+ * <watching> -------------------------------/
+ *
+ * @endverbatim
+ */
+
+ bool m_owner;
+ std::string m_instance_id;
+
+ mutable Mutex m_lock;
+ librbd::ManagedLock<ImageCtxT> *m_instance_lock;
+ Context *m_on_finish = nullptr;
+ int m_ret_val = 0;
+ bool m_removing = false;
+ librbd::managed_lock::Locker m_instance_locker;
+
+ void register_instance();
+ void handle_register_instance(int r);
+
+ void create_instance_object();
+ void handle_create_instance_object(int r);
+
+ void register_watch();
+ void handle_register_watch(int r);
+
+ void acquire_lock();
+ void handle_acquire_lock(int r);
+
+ void release_lock();
+ void handle_release_lock(int r);
+
+ void unregister_watch();
+ void handle_unregister_watch(int r);
+
+ void remove_instance_object();
+ void handle_remove_instance_object(int r);
+
+ void unregister_instance();
+ void handle_unregister_instance(int r);
+
+ void get_instance_locker();
+ void handle_get_instance_locker(int r);
+
+ void break_instance_lock();
+ void handle_break_instance_lock(int r);
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/internal.h"
-#include "LeaderWatcher.h"
+#include "InstanceWatcher.h"
#include "Replayer.h"
#include "Threads.h"
if (m_leader_watcher) {
m_leader_watcher->shut_down();
}
+ if (m_instance_watcher) {
+ m_instance_watcher->shut_down();
+ }
}
bool Replayer::is_blacklisted() const {
return r;
}
+ m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
+ m_threads->work_queue));
+ r = m_instance_watcher->init();
+ if (r < 0) {
+ derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
// Bootstrap existing mirroring images
init_local_mirroring_images();
#include "ImageDeleter.h"
#include "types.h"
+namespace librbd { class ImageCtx; }
+
namespace rbd {
namespace mirror {
struct Threads;
class ReplayerAdminSocketHook;
+template <typename> class InstanceWatcher;
/**
* Controls mirroring for a single remote cluster.
} m_leader_listener;
std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
+ std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
};
} // namespace mirror