From 5d95b29c9bb862b73d59d35b1656bdd52dedf2d1 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 4 Jun 2018 08:34:22 -0500 Subject: [PATCH] msg/Messenger: my_addr -> my_addrs Minimal changes to SimpleMessenger and AsyncMessenger to keep things working, assuming we ony have a single addr in the addrvec. Signed-off-by: Sage Weil --- src/ceph_mon.cc | 2 +- src/msg/Messenger.h | 19 ++++---- src/msg/async/AsyncMessenger.cc | 77 ++++++++++++++++++++----------- src/msg/async/AsyncMessenger.h | 4 +- src/msg/simple/Accepter.cc | 12 +++-- src/msg/simple/SimpleMessenger.cc | 46 +++++++++++++----- src/msg/simple/SimpleMessenger.h | 5 +- 7 files changed, 109 insertions(+), 56 deletions(-) diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index bc3744c55b228..a195452a60f53 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -742,7 +742,7 @@ int main(int argc, const char **argv) // if the public and bind addr are different set the msgr addr // to the public one, now that the bind is complete. if (public_addr != bind_addr) { - msgr->set_addr(public_addr); + msgr->set_addrs(entity_addrvec_t(public_addr)); } Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type, diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 1f43876580d48..a83d86746514b 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -44,6 +44,7 @@ private: list fast_dispatchers; ZTracer::Endpoint trace_endpoint; +protected: void set_endpoint_addr(const entity_addr_t& a, const entity_name_t &name); @@ -52,7 +53,7 @@ protected: entity_name_t my_name; /// my addr - entity_addr_t my_addr; + entity_addrvec_t my_addrs; int default_send_priority; /// set to true once the Messenger has started, and set to false on shutdown @@ -149,9 +150,11 @@ public: * @return A const reference to the address this Messenger * currently believes to be its own. */ - const entity_addr_t& get_myaddr() { return my_addr; } - entity_addrvec_t get_myaddrs() { - return entity_addrvec_t(my_addr); + entity_addr_t get_myaddr() { + return my_addrs.front(); + } + const entity_addrvec_t& get_myaddrs() { + return my_addrs; } /** @@ -164,9 +167,9 @@ protected: /** * set messenger's address */ - virtual void set_myaddr(const entity_addr_t& a) { - my_addr = a; - set_endpoint_addr(a, my_name); + virtual void set_myaddrs(const entity_addrvec_t& a) { + my_addrs = a; + set_endpoint_addr(a.front(), my_name); } public: /** @@ -202,7 +205,7 @@ public: * * @param addr The address to use. */ - virtual void set_addr(const entity_addr_t &addr) = 0; + virtual void set_addrs(const entity_addrvec_t &addr) = 0; /// Get the default send priority. int get_default_send_priority() { return default_send_priority; } /** diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 2c2d83ef35810..f34e66357ab33 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -33,7 +33,7 @@ #undef dout_prefix #define dout_prefix _prefix(_dout, this) static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { - return *_dout << "-- " << m->get_myaddr() << " "; + return *_dout << "-- " << m->get_myaddrs() << " "; } static ostream& _prefix(std::ostream *_dout, Processor *p) { @@ -290,7 +290,7 @@ AsyncMessenger::~AsyncMessenger() void AsyncMessenger::ready() { - ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; + ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl; stack->ready(); if (pending_bind) { @@ -309,7 +309,7 @@ void AsyncMessenger::ready() int AsyncMessenger::shutdown() { - ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; + ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl; // done! clean up. for (auto &&p : processors) @@ -417,7 +417,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) return 0; Mutex::Locker l(lock); if (did_bind) { - assert(my_addr == bind_addr); + assert(my_addrs.legacy_addr() == bind_addr); return 0; } if (started) { @@ -426,27 +426,27 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) } ldout(cct, 10) << __func__ << " " << bind_addr << dendl; - set_myaddr(bind_addr); + set_myaddrs(entity_addrvec_t(bind_addr)); return 0; } void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr, const entity_addr_t& listen_addr) { - set_myaddr(bind_addr); + set_myaddrs(entity_addrvec_t(bind_addr)); if (bind_addr != entity_addr_t()) learned_addr(bind_addr); if (get_myaddr().get_port() == 0) { - set_myaddr(listen_addr); + set_myaddrs(entity_addrvec_t(listen_addr)); + } + for (auto& a : my_addrs.v) { + a.set_nonce(nonce); } - entity_addr_t addr = get_myaddr(); - addr.set_nonce(nonce); - set_myaddr(addr); init_local_connection(); - ldout(cct,1) << __func__ << " bind my_addr is " << get_myaddr() << dendl; + ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl; did_bind = true; } @@ -463,7 +463,9 @@ int AsyncMessenger::start() stopped = false; if (!did_bind) { - my_addr.nonce = nonce; + for (auto& a : my_addrs.v) { + a.nonce = nonce; + } _init_local_connection(); } @@ -512,7 +514,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type) { assert(lock.is_locked()); - assert(addr != my_addr); + assert(addr != my_addrs.legacy_addr()); ldout(cct, 10) << __func__ << " " << addr << ", creating connection and registering" << dendl; @@ -531,7 +533,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest) { Mutex::Locker l(lock); - if (my_addr == dest.addr) { + if (my_addrs.legacy_addr() == dest.addr) { // local return local_connection; } @@ -600,7 +602,7 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, } // local? - if (my_addr == dest_addr) { + if (my_addrs.legacy_addr() == dest_addr) { // local local_connection->send_message(m); return ; @@ -627,20 +629,24 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr) { Mutex::Locker l(lock); - if (my_addr.is_blank_ip()) { - int port = my_addr.get_port(); - my_addr.u = addr.u; - my_addr.set_port(port); + if (my_addrs.legacy_addr().is_blank_ip()) { + for (auto& a : my_addrs.v) { + int port = a.get_port(); + a.u = addr.u; + a.set_port(port); + } _init_local_connection(); } } -void AsyncMessenger::set_addr(const entity_addr_t &addr) +void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs) { Mutex::Locker l(lock); - entity_addr_t t = addr; - t.set_nonce(nonce); - set_myaddr(t); + auto t = addrs; + for (auto& a : t.v) { + a.set_nonce(nonce); + } + set_myaddrs(t); _init_local_connection(); } @@ -722,11 +728,26 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) lock.Lock(); if (need_addr) { need_addr = false; - entity_addr_t t = peer_addr_for_me; - t.set_port(my_addr.get_port()); - t.set_nonce(my_addr.get_nonce()); - my_addr = t; - ldout(cct, 1) << __func__ << " learned my addr " << my_addr << dendl; + if (my_addrs.empty()) { + auto a = peer_addr_for_me; + a.set_nonce(nonce); + set_myaddrs(entity_addrvec_t(a)); + ldout(cct,10) << __func__ << " had no addrs" << dendl; + } else { + // fix all addrs of the same family, regardless of type (msgr2 vs legacy) + for (auto& a : my_addrs.v) { + if (a.get_family() == peer_addr_for_me.get_family()) { + entity_addr_t t = peer_addr_for_me; + t.set_type(a.get_type()); + t.set_port(a.get_port()); + t.set_nonce(a.get_nonce()); + ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl; + a = t; + } + } + } + ldout(cct, 1) << __func__ << " learned my addr " << my_addrs + << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl; _init_local_connection(); } lock.Unlock(); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 5ddc2bde9e190..698d5e85f0380 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -95,7 +95,7 @@ public: * @{ */ void set_addr_unknowns(const entity_addr_t &addr) override; - void set_addr(const entity_addr_t &addr) override; + void set_addrs(const entity_addrvec_t &addrs) override; int get_dispatch_queue_len() override { return dispatch_queue.get_queue_len(); @@ -319,7 +319,7 @@ private: void _init_local_connection() { assert(lock.is_locked()); - local_connection->peer_addrs = entity_addrvec_t(my_addr); + local_connection->peer_addrs = my_addrs; local_connection->peer_type = my_name.type(); local_connection->set_features(CEPH_FEATURES_ALL); ms_deliver_handle_fast_connect(local_connection.get()); diff --git a/src/msg/simple/Accepter.cc b/src/msg/simple/Accepter.cc index 9bb3e355ee253..0a1c96acf8466 100644 --- a/src/msg/simple/Accepter.cc +++ b/src/msg/simple/Accepter.cc @@ -232,18 +232,19 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set& avoid_ports) return rc; } - msgr->set_myaddr(bind_addr); - if (bind_addr != entity_addr_t()) + msgr->set_myaddrs(entity_addrvec_t(bind_addr)); + if (bind_addr != entity_addr_t() && + !bind_addr.is_blank_ip()) msgr->learned_addr(bind_addr); else assert(msgr->get_need_addr()); // should still be true. if (msgr->get_myaddr().get_port() == 0) { - msgr->set_myaddr(listen_addr); + msgr->set_myaddrs(entity_addrvec_t(listen_addr)); } entity_addr_t addr = msgr->get_myaddr(); addr.nonce = nonce; - msgr->set_myaddr(addr); + msgr->set_myaddrs(entity_addrvec_t(addr)); msgr->init_local_connection(); @@ -254,7 +255,8 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set& avoid_ports) return rc; } - ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr() + ldout(msgr->cct,1) << __func__ << " my_addrs " << msgr->my_addrs + << " my_addr " << msgr->my_addr << " need_addr=" << msgr->get_need_addr() << dendl; return 0; } diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc index 8667302443d78..efd8362033003 100644 --- a/src/msg/simple/SimpleMessenger.cc +++ b/src/msg/simple/SimpleMessenger.cc @@ -149,19 +149,43 @@ int SimpleMessenger::_send_message(Message *m, Connection *con) */ void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr) { + assert(my_addr == my_addrs.front()); if (my_addr.is_blank_ip()) { - int port = my_addr.get_port(); - my_addr.u = addr.u; - my_addr.set_port(port); + ldout(cct,1) << __func__ << " " << addr << dendl; + entity_addr_t t = my_addr; + int port = t.get_port(); + t.u = addr.u; + t.set_port(port); + set_addrs(entity_addrvec_t(t)); init_local_connection(); + } else { + ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl; } + assert(my_addr == my_addrs.front()); } -void SimpleMessenger::set_addr(const entity_addr_t &addr) +void SimpleMessenger::set_myaddrs(const entity_addrvec_t &av) { - entity_addr_t t = addr; - t.set_nonce(nonce); - set_myaddr(t); + my_addr = av.front(); + my_addr.set_nonce(nonce); + // do this in a slightly paranoid way because we update this value in a + // thread-unsafe way. SimpleMessenger sucks. + if (my_addrs.empty()) { + Messenger::set_myaddrs(av); + } else { + assert(my_addrs.v.size() == av.v.size()); + my_addrs.v[0] = av.front(); + set_endpoint_addr(av.front(), my_name); + } +} + +void SimpleMessenger::set_addrs(const entity_addrvec_t &av) +{ + auto t = av; + for (auto& a : t.v) { + a.set_nonce(nonce); + } + set_myaddrs(t); init_local_connection(); } @@ -319,7 +343,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr) return 0; Mutex::Locker l(lock); if (did_bind) { - assert(my_addr == bind_addr); + assert(my_addrs == entity_addrvec_t(bind_addr)); return 0; } if (started) { @@ -328,7 +352,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr) } ldout(cct,10) << "rank.bind " << bind_addr << dendl; - set_myaddr(bind_addr); + set_myaddrs(entity_addrvec_t(bind_addr)); return 0; } @@ -738,7 +762,7 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) t.set_nonce(my_addr.get_nonce()); ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr), "SimpleMessenger learned addr"); - my_addr = t; + set_myaddrs(entity_addrvec_t(t)); ldout(cct,1) << "learned my addr " << my_addr << dendl; need_addr = false; init_local_connection(); @@ -748,7 +772,7 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) void SimpleMessenger::init_local_connection() { - local_connection->peer_addrs = entity_addrvec_t(my_addr); + local_connection->peer_addrs = my_addrs; local_connection->peer_type = my_name.type(); local_connection->set_features(CEPH_FEATURES_ALL); ms_deliver_handle_fast_connect(local_connection.get()); diff --git a/src/msg/simple/SimpleMessenger.h b/src/msg/simple/SimpleMessenger.h index 93597a70a5b88..56640cd41e76f 100644 --- a/src/msg/simple/SimpleMessenger.h +++ b/src/msg/simple/SimpleMessenger.h @@ -94,7 +94,8 @@ public: * @{ */ void set_addr_unknowns(const entity_addr_t& addr) override; - void set_addr(const entity_addr_t &addr) override; + void set_addrs(const entity_addrvec_t &addr) override; + void set_myaddrs(const entity_addrvec_t& a) override; int get_dispatch_queue_len() override { return dispatch_queue.get_queue_len(); @@ -286,6 +287,8 @@ private: /// lock to protect the global_seq ceph::spinlock global_seq_lock; + entity_addr_t my_addr; + /** * hash map of addresses to Pipes * -- 2.39.5