for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
auto &peer = it->first.second;
auto pool_peer_it = pool_peers.find(it->first.first);
- if (it->second->is_blacklisted()) {
- derr << "removing blacklisted pool replayer for " << peer << dendl;
- // TODO: make async
- it = m_pool_replayers.erase(it);
- } else if (pool_peer_it == pool_peers.end() ||
- pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
+ if (pool_peer_it == pool_peers.end() ||
+ pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
dout(20) << "removing pool replayer for " << peer << dendl;
// TODO: make async
+ it->second->shut_down();
it = m_pool_replayers.erase(it);
} else {
++it;
for (auto &kv : pool_peers) {
for (auto &peer : kv.second) {
PoolPeer pool_peer(kv.first, peer);
- if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
+
+ auto pool_replayers_it = m_pool_replayers.find(pool_peer);
+ if (pool_replayers_it != m_pool_replayers.end()) {
+ auto& pool_replayer = pool_replayers_it->second;
+ if (pool_replayer->is_blacklisted()) {
+ derr << "restarting blacklisted pool replayer for " << peer << dendl;
+ // TODO: make async
+ pool_replayer->shut_down();
+ pool_replayer->init();
+ } else if (!pool_replayer->is_running()) {
+ derr << "restarting failed pool replayer for " << peer << dendl;
+ // TODO: make async
+ pool_replayer->shut_down();
+ pool_replayer->init();
+ }
+ } else {
dout(20) << "starting pool replayer for " << peer << dendl;
unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
m_threads, &m_service_daemon, m_image_deleter.get(), kv.first,
peer, m_args));
- // TODO: make async, and retry connecting within pool replayer
- int r = pool_replayer->init();
- if (r < 0) {
- continue;
- }
+ // TODO: make async
+ pool_replayer->init();
m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
}
}
m_threads(threads),
m_service_daemon(service_daemon),
m_image_deleter(image_deleter),
- m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
+ m_local_pool_id(local_pool_id),
m_peer(peer),
m_args(args),
- m_local_pool_id(local_pool_id),
+ m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
m_local_pool_watcher_listener(this, true),
m_remote_pool_watcher_listener(this, false),
- m_asok_hook(nullptr),
m_pool_replayer_thread(this),
m_leader_listener(this)
{
PoolReplayer::~PoolReplayer()
{
delete m_asok_hook;
-
- m_stopping = true;
- {
- Mutex::Locker l(m_lock);
- m_cond.Signal();
- }
- if (m_pool_replayer_thread.is_started()) {
- m_pool_replayer_thread.join();
- }
- if (m_leader_watcher) {
- m_leader_watcher->shut_down();
- }
- if (m_instance_watcher) {
- m_instance_watcher->shut_down();
- }
- if (m_instance_replayer) {
- m_instance_replayer->shut_down();
- }
-
- assert(!m_local_pool_watcher);
- assert(!m_remote_pool_watcher);
+ shut_down();
}
bool PoolReplayer::is_blacklisted() const {
return m_leader_watcher && m_leader_watcher->is_leader();
}
-int PoolReplayer::init()
+bool PoolReplayer::is_running() const {
+ return m_pool_replayer_thread.is_started();
+}
+
+void PoolReplayer::init()
{
- dout(20) << "replaying for " << m_peer << dendl;
+ assert(!m_pool_replayer_thread.is_started());
+ // reset state
+ m_stopping = false;
+ m_blacklisted = false;
+
+ dout(20) << "replaying for " << m_peer << dendl;
int r = init_rados(g_ceph_context->_conf->cluster,
g_ceph_context->_conf->name.to_str(),
"local cluster", &m_local_rados);
if (r < 0) {
- return r;
+ return;
}
r = init_rados(m_peer.cluster_name, m_peer.client_name,
std::string("remote peer ") + stringify(m_peer),
&m_remote_rados);
if (r < 0) {
- return r;
+ return;
}
r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
if (r < 0) {
derr << "error accessing local pool " << m_local_pool_id << ": "
<< cpp_strerror(r) << dendl;
- return r;
+ return;
}
std::string local_mirror_uuid;
if (r < 0) {
derr << "failed to retrieve local mirror uuid from pool "
<< m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
- return r;
+ return;
}
r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
if (r < 0) {
derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
<< ": " << cpp_strerror(r) << dendl;
- return r;
+ return;
}
dout(20) << "connected to " << m_peer << dendl;
m_instance_replayer->init();
m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
- m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
- m_threads->work_queue,
- m_instance_replayer.get()));
+ m_instance_watcher.reset(InstanceWatcher<>::create(
+ m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
r = m_instance_watcher->init();
if (r < 0) {
derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
- return r;
+ return;
}
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
-
r = m_leader_watcher->init();
if (r < 0) {
derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
- return r;
+ return;
}
m_pool_replayer_thread.create("pool replayer");
+}
- return 0;
+void PoolReplayer::shut_down() {
+ m_stopping = true;
+ {
+ Mutex::Locker l(m_lock);
+ m_cond.Signal();
+ }
+ if (m_pool_replayer_thread.is_started()) {
+ m_pool_replayer_thread.join();
+ }
+ if (m_leader_watcher) {
+ m_leader_watcher->shut_down();
+ }
+ if (m_instance_watcher) {
+ m_instance_watcher->shut_down();
+ }
+ if (m_instance_replayer) {
+ m_instance_replayer->shut_down();
+ }
+
+ assert(!m_local_pool_watcher);
+ assert(!m_remote_pool_watcher);
+ m_local_rados.reset();
+ m_remote_rados.reset();
}
int PoolReplayer::init_rados(const std::string &cluster_name,
bool is_blacklisted() const;
bool is_leader() const;
+ bool is_running() const;
+
+ void init();
+ void shut_down();
- int init();
void run();
void print_status(Formatter *f, stringstream *ss);
Threads<librbd::ImageCtx> *m_threads;
ServiceDaemon<librbd::ImageCtx>* m_service_daemon;
ImageDeleter<>* m_image_deleter;
+ int64_t m_local_pool_id = -1;
+ peer_t m_peer;
+ std::vector<const char*> m_args;
+
mutable Mutex m_lock;
Cond m_cond;
std::atomic<bool> m_stopping = { false };
bool m_manual_stop = false;
bool m_blacklisted = false;
- peer_t m_peer;
- std::vector<const char*> m_args;
RadosRef m_local_rados;
RadosRef m_remote_rados;
librados::IoCtx m_local_io_ctx;
librados::IoCtx m_remote_io_ctx;
- int64_t m_local_pool_id = -1;
-
PoolWatcherListener m_local_pool_watcher_listener;
std::unique_ptr<PoolWatcher<> > m_local_pool_watcher;
std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
std::string m_asok_hook_name;
- AdminSocketHook *m_asok_hook;
+ AdminSocketHook *m_asok_hook = nullptr;
std::map<std::string, ImageIds> m_initial_mirror_image_ids;