mock_instance_watcher,
global_image_id,
local_mirror_uuid,
+ {"remote mirror uuid",
+ "remote mirror peer uuid"},
nullptr, nullptr,
&m_mock_state_builder,
&m_do_resync, on_finish);
m_global_image_id, m_threads.get(),
m_instance_watcher, m_local_status_updater,
nullptr);
- m_replayer->add_peer("peer uuid", m_remote_ioctx, nullptr);
+ m_replayer->add_peer({"peer uuid", m_remote_ioctx, {}, nullptr});
}
void start()
static BootstrapRequest* create(
Threads<librbd::MockTestImageCtx>* threads,
librados::IoCtx &local_io_ctx,
- librados::IoCtx &remote_io_ctx,
+ librados::IoCtx& remote_io_ctx,
rbd::mirror::InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
const std::string &global_image_id,
const std::string &local_mirror_uuid,
+ const RemotePoolMeta& remote_pool_meta,
::journal::CacheManagerHandler *cache_manager_handler,
rbd::mirror::ProgressContext *progress_ctx,
StateBuilder<librbd::MockTestImageCtx>** state_builder,
m_image_replayer = new MockImageReplayer(
m_local_io_ctx, "local_mirror_uuid", "global image id",
&mock_threads, &m_instance_watcher, &m_local_status_updater, nullptr);
- m_image_replayer->add_peer("peer_uuid", m_remote_io_ctx,
- &m_remote_status_updater);
+ m_image_replayer->add_peer({"peer_uuid", m_remote_io_ctx,
+ {"remote mirror uuid",
+ "remote mirror peer uuid"},
+ &m_remote_status_updater});
}
void wait_for_stopped() {
MOCK_METHOD0(restart, void());
MOCK_METHOD0(flush, void());
MOCK_METHOD1(print_status, void(Formatter *));
- MOCK_METHOD3(add_peer, void(const std::string &, librados::IoCtx &,
- MirrorStatusUpdater<librbd::MockTestImageCtx>*));
+ MOCK_METHOD1(add_peer, void(const Peer<librbd::MockTestImageCtx>& peer));
MOCK_METHOD0(get_global_image_id, const std::string &());
MOCK_METHOD0(get_local_image_id, const std::string &());
MOCK_METHOD0(is_running, bool());
Context *timer_ctx = nullptr;
expect_add_event_after(mock_threads, &timer_ctx);
instance_replayer.init();
- instance_replayer.add_peer("peer_uuid", m_remote_io_ctx, nullptr);
+ instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
// Acquire
C_SaferCond on_acquire;
- EXPECT_CALL(mock_image_replayer, add_peer("peer_uuid", _, _));
+ EXPECT_CALL(mock_image_replayer, add_peer(_));
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
Context *timer_ctx1 = nullptr;
expect_add_event_after(mock_threads, &timer_ctx1);
instance_replayer.init();
- instance_replayer.add_peer("peer_uuid", m_remote_io_ctx, nullptr);
+ instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
// Acquire
C_SaferCond on_acquire;
- EXPECT_CALL(mock_image_replayer, add_peer("peer_uuid", _, _));
+ EXPECT_CALL(mock_image_replayer, add_peer(_));
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
Context *timer_ctx = nullptr;
expect_add_event_after(mock_threads, &timer_ctx);
instance_replayer.init();
- instance_replayer.add_peer("peer_uuid", m_remote_io_ctx, nullptr);
+ instance_replayer.add_peer({"peer_uuid", m_remote_io_ctx, {}, nullptr});
// Acquire
- EXPECT_CALL(mock_image_replayer, add_peer("peer_uuid", _, _));
+ EXPECT_CALL(mock_image_replayer, add_peer(_));
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, is_blacklisted()).WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, is_finished()).WillOnce(Return(false));
MOCK_METHOD2(print_status, void(Formatter*, std::stringstream*));
- MOCK_METHOD3(add_peer, void(const std::string&, librados::IoCtx&,
- MirrorStatusUpdater<librbd::MockTestImageCtx>*));
+ MOCK_METHOD1(add_peer, void(const Peer<librbd::MockTestImageCtx>&));
MOCK_METHOD1(init, void(Context*));
MOCK_METHOD1(shut_down, void(Context*));
}
void expect_instance_replayer_add_peer(
- MockInstanceReplayer& mock_instance_replayer, const std::string& uuid) {
- EXPECT_CALL(mock_instance_replayer, add_peer(uuid, _, _));
+ MockInstanceReplayer& mock_instance_replayer) {
+ EXPECT_CALL(mock_instance_replayer, add_peer(_));
}
void expect_instance_replayer_release_all(
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
- nullptr);
+ "siteA", "local peer uuid", {"remote mirror uuid", ""}, m_mock_threads,
+ nullptr, nullptr, nullptr, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
- nullptr);
+ "siteA", "local peer uuid", {"remote mirror uuid", ""}, m_mock_threads,
+ nullptr, nullptr, nullptr, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
- nullptr);
+ "siteA", "local peer uuid", {"remote mirror uuid", ""}, m_mock_threads,
+ nullptr, nullptr, nullptr, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
auto mock_instance_replayer = new MockInstanceReplayer();
expect_instance_replayer_init(*mock_instance_replayer, 0);
- expect_instance_replayer_add_peer(*mock_instance_replayer,
- "remote mirror uuid");
+ expect_instance_replayer_add_peer(*mock_instance_replayer);
auto mock_instance_watcher = new MockInstanceWatcher();
expect_instance_watcher_init(*mock_instance_watcher, -EINVAL);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
- nullptr);
+ "siteA", "local peer uuid", {"remote mirror uuid", ""}, m_mock_threads,
+ nullptr, nullptr, nullptr, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
auto mock_instance_replayer = new MockInstanceReplayer();
expect_instance_replayer_init(*mock_instance_replayer, 0);
- expect_instance_replayer_add_peer(*mock_instance_replayer,
- "remote mirror uuid");
+ expect_instance_replayer_add_peer(*mock_instance_replayer);
auto mock_instance_watcher = new MockInstanceWatcher();
expect_instance_watcher_init(*mock_instance_watcher, 0);
MockServiceDaemon mock_service_daemon;
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr,
- &mock_service_daemon, nullptr);
+ "siteA", "local peer uuid", {"remote mirror uuid", ""}, m_mock_threads,
+ nullptr, nullptr, &mock_service_daemon, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
auto mock_instance_replayer = new MockInstanceReplayer();
expect_instance_replayer_init(*mock_instance_replayer, 0);
- expect_instance_replayer_add_peer(*mock_instance_replayer,
- "remote mirror uuid");
+ expect_instance_replayer_add_peer(*mock_instance_replayer);
auto mock_instance_watcher = new MockInstanceWatcher();
expect_instance_watcher_init(*mock_instance_watcher, 0);
MockServiceDaemon mock_service_daemon;
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr,
- &mock_service_daemon, nullptr);
+ "siteA", "local peer uuid", {"remote mirror uuid", ""}, m_mock_threads,
+ nullptr, nullptr, &mock_service_daemon, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
librados::IoCtx &local_ioctx,
librados::IoCtx &remote_ioctx,
const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
const std::string &site_name,
+ const std::string& local_mirror_peer_uuid,
+ const RemotePoolMeta& remote_pool_meta,
Threads<librbd::MockTestImageCtx> *threads,
Throttler<librbd::MockTestImageCtx> *image_sync_throttler,
Throttler<librbd::MockTestImageCtx> *image_deletion_throttler,
}
template <typename I>
-void ImageReplayer<I>::add_peer(
- const std::string &peer_uuid, librados::IoCtx &io_ctx,
- MirrorStatusUpdater<I>* remote_status_updater) {
- dout(10) << "peer_uuid=" << &peer_uuid << ", "
- << "remote_status_updater=" << remote_status_updater << dendl;
+void ImageReplayer<I>::add_peer(const Peer<I>& peer) {
+ dout(10) << "peer=" << peer << dendl;
std::lock_guard locker{m_lock};
- auto it = m_peers.find({peer_uuid});
+ auto it = m_peers.find(peer);
if (it == m_peers.end()) {
- m_peers.insert({peer_uuid, io_ctx, remote_status_updater});
+ m_peers.insert(peer);
}
}
// TODO need to support multiple remote images
ceph_assert(!m_peers.empty());
- m_remote_image = {*m_peers.begin()};
+ m_remote_image_peer = *m_peers.begin();
if (on_start_interrupted(m_lock)) {
return;
auto ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
auto request = image_replayer::BootstrapRequest<I>::create(
- m_threads, m_local_io_ctx, m_remote_image.io_ctx, m_instance_watcher,
- m_global_image_id, m_local_mirror_uuid, m_cache_manager_handler,
+ m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher,
+ m_global_image_id, m_local_mirror_uuid,
+ m_remote_image_peer.remote_pool_meta, m_cache_manager_handler,
&m_progress_cxt, &m_state_builder, &m_resync_requested, ctx);
request->get();
dout(15) << "status=" << status << dendl;
m_local_status_updater->set_mirror_image_status(m_global_image_id, status,
force);
- if (m_remote_image.mirror_status_updater != nullptr) {
- m_remote_image.mirror_status_updater->set_mirror_image_status(
+ if (m_remote_image_peer.mirror_status_updater != nullptr) {
+ m_remote_image_peer.mirror_status_updater->set_mirror_image_status(
m_global_image_id, status, force);
}
return;
}
- if (m_remote_image.mirror_status_updater != nullptr &&
- m_remote_image.mirror_status_updater->exists(m_global_image_id)) {
+ if (m_remote_image_peer.mirror_status_updater != nullptr &&
+ m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) {
dout(15) << "removing remote mirror image status" << dendl;
auto ctx = new LambdaContext([this, r](int) {
handle_shut_down(r);
});
- m_remote_image.mirror_status_updater->remove_mirror_image_status(
+ m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
m_global_image_id, ctx);
return;
}
image_replayer::HealthState get_health_state() const;
- void add_peer(const std::string &peer_uuid, librados::IoCtx &remote_io_ctx,
- MirrorStatusUpdater<ImageCtxT>* remote_status_updater);
+ void add_peer(const Peer<ImageCtxT>& peer);
inline int64_t get_local_pool_id() const {
return m_local_io_ctx.get_id();
STATE_STOPPED,
};
- struct RemoteImage {
- librados::IoCtx io_ctx;
- MirrorStatusUpdater<ImageCtxT>* mirror_status_updater = nullptr;
-
- RemoteImage() {
- }
- RemoteImage(const Peer<ImageCtxT>& peer)
- : io_ctx(peer.io_ctx), mirror_status_updater(peer.mirror_status_updater) {
- }
- };
struct ReplayerListener;
typedef boost::optional<State> OptionalState;
journal::CacheManagerHandler *m_cache_manager_handler;
Peers m_peers;
- RemoteImage m_remote_image;
+ Peer<ImageCtxT> m_remote_image_peer;
std::string m_local_image_name;
std::string m_image_spec;
}
template <typename I>
-void InstanceReplayer<I>::add_peer(
- std::string peer_uuid, librados::IoCtx io_ctx,
- MirrorStatusUpdater<I>* remote_status_updater) {
- dout(10) << peer_uuid << dendl;
+void InstanceReplayer<I>::add_peer(const Peer<I>& peer) {
+ dout(10) << "peer=" << peer << dendl;
std::lock_guard locker{m_lock};
- auto result = m_peers.insert(
- Peer(peer_uuid, io_ctx, remote_status_updater)).second;
+ auto result = m_peers.insert(peer).second;
ceph_assert(result);
}
// TODO only a single peer is currently supported
ceph_assert(m_peers.size() == 1);
auto peer = *m_peers.begin();
- image_replayer->add_peer(peer.peer_uuid, peer.io_ctx,
- peer.mirror_status_updater);
+ image_replayer->add_peer(peer);
start_image_replayer(image_replayer);
} else {
// A duplicate acquire notification implies (1) connection hiccup or
void init(Context *on_finish);
void shut_down(Context *on_finish);
- void add_peer(std::string peer_uuid, librados::IoCtx io_ctx,
- MirrorStatusUpdater<ImageCtxT>* remote_status_updater);
+ void add_peer(const Peer<ImageCtxT>& peer);
void acquire_image(InstanceWatcher<ImageCtxT> *instance_watcher,
const std::string &global_image_id, Context *on_finish);
NamespaceReplayer<I>::NamespaceReplayer(
const std::string &name,
librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
- const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid,
- const std::string &local_site_name, Threads<I> *threads,
+ const std::string &local_mirror_uuid, const std::string &local_site_name,
+ const std::string& local_mirror_peer_uuid,
+ const RemotePoolMeta& remote_pool_meta, Threads<I> *threads,
Throttler<I> *image_sync_throttler, Throttler<I> *image_deletion_throttler,
ServiceDaemon<I> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) :
m_namespace_name(name),
m_local_mirror_uuid(local_mirror_uuid),
- m_remote_mirror_uuid(remote_mirror_uuid),
m_local_site_name(local_site_name),
+ m_local_mirror_peer_uuid(local_mirror_peer_uuid),
+ m_remote_pool_meta(remote_pool_meta),
m_threads(threads), m_image_sync_throttler(image_sync_throttler),
m_image_deletion_throttler(image_deletion_throttler),
m_service_daemon(service_daemon),
return;
}
- m_instance_replayer->add_peer(m_remote_mirror_uuid, m_remote_io_ctx,
- m_remote_status_updater.get());
+ m_instance_replayer->add_peer({m_local_mirror_peer_uuid, m_remote_io_ctx,
+ m_remote_pool_meta,
+ m_remote_status_updater.get()});
init_instance_watcher();
}
librados::IoCtx &local_ioctx,
librados::IoCtx &remote_ioctx,
const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
const std::string &local_site_name,
+ const std::string &local_mirror_peer_uuid,
+ const RemotePoolMeta& remote_pool_meta,
Threads<ImageCtxT> *threads,
Throttler<ImageCtxT> *image_sync_throttler,
Throttler<ImageCtxT> *image_deletion_throttler,
ServiceDaemon<ImageCtxT> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) {
return new NamespaceReplayer(name, local_ioctx, remote_ioctx,
- local_mirror_uuid, remote_mirror_uuid,
- local_site_name, threads, image_sync_throttler,
+ local_mirror_uuid, local_site_name,
+ local_mirror_peer_uuid, remote_pool_meta,
+ threads, image_sync_throttler,
image_deletion_throttler, service_daemon,
cache_manager_handler);
}
librados::IoCtx &local_ioctx,
librados::IoCtx &remote_ioctx,
const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
const std::string &local_site_name,
+ const std::string& local_mirror_peer_uuid,
+ const RemotePoolMeta& remote_pool_meta,
Threads<ImageCtxT> *threads,
Throttler<ImageCtxT> *image_sync_throttler,
Throttler<ImageCtxT> *image_deletion_throttler,
librados::IoCtx m_local_io_ctx;
librados::IoCtx m_remote_io_ctx;
std::string m_local_mirror_uuid;
- std::string m_remote_mirror_uuid;
std::string m_local_site_name;
+ std::string m_local_mirror_peer_uuid;
+ RemotePoolMeta m_remote_pool_meta;
Threads<ImageCtxT> *m_threads;
Throttler<ImageCtxT> *m_image_sync_throttler;
Throttler<ImageCtxT> *m_image_deletion_throttler;
ceph_assert(!m_remote_pool_meta.mirror_uuid.empty());
m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
- "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
- m_site_name, m_threads, m_image_sync_throttler.get(),
+ "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_site_name,
+ m_peer.uuid, m_remote_pool_meta, m_threads, m_image_sync_throttler.get(),
m_image_deletion_throttler.get(), m_service_daemon,
m_cache_manager_handler));
for (auto &name : mirroring_namespaces) {
auto namespace_replayer = NamespaceReplayer<I>::create(
- name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
- m_site_name, m_threads, m_image_sync_throttler.get(),
- m_image_deletion_throttler.get(), m_service_daemon,
- m_cache_manager_handler);
+ name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_site_name,
+ m_peer.uuid, m_remote_pool_meta, m_threads,
+ m_image_sync_throttler.get(), m_image_deletion_throttler.get(),
+ m_service_daemon, m_cache_manager_handler);
auto on_init = new LambdaContext(
[this, namespace_replayer, name, &mirroring_namespaces,
ctx=gather_ctx->new_sub()](int r) {
struct RemotePoolMeta {
RemotePoolMeta() {}
- RemotePoolMeta(const std::string& remote_mirror_uuid,
- const std::string& remote_mirror_peer_uuid)
- : mirror_uuid(remote_mirror_uuid),
- mirror_peer_uuid(remote_mirror_peer_uuid) {
+ RemotePoolMeta(const std::string& mirror_uuid,
+ const std::string& mirror_peer_uuid)
+ : mirror_uuid(mirror_uuid),
+ mirror_peer_uuid(mirror_peer_uuid) {
}
std::string mirror_uuid;
template <typename I>
struct Peer {
- std::string peer_uuid;
- librados::IoCtx io_ctx;
+ std::string uuid;
+ mutable librados::IoCtx io_ctx;
+ RemotePoolMeta remote_pool_meta;
MirrorStatusUpdater<I>* mirror_status_updater = nullptr;
Peer() {
}
- Peer(const std::string &peer_uuid) : peer_uuid(peer_uuid) {
- }
- Peer(const std::string &peer_uuid, librados::IoCtx& io_ctx,
+ Peer(const std::string& uuid,
+ librados::IoCtx& io_ctx,
+ const RemotePoolMeta& remote_pool_meta,
MirrorStatusUpdater<I>* mirror_status_updater)
- : peer_uuid(peer_uuid), io_ctx(io_ctx),
+ : io_ctx(io_ctx),
+ remote_pool_meta(remote_pool_meta),
mirror_status_updater(mirror_status_updater) {
}
inline bool operator<(const Peer &rhs) const {
- return peer_uuid < rhs.peer_uuid;
+ return uuid < rhs.uuid;
}
};
+template <typename I>
+std::ostream& operator<<(std::ostream& lhs, const Peer<I>& peer) {
+ return lhs << peer.remote_pool_meta;
+}
+
struct PeerSpec {
PeerSpec() = default;
PeerSpec(const std::string &uuid, const std::string &cluster_name,
InstanceWatcher<I>* instance_watcher,
const std::string& global_image_id,
const std::string& local_mirror_uuid,
+ const RemotePoolMeta& remote_pool_meta,
::journal::CacheManagerHandler* cache_manager_handler,
ProgressContext* progress_ctx,
StateBuilder<I>** state_builder,
m_instance_watcher(instance_watcher),
m_global_image_id(global_image_id),
m_local_mirror_uuid(local_mirror_uuid),
+ m_remote_pool_meta(remote_pool_meta),
m_cache_manager_handler(cache_manager_handler),
m_progress_ctx(progress_ctx),
m_state_builder(state_builder),
InstanceWatcher<ImageCtxT>* instance_watcher,
const std::string& global_image_id,
const std::string& local_mirror_uuid,
+ const RemotePoolMeta& remote_pool_meta,
::journal::CacheManagerHandler* cache_manager_handler,
ProgressContext* progress_ctx,
StateBuilder<ImageCtxT>** state_builder,
Context* on_finish) {
return new BootstrapRequest(
threads, local_io_ctx, remote_io_ctx, instance_watcher, global_image_id,
- local_mirror_uuid, cache_manager_handler, progress_ctx, state_builder,
- do_resync, on_finish);
+ local_mirror_uuid, remote_pool_meta, cache_manager_handler, progress_ctx,
+ state_builder, do_resync, on_finish);
}
BootstrapRequest(
InstanceWatcher<ImageCtxT>* instance_watcher,
const std::string& global_image_id,
const std::string& local_mirror_uuid,
+ const RemotePoolMeta& remote_pool_meta,
::journal::CacheManagerHandler* cache_manager_handler,
ProgressContext* progress_ctx,
StateBuilder<ImageCtxT>** state_builder,
InstanceWatcher<ImageCtxT> *m_instance_watcher;
std::string m_global_image_id;
std::string m_local_mirror_uuid;
+ RemotePoolMeta m_remote_pool_meta;
::journal::CacheManagerHandler *m_cache_manager_handler;
ProgressContext *m_progress_ctx;
StateBuilder<ImageCtxT>** m_state_builder;