From 0efec7445ebfb5ada3eda897dcd004d92773961e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 6 Sep 2019 14:34:05 -0500 Subject: [PATCH] msg/async: allow anonymous client-side connections If the connection mode is lossy, allow us to open a new connection to a target, regardless of whether other such connections are already open. This allows for single-use connections. If you call this multiple times, you'll get separate, distinct connections. We are lucky that the cleanup infrastructure for AsyncMessenger just works without modification. :) Signed-off-by: Sage Weil --- src/msg/Connection.h | 5 +++ src/msg/Messenger.h | 19 ++++---- src/msg/async/AsyncMessenger.cc | 35 +++++++++++---- src/msg/async/AsyncMessenger.h | 9 +++- src/test/msgr/test_msgr.cc | 78 +++++++++++++++++++++++++++++++++ 5 files changed, 127 insertions(+), 19 deletions(-) diff --git a/src/msg/Connection.h b/src/msg/Connection.h index e892a68431e2a..927e045bce779 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -50,6 +50,7 @@ struct Connection : public RefCountedObject { int64_t peer_id = -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567 safe_item_history peer_addrs; utime_t last_keepalive, last_keepalive_ack; + bool anon = false; ///< anonymous outgoing connection private: uint64_t features; public: @@ -118,6 +119,10 @@ public: return false; } + bool is_anon() const { + return anon; + } + Messenger *get_messenger() { return msgr; } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index edd202f3256bf..fdf36cac330fd 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -531,18 +531,19 @@ public: * @param dest The entity to get a connection for. */ virtual ConnectionRef connect_to( - int type, const entity_addrvec_t& dest) = 0; - ConnectionRef connect_to_mon(const entity_addrvec_t& dest) { - return connect_to(CEPH_ENTITY_TYPE_MON, dest); + int type, const entity_addrvec_t& dest, + bool anon=false) = 0; + ConnectionRef connect_to_mon(const entity_addrvec_t& dest, bool anon=false) { + return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon); } - ConnectionRef connect_to_mds(const entity_addrvec_t& dest) { - return connect_to(CEPH_ENTITY_TYPE_MDS, dest); + ConnectionRef connect_to_mds(const entity_addrvec_t& dest, bool anon=false) { + return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon); } - ConnectionRef connect_to_osd(const entity_addrvec_t& dest) { - return connect_to(CEPH_ENTITY_TYPE_OSD, dest); + ConnectionRef connect_to_osd(const entity_addrvec_t& dest, bool anon=false) { + return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon); } - ConnectionRef connect_to_mgr(const entity_addrvec_t& dest) { - return connect_to(CEPH_ENTITY_TYPE_MGR, dest); + ConnectionRef connect_to_mgr(const entity_addrvec_t& dest, bool anon=false) { + return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon); } /** diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index fc1107279e667..520642814860e 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -563,7 +563,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, } AsyncConnectionRef AsyncMessenger::create_connect( - const entity_addrvec_t& addrs, int type) + const entity_addrvec_t& addrs, int type, bool anon) { ceph_assert(ceph_mutex_is_locked(lock)); @@ -587,11 +587,16 @@ AsyncConnectionRef AsyncMessenger::create_connect( Worker *w = stack->get_worker(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, target.is_msgr2(), false); + conn->anon = anon; conn->connect(addrs, type, target); - ceph_assert(!conns.count(addrs)); - ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " " - << *conn->peer_addrs << dendl; - conns[addrs] = conn; + if (anon) { + anon_conns.insert(conn); + } else { + ceph_assert(!conns.count(addrs)); + ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " " + << *conn->peer_addrs << dendl; + conns[addrs] = conn; + } w->get_perf_counter()->inc(l_msgr_active_connections); return conn; @@ -663,7 +668,9 @@ int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs) return 0; } -ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs) +ConnectionRef AsyncMessenger::connect_to(int type, + const entity_addrvec_t& addrs, + bool anon) { std::lock_guard l{lock}; if (*my_addrs == addrs || @@ -675,11 +682,15 @@ ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs auto av = _filter_addrs(addrs); + if (anon) { + return create_connect(av, type, anon); + } + AsyncConnectionRef conn = _lookup_conn(av); if (conn) { ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl; } else { - conn = create_connect(av, type); + conn = create_connect(av, type, false); ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl; } @@ -727,7 +738,7 @@ void AsyncMessenger::submit_message(Message *m, const AsyncConnectionRef& con, } else { ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs << ", new connection." << dendl; - auto&& new_con = create_connect(dest_addrs, dest_type); + auto&& new_con = create_connect(dest_addrs, dest_type, false); new_con->send_message(m); } } @@ -798,6 +809,13 @@ void AsyncMessenger::shutdown_connections(bool queue_reset) } conns.clear(); + for (const auto& c : anon_conns) { + ldout(cct, 5) << __func__ << " mark down " << c << dendl; + c->get_perf_counter()->dec(l_msgr_active_connections); + c->stop(queue_reset); + } + anon_conns.clear(); + { std::lock_guard l{deleted_lock}; if (cct->_conf->subsys.should_gather()) { @@ -930,6 +948,7 @@ int AsyncMessenger::reap_dead() if (conns_it != conns.end() && conns_it->second == c) conns.erase(conns_it); accepting_conns.erase(c); + anon_conns.erase(c); ++num; } deleted_conns.clear(); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 5c1c6ba397ec9..71b5d8ba9ba51 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -147,7 +147,8 @@ public: * @{ */ ConnectionRef connect_to(int type, - const entity_addrvec_t& addrs) override; + const entity_addrvec_t& addrs, + bool anon) override; ConnectionRef get_loopback_connection() override; void mark_down(const entity_addr_t& addr) override { mark_down_addrs(entity_addrvec_t(addr)); @@ -196,7 +197,8 @@ private: * @return a pointer to the newly-created connection. Caller does not own a * reference; take one if you need it. */ - AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type); + AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type, + bool anon); /** * Queue up a Message for delivery to the entity specified @@ -282,6 +284,9 @@ private: */ set accepting_conns; + /// anonymous outgoing connections + set anon_conns; + /** * list of connection are closed which need to be clean up * diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index c107b3695f796..0f49b3c4d284e 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -1241,6 +1241,84 @@ TEST_P(MessengerTest, StatelessTest) { client_msgr->wait(); } +TEST_P(MessengerTest, AnonTest) { + Message *m; + FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); + entity_addr_t bind_addr; + bind_addr.parse("v2:127.0.0.1"); + Messenger::Policy p = Messenger::Policy::stateless_server(0); + server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p); + p = Messenger::Policy::lossy_client(0); + client_msgr->set_policy(entity_name_t::TYPE_OSD, p); + + server_msgr->bind(bind_addr); + server_msgr->add_dispatcher_head(&srv_dispatcher); + server_msgr->start(); + client_msgr->add_dispatcher_head(&cli_dispatcher); + client_msgr->start(); + + ConnectionRef server_con_a, server_con_b; + + // a + srv_dispatcher.last_accept_con_ptr = &server_con_a; + ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs(), + true); + { + m = new MPing(); + ASSERT_EQ(con_a->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast(con_a->get_priv().get())->get_count()); + + // b + srv_dispatcher.last_accept_con_ptr = &server_con_b; + ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs(), + true); + { + m = new MPing(); + ASSERT_EQ(con_b->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + ASSERT_EQ(1U, static_cast(con_b->get_priv().get())->get_count()); + + // these should be distinct + ASSERT_NE(con_a, con_b); + ASSERT_NE(server_con_a, server_con_b); + + // and both connected + { + m = new MPing(); + ASSERT_EQ(con_a->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + { + m = new MPing(); + ASSERT_EQ(con_b->send_message(m), 0); + std::unique_lock l{cli_dispatcher.lock}; + cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; }); + cli_dispatcher.got_new = false; + } + + // clean up + con_a->mark_down(); + ASSERT_FALSE(con_a->is_connected()); + con_b->mark_down(); + ASSERT_FALSE(con_b->is_connected()); + + server_msgr->shutdown(); + client_msgr->shutdown(); + server_msgr->wait(); + client_msgr->wait(); +} + TEST_P(MessengerTest, ClientStandbyTest) { Message *m; FakeDispatcher cli_dispatcher(false), srv_dispatcher(true); -- 2.39.5