bool peer_is_client() const { return peer_type == CEPH_ENTITY_TYPE_CLIENT; }
const entity_addr_t& get_peer_addr() const { return peer_addr; }
+ entity_addrvec_t get_peer_addrs() const {
+ return entity_addrvec_t(peer_addr);
+ }
void set_peer_addr(const entity_addr_t& a) { peer_addr = a; }
uint64_t get_features() const { return features; }
protected:
/// the "name" of the local daemon. eg client.99
- entity_inst_t my_inst;
+ entity_name_t my_name;
+
+ /// my addr
+ entity_addr_t my_addr;
+
int default_send_priority;
/// set to true once the Messenger has started, and set to false on shutdown
bool started;
*/
Messenger(CephContext *cct_, entity_name_t w)
: trace_endpoint("0.0.0.0", 0, "Messenger"),
- my_inst(),
- default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
+ my_name(w),
+ default_send_priority(CEPH_MSG_PRIO_DEFAULT),
+ started(false),
magic(0),
socket_priority(-1),
cct(cct_),
- crcflags(get_default_crc_flags(cct->_conf))
- {
- my_inst.name = w;
- }
+ crcflags(get_default_crc_flags(cct->_conf)) {}
virtual ~Messenger() {}
/**
* @defgroup Accessors
* @{
*/
+ int get_mytype() const { return my_name.type(); }
+
/**
- * Retrieve the Messenger's instance.
+ * Retrieve the Messenger's name
*
- * @return A const reference to the instance this Messenger
+ * @return A const reference to the name this Messenger
* currently believes to be its own.
*/
- const entity_inst_t& get_myinst() { return my_inst; }
- /**
- * set messenger's instance
- */
- void set_myinst(entity_inst_t i) { my_inst = i; }
-
- uint32_t get_magic() { return magic; }
- void set_magic(int _magic) { magic = _magic; }
+ const entity_name_t& get_myname() { return my_name; }
/**
* Retrieve the Messenger's address.
* @return A const reference to the address this Messenger
* currently believes to be its own.
*/
- const entity_addr_t& get_myaddr() { return my_inst.addr; }
+ const entity_addr_t& get_myaddr() { return my_addr; }
+ entity_addrvec_t get_myaddrs() {
+ return entity_addrvec_t(my_addr);
+ }
+
+ /**
+ * set messenger's instance
+ */
+ uint32_t get_magic() { return magic; }
+ void set_magic(int _magic) { magic = _magic; }
+
protected:
/**
* set messenger's address
*/
virtual void set_myaddr(const entity_addr_t& a) {
- my_inst.addr = a;
- set_endpoint_addr(a, my_inst.name);
+ my_addr = a;
+ set_endpoint_addr(a, my_name);
}
public:
/**
return &trace_endpoint;
}
- /**
- * Retrieve the Messenger's name.
- *
- * @return A const reference to the name this Messenger
- * currently believes to be its own.
- */
- const entity_name_t& get_myname() { return my_inst.name; }
/**
* Set the name of the local entity. The name is reported to others and
* can be changed while the system is running, but doing so at incorrect
*
* @param m The name to set.
*/
- void set_myname(const entity_name_t& m) { my_inst.name = m; }
+ void set_myname(const entity_name_t& m) { my_name = m; }
+
/**
* Set the unknown address components for this Messenger.
* This is useful if the Messenger doesn't know its full address just by
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
- return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
+ return *_dout << "-- " << async_msgr->get_myaddr() << " >> " << peer_addr << " conn(" << this
<< " :" << port
<< " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq
bufferlist bl;
connect_msg.features = policy.features_supported;
- connect_msg.host_type = async_msgr->get_myinst().name.type();
+ connect_msg.host_type = async_msgr->get_myname().type();
connect_msg.global_seq = global_seq;
connect_msg.connect_seq = connect_seq;
connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true);
// adjust the nonce; we want our entity_addr_t to be truly unique.
nonce += 1000000;
ldout(cct, 10) << __func__ << " new nonce " << nonce
- << " and inst " << get_myinst() << dendl;
+ << " and addr " << get_myaddr() << dendl;
entity_addr_t bound_addr;
entity_addr_t bind_addr = get_myaddr();
return 0;
Mutex::Locker l(lock);
if (did_bind) {
- assert(my_inst.addr == bind_addr);
+ assert(my_addr == bind_addr);
return 0;
}
if (started) {
init_local_connection();
- ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
+ ldout(cct,1) << __func__ << " bind my_addr is " << get_myaddr() << dendl;
did_bind = true;
}
ldout(cct,1) << __func__ << " start" << dendl;
// register at least one entity, first!
- assert(my_inst.name.type() >= 0);
+ assert(my_name.type() >= 0);
assert(!started);
started = true;
stopped = false;
if (!did_bind) {
- my_inst.addr.nonce = nonce;
+ my_addr.nonce = nonce;
_init_local_connection();
}
AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
{
assert(lock.is_locked());
- assert(addr != my_inst.addr);
+ assert(addr != my_addr);
ldout(cct, 10) << __func__ << " " << addr
<< ", creating connection and registering" << dendl;
ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
{
Mutex::Locker l(lock);
- if (my_inst.addr == dest.addr) {
+ if (my_addr == dest.addr) {
// local
return local_connection;
}
}
// local?
- if (my_inst.addr == dest_addr) {
+ if (my_addr == dest_addr) {
// local
local_connection->send_message(m);
return ;
}
/**
- * If my_inst.addr doesn't have an IP set, this function
+ * If my_addr doesn't have an IP set, this function
* will fill it in from the passed addr. Otherwise it does nothing and returns.
*/
void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
{
Mutex::Locker l(lock);
- if (my_inst.addr.is_blank_ip()) {
- int port = my_inst.addr.get_port();
- my_inst.addr.u = addr.u;
- my_inst.addr.set_port(port);
+ if (my_addr.is_blank_ip()) {
+ int port = my_addr.get_port();
+ my_addr.u = addr.u;
+ my_addr.set_port(port);
_init_local_connection();
}
}
int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
{
- int my_type = my_inst.name.type();
+ int my_type = my_name.type();
// set reply protocol version
if (peer_type == my_type) {
void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
// be careful here: multiple threads may block here, and readers of
- // my_inst.addr do NOT hold any lock.
+ // my_addr do NOT hold any lock.
// this always goes from true -> false under the protection of the
// mutex. if it is already false, we need not retake the mutex at
if (need_addr) {
need_addr = false;
entity_addr_t t = peer_addr_for_me;
- t.set_port(my_inst.addr.get_port());
- t.set_nonce(my_inst.addr.get_nonce());
- my_inst.addr = t;
- ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
+ t.set_port(my_addr.get_port());
+ t.set_nonce(my_addr.get_nonce());
+ my_addr = t;
+ ldout(cct, 1) << __func__ << " learned my addr " << my_addr << dendl;
_init_local_connection();
}
lock.Unlock();
void _init_local_connection() {
assert(lock.is_locked());
- local_connection->peer_addr = my_inst.addr;
- local_connection->peer_type = my_inst.name.type();
+ local_connection->peer_addr = my_addr;
+ local_connection->peer_type = my_name.type();
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}
// adjust the nonce; we want our entity_addr_t to be truly unique.
nonce += 1000000;
- msgr->my_inst.addr.nonce = nonce;
- ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst "
- << msgr->my_inst << dendl;
+ msgr->my_addr.nonce = nonce;
+ ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and addr "
+ << msgr->my_addr << dendl;
ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
int r = bind(addr, new_avoid);
#undef dout_prefix
#define dout_prefix *_dout << *this
ostream& Pipe::_pipe_prefix(std::ostream &out) const {
- return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
+ return out << "-- " << msgr->get_myaddr() << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd << " :" << port
<< " s=" << state
<< " pgs=" << peer_global_seq
}
// and my addr
- encode(msgr->my_inst.addr, addrs, 0); // legacy
+ encode(msgr->my_addr, addrs, 0); // legacy
- port = msgr->my_inst.addr.get_port();
+ port = msgr->my_addr.get_port();
// and peer's socket addr (they might not know their ip)
sockaddr_storage ss;
}
// connection race?
- if (peer_addr < msgr->my_inst.addr ||
+ if (peer_addr < msgr->my_addr ||
existing->policy.server) {
// incoming wins
ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
// our existing outgoing wins
ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
- assert(peer_addr > msgr->my_inst.addr);
+ assert(peer_addr > msgr->my_addr);
if (!(existing->state == STATE_CONNECTING))
lderr(msgr->cct) << "accept race bad state, would send wait, existing="
<< existing->get_state_name()
msgr->learned_addr(peer_addr_for_me);
- encode(msgr->my_inst.addr, myaddrbl, 0); // legacy
+ encode(msgr->my_addr, myaddrbl, 0); // legacy
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = myaddrbl.c_str();
ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
goto fail;
}
- ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
+ ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_addr << dendl;
while (1) {
ceph_msg_connect connect;
connect.features = policy.features_supported;
- connect.host_type = msgr->get_myinst().name.type();
+ connect.host_type = msgr->get_myname().type();
connect.global_seq = gseq;
connect.connect_seq = cseq;
connect.protocol_version = msgr->get_proto_version(peer_type, true);
*/
void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
{
- if (my_inst.addr.is_blank_ip()) {
- int port = my_inst.addr.get_port();
- my_inst.addr.u = addr.u;
- my_inst.addr.set_port(port);
+ if (my_addr.is_blank_ip()) {
+ int port = my_addr.get_port();
+ my_addr.u = addr.u;
+ my_addr.set_port(port);
init_local_connection();
}
}
int SimpleMessenger::get_proto_version(int peer_type, bool connect)
{
- int my_type = my_inst.name.type();
+ int my_type = my_name.type();
// set reply protocol version
if (peer_type == my_type) {
return 0;
Mutex::Locker l(lock);
if (did_bind) {
- assert(my_inst.addr == bind_addr);
+ assert(my_addr == bind_addr);
return 0;
}
if (started) {
ldout(cct,1) << "messenger.start" << dendl;
// register at least one entity, first!
- assert(my_inst.name.type() >= 0);
+ assert(my_name.type() >= 0);
assert(!started);
started = true;
stopped = false;
if (!did_bind) {
- my_inst.addr.nonce = nonce;
+ my_addr.nonce = nonce;
init_local_connection();
}
Message *first)
{
assert(lock.is_locked());
- assert(addr != my_inst.addr);
+ assert(addr != my_addr);
ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
{
Mutex::Locker l(lock);
- if (my_inst.addr == dest.addr) {
+ if (my_addr == dest.addr) {
// local
return local_connection;
}
}
// local?
- if (my_inst.addr == dest_addr) {
+ if (my_addr == dest_addr) {
// local
ldout(cct,20) << "submit_message " << *m << " local" << dendl;
m->set_connection(local_connection.get());
void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
// be careful here: multiple threads may block here, and readers of
- // my_inst.addr do NOT hold any lock.
+ // my_addr do NOT hold any lock.
// this always goes from true -> false under the protection of the
// mutex. if it is already false, we need not retake the mutex at
lock.Lock();
if (need_addr) {
entity_addr_t t = peer_addr_for_me;
- t.set_port(my_inst.addr.get_port());
- t.set_nonce(my_inst.addr.get_nonce());
- ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr),
+ t.set_port(my_addr.get_port());
+ t.set_nonce(my_addr.get_nonce());
+ ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr),
"SimpleMessenger learned addr");
- my_inst.addr = t;
- ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
+ my_addr = t;
+ ldout(cct,1) << "learned my addr " << my_addr << dendl;
need_addr = false;
init_local_connection();
}
void SimpleMessenger::init_local_connection()
{
- local_connection->peer_addr = my_inst.addr;
- local_connection->peer_type = my_inst.name.type();
+ local_connection->peer_addr = my_addr;
+ local_connection->peer_type = my_name.type();
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}
service.retrieve_epochs(&boot_epoch, &up_epoch, NULL);
if (!up_epoch &&
osdmap->is_up(whoami) &&
- osdmap->get_inst(whoami) == client_messenger->get_myinst()) {
+ osdmap->get_addr(whoami) == client_messenger->get_myaddr()) {
up_epoch = osdmap->get_epoch();
dout(10) << "up_epoch is " << up_epoch << dendl;
if (!boot_epoch) {
Message *payload = new MGenericMessage(CEPH_MSG_SHUTDOWN);
MRoute *m = new MRoute;
m->msg = payload;
- m->dest = msg->get_myinst();
+ m->dest.addr = msg->get_myaddr();
+ m->dest.name = entity_name_t(msg->get_mytype(), -1);
Message *r = send_wait_reply(m, CEPH_MSG_SHUTDOWN);
// we want an error
ASSERT_NE(IS_ERR(r), 0);
// 1. simple round trip
MPing *m = new MPing();
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
Mutex::Locker l(cli_dispatcher.lock);
// 2. test rebind port
set<int> avoid_ports;
- for (int i = 0; i < 10 ; i++)
- avoid_ports.insert(server_msgr->get_myaddr().get_port() + i);
+ for (int i = 0; i < 10 ; i++) {
+ for (auto a : server_msgr->get_myaddrs().v) {
+ avoid_ports.insert(a.get_port() + i);
+ }
+ }
server_msgr->rebind(avoid_ports);
- ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0);
+ for (auto a : server_msgr->get_myaddrs().v) {
+ ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
+ }
- conn = client_msgr->get_connection(server_msgr->get_myinst());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
client_msgr->start();
MPing *m = new MPing();
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
Mutex::Locker l(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
- ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs());
+ ConnectionRef server_conn = server_msgr->connect_to(
+ client_msgr->get_mytype(), client_msgr->get_myaddrs());
// Make should server_conn is the one we already accepted from client,
// so it means client_msgr has the same addr when server connection has
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
client_msgr->start();
MPing *m = new MPing();
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
conn->send_message(m);
CHECK_AND_WAIT_TRUE(!conn->is_connected());
// should failed build a connection
client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
client_msgr->start();
- conn = client_msgr->get_connection(server_msgr->get_myinst());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
// 1. build the connection
MPing *m = new MPing();
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
Mutex::Locker l(cli_dispatcher.lock);
client_msgr->start();
// 1. test for server standby
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
conn->mark_down();
ASSERT_FALSE(conn->is_connected());
- ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ ConnectionRef server_conn = server_msgr->connect_to(
+ client_msgr->get_mytype(), client_msgr->get_myaddrs());
// don't lose state
ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
srv_dispatcher.got_new = false;
- conn = client_msgr->get_connection(server_msgr->get_myinst());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
{
Mutex::Locker l(srv_dispatcher.lock);
while (!srv_dispatcher.got_remote_reset)
}
// resetcheck happen
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
cli_dispatcher.got_remote_reset = false;
client_msgr->start();
// 1. test for server lose state
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
ASSERT_FALSE(conn->is_connected());
srv_dispatcher.got_new = false;
- conn = client_msgr->get_connection(server_msgr->get_myinst());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
// server lose state
{
Mutex::Locker l(srv_dispatcher.lock);
conn->send_keepalive();
CHECK_AND_WAIT_TRUE(!conn->is_connected());
ASSERT_FALSE(conn->is_connected());
- conn = client_msgr->get_connection(server_msgr->get_myinst());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
client_msgr->start();
// 1. test for client standby, resetcheck
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ ConnectionRef server_conn = server_msgr->connect_to(
+ client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
ASSERT_FALSE(cli_dispatcher.got_remote_reset);
cli_dispatcher.got_connect = false;
server_conn->mark_down();
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+ server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+ client_msgr->get_myaddrs());
ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
server_msgr->shutdown();
// 1. simple auth round trip
MPing *m = new MPing();
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
ASSERT_EQ(conn->send_message(m), 0);
Mutex::Locker l(cli_dispatcher.lock);
g_ceph_context->_conf->set_val("auth_client_required", "none");
conn->mark_down();
ASSERT_FALSE(conn->is_connected());
- conn = client_msgr->get_connection(server_msgr->get_myinst());
+ conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
MPing *m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
// 1. A very large "front"(as well as "payload")
// Because a external message need to invade Messenger::decode_message,
// here we only use existing message class(MCommand)
- ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+ ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
{
uuid_d uuid;
uuid.generate_random();
if (server->get_default_policy().server) {
p = make_pair(client, server);
} else {
- ConnectionRef conn = client->get_connection(server->get_myinst());
+ ConnectionRef conn = client->connect_to(server->get_mytype(),
+ server->get_myaddrs());
if (available_connections.count(conn) || choose(rng) % 2)
p = make_pair(client, server);
else
p = make_pair(server, client);
}
}
- ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
+ ConnectionRef conn = p.first->connect_to(p.second->get_mytype(),
+ p.second->get_myaddrs());
available_connections[conn] = p;
}
// it's a lossless policy, so we need to mark down each side
if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
ASSERT_EQ(conn->get_messenger(), p.first);
- ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
+ ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
+ p.first->get_myaddrs());
peer->mark_down();
dispatcher.clear_pending(peer);
available_connections.erase(peer);
bool equal = false;
uint64_t equal_count = 0;
while (i--) {
- ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
- ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
+ ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
+ server_msgr->get_myaddrs());
+ ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
+ server_msgr2->get_myaddrs());
MPing *m = new MPing();
ASSERT_EQ(conn1->send_message(m), 0);
m = new MPing();