ifeq ($(target),darwin)
# For Darwin
CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE -DDARWIN -D__FreeBSD__=10 ${EXTRA_CFLAGS}
+LDINC = ar -rc
else
# For linux
CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
+LDINC = ld -i -o
endif
CC = g++
mon/OSDMonitor.o\
mon/MDSMonitor.o\
mon/ClientMonitor.o\
- mon/Elector.o
+ mon/Elector.o\
+ mon/MonitorStore.o
COMMON_OBJS= \
msg/Messenger.o\
# libceph
libceph.o: client/ldceph.o client/Client.o ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
mpicc -c $^ -o $@
rm -f *.o */*.o ${TARGETS} ${TEST_TARGETS}
common.o: ${COMMON_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
ebofs.o: ${EBOFS_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
client.o: ${CLIENT_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
osd.o: ${OSD_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
osdc.o: ${OSDC_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
osd_obfs.o: osd/OBFSStore.o osd/OSD.cc osd/PG.o osd/ObjectStore.o osd/FakeStore.o
${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.o
mds.o: ${MDS_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
mon.o: ${MON_OBJS}
- ar -rc $@ $^
+ ${LDINC} $@ $^
%.o: %.cc
${CC} ${CFLAGS} -c $< -o $@
#include <sys/stat.h>
#include <fcntl.h>
-#ifdef DARWIN
#include <sys/statvfs.h>
-#endif // DARWIN
#include <iostream>
return 0;
}
-#ifdef DARWIN
int Client::statfs(const char *path, struct statvfs *stbuf)
{
bzero (stbuf, sizeof (struct statvfs));
return 0;
}
-#else
-int Client::statfs(const char *path, struct statfs *stbuf)
-{
- assert(0); // implement me
- return 0;
-}
-#endif
int Client::lazyio_propogate(int fd, off_t offset, size_t count)
int unmount();
// these shoud (more or less) mirror the actual system calls.
-#ifdef DARWIN
int statfs(const char *path, struct statvfs *stbuf);
-#else
- int statfs(const char *path, struct statfs *stbuf);
-#endif
// crap
int chdir(const char *s);
#include <fcntl.h>
#include <dirent.h>
#include <errno.h>
-#ifdef DARWIN
#include <sys/statvfs.h>
-#else
-#include <sys/statfs.h>
-#endif // DARWIN
// ceph stuff
*/
-#ifdef DARWIN
static int ceph_statfs(const char *path, struct statvfs *stbuf)
{
return client->statfs(path, stbuf);
}
-#else
-static int ceph_statfs(const char *path, struct statfs *stbuf)
-{
- return client->statfs(path, stbuf);
-}
-#endif
}
*/
+void ObjectCache::touch_bottom(block_t bstart, block_t blast)
+{
+ for (map<block_t, BufferHead*>::iterator p = data.lower_bound(bstart);
+ p != data.end();
+ ++p) {
+ BufferHead *bh = p->second;
+
+ // don't trim unless it's entirely in our range
+ if (bh->start() < bstart) continue;
+ if (bh->end() > blast) break;
+
+ dout(12) << "moving " << *bh << " to bottom of lru" << endl;
+ bc->touch_bottom(bh); // move to bottom of lru list
+ }
+}
+
+
void ObjectCache::truncate(block_t blocks, version_t super_epoch)
{
dout(7) << "truncate " << object_id
utime_t dirty_stamp;
+ bool want_to_expire; // wants to be at bottom of lru
+
public:
BufferHead(ObjectCache *o) :
oc(o), //cancellable_ioh(0), tx_epoch(0),
rx_ioh(0), tx_ioh(0), tx_block(0), partial_tx_to(0), partial_tx_epoch(0),
shadow_of(0),
- ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0)
+ ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0),
+ want_to_expire(false)
{}
~BufferHead() {
unpin_shadows();
interval_set<block_t>& alloc,
map<block_t, BufferHead*>& hits,
version_t super_epoch); // can write to these.
+ void touch_bottom(block_t bstart, block_t blast);
BufferHead *split(BufferHead *bh, block_t off);
} else
lru_rest.lru_touch(bh);
}
+ void touch_bottom(BufferHead *bh) {
+ if (bh->is_dirty()) {
+ bh->want_to_expire = true;
+ lru_dirty.lru_bottouch(bh);
+ } else
+ lru_rest.lru_bottouch(bh);
+ }
void remove_bh(BufferHead *bh) {
bh->get_oc()->remove_bh(bh);
stat_sub(bh);
}
if (s != BufferHead::STATE_DIRTY && bh->get_state() == BufferHead::STATE_DIRTY) {
lru_dirty.lru_remove(bh);
- lru_rest.lru_insert_mid(bh);
+ if (bh->want_to_expire)
+ lru_rest.lru_insert_bot(bh);
+ else
+ lru_rest.lru_insert_mid(bh);
dirty_bh.erase(bh);
}
*/
}
+void Ebofs::trim_from_cache(object_t oid, off_t off, size_t len)
+{
+ ebofs_lock.Lock();
+ _trim_from_cache(oid, off, len);
+ ebofs_lock.Unlock();
+}
+
+void Ebofs::_trim_from_cache(object_t oid, off_t off, size_t len)
+{
+ Onode *on = 0;
+ if (onode_map.count(oid) == 0) {
+ dout(7) << "_trim_from_cache " << oid << " " << off << "~" << len << " ... onode not in cache " << endl;
+ return;
+ }
+
+ if (!on->have_oc())
+ return; // nothing is cached.
+
+ // map to blocks
+ block_t bstart = off / EBOFS_BLOCK_SIZE;
+ block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
+
+ ObjectCache *oc = on->get_oc(&bc);
+ oc->touch_bottom(bstart, blast);
+
+ return;
+}
+
+
int Ebofs::read(object_t oid,
off_t off, size_t len,
bufferlist& bl)
}
break;
+ case Transaction::OP_TRIMCACHE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ _trim_from_cache(oid, offset, len);
+ }
+ break;
+
case Transaction::OP_TRUNCATE:
{
object_t oid = t.oids.front(); t.oids.pop_front();
int is_cached(object_t oid, off_t off, size_t len);
int write(object_t oid, off_t off, size_t len, bufferlist& bl, Context *onsafe);
+ void trim_from_cache(object_t oid, off_t off, size_t len);
int truncate(object_t oid, off_t size, Context *onsafe=0);
int truncate_front(object_t oid, off_t size, Context *onsafe=0);
int remove(object_t oid, Context *onsafe=0);
bool _write_will_block();
int _write(object_t oid, off_t off, size_t len, bufferlist& bl);
+ void _trim_from_cache(object_t oid, off_t off, size_t len);
int _truncate(object_t oid, off_t size);
int _truncate_front(object_t oid, off_t size);
int _remove(object_t oid);
class MClientBoot : public Message {
public:
- MClientBoot() : Message(MSG_CLIENT_BOOT) {
- }
+ MClientBoot() : Message(MSG_CLIENT_BOOT) { }
- char *get_type_name() { return "Cboot"; }
+ char *get_type_name() { return "ClientBoot"; }
- virtual void decode_payload(crope& s, int& off) {
- }
- virtual void encode_payload(crope& s) {
- }
+ void encode_payload() { }
+ void decode_payload() { }
};
#endif
MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {}
virtual char *get_type_name() { return "election_ack"; }
+
+ void encode_payload() {}
+ void decode_payload() {}
};
#endif
MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {}
virtual char *get_type_name() { return "election_propose"; }
+
+ void encode_payload() {}
+ void decode_payload() {}
+
};
#endif
virtual char *get_type_name() { return "election_victory"; }
- /*
void encode_payload() {
- ::_encode(active_set, payload);
+ //::_encode(active_set, payload);
}
void decode_payload() {
- int off = 0;
- ::_decode(active_set, payload, off);
+ //int off = 0;
+ //::_decode(active_set, payload, off);
}
- */
};
#endif
}
// copy into inst
+ memset(&tcpaddr, 0, sizeof(addr));
+ tcpaddr.sin_family = AF_INET;
memcpy((char*)&tcpaddr.sin_addr.s_addr, (char*)addr, 4);
- tcpaddr.sin_port = port;
+ tcpaddr.sin_port = htons(port);
return true;
}
cerr << "mkmonmap: invalid ip:port '" << args[i] << "'" << endl;
return -1;
}
+
entity_inst_t inst;
inst.set_addr(addr);
cout << "mkmonmap: mon" << monmap.num_mon << " " << inst << endl;
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << "mon" << whoami << " "
-#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
void Elector::start()
electing_me = true;
// bcast to everyone else
- for (unsigned i=0; i<mon->monmap->num_mon; ++i) {
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
mon->messenger->send_message(new MMonElectionPropose,
MSG_ADDR_MON(i), mon->monmap->get_inst(i));
void Elector::defer(int who)
{
- dout(5) << "defer -- i'm deferring to " << who << endl;
+ dout(5) << "defer to " << who << endl;
if (electing_me) {
acked_me.clear();
// did i win?
if (electing_me &&
- acked_me.size() > mon->monmap->num_mon / 2) {
+ acked_me.size() > (unsigned)(mon->monmap->num_mon / 2)) {
// i win
victory();
} else {
void Elector::victory()
{
+ leader_acked = -1;
+ electing_me = false;
+
// tell everyone
- for (unsigned i=0; i<mon->monmap->num_mon; ++i) {
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
mon->messenger->send_message(new MMonElectionVictory,
MSG_ADDR_MON(i), mon->monmap->get_inst(i));
void Elector::handle_propose(MMonElectionPropose *m)
{
- dout(5) << "propose from " << m->get_source() << endl;
+ dout(5) << "handle_propose from " << m->get_source() << endl;
int from = m->get_source().num();
if (from > whoami) {
void Elector::handle_ack(MMonElectionAck *m)
{
- dout(5) << "ack from " << m->get_source() << endl;
+ dout(5) << "handle_ack from " << m->get_source() << endl;
int from = m->get_source().num();
if (electing_me) {
dout(5) << " so far i have " << acked_me << endl;
// is that _everyone_?
- if (acked_me.size() == mon->monmap->num_mon) {
+ if (acked_me.size() == (unsigned)mon->monmap->num_mon) {
// if yes, shortcut to election finish
victory();
}
void Elector::handle_victory(MMonElectionVictory *m)
{
- dout(5) << "victory from " << m->get_source() << endl;
+ dout(5) << "handle_victory from " << m->get_source() << endl;
int from = m->get_source().num();
if (from < whoami) {
reset_tick();
// call election?
- assert(monmap->num_mon != 2);
- if (monmap->num_mon >= 3)
+ if (monmap->num_mon > 1) {
+ assert(monmap->num_mon != 2);
call_election();
+ } else {
+ // we're standalone.
+ set<int> q;
+ q.insert(whoami);
+ win_election(q);
+ }
}
void Monitor::shutdown()
//mdsmon->election_starting();
}
+void Monitor::win_election(set<int>& active)
+{
+ state = STATE_LEADER;
+ leader = whoami;
+ quorum = active;
+ dout(10) << "win_election, quorum is " << quorum << endl;
+
+ // init
+ osdmon->election_finished();
+ //mdsmon->election_finished();
+}
+void Monitor::lose_election(int l)
+{
+ state = STATE_PEON;
+ leader = l;
+ dout(10) << "lose_election, leader is mon" << leader << endl;
+}
friend class MDSMonitor;
friend class ClientMonitor;
-
// initiate election
void call_election();
- // called by Elector when it's finished
- void win_election(set<int>& active) {
- leader = whoami;
- quorum = active;
- state = STATE_LEADER;
- }
- void lose_election(int l) {
- state = STATE_PEON;
- leader = l;
- }
+ // end election (called by Elector)
+ void win_election(set<int>& q);
+ void lose_election(int l);
+
public:
Monitor(int w, Messenger *m, MonMap *mm) :
leader(0),
osdmon(0), mdsmon(0), clientmon(0)
{
- // hack leader, until election works.
- if (whoami == 0)
- state = STATE_LEADER;
- else
- state = STATE_PEON;
}
// set up pending_inc
pending_inc.epoch = osdmap.get_epoch()+1;
-
- } else {
- // FIXME. when elections work!
- if (mon->is_leader()) {
- create_initial();
- issue_leases();
- }
}
}
void OSDMonitor::election_finished()
{
- dout(10) << "election_starting" << endl;
+ dout(10) << "election_finished" << endl;
state = STATE_INIT;
+ // map?
+ if (osdmap.get_epoch() == 0 &&
+ mon->is_leader()) {
+ create_initial();
+ }
+
+
+
if (mon->is_leader()) {
// leader.
if (mon->monmap->num_mon == 1) {
}
else if (mon->is_peon()) {
// peon. send info
- messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
- MSG_ADDR_MON(mon->leader), mon->monmap->get_inst(mon->leader));
+ //messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
+ //MSG_ADDR_MON(mon->leader), mon->monmap->get_inst(mon->leader));
}
}
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include <signal.h>
#include "config.h"
* Accepter
*/
+void simplemessenger_sigint(int r)
+{
+ rank.sigint();
+}
+
+void Rank::sigint()
+{
+ lock.Lock();
+ derr(0) << "got control-c, exiting" << endl;
+ ::close(accepter.listen_sd);
+ exit(-1);
+ lock.Unlock();
+}
+
+
+
+
int Rank::Accepter::start()
{
// bind to a socket
derr(0) << "accepter.start unable to bind to " << rank.listen_addr << endl;
assert(rc >= 0);
+ // what port did we get?
socklen_t llen = sizeof(rank.listen_addr);
getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen);
- int myport = rank.listen_addr.sin_port;
+ int myport = ntohs(rank.listen_addr.sin_port);
+ dout(10) << "accepter.start bound to port " << myport << endl;
// listen!
rc = ::listen(listen_sd, 1000);
assert(rc >= 0);
-
- //dout(10) << "accepter.start listening on " << myport << endl;
// my address is...
char host[100];
memcpy((char *) &my_addr.sin_addr.s_addr,
myhostname->h_addr_list[0],
myhostname->h_length);
- my_addr.sin_port = myport;
+ my_addr.sin_port = htons(myport);
rank.listen_addr = my_addr;
dout(10) << "accepter.start listen addr is " << rank.listen_addr << endl;
+ // set up signal handler
+ signal(SIGINT, simplemessenger_sigint);
+
// start thread
create();
assert(rc>=0);
// connect!
- rc = ::connect(sd, (sockaddr*)&peer_inst.addr, sizeof(myAddr));
- if (rc < 0) return rc;
+ rc = ::connect(sd, (struct sockaddr*)&peer_inst.addr, sizeof(myAddr));
+ if (rc < 0) {
+ dout(10) << "connect error " << peer_inst
+ << ", " << errno << ": " << strerror(errno) << endl;
+ return rc;
+ }
- // identify peer
+ // identify peer ..... FIXME
entity_inst_t inst;
rc = tcp_read(sd, (char*)&inst, sizeof(inst));
if (inst.rank < 0)
if (!server) {
int rc = connect();
if (rc < 0) {
- derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error connecting" << endl;
+ derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error connecting, "
+ << errno << ": " << strerror(errno)
+ << endl;
done = true;
list<Message*> out;
fail(out);
if (write_message(m) < 0) {
// failed!
- derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest() << endl;
+ derr(1) << "pipe(" << peer_inst << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest()
+ << ", " << errno << ": " << strerror(errno)
+ << endl;
out.push_front(m);
fail(out);
done = true;
/* Rank - per-process
*/
class Rank {
-
+public:
+ void sigint();
+
+private:
class EntityMessenger;
class Pipe;
}
int start();
} accepter;
+
+ void sigint(int r);
// pipe
<< (unsigned)addr[1] << "."
<< (unsigned)addr[2] << "."
<< (unsigned)addr[3] << ":"
- << (int)a.sin_port;
+ << ntohs(a.sin_port);
return out;
}
static const int OP_RMATTR = 16; // oid, attrname
static const int OP_CLONE = 17; // oid, newoid
+ static const int OP_TRIMCACHE = 18; // oid, offset, len
+
static const int OP_MKCOLL = 20; // cid
static const int OP_RMCOLL = 21; // cid
static const int OP_COLL_ADD = 22; // cid, oid
lengths.push_back(len);
bls.push_back(bl);
}
+ void trim_from_cache(object_t oid, off_t off, size_t len) {
+ int op = OP_TRIMCACHE;
+ ops.push_back(op);
+ oids.push_back(oid);
+ offsets.push_back(off);
+ lengths.push_back(len);
+ }
void truncate(object_t oid, off_t off) {
int op = OP_TRUNCATE;
ops.push_back(op);
}
break;
+ case Transaction::OP_TRIMCACHE:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ trim_from_cache(oid, offset, len);
+ }
+ break;
+
case Transaction::OP_TRUNCATE:
{
object_t oid = t.oids.front(); t.oids.pop_front();
off_t offset, size_t len,
bufferlist& bl,
Context *onsafe) = 0;//{ return -1; }
+ virtual void trim_from_cache(object_t oid,
+ off_t offset, size_t len) { }
virtual int setattr(object_t oid, const char *name,
const void *value, size_t size,