template <typename I>
GroupReplayer<I>::~GroupReplayer() {
unregister_admin_socket_hook();
- //ceph_assert(m_on_start_finish == nullptr);
- ceph_assert(m_on_stop_finish == nullptr);
+ // Destruction is only valid once no start callback, bootstrap request,
+ // group replayer, image replayer or stop callback is still outstanding.
+ ceph_assert(m_on_start_finish == nullptr);
ceph_assert(m_bootstrap_request == nullptr);
-}
-
-template <typename I>
-bool GroupReplayer<I>::needs_restart() const {
- dout(10) << dendl;
-
- std::lock_guard locker{m_lock};
- if (!m_replayer) {
- return true;
- }
-
- if (!m_local_group_ctx.primary) {
- if (m_state != STATE_REPLAYING) {
- return true;
- }
- for (auto &[_, image_replayer] : m_image_replayers) {
- if (image_replayer->is_stopped()) {
- dout(10) << "image replayer is in stopped state, needs restart" << dendl;
- return true;
- }
- }
- } else {
- // this is how we determine if the remote state has changed,
- // if we never restart the group replayer we never get to see updated
- // snapshot information on remote and see if its demoted at all.
- return true;
- }
-
- return false;
+ ceph_assert(m_replayer == nullptr);
+ ceph_assert(m_image_replayers.empty());
+ ceph_assert(m_on_stop_contexts.empty());
}
template <typename I>
m_last_r = 0;
m_state_desc.clear();
m_local_group_snaps.clear();
+ ceph_assert(m_replayer == nullptr);
+ ceph_assert(m_image_replayers.empty());
m_image_replayers.clear();
m_image_replayer_index.clear();
- m_get_remote_group_snap_ret_vals.clear();
m_manual_stop = false;
m_finished = false;
- //ceph_assert(m_on_start_finish == nullptr);
+ ceph_assert(m_on_start_finish == nullptr);
std::swap(m_on_start_finish, on_finish);
}
}
<< ", restart=" << restart << dendl;
group_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
- std::map<std::string, std::map<ImageReplayer<I> *, Context *>> create_snap_requests;
bool shut_down_replay = false;
- bool running = true;
+ bool is_stopped = false;
{
std::lock_guard locker{m_lock};
- if (restart) {
- m_restart_requested = true;
- }
+ dout(10) << "state: " << m_state << ", m_stop_requested: "
+ << m_stop_requested << dendl;
- dout(10) << "state: " << m_state << ", m_stop_requested: " << m_stop_requested << dendl;
if (!is_running_()) {
dout(10) << "replayers not running" << dendl;
- running = false;
+ if (manual && !m_manual_stop) {
+ dout(10) << "marking manual" << dendl;
+ m_manual_stop = true;
+ }
if (!restart && m_restart_requested) {
dout(10) << "canceling restart" << dendl;
m_restart_requested = false;
}
+ if (is_stopped_()) {
+ dout(10) << "already stopped" << dendl;
+ is_stopped = true;
+ } else {
+ dout(10) << "joining in-flight stop" << dendl;
+ if (on_finish != nullptr) {
+ m_on_stop_contexts.push_back(on_finish);
+ }
+ }
} else {
dout(10) << "replayers still running" << dendl;
- if (!is_stopped_() || m_state == STATE_STOPPING) {
- if (m_state == STATE_STARTING) {
- dout(10) << "canceling start" << dendl;
- if (m_bootstrap_request != nullptr) {
- bootstrap_request = m_bootstrap_request;
- bootstrap_request->get();
- }
- shut_down_replay = true;
- } else {
- dout(10) << "interrupting replay" << dendl;
- shut_down_replay = true;
- for (auto it = m_create_snap_requests.begin();
- it != m_create_snap_requests.end(); ) {
- auto &remote_group_snap_id = it->first;
- auto &requests = it->second;
- if (m_get_remote_group_snap_ret_vals.count(remote_group_snap_id) == 0) {
- dout(20) << "getting remote group snap for "
- << remote_group_snap_id << " is still in-progress"
- << dendl;
- shut_down_replay = false;
- } else if (m_pending_snap_create.count(remote_group_snap_id) > 0) {
- dout(20) << "group snap create for " << remote_group_snap_id
- << " is still in-progress" << dendl;
- shut_down_replay = false;
- } else {
- create_snap_requests[remote_group_snap_id] = requests;
- it = m_create_snap_requests.erase(it);
- continue;
- }
- it++;
- }
+ if (m_state == STATE_STARTING) {
+ dout(10) << "canceling start" << dendl;
+ if (m_bootstrap_request != nullptr) {
+ bootstrap_request = m_bootstrap_request;
+ bootstrap_request->get();
}
- m_state = STATE_STOPPING;
+ } else {
+ dout(10) << "interrupting replay" << dendl;
+ shut_down_replay = true;
+ }
- ceph_assert(m_on_stop_finish == nullptr);
- std::swap(m_on_stop_finish, on_finish);
- m_stop_requested = true;
- m_manual_stop = manual;
+ m_stop_requested = true;
+ m_manual_stop = manual;
+ if (on_finish != nullptr) {
+ m_on_stop_contexts.push_back(on_finish);
}
}
}
+ if (is_stopped) {
+ if (on_finish) {
+ on_finish->complete(-EINVAL);
+ }
+ return;
+ }
+
if (bootstrap_request != nullptr) {
dout(10) << "canceling bootstrap" << dendl;
bootstrap_request->cancel();
bootstrap_request->put();
}
- for (auto &[_, requests] : create_snap_requests) {
- for (auto &[_, on_finish] : requests) {
- on_finish->complete(-ESTALE);
- }
- }
-
if (shut_down_replay) {
- stop_group_replayer();
- } else if (on_finish != nullptr) {
- // XXXMG: clean up
- {
- std::lock_guard locker{m_lock};
- m_stop_requested = false;
- }
- on_finish->complete(0);
- }
-
- if (!running && shut_down_replay) {
- dout(20) << "not running" << dendl;
- if (on_finish) {
- on_finish->complete(-EINVAL);
- }
+ on_stop_replay(0);
}
}
template <typename I>
void GroupReplayer<I>::print_status(Formatter *f) {
- dout(10) << dendl;
+ dout(10) << m_state << dendl;
std::lock_guard l{m_lock};
f->dump_string("name", m_group_spec);
auto state = m_state;
if (m_local_group_ctx.primary && state == STATE_REPLAYING) { // XXXMG
+ dout(10) << "setting state to stopped" << dendl;
state = STATE_STOPPED;
}
f->dump_string("state", state_to_string(state));
+// TODO: remove the image_replayers section
f->open_array_section("image_replayers");
for (auto &[_, image_replayer] : m_image_replayers) {
image_replayer->print_status(f);
f->close_section(); // group_replayer
}
+// Move a replaying group replayer into STATE_STOPPING and start the shut
+// down sequence. Calls made in any state other than STATE_REPLAYING are
+// ignored. 'desc' is only logged here; the stored state description is reset.
+template <typename I>
+void GroupReplayer<I>::on_stop_replay(int r, const std::string &desc)
+{
+ dout(10) << "r=" << r << ", desc=" << desc << dendl;
+
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ // not replaying (e.g. invoked again while already stopping): nothing to do
+ return;
+ }
+
+ m_state = STATE_STOPPING;
+ m_stop_requested = true;
+ m_state_desc.clear();
+ m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED;
+
+ // shut_down() takes m_lock itself, so release it before calling
+ locker.unlock();
+ shut_down(r);
+}
+
template <typename I>
void GroupReplayer<I>::bootstrap_group() {
dout(10) << dendl;
locker.unlock();
dout(5) << "no peer clusters" << dendl;
- finish_start(-ENOENT, "no peer clusters");
+ finish_start_fail(-ENOENT, "no peer clusters");
return;
}
return;
}
+ ceph_assert(m_replayer == nullptr);
ceph_assert(m_image_replayers.empty());
auto ctx = create_context_callback<
dout(10) << "r=" << r << dendl;
{
std::lock_guard locker{m_lock};
+// Should never happen
if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
dout(10) << "stop prevailed" <<dendl;
return;
m_bootstrap_request->put();
m_bootstrap_request = nullptr;
}
- m_local_group_ctx.listener = &m_listener;
- if (!m_local_group_ctx.name.empty()) {
- m_local_group_name = m_local_group_ctx.name;
- }
}
+ if (!m_local_group_ctx.name.empty()) {
+ m_local_group_name = m_local_group_ctx.name;
+ }
if (r == -EINVAL) {
sync_group_names();
} else {
if (finish_start_if_interrupted()) {
return;
} else if (r == -ENOENT) {
- finish_start(r, "group removed");
+ finish_start_fail(r, "group removed");
return;
} else if (r == -EREMOTEIO) {
- finish_start(r, "remote group is non-primary");
+ finish_start_fail(r, "remote group is non-primary");
return;
} else if (r == -EEXIST) {
- finish_start(r, "split-brain detected");
+ finish_start_fail(r, "split-brain detected");
+ return;
+ } else if (m_remote_group_id.empty()){
+ r = -EINVAL;
+ //FIXME: The primary should not care if the remote is ready.
+ // Bootstrap again when the replayer is restarted
+ finish_start_fail(r, "remote is not ready yet");
return;
+ } else if (r == -EINVAL) {
+ sync_group_names();
} else if (r < 0) {
- finish_start(r, "bootstrap failed");
+ finish_start_fail(r, "bootstrap failed");
return;
}
- if (!m_remote_group_id.empty()) {
- C_SaferCond ctx;
- create_group_replayer(&ctx);
- ctx.wait();
- } else {
- r = -EINVAL;
- finish_start(r, "remote is not ready yet"); // bootstrap again
+ if (m_local_group_ctx.primary) { // XXXMG
+ set_mirror_group_status_update(
+ cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED,
+ "local group is primary");
+ finish_start_fail(0, "local group is primary");
return;
}
+
+ m_local_group_ctx.listener = &m_listener;
+
+ create_group_replayer();
}
template <typename I>
-void GroupReplayer<I>::create_group_replayer(Context *on_finish) {
+void GroupReplayer<I>::create_group_replayer() {
dout(10) << dendl;
- auto ctx = new LambdaContext(
- [this, on_finish](int r) {
- handle_create_group_replayer(r, on_finish);
- });
+ ceph_assert(m_replayer == nullptr);
+
+ auto ctx = create_context_callback<
+ GroupReplayer, &GroupReplayer<I>::handle_create_group_replayer>(this);
m_replayer = group_replayer::Replayer<I>::create(
m_threads, m_local_io_ctx, m_remote_group_peer.io_ctx, m_global_group_id,
}
template <typename I>
-void GroupReplayer<I>::handle_create_group_replayer(int r, Context *on_finish) {
+void GroupReplayer<I>::handle_create_group_replayer(int r) {
dout(10) << "r=" << r << dendl;
- if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) {
- dout(10) << "stop prevailed" <<dendl;
- on_finish->complete(r);
+ if (finish_start_if_interrupted()) {
+ return;
+ } else if (r < 0) {
+
+ finish_start_fail(r, "failed to create group replayer");
return;
}
- on_finish->complete(0);
+
start_image_replayers();
}
+// TODO: Move this to group_replayer::Replayer?
template <typename I>
void GroupReplayer<I>::start_image_replayers() {
dout(10) << m_image_replayers.size() << dendl;
if (finish_start_if_interrupted()) {
return;
} else if (r < 0) {
- finish_start(r, "");
+ finish_start_fail(r, "failed to start image replayers");
return;
}
- finish_start(0, "");
-}
+ Context *on_finish = nullptr;
+ {
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_state == STATE_STARTING);
+ m_state = STATE_REPLAYING;
+ std::swap(m_on_start_finish, on_finish);
+ }
-template <typename I>
-void GroupReplayer<I>::stop_group_replayer() {
- dout(10) << dendl;
set_mirror_group_status_update(
- cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPING_REPLAY, "stopping");
+ cls::rbd::MIRROR_GROUP_STATUS_STATE_REPLAYING, "replaying");
- if (m_replayer != nullptr) {
- C_SaferCond ctx;
- m_replayer->shut_down(&ctx);
- ctx.wait();
+ if(on_finish) {
+ on_finish->complete(0);
}
-
- handle_stop_group_replayer(0);
}
template <typename I>
-void GroupReplayer<I>::handle_stop_group_replayer(int r) {
- dout(10) << "r=" << r << dendl;
-
+bool GroupReplayer<I>::finish_start_if_interrupted() {
std::lock_guard locker{m_lock};
- stop_image_replayers();
+
+ return finish_start_if_interrupted(m_lock);
}
template <typename I>
-void GroupReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
- Context *on_finish) {
- dout(10) << image_replayer << " global_image_id="
- << image_replayer->get_global_image_id() << ", on_finish="
- << on_finish << dendl;
-
- if (image_replayer->is_stopped()) {
- m_threads->work_queue->queue(on_finish, 0);
- return;
+bool GroupReplayer<I>::finish_start_if_interrupted(ceph::mutex &lock) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(m_state == STATE_STARTING);
+ if (!m_stop_requested) {
+ return false;
}
- m_async_op_tracker.start_op();
- Context *ctx = create_async_context_callback(
- m_threads->work_queue, new LambdaContext(
- [this, image_replayer, on_finish] (int r) {
- stop_image_replayer(image_replayer, on_finish);
- m_async_op_tracker.finish_op();
- }));
-
- if (image_replayer->is_running()) {
- image_replayer->stop(ctx, false);
- } else {
- int after = 1;
- dout(10) << "scheduling image replayer " << image_replayer << " stop after "
- << after << " sec (task " << ctx << ")" << dendl;
- ctx = new LambdaContext(
- [this, after, ctx] (int r) {
- std::lock_guard timer_locker{m_threads->timer_lock};
- m_threads->timer->add_event_after(after, ctx);
- });
- m_threads->work_queue->queue(ctx, 0);
- }
+ finish_start_fail(-ECANCELED, "");
+ return true;
}
template <typename I>
-void GroupReplayer<I>::stop_image_replayers() {
- dout(10) << dendl;
-
- ceph_assert(ceph_mutex_is_locked(m_lock));
-
- Context *ctx = create_async_context_callback(
- m_threads->work_queue, create_context_callback<GroupReplayer<I>,
- &GroupReplayer<I>::handle_stop_image_replayers>(this));
+// Abort a start that is still in STATE_STARTING. The work is queued on the
+// work queue: the queued context moves the state to STATE_STOPPING, records
+// the status state and description, then calls shut_down(r).
+// r is the start result; -ECANCELED, -ENOENT, -EREMOTEIO and r >= 0 are
+// recorded as STOPPED, any other negative value as ERROR.
+// desc is the description stored in m_state_desc.
+void GroupReplayer<I>::finish_start_fail(int r, const std::string &desc) {
+ dout(10) << "r=" << r << ", desc=" << desc << dendl;
+ Context *ctx = new LambdaContext([this, r, desc](int _r) {
+ {
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_STARTING);
+ m_state = STATE_STOPPING;
+ // status fields are updated under m_lock, as on_stop_replay() does
+ m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED;
+ m_state_desc = desc;
+ if (r < 0) {
+ if (r == -ECANCELED) {
+ dout(10) << "start canceled" << dendl;
+ } else if (r == -ENOENT) {
+ dout(10) << "mirroring group removed" << dendl;
+ } else if (r == -EREMOTEIO) {
+ dout(10) << "mirroring group demoted" << dendl;
+ } else {
+ derr << "start failed: " << cpp_strerror(r) << dendl;
+ m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_ERROR;
+ }
+ }
+ }
+ shut_down(r);
+ });
- C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
- for (auto &it : m_image_replayers) {
- stop_image_replayer(it.second, gather_ctx->new_sub());
- }
- gather_ctx->activate();
+ m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
-void GroupReplayer<I>::handle_stop_image_replayers(int r) {
- dout(10) << "r=" << r << dendl;
-
- ceph_assert(r == 0);
+void GroupReplayer<I>::shut_down(int r) {
+ dout(10) << "r=" << r << ", state=" << m_state << dendl;
- Context *on_finish = nullptr;
{
std::lock_guard locker{m_lock};
-
- for (auto &it : m_image_replayers) {
- ceph_assert(it.second->is_stopped());
- it.second->destroy();
- }
ceph_assert(m_state == STATE_STOPPING);
- m_image_replayers.clear();
-
- m_stop_requested = false;
- m_state = STATE_STOPPED;
- std::swap(on_finish, m_on_stop_finish);
}
- dout(15) << "waiting for in-flight operations to complete" << dendl;
- m_async_op_tracker.wait_for_ops(new LambdaContext([this](int r) {
- set_mirror_group_status_update(
- cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED, "stopped");
- }));
+ // chain the shut down sequence (reverse order)
+ Context *ctx = new LambdaContext(
+ [this, r](int _r) {
+ set_mirror_group_status_update(m_status_state, m_state_desc);
+ handle_shut_down(r);
+ });
- if (on_finish) {
- on_finish->complete(r);
- }
-}
+ // stop and destroy the replayers
+ ctx = new LambdaContext([this, ctx](int r) {
+ {
+ std::lock_guard locker{m_lock};
-template <typename I>
-bool GroupReplayer<I>::finish_start_if_interrupted() {
- std::lock_guard locker{m_lock};
+ for (auto &it : m_image_replayers) {
+ ceph_assert(it.second->is_stopped());
+ it.second->destroy();
+ }
+ m_image_replayers.clear();
+ }
+ ctx->complete(0);
+ });
- return finish_start_if_interrupted(m_lock);
-}
+ ctx = new LambdaContext([this, ctx](int r) {
+ C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+ {
+ std::lock_guard locker{m_lock};
+ for (auto &it : m_image_replayers) {
+ it.second->stop(gather_ctx->new_sub());
+ }
+ }
+ gather_ctx->activate();
+ });
-template <typename I>
-bool GroupReplayer<I>::finish_start_if_interrupted(ceph::mutex &lock) {
- ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
- ceph_assert(m_state == STATE_STARTING);
- if (!m_stop_requested) {
- return false;
+ if (m_replayer != nullptr) {
+ ctx = new LambdaContext([this, ctx](int r) {
+ m_replayer->destroy();
+ m_replayer = nullptr;
+ ctx->complete(0);
+ });
+
+ ctx = new LambdaContext([this, ctx](int r) {
+ m_replayer->shut_down(ctx);
+ });
}
- finish_start(-ECANCELED, "");
- return true;
+ m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
-void GroupReplayer<I>::finish_start(int r, const std::string &desc) {
- dout(10) << "r=" << r << ", desc=" << desc << dendl;
- Context *ctx = new LambdaContext(
- [this, r, desc](int _r) {
- Context *on_finish = nullptr;
- {
- std::lock_guard locker{m_lock};
- ceph_assert(m_state == STATE_STARTING);
- m_state = STATE_REPLAYING;
- std::swap(m_on_start_finish, on_finish);
- m_state_desc = desc;
- if (r < 0) {
- auto state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED;
- if (r == -ECANCELED) {
- dout(10) << "start canceled" << dendl;
- } else if (r == -ENOENT) {
- dout(10) << "mirroring group removed" << dendl;
- } else if (r == -EREMOTEIO) {
- dout(10) << "mirroring group demoted" << dendl;
- } else {
- derr << "start failed: " << cpp_strerror(r) << dendl;
- state = cls::rbd::MIRROR_GROUP_STATUS_STATE_ERROR;
- }
- on_finish = new LambdaContext(
- [this, r, state, desc, on_finish](int) {
- set_mirror_group_status_update(state, desc);
-
- if (r == -ENOENT && !m_resync_requested) {
- set_finished(true);
- }
- if (on_finish != nullptr) {
- on_finish->complete(r);
- }
- });
- }
- }
+void GroupReplayer<I>::handle_shut_down(int r) {
- if (r < 0) {
- stop(on_finish, false, false);
- return;
- }
-
- if (m_local_group_ctx.primary) { // XXXMG
- set_mirror_group_status_update(
- cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED,
- "local group is primary");
- } else {
- set_mirror_group_status_update(
- cls::rbd::MIRROR_GROUP_STATUS_STATE_REPLAYING, "replaying");
- }
+ dout(10) << "stop complete" << dendl;
+ Context *on_start = nullptr;
+ std::list<Context *> on_stop_contexts;
+ {
+ std::lock_guard locker{m_lock};
+ std::swap(on_start, m_on_start_finish);
+ on_stop_contexts = std::move(m_on_stop_contexts);
+ m_stop_requested = false;
+ ceph_assert(m_state == STATE_STOPPING);
+ m_state = STATE_STOPPED;
+ }
- if (on_finish != nullptr) {
- on_finish->complete(0);
- }
- });
- m_threads->work_queue->queue(ctx, 0);
+ if (on_start != nullptr) {
+ dout(10) << "on start finish complete, r=" << r << dendl;
+ on_start->complete(r);
+ r = 0;
+ }
+ for (auto ctx : on_stop_contexts) {
+ dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl;
+ ctx->complete(r);
+ }
}
return;
}
- dout(15) << "registered asok hook: " << m_group_spec << dendl;
asok_hook = new GroupReplayerAdminSocketHook<I>(
g_ceph_context, m_group_spec, this);
int r = asok_hook->register_commands();
if (r == 0) {
m_asok_hook = asok_hook;
+ dout(15) << "registered asok hook: " << m_group_spec << dendl;
return;
}
derr << "error registering admin socket commands" << dendl;
template <typename I>
bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
+
if (m_state == STATE_COMPLETE) {
locker->unlock();
-
return true;
}
template <typename I>
void Replayer<I>::schedule_load_group_snapshots() {
+
+ std::lock_guard timer_locker{m_threads->timer_lock};
+ std::lock_guard locker{m_lock};
+
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+
dout(10) << dendl;
+ ceph_assert(m_load_snapshots_task == nullptr);
+ m_load_snapshots_task = create_context_callback<
+ Replayer<I>,
+ &Replayer<I>::handle_schedule_load_group_snapshots>(this);
+
+ m_threads->timer->add_event_after(1, m_load_snapshots_task);
+}
+
+// Callback for the task registered by schedule_load_group_snapshots(); it
+// runs with the timer lock held. Clears the task handle and, if the replayer
+// is still in STATE_REPLAYING, queues load_local_group_snapshots() on the
+// work queue.
+template <typename I>
+void Replayer<I>::handle_schedule_load_group_snapshots(int r) {
+ dout(10) << dendl;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock));
+
+ // This task is firing now, so drop the handle before any early return;
+ // otherwise cancel_load_group_snapshots() would later pass an
+ // already-fired task to cancel_event().
+ ceph_assert(m_load_snapshots_task != nullptr);
+ m_load_snapshots_task = nullptr;
+
+ {
+ std::unique_lock locker{m_lock};
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+ }
+
auto ctx = new LambdaContext(
[this](int r) {
load_local_group_snapshots();
});
- std::lock_guard timer_locker{m_threads->timer_lock};
- m_threads->timer->add_event_after(1, ctx);
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+// Cancel the pending load-snapshots timer task, if one is recorded. The
+// handle is only cleared when cancel_event() reports success.
+template <typename I>
+void Replayer<I>::cancel_load_group_snapshots() {
+ dout(10) << dendl;
+
+ // held for the whole function; never released manually
+ std::lock_guard timer_locker{m_threads->timer_lock};
+ if (m_load_snapshots_task != nullptr) {
+ dout(10) << "canceling scheduled load of group snapshots" << dendl;
+
+ if (m_threads->timer->cancel_event(m_load_snapshots_task)) {
+ m_load_snapshots_task = nullptr;
+ }
+ }
}
template <typename I>
do {
std::vector<cls::rbd::GroupImageStatus> image_ids_page;
+//TODO: Make this async
r = librbd::cls_client::group_image_list(&m_local_io_ctx, group_header_oid,
start_last, max_read,
&image_ids_page);
std::string group_header_oid = librbd::util::group_header_name(
m_local_group_id);
std::string value;
+// TODO: make this async
int r = librbd::cls_client::metadata_get(&m_local_io_ctx, group_header_oid,
RBD_GROUP_RESYNC, &value);
if (r < 0 && r != -ENOENT) {
void Replayer<I>::load_local_group_snapshots() {
dout(10) << "m_local_group_id=" << m_local_group_id << dendl;
- if (is_replay_interrupted()) {
+ std::unique_lock locker{m_lock};
+ if (is_replay_interrupted(&locker)) {
return;
}
dout(10) << "local group resync requested" << dendl;
// send stop for Group Replayer
notify_group_listener_stop();
+ return;
} else if (is_rename_requested()) {
m_stop_requested = true;
dout(10) << "remote group rename requested" << dendl;
// send stop for Group Replayer
notify_group_listener_stop();
+ return;
}
- std::unique_lock locker{m_lock};
+ m_in_flight_op_tracker.start_op();
m_local_group_snaps.clear();
auto ctx = create_context_callback<
Replayer<I>,
void Replayer<I>::handle_load_local_group_snapshots(int r) {
dout(10) << "r=" << r << dendl;
+ if (is_replay_interrupted()) {
+ m_in_flight_op_tracker.finish_op();
+ return;
+ }
+ m_in_flight_op_tracker.finish_op();
+
if (r < 0) {
derr << "error listing local mirror group snapshots: " << cpp_strerror(r)
<< dendl;
}
// this is primary, IDLE the group replayer
m_state = STATE_IDLE;
+ notify_group_listener_stop();
return;
}
handle_load_remote_group_snapshots(r);
});
+ m_in_flight_op_tracker.start_op();
auto req = librbd::group::ListSnapshotsRequest<I>::create(m_remote_io_ctx,
m_remote_group_id, true, true, &m_remote_group_snaps, ctx);
req->send();
void Replayer<I>::handle_load_remote_group_snapshots(int r) {
dout(10) << "r=" << r << dendl;
+ if (is_replay_interrupted()) {
+ m_in_flight_op_tracker.finish_op();
+ return;
+ }
+ m_in_flight_op_tracker.finish_op();
+
if (r < 0) {
derr << "error listing remote mirror group snapshots: " << cpp_strerror(r)
<< dendl;
void Replayer<I>::shut_down(Context* on_finish) {
dout(10) << dendl;
- std::unique_lock locker{m_lock};
- m_stop_requested = true;
- auto state = STATE_COMPLETE;
- std::swap(m_state, state);
- locker.unlock();
- if (on_finish) {
- on_finish->complete(0);
+ {
+ std::unique_lock locker{m_lock};
+ m_stop_requested = true;
+ ceph_assert(m_on_shutdown == nullptr);
+ std::swap(m_on_shutdown, on_finish);
+
+ auto state = STATE_COMPLETE;
+ std::swap(m_state, state);
}
+
+ cancel_load_group_snapshots();
+
+ if (!m_in_flight_op_tracker.empty()) {
+ m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this](int) {
+ finish_shut_down();
+ }));
+ return;
+ }
+
+ finish_shut_down();
return;
}
+// Final step of shut_down(): take the stored shutdown callback out of
+// m_on_shutdown under m_lock and complete it with 0 outside the lock.
+template <typename I>
+void Replayer<I>::finish_shut_down() {
+ dout(10) << dendl;
+
+ Context *on_shutdown = nullptr;
+ {
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_on_shutdown != nullptr);
+ // take ownership of the callback and reset the member
+ on_shutdown = m_on_shutdown;
+ m_on_shutdown = nullptr;
+ }
+
+ if (on_shutdown != nullptr) {
+ on_shutdown->complete(0);
+ }
+}
} // namespace group_replayer
} // namespace mirror