From: Samuel Just Date: Mon, 5 Feb 2024 00:04:49 +0000 (-0800) Subject: crimson/.../client_request: factor out with_pg_interruptible X-Git-Tag: testing/wip-batrick-testing-20240411.154038~54^2~7 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=b521cc3d822c39084f72c4f22f621928a184d731;p=ceph-ci.git crimson/.../client_request: factor out with_pg_interruptible Signed-off-by: Samuel Just --- diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 00ed92adf65..9b9de50c149 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -98,7 +98,72 @@ bool ClientRequest::is_pg_op() const [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); } -seastar::future<> ClientRequest::with_pg_process(Ref pgref) +ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptible( + Ref pgref, const unsigned this_instance_id, instance_handle_t &ihref) +{ + LOG_PREFIX(ClientRequest::with_pg_process); + DEBUGDPP( + "{}: same_interval_since: {}", + *pgref, *this, pgref->get_interval_start_epoch()); + + DEBUGDPP("{} start", *pgref, *this); + PG &pg = *pgref; + if (pg.can_discard_op(*m)) { + return shard_services->send_incremental_map( + std::ref(get_foreign_connection()), m->get_map_epoch() + ).then([FNAME, this, this_instance_id, pgref] { + DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); + return interruptor::now(); + }); + } + DEBUGDPP("{}.{}: entering await_map stage", + *pgref, *this, this_instance_id); + return ihref.enter_stage(client_pp(pg).await_map, *this + ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] { + DEBUGDPP("{}.{}: entered await_map stage, waiting for map", + pg, *this, this_instance_id); + return ihref.enter_blocker( + *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map, + m->get_min_epoch(), nullptr); + }).then_interruptible( + [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) { + DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active", + pg, *this, this_instance_id, map_epoch); + return ihref.enter_stage(client_pp(pg).wait_for_active, *this); + }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() { + DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active", + pg, *this, this_instance_id); + return ihref.enter_blocker( + *this, + pg.wait_for_active_blocker, + &decltype(pg.wait_for_active_blocker)::wait); + }).then_interruptible( + [FNAME, this, pgref, this_instance_id, &ihref]() mutable + -> interruptible_future<> { + DEBUGDPP("{}.{}: pg active, entering process[_pg]_op", + *pgref, *this, this_instance_id); + if (is_pg_op()) { + return process_pg_op(pgref); + } else { + return process_op(ihref, pgref, this_instance_id); + } + }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] { + DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle", + *pgref, *this, this_instance_id); + return ihref.handle.complete(); + }).then_interruptible([FNAME, this, this_instance_id, pgref] { + DEBUGDPP("{}.{}: process[_pg]_op complete," + "removing request from orderer", + *pgref, *this, this_instance_id); + pgref->client_request_orderer.remove_request(*this); + complete_request(); + }); +} + +seastar::future<> ClientRequest::with_pg_process( + Ref pgref) { ceph_assert_always(shard_services); LOG_PREFIX(ClientRequest::with_pg_process); @@ -110,61 +175,8 @@ seastar::future<> ClientRequest::with_pg_process(Ref pgref) auto instance_handle = get_instance_handle(); auto &ihref = *instance_handle; return interruptor::with_interruption( - [FNAME, this, pgref, this_instance_id, &ihref]() mutable { - DEBUGDPP("{} start", *pgref, *this); - PG &pg = *pgref; - if (pg.can_discard_op(*m)) { - return shard_services->send_incremental_map( - std::ref(get_foreign_connection()), m->get_map_epoch() - ).then([FNAME, this, this_instance_id, pgref] { - DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id); - pgref->client_request_orderer.remove_request(*this); - complete_request(); - return interruptor::now(); - }); - } - DEBUGDPP("{}.{}: entering await_map stage", - *pgref, *this, this_instance_id); - return ihref.enter_stage(client_pp(pg).await_map, *this - ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] { - DEBUGDPP("{}.{}: entered await_map stage, waiting for map", - pg, *this, this_instance_id); - return ihref.enter_blocker( - *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map, - m->get_min_epoch(), nullptr); - }).then_interruptible( - [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) { - DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active", - pg, *this, this_instance_id, map_epoch); - return ihref.enter_stage(client_pp(pg).wait_for_active, *this); - }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() { - DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active", - pg, *this, this_instance_id); - return ihref.enter_blocker( - *this, - pg.wait_for_active_blocker, - &decltype(pg.wait_for_active_blocker)::wait); - }).then_interruptible( - [FNAME, this, pgref, this_instance_id, &ihref]() mutable - -> interruptible_future<> { - DEBUGDPP("{}.{}: pg active, entering process[_pg]_op", - *pgref, *this, this_instance_id); - if (is_pg_op()) { - return process_pg_op(pgref); - } else { - return process_op(ihref, pgref, this_instance_id); - } - }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] { - DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle", - *pgref, *this, this_instance_id); - return ihref.handle.complete(); - }).then_interruptible([FNAME, this, this_instance_id, pgref] { - DEBUGDPP("{}.{}: process[_pg]_op complete," - "removing request from orderer", - *pgref, *this, this_instance_id); - pgref->client_request_orderer.remove_request(*this); - complete_request(); - }); + [this, pgref, this_instance_id, &ihref]() mutable { + return with_pg_process_interruptible(pgref, this_instance_id, ihref); }, [FNAME, this, this_instance_id, pgref](std::exception_ptr eptr) { DEBUGDPP("{}.{}: interrupted due to {}", *pgref, *this, this_instance_id, eptr); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 3bc81fd4d44..67c3c9b1228 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -253,6 +253,9 @@ public: r_conn = make_local_shared_foreign(std::move(conn)); } + interruptible_future<> with_pg_process_interruptible( + Ref pgref, const unsigned instance_id, instance_handle_t &ihref); + seastar::future<> with_pg_process(Ref pg); public: