]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Avoid calling callback after delteing AsyncMessenger
authorHaomai Wang <haomaiwang@gmail.com>
Sat, 10 Jan 2015 13:35:04 +0000 (21:35 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:11 +0000 (03:07 +0800)
Now when calling mark_down/mark_down_all, it will dispatch a reset event.
If we call Messenger::shutdown/wait, and it will let reset event called after
Messenger dealloc.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/test/msgr/test_msgr.cc

index e1d803e7e39b4f990f1133f361e8a12d673a52af..659eeca4795cf7532ef3aedf280b8cd869c69537 100644 (file)
@@ -2111,7 +2111,6 @@ void AsyncConnection::mark_down()
   stopping.set(1);
   Mutex::Locker l(lock);
   _stop();
-  center->dispatch_event_external(reset_handler);
 }
 
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
index ab4c445cfe5fc174e3b6b35fdf4513efdf7171fc..d6c1771495ddd7982078a7f691e56926db4131b3 100644 (file)
@@ -286,6 +286,10 @@ class AsyncConnection : public Connection {
   void process();
   void wakeup_from(uint64_t id);
   void local_deliver();
+  void stop() {
+    mark_down();
+    center->dispatch_event_external(reset_handler);
+  }
 }; /* AsyncConnection */
 
 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
index 902a684e2bb0777a9778421ee8aebf23d1e08f93..48241aacd4a512b0ea122a3c5c19a8f5624c46d0 100644 (file)
@@ -106,13 +106,19 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     return -errno;
   }
 
+  int r = net.set_nonblock(listen_sd);
+  if (r < 0) {
+    ::close(listen_sd);
+    listen_sd = -1;
+    return -errno;
+  }
   // use whatever user specified (if anything)
   entity_addr_t listen_addr = bind_addr;
   listen_addr.set_family(family);
 
   /* bind to port */
   int rc = -1;
-  int r = -1;
+  r = -1;
 
   for (int i = 0; i < conf->ms_bind_retry_count; i++) {
     if (i > 0) {
@@ -167,6 +173,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
   if (rc < 0) {
     lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
                      << " attempts: " << cpp_strerror(errno) << dendl;
+    ::close(listen_sd);
+    listen_sd = -1;
     return r;
   }
 
@@ -176,6 +184,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
   if (rc < 0) {
     rc = -errno;
     lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl;
+    ::close(listen_sd);
+    listen_sd = -1;
     return rc;
   }
 
@@ -187,6 +197,8 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     rc = -errno;
     lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
         << ": " << cpp_strerror(rc) << dendl;
+    ::close(listen_sd);
+    listen_sd = -1;
     return rc;
   }
 
@@ -252,10 +264,17 @@ void Processor::accept()
       ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
 
       msgr->add_accept(sd);
-      break;
+      continue;
     } else {
-      ldout(msgr->cct, 0) << __func__ << " no incoming connection?  sd = " << sd
-          << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      if (errno == EINTR) {
+        continue;
+      } else if (errno == EAGAIN) {
+        break;
+      } else {
+        errors++;
+        ldout(msgr->cct, 20) << __func__ << " no incoming connection?  sd = " << sd
+                             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      }
     }
   }
 }
@@ -371,7 +390,6 @@ void WorkerPool::barrier()
 {
   ldout(cct, 10) << __func__ << " started." << dendl;
   pthread_t cur = pthread_self();
-  uint64_t send = 0;
   for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
     assert(cur != (*it)->center.get_owner());
     (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
@@ -393,7 +411,7 @@ void WorkerPool::barrier()
 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                string mname, uint64_t _nonce)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
-    processor(this, _nonce),
+    processor(this, cct, _nonce),
     lock("AsyncMessenger::lock"),
     nonce(_nonce), need_addr(true), did_bind(false),
     global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
@@ -663,8 +681,8 @@ void AsyncMessenger::mark_down_all()
   for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
        q != accepting_conns.end(); ++q) {
     AsyncConnectionRef p = *q;
-    ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
-    p->mark_down();
+    ldout(cct, 5) << __func__ << " accepting_conn " << p.get() << dendl;
+    p->stop();
   }
   accepting_conns.clear();
 
@@ -673,15 +691,18 @@ void AsyncMessenger::mark_down_all()
     AsyncConnectionRef p = it->second;
     ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
     conns.erase(it);
-    p->mark_down();
+    p->stop();
   }
 
-  while (!deleted_conns.empty()) {
-    set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
-    AsyncConnectionRef p = *it;
-    ldout(cct, 5) << __func__ << " delete " << p << dendl;
-    p->put();
-    deleted_conns.erase(it);
+  {
+    Mutex::Locker l(deleted_lock);
+    while (!deleted_conns.empty()) {
+      set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
+      AsyncConnectionRef p = *it;
+      ldout(cct, 5) << __func__ << " delete " << p << dendl;
+      p->put();
+      deleted_conns.erase(it);
+    }
   }
   lock.Unlock();
 }
@@ -692,7 +713,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
   AsyncConnectionRef p = _lookup_conn(addr);
   if (p) {
     ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
-    p->mark_down();
+    p->stop();
   } else {
     ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
   }
index f4bb34a8c915da522c836c0b4522b72fb5ea94e9..972934ad8efb670f2650c5fa6ec0cb22c08cc356 100644 (file)
@@ -63,12 +63,13 @@ class Worker : public Thread {
  */
 class Processor {
   AsyncMessenger *msgr;
+  NetHandler net;
   Worker *worker;
   int listen_sd;
   uint64_t nonce;
 
  public:
-  Processor(AsyncMessenger *r, uint64_t n): msgr(r), worker(NULL), listen_sd(-1), nonce(n) {}
+  Processor(AsyncMessenger *r, CephContext *c, uint64_t n): msgr(r), net(c), worker(NULL), listen_sd(-1), nonce(n) {}
 
   void stop();
   int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
index 0928f5c7dc8fb4970fc7ef2db6afe215eb65781d..2a95f9ad917248e82fcd2719591c7ab94673663f 100644 (file)
@@ -149,7 +149,6 @@ class FakeDispatcher : public Dispatcher {
     cerr << __func__ << con << std::endl;
     Session *s = static_cast<Session*>(con->get_priv());
     if (s) {
-      Mutex::Locker l(s->lock);
       s->con.reset(NULL);  // break con <-> session ref cycle
       con->set_priv(NULL);   // break ref <-> session cycle, if any
       s->put();