]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: kill static instance 'rank' of SimpleMessenger v0.7.3
authorSage Weil <sage@newdream.net>
Fri, 1 May 2009 14:11:46 +0000 (07:11 -0700)
committerSage Weil <sage@newdream.net>
Fri, 1 May 2009 14:12:18 +0000 (07:12 -0700)
src/ceph.cc
src/cfuse.cc
src/cmds.cc
src/cmon.cc
src/cosd.cc
src/csyn.cc
src/dumpjournal.cc
src/mon/MonClient.cc
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/testmsgr.cc

index eff440dfe0af007f3fd6a1b19eba157c6e353727..5884b0e9ba55060a3f8a11d94d1face1d4cddd01 100644 (file)
@@ -587,12 +587,13 @@ int main(int argc, const char **argv, const char *envp[])
     return -1;
   
   // start up network
+  SimpleMessenger rank;
   rank.bind();
   messenger = rank.register_entity(entity_name_t::ADMIN());
   messenger->set_dispatcher(&dispatcher);
 
   rank.start();
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
 
   if (watch) {
     lock.Lock();
index b502e6099d0342ee6a469034bd1b003ed853931d..f44aed720d8d518fe1966304178bb5a79edae80e 100644 (file)
@@ -69,6 +69,7 @@ int main(int argc, const char **argv, const char *envp[]) {
     return -1;
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
   cout << "bound to " << rank.get_rank_addr() << ", mounting ceph" << std::endl;
 
@@ -76,9 +77,9 @@ int main(int argc, const char **argv, const char *envp[]) {
 
   rank.start();
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
 
   // start client
   client->init();
index a1603dd3171424a43c1f7d119d1b9c0fb09c1b70..2e76b687ea7d3f0d1c048fc7a4a788a07c536e26 100644 (file)
@@ -68,6 +68,7 @@ int main(int argc, const char **argv)
   if (mc.get_monmap(&monmap) < 0)
     return -1;
 
+  SimpleMessenger rank;
   rank.bind();
   cout << "starting mds." << g_conf.id
        << " at " << rank.get_rank_addr() 
@@ -79,10 +80,10 @@ int main(int argc, const char **argv)
   if (!m)
     return 1;
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossless());  // mds does its own timeout/markdown
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless());  // mds does its own timeout/markdown
 
   rank.start();
   
index d84e217e8a201f41644265d5484f7005b2013173..d75e861095824a58a549c51af85790b645816986 100644 (file)
@@ -122,6 +122,8 @@ int main(int argc, const char **argv)
   }
 
   // bind
+  SimpleMessenger rank;
+
   cout << "starting mon" << whoami 
        << " at " << monmap.get_inst(whoami).addr
        << " mon_data " << g_conf.mon_data
@@ -141,12 +143,12 @@ int main(int argc, const char **argv)
 
   rank.start();  // may daemonize
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless());
 
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_ADMIN, SimpleMessenger::Policy::lossy_fast_fail());
 
 
   mon->init();
index c49908d0741fad44b58bcd72a92bb3624d5f7408..0ff53dee2272b5051f8ba9b9b31913fa50456572 100644 (file)
@@ -122,6 +122,7 @@ int main(int argc, const char **argv)
   }
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
 
   cout << "starting osd" << whoami
@@ -142,14 +143,14 @@ int main(int argc, const char **argv)
   if (!hbm)
     return 1;
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
 
   // make a _reasonable_ effort to send acks/replies to requests, but
   // don't get carried away, as the sender may go away and we won't
   // ever hear about it.
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
 
   rank.start();
 
index 58110fa6a696ad3bbd5f15ea7e1a9809aacecec7..62cd08a0bd3d5d4c877923c61a63374d814a7188 100644 (file)
@@ -56,12 +56,13 @@ int main(int argc, const char **argv, char *envp[])
     return -1;
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
   cout << "starting csyn at " << rank.get_rank_addr() << std::endl;
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
 
   list<Client*> clients;
   list<SyntheticClient*> synclients;
index 158ff642ffcff135acbfc93a0b7a69310311fc37..00b39721027c569eaac840ef412e3c6098cca0d0 100644 (file)
@@ -88,6 +88,7 @@ int main(int argc, const char **argv, const char *envp[])
     return -1;
   
   // start up network
+  SimpleMessenger rank;
   rank.bind();
   g_conf.daemonize = false; // not us!
   rank.start();
index 73888a98ceadcfbf4c5dcf7d25cc7ee1d4c9045d..65f7964083935d300d544991b346a51161c29e49 100644 (file)
@@ -34,7 +34,9 @@ int MonClient::probe_mon(MonMap *pmonmap)
     cerr << "couldn't parse ip:port(s) from '" << g_conf.mon_host << "'" << std::endl;
     return -1;
   }
-  
+
+  SimpleMessenger rank; 
+
   rank.bind();
   dout(1) << " connecting to monitor(s) at " << monaddrs << " ..." << dendl;
   
index b967ca0d83027128d4115f5cd7ca394acfd42a1d..22822405ed09810783c6f10560a6d1fccff4193b 100644 (file)
@@ -38,9 +38,9 @@
 
 #define DOUT_SUBSYS ms
 #undef dout_prefix
-#define dout_prefix _prefix()
-static ostream& _prefix() {
-  return *_dout << dbeginl << pthread_self() << " -- " << rank.rank_addr << " ";
+#define dout_prefix _prefix(rank)
+static ostream& _prefix(SimpleMessenger *rank) {
+  return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " ";
 }
 
 
@@ -53,8 +53,6 @@ static ostream& _prefix() {
 #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
 
 
-Rank rank;
-
 #ifdef DARWIN
 sig_t old_sigint_handler = 0;
 #else
@@ -70,7 +68,7 @@ void noop_signal_handler(int s)
   //dout(0) << "blah_handler got " << s << dendl;
 }
 
-int Rank::Accepter::bind(int64_t force_nonce)
+int SimpleMessenger::Accepter::bind(int64_t force_nonce)
 {
   // bind to a socket
   dout(10) << "accepter.bind" << dendl;
@@ -152,7 +150,7 @@ int Rank::Accepter::bind(int64_t force_nonce)
   return 0;
 }
 
-int Rank::Accepter::start()
+int SimpleMessenger::Accepter::start()
 {
   dout(1) << "accepter.start" << dendl;
 
@@ -171,7 +169,7 @@ int Rank::Accepter::start()
   return 0;
 }
 
-void *Rank::Accepter::entry()
+void *SimpleMessenger::Accepter::entry()
 {
   dout(10) << "accepter starting" << dendl;
   
@@ -229,7 +227,7 @@ void *Rank::Accepter::entry()
   return 0;
 }
 
-void Rank::Accepter::stop()
+void SimpleMessenger::Accepter::stop()
 {
   done = true;
   dout(10) << "stop sending SIGUSR1" << dendl;
@@ -241,1887 +239,1889 @@ void Rank::Accepter::stop()
 
 
 
-/********************************************
- * Rank
- */
 
 
-/*
- * note: assumes lock is held
+/**********************************
+ * Endpoint
  */
-void Rank::reaper()
-{
-  dout(10) << "reaper" << dendl;
-  assert(lock.is_locked());
-
-  while (!pipe_reap_queue.empty()) {
-    Pipe *p = pipe_reap_queue.front();
-    dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
-    p->unregister_pipe();
-    pipe_reap_queue.pop_front();
-    assert(pipes.count(p));
-    pipes.erase(p);
-    p->join();
-    p->discard_queue();
-    dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
-    assert(p->sd < 0);
-    delete p;
-    dout(10) << "reaper deleted pipe " << p << dendl;
-  }
-}
 
-
-int Rank::bind(int64_t force_nonce)
+void SimpleMessenger::Endpoint::dispatch_entry()
 {
   lock.Lock();
-  if (started) {
-    dout(10) << "rank.bind already started" << dendl;
-    lock.Unlock();
-    return -1;
+  while (!stop) {
+    if (!dispatch_queue.empty()) {
+      list<Message*> ls;
+
+      // take highest priority message off the queue
+      map<int, list<Message*> >::reverse_iterator p = dispatch_queue.rbegin();
+      ls.push_back(p->second.front());
+      p->second.pop_front();
+      if (p->second.empty())
+       dispatch_queue.erase(p->first);
+      qlen--;
+
+      lock.Unlock();
+      {
+        // deliver
+        while (!ls.empty()) {
+         if (stop) {
+           dout(1) << "dispatch: stop=true, discarding " << ls.size() 
+                   << " messages in dispatch queue" << dendl;
+           break;
+         }
+          Message *m = ls.front();
+          ls.pop_front();
+         if ((long)m == BAD_REMOTE_RESET) {
+           lock.Lock();
+           entity_addr_t a = remote_reset_q.front().first;
+           entity_name_t n = remote_reset_q.front().second;
+           remote_reset_q.pop_front();
+           lock.Unlock();
+           get_dispatcher()->ms_handle_remote_reset(a, n);
+         } else if ((long)m == BAD_RESET) {
+           lock.Lock();
+           entity_addr_t a = reset_q.front().first;
+           entity_name_t n = reset_q.front().second;
+           reset_q.pop_front();
+           lock.Unlock();
+           get_dispatcher()->ms_handle_reset(a, n);
+         } else if ((long)m == BAD_FAILED) {
+           lock.Lock();
+           m = failed_q.front().first;
+           entity_inst_t i = failed_q.front().second;
+           failed_q.pop_front();
+           lock.Unlock();
+           get_dispatcher()->ms_handle_failure(m, i);
+           m->put();
+         } else {
+           dout(1) << m->get_dest() 
+                   << " <== " << m->get_source_inst()
+                   << " " << m->get_seq()
+                   << " ==== " << *m
+                   << " ==== " << m->get_payload().length() << "+" << m->get_data().length()
+                   << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
+                   << " " << m 
+                   << dendl;
+           dispatch(m);
+           dout(20) << "done calling dispatch on " << m << dendl;
+         }
+        }
+      }
+      lock.Lock();
+      continue;
+    }
+    cond.Wait(lock);
   }
-  dout(10) << "rank.bind" << dendl;
   lock.Unlock();
+  dout(15) << "dispatch: ending loop " << dendl;
 
-  // bind to a socket
-  return accepter.bind(force_nonce);
+  // deregister
+  rank->unregister_entity(this);
+  put();
 }
 
+void SimpleMessenger::Endpoint::ready()
+{
+  dout(10) << "ready " << get_myaddr() << dendl;
+  assert(!dispatch_thread.is_started());
+  get();
+  dispatch_thread.create();
+}
 
-class C_Die : public Context {
-public:
-  void finish(int) {
-    cerr << "die" << std::endl;
-    exit(1);
-  }
-};
 
-static void write_pid_file(int pid)
+int SimpleMessenger::Endpoint::shutdown()
 {
-  if (!g_conf.pid_file)
-    return;
-
-  int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
-  if (fd >= 0) {
-    char buf[20];
-    int len = sprintf(buf, "%d\n", pid);
-    ::write(fd, buf, len);
-    ::close(fd);
+  dout(10) << "shutdown " << get_myaddr() << dendl;
+  
+  // stop my dispatch thread
+  if (dispatch_thread.am_self()) {
+    dout(10) << "shutdown i am dispatch, setting stop flag" << dendl;
+    stop = true;
+  } else {
+    dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
+    lock.Lock();
+    stop = true;
+    cond.Signal();
+    lock.Unlock();
   }
+  return 0;
 }
 
-static void remove_pid_file()
+void SimpleMessenger::Endpoint::suicide()
 {
-  if (!g_conf.pid_file)
-    return;
-
-  // only remove it if it has OUR pid in it!
-  int fd = ::open(g_conf.pid_file, O_RDONLY);
-  if (fd >= 0) {
-    char buf[20];
-    ::read(fd, buf, 20);
-    ::close(fd);
-    int a = atoi(buf);
+  dout(10) << "suicide " << get_myaddr() << dendl;
+  shutdown();
+  // hmm, or exit(0)?
+}
 
-    if (a == getpid())
-      ::unlink(g_conf.pid_file);
-    else
-      dout(0) << "strange, pid file " << g_conf.pid_file 
-             << " has " << a << ", not expected " << getpid()
-             << dendl;
+void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst)
+{
+  rank->lock.Lock();
+  {
+    if (rank->rank_pipe.count(inst.addr) == 0)
+      rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]);
   }
+  rank->lock.Unlock();
 }
 
-int Rank::start(bool nodaemon)
+int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
 {
-  // register at least one entity, first!
-  assert(my_type >= 0); 
-
-  lock.Lock();
-  if (started) {
-    dout(10) << "rank.start already started" << dendl;
-    lock.Unlock();
-    return 0;
-  }
+  // set envelope
+  m->set_source_inst(_myinst);
+  m->set_orig_source_inst(_myinst);
+  m->set_dest_inst(dest);
+  if (!g_conf.ms_nocrc)
+    m->calc_data_crc();
+  else
+    m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+  if (!m->get_priority()) m->set_priority(get_default_send_priority());
+  dout(1) << m->get_source()
+          << " --> " << dest.name << " " << dest.addr
+          << " -- " << *m
+         << " -- ?+" << m->get_data().length()
+         << " (? " << m->get_footer().data_crc << ")"
+         << " " << m 
+         << dendl;
 
-  dout(1) << "rank.start at " << rank_addr << dendl;
-  started = true;
-  lock.Unlock();
+  rank->submit_message(m, dest.addr);
 
-  // daemonize?
-  if (g_conf.daemonize && !nodaemon) {
-    if (Thread::get_num_threads() > 0) {
-      derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
-             << " already started that will now die!  call rank.start() sooner." 
-             << dendl;
-    }
-    dout(1) << "rank.start daemonizing" << dendl;
+  return 0;
+}
 
-    if (1) {
-      daemon(1, 0);
-      write_pid_file(getpid());
-    } else {
-      pid_t pid = fork();
-      if (pid) {
-       // i am parent
-       write_pid_file(pid);
-       ::close(0);
-       ::close(1);
-       ::close(2);
-       _exit(0);
-      }
-    }
+int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
+{
+  // set envelope
+  m->set_source_inst(_myinst);
+  m->set_dest_inst(dest);
+  if (!g_conf.ms_nocrc)
+    m->calc_data_crc();
+  else
+    m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+  if (!m->get_priority()) m->set_priority(get_default_send_priority());
  
-    if (g_conf.chdir && g_conf.chdir[0]) {
-      ::mkdir(g_conf.chdir, 0700);
-      ::chdir(g_conf.chdir);
-    }
-
-    _dout_rename_output_file();
-  } else if (g_daemon) {
-    write_pid_file(getpid());
-  }
+  dout(1) << m->get_source()
+          << " **> " << dest.name << " " << dest.addr
+          << " -- " << *m
+         << " -- ?+" << m->get_data().length()
+         << " (? " << m->get_footer().data_crc << ")"
+         << " " << m 
+          << dendl;
 
-  // some debug hackery?
-  if (g_conf.kill_after) 
-    g_timer.add_event_after(g_conf.kill_after, new C_Die);
+  rank->submit_message(m, dest.addr);
 
-  // go!
-  accepter.start();
   return 0;
 }
 
 
-/* connect_rank
- * NOTE: assumes rank.lock held.
- */
-Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p)
+
+int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
 {
-  assert(lock.is_locked());
-  assert(addr != rank_addr);
-  
-  dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
-  
-  // create pipe
-  Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
-  pipe->policy = p;
-  pipe->peer_addr = addr;
-  pipe->start_writer();
-  pipe->register_pipe();
-  pipes.insert(pipe);
-
-  return pipe;
-}
-
+  // set envelope
+  m->set_source_inst(_myinst);
+  m->set_orig_source_inst(_myinst);
+  m->set_dest_inst(dest);
+  if (!g_conf.ms_nocrc)
+    m->calc_data_crc();
+  else
+    m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+  if (!m->get_priority()) m->set_priority(get_default_send_priority());
+  dout(1) << "lazy " << m->get_source()
+          << " --> " << dest.name << " " << dest.addr
+          << " -- " << *m
+         << " -- ?+" << m->get_data().length()
+         << " (? " << m->get_footer().data_crc << ")"
+         << " " << m 
+          << dendl;
 
+  rank->submit_message(m, dest.addr, true);
 
+  return 0;
+}
 
 
 
+void SimpleMessenger::Endpoint::reset_myname(entity_name_t newname)
+{
+  entity_name_t oldname = get_myname();
+  dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
+  _set_myname(newname);
+}
 
 
-/* register_entity 
- */
-Rank::Endpoint *Rank::register_entity(entity_name_t name)
+void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
 {
-  dout(10) << "register_entity " << name << dendl;
-  lock.Lock();
-  
-  // create messenger
-  int erank = max_local;
-  Endpoint *msgr = new Endpoint(this, name, erank);
+  rank->mark_down(a);
+}
 
-  // now i know my type.
-  if (my_type >= 0)
-    assert(my_type == name.type());
-  else
-    my_type = name.type();
 
-  // add to directory
-  max_local++;
-  local.resize(max_local);
-  stopped.resize(max_local);
 
-  msgr->get();
-  local[erank] = msgr;
-  stopped[erank] = false;
-  msgr->_myinst.addr = rank_addr;
-  if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
-    msgr->need_addr = true;
-  msgr->_myinst.addr.erank = erank;
 
-  dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr 
-          << " need_addr=" << need_addr
-          << dendl;
 
-  num_local++;
-  
-  lock.Unlock();
-  return msgr;
-}
+/**************************************
+ * Pipe
+ */
 
+#undef dout_prefix
+#define dout_prefix _pipe_prefix()
+ostream& SimpleMessenger::Pipe::_pipe_prefix() {
+  return *_dout << dbeginl << pthread_self()
+               << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
+               << " sd=" << sd
+               << " pgs=" << peer_global_seq
+               << " cs=" << connect_seq
+               << ").";
+}
 
-void Rank::unregister_entity(Endpoint *msgr)
+int SimpleMessenger::Pipe::accept()
 {
-  lock.Lock();
-  dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
-  
-  // remove from local directory.
-  assert(msgr->my_rank >= 0);
-  assert(local[msgr->my_rank] == msgr);
-  local[msgr->my_rank] = 0;
-  stopped[msgr->my_rank] = true;
-  num_local--;
-  msgr->my_rank = -1;
-
-  assert(msgr->nref.test() > 1);
-  msgr->put();
-
-  wait_cond.Signal();
+  dout(10) << "accept" << dendl;
 
-  lock.Unlock();
-}
+  // my creater gave me sd via accept()
+  assert(state == STATE_ACCEPTING);
+  
+  // announce myself.
+  int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));
+  if (rc < 0) {
+    dout(10) << "accept couldn't write banner" << dendl;
+    state = STATE_CLOSED;
+    return -1;
+  }
 
+  // and my addr
+  rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));
+  if (rc < 0) {
+    dout(10) << "accept couldn't write my addr" << dendl;
+    state = STATE_CLOSED;
+    return -1;
+  }
+  dout(10) << "accept sd=" << sd << dendl;
+  
+  // identify peer
+  char banner[strlen(CEPH_BANNER)+1];
+  rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
+  if (rc < 0) {
+    dout(10) << "accept couldn't read banner" << dendl;
+    state = STATE_CLOSED;
+    return -1;
+  }
+  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+    banner[strlen(CEPH_BANNER)] = 0;
+    dout(10) << "accept peer sent bad banner '" << banner << "'" << dendl;
+    state = STATE_CLOSED;
+    return -1;
+  }
+  rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));
+  if (rc < 0) {
+    dout(10) << "accept couldn't read peer_addr" << dendl;
+    state = STATE_CLOSED;
+    return -1;
+  }
+  dout(10) << "accept peer addr is " << peer_addr << dendl;
+  if (peer_addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
+    // peer apparently doesn't know what ip they have; figure it out for them.
+    entity_addr_t old_addr = peer_addr;
+    socklen_t len = sizeof(peer_addr.ipaddr);
+    int r = ::getpeername(sd, (sockaddr*)&peer_addr.ipaddr, &len);
+    if (r < 0) {
+      dout(0) << "accept failed to getpeername " << errno << " " << strerror(errno) << dendl;
+      state = STATE_CLOSED;
+      return -1;
+    }
+    peer_addr.ipaddr.sin_port = old_addr.ipaddr.sin_port;
+    dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
+  }
+  
+  ceph_msg_connect connect;
+  ceph_msg_connect_reply reply;
+  Pipe *existing = 0;
+  
+  // this should roughly mirror pseudocode at
+  //  http://ceph.newdream.net/wiki/Messaging_protocol
 
-void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
-{
-  const entity_name_t dest = m->get_dest();
+  while (1) {
+    rc = tcp_read(sd, (char*)&connect, sizeof(connect));
+    if (rc < 0) {
+      dout(10) << "accept couldn't read connect" << dendl;
+      goto fail_unlocked;
+    }
+    dout(20) << "accept got peer connect_seq " << connect.connect_seq
+            << " global_seq " << connect.global_seq
+            << dendl;
+    
+    rank->lock.Lock();
 
-  assert(m->nref.test() == 0);
+    // note peer's type, flags
+    policy = rank->policy_map[connect.host_type];  /* apply policy */
+    lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
 
-  m->get_header().mon_protocol = CEPH_MON_PROTOCOL;
-  m->get_header().monc_protocol = CEPH_MONC_PROTOCOL;
-  m->get_header().mds_protocol = CEPH_MDS_PROTOCOL;
-  m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL;
-  m->get_header().osd_protocol = CEPH_OSD_PROTOCOL;
-  m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL;
+    memset(&reply, 0, sizeof(reply));
 
-  // lookup
-  entity_addr_t dest_proc_addr = dest_addr;
-  dest_proc_addr.erank = 0;
+    // existing?
+    if (rank->rank_pipe.count(peer_addr)) {
+      existing = rank->rank_pipe[peer_addr];
+      existing->lock.Lock();
 
-  lock.Lock();
-  {
-    // local?
-    if (rank_addr.is_local_to(dest_addr)) {
-      if (dest_addr.erank < max_local && local[dest_addr.erank]) {
-        // local
-        dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
-       local[dest_addr.erank]->queue_message(m);
+      if (connect.global_seq < existing->peer_global_seq) {
+       dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+                << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
+       reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
+       reply.global_seq = existing->peer_global_seq;  // so we can send it below..
+       existing->lock.Unlock();
+       rank->lock.Unlock();
+       goto reply;
       } else {
-        derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map?  dropping." << dendl;
-        //assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
-       delete m;
+       dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+                << " <= " << connect.global_seq << ", looks ok" << dendl;
       }
-    }
-    else {
-      // remote.
-      Pipe *pipe = 0;
-      if (rank_pipe.count( dest_proc_addr )) {
-        // connected?
-        pipe = rank_pipe[ dest_proc_addr ];
-       pipe->lock.Lock();
-       if (pipe->state == Pipe::STATE_CLOSED) {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
-         pipe->unregister_pipe();
-         pipe->lock.Unlock();
-         pipe = 0;
+      
+      if (existing->policy.lossy_tx) {
+       dout(-10) << "accept replacing existing (lossy) channel" << dendl;
+       existing->was_session_reset();
+       goto replace;
+      }
+      if (lossy_rx) {
+       if (existing->state == STATE_STANDBY) {
+         dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
+                   << existing << dendl;
+         existing->state = STATE_CONNECTING;
+         existing->cond.Signal();
        } else {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
-
-         // if this pipe was created by an incoming connection, but we haven't received
-         // a message yet, then it won't have the policy set.
-         if (pipe->get_out_seq() == 0)
-           pipe->policy = policy_map[m->get_dest().type()];
-
-         pipe->_send(m);
-         pipe->lock.Unlock();
+         dout(-10) << "accept incoming lossy connection, our lossless " << existing
+                   << " has state " << existing->state << ", doing nothing" << dendl;
        }
+       existing->lock.Unlock();
+       goto fail;
       }
-      if (!pipe) {
-       if (lazy) {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
-         delete m;
-       } else {
-         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
-         // not connected.
-         pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
-         pipe->send(m);
+
+      dout(-10) << "accept connect_seq " << connect.connect_seq
+               << " vs existing " << existing->connect_seq
+               << " state " << existing->state << dendl;
+
+      if (connect.connect_seq < existing->connect_seq) {
+       if (connect.connect_seq == 0) {
+         dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
+         existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
+         goto replace;
+       } else {
+         // old attempt, or we sent READY but they didn't get it.
+         dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+                  << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
+         reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
+         reply.connect_seq = existing->connect_seq;  // so we can send it below..
+         existing->lock.Unlock();
+         rank->lock.Unlock();
+         goto reply;
+       }
+      }
+
+      if (connect.connect_seq == existing->connect_seq) {
+       // connection race?
+       if (peer_addr < rank->rank_addr) {
+         // incoming wins
+         dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+                  << " == " << connect.connect_seq << ", replacing my attempt" << dendl;
+         assert(existing->state == STATE_CONNECTING ||
+                existing->state == STATE_STANDBY ||
+                existing->state == STATE_WAIT);
+         goto replace;
+       } else {
+         // our existing outgoing wins
+         dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+                  << " == " << connect.connect_seq << ", sending WAIT" << dendl;
+         assert(peer_addr > rank->rank_addr);
+         assert(existing->state == STATE_CONNECTING); // this will win
+         reply.tag = CEPH_MSGR_TAG_WAIT;
+         existing->lock.Unlock();
+         rank->lock.Unlock();
+         goto reply;
        }
       }
+
+      assert(connect.connect_seq > existing->connect_seq);
+      assert(connect.global_seq >= existing->peer_global_seq);
+      if (existing->connect_seq == 0) {
+       dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq 
+                << ", " << existing << ".cseq = " << existing->connect_seq
+                << "), sending RESETSESSION" << dendl;
+       reply.tag = CEPH_MSGR_TAG_RESETSESSION;
+       rank->lock.Unlock();
+       existing->lock.Unlock();
+       goto reply;
+      }
+
+      // reconnect
+      dout(10) << "accept peer sent cseq " << connect.connect_seq
+              << " > " << existing->connect_seq << dendl;
+      goto replace;
+    } // existing
+    else if (connect.connect_seq > 0) {
+      // we reset, and they are opening a new session
+      dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
+      rank->lock.Unlock();
+      reply.tag = CEPH_MSGR_TAG_RESETSESSION;
+      goto reply;
+    } else {
+      // new session
+      dout(10) << "accept new session" << dendl;
+      goto open;
     }
-  }
+    assert(0);    
 
-  lock.Unlock();
-}
+  reply:
+    rc = tcp_write(sd, (char*)&reply, sizeof(reply));
+    if (rc < 0)
+      goto fail_unlocked;
+  }
+  
+ replace:
+  dout(10) << "accept replacing " << existing << dendl;
+  existing->state = STATE_CLOSED;
+  existing->cond.Signal();
+  existing->reader_thread.kill(SIGUSR1);
+  existing->unregister_pipe();
+    
+  // steal queue and out_seq
+  existing->requeue_sent();
+  out_seq = existing->out_seq;
+  in_seq = existing->in_seq;
+  dout(10) << "accept   out_seq " << out_seq << "  in_seq " << in_seq << dendl;
+  for (map<int, list<Message*> >::iterator p = existing->q.begin();
+       p != existing->q.end();
+       p++)
+    q[p->first].splice(q[p->first].begin(), p->second);
+  
+  existing->lock.Unlock();
 
+ open:
+  // open
+  connect_seq = connect.connect_seq + 1;
+  peer_global_seq = connect.global_seq;
+  dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
 
+  // send READY reply
+  reply.tag = CEPH_MSGR_TAG_READY;
+  reply.global_seq = rank->get_global_seq();
+  reply.connect_seq = connect_seq;
+  reply.flags = 0;
+  if (policy.lossy_tx)
+    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
 
+  // ok!
+  register_pipe();
+  rank->lock.Unlock();
 
+  rc = tcp_write(sd, (char*)&reply, sizeof(reply));
+  if (rc < 0)
+    goto fail;
 
-void Rank::wait()
-{
   lock.Lock();
-  while (1) {
-    // reap dead pipes
-    reaper();
-
-    if (num_local == 0) {
-      dout(10) << "wait: everything stopped" << dendl;
-      break;   // everything stopped.
-    } else {
-      dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
-    }
-    
-    wait_cond.Wait(lock);
+  if (state != STATE_CLOSED) {
+    dout(10) << "accept starting writer, " << "state=" << state << dendl;
+    start_writer();
   }
+  dout(20) << "accept done" << dendl;
   lock.Unlock();
-  
-  // done!  clean up.
-  dout(20) << "wait: stopping accepter thread" << dendl;
-  accepter.stop();
-  dout(20) << "wait: stopped accepter thread" << dendl;
+  return 0;   // success.
 
-  // close+reap all pipes
-  lock.Lock();
-  {
-    dout(10) << "wait: closing pipes" << dendl;
-    list<Pipe*> toclose;
-    for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin();
-         i != rank_pipe.end();
-         i++)
-      toclose.push_back(i->second);
-    for (list<Pipe*>::iterator i = toclose.begin();
-        i != toclose.end();
-        i++) {
-      (*i)->unregister_pipe();
-      (*i)->lock.Lock();
-      (*i)->stop();
-      (*i)->lock.Unlock();
-    }
 
-    reaper();
-    dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
-    while (!pipes.empty()) {
-      wait_cond.Wait(lock);
-      reaper();
-    }
-  }
+ fail:
+  rank->lock.Unlock();
+ fail_unlocked:
+  lock.Lock();
+  state = STATE_CLOSED;
+  fault();
   lock.Unlock();
-
-  dout(10) << "wait: done." << dendl;
-  dout(1) << "shutdown complete." << dendl;
-  remove_pid_file();
-  started = false;
-  my_type = -1;
+  return -1;
 }
 
+int SimpleMessenger::Pipe::connect()
+{
+  dout(10) << "connect " << connect_seq << dendl;
+  assert(lock.is_locked());
 
+  if (sd >= 0) {
+    ::close(sd);
+    sd = -1;
+    closed_socket();
+  }
+  __u32 cseq = connect_seq;
+  __u32 gseq = rank->get_global_seq();
 
+  // stop reader thrad
+  join_reader();
 
+  lock.Unlock();
+  
+  char tag = -1;
+  int rc;
+  struct sockaddr_in myAddr;
+  struct msghdr msg;
+  struct iovec msgvec[2];
+  int msglen;
+  char banner[strlen(CEPH_BANNER)];
+  entity_addr_t paddr;
 
+  // create socket?
+  sd = ::socket(AF_INET, SOCK_STREAM, 0);
+  if (sd < 0) {
+    dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl;
+    assert(0);
+    goto fail;
+  }
+  opened_socket();
+  
+  // bind any port
+  myAddr.sin_family = AF_INET;
+  myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+  myAddr.sin_port = htons( 0 );    
+  dout(10) << "binding to " << myAddr << dendl;
+  rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr));
+  if (rc < 0) {
+    dout(2) << "bind error " << myAddr
+            << ", " << errno << ": " << strerror(errno) << dendl;
+    goto fail;
+  }
 
-/**********************************
- * Endpoint
- */
-
-void Rank::Endpoint::dispatch_entry()
-{
-  lock.Lock();
-  while (!stop) {
-    if (!dispatch_queue.empty()) {
-      list<Message*> ls;
+  // connect!
+  dout(10) << "connecting to " << peer_addr.ipaddr << dendl;
+  rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
+  if (rc < 0) {
+    dout(2) << "connect error " << peer_addr.ipaddr
+            << ", " << errno << ": " << strerror(errno) << dendl;
+    goto fail;
+  }
 
-      // take highest priority message off the queue
-      map<int, list<Message*> >::reverse_iterator p = dispatch_queue.rbegin();
-      ls.push_back(p->second.front());
-      p->second.pop_front();
-      if (p->second.empty())
-       dispatch_queue.erase(p->first);
-      qlen--;
+  // disable Nagle algorithm?
+  if (g_conf.ms_tcp_nodelay) {
+    int flag = 1;
+    int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
+    if (r < 0) 
+      dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
+  }
 
-      lock.Unlock();
-      {
-        // deliver
-        while (!ls.empty()) {
-         if (stop) {
-           dout(1) << "dispatch: stop=true, discarding " << ls.size() 
-                   << " messages in dispatch queue" << dendl;
-           break;
-         }
-          Message *m = ls.front();
-          ls.pop_front();
-         if ((long)m == BAD_REMOTE_RESET) {
-           lock.Lock();
-           entity_addr_t a = remote_reset_q.front().first;
-           entity_name_t n = remote_reset_q.front().second;
-           remote_reset_q.pop_front();
-           lock.Unlock();
-           get_dispatcher()->ms_handle_remote_reset(a, n);
-         } else if ((long)m == BAD_RESET) {
-           lock.Lock();
-           entity_addr_t a = reset_q.front().first;
-           entity_name_t n = reset_q.front().second;
-           reset_q.pop_front();
-           lock.Unlock();
-           get_dispatcher()->ms_handle_reset(a, n);
-         } else if ((long)m == BAD_FAILED) {
-           lock.Lock();
-           m = failed_q.front().first;
-           entity_inst_t i = failed_q.front().second;
-           failed_q.pop_front();
-           lock.Unlock();
-           get_dispatcher()->ms_handle_failure(m, i);
-           m->put();
-         } else {
-           dout(1) << m->get_dest() 
-                   << " <== " << m->get_source_inst()
-                   << " " << m->get_seq()
-                   << " ==== " << *m
-                   << " ==== " << m->get_payload().length() << "+" << m->get_data().length()
-                   << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
-                   << " " << m 
-                   << dendl;
-           dispatch(m);
-           dout(20) << "done calling dispatch on " << m << dendl;
-         }
-        }
-      }
-      lock.Lock();
+  // verify banner
+  // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
+  rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
+  if (rc < 0) {
+    dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
+    goto fail;
+  }
+  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+    dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl;
+    goto fail;
+  }
+
+  memset(&msg, 0, sizeof(msg));
+  msgvec[0].iov_base = banner;
+  msgvec[0].iov_len = strlen(CEPH_BANNER);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 1;
+  msglen = msgvec[0].iov_len;
+  if (do_sendmsg(sd, &msg, msglen)) {
+    dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
+    goto fail;
+  }
+
+  // identify peer
+  rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
+  if (rc < 0) {
+    dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
+    goto fail;
+  }
+  dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
+  if (!peer_addr.is_local_to(paddr)) {
+    if (paddr.ipaddr.sin_addr.s_addr == 0 &&
+       peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port &&
+       peer_addr.nonce == paddr.nonce) {
+      dout(0) << "connect claims to be " 
+             << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
+    } else {
+      dout(0) << "connect claims to be " 
+             << paddr << " not " << peer_addr << " - wrong node!" << dendl;
+      goto fail;
+    }
+  }
+  
+  // identify myself
+  memset(&msg, 0, sizeof(msg));
+  msgvec[0].iov_base = (char*)&rank->rank_addr;
+  msgvec[0].iov_len = sizeof(rank->rank_addr);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 1;
+  msglen = msgvec[0].iov_len;
+  if (do_sendmsg(sd, &msg, msglen)) {
+    dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
+    goto fail;
+  }
+  dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
+
+  while (1) {
+    ceph_msg_connect connect;
+    connect.host_type = rank->my_type;
+    connect.global_seq = gseq;
+    connect.connect_seq = cseq;
+    connect.flags = 0;
+    if (policy.lossy_tx)
+      connect.flags |= CEPH_MSG_CONNECT_LOSSY;
+    memset(&msg, 0, sizeof(msg));
+    msgvec[0].iov_base = (char*)&connect;
+    msgvec[0].iov_len = sizeof(connect);
+    msg.msg_iov = msgvec;
+    msg.msg_iovlen = 1;
+    msglen = msgvec[0].iov_len;
+
+    dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl;
+    if (do_sendmsg(sd, &msg, msglen)) {
+      dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
+      goto fail;
+    }
+
+    dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
+    ceph_msg_connect_reply reply;
+    if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) {
+      dout(2) << "connect read reply " << strerror(errno) << dendl;
+      goto fail;
+    }
+    dout(20) << "connect got reply tag " << (int)reply.tag
+            << " connect_seq " << reply.connect_seq
+            << " global_seq " << reply.global_seq
+            << " flags " << (int)reply.flags
+            << dendl;
+
+    lock.Lock();
+    if (state != STATE_CONNECTING) {
+      dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
+      goto stop_locked;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
+      dout(0) << "connect got RESETSESSION" << dendl;
+      was_session_reset();
+      cseq = 0;
+      lock.Unlock();
       continue;
     }
-    cond.Wait(lock);
+    if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
+      gseq = rank->get_global_seq(reply.global_seq);
+      dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
+              << " chose new " << gseq << dendl;
+      lock.Unlock();
+      continue;
+    }
+    if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
+      assert(reply.connect_seq > connect_seq);
+      dout(10) << "connect got RETRY_SESSION " << connect_seq
+              << " -> " << reply.connect_seq << dendl;
+      cseq = connect_seq = reply.connect_seq;
+      lock.Unlock();
+      continue;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_WAIT) {
+      dout(3) << "connect got WAIT (connection race)" << dendl;
+      state = STATE_WAIT;
+      goto stop_locked;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_READY) {
+      // hooray!
+      peer_global_seq = reply.global_seq;
+      lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY;
+      state = STATE_OPEN;
+      connect_seq = cseq + 1;
+      assert(connect_seq == reply.connect_seq);
+      first_fault = last_attempt = utime_t();
+      dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
+
+      if (!reader_running) {
+       dout(20) << "connect starting reader" << dendl;
+       start_reader();
+      }
+      return 0;
+    }
+    
+    // protocol error
+    dout(0) << "connect got bad tag " << (int)tag << dendl;
+    goto fail_locked;
   }
-  lock.Unlock();
-  dout(15) << "dispatch: ending loop " << dendl;
 
-  // deregister
-  rank->unregister_entity(this);
-  put();
+ fail:
+  lock.Lock();
+ fail_locked:
+  if (state == STATE_CONNECTING)
+    fault();
+  else
+    dout(3) << "connect fault, but state != connecting, stopping" << dendl;
+
+ stop_locked:
+  return -1;
 }
 
-void Rank::Endpoint::ready()
+void SimpleMessenger::Pipe::register_pipe()
 {
-  dout(10) << "ready " << get_myaddr() << dendl;
-  assert(!dispatch_thread.is_started());
-  get();
-  dispatch_thread.create();
+  dout(10) << "register_pipe" << dendl;
+  assert(rank->lock.is_locked());
+  assert(rank->rank_pipe.count(peer_addr) == 0);
+  rank->rank_pipe[peer_addr] = this;
 }
 
-
-int Rank::Endpoint::shutdown()
+void SimpleMessenger::Pipe::unregister_pipe()
 {
-  dout(10) << "shutdown " << get_myaddr() << dendl;
-  
-  // stop my dispatch thread
-  if (dispatch_thread.am_self()) {
-    dout(10) << "shutdown i am dispatch, setting stop flag" << dendl;
-    stop = true;
+  assert(rank->lock.is_locked());
+  if (rank->rank_pipe.count(peer_addr) &&
+      rank->rank_pipe[peer_addr] == this) {
+    dout(10) << "unregister_pipe" << dendl;
+    rank->rank_pipe.erase(peer_addr);
   } else {
-    dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
-    lock.Lock();
-    stop = true;
-    cond.Signal();
-    lock.Unlock();
+    dout(10) << "unregister_pipe - not registered" << dendl;
   }
-  return 0;
 }
 
-void Rank::Endpoint::suicide()
-{
-  dout(10) << "suicide " << get_myaddr() << dendl;
-  shutdown();
-  // hmm, or exit(0)?
-}
 
-void Rank::Endpoint::prepare_dest(const entity_inst_t& inst)
+void SimpleMessenger::Pipe::requeue_sent()
 {
-  rank->lock.Lock();
-  {
-    if (rank->rank_pipe.count(inst.addr) == 0)
-      rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]);
+  if (sent.empty())
+    return;
+
+  list<Message*>& rq = q[CEPH_MSG_PRIO_HIGHEST];
+  while (!sent.empty()) {
+    Message *m = sent.back();
+    sent.pop_back();
+    dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq
+            << " (" << m->get_seq() << ")" << dendl;
+    rq.push_front(m);
+    out_seq--;
   }
-  rank->lock.Unlock();
 }
 
-int Rank::Endpoint::send_message(Message *m, entity_inst_t dest)
+void SimpleMessenger::Pipe::discard_queue()
 {
-  // set envelope
-  m->set_source_inst(_myinst);
-  m->set_orig_source_inst(_myinst);
-  m->set_dest_inst(dest);
-  if (!g_conf.ms_nocrc)
-    m->calc_data_crc();
-  else
-    m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
-  if (!m->get_priority()) m->set_priority(get_default_send_priority());
-  dout(1) << m->get_source()
-          << " --> " << dest.name << " " << dest.addr
-          << " -- " << *m
-         << " -- ?+" << m->get_data().length()
-         << " (? " << m->get_footer().data_crc << ")"
-         << " " << m 
-         << dendl;
-
-  rank->submit_message(m, dest.addr);
-
-  return 0;
+  dout(10) << "discard_queue" << dendl;
+  for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
+    (*p)->put();
+  sent.clear();
+  for (map<int,list<Message*> >::iterator p = q.begin(); p != q.end(); p++)
+    for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
+      (*r)->put();
+  q.clear();
 }
 
-int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest)
+
+void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
 {
-  // set envelope
-  m->set_source_inst(_myinst);
-  m->set_dest_inst(dest);
-  if (!g_conf.ms_nocrc)
-    m->calc_data_crc();
-  else
-    m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
-  if (!m->get_priority()) m->set_priority(get_default_send_priority());
-  dout(1) << m->get_source()
-          << " **> " << dest.name << " " << dest.addr
-          << " -- " << *m
-         << " -- ?+" << m->get_data().length()
-         << " (? " << m->get_footer().data_crc << ")"
-         << " " << m 
-          << dendl;
+  assert(lock.is_locked());
+  cond.Signal();
 
-  rank->submit_message(m, dest.addr);
+  if (onread && state == STATE_CONNECTING) {
+    dout(10) << "fault already connecting, reader shutting down" << dendl;
+    return;
+  }
 
-  return 0;
-}
+  if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl;
 
+  if (state == STATE_CLOSED ||
+      state == STATE_CLOSING) {
+    dout(10) << "fault already closed|closing" << dendl;
+    return;
+  }
 
+  if (sd >= 0) {
+    ::close(sd);
+    sd = -1;
+    closed_socket();
+  }
 
-int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
-{
-  // set envelope
-  m->set_source_inst(_myinst);
-  m->set_orig_source_inst(_myinst);
-  m->set_dest_inst(dest);
-  if (!g_conf.ms_nocrc)
-    m->calc_data_crc();
-  else
-    m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
-  if (!m->get_priority()) m->set_priority(get_default_send_priority());
-  dout(1) << "lazy " << m->get_source()
-          << " --> " << dest.name << " " << dest.addr
-          << " -- " << *m
-         << " -- ?+" << m->get_data().length()
-         << " (? " << m->get_footer().data_crc << ")"
-         << " " << m 
-          << dendl;
+  // lossy channel?
+  if (policy.lossy_tx) {
+    dout(10) << "fault on lossy channel, failing" << dendl;
+    fail();
+    return;
+  }
 
-  rank->submit_message(m, dest.addr, true);
+  // requeue sent items
+  requeue_sent();
 
-  return 0;
+  if (q.empty()) {
+    if (state == STATE_CLOSING || onconnect) {
+      dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
+      state = STATE_CLOSED;
+    } else {
+      dout(0) << "fault nothing to send, going to standby" << dendl;
+      state = STATE_STANDBY;
+    }
+    return;
+  } 
+
+  utime_t now = g_clock.now();
+  if (state != STATE_CONNECTING) {
+    if (!onconnect) dout(0) << "fault initiating reconnect" << dendl;
+    connect_seq++;
+    state = STATE_CONNECTING;
+    first_fault = now;
+  } else if (first_fault.sec() == 0) {
+    if (!onconnect) dout(0) << "fault first fault" << dendl;
+    first_fault = now;
+  } else {
+    utime_t failinterval = now - first_fault;
+    utime_t retryinterval = now - last_attempt;
+    if (!onconnect) dout(10) << "fault failure was " << failinterval 
+                            << " ago, last attempt was at " << last_attempt
+                            << ", " << retryinterval << " ago" << dendl;
+    if (policy.fail_interval > 0 && failinterval > policy.fail_interval) {
+      // give up
+      dout(0) << "fault giving up" << dendl;
+      fail();
+    } else if (retryinterval < policy.retry_interval) {
+      // wait
+      now += (policy.retry_interval - retryinterval);
+      dout(10) << "fault waiting until " << now << dendl;
+      cond.WaitUntil(lock, now);
+      dout(10) << "fault done waiting or woke up" << dendl;
+    }
+  }
+  last_attempt = now;
 }
 
+void SimpleMessenger::Pipe::fail()
+{
+  derr(10) << "fail" << dendl;
+  assert(lock.is_locked());
 
+  stop();
+  report_failures();
 
-void Rank::Endpoint::reset_myname(entity_name_t newname)
-{
-  entity_name_t oldname = get_myname();
-  dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
-  _set_myname(newname);
-}
+  for (unsigned i=0; i<rank->local.size(); i++) 
+    if (rank->local[i] && rank->local[i]->get_dispatcher())
+      rank->local[i]->queue_reset(peer_addr, last_dest_name);
 
+  // unregister
+  lock.Unlock();
+  rank->lock.Lock();
+  unregister_pipe();
+  rank->lock.Unlock();
+  lock.Lock();
+}
 
-void Rank::Endpoint::mark_down(entity_addr_t a)
+void SimpleMessenger::Pipe::was_session_reset()
 {
-  rank->mark_down(a);
+  assert(lock.is_locked());
+
+  dout(10) << "was_session_reset" << dendl;
+  report_failures();
+  for (unsigned i=0; i<rank->local.size(); i++) 
+    if (rank->local[i] && rank->local[i]->get_dispatcher())
+      rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
+
+  out_seq = 0;
+  in_seq = 0;
+  connect_seq = 0;
 }
 
-void Rank::mark_down(entity_addr_t addr)
+void SimpleMessenger::Pipe::report_failures()
 {
-  lock.Lock();
-  if (rank_pipe.count(addr)) {
-    Pipe *p = rank_pipe[addr];
-    dout(1) << "mark_down " << addr << " -- " << p << dendl;
-    p->unregister_pipe();
-    p->lock.Lock();
-    p->stop();
-    p->lock.Unlock();
-  } else {
-    dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
+  // report failures
+  q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent);
+  while (1) {
+    Message *m = _get_next_outgoing();
+    if (!m)
+      break;
+
+    if (policy.drop_msg_callback) {
+      unsigned srcrank = m->get_source_inst().addr.erank;
+      if (srcrank >= rank->max_local || rank->local[srcrank] == 0) {
+       dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
+      } else if (rank->local[srcrank]->is_stopped()) {
+       dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
+      } else {
+       dout(10) << "fail on " << *m << dendl;
+       rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
+      }
+    }
+    m->put();
   }
-  lock.Unlock();
 }
 
+void SimpleMessenger::Pipe::stop()
+{
+  dout(10) << "stop" << dendl;
+  state = STATE_CLOSED;
+  cond.Signal();
+  if (reader_running)
+    reader_thread.kill(SIGUSR1);
+  if (writer_running)
+    writer_thread.kill(SIGUSR1);
+}
 
 
+/* read msgs from socket.
+ * also, server.
+ */
+void SimpleMessenger::Pipe::reader()
+{
+  if (state == STATE_ACCEPTING) 
+    accept();
 
+  lock.Lock();
 
-/**************************************
- * Pipe
- */
+  // loop.
+  while (state != STATE_CLOSED &&
+        state != STATE_CONNECTING) {
+    assert(lock.is_locked());
 
-#undef dout_prefix
-#define dout_prefix _pipe_prefix()
-ostream& Rank::Pipe::_pipe_prefix() {
-  return *_dout << dbeginl << pthread_self()
-               << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
-               << " sd=" << sd
-               << " pgs=" << peer_global_seq
-               << " cs=" << connect_seq
-               << ").";
-}
+    // sleep if (re)connecting
+    if (state == STATE_STANDBY) {
+      dout(20) << "reader sleeping during reconnect|standby" << dendl;
+      cond.Wait(lock);
+      continue;
+    }
 
-int Rank::Pipe::accept()
-{
-  dout(10) << "accept" << dendl;
+    lock.Unlock();
 
-  // my creater gave me sd via accept()
-  assert(state == STATE_ACCEPTING);
-  
-  // announce myself.
-  int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));
-  if (rc < 0) {
-    dout(10) << "accept couldn't write banner" << dendl;
-    state = STATE_CLOSED;
-    return -1;
-  }
+    char tag = -1;
+    dout(20) << "reader reading tag..." << dendl;
+    int rc = tcp_read(sd, (char*)&tag, 1);
+    if (rc < 0) {
+      lock.Lock();
+      dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
+      fault(false, true);
+      continue;
+    }
 
-  // and my addr
-  rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));
-  if (rc < 0) {
-    dout(10) << "accept couldn't write my addr" << dendl;
-    state = STATE_CLOSED;
-    return -1;
-  }
-  dout(10) << "accept sd=" << sd << dendl;
-  
-  // identify peer
-  char banner[strlen(CEPH_BANNER)+1];
-  rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
-  if (rc < 0) {
-    dout(10) << "accept couldn't read banner" << dendl;
-    state = STATE_CLOSED;
-    return -1;
-  }
-  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
-    banner[strlen(CEPH_BANNER)] = 0;
-    dout(10) << "accept peer sent bad banner '" << banner << "'" << dendl;
-    state = STATE_CLOSED;
-    return -1;
-  }
-  rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));
-  if (rc < 0) {
-    dout(10) << "accept couldn't read peer_addr" << dendl;
-    state = STATE_CLOSED;
-    return -1;
-  }
-  dout(10) << "accept peer addr is " << peer_addr << dendl;
-  if (peer_addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
-    // peer apparently doesn't know what ip they have; figure it out for them.
-    entity_addr_t old_addr = peer_addr;
-    socklen_t len = sizeof(peer_addr.ipaddr);
-    int r = ::getpeername(sd, (sockaddr*)&peer_addr.ipaddr, &len);
-    if (r < 0) {
-      dout(0) << "accept failed to getpeername " << errno << " " << strerror(errno) << dendl;
-      state = STATE_CLOSED;
-      return -1;
-    }
-    peer_addr.ipaddr.sin_port = old_addr.ipaddr.sin_port;
-    dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
-  }
-  
-  ceph_msg_connect connect;
-  ceph_msg_connect_reply reply;
-  Pipe *existing = 0;
-  
-  // this should roughly mirror pseudocode at
-  //  http://ceph.newdream.net/wiki/Messaging_protocol
-
-  while (1) {
-    rc = tcp_read(sd, (char*)&connect, sizeof(connect));
-    if (rc < 0) {
-      dout(10) << "accept couldn't read connect" << dendl;
-      goto fail_unlocked;
-    }
-    dout(20) << "accept got peer connect_seq " << connect.connect_seq
-            << " global_seq " << connect.global_seq
-            << dendl;
-    
-    rank->lock.Lock();
-
-    // note peer's type, flags
-    policy = rank->policy_map[connect.host_type];  /* apply policy */
-    lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
-
-    memset(&reply, 0, sizeof(reply));
-
-    // existing?
-    if (rank->rank_pipe.count(peer_addr)) {
-      existing = rank->rank_pipe[peer_addr];
-      existing->lock.Lock();
-
-      if (connect.global_seq < existing->peer_global_seq) {
-       dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
-                << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
-       reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
-       reply.global_seq = existing->peer_global_seq;  // so we can send it below..
-       existing->lock.Unlock();
-       rank->lock.Unlock();
-       goto reply;
-      } else {
-       dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
-                << " <= " << connect.global_seq << ", looks ok" << dendl;
-      }
-      
-      if (existing->policy.lossy_tx) {
-       dout(-10) << "accept replacing existing (lossy) channel" << dendl;
-       existing->was_session_reset();
-       goto replace;
-      }
-      if (lossy_rx) {
-       if (existing->state == STATE_STANDBY) {
-         dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
-                   << existing << dendl;
-         existing->state = STATE_CONNECTING;
-         existing->cond.Signal();
-       } else {
-         dout(-10) << "accept incoming lossy connection, our lossless " << existing
-                   << " has state " << existing->state << ", doing nothing" << dendl;
+    // open ...
+    if (tag == CEPH_MSGR_TAG_ACK) {
+      dout(20) << "reader got ACK" << dendl;
+      __u32 seq;
+      int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
+      lock.Lock();
+      if (rc < 0) {
+       dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
+       fault(false, true);
+      } else if (state != STATE_CLOSED) {
+       dout(15) << "reader got ack seq " << seq << dendl;
+       // trim sent list
+       while (!sent.empty() &&
+              sent.front()->get_seq() <= seq) {
+         Message *m = sent.front();
+         sent.pop_front();
+         dout(10) << "reader got ack seq " 
+                   << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
+         m->put();
        }
-       existing->lock.Unlock();
-       goto fail;
       }
+      continue;
+    }
 
-      dout(-10) << "accept connect_seq " << connect.connect_seq
-               << " vs existing " << existing->connect_seq
-               << " state " << existing->state << dendl;
-
-      if (connect.connect_seq < existing->connect_seq) {
-       if (connect.connect_seq == 0) {
-         dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
-         existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
-         goto replace;
-       } else {
-         // old attempt, or we sent READY but they didn't get it.
-         dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
-                  << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
-         reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
-         reply.connect_seq = existing->connect_seq;  // so we can send it below..
-         existing->lock.Unlock();
-         rank->lock.Unlock();
-         goto reply;
-       }
+    else if (tag == CEPH_MSGR_TAG_MSG) {
+      dout(20) << "reader got MSG" << dendl;
+      Message *m = read_message();
+      lock.Lock();
+      
+      if (!m) {
+       derr(2) << "reader read null message, " << strerror(errno) << dendl;
+       fault(false, true);
+       continue;
       }
 
-      if (connect.connect_seq == existing->connect_seq) {
-       // connection race?
-       if (peer_addr < rank->rank_addr) {
-         // incoming wins
-         dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
-                  << " == " << connect.connect_seq << ", replacing my attempt" << dendl;
-         assert(existing->state == STATE_CONNECTING ||
-                existing->state == STATE_STANDBY ||
-                existing->state == STATE_WAIT);
-         goto replace;
-       } else {
-         // our existing outgoing wins
-         dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
-                  << " == " << connect.connect_seq << ", sending WAIT" << dendl;
-         assert(peer_addr > rank->rank_addr);
-         assert(existing->state == STATE_CONNECTING); // this will win
-         reply.tag = CEPH_MSGR_TAG_WAIT;
-         existing->lock.Unlock();
-         rank->lock.Unlock();
-         goto reply;
-       }
-      }
+      if (state == STATE_CLOSED ||
+         state == STATE_CONNECTING)
+       continue;
 
-      assert(connect.connect_seq > existing->connect_seq);
-      assert(connect.global_seq >= existing->peer_global_seq);
-      if (existing->connect_seq == 0) {
-       dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq 
-                << ", " << existing << ".cseq = " << existing->connect_seq
-                << "), sending RESETSESSION" << dendl;
-       reply.tag = CEPH_MSGR_TAG_RESETSESSION;
-       rank->lock.Unlock();
-       existing->lock.Unlock();
-       goto reply;
+      // check received seq#
+      if (m->get_seq() <= in_seq) {
+       dout(-10) << "reader got old message "
+                 << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
+                 << " for " << m->get_dest() 
+                 << ", discarding" << dendl;
+       delete m;
+       continue;
       }
+      in_seq++;
 
-      // reconnect
-      dout(10) << "accept peer sent cseq " << connect.connect_seq
-              << " > " << existing->connect_seq << dendl;
-      goto replace;
-    } // existing
-    else if (connect.connect_seq > 0) {
-      // we reset, and they are opening a new session
-      dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
-      rank->lock.Unlock();
-      reply.tag = CEPH_MSGR_TAG_RESETSESSION;
-      goto reply;
-    } else {
-      // new session
-      dout(10) << "accept new session" << dendl;
-      goto open;
-    }
-    assert(0);    
-
-  reply:
-    rc = tcp_write(sd, (char*)&reply, sizeof(reply));
-    if (rc < 0)
-      goto fail_unlocked;
-  }
-  
- replace:
-  dout(10) << "accept replacing " << existing << dendl;
-  existing->state = STATE_CLOSED;
-  existing->cond.Signal();
-  existing->reader_thread.kill(SIGUSR1);
-  existing->unregister_pipe();
-    
-  // steal queue and out_seq
-  existing->requeue_sent();
-  out_seq = existing->out_seq;
-  in_seq = existing->in_seq;
-  dout(10) << "accept   out_seq " << out_seq << "  in_seq " << in_seq << dendl;
-  for (map<int, list<Message*> >::iterator p = existing->q.begin();
-       p != existing->q.end();
-       p++)
-    q[p->first].splice(q[p->first].begin(), p->second);
-  
-  existing->lock.Unlock();
-
- open:
-  // open
-  connect_seq = connect.connect_seq + 1;
-  peer_global_seq = connect.global_seq;
-  dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
-
-  // send READY reply
-  reply.tag = CEPH_MSGR_TAG_READY;
-  reply.global_seq = rank->get_global_seq();
-  reply.connect_seq = connect_seq;
-  reply.flags = 0;
-  if (policy.lossy_tx)
-    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
-
-  // ok!
-  register_pipe();
-  rank->lock.Unlock();
-
-  rc = tcp_write(sd, (char*)&reply, sizeof(reply));
-  if (rc < 0)
-    goto fail;
-
-  lock.Lock();
-  if (state != STATE_CLOSED) {
-    dout(10) << "accept starting writer, " << "state=" << state << dendl;
-    start_writer();
-  }
-  dout(20) << "accept done" << dendl;
-  lock.Unlock();
-  return 0;   // success.
-
-
- fail:
-  rank->lock.Unlock();
- fail_unlocked:
-  lock.Lock();
-  state = STATE_CLOSED;
-  fault();
-  lock.Unlock();
-  return -1;
-}
-
-int Rank::Pipe::connect()
-{
-  dout(10) << "connect " << connect_seq << dendl;
-  assert(lock.is_locked());
-
-  if (sd >= 0) {
-    ::close(sd);
-    sd = -1;
-    closed_socket();
-  }
-  __u32 cseq = connect_seq;
-  __u32 gseq = rank->get_global_seq();
+      if (!lossy_rx && in_seq != m->get_seq()) {
+       dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
+               << " for " << *m << " from " << m->get_source() << dendl;
+       derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
+               << " for " << *m << " from " << m->get_source() << dendl;
+       assert(in_seq == m->get_seq()); // for now!
+       fault(false, true);
+       delete m;
+       continue;
+      }
 
-  // stop reader thrad
-  join_reader();
+      cond.Signal();  // wake up writer, to ack this
+      lock.Unlock();
+      
+      dout(10) << "reader got message "
+              << m->get_seq() << " " << m << " " << *m
+              << " for " << m->get_dest() << dendl;
+      
+      // deliver
+      Endpoint *entity = 0;
+      
+      rank->lock.Lock();
+      {
+       unsigned erank = m->get_dest_inst().addr.erank;
+       if (erank < rank->max_local && rank->local[erank]) {
+         // find entity
+         entity = rank->local[erank];
+         entity->get();
 
-  lock.Unlock();
-  
-  char tag = -1;
-  int rc;
-  struct sockaddr_in myAddr;
-  struct msghdr msg;
-  struct iovec msgvec[2];
-  int msglen;
-  char banner[strlen(CEPH_BANNER)];
-  entity_addr_t paddr;
+         // first message?
+         if (entity->need_addr) {
+           entity->_set_myaddr(m->get_dest_inst().addr);
+           dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
+           entity->need_addr = false;
+         }
 
-  // create socket?
-  sd = ::socket(AF_INET, SOCK_STREAM, 0);
-  if (sd < 0) {
-    dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl;
-    assert(0);
-    goto fail;
-  }
-  opened_socket();
-  
-  // bind any port
-  myAddr.sin_family = AF_INET;
-  myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
-  myAddr.sin_port = htons( 0 );    
-  dout(10) << "binding to " << myAddr << dendl;
-  rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr));
-  if (rc < 0) {
-    dout(2) << "bind error " << myAddr
-            << ", " << errno << ": " << strerror(errno) << dendl;
-    goto fail;
-  }
+         if (rank->need_addr) {
+           rank->rank_addr = m->get_dest_inst().addr;
+           rank->rank_addr.erank = 0;
+           dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
+           rank->need_addr = false;
+         }
 
-  // connect!
-  dout(10) << "connecting to " << peer_addr.ipaddr << dendl;
-  rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
-  if (rc < 0) {
-    dout(2) << "connect error " << peer_addr.ipaddr
-            << ", " << errno << ": " << strerror(errno) << dendl;
-    goto fail;
-  }
+       } else {
+         derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
+       }
+      }
+      rank->lock.Unlock();
+      
+      if (entity) {
+       entity->queue_message(m);        // queue
+       entity->put();
+      }
 
-  // disable Nagle algorithm?
-  if (g_conf.ms_tcp_nodelay) {
-    int flag = 1;
-    int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
-    if (r < 0) 
-      dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
+      lock.Lock();
+    } 
+    
+    else if (tag == CEPH_MSGR_TAG_CLOSE) {
+      dout(20) << "reader got CLOSE" << dendl;
+      lock.Lock();
+      if (state == STATE_CLOSING)
+       state = STATE_CLOSED;
+      else
+       state = STATE_CLOSING;
+      cond.Signal();
+      break;
+    }
+    else {
+      dout(0) << "reader bad tag " << (int)tag << dendl;
+      lock.Lock();
+      fault(false, true);
+    }
   }
 
-  // verify banner
-  // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
-  rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
-  if (rc < 0) {
-    dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
-    goto fail;
-  }
-  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
-    dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl;
-    goto fail;
-  }
+  // reap?
+  bool reap = false;
+  reader_running = false;
+  if (!writer_running)
+    reap = true;
 
-  memset(&msg, 0, sizeof(msg));
-  msgvec[0].iov_base = banner;
-  msgvec[0].iov_len = strlen(CEPH_BANNER);
-  msg.msg_iov = msgvec;
-  msg.msg_iovlen = 1;
-  msglen = msgvec[0].iov_len;
-  if (do_sendmsg(sd, &msg, msglen)) {
-    dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
-    goto fail;
-  }
+  lock.Unlock();
 
-  // identify peer
-  rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
-  if (rc < 0) {
-    dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
-    goto fail;
-  }
-  dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
-  if (!peer_addr.is_local_to(paddr)) {
-    if (paddr.ipaddr.sin_addr.s_addr == 0 &&
-       peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port &&
-       peer_addr.nonce == paddr.nonce) {
-      dout(0) << "connect claims to be " 
-             << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
-    } else {
-      dout(0) << "connect claims to be " 
-             << paddr << " not " << peer_addr << " - wrong node!" << dendl;
-      goto fail;
+  if (reap) {
+    dout(10) << "reader queueing for reap" << dendl;
+    if (sd >= 0) {
+      ::close(sd);
+      sd = -1;
+      closed_socket();
     }
+    rank->lock.Lock();
+    {
+      rank->pipe_reap_queue.push_back(this);
+      rank->wait_cond.Signal();
+    }
+    rank->lock.Unlock();
   }
-  
-  // identify myself
-  memset(&msg, 0, sizeof(msg));
-  msgvec[0].iov_base = (char*)&rank->rank_addr;
-  msgvec[0].iov_len = sizeof(rank->rank_addr);
-  msg.msg_iov = msgvec;
-  msg.msg_iovlen = 1;
-  msglen = msgvec[0].iov_len;
-  if (do_sendmsg(sd, &msg, msglen)) {
-    dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
-    goto fail;
-  }
-  dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
 
-  while (1) {
-    ceph_msg_connect connect;
-    connect.host_type = rank->my_type;
-    connect.global_seq = gseq;
-    connect.connect_seq = cseq;
-    connect.flags = 0;
-    if (policy.lossy_tx)
-      connect.flags |= CEPH_MSG_CONNECT_LOSSY;
-    memset(&msg, 0, sizeof(msg));
-    msgvec[0].iov_base = (char*)&connect;
-    msgvec[0].iov_len = sizeof(connect);
-    msg.msg_iov = msgvec;
-    msg.msg_iovlen = 1;
-    msglen = msgvec[0].iov_len;
+  dout(10) << "reader done" << dendl;
+}
 
-    dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl;
-    if (do_sendmsg(sd, &msg, msglen)) {
-      dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
-      goto fail;
-    }
+/*
+class FakeSocketError : public Context {
+  int sd;
+public:
+  FakeSocketError(int s) : sd(s) {}
+  void finish(int r) {
+    cout << "faking socket error on " << sd << std::endl;
+    ::close(sd);
+  }
+};
+*/
 
-    dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
-    ceph_msg_connect_reply reply;
-    if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) {
-      dout(2) << "connect read reply " << strerror(errno) << dendl;
-      goto fail;
-    }
-    dout(20) << "connect got reply tag " << (int)reply.tag
-            << " connect_seq " << reply.connect_seq
-            << " global_seq " << reply.global_seq
-            << " flags " << (int)reply.flags
-            << dendl;
+/* write msgs to socket.
+ * also, client.
+ */
+void SimpleMessenger::Pipe::writer()
+{
+  lock.Lock();
 
-    lock.Lock();
-    if (state != STATE_CONNECTING) {
-      dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
-      goto stop_locked;
-    }
+  while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
+    // standby?
+    if (!q.empty() && state == STATE_STANDBY)
+      state = STATE_CONNECTING;
 
-    if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
-      dout(0) << "connect got RESETSESSION" << dendl;
-      was_session_reset();
-      cseq = 0;
-      lock.Unlock();
-      continue;
-    }
-    if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
-      gseq = rank->get_global_seq(reply.global_seq);
-      dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
-              << " chose new " << gseq << dendl;
-      lock.Unlock();
+    // connect?
+    if (state == STATE_CONNECTING) {
+      connect();
       continue;
     }
-    if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
-      assert(reply.connect_seq > connect_seq);
-      dout(10) << "connect got RETRY_SESSION " << connect_seq
-              << " -> " << reply.connect_seq << dendl;
-      cseq = connect_seq = reply.connect_seq;
+    
+    if (state == STATE_CLOSING) {
+      // write close tag
+      dout(20) << "writer writing CLOSE tag" << dendl;
+      char tag = CEPH_MSGR_TAG_CLOSE;
+      state = STATE_CLOSED;
       lock.Unlock();
+      if (sd) ::write(sd, &tag, 1);
+      lock.Lock();
       continue;
     }
 
-    if (reply.tag == CEPH_MSGR_TAG_WAIT) {
-      dout(3) << "connect got WAIT (connection race)" << dendl;
-      state = STATE_WAIT;
-      goto stop_locked;
-    }
-
-    if (reply.tag == CEPH_MSGR_TAG_READY) {
-      // hooray!
-      peer_global_seq = reply.global_seq;
-      lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY;
-      state = STATE_OPEN;
-      connect_seq = cseq + 1;
-      assert(connect_seq == reply.connect_seq);
-      first_fault = last_attempt = utime_t();
-      dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
+    if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
+       (!q.empty() || in_seq > in_seq_acked)) {
 
-      if (!reader_running) {
-       dout(20) << "connect starting reader" << dendl;
-       start_reader();
+      // send ack?
+      if (in_seq > in_seq_acked) {
+       int send_seq = in_seq;
+       lock.Unlock();
+       int rc = write_ack(send_seq);
+       lock.Lock();
+       if (rc < 0) {
+         dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl;
+         fault();
+         continue;
+       }
+       in_seq_acked = send_seq;
       }
-      return 0;
-    }
-    
-    // protocol error
-    dout(0) << "connect got bad tag " << (int)tag << dendl;
-    goto fail_locked;
-  }
 
- fail:
-  lock.Lock();
- fail_locked:
-  if (state == STATE_CONNECTING)
-    fault();
-  else
-    dout(3) << "connect fault, but state != connecting, stopping" << dendl;
+      // grab outgoing message
+      Message *m = _get_next_outgoing();
+      if (m) {
+       m->set_seq(++out_seq);
+       sent.push_back(m); // move to sent list
+       m->get();
+       lock.Unlock();
 
- stop_locked:
-  return -1;
-}
+        dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
 
-void Rank::Pipe::register_pipe()
-{
-  dout(10) << "register_pipe" << dendl;
-  assert(rank->lock.is_locked());
-  assert(rank->rank_pipe.count(peer_addr) == 0);
-  rank->rank_pipe[peer_addr] = this;
-}
+       // encode and copy out of *m
+        if (m->empty_payload()) 
+         m->encode_payload();
+       m->calc_front_crc();
 
-void Rank::Pipe::unregister_pipe()
-{
-  assert(rank->lock.is_locked());
-  if (rank->rank_pipe.count(peer_addr) &&
-      rank->rank_pipe[peer_addr] == this) {
-    dout(10) << "unregister_pipe" << dendl;
-    rank->rank_pipe.erase(peer_addr);
-  } else {
-    dout(10) << "unregister_pipe - not registered" << dendl;
-  }
-}
+        dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
+       int rc = write_message(m);
 
+       lock.Lock();
+       if (rc < 0) {
+          derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
+                 << errno << ": " << strerror(errno) << dendl;
+         fault();
+        }
+       m->put();
+      }
+      continue;
+    }
+    
+    // wait
+    dout(20) << "writer sleeping" << dendl;
+    cond.Wait(lock);
+  }
+  
+  dout(20) << "writer finishing" << dendl;
 
-void Rank::Pipe::requeue_sent()
-{
-  if (sent.empty())
-    return;
+  // reap?
+  bool reap = false;
+  writer_running = false;
+  if (!reader_running) reap = true;
 
-  list<Message*>& rq = q[CEPH_MSG_PRIO_HIGHEST];
-  while (!sent.empty()) {
-    Message *m = sent.back();
-    sent.pop_back();
-    dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq
-            << " (" << m->get_seq() << ")" << dendl;
-    rq.push_front(m);
-    out_seq--;
+  lock.Unlock();
+  
+  if (reap) {
+    dout(10) << "writer queueing for reap" << dendl;
+    if (sd >= 0) {
+      ::close(sd);
+      sd = -1;
+      closed_socket();
+    }
+    rank->lock.Lock();
+    {
+      rank->pipe_reap_queue.push_back(this);
+      rank->wait_cond.Signal();
+    }
+    rank->lock.Unlock();
   }
-}
 
-void Rank::Pipe::discard_queue()
-{
-  dout(10) << "discard_queue" << dendl;
-  for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
-    (*p)->put();
-  sent.clear();
-  for (map<int,list<Message*> >::iterator p = q.begin(); p != q.end(); p++)
-    for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
-      (*r)->put();
-  q.clear();
+  dout(10) << "writer done" << dendl;
 }
 
 
-void Rank::Pipe::fault(bool onconnect, bool onread)
+Message *SimpleMessenger::Pipe::read_message()
 {
-  assert(lock.is_locked());
-  cond.Signal();
-
-  if (onread && state == STATE_CONNECTING) {
-    dout(10) << "fault already connecting, reader shutting down" << dendl;
-    return;
-  }
+  // envelope
+  //dout(10) << "receiver.read_message from sd " << sd  << dendl;
+  
+  ceph_msg_header header; 
+  ceph_msg_footer footer;
 
-  if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl;
+  if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
+    return 0;
+  
+  dout(20) << "reader got envelope type=" << header.type
+           << " src " << header.src << " dst " << header.dst
+           << " front=" << header.front_len
+          << " data=" << header.data_len
+          << " off " << header.data_off
+           << dendl;
 
-  if (state == STATE_CLOSED ||
-      state == STATE_CLOSING) {
-    dout(10) << "fault already closed|closing" << dendl;
-    return;
+  // verify header crc
+  __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+  if (header_crc != header.crc) {
+    dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
+    return 0;
   }
 
-  if (sd >= 0) {
-    ::close(sd);
-    sd = -1;
-    closed_socket();
+  // ok, now it's safe to change the header..
+  // munge source address?
+  if (header.src.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
+    dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
+    header.orig_src.addr.ipaddr = header.src.addr.ipaddr = peer_addr.ipaddr;
   }
 
-  // lossy channel?
-  if (policy.lossy_tx) {
-    dout(10) << "fault on lossy channel, failing" << dendl;
-    fail();
-    return;
+  // read front
+  bufferlist front;
+  bufferptr bp;
+  int front_len = header.front_len;
+  if (front_len) {
+    bp = buffer::create(front_len);
+    if (tcp_read( sd, bp.c_str(), front_len ) < 0) 
+      return 0;
+    front.push_back(bp);
+    dout(20) << "reader got front " << front.length() << dendl;
   }
 
-  // requeue sent items
-  requeue_sent();
-
-  if (q.empty()) {
-    if (state == STATE_CLOSING || onconnect) {
-      dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
-      state = STATE_CLOSED;
-    } else {
-      dout(0) << "fault nothing to send, going to standby" << dendl;
-      state = STATE_STANDBY;
-    }
-    return;
-  } 
-
-  utime_t now = g_clock.now();
-  if (state != STATE_CONNECTING) {
-    if (!onconnect) dout(0) << "fault initiating reconnect" << dendl;
-    connect_seq++;
-    state = STATE_CONNECTING;
-    first_fault = now;
-  } else if (first_fault.sec() == 0) {
-    if (!onconnect) dout(0) << "fault first fault" << dendl;
-    first_fault = now;
-  } else {
-    utime_t failinterval = now - first_fault;
-    utime_t retryinterval = now - last_attempt;
-    if (!onconnect) dout(10) << "fault failure was " << failinterval 
-                            << " ago, last attempt was at " << last_attempt
-                            << ", " << retryinterval << " ago" << dendl;
-    if (policy.fail_interval > 0 && failinterval > policy.fail_interval) {
-      // give up
-      dout(0) << "fault giving up" << dendl;
-      fail();
-    } else if (retryinterval < policy.retry_interval) {
-      // wait
-      now += (policy.retry_interval - retryinterval);
-      dout(10) << "fault waiting until " << now << dendl;
-      cond.WaitUntil(lock, now);
-      dout(10) << "fault done waiting or woke up" << dendl;
-    }
-  }
-  last_attempt = now;
-}
+  // read data
+  bufferlist data;
+  unsigned data_len = le32_to_cpu(header.data_len);
+  unsigned data_off = le32_to_cpu(header.data_off);
+  if (data_len) {
+    int left = data_len;
+    if (data_off & ~PAGE_MASK) {
+      // head
+      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
+                    (unsigned)left);
+      bp = buffer::create(head);
+      if (tcp_read( sd, bp.c_str(), head ) < 0) 
+       return 0;
+      data.push_back(bp);
+      left -= head;
+      dout(20) << "reader got data head " << head << dendl;
+    }
 
-void Rank::Pipe::fail()
-{
-  derr(10) << "fail" << dendl;
-  assert(lock.is_locked());
+    // middle
+    int middle = left & PAGE_MASK;
+    if (middle > 0) {
+      bp = buffer::create_page_aligned(middle);
+      if (tcp_read( sd, bp.c_str(), middle ) < 0) 
+       return 0;
+      data.push_back(bp);
+      left -= middle;
+      dout(20) << "reader got data page-aligned middle " << middle << dendl;
+    }
 
-  stop();
-  report_failures();
+    if (left) {
+      bp = buffer::create(left);
+      if (tcp_read( sd, bp.c_str(), left ) < 0) 
+       return 0;
+      data.push_back(bp);
+      dout(20) << "reader got data tail " << left << dendl;
+    }
+  }
 
-  for (unsigned i=0; i<rank->local.size(); i++) 
-    if (rank->local[i] && rank->local[i]->get_dispatcher())
-      rank->local[i]->queue_reset(peer_addr, last_dest_name);
+  // footer
+  if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
+    return 0;
+  
+  int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
+  dout(10) << "aborted = " << aborted << dendl;
+  if (aborted) {
+    dout(0) << "reader got " << front.length() << " + " << data.length()
+           << " byte message from " << header.src << ".. ABORTED" << dendl;
+    // MEH FIXME 
+    Message *m = new MGenericMessage(CEPH_MSG_PING);
+    header.type = CEPH_MSG_PING;
+    m->set_header(header);
+    return m;
+  }
 
-  // unregister
-  lock.Unlock();
-  rank->lock.Lock();
-  unregister_pipe();
-  rank->lock.Unlock();
-  lock.Lock();
+  dout(20) << "reader got " << front.length() << " + " << data.length()
+          << " byte message from " << header.src << dendl;
+  return decode_message(header, footer, front, data);
 }
 
-void Rank::Pipe::was_session_reset()
-{
-  assert(lock.is_locked());
-
-  dout(10) << "was_session_reset" << dendl;
-  report_failures();
-  for (unsigned i=0; i<rank->local.size(); i++) 
-    if (rank->local[i] && rank->local[i]->get_dispatcher())
-      rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
-
-  out_seq = 0;
-  in_seq = 0;
-  connect_seq = 0;
-}
 
-void Rank::Pipe::report_failures()
+int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len)
 {
-  // report failures
-  q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent);
-  while (1) {
-    Message *m = _get_next_outgoing();
-    if (!m)
-      break;
+  while (len > 0) {
+    if (0) { // sanity
+      int l = 0;
+      for (unsigned i=0; i<msg->msg_iovlen; i++)
+       l += msg->msg_iov[i].iov_len;
+      assert(l == len);
+    }
 
-    if (policy.drop_msg_callback) {
-      unsigned srcrank = m->get_source_inst().addr.erank;
-      if (srcrank >= rank->max_local || rank->local[srcrank] == 0) {
-       dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
-      } else if (rank->local[srcrank]->is_stopped()) {
-       dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
+    int r = ::sendmsg(sd, msg, 0);
+    if (r == 0) 
+      dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
+    if (r < 0) { 
+      dout(1) << "do_sendmsg error " << strerror(errno) << dendl;
+      return -1;
+    }
+    if (state == STATE_CLOSED) {
+      dout(10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
+      errno = EINTR;
+      return -1; // close enough
+    }
+    len -= r;
+    if (len == 0) break;
+    
+    // hrmph.  trim r bytes off the front of our message.
+    dout(20) << "do_sendmail short write did " << r << ", still have " << len << dendl;
+    while (r > 0) {
+      if (msg->msg_iov[0].iov_len <= (size_t)r) {
+       // lose this whole item
+       //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
+       r -= msg->msg_iov[0].iov_len;
+       msg->msg_iov++;
+       msg->msg_iovlen--;
       } else {
-       dout(10) << "fail on " << *m << dendl;
-       rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
+       // partial!
+       //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
+       msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r);
+       msg->msg_iov[0].iov_len -= r;
+       break;
       }
     }
-    m->put();
   }
+  return 0;
 }
 
-void Rank::Pipe::stop()
+
+int SimpleMessenger::Pipe::write_ack(unsigned seq)
 {
-  dout(10) << "stop" << dendl;
-  state = STATE_CLOSED;
-  cond.Signal();
-  if (reader_running)
-    reader_thread.kill(SIGUSR1);
-  if (writer_running)
-    writer_thread.kill(SIGUSR1);
+  dout(10) << "write_ack " << seq << dendl;
+
+  char c = CEPH_MSGR_TAG_ACK;
+  __le32 s;
+  s = seq;
+
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  struct iovec msgvec[2];
+  msgvec[0].iov_base = &c;
+  msgvec[0].iov_len = 1;
+  msgvec[1].iov_base = &s;
+  msgvec[1].iov_len = sizeof(s);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 2;
+  
+  if (do_sendmsg(sd, &msg, 5) < 0) 
+    return -1; 
+  return 0;
 }
 
 
-/* read msgs from socket.
- * also, server.
- */
-void Rank::Pipe::reader()
+int SimpleMessenger::Pipe::write_message(Message *m)
 {
-  if (state == STATE_ACCEPTING) 
-    accept();
+  ceph_msg_header& header = m->get_header();
+  ceph_msg_footer& footer = m->get_footer();
 
-  lock.Lock();
+  // get envelope, buffers
+  header.front_len = m->get_payload().length();
+  header.data_len = m->get_data().length();
+  footer.flags = 0;
+  m->calc_header_crc();
 
-  // loop.
-  while (state != STATE_CLOSED &&
-        state != STATE_CONNECTING) {
-    assert(lock.is_locked());
+  bufferlist blist = m->get_payload();
+  blist.append(m->get_data());
+  
+  dout(20)  << "write_message " << m << " to " << header.dst << dendl;
+  
+  // set up msghdr and iovecs
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  struct iovec msgvec[3 + blist.buffers().size()];  // conservative upper bound
+  msg.msg_iov = msgvec;
+  int msglen = 0;
+  
+  // send tag
+  char tag = CEPH_MSGR_TAG_MSG;
+  msgvec[msg.msg_iovlen].iov_base = &tag;
+  msgvec[msg.msg_iovlen].iov_len = 1;
+  msglen++;
+  msg.msg_iovlen++;
 
-    // sleep if (re)connecting
-    if (state == STATE_STANDBY) {
-      dout(20) << "reader sleeping during reconnect|standby" << dendl;
-      cond.Wait(lock);
-      continue;
-    }
+  // send envelope
+  msgvec[msg.msg_iovlen].iov_base = (char*)&header;
+  msgvec[msg.msg_iovlen].iov_len = sizeof(header);
+  msglen += sizeof(header);
+  msg.msg_iovlen++;
 
-    lock.Unlock();
+  // payload (front+data)
+  list<bufferptr>::const_iterator pb = blist.buffers().begin();
+  int b_off = 0;  // carry-over buffer offset, if any
+  int bl_pos = 0; // blist pos
+  int left = blist.length();
 
-    char tag = -1;
-    dout(20) << "reader reading tag..." << dendl;
-    int rc = tcp_read(sd, (char*)&tag, 1);
-    if (rc < 0) {
-      lock.Lock();
-      dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
-      fault(false, true);
-      continue;
+  while (left > 0) {
+    int donow = MIN(left, (int)pb->length()-b_off);
+    if (donow == 0) {
+      dout(0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl;
     }
-
-    // open ...
-    if (tag == CEPH_MSGR_TAG_ACK) {
-      dout(20) << "reader got ACK" << dendl;
-      __u32 seq;
-      int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
-      lock.Lock();
-      if (rc < 0) {
-       dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
-       fault(false, true);
-      } else if (state != STATE_CLOSED) {
-       dout(15) << "reader got ack seq " << seq << dendl;
-       // trim sent list
-       while (!sent.empty() &&
-              sent.front()->get_seq() <= seq) {
-         Message *m = sent.front();
-         sent.pop_front();
-         dout(10) << "reader got ack seq " 
-                   << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
-         m->put();
-       }
-      }
-      continue;
+    assert(donow > 0);
+    dout(30) << " bl_pos " << bl_pos << " b_off " << b_off
+            << " leftinchunk " << left
+            << " buffer len " << pb->length()
+            << " writing " << donow 
+            << dendl;
+    
+    if (msg.msg_iovlen >= IOV_MAX-2) {
+      if (do_sendmsg(sd, &msg, msglen)) 
+       return -1;      
+      
+      // and restart the iov
+      msg.msg_iov = msgvec;
+      msg.msg_iovlen = 0;
+      msglen = 0;
+    }
+    
+    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
+    msgvec[msg.msg_iovlen].iov_len = donow;
+    msglen += donow;
+    msg.msg_iovlen++;
+    
+    left -= donow;
+    assert(left >= 0);
+    b_off += donow;
+    bl_pos += donow;
+    if (left == 0) break;
+    while (b_off == (int)pb->length()) {
+      pb++;
+      b_off = 0;
     }
+  }
+  assert(left == 0);
 
-    else if (tag == CEPH_MSGR_TAG_MSG) {
-      dout(20) << "reader got MSG" << dendl;
-      Message *m = read_message();
-      lock.Lock();
-      
-      if (!m) {
-       derr(2) << "reader read null message, " << strerror(errno) << dendl;
-       fault(false, true);
-       continue;
-      }
-
-      if (state == STATE_CLOSED ||
-         state == STATE_CONNECTING)
-       continue;
+  // send footer
+  msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
+  msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
+  msglen += sizeof(footer);
+  msg.msg_iovlen++;
 
-      // check received seq#
-      if (m->get_seq() <= in_seq) {
-       dout(-10) << "reader got old message "
-                 << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
-                 << " for " << m->get_dest() 
-                 << ", discarding" << dendl;
-       delete m;
-       continue;
-      }
-      in_seq++;
+  // send
+  if (do_sendmsg(sd, &msg, msglen)) 
+    return -1; 
 
-      if (!lossy_rx && in_seq != m->get_seq()) {
-       dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
-               << " for " << *m << " from " << m->get_source() << dendl;
-       derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
-               << " for " << *m << " from " << m->get_source() << dendl;
-       assert(in_seq == m->get_seq()); // for now!
-       fault(false, true);
-       delete m;
-       continue;
-      }
+  return 0;
+}
 
-      cond.Signal();  // wake up writer, to ack this
-      lock.Unlock();
-      
-      dout(10) << "reader got message "
-              << m->get_seq() << " " << m << " " << *m
-              << " for " << m->get_dest() << dendl;
-      
-      // deliver
-      Endpoint *entity = 0;
-      
-      rank->lock.Lock();
-      {
-       unsigned erank = m->get_dest_inst().addr.erank;
-       if (erank < rank->max_local && rank->local[erank]) {
-         // find entity
-         entity = rank->local[erank];
-         entity->get();
 
-         // first message?
-         if (entity->need_addr) {
-           entity->_set_myaddr(m->get_dest_inst().addr);
-           dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
-           entity->need_addr = false;
-         }
+/********************************************
+ * SimpleMessenger
+ */
+#undef dout_prefix
+#define dout_prefix _prefix(this)
 
-         if (rank->need_addr) {
-           rank->rank_addr = m->get_dest_inst().addr;
-           rank->rank_addr.erank = 0;
-           dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
-           rank->need_addr = false;
-         }
 
-       } else {
-         derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
-       }
-      }
-      rank->lock.Unlock();
-      
-      if (entity) {
-       entity->queue_message(m);        // queue
-       entity->put();
-      }
+/*
+ * note: assumes lock is held
+ */
+void SimpleMessenger::reaper()
+{
+  dout(10) << "reaper" << dendl;
+  assert(lock.is_locked());
 
-      lock.Lock();
-    } 
-    
-    else if (tag == CEPH_MSGR_TAG_CLOSE) {
-      dout(20) << "reader got CLOSE" << dendl;
-      lock.Lock();
-      if (state == STATE_CLOSING)
-       state = STATE_CLOSED;
-      else
-       state = STATE_CLOSING;
-      cond.Signal();
-      break;
-    }
-    else {
-      dout(0) << "reader bad tag " << (int)tag << dendl;
-      lock.Lock();
-      fault(false, true);
-    }
+  while (!pipe_reap_queue.empty()) {
+    Pipe *p = pipe_reap_queue.front();
+    dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
+    p->unregister_pipe();
+    pipe_reap_queue.pop_front();
+    assert(pipes.count(p));
+    pipes.erase(p);
+    p->join();
+    p->discard_queue();
+    dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
+    assert(p->sd < 0);
+    delete p;
+    dout(10) << "reaper deleted pipe " << p << dendl;
   }
+}
 
-  // reap?
-  bool reap = false;
-  reader_running = false;
-  if (!writer_running)
-    reap = true;
-
-  lock.Unlock();
 
-  if (reap) {
-    dout(10) << "reader queueing for reap" << dendl;
-    if (sd >= 0) {
-      ::close(sd);
-      sd = -1;
-      closed_socket();
-    }
-    rank->lock.Lock();
-    {
-      rank->pipe_reap_queue.push_back(this);
-      rank->wait_cond.Signal();
-    }
-    rank->lock.Unlock();
+int SimpleMessenger::bind(int64_t force_nonce)
+{
+  lock.Lock();
+  if (started) {
+    dout(10) << "rank.bind already started" << dendl;
+    lock.Unlock();
+    return -1;
   }
+  dout(10) << "rank.bind" << dendl;
+  lock.Unlock();
 
-  dout(10) << "reader done" << dendl;
+  // bind to a socket
+  return accepter.bind(force_nonce);
 }
 
-/*
-class FakeSocketError : public Context {
-  int sd;
+
+class C_Die : public Context {
 public:
-  FakeSocketError(int s) : sd(s) {}
-  void finish(int r) {
-    cout << "faking socket error on " << sd << std::endl;
-    ::close(sd);
+  void finish(int) {
+    cerr << "die" << std::endl;
+    exit(1);
   }
 };
-*/
 
-/* write msgs to socket.
- * also, client.
- */
-void Rank::Pipe::writer()
+static void write_pid_file(int pid)
 {
-  lock.Lock();
-
-  while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
-    // standby?
-    if (!q.empty() && state == STATE_STANDBY)
-      state = STATE_CONNECTING;
-
-    // connect?
-    if (state == STATE_CONNECTING) {
-      connect();
-      continue;
-    }
-    
-    if (state == STATE_CLOSING) {
-      // write close tag
-      dout(20) << "writer writing CLOSE tag" << dendl;
-      char tag = CEPH_MSGR_TAG_CLOSE;
-      state = STATE_CLOSED;
-      lock.Unlock();
-      if (sd) ::write(sd, &tag, 1);
-      lock.Lock();
-      continue;
-    }
-
-    if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
-       (!q.empty() || in_seq > in_seq_acked)) {
+  if (!g_conf.pid_file)
+    return;
 
-      // send ack?
-      if (in_seq > in_seq_acked) {
-       int send_seq = in_seq;
-       lock.Unlock();
-       int rc = write_ack(send_seq);
-       lock.Lock();
-       if (rc < 0) {
-         dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl;
-         fault();
-         continue;
-       }
-       in_seq_acked = send_seq;
-      }
+  int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
+  if (fd >= 0) {
+    char buf[20];
+    int len = sprintf(buf, "%d\n", pid);
+    ::write(fd, buf, len);
+    ::close(fd);
+  }
+}
 
-      // grab outgoing message
-      Message *m = _get_next_outgoing();
-      if (m) {
-       m->set_seq(++out_seq);
-       sent.push_back(m); // move to sent list
-       m->get();
-       lock.Unlock();
+static void remove_pid_file()
+{
+  if (!g_conf.pid_file)
+    return;
 
-        dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
+  // only remove it if it has OUR pid in it!
+  int fd = ::open(g_conf.pid_file, O_RDONLY);
+  if (fd >= 0) {
+    char buf[20];
+    ::read(fd, buf, 20);
+    ::close(fd);
+    int a = atoi(buf);
 
-       // encode and copy out of *m
-        if (m->empty_payload()) 
-         m->encode_payload();
-       m->calc_front_crc();
+    if (a == getpid())
+      ::unlink(g_conf.pid_file);
+    else
+      generic_dout(0) << "strange, pid file " << g_conf.pid_file 
+             << " has " << a << ", not expected " << getpid()
+             << dendl;
+  }
+}
 
-        dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
-       int rc = write_message(m);
+int SimpleMessenger::start(bool nodaemon)
+{
+  // register at least one entity, first!
+  assert(my_type >= 0); 
 
-       lock.Lock();
-       if (rc < 0) {
-          derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
-                 << errno << ": " << strerror(errno) << dendl;
-         fault();
-        }
-       m->put();
-      }
-      continue;
-    }
-    
-    // wait
-    dout(20) << "writer sleeping" << dendl;
-    cond.Wait(lock);
+  lock.Lock();
+  if (started) {
+    dout(10) << "rank.start already started" << dendl;
+    lock.Unlock();
+    return 0;
   }
-  
-  dout(20) << "writer finishing" << dendl;
-
-  // reap?
-  bool reap = false;
-  writer_running = false;
-  if (!reader_running) reap = true;
 
+  dout(1) << "rank.start at " << rank_addr << dendl;
+  started = true;
   lock.Unlock();
-  
-  if (reap) {
-    dout(10) << "writer queueing for reap" << dendl;
-    if (sd >= 0) {
-      ::close(sd);
-      sd = -1;
-      closed_socket();
+
+  // daemonize?
+  if (g_conf.daemonize && !nodaemon) {
+    if (Thread::get_num_threads() > 0) {
+      derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
+             << " already started that will now die!  call rank.start() sooner." 
+             << dendl;
     }
-    rank->lock.Lock();
-    {
-      rank->pipe_reap_queue.push_back(this);
-      rank->wait_cond.Signal();
+    dout(1) << "rank.start daemonizing" << dendl;
+
+    if (1) {
+      daemon(1, 0);
+      write_pid_file(getpid());
+    } else {
+      pid_t pid = fork();
+      if (pid) {
+       // i am parent
+       write_pid_file(pid);
+       ::close(0);
+       ::close(1);
+       ::close(2);
+       _exit(0);
+      }
     }
-    rank->lock.Unlock();
+    if (g_conf.chdir && g_conf.chdir[0]) {
+      ::mkdir(g_conf.chdir, 0700);
+      ::chdir(g_conf.chdir);
+    }
+
+    _dout_rename_output_file();
+  } else if (g_daemon) {
+    write_pid_file(getpid());
   }
 
-  dout(10) << "writer done" << dendl;
+  // some debug hackery?
+  if (g_conf.kill_after) 
+    g_timer.add_event_after(g_conf.kill_after, new C_Die);
+
+  // go!
+  accepter.start();
+  return 0;
 }
 
 
-Message *Rank::Pipe::read_message()
+/* connect_rank
+ * NOTE: assumes rank.lock held.
+ */
+SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, const Policy& p)
 {
-  // envelope
-  //dout(10) << "receiver.read_message from sd " << sd  << dendl;
+  assert(lock.is_locked());
+  assert(addr != rank_addr);
   
-  ceph_msg_header header; 
-  ceph_msg_footer footer;
-
-  if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
-    return 0;
+  dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
   
-  dout(20) << "reader got envelope type=" << header.type
-           << " src " << header.src << " dst " << header.dst
-           << " front=" << header.front_len
-          << " data=" << header.data_len
-          << " off " << header.data_off
-           << dendl;
-
-  // verify header crc
-  __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
-  if (header_crc != header.crc) {
-    dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
-    return 0;
-  }
+  // create pipe
+  Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
+  pipe->policy = p;
+  pipe->peer_addr = addr;
+  pipe->start_writer();
+  pipe->register_pipe();
+  pipes.insert(pipe);
 
-  // ok, now it's safe to change the header..
-  // munge source address?
-  if (header.src.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
-    dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
-    header.orig_src.addr.ipaddr = header.src.addr.ipaddr = peer_addr.ipaddr;
-  }
+  return pipe;
+}
 
-  // read front
-  bufferlist front;
-  bufferptr bp;
-  int front_len = header.front_len;
-  if (front_len) {
-    bp = buffer::create(front_len);
-    if (tcp_read( sd, bp.c_str(), front_len ) < 0) 
-      return 0;
-    front.push_back(bp);
-    dout(20) << "reader got front " << front.length() << dendl;
-  }
 
-  // read data
-  bufferlist data;
-  unsigned data_len = le32_to_cpu(header.data_len);
-  unsigned data_off = le32_to_cpu(header.data_off);
-  if (data_len) {
-    int left = data_len;
-    if (data_off & ~PAGE_MASK) {
-      // head
-      int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
-                    (unsigned)left);
-      bp = buffer::create(head);
-      if (tcp_read( sd, bp.c_str(), head ) < 0) 
-       return 0;
-      data.push_back(bp);
-      left -= head;
-      dout(20) << "reader got data head " << head << dendl;
-    }
 
-    // middle
-    int middle = left & PAGE_MASK;
-    if (middle > 0) {
-      bp = buffer::create_page_aligned(middle);
-      if (tcp_read( sd, bp.c_str(), middle ) < 0) 
-       return 0;
-      data.push_back(bp);
-      left -= middle;
-      dout(20) << "reader got data page-aligned middle " << middle << dendl;
-    }
 
-    if (left) {
-      bp = buffer::create(left);
-      if (tcp_read( sd, bp.c_str(), left ) < 0) 
-       return 0;
-      data.push_back(bp);
-      dout(20) << "reader got data tail " << left << dendl;
-    }
-  }
 
-  // footer
-  if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
-    return 0;
-  
-  int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
-  dout(10) << "aborted = " << aborted << dendl;
-  if (aborted) {
-    dout(0) << "reader got " << front.length() << " + " << data.length()
-           << " byte message from " << header.src << ".. ABORTED" << dendl;
-    // MEH FIXME 
-    Message *m = new MGenericMessage(CEPH_MSG_PING);
-    header.type = CEPH_MSG_PING;
-    m->set_header(header);
-    return m;
-  }
 
-  dout(20) << "reader got " << front.length() << " + " << data.length()
-          << " byte message from " << header.src << dendl;
-  return decode_message(header, footer, front, data);
-}
 
 
-int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len)
+/* register_entity 
+ */
+SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name)
 {
-  while (len > 0) {
-    if (0) { // sanity
-      int l = 0;
-      for (unsigned i=0; i<msg->msg_iovlen; i++)
-       l += msg->msg_iov[i].iov_len;
-      assert(l == len);
-    }
+  dout(10) << "register_entity " << name << dendl;
+  lock.Lock();
+  
+  // create messenger
+  int erank = max_local;
+  Endpoint *msgr = new Endpoint(this, name, erank);
 
-    int r = ::sendmsg(sd, msg, 0);
-    if (r == 0) 
-      dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
-    if (r < 0) { 
-      dout(1) << "do_sendmsg error " << strerror(errno) << dendl;
-      return -1;
-    }
-    if (state == STATE_CLOSED) {
-      dout(10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
-      errno = EINTR;
-      return -1; // close enough
-    }
-    len -= r;
-    if (len == 0) break;
-    
-    // hrmph.  trim r bytes off the front of our message.
-    dout(20) << "do_sendmail short write did " << r << ", still have " << len << dendl;
-    while (r > 0) {
-      if (msg->msg_iov[0].iov_len <= (size_t)r) {
-       // lose this whole item
-       //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
-       r -= msg->msg_iov[0].iov_len;
-       msg->msg_iov++;
-       msg->msg_iovlen--;
-      } else {
-       // partial!
-       //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
-       msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r);
-       msg->msg_iov[0].iov_len -= r;
-       break;
-      }
-    }
-  }
-  return 0;
+  // now i know my type.
+  if (my_type >= 0)
+    assert(my_type == name.type());
+  else
+    my_type = name.type();
+
+  // add to directory
+  max_local++;
+  local.resize(max_local);
+  stopped.resize(max_local);
+
+  msgr->get();
+  local[erank] = msgr;
+  stopped[erank] = false;
+  msgr->_myinst.addr = rank_addr;
+  if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
+    msgr->need_addr = true;
+  msgr->_myinst.addr.erank = erank;
+
+  dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr 
+          << " need_addr=" << need_addr
+          << dendl;
+
+  num_local++;
+  
+  lock.Unlock();
+  return msgr;
 }
 
 
-int Rank::Pipe::write_ack(unsigned seq)
+void SimpleMessenger::unregister_entity(Endpoint *msgr)
 {
-  dout(10) << "write_ack " << seq << dendl;
+  lock.Lock();
+  dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
+  
+  // remove from local directory.
+  assert(msgr->my_rank >= 0);
+  assert(local[msgr->my_rank] == msgr);
+  local[msgr->my_rank] = 0;
+  stopped[msgr->my_rank] = true;
+  num_local--;
+  msgr->my_rank = -1;
 
-  char c = CEPH_MSGR_TAG_ACK;
-  __le32 s;
-  s = seq;
+  assert(msgr->nref.test() > 1);
+  msgr->put();
 
-  struct msghdr msg;
-  memset(&msg, 0, sizeof(msg));
-  struct iovec msgvec[2];
-  msgvec[0].iov_base = &c;
-  msgvec[0].iov_len = 1;
-  msgvec[1].iov_base = &s;
-  msgvec[1].iov_len = sizeof(s);
-  msg.msg_iov = msgvec;
-  msg.msg_iovlen = 2;
-  
-  if (do_sendmsg(sd, &msg, 5) < 0) 
-    return -1; 
-  return 0;
+  wait_cond.Signal();
+
+  lock.Unlock();
 }
 
 
-int Rank::Pipe::write_message(Message *m)
+void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
 {
-  ceph_msg_header& header = m->get_header();
-  ceph_msg_footer& footer = m->get_footer();
+  const entity_name_t dest = m->get_dest();
 
-  // get envelope, buffers
-  header.front_len = m->get_payload().length();
-  header.data_len = m->get_data().length();
-  footer.flags = 0;
-  m->calc_header_crc();
+  assert(m->nref.test() == 0);
 
-  bufferlist blist = m->get_payload();
-  blist.append(m->get_data());
-  
-  dout(20)  << "write_message " << m << " to " << header.dst << dendl;
-  
-  // set up msghdr and iovecs
-  struct msghdr msg;
-  memset(&msg, 0, sizeof(msg));
-  struct iovec msgvec[3 + blist.buffers().size()];  // conservative upper bound
-  msg.msg_iov = msgvec;
-  int msglen = 0;
-  
-  // send tag
-  char tag = CEPH_MSGR_TAG_MSG;
-  msgvec[msg.msg_iovlen].iov_base = &tag;
-  msgvec[msg.msg_iovlen].iov_len = 1;
-  msglen++;
-  msg.msg_iovlen++;
+  m->get_header().mon_protocol = CEPH_MON_PROTOCOL;
+  m->get_header().monc_protocol = CEPH_MONC_PROTOCOL;
+  m->get_header().mds_protocol = CEPH_MDS_PROTOCOL;
+  m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL;
+  m->get_header().osd_protocol = CEPH_OSD_PROTOCOL;
+  m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL;
 
-  // send envelope
-  msgvec[msg.msg_iovlen].iov_base = (char*)&header;
-  msgvec[msg.msg_iovlen].iov_len = sizeof(header);
-  msglen += sizeof(header);
-  msg.msg_iovlen++;
+  // lookup
+  entity_addr_t dest_proc_addr = dest_addr;
+  dest_proc_addr.erank = 0;
 
-  // payload (front+data)
-  list<bufferptr>::const_iterator pb = blist.buffers().begin();
-  int b_off = 0;  // carry-over buffer offset, if any
-  int bl_pos = 0; // blist pos
-  int left = blist.length();
+  lock.Lock();
+  {
+    // local?
+    if (rank_addr.is_local_to(dest_addr)) {
+      if (dest_addr.erank < max_local && local[dest_addr.erank]) {
+        // local
+        dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
+       local[dest_addr.erank]->queue_message(m);
+      } else {
+        derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map?  dropping." << dendl;
+        //assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
+       delete m;
+      }
+    }
+    else {
+      // remote.
+      Pipe *pipe = 0;
+      if (rank_pipe.count( dest_proc_addr )) {
+        // connected?
+        pipe = rank_pipe[ dest_proc_addr ];
+       pipe->lock.Lock();
+       if (pipe->state == Pipe::STATE_CLOSED) {
+         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+         pipe->unregister_pipe();
+         pipe->lock.Unlock();
+         pipe = 0;
+       } else {
+         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
 
-  while (left > 0) {
-    int donow = MIN(left, (int)pb->length()-b_off);
-    if (donow == 0) {
-      dout(0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl;
+         // if this pipe was created by an incoming connection, but we haven't received
+         // a message yet, then it won't have the policy set.
+         if (pipe->get_out_seq() == 0)
+           pipe->policy = policy_map[m->get_dest().type()];
+
+         pipe->_send(m);
+         pipe->lock.Unlock();
+       }
+      }
+      if (!pipe) {
+       if (lazy) {
+         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
+         delete m;
+       } else {
+         dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
+         // not connected.
+         pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
+         pipe->send(m);
+       }
+      }
     }
-    assert(donow > 0);
-    dout(30) << " bl_pos " << bl_pos << " b_off " << b_off
-            << " leftinchunk " << left
-            << " buffer len " << pb->length()
-            << " writing " << donow 
-            << dendl;
-    
-    if (msg.msg_iovlen >= IOV_MAX-2) {
-      if (do_sendmsg(sd, &msg, msglen)) 
-       return -1;      
-      
-      // and restart the iov
-      msg.msg_iov = msgvec;
-      msg.msg_iovlen = 0;
-      msglen = 0;
+  }
+
+  lock.Unlock();
+}
+
+
+
+
+
+void SimpleMessenger::wait()
+{
+  lock.Lock();
+  while (1) {
+    // reap dead pipes
+    reaper();
+
+    if (num_local == 0) {
+      dout(10) << "wait: everything stopped" << dendl;
+      break;   // everything stopped.
+    } else {
+      dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
     }
     
-    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
-    msgvec[msg.msg_iovlen].iov_len = donow;
-    msglen += donow;
-    msg.msg_iovlen++;
-    
-    left -= donow;
-    assert(left >= 0);
-    b_off += donow;
-    bl_pos += donow;
-    if (left == 0) break;
-    while (b_off == (int)pb->length()) {
-      pb++;
-      b_off = 0;
-    }
+    wait_cond.Wait(lock);
   }
-  assert(left == 0);
+  lock.Unlock();
+  
+  // done!  clean up.
+  dout(20) << "wait: stopping accepter thread" << dendl;
+  accepter.stop();
+  dout(20) << "wait: stopped accepter thread" << dendl;
 
-  // send footer
-  msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
-  msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
-  msglen += sizeof(footer);
-  msg.msg_iovlen++;
+  // close+reap all pipes
+  lock.Lock();
+  {
+    dout(10) << "wait: closing pipes" << dendl;
+    list<Pipe*> toclose;
+    for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin();
+         i != rank_pipe.end();
+         i++)
+      toclose.push_back(i->second);
+    for (list<Pipe*>::iterator i = toclose.begin();
+        i != toclose.end();
+        i++) {
+      (*i)->unregister_pipe();
+      (*i)->lock.Lock();
+      (*i)->stop();
+      (*i)->lock.Unlock();
+    }
 
-  // send
-  if (do_sendmsg(sd, &msg, msglen)) 
-    return -1; 
+    reaper();
+    dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
+    while (!pipes.empty()) {
+      wait_cond.Wait(lock);
+      reaper();
+    }
+  }
+  lock.Unlock();
 
-  return 0;
+  dout(10) << "wait: done." << dendl;
+  dout(1) << "shutdown complete." << dendl;
+  remove_pid_file();
+  started = false;
+  my_type = -1;
 }
 
 
+
+
+void SimpleMessenger::mark_down(entity_addr_t addr)
+{
+  lock.Lock();
+  if (rank_pipe.count(addr)) {
+    Pipe *p = rank_pipe[addr];
+    dout(1) << "mark_down " << addr << " -- " << p << dendl;
+    p->unregister_pipe();
+    p->lock.Lock();
+    p->stop();
+    p->lock.Unlock();
+  } else {
+    dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
+  }
+  lock.Unlock();
+}
+
index 98f808c7cebc2edbaf027a308704db7f4b738ff0..d25d58189a192ff2b619d810daf7c7d49cbc9f2d 100644 (file)
@@ -36,7 +36,7 @@ using namespace __gnu_cxx;
 
 /* Rank - per-process
  */
-class Rank {
+class SimpleMessenger {
 public:
   struct Policy {
     bool lossy_tx;                // 
@@ -88,11 +88,11 @@ private:
   // incoming
   class Accepter : public Thread {
   public:
-    Rank *rank;
+    SimpleMessenger *rank;
     bool done;
     int listen_sd;
     
-    Accepter(Rank *r) : rank(r), done(false), listen_sd(-1) {}
+    Accepter(SimpleMessenger *r) : rank(r), done(false), listen_sd(-1) {}
     
     void *entry();
     void stop();
@@ -105,7 +105,7 @@ private:
   // pipe
   class Pipe {
   public:
-    Rank *rank;
+    SimpleMessenger *rank;
     ostream& _pipe_prefix();
 
     enum {
@@ -179,10 +179,10 @@ private:
     friend class Writer;
     
   public:
-    Pipe(Rank *r, int st) : 
+    Pipe(SimpleMessenger *r, int st) : 
       rank(r),
       sd(-1),
-      lock("Rank::Pipe::lock"),
+      lock("SimpleMessenger::Pipe::lock"),
       state(st), 
       reader_running(false), writer_running(false),
       connect_seq(0), peer_global_seq(0),
@@ -264,7 +264,7 @@ private:
 
   // messenger interface
   class Endpoint : public Messenger {
-    Rank *rank;
+    SimpleMessenger *rank;
     Mutex lock;
     Cond cond;
     map<int, list<Message*> > dispatch_queue;
@@ -286,7 +286,7 @@ private:
     } dispatch_thread;
     void dispatch_entry();
 
-    friend class Rank;
+    friend class SimpleMessenger;
 
   public:
     void queue_message(Message *m) {
@@ -331,10 +331,10 @@ private:
     }
 
   public:
-    Endpoint(Rank *r, entity_name_t name, int rn) : 
+    Endpoint(SimpleMessenger *r, entity_name_t name, int rn) : 
       Messenger(name),
       rank(r),
-      lock("Rank::Endpoint::lock"),
+      lock("SimpleMessenger::Endpoint::lock"),
       stop(false),
       qlen(0),
       my_rank(rn),
@@ -373,7 +373,7 @@ private:
   };
 
 
-  // Rank stuff
+  // SimpleMessenger stuff
  public:
   Mutex lock;
   Cond  wait_cond;  // for wait()
@@ -409,12 +409,12 @@ private:
   void reaper();
 
 public:
-  Rank() : accepter(this),
-          lock("Rank::lock"), started(false), need_addr(true),
+  SimpleMessenger() : accepter(this),
+          lock("SimpleMessenger::lock"), started(false), need_addr(true),
           max_local(0), num_local(0),
           my_type(-1),
-          global_seq_lock("Rank::global_seq_lock"), global_seq(0) { }
-  ~Rank() { }
+          global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) { }
+  ~SimpleMessenger() { }
 
   //void set_listen_addr(tcpaddr_t& a);
 
@@ -444,8 +444,4 @@ public:
   }
 } ;
 
-
-
-extern Rank rank;
-
 #endif
index 6da4d057a9946b6248339a5c9149beb8ade1c833..5bb484d159da172090d115b899a49a5aafeb41d4 100644 (file)
@@ -83,6 +83,7 @@ int main(int argc, const char **argv, const char *envp[]) {
   
   // start up network
   g_my_addr = monmap.get_inst(whoami).addr;
+  SimpleMessenger rank;
   int err = rank.bind();
   if (err < 0)
     return 1;