From: Sage Weil Date: Fri, 12 Nov 2010 22:05:56 +0000 (-0800) Subject: msgr: protect pipe queue_item map with pipe_lock AND dispatch_queue lock X-Git-Tag: v0.23.1~3^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1071a9abdea1e8ce69cde489c4c4228312b45fe7;p=ceph.git msgr: protect pipe queue_item map with pipe_lock AND dispatch_queue lock Close a few different races here. Also, assert that queue_items are not queued in ~Pipe(). Signed-off-by: Sage Weil --- diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 2da27e166cb5..97bcbc878c2e 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1267,22 +1267,24 @@ void SimpleMessenger::Pipe::requeue_sent(uint64_t max_acked) void SimpleMessenger::Pipe::discard_queue() { dout(10) << "discard_queue" << dendl; - DispatchQueue& q = messenger->dispatch_queue; + halt_delivery = true; + + // dequeue pipe + DispatchQueue& q = messenger->dispatch_queue; pipe_lock.Unlock(); - xlist* list_on; - q.lock.Lock();//to remove from round-robin + q.lock.Lock(); pipe_lock.Lock(); for (map::item* >::iterator i = queue_items.begin(); i != queue_items.end(); ++i) { + xlist* list_on; if ((list_on = i->second->get_list())) { //if in round-robin i->second->remove_myself(); //take off if (list_on->empty()) //if round-robin queue is empty q.queued_pipes.erase(i->first); //remove from map } } - q.lock.Unlock(); // clear queue_items while (!queue_items.empty()) { @@ -1290,6 +1292,8 @@ void SimpleMessenger::Pipe::discard_queue() queue_items.erase(queue_items.begin()); } + q.lock.Unlock(); + // adjust qlen q.qlen_lock.lock(); q.qlen -= in_qlen; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 88aefefe6216..052e88c9f3d1 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -144,7 +144,7 @@ private: map > out_q; // priority queue for outbound msgs map > in_q; // and inbound ones int in_qlen; - map::item* > queue_items; // _map_ protected by pipe_lock, *item protected by q.lock + map::item* > queue_items; // protected by pipe_lock AND q.lock list sent; Cond cond; bool keepalive; @@ -224,8 +224,7 @@ private: for (map::item* >::iterator i = queue_items.begin(); i != queue_items.end(); ++i) { - if (i->second->is_on_list()) - i->second->remove_myself(); + assert(!i->second->is_on_list()); delete i->second; } assert(out_q.empty()); @@ -269,11 +268,11 @@ private: //Also, call with pipe_lock held or bad things happen void enqueue_me(int priority) { assert(pipe_lock.is_locked()); - if (!queue_items.count(priority)) - queue_items[priority] = new xlist::item(this); pipe_lock.Unlock(); messenger->dispatch_queue.lock.Lock(); pipe_lock.Lock(); + if (!queue_items.count(priority)) + queue_items[priority] = new xlist::item(this); if (messenger->dispatch_queue.queued_pipes.empty()) messenger->dispatch_queue.cond.Signal(); messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);