public:
C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {}
void do_request(int id) {
- msgr->ms_fast_preprocess(m);
- if (msgr->ms_can_fast_dispatch(m)) {
- msgr->ms_fast_dispatch(m);
- } else {
+ //msgr->ms_fast_preprocess(m);
+ //if (msgr->ms_can_fast_dispatch(m)) {
+ // msgr->ms_fast_dispatch(m);
+ //} else {
msgr->ms_deliver_dispatch(m);
- }
+ //}
}
};
state(STATE_NONE), state_after_send(0), sd(-1),
lock("AsyncConnection::lock"), open_write(false),
got_bad_auth(false), authorizer(NULL),
- state_buffer(4096), state_offset(0), net(cct), center(&m->center) { }
+ state_buffer(4096), state_offset(0), net(cct), center(&m->center)
+{
+ read_handler.reset(new C_handle_read(this));
+ write_handler.reset(new C_handle_write(this));
+ reset_handler.reset(new C_handle_reset(async_msgr, this));
+}
AsyncConnection::~AsyncConnection()
{
assert(!outcoming_bl.length());
connect_seq++;
state = STATE_CONNECTING;
- center->create_time_event(0, new C_handle_read(this));
+ center->create_time_event(0, read_handler);
return 0;
}
}
if (!open_write && is_queued()) {
- center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+ center->create_file_event(sd, EVENT_WRITABLE, write_handler);
open_write = true;
}
int prev_state = state;
Mutex::Locker l(lock);
do {
- ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
+ ldout(async_msgr->cct, 20) << __func__ << " state is " << get_state_name(state)
<< ", prev state is " << get_state_name(prev_state) << dendl;
prev_state = state;
switch (state) {
in_seq = message->get_seq();
ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
<< " " << message << " " << *message << dendl;
- center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
-
state = STATE_OPEN;
+
+ async_msgr->ms_fast_preprocess(message);
+ if (async_msgr->ms_can_fast_dispatch(message)) {
+ lock.Unlock();
+ async_msgr->ms_fast_dispatch(message);
+ lock.Lock();
+ } else {
+ center->create_time_event(0, EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
+ }
+
break;
}
}
net.set_socket_options(sd);
- center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
+ center->create_file_event(sd, EVENT_READABLE, read_handler);
state = STATE_CONNECTING_WAIT_BANNER;
break;
}
state = STATE_CONNECTING;
// rescheduler connection in order to avoid lock dep
- center->create_time_event(0, new C_handle_read(this));
+ center->create_time_event(0, read_handler);
}
void AsyncConnection::accept(int incoming)
sd = incoming;
state = STATE_ACCEPTING;
- center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this));
+ center->create_file_event(sd, EVENT_READABLE, read_handler);
process();
}
Mutex::Locker l(lock);
out_q[m->get_priority()].push_back(m);
if (sd > 0 && !open_write) {
- center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+ center->create_file_event(sd, EVENT_WRITABLE, write_handler);
open_write = true;
}
return 0;
uint64_t milliseconds = double(backoff) * 1000;
// woke up again;
- center->create_time_event(milliseconds, new C_handle_read(this));
+ center->create_time_event(milliseconds, read_handler);
}
void AsyncConnection::was_session_reset()
void AsyncConnection::_stop()
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
- center->create_time_event(0, new C_handle_reset(async_msgr, this));
+ center->create_time_event(0, reset_handler);
shutdown_socket();
discard_out_queue();
outcoming_bl.clear();
Mutex lock;
utime_t backoff; // backoff time
bool open_write;
+ EventCallbackRef read_handler;
+ EventCallbackRef write_handler;
+ EventCallbackRef reset_handler;
// Tis section are temp variables used by state transition
// start thread
create();
if (listen_sd >= 0)
- center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
+ center->create_file_event(listen_sd, EVENT_READABLE, EventCallbackRef(new C_handle_accept(this)));
return 0;
}
while (!done) {
ldout(msgr->cct,20) << __func__ << " calling poll" << dendl;
- r = center->process_events(1000);
+ r = center->process_events(30000);
if (r < 0) {
ldout(msgr->cct,20) << __func__ << " process events failed: "
<< cpp_strerror(errno) << dendl;
if (listen_sd >= 0) {
::shutdown(listen_sd, SHUT_RDWR);
}
+ center->stop();
// wait for thread to stop before closing the socket, to avoid
// racing against fd re-use.
notify_send_fd = fds[1];
nevent = n;
- create_file_event(notify_receive_fd, EVENT_READABLE, new C_handle_notify());
+ create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify()));
return 0;
}
::close(notify_send_fd);
}
-int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt)
+int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
{
int r;
Mutex::Locker l(lock);
event->mask |= mask;
if (mask & EVENT_READABLE) {
- if (event->read_cb)
- delete event->read_cb;
event->read_cb = ctxt;
}
if (mask & EVENT_WRITABLE) {
- if (event->write_cb)
- delete event->write_cb;
event->write_cb = ctxt;
}
ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask
driver->del_event(fd, event->mask, mask);
if (mask & EVENT_READABLE && event->read_cb) {
- delete event->read_cb;
- event->read_cb = NULL;
+ event->read_cb.reset();
}
if (mask & EVENT_WRITABLE && event->write_cb) {
- delete event->write_cb;
- event->write_cb = NULL;
+ event->write_cb.reset();
}
event->mask = event->mask & (~mask);
<< " now mask is " << event->mask << dendl;
}
-uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt)
+uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef ctxt)
{
Mutex::Locker l(lock);
uint64_t id = time_event_next_id++;
ldout(cct, 1) << __func__ << dendl;
Mutex::Locker l(lock);
event_tp.start();
+ tp_stop = false;
}
void EventCenter::stop()
{
ldout(cct, 1) << __func__ << dendl;
Mutex::Locker l(lock);
- event_tp.stop();
+ if (!tp_stop) {
+ event_tp.stop();
+ tp_stop = true;
+ }
char buf[1];
buf[0] = 'c';
// wake up "event_wait"
virtual ~EventCallback() {} // we want a virtual destructor!!!
};
+typedef ceph::shared_ptr<EventCallback> EventCallbackRef;
+
struct FiredFileEvent {
int fd;
int mask;
struct FiredTimeEvent {
uint64_t id;
- EventCallback *time_cb;
+ EventCallbackRef time_cb;
};
struct FiredEvent {
- union {
- FiredFileEvent file_event;
- FiredTimeEvent time_event;
- };
+ FiredFileEvent file_event;
+ FiredTimeEvent time_event;
bool is_file;
FiredEvent(): is_file(true) {}
class EventCenter {
struct FileEvent {
int mask;
- EventCallback *read_cb;
- EventCallback *write_cb;
- FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
+ EventCallbackRef read_cb;
+ EventCallbackRef write_cb;
+ FileEvent(): mask(0) {}
};
struct TimeEvent {
uint64_t id;
- EventCallback *time_cb;
+ EventCallbackRef time_cb;
- TimeEvent(): id(0), time_cb(NULL) {}
+ TimeEvent(): id(0) {}
};
Mutex lock;
int notify_receive_fd;
int notify_send_fd;
utime_t next_wake;
+ bool tp_stop;
int process_time_events();
FileEvent *_get_file_event(int fd) {
}
} else {
e.time_event.time_cb->do_request(e.time_event.id);
- delete e.time_event.time_cb;
}
}
void _clear() {
EventCenter(CephContext *c):
lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
- notify_receive_fd(-1), notify_send_fd(-1),
+ notify_receive_fd(-1), notify_send_fd(-1),tp_stop(true),
event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
last_time = time(NULL);
}
~EventCenter();
int init(int nevent);
- int create_file_event(int fd, int mask, EventCallback *ctxt);
- uint64_t create_time_event(uint64_t milliseconds, EventCallback *ctxt);
+ int create_file_event(int fd, int mask, EventCallbackRef ctxt);
+ uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
void delete_file_event(int fd, int mask);
void delete_time_event(uint64_t id);
int process_events(int timeout_milliseconds);
op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD;
}
- ee.events = 0;
+ ee.events = EPOLLET;
add_mask |= cur_mask; /* Merge old events */
if (add_mask & EVENT_READABLE)
ee.events |= EPOLLIN;
int mask = cur_mask & (~delmask);
- ee.events = EPOLLET;
+ ee.events = 0;
if (mask & EVENT_READABLE) ee.events |= EPOLLIN;
if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */