]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Event: don't wakeup if caller is thread self 7453/head
authorHaomai Wang <haomai@xsky.com>
Sat, 30 Jan 2016 18:39:20 +0000 (02:39 +0800)
committerHaomai Wang <haomai@xsky.com>
Sun, 31 Jan 2016 05:22:43 +0000 (13:22 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/Event.cc
src/msg/async/Event.h
src/test/msgr/test_async_driver.cc
src/test/perf_local.cc

index 9d7619dc1c0d36c3bbb74b6e96e2b459b75121a5..b54db4e3b48c886ab4a4746f0652922c3fa19532 100644 (file)
@@ -286,7 +286,7 @@ void *Worker::entry()
     }
   }
 
-  center.set_owner(pthread_self());
+  center.set_owner();
   while (!done) {
     ldout(cct, 20) << __func__ << " calling event process" << dendl;
 
index 04887b874d7bdf74aa2bffed5a022155d5f3ee87..f82929673663a4cec860ecd60688c52241661f9c 100644 (file)
@@ -62,6 +62,8 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout)
                 << " time_id=" << time_event_next_id << ").";
 }
 
+static thread_local pthread_t thread_id = 0;
+
 int EventCenter::init(int n)
 {
   // can't init multi times
@@ -126,6 +128,12 @@ EventCenter::~EventCenter()
     free(file_events);
 }
 
+
+void EventCenter::set_owner()
+{
+  thread_id = owner = pthread_self();
+}
+
 int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 {
   int r = 0;
@@ -334,15 +342,21 @@ int EventCenter::process_events(int timeout_microseconds)
   int numevents;
   bool trigger_time = false;
 
-  utime_t period, shortest, now = ceph_clock_now(cct);
-  now.copy_to_timeval(&tv);
-  if (timeout_microseconds > 0) {
-    tv.tv_sec += timeout_microseconds / 1000000;
-    tv.tv_usec += timeout_microseconds % 1000000;
-  }
-  shortest.set_from_timeval(&tv);
+  utime_t now = ceph_clock_now(cct);;
+  // If exists external events, don't block
+  if (external_num_events.read()) {
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+    next_time = now;
+  } else {
+    utime_t period, shortest;
+    now.copy_to_timeval(&tv);
+    if (timeout_microseconds > 0) {
+      tv.tv_sec += timeout_microseconds / 1000000;
+      tv.tv_usec += timeout_microseconds % 1000000;
+    }
+    shortest.set_from_timeval(&tv);
 
-  {
     Mutex::Locker l(time_lock);
     map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
     if (it != time_events.end() && shortest >= it->first) {
@@ -360,11 +374,11 @@ int EventCenter::process_events(int timeout_microseconds)
       tv.tv_sec = timeout_microseconds / 1000000;
       tv.tv_usec = timeout_microseconds % 1000000;
     }
+    next_time = shortest;
   }
 
   ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
   vector<FiredFileEvent> fired_events;
-  next_time = shortest;
   numevents = driver->event_wait(fired_events, &tv);
   file_lock.Lock();
   for (int j = 0; j < numevents; j++) {
@@ -402,18 +416,21 @@ int EventCenter::process_events(int timeout_microseconds)
   if (trigger_time)
     numevents += process_time_events();
 
-  external_lock.Lock();
-  if (external_events.empty()) {
-    external_lock.Unlock();
-  } else {
-    deque<EventCallbackRef> cur_process;
-    cur_process.swap(external_events);
-    external_lock.Unlock();
-    while (!cur_process.empty()) {
-      EventCallbackRef e = cur_process.front();
-      if (e)
-        e->do_request(0);
-      cur_process.pop_front();
+  if (external_num_events.read()) {
+    external_lock.Lock();
+    if (external_events.empty()) {
+      external_lock.Unlock();
+    } else {
+      deque<EventCallbackRef> cur_process;
+      cur_process.swap(external_events);
+      external_num_events.set(0);
+      external_lock.Unlock();
+      while (!cur_process.empty()) {
+        EventCallbackRef e = cur_process.front();
+        if (e)
+          e->do_request(0);
+        cur_process.pop_front();
+      }
     }
   }
   return numevents;
@@ -423,6 +440,10 @@ void EventCenter::dispatch_event_external(EventCallbackRef e)
 {
   external_lock.Lock();
   external_events.push_back(e);
+  uint64_t num = external_num_events.inc();
   external_lock.Unlock();
-  wakeup();
+  if (thread_id != owner)
+    wakeup();
+
+  ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl;
 }
index 2575130305b03d74492e5d751e929644278f0e9c..1b5b401b2e253dd77d8a70d591227b181d389135 100644 (file)
@@ -103,6 +103,7 @@ class EventCenter {
   int nevent;
   // Used only to external event
   Mutex external_lock, file_lock, time_lock;
+  atomic_t external_num_events;
   deque<EventCallbackRef> external_events;
   FileEvent *file_events;
   EventDriver *driver;
@@ -132,6 +133,7 @@ class EventCenter {
     external_lock("AsyncMessenger::external_lock"),
     file_lock("AsyncMessenger::file_lock"),
     time_lock("AsyncMessenger::time_lock"),
+    external_num_events(0),
     file_events(NULL),
     driver(NULL), time_event_next_id(1),
     notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) {
@@ -141,7 +143,7 @@ class EventCenter {
   ostream& _event_prefix(std::ostream *_dout);
 
   int init(int nevent);
-  void set_owner(pthread_t p) { owner = p; }
+  void set_owner();
   pthread_t get_owner() { return owner; }
 
   // Used by internal thread
index fb4637426447804facd7fcf2306f5ecb157471e3..1e96d984568de5bd6dcf730f7537b2bfee0e2831 100644 (file)
@@ -280,7 +280,7 @@ class Worker : public Thread {
     center.wakeup();
   }
   void* entry() {
-    center.set_owner(pthread_self());
+    center.set_owner();
     while (!done)
       center.process_events(1000000);
     return 0;
index fe642588f4c660742a24f12c7e8080535c831093..fc8990879f7dc2476f00d723767a5c14be36c8d0 100644 (file)
@@ -451,7 +451,7 @@ double eventcenter_poll()
   int count = 1000000;
   EventCenter center(g_ceph_context);
   center.init(1000);
-  center.set_owner(pthread_self());
+  center.set_owner();
   uint64_t start = Cycles::rdtsc();
   for (int i = 0; i < count; i++) {
     center.process_events(0);
@@ -474,7 +474,7 @@ class CenterWorker : public Thread {
     center.wakeup();
   }
   void* entry() {
-    center.set_owner(pthread_self());
+    center.set_owner();
     bind_thread_to_cpu(2);
     while (!done)
       center.process_events(1000);