]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Fix Time Event problem
authorHaomai Wang <haomaiwang@gmail.com>
Fri, 12 Sep 2014 07:52:52 +0000 (15:52 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:03:17 +0000 (14:03 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/Event.cc
src/msg/Event.h
src/msg/EventEpoll.cc

index c3f56aa681d9a97d88f7dc32e2a45eef9b135f38..12d0c15cac2e2cc8e333b2b001e181391f7633e5 100644 (file)
@@ -1842,6 +1842,7 @@ void AsyncConnection::handle_ack(uint64_t seq)
 
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 {
+  assert(lock.is_locked());
   bufferlist bl;
 
   utime_t t = ceph_clock_now(async_msgr->cct);
@@ -1861,7 +1862,7 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
     bl.append(CEPH_MSGR_TAG_KEEPALIVE);
   }
 
-  try_send(bl, false);
+  _try_send(bl, false);
 }
 
 void AsyncConnection::handle_write()
index 7d8bf9c50cf568e13d2763a7e62e012743e2b0d1..e59c2d24d645f9677a54cdf2112d9126f3c25ef0 100644 (file)
 #endif
 #endif
 
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "Event "
+
 int EventCenter::init(int n)
 {
   // can't init multi times
@@ -55,7 +60,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
 {
   int r;
   Mutex::Locker l(lock);
-  if (events.size() > nevent) {
+  if (file_events.size() > nevent) {
     int new_size = nevent << 2;
     ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
     r = driver->resize_events(new_size);
@@ -73,8 +78,8 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
     return r;
 
   if (!event) {
-    events[fd] = EventCenter::FileEvent();
-    event = &events[fd];
+    file_events[fd] = EventCenter::FileEvent();
+    event = &file_events[fd];
   }
 
   event->mask |= mask;
@@ -98,8 +103,8 @@ void EventCenter::delete_file_event(int fd, int mask)
   EventCenter::FileEvent *event = _get_file_event(fd);
   driver->del_event(fd, event ? event->mask: EVENT_NONE, mask);
   if (!event) {
-    events[fd] = EventCenter::FileEvent();
-    event = &events[fd];
+    file_events[fd] = EventCenter::FileEvent();
+    event = &file_events[fd];
   }
 
   if (event->read_cb)
@@ -109,27 +114,28 @@ void EventCenter::delete_file_event(int fd, int mask)
 
   event->mask = event->mask & (~mask);
   if (event->mask == EVENT_NONE)
-    events.erase(fd);
+    file_events.erase(fd);
 }
 
 
 uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt)
 {
   Mutex::Locker l(lock);
-  uint64_t id = eventLoop->timeEventNextId++;
+  uint64_t id = time_event_next_id++;
   EventCenter::TimeEvent event;
   utime_t expire;
   struct timeval tv;
 
   expire = ceph_clock_now(cct);
   expire.copy_to_timeval(&tv);
-  tv.tv_sec = expire.tv_sec + milliseconds / 1000;
-  tv.tv_usec = expire.tv_usec + milliseconds * 1000;
+  tv.tv_sec += milliseconds / 1000;
+  tv.tv_usec += milliseconds * 1000;
   expire.set_from_timeval(&tv);
 
   event.id = id;
   event.time_cb = ctxt;
-  time_events[expire] = event;
+  time_to_ids[expire] = id;
+  time_events[id] = event;
 
   return id;
 }
@@ -137,10 +143,11 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ct
 void EventCenter::delete_time_event(uint64_t id)
 {
   Mutex::Locker l(lock);
-  for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
-       it != time_events.end(); it++) {
-    if (it->second.id == id) {
-      time_events.erase(it);
+  for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+       it != time_to_ids.end(); it++) {
+    if (it->second == id) {
+      time_to_ids.erase(it);
+      time_events.erase(id);
       return ;
     }
   }
@@ -161,26 +168,29 @@ int EventCenter::_process_time_events()
    * processing events earlier is less dangerous than delaying them
    * indefinitely, and practice suggests it is. */
   if (now < last_time) {
-    for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
-          it != time_events.end(); ++it) {
-      it->first = utime_t();
+    map<utime_t, uint64_t> changed;
+    for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+          it != time_to_ids.end(); ++it) {
+      changed[utime_t()] = it->second;
     }
+    time_to_ids.swap(changed);
   }
   last_time = now;
 
-  map<utime_t, TimeEvent>::iterator prev;
-  for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
-       it != time_events.end(); ) {
+  map<utime_t, uint64_t>::iterator prev;
+  for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+       it != time_to_ids.end(); ) {
     prev = it;
     if (cur >= it->first) {
-      FiredEvent event;
-      e.time_event.id = it->second.id;
-      e.time_event.time_cb = it->second.time_cb;
+      FiredEvent e;
+      e.time_event.id = it->second;
+      e.time_event.time_cb = time_events[it->second].time_cb;
       e.is_file = false;
       event_wq.queue(e);
       processed++;
       ++it;
-      time_events.erase(prev);
+      time_to_ids.erase(prev);
+      time_events.erase(prev->second);
     } else {
       break;
     }
@@ -206,27 +216,27 @@ int EventCenter::process_events(int timeout_millionseconds)
 
   Mutex::Locker l(lock);
   utime_t shortest = utime_t(&tv);
-  for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
-        it != time_events.end(); ++it) {
-    if (shortes > it->first) {
+  for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+        it != time_to_ids.end(); ++it) {
+    if (shortest > it->first) {
       shortest = it->first;
       trigger_time = true;
       break;
     }
   }
-  shortes.copy_to_timeval(&tv);
+  shortest.copy_to_timeval(&tv);
 
   vector<FiredFileEvent> fired_events;
   numevents = driver->event_wait(fired_events, &tv);
   for (int j = 0; j < numevents; j++) {
     FiredEvent e;
     e.file_event = fired_events[j];
-    e.is_file = true
+    e.is_file = true;
     event_wq.queue(e);
   }
 
   if (trigger_time)
-    numevents += _process_time_events(shortest);
+    numevents += _process_time_events();
 
   return numevents;
 }
index 541c0dd304922d8b80371d47f2776cccc384fa2d..0410b01586272e89c95e2759d243fb6489ade115 100644 (file)
@@ -24,6 +24,7 @@
 #endif
 
 #include "include/Context.h"
+#include "include/unordered_map.h"
 #include "common/WorkQueue.h"
 
 #define EVENT_NONE 0
@@ -86,7 +87,10 @@ class EventCenter {
 
   Mutex lock;
   map<int, FileEvent> file_events;
-  map<utime_t, TimeEvent> time_events;
+  // The second element is id
+  map<utime_t, uint64_t> time_to_ids;
+  // The first element is id
+  unordered_map<uint64_t, TimeEvent> time_events;
   EventDriver *driver;
   CephContext *cct;
   uint64_t nevent;
index a8f18571cf12168c0a2dc4e131324ca24a63f84b..865cf012d1b3cd5889001e4a27a631e454bcbb2a 100644 (file)
@@ -93,7 +93,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask)
 
 int EpollDriver::resize_events(int newsize)
 {
-  state->events = realloc(events, sizeof(struct epoll_event)*newsize);
+  events = (struct epoll_event*)realloc(events, sizeof(struct epoll_event)*newsize);
   return 0;
 }