From ebedbaf652eb4b190bc18e3c4a6e1a5f749a5e4b Mon Sep 17 00:00:00 2001 From: sage Date: Tue, 3 May 2005 05:55:14 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@192 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 7 +++-- ceph/client/Client.cc | 35 +++++++++++++------------ ceph/client/Client.h | 5 ++-- ceph/client/fuse.cc | 6 ++--- ceph/client/fuse.h | 9 +++++++ ceph/messages/MClientReply.h | 5 ++++ ceph/messages/MClientRequest.h | 11 +++++--- ceph/msg/CheesySerializer.cc | 47 ++++++++++++++++++---------------- ceph/msg/CheesySerializer.h | 21 ++++++++------- ceph/msg/FakeMessenger.h | 3 +++ ceph/msg/MPIMessenger.cc | 2 +- ceph/msg/Message.h | 6 ++--- ceph/msg/Messenger.h | 9 +++---- 13 files changed, 95 insertions(+), 71 deletions(-) create mode 100644 ceph/client/fuse.h diff --git a/ceph/Makefile b/ceph/Makefile index 4c0bcdea1113f..cb419aacd04ae 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -10,7 +10,7 @@ CC = g++ MPICC = mpicxx CFLAGS = -g -I. -pg -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE -LIBS = -lpthread +LIBS = -lpthread -lfuse MDS_OBJS= \ mds/MDBalancer.o\ @@ -57,12 +57,15 @@ msg/FakeMessenger.o ${COMMON_OBJS} fakemds: test/fakemds.cc msg/FakeMessenger.o fakeclient/FakeClient.o osd/OSD.o mds/mds.o ${COMMON_OBJS} ${CC} ${CFLAGS} ${LIBS} $^ -o $@ -mpitest: mpitest.cc msg/MPIMessenger.o mds/mds.o osd/OSD.o fakeclient/FakeClient.o ${COMMON_OBJS} +mpitest: test/mpitest.o msg/MPIMessenger.cc mds/mds.o osd/OSD.o fakeclient/FakeClient.o ${COMMON_OBJS} ${MPICC} ${CFLAGS} $^ -o $@ mttest: test/mttest.cc msg/MTMessenger.cc ${COMMON_OBJS} ${MPICC} ${CFLAGS} ${LIBS} $^ -o $@ +mpifuse: mpifuse.cc mds/mds.o client/Client.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS} + ${MPICC} ${CFLAGS} ${LIBS} $^ -o $@ + clean: rm -f *.o */*.o ${TARGETS} diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index b62983f0b3aa1..f715178efc02f 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -27,9 +27,8 @@ Client::Client(MDCluster *mdc, int id, Messenger *m) whoami = id; // set up messengers - raw_messenger = m; - serial_messenger = new CheesySerializer(raw_messenger, this); - raw_messenger->set_dispatcher(serial_messenger); + messenger = m; + messenger->set_dispatcher(this); all_files_closed = false; root = 0; @@ -40,14 +39,16 @@ Client::Client(MDCluster *mdc, int id, Messenger *m) Client::~Client() { - if (serial_messenger) { delete serial_messenger; serial_messenger = 0; } - if (raw_messenger) { delete raw_messenger; raw_messenger = 0; } + if (messenger) { delete messenger; messenger = 0; } } void Client::init() { - // incoming message go through serializer - //messenger->set_dispatcher(serial_messenger); + +} + +void Client::shutdown() { + } // ------------------- @@ -67,7 +68,7 @@ int Client::unlink(const char *path) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "unlink result is " << res << endl; @@ -86,7 +87,7 @@ int Client::rename(const char *from, const char *to) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "rename result is " << res << endl; @@ -107,7 +108,7 @@ int Client::mkdir(const char *path, mode_t mode) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "mkdir result is " << res << endl; @@ -125,7 +126,7 @@ int Client::rmdir(const char *path) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "rmdir result is " << res << endl; @@ -147,7 +148,7 @@ int Client::symlink(const char *target, const char *link) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "symlink result is " << res << endl; @@ -168,7 +169,7 @@ int Client::lstat(const char *path, struct stat *stbuf) req->set_caller_uid(getuid()); req->set_caller_gid(getgid()); - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); dout(10) << "lstat res is " << res << endl; if (res != 0) return res; @@ -206,7 +207,7 @@ int Client::chmod(const char *path, mode_t mode) req->set_caller_uid(getuid()); req->set_caller_gid(getgid()); - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "chmod result is " << res << endl; @@ -226,7 +227,7 @@ int Client::chown(const char *path, uid_t uid, gid_t gid) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "chown result is " << res << endl; @@ -246,7 +247,7 @@ int Client::utime(const char *path, struct utimbuf *buf) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); delete reply; dout(10) << "utime result is " << res << endl; @@ -273,7 +274,7 @@ int Client::getdir(const char *path, map& contents) //FIXME enforce caller uid rights? - MClientReply *reply = (MClientReply*)serial_messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); + MClientReply *reply = (MClientReply*)messenger->sendrecv(req, MSG_ADDR_MDS(0), MDS_PORT_SERVER); int res = reply->get_result(); // dir contents to caller! diff --git a/ceph/client/Client.h b/ceph/client/Client.h index d8af089675746..55ba971546e70 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -96,8 +96,7 @@ typedef int (*dirfillerfunc_t) (void *handle, const char *name, int type, inoden class Client : public Dispatcher { protected: MDCluster *mdcluster; - Messenger *raw_messenger; - SerialMessenger *serial_messenger; + Messenger *messenger; int whoami; bool all_files_closed; @@ -193,10 +192,12 @@ class Client : public Dispatcher { ~Client(); void init(); + void shutdown(); // messaging void dispatch(Message *m) { cout << "dispatch not implemented" << endl; + assert(0); } diff --git a/ceph/client/fuse.cc b/ceph/client/fuse.cc index 9f92fb0e39a93..153e73d9766f4 100644 --- a/ceph/client/fuse.cc +++ b/ceph/client/fuse.cc @@ -213,12 +213,10 @@ static struct fuse_operations ceph_oper = { fsync: ceph_fsync }; -int main(int argc, char *argv[]) +int ceph_fuse_main(Client *c, int argc, char *argv[]) { // init client - - // ** - + client = c; // set up fuse argc/argv int newargc = 0; diff --git a/ceph/client/fuse.h b/ceph/client/fuse.h new file mode 100644 index 0000000000000..6de309f2d2715 --- /dev/null +++ b/ceph/client/fuse.h @@ -0,0 +1,9 @@ + +/* ceph_fuse_main + * - start up fuse glue, attached to Client* cl. + * - argc, argv should include a mount point, and + * any weird fuse options you want. by default, + * we will put fuse in the foreground so that it + * won't fork and we can see stdout. + */ +int ceph_fuse_main(Client *cl, int argc, char *argv[]); diff --git a/ceph/messages/MClientReply.h b/ceph/messages/MClientReply.h index 324f88ba877cd..4bb9019cb1252 100644 --- a/ceph/messages/MClientReply.h +++ b/ceph/messages/MClientReply.h @@ -38,6 +38,7 @@ typedef struct { } c_inode_info; typedef struct { + long pcid; long tid; int op; int result; // error code @@ -54,6 +55,9 @@ class MClientReply : public Message { vector dir_contents; public: + void set_pcid(long pcid) { this->st.pcid = pcid; } + long get_pcid() { return st.pcid; } + long get_tid() { return st.tid; } int get_op() { return st.op; } inodeno_t get_ino() { return trace[trace.size()-1]->inode.ino; } @@ -67,6 +71,7 @@ class MClientReply : public Message { MClientReply() {}; MClientReply(MClientRequest *req, int result = 0) : Message(MSG_CLIENT_REPLY) { + this->st.pcid = req->get_pcid(); // match up procedure call id!!! this->st.tid = req->get_tid(); this->st.op = req->get_op(); this->path = req->get_path(); diff --git a/ceph/messages/MClientRequest.h b/ceph/messages/MClientRequest.h index 91477133f06eb..8870052d960a7 100644 --- a/ceph/messages/MClientRequest.h +++ b/ceph/messages/MClientRequest.h @@ -13,6 +13,7 @@ * can be forwarded around between MDS's. * * int client - the originating client + * long pcid - procedure call id, used to match request+response. * long tid - transaction id, unique among requests for that client. probably just a counter! * -> the MDS passes the Request to the Reply constructor, so this always matches. * @@ -33,6 +34,7 @@ typedef struct { + long pcid; // procedure call id, to match request+response long tid; int client; int op; @@ -61,10 +63,12 @@ class MClientRequest : public Message { } virtual char *get_type_name() { return "creq"; } - void set_tid(long tid) { - this->st.tid = tid; - } + // keep a pcid (procedure call id) to match up request+reply + void set_pcid(long pcid) { this->st.pcid = pcid; } + long get_pcid() { return st.pcid; } + // normal fields + void set_tid(long t) { st.tid = t; } void set_path(string& p) { path.set_path(p); } void set_path(const char *p) { path.set_path(p); } void set_caller_uid(int u) { st.caller_uid = u; } @@ -118,6 +122,7 @@ inline ostream& operator<<(ostream& out, MClientRequest& req) { out << &req << " "; out << "client" << req.get_client() << "." << req.get_tid() + << ".pcid=" << req.get_pcid() << ":"; switch(req.get_op()) { case MDS_OP_STAT: diff --git a/ceph/msg/CheesySerializer.cc b/ceph/msg/CheesySerializer.cc index 6d01c8db6567a..cffd08ad9ba3d 100644 --- a/ceph/msg/CheesySerializer.cc +++ b/ceph/msg/CheesySerializer.cc @@ -17,22 +17,22 @@ using namespace std; void CheesySerializer::dispatch(Message *m) { - long tid = m->get_tid(); + long pcid = m->get_pcid(); lock.Lock(); // was i expecting it? - if (call_sem.count(tid)) { + if (call_sem.count(pcid)) { // yes, this is a reply to a pending call. - dout(DEBUGLVL) << "dispatch got reply for " << tid << " " << m << endl; - call_reply[tid] = m; // set reply - int r = call_sem[tid]->Post(); + dout(DEBUGLVL) << "dispatch got reply for " << pcid << " " << m << endl; + call_reply[pcid] = m; // set reply + int r = call_sem[pcid]->Post(); //cout << "post = " << r << endl; lock.Unlock(); } else { // no, this is an unsolicited message. lock.Unlock(); - dout(DEBUGLVL) << "dispatch got unsolicited message" << m << endl; + dout(DEBUGLVL) << "dispatch got unsolicited message pcid " << pcid << " m " << m << endl; dispatcher->dispatch(m); } } @@ -41,55 +41,58 @@ void CheesySerializer::dispatch(Message *m) // --------- // outgoing messages -void CheesySerializer::send(Message *m, msg_addr_t dest, int port, int fromport) +int CheesySerializer::send_message(Message *m, msg_addr_t dest, int port, int fromport) { // just pass it on to the messenger dout(DEBUGLVL) << "send " << m << endl; + m->set_pcid(0); messenger->send_message(m, dest, port, fromport); } -Message *CheesySerializer::sendrecv(Message *m, msg_addr_t dest, int port, int fromport) +Message *CheesySerializer::sendrecv(Message *m, msg_addr_t dest, int port) { + int fromport = 0; + static Semaphore stsem; Semaphore *sem = &stsem;//new Semaphore(); - // make up a transaction number that is unique (to me!) - /* NOTE: since request+replies are matched up on tid's alone, it means that + // make up a pcid that is unique (to me!) + /* NOTE: since request+replies are matched up on pcid's alone, it means that two nodes using this mechanism can't do calls of each other or else their - tid's might overlap. + pcid's might overlap. This should be fine. only the Client uses this so far (not MDS). If OSDs want to use this, though, this must be made smarter!!! */ - long tid = ++last_tid; - m->set_tid(tid); + long pcid = ++last_pcid; + m->set_pcid(pcid); - dout(DEBUGLVL) << "sendrecv sending " << m << " on tid " << tid << endl; + dout(DEBUGLVL) << "sendrecv sending " << m << " on pcid " << pcid << endl; // add call records lock.Lock(); - assert(call_sem.count(tid) == 0); // tid should be UNIQUE - call_sem[tid] = sem; - call_reply[tid] = 0; // no reply yet + assert(call_sem.count(pcid) == 0); // pcid should be UNIQUE + call_sem[pcid] = sem; + call_reply[pcid] = 0; // no reply yet lock.Unlock(); // send messenger->send_message(m, dest, port, fromport); // wait - dout(DEBUGLVL) << "sendrecv waiting for reply on tid " << tid << endl; + dout(DEBUGLVL) << "sendrecv waiting for reply on pcid " << pcid << endl; //cout << "wait start, value = " << sem->Value() << endl; sem->Wait(); // pick up reply lock.Lock(); - Message *reply = call_reply[tid]; + Message *reply = call_reply[pcid]; assert(reply); - call_reply.erase(tid); // remove from call map - call_sem.erase(tid); + call_reply.erase(pcid); // remove from call map + call_sem.erase(pcid); lock.Unlock(); - dout(DEBUGLVL) << "sendrecv got reply " << reply << " on tid " << tid << endl; + dout(DEBUGLVL) << "sendrecv got reply " << reply << " on pcid " << pcid << endl; //delete sem; return reply; diff --git a/ceph/msg/CheesySerializer.h b/ceph/msg/CheesySerializer.h index 7e69bcd71333c..9a9320995721d 100644 --- a/ceph/msg/CheesySerializer.h +++ b/ceph/msg/CheesySerializer.h @@ -4,7 +4,7 @@ #include "Dispatcher.h" #include "Message.h" -#include "SerialMessenger.h" +#include "Messenger.h" #include "Semaphore.h" #include "Mutex.h" @@ -12,33 +12,32 @@ #include using namespace std; -class CheesySerializer : public SerialMessenger { +class CheesySerializer : public Messenger, + public Dispatcher { protected: - long last_tid; + long last_pcid; - // my state, whatever Messenger *messenger; // this is how i communicate - Dispatcher *dispatcher; // this is where i send unsolicited messages Mutex lock; // protect call_sem, call_reply map call_sem; map call_reply; public: - CheesySerializer(Messenger *msg, Dispatcher *me) { + CheesySerializer(Messenger *msg) { this->messenger = msg; - this->dispatcher = me; - last_tid = 1; + last_pcid = 1; } + int shutdown() {} // incoming messages void dispatch(Message *m); // outgoing messages - void send(Message *m, msg_addr_t dest, - int port=0, int fromport=0); // doesn't block + int send_message(Message *m, msg_addr_t dest, + int port=0, int fromport=0); // doesn't block Message *sendrecv(Message *m, msg_addr_t dest, - int port=0, int fromport=0); // blocks for matching reply + int port=0); // blocks for matching reply }; #endif diff --git a/ceph/msg/FakeMessenger.h b/ceph/msg/FakeMessenger.h index bb53622d7afcd..e455b98ec5232 100644 --- a/ceph/msg/FakeMessenger.h +++ b/ceph/msg/FakeMessenger.h @@ -22,6 +22,9 @@ class FakeMessenger : public Messenger { // msg interface virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0); + + // use CheesySerializer for now! + virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); }; }; int fakemessenger_do_loop(); diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index fe106e352c726..6cc7f91d62919 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -244,7 +244,7 @@ void mpimessenger_wait() * MPIMessenger implementation */ -MPIMessenger::MPIMessenger(long myaddr)// : Messenger() +MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger() { // my address this->myaddr = myaddr; diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index c37fc2a7a738d..f5be284523a0c 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -125,9 +125,9 @@ class Message { virtual ~Message() {} - // for rpc-type procedural messages - virtual long get_tid() { return 0; } - virtual void set_tid(long t) { assert(0); } // overload me + // for rpc-type procedural messages (pcid = procedure call id) + virtual long get_pcid() { return 0; } + virtual void set_pcid(long t) { assert(0); } // overload me // ENVELOPE ---- diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index 96c60bb6f026f..6ebefdfda4ce5 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -16,14 +16,11 @@ class Messenger { list incoming; // incoming queue public: - /*Messenger() : dispatcher(0) { } - virtual ~Messenger() { - remove_dispatcher(); - } - */ + Messenger() : dispatcher(0) { } // administrative void set_dispatcher(Dispatcher *d) { dispatcher = d; } + virtual int shutdown() = 0; // -- message interface @@ -31,7 +28,7 @@ class Messenger { virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; // make a procedure call - virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { }; + virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) = 0; // wait (block) for a message, or timeout. Don't return message yet. //virtual int wait_message(time_t seconds) = 0; -- 2.39.5