]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Event: time event can used as priority queue
authorHaomai Wang <haomaiwang@gmail.com>
Sun, 21 Sep 2014 07:20:12 +0000 (15:20 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:04:59 +0000 (14:04 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/Event.cc
src/msg/Event.h

index 730eb57fe911f4813ae9bd2cce5808bb0feb09d2..3fe7402c46731bf14e89e1689916a815f2098729 100644 (file)
@@ -216,6 +216,10 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     return 0;
   }
 
+  if (state == STATE_STANDBY) {
+    ldout(async_msgr->cct, 1) << __func__ << " connection is standby" << dendl;
+    return 0;
+  }
   if (state == STATE_CLOSED) {
     ldout(async_msgr->cct, 1) << __func__ << " connection is closed" << dendl;
     return -EINTR;
@@ -2012,9 +2016,7 @@ void AsyncConnection::handle_write()
         break;
       }
     }
-  } else if (state != STATE_CONNECTING &&
-             state != STATE_CLOSED &&
-             state != STATE_STANDBY) { // send_message may call this even if socket is closed
+  } else if (state != STATE_CONNECTING) {
     r = _try_send(bl);
     if (r < 0) {
       ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
index 4fa714e9ec513af0fce63566100eff47be683303..a5c6314345f64f9cbef8e1a269592e99ac4eb6da 100644 (file)
@@ -145,36 +145,24 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef
   utime_t expire;
   struct timeval tv;
 
-  expire = ceph_clock_now(cct);
-  expire.copy_to_timeval(&tv);
-  tv.tv_sec += microseconds / 1000000;
-  tv.tv_usec += microseconds % 1000000;
+  if (microseconds < 5) {
+    tv.tv_sec = 0;
+    tv.tv_usec = microseconds;
+  } else {
+    expire = ceph_clock_now(cct);
+    expire.copy_to_timeval(&tv);
+    tv.tv_sec += microseconds / 1000000;
+    tv.tv_usec += microseconds % 1000000;
+  }
   expire.set_from_timeval(&tv);
 
   event.id = id;
   event.time_cb = ctxt;
-  time_to_ids[expire] = id;
-  time_events[id] = event;
+  time_events[expire].push_back(event);
 
-  if (expire < next_wake) {
-    wakeup();
-  }
   return id;
 }
 
-void EventCenter::delete_time_event(uint64_t id)
-{
-  for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
-       it != time_to_ids.end(); it++) {
-    if (it->second == id) {
-      time_to_ids.erase(it);
-      time_events.erase(id);
-      ldout(cct, 10) << __func__ << " id=" << id << dendl;
-      return ;
-    }
-  }
-}
-
 void EventCenter::wakeup()
 {
   ldout(cct, 1) << __func__ << dendl;
@@ -202,27 +190,29 @@ int EventCenter::process_time_events()
    * processing events earlier is less dangerous than delaying them
    * indefinitely, and practice suggests it is. */
   if (now < last_time) {
-    map<utime_t, uint64_t> changed;
-    for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
-          it != time_to_ids.end(); ++it) {
-      changed[utime_t()] = it->second;
+    map<utime_t, list<TimeEvent> > changed;
+    for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+         it != time_events.end(); ++it) {
+      changed[utime_t()].swap(it->second);
     }
-    time_to_ids.swap(changed);
+    time_events.swap(changed);
   }
   last_time = now;
 
-  map<utime_t, uint64_t>::iterator prev;
-  for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
-       it != time_to_ids.end(); ) {
+  map<utime_t, list<TimeEvent> >::iterator prev;
+  for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+       it != time_events.end(); ) {
     prev = it;
     if (cur >= it->first) {
-      ldout(cct, 10) << __func__ << " queue time event: id=" << it->second << " time is "
-                     << it->first << dendl;
-      time_events[it->second].time_cb->do_request(it->second);
+      for (list<TimeEvent>::iterator j = it->second.begin();
+           j != it->second.end(); ++j) {
+        ldout(cct, 10) << __func__ << " process time event: id=" << j->id << " time is "
+                      << it->first << dendl;
+        j->time_cb->do_request(j->id);
+      }
       processed++;
       ++it;
-      time_to_ids.erase(prev);
-      time_events.erase(prev->second);
+      time_events.erase(prev);
     } else {
       break;
     }
@@ -246,8 +236,8 @@ int EventCenter::process_events(int timeout_microseconds)
   shortest.set_from_timeval(&tv);
 
   {
-    map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
-    if (it != time_to_ids.end() && shortest >= it->first) {
+    map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+    if (it != time_events.end() && shortest >= it->first) {
       ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
       shortest = it->first;
       trigger_time = true;
@@ -262,8 +252,6 @@ int EventCenter::process_events(int timeout_microseconds)
       tv.tv_sec = timeout_microseconds / 1000000;
       tv.tv_usec = timeout_microseconds % 1000000;
     }
-
-    next_wake = shortest;
   }
 
   ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
index b956298c75e84ff0e88aed003f2837b4874eec78..595613b85b6134894ff38272045bf12c0a1728db 100644 (file)
@@ -79,15 +79,11 @@ class EventCenter {
   deque<EventCallbackRef> external_events;
   unordered_map<int, FileEvent> file_events;
   EventDriver *driver;
-  // The second element is id
-  map<utime_t, uint64_t> time_to_ids;
-  // The first element is id
-  unordered_map<uint64_t, TimeEvent> time_events;
+  map<utime_t, list<TimeEvent> > time_events;
   uint64_t time_event_next_id;
   time_t last_time; // last time process time event
   int notify_receive_fd;
   int notify_send_fd;
-  utime_t next_wake;
 
   int process_time_events();
   FileEvent *_get_file_event(int fd) {
@@ -112,7 +108,6 @@ class EventCenter {
   int create_file_event(int fd, int mask, EventCallbackRef ctxt);
   uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
   void delete_file_event(int fd, int mask);
-  void delete_time_event(uint64_t id);
   int process_events(int timeout_microseconds);
   void wakeup();