int64_t peer_id = -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567
safe_item_history<entity_addrvec_t> peer_addrs;
utime_t last_keepalive, last_keepalive_ack;
+ bool anon = false; ///< anonymous outgoing connection
private:
uint64_t features;
public:
return false;
}
+ bool is_anon() const {
+ return anon;
+ }
+
Messenger *get_messenger() {
return msgr;
}
* @param dest The entity to get a connection for.
*/
virtual ConnectionRef connect_to(
- int type, const entity_addrvec_t& dest) = 0;
- ConnectionRef connect_to_mon(const entity_addrvec_t& dest) {
- return connect_to(CEPH_ENTITY_TYPE_MON, dest);
+ int type, const entity_addrvec_t& dest,
+ bool anon=false) = 0;
+ ConnectionRef connect_to_mon(const entity_addrvec_t& dest, bool anon=false) {
+ return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon);
}
- ConnectionRef connect_to_mds(const entity_addrvec_t& dest) {
- return connect_to(CEPH_ENTITY_TYPE_MDS, dest);
+ ConnectionRef connect_to_mds(const entity_addrvec_t& dest, bool anon=false) {
+ return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon);
}
- ConnectionRef connect_to_osd(const entity_addrvec_t& dest) {
- return connect_to(CEPH_ENTITY_TYPE_OSD, dest);
+ ConnectionRef connect_to_osd(const entity_addrvec_t& dest, bool anon=false) {
+ return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon);
}
- ConnectionRef connect_to_mgr(const entity_addrvec_t& dest) {
- return connect_to(CEPH_ENTITY_TYPE_MGR, dest);
+ ConnectionRef connect_to_mgr(const entity_addrvec_t& dest, bool anon=false) {
+ return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon);
}
/**
}
AsyncConnectionRef AsyncMessenger::create_connect(
- const entity_addrvec_t& addrs, int type)
+ const entity_addrvec_t& addrs, int type, bool anon)
{
ceph_assert(ceph_mutex_is_locked(lock));
Worker *w = stack->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
target.is_msgr2(), false);
+ conn->anon = anon;
conn->connect(addrs, type, target);
- ceph_assert(!conns.count(addrs));
- ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
- << *conn->peer_addrs << dendl;
- conns[addrs] = conn;
+ if (anon) {
+ anon_conns.insert(conn);
+ } else {
+ ceph_assert(!conns.count(addrs));
+ ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
+ << *conn->peer_addrs << dendl;
+ conns[addrs] = conn;
+ }
w->get_perf_counter()->inc(l_msgr_active_connections);
return conn;
return 0;
}
-ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs)
+ConnectionRef AsyncMessenger::connect_to(int type,
+ const entity_addrvec_t& addrs,
+ bool anon)
{
std::lock_guard l{lock};
if (*my_addrs == addrs ||
auto av = _filter_addrs(addrs);
+ if (anon) {
+ return create_connect(av, type, anon);
+ }
+
AsyncConnectionRef conn = _lookup_conn(av);
if (conn) {
ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
} else {
- conn = create_connect(av, type);
+ conn = create_connect(av, type, false);
ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
}
} else {
ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
<< ", new connection." << dendl;
- auto&& new_con = create_connect(dest_addrs, dest_type);
+ auto&& new_con = create_connect(dest_addrs, dest_type, false);
new_con->send_message(m);
}
}
}
conns.clear();
+ for (const auto& c : anon_conns) {
+ ldout(cct, 5) << __func__ << " mark down " << c << dendl;
+ c->get_perf_counter()->dec(l_msgr_active_connections);
+ c->stop(queue_reset);
+ }
+ anon_conns.clear();
+
{
std::lock_guard l{deleted_lock};
if (cct->_conf->subsys.should_gather<ceph_subsys_ms, 5>()) {
if (conns_it != conns.end() && conns_it->second == c)
conns.erase(conns_it);
accepting_conns.erase(c);
+ anon_conns.erase(c);
++num;
}
deleted_conns.clear();
* @{
*/
ConnectionRef connect_to(int type,
- const entity_addrvec_t& addrs) override;
+ const entity_addrvec_t& addrs,
+ bool anon) override;
ConnectionRef get_loopback_connection() override;
void mark_down(const entity_addr_t& addr) override {
mark_down_addrs(entity_addrvec_t(addr));
* @return a pointer to the newly-created connection. Caller does not own a
* reference; take one if you need it.
*/
- AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type);
+ AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type,
+ bool anon);
/**
* Queue up a Message for delivery to the entity specified
*/
set<AsyncConnectionRef> accepting_conns;
+ /// anonymous outgoing connections
+ set<AsyncConnectionRef> anon_conns;
+
/**
* list of connection are closed which need to be clean up
*
client_msgr->wait();
}
+TEST_P(MessengerTest, AnonTest) {
+ Message *m;
+ FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+ entity_addr_t bind_addr;
+ bind_addr.parse("v2:127.0.0.1");
+ Messenger::Policy p = Messenger::Policy::stateless_server(0);
+ server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
+ p = Messenger::Policy::lossy_client(0);
+ client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
+
+ server_msgr->bind(bind_addr);
+ server_msgr->add_dispatcher_head(&srv_dispatcher);
+ server_msgr->start();
+ client_msgr->add_dispatcher_head(&cli_dispatcher);
+ client_msgr->start();
+
+ ConnectionRef server_con_a, server_con_b;
+
+ // a
+ srv_dispatcher.last_accept_con_ptr = &server_con_a;
+ ConnectionRef con_a = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs(),
+ true);
+ {
+ m = new MPing();
+ ASSERT_EQ(con_a->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(con_a->get_priv().get())->get_count());
+
+ // b
+ srv_dispatcher.last_accept_con_ptr = &server_con_b;
+ ConnectionRef con_b = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs(),
+ true);
+ {
+ m = new MPing();
+ ASSERT_EQ(con_b->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ ASSERT_EQ(1U, static_cast<Session*>(con_b->get_priv().get())->get_count());
+
+ // these should be distinct
+ ASSERT_NE(con_a, con_b);
+ ASSERT_NE(server_con_a, server_con_b);
+
+ // and both connected
+ {
+ m = new MPing();
+ ASSERT_EQ(con_a->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+ {
+ m = new MPing();
+ ASSERT_EQ(con_b->send_message(m), 0);
+ std::unique_lock l{cli_dispatcher.lock};
+ cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
+ cli_dispatcher.got_new = false;
+ }
+
+ // clean up
+ con_a->mark_down();
+ ASSERT_FALSE(con_a->is_connected());
+ con_b->mark_down();
+ ASSERT_FALSE(con_b->is_connected());
+
+ server_msgr->shutdown();
+ client_msgr->shutdown();
+ server_msgr->wait();
+ client_msgr->wait();
+}
+
TEST_P(MessengerTest, ClientStandbyTest) {
Message *m;
FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);