ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
+bool DispatchQueue::can_fast_dispatch(Message *m)
+{
+ return msgr->ms_can_fast_dispatch(m);
+}
+
+void DispatchQueue::fast_dispatch(Message *m)
+{
+ uint64_t msize = pre_dispatch(m);
+ msgr->ms_fast_dispatch(m);
+ post_dispatch(m, msize);
+}
+
+void DispatchQueue::fast_preprocess(Message *m)
+{
+ msgr->ms_fast_preprocess(m);
+}
+
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
+
Mutex::Locker l(lock);
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
add_arrival(m);
if (priority >= CEPH_MSG_PRIO_LOW) {
mqueue.enqueue_strict(
- id, priority, QueueItem(m));
+ id, priority, QueueItem(m));
} else {
mqueue.enqueue(
- id, priority, m->get_cost(), QueueItem(m));
+ id, priority, m->get_cost(), QueueItem(m));
}
cond.Signal();
}
cond.Signal();
}
+ bool can_fast_dispatch(Message *m);
+ void fast_dispatch(Message *m);
+ void fast_preprocess(Message *m);
void enqueue(Message *m, int priority, uint64_t id);
void discard_queue(uint64_t id);
uint64_t get_id() {
}
virtual ~Dispatcher() { }
- // how i receive messages
+ /**
+ * The Messenger calls this function to query if you are capable
+ * of "fast dispatch"ing a message. Indicating that you can fast
+ * dispatch it requires that you:
+ * 1) Handle the Message quickly and without taking long-term contended
+ * locks. (This function is likely to be called in-line with message
+ * receipt.)
+ * 2) Be able to accept the Message even if you have not yet received
+ * an ms_handle_accept() notification for the Connection it is associated
+ * with, and even if you *have* called mark_down() or received an
+ * ms_handle_reset() (or similar) call on the Connection. You will
+ * not receive more than one dead "message" (and should generally be
+ * prepared for that circumstance anyway, since the normal dispatch can begin,
+ * then trigger Connection failure before it's percolated through your system).
+ * We provide ms_handle_fast_[connect|accept] calls if you need them, under
+ * similar speed and state constraints as fast_dispatch itself.
+ * 3) Be able to make a determination on fast_dispatch without relying
+ * on particular system state -- the ms_can_fast_dispatch() call might
+ * be called multiple times on a single message; the state might change between
+ * calling ms_can_fast_dispatch and ms_fast_dispatch; etc.
+ *
+ * @param m The message we want to fast dispatch.
+ * @returns True if the message can be fast dispatched; false otherwise.
+ */
+ virtual bool ms_can_fast_dispatch(Message *m) const { return false;}
+ /**
+ * This function determines if a dispatcher is included in the
+ * list of fast-dispatch capable Dispatchers.
+ * @returns True if the Dispatcher can handle any messages via
+ * fast dispatch; false otherwise.
+ */
+ virtual bool ms_can_fast_dispatch_any() const { return false; }
+ /**
+ * Perform a "fast dispatch" on a given message. See
+ * ms_can_fast_dispatch() for the requirements.
+ *
+ * @param m The Message to fast dispatch.
+ */
+ virtual void ms_fast_dispatch(Message *m) { assert(0); }
+ /**
+ * Let the Dispatcher preview a Message before it is dispatched. This
+ * function is called on *every* Message, prior to the fast/regular dispatch
+ * decision point, but it is only used on fast-dispatch capable systems. An
+ * implementation of ms_fast_preprocess must be essentially lock-free in the
+ * same way as the ms_fast_dispatch function is (in particular, ms_fast_preprocess
+ * may be called while the Messenger holds internal locks that prevent progress from
+ * other threads, so any locks it takes must be at the very bottom of the hierarchy).
+ * Messages are delivered in receipt order within a single Connection, but there are
+ * no guarantees across Connections. This makes it useful for some limited
+ * coordination between Messages which can be fast_dispatch'ed and those which must
+ * go through normal dispatch.
+ *
+ * @param m A message which has been received
+ */
+ virtual void ms_fast_preprocess(Message *m) {}
+ /**
+ * The Messenger calls this function to deliver a single message.
+ *
+ * @param m The message being delivered. You (the Dispatcher)
+ * are given a single reference count on it.
+ */
virtual bool ms_dispatch(Message *m) = 0;
/**
- * This function will be called whenever a new Connection is made to the
- * Messenger.
+ * This function will be called whenever a Connection is newly-created
+ * or reconnects in the Messenger.
*
* @param con The new Connection which has been established. You are not
* granted a reference to it -- take one if you need one!
*/
virtual void ms_handle_connect(Connection *con) { };
+ /**
+ * This function will be called synchronously whenever a Connection is
+ * newly-created or reconnects in the Messenger, if you support fast
+ * dispatch. It is guaranteed to be called before any messages are
+ * dispatched.
+ *
+ * @param con The new Connection which has been established. You are not
+ * granted a reference to it -- take one if you need one!
+ */
+ virtual void ms_handle_fast_connect(Connection *con) { };
+
/**
* Callback indicating we have accepted an incoming connection.
*
*/
virtual void ms_handle_accept(Connection *con) { };
+ /**
+ * Callback indicating we have accepted an incoming connection, if you
+ * support fast dispatch. It is guaranteed to be called before any messages
+ * are dispatched.
+ *
+ * @param con The (new or existing) Connection associated with the session
+ */
+ virtual void ms_handle_fast_accept(Connection *con) { };
+
/*
* this indicates that the ordered+reliable delivery semantics have
* been violated. Messages may have been lost due to a fault
class Messenger {
private:
list<Dispatcher*> dispatchers;
+ list <Dispatcher*> fast_dispatchers;
protected:
/// the "name" of the local daemon. eg client.99
void add_dispatcher_head(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_front(d);
+ if (d->ms_can_fast_dispatch_any())
+ fast_dispatchers.push_front(d);
if (first)
ready();
}
void add_dispatcher_tail(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_back(d);
+ if (d->ms_can_fast_dispatch_any())
+ fast_dispatchers.push_back(d);
if (first)
ready();
}
* @{
*/
public:
+ /**
+ * Determine whether a message can be fast-dispatched. We will
+ * query each Dispatcher in sequence to determine if they are
+ * capable of handling a particular message via "fast dispatch".
+ *
+ * @param m The Message we are testing.
+ */
+ bool ms_can_fast_dispatch(Message *m) {
+ for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+ p != fast_dispatchers.end();
+ ++p) {
+ if ((*p)->ms_can_fast_dispatch(m))
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Deliver a single Message via "fast dispatch".
+ *
+ * @param m The Message we are fast dispatching. We take ownership
+ * of one reference to it.
+ */
+ void ms_fast_dispatch(Message *m) {
+ for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+ p != fast_dispatchers.end();
+ ++p) {
+ if ((*p)->ms_can_fast_dispatch(m)) {
+ (*p)->ms_fast_dispatch(m);
+ return;
+ }
+ }
+ assert(0);
+ }
+ /**
+ *
+ */
+ void ms_fast_preprocess(Message *m) {
+ for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+ p != fast_dispatchers.end();
+ ++p) {
+ (*p)->ms_fast_preprocess(m);
+ }
+ }
/**
* Deliver a single Message. Send it to each Dispatcher
* in sequence until one of them handles it.
}
/**
* Notify each Dispatcher of a new Connection. Call
- * this function whenever a new Connection is initiated.
+ * this function whenever a new Connection is initiated or
+ * reconnects.
*
* @param con Pointer to the new Connection.
*/
(*p)->ms_handle_connect(con);
}
+ /**
+ * Notify each fast Dispatcher of a new Connection. Call
+ * this function whenever a new Connection is initiated or
+ * reconnects.
+ *
+ * @param con Pointer to the new Connection.
+ */
+ void ms_deliver_handle_fast_connect(Connection *con) {
+ for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+ p != fast_dispatchers.end();
+ ++p)
+ (*p)->ms_handle_fast_connect(con);
+ }
+
/**
* Notify each Dispatcher of a new incomming Connection. Call
* this function whenever a new Connection is accepted.
(*p)->ms_handle_accept(con);
}
+ /**
+ * Notify each fast Dispatcher of a new incoming Connection. Call
+ * this function whenever a new Connection is accepted.
+ *
+ * @param con Pointer to the new Connection.
+ */
+ void ms_deliver_handle_fast_accept(Connection *con) {
+ for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
+ p != fast_dispatchers.end();
+ ++p)
+ (*p)->ms_handle_fast_accept(con);
+ }
+
/**
* Notify each Dispatcher of a Connection which may have lost
* Messages. Call this function whenever you detect that a lossy Connection
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
delay_queue.pop_front();
- pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ if (pipe->in_q->can_fast_dispatch(m)) {
+ delay_lock.Unlock();
+ pipe->in_q->fast_dispatch(m);
+ delay_lock.Lock();
+ } else {
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
}
}
}
lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
delay_queue.pop_front();
- pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ if (pipe->in_q->can_fast_dispatch(m)) {
+ delay_lock.Unlock();
+ pipe->in_q->fast_dispatch(m);
+ delay_lock.Lock();
+ } else {
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
}
lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
return NULL;
// notify
msgr->dispatch_queue.queue_accept(connection_state.get());
+ msgr->ms_deliver_handle_fast_accept(connection_state.get());
// ok!
if (msgr->dispatch_queue.stop)
}
msgr->dispatch_queue.queue_connect(connection_state.get());
+ msgr->ms_deliver_handle_fast_connect(connection_state.get());
if (!reader_running) {
ldout(msgr->cct,20) << "connect starting reader" << dendl;
ldout(msgr->cct,10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
<< dendl;
+ in_q->fast_preprocess(m);
if (delay_thread) {
- utime_t release;
- if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
- release = m->get_recv_stamp();
- release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
- lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
- }
- delay_thread->queue(release, m);
+ utime_t release;
+ if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+ release = m->get_recv_stamp();
+ release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+ }
+ delay_thread->queue(release, m);
} else {
- in_q->enqueue(m, m->get_priority(), conn_id);
+ if (in_q->can_fast_dispatch(m)) {
+ pipe_lock.Unlock();
+ in_q->fast_dispatch(m);
+ pipe_lock.Lock();
+ } else {
+ in_q->enqueue(m, m->get_priority(), conn_id);
+ }
}
- }
+ }
else if (tag == CEPH_MSGR_TAG_CLOSE) {
ldout(msgr->cct,20) << "reader got CLOSE" << dendl;