]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/ProtocolV2: take care of features when replacing the socket 35816/head
authorIlya Dryomov <idryomov@gmail.com>
Fri, 26 Jun 2020 20:57:06 +0000 (20:57 +0000)
committerIlya Dryomov <idryomov@gmail.com>
Sun, 28 Jun 2020 10:32:49 +0000 (10:32 +0000)
reuse_connection() can be called on exproto in BANNER_CONNECTING
(i.e. without peer_supported_features and with tx/rx_frame_asm set to
msgr2.0), but this state isn't carried over.  If the donor connection
is msgr2.1, this leads to repeated connection faults on crc or auth tag
mismatches because we end up assembling 2.0 frames while the peer is
expecting 2.1 frames.

Fixes: https://tracker.ceph.com/issues/46180
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
src/msg/async/ProtocolV2.cc
src/msg/async/frames_v2.h
src/test/msgr/test_msgr.cc

index b7d4cacb151c0fb13ff7b95c0cb3e3c26d4e9730..603a9a526e5decb78142d749364733e642cccd98 100644 (file)
@@ -2645,6 +2645,10 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
   exproto->pre_auth.enabled = false;
 
   if (!reconnecting) {
+    exproto->peer_supported_features = peer_supported_features;
+    exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1());
+    exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1());
+
     exproto->client_cookie = client_cookie;
     exproto->peer_name = peer_name;
     exproto->connection_features = connection_features;
index 709bbbdd7363f7ba6431f54f86e54b57d7d61a9c..94d4d1732b201fe4c64d5db3f7b0d13ca2379486 100644 (file)
@@ -184,6 +184,10 @@ public:
     m_is_rev1 = is_rev1;
   }
 
+  bool get_is_rev1() {
+    return m_is_rev1;
+  }
+
   size_t get_num_segments() const {
     ceph_assert(!m_descs.empty());
     return m_descs.size();
index 599fb7d9a08a1581fc681c6a389e309a5f45038a..15f5d36f69d0449cac589f5e415e83d6e26d11e4 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <atomic>
 #include <iostream>
+#include <memory>
 #include <unistd.h>
 #include <stdlib.h>
 #include <time.h>
@@ -414,6 +415,115 @@ TEST_P(MessengerTest, ConnectionRaceTest) {
   delete srv_interceptor;
 }
 
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ * The first (A -> B) connection gets to message flow handshake, the
+ * second (B -> A) connection is stuck waiting for a banner from A.
+ * After A sends client_ident to B, the first connection wins and B
+ * calls reuse_connection() to replace the second connection's socket
+ * while the second connection is still in BANNER_CONNECTING.
+ */
+TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  auto cli_interceptor = std::make_unique<TestInterceptor>();
+  auto srv_interceptor = std::make_unique<TestInterceptor>();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT,
+                          Messenger::Policy::lossless_peer_reuse(0));
+  server_msgr->interceptor = srv_interceptor.get();
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD,
+                          Messenger::Policy::lossless_peer_reuse(0));
+  client_msgr->interceptor = cli_interceptor.get();
+
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause before sending client_ident message
+  srv_interceptor->breakpoint(11);
+
+  ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+                                              client_msgr->get_myaddrs());
+  MPing *m1 = new MPing();
+  ASSERT_EQ(s2c->send_message(m1), 0);
+
+  srv_interceptor->wait(11);
+  srv_interceptor->remove_bp(11);
+
+  // pause before sending banner
+  cli_interceptor->breakpoint(3);
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s->send_message(m2), 0);
+
+  cli_interceptor->wait(3);
+  cli_interceptor->remove_bp(3);
+
+  // second connection is in BANNER_CONNECTING, ensure it stays so
+  // and send client_ident
+  srv_interceptor->breakpoint(4);
+  srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+
+  // handle client_ident -- triggers reuse_connection() with exproto
+  // in BANNER_CONNECTING
+  cli_interceptor->breakpoint(15);
+  cli_interceptor->proceed(3, Interceptor::ACTION::CONTINUE);
+
+  cli_interceptor->wait(15);
+  cli_interceptor->remove_bp(15);
+
+  // first connection is in READY
+  Connection *s2c_accepter = srv_interceptor->wait(4);
+  srv_interceptor->remove_bp(4);
+
+  srv_interceptor->proceed(4, Interceptor::ACTION::CONTINUE);
+  cli_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+  {
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+    cli_dispatcher.got_new = false;
+  }
+
+  {
+    std::unique_lock l{srv_dispatcher.lock};
+    srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; });
+    srv_dispatcher.got_new = false;
+  }
+
+  EXPECT_TRUE(s2c->is_connected());
+  EXPECT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+  EXPECT_TRUE(s2c->peer_is_client());
+
+  EXPECT_TRUE(c2s->is_connected());
+  EXPECT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  EXPECT_TRUE(c2s->peer_is_osd());
+
+  // closed in reuse_connection() -- EPIPE when writing banner/hello
+  EXPECT_FALSE(s2c_accepter->is_connected());
+
+  // established exactly once, never faulted and reconnected
+  EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 1), 1u);
+  EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 13), 0u);
+  EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 15), 1u);
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+}
+
 /**
  * Scenario:
  *    - A connects to B