From: Yingxin Cheng Date: Mon, 25 Sep 2023 03:08:22 +0000 (+0800) Subject: crimson/osd: split ConnectionPipeline::get_pg into 2 phases X-Git-Tag: v19.0.0~141^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c3c29c50035d3d5d0fd67a94c7cf80578aec083e;p=ceph.git crimson/osd: split ConnectionPipeline::get_pg into 2 phases Split the cross-core phase into 2 independent core-local phases, and preserve the ordering using sequential ID instead. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h index 99f394b1e83f..2d2a459017bb 100644 --- a/src/crimson/osd/osd_connection_priv.h +++ b/src/crimson/osd/osd_connection_priv.h @@ -3,6 +3,8 @@ #pragma once +#include + #include "crimson/net/Connection.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request.h" @@ -11,10 +13,79 @@ namespace crimson::osd { +/** + * crosscore_ordering_t + * + * To preserve the event order from 1 source to n target cores. + */ +class crosscore_ordering_t { +public: + using seq_t = uint64_t; + + crosscore_ordering_t() + : out_seqs(seastar::smp::count, 0), + in_controls(seastar::smp::count) {} + + ~crosscore_ordering_t() = default; + + // Called by the original core to get the ordering sequence + seq_t prepare_submit(core_id_t target_core) { + auto &out_seq = out_seqs[target_core]; + ++out_seq; + return out_seq; + } + + /* + * Called by the target core to preserve the ordering + */ + + seq_t get_in_seq() const { + auto core = seastar::this_shard_id(); + return in_controls[core].seq; + } + + bool proceed_or_wait(seq_t seq) { + auto core = seastar::this_shard_id(); + auto &in_control = in_controls[core]; + if (seq == in_control.seq + 1) { + ++in_control.seq; + if (unlikely(in_control.pr_wait.has_value())) { + in_control.pr_wait->set_value(); + in_control.pr_wait = std::nullopt; + } + return true; + } else { + return false; + } + } + + seastar::future<> wait(seq_t seq) { + auto core = seastar::this_shard_id(); + auto &in_control = in_controls[core]; + assert(seq != in_control.seq + 1); + if (!in_control.pr_wait.has_value()) { + in_control.pr_wait = seastar::shared_promise<>(); + } + return in_control.pr_wait->get_shared_future(); + } + +private: + struct in_control_t { + seq_t seq = 0; + std::optional> pr_wait; + }; + + // source-side + std::vector out_seqs; + // target-side + std::vector in_controls; +}; + struct OSDConnectionPriv : public crimson::net::Connection::user_private_t { ConnectionPipeline client_request_conn_pipeline; ConnectionPipeline peering_request_conn_pipeline; ConnectionPipeline replicated_request_conn_pipeline; + crosscore_ordering_t crosscore_ordering; }; static inline OSDConnectionPriv &get_osd_priv(crimson::net::Connection *conn) { diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 8ef44ee9e789..7174143fe01e 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -27,10 +27,17 @@ struct ConnectionPipeline { "ConnectionPipeline::await_map"; } await_map; - struct GetPG : OrderedExclusivePhaseT { + struct GetPGMapping : OrderedExclusivePhaseT { static constexpr auto type_name = - "ConnectionPipeline::get_pg"; - } get_pg; + "ConnectionPipeline::get_pg_mapping"; + } get_pg_mapping; +}; + +struct PerShardPipeline { + struct CreateOrWaitPG : OrderedExclusivePhaseT { + static constexpr auto type_name = + "PerShardPipeline::create_or_wait_pg"; + } create_or_wait_pg; }; enum class OperationTypeCode { diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index 4b6dbf4b7100..d5e2ed453284 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -22,7 +22,8 @@ struct LttngBackend : ClientRequest::StartEvent::Backend, ConnectionPipeline::AwaitActive::BlockingEvent::Backend, ConnectionPipeline::AwaitMap::BlockingEvent::Backend, - ConnectionPipeline::GetPG::BlockingEvent::Backend, + ConnectionPipeline::GetPGMapping::BlockingEvent::Backend, + PerShardPipeline::CreateOrWaitPG::BlockingEvent::Backend, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend, PGMap::PGCreationBlockingEvent::Backend, ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend, @@ -55,9 +56,14 @@ struct LttngBackend const OSD_OSDMapGate::OSDMapBlocker&) override { } - void handle(ConnectionPipeline::GetPG::BlockingEvent& ev, + void handle(ConnectionPipeline::GetPGMapping::BlockingEvent& ev, const Operation& op, - const ConnectionPipeline::GetPG& blocker) override { + const ConnectionPipeline::GetPGMapping& blocker) override { + } + + void handle(PerShardPipeline::CreateOrWaitPG::BlockingEvent& ev, + const Operation& op, + const PerShardPipeline::CreateOrWaitPG& blocker) override { } void handle(PGMap::PGCreationBlockingEvent&, @@ -122,7 +128,8 @@ struct HistoricBackend : ClientRequest::StartEvent::Backend, ConnectionPipeline::AwaitActive::BlockingEvent::Backend, ConnectionPipeline::AwaitMap::BlockingEvent::Backend, - ConnectionPipeline::GetPG::BlockingEvent::Backend, + ConnectionPipeline::GetPGMapping::BlockingEvent::Backend, + PerShardPipeline::CreateOrWaitPG::BlockingEvent::Backend, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend, PGMap::PGCreationBlockingEvent::Backend, ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend, @@ -155,9 +162,14 @@ struct HistoricBackend const OSD_OSDMapGate::OSDMapBlocker&) override { } - void handle(ConnectionPipeline::GetPG::BlockingEvent& ev, + void handle(ConnectionPipeline::GetPGMapping::BlockingEvent& ev, + const Operation& op, + const ConnectionPipeline::GetPGMapping& blocker) override { + } + + void handle(PerShardPipeline::CreateOrWaitPG::BlockingEvent& ev, const Operation& op, - const ConnectionPipeline::GetPG& blocker) override { + const PerShardPipeline::CreateOrWaitPG& blocker) override { } void handle(PGMap::PGCreationBlockingEvent&, diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index f01f0c491f1a..287072642953 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -81,6 +81,12 @@ ConnectionPipeline &ClientRequest::get_connection_pipeline() return get_osd_priv(conn.get()).client_request_conn_pipeline; } +PerShardPipeline &ClientRequest::get_pershard_pipeline( + ShardServices &shard_services) +{ + return shard_services.get_client_request_pipeline(); +} + ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg) { return pg.request_pg_pipeline; diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index b2dce1e873e1..d534fd6ac4fa 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -81,7 +81,8 @@ public: ConnectionPipeline::AwaitActive::BlockingEvent, ConnectionPipeline::AwaitMap::BlockingEvent, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, - ConnectionPipeline::GetPG::BlockingEvent, + ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, PGMap::PGCreationBlockingEvent, CompletionEvent > tracking_events; @@ -206,6 +207,14 @@ public: epoch_t get_epoch() const { return m->get_min_epoch(); } ConnectionPipeline &get_connection_pipeline(); + + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_connection() { + assert(conn); + return *conn; + }; + seastar::future prepare_remote_submission() { assert(conn); return conn.get_foreign( diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc index ee83977cd8a2..7c8e1d7e499a 100644 --- a/src/crimson/osd/osd_operations/logmissing_request.cc +++ b/src/crimson/osd/osd_operations/logmissing_request.cc @@ -49,6 +49,12 @@ ConnectionPipeline &LogMissingRequest::get_connection_pipeline() return get_osd_priv(conn.get()).replicated_request_conn_pipeline; } +PerShardPipeline &LogMissingRequest::get_pershard_pipeline( + ShardServices &shard_services) +{ + return shard_services.get_replicated_request_pipeline(); +} + ClientRequest::PGPipeline &LogMissingRequest::client_pp(PG &pg) { return pg.request_pg_pipeline; diff --git a/src/crimson/osd/osd_operations/logmissing_request.h b/src/crimson/osd/osd_operations/logmissing_request.h index 71d0816fd201..5b01fee17b86 100644 --- a/src/crimson/osd/osd_operations/logmissing_request.h +++ b/src/crimson/osd/osd_operations/logmissing_request.h @@ -38,6 +38,14 @@ public: epoch_t get_epoch() const { return req->get_min_epoch(); } ConnectionPipeline &get_connection_pipeline(); + + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_connection() { + assert(conn); + return *conn; + }; + seastar::future prepare_remote_submission() { assert(conn); return conn.get_foreign( @@ -58,7 +66,8 @@ public: StartEvent, ConnectionPipeline::AwaitActive::BlockingEvent, ConnectionPipeline::AwaitMap::BlockingEvent, - ConnectionPipeline::GetPG::BlockingEvent, + ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, ClientRequest::PGPipeline::AwaitMap::BlockingEvent, PG_OSDMapGate::OSDMapBlocker::BlockingEvent, PGMap::PGCreationBlockingEvent, diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.cc b/src/crimson/osd/osd_operations/logmissing_request_reply.cc index 16e61ab4a985..5cfe5b215307 100644 --- a/src/crimson/osd/osd_operations/logmissing_request_reply.cc +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.cc @@ -49,6 +49,12 @@ ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline() return get_osd_priv(conn.get()).replicated_request_conn_pipeline; } +PerShardPipeline &LogMissingRequestReply::get_pershard_pipeline( + ShardServices &shard_services) +{ + return shard_services.get_replicated_request_pipeline(); +} + ClientRequest::PGPipeline &LogMissingRequestReply::client_pp(PG &pg) { return pg.request_pg_pipeline; diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.h b/src/crimson/osd/osd_operations/logmissing_request_reply.h index c89131fec1d7..b01cae15421d 100644 --- a/src/crimson/osd/osd_operations/logmissing_request_reply.h +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.h @@ -38,6 +38,14 @@ public: epoch_t get_epoch() const { return req->get_min_epoch(); } ConnectionPipeline &get_connection_pipeline(); + + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_connection() { + assert(conn); + return *conn; + }; + seastar::future prepare_remote_submission() { assert(conn); return conn.get_foreign( @@ -58,7 +66,8 @@ public: StartEvent, ConnectionPipeline::AwaitActive::BlockingEvent, ConnectionPipeline::AwaitMap::BlockingEvent, - ConnectionPipeline::GetPG::BlockingEvent, + ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, PGMap::PGCreationBlockingEvent, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent > tracking_events; diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index 0712147ab2b7..9139e337f80a 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -128,6 +128,12 @@ ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline() return get_osd_priv(conn.get()).peering_request_conn_pipeline; } +PerShardPipeline &RemotePeeringEvent::get_pershard_pipeline( + ShardServices &shard_services) +{ + return shard_services.get_peering_request_pipeline(); +} + void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services) { if (auto& e = get_event().get_event(); diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index a780a26768ef..6bbfe6c91174 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -131,7 +131,8 @@ public: ConnectionPipeline::AwaitActive::BlockingEvent, ConnectionPipeline::AwaitMap::BlockingEvent, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, - ConnectionPipeline::GetPG::BlockingEvent, + ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, PGMap::PGCreationBlockingEvent, PGPeeringPipeline::AwaitMap::BlockingEvent, PG_OSDMapGate::OSDMapBlocker::BlockingEvent, @@ -148,6 +149,14 @@ public: epoch_t get_epoch() const { return evt.get_epoch_sent(); } ConnectionPipeline &get_connection_pipeline(); + + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_connection() { + assert(conn); + return *conn; + }; + seastar::future prepare_remote_submission() { assert(conn); return conn.get_foreign( diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc index dd310d8d7274..2e939880cbea 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.cc +++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc @@ -49,4 +49,10 @@ ConnectionPipeline &RecoverySubRequest::get_connection_pipeline() return get_osd_priv(conn.get()).peering_request_conn_pipeline; } +PerShardPipeline &RecoverySubRequest::get_pershard_pipeline( + ShardServices &shard_services) +{ + return shard_services.get_peering_request_pipeline(); +} + } diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.h b/src/crimson/osd/osd_operations/recovery_subrequest.h index 07c7c95b5e0f..31e6045cb0ee 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.h +++ b/src/crimson/osd/osd_operations/recovery_subrequest.h @@ -41,6 +41,14 @@ public: epoch_t get_epoch() const { return m->get_min_epoch(); } ConnectionPipeline &get_connection_pipeline(); + + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_connection() { + assert(conn); + return *conn; + }; + seastar::future prepare_remote_submission() { assert(conn); return conn.get_foreign( @@ -61,7 +69,8 @@ public: StartEvent, ConnectionPipeline::AwaitActive::BlockingEvent, ConnectionPipeline::AwaitMap::BlockingEvent, - ConnectionPipeline::GetPG::BlockingEvent, + ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, PGMap::PGCreationBlockingEvent, OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, CompletionEvent diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc index 7e16b2ebd06a..7b8592b1e02d 100644 --- a/src/crimson/osd/osd_operations/replicated_request.cc +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -49,6 +49,12 @@ ConnectionPipeline &RepRequest::get_connection_pipeline() return get_osd_priv(conn.get()).replicated_request_conn_pipeline; } +PerShardPipeline &RepRequest::get_pershard_pipeline( + ShardServices &shard_services) +{ + return shard_services.get_replicated_request_pipeline(); +} + ClientRequest::PGPipeline &RepRequest::client_pp(PG &pg) { return pg.request_pg_pipeline; diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h index c742888d9390..32cf271788ba 100644 --- a/src/crimson/osd/osd_operations/replicated_request.h +++ b/src/crimson/osd/osd_operations/replicated_request.h @@ -38,6 +38,14 @@ public: epoch_t get_epoch() const { return req->get_min_epoch(); } ConnectionPipeline &get_connection_pipeline(); + + PerShardPipeline &get_pershard_pipeline(ShardServices &); + + crimson::net::Connection &get_connection() { + assert(conn); + return *conn; + }; + seastar::future prepare_remote_submission() { assert(conn); return conn.get_foreign( @@ -58,7 +66,8 @@ public: StartEvent, ConnectionPipeline::AwaitActive::BlockingEvent, ConnectionPipeline::AwaitMap::BlockingEvent, - ConnectionPipeline::GetPG::BlockingEvent, + ConnectionPipeline::GetPGMapping::BlockingEvent, + PerShardPipeline::CreateOrWaitPG::BlockingEvent, ClientRequest::PGPipeline::AwaitMap::BlockingEvent, PG_OSDMapGate::OSDMapBlocker::BlockingEvent, PGMap::PGCreationBlockingEvent, diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index e080dc43e4ad..cf13cb52bbf7 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -7,6 +7,7 @@ #include #include +#include "crimson/osd/osd_connection_priv.h" #include "crimson/osd/shard_services.h" #include "crimson/osd/pg_map.h" @@ -148,6 +149,31 @@ public: }); } + template + auto process_ordered_op_remotely( + crosscore_ordering_t::seq_t cc_seq, + ShardServices &target_shard_services, + typename T::IRef &&op, + F &&f) { + auto &crosscore_ordering = get_osd_priv(&op->get_connection()).crosscore_ordering; + if (crosscore_ordering.proceed_or_wait(cc_seq)) { + return std::invoke( + std::move(f), + target_shard_services, + std::move(op)); + } else { + auto &logger = crimson::get_logger(ceph_subsys_osd); + logger.debug("{} got {} at the remote pg, wait at {}", + *op, cc_seq, crosscore_ordering.get_in_seq()); + return crosscore_ordering.wait(cc_seq + ).then([this, cc_seq, &target_shard_services, + op=std::move(op), f=std::move(f)]() mutable { + return this->template process_ordered_op_remotely( + cc_seq, target_shard_services, std::move(op), std::move(f)); + }); + } + } + template auto with_remote_shard_state_and_op( core_id_t core, @@ -161,21 +187,28 @@ public: target_shard_services, std::move(op)); } + // Note: the ordering in only preserved until f is invoked. auto &opref = *op; - get_local_state().registry.remove_from_registry(opref); - return opref.prepare_remote_submission( - ).then([op=std::move(op), f=std::move(f), this, core - ](auto f_conn) mutable { + auto &crosscore_ordering = get_osd_priv(&opref.get_connection()).crosscore_ordering; + auto cc_seq = crosscore_ordering.prepare_submit(core); + auto &logger = crimson::get_logger(ceph_subsys_osd); + logger.debug("{}: send {} to the remote pg core {}", + opref, cc_seq, core); + return opref.get_handle().complete( + ).then([&opref, this] { + get_local_state().registry.remove_from_registry(opref); + return opref.prepare_remote_submission(); + }).then([op=std::move(op), f=std::move(f), this, core, cc_seq + ](auto f_conn) mutable { return shard_services.invoke_on( core, - [f=std::move(f), op=std::move(op), f_conn=std::move(f_conn) + [this, cc_seq, + f=std::move(f), op=std::move(op), f_conn=std::move(f_conn) ](auto &target_shard_services) mutable { op->finish_remote_submission(std::move(f_conn)); target_shard_services.local_state.registry.add_to_registry(*op); - return std::invoke( - std::move(f), - target_shard_services, - std::move(op)); + return this->template process_ordered_op_remotely( + cc_seq, target_shard_services, std::move(op), std::move(f)); }); }); } @@ -335,9 +368,9 @@ public: &get_shard_services()); }); }).then([&logger, &opref](auto epoch) { - logger.debug("{}: got map {}, entering get_pg", opref, epoch); + logger.debug("{}: got map {}, entering get_pg_mapping", opref, epoch); return opref.template enter_stage<>( - opref.get_connection_pipeline().get_pg); + opref.get_connection_pipeline().get_pg_mapping); }).then([this, &opref] { return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid()); }).then_wrapped([this, &logger, op=std::move(op)](auto fut) mutable { @@ -353,14 +386,21 @@ public: return this->template with_remote_shard_state_and_op( core, std::move(op), [this](ShardServices &target_shard_services, - typename T::IRef op) { - if constexpr (T::can_create()) { - return this->template run_with_pg_maybe_create( - std::move(op), target_shard_services); - } else { - return this->template run_with_pg_maybe_wait( - std::move(op), target_shard_services); - } + typename T::IRef op) { + auto &opref = *op; + auto &logger = crimson::get_logger(ceph_subsys_osd); + logger.debug("{}: entering create_or_wait_pg", opref); + return opref.template enter_stage<>( + opref.get_pershard_pipeline(target_shard_services).create_or_wait_pg + ).then([this, &target_shard_services, op=std::move(op)]() mutable { + if constexpr (T::can_create()) { + return this->template run_with_pg_maybe_create( + std::move(op), target_shard_services); + } else { + return this->template run_with_pg_maybe_wait( + std::move(op), target_shard_services); + } + }); }); }); return std::make_pair(id, std::move(fut)); diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 8786ec9626fd..d71513a6645e 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -70,6 +70,10 @@ class PerShardState { OSDState &osd_state; OSD_OSDMapGate osdmap_gate; + PerShardPipeline client_request_pipeline; + PerShardPipeline peering_request_pipeline; + PerShardPipeline replicated_request_pipeline; + PerfCounters *perf = nullptr; PerfCounters *recoverystate_perf = nullptr; @@ -453,6 +457,18 @@ public: return dispatch_context({}, std::move(ctx)); } + PerShardPipeline &get_client_request_pipeline() { + return local_state.client_request_pipeline; + } + + PerShardPipeline &get_peering_request_pipeline() { + return local_state.peering_request_pipeline; + } + + PerShardPipeline &get_replicated_request_pipeline() { + return local_state.replicated_request_pipeline; + } + /// Return per-core tid ceph_tid_t get_tid() { return local_state.get_tid(); }