]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tcp messenger!
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 16 Jun 2005 14:52:37 +0000 (14:52 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 16 Jun 2005 14:52:37 +0000 (14:52 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@325 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/Makefile
ceph/config.cc
ceph/config.h
ceph/mds/MDBalancer.cc
ceph/msg/MPIMessenger.cc
ceph/msg/TCPMessenger.cc [new file with mode: 0644]
ceph/msg/TCPMessenger.h [new file with mode: 0644]
ceph/osd/OSDMap.h
ceph/tcpfuse.cc [new file with mode: 0644]
ceph/tcpsyn.cc [new file with mode: 0644]
ceph/test/testmpi.cc

index fb9b084a0d979be3e3f47f05964dabe2f69d1a14..eb33a8a1d2c76d7fb48488012ea88fc052689c87 100644 (file)
@@ -87,9 +87,15 @@ mttest: test/mttest.cc msg/MTMessenger.cc ${COMMON_OBJS}
 mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
        ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
+tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
+       ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
+
 mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
+tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
+       ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
+
 fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o msg/CheesySerializer.o ${COMMON_OBJS}
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
index 33fddeff14cd93c56d39c648b4b1108730a5e9f5..6afb54e9a0370fdc287ba38f2d9c708bc2aabf71 100644 (file)
@@ -50,6 +50,7 @@ md_config_t g_conf = {
   mds_bal_replicate_threshold: 500,
   mds_bal_unreplicate_threshold: 200,
   mds_bal_interval: 60,           // seconds
+  mds_bal_idle_threshold: .1,
 
   mds_commit_on_shutdown: true,
 
index 48da001ec764c6c9211d0e734502bf7d7edc0d2a..3d2cc10b23f3df6130c27d3c25ae522835bcd2f4 100644 (file)
@@ -41,6 +41,7 @@ struct md_config_t {
   float mds_bal_replicate_threshold;
   float mds_bal_unreplicate_threshold;
   int   mds_bal_interval;
+  float mds_bal_idle_threshold;
 
   bool  mds_commit_on_shutdown;
   bool  mds_verify_export_dirauth;     // debug flag
index dbeb42639eb57ad29fc6643a712aadd4c6945904..a2abc5a3c9a94239f33d22034fff20e81049f260 100644 (file)
@@ -194,7 +194,7 @@ void MDBalancer::do_rebalance()
                maxim < 0) break;
 
        if (maxim < maxex) {  // import takes it all
-         dout(5) << " - " << (*exporter).second << " exports " << maxim << " to " << (*importer).second << endl;
+         dout(5) << "   - mds" << (*exporter).second << " exports " << maxim << " to mds" << (*importer).second << endl;
          if ((*exporter).second == whoami)
                my_targets.insert(pair<int,double>((*importer).second, maxim));
          exported += maxim;
@@ -202,7 +202,7 @@ void MDBalancer::do_rebalance()
          imported = 0;
        } 
        else if (maxim > maxex) {         // export all
-         dout(5) << " - " << (*exporter).second << " exports " << maxex << " to " << (*importer).second << endl;
+         dout(5) << "   - mds" << (*exporter).second << " exports " << maxex << " to mds" << (*importer).second << endl;
          if ((*exporter).second == whoami)
                my_targets.insert(pair<int,double>((*importer).second, maxex));
          imported += maxex;
@@ -210,7 +210,7 @@ void MDBalancer::do_rebalance()
          exported = 0;
        } else {
          // wow, perfect match!
-         dout(5) << " - " << (*exporter).second << " exports " << maxex << " to " << (*importer).second << endl;
+         dout(5) << "   - mds" << (*exporter).second << " exports " << maxex << " to mds" << (*importer).second << endl;
          if ((*exporter).second == whoami)
                my_targets.insert(pair<int,double>((*importer).second, maxex));
          imported = exported = 0;
@@ -224,9 +224,16 @@ void MDBalancer::do_rebalance()
   for (set<CDir*>::iterator it = mds->mdcache->imports.begin();
           it != mds->mdcache->imports.end();
           it++) {
-       import_pop_map.insert(pair<double,CDir*>((*it)->popularity[MDS_POP_CURDOM].get(now), *it));
+       double pop = (*it)->popularity[MDS_POP_CURDOM].get(now);
+       if (pop < g_conf.mds_bal_idle_threshold &&
+               (*it)->inode != mds->mdcache->get_root()) {
+         dout(5) << " exporting idle import " << **it << endl;
+         mds->mdcache->export_dir(*it, (*it)->inode->authority());
+         continue;
+       }
+       import_pop_map[ pop ] = *it;
        int from = (*it)->inode->authority();
-       dout(5) << "map i imported " << **it << " from " << from << endl;
+       dout(15) << "  map: i imported " << **it << " from " << from << endl;
        import_from_map.insert(pair<int,CDir*>(from, *it));
   }
   
@@ -552,7 +559,7 @@ void MDBalancer::show_imports(bool external)
                 p != mds->mdcache->nested_exports[im].end();
                 p++) {
          CDir *exp = *p;
-         dout(db) << "      - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl;
+         dout(db) << "      - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")  " << *exp << " to " << exp->dir_auth << endl;
          assert( exp->is_export() );
          assert( !exp->is_auth() );
          
index 6ea3c6083cec45f3d9817d2d2b3be710dfdcc478..bb6997859098b5bd60a650c8b88bd18c99a2bafa 100644 (file)
@@ -184,12 +184,8 @@ Message *mpi_recv(int tag)
                                  MPI_COMM_WORLD,
                                  &status/*,
                                                   &recv_env_req*/) == MPI_SUCCESS);
-  
-  if (status.count < MSG_ENVELOPE_LEN) {
-       dout(DBLVL) << "mpi_recv got short recv " << status.count << " bytes" << endl;
-       assert(0);
-       return 0;
-  }
+  assert(status.count == MSG_ENVELOPE_LEN);
+
   if (env.type == 0) {
        dout(DBLVL) << "mpi_recv got type 0 message, kicked!" << endl;
        return 0;
@@ -202,7 +198,7 @@ Message *mpi_recv(int tag)
   for (int i=0; i<env.nchunks; i++) {
        MPI_Status fragstatus;
        ASSERT(MPI_Probe(status.MPI_SOURCE,
-                                        tag, //TAG_PAYLOAD,
+                                        tag,
                                         MPI_COMM_WORLD,
                                         &fragstatus) == MPI_SUCCESS);
 
@@ -212,7 +208,7 @@ Message *mpi_recv(int tag)
                                        fragstatus.count,
                                        MPI_CHAR, 
                                        status.MPI_SOURCE,
-                                       tag, //TAG_PAYLOAD,
+                                       tag,
                                        MPI_COMM_WORLD,
                                        &fragstatus) == MPI_SUCCESS);
 
diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc
new file mode 100644 (file)
index 0000000..dd66b58
--- /dev/null
@@ -0,0 +1,688 @@
+
+#include "include/config.h"
+#include "include/error.h"
+
+#include "common/Timer.h"
+#include "common/Mutex.h"
+
+#include "TCPMessenger.h"
+#include "Message.h"
+
+#include <iostream>
+#include <cassert>
+using namespace std;
+#include <ext/hash_map>
+using namespace __gnu_cxx;
+
+
+# include <netdb.h>
+# include <sys/socket.h>
+# include <netinet/in.h>
+# include <arpa/inet.h>
+#include <sys/select.h>
+#include <fcntl.h>
+#include <sys/types.h>
+
+#include <unistd.h>
+#include <mpi.h>
+
+
+#define DBL 18
+
+int tcp_port = 9876;
+
+/*
+ * We make a directory, so that we can have multiple Messengers in the
+ * same process (rank).  This is useful for benchmarking and creating lots of 
+ * simulated clients, e.g.
+ */
+
+hash_map<int, TCPMessenger*>  directory;  // local
+list<Message*>                incoming;
+Mutex                         incoming_lock;
+Cond                          incoming_cond;
+
+struct sockaddr_in *remote_addr;
+int                *in_sd;     // incoming sockets
+pthread_t          *in_threads;
+int                *out_sd;    // outgoing sockets
+
+struct sockaddr_in listen_addr;
+int                listen_sd = 0;
+
+
+
+/* this process */
+int mpi_world;
+int mpi_rank;
+bool tcp_done = false;     // set this flag to stop the event loop
+
+pthread_t dispatch_thread_id = 0;   // thread id of the event loop.  init value == nobody
+pthread_t listen_thread_id = 0;
+Mutex sender_lock;
+
+Timer *pending_timer = 0;
+
+
+
+// debug
+#undef dout
+#define  dout(l)    if (l<=g_conf.debug) cout << "[TCP " << mpi_rank << "/" << mpi_world << " " << getpid() << "." << pthread_self() << "] "
+
+ostream& operator<<(ostream& out, struct sockaddr_in &a)
+{
+  char addr[4];
+  memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4);
+  out << (int)addr[0] << "."
+         << (int)addr[1] << "."
+         << (int)addr[2] << "."
+         << (int)addr[3] << ":"
+         << (int)a.sin_port;
+  return out;
+}
+
+
+/*****
+ * MPI global methods for process-wide startup, shutdown.
+ */
+
+int tcpmessenger_init(int& argc, char**& argv)
+{
+  // exhcnage addresses with other nodes
+  MPI_Init(&argc, &argv);
+  
+  MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
+  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
+
+  dout(DBL) << "rank is " << mpi_rank << " / " << mpi_world << endl;
+
+  // LISTEN
+  dout(DBL) << "binding to listen " << endl;
+  
+  /* socket creation */
+  listen_sd = socket(AF_INET,SOCK_STREAM,0);
+  assert(listen_sd > 0);
+  
+  /* bind to port */
+  memset((char*)&listen_addr, 0, sizeof(listen_addr));
+  listen_addr.sin_family = AF_INET;
+  listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+  listen_addr.sin_port = 0;
+  
+  int rc = bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr));
+  assert(rc >= 0);
+
+  socklen_t llen = sizeof(listen_addr);
+  getsockname(listen_sd, (sockaddr*)&listen_addr, &llen);
+
+  int myport = listen_addr.sin_port;
+
+  // listen!
+  rc = ::listen(listen_sd, 2*mpi_world);
+  assert(rc >= 0);
+
+  dout(DBL) << "listening on " << myport << endl;
+  
+  remote_addr = new (struct sockaddr_in)[mpi_world];
+
+  // my address is...
+  char host[100];
+  gethostname(host, 100);
+  dout(DBL) << "my hostname is " << host << endl;
+
+  struct hostent *myhostname = gethostbyname( host ); 
+  
+  struct sockaddr_in myaddr;
+  myaddr.sin_family = myhostname->h_addrtype;
+  memcpy((char *) &myaddr.sin_addr.s_addr, 
+                myhostname->h_addr_list[0], 
+                myhostname->h_length);
+  myaddr.sin_port = myport;
+
+  dout(DBL) << "my ip is " << myaddr << endl;
+
+  remote_addr[mpi_rank] = myaddr;
+
+  dout(DBL) << "MPI_Allgathering addrs" << endl;
+  MPI_Allgather( &myaddr, sizeof(struct sockaddr_in), MPI_CHAR,
+                                remote_addr, sizeof(struct sockaddr_in), MPI_CHAR,
+                                MPI_COMM_WORLD);
+
+  //  for (int i=0; i<mpi_world; i++) 
+  //dout(DBL) << "  addr of " << i << " is " << remote_addr[i] << endl;
+
+
+  MPI_Finalize();
+
+  in_sd = new int[mpi_world];
+  memset(in_sd, 0, sizeof(int)*mpi_world);
+  out_sd = new int[mpi_world];
+  memset(out_sd, 0, sizeof(int)*mpi_world);
+  in_threads = new pthread_t[mpi_world];
+  memset(in_threads, 0, sizeof(pthread_t)*mpi_world);
+
+  dout(DBL) << "init done" << endl;
+  return mpi_rank;
+}
+
+
+
+int tcpmessenger_shutdown() 
+{
+  dout(5) << "tcpmessenger_shutdown clsoing all sockets etc" << endl;
+
+  // bleh
+
+
+  delete remote_addr;
+  delete in_sd;
+  delete out_sd;
+}
+
+int tcpmessenger_world()
+{
+  return mpi_world;
+}
+
+
+
+/***
+ * internal send/recv
+ */
+
+
+
+bool tcp_read(int sd, char *buf, int len)
+{
+  while (len > 0) {
+       int got = ::recv( sd, buf, len, 0 );
+       if (got < 0) return false;
+       assert(got >= 0);
+       len -= got;
+       buf += got;
+       //dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
+  }
+  return true;
+}
+
+void tcp_write(int sd, char *buf, int len)
+{
+  //dout(DBL) << "tcp_write writing " << len << endl;
+  while (len > 0) {
+       int did = ::send( sd, buf, len, 0 );
+       assert(did >= 0);
+       len -= did;
+       buf += did;
+       dout(DBL) << "tcp_write did " << did << ", " << len << " left" << endl;
+  }
+
+}
+
+
+/*
+ * recv a Message*
+ */
+
+
+
+void tcp_wait()
+{
+  fd_set fds;
+  FD_ZERO(&fds);
+
+  int n = 0;
+
+  for (int i=0; i<mpi_world; i++) {
+       if (in_sd[i] == 0) continue;
+       FD_SET(in_sd[i], &fds);
+       n++;
+  }
+  assert(n == mpi_world);
+
+  struct timeval tv;
+  tv.tv_sec = 10;        // time out every few seconds
+  tv.tv_usec = 0;
+  
+  dout(DBL) << "tcp_wait on " << n << endl;
+  int r = ::select(n, &fds, 0, &fds, 0);//&tv);
+  dout(DBL) << "select returned " << r << endl;
+}
+
+
+
+Message *tcp_recv(int from)
+{
+  // envelope
+  dout(DBL) << "tcp_recv receiving message from " << from  << endl;
+  
+  msg_envelope_t env;
+  if (!tcp_read( in_sd[from], (char*)&env, sizeof(env) ))
+       return 0;
+
+  if (env.type == 0) {
+       dout(DBL) << "got dummy env, bailing" << endl;
+       return 0;
+  }
+
+  dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << env.source << " dst " << env.dest << " nchunks=" << env.nchunks << endl;
+  
+  // payload
+  bufferlist blist;
+  for (int i=0; i<env.nchunks; i++) {
+       int size;
+       tcp_read( in_sd[from], (char*)&size, sizeof(size) );
+
+       bufferptr bp = new buffer(size);
+       
+       tcp_read( in_sd[from], bp.c_str(), size );
+
+       bp.set_length(size);
+       blist.push_back(bp);
+
+       dout(DBL) << "tcp_recv got frag " << i << " of " << env.nchunks << " len " << bp.length() << endl;
+  }
+  
+  dout(DBL) << "tcp_recv got " << blist.length() << " byte message" << endl;
+
+  // unmarshall message
+  Message *m = decode_message(env, blist);
+  return m;
+}
+
+
+void *tcp_inthread(void *r)
+{
+  int who = (int)r;
+
+  dout(DBL) << "tcp_inthread reading for " << who << endl;
+
+  while (!tcp_done) {
+       /*
+       fd_set fds;
+       FD_ZERO(&fds);
+       FD_SET(in_sd[who], &fds);
+       
+       dout(DBL) << "tcp_inthread waiting on socket" << endl;
+       ::select(1, &fds, 0, &fds, 0);
+       */
+
+       Message *m = tcp_recv(who);
+       if (!m) break;
+
+       incoming_lock.Lock();
+       incoming.push_back(m);
+       incoming_lock.Unlock();
+       incoming_cond.Signal();
+  }
+
+  dout(DBL) << "tcp_inthrad closing " << who << endl;
+
+  ::close(in_sd[who]);
+  in_sd[who] = 0;
+
+  return 0;  
+}
+
+
+void *tcp_accepter(void *)
+{
+  dout(DBL) << "tcp_accepter starting" << endl;
+
+  int left = mpi_world;
+  while (left > 0) {
+       //dout(DBL) << "accepting, left = " << left << endl;
+
+       struct sockaddr_in addr;
+       socklen_t slen = sizeof(addr);
+       int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
+       if (sd > 0) {
+         
+         //dout(DBL) << "accepted incoming, reading who it is " << endl;
+         
+         int who;
+         tcp_read(sd, (char*)&who, sizeof(who));
+         
+         in_sd[who] = sd;
+         left--;
+
+         dout(DBL) << "accepted incoming from " << who << ", left = " << left << endl;
+
+         pthread_create(&in_threads[who],
+                                        NULL,
+                                        tcp_inthread,
+                                        (void*)who);
+       } else {
+         dout(DBL) << "no incoming connection?" << endl;
+       }
+  }
+  dout(DBL) << "got incoming from everyone!" << endl;
+}
+
+
+
+bool tcp_recv_any() 
+{  
+  bool any = false;
+
+  //if (mpi_rank == 0 && tcp_accept()) any = true;
+  
+  // any?
+  for (int i=0; i<mpi_world; i++) {
+       if (in_sd[i] == 0) continue;
+
+       char blah;
+       int r = ::recv( in_sd[i], &blah, 1, MSG_PEEK|MSG_DONTWAIT );
+       if (r > 0) {
+         Message *m = tcp_recv(i);
+         if (m) {
+               incoming_lock.Lock();
+               incoming.push_back(m);
+               incoming_cond.Signal();
+               incoming_lock.Unlock();
+               any = true;
+         }
+       }
+  }
+
+  return any;
+}
+
+void tcp_open(int who)
+{
+  //dout(DBL) << "tcp_open " << who << " to " << remote_addr[who] << endl;
+
+  // create socket?
+  int sd = socket(AF_INET,SOCK_STREAM,0);
+  assert(sd > 0);
+  
+  // bind any port
+  struct sockaddr_in myAddr;
+  myAddr.sin_family = AF_INET;
+  myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+  myAddr.sin_port = htons( 0 );    
+  
+  int rc = bind(sd, (struct sockaddr *) &myAddr, sizeof(myAddr));
+  assert(rc>=0);
+
+  // connect!
+  int r = connect(sd, (sockaddr*)&remote_addr[who], sizeof(myAddr));
+  assert(r >= 0);
+
+  //dout(DBL) << "tcp_open connected to " << who << endl;
+
+  int me = mpi_rank;
+  tcp_write(sd, (char*)&me, sizeof(me));
+
+  out_sd[who] = sd;
+}
+
+
+
+/*
+ * send a Message* over the wire.  ** do not block **.
+ */
+int tcp_send(Message *m)
+{
+  int rank = MPI_DEST_TO_RANK(m->get_dest(), mpi_world);
+
+  // marshall
+  m->encode_payload();
+  msg_envelope_t *env = &m->get_envelope();
+  bufferlist blist = m->get_payload();
+  env->nchunks = blist.buffers().size();
+
+  dout(7) << "sending " << *m << " to " << MSG_ADDR_NICE(env->dest) << " (rank " << rank << ")" << endl;
+  
+  sender_lock.Lock();
+  
+  // open first?
+  if (out_sd[rank] == 0) tcp_open(rank);
+
+  // send envelope
+  tcp_write( out_sd[rank], (char*)env, sizeof(*env) );
+
+  // payload
+  int i = 0;
+  for (list<bufferptr>::iterator it = blist.buffers().begin();
+          it != blist.buffers().end();
+          it++) {
+       dout(DBL) << "tcp_sending frag " << i << " len " << (*it).length() << endl;
+       int size = (*it).length();
+       tcp_write( out_sd[rank], (char*)&size, sizeof(size) );
+       tcp_write( out_sd[rank], (*it).c_str(), size );
+       i++;
+  }
+
+  sender_lock.Unlock();
+}
+
+
+
+// recv event loop, for unsolicited messages.
+
+void* tcpmessenger_loop(void*)
+{
+  dout(5) << "tcpmessenger_loop start pid " << getpid() << endl;
+
+  incoming_lock.Lock();
+
+  while (1) {
+
+       // timer events?
+       if (pending_timer) {
+         Timer *t = pending_timer;
+         pending_timer = 0;
+         
+         dout(DBL) << "pending timer" << endl;
+         t->execute_pending();
+       }
+
+       // done?
+       if (tcp_done &&
+               incoming.empty() &&
+               pending_timer == 0) break;
+       
+       // incoming
+       dout(12) << "loop waiting for incoming messages" << endl;
+       
+       incoming_cond.Wait(incoming_lock);
+       
+       while (incoming.size()) {
+         Message *m = incoming.front();
+         incoming.pop_front();
+         
+         int dest = m->get_dest();
+         if (directory.count(dest)) {
+               Messenger *who = directory[ dest ];
+               
+               dout(4) << "---- '" << m->get_type_name() << 
+                 "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
+                 " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " 
+                               << m 
+                               << endl;
+               
+               who->dispatch(m);
+         } else {
+               dout (1) << "---- i don't know who " << dest << " is." << endl;
+               assert(0);
+               break;
+         }
+       }
+  }
+
+  incoming_lock.Unlock();
+
+  g_timer.shutdown();
+
+  dout(5) << "tcpmessenger_loop exiting loop" << endl;
+}
+
+
+// start/stop mpi receiver thread (for unsolicited messages)
+int tcpmessenger_start()
+{
+  dout(5) << "starting accept thread" << endl;
+  pthread_create(&listen_thread_id,
+                                NULL,
+                                tcp_accepter,
+                                0);                            
+
+  dout(5) << "starting dispatch thread" << endl;
+  
+  // start a thread
+  pthread_create(&dispatch_thread_id, 
+                                NULL, 
+                                tcpmessenger_loop, 
+                                0);
+
+}
+
+
+/*
+ * kick and wake up _loop (to pick up new outgoing message, or quit)
+ */
+
+void tcpmessenger_kick_loop()
+{
+
+  incoming_cond.Signal();
+  /*
+
+  // if we're same thread as the loop, no kicking necessary
+  if (pthread_self() == dispatch_thread_id) return;   
+  
+  msg_envelope_t kick_env;
+  kick_env.type = 0;
+
+  sender_lock.Lock();
+  tcp_write( out_sd[mpi_rank], (char*)&kick_env, sizeof(kick_env) );
+  sender_lock.Unlock();
+  */
+}
+
+
+// wait for thread to finish
+
+void tcpmessenger_wait()
+{
+  incoming_cond.Signal();
+
+  void *returnval;
+  dout(10) << "tcpmessenger_wait waiting for thread to finished." << endl;
+  pthread_join(dispatch_thread_id, &returnval);
+  dout(10) << "tcpmessenger_wait thread finished." << endl;
+}
+
+
+
+
+/***********
+ * Tcpmessenger class implementation
+ */
+
+TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr)
+{
+  // my address
+  this->myaddr = myaddr;
+
+  // register myself in the messenger directory
+  directory[myaddr] = this;
+
+  // register to execute timer events
+  g_timer.set_messenger(this);
+
+  // logger
+  /*
+  string name;
+  name = "m.";
+  name += MSG_ADDR_TYPE(whoami);
+  int w = MSG_ADDR_NUM(whoami);
+  if (w >= 1000) name += ('0' + ((w/1000)%10));
+  if (w >= 100) name += ('0' + ((w/100)%10));
+  if (w >= 10) name += ('0' + ((w/10)%10));
+  name += ('0' + ((w/1)%10));
+
+  logger = new Logger(name, (LogType*)&mpimsg_logtype);
+  loggers[ whoami ] = logger;
+  */
+}
+
+TCPMessenger::~TCPMessenger()
+{
+  //delete logger;
+}
+
+
+int TCPMessenger::shutdown()
+{
+  // remove me from the directory
+  directory.erase(myaddr);
+
+  // no more timer events
+  g_timer.unset_messenger();
+
+  // last one?
+  if (directory.empty()) {
+       dout(10) << "shutdown last tcpmessenger on rank " << mpi_rank << " shut down" << endl;
+       pthread_t whoami = pthread_self();
+
+
+  
+       // close incoming sockets
+       void *r;
+       for (int i=0; i<mpi_world; i++) {
+         if (in_sd[i] == 0) continue;
+         dout(DBL) << "closing reader on " << i << " sd " << in_sd[i] << endl;
+         ::close(in_sd[i]);
+         //dout(DBL) << "waiting for reader thread to close on " << i << endl;
+         //pthread_join(in_threads[i], &r);
+       }
+
+       dout(DBL) << "setting tcp_done" << endl;
+
+       tcp_done = true;
+       incoming_cond.Signal();
+       /*
+
+       dout(15) << "whoami = " << whoami << ", thread = " << dispatch_thread_id << endl;
+       if (whoami == thread_id) {
+         // i am the event loop thread, just set flag!
+         dout(15) << "  set tcp_done=true" << endl;
+         tcp_done = true;
+       }
+       */
+  } else {
+       dout(10) << "shutdown still " << directory.size() << " other messengers on rank " << mpi_rank << endl;
+  }
+}
+
+
+
+/*** events
+ */
+
+void TCPMessenger::trigger_timer(Timer *t)
+{
+  pending_timer = t;
+
+  tcpmessenger_kick_loop();
+}
+
+/***
+ * public messaging interface
+ */
+
+
+/* note: send_message _MUST_ be non-blocking */
+int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
+{
+  // set envelope
+  m->set_source(myaddr, fromport);
+  m->set_dest(dest, port);
+
+  tcp_send(m);
+}
+
+
+
+
diff --git a/ceph/msg/TCPMessenger.h b/ceph/msg/TCPMessenger.h
new file mode 100644 (file)
index 0000000..a40deee
--- /dev/null
@@ -0,0 +1,47 @@
+#ifndef __TCPMESSENGER_H
+#define __TCPMESSENGER_H
+
+#include "Messenger.h"
+#include "Dispatcher.h"
+
+#define NUMMDS g_conf.num_mds
+#define NUMOSD g_conf.num_osd
+#define MPI_DEST_TO_RANK(dest,world)    ((dest)<(NUMMDS+NUMOSD) ? \
+                                                                                (dest) : \
+                                                                                ((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % ((world)-NUMMDS-NUMOSD))))
+
+class Timer;
+
+class TCPMessenger : public Messenger {
+ protected:
+  msg_addr_t myaddr;     // my address
+  
+  //class Logger *logger;  // for logging
+  
+ public:
+  TCPMessenger(msg_addr_t myaddr);
+  ~TCPMessenger();
+
+  // init, shutdown MPI and associated event loop thread.
+  virtual int shutdown();
+
+  // message interface
+  virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
+  virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); }
+
+  // events
+  virtual void trigger_timer(Timer *t);
+};
+
+/**
+ * these are all ONE per process.
+ */
+
+extern int tcpmessenger_world();
+extern int tcpmessenger_init(int& argc, char**& argv);   // init mpi
+extern int tcpmessenger_start();   // start thread
+extern void tcpmessenger_wait();    // wait for thread to finish.
+extern int tcpmessenger_shutdown();   // finalize MPI
+
+
+#endif
index a951f6a2fccf6d46c871f501fca3a2653ba97e22..678963b6d8009f9c445d8fbfaf84830562b68730 100644 (file)
@@ -25,7 +25,7 @@ using namespace __gnu_cxx;
  * some system constants
  */
 #define NUM_REPLICA_GROUPS   (1<<20)  // ~1M
-#define NUM_RUSH_REPLICAS        10   // this should be big enough to cope w/ failing disks.
+#define NUM_RUSH_REPLICAS         4   // this should be big enough to cope w/ failing disks.
 #define MAX_REPLICAS              3
 
 #define FILE_OBJECT_SIZE    (1<<20)  // 1 MB object size
diff --git a/ceph/tcpfuse.cc b/ceph/tcpfuse.cc
new file mode 100644 (file)
index 0000000..b969796
--- /dev/null
@@ -0,0 +1,169 @@
+
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "include/config.h"
+
+#include "mds/MDCluster.h"
+#include "mds/MDS.h"
+#include "osd/OSD.h"
+#include "client/Client.h"
+#include "client/fuse.h"
+
+#include "msg/TCPMessenger.h"
+#include "msg/CheesySerializer.h"
+
+#include "common/Timer.h"
+
+#define NUMMDS g_conf.num_mds
+#define NUMOSD g_conf.num_osd
+#define NUMCLIENT g_conf.num_client
+
+class C_Test : public Context {
+public:
+  void finish(int r) {
+       cout << "C_Test->finish(" << r << ")" << endl;
+  }
+};
+
+
+int main(int oargc, char **oargv) {
+
+  //cerr << "tcpfuse starting " << myrank << "/" << world << endl;
+  int argc;
+  char **argv;
+  parse_config_options(oargc, oargv,
+                                          argc, argv);
+
+  int start = 0;
+
+  // build new argc+argv for fuse
+  typedef char* pchar;
+  int nargc = 0;
+  char **nargv = new pchar[argc];
+  nargv[nargc++] = argv[0];
+
+  int mkfs = 0;
+  for (int i=1; i<argc; i++) {
+       if (strcmp(argv[i], "--fastmkfs") == 0) {
+         mkfs = MDS_MKFS_FAST;
+       }
+       else if (strcmp(argv[i], "--fullmkfs") == 0) {
+         mkfs = MDS_MKFS_FULL;
+       }
+       else {
+         // unknown arg, pass it on.
+         nargv[nargc++] = argv[i];
+       }
+  }
+
+  int myrank = tcpmessenger_init(argc, argv);
+  int world = tcpmessenger_world();
+
+  assert(NUMMDS + NUMOSD + NUMCLIENT <= world);
+
+  MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD);
+
+
+  char hostname[100];
+  gethostname(hostname,100);
+  int pid = getpid();
+
+  // create mds
+  MDS *mds[NUMMDS];
+  for (int i=0; i<NUMMDS; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+       cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+       mds[i] = new MDS(mdc, i, new TCPMessenger(MSG_ADDR_MDS(i)));
+       start++;
+  }
+  
+  // create osd
+  OSD *osd[NUMOSD];
+  for (int i=0; i<NUMOSD; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+       cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+       osd[i] = new OSD(i, new TCPMessenger(MSG_ADDR_OSD(i)));
+       start++;
+  }
+  
+  // create client
+  Client *client[NUMCLIENT];
+  for (int i=0; i<NUMCLIENT; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+       cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+       CheesySerializer *serializer = new CheesySerializer( new TCPMessenger(MSG_ADDR_CLIENT(i)) );
+       client[i] = new Client(mdc, i, serializer);
+       start++;
+  }
+
+
+  // start message loop
+  if (start) {
+       tcpmessenger_start();
+    
+       // init
+       for (int i=0; i<NUMMDS; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+         mds[i]->init();
+       }
+       
+       for (int i=0; i<NUMOSD; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+         osd[i]->init();
+       }
+       
+       // create client
+       for (int i=0; i<NUMCLIENT; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+         
+         client[i]->init();
+         
+         // start up fuse
+         // use my argc, argv (make sure you pass a mount point!)
+         cout << "mounting" << endl;
+         client[i]->mount(mkfs);
+
+         cout << "starting fuse on rank " << myrank << " pid " << getpid() << endl;
+         ceph_fuse_main(client[i], nargc, nargv);
+         cout << "fuse finished on rank " << myrank << " pid " << getpid() << endl;
+       }
+       for (int i=0; i<NUMCLIENT; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+         
+         client[i]->unmount();
+         cout << "unmounted" << endl;
+         client[i]->shutdown();
+       }
+       
+               
+       // wait for it to finish
+       tcpmessenger_wait();
+
+  } else {
+       cerr << "IDLE rank " << myrank << endl;
+  }
+
+  tcpmessenger_shutdown();  // shutdown MPI
+  
+  // cleanup
+  for (int i=0; i<NUMMDS; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+       delete mds[i];
+  }
+  for (int i=0; i<NUMOSD; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+       delete osd[i];
+  }
+  for (int i=0; i<NUMCLIENT; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+       delete client[i];
+  }
+  delete mdc;
+  
+  return 0;
+}
+
diff --git a/ceph/tcpsyn.cc b/ceph/tcpsyn.cc
new file mode 100644 (file)
index 0000000..0cecee6
--- /dev/null
@@ -0,0 +1,212 @@
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "include/config.h"
+
+#include "mds/MDCluster.h"
+#include "mds/MDS.h"
+#include "osd/OSD.h"
+#include "client/Client.h"
+#include "client/SyntheticClient.h"
+
+#include "msg/TCPMessenger.h"
+#include "msg/CheesySerializer.h"
+
+#include "common/Timer.h"
+
+#define NUMMDS g_conf.num_mds
+#define NUMOSD g_conf.num_osd
+#define NUMCLIENT g_conf.num_client
+
+class C_Test : public Context {
+public:
+  void finish(int r) {
+       cout << "C_Test->finish(" << r << ")" << endl;
+  }
+};
+
+
+int main(int oargc, char **oargv) {
+
+  //cerr << "tcpsyn starting " << myrank << "/" << world << endl;
+  int argc;
+  char **argv;
+  parse_config_options(oargc, oargv,
+                                          argc, argv);
+
+  int start = 0;
+
+  // build new argc+argv for fuse
+  typedef char* pchar;
+  int nargc = 0;
+  char **nargv = new pchar[argc];
+  nargv[nargc++] = argv[0];
+
+  string syn_sarg1;
+  int syn_mode = SYNCLIENT_MODE_WRITEFILE;
+  int syn_iarg1, syn_iarg2, syn_iarg3;
+  int mkfs = 0;
+  for (int i=1; i<argc; i++) {
+       if (strcmp(argv[i], "--fastmkfs") == 0) {
+         mkfs = MDS_MKFS_FAST;
+       }
+       else if (strcmp(argv[i], "--fullmkfs") == 0) {
+         mkfs = MDS_MKFS_FULL;
+       }
+       else if (strcmp(argv[i],"--synsarg1") == 0) 
+         syn_sarg1 = argv[++i];
+       else if (strcmp(argv[i],"--syniarg1") == 0) 
+         syn_iarg1 = atoi(argv[++i]);
+       else if (strcmp(argv[i],"--syniarg2") == 0) 
+         syn_iarg2 = atoi(argv[++i]);
+       else if (strcmp(argv[i],"--syniarg3") == 0) 
+         syn_iarg3 = atoi(argv[++i]);
+       else if (strcmp(argv[i],"--synmode") == 0) {
+         ++i;
+         if (strcmp(argv[i],"writefile") == 0) 
+               syn_mode = SYNCLIENT_MODE_WRITEFILE;
+         else if (strcmp(argv[i],"makedirs") == 0) 
+               syn_mode = SYNCLIENT_MODE_MAKEDIRS;
+         else if (strcmp(argv[i],"fullwalk") == 0) 
+               syn_mode = SYNCLIENT_MODE_FULLWALK;
+         else if (strcmp(argv[i],"randomwalk") == 0) 
+               syn_mode = SYNCLIENT_MODE_RANDOMWALK;
+         else {
+               cerr << "unknown syn mode " << argv[i] << endl;
+               return -1;
+         }
+       }
+
+       else {
+         // unknown arg, pass it on.
+         nargv[nargc++] = argv[i];
+       }
+  }
+
+  int myrank = tcpmessenger_init(argc, argv);
+  int world = tcpmessenger_world();
+
+  if (myrank == 0)
+       cerr << "nummds " << NUMMDS << "  numosd " << NUMOSD << "  numclient " << NUMCLIENT << endl;
+  assert(NUMMDS + NUMOSD + 1 <= world);
+
+  MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD);
+
+
+  char hostname[100];
+  gethostname(hostname,100);
+  int pid = getpid();
+
+  // create mds
+  MDS *mds[NUMMDS];
+  for (int i=0; i<NUMMDS; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+       cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+       mds[i] = new MDS(mdc, i, new TCPMessenger(MSG_ADDR_MDS(i)));
+       start++;
+  }
+  
+  // create osd
+  OSD *osd[NUMOSD];
+  for (int i=0; i<NUMOSD; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+       cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+       osd[i] = new OSD(i, new TCPMessenger(MSG_ADDR_OSD(i)));
+       start++;
+  }
+  
+  // create client
+  Client *client[NUMCLIENT];
+  SyntheticClient *syn[NUMCLIENT];
+  for (int i=0; i<NUMCLIENT; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+       cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+       CheesySerializer *serializer = new CheesySerializer( new TCPMessenger(MSG_ADDR_CLIENT(i)) );
+       client[i] = new Client(mdc, i, serializer);
+       start++;
+  }
+
+
+  // start message loop
+  if (start) {
+       tcpmessenger_start();
+    
+       // init
+       for (int i=0; i<NUMMDS; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+         mds[i]->init();
+       }
+       
+       for (int i=0; i<NUMOSD; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+         osd[i]->init();
+       }
+       
+       // create client
+       for (int i=0; i<NUMCLIENT; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+         
+         client[i]->init();
+         
+         // use my argc, argv (make sure you pass a mount point!)
+         //cout << "mounting" << endl;
+         client[i]->mount(mkfs);
+
+         //cout << "starting synthetic client on rank " << myrank << endl;
+         syn[i] = new SyntheticClient(client[i]);
+
+         char s[20];
+         sprintf(s,"syn.%d", i);
+         syn[i]->sarg1 = s;
+
+         syn[i]->mode = syn_mode;
+         syn[i]->iarg1 = syn_iarg1;
+         syn[i]->iarg2 = syn_iarg2;
+         syn[i]->iarg3 = syn_iarg3;
+
+         syn[i]->start_thread();
+       }
+       for (int i=0; i<NUMCLIENT; i++) {
+         if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+         
+         cout << "waiting for synthetic client" << i << " to finish" << endl;
+         syn[i]->join_thread();
+         delete syn[i];
+
+         client[i]->unmount();
+         //cout << "client" << i << " unmounted" << endl;
+         client[i]->shutdown();
+       }
+       
+               
+       // wait for it to finish
+       tcpmessenger_wait();
+
+       //assert(0);
+  } else {
+       cerr << "IDLE rank " << myrank << endl;
+  }
+
+  tcpmessenger_shutdown(); 
+  
+  // cleanup
+  for (int i=0; i<NUMMDS; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+       delete mds[i];
+  }
+  for (int i=0; i<NUMOSD; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+       delete osd[i];
+  }
+  for (int i=0; i<NUMCLIENT; i++) {
+       if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+       delete client[i];
+  }
+  delete mdc;
+  
+  return 0;
+}
+
index ed757334e4197ff30e971cbe6b6a6671db41ef83..283efadb751faefc80d21d659ae2384910f921d5 100644 (file)
@@ -32,7 +32,8 @@ int main(int argc, char **argv) {
 
   mpimessenger_start();
 
-  while(1) {
+  //while (1) {
+  for (int i=0; i<10000; i++) {
        
        // ping random nodes
        int d = rand() % world;
@@ -42,9 +43,10 @@ int main(int argc, char **argv) {
         }
        
   }
-  cout << "shutting down" << endl;
 
-  p->messenger->shutdown();
+
+  //cout << "shutting down" << endl;
+  //p->messenger->shutdown();
   
   mpimessenger_wait();
   mpimessenger_shutdown();  // shutdown MPI