]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: add SocketConnection::peer_wins()
authorYingxin Cheng <yingxin.cheng@intel.com>
Thu, 5 Sep 2019 08:36:32 +0000 (16:36 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 18 Sep 2019 04:22:09 +0000 (12:22 +0800)
Also expose Connection::peer_wins() to unit tests.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Connection.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 3a179ee15fecbee7fc1e310903ebcdfb1f463593..8129c5351cf64c69f4c8b7a0727405ffe82c574e 100644 (file)
@@ -69,6 +69,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
 
 #ifdef UNIT_TESTS_BUILT
   virtual bool is_closed() const = 0;
+
+  virtual bool peer_wins() const = 0;
 #endif
 
   /// send a message over a connection that has completed its handshake
index 7aca87fd0f36afb2511c66c7b7494013c477045e..13e5fe3e73f620f38b2bfa204768bc512bfd72f2 100644 (file)
@@ -503,9 +503,7 @@ seastar::future<stop_t> ProtocolV1::handle_connect_with_existing(
         h.reply.connect_seq = exproto->connect_seq() + 1;
         return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
       }
-    } else if (conn.peer_addr < messenger.get_myaddr() ||
-               existing->is_server_side()) {
-      // incoming wins
+    } else if (existing->peer_wins()) {
       return replace_existing(existing, std::move(authorizer_reply));
     } else {
       return send_connect_reply(CEPH_MSGR_TAG_WAIT);
index ddd597168b49f878126294149da110ea7e1dc441..41328b4f82c096f7201fd70997123aa2b1e648c2 100644 (file)
@@ -1195,14 +1195,12 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     // Looks like a connection race: server and client are both connecting to
     // each other at the same time.
     if (existing_proto->client_cookie != client_cookie) {
-      if (conn.peer_addr < messenger.get_myaddr() || existing_conn->policy.server) {
-        // this connection wins
+      if (existing_conn->peer_wins()) {
         logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
                       " and win, reusing existing {}",
                       conn, client_cookie, existing_proto->client_cookie, *existing_conn);
         return reuse_connection(existing_proto);
       } else {
-        // the existing connection wins
         logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
                       " and lose to existing {}, ask client to wait",
                       conn, client_cookie, existing_proto->client_cookie, *existing_conn);
@@ -1468,21 +1466,18 @@ ProtocolV2::server_reconnect()
       return send_retry(existing_proto->connect_seq);
     } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
       // reconnect race: both peers are sending reconnect messages
-      if (existing_conn->peer_addr > messenger.get_myaddrs().msgr2_addr() &&
-          !existing_conn->policy.server) {
-        // the existing connection wins
-        logger().warn("{} server_reconnect: reconnect race detected (cs={})"
-                      " and lose to existing {}, ask client to wait",
-                      conn, reconnect.connect_seq(), *existing_conn);
-        return send_wait();
-      } else {
-        // this connection wins
+      if (existing_conn->peer_wins()) {
         logger().warn("{} server_reconnect: reconnect race detected (cs={})"
                       " and win, reusing existing {}",
                       conn, reconnect.connect_seq(), *existing_conn);
         return reuse_connection(
             existing_proto, false,
             true, reconnect.connect_seq(), reconnect.msg_seq());
+      } else {
+        logger().warn("{} server_reconnect: reconnect race detected (cs={})"
+                      " and lose to existing {}, ask client to wait",
+                      conn, reconnect.connect_seq(), *existing_conn);
+        return send_wait();
       }
     } else { // existing_proto->connect_seq < reconnect.connect_seq()
       logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
index 44735ca8528fde81bc55990f7c8b219d00468bc8..d3e4114ceead0aabd6c066dfc2f48934b62f78c2 100644 (file)
@@ -63,7 +63,12 @@ bool SocketConnection::is_closed() const
   ceph_assert(seastar::engine().cpu_id() == shard_id());
   return protocol->is_closed();
 }
+
 #endif
+bool SocketConnection::peer_wins() const
+{
+  return (messenger.get_myaddr() > peer_addr || policy.server);
+}
 
 seastar::future<> SocketConnection::send(MessageRef msg)
 {
index 0c49f621b883f38de76d2a1063bc5d3108c95ad6..c27315765caa4a3d91c27aa2c307a9eebe06a841 100644 (file)
@@ -83,6 +83,10 @@ class SocketConnection : public Connection {
 
 #ifdef UNIT_TESTS_BUILT
   bool is_closed() const override;
+
+  bool peer_wins() const override;
+#else
+  bool peer_wins() const;
 #endif
 
   seastar::future<> send(MessageRef msg) override;