From 1ec3c858303ae1eda3c8e518e1065a46bcfd7bb1 Mon Sep 17 00:00:00 2001 From: N Balachandran Date: Fri, 21 Feb 2025 11:47:19 +0530 Subject: [PATCH] rbd-mirror: fixes multiple issues in the group replayer The commit includes the following: - Fixed crashes in the start/stop in GroupReplayer - Fixed crashes in the shut_down sequence in group_replayer::Replayer - ImageMap will now send release_group notifications for non-empty groups. - InstanceReplayer no longer checks if the GroupReplayer needs to be restarted. The GroupReplayer will stop itself if it determines that it needs to be restarted. Signed-off-by: N Balachandran --- qa/workunits/rbd/rbd_mirror_group.sh | 6 +- src/tools/rbd_mirror/GroupReplayer.cc | 493 ++++++++---------- src/tools/rbd_mirror/GroupReplayer.h | 23 +- src/tools/rbd_mirror/ImageMap.cc | 2 +- src/tools/rbd_mirror/InstanceReplayer.cc | 4 - .../rbd_mirror/group_replayer/Replayer.cc | 114 +++- .../rbd_mirror/group_replayer/Replayer.h | 12 +- 7 files changed, 344 insertions(+), 310 deletions(-) diff --git a/qa/workunits/rbd/rbd_mirror_group.sh b/qa/workunits/rbd/rbd_mirror_group.sh index c55e1a64e87dd..251ac56ddb60b 100755 --- a/qa/workunits/rbd/rbd_mirror_group.sh +++ b/qa/workunits/rbd/rbd_mirror_group.sh @@ -303,7 +303,7 @@ test_fields_in_group_info ${CLUSTER2} ${POOL}/${group} 'snapshot' 'enabled' 'tru wait_for_group_replay_started ${CLUSTER1} ${POOL}/${group} 1 mirror_group_snapshot_and_wait_for_sync_complete ${CLUSTER1} ${CLUSTER2} ${POOL}/${group} wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group} 'up+replaying' 1 -wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group} 'up+stopped' 1 +wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group} 'up+stopped' 0 compare_images ${CLUSTER1} ${CLUSTER2} ${POOL} ${POOL} ${image} testlog " - failover" @@ -317,7 +317,7 @@ test_fields_in_group_info ${CLUSTER1} ${POOL}/${group1} 'snapshot' 'enabled' 'tr wait_for_group_replay_started ${CLUSTER2} ${POOL}/${group1} 1 write_image ${CLUSTER1} ${POOL} ${image1} 100 mirror_group_snapshot_and_wait_for_sync_complete ${CLUSTER2} ${CLUSTER1} ${POOL}/${group1} -wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group1} 'up+stopped' 1 +wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group1} 'up+stopped' 0 wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group1} 'up+replaying' 1 compare_images ${CLUSTER1} ${CLUSTER2} ${POOL} ${POOL} ${image1} @@ -333,7 +333,7 @@ wait_for_group_replay_started ${CLUSTER1} ${POOL}/${group1} 1 write_image ${CLUSTER2} ${POOL} ${image1} 100 mirror_group_snapshot_and_wait_for_sync_complete ${CLUSTER1} ${CLUSTER2} ${POOL}/${group1} wait_for_group_status_in_pool_dir ${CLUSTER1} ${POOL}/${group1} 'up+replaying' 1 -wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group1} 'up+stopped' 1 +wait_for_group_status_in_pool_dir ${CLUSTER2} ${POOL}/${group1} 'up+stopped' 0 compare_images ${CLUSTER1} ${CLUSTER2} ${POOL} ${POOL} ${image1} testlog " - force promote cluster1" diff --git a/src/tools/rbd_mirror/GroupReplayer.cc b/src/tools/rbd_mirror/GroupReplayer.cc index ee24ae7a11052..5072bbf3ca344 100644 --- a/src/tools/rbd_mirror/GroupReplayer.cc +++ b/src/tools/rbd_mirror/GroupReplayer.cc @@ -217,38 +217,12 @@ GroupReplayer::GroupReplayer( template GroupReplayer::~GroupReplayer() { unregister_admin_socket_hook(); - //ceph_assert(m_on_start_finish == nullptr); - ceph_assert(m_on_stop_finish == nullptr); + ceph_assert(m_on_start_finish == nullptr); ceph_assert(m_bootstrap_request == nullptr); -} - -template -bool 
GroupReplayer::needs_restart() const { - dout(10) << dendl; - - std::lock_guard locker{m_lock}; - if (!m_replayer) { - return true; - } - - if (!m_local_group_ctx.primary) { - if (m_state != STATE_REPLAYING) { - return true; - } - for (auto &[_, image_replayer] : m_image_replayers) { - if (image_replayer->is_stopped()) { - dout(10) << "image replayer is in stopped state, needs restart" << dendl; - return true; - } - } - } else { - // this is how we determine if the remote state has changed, - // if we never restart the group replayer we never get to see updated - // snapshot information on remote and see if its demoted at all. - return true; - } - - return false; + ceph_assert(m_bootstrap_request == nullptr); + ceph_assert(m_replayer == nullptr); + ceph_assert(m_image_replayers.empty()); + ceph_assert(m_on_stop_contexts.empty()); } template @@ -347,12 +321,13 @@ void GroupReplayer::start(Context *on_finish, bool manual, bool restart) { m_last_r = 0; m_state_desc.clear(); m_local_group_snaps.clear(); + ceph_assert(m_replayer == nullptr); + ceph_assert(m_image_replayers.empty()); m_image_replayers.clear(); m_image_replayer_index.clear(); - m_get_remote_group_snap_ret_vals.clear(); m_manual_stop = false; m_finished = false; - //ceph_assert(m_on_start_finish == nullptr); + ceph_assert(m_on_start_finish == nullptr); std::swap(m_on_start_finish, on_finish); } } @@ -373,96 +348,69 @@ void GroupReplayer::stop(Context *on_finish, bool manual, bool restart) { << ", restart=" << restart << dendl; group_replayer::BootstrapRequest *bootstrap_request = nullptr; - std::map *, Context *>> create_snap_requests; bool shut_down_replay = false; - bool running = true; + bool is_stopped = false; { std::lock_guard locker{m_lock}; - if (restart) { - m_restart_requested = true; - } + dout(10) << "state: " << m_state << ", m_stop_requested: " + << m_stop_requested << dendl; - dout(10) << "state: " << m_state << ", m_stop_requested: " << m_stop_requested << dendl; if (!is_running_()) { dout(10) << "replayers not running" << dendl; - running = false; + if (manual && !m_manual_stop) { + dout(10) << "marking manual" << dendl; + m_manual_stop = true; + } if (!restart && m_restart_requested) { dout(10) << "canceling restart" << dendl; m_restart_requested = false; } + if (is_stopped_()) { + dout(10) << "already stopped" << dendl; + is_stopped = true; + } else { + dout(10) << "joining in-flight stop" << dendl; + if (on_finish != nullptr) { + m_on_stop_contexts.push_back(on_finish); + } + } } else { dout(10) << "replayers still running" << dendl; - if (!is_stopped_() || m_state == STATE_STOPPING) { - if (m_state == STATE_STARTING) { - dout(10) << "canceling start" << dendl; - if (m_bootstrap_request != nullptr) { - bootstrap_request = m_bootstrap_request; - bootstrap_request->get(); - } - shut_down_replay = true; - } else { - dout(10) << "interrupting replay" << dendl; - shut_down_replay = true; - for (auto it = m_create_snap_requests.begin(); - it != m_create_snap_requests.end(); ) { - auto &remote_group_snap_id = it->first; - auto &requests = it->second; - if (m_get_remote_group_snap_ret_vals.count(remote_group_snap_id) == 0) { - dout(20) << "getting remote group snap for " - << remote_group_snap_id << " is still in-progress" - << dendl; - shut_down_replay = false; - } else if (m_pending_snap_create.count(remote_group_snap_id) > 0) { - dout(20) << "group snap create for " << remote_group_snap_id - << " is still in-progress" << dendl; - shut_down_replay = false; - } else { - create_snap_requests[remote_group_snap_id] = 
requests; - it = m_create_snap_requests.erase(it); - continue; - } - it++; - } + if (m_state == STATE_STARTING) { + dout(10) << "canceling start" << dendl; + if (m_bootstrap_request != nullptr) { + bootstrap_request = m_bootstrap_request; + bootstrap_request->get(); } - m_state = STATE_STOPPING; + } else { + dout(10) << "interrupting replay" << dendl; + shut_down_replay = true; + } - ceph_assert(m_on_stop_finish == nullptr); - std::swap(m_on_stop_finish, on_finish); - m_stop_requested = true; - m_manual_stop = manual; + m_stop_requested = true; + m_manual_stop = manual; + if (on_finish != nullptr) { + m_on_stop_contexts.push_back(on_finish); } } } + if (is_stopped) { + if (on_finish) { + on_finish->complete(-EINVAL); + } + return; + } + if (bootstrap_request != nullptr) { dout(10) << "canceling bootstrap" << dendl; bootstrap_request->cancel(); bootstrap_request->put(); } - for (auto &[_, requests] : create_snap_requests) { - for (auto &[_, on_finish] : requests) { - on_finish->complete(-ESTALE); - } - } - if (shut_down_replay) { - stop_group_replayer(); - } else if (on_finish != nullptr) { - // XXXMG: clean up - { - std::lock_guard locker{m_lock}; - m_stop_requested = false; - } - on_finish->complete(0); - } - - if (!running && shut_down_replay) { - dout(20) << "not running" << dendl; - if (on_finish) { - on_finish->complete(-EINVAL); - } + on_stop_replay(0); } } @@ -501,7 +449,7 @@ void GroupReplayer::flush() { template void GroupReplayer::print_status(Formatter *f) { - dout(10) << dendl; + dout(10) << m_state << dendl; std::lock_guard l{m_lock}; @@ -509,9 +457,11 @@ void GroupReplayer::print_status(Formatter *f) { f->dump_string("name", m_group_spec); auto state = m_state; if (m_local_group_ctx.primary && state == STATE_REPLAYING) { // XXXMG + dout(10) << "setting state to stopped" << dendl; state = STATE_STOPPED; } f->dump_string("state", state_to_string(state)); +// TODO: remove the image_replayers section f->open_array_section("image_replayers"); for (auto &[_, image_replayer] : m_image_replayers) { image_replayer->print_status(f); @@ -520,6 +470,26 @@ void GroupReplayer::print_status(Formatter *f) { f->close_section(); // group_replayer } +template +void GroupReplayer::on_stop_replay(int r, const std::string &desc) +{ + dout(10) << "r=" << r << ", desc=" << desc << dendl; + { + std::lock_guard locker{m_lock}; + if (m_state != STATE_REPLAYING) { + // might be invoked multiple times while stopping + return; + } + + m_stop_requested = true; + m_state = STATE_STOPPING; + m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED; + m_state_desc = ""; + } + + shut_down(r); +} + template void GroupReplayer::bootstrap_group() { dout(10) << dendl; @@ -529,7 +499,7 @@ void GroupReplayer::bootstrap_group() { locker.unlock(); dout(5) << "no peer clusters" << dendl; - finish_start(-ENOENT, "no peer clusters"); + finish_start_fail(-ENOENT, "no peer clusters"); return; } @@ -541,6 +511,7 @@ void GroupReplayer::bootstrap_group() { return; } + ceph_assert(m_replayer == nullptr); ceph_assert(m_image_replayers.empty()); auto ctx = create_context_callback< @@ -568,6 +539,7 @@ void GroupReplayer::handle_bootstrap_group(int r) { dout(10) << "r=" << r << dendl; { std::lock_guard locker{m_lock}; +// Should never happen if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) { dout(10) << "stop prevailed" <::handle_bootstrap_group(int r) { m_bootstrap_request->put(); m_bootstrap_request = nullptr; } - m_local_group_ctx.listener = &m_listener; - if (!m_local_group_ctx.name.empty()) { - 
m_local_group_name = m_local_group_ctx.name; - } } + if (!m_local_group_ctx.name.empty()) { + m_local_group_name = m_local_group_ctx.name; + } if (r == -EINVAL) { sync_group_names(); } else { @@ -591,38 +562,48 @@ void GroupReplayer::handle_bootstrap_group(int r) { if (finish_start_if_interrupted()) { return; } else if (r == -ENOENT) { - finish_start(r, "group removed"); + finish_start_fail(r, "group removed"); return; } else if (r == -EREMOTEIO) { - finish_start(r, "remote group is non-primary"); + finish_start_fail(r, "remote group is non-primary"); return; } else if (r == -EEXIST) { - finish_start(r, "split-brain detected"); + finish_start_fail(r, "split-brain detected"); + return; + } else if (m_remote_group_id.empty()){ + r = -EINVAL; + //FIXME: The primary should not care if the remote is ready. + // Bootstrap again when the replayer is restarted + finish_start_fail(r, "remote is not ready yet"); return; + } else if (r == -EINVAL) { + sync_group_names(); } else if (r < 0) { - finish_start(r, "bootstrap failed"); + finish_start_fail(r, "bootstrap failed"); return; } - if (!m_remote_group_id.empty()) { - C_SaferCond ctx; - create_group_replayer(&ctx); - ctx.wait(); - } else { - r = -EINVAL; - finish_start(r, "remote is not ready yet"); // bootstrap again + if (m_local_group_ctx.primary) { // XXXMG + set_mirror_group_status_update( + cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED, + "local group is primary"); + finish_start_fail(0, "local group is primary"); return; } + + m_local_group_ctx.listener = &m_listener; + + create_group_replayer(); } template -void GroupReplayer::create_group_replayer(Context *on_finish) { +void GroupReplayer::create_group_replayer() { dout(10) << dendl; - auto ctx = new LambdaContext( - [this, on_finish](int r) { - handle_create_group_replayer(r, on_finish); - }); + ceph_assert(m_replayer == nullptr); + + auto ctx = create_context_callback< + GroupReplayer, &GroupReplayer::handle_create_group_replayer>(this); m_replayer = group_replayer::Replayer::create( m_threads, m_local_io_ctx, m_remote_group_peer.io_ctx, m_global_group_id, @@ -633,18 +614,21 @@ void GroupReplayer::create_group_replayer(Context *on_finish) { } template -void GroupReplayer::handle_create_group_replayer(int r, Context *on_finish) { +void GroupReplayer::handle_create_group_replayer(int r) { dout(10) << "r=" << r << dendl; - if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) { - dout(10) << "stop prevailed" <complete(r); + if (finish_start_if_interrupted()) { + return; + } else if (r < 0) { + + finish_start_fail(r, "failed to create group replayer"); return; } - on_finish->complete(0); + start_image_replayers(); } +// TODO: Move this to group_replayer::Replayer? 
template void GroupReplayer::start_image_replayers() { dout(10) << m_image_replayers.size() << dendl; @@ -678,197 +662,154 @@ void GroupReplayer::handle_start_image_replayers(int r) { if (finish_start_if_interrupted()) { return; } else if (r < 0) { - finish_start(r, ""); + finish_start_fail(r, "failed to start image replayers"); return; } - finish_start(0, ""); -} + Context *on_finish = nullptr; + { + std::unique_lock locker{m_lock}; + ceph_assert(m_state == STATE_STARTING); + m_state = STATE_REPLAYING; + std::swap(m_on_start_finish, on_finish); + } -template -void GroupReplayer::stop_group_replayer() { - dout(10) << dendl; set_mirror_group_status_update( - cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPING_REPLAY, "stopping"); + cls::rbd::MIRROR_GROUP_STATUS_STATE_REPLAYING, "replaying"); - if (m_replayer != nullptr) { - C_SaferCond ctx; - m_replayer->shut_down(&ctx); - ctx.wait(); + if(on_finish) { + on_finish->complete(0); } - - handle_stop_group_replayer(0); } template -void GroupReplayer::handle_stop_group_replayer(int r) { - dout(10) << "r=" << r << dendl; - +bool GroupReplayer::finish_start_if_interrupted() { std::lock_guard locker{m_lock}; - stop_image_replayers(); + + return finish_start_if_interrupted(m_lock); } template -void GroupReplayer::stop_image_replayer(ImageReplayer *image_replayer, - Context *on_finish) { - dout(10) << image_replayer << " global_image_id=" - << image_replayer->get_global_image_id() << ", on_finish=" - << on_finish << dendl; - - if (image_replayer->is_stopped()) { - m_threads->work_queue->queue(on_finish, 0); - return; +bool GroupReplayer::finish_start_if_interrupted(ceph::mutex &lock) { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + ceph_assert(m_state == STATE_STARTING); + if (!m_stop_requested) { + return false; } - m_async_op_tracker.start_op(); - Context *ctx = create_async_context_callback( - m_threads->work_queue, new LambdaContext( - [this, image_replayer, on_finish] (int r) { - stop_image_replayer(image_replayer, on_finish); - m_async_op_tracker.finish_op(); - })); - - if (image_replayer->is_running()) { - image_replayer->stop(ctx, false); - } else { - int after = 1; - dout(10) << "scheduling image replayer " << image_replayer << " stop after " - << after << " sec (task " << ctx << ")" << dendl; - ctx = new LambdaContext( - [this, after, ctx] (int r) { - std::lock_guard timer_locker{m_threads->timer_lock}; - m_threads->timer->add_event_after(after, ctx); - }); - m_threads->work_queue->queue(ctx, 0); - } + finish_start_fail(-ECANCELED, ""); + return true; } template -void GroupReplayer::stop_image_replayers() { - dout(10) << dendl; - - ceph_assert(ceph_mutex_is_locked(m_lock)); - - Context *ctx = create_async_context_callback( - m_threads->work_queue, create_context_callback, - &GroupReplayer::handle_stop_image_replayers>(this)); +void GroupReplayer::finish_start_fail(int r, const std::string &desc) { + dout(10) << "r=" << r << ", desc=" << desc << dendl; + Context *ctx = new LambdaContext([this, r, desc](int _r) { + m_status_state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED; + m_state_desc = desc; + { + std::lock_guard locker{m_lock}; + ceph_assert(m_state == STATE_STARTING); + m_state = STATE_STOPPING; + if (r < 0) { + if (r == -ECANCELED) { + dout(10) << "start canceled" << dendl; + } else if (r == -ENOENT) { + dout(10) << "mirroring group removed" << dendl; + } else if (r == -EREMOTEIO) { + dout(10) << "mirroring group demoted" << dendl; + } else { + derr << "start failed: " << cpp_strerror(r) << dendl; + m_status_state = 
cls::rbd::MIRROR_GROUP_STATUS_STATE_ERROR; + } + } + } + shut_down(r); + }); - C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); - for (auto &it : m_image_replayers) { - stop_image_replayer(it.second, gather_ctx->new_sub()); - } - gather_ctx->activate(); + m_threads->work_queue->queue(ctx, 0); } template -void GroupReplayer::handle_stop_image_replayers(int r) { - dout(10) << "r=" << r << dendl; - - ceph_assert(r == 0); +void GroupReplayer::shut_down(int r) { + dout(10) << "r=" << r << ", state=" << m_state << dendl; - Context *on_finish = nullptr; { std::lock_guard locker{m_lock}; - - for (auto &it : m_image_replayers) { - ceph_assert(it.second->is_stopped()); - it.second->destroy(); - } ceph_assert(m_state == STATE_STOPPING); - m_image_replayers.clear(); - - m_stop_requested = false; - m_state = STATE_STOPPED; - std::swap(on_finish, m_on_stop_finish); } - dout(15) << "waiting for in-flight operations to complete" << dendl; - m_async_op_tracker.wait_for_ops(new LambdaContext([this](int r) { - set_mirror_group_status_update( - cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED, "stopped"); - })); + // chain the shut down sequence (reverse order) + Context *ctx = new LambdaContext( + [this, r](int _r) { + set_mirror_group_status_update(m_status_state, m_state_desc); + handle_shut_down(r); + }); - if (on_finish) { - on_finish->complete(r); - } -} + // stop and destroy the replayers + ctx = new LambdaContext([this, ctx](int r) { + { + std::lock_guard locker{m_lock}; -template -bool GroupReplayer::finish_start_if_interrupted() { - std::lock_guard locker{m_lock}; + for (auto &it : m_image_replayers) { + ceph_assert(it.second->is_stopped()); + it.second->destroy(); + } + m_image_replayers.clear(); + } + ctx->complete(0); + }); - return finish_start_if_interrupted(m_lock); -} + ctx = new LambdaContext([this, ctx](int r) { + C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); + { + std::lock_guard locker{m_lock}; + for (auto &it : m_image_replayers) { + it.second->stop(gather_ctx->new_sub()); + } + } + gather_ctx->activate(); + }); -template -bool GroupReplayer::finish_start_if_interrupted(ceph::mutex &lock) { - ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); - ceph_assert(m_state == STATE_STARTING); - if (!m_stop_requested) { - return false; + if (m_replayer != nullptr) { + ctx = new LambdaContext([this, ctx](int r) { + m_replayer->destroy(); + m_replayer = nullptr; + ctx->complete(0); + }); + + ctx = new LambdaContext([this, ctx](int r) { + m_replayer->shut_down(ctx); + }); } - finish_start(-ECANCELED, ""); - return true; + m_threads->work_queue->queue(ctx, 0); } template -void GroupReplayer::finish_start(int r, const std::string &desc) { - dout(10) << "r=" << r << ", desc=" << desc << dendl; - Context *ctx = new LambdaContext( - [this, r, desc](int _r) { - Context *on_finish = nullptr; - { - std::lock_guard locker{m_lock}; - ceph_assert(m_state == STATE_STARTING); - m_state = STATE_REPLAYING; - std::swap(m_on_start_finish, on_finish); - m_state_desc = desc; - if (r < 0) { - auto state = cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED; - if (r == -ECANCELED) { - dout(10) << "start canceled" << dendl; - } else if (r == -ENOENT) { - dout(10) << "mirroring group removed" << dendl; - } else if (r == -EREMOTEIO) { - dout(10) << "mirroring group demoted" << dendl; - } else { - derr << "start failed: " << cpp_strerror(r) << dendl; - state = cls::rbd::MIRROR_GROUP_STATUS_STATE_ERROR; - } - on_finish = new LambdaContext( - [this, r, state, desc, on_finish](int) { - 
set_mirror_group_status_update(state, desc); - - if (r == -ENOENT && !m_resync_requested) { - set_finished(true); - } - if (on_finish != nullptr) { - on_finish->complete(r); - } - }); - } - } +void GroupReplayer::handle_shut_down(int r) { - if (r < 0) { - stop(on_finish, false, false); - return; - } - - if (m_local_group_ctx.primary) { // XXXMG - set_mirror_group_status_update( - cls::rbd::MIRROR_GROUP_STATUS_STATE_STOPPED, - "local group is primary"); - } else { - set_mirror_group_status_update( - cls::rbd::MIRROR_GROUP_STATUS_STATE_REPLAYING, "replaying"); - } + dout(10) << "stop complete" << dendl; + Context *on_start = nullptr; + std::list on_stop_contexts; + { + std::lock_guard locker{m_lock}; + std::swap(on_start, m_on_start_finish); + on_stop_contexts = std::move(m_on_stop_contexts); + m_stop_requested = false; + ceph_assert(m_state == STATE_STOPPING); + m_state = STATE_STOPPED; + } - if (on_finish != nullptr) { - on_finish->complete(0); - } - }); - m_threads->work_queue->queue(ctx, 0); + if (on_start != nullptr) { + dout(10) << "on start finish complete, r=" << r << dendl; + on_start->complete(r); + r = 0; + } + for (auto ctx : on_stop_contexts) { + dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl; + ctx->complete(r); + } } @@ -881,12 +822,12 @@ void GroupReplayer::register_admin_socket_hook() { return; } - dout(15) << "registered asok hook: " << m_group_spec << dendl; asok_hook = new GroupReplayerAdminSocketHook( g_ceph_context, m_group_spec, this); int r = asok_hook->register_commands(); if (r == 0) { m_asok_hook = asok_hook; + dout(15) << "registered asok hook: " << m_group_spec << dendl; return; } derr << "error registering admin socket commands" << dendl; diff --git a/src/tools/rbd_mirror/GroupReplayer.h b/src/tools/rbd_mirror/GroupReplayer.h index 27dd214dc6c52..ee36e7192b51c 100644 --- a/src/tools/rbd_mirror/GroupReplayer.h +++ b/src/tools/rbd_mirror/GroupReplayer.h @@ -100,7 +100,6 @@ public: return (m_last_r == -EBLOCKLISTED); } - bool needs_restart() const; void sync_group_names(); image_replayer::HealthState get_health_state() const; @@ -206,9 +205,11 @@ private: AsyncOpTracker m_async_op_tracker; State m_state = STATE_STOPPED; std::string m_state_desc; + cls::rbd::MirrorGroupStatusState m_status_state; int m_last_r = 0; Context *m_on_start_finish = nullptr; + std::list m_on_stop_contexts; Context *m_on_stop_finish = nullptr; bool m_stop_requested = false; bool m_resync_requested = false; @@ -216,6 +217,7 @@ private: bool m_manual_stop = false; bool m_finished = false; + AdminSocketHook *m_asok_hook = nullptr; group_replayer::BootstrapRequest *m_bootstrap_request = nullptr; @@ -225,7 +227,6 @@ private: Listener m_listener = {this}; std::map, ImageReplayer *> m_image_replayer_index; std::map m_local_group_snaps; - std::map m_get_remote_group_snap_ret_vals; std::map *, Context *>> m_create_snap_requests; std::set m_pending_snap_create; @@ -254,26 +255,19 @@ private: return (m_state == STATE_REPLAYING); } + void on_stop_replay(int r = 0, const std::string &desc = ""); void bootstrap_group(); void handle_bootstrap_group(int r); - void create_group_replayer(Context *on_finish); - void handle_create_group_replayer(int r, Context *on_finish); + void create_group_replayer(); + void handle_create_group_replayer(int r); void start_image_replayers(); void handle_start_image_replayers(int r); bool finish_start_if_interrupted(); bool finish_start_if_interrupted(ceph::mutex &lock); - void finish_start(int r, const std::string &desc); - - void 
stop_group_replayer(); - void handle_stop_group_replayer(int r); - - void stop_image_replayer(ImageReplayer *image_replayer, - Context *on_finish); - void stop_image_replayers(); - void handle_stop_image_replayers(int r); + void finish_start_fail(int r, const std::string &desc); void register_admin_socket_hook(); void unregister_admin_socket_hook(); @@ -283,6 +277,9 @@ private: const std::string &desc); void wait_for_ops(); void handle_wait_for_ops(int r); + + void shut_down(int r); + void handle_shut_down(int r); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/ImageMap.cc b/src/tools/rbd_mirror/ImageMap.cc index 9ec991a6d9f67..db76bceeab0f7 100644 --- a/src/tools/rbd_mirror/ImageMap.cc +++ b/src/tools/rbd_mirror/ImageMap.cc @@ -340,7 +340,7 @@ void ImageMap::notify_listener_acquire_release_images( for (auto const &update : release) { auto global_id = GlobalId(update.entity.type, update.entity.global_id); - if (update.entity.type == MIRROR_ENTITY_TYPE_IMAGE || update.entity.count != 0) { + if (update.entity.type == MIRROR_ENTITY_TYPE_IMAGE) { m_listener.release_image( update.entity.global_id, update.instance_id, create_async_context_callback( diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc index 01e61a09e43a2..9c573c6debae7 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.cc +++ b/src/tools/rbd_mirror/InstanceReplayer.cc @@ -658,10 +658,6 @@ void InstanceReplayer::start_group_replayer( std::string global_group_id = group_replayer->get_global_group_id(); if (!group_replayer->is_stopped()) { - if (group_replayer->needs_restart()) { - stop_group_replayer(group_replayer, new C_TrackedOp(m_async_op_tracker, - nullptr)); - } return; } else if (group_replayer->is_blocklisted()) { derr << "global_group_id=" << global_group_id << ": blocklisted detected " diff --git a/src/tools/rbd_mirror/group_replayer/Replayer.cc b/src/tools/rbd_mirror/group_replayer/Replayer.cc index 3c45a83d8cf97..9f278dccf2b93 100644 --- a/src/tools/rbd_mirror/group_replayer/Replayer.cc +++ b/src/tools/rbd_mirror/group_replayer/Replayer.cc @@ -77,9 +77,9 @@ bool Replayer::is_replay_interrupted() { template bool Replayer::is_replay_interrupted(std::unique_lock* locker) { + if (m_state == STATE_COMPLETE) { locker->unlock(); - return true; } @@ -88,14 +88,58 @@ bool Replayer::is_replay_interrupted(std::unique_lock* locker) { template void Replayer::schedule_load_group_snapshots() { + + std::lock_guard timer_locker{m_threads->timer_lock}; + std::lock_guard locker{m_lock}; + + if (m_state != STATE_REPLAYING) { + return; + } + dout(10) << dendl; + ceph_assert(m_load_snapshots_task == nullptr); + m_load_snapshots_task = create_context_callback< + Replayer, + &Replayer::handle_schedule_load_group_snapshots>(this); + + m_threads->timer->add_event_after(1, m_load_snapshots_task); +} + +template +void Replayer::handle_schedule_load_group_snapshots(int r) { + dout(10) << dendl; + ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock)); + + { + std::unique_lock locker{m_lock}; + if (m_state != STATE_REPLAYING) { + return; + } + } + + ceph_assert(m_load_snapshots_task != nullptr); + m_load_snapshots_task = nullptr; + auto ctx = new LambdaContext( [this](int r) { load_local_group_snapshots(); }); - std::lock_guard timer_locker{m_threads->timer_lock}; - m_threads->timer->add_event_after(1, ctx); + m_threads->work_queue->queue(ctx, 0); +} + +template +void Replayer::cancel_load_group_snapshots() { + dout(10) << dendl; + + std::unique_lock timer_locker{m_threads->timer_lock}; 
+ if (m_load_snapshots_task != nullptr) { + dout(10) << dendl; + + if (m_threads->timer->cancel_event(m_load_snapshots_task)) { + m_load_snapshots_task = nullptr; + } + } } template @@ -123,6 +167,7 @@ int Replayer::local_group_image_list_by_id( do { std::vector image_ids_page; +//TODO: Make this async r = librbd::cls_client::group_image_list(&m_local_io_ctx, group_header_oid, start_last, max_read, &image_ids_page); @@ -152,6 +197,7 @@ bool Replayer::is_resync_requested() { std::string group_header_oid = librbd::util::group_header_name( m_local_group_id); std::string value; +// TODO: make this async int r = librbd::cls_client::metadata_get(&m_local_io_ctx, group_header_oid, RBD_GROUP_RESYNC, &value); if (r < 0 && r != -ENOENT) { @@ -212,7 +258,8 @@ template void Replayer::load_local_group_snapshots() { dout(10) << "m_local_group_id=" << m_local_group_id << dendl; - if (is_replay_interrupted()) { + std::unique_lock locker{m_lock}; + if (is_replay_interrupted(&locker)) { return; } @@ -227,14 +274,16 @@ void Replayer::load_local_group_snapshots() { dout(10) << "local group resync requested" << dendl; // send stop for Group Replayer notify_group_listener_stop(); + return; } else if (is_rename_requested()) { m_stop_requested = true; dout(10) << "remote group rename requested" << dendl; // send stop for Group Replayer notify_group_listener_stop(); + return; } - std::unique_lock locker{m_lock}; + m_in_flight_op_tracker.start_op(); m_local_group_snaps.clear(); auto ctx = create_context_callback< Replayer, @@ -249,6 +298,12 @@ template void Replayer::handle_load_local_group_snapshots(int r) { dout(10) << "r=" << r << dendl; + if (is_replay_interrupted()) { + m_in_flight_op_tracker.finish_op(); + return; + } + m_in_flight_op_tracker.finish_op(); + if (r < 0) { derr << "error listing local mirror group snapshots: " << cpp_strerror(r) << dendl; @@ -268,6 +323,7 @@ void Replayer::handle_load_local_group_snapshots(int r) { } // this is primary, IDLE the group replayer m_state = STATE_IDLE; + notify_group_listener_stop(); return; } @@ -288,6 +344,7 @@ void Replayer::load_remote_group_snapshots() { handle_load_remote_group_snapshots(r); }); + m_in_flight_op_tracker.start_op(); auto req = librbd::group::ListSnapshotsRequest::create(m_remote_io_ctx, m_remote_group_id, true, true, &m_remote_group_snaps, ctx); req->send(); @@ -297,6 +354,12 @@ template void Replayer::handle_load_remote_group_snapshots(int r) { dout(10) << "r=" << r << dendl; + if (is_replay_interrupted()) { + m_in_flight_op_tracker.finish_op(); + return; + } + m_in_flight_op_tracker.finish_op(); + if (r < 0) { derr << "error listing remote mirror group snapshots: " << cpp_strerror(r) << dendl; @@ -1134,17 +1197,44 @@ template void Replayer::shut_down(Context* on_finish) { dout(10) << dendl; - std::unique_lock locker{m_lock}; - m_stop_requested = true; - auto state = STATE_COMPLETE; - std::swap(m_state, state); - locker.unlock(); - if (on_finish) { - on_finish->complete(0); + { + std::unique_lock locker{m_lock}; + m_stop_requested = true; + ceph_assert(m_on_shutdown == nullptr); + std::swap(m_on_shutdown, on_finish); + + auto state = STATE_COMPLETE; + std::swap(m_state, state); } + + cancel_load_group_snapshots(); + + if (!m_in_flight_op_tracker.empty()) { + m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this](int) { + finish_shut_down(); + })); + return; + } + + finish_shut_down(); return; } +template +void Replayer::finish_shut_down() { + dout(10) << dendl; + + Context *on_finish = nullptr; + + { + std::unique_lock locker{m_lock}; + 
ceph_assert(m_on_shutdown != nullptr); + std::swap(m_on_shutdown, on_finish); + } + if (on_finish) { + on_finish->complete(0); + } +} } // namespace group_replayer } // namespace mirror diff --git a/src/tools/rbd_mirror/group_replayer/Replayer.h b/src/tools/rbd_mirror/group_replayer/Replayer.h index e977b1469a0df..445ac67ae2b7c 100644 --- a/src/tools/rbd_mirror/group_replayer/Replayer.h +++ b/src/tools/rbd_mirror/group_replayer/Replayer.h @@ -4,12 +4,13 @@ #ifndef RBD_MIRROR_GROUP_REPLAYER_REPLAYER_H #define RBD_MIRROR_GROUP_REPLAYER_REPLAYER_H -#include "tools/rbd_mirror/image_replayer/Replayer.h" +#include "common/AsyncOpTracker.h" #include "common/ceph_mutex.h" #include "cls/rbd/cls_rbd_types.h" #include "include/rados/librados.hpp" #include "librbd/mirror/snapshot/Types.h" #include "tools/rbd_mirror/Types.h" +#include "tools/rbd_mirror/image_replayer/Replayer.h" #include "tools/rbd_mirror/image_replayer/Types.h" #include @@ -64,6 +65,7 @@ public: } void init(Context* on_finish); void shut_down(Context* on_finish); + void finish_shut_down(); bool is_replaying() const { std::unique_lock locker{m_lock}; @@ -98,6 +100,11 @@ private: std::vector m_local_group_snaps; std::vector m_remote_group_snaps; + Context* m_load_snapshots_task = nullptr; + Context* m_on_shutdown = nullptr; + + AsyncOpTracker m_in_flight_op_tracker; + bool m_stop_requested = false; // map of > @@ -112,6 +119,9 @@ private: std::vector *image_ids); void schedule_load_group_snapshots(); + void handle_schedule_load_group_snapshots(int r); + void cancel_load_group_snapshots(); + void notify_group_listener_stop(); bool is_resync_requested(); bool is_rename_requested(); -- 2.39.5
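A note on the reworked GroupReplayer<I>::shut_down() flow above: the teardown is now assembled as a chain of Context callbacks built in reverse order (final status update built first, replayer stop built last) and the head of the chain is queued on the work queue. The sketch below only models that control flow; it uses plain std::function stand-ins instead of Ceph's Context/LambdaContext/C_Gather types, so it is an illustration under those assumptions, not the actual implementation.

```cpp
// Minimal sketch (not the Ceph implementation): each teardown step is
// wrapped in a callback that runs the previously-built callback when it
// completes, so the steps execute strictly one after another.
#include <functional>
#include <iostream>

using Callback = std::function<void(int)>;

int main() {
  // Built first, runs last: publish the final status and finish shut down.
  Callback ctx = [](int r) {
    std::cout << "set_mirror_group_status_update + handle_shut_down, r=" << r << "\n";
  };

  // Runs second: destroy the already-stopped per-image replayers.
  ctx = [next = ctx](int r) {
    std::cout << "destroy image replayers\n";
    next(r);
  };

  // Built last, runs first: stop the image replayers and the group replayer.
  ctx = [next = ctx](int r) {
    std::cout << "stop replayers\n";
    next(r);
  };

  ctx(0);  // the real code queues this head context on the work queue
}
```

Compared with the removed stop_group_replayer()/stop_image_replayers() pair, every step completes before the next one starts, which appears to be how the crashes in the shut_down sequence mentioned in the commit message are avoided.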
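Similarly, group_replayer::Replayer now remembers the scheduled snapshot-reload callback in m_load_snapshots_task so that cancel_load_group_snapshots() can remove it from the timer during shut down. The stand-alone sketch below shows only that bookkeeping pattern; FakeTimer and task_armed are invented for the example and do not exist in Ceph, which uses SafeTimer::add_event_after()/cancel_event() under m_threads->timer_lock.

```cpp
// Stand-alone sketch of the pending-task bookkeeping; FakeTimer is an
// invented stand-in for Ceph's SafeTimer.
#include <functional>
#include <iostream>

struct FakeTimer {
  std::function<void()> pending;                 // at most one armed task
  void add_event_after(int, std::function<void()> task) { pending = std::move(task); }
  bool cancel_event() {                          // true if a task was still armed
    if (!pending) return false;
    pending = nullptr;
    return true;
  }
};

int main() {
  FakeTimer timer;
  bool task_armed = false;                       // models m_load_snapshots_task != nullptr
  bool replaying = true;

  // schedule_load_group_snapshots(): arm the reload only while replaying.
  if (replaying && !task_armed) {
    timer.add_event_after(1, [] { std::cout << "load_local_group_snapshots()\n"; });
    task_armed = true;
  }

  // cancel_load_group_snapshots(): clear the marker only if the timer
  // actually cancelled the task (it may already have fired or be running).
  if (task_armed && timer.cancel_event()) {
    task_armed = false;
  }
}
```

In the patch itself the same guard is applied: m_load_snapshots_task is reset only when cancel_event() succeeds, and both scheduling and cancellation are performed while holding the timer lock.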