public:
C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {}
void do_request(int id) {
- msgr->ms_deliver_dispatch(m);
+ msgr->ms_fast_preprocess(m);
+ if (msgr->ms_can_fast_dispatch(m)) {
+ msgr->ms_fast_dispatch(m);
+ } else {
+ msgr->ms_deliver_dispatch(m);
+ }
}
};
in_seq = message->get_seq();
ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
<< " " << message << " " << *message << dendl;
- async_msgr->ms_fast_preprocess(message);
- if (async_msgr->ms_can_fast_dispatch(message)) {
- async_msgr->ms_fast_dispatch(message);
- } else {
- center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
- }
+ center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
state = STATE_OPEN;
break;
* @{
*/
virtual int send_message(Message *m, const entity_inst_t& dest) {
+ Mutex::Locker l(lock);
+
return _send_message(m, dest);
}
#undef dout_prefix
#define dout_prefix *_dout << "Event "
+class C_handle_notify : public EventCallback {
+ public:
+ C_handle_notify() {}
+ void do_request(int fd_or_id) {
+ }
+};
+
int EventCenter::init(int n)
{
// can't init multi times
return r;
}
+ int fds[2];
+ if (pipe(fds) < 0) {
+ lderr(cct) << __func__ << " can't create notify pipe" << dendl;
+ return -1;
+ }
+
+ notify_receive_fd = fds[0];
+ notify_send_fd = fds[1];
+
nevent = n;
+ create_file_event(notify_receive_fd, EVENT_READABLE, new C_handle_notify());
return 0;
}
{
if (driver)
delete driver;
+
+ if (notify_receive_fd > 0)
+ ::close(notify_receive_fd);
+ if (notify_send_fd > 0)
+ ::close(notify_send_fd);
}
int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
time_to_ids[expire] = id;
time_events[id] = event;
+ if (expire < next_wake) {
+ char buf[1];
+ buf[0] = 'c';
+ // wake up "event_wait"
+ int n = write(notify_send_fd, buf, 1);
+ // FIXME ?
+ assert(n == 1);
+ }
return id;
}
ldout(cct, 1) << __func__ << dendl;
Mutex::Locker l(lock);
event_tp.stop();
+ char buf[1];
+ buf[0] = 'c';
+ // wake up "event_wait"
+ int n = write(notify_send_fd, buf, 1);
+ // FIXME ?
+ assert(n == 1);
}
int EventCenter::process_time_events()
tv.tv_sec = timeout_millionseconds / 1000;
tv.tv_usec = (timeout_millionseconds % 1000) * 1000;
}
+
+ next_wake = shortest;
}
ldout(cct, 10) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
uint64_t time_event_next_id;
ThreadPool event_tp;
time_t last_time; // last time process time event
+ int notify_receive_fd;
+ int notify_send_fd;
+ utime_t next_wake;
int process_time_events();
FileEvent *_get_file_event(int fd) {
EventCenter(CephContext *c):
lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
+ notify_receive_fd(-1), notify_send_fd(-1),
event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
last_time = time(NULL);
}
int mask = cur_mask & (~delmask);
- ee.events = 0;
+ ee.events = EPOLLET;
if (mask & EVENT_READABLE) ee.events |= EPOLLIN;
if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */