From: Haomai Wang Date: Sat, 30 Jan 2016 18:39:20 +0000 (+0800) Subject: Event: don't wakeup if caller is thread self X-Git-Tag: v10.0.4~54^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2292dd98c600f21dfe54c2cd2aeca09f764d3bd1;p=ceph.git Event: don't wakeup if caller is thread self Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 9d7619dc1c0..b54db4e3b48 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -286,7 +286,7 @@ void *Worker::entry() } } - center.set_owner(pthread_self()); + center.set_owner(); while (!done) { ldout(cct, 20) << __func__ << " calling event process" << dendl; diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 04887b874d7..f8292967366 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -62,6 +62,8 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) << " time_id=" << time_event_next_id << ")."; } +static thread_local pthread_t thread_id = 0; + int EventCenter::init(int n) { // can't init multi times @@ -126,6 +128,12 @@ EventCenter::~EventCenter() free(file_events); } + +void EventCenter::set_owner() +{ + thread_id = owner = pthread_self(); +} + int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) { int r = 0; @@ -334,15 +342,21 @@ int EventCenter::process_events(int timeout_microseconds) int numevents; bool trigger_time = false; - utime_t period, shortest, now = ceph_clock_now(cct); - now.copy_to_timeval(&tv); - if (timeout_microseconds > 0) { - tv.tv_sec += timeout_microseconds / 1000000; - tv.tv_usec += timeout_microseconds % 1000000; - } - shortest.set_from_timeval(&tv); + utime_t now = ceph_clock_now(cct);; + // If exists external events, don't block + if (external_num_events.read()) { + tv.tv_sec = 0; + tv.tv_usec = 0; + next_time = now; + } else { + utime_t period, shortest; + now.copy_to_timeval(&tv); + if (timeout_microseconds > 0) { + tv.tv_sec += timeout_microseconds / 1000000; + tv.tv_usec += timeout_microseconds % 1000000; + } + shortest.set_from_timeval(&tv); - { Mutex::Locker l(time_lock); map >::iterator it = time_events.begin(); if (it != time_events.end() && shortest >= it->first) { @@ -360,11 +374,11 @@ int EventCenter::process_events(int timeout_microseconds) tv.tv_sec = timeout_microseconds / 1000000; tv.tv_usec = timeout_microseconds % 1000000; } + next_time = shortest; } ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; vector fired_events; - next_time = shortest; numevents = driver->event_wait(fired_events, &tv); file_lock.Lock(); for (int j = 0; j < numevents; j++) { @@ -402,18 +416,21 @@ int EventCenter::process_events(int timeout_microseconds) if (trigger_time) numevents += process_time_events(); - external_lock.Lock(); - if (external_events.empty()) { - external_lock.Unlock(); - } else { - deque cur_process; - cur_process.swap(external_events); - external_lock.Unlock(); - while (!cur_process.empty()) { - EventCallbackRef e = cur_process.front(); - if (e) - e->do_request(0); - cur_process.pop_front(); + if (external_num_events.read()) { + external_lock.Lock(); + if (external_events.empty()) { + external_lock.Unlock(); + } else { + deque cur_process; + cur_process.swap(external_events); + external_num_events.set(0); + external_lock.Unlock(); + while (!cur_process.empty()) { + EventCallbackRef e = cur_process.front(); + if (e) + e->do_request(0); + cur_process.pop_front(); + } } } return numevents; @@ -423,6 +440,10 @@ void EventCenter::dispatch_event_external(EventCallbackRef e) { external_lock.Lock(); external_events.push_back(e); + uint64_t num = external_num_events.inc(); external_lock.Unlock(); - wakeup(); + if (thread_id != owner) + wakeup(); + + ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl; } diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 2575130305b..1b5b401b2e2 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -103,6 +103,7 @@ class EventCenter { int nevent; // Used only to external event Mutex external_lock, file_lock, time_lock; + atomic_t external_num_events; deque external_events; FileEvent *file_events; EventDriver *driver; @@ -132,6 +133,7 @@ class EventCenter { external_lock("AsyncMessenger::external_lock"), file_lock("AsyncMessenger::file_lock"), time_lock("AsyncMessenger::time_lock"), + external_num_events(0), file_events(NULL), driver(NULL), time_event_next_id(1), notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) { @@ -141,7 +143,7 @@ class EventCenter { ostream& _event_prefix(std::ostream *_dout); int init(int nevent); - void set_owner(pthread_t p) { owner = p; } + void set_owner(); pthread_t get_owner() { return owner; } // Used by internal thread diff --git a/src/test/msgr/test_async_driver.cc b/src/test/msgr/test_async_driver.cc index fb463742644..1e96d984568 100644 --- a/src/test/msgr/test_async_driver.cc +++ b/src/test/msgr/test_async_driver.cc @@ -280,7 +280,7 @@ class Worker : public Thread { center.wakeup(); } void* entry() { - center.set_owner(pthread_self()); + center.set_owner(); while (!done) center.process_events(1000000); return 0; diff --git a/src/test/perf_local.cc b/src/test/perf_local.cc index fe642588f4c..fc8990879f7 100644 --- a/src/test/perf_local.cc +++ b/src/test/perf_local.cc @@ -451,7 +451,7 @@ double eventcenter_poll() int count = 1000000; EventCenter center(g_ceph_context); center.init(1000); - center.set_owner(pthread_self()); + center.set_owner(); uint64_t start = Cycles::rdtsc(); for (int i = 0; i < count; i++) { center.process_events(0); @@ -474,7 +474,7 @@ class CenterWorker : public Thread { center.wakeup(); } void* entry() { - center.set_owner(pthread_self()); + center.set_owner(); bind_thread_to_cpu(2); while (!done) center.process_events(1000);