dout(10) << "accepted incoming on sd " << sd << endl;
rank.lock.Lock();
- Pipe *p = new Pipe(sd);
- rank.pipes.insert(p);
+ if (!rank.local.empty()) {
+ Pipe *p = new Pipe(sd);
+ rank.pipes.insert(p);
+ }
rank.lock.Unlock();
} else {
dout(10) << "no incoming connection?" << endl;
// queue close message.
if (socket_error) {
dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, socket error" << endl;
+ }
+ else if (!writer_running) {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, no writer running" << endl;
} else {
dout(10) << "pipe(" << peer_inst << ' ' << this << ").close queueing MSG_CLOSE" << endl;
lock.Lock();
if (local.empty()) {
dout(10) << "wait: everything stopped" << endl;
break; // everything stopped.
+ } else {
+ dout(10) << "wait: local still has " << local.size() << " items, waiting" << endl;
}
wait_cond.Wait(lock);
// done! clean up.
+ //dout(10) << "wait: stopping accepter thread" << endl;
+ //accepter.stop();
+
// stop dispatch thread
if (g_conf.ms_single_dispatch) {
dout(10) << "wait: stopping dispatch thread" << endl;
{
// deliver
while (!ls.empty()) {
+ if (stop) {
+ dout(1) << "dispatch: stop=true, discarding " << ls.size()
+ << " messages in dispatch queue" << endl;
+ break;
+ }
Message *m = ls.front();
ls.pop_front();
dout(1) << m->get_dest()
cond.Wait(lock);
}
lock.Unlock();
+
+ // deregister
+ rank.unregister_entity(this);
}
void Rank::EntityMessenger::ready()
{
dout(10) << "shutdown " << get_myaddr() << endl;
- // deregister
- rank.unregister_entity(this);
-
// stop my dispatch thread
if (dispatch_thread.am_self()) {
dout(1) << "shutdown i am dispatch, setting stop flag" << endl;