From: Haomai Wang Date: Sun, 7 Dec 2014 16:28:11 +0000 (+0800) Subject: AsyncMessenger: Using EventCenter instead of poll for bind X-Git-Tag: v0.93~247^2~26 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8d2af2faeef2799307be9086f838784f83b55664;p=ceph.git AsyncMessenger: Using EventCenter instead of poll for bind Totally avoid extra thread in AsyncMessenger now. The bind socket will be regarded as a normal socket and will dispatch a random Worker thread to handle accept event. Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 670b980a2719..d6a99a325d52 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -19,7 +19,6 @@ #include #include #include -#include #ifdef HAVE_SCHED #include #endif @@ -54,27 +53,46 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) { } -class C_handle_accept : public EventCallback { +class C_conn_accept : public EventCallback { AsyncConnectionRef conn; int fd; public: - C_handle_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {} + C_conn_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {} void do_request(int id) { conn->accept(fd); } }; -class C_handle_connect : public EventCallback { - AsyncConnectionRef conn; - const entity_addr_t addr; - int type; +class C_processor_accept : public EventCallback { + Processor *pro; + + public: + C_processor_accept(Processor *p): pro(p) {} + void do_request(int id) { + pro->accept(); + } +}; + + +class C_processor_start : public EventCallback { + Processor *pro; + Worker *worker; + public: + C_processor_start(Processor *p, Worker *w): pro(p), worker(w) {} + void do_request(int id) { + pro->start(worker); + } +}; + + +class C_processor_stop : public EventCallback { + Processor *pro; public: - C_handle_connect(AsyncConnectionRef c, const entity_addr_t &d, int t) - :conn(c), addr(d), type(t) {} + C_processor_stop(Processor *p): pro(p) {} void do_request(int id) { - conn->connect(addr, type); + pro->stop_cb(); } }; @@ -205,91 +223,72 @@ int Processor::rebind(const set& avoid_ports) ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl; ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl; - int r = bind(addr, new_avoid); - if (r == 0) - start(); - return r; + return bind(addr, new_avoid); } -int Processor::start() +int Processor::start(Worker *w) { - ldout(msgr->cct, 1) << __func__ << " start" << dendl; + ldout(msgr->cct, 1) << __func__ << " " << dendl; // start thread - if (listen_sd > 0) - create(); + if (listen_sd > 0) { + assert(w->center.get_owner() == pthread_self()); + worker = w; + w->center.create_file_event(listen_sd, EVENT_READABLE, + EventCallbackRef(new C_processor_accept(this))); + } return 0; } -void *Processor::entry() +void Processor::accept() { - ldout(msgr->cct, 10) << __func__ << " starting" << dendl; + ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl; int errors = 0; - - struct pollfd pfd; - pfd.fd = listen_sd; - pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; - while (!done) { - ldout(msgr->cct, 20) << __func__ << " calling poll" << dendl; - int r = poll(&pfd, 1, -1); - if (r < 0) - break; - ldout(msgr->cct,20) << __func__ << " poll got " << r << dendl; - - if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) - break; - - ldout(msgr->cct,10) << __func__ << " pfd.revents=" << pfd.revents << dendl; - if (done) break; - - // accept + while (errors < 4) { entity_addr_t addr; socklen_t slen = sizeof(addr.ss_addr()); int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); if (sd >= 0) { errors = 0; - ldout(msgr->cct,10) << __func__ << "accepted incoming on sd " << sd << dendl; + ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl; msgr->add_accept(sd); + break; } else { - ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - if (++errors > 4) - break; + ldout(msgr->cct, 0) << __func__ << " no incoming connection? sd = " << sd + << " errno " << errno << " " << cpp_strerror(errno) << dendl; } } +} + +void Processor::stop_cb() +{ + ldout(msgr->cct,10) << __func__ << dendl; - ldout(msgr->cct,20) << __func__ << " closing" << dendl; - // don't close socket, in case we start up again? blech. + Mutex::Locker l(stop_lock); if (listen_sd >= 0) { + worker->center.delete_file_event(listen_sd, EVENT_READABLE); + ::shutdown(listen_sd, SHUT_RDWR); ::close(listen_sd); listen_sd = -1; } - ldout(msgr->cct,10) << __func__ << " stopping" << dendl; - return 0; + worker = NULL; + stop_cond.Signal(); } void Processor::stop() { - done = true; ldout(msgr->cct,10) << __func__ << dendl; if (listen_sd >= 0) { - ::shutdown(listen_sd, SHUT_RDWR); - } - - // wait for thread to stop before closing the socket, to avoid - // racing against fd re-use. - if (is_started()) { - join(); + assert(worker && worker->center.get_owner() != pthread_self()); + Mutex::Locker l(stop_lock); + worker->center.dispatch_event_external(EventCallbackRef(new C_processor_stop(this))); + stop_cond.Wait(stop_lock); + assert(listen_sd == -1); + assert(worker == NULL); } - - if (listen_sd >= 0) { - ::close(listen_sd); - listen_sd = -1; - } - done = false; } void Worker::stop() @@ -418,9 +417,10 @@ void AsyncMessenger::ready() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; - lock.Lock(); - processor.start(); - lock.Unlock(); + Mutex::Locker l(lock); + Worker *w = pool->get_worker(); + w->center.dispatch_event_external( + EventCallbackRef(new C_processor_start(&processor, w))); } int AsyncMessenger::shutdown() @@ -465,7 +465,13 @@ int AsyncMessenger::rebind(const set& avoid_ports) processor.stop(); mark_down_all(); - return processor.rebind(avoid_ports); + int r = processor.rebind(avoid_ports); + if (r == 0) { + Worker *w = pool->get_worker(); + w->center.dispatch_event_external( + EventCallbackRef(new C_processor_start(&processor, w))); + } + return r; } int AsyncMessenger::start() @@ -521,7 +527,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd) lock.Lock(); Worker *w = pool->get_worker(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); - w->center.dispatch_event_external(EventCallbackRef(new C_handle_accept(conn, sd))); + w->center.dispatch_event_external(EventCallbackRef(new C_conn_accept(conn, sd))); accepting_conns.insert(conn); lock.Unlock(); return conn; diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 91004fca7417..508ab641cecb 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -39,28 +39,6 @@ using namespace std; class AsyncMessenger; - -/** - * If the Messenger binds to a specific address, the Processor runs - * and listens for incoming connections. - */ -class Processor : public Thread { - AsyncMessenger *msgr; - bool done; - int listen_sd; - uint64_t nonce; - - public: - Processor(AsyncMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {} - - void *entry(); - void stop(); - int bind(const entity_addr_t &bind_addr, const set& avoid_ports); - int rebind(const set& avoid_port); - int start(); - void accept(); -}; - class WorkerPool; class Worker : public Thread { @@ -79,6 +57,31 @@ class Worker : public Thread { void stop(); }; +/** + * If the Messenger binds to a specific address, the Processor runs + * and listens for incoming connections. + */ +class Processor { + AsyncMessenger *msgr; + Worker *worker; + int listen_sd; + uint64_t nonce; + Mutex stop_lock; + Cond stop_cond; + + public: + Processor(AsyncMessenger *r, uint64_t n) + : msgr(r), worker(NULL), listen_sd(-1), nonce(n), + stop_lock("AsyncMessenger::Processor::stop_lock") {} + + void stop_cb(); + void stop(); + int bind(const entity_addr_t &bind_addr, const set& avoid_ports); + int rebind(const set& avoid_port); + int start(Worker *w); + void accept(); +}; + class WorkerPool: CephContext::AssociatedSingletonObject { WorkerPool(const WorkerPool &); WorkerPool& operator=(const WorkerPool &);