MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr, nullptr);
+ "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
+ nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr, nullptr);
+ "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
+ nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr, nullptr);
+ "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr, nullptr,
+ nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr,
+ "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr,
&mock_service_daemon, nullptr);
C_SaferCond on_init;
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr,
+ "remote mirror uuid", "siteA", m_mock_threads, nullptr, nullptr,
&mock_service_daemon, nullptr);
C_SaferCond on_init;
librados::IoCtx &remote_ioctx,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
+ const std::string &site_name,
Threads<librbd::MockTestImageCtx> *threads,
Throttler<librbd::MockTestImageCtx> *image_sync_throttler,
Throttler<librbd::MockTestImageCtx> *image_deletion_throttler,
MockServiceDaemon mock_service_daemon;
MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
m_local_io_ctx.get_id(), peer_spec, {});
- pool_replayer.init();
+ pool_replayer.init("siteA");
ASSERT_TRUE(remote_cct != nullptr);
ASSERT_EQ("123", remote_cct->_conf.get_val<std::string>("mon_host"));
MockServiceDaemon mock_service_daemon;
MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
m_local_io_ctx.get_id(), peer_spec, {});
- pool_replayer.init();
+ pool_replayer.init("siteA");
expect_service_daemon_add_or_update_attribute(
mock_service_daemon, SERVICE_DAEMON_LEADER_KEY, true);
MockServiceDaemon mock_service_daemon;
MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
m_local_io_ctx.get_id(), peer_spec, {});
- pool_replayer.init();
+ pool_replayer.init("siteA");
C_SaferCond on_ns1_init;
expect_namespace_replayer_init(*mock_ns1_namespace_replayer, 0);
MockServiceDaemon mock_service_daemon;
MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
m_local_io_ctx.get_id(), peer_spec, {});
- pool_replayer.init();
+ pool_replayer.init("siteA");
// test namespace replayer init fails for non leader
std::unique_lock l{m_lock};
if (!m_manual_stop) {
if (refresh_pools) {
- update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
+ update_pool_replayers(m_local_cluster_watcher->get_pool_peers(),
+ m_local_cluster_watcher->get_site_name());
}
m_cache_manager_handler->run_cache_manager();
}
}
}
-void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
+void Mirror::update_pool_replayers(const PoolPeers &pool_peers,
+ const std::string& site_name)
{
dout(20) << "enter" << dendl;
ceph_assert(ceph_mutex_is_locked(m_lock));
auto pool_replayers_it = m_pool_replayers.find(pool_peer);
if (pool_replayers_it != m_pool_replayers.end()) {
auto& pool_replayer = pool_replayers_it->second;
- if (pool_replayer->is_blacklisted()) {
+ if (!m_site_name.empty() && !site_name.empty() &&
+ m_site_name != site_name) {
+ dout(0) << "restarting pool replayer for " << peer << " due to "
+ << "updated site name" << dendl;
+ // TODO: make async
+ pool_replayer->shut_down();
+ pool_replayer->init(site_name);
+ } else if (pool_replayer->is_blacklisted()) {
derr << "restarting blacklisted pool replayer for " << peer << dendl;
// TODO: make async
pool_replayer->shut_down();
- pool_replayer->init();
+ pool_replayer->init(site_name);
} else if (!pool_replayer->is_running()) {
derr << "restarting failed pool replayer for " << peer << dendl;
// TODO: make async
pool_replayer->shut_down();
- pool_replayer->init();
+ pool_replayer->init(site_name);
}
} else {
dout(20) << "starting pool replayer for " << peer << dendl;
m_args));
// TODO: make async
- pool_replayer->init();
+ pool_replayer->init(site_name);
m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
}
}
// TODO currently only support a single peer
}
+
+ m_site_name = site_name;
}
} // namespace mirror
typedef ClusterWatcher::PoolPeers PoolPeers;
typedef std::pair<int64_t, PeerSpec> PoolPeer;
- void update_pool_replayers(const PoolPeers &pool_peers);
+ void update_pool_replayers(const PoolPeers &pool_peers,
+ const std::string& site_name);
void create_cache_manager();
void run_cache_manager(utime_t *next_run_interval);
std::atomic<bool> m_stopping = { false };
bool m_manual_stop = false;
MirrorAdminSocketHook *m_asok_hook;
+ std::string m_site_name;
};
} // namespace mirror
const std::string &name,
librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid,
- Threads<I> *threads, Throttler<I> *image_sync_throttler,
- Throttler<I> *image_deletion_throttler, ServiceDaemon<I> *service_daemon,
+ const std::string &local_site_name, Threads<I> *threads,
+ Throttler<I> *image_sync_throttler, Throttler<I> *image_deletion_throttler,
+ ServiceDaemon<I> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) :
m_local_mirror_uuid(local_mirror_uuid),
m_remote_mirror_uuid(remote_mirror_uuid),
+ m_local_site_name(local_site_name),
m_threads(threads), m_image_sync_throttler(image_sync_throttler),
m_image_deletion_throttler(image_deletion_throttler),
m_service_daemon(service_daemon),
librados::IoCtx &remote_ioctx,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
+ const std::string &local_site_name,
Threads<ImageCtxT> *threads,
Throttler<ImageCtxT> *image_sync_throttler,
Throttler<ImageCtxT> *image_deletion_throttler,
ServiceDaemon<ImageCtxT> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) {
return new NamespaceReplayer(name, local_ioctx, remote_ioctx,
- local_mirror_uuid, remote_mirror_uuid, threads,
- image_sync_throttler, image_deletion_throttler,
- service_daemon, cache_manager_handler);
+ local_mirror_uuid, remote_mirror_uuid,
+ local_site_name, threads, image_sync_throttler,
+ image_deletion_throttler, service_daemon,
+ cache_manager_handler);
}
NamespaceReplayer(const std::string &name,
librados::IoCtx &remote_ioctx,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
+ const std::string &local_site_name,
Threads<ImageCtxT> *threads,
Throttler<ImageCtxT> *image_sync_throttler,
Throttler<ImageCtxT> *image_deletion_throttler,
librados::IoCtx m_remote_io_ctx;
std::string m_local_mirror_uuid;
std::string m_remote_mirror_uuid;
+ std::string m_local_site_name;
Threads<ImageCtxT> *m_threads;
Throttler<ImageCtxT> *m_image_sync_throttler;
Throttler<ImageCtxT> *m_image_deletion_throttler;
}
template <typename I>
-void PoolReplayer<I>::init() {
+void PoolReplayer<I>::init(const std::string& site_name) {
ceph_assert(!m_pool_replayer_thread.is_started());
// reset state
m_stopping = false;
m_blacklisted = false;
+ m_site_name = site_name;
dout(10) << "replaying for " << m_peer << dendl;
int r = init_rados(g_ceph_context->_conf->cluster,
m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
"", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
- m_threads, m_image_sync_throttler.get(), m_image_deletion_throttler.get(),
- m_service_daemon, m_cache_manager_handler));
+ m_site_name, m_threads, m_image_sync_throttler.get(),
+ m_image_deletion_throttler.get(), m_service_daemon,
+ m_cache_manager_handler));
C_SaferCond on_init;
m_default_namespace_replayer->init(&on_init);
for (auto &name : mirroring_namespaces) {
auto namespace_replayer = NamespaceReplayer<I>::create(
name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
- m_threads, m_image_sync_throttler.get(),
+ m_site_name, m_threads, m_image_sync_throttler.get(),
m_image_deletion_throttler.get(), m_service_daemon,
m_cache_manager_handler);
auto on_init = new LambdaContext(
bool is_leader() const;
bool is_running() const;
- void init();
+ void init(const std::string& site_name);
void shut_down();
void run();
mutable ceph::mutex m_lock;
ceph::condition_variable m_cond;
+ std::string m_site_name;
bool m_stopping = false;
bool m_manual_stop = false;
bool m_blacklisted = false;