ebofs/BlockDevice.o\
ebofs/BufferCache.o\
ebofs/Ebofs.o\
- ebofs/Allocator.o
+ ebofs/Allocator.o\
+ ebofs/FileJournal.o
MDS_OBJS= \
mds/MDS.o\
MON_OBJS= \
mon/Monitor.o\
mon/Paxos.o\
+ mon/PaxosService.o\
mon/OSDMonitor.o\
mon/MDSMonitor.o\
mon/ClientMonitor.o\
if (m->get_source().is_mds())
frommds = m->get_source().num();
- if (mdsmap == 0)
+ if (mdsmap == 0) {
mdsmap = new MDSMap;
- if (whoami < 0) {
- // mounted!
assert(m->get_source().is_mon());
whoami = m->get_dest().num();
dout(1) << "handle_mds_map i am now " << m->get_dest() << endl;
messenger->reset_myname(m->get_dest());
-
+
mount_cond.Signal(); // mount might be waiting for this.
}
// --- mon ---
mon_tick_interval: 5,
mon_osd_down_out_interval: 5, // seconds
- mon_lease: 2.000, // seconds
+ mon_lease: 5, // seconds
+ mon_lease_renew_interval: 3,
+ mon_lease_ack_timeout: 10.0,
+ mon_accept_timeout: 10.0,
mon_stop_with_last_mds: true,
// --- client ---
int mon_tick_interval;
int mon_osd_down_out_interval;
float mon_lease;
+ float mon_lease_renew_interval;
+ float mon_lease_ack_timeout;
+ float mon_accept_timeout;
bool mon_stop_with_last_mds;
// client
#include "Ebofs.h"
+#include "FileJournal.h"
+
#include <errno.h>
#ifndef DARWIN
ebofs_lock.Lock();
assert(!mounted);
+ // open dev
int r = dev.open(&idle_kicker);
if (r < 0) {
ebofs_lock.Unlock();
dout(3) << "mount epoch " << super_epoch << endl;
assert(super_epoch == sb->epoch);
+ super_fsid = sb->fsid;
+
free_blocks = sb->free_blocks;
limbo_blocks = sb->limbo_blocks;
allocator.release_limbo();
+
+ // open journal?
+ if (journalfn) {
+ journal = new FileJournal(this, journalfn);
+ if (journal->open() < 0) {
+ dout(-3) << "mount journal " << journalfn << " open failed" << endl;
+ delete journal;
+ journal = 0;
+ } else {
+ dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl;
+
+ while (1) {
+ bufferlist bl;
+ epoch_t e;
+ if (!journal->read_entry(bl, e)) {
+ dout(-3) << "mount replay: end of journal, done." << endl;
+ break;
+ }
+
+ if (e < super_epoch) {
+ dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl;
+ }
+ if (e == super_epoch+1) {
+ super_epoch++;
+ dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl;
+ }
+ assert(e == super_epoch);
+
+ dout(-3) << "mount replay: applying transaction in epoch " << e << endl;
+ Transaction t;
+ int off = 0;
+ t._decode(bl, off);
+ _apply_transaction(t);
+ }
+ }
+ }
+
dout(3) << "mount starting commit+finisher threads" << endl;
commit_thread.create();
finisher_thread.create();
dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
mounted = true;
+
ebofs_lock.Unlock();
return 0;
}
block_t num_blocks = dev.get_num_blocks();
+ // make a super-random fsid
+ srand(time(0) ^ getpid());
+ super_fsid = (lrand48() << 32) ^ mrand48();
+
free_blocks = 0;
limbo_blocks = 0;
dev.close();
+
+ // create journal?
+ if (journalfn) {
+ journal = new FileJournal(this, journalfn);
+ if (journal->create() < 0) {
+ dout(3) << "mount journal " << journalfn << " created failed" << endl;
+ delete journal;
+ } else {
+ dout(3) << "mount journal " << journalfn << " created" << endl;
+ }
+ }
+
dout(2) << "mkfs: " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
ebofs_lock.Unlock();
return 0;
// fill in super
memset(&sb, 0, sizeof(sb));
sb.s_magic = EBOFS_MAGIC;
+ sb.fsid = super_fsid;
sb.epoch = epoch;
sb.num_blocks = dev.get_num_blocks();
<< ", max dirty " << g_conf.ebofs_bc_max_dirty
<< endl;
+ if (journal) journal->commit_epoch_start();
// (async) write onodes+condes (do this first; it currently involves inode reallocation)
commit_inodes_start();
alloc_more_node_space();
}
+ // signal journal
+ if (journal) journal->commit_epoch_finish();
+
// kick waiters
dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl;
- finisher_lock.Lock();
- finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]);
+ queue_finishers(commit_waiters[super_epoch-1]);
commit_waiters.erase(super_epoch-1);
- finisher_cond.Signal();
- finisher_lock.Unlock();
sync_cond.Signal();
ebofs_lock.Lock();
if (onsafe) {
dirty = true;
- commit_waiters[super_epoch].push_back(onsafe);
+
+ while (1) {
+ if (journal) {
+ // journal empty transaction
+ Transaction t;
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
}
ebofs_lock.Unlock();
}
ebofs_lock.Lock();
dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
+ unsigned r = _apply_transaction(t);
+
+ // journal, wait for commit
+ if (r != 0 && onsafe) {
+ delete onsafe; // kill callback, but still journal below (in case transaction had side effects)
+ onsafe = 0;
+ }
+ while (1) {
+ if (journal) {
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
+
+ ebofs_lock.Unlock();
+ return r;
+}
+
+unsigned Ebofs::_apply_transaction(Transaction& t)
+{
// do ops
unsigned r = 0; // bit fields indicate which ops failed.
int bit = 1;
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) {
dout(7) << "apply_transaction fail on _getattr" << endl;
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
if (_rmattr(oid, attrname) < 0) {
dout(7) << "apply_transaction fail on _rmattr" << endl;
r &= bit;
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
if (_collection_rmattr(cid, attrname) < 0) {
dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
r &= bit;
bit = bit << 1;
}
- dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
-
- // set up commit waiter
- //if (r == 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
- //} else {
- //if (onsafe) delete onsafe;
- //}
-
- ebofs_lock.Unlock();
+ dout(7) << "_apply_transaction finish (r = " << r << ")" << endl;
return r;
}
}
-/*int Ebofs::write(object_t oid,
- off_t off, size_t len,
- bufferlist& bl, bool fsync)
-{
- // wait?
- if (fsync) {
- // wait for flush.
- Cond cond;
- bool done;
- int flush = 1; // write never returns positive
- Context *c = new C_Cond(&cond, &done, &flush);
- int r = write(oid, off, len, bl, c);
- if (r < 0) return r;
-
- ebofs_lock.Lock();
- {
- while (!done)
- cond.Wait(ebofs_lock);
- assert(flush <= 0);
- }
- ebofs_lock.Unlock();
- if (flush < 0) return flush;
- return r;
- } else {
- // don't wait for flush.
- return write(oid, off, len, bl, (Context*)0);
- }
-}
-*/
-
int Ebofs::write(object_t oid,
off_t off, size_t len,
bufferlist& bl, Context *onsafe)
// commit waiter
if (r > 0) {
assert((size_t)r == len);
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.write(oid, off, len, bl);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
// do it
int r = _remove(oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.remove(oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _truncate(oid, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.truncate(oid, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _clone(from, to);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.clone(from, to);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
ebofs_lock.Lock();
int r = _setattr(oid, name, value, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.setattr(oid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
ebofs_lock.Lock();
int r = _setattrs(oid, attrset);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.setattrs(oid, attrset);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _rmattr(oid, name);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.rmattr(oid, name);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _create_collection(cid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.create_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _destroy_collection(cid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.remove_collection(cid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_add(cid, oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_add(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_remove(cid, oid);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_remove(cid, oid);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_setattr(cid, name, value, size);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_setattr(cid, name, value, size);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
int r = _collection_rmattr(cid, name);
- // set up commit waiter
+ // journal, wait for commit
if (r >= 0) {
- if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ while (1) {
+ if (journal) {
+ Transaction t;
+ t.collection_rmattr(cid, name);
+ bufferlist bl;
+ t._encode(bl);
+ if (journal->submit_entry(bl, onsafe)) break;
+ }
+ if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+ break;
+ }
} else {
if (onsafe) delete onsafe;
}
#include "nodes.h"
#include "Allocator.h"
#include "Table.h"
+#include "Journal.h"
#include "common/Mutex.h"
#include "common/Cond.h"
class Ebofs : public ObjectStore {
- protected:
+protected:
Mutex ebofs_lock; // a beautiful global lock
// ** debuggy **
bool fake_writes;
// ** super **
+public:
BlockDevice dev;
+protected:
bool mounted, unmounting, dirty;
bool readonly;
version_t super_epoch;
bool commit_thread_started, mid_commit;
Cond commit_cond; // to wake up the commit thread
Cond sync_cond;
+ uint64_t super_fsid;
map<version_t, list<Context*> > commit_waiters;
}
} commit_thread;
-
+public:
+ uint64_t get_fsid() { return super_fsid; }
+ epoch_t get_super_epoch() { return super_epoch; }
+protected:
+ // ** journal **
+ char *journalfn;
+ Journal *journal;
+
// ** allocator **
block_t free_blocks, limbo_blocks;
Allocator allocator;
bool finisher_stop;
list<Context*> finisher_queue;
+public:
+ void queue_finisher(Context *c) {
+ finisher_lock.Lock();
+ finisher_queue.push_back(c);
+ finisher_cond.Signal();
+ finisher_lock.Unlock();
+ }
+ void queue_finishers(list<Context*>& ls) {
+ finisher_lock.Lock();
+ finisher_queue.splice(finisher_queue.end(), ls);
+ finisher_cond.Signal();
+ finisher_lock.Unlock();
+ }
+protected:
+
void *finisher_thread_entry();
class FinisherThread : public Thread {
Ebofs *ebofs;
public:
- Ebofs(char *devfn) :
+ Ebofs(char *devfn, char *jfn=0) :
fake_writes(false),
dev(devfn),
mounted(false), unmounting(false), dirty(false), readonly(false),
super_epoch(0), commit_thread_started(false), mid_commit(false),
commit_thread(this),
+ journalfn(jfn), journal(0),
free_blocks(0), limbo_blocks(0),
allocator(this),
nodepool(ebofs_lock),
finisher_stop(false), finisher_thread(this) {
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = 0;
+ if (!journalfn) {
+ journalfn = new char[strlen(devfn) + 100];
+ strcpy(journalfn, devfn);
+ strcat(journalfn, ".journal");
+ }
}
~Ebofs() {
}
private:
// private interface -- use if caller already holds lock
+ unsigned _apply_transaction(Transaction& t);
+
int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
int _is_cached(object_t oid, off_t off, size_t len);
int _stat(object_t oid, struct stat *st);
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
#include "FileJournal.h"
#include "Ebofs.h"
-#include "config.h"
-#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
-#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "config.h"
+#undef dout
+#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
+#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
-void FileJournal::create()
+int FileJournal::create()
{
dout(1) << "create " << fn << endl;
// open/create
- fd = ::open(fn.c_str(), O_CREAT|O_WRONLY);
+ fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+ if (fd < 0) {
+ dout(1) << "create failed " << errno << " " << strerror(errno) << endl;
+ return -errno;
+ }
assert(fd > 0);
- ::ftruncate(fd);
- ::fchmod(fd, 0644);
+ //::ftruncate(fd, 0);
+ //::fchmod(fd, 0644);
+
+ // get size
+ struct stat st;
+ ::fstat(fd, &st);
+ dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl;
+
+ // write empty header
+ header.clear();
+ header.fsid = ebofs->get_fsid();
+ header.max_size = st.st_size;
+ write_header();
+
+ // writeable.
+ read_pos = 0;
+ write_pos = queue_pos = sizeof(header);
::close(fd);
-}
+ return 0;
+}
-void FileJournal::open()
+int FileJournal::open()
{
- dout(1) << "open " << fn << endl;
+ //dout(1) << "open " << fn << endl;
// open and file
assert(fd == 0);
- fd = ::open(fn.c_str(), O_RDWR);
+ fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
+ if (fd < 0) {
+ dout(1) << "open failed " << errno << " " << strerror(errno) << endl;
+ return -errno;
+ }
assert(fd > 0);
- // read header?
- // ***
+ // assume writeable, unless...
+ read_pos = 0;
+ write_pos = queue_pos = sizeof(header);
+ // read header?
+ read_header();
+ if (header.num > 0 && header.fsid == ebofs->get_fsid()) {
+ // valid header, pick an offset
+ for (int i=0; i<header.num; i++) {
+ if (header.epoch[i] == ebofs->get_super_epoch()) {
+ dout(2) << "using read_pos header pointer "
+ << header.epoch[i] << " at " << header.offset[i]
+ << endl;
+ read_pos = header.offset[i];
+ write_pos = queue_pos = 0;
+ break;
+ }
+ else if (header.epoch[i] < ebofs->get_super_epoch()) {
+ dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+ << ", skipping old " << header.epoch[i] << " at " << header.offset[i]
+ << endl;
+ }
+ else if (header.epoch[i] > ebofs->get_super_epoch()) {
+ dout(2) << "super_epoch is " << ebofs->get_super_epoch()
+ << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
+ << endl;
+ break;
+ }
+ }
+ }
start_writer();
+
+ return 0;
}
void FileJournal::close()
stop_writer();
// close
- assert(q.empty());
+ assert(writeq.empty());
+ assert(commitq.empty());
assert(fd > 0);
::close(fd);
fd = 0;
}
+void FileJournal::print_header()
+{
+ for (int i=0; i<header.num; i++) {
+ if (i && header.offset[i] < header.offset[i-1]) {
+ assert(header.wrap);
+ dout(10) << "header: wrap at " << header.wrap << endl;
+ }
+ dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << endl;
+ }
+ //if (header.wrap) dout(10) << "header: wrap at " << header.wrap << endl;
+}
+void FileJournal::read_header()
+{
+ dout(10) << "read_header" << endl;
+ memset(&header, 0, sizeof(header)); // zero out (read may fail)
+ ::lseek(fd, 0, SEEK_SET);
+ int r = ::read(fd, &header, sizeof(header));
+ if (r < 0)
+ dout(0) << "read_header error " << errno << " " << strerror(errno) << endl;
+ print_header();
+}
void FileJournal::write_header()
{
- dout(10) << "write_header" << endl;
-
+ dout(10) << "write_header " << endl;
+ print_header();
+
::lseek(fd, 0, SEEK_SET);
- ::write(fd, &header, sizeof(header));
+ int r = ::write(fd, &header, sizeof(header));
+ if (r < 0)
+ dout(0) << "write_header error " << errno << " " << strerror(errno) << endl;
}
if (writeq.empty()) {
// sleep
dout(20) << "write_thread_entry going to sleep" << endl;
+ assert(write_pos == queue_pos);
write_cond.Wait(write_lock);
dout(20) << "write_thread_entry woke up" << endl;
continue;
// do queued writes
while (!writeq.empty()) {
// grab next item
- epoch_t e = writeq.front().first;
+ epoch_t epoch = writeq.front().first;
bufferlist bl;
bl.claim(writeq.front().second);
writeq.pop_front();
Context *oncommit = commitq.front();
commitq.pop_front();
- dout(15) << "write_thread_entry writing " << bottom << " : "
+ // wrap?
+ if (write_pos == header.wrap) {
+ dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << endl;
+ assert(header.wrap == write_pos);
+ write_header();
+ write_pos = sizeof(header_t);
+ }
+
+ // write!
+ dout(15) << "write_thread_entry writing " << write_pos << " : "
<< bl.length()
- << " epoch " << e
+ << " epoch " << epoch
<< endl;
- // write epoch, len, data.
- ::fseek(fd, bottom, SEEK_SET);
- ::write(fd, &e, sizeof(e));
-
- uint32_t len = bl.length();
- ::write(fd, &len, sizeof(len));
+ // write entry header
+ entry_header_t h;
+ h.epoch = epoch;
+ h.len = bl.length();
+ h.make_magic(write_pos, header.fsid);
+
+ ::lseek(fd, write_pos, SEEK_SET);
+ ::write(fd, &h, sizeof(h));
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
it != bl.buffers().end();
if ((*it).length() == 0) continue; // blank buffer.
::write(fd, (char*)(*it).c_str(), (*it).length() );
}
+
+ ::write(fd, &h, sizeof(h));
// move position pointer
- bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
+ write_pos += 2*sizeof(entry_header_t) + bl.length();
- // do commit callback
if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
+ if (1) {
+ // queue callback
+ ebofs->queue_finisher(oncommit);
+ } else {
+ // callback now
+ oncommit->finish(0);
+ delete oncommit;
+ }
}
}
}
dout(10) << "write_thread_entry finish" << endl;
}
-void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
+bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
{
- dout(10) << "submit_entry " << bottom << " : " << e.length()
- << " epoch " << ebofs->super_epoch
+ assert(queue_pos != 0); // bad create(), or journal didn't replay to completion.
+
+ // ** lock **
+ Mutex::Locker locker(write_lock);
+
+ // wrap? full?
+ off_t size = 2*sizeof(entry_header_t) + e.length();
+
+ if (full) return false; // already marked full.
+
+ if (header.wrap) {
+ // we're wrapped. don't overwrite ourselves.
+ if (queue_pos + size >= header.offset[0]) {
+ dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
+ << " >= " << header.offset[0]
+ << endl;
+ full = true;
+ print_header();
+ return false;
+ }
+ } else {
+ // we haven't wrapped.
+ if (queue_pos + size >= header.max_size) {
+ // is there room if we wrap?
+ if ((off_t)sizeof(header_t) + size < header.offset[0]) {
+ // yes!
+ dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl;
+ header.wrap = queue_pos;
+ queue_pos = sizeof(header_t);
+ header.push(ebofs->get_super_epoch(), queue_pos);
+ } else {
+ // no room.
+ dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
+ << " >= " << header.max_size
+ << endl;
+ full = true;
+ return false;
+ }
+ }
+ }
+
+ dout(10) << "submit_entry " << queue_pos << " : " << e.length()
+ << " epoch " << ebofs->get_super_epoch()
<< " " << oncommit << endl;
// dump on queue
- writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
+ writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
commitq.push_back(oncommit);
-
+
+ queue_pos += size;
+
// kick writer thread
write_cond.Signal();
+
+ return true;
}
void FileJournal::commit_epoch_start()
{
- dout(10) << "commit_epoch_start" << endl;
+ dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1
+ << " -- new epoch " << ebofs->get_super_epoch()
+ << endl;
- write_lock.Lock();
- {
- header.epoch2 = ebofs->super_epoch;
- header.top2 = bottom;
- write_header();
- }
- write_lock.Unlock();
+ Mutex::Locker locker(write_lock);
+
+ // was full -> empty -> now usable?
+ if (full) {
+ if (header.num != 0) {
+ dout(1) << " journal FULL, ignoring this epoch" << endl;
+ return;
+ }
+
+ dout(1) << " clearing FULL flag, journal now usable" << endl;
+ full = false;
+ }
+
+ // note epoch boundary
+ header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written.
+ //write_header(); // no need to write it now, though...
}
void FileJournal::commit_epoch_finish()
{
- dout(10) << "commit_epoch_finish" << endl;
+ dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl;
write_lock.Lock();
{
- // update header
- header.epoch1 = ebofs->super_epoch;
- header.top1 = header.top2;
- header.epoch2 = 0;
- header.top2 = 0;
+ if (full) {
+ // full journal damage control.
+ dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << endl;
+ header.clear();
+ write_pos = queue_pos = sizeof(header_t);
+ } else {
+ // update header -- trim/discard old (committed) epochs
+ while (header.epoch[0] < ebofs->get_super_epoch())
+ header.pop();
+ }
write_header();
- // flush any unwritten items in previous epoch
- while (!writeq.empty() &&
- writeq.front().first < ebofs->super_epoch) {
- dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
- writeq.pop_front();
+ // discard any unwritten items in previous epoch, and do callbacks
+ epoch_t epoch = ebofs->get_super_epoch();
+ list<Context*> callbacks;
+ while (!writeq.empty() && writeq.front().first < epoch) {
+ dout(15) << " dropping unwritten and committed "
+ << write_pos << " : " << writeq.front().second.length()
+ << " epoch " << writeq.front().first
+ << endl;
+ // finisher?
Context *oncommit = commitq.front();
+ if (oncommit) callbacks.push_back(oncommit);
+
+ write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
+
+ // discard.
+ writeq.pop_front();
commitq.pop_front();
-
- if (oncommit) {
- oncommit->finish(0);
- delete oncommit;
- }
}
+
+ // queue the finishers
+ ebofs->queue_finishers(callbacks);
}
write_lock.Unlock();
}
+
+
+void FileJournal::make_writeable()
+{
+ if (read_pos)
+ write_pos = queue_pos = read_pos;
+ else
+ write_pos = queue_pos = sizeof(header_t);
+ read_pos = 0;
+}
+
+
+bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
+{
+ if (!read_pos) {
+ dout(1) << "read_entry -- not readable" << endl;
+ make_writeable();
+ return false;
+ }
+
+ if (read_pos == header.wrap) {
+ // find wrap point
+ for (int i=1; i<header.num; i++) {
+ if (header.offset[i] < read_pos) {
+ assert(header.offset[i-1] < read_pos);
+ read_pos = header.offset[i];
+ break;
+ }
+ }
+ assert(read_pos != header.wrap);
+ dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << endl;
+ }
+
+ // header
+ entry_header_t h;
+ ::lseek(fd, read_pos, SEEK_SET);
+ ::read(fd, &h, sizeof(h));
+ if (!h.check_magic(read_pos, header.fsid)) {
+ dout(1) << "read_entry " << read_pos << " : bad header magic, end of journal" << endl;
+ make_writeable();
+ return false;
+ }
+
+ // body
+ bufferptr bp(h.len);
+ ::read(fd, bp.c_str(), h.len);
+
+ // footer
+ entry_header_t f;
+ ::read(fd, &f, sizeof(h));
+ if (!f.check_magic(read_pos, header.fsid) ||
+ h.epoch != f.epoch ||
+ h.len != f.len) {
+ dout(1) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << endl;
+ make_writeable();
+ return false;
+ }
+
+
+ // yay!
+ dout(1) << "read_entry " << read_pos << " : "
+ << " " << h.len << " bytes"
+ << " epoch " << h.epoch
+ << endl;
+
+ bl.push_back(bp);
+ epoch = h.epoch;
+
+ read_pos += 2*sizeof(entry_header_t) + h.len;
+
+ return true;
+}
#include "Journal.h"
-
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
class FileJournal : public Journal {
public:
+ /** log header
+ * we allow 3 pointers:
+ * top/initial,
+ * one for an epoch boundary,
+ * and one for a wrap in the ring buffer/journal file.
+ * the epoch boundary one is useful only for speedier recovery in certain cases
+ * (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
+ */
struct header_t {
- epoch_t epoch1;
- off_t top1;
- epoch_t epoch2;
- off_t top2;
+ uint64_t fsid;
+ int num;
+ off_t wrap;
+ off_t max_size;
+ epoch_t epoch[3];
+ off_t offset[3];
+
+ header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
+
+ void clear() {
+ num = 0;
+ wrap = 0;
+ }
+ void pop() {
+ if (num >= 2 && offset[0] > offset[1])
+ wrap = 0; // we're eliminating a wrap
+ num--;
+ for (int i=0; i<num; i++) {
+ epoch[i] = epoch[i+1];
+ offset[i] = offset[i+1];
+ }
+ }
+ void push(epoch_t e, off_t o) {
+ assert(num < 3);
+ epoch[num] = e;
+ offset[num] = o;
+ num++;
+ }
} header;
+ struct entry_header_t {
+ uint64_t epoch;
+ uint64_t len;
+ uint64_t magic1;
+ uint64_t magic2;
+
+ void make_magic(off_t pos, uint64_t fsid) {
+ magic1 = pos;
+ magic2 = fsid ^ epoch ^ len;
+ }
+ bool check_magic(off_t pos, uint64_t fsid) {
+ return
+ magic1 == (uint64_t)pos &&
+ magic2 == (fsid ^ epoch ^ len);
+ }
+ };
+
private:
string fn;
- off_t max_size;
- off_t top; // byte of first entry chronologically
- off_t bottom; // byte where next entry goes
- off_t committing_to; // offset of epoch boundary, if we are committing
+ bool full;
+ off_t write_pos; // byte where next entry written goes
+ off_t queue_pos; // byte where next entry queued for write goes
+
+ off_t read_pos; //
int fd;
Cond write_cond;
bool write_stop;
+ void print_header();
+ void read_header();
void write_header();
void start_writer();
void stop_writer();
void write_thread_entry();
+ void make_writeable();
+
class Writer : public Thread {
FileJournal *journal;
public:
Writer(FileJournal *fj) : journal(fj) {}
void *entry() {
- journal->write_thread();
+ journal->write_thread_entry();
return 0;
}
} write_thread;
public:
- FileJournal(Ebofs *e, char *f, off_t sz) :
- Journal(e),
- fn(f), max_size(sz),
- top(0), bottom(0), committing_to(0),
+ FileJournal(Ebofs *e, char *f) :
+ Journal(e), fn(f),
+ full(false),
+ write_pos(0), queue_pos(0), read_pos(0),
fd(0),
- write_stop(false), write_thread(this)
- { }
+ write_stop(false), write_thread(this) { }
~FileJournal() {}
- void create();
- void open();
+ int create();
+ int open();
void close();
// writes
- void submit_entry(bufferlist& e, Context *oncommit); // submit an item
- void commit_epoch_start(); // mark epoch boundary
- void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+ bool submit_entry(bufferlist& e, Context *oncommit); // submit an item
+ void commit_epoch_start(); // mark epoch boundary
+ void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
+
+ bool read_entry(bufferlist& bl, epoch_t& e);
// reads
};
#ifndef __EBOFS_JOURNAL_H
#define __EBOFS_JOURNAL_H
+class Ebofs;
+
+#include "include/buffer.h"
+#include "include/Context.h"
class Journal {
+protected:
Ebofs *ebofs;
- public:
+public:
Journal(Ebofs *e) : ebofs(e) { }
virtual ~Journal() { }
- virtual void create() = 0;
- virtual void open() = 0;
+ virtual int create() = 0;
+ virtual int open() = 0;
virtual void close() = 0;
// writes
- virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
+ virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
virtual void commit_epoch_start() = 0; // mark epoch boundary
- virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
+ virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
+ virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
// reads/recovery
char *filename = args[0];
int seconds = atoi(args[1]);
int threads = atoi(args[2]);
+ if (!threads) threads = 1;
cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl;
// explicit tests
- if (1) {
+ if (0) {
// verify that clone() plays nice with partial writes
object_t oid(1,1);
bufferptr bp(10000);
struct ebofs_super {
- unsigned s_magic;
-
- unsigned epoch; // version of this superblock.
+ uint64_t s_magic;
+ uint64_t fsid;
+
+ epoch_t epoch; // version of this superblock.
- unsigned num_blocks; /* # blocks in filesystem */
+ uint64_t num_blocks; /* # blocks in filesystem */
// some basic stats, for kicks
- unsigned free_blocks; /* unused blocks */
- unsigned limbo_blocks; /* limbo blocks */
+ uint64_t free_blocks; /* unused blocks */
+ uint64_t limbo_blocks; /* limbo blocks */
//unsigned num_objects;
//unsigned num_fragmented;
using std::cout;
using std::endl;
+ list<Context*> ls;
if (finished.empty()) return;
- dout(10) << finished.size() << " contexts to finish with " << result << endl;
- for (std::list<Context*>::iterator it = finished.begin();
- it != finished.end();
+ ls.swap(finished); // swap out of place to avoid weird loops
+
+ dout(10) << ls.size() << " contexts to finish with " << result << endl;
+ for (std::list<Context*>::iterator it = ls.begin();
+ it != ls.end();
it++) {
Context *c = *it;
dout(10) << "---- " << c << endl;
off += len+1;
}
+// const char* (encode only, string compatible)
+inline void _encode(const char *s, bufferlist& bl)
+{
+ uint32_t len = strlen(s);
+ _encoderaw(len, bl);
+ bl.append(s, len+1);
+}
+
// bufferptr (encapsulated)
inline void _encode(bufferptr& bp, bufferlist& bl)
{
for (map<dirfrag_t,int>::iterator p = import_state.begin();
p != import_state.end();
p++) {
- CDir *dir = mdcache->get_dirfrag(p->first);
+ CDir *dir = mds->mdcache->get_dirfrag(p->first);
if (dir) {
dout(10) << "importing: (" << p->second << ") " << get_import_statename(p->second)
<< " " << p->first
<< " " << *dir
<< endl;
}
+ }
}
void Migrator::show_exporting()
p != export_state.end();
p++)
dout(10) << "exporting: (" << p->second << ") " << get_export_statename(p->second)
- << " " << p->first->get_dirfrag()
+ << " " << p->first->dirfrag()
<< " " << *p->first
<< endl;
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MMONELECTION_H
+#define __MMONELECTION_H
+
+#include "msg/Message.h"
+
+
+class MMonElection : public Message {
+public:
+ static const int OP_PROPOSE = 1;
+ static const int OP_ACK = 2;
+ static const int OP_NAK = 3;
+ static const int OP_VICTORY = 4;
+ static const char *get_opname(int o) {
+ switch (o) {
+ case OP_PROPOSE: return "propose";
+ case OP_ACK: return "ack";
+ case OP_NAK: return "nak";
+ case OP_VICTORY: return "victory";
+ default: assert(0); return 0;
+ }
+ }
+
+ int32_t op;
+ epoch_t epoch;
+
+ MMonElection() : Message(MSG_MON_ELECTION) {}
+ MMonElection(int o, epoch_t e) :
+ Message(MSG_MON_ELECTION),
+ op(o), epoch(e) {}
+
+ char *get_type_name() { return "election"; }
+ void print(ostream& out) {
+ out << "election(" << get_opname(op) << " " << epoch << ")";
+ }
+
+ void encode_payload() {
+ ::_encode(op, payload);
+ ::_encode(epoch, payload);
+ }
+ void decode_payload() {
+ int off = 0;
+ ::_decode(op, payload, off);
+ ::_decode(epoch, payload, off);
+ }
+
+};
+
+#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MMONELECTIONACK_H
-#define __MMONELECTIONACK_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionAck : public Message {
- public:
- MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {}
-
- virtual char *get_type_name() { return "election_ack"; }
-
- void encode_payload() {}
- void decode_payload() {}
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MMONELECTIONPROPOSE_H
-#define __MMONELECTIONPROPOSE_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionPropose : public Message {
- public:
- MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {}
-
- virtual char *get_type_name() { return "election_propose"; }
-
- void encode_payload() {}
- void decode_payload() {}
-
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MMONELECTIONVICTORY_H
-#define __MMONELECTIONVICTORY_H
-
-#include "msg/Message.h"
-
-
-class MMonElectionVictory : public Message {
- public:
- //set<int> active_set;
-
- MMonElectionVictory(/*set<int>& as*/) : Message(MSG_MON_ELECTION_VICTORY)//,
- //active_set(as)
- {}
-
- virtual char *get_type_name() { return "election_victory"; }
-
- void encode_payload() {
- //::_encode(active_set, payload);
- }
- void decode_payload() {
- //int off = 0;
- //::_decode(active_set, payload, off);
- }
-};
-
-#endif
#define __MMONPAXOS_H
#include "msg/Message.h"
+#include "mon/mon_types.h"
class MMonPaxos : public Message {
public:
// op types
- const static int OP_COLLECT = 1; // proposer: propose round
- const static int OP_LAST = 2; // voter: accept proposed round
- const static int OP_BEGIN = 4; // proposer: value proposed for this round
- const static int OP_ACCEPT = 5; // voter: accept propsed value
- const static int OP_COMMIT = 7; // proposer: notify learners of agreed value
+ const static int OP_COLLECT = 1; // proposer: propose round
+ const static int OP_LAST = 2; // voter: accept proposed round
+ const static int OP_BEGIN = 3; // proposer: value proposed for this round
+ const static int OP_ACCEPT = 4; // voter: accept propsed value
+ const static int OP_COMMIT = 5; // proposer: notify learners of agreed value
+ const static int OP_LEASE = 6; // leader: extend peon lease
+ const static int OP_LEASE_ACK = 7; // peon: lease ack
const static char *get_opname(int op) {
switch (op) {
case OP_COLLECT: return "collect";
case OP_BEGIN: return "begin";
case OP_ACCEPT: return "accept";
case OP_COMMIT: return "commit";
+ case OP_LEASE: return "lease";
+ case OP_LEASE_ACK: return "lease_ack";
default: assert(0); return 0;
}
}
- // which state machine?
- int op;
- int machine_id;
-
+ epoch_t epoch; // monitor epoch
+ int op; // paxos op
+ int machine_id; // which state machine?
+
version_t last_committed; // i've committed to
version_t pn_from; // i promise to accept after
version_t pn; // with with proposal
version_t old_accepted_pn; // previous pn, if we are a LAST with an uncommitted value
+ utime_t lease_expire;
map<version_t,bufferlist> values;
MMonPaxos() : Message(MSG_MON_PAXOS) {}
- MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS),
- op(o), machine_id(mid),
- last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
+ MMonPaxos(epoch_t e, int o, int mid) :
+ Message(MSG_MON_PAXOS),
+ epoch(e),
+ op(o), machine_id(mid),
+ last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { }
virtual char *get_type_name() { return "paxos"; }
void print(ostream& out) {
- out << "paxos(m" << machine_id
+ out << "paxos(" << get_paxos_name(machine_id)
<< " " << get_opname(op) << " lc " << last_committed
<< " pn " << pn << " opn " << old_accepted_pn
<< ")";
}
void encode_payload() {
+ ::_encode(epoch, payload);
::_encode(op, payload);
::_encode(machine_id, payload);
::_encode(last_committed, payload);
::_encode(pn_from, payload);
::_encode(pn, payload);
::_encode(old_accepted_pn, payload);
+ ::_encode(lease_expire, payload);
::_encode(values, payload);
}
void decode_payload() {
int off = 0;
+ ::_decode(epoch, payload, off);
::_decode(op, payload, off);
::_decode(machine_id, payload, off);
::_decode(last_committed, payload, off);
::_decode(pn_from, payload, off);
::_decode(pn, payload, off);
::_decode(old_accepted_pn, payload, off);
+ ::_decode(lease_expire, payload, off);
::_decode(values, payload, off);
}
};
#include "Monitor.h"
#include "MDSMonitor.h"
#include "OSDMonitor.h"
+#include "MonitorStore.h"
#include "messages/MClientMount.h"
#include "messages/MClientUnmount.h"
+bool ClientMonitor::update_from_paxos()
+{
+ assert(paxos->is_active());
+ version_t paxosv = paxos->get_version();
+ dout(10) << "update_from_paxos paxosv " << paxosv
+ << ", my v " << client_map.version << endl;
+
+ assert(paxosv >= client_map.version);
+ while (paxosv > client_map.version) {
+ bufferlist bl;
+ bool success = paxos->read(client_map.version+1, bl);
+ if (success) {
+ dout(10) << "update_from_paxos applying incremental " << client_map.version+1 << endl;
+ Incremental inc;
+ int off = 0;
+ inc._decode(bl, off);
+ client_map.apply_incremental(inc);
+
+ } else {
+ dout(10) << "update_from_paxos couldn't read incremental " << client_map.version+1 << endl;
+ return false;
+ }
+
+ // save latest
+ bl.clear();
+ client_map._encode(bl);
+ mon->store->put_bl_ss(bl, "clientmap", "latest");
+
+ // prepare next inc
+ prepare_pending();
+ }
+
+ return true;
+}
+
+void ClientMonitor::prepare_pending()
+{
+ pending_inc = Incremental();
+ pending_inc.version = client_map.version + 1;
+ pending_inc.next_client = client_map.next_client;
+ dout(10) << "prepare_pending v " << pending_inc.version
+ << ", next is " << pending_inc.next_client
+ << endl;
+}
+
+void ClientMonitor::propose_pending()
+{
+ dout(10) << "propose_pending v " << pending_inc.version
+ << ", next is " << pending_inc.next_client
+ << endl;
+
+ // apply to paxos
+ assert(paxos->get_version() + 1 == pending_inc.version);
+ bufferlist bl;
+ pending_inc._encode(bl);
+ paxos->propose_new_value(bl, new C_Commit(this));
+}
+
+
+// -------
+
-void ClientMonitor::dispatch(Message *m)
+bool ClientMonitor::preprocess_update(Message *m)
{
+ dout(10) << "preprocess_update " << *m << " from " << m->get_source_inst() << endl;
+
switch (m->get_type()) {
+ case MSG_CLIENT_MOUNT:
+ {
+ // already mounted?
+ entity_addr_t addr = m->get_source_addr();
+ if (client_map.addr_client.count(addr)) {
+ int client = client_map.addr_client[addr];
+ dout(7) << " client" << client << " already mounted" << endl;
+ _mounted(client, m);
+ return true;
+ }
+ }
+ return false;
+
+ case MSG_CLIENT_UNMOUNT:
+ {
+ // already unmounted?
+ int client = m->get_source().num();
+ if (client_map.client_addr.count(client) == 0) {
+ dout(7) << " client" << client << " not mounted" << endl;
+ _unmounted(m);
+ return true;
+ }
+ }
+ return false;
+
+ default:
+ assert(0);
+ delete m;
+ return true;
+ }
+}
+
+void ClientMonitor::prepare_update(Message *m)
+{
+ dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl;
+
+ int client = m->get_source().num();
+ entity_addr_t addr = m->get_source_addr();
+
+ switch (m->get_type()) {
case MSG_CLIENT_MOUNT:
- handle_client_mount((MClientMount*)m);
+ {
+ // choose a client id
+ if (client < 0 ||
+ (client_map.client_addr.count(client) &&
+ client_map.client_addr[client] != addr)) {
+ client = pending_inc.next_client;
+ dout(10) << "mount: assigned client" << client << " to " << addr << endl;
+ } else {
+ dout(10) << "mount: client" << client << " requested by " << addr << endl;
+ }
+
+ pending_inc.add_mount(client, addr);
+ pending_commit.push_back(new C_Mounted(this, client, m));
+ }
break;
case MSG_CLIENT_UNMOUNT:
- handle_client_unmount((MClientUnmount*)m);
+ {
+ assert(client_map.client_addr.count(client));
+
+ pending_inc.add_unmount(client);
+ pending_commit.push_back(new C_Unmounted(this, m));
+ }
break;
-
-
+
default:
assert(0);
- }
+ delete m;
+ }
}
-void ClientMonitor::handle_client_mount(MClientMount *m)
+
+// MOUNT
+
+
+void ClientMonitor::_mounted(int client, Message *m)
{
- dout(7) << "client_mount from " << m->get_source_inst() << endl;
- assert(m->get_source().is_client());
- int from = m->get_source().num();
-
- // choose a client id
- if (from < 0 ||
- (client_map.count(from) &&
- client_map[from] != m->get_source_addr())) {
- from = num_clients++;
- dout(10) << "client_mount assigned client" << from << endl;
- }
-
- client_map[from] = m->get_source_addr();
-
- // reply with latest mds map
entity_inst_t to = m->get_source_inst();
- to.name = MSG_ADDR_CLIENT(from);
+ to.name = MSG_ADDR_CLIENT(client);
+
+ dout(10) << "_mounted client" << client << " at " << to << endl;
+
+ // reply with latest mds, osd maps
mon->mdsmon->send_latest(to);
mon->osdmon->send_latest(to);
+
delete m;
}
-void ClientMonitor::handle_client_unmount(MClientUnmount *m)
+void ClientMonitor::_unmounted(Message *m)
{
- dout(7) << "client_unmount from " << m->get_source()
- << " at " << m->get_source_inst() << endl;
- assert(m->get_source().is_client());
- int from = m->get_source().num();
-
- if (client_map.count(from)) {
- client_map.erase(from);
-
- if (client_map.empty() &&
- g_conf.mds_shutdown_on_last_unmount) {
- dout(1) << "last client unmounted" << endl;
- mon->do_stop();
- }
- }
-
- // reply with (same) unmount message to ack
+ dout(10) << "_unmounted " << m->get_source() << endl;
+
+ // reply with (same) unmount message
mon->messenger->send_message(m, m->get_source_inst());
-}
+ // auto-shutdown?
+ if (update_from_paxos() &&
+ mon->is_leader() &&
+ client_map.version > 1 &&
+ client_map.client_addr.empty() &&
+ g_conf.mds_shutdown_on_last_unmount) {
+ dout(1) << "last client unmounted" << endl;
+ mon->do_stop();
+ }
+}
-/*
-void ClientMonitor::handle_mds_shutdown(Message *m)
+void ClientMonitor::_commit(int r)
{
- assert(m->get_source().is_mds());
- int from = m->get_source().num();
-
- mdsmap.mds_inst.erase(from);
- mdsmap.all_mds.erase(from);
+ if (r >= 0) {
+ dout(10) << "_commit success" << endl;
+ finish_contexts(pending_commit);
+ } else {
+ dout(10) << "_commit failed" << endl;
+ }
- dout(7) << "mds_shutdown from " << m->get_source()
- << ", still have " << mdsmap.all_mds
- << endl;
-
- // tell someone?
- // fixme
-
- delete m;
+ finish_contexts(pending_commit, r);
}
-*/
-
/*
void ClientMonitor::bcast_latest_mds()
{
send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p));
}
}
-
*/
+
+
+void ClientMonitor::create_initial()
+{
+ dout(10) << "create_initial" << endl;
+
+ if (!mon->is_leader()) return;
+ if (paxos->get_version() > 0) return;
+
+ if (paxos->is_writeable()) {
+ dout(1) << "create_initial -- creating initial map" << endl;
+ prepare_pending();
+ propose_pending();
+ } else {
+ dout(1) << "create_initial -- waiting for writeable" << endl;
+ paxos->wait_for_writeable(new C_CreateInitial(this));
+ }
+}
+
+
#include "mds/MDSMap.h"
+#include "PaxosService.h"
+
class Monitor;
+class Paxos;
+
+class ClientMonitor : public PaxosService {
+public:
+
+ struct Incremental {
+ version_t version;
+ uint32_t next_client;
+ map<int32_t, entity_addr_t> mount;
+ set<int32_t> unmount;
+
+ Incremental() : version(0), next_client() {}
+
+ bool is_empty() { return mount.empty() && unmount.empty(); }
+ void add_mount(uint32_t client, entity_addr_t addr) {
+ next_client = MAX(next_client, client+1);
+ mount[client] = addr;
+ }
+ void add_unmount(uint32_t client) {
+ assert(client < next_client);
+ if (mount.count(client))
+ mount.erase(client);
+ else
+ unmount.insert(client);
+ }
+
+ void _encode(bufferlist &bl) {
+ ::_encode(version, bl);
+ ::_encode(next_client, bl);
+ ::_encode(mount, bl);
+ ::_encode(unmount, bl);
+ }
+ void _decode(bufferlist &bl, int& off) {
+ ::_decode(version, bl, off);
+ ::_decode(next_client, bl, off);
+ ::_decode(mount, bl, off);
+ ::_decode(unmount, bl, off);
+ }
+ };
+
+ struct Map {
+ version_t version;
+ uint32_t next_client;
+ map<uint32_t,entity_addr_t> client_addr;
+ hash_map<entity_addr_t,uint32_t> addr_client;
+
+ Map() : version(0), next_client(0) {}
+
+ void reverse() {
+ addr_client.clear();
+ for (map<uint32_t,entity_addr_t>::iterator p = client_addr.begin();
+ p != client_addr.end();
+ ++p) {
+ addr_client[p->second] = p->first;
+ }
+ }
+ void apply_incremental(Incremental &inc) {
+ assert(inc.version == version+1);
+ version = inc.version;
+ next_client = inc.next_client;
+ for (map<int32_t, entity_addr_t>::iterator p = inc.mount.begin();
+ p != inc.mount.end();
+ ++p) {
+ client_addr[p->first] = p->second;
+ addr_client[p->second] = p->first;
+ }
+
+ for (set<int32_t>::iterator p = inc.unmount.begin();
+ p != inc.unmount.end();
+ ++p) {
+ assert(client_addr.count(*p));
+ addr_client.erase(client_addr[*p]);
+ client_addr.erase(*p);
+ }
+ }
-class ClientMonitor : public Dispatcher {
- Monitor *mon;
- Messenger *messenger;
- Mutex &lock;
+ void _encode(bufferlist &bl) {
+ ::_encode(version, bl);
+ ::_encode(next_client, bl);
+ ::_encode(client_addr, bl);
+ }
+ void _decode(bufferlist &bl, int& off) {
+ ::_decode(version, bl, off);
+ ::_decode(next_client, bl, off);
+ ::_decode(client_addr, bl, off);
+ reverse();
+ }
+ };
- private:
- int num_clients;
- map<int,entity_addr_t> client_map;
+ class C_CreateInitial : public Context {
+ ClientMonitor *cmon;
+ public:
+ C_CreateInitial(ClientMonitor *cm) : cmon(cm) {}
+ void finish(int r) {
+ cmon->create_initial();
+ }
+ };
- void bcast_latest_mds();
+ class C_Mounted : public Context {
+ ClientMonitor *cmon;
+ int client;
+ Message *m;
+ public:
+ C_Mounted(ClientMonitor *cm, int c, Message *m_) :
+ cmon(cm), client(c), m(m_) {}
+ void finish(int r) {
+ if (r >= 0)
+ cmon->_mounted(client, m);
+ else
+ cmon->dispatch(m);
+ }
+ };
- //void accept_pending(); // accept pending, new map.
- //void send_incremental(epoch_t since, msg_addr_t dest);
+ class C_Unmounted : public Context {
+ ClientMonitor *cmon;
+ Message *m;
+ public:
+ C_Unmounted(ClientMonitor *cm, Message *m_) :
+ cmon(cm), m(m_) {}
+ void finish(int r) {
+ if (r >= 0)
+ cmon->_unmounted(m);
+ else
+ cmon->dispatch(m);
+ }
+ };
- void handle_client_mount(class MClientMount *m);
- void handle_client_unmount(class MClientUnmount *m);
+ class C_Commit : public Context {
+ ClientMonitor *cmon;
+ public:
+ C_Commit(ClientMonitor *cm) :
+ cmon(cm) {}
+ void finish(int r) {
+ cmon->_commit(r);
+ }
+ };
+private:
+ Map client_map;
+ list<Message*> waiting_for_active;
+
+ // leader
+ Incremental pending_inc;
+ list<Context*> pending_commit; // contributers to pending_inc
+
+ void create_initial();
+ bool update_from_paxos();
+ void prepare_pending(); // prepare a new pending
+ void propose_pending(); // propose pending update to peers
+
+ void _mounted(int c, Message *m);
+ void _unmounted(Message *m);
+ void _commit(int r);
+
+ void handle_query(Message *m);
+ bool preprocess_update(Message *m); // true if processed.
+ void prepare_update(Message *m);
+
+
public:
- ClientMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l),
- num_clients(0) { }
+ ClientMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
- void dispatch(Message *m);
- void tick(); // check state, take actions
+ //void tick(); // check state, take actions
+
};
#endif
#include "Monitor.h"
#include "common/Timer.h"
-
-#include "messages/MMonElectionPropose.h"
-#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionVictory.h"
+#include "MonitorStore.h"
+#include "messages/MMonElection.h"
#include "config.h"
#undef dout
-#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") "
-void Elector::start()
+void Elector::init()
{
- dout(5) << "start -- can i be leader?" << endl;
+ epoch = mon->store->get_int("mon_epoch");
+ if (!epoch)
+ epoch = 1;
+ dout(1) << "init, last seen epoch " << epoch << endl;
+}
+void Elector::shutdown()
+{
+ if (expire_event)
+ mon->timer.cancel_event(expire_event);
+}
+
+void Elector::bump_epoch(epoch_t e)
+{
+ dout(10) << "bump_epoch " << epoch << " to " << e << endl;
+ assert(epoch < e);
+ epoch = e;
+ mon->store->put_int(epoch, "mon_epoch");
+
+ // clear up some state
+ electing_me = false;
+ acked_me.clear();
leader_acked = -1;
+}
+
+void Elector::start()
+{
+ dout(5) << "start -- can i be leader?" << endl;
+
// start by trying to elect me
+ if (epoch % 2 == 0)
+ bump_epoch(epoch+1); // odd == election cycle
start_stamp = g_clock.now();
- acked_me.clear();
- acked_me.insert(whoami);
electing_me = true;
+ acked_me.insert(whoami);
// bcast to everyone else
for (int i=0; i<mon->monmap->num_mon; ++i) {
if (i == whoami) continue;
- mon->messenger->send_message(new MMonElectionPropose,
+ mon->messenger->send_message(new MMonElection(MMonElection::OP_PROPOSE, epoch),
mon->monmap->get_inst(i));
}
dout(5) << "defer to " << who << endl;
if (electing_me) {
+ // drop out
acked_me.clear();
electing_me = false;
}
// ack them
leader_acked = who;
ack_stamp = g_clock.now();
- mon->messenger->send_message(new MMonElectionAck,
+ mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch),
mon->monmap->get_inst(who));
// set a timer
}
-class C_Mon_ElectionExpire : public Context {
- Elector *elector;
-public:
- C_Mon_ElectionExpire(Elector *e) : elector(e) { }
- void finish(int r) {
- elector->expire();
- }
-};
-
void Elector::reset_timer(double plus)
{
// set the timer
cancel_timer();
- expire_event = new C_Mon_ElectionExpire(this);
- g_timer.add_event_after(g_conf.mon_lease + plus,
- expire_event);
+ expire_event = new C_ElectionExpire(this);
+ mon->timer.add_event_after(g_conf.mon_lease + plus,
+ expire_event);
}
void Elector::cancel_timer()
{
- if (expire_event)
- g_timer.cancel_event(expire_event);
+ if (expire_event) {
+ mon->timer.cancel_event(expire_event);
+ expire_event = 0;
+ }
}
void Elector::expire()
{
leader_acked = -1;
electing_me = false;
-
+ set<int> quorum = acked_me;
+
cancel_timer();
-
+
+ assert(epoch % 2 == 1); // election
+ bump_epoch(epoch+1); // is over!
+
// tell everyone
- for (int i=0; i<mon->monmap->num_mon; ++i) {
- if (i == whoami) continue;
- mon->messenger->send_message(new MMonElectionVictory,
- mon->monmap->get_inst(i));
+ for (set<int>::iterator p = quorum.begin();
+ p != quorum.end();
+ ++p) {
+ if (*p == whoami) continue;
+ mon->messenger->send_message(new MMonElection(MMonElection::OP_VICTORY, epoch),
+ mon->monmap->get_inst(*p));
}
// tell monitor
- mon->win_election(acked_me);
+ mon->win_election(epoch, quorum);
}
-void Elector::handle_propose(MMonElectionPropose *m)
+void Elector::handle_propose(MMonElection *m)
{
dout(5) << "handle_propose from " << m->get_source() << endl;
int from = m->get_source().num();
- if (from > whoami) {
- if (leader_acked >= 0 && // we already acked someone
- leader_acked < from) { // who would win over them
+ assert(m->epoch % 2 == 1); // election
+ if (m->epoch > epoch)
+ bump_epoch(m->epoch);
+
+ if (whoami < from) {
+ // i would win over them.
+ if (leader_acked >= 0) { // we already acked someone
+ assert(leader_acked < from); // and they still win, of course
dout(5) << "no, we already acked " << leader_acked << endl;
} else {
// wait, i should win!
delete m;
}
-void Elector::handle_ack(MMonElectionAck *m)
+void Elector::handle_ack(MMonElection *m)
{
dout(5) << "handle_ack from " << m->get_source() << endl;
int from = m->get_source().num();
+ assert(m->epoch % 2 == 1); // election
+ if (m->epoch > epoch) {
+ dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << endl;
+ bump_epoch(m->epoch);
+ start();
+ delete m;
+ return;
+ }
+ assert(m->epoch == epoch);
+
if (electing_me) {
// thanks
acked_me.insert(from);
}
} else {
// ignore, i'm deferring already.
+ assert(leader_acked >= 0);
}
delete m;
}
-void Elector::handle_victory(MMonElectionVictory *m)
+
+void Elector::handle_victory(MMonElection *m)
{
dout(5) << "handle_victory from " << m->get_source() << endl;
int from = m->get_source().num();
+
+ assert(from < whoami);
+ assert(m->epoch % 2 == 0);
+ assert(m->epoch == epoch + 1); // i should have seen this election if i'm getting the victory.
+ bump_epoch(m->epoch);
- if (from < whoami) {
- // ok, fine, they win
- mon->lose_election(from);
-
- // cancel my timer
- cancel_timer();
- } else {
- // no, that makes no sense, i should win. start over!
- start();
- }
+ // they win
+ mon->lose_election(epoch, from);
+
+ // cancel my timer
+ cancel_timer();
}
void Elector::dispatch(Message *m)
{
switch (m->get_type()) {
- case MSG_MON_ELECTION_ACK:
- handle_ack((MMonElectionAck*)m);
- break;
-
- case MSG_MON_ELECTION_PROPOSE:
- handle_propose((MMonElectionPropose*)m);
- break;
-
- case MSG_MON_ELECTION_VICTORY:
- handle_victory((MMonElectionVictory*)m);
+
+ case MSG_MON_ELECTION:
+ {
+ MMonElection *em = (MMonElection*)m;
+
+ if (em->epoch < epoch) {
+ dout(5) << "old epoch, dropping" << endl;
+ delete em;
+ break;
+ }
+
+ switch (em->op) {
+ case MMonElection::OP_ACK:
+ handle_ack(em);
+ break;
+ case MMonElection::OP_PROPOSE:
+ handle_propose(em);
+ break;
+ case MMonElection::OP_VICTORY:
+ handle_victory(em);
+ break;
+ default:
+ assert(0);
+ }
+ }
break;
- default:
+ default:
assert(0);
}
}
void reset_timer(double plus=0.0);
void cancel_timer();
+ epoch_t epoch; // latest epoch we've seen. odd == election, even == stable,
+
// electing me
bool electing_me;
utime_t start_stamp;
int leader_acked; // who i've acked
utime_t ack_stamp; // and when
- public:
-
+ void bump_epoch(epoch_t e=0); // i just saw a larger epoch
+
+ class C_ElectionExpire : public Context {
+ Elector *elector;
+ public:
+ C_ElectionExpire(Elector *e) : elector(e) { }
+ void finish(int r) {
+ elector->expire();
+ }
+ };
+
void start(); // start an electing me
void defer(int who);
void expire(); // timer goes off
void victory();
- void handle_propose(class MMonElectionPropose *m);
- void handle_ack(class MMonElectionAck *m);
- void handle_victory(class MMonElectionVictory *m);
-
+ void handle_propose(class MMonElection *m);
+ void handle_ack(class MMonElection *m);
+ void handle_victory(class MMonElection *m);
public:
- Elector(Monitor *m, int w) : mon(m), whoami(w) {
- // initialize all those values!
- // ...
- }
+ Elector(Monitor *m, int w) : mon(m), whoami(w),
+ expire_event(0),
+ epoch(0),
+ electing_me(false),
+ leader_acked(-1) { }
+
+ void init();
+ void shutdown();
void dispatch(Message *m);
+
+ void call_election() {
+ start();
+ }
+
};
/********* MDS map **************/
+ class C_RetryMessage : public Context {
+ Dispatcher *svc;
+ Message *m;
+ public:
+ C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {}
+ void finish(int r) {
+ svc->dispatch(m);
+ }
+ };
+
void MDSMonitor::dispatch(Message *m)
{
+ if (mon->is_peon()) {
+ dout(1) << "peon, fw to leader" << endl;
+ mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
+ return;
+ }
+ if (mon->is_starting()) {
+ dout(1) << "starting, waiting" << endl;
+ waiting_for_active.push_back(new C_RetryMessage(this, m));
+ return;
+ }
+
switch (m->get_type()) {
case MSG_MDS_BEACON:
load_map();
}
}
+
+ finish_contexts(waiting_for_active);
}
private:
bufferlist encoded_map;
+ list<Context*> waiting_for_active;
+
//map<epoch_t, bufferlist> inc_maps;
//MDSMap::Incremental pending_inc;
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
+ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+ // vim: ts=8 sw=2 smarttab
+ /*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+ // TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer
+
+ #include "Monitor.h"
+
+ #include "osd/OSDMap.h"
+
+ #include "MonitorStore.h"
+
+ #include "msg/Message.h"
+ #include "msg/Messenger.h"
+
+ #include "messages/MPing.h"
+ #include "messages/MPingAck.h"
+ #include "messages/MGenericMessage.h"
+ #include "messages/MMonCommand.h"
+ #include "messages/MMonCommandAck.h"
+
+ #include "messages/MMonPaxos.h"
+
+ #include "common/Timer.h"
+ #include "common/Clock.h"
+
+ #include "OSDMonitor.h"
+ #include "MDSMonitor.h"
+ #include "ClientMonitor.h"
+
+ #include "config.h"
+ #undef dout
+ #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
+ #define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
+
+
-// TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer
+ void Monitor::init()
+ {
+ lock.Lock();
-#include "Monitor.h"
-
-#include "osd/OSDMap.h"
-
-#include "MonitorStore.h"
-
-#include "msg/Message.h"
-#include "msg/Messenger.h"
-
-#include "messages/MPing.h"
-#include "messages/MPingAck.h"
-#include "messages/MGenericMessage.h"
-#include "messages/MMonCommand.h"
-#include "messages/MMonCommandAck.h"
-
-#include "messages/MMonPaxos.h"
-
-#include "common/Timer.h"
-#include "common/Clock.h"
-
-#include "OSDMonitor.h"
-#include "MDSMonitor.h"
-#include "ClientMonitor.h"
-
-#include "config.h"
-#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
-#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " "
-
-
-
-void Monitor::init()
-{
- lock.Lock();
-
- dout(1) << "init" << endl;
-
- // store
- char s[80];
- sprintf(s, "mondata/mon%d", whoami);
- store = new MonitorStore(s);
-
- if (g_conf.mkfs)
- store->mkfs();
-
- store->mount();
-
- // create
- osdmon = new OSDMonitor(this, messenger, lock);
- mdsmon = new MDSMonitor(this, messenger, lock);
- clientmon = new ClientMonitor(this, messenger, lock);
-
- // i'm ready!
- messenger->set_dispatcher(this);
-
- // start ticker
- reset_tick();
-
- // call election?
- if (monmap->num_mon > 1) {
- assert(monmap->num_mon != 2);
- call_election();
- } else {
- // we're standalone.
- set<int> q;
- q.insert(whoami);
- win_election(q);
- }
-
- lock.Unlock();
-}
-
-void Monitor::shutdown()
-{
- dout(1) << "shutdown" << endl;
-
- // cancel all events
- cancel_tick();
- timer.cancel_all();
- timer.join();
-
- // stop osds.
- for (set<int>::iterator it = osdmon->osdmap.get_osds().begin();
- it != osdmon->osdmap.get_osds().end();
- it++) {
- if (osdmon->osdmap.is_down(*it)) continue;
- dout(10) << "sending shutdown to osd" << *it << endl;
- messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
- osdmon->osdmap.get_inst(*it));
- }
- osdmon->mark_all_down();
-
- // monitors too.
- for (int i=0; i<monmap->num_mon; i++)
- if (i != whoami)
- messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
- monmap->get_inst(i));
-
- // unmount my local storage
- if (store)
- delete store;
-
- // clean up
- if (monmap) delete monmap;
- if (osdmon) delete osdmon;
- if (mdsmon) delete mdsmon;
- if (clientmon) delete clientmon;
-
- // die.
- messenger->shutdown();
- delete messenger;
-}
-
-
-void Monitor::call_election()
-{
- if (monmap->num_mon == 1) return;
-
- dout(10) << "call_election" << endl;
- state = STATE_STARTING;
-
- elector.start();
-
- osdmon->election_starting();
- //mdsmon->election_starting();
-}
-
-void Monitor::win_election(set<int>& active)
-{
- state = STATE_LEADER;
- leader = whoami;
- quorum = active;
- dout(10) << "win_election, quorum is " << quorum << endl;
-
- // init
- osdmon->election_finished();
- mdsmon->election_finished();
-
- // init paxos
- test_paxos.leader_start();
-}
-
-void Monitor::lose_election(int l)
-{
- state = STATE_PEON;
- leader = l;
- dout(10) << "lose_election, leader is mon" << leader << endl;
-}
-
-
-void Monitor::handle_command(MMonCommand *m)
-{
- dout(0) << "handle_command " << *m << endl;
-
- int r = -1;
- string rs = "unrecognized command";
-
- if (!m->cmd.empty()) {
- if (m->cmd[0] == "stop") {
- r = 0;
- rs = "stopping";
- do_stop();
- }
- else if (m->cmd[0] == "mds") {
- mdsmon->handle_command(m, r, rs);
- }
- else if (m->cmd[0] == "osd") {
-
- }
- }
-
- // reply
- messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst());
- delete m;
-}
-
-
-void Monitor::do_stop()
-{
- dout(0) << "do_stop -- shutting down" << endl;
- mdsmon->do_stop();
-}
-
-
-void Monitor::dispatch(Message *m)
-{
- lock.Lock();
- {
- switch (m->get_type()) {
-
- // misc
- case MSG_PING_ACK:
- handle_ping_ack((MPingAck*)m);
- break;
-
- case MSG_SHUTDOWN:
- assert(m->get_source().is_osd());
- osdmon->dispatch(m);
+ dout(1) << "init" << endl;
+
+ // store
+ char s[80];
+ sprintf(s, "mondata/mon%d", whoami);
+ store = new MonitorStore(s);
+
+ if (g_conf.mkfs)
+ store->mkfs();
+
+ store->mount();
+
+ // create
+ osdmon = new OSDMonitor(this, messenger, lock);
+ mdsmon = new MDSMonitor(this, messenger, lock);
+ clientmon = new ClientMonitor(this, &paxos_clientmap);
+
+ // init paxos
+ paxos_test.init();
+ paxos_osdmap.init();
+ paxos_mdsmap.init();
+ paxos_clientmap.init();
+
+ // i'm ready!
+ messenger->set_dispatcher(this);
+
+ // start ticker
+ reset_tick();
+
+ // call election?
+ if (monmap->num_mon > 1) {
+ assert(monmap->num_mon != 2);
+ call_election();
+ } else {
+ // we're standalone.
+ set<int> q;
+ q.insert(whoami);
+ win_election(1, q);
+ }
+
+ lock.Unlock();
+ }
+
+ void Monitor::shutdown()
+ {
+ dout(1) << "shutdown" << endl;
+
+ elector.shutdown();
+
+ // cancel all events
+ cancel_tick();
+ timer.cancel_all();
+ timer.join();
+
+ // stop osds.
+ for (set<int>::iterator it = osdmon->osdmap.get_osds().begin();
+ it != osdmon->osdmap.get_osds().end();
+ it++) {
+ if (osdmon->osdmap.is_down(*it)) continue;
+ dout(10) << "sending shutdown to osd" << *it << endl;
+ messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
+ osdmon->osdmap.get_inst(*it));
+ }
+ osdmon->mark_all_down();
+
+ // monitors too.
+ for (int i=0; i<monmap->num_mon; i++)
+ if (i != whoami)
+ messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
+ monmap->get_inst(i));
+
+ // unmount my local storage
+ if (store)
+ delete store;
+
+ // clean up
+ if (monmap) delete monmap;
+ if (osdmon) delete osdmon;
+ if (mdsmon) delete mdsmon;
+ if (clientmon) delete clientmon;
+
+ // die.
+ messenger->shutdown();
+ delete messenger;
+ }
+
+
+ void Monitor::call_election()
+ {
+ if (monmap->num_mon == 1) return;
+
+ dout(10) << "call_election" << endl;
+ state = STATE_STARTING;
+
+ elector.call_election();
+
+ osdmon->election_starting();
+ //mdsmon->election_starting();
+ }
+
+ void Monitor::win_election(epoch_t epoch, set<int>& active)
+ {
+ state = STATE_LEADER;
+ leader = whoami;
+ mon_epoch = epoch;
+ quorum = active;
+ dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl;
+
+ // init paxos
+ paxos_test.leader_init();
+ paxos_mdsmap.leader_init();
+ paxos_osdmap.leader_init();
+ paxos_clientmap.leader_init();
+
+ // init
+ osdmon->election_finished();
+ mdsmon->election_finished();
+ clientmon->election_finished();
+ }
+
+ void Monitor::lose_election(epoch_t epoch, int l)
+ {
+ state = STATE_PEON;
+ mon_epoch = epoch;
+ leader = l;
+ dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl;
+
+ // init paxos
+ paxos_test.peon_init();
+ paxos_mdsmap.peon_init();
+ paxos_osdmap.peon_init();
+ paxos_clientmap.peon_init();
+ }
+
+
+ void Monitor::handle_command(MMonCommand *m)
+ {
+ dout(0) << "handle_command " << *m << endl;
+
+ int r = -1;
+ string rs = "unrecognized command";
+
+ if (!m->cmd.empty()) {
+ if (m->cmd[0] == "stop") {
+ r = 0;
+ rs = "stopping";
+ do_stop();
+ }
+ else if (m->cmd[0] == "mds") {
+ mdsmon->handle_command(m, r, rs);
+ }
+ else if (m->cmd[0] == "osd") {
+
+ }
+ }
+
+ // reply
+ messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst());
+ delete m;
+ }
+
+
+ void Monitor::do_stop()
+ {
+ dout(0) << "do_stop -- shutting down" << endl;
+ mdsmon->do_stop();
+ }
+
+
+ void Monitor::dispatch(Message *m)
+ {
+ lock.Lock();
+ {
+ switch (m->get_type()) {
+
+ // misc
+ case MSG_PING_ACK:
+ handle_ping_ack((MPingAck*)m);
+ break;
+
+ case MSG_SHUTDOWN:
+ if (m->get_source().is_osd())
+ osdmon->dispatch(m);
+ else
+ handle_shutdown(m);
+
break;
case MSG_MON_COMMAND:
// hackish: did all mds's shut down?
if (g_conf.mon_stop_with_last_mds &&
- mdsmon->mdsmap.get_num_up_or_failed_mds() == 0)
+ mdsmon->mdsmap.get_num_up_or_failed_mds() == 0 &&
+ is_leader())
shutdown();
break;
// paxos
case MSG_MON_PAXOS:
- // send it to the right paxos instance
- switch (((MMonPaxos*)m)->machine_id) {
- case PAXOS_TEST:
- test_paxos.dispatch(m);
- break;
- case PAXOS_OSDMAP:
- //...
-
- default:
- assert(0);
+ {
+ MMonPaxos *pm = (MMonPaxos*)m;
+
+ // sanitize
+ if (pm->epoch > mon_epoch)
+ assert(0); //call_election(); // wtf
+ if (pm->epoch != mon_epoch) {
+ delete pm;
+ break;
+ }
+
+ // send it to the right paxos instance
+ switch (pm->machine_id) {
+ case PAXOS_TEST:
+ paxos_test.dispatch(m);
+ break;
+ case PAXOS_OSDMAP:
+ paxos_osdmap.dispatch(m);
+ break;
+ case PAXOS_MDSMAP:
+ paxos_mdsmap.dispatch(m);
+ break;
+ case PAXOS_CLIENTMAP:
+ paxos_clientmap.dispatch(m);
+ break;
+ default:
+ assert(0);
+ }
}
break;
// elector messages
- case MSG_MON_ELECTION_PROPOSE:
- case MSG_MON_ELECTION_ACK:
- case MSG_MON_ELECTION_VICTORY:
+ case MSG_MON_ELECTION:
elector.dispatch(m);
break;
void Monitor::handle_shutdown(Message *m)
{
- dout(1) << "shutdown from " << m->get_source() << endl;
-
- shutdown();
+ assert(m->get_source().is_mon());
+ if (m->get_source().num() == get_leader()) {
+ dout(1) << "shutdown from leader " << m->get_source() << endl;
+ shutdown();
+ } else {
+ dout(1) << "ignoring shutdown from non-leader " << m->get_source() << endl;
+ }
delete m;
}
class MDSMonitor;
class ClientMonitor;
-#define PAXOS_TEST 0
-#define PAXOS_OSDMAP 1
-#define PAXOS_MDSMAP 2
-#define PAXOS_CLIENTMAP 3
class Monitor : public Dispatcher {
-protected:
+public:
// me
int whoami;
Messenger *messenger;
void reset_tick();
friend class C_Mon_Tick;
- // my local store
- //ObjectStore *store;
+ // -- local storage --
+public:
MonitorStore *store;
- const static int INO_ELECTOR = 1;
- const static int INO_MON_MAP = 2;
- const static int INO_OSD_MAP = 10;
- const static int INO_OSD_INC_MAP = 11;
- const static int INO_MDS_MAP = 20;
-
- // elector
- Elector elector;
- friend class Elector;
-
- epoch_t mon_epoch; // monitor epoch (election instance)
- set<int> quorum; // current active set of monitors (if !starting)
-
- //void call_election();
-
- // paxos
- Paxos test_paxos;
- friend class Paxos;
-
-
- // monitor state
+ // -- monitor state --
+private:
const static int STATE_STARTING = 0; // electing
const static int STATE_LEADER = 1;
const static int STATE_PEON = 2;
int state;
- int leader; // current leader (to best of knowledge)
- utime_t last_called_election; // [starting] last time i called an election
-
+public:
bool is_starting() { return state == STATE_STARTING; }
bool is_leader() { return state == STATE_LEADER; }
bool is_peon() { return state == STATE_PEON; }
- // my public services
+
+ // -- elector --
+private:
+ Elector elector;
+ friend class Elector;
+
+ epoch_t mon_epoch; // monitor epoch (election instance)
+ int leader; // current leader (to best of knowledge)
+ set<int> quorum; // current active set of monitors (if !starting)
+ utime_t last_called_election; // [starting] last time i called an election
+
+public:
+ epoch_t get_epoch() { return mon_epoch; }
+ int get_leader() { return leader; }
+ const set<int>& get_quorum() { return quorum; }
+
+ void call_election(); // initiate election
+ void win_election(epoch_t epoch, set<int>& q); // end election (called by Elector)
+ void lose_election(epoch_t epoch, int l); // end election (called by Elector)
+
+
+ // -- paxos --
+ Paxos paxos_test;
+ Paxos paxos_mdsmap;
+ Paxos paxos_osdmap;
+ Paxos paxos_clientmap;
+ friend class Paxos;
+
+
+ // -- services --
OSDMonitor *osdmon;
MDSMonitor *mdsmon;
ClientMonitor *clientmon;
- // messages
- void handle_shutdown(Message *m);
- void handle_ping_ack(class MPingAck *m);
- void handle_command(class MMonCommand *m);
-
friend class OSDMonitor;
friend class MDSMonitor;
friend class ClientMonitor;
- // initiate election
- void call_election();
- // end election (called by Elector)
- void win_election(set<int>& q);
- void lose_election(int l);
+ // messages
+ void handle_shutdown(Message *m);
+ void handle_ping_ack(class MPingAck *m);
+ void handle_command(class MMonCommand *m);
monmap(mm),
timer(lock), tick_timer(0),
store(0),
+
+ state(STATE_STARTING),
+
elector(this, w),
mon_epoch(0),
+ leader(0),
- test_paxos(this, w, PAXOS_TEST, "tester"), // tester state machine
+ paxos_test(this, w, PAXOS_TEST),
+ paxos_mdsmap(this, w, PAXOS_MDSMAP),
+ paxos_osdmap(this, w, PAXOS_OSDMAP),
+ paxos_clientmap(this, w, PAXOS_CLIENTMAP),
- state(STATE_STARTING),
- leader(0),
osdmon(0), mdsmon(0), clientmon(0)
{
}
-
void init();
void shutdown();
void dispatch(Message *m);
bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl)
{
- if (!mon->store->exists_bl_sn("osdmap", epoch))
+ if (!mon->store->exists_bl_sn("osdmap_full", epoch))
return false;
- int r = mon->store->get_bl_sn(bl, "osdmap", epoch);
+ int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch);
assert(r > 0);
return true;
}
bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl)
{
- if (!mon->store->exists_bl_sn("osdincmap", epoch))
+ if (!mon->store->exists_bl_sn("osdmap_inc", epoch))
return false;
- int r = mon->store->get_bl_sn(bl, "osdincmap", epoch);
+ int r = mon->store->get_bl_sn(bl, "osdmap_inc", epoch);
assert(r > 0);
return true;
}
bufferlist bl;
osdmap.encode(bl);
- mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch());
+ mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch());
mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
}
bufferlist incbl;
inc.encode(incbl);
- mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch());
- mon->store->put_bl_sn(incbl, "osdincmap", osdmap.get_epoch());
+ mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch());
+ mon->store->put_bl_sn(incbl, "osdmap_inc", osdmap.get_epoch());
mon->store->put_int(osdmap.get_epoch(), "osd_epoch");
}
bcast_latest_osd();
bcast_latest_mds();
+ send_waiting();
} else {
dout(7) << "osd_boot waiting for "
<< (osdmap.osds.size() - osdmap.osd_inst.size())
epoch_t epoch = mon->store->get_int("osd_epoch");
dout(10) << " last epoch was " << epoch << endl;
bufferlist bl, blinc;
- int r = mon->store->get_bl_sn(bl, "osdmap", epoch);
+ int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch);
assert(r>0);
osdmap.decode(bl);
#include "config.h"
#undef dout
-#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") "
+void Paxos::init()
+{
+ // load paxos variables from stable storage
+ last_pn = mon->store->get_int(machine_name, "last_pn");
+ accepted_pn = mon->store->get_int(machine_name, "accepted_pn");
+ last_committed = mon->store->get_int(machine_name, "last_committed");
+
+ dout(10) << "init" << endl;
+}
+
// ---------------------------------
// PHASE 1
-// proposer
-
+// leader
void Paxos::collect(version_t oldpn)
{
+ // we're recoverying, it seems!
+ state = STATE_RECOVERING;
+ assert(mon->is_leader());
+
// reset the number of lasts received
accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
accepted_pn_from = last_committed;
num_last = 1;
+ old_accepted_v = 0;
old_accepted_pn = 0;
old_accepted_value.clear();
dout(10) << "collect with pn " << accepted_pn << endl;
// send collect
- for (int i=0; i<mon->monmap->num_mon; ++i) {
- if (i == whoami) continue;
+ for (set<int>::const_iterator p = mon->get_quorum().begin();
+ p != mon->get_quorum().end();
+ ++p) {
+ if (*p == whoami) continue;
- MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id);
+ MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id);
collect->last_committed = last_committed;
collect->pn = accepted_pn;
- mon->messenger->send_message(collect, mon->monmap->get_inst(i));
+ mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
}
}
+
+// peon
void Paxos::handle_collect(MMonPaxos *collect)
{
dout(10) << "handle_collect " << *collect << endl;
+ assert(mon->is_peon()); // mon epoch filter should catch strays
+
+ // we're recoverying, it seems!
+ state = STATE_RECOVERING;
+
// reply
- MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id);
+ MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id);
last->last_committed = last_committed;
// do we have an accepted but uncommitted value?
accepted_pn = collect->pn;
accepted_pn_from = collect->pn_from;
dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << endl;
+ mon->store->put_int(accepted_pn, machine_name, "accepted_pn");
} else {
// don't accept!
dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
}
+// leader
void Paxos::handle_last(MMonPaxos *last)
{
dout(10) << "handle_last " << *last << endl;
+ if (!mon->is_leader()) {
+ dout(10) << "not leader, dropping" << endl;
+ delete last;
+ return;
+ }
+
// share committed values?
if (last->last_committed < last_committed) {
// share committed values
dout(10) << "sending commit to " << last->get_source() << endl;
- MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+ MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
for (version_t v = last->last_committed;
v <= last_committed;
v++) {
mon->messenger->send_message(commit, last->get_source_inst());
}
- // did we receive committed value?
+ // did we receive a committed value?
if (last->last_committed > last_committed) {
for (version_t v = last_committed;
v <= last->last_committed;
<< last->values[v].length() << " bytes" << endl;
}
last_committed = last->last_committed;
- mon->store->put_int(last_committed, machine_name, "last_commtted");
+ mon->store->put_int(last_committed, machine_name, "last_committed");
dout(10) << "last_committed now " << last_committed << endl;
}
// do they accept your pn?
if (last->old_accepted_pn > accepted_pn) {
- dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl;
+ // no, try again.
+ dout(10) << " they had a higher pn than us, picking a new one." << endl;
collect(last->old_accepted_pn);
} else {
- // they accepted our pn. great.
+ // yes, they accepted our pn. great.
num_last++;
- dout(10) << "great, they accepted our pn, we now have " << num_last << endl;
+ dout(10) << " they accepted our pn, we now have "
+ << num_last << " peons" << endl;
// did this person send back an accepted but uncommitted value?
if (last->old_accepted_pn &&
last->old_accepted_pn > old_accepted_pn) {
- version_t v = last->last_committed+1;
- dout(10) << "we learned an old value for " << v << " pn " << last->old_accepted_pn;
+ old_accepted_v = last->last_committed+1;
old_accepted_pn = last->old_accepted_pn;
- old_accepted_value = last->values[v];
+ old_accepted_value = last->values[old_accepted_v];
+ dout(10) << "we learned an old (possible) value for " << old_accepted_v
+ << " pn " << old_accepted_pn
+ << " " << old_accepted_value.length() << " bytes"
+ << endl;
}
- // do we have a majority?
- if (num_last == mon->monmap->num_mon/2+1) {
- // do this once.
-
+ // is that everyone?
+ if (num_last == mon->get_quorum().size()) {
// did we learn an old value?
- if (old_accepted_value.length()) {
- dout(10) << "begin on old learned value" << endl;
+ if (old_accepted_v == last_committed+1 &&
+ old_accepted_value.length()) {
+ dout(10) << "that's everyone. begin on old learned value" << endl;
begin(old_accepted_value);
- }
+ } else {
+ // active!
+ dout(10) << "that's everyone. active!" << endl;
+ state = STATE_ACTIVE;
+ finish_contexts(waiting_for_active);
+ extend_lease();
+ }
}
}
}
+// leader
void Paxos::begin(bufferlist& v)
{
dout(10) << "begin for " << last_committed+1 << " "
<< new_value.length() << " bytes"
<< endl;
- // we must already have a majority for this to work.
- assert(num_last > mon->monmap->num_mon/2);
+ assert(mon->is_leader());
+
+ assert(is_active());
+ state = STATE_UPDATING;
+ // we must already have a majority for this to work.
+ assert(mon->get_quorum().size() == 1 ||
+ num_last > (unsigned)mon->monmap->num_mon/2);
+
// and no value, yet.
assert(new_value.length() == 0);
-
+
// accept it ourselves
num_accepted = 1;
new_value = v;
mon->store->put_bl_sn(new_value, machine_name, last_committed+1);
- // ask others to accept it to!
- for (int i=0; i<mon->monmap->num_mon; ++i) {
- if (i == whoami) continue;
+ if (mon->get_quorum().size() == 1) {
+ // we're alone, take it easy
+ commit();
+ state = STATE_ACTIVE;
+ finish_contexts(waiting_for_commit);
+ finish_contexts(waiting_for_active);
+ return;
+ }
- dout(10) << " sending begin to mon" << i << endl;
- MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id);
+ // ask others to accept it to!
+ for (set<int>::const_iterator p = mon->get_quorum().begin();
+ p != mon->get_quorum().end();
+ ++p) {
+ if (*p == whoami) continue;
+
+ dout(10) << " sending begin to mon" << *p << endl;
+ MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id);
begin->values[last_committed+1] = new_value;
+ begin->last_committed = last_committed;
begin->pn = accepted_pn;
- mon->messenger->send_message(begin, mon->monmap->get_inst(i));
+ mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
}
+
+ // set timeout event
+ accept_timeout_event = new C_AcceptTimeout(this);
+ mon->timer.add_event_after(g_conf.mon_accept_timeout, accept_timeout_event);
}
+// peon
void Paxos::handle_begin(MMonPaxos *begin)
{
dout(10) << "handle_begin " << *begin << endl;
// can we accept this?
- if (begin->pn != accepted_pn) {
+ if (begin->pn < accepted_pn) {
dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl;
delete begin;
return;
}
+ assert(begin->pn == accepted_pn);
+ assert(begin->last_committed == last_committed);
+ // set state.
+ state = STATE_UPDATING;
+ lease_expire = utime_t(); // cancel lease
+
// yes.
version_t v = last_committed+1;
dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl;
mon->store->put_bl_sn(begin->values[v], machine_name, v);
// reply
- MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id);
+ MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id);
accept->pn = accepted_pn;
accept->last_committed = last_committed;
mon->messenger->send_message(accept, begin->get_source_inst());
delete begin;
}
-
+// leader
void Paxos::handle_accept(MMonPaxos *accept)
{
dout(10) << "handle_accept " << *accept << endl;
delete accept;
return;
}
- if (accept->last_committed != last_committed) {
- dout(10) << " this is from an old round that's already committed, ignoring" << endl;
+ if (last_committed > 0 &&
+ accept->last_committed < last_committed-1) {
+ dout(10) << " this is from an old round, ignoring" << endl;
delete accept;
return;
}
+ assert(accept->last_committed == last_committed || // not committed
+ accept->last_committed == last_committed-1); // committed
+ assert(state == STATE_UPDATING);
num_accepted++;
dout(10) << "now " << num_accepted << " have accepted" << endl;
// new majority?
- if (num_accepted == mon->monmap->num_mon/2+1) {
+ if (num_accepted == (unsigned)mon->monmap->num_mon/2+1) {
// yay, commit!
+ // note: this may happen before the lease is reextended (below)
dout(10) << "we got a majority, committing too" << endl;
commit();
- }
+ }
+ // done?
+ if (num_accepted == mon->get_quorum().size()) {
+ state = STATE_ACTIVE;
+ finish_contexts(waiting_for_commit);
+ finish_contexts(waiting_for_active);
+ extend_lease();
+
+ // cancel timeout event
+ mon->timer.cancel_event(accept_timeout_event);
+ accept_timeout_event = 0;
+ }
+}
+
+void Paxos::accept_timeout()
+{
+ dout(5) << "accept timeout, calling fresh election" << endl;
+ assert(mon->is_leader());
+ assert(is_updating());
+ mon->call_election();
}
void Paxos::commit()
mon->store->put_int(last_committed, machine_name, "last_committed");
// tell everyone
- for (int i=0; i<mon->monmap->num_mon; ++i) {
- if (i == whoami) continue;
+ for (set<int>::const_iterator p = mon->get_quorum().begin();
+ p != mon->get_quorum().end();
+ ++p) {
+ if (*p == whoami) continue;
- dout(10) << " sending commit to mon" << i << endl;
- MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id);
+ dout(10) << " sending commit to mon" << *p << endl;
+ MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
commit->values[last_committed] = new_value;
commit->pn = accepted_pn;
- mon->messenger->send_message(commit, mon->monmap->get_inst(i));
+ mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
}
// get ready for a new round.
new_value.clear();
-
}
{
dout(10) << "handle_commit on " << commit->last_committed << endl;
+ if (!mon->is_peon()) {
+ dout(10) << "not a peon, dropping" << endl;
+ assert(0);
+ delete commit;
+ return;
+ }
+
// commit locally.
- last_committed = commit->last_committed;
- mon->store->put_bl_sn(commit->values[last_committed], machine_name, last_committed);
+ for (map<version_t,bufferlist>::iterator p = commit->values.begin();
+ p != commit->values.end();
+ ++p) {
+ assert(p->first == last_committed+1);
+ last_committed = p->first;
+ mon->store->put_bl_sn(p->second, machine_name, last_committed);
+ }
mon->store->put_int(last_committed, machine_name, "last_committed");
delete commit;
}
+void Paxos::extend_lease()
+{
+ assert(mon->is_leader());
+ assert(is_active());
+
+ lease_expire = g_clock.now();
+ lease_expire += g_conf.mon_lease;
+ acked_lease.clear();
+ acked_lease.insert(whoami);
+
+ dout(7) << "extend_lease now+" << g_conf.mon_lease << " (" << lease_expire << ")" << endl;
+
+ // bcast
+ for (set<int>::const_iterator p = mon->get_quorum().begin();
+ p != mon->get_quorum().end();
+ ++p) {
+ if (*p == whoami) continue;
+ MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
+ lease->last_committed = last_committed;
+ lease->lease_expire = lease_expire;
+ mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
+ }
+
+ // wake people up
+ finish_contexts(waiting_for_readable);
+ finish_contexts(waiting_for_writeable);
+
+ // set renew event
+ lease_renew_event = new C_LeaseRenew(this);
+ utime_t at = lease_expire;
+ at -= g_conf.mon_lease;
+ at += g_conf.mon_lease_renew_interval;
+ mon->timer.add_event_at(at, lease_renew_event);
+
+ // set timeout event.
+ // if old timeout is still in place, leave it.
+ if (!lease_ack_timeout_event) {
+ lease_ack_timeout_event = new C_LeaseAckTimeout(this);
+ mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event);
+ }
+}
+
+
+// peon
+void Paxos::handle_lease(MMonPaxos *lease)
+{
+ // sanity
+ if (!mon->is_peon() ||
+ last_committed != lease->last_committed) {
+ dout(10) << "handle_lease i'm not a peon, or they're not the leader, or the last_committed doesn't match, dropping" << endl;
+ delete lease;
+ return;
+ }
+
+ // extend lease
+ if (lease_expire < lease->lease_expire)
+ lease_expire = lease->lease_expire;
+
+ state = STATE_ACTIVE;
+ finish_contexts(waiting_for_active);
+
+ dout(10) << "handle_lease on " << lease->last_committed
+ << " now " << lease_expire << endl;
+
+ // ack
+ MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id);
+ ack->last_committed = last_committed;
+ ack->lease_expire = lease_expire;
+ mon->messenger->send_message(ack, lease->get_source_inst());
+
+ // kick waiters
+ if (is_readable())
+ finish_contexts(waiting_for_readable);
+
+ delete lease;
+}
+
+void Paxos::handle_lease_ack(MMonPaxos *ack)
+{
+ int from = ack->get_source().num();
+
+ if (acked_lease.count(from) == 0) {
+ acked_lease.insert(from);
+
+ if (acked_lease == mon->get_quorum()) {
+ // yay!
+ dout(10) << "handle_lease_ack from " << ack->get_source()
+ << " -- got everyone" << endl;
+ mon->timer.cancel_event(lease_ack_timeout_event);
+ lease_ack_timeout_event = 0;
+ } else {
+ dout(10) << "handle_lease_ack from " << ack->get_source()
+ << " -- still need "
+ << mon->get_quorum().size() - acked_lease.size()
+ << " more" << endl;
+ }
+ } else {
+ dout(10) << "handle_lease_ack from " << ack->get_source()
+ << " dup (lagging!), ignoring" << endl;
+ }
+
+ delete ack;
+}
+
+void Paxos::lease_ack_timeout()
+{
+ dout(5) << "lease_ack_timeout -- calling new election" << endl;
+ assert(mon->is_leader());
+ assert(is_active());
+ mon->call_election();
+}
/*
*/
version_t Paxos::get_new_proposal_number(version_t gt)
{
- // read last
- version_t last = mon->store->get_int("last_paxos_proposal");
- if (last < gt)
- last = gt;
+ if (last_pn < gt)
+ last_pn = gt;
- // update
- last /= 100;
- last++;
-
- // make it unique among all monitors.
- version_t pn = last*100 + (version_t)whoami;
+ // update. make it unique among all monitors.
+ last_pn /= 100;
+ last_pn++;
+ last_pn *= 100;
+ last_pn += (version_t)whoami;
// write
- mon->store->put_int(pn, "last_paxos_proposal");
+ mon->store->put_int(last_pn, machine_name, "last_pn");
- dout(10) << "get_new_proposal_number = " << pn << endl;
- return pn;
+ dout(10) << "get_new_proposal_number = " << last_pn << endl;
+ return last_pn;
}
-void Paxos::leader_start()
+void Paxos::cancel_events()
+{
+ if (accept_timeout_event) {
+ mon->timer.cancel_event(accept_timeout_event);
+ accept_timeout_event = 0;
+ }
+ if (lease_renew_event) {
+ mon->timer.cancel_event(lease_renew_event);
+ lease_renew_event = 0;
+ }
+ if (lease_ack_timeout_event) {
+ mon->timer.cancel_event(lease_ack_timeout_event);
+ lease_ack_timeout_event = 0;
+ }
+}
+
+void Paxos::leader_init()
{
- dout(10) << "leader_start -- i am the leader, start paxos" << endl;
+ if (mon->get_quorum().size() == 1) {
+ state = STATE_ACTIVE;
+ return;
+ }
+ cancel_events();
+ state = STATE_RECOVERING;
+ lease_expire = utime_t();
+ dout(10) << "leader_init -- starting paxos recovery" << endl;
collect(0);
}
+void Paxos::peon_init()
+{
+ cancel_events();
+ state = STATE_RECOVERING;
+ lease_expire = utime_t();
+ dout(10) << "peon_init -- i am a peon" << endl;
+
+ // no chance to write now!
+ finish_contexts(waiting_for_writeable, -1);
+ finish_contexts(waiting_for_commit, -1);
+}
+
void Paxos::dispatch(Message *m)
{
+ // election in progress?
+ if (mon->is_starting()) {
+ dout(5) << "election in progress, dropping " << *m << endl;
+ delete m;
+ return;
+ }
+
+ // check sanity
+ assert(mon->is_leader() ||
+ (mon->is_peon() && m->get_source().num() == mon->get_leader()));
+
switch (m->get_type()) {
-
+
case MSG_MON_PAXOS:
{
MMonPaxos *pm = (MMonPaxos*)m;
case MMonPaxos::OP_COLLECT:
handle_collect(pm);
break;
-
case MMonPaxos::OP_LAST:
handle_last(pm);
break;
-
case MMonPaxos::OP_BEGIN:
handle_begin(pm);
break;
-
case MMonPaxos::OP_ACCEPT:
handle_accept(pm);
break;
-
case MMonPaxos::OP_COMMIT:
handle_commit(pm);
break;
-
+ case MMonPaxos::OP_LEASE:
+ handle_lease(pm);
+ break;
+ case MMonPaxos::OP_LEASE_ACK:
+ handle_lease_ack(pm);
+ break;
default:
assert(0);
}
}
}
+
+
+
+// -----------------
+// service interface
+
+// -- READ --
+
+bool Paxos::is_readable()
+{
+ if (mon->get_quorum().size() == 1) return true;
+ return
+ (mon->is_peon() || mon->is_leader()) &&
+ is_active() &&
+ g_clock.now() < lease_expire;
+}
+
+bool Paxos::read(version_t v, bufferlist &bl)
+{
+ if (!is_readable())
+ return false;
+
+ if (!mon->store->get_bl_sn(bl, machine_name, v))
+ return false;
+ return true;
+}
+
+version_t Paxos::read_current(bufferlist &bl)
+{
+ if (!is_readable())
+ return 0;
+ if (read(last_committed, bl))
+ return last_committed;
+ return 0;
+}
+
+
+
+
+// -- WRITE --
+
+bool Paxos::is_writeable()
+{
+ if (mon->get_quorum().size() == 1) return true;
+ return
+ mon->is_leader() &&
+ is_active() &&
+ g_clock.now() < lease_expire;
+}
+
+bool Paxos::propose_new_value(bufferlist& bl, Context *oncommit)
+{
+ /*
+ // writeable?
+ if (!is_writeable()) {
+ dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes"
+ << " -- not writeable" << endl;
+ if (oncommit) {
+ oncommit->finish(-1);
+ delete oncommit;
+ }
+ return false;
+ }
+ */
+
+ assert(mon->is_leader() && is_active());
+
+ // cancel lease renewal and timeout events.
+ cancel_events();
+
+ // ok!
+ dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" << endl;
+ if (oncommit)
+ waiting_for_commit.push_back(oncommit);
+ begin(bl);
+
+ return true;
+}
+
*/
+
+/*
+ * NOTE: This libary is based on the Paxos algorithm, but varies in a few key ways:
+ * 1- Only a single new value is generated at a time, simplifying the recovery logic.
+ * 2- Nodes track "committed" values, and share them generously (and trustingly)
+ * 3- A 'leasing' mechism is built-in, allowing nodes to determine when it is safe to
+ * "read" their copy of the last committed value.
+ *
+ * This provides a simple replication substrate that services can be built on top of.
+ */
+
#ifndef __MON_PAXOS_H
#define __MON_PAXOS_H
#include "include/types.h"
+#include "mon_types.h"
#include "include/buffer.h"
#include "msg/Message.h"
class Monitor;
class MMonPaxos;
+
// i am one state machine.
class Paxos {
Monitor *mon;
int machine_id;
const char *machine_name;
- // phase 1
+ friend class PaxosService;
+
+ // LEADER+PEON
+
+ // -- generic state --
+public:
+ const static int STATE_RECOVERING = 1; // leader|peon: recovering paxos state
+ const static int STATE_ACTIVE = 2; // leader|peon: idle. peon may or may not have valid lease
+ const static int STATE_UPDATING = 3; // leader|peon: updating to new value
+ const char *get_statename(int s) {
+ switch (s) {
+ case STATE_RECOVERING: return "recovering";
+ case STATE_ACTIVE: return "active";
+ case STATE_UPDATING: return "updating";
+ default: assert(0); return 0;
+ }
+ }
+
+private:
+ int state;
+
+public:
+ bool is_recovering() { return state == STATE_RECOVERING; }
+ bool is_active() { return state == STATE_ACTIVE; }
+ bool is_updating() { return state == STATE_UPDATING; }
+
+private:
+ // recovery (phase 1)
+ version_t last_pn;
version_t last_committed;
version_t accepted_pn;
version_t accepted_pn_from;
-
- // results from our last replies
- int num_last;
+
+ // active (phase 2)
+ utime_t lease_expire;
+ list<Context*> waiting_for_active;
+ list<Context*> waiting_for_readable;
+
+
+ // -- leader --
+ // recovery (paxos phase 1)
+ unsigned num_last;
+ version_t old_accepted_v;
version_t old_accepted_pn;
bufferlist old_accepted_value;
- // phase 2
+ // active
+ set<int> acked_lease;
+ Context *lease_renew_event;
+ Context *lease_ack_timeout_event;
+
+ // updating (paxos phase 2)
bufferlist new_value;
- int num_accepted;
-
+ unsigned num_accepted;
+
+ Context *accept_timeout_event;
+
+ list<Context*> waiting_for_writeable;
+ list<Context*> waiting_for_commit;
+
+ class C_AcceptTimeout : public Context {
+ Paxos *paxos;
+ public:
+ C_AcceptTimeout(Paxos *p) : paxos(p) {}
+ void finish(int r) {
+ paxos->accept_timeout();
+ }
+ };
+
+ class C_LeaseAckTimeout : public Context {
+ Paxos *paxos;
+ public:
+ C_LeaseAckTimeout(Paxos *p) : paxos(p) {}
+ void finish(int r) {
+ paxos->lease_ack_timeout();
+ }
+ };
+
+ class C_LeaseRenew : public Context {
+ Paxos *paxos;
+ public:
+ C_LeaseRenew(Paxos *p) : paxos(p) {}
+ void finish(int r) {
+ paxos->extend_lease();
+ }
+ };
+
+
void collect(version_t oldpn);
void handle_collect(MMonPaxos*);
void handle_last(MMonPaxos*);
void begin(bufferlist& value);
void handle_begin(MMonPaxos*);
void handle_accept(MMonPaxos*);
+ void accept_timeout();
void commit();
void handle_commit(MMonPaxos*);
+ void extend_lease();
+ void handle_lease(MMonPaxos*);
+ void handle_lease_ack(MMonPaxos*);
+ void lease_ack_timeout();
+
+ void cancel_events();
version_t get_new_proposal_number(version_t gt=0);
public:
Paxos(Monitor *m, int w,
- int mid,const char *mnm) : mon(m), whoami(w),
- machine_id(mid), machine_name(mnm) {
- }
+ int mid) : mon(m), whoami(w),
+ machine_id(mid),
+ machine_name(get_paxos_name(mid)),
+ state(STATE_RECOVERING),
+ lease_renew_event(0),
+ lease_ack_timeout_event(0),
+ accept_timeout_event(0) { }
void dispatch(Message *m);
- void leader_start();
+ void init();
+
+ void leader_init();
+ void peon_init();
+
+
+ // -- service interface --
+ /*
+ void wait_for_active(Context *c) {
+ assert(!is_active());
+ waiting_for_active.push_back(c);
+ }
+ */
+
+ // read
+ version_t get_version() { return last_committed; }
+ bool is_readable();
+ bool read(version_t v, bufferlist &bl);
+ version_t read_current(bufferlist &bl);
+ void wait_for_readable(Context *onreadable) {
+ assert(!is_readable());
+ waiting_for_readable.push_back(onreadable);
+ }
+
+ // write
+ bool is_leader();
+ bool is_writeable();
+ void wait_for_writeable(Context *c) {
+ assert(!is_writeable());
+ waiting_for_writeable.push_back(c);
+ }
+
+ bool propose_new_value(bufferlist& bl, Context *oncommit=0);
+ void wait_for_commit(Context *oncommit) {
+ waiting_for_commit.push_back(oncommit);
+ }
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "PaxosService.h"
+#include "common/Clock.h"
+#include "Monitor.h"
+
+
+
+#include "config.h"
+#undef dout
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " "
+#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " "
+
+
+
+void PaxosService::dispatch(Message *m)
+{
+ dout(10) << "dispatch " << *m << " from " << m->get_source_inst() << endl;
+
+ // make sure our map is readable and up to date
+ if (!paxos->is_readable() ||
+ !update_from_paxos()) {
+ dout(10) << " waiting for paxos -> readable" << endl;
+ paxos->wait_for_readable(new C_RetryMessage(this, m));
+ return;
+ }
+
+ // preprocess
+ if (preprocess_update(m))
+ return; // easy!
+
+ // leader?
+ if (!mon->is_leader()) {
+ // fw to leader
+ dout(10) << " fw to leader mon" << mon->get_leader() << endl;
+ mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
+ return;
+ }
+
+ // writeable?
+ if (!paxos->is_writeable()) {
+ dout(10) << " waiting for paxos -> writeable" << endl;
+ paxos->wait_for_writeable(new C_RetryMessage(this, m));
+ return;
+ }
+
+ prepare_update(m);
+
+ // do it now (for now!) ***
+ propose_pending();
+}
+
+void PaxosService::election_finished()
+{
+ if (mon->is_leader() && g_conf.mkfs)
+ create_initial();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __PAXOSSERVICE_H
+#define __PAXOSSERVICE_H
+
+#include "msg/Dispatcher.h"
+#include "include/Context.h"
+
+class Monitor;
+class Paxos;
+
+class PaxosService : public Dispatcher {
+protected:
+ Monitor *mon;
+ Paxos *paxos;
+
+
+ class C_RetryMessage : public Context {
+ Dispatcher *svc;
+ Message *m;
+ public:
+ C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {}
+ void finish(int r) {
+ svc->dispatch(m);
+ }
+ };
+
+public:
+ PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { }
+
+ // i implement
+ void dispatch(Message *m);
+ void election_finished();
+
+ // you implement
+ virtual void create_initial() = 0;
+ virtual bool update_from_paxos() = 0;
+ virtual void prepare_pending() = 0;
+ virtual void propose_pending() = 0;
+
+ virtual bool preprocess_update(Message *m) = 0; // true if processed.
+ virtual void prepare_update(Message *m)= 0;
+
+ virtual void tick() {}; // check state, take actions
+
+
+};
+
+#endif
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MON_TYPES_H
+#define __MON_TYPES_H
+
+#define PAXOS_TEST 0
+#define PAXOS_MDSMAP 1
+#define PAXOS_OSDMAP 2
+#define PAXOS_CLIENTMAP 3
+
+inline const char *get_paxos_name(int p) {
+ switch (p) {
+ case PAXOS_TEST: return "test";
+ case PAXOS_MDSMAP: return "mdsmap";
+ case PAXOS_OSDMAP: return "osdmap";
+ case PAXOS_CLIENTMAP: return "clientmap";
+ default: assert(0); return 0;
+ }
+}
+
+#endif
{
lock.Lock();
while (1) {
+ if (fm_shutdown) break;
+ fakemessenger_do_loop_2();
+
+ if (directory.empty()) break;
+
dout(20) << "thread waiting" << endl;
if (fm_shutdown) break;
awake = false;
cond.Wait(lock);
awake = true;
dout(20) << "thread woke up" << endl;
- if (fm_shutdown) break;
-
- fakemessenger_do_loop_2();
-
- if (directory.empty()) break;
}
lock.Unlock();
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
-#include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionPropose.h"
-#include "messages/MMonElectionVictory.h"
+#include "messages/MMonElection.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
m = new MMonPaxos;
break;
- case MSG_MON_ELECTION_PROPOSE:
- m = new MMonElectionPropose;
- break;
- case MSG_MON_ELECTION_ACK:
- m = new MMonElectionAck;
- break;
- case MSG_MON_ELECTION_VICTORY:
- m = new MMonElectionVictory;
+ case MSG_MON_ELECTION:
+ m = new MMonElection;
break;
case MSG_PING:
#define MSG_MON_COMMAND_ACK 14
-#define MSG_MON_ELECTION_ACK 15
-#define MSG_MON_ELECTION_PROPOSE 16
-#define MSG_MON_ELECTION_VICTORY 17
+#define MSG_MON_ELECTION 15
#define MSG_MON_OSDMAP_INFO 20
#define MSG_MON_OSDMAP_LEASE 21
switch (op->get_op()) {
case OSD_OP_WRLOCK:
{ // lock object
- //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
}
break;
list<off_t> offsets;
list<size_t> lengths;
list<const char*> attrnames;
+ list<string> attrnames2;
//list< pair<const void*,int> > attrvals;
list<bufferlist> attrbls;
+ // for reads only (not encoded)
list<bufferlist*> pbls;
list<struct stat*> psts;
list< pair<void*,int*> > pattrvals;
list< map<string,bufferptr>* > pattrsets;
+ const char *get_attrname() {
+ if (attrnames.empty())
+ return attrnames2.front().c_str();
+ else
+ return attrnames.front();
+ }
+ void pop_attrname() {
+ if (attrnames.empty())
+ attrnames2.pop_front();
+ else
+ attrnames.pop_front();
+ }
+
void read(object_t oid, off_t off, size_t len, bufferlist *pbl) {
int op = OP_READ;
ops.push_back(op);
}
// etc.
+
+ void _encode(bufferlist& bl) {
+ ::_encode(ops, bl);
+ ::_encode(bls, bl);
+ ::_encode(oids, bl);
+ ::_encode(cids, bl);
+ ::_encode(offsets, bl);
+ ::_encode(lengths, bl);
+ ::_encode(attrnames, bl);
+ ::_encode(attrbls, bl);
+ }
+ void _decode(bufferlist& bl, int& off) {
+ ::_decode(ops, bl, off);
+ ::_decode(bls, bl, off);
+ ::_decode(oids, bl, off);
+ ::_decode(cids, bl, off);
+ ::_decode(offsets, bl, off);
+ ::_decode(lengths, bl, off);
+ ::_decode(attrnames2, bl, off);
+ ::_decode(attrbls, bl, off);
+ }
};
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
*pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second);
}
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
rmattr(oid, attrname, 0);
}
break;
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
- const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ const char *attrname = t.get_attrname(); t.pop_attrname();
collection_rmattr(cid, attrname, 0);
}
break;