<< " time_id=" << time_event_next_id << ").";
}
+static thread_local pthread_t thread_id = 0;
+
int EventCenter::init(int n)
{
// can't init multi times
free(file_events);
}
+
+void EventCenter::set_owner()
+{
+ thread_id = owner = pthread_self();
+}
+
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
{
int r = 0;
int numevents;
bool trigger_time = false;
- utime_t period, shortest, now = ceph_clock_now(cct);
- now.copy_to_timeval(&tv);
- if (timeout_microseconds > 0) {
- tv.tv_sec += timeout_microseconds / 1000000;
- tv.tv_usec += timeout_microseconds % 1000000;
- }
- shortest.set_from_timeval(&tv);
+ utime_t now = ceph_clock_now(cct);;
+ // If exists external events, don't block
+ if (external_num_events.read()) {
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ next_time = now;
+ } else {
+ utime_t period, shortest;
+ now.copy_to_timeval(&tv);
+ if (timeout_microseconds > 0) {
+ tv.tv_sec += timeout_microseconds / 1000000;
+ tv.tv_usec += timeout_microseconds % 1000000;
+ }
+ shortest.set_from_timeval(&tv);
- {
Mutex::Locker l(time_lock);
map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
if (it != time_events.end() && shortest >= it->first) {
tv.tv_sec = timeout_microseconds / 1000000;
tv.tv_usec = timeout_microseconds % 1000000;
}
+ next_time = shortest;
}
ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
vector<FiredFileEvent> fired_events;
- next_time = shortest;
numevents = driver->event_wait(fired_events, &tv);
file_lock.Lock();
for (int j = 0; j < numevents; j++) {
if (trigger_time)
numevents += process_time_events();
- external_lock.Lock();
- if (external_events.empty()) {
- external_lock.Unlock();
- } else {
- deque<EventCallbackRef> cur_process;
- cur_process.swap(external_events);
- external_lock.Unlock();
- while (!cur_process.empty()) {
- EventCallbackRef e = cur_process.front();
- if (e)
- e->do_request(0);
- cur_process.pop_front();
+ if (external_num_events.read()) {
+ external_lock.Lock();
+ if (external_events.empty()) {
+ external_lock.Unlock();
+ } else {
+ deque<EventCallbackRef> cur_process;
+ cur_process.swap(external_events);
+ external_num_events.set(0);
+ external_lock.Unlock();
+ while (!cur_process.empty()) {
+ EventCallbackRef e = cur_process.front();
+ if (e)
+ e->do_request(0);
+ cur_process.pop_front();
+ }
}
}
return numevents;
{
external_lock.Lock();
external_events.push_back(e);
+ uint64_t num = external_num_events.inc();
external_lock.Unlock();
- wakeup();
+ if (thread_id != owner)
+ wakeup();
+
+ ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl;
}
int nevent;
// Used only to external event
Mutex external_lock, file_lock, time_lock;
+ atomic_t external_num_events;
deque<EventCallbackRef> external_events;
FileEvent *file_events;
EventDriver *driver;
external_lock("AsyncMessenger::external_lock"),
file_lock("AsyncMessenger::file_lock"),
time_lock("AsyncMessenger::time_lock"),
+ external_num_events(0),
file_events(NULL),
driver(NULL), time_event_next_id(1),
notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) {
ostream& _event_prefix(std::ostream *_dout);
int init(int nevent);
- void set_owner(pthread_t p) { owner = p; }
+ void set_owner();
pthread_t get_owner() { return owner; }
// Used by internal thread