#include "common/Logger.h"
 
-
 #define DBL 18
 
 
+
 TCPMessenger *rankmessenger = 0; // 
 
 TCPDirectory *nameserver = 0;    // only defined on rank 0
 
 // debug
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << "[TCP " << my_rank << " " << getpid() << "." << pthread_self() << "] "
+#define  dout(l)    if (l<=g_conf.debug) cout << "[TCP " << my_rank /*(<< " " << getpid() << "." << pthread_self() */  << "] "
 
 
 
 void tcpmessenger_kick_dispatch_loop();
 
 
+int tcpmessenger_get_rank()
+{
+  return my_rank;
+}
 
 
 /** rankserver
 {
   while (len > 0) {
        int got = ::recv( sd, buf, len, 0 );
-       if (got < 0) return false;
+       if (got < 0) {
+         dout(DBL) << "tcp_read bailing with " << got << endl;
+         return false;
+       }
        assert(got >= 0);
        len -= got;
        buf += got;
-       //dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
+       dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
   }
   return true;
 }
        return 0;
   }
 
-  dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << env.source << " dst " << env.dest << " nchunks=" << env.nchunks << endl;
+  dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << MSG_ADDR_NICE(env.source) << " dst " << MSG_ADDR_NICE(env.dest) << " nchunks=" << env.nchunks << endl;
   
   // payload
   bufferlist blist;
 
        bufferptr bp = new buffer(size);
        
-       tcp_read( sd, bp.c_str(), size );
+       if (!tcp_read( sd, bp.c_str(), size )) return 0;
 
        blist.push_back(bp);
 
        who = m->get_source();
 
        // give to dispatch loop
+       size_t sz;
        incoming_lock.Lock();
+       {
+         incoming.push_back(m);
+         incoming_cond.Signal();
 
-       stat_inq++;
-       size_t sz = m->get_payload().length();
-       stat_inqb += sz;
-
-       incoming.push_back(m);
-       incoming_cond.Signal();
+         stat_inq++;
+         sz = m->get_payload().length();
+         stat_inqb += sz;
+       }
        incoming_lock.Unlock();
 
        if (logger) {
 {
   dout(5) << "tcp_dispatchthread start pid " << getpid() << endl;
 
-  incoming_lock.Lock();
-
   while (1) {
-
-       // callbacks?
+       // any pending callbacks?
        messenger_do_callbacks();
 
-       // timer events?
-       /*if (pending_timer) {
-         pending_timer = false;
-         dout(DBL) << "dispatch: pending timer" << endl;
-         g_timer.execute_pending();
-       }
-       */
+       // inq?
+       incoming_lock.Lock();
 
        // done?
-       if (tcp_done &&
-               incoming.empty()) break;
-               //&&
-               //!pending_timer) break;
+       if (tcp_done && incoming.empty()) {
+         incoming_lock.Unlock();
+         break;
+       }
 
        // wait?
        if (incoming.empty()) {
          // wait
-         dout(12) << "dispatch: waiting for incoming messages" << endl;
+         dout(DBL) << "dispatch: waiting for incoming messages" << endl;
          incoming_cond.Wait(incoming_lock);
+         dout(DBL) << "dispatch: woke up" << endl;
        }
 
-       // incoming?
-       while (!incoming.empty()) {
-         // grab incoming messages
-         list<Message*> in;
-         in.splice(in.begin(), incoming);
-
-         // drop lock while we deliver
-         incoming_lock.Unlock();
-
-
-         while (!in.empty()) {
-               Message *m = in.front();
-               in.pop_front();
+       // grab incoming messages  
+       list<Message*> in;
+       in.splice(in.begin(), incoming);
 
-               stat_inq--;
-               stat_inqb -= m->get_payload().length();
-               if (logger) {
-                 logger->set("inq", stat_inq);
-                 logger->set("inqb", stat_inqb);
-                 logger->inc("dis");
-               }
+       // drop lock while we deliver
+       incoming_lock.Unlock();
 
-               dout(DBL) << "dispatch doing " << *m << endl;
+       // dispatch!
+       while (!in.empty()) {
+         Message *m = in.front();
+         in.pop_front();
          
-               // for rankserver?
-               if (m->get_type() == MSG_NS_CONNECTACK ||        // i just connected
-                       m->get_dest() == MSG_ADDR_RANK(my_rank)) {
-                 dout(DBL) <<  " giving to rankserver" << endl;
-                 rankserver.dispatch(m);
-                 continue;
-               }
-
-               // ok
-               int dest = m->get_dest();
-               directory_lock.Lock();
-               if (directory.count(dest)) {
-                 Messenger *who = directory[ dest ];
-                 directory_lock.Unlock();                
-
-                 dout(4) << "---- '" << m->get_type_name() << 
-                       "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
-                       " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " 
-                                 << m 
-                                 << endl;
-                 
-                 who->dispatch(m);
-               } else {
-                 directory_lock.Unlock();
-                 dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl;
-                 assert(0);
-               }
+         stat_inq--;
+         stat_inqb -= m->get_payload().length();
+         if (logger) {
+               logger->set("inq", stat_inq);
+               logger->set("inqb", stat_inqb);
+               logger->inc("dis");
+         }
+         
+         dout(DBL) << "dispatch doing " << *m << endl;
+         
+         // for rankserver?
+         if (m->get_type() == MSG_NS_CONNECTACK ||        // i just connected
+                 m->get_dest() == MSG_ADDR_RANK(my_rank)) {
+               dout(DBL) <<  " giving to rankserver" << endl;
+               rankserver.dispatch(m);
+               continue;
+         }
+         
+         // ok
+         int dest = m->get_dest();
+         directory_lock.Lock();
+         if (directory.count(dest)) {
+               Messenger *who = directory[ dest ];
+               directory_lock.Unlock();                  
+               
+               dout(4) << "---- '" << m->get_type_name() << 
+                 "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
+                 " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " 
+                               << m 
+                               << endl;
+               
+               who->dispatch(m);
+         } else {
+               directory_lock.Unlock();
+               dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl;
+               assert(0);
          }
-
-         incoming_lock.Lock();
        }
   }
 
-  incoming_lock.Unlock();
 
   g_timer.shutdown();
 
 
 void tcpmessenger_kick_dispatch_loop()
 {
+  dout(DBL) << "kicking" << endl;
   incoming_lock.Lock();
+  dout(DBL) << "prekick" << endl;
   incoming_cond.Signal();
   incoming_lock.Unlock();
+  dout(DBL) << "kicked" << endl;
 }
 
 void tcpmessenger_kick_outgoing_loop()
 
 void tcpmessenger_wait()
 {
+  dout(10) << "tcpmessenger_wait waking up dispatch loop" << endl;
   tcpmessenger_kick_dispatch_loop();
 
   void *returnval;
        myaddr = register_entity(myaddr);
   }
 
+
   // my address
   set_myaddr( myaddr );
 
   if (lastone) {
        dout(2) << "shutdown last tcpmessenger on rank " << my_rank << " shut down" << endl;
        //pthread_t whoami = pthread_self();
-
+       
        // no more timer events
        g_timer.unset_messenger();
        msgr_callback_kicker = 0;
-
-  
+       
        // close incoming sockets
        //void *r;
        for (map<int,pthread_t>::iterator it = in_threads.begin();
 
        tcpmessenger_kick_outgoing_loop();
 
-
+       
        /*
 
        dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl;
 
   MDS *mds[NUMMDS];
   for (int i=0; i<NUMMDS; i++) {
        if (myrank != g_conf.tcp_skip_rank0+i) continue;
-       cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
-       mds[i] = new MDS(mdc, i, new TCPMessenger(MSG_ADDR_MDS(i)));
+       TCPMessenger *m = new TCPMessenger(MSG_ADDR_MDS(i));
+       cerr << "mds" << i << " on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
+       mds[i] = new MDS(mdc, i, m);
        mds[i]->init();
        started++;
   }
   OSD *osd[NUMOSD];
   for (int i=0; i<NUMOSD; i++) {
        if (myrank != g_conf.tcp_skip_rank0+NUMMDS + i) continue;
-       cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
-       osd[i] = new OSD(i, new TCPMessenger(MSG_ADDR_OSD(i)));
+       TCPMessenger *m = new TCPMessenger(MSG_ADDR_OSD(i));
+       cerr << "osd" << i << " on tcprank " << tcpmessenger_get_rank() <<  " " << hostname << "." << pid << endl;
+       osd[i] = new OSD(i, m);
        osd[i]->init();
        started++;
   }
        nclients++;
   }
   if (nclients) {
-       cerr << "waiting for " << nclients << " clients to finish" << endl;
+       cerr << nclients << " clients on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl;
   }
 
   for (set<int>::iterator it = clientlist.begin();
   
 
   if (myrank && !started) {
-       dout(1) << "IDLE" << endl;
+       //dout(1) << "IDLE" << endl;
+       cerr << "idle on tcprank " << tcpmessenger_get_rank() << " " << hostname << "." << pid << endl; 
        tcpmessenger_stop_rankserver();
   }