]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Messenger: my_addr -> my_addrs
authorSage Weil <sage@redhat.com>
Mon, 4 Jun 2018 13:34:22 +0000 (08:34 -0500)
committerSage Weil <sage@redhat.com>
Tue, 3 Jul 2018 18:01:23 +0000 (13:01 -0500)
Minimal changes to SimpleMessenger and AsyncMessenger to keep things
working, assuming we ony have a single addr in the addrvec.

Signed-off-by: Sage Weil <sage@redhat.com>
src/ceph_mon.cc
src/msg/Messenger.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/simple/Accepter.cc
src/msg/simple/SimpleMessenger.cc
src/msg/simple/SimpleMessenger.h

index bc3744c55b22850a00b1043510b23a3e9f3e0ab0..a195452a60f539671ded066847895d62e318c89d 100644 (file)
@@ -742,7 +742,7 @@ int main(int argc, const char **argv)
   // if the public and bind addr are different set the msgr addr
   // to the public one, now that the bind is complete.
   if (public_addr != bind_addr) {
-    msgr->set_addr(public_addr);
+    msgr->set_addrs(entity_addrvec_t(public_addr));
   }
 
   Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type,
index 1f43876580d4893789e794f7bf59e02c9110eec9..a83d86746514b3b9e464555a14503162bb1f213d 100644 (file)
@@ -44,6 +44,7 @@ private:
   list <Dispatcher*> fast_dispatchers;
   ZTracer::Endpoint trace_endpoint;
 
+protected:
   void set_endpoint_addr(const entity_addr_t& a,
                          const entity_name_t &name);
 
@@ -52,7 +53,7 @@ protected:
   entity_name_t my_name;
 
   /// my addr
-  entity_addr_t my_addr;
+  entity_addrvec_t my_addrs;
 
   int default_send_priority;
   /// set to true once the Messenger has started, and set to false on shutdown
@@ -149,9 +150,11 @@ public:
    * @return A const reference to the address this Messenger
    * currently believes to be its own.
    */
-  const entity_addr_t& get_myaddr() { return my_addr; }
-  entity_addrvec_t get_myaddrs() {
-    return entity_addrvec_t(my_addr);
+  entity_addr_t get_myaddr() {
+    return my_addrs.front();
+  }
+  const entity_addrvec_t& get_myaddrs() {
+    return my_addrs;
   }
 
   /**
@@ -164,9 +167,9 @@ protected:
   /**
    * set messenger's address
    */
-  virtual void set_myaddr(const entity_addr_t& a) {
-    my_addr = a;
-    set_endpoint_addr(a, my_name);
+  virtual void set_myaddrs(const entity_addrvec_t& a) {
+    my_addrs = a;
+    set_endpoint_addr(a.front(), my_name);
   }
 public:
   /**
@@ -202,7 +205,7 @@ public:
    *
    * @param addr The address to use.
    */
-  virtual void set_addr(const entity_addr_t &addr) = 0;
+  virtual void set_addrs(const entity_addrvec_t &addr) = 0;
   /// Get the default send priority.
   int get_default_send_priority() { return default_send_priority; }
   /**
index 2c2d83ef35810d49b39a7e843f58dec52421e9fb..f34e66357ab334af93b39950685d55a3eb3b22cf 100644 (file)
@@ -33,7 +33,7 @@
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
 static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
-  return *_dout << "-- " << m->get_myaddr() << " ";
+  return *_dout << "-- " << m->get_myaddrs() << " ";
 }
 
 static ostream& _prefix(std::ostream *_dout, Processor *p) {
@@ -290,7 +290,7 @@ AsyncMessenger::~AsyncMessenger()
 
 void AsyncMessenger::ready()
 {
-  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
 
   stack->ready();
   if (pending_bind) {
@@ -309,7 +309,7 @@ void AsyncMessenger::ready()
 
 int AsyncMessenger::shutdown()
 {
-  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
 
   // done!  clean up.
   for (auto &&p : processors)
@@ -417,7 +417,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
     return 0;
   Mutex::Locker l(lock);
   if (did_bind) {
-    assert(my_addr == bind_addr);
+    assert(my_addrs.legacy_addr() == bind_addr);
     return 0;
   }
   if (started) {
@@ -426,27 +426,27 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
   }
   ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
 
-  set_myaddr(bind_addr);
+  set_myaddrs(entity_addrvec_t(bind_addr));
   return 0;
 }
 
 void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
                                  const entity_addr_t& listen_addr)
 {
-  set_myaddr(bind_addr);
+  set_myaddrs(entity_addrvec_t(bind_addr));
   if (bind_addr != entity_addr_t())
     learned_addr(bind_addr);
 
   if (get_myaddr().get_port() == 0) {
-    set_myaddr(listen_addr);
+    set_myaddrs(entity_addrvec_t(listen_addr));
+  }
+  for (auto& a : my_addrs.v) {
+    a.set_nonce(nonce);
   }
-  entity_addr_t addr = get_myaddr();
-  addr.set_nonce(nonce);
-  set_myaddr(addr);
 
   init_local_connection();
 
-  ldout(cct,1) << __func__ << " bind my_addr is " << get_myaddr() << dendl;
+  ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
   did_bind = true;
 }
 
@@ -463,7 +463,9 @@ int AsyncMessenger::start()
   stopped = false;
 
   if (!did_bind) {
-    my_addr.nonce = nonce;
+    for (auto& a : my_addrs.v) {
+      a.nonce = nonce;
+    }
     _init_local_connection();
   }
 
@@ -512,7 +514,7 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad
 AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
 {
   assert(lock.is_locked());
-  assert(addr != my_addr);
+  assert(addr != my_addrs.legacy_addr());
 
   ldout(cct, 10) << __func__ << " " << addr
       << ", creating connection and registering" << dendl;
@@ -531,7 +533,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
 ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
 {
   Mutex::Locker l(lock);
-  if (my_addr == dest.addr) {
+  if (my_addrs.legacy_addr() == dest.addr) {
     // local
     return local_connection;
   }
@@ -600,7 +602,7 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
   }
 
   // local?
-  if (my_addr == dest_addr) {
+  if (my_addrs.legacy_addr() == dest_addr) {
     // local
     local_connection->send_message(m);
     return ;
@@ -627,20 +629,24 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
 void AsyncMessenger::set_addr_unknowns(const entity_addr_t &addr)
 {
   Mutex::Locker l(lock);
-  if (my_addr.is_blank_ip()) {
-    int port = my_addr.get_port();
-    my_addr.u = addr.u;
-    my_addr.set_port(port);
+  if (my_addrs.legacy_addr().is_blank_ip()) {
+    for (auto& a : my_addrs.v) {
+      int port = a.get_port();
+      a.u = addr.u;
+      a.set_port(port);
+    }
     _init_local_connection();
   }
 }
 
-void AsyncMessenger::set_addr(const entity_addr_t &addr)
+void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
 {
   Mutex::Locker l(lock);
-  entity_addr_t t = addr;
-  t.set_nonce(nonce);
-  set_myaddr(t);
+  auto t = addrs;
+  for (auto& a : t.v) {
+    a.set_nonce(nonce);
+  }
+  set_myaddrs(t);
   _init_local_connection();
 }
 
@@ -722,11 +728,26 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
   lock.Lock();
   if (need_addr) {
     need_addr = false;
-    entity_addr_t t = peer_addr_for_me;
-    t.set_port(my_addr.get_port());
-    t.set_nonce(my_addr.get_nonce());
-    my_addr = t;
-    ldout(cct, 1) << __func__ << " learned my addr " << my_addr << dendl;
+    if (my_addrs.empty()) {
+      auto a = peer_addr_for_me;
+      a.set_nonce(nonce);
+      set_myaddrs(entity_addrvec_t(a));
+      ldout(cct,10) << __func__ << " had no addrs" << dendl;
+    } else {
+      // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
+      for (auto& a : my_addrs.v) {
+       if (a.get_family() == peer_addr_for_me.get_family()) {
+         entity_addr_t t = peer_addr_for_me;
+         t.set_type(a.get_type());
+         t.set_port(a.get_port());
+         t.set_nonce(a.get_nonce());
+         ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
+         a = t;
+       }
+      }
+    }
+    ldout(cct, 1) << __func__ << " learned my addr " << my_addrs
+                 << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
     _init_local_connection();
   }
   lock.Unlock();
index 5ddc2bde9e190c2f87ba3f289629d1916955ef20..698d5e85f0380a40b8ae401a0d5a8f3c5a36f46f 100644 (file)
@@ -95,7 +95,7 @@ public:
    * @{
    */
   void set_addr_unknowns(const entity_addr_t &addr) override;
-  void set_addr(const entity_addr_t &addr) override;
+  void set_addrs(const entity_addrvec_t &addrs) override;
 
   int get_dispatch_queue_len() override {
     return dispatch_queue.get_queue_len();
@@ -319,7 +319,7 @@ private:
 
   void _init_local_connection() {
     assert(lock.is_locked());
-    local_connection->peer_addrs = entity_addrvec_t(my_addr);
+    local_connection->peer_addrs = my_addrs;
     local_connection->peer_type = my_name.type();
     local_connection->set_features(CEPH_FEATURES_ALL);
     ms_deliver_handle_fast_connect(local_connection.get());
index 9bb3e355ee25300424412ae20f462990cafcd75a..0a1c96acf84663241b294d7720d223517a812c31 100644 (file)
@@ -232,18 +232,19 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     return rc;
   }
   
-  msgr->set_myaddr(bind_addr);
-  if (bind_addr != entity_addr_t())
+  msgr->set_myaddrs(entity_addrvec_t(bind_addr));
+  if (bind_addr != entity_addr_t() &&
+      !bind_addr.is_blank_ip())
     msgr->learned_addr(bind_addr);
   else
     assert(msgr->get_need_addr());  // should still be true.
 
   if (msgr->get_myaddr().get_port() == 0) {
-    msgr->set_myaddr(listen_addr);
+    msgr->set_myaddrs(entity_addrvec_t(listen_addr));
   }
   entity_addr_t addr = msgr->get_myaddr();
   addr.nonce = nonce;
-  msgr->set_myaddr(addr);
+  msgr->set_myaddrs(entity_addrvec_t(addr));
 
   msgr->init_local_connection();
 
@@ -254,7 +255,8 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     return rc;
   }
 
-  ldout(msgr->cct,1) <<  __func__ << " my_inst.addr is " << msgr->get_myaddr()
+  ldout(msgr->cct,1) <<  __func__ << " my_addrs " << msgr->my_addrs
+                    << " my_addr " << msgr->my_addr
                     << " need_addr=" << msgr->get_need_addr() << dendl;
   return 0;
 }
index 8667302443d781c3b874a18d54c9c12299742317..efd836203300305370e2975f354701f361e05049 100644 (file)
@@ -149,19 +149,43 @@ int SimpleMessenger::_send_message(Message *m, Connection *con)
  */
 void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
 {
+  assert(my_addr == my_addrs.front());
   if (my_addr.is_blank_ip()) {
-    int port = my_addr.get_port();
-    my_addr.u = addr.u;
-    my_addr.set_port(port);
+    ldout(cct,1) << __func__ << " " << addr << dendl;
+    entity_addr_t t = my_addr;
+    int port = t.get_port();
+    t.u = addr.u;
+    t.set_port(port);
+    set_addrs(entity_addrvec_t(t));
     init_local_connection();
+  } else {
+    ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl;
   }
+  assert(my_addr == my_addrs.front());
 }
 
-void SimpleMessenger::set_addr(const entity_addr_t &addr)
+void SimpleMessenger::set_myaddrs(const entity_addrvec_t &av)
 {
-  entity_addr_t t = addr;
-  t.set_nonce(nonce);
-  set_myaddr(t);
+  my_addr = av.front();
+  my_addr.set_nonce(nonce);
+  // do this in a slightly paranoid way because we update this value in a
+  // thread-unsafe way.  SimpleMessenger sucks.
+  if (my_addrs.empty()) {
+    Messenger::set_myaddrs(av);
+  } else {
+    assert(my_addrs.v.size() == av.v.size());
+    my_addrs.v[0] = av.front();
+    set_endpoint_addr(av.front(), my_name);
+  }
+}
+
+void SimpleMessenger::set_addrs(const entity_addrvec_t &av)
+{
+  auto t = av;
+  for (auto& a : t.v) {
+    a.set_nonce(nonce);
+  }
+  set_myaddrs(t);
   init_local_connection();
 }
 
@@ -319,7 +343,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
     return 0;
   Mutex::Locker l(lock);
   if (did_bind) {
-    assert(my_addr == bind_addr);
+    assert(my_addrs == entity_addrvec_t(bind_addr));
     return 0;
   }
   if (started) {
@@ -328,7 +352,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
   }
   ldout(cct,10) << "rank.bind " << bind_addr << dendl;
 
-  set_myaddr(bind_addr);
+  set_myaddrs(entity_addrvec_t(bind_addr));
   return 0;
 }
 
@@ -738,7 +762,7 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
     t.set_nonce(my_addr.get_nonce());
     ANNOTATE_BENIGN_RACE_SIZED(&my_addr, sizeof(my_addr),
                                "SimpleMessenger learned addr");
-    my_addr = t;
+    set_myaddrs(entity_addrvec_t(t));
     ldout(cct,1) << "learned my addr " << my_addr << dendl;
     need_addr = false;
     init_local_connection();
@@ -748,7 +772,7 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 
 void SimpleMessenger::init_local_connection()
 {
-  local_connection->peer_addrs = entity_addrvec_t(my_addr);
+  local_connection->peer_addrs = my_addrs;
   local_connection->peer_type = my_name.type();
   local_connection->set_features(CEPH_FEATURES_ALL);
   ms_deliver_handle_fast_connect(local_connection.get());
index 93597a70a5b887a6532eea132a52d339cec1468e..56640cd41e76f6104bc7918d7ec7c12ff86a5987 100644 (file)
@@ -94,7 +94,8 @@ public:
    * @{
    */
   void set_addr_unknowns(const entity_addr_t& addr) override;
-  void set_addr(const entity_addr_t &addr) override;
+  void set_addrs(const entity_addrvec_t &addr) override;
+  void set_myaddrs(const entity_addrvec_t& a) override;
 
   int get_dispatch_queue_len() override {
     return dispatch_queue.get_queue_len();
@@ -286,6 +287,8 @@ private:
   /// lock to protect the global_seq
   ceph::spinlock global_seq_lock;
 
+  entity_addr_t my_addr;
+
   /**
    * hash map of addresses to Pipes
    *