From: sage Date: Thu, 16 Jun 2005 14:52:37 +0000 (+0000) Subject: tcp messenger! X-Git-Tag: v0.1~2056 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0df31ac0c7bfba576ed0325b78222964c4fce806;p=ceph.git tcp messenger! git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@325 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/Makefile b/ceph/Makefile index fb9b084a0d97..eb33a8a1d2c7 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -87,9 +87,15 @@ mttest: test/mttest.cc msg/MTMessenger.cc ${COMMON_OBJS} mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS} ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ +tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS} + ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ + mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS} ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ +tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS} + ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ + fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o msg/CheesySerializer.o ${COMMON_OBJS} ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ diff --git a/ceph/config.cc b/ceph/config.cc index 33fddeff14cd..6afb54e9a037 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -50,6 +50,7 @@ md_config_t g_conf = { mds_bal_replicate_threshold: 500, mds_bal_unreplicate_threshold: 200, mds_bal_interval: 60, // seconds + mds_bal_idle_threshold: .1, mds_commit_on_shutdown: true, diff --git a/ceph/config.h b/ceph/config.h index 48da001ec764..3d2cc10b23f3 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -41,6 +41,7 @@ struct md_config_t { float mds_bal_replicate_threshold; float mds_bal_unreplicate_threshold; int mds_bal_interval; + float mds_bal_idle_threshold; bool mds_commit_on_shutdown; bool mds_verify_export_dirauth; // debug flag diff --git a/ceph/mds/MDBalancer.cc b/ceph/mds/MDBalancer.cc index dbeb42639eb5..a2abc5a3c9a9 100644 --- a/ceph/mds/MDBalancer.cc +++ b/ceph/mds/MDBalancer.cc @@ -194,7 +194,7 @@ void MDBalancer::do_rebalance() maxim < 0) break; if (maxim < maxex) { // import takes it all - dout(5) << " - " << (*exporter).second << " exports " << maxim << " to " << (*importer).second << endl; + dout(5) << " - mds" << (*exporter).second << " exports " << maxim << " to mds" << (*importer).second << endl; if ((*exporter).second == whoami) my_targets.insert(pair((*importer).second, maxim)); exported += maxim; @@ -202,7 +202,7 @@ void MDBalancer::do_rebalance() imported = 0; } else if (maxim > maxex) { // export all - dout(5) << " - " << (*exporter).second << " exports " << maxex << " to " << (*importer).second << endl; + dout(5) << " - mds" << (*exporter).second << " exports " << maxex << " to mds" << (*importer).second << endl; if ((*exporter).second == whoami) my_targets.insert(pair((*importer).second, maxex)); imported += maxex; @@ -210,7 +210,7 @@ void MDBalancer::do_rebalance() exported = 0; } else { // wow, perfect match! - dout(5) << " - " << (*exporter).second << " exports " << maxex << " to " << (*importer).second << endl; + dout(5) << " - mds" << (*exporter).second << " exports " << maxex << " to mds" << (*importer).second << endl; if ((*exporter).second == whoami) my_targets.insert(pair((*importer).second, maxex)); imported = exported = 0; @@ -224,9 +224,16 @@ void MDBalancer::do_rebalance() for (set::iterator it = mds->mdcache->imports.begin(); it != mds->mdcache->imports.end(); it++) { - import_pop_map.insert(pair((*it)->popularity[MDS_POP_CURDOM].get(now), *it)); + double pop = (*it)->popularity[MDS_POP_CURDOM].get(now); + if (pop < g_conf.mds_bal_idle_threshold && + (*it)->inode != mds->mdcache->get_root()) { + dout(5) << " exporting idle import " << **it << endl; + mds->mdcache->export_dir(*it, (*it)->inode->authority()); + continue; + } + import_pop_map[ pop ] = *it; int from = (*it)->inode->authority(); - dout(5) << "map i imported " << **it << " from " << from << endl; + dout(15) << " map: i imported " << **it << " from " << from << endl; import_from_map.insert(pair(from, *it)); } @@ -552,7 +559,7 @@ void MDBalancer::show_imports(bool external) p != mds->mdcache->nested_exports[im].end(); p++) { CDir *exp = *p; - dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl; + dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ") " << *exp << " to " << exp->dir_auth << endl; assert( exp->is_export() ); assert( !exp->is_auth() ); diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index 6ea3c6083cec..bb6997859098 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -184,12 +184,8 @@ Message *mpi_recv(int tag) MPI_COMM_WORLD, &status/*, &recv_env_req*/) == MPI_SUCCESS); - - if (status.count < MSG_ENVELOPE_LEN) { - dout(DBLVL) << "mpi_recv got short recv " << status.count << " bytes" << endl; - assert(0); - return 0; - } + assert(status.count == MSG_ENVELOPE_LEN); + if (env.type == 0) { dout(DBLVL) << "mpi_recv got type 0 message, kicked!" << endl; return 0; @@ -202,7 +198,7 @@ Message *mpi_recv(int tag) for (int i=0; i +#include +using namespace std; +#include +using namespace __gnu_cxx; + + +# include +# include +# include +# include +#include +#include +#include + +#include +#include + + +#define DBL 18 + +int tcp_port = 9876; + +/* + * We make a directory, so that we can have multiple Messengers in the + * same process (rank). This is useful for benchmarking and creating lots of + * simulated clients, e.g. + */ + +hash_map directory; // local +list incoming; +Mutex incoming_lock; +Cond incoming_cond; + +struct sockaddr_in *remote_addr; +int *in_sd; // incoming sockets +pthread_t *in_threads; +int *out_sd; // outgoing sockets + +struct sockaddr_in listen_addr; +int listen_sd = 0; + + + +/* this process */ +int mpi_world; +int mpi_rank; +bool tcp_done = false; // set this flag to stop the event loop + +pthread_t dispatch_thread_id = 0; // thread id of the event loop. init value == nobody +pthread_t listen_thread_id = 0; +Mutex sender_lock; + +Timer *pending_timer = 0; + + + +// debug +#undef dout +#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << mpi_rank << "/" << mpi_world << " " << getpid() << "." << pthread_self() << "] " + +ostream& operator<<(ostream& out, struct sockaddr_in &a) +{ + char addr[4]; + memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4); + out << (int)addr[0] << "." + << (int)addr[1] << "." + << (int)addr[2] << "." + << (int)addr[3] << ":" + << (int)a.sin_port; + return out; +} + + +/***** + * MPI global methods for process-wide startup, shutdown. + */ + +int tcpmessenger_init(int& argc, char**& argv) +{ + // exhcnage addresses with other nodes + MPI_Init(&argc, &argv); + + MPI_Comm_size(MPI_COMM_WORLD, &mpi_world); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + + dout(DBL) << "rank is " << mpi_rank << " / " << mpi_world << endl; + + // LISTEN + dout(DBL) << "binding to listen " << endl; + + /* socket creation */ + listen_sd = socket(AF_INET,SOCK_STREAM,0); + assert(listen_sd > 0); + + /* bind to port */ + memset((char*)&listen_addr, 0, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); + listen_addr.sin_port = 0; + + int rc = bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)); + assert(rc >= 0); + + socklen_t llen = sizeof(listen_addr); + getsockname(listen_sd, (sockaddr*)&listen_addr, &llen); + + int myport = listen_addr.sin_port; + + // listen! + rc = ::listen(listen_sd, 2*mpi_world); + assert(rc >= 0); + + dout(DBL) << "listening on " << myport << endl; + + remote_addr = new (struct sockaddr_in)[mpi_world]; + + // my address is... + char host[100]; + gethostname(host, 100); + dout(DBL) << "my hostname is " << host << endl; + + struct hostent *myhostname = gethostbyname( host ); + + struct sockaddr_in myaddr; + myaddr.sin_family = myhostname->h_addrtype; + memcpy((char *) &myaddr.sin_addr.s_addr, + myhostname->h_addr_list[0], + myhostname->h_length); + myaddr.sin_port = myport; + + dout(DBL) << "my ip is " << myaddr << endl; + + remote_addr[mpi_rank] = myaddr; + + dout(DBL) << "MPI_Allgathering addrs" << endl; + MPI_Allgather( &myaddr, sizeof(struct sockaddr_in), MPI_CHAR, + remote_addr, sizeof(struct sockaddr_in), MPI_CHAR, + MPI_COMM_WORLD); + + // for (int i=0; i 0) { + int got = ::recv( sd, buf, len, 0 ); + if (got < 0) return false; + assert(got >= 0); + len -= got; + buf += got; + //dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl; + } + return true; +} + +void tcp_write(int sd, char *buf, int len) +{ + //dout(DBL) << "tcp_write writing " << len << endl; + while (len > 0) { + int did = ::send( sd, buf, len, 0 ); + assert(did >= 0); + len -= did; + buf += did; + dout(DBL) << "tcp_write did " << did << ", " << len << " left" << endl; + } + +} + + +/* + * recv a Message* + */ + + + +void tcp_wait() +{ + fd_set fds; + FD_ZERO(&fds); + + int n = 0; + + for (int i=0; i 0) { + //dout(DBL) << "accepting, left = " << left << endl; + + struct sockaddr_in addr; + socklen_t slen = sizeof(addr); + int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen); + if (sd > 0) { + + //dout(DBL) << "accepted incoming, reading who it is " << endl; + + int who; + tcp_read(sd, (char*)&who, sizeof(who)); + + in_sd[who] = sd; + left--; + + dout(DBL) << "accepted incoming from " << who << ", left = " << left << endl; + + pthread_create(&in_threads[who], + NULL, + tcp_inthread, + (void*)who); + } else { + dout(DBL) << "no incoming connection?" << endl; + } + } + dout(DBL) << "got incoming from everyone!" << endl; +} + + + +bool tcp_recv_any() +{ + bool any = false; + + //if (mpi_rank == 0 && tcp_accept()) any = true; + + // any? + for (int i=0; i 0) { + Message *m = tcp_recv(i); + if (m) { + incoming_lock.Lock(); + incoming.push_back(m); + incoming_cond.Signal(); + incoming_lock.Unlock(); + any = true; + } + } + } + + return any; +} + +void tcp_open(int who) +{ + //dout(DBL) << "tcp_open " << who << " to " << remote_addr[who] << endl; + + // create socket? + int sd = socket(AF_INET,SOCK_STREAM,0); + assert(sd > 0); + + // bind any port + struct sockaddr_in myAddr; + myAddr.sin_family = AF_INET; + myAddr.sin_addr.s_addr = htonl(INADDR_ANY); + myAddr.sin_port = htons( 0 ); + + int rc = bind(sd, (struct sockaddr *) &myAddr, sizeof(myAddr)); + assert(rc>=0); + + // connect! + int r = connect(sd, (sockaddr*)&remote_addr[who], sizeof(myAddr)); + assert(r >= 0); + + //dout(DBL) << "tcp_open connected to " << who << endl; + + int me = mpi_rank; + tcp_write(sd, (char*)&me, sizeof(me)); + + out_sd[who] = sd; +} + + + +/* + * send a Message* over the wire. ** do not block **. + */ +int tcp_send(Message *m) +{ + int rank = MPI_DEST_TO_RANK(m->get_dest(), mpi_world); + + // marshall + m->encode_payload(); + msg_envelope_t *env = &m->get_envelope(); + bufferlist blist = m->get_payload(); + env->nchunks = blist.buffers().size(); + + dout(7) << "sending " << *m << " to " << MSG_ADDR_NICE(env->dest) << " (rank " << rank << ")" << endl; + + sender_lock.Lock(); + + // open first? + if (out_sd[rank] == 0) tcp_open(rank); + + // send envelope + tcp_write( out_sd[rank], (char*)env, sizeof(*env) ); + + // payload + int i = 0; + for (list::iterator it = blist.buffers().begin(); + it != blist.buffers().end(); + it++) { + dout(DBL) << "tcp_sending frag " << i << " len " << (*it).length() << endl; + int size = (*it).length(); + tcp_write( out_sd[rank], (char*)&size, sizeof(size) ); + tcp_write( out_sd[rank], (*it).c_str(), size ); + i++; + } + + sender_lock.Unlock(); +} + + + +// recv event loop, for unsolicited messages. + +void* tcpmessenger_loop(void*) +{ + dout(5) << "tcpmessenger_loop start pid " << getpid() << endl; + + incoming_lock.Lock(); + + while (1) { + + // timer events? + if (pending_timer) { + Timer *t = pending_timer; + pending_timer = 0; + + dout(DBL) << "pending timer" << endl; + t->execute_pending(); + } + + // done? + if (tcp_done && + incoming.empty() && + pending_timer == 0) break; + + // incoming + dout(12) << "loop waiting for incoming messages" << endl; + + incoming_cond.Wait(incoming_lock); + + while (incoming.size()) { + Message *m = incoming.front(); + incoming.pop_front(); + + int dest = m->get_dest(); + if (directory.count(dest)) { + Messenger *who = directory[ dest ]; + + dout(4) << "---- '" << m->get_type_name() << + "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() << + " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " + << m + << endl; + + who->dispatch(m); + } else { + dout (1) << "---- i don't know who " << dest << " is." << endl; + assert(0); + break; + } + } + } + + incoming_lock.Unlock(); + + g_timer.shutdown(); + + dout(5) << "tcpmessenger_loop exiting loop" << endl; +} + + +// start/stop mpi receiver thread (for unsolicited messages) +int tcpmessenger_start() +{ + dout(5) << "starting accept thread" << endl; + pthread_create(&listen_thread_id, + NULL, + tcp_accepter, + 0); + + dout(5) << "starting dispatch thread" << endl; + + // start a thread + pthread_create(&dispatch_thread_id, + NULL, + tcpmessenger_loop, + 0); + +} + + +/* + * kick and wake up _loop (to pick up new outgoing message, or quit) + */ + +void tcpmessenger_kick_loop() +{ + + incoming_cond.Signal(); + /* + + // if we're same thread as the loop, no kicking necessary + if (pthread_self() == dispatch_thread_id) return; + + msg_envelope_t kick_env; + kick_env.type = 0; + + sender_lock.Lock(); + tcp_write( out_sd[mpi_rank], (char*)&kick_env, sizeof(kick_env) ); + sender_lock.Unlock(); + */ +} + + +// wait for thread to finish + +void tcpmessenger_wait() +{ + incoming_cond.Signal(); + + void *returnval; + dout(10) << "tcpmessenger_wait waiting for thread to finished." << endl; + pthread_join(dispatch_thread_id, &returnval); + dout(10) << "tcpmessenger_wait thread finished." << endl; +} + + + + +/*********** + * Tcpmessenger class implementation + */ + +TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) +{ + // my address + this->myaddr = myaddr; + + // register myself in the messenger directory + directory[myaddr] = this; + + // register to execute timer events + g_timer.set_messenger(this); + + // logger + /* + string name; + name = "m."; + name += MSG_ADDR_TYPE(whoami); + int w = MSG_ADDR_NUM(whoami); + if (w >= 1000) name += ('0' + ((w/1000)%10)); + if (w >= 100) name += ('0' + ((w/100)%10)); + if (w >= 10) name += ('0' + ((w/10)%10)); + name += ('0' + ((w/1)%10)); + + logger = new Logger(name, (LogType*)&mpimsg_logtype); + loggers[ whoami ] = logger; + */ +} + +TCPMessenger::~TCPMessenger() +{ + //delete logger; +} + + +int TCPMessenger::shutdown() +{ + // remove me from the directory + directory.erase(myaddr); + + // no more timer events + g_timer.unset_messenger(); + + // last one? + if (directory.empty()) { + dout(10) << "shutdown last tcpmessenger on rank " << mpi_rank << " shut down" << endl; + pthread_t whoami = pthread_self(); + + + + // close incoming sockets + void *r; + for (int i=0; iset_source(myaddr, fromport); + m->set_dest(dest, port); + + tcp_send(m); +} + + + + diff --git a/ceph/msg/TCPMessenger.h b/ceph/msg/TCPMessenger.h new file mode 100644 index 000000000000..a40deee3927c --- /dev/null +++ b/ceph/msg/TCPMessenger.h @@ -0,0 +1,47 @@ +#ifndef __TCPMESSENGER_H +#define __TCPMESSENGER_H + +#include "Messenger.h" +#include "Dispatcher.h" + +#define NUMMDS g_conf.num_mds +#define NUMOSD g_conf.num_osd +#define MPI_DEST_TO_RANK(dest,world) ((dest)<(NUMMDS+NUMOSD) ? \ + (dest) : \ + ((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % ((world)-NUMMDS-NUMOSD)))) + +class Timer; + +class TCPMessenger : public Messenger { + protected: + msg_addr_t myaddr; // my address + + //class Logger *logger; // for logging + + public: + TCPMessenger(msg_addr_t myaddr); + ~TCPMessenger(); + + // init, shutdown MPI and associated event loop thread. + virtual int shutdown(); + + // message interface + virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0); + virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); } + + // events + virtual void trigger_timer(Timer *t); +}; + +/** + * these are all ONE per process. + */ + +extern int tcpmessenger_world(); +extern int tcpmessenger_init(int& argc, char**& argv); // init mpi +extern int tcpmessenger_start(); // start thread +extern void tcpmessenger_wait(); // wait for thread to finish. +extern int tcpmessenger_shutdown(); // finalize MPI + + +#endif diff --git a/ceph/osd/OSDMap.h b/ceph/osd/OSDMap.h index a951f6a2fccf..678963b6d800 100644 --- a/ceph/osd/OSDMap.h +++ b/ceph/osd/OSDMap.h @@ -25,7 +25,7 @@ using namespace __gnu_cxx; * some system constants */ #define NUM_REPLICA_GROUPS (1<<20) // ~1M -#define NUM_RUSH_REPLICAS 10 // this should be big enough to cope w/ failing disks. +#define NUM_RUSH_REPLICAS 4 // this should be big enough to cope w/ failing disks. #define MAX_REPLICAS 3 #define FILE_OBJECT_SIZE (1<<20) // 1 MB object size diff --git a/ceph/tcpfuse.cc b/ceph/tcpfuse.cc new file mode 100644 index 000000000000..b96979681660 --- /dev/null +++ b/ceph/tcpfuse.cc @@ -0,0 +1,169 @@ + + +#include +#include +#include +using namespace std; + +#include "include/config.h" + +#include "mds/MDCluster.h" +#include "mds/MDS.h" +#include "osd/OSD.h" +#include "client/Client.h" +#include "client/fuse.h" + +#include "msg/TCPMessenger.h" +#include "msg/CheesySerializer.h" + +#include "common/Timer.h" + +#define NUMMDS g_conf.num_mds +#define NUMOSD g_conf.num_osd +#define NUMCLIENT g_conf.num_client + +class C_Test : public Context { +public: + void finish(int r) { + cout << "C_Test->finish(" << r << ")" << endl; + } +}; + + +int main(int oargc, char **oargv) { + + //cerr << "tcpfuse starting " << myrank << "/" << world << endl; + int argc; + char **argv; + parse_config_options(oargc, oargv, + argc, argv); + + int start = 0; + + // build new argc+argv for fuse + typedef char* pchar; + int nargc = 0; + char **nargv = new pchar[argc]; + nargv[nargc++] = argv[0]; + + int mkfs = 0; + for (int i=1; iinit(); + } + + for (int i=0; iinit(); + } + + // create client + for (int i=0; iinit(); + + // start up fuse + // use my argc, argv (make sure you pass a mount point!) + cout << "mounting" << endl; + client[i]->mount(mkfs); + + cout << "starting fuse on rank " << myrank << " pid " << getpid() << endl; + ceph_fuse_main(client[i], nargc, nargv); + cout << "fuse finished on rank " << myrank << " pid " << getpid() << endl; + } + for (int i=0; iunmount(); + cout << "unmounted" << endl; + client[i]->shutdown(); + } + + + // wait for it to finish + tcpmessenger_wait(); + + } else { + cerr << "IDLE rank " << myrank << endl; + } + + tcpmessenger_shutdown(); // shutdown MPI + + // cleanup + for (int i=0; i +#include +#include +using namespace std; + +#include "include/config.h" + +#include "mds/MDCluster.h" +#include "mds/MDS.h" +#include "osd/OSD.h" +#include "client/Client.h" +#include "client/SyntheticClient.h" + +#include "msg/TCPMessenger.h" +#include "msg/CheesySerializer.h" + +#include "common/Timer.h" + +#define NUMMDS g_conf.num_mds +#define NUMOSD g_conf.num_osd +#define NUMCLIENT g_conf.num_client + +class C_Test : public Context { +public: + void finish(int r) { + cout << "C_Test->finish(" << r << ")" << endl; + } +}; + + +int main(int oargc, char **oargv) { + + //cerr << "tcpsyn starting " << myrank << "/" << world << endl; + int argc; + char **argv; + parse_config_options(oargc, oargv, + argc, argv); + + int start = 0; + + // build new argc+argv for fuse + typedef char* pchar; + int nargc = 0; + char **nargv = new pchar[argc]; + nargv[nargc++] = argv[0]; + + string syn_sarg1; + int syn_mode = SYNCLIENT_MODE_WRITEFILE; + int syn_iarg1, syn_iarg2, syn_iarg3; + int mkfs = 0; + for (int i=1; iinit(); + } + + for (int i=0; iinit(); + } + + // create client + for (int i=0; iinit(); + + // use my argc, argv (make sure you pass a mount point!) + //cout << "mounting" << endl; + client[i]->mount(mkfs); + + //cout << "starting synthetic client on rank " << myrank << endl; + syn[i] = new SyntheticClient(client[i]); + + char s[20]; + sprintf(s,"syn.%d", i); + syn[i]->sarg1 = s; + + syn[i]->mode = syn_mode; + syn[i]->iarg1 = syn_iarg1; + syn[i]->iarg2 = syn_iarg2; + syn[i]->iarg3 = syn_iarg3; + + syn[i]->start_thread(); + } + for (int i=0; ijoin_thread(); + delete syn[i]; + + client[i]->unmount(); + //cout << "client" << i << " unmounted" << endl; + client[i]->shutdown(); + } + + + // wait for it to finish + tcpmessenger_wait(); + + //assert(0); + } else { + cerr << "IDLE rank " << myrank << endl; + } + + tcpmessenger_shutdown(); + + // cleanup + for (int i=0; imessenger->shutdown(); + + //cout << "shutting down" << endl; + //p->messenger->shutdown(); mpimessenger_wait(); mpimessenger_shutdown(); // shutdown MPI