From 2e78035ef4ff22d60d22dab4a0e7716d3990b83f Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Wed, 9 Dec 2009 19:10:45 -0800 Subject: [PATCH] msgr: Fix a number of locking and deletion bugs. Appears to work. --- src/msg/SimpleMessenger.cc | 24 +++++++++++++++++++++++- src/msg/SimpleMessenger.h | 38 +++++++++++++++++++++++++++----------- 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index d1eb84a7c3e15..2482790f92600 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1179,6 +1179,17 @@ void SimpleMessenger::Pipe::requeue_sent() void SimpleMessenger::Pipe::discard_queue() { dout(10) << "discard_queue" << dendl; + bool endpoint = false; + if (rank->local_endpoint) { + pipe_lock.Unlock(); + rank->local_endpoint->endpoint_lock.Lock();//to remove from pipe queue + for (map::item* >::iterator i = queue_items.begin(); + i != queue_items.end(); + ++i) + i->second->remove_myself(); + endpoint = true; + pipe_lock.Lock(); + } for (list::iterator p = sent.begin(); p != sent.end(); p++) (*p)->put(); sent.clear(); @@ -1186,6 +1197,17 @@ void SimpleMessenger::Pipe::discard_queue() for (list::iterator r = p->second.begin(); r != p->second.end(); r++) (*r)->put(); out_q.clear(); + for (map >::iterator p = in_q.begin(); p != in_q.end(); p++) { + if (endpoint) { + int size = in_q.size(); + rank->local_endpoint->qlen -= size; + } + for (list::iterator r = p->second.begin(); r != p->second.end(); r++) + delete *r; + } + if (endpoint) + rank->local_endpoint->endpoint_lock.Unlock(); + in_q.clear(); } @@ -1941,12 +1963,12 @@ void SimpleMessenger::reaper() pipe_reap_queue.pop_front(); dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; p->pipe_lock.Lock(); + p->discard_queue(); p->pipe_lock.Unlock(); p->unregister_pipe(); assert(pipes.count(p)); pipes.erase(p); p->join(); - p->discard_queue(); dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; assert(p->sd < 0); delete p; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 2f1a5aa6e6f1e..59cb6e9c875f5 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -200,6 +200,20 @@ private: static const Pipe& Server(int s); static const Pipe& Client(const entity_addr_t& pi); + //callers make sure it's not already enqueued or you'll just + //push it to the back of the line! + //Also, call with pipe_lock held or bad things happen + void enqueue_me(int priority) { + if (!queue_items.count(priority)) + queue_items[priority] = new xlist::item(this); + pipe_lock.Unlock(); + rank->local_endpoint->endpoint_lock.Lock(); + rank->local_endpoint-> + queued_pipes[priority].push_back(queue_items[priority]); + rank->local_endpoint->endpoint_lock.Unlock(); + pipe_lock.Lock(); + } + //we have two queue_received's to allow local signal delivery // via Message * (that doesn't actually point to a Message) void queue_received(Message *m, int priority) { @@ -207,19 +221,13 @@ private: list& queue = in_q[priority]; pipe_lock.Lock(); + bool was_empty = queue.empty(); queue.push_back(m); - if ( 1 == queue.size()) { //this pipe isn't on the endpoint queue + if (was_empty) { //this pipe isn't on the endpoint queue if (!queue_items.count(priority)) { //create an item for that priority - pair< int, xlist::item* > - pair_item(priority, new xlist::item(this)); - queue_items.insert(pair_item); + queue_items[priority] = new xlist::item(this); } - pipe_lock.Unlock(); - rank->local_endpoint->endpoint_lock.Lock(); - rank->local_endpoint-> - queued_pipes[priority].push_back(queue_items[priority]); - rank->local_endpoint->endpoint_lock.Unlock(); - pipe_lock.Lock(); + enqueue_me(priority); } pipe_lock.Unlock(); @@ -228,6 +236,7 @@ private: ++rank->local_endpoint->qlen; rank->local_endpoint->cond.Signal(); rank->local_endpoint->endpoint_lock.Unlock(); + dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl; } void queue_received(Message *m) { @@ -371,7 +380,14 @@ private: dispatch_thread(this) { local_pipe = new Pipe(r, Pipe::STATE_OPEN); } - ~Endpoint() { delete local_pipe; } + ~Endpoint() { + for (map< int, xlist >::iterator i = queued_pipes.begin(); + i != queued_pipes.end(); + ++i) { + i->second.clear(); + } + delete local_pipe; + } void destroy() { // join dispatch thread -- 2.39.5