<< m->get_seq() << " " << m << " " << *m
<< dendl;
- // deliver
+ //deliver
if (rank->local_endpoint)
- rank->local_endpoint->queue_message(m);
+ queue_received(m);
else derr(0) << "reader got message " << *m
- << "which isn't local" << dendl;
+ << "but there is no endpoint!" << dendl;
lock.Lock();
}
if (dest_addr.get_erank() == 0 && local_endpoint) {
// local
dout(20) << "submit_message " << *m << " local" << dendl;
- local_endpoint->queue_message(m);
+ local_endpoint->local_delivery(m, m->get_priority());
} else {
derr(0) << "submit_message " << *m << " " << dest_addr << " local but wrong erank? dropping." << dendl;
assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
if ( 1 == queue.size()) {
if (!queue_items.count(priority)) { //create an item for that priority
pair< int, xlist<Pipe *>::item >
- pair_item(p, xlist<Pipe *>::item(this));
+ pair_item(priority, xlist<Pipe *>::item(this));
queue_items.insert(pair_item);
}
rank->local_endpoint->queue_lock.Lock();
- rank->local_endpoint->queued_pipes[p].push_back(&queue_items[p]);
+ rank->local_endpoint->
+ queued_pipes[priority].push_back(&queue_items[priority]);
rank->local_endpoint->queue_lock.Unlock();
}
lock.Unlock();
}
+
+ void queue_received(Message *m) {
+ m->set_recv_stamp(g_clock.now());
+ assert (m->nref.test() == 0);
+ queue_received(m, m->get_priority() );
+ }
__u32 get_out_seq() { return out_seq; }
// messenger interface
class Endpoint : public Messenger {
SimpleMessenger *rank;
+ Pipe *local_pipe;
Mutex lock;
Cond cond;
Mutex queue_lock;
map<int, xlist<Pipe *> > queued_pipes;
map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
- map<int, list<Message*> > dispatch_queue;
bool stop;
int qlen;
int my_rank;
friend class SimpleMessenger;
public:
- void queue_message(Message *m) {
- // set recv stamp
- m->set_recv_stamp(g_clock.now());
-
- assert(m->nref.test() == 0);
-
- lock.Lock();
- qlen++;
- dispatch_queue[m->get_priority()].push_back(m);
- cond.Signal();
- lock.Unlock();
- }
-
enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET };
list<Connection*> connect_q;
list<Connection*> remote_reset_q;
list<Connection*> reset_q;
+ void local_delivery(Message *m, int priority) {
+ local_pipe->queue_received(m, priority);
+ }
+
+ void local_delivery(Message *m) {
+ local_pipe->queue_received(m);
+ }
+
void queue_connect(Connection *con) {
lock.Lock();
connect_q.push_back(con);
- dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_CONNECT);
+ local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
cond.Signal();
lock.Unlock();
}
void queue_remote_reset(Connection *con) {
lock.Lock();
remote_reset_q.push_back(con);
- dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_REMOTE_RESET);
+ local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
cond.Signal();
lock.Unlock();
}
void queue_reset(Connection *con) {
lock.Lock();
reset_q.push_back(con);
- dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_RESET);
+ local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
cond.Signal();
lock.Unlock();
}
stop(false),
qlen(0),
my_rank(rn),
- dispatch_thread(this) { }
- ~Endpoint() { }
+ dispatch_thread(this) {
+ local_pipe = new Pipe(r, Pipe::STATE_OPEN);
+ }
+ ~Endpoint() { delete local_pipe; }
void destroy() {
// join dispatch thread