From 4e20ce19619bc90353e8206c558c24d229c6abf2 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 7 Nov 2013 13:46:08 -0800 Subject: [PATCH] Messenger,DispatchQueue: add ms_fast_dispatch mechanism This adds a Dispatcher interface allowing the implementation to accept ms_fast_dispatch calls for some messages without going through the DispatchQueue. To support that, we also add 1) new synchronous notifications on connect and accept events 2) a fast_preprocess mechanism Signed-off-by: Samuel Just Signed-off-by: Greg Farnum --- src/msg/DispatchQueue.cc | 22 +++++++++- src/msg/DispatchQueue.h | 3 ++ src/msg/Dispatcher.h | 86 ++++++++++++++++++++++++++++++++++++++-- src/msg/Messenger.h | 79 +++++++++++++++++++++++++++++++++++- src/msg/Pipe.cc | 43 +++++++++++++++----- 5 files changed, 216 insertions(+), 17 deletions(-) diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 9489a40490dd0..d72d1cd1b1660 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -60,17 +60,35 @@ void DispatchQueue::post_dispatch(Message *m, uint64_t msize) ldout(cct,20) << "done calling dispatch on " << m << dendl; } +bool DispatchQueue::can_fast_dispatch(Message *m) +{ + return msgr->ms_can_fast_dispatch(m); +} + +void DispatchQueue::fast_dispatch(Message *m) +{ + uint64_t msize = pre_dispatch(m); + msgr->ms_fast_dispatch(m); + post_dispatch(m, msize); +} + +void DispatchQueue::fast_preprocess(Message *m) +{ + msgr->ms_fast_preprocess(m); +} + void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) { + Mutex::Locker l(lock); ldout(cct,20) << "queue " << m << " prio " << priority << dendl; add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict( - id, priority, QueueItem(m)); + id, priority, QueueItem(m)); } else { mqueue.enqueue( - id, priority, m->get_cost(), QueueItem(m)); + id, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); } diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 5b0decdd984b8..19bfe40edd4d7 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -161,6 +161,9 @@ class DispatchQueue { cond.Signal(); } + bool can_fast_dispatch(Message *m); + void fast_dispatch(Message *m); + void fast_preprocess(Message *m); void enqueue(Message *m, int priority, uint64_t id); void discard_queue(uint64_t id); uint64_t get_id() { diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 9f837cd6fdbc7..864e9e1ed2891 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -30,18 +30,89 @@ public: } virtual ~Dispatcher() { } - // how i receive messages + /** + * The Messenger calls this function to query if you are capable + * of "fast dispatch"ing a message. Indicating that you can fast + * dispatch it requires that you: + * 1) Handle the Message quickly and without taking long-term contended + * locks. (This function is likely to be called in-line with message + * receipt.) + * 2) Be able to accept the Message even if you have not yet received + * an ms_handle_accept() notification for the Connection it is associated + * with, and even if you *have* called mark_down() or received an + * ms_handle_reset() (or similar) call on the Connection. You will + * not receive more than one dead "message" (and should generally be + * prepared for that circumstance anyway, since the normal dispatch can begin, + * then trigger Connection failure before it's percolated through your system). + * We provide ms_handle_fast_[connect|accept] calls if you need them, under + * similar speed and state constraints as fast_dispatch itself. + * 3) Be able to make a determination on fast_dispatch without relying + * on particular system state -- the ms_can_fast_dispatch() call might + * be called multiple times on a single message; the state might change between + * calling ms_can_fast_dispatch and ms_fast_dispatch; etc. + * + * @param m The message we want to fast dispatch. + * @returns True if the message can be fast dispatched; false otherwise. + */ + virtual bool ms_can_fast_dispatch(Message *m) const { return false;} + /** + * This function determines if a dispatcher is included in the + * list of fast-dispatch capable Dispatchers. + * @returns True if the Dispatcher can handle any messages via + * fast dispatch; false otherwise. + */ + virtual bool ms_can_fast_dispatch_any() const { return false; } + /** + * Perform a "fast dispatch" on a given message. See + * ms_can_fast_dispatch() for the requirements. + * + * @param m The Message to fast dispatch. + */ + virtual void ms_fast_dispatch(Message *m) { assert(0); } + /** + * Let the Dispatcher preview a Message before it is dispatched. This + * function is called on *every* Message, prior to the fast/regular dispatch + * decision point, but it is only used on fast-dispatch capable systems. An + * implementation of ms_fast_preprocess must be essentially lock-free in the + * same way as the ms_fast_dispatch function is (in particular, ms_fast_preprocess + * may be called while the Messenger holds internal locks that prevent progress from + * other threads, so any locks it takes must be at the very bottom of the hierarchy). + * Messages are delivered in receipt order within a single Connection, but there are + * no guarantees across Connections. This makes it useful for some limited + * coordination between Messages which can be fast_dispatch'ed and those which must + * go through normal dispatch. + * + * @param m A message which has been received + */ + virtual void ms_fast_preprocess(Message *m) {} + /** + * The Messenger calls this function to deliver a single message. + * + * @param m The message being delivered. You (the Dispatcher) + * are given a single reference count on it. + */ virtual bool ms_dispatch(Message *m) = 0; /** - * This function will be called whenever a new Connection is made to the - * Messenger. + * This function will be called whenever a Connection is newly-created + * or reconnects in the Messenger. * * @param con The new Connection which has been established. You are not * granted a reference to it -- take one if you need one! */ virtual void ms_handle_connect(Connection *con) { }; + /** + * This function will be called synchronously whenever a Connection is + * newly-created or reconnects in the Messenger, if you support fast + * dispatch. It is guaranteed to be called before any messages are + * dispatched. + * + * @param con The new Connection which has been established. You are not + * granted a reference to it -- take one if you need one! + */ + virtual void ms_handle_fast_connect(Connection *con) { }; + /** * Callback indicating we have accepted an incoming connection. * @@ -49,6 +120,15 @@ public: */ virtual void ms_handle_accept(Connection *con) { }; + /** + * Callback indicating we have accepted an incoming connection, if you + * support fast dispatch. It is guaranteed to be called before any messages + * are dispatched. + * + * @param con The (new or existing) Connection associated with the session + */ + virtual void ms_handle_fast_accept(Connection *con) { }; + /* * this indicates that the ordered+reliable delivery semantics have * been violated. Messages may have been lost due to a fault diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 42feaf227dfd8..46632718a1215 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -39,6 +39,7 @@ class Timer; class Messenger { private: list dispatchers; + list fast_dispatchers; protected: /// the "name" of the local daemon. eg client.99 @@ -301,6 +302,8 @@ public: void add_dispatcher_head(Dispatcher *d) { bool first = dispatchers.empty(); dispatchers.push_front(d); + if (d->ms_can_fast_dispatch_any()) + fast_dispatchers.push_front(d); if (first) ready(); } @@ -314,6 +317,8 @@ public: void add_dispatcher_tail(Dispatcher *d) { bool first = dispatchers.empty(); dispatchers.push_back(d); + if (d->ms_can_fast_dispatch_any()) + fast_dispatchers.push_back(d); if (first) ready(); } @@ -571,6 +576,50 @@ protected: * @{ */ public: + /** + * Determine whether a message can be fast-dispatched. We will + * query each Dispatcher in sequence to determine if they are + * capable of handling a particular message via "fast dispatch". + * + * @param m The Message we are testing. + */ + bool ms_can_fast_dispatch(Message *m) { + for (list::iterator p = fast_dispatchers.begin(); + p != fast_dispatchers.end(); + ++p) { + if ((*p)->ms_can_fast_dispatch(m)) + return true; + } + return false; + } + + /** + * Deliver a single Message via "fast dispatch". + * + * @param m The Message we are fast dispatching. We take ownership + * of one reference to it. + */ + void ms_fast_dispatch(Message *m) { + for (list::iterator p = fast_dispatchers.begin(); + p != fast_dispatchers.end(); + ++p) { + if ((*p)->ms_can_fast_dispatch(m)) { + (*p)->ms_fast_dispatch(m); + return; + } + } + assert(0); + } + /** + * + */ + void ms_fast_preprocess(Message *m) { + for (list::iterator p = fast_dispatchers.begin(); + p != fast_dispatchers.end(); + ++p) { + (*p)->ms_fast_preprocess(m); + } + } /** * Deliver a single Message. Send it to each Dispatcher * in sequence until one of them handles it. @@ -594,7 +643,8 @@ public: } /** * Notify each Dispatcher of a new Connection. Call - * this function whenever a new Connection is initiated. + * this function whenever a new Connection is initiated or + * reconnects. * * @param con Pointer to the new Connection. */ @@ -605,6 +655,20 @@ public: (*p)->ms_handle_connect(con); } + /** + * Notify each fast Dispatcher of a new Connection. Call + * this function whenever a new Connection is initiated or + * reconnects. + * + * @param con Pointer to the new Connection. + */ + void ms_deliver_handle_fast_connect(Connection *con) { + for (list::iterator p = fast_dispatchers.begin(); + p != fast_dispatchers.end(); + ++p) + (*p)->ms_handle_fast_connect(con); + } + /** * Notify each Dispatcher of a new incomming Connection. Call * this function whenever a new Connection is accepted. @@ -618,6 +682,19 @@ public: (*p)->ms_handle_accept(con); } + /** + * Notify each fast Dispatcher of a new incoming Connection. Call + * this function whenever a new Connection is accepted. + * + * @param con Pointer to the new Connection. + */ + void ms_deliver_handle_fast_accept(Connection *con) { + for (list::iterator p = fast_dispatchers.begin(); + p != fast_dispatchers.end(); + ++p) + (*p)->ms_handle_fast_accept(con); + } + /** * Notify each Dispatcher of a Connection which may have lost * Messages. Call this function whenever you detect that a lossy Connection diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 0693c09f12211..fd2fbcef70ee4 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -194,7 +194,13 @@ void Pipe::DelayedDelivery::flush() while (!delay_queue.empty()) { Message *m = delay_queue.front().second; delay_queue.pop_front(); - pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + if (pipe->in_q->can_fast_dispatch(m)) { + delay_lock.Unlock(); + pipe->in_q->fast_dispatch(m); + delay_lock.Lock(); + } else { + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } } } @@ -220,7 +226,13 @@ void *Pipe::DelayedDelivery::entry() } lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; delay_queue.pop_front(); - pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + if (pipe->in_q->can_fast_dispatch(m)) { + delay_lock.Unlock(); + pipe->in_q->fast_dispatch(m); + delay_lock.Lock(); + } else { + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } } lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl; return NULL; @@ -662,6 +674,7 @@ int Pipe::accept() // notify msgr->dispatch_queue.queue_accept(connection_state.get()); + msgr->ms_deliver_handle_fast_accept(connection_state.get()); // ok! if (msgr->dispatch_queue.stop) @@ -1107,6 +1120,7 @@ int Pipe::connect() } msgr->dispatch_queue.queue_connect(connection_state.get()); + msgr->ms_deliver_handle_fast_connect(connection_state.get()); if (!reader_running) { ldout(msgr->cct,20) << "connect starting reader" << dendl; @@ -1514,19 +1528,26 @@ void Pipe::reader() ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; + in_q->fast_preprocess(m); if (delay_thread) { - utime_t release; - if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { - release = m->get_recv_stamp(); - release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; - lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; - } - delay_thread->queue(release, m); + utime_t release; + if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { + release = m->get_recv_stamp(); + release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; + } + delay_thread->queue(release, m); } else { - in_q->enqueue(m, m->get_priority(), conn_id); + if (in_q->can_fast_dispatch(m)) { + pipe_lock.Unlock(); + in_q->fast_dispatch(m); + pipe_lock.Lock(); + } else { + in_q->enqueue(m, m->get_priority(), conn_id); + } } - } + } else if (tag == CEPH_MSGR_TAG_CLOSE) { ldout(msgr->cct,20) << "reader got CLOSE" << dendl; -- 2.39.5