C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
void do_request(int fd_or_id) {
char c[256];
+ int r = 0;
do {
- center->already_wakeup.set(0);
- int r = read(fd_or_id, c, sizeof(c));
+ r = read(fd_or_id, c, sizeof(c));
if (r < 0) {
- ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
- break;
+ if (errno != EAGAIN)
+ ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
}
- } while (center->already_wakeup.read());
+ } while (r > 0);
}
};
void EventCenter::wakeup()
{
- ldout(cct, 1) << __func__ << dendl;
- already_wakeup.compare_and_swap(0, 1);
-
- char buf[1];
- buf[0] = 'c';
- // wake up "event_wait"
- int n = write(notify_send_fd, buf, 1);
- // FIXME ?
- assert(n == 1);
+ ldout(cct, 1) << __func__ << dendl;
+
+ char buf = 'c';
+ // wake up "event_wait"
+ int n = write(notify_send_fd, &buf, sizeof(buf));
+ if (n < 0) {
+ ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
+ assert(0);
+ }
}
int EventCenter::process_time_events()
if (external_num_events.load()) {
external_lock.lock();
- if (external_events.empty()) {
- external_lock.unlock();
- } else {
- deque<EventCallbackRef> cur_process;
- cur_process.swap(external_events);
- external_num_events.store(0);
- external_lock.unlock();
- while (!cur_process.empty()) {
- EventCallbackRef e = cur_process.front();
- ldout(cct, 20) << __func__ << " do " << e << dendl;
- if (e)
- e->do_request(0);
- cur_process.pop_front();
- numevents++;
- }
+ deque<EventCallbackRef> cur_process;
+ cur_process.swap(external_events);
+ external_num_events.store(0);
+ external_lock.unlock();
+ while (!cur_process.empty()) {
+ EventCallbackRef e = cur_process.front();
+ ldout(cct, 20) << __func__ << " do " << e << dendl;
+ e->do_request(0);
+ cur_process.pop_front();
+ numevents++;
}
}
return numevents;
{
external_lock.lock();
external_events.push_back(e);
+ bool wake = !external_num_events.load();
uint64_t num = ++external_num_events;
external_lock.unlock();
- if (!in_thread())
+ if (!in_thread() && wake)
wakeup();
ldout(cct, 20) << __func__ << " " << e << " pending " << num << dendl;