From: Haomai Wang Date: Fri, 12 Sep 2014 07:54:36 +0000 (+0800) Subject: Add wake up to EventCenter X-Git-Tag: v0.88~37^2~4^2~30 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=13ed6c0050bb7a60621349b6b5772ec3f021cba5;p=ceph.git Add wake up to EventCenter Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index a1f71de8207..cf8c226ef00 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -63,7 +63,12 @@ class C_handle_dispatch : public EventCallback { public: C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {} void do_request(int id) { - msgr->ms_deliver_dispatch(m); + msgr->ms_fast_preprocess(m); + if (msgr->ms_can_fast_dispatch(m)) { + msgr->ms_fast_dispatch(m); + } else { + msgr->ms_deliver_dispatch(m); + } } }; @@ -651,12 +656,7 @@ void AsyncConnection::process() in_seq = message->get_seq(); ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq() << " " << message << " " << *message << dendl; - async_msgr->ms_fast_preprocess(message); - if (async_msgr->ms_can_fast_dispatch(message)) { - async_msgr->ms_fast_dispatch(message); - } else { - center->create_time_event(0, new C_handle_dispatch(async_msgr, message)); - } + center->create_time_event(0, new C_handle_dispatch(async_msgr, message)); state = STATE_OPEN; break; diff --git a/src/msg/AsyncMessenger.h b/src/msg/AsyncMessenger.h index 751d87fdcc5..a8d70b2cb76 100644 --- a/src/msg/AsyncMessenger.h +++ b/src/msg/AsyncMessenger.h @@ -143,6 +143,8 @@ public: * @{ */ virtual int send_message(Message *m, const entity_inst_t& dest) { + Mutex::Locker l(lock); + return _send_message(m, dest); } diff --git a/src/msg/Event.cc b/src/msg/Event.cc index 3c3a22ffb70..71f019fb84d 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -20,6 +20,13 @@ #undef dout_prefix #define dout_prefix *_dout << "Event " +class C_handle_notify : public EventCallback { + public: + C_handle_notify() {} + void do_request(int fd_or_id) { + } +}; + int EventCenter::init(int n) { // can't init multi times @@ -45,7 +52,17 @@ int EventCenter::init(int n) return r; } + int fds[2]; + if (pipe(fds) < 0) { + lderr(cct) << __func__ << " can't create notify pipe" << dendl; + return -1; + } + + notify_receive_fd = fds[0]; + notify_send_fd = fds[1]; + nevent = n; + create_file_event(notify_receive_fd, EVENT_READABLE, new C_handle_notify()); return 0; } @@ -53,6 +70,11 @@ EventCenter::~EventCenter() { if (driver) delete driver; + + if (notify_receive_fd > 0) + ::close(notify_receive_fd); + if (notify_send_fd > 0) + ::close(notify_send_fd); } int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt) @@ -154,6 +176,14 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ct time_to_ids[expire] = id; time_events[id] = event; + if (expire < next_wake) { + char buf[1]; + buf[0] = 'c'; + // wake up "event_wait" + int n = write(notify_send_fd, buf, 1); + // FIXME ? + assert(n == 1); + } return id; } @@ -183,6 +213,12 @@ void EventCenter::stop() ldout(cct, 1) << __func__ << dendl; Mutex::Locker l(lock); event_tp.stop(); + char buf[1]; + buf[0] = 'c'; + // wake up "event_wait" + int n = write(notify_send_fd, buf, 1); + // FIXME ? + assert(n == 1); } int EventCenter::process_time_events() @@ -267,6 +303,8 @@ int EventCenter::process_events(int timeout_millionseconds) tv.tv_sec = timeout_millionseconds / 1000; tv.tv_usec = (timeout_millionseconds % 1000) * 1000; } + + next_wake = shortest; } ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; diff --git a/src/msg/Event.h b/src/msg/Event.h index 19da2a28455..caebc8af612 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -97,6 +97,9 @@ class EventCenter { uint64_t time_event_next_id; ThreadPool event_tp; time_t last_time; // last time process time event + int notify_receive_fd; + int notify_send_fd; + utime_t next_wake; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -185,6 +188,7 @@ class EventCenter { EventCenter(CephContext *c): lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0), event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"), + notify_receive_fd(-1), notify_send_fd(-1), event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) { last_time = time(NULL); } diff --git a/src/msg/EventEpoll.cc b/src/msg/EventEpoll.cc index 6d5ec5964bc..c12e3b36b99 100644 --- a/src/msg/EventEpoll.cc +++ b/src/msg/EventEpoll.cc @@ -75,7 +75,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) int mask = cur_mask & (~delmask); - ee.events = 0; + ee.events = EPOLLET; if (mask & EVENT_READABLE) ee.events |= EPOLLIN; if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */