From 6e72b201e1884667e3e9c9aba934b010860ff7ea Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 11 Dec 2009 13:40:53 -0800 Subject: [PATCH] msgr: maintain per-pipe in_qlen; avoid list.size(); some cleanup --- src/msg/SimpleMessenger.cc | 45 +++++++++++++++++++++++--------------- src/msg/SimpleMessenger.h | 27 +++++++++++------------ 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 3f390887d360f..665a1e3ce9084 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -281,20 +281,23 @@ void SimpleMessenger::Endpoint::dispatch_entry() //move pipe to back of line -- or just take off if no more messages pipe->pipe_lock.Lock(); list& m_queue = pipe->in_q[priority]; - pipe_list.pop_front(); - if (m_queue.size() > 1) { - pipe_list.push_back(pipe->queue_items[priority]); + Message *m = m_queue.front(); + m_queue.pop_front(); + + if (m_queue.empty()) { + pipe_list.pop_front(); // pipe is done + if (pipe_list.empty()) + queued_pipes.erase(priority); + } else { + pipe_list.push_back(pipe->queue_items[priority]); // move to end of list } - if (pipe_list.empty()) - queued_pipes.erase(priority); endpoint_lock.Unlock(); //done with the pipe queue for a while + + pipe->in_qlen--; qlen_lock.lock(); - --qlen; + qlen--; qlen_lock.unlock(); - //get message from pipe - Message *m = m_queue.front(); - m_queue.pop_front(); pipe->pipe_lock.Unlock(); // done with the pipe's message queue now { if ((long)m == D_BAD_REMOTE_RESET) { @@ -1200,15 +1203,27 @@ void SimpleMessenger::Pipe::discard_queue() rank->local_endpoint->endpoint_lock.Lock();//to remove from round-robin for (map::item* >::iterator i = queue_items.begin(); i != queue_items.end(); - ++i) + ++i) { if ((list_on = i->second->get_xlist())) { //if in round-robin i->second->remove_myself(); //take off if (!list_on->size()) //if round-robin queue is empty rank->local_endpoint->queued_pipes.erase(i->first); //remove from map } + } rank->local_endpoint->endpoint_lock.Unlock(); endpoint = true; pipe_lock.Lock(); + + // clear queue_items + while (!queue_items.empty()) { + delete queue_items.begin()->second; + queue_items.erase(queue_items.begin()); + } + + // adjust qlen + rank->local_endpoint->qlen_lock.lock(); + rank->local_endpoint->qlen -= in_qlen; + rank->local_endpoint->qlen_lock.unlock(); } for (list::iterator p = sent.begin(); p != sent.end(); p++) (*p)->put(); @@ -1217,17 +1232,11 @@ void SimpleMessenger::Pipe::discard_queue() for (list::iterator r = p->second.begin(); r != p->second.end(); r++) (*r)->put(); out_q.clear(); - for (map >::iterator p = in_q.begin(); p != in_q.end(); p++) { - if (endpoint) { - int size = in_q.size(); - rank->local_endpoint->qlen_lock.lock(); - rank->local_endpoint->qlen -= size; - rank->local_endpoint->qlen_lock.unlock(); - } + for (map >::iterator p = in_q.begin(); p != in_q.end(); p++) for (list::iterator r = p->second.begin(); r != p->second.end(); r++) delete *r; - } in_q.clear(); + in_qlen = 0; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 86396f2c947c4..c181214caaba6 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -124,7 +124,8 @@ private: map > out_q; // priority queue for outbound msgs map > in_q; // and inbound ones - map::item* > queue_items; //put in msgr queue + int in_qlen; + map::item* > queue_items; // _map_ protected by pipe_lock, *item protected by endpoint_lock. list sent; Cond cond; bool keepalive; @@ -175,7 +176,7 @@ private: state(st), connection_state(new Connection), reader_running(false), writer_running(false), - keepalive(false), + in_qlen(0), keepalive(false), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { } @@ -220,8 +221,9 @@ private: queue_items[priority] = new xlist::item(this); pipe_lock.Unlock(); rank->local_endpoint->endpoint_lock.Lock(); - rank->local_endpoint-> - queued_pipes[priority].push_back(queue_items[priority]); + if (rank->local_endpoint->queued_pipes.empty()) + rank->local_endpoint->cond.Signal(); + rank->local_endpoint->queued_pipes[priority].push_back(queue_items[priority]); rank->local_endpoint->endpoint_lock.Unlock(); pipe_lock.Lock(); } @@ -235,25 +237,22 @@ private: pipe_lock.Lock(); bool was_empty = queue.empty(); queue.push_back(m); - if (was_empty) { //this pipe isn't on the endpoint queue - if (!queue_items.count(priority)) { //create an item for that priority - queue_items[priority] = new xlist::item(this); - } + if (was_empty) //this pipe isn't on the endpoint queue enqueue_me(priority); - } - pipe_lock.Unlock(); - //increment queue length counter + //increment queue length counters + in_qlen++; rank->local_endpoint->qlen_lock.lock(); ++rank->local_endpoint->qlen; rank->local_endpoint->qlen_lock.unlock(); - rank->local_endpoint->cond.Signal(); + + pipe_lock.Unlock(); } void queue_received(Message *m) { m->set_recv_stamp(g_clock.now()); - assert (m->nref.test() == 0); - queue_received(m, m->get_priority() ); + assert(m->nref.test() == 0); + queue_received(m, m->get_priority()); } __u32 get_out_seq() { return out_seq; } -- 2.39.5