#CC=distcc g++
CFLAGS=-g -I. -pg
#CFLAGS=-D __gnu_cxx=std -g -I. #-I/usr/lib/mpi/include -L/usr/lib/mpi/lib
-LIBS=
+LIBS=-lpthread
MPICC=g++
MPICFLAGS=${CFLAGS} -I/usr/lib/mpi/include -L/usr/lib/mpi/mpi_gnu/lib
MPILIBS= -lelan -lmpi ${LIBS}
-#LEAKTRACER=
-LEAKTRACER=$(HOME)/lib/LeakTracer.o
+LEAKTRACER=
+#LEAKTRACER=$(HOME)/lib/LeakTracer.o
SRCS=*.cc */*.cc
OBJS=osd/OSD.o\
pmds: mpitest.cc msg/MPIMessenger.cc pmds.o
${MPICC} ${MPICFLAGS} ${MPILIBS} mpitest.cc msg/MPIMessenger.cc pmds.o -o pmds
+singleclient: pmds.o fakesingleclient.o client/Client.o msg/CheesySerializer.o msg/FakeMessenger.o
+ ${CC} ${CFLAGS} ${LIBS} pmds.o client/Client.o msg/FakeMessenger.o msg/CheesySerializer.o fakesingleclient.o ${LEAKTRACER} -o singleclient
+
clean:
rm *.o */*.o ${TARGETS}
- test marshalling
- symlinks
+- fix logging model for data safety
+ - inodeupdate
+ - dentryunlink
+ - dentrylink
+ - put path to item in log entry
+
+
+
/- export null dentries
+// ceph stuff
#include "Client.h"
+
+#include "msg/CheesySerializer.h"
+
#include "messages/MClientRequest.h"
#include "messages/MClientReply.h"
+// unix-ey fs stuff
+#include <sys/types.h>
+#include <utime.h>
+
+
// cons/des
Client::Client(MDCluster *mdc, int id, Messenger *m)
mdcluster = mdc;
whoami = id;
messenger = m;
+ serial_messenger = new CheesySerializer(m, this);
+
all_files_closed = false;
tid = 0;
root = 0;
Client::~Client()
{
if (messenger) { delete messenger; messenger = 0; }
+ if (serial_messenger) { delete serial_messenger; serial_messenger = 0; }
}
{
MClientRequest *req = new MClientRequest(tid++, MDS_OP_STAT, whoami);
MClientReply *reply;
- inode_t inode;
- vector<c_inode_info*> *trace;
- req->set_path(string(path)); //FIXME correct construction of string?
+ req->set_path(path);
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
- reply = messenger->sendrecv(req);
+ reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
- res = reply->get_result();
+ int res = reply->get_result();
if (res != 0) return res;
//Transfer information from reply to stbuf
- trace = reply->get_trace();
- inode = trace[trace.size()-1]->inode;
+ vector<c_inode_info*> trace = reply->get_trace();
+ inode_t inode = trace[trace.size()-1]->inode;
//stbuf->st_dev =
- stbuf->st_ino = inode->ino;
- stbuf->st_mode = inode->mode;
+ stbuf->st_ino = inode.ino;
+ stbuf->st_mode = inode.mode;
//stbuf->st_nlink =
- stbuf->st_uid = inode->uid;
- stbuf->st_gid = inode->gid;
- stbuf->st_ctime = inode->ctime;
- stbuf->st_atime = inode->atime;
- stbuf->st_mtime = inode->mtime;
- stbuf->st_size = (off_t) inode->size; //FIXME off_t is signed 64 vs size is unsigned 64
+ stbuf->st_uid = inode.uid;
+ stbuf->st_gid = inode.gid;
+ stbuf->st_ctime = inode.ctime;
+ stbuf->st_atime = inode.atime;
+ stbuf->st_mtime = inode.mtime;
+ stbuf->st_size = (off_t) inode.size; //FIXME off_t is signed 64 vs size is unsigned 64
//stbuf->st_blocks =
//stbuf->st_blksize =
//stbuf->st_flags =
MClientRequest *req = new MClientRequest(tid++, MDS_OP_CHMOD, whoami);
MClientReply *reply;
- req->set_path(string(path)); //FIXME correct construction of string?
+ req->set_path(path); //FIXME correct construction of string?
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
- req->set_iarg = (int) mode;
+ req->set_iarg( (int)mode );
- reply = messenger->sendrecv(req);
+ reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
return reply->get_result();
}
MClientRequest *req = new MClientRequest(tid++, MDS_OP_CHOWN, whoami);
MClientReply *reply;
- req->set_path(string(path)); //FIXME correct construction of string?
+ req->set_path(path); //FIXME correct construction of string?
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
//FIXME enforce caller uid rights?
- req->set_iarg = (int) uid;
- req->set_iarg2 = (int) gid;
+ req->set_iarg( (int)uid );
+ req->set_iarg2( (int)gid );
- reply = messenger->sendrecv(req);
+ reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
return reply->get_result();
}
MClientRequest *req = new MClientRequest(tid++, MDS_OP_UTIME, whoami);
MClientReply *reply;
- req->set_path(string(path)); //FIXME correct construction of string?
+ req->set_path(path); //FIXME correct construction of string?
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
//FIXME enforce caller uid rights?
- req->set_targ = utimbuf->modtime;
- req->set_targ2 = utimbuf->actime;
+ req->set_targ( buf->modtime );
+ req->set_targ2( buf->actime );
- reply = messenger->sendrecv(req);
+ reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0));
return reply->get_result();
}
//
// getdir
+/*
typedef int (*fuse_dirfil_t) (fuse_dirh_t h, const char *name, int type,
ino_t ino);
+*/
#ifndef __CLIENT_H
#define __CLIENT_H
+#include "mds/MDCluster.h"
+
#include "msg/Message.h"
-#include "msgthread.h"
+#include "msg/Dispatcher.h"
+#include "msg/SerialMessenger.h"
+
+//#include "msgthread.h"
#include "include/types.h"
+#include "include/lru.h"
+// stl
#include <set>
#include <map>
using namespace std;
*/
class Dentry;
+class Inode;
class Dir {
public:
Inode *parent_inode; // my inode
hash_map< string, Dentry* > dentries;
- Dir(Inode* in) { inode = in; }
+ Dir(Inode* in) { parent_inode = in; }
};
class Inode {
public:
- inodeno_t inode;
+ inode_t inode; // the actual inode
time_t last_updated;
int ref; // ref count. 1 for each dentry, fh or dir that links to me
Dir *dir; // if i'm a dir.
+ Dentry *dn; // if i'm linked to a dentry.
void get() { ref++; }
void put() { ref--; assert(ref >= 0); }
// ========================================================
// client interface
-class Client {
+class Client : public Dispatcher {
protected:
MDCluster *mdcluster;
Messenger *messenger;
+ SerialMessenger *serial_messenger;
long tid;
int whoami;
bool all_files_closed;
LRU lru; // lru list of Dentry's in our local metadata cache.
// file handles
- map<fh_t, Fh*> fh_map;
+ map<fileh_t, Fh*> fh_map;
// global semaphore/mutex protecting cache+fh structures
// link to inode
dn->inode = in;
+ in->dn = dn;
in->get();
+ lru.lru_insert_mid(dn); // mid or top?
return dn;
}
void unlink(Dentry *dn) {
+ Inode *in = dn->inode;
+
+ // unlink from inode
dn->inode = 0;
+ in->dn = 0;
in->put();
// unlink from dir
delete dn->dir;
}
dn->dir = 0;
+
+ // delete den
+ lru.lru_remove(dn);
delete dn;
}
Client(MDCluster *mdc, int id, Messenger *m);
~Client();
+ // messaging
+ void dispatch(Message *m) {
+ cout << "dispatch not implemented" << endl;
+ }
+
+
// ----------------------
// fs ops.
// these shoud (more or less) mirror the actual system calls.
// namespace ops
//?int getdir(const char *path, fuse_dirh_t h, fuse_dirfil_t filler);
- int link(const char *existing, const char *new);
+ int link(const char *existing, const char *newname);
int unlink(const char *path);
int rename(const char *from, const char *to);
// symlinks
int readlink(const char *path, char *buf, size_t size);
- int symlink(const char *existing, const char *new);
+ int symlink(const char *existing, const char *newname);
// inode stuff
int lstat(const char *path, struct stat *stbuf);
// file ops
int mknod(const char *path, mode_t mode);
int open(const char *path, int mode);
- int read(fh_t fh, char *buf, size_t size, off_t offset);
- int write(fh_t fh, const char *buf, size_t size, off_t offset);
- int truncate(fh_t fh, off_t size);
- int fsync(fh_t fh);
+ int read(fileh_t fh, char *buf, size_t size, off_t offset);
+ int write(fileh_t fh, const char *buf, size_t size, off_t offset);
+ int truncate(fileh_t fh, off_t size);
+ int fsync(fileh_t fh);
};
--- /dev/null
+/////////////////////////////////////////////////////////////////////
+// Written by Phillip Sitbon
+// Copyright 2003
+//
+// Posix/Mutex.h
+// - Resource locking mechanism using Posix mutexes
+//
+/////////////////////////////////////////////////////////////////////
+
+#ifndef _Mutex_Posix_
+#define _Mutex_Posix_
+
+#include <pthread.h>
+
+class Mutex
+{
+ mutable pthread_mutex_t M;
+ void operator=(Mutex &M) {}
+ Mutex( const Mutex &M ) {}
+
+ public:
+
+ Mutex()
+ {
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE);
+ pthread_mutex_init(&M,&attr);
+ pthread_mutexattr_destroy(&attr);
+ }
+
+ virtual ~Mutex()
+ { pthread_mutex_unlock(&M); pthread_mutex_destroy(&M); }
+
+ int Lock() const
+ { return pthread_mutex_lock(&M); }
+
+ int Lock_Try() const
+ { return pthread_mutex_trylock(&M); }
+
+ int Unlock() const
+ { return pthread_mutex_unlock(&M); }
+};
+
+#endif // !_Mutex_Posix_
client_op_statfs: false,
- client_op_stat: 10,
+ client_op_stat: 100,
client_op_lstat: false,
client_op_utime: 10, // untested
client_op_chmod: 10,
virtual char *get_type_name() { return "creq"; }
void set_path(string& p) { path.set_path(p); }
+ void set_path(const char *p) { path.set_path(p); }
void set_caller_uid(int u) { st.caller_uid = u; }
void set_caller_gid(int g) { st.caller_uid = g; }
void set_ino(inodeno_t ino) { st.ino = ino; }
--- /dev/null
+
+#include "CheesySerializer.h"
+#include "Message.h"
+#include <iostream>
+using namespace std;
+
+
+void CheesySerializer::dispatch(Message *m)
+{
+ // i better be expecting it
+ assert(waiting_for_reply);
+
+ cout << "dispatch got " << reply << ", waking up waiter" << endl;
+ reply = m;
+ waiter.Post();
+}
+
+
+void CheesySerializer::send(Message *m, msg_addr_t dest, int port, int fromport)
+{
+ messenger->send_message(m, dest, port, fromport);
+}
+
+Message *CheesySerializer::sendrecv(Message *m, msg_addr_t dest, int port, int fromport)
+{
+ messenger->send_message(m, dest, port, fromport);
+ waiting_for_reply = true;
+ waiter.Wait();
+ return reply;
+}
+
+
+// thread crap
+void *cheesyserializer_starter(void *pthis)
+{
+ CheesySerializer *pt = (CheesySerializer*)pthis;
+ pt->message_thread();
+}
+
+void CheesySerializer::message_thread()
+{
+
+}
--- /dev/null
+
+#include "SerialMessenger.h"
+#include "Messenger.h"
+#include "Dispatcher.h"
+
+#include "Semaphore.h"
+
+
+class CheesySerializer : public SerialMessenger {
+ // my state, whatever
+ Messenger *messenger; // this is how i communicate
+ Dispatcher *dispatcher; // this is where i send unsolicited messages
+
+ bool waiting_for_reply;
+ Message *reply;
+ Semaphore waiter;
+
+ public:
+ CheesySerializer(Messenger *msg, Dispatcher *me) {
+ this->messenger = msg;
+ this->dispatcher = me;
+ waiting_for_reply = false;
+ reply = 0;
+ }
+
+ // i receive my messages here
+ void dispatch(Message *m);
+
+ // my stuff
+ void start(); // start my thread
+ void message_thread();
+
+ void send(Message *m, msg_addr_t dest, int port=0, int fromport=0); // doesn't block
+ Message *sendrecv(Message *m, msg_addr_t dest, int port=0, int fromport=0); // blocks for matching reply
+};
+
using namespace std;
+#include "Semaphore.h"
+#include "Mutex.h"
+#include <pthread.h>
#include "include/config.h"
hash_map<int, Logger*> loggers;
LogType fakemsg_logtype;
+
+Semaphore sem;
+Semaphore shutdownsem;
+bool awake = false;
+bool shutdown = false;
+pthread_t thread_id;
+
+void *fakemessenger_thread(void *ptr)
+{
+ dout(1) << "thread start" << endl;
+
+ while (1) {
+ awake = false;
+ sem.Wait();
+
+ if (shutdown) break;
+ fakemessenger_do_loop();
+ }
+
+ dout(1) << "thread finish (i woke up but no messages, bye)" << endl;
+ shutdownsem.Post();
+}
+
+
+void fakemessenger_startthread() {
+ pthread_create(&thread_id, NULL, fakemessenger_thread, 0);
+}
+
+void fakemessenger_stopthread() {
+ shutdown = true;
+ sem.Post();
+ shutdownsem.Wait();
+}
+
+
+
+
// lame main looper
int fakemessenger_do_loop()
{
dout(1) << "do_loop begin." << endl;
+
while (1) {
bool didone = false;
break;
}
dout(1) << "do_loop end (no more messages)." << endl;
+ return 0;
}
cout << "no destination " << dest << endl;
assert(0);
}
+
+ // wake up loop?
+ if (!awake) {
+ dout(1) << "waking up fakemessenger" << endl;
+ awake = true;
+ sem.Post();
+ }
}
int FakeMessenger::wait_message(time_t seconds)
int whoami;
class Logger *logger;
-
+
public:
FakeMessenger(long me);
~FakeMessenger();
};
-
int fakemessenger_do_loop();
+void fakemessenger_startthread();
+void fakemessenger_stopthread();
+
+
#endif
--- /dev/null
+
+#ifndef _RWLock_Posix_
+#define _RWLock_Posix_
+
+#include <pthread.h>
+
+class RWLock
+{
+ mutable pthread_rwlock_t L;
+
+ public:
+
+ RWLock() {
+ pthread_rwlock_init(&L, NULL);
+ }
+
+ virtual ~RWLock() {
+ pthread_rwlock_unlock(&L);
+ pthread_rwlock_destroy(&L);
+ }
+
+ void unlock() {
+ pthread_rwlock_unlock(&L);
+ }
+ void get_read() {
+ pthread_rwlock_rdlock(&L);
+ }
+ void put_read() { unlock(); }
+ void get_write() {
+ pthread_rwlock_wrlock(&L);
+ }
+ void put_write() { unlock(); }
+};
+
+#endif // !_Mutex_Posix_
--- /dev/null
+/////////////////////////////////////////////////////////////////////
+// Written by Phillip Sitbon
+// Copyright 2003
+//
+// Posix/Semaphore.h
+// - Resource counting mechanism
+//
+/////////////////////////////////////////////////////////////////////
+
+#ifndef _Semaphore_Posix_
+#define _Semaphore_Posix_
+
+#include <semaphore.h>
+#include <errno.h>
+
+class Semaphore
+{
+ sem_t S;
+
+ public:
+ Semaphore( int init = 0 )
+ { sem_init(&S,0,init); }
+
+ virtual ~Semaphore()
+ { sem_destroy(&S); }
+
+ void Wait() const
+ { sem_wait((sem_t *)&S); }
+
+ int Wait_Try() const
+ { return (sem_trywait((sem_t *)&S)?errno:0); }
+
+ int Post() const
+ { return (sem_post((sem_t *)&S)?errno:0); }
+
+ int Value() const
+ { int V = -1; sem_getvalue((sem_t *)&S,&V); return V; }
+
+ void Reset( int init = 0 )
+ { sem_destroy(&S); sem_init(&S,0,init); }
+};
+
+#endif // !_Semaphore_Posix_
--- /dev/null
+#ifndef __SERIAL_MESSENGER_H
+#define __SERIAL_MESSENGER_H
+
+#include "Dispatcher.h"
+#include "Message.h"
+
+class SerialMessenger : public Dispatcher {
+ public:
+ virtual void dispatch(Message *m) = 0; // i receive my messages here
+ virtual void send(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; // doesn't block
+ virtual Message *sendrecv(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; // blocks for matching reply
+};
+
+#endif