From: Greg Farnum Date: Fri, 4 Apr 2014 20:55:54 +0000 (-0700) Subject: msgr: enable fast_dispatch on local connections X-Git-Tag: v0.81~57^2~39 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=69fc6b2b66a45c13c3ce4b65cf4ba2eecfdcf668;p=ceph.git msgr: enable fast_dispatch on local connections We do two things: 1) Call ms_handle_fast_connect() when setting up the local connection, so the Dispatcher can set up any state it needs 2)Move local_delivery into a separate thread from the sender's. fast_dispatch makes this entirely necessary since otherwise we're dipping back in to the Dispatcher while holding whatever locks it held when it sent the Message. Implementation starts with a thread and a list of messages to process and proceeds as you'd expect from that. Signed-off-by: Greg Farnum --- diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index d72d1cd1b166..12c48465ed3f 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -95,18 +95,48 @@ void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) void DispatchQueue::local_delivery(Message *m, int priority) { - Mutex::Locker l(lock); m->set_connection(msgr->local_connection.get()); m->set_recv_stamp(ceph_clock_now(msgr->cct)); - add_arrival(m); - if (priority >= CEPH_MSG_PRIO_LOW) { - mqueue.enqueue_strict( - 0, priority, QueueItem(m)); - } else { - mqueue.enqueue( - 0, priority, m->get_cost(), QueueItem(m)); + Mutex::Locker l(local_delivery_lock); + if (local_messages.empty()) + local_delivery_cond.Signal(); + local_messages.push_back(make_pair(m, priority)); + return; +} + +void DispatchQueue::run_local_delivery() +{ + local_delivery_lock.Lock(); + while (true) { + if (stop_local_delivery) + break; + if (local_messages.empty()) { + local_delivery_cond.Wait(local_delivery_lock); + continue; + } + pair mp = local_messages.front(); + local_messages.pop_front(); + local_delivery_lock.Unlock(); + Message *m = mp.first; + int priority = mp.second; + fast_preprocess(m); + if (can_fast_dispatch(m)) { + fast_dispatch(m); + } else { + Mutex::Locker l(lock); + add_arrival(m); + if (priority >= CEPH_MSG_PRIO_LOW) { + mqueue.enqueue_strict( + 0, priority, QueueItem(m)); + } else { + mqueue.enqueue( + 0, priority, m->get_cost(), QueueItem(m)); + } + cond.Signal(); + } + local_delivery_lock.Lock(); } - cond.Signal(); + local_delivery_lock.Unlock(); } /* @@ -188,15 +218,23 @@ void DispatchQueue::start() assert(!stop); assert(!dispatch_thread.is_started()); dispatch_thread.create(); + local_delivery_thread.create(); } void DispatchQueue::wait() { + local_delivery_thread.join(); dispatch_thread.join(); } void DispatchQueue::shutdown() { + // stop my local delivery thread + local_delivery_lock.Lock(); + stop_local_delivery = true; + local_delivery_cond.Signal(); + local_delivery_lock.Unlock(); + // stop my dispatch thread lock.Lock(); stop = true; diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 19bfe40edd4d..5fe17dcf5936 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -106,12 +106,27 @@ class DispatchQueue { } } dispatch_thread; + Mutex local_delivery_lock; + Cond local_delivery_cond; + bool stop_local_delivery; + list > local_messages; + class LocalDeliveryThread : public Thread { + DispatchQueue *dq; + public: + LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {} + void *entry() { + dq->run_local_delivery(); + return 0; + } + } local_delivery_thread; + uint64_t pre_dispatch(Message *m); void post_dispatch(Message *m, uint64_t msize); public: bool stop; void local_delivery(Message *m, int priority); + void run_local_delivery(); double get_max_age(utime_t now); @@ -183,6 +198,9 @@ class DispatchQueue { cct->_conf->ms_pq_min_cost), next_pipe_id(1), dispatch_thread(this), + local_delivery_lock("SimpleMessenger::DispatchQueue::local_delivery_lock"), + stop_local_delivery(false), + local_delivery_thread(this), stop(false) {} }; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 2070fe591240..b612fcfe0e62 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -731,4 +731,5 @@ void SimpleMessenger::init_local_connection() { local_connection->peer_addr = my_inst.addr; local_connection->peer_type = my_type; + ms_deliver_handle_fast_connect(local_connection.get()); }