From: Yingxin Cheng Date: Mon, 11 Dec 2023 06:38:51 +0000 (+0800) Subject: crimson/osd: drop a foreign-copy to shard-0 for every pg operation X-Git-Tag: v19.1.0~438^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=3eb3327b848e511ea52e58669bbfa668093cdfcb;p=ceph.git crimson/osd: drop a foreign-copy to shard-0 for every pg operation By using ConnectionRef before pg submission, and after that, change to use ConnectionXcoreRef. The intent is to drop the foreign copy of the connection to shard 0 at pg submission time. This should remove two pairs of crosscore communications in shard 0 for each I/O, one for connection-ref foreign copy, another for connection-ref destruction. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/common/local_shared_foreign_ptr.h b/src/crimson/common/local_shared_foreign_ptr.h index c4bd1099a0295..675442273e5b1 100644 --- a/src/crimson/common/local_shared_foreign_ptr.h +++ b/src/crimson/common/local_shared_foreign_ptr.h @@ -27,9 +27,6 @@ namespace crimson { */ template class local_shared_foreign_ptr { - using element_type = typename std::pointer_traits::element_type; - using pointer = element_type*; - seastar::lw_shared_ptr> ptr; /// Wraps a pointer object and remembers the current core. @@ -43,6 +40,9 @@ class local_shared_foreign_ptr { seastar::foreign_ptr &&); public: + using element_type = typename std::pointer_traits::element_type; + using pointer = element_type*; + /// Constructs a null local_shared_foreign_ptr<>. local_shared_foreign_ptr() = default; diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 3a56cf5bb0a63..ad8eedd47773d 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -38,6 +38,8 @@ class Connection; using ConnectionLRef = seastar::shared_ptr; using ConnectionFRef = seastar::foreign_ptr; using ConnectionRef = ::crimson::local_shared_foreign_ptr; +using ConnectionFFRef = seastar::foreign_ptr; +using ConnectionXcoreRef = ::crimson::local_shared_foreign_ptr; class Dispatcher; class ChainedDispatchers; diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 034fdde716935..4e2e1861f9994 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -160,13 +160,13 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch( logger().debug("{}", __func__); struct connect_ctx_t { ObjectContext::watch_key_t key; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionXcoreRef conn; watch_info_t info; connect_ctx_t( const OSDOp& osd_op, const ExecutableMessage& msg, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) : key(osd_op.op.watch.cookie, msg.get_reqid().name), conn(conn), info(create_watch_info(osd_op, msg, conn->get_peer_addr())) { @@ -323,13 +323,13 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify( return crimson::ct_error::enoent::make(); } struct notify_ctx_t { - crimson::net::ConnectionRef conn; + crimson::net::ConnectionXcoreRef conn; notify_info_t ninfo; const uint64_t client_gid; const epoch_t epoch; notify_ctx_t(const ExecutableMessage& msg, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) : conn(conn), client_gid(msg.get_reqid().name.num()), epoch(msg.get_map_epoch()) { @@ -1067,7 +1067,7 @@ OpsExecuter::OpsExecuter(Ref pg, ObjectContextRef _obc, const OpInfo& op_info, abstracted_msg_t&& msg, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionXcoreRef conn, const SnapContext& _snapc) : pg(std::move(pg)), obc(std::move(_obc)), diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 1230b1c5a2e58..556e2f3b94a35 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -113,10 +113,10 @@ public: class ExecutableMessagePimpl final : ExecutableMessage { const ImplT* pimpl; // In crimson, conn is independently maintained outside Message. - const crimson::net::ConnectionRef conn; + const crimson::net::ConnectionXcoreRef conn; public: ExecutableMessagePimpl(const ImplT* pimpl, - const crimson::net::ConnectionRef conn) + const crimson::net::ConnectionXcoreRef conn) : pimpl(pimpl), conn(conn) { } @@ -185,7 +185,7 @@ private: ceph::static_ptr)>; abstracted_msg_t msg; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionXcoreRef conn; std::optional osd_op_params; bool user_modify = false; ceph::os::Transaction txn; @@ -372,7 +372,7 @@ private: ObjectContextRef obc, const OpInfo& op_info, abstracted_msg_t&& msg, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionXcoreRef conn, const SnapContext& snapc); public: @@ -381,7 +381,7 @@ public: ObjectContextRef obc, const OpInfo& op_info, const MsgT& msg, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionXcoreRef conn, const SnapContext& snapc) : OpsExecuter( std::move(pg), diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index d1eb9fbb350dd..460f413efe381 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -49,7 +49,7 @@ ClientRequest::ClientRequest( ShardServices &_shard_services, crimson::net::ConnectionRef conn, Ref &&m) : shard_services(&_shard_services), - conn(std::move(conn)), + l_conn(std::move(conn)), m(std::move(m)), instance_handle(new instance_handle_t) {} @@ -76,7 +76,8 @@ void ClientRequest::dump_detail(Formatter *f) const ConnectionPipeline &ClientRequest::get_connection_pipeline() { - return get_osd_priv(conn.get()).client_request_conn_pipeline; + return get_osd_priv(&get_local_connection() + ).client_request_conn_pipeline; } PerShardPipeline &ClientRequest::get_pershard_pipeline( @@ -117,7 +118,7 @@ seastar::future<> ClientRequest::with_pg_int(Ref pgref) PG &pg = *pgref; if (pg.can_discard_op(*m)) { return shard_services->send_incremental_map( - std::ref(*conn), m->get_map_epoch() + std::ref(get_foreign_connection()), m->get_map_epoch() ).then([FNAME, this, this_instance_id, pgref] { DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id); pgref->client_request_orderer.remove_request(*this); @@ -196,7 +197,7 @@ ClientRequest::process_pg_op( m ).then_interruptible([this, pg=std::move(pg)](MURef reply) { // TODO: gate the crosscore sending - return conn->send_with_throttling(std::move(reply)); + return get_foreign_connection().send_with_throttling(std::move(reply)); }); } @@ -211,7 +212,7 @@ auto ClientRequest::reply_op_error(const Ref& pg, int err) reply->set_reply_versions(eversion_t(), 0); reply->set_op_returns(std::vector{}); // TODO: gate the crosscore sending - return conn->send_with_throttling(std::move(reply)); + return get_foreign_connection().send_with_throttling(std::move(reply)); } ClientRequest::interruptible_future<> @@ -237,7 +238,7 @@ ClientRequest::process_op( CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); reply->set_reply_versions(completed->version, completed->user_version); // TODO: gate the crosscore sending - return conn->send_with_throttling(std::move(reply)); + return get_foreign_connection().send_with_throttling(std::move(reply)); } else { DEBUGDPP("{}.{}: not completed, entering get_obc stage", *pg, *this, this_instance_id); @@ -323,9 +324,10 @@ ClientRequest::do_process( return reply_op_error(pg, -ENAMETOOLONG); } else if (m->get_hobj().oid.name.empty()) { return reply_op_error(pg, -EINVAL); - } else if (pg->get_osdmap()->is_blocklisted(conn->get_peer_addr())) { + } else if (pg->get_osdmap()->is_blocklisted( + get_foreign_connection().get_peer_addr())) { DEBUGDPP("{}.{}: {} is blocklisted", - *pg, *this, this_instance_id, conn->get_peer_addr()); + *pg, *this, this_instance_id, get_foreign_connection().get_peer_addr()); return reply_op_error(pg, -EBLOCKLISTED); } @@ -361,7 +363,9 @@ ClientRequest::do_process( *pg, *this, this_instance_id, m->get_hobj()); } } - return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible( + return pg->do_osd_ops( + m, r_conn, obc, op_info, snapc + ).safe_then_unpack_interruptible( [FNAME, this, pg, this_instance_id, &ihref]( auto submitted, auto all_completed) mutable { return submitted.then_interruptible( @@ -379,7 +383,9 @@ ClientRequest::do_process( reply=std::move(reply)]() mutable { DEBUGDPP("{}.{}: sending response", *pg, *this, this_instance_id); - return conn->send(std::move(reply)); + // TODO: gate the crosscore sending + return get_foreign_connection( + ).send_with_throttling(std::move(reply)); }); }, crimson::ct_error::eagain::handle( [this, pg, this_instance_id, &ihref]() mutable { diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index b374aacbbe940..8bf396232a665 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -32,7 +32,9 @@ class ClientRequest final : public PhasedOperationT, // Initially set to primary core, updated to pg core after with_pg() ShardServices *shard_services = nullptr; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + // must be after conn due to ConnectionPipeline's life-time Ref m; OpInfo op_info; @@ -224,22 +226,31 @@ public: PerShardPipeline &get_pershard_pipeline(ShardServices &); - crimson::net::Connection &get_connection() { - assert(conn); - return *conn; + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; }; - seastar::future prepare_remote_submission() { - assert(conn); - return conn.get_foreign( - ).then([this](auto f_conn) { - conn.reset(); - return f_conn; - }); + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; } - void finish_remote_submission(crimson::net::ConnectionFRef _conn) { - assert(!conn); - conn = make_local_shared_foreign(std::move(_conn)); + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); } seastar::future<> with_pg_int(Ref pg); diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc index 5f92ccddfcdc2..7e979131f0604 100644 --- a/src/crimson/osd/osd_operations/logmissing_request.cc +++ b/src/crimson/osd/osd_operations/logmissing_request.cc @@ -22,7 +22,7 @@ namespace crimson::osd { LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn, Ref &&req) - : conn{std::move(conn)}, + : l_conn{std::move(conn)}, req{std::move(req)} {} @@ -48,7 +48,8 @@ void LogMissingRequest::dump_detail(Formatter *f) const ConnectionPipeline &LogMissingRequest::get_connection_pipeline() { - return get_osd_priv(conn.get()).replicated_request_conn_pipeline; + return get_osd_priv(&get_local_connection() + ).client_request_conn_pipeline; } PerShardPipeline &LogMissingRequest::get_pershard_pipeline( @@ -81,7 +82,7 @@ seastar::future<> LogMissingRequest::with_pg( std::move(trigger), req->min_epoch); }); }).then_interruptible([this, pg](auto) { - return pg->do_update_log_missing(req, conn); + return pg->do_update_log_missing(req, r_conn); }).then_interruptible([this] { logger().debug("{}: complete", *this); return handle.complete(); diff --git a/src/crimson/osd/osd_operations/logmissing_request.h b/src/crimson/osd/osd_operations/logmissing_request.h index 5b01fee17b868..51c9d540cb5ac 100644 --- a/src/crimson/osd/osd_operations/logmissing_request.h +++ b/src/crimson/osd/osd_operations/logmissing_request.h @@ -41,22 +41,31 @@ public: PerShardPipeline &get_pershard_pipeline(ShardServices &); - crimson::net::Connection &get_connection() { - assert(conn); - return *conn; + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; }; - seastar::future prepare_remote_submission() { - assert(conn); - return conn.get_foreign( - ).then([this](auto f_conn) { - conn.reset(); - return f_conn; - }); + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; } - void finish_remote_submission(crimson::net::ConnectionFRef _conn) { - assert(!conn); - conn = make_local_shared_foreign(std::move(_conn)); + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); } seastar::future<> with_pg( @@ -77,7 +86,9 @@ public: private: ClientRequest::PGPipeline &client_pp(PG &pg); - crimson::net::ConnectionRef conn; + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + // must be after `conn` to ensure the ConnectionPipeline's is alive PipelineHandle handle; Ref req; diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.cc b/src/crimson/osd/osd_operations/logmissing_request_reply.cc index 20a9bf7dd00e6..39b3debf10f59 100644 --- a/src/crimson/osd/osd_operations/logmissing_request_reply.cc +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.cc @@ -21,7 +21,7 @@ namespace crimson::osd { LogMissingRequestReply::LogMissingRequestReply( crimson::net::ConnectionRef&& conn, Ref &&req) - : conn{std::move(conn)}, + : l_conn{std::move(conn)}, req{std::move(req)} {} @@ -46,7 +46,8 @@ void LogMissingRequestReply::dump_detail(Formatter *f) const ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline() { - return get_osd_priv(conn.get()).replicated_request_conn_pipeline; + return get_osd_priv(&get_local_connection() + ).replicated_request_conn_pipeline; } PerShardPipeline &LogMissingRequestReply::get_pershard_pipeline( diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.h b/src/crimson/osd/osd_operations/logmissing_request_reply.h index b01cae15421db..c741b41bd0f57 100644 --- a/src/crimson/osd/osd_operations/logmissing_request_reply.h +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.h @@ -41,22 +41,31 @@ public: PerShardPipeline &get_pershard_pipeline(ShardServices &); - crimson::net::Connection &get_connection() { - assert(conn); - return *conn; + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; }; - seastar::future prepare_remote_submission() { - assert(conn); - return conn.get_foreign( - ).then([this](auto f_conn) { - conn.reset(); - return f_conn; - }); + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; } - void finish_remote_submission(crimson::net::ConnectionFRef _conn) { - assert(!conn); - conn = make_local_shared_foreign(std::move(_conn)); + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); } seastar::future<> with_pg( @@ -75,7 +84,9 @@ public: private: ClientRequest::PGPipeline &client_pp(PG &pg); - crimson::net::ConnectionRef conn; + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + // must be after `conn` to ensure the ConnectionPipeline's is alive PipelineHandle handle; Ref req; diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index f12699e5e6a21..5c5c73e008676 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -135,7 +135,8 @@ PeeringEvent::complete_rctx(ShardServices &shard_services, Ref pg) ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline() { - return get_osd_priv(conn.get()).peering_request_conn_pipeline; + return get_osd_priv(&get_local_connection() + ).client_request_conn_pipeline; } PerShardPipeline &RemotePeeringEvent::get_pershard_pipeline( diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index dad1076a93bb4..1e6bd957289ff 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -101,7 +101,9 @@ public: class RemotePeeringEvent : public PeeringEvent { protected: - crimson::net::ConnectionRef conn; + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + // must be after conn due to ConnectionPipeline's life-time PipelineHandle handle; @@ -117,7 +119,7 @@ public: template RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) : PeeringEvent(std::forward(args)...), - conn(conn) + l_conn(conn) {} std::tuple< @@ -146,22 +148,31 @@ public: PerShardPipeline &get_pershard_pipeline(ShardServices &); - crimson::net::Connection &get_connection() { - assert(conn); - return *conn; + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; }; - seastar::future prepare_remote_submission() { - assert(conn); - return conn.get_foreign( - ).then([this](auto f_conn) { - conn.reset(); - return f_conn; - }); + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; } - void finish_remote_submission(crimson::net::ConnectionFRef _conn) { - assert(!conn); - conn = make_local_shared_foreign(std::move(_conn)); + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); } }; diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc index 1ea7ae12869ef..06a3be1662c26 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.cc +++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc @@ -35,7 +35,7 @@ seastar::future<> RecoverySubRequest::with_pg( return interruptor::with_interruption([this, pgref] { LOG_PREFIX(RecoverySubRequest::with_pg); DEBUGI("{}: {}", "RecoverySubRequest::with_pg", *this); - return pgref->get_recovery_backend()->handle_recovery_op(m, conn + return pgref->get_recovery_backend()->handle_recovery_op(m, r_conn ).then_interruptible([this] { LOG_PREFIX(RecoverySubRequest::with_pg); DEBUGI("{}: complete", *this); @@ -52,7 +52,8 @@ seastar::future<> RecoverySubRequest::with_pg( ConnectionPipeline &RecoverySubRequest::get_connection_pipeline() { - return get_osd_priv(conn.get()).peering_request_conn_pipeline; + return get_osd_priv(&get_local_connection() + ).client_request_conn_pipeline; } PerShardPipeline &RecoverySubRequest::get_pershard_pipeline( diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.h b/src/crimson/osd/osd_operations/recovery_subrequest.h index 31e6045cb0eeb..17c2faf97ea98 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.h +++ b/src/crimson/osd/osd_operations/recovery_subrequest.h @@ -22,7 +22,7 @@ public: RecoverySubRequest( crimson::net::ConnectionRef conn, Ref&& m) - : conn(conn), m(m) {} + : l_conn(conn), m(m) {} void print(std::ostream& out) const final { @@ -44,22 +44,31 @@ public: PerShardPipeline &get_pershard_pipeline(ShardServices &); - crimson::net::Connection &get_connection() { - assert(conn); - return *conn; + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; }; - seastar::future prepare_remote_submission() { - assert(conn); - return conn.get_foreign( - ).then([this](auto f_conn) { - conn.reset(); - return f_conn; - }); + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; } - void finish_remote_submission(crimson::net::ConnectionFRef _conn) { - assert(!conn); - conn = make_local_shared_foreign(std::move(_conn)); + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); } seastar::future<> with_pg( @@ -77,7 +86,9 @@ public: > tracking_events; private: - crimson::net::ConnectionRef conn; + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + // must be after `conn` to ensure the ConnectionPipeline's is alive PipelineHandle handle; Ref m; diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc index 4e97db5be982c..dc2adc37efefb 100644 --- a/src/crimson/osd/osd_operations/replicated_request.cc +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -22,7 +22,7 @@ namespace crimson::osd { RepRequest::RepRequest(crimson::net::ConnectionRef&& conn, Ref &&req) - : conn{std::move(conn)}, + : l_conn{std::move(conn)}, req{std::move(req)} {} @@ -48,7 +48,8 @@ void RepRequest::dump_detail(Formatter *f) const ConnectionPipeline &RepRequest::get_connection_pipeline() { - return get_osd_priv(conn.get()).replicated_request_conn_pipeline; + return get_osd_priv(&get_local_connection() + ).client_request_conn_pipeline; } PerShardPipeline &RepRequest::get_pershard_pipeline( diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h index 32cf271788ba8..ff5dea6d6db31 100644 --- a/src/crimson/osd/osd_operations/replicated_request.h +++ b/src/crimson/osd/osd_operations/replicated_request.h @@ -41,22 +41,31 @@ public: PerShardPipeline &get_pershard_pipeline(ShardServices &); - crimson::net::Connection &get_connection() { - assert(conn); - return *conn; + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; }; - seastar::future prepare_remote_submission() { - assert(conn); - return conn.get_foreign( - ).then([this](auto f_conn) { - conn.reset(); - return f_conn; - }); + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; } - void finish_remote_submission(crimson::net::ConnectionFRef _conn) { - assert(!conn); - conn = make_local_shared_foreign(std::move(_conn)); + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); } seastar::future<> with_pg( @@ -77,7 +86,9 @@ public: private: ClientRequest::PGPipeline &client_pp(PG &pg); - crimson::net::ConnectionRef conn; + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + PipelineHandle handle; Ref req; }; diff --git a/src/crimson/osd/osd_operations/scrub_events.cc b/src/crimson/osd/osd_operations/scrub_events.cc index 4f54cf0b274f3..9246233448b5b 100644 --- a/src/crimson/osd/osd_operations/scrub_events.cc +++ b/src/crimson/osd/osd_operations/scrub_events.cc @@ -20,7 +20,8 @@ PGPeeringPipeline &RemoteScrubEventBaseT::get_peering_pipeline(PG &pg) template ConnectionPipeline &RemoteScrubEventBaseT::get_connection_pipeline() { - return get_osd_priv(conn.get()).peering_request_conn_pipeline; + return get_osd_priv(&get_local_connection() + ).peering_request_conn_pipeline; } template diff --git a/src/crimson/osd/osd_operations/scrub_events.h b/src/crimson/osd/osd_operations/scrub_events.h index 0793983d8c662..6aa7035d18562 100644 --- a/src/crimson/osd/osd_operations/scrub_events.h +++ b/src/crimson/osd/osd_operations/scrub_events.h @@ -24,7 +24,9 @@ class RemoteScrubEventBaseT : public PhasedOperationT { PipelineHandle handle; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionRef l_conn; + crimson::net::ConnectionXcoreRef r_conn; + epoch_t epoch; spg_t pgid; @@ -38,17 +40,41 @@ protected: public: RemoteScrubEventBaseT( crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid) - : conn(conn), epoch(epoch), pgid(pgid) {} + : l_conn(std::move(conn)), epoch(epoch), pgid(pgid) {} PGPeeringPipeline &get_peering_pipeline(PG &pg); + ConnectionPipeline &get_connection_pipeline(); + PerShardPipeline &get_pershard_pipeline(ShardServices &); - crimson::net::Connection &get_connection() { - assert(conn); - return *conn; + crimson::net::Connection &get_local_connection() { + assert(l_conn); + assert(!r_conn); + return *l_conn; }; + crimson::net::Connection &get_foreign_connection() { + assert(r_conn); + assert(!l_conn); + return *r_conn; + }; + + crimson::net::ConnectionFFRef prepare_remote_submission() { + assert(l_conn); + assert(!r_conn); + auto ret = seastar::make_foreign(std::move(l_conn)); + l_conn.reset(); + return ret; + } + + void finish_remote_submission(crimson::net::ConnectionFFRef conn) { + assert(conn); + assert(!l_conn); + assert(!r_conn); + r_conn = make_local_shared_foreign(std::move(conn)); + } + static constexpr bool can_create() { return false; } spg_t get_pgid() const { @@ -58,19 +84,6 @@ public: PipelineHandle &get_handle() { return handle; } epoch_t get_epoch() const { return epoch; } - seastar::future prepare_remote_submission() { - assert(conn); - return conn.get_foreign( - ).then([this](auto f_conn) { - conn.reset(); - return f_conn; - }); - } - void finish_remote_submission(crimson::net::ConnectionFRef _conn) { - assert(!conn); - conn = make_local_shared_foreign(std::move(_conn)); - } - seastar::future<> with_pg( ShardServices &shard_services, Ref pg); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 7638b2fcdbeb2..8d12c42873843 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1059,7 +1059,7 @@ seastar::future> PG::submit_error_log( PG::do_osd_ops_iertr::future>> PG::do_osd_ops( Ref m, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionXcoreRef conn, ObjectContextRef obc, const OpInfo &op_info, const SnapContext& snapc) @@ -1382,7 +1382,7 @@ void PG::handle_rep_op_reply(const MOSDRepOpReply& m) PG::interruptible_future<> PG::do_update_log_missing( Ref m, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) { if (__builtin_expect(stopping, false)) { return seastar::make_exception_future<>( diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index b829ea177dbbf..e103d3bb5eec1 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -526,7 +526,7 @@ public: void handle_rep_op_reply(const MOSDRepOpReply& m); interruptible_future<> do_update_log_missing( Ref m, - crimson::net::ConnectionRef conn); + crimson::net::ConnectionXcoreRef conn); interruptible_future<> do_update_log_missing_reply( Ref m); @@ -570,7 +570,7 @@ private: do_osd_ops_iertr::future>; do_osd_ops_iertr::future>> do_osd_ops( Ref m, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionXcoreRef conn, ObjectContextRef obc, const OpInfo &op_info, const SnapContext& snapc); @@ -798,7 +798,7 @@ private: }; struct PG::do_osd_ops_params_t { - crimson::net::ConnectionRef &get_connection() const { + crimson::net::ConnectionXcoreRef &get_connection() const { return conn; } osd_reqid_t get_reqid() const { @@ -826,7 +826,7 @@ struct PG::do_osd_ops_params_t { return orig_source_inst.name; } - crimson::net::ConnectionRef &conn; + crimson::net::ConnectionXcoreRef &conn; osd_reqid_t reqid; utime_t mtime; epoch_t map_epoch; diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index 1559dde0cb1b3..011a11605fd97 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -156,7 +156,8 @@ public: ShardServices &target_shard_services, typename T::IRef &&op, F &&f) { - auto &crosscore_ordering = get_osd_priv(&op->get_connection()).crosscore_ordering; + auto &crosscore_ordering = get_osd_priv( + &op->get_foreign_connection()).crosscore_ordering; if (crosscore_ordering.proceed_or_wait(cc_seq)) { return std::invoke( std::move(f), @@ -182,6 +183,8 @@ public: F &&f) { ceph_assert(op->use_count() == 1); if (seastar::this_shard_id() == core) { + auto f_conn = op->prepare_remote_submission(); + op->finish_remote_submission(std::move(f_conn)); auto &target_shard_services = shard_services.local(); return std::invoke( std::move(f), @@ -190,17 +193,17 @@ public: } // Note: the ordering in only preserved until f is invoked. auto &opref = *op; - auto &crosscore_ordering = get_osd_priv(&opref.get_connection()).crosscore_ordering; + auto &crosscore_ordering = get_osd_priv( + &opref.get_local_connection()).crosscore_ordering; auto cc_seq = crosscore_ordering.prepare_submit(core); auto &logger = crimson::get_logger(ceph_subsys_osd); logger.debug("{}: send {} to the remote pg core {}", opref, cc_seq, core); return opref.get_handle().complete( - ).then([&opref, this] { - get_local_state().registry.remove_from_registry(opref); - return opref.prepare_remote_submission(); - }).then([op=std::move(op), f=std::move(f), this, core, cc_seq - ](auto f_conn) mutable { + ).then([this, core, cc_seq, + op=std::move(op), f=std::move(f)]() mutable { + get_local_state().registry.remove_from_registry(*op); + auto f_conn = op->prepare_remote_submission(); return shard_services.invoke_on( core, [this, cc_seq, diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index b5394bfdc485d..e5ba9f8d562b5 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -70,7 +70,7 @@ void RecoveryBackend::WaitForObjectRecovery::stop() { void RecoveryBackend::handle_backfill_finish( MOSDPGBackfill& m, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) { logger().debug("{}", __func__); ceph_assert(!pg.is_primary()); @@ -125,7 +125,7 @@ RecoveryBackend::handle_backfill_finish_ack( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_backfill( MOSDPGBackfill& m, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) { logger().debug("{}", __func__); if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { @@ -227,7 +227,7 @@ RecoveryBackend::scan_for_backfill( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_scan_get_digest( MOSDPGScan& m, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) { logger().debug("{}", __func__); if (false /* FIXME: check for backfill too full */) { @@ -289,7 +289,7 @@ RecoveryBackend::handle_scan_digest( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_scan( MOSDPGScan& m, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) { logger().debug("{}", __func__); if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { @@ -311,7 +311,7 @@ RecoveryBackend::handle_scan( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_recovery_op( Ref m, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) { switch (m->get_header().type) { case MSG_OSD_PG_BACKFILL: diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index abf6958915966..c65bf56539241 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -66,7 +66,7 @@ public: virtual interruptible_future<> handle_recovery_op( Ref m, - crimson::net::ConnectionRef conn); + crimson::net::ConnectionXcoreRef conn); virtual interruptible_future<> recover_object( const hobject_t& soid, @@ -212,22 +212,22 @@ protected: private: void handle_backfill_finish( MOSDPGBackfill& m, - crimson::net::ConnectionRef conn); + crimson::net::ConnectionXcoreRef conn); interruptible_future<> handle_backfill_progress( MOSDPGBackfill& m); interruptible_future<> handle_backfill_finish_ack( MOSDPGBackfill& m); interruptible_future<> handle_backfill( MOSDPGBackfill& m, - crimson::net::ConnectionRef conn); + crimson::net::ConnectionXcoreRef conn); interruptible_future<> handle_scan_get_digest( MOSDPGScan& m, - crimson::net::ConnectionRef conn); + crimson::net::ConnectionXcoreRef conn); interruptible_future<> handle_scan_digest( MOSDPGScan& m); interruptible_future<> handle_scan( MOSDPGScan& m, - crimson::net::ConnectionRef conn); + crimson::net::ConnectionXcoreRef conn); interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m); }; diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 9a1060c440e27..a62cb793272d5 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -1305,7 +1305,7 @@ ReplicatedRecoveryBackend::handle_recovery_delete_reply( RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::handle_recovery_op( Ref m, - crimson::net::ConnectionRef conn) + crimson::net::ConnectionXcoreRef conn) { switch (m->get_header().type) { case MSG_OSD_PG_PULL: diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index cd1d9e067289e..aac4531ee95ab 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -25,7 +25,7 @@ public: {} interruptible_future<> handle_recovery_op( Ref m, - crimson::net::ConnectionRef conn) final; + crimson::net::ConnectionXcoreRef conn) final; interruptible_future<> recover_object( const hobject_t& soid, diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc index 4573333c3dab7..088054c5a6ed6 100644 --- a/src/crimson/osd/watch.cc +++ b/src/crimson/osd/watch.cc @@ -78,7 +78,7 @@ Watch::~Watch() logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie()); } -seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool) +seastar::future<> Watch::connect(crimson::net::ConnectionXcoreRef conn, bool) { if (this->conn == conn) { logger().debug("conn={} already connected", conn); @@ -235,7 +235,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs) return out; } -Notify::Notify(crimson::net::ConnectionRef conn, +Notify::Notify(crimson::net::ConnectionXcoreRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version) diff --git a/src/crimson/osd/watch.h b/src/crimson/osd/watch.h index b3982141d86e8..64708febd09ce 100644 --- a/src/crimson/osd/watch.h +++ b/src/crimson/osd/watch.h @@ -34,7 +34,7 @@ class Watch : public seastar::enable_shared_from_this { struct private_ctag_t{}; std::set> in_progress_notifies; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionXcoreRef conn; crimson::osd::ObjectContextRef obc; watch_info_t winfo; @@ -67,7 +67,7 @@ public: } ~Watch(); - seastar::future<> connect(crimson::net::ConnectionRef, bool); + seastar::future<> connect(crimson::net::ConnectionXcoreRef, bool); void disconnect(); bool is_alive() const { return true; @@ -131,7 +131,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs); class Notify : public seastar::enable_shared_from_this { std::set watchers; const notify_info_t ninfo; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionXcoreRef conn; const uint64_t client_gid; const uint64_t user_version; bool complete{false}; @@ -154,14 +154,14 @@ class Notify : public seastar::enable_shared_from_this { /// Called on Notify timeout void do_notify_timeout(); - Notify(crimson::net::ConnectionRef conn, + Notify(crimson::net::ConnectionXcoreRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version); template Notify(WatchIteratorT begin, WatchIteratorT end, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionXcoreRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version); @@ -207,7 +207,7 @@ public: template Notify::Notify(WatchIteratorT begin, WatchIteratorT end, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionXcoreRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version)