From 93ef23d90aa71ca3c084c28d53a0acb0bf5fb4f6 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 31 Oct 2023 09:36:23 +0800 Subject: [PATCH] crimson/osd: allow to send messages concurrently The ordering is now guaranteed upon calling send(), so there is no reason to couple the crosscore send future with the operation phases -- exclusive phases will limit the send concurrency, potentially causing OSD starvation. Decouple the crosscore send futures in the IO pathes, mostly in ClientRequest and OSDSingletonState::send_to_osd(). Issue-identified-by: Chunmei Liu see PR53934. Signed-off-by: Yingxin Cheng --- src/crimson/net/Connection.h | 27 ++++++++++++++++- src/crimson/net/Fwd.h | 2 +- .../osd/osd_operations/client_request.cc | 29 +++++++++++-------- src/crimson/osd/shard_services.cc | 3 +- 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 41596987b09f5..c19bfb1ff57a1 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -83,8 +83,33 @@ class Connection : public seastar::enable_shared_from_this { * * May be invoked from any core, and the send order will be preserved upon * the call. + * + * The returned future will be resolved only after the message is enqueued + * remotely. */ - virtual seastar::future<> send(MessageURef msg) = 0; + virtual seastar::future<> send( + MessageURef msg) = 0; + + /** + * send_with_throttling + * + * Send a message over a connection that has completed its handshake. + * + * May be invoked from any core, and the send order will be preserved upon + * the call. + * + * TODO: + * + * The returned future is reserved for throttling. + * + * Gating is needed for graceful shutdown, to wait until the message is + * enqueued remotely. + */ + seastar::future<> send_with_throttling( + MessageURef msg /* , seastar::gate & */) { + std::ignore = send(std::move(msg)); + return seastar::now(); + } /** * send_keepalive diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 2b159514193c7..3a56cf5bb0a63 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -21,7 +21,7 @@ #include #include "msg/Connection.h" -#include "msg/MessageRef.h" +#include "msg/Message.h" #include "msg/msg_types.h" #include "crimson/common/errorator.h" diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index d208e2e53d97e..c6dbd498c66dd 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -204,7 +204,8 @@ ClientRequest::process_pg_op( return pg->do_pg_ops( m ).then_interruptible([this, pg=std::move(pg)](MURef reply) { - return conn->send(std::move(reply)); + // TODO: gate the crosscore sending + return conn->send_with_throttling(std::move(reply)); }); } @@ -218,7 +219,8 @@ auto ClientRequest::reply_op_error(const Ref& pg, int err) !m->has_flag(CEPH_OSD_FLAG_RETURNVEC)); reply->set_reply_versions(eversion_t(), 0); reply->set_op_returns(std::vector{}); - return conn->send(std::move(reply)); + // TODO: gate the crosscore sending + return conn->send_with_throttling(std::move(reply)); } ClientRequest::interruptible_future<> @@ -246,7 +248,8 @@ ClientRequest::process_op(instance_handle_t &ihref, Ref &pg) m.get(), completed->err, pg->get_osdmap_epoch(), CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); reply->set_reply_versions(completed->version, completed->user_version); - return conn->send(std::move(reply)); + // TODO: gate the crosscore sending + return conn->send_with_throttling(std::move(reply)); } else { return ihref.enter_stage(client_pp(*pg).get_obc, *this ).then_interruptible( @@ -319,13 +322,13 @@ ClientRequest::do_process( SnapContext snapc = get_snapc(pg,obc); - if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) && - snapc.seq < obc->ssc->snapset.seq) { - DEBUGI("{} ORDERSNAP flag set and snapc seq {}", - " < snapset seq {} on {}", - __func__, snapc.seq, obc->ssc->snapset.seq, - obc->obs.oi.soid); - return reply_op_error(pg, -EOLDSNAPC); + if (m->has_flag(CEPH_OSD_FLAG_ORDERSNAP) && + snapc.seq < obc->ssc->snapset.seq) { + DEBUGI("{} ORDERSNAP flag set and snapc seq {}", + " < snapset seq {} on {}", + __func__, snapc.seq, obc->ssc->snapset.seq, + obc->obs.oi.soid); + return reply_op_error(pg, -EOLDSNAPC); } if (!pg->is_primary()) { @@ -357,8 +360,10 @@ ClientRequest::do_process( [this, reply=std::move(reply)]() mutable { LOG_PREFIX(ClientRequest::do_process); DEBUGI("{}: sending response", *this); - return conn->send(std::move(reply)); - }); + // TODO: gate the crosscore sending + return conn->send_with_throttling(std::move(reply)); + } + ); }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable { return process_op(ihref, pg); })); diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 404f28d7d7f3b..ae36d7f6ea008 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -166,7 +166,8 @@ seastar::future<> OSDSingletonState::send_to_osd( } else { auto conn = cluster_msgr.connect( osdmap->get_cluster_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD); - return conn->send(std::move(m)); + // TODO: gate the crosscore sending + return conn->send_with_throttling(std::move(m)); } } -- 2.39.5