#endif
#endif
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "Event "
+
int EventCenter::init(int n)
{
// can't init multi times
{
int r;
Mutex::Locker l(lock);
- if (events.size() > nevent) {
+ if (file_events.size() > nevent) {
int new_size = nevent << 2;
ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
r = driver->resize_events(new_size);
return r;
if (!event) {
- events[fd] = EventCenter::FileEvent();
- event = &events[fd];
+ file_events[fd] = EventCenter::FileEvent();
+ event = &file_events[fd];
}
event->mask |= mask;
EventCenter::FileEvent *event = _get_file_event(fd);
driver->del_event(fd, event ? event->mask: EVENT_NONE, mask);
if (!event) {
- events[fd] = EventCenter::FileEvent();
- event = &events[fd];
+ file_events[fd] = EventCenter::FileEvent();
+ event = &file_events[fd];
}
if (event->read_cb)
event->mask = event->mask & (~mask);
if (event->mask == EVENT_NONE)
- events.erase(fd);
+ file_events.erase(fd);
}
uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt)
{
Mutex::Locker l(lock);
- uint64_t id = eventLoop->timeEventNextId++;
+ uint64_t id = time_event_next_id++;
EventCenter::TimeEvent event;
utime_t expire;
struct timeval tv;
expire = ceph_clock_now(cct);
expire.copy_to_timeval(&tv);
- tv.tv_sec = expire.tv_sec + milliseconds / 1000;
- tv.tv_usec = expire.tv_usec + milliseconds * 1000;
+ tv.tv_sec += milliseconds / 1000;
+ tv.tv_usec += milliseconds * 1000;
expire.set_from_timeval(&tv);
event.id = id;
event.time_cb = ctxt;
- time_events[expire] = event;
+ time_to_ids[expire] = id;
+ time_events[id] = event;
return id;
}
void EventCenter::delete_time_event(uint64_t id)
{
Mutex::Locker l(lock);
- for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
- it != time_events.end(); it++) {
- if (it->second.id == id) {
- time_events.erase(it);
+ for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+ it != time_to_ids.end(); it++) {
+ if (it->second == id) {
+ time_to_ids.erase(it);
+ time_events.erase(id);
return ;
}
}
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < last_time) {
- for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
- it != time_events.end(); ++it) {
- it->first = utime_t();
+ map<utime_t, uint64_t> changed;
+ for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+ it != time_to_ids.end(); ++it) {
+ changed[utime_t()] = it->second;
}
+ time_to_ids.swap(changed);
}
last_time = now;
- map<utime_t, TimeEvent>::iterator prev;
- for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
- it != time_events.end(); ) {
+ map<utime_t, uint64_t>::iterator prev;
+ for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+ it != time_to_ids.end(); ) {
prev = it;
if (cur >= it->first) {
- FiredEvent event;
- e.time_event.id = it->second.id;
- e.time_event.time_cb = it->second.time_cb;
+ FiredEvent e;
+ e.time_event.id = it->second;
+ e.time_event.time_cb = time_events[it->second].time_cb;
e.is_file = false;
event_wq.queue(e);
processed++;
++it;
- time_events.erase(prev);
+ time_to_ids.erase(prev);
+ time_events.erase(prev->second);
} else {
break;
}
Mutex::Locker l(lock);
utime_t shortest = utime_t(&tv);
- for (map<utime_t, TimeEvent>::iterator it = time_events.begin();
- it != time_events.end(); ++it) {
- if (shortes > it->first) {
+ for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
+ it != time_to_ids.end(); ++it) {
+ if (shortest > it->first) {
shortest = it->first;
trigger_time = true;
break;
}
}
- shortes.copy_to_timeval(&tv);
+ shortest.copy_to_timeval(&tv);
vector<FiredFileEvent> fired_events;
numevents = driver->event_wait(fired_events, &tv);
for (int j = 0; j < numevents; j++) {
FiredEvent e;
e.file_event = fired_events[j];
- e.is_file = true
+ e.is_file = true;
event_wq.queue(e);
}
if (trigger_time)
- numevents += _process_time_events(shortest);
+ numevents += _process_time_events();
return numevents;
}