]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Add wake up to EventCenter
authorHaomai Wang <haomaiwang@gmail.com>
Fri, 12 Sep 2014 07:54:36 +0000 (15:54 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:04:57 +0000 (14:04 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncMessenger.h
src/msg/Event.cc
src/msg/Event.h
src/msg/EventEpoll.cc

index a1f71de820724ee6dd5adaa596384905f8ad451c..cf8c226ef00677ef65e5ac2bddf50865ce6b26cb 100644 (file)
@@ -63,7 +63,12 @@ class C_handle_dispatch : public EventCallback {
  public:
   C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {}
   void do_request(int id) {
-    msgr->ms_deliver_dispatch(m);
+    msgr->ms_fast_preprocess(m);
+    if (msgr->ms_can_fast_dispatch(m)) {
+      msgr->ms_fast_dispatch(m);
+    } else {
+      msgr->ms_deliver_dispatch(m);
+    }
   }
 };
 
@@ -651,12 +656,7 @@ void AsyncConnection::process()
           in_seq = message->get_seq();
           ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
                                << " " << message << " " << *message << dendl;
-          async_msgr->ms_fast_preprocess(message);
-          if (async_msgr->ms_can_fast_dispatch(message)) {
-            async_msgr->ms_fast_dispatch(message);
-          } else {
-            center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
-          }
+          center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
 
           state = STATE_OPEN;
           break;
index 751d87fdcc538e2761473447fa04f7b3af309db5..a8d70b2cb76764d607c7a16b5cb42c83737812cf 100644 (file)
@@ -143,6 +143,8 @@ public:
    * @{
    */
   virtual int send_message(Message *m, const entity_inst_t& dest) {
+          Mutex::Locker l(lock);
+
     return _send_message(m, dest);
   }
 
index 3c3a22ffb7069ad8102715419b0ede9104454e55..71f019fb84d9380a42e0305eabb0aa227e84aa33 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "Event "
 
+class C_handle_notify : public EventCallback {
+ public:
+  C_handle_notify() {}
+  void do_request(int fd_or_id) {
+  }
+};
+
 int EventCenter::init(int n)
 {
   // can't init multi times
@@ -45,7 +52,17 @@ int EventCenter::init(int n)
     return r;
   }
 
+  int fds[2];
+  if (pipe(fds) < 0) {
+    lderr(cct) << __func__ << " can't create notify pipe" << dendl;
+    return -1;
+  }
+
+  notify_receive_fd = fds[0];
+  notify_send_fd = fds[1];
+
   nevent = n;
+  create_file_event(notify_receive_fd, EVENT_READABLE, new C_handle_notify());
   return 0;
 }
 
@@ -53,6 +70,11 @@ EventCenter::~EventCenter()
 {
   if (driver)
     delete driver;
+
+  if (notify_receive_fd > 0)
+    ::close(notify_receive_fd);
+  if (notify_send_fd > 0)
+    ::close(notify_send_fd);
 }
 
 int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
@@ -154,6 +176,14 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ct
   time_to_ids[expire] = id;
   time_events[id] = event;
 
+  if (expire < next_wake) {
+    char buf[1];
+    buf[0] = 'c';
+    // wake up "event_wait"
+    int n = write(notify_send_fd, buf, 1);
+    // FIXME ?
+    assert(n == 1);
+  }
   return id;
 }
 
@@ -183,6 +213,12 @@ void EventCenter::stop()
   ldout(cct, 1) << __func__ << dendl;
   Mutex::Locker l(lock);
   event_tp.stop();
+  char buf[1];
+  buf[0] = 'c';
+  // wake up "event_wait"
+  int n = write(notify_send_fd, buf, 1);
+  // FIXME ?
+  assert(n == 1);
 }
 
 int EventCenter::process_time_events()
@@ -267,6 +303,8 @@ int EventCenter::process_events(int timeout_millionseconds)
       tv.tv_sec = timeout_millionseconds / 1000;
       tv.tv_usec = (timeout_millionseconds % 1000) * 1000;
     }
+
+    next_wake = shortest;
   }
 
   ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
index 19da2a28455100de930227cfbd4f50475a9c9478..caebc8af612be96e38d34451de4f2ea0043e69be 100644 (file)
@@ -97,6 +97,9 @@ class EventCenter {
   uint64_t time_event_next_id;
   ThreadPool event_tp;
   time_t last_time; // last time process time event
+  int notify_receive_fd;
+  int notify_send_fd;
+  utime_t next_wake;
 
   int process_time_events();
   FileEvent *_get_file_event(int fd) {
@@ -185,6 +188,7 @@ class EventCenter {
   EventCenter(CephContext *c):
     lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
     event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
+    notify_receive_fd(-1), notify_send_fd(-1),
     event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
     last_time = time(NULL);
   }
index 6d5ec5964bc1fb7849b606e718232a553e8a6ca5..c12e3b36b997205dbd73cbf123e0be54e823e221 100644 (file)
@@ -75,7 +75,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
 
   int mask = cur_mask & (~delmask);
 
-  ee.events = 0;
+  ee.events = EPOLLET;
   if (mask & EVENT_READABLE) ee.events |= EPOLLIN;
   if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
   ee.data.u64 = 0; /* avoid valgrind warning */