uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
{
- Mutex::Locker l(time_lock);
+ assert(in_thread());
uint64_t id = time_event_next_id++;
ldout(cct, 10) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
// TODO: Ineffective implementation now!
void EventCenter::delete_time_event(uint64_t id)
{
- Mutex::Locker l(time_lock);
+ assert(in_thread());
ldout(cct, 10) << __func__ << " id=" << id << dendl;
if (id >= time_event_next_id)
return ;
clock_type::time_point now = clock_type::now();
ldout(cct, 10) << __func__ << " cur time is " << now << dendl;
- Mutex::Locker l(time_lock);
while (!time_events.empty()) {
auto it = time_events.begin();
if (now >= it->first) {
time_events.erase(it);
ldout(cct, 10) << __func__ << " process time event: id=" << id << dendl;
processed++;
- time_lock.Unlock();
cb->do_request(id);
- time_lock.Lock();
} else {
break;
}
clock_type::time_point shortest;
shortest = now + std::chrono::microseconds(timeout_microseconds);
- Mutex::Locker l(time_lock);
auto it = time_events.begin();
- if (it != time_events.end() && shortest > it->first) {
+ if (it != time_events.end() && shortest >= it->first) {
ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
shortest = it->first;
trigger_time = true;
CephContext *cct;
int nevent;
// Used only to external event
- Mutex external_lock, file_lock, time_lock;
+ Mutex external_lock, file_lock;
atomic_t external_num_events;
deque<EventCallbackRef> external_events;
vector<FileEvent> file_events;
cct(c), nevent(0),
external_lock("AsyncMessenger::external_lock"),
file_lock("AsyncMessenger::file_lock"),
- time_lock("AsyncMessenger::time_lock"),
external_num_events(0),
driver(NULL), time_event_next_id(1),
notify_receive_fd(-1), notify_send_fd(-1), net(c),