]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: allow anonymous client-side connections
authorSage Weil <sage@redhat.com>
Fri, 6 Sep 2019 19:34:05 +0000 (14:34 -0500)
committerSage Weil <sage@redhat.com>
Wed, 25 Sep 2019 15:39:15 +0000 (10:39 -0500)
If the connection mode is lossy, allow us to open a new connection to
a target, regardless of whether other such connections are already open.
This allows for single-use connections.  If you call this multiple times,
you'll get separate, distinct connections.

We are lucky that the cleanup infrastructure for AsyncMessenger just works
without modification.  :)

Signed-off-by: Sage Weil <sage@redhat.com>
src/msg/Connection.h
src/msg/Messenger.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/test/msgr/test_msgr.cc

index e892a68431e2a9f5b77b1d3d8ea64d4aef5ac172..927e045bce7798cda18dc25cf3364422e2f8ef41 100644 (file)
@@ -50,6 +50,7 @@ struct Connection : public RefCountedObject {
   int64_t peer_id = -1;  // [msgr2 only] the 0 of osd.0, 4567 or client.4567
   safe_item_history<entity_addrvec_t> peer_addrs;
   utime_t last_keepalive, last_keepalive_ack;
+  bool anon = false;  ///< anonymous outgoing connection
 private:
   uint64_t features;
 public:
@@ -118,6 +119,10 @@ public:
     return false;
   }
 
+  bool is_anon() const {
+    return anon;
+  }
+
   Messenger *get_messenger() {
     return msgr;
   }
index edd202f3256bf16aa53254ecb0713887d20b396d..fdf36cac330fdd1bb42d1a6b5588164fadfc4527 100644 (file)
@@ -531,18 +531,19 @@ public:
    * @param dest The entity to get a connection for.
    */
   virtual ConnectionRef connect_to(
-    int type, const entity_addrvec_t& dest) = 0;
-  ConnectionRef connect_to_mon(const entity_addrvec_t& dest) {
-    return connect_to(CEPH_ENTITY_TYPE_MON, dest);
+    int type, const entity_addrvec_t& dest,
+    bool anon=false) = 0;
+  ConnectionRef connect_to_mon(const entity_addrvec_t& dest, bool anon=false) {
+    return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon);
   }
-  ConnectionRef connect_to_mds(const entity_addrvec_t& dest) {
-    return connect_to(CEPH_ENTITY_TYPE_MDS, dest);
+  ConnectionRef connect_to_mds(const entity_addrvec_t& dest, bool anon=false) {
+    return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon);
   }
-  ConnectionRef connect_to_osd(const entity_addrvec_t& dest) {
-    return connect_to(CEPH_ENTITY_TYPE_OSD, dest);
+  ConnectionRef connect_to_osd(const entity_addrvec_t& dest, bool anon=false) {
+    return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon);
   }
-  ConnectionRef connect_to_mgr(const entity_addrvec_t& dest) {
-    return connect_to(CEPH_ENTITY_TYPE_MGR, dest);
+  ConnectionRef connect_to_mgr(const entity_addrvec_t& dest, bool anon=false) {
+    return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon);
   }
 
   /**
index fc1107279e6678331dd4de5aef458f4411439efd..520642814860ee35b41fcfc914b444d77884b5df 100644 (file)
@@ -563,7 +563,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
 }
 
 AsyncConnectionRef AsyncMessenger::create_connect(
-  const entity_addrvec_t& addrs, int type)
+  const entity_addrvec_t& addrs, int type, bool anon)
 {
   ceph_assert(ceph_mutex_is_locked(lock));
 
@@ -587,11 +587,16 @@ AsyncConnectionRef AsyncMessenger::create_connect(
   Worker *w = stack->get_worker();
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
                                                target.is_msgr2(), false);
+  conn->anon = anon;
   conn->connect(addrs, type, target);
-  ceph_assert(!conns.count(addrs));
-  ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
-                << *conn->peer_addrs << dendl;
-  conns[addrs] = conn;
+  if (anon) {
+    anon_conns.insert(conn);
+  } else {
+    ceph_assert(!conns.count(addrs));
+    ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
+                  << *conn->peer_addrs << dendl;
+    conns[addrs] = conn;
+  }
   w->get_perf_counter()->inc(l_msgr_active_connections);
 
   return conn;
@@ -663,7 +668,9 @@ int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
   return 0;
 }
 
-ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs)
+ConnectionRef AsyncMessenger::connect_to(int type,
+                                        const entity_addrvec_t& addrs,
+                                        bool anon)
 {
   std::lock_guard l{lock};
   if (*my_addrs == addrs ||
@@ -675,11 +682,15 @@ ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs
 
   auto av = _filter_addrs(addrs);
 
+  if (anon) {
+    return create_connect(av, type, anon);
+  }
+
   AsyncConnectionRef conn = _lookup_conn(av);
   if (conn) {
     ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
   } else {
-    conn = create_connect(av, type);
+    conn = create_connect(av, type, false);
     ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
   }
 
@@ -727,7 +738,7 @@ void AsyncMessenger::submit_message(Message *m, const AsyncConnectionRef& con,
   } else {
     ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
                  << ", new connection." << dendl;
-    auto&& new_con = create_connect(dest_addrs, dest_type);
+    auto&& new_con = create_connect(dest_addrs, dest_type, false);
     new_con->send_message(m);
   }
 }
@@ -798,6 +809,13 @@ void AsyncMessenger::shutdown_connections(bool queue_reset)
   }
   conns.clear();
 
+  for (const auto& c : anon_conns) {
+    ldout(cct, 5) << __func__ << " mark down " << c << dendl;
+    c->get_perf_counter()->dec(l_msgr_active_connections);
+    c->stop(queue_reset);
+  }
+  anon_conns.clear();
+
   {
     std::lock_guard l{deleted_lock};
     if (cct->_conf->subsys.should_gather<ceph_subsys_ms, 5>()) {
@@ -930,6 +948,7 @@ int AsyncMessenger::reap_dead()
       if (conns_it != conns.end() && conns_it->second == c)
         conns.erase(conns_it);
       accepting_conns.erase(c);
+      anon_conns.erase(c);
       ++num;
     }
     deleted_conns.clear();
index 5c1c6ba397ec97ddb2c694f00a187e5c68f7f404..71b5d8ba9ba51d555d401b8538fed2a6c35da155 100644 (file)
@@ -147,7 +147,8 @@ public:
    * @{
    */
   ConnectionRef connect_to(int type,
-                          const entity_addrvec_t& addrs) override;
+                          const entity_addrvec_t& addrs,
+                          bool anon) override;
   ConnectionRef get_loopback_connection() override;
   void mark_down(const entity_addr_t& addr) override {
     mark_down_addrs(entity_addrvec_t(addr));
@@ -196,7 +197,8 @@ private:
    * @return a pointer to the newly-created connection. Caller does not own a
    * reference; take one if you need it.
    */
-  AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type);
+  AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type,
+                                   bool anon);
 
   /**
    * Queue up a Message for delivery to the entity specified
@@ -282,6 +284,9 @@ private:
    */
   set<AsyncConnectionRef> accepting_conns;
 
+  /// anonymous outgoing connections
+  set<AsyncConnectionRef> anon_conns;
+
   /**
    * list of connection are closed which need to be clean up
    *
index c107b3695f7966809397598e239c0d4c2bc1a808..0f49b3c4d284ede6c16b62e0fc627ba2216f6367 100644 (file)
@@ -1241,6 +1241,84 @@ TEST_P(MessengerTest, StatelessTest) {
   client_msgr->wait();
 }
 
+TEST_P(MessengerTest, AnonTest) {
+  Message *m;
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1");
+  Messenger::Policy p = Messenger::Policy::stateless_server(0);
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+  p = Messenger::Policy::lossy_client(0);
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  ConnectionRef server_con_a, server_con_b;
+
+  // a
+  srv_dispatcher.last_accept_con_ptr = &server_con_a;
+  ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(),
+                                           server_msgr->get_myaddrs(),
+                                           true);
+  {
+    m = new MPing();
+    ASSERT_EQ(con_a->send_message(m), 0);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+    cli_dispatcher.got_new = false;
+  }
+  ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count());
+
+  // b
+  srv_dispatcher.last_accept_con_ptr = &server_con_b;
+  ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(),
+                                           server_msgr->get_myaddrs(),
+                                           true);
+  {
+    m = new MPing();
+    ASSERT_EQ(con_b->send_message(m), 0);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+    cli_dispatcher.got_new = false;
+  }
+  ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count());
+
+  // these should be distinct
+  ASSERT_NE(con_a, con_b);
+  ASSERT_NE(server_con_a, server_con_b);
+
+  // and both connected
+  {
+    m = new MPing();
+    ASSERT_EQ(con_a->send_message(m), 0);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+    cli_dispatcher.got_new = false;
+  }
+  {
+    m = new MPing();
+    ASSERT_EQ(con_b->send_message(m), 0);
+    std::unique_lock l{cli_dispatcher.lock};
+    cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+    cli_dispatcher.got_new = false;
+  }
+
+  // clean up
+  con_a->mark_down();
+  ASSERT_FALSE(con_a->is_connected());
+  con_b->mark_down();
+  ASSERT_FALSE(con_b->is_connected());
+
+  server_msgr->shutdown();
+  client_msgr->shutdown();
+  server_msgr->wait();
+  client_msgr->wait();
+}
+
 TEST_P(MessengerTest, ClientStandbyTest) {
   Message *m;
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);