From ff1784e1e88733c7ea3991fbbd330750f4dd8416 Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 30 Jan 2007 20:52:22 +0000 Subject: [PATCH] minor simplemessenger bugs git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1055 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/msg/SimpleMessenger.cc | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/branches/sage/cephmds2/msg/SimpleMessenger.cc b/branches/sage/cephmds2/msg/SimpleMessenger.cc index 4b3e3944abe18..10a937762c03b 100644 --- a/branches/sage/cephmds2/msg/SimpleMessenger.cc +++ b/branches/sage/cephmds2/msg/SimpleMessenger.cc @@ -117,8 +117,10 @@ void *Rank::Accepter::entry() dout(10) << "accepted incoming on sd " << sd << endl; rank.lock.Lock(); - Pipe *p = new Pipe(sd); - rank.pipes.insert(p); + if (!rank.local.empty()) { + Pipe *p = new Pipe(sd); + rank.pipes.insert(p); + } rank.lock.Unlock(); } else { dout(10) << "no incoming connection?" << endl; @@ -271,6 +273,9 @@ void Rank::Pipe::close() // queue close message. if (socket_error) { dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, socket error" << endl; + } + else if (!writer_running) { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, no writer running" << endl; } else { dout(10) << "pipe(" << peer_inst << ' ' << this << ").close queueing MSG_CLOSE" << endl; lock.Lock(); @@ -943,6 +948,8 @@ void Rank::wait() if (local.empty()) { dout(10) << "wait: everything stopped" << endl; break; // everything stopped. + } else { + dout(10) << "wait: local still has " << local.size() << " items, waiting" << endl; } wait_cond.Wait(lock); @@ -951,6 +958,9 @@ void Rank::wait() // done! clean up. + //dout(10) << "wait: stopping accepter thread" << endl; + //accepter.stop(); + // stop dispatch thread if (g_conf.ms_single_dispatch) { dout(10) << "wait: stopping dispatch thread" << endl; @@ -1018,6 +1028,11 @@ void Rank::EntityMessenger::dispatch_entry() { // deliver while (!ls.empty()) { + if (stop) { + dout(1) << "dispatch: stop=true, discarding " << ls.size() + << " messages in dispatch queue" << endl; + break; + } Message *m = ls.front(); ls.pop_front(); dout(1) << m->get_dest() @@ -1034,6 +1049,9 @@ void Rank::EntityMessenger::dispatch_entry() cond.Wait(lock); } lock.Unlock(); + + // deregister + rank.unregister_entity(this); } void Rank::EntityMessenger::ready() @@ -1060,9 +1078,6 @@ int Rank::EntityMessenger::shutdown() { dout(10) << "shutdown " << get_myaddr() << endl; - // deregister - rank.unregister_entity(this); - // stop my dispatch thread if (dispatch_thread.am_self()) { dout(1) << "shutdown i am dispatch, setting stop flag" << endl; -- 2.39.5