mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
+tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
+ ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
+
mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/MPIMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
+tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/TCPMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
+ ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
+
fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o msg/CheesySerializer.o ${COMMON_OBJS}
${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
mds_bal_replicate_threshold: 500,
mds_bal_unreplicate_threshold: 200,
mds_bal_interval: 60, // seconds
+ mds_bal_idle_threshold: .1,
mds_commit_on_shutdown: true,
float mds_bal_replicate_threshold;
float mds_bal_unreplicate_threshold;
int mds_bal_interval;
+ float mds_bal_idle_threshold;
bool mds_commit_on_shutdown;
bool mds_verify_export_dirauth; // debug flag
maxim < 0) break;
if (maxim < maxex) { // import takes it all
- dout(5) << " - " << (*exporter).second << " exports " << maxim << " to " << (*importer).second << endl;
+ dout(5) << " - mds" << (*exporter).second << " exports " << maxim << " to mds" << (*importer).second << endl;
if ((*exporter).second == whoami)
my_targets.insert(pair<int,double>((*importer).second, maxim));
exported += maxim;
imported = 0;
}
else if (maxim > maxex) { // export all
- dout(5) << " - " << (*exporter).second << " exports " << maxex << " to " << (*importer).second << endl;
+ dout(5) << " - mds" << (*exporter).second << " exports " << maxex << " to mds" << (*importer).second << endl;
if ((*exporter).second == whoami)
my_targets.insert(pair<int,double>((*importer).second, maxex));
imported += maxex;
exported = 0;
} else {
// wow, perfect match!
- dout(5) << " - " << (*exporter).second << " exports " << maxex << " to " << (*importer).second << endl;
+ dout(5) << " - mds" << (*exporter).second << " exports " << maxex << " to mds" << (*importer).second << endl;
if ((*exporter).second == whoami)
my_targets.insert(pair<int,double>((*importer).second, maxex));
imported = exported = 0;
for (set<CDir*>::iterator it = mds->mdcache->imports.begin();
it != mds->mdcache->imports.end();
it++) {
- import_pop_map.insert(pair<double,CDir*>((*it)->popularity[MDS_POP_CURDOM].get(now), *it));
+ double pop = (*it)->popularity[MDS_POP_CURDOM].get(now);
+ if (pop < g_conf.mds_bal_idle_threshold &&
+ (*it)->inode != mds->mdcache->get_root()) {
+ dout(5) << " exporting idle import " << **it << endl;
+ mds->mdcache->export_dir(*it, (*it)->inode->authority());
+ continue;
+ }
+ import_pop_map[ pop ] = *it;
int from = (*it)->inode->authority();
- dout(5) << "map i imported " << **it << " from " << from << endl;
+ dout(15) << " map: i imported " << **it << " from " << from << endl;
import_from_map.insert(pair<int,CDir*>(from, *it));
}
p != mds->mdcache->nested_exports[im].end();
p++) {
CDir *exp = *p;
- dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl;
+ dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ") " << *exp << " to " << exp->dir_auth << endl;
assert( exp->is_export() );
assert( !exp->is_auth() );
MPI_COMM_WORLD,
&status/*,
&recv_env_req*/) == MPI_SUCCESS);
-
- if (status.count < MSG_ENVELOPE_LEN) {
- dout(DBLVL) << "mpi_recv got short recv " << status.count << " bytes" << endl;
- assert(0);
- return 0;
- }
+ assert(status.count == MSG_ENVELOPE_LEN);
+
if (env.type == 0) {
dout(DBLVL) << "mpi_recv got type 0 message, kicked!" << endl;
return 0;
for (int i=0; i<env.nchunks; i++) {
MPI_Status fragstatus;
ASSERT(MPI_Probe(status.MPI_SOURCE,
- tag, //TAG_PAYLOAD,
+ tag,
MPI_COMM_WORLD,
&fragstatus) == MPI_SUCCESS);
fragstatus.count,
MPI_CHAR,
status.MPI_SOURCE,
- tag, //TAG_PAYLOAD,
+ tag,
MPI_COMM_WORLD,
&fragstatus) == MPI_SUCCESS);
--- /dev/null
+
+#include "include/config.h"
+#include "include/error.h"
+
+#include "common/Timer.h"
+#include "common/Mutex.h"
+
+#include "TCPMessenger.h"
+#include "Message.h"
+
+#include <iostream>
+#include <cassert>
+using namespace std;
+#include <ext/hash_map>
+using namespace __gnu_cxx;
+
+
+# include <netdb.h>
+# include <sys/socket.h>
+# include <netinet/in.h>
+# include <arpa/inet.h>
+#include <sys/select.h>
+#include <fcntl.h>
+#include <sys/types.h>
+
+#include <unistd.h>
+#include <mpi.h>
+
+
+#define DBL 18
+
+int tcp_port = 9876;
+
+/*
+ * We keep a directory of local messengers, so that we can have multiple
+ * Messengers in the same process (rank). This is useful for benchmarking
+ * and for creating lots of simulated clients.
+ */
+
+hash_map<int, TCPMessenger*> directory; // local
+list<Message*> incoming;
+Mutex incoming_lock;
+Cond incoming_cond;
+
+struct sockaddr_in *remote_addr;
+int *in_sd; // incoming sockets
+pthread_t *in_threads;
+int *out_sd; // outgoing sockets
+
+struct sockaddr_in listen_addr;
+int listen_sd = 0;
+
+
+
+/* this process */
+int mpi_world;
+int mpi_rank;
+bool tcp_done = false; // set this flag to stop the event loop
+
+pthread_t dispatch_thread_id = 0; // thread id of the event loop. init value == nobody
+pthread_t listen_thread_id = 0;
+Mutex sender_lock;
+
+Timer *pending_timer = 0;
+
+
+
+// debug
+#undef dout
+#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << mpi_rank << "/" << mpi_world << " " << getpid() << "." << pthread_self() << "] "
+
+/**
+ * Print an IPv4 socket address as dotted-quad "a.b.c.d:port".
+ *
+ * The four address bytes are read as unsigned values so that octets above
+ * 127 are not printed as negative numbers, and the port is converted from
+ * network to host byte order before printing.
+ */
+ostream& operator<<(ostream& out, struct sockaddr_in &a)
+{
+  unsigned char addr[4];
+  memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4);
+  out << (int)addr[0] << "."
+      << (int)addr[1] << "."
+      << (int)addr[2] << "."
+      << (int)addr[3] << ":"
+      << (int)ntohs(a.sin_port);
+  return out;
+}
+
+
+/*****
+ * Global functions for process-wide startup and shutdown.
+ * MPI is used only during startup, to exchange listen addresses.
+ */
+
+/**
+ * Process-wide startup of the TCP messenger.
+ *
+ * MPI is used only as a bootstrap: each rank opens a listening TCP socket on
+ * a kernel-assigned port, the ranks exchange their sockaddr_in with
+ * MPI_Allgather, and MPI is finalized again before this function returns.
+ * The per-rank socket/thread tables are allocated zero-filled; connections
+ * are made later (tcp_accepter for incoming, tcp_open for outgoing).
+ *
+ * @param argc, argv  command line, handed to MPI_Init
+ * @return this process's MPI rank
+ */
+int tcpmessenger_init(int& argc, char**& argv)
+{
+  // exchange addresses with other nodes
+  MPI_Init(&argc, &argv);
+
+  MPI_Comm_size(MPI_COMM_WORLD, &mpi_world);
+  MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
+
+  dout(DBL) << "rank is " << mpi_rank << " / " << mpi_world << endl;
+
+  // LISTEN
+  dout(DBL) << "binding to listen " << endl;
+
+  /* socket creation */
+  listen_sd = socket(AF_INET,SOCK_STREAM,0);
+  assert(listen_sd > 0);
+
+  /* bind to any port (sin_port == 0 lets the kernel choose one) */
+  memset((char*)&listen_addr, 0, sizeof(listen_addr));
+  listen_addr.sin_family = AF_INET;
+  listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+  listen_addr.sin_port = 0;
+
+  int rc = bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr));
+  assert(rc >= 0);
+
+  // ask which port we actually got
+  socklen_t llen = sizeof(listen_addr);
+  rc = getsockname(listen_sd, (sockaddr*)&listen_addr, &llen);
+  assert(rc >= 0);
+
+  int myport = listen_addr.sin_port;   // kept in network byte order
+
+  // listen!
+  rc = ::listen(listen_sd, 2*mpi_world);
+  assert(rc >= 0);
+
+  dout(DBL) << "listening on " << ntohs(myport) << endl;
+
+  // array-new must not parenthesize the type
+  remote_addr = new struct sockaddr_in[mpi_world];
+
+  // my address is...
+  char host[100];
+  rc = gethostname(host, sizeof(host));
+  assert(rc == 0);
+  host[sizeof(host)-1] = 0;   // not guaranteed to be terminated on truncation
+  dout(DBL) << "my hostname is " << host << endl;
+
+  struct hostent *myhostname = gethostbyname( host );
+  assert(myhostname);   // unresolvable hostname would otherwise be a null deref
+  assert(myhostname->h_length == (int)sizeof(struct in_addr));   // IPv4 only; guards the memcpy below
+
+  struct sockaddr_in myaddr;
+  memset((char*)&myaddr, 0, sizeof(myaddr));   // don't ship uninitialized padding to peers
+  myaddr.sin_family = myhostname->h_addrtype;
+  memcpy((char *) &myaddr.sin_addr.s_addr,
+         myhostname->h_addr_list[0],
+         myhostname->h_length);
+  myaddr.sin_port = myport;
+
+  dout(DBL) << "my ip is " << myaddr << endl;
+
+  remote_addr[mpi_rank] = myaddr;
+
+  dout(DBL) << "MPI_Allgathering addrs" << endl;
+  MPI_Allgather( &myaddr, sizeof(struct sockaddr_in), MPI_CHAR,
+                 remote_addr, sizeof(struct sockaddr_in), MPI_CHAR,
+                 MPI_COMM_WORLD);
+
+  // for (int i=0; i<mpi_world; i++)
+  //dout(DBL) << " addr of " << i << " is " << remote_addr[i] << endl;
+
+  // MPI is no longer needed once everyone knows everyone's address
+  MPI_Finalize();
+
+  // 0 in these tables means "no socket / no thread yet"
+  in_sd = new int[mpi_world];
+  memset(in_sd, 0, sizeof(int)*mpi_world);
+  out_sd = new int[mpi_world];
+  memset(out_sd, 0, sizeof(int)*mpi_world);
+  in_threads = new pthread_t[mpi_world];
+  memset(in_threads, 0, sizeof(pthread_t)*mpi_world);
+
+  dout(DBL) << "init done" << endl;
+  return mpi_rank;
+}
+
+
+
+/**
+ * Release the per-rank tables allocated by tcpmessenger_init().
+ *
+ * The tables were allocated with new[], so they must be released with
+ * delete[]. Sockets themselves are not closed here.
+ *
+ * @return 0 always
+ */
+int tcpmessenger_shutdown()
+{
+  dout(5) << "tcpmessenger_shutdown clsoing all sockets etc" << endl;
+
+  // bleh
+
+  delete[] remote_addr;
+  delete[] in_sd;
+  delete[] out_sd;
+  delete[] in_threads;
+  return 0;
+}
+
+/** Number of ranks in the MPI world, as recorded by tcpmessenger_init(). */
+int tcpmessenger_world()
+{
+  const int world_size = mpi_world;
+  return world_size;
+}
+
+
+
+/***
+ * internal send/recv
+ */
+
+
+
+/**
+ * Read exactly len bytes from socket sd into buf.
+ *
+ * @return true once all len bytes have been read; false if recv() reports
+ *         an error or the peer closed the connection before that.
+ */
+bool tcp_read(int sd, char *buf, int len)
+{
+  while (len > 0) {
+    int got = ::recv( sd, buf, len, 0 );
+    // got == 0 is an orderly shutdown by the peer: nothing more will ever
+    // arrive, so treating it as progress would spin here forever.
+    if (got <= 0) return false;
+    len -= got;
+    buf += got;
+    //dout(DBL) << "tcp_read got " << got << ", " << len << " left" << endl;
+  }
+  return true;
+}
+
+/**
+ * Write all len bytes of buf to socket sd, looping over short sends.
+ * A failing send() trips the assert.
+ */
+void tcp_write(int sd, char *buf, int len)
+{
+  char *cursor = buf;
+  for (int remaining = len; remaining > 0; ) {
+    int sent = ::send( sd, cursor, remaining, 0 );
+    assert(sent >= 0);
+    remaining -= sent;
+    cursor += sent;
+    dout(DBL) << "tcp_write did " << sent << ", " << remaining << " left" << endl;
+  }
+}
+
+
+/*
+ * recv a Message*
+ */
+
+
+
+/**
+ * Block until any incoming socket becomes readable or has an exceptional
+ * condition pending. Expects an incoming socket from every rank (asserted).
+ * No timeout is used.
+ */
+void tcp_wait()
+{
+  // select() modifies its sets in place, so the read and exception sets
+  // must be distinct objects.
+  fd_set rfds, efds;
+  FD_ZERO(&rfds);
+  FD_ZERO(&efds);
+
+  int n = 0;        // how many sockets we are watching
+  int maxfd = -1;   // highest descriptor value among them
+
+  for (int i=0; i<mpi_world; i++) {
+    if (in_sd[i] == 0) continue;
+    FD_SET(in_sd[i], &rfds);
+    FD_SET(in_sd[i], &efds);
+    if (in_sd[i] > maxfd) maxfd = in_sd[i];
+    n++;
+  }
+  assert(n == mpi_world);
+
+  dout(DBL) << "tcp_wait on " << n << endl;
+  // first argument is the highest descriptor plus one, not the count
+  int r = ::select(maxfd + 1, &rfds, 0, &efds, 0);
+  dout(DBL) << "select returned " << r << endl;
+}
+
+
+
+/**
+ * Receive one message from rank `from` on its incoming socket.
+ *
+ * Wire format, as written by tcp_send(): a msg_envelope_t, followed by
+ * env.nchunks fragments, each an int length and then that many bytes.
+ *
+ * @return the decoded message, or 0 if the socket failed or was closed at
+ *         any point, or if a dummy (type 0) envelope was received.
+ */
+Message *tcp_recv(int from)
+{
+  // envelope
+  dout(DBL) << "tcp_recv receiving message from " << from << endl;
+
+  msg_envelope_t env;
+  if (!tcp_read( in_sd[from], (char*)&env, sizeof(env) ))
+    return 0;
+
+  if (env.type == 0) {
+    dout(DBL) << "got dummy env, bailing" << endl;
+    return 0;
+  }
+
+  dout(DBL) << "tcp_recv got envelope type=" << env.type << " src " << env.source << " dst " << env.dest << " nchunks=" << env.nchunks << endl;
+
+  // payload
+  bufferlist blist;
+  for (int i=0; i<env.nchunks; i++) {
+    // a failed read leaves size unset; never allocate from it
+    int size;
+    if (!tcp_read( in_sd[from], (char*)&size, sizeof(size) ))
+      return 0;
+    if (size < 0)
+      return 0;
+
+    bufferptr bp = new buffer(size);
+
+    if (!tcp_read( in_sd[from], bp.c_str(), size ))
+      return 0;
+
+    bp.set_length(size);
+    blist.push_back(bp);
+
+    dout(DBL) << "tcp_recv got frag " << i << " of " << env.nchunks << " len " << bp.length() << endl;
+  }
+
+  dout(DBL) << "tcp_recv got " << blist.length() << " byte message" << endl;
+
+  // unmarshall message
+  Message *m = decode_message(env, blist);
+  return m;
+}
+
+
+/**
+ * Reader thread body: receive messages from one peer and queue them on
+ * `incoming` for the dispatch loop.
+ *
+ * @param r  the peer's rank, smuggled through the void* thread argument
+ * @return 0; exits when tcp_done is set or tcp_recv() returns no message,
+ *         after closing the peer's incoming socket.
+ */
+void *tcp_inthread(void *r)
+{
+  // go through long: a direct void* -> int cast loses bits on LP64 and is
+  // rejected by current compilers
+  int who = (int)(long)r;
+
+  dout(DBL) << "tcp_inthread reading for " << who << endl;
+
+  while (!tcp_done) {
+    Message *m = tcp_recv(who);
+    if (!m) break;
+
+    incoming_lock.Lock();
+    incoming.push_back(m);
+    incoming_lock.Unlock();
+    incoming_cond.Signal();
+  }
+
+  dout(DBL) << "tcp_inthrad closing " << who << endl;
+
+  ::close(in_sd[who]);
+  in_sd[who] = 0;
+
+  return 0;
+}
+
+
+/**
+ * Accept thread body: accept one incoming connection per rank.
+ *
+ * Each connecting peer first sends its rank as an int (see tcp_open()).
+ * The socket is recorded in in_sd[rank] and a tcp_inthread reader is
+ * started for it. Connections whose rank cannot be read, or is out of
+ * range, are dropped.
+ *
+ * @return 0 once mpi_world connections have been accepted
+ */
+void *tcp_accepter(void *)
+{
+  dout(DBL) << "tcp_accepter starting" << endl;
+
+  int left = mpi_world;
+  while (left > 0) {
+    //dout(DBL) << "accepting, left = " << left << endl;
+
+    struct sockaddr_in addr;
+    socklen_t slen = sizeof(addr);
+    int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
+    if (sd > 0) {
+
+      //dout(DBL) << "accepted incoming, reading who it is " << endl;
+
+      // the rank indexes in_sd/in_threads, so it must be read successfully
+      // and be in range before it is used
+      int who;
+      if (!tcp_read(sd, (char*)&who, sizeof(who)) ||
+          who < 0 || who >= mpi_world) {
+        dout(DBL) << "no incoming connection?" << endl;
+        ::close(sd);
+        continue;
+      }
+
+      in_sd[who] = sd;
+      left--;
+
+      dout(DBL) << "accepted incoming from " << who << ", left = " << left << endl;
+
+      pthread_create(&in_threads[who],
+                     NULL,
+                     tcp_inthread,
+                     (void*)(long)who);   // widen first: int and void* differ in size on LP64
+    } else {
+      dout(DBL) << "no incoming connection?" << endl;
+    }
+  }
+  dout(DBL) << "got incoming from everyone!" << endl;
+  return 0;
+}
+
+
+
+/**
+ * Poll every open incoming socket without blocking; for each one that has
+ * data pending, receive a message and queue it on `incoming`.
+ *
+ * @return true if at least one message was queued
+ */
+bool tcp_recv_any()
+{
+  bool queued = false;
+
+  for (int rank = 0; rank < mpi_world; rank++) {
+    if (in_sd[rank] == 0) continue;   // no connection from this rank
+
+    // peek one byte, non-blocking, just to see whether anything is there
+    char probe;
+    if (::recv( in_sd[rank], &probe, 1, MSG_PEEK|MSG_DONTWAIT ) <= 0) continue;
+
+    Message *msg = tcp_recv(rank);
+    if (!msg) continue;
+
+    incoming_lock.Lock();
+    incoming.push_back(msg);
+    incoming_cond.Signal();
+    incoming_lock.Unlock();
+    queued = true;
+  }
+
+  return queued;
+}
+
+/**
+ * Open the outgoing connection to rank `who`: create a socket, bind it to
+ * any local port, connect to the peer's advertised address, send our own
+ * rank as the first int on the wire, and record the socket in out_sd[who].
+ * Failures trip asserts.
+ */
+void tcp_open(int who)
+{
+  //dout(DBL) << "tcp_open " << who << " to " << remote_addr[who] << endl;
+
+  int sock = socket(AF_INET,SOCK_STREAM,0);
+  assert(sock > 0);
+
+  // local end: any address, any port
+  struct sockaddr_in local;
+  local.sin_family = AF_INET;
+  local.sin_addr.s_addr = htonl(INADDR_ANY);
+  local.sin_port = htons( 0 );
+
+  int bound = bind(sock, (struct sockaddr *) &local, sizeof(local));
+  assert(bound>=0);
+
+  int connected = connect(sock, (sockaddr*)&remote_addr[who], sizeof(local));
+  assert(connected >= 0);
+
+  //dout(DBL) << "tcp_open connected to " << who << endl;
+
+  // introduce ourselves so the peer's accepter knows who this is
+  int my_rank = mpi_rank;
+  tcp_write(sock, (char*)&my_rank, sizeof(my_rank));
+
+  out_sd[who] = sock;
+}
+
+
+
+/*
+ * send a Message* over the wire. ** do not block **.
+ */
+/**
+ * Encode message m and write it to the rank that hosts its destination,
+ * opening the outgoing connection first if there is none yet.
+ *
+ * Wire format: the envelope, then for every payload buffer an int length
+ * followed by the bytes. All writes happen under sender_lock so messages
+ * from different threads do not interleave on a socket.
+ *
+ * @return 0 always (write failures trip the assert in tcp_write)
+ */
+int tcp_send(Message *m)
+{
+  int rank = MPI_DEST_TO_RANK(m->get_dest(), mpi_world);
+
+  // marshall
+  m->encode_payload();
+  msg_envelope_t *env = &m->get_envelope();
+  bufferlist blist = m->get_payload();
+  env->nchunks = blist.buffers().size();
+
+  dout(7) << "sending " << *m << " to " << MSG_ADDR_NICE(env->dest) << " (rank " << rank << ")" << endl;
+
+  sender_lock.Lock();
+
+  // open first?
+  if (out_sd[rank] == 0) tcp_open(rank);
+
+  // send envelope
+  tcp_write( out_sd[rank], (char*)env, sizeof(*env) );
+
+  // payload
+  int i = 0;
+  for (list<bufferptr>::iterator it = blist.buffers().begin();
+       it != blist.buffers().end();
+       it++) {
+    dout(DBL) << "tcp_sending frag " << i << " len " << (*it).length() << endl;
+    int size = (*it).length();
+    tcp_write( out_sd[rank], (char*)&size, sizeof(size) );
+    tcp_write( out_sd[rank], (*it).c_str(), size );
+    i++;
+  }
+
+  sender_lock.Unlock();
+  return 0;
+}
+
+
+
+// recv event loop, for unsolicited messages.
+
+/**
+ * Dispatch thread body: run pending timer events and hand queued incoming
+ * messages to the local messenger they are addressed to.
+ *
+ * incoming_lock is held for the whole loop except while blocked in
+ * incoming_cond.Wait(). The loop ends once tcp_done is set and there is
+ * neither a queued message nor a pending timer.
+ *
+ * @return 0
+ */
+void* tcpmessenger_loop(void*)
+{
+  dout(5) << "tcpmessenger_loop start pid " << getpid() << endl;
+
+  incoming_lock.Lock();
+
+  while (1) {
+
+    // timer events?
+    if (pending_timer) {
+      Timer *t = pending_timer;
+      pending_timer = 0;
+
+      dout(DBL) << "pending timer" << endl;
+      t->execute_pending();
+    }
+
+    // done?
+    if (tcp_done &&
+        incoming.empty() &&
+        pending_timer == 0) break;
+
+    // incoming
+    dout(12) << "loop waiting for incoming messages" << endl;
+
+    // Only sleep when there is nothing to do. Waiting unconditionally
+    // would miss work that was queued (and signalled) before we got here.
+    // NOTE(review): pending_timer and tcp_done are set without holding
+    // incoming_lock, so a wakeup can still slip in between this check and
+    // the Wait; this narrows that window but does not close it.
+    if (incoming.empty() && pending_timer == 0 && !tcp_done)
+      incoming_cond.Wait(incoming_lock);
+
+    while (incoming.size()) {
+      Message *m = incoming.front();
+      incoming.pop_front();
+
+      int dest = m->get_dest();
+      if (directory.count(dest)) {
+        Messenger *who = directory[ dest ];
+
+        dout(4) << "---- '" << m->get_type_name() <<
+          "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
+          " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- "
+                << m
+                << endl;
+
+        who->dispatch(m);
+      } else {
+        dout (1) << "---- i don't know who " << dest << " is." << endl;
+        assert(0);
+        break;
+      }
+    }
+  }
+
+  incoming_lock.Unlock();
+
+  g_timer.shutdown();
+
+  dout(5) << "tcpmessenger_loop exiting loop" << endl;
+  return 0;
+}
+
+
+// start the accept thread and the dispatch thread (for unsolicited messages)
+/**
+ * Start the accept thread (tcp_accepter) and the dispatch thread
+ * (tcpmessenger_loop).
+ *
+ * @return 0 if both threads were created, otherwise the first non-zero
+ *         error code returned by pthread_create
+ */
+int tcpmessenger_start()
+{
+  dout(5) << "starting accept thread" << endl;
+  int accept_rc = pthread_create(&listen_thread_id,
+                                 NULL,
+                                 tcp_accepter,
+                                 0);
+
+  dout(5) << "starting dispatch thread" << endl;
+
+  // start a thread
+  int dispatch_rc = pthread_create(&dispatch_thread_id,
+                                   NULL,
+                                   tcpmessenger_loop,
+                                   0);
+
+  return accept_rc ? accept_rc : dispatch_rc;
+}
+
+
+/*
+ * kick and wake up _loop (to pick up new outgoing message, or quit)
+ */
+
+// Wake the dispatch loop by signalling the condition variable it waits on.
+void tcpmessenger_kick_loop()
+{
+
+ incoming_cond.Signal();
+ // Disabled alternative below: kick the loop by sending a dummy (type 0)
+ // envelope over the socket to our own rank.
+ /*
+
+ // if we're same thread as the loop, no kicking necessary
+ if (pthread_self() == dispatch_thread_id) return;
+
+ msg_envelope_t kick_env;
+ kick_env.type = 0;
+
+ sender_lock.Lock();
+ tcp_write( out_sd[mpi_rank], (char*)&kick_env, sizeof(kick_env) );
+ sender_lock.Unlock();
+ */
+}
+
+
+// wait for thread to finish
+
+/**
+ * Signal the dispatch loop once, then block until the dispatch thread has
+ * exited.
+ */
+void tcpmessenger_wait()
+{
+  // give the loop a chance to re-check its exit condition
+  incoming_cond.Signal();
+
+  dout(10) << "tcpmessenger_wait waiting for thread to finished." << endl;
+
+  void *thread_result;
+  pthread_join(dispatch_thread_id, &thread_result);
+
+  dout(10) << "tcpmessenger_wait thread finished." << endl;
+}
+
+
+
+
+/***********
+ * Tcpmessenger class implementation
+ */
+
+/**
+ * Create a messenger for address myaddr: remember the address, register
+ * this object in the process-local directory under it, and register it
+ * with the global timer so timer events are delivered through it.
+ */
+TCPMessenger::TCPMessenger(msg_addr_t myaddr)
+  : Messenger(myaddr),
+    myaddr(myaddr)      // my address
+{
+  // make myself findable by the dispatch loop
+  directory[myaddr] = this;
+
+  // timer events get executed via this messenger
+  g_timer.set_messenger(this);
+
+  // (per-messenger logger setup is currently disabled)
+}
+
+// Destructor: nothing to release; the logger it would delete is disabled.
+TCPMessenger::~TCPMessenger()
+{
+ //delete logger;
+}
+
+
+/**
+ * Unregister this messenger from the local directory and from the timer.
+ * When it was the last messenger on this rank, also close every open
+ * incoming socket, set tcp_done and signal the dispatch loop so it can
+ * exit.
+ *
+ * NOTE(review): the reader threads also close their in_sd[] entry when
+ * they exit, so a descriptor may be closed twice -- confirm and resolve
+ * together with tcp_inthread.
+ *
+ * @return 0 always
+ */
+int TCPMessenger::shutdown()
+{
+  // remove me from the directory
+  directory.erase(myaddr);
+
+  // no more timer events
+  g_timer.unset_messenger();
+
+  // last one?
+  if (directory.empty()) {
+    dout(10) << "shutdown last tcpmessenger on rank " << mpi_rank << " shut down" << endl;
+
+    // close incoming sockets
+    for (int i=0; i<mpi_world; i++) {
+      if (in_sd[i] == 0) continue;
+      dout(DBL) << "closing reader on " << i << " sd " << in_sd[i] << endl;
+      ::close(in_sd[i]);
+      //dout(DBL) << "waiting for reader thread to close on " << i << endl;
+      //pthread_join(in_threads[i], &r);
+    }
+
+    dout(DBL) << "setting tcp_done" << endl;
+
+    tcp_done = true;
+    incoming_cond.Signal();
+  } else {
+    dout(10) << "shutdown still " << directory.size() << " other messengers on rank " << mpi_rank << endl;
+  }
+  return 0;
+}
+
+
+
+/*** events
+ */
+
+// Record t as the pending timer and wake the dispatch loop, which calls
+// t->execute_pending() on its next pass.
+void TCPMessenger::trigger_timer(Timer *t)
+{
+ // publish the timer before kicking, so the loop sees it when it wakes
+ pending_timer = t;
+
+ tcpmessenger_kick_loop();
+}
+
+/***
+ * public messaging interface
+ */
+
+
+/* note: send_message _MUST_ be non-blocking */
+/**
+ * Stamp message m with this messenger's address as source and with
+ * dest/port as destination, then send it over TCP.
+ *
+ * @param m         message to send
+ * @param dest      destination address
+ * @param port      destination port
+ * @param fromport  source port
+ * @return 0 always
+ */
+int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
+{
+  // set envelope
+  m->set_source(myaddr, fromport);
+  m->set_dest(dest, port);
+
+  tcp_send(m);
+  return 0;
+}
+
+
+
+
--- /dev/null
+#ifndef __TCPMESSENGER_H
+#define __TCPMESSENGER_H
+
+#include "Messenger.h"
+#include "Dispatcher.h"
+
+#define NUMMDS g_conf.num_mds
+#define NUMOSD g_conf.num_osd
+// Map a message address to the rank that hosts it. Addresses below
+// NUMMDS+NUMOSD map to the rank of the same number; higher addresses are
+// spread round-robin over the remaining (world - NUMMDS - NUMOSD) ranks.
+// For those higher addresses world must exceed NUMMDS+NUMOSD, otherwise
+// the modulus is by zero.
+#define MPI_DEST_TO_RANK(dest,world) ((dest)<(NUMMDS+NUMOSD) ? \
+ (dest) : \
+ ((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % ((world)-NUMMDS-NUMOSD))))
+
+class Timer;
+
+/**
+ * Messenger implementation that sends messages over TCP sockets.
+ * Several instances may live in one process; incoming messages are handed
+ * to the instance whose address matches the message destination.
+ */
+class TCPMessenger : public Messenger {
+ protected:
+  msg_addr_t myaddr;     // my address
+
+  //class Logger *logger;  // for logging
+
+ public:
+  TCPMessenger(msg_addr_t myaddr);
+  ~TCPMessenger();
+
+  // unregister this messenger; the last one on a rank stops the event loop.
+  virtual int shutdown();
+
+  // message interface
+  virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
+  // not supported over TCP: asserts; returns 0 so that builds without
+  // asserts do not fall off the end of a non-void function.
+  virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); return 0; }
+
+  // events
+  virtual void trigger_timer(Timer *t);
+};
+
+/**
+ * these are all ONE per process.
+ */
+
+extern int tcpmessenger_world(); // number of ranks
+extern int tcpmessenger_init(int& argc, char**& argv); // exchange addresses via MPI, open listen socket; returns my rank
+extern int tcpmessenger_start(); // start accept and dispatch threads
+extern void tcpmessenger_wait(); // wait for dispatch thread to finish.
+extern int tcpmessenger_shutdown(); // free the per-rank tables
+
+
+#endif
* some system constants
*/
#define NUM_REPLICA_GROUPS (1<<20) // ~1M
-#define NUM_RUSH_REPLICAS 10 // this should be big enough to cope w/ failing disks.
+#define NUM_RUSH_REPLICAS 4 // this should be big enough to cope w/ failing disks.
#define MAX_REPLICAS 3
#define FILE_OBJECT_SIZE (1<<20) // 1 MB object size
--- /dev/null
+
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "include/config.h"
+
+#include "mds/MDCluster.h"
+#include "mds/MDS.h"
+#include "osd/OSD.h"
+#include "client/Client.h"
+#include "client/fuse.h"
+
+#include "msg/TCPMessenger.h"
+#include "msg/CheesySerializer.h"
+
+#include "common/Timer.h"
+
+#define NUMMDS g_conf.num_mds
+#define NUMOSD g_conf.num_osd
+#define NUMCLIENT g_conf.num_client
+
+// Trivial Context whose completion callback prints its result code.
+class C_Test : public Context {
+public:
+ // r: the result code handed to the callback; printed to stdout.
+ void finish(int r) {
+ cout << "C_Test->finish(" << r << ")" << endl;
+ }
+};
+
+
+// tcpfuse entry point: creates the MDS, OSD and client instances that map
+// to this rank, starts the TCP messenger, mounts each local client through
+// FUSE, and tears everything down again once FUSE returns.
+int main(int oargc, char **oargv) {
+
+ //cerr << "tcpfuse starting " << myrank << "/" << world << endl;
+ int argc;
+ char **argv;
+ // presumably consumes the config options and leaves the rest in argc/argv -- TODO confirm
+ parse_config_options(oargc, oargv,
+ argc, argv);
+
+ // number of mds/osd/client instances created on this rank
+ int start = 0;
+
+ // build new argc+argv for fuse
+ typedef char* pchar;
+ int nargc = 0;
+ char **nargv = new pchar[argc];
+ nargv[nargc++] = argv[0];
+
+ // pick off the mkfs flags; everything else is handed to fuse via nargv
+ int mkfs = 0;
+ for (int i=1; i<argc; i++) {
+ if (strcmp(argv[i], "--fastmkfs") == 0) {
+ mkfs = MDS_MKFS_FAST;
+ }
+ else if (strcmp(argv[i], "--fullmkfs") == 0) {
+ mkfs = MDS_MKFS_FULL;
+ }
+ else {
+ // unknown arg, pass it on.
+ nargv[nargc++] = argv[i];
+ }
+ }
+
+ int myrank = tcpmessenger_init(argc, argv);
+ int world = tcpmessenger_world();
+
+ assert(NUMMDS + NUMOSD + NUMCLIENT <= world);
+
+ MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD);
+
+
+ char hostname[100];
+ gethostname(hostname,100);
+ int pid = getpid();
+
+ // In every loop below, instances that MPI_DEST_TO_RANK places on another
+ // rank are skipped, so only the array slots for local instances are set.
+
+ // create mds
+ MDS *mds[NUMMDS];
+ for (int i=0; i<NUMMDS; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+ cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+ mds[i] = new MDS(mdc, i, new TCPMessenger(MSG_ADDR_MDS(i)));
+ start++;
+ }
+
+ // create osd
+ OSD *osd[NUMOSD];
+ for (int i=0; i<NUMOSD; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+ cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+ osd[i] = new OSD(i, new TCPMessenger(MSG_ADDR_OSD(i)));
+ start++;
+ }
+
+ // create client
+ Client *client[NUMCLIENT];
+ for (int i=0; i<NUMCLIENT; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+ cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+ CheesySerializer *serializer = new CheesySerializer( new TCPMessenger(MSG_ADDR_CLIENT(i)) );
+ client[i] = new Client(mdc, i, serializer);
+ start++;
+ }
+
+
+ // start message loop
+ if (start) {
+ tcpmessenger_start();
+
+ // init
+ for (int i=0; i<NUMMDS; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+ mds[i]->init();
+ }
+
+ for (int i=0; i<NUMOSD; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+ osd[i]->init();
+ }
+
+ // create client
+ for (int i=0; i<NUMCLIENT; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+
+ client[i]->init();
+
+ // start up fuse
+ // use my argc, argv (make sure you pass a mount point!)
+ cout << "mounting" << endl;
+ client[i]->mount(mkfs);
+
+ cout << "starting fuse on rank " << myrank << " pid " << getpid() << endl;
+ ceph_fuse_main(client[i], nargc, nargv);
+ cout << "fuse finished on rank " << myrank << " pid " << getpid() << endl;
+ }
+ // fuse is done: unmount and shut down the local clients
+ for (int i=0; i<NUMCLIENT; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+
+ client[i]->unmount();
+ cout << "unmounted" << endl;
+ client[i]->shutdown();
+ }
+
+
+ // wait for it to finish
+ tcpmessenger_wait();
+
+ } else {
+ cerr << "IDLE rank " << myrank << endl;
+ }
+
+ tcpmessenger_shutdown(); // shutdown MPI
+
+ // cleanup
+ for (int i=0; i<NUMMDS; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+ delete mds[i];
+ }
+ for (int i=0; i<NUMOSD; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+ delete osd[i];
+ }
+ for (int i=0; i<NUMCLIENT; i++) {
+ if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+ delete client[i];
+ }
+ delete mdc;
+
+ return 0;
+}
+
--- /dev/null
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "include/config.h"
+
+#include "mds/MDCluster.h"
+#include "mds/MDS.h"
+#include "osd/OSD.h"
+#include "client/Client.h"
+#include "client/SyntheticClient.h"
+
+#include "msg/TCPMessenger.h"
+#include "msg/CheesySerializer.h"
+
+#include "common/Timer.h"
+
+#define NUMMDS g_conf.num_mds
+#define NUMOSD g_conf.num_osd
+#define NUMCLIENT g_conf.num_client
+
+// Trivial Context whose completion callback prints its result code.
+class C_Test : public Context {
+public:
+ // r: the result code handed to the callback; printed to stdout.
+ void finish(int r) {
+ cout << "C_Test->finish(" << r << ")" << endl;
+ }
+};
+
+
+/**
+ * tcpsyn entry point: creates the MDS, OSD and client instances that map to
+ * this rank, starts the TCP messenger, runs one SyntheticClient thread per
+ * local client, and tears everything down once those threads have finished.
+ *
+ * Options handled here: --fastmkfs, --fullmkfs, --synsarg1 <s>,
+ * --syniarg1/2/3 <n>, --synmode <writefile|makedirs|fullwalk|randomwalk>.
+ *
+ * @return 0 on success, -1 on a bad --synmode or an option missing its value
+ */
+int main(int oargc, char **oargv) {
+
+  //cerr << "tcpsyn starting " << myrank << "/" << world << endl;
+  int argc;
+  char **argv;
+  parse_config_options(oargc, oargv,
+                       argc, argv);
+
+  // number of mds/osd/client instances created on this rank
+  int start = 0;
+
+  // collect the arguments we do not recognize
+  typedef char* pchar;
+  int nargc = 0;
+  char **nargv = new pchar[argc];
+  nargv[nargc++] = argv[0];
+
+  // NOTE(review): syn_sarg1 is parsed but each client's sarg1 is set to
+  // "syn.<i>" below -- confirm whether the option is meant to be used.
+  string syn_sarg1;
+  int syn_mode = SYNCLIENT_MODE_WRITEFILE;
+  // default to 0 so the clients never receive indeterminate values when
+  // the options are not given
+  int syn_iarg1 = 0, syn_iarg2 = 0, syn_iarg3 = 0;
+  int mkfs = 0;
+  for (int i=1; i<argc; i++) {
+    // options that consume the following argument must have one
+    const bool takes_value =
+      strcmp(argv[i],"--synsarg1") == 0 ||
+      strcmp(argv[i],"--syniarg1") == 0 ||
+      strcmp(argv[i],"--syniarg2") == 0 ||
+      strcmp(argv[i],"--syniarg3") == 0 ||
+      strcmp(argv[i],"--synmode") == 0;
+    if (takes_value && i+1 >= argc) {
+      cerr << "option " << argv[i] << " requires an argument" << endl;
+      return -1;
+    }
+
+    if (strcmp(argv[i], "--fastmkfs") == 0) {
+      mkfs = MDS_MKFS_FAST;
+    }
+    else if (strcmp(argv[i], "--fullmkfs") == 0) {
+      mkfs = MDS_MKFS_FULL;
+    }
+    else if (strcmp(argv[i],"--synsarg1") == 0)
+      syn_sarg1 = argv[++i];
+    else if (strcmp(argv[i],"--syniarg1") == 0)
+      syn_iarg1 = atoi(argv[++i]);
+    else if (strcmp(argv[i],"--syniarg2") == 0)
+      syn_iarg2 = atoi(argv[++i]);
+    else if (strcmp(argv[i],"--syniarg3") == 0)
+      syn_iarg3 = atoi(argv[++i]);
+    else if (strcmp(argv[i],"--synmode") == 0) {
+      ++i;
+      if (strcmp(argv[i],"writefile") == 0)
+        syn_mode = SYNCLIENT_MODE_WRITEFILE;
+      else if (strcmp(argv[i],"makedirs") == 0)
+        syn_mode = SYNCLIENT_MODE_MAKEDIRS;
+      else if (strcmp(argv[i],"fullwalk") == 0)
+        syn_mode = SYNCLIENT_MODE_FULLWALK;
+      else if (strcmp(argv[i],"randomwalk") == 0)
+        syn_mode = SYNCLIENT_MODE_RANDOMWALK;
+      else {
+        cerr << "unknown syn mode " << argv[i] << endl;
+        return -1;
+      }
+    }
+
+    else {
+      // unknown arg, pass it on.
+      nargv[nargc++] = argv[i];
+    }
+  }
+
+  int myrank = tcpmessenger_init(argc, argv);
+  int world = tcpmessenger_world();
+
+  if (myrank == 0)
+    cerr << "nummds " << NUMMDS << " numosd " << NUMOSD << " numclient " << NUMCLIENT << endl;
+  assert(NUMMDS + NUMOSD + 1 <= world);
+
+  MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD);
+
+
+  char hostname[100];
+  gethostname(hostname,100);
+  int pid = getpid();
+
+  // In every loop below, instances that MPI_DEST_TO_RANK places on another
+  // rank are skipped, so only the array slots for local instances are set.
+
+  // create mds
+  MDS *mds[NUMMDS];
+  for (int i=0; i<NUMMDS; i++) {
+    if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+    cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+    mds[i] = new MDS(mdc, i, new TCPMessenger(MSG_ADDR_MDS(i)));
+    start++;
+  }
+
+  // create osd
+  OSD *osd[NUMOSD];
+  for (int i=0; i<NUMOSD; i++) {
+    if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+    cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+    osd[i] = new OSD(i, new TCPMessenger(MSG_ADDR_OSD(i)));
+    start++;
+  }
+
+  // create client
+  Client *client[NUMCLIENT];
+  SyntheticClient *syn[NUMCLIENT];
+  for (int i=0; i<NUMCLIENT; i++) {
+    if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+    cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+    CheesySerializer *serializer = new CheesySerializer( new TCPMessenger(MSG_ADDR_CLIENT(i)) );
+    client[i] = new Client(mdc, i, serializer);
+    start++;
+  }
+
+
+  // start message loop
+  if (start) {
+    tcpmessenger_start();
+
+    // init
+    for (int i=0; i<NUMMDS; i++) {
+      if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+      mds[i]->init();
+    }
+
+    for (int i=0; i<NUMOSD; i++) {
+      if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+      osd[i]->init();
+    }
+
+    // mount each local client and start its synthetic workload thread
+    for (int i=0; i<NUMCLIENT; i++) {
+      if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+
+      client[i]->init();
+
+      // use my argc, argv (make sure you pass a mount point!)
+      //cout << "mounting" << endl;
+      client[i]->mount(mkfs);
+
+      //cout << "starting synthetic client on rank " << myrank << endl;
+      syn[i] = new SyntheticClient(client[i]);
+
+      char s[20];
+      sprintf(s,"syn.%d", i);
+      syn[i]->sarg1 = s;
+
+      syn[i]->mode = syn_mode;
+      syn[i]->iarg1 = syn_iarg1;
+      syn[i]->iarg2 = syn_iarg2;
+      syn[i]->iarg3 = syn_iarg3;
+
+      syn[i]->start_thread();
+    }
+    // wait for the workloads, then unmount and shut the clients down
+    for (int i=0; i<NUMCLIENT; i++) {
+      if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+
+      cout << "waiting for synthetic client" << i << " to finish" << endl;
+      syn[i]->join_thread();
+      delete syn[i];
+
+      client[i]->unmount();
+      //cout << "client" << i << " unmounted" << endl;
+      client[i]->shutdown();
+    }
+
+
+    // wait for it to finish
+    tcpmessenger_wait();
+
+    //assert(0);
+  } else {
+    cerr << "IDLE rank " << myrank << endl;
+  }
+
+  tcpmessenger_shutdown();
+
+  // cleanup
+  for (int i=0; i<NUMMDS; i++) {
+    if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
+    delete mds[i];
+  }
+  for (int i=0; i<NUMOSD; i++) {
+    if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
+    delete osd[i];
+  }
+  for (int i=0; i<NUMCLIENT; i++) {
+    if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
+    delete client[i];
+  }
+  delete mdc;
+
+  return 0;
+}
+
mpimessenger_start();
- while(1) {
+ //while (1) {
+ for (int i=0; i<10000; i++) {
// ping random nodes
int d = rand() % world;
}
}
- cout << "shutting down" << endl;
- p->messenger->shutdown();
+
+ //cout << "shutting down" << endl;
+ //p->messenger->shutdown();
mpimessenger_wait();
mpimessenger_shutdown(); // shutdown MPI