#include "tools/rbd_mirror/LeaderWatcher.h"
#include "tools/rbd_mirror/NamespaceReplayer.h"
#include "tools/rbd_mirror/PoolReplayer.h"
+#include "tools/rbd_mirror/RemotePoolPoller.h"
#include "tools/rbd_mirror/ServiceDaemon.h"
#include "tools/rbd_mirror/Threads.h"
#include "common/Formatter.h"
LeaderWatcher<librbd::MockTestImageCtx>* LeaderWatcher<librbd::MockTestImageCtx>::s_instance = nullptr;
+template<>
+struct RemotePoolPoller<librbd::MockTestImageCtx> {
+ static RemotePoolPoller* s_instance;
+
+ remote_pool_poller::Listener* listener = nullptr;
+
+ static RemotePoolPoller* create(
+ Threads<librbd::MockTestImageCtx>* threads,
+ librados::IoCtx& remote_io_ctx,
+ const std::string& local_site_name,
+ const std::string& local_fsid,
+ remote_pool_poller::Listener& listener) {
+ ceph_assert(s_instance != nullptr);
+ s_instance->listener = &listener;
+ return s_instance;
+ }
+
+ MOCK_METHOD1(init, void(Context*));
+ MOCK_METHOD1(shut_down, void(Context*));
+
+ RemotePoolPoller() {
+ s_instance = this;
+ }
+};
+
+RemotePoolPoller<librbd::MockTestImageCtx>* RemotePoolPoller<librbd::MockTestImageCtx>::s_instance = nullptr;
+
template<>
struct ServiceDaemon<librbd::MockTestImageCtx> {
MOCK_METHOD2(add_namespace, void(int64_t, const std::string &));
typedef PoolReplayer<librbd::MockTestImageCtx> MockPoolReplayer;
typedef Throttler<librbd::MockTestImageCtx> MockThrottler;
typedef NamespaceReplayer<librbd::MockTestImageCtx> MockNamespaceReplayer;
+ typedef RemotePoolPoller<librbd::MockTestImageCtx> MockRemotePoolPoller;
typedef LeaderWatcher<librbd::MockTestImageCtx> MockLeaderWatcher;
typedef ServiceDaemon<librbd::MockTestImageCtx> MockServiceDaemon;
typedef Threads<librbd::MockTestImageCtx> MockThreads;
.Times(AtLeast(0));
}
+ void expect_remote_pool_poller_init(
+ MockRemotePoolPoller& mock_remote_pool_poller,
+ const RemotePoolMeta& remote_pool_meta, int r) {
+ EXPECT_CALL(mock_remote_pool_poller, init(_))
+ .WillOnce(Invoke(
+ [this, &mock_remote_pool_poller, remote_pool_meta, r]
+ (Context* ctx) {
+ if (r >= 0) {
+ mock_remote_pool_poller.listener->handle_updated(
+ remote_pool_meta);
+ }
+
+ m_threads->work_queue->queue(ctx, r);
+ }));
+ }
+
+ void expect_remote_pool_poller_shut_down(
+ MockRemotePoolPoller& mock_remote_pool_poller, int r) {
+ EXPECT_CALL(mock_remote_pool_poller, shut_down(_))
+ .WillOnce(Invoke(
+ [this, r](Context* ctx) {
+ m_threads->work_queue->queue(ctx, r);
+ }));
+ }
+
void expect_namespace_replayer_is_blacklisted(
MockNamespaceReplayer &mock_namespace_replayer,
bool blacklisted) {
expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+ auto mock_remote_pool_poller = new MockRemotePoolPoller();
+ expect_remote_pool_poller_init(*mock_remote_pool_poller,
+ {"remote mirror uuid", ""}, 0);
expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
expect_leader_watcher_init(*mock_leader_watcher, 0);
expect_leader_watcher_shut_down(*mock_leader_watcher);
expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+ expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
pool_replayer.shut_down();
}
expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+ auto mock_remote_pool_poller = new MockRemotePoolPoller();
+ expect_remote_pool_poller_init(*mock_remote_pool_poller,
+ {"remote mirror uuid", ""}, 0);
expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
expect_leader_watcher_init(*mock_leader_watcher, 0);
expect_leader_watcher_shut_down(*mock_leader_watcher);
expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+ expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
pool_replayer.shut_down();
}
nullptr);
expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+ auto mock_remote_pool_poller = new MockRemotePoolPoller();
+ expect_remote_pool_poller_init(*mock_remote_pool_poller,
+ {"remote mirror uuid", ""}, 0);
expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
expect_leader_watcher_init(*mock_leader_watcher, 0);
expect_namespace_replayer_shut_down(*mock_ns1_namespace_replayer);
expect_leader_watcher_shut_down(*mock_leader_watcher);
expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+ expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
pool_replayer.shut_down();
}
nullptr);
expect_create_ioctx(mock_local_rados_client, mock_local_io_ctx);
expect_mirror_uuid_get(mock_local_io_ctx, "uuid", 0);
+ auto mock_remote_pool_poller = new MockRemotePoolPoller();
+ expect_remote_pool_poller_init(*mock_remote_pool_poller,
+ {"remote mirror uuid", ""}, 0);
expect_namespace_replayer_init(*mock_default_namespace_replayer, 0);
expect_leader_watcher_init(*mock_leader_watcher, 0);
expect_leader_watcher_shut_down(*mock_leader_watcher);
expect_namespace_replayer_shut_down(*mock_default_namespace_replayer);
+ expect_remote_pool_poller_shut_down(*mock_remote_pool_poller, 0);
pool_replayer.shut_down();
}
#include "global/global_context.h"
#include "librbd/api/Config.h"
#include "librbd/api/Namespace.h"
+#include "RemotePoolPoller.h"
#include "ServiceDaemon.h"
#include "Threads.h"
} // anonymous namespace
+template <typename I>
+struct PoolReplayer<I>::RemotePoolPollerListener
+ : public remote_pool_poller::Listener {
+
+ PoolReplayer<I>* m_pool_replayer;
+
+ RemotePoolPollerListener(PoolReplayer<I>* pool_replayer)
+ : m_pool_replayer(pool_replayer) {
+ }
+
+ void handle_updated(const RemotePoolMeta& remote_pool_meta) override {
+ m_pool_replayer->handle_remote_pool_meta_updated(remote_pool_meta);
+ }
+};
+
template <typename I>
PoolReplayer<I>::PoolReplayer(
Threads<I> *threads, ServiceDaemon<I> *service_daemon,
m_image_deletion_throttler.reset(
Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions"));
+ std::string local_fsid;
+ r = m_local_rados->cluster_fsid(&local_fsid);
+ if (r < 0) {
+ derr << "failed to retrieve local fsid: " << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ m_remote_pool_poller_listener.reset(new RemotePoolPollerListener(this));
+ m_remote_pool_poller.reset(RemotePoolPoller<I>::create(
+ m_threads, m_remote_io_ctx, m_site_name, local_fsid,
+ *m_remote_pool_poller_listener));
+
+ C_SaferCond on_pool_poller_init;
+ m_remote_pool_poller->init(&on_pool_poller_init);
+ r = on_pool_poller_init.wait();
+ if (r < 0) {
+ derr << "failed to initialize remote pool poller: " << cpp_strerror(r)
+ << dendl;
+ m_callout_id = m_service_daemon->add_or_update_callout(
+ m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
+ "unable to initialize remote pool poller");
+ m_remote_pool_poller.reset();
+ return;
+ }
+ ceph_assert(!m_remote_pool_meta.mirror_uuid.empty());
+
m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
"", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
m_site_name, m_threads, m_image_sync_throttler.get(),
}
m_default_namespace_replayer.reset();
+ if (m_remote_pool_poller) {
+ C_SaferCond ctx;
+ m_remote_pool_poller->shut_down(&ctx);
+ ctx.wait();
+ }
+ m_remote_pool_poller.reset();
+ m_remote_pool_poller_listener.reset();
+
m_image_sync_throttler.reset();
m_image_deletion_throttler.reset();
}
}
+template <typename I>
+void PoolReplayer<I>::handle_remote_pool_meta_updated(
+ const RemotePoolMeta& remote_pool_meta) {
+ dout(5) << "remote_pool_meta=" << remote_pool_meta << dendl;
+
+ if (!m_default_namespace_replayer) {
+ m_remote_pool_meta = remote_pool_meta;
+ return;
+ }
+
+ derr << "remote pool metadata updated unexpectedly" << dendl;
+ std::unique_lock locker{m_lock};
+ m_stopping = true;
+ m_cond.notify_all();
+}
+
} // namespace mirror
} // namespace rbd