# dev work, and seems to behave just fine... change ${CC} back to
# mpicxx if you get paranoid.
+#CC = g++
+#CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
+#LIBS = -lpthread
+
+# Hook for extra -I options, etc.
+EXTRA_CFLAGS =
+
+ifeq ($(target),darwin)
+# For Darwin
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE -DDARWIN -D__FreeBSD__=10 ${EXTRA_CFLAGS}
+LDINC = ar -rc
+else
+# For linux
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
+LDINC = ld -i -o
+endif
+
CC = g++
-CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
-LIBS = -lpthread
+LIBS = -lpthread
#for normal mpich2 machines
MPICC = mpicxx
fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o msg/FakeMessenger.o common.o
${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
-fakesyn: fakesyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
+fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
# libceph
libceph.o: client/ldceph.o client/Client.o msg/SimpleMessenger.o ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS}
- ld -i $^ -o $@
+ ${LDINC} $^ -o $@
bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
mpicc -c $^ -o $@
%.so: %.cc
${CC} -shared -fPIC ${CFLAGS} $< -o $@
-
clean:
rm -f *.o */*.o ${TARGETS} ${TEST_TARGETS}
common.o: ${COMMON_OBJS}
- ld -i -o $@ $^
+ ${LDINC} $@ $^
ebofs.o: ${EBOFS_OBJS}
- ld -i -o $@ $^
+ ${LDINC} $@ $^
client.o: ${CLIENT_OBJS}
- ld -i -o $@ $^
+ ${LDINC} $@ $^
osd.o: ${OSD_OBJS}
- ld -i -o $@ $^
+ ${LDINC} $@ $^
osdc.o: ${OSDC_OBJS}
- ld -i -o $@ $^
+ ${LDINC} $@ $^
mds.o: ${MDS_OBJS}
- ld -i -o $@ $^
+ ${LDINC} $@ $^
mon.o: ${MON_OBJS}
- ld -i -o $@ $^
+ ${LDINC} $@ $^
%.o: %.cc
${CC} ${CFLAGS} -c $< -o $@
#include "common/Timer.h"
+#ifndef DARWIN
#include <envz.h>
+#endif // DARWIN
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include <sys/statvfs.h>
+
#include <iostream>
using namespace std;
st->st_nlink = inode.nlink;
st->st_uid = inode.uid;
st->st_gid = inode.gid;
+#ifndef DARWIN
+ // FIXME what's going on here with darwin?
st->st_ctime = inode.ctime;
st->st_atime = inode.atime;
st->st_mtime = inode.mtime;
+#endif
st->st_size = inode.size;
st->st_blocks = inode.size ? ((inode.size - 1) / 4096 + 1):0;
st->st_blksize = 4096;
// fill the dirent
d->dp.d_dirent.d_ino = d->p->second.ino;
#ifndef __CYGWIN__
+#ifndef DARWIN
if (d->p->second.is_symlink())
d->dp.d_dirent.d_type = DT_LNK;
else if (d->p->second.is_dir())
d->dp.d_dirent.d_off = d->off;
d->dp.d_dirent.d_reclen = 1; // all records are length 1 (wrt offset, seekdir, telldir, etc.)
+#endif // DARWIN
#endif
strncpy(d->dp.d_dirent.d_name, d->p->first.c_str(), 256);
// fill the dirent
d->dp.d_dirent.d_ino = d->p->second.ino;
#ifndef __CYGWIN__
+#ifndef DARWIN
if (d->p->second.is_symlink())
d->dp.d_dirent.d_type = DT_LNK;
else if (d->p->second.is_dir())
d->dp.d_dirent.d_off = d->off;
d->dp.d_dirent.d_reclen = 1; // all records are length 1 (wrt offset, seekdir, telldir, etc.)
+#endif // DARWIN
#endif
strncpy(d->dp.d_dirent.d_name, d->p->first.c_str(), 256);
return 0;
}
-int Client::statfs(const char *path, struct statfs *stbuf)
+int Client::statfs(const char *path, struct statvfs *stbuf)
{
- assert(0); // implement me
+ bzero (stbuf, sizeof (struct statvfs));
+ // FIXME
+ stbuf->f_bsize = 1024;
+ stbuf->f_frsize = 1024;
+ stbuf->f_blocks = 1024 * 1024;
+ stbuf->f_bfree = 1024 * 1024;
+ stbuf->f_bavail = 1024 * 1024;
+ stbuf->f_files = 1024 * 1024;
+ stbuf->f_ffree = 1024 * 1024;
+ stbuf->f_favail = 1024 * 1024;
+ stbuf->f_namemax = 1024;
+
return 0;
}
-
int Client::lazyio_propogate(int fd, off_t offset, size_t count)
{
client_lock.Lock();
int unmount();
// these shoud (more or less) mirror the actual system calls.
- int statfs(const char *path, struct statfs *stbuf);
+ int statfs(const char *path, struct statvfs *stbuf);
// crap
int chdir(const char *s);
*/
#include <iostream>
+#include <sstream>
using namespace std;
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
+ } else if (strcmp(args[i],"makedirmess") == 0) {
+ syn_modes.push_back( SYNCLIENT_MODE_MAKEDIRMESS );
+ syn_iargs.push_back( atoi(args[++i]) );
} else if (strcmp(args[i],"statdirs") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_STATDIRS );
syn_iargs.push_back( atoi(args[++i]) );
}
break;
+ case SYNCLIENT_MODE_MAKEDIRMESS:
+ {
+ string sarg1 = get_sarg(0);
+ int iarg1 = iargs.front(); iargs.pop_front();
+ if (run_me()) {
+ dout(2) << "makedirmess " << sarg1 << " " << iarg1 << endl;
+ make_dir_mess(sarg1.c_str(), iarg1);
+ }
+ }
+ break;
case SYNCLIENT_MODE_MAKEDIRS:
{
string sarg1 = get_sarg(0);
}
+
+
+void SyntheticClient::make_dir_mess(const char *basedir, int n)
+{
+ vector<string> dirs;
+
+ dirs.push_back(basedir);
+ dirs.push_back(basedir);
+
+ client->mkdir(basedir, 0755);
+
+ // motivation:
+ // P(dir) ~ subdirs_of(dir) + 2
+ // from 5-year metadata workload paper in fast'07
+
+ // create dirs
+ for (int i=0; i<n; i++) {
+ // pick a dir
+ int k = rand() % dirs.size();
+ string parent = dirs[k];
+
+ // pick a name
+ std::stringstream ss;
+ ss << parent << "/" << i;
+ string dir;
+ ss >> dir;
+
+ // update dirs
+ dirs.push_back(parent);
+ dirs.push_back(dir);
+ dirs.push_back(dir);
+
+ // do it
+ client->mkdir(dir.c_str(), 0755);
+ }
+
+
+}
+
#define SYNCLIENT_MODE_RANDOMWALK 1
#define SYNCLIENT_MODE_FULLWALK 2
-#define SYNCLIENT_MODE_REPEATWALK 7
+#define SYNCLIENT_MODE_REPEATWALK 3
+#define SYNCLIENT_MODE_MAKEDIRMESS 7
#define SYNCLIENT_MODE_MAKEDIRS 8 // dirs files depth
#define SYNCLIENT_MODE_STATDIRS 9 // dirs files depth
#define SYNCLIENT_MODE_READDIRS 10 // dirs files depth
int play_trace(Trace& t, string& prefix);
+ void make_dir_mess(const char *basedir, int n);
+
};
#endif
#define _XOPEN_SOURCE 500
#endif
-#define FUSE_USE_VERSION 22
+#define FUSE_USE_VERSION 25
#include <fuse.h>
#include <stdio.h>
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
-#include <sys/statfs.h>
+#include <sys/statvfs.h>
// ceph stuff
}
*/
-static int ceph_statfs(const char *path, struct statfs *stbuf)
+
+static int ceph_statfs(const char *path, struct statvfs *stbuf)
{
return client->statfs(path, stbuf);
}
// allow other (all!) users to see my file system
// NOTE: echo user_allow_other >> /etc/fuse.conf
+ // NB: seems broken on Darwin
+#ifndef DARWIN
newargv[newargc++] = "-o";
newargv[newargc++] = "allow_other";
+#endif // DARWIN
// use inos
newargv[newargc++] = "-o";
int nsec() const { return tv.tv_usec*1000; }
// ref accessors/modifiers
- time_t& sec_ref() { return tv.tv_sec; }
- long& usec_ref() { return tv.tv_usec; }
+ time_t& sec_ref() { return tv.tv_sec; }
+ // FIXME: tv.tv_usec is a __darwin_suseconds_t on Darwin.
+ // is just casting it to long& OK?
+ long& usec_ref() { return (long&) tv.tv_usec; }
// cast to double
operator double() {
#include "msg/SimpleMessenger.h"
#include "common/Timer.h"
-
+
+#ifndef DARWIN
#include <envz.h>
+#endif // DARWIN
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#ifndef __CYGWIN__
+#ifndef DARWIN
#include <linux/fs.h>
#endif
+#endif
/*******************************************
int BlockDevice::open_fd()
{
+#ifdef DARWIN
+ int fd = ::open(dev.c_str(), O_RDWR|O_SYNC, 0);
+ ::fcntl(fd, F_NOCACHE);
+ return fd;
+#else
return ::open(dev.c_str(), O_RDWR|O_SYNC|O_DIRECT, 0);
+#endif
}
int BlockDevice::open(kicker *idle)
}
*/
+void ObjectCache::touch_bottom(block_t bstart, block_t blast)
+{
+ for (map<block_t, BufferHead*>::iterator p = data.lower_bound(bstart);
+ p != data.end();
+ ++p) {
+ BufferHead *bh = p->second;
+
+ // don't trim unless it's entirely in our range
+ if (bh->start() < bstart) continue;
+ if (bh->end() > blast) break;
+
+ dout(12) << "moving " << *bh << " to bottom of lru" << endl;
+ bc->touch_bottom(bh); // move to bottom of lru list
+ }
+}
+
+
void ObjectCache::truncate(block_t blocks, version_t super_epoch)
{
dout(7) << "truncate " << object_id
utime_t dirty_stamp;
+ bool want_to_expire; // wants to be at bottom of lru
+
public:
BufferHead(ObjectCache *o) :
oc(o), //cancellable_ioh(0), tx_epoch(0),
rx_ioh(0), tx_ioh(0), tx_block(0), partial_tx_to(0), partial_tx_epoch(0),
shadow_of(0),
- ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0)
+ ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0),
+ want_to_expire(false)
{}
~BufferHead() {
unpin_shadows();
interval_set<block_t>& alloc,
map<block_t, BufferHead*>& hits,
version_t super_epoch); // can write to these.
+ void touch_bottom(block_t bstart, block_t blast);
BufferHead *split(BufferHead *bh, block_t off);
} else
lru_rest.lru_touch(bh);
}
+ void touch_bottom(BufferHead *bh) {
+ if (bh->is_dirty()) {
+ bh->want_to_expire = true;
+ lru_dirty.lru_bottouch(bh);
+ } else
+ lru_rest.lru_bottouch(bh);
+ }
void remove_bh(BufferHead *bh) {
bh->get_oc()->remove_bh(bh);
stat_sub(bh);
}
if (s != BufferHead::STATE_DIRTY && bh->get_state() == BufferHead::STATE_DIRTY) {
lru_dirty.lru_remove(bh);
- lru_rest.lru_insert_mid(bh);
+ if (bh->want_to_expire)
+ lru_rest.lru_insert_bot(bh);
+ else
+ lru_rest.lru_insert_mid(bh);
dirty_bh.erase(bh);
}
void Ebofs::sync(Context *onsafe)
{
ebofs_lock.Lock();
- if (onsafe)
+ if (onsafe) {
+ dirty = true;
commit_waiters[super_epoch].push_back(onsafe);
+ }
ebofs_lock.Unlock();
}
if (!dirty) {
dout(7) << "sync in " << super_epoch << ", not dirty" << endl;
} else {
- dout(7) << "sync in " << super_epoch << endl;
-
- if (!commit_thread_started) {
- dout(10) << "sync waiting for commit thread to start" << endl;
- sync_cond.Wait(ebofs_lock);
- }
-
- if (mid_commit) {
- dout(10) << "sync waiting for commit in progress" << endl;
+ epoch_t start = super_epoch;
+ dout(7) << "sync start in " << start << endl;
+ while (super_epoch == start) {
+ dout(7) << "sync kicking commit in " << super_epoch << endl;
+ dirty = true;
+ commit_cond.Signal();
sync_cond.Wait(ebofs_lock);
}
-
- commit_cond.Signal(); // trigger a commit
-
- sync_cond.Wait(ebofs_lock); // wait
-
dout(10) << "sync finish in " << super_epoch << endl;
}
ebofs_lock.Unlock();
return true;
}
+
+/*
+ * is_cached -- query whether a object extent is in our cache
+ * return value of -1 if onode isn't loaded. otherwise, the number
+ * of extents that need to be read (i.e. # of seeks)
+ */
+int Ebofs::is_cached(object_t oid, off_t off, size_t len)
+{
+ ebofs_lock.Lock();
+ int r = _is_cached(oid, off, len);
+ ebofs_lock.Unlock();
+ return r;
+}
+
+int Ebofs::_is_cached(object_t oid, off_t off, size_t len)
+{
+ Onode *on = 0;
+ if (onode_map.count(oid) == 0) {
+ dout(7) << "_is_cached " << oid << " " << off << "~" << len << " ... onode " << endl;
+ return -1; // object dne?
+ }
+
+ if (!on->have_oc()) {
+ // nothing is cached. return # of extents in file.
+ return on->extent_map.size();
+ }
+
+ // map
+ block_t bstart = off / EBOFS_BLOCK_SIZE;
+ block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
+ block_t blen = blast-bstart+1;
+
+ map<block_t, BufferHead*> hits;
+ map<block_t, BufferHead*> missing; // read these
+ map<block_t, BufferHead*> rx; // wait for these
+ map<block_t, BufferHead*> partials; // ??
+ on->get_oc(&bc)->map_read(bstart, blen, hits, missing, rx, partials);
+ return missing.size() + rx.size() + partials.size();
+
+ // FIXME: actually, we should calculate if these extents are contiguous.
+ // and not using map_read, probably...
+ /* hrmpf
+ block_t dpos = 0;
+ block_t opos = bstart;
+ while (opos < blen) {
+ if (hits.begin()->first == opos) {
+ } else {
+ block_t d;
+ if (missing.begin()->first == opos) d = missing.begin()->second.
+
+ }
+ */
+}
+
+void Ebofs::trim_from_cache(object_t oid, off_t off, size_t len)
+{
+ ebofs_lock.Lock();
+ _trim_from_cache(oid, off, len);
+ ebofs_lock.Unlock();
+}
+
+void Ebofs::_trim_from_cache(object_t oid, off_t off, size_t len)
+{
+ Onode *on = 0;
+ if (onode_map.count(oid) == 0) {
+ dout(7) << "_trim_from_cache " << oid << " " << off << "~" << len << " ... onode not in cache " << endl;
+ return;
+ }
+
+ if (!on->have_oc())
+ return; // nothing is cached.
+
+ // map to blocks
+ block_t bstart = off / EBOFS_BLOCK_SIZE;
+ block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
+
+ ObjectCache *oc = on->get_oc(&bc);
+ oc->touch_bottom(bstart, blast);
+
+ return;
+}
+
+
int Ebofs::read(object_t oid,
off_t off, size_t len,
bufferlist& bl)
}
break;
+ case Transaction::OP_TRIMCACHE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ _trim_from_cache(oid, offset, len);
+ }
+ break;
+
case Transaction::OP_TRUNCATE:
{
object_t oid = t.oids.front(); t.oids.pop_front();
bool exists(object_t);
int stat(object_t, struct stat*);
int read(object_t, off_t off, size_t len, bufferlist& bl);
- //int write(object_t oid, off_t off, size_t len, bufferlist& bl, bool fsync=true);
+ int is_cached(object_t oid, off_t off, size_t len);
+
int write(object_t oid, off_t off, size_t len, bufferlist& bl, Context *onsafe);
+ void trim_from_cache(object_t oid, off_t off, size_t len);
int truncate(object_t oid, off_t size, Context *onsafe=0);
int truncate_front(object_t oid, off_t size, Context *onsafe=0);
int remove(object_t oid, Context *onsafe=0);
private:
// private interface -- use if caller already holds lock
int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
+ int _is_cached(object_t oid, off_t off, size_t len);
int _stat(object_t oid, struct stat *st);
int _getattr(object_t oid, const char *name, void *value, size_t size);
int _getattrs(object_t oid, map<string,bufferptr> &aset);
bool _write_will_block();
int _write(object_t oid, off_t off, size_t len, bufferlist& bl);
+ void _trim_from_cache(object_t oid, off_t off, size_t len);
int _truncate(object_t oid, off_t size);
int _truncate_front(object_t oid, off_t size);
int _remove(object_t oid);
class raw_mmap_pages : public raw {
public:
raw_mmap_pages(unsigned l) : raw(l) {
- data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
+ data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0);
inc_total_alloc(len);
}
~raw_mmap_pages() {
class raw_posix_aligned : public raw {
public:
raw_posix_aligned(unsigned l) : raw(l) {
+#ifdef DARWIN
+ data = (char *) valloc (len);
+#else
::posix_memalign((void**)&data, BUFFER_PAGE_SIZE, len);
+#endif /* DARWIN */
inc_total_alloc(len);
}
~raw_posix_aligned() {
case STATE_STOPPED: return "up:stopped";
default: assert(0);
}
+ return 0;
}
protected:
class MClientBoot : public Message {
public:
- MClientBoot() : Message(MSG_CLIENT_BOOT) {
- }
+ MClientBoot() : Message(MSG_CLIENT_BOOT) { }
- char *get_type_name() { return "Cboot"; }
+ char *get_type_name() { return "ClientBoot"; }
- virtual void decode_payload(crope& s, int& off) {
- }
- virtual void encode_payload(crope& s) {
- }
+ void encode_payload() { }
+ void decode_payload() { }
};
#endif
class MMonElectionAck : public Message {
public:
- int q;
- int refresh_num;
+ MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {}
+
+ virtual char *get_type_name() { return "election_ack"; }
- MMonElectionAck() {}
- MMonElectionAck(int _q, int _n) :
- Message(MSG_MON_ELECTION_ACK),
- q(_q), refresh_num(_n) {}
-
- void decode_payload() {
- int off = 0;
- payload.copy(off, sizeof(q), (char*)&q);
- off += sizeof(q);
- payload.copy(off, sizeof(refresh_num), (char*)&refresh_num);
- off += sizeof(refresh_num);
- }
- void encode_payload() {
- payload.append((char*)&q, sizeof(q));
- payload.append((char*)&refresh_num, sizeof(refresh_num));
- }
-
- virtual char *get_type_name() { return "MonElAck"; }
+ void encode_payload() {}
+ void decode_payload() {}
};
#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MMONELECTIONPROPOSE_H
+#define __MMONELECTIONPROPOSE_H
+
+#include "msg/Message.h"
+
+
+class MMonElectionPropose : public Message {
+ public:
+ MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {}
+
+ virtual char *get_type_name() { return "election_propose"; }
+
+ void encode_payload() {}
+ void decode_payload() {}
+
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MMONELECTIONVICTORY_H
+#define __MMONELECTIONVICTORY_H
+
+#include "msg/Message.h"
+
+
+class MMonElectionVictory : public Message {
+ public:
+ //set<int> active_set;
+
+ MMonElectionVictory(/*set<int>& as*/) : Message(MSG_MON_ELECTION_VICTORY)//,
+ //active_set(as)
+ {}
+
+ virtual char *get_type_name() { return "election_victory"; }
+
+ void encode_payload() {
+ //::_encode(active_set, payload);
+ }
+ void decode_payload() {
+ //int off = 0;
+ //::_decode(active_set, payload, off);
+ }
+};
+
+#endif
#include "common/Timer.h"
-#include "messages/MMonElectionRefresh.h"
-#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionPropose.h"
#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionCollect.h"
+#include "messages/MMonElectionVictory.h"
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << "mon" << whoami << " "
-#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
+void Elector::start()
+{
+ dout(5) << "start -- can i be leader?" << endl;
-class C_Elect_ReadTimer : public Context {
- Elector *mon;
-public:
- C_Elect_ReadTimer(Elector *m) : mon(m){}
- void finish(int r) {
- mon->read_timer();
+ leader_acked = -1;
+
+ // start by trying to elect me
+ start_stamp = g_clock.now();
+ acked_me.clear();
+ acked_me.insert(whoami);
+ electing_me = true;
+
+ // bcast to everyone else
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
+ if (i == whoami) continue;
+ mon->messenger->send_message(new MMonElectionPropose,
+ mon->monmap->get_inst(i));
}
-};
+
+ reset_timer();
+}
-void Elector::read_timer()
+void Elector::defer(int who)
{
- read_num++;
- status_msg_count = 0;
- old_views = views; // TODO deep copy
- for (unsigned i=0; i<processes.size(); i++) {
- mon->messenger->send_message(new MMonElectionCollect(read_num),
- mon->monmap->get_inst(processes[i]));
- }
-};
+ dout(5) << "defer to " << who << endl;
-class C_Elect_TripTimer : public Context {
- Elector *mon;
-public:
- C_Elect_TripTimer(Elector *m) : mon(m){}
- void finish(int r) {
- mon->trip_timer();
+ if (electing_me) {
+ acked_me.clear();
+ electing_me = false;
}
-};
-
-void Elector::trip_timer()
-{
- views[whoami].expired = true;
- registry[whoami].epoch.s_num++;
- dout(1) << "Process " << whoami
- << " timed out (" << ack_msg_count << "/" << (f + 1)
- << ") ... increasing epoch. Now epoch is "
- << registry[whoami].epoch.s_num
- << endl;
-};
+ // ack them
+ leader_acked = who;
+ ack_stamp = g_clock.now();
+ mon->messenger->send_message(new MMonElectionAck,
+ mon->monmap->get_inst(who));
+
+ // set a timer
+ reset_timer();
+}
-class C_Elect_RefreshTimer : public Context {
- Elector *mon;
+class C_Mon_ElectionExpire : public Context {
+ Elector *elector;
public:
- C_Elect_RefreshTimer(Elector *m) : mon(m) {}
+ C_Mon_ElectionExpire(Elector *e) : elector(e) { }
void finish(int r) {
- mon->refresh_timer();
+ elector->expire();
}
};
-void Elector::refresh_timer()
+void Elector::reset_timer()
{
- ack_msg_count = 0;
- refresh_num++;
- MMonElectionRefresh *msg = new MMonElectionRefresh(whoami, registry[whoami], refresh_num);
- for (unsigned i=0; i<processes.size(); i++) {
- mon->messenger->send_message(msg, mon->monmap->get_inst(processes[i]));
- }
-
- // Start the trip timer
- //round_trip_timer = new C_Elect_TripTimer(this);
- mon->timer.add_event_after(trip_delta, new C_Elect_TripTimer(this));
-};
+ // set the timer
+ cancel_timer();
+ expire_event = new C_Mon_ElectionExpire(this);
+ g_timer.add_event_after(g_conf.mon_lease,
+ expire_event);
+}
+void Elector::cancel_timer()
+{
+ if (expire_event)
+ g_timer.cancel_event(expire_event);
+}
-//////////////////////////
+void Elector::expire()
+{
+ dout(5) << "election timer expired" << endl;
+
+ // did i win?
+ if (electing_me &&
+ acked_me.size() > (unsigned)(mon->monmap->num_mon / 2)) {
+ // i win
+ victory();
+ } else {
+ // whoever i deferred to didn't declare victory quickly enough.
+ start();
+ }
+}
-Elector::Epoch Elector::get_min_epoch()
+void Elector::victory()
{
- assert(!views.empty());
- Epoch min = views[0].state.epoch;
- for (unsigned i=1; i<views.size(); i++) {
- if (views[i].state.epoch < min && !views[i].expired) {
- min = views[i].state.epoch;
- }
+ leader_acked = -1;
+ electing_me = false;
+
+ // tell everyone
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
+ if (i == whoami) continue;
+ mon->messenger->send_message(new MMonElectionVictory,
+ mon->monmap->get_inst(i));
}
- return min;
+
+ // tell monitor
+ mon->win_election(acked_me);
}
-void Elector::dispatch(Message *m)
+void Elector::handle_propose(MMonElectionPropose *m)
{
- switch (m->get_type()) {
- case MSG_MON_ELECTION_ACK:
- handle_ack((MMonElectionAck*)m);
- break;
-
- case MSG_MON_ELECTION_STATUS:
- handle_status((MMonElectionStatus*)m);
- break;
-
- case MSG_MON_ELECTION_COLLECT:
- handle_collect((MMonElectionCollect*)m);
- break;
-
- case MSG_MON_ELECTION_REFRESH:
- handle_refresh((MMonElectionRefresh*)m);
- break;
-
- default:
- assert(0);
+ dout(5) << "handle_propose from " << m->get_source() << endl;
+ int from = m->get_source().num();
+
+ if (from > whoami) {
+ // wait, i should win!
+ if (!electing_me)
+ start();
+ } else {
+ // they would win over me
+ if (leader_acked < 0 || // haven't acked anyone yet, or
+ leader_acked > from) { // they would win over who you did ack
+ defer(from);
+ } else {
+ // ignore them!
+ dout(5) << "no, we already acked " << leader_acked << endl;
+ }
}
+
+ delete m;
}
-
-void Elector::handle_ack(MMonElectionAck* msg)
+
+void Elector::handle_ack(MMonElectionAck *m)
{
- assert(refresh_num >= msg->refresh_num);
+ dout(5) << "handle_ack from " << m->get_source() << endl;
+ int from = m->get_source().num();
- if (refresh_num > msg->refresh_num) {
- // we got the message too late... discard it
- return;
- }
- ack_msg_count++;
- if (ack_msg_count >= f + 1) {
- dout(5) << "Received _f+1 acks, increase freshness" << endl;
- //mon->timer.cancel_event(round_trip_task);
- //round_trip_timer->cancel();
- registry[whoami].freshness++;
+ if (electing_me) {
+ // thanks
+ acked_me.insert(from);
+ dout(5) << " so far i have " << acked_me << endl;
+
+ // is that _everyone_?
+ if (acked_me.size() == (unsigned)mon->monmap->num_mon) {
+ // if yes, shortcut to election finish
+ victory();
+ }
+ } else {
+ // ignore, i'm deferring already.
}
- delete msg;
+ delete m;
}
-void Elector::handle_collect(MMonElectionCollect* msg)
+void Elector::handle_victory(MMonElectionVictory *m)
{
- mon->messenger->send_message(new MMonElectionStatus(msg->get_source().num(),
- msg->read_num,
- registry),
- mon->monmap->get_inst(msg->get_source().num()));
- delete msg;
+ dout(5) << "handle_victory from " << m->get_source() << endl;
+ int from = m->get_source().num();
+
+ if (from < whoami) {
+ // ok, fine, they win
+ mon->lose_election(from);
+
+ // cancel my timer
+ cancel_timer();
+ } else {
+ // no, that makes no sense, i should win. start over!
+ start();
+ }
}
-void Elector::handle_refresh(MMonElectionRefresh* msg)
-{
- if (registry[msg->p] < msg->state) {
- // update local data
- registry[msg->p] = msg->state;
-
- // reply to msg
- mon->messenger->send_message(new MMonElectionAck(msg->p,
- msg->refresh_num),
- mon->monmap->get_inst(msg->get_source().num()));
- }
- delete msg;
-}
-void Elector::handle_status(MMonElectionStatus* msg)
+void Elector::dispatch(Message *m)
{
- if (read_num != msg->read_num) {
- dout(1) << "handle_status "
- << ":DISCARDED B/C OF READNUM(" << read_num << ":"
- << msg->read_num << ")"
- << endl;
- return;
- }
- for (unsigned i=0; i<processes.size(); i++) {
- int r = processes[i];
- // Put in the view the max value between then new state and the stored one
- if ( msg->registry[r] > views[r].state ) {
- views[r].state = msg->registry[r];
- }
- }
-
- status_msg_count++;
- if (status_msg_count >= (int)processes.size() - f) { // Responses from quorum collected
- for (unsigned i=0; i<processes.size(); i++) {
- int r = processes[i];
- // Check if r has refreshed its epoch number
- if (!( views[r].state > old_views[r].state )) {
- dout(5) << ":Other process (" << r << ") has expired" << endl;
- views[r].expired = true;
- }
- if (views[r].state.epoch > old_views[r].state.epoch) {
- views[r].expired = false;
- }
- }
- Epoch leader_epoch = get_min_epoch();
- leader_id = leader_epoch.p_id;
- dout(1) << " thinks leader has ID: " << leader_id << endl;
+ switch (m->get_type()) {
+ case MSG_MON_ELECTION_ACK:
+ handle_ack((MMonElectionAck*)m);
+ break;
+
+ case MSG_MON_ELECTION_PROPOSE:
+ handle_propose((MMonElectionPropose*)m);
+ break;
- // Restarts the timer for the next iteration
- mon->timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this));
+ case MSG_MON_ELECTION_VICTORY:
+ handle_victory((MMonElectionVictory*)m);
+ break;
+
+ default:
+ assert(0);
}
}
#include "include/types.h"
#include "msg/Message.h"
+#include "include/Context.h"
+
+#include "common/Timer.h"
class Monitor;
class Elector {
- public:
-
- //// sub-classes
-
- // Epoch
- class Epoch {
- public:
- int p_id;
- int s_num;
-
- Epoch(int p_id=0, int s_num=0) {
- this->p_id = p_id;
- this->s_num = s_num;
- }
- };
-
-
- // State
- class State {
- public:
- Epoch epoch;
- int freshness;
-
- State() : freshness(0) {};
- State(Epoch& e, int f) :
- epoch(e), freshness(f) {}
- };
-
-
- class View {
- public:
- State state;
- bool expired;
- View() : expired(false) {}
- View(State& s, bool e) : state(s), expired(e) {}
- };
-
-
- ///////////////
private:
Monitor *mon;
int whoami;
- // used during refresh phase
- int ack_msg_count;
- int refresh_num;
-
- // used during read phase
- int read_num;
- int status_msg_count;
-
- // the leader process id
- int leader_id;
- // f-accessible
- int f;
-
- // the processes that compose the group
- vector<int> processes;
- // parameters for the process
- int main_delta;
- int trip_delta;
-
- // state variables
- map<int, State> registry;
- map<int, View> views;
- map<int, View> old_views;
+ Context *expire_event;
- // get the minimum epoch in the view map
- Epoch get_min_epoch();
+ void reset_timer();
+ void cancel_timer();
+
+ // electing me
+ bool electing_me;
+ utime_t start_stamp;
+ set<int> acked_me;
+
+ // electing them
+ int leader_acked; // who i've acked
+ utime_t ack_stamp; // and when
- // handlers for election messages
+ public:
+
+ void start(); // start an electing me
+ void defer(int who);
+ void expire(); // timer goes off
+ void victory();
+
+ void handle_propose(class MMonElectionPropose *m);
void handle_ack(class MMonElectionAck *m);
- void handle_collect(class MMonElectionCollect *m);
- void handle_refresh(class MMonElectionRefresh *m);
- void handle_status(class MMonElectionStatus *m);
+ void handle_victory(class MMonElectionVictory *m);
+
public:
Elector(Monitor *m, int w) : mon(m), whoami(w) {
// initialize all those values!
// ...
}
- // timer methods
- void read_timer();
- void trip_timer();
- void refresh_timer();
-
void dispatch(Message *m);
-
};
-inline bool operator>(const Elector::Epoch& l, const Elector::Epoch& r) {
- if (l.s_num == r.s_num)
- return (l.p_id > r.p_id);
- else
- return (l.s_num > r.s_num);
-}
-
-inline bool operator<(const Elector::Epoch& l, const Elector::Epoch& r) {
- if (l.s_num == r.s_num)
- return (l.p_id < r.p_id);
- else
- return (l.s_num < r.s_num);
-}
-
-inline bool operator==(const Elector::Epoch& l, const Elector::Epoch& r) {
- return ((l.s_num == r.s_num) && (l.p_id > r.p_id));
-}
-
-
-inline bool operator>(const Elector::State& l, const Elector::State& r)
-{
- if (l.epoch == r.epoch)
- return (l.freshness > r.freshness);
- else
- return l.epoch > r.epoch;
-}
-
-inline bool operator<(const Elector::State& l, const Elector::State& r)
-{
- if (l.epoch == r.epoch)
- return (l.freshness < r.freshness);
- else
- return l.epoch < r.epoch;
-}
-
-inline bool operator==(const Elector::State& l, const Elector::State& r)
-{
- return ( (l.epoch == r.epoch) && (l.freshness == r.freshness) );
-}
-
-
#endif
// start ticker
reset_tick();
-
+
+ // call election?
+ if (monmap->num_mon > 1) {
+ assert(monmap->num_mon != 2);
+ call_election();
+ } else {
+ // we're standalone.
+ set<int> q;
+ q.insert(whoami);
+ win_election(q);
+ }
+
lock.Unlock();
}
void Monitor::call_election()
{
+ if (monmap->num_mon == 1) return;
+
dout(10) << "call_election" << endl;
state = STATE_STARTING;
+ elector.start();
+
osdmon->election_starting();
//mdsmon->election_starting();
}
+void Monitor::win_election(set<int>& active)
+{
+ state = STATE_LEADER;
+ leader = whoami;
+ quorum = active;
+ dout(10) << "win_election, quorum is " << quorum << endl;
+
+ // init
+ osdmon->election_finished();
+ //mdsmon->election_finished();
+}
+void Monitor::lose_election(int l)
+{
+ state = STATE_PEON;
+ leader = l;
+ dout(10) << "lose_election, leader is mon" << leader << endl;
+}
// elector messages
+ case MSG_MON_ELECTION_PROPOSE:
case MSG_MON_ELECTION_ACK:
- case MSG_MON_ELECTION_STATUS:
- case MSG_MON_ELECTION_COLLECT:
- case MSG_MON_ELECTION_REFRESH:
+ case MSG_MON_ELECTION_VICTORY:
elector.dispatch(m);
break;
epoch_t mon_epoch; // monitor epoch (election instance)
set<int> quorum; // current active set of monitors (if !starting)
- void call_election();
+ //void call_election();
// monitor state
- const static int STATE_STARTING = 0;
- const static int STATE_LEADER = 1;
- const static int STATE_PEON = 2;
+ const static int STATE_STARTING = 0; // electing
+ const static int STATE_LEADER = 1;
+ const static int STATE_PEON = 2;
int state;
int leader; // current leader (to best of knowledge)
friend class MDSMonitor;
friend class ClientMonitor;
+ // initiate election
+ void call_election();
+
+ // end election (called by Elector)
+ void win_election(set<int>& q);
+ void lose_election(int l);
+
+
public:
Monitor(int w, Messenger *m, MonMap *mm) :
whoami(w),
leader(0),
osdmon(0), mdsmon(0), clientmon(0)
{
- // hack leader, until election works.
- if (whoami == 0)
- state = STATE_LEADER;
- else
- state = STATE_PEON;
}
+
void init();
void shutdown();
void dispatch(Message *m);
// set up pending_inc
pending_inc.epoch = osdmap.get_epoch()+1;
-
- } else {
- // FIXME. when elections work!
- if (mon->is_leader()) {
- create_initial();
- issue_leases();
- }
}
}
void OSDMonitor::election_finished()
{
- dout(10) << "election_starting" << endl;
+ dout(10) << "election_finished" << endl;
state = STATE_INIT;
+ // map?
+ if (osdmap.get_epoch() == 0 &&
+ mon->is_leader()) {
+ create_initial();
+ }
+
+
+
if (mon->is_leader()) {
// leader.
if (mon->monmap->num_mon == 1) {
}
else if (mon->is_peon()) {
// peon. send info
- messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
- mon->monmap->get_inst(mon->leader));
+ //messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
+ // mon->monmap->get_inst(mon->leader));
}
}
*/
#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionCollect.h"
-#include "messages/MMonElectionRefresh.h"
-#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionPropose.h"
+#include "messages/MMonElectionVictory.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
break;
*/
- case MSG_MON_ELECTION_ACK:
- m = new MMonElectionAck();
- break;
- case MSG_MON_ELECTION_COLLECT:
- m = new MMonElectionCollect();
+ case MSG_MON_ELECTION_PROPOSE:
+ m = new MMonElectionPropose;
break;
- case MSG_MON_ELECTION_REFRESH:
- m = new MMonElectionRefresh();
+ case MSG_MON_ELECTION_ACK:
+ m = new MMonElectionAck;
break;
- case MSG_MON_ELECTION_STATUS:
- m = new MMonElectionStatus();
+ case MSG_MON_ELECTION_VICTORY:
+ m = new MMonElectionVictory;
break;
case MSG_PING:
#define MSG_MON_ELECTION_ACK 15
-#define MSG_MON_ELECTION_COLLECT 16
-#define MSG_MON_ELECTION_REFRESH 17
-#define MSG_MON_ELECTION_STATUS 18
+#define MSG_MON_ELECTION_PROPOSE 16
+#define MSG_MON_ELECTION_VICTORY 17
#define MSG_MON_OSDMAP_INFO 20
#define MSG_MON_OSDMAP_LEASE 21
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include <signal.h>
#include "config.h"
* Accepter
*/
+void simplemessenger_sigint(int r)
+{
+ rank.sigint();
+}
+
+void Rank::sigint()
+{
+ lock.Lock();
+ derr(0) << "got control-c, exiting" << endl;
+ ::close(accepter.listen_sd);
+ exit(-1);
+ lock.Unlock();
+}
+
+
+
+
int Rank::Accepter::start()
{
// bind to a socket
derr(0) << "accepter.start unable to bind to " << rank.listen_addr << endl;
assert(rc >= 0);
+ // what port did we get?
socklen_t llen = sizeof(rank.listen_addr);
getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen);
- int myport = rank.listen_addr.sin_port;
+ int myport = ntohs(rank.listen_addr.sin_port);
+ dout(10) << "accepter.start bound to port " << myport << endl;
// listen!
rc = ::listen(listen_sd, 1000);
memcpy((char *) &rank.listen_addr.sin_addr.s_addr,
myhostname->h_addr_list[0],
myhostname->h_length);
- rank.listen_addr.sin_port = myport;
+ rank.listen_addr.sin_port = htons(myport);
rank.my_addr.set_addr(rank.listen_addr);
}
dout(10) << "accepter.start my addr is " << rank.my_addr << endl;
+ // set up signal handler
+ signal(SIGINT, simplemessenger_sigint);
+
// start thread
create();
tcpaddr_t tcpaddr;
peer_addr.make_addr(tcpaddr);
rc = ::connect(sd, (sockaddr*)&tcpaddr, sizeof(myAddr));
- if (rc < 0) return rc;
+ if (rc < 0) {
+ dout(10) << "connect error " << peer_addr
+ << ", " << errno << ": " << strerror(errno) << endl;
+ return rc;
+ }
- // identify peer
+ // identify peer ..... FIXME
entity_addr_t paddr;
rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
if (peer_addr != paddr) {
if (!server) {
int rc = connect();
if (rc < 0) {
- derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error connecting" << endl;
+ derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error connecting, "
+ << errno << ": " << strerror(errno)
+ << endl;
done = true;
list<Message*> out;
fail(out);
if (write_message(m) < 0) {
// failed!
- derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest() << endl;
+ derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest()
+ << ", " << errno << ": " << strerror(errno)
+ << endl;
out.push_front(m);
fail(out);
done = true;
/* Rank - per-process
*/
class Rank {
-
+public:
+ void sigint();
+
+private:
class EntityMessenger;
class Pipe;
}
int start();
} accepter;
+
+ void sigint(int r);
// pipe
<< (unsigned)addr[1] << "."
<< (unsigned)addr[2] << "."
<< (unsigned)addr[3] << ":"
- << (int)a.sin_port;
+ << ntohs(a.sin_port);
return out;
}
#include <sys/stat.h>
#include <fcntl.h>
+#ifdef DARWIN
+#include <sys/param.h>
+#include <sys/mount.h>
+#endif // DARWIN
+
int myrand()
{
//#include <sys/xattr.h>
//#include <sys/vfs.h>
+#ifdef DARWIN
+#include <sys/param.h>
+#include <sys/mount.h>
+#endif // DARWIN
+
#include "config.h"
#undef dout
#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << ".fakestore "
#include "include/Distribution.h"
#include <sys/stat.h>
+
+#ifdef DARWIN
+#include <sys/statvfs.h>
+#else
#include <sys/vfs.h> /* or <sys/statfs.h> */
+#endif /* DARWIN */
#include <list>
using namespace std;
static const int OP_RMATTR = 16; // oid, attrname
static const int OP_CLONE = 17; // oid, newoid
+ static const int OP_TRIMCACHE = 18; // oid, offset, len
+
static const int OP_MKCOLL = 20; // cid
static const int OP_RMCOLL = 21; // cid
static const int OP_COLL_ADD = 22; // cid, oid
lengths.push_back(len);
bls.push_back(bl);
}
+ void trim_from_cache(object_t oid, off_t off, size_t len) {
+ int op = OP_TRIMCACHE;
+ ops.push_back(op);
+ oids.push_back(oid);
+ offsets.push_back(off);
+ lengths.push_back(len);
+ }
void truncate(object_t oid, off_t off) {
int op = OP_TRUNCATE;
ops.push_back(op);
}
break;
+ case Transaction::OP_TRIMCACHE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ trim_from_cache(oid, offset, len);
+ }
+ break;
+
case Transaction::OP_TRUNCATE:
{
object_t oid = t.oids.front(); t.oids.pop_front();
off_t offset, size_t len,
bufferlist& bl,
Context *onsafe) = 0;//{ return -1; }
+ virtual void trim_from_cache(object_t oid,
+ off_t offset, size_t len) { }
virtual int setattr(object_t oid, const char *name,
const void *value, size_t size,