mkmonmap: mkmonmap.cc common.o
${CC} ${CFLAGS} ${LIBS} $^ -o $@
-cmon: cmon.cc mon.o ebofs.o msg/SimpleMessenger.o common.o
+cmon: cmon.cc mon.o msg/SimpleMessenger.o common.o
${CC} ${CFLAGS} ${LIBS} $^ -o $@
cosd: cosd.cc osd.o ebofs.o msg/SimpleMessenger.o common.o
#include "osd/OSDMap.h"
-#include "ebofs/Ebofs.h"
+#include "MonitorStore.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
// store
char s[80];
- sprintf(s, "dev/mon%d", whoami);
- store = new Ebofs(s);
+ sprintf(s, "mondata/mon%d", whoami);
+ store = new MonitorStore(s);
if (g_conf.mkfs)
store->mkfs();
- int r = store->mount();
- assert(r >= 0);
// create
osdmon = new OSDMonitor(this, messenger, lock);
cancel_tick();
- if (store) {
- store->umount();
+ if (store)
delete store;
- }
// stop osds.
for (set<int>::iterator it = osdmon->osdmap.get_osds().begin();
#include "Paxos.h"
-class ObjectStore;
+class MonitorStore;
class OSDMonitor;
class MDSMonitor;
class ClientMonitor;
friend class C_Mon_Tick;
// my local store
- ObjectStore *store;
+ //ObjectStore *store;
+ MonitorStore *store;
const static int INO_ELECTOR = 1;
const static int INO_MON_MAP = 2;
elector(this, w),
mon_epoch(0),
- test_paxos(this, w, 0), // machine 0 == test paxos
+ test_paxos(this, w, 0, "tester"), // machine 0 == tester paxos
state(STATE_STARTING),
leader(0),
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " store(" << dir <<") "
#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+// Called from the constructor: check that the store's base directory can
+// be opened.  If it cannot, log the problem and abort via assert.
void MonitorStore::init()
{
+ dout(1) << "init" << endl;
// verify dir exists
- DIR *d = ::opendir(dir);
+ DIR *d = ::opendir(dir.c_str());
if (!d) {
- derr(1) << "basedir " << dir << " dne" << endl;
- assert(0);
+ derr(1) << "basedir " << dir << " dne" << endl;
+ assert(0);
}
+ // the handle was only needed for the existence probe
::closedir(d);
}
+/*
+ * "Format" the store by wiping everything under the base directory.
+ *
+ * NOTE: the call that would actually run the wipe is currently commented
+ * out, so for now this only logs the command it would execute.
+ */
+void MonitorStore::mkfs()
+{
+  dout(1) << "mkfs" << endl;
+
+  // Build the command in a string: dir has no length limit, so formatting
+  // it into a fixed char[200] with sprintf could overrun the stack buffer.
+  string cmd = "/bin/rm -rf ";
+  cmd += dir;
+  cmd += "/*";
+  dout(1) << cmd << endl;
+  //system(cmd.c_str());
+}
+
+
+// Read an integer stored as ASCII text in <dir>/<a> (or <dir>/<a>/<b> when
+// b is non-null).  Returns 0 if the file cannot be opened or is empty.
version_t MonitorStore::get_int(const char *a, const char *b)
{
char fn[200];
if (b)
- sprintf(fn, "%s/%s/%s", dir, a, b);
+ snprintf(fn, sizeof(fn), "%s/%s/%s", dir.c_str(), a, b);
else
- sprintf(fn, "%s/%s", dir, a);
-
+ snprintf(fn, sizeof(fn), "%s/%s", dir.c_str(), a);
+
FILE *f = ::fopen(fn, "r");
if (!f)
- return 0;
-
+ return 0;
+
- char buf[20];
- ::fgets(buf, 20, f);
+ char buf[32];
+ if (!::fgets(buf, sizeof(buf), f))
+   buf[0] = 0;  // empty/unreadable file: parse as 0, not stack garbage
::fclose(f);
-
+
- version_t val = atoi(buf);
+ // put_int writes the value with a 64-bit format; atoi would truncate
+ // anything that does not fit in an int.
+ version_t val = ::strtoull(buf, 0, 10);
-
+
if (b) {
- dout(10) << "get_int " << a << "/" << b << " = " << val << endl;
+ dout(10) << "get_int " << a << "/" << b << " = " << val << endl;
} else {
- dout(10) << "get_int " << a << " = " << val << endl;
+ dout(10) << "get_int " << a << " = " << val << endl;
}
return val;
}
+// Store an integer as ASCII text in <dir>/<a>, or in <dir>/<a>/<b> when b
+// is non-null (in which case <dir>/<a> is created as a directory first).
+// The value is written to a ".new" temp file which is then renamed over
+// the target.
-void MonitorStore::set_int(version_t val, const char *a, const char *b)
+void MonitorStore::put_int(version_t val, const char *a, const char *b)
{
char fn[200];
+ snprintf(fn, sizeof(fn), "%s/%s", dir.c_str(), a);
if (b) {
- dout(10) << "set_int " << a << "/" << b << " = " << val << endl;
- sprintf(fn, "%s/%s/%s", dir, a, b);
+ ::mkdir(fn, 0755);
+ dout(10) << "set_int " << a << "/" << b << " = " << val << endl;
+ snprintf(fn, sizeof(fn), "%s/%s/%s", dir.c_str(), a, b);
} else {
- dout(10) << "set_int " << a << " = " << val << endl;
- sprintf(fn, "%s/%s", dir, a);
+ dout(10) << "set_int " << a << " = " << val << endl;
}
- FILE *f = ::fopen(fn, "w");
+ // sized to hold fn plus the ".new" suffix
+ char tfn[sizeof(fn) + 8];
+ snprintf(tfn, sizeof(tfn), "%s.new", fn);
+ FILE *f = ::fopen(tfn, "w");
assert(f);
::fprintf(f, "%lld\n", val);
+ // get the data onto disk before the rename publishes it, as put_bl does
+ ::fflush(f);
+ ::fsync(fileno(f));
::fclose(f);
+ ::rename(tfn, fn);
+}
+
+
+// ----------------------------------------
+// buffers
+
+// Return true if <dir>/<a> (or <dir>/<a>/<b> when b is non-null) exists,
+// i.e. stat() on that path succeeds.
+bool MonitorStore::exists_bl(const char *a, const char *b)
+{
+  if (b) {
+    dout(10) << "exists_bl " << a << "/" << b << endl;
+  } else {
+    dout(10) << "exists_bl " << a << endl;
+  }
+
+  // Assemble the path in a string: dir has no length limit, so sprintf
+  // into a fixed char[200] could overrun the buffer.
+  string fn = dir;
+  fn += "/";
+  fn += a;
+  if (b) {
+    fn += "/";
+    fn += b;
+  }
+
+  struct stat st;
+  return ::stat(fn.c_str(), &st) == 0;
+}
+
+
+// Load the buffer stored at <dir>/<a> (or <dir>/<a>/<b> when b is
+// non-null) into bl.
+//
+// File layout, as written by put_bl: a 32-bit length followed by that many
+// bytes of payload.
+//
+// Returns the payload length on success, 0 if the file cannot be opened
+// (bl is left untouched), or -1 if the length header or payload cannot be
+// read in full (bl is left empty).
+int MonitorStore::get_bl(bufferlist& bl, const char *a, const char *b)
+{
+  // Assemble the path in a string so a long base dir cannot overflow a
+  // fixed-size buffer.
+  string fn = dir;
+  fn += "/";
+  fn += a;
+  if (b) {
+    fn += "/";
+    fn += b;
+  }
+
+  // open() signals failure with -1; 0 is a valid descriptor, so the test
+  // must be "< 0", not "!fd".
+  int fd = ::open(fn.c_str(), O_RDONLY);
+  if (fd < 0) {
+    if (b) {
+      dout(10) << "get_bl " << a << "/" << b << " DNE" << endl;
+    } else {
+      dout(10) << "get_bl " << a << " DNE" << endl;
+    }
+    return 0;
+  }
+
+  bl.clear();
+
+  // read size; reject a short header or a negative length rather than
+  // sizing the buffer from garbage
+  __int32_t len = 0;
+  bool ok = ::read(fd, &len, sizeof(len)) == (ssize_t)sizeof(len) && len >= 0;
+
+  // read buffer; loop because read() may return fewer bytes than requested
+  if (ok) {
+    bufferptr bp(len);
+    __int32_t off = 0;
+    while (off < len) {
+      ssize_t r = ::read(fd, bp.c_str() + off, len - off);
+      if (r <= 0) {
+        ok = false;
+        break;
+      }
+      off += r;
+    }
+    if (ok)
+      bl.append(bp);
+  }
+  ::close(fd);
+
+  if (!ok) {
+    derr(1) << "get_bl " << fn << " truncated or unreadable" << endl;
+    return -1;
+  }
+
+  if (b) {
+    dout(10) << "get_bl " << a << "/" << b << " = " << bl.length() << " bytes" << endl;
+  } else {
+    dout(10) << "get_bl " << a << " = " << bl.length() << " bytes" << endl;
+  }
+
+  return len;
+}
+
+// Persist bl at <dir>/<a>, or at <dir>/<a>/<b> when b is non-null (in
+// which case <dir>/<a> is created as a directory first).
+//
+// File layout: a 32-bit length followed by the payload bytes.  The data is
+// written to a ".new" temp file, fsync'ed, and then renamed over the
+// target.
+//
+// Returns 0 on success, or -1 if the data could not be completely written
+// or renamed into place (the target is not replaced in that case).
+int MonitorStore::put_bl(bufferlist& bl, const char *a, const char *b)
+{
+  // Assemble paths in strings so a long base dir cannot overflow a
+  // fixed-size buffer.
+  string fn = dir;
+  fn += "/";
+  fn += a;
+  if (b) {
+    ::mkdir(fn.c_str(), 0755);
+    dout(10) << "put_bl " << a << "/" << b << " = " << bl.length() << " bytes" << endl;
+    fn += "/";
+    fn += b;
+  } else {
+    dout(10) << "put_bl " << a << " = " << bl.length() << " bytes" << endl;
+  }
+
+  string tfn = fn + ".new";
+  // O_CREAT requires an explicit mode argument; O_TRUNC discards a stale
+  // temp file left behind by an interrupted earlier write.
+  int fd = ::open(tfn.c_str(), O_WRONLY|O_CREAT|O_TRUNC, 0644);
+  // open() fails with -1; 0 is a legitimate descriptor
+  assert(fd >= 0);
+
+  // write size
+  __int32_t len = bl.length();
+  bool ok = ::write(fd, &len, sizeof(len)) == (ssize_t)sizeof(len);
+
+  // write data, stopping at the first failed or short write
+  for (list<bufferptr>::const_iterator it = bl.buffers().begin();
+       ok && it != bl.buffers().end();
+       ++it)
+    ok = ::write(fd, it->c_str(), it->length()) == (ssize_t)it->length();
+
+  if (ok && ::fsync(fd) < 0)
+    ok = false;
+  ::close(fd);
+
+  if (!ok) {
+    derr(1) << "put_bl failed writing " << tfn << endl;
+    return -1;
+  }
+
+  // publish the fully written file under its final name
+  if (::rename(tfn.c_str(), fn.c_str()) < 0) {
+    derr(1) << "put_bl failed renaming " << tfn << endl;
+    return -1;
+  }
+
+  return 0;
}
#include "include/types.h"
#include "include/buffer.h"
-class MonitorStore {
- const char *dir;
-
- version_t get_int(const char *a, const char *b=0);
- void set_int(version_t v, const char *a, const char *b=0);
+#include <string.h>
+#include <string>
- int get_bl(const char *nm, bufferlist& bl);
- int put_bl(const char *nm, bufferlist& bl);
+/*
+ * Simple directory-backed store for monitor state.
+ *
+ * Every item lives in a file named <dir>/<a> or <dir>/<a>/<b>.  Integers
+ * are kept as ASCII text; buffers are kept as a length-prefixed blob.
+ */
+class MonitorStore {
+ string dir;   // base directory; copied, so the caller's buffer may go away
 void init();
 public:
- MonitorStore(const char *d) :
- dir(d) {
+ // d: path of the base directory (checked by init()).  Taken as const so
+ // string literals and const paths are accepted as before.
+ MonitorStore(const char *d) : dir(d) {
 init();
 }
+ ~MonitorStore() {
+ }
+
+ void mkfs(); // wipe
+
+ // ints (stored as ascii)
+ version_t get_int(const char *a, const char *b=0);
+ void put_int(version_t v, const char *a, const char *b=0);
+
+ // buffers
+ bool exists_bl(const char *a, const char *b=0);
+ int get_bl(bufferlist& bl, const char *a, const char *b);
+ int put_bl(bufferlist& bl, const char *a, const char *b);
+
+ // Convenience overloads: format a numeric key as decimal text and forward
+ // to the string-keyed version.  The scratch buffer holds the longest
+ // 64-bit decimal value (20 digits) plus the terminating NUL; the old
+ // 16-byte buffer could be overrun by a large version_t.
+ bool exists_bl(const char *a, unsigned b) {
+ char bs[24];
+ snprintf(bs, sizeof(bs), "%0u", b);
+ return exists_bl(a, bs);
+ }
+ int get_bl(bufferlist& bl, const char *a, version_t b) {
+ char bs[24];
+ snprintf(bs, sizeof(bs), "%0llu", (unsigned long long)b);
+ return get_bl(bl, a, bs);
+ }
+ int put_bl(bufferlist& bl, const char *a, version_t b) {
+ char bs[24];
+ snprintf(bs, sizeof(bs), "%0llu", (unsigned long long)b);
+ return put_bl(bl, a, bs);
+ }
+ /*
 version_t get_incarnation() { return get_int("incarnation"); }
 void set_incarnation(version_t i) { set_int(i, "incarnation"); }
 version_t get_last_proposal() { return get_int("last_proposal"); }
 void set_last_proposal(version_t i) { set_int(i, "last_proposal"); }
+ */
 };
#include "Monitor.h"
#include "MDSMonitor.h"
-#include "osd/ObjectStore.h"
+#include "MonitorStore.h"
#include "messages/MOSDFailure.h"
#include "messages/MOSDMap.h"
+// Fetch the encoded full OSD map stored under "osdmap"/<epoch> in the
+// monitor store into bl.  Returns false if no such entry exists, true
+// once it has been loaded.
bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl)
{
- object_t oid(Monitor::INO_OSD_MAP, epoch);
- if (!mon->store->exists(oid))
+ if (!mon->store->exists_bl("osdmap", epoch))
return false;
- int r = mon->store->read(oid, 0, 0, bl);
+ int r = mon->store->get_bl(bl, "osdmap", epoch);
+ // the entry was just seen to exist, so a non-positive length is unexpected
assert(r > 0);
return true;
}
+// Fetch the encoded incremental OSD map stored under "osdincmap"/<epoch>
+// in the monitor store into bl.  Returns false if no such entry exists,
+// true once it has been loaded.
bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl)
{
- object_t oid(Monitor::INO_OSD_INC_MAP, epoch);
- if (!mon->store->exists(oid))
+ if (!mon->store->exists_bl("osdincmap", epoch))
return false;
- int r = mon->store->read(oid, 0, 0, bl);
+ int r = mon->store->get_bl(bl, "osdincmap", epoch);
+ // the entry was just seen to exist, so a non-positive length is unexpected
assert(r > 0);
return true;
}
bufferlist bl;
osdmap.encode(bl);
- ObjectStore::Transaction t;
- t.write(object_t(Monitor::INO_OSD_MAP,0), 0, bl.length(), bl);
- t.write(object_t(Monitor::INO_OSD_MAP,osdmap.get_epoch()), 0, bl.length(), bl);
- mon->store->apply_transaction(t);
- mon->store->sync();
+ mon->store->put_bl(bl, "osdmap", osdmap.get_epoch());
+ mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
}
void OSDMonitor::save_inc_map(OSDMap::Incremental &inc)
bufferlist incbl;
inc.encode(incbl);
- ObjectStore::Transaction t;
- t.write(object_t(Monitor::INO_OSD_MAP,0), 0, bl.length(), bl);
- t.write(object_t(Monitor::INO_OSD_MAP,osdmap.get_epoch()), 0, bl.length(), bl); // not strictly needed??
- t.write(object_t(Monitor::INO_OSD_INC_MAP,osdmap.get_epoch()), 0, incbl.length(), incbl);
- mon->store->apply_transaction(t);
- mon->store->sync();
+ mon->store->put_bl(bl, "osdmap", osdmap.get_epoch());
+ mon->store->put_bl(incbl, "osdincmap", osdmap.get_epoch());
+ mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
}
#include "Paxos.h"
#include "Monitor.h"
+#include "MonitorStore.h"
#include "messages/MMonPaxos.h"
#include "config.h"
#undef dout
-#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_id << ") "
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_id << ") "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
// ---------------------------------
}
+/*
+ * Hand out the next proposal number for this monitor.
+ *
+ * A counter persisted in the monitor store ("last_paxos_proposal") is
+ * bumped on every call, then offset by whoami * 100000000 with the intent
+ * of keeping numbers issued by different monitors distinct.
+ */
+version_t Paxos::get_new_proposal_number()
+{
+  // bump the persisted counter
+  version_t counter = mon->store->get_int("last_paxos_proposal");
+  ++counter;
+  mon->store->put_int(counter, "last_paxos_proposal");
+
+  // fold in our monitor id
+  const version_t base = 100000000ULL * (version_t)whoami;
+  const version_t proposal = base + counter;
+
+  dout(10) << "get_new_proposal_number = " << proposal << endl;
+  return proposal;
+}
+
// ---------------------------------
// accepter
// .. do something else too
- int who = 2;
- mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id, 12, 122),
- MSG_ADDR_MON(who), mon->monmap->get_inst(who));
+ //int who = 2;
+ //mon->messenger->send_message(new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id, 12, 122),
+ //MSG_ADDR_MON(who), mon->monmap->get_inst(who));
}
// my state machine info
int machine_id;
+ const char *machine_name;
map<version_t, bufferlist> accepted_values;
map<version_t, int> accepted_proposal_number;
- version_t last_proposal_number;
-
// proposer
void propose(version_t v, bufferlist& value);
void handle_last(MMonPaxos*);
void handle_accept(MMonPaxos*);
+
+ version_t get_new_proposal_number();
// accepter
void handle_prepare(MMonPaxos*);
public:
Paxos(Monitor *m, int w,
- int mid) : mon(m), whoami(w), machine_id(mid) {
+ int mid,const char *mnm) : mon(m), whoami(w),
+ machine_id(mid), machine_name(mnm) {
}
void dispatch(Message *m);