]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: improve get_global_seq()
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 7 Aug 2019 11:14:27 +0000 (19:14 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 7 Aug 2019 11:14:27 +0000 (19:14 +0800)
Implement single global_seq and non-racing get_global_seq() interface.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Messenger.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index 6d857463da86cf7c7ef3f70cd8ecc7351120590e..d0c5e5aa71d4f4d371138f73267ca1c7e9b4ccab 100644 (file)
@@ -35,7 +35,6 @@ using SocketPolicy = ceph::net::Policy<Throttle>;
 class Messenger {
   entity_name_t my_name;
   entity_addrvec_t my_addrs;
-  uint32_t global_seq = 0;
   uint32_t crc_flags = 0;
   ceph::auth::AuthClient* auth_client = nullptr;
   ceph::auth::AuthServer* auth_server = nullptr;
@@ -79,13 +78,6 @@ public:
   /// after this future becomes available
   virtual seastar::future<> shutdown() = 0;
 
-  uint32_t get_global_seq(uint32_t old=0) {
-    if (old > global_seq) {
-      global_seq = old;
-    }
-    return ++global_seq;
-  }
-
   uint32_t get_crc_flags() const {
     return crc_flags;
   }
index be857c01f2eb040097cd1bf8af4048a8abc5b1fa..55c8d0f7978d9746b0959acfa82ef7079b57e400 100644 (file)
@@ -184,8 +184,10 @@ ProtocolV1::handle_connect_reply(msgr_tag_t tag)
     reset_session();
     return seastar::make_ready_future<stop_t>(stop_t::no);
   case CEPH_MSGR_TAG_RETRY_GLOBAL:
-    h.global_seq = messenger.get_global_seq(h.reply.global_seq);
-    return seastar::make_ready_future<stop_t>(stop_t::no);
+    return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) {
+      h.global_seq = gs;
+      return seastar::make_ready_future<stop_t>(stop_t::no);
+    });
   case CEPH_MSGR_TAG_RETRY_SESSION:
     ceph_assert(h.reply.connect_seq > h.connect_seq);
     h.connect_seq = h.reply.connect_seq;
@@ -327,6 +329,9 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
           }
           return seastar::now();
         }).then([this] {
+          return messenger.get_global_seq();
+        }).then([this] (auto gs) {
+          h.global_seq = gs;
           // read server's handshake header
           return socket->read(server_header_size);
         }).then([this] (bufferlist headerbl) {
@@ -357,7 +362,6 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
           bufferlist bl;
           bl.append(buffer::create_static(banner_size, banner));
           ::encode(messenger.get_myaddr(), bl, 0);
-          h.global_seq = messenger.get_global_seq();
           return socket->write_flush(std::move(bl));
         }).then([=] {
           return seastar::repeat([this] {
@@ -399,25 +403,27 @@ seastar::future<stop_t> ProtocolV1::send_connect_reply(
 seastar::future<stop_t> ProtocolV1::send_connect_reply_ready(
     msgr_tag_t tag, bufferlist&& authorizer_reply)
 {
-  h.global_seq = messenger.get_global_seq();
-  h.reply.tag = tag;
-  h.reply.features = conn.policy.features_supported;
-  h.reply.global_seq = h.global_seq;
-  h.reply.connect_seq = h.connect_seq;
-  h.reply.flags = 0;
-  if (conn.policy.lossy) {
-    h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
-  }
-  h.reply.authorizer_len = authorizer_reply.length();
+  return messenger.get_global_seq(
+    ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) {
+      h.global_seq = gs;
+      h.reply.tag = tag;
+      h.reply.features = conn.policy.features_supported;
+      h.reply.global_seq = h.global_seq;
+      h.reply.connect_seq = h.connect_seq;
+      h.reply.flags = 0;
+      if (conn.policy.lossy) {
+        h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
+      }
+      h.reply.authorizer_len = auth_len;
 
-  session_security.reset(
-      get_auth_session_handler(nullptr,
-                               auth_meta->auth_method,
-                               auth_meta->session_key,
-                               conn.features));
+      session_security.reset(
+          get_auth_session_handler(nullptr,
+                                   auth_meta->auth_method,
+                                   auth_meta->session_key,
+                                   conn.features));
 
-  return socket->write(make_static_packet(h.reply))
-    .then([this, reply=std::move(authorizer_reply)]() mutable {
+      return socket->write(make_static_packet(h.reply));
+    }).then([this, reply=std::move(authorizer_reply)]() mutable {
       if (reply.length()) {
         return socket->write(std::move(reply));
       } else {
index b51e32e134dc283c7a755992f5f457d9d066df1e..0d97b62df73fb1c9b932f1ba8141694f308c7dad 100644 (file)
@@ -777,9 +777,11 @@ seastar::future<bool> ProtocolV2::client_reconnect()
           auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
           logger().warn("{} GOT RetryGlobalFrame: gs={}",
                         conn, retry.global_seq());
-          global_seq = messenger.get_global_seq(retry.global_seq());
-          logger().warn("{} UPDATE: gs={}", conn, global_seq);
-          return client_reconnect();
+          return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
+            global_seq = gs;
+            logger().warn("{} UPDATE: gs={}", conn, global_seq);
+            return client_reconnect();
+          });
         });
       case Tag::SESSION_RETRY:
         return read_frame_payload().then([this] {
@@ -835,11 +837,12 @@ void ProtocolV2::execute_connecting()
   seastar::with_gate(pending_dispatch, [this] {
       // we don't know my socket_port yet
       conn.set_ephemeral_port(0, SocketConnection::side_t::none);
-      enable_recording();
-      global_seq = messenger.get_global_seq();
-      logger().debug("{} UPDATE: gs={}", conn, global_seq);
-      return Socket::connect(conn.peer_addr)
-        .then([this](SocketFRef sock) {
+      return messenger.get_global_seq().then([this] (auto gs) {
+          global_seq = gs;
+          enable_recording();
+          logger().debug("{} UPDATE: gs={}", conn, global_seq);
+          return Socket::connect(conn.peer_addr);
+        }).then([this](SocketFRef sock) {
           logger().debug("{} socket connected", conn);
           socket = std::move(sock);
           if (state == state_t::CLOSING) {
@@ -1409,52 +1412,55 @@ seastar::future<> ProtocolV2::send_server_ident()
 {
   // send_server_ident() logic
 
-  // this is required for the case when this connection is being replaced
-  // TODO
-  // out_seq = discard_requeued_up_to(out_seq, 0);
-  conn.in_seq = 0;
-
-  if (!conn.policy.lossy) {
-    server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
-  }
+  // refered to async-conn v2: not assign gs to global_seq
+  return messenger.get_global_seq().then([this] (auto gs) {
+    logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
 
-  uint64_t flags = 0;
-  if (conn.policy.lossy) {
-    flags = flags | CEPH_MSG_CONNECT_LOSSY;
-  }
+    // this is required for the case when this connection is being replaced
+    // TODO
+    // out_seq = discard_requeued_up_to(out_seq, 0);
+    conn.in_seq = 0;
 
-  // refered to async-conn v2: not assign gs to global_seq
-  uint64_t gs = messenger.get_global_seq();
-  auto server_ident = ServerIdentFrame::Encode(
-          messenger.get_myaddrs(),
-          messenger.get_myname().num(),
-          gs,
-          conn.policy.features_supported,
-          conn.policy.features_required | msgr2_required,
-          flags,
-          server_cookie);
-
-  logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
-                 " gs={}, features_supported={}, features_required={},"
-                 " flags={}, cookie={}",
-                 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
-                 gs, conn.policy.features_supported,
-                 conn.policy.features_required | msgr2_required,
-                 flags, server_cookie);
+    if (!conn.policy.lossy) {
+      server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+    }
 
-  conn.set_features(connection_features);
+    uint64_t flags = 0;
+    if (conn.policy.lossy) {
+      flags = flags | CEPH_MSG_CONNECT_LOSSY;
+    }
 
-  // notify
-  seastar::with_gate(pending_dispatch, [this] {
-    return dispatcher.ms_handle_accept(
-        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
-    .handle_exception([this] (std::exception_ptr eptr) {
-      logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
-      ceph_abort("unecpected exception from ms_handle_accept()");
+    auto server_ident = ServerIdentFrame::Encode(
+            messenger.get_myaddrs(),
+            messenger.get_myname().num(),
+            gs,
+            conn.policy.features_supported,
+            conn.policy.features_required | msgr2_required,
+            flags,
+            server_cookie);
+
+    logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+                   " gs={}, features_supported={}, features_required={},"
+                   " flags={}, cookie={}",
+                   conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+                   gs, conn.policy.features_supported,
+                   conn.policy.features_required | msgr2_required,
+                   flags, server_cookie);
+
+    conn.set_features(connection_features);
+
+    // notify
+    seastar::with_gate(pending_dispatch, [this] {
+      return dispatcher.ms_handle_accept(
+          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
+      .handle_exception([this] (std::exception_ptr eptr) {
+        logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
+        ceph_abort("unecpected exception from ms_handle_accept()");
+      });
     });
-  });
 
-  return write_frame(server_ident);
+    return write_frame(server_ident);
+  });
 }
 
 // REPLACING state
index 292962754f5f3c7dad6a5c4143bb558f999de278..8b75d46e3994de200b49d50361784f6961e5243f 100644 (file)
@@ -372,3 +372,14 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
   ceph_assert(found->second == conn);
   connections.erase(found);
 }
+
+seastar::future<uint32_t>
+SocketMessenger::get_global_seq(uint32_t old)
+{
+  return container().invoke_on(0, [old] (auto& msgr) {
+    if (old > msgr.global_seq) {
+      msgr.global_seq = old;
+    }
+    return ++msgr.global_seq;
+  });
+}
index 2997f643b553b96c767262bd381e9c21bc837524..82fa6c8625b55c516d8e1729440df207c7cf9938 100644 (file)
@@ -42,6 +42,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
   const uint32_t nonce;
   // specifying we haven't learned our addr; set false when we find it.
   bool need_addr = true;
+  uint32_t global_seq = 0;
 
   seastar::future<> accept(seastar::connected_socket socket,
                            seastar::socket_address paddr);
@@ -108,6 +109,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
   void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
 
  public:
+  seastar::future<uint32_t> get_global_seq(uint32_t old=0);
   seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
                                  const SocketConnection& conn);