From b247412b6f60315a93664df49d15c2e0d12b9afc Mon Sep 17 00:00:00 2001 From: sage Date: Mon, 2 Aug 2004 20:52:52 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@60 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 4 +- ceph/client/Client.cc | 9 ++++ ceph/client/Client.h | 2 + ceph/config.h | 8 +-- ceph/mds/CDir.cc | 22 ++++---- ceph/mds/CInode.h | 13 +++-- ceph/mds/MDBalancer.cc | 2 +- ceph/mds/MDCache.cc | 2 + ceph/mds/MDS.cc | 18 +++++++ ceph/mds/MDS.h | 2 + ceph/mds/events/EInodeUpdate.h | 13 +++-- ceph/msg/MPIMessenger.cc | 99 ++++++++++++++++++++++++---------- ceph/msg/MPIMessenger.h | 1 + ceph/msg/Message.h | 3 ++ ceph/msg/Messenger.cc | 1 + ceph/msg/Messenger.h | 2 + ceph/osd/OSD.cc | 7 +-- ceph/test/fakemds.cc | 5 +- ceph/test/mpitest.cc | 2 +- 19 files changed, 155 insertions(+), 60 deletions(-) diff --git a/ceph/Makefile b/ceph/Makefile index 03f66bb176b0d..62b402f3a825f 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -1,5 +1,5 @@ - CC=g++ +#CC=distcc g++ #CC=mpiCC CFLAGS=-D __gnu_cxx=std -g -I. #-I/usr/lib/mpi/include -L/usr/lib/mpi/lib LIBS= @@ -15,7 +15,7 @@ LEAKTRACER=$(HOME)/lib/LeakTracer.o SRCS=*.cc */*.cc OBJS=osd/OSD.o Messenger.o Logger.o mds/MDBalancer.o mds/MDS.o mds/CDentry.o mds/CDir.o mds/CInode.o mds/MDCache.o mds/MDStore.o clock.o mds/LogStream.o mds/MDLog.o client/Client.o mds/MDCluster.o -TARGETS=test import +TARGETS=test import mpitest all: depend ${TARGETS} diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 428f16900473f..fcb9c8a33b157 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -72,6 +72,9 @@ void Client::dispatch(Message *m) if (tid < max_requests) issue_request(); + else { + done(); + } break; default: @@ -82,6 +85,12 @@ void Client::dispatch(Message *m) } +void Client::done() { + dout(1) << "done, sending msg to mds0" << endl; + messenger->send_message(new Message(MSG_CLIENT_DONE), + MSG_ADDR_MDS(0), MDS_PORT_MAIN, 0); +} + void Client::assim_reply(MClientReply *r) { ClNode *cur = root; diff --git a/ceph/client/Client.h b/ceph/client/Client.h index 7a11d0dfd9ff9..183530fb986d4 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -34,6 +34,8 @@ class Client : public Dispatcher { int init(); int shutdown(); + void done(); + virtual void dispatch(Message *m); virtual void assim_reply(MClientReply*); diff --git a/ceph/config.h b/ceph/config.h index 46b586d4214d2..215021d9986c8 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -1,6 +1,6 @@ // random crap -#define NUMMDS 10 +#define NUMMDS 4 #define NUMOSD 10 #define CLIENT_CACHE 100 @@ -15,10 +15,10 @@ #define FAKE_CLOCK -#define NUMCLIENT 800 -#define CLIENT_REQUESTS 250 +#define NUMCLIENT 80 +#define CLIENT_REQUESTS 50 -#define DEBUG_LEVEL 10 +#define DEBUG_LEVEL 1 #define MDS_CACHE_SIZE 25000 #define MDS_CACHE_MIDPOINT .8 diff --git a/ceph/mds/CDir.cc b/ceph/mds/CDir.cc index 26bd2acf8b369..457ec3cbc2eee 100644 --- a/ceph/mds/CDir.cc +++ b/ceph/mds/CDir.cc @@ -10,6 +10,10 @@ #include +#include "include/config.h" +#define dout(x) if (x <= DEBUG_LEVEL) cout << "cdir:" + + // CDir void CDir::hit() @@ -258,7 +262,7 @@ void CDir::freeze(Context *c) assert((state_test(CDIR_STATE_FROZEN|CDIR_STATE_FREEZING)) == 0); if (hard_pinned + nested_hard_pinned == 0) { - cout << "freeze " << *inode << endl; + dout(10) << "freeze " << *inode << endl; state_set(CDIR_STATE_FROZEN); inode->hard_pin(); // hard_pin for duration of freeze @@ -269,7 +273,7 @@ void CDir::freeze(Context *c) } else { state_set(CDIR_STATE_FREEZING); - cout << "freeze + wait " << *inode << endl; + dout(10) << "freeze + wait " << *inode << endl; // need to wait for pins to expire waiting_to_freeze.push_back(c); } @@ -277,7 +281,7 @@ void CDir::freeze(Context *c) void CDir::freeze_finish() { - cout << "freeze_finish " << *inode << endl; + dout(10) << "freeze_finish " << *inode << endl; inode->hard_pin(); // hard_pin for duration of freeze @@ -295,7 +299,7 @@ void CDir::freeze_finish() void CDir::unfreeze() // thaw? { - cout << "unfreeze " << *inode << endl; + dout(10) << "unfreeze " << *inode << endl; state_clear(CDIR_STATE_FROZEN); inode->hard_unpin(); @@ -323,15 +327,15 @@ void CDir::dump(int depth) { CDentry* d = iter->second; char isdir = ' '; if (d->inode->dir != NULL) isdir = '/'; - cout << ind << d->inode->inode.ino << " " << d->name << isdir << endl; + dout(10) << ind << d->inode->inode.ino << " " << d->name << isdir << endl; d->inode->dump(depth+1); iter++; } if (!(state_test(CDIR_STATE_COMPLETE))) - cout << ind << "..." << endl; + dout(10) << ind << "..." << endl; if (state_test(CDIR_STATE_DIRTY)) - cout << ind << "[dirty]" << endl; + dout(10) << ind << "[dirty]" << endl; } @@ -342,12 +346,12 @@ void CDir::dump_to_disk(MDS *mds) while (iter != items.end()) { CDentry* d = iter->second; if (d->inode->dir != NULL) { - cout << "dump2disk: " << d->inode->inode.ino << " " << d->name << '/' << endl; + dout(10) << "dump2disk: " << d->inode->inode.ino << " " << d->name << '/' << endl; d->inode->dump_to_disk(mds); } iter++; } - cout << "dump2disk: writing dir " << inode->inode.ino << endl; + dout(10) << "dump2disk: writing dir " << inode->inode.ino << endl; mds->mdstore->commit_dir(inode, NULL); } diff --git a/ceph/mds/CInode.h b/ceph/mds/CInode.h index 00ad828d03bf0..8e5d238e29251 100644 --- a/ceph/mds/CInode.h +++ b/ceph/mds/CInode.h @@ -2,6 +2,7 @@ #ifndef __CINODE_H #define __CINODE_H +#include "include/config.h" #include "include/types.h" #include "include/lru.h" #include "include/DecayCounter.h" @@ -230,7 +231,8 @@ class CInode : LRUObject { // --- reference counting void put(int by) { if (ref == 0 || ref_set.count(by) != 1) { - cout << " bad put " << *this << " by " << by << " was " << ref << " (" << ref_set << ")" << endl; + if (DEBUG_LEVEL > 7) + cout << " bad put " << *this << " by " << by << " was " << ref << " (" << ref_set << ")" << endl; assert(ref_set.count(by) == 1); assert(ref > 0); } @@ -238,18 +240,21 @@ class CInode : LRUObject { ref_set.erase(by); if (ref == 0) lru_unpin(); - cout << " put " << *this << " by " << by << " now " << ref << " (" << ref_set << ")" << endl; + if (DEBUG_LEVEL > 7) + cout << " put " << *this << " by " << by << " now " << ref << " (" << ref_set << ")" << endl; } void get(int by) { if (ref == 0) lru_pin(); if (ref_set.count(by)) { - cout << " bad get " << *this << " by " << by << " was " << ref << " (" << ref_set << ")" << endl; + if (DEBUG_LEVEL > 7) + cout << " bad get " << *this << " by " << by << " was " << ref << " (" << ref_set << ")" << endl; assert(ref_set.count(by) == 0); } ref++; ref_set.insert(by); - cout << " get " << *this << " by " << by << " now " << ref << " (" << ref_set << ")" << endl; + if (DEBUG_LEVEL > 7) + cout << " get " << *this << " by " << by << " now " << ref << " (" << ref_set << ")" << endl; } bool is_pinned_by(int by) { return ref_set.count(by); diff --git a/ceph/mds/MDBalancer.cc b/ceph/mds/MDBalancer.cc index b3aaeb04a047d..5fa3312974269 100644 --- a/ceph/mds/MDBalancer.cc +++ b/ceph/mds/MDBalancer.cc @@ -151,7 +151,7 @@ void MDBalancer::do_rebalance() double my_load = mds_load[whoami].root_pop; mds_load_t target_load = total_load / (double)cluster_size; - cout << " target load " << target_load << endl; + dout(5) << " target load " << target_load << endl; if (my_load < target_load.root_pop) { dout(5) << " i am underloaded, doing nothing." << endl; diff --git a/ceph/mds/MDCache.cc b/ceph/mds/MDCache.cc index 6665ede30bdfe..079ee038277f5 100644 --- a/ceph/mds/MDCache.cc +++ b/ceph/mds/MDCache.cc @@ -270,6 +270,8 @@ bool MDCache::shutdown_pass() dout(7) << "done, sending shutdown_finish" << endl; mds->messenger->send_message(new Message(MSG_MDS_SHUTDOWNFINISH), MSG_ADDR_MDS(0), MDS_PORT_MAIN, MDS_PORT_MAIN); + } else { + mds->handle_shutdown_finish(new Message(MSG_MDS_SHUTDOWNFINISH)); } return true; } else { diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index a3bf2c83c8364..cce36e89998c6 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -151,6 +151,7 @@ void MDS::handle_shutdown_finish(Message *m) if (did_shut_down.size() == mdcluster->get_num_mds()) { shutting_down = false; + messenger->done(); } // done @@ -198,6 +199,9 @@ void MDS::proc_message(Message *m) handle_ping((MPing*)m); break; + case MSG_CLIENT_DONE: + handle_client_done(m); + break; // MDS case MSG_MDS_SHUTDOWNSTART: @@ -280,6 +284,20 @@ void MDS::dispatch(Message *m) } +void MDS::handle_client_done(Message *m) +{ + int n = MSG_ADDR_NUM(m->get_source()); + dout(3) << "client" << n << " done" << endl; + done_clients.insert(n); + if (done_clients.size() == NUMCLIENT) { + dout(3) << "all clients done, initiating shutdown" << endl; + shutdown_start(); + } + + delete m; // done +} + + void MDS::handle_ping(MPing *m) { dout(10) << " received ping from " << MSG_ADDR_NICE(m->get_source()) << " with ttl " << m->ttl << endl; diff --git a/ceph/mds/MDS.h b/ceph/mds/MDS.h index 25350b978db91..f81b9378308a0 100644 --- a/ceph/mds/MDS.h +++ b/ceph/mds/MDS.h @@ -95,6 +95,7 @@ class MDS : public Dispatcher { DecayCounter stat_read; DecayCounter stat_write; + set done_clients; public: // sub systems @@ -139,6 +140,7 @@ class MDS : public Dispatcher { void handle_ping(class MPing *m); + void handle_client_done(Message *m); void handle_shutdown_start(Message *m); void handle_shutdown_finish(Message *m); diff --git a/ceph/mds/events/EInodeUpdate.h b/ceph/mds/events/EInodeUpdate.h index c645a9078a7a9..94d12c7f1dc63 100644 --- a/ceph/mds/events/EInodeUpdate.h +++ b/ceph/mds/events/EInodeUpdate.h @@ -2,6 +2,7 @@ #define __EINODEUPDATE_H #include +#include "include/config.h" #include "include/types.h" #include "../LogEvent.h" #include "../CInode.h" @@ -31,7 +32,8 @@ class C_EIU_VerifyInodeUpdate : public Context { if (in->authority(mds->get_cluster()) == mds->get_nodeid() && // mine in->is_dirty() && // dirty in->get_version() == version) { // same version that i have to deal with - cout << "ARGH, did EInodeUpdate commit but inode " << *in << " is still dirty" << endl; + if (DEBUG_LEVEL > 7) + cout << "ARGH, did EInodeUpdate commit but inode " << *in << " is still dirty" << endl; // damnit mds->mdstore->commit_dir(in->get_parent_inode(), new C_EIU_VerifyInodeUpdate(mds, @@ -78,7 +80,8 @@ class EInodeUpdate : public LogEvent { CInode *in = mds->mdcache->get_inode(inode.ino); //assert(in); if (!in) { - cout << "inode " << inode.ino << " not in cache, must have exported" << endl; + if (DEBUG_LEVEL > 7) + cout << "inode " << inode.ino << " not in cache, must have exported" << endl; return true; } if (in->authority(mds->get_cluster()) != mds->get_nodeid()) @@ -96,7 +99,8 @@ class EInodeUpdate : public LogEvent { if (parent) { // okay! - cout << "commiting containing dir for " << *in << ", which is " << *parent << endl; + if (DEBUG_LEVEL > 7) + cout << "commiting containing dir for " << *in << ", which is " << *parent << endl; mds->mdstore->commit_dir(parent, new C_EIU_VerifyInodeUpdate(mds, in->ino(), @@ -104,7 +108,8 @@ class EInodeUpdate : public LogEvent { c)); } else { // oh, i'm the root inode - cout << "don't know how to commit the root inode" << endl; + if (DEBUG_LEVEL > 7) + cout << "don't know how to commit the root inode" << endl; if (c) { c->finish(0); delete c; diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index 8959e1b625acf..5f0f226784489 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -20,23 +20,29 @@ hash_map directory; hash_map incoming; -#define dout(l) if (l<=DEBUG_LEVEL) cout << "mpi " -#define dout2(l) if (1<=DEBUG_LEVEL) cout - - int mpi_world_size; int mpi_rank; +bool mpi_done = false; + + +#define dout(l) if (l<=DEBUG_LEVEL) cout << "[MPI " << mpi_rank << "/" << mpi_world_size << "] " +#define dout2(l) if (1<=DEBUG_LEVEL) cout + + int mpimessenger_init(int& argc, char**& argv) { - dout(1) << "MPI_Init" << endl; //MPI::Init(argc, argv); MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &mpi_world_size); MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); - dout(1) << "i am " << mpi_rank << " of " << mpi_world_size << endl; + char hostname[100]; + gethostname(hostname,100); + int pid = getpid(); + + dout(1) << "init: i am " << hostname << " pid " << pid << endl; assert(mpi_world_size > NUMOSD+NUMMDS); @@ -51,14 +57,14 @@ int mpimessenger_world() int mpimessenger_loop() { - while (1) { + while (!mpi_done) { // check mpi - dout(1) << "waiting for message" << endl; + dout(12) << "waiting for message" << endl; // get size //MPI::Status status; MPI_Status status; - int msize; + int msize = 0; //MPI::COMM_WORLD.Recv(&msize, MPI_Recv(&msize, 1, @@ -68,9 +74,15 @@ int mpimessenger_loop() MPI_COMM_WORLD, &status); // receives greeting from each process + if (msize == -1) { + dout(1) << "got -1 terminate signal" << endl; + mpi_done = true; + break; + } + int tag = status.MPI_TAG; int source = status.MPI_SOURCE; - dout(1) << "incoming size " << msize << " tag " << tag << " from rank " << source << endl; + dout(12) << "incoming size " << msize << " tag " << tag << " from rank " << source << endl; // get message char *buf = new char[msize]; @@ -103,6 +115,9 @@ int mpimessenger_loop() break; } } + + dout(1) << "waiting for all to finish" << endl; + MPI_Barrier (MPI_COMM_WORLD); } int mpimessenger_shutdown() @@ -112,6 +127,26 @@ int mpimessenger_shutdown() } +void MPIMessenger::done() +{ + dout(1) << "done()" << endl; + mpi_done = true; + + // tell everyone + for (int i=0; iset_source(whoami, fromport); + m->set_dest(dest, port); + + // send! int trank = MPI_DEST_TO_RANK(dest,mpi_world_size); crope r = m->get_serialized(); int size = r.length(); @@ -156,27 +196,28 @@ int MPIMessenger::send_message(Message *m, long dest, int port, int fromport) // no implemented assert(0); - } else { - dout(3) << "sending message via MPI for (tag) " << dest << " to rank " << trank << " size " << size << endl; + } - //MPI::COMM_WORLD.Send(&r, - MPI_Send(&r, - 1, - MPI_INT, - trank, - dest, - MPI_COMM_WORLD); - - char *buf = (char*)r.c_str(); - //MPI::COMM_WORLD.Send(buf, - MPI_Send(buf, - size, - MPI_CHAR, - trank, - dest, - MPI_COMM_WORLD); - } + dout(10) << "sending message via MPI for (tag) " << dest << " to rank " << trank << " size " << size << endl; + + //MPI::COMM_WORLD.Send(&r, + MPI_Send(&size, + 1, + MPI_INT, + trank, + dest, + MPI_COMM_WORLD); + + char *buf = (char*)r.c_str(); + //MPI::COMM_WORLD.Send(buf, + MPI_Send(buf, + size, + MPI_CHAR, + trank, + dest, + MPI_COMM_WORLD); } + int MPIMessenger::wait_message(time_t seconds) { } diff --git a/ceph/msg/MPIMessenger.h b/ceph/msg/MPIMessenger.h index 439ca82553f06..33edb32c06967 100644 --- a/ceph/msg/MPIMessenger.h +++ b/ceph/msg/MPIMessenger.h @@ -18,6 +18,7 @@ class MPIMessenger : public Messenger { virtual int shutdown(); virtual int send_message(Message *m, long dest, int port=0, int fromport=0); virtual int wait_message(time_t seconds); + virtual void done(); virtual int loop(); }; diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index e532f7917d047..f76301730133a 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -4,6 +4,8 @@ #define MSG_PING 1 +#define MSG_FINISH 0 + #define MSG_OSD_READ 10 #define MSG_OSD_READREPLY 11 #define MSG_OSD_WRITE 12 @@ -11,6 +13,7 @@ #define MSG_CLIENT_REQUEST 20 #define MSG_CLIENT_REPLY 21 +#define MSG_CLIENT_DONE 22 #define MSG_MDS_HEARTBEAT 100 #define MSG_MDS_DISCOVER 110 diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index c8507c978a89e..319a2d8bd6e12 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -123,6 +123,7 @@ decode_message(crope& ser) case MSG_MDS_SHUTDOWNSTART: case MSG_MDS_SHUTDOWNFINISH: + case MSG_CLIENT_DONE: m = new Message(type); break; diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index 4681473435b6c..bac42fa8f3ecb 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -37,6 +37,7 @@ class Messenger { virtual int shutdown() = 0; virtual int send_message(Message *m, long dest, int port=0, int fromport=0) = 0; virtual int wait_message(time_t seconds) = 0; + virtual void done() {} virtual int loop() { while (1) { @@ -75,6 +76,7 @@ class Messenger { + }; diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 6c1dd97219b94..f7629983179e4 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -130,7 +130,7 @@ void OSD::read(MOSDRead *r) char *buf = new char[len]; long got = ::read(fd, buf, len); - dout(10) << "osd_read " << got << " / " << len << " bytes to " << f << endl; + dout(10) << "osd_read " << got << " / " << len << " bytes from " << f << endl; // close flock(fd, LOCK_UN); @@ -177,7 +177,7 @@ void OSD::write(MOSDWrite *m) fchmod(fd, 0664); - dout(10) << "osd_write " << m->get_len() << " bytes to " << f << endl; + dout(10) << "osd_write " << m->get_len() << " bytes at offset " << m->get_offset() << " to " << f << endl; if (m->get_offset()) lseek(fd, m->get_offset(), SEEK_SET); @@ -191,7 +191,8 @@ void OSD::write(MOSDWrite *m) if (m->get_offset() == 0) { char *n = get_filename(whoami, m->get_oid()); - cout << f << " to " << n << " rename sez " << rename(f, n) << endl; + int r = rename(f,n); + dout(11) << f << " to " << n << " rename sez " << r << endl; } // clean up diff --git a/ceph/test/fakemds.cc b/ceph/test/fakemds.cc index 6fe9615a4c430..522104a83041b 100644 --- a/ceph/test/fakemds.cc +++ b/ceph/test/fakemds.cc @@ -61,9 +61,8 @@ int main(int argc, char **argv) { // loop fakemessenger_do_loop(); - mds[0]->shutdown_start(); - - fakemessenger_do_loop(); + //mds[0]->shutdown_start(); + //fakemessenger_do_loop(); // cout << "---- check ----" << endl; diff --git a/ceph/test/mpitest.cc b/ceph/test/mpitest.cc index ee7d4e980817e..f4a23a309d30e 100644 --- a/ceph/test/mpitest.cc +++ b/ceph/test/mpitest.cc @@ -28,7 +28,7 @@ __uint64_t ino = 1; int play(); int main(int argc, char **argv) { - cout << "hi there" << endl; + cout << "mpitest starting" << endl; int myrank = mpimessenger_init(argc, argv); int world = mpimessenger_world(); -- 2.39.5