list<Context*> pending;
- if (event && now > next) {
+ if (event && now >= next) {
// move to pending list
map< utime_t, set<Context*> >::iterator it = scheduled.begin();
while (it != scheduled.end()) {
if (messenger) {
for (set<Context*>::iterator cit = it->second.begin();
cit != it->second.end();
- cit++)
+ cit++) {
pending.push_back(*cit);
+ event_times.erase(*cit);
+ }
}
//pending[t] = it->second;
dout(DBL) << "kicked or timed out at " << now << endl;
} else {
dout(DBL) << "sleeping" << endl;
-
- //wtf this isn't waking up!
timed_sleep = false;
sleep_cond.Wait(lock); // wait for waker
- // setting a 1s limit works tho
- //utime_t next = g_clock.now();
- //next.sec_ref() += 10;
- //cond.Wait(lock, next); // wait for waker
-
utime_t now = g_clock.now();
dout(DBL) << "kicked at " << now << endl;
}
* Timer bits
*/
-/*
-void Timer::set_messenger_kicker(Context *c)
-{
- dout(10) << "messenger kicker is " << c << endl;
- messenger_kicker = c;
-}
-
-void Timer::unset_messenger_kicker()
-{
- dout(10) << "unset messenger" << endl;
- if (messenger_kicker) {
- delete messenger_kicker;
- messenger_kicker = 0;
- }
- cancel_timer();
-}
-*/
-
void Timer::set_messenger(Messenger *m)
{
dout(10) << "set messenger " << m << endl;
}
utime_t tp = event_times[callback];
+ assert(scheduled.count(tp));
+ scheduled[tp].erase(callback);
event_times.erase(callback);
- scheduled.erase(tp);
- pending.erase(tp);
lock.Unlock();
return true;
}
-
-/***
- * do user callbacks
- *
- * this should be called by the Messenger in the proper thread (usually same as incoming messages)
- */
-
-/*
-void Timer::execute_pending()
-{
- lock.Lock();
-
- while (pending.size()) {
- utime_t when;
- Context *event = take_next_pending(when);
-
- lock.Unlock();
-
- dout(DBL) << "executing event " << event << " scheduled for " << when << endl;
- event->finish(0);
- delete event;
-
- lock.Lock();
- }
-
- dout(DBL) << "no more events for now" << endl;
-
- lock.Unlock();
-}
-
-*/
class Timer {
private:
map< utime_t, set<Context*> > scheduled; // time -> (context ...)
- map< utime_t, set<Context*> > pending; // time -> (context ...)
- map< Context*, utime_t > event_times; // event -> time
+ hash_map< Context*, utime_t > event_times; // event -> time
// get time of the next event
Context* get_next_scheduled(utime_t& when) {
return *sit;
}
- // get next pending event
- Context* take_next_pending(utime_t& when) {
- if (pending.empty()) return 0;
-
- map< utime_t, set<Context*> >::iterator it = pending.begin();
- when = it->first;
-
- // take and remove
- set<Context*>::iterator sit = it->second.begin();
- Context *event = *sit;
- it->second.erase(sit);
- if (it->second.empty()) pending.erase(it);
-
- return event;
- }
-
void register_timer(); // make sure i get a callback
void cancel_timer(); // make sure i get a callback
delete *sit;
}
scheduled.clear();
-
- // pending
- for (map< utime_t, set<Context*> >::iterator it = pending.begin();
- it != pending.end();
- it++) {
- for (set<Context*>::iterator sit = it->second.begin();
- sit != it->second.end();
- sit++)
- delete *sit;
- }
- pending.clear();
}
void init() {