]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: set nonce before starting the workers 12390/head
authorKefu Chai <kchai@redhat.com>
Thu, 8 Dec 2016 10:40:24 +0000 (18:40 +0800)
committerKefu Chai <kchai@redhat.com>
Thu, 8 Dec 2016 19:03:27 +0000 (03:03 +0800)
otherwise workers will respond with difference nonces to peers.
and remove nonce from Processor. as there is only one nonce for each
Messenger at a given time.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 360da05419767e75beeb8e9cc1ef7acc6665b264..f1cb9f776c4dfa0b64b4e727d060519a69d667f4 100644 (file)
@@ -52,10 +52,12 @@ class Processor::C_processor_accept : public EventCallback {
 };
 
 Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n)
-  : msgr(r), net(c), worker(w), nonce(n),
+  : msgr(r), net(c), worker(w),
     listen_handler(new C_processor_accept(this)) {}
 
-int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
+int Processor::bind(const entity_addr_t &bind_addr,
+                   const set<int>& avoid_ports,
+                   entity_addr_t* bound_addr)
 {
   const md_config_t *conf = msgr->cct->_conf;
   // bind to a socket
@@ -137,42 +139,10 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
   }
 
   ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
-
-  msgr->set_myaddr(bind_addr);
-  if (bind_addr != entity_addr_t())
-    msgr->learned_addr(bind_addr);
-
-  if (msgr->get_myaddr().get_port() == 0) {
-    msgr->set_myaddr(listen_addr);
-  }
-  entity_addr_t addr = msgr->get_myaddr();
-  addr.nonce = nonce;
-  msgr->set_myaddr(addr);
-
-  msgr->init_local_connection();
-
-  ldout(msgr->cct,1) << __func__ << " bind my_inst.addr is " << msgr->get_myaddr() << dendl;
+  *bound_addr = listen_addr;
   return 0;
 }
 
-int Processor::rebind(const set<int>& avoid_ports)
-{
-  ldout(msgr->cct, 1) << __func__ << " rebind avoid " << avoid_ports << dendl;
-
-  entity_addr_t addr = msgr->get_myaddr();
-  set<int> new_avoid = avoid_ports;
-  new_avoid.insert(addr.get_port());
-  addr.set_port(0);
-
-  // adjust the nonce; we want our entity_addr_t to be truly unique.
-  nonce += 1000000;
-  msgr->my_inst.addr.nonce = nonce;
-  ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;
-
-  ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl;
-  return bind(addr, new_avoid);
-}
-
 void Processor::start()
 {
   ldout(msgr->cct, 1) << __func__ << dendl;
@@ -346,11 +316,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
 
   // bind to a socket
   set<int> avoid_ports;
-  int r = 0;
+  entity_addr_t bound_addr;
   unsigned i = 0;
   for (auto &&p : processors) {
-    r = p->bind(bind_addr, avoid_ports);
-    if (r < 0) {
+    int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+    if (r) {
       // Note: this is related to local tcp listen table problem.
       // Posix(default kernel implementation) backend shares listen table
       // in the kernel, so all threads can use the same listen table naturally
@@ -361,13 +331,12 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
       // but the second worker failed, it's not expected and we need to assert
       // here
       assert(i == 0);
-      break;
+      return r;
     }
     ++i;
   }
-  if (r >= 0)
-    did_bind = true;
-  return r;
+  _finish_bind(bind_addr, bound_addr);
+  return 0;
 }
 
 int AsyncMessenger::rebind(const set<int>& avoid_ports)
@@ -378,19 +347,53 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
   for (auto &&p : processors)
     p->stop();
   mark_down_all();
+
+  // adjust the nonce; we want our entity_addr_t to be truly unique.
+  nonce += 1000000;
+  ldout(cct, 10) << __func__ << " new nonce " << nonce
+                << " and inst " << get_myinst() << dendl;
+
+  entity_addr_t bound_addr;
+  entity_addr_t bind_addr = get_myaddr();
+  bind_addr.set_port(0);
+  set<int> new_avoid(avoid_ports);
+  new_avoid.insert(bind_addr.get_port());
+  ldout(cct, 10) << __func__ << " will try " << bind_addr
+                << " and avoid ports " << new_avoid << dendl;
   unsigned i = 0;
-  int r = 0;
   for (auto &&p : processors) {
-    r = p->rebind(avoid_ports);
-    if (r == 0) {
-      p->start();
-    } else {
+    int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+    if (r) {
       assert(i == 0);
-      break;
+      return r;
     }
-    i++;
+    ++i;
+  }
+  _finish_bind(bind_addr, bound_addr);
+  for (auto &&p : processors) {
+    p->start();
+  }
+  return 0;
+}
+
+void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
+                                 const entity_addr_t& listen_addr)
+{
+  set_myaddr(bind_addr);
+  if (bind_addr != entity_addr_t())
+    learned_addr(bind_addr);
+
+  if (get_myaddr().get_port() == 0) {
+    set_myaddr(listen_addr);
   }
-  return r;
+  entity_addr_t addr = get_myaddr();
+  addr.set_nonce(nonce);
+  set_myaddr(addr);
+
+  init_local_connection();
+
+  ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
+  did_bind = true;
 }
 
 int AsyncMessenger::start()
index 308d3966b4d767679f8b4bc6bcc6af0d571e0738..d3bb2090f74dfbf9bf21b6b16edef5c0f51cf7b9 100644 (file)
@@ -48,7 +48,6 @@ class Processor {
   NetHandler net;
   Worker *worker;
   ServerSocket listen_socket;
-  uint64_t nonce;
   EventCallbackRef listen_handler;
 
   class C_processor_accept;
@@ -58,8 +57,9 @@ class Processor {
   ~Processor() { delete listen_handler; };
 
   void stop();
-  int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
-  int rebind(const set<int>& avoid_port);
+  int bind(const entity_addr_t &bind_addr,
+          const set<int>& avoid_ports,
+          entity_addr_t* bound_addr);
   void start();
   void accept();
 };
@@ -210,6 +210,8 @@ private:
                       const entity_addr_t& dest_addr, int dest_type);
 
   int _send_message(Message *m, const entity_inst_t& dest);
+  void _finish_bind(const entity_addr_t& bind_addr,
+                   const entity_addr_t& listen_addr);
 
  private:
   static const uint64_t ReapDeadConnectionThreshold = 5;