]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/Event: remove event wakeup flag
authorHaomai Wang <haomai@xsky.com>
Sun, 10 Jul 2016 08:19:29 +0000 (16:19 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 16 Aug 2016 15:17:55 +0000 (23:17 +0800)
Now only dispatch external event will wakeup event thread(previously
delete_time_event will call wakeup), we only need to use
"external_num_events" to indicate whether we have extra events.

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/Event.cc
src/msg/async/Event.h

index b3b12ddd03036dbf47265891ea966c4f0e14de3a..5162059552b6dced99666f1ecb6a94c06a6df17c 100644 (file)
@@ -39,14 +39,14 @@ class C_handle_notify : public EventCallback {
   C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
   void do_request(int fd_or_id) {
     char c[256];
+    int r = 0;
     do {
-      center->already_wakeup.set(0);
-      int r = read(fd_or_id, c, sizeof(c));
+      r = read(fd_or_id, c, sizeof(c));
       if (r < 0) {
-        ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
-        break;
+        if (errno != EAGAIN)
+          ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
       }
-    } while (center->already_wakeup.read());
+    } while (r > 0);
   }
 };
 
@@ -260,15 +260,15 @@ void EventCenter::delete_time_event(uint64_t id)
 
 void EventCenter::wakeup()
 {
-    ldout(cct, 1) << __func__ << dendl;
-    already_wakeup.compare_and_swap(0, 1);
-
-    char buf[1];
-    buf[0] = 'c';
-    // wake up "event_wait"
-    int n = write(notify_send_fd, buf, 1);
-    // FIXME ?
-    assert(n == 1);
+  ldout(cct, 1) << __func__ << dendl;
+
+  char buf = 'c';
+  // wake up "event_wait"
+  int n = write(notify_send_fd, &buf, sizeof(buf));
+  if (n < 0) {
+    ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
+    assert(0);
+  }
 }
 
 int EventCenter::process_time_events()
@@ -361,21 +361,16 @@ int EventCenter::process_events(int timeout_microseconds)
 
   if (external_num_events.load()) {
     external_lock.lock();
-    if (external_events.empty()) {
-      external_lock.unlock();
-    } else {
-      deque<EventCallbackRef> cur_process;
-      cur_process.swap(external_events);
-      external_num_events.store(0);
-      external_lock.unlock();
-      while (!cur_process.empty()) {
-        EventCallbackRef e = cur_process.front();
-        ldout(cct, 20) << __func__ << " do " << e << dendl;
-        if (e)
-          e->do_request(0);
-        cur_process.pop_front();
-        numevents++;
-      }
+    deque<EventCallbackRef> cur_process;
+    cur_process.swap(external_events);
+    external_num_events.store(0);
+    external_lock.unlock();
+    while (!cur_process.empty()) {
+      EventCallbackRef e = cur_process.front();
+      ldout(cct, 20) << __func__ << " do " << e << dendl;
+      e->do_request(0);
+      cur_process.pop_front();
+      numevents++;
     }
   }
   return numevents;
@@ -385,9 +380,10 @@ void EventCenter::dispatch_event_external(EventCallbackRef e)
 {
   external_lock.lock();
   external_events.push_back(e);
+  bool wake = !external_num_events.load();
   uint64_t num = ++external_num_events;
   external_lock.unlock();
-  if (!in_thread())
+  if (!in_thread() && wake)
     wakeup();
 
   ldout(cct, 20) << __func__ << " " << e << " pending " << num << dendl;
index a320489ceda2c13513e2dc591ce5e5ceecee8a5a..daa4537eef6e86074e8973c5592422161a702177 100644 (file)
@@ -136,16 +136,12 @@ class EventCenter {
   }
 
  public:
-  atomic_t already_wakeup;
-
   explicit EventCenter(CephContext *c):
     cct(c), nevent(0),
     external_num_events(0),
     driver(NULL), time_event_next_id(1),
     notify_receive_fd(-1), notify_send_fd(-1), net(c),
-    notify_handler(NULL),
-    already_wakeup(0) {
-  }
+    notify_handler(NULL) { }
   ~EventCenter();
   ostream& _event_prefix(std::ostream *_dout);