]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Using EventCenter instead of poll for bind
authorHaomai Wang <haomaiwang@gmail.com>
Sun, 7 Dec 2014 16:28:11 +0000 (00:28 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:09 +0000 (03:07 +0800)
Totally avoid extra thread in AsyncMessenger now. The bind socket will be
regarded as a normal socket and will dispatch a random Worker thread to
handle accept event.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 670b980a271905bff9abacc7c6646ed15c71a5fe..d6a99a325d522a8099ae8ed3c1fe7207fc2576a0 100644 (file)
@@ -19,7 +19,6 @@
 #include <errno.h>
 #include <iostream>
 #include <fstream>
-#include <poll.h>
 #ifdef HAVE_SCHED
 #include <sched.h>
 #endif
@@ -54,27 +53,46 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
 }
 
 
-class C_handle_accept : public EventCallback {
+class C_conn_accept : public EventCallback {
   AsyncConnectionRef conn;
   int fd;
 
  public:
-  C_handle_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
+  C_conn_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
   void do_request(int id) {
     conn->accept(fd);
   }
 };
 
-class C_handle_connect : public EventCallback {
-  AsyncConnectionRef conn;
-  const entity_addr_t addr;
-  int type;
 
+class C_processor_accept : public EventCallback {
+  Processor *pro;
+
+ public:
+  C_processor_accept(Processor *p): pro(p) {}
+  void do_request(int id) {
+    pro->accept();
+  }
+};
+
+
+class C_processor_start : public EventCallback {
+  Processor *pro;
+  Worker *worker;
+ public:
+  C_processor_start(Processor *p, Worker *w): pro(p), worker(w) {}
+  void do_request(int id) {
+    pro->start(worker);
+  }
+};
+
+
+class C_processor_stop : public EventCallback {
+  Processor *pro;
  public:
-  C_handle_connect(AsyncConnectionRef c, const entity_addr_t &d, int t)
-      :conn(c), addr(d), type(t) {}
+  C_processor_stop(Processor *p): pro(p) {}
   void do_request(int id) {
-    conn->connect(addr, type);
+    pro->stop_cb();
   }
 };
 
@@ -205,91 +223,72 @@ int Processor::rebind(const set<int>& avoid_ports)
   ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;
 
   ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl;
-  int r = bind(addr, new_avoid);
-  if (r == 0)
-    start();
-  return r;
+  return bind(addr, new_avoid);
 }
 
-int Processor::start()
+int Processor::start(Worker *w)
 {
-  ldout(msgr->cct, 1) << __func__ << " start" << dendl;
+  ldout(msgr->cct, 1) << __func__ << " " << dendl;
 
   // start thread
-  if (listen_sd > 0)
-    create();
+  if (listen_sd > 0) {
+    assert(w->center.get_owner() == pthread_self());
+    worker = w;
+    w->center.create_file_event(listen_sd, EVENT_READABLE,
+                                EventCallbackRef(new C_processor_accept(this)));
+  }
 
   return 0;
 }
 
-void *Processor::entry()
+void Processor::accept()
 {
-  ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
+  ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl;
   int errors = 0;
-
-  struct pollfd pfd;
-  pfd.fd = listen_sd;
-  pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
-  while (!done) {
-    ldout(msgr->cct, 20) << __func__ << " calling poll" << dendl;
-    int r = poll(&pfd, 1, -1);
-    if (r < 0)
-      break;
-    ldout(msgr->cct,20) << __func__ << " poll got " << r << dendl;
-
-    if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
-      break;
-
-    ldout(msgr->cct,10) << __func__ << " pfd.revents=" << pfd.revents << dendl;
-    if (done) break;
-
-    // accept
+  while (errors < 4) {
     entity_addr_t addr;
     socklen_t slen = sizeof(addr.ss_addr());
     int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
     if (sd >= 0) {
       errors = 0;
-      ldout(msgr->cct,10) << __func__ << "accepted incoming on sd " << sd << dendl;
+      ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
 
       msgr->add_accept(sd);
+      break;
     } else {
-      ldout(msgr->cct,0) << __func__ << " no incoming connection?  sd = " << sd
-                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
-      if (++errors > 4)
-        break;
+      ldout(msgr->cct, 0) << __func__ << " no incoming connection?  sd = " << sd
+                          << " errno " << errno << " " << cpp_strerror(errno) << dendl;
     }
   }
+}
+
+void Processor::stop_cb()
+{
+  ldout(msgr->cct,10) << __func__ << dendl;
 
-  ldout(msgr->cct,20) << __func__ << " closing" << dendl;
-  // don't close socket, in case we start up again?  blech.
+  Mutex::Locker l(stop_lock);
   if (listen_sd >= 0) {
+    worker->center.delete_file_event(listen_sd, EVENT_READABLE);
+    ::shutdown(listen_sd, SHUT_RDWR);
     ::close(listen_sd);
     listen_sd = -1;
   }
-  ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
-  return 0;
+  worker = NULL;
+  stop_cond.Signal();
 }
 
 void Processor::stop()
 {
-  done = true;
   ldout(msgr->cct,10) << __func__ << dendl;
 
   if (listen_sd >= 0) {
-    ::shutdown(listen_sd, SHUT_RDWR);
-  }
-
-  // wait for thread to stop before closing the socket, to avoid
-  // racing against fd re-use.
-  if (is_started()) {
-    join();
+    assert(worker && worker->center.get_owner() != pthread_self());
+    Mutex::Locker l(stop_lock);
+    worker->center.dispatch_event_external(EventCallbackRef(new C_processor_stop(this)));
+    stop_cond.Wait(stop_lock);
+    assert(listen_sd == -1);
+    assert(worker == NULL);
   }
-
-  if (listen_sd >= 0) {
-    ::close(listen_sd);
-    listen_sd = -1;
-  }
-  done = false;
 }
 
 void Worker::stop()
@@ -418,9 +417,10 @@ void AsyncMessenger::ready()
 {
   ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
-  lock.Lock();
-  processor.start();
-  lock.Unlock();
+  Mutex::Locker l(lock);
+  Worker *w = pool->get_worker();
+  w->center.dispatch_event_external(
+      EventCallbackRef(new C_processor_start(&processor, w)));
 }
 
 int AsyncMessenger::shutdown()
@@ -465,7 +465,13 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
 
   processor.stop();
   mark_down_all();
-  return processor.rebind(avoid_ports);
+  int r = processor.rebind(avoid_ports);
+  if (r == 0) {
+    Worker *w = pool->get_worker();
+    w->center.dispatch_event_external(
+        EventCallbackRef(new C_processor_start(&processor, w)));
+  }
+  return r;
 }
 
 int AsyncMessenger::start()
@@ -521,7 +527,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd)
   lock.Lock();
   Worker *w = pool->get_worker();
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
-  w->center.dispatch_event_external(EventCallbackRef(new C_handle_accept(conn, sd)));
+  w->center.dispatch_event_external(EventCallbackRef(new C_conn_accept(conn, sd)));
   accepting_conns.insert(conn);
   lock.Unlock();
   return conn;
index 91004fca74175a01970c6f24f601bc4616c7e7c3..508ab641cecb1a1fc5146c49656e2e2526229b82 100644 (file)
@@ -39,28 +39,6 @@ using namespace std;
 
 
 class AsyncMessenger;
-
-/**
- * If the Messenger binds to a specific address, the Processor runs
- * and listens for incoming connections.
- */
-class Processor : public Thread {
-  AsyncMessenger *msgr;
-  bool done;
-  int listen_sd;
-  uint64_t nonce;
-
- public:
-  Processor(AsyncMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {}
-
-  void *entry();
-  void stop();
-  int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
-  int rebind(const set<int>& avoid_port);
-  int start();
-  void accept();
-};
-
 class WorkerPool;
 
 class Worker : public Thread {
@@ -79,6 +57,31 @@ class Worker : public Thread {
   void stop();
 };
 
+/**
+ * If the Messenger binds to a specific address, the Processor runs
+ * and listens for incoming connections.
+ */
+class Processor {
+  AsyncMessenger *msgr;
+  Worker *worker;
+  int listen_sd;
+  uint64_t nonce;
+  Mutex stop_lock;
+  Cond stop_cond;
+
+ public:
+  Processor(AsyncMessenger *r, uint64_t n)
+    : msgr(r), worker(NULL), listen_sd(-1), nonce(n),
+      stop_lock("AsyncMessenger::Processor::stop_lock") {}
+
+  void stop_cb();
+  void stop();
+  int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
+  int rebind(const set<int>& avoid_port);
+  int start(Worker *w);
+  void accept();
+};
+
 class WorkerPool: CephContext::AssociatedSingletonObject {
   WorkerPool(const WorkerPool &);
   WorkerPool& operator=(const WorkerPool &);