// if the public and bind addr are different set the msgr addr
// to the public one, now that the bind is complete.
if (public_addr != bind_addr) {
- msgr->set_addr(public_addr);
+ msgr->set_addrs(entity_addrvec_t(public_addr));
}
Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type,
list <Dispatcher*> fast_dispatchers;
ZTracer::Endpoint trace_endpoint;
+protected:
void set_endpoint_addr(const entity_addr_t& a,
const entity_name_t &name);
entity_name_t my_name;
/// my addr
- entity_addr_t my_addr;
+ entity_addrvec_t my_addrs;
int default_send_priority;
/// set to true once the Messenger has started, and set to false on shutdown
* @return A const reference to the address this Messenger
* currently believes to be its own.
*/
- const entity_addr_t& get_myaddr() { return my_addr; }
- entity_addrvec_t get_myaddrs() {
- return entity_addrvec_t(my_addr);
+ entity_addr_t get_myaddr() {
+ return my_addrs.front();
+ }
+ const entity_addrvec_t& get_myaddrs() {
+ return my_addrs;
}
/**
/**
* set messenger's address
*/
- virtual void set_myaddr(const entity_addr_t& a) {
- my_addr = a;
- set_endpoint_addr(a, my_name);
+ virtual void set_myaddrs(const entity_addrvec_t& a) {
+ my_addrs = a;
+ set_endpoint_addr(a.front(), my_name);
}
public:
/**
*
* @param addr The address to use.
*/
- virtual void set_addr(const entity_addr_t &addr) = 0;
+ virtual void set_addrs(const entity_addrvec_t &addr) = 0;
/// Get the default send priority.
int get_default_send_priority() { return default_send_priority; }
/**
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
- return *_dout << "-- " << m->get_myaddr() << " ";
+ return *_dout << "-- " << m->get_myaddrs() << " ";
}
static ostream& _prefix(std::ostream *_dout, Processor *p) {
void AsyncMessenger::ready()
{
- ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+ ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
stack->ready();
if (pending_bind) {
int AsyncMessenger::shutdown()
{
- ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+ ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
// done! clean up.
for (auto &&p : processors)
return 0;
Mutex::Locker l(lock);
if (did_bind) {
- assert(my_addr == bind_addr);
+ assert(my_addrs.legacy_addr() == bind_addr);
return 0;
}
if (started) {
}
ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
- set_myaddr(bind_addr);
+ set_myaddrs(entity_addrvec_t(bind_addr));
return 0;
}
void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
const entity_addr_t& listen_addr)
{
- set_myaddr(bind_addr);
+ set_myaddrs(entity_addrvec_t(bind_addr));
if (bind_addr != entity_addr_t())
learned_addr(bind_addr);
if (get_myaddr().get_port() == 0) {
- set_myaddr(listen_addr);
+ set_myaddrs(entity_addrvec_t(listen_addr));
+ }
+ for (auto& a : my_addrs.v) {
+ a.set_nonce(nonce);
}
- entity_addr_t addr = get_myaddr();
- addr.set_nonce(nonce);
- set_myaddr(addr);
init_local_connection();
- ldout(cct,1) << __func__ << " bind my_addr is " << get_myaddr() << dendl;
+ ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
did_bind = true;
}
stopped = false;
if (!did_bind) {
- my_addr.nonce = nonce;
+ for (auto& a : my_addrs.v) {
+ a.nonce = nonce;
+ }
_init_local_connection();
}
AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
{
assert(lock.is_locked());
- assert(addr != my_addr);
+ assert(addr != my_addrs.legacy_addr());
ldout(cct, 10) << __func__ << " " << addr
<< ", creating connection and registering" << dendl;
ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
{
Mutex::Locker l(lock);
- if (my_addr == dest.addr) {
+ if (my_addrs.legacy_addr() == dest.addr) {
// local
return local_connection;
}
}
// local?
- if (my_addr == dest_addr) {
+ if (my_addrs.legacy_addr() == dest_addr) {
// local
local_connection->send_message(m);
return ;
void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
{
Mutex::Locker l(lock);
- if (my_addr.is_blank_ip()) {
- int port = my_addr.get_port();
- my_addr.u = addr.u;
- my_addr.set_port(port);
+ if (my_addrs.legacy_addr().is_blank_ip()) {
+ for (auto& a : my_addrs.v) {
+ int port = a.get_port();
+ a.u = addr.u;
+ a.set_port(port);
+ }
_init_local_connection();
}
}
-void AsyncMessenger::set_addr(const entity_addr_t &addr)
+void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
{
Mutex::Locker l(lock);
- entity_addr_t t = addr;
- t.set_nonce(nonce);
- set_myaddr(t);
+ auto t = addrs;
+ for (auto& a : t.v) {
+ a.set_nonce(nonce);
+ }
+ set_myaddrs(t);
_init_local_connection();
}
lock.Lock();
if (need_addr) {
need_addr = false;
- entity_addr_t t = peer_addr_for_me;
- t.set_port(my_addr.get_port());
- t.set_nonce(my_addr.get_nonce());
- my_addr = t;
- ldout(cct, 1) << __func__ << " learned my addr " << my_addr << dendl;
+ if (my_addrs.empty()) {
+ auto a = peer_addr_for_me;
+ a.set_nonce(nonce);
+ set_myaddrs(entity_addrvec_t(a));
+ ldout(cct,10) << __func__ << " had no addrs" << dendl;
+ } else {
+ // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
+ for (auto& a : my_addrs.v) {
+ if (a.get_family() == peer_addr_for_me.get_family()) {
+ entity_addr_t t = peer_addr_for_me;
+ t.set_type(a.get_type());
+ t.set_port(a.get_port());
+ t.set_nonce(a.get_nonce());
+ ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
+ a = t;
+ }
+ }
+ }
+ ldout(cct, 1) << __func__ << " learned my addr " << my_addrs
+ << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
_init_local_connection();
}
lock.Unlock();
* @{
*/
void set_addr_unknowns(const entity_addr_t &addr) override;
- void set_addr(const entity_addr_t &addr) override;
+ void set_addrs(const entity_addrvec_t &addrs) override;
int get_dispatch_queue_len() override {
return dispatch_queue.get_queue_len();
void _init_local_connection() {
assert(lock.is_locked());
- local_connection->peer_addrs = entity_addrvec_t(my_addr);
+ local_connection->peer_addrs = my_addrs;
local_connection->peer_type = my_name.type();
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
return rc;
}
- msgr->set_myaddr(bind_addr);
- if (bind_addr != entity_addr_t())
+ msgr->set_myaddrs(entity_addrvec_t(bind_addr));
+ if (bind_addr != entity_addr_t() &&
+ !bind_addr.is_blank_ip())
msgr->learned_addr(bind_addr);
else
assert(msgr->get_need_addr()); // should still be true.
if (msgr->get_myaddr().get_port() == 0) {
- msgr->set_myaddr(listen_addr);
+ msgr->set_myaddrs(entity_addrvec_t(listen_addr));
}
entity_addr_t addr = msgr->get_myaddr();
addr.nonce = nonce;
- msgr->set_myaddr(addr);
+ msgr->set_myaddrs(entity_addrvec_t(addr));
msgr->init_local_connection();
return rc;
}
- ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr()
+ ldout(msgr->cct,1) << __func__ << " my_addrs " << msgr->my_addrs
+ << " my_addr " << msgr->my_addr
<< " need_addr=" << msgr->get_need_addr() << dendl;
return 0;
}
*/
void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
{
+ assert(my_addr == my_addrs.front());
if (my_addr.is_blank_ip()) {
- int port = my_addr.get_port();
- my_addr.u = addr.u;
- my_addr.set_port(port);
+ ldout(cct,1) << __func__ << " " << addr << dendl;
+ entity_addr_t t = my_addr;
+ int port = t.get_port();
+ t.u = addr.u;
+ t.set_port(port);
+ set_addrs(entity_addrvec_t(t));
init_local_connection();
+ } else {
+ ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl;
}
+ assert(my_addr == my_addrs.front());
}
-void SimpleMessenger::set_addr(const entity_addr_t &addr)
+void SimpleMessenger::set_myaddrs(const entity_addrvec_t &av)
{
- entity_addr_t t = addr;
- t.set_nonce(nonce);
- set_myaddr(t);
+ my_addr = av.front();
+ my_addr.set_nonce(nonce);
+ // do this in a slightly paranoid way because we update this value in a
+ // thread-unsafe way. SimpleMessenger sucks.
+ if (my_addrs.empty()) {
+ Messenger::set_myaddrs(av);
+ } else {
+ assert(my_addrs.v.size() == av.v.size());
+ my_addrs.v[0] = av.front();
+ set_endpoint_addr(av.front(), my_name);
+ }
+}
+
+void SimpleMessenger::set_addrs(const entity_addrvec_t &av)
+{
+ auto t = av;
+ for (auto& a : t.v) {
+ a.set_nonce(nonce);
+ }
+ set_myaddrs(t);
init_local_connection();
}
return 0;
Mutex::Locker l(lock);
if (did_bind) {
- assert(my_addr == bind_addr);
+ assert(my_addrs == entity_addrvec_t(bind_addr));
return 0;
}
if (started) {
}
ldout(cct,10) << "rank.bind " << bind_addr << dendl;
- set_myaddr(bind_addr);
+ set_myaddrs(entity_addrvec_t(bind_addr));
return 0;
}
t.set_nonce(my_addr.get_nonce());
ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr),
"SimpleMessenger learned addr");
- my_addr = t;
+ set_myaddrs(entity_addrvec_t(t));
ldout(cct,1) << "learned my addr " << my_addr << dendl;
need_addr = false;
init_local_connection();
void SimpleMessenger::init_local_connection()
{
- local_connection->peer_addrs = entity_addrvec_t(my_addr);
+ local_connection->peer_addrs = my_addrs;
local_connection->peer_type = my_name.type();
local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
* @{
*/
void set_addr_unknowns(const entity_addr_t& addr) override;
- void set_addr(const entity_addr_t &addr) override;
+ void set_addrs(const entity_addrvec_t &addr) override;
+ void set_myaddrs(const entity_addrvec_t& a) override;
int get_dispatch_queue_len() override {
return dispatch_queue.get_queue_len();
/// lock to protect the global_seq
ceph::spinlock global_seq_lock;
+ entity_addr_t my_addr;
+
/**
* hash map of addresses to Pipes
*