#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
+#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "Replayer.h"
if (m_instance_watcher) {
m_instance_watcher->shut_down();
}
+ if (m_instance_replayer) {
+ m_instance_replayer->shut_down();
+ }
assert(!m_pool_watcher);
}
return r;
}
+ std::string local_mirror_uuid;
+ r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
+ &local_mirror_uuid);
+ if (r < 0) {
+ derr << "failed to retrieve local mirror uuid from pool "
+ << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
m_remote_io_ctx);
if (r < 0) {
<< ": " << cpp_strerror(r) << dendl;
return r;
}
- m_remote_pool_id = m_remote_io_ctx.get_id();
dout(20) << "connected to " << m_peer << dendl;
+ m_instance_replayer.reset(
+ InstanceReplayer<>::create(m_threads, m_image_deleter,
+ m_image_sync_throttler, m_local_rados,
+ local_mirror_uuid, m_local_pool_id));
+ m_instance_replayer->init();
+
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
r = m_leader_watcher->init();
break;
}
- for (auto image_it = m_image_replayers.begin();
- image_it != m_image_replayers.end(); ) {
- if (image_it->second->remote_images_empty()) {
- if (stop_image_replayer(image_it->second)) {
- image_it = m_image_replayers.erase(image_it);
- continue;
- }
- } else {
- start_image_replayer(image_it->second);
- }
- ++image_it;
- }
-
- m_cond.WaitInterval(m_lock,
- utime_t(g_ceph_context->_conf->
- rbd_mirror_image_state_check_interval, 0));
- }
-
- Mutex::Locker locker(m_lock);
- while (!m_image_replayers.empty()) {
- stop_image_replayers();
+ m_cond.WaitInterval(m_lock, utime_t(1, 0));
}
}
}
f->close_section();
}
- f->open_array_section("image_replayers");
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
- image_replayer->print_status(f, ss);
- }
+ m_instance_replayer->print_status(f, ss);
- f->close_section();
f->close_section();
f->flush(*ss);
}
return;
}
- m_manual_stop = false;
-
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
- image_replayer->start(nullptr, true);
- }
+ m_instance_replayer->start();
}
void Replayer::stop(bool manual)
return;
}
- m_manual_stop = true;
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
- image_replayer->stop(nullptr, true);
- }
+ m_instance_replayer->stop();
}
void Replayer::restart()
return;
}
- m_manual_stop = false;
-
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
- image_replayer->restart();
- }
+ m_instance_replayer->restart();
}
void Replayer::flush()
return;
}
- for (auto &kv : m_image_replayers) {
- auto &image_replayer = kv.second;
- image_replayer->flush();
- }
+ m_instance_replayer->flush();
}
void Replayer::release_leader()
return;
}
+ m_instance_replayer->set_peers({{mirror_uuid, m_remote_io_ctx}});
+
// first callback will be a full directory -- so see if we need to remove
// any local images that no longer exist on the remote side
if (!m_init_image_ids.empty()) {
m_init_image_ids.clear();
}
- // shut down replayers for non-mirrored images
- for (auto &image_id : removed_image_ids) {
- auto image_it = m_image_replayers.find(image_id.global_id);
- if (image_it != m_image_replayers.end()) {
- image_it->second->remove_remote_image(mirror_uuid, image_id.id);
-
- if (image_it->second->is_running()) {
- dout(20) << "stop image replayer for remote image "
- << image_id.id << " (" << image_id.global_id << ")"
- << dendl;
- }
-
- if (image_it->second->remote_images_empty() &&
- stop_image_replayer(image_it->second)) {
- // no additional remotes registered for this image
- m_image_replayers.erase(image_it);
- }
- }
- }
-
- // prune previously stopped image replayers
- for (auto image_it = m_image_replayers.begin();
- image_it != m_image_replayers.end(); ) {
- if (image_it->second->remote_images_empty() &&
- stop_image_replayer(image_it->second)) {
- image_it = m_image_replayers.erase(image_it);
- } else {
- ++image_it;
- }
- }
+ m_update_op_tracker.start_op();
+ Context *ctx = new FunctionContext([this](int r) {
+ dout(20) << "complete handle_update: r=" << r << dendl;
+ m_update_op_tracker.finish_op();
+ });
- if (added_image_ids.empty()) {
- return;
- }
+ C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
- std::string local_mirror_uuid;
- int r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
- &local_mirror_uuid);
- if (r < 0 || local_mirror_uuid.empty()) {
- derr << "failed to retrieve local mirror uuid from pool "
- << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
- return;
+ for (auto &image_id : removed_image_ids) {
+ m_instance_replayer->release_image(image_id.global_id,
+ {{mirror_uuid, image_id.id}}, true,
+ gather_ctx->new_sub());
}
- // start replayers for newly added remote image sources
for (auto &image_id : added_image_ids) {
- auto it = m_image_replayers.find(image_id.global_id);
- if (it == m_image_replayers.end()) {
- unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
- m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
- local_mirror_uuid, m_local_pool_id, image_id.global_id));
- if (m_manual_stop) {
- image_replayer->stop(nullptr, true);
- }
-
- it = m_image_replayers.insert(
- std::make_pair(image_id.global_id, std::move(image_replayer))).first;
- }
-
- it->second->add_remote_image(mirror_uuid, image_id.id,
- m_remote_io_ctx);
- if (!it->second->is_running()) {
- dout(20) << "starting image replayer for remote image "
- << image_id.global_id << dendl;
- }
- start_image_replayer(it->second);
- }
-}
-
-void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
-{
- assert(m_lock.is_locked());
- if (!image_replayer->is_stopped() || image_replayer->remote_images_empty()) {
- return;
- } else if (image_replayer->is_blacklisted()) {
- derr << "blacklisted detected during image replay" << dendl;
- m_blacklisted = true;
- m_stopping.set(1);
- return;
- }
-
- std::string global_image_id = image_replayer->get_global_image_id();
- dout(20) << "global_image_id=" << global_image_id << dendl;
-
- FunctionContext *ctx = new FunctionContext(
- [this, global_image_id] (int r) {
- dout(20) << "image deleter result: r=" << r << ", "
- << "global_image_id=" << global_image_id << dendl;
- if (r == -ESTALE || r == -ECANCELED) {
- return;
- }
-
- Mutex::Locker locker(m_lock);
- auto it = m_image_replayers.find(global_image_id);
- if (it == m_image_replayers.end()) {
- return;
- }
-
- auto &image_replayer = it->second;
- if (r >= 0) {
- image_replayer->start();
- } else {
- start_image_replayer(image_replayer);
- }
- }
- );
-
- m_image_deleter->wait_for_scheduled_deletion(
- m_local_pool_id, image_replayer->get_global_image_id(), ctx, false);
-}
-
-bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
-{
- assert(m_lock.is_locked());
- dout(20) << "global_image_id=" << image_replayer->get_global_image_id()
- << dendl;
-
- // TODO: check how long it is stopping and alert if it is too long.
- if (image_replayer->is_stopped()) {
- m_image_deleter->cancel_waiter(m_local_pool_id,
- image_replayer->get_global_image_id());
-
- if (!m_stopping.read() && m_leader_watcher->is_leader()) {
- dout(20) << "scheduling delete" << dendl;
- m_image_deleter->schedule_image_delete(
- m_local_rados,
- image_replayer->get_local_pool_id(),
- image_replayer->get_local_image_id(),
- image_replayer->get_global_image_id());
- }
- return true;
- } else {
- if (!m_stopping.read()) {
- dout(20) << "scheduling delete after image replayer stopped" << dendl;
- }
- FunctionContext *ctx = new FunctionContext(
- [&image_replayer, this] (int r) {
- if (!m_stopping.read() && m_leader_watcher->is_leader() && r >= 0) {
- m_image_deleter->schedule_image_delete(
- m_local_rados,
- image_replayer->get_local_pool_id(),
- image_replayer->get_local_image_id(),
- image_replayer->get_global_image_id());
- }
- }
- );
- image_replayer->stop(ctx);
- }
-
- return false;
-}
-
-void Replayer::stop_image_replayers() {
- dout(20) << dendl;
-
- assert(m_lock.is_locked());
- for (auto image_it = m_image_replayers.begin();
- image_it != m_image_replayers.end();) {
- if (stop_image_replayer(image_it->second)) {
- image_it = m_image_replayers.erase(image_it);
- continue;
- }
- ++image_it;
- }
-}
-
-void Replayer::stop_image_replayers(Context *on_finish) {
- dout(20) << dendl;
-
- {
- Mutex::Locker locker(m_lock);
- stop_image_replayers();
-
- if (!m_image_replayers.empty()) {
- Context *ctx = new FunctionContext([this, on_finish](int r) {
- assert(r == 0);
- stop_image_replayers(on_finish);
- });
- ctx = create_async_context_callback(m_threads->work_queue, ctx);
-
- Mutex::Locker timer_locker(m_threads->timer_lock);
- m_threads->timer->add_event_after(1, ctx);
- return;
- }
+ m_instance_replayer->acquire_image(image_id.global_id,
+ {{mirror_uuid, image_id.id}},
+ gather_ctx->new_sub());
}
- on_finish->complete(0);
+ gather_ctx->activate();
}
void Replayer::handle_post_acquire_leader(Context *on_finish) {
assert(m_pool_watcher);
m_pool_watcher.reset();
}
+ wait_for_update_ops(on_finish);
+}
- stop_image_replayers(on_finish);
+void Replayer::wait_for_update_ops(Context *on_finish) { // Wait for ops registered on m_update_op_tracker, then continue in handle_wait_for_update_ops().
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock); // m_lock is held for the remainder of this function
+
+ Context *ctx = new FunctionContext([this, on_finish](int r) { // continuation: forwards the result and the caller's on_finish
+ handle_wait_for_update_ops(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx); // NOTE(review): presumably re-dispatches ctx onto the work queue so it does not run in the completing thread -- confirm
+
+ m_update_op_tracker.wait_for_ops(ctx); // ctx is handed to the tracker; ops are opened/closed via start_op()/finish_op() elsewhere in this file
+}
+
+void Replayer::handle_wait_for_update_ops(int r, Context *on_finish) { // Continuation of wait_for_update_ops(): hands on_finish to the instance replayer's release_all().
+ dout(20) << "r=" << r << dendl;
+
+ assert(r == 0); // any non-zero result from the wait is treated as a programming error
+
+ Mutex::Locker locker(m_lock); // release_all() is invoked with m_lock held
+ m_instance_replayer->release_all(on_finish); // NOTE(review): on_finish is presumably completed by release_all() once all images are released -- confirm
}
} // namespace mirror