]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: do not register lossy client connections
authorSage Weil <sage@redhat.com>
Fri, 6 Sep 2019 20:04:11 +0000 (15:04 -0500)
committerSage Weil <sage@redhat.com>
Wed, 25 Sep 2019 15:39:15 +0000 (10:39 -0500)
If our policy is server + lossy, we do not need to track our incoming
client connections by address.  First, because it doesn't do us any good.
Second, it is nicer if we don't, because we can allow multiple incoming
connections from the same peer addr.

Update a couple of tests: one doesn't apply any more, and the other needs
a different way of getting the just-accepted con ref.

Signed-off-by: Sage Weil <sage@redhat.com>
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV2.cc
src/test/msgr/test_msgr.cc

index 3ce26e6d52a3925f5a3d8e62b5cbec89fb911160..271e9eb9c7386416ebed539ed04f02224ba563d2 100644 (file)
@@ -165,7 +165,9 @@ class AsyncConnection : public Connection {
   int state;
   ConnectedSocket cs;
   int port;
+public:
   Messenger::Policy policy;
+private:
 
   DispatchQueue *dispatch_queue;
 
index 520642814860ee35b41fcfc914b444d77884b5df..362068caaeaf83d44abd0e88d7898dd0b6084869 100644 (file)
@@ -861,6 +861,12 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
 int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
 {
   std::lock_guard l{lock};
+  if (conn->policy.server &&
+      conn->policy.lossy) {
+    anon_conns.insert(conn);
+    conn->get_perf_counter()->inc(l_msgr_active_connections);
+    return 0;
+  }
   auto it = conns.find(*conn->peer_addrs);
   if (it != conns.end()) {
     auto& existing = it->second;
index 9f6160d7ef0844bda6a637a147b7a50e969825be..707ff18b92da42a0be5a17157d86a22aec54c431 100644 (file)
@@ -2050,7 +2050,14 @@ CtPtr ProtocolV1::handle_connect_message_2() {
   // session security structure.  PLR
   ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
 
-  // existing?
+  if (connection->policy.server &&
+      connection->policy.lossy) {
+    // incoming lossy client, no need to register this connection
+    // new session
+    ldout(cct, 10) << __func__ << " accept new session" << dendl;
+    return open(reply, authorizer_reply);
+  }
+
   AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
 
   connection->inject_delay();
index af02cc2d7291d2bfb24db7d299c86fe1c13089bc..e176c2563fbf6ba5a87abd11fb9a6a5364c5933e 100644 (file)
@@ -2357,34 +2357,40 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
 
   peer_global_seq = client_ident.global_seq();
 
-  // Looks good so far, let's check if there is already an existing connection
-  // to this peer.
-
-  connection->lock.unlock();
-  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
-
-  if (existing &&
-      existing->protocol->proto_type != 2) {
-    ldout(cct,1) << __func__ << " existing " << existing << " proto "
-                << existing->protocol.get() << " version is "
-                << existing->protocol->proto_type << ", marking down" << dendl;
-    existing->mark_down();
-    existing = nullptr;
-  }
+  if (connection->policy.server &&
+      connection->policy.lossy) {
+    // incoming lossy client, no need to register this connection
+  } else {
+    // Looks good so far, let's check if there is already an existing connection
+    // to this peer.
+    connection->lock.unlock();
+    AsyncConnectionRef existing = messenger->lookup_conn(
+      *connection->peer_addrs);
+
+    if (existing &&
+       existing->protocol->proto_type != 2) {
+      ldout(cct,1) << __func__ << " existing " << existing << " proto "
+                  << existing->protocol.get() << " version is "
+                  << existing->protocol->proto_type << ", marking down"
+                  << dendl;
+      existing->mark_down();
+      existing = nullptr;
+    }
 
-  connection->inject_delay();
+    connection->inject_delay();
 
-  connection->lock.lock();
-  if (state != SESSION_ACCEPTING) {
-    ldout(cct, 1) << __func__
-                  << " state changed while accept, it must be mark_down"
-                  << dendl;
-    ceph_assert(state == CLOSED);
-    return _fault();
-  }
+    connection->lock.lock();
+    if (state != SESSION_ACCEPTING) {
+      ldout(cct, 1) << __func__
+                   << " state changed while accept, it must be mark_down"
+                   << dendl;
+      ceph_assert(state == CLOSED);
+      return _fault();
+    }
 
-  if (existing) {
-    return handle_existing_connection(existing);
+    if (existing) {
+      return handle_existing_connection(existing);
+    }
   }
 
   // if everything is OK reply with server identification
index 0f49b3c4d284ede6c16b62e0fc627ba2216f6367..599fb7d9a08a1581fc681c6a389e309a5f45038a 100644 (file)
@@ -109,6 +109,7 @@ class FakeDispatcher : public Dispatcher {
   bool got_connect;
   bool loopback;
   entity_addrvec_t last_accept;
+  ConnectionRef *last_accept_con_ptr = nullptr;
 
   explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
                           is_server(s), got_new(false), got_remote_reset(false),
@@ -139,6 +140,9 @@ class FakeDispatcher : public Dispatcher {
   }
   void ms_handle_fast_accept(Connection *con) override {
     last_accept = con->get_peer_addrs();
+    if (last_accept_con_ptr) {
+      *last_accept_con_ptr = con;
+    }
     if (!con->get_priv()) {
       con->set_priv(RefCountedPtr{new Session(con), false});
     }
@@ -943,39 +947,6 @@ TEST_P(MessengerTest, SimpleMsgr2Test) {
   server_msgr->wait();
 }
 
-TEST_P(MessengerTest, NameAddrTest) {
-  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
-  entity_addr_t bind_addr;
-  bind_addr.parse("v2:127.0.0.1");
-  server_msgr->bind(bind_addr);
-  server_msgr->add_dispatcher_head(&srv_dispatcher);
-  server_msgr->start();
-
-  client_msgr->add_dispatcher_head(&cli_dispatcher);
-  client_msgr->start();
-
-  MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
-                                              server_msgr->get_myaddrs());
-  {
-    ASSERT_EQ(conn->send_message(m), 0);
-    std::unique_lock l{cli_dispatcher.lock};
-    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
-    cli_dispatcher.got_new = false;
-  }
-  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs());
-  ConnectionRef server_conn = server_msgr->connect_to(
-    client_msgr->get_mytype(), srv_dispatcher.last_accept);
-  // Verify that server_conn is the one we already accepted from client,
-  // so it means the session counter in server_conn is also incremented.
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
-  server_msgr->shutdown();
-  client_msgr->shutdown();
-  server_msgr->wait();
-  client_msgr->wait();
-}
-
 TEST_P(MessengerTest, FeatureTest) {
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
@@ -1199,6 +1170,8 @@ TEST_P(MessengerTest, StatelessTest) {
   ASSERT_FALSE(conn->is_connected());
 
   srv_dispatcher.got_new = false;
+  ConnectionRef server_conn;
+  srv_dispatcher.last_accept_con_ptr = &server_conn;
   conn = client_msgr->connect_to(server_msgr->get_mytype(),
                                      server_msgr->get_myaddrs());
   {
@@ -1209,8 +1182,8 @@ TEST_P(MessengerTest, StatelessTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
-  ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
-                                                     srv_dispatcher.last_accept);
+  ASSERT_TRUE(server_conn);
+
   // server lose state
   {
     std::unique_lock l{srv_dispatcher.lock};