//move pipe to back of line -- or just take off if no more messages
pipe->pipe_lock.Lock();
list<Message *>& m_queue = pipe->in_q[priority];
- pipe_list.pop_front();
- if (m_queue.size() > 1) {
- pipe_list.push_back(pipe->queue_items[priority]);
+ Message *m = m_queue.front();
+ m_queue.pop_front();
+
+ if (m_queue.empty()) {
+ pipe_list.pop_front(); // pipe is done
+ if (pipe_list.empty())
+ queued_pipes.erase(priority);
+ } else {
+ pipe_list.push_back(pipe->queue_items[priority]); // move to end of list
}
- if (pipe_list.empty())
- queued_pipes.erase(priority);
endpoint_lock.Unlock(); //done with the pipe queue for a while
+
+ pipe->in_qlen--;
qlen_lock.lock();
- --qlen;
+ qlen--;
qlen_lock.unlock();
- //get message from pipe
- Message *m = m_queue.front();
- m_queue.pop_front();
pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
{
if ((long)m == D_BAD_REMOTE_RESET) {
rank->local_endpoint->endpoint_lock.Lock();//to remove from round-robin
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
i != queue_items.end();
- ++i)
+ ++i) {
if ((list_on = i->second->get_xlist())) { //if in round-robin
i->second->remove_myself(); //take off
if (!list_on->size()) //if round-robin queue is empty
rank->local_endpoint->queued_pipes.erase(i->first); //remove from map
}
+ }
rank->local_endpoint->endpoint_lock.Unlock();
endpoint = true;
pipe_lock.Lock();
+
+ // clear queue_items
+ while (!queue_items.empty()) {
+ delete queue_items.begin()->second;
+ queue_items.erase(queue_items.begin());
+ }
+
+ // adjust qlen
+ rank->local_endpoint->qlen_lock.lock();
+ rank->local_endpoint->qlen -= in_qlen;
+ rank->local_endpoint->qlen_lock.unlock();
}
for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
(*p)->put();
for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
(*r)->put();
out_q.clear();
- for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++) {
- if (endpoint) {
- int size = in_q.size();
- rank->local_endpoint->qlen_lock.lock();
- rank->local_endpoint->qlen -= size;
- rank->local_endpoint->qlen_lock.unlock();
- }
+ for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++)
for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
delete *r;
- }
in_q.clear();
+ in_qlen = 0;
}
map<int, list<Message*> > out_q; // priority queue for outbound msgs
map<int, list<Message*> > in_q; // and inbound ones
- map<int, xlist<Pipe *>::item* > queue_items; //put in msgr queue
+ int in_qlen;
+ map<int, xlist<Pipe *>::item* > queue_items; // _map_ protected by pipe_lock, *item protected by endpoint_lock.
list<Message*> sent;
Cond cond;
bool keepalive;
state(st),
connection_state(new Connection),
reader_running(false), writer_running(false),
- keepalive(false),
+ in_qlen(0), keepalive(false),
connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0),
reader_thread(this), writer_thread(this) { }
queue_items[priority] = new xlist<Pipe *>::item(this);
pipe_lock.Unlock();
rank->local_endpoint->endpoint_lock.Lock();
- rank->local_endpoint->
- queued_pipes[priority].push_back(queue_items[priority]);
+ if (rank->local_endpoint->queued_pipes.empty())
+ rank->local_endpoint->cond.Signal();
+ rank->local_endpoint->queued_pipes[priority].push_back(queue_items[priority]);
rank->local_endpoint->endpoint_lock.Unlock();
pipe_lock.Lock();
}
pipe_lock.Lock();
bool was_empty = queue.empty();
queue.push_back(m);
- if (was_empty) { //this pipe isn't on the endpoint queue
- if (!queue_items.count(priority)) { //create an item for that priority
- queue_items[priority] = new xlist<Pipe *>::item(this);
- }
+ if (was_empty) //this pipe isn't on the endpoint queue
enqueue_me(priority);
- }
- pipe_lock.Unlock();
- //increment queue length counter
+ //increment queue length counters
+ in_qlen++;
rank->local_endpoint->qlen_lock.lock();
++rank->local_endpoint->qlen;
rank->local_endpoint->qlen_lock.unlock();
- rank->local_endpoint->cond.Signal();
+
+ pipe_lock.Unlock();
}
void queue_received(Message *m) {
m->set_recv_stamp(g_clock.now());
- assert (m->nref.test() == 0);
- queue_received(m, m->get_priority() );
+ assert(m->nref.test() == 0);
+ queue_received(m, m->get_priority());
}
__u32 get_out_seq() { return out_seq; }