template <>
struct MirrorStatusUpdater<librbd::MockTestImageCtx> {
- static MirrorStatusUpdater* s_instance;
+ static std::map<std::string, MirrorStatusUpdater*> s_instance;
static MirrorStatusUpdater *create(librados::IoCtx &io_ctx,
Threads<librbd::MockTestImageCtx> *threads,
const std::string& site_name) {
- ceph_assert(s_instance != nullptr);
- return s_instance;
- }
-
- MirrorStatusUpdater() {
- ceph_assert(s_instance == nullptr);
- s_instance = this;
+ ceph_assert(s_instance[site_name] != nullptr);
+ return s_instance[site_name];
}
- ~MirrorStatusUpdater() {
- ceph_assert(s_instance == this);
- s_instance = nullptr;
+ MirrorStatusUpdater(const std::string& site_name) {
+ s_instance[site_name] = this;
}
MOCK_METHOD1(init, void(Context *));
MOCK_METHOD1(shut_down, void(Context *));
};
-MirrorStatusUpdater<librbd::MockTestImageCtx> *MirrorStatusUpdater<librbd::MockTestImageCtx>::s_instance = nullptr;
+std::map<std::string, MirrorStatusUpdater<librbd::MockTestImageCtx> *>
+ MirrorStatusUpdater<librbd::MockTestImageCtx>::s_instance;
template<>
struct PoolWatcher<librbd::MockTestImageCtx> {
TestMockFixture::TearDown();
}
- void expect_mirror_status_watcher_init(
- MockMirrorStatusUpdater &mock_mirror_status_watcher, int r) {
- EXPECT_CALL(mock_mirror_status_watcher, init(_))
+ void expect_mirror_status_updater_init(
+ MockMirrorStatusUpdater &mock_mirror_status_updater, int r) {
+ EXPECT_CALL(mock_mirror_status_updater, init(_))
.WillOnce(CompleteContext(m_mock_threads->work_queue, r));
}
- void expect_mirror_status_watcher_shut_down(
- MockMirrorStatusUpdater &mock_mirror_status_watcher) {
- EXPECT_CALL(mock_mirror_status_watcher, shut_down(_))
+ void expect_mirror_status_updater_shut_down(
+ MockMirrorStatusUpdater &mock_mirror_status_updater) {
+ EXPECT_CALL(mock_mirror_status_updater, shut_down(_))
.WillOnce(CompleteContext(m_mock_threads->work_queue, 0));
}
MockThreads *m_mock_threads;
};
-TEST_F(TestMockNamespaceReplayer, Init_MirrorStatusUpdaterError) {
+TEST_F(TestMockNamespaceReplayer, Init_LocalMirrorStatusUpdaterError) {
+ InSequence seq;
+
+ auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""};
+ expect_mirror_status_updater_init(*mock_local_mirror_status_updater, -EINVAL);
+
+ MockNamespaceReplayer namespace_replayer(
+ {}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
+ "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
+ nullptr);
+
+ C_SaferCond on_init;
+ namespace_replayer.init(&on_init);
+ ASSERT_EQ(-EINVAL, on_init.wait());
+}
+
+TEST_F(TestMockNamespaceReplayer, Init_RemoteMirrorStatusUpdaterError) {
InSequence seq;
- auto mock_mirror_status_watcher = new MockMirrorStatusUpdater;
- expect_mirror_status_watcher_init(*mock_mirror_status_watcher, -EINVAL);
+ auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""};
+ expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0);
+
+ auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"};
+ expect_mirror_status_updater_init(*mock_remote_mirror_status_updater,
+ -EINVAL);
+
+ expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
TEST_F(TestMockNamespaceReplayer, Init_InstanceReplayerError) {
InSequence seq;
- auto mock_mirror_status_watcher = new MockMirrorStatusUpdater;
- expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0);
+ auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""};
+ expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0);
+
+ auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"};
+ expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0);
auto mock_instance_replayer = new MockInstanceReplayer();
expect_instance_replayer_init(*mock_instance_replayer, -EINVAL);
- expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher);
+ expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater);
+ expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
TEST_F(TestMockNamespaceReplayer, Init_InstanceWatcherError) {
InSequence seq;
- auto mock_mirror_status_watcher = new MockMirrorStatusUpdater;
- expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0);
+ auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""};
+ expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0);
+
+ auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"};
+ expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0);
auto mock_instance_replayer = new MockInstanceReplayer();
expect_instance_replayer_init(*mock_instance_replayer, 0);
expect_instance_watcher_init(*mock_instance_watcher, -EINVAL);
expect_instance_replayer_shut_down(*mock_instance_replayer);
- expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher);
+ expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater);
+ expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
TEST_F(TestMockNamespaceReplayer, Init) {
InSequence seq;
- auto mock_mirror_status_watcher = new MockMirrorStatusUpdater;
- expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0);
+ auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""};
+ expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0);
+
+ auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"};
+ expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0);
auto mock_instance_replayer = new MockInstanceReplayer();
expect_instance_replayer_init(*mock_instance_replayer, 0);
expect_instance_replayer_stop(*mock_instance_replayer);
expect_instance_watcher_shut_down(*mock_instance_watcher);
expect_instance_replayer_shut_down(*mock_instance_replayer);
- expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher);
+ expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater);
+ expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater);
C_SaferCond on_shut_down;
namespace_replayer.shut_down(&on_shut_down);
// init
- auto mock_mirror_status_watcher = new MockMirrorStatusUpdater;
- expect_mirror_status_watcher_init(*mock_mirror_status_watcher, 0);
+ auto mock_local_mirror_status_updater = new MockMirrorStatusUpdater{""};
+ expect_mirror_status_updater_init(*mock_local_mirror_status_updater, 0);
+
+ auto mock_remote_mirror_status_updater = new MockMirrorStatusUpdater{"siteA"};
+ expect_mirror_status_updater_init(*mock_remote_mirror_status_updater, 0);
auto mock_instance_replayer = new MockInstanceReplayer();
expect_instance_replayer_init(*mock_instance_replayer, 0);
expect_instance_replayer_stop(*mock_instance_replayer);
expect_instance_watcher_shut_down(*mock_instance_watcher);
expect_instance_replayer_shut_down(*mock_instance_replayer);
- expect_mirror_status_watcher_shut_down(*mock_mirror_status_watcher);
+ expect_mirror_status_updater_shut_down(*mock_remote_mirror_status_updater);
+ expect_mirror_status_updater_shut_down(*mock_local_mirror_status_updater);
C_SaferCond on_shut_down;
namespace_replayer.shut_down(&on_shut_down);
derr << "error initializing local mirror status updater: "
<< cpp_strerror(r) << dendl;
+ m_local_status_updater.reset();
ceph_assert(m_on_finish != nullptr);
m_threads->work_queue->queue(m_on_finish, r);
m_on_finish = nullptr;
return;
}
+ init_remote_status_updater();
+}
+
+template <typename I>
+void NamespaceReplayer<I>::init_remote_status_updater() {
+ dout(10) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(!m_remote_status_updater);
+
+ m_remote_status_updater.reset(MirrorStatusUpdater<I>::create(
+ m_remote_io_ctx, m_threads, m_local_site_name));
+ auto ctx = create_context_callback<
+ NamespaceReplayer<I>,
+ &NamespaceReplayer<I>::handle_init_remote_status_updater>(this);
+ m_remote_status_updater->init(ctx);
+}
+
+template <typename I>
+void NamespaceReplayer<I>::handle_init_remote_status_updater(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ std::lock_guard locker{m_lock};
+
+ if (r < 0) {
+ derr << "error initializing remote mirror status updater: "
+ << cpp_strerror(r) << dendl;
+
+ m_remote_status_updater.reset();
+ m_ret_val = r;
+ shut_down_local_status_updater();
+ return;
+ }
+
init_instance_replayer();
}
m_instance_replayer.reset();
m_ret_val = r;
- shut_down_local_status_updater();
+ shut_down_remote_status_updater();
return;
}
m_instance_replayer.reset();
+ shut_down_remote_status_updater();
+}
+
+template <typename I>
+void NamespaceReplayer<I>::shut_down_remote_status_updater() {
+ dout(10) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(m_remote_status_updater);
+
+ auto ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ NamespaceReplayer<I>,
+ &NamespaceReplayer<I>::handle_shut_down_remote_status_updater>(this));
+ m_remote_status_updater->shut_down(ctx);
+}
+
+template <typename I>
+void NamespaceReplayer<I>::handle_shut_down_remote_status_updater(int r) {
+ dout(10) << "r=" << r << dendl;
+
+ if (r < 0) {
+ derr << "error shutting remote mirror status updater down: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ std::lock_guard locker{m_lock};
+ m_remote_status_updater.reset();
+
shut_down_local_status_updater();
}
dout(10) << "r=" << r << dendl;
if (r < 0) {
- derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
- << dendl;
+ derr << "error shutting local mirror status updater down: "
+ << cpp_strerror(r) << dendl;
}
std::lock_guard locker{m_lock};
/**
* @verbatim
*
- * <uninitialized> <--------------------------------\
- * | (init) ^ (error) |
- * v * |
- * INIT_LOCAL_STATUS_UPDATER * * * * * * > SHUT_DOWN_LOCAL_STATUS_UPDATER
- * | * (error) ^
- * v * |
- * INIT_INSTANCE_REPLAYER * * * * * * * > SHUT_DOWN_INSTANCE_REPLAYER
- * | * ^
- * v * |
- * INIT_INSTANCE_WATCHER * * * * * * * * SHUT_DOWN_INSTANCE_WATCHER
- * | (error) ^
- * | |
- * v STOP_INSTANCE_REPLAYER
- * | ^
- * | (shut down) |
- * | /------------------------------------------/
+ * <uninitialized> <------------------------------------\
+ * | (init) ^ (error) |
+ * v * |
+ * INIT_LOCAL_STATUS_UPDATER * * * * * * * * > SHUT_DOWN_LOCAL_STATUS_UPDATER
+ * | * (error) ^
+ * v * |
+ * INIT_REMOTE_STATUS_UPDATER * * * * * * * > SHUT_DOWN_REMOTE_STATUS_UPDATER
+ * | * (error) ^
+ * v * |
+ * INIT_INSTANCE_REPLAYER * * * * * * * * * > SHUT_DOWN_INSTANCE_REPLAYER
+ * | * ^
+ * v * |
+ * INIT_INSTANCE_WATCHER * * * * * * * * * * SHUT_DOWN_INSTANCE_WATCHER
+ * | (error) ^
+ * | |
+ * v STOP_INSTANCE_REPLAYER
+ * | ^
+ * | (shut down) |
+ * | /----------------------------------------------/
* v |
* <follower> <---------------------------\
* . |
void init_local_status_updater();
void handle_init_local_status_updater(int r);
+ void init_remote_status_updater();
+ void handle_init_remote_status_updater(int r);
+
void init_instance_replayer();
void handle_init_instance_replayer(int r);
void shut_down_instance_replayer();
void handle_shut_down_instance_replayer(int r);
+ void shut_down_remote_status_updater();
+ void handle_shut_down_remote_status_updater(int r);
+
void shut_down_local_status_updater();
void handle_shut_down_local_status_updater(int r);
Context *m_on_finish = nullptr;
std::unique_ptr<MirrorStatusUpdater<ImageCtxT>> m_local_status_updater;
+ std::unique_ptr<MirrorStatusUpdater<ImageCtxT>> m_remote_status_updater;
PoolWatcherListener m_local_pool_watcher_listener;
std::unique_ptr<PoolWatcher<ImageCtxT>> m_local_pool_watcher;