From 98b2658de13659ae84ad1c859db15b0623b56286 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 24 Jan 2007 03:05:55 +0000 Subject: [PATCH] NewerMessenger replaced with SimpleMessenger, which is much... simpler. new build targets for standalone monitor and mds: cmon and cmds. it's now possible to start up a distributed fs without mpi and newsyn. some cleanup in other messenger code. client boot process modified to contact monitor explicitly for client id. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1029 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/Makefile | 14 +- trunk/ceph/README | 55 +- trunk/ceph/cfuse.cc | 8 +- trunk/ceph/client/Client.cc | 84 +- trunk/ceph/client/Client.h | 2 +- trunk/ceph/cmds.cc | 96 +++ trunk/ceph/cmon.cc | 100 +++ trunk/ceph/common/Logger.cc | 4 +- trunk/ceph/cosd.cc | 57 +- trunk/ceph/fakesyn.cc | 3 +- trunk/ceph/include/types.h | 8 + trunk/ceph/mds/AnchorTable.cc | 15 +- trunk/ceph/mds/AnchorTable.h | 7 +- trunk/ceph/mds/IdAllocator.cc | 12 + trunk/ceph/mds/IdAllocator.h | 5 +- trunk/ceph/mds/Locker.cc | 6 +- trunk/ceph/mds/MDBalancer.cc | 9 +- trunk/ceph/mds/MDCache.cc | 42 +- trunk/ceph/mds/MDS.cc | 110 +-- trunk/ceph/mds/MDS.h | 3 +- trunk/ceph/mds/MDSMap.h | 10 + trunk/ceph/mds/MDStore.cc | 4 +- trunk/ceph/mds/Migrator.cc | 66 +- trunk/ceph/mds/Renamer.cc | 8 +- trunk/ceph/mds/Server.cc | 58 +- trunk/ceph/mds/Server.h | 3 +- trunk/ceph/messages/MClientBoot.h | 34 + trunk/ceph/messages/MClientMount.h | 16 - trunk/ceph/messages/MOSDOp.h | 2 +- trunk/ceph/mon/ClientMonitor.cc | 107 +++ trunk/ceph/mon/ClientMonitor.h | 52 ++ trunk/ceph/mon/Elector.cc | 13 +- trunk/ceph/mon/MDSMonitor.cc | 15 + trunk/ceph/mon/MDSMonitor.h | 5 + trunk/ceph/mon/Monitor.cc | 8 + trunk/ceph/mon/Monitor.h | 6 +- trunk/ceph/msg/FakeMessenger.cc | 66 +- trunk/ceph/msg/FakeMessenger.h | 6 +- trunk/ceph/msg/Message.cc | 4 + trunk/ceph/msg/Message.h | 98 +-- trunk/ceph/msg/Messenger.h | 16 +- trunk/ceph/msg/SimpleMessenger.cc | 1210 
++++++++++++++++++++++++++++ trunk/ceph/msg/SimpleMessenger.h | 292 +++++++ trunk/ceph/newsyn.cc | 32 +- trunk/ceph/osd/OSD.cc | 6 +- trunk/ceph/osd/PG.h | 2 +- 46 files changed, 2354 insertions(+), 425 deletions(-) create mode 100644 trunk/ceph/cmds.cc create mode 100644 trunk/ceph/cmon.cc create mode 100644 trunk/ceph/messages/MClientBoot.h create mode 100644 trunk/ceph/mon/ClientMonitor.cc create mode 100644 trunk/ceph/mon/ClientMonitor.h create mode 100644 trunk/ceph/msg/SimpleMessenger.cc create mode 100644 trunk/ceph/msg/SimpleMessenger.h diff --git a/trunk/ceph/Makefile b/trunk/ceph/Makefile index 1681ac16698a8..c35adbb514b9c 100644 --- a/trunk/ceph/Makefile +++ b/trunk/ceph/Makefile @@ -63,12 +63,12 @@ MON_OBJS= \ mon/Monitor.o\ mon/OSDMonitor.o\ mon/MDSMonitor.o\ + mon/ClientMonitor.o\ mon/Elector.o COMMON_OBJS= \ msg/Messenger.o\ msg/Message.o\ - msg/HostMonitor.o\ common/Logger.o\ common/Clock.o\ common/Timer.o\ @@ -85,7 +85,7 @@ TCP_OBJS = \ msg/TCPMessenger.o\ msg/TCPDirectory.o -TARGETS = cosd cfuse newsyn fakesyn +TARGETS = cmon cosd cmds cfuse newsyn fakesyn SRCS=*.cc */*.cc *.h */*.h */*/*.h @@ -97,16 +97,16 @@ obfs: depend obfstest # real bits -cmon: cmon.cc mon.o ebofs.o msg/NewerMessenger.o common.o +cmon: cmon.cc mon.o ebofs.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@ -cosd: cosd.cc osd.o ebofs.o msg/NewerMessenger.o common.o +cosd: cosd.cc osd.o ebofs.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@ -cmds: cmds.cc mds.o osdc.o msg/NewerMessenger.o common.o +cmds: cmds.cc mds.o osdc.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@ -cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/NewerMessenger.o common.o +cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ @@ -134,7 +134,7 @@ fakesyn: fakesyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o tcpsyn: tcpsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o 
${TCP_OBJS} common.o ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ -newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewerMessenger.o common.o +newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o ${MPICC} -pg ${MPICFLAGS} ${MPILIBS} $^ -o $@ newsyn.nopg: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewerMessenger.o common.o diff --git a/trunk/ceph/README b/trunk/ceph/README index 97008e49ffe75..aa016817cebf0 100644 --- a/trunk/ceph/README +++ b/trunk/ceph/README @@ -1,53 +1,4 @@ -pmds = parallel metadata server/system +Ceph - a scalable distributed file system +----------------------------------------- -'test' is a standalone proccess that runs all clients, OSDs, and MDSs -in a single process with a basic message passer (FakeMessenger). -Useful for debugging. - -'pmds' uses MPI for communication. - -'import' builds a metadata store on ./osddata/ by taking find output -from stdin. Make sure find is run from the current directory so that -import can stat the files it's fed. The find root becomes the file -system root; feel free to use relative paths. - -This is all GPL, etc. - - -Getting started: - - 1- Comment out the LEAKTRACER= line in the Makefile if you don't have - LeakTracer installed (you probably don't). - - 2- make (test and import targets are testing ones; pmds uses MPI) - - 3- Build an OSD metadata store: - # mkdir osddata - # find /some/big/dir | ./import root - - 4- Single proc sim: - # ./test - or more likely, - # ./test > out - - 5- Change parameters in config.cc. - - 6- If you want stats logged, mkdir log (make sure you have enough - file handles; there's one open file per client). - - -Notes on pmds (MPI version): - - - On mcr/alc I have to - # setenv LD_LIBRARY_PATH /usr/lib/mpi/mpi_gnu/lib - for the GNU runtime MPI libs (otherwise you get the Intel ones, - which segfault). - - - Each MDS and OSD gets its own node. Clients are divided over - whatever is left over. 
So make sure you tell MPI to give you at - least num_mds+num_osd+1 processes (num_mds etc defined in - config.cc). - - - -2004.08.25 sage@newdream.net +Please see http://ceph.sourceforge.net/ for current info. diff --git a/trunk/ceph/cfuse.cc b/trunk/ceph/cfuse.cc index b260c4bd3c3f8..95567084348a9 100644 --- a/trunk/ceph/cfuse.cc +++ b/trunk/ceph/cfuse.cc @@ -11,8 +11,6 @@ * */ - - #include #include #include @@ -20,12 +18,10 @@ using namespace std; #include "config.h" -#include "mds/MDS.h" -#include "osd/OSD.h" #include "client/Client.h" #include "client/fuse.h" -#include "msg/NewMessenger.h" +#include "msg/SimpleMessenger.h" #include "common/Timer.h" @@ -60,7 +56,6 @@ int main(int argc, char **argv, char *envp[]) { monmap->decode(bl); // start up network - rank.set_namer(monmap->get_inst(0).addr); rank.start_rank(); // start client @@ -85,7 +80,6 @@ int main(int argc, char **argv, char *envp[]) { // wait for messenger to finish rank.wait(); - return 0; } diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index cb3cc2622bae4..e004876fa6072 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -30,6 +30,7 @@ using namespace std; #include "Client.h" +#include "messages/MClientBoot.h" #include "messages/MClientMount.h" #include "messages/MClientMountAck.h" #include "messages/MClientFileCaps.h" @@ -91,7 +92,7 @@ public: Client::Client(Messenger *m, MonMap *mm) { // which client am i? - whoami = MSG_ADDR_NUM(m->get_myaddr()); + whoami = m->get_myaddr().num(); monmap = mm; mounted = false; @@ -683,37 +684,18 @@ void Client::dispatch(Message *m) client_lock.Unlock(); } -void Client::handle_mount_ack(MClientMountAck *m) -{ - // mdsmap! - if (!mdsmap) mdsmap = new MDSMap; - mdsmap->decode(m->get_mds_map_state()); - - // we got osdmap! 
- osdmap->decode(m->get_osd_map_state()); - - dout(2) << "mounted" << endl; - mounted = true; - mount_cond.Signal(); - - delete m; -} - - -void Client::handle_unmount_ack(Message* m) -{ - dout(1) << "got unmount ack" << endl; - mounted = false; - mount_cond.Signal(); - delete m; -} - void Client::handle_mds_map(MMDSMap* m) { if (mdsmap == 0) mdsmap = new MDSMap; + if (whoami < 0) { + whoami = m->get_dest().num(); + dout(1) << "handle_mds_map i am now " << m->get_dest() << endl; + messenger->reset_myaddr(m->get_dest()); + } + map::reverse_iterator p = m->maps.rbegin(); dout(1) << "handle_mds_map epoch " << p->first << endl; @@ -747,7 +729,7 @@ public: */ void Client::handle_file_caps(MClientFileCaps *m) { - int mds = MSG_ADDR_NUM(m->get_source()); + int mds = m->get_source().num(); Inode *in = 0; if (inode_map.count(m->get_ino())) in = inode_map[ m->get_ino() ]; @@ -847,8 +829,7 @@ void Client::handle_file_caps(MClientFileCaps *m) << ", which we don't want caps for, releasing." << endl; m->set_caps(0); m->set_wanted(0); - entity_inst_t srcinst = m->get_source_inst(); - messenger->send_message(m, m->get_source(), srcinst, m->get_source_port()); + messenger->send_message(m, m->get_source(), m->get_source_inst(), m->get_source_port()); return; } @@ -955,7 +936,7 @@ void Client::implemented_caps(MClientFileCaps *m, Inode *in) in->file_wr_size = 0; } - messenger->send_message(m, m->get_source(), m->get_source_port()); + messenger->send_message(m, m->get_source(), m->get_source_inst(), m->get_source_port()); } @@ -1013,7 +994,7 @@ void Client::update_caps_wanted(Inode *in) // ------------------- // fs ops -int Client::mount(int mkfs) +int Client::mount() { client_lock.Lock(); @@ -1021,22 +1002,24 @@ int Client::mount(int mkfs) // FIXME mds map update race with mount. 
- dout(2) << "fetching latest mds map" << endl; + dout(2) << "sending boot msg to monitor" << endl; if (mdsmap) delete mdsmap; int mon = monmap->pick_mon(); - messenger->send_message(new MMDSGetMap(), + messenger->send_message(new MClientBoot(), MSG_ADDR_MON(mon), monmap->get_inst(mon)); - + while (!mdsmap) mount_cond.Wait(client_lock); dout(2) << "mounting" << endl; MClientMount *m = new MClientMount(); - if (mkfs) m->set_mkfs(mkfs); - - messenger->send_message(m, MSG_ADDR_MDS(0), mdsmap->get_inst(0), MDS_PORT_SERVER); + int who = 0; // mdsmap->get_root(); // mount at root, for now + messenger->send_message(m, + MSG_ADDR_MDS(who), mdsmap->get_inst(who), + MDS_PORT_SERVER); + while (!mounted) mount_cond.Wait(client_lock); @@ -1055,6 +1038,23 @@ int Client::mount(int mkfs) return 0; } +void Client::handle_mount_ack(MClientMountAck *m) +{ + // mdsmap! + if (!mdsmap) mdsmap = new MDSMap; + mdsmap->decode(m->get_mds_map_state()); + + // we got osdmap! + osdmap->decode(m->get_osd_map_state()); + + dout(2) << "mounted" << endl; + mounted = true; + mount_cond.Signal(); + + delete m; +} + + int Client::unmount() { client_lock.Lock(); @@ -1124,6 +1124,14 @@ int Client::unmount() return 0; } +void Client::handle_unmount_ack(Message* m) +{ + dout(1) << "got unmount ack" << endl; + mounted = false; + mount_cond.Signal(); + delete m; +} + // namespace ops @@ -2042,7 +2050,7 @@ int Client::open(const char *relpath, int flags) if (cmode & FILE_MODE_LAZY) f->inode->num_open_lazy++; // caps included? - int mds = MSG_ADDR_NUM(reply->get_source()); + int mds = reply->get_source().num(); if (f->inode->caps.empty()) {// first caps? dout(7) << " first caps on " << f->inode->inode.ino << endl; diff --git a/trunk/ceph/client/Client.h b/trunk/ceph/client/Client.h index 626176f9f9f47..5a871bf01ccc8 100644 --- a/trunk/ceph/client/Client.h +++ b/trunk/ceph/client/Client.h @@ -519,7 +519,7 @@ protected: // ---------------------- // fs ops. 
- int mount(int mkfs=0); + int mount(); int unmount(); // these shoud (more or less) mirror the actual system calls. diff --git a/trunk/ceph/cmds.cc b/trunk/ceph/cmds.cc new file mode 100644 index 0000000000000..5015fd21bcee5 --- /dev/null +++ b/trunk/ceph/cmds.cc @@ -0,0 +1,96 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include + +#include +#include +#include +using namespace std; + +#include "config.h" + +#include "mon/MonMap.h" +#include "mds/MDS.h" + +#include "msg/SimpleMessenger.h" + +#include "common/Timer.h" + + +class C_Die : public Context { +public: + void finish(int) { + cerr << "die" << endl; + exit(1); + } +}; + +class C_Debug : public Context { + public: + void finish(int) { + int size = &g_conf.debug_after - &g_conf.debug; + memcpy((char*)&g_conf.debug, (char*)&g_debug_after_conf.debug, size); + dout(0) << "debug_after flipping debug settings" << endl; + } +}; + + +int main(int argc, char **argv) +{ + vector args; + argv_to_vec(argc, argv, args); + + parse_config_options(args); + + if (g_conf.kill_after) + g_timer.add_event_after(g_conf.kill_after, new C_Die); + if (g_conf.debug_after) + g_timer.add_event_after(g_conf.debug_after, new C_Debug); + + + // load monmap + bufferlist bl; + int fd = ::open(".ceph_monmap", O_RDONLY); + assert(fd >= 0); + struct stat st; + ::fstat(fd, &st); + bufferptr bp(st.st_size); + bl.append(bp); + ::read(fd, (void*)bl.c_str(), bl.length()); + ::close(fd); + + MonMap *monmap = new MonMap; + monmap->decode(bl); + + // start up network + rank.start_rank(); + + // start mds + Messenger *m = rank.register_entity(MSG_ADDR_MDS_NEW); + assert(m); + + MDS *mds = 
new MDS(m->get_myaddr().num(), m, monmap); + mds->init(); + + // wait + rank.wait(); + + // done + delete mds; + + return 0; +} + diff --git a/trunk/ceph/cmon.cc b/trunk/ceph/cmon.cc new file mode 100644 index 0000000000000..7d915d9c3ef58 --- /dev/null +++ b/trunk/ceph/cmon.cc @@ -0,0 +1,100 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include + +#include +#include +#include +using namespace std; + +#include "config.h" + +#include "mon/MonMap.h" +#include "mon/Monitor.h" + +#include "msg/SimpleMessenger.h" + +#include "common/Timer.h" + + +class C_Die : public Context { +public: + void finish(int) { + cerr << "die" << endl; + exit(1); + } +}; + +class C_Debug : public Context { + public: + void finish(int) { + int size = &g_conf.debug_after - &g_conf.debug; + memcpy((char*)&g_conf.debug, (char*)&g_debug_after_conf.debug, size); + dout(0) << "debug_after flipping debug settings" << endl; + } +}; + + +int main(int argc, char **argv) +{ + vector args; + argv_to_vec(argc, argv, args); + + parse_config_options(args); + + if (g_conf.kill_after) + g_timer.add_event_after(g_conf.kill_after, new C_Die); + if (g_conf.debug_after) + g_timer.add_event_after(g_conf.debug_after, new C_Debug); + + // let's assume a standalone monitor + // FIXME: we want to start a cluster, eventually. 
+ cout << "starting standalone mon0" << endl; + + // start messenger + rank.start_rank(); + + // create monmap + MonMap *monmap = new MonMap(1); + monmap->mon_inst[0] = rank.my_inst; + + cout << "bound to " << rank.get_listen_addr() << endl; + + // start monitor + Messenger *m = rank.register_entity(MSG_ADDR_MON(0)); + Monitor *mon = new Monitor(0, m, monmap); + mon->init(); + + // write monmap + cout << "writing monmap to .ceph_monmap" << endl; + bufferlist bl; + monmap->encode(bl); + int fd = ::open(".ceph_monmap", O_RDWR|O_CREAT); + assert(fd >= 0); + ::fchmod(fd, 0644); + ::write(fd, (void*)bl.c_str(), bl.length()); + ::close(fd); + + // wait + cout << "waiting for shutdown ..." << endl; + rank.wait(); + + // done + delete mon; + + return 0; +} + diff --git a/trunk/ceph/common/Logger.cc b/trunk/ceph/common/Logger.cc index 37ceb22321d8f..bb9923d6a2cd3 100644 --- a/trunk/ceph/common/Logger.cc +++ b/trunk/ceph/common/Logger.cc @@ -44,7 +44,7 @@ Logger::Logger(string fn, LogType *type) //cout << "log " << filename << endl; interval = g_conf.log_interval; - start = g_clock.now(); // time 0! + //start = g_clock.now(); // time 0! last_logged = 0; wrote_header = -1; open = false; @@ -152,7 +152,7 @@ void Logger::flush(bool force) start = fromstart; } fromstart -= start; - + while (force || ((fromstart.sec() > last_logged) && (fromstart.sec() - last_logged >= interval))) { diff --git a/trunk/ceph/cosd.cc b/trunk/ceph/cosd.cc index cb60ed492515b..667e86537e4b2 100644 --- a/trunk/ceph/cosd.cc +++ b/trunk/ceph/cosd.cc @@ -63,28 +63,44 @@ int main(int argc, char **argv) g_timer.add_event_after(g_conf.debug_after, new C_Debug); - assert(args.size() == 1); - char *dev = args[0]; - cerr << "dev " << dev << endl; - - // who am i? peek at superblock! 
- OSDSuperblock sb; - ObjectStore *store = new Ebofs(dev); - bufferlist bl; - store->mount(); - int r = store->read(object_t(0,0), 0, sizeof(sb), bl); - if (r < 0) { - cerr << "couldn't read superblock object on " << dev << endl; - exit(0); + char *dev; + int whoami = -1; + for (unsigned i=0; iumount(); - delete store; + cout << "dev " << dev << endl; + - cout << "osd fs says i am osd" << sb.whoami << endl; + if (whoami < 0) { + // who am i? peek at superblock! + OSDSuperblock sb; + ObjectStore *store = new Ebofs(dev); + bufferlist bl; + store->mount(); + int r = store->read(object_t(0,0), 0, sizeof(sb), bl); + if (r < 0) { + cerr << "couldn't read superblock object on " << dev << endl; + exit(0); + } + bl.copy(0, sizeof(sb), (char*)&sb); + store->umount(); + delete store; + whoami = sb.whoami; + + cout << "osd fs says i am osd" << whoami << endl; + } else { + cout << "command line arg says i am osd" << whoami << endl; + } // load monmap - bl.clear(); + bufferlist bl; int fd = ::open(".ceph_monmap", O_RDONLY); assert(fd >= 0); struct stat st; @@ -98,13 +114,12 @@ int main(int argc, char **argv) monmap->decode(bl); // start up network - rank.set_namer(monmap->get_inst(0).addr); rank.start_rank(); // start osd - Messenger *m = rank.register_entity(MSG_ADDR_OSD(sb.whoami)); + Messenger *m = rank.register_entity(MSG_ADDR_OSD(whoami)); assert(m); - OSD *osd = new OSD(sb.whoami, m, monmap, dev); + OSD *osd = new OSD(whoami, m, monmap, dev); osd->init(); // wait diff --git a/trunk/ceph/fakesyn.cc b/trunk/ceph/fakesyn.cc index 312ad8b345ef8..3b04dbb319f8b 100644 --- a/trunk/ceph/fakesyn.cc +++ b/trunk/ceph/fakesyn.cc @@ -71,6 +71,7 @@ int main(int argc, char **argv) g_clock.tare(); MonMap *monmap = new MonMap(g_conf.num_mon); + monmap->mon_inst[0].rank = 0; // hack ; see FakeMessenger.cc char hostname[100]; gethostname(hostname,100); @@ -87,7 +88,7 @@ int main(int argc, char **argv) OSD *mdsosd[NUMMDS]; for (int i=0; i struct hash<__int64_t> { + size_t operator()(__int64_t 
__x) const { + static hash<__int32_t> H; + return H((__x >> 32) ^ (__x & 0xffffffff)); + } + }; + } diff --git a/trunk/ceph/mds/AnchorTable.cc b/trunk/ceph/mds/AnchorTable.cc index 7b881de0339da..d2c338513740c 100644 --- a/trunk/ceph/mds/AnchorTable.cc +++ b/trunk/ceph/mds/AnchorTable.cc @@ -32,12 +32,22 @@ AnchorTable::AnchorTable(MDS *mds) this->mds = mds; opening = false; opened = false; - +} + +void AnchorTable::init_inode() +{ memset(&table_inode, 0, sizeof(table_inode)); table_inode.ino = MDS_INO_ANCHORTABLE+mds->get_nodeid(); table_inode.layout = g_OSD_FileLayout; } +void AnchorTable::reset() +{ + init_inode(); + opened = true; + anchor_map.clear(); +} + /* * basic updates */ @@ -214,7 +224,7 @@ void AnchorTable::handle_anchor_request(class MAnchorRequest *m) } // send reply - mds->messenger->send_message(reply, m->get_source(), m->get_source_port()); + mds->messenger->send_message(reply, m->get_source(), m->get_source_inst(), m->get_source_port()); delete m; } @@ -308,6 +318,7 @@ public: void AnchorTable::load(Context *onfinish) { dout(7) << "load" << endl; + init_inode(); assert(!opened); diff --git a/trunk/ceph/mds/AnchorTable.h b/trunk/ceph/mds/AnchorTable.h index 2e6c1d7b07788..0b0af03af5b68 100644 --- a/trunk/ceph/mds/AnchorTable.h +++ b/trunk/ceph/mds/AnchorTable.h @@ -38,6 +38,8 @@ class AnchorTable { AnchorTable(MDS *mds); protected: + void init_inode(); // call this before doing anything. + // bool have_ino(inodeno_t ino) { return true; // always in memory for now. @@ -68,10 +70,7 @@ class AnchorTable { public: // load/save entire table for now! 
- void reset() { - opened = true; - anchor_map.clear(); - } + void reset(); void save(Context *onfinish); void load(Context *onfinish); void load_2(size_t size, bufferlist& bl); diff --git a/trunk/ceph/mds/IdAllocator.cc b/trunk/ceph/mds/IdAllocator.cc index fba33d599de40..671bd70a77c27 100644 --- a/trunk/ceph/mds/IdAllocator.cc +++ b/trunk/ceph/mds/IdAllocator.cc @@ -29,6 +29,14 @@ #define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".idalloc: " +void IdAllocator::init_inode() +{ + memset(&inode, 0, sizeof(inode)); + inode.ino = MDS_INO_IDS_OFFSET + mds->get_nodeid(); + inode.layout = g_OSD_FileLayout; +} + + idno_t IdAllocator::alloc_id(bool replay) { assert(is_active()); @@ -119,6 +127,8 @@ void IdAllocator::save_2(version_t v) void IdAllocator::reset() { + init_inode(); + free.clear(); // use generic range FIXME THIS IS CRAP @@ -152,6 +162,8 @@ void IdAllocator::load(Context *onfinish) { dout(10) << "load" << endl; + init_inode(); + assert(is_undef()); state = STATE_OPENING; diff --git a/trunk/ceph/mds/IdAllocator.h b/trunk/ceph/mds/IdAllocator.h index 745d863be99d3..c79266d3e71b6 100644 --- a/trunk/ceph/mds/IdAllocator.h +++ b/trunk/ceph/mds/IdAllocator.h @@ -42,13 +42,14 @@ class IdAllocator { map > waitfor_save; public: - IdAllocator(MDS *m, inode_t i) : + IdAllocator(MDS *m) : mds(m), - inode(i), state(STATE_UNDEF), version(0), committing_version(0), committed_version(0) { } + + void init_inode(); // alloc or reclaim ids idno_t alloc_id(bool replay=false); diff --git a/trunk/ceph/mds/Locker.cc b/trunk/ceph/mds/Locker.cc index 0b4418fe2262d..08c1900eec9f2 100644 --- a/trunk/ceph/mds/Locker.cc +++ b/trunk/ceph/mds/Locker.cc @@ -296,7 +296,7 @@ void Locker::handle_inode_file_caps(MInodeFileCaps *m) */ void Locker::handle_client_file_caps(MClientFileCaps *m) { - int client = MSG_ADDR_NUM(m->get_source()); + int client = m->get_source().num(); CInode *in = mdcache->get_inode(m->get_ino()); Capability *cap = 0; if 
(in) @@ -721,7 +721,7 @@ void Locker::handle_lock_inode_hard(MLock *m) { assert(m->get_otype() == LOCK_OTYPE_IHARD); - mds->logger->inc("lih"); + if (mds->logger) mds->logger->inc("lih"); int from = m->get_asker(); CInode *in = mdcache->get_inode(m->get_ino()); @@ -1539,7 +1539,7 @@ void Locker::handle_lock_inode_file(MLock *m) { assert(m->get_otype() == LOCK_OTYPE_IFILE); - mds->logger->inc("lif"); + if (mds->logger) mds->logger->inc("lif"); CInode *in = mdcache->get_inode(m->get_ino()); int from = m->get_asker(); diff --git a/trunk/ceph/mds/MDBalancer.cc b/trunk/ceph/mds/MDBalancer.cc index 0b497103183b2..c1888fea3c2d2 100644 --- a/trunk/ceph/mds/MDBalancer.cc +++ b/trunk/ceph/mds/MDBalancer.cc @@ -31,7 +31,7 @@ using namespace std; #include "config.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_balancer) cout << "mds" << mds->get_nodeid() << ".bal " << (g_clock.recent_now() - mds->logger->get_start()) << " " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_balancer) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".bal " #define MIN_LOAD 50 // ?? 
#define MIN_REEXPORT 5 // will automatically reexport @@ -125,14 +125,15 @@ void MDBalancer::send_heartbeat() MHeartbeat *hb = new MHeartbeat(load, beat_epoch); hb->get_import_map() = import_map; mds->messenger->send_message(hb, - MSG_ADDR_MDS(i), MDS_PORT_BALANCER, + MSG_ADDR_MDS(i), mds->mdsmap->get_inst(i), + MDS_PORT_BALANCER, MDS_PORT_BALANCER); } } void MDBalancer::handle_heartbeat(MHeartbeat *m) { - dout(25) << "=== got heartbeat " << m->get_beat() << " from " << MSG_ADDR_NICE(m->get_source()) << " " << m->get_load() << endl; + dout(25) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source().num() << " " << m->get_load() << endl; if (!mds->mdcache->get_root()) { dout(10) << "no root on handle" << endl; @@ -140,7 +141,7 @@ void MDBalancer::handle_heartbeat(MHeartbeat *m) return; } - int who = MSG_ADDR_NUM(m->get_source()); + int who = m->get_source().num(); if (who == 0) { dout(20) << " from mds0, new epoch" << endl; diff --git a/trunk/ceph/mds/MDCache.cc b/trunk/ceph/mds/MDCache.cc index 02e2a9cd1417d..5216476131914 100644 --- a/trunk/ceph/mds/MDCache.cc +++ b/trunk/ceph/mds/MDCache.cc @@ -374,7 +374,7 @@ bool MDCache::trim(int max) } - mds->logger->inc("cex"); + if (mds->logger) mds->logger->inc("cex"); } @@ -971,9 +971,9 @@ int MDCache::path_traverse(filepath& origpath, if (dn->get_inode()->is_cached_by(from)) { dout(15) << "traverse: REP would replicate to mds" << from << ", but already cached_by " - << MSG_ADDR_NICE(req->get_source()) << " dn " << *dn << endl; + << req->get_source() << " dn " << *dn << endl; } else { - dout(10) << "traverse: REP replicating to " << MSG_ADDR_NICE(req->get_source()) << " dn " << *dn << endl; + dout(10) << "traverse: REP replicating to " << req->get_source() << " dn " << *dn << endl; MDiscoverReply *reply = new MDiscoverReply(cur->dir->ino()); reply->add_dentry( dn->get_name(), !dn->can_read()); reply->add_inode( dn->inode->replicate_to( from ) ); @@ -1017,7 +1017,7 @@ int MDCache::path_traverse(filepath& 
origpath, touch_inode(cur); mds->mdstore->fetch_dir(cur->dir, ondelay); - mds->logger->inc("cmiss"); + if (mds->logger) mds->logger->inc("cmiss"); if (onfinish) delete onfinish; return 1; @@ -1051,7 +1051,7 @@ int MDCache::path_traverse(filepath& origpath, want, false), dauth, MDS_PORT_CACHE); - mds->logger->inc("dis"); + if (mds->logger) mds->logger->inc("dis"); } // delay processing of current request. @@ -1061,7 +1061,7 @@ int MDCache::path_traverse(filepath& origpath, path[depth], new C_MDC_TraverseDiscover(onfinish, ondelay)); - mds->logger->inc("cmiss"); + if (mds->logger) mds->logger->inc("cmiss"); return 1; } if (onfail == MDS_TRAVERSE_FORWARD) { @@ -1077,7 +1077,7 @@ int MDCache::path_traverse(filepath& origpath, mds->send_message_mds(req, dauth, req->get_dest_port()); //show_imports(); - mds->logger->inc("cfw"); + if (mds->logger) mds->logger->inc("cfw"); if (onfinish) delete onfinish; delete ondelay; return 2; @@ -1283,7 +1283,7 @@ bool MDCache::request_start(Message *req, // request pins request_pin_inode(req, ref); - mds->logger->inc("req"); + if (mds->logger) mds->logger->inc("req"); return true; } @@ -1378,18 +1378,20 @@ void MDCache::request_cleanup(Message *req) // log some stats ***** - mds->logger->set("c", lru.lru_get_size()); - mds->logger->set("cpin", lru.lru_get_num_pinned()); - mds->logger->set("ctop", lru.lru_get_top()); - mds->logger->set("cbot", lru.lru_get_bot()); - mds->logger->set("cptail", lru.lru_get_pintail()); - //mds->logger->set("buf",buffer_total_alloc); + if (mds->logger) { + mds->logger->set("c", lru.lru_get_size()); + mds->logger->set("cpin", lru.lru_get_num_pinned()); + mds->logger->set("ctop", lru.lru_get_top()); + mds->logger->set("cbot", lru.lru_get_bot()); + mds->logger->set("cptail", lru.lru_get_pintail()); + //mds->logger->set("buf",buffer_total_alloc); + } if (g_conf.log_pins) { // pin for (int i=0; ilogger2->set(cinode_pin_names[i], - cinode_pins[i]); + if (mds->logger2) mds->logger2->set(cinode_pin_names[i], + 
cinode_pins[i]); } /* for (map::iterator it = cdir_pins.begin(); @@ -1397,7 +1399,7 @@ void MDCache::request_cleanup(Message *req) it++) { //string s = "D"; //s += cdir_pin_names[it->first]; - mds->logger2->set(//s, + if (mds->logger2) mds->logger2->set(//s, cdir_pin_names[it->first], it->second); } @@ -1412,7 +1414,7 @@ void MDCache::request_finish(Message *req) request_cleanup(req); delete req; // delete req - mds->logger->inc("reply"); + if (mds->logger) mds->logger->inc("reply"); //dump(); @@ -1427,7 +1429,7 @@ void MDCache::request_forward(Message *req, int who, int port) request_cleanup(req); mds->send_message_mds(req, who, port); - mds->logger->inc("fw"); + if (mds->logger) mds->logger->inc("fw"); } @@ -1995,7 +1997,7 @@ void MDCache::handle_inode_update(MInodeUpdate *m) void MDCache::handle_cache_expire(MCacheExpire *m) { int from = m->get_from(); - int source = MSG_ADDR_NUM(m->get_source()); + int source = m->get_source().num(); map proxymap; if (m->get_from() == source) { diff --git a/trunk/ceph/mds/MDS.cc b/trunk/ceph/mds/MDS.cc index a487d6469eb7a..48f38150cf41d 100644 --- a/trunk/ceph/mds/MDS.cc +++ b/trunk/ceph/mds/MDS.cc @@ -84,22 +84,9 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) { balancer = new MDBalancer(this); anchorclient = new AnchorClient(messenger, mdsmap); + idalloc = new IdAllocator(this); - // alloc - { - inode_t id_inode; - memset(&id_inode, 0, sizeof(id_inode)); - id_inode.ino = MDS_INO_IDS_OFFSET + whoami; - id_inode.layout = g_OSD_FileLayout; - idalloc = new IdAllocator(this, id_inode); - } - - // hack: anchortable on mds0. - if (whoami == 0) - anchormgr = new AnchorTable(this); - else - anchormgr = 0; - + anchormgr = new AnchorTable(this); server = new Server(this); locker = new Locker(this, mdcache); @@ -111,6 +98,46 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) { last_balancer_hash = last_balancer_heartbeat = g_clock.recent_now(); + + logger = logger2 = 0; + + // i'm ready! 
+ messenger->set_dispatcher(this); +} + +MDS::~MDS() { + if (mdcache) { delete mdcache; mdcache = NULL; } + if (mdstore) { delete mdstore; mdstore = NULL; } + if (mdlog) { delete mdlog; mdlog = NULL; } + if (balancer) { delete balancer; balancer = NULL; } + if (idalloc) { delete idalloc; idalloc = NULL; } + if (anchormgr) { delete anchormgr; anchormgr = NULL; } + if (anchorclient) { delete anchorclient; anchorclient = NULL; } + if (osdmap) { delete osdmap; osdmap = 0; } + + if (filer) { delete filer; filer = 0; } + if (objecter) { delete objecter; objecter = 0; } + if (messenger) { delete messenger; messenger = NULL; } + + if (logger) { delete logger; logger = 0; } + if (logger2) { delete logger2; logger2 = 0; } + +} + + +void MDS::reopen_log() +{ + // flush+close old log + if (logger) { + logger->flush(true); + delete logger; + } + if (logger2) { + logger2->flush(true); + delete logger2; + } + + // log string name; name = "mds"; @@ -160,32 +187,8 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) { char n[80]; sprintf(n, "mds%d.cache", whoami); logger2 = new Logger(n, (LogType*)&mds_cache_logtype); - - - // i'm ready! 
- messenger->set_dispatcher(this); } -MDS::~MDS() { - if (mdcache) { delete mdcache; mdcache = NULL; } - if (mdstore) { delete mdstore; mdstore = NULL; } - if (mdlog) { delete mdlog; mdlog = NULL; } - if (balancer) { delete balancer; balancer = NULL; } - if (idalloc) { delete idalloc; idalloc = NULL; } - if (anchormgr) { delete anchormgr; anchormgr = NULL; } - if (anchorclient) { delete anchorclient; anchorclient = NULL; } - if (osdmap) { delete osdmap; osdmap = 0; } - - if (filer) { delete filer; filer = 0; } - if (objecter) { delete objecter; objecter = 0; } - if (messenger) { delete messenger; messenger = NULL; } - - if (logger) { delete logger; logger = 0; } - if (logger2) { delete logger2; logger2 = 0; } - -} - - void MDS::send_message_mds(Message *m, int mds, int port, int fromport) { if (port && !fromport) @@ -213,6 +216,15 @@ void MDS::handle_mds_map(MMDSMap *m) delete m; + // see who i am + int w = mdsmap->get_inst_rank(messenger->get_myinst()); + if (w != whoami) { + whoami = w; + messenger->reset_myaddr(MSG_ADDR_MDS(w)); + reopen_log(); + } + dout(1) << "map says i am " << w << endl; + if (is_booting()) { // we need an osdmap too. int mon = monmap->pick_mon(); @@ -407,7 +419,7 @@ void MDS::handle_shutdown_start(Message *m) mdcache->shutdown_start(); // save anchor table - if (whoami == 0) + if (mdsmap->get_anchortable() == whoami) anchormgr->save(0); // FIXME FIXME // flush log @@ -552,13 +564,15 @@ void MDS::my_dispatch(Message *m) last_log = now; mds_load_t load = balancer->get_load(); - req_rate = logger->get("req"); - - logger->set("l", (int)load.mds_load()); - logger->set("q", messenger->get_dispatch_queue_len()); - logger->set("buf", buffer_total_alloc); - - mdcache->log_stat(logger); + if (logger) { + req_rate = logger->get("req"); + + logger->set("l", (int)load.mds_load()); + logger->set("q", messenger->get_dispatch_queue_len()); + logger->set("buf", buffer_total_alloc); + + mdcache->log_stat(logger); + } // balance? 
@@ -682,7 +696,7 @@ void MDS::proc_message(Message *m) void MDS::handle_ping(MPing *m) { - dout(10) << " received ping from " << MSG_ADDR_NICE(m->get_source()) << " with seq " << m->seq << endl; + dout(10) << " received ping from " << m->get_source() << " with seq " << m->seq << endl; messenger->send_message(new MPingAck(m), m->get_source(), m->get_source_inst()); diff --git a/trunk/ceph/mds/MDS.h b/trunk/ceph/mds/MDS.h index 1581d9c4049ca..b67a7e58e98e7 100644 --- a/trunk/ceph/mds/MDS.h +++ b/trunk/ceph/mds/MDS.h @@ -198,6 +198,7 @@ public: // start up, shutdown int init(); + void reopen_log(); void boot_mkfs(); void boot_mkfs_finish(); @@ -246,7 +247,5 @@ public: }; -ostream& operator<<(ostream& out, MDS& mds); - #endif diff --git a/trunk/ceph/mds/MDSMap.h b/trunk/ceph/mds/MDSMap.h index 6117e6943d3c7..c94a7ef88cd48 100644 --- a/trunk/ceph/mds/MDSMap.h +++ b/trunk/ceph/mds/MDSMap.h @@ -68,6 +68,16 @@ class MDSMap { } return false; } + + int get_inst_rank(const entity_inst_t& inst) { + for (map::iterator p = mds_inst.begin(); + p != mds_inst.end(); + ++p) { + if (p->second == inst) return p->first; + } + return -1; + } + // serialize, unserialize void encode(bufferlist& blist) { diff --git a/trunk/ceph/mds/MDStore.cc b/trunk/ceph/mds/MDStore.cc index 432d56751b643..433d631dfa5ca 100644 --- a/trunk/ceph/mds/MDStore.cc +++ b/trunk/ceph/mds/MDStore.cc @@ -92,7 +92,7 @@ void MDStore::fetch_dir( CDir *dir, dir->state_set(CDIR_STATE_FETCHING); // stats - mds->logger->inc("fdir"); + if (mds->logger) mds->logger->inc("fdir"); // create return context Context *fin = new C_MDS_Fetch( this, dir->ino() ); @@ -522,7 +522,7 @@ void MDStore::commit_dir( CDir *dir, dir->set_committing_version(); // stats - mds->logger->inc("cdir"); + if (mds->logger) mds->logger->inc("cdir"); if (dir->is_hashed()) { // hashed diff --git a/trunk/ceph/mds/Migrator.cc b/trunk/ceph/mds/Migrator.cc index bde26ae72dced..82c7970e072c7 100644 --- a/trunk/ceph/mds/Migrator.cc +++ 
b/trunk/ceph/mds/Migrator.cc @@ -281,7 +281,7 @@ void Migrator::handle_export_dir_discover_ack(MExportDirDiscoverAck *m) CDir *dir = in->dir; assert(dir); - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); assert(export_gather[dir].count(from)); export_gather[dir].erase(from); @@ -380,7 +380,7 @@ void Migrator::handle_export_dir_prep_ack(MExportDirPrepAck *m) dout(7) << "export_dir_prep_ack " << *dir << ", starting export" << endl; // start export. - export_dir_go(dir, MSG_ADDR_NUM(m->get_source())); + export_dir_go(dir, m->get_source().num()); // done delete m; @@ -499,8 +499,8 @@ void Migrator::export_dir_go(CDir *dir, // stats - mds->logger->inc("ex"); - mds->logger->inc("iex", num_exported_inodes); + if (mds->logger) mds->logger->inc("ex"); + if (mds->logger) mds->logger->inc("iex", num_exported_inodes); show_imports(); } @@ -728,7 +728,7 @@ void Migrator::handle_export_dir_notify_ack(MExportDirNotifyAck *m) assert(dir->is_frozen_tree_root()); // i'm exporting! 
// remove from waiting list - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); assert(export_notify_ack_waiting[dir].count(from)); export_notify_ack_waiting[dir].erase(from); @@ -816,7 +816,7 @@ void Migrator::export_dir_finish(CDir *dir) // stats - mds->logger->set("nex", cache->exports.size()); + if (mds->logger) mds->logger->set("nex", cache->exports.size()); show_imports(); } @@ -850,7 +850,7 @@ public: void Migrator::handle_export_dir_discover(MExportDirDiscover *m) { - assert(MSG_ADDR_NUM(m->get_source()) != mds->get_nodeid()); + assert(m->get_source().num() != mds->get_nodeid()); dout(7) << "handle_export_dir_discover on " << m->get_path() << endl; @@ -907,7 +907,7 @@ void Migrator::handle_export_dir_discover_2(MExportDirDiscover *m, CInode *in, i void Migrator::handle_export_dir_prep(MExportDirPrep *m) { - assert(MSG_ADDR_NUM(m->get_source()) != mds->get_nodeid()); + assert(m->get_source().num() != mds->get_nodeid()); CInode *diri = cache->get_inode(m->get_ino()); assert(diri); @@ -1093,7 +1093,7 @@ void Migrator::handle_export_dir(MExportDir *m) CDir *dir = diri->dir; assert(dir); - int oldauth = MSG_ADDR_NUM(m->get_source()); + int oldauth = m->get_source().num(); dout(7) << "handle_export_dir, import " << *dir << " from " << oldauth << endl; assert(dir->is_auth() == false); @@ -1157,7 +1157,7 @@ void Migrator::handle_export_dir(MExportDir *m) cache->imports.erase(ex); ex->state_clear(CDIR_STATE_IMPORT); - mds->logger->inc("imex"); + if (mds->logger) mds->logger->inc("imex"); // move nested exports under containing_import for (set::iterator it = cache->nested_exports[ex].begin(); @@ -1179,7 +1179,7 @@ void Migrator::handle_export_dir(MExportDir *m) ex->get(CDIR_PIN_EXPORT); // all exports are pinned cache->exports.insert(ex); cache->nested_exports[containing_import].insert(ex); - mds->logger->inc("imex"); + if (mds->logger) mds->logger->inc("imex"); } } @@ -1208,7 +1208,7 @@ void Migrator::handle_export_dir(MExportDir *m) 
mds->balancer->add_import(dir); // send notify's etc. - dout(7) << "sending notifyack for " << *dir << " to old auth " << MSG_ADDR_NUM(m->get_source()) << endl; + dout(7) << "sending notifyack for " << *dir << " to old auth " << m->get_source().num() << endl; mds->send_message_mds(new MExportDirNotifyAck(dir->inode->ino()), m->get_source().num(), MDS_PORT_MIGRATOR); @@ -1217,9 +1217,9 @@ void Migrator::handle_export_dir(MExportDir *m) it != dir->open_by.end(); it++) { assert( *it != mds->get_nodeid() ); - if ( *it == MSG_ADDR_NUM(m->get_source()) ) continue; // not to old auth. + if ( *it == m->get_source().num() ) continue; // not to old auth. - MExportDirNotify *notify = new MExportDirNotify(dir->ino(), MSG_ADDR_NUM(m->get_source()), mds->get_nodeid()); + MExportDirNotify *notify = new MExportDirNotify(dir->ino(), m->get_source().num(), mds->get_nodeid()); notify->copy_exports(m->get_exports()); if (g_conf.mds_verify_export_dirauth) @@ -1243,9 +1243,11 @@ void Migrator::handle_export_dir(MExportDir *m) // some stats - mds->logger->inc("im"); - mds->logger->inc("iim", num_imported_inodes); - mds->logger->set("nim", cache->imports.size()); + if (mds->logger) { + mds->logger->inc("im"); + mds->logger->inc("iim", num_imported_inodes); + mds->logger->set("nim", cache->imports.size()); + } // FIXME LOG IT @@ -1277,8 +1279,10 @@ void Migrator::handle_export_dir_finish(MExportDirFinish *m) dout(5) << "done with import of " << *dir << endl; show_imports(); - mds->logger->set("nex", cache->exports.size()); - mds->logger->set("nim", cache->imports.size()); + if (mds->logger) { + mds->logger->set("nex", cache->exports.size()); + mds->logger->set("nim", cache->imports.size()); + } // un auth pin (other exports can now proceed) dir->auth_unpin(); @@ -1813,7 +1817,7 @@ void Migrator::handle_hash_dir_discover_ack(MHashDirDiscoverAck *m) CDir *dir = in->dir; assert(dir); - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); 
assert(hash_gather[dir].count(from)); hash_gather[dir].erase(from); @@ -1923,7 +1927,7 @@ void Migrator::handle_hash_dir_prep_ack(MHashDirPrepAck *m) CDir *dir = in->dir; assert(dir); - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); assert(hash_gather[dir].count(from) == 1); hash_gather[dir].erase(from); @@ -2133,7 +2137,7 @@ void Migrator::handle_hash_dir_ack(MHashDirAck *m) assert(dir->is_hashed()); assert(dir->is_hashing()); - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); assert(hash_gather[dir].count(from) == 1); hash_gather[dir].erase(from); @@ -2191,7 +2195,7 @@ void Migrator::hash_dir_finish(CDir *dir) assert(hash_gather.count(dir) == 0); // stats - //mds->logger->inc("nh", 1); + //if (mds->logger) mds->logger->inc("nh", 1); } @@ -2211,7 +2215,7 @@ void Migrator::handle_hash_dir_notify(MHashDirNotify *m) dout(5) << "handle_hash_dir_notify " << *dir << endl; int from = m->get_from(); - int source = MSG_ADDR_NUM(m->get_source()); + int source = m->get_source().num(); if (dir->is_auth()) { // gather notifies assert(dir->is_hashed()); @@ -2318,7 +2322,7 @@ public: void Migrator::handle_hash_dir_discover(MHashDirDiscover *m) { - assert(MSG_ADDR_NUM(m->get_source()) != mds->get_nodeid()); + assert(m->get_source().num() != mds->get_nodeid()); dout(7) << "handle_hash_dir_discover on " << m->get_path() << endl; @@ -2475,7 +2479,7 @@ void Migrator::handle_hash_dir(MHashDir *m) assert(dir->is_hashing()); dout(5) << "handle_hash_dir " << *dir << endl; - int oldauth = MSG_ADDR_NUM(m->get_source()); + int oldauth = m->get_source().num(); // content import_hashed_content(dir, m->get_state(), m->get_nden(), oldauth); @@ -2498,7 +2502,7 @@ void Migrator::handle_hash_dir(MHashDir *m) dout(7) << "sending notifies" << endl; for (int i=0; iget_mds_map()->get_num_mds(); i++) { if (i == mds->get_nodeid()) continue; - if (i == MSG_ADDR_NUM(m->get_source())) continue; + if (i == m->get_source().num()) continue; 
mds->send_message_mds(new MHashDirNotify(dir->ino(), mds->get_nodeid()), i, MDS_PORT_MIGRATOR); } @@ -2622,7 +2626,7 @@ void Migrator::handle_unhash_dir_prep_ack(MUnhashDirPrepAck *m) CDir *dir = in->dir; assert(dir); - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); dout(7) << "handle_unhash_dir_prep_ack from " << from << " " << *dir << endl; if (!m->did_assim()) { @@ -2735,7 +2739,7 @@ void Migrator::handle_unhash_dir_ack(MUnhashDirAck *m) assert(dir->is_hashed()); // assimilate content - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); import_hashed_content(dir, m->get_state(), m->get_nden(), from); delete m; @@ -2810,7 +2814,7 @@ void Migrator::handle_unhash_dir_notify_ack(MUnhashDirNotifyAck *m) assert(dir->is_frozen_dir()); // done? - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); assert(hash_gather[dir].count(from)); hash_gather[dir].erase(from); delete m; @@ -3139,7 +3143,7 @@ void Migrator::handle_unhash_dir_notify(MUnhashDirNotify *m) assert(dir->is_unhashing()); assert(!dir->is_auth()); - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); assert(hash_gather[dir].count(from) == 1); hash_gather[dir].erase(from); delete m; diff --git a/trunk/ceph/mds/Renamer.cc b/trunk/ceph/mds/Renamer.cc index dfea8d6336803..db7a4f59a1378 100644 --- a/trunk/ceph/mds/Renamer.cc +++ b/trunk/ceph/mds/Renamer.cc @@ -597,7 +597,7 @@ void Renamer::handle_rename_notify_ack(MRenameNotifyAck *m) assert(in); dout(7) << "handle_rename_notify_ack on " << *in << endl; - int source = MSG_ADDR_NUM(m->get_source()); + int source = m->get_source().num(); rename_waiting_for_ack[in->ino()].erase(source); if (rename_waiting_for_ack[in->ino()].empty()) { // last one! 
@@ -724,7 +724,7 @@ void Renamer::handle_rename(MRename *m) // HACK bufferlist bufstate; bufstate.claim_append(m->get_inode_state()); - cache->migrator->decode_import_inode(destdn, bufstate, off, MSG_ADDR_NUM(m->get_source())); + cache->migrator->decode_import_inode(destdn, bufstate, off, m->get_source().num()); CInode *in = destdn->inode; assert(in); @@ -746,11 +746,11 @@ void Renamer::handle_rename(MRename *m) // ok, send notifies. set notify; for (int i=0; iget_mds_map()->get_num_mds(); i++) { - if (i != MSG_ADDR_NUM(m->get_source()) && // except the source + if (i != m->get_source().num() && // except the source i != mds->get_nodeid()) // and the dest notify.insert(i); } - file_rename_notify(in, srcdir, srcname, destdir, destname, notify, MSG_ADDR_NUM(m->get_source())); + file_rename_notify(in, srcdir, srcname, destdir, destname, notify, m->get_source().num()); delete m; } diff --git a/trunk/ceph/mds/Server.cc b/trunk/ceph/mds/Server.cc index 28ebb826e1a3a..d333330002c2b 100644 --- a/trunk/ceph/mds/Server.cc +++ b/trunk/ceph/mds/Server.cc @@ -52,8 +52,8 @@ using namespace std; #include "config.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << whoami << ".server " -#define derr(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << whoami << ".server " +#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".server " +#define derr(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".server " void Server::dispatch(Message *m) @@ -99,11 +99,11 @@ void Server::dispatch(Message *m) void Server::handle_client_mount(MClientMount *m) { - int n = MSG_ADDR_NUM(m->get_source()); + int n = m->get_source().num(); dout(3) << "mount by client" << n << endl; mds->clientmap.add_mount(n, m->get_source_inst()); - assert(whoami == 0); // mds0 mounts/unmounts + 
assert(mds->get_nodeid() == 0); // mds0 mounts/unmounts // ack messenger->send_message(new MClientMountAck(m, mds->mdsmap, mds->osdmap), @@ -113,10 +113,10 @@ void Server::handle_client_mount(MClientMount *m) void Server::handle_client_unmount(Message *m) { - int n = MSG_ADDR_NUM(m->get_source()); + int n = m->get_source().num(); dout(3) << "unmount by client" << n << endl; - assert(whoami == 0); // mds0 mounts/unmounts + assert(mds->get_nodeid() == 0); // mds0 mounts/unmounts mds->clientmap.rem_mount(n); @@ -189,7 +189,7 @@ void Server::reply_request(MClientRequest *req, MClientReply *reply, CInode *tra // include trace if (tracei) { - reply->set_trace_dist( tracei, whoami ); + reply->set_trace_dist( tracei, mds->get_nodeid() ); } // send reply @@ -284,7 +284,7 @@ void Server::handle_client_request(MClientRequest *req) ref = mdcache->get_inode(req->get_ino()); // fixme someday no ino needed? if (!ref) { - int next = whoami + 1; + int next = mds->get_nodeid() + 1; if (next >= mds->mdsmap->get_num_mds()) next = 0; dout(10) << "got request on ino we don't have, passing buck to " << next << endl; mds->send_message_mds(req, next, MDS_PORT_SERVER); @@ -649,7 +649,7 @@ int Server::encode_dir_contents(CDir *dir, // hashed? if (dir->is_hashed() && - whoami != mds->hash_dentry( dir->ino(), it->first )) + mds->get_nodeid() != mds->hash_dentry( dir->ino(), it->first )) continue; // is dentry readable? @@ -667,7 +667,7 @@ int Server::encode_dir_contents(CDir *dir, // add this item // note: InodeStat makes note of whether inode data is readable. 
dnls.push_back( it->first ); - inls.push_back( new InodeStat(in, whoami) ); + inls.push_back( new InodeStat(in, mds->get_nodeid()) ); numfiles++; } return numfiles; @@ -728,7 +728,7 @@ void Server::handle_hash_readdir_reply(MHashReaddirReply *m) assert(dir->is_hashed()); // move items to hashed_readdir gather - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); assert(dir->hashed_readdir.count(from) == 0); dir->hashed_readdir[from].first.splice(dir->hashed_readdir[from].first.begin(), m->get_in()); @@ -821,7 +821,7 @@ void Server::handle_client_readdir(MClientRequest *req, if (cur->dir) dirauth = cur->dir->authority(); assert(dirauth >= 0); - assert(dirauth != whoami); + assert(dirauth != mds->get_nodeid()); // forward to authority dout(10) << " forwarding readdir to authority " << dirauth << endl; @@ -880,12 +880,12 @@ void Server::handle_client_readdir(MClientRequest *req, // get local bits encode_dir_contents(cur->dir, - dir->hashed_readdir[whoami].first, - dir->hashed_readdir[whoami].second); + dir->hashed_readdir[mds->get_nodeid()].first, + dir->hashed_readdir[mds->get_nodeid()].second); // request other bits for (int i=0; imdsmap->get_num_mds(); i++) { - if (i == whoami) continue; + if (i == mds->get_nodeid()) continue; mds->send_message_mds(new MHashReaddir(dir->ino()), i, MDS_PORT_SERVER); } @@ -901,7 +901,7 @@ void Server::handle_client_readdir(MClientRequest *req, // . 
too dnls.push_back("."); - inls.push_back(new InodeStat(cur, whoami)); + inls.push_back(new InodeStat(cur, mds->get_nodeid())); ++numfiles; // yay, reply @@ -972,7 +972,7 @@ CInode *Server::mknod(MClientRequest *req, CInode *diri, bool okexist) // make sure it's my dentry int dnauth = dir->dentry_authority(name); - if (dnauth != whoami) { + if (dnauth != mds->get_nodeid()) { // fw dout(7) << "mknod on " << req->get_path() << ", dentry " << *dir << " dn " << name << " not mine, fw to " << dnauth << endl; @@ -1093,7 +1093,7 @@ void Server::handle_client_link(MClientRequest *req, CInode *ref) // make sure it's my dentry int dauth = dir->dentry_authority(dname); - if (dauth != whoami) { + if (dauth != mds->get_nodeid()) { // fw dout(7) << "link on " << req->get_path() << ", dn " << dname << " in " << *dir << " not mine, fw to " << dauth << endl; mdcache->request_forward(req, dauth); @@ -1191,7 +1191,7 @@ void Server::handle_client_link_2(int r, MClientRequest *req, CInode *ref, vecto string dname = req->get_filepath().last_bit(); int dauth = dir->dentry_authority(dname); - if (whoami != dauth) { + if (mds->get_nodeid() != dauth) { // ugh, exported out from under us dout(7) << "ugh, forwarded out from under us, dentry auth is " << dauth << endl; mdcache->request_forward(req, dauth); @@ -1237,7 +1237,7 @@ void Server::handle_client_link_2(int r, MClientRequest *req, CInode *ref, vecto } else { // remote: send nlink++ request, wait dout(7) << "target is remote, sending InodeLink" << endl; - mds->send_message_mds(new MInodeLink(targeti->ino(), whoami), targeti->authority(), MDS_PORT_CACHE); + mds->send_message_mds(new MInodeLink(targeti->ino(), mds->get_nodeid()), targeti->authority(), MDS_PORT_CACHE); // wait targeti->add_waiter(CINODE_WAIT_LINK, @@ -1303,7 +1303,7 @@ void Server::handle_client_unlink(MClientRequest *req, // does it exist? 
CDentry *dn = dir->lookup(name); if (!dn) { - if (dnauth == whoami) { + if (dnauth == mds->get_nodeid()) { dout(7) << "handle_client_rmdir/unlink dne " << name << " in " << *dir << endl; reply_request(req, -ENOENT); } else { @@ -1413,7 +1413,7 @@ void Server::handle_client_unlink(MClientRequest *req, } // am i dentry auth? - if (dnauth != whoami) { + if (dnauth != mds->get_nodeid()) { // not auth; forward! dout(7) << "handle_client_unlink not auth for " << *dir << " dn " << dn->name << ", fwd to " << dnauth << endl; mdcache->request_forward(req, dnauth); @@ -1586,7 +1586,7 @@ void Server::handle_client_rename(MClientRequest *req, // make sure it's my dentry int srcauth = srcdir->dentry_authority(srcname); - if (srcauth != whoami) { + if (srcauth != mds->get_nodeid()) { // fw dout(7) << "rename on " << req->get_path() << ", dentry " << *srcdir << " dn " << srcname << " not mine, fw to " << srcauth << endl; mdcache->request_forward(req, srcauth); @@ -1780,8 +1780,8 @@ void Server::handle_client_rename_2(MClientRequest *req, dout(7) << "handle_client_rename_2 destname " << destname << " destdir " << *destdir << " auth " << destauth << endl; // - if (srcauth != whoami || - destauth != whoami) { + if (srcauth != mds->get_nodeid() || + destauth != mds->get_nodeid()) { dout(7) << "rename has remote dest " << destauth << endl; dout(7) << "FOREIGN RENAME" << endl; @@ -1857,8 +1857,8 @@ void Server::handle_client_rename_local(MClientRequest *req, //everybody = true; //} - bool srclocal = srcdn->dir->dentry_authority(srcdn->name) == whoami; - bool destlocal = destdir->dentry_authority(destname) == whoami; + bool srclocal = srcdn->dir->dentry_authority(srcdn->name) == mds->get_nodeid(); + bool destlocal = destdir->dentry_authority(destname) == mds->get_nodeid(); dout(7) << "handle_client_rename_local: src local=" << srclocal << " " << *srcdn << endl; if (destdn) { @@ -1991,7 +1991,7 @@ void Server::handle_client_mkdir(MClientRequest *req, CInode *diri) newdir->is_auth() && 
!newdir->is_hashing()) { int dest = rand() % mds->mdsmap->get_num_mds(); - if (dest != whoami) { + if (dest != mds->get_nodeid()) { dout(10) << "exporting new dir " << *newdir << " in replicated parent " << *diri->dir << endl; mdcache->migrator->export_dir(newdir, dest); } @@ -2093,7 +2093,7 @@ void Server::handle_client_open(MClientRequest *req, if (mode != FILE_MODE_R && mode != FILE_MODE_LAZY && !cur->is_auth()) { int auth = cur->authority(); - assert(auth != whoami); + assert(auth != mds->get_nodeid()); dout(9) << "open writeable on replica for " << *cur << " fw to auth " << auth << endl; mdcache->request_forward(req, auth); diff --git a/trunk/ceph/mds/Server.h b/trunk/ceph/mds/Server.h index 912af31ca909a..53e917386440e 100644 --- a/trunk/ceph/mds/Server.h +++ b/trunk/ceph/mds/Server.h @@ -23,7 +23,6 @@ class Server { MDCache *mdcache; MDLog *mdlog; Messenger *messenger; - int whoami; __uint64_t stat_ops; @@ -32,7 +31,7 @@ public: Server(MDS *m) : mds(m), mdcache(mds->mdcache), mdlog(mds->mdlog), - messenger(mds->messenger), whoami(mds->get_nodeid()), + messenger(mds->messenger), stat_ops(0) { } diff --git a/trunk/ceph/messages/MClientBoot.h b/trunk/ceph/messages/MClientBoot.h new file mode 100644 index 0000000000000..0b73505642d7d --- /dev/null +++ b/trunk/ceph/messages/MClientBoot.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + + +#ifndef __MCLIENTBOOT_H +#define __MCLIENTBOOT_H + +#include "msg/Message.h" + +class MClientBoot : public Message { + + public: + MClientBoot() : Message(MSG_CLIENT_BOOT) { + } + + char *get_type_name() { return "Cboot"; } + + virtual void decode_payload(crope& s, int& off) { + } + virtual void encode_payload(crope& s) { + } +}; + +#endif diff --git a/trunk/ceph/messages/MClientMount.h b/trunk/ceph/messages/MClientMount.h index fd253baed0f24..0684cea8d95c2 100644 --- a/trunk/ceph/messages/MClientMount.h +++ b/trunk/ceph/messages/MClientMount.h @@ -18,32 +18,16 @@ #include "msg/Message.h" class MClientMount : public Message { - long pcid; - int mkfs; public: MClientMount() : Message(MSG_CLIENT_MOUNT) { - pcid = 0; - mkfs = 0; } - void set_mkfs(int m) { mkfs = m; } - int get_mkfs() { return mkfs; } - - void set_pcid(long pcid) { this->pcid = pcid; } - long get_pcid() { return pcid; } - char *get_type_name() { return "Cmnt"; } virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(pcid), (char*)&pcid); - off += sizeof(pcid); - s.copy(off, sizeof(mkfs), (char*)&mkfs); - off += sizeof(mkfs); } virtual void encode_payload(crope& s) { - s.append((char*)&pcid, sizeof(pcid)); - s.append((char*)&mkfs, sizeof(mkfs)); } }; diff --git a/trunk/ceph/messages/MOSDOp.h b/trunk/ceph/messages/MOSDOp.h index 1297c764402d2..f56ba8a479ed3 100644 --- a/trunk/ceph/messages/MOSDOp.h +++ b/trunk/ceph/messages/MOSDOp.h @@ -206,7 +206,7 @@ private: inline ostream& operator<<(ostream& out, MOSDOp& op) { - return out << "MOSDOp(" << MSG_ADDR_NICE(op.get_client()) << "." << op.get_tid() + return out << "MOSDOp(" << op.get_client() << "." 
<< op.get_tid() << " op " << MOSDOp::get_opname(op.get_op()) << " oid " << hex << op.get_oid() << dec << " " << &op << ")"; } diff --git a/trunk/ceph/mon/ClientMonitor.cc b/trunk/ceph/mon/ClientMonitor.cc new file mode 100644 index 0000000000000..8da75ab066ee0 --- /dev/null +++ b/trunk/ceph/mon/ClientMonitor.cc @@ -0,0 +1,107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "ClientMonitor.h" +#include "Monitor.h" +#include "MDSMonitor.h" + +#include "messages/MClientBoot.h" +#include "messages/MMDSMap.h" +//#include "messages/MMDSFailure.h" + +#include "common/Timer.h" + +#include "config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".client " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? 
(const char*)"(peon)":(const char*)"(?\?)"))) << ".client " + + + + +void ClientMonitor::dispatch(Message *m) +{ + switch (m->get_type()) { + + case MSG_CLIENT_BOOT: + handle_client_boot((MClientBoot*)m); + break; + + /* + case MSG_client_FAILURE: + handle_client_failure((MClientFailure*)m); + break; + */ + + default: + assert(0); + } +} + +void ClientMonitor::handle_client_boot(MClientBoot *m) +{ + dout(7) << "client_boot from " << m->get_source() << " at " << m->get_source_inst() << endl; + assert(m->get_source().is_client()); + int from = m->get_source().num(); + + // choose a client id + if (from < 0 || + (client_map.count(m->get_source()) && client_map[m->get_source()] != m->get_source_inst())) { + from = ++num_clients; + dout(10) << "client_boot assigned client" << from << endl; + } + + client_map[MSG_ADDR_CLIENT(from)] = m->get_source_inst(); + + // reply with latest mds map + mon->mdsmon->send_latest(MSG_ADDR_CLIENT(from), m->get_source_inst()); + delete m; +} + +/* +void ClientMonitor::handle_mds_shutdown(Message *m) +{ + assert(m->get_source().is_mds()); + int from = m->get_source().num(); + + mdsmap.mds_inst.erase(from); + mdsmap.all_mds.erase(from); + + dout(7) << "mds_shutdown from " << m->get_source() + << ", still have " << mdsmap.all_mds + << endl; + + // tell someone? 
+ // fixme + + delete m; +} + +*/ + +/* +void ClientMonitor::bcast_latest_mds() +{ + dout(10) << "bcast_latest_mds " << mdsmap.get_epoch() << endl; + + // tell mds + for (set::iterator p = mdsmap.get_mds().begin(); + p != mdsmap.get_mds().end(); + p++) { + if (mdsmap.is_down(*p)) continue; + send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p)); + } +} + +*/ diff --git a/trunk/ceph/mon/ClientMonitor.h b/trunk/ceph/mon/ClientMonitor.h new file mode 100644 index 0000000000000..09c13adddf5ef --- /dev/null +++ b/trunk/ceph/mon/ClientMonitor.h @@ -0,0 +1,52 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __CLIENTMONITOR_H +#define __CLIENTMONITOR_H + +#include +#include +using namespace std; + +#include "include/types.h" +#include "msg/Messenger.h" + +#include "mds/MDSMap.h" + +class Monitor; + +class ClientMonitor : public Dispatcher { + Monitor *mon; + Messenger *messenger; + Mutex &lock; + + private: + int num_clients; + map client_map; + + void bcast_latest_mds(); + + //void accept_pending(); // accept pending, new map. 
+ //void send_incremental(epoch_t since, msg_addr_t dest); + + void handle_client_boot(class MClientBoot *m); + + public: + ClientMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l), + num_clients(0) { } + + void dispatch(Message *m); + void tick(); // check state, take actions +}; + +#endif diff --git a/trunk/ceph/mon/Elector.cc b/trunk/ceph/mon/Elector.cc index a08d0bd7f87df..5b793777ab3dd 100644 --- a/trunk/ceph/mon/Elector.cc +++ b/trunk/ceph/mon/Elector.cc @@ -34,7 +34,8 @@ void Elector::read_timer() old_views = views; // TODO deep copy for (unsigned i=0; imessenger->send_message(new MMonElectionCollect(read_num), - MSG_ADDR_MON(processes[i])); + MSG_ADDR_MON(processes[i]), + mon->monmap->get_inst(processes[i])); } } lock.Unlock(); @@ -83,7 +84,7 @@ void Elector::refresh_timer() refresh_num++; MMonElectionRefresh *msg = new MMonElectionRefresh(whoami, registry[whoami], refresh_num); for (unsigned i=0; imessenger->send_message(msg, MSG_ADDR_MON(processes[i])); + mon->messenger->send_message(msg, MSG_ADDR_MON(processes[i]), mon->monmap->get_inst(processes[i])); } // Start the trip timer @@ -163,7 +164,8 @@ void Elector::handle_collect(MMonElectionCollect* msg) mon->messenger->send_message(new MMonElectionStatus(msg->get_source().num(), msg->read_num, registry), - msg->get_source()); + msg->get_source(), + mon->monmap->get_inst(msg->get_source().num())); delete msg; } @@ -175,8 +177,9 @@ void Elector::handle_refresh(MMonElectionRefresh* msg) // reply to msg mon->messenger->send_message(new MMonElectionAck(msg->p, - msg->refresh_num), - msg->get_source()); + msg->refresh_num), + msg->get_source(), + mon->monmap->get_inst(msg->get_source().num())); } delete msg; diff --git a/trunk/ceph/mon/MDSMonitor.cc b/trunk/ceph/mon/MDSMonitor.cc index e2e2553670fe7..a31d264b529c4 100644 --- a/trunk/ceph/mon/MDSMonitor.cc +++ b/trunk/ceph/mon/MDSMonitor.cc @@ -73,6 +73,13 @@ void MDSMonitor::handle_mds_boot(MMDSBoot *m) dout(7) << "mds_boot from " << 
m->get_source() << " at " << m->get_source_inst() << endl; assert(m->get_source().is_mds()); int from = m->get_source().num(); + + // choose an MDS id + if (from < 0 || !mdsmap.is_down(from)) { + for (from=0; ; ++from) + if (mdsmap.is_down(from)) break; + dout(10) << "mds_boot assigned mds" << from << endl; + } if (mdsmap.get_epoch() == 0) { // waiting for boot! @@ -156,3 +163,11 @@ void MDSMonitor::send_current() awaiting_map.clear(); } +void MDSMonitor::send_latest(msg_addr_t dest, const entity_inst_t& inst) +{ + // FIXME: check if we're locked, etc. + if (mdsmap.get_epoch() > 0) + send_full(dest, inst); + else + awaiting_map[dest] = inst; +} diff --git a/trunk/ceph/mon/MDSMonitor.h b/trunk/ceph/mon/MDSMonitor.h index 66e28451e1de4..58cb8912f0bf6 100644 --- a/trunk/ceph/mon/MDSMonitor.h +++ b/trunk/ceph/mon/MDSMonitor.h @@ -57,6 +57,8 @@ class MDSMonitor : public Dispatcher { void handle_mds_getmap(class MMDSGetMap *m); void handle_mds_shutdown(Message *m); + + public: MDSMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l) { create_initial(); @@ -64,6 +66,9 @@ class MDSMonitor : public Dispatcher { void dispatch(Message *m); void tick(); // check state, take actions + + void send_latest(msg_addr_t dest, const entity_inst_t& inst); + }; #endif diff --git a/trunk/ceph/mon/Monitor.cc b/trunk/ceph/mon/Monitor.cc index e0462534553d6..acba0e9f5e45a 100644 --- a/trunk/ceph/mon/Monitor.cc +++ b/trunk/ceph/mon/Monitor.cc @@ -31,6 +31,7 @@ #include "OSDMonitor.h" #include "MDSMonitor.h" +#include "ClientMonitor.h" #include "config.h" #undef dout @@ -56,6 +57,7 @@ void Monitor::init() // create osdmon = new OSDMonitor(this, messenger, lock); mdsmon = new MDSMonitor(this, messenger, lock); + clientmon = new ClientMonitor(this, messenger, lock); // i'm ready! 
messenger->set_dispatcher(this); @@ -95,6 +97,7 @@ void Monitor::shutdown() if (monmap) delete monmap; if (osdmon) delete osdmon; if (mdsmon) delete mdsmon; + if (clientmon) delete clientmon; // die. messenger->shutdown(); @@ -154,6 +157,11 @@ void Monitor::dispatch(Message *m) mdsmon->dispatch(m); break; + // clients + case MSG_CLIENT_BOOT: + clientmon->dispatch(m); + break; + // elector messages case MSG_MON_ELECTION_ACK: diff --git a/trunk/ceph/mon/Monitor.h b/trunk/ceph/mon/Monitor.h index 0b8890fcbae3b..9df57cfb23fea 100644 --- a/trunk/ceph/mon/Monitor.h +++ b/trunk/ceph/mon/Monitor.h @@ -24,6 +24,7 @@ class ObjectStore; class OSDMonitor; class MDSMonitor; +class ClientMonitor; class Monitor : public Dispatcher { protected: @@ -75,6 +76,7 @@ protected: // my public services OSDMonitor *osdmon; MDSMonitor *mdsmon; + ClientMonitor *clientmon; // messages void handle_shutdown(Message *m); @@ -82,6 +84,7 @@ protected: friend class OSDMonitor; friend class MDSMonitor; + friend class ClientMonitor; public: Monitor(int w, Messenger *m, MonMap *mm) : @@ -94,8 +97,7 @@ protected: mon_epoch(0), state(STATE_STARTING), leader(0), - osdmon(0), - mdsmon(0) + osdmon(0), mdsmon(0), clientmon(0) { // hack leader, until election works. if (whoami == 0) diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc index 01f6301c2618e..c809a655f5a09 100644 --- a/trunk/ceph/msg/FakeMessenger.cc +++ b/trunk/ceph/msg/FakeMessenger.cc @@ -48,11 +48,13 @@ using namespace __gnu_cxx; // global queue. 
-map directory; +int nranks = 0; // this identify each entity_inst_t + +map directory; hash_map loggers; LogType fakemsg_logtype; -set shutdown_set; +set shutdown_set; Mutex lock; Cond cond; @@ -64,6 +66,7 @@ bool fm_shutdown = false; pthread_t thread_id; + class C_FakeKicker : public Context { void finish(int r) { dout(18) << "timer kick" << endl; @@ -175,15 +178,15 @@ int fakemessenger_do_loop_2() lock.Lock(); // messages - map::iterator it = directory.begin(); + map::iterator it = directory.begin(); while (it != directory.end()) { + FakeMessenger *mgr = it->second; - dout(18) << "messenger " << it->second << " at " << MSG_ADDR_NICE(it->first) << " has " << it->second->num_incoming() << " queued" << endl; + dout(18) << "messenger " << mgr << " at " << mgr->get_myaddr() << " has " << mgr->num_incoming() << " queued" << endl; - FakeMessenger *mgr = it->second; if (!mgr->is_ready()) { - dout(18) << "messenger " << it->second << " at " << MSG_ADDR_NICE(it->first) << " has no dispatcher, skipping" << endl; + dout(18) << "messenger " << mgr << " at " << mgr->get_myaddr() << " has no dispatcher, skipping" << endl; it++; continue; } @@ -194,8 +197,8 @@ int fakemessenger_do_loop_2() if (m) { //dout(18) << "got " << m << endl; dout(1) << "---- '" << m->get_type_name() - << "' from " << MSG_ADDR_NICE(m->get_source()) // << ':' << m->get_source_port() - << " to " << MSG_ADDR_NICE(m->get_dest()) //<< ':' << m->get_dest_port() + << "' from " << m->get_source() // << ':' << m->get_source_port() + << " to " << m->get_dest() //<< ':' << m->get_dest_port() << " ---- " << m << endl; @@ -225,7 +228,7 @@ int fakemessenger_do_loop_2() // deal with shutdowns.. 
dleayed to avoid concurrent directory modification if (!shutdown_set.empty()) { - for (set::iterator it = shutdown_set.begin(); + for (set::iterator it = shutdown_set.begin(); it != shutdown_set.end(); it++) { dout(7) << "fakemessenger: removing " << *it << " from directory" << endl; @@ -252,12 +255,21 @@ int fakemessenger_do_loop_2() FakeMessenger::FakeMessenger(msg_addr_t me) : Messenger(me) { - myaddr = me; + entity_inst_t fakeinst; lock.Lock(); - directory[ myaddr ] = this; + { + // assign rank + fakeinst.addr.sin_port = + fakeinst.rank = nranks++; + set_myinst(fakeinst); + + // add to directory + directory[ fakeinst.rank ] = this; + } lock.Unlock(); - cout << "fakemessenger " << myaddr << " messenger is " << this << endl; + + cout << "fakemessenger " << get_myaddr() << " messenger is " << this << " at " << fakeinst << endl; //g_timer.set_messenger(this); @@ -287,8 +299,8 @@ int FakeMessenger::shutdown() { //cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl; lock.Lock(); - assert(directory.count(myaddr) == 1); - shutdown_set.insert(myaddr); + assert(directory.count(get_myinst().rank) == 1); + shutdown_set.insert(get_myinst().rank); /* directory.erase(myaddr); @@ -321,14 +333,20 @@ void FakeMessenger::trigger_timer(Timer *t) } */ -int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport) +void FakeMessenger::reset_myaddr(msg_addr_t m) +{ + dout(1) << "reset_myaddr from " << get_myaddr() << " to " << m << endl; + _set_myaddr(m); +} + + +int FakeMessenger::send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst, int port, int fromport) { - m->set_source(myaddr, fromport); + m->set_source(get_myaddr(), fromport); m->set_dest(dest, port); //m->set_lamport_send_stamp( get_lamport() ); - entity_inst_t blank; - m->set_source_inst(blank); + m->set_source_inst(get_myinst()); lock.Lock(); @@ -336,26 +354,26 @@ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromp try 
{ #ifdef LOG_MESSAGES // stats - loggers[myaddr]->inc("+send",1); + loggers[get_myaddr()]->inc("+send",1); loggers[dest]->inc("-recv",1); char s[20]; sprintf(s,"+%s", m->get_type_name()); - loggers[myaddr]->inc(s); + loggers[get_myaddr()]->inc(s); sprintf(s,"-%s", m->get_type_name()); loggers[dest]->inc(s); #endif // queue - FakeMessenger *dm = directory[dest]; + FakeMessenger *dm = directory[inst.rank]; if (!dm) { - dout(1) << "** destination " << MSG_ADDR_NICE(dest) << " (" << dest << ") dne" << endl; + dout(1) << "** destination " << dest << " (" << inst << ") dne" << endl; assert(dm); } dm->queue_incoming(m); - dout(1) << "--> " << myaddr << " sending " << m << " '" << m->get_type_name() << "'" - << " to " << MSG_ADDR_NICE(dest) + dout(1) << "--> " << get_myaddr() << " sending " << m << " '" << m->get_type_name() << "'" + << " to " << dest << endl;//" m " << dm << " has " << dm->num_incoming() << " queued" << endl; } diff --git a/trunk/ceph/msg/FakeMessenger.h b/trunk/ceph/msg/FakeMessenger.h index 51bec779c4366..7833f224f8bbd 100644 --- a/trunk/ceph/msg/FakeMessenger.h +++ b/trunk/ceph/msg/FakeMessenger.h @@ -26,8 +26,6 @@ class Timer; class FakeMessenger : public Messenger { protected: - msg_addr_t myaddr; - class Logger *logger; int qlen; @@ -39,8 +37,10 @@ class FakeMessenger : public Messenger { virtual int shutdown(); + void reset_myaddr(msg_addr_t m); + // msg interface - virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0); + virtual int send_message(Message *m, msg_addr_t dest, entity_inst_t inst, int port=0, int fromport=0); // events //virtual void trigger_timer(Timer *t); diff --git a/trunk/ceph/msg/Message.cc b/trunk/ceph/msg/Message.cc index b37c4d2cb421d..2ce6b8bbf1272 100644 --- a/trunk/ceph/msg/Message.cc +++ b/trunk/ceph/msg/Message.cc @@ -41,6 +41,7 @@ using namespace std; #include "messages/MOSDPGLog.h" #include "messages/MOSDPGRemove.h" +#include "messages/MClientBoot.h" #include "messages/MClientMount.h" 
#include "messages/MClientMountAck.h" #include "messages/MClientRequest.h" @@ -218,6 +219,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload) break; // clients + case MSG_CLIENT_BOOT: + m = new MClientBoot(); + break; case MSG_CLIENT_MOUNT: m = new MClientMount(); break; diff --git a/trunk/ceph/msg/Message.h b/trunk/ceph/msg/Message.h index afe1ae6941844..5f41453e9ec40 100644 --- a/trunk/ceph/msg/Message.h +++ b/trunk/ceph/msg/Message.h @@ -79,9 +79,10 @@ #define MSG_CLIENT_FILECAPS 63 #define MSG_CLIENT_INODEAUTHUPDATE 64 -#define MSG_CLIENT_MOUNT 70 -#define MSG_CLIENT_MOUNTACK 71 -#define MSG_CLIENT_UNMOUNT 72 +#define MSG_CLIENT_BOOT 70 +#define MSG_CLIENT_MOUNT 71 +#define MSG_CLIENT_MOUNTACK 72 +#define MSG_CLIENT_UNMOUNT 73 // *** MDS *** @@ -183,53 +184,26 @@ using __gnu_cxx::crope; // use fixed offsets and static entity -> logical addr mapping! #define MSG_ADDR_NAMER_BASE 0 -#define MSG_ADDR_RANK_BASE 0x10000000 // per-rank messenger services -#define MSG_ADDR_MDS_BASE 0x20000000 -#define MSG_ADDR_OSD_BASE 0x30000000 -#define MSG_ADDR_MON_BASE 0x40000000 -#define MSG_ADDR_CLIENT_BASE 0x50000000 +#define MSG_ADDR_RANK_BASE 1 +#define MSG_ADDR_MDS_BASE 2 +#define MSG_ADDR_OSD_BASE 3 +#define MSG_ADDR_MON_BASE 4 +#define MSG_ADDR_CLIENT_BASE 5 -#define MSG_ADDR_TYPE_MASK 0xf0000000 -#define MSG_ADDR_NUM_MASK 0x0fffffff +#define MSG_ADDR_NEW -1 -#define MSG_ADDR_NEW 0x0fffffff -#define MSG_ADDR_UNDEF_BASE 0xffffffff - - -/* old int way, which lacked type safety... 
-typedef int msg_addr_t; - -#define MSG_ADDR_RANK(x) (MSG_ADDR_RANK_BASE + (x)) -#define MSG_ADDR_MDS(x) (MSG_ADDR_MDS_BASE + (x)) -#define MSG_ADDR_OSD(x) (MSG_ADDR_OSD_BASE + (x)) -#define MSG_ADDR_CLIENT(x) (MSG_ADDR_CLIENT_BASE + (x)) - -#define MSG_ADDR_DIRECTORY 0 -#define MSG_ADDR_RANK_NEW MSG_ADDR_RANK(MSG_ADDR_NEW) -#define MSG_ADDR_MDS_NEW MSG_ADDR_MDS(MSG_ADDR_NEW) -#define MSG_ADDR_OSD_NEW MSG_ADDR_OSD(MSG_ADDR_NEW) -#define MSG_ADDR_CLIENT_NEW MSG_ADDR_CLIENT(MSG_ADDR_NEW) - -#define MSG_ADDR_ISCLIENT(x) ((x) >= MSG_ADDR_CLIENT_BASE) -#define MSG_ADDR_TYPE(x) (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_RANK_BASE ? "rank": \ - (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_CLIENT_BASE ? "client": \ - (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_OSD_BASE ? "osd": \ - (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_MDS_BASE ? "mds": \ - ((x) == MSG_ADDR_DIRECTORY ? "namer":"unknown"))))) -#define MSG_ADDR_NUM(x) ((x) & MSG_ADDR_NUM_MASK) -#define MSG_ADDR_NICE(x) MSG_ADDR_TYPE(x) << MSG_ADDR_NUM(x) -*/ // new typed msg_addr_t way! 
class msg_addr_t { public: - int _addr; + int _type; + int _num; - msg_addr_t() : _addr(MSG_ADDR_UNDEF_BASE) {} - msg_addr_t(int t, int n) : _addr(t | n) {} + msg_addr_t() : _type(0), _num(0) {} + msg_addr_t(int t, int n) : _type(t), _num(n) {} - int num() const { return _addr & MSG_ADDR_NUM_MASK; } - int type() const { return _addr & MSG_ADDR_TYPE_MASK; } + int num() const { return _num; } + int type() const { return _type; } const char *type_str() const { switch (type()) { case MSG_ADDR_RANK_BASE: return "rank"; @@ -251,25 +225,25 @@ public: bool is_namer() const { return type() == MSG_ADDR_NAMER_BASE; } }; -inline bool operator== (const msg_addr_t& l, const msg_addr_t& r) { return l._addr == r._addr; } -inline bool operator!= (const msg_addr_t& l, const msg_addr_t& r) { return l._addr != r._addr; } -inline bool operator< (const msg_addr_t& l, const msg_addr_t& r) { return l._addr < r._addr; } - -//typedef struct msg_addr msg_addr_t; +inline bool operator== (const msg_addr_t& l, const msg_addr_t& r) { return (l._type == r._type) && (l._num == r._num); } +inline bool operator!= (const msg_addr_t& l, const msg_addr_t& r) { return (l._type != r._type) || (l._num != r._num); } +inline bool operator< (const msg_addr_t& l, const msg_addr_t& r) { return (l._type < r._type) || (l._type == r._type && l._num < r._num); } inline std::ostream& operator<<(std::ostream& out, const msg_addr_t& addr) { //if (addr.is_namer()) return out << "namer"; - return out << addr.type_str() << addr.num(); + if (addr.is_new() || addr.num() < 0) + return out << addr.type_str() << "?"; + else + return out << addr.type_str() << addr.num(); } - namespace __gnu_cxx { template<> struct hash< msg_addr_t > { size_t operator()( const msg_addr_t m ) const { static hash H; - return H(m._addr); + return H(m.type() ^ m.num()); } }; } @@ -290,18 +264,11 @@ namespace __gnu_cxx { #define MSG_ADDR_CLIENT_NEW MSG_ADDR_CLIENT(MSG_ADDR_NEW) #define MSG_ADDR_NAMER_NEW MSG_ADDR_NAMER(MSG_ADDR_NEW) -#define 
MSG_ADDR_ISCLIENT(x) x.is_client() -#define MSG_ADDR_TYPE(x) x.type_str() -#define MSG_ADDR_NUM(x) x.num() -#define MSG_ADDR_NICE(x) x.type_str() << x.num() - - - class entity_inst_t { public: tcpaddr_t addr; - int rank; + __int64_t rank; entity_inst_t() : rank(-1) { memset(&addr, 0, sizeof(addr)); @@ -309,6 +276,14 @@ class entity_inst_t { entity_inst_t(tcpaddr_t& a, int r) : addr(a), rank(r) { memset(&addr, 0, sizeof(addr)); } + + void set_addr(tcpaddr_t a) { + addr = a; + + // figure out rank + rank = *((unsigned*)&a.sin_addr.s_addr); + rank |= (__uint64_t)a.sin_port << 32; + } }; inline bool operator==(const entity_inst_t& a, const entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; } @@ -320,7 +295,8 @@ inline bool operator<=(const entity_inst_t& a, const entity_inst_t& b) { return inline ostream& operator<<(ostream& out, const entity_inst_t &i) { - return out << "rank" << i.rank << "_" << i.addr; + //return out << "rank" << i.rank << "_" << i.addr; + return out << i.addr; } @@ -413,7 +389,7 @@ public: int get_source_port() { return env.source_port; } entity_inst_t& get_source_inst() { return env.source_inst; } - void set_source_inst(entity_inst_t &i) { env.source_inst = i; } + void set_source_inst(const entity_inst_t &i) { env.source_inst = i; } // PAYLOAD ---- void reset_payload() { @@ -449,7 +425,7 @@ public: } virtual void print(ostream& out) { - out << "message(type=" << get_type() << ")"; + out << get_type_name(); } }; diff --git a/trunk/ceph/msg/Messenger.h b/trunk/ceph/msg/Messenger.h index 4ec3349a2a096..85ef499eb97bb 100644 --- a/trunk/ceph/msg/Messenger.h +++ b/trunk/ceph/msg/Messenger.h @@ -36,14 +36,20 @@ class Messenger { private: Dispatcher *dispatcher; msg_addr_t _myaddr; + entity_inst_t _myinst; public: Messenger(msg_addr_t w) : dispatcher(0), _myaddr(w) { } virtual ~Messenger() { } - void set_myaddr(msg_addr_t m) { _myaddr = m; } + const entity_inst_t &get_myinst() { return _myinst; } + void set_myinst(entity_inst_t& v) { _myinst 
= v; } + msg_addr_t get_myaddr() { return _myaddr; } + void _set_myaddr(msg_addr_t m) { _myaddr = m; } + + virtual void reset_myaddr(msg_addr_t m) = 0; virtual int shutdown() = 0; @@ -68,11 +74,9 @@ class Messenger { // send message virtual void prepare_dest(const entity_inst_t& inst) {} - virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; - virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst, - int port=0, int fromport=0) { - return send_message(m, dest, port, fromport); // overload me! - } + //virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; + virtual int send_message(Message *m, msg_addr_t dest, entity_inst_t inst, + int port=0, int fromport=0) = 0; // make a procedure call diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc new file mode 100644 index 0000000000000..9a6ae8f558fe2 --- /dev/null +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -0,0 +1,1210 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "SimpleMessenger.h" + +#include +#include +#include +#include + +#include "config.h" + +#include "messages/MGenericMessage.h" +#include "messages/MNSConnect.h" +#include "messages/MNSConnectAck.h" +#include "messages/MNSRegister.h" +#include "messages/MNSRegisterAck.h" +#include "messages/MNSLookup.h" +#include "messages/MNSLookupReply.h" +#include "messages/MNSFailure.h" + +//#include "messages/MFailure.h" + +#include + + +#undef dout +#define dout(l) if (l<=g_conf.debug_ms) cout << g_clock.now() << " -- " << rank.my_inst.addr << " " +#define derr(l) if (l<=g_conf.debug_ms) cerr << g_clock.now() << " -- " << rank.my_inst.addr << " " + + + +#include "tcp.cc" + + +Rank rank; + + + +/******************************************** + * Accepter + */ + +int Rank::Accepter::start() +{ + // bind to a socket + dout(10) << "accepter.start binding to listen " << endl; + + /* socket creation */ + listen_sd = socket(AF_INET,SOCK_STREAM,0); + assert(listen_sd > 0); + + /* bind to port */ + memset((char*)&listen_addr, 0, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); + listen_addr.sin_port = 0; + + int rc = bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)); + assert(rc >= 0); + + socklen_t llen = sizeof(listen_addr); + getsockname(listen_sd, (sockaddr*)&listen_addr, &llen); + + int myport = listen_addr.sin_port; + + // listen! + rc = ::listen(listen_sd, 1000); + assert(rc >= 0); + + //dout(10) << "accepter.start listening on " << myport << endl; + + // my address is... 
+ char host[100]; + bzero(host, 100); + gethostname(host, 100); + //dout(10) << "accepter.start my hostname is " << host << endl; + + struct hostent *myhostname = gethostbyname( host ); + + struct sockaddr_in my_addr; + memset(&my_addr, 0, sizeof(my_addr)); + + my_addr.sin_family = myhostname->h_addrtype; + memcpy((char *) &my_addr.sin_addr.s_addr, + myhostname->h_addr_list[0], + myhostname->h_length); + my_addr.sin_port = myport; + + listen_addr = my_addr; + + dout(10) << "accepter.start listen addr is " << listen_addr << endl; + + // start thread + create(); + + return 0; +} + +void *Rank::Accepter::entry() +{ + dout(10) << "accepter starting" << endl; + + while (!done) { + // accept + struct sockaddr_in addr; + socklen_t slen = sizeof(addr); + int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen); + if (sd > 0) { + dout(10) << "accepted incoming on sd " << sd << endl; + + rank.lock.Lock(); + Pipe *p = new Pipe(sd); + rank.pipes.insert(p); + rank.lock.Unlock(); + } else { + dout(10) << "no incoming connection?" << endl; + break; + } + } + + return 0; +} + + + +/************************************** + * Pipe + */ + +int Rank::Pipe::accept() +{ + // my creater gave me sd via accept() + + // announce myself. + int rc = tcp_write(sd, (char*)&rank.my_inst, sizeof(rank.my_inst)); + if (rc < 0) { + ::close(sd); + done = true; + return -1; + } + + // identify peer + rc = tcp_read(sd, (char*)&peer_inst, sizeof(peer_inst)); + if (rc < 0) { + dout(10) << "pipe(? " << this << ").accept couldn't read peer inst" << endl; + ::close(sd); + done = true; + return -1; + } + + // create writer thread. + writer_running = true; + writer_thread.create(); + + // register pipe. + if (peer_inst.rank >= 0) { + rank.lock.Lock(); + { + if (rank.rank_pipe.count(peer_inst.rank) == 0) { + // install a pipe! 
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst << endl; + rank.rank_pipe[peer_inst.rank] = this; + } else { + // low ranks' Pipes "win" + if (peer_inst.rank < rank.my_inst.rank || + rank.my_inst.rank < 0) { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst + << ", already had pipe, but switching to this new one" << endl; + // switch to this new Pipe + rank.rank_pipe[peer_inst.rank]->close(); // close old one + rank.rank_pipe[peer_inst.rank] = this; + } else { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst + << ", already had pipe, sticking with it" << endl; + } + } + } + rank.lock.Unlock(); + } else { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is unranked " << peer_inst << endl; + } + + return 0; // success. +} + +int Rank::Pipe::connect() +{ + dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect" << endl; + + // create socket? + sd = socket(AF_INET,SOCK_STREAM,0); + assert(sd > 0); + + // bind any port + struct sockaddr_in myAddr; + myAddr.sin_family = AF_INET; + myAddr.sin_addr.s_addr = htonl(INADDR_ANY); + myAddr.sin_port = htons( 0 ); + + int rc = bind(sd, (struct sockaddr *) &myAddr, sizeof(myAddr)); + assert(rc>=0); + + // connect! + rc = ::connect(sd, (sockaddr*)&peer_inst.addr, sizeof(myAddr)); + if (rc < 0) return rc; + + // identify peer + entity_inst_t inst; + rc = tcp_read(sd, (char*)&inst, sizeof(inst)); + if (inst.rank < 0) + inst = peer_inst; // i know better than they do. 
+ if (peer_inst != inst && inst.rank > 0) { + derr(0) << "pipe(" << peer_inst << ' ' << this << ").connect peer is " << inst << ", wtf" << endl; + assert(0); + return -1; + } + + // identify myself + rc = tcp_write(sd, (char*)&rank.my_inst, sizeof(rank.my_inst)); + if (rc < 0) + return -1; + + // register pipe + rank.lock.Lock(); + { + if (rank.rank_pipe.count(peer_inst.rank) == 0) { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect registering pipe" << endl; + rank.rank_pipe[peer_inst.rank] = this; + } else { + // this is normal. + dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect pipe already registered." << endl; + } + } + rank.lock.Unlock(); + + // start reader + reader_running = true; + reader_thread.create(); + + return 0; +} + + +void Rank::Pipe::close() +{ + if (sent_close) { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").close already closing" << endl; + return; + } + dout(10) << "pipe(" << peer_inst << ' ' << this << ").close" << endl; + + // unreg ourselves + rank.lock.Lock(); + { + if (rank.rank_pipe.count(peer_inst.rank) && + rank.rank_pipe[peer_inst.rank] == this) { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").close unregistering pipe" << endl; + rank.rank_pipe.erase(peer_inst.rank); + } + } + rank.lock.Unlock(); + + // queue close message. + if (socket_error) { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, socket error" << endl; + } else { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").close queueing MSG_CLOSE" << endl; + lock.Lock(); + q.push_back(new MGenericMessage(MSG_CLOSE)); + cond.Signal(); + sent_close = true; + lock.Unlock(); + } +} + + +/* read msgs from socket. + * also, server. + * + */ +void Rank::Pipe::reader() +{ + if (server) + accept(); + + // loop. 
+ while (!done) { + Message *m = read_message(); + if (!m || m->get_type() == 0) { + if (m) { + delete m; + dout(10) << "pipe(" << peer_inst << ' ' << this << ").reader read MSG_CLOSE message" << endl; + } else { + derr(10) << "pipe(" << peer_inst << ' ' << this << ").reader read null message" << endl; + } + + if (!sent_close) + close(); + + done = true; + cond.Signal(); // wake up writer too. + break; + } + + dout(10) << "pipe(" << peer_inst << ' ' << this << ").reader got message for " << m->get_dest() << endl; + + EntityMessenger *entity = 0; + + rank.lock.Lock(); + { + if (rank.entity_map.count(m->get_source()) && + rank.entity_map[m->get_source()] > m->get_source_inst()) { + derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader source " << m->get_source() + << " inst " << m->get_source_inst() + << " > " << rank.entity_map[m->get_source()] + << ", WATCH OUT " << *m << endl; + assert(0); + } + + if (g_conf.ms_single_dispatch) { + // submit to single dispatch queue + rank._submit_single_dispatch(m); + } else { + if (rank.local.count(m->get_dest())) { + // find entity + entity = rank.local[m->get_dest()]; + } else { + entity = rank.find_unnamed(m->get_dest()); + if (!entity) { + derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << endl; + assert(0); // FIXME do this differently + } + } + } + } + rank.lock.Unlock(); + + if (entity) + entity->queue_message(m); // queue + } + + + // reap? + bool reap = false; + lock.Lock(); + { + reader_running = false; + if (!writer_running) reap = true; + } + lock.Unlock(); + + if (reap) { + dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader queueing for reap" << endl; + ::close(sd); + rank.lock.Lock(); + { + rank.pipe_reap_queue.push_back(this); + rank.wait_cond.Signal(); + } + rank.lock.Unlock(); + } +} + + +/* write msgs to socket. + * also, client. 
+ */ +void Rank::Pipe::writer() +{ + if (!server) { + int rc = connect(); + if (rc < 0) { + derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error connecting" << endl; + done = true; + list out; + fail(out); + } + } + + // loop. + lock.Lock(); + while (!q.empty() || !done) { + + if (!q.empty()) { + dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer grabbing message(s)" << endl; + + // grab outgoing list + list out; + out.swap(q); + + // drop lock while i send these + lock.Unlock(); + + while (!out.empty()) { + Message *m = out.front(); + out.pop_front(); + + dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sending " << *m << endl; + + // stamp. + m->set_source_inst(rank.my_inst); + + // marshall + if (m->empty_payload()) + m->encode_payload(); + + if (write_message(m) < 0) { + // failed! + derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest() << endl; + out.push_front(m); + fail(out); + done = true; + break; + } + + // did i just send a close? + if (m->get_type() == MSG_CLOSE) + done = true; + + // clean up + delete m; + } + + lock.Lock(); + continue; + } + + // wait + dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sleeping" << endl; + cond.Wait(lock); + } + lock.Unlock(); + + dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer finishing" << endl; + + // reap? 
+ bool reap = false; + lock.Lock(); + { + writer_running = false; + if (!reader_running) reap = true; + } + lock.Unlock(); + + if (reap) { + dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer queueing for reap" << endl; + ::close(sd); + rank.lock.Lock(); + { + rank.pipe_reap_queue.push_back(this); + rank.wait_cond.Signal(); + } + rank.lock.Unlock(); + } +} + + +Message *Rank::Pipe::read_message() +{ + // envelope + //dout(10) << "receiver.read_message from sd " << sd << endl; + + msg_envelope_t env; + if (!tcp_read( sd, (char*)&env, sizeof(env) )) { + socket_error = true; + return 0; + } + + dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader got envelope type=" << env.type + << " src " << env.source << " dst " << env.dest + << " nchunks=" << env.nchunks + << endl; + + // payload + bufferlist blist; + for (int i=0; iget_source() << endl; + + return m; +} + + + +int Rank::Pipe::write_message(Message *m) +{ + // get envelope, buffers + msg_envelope_t *env = &m->get_envelope(); + bufferlist blist; + blist.claim( m->get_payload() ); + +#ifdef TCP_KEEP_CHUNKS + env->nchunks = blist.buffers().size(); +#else + env->nchunks = 1; +#endif + + dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sending " << m << " " << *m + << " to " << m->get_dest() + << endl; + + // send envelope + int r = tcp_write( sd, (char*)env, sizeof(*env) ); + if (r < 0) { + derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error sending envelope for " << *m + << " to " << m->get_dest() << endl; + socket_error = true; + return -1; + } + + // payload +#ifdef TCP_KEEP_CHUNKS + // send chunk-wise + int i = 0; + for (list::const_iterator it = blist.buffers().begin(); + it != blist.buffers().end(); + it++) { + dout(10) << "pipe(" << peer_inst << ' ' << this << ").writer tcp_sending frag " << i << " len " << (*it).length() << endl; + int size = (*it).length(); + r = tcp_write( sd, (char*)&size, sizeof(size) ); + if (r < 0) { + derr(10) << "pipe(" << peer_inst << ' ' << 
this << ").writer error sending chunk len for " << *m << " to " << m->get_dest() << endl; + socket_error = true; + return -1; + } + r = tcp_write( sd, (*it).c_str(), size ); + if (r < 0) { + derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data chunk for " << *m << " to " << m->get_dest() << endl; + socket_error = true; + return -1; + } + i++; + } +#else + // one big chunk + int size = blist.length(); + r = tcp_write( sd, (char*)&size, sizeof(size) ); + if (r < 0) { + derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data len for " << *m << " to " << m->get_dest() << endl; + socket_error = true; + return -1; + } + dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer data len is " << size << " in " << blist.buffers().size() << " buffers" << endl; + + for (list::const_iterator it = blist.buffers().begin(); + it != blist.buffers().end(); + it++) { + if ((*it).length() == 0) continue; // blank buffer. + r = tcp_write( sd, (char*)(*it).c_str(), (*it).length() ); + if (r < 0) { + derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data megachunk for " << *m << " to " << m->get_dest() << " : len " << (*it).length() << endl; + socket_error = true; + return -1; + } + } +#endif + + return 0; +} + + +void Rank::Pipe::fail(list& out) +{ + derr(10) << "pipe(" << peer_inst << ' ' << this << ").fail" << endl; + + // FIXME: possible race before i reclaim lock here? + + // deactivate myself + rank.lock.Lock(); + { + if (rank.rank_pipe.count(peer_inst.rank) && + rank.rank_pipe[peer_inst.rank] == this) + rank.rank_pipe.erase(peer_inst.rank); + } + rank.lock.Unlock(); + + // what do i do about reader()? FIXME + + // sort my messages by (source) dispatcher, dest. 
+ map > > by_dis; + lock.Lock(); + { + // include out at front of queue + q.splice(q.begin(), out); + + // sort + while (!q.empty()) { + if (q.front()->get_type() == MSG_CLOSE) { + delete q.front(); + } + else if (rank.local.count(q.front()->get_source())) { + EntityMessenger *mgr = rank.local[q.front()->get_source()]; + Dispatcher *dis = mgr->get_dispatcher(); + if (mgr->is_stopped()) { + // ignore. + dout(1) << "pipe(" << peer_inst << ' ' << this << ").fail on " << *q.front() << ", dispatcher stopping, ignoring." << endl; + delete q.front(); + } else { + by_dis[dis][q.front()->get_dest()].push_back(q.front()); + } + } + else { + // oh well. sending entity musta just shut down? + assert(0); + delete q.front(); + } + q.pop_front(); + } + } + lock.Unlock(); + + // report failure(s) to dispatcher(s) + for (map > >::iterator i = by_dis.begin(); + i != by_dis.end(); + ++i) + for (map >::iterator j = i->second.begin(); + j != i->second.end(); + ++j) + for (list::iterator k = j->second.begin(); + k != j->second.end(); + ++k) { + derr(1) << "pipe(" << peer_inst << ' ' << this << ").fail on " << **k << " to " << j->first << " inst " << peer_inst << endl; + i->first->ms_handle_failure(*k, j->first, peer_inst); + } +} + + + + + + +/******************************************** + * Rank + */ + +Rank::Rank() : + single_dispatcher(this) { +} +Rank::~Rank() +{ +} + + +void Rank::_submit_single_dispatch(Message *m) +{ + assert(lock.is_locked()); + + if (local.count(m->get_dest()) && + local[m->get_dest()]->is_ready()) { + rank.single_dispatch_queue.push_back(m); + rank.single_dispatch_cond.Signal(); + } else { + waiting_for_ready[m->get_dest()].push_back(m); + } +} + + +void Rank::single_dispatcher_entry() +{ + lock.Lock(); + while (!single_dispatch_stop || !single_dispatch_queue.empty()) { + if (!single_dispatch_queue.empty()) { + list ls; + ls.swap(single_dispatch_queue); + + lock.Unlock(); + { + while (!ls.empty()) { + Message *m = ls.front(); + ls.pop_front(); + + dout(1) << 
m->get_dest() + << " <-- " << m->get_source() << " " << m->get_source_inst() + << " ---- " << *m + << " -- " << m + << endl; + + assert(local.count(m->get_dest())); + local[m->get_dest()]->dispatch(m); + } + } + lock.Lock(); + continue; + } + single_dispatch_cond.Wait(lock); + } + lock.Unlock(); +} + + +/* + * note: assumes lock is held + */ +void Rank::reaper() +{ + dout(10) << "reaper" << endl; + assert(lock.is_locked()); + + while (!pipe_reap_queue.empty()) { + Pipe *p = pipe_reap_queue.front(); + dout(10) << "reaper reaping pipe " << p->get_peer_inst() << endl; + pipe_reap_queue.pop_front(); + assert(pipes.count(p)); + pipes.erase(p); + p->join(); + dout(10) << "reaper reaped pipe " << p->get_peer_inst() << endl; + delete p; + } +} + + +int Rank::start_rank() +{ + dout(10) << "start_rank" << endl; + + // bind to a socket + if (accepter.start() < 0) + return -1; + + // start single thread dispatcher? + if (g_conf.ms_single_dispatch) { + single_dispatch_stop = false; + single_dispatcher.create(); + } + + lock.Lock(); + + // my_inst + my_inst.set_addr( accepter.listen_addr ); + + dout(1) << "start_rank at " << my_inst << endl; + + lock.Unlock(); + return 0; +} + + + +/* connect_rank + * NOTE: assumes rank.lock held. 
+ */ +Rank::Pipe *Rank::connect_rank(const entity_inst_t& inst) +{ + assert(rank.lock.is_locked()); + assert(inst != rank.my_inst); + + dout(10) << "connect_rank to " << inst << endl; + + // create pipe + Pipe *pipe = new Pipe(inst); + rank.rank_pipe[inst.rank] = pipe; + pipes.insert(pipe); + + return pipe; +} + + + + + +void Rank::show_dir() +{ + dout(10) << "show_dir ---" << endl; + + for (hash_map::iterator i = entity_map.begin(); + i != entity_map.end(); + i++) { + if (local.count(i->first)) { + dout(10) << "show_dir entity_map " << i->first << " -> " << i->second << " local " << endl; + } else { + dout(10) << "show_dir entity_map " << i->first << " -> " << i->second << endl; + } + } +} + +Rank::EntityMessenger *Rank::find_unnamed(msg_addr_t a) +{ + // find an unnamed local entity of the right type + for (map::iterator p = local.begin(); + p != local.end(); + ++p) { + if (p->first.type() == a.type() && p->first.is_new()) + return p->second; + } + return 0; +} + + + + +/* register_entity + */ +Rank::EntityMessenger *Rank::register_entity(msg_addr_t addr) +{ + dout(10) << "register_entity " << addr << endl; + lock.Lock(); + + // create messenger + EntityMessenger *msgr = new EntityMessenger(addr); + + // add to directory + entity_map[addr] = my_inst; + local[addr] = msgr; + + lock.Unlock(); + return msgr; +} + + +void Rank::unregister_entity(EntityMessenger *msgr) +{ + lock.Lock(); + dout(10) << "unregister_entity " << msgr->get_myaddr() << endl; + + // remove from local directory. + assert(local.count(msgr->get_myaddr())); + local.erase(msgr->get_myaddr()); + assert(entity_map.count(msgr->get_myaddr())); + entity_map.erase(msgr->get_myaddr()); + + wait_cond.Signal(); + + lock.Unlock(); +} + + +void Rank::submit_message(Message *m, const entity_inst_t& dest_inst) +{ + const msg_addr_t dest = m->get_dest(); + + // lookup + EntityMessenger *entity = 0; + Pipe *pipe = 0; + + lock.Lock(); + { + // local? 
+ if (dest_inst.rank == my_inst.rank) { + if (local.count(dest)) { + // local + dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl; + if (g_conf.ms_single_dispatch) { + _submit_single_dispatch(m); + } else { + entity = local[dest]; + } + } else { + derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_inst << " local but not in local map?" << endl; + assert(0); // hmpf + } + } + else { + // remote. + if (rank_pipe.count( dest_inst.rank )) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", already connected." << endl; + // connected. + pipe = rank_pipe[ dest_inst.rank ]; + } else { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connecting." << endl; + // not connected. + pipe = connect_rank( dest_inst ); + } + } + } + lock.Unlock(); + + // do it + if (entity) { + // local! + dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl; + entity->queue_message(m); + } + else if (pipe) { + // remote! + dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl; + pipe->send(m); + } +} + + + + + +void Rank::wait() +{ + lock.Lock(); + while (1) { + // reap dead pipes + reaper(); + + if (local.empty()) { + dout(10) << "wait: everything stopped" << endl; + break; // everything stopped. + } + + wait_cond.Wait(lock); + } + lock.Unlock(); + + // done! clean up. 
+ + // stop dispatch thread + if (g_conf.ms_single_dispatch) { + dout(10) << "wait: stopping dispatch thread" << endl; + lock.Lock(); + single_dispatch_stop = true; + single_dispatch_cond.Signal(); + lock.Unlock(); + single_dispatcher.join(); + } + + // reap pipes + lock.Lock(); + { + dout(10) << "wait: closing pipes" << endl; + list toclose; + for (hash_map<__int64_t,Pipe*>::iterator i = rank_pipe.begin(); + i != rank_pipe.end(); + i++) + toclose.push_back(i->second); + for (list::iterator i = toclose.begin(); + i != toclose.end(); + i++) + (*i)->close(); + + dout(10) << "wait: waiting for pipes " << pipes << " to close" << endl; + while (!pipes.empty()) { + wait_cond.Wait(lock); + reaper(); + } + } + lock.Unlock(); + + dout(10) << "wait: done." << endl; +} + + + +int Rank::find_ns_addr(tcpaddr_t &nsa) +{ + // file? + int fd = ::open(".ceph_ns",O_RDONLY); + if (fd > 0) { + ::read(fd, (void*)&nsa, sizeof(nsa)); + ::close(fd); + cout << "ceph ns is " << nsa << endl; + return 0; + } + + // env var? 
+ char *nsaddr = getenv("CEPH_NAMESERVER");////envz_entry(*envp, e_len, "CEPH_NAMESERVER"); + if (nsaddr) { + while (nsaddr[0] != '=') nsaddr++; + nsaddr++; + + if (tcp_hostlookup(nsaddr, nsa) < 0) { + cout << "can't resolve " << nsaddr << endl; + return -1; + } + + cout << "ceph ns is " << nsa << endl; + return 0; + } + + cerr << "i can't find ceph ns addr in .ceph_ns or CEPH_NAMESERVER" << endl; + return -1; +} + + + +/********************************** + * EntityMessenger + */ + +Rank::EntityMessenger::EntityMessenger(msg_addr_t myaddr) : + Messenger(myaddr), + stop(false), + dispatch_thread(this) +{ + set_myinst(rank.my_inst); +} +Rank::EntityMessenger::~EntityMessenger() +{ +} + +void Rank::EntityMessenger::dispatch_entry() +{ + lock.Lock(); + while (!stop) { + if (!dispatch_queue.empty()) { + list ls; + ls.swap(dispatch_queue); + + lock.Unlock(); + { + // deliver + while (!ls.empty()) { + Message *m = ls.front(); + ls.pop_front(); + dout(1) << m->get_dest() + << " <-- " << m->get_source() << " " << m->get_source_inst() + << " ---- " << *m + << " -- " << m + << endl; + dispatch(m); + } + } + lock.Lock(); + continue; + } + cond.Wait(lock); + } + lock.Unlock(); +} + +void Rank::EntityMessenger::ready() +{ + dout(10) << "ready " << get_myaddr() << endl; + + if (g_conf.ms_single_dispatch) { + rank.lock.Lock(); + if (rank.waiting_for_ready.count(get_myaddr())) { + rank.single_dispatch_queue.splice(rank.single_dispatch_queue.end(), + rank.waiting_for_ready[get_myaddr()]); + rank.waiting_for_ready.erase(get_myaddr()); + rank.single_dispatch_cond.Signal(); + } + rank.lock.Unlock(); + } else { + // start my dispatch thread + dispatch_thread.create(); + } +} + + +int Rank::EntityMessenger::shutdown() +{ + dout(10) << "shutdown " << get_myaddr() << endl; + + // deregister + rank.unregister_entity(this); + + // stop my dispatch thread + if (dispatch_thread.am_self()) { + dout(1) << "shutdown i am dispatch, setting stop flag" << endl; + stop = true; + } else { + dout(1) << 
"shutdown i am not dispatch, setting stop flag and joining thread." << endl; + lock.Lock(); + stop = true; + cond.Signal(); + lock.Unlock(); + dispatch_thread.join(); + } + + return 0; +} + + +void Rank::EntityMessenger::prepare_dest(const entity_inst_t& inst) +{ + rank.lock.Lock(); + { + if (rank.rank_pipe.count(inst.rank) == 0) + rank.connect_rank(inst); + } + rank.lock.Unlock(); +} + +int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, entity_inst_t inst, + int port, int fromport) +{ + // set envelope + m->set_source(get_myaddr(), fromport); + m->set_dest(dest, port); + + m->set_source_inst(rank.my_inst); + + dout(1) << m->get_source() + << " --> " << m->get_dest() << " " << inst + << " -- " << *m + << " -- " << m + << endl; + + rank.submit_message(m, inst); + + return 0; +} + + +void Rank::EntityMessenger::reset_myaddr(msg_addr_t newaddr) +{ + msg_addr_t oldaddr = get_myaddr(); + dout(10) << "set_myaddr " << oldaddr << " to " << newaddr << endl; + + rank.entity_map.erase(oldaddr); + rank.local.erase(oldaddr); + rank.entity_map[newaddr] = rank.my_inst; + rank.local[newaddr] = this; + + _set_myaddr(newaddr); +} + + + + +void Rank::EntityMessenger::mark_down(msg_addr_t a, entity_inst_t& i) +{ + assert(a != get_myaddr()); + rank.mark_down(a,i); +} + +void Rank::mark_down(msg_addr_t a, entity_inst_t& inst) +{ + //if (my_rank == 0) return; // ugh.. rank0 already handles this stuff in the namer + lock.Lock(); + if (entity_map.count(a) && + entity_map[a] > inst) { + dout(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << endl; + derr(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << endl; + // do nothing! + } else { + if (entity_map.count(a) == 0) { + // don't know it + dout(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << endl; + derr(10) << "mark_down " << a << " inst " << inst << " ... 
unknown by me" << endl; + } else { + // know it + assert(entity_map[a] <= inst); + dout(10) << "mark_down " << a << " inst " << inst << endl; + derr(10) << "mark_down " << a << " inst " << inst << endl; + + entity_map.erase(a); + + if (rank_pipe.count(inst.rank)) { + rank_pipe[inst.rank]->close(); + rank_pipe.erase(inst.rank); + } + } + } + lock.Unlock(); +} + +void Rank::EntityMessenger::mark_up(msg_addr_t a, entity_inst_t& i) +{ + assert(a != get_myaddr()); + rank.mark_up(a, i); +} + +void Rank::mark_up(msg_addr_t a, entity_inst_t& i) +{ + lock.Lock(); + { + dout(10) << "mark_up " << a << " inst " << i << endl; + derr(10) << "mark_up " << a << " inst " << i << endl; + + if (entity_map.count(a) == 0 || + entity_map[a] < i) { + entity_map[a] = i; + connect_rank(i); + } else if (entity_map[a] == i) { + dout(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl; + derr(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl; + } else { + dout(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl; + derr(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl; + } + + //if (waiting_for_lookup.count(a)) + //lookup(a); + } + lock.Unlock(); +} + diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h new file mode 100644 index 0000000000000..247d76a07d22f --- /dev/null +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -0,0 +1,292 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef __SIMPLEMESSENGER_H +#define __SIMPLEMESSENGER_H + + +#include +#include +using namespace std; +#include +#include +using namespace __gnu_cxx; + + +#include "include/types.h" + +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/Thread.h" + +#include "Messenger.h" +#include "Message.h" +#include "tcp.h" + + + + +/* Rank - per-process + */ +class Rank { + + class EntityMessenger; + class Pipe; + + // incoming + class Accepter : public Thread { + public: + bool done; + + tcpaddr_t listen_addr; + int listen_sd; + + Accepter() : done(false) {} + + void *entry(); + void stop() { + done = true; + ::close(listen_sd); + join(); + } + int start(); + } accepter; + + + // pipe + class Pipe { + protected: + int sd; + bool done; + entity_inst_t peer_inst; + bool server; + bool sent_close; + bool socket_error; + + bool reader_running; + bool writer_running; + + list q; + Mutex lock; + Cond cond; + + int accept(); // server handshake + int connect(); // client handshake + void reader(); + void writer(); + + Message *read_message(); + int write_message(Message *m); + void fail(list& ls); + + // threads + class Reader : public Thread { + Pipe *pipe; + public: + Reader(Pipe *p) : pipe(p) {} + void *entry() { pipe->reader(); return 0; } + } reader_thread; + friend class Reader; + + class Writer : public Thread { + Pipe *pipe; + public: + Writer(Pipe *p) : pipe(p) {} + void *entry() { pipe->writer(); return 0; } + } writer_thread; + friend class Writer; + + public: + Pipe(int s) : sd(s), + done(false), server(true), + sent_close(false), socket_error(false), + reader_running(false), writer_running(false), + reader_thread(this), writer_thread(this) { + // server + reader_running = true; + reader_thread.create(); + } + Pipe(const entity_inst_t &pi) : sd(0), + done(false), peer_inst(pi), server(false), + sent_close(false), + reader_running(false), writer_running(false), + reader_thread(this), writer_thread(this) { + // client + writer_running = true; 
+ writer_thread.create(); + } + + // public constructors + static const Pipe& Server(int s); + static const Pipe& Client(const entity_inst_t& pi); + + entity_inst_t& get_peer_inst() { return peer_inst; } + + void close(); + void join() { + writer_thread.join(); + reader_thread.join(); + } + + void send(Message *m) { + lock.Lock(); + q.push_back(m); + cond.Signal(); + lock.Unlock(); + } + void send(list& ls) { + lock.Lock(); + q.splice(q.end(), ls); + cond.Signal(); + lock.Unlock(); + } + }; + + + + // messenger interface + class EntityMessenger : public Messenger { + Mutex lock; + Cond cond; + list dispatch_queue; + bool stop; + + class DispatchThread : public Thread { + EntityMessenger *m; + public: + DispatchThread(EntityMessenger *_m) : m(_m) {} + void *entry() { + m->dispatch_entry(); + return 0; + } + } dispatch_thread; + void dispatch_entry(); + + public: + void queue_message(Message *m) { + lock.Lock(); + dispatch_queue.push_back(m); + cond.Signal(); + lock.Unlock(); + } + void queue_messages(list ls) { + lock.Lock(); + dispatch_queue.splice(dispatch_queue.end(), ls); + cond.Signal(); + lock.Unlock(); + } + + public: + EntityMessenger(msg_addr_t myaddr); + ~EntityMessenger(); + + void ready(); + bool is_stopped() { return stop; } + + void wait() { + dispatch_thread.join(); + } + + void reset_myaddr(msg_addr_t m); + + void callback_kick() {} + int shutdown(); + void prepare_dest(const entity_inst_t& inst); + int send_message(Message *m, msg_addr_t dest, entity_inst_t inst, + int port=0, int fromport=0); + + void mark_down(msg_addr_t a, entity_inst_t& i); + void mark_up(msg_addr_t a, entity_inst_t& i); + }; + + + class SingleDispatcher : public Thread { + Rank *rank; + public: + SingleDispatcher(Rank *r) : rank(r) {} + void *entry() { + rank->single_dispatcher_entry(); + return 0; + } + } single_dispatcher; + + Cond single_dispatch_cond; + bool single_dispatch_stop; + list single_dispatch_queue; + + map > waiting_for_ready; + + void single_dispatcher_entry(); 
+ void _submit_single_dispatch(Message *m); + + + // Rank stuff + public: + Mutex lock; + Cond wait_cond; // for wait() + + // my instance + entity_inst_t my_inst; + + // lookup + hash_map entity_map; + hash_set entity_unstarted; + + // local + map local; + + // remote + hash_map<__int64_t, Pipe*> rank_pipe; + + set pipes; + list pipe_reap_queue; + + void show_dir(); + + Pipe *connect_rank(const entity_inst_t& inst); + + void mark_down(msg_addr_t addr, entity_inst_t& i); + void mark_up(msg_addr_t addr, entity_inst_t& i); + + tcpaddr_t get_listen_addr() { return accepter.listen_addr; } + + void reaper(); + + EntityMessenger *find_unnamed(msg_addr_t a); + +public: + Rank(); + ~Rank(); + + int find_ns_addr(tcpaddr_t &tcpaddr); + + int start_rank(); + void wait(); + + EntityMessenger *register_entity(msg_addr_t addr); + void rename_entity(EntityMessenger *ms, msg_addr_t newaddr); + void unregister_entity(EntityMessenger *ms); + + void submit_message(Message *m, const entity_inst_t& inst); + void prepare_dest(const entity_inst_t& inst); + + // create a new messenger + EntityMessenger *new_entity(msg_addr_t addr); + +} ; + + + +extern Rank rank; + +#endif diff --git a/trunk/ceph/newsyn.cc b/trunk/ceph/newsyn.cc index 43fd1b2373391..efce694dbf9a2 100644 --- a/trunk/ceph/newsyn.cc +++ b/trunk/ceph/newsyn.cc @@ -27,7 +27,7 @@ using namespace std; #include "client/Client.h" #include "client/SyntheticClient.h" -#include "msg/NewerMessenger.h" +#include "msg/SimpleMessenger.h" #include "common/Timer.h" @@ -65,12 +65,10 @@ pair mpi_bootstrap_new(int& argc, char**& argv, MonMap *monmap) // start up all monitors at known addresses. entity_inst_t moninst[mpi_world]; // only care about first g_conf.num_mon of these. 
- if (mpi_rank < g_conf.num_mon) { - rank.my_rank = mpi_rank; - rank.start_rank(); // bind and listen + rank.start_rank(); // bind and listen - moninst[mpi_rank].rank = mpi_rank; - moninst[mpi_rank].addr = rank.get_listen_addr(); + if (mpi_rank < g_conf.num_mon) { + moninst[mpi_rank].set_addr( rank.get_listen_addr() ); //cerr << mpi_rank << " at " << rank.get_listen_addr() << endl; } @@ -78,14 +76,11 @@ pair mpi_bootstrap_new(int& argc, char**& argv, MonMap *monmap) MPI_Gather( &moninst[mpi_rank], sizeof(entity_inst_t), MPI_CHAR, moninst, sizeof(entity_inst_t), MPI_CHAR, 0, MPI_COMM_WORLD); - + if (mpi_rank == 0) { - rank.start_namer(); - for (int i=0; imon_inst[i] = moninst[i]; - if (i) rank.namer->manual_insert_inst(monmap->get_inst(i)); } } @@ -97,7 +92,7 @@ pair mpi_bootstrap_new(int& argc, char**& argv, MonMap *monmap) int fd = ::open(".ceph_monmap", O_WRONLY|O_CREAT); ::write(fd, (void*)bl.c_str(), bl.length()); - ::fchmod(fd, 0755); + ::fchmod(fd, 0644); ::close(fd); } else { @@ -111,13 +106,8 @@ pair mpi_bootstrap_new(int& argc, char**& argv, MonMap *monmap) if (mpi_rank > 0) { monmap->decode(bl); - rank.set_namer(monmap->get_inst(0).addr); } - - if (mpi_rank >= g_conf.num_mon) { - rank.start_rank(); - } - + // wait for everyone! MPI_Barrier(MPI_COMM_WORLD); @@ -257,7 +247,7 @@ int main(int argc, char **argv) for (int i=0; iinit(); started++; @@ -283,7 +273,7 @@ int main(int argc, char **argv) g_timer.add_event_after(kill_osd_after[i], new C_Die); Messenger *m = rank.register_entity(MSG_ADDR_OSD(i)); - cerr << "osd" << i << " on tcprank " << rank.my_rank << " " << hostname << "." << pid << endl; + cerr << "osd" << i << " at " << rank.my_inst << " " << hostname << "." << pid << endl; osd[i] = new OSD(i, m, monmap); osd[i]->init(); started++; @@ -352,7 +342,7 @@ int main(int argc, char **argv) nclients++; } if (nclients) { - cerr << nclients << " clients on tcprank " << rank.my_rank << " " << hostname << "." 
<< pid << endl; + cerr << nclients << " clients at " << rank.my_inst << " " << hostname << "." << pid << endl; } for (set::iterator it = clientlist.begin(); @@ -374,7 +364,7 @@ int main(int argc, char **argv) if (myrank && !started) { //dout(1) << "IDLE" << endl; - cerr << "idle on tcprank " << rank.my_rank << " " << hostname << "." << pid << endl; + cerr << "idle at " << rank.my_inst << " " << hostname << "." << pid << endl; //rank.stop_rank(); } diff --git a/trunk/ceph/osd/OSD.cc b/trunk/ceph/osd/OSD.cc index 67e84746229b0..d89f1348e5f69 100644 --- a/trunk/ceph/osd/OSD.cc +++ b/trunk/ceph/osd/OSD.cc @@ -1568,7 +1568,7 @@ void OSD::do_queries(map< int, map >& query_map) void OSD::handle_pg_notify(MOSDPGNotify *m) { dout(7) << "handle_pg_notify from " << m->get_source() << endl; - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); if (!require_same_or_newer_map(m, m->get_epoch())) return; @@ -1687,7 +1687,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) void OSD::handle_pg_log(MOSDPGLog *m) { - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); const pg_t pgid = m->get_pgid(); if (!require_same_or_newer_map(m, m->get_epoch())) return; @@ -1759,7 +1759,7 @@ void OSD::handle_pg_log(MOSDPGLog *m) void OSD::handle_pg_query(MOSDPGQuery *m) { dout(7) << "handle_pg_query from " << m->get_source() << " epoch " << m->get_epoch() << endl; - int from = MSG_ADDR_NUM(m->get_source()); + int from = m->get_source().num(); if (!require_same_or_newer_map(m, m->get_epoch())) return; diff --git a/trunk/ceph/osd/PG.h b/trunk/ceph/osd/PG.h index f8a040346e88e..3da16b9b81b7b 100644 --- a/trunk/ceph/osd/PG.h +++ b/trunk/ceph/osd/PG.h @@ -59,7 +59,7 @@ namespace __gnu_cxx { size_t operator()(const reqid_t &r) const { static hash H; static hash<__uint64_t> I; - return H(r.addr._addr) ^ I(r.tid); + return H(r.addr.type() ^ r.addr.num()) ^ I(r.tid); } }; } -- 2.39.5