From 6f29657c3d1ea991e4c39f2ff694e490fdb395d8 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 13 Dec 2024 19:26:47 -0800 Subject: [PATCH] crimson/.../replicated_request: split discard check and reply send out of PG::handle_rep_op Signed-off-by: Samuel Just --- .../osd/osd_operations/replicated_request.cc | 14 +++++++++++++- src/crimson/osd/pg.cc | 13 ++++--------- src/crimson/osd/pg.h | 8 +++++++- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc index 4768b83e27c..0eb1a270db2 100644 --- a/src/crimson/osd/osd_operations/replicated_request.cc +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -76,7 +76,19 @@ RepRequest::interruptible_future<> RepRequest::with_pg_interruptible( return pg->osdmap_gate.wait_for_map( std::move(trigger), req->min_epoch); })); - co_await pg->handle_rep_op(req); + + if (pg->can_discard_replica_op(*req)) { + co_return; + } + + auto [commit_fut, reply] = co_await pg->handle_rep_op(req); + + co_await std::move(commit_fut); + + co_await interruptor::make_interruptible( + pg->shard_services.send_to_osd( + req->from.osd, std::move(reply), pg->get_osdmap_epoch()) + ); } seastar::future<> RepRequest::with_pg( diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index d812d822550..bf521498abf 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1215,13 +1215,10 @@ void PG::update_stats(const pg_stat_t &stat) { ); } -PG::interruptible_future<> PG::handle_rep_op(Ref req) +PG::handle_rep_op_fut PG::handle_rep_op(Ref req) { LOG_PREFIX(PG::handle_rep_op); DEBUGDPP("{}", *this, *req); - if (can_discard_replica_op(*req)) { - co_return; - } ceph::os::Transaction txn; auto encoded_txn = req->get_data().cbegin(); @@ -1243,7 +1240,8 @@ PG::interruptible_future<> PG::handle_rep_op(Ref req) txn, false); DEBUGDPP("{} do_transaction", *this, *req); - co_await interruptor::make_interruptible( + + auto commit_fut = interruptor::make_interruptible( shard_services.get_store().do_transaction(coll_ref, std::move(txn)) ); @@ -1254,10 +1252,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref req) req.get(), pg_whoami, 0, map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK); reply->set_last_complete_ondisk(lcod); - co_await interruptor::make_interruptible( - shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch) - ); - co_return; + co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply)); } PG::interruptible_future<> PG::update_snap_map( diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index d87d0b2d0e9..6db73ee835b 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -596,7 +596,13 @@ public: using with_obc_func_t = std::function (ObjectContextRef, ObjectContextRef)>; - interruptible_future<> handle_rep_op(Ref m); + using handle_rep_op_ret = std::tuple< + interruptible_future<>, // resolves upon commit + MURef // reply message + >; + // outer future resolves upon submission + using handle_rep_op_fut = interruptible_future; + handle_rep_op_fut handle_rep_op(Ref m); void update_stats(const pg_stat_t &stat); interruptible_future<> update_snap_map( const std::vector &log_entries, -- 2.39.5