From 904a9937b6560f9a1f936f7f79badc56d8949dd1 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 5 Feb 2024 20:46:19 +0000 Subject: [PATCH] crimson/.../client_request: convert with_pg_process_interruptible coroutine Signed-off-by: Samuel Just --- .../osd/osd_operations/client_request.cc | 86 +++++++++---------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 9b9de50c149d3..a3c895d965410 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -4,6 +4,7 @@ #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "crimson/common/coroutine.h" #include "crimson/common/exception.h" #include "crimson/common/log.h" #include "crimson/osd/pg.h" @@ -109,57 +110,52 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib DEBUGDPP("{} start", *pgref, *this); PG &pg = *pgref; if (pg.can_discard_op(*m)) { - return shard_services->send_incremental_map( - std::ref(get_foreign_connection()), m->get_map_epoch() - ).then([FNAME, this, this_instance_id, pgref] { - DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id); - pgref->client_request_orderer.remove_request(*this); - complete_request(); - return interruptor::now(); - }); + co_await interruptor::make_interruptible( + shard_services->send_incremental_map( + std::ref(get_foreign_connection()), m->get_map_epoch() + )); + DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); + co_return; } DEBUGDPP("{}.{}: entering await_map stage", *pgref, *this, this_instance_id); - return ihref.enter_stage(client_pp(pg).await_map, *this - ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] { - DEBUGDPP("{}.{}: entered await_map stage, waiting for map", - pg, *this, this_instance_id); - return ihref.enter_blocker( + co_await ihref.enter_stage(client_pp(pg).await_map, *this); + DEBUGDPP("{}.{}: entered await_map stage, waiting for map", + pg, *this, this_instance_id); + auto map_epoch = co_await interruptor::make_interruptible( + ihref.enter_blocker( *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map, - m->get_min_epoch(), nullptr); - }).then_interruptible( - [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) { - DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active", - pg, *this, this_instance_id, map_epoch); - return ihref.enter_stage(client_pp(pg).wait_for_active, *this); - }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() { - DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active", - pg, *this, this_instance_id); - return ihref.enter_blocker( + m->get_min_epoch(), nullptr)); + + DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active", + pg, *this, this_instance_id, map_epoch); + co_await ihref.enter_stage(client_pp(pg).wait_for_active, *this); + + DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active", + pg, *this, this_instance_id); + co_await interruptor::make_interruptible( + ihref.enter_blocker( *this, pg.wait_for_active_blocker, - &decltype(pg.wait_for_active_blocker)::wait); - }).then_interruptible( - [FNAME, this, pgref, this_instance_id, &ihref]() mutable - -> interruptible_future<> { - DEBUGDPP("{}.{}: pg active, entering process[_pg]_op", - *pgref, *this, this_instance_id); - if (is_pg_op()) { - return process_pg_op(pgref); - } else { - return process_op(ihref, pgref, this_instance_id); - } - }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] { - DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle", - *pgref, *this, this_instance_id); - return ihref.handle.complete(); - }).then_interruptible([FNAME, this, this_instance_id, pgref] { - DEBUGDPP("{}.{}: process[_pg]_op complete," - "removing request from orderer", - *pgref, *this, this_instance_id); - pgref->client_request_orderer.remove_request(*this); - complete_request(); - }); + &decltype(pg.wait_for_active_blocker)::wait)); + + DEBUGDPP("{}.{}: pg active, entering process[_pg]_op", + *pgref, *this, this_instance_id); + + co_await (is_pg_op() ? process_pg_op(pgref) : + process_op(ihref, pgref, this_instance_id)); + + DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle", + *pgref, *this, this_instance_id); + co_await interruptor::make_interruptible(ihref.handle.complete()); + + DEBUGDPP("{}.{}: process[_pg]_op complete," + "removing request from orderer", + *pgref, *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); } seastar::future<> ClientRequest::with_pg_process( -- 2.39.5