From cfcbb82956a019c6d44724c3c20db538b3290c66 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 15 Jan 2025 22:15:36 +0000 Subject: [PATCH] crimson/.../replicated_recovery_backend: convert _handle_pull_response to coroutine Signed-off-by: Samuel Just --- .../osd/replicated_recovery_backend.cc | 87 +++++++++---------- 1 file changed, 40 insertions(+), 47 deletions(-) diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 3e3361edb5d..901fe2647d3 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -840,10 +840,8 @@ ReplicatedRecoveryBackend::_handle_pull_response( if (pull_info.recovery_info.version == eversion_t()) pull_info.recovery_info.version = push_op.version; - auto prepare_waiter = interruptor::make_interruptible( - seastar::make_ready_future<>()); if (pull_info.recovery_progress.first) { - prepare_waiter = pg.obc_loader.with_obc( + auto fut = pg.obc_loader.with_obc( pull_info.recovery_info.soid, [FNAME, this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) { pull_info.obc = obc; @@ -874,55 +872,50 @@ ReplicatedRecoveryBackend::_handle_pull_response( } return crimson::osd::PG::load_obc_ertr::now(); }, false).handle_error_interruptible(crimson::ct_error::assert_all{}); + co_await std::move(fut); }; - return prepare_waiter.then_interruptible( - [FNAME, this, &pull_info, &push_op, t, response]() mutable { - const bool first = pull_info.recovery_progress.first; - pull_info.recovery_progress = push_op.after_progress; - DEBUGDPP("new recovery_info {}, new progress {}", - pg, pull_info.recovery_info, pull_info.recovery_progress); - interval_set data_zeros; - { - uint64_t offset = push_op.before_progress.data_recovered_to; - uint64_t length = (push_op.after_progress.data_recovered_to - - push_op.before_progress.data_recovered_to); - if (length) { - data_zeros.insert(offset, length); - } + + const bool first = pull_info.recovery_progress.first; + pull_info.recovery_progress = push_op.after_progress; + DEBUGDPP("new recovery_info {}, new progress {}", + pg, pull_info.recovery_info, pull_info.recovery_progress); + interval_set data_zeros; + { + uint64_t offset = push_op.before_progress.data_recovered_to; + uint64_t length = (push_op.after_progress.data_recovered_to - + push_op.before_progress.data_recovered_to); + if (length) { + data_zeros.insert(offset, length); } - auto [usable_intervals, data] = - trim_pushed_data(pull_info.recovery_info.copy_subset, - push_op.data_included, push_op.data); - bool complete = pull_info.is_complete(); - bool clear_omap = !push_op.before_progress.omap_complete; - return submit_push_data(pull_info.recovery_info, - first, complete, clear_omap, + } + auto [usable_intervals, data] = + trim_pushed_data(pull_info.recovery_info.copy_subset, + push_op.data_included, push_op.data); + bool complete = pull_info.is_complete(); + bool clear_omap = !push_op.before_progress.omap_complete; + co_await submit_push_data(pull_info.recovery_info, + first, complete, clear_omap, std::move(data_zeros), std::move(usable_intervals), std::move(data), std::move(push_op.omap_header), - push_op.attrset, std::move(push_op.omap_entries), t) - .then_interruptible( - [this, response, &pull_info, &push_op, complete, - t, bytes_recovered=data.length()]() - -> RecoveryBackend::interruptible_future { - pull_info.stat.num_keys_recovered += push_op.omap_entries.size(); - pull_info.stat.num_bytes_recovered += bytes_recovered; + push_op.attrset, std::move(push_op.omap_entries), t); - if (complete) { - pull_info.stat.num_objects_recovered++; - return pg.get_recovery_handler()->on_local_recover( - push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info, - false, *t - ).then_interruptible([] { - return true; - }); - } else { - response->soid = push_op.soid; - response->recovery_info = pull_info.recovery_info; - response->recovery_progress = pull_info.recovery_progress; - return seastar::make_ready_future(false); - } - }); - }); + const auto bytes_recovered = data.length(); + pull_info.stat.num_keys_recovered += push_op.omap_entries.size(); + pull_info.stat.num_bytes_recovered += bytes_recovered; + + if (complete) { + pull_info.stat.num_objects_recovered++; + co_await pg.get_recovery_handler()->on_local_recover( + push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info, + false, *t + ); + co_return true; + } else { + response->soid = push_op.soid; + response->recovery_info = pull_info.recovery_info; + response->recovery_progress = pull_info.recovery_progress; + co_return false; + } } void ReplicatedRecoveryBackend::recalc_subsets( -- 2.39.5