From: Haomai Wang Date: Thu, 18 Sep 2014 09:32:03 +0000 (+0800) Subject: AsyncMessenger: Use round-robin to dispatch new connection X-Git-Tag: v0.88~37^2~4^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d62fe90e5dca5a397ef07634ca74465408f48d1b;p=ceph.git AsyncMessenger: Use round-robin to dispatch new connection EventCenter won't own lock to protect data, because each EventCenter will master own connections. Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index ac60387e9f27..4f9b0c2eeb08 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -107,12 +107,12 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) } } -AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m) +AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c) : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1), lock("AsyncConnection::lock"), open_write(false), got_bad_auth(false), authorizer(NULL), - state_buffer(4096), state_offset(0), net(cct), center(&m->center) + state_buffer(4096), state_offset(0), net(cct), center(c) { read_handler.reset(new C_handle_read(this)); write_handler.reset(new C_handle_write(this)); @@ -1607,7 +1607,7 @@ void AsyncConnection::accept(int incoming) state = STATE_ACCEPTING; center->create_file_event(sd, EVENT_READABLE, read_handler); // rescheduler connection in order to avoid lock dep - center->create_time_event(0, read_handler); + process(); } int AsyncConnection::send_message(Message *m) diff --git a/src/msg/AsyncConnection.h b/src/msg/AsyncConnection.h index 842bb56a4aae..1fbb953ae9f4 100644 --- a/src/msg/AsyncConnection.h +++ b/src/msg/AsyncConnection.h @@ -87,7 +87,7 @@ class AsyncConnection : public Connection { } public: - AsyncConnection(CephContext *cct, AsyncMessenger *m); + AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c); ~AsyncConnection(); ostream& _conn_prefix(std::ostream *_dout); diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc index 00bd5847ca0f..ae024298352d 100644 --- a/src/msg/AsyncMessenger.cc +++ b/src/msg/AsyncMessenger.cc @@ -4,7 +4,7 @@ #include #include #include - +#include #include "AsyncMessenger.h" @@ -22,20 +22,34 @@ static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) { } static ostream& _prefix(std::ostream *_dout, Processor *p) { - return *_dout << "-- "; + return *_dout << "-- Processor"; } -/******************* - * EventCallBack - */ +static ostream& _prefix(std::ostream *_dout, Worker *w) { + return *_dout << "--"; +} class C_handle_accept : public EventCallback { - Processor *p; + AsyncConnectionRef conn; + int fd; + + public: + C_handle_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {} + void do_request(int id) { + conn->accept(fd); + } +}; + +class C_handle_connect : public EventCallback { + AsyncConnectionRef conn; + const entity_addr_t addr; + int type; public: - C_handle_accept(Processor *p): p(p) {} - void do_request(int fd) { - p->accept(); + C_handle_connect(AsyncConnectionRef c, const entity_addr_t &d, int t) + :conn(c), addr(d), type(t) {} + void do_request(int id) { + conn->connect(addr, type); } }; @@ -50,7 +64,7 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) // bind to a socket ldout(msgr->cct, 10) << __func__ << dendl; - int family, flags; + int family; switch (bind_addr.get_family()) { case AF_INET: case AF_INET6: @@ -134,14 +148,6 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) return rc; } - if ((flags = fcntl(listen_sd, F_GETFL, 0)) < 0 || - fcntl(listen_sd, F_SETFL, flags | O_NONBLOCK) < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " unable to setnonblock on " << listen_addr - << ": " << cpp_strerror(rc) << dendl; - return rc; - } - msgr->set_myaddr(bind_addr); if (bind_addr != entity_addr_t()) msgr->learned_addr(bind_addr); @@ -186,42 +192,45 @@ int Processor::start() // start thread create(); - if (listen_sd >= 0) - center->create_file_event(listen_sd, EVENT_READABLE, EventCallbackRef(new C_handle_accept(this))); return 0; } -void Processor::accept() -{ - ldout(msgr->cct, 10) << __func__ << " starting" << dendl; - // accept - entity_addr_t addr; - socklen_t slen = sizeof(addr.ss_addr()); - int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); - if (sd >= 0) { - ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; - - msgr->add_accept(sd); - } else { - ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - } -} - void *Processor::entry() { ldout(msgr->cct, 10) << __func__ << " starting" << dendl; - int r; + int errors = 0; + struct pollfd pfd; + pfd.fd = listen_sd; + pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { - ldout(msgr->cct,20) << __func__ << " calling poll" << dendl; - - r = center->process_events(30000); - if (r < 0) { - ldout(msgr->cct,20) << __func__ << " process events failed: " - << cpp_strerror(errno) << dendl; - // TODO do something? + ldout(msgr->cct, 20) << __func__ << " calling poll" << dendl; + int r = poll(&pfd, 1, -1); + if (r < 0) + break; + ldout(msgr->cct,20) << __func__ << " poll got " << r << dendl; + + if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) + break; + + ldout(msgr->cct,10) << __func__ << " pfd.revents=" << pfd.revents << dendl; + if (done) break; + + // accept + entity_addr_t addr; + socklen_t slen = sizeof(addr.ss_addr()); + int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); + if (sd >= 0) { + errors = 0; + ldout(msgr->cct,10) << __func__ << "accepted incoming on sd " << sd << dendl; + + msgr->add_accept(sd); + } else { + ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd + << " errno " << errno << " " << cpp_strerror(errno) << dendl; + if (++errors > 4) + break; } } @@ -238,14 +247,11 @@ void *Processor::entry() void Processor::stop() { done = true; - ldout(msgr->cct, 10) << __func__ << " processor" << dendl; + ldout(msgr->cct,10) << __func__ << dendl; - if (listen_sd >= 0) - center->delete_file_event(listen_sd, EVENT_READABLE); if (listen_sd >= 0) { ::shutdown(listen_sd, SHUT_RDWR); } - center->stop(); // wait for thread to stop before closing the socket, to avoid // racing against fd re-use. @@ -260,6 +266,31 @@ void Processor::stop() done = false; } +void Worker::stop() +{ + ldout(msgr->cct, 10) << __func__ << dendl; + done = true; + center.wakeup(); +} + +void *Worker::entry() +{ + ldout(msgr->cct, 10) << __func__ << " starting" << dendl; + int r; + + while (!done) { + ldout(msgr->cct, 20) << __func__ << " calling event process" << dendl; + + r = center.process_events(30000); + if (r < 0) { + ldout(msgr->cct,20) << __func__ << " process events failed: " + << cpp_strerror(errno) << dendl; + // TODO do something? + } + } + + return 0; +} /******************* * AsyncMessenger @@ -268,17 +299,20 @@ void Processor::stop() AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), - processor(this, _nonce, ¢er), + conn_id(0), + processor(this, _nonce), lock("AsyncMessenger::lock"), nonce(_nonce), did_bind(false), global_seq(0), - cluster_protocol(0), stopped(true), - local_connection(new AsyncConnection(cct, this)), - center(cct) + cluster_protocol(0), stopped(true) { ceph_spin_init(&global_seq_lock); + for (int i = 0; i < cct->_conf->ms_event_op_threads; ++i) { + Worker *w = new Worker(this, cct); + workers.push_back(w); + } + local_connection = new AsyncConnection(cct, this, &workers[0]->center); init_local_connection(); - center.init(5000); } /** @@ -302,10 +336,12 @@ void AsyncMessenger::ready() int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl; - center.stop(); + for (vector::iterator it = workers.begin(); it != workers.end(); ++it) + (*it)->stop(); mark_down_all(); // break ref cycles on the loopback connection + processor.stop(); local_connection->set_priv(NULL); stop_cond.Signal(); stopped = true; @@ -336,7 +372,11 @@ int AsyncMessenger::rebind(const set& avoid_ports) { ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; assert(did_bind); - center.stop(); + for (vector::iterator it = workers.begin(); it != workers.end(); ++it) { + (*it)->stop(); + (*it)->join(); + } + processor.stop(); mark_down_all(); return processor.rebind(avoid_ports); @@ -359,9 +399,10 @@ int AsyncMessenger::start() _init_local_connection(); } - lock.Unlock(); + for (vector::iterator it = workers.begin(); it != workers.end(); ++it) + (*it)->create(); - center.start(); + lock.Unlock(); return 0; } @@ -374,6 +415,9 @@ void AsyncMessenger::wait() } if (!stopped) stop_cond.Wait(lock); + + for (vector::iterator it = workers.begin(); it != workers.end(); ++it) + (*it)->join(); lock.Unlock(); // done! clean up. @@ -402,9 +446,11 @@ void AsyncMessenger::wait() AsyncConnectionRef AsyncMessenger::add_accept(int sd) { lock.Lock(); - AsyncConnectionRef conn = new AsyncConnection(cct, this); - conn->accept(sd); + Worker *w = workers[conn_id % workers.size()]; + AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); + w->center.dispatch_event_external(EventCallbackRef(new C_handle_accept(conn, sd))); accepting_conns.insert(conn); + conn_id++; lock.Unlock(); return conn; } @@ -418,10 +464,12 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int << ", creating connection and registering" << dendl; // create connection - AsyncConnectionRef conn = new AsyncConnection(cct, this); - conn->connect(addr, type); + Worker *w = workers[conn_id % workers.size()]; + AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); + w->center.dispatch_event_external(EventCallbackRef(new C_handle_connect(conn, addr, type))); assert(!conns.count(addr)); conns[addr] = conn; + conn_id++; return conn; } diff --git a/src/msg/AsyncMessenger.h b/src/msg/AsyncMessenger.h index a8d70b2cb767..087f557de497 100644 --- a/src/msg/AsyncMessenger.h +++ b/src/msg/AsyncMessenger.h @@ -36,10 +36,9 @@ class Processor : public Thread { bool done; int listen_sd; uint64_t nonce; - EventCenter *center; public: - Processor(AsyncMessenger *r, uint64_t n, EventCenter *c) : msgr(r), done(false), listen_sd(-1), nonce(n), center(c) {} + Processor(AsyncMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {} void *entry(); void stop(); @@ -49,6 +48,19 @@ class Processor : public Thread { void accept(); }; +class Worker : public Thread { + AsyncMessenger *msgr; + bool done; + + public: + EventCenter center; + Worker(AsyncMessenger *m, CephContext *c): msgr(m), done(false), center(c) { + center.init(5000); + } + void *entry(); + void stop(); +}; + /* * This class handles transmission and reception of messages. Generally @@ -167,7 +179,10 @@ public: */ Connection *create_anon_connection() { - return new AsyncConnection(cct, this); + Mutex::Locker l(lock); + Worker *w = workers[conn_id % workers.size()]; + conn_id++; + return new AsyncConnection(cct, this, &w->center); } /** @@ -226,6 +241,9 @@ private: int _send_message(Message *m, const entity_inst_t& dest); private: + vector workers; + int conn_id; + Processor processor; friend class Processor; @@ -264,6 +282,7 @@ private: * * These are not yet in the conns map. */ + // FIXME clear up set accepting_conns; /// internal cluster protocol version, if any, for talking to entities of the same type. @@ -277,11 +296,6 @@ private: ceph::unordered_map::iterator p = conns.find(k); if (p == conns.end()) return NULL; - if (!p->second->is_connected()) { - // FIXME - conns.erase(p); - return NULL; - } return p->second; } @@ -306,8 +320,6 @@ public: /// con used for sending messages to ourselves ConnectionRef local_connection; - EventCenter center; - /** * @defgroup AsyncMessenger internals * @{ diff --git a/src/msg/Event.cc b/src/msg/Event.cc index d1ef51e0be02..3f3cfdc2b3f8 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -80,7 +80,6 @@ EventCenter::~EventCenter() int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) { int r; - Mutex::Locker l(lock); if (file_events.size() > nevent) { int new_size = nevent << 2; ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl; @@ -117,8 +116,6 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) void EventCenter::delete_file_event(int fd, int mask) { - Mutex::Locker l(lock); - EventCenter::FileEvent *event = _get_file_event(fd); if (!event) return ; @@ -141,20 +138,9 @@ void EventCenter::delete_file_event(int fd, int mask) uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef ctxt) { - Mutex::Locker l(lock); uint64_t id = time_event_next_id++; ldout(cct, 10) << __func__ << " id=" << id << " expire time=" << milliseconds << dendl; - // Direct dispatch - if (milliseconds == 0) { - FiredEvent e; - e.time_event.id = id; - e.time_event.time_cb = ctxt; - e.is_file = false; - event_wq.queue(e); - return id; - } - EventCenter::TimeEvent event; utime_t expire; struct timeval tv; @@ -171,19 +157,13 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef time_events[id] = event; if (expire < next_wake) { - char buf[1]; - buf[0] = 'c'; - // wake up "event_wait" - int n = write(notify_send_fd, buf, 1); - // FIXME ? - assert(n == 1); + wakeup(); } return id; } void EventCenter::delete_time_event(uint64_t id) { - Mutex::Locker l(lock); for (map::iterator it = time_to_ids.begin(); it != time_to_ids.end(); it++) { if (it->second == id) { @@ -195,21 +175,9 @@ void EventCenter::delete_time_event(uint64_t id) } } -void EventCenter::start() -{ - ldout(cct, 1) << __func__ << dendl; - Mutex::Locker l(lock); - event_tp.start(); - tp_stop = false; -} - -void EventCenter::stop() +void EventCenter::wakeup() { ldout(cct, 1) << __func__ << dendl; - if (!tp_stop) { - event_tp.stop(); - tp_stop = true; - } char buf[1]; buf[0] = 'c'; // wake up "event_wait" @@ -220,7 +188,6 @@ void EventCenter::stop() int EventCenter::process_time_events() { - Mutex::Locker l(lock); int processed = 0; time_t now = time(NULL); utime_t cur = ceph_clock_now(cct); @@ -249,13 +216,9 @@ int EventCenter::process_time_events() it != time_to_ids.end(); ) { prev = it; if (cur >= it->first) { - FiredEvent e; - e.time_event.id = it->second; - e.time_event.time_cb = time_events[it->second].time_cb; - e.is_file = false; - event_wq.queue(e); ldout(cct, 10) << __func__ << " queue time event: id=" << it->second << " time is " << it->first << dendl; + time_events[it->second].time_cb->do_request(it->first); processed++; ++it; time_to_ids.erase(prev); @@ -283,7 +246,6 @@ int EventCenter::process_events(int timeout_millionseconds) shortest.set_from_timeval(&tv); { - Mutex::Locker l(lock); for (map::iterator it = time_to_ids.begin(); it != time_to_ids.end(); ++it) { ldout(cct, 10) << __func__ << " time_to_ids " << it->first << " id=" << it->second << dendl; @@ -308,15 +270,48 @@ int EventCenter::process_events(int timeout_millionseconds) vector fired_events; numevents = driver->event_wait(fired_events, &tv); for (int j = 0; j < numevents; j++) { - FiredEvent e; - e.file_event = fired_events[j]; - e.is_file = true; - event_wq.queue(e); - ldout(cct, 10) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; + int rfired = 0; + FileEvent *event = _get_file_event(fired_events[j].fd); + if (!event) + continue; + + /* note the event->mask & mask & ... code: maybe an already processed + * event removed an element that fired and we still didn't + * processed, so we check if the event is still valid. */ + if (event->mask & fired_events[j].mask & EVENT_READABLE) { + rfired = 1; + event->read_cb->do_request(fired_events[j].fd); + } + event = _get_file_event(fired_events[j].fd); + if (!event) + continue; + + if (event->mask & fired_events[j].mask & EVENT_WRITABLE) { + if (!rfired || event->read_cb != event->write_cb) + event->write_cb->do_request(fired_events[j].fd); + } + + ldout(cct, 20) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; } if (trigger_time) numevents += process_time_events(); + { + Mutex::Locker l(lock); + while (!external_events.empty()) { + EventCallbackRef e = external_events.front(); + external_events.pop_front(); + e->do_request(0); + } + } return numevents; } + +void EventCenter::dispatch_event_external(EventCallbackRef e) +{ + lock.Lock(); + external_events.push_back(e); + lock.Unlock(); + wakeup(); +} diff --git a/src/msg/Event.h b/src/msg/Event.h index cc6aee454d01..d761ea874e51 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -47,19 +47,6 @@ struct FiredFileEvent { int mask; }; -struct FiredTimeEvent { - uint64_t id; - EventCallbackRef time_cb; -}; - -struct FiredEvent { - FiredFileEvent file_event; - FiredTimeEvent time_event; - bool is_file; - - FiredEvent(): is_file(true) {} -}; - class EventDriver { public: virtual ~EventDriver() {} // we want a virtual destructor!!! @@ -85,22 +72,22 @@ class EventCenter { TimeEvent(): id(0) {} }; + CephContext *cct; + uint64_t nevent; + // Used only to external event Mutex lock; + deque external_events; unordered_map file_events; + EventDriver *driver; // The second element is id map time_to_ids; // The first element is id unordered_map time_events; - EventDriver *driver; - CephContext *cct; - uint64_t nevent; uint64_t time_event_next_id; - ThreadPool event_tp; time_t last_time; // last time process time event int notify_receive_fd; int notify_send_fd; utime_t next_wake; - bool tp_stop; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -111,98 +98,26 @@ class EventCenter { return NULL; } - struct EventWQ : public ThreadPool::WorkQueueVal { - EventCenter *center; - // In order to ensure the file descriptor is unique in conn_queue, - // pending is introduced to check - // - deque conn_queue; - // used only by file event - unordered_map pending; - - EventWQ(EventCenter *c, time_t timeout, time_t suicide_timeout, ThreadPool *tp) - : ThreadPool::WorkQueueVal("Event::EventWQ", timeout, suicide_timeout, tp), center(c) {} - - void _enqueue(FiredEvent e) { - if (e.is_file) { - // Ensure only one thread process one file descriptor - unordered_map::iterator it = pending.find(e.file_event.fd); - if (it != pending.end()) { - it->second |= e.file_event.mask; - } else { - pending[e.file_event.fd] = e.file_event.mask; - conn_queue.push_back(e); - } - } else { - conn_queue.push_back(e); - } - } - void _enqueue_front(FiredEvent e) { - assert(0); - } - void _dequeue(FiredEvent c) { - assert(0); - } - bool _empty() { - return conn_queue.empty(); - } - FiredEvent _dequeue() { - assert(!conn_queue.empty()); - FiredEvent e = conn_queue.front(); - conn_queue.pop_front(); - if (e.is_file) { - e.file_event.mask = pending[e.file_event.fd]; - pending.erase(e.file_event.fd); - } - return e; - } - void _process(FiredEvent e, ThreadPool::TPHandle &handle) { - if (e.is_file) { - int rfired = 0; - FileEvent *event = center->get_file_event(e.file_event.fd); - if (!event) - return ; - - /* note the event->mask & mask & ... code: maybe an already processed - * event removed an element that fired and we still didn't - * processed, so we check if the event is still valid. */ - if (event->mask & e.file_event.mask & EVENT_READABLE) { - rfired = 1; - event->read_cb->do_request(e.file_event.fd); - } - if (event->mask & e.file_event.mask & EVENT_WRITABLE) { - if (!rfired || event->read_cb != event->write_cb) - event->write_cb->do_request(e.file_event.fd); - } - } else { - e.time_event.time_cb->do_request(e.time_event.id); - } - } - void _clear() { - } - } event_wq; - public: EventCenter(CephContext *c): - lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0), - event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"), - notify_receive_fd(-1), notify_send_fd(-1),tp_stop(true), - event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) { + cct(c), nevent(0), + lock("AsyncMessenger::lock"), + driver(NULL), time_event_next_id(0), + notify_receive_fd(-1), notify_send_fd(-1) { last_time = time(NULL); } ~EventCenter(); int init(int nevent); + // Used by internal thread int create_file_event(int fd, int mask, EventCallbackRef ctxt); uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt); void delete_file_event(int fd, int mask); void delete_time_event(uint64_t id); int process_events(int timeout_milliseconds); - void start(); - void stop(); - FileEvent *get_file_event(int fd) { - Mutex::Locker l(lock); - return _get_file_event(fd); - } + void wakeup(); + + // Used by external thread + void dispatch_event_external(EventCallbackRef e); }; #endif