From: Sage Weil Date: Fri, 6 Sep 2019 20:04:11 +0000 (-0500) Subject: msg/async: do not register lossy client connections X-Git-Tag: v15.1.0~1415^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F30223%2Fhead;p=ceph.git msg/async: do not register lossy client connections If our policy is server + lossy, we do not need to track our incoming client connections by address. First, because it doesn't do us any good. Second, it is nicer if we don't, because we can allow multiple incoming connections from the same peer addr. Update a couple of tests: one doesn't apply any more, and the other needs a different way of getting the just-accepted con ref. Signed-off-by: Sage Weil --- diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3ce26e6d52a39..271e9eb9c7386 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -165,7 +165,9 @@ class AsyncConnection : public Connection { int state; ConnectedSocket cs; int port; +public: Messenger::Policy policy; +private: DispatchQueue *dispatch_queue; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 520642814860e..362068caaeaf8 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -861,6 +861,12 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn) { std::lock_guard l{lock}; + if (conn->policy.server && + conn->policy.lossy) { + anon_conns.insert(conn); + conn->get_perf_counter()->inc(l_msgr_active_connections); + return 0; + } auto it = conns.find(*conn->peer_addrs); if (it != conns.end()) { auto& existing = it->second; diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 9f6160d7ef084..707ff18b92da4 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -2050,7 +2050,14 @@ CtPtr ProtocolV1::handle_connect_message_2() { // session security structure. PLR ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl; - // existing? + if (connection->policy.server && + connection->policy.lossy) { + // incoming lossy client, no need to register this connection + // new session + ldout(cct, 10) << __func__ << " accept new session" << dendl; + return open(reply, authorizer_reply); + } + AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); connection->inject_delay(); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index af02cc2d7291d..e176c2563fbf6 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -2357,34 +2357,40 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) peer_global_seq = client_ident.global_seq(); - // Looks good so far, let's check if there is already an existing connection - // to this peer. - - connection->lock.unlock(); - AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs); - - if (existing && - existing->protocol->proto_type != 2) { - ldout(cct,1) << __func__ << " existing " << existing << " proto " - << existing->protocol.get() << " version is " - << existing->protocol->proto_type << ", marking down" << dendl; - existing->mark_down(); - existing = nullptr; - } + if (connection->policy.server && + connection->policy.lossy) { + // incoming lossy client, no need to register this connection + } else { + // Looks good so far, let's check if there is already an existing connection + // to this peer. + connection->lock.unlock(); + AsyncConnectionRef existing = messenger->lookup_conn( + *connection->peer_addrs); + + if (existing && + existing->protocol->proto_type != 2) { + ldout(cct,1) << __func__ << " existing " << existing << " proto " + << existing->protocol.get() << " version is " + << existing->protocol->proto_type << ", marking down" + << dendl; + existing->mark_down(); + existing = nullptr; + } - connection->inject_delay(); + connection->inject_delay(); - connection->lock.lock(); - if (state != SESSION_ACCEPTING) { - ldout(cct, 1) << __func__ - << " state changed while accept, it must be mark_down" - << dendl; - ceph_assert(state == CLOSED); - return _fault(); - } + connection->lock.lock(); + if (state != SESSION_ACCEPTING) { + ldout(cct, 1) << __func__ + << " state changed while accept, it must be mark_down" + << dendl; + ceph_assert(state == CLOSED); + return _fault(); + } - if (existing) { - return handle_existing_connection(existing); + if (existing) { + return handle_existing_connection(existing); + } } // if everything is OK reply with server identification diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 0f49b3c4d284e..599fb7d9a08a1 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -109,6 +109,7 @@ class FakeDispatcher : public Dispatcher { bool got_connect; bool loopback; entity_addrvec_t last_accept; + ConnectionRef *last_accept_con_ptr = nullptr; explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), is_server(s), got_new(false), got_remote_reset(false), @@ -139,6 +140,9 @@ class FakeDispatcher : public Dispatcher { } void ms_handle_fast_accept(Connection *con) override { last_accept = con->get_peer_addrs(); + if (last_accept_con_ptr) { + *last_accept_con_ptr = con; + } if (!con->get_priv()) { con->set_priv(RefCountedPtr{new Session(con), false}); } @@ -943,39 +947,6 @@ TEST_P(MessengerTest, SimpleMsgr2Test) { server_msgr->wait(); } -TEST_P(MessengerTest, NameAddrTest) { - FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); - entity_addr_t bind_addr; - bind_addr.parse("v2:127.0.0.1"); - server_msgr->bind(bind_addr); - server_msgr->add_dispatcher_head(&srv_dispatcher); - server_msgr->start(); - - client_msgr->add_dispatcher_head(&cli_dispatcher); - client_msgr->start(); - - MPing *m = new MPing(); - ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), - server_msgr->get_myaddrs()); - { - ASSERT_EQ(conn->send_message(m), 0); - std::unique_lock l{cli_dispatcher.lock}; - cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); - cli_dispatcher.got_new = false; - } - ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs()); - ConnectionRef server_conn = server_msgr->connect_to( - client_msgr->get_mytype(), srv_dispatcher.last_accept); - // Verify that server_conn is the one we already accepted from client, - // so it means the session counter in server_conn is also incremented. - ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); - server_msgr->shutdown(); - client_msgr->shutdown(); - server_msgr->wait(); - client_msgr->wait(); -} - TEST_P(MessengerTest, FeatureTest) { FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); entity_addr_t bind_addr; @@ -1199,6 +1170,8 @@ TEST_P(MessengerTest, StatelessTest) { ASSERT_FALSE(conn->is_connected()); srv_dispatcher.got_new = false; + ConnectionRef server_conn; + srv_dispatcher.last_accept_con_ptr = &server_conn; conn = client_msgr->connect_to(server_msgr->get_mytype(), server_msgr->get_myaddrs()); { @@ -1209,8 +1182,8 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(), - srv_dispatcher.last_accept); + ASSERT_TRUE(server_conn); + // server lose state { std::unique_lock l{srv_dispatcher.lock};