]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
newer messenger
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 11 Oct 2006 21:35:56 +0000 (21:35 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 11 Oct 2006 21:35:56 +0000 (21:35 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@922 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/Makefile
ceph/msg/Message.cc
ceph/msg/Message.h
ceph/msg/Messenger.h
ceph/msg/NewerMessenger.cc [new file with mode: 0644]
ceph/msg/NewerMessenger.h [new file with mode: 0644]
ceph/newsyn.cc

index 81271c003fd2b387d852217f0f19547d1860b259..bed4d67700b7d02d7cca642f7a89e9df2721f81d 100644 (file)
@@ -80,7 +80,7 @@ TCP_OBJS = \
 
 TARGETS = cosd cfuse newsyn fakesyn
 
-SRCS=*.cc */*.cc
+SRCS=*.cc */*.cc *.h */*.h */*/*.h
 
 all: depend ${TARGETS}
 
@@ -90,16 +90,16 @@ obfs: depend obfstest
 
 
 # real bits
-cmon: cmon.cc mon.o ebofs.o msg/NewMessenger.o common.o
+cmon: cmon.cc mon.o ebofs.o msg/NewerMessenger.o common.o
        ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
 
-cosd: cosd.cc osd.o ebofs.o msg/NewMessenger.o common.o
+cosd: cosd.cc osd.o ebofs.o msg/NewerMessenger.o common.o
        ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
 
-cmds: cmds.cc mds.o osdc.o msg/NewMessenger.o common.o
+cmds: cmds.cc mds.o osdc.o msg/NewerMessenger.o common.o
        ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
 
-cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/NewMessenger.o common.o
+cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/NewerMessenger.o common.o
        ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
 
@@ -127,10 +127,10 @@ fakesyn: fakesyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o
 tcpsyn: tcpsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o ${TCP_OBJS} common.o
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
-newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewMessenger.o common.o
+newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewerMessenger.o common.o
        ${MPICC} -pg ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
-newsyn.nopg: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewMessenger.o common.o
+newsyn.nopg: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewerMessenger.o common.o
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
 # + obfs
index 3267e3dd2ca0c73ca6d875f56ce71f25e55bd923..34c8c3d0852d1efeca71f3346f5fd12cbbad5eed 100644 (file)
@@ -113,14 +113,6 @@ using namespace std;
 #define DEBUGLVL  10    // debug level of output
 
 
-ostream& operator<<(ostream& out, Message& m)
-{
-  // some messages define << themselves
-  if (m.get_type() == MSG_CLIENT_REQUEST) return out << *((MClientRequest*)&m);
-
-  // generic
-  return out << "message(type=" << m.get_type() << ")";
-}
 
 
 
@@ -419,6 +411,7 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
 
     // -- simple messages without payload --
 
+  case MSG_CLOSE:
   case MSG_NS_STARTED:
   case MSG_NS_UNREGISTER:
   case MSG_SHUTDOWN:
index 8ccae1971e1aae3595a01a47745d24e633d2ff89..ca697f2eecd8a04680e754375b2ac244cc6b4692 100644 (file)
@@ -16,6 +16,8 @@
 #ifndef __MESSAGE_H
 #define __MESSAGE_H
  
+#define MSG_CLOSE 0
+
 #define MSG_NS_CONNECT     1
 #define MSG_NS_CONNECTACK  2
 #define MSG_NS_REGISTER    3
@@ -438,10 +440,17 @@ public:
     // copy payload
     payload.push_back( new buffer(r.c_str(), r.length()) );
   }
+
+  virtual void print(ostream& out) {
+    out << "message(type=" << get_type() << ")";
+  }
   
 };
 
 extern Message *decode_message(msg_envelope_t &env, bufferlist& bl);
-ostream& operator<<(ostream& out, Message& m);
+inline ostream& operator<<(ostream& out, Message& m) {
+  m.print(out);
+  return out;
+}
 
 #endif
index 7bd759bfbfa8e057524338a5b43bc9147bf75052..4ec3349a2a096cada7cee34fb95d551432011564 100644 (file)
@@ -67,10 +67,11 @@ class Messenger {
   virtual void dispatch(Message *m);
 
   // send message
-  virtual void prepare_send_message(msg_addr_t dest) {}
+  virtual void prepare_dest(const entity_inst_t& inst) {}
   virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
-  virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst) {
-    return send_message(m, dest);   // overload me!
+  virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst,
+                          int port=0, int fromport=0) {
+    return send_message(m, dest, port, fromport);   // overload me!
   }
 
 
diff --git a/ceph/msg/NewerMessenger.cc b/ceph/msg/NewerMessenger.cc
new file mode 100644 (file)
index 0000000..bd1a7fb
--- /dev/null
@@ -0,0 +1,1791 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "NewerMessenger.h"
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "config.h"
+
+#include "messages/MGenericMessage.h"
+#include "messages/MNSConnect.h"
+#include "messages/MNSConnectAck.h"
+#include "messages/MNSRegister.h"
+#include "messages/MNSRegisterAck.h"
+#include "messages/MNSLookup.h"
+#include "messages/MNSLookupReply.h"
+#include "messages/MNSFailure.h"
+
+//#include "messages/MFailure.h"
+
+#include <netdb.h>
+
+
+#undef dout
+#define dout(l)  if (l<=g_conf.debug_ms) cout << g_clock.now() << " -- rank" << rank.my_rank << " "
+#define derr(l)  if (l<=g_conf.debug_ms) cerr << g_clock.now() << " -- rank" << rank.my_rank << " "
+
+
+
+#include "tcp.cc"
+
+
+Rank rank;
+
+
+/********************************************
+ * Namer
+ */
+
+Rank::Namer::Namer(EntityMessenger *msgr) :
+  messenger(msgr),
+  nrank(0), nclient(0), nmds(0), nosd(0), nmon(0)
+{
+  assert(rank.my_rank == 0);
+  nrank = g_conf.num_mon;
+  
+  // announce myself
+  /*
+  cerr << "ceph ns is " << rank.accepter.listen_addr << endl;
+  cout << "export CEPH_NAMESERVER=" << rank.accepter.listen_addr << endl;
+  int fd = ::open(".ceph_ns", O_WRONLY|O_CREAT);
+  ::write(fd, (void*)&rank.accepter.listen_addr, sizeof(tcpaddr_t));
+  ::fchmod(fd, 0755);
+  ::close(fd);
+  */
+
+  // ok
+  messenger->set_dispatcher(this);
+}
+
+Rank::Namer::~Namer()
+{
+  //::unlink(".ceph_ns");
+}
+
+
+void Rank::Namer::dispatch(Message *m)
+{
+  rank.lock.Lock();
+  int type = m->get_type();
+  switch (type) {
+  case MSG_NS_CONNECT:
+    handle_connect((class MNSConnect*)m);
+    break;
+  case MSG_NS_REGISTER:
+    handle_register((class MNSRegister*)m);
+    break;
+  case MSG_NS_STARTED:
+    handle_started(m);
+    break;
+  case MSG_NS_UNREGISTER:
+    handle_unregister(m);
+    break;
+  case MSG_NS_LOOKUP:
+    handle_lookup((class MNSLookup*)m);
+    break;
+  case MSG_NS_FAILURE:
+    handle_failure((class MNSFailure*)m);
+    break;
+    
+  case MSG_FAILURE_ACK:
+    delete m;
+    break;
+
+  default:
+    assert(0);
+  }
+  rank.lock.Unlock();
+}
+
+void Rank::Namer::handle_connect(MNSConnect *m)
+{
+  int newrank = nrank++;
+  dout(2) << "namer.handle_connect from new rank" << newrank << " " << m->get_addr() << endl;
+  
+  rank.entity_map[MSG_ADDR_RANK(newrank)].addr = m->get_addr();
+  rank.entity_map[MSG_ADDR_RANK(newrank)].rank = newrank;
+  rank.entity_unstarted.insert(MSG_ADDR_RANK(newrank));
+
+  messenger->send_message(new MNSConnectAck(newrank),
+                          MSG_ADDR_RANK(newrank), rank.entity_map[MSG_ADDR_RANK(newrank)]);
+  delete m;
+}
+
+void Rank::Namer::manual_insert_inst(const entity_inst_t &inst)
+{
+  rank.entity_map[MSG_ADDR_RANK(inst.rank)] = inst;
+}
+
+void Rank::Namer::handle_register(MNSRegister *m)
+{
+  dout(10) << "namer.handle_register from rank " << m->get_rank()
+          << " addr " << m->get_entity() << endl;
+  
+  // pick id
+  msg_addr_t entity = m->get_entity();
+
+  if (entity.is_new()) {
+    // make up a new address!
+    switch (entity.type()) {
+    case MSG_ADDR_MDS_BASE:
+      entity = MSG_ADDR_MDS(nmds++);
+      break;
+      
+    case MSG_ADDR_OSD_BASE:
+      entity = MSG_ADDR_OSD(nosd++);
+      break;
+      
+    case MSG_ADDR_CLIENT_BASE:
+      entity = MSG_ADDR_CLIENT(nclient++);
+      break;
+      
+    default:
+      assert(0);
+    }
+  } else {
+    // specific address!
+  }
+
+
+  // register
+  if (rank.entity_map.count(entity)) {
+    dout(0) << "namer.handle_register re-registering " << entity
+            << " inst " << m->get_source_inst()
+            << " (was " << rank.entity_map[entity] << ")"
+            << endl;
+  } else {
+    dout(0) << "namer.handle_register registering " << entity
+            << " inst " << m->get_source_inst()
+            << endl;
+  }
+  rank.entity_map[entity] = m->get_source_inst();
+  rank.entity_unstarted.insert(entity);
+  
+  // reply w/ new id
+  messenger->send_message(new MNSRegisterAck(m->get_tid(), entity), 
+                          m->get_source(), rank.entity_map[entity]);
+  
+  delete m;
+}
+
+void Rank::Namer::handle_started(Message *m)
+{
+  msg_addr_t who = m->get_source();
+  dout(10) << "namer.handle_started from entity " << who << endl;
+
+  assert(rank.entity_unstarted.count(who));
+  rank.entity_unstarted.erase(who);
+  
+  // anybody waiting?
+  if (waiting.count(who)) {
+    list<Message*> ls;
+    ls.swap(waiting[who]);
+    waiting.erase(who);
+    
+    dout(10) << "doing waiters on " << who << endl;
+    for (list<Message*>::iterator it = ls.begin();
+         it != ls.end();
+         it++) 
+      dispatch(*it);
+  }
+}
+
+void Rank::Namer::handle_unregister(Message *m)
+{
+  msg_addr_t who = m->get_source();
+  dout(0) << "namer.handle_unregister entity " << who << endl;
+
+  rank.show_dir();
+  
+  assert(rank.entity_map.count(who));
+  rank.entity_map.erase(who);
+
+  rank.show_dir();
+
+  // shut myself down?  kick watcher.
+  if (rank.entity_map.size() == 2) {
+    dout(10) << "namer.handle_unregister stopping namer" << endl;
+    rank.lock.Unlock();
+    messenger->shutdown();
+    delete messenger;
+    rank.lock.Lock();
+  }
+
+  delete m;
+}
+
+
+void Rank::Namer::handle_lookup(MNSLookup *m) 
+{
+  // have it?
+  if (rank.entity_map.count(m->get_entity()) == 0) {
+    dout(10) << "namer " << m->get_source() << " lookup '" << m->get_entity() << "' -> dne" << endl;
+    waiting[m->get_entity()].push_back(m);
+    return;
+  }
+
+  if (rank.entity_unstarted.count(m->get_entity())) {
+    dout(10) << "namer " << m->get_source() << " lookup '" << m->get_entity() << "' -> unstarted" << endl;
+    waiting[m->get_entity()].push_back(m);
+    return;
+  }
+
+  // look it up!  
+  MNSLookupReply *reply = new MNSLookupReply(m);
+
+  reply->entity_map[m->get_entity()] = rank.entity_map[m->get_entity()];
+
+  dout(10) << "namer " << m->get_source()
+           << " lookup '" << m->get_entity() 
+           << "' -> " << rank.entity_map[m->get_entity()] << endl;
+  
+  messenger->send_message(reply, m->get_source(), m->get_source_inst());
+  delete m;
+}
+
+void Rank::Namer::handle_failure(MNSFailure *m)
+{
+  dout(10) << "namer.handle_failure inst " << m->get_inst()
+           << endl;
+
+  // search for entities on this instance
+  list<msg_addr_t> rm;
+  for (hash_map<msg_addr_t,entity_inst_t>::iterator i = rank.entity_map.begin();
+       i != rank.entity_map.end();
+       i++) {
+    if (i->second != m->get_inst()) continue;
+    rm.push_back(i->first);
+  }
+  for (list<msg_addr_t>::iterator i = rm.begin();
+       i != rm.end();
+       i++) {
+    dout(10) << "namer.handle_failure inst " << m->get_inst()
+             << ", removing " << *i << endl;
+    
+    rank.entity_map.erase(*i);
+    rank.entity_unstarted.erase(*i);
+    
+    /*
+    if ((*i).is_osd()) {
+      // tell the monitor
+      messenger->send_message(new MFailure(*i, m->get_inst()), MSG_ADDR_MON(0));
+    }
+    */
+  }
+
+  delete m;
+}
+
+
+
+/********************************************
+ * Accepter
+ */
+
+int Rank::Accepter::start()
+{
+  // bind to a socket
+  dout(10) << "accepter.start binding to listen " << endl;
+  
+  /* socket creation */
+  listen_sd = socket(AF_INET,SOCK_STREAM,0);
+  assert(listen_sd > 0);
+  
+  /* bind to port */
+  memset((char*)&listen_addr, 0, sizeof(listen_addr));
+  listen_addr.sin_family = AF_INET;
+  listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+  listen_addr.sin_port = 0;
+  
+  int rc = bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr));
+  assert(rc >= 0);
+
+  socklen_t llen = sizeof(listen_addr);
+  getsockname(listen_sd, (sockaddr*)&listen_addr, &llen);
+  
+  int myport = listen_addr.sin_port;
+
+  // listen!
+  rc = ::listen(listen_sd, 1000);
+  assert(rc >= 0);
+
+  //dout(10) << "accepter.start listening on " << myport << endl;
+  
+  // my address is...
+  char host[100];
+  bzero(host, 100);
+  gethostname(host, 100);
+  //dout(10) << "accepter.start my hostname is " << host << endl;
+
+  struct hostent *myhostname = gethostbyname( host ); 
+
+  struct sockaddr_in my_addr;  
+  memset(&my_addr, 0, sizeof(my_addr));
+
+  my_addr.sin_family = myhostname->h_addrtype;
+  memcpy((char *) &my_addr.sin_addr.s_addr, 
+         myhostname->h_addr_list[0], 
+         myhostname->h_length);
+  my_addr.sin_port = myport;
+  
+  listen_addr = my_addr;
+  
+  dout(10) << "accepter.start listen addr is " << listen_addr << endl;
+
+  // start thread
+  create();
+
+  return 0;
+}
+
+void *Rank::Accepter::entry()
+{
+  dout(10) << "accepter starting" << endl;
+
+  while (!done) {
+    // accept
+    struct sockaddr_in addr;
+    socklen_t slen = sizeof(addr);
+    int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
+    if (sd > 0) {
+      dout(10) << "accepted incoming on sd " << sd << endl;
+      
+      rank.lock.Lock();
+      Pipe *p = new Pipe(sd);
+      rank.pipes.insert(p);
+      rank.lock.Unlock();
+    } else {
+      dout(10) << "no incoming connection?" << endl;
+      break;
+    }
+  }
+
+  return 0;
+}
+
+
+
+/**************************************
+ * Pipe
+ */
+
+int Rank::Pipe::accept()
+{
+  // my creater gave me sd via accept()
+  
+  // announce myself.
+  int rc = tcp_write(sd, (char*)&rank.my_inst, sizeof(rank.my_inst));
+  if (rc < 0) {
+    ::close(sd);
+    done = true;
+    return -1;
+  }
+  
+  // identify peer
+  rc = tcp_read(sd, (char*)&peer_inst, sizeof(peer_inst));
+  if (rc < 0) {
+    dout(10) << "pipe(? " << this << ").accept couldn't read peer inst" << endl;
+    ::close(sd);
+    done = true;
+    return -1;
+  }
+  
+  // create writer thread.
+  writer_running = true;
+  writer_thread.create();
+  
+  // register pipe.
+  if (peer_inst.rank >= 0) {
+    rank.lock.Lock();
+    {
+      if (rank.rank_pipe.count(peer_inst.rank) == 0) {
+        // install a pipe!
+        dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst << endl;
+        rank.rank_pipe[peer_inst.rank] = this;
+      } else {
+        // low ranks' Pipes "win"
+        if (peer_inst.rank < rank.my_inst.rank || 
+            rank.my_inst.rank < 0) {
+          dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst 
+                    << ", already had pipe, but switching to this new one" << endl;
+          // switch to this new Pipe
+          rank.rank_pipe[peer_inst.rank]->close();  // close old one
+          rank.rank_pipe[peer_inst.rank] = this;
+        } else {
+          dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst 
+                    << ", already had pipe, sticking with it" << endl;
+        }
+      }
+    }
+    rank.lock.Unlock();
+  } else {
+    dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is unranked " << peer_inst << endl;
+  }
+
+  return 0;   // success.
+}
+
+int Rank::Pipe::connect()
+{
+  dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect" << endl;
+
+  // create socket?
+  sd = socket(AF_INET,SOCK_STREAM,0);
+  assert(sd > 0);
+  
+  // bind any port
+  struct sockaddr_in myAddr;
+  myAddr.sin_family = AF_INET;
+  myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+  myAddr.sin_port = htons( 0 );    
+  
+  int rc = bind(sd, (struct sockaddr *) &myAddr, sizeof(myAddr));
+  assert(rc>=0);
+
+  // connect!
+  rc = ::connect(sd, (sockaddr*)&peer_inst.addr, sizeof(myAddr));
+  if (rc < 0) return rc;
+
+  // identify peer
+  entity_inst_t inst;
+  rc = tcp_read(sd, (char*)&inst, sizeof(inst));
+  if (inst.rank < 0) 
+    inst = peer_inst;   // i know better than they do.
+  if (peer_inst != inst && inst.rank > 0) {
+    derr(0) << "pipe(" << peer_inst << ' ' << this << ").connect peer is " << inst << ", wtf" << endl;
+    assert(0);
+    return -1;
+  }
+
+  // identify myself
+  rc = tcp_write(sd, (char*)&rank.my_inst, sizeof(rank.my_inst));
+  if (rc < 0) 
+    return -1;
+  
+  // register pipe
+  rank.lock.Lock();
+  {
+    if (rank.rank_pipe.count(peer_inst.rank) == 0) {
+      dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect registering pipe" << endl;
+      rank.rank_pipe[peer_inst.rank] = this;
+    } else {
+      // this is normal.
+      dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect pipe already registered." << endl;
+    }
+  }
+  rank.lock.Unlock();
+
+  // start reader
+  reader_running = true;
+  reader_thread.create();  
+  
+  return 0;
+}
+
+
+void Rank::Pipe::close()
+{
+  if (sent_close) {
+    dout(10) << "pipe(" << peer_inst << ' ' << this << ").close already closing" << endl;
+    return;
+  }
+  dout(10) << "pipe(" << peer_inst << ' ' << this << ").close" << endl;
+
+  // unreg ourselves
+  rank.lock.Lock();
+  {
+    if (rank.rank_pipe.count(peer_inst.rank) &&
+        rank.rank_pipe[peer_inst.rank] == this) {
+      dout(10) << "pipe(" << peer_inst << ' ' << this << ").close unregistering pipe" << endl;
+      rank.rank_pipe.erase(peer_inst.rank);
+    }
+  }
+  rank.lock.Unlock();
+
+  // queue close message.
+  dout(10) << "pipe(" << peer_inst << ' ' << this << ").close queueing MSG_CLOSE" << endl;
+  lock.Lock();
+  q.push_back(new MGenericMessage(MSG_CLOSE));
+  cond.Signal();
+  sent_close = true;
+  lock.Unlock();  
+}
+
+
+/* read msgs from socket.
+ * also, server.
+ *
+ */
+void Rank::Pipe::reader()
+{
+  if (server) 
+    accept();
+
+  // loop.
+  while (!done) {
+    Message *m = read_message();
+    if (!m || m->get_type() == 0) {
+      if (m) {
+       delete m;
+       dout(10) << "pipe(" << peer_inst << ' ' << this << ").reader read MSG_CLOSE message" << endl;
+      } else {
+       derr(10) << "pipe(" << peer_inst << ' ' << this << ").reader read null message" << endl;
+      }
+
+      if (!sent_close)
+       close();
+
+      done = true;
+      cond.Signal();  // wake up writer too.
+      break;
+    }
+
+    dout(10) << "pipe(" << peer_inst << ' ' << this << ").reader got message for " << m->get_dest() << endl;
+
+    EntityMessenger *entity = 0;
+
+    rank.lock.Lock();
+    {
+      if (rank.entity_map.count(m->get_source()) &&
+          rank.entity_map[m->get_source()] > m->get_source_inst()) {
+        derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader source " << m->get_source() 
+                << " inst " << m->get_source_inst() 
+                << " > " << rank.entity_map[m->get_source()] 
+                << ", WATCH OUT " << *m << endl;
+        assert(0);
+      }
+
+      if (m->get_dest().type() == MSG_ADDR_RANK_BASE) {
+        // ours.
+        rank.dispatch(m);
+      } else {
+        if (g_conf.ms_single_dispatch) {
+          // submit to single dispatch queue
+          rank._submit_single_dispatch(m);
+        } else {
+          if (rank.local.count(m->get_dest())) {
+            // find entity
+            entity = rank.local[m->get_dest()];
+          } else {
+            derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << endl;
+            assert(0);  // FIXME do this differently
+            //rank.waiting_for_lookup[m->get_dest()].push_back(m);
+          }
+        }
+      }
+    }
+    rank.lock.Unlock();
+    
+    if (entity) 
+      entity->queue_message(m);        // queue
+  }
+
+  
+  // reap?
+  bool reap = false;
+  lock.Lock();
+  {
+    reader_running = false;
+    if (!writer_running) reap = true;
+  }
+  lock.Unlock();
+
+  if (reap) {
+    dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader queueing for reap" << endl;
+    ::close(sd);
+    rank.lock.Lock();
+    {
+      rank.pipe_reap_queue.push_back(this);
+      rank.wait_cond.Signal();
+    }
+    rank.lock.Unlock();
+  }
+}
+
+
+/* write msgs to socket.
+ * also, client.
+ */
+void Rank::Pipe::writer()
+{
+  if (!server) {
+    int rc = connect();
+    if (rc < 0) {
+      derr(0) << "pipe(" << peer_inst << ' ' << this << ").writer error connecting" << endl;
+      done = true;
+      list<Message*> out;
+      fail(out);
+    }
+  }
+
+  // loop.
+  lock.Lock();
+  while (!q.empty() || !done) {
+    
+    if (!q.empty()) {
+      dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer grabbing message(s)" << endl;
+      
+      // grab outgoing list
+      list<Message*> out;
+      out.swap(q);
+      
+      // drop lock while i send these
+      lock.Unlock();
+      
+      while (!out.empty()) {
+        Message *m = out.front();
+        out.pop_front();
+
+        dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sending " << *m << endl;
+
+        // stamp.
+        m->set_source_inst(rank.my_inst);
+        
+        // marshall
+        if (m->empty_payload())
+          m->encode_payload();
+        
+        if (write_message(m) < 0) {
+          // failed!
+          derr(0) << "pipe(" << peer_inst << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest() << endl;
+          out.push_front(m);
+          fail(out);
+          done = true;
+          break;
+        }
+
+        // did i just send a close?
+        if (m->get_type() == MSG_CLOSE) 
+          done = true;
+
+        // clean up
+        delete m;
+      }
+
+      lock.Lock();
+      continue;
+    }
+    
+    // wait
+    dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sleeping" << endl;
+    cond.Wait(lock);
+  }
+  lock.Unlock(); 
+  
+  dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer finishing" << endl;
+
+  // reap?
+  bool reap = false;
+  lock.Lock();
+  {
+    writer_running = false;
+    if (!reader_running) reap = true;
+  }
+  lock.Unlock();
+  
+  if (reap) {
+    dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer queueing for reap" << endl;
+    ::close(sd);
+    rank.lock.Lock();
+    {
+      rank.pipe_reap_queue.push_back(this);
+      rank.wait_cond.Signal();
+    }
+    rank.lock.Unlock();
+  }
+}
+
+
+Message *Rank::Pipe::read_message()
+{
+  // envelope
+  //dout(10) << "receiver.read_message from sd " << sd  << endl;
+  
+  msg_envelope_t env;
+  if (!tcp_read( sd, (char*)&env, sizeof(env) ))
+    return 0;
+  
+  dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader got envelope type=" << env.type 
+           << " src " << env.source << " dst " << env.dest
+           << " nchunks=" << env.nchunks
+           << endl;
+  
+  // payload
+  bufferlist blist;
+  for (int i=0; i<env.nchunks; i++) {
+    int size;
+    if (!tcp_read( sd, (char*)&size, sizeof(size) )) return 0;
+    
+    if (size == 0) continue;
+
+    bufferptr bp = new buffer(size);
+    
+    if (!tcp_read( sd, bp.c_str(), size )) return 0;
+    
+    blist.push_back(bp);
+    
+    dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader got frag " << i << " of " << env.nchunks 
+             << " len " << bp.length() << endl;
+  }
+  
+  // unmarshall message
+  size_t s = blist.length();
+  Message *m = decode_message(env, blist);
+  
+  dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader got " << s << " byte message from " 
+           << m->get_source() << endl;
+  
+  return m;
+}
+
+
+
+int Rank::Pipe::write_message(Message *m)
+{
+  // get envelope, buffers
+  msg_envelope_t *env = &m->get_envelope();
+  bufferlist blist;
+  blist.claim( m->get_payload() );
+  
+#ifdef TCP_KEEP_CHUNKS
+  env->nchunks = blist.buffers().size();
+#else
+  env->nchunks = 1;
+#endif
+
+  dout(20)// << g_clock.now() 
+            << "pipe(" << peer_inst << ' ' << this << ").writer sending " << m << " " << *m 
+            << " to " << m->get_dest()
+            << endl;
+  
+  // send envelope
+  int r = tcp_write( sd, (char*)env, sizeof(*env) );
+  if (r < 0) { 
+    derr(0) << "pipe(" << peer_inst << ' ' << this << ").writer error sending envelope for " << *m
+             << " to " << m->get_dest() << endl; 
+    return -1;
+  }
+
+  // payload
+#ifdef TCP_KEEP_CHUNKS
+  // send chunk-wise
+  int i = 0;
+  for (list<bufferptr>::iterator it = blist.buffers().begin();
+       it != blist.buffers().end();
+       it++) {
+    dout(10) << "pipe(" << peer_inst << ' ' << this << ").writer tcp_sending frag " << i << " len " << (*it).length() << endl;
+    int size = (*it).length();
+    r = tcp_write( sd, (char*)&size, sizeof(size) );
+    if (r < 0) { 
+      derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending chunk len for " << *m << " to " << m->get_dest() << endl; 
+      return -1;
+    }
+    r = tcp_write( sd, (*it).c_str(), size );
+    if (r < 0) { 
+      derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data chunk for " << *m << " to " << m->get_dest() << endl; 
+      return -1;
+    }
+    i++;
+  }
+#else
+  // one big chunk
+  int size = blist.length();
+  r = tcp_write( sd, (char*)&size, sizeof(size) );
+  if (r < 0) { 
+    derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data len for " << *m << " to " << m->get_dest() << endl; 
+    return -1;
+  }
+  dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer data len is " << size << " in " << blist.buffers().size() << " buffers" << endl;
+
+  for (list<bufferptr>::iterator it = blist.buffers().begin();
+       it != blist.buffers().end();
+       it++) {
+    if ((*it).length() == 0) continue;  // blank buffer.
+    r = tcp_write( sd, (*it).c_str(), (*it).length() );
+    if (r < 0) { 
+      derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data megachunk for " << *m << " to " << m->get_dest() << " : len " << (*it).length() << endl; 
+      return -1;
+    }
+  }
+#endif
+  
+  return 0;
+}
+
+
+void Rank::Pipe::fail(list<Message*>& out)
+{
+  derr(10) << "pipe(" << peer_inst << ' ' << this << ").fail" << endl;
+
+  // tell namer
+  if (!rank.messenger) {
+    derr(0) << "FATAL error: can't send failure to namer0, not connected yet" << endl;
+    assert(0);
+  }
+
+  // FIXME: possible race before i reclaim lock here?
+  
+  // deactivate myself
+  rank.lock.Lock();
+  {
+    if (rank.rank_pipe.count(peer_inst.rank) &&
+        rank.rank_pipe[peer_inst.rank] == this)
+      rank.rank_pipe.erase(peer_inst.rank);
+  }
+  rank.lock.Unlock();
+
+  // what do i do about reader()?   FIXME
+
+  // sort my messages by (source) dispatcher, dest.
+  map<Dispatcher*, map<msg_addr_t, list<Message*> > > by_dis;
+  lock.Lock();
+  {
+    // include out at front of queue
+    q.splice(q.begin(), out);  
+
+    // sort
+    while (!q.empty()) {
+      if (q.front()->get_type() == MSG_CLOSE) {
+        delete q.front();
+      } 
+      else if (rank.local.count(q.front()->get_source())) {
+        Dispatcher *dis = rank.local[q.front()->get_source()]->get_dispatcher();
+        by_dis[dis][q.front()->get_dest()].push_back(q.front());
+      } 
+      else {
+        // oh well.  sending entity musta just shut down?
+        assert(0);
+        delete q.front();
+      }
+      q.pop_front();
+    }
+  }
+  lock.Unlock();
+
+  // report failure(s) to dispatcher(s)
+  for (map<Dispatcher*, map<msg_addr_t, list<Message*> > >::iterator i = by_dis.begin();
+       i != by_dis.end();
+       ++i) 
+    for (map<msg_addr_t, list<Message*> >::iterator j = i->second.begin();
+         j != i->second.end();
+         ++j) 
+      for (list<Message*>::iterator k = j->second.begin();
+           k != j->second.end();
+           ++k) {
+       derr(0) << "pipe(" << peer_inst << ' ' << this << ").fail on " << **k << " to " << j->first << " inst " << peer_inst << endl;
+        i->first->ms_handle_failure(*k, j->first, peer_inst);
+      }
+}
+
+
+
+
+
+
+/********************************************
+ * Rank
+ */
+
+Rank::Rank(int r) : 
+  single_dispatcher(this),
+  my_rank(r),
+  namer(0) {
+}
+Rank::~Rank()
+{
+  //FIXME
+  if (namer) delete namer;
+}
+
+
+void Rank::_submit_single_dispatch(Message *m)
+{
+  assert(lock.is_locked());
+
+  if (local.count(m->get_dest()) &&
+      local[m->get_dest()]->is_ready()) {
+    rank.single_dispatch_queue.push_back(m);
+    rank.single_dispatch_cond.Signal();
+  } else {
+    waiting_for_ready[m->get_dest()].push_back(m);
+  }
+}
+
+
+void Rank::single_dispatcher_entry()
+{
+  lock.Lock();
+  while (!single_dispatch_stop || !single_dispatch_queue.empty()) {
+    if (!single_dispatch_queue.empty()) {
+      list<Message*> ls;
+      ls.swap(single_dispatch_queue);
+
+      lock.Unlock();
+      {
+        while (!ls.empty()) {
+          Message *m = ls.front();
+          ls.pop_front();
+          
+          dout(1) //<< g_clock.now() 
+                  << "---- " 
+                  << m->get_source()// << ':' << m->get_source_port() 
+                  << " to " << m->get_dest()// << ':' << m->get_dest_port()
+                  << " ---- " << m->get_type_name() 
+                  << " ---- " << m 
+                  << endl;
+          
+          if (m->get_dest().type() == MSG_ADDR_RANK_BASE)
+            rank.dispatch(m);
+          else {
+            assert(local.count(m->get_dest()));
+            local[m->get_dest()]->dispatch(m);
+          }
+        }
+      }
+      lock.Lock();
+      continue;
+    }
+    single_dispatch_cond.Wait(lock);
+  }
+  lock.Unlock();
+}
+
+
+/*
+ * note: assumes lock is held
+ */
+void Rank::reaper()
+{
+  dout(10) << "reaper" << endl;
+  assert(lock.is_locked());
+
+  while (!pipe_reap_queue.empty()) {
+    Pipe *p = pipe_reap_queue.front();
+    dout(10) << "reaper reaping pipe " << p->get_peer_inst() << endl;
+    pipe_reap_queue.pop_front();
+    assert(pipes.count(p));
+    pipes.erase(p);
+    p->join();
+    dout(10) << "reaper reaped pipe " << p->get_peer_inst() << endl;
+    delete p;
+  }
+}
+
+
+int Rank::start_rank()
+{
+  dout(10) << "start_rank" << endl;
+
+  // bind to a socket
+  if (accepter.start() < 0) 
+    return -1;
+
+  // start single thread dispatcher?
+  if (g_conf.ms_single_dispatch) {
+    single_dispatch_stop = false;
+    single_dispatcher.create();
+  }    
+
+  lock.Lock();
+
+  // my_inst
+  my_inst.addr = accepter.listen_addr;
+  my_inst.rank = my_rank;
+
+  if (my_rank < 0) {
+    dout(10) << "start_rank connecting to namer0" << endl;
+    
+    // connect to namer
+    assert(entity_map.count(MSG_ADDR_NAMER(0)));
+    Pipe *pipe = connect_rank(entity_map[MSG_ADDR_NAMER(0)]);
+    
+    // send
+    Message *m = new MNSConnect(accepter.listen_addr);
+    m->set_dest(MSG_ADDR_NAMER(0), 0);
+    pipe->send(m);
+    
+    // wait
+    while (my_rank < 0) 
+      waiting_for_rank.Wait(lock);
+    assert(my_rank >= 0);    
+    
+    dout(10) << "start_rank got rank " << my_rank << endl;
+    
+    // create rank entity
+    entity_map[MSG_ADDR_RANK(my_rank)] = my_inst;
+    local[MSG_ADDR_RANK(my_rank)] = messenger = new EntityMessenger(MSG_ADDR_RANK(my_rank));
+    messenger->set_dispatcher(this);
+  } else {
+    // create my rank
+    msg_addr_t raddr = MSG_ADDR_RANK(my_rank);
+    entity_map[raddr] = my_inst;
+    entity_unstarted.insert(raddr);
+    local[raddr] = messenger = new EntityMessenger(raddr);
+    messenger->set_dispatcher(this);
+    
+    dout(0) << "start_rank " << my_rank << " at " << my_inst << endl;
+  } 
+
+  lock.Unlock();
+  return 0;
+}
+
+void Rank::start_namer()
+{
+  // create namer0
+  msg_addr_t naddr = MSG_ADDR_NAMER(0);
+  entity_map[naddr] = my_inst;
+  local[naddr] = new EntityMessenger(naddr);
+  namer = new Namer(local[naddr]);
+  namer_inst = my_inst;
+}
+
+void Rank::set_namer(const tcpaddr_t& ns)
+{
+  namer_inst.addr = entity_map[MSG_ADDR_NAMER(0)].addr = ns;
+  namer_inst.rank = entity_map[MSG_ADDR_NAMER(0)].rank = 0;
+}
+
+/* connect_rank
+ * NOTE: assumes rank.lock held.
+ */
+Rank::Pipe *Rank::connect_rank(const entity_inst_t& inst)
+{
+  assert(rank.lock.is_locked());
+  assert(inst != rank.my_inst);
+  
+  dout(10) << "connect_rank to " << inst << endl;
+  
+  // create pipe
+  Pipe *pipe = new Pipe(inst);
+  rank.rank_pipe[inst.rank] = pipe;
+  pipes.insert(pipe);
+
+  return pipe;
+}
+
+
+
+
+
+void Rank::show_dir()
+{
+  dout(10) << "show_dir ---" << endl;
+  
+  for (hash_map<msg_addr_t, entity_inst_t>::iterator i = entity_map.begin();
+       i != entity_map.end();
+       i++) {
+    if (local.count(i->first)) {
+      dout(10) << "show_dir entity_map " << i->first << " -> " << i->second << " local " << endl;
+    } else {
+      dout(10) << "show_dir entity_map " << i->first << " -> " << i->second << endl;
+    }
+  }
+}
+
+
+/* lookup
+ * NOTE: assumes directory.lock held
+ */
+void Rank::lookup(msg_addr_t addr)
+{
+  dout(10) << "lookup " << addr << endl;
+  assert(lock.is_locked());
+
+  assert(looking_up.count(addr) == 0);
+  looking_up.insert(addr);
+
+  MNSLookup *r = new MNSLookup(addr);
+  messenger->send_message(r, MSG_ADDR_NAMER(0), namer_inst);
+}
+
+
+
+/* register_entity 
+ */
+Rank::EntityMessenger *Rank::register_entity(msg_addr_t addr)
+{
+  dout(10) << "register_entity " << addr << endl;
+  lock.Lock();
+  
+  // register with namer
+  static long reg_attempt = 0;
+  long id = ++reg_attempt;
+  
+  Message *reg = new MNSRegister(addr, my_rank, id);
+  reg->set_source(MSG_ADDR_RANK(my_rank), 0);
+  reg->set_source_inst(my_inst);
+  reg->set_dest(MSG_ADDR_DIRECTORY, 0);
+  
+  // prepare cond
+  Cond cond;
+  waiting_for_register_cond[id] = &cond;
+  
+  // send request
+  lock.Unlock();
+  submit_message(reg);
+  lock.Lock();
+  
+  // wait
+  while (!waiting_for_register_result.count(id))
+    cond.Wait(lock);
+  
+  // grab result
+  addr = waiting_for_register_result[id];
+  dout(10) << "register_entity got " << addr << endl;
+  
+  // clean up
+  waiting_for_register_cond.erase(id);
+  waiting_for_register_result.erase(id);
+  
+  // create messenger
+  EntityMessenger *msgr = new EntityMessenger(addr);
+
+  // add to directory
+  entity_map[addr] = my_inst;
+  local[addr] = msgr;
+
+  // was anyone waiting?
+  if (waiting_for_lookup.count(addr)) {
+    submit_messages(waiting_for_lookup[addr]);
+    waiting_for_lookup.erase(addr);
+  }
+
+  lock.Unlock();
+  return msgr;
+}
+
+void Rank::unregister_entity(EntityMessenger *msgr)
+{
+  lock.Lock();
+  dout(10) << "unregister_entity " << msgr->get_myaddr() << endl;
+  
+  // remove from local directory.
+  assert(local.count(msgr->get_myaddr()));
+  local.erase(msgr->get_myaddr());
+
+  if (my_rank > 0) {
+    assert(entity_map.count(msgr->get_myaddr()));
+    entity_map.erase(msgr->get_myaddr());
+  } // else namer will do it.
+
+  // tell namer.
+  if (msgr->get_myaddr() != MSG_ADDR_NAMER(0) &&
+      msgr->get_myaddr() != MSG_ADDR_RANK(0))
+    msgr->send_message(new MGenericMessage(MSG_NS_UNREGISTER),
+                       MSG_ADDR_NAMER(0), namer_inst);
+  
+  // kick wait()?
+  if (local.size() <= 2)
+    wait_cond.Signal();   
+
+  lock.Unlock();
+}
+
+
+void Rank::submit_messages(list<Message*>& ls)
+{
+  for (list<Message*>::iterator i = ls.begin(); i != ls.end(); i++)
+    submit_message(*i);
+  ls.clear();
+}
+
+
+
+void Rank::submit_message(Message *m, const entity_inst_t& dest_inst)
+{
+  const msg_addr_t dest = m->get_dest();
+
+  // lookup
+  EntityMessenger *entity = 0;
+  Pipe *pipe = 0;
+
+  lock.Lock();
+  {
+    // local?
+    if (dest_inst.rank == my_inst.rank) {
+      if (local.count(dest)) {
+        // local
+        dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl;
+        if (g_conf.ms_single_dispatch) {
+          _submit_single_dispatch(m);
+        } else {
+          entity = local[dest];
+        }
+      } else {
+        // mid-register
+        dout(20) << "submit_message " << *m << " dest " << dest << " " << dest_inst << " local but mid-register, waiting." << endl;
+        assert(0);  // hmpf
+        waiting_for_lookup[dest].push_back(m);
+      }
+    }
+    else {
+      // remote.
+      if (rank_pipe.count( dest_inst.rank )) {
+        //&&
+        //rank_pipe[dest_inst.rank]->inst == dest_inst) {
+        dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", already connected." << endl;
+        // connected.
+        pipe = rank_pipe[ dest_inst.rank ];
+      } else {
+        dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connecting." << endl;
+        // not connected.
+        pipe = connect_rank( dest_inst );
+      }
+    }
+  }
+  lock.Unlock();
+  
+  // do it
+  if (entity) {  
+    // local!
+    dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl;
+    entity->queue_message(m);
+  } 
+  else if (pipe) {
+    // remote!
+    dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl;
+    pipe->send(m);
+  } 
+}
+
+
+void Rank::submit_message(Message *m)
+{
+  const msg_addr_t dest = m->get_dest();
+
+  // lookup
+  EntityMessenger *entity = 0;
+  Pipe *pipe = 0;
+
+  lock.Lock();
+  {
+    if (local.count(dest)) {
+      dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl;
+
+      // local
+      if (g_conf.ms_single_dispatch) {
+        _submit_single_dispatch(m);
+      } else {
+        entity = local[dest];
+      }
+    } else if (entity_map.count( dest )) {
+      // remote, known rank addr.
+      entity_inst_t inst = entity_map[dest];
+
+      if (inst == my_inst) {
+        dout(20) << "submit_message " << *m << " dest " << dest << " local but mid-register, waiting." << endl;
+        waiting_for_lookup[dest].push_back(m);
+      }
+      else if (rank_pipe.count( inst.rank ) &&
+          rank_pipe[inst.rank]->get_peer_inst() == inst) {
+        dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << inst << ", connected." << endl;
+        // connected.
+        pipe = rank_pipe[ inst.rank ];
+      } else {
+        dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << inst << ", connecting." << endl;
+        // not connected.
+        pipe = connect_rank( inst );
+      }
+    } else {
+      // unknown dest rank or rank addr.
+      if (looking_up.count(dest) == 0) {
+        dout(20) << "submit_message " << *m << " dest " << dest << " remote, unknown addr, looking up" << endl;
+        lookup(dest);
+      } else {
+        dout(20) << "submit_message " << *m << " dest " << dest << " remote, unknown addr, already looking up" << endl;
+      }
+      waiting_for_lookup[dest].push_back(m);
+    }
+  }
+  lock.Unlock();
+  
+  // do it
+  if (entity) {  
+    // local!
+    dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl;
+    entity->queue_message(m);
+  } 
+  else if (pipe) {
+    // remote!
+    dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl;
+    pipe->send(m);
+  } 
+}
+
+
+
+
+void Rank::dispatch(Message *m) 
+{
+  lock.Lock();
+
+  dout(10) << "dispatching " << *m << endl;
+
+  switch (m->get_type()) {
+  case MSG_NS_CONNECTACK:
+    handle_connect_ack((MNSConnectAck*)m);
+    break;
+    
+  case MSG_NS_REGISTERACK:
+    handle_register_ack((MNSRegisterAck*)m);
+    break;
+    
+  case MSG_NS_LOOKUPREPLY:
+    handle_lookup_reply((MNSLookupReply*)m);
+    break;
+    
+  default:
+    assert(0);
+  }
+  
+  lock.Unlock();
+}
+
+void Rank::handle_connect_ack(MNSConnectAck *m) 
+{
+  dout(10) << "handle_connect_ack, my rank is " << m->get_rank() << endl;
+  my_rank = m->get_rank();
+
+  my_inst.addr = accepter.listen_addr;
+  my_inst.rank = my_rank;
+
+  waiting_for_rank.SignalAll();
+  delete m;
+
+  // logger!
+  /*dout(10) << "logger" << endl;
+  char names[100];
+  sprintf(names, "rank%d", my_rank);
+  string name = names;
+  
+  if (g_conf.tcp_log) {
+    logger = new Logger(name, (LogType*)&rank_logtype);
+    rank_logtype.add_set("num");
+    rank_logtype.add_inc("in");
+    rank_logtype.add_inc("inb");
+    rank_logtype.add_inc("dis");
+    rank_logtype.add_set("inq");
+    rank_logtype.add_set("inqb");
+    rank_logtype.add_set("outq");
+    rank_logtype.add_set("outqb");
+  }
+  */
+}
+
+
+void Rank::handle_register_ack(MNSRegisterAck *m) 
+{
+  dout(10) << "handle_register_ack " << m->get_entity() << endl;
+
+  long tid = m->get_tid();
+  waiting_for_register_result[tid] = m->get_entity();
+  waiting_for_register_cond[tid]->Signal();
+  delete m;
+}
+
+void Rank::handle_lookup_reply(MNSLookupReply *m) 
+{
+  list<Message*> waiting;
+  dout(10) << "got lookup reply" << endl;
+  
+  for (map<msg_addr_t,entity_inst_t>::iterator it = m->entity_map.begin();
+       it != m->entity_map.end();
+       it++) {
+    dout(10) << "lookup got " << it->first << " at " << it->second << endl;
+    msg_addr_t addr = it->first;
+    entity_inst_t inst = it->second;
+
+    if (entity_map.count(addr) &&
+        entity_map[addr] > inst) {
+      dout(10) << "ignoring lookup results for " << addr << ", " \
+               << entity_map[addr] << " > " << inst << endl;
+      continue;
+    }
+
+    // update map.
+    entity_map[addr] = inst;
+
+    if (inst.rank == my_rank) {
+      // local
+      dout(10) << "delivering lookup results locally" << endl;
+      if (local.count(addr)) {
+        if (g_conf.ms_single_dispatch) {
+          single_dispatch_queue.splice(single_dispatch_queue.end(),
+                                       waiting_for_lookup[addr]);
+        } else {
+          local[addr]->queue_messages(waiting_for_lookup[addr]);
+        }
+        waiting_for_lookup.erase(addr);
+      } else
+        lookup(addr);  // try again!
+
+    } else {
+      // remote
+      if (rank_pipe.count(inst.rank) == 0) 
+        connect_rank(inst);
+      else if (rank_pipe[inst.rank]->get_peer_inst() != inst) {
+        dout(0) << "lookup got rank addr change, WATCH OUT" << endl;
+        // FIXME BUG possible message loss weirdness?
+        rank_pipe[inst.rank]->close();
+        rank_pipe.erase(inst.rank);
+        connect_rank(inst);
+      }
+      
+      // take waiters
+      Pipe *pipe = rank_pipe[inst.rank];
+      assert(pipe);
+      
+      if (waiting_for_lookup.count(addr)) {
+        pipe->send(waiting_for_lookup[addr]);
+        waiting_for_lookup.erase(addr);
+      }
+    }
+  }
+
+  delete m;
+}
+
+
+void Rank::wait()
+{
+  lock.Lock();
+  while (1) {
+    // reap dead pipes
+    reaper();
+
+    if (local.size() == 0) {
+      dout(10) << "wait: everything stopped" << endl;
+      break;   // everything stopped.
+    }
+
+    if (local.size() == 1 &&
+        !messenger->is_stopped()) {
+      dout(10) << "wait: stopping rank" << endl;
+      lock.Unlock();
+      messenger->shutdown();
+      delete messenger;
+      lock.Lock();
+      continue;
+    }
+
+    wait_cond.Wait(lock);
+  }
+  lock.Unlock();
+
+  // done!  clean up.
+
+  // stop dispatch thread
+  if (g_conf.ms_single_dispatch) {
+    dout(10) << "wait: stopping dispatch thread" << endl;
+    lock.Lock();
+    single_dispatch_stop = true;
+    single_dispatch_cond.Signal();
+    lock.Unlock();
+    single_dispatcher.join();
+  }
+  
+  // reap pipes
+  lock.Lock();
+  {
+    dout(10) << "wait: closing pipes" << endl;
+    list<Pipe*> toclose;
+    for (hash_map<int,Pipe*>::iterator i = rank_pipe.begin();
+         i != rank_pipe.end();
+         i++)
+      toclose.push_back(i->second);
+    for (list<Pipe*>::iterator i = toclose.begin();
+        i != toclose.end();
+        i++)
+      (*i)->close();
+
+    dout(10) << "wait: waiting for pipes " << pipes << " to close" << endl;
+    while (!pipes.empty()) {
+      wait_cond.Wait(lock);
+      reaper();
+    }
+  }
+  lock.Unlock();
+
+  dout(10) << "wait: done." << endl;
+}
+
+
+
+int Rank::find_ns_addr(tcpaddr_t &nsa)
+{
+  // file?
+  int fd = ::open(".ceph_ns",O_RDONLY);
+  if (fd > 0) {
+    ::read(fd, (void*)&nsa, sizeof(nsa));
+    ::close(fd);
+    cout << "ceph ns is " << nsa << endl;
+    return 0;
+  }
+
+  // env var?
+  char *nsaddr = getenv("CEPH_NAMESERVER");////envz_entry(*envp, e_len, "CEPH_NAMESERVER");    
+  if (nsaddr) {
+    while (nsaddr[0] != '=') nsaddr++;
+    nsaddr++;
+    
+    if (tcp_hostlookup(nsaddr, nsa) < 0) {
+      cout << "can't resolve " << nsaddr << endl;
+      return -1;
+    }
+
+    cout << "ceph ns is " << nsa << endl;
+    return 0;
+  }
+
+  cerr << "i can't find ceph ns addr in .ceph_ns or CEPH_NAMESERVER" << endl;
+  return -1;
+}
+
+
+
+/**********************************
+ * EntityMessenger
+ */
+
+Rank::EntityMessenger::EntityMessenger(msg_addr_t myaddr) :
+  Messenger(myaddr),
+  stop(false),
+  dispatch_thread(this)
+{
+}
+Rank::EntityMessenger::~EntityMessenger()
+{
+}
+
+void Rank::EntityMessenger::dispatch_entry()
+{
+  lock.Lock();
+  while (!stop) {
+    if (!dispatch_queue.empty()) {
+      list<Message*> ls;
+      ls.swap(dispatch_queue);
+
+      lock.Unlock();
+      {
+        // deliver
+        while (!ls.empty()) {
+          Message *m = ls.front();
+          ls.pop_front();
+          dout(1) //<< g_clock.now()
+                  << "---- " 
+                  << m->get_source()// << ':' << m->get_source_port() 
+                  << " to " << m->get_dest()// << ':' << m->get_dest_port()
+                  << " ---- " << m->get_type_name() 
+                  << " ---- " << m->get_source_inst()
+                  << " ---- " << m 
+                  << endl;
+          dispatch(m);
+        }
+      }
+      lock.Lock();
+      continue;
+    }
+    cond.Wait(lock);
+  }
+  lock.Unlock();
+}
+
+void Rank::EntityMessenger::ready()
+{
+  dout(10) << "ready " << get_myaddr() << endl;
+
+  if (g_conf.ms_single_dispatch) {
+    rank.lock.Lock();
+    if (rank.waiting_for_ready.count(get_myaddr())) {
+      rank.single_dispatch_queue.splice(rank.single_dispatch_queue.end(),
+                                        rank.waiting_for_ready[get_myaddr()]);
+      rank.waiting_for_ready.erase(get_myaddr());
+      rank.single_dispatch_cond.Signal();
+    }
+    rank.lock.Unlock();
+  } else {
+    // start my dispatch thread
+    dispatch_thread.create();
+  }
+
+  // tell namer
+  if (get_myaddr() != MSG_ADDR_NAMER(0) &&
+      get_myaddr() != MSG_ADDR_RANK(0))
+    send_message(new MGenericMessage(MSG_NS_STARTED), MSG_ADDR_NAMER(0), rank.namer_inst);
+}
+
+
+int Rank::EntityMessenger::shutdown()
+{
+  dout(10) << "shutdown " << get_myaddr() << endl;
+
+  // deregister
+  rank.unregister_entity(this);
+
+  // stop my dispatch thread
+  if (dispatch_thread.am_self()) {
+    dout(0) << "shutdown i am dispatch, setting stop flag" << endl;
+    stop = true;
+  } else {
+    dout(0) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl;
+    lock.Lock();
+    stop = true;
+    cond.Signal();
+    lock.Unlock();
+    dispatch_thread.join();
+  }
+
+  return 0;
+}
+
+
+void Rank::EntityMessenger::prepare_dest(const entity_inst_t& inst)
+{
+  rank.lock.Lock();
+  {
+    if (rank.rank_pipe.count(inst.rank) == 0)
+      rank.connect_rank(inst);
+  }
+  rank.lock.Unlock();
+}
+
+int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst,
+                                       int port, int fromport)
+{
+  // set envelope
+  m->set_source(get_myaddr(), fromport);
+  m->set_dest(dest, port);
+
+  m->set_source_inst(rank.my_inst);
+
+  dout(1) << "--> " 
+          << m->get_source() //<< ':' << m->get_source_port() 
+          << " to " << m->get_dest() //<< ':' << m->get_dest_port()
+          << " ---- " << m->get_type_name() 
+          << " ---- " << rank.my_inst << " --> " << inst
+          << " ---- " << m 
+          << endl;
+
+  rank.submit_message(m, inst);
+
+  return 0;
+}
+
+
+int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
+{
+  // set envelope
+  m->set_source(get_myaddr(), fromport);
+  m->set_dest(dest, port);
+
+  m->set_source_inst(rank.my_inst);
+
+  dout(1) << "--> " 
+          << m->get_source() //<< ':' << m->get_source_port() 
+          << " to " << m->get_dest() //<< ':' << m->get_dest_port()
+          << " ---- " << m->get_type_name() 
+          << " ---- " << rank.my_inst << " --> ? (DEPRECATED)"
+          << " ---- " << m 
+          << endl;
+
+  rank.submit_message(m);
+
+  return 0;
+}
+
+
+void Rank::EntityMessenger::mark_down(msg_addr_t a, entity_inst_t& i)
+{
+  assert(a != get_myaddr());
+  rank.mark_down(a,i);
+}
+
+void Rank::mark_down(msg_addr_t a, entity_inst_t& inst)
+{
+  //if (my_rank == 0) return;   // ugh.. rank0 already handles this stuff in the namer
+  lock.Lock();
+  if (entity_map.count(a) &&
+      entity_map[a] > inst) {
+    dout(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << endl;
+    derr(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << endl;
+    // do nothing!
+  } else {
+    if (entity_map.count(a) == 0) {
+      // don't know it
+      dout(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << endl;
+      derr(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << endl;
+      
+      waiting_for_lookup.erase(a);
+      looking_up.erase(a);
+    } else {
+      // know it
+      assert(entity_map[a] <= inst);
+      dout(10) << "mark_down " << a << " inst " << inst << endl;
+      derr(10) << "mark_down " << a << " inst " << inst << endl;
+      
+      entity_map.erase(a);
+      
+      if (rank_pipe.count(inst.rank)) {
+       rank_pipe[inst.rank]->close();
+       rank_pipe.erase(inst.rank);
+      }
+      
+      // kill rank# too?  only if i'm the namer.
+      if (my_rank == 0) {
+       entity_map.erase(MSG_ADDR_RANK(inst.rank));
+      }
+    }
+  }
+  lock.Unlock();
+}
+
+void Rank::EntityMessenger::mark_up(msg_addr_t a, entity_inst_t& i)
+{
+  assert(a != get_myaddr());
+  rank.mark_up(a, i);
+}
+
+void Rank::mark_up(msg_addr_t a, entity_inst_t& i)
+{
+  if (my_rank == 0) return;
+  lock.Lock();
+  {
+    dout(10) << "mark_up " << a << " inst " << i << endl;
+    derr(10) << "mark_up " << a << " inst " << i << endl;
+
+    assert(i.rank != my_rank);     // hrm?
+    
+    if (entity_map.count(a) == 0 ||
+        entity_map[a] < i) {
+      entity_map[a] = i;
+      connect_rank(i);
+    } else if (entity_map[a] == i) {
+      dout(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+      derr(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+    } else {
+      dout(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+      derr(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+    }
+
+    //if (waiting_for_lookup.count(a))
+    //lookup(a);
+  }
+  lock.Unlock();
+}
+
diff --git a/ceph/msg/NewerMessenger.h b/ceph/msg/NewerMessenger.h
new file mode 100644 (file)
index 0000000..6a4e003
--- /dev/null
@@ -0,0 +1,343 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __NEWMESSENGER_H
+#define __NEWMESSENGER_H
+
+
+#include <list>
+#include <map>
+using namespace std;
+#include <ext/hash_map>
+#include <ext/hash_set>
+using namespace __gnu_cxx;
+
+
+#include "include/types.h"
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+
+#include "Messenger.h"
+#include "Message.h"
+#include "tcp.h"
+
+
+
+
+/* Rank - per-process
+ */
+class Rank : public Dispatcher {
+  class EntityMessenger;
+  class Pipe;
+
+  // namer
+  class Namer : public Dispatcher {
+  public:
+    EntityMessenger *messenger;  // namerN
+
+    int nrank;
+    int nclient, nmds, nosd, nmon;
+    
+    map<msg_addr_t, list<Message*> > waiting;
+
+    Namer(EntityMessenger *msgr);
+    ~Namer();
+
+    void handle_connect(class MNSConnect*);
+    void handle_register(class MNSRegister *m);
+    void handle_started(Message *m);
+    void handle_lookup(class MNSLookup *m);
+    void handle_unregister(Message *m);
+    void handle_failure(class MNSFailure *m);
+
+    void dispatch(Message *m); 
+
+    void manual_insert_inst(const entity_inst_t &inst);
+
+  };
+
+  // incoming
+  class Accepter : public Thread {
+  public:
+    bool done;
+
+    tcpaddr_t listen_addr;
+    int       listen_sd;
+    
+    Accepter() : done(false) {}
+    
+    void *entry();
+    void stop() {
+      done = true;
+      ::close(listen_sd);
+      join();
+    }
+    int start();
+  } accepter;
+  
+
+
+  class Pipe {
+  protected:
+    int sd;
+    bool done;
+    entity_inst_t peer_inst;
+    bool server;
+    bool sent_close;
+
+    bool reader_running;
+    bool writer_running;
+
+    list<Message*> q;
+    Mutex lock;
+    Cond cond;
+    
+    int accept();   // server handshake
+    int connect();  // client handshake
+    void reader();
+    void writer();
+
+    Message *read_message();
+    int write_message(Message *m);
+    void fail(list<Message*>& ls);
+
+    // threads
+    class Reader : public Thread {
+      Pipe *pipe;
+    public:
+      Reader(Pipe *p) : pipe(p) {}
+      void *entry() { pipe->reader(); return 0; }
+    } reader_thread;
+    friend class Reader;
+
+    class Writer : public Thread {
+      Pipe *pipe;
+    public:
+      Writer(Pipe *p) : pipe(p) {}
+      void *entry() { pipe->writer(); return 0; }
+    } writer_thread;
+    friend class Writer;
+
+  public:
+    Pipe(int s) : sd(s),
+      done(false), server(true), 
+      sent_close(false),
+      reader_running(false), writer_running(false),
+      reader_thread(this), writer_thread(this) {
+      // server
+      reader_running = true;
+      reader_thread.create();
+    }
+    Pipe(const entity_inst_t &pi) : sd(0),
+      done(false), peer_inst(pi), server(false), 
+      sent_close(false),
+      reader_running(false), writer_running(false),
+      reader_thread(this), writer_thread(this) {
+      // client
+      writer_running = true;
+      writer_thread.create();
+    }
+
+    // public constructors
+    static const Pipe& Server(int s);
+    static const Pipe& Client(const entity_inst_t& pi);
+
+    entity_inst_t& get_peer_inst() { return peer_inst; }
+
+    void close();
+    void join() {
+      writer_thread.join();
+      reader_thread.join();
+    }
+
+    void send(Message *m) {
+      lock.Lock();
+      q.push_back(m);
+      cond.Signal();
+      lock.Unlock();
+    }    
+    void send(list<Message*>& ls) {
+      lock.Lock();
+      q.splice(q.end(), ls);
+      cond.Signal();
+      lock.Unlock();
+    }
+  };
+
+
+
+  // messenger interface
+  class EntityMessenger : public Messenger {
+    Mutex lock;
+    Cond cond;
+    list<Message*> dispatch_queue;
+    bool stop;
+
+    class DispatchThread : public Thread {
+      EntityMessenger *m;
+    public:
+      DispatchThread(EntityMessenger *_m) : m(_m) {}
+      void *entry() {
+        m->dispatch_entry();
+        return 0;
+      }
+    } dispatch_thread;
+    void dispatch_entry();
+
+  public:
+    void queue_message(Message *m) {
+      lock.Lock();
+      dispatch_queue.push_back(m);
+      cond.Signal();
+      lock.Unlock();
+    }
+    void queue_messages(list<Message*> ls) {
+      lock.Lock();
+      dispatch_queue.splice(dispatch_queue.end(), ls);
+      cond.Signal();
+      lock.Unlock();
+    }
+
+  public:
+    EntityMessenger(msg_addr_t myaddr);
+    ~EntityMessenger();
+
+    void ready();
+    bool is_stopped() { return stop; }
+
+    void wait() {
+      dispatch_thread.join();
+    }
+    
+    virtual void callback_kick() {} 
+    virtual int shutdown();
+    virtual void prepare_dest(const entity_inst_t& inst);
+    virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
+    virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst,
+                                                        int port=0, int fromport=0);
+
+    virtual void mark_down(msg_addr_t a, entity_inst_t& i);
+    virtual void mark_up(msg_addr_t a, entity_inst_t& i);
+    //virtual void reset(msg_addr_t a);
+  };
+
+
+  class SingleDispatcher : public Thread {
+    Rank *rank;
+  public:
+    SingleDispatcher(Rank *r) : rank(r) {}
+    void *entry() {
+      rank->single_dispatcher_entry();
+      return 0;
+    }
+  } single_dispatcher;
+
+  Cond            single_dispatch_cond;
+  bool            single_dispatch_stop;
+  list<Message*>  single_dispatch_queue;
+
+  map<msg_addr_t, list<Message*> > waiting_for_ready;
+
+  void single_dispatcher_entry();
+  void _submit_single_dispatch(Message *m);
+
+
+  // Rank stuff
+ public:
+  Mutex lock;
+  Cond  wait_cond;  // for wait()
+  
+  // my rank
+  int   my_rank;
+  Cond  waiting_for_rank;
+
+  // my instance
+  entity_inst_t my_inst;
+  
+  // lookup
+  hash_map<msg_addr_t, entity_inst_t> entity_map;
+  hash_set<msg_addr_t>                entity_unstarted;
+  
+  map<msg_addr_t, list<Message*> > waiting_for_lookup;
+  set<msg_addr_t>                  looking_up;
+
+  // register
+  map<int, Cond* >        waiting_for_register_cond;
+  map<int, msg_addr_t >   waiting_for_register_result;
+  
+  // local
+  map<msg_addr_t, EntityMessenger*> local;
+  
+  // remote
+  hash_map<int, Pipe*> rank_pipe;
+
+  set<Pipe*>      pipes;
+  list<Pipe*>     pipe_reap_queue;
+    
+  EntityMessenger *messenger;   // rankN
+  Namer           *namer;
+
+  entity_inst_t    namer_inst;
+
+  void show_dir();
+
+  void lookup(msg_addr_t addr);
+  
+  void dispatch(Message *m);
+  void handle_connect_ack(class MNSConnectAck *m);
+  void handle_register_ack(class MNSRegisterAck *m);
+  void handle_lookup_reply(class MNSLookupReply *m);
+  
+  Pipe *connect_rank(const entity_inst_t& inst);
+
+  void mark_down(msg_addr_t addr, entity_inst_t& i);
+  void mark_up(msg_addr_t addr, entity_inst_t& i);
+
+  tcpaddr_t get_listen_addr() { return accepter.listen_addr; }
+
+  void reaper();
+
+
+public:
+  Rank(int r=-1);
+  ~Rank();
+
+  int find_ns_addr(tcpaddr_t &tcpaddr);
+
+  void set_namer(const tcpaddr_t& ns);
+  void start_namer();
+
+  int start_rank();
+  void wait();
+
+  EntityMessenger *register_entity(msg_addr_t addr);
+  void unregister_entity(EntityMessenger *ms);
+
+  void submit_message(Message *m, const entity_inst_t& inst);  
+  void prepare_dest(const entity_inst_t& inst);
+  void submit_message(Message *m);  
+  void submit_messages(list<Message*>& ls);  
+
+  // create a new messenger
+  EntityMessenger *new_entity(msg_addr_t addr);
+
+} ;
+
+
+
+extern Rank rank;
+
+#endif
index b0130156a6fe1904c8f9fc52c3db65cf819de1b2..075c23f1d51a66be037b39ae0d47a48c80778eb4 100644 (file)
@@ -27,7 +27,7 @@ using namespace std;
 #include "client/Client.h"
 #include "client/SyntheticClient.h"
 
-#include "msg/NewMessenger.h"
+#include "msg/NewerMessenger.h"
 
 #include "common/Timer.h"