From: Samuel Just Date: Fri, 29 Apr 2022 22:37:57 +0000 (+0000) Subject: crimson/osd/osd_operations/client_request: refactor for OSD::start_pg_operation X-Git-Tag: v18.0.0~914^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=580b5e24c6ad7f9cbf36fecd4be6af2c7ad053e2;p=ceph-ci.git crimson/osd/osd_operations/client_request: refactor for OSD::start_pg_operation Signed-off-by: Samuel Just --- diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index c73920c2ce7..6e1d1bf5191 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -1198,7 +1198,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first, seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn, Ref m) { - (void) shard_services.start_operation( + (void) start_pg_operation( *this, conn, std::move(m)); diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index b6ac8604046..bc0c3d94397 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -49,6 +49,11 @@ void ClientRequest::dump_detail(Formatter *f) const }, tracking_events); } +ConnectionPipeline &ClientRequest::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).client_request_conn_pipeline; +} + ConnectionPipeline &ClientRequest::cp() { return get_osd_priv(conn.get()).client_request_conn_pipeline; @@ -72,105 +77,98 @@ bool ClientRequest::is_pg_op() const [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); } -template -ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func) +seastar::future ClientRequest::with_pg_int( + ShardServices &shard_services, Ref pgref) { - return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward(func)); + epoch_t same_interval_since = pgref->get_interval_start_epoch(); + logger().debug("{} same_interval_since: {}", *this, same_interval_since); + if (m->finish_decode()) { + m->clear_payload(); + } + return interruptor::with_interruption( + [this, &shard_services, pgref]() mutable { + return sequencer.start_op( + *this, handle, shard_services.registry, + interruptor::wrap_function([pgref, this, &shard_services] { + PG &pg = *pgref; + if (pg.can_discard_op(*m)) { + return osd.send_incremental_map( + conn, m->get_map_epoch() + ).then([this, &shard_services] { + sequencer.finish_op_out_of_order(*this, shard_services.registry); + return interruptor::now(); + }); + } + return enter_stage(pp(pg).await_map + ).then_interruptible([this, &pg] { + return with_blocking_event< + PG_OSDMapGate::OSDMapBlocker::BlockingEvent + >([this, &pg] (auto&& trigger) { + return pg.osdmap_gate.wait_for_map(std::move(trigger), + m->get_min_epoch()); + }); + }).then_interruptible([this, &pg](auto map) { + return enter_stage(pp(pg).wait_for_active); + }).then_interruptible([this, &pg]() { + return with_blocking_event< + PGActivationBlocker::BlockingEvent + >([&pg] (auto&& trigger) { + return pg.wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible( + [this, &shard_services, pgref=std::move(pgref)]() mutable { + if (is_pg_op()) { + return process_pg_op( + pgref + ).then_interruptible([this, &shard_services] { + sequencer.finish_op_out_of_order(*this, shard_services.registry); + }); + } else { + return process_op( + pgref + ).then_interruptible([this, &shard_services](const seq_mode_t mode) { + if (mode == seq_mode_t::IN_ORDER) { + sequencer.finish_op_in_order(*this); + } else { + assert(mode == seq_mode_t::OUT_OF_ORDER); + sequencer.finish_op_out_of_order(*this, shard_services.registry); + } + }); + } + }); + })).then_interruptible([] { + return seastar::stop_iteration::yes; + }); + }, [this, pgref](std::exception_ptr eptr) { + if (should_abort_request(*this, std::move(eptr))) { + sequencer.abort(); + return seastar::stop_iteration::yes; + } else { + sequencer.maybe_reset(*this); + return seastar::stop_iteration::no; + } + }, pgref); } -seastar::future<> ClientRequest::start() +seastar::future<> ClientRequest::with_pg( + ShardServices &shard_services, Ref pgref) { - logger().debug("{}: start", *this); - - track_event(); - return seastar::repeat([this, opref=IRef{this}]() mutable { - logger().debug("{}: in repeat", *this); - return enter_stage<>(cp().await_map).then([this]() { - return with_blocking_event( - [this](auto&& trigger) { - return osd.osdmap_gate.wait_for_map(std::move(trigger), - m->get_min_epoch()); - }); - }).then([this](epoch_t epoch) { - return enter_stage<>(cp().get_pg); - }).then([this] { - return with_blocking_event( - [this] (auto&& trigger) { - return osd.wait_for_pg(std::move(trigger), m->get_spg()); - }); - }).then([this](Ref pgref) mutable { - return interruptor::with_interruption([this, pgref]() mutable { - epoch_t same_interval_since = pgref->get_interval_start_epoch(); - logger().debug("{} same_interval_since: {}", *this, same_interval_since); - if (m->finish_decode()) { - m->clear_payload(); - } - return with_sequencer(interruptor::wrap_function([pgref, this] { - PG &pg = *pgref; - if (pg.can_discard_op(*m)) { - return osd.send_incremental_map( - conn, m->get_map_epoch()).then([this] { - sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry); - return interruptor::now(); - }); - } - return enter_stage( - pp(pg).await_map - ).then_interruptible([this, &pg] { - return with_blocking_event( - [this, &pg] (auto&& trigger) { - return pg.osdmap_gate.wait_for_map(std::move(trigger), - m->get_min_epoch()); - }); - }).then_interruptible([this, &pg](auto&&) { - return enter_stage<>(pp(pg).wait_for_active); - }).then_interruptible([this, &pg]() { - return with_blocking_event( - [&pg] (auto&& trigger) { - return pg.wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this, pgref=std::move(pgref)]() mutable { - if (is_pg_op()) { - return process_pg_op(pgref).then_interruptible([this] { - sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry); - }); - } else { - return process_op(pgref).then_interruptible([this] (const seq_mode_t mode) { - if (mode == seq_mode_t::IN_ORDER) { - sequencer.finish_op_in_order(*this); - } else { - assert(mode == seq_mode_t::OUT_OF_ORDER); - sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry); - } - }); - } - }); - })).then_interruptible([pgref] { - return seastar::stop_iteration::yes; - }); - }, [this, pgref](std::exception_ptr eptr) { - if (should_abort_request(*this, std::move(eptr))) { - sequencer.abort(); - return seastar::stop_iteration::yes; - } else { - sequencer.maybe_reset(*this); - return seastar::stop_iteration::no; - } - }, pgref); - }); - }).then([this] { - track_event(); - }); + return seastar::repeat([this, &shard_services, pgref]() mutable { + return with_pg_int(shard_services, pgref); + }).then([this] { + track_event(); + }); } ClientRequest::interruptible_future<> ClientRequest::process_pg_op( Ref &pg) { - return pg->do_pg_ops(m) - .then_interruptible([this, pg=std::move(pg)](MURef reply) { - return conn->send(std::move(reply)); - }); + return pg->do_pg_ops( + m + ).then_interruptible([this, pg=std::move(pg)](MURef reply) { + return conn->send(std::move(reply)); + }); } ClientRequest::interruptible_future diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index a081f460955..21e09bc57cc 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -18,6 +18,7 @@ namespace crimson::osd { class PG; class OSD; +class ShardServices; class ClientRequest final : public PhasedOperationT, private CommonClientRequest { @@ -49,10 +50,21 @@ public: void print(std::ostream &) const final; void dump_detail(Formatter *f) const final; + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return m->get_spg(); + } + ConnectionPipeline &get_connection_pipeline(); + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return m->get_min_epoch(); } + + seastar::future with_pg_int( + ShardServices &shard_services, Ref pg); public: - seastar::future<> start(); bool same_session_and_pg(const ClientRequest& other_op) const; + seastar::future<> with_pg( + ShardServices &shard_services, Ref pgref); private: template interruptible_future<> with_sequencer(FuncT&& func);