From baff20a14bb322fc11bf13495bb8f5d5f4626116 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 26 Jun 2020 20:57:06 +0000 Subject: [PATCH] msg/async/ProtocolV2: take care of features when replacing the socket reuse_connection() can be called on exproto in BANNER_CONNECTING (i.e. without peer_supported_features and with tx/rx_frame_asm set to msgr2.0), but this state isn't carried over. If the donor connection is msgr2.1, this leads to repeated connection faults on crc or auth tag mismatches because we end up assembling 2.0 frames while the peer is expecting 2.1 frames. Fixes: https://tracker.ceph.com/issues/46180 Signed-off-by: Ilya Dryomov --- src/msg/async/ProtocolV2.cc | 4 ++ src/msg/async/frames_v2.h | 4 ++ src/test/msgr/test_msgr.cc | 110 ++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index b7d4cacb151c..603a9a526e5d 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -2645,6 +2645,10 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, exproto->pre_auth.enabled = false; if (!reconnecting) { + exproto->peer_supported_features = peer_supported_features; + exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1()); + exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1()); + exproto->client_cookie = client_cookie; exproto->peer_name = peer_name; exproto->connection_features = connection_features; diff --git a/src/msg/async/frames_v2.h b/src/msg/async/frames_v2.h index 709bbbdd7363..94d4d1732b20 100644 --- a/src/msg/async/frames_v2.h +++ b/src/msg/async/frames_v2.h @@ -184,6 +184,10 @@ public: m_is_rev1 = is_rev1; } + bool get_is_rev1() { + return m_is_rev1; + } + size_t get_num_segments() const { ceph_assert(!m_descs.empty()); return m_descs.size(); diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 599fb7d9a08a..15f5d36f69d0 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -414,6 +415,115 @@ TEST_P(MessengerTest, ConnectionRaceTest) { delete srv_interceptor; } +/** + * Scenario: A connects to B, and B connects to A at the same time. + * The first (A -> B) connection gets to message flow handshake, the + * second (B -> A) connection is stuck waiting for a banner from A. + * After A sends client_ident to B, the first connection wins and B + * calls reuse_connection() to replace the second connection's socket + * while the second connection is still in BANNER_CONNECTING. + */ +TEST_P(MessengerTest, ConnectionRaceReuseBannerTest) { + FakeDispatcher cli_dispatcher(false), srv_dispatcher(false); + + auto cli_interceptor = std::make_unique(); + auto srv_interceptor = std::make_unique(); + + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, + Messenger::Policy::lossless_peer_reuse(0)); + server_msgr->interceptor = srv_interceptor.get(); + + client_msgr->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::lossless_peer_reuse(0)); + client_msgr->interceptor = cli_interceptor.get(); + + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1:3300"); + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + + bind_addr.parse("v2:127.0.0.1:3301"); + client_msgr->bind(bind_addr); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + // pause before sending client_ident message + srv_interceptor->breakpoint(11); + + ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); + MPing *m1 = new MPing(); + ASSERT_EQ(s2c->send_message(m1), 0); + + srv_interceptor->wait(11); + srv_interceptor->remove_bp(11); + + // pause before sending banner + cli_interceptor->breakpoint(3); + + ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + MPing *m2 = new MPing(); + ASSERT_EQ(c2s->send_message(m2), 0); + + cli_interceptor->wait(3); + cli_interceptor->remove_bp(3); + + // second connection is in BANNER_CONNECTING, ensure it stays so + // and send client_ident + srv_interceptor->breakpoint(4); + srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE); + + // handle client_ident -- triggers reuse_connection() with exproto + // in BANNER_CONNECTING + cli_interceptor->breakpoint(15); + cli_interceptor->proceed(3, Interceptor::ACTION::CONTINUE); + + cli_interceptor->wait(15); + cli_interceptor->remove_bp(15); + + // first connection is in READY + Connection *s2c_accepter = srv_interceptor->wait(4); + srv_interceptor->remove_bp(4); + + srv_interceptor->proceed(4, Interceptor::ACTION::CONTINUE); + cli_interceptor->proceed(15, Interceptor::ACTION::CONTINUE); + + { + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + { + std::unique_lock l{srv_dispatcher.lock}; + srv_dispatcher.cond.wait(l, [&] { return srv_dispatcher.got_new; }); + srv_dispatcher.got_new = false; + } + + EXPECT_TRUE(s2c->is_connected()); + EXPECT_EQ(1u, static_cast(s2c->get_priv().get())->get_count()); + EXPECT_TRUE(s2c->peer_is_client()); + + EXPECT_TRUE(c2s->is_connected()); + EXPECT_EQ(1u, static_cast(c2s->get_priv().get())->get_count()); + EXPECT_TRUE(c2s->peer_is_osd()); + + // closed in reuse_connection() -- EPIPE when writing banner/hello + EXPECT_FALSE(s2c_accepter->is_connected()); + + // established exactly once, never faulted and reconnected + EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 1), 1u); + EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 13), 0u); + EXPECT_EQ(cli_interceptor->count_step(c2s.get(), 15), 1u); + + client_msgr->shutdown(); + client_msgr->wait(); + server_msgr->shutdown(); + server_msgr->wait(); +} + /** * Scenario: * - A connects to B -- 2.47.3