${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.cc osd/OBFSStore.o msg/TCPMessenger.cc ${COMMON_OBJS}
- ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ./lib/uofs.a
+ ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/SyntheticClient.o osd/OSD.o msg/FakeMessenger.o ${COMMON_OBJS}
${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
assert(!mounted); // caller is confused?
- dout(1) << "mounting" << endl;
+ dout(2) << "mounting" << endl;
MClientMount *m = new MClientMount();
if (mkfs) m->set_mkfs(mkfs);
// we got osdcluster!
osdcluster->decode(reply->get_osd_cluster_state());
- dout(1) << "mounted" << endl;
+ dout(2) << "mounted" << endl;
mounted = true;
delete reply;
assert(mounted); // caller is confused?
- dout(1) << "unmounting" << endl;
+ dout(2) << "unmounting" << endl;
Message *req = new MGenericMessage(MSG_CLIENT_UNMOUNT);
client_lock.Unlock();
Message *reply = messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER);
client_lock.Lock();
assert(reply);
mounted = false;
- dout(1) << "unmounted" << endl;
+ dout(2) << "unmounted" << endl;
delete reply;
{
dout(15) << "sync_pos now " << off << endl;
sync_pos = off;
+
+ // discard written bufferlist
+ delete writing_buffers[off];
+ writing_buffers.erase(off);
// wake up waiters
map< off_t, list<Context*> >::iterator it = waiting_for_sync.begin();
assert(write_buf.length() == append_pos - flush_pos);
+ // tuck writing buffer away until write finishes
+ writing_buffers[flush_pos] = new bufferlist;
+ writing_buffers[flush_pos]->claim(write_buf);
+
+ // write it
mds->filer->write(log_ino,
- write_buf.length(), flush_pos,
- write_buf,
+ writing_buffers[flush_pos]->length(), flush_pos,
+ *writing_buffers[flush_pos],
0,
new C_LS_Append(this, append_pos));
-
- write_buf.clear();
+
flush_pos = append_pos;
} else {
dout(15) << "flush flush_pos " << flush_pos << " == append_pos " << append_pos << ", nothing to do" << endl;
off_t append_pos; // where next event will be written
bufferlist write_buf; // unwritten (between flush_pos and append_pos)
+ std::map< off_t, bufferlist* > writing_buffers;
+
+
// reading
off_t read_pos; // abs position in file
//off_t read_buf_start; // where read buf begins
int tcpmessenger_shutdown()
{
- dout(1) << "tcpmessenger_shutdown closing all sockets etc" << endl;
+ dout(2) << "tcpmessenger_shutdown closing all sockets etc" << endl;
// bleh
for (int i=0; i<mpi_world; i++) {
#include "OBFSStore.h"
extern "C" {
-#include "../include/uofs.h"
+#include "../../uofs/uofs.h"
}
+
#include "include/types.h"
#include <unistd.h>
}
// create client
+ set<int> clientlist;
Client *client[NUMCLIENT];
SyntheticClient *syn[NUMCLIENT];
for (int i=0; i<NUMCLIENT; i++) {
if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
- cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
+ clientlist.insert(i);
client[i] = new Client(mdc, i, new TCPMessenger(MSG_ADDR_CLIENT(i)) );
start++;
}
+ cerr << "clients " << clientlist << " on rank " << myrank << " " << hostname << "." << pid << endl;
// start message loop