void SimpleMessenger::Endpoint::dispatch_entry()
{
- dout(0) << "entered SimpleMessenger::Endpoint::dispatch_entry" << dendl;
endpoint_lock.Lock();
while (!stop) {
- dout(0) << "in outer !stop loop of SimpleMessenger::Endpoint::dispatch_entry" << dendl;
while (!queued_pipes.empty()) {
- dout(0) << "SimpleMessenger::Endpoint::dispatch_entry delivering a message qlen " << qlen << dendl;
//get highest-priority pipe
map<int, xlist<Pipe *> >::reverse_iterator high_iter =
queued_pipes.rbegin();
xlist<Pipe *>& pipe_list = high_iter->second;
Pipe *pipe = pipe_list.front();
- dout(0) << "high priority: " << priority << " taking pipe " << pipe << dendl;
//move pipe to back of line -- or just take off if no more messages
pipe->pipe_lock.Lock();
list<Message *>& m_queue = pipe->in_q[priority];
pipe_list.pop_front();
if (m_queue.size() > 1) {
pipe_list.push_back(pipe->queue_items[priority]);
- dout(0) << "requeuing pipe because there are more messages" << dendl;
}
if (pipe_list.empty())
queued_pipes.erase(priority);
//we have two queue_received's to allow local signal delivery
// via Message * (that doesn't actually point to a Message)
void queue_received(Message *m, int priority) {
- dout(0) << "queuing received message " << m << "in msgr " << rank << dendl;
list<Message *>& queue = in_q[priority];
pipe_lock.Lock();
++rank->local_endpoint->qlen;
rank->local_endpoint->qlen_lock.unlock();
rank->local_endpoint->cond.Signal();
- dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl;
}
void queue_received(Message *m) {