assert(pipe_lock.is_locked());
list<Message *>& queue = in_q[priority];
- bool was_empty;
if (halt_delivery)
goto halt;
- was_empty = queue.empty();
- queue.push_back(m);
-
- //increment queue length counters
- in_qlen++;
- messenger->dispatch_queue.qlen_lock.lock();
- ++messenger->dispatch_queue.qlen;
- messenger->dispatch_queue.qlen_lock.unlock();
-
- if (was_empty) { //this pipe isn't on the endpoint queue
+ if (queue.empty()) {
+ // queue pipe AND message under pipe AND dispatch_queue locks.
pipe_lock.Unlock();
messenger->dispatch_queue.lock.Lock();
pipe_lock.Lock();
-
+
if (halt_delivery) {
messenger->dispatch_queue.lock.Unlock();
goto halt;
}
-
- dout(20) << "queue_received queuing pipe" << dendl;
- if (!queue_items.count(priority))
- queue_items[priority] = new xlist<Pipe *>::item(this);
- if (messenger->dispatch_queue.queued_pipes.empty())
- messenger->dispatch_queue.cond.Signal();
- messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+
+ if (queue.empty()) {
+ dout(20) << "queue_received queuing pipe" << dendl;
+ if (!queue_items.count(priority))
+ queue_items[priority] = new xlist<Pipe *>::item(this);
+ if (messenger->dispatch_queue.queued_pipes.empty())
+ messenger->dispatch_queue.cond.Signal();
+ messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+ }
+
+ queue.push_back(m);
+
messenger->dispatch_queue.lock.Unlock();
+ } else {
+ // just queue message under pipe lock.
+ queue.push_back(m);
}
+
+ // increment queue length counters
+ in_qlen++;
+ messenger->dispatch_queue.qlen_lock.lock();
+ ++messenger->dispatch_queue.qlen;
+ messenger->dispatch_queue.qlen_lock.unlock();
+
return;
halt: