void SimpleMessenger::Pipe::discard_queue()
{
dout(10) << "discard_queue" << dendl;
- DispatchQueue& q = messenger->dispatch_queue;
+
halt_delivery = true;
+
+ // dequeue pipe
+ DispatchQueue& q = messenger->dispatch_queue;
pipe_lock.Unlock();
- xlist<Pipe *>* list_on;
- q.lock.Lock();//to remove from round-robin
+ q.lock.Lock();
pipe_lock.Lock();
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
i != queue_items.end();
++i) {
+ xlist<Pipe *>* list_on;
if ((list_on = i->second->get_list())) { //if in round-robin
i->second->remove_myself(); //take off
if (list_on->empty()) //if round-robin queue is empty
q.queued_pipes.erase(i->first); //remove from map
}
}
- q.lock.Unlock();
// clear queue_items
while (!queue_items.empty()) {
queue_items.erase(queue_items.begin());
}
+ q.lock.Unlock();
+
// adjust qlen
q.qlen_lock.lock();
q.qlen -= in_qlen;
map<int, list<Message*> > out_q; // priority queue for outbound msgs
map<int, list<Message*> > in_q; // and inbound ones
int in_qlen;
- map<int, xlist<Pipe *>::item* > queue_items; // _map_ protected by pipe_lock, *item protected by q.lock
+ map<int, xlist<Pipe *>::item* > queue_items; // protected by pipe_lock AND q.lock
list<Message*> sent;
Cond cond;
bool keepalive;
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
i != queue_items.end();
++i) {
- if (i->second->is_on_list())
- i->second->remove_myself();
+ assert(!i->second->is_on_list());
delete i->second;
}
assert(out_q.empty());
//Also, call with pipe_lock held or bad things happen
void enqueue_me(int priority) {
assert(pipe_lock.is_locked());
- if (!queue_items.count(priority))
- queue_items[priority] = new xlist<Pipe *>::item(this);
pipe_lock.Unlock();
messenger->dispatch_queue.lock.Lock();
pipe_lock.Lock();
+ if (!queue_items.count(priority))
+ queue_items[priority] = new xlist<Pipe *>::item(this);
if (messenger->dispatch_queue.queued_pipes.empty())
messenger->dispatch_queue.cond.Signal();
messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);