]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: Add queues, etc needed for message delivery by round-robin pipe
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 8 Dec 2009 01:58:33 +0000 (17:58 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 10 Dec 2009 22:33:31 +0000 (14:33 -0800)
src/msg/SimpleMessenger.h

index 7d30bdb0479e713f94598360e9eb616b1390f730..681f21dae9f6b10f4d7eb233bd124c5a21f33b89 100644 (file)
@@ -16,6 +16,7 @@
 #define __SIMPLEMESSENGER_H
 
 #include "include/types.h"
+#include "include/xlist.h"
 
 #include <list>
 #include <map>
@@ -108,7 +109,9 @@ private:
     bool reader_running;
     bool writer_running;
 
-    map<int, list<Message*> > out_q;  // priority queue
+    map<int, list<Message*> > out_q;  // priority queue for outbound msgs
+    map<int, list<Message*> > in_q; // and inbound ones
+    map<int, xlist<Pipe *>::item > queue_items; //put in msgr queue
     list<Message*> sent;
     Cond cond;
     bool keepalive;
@@ -192,6 +195,24 @@ private:
     static const Pipe& Server(int s);
     static const Pipe& Client(const entity_addr_t& pi);
 
+    //we have two queue_received's to allow local signal delivery
+    // via Message * (that doesn't actually point to a Message)
+    void queue_received(Message *m, int priority) {
+      list<Message *>& queue = in_q[priority];
+      lock.Lock();
+      queue.push_back(m);
+      if ( 1 == queue.size()) {
+       if (!queue_items.count(priority)) { //create an item for that priority
+         pair< int, xlist<Pipe *>::item >
+           pair_item(p, xlist<Pipe *>::item(this));
+         queue_items.insert(pair_item);
+       }
+       rank->local_endpoint->queue_lock.Lock();
+       rank->local_endpoint->queued_pipes[p].push_back(&queue_items[p]);
+       rank->local_endpoint->queue_lock.Unlock();
+      }
+      lock.Unlock();
+    }
 
     __u32 get_out_seq() { return out_seq; }
 
@@ -264,6 +285,9 @@ private:
     SimpleMessenger *rank;
     Mutex lock;
     Cond cond;
+    Mutex queue_lock;
+    map<int, xlist<Pipe *> > queued_pipes;
+    map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
     map<int, list<Message*> > dispatch_queue;
     bool stop;
     int qlen;
@@ -328,6 +352,7 @@ private:
       Messenger(name),
       rank(r),
       lock("SimpleMessenger::Endpoint::lock"),
+      queue_lock("SimpleMessenger::Endpoint:queue_lock"),
       stop(false),
       qlen(0),
       my_rank(rn),