mpitest: mpitest.o msg/MPIMessenger.cc pmds.o
${MPICC} ${CFLAGS} ${MPILIBS} mpitest.o msg/MPIMessenger.cc pmds.o -o mpitest
-singleclient: pmds.o fakesingleclient.o client/Client.o msg/CheesySerializer.o msg/FakeMessenger.o
- ${CC} ${CFLAGS} ${LIBS} pmds.o client/Client.o msg/FakeMessenger.o msg/CheesySerializer.o fakesingleclient.o ${LEAKTRACER} -o singleclient
+singleclient: pmds.o fakesingleclient.o client/Client.o msg/CheesySerializer.o msg/FakeMessenger.o fsck.o
+ ${CC} ${CFLAGS} ${LIBS} pmds.o client/Client.o msg/FakeMessenger.o msg/CheesySerializer.o fakesingleclient.o fsck.o ${LEAKTRACER} -o singleclient
fuseclient: client/Client.o client/fuse.o msg/CheesySerializer.o msg/FakeMessenger.o
${CC} ${CFLAGS} ${LIBS} pmds.o client/fuse.o client/Client.o msg/FakeMessenger.o msg/CheesySerializer.o ${LEAKTRACER} -o fuseclient
// fyi: typedef int (*dirfillerfunc_t) (void *handle, const char *name, int type, inodeno_t ino);
-int Client::getdir(const char *path, map<string,inode_t*> contents)
+int Client::getdir(const char *path, map<string,inode_t*>& contents)
{
+ MClientRequest *req = new MClientRequest(MDS_OP_READDIR, whoami);
+ req->set_path(path);
+
+ // FIXME where does FUSE maintain user information
+ req->set_caller_uid(getuid());
+ req->set_caller_gid(getgid());
+
+ //FIXME enforce caller uid rights?
+
+ MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER);
+ int res = reply->get_result();
- // ...
- int res;
+ // dir contents to caller!
+ vector<c_inode_info*>::iterator it;
+ for (it = reply->get_dir_contents().begin();
+ it != reply->get_dir_contents().end();
+ it++) {
+ // put in cache
- // return contents to caller
- /*
- for (...) {
-
- contents[dentryname] = inodeptr; // ptr to inode_t in our cache
+ contents[(*it)->ref_dn] = &(*it)->inode; // FIXME don't use one from reply!
}
- */
+
+ //delete reply; fix thing above first
return res;
}
+
+
+
int statfs(const char *path, struct statfs *stbuf);
// namespace ops
- int getdir(const char *path, map<string,inode_t*> contents);
+ int getdir(const char *path, map<string,inode_t*>& contents);
int link(const char *existing, const char *newname);
int unlink(const char *path);
int rename(const char *from, const char *to);
fake_clock: true,
fakemessenger_serialize: true,// false,
- debug: 14,
+ debug: 10,
mdcache_size: 500, //MDS_CACHE_SIZE,
mdcache_mid: .8,
client_op_chown: 10, // untested
client_op_readdir: 10,
- client_op_mknod: 10,
+ client_op_mknod: 100,
client_op_link: false,
client_op_unlink: 10,
- client_op_rename: 100,
+ client_op_rename: 00,
- client_op_mkdir: 10,
+ client_op_mkdir: 100,
client_op_rmdir: 10,
client_op_symlink: 10,
nnull++;
}
- dout(10) << "add_dentry " << *dn << endl;
+ dout(12) << "add_dentry " << *dn << endl;
// pin?
if (nnull + nitems == 1) get(CDIR_PIN_CHILD);
void CDir::remove_dentry(CDentry *dn)
{
- dout(10) << "remove_dentry " << *dn << endl;
+ dout(12) << "remove_dentry " << *dn << endl;
if (dn->inode) {
// detach inode and dentry
void CDir::link_inode( CDentry *dn, CInode *in )
{
link_inode_work(dn,in);
- dout(10) << "link_inode " << *dn << " " << *in << endl;
+ dout(12) << "link_inode " << *dn << " " << *in << endl;
// remove from null list
assert(null_items.count(dn->name) == 1);
void CDir::unlink_inode( CDentry *dn )
{
- dout(10) << "unlink_inode " << *dn << " " << *dn->inode << endl;
+ dout(12) << "unlink_inode " << *dn << " " << *dn->inode << endl;
unlink_inode_work(dn);
this->onfinish = onfinish;
}
void finish(int r) {
- cout << "TraverseDiscover r = " << r << endl;
+ //dout(10) << "TraverseDiscover r = " << r << endl;
if (r < 0) { // ENOENT on discover, pass back to caller.
onfinish->finish(r);
} else {
#include <iostream>
using namespace std;
+#include "include/config.h"
+#undef dout
+#define dout(l) if (l<=g_conf.debug) cout << "serializer: "
+
+#define DEBUGLVL 13 // debug level of output
// ---------
// incoming messages
// was i expecting it?
if (call_sem.count(tid)) {
// yes, this is a reply to a pending call.
- cout << "serializer: dispatch got reply for " << tid << " " << m << endl;
+ dout(DEBUGLVL) << "dispatch got reply for " << tid << " " << m << endl;
call_reply[tid] = m; // set reply
- call_sem[tid]->Post();
+ int r = call_sem[tid]->Post();
+ //cout << "post = " << r << endl;
lock.Unlock();
} else {
// no, this is an unsolicited message.
lock.Unlock();
- cout << "serializer: dispatch got unsolicited message" << m << endl;
+ dout(DEBUGLVL) << "dispatch got unsolicited message" << m << endl;
dispatcher->dispatch(m);
}
}
void CheesySerializer::send(Message *m, msg_addr_t dest, int port, int fromport)
{
// just pass it on to the messenger
- cout << "serializer: send " << m << endl;
+ dout(DEBUGLVL) << "send " << m << endl;
messenger->send_message(m, dest, port, fromport);
}
Message *CheesySerializer::sendrecv(Message *m, msg_addr_t dest, int port, int fromport)
{
- Semaphore *sem = new Semaphore();
+ static Semaphore stsem;
+ Semaphore *sem = &stsem;//new Semaphore();
// make up a transaction number that is unique (to me!)
/* NOTE: since request+replies are matched up on tid's alone, it means that
long tid = ++last_tid;
m->set_tid(tid);
- cout << "serializer: sendrecv sending " << m << " on tid " << tid << endl;
+ dout(DEBUGLVL) << "sendrecv sending " << m << " on tid " << tid << endl;
// add call records
lock.Lock();
messenger->send_message(m, dest, port, fromport);
// wait
- cout << "serializer: sendrecv waiting for reply on tid " << tid << endl;
+ dout(DEBUGLVL) << "sendrecv waiting for reply on tid " << tid << endl;
+ //cout << "wait start, value = " << sem->Value() << endl;
sem->Wait();
+
// pick up reply
lock.Lock();
Message *reply = call_reply[tid];
call_sem.erase(tid);
lock.Unlock();
- delete sem;
+ dout(DEBUGLVL) << "sendrecv got reply " << reply << " on tid " << tid << endl;
+ //delete sem;
- cout << "serializer: sendrecv got reply " << reply << " on tid " << tid << endl;
return reply;
}
hash_map<int, Logger*> loggers;
LogType fakemsg_logtype;
-
+Mutex lock;
Semaphore sem;
Semaphore shutdownsem;
bool awake = false;
dout(11) << "do_loop top" << endl;
+ lock.Lock();
+
map<int, FakeMessenger*>::iterator it = directory.begin();
while (it != directory.end()) {
Message *m = it->second->get_message();
}
}
+ lock.Unlock();
+
didone = true;
it->second->dispatch(m);
+
+ lock.Lock();
}
it++;
}
+ lock.Unlock();
+
+
if (!didone)
break;
}
+
+
dout(1) << "do_loop end (no more messages)." << endl;
return 0;
}
m->set_source(whoami, fromport);
m->set_dest(dest, port);
+ lock.Lock();
+
// deliver
try {
#ifdef LOG_MESSAGES
awake = true;
sem.Post();
}
+
+ lock.Unlock();
}
int FakeMessenger::wait_message(time_t seconds)
#define NUMOSD g_conf.num_osd
#define MPI_DEST_TO_RANK(dest,world) ((dest)<(NUMMDS+NUMOSD) ? \
(dest) : \
- ((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % (world-NUMMDS-NUMOSD))))
+ ((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % ((world)-NUMMDS-NUMOSD))))
class MPIMessenger : public Messenger {
// incoming queue
Message *get_message() {
if (incoming.size() > 0) {
- cout << incoming.size() << " messages, taking first" << endl;
+ //cout << incoming.size() << " messages, taking first" << endl;
Message *m = incoming.front();
incoming.pop_front();
return m;
#include <semaphore.h>
#include <errno.h>
+#include <iostream>
+using namespace std;
class Semaphore
{
sem_t S;
public:
- Semaphore( int init = 0 )
- { sem_init(&S,0,init); }
-
- virtual ~Semaphore()
- { sem_destroy(&S); }
-
- void Wait() const
- { sem_wait((sem_t *)&S); }
+ Semaphore( int init = 0 ) {
+ int r = sem_init(&S,0,init);
+ //cout << "sem_init = " << r << endl;
+ }
+
+ virtual ~Semaphore() {
+ int r = sem_destroy(&S);
+ //cout << "sem_destroy = " << r << endl;
+ }
+
+ void Wait() const {
+ while (1) {
+ int r = sem_wait((sem_t *)&S);
+ if (r == 0) break;
+ cout << "sem_wait returned " << r << ", trying again" << endl;
+ }
+ }
int Wait_Try() const
{ return (sem_trywait((sem_t *)&S)?errno:0); }