From: Haomai Wang Date: Wed, 10 Sep 2014 02:50:45 +0000 (+0800) Subject: Use shared_ptr for EventCallback X-Git-Tag: v0.88~37^2~4^2~29 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e79f809395ad2ff5ca74a28563f3819b079049cc;p=ceph.git Use shared_ptr for EventCallback Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index cf8c226ef006..19b9ed891de6 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -63,12 +63,12 @@ class C_handle_dispatch : public EventCallback { public: C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {} void do_request(int id) { - msgr->ms_fast_preprocess(m); - if (msgr->ms_can_fast_dispatch(m)) { - msgr->ms_fast_dispatch(m); - } else { + //msgr->ms_fast_preprocess(m); + //if (msgr->ms_can_fast_dispatch(m)) { + // msgr->ms_fast_dispatch(m); + //} else { msgr->ms_deliver_dispatch(m); - } + //} } }; @@ -101,7 +101,12 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m) state(STATE_NONE), state_after_send(0), sd(-1), lock("AsyncConnection::lock"), open_write(false), got_bad_auth(false), authorizer(NULL), - state_buffer(4096), state_offset(0), net(cct), center(&m->center) { } + state_buffer(4096), state_offset(0), net(cct), center(&m->center) +{ + read_handler.reset(new C_handle_read(this)); + write_handler.reset(new C_handle_write(this)); + reset_handler.reset(new C_handle_reset(async_msgr, this)); +} AsyncConnection::~AsyncConnection() { @@ -184,7 +189,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) assert(!outcoming_bl.length()); connect_seq++; state = STATE_CONNECTING; - center->create_time_event(0, new C_handle_read(this)); + center->create_time_event(0, read_handler); return 0; } @@ -237,7 +242,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) } if (!open_write && is_queued()) { - center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this)); + center->create_file_event(sd, EVENT_WRITABLE, write_handler); open_write = true; } @@ -286,7 +291,7 @@ void AsyncConnection::process() int prev_state = state; Mutex::Locker l(lock); do { - ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) + ldout(async_msgr->cct, 20) << __func__ << " state is " << get_state_name(state) << ", prev state is " << get_state_name(prev_state) << dendl; prev_state = state; switch (state) { @@ -656,9 +661,17 @@ void AsyncConnection::process() in_seq = message->get_seq(); ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq() << " " << message << " " << *message << dendl; - center->create_time_event(0, new C_handle_dispatch(async_msgr, message)); - state = STATE_OPEN; + + async_msgr->ms_fast_preprocess(message); + if (async_msgr->ms_can_fast_dispatch(message)) { + lock.Unlock(); + async_msgr->ms_fast_dispatch(message); + lock.Lock(); + } else { + center->create_time_event(0, EventCallbackRef(new C_handle_dispatch(async_msgr, message))); + } + break; } @@ -757,7 +770,7 @@ int AsyncConnection::_process_connection() } net.set_socket_options(sd); - center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this)); + center->create_file_event(sd, EVENT_READABLE, read_handler); state = STATE_CONNECTING_WAIT_BANNER; break; } @@ -1561,7 +1574,7 @@ void AsyncConnection::_connect() state = STATE_CONNECTING; // rescheduler connection in order to avoid lock dep - center->create_time_event(0, new C_handle_read(this)); + center->create_time_event(0, read_handler); } void AsyncConnection::accept(int incoming) @@ -1571,7 +1584,7 @@ void AsyncConnection::accept(int incoming) sd = incoming; state = STATE_ACCEPTING; - center->create_file_event(sd, EVENT_READABLE, new C_handle_read(this)); + center->create_file_event(sd, EVENT_READABLE, read_handler); process(); } @@ -1584,7 +1597,7 @@ int AsyncConnection::send_message(Message *m) Mutex::Locker l(lock); out_q[m->get_priority()].push_back(m); if (sd > 0 && !open_write) { - center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this)); + center->create_file_event(sd, EVENT_WRITABLE, write_handler); open_write = true; } return 0; @@ -1713,7 +1726,7 @@ void AsyncConnection::fault() uint64_t milliseconds = double(backoff) * 1000; // woke up again; - center->create_time_event(milliseconds, new C_handle_read(this)); + center->create_time_event(milliseconds, read_handler); } void AsyncConnection::was_session_reset() @@ -1740,7 +1753,7 @@ void AsyncConnection::was_session_reset() void AsyncConnection::_stop() { ldout(async_msgr->cct, 10) << __func__ << dendl; - center->create_time_event(0, new C_handle_reset(async_msgr, this)); + center->create_time_event(0, reset_handler); shutdown_socket(); discard_out_queue(); outcoming_bl.clear(); diff --git a/src/msg/AsyncConnection.h b/src/msg/AsyncConnection.h index dfc28c8c62fa..d4f8395d535a 100644 --- a/src/msg/AsyncConnection.h +++ b/src/msg/AsyncConnection.h @@ -216,6 +216,9 @@ class AsyncConnection : public Connection { Mutex lock; utime_t backoff; // backoff time bool open_write; + EventCallbackRef read_handler; + EventCallbackRef write_handler; + EventCallbackRef reset_handler; // Tis section are temp variables used by state transition diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc index abd27622ecc1..00bd5847ca0f 100644 --- a/src/msg/AsyncMessenger.cc +++ b/src/msg/AsyncMessenger.cc @@ -187,7 +187,7 @@ int Processor::start() // start thread create(); if (listen_sd >= 0) - center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this)); + center->create_file_event(listen_sd, EVENT_READABLE, EventCallbackRef(new C_handle_accept(this))); return 0; } @@ -217,7 +217,7 @@ void *Processor::entry() while (!done) { ldout(msgr->cct,20) << __func__ << " calling poll" << dendl; - r = center->process_events(1000); + r = center->process_events(30000); if (r < 0) { ldout(msgr->cct,20) << __func__ << " process events failed: " << cpp_strerror(errno) << dendl; @@ -245,6 +245,7 @@ void Processor::stop() if (listen_sd >= 0) { ::shutdown(listen_sd, SHUT_RDWR); } + center->stop(); // wait for thread to stop before closing the socket, to avoid // racing against fd re-use. diff --git a/src/msg/Event.cc b/src/msg/Event.cc index 71f019fb84d9..4afb7227bf73 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -62,7 +62,7 @@ int EventCenter::init(int n) notify_send_fd = fds[1]; nevent = n; - create_file_event(notify_receive_fd, EVENT_READABLE, new C_handle_notify()); + create_file_event(notify_receive_fd, EVENT_READABLE, EventCallbackRef(new C_handle_notify())); return 0; } @@ -77,7 +77,7 @@ EventCenter::~EventCenter() ::close(notify_send_fd); } -int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt) +int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) { int r; Mutex::Locker l(lock); @@ -105,13 +105,9 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt) event->mask |= mask; if (mask & EVENT_READABLE) { - if (event->read_cb) - delete event->read_cb; event->read_cb = ctxt; } if (mask & EVENT_WRITABLE) { - if (event->write_cb) - delete event->write_cb; event->write_cb = ctxt; } ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask @@ -130,12 +126,10 @@ void EventCenter::delete_file_event(int fd, int mask) driver->del_event(fd, event->mask, mask); if (mask & EVENT_READABLE && event->read_cb) { - delete event->read_cb; - event->read_cb = NULL; + event->read_cb.reset(); } if (mask & EVENT_WRITABLE && event->write_cb) { - delete event->write_cb; - event->write_cb = NULL; + event->write_cb.reset(); } event->mask = event->mask & (~mask); @@ -145,7 +139,7 @@ void EventCenter::delete_file_event(int fd, int mask) << " now mask is " << event->mask << dendl; } -uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt) +uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef ctxt) { Mutex::Locker l(lock); uint64_t id = time_event_next_id++; @@ -206,13 +200,17 @@ void EventCenter::start() ldout(cct, 1) << __func__ << dendl; Mutex::Locker l(lock); event_tp.start(); + tp_stop = false; } void EventCenter::stop() { ldout(cct, 1) << __func__ << dendl; Mutex::Locker l(lock); - event_tp.stop(); + if (!tp_stop) { + event_tp.stop(); + tp_stop = true; + } char buf[1]; buf[0] = 'c'; // wake up "event_wait" diff --git a/src/msg/Event.h b/src/msg/Event.h index caebc8af612b..e9aad221482b 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -40,6 +40,8 @@ class EventCallback { virtual ~EventCallback() {} // we want a virtual destructor!!! }; +typedef ceph::shared_ptr EventCallbackRef; + struct FiredFileEvent { int fd; int mask; @@ -47,14 +49,12 @@ struct FiredFileEvent { struct FiredTimeEvent { uint64_t id; - EventCallback *time_cb; + EventCallbackRef time_cb; }; struct FiredEvent { - union { - FiredFileEvent file_event; - FiredTimeEvent time_event; - }; + FiredFileEvent file_event; + FiredTimeEvent time_event; bool is_file; FiredEvent(): is_file(true) {} @@ -73,16 +73,16 @@ class EventDriver { class EventCenter { struct FileEvent { int mask; - EventCallback *read_cb; - EventCallback *write_cb; - FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {} + EventCallbackRef read_cb; + EventCallbackRef write_cb; + FileEvent(): mask(0) {} }; struct TimeEvent { uint64_t id; - EventCallback *time_cb; + EventCallbackRef time_cb; - TimeEvent(): id(0), time_cb(NULL) {} + TimeEvent(): id(0) {} }; Mutex lock; @@ -100,6 +100,7 @@ class EventCenter { int notify_receive_fd; int notify_send_fd; utime_t next_wake; + bool tp_stop; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -176,7 +177,6 @@ class EventCenter { } } else { e.time_event.time_cb->do_request(e.time_event.id); - delete e.time_event.time_cb; } } void _clear() { @@ -188,14 +188,14 @@ class EventCenter { EventCenter(CephContext *c): lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0), event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"), - notify_receive_fd(-1), notify_send_fd(-1), + notify_receive_fd(-1), notify_send_fd(-1),tp_stop(true), event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) { last_time = time(NULL); } ~EventCenter(); int init(int nevent); - int create_file_event(int fd, int mask, EventCallback *ctxt); - uint64_t create_time_event(uint64_t milliseconds, EventCallback *ctxt); + int create_file_event(int fd, int mask, EventCallbackRef ctxt); + uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt); void delete_file_event(int fd, int mask); void delete_time_event(uint64_t id); int process_events(int timeout_milliseconds); diff --git a/src/msg/EventEpoll.cc b/src/msg/EventEpoll.cc index c12e3b36b997..68d0e1459fbe 100644 --- a/src/msg/EventEpoll.cc +++ b/src/msg/EventEpoll.cc @@ -47,7 +47,7 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask) op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD; } - ee.events = 0; + ee.events = EPOLLET; add_mask |= cur_mask; /* Merge old events */ if (add_mask & EVENT_READABLE) ee.events |= EPOLLIN; @@ -75,7 +75,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) int mask = cur_mask & (~delmask); - ee.events = EPOLLET; + ee.events = 0; if (mask & EVENT_READABLE) ee.events |= EPOLLIN; if (mask & EVENT_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */