From 9d213377a835f45b4351bfcfe59e79d3a4bc10bd Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 26 May 2018 08:42:07 -0500 Subject: [PATCH] msg/Messenger: *myinst -> *myname and *myaddr Initial step moving Messenger interface away from entity_inst_t. This is mostly s/my_inst.addr/my_addr/ and s/my_inst.name/my_name/. Also add Connection::get_peer_addrs() and Messenger::get_myaddrs(). Signed-off-by: Sage Weil --- src/msg/Connection.h | 3 + src/msg/Messenger.h | 57 ++++++++++--------- src/msg/async/AsyncConnection.cc | 4 +- src/msg/async/AsyncMessenger.cc | 38 ++++++------- src/msg/async/AsyncMessenger.h | 4 +- src/msg/simple/Accepter.cc | 6 +- src/msg/simple/Pipe.cc | 16 +++--- src/msg/simple/SimpleMessenger.cc | 38 ++++++------- src/osd/OSD.cc | 2 +- src/test/mon/test-mon-msg.cc | 3 +- src/test/msgr/test_msgr.cc | 95 +++++++++++++++++++++---------- 11 files changed, 152 insertions(+), 114 deletions(-) diff --git a/src/msg/Connection.h b/src/msg/Connection.h index a19092b93eb..13d99d73595 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -158,6 +158,9 @@ public: bool peer_is_client() const { return peer_type == CEPH_ENTITY_TYPE_CLIENT; } const entity_addr_t& get_peer_addr() const { return peer_addr; } + entity_addrvec_t get_peer_addrs() const { + return entity_addrvec_t(peer_addr); + } void set_peer_addr(const entity_addr_t& a) { peer_addr = a; } uint64_t get_features() const { return features; } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index ebb84566ffb..2c8b94c7d7f 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -48,7 +48,11 @@ private: protected: /// the "name" of the local daemon. eg client.99 - entity_inst_t my_inst; + entity_name_t my_name; + + /// my addr + entity_addr_t my_addr; + int default_send_priority; /// set to true once the Messenger has started, and set to false on shutdown bool started; @@ -141,15 +145,13 @@ public: */ Messenger(CephContext *cct_, entity_name_t w) : trace_endpoint("0.0.0.0", 0, "Messenger"), - my_inst(), - default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false), + my_name(w), + default_send_priority(CEPH_MSG_PRIO_DEFAULT), + started(false), magic(0), socket_priority(-1), cct(cct_), - crcflags(get_default_crc_flags(cct->_conf)) - { - my_inst.name = w; - } + crcflags(get_default_crc_flags(cct->_conf)) {} virtual ~Messenger() {} /** @@ -192,20 +194,15 @@ public: * @defgroup Accessors * @{ */ + int get_mytype() const { return my_name.type(); } + /** - * Retrieve the Messenger's instance. + * Retrieve the Messenger's name * - * @return A const reference to the instance this Messenger + * @return A const reference to the name this Messenger * currently believes to be its own. */ - const entity_inst_t& get_myinst() { return my_inst; } - /** - * set messenger's instance - */ - void set_myinst(entity_inst_t i) { my_inst = i; } - - uint32_t get_magic() { return magic; } - void set_magic(int _magic) { magic = _magic; } + const entity_name_t& get_myname() { return my_name; } /** * Retrieve the Messenger's address. @@ -213,14 +210,24 @@ public: * @return A const reference to the address this Messenger * currently believes to be its own. */ - const entity_addr_t& get_myaddr() { return my_inst.addr; } + const entity_addr_t& get_myaddr() { return my_addr; } + entity_addrvec_t get_myaddrs() { + return entity_addrvec_t(my_addr); + } + + /** + * set messenger's instance + */ + uint32_t get_magic() { return magic; } + void set_magic(int _magic) { magic = _magic; } + protected: /** * set messenger's address */ virtual void set_myaddr(const entity_addr_t& a) { - my_inst.addr = a; - set_endpoint_addr(a, my_inst.name); + my_addr = a; + set_endpoint_addr(a, my_name); } public: /** @@ -230,13 +237,6 @@ public: return &trace_endpoint; } - /** - * Retrieve the Messenger's name. - * - * @return A const reference to the name this Messenger - * currently believes to be its own. - */ - const entity_name_t& get_myname() { return my_inst.name; } /** * Set the name of the local entity. The name is reported to others and * can be changed while the system is running, but doing so at incorrect @@ -244,7 +244,8 @@ public: * * @param m The name to set. */ - void set_myname(const entity_name_t& m) { my_inst.name = m; } + void set_myname(const entity_name_t& m) { my_name = m; } + /** * Set the unknown address components for this Messenger. * This is useful if the Messenger doesn't know its full address just by diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b4569d0215e..27b7094f0b6 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -33,7 +33,7 @@ #undef dout_prefix #define dout_prefix _conn_prefix(_dout) ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { - return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this + return *_dout << "-- " << async_msgr->get_myaddr() << " >> " << peer_addr << " conn(" << this << " :" << port << " s=" << get_state_name(state) << " pgs=" << peer_global_seq @@ -1017,7 +1017,7 @@ ssize_t AsyncConnection::_process_connection() bufferlist bl; connect_msg.features = policy.features_supported; - connect_msg.host_type = async_msgr->get_myinst().name.type(); + connect_msg.host_type = async_msgr->get_myname().type(); connect_msg.global_seq = global_seq; connect_msg.connect_seq = connect_seq; connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true); diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 032d7314c86..dfa0bee4bed 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -386,7 +386,7 @@ int AsyncMessenger::rebind(const set& avoid_ports) // adjust the nonce; we want our entity_addr_t to be truly unique. nonce += 1000000; ldout(cct, 10) << __func__ << " new nonce " << nonce - << " and inst " << get_myinst() << dendl; + << " and addr " << get_myaddr() << dendl; entity_addr_t bound_addr; entity_addr_t bind_addr = get_myaddr(); @@ -417,7 +417,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) return 0; Mutex::Locker l(lock); if (did_bind) { - assert(my_inst.addr == bind_addr); + assert(my_addr == bind_addr); return 0; } if (started) { @@ -446,7 +446,7 @@ void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr, init_local_connection(); - ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl; + ldout(cct,1) << __func__ << " bind my_addr is " << get_myaddr() << dendl; did_bind = true; } @@ -456,14 +456,14 @@ int AsyncMessenger::start() ldout(cct,1) << __func__ << " start" << dendl; // register at least one entity, first! - assert(my_inst.name.type() >= 0); + assert(my_name.type() >= 0); assert(!started); started = true; stopped = false; if (!did_bind) { - my_inst.addr.nonce = nonce; + my_addr.nonce = nonce; _init_local_connection(); } @@ -512,7 +512,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type) { assert(lock.is_locked()); - assert(addr != my_inst.addr); + assert(addr != my_addr); ldout(cct, 10) << __func__ << " " << addr << ", creating connection and registering" << dendl; @@ -531,7 +531,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest) { Mutex::Locker l(lock); - if (my_inst.addr == dest.addr) { + if (my_addr == dest.addr) { // local return local_connection; } @@ -600,7 +600,7 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, } // local? - if (my_inst.addr == dest_addr) { + if (my_addr == dest_addr) { // local local_connection->send_message(m); return ; @@ -621,16 +621,16 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, } /** - * If my_inst.addr doesn't have an IP set, this function + * If my_addr doesn't have an IP set, this function * will fill it in from the passed addr. Otherwise it does nothing and returns. */ void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr) { Mutex::Locker l(lock); - if (my_inst.addr.is_blank_ip()) { - int port = my_inst.addr.get_port(); - my_inst.addr.u = addr.u; - my_inst.addr.set_port(port); + if (my_addr.is_blank_ip()) { + int port = my_addr.get_port(); + my_addr.u = addr.u; + my_addr.set_port(port); _init_local_connection(); } } @@ -692,7 +692,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) int AsyncMessenger::get_proto_version(int peer_type, bool connect) const { - int my_type = my_inst.name.type(); + int my_type = my_name.type(); // set reply protocol version if (peer_type == my_type) { @@ -712,7 +712,7 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { // be careful here: multiple threads may block here, and readers of - // my_inst.addr do NOT hold any lock. + // my_addr do NOT hold any lock. // this always goes from true -> false under the protection of the // mutex. if it is already false, we need not retake the mutex at @@ -723,10 +723,10 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) if (need_addr) { need_addr = false; entity_addr_t t = peer_addr_for_me; - t.set_port(my_inst.addr.get_port()); - t.set_nonce(my_inst.addr.get_nonce()); - my_inst.addr = t; - ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl; + t.set_port(my_addr.get_port()); + t.set_nonce(my_addr.get_nonce()); + my_addr = t; + ldout(cct, 1) << __func__ << " learned my addr " << my_addr << dendl; _init_local_connection(); } lock.Unlock(); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index a07ed4c69c3..c517f3fdb43 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -319,8 +319,8 @@ private: void _init_local_connection() { assert(lock.is_locked()); - local_connection->peer_addr = my_inst.addr; - local_connection->peer_type = my_inst.name.type(); + local_connection->peer_addr = my_addr; + local_connection->peer_type = my_name.type(); local_connection->set_features(CEPH_FEATURES_ALL); ms_deliver_handle_fast_connect(local_connection.get()); } diff --git a/src/msg/simple/Accepter.cc b/src/msg/simple/Accepter.cc index a95e046c7ba..9bb3e355ee2 100644 --- a/src/msg/simple/Accepter.cc +++ b/src/msg/simple/Accepter.cc @@ -270,9 +270,9 @@ int Accepter::rebind(const set& avoid_ports) // adjust the nonce; we want our entity_addr_t to be truly unique. nonce += 1000000; - msgr->my_inst.addr.nonce = nonce; - ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst " - << msgr->my_inst << dendl; + msgr->my_addr.nonce = nonce; + ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and addr " + << msgr->my_addr << dendl; ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl; int r = bind(addr, new_avoid); diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc index f452258369c..9f44dd8b7c0 100644 --- a/src/msg/simple/Pipe.cc +++ b/src/msg/simple/Pipe.cc @@ -44,7 +44,7 @@ #undef dout_prefix #define dout_prefix *_dout << *this ostream& Pipe::_pipe_prefix(std::ostream &out) const { - return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this + return out << "-- " << msgr->get_myaddr() << " >> " << peer_addr << " pipe(" << this << " sd=" << sd << " :" << port << " s=" << state << " pgs=" << peer_global_seq @@ -363,9 +363,9 @@ int Pipe::accept() } // and my addr - encode(msgr->my_inst.addr, addrs, 0); // legacy + encode(msgr->my_addr, addrs, 0); // legacy - port = msgr->my_inst.addr.get_port(); + port = msgr->my_addr.get_port(); // and peer's socket addr (they might not know their ip) sockaddr_storage ss; @@ -611,7 +611,7 @@ int Pipe::accept() } // connection race? - if (peer_addr < msgr->my_inst.addr || + if (peer_addr < msgr->my_addr || existing->policy.server) { // incoming wins ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq @@ -630,7 +630,7 @@ int Pipe::accept() // our existing outgoing wins ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > msgr->my_inst.addr); + assert(peer_addr > msgr->my_addr); if (!(existing->state == STATE_CONNECTING)) lderr(msgr->cct) << "accept race bad state, would send wait, existing=" << existing->get_state_name() @@ -1100,7 +1100,7 @@ int Pipe::connect() msgr->learned_addr(peer_addr_for_me); - encode(msgr->my_inst.addr, myaddrbl, 0); // legacy + encode(msgr->my_addr, myaddrbl, 0); // legacy memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = myaddrbl.c_str(); @@ -1113,7 +1113,7 @@ int Pipe::connect() ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl; goto fail; } - ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl; + ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_addr << dendl; while (1) { @@ -1123,7 +1123,7 @@ int Pipe::connect() ceph_msg_connect connect; connect.features = policy.features_supported; - connect.host_type = msgr->get_myinst().name.type(); + connect.host_type = msgr->get_myname().type(); connect.global_seq = gseq; connect.connect_seq = cseq; connect.protocol_version = msgr->get_proto_version(peer_type, true); diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc index 4918c52a7af..36a0ee371a1 100644 --- a/src/msg/simple/SimpleMessenger.cc +++ b/src/msg/simple/SimpleMessenger.cc @@ -149,10 +149,10 @@ int SimpleMessenger::_send_message(Message *m, Connection *con) */ void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr) { - if (my_inst.addr.is_blank_ip()) { - int port = my_inst.addr.get_port(); - my_inst.addr.u = addr.u; - my_inst.addr.set_port(port); + if (my_addr.is_blank_ip()) { + int port = my_addr.get_port(); + my_addr.u = addr.u; + my_addr.set_port(port); init_local_connection(); } } @@ -167,7 +167,7 @@ void SimpleMessenger::set_addr(const entity_addr_t &addr) int SimpleMessenger::get_proto_version(int peer_type, bool connect) { - int my_type = my_inst.name.type(); + int my_type = my_name.type(); // set reply protocol version if (peer_type == my_type) { @@ -319,7 +319,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr) return 0; Mutex::Locker l(lock); if (did_bind) { - assert(my_inst.addr == bind_addr); + assert(my_addr == bind_addr); return 0; } if (started) { @@ -339,14 +339,14 @@ int SimpleMessenger::start() ldout(cct,1) << "messenger.start" << dendl; // register at least one entity, first! - assert(my_inst.name.type() >= 0); + assert(my_name.type() >= 0); assert(!started); started = true; stopped = false; if (!did_bind) { - my_inst.addr.nonce = nonce; + my_addr.nonce = nonce; init_local_connection(); } @@ -380,7 +380,7 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, Message *first) { assert(lock.is_locked()); - assert(addr != my_inst.addr); + assert(addr != my_addr); ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; @@ -421,7 +421,7 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type, ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest) { Mutex::Locker l(lock); - if (my_inst.addr == dest.addr) { + if (my_addr == dest.addr) { // local return local_connection; } @@ -502,7 +502,7 @@ void SimpleMessenger::submit_message(Message *m, PipeConnection *con, } // local? - if (my_inst.addr == dest_addr) { + if (my_addr == dest_addr) { // local ldout(cct,20) << "submit_message " << *m << " local" << dendl; m->set_connection(local_connection.get()); @@ -723,7 +723,7 @@ void SimpleMessenger::mark_disposable(Connection *con) void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { // be careful here: multiple threads may block here, and readers of - // my_inst.addr do NOT hold any lock. + // my_addr do NOT hold any lock. // this always goes from true -> false under the protection of the // mutex. if it is already false, we need not retake the mutex at @@ -734,12 +734,12 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) lock.Lock(); if (need_addr) { entity_addr_t t = peer_addr_for_me; - t.set_port(my_inst.addr.get_port()); - t.set_nonce(my_inst.addr.get_nonce()); - ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr), + t.set_port(my_addr.get_port()); + t.set_nonce(my_addr.get_nonce()); + ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr), "SimpleMessenger learned addr"); - my_inst.addr = t; - ldout(cct,1) << "learned my addr " << my_inst.addr << dendl; + my_addr = t; + ldout(cct,1) << "learned my addr " << my_addr << dendl; need_addr = false; init_local_connection(); } @@ -748,8 +748,8 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) void SimpleMessenger::init_local_connection() { - local_connection->peer_addr = my_inst.addr; - local_connection->peer_type = my_inst.name.type(); + local_connection->peer_addr = my_addr; + local_connection->peer_type = my_name.type(); local_connection->set_features(CEPH_FEATURES_ALL); ms_deliver_handle_fast_connect(local_connection.get()); } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 8a90d9d0370..f60e0ef7e05 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -7436,7 +7436,7 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m) service.retrieve_epochs(&boot_epoch, &up_epoch, NULL); if (!up_epoch && osdmap->is_up(whoami) && - osdmap->get_inst(whoami) == client_messenger->get_myinst()) { + osdmap->get_addr(whoami) == client_messenger->get_myaddr()) { up_epoch = osdmap->get_epoch(); dout(10) << "up_epoch is " << up_epoch << dendl; if (!boot_epoch) { diff --git a/src/test/mon/test-mon-msg.cc b/src/test/mon/test-mon-msg.cc index 7e9832057d0..bb2c4638a65 100644 --- a/src/test/mon/test-mon-msg.cc +++ b/src/test/mon/test-mon-msg.cc @@ -301,7 +301,8 @@ TEST_F(MonMsgTest, MRouteTest) Message *payload = new MGenericMessage(CEPH_MSG_SHUTDOWN); MRoute *m = new MRoute; m->msg = payload; - m->dest = msg->get_myinst(); + m->dest.addr = msg->get_myaddr(); + m->dest.name = entity_name_t(msg->get_mytype(), -1); Message *r = send_wait_reply(m, CEPH_MSG_SHUTDOWN); // we want an error ASSERT_NE(IS_ERR(r), 0); diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 772c5a0200c..9a36af697ea 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -226,7 +226,8 @@ TEST_P(MessengerTest, SimpleTest) { // 1. simple round trip MPing *m = new MPing(); - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); Mutex::Locker l(cli_dispatcher.lock); @@ -240,12 +241,18 @@ TEST_P(MessengerTest, SimpleTest) { // 2. test rebind port set avoid_ports; - for (int i = 0; i < 10 ; i++) - avoid_ports.insert(server_msgr->get_myaddr().get_port() + i); + for (int i = 0; i < 10 ; i++) { + for (auto a : server_msgr->get_myaddrs().v) { + avoid_ports.insert(a.get_port() + i); + } + } server_msgr->rebind(avoid_ports); - ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0); + for (auto a : server_msgr->get_myaddrs().v) { + ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0); + } - conn = client_msgr->get_connection(server_msgr->get_myinst()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -300,7 +307,8 @@ TEST_P(MessengerTest, NameAddrTest) { client_msgr->start(); MPing *m = new MPing(); - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); Mutex::Locker l(cli_dispatcher.lock); @@ -309,8 +317,9 @@ TEST_P(MessengerTest, NameAddrTest) { cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr()); - ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs()); + ConnectionRef server_conn = server_msgr->connect_to( + client_msgr->get_mytype(), client_msgr->get_myaddrs()); // Make should server_conn is the one we already accepted from client, // so it means client_msgr has the same addr when server connection has ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); @@ -345,7 +354,8 @@ TEST_P(MessengerTest, FeatureTest) { client_msgr->start(); MPing *m = new MPing(); - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); conn->send_message(m); CHECK_AND_WAIT_TRUE(!conn->is_connected()); // should failed build a connection @@ -360,7 +370,8 @@ TEST_P(MessengerTest, FeatureTest) { client_msgr->set_policy(entity_name_t::TYPE_OSD, p); client_msgr->start(); - conn = client_msgr->get_connection(server_msgr->get_myinst()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -391,7 +402,8 @@ TEST_P(MessengerTest, TimeoutTest) { // 1. build the connection MPing *m = new MPing(); - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); Mutex::Locker l(cli_dispatcher.lock); @@ -432,7 +444,8 @@ TEST_P(MessengerTest, StatefulTest) { client_msgr->start(); // 1. test for server standby - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -444,12 +457,14 @@ TEST_P(MessengerTest, StatefulTest) { ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); - ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + ConnectionRef server_conn = server_msgr->connect_to( + client_msgr->get_mytype(), client_msgr->get_myaddrs()); // don't lose state ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); srv_dispatcher.got_new = false; - conn = client_msgr->get_connection(server_msgr->get_myinst()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -459,7 +474,8 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + server_conn = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); { Mutex::Locker l(srv_dispatcher.lock); while (!srv_dispatcher.got_remote_reset) @@ -500,7 +516,8 @@ TEST_P(MessengerTest, StatefulTest) { } // resetcheck happen ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + server_conn = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); cli_dispatcher.got_remote_reset = false; @@ -527,7 +544,8 @@ TEST_P(MessengerTest, StatelessTest) { client_msgr->start(); // 1. test for server lose state - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -541,7 +559,8 @@ TEST_P(MessengerTest, StatelessTest) { ASSERT_FALSE(conn->is_connected()); srv_dispatcher.got_new = false; - conn = client_msgr->get_connection(server_msgr->get_myinst()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -551,7 +570,8 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); // server lose state { Mutex::Locker l(srv_dispatcher.lock); @@ -566,7 +586,8 @@ TEST_P(MessengerTest, StatelessTest) { conn->send_keepalive(); CHECK_AND_WAIT_TRUE(!conn->is_connected()); ASSERT_FALSE(conn->is_connected()); - conn = client_msgr->get_connection(server_msgr->get_myinst()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -600,7 +621,8 @@ TEST_P(MessengerTest, ClientStandbyTest) { client_msgr->start(); // 1. test for client standby, resetcheck - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -610,7 +632,9 @@ TEST_P(MessengerTest, ClientStandbyTest) { cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + ConnectionRef server_conn = server_msgr->connect_to( + client_msgr->get_mytype(), + client_msgr->get_myaddrs()); ASSERT_FALSE(cli_dispatcher.got_remote_reset); cli_dispatcher.got_connect = false; server_conn->mark_down(); @@ -641,7 +665,8 @@ TEST_P(MessengerTest, ClientStandbyTest) { cli_dispatcher.got_new = false; } ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); - server_conn = server_msgr->get_connection(client_msgr->get_myinst()); + server_conn = server_msgr->connect_to(client_msgr->get_mytype(), + client_msgr->get_myaddrs()); ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); server_msgr->shutdown(); @@ -666,7 +691,8 @@ TEST_P(MessengerTest, AuthTest) { // 1. simple auth round trip MPing *m = new MPing(); - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { ASSERT_EQ(conn->send_message(m), 0); Mutex::Locker l(cli_dispatcher.lock); @@ -683,7 +709,8 @@ TEST_P(MessengerTest, AuthTest) { g_ceph_context->_conf->set_val("auth_client_required", "none"); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); - conn = client_msgr->get_connection(server_msgr->get_myinst()); + conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { MPing *m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -720,7 +747,8 @@ TEST_P(MessengerTest, MessageTest) { // 1. A very large "front"(as well as "payload") // Because a external message need to invade Messenger::decode_message, // here we only use existing message class(MCommand) - ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst()); + ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); { uuid_d uuid; uuid.generate_random(); @@ -1057,14 +1085,16 @@ class SyntheticWorkload { if (server->get_default_policy().server) { p = make_pair(client, server); } else { - ConnectionRef conn = client->get_connection(server->get_myinst()); + ConnectionRef conn = client->connect_to(server->get_mytype(), + server->get_myaddrs()); if (available_connections.count(conn) || choose(rng) % 2) p = make_pair(client, server); else p = make_pair(server, client); } } - ConnectionRef conn = p.first->get_connection(p.second->get_myinst()); + ConnectionRef conn = p.first->connect_to(p.second->get_mytype(), + p.second->get_myaddrs()); available_connections[conn] = p; } @@ -1099,7 +1129,8 @@ class SyntheticWorkload { // it's a lossless policy, so we need to mark down each side if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) { ASSERT_EQ(conn->get_messenger(), p.first); - ConnectionRef peer = p.second->get_connection(p.first->get_myinst()); + ConnectionRef peer = p.second->connect_to(p.first->get_mytype(), + p.first->get_myaddrs()); peer->mark_down(); dispatcher.clear_pending(peer); available_connections.erase(peer); @@ -1462,8 +1493,10 @@ TEST_P(MessengerTest, MarkdownTest) { bool equal = false; uint64_t equal_count = 0; while (i--) { - ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst()); - ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst()); + ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(), + server_msgr->get_myaddrs()); + ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(), + server_msgr2->get_myaddrs()); MPing *m = new MPing(); ASSERT_EQ(conn1->send_message(m), 0); m = new MPing(); -- 2.39.5