From c5591f5cd84c0d89da377bfcebacbd63440b5983 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 15 Jan 2025 22:34:09 +0000 Subject: [PATCH] crimson/.../replicated_recovery_backend: convert handle_pull_response to coroutine Signed-off-by: Samuel Just --- .../osd/replicated_recovery_backend.cc | 75 +++++++++---------- 1 file changed, 35 insertions(+), 40 deletions(-) diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 901fe2647d3..01cb16bc7fa 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -936,54 +936,49 @@ ReplicatedRecoveryBackend::handle_pull_response( LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull_response); if (pg.can_discard_replica_op(*m)) { DEBUGDPP("discarding {}", pg, *m); - return seastar::now(); + co_return; } - const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now. + PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now. if (push_op.version == eversion_t()) { // replica doesn't have it! pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info.version); - return seastar::make_exception_future<>( - std::runtime_error(fmt::format( - "Error on pushing side {} when pulling obj {}", - m->from, push_op.soid))); + throw std::runtime_error( + fmt::format( + "Error on pushing side {} when pulling obj {}", + m->from, push_op.soid)); } DEBUGDPP("{}", pg, *m); - return seastar::do_with(PullOp(), [this, m](auto& response) { - return seastar::do_with(ceph::os::Transaction(), m.get(), - [FNAME, this, &response](auto& t, auto& m) { - pg_shard_t from = m->from; - PushOp& push_op = m->pushes[0]; // only one push per message for now - return _handle_pull_response(from, push_op, &response, &t - ).then_interruptible( - [FNAME, this, &t](bool complete) { - epoch_t epoch_frozen = pg.get_osdmap_epoch(); - DEBUGDPP("submitting transaction", pg); - return shard_services.get_store().do_transaction(coll, std::move(t)) - .then([this, epoch_frozen, complete, - last_complete = pg.get_info().last_complete] { - pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); - return seastar::make_ready_future(complete); - }); - }); - }).then_interruptible([this, m, &response](bool complete) { - if (complete) { - auto& push_op = m->pushes[0]; - get_recovering(push_op.soid).set_pulled(); - return seastar::make_ready_future<>(); - } else { - auto reply = crimson::make_message(); - reply->from = pg.get_pg_whoami(); - reply->set_priority(m->get_priority()); - reply->pgid = pg.get_info().pgid; - reply->map_epoch = m->map_epoch; - reply->min_epoch = m->min_epoch; - reply->set_pulls({std::move(response)}); - return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); - } - }); - }); + PullOp response; + ceph::os::Transaction t; + + pg_shard_t from = m->from; + + const bool complete = co_await _handle_pull_response( + from, push_op, &response, &t); + + epoch_t epoch_frozen = pg.get_osdmap_epoch(); + DEBUGDPP("submitting transaction", pg); + co_await interruptor::make_interruptible( + shard_services.get_store().do_transaction(coll, std::move(t))); + pg.get_recovery_handler()->_committed_pushed_object( + epoch_frozen, pg.get_info().last_complete); + + if (complete) { + get_recovering(push_op.soid).set_pulled(); + } else { + auto reply = crimson::make_message(); + reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = pg.get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + reply->set_pulls({std::move(response)}); + co_await interruptor::make_interruptible( + shard_services.send_to_osd( + m->from.osd, std::move(reply), pg.get_osdmap_epoch())); + } } RecoveryBackend::interruptible_future<> -- 2.39.5