]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/ProtocolV2: take care of features when replacing the socket 35733/head
authorIlya Dryomov <idryomov@gmail.com>
Fri, 26 Jun 2020 20:57:06 +0000 (20:57 +0000)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 13 Jul 2020 12:45:53 +0000 (14:45 +0200)
reuse_connection() can be called on exproto in BANNER_CONNECTING
(i.e. without peer_supported_features and with tx/rx_frame_asm set to
msgr2.0), but this state isn't carried over.  If the donor connection
is msgr2.1, this leads to repeated connection faults on crc or auth tag
mismatches because we end up assembling 2.0 frames while the peer is
expecting 2.1 frames.

Fixes: https://tracker.ceph.com/issues/46180
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
(cherry picked from commit baff20a14bb322fc11bf13495bb8f5d5f4626116)

Conflicts:
src/test/msgr/test_msgr.cc [ commits 5e3aa5d15d2d
  ("ceph_test_msgr: remove simple") and bfb8c741cd1e
  ("test/msgr: s/Mutex/ceph::mutex/") not in nautilus ]

src/msg/async/ProtocolV2.cc
src/msg/async/frames_v2.h
src/test/msgr/test_msgr.cc

index f5c904544dd9deed49cf1e6b1d4c52447bcf6b63..0431414608500fa03567e1521b5d11e6890ad01f 100644 (file)
@@ -2618,6 +2618,10 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   exproto->pre_auth.enabled = false;
 
   if (!reconnecting) {
+    exproto->peer_supported_features = peer_supported_features;
+    exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1());
+    exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1());
+
     exproto->client_cookie = client_cookie;
     exproto->peer_name = peer_name;
     exproto->connection_features = connection_features;
index 4d1560371167977ace41684e6303668277d493fe..88fa4e1be1fedf5503cd3e8da51a2c50af799799 100644 (file)
@@ -184,6 +184,10 @@ public:
     m_is_rev1 = is_rev1;
   }
 
+  bool get_is_rev1() {
+    return m_is_rev1;
+  }
+
   size_t get_num_segments() const {
     ceph_assert(!m_descs.empty());
     return m_descs.size();
index dc8be363f7689d4cfd4c7948605828ddb28c6028..e388d3abb2c40c7421375f21a77ba23bbc7a76eb 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <atomic>
 #include <iostream>
+#include <memory>
 #include <unistd.h>
 #include <stdlib.h>
 #include <time.h>
@@ -423,6 +424,121 @@ TEST_P(MessengerTest, ConnectionRaceTest) {
   delete srv_interceptor;
 }
 
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ * The first (A -> B) connection gets to message flow handshake, the
+ * second (B -> A) connection is stuck waiting for a banner from A.
+ * After A sends client_ident to B, the first connection wins and B
+ * calls reuse_connection() to replace the second connection's socket
+ * while the second connection is still in BANNER_CONNECTING.
+ */
+TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) {
+  if (string(GetParam()) == "simple") {
+    return;
+  }
+
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  auto cli_interceptor = std::make_unique<TestInterceptor>();
+  auto srv_interceptor = std::make_unique<TestInterceptor>();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT,
+                          Messenger::Policy::lossless_peer_reuse(0));
+  server_msgr->interceptor = srv_interceptor.get();
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD,
+                          Messenger::Policy::lossless_peer_reuse(0));
+  client_msgr->interceptor = cli_interceptor.get();
+
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause before sending client_ident message
+  srv_interceptor->breakpoint(11);
+
+  ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+                                              client_msgr->get_myaddrs());
+  MPing *m1 = new MPing();
+  ASSERT_EQ(s2c->send_message(m1), 0);
+
+  srv_interceptor->wait(11);
+  srv_interceptor->remove_bp(11);
+
+  // pause before sending banner
+  cli_interceptor->breakpoint(3);
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s->send_message(m2), 0);
+
+  cli_interceptor->wait(3);
+  cli_interceptor->remove_bp(3);
+
+  // second connection is in BANNER_CONNECTING, ensure it stays so
+  // and send client_ident
+  srv_interceptor->breakpoint(4);
+  srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+
+  // handle client_ident -- triggers reuse_connection() with exproto
+  // in BANNER_CONNECTING
+  cli_interceptor->breakpoint(15);
+  cli_interceptor->proceed(3, Interceptor::ACTION::CONTINUE);
+
+  cli_interceptor->wait(15);
+  cli_interceptor->remove_bp(15);
+
+  // first connection is in READY
+  Connection *s2c_accepter = srv_interceptor->wait(4);
+  srv_interceptor->remove_bp(4);
+
+  srv_interceptor->proceed(4, Interceptor::ACTION::CONTINUE);
+  cli_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  {
+    Mutex::Locker l(srv_dispatcher.lock);
+    while (!srv_dispatcher.got_new)
+      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    srv_dispatcher.got_new = false;
+  }
+
+  EXPECT_TRUE(s2c->is_connected());
+  EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+  EXPECT_TRUE(s2c->peer_is_client());
+
+  EXPECT_TRUE(c2s->is_connected());
+  EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  EXPECT_TRUE(c2s->peer_is_osd());
+
+  // closed in reuse_connection() -- EPIPE when writing banner/hello
+  EXPECT_FALSE(s2c_accepter->is_connected());
+
+  // established exactly once, never faulted and reconnected
+  EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 1), 1u);
+  EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 13), 0u);
+  EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 15), 1u);
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+}
+
 /**
  * Scenario:
  *    - A connects to B