]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Use round-robin to dispatch new connection
authorHaomai Wang <haomaiwang@gmail.com>
Thu, 18 Sep 2014 09:32:03 +0000 (17:32 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:04:58 +0000 (14:04 +0800)
EventCenter won't own lock to protect data, because each EventCenter will
master own connections.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncConnection.h
src/msg/AsyncMessenger.cc
src/msg/AsyncMessenger.h
src/msg/Event.cc
src/msg/Event.h

index ac60387e9f27b004ce35263bb4e4fb89d72d93eb..4f9b0c2eeb08abcff96bd200af63ff95ed79119a 100644 (file)
@@ -107,12 +107,12 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
   }
 }
 
-AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
+AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
   : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
     state(STATE_NONE), state_after_send(0), sd(-1),
     lock("AsyncConnection::lock"), open_write(false),
     got_bad_auth(false), authorizer(NULL),
-    state_buffer(4096), state_offset(0), net(cct), center(&m->center)
+    state_buffer(4096), state_offset(0), net(cct), center(c)
 {
   read_handler.reset(new C_handle_read(this));
   write_handler.reset(new C_handle_write(this));
@@ -1607,7 +1607,7 @@ void AsyncConnection::accept(int incoming)
   state = STATE_ACCEPTING;
   center->create_file_event(sd, EVENT_READABLE, read_handler);
   // rescheduler connection in order to avoid lock dep
-  center->create_time_event(0, read_handler);
+  process();
 }
 
 int AsyncConnection::send_message(Message *m)
index 842bb56a4aaeb0991aa03105a07e1cefc21f3fe4..1fbb953ae9f4ae2143bc06c419593788bf6762c0 100644 (file)
@@ -87,7 +87,7 @@ class AsyncConnection : public Connection {
   }
 
  public:
-  AsyncConnection(CephContext *cct, AsyncMessenger *m);
+  AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
   ~AsyncConnection();
 
   ostream& _conn_prefix(std::ostream *_dout);
index 00bd5847ca0fab868aff9e745cd75608eec60fb4..ae024298352d5739e9f5f9f944cc71ecd78597d8 100644 (file)
@@ -4,7 +4,7 @@
 #include <errno.h>
 #include <iostream>
 #include <fstream>
-
+#include <poll.h>
 
 #include "AsyncMessenger.h"
 
@@ -22,20 +22,34 @@ static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
 }
 
 static ostream& _prefix(std::ostream *_dout, Processor *p) {
-  return *_dout << "-- ";
+  return *_dout << "-- Processor";
 }
 
-/*******************
- * EventCallBack
- */
+static ostream& _prefix(std::ostream *_dout, Worker *w) {
+  return *_dout << "--";
+}
 
 class C_handle_accept : public EventCallback {
-  Processor *p;
+  AsyncConnectionRef conn;
+  int fd;
+
+ public:
+  C_handle_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
+  void do_request(int id) {
+    conn->accept(fd);
+  }
+};
+
+class C_handle_connect : public EventCallback {
+  AsyncConnectionRef conn;
+  const entity_addr_t addr;
+  int type;
 
  public:
-  C_handle_accept(Processor *p): p(p) {}
-  void do_request(int fd) {
-    p->accept();
+  C_handle_connect(AsyncConnectionRef c, const entity_addr_t &d, int t)
+      :conn(c), addr(d), type(t) {}
+  void do_request(int id) {
+    conn->connect(addr, type);
   }
 };
 
@@ -50,7 +64,7 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
   // bind to a socket
   ldout(msgr->cct, 10) << __func__ << dendl;
 
-  int family, flags;
+  int family;
   switch (bind_addr.get_family()) {
   case AF_INET:
   case AF_INET6:
@@ -134,14 +148,6 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     return rc;
   }
 
-  if ((flags = fcntl(listen_sd, F_GETFL, 0)) < 0 ||
-      fcntl(listen_sd, F_SETFL, flags | O_NONBLOCK) < 0) {
-    rc = -errno;
-    lderr(msgr->cct) << __func__ << " unable to setnonblock on " << listen_addr
-                     << ": " << cpp_strerror(rc) << dendl;
-    return rc;
-  }
-
   msgr->set_myaddr(bind_addr);
   if (bind_addr != entity_addr_t())
     msgr->learned_addr(bind_addr);
@@ -186,42 +192,45 @@ int Processor::start()
 
   // start thread
   create();
-  if (listen_sd >= 0)
-    center->create_file_event(listen_sd, EVENT_READABLE, EventCallbackRef(new C_handle_accept(this)));
 
   return 0;
 }
 
-void Processor::accept()
-{
-  ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
-  // accept
-  entity_addr_t addr;
-  socklen_t slen = sizeof(addr.ss_addr());
-  int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
-  if (sd >= 0) {
-    ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl;
-
-    msgr->add_accept(sd);
-  } else {
-    ldout(msgr->cct,0) << __func__ << " no incoming connection?  sd = " << sd
-                       << " errno " << errno << " " << cpp_strerror(errno) << dendl;
-  }
-}
-
 void *Processor::entry()
 {
   ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
-  int r;
+  int errors = 0;
 
+  struct pollfd pfd;
+  pfd.fd = listen_sd;
+  pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
   while (!done) {
-    ldout(msgr->cct,20) << __func__ << " calling poll" << dendl;
-
-    r = center->process_events(30000);
-    if (r < 0) {
-      ldout(msgr->cct,20) << __func__ << " process events failed: "
-                          << cpp_strerror(errno) << dendl;
-      // TODO do something?
+    ldout(msgr->cct, 20) << __func__ << " calling poll" << dendl;
+    int r = poll(&pfd, 1, -1);
+    if (r < 0)
+      break;
+    ldout(msgr->cct,20) << __func__ << " poll got " << r << dendl;
+
+    if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
+      break;
+
+    ldout(msgr->cct,10) << __func__ << " pfd.revents=" << pfd.revents << dendl;
+    if (done) break;
+
+    // accept
+    entity_addr_t addr;
+    socklen_t slen = sizeof(addr.ss_addr());
+    int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
+    if (sd >= 0) {
+      errors = 0;
+      ldout(msgr->cct,10) << __func__ << "accepted incoming on sd " << sd << dendl;
+
+      msgr->add_accept(sd);
+    } else {
+      ldout(msgr->cct,0) << __func__ << " no incoming connection?  sd = " << sd
+                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      if (++errors > 4)
+        break;
     }
   }
 
@@ -238,14 +247,11 @@ void *Processor::entry()
 void Processor::stop()
 {
   done = true;
-  ldout(msgr->cct, 10) << __func__ << " processor" << dendl;
+  ldout(msgr->cct,10) << __func__ << dendl;
 
-  if (listen_sd >= 0)
-    center->delete_file_event(listen_sd, EVENT_READABLE);
   if (listen_sd >= 0) {
     ::shutdown(listen_sd, SHUT_RDWR);
   }
-  center->stop();
 
   // wait for thread to stop before closing the socket, to avoid
   // racing against fd re-use.
@@ -260,6 +266,31 @@ void Processor::stop()
   done = false;
 }
 
+void Worker::stop()
+{
+  ldout(msgr->cct, 10) << __func__ << dendl;
+  done = true;
+  center.wakeup();
+}
+
+void *Worker::entry()
+{
+  ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
+  int r;
+
+  while (!done) {
+    ldout(msgr->cct, 20) << __func__ << " calling event process" << dendl;
+
+    r = center.process_events(30000);
+    if (r < 0) {
+      ldout(msgr->cct,20) << __func__ << " process events failed: "
+                          << cpp_strerror(errno) << dendl;
+      // TODO do something?
+    }
+  }
+
+  return 0;
+}
 
 /*******************
  * AsyncMessenger
@@ -268,17 +299,20 @@ void Processor::stop()
 AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                string mname, uint64_t _nonce)
   : SimplePolicyMessenger(cct, name,mname, _nonce),
-    processor(this, _nonce, &center),
+    conn_id(0),
+    processor(this, _nonce),
     lock("AsyncMessenger::lock"),
     nonce(_nonce), did_bind(false),
     global_seq(0),
-    cluster_protocol(0), stopped(true),
-    local_connection(new AsyncConnection(cct, this)),
-    center(cct)
+    cluster_protocol(0), stopped(true)
 {
   ceph_spin_init(&global_seq_lock);
+  for (int i = 0; i < cct->_conf->ms_event_op_threads; ++i) {
+    Worker *w = new Worker(this, cct);
+    workers.push_back(w);
+  }
+  local_connection = new AsyncConnection(cct, this, &workers[0]->center);
   init_local_connection();
-  center.init(5000);
 }
 
 /**
@@ -302,10 +336,12 @@ void AsyncMessenger::ready()
 int AsyncMessenger::shutdown()
 {
   ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl;
-  center.stop();
+  for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
+    (*it)->stop();
   mark_down_all();
 
   // break ref cycles on the loopback connection
+  processor.stop();
   local_connection->set_priv(NULL);
   stop_cond.Signal();
   stopped = true;
@@ -336,7 +372,11 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
 {
   ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
   assert(did_bind);
-  center.stop();
+  for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
+    (*it)->stop();
+    (*it)->join();
+  }
+
   processor.stop();
   mark_down_all();
   return processor.rebind(avoid_ports);
@@ -359,9 +399,10 @@ int AsyncMessenger::start()
     _init_local_connection();
   }
 
-  lock.Unlock();
+  for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
+    (*it)->create();
 
-  center.start();
+  lock.Unlock();
   return 0;
 }
 
@@ -374,6 +415,9 @@ void AsyncMessenger::wait()
   }
   if (!stopped)
     stop_cond.Wait(lock);
+
+  for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
+    (*it)->join();
   lock.Unlock();
 
   // done!  clean up.
@@ -402,9 +446,11 @@ void AsyncMessenger::wait()
 AsyncConnectionRef AsyncMessenger::add_accept(int sd)
 {
   lock.Lock();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this);
-  conn->accept(sd);
+  Worker *w = workers[conn_id % workers.size()];
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+  w->center.dispatch_event_external(EventCallbackRef(new C_handle_accept(conn, sd)));
   accepting_conns.insert(conn);
+  conn_id++;
   lock.Unlock();
   return conn;
 }
@@ -418,10 +464,12 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
                  << ", creating connection and registering" << dendl;
 
   // create connection
-  AsyncConnectionRef conn = new AsyncConnection(cct, this);
-  conn->connect(addr, type);
+  Worker *w = workers[conn_id % workers.size()];
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+  w->center.dispatch_event_external(EventCallbackRef(new C_handle_connect(conn, addr, type)));
   assert(!conns.count(addr));
   conns[addr] = conn;
+  conn_id++;
 
   return conn;
 }
index a8d70b2cb76764d607c7a16b5cb42c83737812cf..087f557de4977ec247c062cd315b19aeee18b056 100644 (file)
@@ -36,10 +36,9 @@ class Processor : public Thread {
   bool done;
   int listen_sd;
   uint64_t nonce;
-  EventCenter *center;
 
   public:
-  Processor(AsyncMessenger *r, uint64_t n, EventCenter *c) : msgr(r), done(false), listen_sd(-1), nonce(n), center(c) {}
+  Processor(AsyncMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {}
 
   void *entry();
   void stop();
@@ -49,6 +48,19 @@ class Processor : public Thread {
   void accept();
 };
 
+class Worker : public Thread {
+  AsyncMessenger *msgr;
+  bool done;
+
+ public:
+  EventCenter center;
+  Worker(AsyncMessenger *m, CephContext *c): msgr(m), done(false), center(c) {
+    center.init(5000);
+  }
+  void *entry();
+  void stop();
+};
+
 
 /*
  * This class handles transmission and reception of messages. Generally
@@ -167,7 +179,10 @@ public:
    */
 
   Connection *create_anon_connection() {
-    return new AsyncConnection(cct, this);
+    Mutex::Locker l(lock);
+    Worker *w = workers[conn_id % workers.size()];
+    conn_id++;
+    return new AsyncConnection(cct, this, &w->center);
   }
 
   /**
@@ -226,6 +241,9 @@ private:
   int _send_message(Message *m, const entity_inst_t& dest);
 
  private:
+  vector<Worker*> workers;
+  int conn_id;
+
   Processor processor;
   friend class Processor;
 
@@ -264,6 +282,7 @@ private:
    *
    * These are not yet in the conns map.
    */
+  // FIXME clear up
   set<AsyncConnectionRef> accepting_conns;
 
   /// internal cluster protocol version, if any, for talking to entities of the same type.
@@ -277,11 +296,6 @@ private:
     ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
     if (p == conns.end())
       return NULL;
-    if (!p->second->is_connected()) {
-      // FIXME
-      conns.erase(p);
-      return NULL;
-    }
     return p->second;
   }
 
@@ -306,8 +320,6 @@ public:
   /// con used for sending messages to ourselves
   ConnectionRef local_connection;
 
-  EventCenter center;
-
   /**
    * @defgroup AsyncMessenger internals
    * @{
index d1ef51e0be020083f4958111e7dbf2dddc2de0d6..3f3cfdc2b3f84939eb5877a367b3c9fbc8cc5e94 100644 (file)
@@ -80,7 +80,6 @@ EventCenter::~EventCenter()
 int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 {
   int r;
-  Mutex::Locker l(lock);
   if (file_events.size() > nevent) {
     int new_size = nevent << 2;
     ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
@@ -117,8 +116,6 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 
 void EventCenter::delete_file_event(int fd, int mask)
 {
-  Mutex::Locker l(lock);
-
   EventCenter::FileEvent *event = _get_file_event(fd);
   if (!event)
     return ;
@@ -141,20 +138,9 @@ void EventCenter::delete_file_event(int fd, int mask)
 
 uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef ctxt)
 {
-  Mutex::Locker l(lock);
   uint64_t id = time_event_next_id++;
 
   ldout(cct, 10) << __func__ << " id=" << id << " expire time=" << milliseconds << dendl;
-  // Direct dispatch
-  if (milliseconds == 0) {
-    FiredEvent e;
-    e.time_event.id = id;
-    e.time_event.time_cb = ctxt;
-    e.is_file = false;
-    event_wq.queue(e);
-    return id;
-  }
-
   EventCenter::TimeEvent event;
   utime_t expire;
   struct timeval tv;
@@ -171,19 +157,13 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef
   time_events[id] = event;
 
   if (expire < next_wake) {
-    char buf[1];
-    buf[0] = 'c';
-    // wake up "event_wait"
-    int n = write(notify_send_fd, buf, 1);
-    // FIXME ?
-    assert(n == 1);
+    wakeup();
   }
   return id;
 }
 
 void EventCenter::delete_time_event(uint64_t id)
 {
-  Mutex::Locker l(lock);
   for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
        it != time_to_ids.end(); it++) {
     if (it->second == id) {
@@ -195,21 +175,9 @@ void EventCenter::delete_time_event(uint64_t id)
   }
 }
 
-void EventCenter::start()
-{
-  ldout(cct, 1) << __func__ << dendl;
-  Mutex::Locker l(lock);
-  event_tp.start();
-  tp_stop = false;
-}
-
-void EventCenter::stop()
+void EventCenter::wakeup()
 {
   ldout(cct, 1) << __func__ << dendl;
-  if (!tp_stop) {
-    event_tp.stop();
-    tp_stop = true;
-  }
   char buf[1];
   buf[0] = 'c';
   // wake up "event_wait"
@@ -220,7 +188,6 @@ void EventCenter::stop()
 
 int EventCenter::process_time_events()
 {
-  Mutex::Locker l(lock);
   int processed = 0;
   time_t now = time(NULL);
   utime_t cur = ceph_clock_now(cct);
@@ -249,13 +216,9 @@ int EventCenter::process_time_events()
        it != time_to_ids.end(); ) {
     prev = it;
     if (cur >= it->first) {
-      FiredEvent e;
-      e.time_event.id = it->second;
-      e.time_event.time_cb = time_events[it->second].time_cb;
-      e.is_file = false;
-      event_wq.queue(e);
       ldout(cct, 10) << __func__ << " queue time event: id=" << it->second << " time is "
                      << it->first << dendl;
+      time_events[it->second].time_cb->do_request(it->first);
       processed++;
       ++it;
       time_to_ids.erase(prev);
@@ -283,7 +246,6 @@ int EventCenter::process_events(int timeout_millionseconds)
   shortest.set_from_timeval(&tv);
 
   {
-    Mutex::Locker l(lock);
     for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
           it != time_to_ids.end(); ++it) {
       ldout(cct, 10) << __func__ << " time_to_ids " << it->first << " id=" << it->second << dendl;
@@ -308,15 +270,48 @@ int EventCenter::process_events(int timeout_millionseconds)
   vector<FiredFileEvent> fired_events;
   numevents = driver->event_wait(fired_events, &tv);
   for (int j = 0; j < numevents; j++) {
-    FiredEvent e;
-    e.file_event = fired_events[j];
-    e.is_file = true;
-    event_wq.queue(e);
-    ldout(cct, 10) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
+    int rfired = 0;
+    FileEvent *event = _get_file_event(fired_events[j].fd);
+    if (!event)
+      continue;
+
+    /* note the event->mask & mask & ... code: maybe an already processed
+    * event removed an element that fired and we still didn't
+    * processed, so we check if the event is still valid. */
+    if (event->mask & fired_events[j].mask & EVENT_READABLE) {
+      rfired = 1;
+      event->read_cb->do_request(fired_events[j].fd);
+    }
+    event = _get_file_event(fired_events[j].fd);
+    if (!event)
+      continue;
+
+    if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
+      if (!rfired || event->read_cb != event->write_cb)
+        event->write_cb->do_request(fired_events[j].fd);
+    }
+
+    ldout(cct, 20) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
   }
 
   if (trigger_time)
     numevents += process_time_events();
 
+  {
+    Mutex::Locker l(lock);
+    while (!external_events.empty()) {
+      EventCallbackRef e = external_events.front();
+      external_events.pop_front();
+      e->do_request(0);
+    }
+  }
   return numevents;
 }
+
+void EventCenter::dispatch_event_external(EventCallbackRef e)
+{
+  lock.Lock();
+  external_events.push_back(e);
+  lock.Unlock();
+  wakeup();
+}
index cc6aee454d01fe048c3f1dafa08ec23374ee5f0a..d761ea874e51c3934255a22d64031945903975ad 100644 (file)
@@ -47,19 +47,6 @@ struct FiredFileEvent {
   int mask;
 };
 
-struct FiredTimeEvent {
-  uint64_t id;
-  EventCallbackRef time_cb;
-};
-
-struct FiredEvent {
-  FiredFileEvent file_event;
-  FiredTimeEvent time_event;
-  bool is_file;
-
-  FiredEvent(): is_file(true) {}
-};
-
 class EventDriver {
  public:
   virtual ~EventDriver() {}       // we want a virtual destructor!!!
@@ -85,22 +72,22 @@ class EventCenter {
     TimeEvent(): id(0) {}
   };
 
+  CephContext *cct;
+  uint64_t nevent;
+  // Used only to external event
   Mutex lock;
+  deque<EventCallbackRef> external_events;
   unordered_map<int, FileEvent> file_events;
+  EventDriver *driver;
   // The second element is id
   map<utime_t, uint64_t> time_to_ids;
   // The first element is id
   unordered_map<uint64_t, TimeEvent> time_events;
-  EventDriver *driver;
-  CephContext *cct;
-  uint64_t nevent;
   uint64_t time_event_next_id;
-  ThreadPool event_tp;
   time_t last_time; // last time process time event
   int notify_receive_fd;
   int notify_send_fd;
   utime_t next_wake;
-  bool tp_stop;
 
   int process_time_events();
   FileEvent *_get_file_event(int fd) {
@@ -111,98 +98,26 @@ class EventCenter {
     return NULL;
   }
 
-  struct EventWQ : public ThreadPool::WorkQueueVal<FiredEvent> {
-    EventCenter *center;
-    // In order to ensure the file descriptor is unique in conn_queue,
-    // pending is introduced to check
-    //
-    deque<FiredEvent> conn_queue;
-    // used only by file event <File Descriptor, Mask>
-    unordered_map<int, int> pending;
-
-    EventWQ(EventCenter *c, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
-      : ThreadPool::WorkQueueVal<FiredEvent>("Event::EventWQ", timeout, suicide_timeout, tp), center(c) {}
-
-    void _enqueue(FiredEvent e) {
-      if (e.is_file) {
-        // Ensure only one thread process one file descriptor
-        unordered_map<int, int>::iterator it = pending.find(e.file_event.fd);
-        if (it != pending.end()) {
-          it->second |= e.file_event.mask;
-        } else {
-          pending[e.file_event.fd] = e.file_event.mask;
-          conn_queue.push_back(e);
-        }
-      } else {
-        conn_queue.push_back(e);
-      }
-    }
-    void _enqueue_front(FiredEvent e) {
-      assert(0);
-    }
-    void _dequeue(FiredEvent c) {
-      assert(0);
-    }
-    bool _empty() {
-      return conn_queue.empty();
-    }
-    FiredEvent _dequeue() {
-      assert(!conn_queue.empty());
-      FiredEvent e = conn_queue.front();
-      conn_queue.pop_front();
-      if (e.is_file) {
-        e.file_event.mask = pending[e.file_event.fd];
-        pending.erase(e.file_event.fd);
-      }
-      return e;
-    }
-    void _process(FiredEvent e, ThreadPool::TPHandle &handle) {
-      if (e.is_file) {
-        int rfired = 0;
-        FileEvent *event = center->get_file_event(e.file_event.fd);
-        if (!event)
-          return ;
-
-        /* note the event->mask & mask & ... code: maybe an already processed
-        * event removed an element that fired and we still didn't
-        * processed, so we check if the event is still valid. */
-        if (event->mask & e.file_event.mask & EVENT_READABLE) {
-          rfired = 1;
-          event->read_cb->do_request(e.file_event.fd);
-        }
-        if (event->mask & e.file_event.mask & EVENT_WRITABLE) {
-          if (!rfired || event->read_cb != event->write_cb)
-            event->write_cb->do_request(e.file_event.fd);
-        }
-      } else {
-        e.time_event.time_cb->do_request(e.time_event.id);
-      }
-    }
-    void _clear() {
-    }
-  } event_wq;
-
  public:
   EventCenter(CephContext *c):
-    lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
-    event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
-    notify_receive_fd(-1), notify_send_fd(-1),tp_stop(true),
-    event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
+    cct(c), nevent(0),
+    lock("AsyncMessenger::lock"),
+    driver(NULL), time_event_next_id(0),
+    notify_receive_fd(-1), notify_send_fd(-1) {
     last_time = time(NULL);
   }
   ~EventCenter();
   int init(int nevent);
+  // Used by internal thread
   int create_file_event(int fd, int mask, EventCallbackRef ctxt);
   uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
   void delete_file_event(int fd, int mask);
   void delete_time_event(uint64_t id);
   int process_events(int timeout_milliseconds);
-  void start();
-  void stop();
-  FileEvent *get_file_event(int fd) {
-    Mutex::Locker l(lock);
-    return _get_file_event(fd);
-  }
+  void wakeup();
+
+  // Used by external thread
+  void dispatch_event_external(EventCallbackRef e);
 };
 
 #endif