From c48a29b9edde3c6d3cd34252d202885e2e064fe0 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 6 Sep 2019 15:04:11 -0500 Subject: [PATCH] msg/async: do not register lossy client connections If our policy is server + lossy, we do not need to track our incoming client connections by address. First, because it doesn't do us any good. Second, it is nicer if we don't, because we can allow multiple incoming connections from the same peer addr. Update a couple of tests: one doesn't apply any more, and the other needs a different way of getting the just-accepted con ref. Signed-off-by: Sage Weil --- src/msg/async/AsyncConnection.h | 2 ++ src/msg/async/AsyncMessenger.cc | 6 ++++ src/msg/async/ProtocolV1.cc | 9 +++++- src/msg/async/ProtocolV2.cc | 56 ++++++++++++++++++--------------- src/test/msgr/test_msgr.cc | 43 +++++-------------------- 5 files changed, 55 insertions(+), 61 deletions(-) diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3ce26e6d52a..271e9eb9c73 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -165,7 +165,9 @@ class AsyncConnection : public Connection { int state; ConnectedSocket cs; int port; +public: Messenger::Policy policy; +private: DispatchQueue *dispatch_queue; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 52064281486..362068caaea 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -861,6 +861,12 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn) { std::lock_guard l{lock}; + if (conn->policy.server && + conn->policy.lossy) { + anon_conns.insert(conn); + conn->get_perf_counter()->inc(l_msgr_active_connections); + return 0; + } auto it = conns.find(*conn->peer_addrs); if (it != conns.end()) { auto& existing = it->second; diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 9f6160d7ef0..707ff18b92d 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -2050,7 +2050,14 @@ CtPtr ProtocolV1::handle_connect_message_2() { // session security structure. PLR ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl; - // existing? + if (connection->policy.server && + connection->policy.lossy) { + // incoming lossy client, no need to register this connection + // new session + ldout(cct, 10) << __func__ << " accept new session" << dendl; + return open(reply, authorizer_reply); + } + AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); connection->inject_delay(); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index af02cc2d729..e176c2563fb 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -2357,34 +2357,40 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) peer_global_seq = client_ident.global_seq(); - // Looks good so far, let's check if there is already an existing connection - // to this peer. - - connection->lock.unlock(); - AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); - - if (existing && - existing->protocol->proto_type != 2) { - ldout(cct,1) << __func__ << " existing " << existing << " proto " - << existing->protocol.get() << " version is " - << existing->protocol->proto_type << ", marking down" << dendl; - existing->mark_down(); - existing = nullptr; - } + if (connection->policy.server && + connection->policy.lossy) { + // incoming lossy client, no need to register this connection + } else { + // Looks good so far, let's check if there is already an existing connection + // to this peer. + connection->lock.unlock(); + AsyncConnectionRef existing = messenger->lookup_conn( + *connection->peer_addrs); + + if (existing && + existing->protocol->proto_type != 2) { + ldout(cct,1) << __func__ << " existing " << existing << " proto " + << existing->protocol.get() << " version is " + << existing->protocol->proto_type << ", marking down" + << dendl; + existing->mark_down(); + existing = nullptr; + } - connection->inject_delay(); + connection->inject_delay(); - connection->lock.lock(); - if (state != SESSION_ACCEPTING) { - ldout(cct, 1) << __func__ - << " state changed while accept, it must be mark_down" - << dendl; - ceph_assert(state == CLOSED); - return _fault(); - } + connection->lock.lock(); + if (state != SESSION_ACCEPTING) { + ldout(cct, 1) << __func__ + << " state changed while accept, it must be mark_down" + << dendl; + ceph_assert(state == CLOSED); + return _fault(); + } - if (existing) { - return handle_existing_connection(existing); + if (existing) { + return handle_existing_connection(existing); + } } // if everything is OK reply with server identification diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 0f49b3c4d28..599fb7d9a08 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -109,6 +109,7 @@ class FakeDispatcher : public Dispatcher { bool got_connect; bool loopback; entity_addrvec_t last_accept; + ConnectionRef *last_accept_con_ptr = nullptr; explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), is_server(s), got_new(false), got_remote_reset(false), @@ -139,6 +140,9 @@ class FakeDispatcher : public Dispatcher { } void ms_handle_fast_accept(Connection *con) override { last_accept = con->get_peer_addrs(); + if (last_accept_con_ptr) { + *last_accept_con_ptr = con; + } if (!con->get_priv()) { con->set_priv(RefCountedPtr{new Session(con), false}); } @@ -943,39 +947,6 @@ TEST_P(MessengerTest, SimpleMsgr2Test) { server_msgr->wait(); } -TEST_P(MessengerTest, NameAddrTest) { - FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); - entity_addr_t bind_addr; - bind_addr.parse("v2:127.0.0.1"); - server_msgr->bind(bind_addr); - server_msgr->add_dispatcher_head(&srv_dispatcher); - server_msgr->start(); - - client_msgr->add_dispatcher_head(&cli_dispatcher); - client_msgr->start(); - - MPing *m = new MPing(); - ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), - server_msgr->get_myaddrs()); - { - ASSERT_EQ(conn->send_message(m), 0); - std::unique_lock l{cli_dispatcher.lock}; - cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); - cli_dispatcher.got_new = false; - } - ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs()); - ConnectionRef server_conn = server_msgr->connect_to( - client_msgr->get_mytype(), srv_dispatcher.last_accept); - // Verify that server_conn is the one we already accepted from client, - // so it means the session counter in server_conn is also incremented. - ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); - server_msgr->shutdown(); - client_msgr->shutdown(); - server_msgr->wait(); - client_msgr->wait(); -} - TEST_P(MessengerTest, FeatureTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; @@ -1199,6 +1170,8 @@ TEST_P(MessengerTest, StatelessTest) { ASSERT_FALSE(conn->is_connected()); srv_dispatcher.got_new = false; + ConnectionRef server_conn; + srv_dispatcher.last_accept_con_ptr = &server_conn; conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { @@ -1209,8 +1182,8 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(), - srv_dispatcher.last_accept); + ASSERT_TRUE(server_conn); + // server lose state { std::unique_lock l{srv_dispatcher.lock}; -- 2.39.5