From: Haomai Wang Date: Fri, 12 Sep 2014 07:51:52 +0000 (+0800) Subject: Remove DispatchQueue in AsyncMessenger X-Git-Tag: v0.88~37^2~4^2~38 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a653af13c354b6de3bda2d2679485db3e12b6882;p=ceph.git Remove DispatchQueue in AsyncMessenger Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index 296c4451357d..7e9dfb09bef1 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -1,7 +1,9 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include +#include #include + #include "include/Context.h" #include "common/errno.h" #include "AsyncMessenger.h" @@ -69,8 +71,8 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m) : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), - state(STATE_NONE), state_after_send(0), sd(-1), conn_id(m->dispatch_queue.get_id()), - in_q(&(m->dispatch_queue)), lock("AsyncConnection::lock"), backoff(0), + state(STATE_NONE), state_after_send(0), sd(-1), + lock("AsyncConnection::lock"), backoff(0), got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { } AsyncConnection::~AsyncConnection() @@ -415,32 +417,13 @@ void AsyncConnection::process() << policy.throttler_bytes->get_max() << dendl; // FIXME: when to try it again? if (policy.throttler_bytes->get_or_fail(message_size)) - state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH; + state = STATE_OPEN_MESSAGE_READ_FRONT; } } break; } - case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH: - { - uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len; - if (message_size) { - // throttle total bytes waiting for dispatch. do this _after_ the - // policy throttle, as this one does not deadlock (unless dispatch - // blocks indefinitely, which it shouldn't). in contrast, the - // policy throttle carries for the lifetime of the message. - ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " from dispatch throttler " - << async_msgr->dispatch_throttler.get_current() << "/" - << async_msgr->dispatch_throttler.get_max() << dendl; - if (async_msgr->dispatch_throttler.get_or_fail(message_size)) { - state = STATE_OPEN_MESSAGE_READ_FRONT; - throttle_stamp = ceph_clock_now(async_msgr->cct); - } - } - - break; - } case STATE_OPEN_MESSAGE_READ_FRONT: { // read front @@ -588,7 +571,7 @@ void AsyncConnection::process() // ceph::shared_ptr auth_handler = session_security; - if (auth_handler == NULL) { + if (auth_handler) { ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl; } else { if (auth_handler->check_message_signature(message)) { @@ -617,7 +600,6 @@ void AsyncConnection::process() ldout(async_msgr->cct,0) << __func__ << " got old message " << message->get_seq() << " <= " << in_seq << " " << message << " " << *message << ", discarding" << dendl; - async_msgr->dispatch_throttle_release(message->get_dispatch_throttle_size()); message->put(); if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message) assert(0 == "old msgs despite reconnect_seq feature"); @@ -630,11 +612,11 @@ void AsyncConnection::process() in_seq = message->get_seq(); ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq() << " " << message << " " << *message << dendl; - in_q->fast_preprocess(message); - if (in_q->can_fast_dispatch(message)) { - in_q->fast_dispatch(message); + async_msgr->ms_fast_preprocess(message); + if (async_msgr->ms_can_fast_dispatch(message)) { + async_msgr->ms_fast_dispatch(message); } else { - in_q->enqueue(message, message->get_priority(), conn_id); + async_msgr->ms_deliver_dispatch(message); } state = STATE_OPEN; @@ -696,11 +678,6 @@ fail: << policy.throttler_bytes->get_max() << dendl; policy.throttler_bytes->put(message_size); } - - if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH && - state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) { - async_msgr->dispatch_throttle_release(message_size); - } } fault(); state = STATE_FAULT; @@ -1009,7 +986,7 @@ int AsyncConnection::_process_connection() } get(); - async_msgr->dispatch_queue.queue_connect(this); + async_msgr->ms_deliver_handle_connect(this); get(); async_msgr->ms_deliver_handle_fast_connect(this); @@ -1448,11 +1425,11 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (existing->policy.lossy) { // disconnect from the Connection existing->get(); - async_msgr->dispatch_queue.queue_reset(existing); + async_msgr->ms_deliver_handle_reset(existing); } else { // queue a reset on the new connection, which we're dumping for the old get(); - async_msgr->dispatch_queue.queue_reset(this); + async_msgr->ms_deliver_handle_reset(this); // reset the in_seq if this is a hard reset from peer, // otherwise we respect our original connection's value @@ -1501,7 +1478,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a // notify get(); - async_msgr->dispatch_queue.queue_accept(this); + async_msgr->ms_deliver_handle_accept(this); get(); async_msgr->ms_deliver_handle_fast_accept(this); @@ -1642,7 +1619,6 @@ void AsyncConnection::fault() if (policy.lossy && state != STATE_CONNECTING) { ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl; - in_q->discard_queue(conn_id); _stop(); return ; } @@ -1687,12 +1663,11 @@ void AsyncConnection::fault() void AsyncConnection::was_session_reset() { ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl; - in_q->discard_queue(conn_id); discard_out_queue(); outcoming_bl.clear(); get(); - async_msgr->dispatch_queue.queue_remote_reset(this); + async_msgr->ms_deliver_handle_remote_reset(this); if (randomize_out_seq()) { lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; @@ -1711,7 +1686,7 @@ void AsyncConnection::_stop() { ldout(async_msgr->cct, 10) << __func__ << dendl; get(); - async_msgr->dispatch_queue.queue_reset(this); + async_msgr->ms_deliver_handle_reset(this); shutdown_socket(); discard_out_queue(); outcoming_bl.clear(); diff --git a/src/msg/AsyncConnection.h b/src/msg/AsyncConnection.h index b97622613ba7..5a472fc2857e 100644 --- a/src/msg/AsyncConnection.h +++ b/src/msg/AsyncConnection.h @@ -143,7 +143,6 @@ class AsyncConnection : public Connection { STATE_OPEN_MESSAGE_HEADER, STATE_OPEN_MESSAGE_THROTTLE_MESSAGE, STATE_OPEN_MESSAGE_THROTTLE_BYTES, - STATE_OPEN_MESSAGE_THROTTLE_DISPATCH, STATE_OPEN_MESSAGE_READ_FRONT, STATE_OPEN_MESSAGE_READ_MIDDLE, STATE_OPEN_MESSAGE_READ_DATA_PREPARE, @@ -182,10 +181,8 @@ class AsyncConnection : public Connection { int state_after_send; int sd; int port; - uint64_t conn_id; Messenger::Policy policy; map > out_q; // priority queue for outbound msgs - DispatchQueue *in_q; list sent; Mutex lock; utime_t backoff; // backoff time diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc index 7bc8e4cba362..f18c4494354e 100644 --- a/src/msg/AsyncMessenger.cc +++ b/src/msg/AsyncMessenger.cc @@ -266,15 +266,12 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), processor(this, _nonce, ¢er), - nonce(_nonce), lock("AsyncMessenger::lock"), - center(cct), did_bind(false), + nonce(_nonce), did_bind(false), global_seq(0), cluster_protocol(0), local_connection(new AsyncConnection(cct, this)), - dispatch_throttler(cct, string("msgr_dispatch_throttler-") + mname, - cct->_conf->ms_dispatch_throttle_bytes), - dispatch_queue(cct, this) + center(cct) { ceph_spin_init(&global_seq_lock); _init_local_connection(); @@ -292,7 +289,6 @@ AsyncMessenger::~AsyncMessenger() void AsyncMessenger::ready() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; - dispatch_queue.start(); lock.Lock(); if (did_bind) @@ -304,7 +300,6 @@ int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl; mark_down_all(); - dispatch_queue.shutdown(); // break ref cycles on the loopback connection local_connection->set_priv(NULL); @@ -372,12 +367,6 @@ void AsyncMessenger::wait() } lock.Unlock(); - if(dispatch_queue.is_started()) { - ldout(cct,10) << __func__ << ": waiting for dispatch queue" << dendl; - dispatch_queue.wait(); - ldout(cct,10) << __func__ << ": dispatch queue is stopped" << dendl; - } - // done! clean up. if (did_bind) { ldout(cct,20) << __func__ << ": stopping processor thread" << dendl; @@ -503,7 +492,19 @@ void AsyncMessenger::submit_message(Message *m, AsyncConnection *con, if (my_inst.addr == dest_addr) { // local ldout(cct, 20) << __func__ << " " << *m << " local" << dendl; - dispatch_queue.local_delivery(m, m->get_priority()); + m->set_connection(local_connection.get()); + m->set_recv_stamp(ceph_clock_now(cct)); + ms_fast_preprocess(m); + if (ms_can_fast_dispatch(m)) { + ms_fast_dispatch(m); + } else { + if (m->get_priority() >= CEPH_MSG_PRIO_LOW) { + ms_fast_dispatch(m); + } else { + ms_deliver_dispatch(m); + } + } + return; } @@ -550,7 +551,7 @@ void AsyncMessenger::mark_down_all() ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl; p->mark_down(); p->get(); - dispatch_queue.queue_reset(p); + ms_deliver_handle_reset(p); } accepting_conns.clear(); @@ -561,7 +562,7 @@ void AsyncMessenger::mark_down_all() conns.erase(it); p->mark_down(); p->get(); - dispatch_queue.queue_reset(p); + ms_deliver_handle_reset(p); } lock.Unlock(); } @@ -574,7 +575,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; _stop_conn(p); p->get(); - dispatch_queue.queue_reset(p); + ms_deliver_handle_reset(p); } else { ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl; } @@ -624,12 +625,3 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) _init_local_connection(); lock.Unlock(); } - -void AsyncMessenger::dispatch_throttle_release(uint64_t msize) { - if (msize) { - ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler " - << dispatch_throttler.get_current() << "/" - << dispatch_throttler.get_max() << dendl; - dispatch_throttler.put(msize); - } -} diff --git a/src/msg/AsyncMessenger.h b/src/msg/AsyncMessenger.h index 55a21eecf42c..f9e9113394b3 100644 --- a/src/msg/AsyncMessenger.h +++ b/src/msg/AsyncMessenger.h @@ -106,11 +106,11 @@ public: void set_addr_unknowns(entity_addr_t& addr); int get_dispatch_queue_len() { - return dispatch_queue.get_queue_len(); + return 0; } double get_dispatch_queue_max_age(utime_t now) { - return dispatch_queue.get_max_age(now); + return 0; } /** @} Accessors */ @@ -280,7 +280,7 @@ private: return p->second; } - void *_stop_conn(AsyncConnection *c) { + void _stop_conn(AsyncConnection *c) { assert(lock.is_locked()); if (c) { c->mark_down(); @@ -301,11 +301,7 @@ public: /// con used for sending messages to ourselves ConnectionRef local_connection; - /// Throttle preventing us from building up a big backlog waiting for dispatch - Throttle dispatch_throttler; - EventCenter center; - DispatchQueue dispatch_queue; /** * @defgroup AsyncMessenger internals @@ -376,13 +372,6 @@ public: _init_local_connection(); } - /** - * Release memory accounting back to the dispatch throttler. - * - * @param msize The amount of memory to release. - */ - void dispatch_throttle_release(uint64_t msize); - /** * @} // AsyncMessenger Internals */ diff --git a/src/msg/Event.cc b/src/msg/Event.cc index d7979712a0a3..6cdd73e89140 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -1,9 +1,10 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "common/errno.h" +#include "Event.h" #ifdef HAVE_EPOLL -#include "EventEpoll.h"" +#include "EventEpoll.h" #else #ifdef HAVE_KQUEUE #include "EventKqueue.h" @@ -23,6 +24,7 @@ int EventCenter::init(int n) driver = new KqueueDriver(cct); #else driver = new SelectDriver(cct); +#endif #endif if (!driver) { @@ -41,7 +43,7 @@ int EventCenter::init(int n) return 0; } -EventCenter::~EventCenter(); +EventCenter::~EventCenter() { if (driver) delete driver; @@ -50,41 +52,50 @@ EventCenter::~EventCenter(); int EventCenter::create_event(int fd, int mask, EventCallback *ctxt) { Mutex::Locker l(lock); - if (events.size() > center->nevent) { + if (events.size() > nevent) { lderr(cct) << __func__ << " event count is exceed." << dendl; return -ERANGE; } - int r = driver->add_event(fd, mask); + EventCenter::Event *event = _get_event(fd); + + int r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask); if (r < 0) return r; - if (events.find(fd) == events.end()) { + if (!event) { events[fd] = EventCenter::Event(); + event = &events[fd]; } - EventCenter::Event *event = &events[fd]; - event->mask |= mask; - if (mask & EVENT_READABLE) + if (mask & EVENT_READABLE) { + if (event->read_cb) + delete event->read_cb; event->read_cb = ctxt; - if (mask & EVENT_WRITABLE) + } + if (mask & EVENT_WRITABLE) { + if (event->write_cb) + delete event->write_cb; event->write_cb = ctxt; + } return 0; } -void delete_event(int fd, int mask) +void EventCenter::delete_event(int fd, int mask) { Mutex::Locker l(lock); - if (event->mask == EVENT_NONE) - return; - driver->del_event(fd, mask); - struct event *event = &events[fd]; + EventCenter::Event *event = _get_event(fd); + driver->del_event(fd, event ? event->mask: EVENT_NONE, mask); + if (!event) { + events[fd] = EventCenter::Event(); + event = &events[fd]; + } - if (mask & EVENT_READABLE) + if (event->read_cb) delete event->read_cb; - if (mask & EVENT_WRITABLE) + if (event->write_cb) delete event->write_cb; event->mask = event->mask & (~mask); @@ -92,10 +103,10 @@ void delete_event(int fd, int mask) events.erase(fd); } -int process_events(int timeout_millionseconds) +int EventCenter::process_events(int timeout_millionseconds) { struct timeval tv; - int j, processed, numevents, mask, fd, rfired; + int j, processed, numevents; if (timeout_millionseconds > 0) { tv.tv_sec = timeout_millionseconds / 1000; @@ -110,7 +121,7 @@ int process_events(int timeout_millionseconds) vector fired_events; numevents = driver->event_wait(fired_events, &tv); for (j = 0; j < numevents; j++) - event_wq.queue(fired_events[i]); + event_wq.queue(fired_events[j]); return numevents; } diff --git a/src/msg/Event.h b/src/msg/Event.h index ac58f0b46381..c2b5ac657c00 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -8,7 +8,7 @@ #endif // We use epoll, kqueue, evport, select in descending order by performance. -#ifdef __linux__ +#if defined(__linux__) #define HAVE_EPOLL 1 #endif @@ -48,9 +48,9 @@ class EventDriver { public: virtual ~EventDriver() {} // we want a virtual destructor!!! virtual int init(int nevent) = 0; - virtual int add_event(int fd, int mask) = 0; - virtual void delete_event(int fd, int del_mask) = 0; - virtual int event_wait(FiredEvent &fired_events, struct timeval *tp) = 0; + virtual int add_event(int fd, int cur_mask, int mask) = 0; + virtual void del_event(int fd, int cur_mask, int del_mask) = 0; + virtual int event_wait(vector &fired_events, struct timeval *tp) = 0; }; class EventCallback { @@ -72,16 +72,14 @@ class EventCenter { map events; EventDriver *driver; CephContext *cct; - int nevent; + uint64_t nevent; ThreadPool event_tp; - Event *get_event(int fd) { - Mutex::Locker l(lock); + Event *_get_event(int fd) { map::iterator it = events.find(fd); if (it != events.end()) { return &it->second; } - return NULL; } struct EventWQ : public ThreadPool::WorkQueueVal { @@ -155,8 +153,12 @@ class EventCenter { ~EventCenter(); int init(int nevent); int create_event(int fd, int mask, EventCallback *ctxt); - int delete_event(int fd, int mask); + void delete_event(int fd, int mask); int process_events(int timeout_milliseconds); + Event *get_event(int fd) { + Mutex::Locker l(lock); + return _get_event(fd); + } }; #endif diff --git a/src/msg/EventEpoll.cc b/src/msg/EventEpoll.cc index ef6825bc68e3..ae997374d808 100644 --- a/src/msg/EventEpoll.cc +++ b/src/msg/EventEpoll.cc @@ -1,3 +1,4 @@ +#include "common/errno.h" #include "EventEpoll.h" #define dout_subsys ceph_subsys_ms @@ -7,11 +8,11 @@ int EpollDriver::init(int nevent) { - events = malloc(sizeof(struct epoll_event)*nevent); + events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*nevent); if (!events) { lderr(cct) << __func__ << " unable to malloc memory: " - << cpp_strerror(errno) << dendl; - return -error; + << cpp_strerror(errno) << dendl; + return -errno; } memset(events, 0, sizeof(struct epoll_event)*nevent); @@ -19,13 +20,13 @@ int EpollDriver::init(int nevent) if (epfd == -1) { lderr(cct) << __func__ << " unable to do epoll_create: " << cpp_strerror(errno) << dendl; - return -error; + return -errno; } return 0; } -int EpollDriver::add_event(int fd, int mask) +int EpollDriver::add_event(int fd, int cur_mask, int add_mask) { struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD @@ -34,30 +35,29 @@ int EpollDriver::add_event(int fd, int mask) map::iterator it = fds.find(fd); if (it == fds.end()) { op = EPOLL_CTL_ADD; - if (deleted_fds.length()) { - pos = deleted.fds.front(); - deleted.fds.pop_front(); + if (deleted_fds.size()) { + pos = deleted_fds.front(); + deleted_fds.pop_front(); } else { fds[fd] = pos = next_pos; next_pos++; } } else { - pos = it->second; - op = events[pos].mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; + op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD; } ee.events = 0; - mask |= events[pos].mask; /* Merge old events */ - if (mask & EVENT_READABLE) + add_mask |= cur_mask; /* Merge old events */ + if (add_mask & EVENT_READABLE) ee.events |= EPOLLIN; - if (mask & EVENT_WRITABLE) + if (add_mask & EVENT_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (epoll_ctl(epfd, op, fd, &ee) == -1) { lderr(cct) << __func__ << " unable to add event: " << cpp_strerror(errno) << dendl; - return -error; + return -errno; } return 0; } @@ -67,7 +67,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) struct epoll_event ee; map::iterator it = fds.find(fd); if (it == fds.end()) - return 0; + return ; int mask = cur_mask & (~delmask); @@ -91,7 +91,7 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) } } -int EpollDriver::event_wait(FiredEvent &fired_events, struct timeval *tvp) +int EpollDriver::event_wait(vector &fired_events, struct timeval *tvp) { int retval, numevents = 0; diff --git a/src/msg/EventEpoll.h b/src/msg/EventEpoll.h index 4513113c866a..0c4305321a83 100644 --- a/src/msg/EventEpoll.h +++ b/src/msg/EventEpoll.h @@ -29,9 +29,9 @@ class EpollDriver : public EventDriver { } int init(int nevent); - int add_event(int fd, int mask); + int add_event(int fd, int cur_mask, int add_mask); void del_event(int fd, int cur_mask, int del_mask); - int event_wait(FiredEvent &fired_events, struct timeval *tp); + int event_wait(vector &fired_events, struct timeval *tp); }; #endif diff --git a/src/msg/simple/DispatchQueue.h b/src/msg/simple/DispatchQueue.h index 006938107c23..5fe17dcf5936 100644 --- a/src/msg/simple/DispatchQueue.h +++ b/src/msg/simple/DispatchQueue.h @@ -65,7 +65,7 @@ class DispatchQueue { }; CephContext *cct; - Messenger *msgr; + SimpleMessenger *msgr; Mutex lock; Cond cond; @@ -191,7 +191,7 @@ class DispatchQueue { void shutdown(); bool is_started() {return dispatch_thread.is_started();} - DispatchQueue(CephContext *cct, Messenger *msgr) + DispatchQueue(CephContext *cct, SimpleMessenger *msgr) : cct(cct), msgr(msgr), lock("SimpleMessenger::DispatchQeueu::lock"), mqueue(cct->_conf->ms_pq_max_tokens_per_priority,