# This makes it less annoying to build on non-mpi hosts for dev work, and seems to
# behave just fine... change ${CC} back to mpicxx if you get paranoid.
+#CC = g++
+#CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
+#LIBS = -lpthread
+
+# Hook for extra -I options, etc.  Appended to CFLAGS on every platform;
+# set it on the command line, e.g.  make EXTRA_CFLAGS=-I/opt/include
+EXTRA_CFLAGS =
+
+# Platform selection: `make target=darwin` picks the Darwin flags, anything
+# else gets the linux flags.
+ifeq ($(target),darwin)
+# For Darwin
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE -DDARWIN -D__FreeBSD__=10 ${EXTRA_CFLAGS}
+else
+# For linux
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE ${EXTRA_CFLAGS}
+endif
+
CC = g++
-CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
-LIBS = -lpthread
-CRYPTOLIBS = /usr/lib/libcrypto++5.2.so
+LIBS = -lpthread -lcrypto++5.2
+#CRYPTOLIBS = /usr/lib/libcrypto++5.2.so
#for normal mpich2 machines
MPICC = mpicxx
mon/Monitor.o\
mon/OSDMonitor.o\
mon/MDSMonitor.o\
+ mon/ClientMonitor.o\
mon/Elector.o
COMMON_OBJS= \
msg/Messenger.o\
msg/Message.o\
- msg/HostMonitor.o\
common/Logger.o\
common/Clock.o\
common/Timer.o\
client/SyntheticClient.o\
client/Trace.o
-TCP_OBJS = \
- msg/TCPMessenger.o\
- msg/TCPDirectory.o
-
CRYPTO_OBJS = \
crypto/CryptoLib.o
-TARGETS = cosd cfuse newsyn fakesyn
+TARGETS = cmon cosd cmds cfuse newsyn fakesyn
SRCS=*.cc */*.cc *.h */*.h */*/*.h
# real bits
-cmon: cmon.cc mon.o ebofs.o msg/NewerMessenger.o common.o
- ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
+mkmonmap: mkmonmap.cc common.o crypto.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+cmon: cmon.cc mon.o ebofs.o msg/SimpleMessenger.o common.o crypto.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-cosd: cosd.cc osd.o ebofs.o msg/NewerMessenger.o common.o
- ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
+cosd: cosd.cc osd.o ebofs.o msg/SimpleMessenger.o common.o crypto.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-cmds: cmds.cc mds.o osdc.o msg/NewerMessenger.o common.o
- ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
+cmds: cmds.cc mds.o osdc.o msg/SimpleMessenger.o common.o crypto.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
-cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/NewerMessenger.o common.o
+csyn: csyn.cc client.o osdc.o msg/SimpleMessenger.o common.o crypto.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
+cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/SimpleMessenger.o common.o crypto.o
${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
# synthetic workload
fakesyn: fakesyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o crypto.o
- ${CC} -pg ${CFLAGS} ${LIBS} ${CRYPTOLIBS} $^ -o $@
+ ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
tcpsyn: tcpsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o ${TCP_OBJS} common.o
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
-newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewerMessenger.o common.o
+newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o crypto.o
${MPICC} -pg ${MPICFLAGS} ${MPILIBS} $^ -o $@
-newsyn.nopg: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/NewerMessenger.o common.o
+# Same link set as the `newsyn` target (built from the same newsyn.cc), so it
+# must use the same messenger object; only the -pg flag differs.
+newsyn.nopg: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o crypto.o
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
# + obfs
fakesynobfs: fakesyn.cc mds.o client.o osd_obfs.o msg/FakeMessenger.o common.o
- ${CC} -DUSE_OBFS ${CFLAGS} ${LIBS} ${CRYPTOLIBS} $^ -o $@
+ ${CC} -DUSE_OBFS ${CFLAGS} ${LIBS} $^ -o $@
tcpsynobfs: tcpsyn.cc mds.o client.o osd_obfs.o ${TCP_OBJS} common.o
${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@
# libceph
-libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS}
- ld -i $^ -o $@
+libceph.o: client/ldceph.o client/Client.o ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS}
+ ar -rc $@ $^
bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
mpicc -c $^ -o $@
%.so: %.cc
${CC} -shared -fPIC ${CFLAGS} $< -o $@
-
-testmpi: test/testmpi.cc msg/MPIMessenger.cc config.o common/Timer.o common/clock.o msg/Messenger.o msg/Dispatcher.o msg/error.o
- ${MPICC} ${CFLAGS} ${LIBS} $^ -o $@
-
-
clean:
rm -f *.o */*.o ${TARGETS} ${TEST_TARGETS}
common.o: ${COMMON_OBJS}
- ld -i -o $@ $^
+ ar -rc $@ $^
ebofs.o: ${EBOFS_OBJS}
- ld -i -o $@ $^
+ ar -rc $@ $^
client.o: ${CLIENT_OBJS}
- ld -i -o $@ $^
+ ar -rc $@ $^
osd.o: ${OSD_OBJS}
- ld -i -o $@ $^
+ ar -rc $@ $^
osdc.o: ${OSDC_OBJS}
- ld -i -o $@ $^
+ ar -rc $@ $^
crypto.o: ${CRYPTO_OBJS}
ld -i -o $@ $^
-osd_obfs.o: osd/OBFSStore.o osd/OSD.ccosd/PG.o osd/ObjectStore.o osd/FakeStore.o
- ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
+osd_obfs.o: osd/OBFSStore.o osd/OSD.cc osd/PG.o osd/ObjectStore.o osd/FakeStore.o
+ ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.o
mds.o: ${MDS_OBJS}
- ld -i -o $@ $^
+ ar -rc $@ $^
mon.o: ${MON_OBJS}
- ld -i -o $@ $^
+ ar -rc $@ $^
%.o: %.cc
${CC} ${CFLAGS} -c $< -o $@
-pmds = parallel metadata server/system
+Ceph - a scalable distributed file system
+-----------------------------------------
-'test' is a standalone proccess that runs all clients, OSDs, and MDSs
-in a single process with a basic message passer (FakeMessenger).
-Useful for debugging.
-
-'pmds' uses MPI for communication.
-
-'import' builds a metadata store on ./osddata/ by taking find output
-from stdin. Make sure find is run from the current directory so that
-import can stat the files it's fed. The find root becomes the file
-system root; feel free to use relative paths.
-
-This is all GPL, etc.
-
-
-Getting started:
-
- 1- Comment out the LEAKTRACER= line in the Makefile if you don't have
- LeakTracer installed (you probably don't).
-
- 2- make (test and import targets are testing ones; pmds uses MPI)
-
- 3- Build an OSD metadata store:
- # mkdir osddata
- # find /some/big/dir | ./import root
-
- 4- Single proc sim:
- # ./test
- or more likely,
- # ./test > out
-
- 5- Change parameters in config.cc.
-
- 6- If you want stats logged, mkdir log (make sure you have enough
- file handles; there's one open file per client).
-
-
-Notes on pmds (MPI version):
-
- - On mcr/alc I have to
- # setenv LD_LIBRARY_PATH /usr/lib/mpi/mpi_gnu/lib
- for the GNU runtime MPI libs (otherwise you get the Intel ones,
- which segfault).
-
- - Each MDS and OSD gets its own node. Clients are divided over
- whatever is left over. So make sure you tell MPI to give you at
- least num_mds+num_osd+1 processes (num_mds etc defined in
- config.cc).
-
-
-
-2004.08.25 sage@newdream.net
+Please see http://ceph.sourceforge.net/ for current info.
*
*/
-
-
#include <sys/stat.h>
#include <iostream>
#include <string>
#include "config.h"
-#include "mds/MDS.h"
-#include "osd/OSD.h"
#include "client/Client.h"
#include "client/fuse.h"
-#include "msg/NewMessenger.h"
+#include "msg/SimpleMessenger.h"
#include "common/Timer.h"
+#ifndef DARWIN
#include <envz.h>
+#endif // DARWIN
#include <sys/types.h>
#include <sys/stat.h>
vec_to_argv(args, argc, argv);
// load monmap
- bufferlist bl;
- int fd = ::open(".ceph_monmap", O_RDONLY);
- assert(fd >= 0);
- struct stat st;
- ::fstat(fd, &st);
- bufferptr bp(st.st_size);
- bl.append(bp);
- ::read(fd, (void*)bl.c_str(), bl.length());
- ::close(fd);
-
- MonMap *monmap = new MonMap;
- monmap->decode(bl);
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ assert(r >= 0);
// start up network
- rank.set_namer(monmap->get_inst(0).addr);
rank.start_rank();
// start client
- Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap);
+ Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
client->init();
// start up fuse
// wait for messenger to finish
rank.wait();
-
return 0;
}
#include <sys/stat.h>
#include <fcntl.h>
+#ifdef DARWIN
+#include <sys/statvfs.h>
+#endif // DARWIN
+
#include <iostream>
using namespace std;
#include "Client.h"
+#include "messages/MClientBoot.h"
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
#include "messages/MClientFileCaps.h"
Client::Client(Messenger *m, MonMap *mm)
{
// which client am i?
- whoami = MSG_ADDR_NUM(m->get_myaddr());
+ whoami = m->get_myaddr().num();
monmap = mm;
mounted = false;
client_lock.Unlock();
}
-void Client::handle_mount_ack(MClientMountAck *m)
-{
- // mdsmap!
- if (!mdsmap) mdsmap = new MDSMap;
- mdsmap->decode(m->get_mds_map_state());
-
- // we got osdmap!
- osdmap->decode(m->get_osd_map_state());
-
- dout(2) << "mounted" << endl;
- mounted = true;
- mount_cond.Signal();
-
- delete m;
-}
-
-
-void Client::handle_unmount_ack(Message* m)
-{
- dout(1) << "got unmount ack" << endl;
- mounted = false;
- mount_cond.Signal();
- delete m;
-}
-
void Client::handle_mds_map(MMDSMap* m)
{
if (mdsmap == 0)
mdsmap = new MDSMap;
+ if (whoami < 0) {
+ whoami = m->get_dest().num();
+ dout(1) << "handle_mds_map i am now " << m->get_dest() << endl;
+ messenger->reset_myaddr(m->get_dest());
+ }
+
map<epoch_t, bufferlist>::reverse_iterator p = m->maps.rbegin();
dout(1) << "handle_mds_map epoch " << p->first << endl;
*/
void Client::handle_file_caps(MClientFileCaps *m)
{
- int mds = MSG_ADDR_NUM(m->get_source());
+ int mds = m->get_source().num();
Inode *in = 0;
if (inode_map.count(m->get_ino())) in = inode_map[ m->get_ino() ];
<< ", which we don't want caps for, releasing." << endl;
m->set_caps(0);
m->set_wanted(0);
- entity_inst_t srcinst = m->get_source_inst();
- messenger->send_message(m, m->get_source(), srcinst, m->get_source_port());
+ messenger->send_message(m, m->get_source(), m->get_source_inst(), m->get_source_port());
return;
}
in->file_wr_size = 0;
}
- messenger->send_message(m, m->get_source(), m->get_source_port());
+ messenger->send_message(m, m->get_source(), m->get_source_inst(), m->get_source_port());
}
// -------------------
// fs ops
-int Client::mount(int mkfs)
+int Client::mount()
{
client_lock.Lock();
// FIXME mds map update race with mount.
- dout(2) << "fetching latest mds map" << endl;
+ dout(2) << "sending boot msg to monitor" << endl;
if (mdsmap)
delete mdsmap;
int mon = monmap->pick_mon();
- messenger->send_message(new MMDSGetMap(),
+ messenger->send_message(new MClientBoot(),
MSG_ADDR_MON(mon), monmap->get_inst(mon));
-
+
while (!mdsmap)
mount_cond.Wait(client_lock);
dout(2) << "mounting" << endl;
MClientMount *m = new MClientMount();
- if (mkfs) m->set_mkfs(mkfs);
-
- messenger->send_message(m, MSG_ADDR_MDS(0), mdsmap->get_inst(0), MDS_PORT_SERVER);
+ int who = 0; // mdsmap->get_root(); // mount at root, for now
+ messenger->send_message(m,
+ MSG_ADDR_MDS(who), mdsmap->get_inst(who),
+ MDS_PORT_SERVER);
+
while (!mounted)
mount_cond.Wait(client_lock);
return 0;
}
+// Handle the mount acknowledgement: decode the MDS map and OSD map carried
+// in the message, mark the client as mounted, and signal mount_cond (which
+// Client::mount() waits on while !mounted).  Deletes the message.
+void Client::handle_mount_ack(MClientMountAck *m)
+{
+ // mdsmap!
+ // allocate the map first if we do not have one yet
+ if (!mdsmap) mdsmap = new MDSMap;
+ mdsmap->decode(m->get_mds_map_state());
+
+ // we got osdmap!
+ // NOTE(review): osdmap is assumed to be allocated already -- confirm.
+ osdmap->decode(m->get_osd_map_state());
+
+ dout(2) << "mounted" << endl;
+ mounted = true;
+ mount_cond.Signal();
+
+ delete m;
+}
+
+
+
int Client::unmount()
{
client_lock.Lock();
return 0;
}
+// Handle the unmount acknowledgement: clear the mounted flag, signal
+// mount_cond so a waiter can proceed, and delete the message.
+void Client::handle_unmount_ack(Message* m)
+{
+ dout(1) << "got unmount ack" << endl;
+ mounted = false;
+ mount_cond.Signal();
+ delete m;
+}
+
// namespace ops
st->st_nlink = inode.nlink;
st->st_uid = inode.uid;
st->st_gid = inode.gid;
+#ifndef DARWIN
+ // FIXME what's going on here with darwin?
st->st_ctime = inode.ctime;
st->st_atime = inode.atime;
st->st_mtime = inode.mtime;
+#endif
st->st_size = inode.size;
st->st_blocks = inode.size ? ((inode.size - 1) / 4096 + 1):0;
st->st_blksize = 4096;
// fill the dirent
d->dp.d_dirent.d_ino = d->p->second.ino;
#ifndef __CYGWIN__
+#ifndef DARWIN
if (d->p->second.is_symlink())
d->dp.d_dirent.d_type = DT_LNK;
else if (d->p->second.is_dir())
d->dp.d_dirent.d_off = d->off;
d->dp.d_dirent.d_reclen = 1; // all records are length 1 (wrt offset, seekdir, telldir, etc.)
+#endif // DARWIN
#endif
strncpy(d->dp.d_dirent.d_name, d->p->first.c_str(), 256);
// fill the dirent
d->dp.d_dirent.d_ino = d->p->second.ino;
#ifndef __CYGWIN__
+#ifndef DARWIN
if (d->p->second.is_symlink())
d->dp.d_dirent.d_type = DT_LNK;
else if (d->p->second.is_dir())
d->dp.d_dirent.d_off = d->off;
d->dp.d_dirent.d_reclen = 1; // all records are length 1 (wrt offset, seekdir, telldir, etc.)
+#endif // DARWIN
#endif
strncpy(d->dp.d_dirent.d_name, d->p->first.c_str(), 256);
if (cmode & FILE_MODE_LAZY) f->inode->num_open_lazy++;
// caps included?
- int mds = MSG_ADDR_NUM(reply->get_source());
+ int mds = reply->get_source().num();
if (f->inode->caps.empty()) {// first caps?
dout(7) << " first caps on " << f->inode->inode.ino << endl;
return 0;
}
+#ifdef DARWIN
+// Darwin (statvfs) flavour of statfs.  Nothing is queried yet: the buffer is
+// zeroed and filled with fixed placeholder values.  `path` is not used.
+// Always returns 0.
+int Client::statfs(const char *path, struct statvfs *stbuf)
+{
+  // FIXME: placeholder numbers only
+  const int unit = 1024;            // block size, fragment size, name length
+  const int plenty = 1024 * 1024;   // every block / file counter
+
+  bzero (stbuf, sizeof (*stbuf));
+
+  // sizes
+  stbuf->f_bsize = unit;
+  stbuf->f_frsize = unit;
+  stbuf->f_namemax = unit;
+
+  // block counts
+  stbuf->f_blocks = plenty;
+  stbuf->f_bfree = plenty;
+  stbuf->f_bavail = plenty;
+
+  // file (inode) counts
+  stbuf->f_files = plenty;
+  stbuf->f_ffree = plenty;
+  stbuf->f_favail = plenty;
+
+  return 0;
+}
+#else
int Client::statfs(const char *path, struct statfs *stbuf)
{
assert(0); // implement me
return 0;
}
-
+#endif
int Client::lazyio_propogate(int fd, off_t offset, size_t count)
// ----------------------
// fs ops.
- int mount(int mkfs=0);
+ int mount();
int unmount();
// these shoud (more or less) mirror the actual system calls.
+#ifdef DARWIN
+ int statfs(const char *path, struct statvfs *stbuf);
+#else
int statfs(const char *path, struct statfs *stbuf);
+#endif
// crap
int chdir(const char *s);
#define _XOPEN_SOURCE 500
#endif
-#define FUSE_USE_VERSION 22
+#define FUSE_USE_VERSION 25
#include <fuse.h>
#include <stdio.h>
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
+#ifdef DARWIN
+#include <sys/statvfs.h>
+#else
#include <sys/statfs.h>
+#endif // DARWIN
// ceph stuff
}
*/
+
+#ifdef DARWIN
+// Darwin variant of the statfs hook (statvfs-based): forwards the call to
+// client->statfs() and returns its result unchanged.
+// NOTE(review): presumably registered as the FUSE statfs operation -- confirm.
+static int ceph_statfs(const char *path, struct statvfs *stbuf)
+{
+ return client->statfs(path, stbuf);
+}
+#else
static int ceph_statfs(const char *path, struct statfs *stbuf)
{
return client->statfs(path, stbuf);
}
+#endif
// allow other (all!) users to see my file system
// NOTE: echo user_allow_other >> /etc/fuse.conf
+ // NB: seems broken on Darwin
+#ifndef DARWIN
newargv[newargc++] = "-o";
newargv[newargc++] = "allow_other";
+#endif // DARWIN
// use inos
newargv[newargc++] = "-o";
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "config.h"
+
+#include "mon/MonMap.h"
+#include "mds/MDS.h"
+
+#include "msg/SimpleMessenger.h"
+
+#include "common/Timer.h"
+
+
+// Timer callback scheduled by main() when g_conf.kill_after is set: prints
+// "die" to stderr and terminates the process with exit status 1.
+class C_Die : public Context {
+public:
+ void finish(int) {
+ cerr << "die" << endl;
+ exit(1);
+ }
+};
+
+// Timer callback scheduled by main() when g_conf.debug_after is set:
+// overwrites the debug settings in g_conf with the corresponding ones from
+// g_debug_after_conf.
+class C_Debug : public Context {
+ public:
+  void finish(int) {
+    // The settings to copy are the fields laid out from `debug` up to (not
+    // including) `debug_after`.  Measure that span in bytes through char*:
+    // subtracting the member pointers directly counts elements of the member
+    // type, and memcpy would then copy only a fraction of the block.
+    int size = (char*)&g_conf.debug_after - (char*)&g_conf.debug;
+    memcpy((char*)&g_conf.debug, (char*)&g_debug_after_conf.debug, size);
+    dout(0) << "debug_after flipping debug settings" << endl;
+  }
+};
+
+
+// MDS daemon entry point: parse config options, load the monitor map from
+// ".ceph_monmap", start the messenger, register as a new MDS entity and run
+// until the messenger finishes.  Returns 0.
+int main(int argc, char **argv)
+{
+  // command line -> vector -> config
+  vector<char*> args;
+  argv_to_vec(argc, argv, args);
+  parse_config_options(args);
+
+  // optional timed events
+  if (g_conf.kill_after)
+    g_timer.add_event_after(g_conf.kill_after, new C_Die);
+  if (g_conf.debug_after)
+    g_timer.add_event_after(g_conf.debug_after, new C_Debug);
+
+  // the monitor map must be readable
+  MonMap monmap;
+  int rc = monmap.read(".ceph_monmap");
+  assert(rc >= 0);
+
+  // bring up the network and obtain a messenger for a new mds
+  rank.start_rank();
+  Messenger *messenger = rank.register_entity(MSG_ADDR_MDS_NEW);
+  assert(messenger);
+
+  // the mds takes its id from the address the messenger was given
+  MDS *mds = new MDS(messenger->get_myaddr().num(), messenger, &monmap);
+  mds->init();
+
+  // block until the messenger is done, then clean up
+  rank.wait();
+  delete mds;
+
+  return 0;
+}
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "config.h"
+
+#include "mon/MonMap.h"
+#include "mon/Monitor.h"
+
+#include "msg/SimpleMessenger.h"
+
+#include "common/Timer.h"
+
+
+// Timer callback scheduled by main() when g_conf.kill_after is set: prints
+// "die" to stderr and terminates the process with exit status 1.
+class C_Die : public Context {
+public:
+ void finish(int) {
+ cerr << "die" << endl;
+ exit(1);
+ }
+};
+
+// Timer callback scheduled by main() when g_conf.debug_after is set:
+// overwrites the debug settings in g_conf with the corresponding ones from
+// g_debug_after_conf.
+class C_Debug : public Context {
+ public:
+  void finish(int) {
+    // The settings to copy are the fields laid out from `debug` up to (not
+    // including) `debug_after`.  Measure that span in bytes through char*:
+    // subtracting the member pointers directly counts elements of the member
+    // type, and memcpy would then copy only a fraction of the block.
+    int size = (char*)&g_conf.debug_after - (char*)&g_conf.debug;
+    memcpy((char*)&g_conf.debug, (char*)&g_debug_after_conf.debug, size);
+    dout(0) << "debug_after flipping debug settings" << endl;
+  }
+};
+
+
+// Monitor daemon entry point.
+//
+// Arguments handled here (in addition to whatever parse_config_options()
+// handles):
+//   --mon <n>        run as monitor n, listening on the address the monmap
+//                    records for it
+//   --monmap <file>  monmap file to read or write (default ".ceph_monmap")
+//
+// Without --mon a standalone mon0 is started and a single-monitor monmap is
+// written to the monmap file.  Returns -1 on a bad command line, 0 after the
+// messenger has shut down.
+int main(int argc, char **argv)
+{
+  vector<char*> args;
+  argv_to_vec(argc, argv, args);
+
+  parse_config_options(args);
+
+  if (g_conf.kill_after)
+    g_timer.add_event_after(g_conf.kill_after, new C_Die);
+  if (g_conf.debug_after)
+    g_timer.add_event_after(g_conf.debug_after, new C_Debug);
+
+  // args
+  int whoami = -1;
+  char *monmap_fn = ".ceph_monmap";
+  for (unsigned i=0; i<args.size(); i++) {
+    bool is_mon = (strcmp(args[i], "--mon") == 0);
+    bool is_monmap = (strcmp(args[i], "--monmap") == 0);
+    if (!is_mon && !is_monmap) {
+      cerr << "unrecognized arg " << args[i] << endl;
+      return -1;
+    }
+    // both options take a value; do not read past the end of args
+    if (i+1 >= args.size()) {
+      cerr << "missing value for " << args[i] << endl;
+      return -1;
+    }
+    if (is_mon)
+      whoami = atoi(args[++i]);
+    else
+      monmap_fn = args[++i];
+  }
+
+  MonMap monmap;
+
+  if (whoami < 0) {
+    // let's assume a standalone monitor
+    cout << "starting standalone mon0" << endl;
+    whoami = 0;
+
+    // start messenger
+    rank.start_rank();
+    cout << "bound to " << rank.get_listen_addr() << endl;
+
+    // add single mon0
+    monmap.add_mon(rank.my_inst);
+
+    // write monmap
+    cout << "writing monmap to " << monmap_fn << endl;
+    int r = monmap.write(monmap_fn);
+    assert(r >= 0);
+  } else {
+    // i am specific monitor.
+
+    // read monmap (report the file actually used, which --monmap may change)
+    cout << "reading monmap from " << monmap_fn << endl;
+    int r = monmap.read(monmap_fn);
+    assert(r >= 0);
+
+    // bind to a specific port
+    cout << "starting mon" << whoami << " at " << monmap.get_inst(whoami) << endl;
+    tcpaddr_t addr = monmap.get_inst(whoami).addr;
+    rank.set_listen_addr(addr);
+    rank.start_rank();
+  }
+
+  // start monitor
+  Messenger *m = rank.register_entity(MSG_ADDR_MON(whoami));
+  Monitor *mon = new Monitor(whoami, m, &monmap);
+  mon->init();
+
+  // wait
+  cout << "waiting for shutdown ..." << endl;
+  rank.wait();
+
+  // done
+  delete mon;
+
+  return 0;
+}
+
+
int nsec() const { return tv.tv_usec*1000; }
// ref accessors/modifiers
- time_t& sec_ref() { return tv.tv_sec; }
- long& usec_ref() { return tv.tv_usec; }
+ time_t& sec_ref() { return tv.tv_sec; }
+ // FIXME: tv.tv_usec is a __darwin_suseconds_t on Darwin.
+ // is just casting it to long& OK?
+ long& usec_ref() { return (long&) tv.tv_usec; }
// cast to double
operator double() {
//cout << "log " << filename << endl;
interval = g_conf.log_interval;
- start = g_clock.now(); // time 0!
+ //start = g_clock.now(); // time 0!
last_logged = 0;
wrote_header = -1;
open = false;
start = fromstart;
}
fromstart -= start;
-
+
while (force ||
((fromstart.sec() > last_logged) &&
(fromstart.sec() - last_logged >= interval))) {
g_timer.add_event_after(g_conf.debug_after, new C_Debug);
- assert(args.size() == 1);
- char *dev = args[0];
- cerr << "dev " << dev << endl;
-
- // who am i? peek at superblock!
- OSDSuperblock sb;
- ObjectStore *store = new Ebofs(dev);
- bufferlist bl;
- store->mount();
- int r = store->read(object_t(0,0), 0, sizeof(sb), bl);
- if (r < 0) {
- cerr << "couldn't read superblock object on " << dev << endl;
- exit(0);
+  // args: --dev <device> is required; --osd <n> optionally supplies the osd
+  // id (when it is absent, whoami stays negative).
+  char *dev = 0;
+  int whoami = -1;
+  for (unsigned i=0; i<args.size(); i++) {
+    bool is_dev = (strcmp(args[i],"--dev") == 0);
+    bool is_osd = (strcmp(args[i],"--osd") == 0);
+    if (!is_dev && !is_osd) {
+      cerr << "unrecognized arg " << args[i] << endl;
+      return -1;
+    }
+    // both options take a value; do not read past the end of args
+    if (i+1 >= args.size()) {
+      cerr << "missing value for " << args[i] << endl;
+      return -1;
+    }
+    if (is_dev)
+      dev = args[++i];
+    else
+      whoami = atoi(args[++i]);
+  }
+  // dev is printed and handed on below, so it must have been given
+  if (!dev) {
+    cerr << "no device specified (use --dev <device>)" << endl;
+    return -1;
+  }
- bl.copy(0, sizeof(sb), (char*)&sb);
- store->umount();
- delete store;
+ cout << "dev " << dev << endl;
+
- cout << "osd fs says i am osd" << sb.whoami << endl;
+ if (whoami < 0) {
+ // who am i? peek at superblock!
+ OSDSuperblock sb;
+ ObjectStore *store = new Ebofs(dev);
+ bufferlist bl;
+ store->mount();
+ int r = store->read(object_t(0,0), 0, sizeof(sb), bl);
+ if (r < 0) {
+ cerr << "couldn't read superblock object on " << dev << endl;
+ exit(0);
+ }
+ bl.copy(0, sizeof(sb), (char*)&sb);
+ store->umount();
+ delete store;
+ whoami = sb.whoami;
+
+ cout << "osd fs says i am osd" << whoami << endl;
+ } else {
+ cout << "command line arg says i am osd" << whoami << endl;
+ }
// load monmap
- bl.clear();
- int fd = ::open(".ceph_monmap", O_RDONLY);
- assert(fd >= 0);
- struct stat st;
- ::fstat(fd, &st);
- bufferptr bp(st.st_size);
- bl.append(bp);
- ::read(fd, (void*)bl.c_str(), bl.length());
- ::close(fd);
-
- MonMap *monmap = new MonMap;
- monmap->decode(bl);
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ assert(r >= 0);
// start up network
- rank.set_namer(monmap->get_inst(0).addr);
rank.start_rank();
// start osd
- Messenger *m = rank.register_entity(MSG_ADDR_OSD(sb.whoami));
+ Messenger *m = rank.register_entity(MSG_ADDR_OSD(whoami));
assert(m);
- OSD *osd = new OSD(sb.whoami, m, monmap, dev);
+ OSD *osd = new OSD(whoami, m, &monmap, dev);
osd->init();
// wait
#include<crypto++/aes.h>
#include<crypto++/rijndael.h>
-#include"crypto_config.h"
+//#include"crypto_config.h"
#include<iostream>
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "config.h"
+
+#include "client/SyntheticClient.h"
+#include "client/Client.h"
+#include "client/fuse.h"
+
+#include "msg/SimpleMessenger.h"
+
+#include "common/Timer.h"
+
+#ifndef DARWIN
+#include <envz.h>
+#endif // DARWIN
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+// Synthetic-client entry point: parse config and synthetic-workload options,
+// load the monitor map from ".ceph_monmap", start the messenger, create a
+// Client plus a SyntheticClient driving it, mount, run the synthetic client
+// thread to completion, then unmount and shut down.  Returns 0.
+// envp is not used.
+int main(int argc, char **argv, char *envp[]) {
+
+ // parse options
+ vector<char*> args;
+ argv_to_vec(argc, argv, args);
+ parse_config_options(args);
+ parse_syn_options(args); // for SyntheticClient
+
+ // presumably writes the remaining args back into argc/argv -- TODO confirm;
+ // nothing below reads argc/argv again
+ vec_to_argv(args, argc, argv);
+
+ // load monmap
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ assert(r >= 0);
+
+ // start up network
+ rank.start_rank();
+
+ // start client
+ Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
+ client->init();
+
+ // start syntheticclient
+ // NOTE(review): syn is never deleted in this function.
+ SyntheticClient *syn = new SyntheticClient(client);
+
+ // mount through the client, then run the synthetic workload thread
+ cout << "mounting" << endl;
+ client->mount();
+
+ cout << "starting syn client" << endl;
+ syn->start_thread();
+
+ // wait for the synthetic client thread to finish
+ syn->join_thread();
+
+ // unmount
+ client->unmount();
+ cout << "unmounted" << endl;
+ client->shutdown();
+
+ delete client;
+
+ // wait for messenger to finish
+ rank.wait();
+
+ return 0;
+}
+
+
#include <sys/ioctl.h>
#ifndef __CYGWIN__
+#ifndef DARWIN
#include <linux/fs.h>
#endif
+#endif
/*******************************************
int BlockDevice::open_fd()
{
+#ifdef DARWIN
+ int fd = ::open(dev.c_str(), O_RDWR|O_SYNC, 0);
+ ::fcntl(fd, F_NOCACHE);
+ return fd;
+#else
return ::open(dev.c_str(), O_RDWR|O_SYNC|O_DIRECT, 0);
+#endif
}
int BlockDevice::open(kicker *idle)
#include "Ebofs.h"
#include <errno.h>
+#ifdef DARWIN
+#include <sys/param.h>
+#include <sys/mount.h>
+#include <sys/statvfs.h>
+#else
#include <sys/vfs.h>
+#endif // DARWIN
// *******************
buf->f_files = nodepool.num_total(); /* total file nodes in file system */
buf->f_ffree = nodepool.num_free(); /* free file nodes in fs */
//buf->f_fsid = 0; /* file system id */
+#ifndef DARWIN
buf->f_namelen = 8; /* maximum length of filenames */
+#endif // DARWIN
return 0;
}
}
MonMap *monmap = new MonMap(g_conf.num_mon);
+ monmap->mon_inst[0].rank = 0; // hack ; see FakeMessenger.cc
char hostname[100];
gethostname(hostname,100);
OSD *mdsosd[NUMMDS];
for (int i=0; i<NUMMDS; i++) {
//cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
- mds[i] = new MDS(i, new FakeMessenger(MSG_ADDR_MDS(i)), monmap);
+ mds[i] = new MDS(-1, new FakeMessenger(MSG_ADDR_MDS_NEW), monmap);
if (g_conf.mds_local_osd)
mdsosd[i] = new OSD(i+10000, new FakeMessenger(MSG_ADDR_OSD(i+10000)), monmap);
start++;
class raw_mmap_pages : public raw {
public:
raw_mmap_pages(unsigned l) : raw(l) {
- data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
+ data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0);
inc_total_alloc(len);
}
~raw_mmap_pages() {
class raw_posix_aligned : public raw {
public:
raw_posix_aligned(unsigned l) : raw(l) {
+#ifdef DARWIN
+ data = (char *) valloc (len);
+#else
::posix_memalign((void**)&data, BUFFER_PAGE_SIZE, len);
+#endif /* DARWIN */
inc_total_alloc(len);
}
~raw_posix_aligned() {
return H(x.c_str());
}
};
+
+ // hash specialization for 64-bit integers: XORs the value shifted right by
+ // 32 bits with its low 32 bits and hands the result to hash<__int32_t>.
+ template<> struct hash<__int64_t> {
+ size_t operator()(__int64_t __x) const {
+ static hash<__int32_t> H;
+ return H((__x >> 32) ^ (__x & 0xffffffff));
+ }
+ };
+
}
this->mds = mds;
opening = false;
opened = false;
-
+}
+
+void AnchorTable::init_inode()
+{
memset(&table_inode, 0, sizeof(table_inode));
table_inode.ino = MDS_INO_ANCHORTABLE+mds->get_nodeid();
table_inode.layout = g_OSD_FileLayout;
}
+// Reset to an empty, opened table: (re)initialize the table inode, mark the
+// table opened and drop every entry in anchor_map.
+void AnchorTable::reset()
+{
+ init_inode();
+ opened = true;
+ anchor_map.clear();
+}
+
/*
* basic updates
*/
}
// send reply
- mds->messenger->send_message(reply, m->get_source(), m->get_source_port());
+ mds->messenger->send_message(reply, m->get_source(), m->get_source_inst(), m->get_source_port());
delete m;
}
void AnchorTable::load(Context *onfinish)
{
dout(7) << "load" << endl;
+ init_inode();
assert(!opened);
AnchorTable(MDS *mds);
protected:
+ void init_inode(); // call this before doing anything.
+
//
bool have_ino(inodeno_t ino) {
return true; // always in memory for now.
public:
// load/save entire table for now!
- void reset() {
- opened = true;
- anchor_map.clear();
- }
+ void reset();
void save(Context *onfinish);
void load(Context *onfinish);
void load_2(size_t size, bufferlist& bl);
#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".idalloc: "
+// (Re)initialize this allocator's inode: zero it, set its ino to
+// MDS_INO_IDS_OFFSET plus the MDS node id, and give it the global file layout
+// g_OSD_FileLayout.
+// NOTE(review): presumably done here rather than in the constructor because
+// the node id is not final at construction time -- confirm.
+void IdAllocator::init_inode()
+{
+ memset(&inode, 0, sizeof(inode));
+ inode.ino = MDS_INO_IDS_OFFSET + mds->get_nodeid();
+ inode.layout = g_OSD_FileLayout;
+}
+
+
idno_t IdAllocator::alloc_id(bool replay)
{
assert(is_active());
void IdAllocator::reset()
{
+ init_inode();
+
free.clear();
// use generic range FIXME THIS IS CRAP
{
dout(10) << "load" << endl;
+ init_inode();
+
assert(is_undef());
state = STATE_OPENING;
map<version_t, list<Context*> > waitfor_save;
public:
- IdAllocator(MDS *m, inode_t i) :
+ IdAllocator(MDS *m) :
mds(m),
- inode(i),
state(STATE_UNDEF),
version(0), committing_version(0), committed_version(0)
{
}
+
+ void init_inode();
// alloc or reclaim ids
idno_t alloc_id(bool replay=false);
*/
void Locker::handle_client_file_caps(MClientFileCaps *m)
{
- int client = MSG_ADDR_NUM(m->get_source());
+ int client = m->get_source().num();
CInode *in = mdcache->get_inode(m->get_ino());
Capability *cap = 0;
if (in)
{
assert(m->get_otype() == LOCK_OTYPE_IHARD);
- mds->logger->inc("lih");
+ if (mds->logger) mds->logger->inc("lih");
int from = m->get_asker();
CInode *in = mdcache->get_inode(m->get_ino());
{
assert(m->get_otype() == LOCK_OTYPE_IFILE);
- mds->logger->inc("lif");
+ if (mds->logger) mds->logger->inc("lif");
CInode *in = mdcache->get_inode(m->get_ino());
int from = m->get_asker();
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_balancer) cout << "mds" << mds->get_nodeid() << ".bal " << (g_clock.recent_now() - mds->logger->get_start()) << " "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_balancer) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".bal "
#define MIN_LOAD 50 // ??
#define MIN_REEXPORT 5 // will automatically reexport
MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
hb->get_import_map() = import_map;
mds->messenger->send_message(hb,
- MSG_ADDR_MDS(i), MDS_PORT_BALANCER,
+ MSG_ADDR_MDS(i), mds->mdsmap->get_inst(i),
+ MDS_PORT_BALANCER,
MDS_PORT_BALANCER);
}
}
void MDBalancer::handle_heartbeat(MHeartbeat *m)
{
- dout(25) << "=== got heartbeat " << m->get_beat() << " from " << MSG_ADDR_NICE(m->get_source()) << " " << m->get_load() << endl;
+ dout(25) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source().num() << " " << m->get_load() << endl;
if (!mds->mdcache->get_root()) {
dout(10) << "no root on handle" << endl;
return;
}
- int who = MSG_ADDR_NUM(m->get_source());
+ int who = m->get_source().num();
if (who == 0) {
dout(20) << " from mds0, new epoch" << endl;
}
- mds->logger->inc("cex");
+ if (mds->logger) mds->logger->inc("cex");
}
if (dn->get_inode()->is_cached_by(from)) {
dout(15) << "traverse: REP would replicate to mds" << from << ", but already cached_by "
- << MSG_ADDR_NICE(req->get_source()) << " dn " << *dn << endl;
+ << req->get_source() << " dn " << *dn << endl;
} else {
- dout(10) << "traverse: REP replicating to " << MSG_ADDR_NICE(req->get_source()) << " dn " << *dn << endl;
+ dout(10) << "traverse: REP replicating to " << req->get_source() << " dn " << *dn << endl;
MDiscoverReply *reply = new MDiscoverReply(cur->dir->ino());
reply->add_dentry( dn->get_name(), !dn->can_read());
reply->add_inode( dn->inode->replicate_to( from ) );
touch_inode(cur);
mds->mdstore->fetch_dir(cur->dir, ondelay);
- mds->logger->inc("cmiss");
+ if (mds->logger) mds->logger->inc("cmiss");
if (onfinish) delete onfinish;
return 1;
want,
false),
dauth, MDS_PORT_CACHE);
- mds->logger->inc("dis");
+ if (mds->logger) mds->logger->inc("dis");
}
// delay processing of current request.
path[depth],
new C_MDC_TraverseDiscover(onfinish, ondelay));
- mds->logger->inc("cmiss");
+ if (mds->logger) mds->logger->inc("cmiss");
return 1;
}
if (onfail == MDS_TRAVERSE_FORWARD) {
mds->send_message_mds(req, dauth, req->get_dest_port());
//show_imports();
- mds->logger->inc("cfw");
+ if (mds->logger) mds->logger->inc("cfw");
if (onfinish) delete onfinish;
delete ondelay;
return 2;
// request pins
request_pin_inode(req, ref);
- mds->logger->inc("req");
+ if (mds->logger) mds->logger->inc("req");
return true;
}
// log some stats *****
- mds->logger->set("c", lru.lru_get_size());
- mds->logger->set("cpin", lru.lru_get_num_pinned());
- mds->logger->set("ctop", lru.lru_get_top());
- mds->logger->set("cbot", lru.lru_get_bot());
- mds->logger->set("cptail", lru.lru_get_pintail());
- //mds->logger->set("buf",buffer_total_alloc);
+ if (mds->logger) {
+ mds->logger->set("c", lru.lru_get_size());
+ mds->logger->set("cpin", lru.lru_get_num_pinned());
+ mds->logger->set("ctop", lru.lru_get_top());
+ mds->logger->set("cbot", lru.lru_get_bot());
+ mds->logger->set("cptail", lru.lru_get_pintail());
+ //mds->logger->set("buf",buffer_total_alloc);
+ }
if (g_conf.log_pins) {
// pin
for (int i=0; i<CINODE_NUM_PINS; i++) {
- mds->logger2->set(cinode_pin_names[i],
- cinode_pins[i]);
+ if (mds->logger2) mds->logger2->set(cinode_pin_names[i],
+ cinode_pins[i]);
}
/*
for (map<int,int>::iterator it = cdir_pins.begin();
it++) {
//string s = "D";
//s += cdir_pin_names[it->first];
- mds->logger2->set(//s,
+ if (mds->logger2) mds->logger2->set(//s,
cdir_pin_names[it->first],
it->second);
}
request_cleanup(req);
delete req; // delete req
- mds->logger->inc("reply");
+ if (mds->logger) mds->logger->inc("reply");
//dump();
request_cleanup(req);
mds->send_message_mds(req, who, port);
- mds->logger->inc("fw");
+ if (mds->logger) mds->logger->inc("fw");
}
void MDCache::handle_cache_expire(MCacheExpire *m)
{
int from = m->get_from();
- int source = MSG_ADDR_NUM(m->get_source());
+ int source = m->get_source().num();
map<int, MCacheExpire*> proxymap;
if (m->get_from() == source) {
balancer = new MDBalancer(this);
anchorclient = new AnchorClient(messenger, mdsmap);
+ idalloc = new IdAllocator(this);
- // alloc
- {
- inode_t id_inode;
- memset(&id_inode, 0, sizeof(id_inode));
- id_inode.ino = MDS_INO_IDS_OFFSET + whoami;
- id_inode.layout = g_OSD_FileLayout;
- idalloc = new IdAllocator(this, id_inode);
- }
-
- // hack: anchortable on mds0.
- if (whoami == 0)
- anchormgr = new AnchorTable(this);
- else
- anchormgr = 0;
-
+ anchormgr = new AnchorTable(this);
server = new Server(this);
locker = new Locker(this, mdcache);
last_balancer_hash = last_balancer_heartbeat = g_clock.recent_now();
+
+ logger = logger2 = 0;
+
+ // i'm ready!
+ messenger->set_dispatcher(this);
+}
+
+// Release every object this MDS holds a pointer to.
+// delete on a null pointer is a no-op, so each member is freed
+// unconditionally and then zeroed.
+MDS::~MDS() {
+  delete mdcache;       mdcache = 0;
+  delete mdstore;       mdstore = 0;
+  delete mdlog;         mdlog = 0;
+  delete balancer;      balancer = 0;
+  delete idalloc;       idalloc = 0;
+  delete anchormgr;     anchormgr = 0;
+  delete anchorclient;  anchorclient = 0;
+  delete osdmap;        osdmap = 0;
+
+  delete filer;         filer = 0;
+  delete objecter;      objecter = 0;
+  delete messenger;     messenger = 0;
+
+  // the loggers start out null and only exist once reopen_log() has run
+  delete logger;        logger = 0;
+  delete logger2;       logger2 = 0;
+}
+
+
+void MDS::reopen_log()
+{
+ // flush+close old log
+ if (logger) {
+ logger->flush(true);
+ delete logger;
+ }
+ if (logger2) {
+ logger2->flush(true);
+ delete logger2;
+ }
+
+
// log
string name;
name = "mds";
char n[80];
sprintf(n, "mds%d.cache", whoami);
logger2 = new Logger(n, (LogType*)&mds_cache_logtype);
-
-
- // i'm ready!
- messenger->set_dispatcher(this);
}
-MDS::~MDS() {
- if (mdcache) { delete mdcache; mdcache = NULL; }
- if (mdstore) { delete mdstore; mdstore = NULL; }
- if (mdlog) { delete mdlog; mdlog = NULL; }
- if (balancer) { delete balancer; balancer = NULL; }
- if (idalloc) { delete idalloc; idalloc = NULL; }
- if (anchormgr) { delete anchormgr; anchormgr = NULL; }
- if (anchorclient) { delete anchorclient; anchorclient = NULL; }
- if (osdmap) { delete osdmap; osdmap = 0; }
-
- if (filer) { delete filer; filer = 0; }
- if (objecter) { delete objecter; objecter = 0; }
- if (messenger) { delete messenger; messenger = NULL; }
-
- if (logger) { delete logger; logger = 0; }
- if (logger2) { delete logger2; logger2 = 0; }
-
-}
-
-
void MDS::send_message_mds(Message *m, int mds, int port, int fromport)
{
if (port && !fromport)
delete m;
+ // see who i am
+ int w = mdsmap->get_inst_rank(messenger->get_myinst());
+ if (w != whoami) {
+ whoami = w;
+ messenger->reset_myaddr(MSG_ADDR_MDS(w));
+ reopen_log();
+ }
+ dout(1) << "map says i am " << w << endl;
+
if (is_booting()) {
// we need an osdmap too.
int mon = monmap->pick_mon();
mdcache->shutdown_start();
// save anchor table
- if (whoami == 0)
+ if (mdsmap->get_anchortable() == whoami)
anchormgr->save(0); // FIXME FIXME
// flush log
last_log = now;
mds_load_t load = balancer->get_load();
- req_rate = logger->get("req");
-
- logger->set("l", (int)load.mds_load());
- logger->set("q", messenger->get_dispatch_queue_len());
- logger->set("buf", buffer_total_alloc);
-
- mdcache->log_stat(logger);
+ if (logger) {
+ req_rate = logger->get("req");
+
+ logger->set("l", (int)load.mds_load());
+ logger->set("q", messenger->get_dispatch_queue_len());
+ logger->set("buf", buffer_total_alloc);
+
+ mdcache->log_stat(logger);
+ }
// balance?
void MDS::handle_ping(MPing *m)
{
- dout(10) << " received ping from " << MSG_ADDR_NICE(m->get_source()) << " with seq " << m->seq << endl;
+ dout(10) << " received ping from " << m->get_source() << " with seq " << m->seq << endl;
messenger->send_message(new MPingAck(m),
m->get_source(), m->get_source_inst());
// start up, shutdown
int init();
+ void reopen_log();
void boot_mkfs();
void boot_mkfs_finish();
};
-ostream& operator<<(ostream& out, MDS& mds);
-
#endif
}
return false;
}
+
+ // Reverse lookup in mds_inst: return the key whose stored instance
+ // compares equal to inst, or -1 when no entry matches.
+ int get_inst_rank(const entity_inst_t& inst) {
+   map<int,entity_inst_t>::iterator cur = mds_inst.begin();
+   while (cur != mds_inst.end()) {
+     if (cur->second == inst)
+       return cur->first;
+     ++cur;
+   }
+   return -1;
+ }
+
// serialize, unserialize
void encode(bufferlist& blist) {
dir->state_set(CDIR_STATE_FETCHING);
// stats
- mds->logger->inc("fdir");
+ if (mds->logger) mds->logger->inc("fdir");
// create return context
Context *fin = new C_MDS_Fetch( this, dir->ino() );
dir->set_committing_version();
// stats
- mds->logger->inc("cdir");
+ if (mds->logger) mds->logger->inc("cdir");
if (dir->is_hashed()) {
// hashed
CDir *dir = in->dir;
assert(dir);
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(export_gather[dir].count(from));
export_gather[dir].erase(from);
dout(7) << "export_dir_prep_ack " << *dir << ", starting export" << endl;
// start export.
- export_dir_go(dir, MSG_ADDR_NUM(m->get_source()));
+ export_dir_go(dir, m->get_source().num());
// done
delete m;
// stats
- mds->logger->inc("ex");
- mds->logger->inc("iex", num_exported_inodes);
+ if (mds->logger) mds->logger->inc("ex");
+ if (mds->logger) mds->logger->inc("iex", num_exported_inodes);
show_imports();
}
assert(dir->is_frozen_tree_root()); // i'm exporting!
// remove from waiting list
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(export_notify_ack_waiting[dir].count(from));
export_notify_ack_waiting[dir].erase(from);
// stats
- mds->logger->set("nex", cache->exports.size());
+ if (mds->logger) mds->logger->set("nex", cache->exports.size());
show_imports();
}
void Migrator::handle_export_dir_discover(MExportDirDiscover *m)
{
- assert(MSG_ADDR_NUM(m->get_source()) != mds->get_nodeid());
+ assert(m->get_source().num() != mds->get_nodeid());
dout(7) << "handle_export_dir_discover on " << m->get_path() << endl;
void Migrator::handle_export_dir_prep(MExportDirPrep *m)
{
- assert(MSG_ADDR_NUM(m->get_source()) != mds->get_nodeid());
+ assert(m->get_source().num() != mds->get_nodeid());
CInode *diri = cache->get_inode(m->get_ino());
assert(diri);
CDir *dir = diri->dir;
assert(dir);
- int oldauth = MSG_ADDR_NUM(m->get_source());
+ int oldauth = m->get_source().num();
dout(7) << "handle_export_dir, import " << *dir << " from " << oldauth << endl;
assert(dir->is_auth() == false);
cache->imports.erase(ex);
ex->state_clear(CDIR_STATE_IMPORT);
- mds->logger->inc("imex");
+ if (mds->logger) mds->logger->inc("imex");
// move nested exports under containing_import
for (set<CDir*>::iterator it = cache->nested_exports[ex].begin();
ex->get(CDIR_PIN_EXPORT); // all exports are pinned
cache->exports.insert(ex);
cache->nested_exports[containing_import].insert(ex);
- mds->logger->inc("imex");
+ if (mds->logger) mds->logger->inc("imex");
}
}
mds->balancer->add_import(dir);
// send notify's etc.
- dout(7) << "sending notifyack for " << *dir << " to old auth " << MSG_ADDR_NUM(m->get_source()) << endl;
+ dout(7) << "sending notifyack for " << *dir << " to old auth " << m->get_source().num() << endl;
mds->send_message_mds(new MExportDirNotifyAck(dir->inode->ino()),
m->get_source().num(), MDS_PORT_MIGRATOR);
it != dir->open_by.end();
it++) {
assert( *it != mds->get_nodeid() );
- if ( *it == MSG_ADDR_NUM(m->get_source()) ) continue; // not to old auth.
+ if ( *it == m->get_source().num() ) continue; // not to old auth.
- MExportDirNotify *notify = new MExportDirNotify(dir->ino(), MSG_ADDR_NUM(m->get_source()), mds->get_nodeid());
+ MExportDirNotify *notify = new MExportDirNotify(dir->ino(), m->get_source().num(), mds->get_nodeid());
notify->copy_exports(m->get_exports());
if (g_conf.mds_verify_export_dirauth)
// some stats
- mds->logger->inc("im");
- mds->logger->inc("iim", num_imported_inodes);
- mds->logger->set("nim", cache->imports.size());
+ if (mds->logger) {
+ mds->logger->inc("im");
+ mds->logger->inc("iim", num_imported_inodes);
+ mds->logger->set("nim", cache->imports.size());
+ }
// FIXME LOG IT
dout(5) << "done with import of " << *dir << endl;
show_imports();
- mds->logger->set("nex", cache->exports.size());
- mds->logger->set("nim", cache->imports.size());
+ if (mds->logger) {
+ mds->logger->set("nex", cache->exports.size());
+ mds->logger->set("nim", cache->imports.size());
+ }
// un auth pin (other exports can now proceed)
dir->auth_unpin();
CDir *dir = in->dir;
assert(dir);
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(hash_gather[dir].count(from));
hash_gather[dir].erase(from);
CDir *dir = in->dir;
assert(dir);
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(hash_gather[dir].count(from) == 1);
hash_gather[dir].erase(from);
assert(dir->is_hashed());
assert(dir->is_hashing());
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(hash_gather[dir].count(from) == 1);
hash_gather[dir].erase(from);
assert(hash_gather.count(dir) == 0);
// stats
- //mds->logger->inc("nh", 1);
+ //if (mds->logger) mds->logger->inc("nh", 1);
}
dout(5) << "handle_hash_dir_notify " << *dir << endl;
int from = m->get_from();
- int source = MSG_ADDR_NUM(m->get_source());
+ int source = m->get_source().num();
if (dir->is_auth()) {
// gather notifies
assert(dir->is_hashed());
void Migrator::handle_hash_dir_discover(MHashDirDiscover *m)
{
- assert(MSG_ADDR_NUM(m->get_source()) != mds->get_nodeid());
+ assert(m->get_source().num() != mds->get_nodeid());
dout(7) << "handle_hash_dir_discover on " << m->get_path() << endl;
assert(dir->is_hashing());
dout(5) << "handle_hash_dir " << *dir << endl;
- int oldauth = MSG_ADDR_NUM(m->get_source());
+ int oldauth = m->get_source().num();
// content
import_hashed_content(dir, m->get_state(), m->get_nden(), oldauth);
dout(7) << "sending notifies" << endl;
for (int i=0; i<mds->get_mds_map()->get_num_mds(); i++) {
if (i == mds->get_nodeid()) continue;
- if (i == MSG_ADDR_NUM(m->get_source())) continue;
+ if (i == m->get_source().num()) continue;
mds->send_message_mds(new MHashDirNotify(dir->ino(), mds->get_nodeid()),
i, MDS_PORT_MIGRATOR);
}
CDir *dir = in->dir;
assert(dir);
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
dout(7) << "handle_unhash_dir_prep_ack from " << from << " " << *dir << endl;
if (!m->did_assim()) {
assert(dir->is_hashed());
// assimilate content
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
import_hashed_content(dir, m->get_state(), m->get_nden(), from);
delete m;
assert(dir->is_frozen_dir());
// done?
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(hash_gather[dir].count(from));
hash_gather[dir].erase(from);
delete m;
assert(dir->is_unhashing());
assert(!dir->is_auth());
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(hash_gather[dir].count(from) == 1);
hash_gather[dir].erase(from);
delete m;
assert(in);
dout(7) << "handle_rename_notify_ack on " << *in << endl;
- int source = MSG_ADDR_NUM(m->get_source());
+ int source = m->get_source().num();
rename_waiting_for_ack[in->ino()].erase(source);
if (rename_waiting_for_ack[in->ino()].empty()) {
// last one!
// HACK
bufferlist bufstate;
bufstate.claim_append(m->get_inode_state());
- cache->migrator->decode_import_inode(destdn, bufstate, off, MSG_ADDR_NUM(m->get_source()));
+ cache->migrator->decode_import_inode(destdn, bufstate, off, m->get_source().num());
CInode *in = destdn->inode;
assert(in);
// ok, send notifies.
set<int> notify;
for (int i=0; i<mds->get_mds_map()->get_num_mds(); i++) {
- if (i != MSG_ADDR_NUM(m->get_source()) && // except the source
+ if (i != m->get_source().num() && // except the source
i != mds->get_nodeid()) // and the dest
notify.insert(i);
}
- file_rename_notify(in, srcdir, srcname, destdir, destname, notify, MSG_ADDR_NUM(m->get_source()));
+ file_rename_notify(in, srcdir, srcname, destdir, destname, notify, m->get_source().num());
delete m;
}
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << whoami << ".server "
-#define derr(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << whoami << ".server "
+#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".server "
+#define derr(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".server "
void Server::dispatch(Message *m)
void Server::handle_client_mount(MClientMount *m)
{
- int n = MSG_ADDR_NUM(m->get_source());
+ int n = m->get_source().num();
dout(3) << "mount by client" << n << endl;
mds->clientmap.add_mount(n, m->get_source_inst());
- assert(whoami == 0); // mds0 mounts/unmounts
+ assert(mds->get_nodeid() == 0); // mds0 mounts/unmounts
// ack
messenger->send_message(new MClientMountAck(m, mds->mdsmap, mds->osdmap),
void Server::handle_client_unmount(Message *m)
{
- int n = MSG_ADDR_NUM(m->get_source());
+ int n = m->get_source().num();
dout(3) << "unmount by client" << n << endl;
- assert(whoami == 0); // mds0 mounts/unmounts
+ assert(mds->get_nodeid() == 0); // mds0 mounts/unmounts
mds->clientmap.rem_mount(n);
// include trace
if (tracei) {
- reply->set_trace_dist( tracei, whoami );
+ reply->set_trace_dist( tracei, mds->get_nodeid() );
}
// send reply
ref = mdcache->get_inode(req->get_ino()); // fixme someday no ino needed?
if (!ref) {
- int next = whoami + 1;
+ int next = mds->get_nodeid() + 1;
if (next >= mds->mdsmap->get_num_mds()) next = 0;
dout(10) << "got request on ino we don't have, passing buck to " << next << endl;
mds->send_message_mds(req, next, MDS_PORT_SERVER);
// hashed?
if (dir->is_hashed() &&
- whoami != mds->hash_dentry( dir->ino(), it->first ))
+ mds->get_nodeid() != mds->hash_dentry( dir->ino(), it->first ))
continue;
// is dentry readable?
// add this item
// note: InodeStat makes note of whether inode data is readable.
dnls.push_back( it->first );
- inls.push_back( new InodeStat(in, whoami) );
+ inls.push_back( new InodeStat(in, mds->get_nodeid()) );
numfiles++;
}
return numfiles;
assert(dir->is_hashed());
// move items to hashed_readdir gather
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
assert(dir->hashed_readdir.count(from) == 0);
dir->hashed_readdir[from].first.splice(dir->hashed_readdir[from].first.begin(),
m->get_in());
if (cur->dir)
dirauth = cur->dir->authority();
assert(dirauth >= 0);
- assert(dirauth != whoami);
+ assert(dirauth != mds->get_nodeid());
// forward to authority
dout(10) << " forwarding readdir to authority " << dirauth << endl;
// get local bits
encode_dir_contents(cur->dir,
- dir->hashed_readdir[whoami].first,
- dir->hashed_readdir[whoami].second);
+ dir->hashed_readdir[mds->get_nodeid()].first,
+ dir->hashed_readdir[mds->get_nodeid()].second);
// request other bits
for (int i=0; i<mds->mdsmap->get_num_mds(); i++) {
- if (i == whoami) continue;
+ if (i == mds->get_nodeid()) continue;
mds->send_message_mds(new MHashReaddir(dir->ino()), i, MDS_PORT_SERVER);
}
// . too
dnls.push_back(".");
- inls.push_back(new InodeStat(cur, whoami));
+ inls.push_back(new InodeStat(cur, mds->get_nodeid()));
++numfiles;
// yay, reply
// make sure it's my dentry
int dnauth = dir->dentry_authority(name);
- if (dnauth != whoami) {
+ if (dnauth != mds->get_nodeid()) {
// fw
dout(7) << "mknod on " << req->get_path() << ", dentry " << *dir << " dn " << name << " not mine, fw to " << dnauth << endl;
// make sure it's my dentry
int dauth = dir->dentry_authority(dname);
- if (dauth != whoami) {
+ if (dauth != mds->get_nodeid()) {
// fw
dout(7) << "link on " << req->get_path() << ", dn " << dname << " in " << *dir << " not mine, fw to " << dauth << endl;
mdcache->request_forward(req, dauth);
string dname = req->get_filepath().last_bit();
int dauth = dir->dentry_authority(dname);
- if (whoami != dauth) {
+ if (mds->get_nodeid() != dauth) {
// ugh, exported out from under us
dout(7) << "ugh, forwarded out from under us, dentry auth is " << dauth << endl;
mdcache->request_forward(req, dauth);
} else {
// remote: send nlink++ request, wait
dout(7) << "target is remote, sending InodeLink" << endl;
- mds->send_message_mds(new MInodeLink(targeti->ino(), whoami), targeti->authority(), MDS_PORT_CACHE);
+ mds->send_message_mds(new MInodeLink(targeti->ino(), mds->get_nodeid()), targeti->authority(), MDS_PORT_CACHE);
// wait
targeti->add_waiter(CINODE_WAIT_LINK,
// does it exist?
CDentry *dn = dir->lookup(name);
if (!dn) {
- if (dnauth == whoami) {
+ if (dnauth == mds->get_nodeid()) {
dout(7) << "handle_client_rmdir/unlink dne " << name << " in " << *dir << endl;
reply_request(req, -ENOENT);
} else {
}
// am i dentry auth?
- if (dnauth != whoami) {
+ if (dnauth != mds->get_nodeid()) {
// not auth; forward!
dout(7) << "handle_client_unlink not auth for " << *dir << " dn " << dn->name << ", fwd to " << dnauth << endl;
mdcache->request_forward(req, dnauth);
// make sure it's my dentry
int srcauth = srcdir->dentry_authority(srcname);
- if (srcauth != whoami) {
+ if (srcauth != mds->get_nodeid()) {
// fw
dout(7) << "rename on " << req->get_path() << ", dentry " << *srcdir << " dn " << srcname << " not mine, fw to " << srcauth << endl;
mdcache->request_forward(req, srcauth);
dout(7) << "handle_client_rename_2 destname " << destname << " destdir " << *destdir << " auth " << destauth << endl;
//
- if (srcauth != whoami ||
- destauth != whoami) {
+ if (srcauth != mds->get_nodeid() ||
+ destauth != mds->get_nodeid()) {
dout(7) << "rename has remote dest " << destauth << endl;
dout(7) << "FOREIGN RENAME" << endl;
//everybody = true;
//}
- bool srclocal = srcdn->dir->dentry_authority(srcdn->name) == whoami;
- bool destlocal = destdir->dentry_authority(destname) == whoami;
+ bool srclocal = srcdn->dir->dentry_authority(srcdn->name) == mds->get_nodeid();
+ bool destlocal = destdir->dentry_authority(destname) == mds->get_nodeid();
dout(7) << "handle_client_rename_local: src local=" << srclocal << " " << *srcdn << endl;
if (destdn) {
newdir->is_auth() &&
!newdir->is_hashing()) {
int dest = rand() % mds->mdsmap->get_num_mds();
- if (dest != whoami) {
+ if (dest != mds->get_nodeid()) {
dout(10) << "exporting new dir " << *newdir << " in replicated parent " << *diri->dir << endl;
mdcache->migrator->export_dir(newdir, dest);
}
if (mode != FILE_MODE_R && mode != FILE_MODE_LAZY &&
!cur->is_auth()) {
int auth = cur->authority();
- assert(auth != whoami);
+ assert(auth != mds->get_nodeid());
dout(9) << "open writeable on replica for " << *cur << " fw to auth " << auth << endl;
mdcache->request_forward(req, auth);
MDCache *mdcache;
MDLog *mdlog;
Messenger *messenger;
- int whoami;
__uint64_t stat_ops;
Server(MDS *m) :
mds(m),
mdcache(mds->mdcache), mdlog(mds->mdlog),
- messenger(mds->messenger), whoami(mds->get_nodeid()),
+ messenger(mds->messenger),
stat_ops(0) {
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MCLIENTBOOT_H
+#define __MCLIENTBOOT_H
+
+#include "msg/Message.h"
+
+// Message of type MSG_CLIENT_BOOT (dispatched by ClientMonitor to
+// handle_client_boot).  The class declares no fields of its own, so both
+// payload hooks below have empty bodies.
+class MClientBoot : public Message {
+
+ public:
+ MClientBoot() : Message(MSG_CLIENT_BOOT) {
+ }
+
+ // short name for this message type
+ char *get_type_name() { return "Cboot"; }
+
+ // no payload: s and off are left untouched
+ virtual void decode_payload(crope& s, int& off) {
+ }
+ // no payload: nothing is appended to s
+ virtual void encode_payload(crope& s) {
+ }
+};
+
+#endif
#include "msg/Message.h"
class MClientMount : public Message {
- long pcid;
- int mkfs;
public:
MClientMount() : Message(MSG_CLIENT_MOUNT) {
- pcid = 0;
- mkfs = 0;
}
- void set_mkfs(int m) { mkfs = m; }
- int get_mkfs() { return mkfs; }
-
- void set_pcid(long pcid) { this->pcid = pcid; }
- long get_pcid() { return pcid; }
-
char *get_type_name() { return "Cmnt"; }
virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(pcid), (char*)&pcid);
- off += sizeof(pcid);
- s.copy(off, sizeof(mkfs), (char*)&mkfs);
- off += sizeof(mkfs);
}
virtual void encode_payload(crope& s) {
- s.append((char*)&pcid, sizeof(pcid));
- s.append((char*)&mkfs, sizeof(mkfs));
}
};
inline ostream& operator<<(ostream& out, MOSDOp& op)
{
- return out << "MOSDOp(" << MSG_ADDR_NICE(op.get_client()) << "." << op.get_tid()
+ return out << "MOSDOp(" << op.get_client() << "." << op.get_tid()
<< " op " << MOSDOp::get_opname(op.get_op())
<< " oid " << hex << op.get_oid() << dec << " " << &op << ")";
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "config.h"
+
+#include "mon/MonMap.h"
+
+
+// Parse a dotted-quad address with port, "a.b.c.d:port", from s.
+//
+// On success the four octets are copied into tcpaddr.sin_addr.s_addr in the
+// order they were written, the port is stored in tcpaddr.sin_port, and true
+// is returned.  Returns false (tcpaddr untouched) if a field has no digits,
+// a separator is wrong, an octet exceeds 255 or the port exceeds 65535.
+// Text following the port digits is not examined.
+//
+// NOTE(review): the port is stored exactly as parsed, with no htons();
+// confirm that consumers of tcpaddr_t expect that byte order.
+bool parse_ip_port(const char *s, tcpaddr_t& tcpaddr)
+{
+  unsigned char addr[4];
+  int port = 0;
+
+  // fields 0..3 are the address octets, field 4 is the port
+  for (int count = 0; count < 5; count++) {
+    const int maxval = (count <= 3) ? 255 : 65535;
+    int val = 0;
+    int numdigits = 0;
+
+    while (*s >= '0' && *s <= '9') {
+      val = val * 10 + (*s - '0');
+      // reject instead of silently truncating; checking per digit also
+      // keeps val from overflowing on an arbitrarily long digit run
+      if (val > maxval) return false;
+      numdigits++;
+      s++;
+    }
+
+    if (numdigits == 0) return false;           // no digits
+    if (count < 3 && *s != '.') return false;   // should have 3 periods
+    if (count == 3 && *s != ':') return false;  // then a colon
+
+    if (count <= 3) {
+      addr[count] = val;
+      s++;   // step over the separator; never advance past the port
+    } else {
+      port = val;
+    }
+  }
+
+  // copy into inst
+  memcpy((char*)&tcpaddr.sin_addr.s_addr, (char*)addr, 4);
+  tcpaddr.sin_port = port;
+
+  return true;
+}
+
+
+// mkmonmap: build a MonMap from the ip:port arguments on the command line
+// and write it to a file (".ceph_monmap" unless --out <file> is given).
+// Returns 0 on success, -1 on a bad argument, an empty monitor list, or a
+// failed write.
+int main(int argc, char **argv)
+{
+  vector<char*> args;
+  argv_to_vec(argc, argv, args);
+
+  MonMap monmap;
+
+  char *outfn = ".ceph_monmap";
+
+  for (unsigned i=0; i<args.size(); i++) {
+    if (strcmp(args[i], "--out") == 0) {
+      // --out consumes the next argument; make sure there is one instead
+      // of reading past the end of args
+      if (i+1 >= args.size()) {
+        cerr << "mkmonmap: --out requires a filename" << endl;
+        return -1;
+      }
+      outfn = args[++i];
+    }
+    else {
+      // parse ip:port
+      tcpaddr_t addr;
+      if (!parse_ip_port(args[i], addr)) {
+        cerr << "mkmonmap: invalid ip:port '" << args[i] << "'" << endl;
+        return -1;
+      }
+      entity_inst_t inst;
+      inst.set_addr(addr);
+      cout << "mkmonmap: mon" << monmap.num_mon << " " << inst << endl;
+      monmap.add_mon(inst);
+    }
+  }
+
+  if (monmap.num_mon == 0) {
+    cerr << "usage: mkmonmap ip:port [...]" << endl;
+    return -1;
+  }
+
+  // write it out
+  cout << "mkmonmap: writing monmap to " << outfn << " (" << monmap.num_mon << " monitors)" << endl;
+  int r = monmap.write(outfn);
+  if (r < 0) {
+    // report explicitly: an assert would disappear in an NDEBUG build and
+    // the tool would then exit 0 without having written the map
+    cerr << "mkmonmap: error writing monmap to " << outfn << endl;
+    return -1;
+  }
+
+  return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include "ClientMonitor.h"
+#include "Monitor.h"
+#include "MDSMonitor.h"
+
+#include "messages/MClientBoot.h"
+#include "messages/MMDSMap.h"
+//#include "messages/MMDSFailure.h"
+
+#include "common/Timer.h"
+
+#include "config.h"
+#undef dout
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".client "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".client "
+
+
+
+
+// Route an incoming message to its handler.  MSG_CLIENT_BOOT is the only
+// type handled; any other type trips the assert.
+void ClientMonitor::dispatch(Message *m)
+{
+  if (m->get_type() == MSG_CLIENT_BOOT) {
+    handle_client_boot((MClientBoot*)m);
+    return;
+  }
+
+  // (a client-failure message / handle_client_failure pair is not enabled yet)
+
+  assert(0);
+}
+
+// Handle a client boot request: settle on a client id, record the sender's
+// instance under that id in client_map, ask the MDS monitor to send the
+// client its latest map, and free the message.
+void ClientMonitor::handle_client_boot(MClientBoot *m)
+{
+  dout(7) << "client_boot from " << m->get_source() << " at " << m->get_source_inst() << endl;
+  assert(m->get_source().is_client());
+  int from = m->get_source().num();
+
+  // choose a client id: a fresh one is needed when the sender has none
+  // (negative), or when its address is already recorded for a different
+  // instance
+  bool need_new_id = (from < 0);
+  if (!need_new_id) {
+    map<msg_addr_t,entity_inst_t>::iterator known = client_map.find(m->get_source());
+    need_new_id = (known != client_map.end()) &&
+                  (known->second != m->get_source_inst());
+  }
+  if (need_new_id) {
+    from = ++num_clients;
+    dout(10) << "client_boot assigned client" << from << endl;
+  }
+
+  client_map[MSG_ADDR_CLIENT(from)] = m->get_source_inst();
+
+  // reply with latest mds map
+  mon->mdsmon->send_latest(MSG_ADDR_CLIENT(from), m->get_source_inst());
+  delete m;
+}
+
+/*
+void ClientMonitor::handle_mds_shutdown(Message *m)
+{
+ assert(m->get_source().is_mds());
+ int from = m->get_source().num();
+
+ mdsmap.mds_inst.erase(from);
+ mdsmap.all_mds.erase(from);
+
+ dout(7) << "mds_shutdown from " << m->get_source()
+ << ", still have " << mdsmap.all_mds
+ << endl;
+
+ // tell someone?
+ // fixme
+
+ delete m;
+}
+
+*/
+
+/*
+void ClientMonitor::bcast_latest_mds()
+{
+ dout(10) << "bcast_latest_mds " << mdsmap.get_epoch() << endl;
+
+ // tell mds
+ for (set<int>::iterator p = mdsmap.get_mds().begin();
+ p != mdsmap.get_mds().end();
+ p++) {
+ if (mdsmap.is_down(*p)) continue;
+ send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p));
+ }
+}
+
+*/
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __CLIENTMONITOR_H
+#define __CLIENTMONITOR_H
+
+#include <map>
+#include <set>
+using namespace std;
+
+#include "include/types.h"
+#include "msg/Messenger.h"
+
+#include "mds/MDSMap.h"
+
+class Monitor;
+
+// Monitor service that handles client boot messages: it hands out client
+// ids and records which instance each client address belongs to.
+class ClientMonitor : public Dispatcher {
+ // set once by the constructor from its arguments
+ Monitor *mon;
+ Messenger *messenger;
+ Mutex &lock;
+
+ private:
+ int num_clients;   // starts at 0; pre-incremented to produce each newly assigned client id
+ map<msg_addr_t,entity_inst_t> client_map;   // client address -> instance recorded at boot
+
+ // NOTE(review): the only definition visible in ClientMonitor.cc is commented out -- confirm before calling
+ void bcast_latest_mds();
+
+ //void accept_pending(); // accept pending, new map.
+ //void send_incremental(epoch_t since, msg_addr_t dest);
+
+ void handle_client_boot(class MClientBoot *m);
+
+ public:
+ ClientMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l),
+ num_clients(0) { }
+
+ // entry point for messages routed to this service
+ void dispatch(Message *m);
+ void tick(); // check state, take actions
+};
+
+#endif
old_views = views; // TODO deep copy
for (unsigned i=0; i<processes.size(); i++) {
mon->messenger->send_message(new MMonElectionCollect(read_num),
- MSG_ADDR_MON(processes[i]));
+ MSG_ADDR_MON(processes[i]),
+ mon->monmap->get_inst(processes[i]));
}
}
lock.Unlock();
refresh_num++;
MMonElectionRefresh *msg = new MMonElectionRefresh(whoami, registry[whoami], refresh_num);
for (unsigned i=0; i<processes.size(); i++) {
- mon->messenger->send_message(msg, MSG_ADDR_MON(processes[i]));
+ mon->messenger->send_message(msg, MSG_ADDR_MON(processes[i]), mon->monmap->get_inst(processes[i]));
}
// Start the trip timer
mon->messenger->send_message(new MMonElectionStatus(msg->get_source().num(),
msg->read_num,
registry),
- msg->get_source());
+ msg->get_source(),
+ mon->monmap->get_inst(msg->get_source().num()));
delete msg;
}
// reply to msg
mon->messenger->send_message(new MMonElectionAck(msg->p,
- msg->refresh_num),
- msg->get_source());
+ msg->refresh_num),
+ msg->get_source(),
+ mon->monmap->get_inst(msg->get_source().num()));
}
delete msg;
dout(7) << "mds_boot from " << m->get_source() << " at " << m->get_source_inst() << endl;
assert(m->get_source().is_mds());
int from = m->get_source().num();
+
+ // choose an MDS id
+ if (from < 0 || !mdsmap.is_down(from)) {
+ for (from=0; ; ++from)
+ if (mdsmap.is_down(from)) break;
+ dout(10) << "mds_boot assigned mds" << from << endl;
+ }
if (mdsmap.get_epoch() == 0) {
// waiting for boot!
awaiting_map.clear();
}
+// Give dest (reachable at inst) the current MDS map.  If the map is still
+// at epoch 0, the requester is parked in awaiting_map instead.
+void MDSMonitor::send_latest(msg_addr_t dest, const entity_inst_t& inst)
+{
+  // FIXME: check if we're locked, etc.
+  if (!(mdsmap.get_epoch() > 0)) {
+    awaiting_map[dest] = inst;
+    return;
+  }
+
+  send_full(dest, inst);
+}
void handle_mds_getmap(class MMDSGetMap *m);
void handle_mds_shutdown(Message *m);
+
+
public:
MDSMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l) {
create_initial();
void dispatch(Message *m);
void tick(); // check state, take actions
+
+ void send_latest(msg_addr_t dest, const entity_inst_t& inst);
+
};
#endif
#ifndef __MONMAP_H
#define __MONMAP_H
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
#include "msg/Message.h"
#include "include/types.h"
MonMap(int s=0) : epoch(0), num_mon(s), mon_inst(s), last_mon(-1) {}
+ // Append inst to mon_inst and bump num_mon to match.
+ void add_mon(entity_inst_t inst) {
+   mon_inst.push_back(inst);
+   ++num_mon;
+ }
+
// pick a mon.
// choice should be stable, unless we explicitly ask for a new one.
int pick_mon(bool newmon=false) {
_decode(mon_inst, blist, off);
}
+ // Encode this map and store it in the file fn, creating the file if
+ // needed and replacing any previous contents.
+ // Returns 0 on success, or a negative value if the file could not be
+ // opened or the encoded map could not be written in full.
+ int write(char *fn) {
+   // encode
+   bufferlist bl;
+   encode(bl);
+
+   // write
+   // O_CREAT requires the mode argument (without it the new file's mode is
+   // garbage); O_TRUNC keeps the tail of an older, longer file from
+   // surviving behind the new contents.
+   int fd = ::open(fn, O_RDWR|O_CREAT|O_TRUNC, 0644);
+   if (fd < 0) return fd;
+   ::fchmod(fd, 0644);   // force 0644 regardless of umask or a pre-existing mode
+   ssize_t wrote = ::write(fd, (void*)bl.c_str(), bl.length());
+   ::close(fd);
+   if (wrote < 0 || (size_t)wrote != (size_t)bl.length())
+     return -1;          // failed or short write
+   return 0;
+ }
+
+ // Load this map from the file fn: read the whole file into a buffer and
+ // decode it.
+ // Returns 0 on success, or a negative value if the file could not be
+ // opened, stat'ed or read in full (the map is not decoded in that case).
+ int read(char *fn) {
+   // read
+   bufferlist bl;
+   int fd = ::open(fn, O_RDONLY);
+   if (fd < 0) return fd;
+   struct stat st;
+   if (::fstat(fd, &st) < 0) {
+     // st would be uninitialised; don't size the buffer from it
+     ::close(fd);
+     return -1;
+   }
+   bufferptr bp(st.st_size);
+   bl.append(bp);
+   ssize_t got = ::read(fd, (void*)bl.c_str(), bl.length());
+   ::close(fd);
+   if (got < 0 || (size_t)got != (size_t)bl.length())
+     return -1;          // failed or short read: don't decode a partial buffer
+
+   // decode
+   decode(bl);
+   return 0;
+ }
+
};
#endif
#include "OSDMonitor.h"
#include "MDSMonitor.h"
+#include "ClientMonitor.h"
#include "config.h"
#undef dout
// create
osdmon = new OSDMonitor(this, messenger, lock);
mdsmon = new MDSMonitor(this, messenger, lock);
+ clientmon = new ClientMonitor(this, messenger, lock);
// i'm ready!
messenger->set_dispatcher(this);
if (monmap) delete monmap;
if (osdmon) delete osdmon;
if (mdsmon) delete mdsmon;
+ if (clientmon) delete clientmon;
// die.
messenger->shutdown();
mdsmon->dispatch(m);
break;
+ // clients
+ case MSG_CLIENT_BOOT:
+ clientmon->dispatch(m);
+ break;
+
// elector messages
case MSG_MON_ELECTION_ACK:
class ObjectStore;
class OSDMonitor;
class MDSMonitor;
+class ClientMonitor;
class Monitor : public Dispatcher {
protected:
// my public services
OSDMonitor *osdmon;
MDSMonitor *mdsmon;
+ ClientMonitor *clientmon;
// messages
void handle_shutdown(Message *m);
friend class OSDMonitor;
friend class MDSMonitor;
+ friend class ClientMonitor;
public:
Monitor(int w, Messenger *m, MonMap *mm) :
mon_epoch(0),
state(STATE_STARTING),
leader(0),
- osdmon(0),
- mdsmon(0)
+ osdmon(0), mdsmon(0), clientmon(0)
{
// hack leader, until election works.
if (whoami == 0)
// global queue.
-map<msg_addr_t, FakeMessenger*> directory;
+int nranks = 0; // this identify each entity_inst_t
+
+map<int, FakeMessenger*> directory;
hash_map<int, Logger*> loggers;
LogType fakemsg_logtype;
-set<msg_addr_t> shutdown_set;
+set<int> shutdown_set;
Mutex lock;
Cond cond;
pthread_t thread_id;
+
class C_FakeKicker : public Context {
void finish(int r) {
dout(18) << "timer kick" << endl;
lock.Lock();
// messages
- map<msg_addr_t, FakeMessenger*>::iterator it = directory.begin();
+ map<int, FakeMessenger*>::iterator it = directory.begin();
while (it != directory.end()) {
+ FakeMessenger *mgr = it->second;
- dout(18) << "messenger " << it->second << " at " << MSG_ADDR_NICE(it->first) << " has " << it->second->num_incoming() << " queued" << endl;
+ dout(18) << "messenger " << mgr << " at " << mgr->get_myaddr() << " has " << mgr->num_incoming() << " queued" << endl;
- FakeMessenger *mgr = it->second;
if (!mgr->is_ready()) {
- dout(18) << "messenger " << it->second << " at " << MSG_ADDR_NICE(it->first) << " has no dispatcher, skipping" << endl;
+ dout(18) << "messenger " << mgr << " at " << mgr->get_myaddr() << " has no dispatcher, skipping" << endl;
it++;
continue;
}
if (m) {
//dout(18) << "got " << m << endl;
dout(1) << "---- '" << m->get_type_name()
- << "' from " << MSG_ADDR_NICE(m->get_source()) // << ':' << m->get_source_port()
- << " to " << MSG_ADDR_NICE(m->get_dest()) //<< ':' << m->get_dest_port()
+ << "' from " << m->get_source() // << ':' << m->get_source_port()
+ << " to " << m->get_dest() //<< ':' << m->get_dest_port()
<< " ---- " << m
<< endl;
// deal with shutdowns.. dleayed to avoid concurrent directory modification
if (!shutdown_set.empty()) {
- for (set<msg_addr_t>::iterator it = shutdown_set.begin();
+ for (set<int>::iterator it = shutdown_set.begin();
it != shutdown_set.end();
it++) {
dout(7) << "fakemessenger: removing " << *it << " from directory" << endl;
FakeMessenger::FakeMessenger(msg_addr_t me) : Messenger(me)
{
- myaddr = me;
+ entity_inst_t fakeinst;
lock.Lock();
- directory[ myaddr ] = this;
+ {
+ // assign rank
+ fakeinst.addr.sin_port =
+ fakeinst.rank = nranks++;
+ set_myinst(fakeinst);
+
+ // add to directory
+ directory[ fakeinst.rank ] = this;
+ }
lock.Unlock();
- cout << "fakemessenger " << myaddr << " messenger is " << this << endl;
+
+ cout << "fakemessenger " << get_myaddr() << " messenger is " << this << " at " << fakeinst << endl;
//g_timer.set_messenger(this);
{
//cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
lock.Lock();
- assert(directory.count(myaddr) == 1);
- shutdown_set.insert(myaddr);
+ assert(directory.count(get_myinst().rank) == 1);
+ shutdown_set.insert(get_myinst().rank);
/*
directory.erase(myaddr);
}
*/
-int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
+// Rename this messenger: log the old and the new logical address, then store
+// the new one through _set_myaddr().
+void FakeMessenger::reset_myaddr(msg_addr_t m)
+{
+  const msg_addr_t previous = get_myaddr();
+  dout(1) << "reset_myaddr from " << previous << " to " << m << endl;
+  _set_myaddr(m);
+}
+
+
+int FakeMessenger::send_message(Message *m, msg_addr_t dest, entity_inst_t inst, int port, int fromport)
{
- m->set_source(myaddr, fromport);
+ m->set_source(get_myaddr(), fromport);
m->set_dest(dest, port);
//m->set_lamport_send_stamp( get_lamport() );
- entity_inst_t blank;
- m->set_source_inst(blank);
+ m->set_source_inst(get_myinst());
lock.Lock();
try {
#ifdef LOG_MESSAGES
// stats
- loggers[myaddr]->inc("+send",1);
+ loggers[get_myaddr()]->inc("+send",1);
loggers[dest]->inc("-recv",1);
char s[20];
sprintf(s,"+%s", m->get_type_name());
- loggers[myaddr]->inc(s);
+ loggers[get_myaddr()]->inc(s);
sprintf(s,"-%s", m->get_type_name());
loggers[dest]->inc(s);
#endif
// queue
- FakeMessenger *dm = directory[dest];
+ FakeMessenger *dm = directory[inst.rank];
if (!dm) {
- dout(1) << "** destination " << MSG_ADDR_NICE(dest) << " (" << dest << ") dne" << endl;
+ dout(1) << "** destination " << dest << " (" << inst << ") dne" << endl;
assert(dm);
}
dm->queue_incoming(m);
- dout(1) << "--> " << myaddr << " sending " << m << " '" << m->get_type_name() << "'"
- << " to " << MSG_ADDR_NICE(dest)
+ dout(1) << "--> " << get_myaddr() << " sending " << m << " '" << m->get_type_name() << "'"
+ << " to " << dest
<< endl;//" m " << dm << " has " << dm->num_incoming() << " queued" << endl;
}
class FakeMessenger : public Messenger {
protected:
- msg_addr_t myaddr;
-
class Logger *logger;
int qlen;
virtual int shutdown();
+ void reset_myaddr(msg_addr_t m);
+
// msg interface
- virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
+ virtual int send_message(Message *m, msg_addr_t dest, entity_inst_t inst, int port=0, int fromport=0);
// events
//virtual void trigger_timer(Timer *t);
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
+#include "messages/MClientBoot.h"
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
#include "messages/MClientRequest.h"
break;
// clients
+ case MSG_CLIENT_BOOT:
+ m = new MClientBoot();
+ break;
case MSG_CLIENT_MOUNT:
m = new MClientMount();
break;
#define MSG_CLIENT_FILECAPS 63
#define MSG_CLIENT_INODEAUTHUPDATE 64
-#define MSG_CLIENT_MOUNT 70
-#define MSG_CLIENT_MOUNTACK 71
-#define MSG_CLIENT_UNMOUNT 72
+#define MSG_CLIENT_BOOT 70
+#define MSG_CLIENT_MOUNT 71
+#define MSG_CLIENT_MOUNTACK 72
+#define MSG_CLIENT_UNMOUNT 73
// *** MDS ***
// use fixed offsets and static entity -> logical addr mapping!
#define MSG_ADDR_NAMER_BASE 0
-#define MSG_ADDR_RANK_BASE 0x10000000 // per-rank messenger services
-#define MSG_ADDR_MDS_BASE 0x20000000
-#define MSG_ADDR_OSD_BASE 0x30000000
-#define MSG_ADDR_MON_BASE 0x40000000
-#define MSG_ADDR_CLIENT_BASE 0x50000000
+#define MSG_ADDR_RANK_BASE 1
+#define MSG_ADDR_MDS_BASE 2
+#define MSG_ADDR_OSD_BASE 3
+#define MSG_ADDR_MON_BASE 4
+#define MSG_ADDR_CLIENT_BASE 5
-#define MSG_ADDR_TYPE_MASK 0xf0000000
-#define MSG_ADDR_NUM_MASK 0x0fffffff
+#define MSG_ADDR_NEW -1
-#define MSG_ADDR_NEW 0x0fffffff
-#define MSG_ADDR_UNDEF_BASE 0xffffffff
-
-
-/* old int way, which lacked type safety...
-typedef int msg_addr_t;
-
-#define MSG_ADDR_RANK(x) (MSG_ADDR_RANK_BASE + (x))
-#define MSG_ADDR_MDS(x) (MSG_ADDR_MDS_BASE + (x))
-#define MSG_ADDR_OSD(x) (MSG_ADDR_OSD_BASE + (x))
-#define MSG_ADDR_CLIENT(x) (MSG_ADDR_CLIENT_BASE + (x))
-
-#define MSG_ADDR_DIRECTORY 0
-#define MSG_ADDR_RANK_NEW MSG_ADDR_RANK(MSG_ADDR_NEW)
-#define MSG_ADDR_MDS_NEW MSG_ADDR_MDS(MSG_ADDR_NEW)
-#define MSG_ADDR_OSD_NEW MSG_ADDR_OSD(MSG_ADDR_NEW)
-#define MSG_ADDR_CLIENT_NEW MSG_ADDR_CLIENT(MSG_ADDR_NEW)
-
-#define MSG_ADDR_ISCLIENT(x) ((x) >= MSG_ADDR_CLIENT_BASE)
-#define MSG_ADDR_TYPE(x) (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_RANK_BASE ? "rank": \
- (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_CLIENT_BASE ? "client": \
- (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_OSD_BASE ? "osd": \
- (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_MDS_BASE ? "mds": \
- ((x) == MSG_ADDR_DIRECTORY ? "namer":"unknown")))))
-#define MSG_ADDR_NUM(x) ((x) & MSG_ADDR_NUM_MASK)
-#define MSG_ADDR_NICE(x) MSG_ADDR_TYPE(x) << MSG_ADDR_NUM(x)
-*/
// new typed msg_addr_t way!
class msg_addr_t {
public:
- int _addr;
+ int _type;
+ int _num;
- msg_addr_t() : _addr(MSG_ADDR_UNDEF_BASE) {}
- msg_addr_t(int t, int n) : _addr(t | n) {}
+ msg_addr_t() : _type(0), _num(0) {}
+ msg_addr_t(int t, int n) : _type(t), _num(n) {}
- int num() const { return _addr & MSG_ADDR_NUM_MASK; }
- int type() const { return _addr & MSG_ADDR_TYPE_MASK; }
+ int num() const { return _num; }
+ int type() const { return _type; }
const char *type_str() const {
switch (type()) {
case MSG_ADDR_RANK_BASE: return "rank";
bool is_namer() const { return type() == MSG_ADDR_NAMER_BASE; }
};
-inline bool operator== (const msg_addr_t& l, const msg_addr_t& r) { return l._addr == r._addr; }
-inline bool operator!= (const msg_addr_t& l, const msg_addr_t& r) { return l._addr != r._addr; }
-inline bool operator< (const msg_addr_t& l, const msg_addr_t& r) { return l._addr < r._addr; }
-
-//typedef struct msg_addr msg_addr_t;
+inline bool operator== (const msg_addr_t& l, const msg_addr_t& r) { return (l._type == r._type) && (l._num == r._num); }
+inline bool operator!= (const msg_addr_t& l, const msg_addr_t& r) { return (l._type != r._type) || (l._num != r._num); }
+inline bool operator< (const msg_addr_t& l, const msg_addr_t& r) { return (l._type < r._type) || (l._type == r._type && l._num < r._num); }
inline std::ostream& operator<<(std::ostream& out, const msg_addr_t& addr) {
//if (addr.is_namer()) return out << "namer";
- return out << addr.type_str() << addr.num();
+ if (addr.is_new() || addr.num() < 0)
+ return out << addr.type_str() << "?";
+ else
+ return out << addr.type_str() << addr.num();
}
-
namespace __gnu_cxx {
template<> struct hash< msg_addr_t >
{
size_t operator()( const msg_addr_t m ) const
{
static hash<int> H;
- return H(m._addr);
+ return H(m.type() ^ m.num());
}
};
}
#define MSG_ADDR_CLIENT_NEW MSG_ADDR_CLIENT(MSG_ADDR_NEW)
#define MSG_ADDR_NAMER_NEW MSG_ADDR_NAMER(MSG_ADDR_NEW)
-#define MSG_ADDR_ISCLIENT(x) x.is_client()
-#define MSG_ADDR_TYPE(x) x.type_str()
-#define MSG_ADDR_NUM(x) x.num()
-#define MSG_ADDR_NICE(x) x.type_str() << x.num()
-
-
-
class entity_inst_t {
public:
tcpaddr_t addr;
- int rank;
+ __int64_t rank;
entity_inst_t() : rank(-1) {
memset(&addr, 0, sizeof(addr));
entity_inst_t(tcpaddr_t& a, int r) : addr(a), rank(r) {
memset(&addr, 0, sizeof(addr));
}
+
+ // Adopt a as this instance's TCP address and derive rank from it: the
+ // s_addr value (read as an unsigned) forms the low bits and sin_port is
+ // shifted up by 32 bits above it.
+ void set_addr(tcpaddr_t a) {
+   addr = a;
+
+   // figure out rank
+   const __uint64_t ip_bits = *((unsigned*)&a.sin_addr.s_addr);
+   const __uint64_t port_bits = (__uint64_t)a.sin_port << 32;
+   rank = ip_bits | port_bits;
+ }
};
inline bool operator==(const entity_inst_t& a, const entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; }
inline ostream& operator<<(ostream& out, const entity_inst_t &i)
{
- return out << "rank" << i.rank << "_" << i.addr;
+ //return out << "rank" << i.rank << "_" << i.addr;
+ return out << i.addr;
}
int get_source_port() { return env.source_port; }
entity_inst_t& get_source_inst() { return env.source_inst; }
- void set_source_inst(entity_inst_t &i) { env.source_inst = i; }
+ void set_source_inst(const entity_inst_t &i) { env.source_inst = i; }
// PAYLOAD ----
void reset_payload() {
}
virtual void print(ostream& out) {
- out << "message(type=" << get_type() << ")";
+ out << get_type_name();
}
};
private:
Dispatcher *dispatcher;
msg_addr_t _myaddr;
+ entity_inst_t _myinst;
public:
Messenger(msg_addr_t w) : dispatcher(0), _myaddr(w) { }
virtual ~Messenger() { }
- void set_myaddr(msg_addr_t m) { _myaddr = m; }
+ const entity_inst_t &get_myinst() { return _myinst; }
+ void set_myinst(entity_inst_t& v) { _myinst = v; }
+
msg_addr_t get_myaddr() { return _myaddr; }
+ void _set_myaddr(msg_addr_t m) { _myaddr = m; }
+
+ virtual void reset_myaddr(msg_addr_t m) = 0;
virtual int shutdown() = 0;
// send message
virtual void prepare_dest(const entity_inst_t& inst) {}
- virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
- virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst,
- int port=0, int fromport=0) {
- return send_message(m, dest, port, fromport); // overload me!
- }
+ //virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
+ virtual int send_message(Message *m, msg_addr_t dest, entity_inst_t inst,
+ int port=0, int fromport=0) = 0;
// make a procedure call
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "SimpleMessenger.h"
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "config.h"
+
+#include "messages/MGenericMessage.h"
+#include "messages/MNSConnect.h"
+#include "messages/MNSConnectAck.h"
+#include "messages/MNSRegister.h"
+#include "messages/MNSRegisterAck.h"
+#include "messages/MNSLookup.h"
+#include "messages/MNSLookupReply.h"
+#include "messages/MNSFailure.h"
+
+//#include "messages/MFailure.h"
+
+#include <netdb.h>
+
+
+#undef dout
+#define dout(l) if (l<=g_conf.debug_ms) cout << g_clock.now() << " -- " << rank.my_inst.addr << " "
+#define derr(l) if (l<=g_conf.debug_ms) cerr << g_clock.now() << " -- " << rank.my_inst.addr << " "
+
+
+
+#include "tcp.cc"
+
+
+Rank rank;
+
+
+
+/********************************************
+ * Accepter
+ */
+
+// Bind the listening socket to rank.listen_addr, start listening, replace
+// rank.listen_addr with this host's resolved address plus the bound port,
+// and launch the accepter thread.
+// Returns 0 on success, or -1 if the local hostname cannot be resolved to an
+// address that fits in sockaddr_in. Socket/bind/listen failures still assert.
+int Rank::Accepter::start()
+{
+  // bind to a socket
+  dout(10) << "accepter.start binding to listen " << endl;
+
+  /* socket creation */
+  listen_sd = socket(AF_INET,SOCK_STREAM,0);
+  assert(listen_sd >= 0);   // 0 is a valid descriptor; only negative is failure
+
+  /* bind to port */
+  int rc = bind(listen_sd, (struct sockaddr *) &rank.listen_addr, sizeof(rank.listen_addr));
+  if (rc < 0)
+    derr(0) << "accepter.start unable to bind to " << rank.listen_addr << endl;
+  assert(rc >= 0);
+
+  // read back what we actually bound to (listen_addr's port may have been 0)
+  socklen_t llen = sizeof(rank.listen_addr);
+  rc = getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen);
+  assert(rc >= 0);
+
+  int myport = rank.listen_addr.sin_port;
+
+  // listen!
+  rc = ::listen(listen_sd, 1000);
+  assert(rc >= 0);
+
+  //dout(10) << "accepter.start listening on " << myport << endl;
+
+  // my address is...
+  // The buffer is zeroed and one byte is held back so the name stays
+  // NUL-terminated even if gethostname() truncates it.
+  char host[100];
+  bzero(host, sizeof(host));
+  gethostname(host, sizeof(host) - 1);
+  //dout(10) << "accepter.start my hostname is " << host << endl;
+
+  struct sockaddr_in my_addr;
+  memset(&my_addr, 0, sizeof(my_addr));
+
+  // gethostbyname() returns NULL when the name does not resolve; refuse to
+  // continue rather than dereference it, and make sure the address we copy
+  // fits into s_addr.
+  struct hostent *myhostname = gethostbyname( host );
+  if (!myhostname ||
+      !myhostname->h_addr_list[0] ||
+      myhostname->h_length < 0 ||
+      (size_t)myhostname->h_length > sizeof(my_addr.sin_addr.s_addr)) {
+    derr(0) << "accepter.start unable to resolve my hostname '" << host << "'" << endl;
+    ::close(listen_sd);
+    return -1;
+  }
+
+  my_addr.sin_family = myhostname->h_addrtype;
+  memcpy((char *) &my_addr.sin_addr.s_addr,
+         myhostname->h_addr_list[0],
+         myhostname->h_length);
+  my_addr.sin_port = myport;
+
+  rank.listen_addr = my_addr;
+
+  dout(10) << "accepter.start listen addr is " << rank.listen_addr << endl;
+
+  // start thread
+  create();
+
+  return 0;
+}
+
+// Accepter thread body: accept incoming connections on listen_sd until done
+// is set or accept() fails, wrapping each new socket in a Pipe that is added
+// to rank.pipes under rank.lock. Always returns 0.
+void *Rank::Accepter::entry()
+{
+  dout(10) << "accepter starting" << endl;
+
+  while (!done) {
+    // accept
+    struct sockaddr_in addr;
+    socklen_t slen = sizeof(addr);
+    int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen);
+    if (sd >= 0) {   // any non-negative descriptor, including 0, is valid
+      dout(10) << "accepted incoming on sd " << sd << endl;
+
+      rank.lock.Lock();
+      Pipe *p = new Pipe(sd);
+      rank.pipes.insert(p);
+      rank.lock.Unlock();
+    } else if (errno == EINTR) {
+      // interrupted by a signal, not a real failure; re-check done and retry
+      continue;
+    } else {
+      dout(10) << "no incoming connection?" << endl;
+      break;
+    }
+  }
+
+  return 0;
+}
+
+
+
+/**************************************
+ * Pipe
+ */
+
+// Server-side handshake for a pipe whose socket (sd) was handed over by its
+// creator. Sends the raw bytes of rank.my_inst to the peer, reads the peer's
+// entity_inst_t into peer_inst, starts the writer thread, and registers this
+// pipe in rank.rank_pipe when the peer reports a non-negative rank.
+// Returns 0 on success; when a handshake transfer reports failure it closes
+// sd, sets done, and returns -1.
+int Rank::Pipe::accept()
+{
+ // my creator gave me sd via accept()
+
+ // announce myself.
+ int rc = tcp_write(sd, (char*)&rank.my_inst, sizeof(rank.my_inst));
+ if (rc < 0) {
+ ::close(sd);
+ done = true;
+ return -1;
+ }
+
+ // identify peer
+ // NOTE(review): read_message() treats a zero/false tcp_read() result as
+ // failure, while this checks rc < 0 -- confirm tcp_read's return convention
+ // (it is defined in tcp.cc, not visible here).
+ rc = tcp_read(sd, (char*)&peer_inst, sizeof(peer_inst));
+ if (rc < 0) {
+ dout(10) << "pipe(? " << this << ").accept couldn't read peer inst" << endl;
+ ::close(sd);
+ done = true;
+ return -1;
+ }
+
+ // create writer thread.
+ writer_running = true;
+ writer_thread.create();
+
+ // register pipe.
+ if (peer_inst.rank >= 0) {
+ rank.lock.Lock();
+ {
+ if (rank.rank_pipe.count(peer_inst.rank) == 0) {
+ // install a pipe!
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst << endl;
+ rank.rank_pipe[peer_inst.rank] = this;
+ } else {
+ // low ranks' Pipes "win": replace the existing pipe only when the peer's
+ // rank is lower than ours, or when our own rank is still negative.
+ if (peer_inst.rank < rank.my_inst.rank ||
+ rank.my_inst.rank < 0) {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst
+ << ", already had pipe, but switching to this new one" << endl;
+ // switch to this new Pipe
+ // NOTE(review): Pipe::close() itself takes rank.lock, which is already
+ // held here -- this relies on the Mutex being recursive; confirm.
+ rank.rank_pipe[peer_inst.rank]->close(); // close old one
+ rank.rank_pipe[peer_inst.rank] = this;
+ } else {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is " << peer_inst
+ << ", already had pipe, sticking with it" << endl;
+ }
+ }
+ }
+ rank.lock.Unlock();
+ } else {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").accept peer is unranked " << peer_inst << endl;
+ }
+
+ return 0; // success.
+}
+
+// Client-side setup for an outgoing pipe: create a socket, connect it to
+// peer_inst.addr, exchange entity_inst_t identities with the peer, register
+// this pipe in rank.rank_pipe if no pipe is registered for that rank yet,
+// and start the reader thread.
+// Returns 0 on success, the negative ::connect() result if the connection
+// fails, or -1 if our identity cannot be sent.
+int Rank::Pipe::connect()
+{
+  dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect" << endl;
+
+  // create socket?
+  sd = socket(AF_INET,SOCK_STREAM,0);
+  assert(sd >= 0);   // 0 is a valid descriptor; only negative is failure
+
+  // bind any port
+  // Zero the whole structure first so padding (and any platform-specific
+  // extra fields) handed to bind() is not uninitialised stack data.
+  struct sockaddr_in myAddr;
+  memset(&myAddr, 0, sizeof(myAddr));
+  myAddr.sin_family = AF_INET;
+  myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+  myAddr.sin_port = htons( 0 );
+
+  int rc = bind(sd, (struct sockaddr *) &myAddr, sizeof(myAddr));
+  assert(rc>=0);
+
+  // connect!
+  // the length argument describes the address actually being passed
+  rc = ::connect(sd, (sockaddr*)&peer_inst.addr, sizeof(peer_inst.addr));
+  if (rc < 0) return rc;
+
+  // identify peer
+  // inst starts out default-constructed, so if nothing is read its rank
+  // stays negative and we fall back to what we already believe (peer_inst).
+  // NOTE(review): the tcp_read() result itself is not examined here.
+  entity_inst_t inst;
+  rc = tcp_read(sd, (char*)&inst, sizeof(inst));
+  if (inst.rank < 0)
+    inst = peer_inst;      // i know better than they do.
+  if (peer_inst != inst && inst.rank > 0) {
+    derr(0) << "pipe(" << peer_inst << ' ' << this << ").connect peer is " << inst << ", wtf" << endl;
+    assert(0);
+    return -1;
+  }
+
+  // identify myself
+  rc = tcp_write(sd, (char*)&rank.my_inst, sizeof(rank.my_inst));
+  if (rc < 0)
+    return -1;
+
+  // register pipe
+  rank.lock.Lock();
+  {
+    if (rank.rank_pipe.count(peer_inst.rank) == 0) {
+      dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect registering pipe" << endl;
+      rank.rank_pipe[peer_inst.rank] = this;
+    } else {
+      // this is normal.
+      dout(10) << "pipe(" << peer_inst << ' ' << this << ").connect pipe already registered." << endl;
+    }
+  }
+  rank.lock.Unlock();
+
+  // start reader
+  reader_running = true;
+  reader_thread.create();
+
+  return 0;
+}
+
+
+// Begin an orderly shutdown of this pipe: remove it from rank.rank_pipe (if
+// it is the pipe registered for peer_inst.rank) and, unless a socket error
+// has been recorded, queue a MSG_CLOSE message and wake the writer.
+// Does nothing if a close has already been queued (sent_close).
+void Rank::Pipe::close()
+{
+ // NOTE(review): sent_close is tested here without holding the pipe lock,
+ // but is set under it below -- confirm callers cannot race on this.
+ if (sent_close) {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").close already closing" << endl;
+ return;
+ }
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").close" << endl;
+
+ // unreg ourselves
+ rank.lock.Lock();
+ {
+ // only erase the entry if it still points at this pipe
+ if (rank.rank_pipe.count(peer_inst.rank) &&
+ rank.rank_pipe[peer_inst.rank] == this) {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").close unregistering pipe" << endl;
+ rank.rank_pipe.erase(peer_inst.rank);
+ }
+ }
+ rank.lock.Unlock();
+
+ // queue close message.
+ if (socket_error) {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").close not queueing MSG_CLOSE, socket error" << endl;
+ } else {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").close queueing MSG_CLOSE" << endl;
+ lock.Lock();
+ q.push_back(new MGenericMessage(MSG_CLOSE));
+ cond.Signal();
+ sent_close = true;
+ lock.Unlock();
+ }
+}
+
+
+/* read msgs from socket.
+ * also, server.
+ *
+ * Reader thread body. For a server-side pipe it first runs the accept()
+ * handshake. It then reads messages until done is set, handing each one to
+ * the single dispatch queue or to the destination entity's own queue.
+ * A null message or a type-0 message ends the loop. On exit, if the writer
+ * has already stopped, it closes sd and queues this pipe for reaping.
+ */
+void Rank::Pipe::reader()
+{
+ // the return value of accept() is not examined; on failure accept() sets
+ // done, so the loop below is skipped.
+ if (server)
+ accept();
+
+ // loop.
+ while (!done) {
+ Message *m = read_message();
+ // NOTE(review): type 0 is reported as MSG_CLOSE in the log line below --
+ // presumably MSG_CLOSE == 0; confirm against the message type definitions.
+ if (!m || m->get_type() == 0) {
+ if (m) {
+ delete m;
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").reader read MSG_CLOSE message" << endl;
+ } else {
+ derr(10) << "pipe(" << peer_inst << ' ' << this << ").reader read null message" << endl;
+ }
+
+ if (!sent_close)
+ close();
+
+ done = true;
+ cond.Signal(); // wake up writer too.
+ break;
+ }
+
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").reader got message for " << m->get_dest() << endl;
+
+ EntityMessenger *entity = 0;
+
+ rank.lock.Lock();
+ {
+ // sanity check: the instance recorded for this source in entity_map must
+ // not compare greater than the instance stamped on the message.
+ if (rank.entity_map.count(m->get_source()) &&
+ rank.entity_map[m->get_source()] > m->get_source_inst()) {
+ derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader source " << m->get_source()
+ << " inst " << m->get_source_inst()
+ << " > " << rank.entity_map[m->get_source()]
+ << ", WATCH OUT " << *m << endl;
+ assert(0);
+ }
+
+ if (g_conf.ms_single_dispatch) {
+ // submit to single dispatch queue
+ // (entity stays null in this mode, so nothing is queued after unlock)
+ rank._submit_single_dispatch(m);
+ } else {
+ if (rank.local.count(m->get_dest())) {
+ // find entity
+ entity = rank.local[m->get_dest()];
+ } else {
+ // no exact match: fall back to an unnamed local entity of the same type
+ entity = rank.find_unnamed(m->get_dest());
+ if (!entity) {
+ derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << endl;
+ assert(0); // FIXME do this differently
+ }
+ }
+ }
+ }
+ rank.lock.Unlock();
+
+ // queue outside rank.lock
+ if (entity)
+ entity->queue_message(m); // queue
+ }
+
+
+ // reap?
+ // whichever of reader/writer finishes last queues the pipe for reaping
+ bool reap = false;
+ lock.Lock();
+ {
+ reader_running = false;
+ if (!writer_running) reap = true;
+ }
+ lock.Unlock();
+
+ if (reap) {
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader queueing for reap" << endl;
+ ::close(sd);
+ rank.lock.Lock();
+ {
+ rank.pipe_reap_queue.push_back(this);
+ rank.wait_cond.Signal();
+ }
+ rank.lock.Unlock();
+ }
+}
+
+
+/* write msgs to socket.
+ * also, client.
+ *
+ * Writer thread body. For a client-side pipe it first runs connect(); if
+ * that fails it sets done and fails the queued messages. It then sends the
+ * messages queued on q until done is set and q is empty, sleeping on cond
+ * when there is nothing to send. On exit, if the reader has already stopped,
+ * it closes sd and queues this pipe for reaping.
+ */
+void Rank::Pipe::writer()
+{
+ if (!server) {
+ int rc = connect();
+ if (rc < 0) {
+ derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error connecting" << endl;
+ done = true;
+ // nothing is in flight yet; fail() also drains whatever is already on q
+ list<Message*> out;
+ fail(out);
+ }
+ }
+
+ // loop.
+ lock.Lock();
+ while (!q.empty() || !done) {
+
+ if (!q.empty()) {
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer grabbing message(s)" << endl;
+
+ // grab outgoing list
+ list<Message*> out;
+ out.swap(q);
+
+ // drop lock while i send these
+ lock.Unlock();
+
+ while (!out.empty()) {
+ Message *m = out.front();
+ out.pop_front();
+
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sending " << *m << endl;
+
+ // stamp.
+ m->set_source_inst(rank.my_inst);
+
+ // marshall
+ if (m->empty_payload())
+ m->encode_payload();
+
+ if (write_message(m) < 0) {
+ // failed!
+ derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest() << endl;
+ // put the failed message back in front so fail() reports it together
+ // with the ones not yet attempted
+ out.push_front(m);
+ fail(out);
+ done = true;
+ break;
+ }
+
+ // did i just send a close?
+ // (done is set, but the rest of this batch is still sent)
+ if (m->get_type() == MSG_CLOSE)
+ done = true;
+
+ // clean up
+ delete m;
+ }
+
+ lock.Lock();
+ continue;
+ }
+
+ // wait
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sleeping" << endl;
+ cond.Wait(lock);
+ }
+ lock.Unlock();
+
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer finishing" << endl;
+
+ // reap?
+ // whichever of reader/writer finishes last queues the pipe for reaping
+ bool reap = false;
+ lock.Lock();
+ {
+ writer_running = false;
+ if (!reader_running) reap = true;
+ }
+ lock.Unlock();
+
+ if (reap) {
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer queueing for reap" << endl;
+ ::close(sd);
+ rank.lock.Lock();
+ {
+ rank.pipe_reap_queue.push_back(this);
+ rank.wait_cond.Signal();
+ }
+ rank.lock.Unlock();
+ }
+}
+
+
+// Read one message from sd: a msg_envelope_t followed by env.nchunks payload
+// chunks, each preceded by an int length.
+// Returns the decoded Message, or 0 if a read fails, a chunk length is
+// negative (both set socket_error), or the message cannot be decoded.
+Message *Rank::Pipe::read_message()
+{
+  // envelope
+  //dout(10) << "receiver.read_message from sd " << sd << endl;
+
+  msg_envelope_t env;
+  if (!tcp_read( sd, (char*)&env, sizeof(env) )) {
+    socket_error = true;
+    return 0;
+  }
+
+  dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader got envelope type=" << env.type
+           << " src " << env.source << " dst " << env.dest
+           << " nchunks=" << env.nchunks
+           << endl;
+
+  // payload
+  bufferlist blist;
+  for (int i=0; i<env.nchunks; i++) {
+    int size;
+    if (!tcp_read( sd, (char*)&size, sizeof(size) )) {
+      socket_error = true;
+      return 0;
+    }
+
+    if (size == 0) continue;
+
+    // The length comes straight off the wire. A negative value would be
+    // turned into an enormous allocation and read, so treat the stream as
+    // broken instead.
+    if (size < 0) {
+      derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader got bad frag len " << size
+              << " for frag " << i << " of " << env.nchunks << endl;
+      socket_error = true;
+      return 0;
+    }
+
+    bufferptr bp(size);
+
+    if (!tcp_read( sd, bp.c_str(), size )) {
+      socket_error = true;
+      return 0;
+    }
+
+    blist.push_back(bp);
+
+    dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader got frag " << i << " of " << env.nchunks
+             << " len " << bp.length() << endl;
+  }
+
+  // unmarshall message
+  size_t s = blist.length();
+  Message *m = decode_message(env, blist);
+
+  // decode_message() is defined elsewhere; guard against a null result so the
+  // debug line below cannot dereference it. The reader() loop already
+  // handles a null return from this function.
+  if (!m) {
+    derr(0) << "pipe(" << peer_inst << ' ' << this << ").reader unable to decode " << s
+            << " byte message, type=" << env.type << endl;
+    return 0;
+  }
+
+  dout(20) << "pipe(" << peer_inst << ' ' << this << ").reader got " << s << " byte message from "
+           << m->get_source() << endl;
+
+  return m;
+}
+
+
+
+// Send one message over sd: the raw envelope, then the payload.
+// With TCP_KEEP_CHUNKS defined each buffer is sent as its own
+// length-prefixed chunk; otherwise a single total length is sent, followed
+// by the bytes of every non-empty buffer.
+// Returns 0 on success; on any tcp_write failure sets socket_error and
+// returns -1.
+int Rank::Pipe::write_message(Message *m)
+{
+ // get envelope, buffers
+ msg_envelope_t *env = &m->get_envelope();
+ // the payload is claimed out of the message into a local bufferlist
+ bufferlist blist;
+ blist.claim( m->get_payload() );
+
+#ifdef TCP_KEEP_CHUNKS
+ env->nchunks = blist.buffers().size();
+#else
+ env->nchunks = 1;
+#endif
+
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer sending " << m << " " << *m
+ << " to " << m->get_dest()
+ << endl;
+
+ // send envelope
+ int r = tcp_write( sd, (char*)env, sizeof(*env) );
+ if (r < 0) {
+ derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error sending envelope for " << *m
+ << " to " << m->get_dest() << endl;
+ socket_error = true;
+ return -1;
+ }
+
+ // payload
+#ifdef TCP_KEEP_CHUNKS
+ // send chunk-wise
+ int i = 0;
+ for (list<bufferptr>::const_iterator it = blist.buffers().begin();
+ it != blist.buffers().end();
+ it++) {
+ dout(10) << "pipe(" << peer_inst << ' ' << this << ").writer tcp_sending frag " << i << " len " << (*it).length() << endl;
+ // each chunk goes out as an int length followed by that many bytes
+ int size = (*it).length();
+ r = tcp_write( sd, (char*)&size, sizeof(size) );
+ if (r < 0) {
+ derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending chunk len for " << *m << " to " << m->get_dest() << endl;
+ socket_error = true;
+ return -1;
+ }
+ r = tcp_write( sd, (*it).c_str(), size );
+ if (r < 0) {
+ derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data chunk for " << *m << " to " << m->get_dest() << endl;
+ socket_error = true;
+ return -1;
+ }
+ i++;
+ }
+#else
+ // one big chunk
+ // a single length covering the whole payload is sent first; the buffers
+ // then follow back to back, matching nchunks = 1 set above
+ int size = blist.length();
+ r = tcp_write( sd, (char*)&size, sizeof(size) );
+ if (r < 0) {
+ derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data len for " << *m << " to " << m->get_dest() << endl;
+ socket_error = true;
+ return -1;
+ }
+ dout(20) << "pipe(" << peer_inst << ' ' << this << ").writer data len is " << size << " in " << blist.buffers().size() << " buffers" << endl;
+
+ for (list<bufferptr>::const_iterator it = blist.buffers().begin();
+ it != blist.buffers().end();
+ it++) {
+ if ((*it).length() == 0) continue; // blank buffer.
+ r = tcp_write( sd, (char*)(*it).c_str(), (*it).length() );
+ if (r < 0) {
+ derr(10) << "pipe(" << peer_inst << ' ' << this << ").writer error sending data megachunk for " << *m << " to " << m->get_dest() << " : len " << (*it).length() << endl;
+ socket_error = true;
+ return -1;
+ }
+ }
+#endif
+
+ return 0;
+}
+
+
+// Handle a failed pipe: unregister it from rank.rank_pipe, drain the
+// outgoing queue (with the messages in out spliced onto its front), and
+// report every undelivered message to the dispatcher of the local entity
+// that sent it via ms_handle_failure(). MSG_CLOSE messages and messages
+// whose sending entity is stopped are deleted instead of reported.
+// out is left empty on return.
+void Rank::Pipe::fail(list<Message*>& out)
+{
+ derr(10) << "pipe(" << peer_inst << ' ' << this << ").fail" << endl;
+
+ // FIXME: possible race before i reclaim lock here?
+
+ // deactivate myself
+ rank.lock.Lock();
+ {
+ // only erase the entry if it still points at this pipe
+ if (rank.rank_pipe.count(peer_inst.rank) &&
+ rank.rank_pipe[peer_inst.rank] == this)
+ rank.rank_pipe.erase(peer_inst.rank);
+ }
+ rank.lock.Unlock();
+
+ // what do i do about reader()? FIXME
+
+ // sort my messages by (source) dispatcher, dest.
+ map<Dispatcher*, map<msg_addr_t, list<Message*> > > by_dis;
+ lock.Lock();
+ {
+ // include out at front of queue
+ q.splice(q.begin(), out);
+
+ // sort
+ // NOTE(review): rank.local is read here while holding only the pipe lock,
+ // not rank.lock -- confirm this is safe.
+ while (!q.empty()) {
+ if (q.front()->get_type() == MSG_CLOSE) {
+ delete q.front();
+ }
+ else if (rank.local.count(q.front()->get_source())) {
+ EntityMessenger *mgr = rank.local[q.front()->get_source()];
+ Dispatcher *dis = mgr->get_dispatcher();
+ if (mgr->is_stopped()) {
+ // ignore.
+ dout(1) << "pipe(" << peer_inst << ' ' << this << ").fail on " << *q.front() << ", dispatcher stopping, ignoring." << endl;
+ delete q.front();
+ } else {
+ by_dis[dis][q.front()->get_dest()].push_back(q.front());
+ }
+ }
+ else {
+ // oh well. sending entity musta just shut down?
+ // (asserts anyway; the delete below only runs when asserts are compiled out)
+ assert(0);
+ delete q.front();
+ }
+ q.pop_front();
+ }
+ }
+ lock.Unlock();
+
+ // report failure(s) to dispatcher(s)
+ // done after dropping the pipe lock
+ for (map<Dispatcher*, map<msg_addr_t, list<Message*> > >::iterator i = by_dis.begin();
+ i != by_dis.end();
+ ++i)
+ for (map<msg_addr_t, list<Message*> >::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j)
+ for (list<Message*>::iterator k = j->second.begin();
+ k != j->second.end();
+ ++k) {
+ derr(1) << "pipe(" << peer_inst << ' ' << this << ").fail on " << **k << " to " << j->first << " inst " << peer_inst << endl;
+ i->first->ms_handle_failure(*k, j->first, peer_inst);
+ }
+}
+
+
+
+
+
+
+/********************************************
+ * Rank
+ */
+
+// Construct the rank with its single-dispatch thread object bound to it and
+// a default listen address of AF_INET / INADDR_ANY / port 0.
+Rank::Rank()
+  : single_dispatcher(this)
+{
+  // default to any listen_addr
+  memset((char*)&listen_addr, 0, sizeof(listen_addr));
+  listen_addr.sin_port = 0;
+  listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+  listen_addr.sin_family = AF_INET;
+}
+
+// Nothing to release here.
+Rank::~Rank() {}
+
+// Override the address and port the accepter will bind to; the remaining
+// fields of listen_addr are left as they were.
+void Rank::set_listen_addr(tcpaddr_t& a)
+{
+  dout(10) << "set_listen_addr " << a << endl;
+
+  listen_addr.sin_port = a.sin_port;
+  // copy the 4 address bytes
+  memcpy((char*)&listen_addr.sin_addr.s_addr,
+         (char*)&a.sin_addr.s_addr,
+         4);
+}
+
+
+// Hand m to the single dispatch thread if its destination is a local entity
+// that is ready; otherwise park it on waiting_for_ready for that
+// destination. The caller must hold lock.
+void Rank::_submit_single_dispatch(Message *m)
+{
+  assert(lock.is_locked());
+
+  const bool dest_ready =
+    local.count(m->get_dest()) &&
+    local[m->get_dest()]->is_ready();
+
+  if (!dest_ready) {
+    waiting_for_ready[m->get_dest()].push_back(m);
+    return;
+  }
+
+  rank.single_dispatch_queue.push_back(m);
+  rank.single_dispatch_cond.Signal();
+}
+
+
+// Body of the single dispatch thread: repeatedly take everything queued on
+// single_dispatch_queue and deliver each message to its destination entity,
+// sleeping on single_dispatch_cond when the queue is empty. Exits once
+// single_dispatch_stop is set and the queue has drained.
+void Rank::single_dispatcher_entry()
+{
+  lock.Lock();
+  while (!single_dispatch_stop || !single_dispatch_queue.empty()) {
+    if (!single_dispatch_queue.empty()) {
+      // take the whole batch so dispatching can happen without the lock
+      list<Message*> ls;
+      ls.swap(single_dispatch_queue);
+
+      lock.Unlock();
+      {
+        while (!ls.empty()) {
+          Message *m = ls.front();
+          ls.pop_front();
+
+          dout(1) << m->get_dest()
+                  << " <-- " << m->get_source() << " " << m->get_source_inst()
+                  << " ---- " << *m
+                  << " -- " << m
+                  << endl;
+
+          // register_entity()/unregister_entity() modify `local` under lock
+          // from other threads, so the lookup must hold it too; only the
+          // dispatch itself runs unlocked.
+          EntityMessenger *entity = 0;
+          lock.Lock();
+          if (local.count(m->get_dest()))
+            entity = local[m->get_dest()];
+          lock.Unlock();
+
+          assert(entity);
+          entity->dispatch(m);
+        }
+      }
+      lock.Lock();
+      continue;
+    }
+    single_dispatch_cond.Wait(lock);
+  }
+  lock.Unlock();
+}
+
+
+/*
+ * Destroy every pipe waiting on pipe_reap_queue: drop it from pipes, join
+ * it, and delete it.
+ * note: assumes lock is held
+ */
+void Rank::reaper()
+{
+  dout(10) << "reaper" << endl;
+  assert(lock.is_locked());
+
+  for (;;) {
+    if (pipe_reap_queue.empty())
+      break;
+
+    Pipe *dead = pipe_reap_queue.front();
+    dout(10) << "reaper reaping pipe " << dead->get_peer_inst() << endl;
+    pipe_reap_queue.pop_front();
+
+    assert(pipes.count(dead));
+    pipes.erase(dead);
+
+    dead->join();
+    dout(10) << "reaper reaped pipe " << dead->get_peer_inst() << endl;
+    delete dead;
+  }
+}
+
+
+// Bring this rank up: start the accepter, start the single dispatch thread
+// when g_conf.ms_single_dispatch is set, then derive my_inst from
+// listen_addr. Returns 0, or -1 if the accepter fails to start.
+int Rank::start_rank()
+{
+  dout(10) << "start_rank" << endl;
+
+  // bind to a socket
+  const int accepter_rc = accepter.start();
+  if (accepter_rc < 0) {
+    return -1;
+  }
+
+  // start single thread dispatcher?
+  if (g_conf.ms_single_dispatch) {
+    single_dispatch_stop = false;
+    single_dispatcher.create();
+  }
+
+  // my_inst
+  lock.Lock();
+  {
+    my_inst.set_addr( listen_addr );
+    dout(1) << "start_rank at " << my_inst << endl;
+  }
+  lock.Unlock();
+
+  return 0;
+}
+
+
+
+/* connect_rank
+ * Create a Pipe to the given remote instance, record it in pipes and in
+ * rank_pipe under inst.rank, and return it.
+ * NOTE: assumes rank.lock held.
+ */
+Rank::Pipe *Rank::connect_rank(const entity_inst_t& inst)
+{
+  assert(rank.lock.is_locked());
+  assert(inst != rank.my_inst);
+
+  dout(10) << "connect_rank to " << inst << endl;
+
+  // create pipe
+  Pipe *fresh = new Pipe(inst);
+  pipes.insert(fresh);
+  rank.rank_pipe[inst.rank] = fresh;
+  return fresh;
+}
+
+
+
+
+
+// Debug dump of entity_map at level 10: one line per entry, with local
+// entities flagged.
+void Rank::show_dir()
+{
+  dout(10) << "show_dir ---" << endl;
+
+  hash_map<msg_addr_t, entity_inst_t>::iterator entry = entity_map.begin();
+  while (entry != entity_map.end()) {
+    const bool is_local = local.count(entry->first) != 0;
+    dout(10) << "show_dir entity_map " << entry->first << " -> " << entry->second
+             << (is_local ? " local " : "") << endl;
+    entry++;
+  }
+}
+
+// Return the first local entity whose address has the same type as a and is
+// still marked new, or 0 if there is none.
+Rank::EntityMessenger *Rank::find_unnamed(msg_addr_t a)
+{
+  // find an unnamed local entity of the right type
+  map<msg_addr_t, EntityMessenger*>::iterator candidate = local.begin();
+  while (candidate != local.end()) {
+    const bool same_type = candidate->first.type() == a.type();
+    if (same_type && candidate->first.is_new())
+      return candidate->second;
+    ++candidate;
+  }
+  return 0;
+}
+
+
+
+
+/* register_entity
+ * Create an EntityMessenger for addr and record it, under lock, in both the
+ * local map and entity_map (mapped to my_inst). Returns the new messenger.
+ */
+Rank::EntityMessenger *Rank::register_entity(msg_addr_t addr)
+{
+  dout(10) << "register_entity " << addr << endl;
+
+  lock.Lock();
+
+  // create messenger
+  EntityMessenger *created = new EntityMessenger(addr);
+
+  // add to directory
+  local[addr] = created;
+  entity_map[addr] = my_inst;
+
+  lock.Unlock();
+
+  return created;
+}
+
+
+// Remove msgr's address from the local map and from entity_map, then signal
+// wait_cond so a thread blocked in wait() re-checks its exit condition.
+void Rank::unregister_entity(EntityMessenger *msgr)
+{
+  lock.Lock();
+  {
+    dout(10) << "unregister_entity " << msgr->get_myaddr() << endl;
+
+    // remove from local directory.
+    const msg_addr_t who = msgr->get_myaddr();
+
+    assert(local.count(who));
+    local.erase(who);
+
+    assert(entity_map.count(who));
+    entity_map.erase(who);
+
+    wait_cond.Signal();
+  }
+  lock.Unlock();
+}
+
+
+// Route m towards dest_inst. If dest_inst has our own rank the message goes
+// to the local entity named by m->get_dest() (via the single dispatch queue
+// when g_conf.ms_single_dispatch is set, otherwise onto that entity's own
+// queue). Otherwise it is handed to the pipe for dest_inst.rank, creating
+// that pipe first if none is registered.
+void Rank::submit_message(Message *m, const entity_inst_t& dest_inst)
+{
+ const msg_addr_t dest = m->get_dest();
+
+ // lookup
+ // at most one of these is set below; both stay null when the message was
+ // handed to the single dispatch queue
+ EntityMessenger *entity = 0;
+ Pipe *pipe = 0;
+
+ lock.Lock();
+ {
+ // local?
+ if (dest_inst.rank == my_inst.rank) {
+ if (local.count(dest)) {
+ // local
+ dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl;
+ if (g_conf.ms_single_dispatch) {
+ _submit_single_dispatch(m);
+ } else {
+ entity = local[dest];
+ }
+ } else {
+ derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_inst << " local but not in local map?" << endl;
+ assert(0); // hmpf
+ }
+ }
+ else {
+ // remote.
+ if (rank_pipe.count( dest_inst.rank )) {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", already connected." << endl;
+ // connected.
+ pipe = rank_pipe[ dest_inst.rank ];
+ } else {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connecting." << endl;
+ // not connected.
+ pipe = connect_rank( dest_inst );
+ }
+ }
+ }
+ lock.Unlock();
+
+ // do it
+ // the actual queueing/sending happens after lock has been released
+ // NOTE(review): pipe is used after lock is dropped -- confirm it cannot be
+ // reaped in between.
+ if (entity) {
+ // local!
+ dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl;
+ entity->queue_message(m);
+ }
+ else if (pipe) {
+ // remote!
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl;
+ pipe->send(m);
+ }
+}
+
+
+
+
+
+/* wait
+ * Block until every local entity has unregistered, then tear down:
+ *  1. sleep on wait_cond (reaping pipes each pass) until local is empty;
+ *  2. if g_conf.ms_single_dispatch, flag and join the single dispatcher;
+ *  3. close every pipe listed in rank_pipe and wait until the pipes set
+ *     has drained.
+ */
+void Rank::wait()
+{
+  lock.Lock();
+  for (;;) {
+    // reap dead pipes
+    reaper();
+
+    if (local.empty())
+      break;  // everything stopped.
+
+    wait_cond.Wait(lock);
+  }
+  dout(10) << "wait: everything stopped" << endl;
+  lock.Unlock();
+
+  // done!  clean up.
+
+  // stop dispatch thread
+  if (g_conf.ms_single_dispatch) {
+    dout(10) << "wait: stopping dispatch thread" << endl;
+    lock.Lock();
+    single_dispatch_stop = true;
+    single_dispatch_cond.Signal();
+    lock.Unlock();
+    single_dispatcher.join();
+  }
+
+  // reap pipes
+  lock.Lock();
+  {
+    dout(10) << "wait: closing pipes" << endl;
+
+    // snapshot the pipes first, then close them from the snapshot
+    list<Pipe*> closing;
+    hash_map<__int64_t,Pipe*>::iterator rp = rank_pipe.begin();
+    while (rp != rank_pipe.end()) {
+      closing.push_back(rp->second);
+      rp++;
+    }
+    while (!closing.empty()) {
+      Pipe *victim = closing.front();
+      closing.pop_front();
+      victim->close();
+    }
+
+    dout(10) << "wait: waiting for pipes " << pipes << " to close" << endl;
+    while (!pipes.empty()) {
+      wait_cond.Wait(lock);
+      reaper();
+    }
+  }
+  lock.Unlock();
+
+  dout(10) << "wait: done." << endl;
+}
+
+
+
+
+
+
+/**********************************
+ * EntityMessenger
+ */
+
+/* EntityMessenger
+ * Initialise the Messenger base with myaddr, clear the stop flag, bind
+ * the dispatch thread object to this messenger (the thread itself is
+ * created later, in ready()), and adopt rank.my_inst as our instance.
+ */
+Rank::EntityMessenger::EntityMessenger(msg_addr_t myaddr)
+  : Messenger(myaddr),
+    stop(false),
+    dispatch_thread(this)
+{
+  set_myinst(rank.my_inst);
+}
+/* ~EntityMessenger
+ * Empty body.
+ */
+Rank::EntityMessenger::~EntityMessenger() {}
+
+/* dispatch_entry
+ * Body of this messenger's dispatch thread.  Until the stop flag is set:
+ * sleep on cond while dispatch_queue is empty; otherwise take the whole
+ * queue in one swap, drop the lock, and dispatch() each message in order.
+ * The stop flag is only re-checked after a complete batch is delivered.
+ */
+void Rank::EntityMessenger::dispatch_entry()
+{
+  lock.Lock();
+  while (!stop) {
+    if (dispatch_queue.empty()) {
+      // nothing to do; wait for queue_message()/shutdown() to signal
+      cond.Wait(lock);
+      continue;
+    }
+
+    // grab everything that is queued, then deliver without the lock
+    list<Message*> batch;
+    batch.swap(dispatch_queue);
+    lock.Unlock();
+
+    // deliver
+    for ( ; !batch.empty(); ) {
+      Message *msg = batch.front();
+      batch.pop_front();
+      dout(1) << msg->get_dest()
+              << " <-- " << msg->get_source() << " " << msg->get_source_inst()
+              << " ---- " << *msg
+              << " -- " << msg
+              << endl;
+      dispatch(msg);
+    }
+
+    lock.Lock();
+  }
+  lock.Unlock();
+}
+
+/* ready
+ * Start delivery for this messenger.
+ *  - per-entity dispatch: create the dispatch thread.
+ *  - g_conf.ms_single_dispatch: under the rank lock, move any messages
+ *    parked for our address in rank.waiting_for_ready onto
+ *    rank.single_dispatch_queue and signal the single dispatcher.
+ */
+void Rank::EntityMessenger::ready()
+{
+  dout(10) << "ready " << get_myaddr() << endl;
+
+  if (!g_conf.ms_single_dispatch) {
+    // start my dispatch thread
+    dispatch_thread.create();
+    return;
+  }
+
+  rank.lock.Lock();
+  map<msg_addr_t, list<Message*> >::iterator parked =
+    rank.waiting_for_ready.find(get_myaddr());
+  if (parked != rank.waiting_for_ready.end()) {
+    rank.single_dispatch_queue.splice(rank.single_dispatch_queue.end(),
+                                      parked->second);
+    rank.waiting_for_ready.erase(parked);
+    rank.single_dispatch_cond.Signal();
+  }
+  rank.lock.Unlock();
+}
+
+
+/* shutdown
+ * Unregister this messenger from the rank and stop its dispatch thread.
+ * When called from the dispatch thread itself only the stop flag is set;
+ * otherwise the flag is set under the lock, cond is signalled and the
+ * thread is joined.  Always returns 0.
+ * NOTE(review): join() is reached even when g_conf.ms_single_dispatch is
+ * set, in which case ready() never created the thread -- confirm that
+ * Thread::join() tolerates this.
+ */
+int Rank::EntityMessenger::shutdown()
+{
+  dout(10) << "shutdown " << get_myaddr() << endl;
+
+  // deregister
+  rank.unregister_entity(this);
+
+  // stop my dispatch thread
+  if (!dispatch_thread.am_self()) {
+    dout(1) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl;
+    lock.Lock();
+    stop = true;
+    cond.Signal();
+    lock.Unlock();
+    dispatch_thread.join();
+  } else {
+    dout(1) << "shutdown i am dispatch, setting stop flag" << endl;
+    stop = true;
+  }
+
+  return 0;
+}
+
+
+/* prepare_dest
+ * Under the rank lock, make sure a pipe exists for inst.rank, creating
+ * one with connect_rank() if rank_pipe has no entry for it.
+ */
+void Rank::EntityMessenger::prepare_dest(const entity_inst_t& inst)
+{
+  rank.lock.Lock();
+  const bool have_pipe = (rank.rank_pipe.count(inst.rank) != 0);
+  if (!have_pipe) {
+    rank.connect_rank(inst);
+  }
+  rank.lock.Unlock();
+}
+
+/* send_message
+ * Stamp m with its envelope (source = my address/fromport, destination =
+ * dest/port, source instance = rank.my_inst), log it at level 1, and hand
+ * it to rank.submit_message() for delivery to inst.  Always returns 0.
+ */
+int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, entity_inst_t inst,
+                                        int port, int fromport)
+{
+  // set envelope
+  m->set_source(get_myaddr(), fromport);
+  m->set_dest(dest, port);
+  m->set_source_inst(rank.my_inst);
+
+  dout(1) << m->get_source() << " --> " << m->get_dest() << " " << inst
+          << " -- " << *m << " -- " << m << endl;
+
+  rank.submit_message(m, inst);
+  return 0;
+}
+
+
+/* reset_myaddr
+ * Re-register this messenger under newaddr: drop the old address from
+ * rank.entity_map and rank.local, insert the new address (mapped to
+ * rank.my_inst and to this messenger respectively), then update our own
+ * address via _set_myaddr().
+ * NOTE(review): rank.lock is not taken here, although register_entity()
+ * and unregister_entity() take it around the same two maps -- confirm
+ * that callers serialise this call.
+ */
+void Rank::EntityMessenger::reset_myaddr(msg_addr_t newaddr)
+{
+  msg_addr_t oldaddr = get_myaddr();
+  // log under this function's own name, like every other method in this file
+  dout(10) << "reset_myaddr " << oldaddr << " to " << newaddr << endl;
+
+  rank.entity_map.erase(oldaddr);
+  rank.local.erase(oldaddr);
+  rank.entity_map[newaddr] = rank.my_inst;
+  rank.local[newaddr] = this;
+
+  _set_myaddr(newaddr);
+}
+
+
+
+
+/* mark_down
+ * Forward to Rank::mark_down(); an entity may not mark its own address
+ * down (asserted).
+ */
+void Rank::EntityMessenger::mark_down(msg_addr_t a, entity_inst_t& i)
+{
+  assert(get_myaddr() != a);
+  rank.mark_down(a, i);
+}
+
+/* mark_down
+ * Under the rank lock, forget address a if the instance we have recorded
+ * for it is not newer than inst:
+ *  - a unknown: only logged.
+ *  - recorded instance > inst: only logged, nothing is changed.
+ *  - otherwise: a is erased from entity_map and, if rank_pipe has a pipe
+ *    for inst.rank, that pipe is closed and dropped from rank_pipe.
+ * Every case is logged through both dout and derr.
+ */
+void Rank::mark_down(msg_addr_t a, entity_inst_t& inst)
+{
+  //if (my_rank == 0) return;   // ugh.. rank0 already handles this stuff in the namer
+  lock.Lock();
+  if (entity_map.count(a) == 0) {
+    // don't know it
+    dout(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << endl;
+    derr(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << endl;
+  } else if (entity_map[a] > inst) {
+    dout(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << endl;
+    derr(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << endl;
+    // do nothing!
+  } else {
+    // know it
+    assert(entity_map[a] <= inst);
+    dout(10) << "mark_down " << a << " inst " << inst << endl;
+    derr(10) << "mark_down " << a << " inst " << inst << endl;
+
+    entity_map.erase(a);
+
+    const bool connected = (rank_pipe.count(inst.rank) != 0);
+    if (connected) {
+      rank_pipe[inst.rank]->close();
+      rank_pipe.erase(inst.rank);
+    }
+  }
+  lock.Unlock();
+}
+
+/* mark_up
+ * Forward to Rank::mark_up(); an entity may not mark its own address up
+ * (asserted).
+ */
+void Rank::EntityMessenger::mark_up(msg_addr_t a, entity_inst_t& i)
+{
+  assert(get_myaddr() != a);
+  rank.mark_up(a, i);
+}
+
+/* mark_up
+ * Under the rank lock, record that address a lives at instance i:
+ *  - a unknown, or recorded instance < i: entity_map[a] is set to i and a
+ *    pipe to i's rank is opened if there is none yet.
+ *  - recorded instance == i: only logged.
+ *  - recorded instance > i: only logged, the newer entry is kept.
+ * Every case is logged through both dout and derr.
+ * NOTE(review): connect_rank() asserts i != my_inst -- confirm callers
+ * never mark up an entity hosted by this process.
+ */
+void Rank::mark_up(msg_addr_t a, entity_inst_t& i)
+{
+  lock.Lock();
+  {
+    dout(10) << "mark_up " << a << " inst " << i << endl;
+    derr(10) << "mark_up " << a << " inst " << i << endl;
+
+    if (entity_map.count(a) == 0 ||
+        entity_map[a] < i) {
+      entity_map[a] = i;
+
+      // Only connect when this rank has no pipe yet, as submit_message()
+      // and prepare_dest() do.  connect_rank() overwrites rank_pipe[rank];
+      // the replaced pipe would stay in the pipes set, where wait() never
+      // closes it (it closes only rank_pipe entries) yet waits for it.
+      if (rank_pipe.count(i.rank) == 0) {
+        connect_rank(i);
+      }
+    } else if (entity_map[a] == i) {
+      dout(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+      derr(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+    } else {
+      dout(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+      derr(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+    }
+
+    //if (waiting_for_lookup.count(a))
+    //lookup(a);
+  }
+  lock.Unlock();
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __SIMPLEMESSENGER_H
+#define __SIMPLEMESSENGER_H
+
+
+#include <list>
+#include <map>
+using namespace std;
+#include <ext/hash_map>
+#include <ext/hash_set>
+using namespace __gnu_cxx;
+
+
+#include "include/types.h"
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+
+#include "Messenger.h"
+#include "Message.h"
+#include "tcp.h"
+
+
+
+
+/* Rank - per-process
+ *
+ * Process-wide messaging state: the accepter thread, the pipes to other
+ * ranks, the directory of known entities (entity_map), and the messengers
+ * registered in this process (local).  A single global instance named
+ * 'rank' is declared after this class.
+ */
+class Rank {
+
+  class EntityMessenger;
+  class Pipe;
+
+  // incoming
+  // Thread object for the listening socket; entry() and start() are
+  // defined out of line.
+  class Accepter : public Thread {
+  public:
+    bool done;
+
+    int listen_sd;    // not set by the constructor
+
+    Accepter() : done(false) {}
+
+    void *entry();
+    // flag the thread as done, close the listening socket, join the thread
+    void stop() {
+      done = true;
+      ::close(listen_sd);
+      join();
+    }
+    int start();
+  } accepter;
+
+
+  // pipe
+  // One connection to a peer, with a Reader and a Writer thread object and
+  // a locked queue (q) of outgoing messages.
+  class Pipe {
+  protected:
+    int sd;
+    bool done;
+    entity_inst_t peer_inst;
+    bool server;            // true when built from an accepted socket
+    bool sent_close;
+    bool socket_error;
+
+    bool reader_running;
+    bool writer_running;
+
+    list<Message*> q;       // outgoing messages; guarded by lock
+    Mutex lock;
+    Cond cond;              // signalled whenever q gains messages
+
+    int accept();   // server handshake
+    int connect();  // client handshake
+    void reader();
+    void writer();
+
+    Message *read_message();
+    int write_message(Message *m);
+    void fail(list<Message*>& ls);
+
+    // threads
+    class Reader : public Thread {
+      Pipe *pipe;
+    public:
+      Reader(Pipe *p) : pipe(p) {}
+      void *entry() { pipe->reader(); return 0; }
+    } reader_thread;
+    friend class Reader;
+
+    class Writer : public Thread {
+      Pipe *pipe;
+    public:
+      Writer(Pipe *p) : pipe(p) {}
+      void *entry() { pipe->writer(); return 0; }
+    } writer_thread;
+    friend class Writer;
+
+  public:
+    // Server side: wrap socket s and start the reader thread immediately.
+    Pipe(int s) : sd(s),
+                  done(false), server(true),
+                  sent_close(false), socket_error(false),
+                  reader_running(false), writer_running(false),
+                  reader_thread(this), writer_thread(this) {
+      // server
+      reader_running = true;
+      reader_thread.create();
+    }
+    // Client side: remember the peer and start the writer thread
+    // immediately.  socket_error is initialised here as well, matching
+    // the server constructor, so it never holds an indeterminate value.
+    Pipe(const entity_inst_t &pi) : sd(0),
+                                    done(false), peer_inst(pi), server(false),
+                                    sent_close(false), socket_error(false),
+                                    reader_running(false), writer_running(false),
+                                    reader_thread(this), writer_thread(this) {
+      // client
+      writer_running = true;
+      writer_thread.create();
+    }
+
+    // public constructors
+    static const Pipe& Server(int s);
+    static const Pipe& Client(const entity_inst_t& pi);
+
+    entity_inst_t& get_peer_inst() { return peer_inst; }
+
+    void close();
+    // join the writer thread, then the reader thread
+    void join() {
+      writer_thread.join();
+      reader_thread.join();
+    }
+
+    // append one message to the outgoing queue and signal cond
+    void send(Message *m) {
+      lock.Lock();
+      q.push_back(m);
+      cond.Signal();
+      lock.Unlock();
+    }
+    // move every message of ls (left empty) to the outgoing queue and
+    // signal cond
+    void send(list<Message*>& ls) {
+      lock.Lock();
+      q.splice(q.end(), ls);
+      cond.Signal();
+      lock.Unlock();
+    }
+  };
+
+
+
+  // messenger interface
+  // Messenger implementation for one entity in this process, with its own
+  // locked dispatch queue and dispatch thread object.
+  class EntityMessenger : public Messenger {
+    Mutex lock;
+    Cond cond;
+    list<Message*> dispatch_queue;    // guarded by lock
+    bool stop;
+
+    class DispatchThread : public Thread {
+      EntityMessenger *m;
+    public:
+      DispatchThread(EntityMessenger *_m) : m(_m) {}
+      void *entry() {
+        m->dispatch_entry();
+        return 0;
+      }
+    } dispatch_thread;
+    void dispatch_entry();
+
+  public:
+    // append one message to the dispatch queue and signal cond
+    void queue_message(Message *m) {
+      lock.Lock();
+      dispatch_queue.push_back(m);
+      cond.Signal();
+      lock.Unlock();
+    }
+    // append a batch; ls is taken by value, so the caller's list is not
+    // modified
+    void queue_messages(list<Message*> ls) {
+      lock.Lock();
+      dispatch_queue.splice(dispatch_queue.end(), ls);
+      cond.Signal();
+      lock.Unlock();
+    }
+
+  public:
+    EntityMessenger(msg_addr_t myaddr);
+    ~EntityMessenger();
+
+    void ready();
+    bool is_stopped() { return stop; }
+
+    // block until the dispatch thread has exited
+    void wait() {
+      dispatch_thread.join();
+    }
+
+    void reset_myaddr(msg_addr_t m);
+
+    void callback_kick() {}
+    int shutdown();
+    void prepare_dest(const entity_inst_t& inst);
+    int send_message(Message *m, msg_addr_t dest, entity_inst_t inst,
+                     int port=0, int fromport=0);
+
+    void mark_down(msg_addr_t a, entity_inst_t& i);
+    void mark_up(msg_addr_t a, entity_inst_t& i);
+  };
+
+
+  // Thread object that runs Rank::single_dispatcher_entry().
+  class SingleDispatcher : public Thread {
+    Rank *rank;
+  public:
+    SingleDispatcher(Rank *r) : rank(r) {}
+    void *entry() {
+      rank->single_dispatcher_entry();
+      return 0;
+    }
+  } single_dispatcher;
+
+  // single-dispatch state
+  Cond single_dispatch_cond;
+  bool single_dispatch_stop;
+  list<Message*> single_dispatch_queue;
+
+  // messages parked per address until that entity calls ready()
+  map<msg_addr_t, list<Message*> > waiting_for_ready;
+
+  void single_dispatcher_entry();
+  void _submit_single_dispatch(Message *m);
+
+
+  // Rank stuff
+ public:
+  Mutex lock;
+  Cond wait_cond;  // for wait()
+
+  // where i listen
+  tcpaddr_t listen_addr;
+
+  // my instance
+  entity_inst_t my_inst;
+
+  // lookup
+  hash_map<msg_addr_t, entity_inst_t> entity_map;
+  hash_set<msg_addr_t> entity_unstarted;
+
+  // local
+  map<msg_addr_t, EntityMessenger*> local;
+
+  // remote
+  hash_map<__int64_t, Pipe*> rank_pipe;   // keyed by entity_inst_t::rank
+
+  set<Pipe*> pipes;
+  list<Pipe*> pipe_reap_queue;
+
+  void show_dir();
+
+  Pipe *connect_rank(const entity_inst_t& inst);
+
+  void mark_down(msg_addr_t addr, entity_inst_t& i);
+  void mark_up(msg_addr_t addr, entity_inst_t& i);
+
+  tcpaddr_t get_listen_addr() { return listen_addr; }
+
+  void reaper();
+
+  EntityMessenger *find_unnamed(msg_addr_t a);
+
+public:
+  Rank();
+  ~Rank();
+
+  void set_listen_addr(tcpaddr_t& a);
+
+  int start_rank();
+  void wait();
+
+  EntityMessenger *register_entity(msg_addr_t addr);
+  void rename_entity(EntityMessenger *ms, msg_addr_t newaddr);
+  void unregister_entity(EntityMessenger *ms);
+
+  void submit_message(Message *m, const entity_inst_t& inst);
+  void prepare_dest(const entity_inst_t& inst);
+
+  // create a new messenger
+  EntityMessenger *new_entity(msg_addr_t addr);
+
+} ;
+
+
+
+extern Rank rank;
+
+#endif
#include "client/Client.h"
#include "client/SyntheticClient.h"
-#include "msg/NewerMessenger.h"
+#include "msg/SimpleMessenger.h"
#include "common/Timer.h"
// start up all monitors at known addresses.
entity_inst_t moninst[mpi_world]; // only care about first g_conf.num_mon of these.
- if (mpi_rank < g_conf.num_mon) {
- rank.my_rank = mpi_rank;
- rank.start_rank(); // bind and listen
+ rank.start_rank(); // bind and listen
- moninst[mpi_rank].rank = mpi_rank;
- moninst[mpi_rank].addr = rank.get_listen_addr();
+ if (mpi_rank < g_conf.num_mon) {
+ moninst[mpi_rank].set_addr( rank.get_listen_addr() );
//cerr << mpi_rank << " at " << rank.get_listen_addr() << endl;
}
MPI_Gather( &moninst[mpi_rank], sizeof(entity_inst_t), MPI_CHAR,
moninst, sizeof(entity_inst_t), MPI_CHAR,
0, MPI_COMM_WORLD);
-
+
if (mpi_rank == 0) {
- rank.start_namer();
-
for (int i=0; i<g_conf.num_mon; i++) {
cerr << "mon" << i << " is at " << moninst[i] << endl;
monmap->mon_inst[i] = moninst[i];
- if (i) rank.namer->manual_insert_inst(monmap->get_inst(i));
}
}
bufferlist bl;
if (mpi_rank == 0) {
monmap->encode(bl);
-
- int fd = ::open(".ceph_monmap", O_WRONLY|O_CREAT);
- ::write(fd, (void*)bl.c_str(), bl.length());
- ::fchmod(fd, 0755);
- ::close(fd);
-
+ monmap->write(".ceph_monmap");
} else {
int l = g_conf.num_mon * 1000; // nice'n big.
bufferptr bp(l);
if (mpi_rank > 0) {
monmap->decode(bl);
- rank.set_namer(monmap->get_inst(0).addr);
- }
-
- if (mpi_rank >= g_conf.num_mon) {
- rank.start_rank();
}
-
+
// wait for everyone!
MPI_Barrier(MPI_COMM_WORLD);
for (int i=0; i<NUMMDS; i++) {
if (myrank != g_conf.ms_skip_rank0+i) continue;
Messenger *m = rank.register_entity(MSG_ADDR_MDS(i));
- cerr << "mds" << i << " on tcprank " << rank.my_rank << " " << hostname << "." << pid << endl;
+ cerr << "mds" << i << " at " << rank.my_inst << " " << hostname << "." << pid << endl;
mds[i] = new MDS(i, m, monmap);
mds[i]->init();
started++;
g_timer.add_event_after(kill_osd_after[i], new C_Die);
Messenger *m = rank.register_entity(MSG_ADDR_OSD(i));
- cerr << "osd" << i << " on tcprank " << rank.my_rank << " " << hostname << "." << pid << endl;
+ cerr << "osd" << i << " at " << rank.my_inst << " " << hostname << "." << pid << endl;
osd[i] = new OSD(i, m, monmap);
osd[i]->init();
started++;
nclients++;
}
if (nclients) {
- cerr << nclients << " clients on tcprank " << rank.my_rank << " " << hostname << "." << pid << endl;
+ cerr << nclients << " clients at " << rank.my_inst << " " << hostname << "." << pid << endl;
}
for (set<int>::iterator it = clientlist.begin();
if (myrank && !started) {
//dout(1) << "IDLE" << endl;
- cerr << "idle on tcprank " << rank.my_rank << " " << hostname << "." << pid << endl;
+ cerr << "idle at " << rank.my_inst << " " << hostname << "." << pid << endl;
//rank.stop_rank();
}
#include <sys/stat.h>
#include <fcntl.h>
+#ifdef DARWIN
+#include <sys/param.h>
+#include <sys/mount.h>
+#endif // DARWIN
+
int myrand()
{
//#include <sys/xattr.h>
//#include <sys/vfs.h>
+#ifdef DARWIN
+#include <sys/param.h>
+#include <sys/mount.h>
+#endif // DARWIN
+
#include "config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << ".fakestore "
void OSD::handle_pg_notify(MOSDPGNotify *m)
{
dout(7) << "handle_pg_notify from " << m->get_source() << endl;
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
if (!require_same_or_newer_map(m, m->get_epoch())) return;
void OSD::handle_pg_log(MOSDPGLog *m)
{
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
const pg_t pgid = m->get_pgid();
if (!require_same_or_newer_map(m, m->get_epoch())) return;
void OSD::handle_pg_query(MOSDPGQuery *m)
{
dout(7) << "handle_pg_query from " << m->get_source() << " epoch " << m->get_epoch() << endl;
- int from = MSG_ADDR_NUM(m->get_source());
+ int from = m->get_source().num();
if (!require_same_or_newer_map(m, m->get_epoch())) return;
#include "include/Distribution.h"
#include <sys/stat.h>
+
+#ifdef DARWIN
+#include <sys/statvfs.h>
+#else
#include <sys/vfs.h> /* or <sys/statfs.h> */
+#endif /* DARWIN */
#include <list>
using namespace std;
size_t operator()(const reqid_t &r) const {
static hash<unsigned long> H;
static hash<__uint64_t> I;
- return H(r.addr._addr) ^ I(r.tid);
+ return H(r.addr.type() ^ r.addr.num()) ^ I(r.tid);
}
};
}