MON_OBJS= \
mon/Monitor.o\
+ mon/Paxos.o\
mon/OSDMonitor.o\
mon/MDSMonitor.o\
mon/ClientMonitor.o\
- mon/Elector.o
+ mon/Elector.o\
+ mon/MonitorStore.o
COMMON_OBJS= \
msg/Message.o\
OSBDB_OBJ = osbdb.o
endif
-TARGETS = cmon cosd cmds cfuse csyn newsyn fakesyn
+TARGETS = cmon cosd cmds cfuse csyn newsyn fakesyn mkmonmap
SRCS=*.cc */*.cc *.h */*.h */*/*.h
mkmonmap: mkmonmap.cc common.o
${CC} ${CFLAGS} ${LIBS} $^ -o $@
-cmon: cmon.cc mon.o ebofs.o msg/SimpleMessenger.o common.o
+cmon: cmon.cc mon.o msg/SimpleMessenger.o common.o
${CC} ${CFLAGS} ${LIBS} $^ -o $@
cosd: cosd.cc osd.o ebofs.o ${OSBDB_OBJ} msg/SimpleMessenger.o common.o
}
-void Client::ms_handle_failure(Message *m, entity_name_t dest, const entity_inst_t& inst)
+void Client::ms_handle_failure(Message *m, const entity_inst_t& inst)
{
+ entity_name_t dest = inst.name;
+
if (dest.is_mon()) {
// resend to a different monitor.
int mon = monmap->pick_mon(true);
int describe_layout(char *fn, list<ObjectExtent>& result);
- void ms_handle_failure(Message*, entity_name_t dest, const entity_inst_t& inst);
+ void ms_handle_failure(Message*, const entity_inst_t& inst);
};
#endif
fakestore_fsync: false,//true,
fakestore_writesync: false,
fakestore_syncthreads: 4,
- fakestore_fakeattr: true,
+ fakestore_fake_attrs: false,
+ fakestore_fake_collections: false,
fakestore_dev: 0,
// --- ebofs ---
g_conf.fakestore_writesync = atoi(args[++i]);
else if (strcmp(args[i], "--fakestore_dev") == 0)
g_conf.fakestore_dev = args[++i];
+ else if (strcmp(args[i], "--fakestore_fake_attrs") == 0)
+ g_conf.fakestore_fake_attrs = true;//atoi(args[++i]);
+ else if (strcmp(args[i], "--fakestore_fake_collections") == 0)
+ g_conf.fakestore_fake_collections = true;//atoi(args[++i]);
else if (strcmp(args[i], "--obfs") == 0) {
g_conf.uofs = 1;
bool fakestore_fsync;
bool fakestore_writesync;
int fakestore_syncthreads; // such crap
- bool fakestore_fakeattr;
+ bool fakestore_fake_attrs;
+ bool fakestore_fake_collections;
char *fakestore_dev;
// ebofs
object_t() : ino(0), bno(0), rev(0) {}
object_t(__uint64_t i, __uint32_t b) : ino(i), bno(b), rev(0) {}
+ object_t(__uint64_t i, __uint32_t b, __uint32_t r) : ino(i), bno(b), rev(r) {}
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MMONPAXOS_H
+#define __MMONPAXOS_H
+
+#include "msg/Message.h"
+
+class MMonPaxos : public Message {
+ public:
+ // op types
+ const static int OP_COLLECT = 1; // proposer: propose round
+ const static int OP_LAST = 2; // voter: accept proposed round
+ const static int OP_OLDROUND = 3; // voter: notify proposer he proposed an old round
+ const static int OP_BEGIN = 4; // proposer: value proposed for this round
+ const static int OP_ACCEPT = 5; // voter: accept propsed value
+ const static int OP_SUCCESS = 7; // proposer: notify learners of agreed value
+ const static int OP_ACK = 8; // learner: notify proposer that new value has been saved
+
+ int op;
+ int machine_id;
+ version_t proposal;
+ version_t n;
+ bufferlist value;
+
+ MMonPaxos() : Message(MSG_MON_PAXOS) {}
+ MMonPaxos(int o, int mid,
+ version_t pn, version_t v) : Message(MSG_MON_PAXOS),
+ op(o), machine_id(mid),
+ proposal(pn), n(v) {}
+ MMonPaxos(int o, int mid,
+ version_t pn, version_t v,
+ bufferlist& b) : Message(MSG_MON_PAXOS),
+ op(o), machine_id(mid),
+ proposal(pn), n(v),
+ value(b) {}
+
+ virtual char *get_type_name() { return "paxos"; }
+
+ void print(ostream& out) {
+ out << "paxos(op " << op
+ << ", machine " << machine_id
+ << ", proposal " << proposal
+ << ", state " << n
+ << ", " << value.length() << " bytes)";
+ }
+
+ void encode_payload() {
+ payload.append((char*)&op, sizeof(op));
+ payload.append((char*)&machine_id, sizeof(machine_id));
+ payload.append((char*)&proposal, sizeof(proposal));
+ payload.append((char*)&n, sizeof(n));
+ ::_encode(value, payload);
+ }
+ void decode_payload() {
+ int off = 0;
+ payload.copy(off, sizeof(op), (char*)&op);
+ off += sizeof(op);
+ payload.copy(off, sizeof(machine_id), (char*)&machine_id);
+ off += sizeof(machine_id);
+ payload.copy(off, sizeof(proposal), (char*)&proposal);
+ off += sizeof(proposal);
+ payload.copy(off, sizeof(n), (char*)&n);
+ off += sizeof(n);
+ ::_decode(value, payload, off);
+ }
+};
+
+#endif
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
#include "Elector.h"
#include "Monitor.h"
// bcast to everyone else
for (int i=0; i<mon->monmap->num_mon; ++i) {
- if (i == whoami) continue;
- mon->messenger->send_message(new MMonElectionPropose,
- mon->monmap->get_inst(i));
+ if (i == whoami) continue;
+ mon->messenger->send_message(new MMonElectionPropose,
+ mon->monmap->get_inst(i));
}
reset_timer();
dout(5) << "defer to " << who << endl;
if (electing_me) {
- acked_me.clear();
- electing_me = false;
+ acked_me.clear();
+ electing_me = false;
}
// ack them
leader_acked = who;
ack_stamp = g_clock.now();
mon->messenger->send_message(new MMonElectionAck,
- mon->monmap->get_inst(who));
+ mon->monmap->get_inst(who));
// set a timer
- reset_timer();
+ reset_timer(1.0); // give the leader some extra time to declare victory
}
public:
C_Mon_ElectionExpire(Elector *e) : elector(e) { }
void finish(int r) {
- elector->expire();
+ elector->expire();
}
};
-void Elector::reset_timer()
+void Elector::reset_timer(double plus)
{
// set the timer
cancel_timer();
expire_event = new C_Mon_ElectionExpire(this);
- g_timer.add_event_after(g_conf.mon_lease,
- expire_event);
+ g_timer.add_event_after(g_conf.mon_lease + plus,
+ expire_event);
}
void Elector::cancel_timer()
{
if (expire_event)
- g_timer.cancel_event(expire_event);
+ g_timer.cancel_event(expire_event);
}
void Elector::expire()
// did i win?
if (electing_me &&
- acked_me.size() > (unsigned)(mon->monmap->num_mon / 2)) {
- // i win
- victory();
+ acked_me.size() > (unsigned)(mon->monmap->num_mon / 2)) {
+ // i win
+ victory();
} else {
- // whoever i deferred to didn't declare victory quickly enough.
- start();
+ // whoever i deferred to didn't declare victory quickly enough.
+ start();
}
}
leader_acked = -1;
electing_me = false;
+ cancel_timer();
+
// tell everyone
for (int i=0; i<mon->monmap->num_mon; ++i) {
- if (i == whoami) continue;
- mon->messenger->send_message(new MMonElectionVictory,
- mon->monmap->get_inst(i));
+ if (i == whoami) continue;
+ mon->messenger->send_message(new MMonElectionVictory,
+ mon->monmap->get_inst(i));
}
// tell monitor
int from = m->get_source().num();
if (from > whoami) {
- // wait, i should win!
- if (!electing_me)
- start();
+ // wait, i should win!
+ if (!electing_me)
+ start();
} else {
- // they would win over me
- if (leader_acked < 0 || // haven't acked anyone yet, or
- leader_acked > from) { // they would win over who you did ack
- defer(from);
- } else {
- // ignore them!
- dout(5) << "no, we already acked " << leader_acked << endl;
- }
+ // they would win over me
+ if (leader_acked < 0 || // haven't acked anyone yet, or
+ leader_acked > from || // they would win over who you did ack, or
+ leader_acked == from) { // this is the guy we're already deferring to
+ defer(from);
+ } else {
+ // ignore them!
+ dout(5) << "no, we already acked " << leader_acked << endl;
+ }
}
delete m;
int from = m->get_source().num();
if (electing_me) {
- // thanks
- acked_me.insert(from);
- dout(5) << " so far i have " << acked_me << endl;
-
- // is that _everyone_?
- if (acked_me.size() == (unsigned)mon->monmap->num_mon) {
- // if yes, shortcut to election finish
- victory();
- }
+ // thanks
+ acked_me.insert(from);
+ dout(5) << " so far i have " << acked_me << endl;
+
+ // is that _everyone_?
+ if (acked_me.size() == (unsigned)mon->monmap->num_mon) {
+ // if yes, shortcut to election finish
+ victory();
+ }
} else {
- // ignore, i'm deferring already.
+ // ignore, i'm deferring already.
}
delete m;
int from = m->get_source().num();
if (from < whoami) {
- // ok, fine, they win
- mon->lose_election(from);
-
- // cancel my timer
- cancel_timer();
+ // ok, fine, they win
+ mon->lose_election(from);
+
+ // cancel my timer
+ cancel_timer();
} else {
- // no, that makes no sense, i should win. start over!
- start();
+ // no, that makes no sense, i should win. start over!
+ start();
}
}
{
switch (m->get_type()) {
case MSG_MON_ELECTION_ACK:
- handle_ack((MMonElectionAck*)m);
- break;
+ handle_ack((MMonElectionAck*)m);
+ break;
case MSG_MON_ELECTION_PROPOSE:
- handle_propose((MMonElectionPropose*)m);
- break;
+ handle_propose((MMonElectionPropose*)m);
+ break;
case MSG_MON_ELECTION_VICTORY:
- handle_victory((MMonElectionVictory*)m);
- break;
-
+ handle_victory((MMonElectionVictory*)m);
+ break;
+
default:
- assert(0);
+ assert(0);
}
}
Context *expire_event;
- void reset_timer();
+ void reset_timer(double plus=0.0);
void cancel_timer();
// electing me
#include "MDSMonitor.h"
#include "Monitor.h"
+#include "MonitorStore.h"
#include "messages/MMDSMap.h"
#include "messages/MMDSGetMap.h"
/********* MDS map **************/
-void MDSMonitor::create_initial()
-{
- mdsmap.epoch = 0; // until everyone boots
- mdsmap.ctime = g_clock.now();
-
- print_map();
-}
-
void MDSMonitor::dispatch(Message *m)
{
switch (m->get_type()) {
}
}
+
+
+void MDSMonitor::election_finished()
+{
+ if (mon->is_leader()) {
+
+ // FIXME be smarter later.
+
+ if (g_conf.mkfs) {
+ create_initial();
+ save_map();
+ } else {
+ load_map();
+ }
+ }
+}
+
+
+void MDSMonitor::create_initial()
+{
+ mdsmap.epoch = 0; // until everyone boots
+ mdsmap.ctime = g_clock.now();
+
+ mdsmap.encode(encoded_map);
+
+ print_map();
+}
+
+void MDSMonitor::load_map()
+{
+ int r = mon->store->get_bl_ss(encoded_map, "mdsmap", "current");
+ assert(r > 0);
+ mdsmap.decode(encoded_map);
+ dout(7) << "load_map epoch " << mdsmap.get_epoch() << endl;
+}
+
+void MDSMonitor::save_map()
+{
+ dout(7) << "save_map epoch " << mdsmap.get_epoch() << endl;
+
+ int r = mon->store->put_bl_ss(encoded_map, "mdsmap", "current");
+ assert(r>=0);
+}
+
void MDSMonitor::print_map()
{
dout(7) << "print_map epoch " << mdsmap.get_epoch() << endl;
}
+void MDSMonitor::issue_map()
+{
+ mdsmap.inc_epoch();
+ encoded_map.clear();
+ mdsmap.encode(encoded_map);
+
+ dout(7) << "issue_map epoch " << mdsmap.get_epoch() << endl;
+
+ save_map();
+ print_map();
+
+ // bcast map
+ bcast_latest_mds();
+ send_current();
+}
+
void MDSMonitor::handle_mds_beacon(MMDSBeacon *m)
{
else
mdsmap.mds_state_seq.erase(from);
- // inc map version
- mdsmap.inc_epoch();
- mdsmap.encode(maps[mdsmap.get_epoch()]);
-
- print_map();
-
- // bcast map
- bcast_latest_mds();
- send_current();
+ issue_map();
}
delete m;
}
if (changed) {
- mdsmap.inc_epoch();
- mdsmap.encode(maps[mdsmap.get_epoch()]);
-
- print_map();
-
- // bcast map
- bcast_latest_mds();
- send_current();
+ issue_map();
}
}
}
MDSMap mdsmap;
private:
- map<epoch_t, bufferlist> maps;
+ bufferlist encoded_map;
//map<epoch_t, bufferlist> inc_maps;
//MDSMap::Incremental pending_inc;
void send_full(entity_inst_t dest);
void bcast_latest_mds();
+ void issue_map();
+
+ void save_map();
+ void load_map();
void print_map();
//void accept_pending(); // accept pending, new map.
public:
MDSMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l) {
- create_initial();
}
void dispatch(Message *m);
void tick(); // check state, take actions
+ void election_starting();
+ void election_finished();
+
void send_latest(entity_inst_t dest);
};
#include "osd/OSDMap.h"
-#include "ebofs/Ebofs.h"
+#include "MonitorStore.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
#include "messages/MPingAck.h"
#include "messages/MGenericMessage.h"
+#include "messages/MMonPaxos.h"
+
#include "common/Timer.h"
#include "common/Clock.h"
// store
char s[80];
- sprintf(s, "dev/mon%d", whoami);
- store = new Ebofs(s);
+ sprintf(s, "mondata/mon%d", whoami);
+ store = new MonitorStore(s);
if (g_conf.mkfs)
store->mkfs();
- int r = store->mount();
- assert(r >= 0);
+ else
+ store->mount();
// create
osdmon = new OSDMonitor(this, messenger, lock);
timer.cancel_all();
timer.join();
- // unmount my local storage
- if (store) {
- store->umount();
- delete store;
- }
-
// stop osds.
for (set<int>::iterator it = osdmon->osdmap.get_osds().begin();
it != osdmon->osdmap.get_osds().end();
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
osdmon->osdmap.get_inst(*it));
}
+ osdmon->mark_all_down();
// monitors too.
for (int i=0; i<monmap->num_mon; i++)
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
monmap->get_inst(i));
+ // unmount my local storage
+ if (store)
+ delete store;
+
// clean up
if (monmap) delete monmap;
if (osdmon) delete osdmon;
// init
osdmon->election_finished();
- //mdsmon->election_finished();
+ mdsmon->election_finished();
+
+ // init paxos
+ test_paxos.leader_start();
}
void Monitor::lose_election(int l)
break;
+ // paxos
+ case MSG_MON_PAXOS:
+ // send it to the right paxos instance
+ switch (((MMonPaxos*)m)->machine_id) {
+ case PAXOS_TEST:
+ test_paxos.dispatch(m);
+ break;
+ case PAXOS_OSDMAP:
+ //...
+
+ default:
+ assert(0);
+ }
+ break;
+
// elector messages
case MSG_MON_ELECTION_PROPOSE:
case MSG_MON_ELECTION_ACK:
#include "MonMap.h"
#include "Elector.h"
+#include "Paxos.h"
-class ObjectStore;
+
+class MonitorStore;
class OSDMonitor;
class MDSMonitor;
class ClientMonitor;
+#define PAXOS_TEST 0
+#define PAXOS_OSDMAP 1
+#define PAXOS_MDSMAP 2
+#define PAXOS_CLIENTMAP 3
+
class Monitor : public Dispatcher {
protected:
// me
friend class C_Mon_Tick;
// my local store
- ObjectStore *store;
+ //ObjectStore *store;
+ MonitorStore *store;
const static int INO_ELECTOR = 1;
const static int INO_MON_MAP = 2;
//void call_election();
+ // paxos
+ Paxos test_paxos;
+ friend class Paxos;
+
+
// monitor state
const static int STATE_STARTING = 0; // electing
const static int STATE_LEADER = 1;
void lose_election(int l);
+
public:
Monitor(int w, Messenger *m, MonMap *mm) :
whoami(w),
store(0),
elector(this, w),
mon_epoch(0),
+
+ test_paxos(this, w, PAXOS_TEST, "tester"), // tester state machine
+
state(STATE_STARTING),
leader(0),
osdmon(0), mdsmon(0), clientmon(0)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "MonitorStore.h"
+#include "common/Clock.h"
+
+#include "config.h"
+#undef dout
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " store(" << dir <<") "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " store(" << dir <<") "
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+
+void MonitorStore::mount()
+{
+ dout(1) << "mount" << endl;
+ // verify dir exists
+ DIR *d = ::opendir(dir.c_str());
+ if (!d) {
+ derr(1) << "basedir " << dir << " dne" << endl;
+ assert(0);
+ }
+ ::closedir(d);
+}
+
+
+void MonitorStore::mkfs()
+{
+ dout(1) << "mkfs" << endl;
+
+ char cmd[200];
+ sprintf(cmd, "test -d %s && /bin/rm -r %s ; mkdir -p %s", dir.c_str(), dir.c_str(), dir.c_str());
+ dout(1) << cmd << endl;
+ system(cmd);
+}
+
+
+version_t MonitorStore::get_int(const char *a, const char *b)
+{
+ char fn[200];
+ if (b)
+ sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+ else
+ sprintf(fn, "%s/%s", dir.c_str(), a);
+
+ FILE *f = ::fopen(fn, "r");
+ if (!f)
+ return 0;
+
+ char buf[20];
+ ::fgets(buf, 20, f);
+ ::fclose(f);
+
+ version_t val = atoi(buf);
+
+ if (b) {
+ dout(15) << "get_int " << a << "/" << b << " = " << val << endl;
+ } else {
+ dout(15) << "get_int " << a << " = " << val << endl;
+ }
+ return val;
+}
+
+
+void MonitorStore::put_int(version_t val, const char *a, const char *b)
+{
+ char fn[200];
+ sprintf(fn, "%s/%s", dir.c_str(), a);
+ if (b) {
+ ::mkdir(fn, 0755);
+ dout(15) << "set_int " << a << "/" << b << " = " << val << endl;
+ sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+ } else {
+ dout(15) << "set_int " << a << " = " << val << endl;
+ }
+
+ char vs[30];
+ sprintf(vs, "%lld\n", val);
+
+ char tfn[200];
+ sprintf(tfn, "%s.new", fn);
+
+ int fd = ::open(tfn, O_WRONLY|O_CREAT);
+ assert(fd > 0);
+ ::fchmod(fd, 0644);
+ ::write(fd, vs, strlen(vs));
+ ::close(fd);
+ ::rename(tfn, fn);
+}
+
+
+// ----------------------------------------
+// buffers
+
+bool MonitorStore::exists_bl_ss(const char *a, const char *b)
+{
+ char fn[200];
+ if (b) {
+ dout(15) << "exists_bl " << a << "/" << b << endl;
+ sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+ } else {
+ dout(15) << "exists_bl " << a << endl;
+ sprintf(fn, "%s/%s", dir.c_str(), a);
+ }
+
+ struct stat st;
+ int r = ::stat(fn, &st);
+ return r == 0;
+}
+
+
+int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b)
+{
+ char fn[200];
+ if (b) {
+ sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+ } else {
+ sprintf(fn, "%s/%s", dir.c_str(), a);
+ }
+
+ int fd = ::open(fn, O_RDONLY);
+ if (!fd) {
+ if (b) {
+ dout(15) << "get_bl " << a << "/" << b << " DNE" << endl;
+ } else {
+ dout(15) << "get_bl " << a << " DNE" << endl;
+ }
+ return 0;
+ }
+
+ // read size
+ __int32_t len = 0;
+ ::read(fd, &len, sizeof(len));
+
+ // read buffer
+ bl.clear();
+ bufferptr bp(len);
+ ::read(fd, bp.c_str(), len);
+ bl.append(bp);
+ ::close(fd);
+
+ if (b) {
+ dout(15) << "get_bl " << a << "/" << b << " = " << bl.length() << " bytes" << endl;
+ } else {
+ dout(15) << "get_bl " << a << " = " << bl.length() << " bytes" << endl;
+ }
+
+ return len;
+}
+
+int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b)
+{
+ char fn[200];
+ sprintf(fn, "%s/%s", dir.c_str(), a);
+ if (b) {
+ ::mkdir(fn, 0755);
+ dout(15) << "put_bl " << a << "/" << b << " = " << bl.length() << " bytes" << endl;
+ sprintf(fn, "%s/%s/%s", dir.c_str(), a, b);
+ } else {
+ dout(15) << "put_bl " << a << " = " << bl.length() << " bytes" << endl;
+ }
+
+ char tfn[200];
+ sprintf(tfn, "%s.new", fn);
+ int fd = ::open(tfn, O_WRONLY|O_CREAT);
+ assert(fd);
+
+ // write size
+ __int32_t len = bl.length();
+ ::write(fd, &len, sizeof(len));
+
+ // write data
+ for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+ it != bl.buffers().end();
+ it++)
+ ::write(fd, it->c_str(), it->length());
+
+ ::fchmod(fd, 0644);
+ ::fsync(fd);
+ ::close(fd);
+ ::rename(tfn, fn);
+
+ return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MON_MONITORSTORE_H
+#define __MON_MONITORSTORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+
+#include <string.h>
+
+class MonitorStore {
+ string dir;
+
+public:
+ MonitorStore(char *d) : dir(d) {
+ }
+ ~MonitorStore() {
+ }
+
+ void mkfs(); // wipe
+ void mount();
+
+ // ints (stored as ascii)
+ version_t get_int(const char *a, const char *b=0);
+ void put_int(version_t v, const char *a, const char *b=0);
+
+ // buffers
+ // ss and sn varieties.
+ bool exists_bl_ss(const char *a, const char *b=0);
+ int get_bl_ss(bufferlist& bl, const char *a, const char *b);
+ int put_bl_ss(bufferlist& bl, const char *a, const char *b);
+ bool exists_bl_sn(const char *a, version_t b) {
+ char bs[20];
+ sprintf(bs, "%llu", b);
+ return exists_bl_ss(a, bs);
+ }
+ int get_bl_sn(bufferlist& bl, const char *a, version_t b) {
+ char bs[20];
+ sprintf(bs, "%llu", b);
+ return get_bl_ss(bl, a, bs);
+ }
+ int put_bl_sn(bufferlist& bl, const char *a, version_t b) {
+ char bs[20];
+ sprintf(bs, "%llu", b);
+ return put_bl_ss(bl, a, bs);
+ }
+
+ /*
+ version_t get_incarnation() { return get_int("incarnation"); }
+ void set_incarnation(version_t i) { set_int(i, "incarnation"); }
+
+ version_t get_last_proposal() { return get_int("last_proposal"); }
+ void set_last_proposal(version_t i) { set_int(i, "last_proposal"); }
+ */
+};
+
+
+#endif
#include "Monitor.h"
#include "MDSMonitor.h"
-#include "osd/ObjectStore.h"
+#include "MonitorStore.h"
#include "messages/MOSDFailure.h"
#include "messages/MOSDMap.h"
+/*
void OSDMonitor::init()
{
// start with blank map
pending_inc.epoch = osdmap.get_epoch()+1;
}
}
+*/
bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl)
{
- object_t oid(Monitor::INO_OSD_MAP, epoch);
- if (!mon->store->exists(oid))
+ if (!mon->store->exists_bl_sn("osdmap", epoch))
return false;
- int r = mon->store->read(oid, 0, 0, bl);
+ int r = mon->store->get_bl_sn(bl, "osdmap", epoch);
assert(r > 0);
return true;
}
bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl)
{
- object_t oid(Monitor::INO_OSD_INC_MAP, epoch);
- if (!mon->store->exists(oid))
+ if (!mon->store->exists_bl_sn("osdincmap", epoch))
return false;
- int r = mon->store->read(oid, 0, 0, bl);
+ int r = mon->store->get_bl_sn(bl, "osdincmap", epoch);
assert(r > 0);
return true;
}
bufferlist bl;
osdmap.encode(bl);
- ObjectStore::Transaction t;
- t.write(object_t(Monitor::INO_OSD_MAP,0), 0, bl.length(), bl);
- t.write(object_t(Monitor::INO_OSD_MAP,osdmap.get_epoch()), 0, bl.length(), bl);
- mon->store->apply_transaction(t);
- mon->store->sync();
+ mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch());
+ mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
}
void OSDMonitor::save_inc_map(OSDMap::Incremental &inc)
bufferlist incbl;
inc.encode(incbl);
- ObjectStore::Transaction t;
- t.write(object_t(Monitor::INO_OSD_MAP,0), 0, bl.length(), bl);
- t.write(object_t(Monitor::INO_OSD_MAP,osdmap.get_epoch()), 0, bl.length(), bl); // not strictly needed??
- t.write(object_t(Monitor::INO_OSD_INC_MAP,osdmap.get_epoch()), 0, incbl.length(), incbl);
- mon->store->apply_transaction(t);
- mon->store->sync();
+ mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch());
+ mon->store->put_bl_sn(incbl, "osdincmap", osdmap.get_epoch());
+ mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
}
bcast_latest_mds();
}
+void OSDMonitor::mark_all_down()
+{
+ dout(7) << "mark_all_down" << endl;
+
+ for (set<int>::iterator it = osdmap.get_osds().begin();
+ it != osdmap.get_osds().end();
+ it++) {
+ if (osdmap.is_down(*it)) continue;
+ pending_inc.new_down[*it] = osdmap.get_inst(*it);
+ }
+ accept_pending();
+}
+
+
+
void OSDMonitor::handle_osd_boot(MOSDBoot *m)
{
{
dout(10) << "election_finished" << endl;
+ if (mon->is_leader()) {
+ if (g_conf.mkfs) {
+ create_initial();
+ save_map();
+ } else {
+ //
+ epoch_t epoch = mon->store->get_int("osd_epoch");
+ dout(10) << " last epoch was " << epoch << endl;
+ bufferlist bl, blinc;
+ int r = mon->store->get_bl_sn(bl, "osdmap", epoch);
+ assert(r>0);
+ osdmap.decode(bl);
+
+ // pending_inc
+ pending_inc.epoch = epoch+1;
+ }
+
+ }
+
+ /*
state = STATE_INIT;
// map?
//messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
// mon->monmap->get_inst(mon->leader));
}
-
+ */
}
int state;
utime_t lease_expire; // when lease expires
- void init();
+ //void init();
// maps
void accept_pending(); // accept pending, new map.
OSDMonitor(Monitor *mn, Messenger *m, Mutex& l) :
mon(mn), messenger(m), lock(l),
state(STATE_SYNC) {
- init();
+ //init();
}
void dispatch(Message *m);
void issue_leases();
+ void mark_all_down();
+
void fake_osd_failure(int osd, bool down);
void fake_osdmap_update();
void fake_reorg();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Paxos.h"
+#include "Monitor.h"
+#include "MonitorStore.h"
+
+#include "messages/MMonPaxos.h"
+
+#include "config.h"
+#undef dout
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
+
+
+// ---------------------------------
+// proposer
+void Paxos::propose(version_t v, bufferlist& value)
+{
+//todo high rf
+}
+
+void Paxos::handle_last(MMonPaxos *m)
+{
+//todo high rf
+ dout(10) << "handle_last " << *m << endl;
+ delete m;
+}
+
+void Paxos::handle_accept(MMonPaxos *m)
+{
+//todo high rf
+ dout(10) << "handle_accept " << *m << endl;
+ delete m;
+
+}
+
+void Paxos::handle_ack(MMonPaxos *m)
+{
+//todo high rf
+ dout(10) << "handle_ack " << *m << endl;
+ delete m;
+}
+
+void Paxos::handle_old_round(MMonPaxos *m)
+{
+//todo high rf
+ dout(10) << "handle_old_round " << *m << endl;
+ delete m;
+}
+
+
+/*
+ * return a globally unique, monotonically increasing proposal number
+ */
+version_t Paxos::get_new_proposal_number(version_t gt)
+{
+ // read last
+ version_t last = mon->store->get_int("last_paxos_proposal");
+ if (last < gt)
+ last = gt;
+
+ // update
+ last /= 100;
+ last++;
+
+ // make it unique among all monitors.
+ version_t pn = last*100 + (version_t)whoami;
+
+ // write
+ mon->store->put_int(pn, "last_paxos_proposal");
+
+ dout(10) << "get_new_proposal_number = " << pn << endl;
+ return pn;
+}
+
+
+// ---------------------------------
+// accepter
+void Paxos::handle_collect(MMonPaxos *m)
+{
+//todo high rf
+ // ...
+
+ delete m;
+}
+
+
+
+
+// ---------------------------------
+// learner
+void Paxos::handle_success(MMonPaxos *m)
+{
+ //todo high rf
+ delete m;
+}
+
+void Paxos::handle_begin(MMonPaxos *m)
+{
+ //todo high rf
+ delete m;
+}
+
+// ---------------------------------
+
+void Paxos::leader_start()
+{
+ dout(10) << "i am the leader" << endl;
+
+ // .. do something else too
+ version_t pn = get_new_proposal_number();
+ for (int i=0; i<mon->monmap->num_mon; ++i) {
+ if (i == whoami) continue;
+ // todo high rf I pass the pn twice... what is the last parameter for?
+ mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COLLECT, whoami, pn, pn),
+ mon->monmap->get_inst(i));
+ }
+}
+
+
+
+void Paxos::dispatch(Message *m)
+{
+ switch (m->get_type()) {
+
+ case MSG_MON_PAXOS:
+ {
+ MMonPaxos *pm = (MMonPaxos*)m;
+
+ // NOTE: these ops are defined in messages/MMonPaxos.h
+ switch (pm->op) {
+ // learner
+ case MMonPaxos::OP_COLLECT:
+ handle_collect(pm);
+ break;
+
+ case MMonPaxos::OP_LAST:
+ handle_last(pm);
+ break;
+
+ case MMonPaxos::OP_OLDROUND:
+ handle_old_round(pm);
+ break;
+
+ case MMonPaxos::OP_BEGIN:
+ handle_begin(pm);
+ break;
+
+ case MMonPaxos::OP_ACCEPT:
+ handle_accept(pm);
+ break;
+
+ case MMonPaxos::OP_SUCCESS:
+ handle_success(pm);
+ break;
+
+ case MMonPaxos::OP_ACK:
+ handle_ack(pm);
+ break;
+
+ default:
+ assert(0);
+ }
+ }
+ break;
+
+ default:
+ assert(0);
+ }
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MON_PAXOS_H
+#define __MON_PAXOS_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include "msg/Message.h"
+
+#include "include/Context.h"
+
+#include "common/Timer.h"
+
+class Monitor;
+class MMonPaxos;
+
+// i am one state machine.
+class Paxos {
+ Monitor *mon;
+ int whoami;
+
+ // my state machine info
+ int machine_id;
+ const char *machine_name;
+ map<version_t, bufferlist> accepted_values;
+ map<version_t, int> accepted_proposal_number;
+
+ // proposer
+ void propose(version_t v, bufferlist& value);
+
+ void handle_last(MMonPaxos*);
+ void handle_accept(MMonPaxos*);
+ void handle_ack(MMonPaxos*);
+ void handle_old_round(MMonPaxos*);
+
+ version_t get_new_proposal_number(version_t gt=0);
+
+ // accepter
+ void handle_collect(MMonPaxos*);
+
+ // learner
+ void handle_success(MMonPaxos*);
+ void handle_begin(MMonPaxos*);
+
+
+public:
+ Paxos(Monitor *m, int w,
+ int mid,const char *mnm) : mon(m), whoami(w),
+ machine_id(mid), machine_name(mnm) {
+ }
+
+ void dispatch(Message *m);
+
+ void leader_start();
+
+};
+
+
+
+#endif
+
virtual void dispatch(Message *m) = 0;
// how i deal with transmission failures.
- virtual void ms_handle_failure(Message *m, entity_name_t dest, const entity_addr_t& addr) { delete m; }
+ virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { delete m; }
};
#endif
{
// assign rank
_myinst.name = me;
- _myinst.addr.nonce = nranks++;
+ _myinst.addr.port = nranks++;
+ //if (!me.is_mon())
+ //_myinst.addr.nonce = getpid();
// add to directory
directory[ _myinst.addr ] = this;
++p) {
dout(1) << "** have " << p->first << " to " << p->second << endl;
}
- assert(dm);
+ //assert(dm);
}
dm->queue_incoming(m);
#include "messages/MNSFailure.h"
*/
+#include "messages/MMonPaxos.h"
+
#include "messages/MMonElectionAck.h"
#include "messages/MMonElectionPropose.h"
#include "messages/MMonElectionVictory.h"
break;
*/
+ case MSG_MON_PAXOS:
+ m = new MMonPaxos;
+ break;
+
case MSG_MON_ELECTION_PROPOSE:
m = new MMonElectionPropose;
break;
#define MSG_SHUTDOWN 99999
+
#define MSG_MON_ELECTION_ACK 15
#define MSG_MON_ELECTION_PROPOSE 16
#define MSG_MON_ELECTION_VICTORY 17
#define MSG_MON_OSDMAP_UPDATE_ACK 24
#define MSG_MON_OSDMAP_UPDATE_COMMIT 25
+#define MSG_MON_PAXOS 30
+
#define MSG_OSD_OP 40 // delete, etc.
#define MSG_OSD_OPREPLY 41 // delete, etc.
#define MSG_OSD_PING 42
// source/dest
entity_inst_t& get_dest_inst() { return env.dst; }
+ void set_dest_inst(entity_inst_t& inst) { env.dst = inst; }
+
entity_inst_t& get_source_inst() { return env.src; }
+ void set_source_inst(entity_inst_t& inst) { env.src = inst; }
entity_name_t& get_dest() { return env.dst.name; }
void set_dest(entity_name_t a, int p) { env.dst.name = a; env.dest_port = p; }
int get_dest_port() { return env.dest_port; }
+ void set_dest_port(int p) { env.dest_port = p; }
entity_name_t& get_source() { return env.src.name; }
void set_source(entity_name_t a, int p) { env.src.name = a; env.source_port = p; }
lock.Lock();
derr(0) << "got control-c, exiting" << endl;
::close(accepter.listen_sd);
- exit(-1);
+ _exit(-1);
lock.Unlock();
}
socklen_t llen = sizeof(rank.listen_addr);
getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen);
- int myport = ntohs(rank.listen_addr.sin_port);
- dout(10) << "accepter.start bound to port " << myport << endl;
+ dout(10) << "accepter.start bound to " << rank.listen_addr << endl;
// listen!
rc = ::listen(listen_sd, 1000);
assert(rc >= 0);
- //dout(10) << "accepter.start listening on " << myport << endl;
// my address is... HELP HELP HELP!
char host[100];
// figure out my_addr
if (g_my_addr.port > 0) {
- // user specified it, easy.
+ // user specified it, easy peasy.
rank.my_addr = g_my_addr;
} else {
- // try to figure out what ip i can be reached out
- memset(&rank.listen_addr, 0, sizeof(rank.listen_addr));
-
// look up my hostname. blech! this sucks.
rank.listen_addr.sin_family = myhostname->h_addrtype;
memcpy((char *) &rank.listen_addr.sin_addr.s_addr,
myhostname->h_addr_list[0],
myhostname->h_length);
- rank.listen_addr.sin_port = htons(myport);
+
+ // set up my_addr with a nonce
rank.my_addr.set_addr(rank.listen_addr);
+ rank.my_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
}
-
- // set a nonce
- rank.my_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
-
+
dout(10) << "accepter.start my addr is " << rank.my_addr << endl;
// set up signal handler
for (list<Message*>::iterator k = j->second.begin();
k != j->second.end();
++k) {
- derr(1) << "pipe(" << peer_addr << ' ' << this << ").fail on " << **k << " to " << j->first << " inst " << peer_addr << endl;
- i->first->ms_handle_failure(*k, j->first, peer_addr);
+ derr(1) << "pipe(" << peer_addr << ' ' << this << ").fail on " << **k << " to " << (*k)->get_dest_inst() << endl;
+ i->first->ms_handle_failure(*k, (*k)->get_dest_inst());
}
}
// set envelope
m->set_source(get_myname(), fromport);
m->set_source_addr(rank.my_addr);
- m->set_dest(dest.name, port);
+ m->set_dest_inst(dest);
+ m->set_dest_port(port);
dout(1) << m->get_source()
<< " --> " << dest.name << " " << dest.addr
void set_addr(tcpaddr_t a) {
memcpy((char*)ipq, (char*)&a.sin_addr.s_addr, 4);
- port = a.sin_port;
+ port = ntohs(a.sin_port);
}
void make_addr(tcpaddr_t& a) const {
+ memset(&a, 0, sizeof(a));
a.sin_family = AF_INET;
memcpy((char*)&a.sin_addr.s_addr, (char*)ipq, 4);
- a.sin_port = port;
+ a.sin_port = htons(port);
}
};
#include <cassert>
#include <errno.h>
#include <dirent.h>
-//#include <sys/xattr.h>
+#include <sys/xattr.h>
//#include <sys/vfs.h>
#ifdef DARWIN
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << ".fakestore "
+#define dout(l) if (l<=g_conf.debug) cout << g_clock.now() << " osd" << whoami << ".fakestore "
+#define derr(l) if (l<=g_conf.debug) cerr << g_clock.now() << " osd" << whoami << ".fakestore "
#include "include/buffer.h"
+int FakeStore::statfs(struct statfs *buf)
+{
+ return ::statfs(basedir.c_str(), buf);
+}
+/*
+ * sorry, these are sentitive to the object_t and coll_t typing.
+ */
+void FakeStore::get_oname(object_t oid, char *s)
+{
+ static hash<object_t> H;
+ assert(sizeof(oid) == 16);
+ sprintf(s, "%s/objects/%02x/%016llx.%016llx", basedir.c_str(), H(oid) & HASH_MASK,
+ *((__uint64_t*)&oid),
+ *(((__uint64_t*)&oid) + 1));
+}
-int FakeStore::mount()
+void FakeStore::get_cdir(coll_t cid, char *s)
+{
+ assert(sizeof(cid) == 8);
+ sprintf(s, "%s/collections/%016llx", basedir.c_str(),
+ cid);
+}
+
+void FakeStore::get_coname(coll_t cid, object_t oid, char *s)
+{
+ assert(sizeof(oid) == 16);
+ sprintf(s, "%s/collections/%016llx/%016llx.%016llx", basedir.c_str(), cid,
+ *((__uint64_t*)&oid),
+ *(((__uint64_t*)&oid) + 1));
+}
+
+
+
+
+int FakeStore::mkfs()
{
+ char cmd[200];
if (g_conf.fakestore_dev) {
dout(0) << "mounting" << endl;
- char cmd[100];
sprintf(cmd,"mount %s", g_conf.fakestore_dev);
system(cmd);
}
- string mydir;
- get_dir(mydir);
-
- dout(5) << "init with basedir " << mydir << endl;
-
- // make sure global base dir exists
- struct stat st;
- int r = ::stat(basedir.c_str(), &st);
- if (r != 0) {
- dout(1) << "unable to stat basedir " << basedir << ", r = " << r << endl;
- return r;
- }
+ dout(1) << "mkfs in " << basedir << endl;
- // all okay.
- return 0;
-}
+ // wipe
+ sprintf(cmd, "test -d %s && rm -r %s ; mkdir -p %s/collections && mkdir -p %s/objects",
+ basedir.c_str(), basedir.c_str(), basedir.c_str(), basedir.c_str());
+
+ dout(5) << "wipe: " << cmd << endl;
+ system(cmd);
-int FakeStore::umount()
-{
- dout(5) << "finalize" << endl;
+ // hashed bits too
+ for (int i=0; i<HASH_DIRS; i++) {
+ char s[4];
+ sprintf(s, "%02x", i);
+ string subdir = basedir + "/objects/" + s;
+ dout(15) << " creating " << subdir << endl;
+ int r = ::mkdir(subdir.c_str(), 0755);
+ if (r != 0) {
+ derr(0) << "couldnt create subdir, r = " << r << endl;
+ return r;
+ }
+ }
+
if (g_conf.fakestore_dev) {
char cmd[100];
dout(0) << "umounting" << endl;
sprintf(cmd,"umount %s", g_conf.fakestore_dev);
- system(cmd);
+ //system(cmd);
}
- // nothing
- return 0;
-}
-
-
-int FakeStore::statfs(struct statfs *buf)
-{
- string mydir;
- get_dir(mydir);
- return ::statfs(mydir.c_str(), buf);
-}
-
-
-
+ dout(1) << "mkfs done in " << basedir << endl;
-void FakeStore::get_dir(string& dir) {
- char s[30];
- sprintf(s, "%d", whoami);
- dir = basedir + "/" + s;
-}
-void FakeStore::get_oname(object_t oid, string& fn) {
- char s[100];
- static hash<object_t> H;
- sprintf(s, "%d/%02x/%016llx.%08x.%d", whoami, H(oid) & HASH_MASK, oid.ino, oid.bno, oid.rev);
- fn = basedir + "/" + s;
- // dout(1) << "oname is " << fn << endl;
-}
-
-
-
-void FakeStore::wipe_dir(string mydir)
-{
- DIR *dir = ::opendir(mydir.c_str());
- if (dir) {
- dout(10) << "wiping " << mydir << endl;
- struct dirent *ent = 0;
-
- while ((ent = ::readdir(dir)) != 0) {
- if (ent->d_name[0] == '.') continue;
- dout(25) << "mkfs unlinking " << ent->d_name << endl;
- string fn = mydir + "/" + ent->d_name;
- ::unlink(fn.c_str());
- }
-
- ::closedir(dir);
- } else {
- dout(1) << "mkfs couldn't read dir " << mydir << endl;
- }
+ return 0;
}
-int FakeStore::mkfs()
+int FakeStore::mount()
{
if (g_conf.fakestore_dev) {
dout(0) << "mounting" << endl;
char cmd[100];
sprintf(cmd,"mount %s", g_conf.fakestore_dev);
- system(cmd);
+ //system(cmd);
}
-
- int r = 0;
+ dout(5) << "basedir " << basedir << endl;
+
+ // make sure global base dir exists
struct stat st;
- string mydir;
- get_dir(mydir);
-
- dout(1) << "mkfs in " << mydir << endl;
-
-
- // make sure my dir exists
- r = ::stat(mydir.c_str(), &st);
+ int r = ::stat(basedir.c_str(), &st);
if (r != 0) {
- dout(10) << "creating " << mydir << endl;
- mkdir(mydir.c_str(), 0755);
- r = ::stat(mydir.c_str(), &st);
- if (r != 0) {
- dout(1) << "couldnt create dir, r = " << r << endl;
- return r;
- }
+ derr(0) << "unable to stat basedir " << basedir << ", r = " << r << endl;
+ return r;
+ }
+
+ if (g_conf.fakestore_fake_collections) {
+ dout(0) << "faking collections (in memory)" << endl;
+ fake_collections = true;
}
- else wipe_dir(mydir);
- // hashed bits too
- for (int i=0; i<HASH_DIRS; i++) {
- char s[4];
- sprintf(s, "%02x", i);
- string subdir = mydir + "/" + s;
- r = ::stat(subdir.c_str(), &st);
- if (r != 0) {
- dout(2) << " creating " << subdir << endl;
- ::mkdir(subdir.c_str(), 0755);
- r = ::stat(subdir.c_str(), &st);
- if (r != 0) {
- dout(1) << "couldnt create subdir, r = " << r << endl;
- return r;
- }
+ // fake attrs?
+ // let's test to see if they work.
+ if (g_conf.fakestore_fake_attrs) {
+ dout(0) << "faking attrs (in memory)" << endl;
+ fake_attrs = true;
+ } else {
+ char names[1000];
+ r = ::listxattr(basedir.c_str(), names, 1000);
+ if (r < 0) {
+ derr(0) << "xattrs don't appear to work (" << strerror(errno) << "), specify --fakestore_fake_attrs to fake them (in memory)." << endl;
+ assert(0);
}
- else
- wipe_dir( subdir );
}
+
+ // all okay.
+ return 0;
+}
+
+int FakeStore::umount()
+{
+ dout(5) << "umount " << basedir << endl;
+ sync();
+
if (g_conf.fakestore_dev) {
char cmd[100];
dout(0) << "umounting" << endl;
sprintf(cmd,"umount %s", g_conf.fakestore_dev);
- system(cmd);
+ //system(cmd);
}
- dout(1) << "mkfs done in " << mydir << endl;
-
- return r;
+ // nothing
+ return 0;
}
+// --------------------
+// objects
+
bool FakeStore::exists(object_t oid)
{
struct stat *st)
{
dout(20) << "stat " << oid << endl;
- string fn;
+ char fn[200];
get_oname(oid,fn);
- int r = ::stat(fn.c_str(), st);
+ int r = ::stat(fn, st);
return r;
}
int FakeStore::remove(object_t oid, Context *onsafe)
{
dout(20) << "remove " << oid << endl;
- string fn;
+ char fn[200];
get_oname(oid,fn);
- int r = ::unlink(fn.c_str());
+ int r = ::unlink(fn);
if (onsafe) sync(onsafe);
return r;
}
{
dout(20) << "truncate " << oid << " size " << size << endl;
- string fn;
+ char fn[200];
get_oname(oid,fn);
- int r = ::truncate(fn.c_str(), size);
+ int r = ::truncate(fn, size);
if (onsafe) sync(onsafe);
return r;
}
bufferlist& bl) {
dout(20) << "read " << oid << " len " << len << " off " << offset << endl;
- string fn;
+ char fn[200];
get_oname(oid,fn);
- int fd = ::open(fn.c_str(), O_RDONLY);
+ int fd = ::open(fn, O_RDONLY);
if (fd < 0) {
- dout(10) << "read couldn't open " << fn.c_str() << " errno " << errno << " " << strerror(errno) << endl;
+ dout(10) << "read couldn't open " << fn << " errno " << errno << " " << strerror(errno) << endl;
return fd;
}
::flock(fd, LOCK_EX); // lock for safety
if (len == 0) {
struct stat st;
- fstat(fd, &st);
+ ::fstat(fd, &st);
len = st.st_size;
}
{
dout(20) << "write " << oid << " len " << len << " off " << offset << endl;
- string fn;
+ char fn[200];
get_oname(oid,fn);
- ::mknod(fn.c_str(), 0644, 0); // in case it doesn't exist yet.
+ ::mknod(fn, 0644, 0); // in case it doesn't exist yet.
int flags = O_WRONLY;//|O_CREAT;
- int fd = ::open(fn.c_str(), flags);
+ int fd = ::open(fn, flags);
if (fd < 0) {
- dout(1) << "write couldn't open " << fn.c_str() << " flags " << flags << " errno " << errno << " " << strerror(errno) << endl;
+ derr(0) << "write couldn't open " << fn << " flags " << flags << " errno " << errno << " " << strerror(errno) << endl;
return fd;
}
+ ::fchmod(fd, 0664);
::flock(fd, LOCK_EX); // lock for safety
- //::fchmod(fd, 0664);
// seek
- off_t actual = lseek(fd, offset, SEEK_SET);
+ off_t actual = ::lseek(fd, offset, SEEK_SET);
int did = 0;
assert(actual == offset);
if (r > 0)
did += r;
else {
- dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+ derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
}
}
if (did < 0) {
- dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+ derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
}
::flock(fd, LOCK_UN);
class C_FakeSync : public Context {
-public:
Context *c;
int *n;
- C_FakeSync(Context *c_, int *n_) : c(c_), n(n_) {
+ Mutex *lock;
+ Cond *cond;
+
+public:
+ C_FakeSync(Context *c_, int *n_, Mutex *lo, Cond *co) :
+ c(c_), n(n_),
+ lock(lo), cond(co) {
+ lock->Lock();
++*n;
+ lock->Unlock();
}
void finish(int r) {
c->finish(r);
+
+ lock->Lock();
--(*n);
- //cout << "sync, " << *n << " still unsync" << endl;
+ if (*n == 0) cond->Signal();
+ lock->Unlock();
}
};
+void FakeStore::sync()
+{
+ synclock.Lock();
+ while (unsync > 0) {
+ dout(0) << "sync waiting for " << unsync << " items to (fake) sync" << endl;
+ synccond.Wait(synclock);
+ }
+ synclock.Unlock();
+}
+
void FakeStore::sync(Context *onsafe)
{
if (g_conf.fakestore_fake_sync) {
g_timer.add_event_after((float)g_conf.fakestore_fake_sync,
- new C_FakeSync(onsafe, &unsync));
+ new C_FakeSync(onsafe, &unsync, &synclock, &synccond));
} else {
assert(0); // der..no implemented anymore
}
+// -------------------------------
+// attributes
+
+// objects
+
+int FakeStore::setattr(object_t oid, const char *name,
+ const void *value, size_t size,
+ Context *onsafe)
+{
+ if (fake_attrs) return attrs.setattr(oid, name, value, size, onsafe);
+
+ char fn[100];
+ get_oname(oid, fn);
+ int r = ::setxattr(fn, name, value, size, 0);
+ return r;
+}
+
+int FakeStore::setattrs(object_t oid, map<string,bufferptr>& aset)
+{
+ if (fake_attrs) return attrs.setattrs(oid, aset);
+
+ char fn[100];
+ get_oname(oid, fn);
+ int r = 0;
+ for (map<string,bufferptr>::iterator p = aset.begin();
+ p != aset.end();
+ ++p) {
+ r = ::setxattr(fn, p->first.c_str(), p->second.c_str(), p->second.length(), 0);
+ if (r < 0) break;
+ }
+ return r;
+}
+
+int FakeStore::getattr(object_t oid, const char *name,
+ void *value, size_t size)
+{
+ if (fake_attrs) return attrs.getattr(oid, name, value, size);
+ char fn[100];
+ get_oname(oid, fn);
+ int r = ::getxattr(fn, name, value, size);
+ return r;
+}
+
+int FakeStore::getattrs(object_t oid, map<string,bufferptr>& aset)
+{
+ if (fake_attrs) return attrs.getattrs(oid, aset);
+
+ char fn[100];
+ get_oname(oid, fn);
+
+ char val[1000];
+ char names[1000];
+ int num = ::listxattr(fn, names, 1000);
+
+ char *name = names;
+ for (int i=0; i<num; i++) {
+ dout(0) << "getattrs " << oid << " getting " << (i+1) << "/" << num << " '" << names << "'" << endl;
+ int l = ::getxattr(fn, name, val, 1000);
+ dout(0) << "getattrs " << oid << " getting " << (i+1) << "/" << num << " '" << names << "' = " << l << " bytes" << endl;
+ aset[names].append(val, l);
+ name += strlen(name) + 1;
+ }
+
+ return 0;
+}
+
+int FakeStore::rmattr(object_t oid, const char *name, Context *onsafe)
+{
+ if (fake_attrs) return attrs.rmattr(oid, name, onsafe);
+ char fn[100];
+ get_oname(oid, fn);
+ int r = ::removexattr(fn, name);
+ return r;
+}
+
+/*
+int FakeStore::listattr(object_t oid, char *attrls, size_t size)
+{
+ if (fake_attrs) return attrs.listattr(oid, attrls, size);
+ char fn[100];
+ get_oname(oid, fn);
+ return ::listxattr(fn, attrls, size);
+}
+*/
+
+
+// collections
+
+int FakeStore::collection_setattr(coll_t c, const char *name,
+ void *value, size_t size,
+ Context *onsafe)
+{
+ if (fake_attrs) return attrs.collection_setattr(c, name, value, size, onsafe);
+ return 0;
+}
+
+int FakeStore::collection_rmattr(coll_t c, const char *name,
+ Context *onsafe)
+{
+ if (fake_attrs) return attrs.collection_rmattr(c, name, onsafe);
+ return 0;
+}
+
+int FakeStore::collection_getattr(coll_t c, const char *name,
+ void *value, size_t size)
+{
+ if (fake_attrs) return attrs.collection_getattr(c, name, value, size);
+ return 0;
+}
+
+/*
+int FakeStore::collection_listattr(coll_t c, char *attrs, size_t size)
+{
+ if (fake_attrs) return collection_listattr(c, attrs, size);
+ return 0;
+}
+*/
+
+// --------------------------
+// collections
+
+int FakeStore::list_collections(list<coll_t>& ls)
+{
+ if (fake_collections) return collections.list_collections(ls);
+
+ char fn[200];
+ sprintf(fn, "%s/collections", basedir.c_str());
+
+ DIR *dir = ::opendir(fn);
+ assert(dir);
+
+ struct dirent *de;
+ while ((de = ::readdir(dir)) != 0) {
+ // parse
+ coll_t c = strtoll(de->d_name, 0, 16);
+ dout(0) << " got " << c << " errno " << errno << " on " << de->d_name << endl;
+ if (errno) continue;
+ ls.push_back(c);
+ }
+
+ ::closedir(dir);
+ return 0;
+}
+
+int FakeStore::create_collection(coll_t c,
+ Context *onsafe)
+{
+ if (fake_collections) return collections.create_collection(c, onsafe);
+
+ char fn[200];
+ get_cdir(c, fn);
+
+ int r = ::mkdir(fn, 0755);
+
+ if (onsafe) sync(onsafe);
+ return r;
+}
+
+int FakeStore::destroy_collection(coll_t c,
+ Context *onsafe)
+{
+ if (fake_collections) return collections.destroy_collection(c, onsafe);
+
+ char fn[200];
+ get_cdir(c, fn);
+ char cmd[200];
+ sprintf(cmd, "test -d %s && rm -r %s", fn, fn);
+ system(cmd);
+
+ if (onsafe) sync(onsafe);
+ return 0;
+}
+
+int FakeStore::collection_stat(coll_t c, struct stat *st)
+{
+ if (fake_collections) return collections.collection_stat(c, st);
+
+ char fn[200];
+ get_cdir(c, fn);
+ return ::lstat(fn, st);
+}
+
+bool FakeStore::collection_exists(coll_t c)
+{
+ if (fake_collections) return collections.collection_exists(c);
+
+ struct stat st;
+ return collection_stat(c, &st) == 0;
+}
+
+
+int FakeStore::collection_add(coll_t c, object_t o,
+ Context *onsafe)
+{
+ if (fake_collections) return collections.collection_add(c, o, onsafe);
+
+ char cof[200];
+ get_coname(c, o, cof);
+ char of[200];
+ get_oname(o, of);
+
+ int r = ::link(of, cof);
+ if (onsafe) sync(onsafe);
+ return r;
+}
+
+int FakeStore::collection_remove(coll_t c, object_t o,
+ Context *onsafe)
+{
+ if (fake_collections) return collections.collection_remove(c, o, onsafe);
+
+ char cof[200];
+ get_coname(c, o, cof);
+
+ int r = ::unlink(cof);
+ if (onsafe) sync(onsafe);
+ return r;
+}
+
+int FakeStore::collection_list(coll_t c, list<object_t>& ls)
+{
+ if (fake_collections) return collections.collection_list(c, ls);
+
+ char fn[200];
+ get_cdir(c, fn);
+
+ DIR *dir = ::opendir(fn);
+ assert(dir);
+
+ struct dirent *de;
+ while ((de = ::readdir(dir)) != 0) {
+ // parse
+ object_t o;
+ assert(sizeof(o) == 16);
+ *(((__uint64_t*)&o) + 0) = strtoll(de->d_name, 0, 16);
+ assert(de->d_name[16] == '.');
+ *(((__uint64_t*)&o) + 1) = strtoll(de->d_name+17, 0, 16);
+ dout(0) << " got " << o << " errno " << errno << " on " << de->d_name << endl;
+ if (errno) continue;
+ ls.push_back(o);
+ }
+
+ ::closedir(dir);
+ return 0;
+}
+// eof.
// fake attributes in memory, if we need to.
-
-class FakeStore : public ObjectStore,
- public FakeStoreAttrs,
- public FakeStoreCollections {
+class FakeStore : public ObjectStore {
string basedir;
int whoami;
-
- int unsync;
- Mutex lock;
+ Mutex synclock;
+ Cond synccond;
+ int unsync;
- // fns
- void get_dir(string& dir);
- void get_oname(object_t oid, string& fn);
- void wipe_dir(string mydir);
+ // fake attrs?
+ FakeStoreAttrs attrs;
+ bool fake_attrs;
+ // fake collections?
+ FakeStoreCollections collections;
+ bool fake_collections;
+
+ // helper fns
+ void get_oname(object_t oid, char *s);
+ void get_cdir(coll_t cid, char *s);
+ void get_coname(coll_t cid, object_t oid, char *s);
public:
- FakeStore(char *base, int whoami) : FakeStoreAttrs(this), FakeStoreCollections(this)
- {
- this->basedir = base;
- this->whoami = whoami;
- unsync = 0;
- }
-
+ FakeStore(char *base, int w) :
+ basedir(base),
+ whoami(w),
+ unsync(0),
+ attrs(this), fake_attrs(false),
+ collections(this), fake_collections(false) { }
int mount();
int umount();
int stat(object_t oid, struct stat *st);
int remove(object_t oid, Context *onsafe);
int truncate(object_t oid, off_t size, Context *onsafe);
- int read(object_t oid,
- off_t offset, size_t len,
- bufferlist& bl);
- int write(object_t oid,
- off_t offset, size_t len,
- bufferlist& bl,
- Context *onsafe);
+ int read(object_t oid, off_t offset, size_t len, bufferlist& bl);
+ int write(object_t oid, off_t offset, size_t len, bufferlist& bl, Context *onsafe);
+ void sync();
void sync(Context *onsafe);
+
+ // attrs
+ int setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe=0);
+ int setattrs(object_t oid, map<string,bufferptr>& aset);
+ int getattr(object_t oid, const char *name, void *value, size_t size);
+ int getattrs(object_t oid, map<string,bufferptr>& aset);
+ int rmattr(object_t oid, const char *name, Context *onsafe=0);
+ //int listattr(object_t oid, char *attrs, size_t size);
+ int collection_setattr(coll_t c, const char *name, void *value, size_t size, Context *onsafe=0);
+ int collection_rmattr(coll_t c, const char *name, Context *onsafe=0);
+ int collection_getattr(coll_t c, const char *name, void *value, size_t size);
+ //int collection_listattr(coll_t c, char *attrs, size_t size);
+
+
+ // collections
+ int list_collections(list<coll_t>& ls);
+ int create_collection(coll_t c, Context *onsafe=0);
+ int destroy_collection(coll_t c, Context *onsafe=0);
+ int collection_stat(coll_t c, struct stat *st);
+ bool collection_exists(coll_t c);
+ int collection_add(coll_t c, object_t o, Context *onsafe=0);
+ int collection_remove(coll_t c, object_t o, Context *onsafe=0);
+ int collection_list(coll_t c, list<object_t>& o);
+
};
#endif
}
#endif // USE_OSBDB
else {
- store = new FakeStore(osd_base_path, whoami);
+ sprintf(dev_path, "osddata/osd%d", whoami);
+ store = new FakeStore(dev_path, whoami);
}
}
p++)
t.remove(*p);
t.remove_collection(pgid);
- t.remove(object_t(1,pgid)); // log too
+ t.remove(pgid.to_object()); // log too
}
store->apply_transaction(t);
entity_name_t dest = inst.name;
if (g_conf.ms_die_on_failure) {
+ dout(0) << "ms_handle_failure " << inst << " on " << *m << endl;
exit(0);
}
//cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << endl;
logger->set_start( osdmap->get_ctime() );
+ assert(g_conf.osd_mkfs); // make sure we did a mkfs!
+
// create PGs
for (int nrep = 1;
nrep <= MIN(g_conf.num_osd, g_conf.osd_max_rep); // for low osd counts.. hackish bleh
assert(pg->missing.num_lost() == 0);
// ok activate!
- pg->activate(t);
+ pg->activate(t);
}
unsigned tr = store->apply_transaction(t);
return -1;
}
- virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
+ //virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
// collections
virtual int list_collections(list<coll_t>& ls) {return 0;}//= 0;
Context *onsafe=0) {return 0;} //= 0;
virtual int collection_getattr(coll_t cid, const char *name,
void *value, size_t size) {return 0;} //= 0;
- virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0;
+ //virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0;
- virtual void sync(Context *onsync) {};
- virtual void sync() {};
+ virtual void sync(Context *onsync) {}
+ virtual void sync() {}
virtual void _fake_writes(bool b) {};
state_set(STATE_ACTIVE);
state_clear(STATE_STRAY);
if (is_crashed()) {
- assert(is_replay());
+ //assert(is_replay()); // HELP.. not on replica?
state_clear(STATE_CRASHED);
state_clear(STATE_REPLAY);
}
void PG::write_log(ObjectStore::Transaction& t)
{
+ dout(10) << "write_log" << endl;
+
// assemble buffer
bufferlist bl;
if (bl.length() % 4096 == 0)
ondisklog.block_map[bl.length()] = p->version;
bl.append((char*)&(*p), sizeof(*p));
+ if (g_conf.osd_pad_pg_log) { // pad to 4k, until i fix ebofs reallocation crap. FIXME.
+ bufferptr bp(4096 - sizeof(*p));
+ bl.push_back(bp);
+ }
}
ondisklog.top = bl.length();
// write it
- t.remove( object_t(1,info.pgid) );
- t.write( object_t(1,info.pgid) , 0, bl.length(), bl);
+ t.remove( info.pgid.to_object() );
+ t.write( info.pgid.to_object() , 0, bl.length(), bl);
t.collection_setattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
t.collection_setattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
void PG::append_log(ObjectStore::Transaction& t, PG::Log::Entry& logentry,
eversion_t trim_to)
{
+ dout(10) << "append_log " << ondisklog.top << " " << logentry << endl;
+
// write entry on disk
bufferlist bl;
bl.append( (char*)&logentry, sizeof(logentry) );
bufferptr bp(4096 - sizeof(logentry));
bl.push_back(bp);
}
- t.write( object_t(1,info.pgid), ondisklog.top, bl.length(), bl );
+ t.write( info.pgid.to_object(), ondisklog.top, bl.length(), bl );
// update block map?
if (ondisklog.top % 4096 == 0)
void PG::read_log(ObjectStore *store)
{
+ int r;
// load bounds
ondisklog.bottom = ondisklog.top = 0;
- store->collection_getattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
- store->collection_getattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
-
+ r = store->collection_getattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
+ assert(r == sizeof(ondisklog.bottom));
+ r = store->collection_getattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
+ assert(r == sizeof(ondisklog.top));
+
+ dout(10) << "read_log [" << ondisklog.bottom << "," << ondisklog.top << ")" << endl;
+
log.backlog = info.log_backlog;
log.bottom = info.log_bottom;
if (ondisklog.top > 0) {
// read
bufferlist bl;
- store->read(object_t(1,info.pgid), ondisklog.bottom, ondisklog.top-ondisklog.bottom, bl);
+ store->read(info.pgid.to_object(), ondisklog.bottom, ondisklog.top-ondisklog.bottom, bl);
PG::Log::Entry e;
off_t pos = ondisklog.bottom;
+ assert(log.log.empty());
while (pos < ondisklog.top) {
bl.copy(pos-ondisklog.bottom, sizeof(e), (char*)&e);
+ dout(10) << "read_log " << pos << " " << e << endl;
+
if (e.version > log.bottom || log.backlog) { // ignore items below log.bottom
if (pos % 4096 == 0)
- ondisklog.block_map[pos] = e.version;
+ ondisklog.block_map[pos] = e.version;
log.log.push_back(e);
+ } else {
+ dout(10) << "read_log ignoring entry at " << pos << endl;
}
- pos += sizeof(e);
+ if (g_conf.osd_pad_pg_log) // pad to 4k, until i fix ebofs reallocation crap. FIXME.
+ pos += 4096;
+ else
+ pos += sizeof(e);
}
}
log.top = info.last_update;
inline ostream& operator<<(ostream& out, const PG::Info& pgi)
{
- out << "pginfo(" << hex << pgi.pgid << dec;
+ out << "pginfo(" << pgi.pgid;
if (pgi.is_empty())
out << " empty";
else
!pg.log.backlog) ||
(pg.log.log.rbegin()->version.version != pg.log.top.version)) {
out << " (log bound mismatch, actual=["
- << pg.log.log.begin()->version << ","
- << pg.log.log.rbegin()->version << "])";
+ << pg.log.log.begin()->version << ","
+ << pg.log.log.rbegin()->version << "] len=" << pg.log.log.size() << ")";
}
}
#include "include/reqid.h"
+#define PG_INO 1
+
+
// osd types
typedef __uint64_t coll_t; // collection id
-
// pg stuff
typedef __uint16_t ps_t;
typedef __uint8_t pruleset_t;
struct pg_t {
union {
struct {
- int preferred;
- ps_t ps;
- __uint8_t nrep;
- pruleset_t ruleset;
+ __uint32_t preferred:32; // 32
+ ps_t ps:16; // 16
+ __uint8_t nrep:8; // 8
+ pruleset_t ruleset:8; // 8
} fields;
- __uint64_t val;
+ __uint64_t val; // 64
} u;
+
pg_t() { u.val = 0; }
pg_t(const pg_t& o) { u.val = o.u.val; }
pg_t(ps_t s, int p, unsigned char n, pruleset_t r=0) {
pg_t operator++() { ++u.val; return *this; }
*/
operator __uint64_t() const { return u.val; }
+
+ object_t to_object() const { return object_t(PG_INO, u.val >> 32, u.val & 0xffffffff); }
};
inline ostream& operator<<(ostream& out, pg_t pg) {
if (pg.u.fields.preferred)
out << pg.u.fields.preferred << '.';
out << hex << pg.u.fields.ps << dec;
+ out << "=" << hex << pg.u.val << dec;
+ out << "=" << hex << (__uint64_t)pg << dec;
return out;
}