From: Yingxin Cheng Date: Thu, 20 Oct 2022 07:03:09 +0000 (+0800) Subject: crimson/net: no futurized SocketMessenger::get_global_seq() X-Git-Tag: v18.1.0~967^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d3292a3af55fea757448c1d6f03afcf0d8f9f34d;p=ceph.git crimson/net: no futurized SocketMessenger::get_global_seq() Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 194a6b7b4d7..0f36066db8d 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -758,11 +758,9 @@ ProtocolV2::client_reconnect() auto retry = RetryGlobalFrame::Decode(rx_segments_data.back()); logger().warn("{} GOT RetryGlobalFrame: gs={}", conn, retry.global_seq()); - return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) { - global_seq = gs; - logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq); - return client_reconnect(); - }); + global_seq = messenger.get_global_seq(retry.global_seq()); + logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq); + return client_reconnect(); }); case Tag::SESSION_RETRY: return read_frame_payload().then([this] { @@ -808,21 +806,18 @@ void ProtocolV2::execute_connecting() socket->shutdown(); } gated_execute("execute_connecting", [this] { - return messenger.get_global_seq().then([this] (auto gs) { - global_seq = gs; - assert(client_cookie != 0); - if (!conn.policy.lossy && server_cookie != 0) { - ++connect_seq; - logger().debug("{} UPDATE: gs={}, cs={} for reconnect", - conn, global_seq, connect_seq); - } else { // conn.policy.lossy || server_cookie == 0 - assert(connect_seq == 0); - assert(server_cookie == 0); - logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); - } - - return wait_write_exit(); - }).then([this] { + global_seq = messenger.get_global_seq(); + assert(client_cookie != 0); + if (!conn.policy.lossy && server_cookie != 0) { + ++connect_seq; + logger().debug("{} UPDATE: gs={}, cs={} for reconnect", + conn, global_seq, connect_seq); + } else { // conn.policy.lossy || server_cookie == 0 + assert(connect_seq == 0); + assert(server_cookie == 0); + logger().debug("{} UPDATE: gs={} for connect", conn, global_seq); + } + return wait_write_exit().then([this] { if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} before Socket::connect()", conn, get_state_name(state)); @@ -1626,42 +1621,40 @@ ProtocolV2::send_server_ident() // send_server_ident() logic // refered to async-conn v2: not assign gs to global_seq - return messenger.get_global_seq().then([this] (auto gs) { - global_seq = gs; - logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); + global_seq = messenger.get_global_seq(); + logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); - // this is required for the case when this connection is being replaced - requeue_up_to(0); - conn.in_seq = 0; + // this is required for the case when this connection is being replaced + requeue_up_to(0); + conn.in_seq = 0; - if (!conn.policy.lossy) { - server_cookie = ceph::util::generate_random_number(1, -1ll); - } + if (!conn.policy.lossy) { + server_cookie = ceph::util::generate_random_number(1, -1ll); + } - uint64_t flags = 0; - if (conn.policy.lossy) { - flags = flags | CEPH_MSG_CONNECT_LOSSY; - } + uint64_t flags = 0; + if (conn.policy.lossy) { + flags = flags | CEPH_MSG_CONNECT_LOSSY; + } - auto server_ident = ServerIdentFrame::Encode( - messenger.get_myaddrs(), - messenger.get_myname().num(), - global_seq, - conn.policy.features_supported, - conn.policy.features_required | msgr2_required, - flags, - server_cookie); - - logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," - " gs={}, features_supported={}, features_required={}," - " flags={}, cookie={}", - conn, messenger.get_myaddrs(), messenger.get_myname().num(), - global_seq, conn.policy.features_supported, - conn.policy.features_required | msgr2_required, - flags, server_cookie); - - return write_frame(server_ident); - }); + auto server_ident = ServerIdentFrame::Encode( + messenger.get_myaddrs(), + messenger.get_myname().num(), + global_seq, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, + server_cookie); + + logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, messenger.get_myaddrs(), messenger.get_myname().num(), + global_seq, conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, server_cookie); + + return write_frame(server_ident); } // REPLACING state diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 073a6fa3bdc..95b9946bf33 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -441,13 +441,12 @@ void SocketMessenger::closed_conn(SocketConnectionRef conn) } } -seastar::future -SocketMessenger::get_global_seq(uint32_t old) +uint32_t SocketMessenger::get_global_seq(uint32_t old) { if (old > global_seq) { global_seq = old; } - return seastar::make_ready_future(++global_seq); + return ++global_seq; } } // namespace crimson::net diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index e53fa6d7bb3..903455e96df 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -110,7 +110,7 @@ class SocketMessenger final : public Messenger { void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; public: - seastar::future get_global_seq(uint32_t old=0); + uint32_t get_global_seq(uint32_t old=0); void learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn);