From 03fbb3603a76a836a2358a6c609fbd3e1315ff91 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 7 Aug 2019 19:14:27 +0800 Subject: [PATCH] crimson/net: improve get_global_seq() Implement single global_seq and non-racing get_global_seq() interface. Signed-off-by: Yingxin Cheng --- src/crimson/net/Messenger.h | 8 --- src/crimson/net/ProtocolV1.cc | 46 +++++++------ src/crimson/net/ProtocolV2.cc | 102 +++++++++++++++-------------- src/crimson/net/SocketMessenger.cc | 11 ++++ src/crimson/net/SocketMessenger.h | 2 + 5 files changed, 93 insertions(+), 76 deletions(-) diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 6d857463da8..d0c5e5aa71d 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -35,7 +35,6 @@ using SocketPolicy = ceph::net::Policy; class Messenger { entity_name_t my_name; entity_addrvec_t my_addrs; - uint32_t global_seq = 0; uint32_t crc_flags = 0; ceph::auth::AuthClient* auth_client = nullptr; ceph::auth::AuthServer* auth_server = nullptr; @@ -79,13 +78,6 @@ public: /// after this future becomes available virtual seastar::future<> shutdown() = 0; - uint32_t get_global_seq(uint32_t old=0) { - if (old > global_seq) { - global_seq = old; - } - return ++global_seq; - } - uint32_t get_crc_flags() const { return crc_flags; } diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index be857c01f2e..55c8d0f7978 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -184,8 +184,10 @@ ProtocolV1::handle_connect_reply(msgr_tag_t tag) reset_session(); return seastar::make_ready_future(stop_t::no); case CEPH_MSGR_TAG_RETRY_GLOBAL: - h.global_seq = messenger.get_global_seq(h.reply.global_seq); - return seastar::make_ready_future(stop_t::no); + return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) { + h.global_seq = gs; + return seastar::make_ready_future(stop_t::no); + }); case CEPH_MSGR_TAG_RETRY_SESSION: ceph_assert(h.reply.connect_seq > h.connect_seq); h.connect_seq = h.reply.connect_seq; @@ -327,6 +329,9 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, } return seastar::now(); }).then([this] { + return messenger.get_global_seq(); + }).then([this] (auto gs) { + h.global_seq = gs; // read server's handshake header return socket->read(server_header_size); }).then([this] (bufferlist headerbl) { @@ -357,7 +362,6 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); ::encode(messenger.get_myaddr(), bl, 0); - h.global_seq = messenger.get_global_seq(); return socket->write_flush(std::move(bl)); }).then([=] { return seastar::repeat([this] { @@ -399,25 +403,27 @@ seastar::future ProtocolV1::send_connect_reply( seastar::future ProtocolV1::send_connect_reply_ready( msgr_tag_t tag, bufferlist&& authorizer_reply) { - h.global_seq = messenger.get_global_seq(); - h.reply.tag = tag; - h.reply.features = conn.policy.features_supported; - h.reply.global_seq = h.global_seq; - h.reply.connect_seq = h.connect_seq; - h.reply.flags = 0; - if (conn.policy.lossy) { - h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; - } - h.reply.authorizer_len = authorizer_reply.length(); + return messenger.get_global_seq( + ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) { + h.global_seq = gs; + h.reply.tag = tag; + h.reply.features = conn.policy.features_supported; + h.reply.global_seq = h.global_seq; + h.reply.connect_seq = h.connect_seq; + h.reply.flags = 0; + if (conn.policy.lossy) { + h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; + } + h.reply.authorizer_len = auth_len; - session_security.reset( - get_auth_session_handler(nullptr, - auth_meta->auth_method, - auth_meta->session_key, - conn.features)); + session_security.reset( + get_auth_session_handler(nullptr, + auth_meta->auth_method, + auth_meta->session_key, + conn.features)); - return socket->write(make_static_packet(h.reply)) - .then([this, reply=std::move(authorizer_reply)]() mutable { + return socket->write(make_static_packet(h.reply)); + }).then([this, reply=std::move(authorizer_reply)]() mutable { if (reply.length()) { return socket->write(std::move(reply)); } else { diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index b51e32e134d..0d97b62df73 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -777,9 +777,11 @@ seastar::future ProtocolV2::client_reconnect() auto retry = RetryGlobalFrame::Decode(rx_segments_data.back()); logger().warn("{} GOT RetryGlobalFrame: gs={}", conn, retry.global_seq()); - global_seq = messenger.get_global_seq(retry.global_seq()); - logger().warn("{} UPDATE: gs={}", conn, global_seq); - return client_reconnect(); + return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) { + global_seq = gs; + logger().warn("{} UPDATE: gs={}", conn, global_seq); + return client_reconnect(); + }); }); case Tag::SESSION_RETRY: return read_frame_payload().then([this] { @@ -835,11 +837,12 @@ void ProtocolV2::execute_connecting() seastar::with_gate(pending_dispatch, [this] { // we don't know my socket_port yet conn.set_ephemeral_port(0, SocketConnection::side_t::none); - enable_recording(); - global_seq = messenger.get_global_seq(); - logger().debug("{} UPDATE: gs={}", conn, global_seq); - return Socket::connect(conn.peer_addr) - .then([this](SocketFRef sock) { + return messenger.get_global_seq().then([this] (auto gs) { + global_seq = gs; + enable_recording(); + logger().debug("{} UPDATE: gs={}", conn, global_seq); + return Socket::connect(conn.peer_addr); + }).then([this](SocketFRef sock) { logger().debug("{} socket connected", conn); socket = std::move(sock); if (state == state_t::CLOSING) { @@ -1409,52 +1412,55 @@ seastar::future<> ProtocolV2::send_server_ident() { // send_server_ident() logic - // this is required for the case when this connection is being replaced - // TODO - // out_seq = discard_requeued_up_to(out_seq, 0); - conn.in_seq = 0; - - if (!conn.policy.lossy) { - server_cookie = ceph::util::generate_random_number(1, -1ll); - } + // refered to async-conn v2: not assign gs to global_seq + return messenger.get_global_seq().then([this] (auto gs) { + logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq); - uint64_t flags = 0; - if (conn.policy.lossy) { - flags = flags | CEPH_MSG_CONNECT_LOSSY; - } + // this is required for the case when this connection is being replaced + // TODO + // out_seq = discard_requeued_up_to(out_seq, 0); + conn.in_seq = 0; - // refered to async-conn v2: not assign gs to global_seq - uint64_t gs = messenger.get_global_seq(); - auto server_ident = ServerIdentFrame::Encode( - messenger.get_myaddrs(), - messenger.get_myname().num(), - gs, - conn.policy.features_supported, - conn.policy.features_required | msgr2_required, - flags, - server_cookie); - - logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," - " gs={}, features_supported={}, features_required={}," - " flags={}, cookie={}", - conn, messenger.get_myaddrs(), messenger.get_myname().num(), - gs, conn.policy.features_supported, - conn.policy.features_required | msgr2_required, - flags, server_cookie); + if (!conn.policy.lossy) { + server_cookie = ceph::util::generate_random_number(1, -1ll); + } - conn.set_features(connection_features); + uint64_t flags = 0; + if (conn.policy.lossy) { + flags = flags | CEPH_MSG_CONNECT_LOSSY; + } - // notify - seastar::with_gate(pending_dispatch, [this] { - return dispatcher.ms_handle_accept( - seastar::static_pointer_cast(conn.shared_from_this())) - .handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); - ceph_abort("unecpected exception from ms_handle_accept()"); + auto server_ident = ServerIdentFrame::Encode( + messenger.get_myaddrs(), + messenger.get_myname().num(), + gs, + conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, + server_cookie); + + logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={}," + " gs={}, features_supported={}, features_required={}," + " flags={}, cookie={}", + conn, messenger.get_myaddrs(), messenger.get_myname().num(), + gs, conn.policy.features_supported, + conn.policy.features_required | msgr2_required, + flags, server_cookie); + + conn.set_features(connection_features); + + // notify + seastar::with_gate(pending_dispatch, [this] { + return dispatcher.ms_handle_accept( + seastar::static_pointer_cast(conn.shared_from_this())) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_accept caught exception: {}", conn, eptr); + ceph_abort("unecpected exception from ms_handle_accept()"); + }); }); - }); - return write_frame(server_ident); + return write_frame(server_ident); + }); } // REPLACING state diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 292962754f5..8b75d46e399 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -372,3 +372,14 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn) ceph_assert(found->second == conn); connections.erase(found); } + +seastar::future +SocketMessenger::get_global_seq(uint32_t old) +{ + return container().invoke_on(0, [old] (auto& msgr) { + if (old > msgr.global_seq) { + msgr.global_seq = old; + } + return ++msgr.global_seq; + }); +} diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 2997f643b55..82fa6c8625b 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -42,6 +42,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ const uint32_t nonce; // specifying we haven't learned our addr; set false when we find it. bool need_addr = true; + uint32_t global_seq = 0; seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); @@ -108,6 +109,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; public: + seastar::future get_global_seq(uint32_t old=0); seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn); -- 2.39.5