void SimpleMessenger::Pipe::discard_queue()
{
dout(10) << "discard_queue" << dendl;
+ bool endpoint = false;
+ if (rank->local_endpoint) {
+ pipe_lock.Unlock();
+ rank->local_endpoint->endpoint_lock.Lock();//to remove from pipe queue
+ for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
+ i != queue_items.end();
+ ++i)
+ i->second->remove_myself();
+ endpoint = true;
+ pipe_lock.Lock();
+ }
for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
(*p)->put();
sent.clear();
for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
(*r)->put();
out_q.clear();
+ for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++) {
+ if (endpoint) {
+ int size = in_q.size();
+ rank->local_endpoint->qlen -= size;
+ }
+ for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
+ delete *r;
+ }
+ if (endpoint)
+ rank->local_endpoint->endpoint_lock.Unlock();
+ in_q.clear();
}
pipe_reap_queue.pop_front();
dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
p->pipe_lock.Lock();
+ p->discard_queue();
p->pipe_lock.Unlock();
p->unregister_pipe();
assert(pipes.count(p));
pipes.erase(p);
p->join();
- p->discard_queue();
dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
assert(p->sd < 0);
delete p;
static const Pipe& Server(int s);
static const Pipe& Client(const entity_addr_t& pi);
+ //callers make sure it's not already enqueued or you'll just
+ //push it to the back of the line!
+ //Also, call with pipe_lock held or bad things happen
+ void enqueue_me(int priority) {
+ if (!queue_items.count(priority))
+ queue_items[priority] = new xlist<Pipe *>::item(this);
+ pipe_lock.Unlock();
+ rank->local_endpoint->endpoint_lock.Lock();
+ rank->local_endpoint->
+ queued_pipes[priority].push_back(queue_items[priority]);
+ rank->local_endpoint->endpoint_lock.Unlock();
+ pipe_lock.Lock();
+ }
+
//we have two queue_received's to allow local signal delivery
// via Message * (that doesn't actually point to a Message)
void queue_received(Message *m, int priority) {
list<Message *>& queue = in_q[priority];
pipe_lock.Lock();
+ bool was_empty = queue.empty();
queue.push_back(m);
- if ( 1 == queue.size()) { //this pipe isn't on the endpoint queue
+ if (was_empty) { //this pipe isn't on the endpoint queue
if (!queue_items.count(priority)) { //create an item for that priority
- pair< int, xlist<Pipe *>::item* >
- pair_item(priority, new xlist<Pipe *>::item(this));
- queue_items.insert(pair_item);
+ queue_items[priority] = new xlist<Pipe *>::item(this);
}
- pipe_lock.Unlock();
- rank->local_endpoint->endpoint_lock.Lock();
- rank->local_endpoint->
- queued_pipes[priority].push_back(queue_items[priority]);
- rank->local_endpoint->endpoint_lock.Unlock();
- pipe_lock.Lock();
+ enqueue_me(priority);
}
pipe_lock.Unlock();
++rank->local_endpoint->qlen;
rank->local_endpoint->cond.Signal();
rank->local_endpoint->endpoint_lock.Unlock();
+ dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl;
}
void queue_received(Message *m) {
dispatch_thread(this) {
local_pipe = new Pipe(r, Pipe::STATE_OPEN);
}
- ~Endpoint() { delete local_pipe; }
+ ~Endpoint() {
+ for (map< int, xlist<Pipe *> >::iterator i = queued_pipes.begin();
+ i != queued_pipes.end();
+ ++i) {
+ i->second.clear();
+ }
+ delete local_pipe;
+ }
void destroy() {
// join dispatch thread