]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: move dispatch_entry into DispatchQueue class
authorSage Weil <sage@inktank.com>
Fri, 29 Jun 2012 00:45:24 +0000 (17:45 -0700)
committerSage Weil <sage@inktank.com>
Tue, 3 Jul 2012 00:54:00 +0000 (17:54 -0700)
A bit cleaner.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 6ffed959a5ceed711f3118f9615414269a7399cf..6852fc3acfd0b1b5834bee4734eab5a7d983baa0 100644 (file)
@@ -291,14 +291,14 @@ void SimpleMessenger::Accepter::stop()
  * end of the queue. If the queue is empty; it's removed.
  * The message is then delivered and the process starts again.
  */
-void SimpleMessenger::dispatch_entry()
+void SimpleMessenger::DispatchQueue::entry()
 {
-  dispatch_queue.lock.Lock();
-  while (!dispatch_queue.stop) {
-    while (!dispatch_queue.queued_pipes.empty() && !dispatch_queue.stop) {
+  lock.Lock();
+  while (!stop) {
+    while (!queued_pipes.empty() && !stop) {
       //get highest-priority pipe
       map<int, xlist<IncomingQueue *>* >::reverse_iterator high_iter =
-       dispatch_queue.queued_pipes.rbegin();
+       queued_pipes.rbegin();
       int priority = high_iter->first;
       xlist<IncomingQueue *> *qlist = high_iter->second;
 
@@ -313,7 +313,7 @@ void SimpleMessenger::dispatch_entry()
        qlist->pop_front();  // pipe is done
        if (qlist->empty()) {
          delete qlist;
-         dispatch_queue.queued_pipes.erase(priority);
+         queued_pipes.erase(priority);
        }
        inq->in_q.erase(priority);
        ldout(cct,20) << "dispatch_entry inq " << inq << " pipe " << inq->pipe << " dequeued " << m
@@ -323,33 +323,33 @@ void SimpleMessenger::dispatch_entry()
                      << ", moved to end of list" << dendl;
        qlist->push_back(inq->queue_items[priority]);  // move to end of list
       }
-      dispatch_queue.lock.Unlock(); //done with the pipe queue for a while
+      lock.Unlock(); //done with the pipe queue for a while
 
       inq->in_qlen--;
-      dispatch_queue.qlen.dec();
+      qlen.dec();
 
       inq->lock.Unlock(); // done with the pipe's message queue now
       {
        if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
-         dispatch_queue.lock.Lock();
-         Connection *con = dispatch_queue.remote_reset_q.front();
-         dispatch_queue.remote_reset_q.pop_front();
-         dispatch_queue.lock.Unlock();
-         ms_deliver_handle_remote_reset(con);
+         lock.Lock();
+         Connection *con = remote_reset_q.front();
+         remote_reset_q.pop_front();
+         lock.Unlock();
+         msgr->ms_deliver_handle_remote_reset(con);
          con->put();
        } else if ((long)m == DispatchQueue::D_CONNECT) {
-         dispatch_queue.lock.Lock();
-         Connection *con = dispatch_queue.connect_q.front();
-         dispatch_queue.connect_q.pop_front();
-         dispatch_queue.lock.Unlock();
-         ms_deliver_handle_connect(con);
+         lock.Lock();
+         Connection *con = connect_q.front();
+         connect_q.pop_front();
+         lock.Unlock();
+         msgr->ms_deliver_handle_connect(con);
          con->put();
        } else if ((long)m == DispatchQueue::D_BAD_RESET) {
-         dispatch_queue.lock.Lock();
-         Connection *con = dispatch_queue.reset_q.front();
-         dispatch_queue.reset_q.pop_front();
-         dispatch_queue.lock.Unlock();
-         ms_deliver_handle_reset(con);
+         lock.Lock();
+         Connection *con = reset_q.front();
+         reset_q.pop_front();
+         lock.Unlock();
+         msgr->ms_deliver_handle_reset(con);
          con->put();
        } else {
          uint64_t msize = m->get_dispatch_throttle_size();
@@ -364,19 +364,24 @@ void SimpleMessenger::dispatch_entry()
                  << " " << m->get_footer().data_crc << ")"
                  << " " << m << " con " << m->get_connection()
                  << dendl;
-         ms_deliver_dispatch(m);
+         msgr->ms_deliver_dispatch(m);
 
-         dispatch_throttle_release(msize);
+         msgr->dispatch_throttle_release(msize);
 
          ldout(cct,20) << "done calling dispatch on " << m << dendl;
        }
       }
-      dispatch_queue.lock.Lock();
+      lock.Lock();
     }
-    if (!dispatch_queue.stop)
-      dispatch_queue.cond.Wait(dispatch_queue.lock); //wait for something to be put on queue
+    if (!stop)
+      cond.Wait(lock); //wait for something to be put on queue
   }
-  dispatch_queue.lock.Unlock();
+  lock.Unlock();
+}
+
+void SimpleMessenger::dispatch_entry()
+{
+  dispatch_queue.entry();
 
   //tell everything else it's time to stop
   lock.Lock();
index 38cd3f76c61bc5e1d2744f94cc33b3def24114f0..9eaa5c5f8a4870896072ec4d69d09f8a487208a1 100644 (file)
@@ -64,6 +64,7 @@ public:
                  string mname, uint64_t _nonce) :
     Messenger(cct, name),
     accepter(this),
+    dispatch_queue(cct, this),
     reaper_thread(this),
     dispatch_thread(this),
     my_type(name.type()),
@@ -716,6 +717,8 @@ private:
    * See SimpleMessenger::dispatch_entry for details.
    */
   struct DispatchQueue {
+    CephContext *cct;
+    SimpleMessenger *msgr;
     Mutex lock;
     Cond cond;
     bool stop;
@@ -761,11 +764,14 @@ private:
       local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
     }
 
-    DispatchQueue() :
-      lock("SimpleMessenger::DispatchQeueu::lock"), 
-      stop(false),
-      qlen(0),
-      local_pipe(NULL)
+    void entry();
+
+    DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
+      : cct(cct), msgr(msgr),
+       lock("SimpleMessenger::DispatchQeueu::lock"), 
+       stop(false),
+       qlen(0),
+       local_pipe(NULL)
     {}
     ~DispatchQueue() {
       for (map< int, xlist<IncomingQueue *>* >::iterator i = queued_pipes.begin();