utime_t expire;
struct timeval tv;
- expire = ceph_clock_now(cct);
- expire.copy_to_timeval(&tv);
- tv.tv_sec += microseconds / 1000000;
- tv.tv_usec += microseconds % 1000000;
+ if (microseconds < 5) {
+ tv.tv_sec = 0;
+ tv.tv_usec = microseconds;
+ } else {
+ expire = ceph_clock_now(cct);
+ expire.copy_to_timeval(&tv);
+ tv.tv_sec += microseconds / 1000000;
+ tv.tv_usec += microseconds % 1000000;
+ }
expire.set_from_timeval(&tv);
event.id = id;
event.time_cb = ctxt;
- time_to_ids[expire] = id;
- time_events[id] = event;
+ time_events[expire].push_back(event);
- if (expire < next_wake) {
- wakeup();
- }
return id;
}
-void EventCenter::delete_time_event(uint64_t id)
-{
- for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
- it != time_to_ids.end(); it++) {
- if (it->second == id) {
- time_to_ids.erase(it);
- time_events.erase(id);
- ldout(cct, 10) << __func__ << " id=" << id << dendl;
- return ;
- }
- }
-}
-
void EventCenter::wakeup()
{
ldout(cct, 1) << __func__ << dendl;
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < last_time) {
- map<utime_t, uint64_t> changed;
- for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
- it != time_to_ids.end(); ++it) {
- changed[utime_t()] = it->second;
+ map<utime_t, list<TimeEvent> > changed;
+ for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+ it != time_events.end(); ++it) {
+ changed[utime_t()].swap(it->second);
}
- time_to_ids.swap(changed);
+ time_events.swap(changed);
}
last_time = now;
- map<utime_t, uint64_t>::iterator prev;
- for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
- it != time_to_ids.end(); ) {
+ map<utime_t, list<TimeEvent> >::iterator prev;
+ for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+ it != time_events.end(); ) {
prev = it;
if (cur >= it->first) {
- ldout(cct, 10) << __func__ << " queue time event: id=" << it->second << " time is "
- << it->first << dendl;
- time_events[it->second].time_cb->do_request(it->second);
+ for (list<TimeEvent>::iterator j = it->second.begin();
+ j != it->second.end(); ++j) {
+ ldout(cct, 10) << __func__ << " process time event: id=" << j->id << " time is "
+ << it->first << dendl;
+ j->time_cb->do_request(j->id);
+ }
processed++;
++it;
- time_to_ids.erase(prev);
- time_events.erase(prev->second);
+ time_events.erase(prev);
} else {
break;
}
shortest.set_from_timeval(&tv);
{
- map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
- if (it != time_to_ids.end() && shortest >= it->first) {
+ map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+ if (it != time_events.end() && shortest >= it->first) {
ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
shortest = it->first;
trigger_time = true;
tv.tv_sec = timeout_microseconds / 1000000;
tv.tv_usec = timeout_microseconds % 1000000;
}
-
- next_wake = shortest;
}
ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
deque<EventCallbackRef> external_events;
unordered_map<int, FileEvent> file_events;
EventDriver *driver;
- // The second element is id
- map<utime_t, uint64_t> time_to_ids;
- // The first element is id
- unordered_map<uint64_t, TimeEvent> time_events;
+ map<utime_t, list<TimeEvent> > time_events;
uint64_t time_event_next_id;
time_t last_time; // last time process time event
int notify_receive_fd;
int notify_send_fd;
- utime_t next_wake;
int process_time_events();
FileEvent *_get_file_event(int fd) {
int create_file_event(int fd, int mask, EventCallbackRef ctxt);
uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
void delete_file_event(int fd, int mask);
- void delete_time_event(uint64_t id);
int process_events(int timeout_microseconds);
void wakeup();