#define __SIMPLEMESSENGER_H
#include "include/types.h"
+#include "include/xlist.h"
#include <list>
#include <map>
bool reader_running;
bool writer_running;
- map<int, list<Message*> > out_q; // priority queue
+ map<int, list<Message*> > out_q; // priority queue for outbound msgs
+ map<int, list<Message*> > in_q; // and inbound ones
+ map<int, xlist<Pipe *>::item > queue_items; //put in msgr queue
list<Message*> sent;
Cond cond;
bool keepalive;
static const Pipe& Server(int s);
static const Pipe& Client(const entity_addr_t& pi);
+ //we have two queue_received's to allow local signal delivery
+ // via Message * (that doesn't actually point to a Message)
+ void queue_received(Message *m, int priority) {
+ list<Message *>& queue = in_q[priority];
+ lock.Lock();
+ queue.push_back(m);
+ if ( 1 == queue.size()) {
+ if (!queue_items.count(priority)) { //create an item for that priority
+ pair< int, xlist<Pipe *>::item >
+ pair_item(p, xlist<Pipe *>::item(this));
+ queue_items.insert(pair_item);
+ }
+ rank->local_endpoint->queue_lock.Lock();
+ rank->local_endpoint->queued_pipes[p].push_back(&queue_items[p]);
+ rank->local_endpoint->queue_lock.Unlock();
+ }
+ lock.Unlock();
+ }
__u32 get_out_seq() { return out_seq; }
SimpleMessenger *rank;
Mutex lock;
Cond cond;
+ Mutex queue_lock;
+ map<int, xlist<Pipe *> > queued_pipes;
+ map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
map<int, list<Message*> > dispatch_queue;
bool stop;
int qlen;
Messenger(name),
rank(r),
lock("SimpleMessenger::Endpoint::lock"),
+ queue_lock("SimpleMessenger::Endpoint:queue_lock"),
stop(false),
qlen(0),
my_rank(rn),