From: Haomai Wang Date: Fri, 12 Sep 2014 07:52:52 +0000 (+0800) Subject: Fix Time Event problem X-Git-Tag: v0.88~37^2~4^2~34 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d43b0aeb3e82dcd5badd57fb57b144f93ef306ac;p=ceph.git Fix Time Event problem Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index c3f56aa681d9..12d0c15cac2e 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -1842,6 +1842,7 @@ void AsyncConnection::handle_ack(uint64_t seq) void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { + assert(lock.is_locked()); bufferlist bl; utime_t t = ceph_clock_now(async_msgr->cct); @@ -1861,7 +1862,7 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) bl.append(CEPH_MSGR_TAG_KEEPALIVE); } - try_send(bl, false); + _try_send(bl, false); } void AsyncConnection::handle_write() diff --git a/src/msg/Event.cc b/src/msg/Event.cc index 7d8bf9c50cf5..e59c2d24d645 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -15,6 +15,11 @@ #endif #endif +#define dout_subsys ceph_subsys_ms + +#undef dout_prefix +#define dout_prefix *_dout << "Event " + int EventCenter::init(int n) { // can't init multi times @@ -55,7 +60,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt) { int r; Mutex::Locker l(lock); - if (events.size() > nevent) { + if (file_events.size() > nevent) { int new_size = nevent << 2; ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl; r = driver->resize_events(new_size); @@ -73,8 +78,8 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt) return r; if (!event) { - events[fd] = EventCenter::FileEvent(); - event = &events[fd]; + file_events[fd] = EventCenter::FileEvent(); + event = &file_events[fd]; } event->mask |= mask; @@ -98,8 +103,8 @@ void EventCenter::delete_file_event(int fd, int mask) EventCenter::FileEvent *event = _get_file_event(fd); driver->del_event(fd, event ? event->mask: EVENT_NONE, mask); if (!event) { - events[fd] = EventCenter::FileEvent(); - event = &events[fd]; + file_events[fd] = EventCenter::FileEvent(); + event = &file_events[fd]; } if (event->read_cb) @@ -109,27 +114,28 @@ void EventCenter::delete_file_event(int fd, int mask) event->mask = event->mask & (~mask); if (event->mask == EVENT_NONE) - events.erase(fd); + file_events.erase(fd); } uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt) { Mutex::Locker l(lock); - uint64_t id = eventLoop->timeEventNextId++; + uint64_t id = time_event_next_id++; EventCenter::TimeEvent event; utime_t expire; struct timeval tv; expire = ceph_clock_now(cct); expire.copy_to_timeval(&tv); - tv.tv_sec = expire.tv_sec + milliseconds / 1000; - tv.tv_usec = expire.tv_usec + milliseconds * 1000; + tv.tv_sec += milliseconds / 1000; + tv.tv_usec += milliseconds * 1000; expire.set_from_timeval(&tv); event.id = id; event.time_cb = ctxt; - time_events[expire] = event; + time_to_ids[expire] = id; + time_events[id] = event; return id; } @@ -137,10 +143,11 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ct void EventCenter::delete_time_event(uint64_t id) { Mutex::Locker l(lock); - for (map::iterator it = time_events.begin(); - it != time_events.end(); it++) { - if (it->second.id == id) { - time_events.erase(it); + for (map::iterator it = time_to_ids.begin(); + it != time_to_ids.end(); it++) { + if (it->second == id) { + time_to_ids.erase(it); + time_events.erase(id); return ; } } @@ -161,26 +168,29 @@ int EventCenter::_process_time_events() * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < last_time) { - for (map::iterator it = time_events.begin(); - it != time_events.end(); ++it) { - it->first = utime_t(); + map changed; + for (map::iterator it = time_to_ids.begin(); + it != time_to_ids.end(); ++it) { + changed[utime_t()] = it->second; } + time_to_ids.swap(changed); } last_time = now; - map::iterator prev; - for (map::iterator it = time_events.begin(); - it != time_events.end(); ) { + map::iterator prev; + for (map::iterator it = time_to_ids.begin(); + it != time_to_ids.end(); ) { prev = it; if (cur >= it->first) { - FiredEvent event; - e.time_event.id = it->second.id; - e.time_event.time_cb = it->second.time_cb; + FiredEvent e; + e.time_event.id = it->second; + e.time_event.time_cb = time_events[it->second].time_cb; e.is_file = false; event_wq.queue(e); processed++; ++it; - time_events.erase(prev); + time_to_ids.erase(prev); + time_events.erase(prev->second); } else { break; } @@ -206,27 +216,27 @@ int EventCenter::process_events(int timeout_millionseconds) Mutex::Locker l(lock); utime_t shortest = utime_t(&tv); - for (map::iterator it = time_events.begin(); - it != time_events.end(); ++it) { - if (shortes > it->first) { + for (map::iterator it = time_to_ids.begin(); + it != time_to_ids.end(); ++it) { + if (shortest > it->first) { shortest = it->first; trigger_time = true; break; } } - shortes.copy_to_timeval(&tv); + shortest.copy_to_timeval(&tv); vector fired_events; numevents = driver->event_wait(fired_events, &tv); for (int j = 0; j < numevents; j++) { FiredEvent e; e.file_event = fired_events[j]; - e.is_file = true + e.is_file = true; event_wq.queue(e); } if (trigger_time) - numevents += _process_time_events(shortest); + numevents += _process_time_events(); return numevents; } diff --git a/src/msg/Event.h b/src/msg/Event.h index 541c0dd30492..0410b0158627 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -24,6 +24,7 @@ #endif #include "include/Context.h" +#include "include/unordered_map.h" #include "common/WorkQueue.h" #define EVENT_NONE 0 @@ -86,7 +87,10 @@ class EventCenter { Mutex lock; map file_events; - map time_events; + // The second element is id + map time_to_ids; + // The first element is id + unordered_map time_events; EventDriver *driver; CephContext *cct; uint64_t nevent; diff --git a/src/msg/EventEpoll.cc b/src/msg/EventEpoll.cc index a8f18571cf12..865cf012d1b3 100644 --- a/src/msg/EventEpoll.cc +++ b/src/msg/EventEpoll.cc @@ -93,7 +93,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) int EpollDriver::resize_events(int newsize) { - state->events = realloc(events, sizeof(struct epoll_event)*newsize); + events = (struct epoll_event*)realloc(events, sizeof(struct epoll_event)*newsize); return 0; }