From c01741421d5009a3e090730fb4d1c30e8d718b10 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Mon, 7 Dec 2009 17:58:33 -0800 Subject: [PATCH] msgr: Add queues, etc needed for message delivery by round-robin pipe --- src/msg/SimpleMessenger.h | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 7d30bdb0479e7..681f21dae9f6b 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -16,6 +16,7 @@ #define __SIMPLEMESSENGER_H #include "include/types.h" +#include "include/xlist.h" #include #include @@ -108,7 +109,9 @@ private: bool reader_running; bool writer_running; - map > out_q; // priority queue + map > out_q; // priority queue for outbound msgs + map > in_q; // and inbound ones + map::item > queue_items; //put in msgr queue list sent; Cond cond; bool keepalive; @@ -192,6 +195,24 @@ private: static const Pipe& Server(int s); static const Pipe& Client(const entity_addr_t& pi); + //we have two queue_received's to allow local signal delivery + // via Message * (that doesn't actually point to a Message) + void queue_received(Message *m, int priority) { + list& queue = in_q[priority]; + lock.Lock(); + queue.push_back(m); + if ( 1 == queue.size()) { + if (!queue_items.count(priority)) { //create an item for that priority + pair< int, xlist::item > + pair_item(p, xlist::item(this)); + queue_items.insert(pair_item); + } + rank->local_endpoint->queue_lock.Lock(); + rank->local_endpoint->queued_pipes[p].push_back(&queue_items[p]); + rank->local_endpoint->queue_lock.Unlock(); + } + lock.Unlock(); + } __u32 get_out_seq() { return out_seq; } @@ -264,6 +285,9 @@ private: SimpleMessenger *rank; Mutex lock; Cond cond; + Mutex queue_lock; + map > queued_pipes; + map::iterator> queued_pipe_iters; map > dispatch_queue; bool stop; int qlen; @@ -328,6 +352,7 @@ private: Messenger(name), rank(r), lock("SimpleMessenger::Endpoint::lock"), + queue_lock("SimpleMessenger::Endpoint:queue_lock"), stop(false), qlen(0), my_rank(rn), -- 2.39.5