From: Haomai Wang Date: Sun, 21 Sep 2014 07:20:12 +0000 (+0800) Subject: Event: time event can used as priority queue X-Git-Tag: v0.88~37^2~4^2~16 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=da83dac239bd0a81a2b9788a552000f7481ca9bb;p=ceph.git Event: time event can used as priority queue Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index 730eb57fe911..3fe7402c4673 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -216,6 +216,10 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) return 0; } + if (state == STATE_STANDBY) { + ldout(async_msgr->cct, 1) << __func__ << " connection is standby" << dendl; + return 0; + } if (state == STATE_CLOSED) { ldout(async_msgr->cct, 1) << __func__ << " connection is closed" << dendl; return -EINTR; @@ -2012,9 +2016,7 @@ void AsyncConnection::handle_write() break; } } - } else if (state != STATE_CONNECTING && - state != STATE_CLOSED && - state != STATE_STANDBY) { // send_message may call this even if socket is closed + } else if (state != STATE_CONNECTING) { r = _try_send(bl); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; diff --git a/src/msg/Event.cc b/src/msg/Event.cc index 4fa714e9ec51..a5c6314345f6 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -145,36 +145,24 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef utime_t expire; struct timeval tv; - expire = ceph_clock_now(cct); - expire.copy_to_timeval(&tv); - tv.tv_sec += microseconds / 1000000; - tv.tv_usec += microseconds % 1000000; + if (microseconds < 5) { + tv.tv_sec = 0; + tv.tv_usec = microseconds; + } else { + expire = ceph_clock_now(cct); + expire.copy_to_timeval(&tv); + tv.tv_sec += microseconds / 1000000; + tv.tv_usec += microseconds % 1000000; + } expire.set_from_timeval(&tv); event.id = id; event.time_cb = ctxt; - time_to_ids[expire] = id; - time_events[id] = event; + time_events[expire].push_back(event); - if (expire < next_wake) { - wakeup(); - } return id; } -void EventCenter::delete_time_event(uint64_t id) -{ - for (map::iterator it = time_to_ids.begin(); - it != time_to_ids.end(); it++) { - if (it->second == id) { - time_to_ids.erase(it); - time_events.erase(id); - ldout(cct, 10) << __func__ << " id=" << id << dendl; - return ; - } - } -} - void EventCenter::wakeup() { ldout(cct, 1) << __func__ << dendl; @@ -202,27 +190,29 @@ int EventCenter::process_time_events() * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < last_time) { - map changed; - for (map::iterator it = time_to_ids.begin(); - it != time_to_ids.end(); ++it) { - changed[utime_t()] = it->second; + map > changed; + for (map >::iterator it = time_events.begin(); + it != time_events.end(); ++it) { + changed[utime_t()].swap(it->second); } - time_to_ids.swap(changed); + time_events.swap(changed); } last_time = now; - map::iterator prev; - for (map::iterator it = time_to_ids.begin(); - it != time_to_ids.end(); ) { + map >::iterator prev; + for (map >::iterator it = time_events.begin(); + it != time_events.end(); ) { prev = it; if (cur >= it->first) { - ldout(cct, 10) << __func__ << " queue time event: id=" << it->second << " time is " - << it->first << dendl; - time_events[it->second].time_cb->do_request(it->second); + for (list::iterator j = it->second.begin(); + j != it->second.end(); ++j) { + ldout(cct, 10) << __func__ << " process time event: id=" << j->id << " time is " + << it->first << dendl; + j->time_cb->do_request(j->id); + } processed++; ++it; - time_to_ids.erase(prev); - time_events.erase(prev->second); + time_events.erase(prev); } else { break; } @@ -246,8 +236,8 @@ int EventCenter::process_events(int timeout_microseconds) shortest.set_from_timeval(&tv); { - map::iterator it = time_to_ids.begin(); - if (it != time_to_ids.end() && shortest >= it->first) { + map >::iterator it = time_events.begin(); + if (it != time_events.end() && shortest >= it->first) { ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl; shortest = it->first; trigger_time = true; @@ -262,8 +252,6 @@ int EventCenter::process_events(int timeout_microseconds) tv.tv_sec = timeout_microseconds / 1000000; tv.tv_usec = timeout_microseconds % 1000000; } - - next_wake = shortest; } ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl; diff --git a/src/msg/Event.h b/src/msg/Event.h index b956298c75e8..595613b85b61 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -79,15 +79,11 @@ class EventCenter { deque external_events; unordered_map file_events; EventDriver *driver; - // The second element is id - map time_to_ids; - // The first element is id - unordered_map time_events; + map > time_events; uint64_t time_event_next_id; time_t last_time; // last time process time event int notify_receive_fd; int notify_send_fd; - utime_t next_wake; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -112,7 +108,6 @@ class EventCenter { int create_file_event(int fd, int mask, EventCallbackRef ctxt); uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt); void delete_file_event(int fd, int mask); - void delete_time_event(uint64_t id); int process_events(int timeout_microseconds); void wakeup();