From 9d73dc45207e0575013f3559514985d2a60444ca Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 26 Jun 2007 22:50:17 +0000 Subject: [PATCH] merged trunk changes r1407:1446 into branches/sage/cephmds2 git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1447 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/Makefile | 4 +- branches/sage/cephmds2/client/Client.cc | 6 +- branches/sage/cephmds2/config.cc | 5 +- branches/sage/cephmds2/config.h | 3 + branches/sage/cephmds2/ebofs/Ebofs.cc | 336 +++++++++--- branches/sage/cephmds2/ebofs/Ebofs.h | 40 +- branches/sage/cephmds2/ebofs/FileJournal.cc | 373 +++++++++++-- branches/sage/cephmds2/ebofs/FileJournal.h | 99 +++- branches/sage/cephmds2/ebofs/Journal.h | 16 +- branches/sage/cephmds2/ebofs/test.ebofs.cc | 3 +- branches/sage/cephmds2/ebofs/types.h | 13 +- branches/sage/cephmds2/include/Context.h | 9 +- branches/sage/cephmds2/include/buffer.h | 8 + branches/sage/cephmds2/mds/Migrator.cc | 5 +- .../sage/cephmds2/messages/MMonElection.h | 63 +++ .../cephmds2/messages/MMonElectionPropose.h | 33 -- .../cephmds2/messages/MMonElectionVictory.h | 41 -- branches/sage/cephmds2/messages/MMonPaxos.h | 38 +- branches/sage/cephmds2/mon/ClientMonitor.cc | 240 +++++++-- branches/sage/cephmds2/mon/ClientMonitor.h | 174 +++++- branches/sage/cephmds2/mon/Elector.cc | 177 +++--- branches/sage/cephmds2/mon/Elector.h | 39 +- branches/sage/cephmds2/mon/MDSMonitor.cc | 23 + branches/sage/cephmds2/mon/MDSMonitor.h | 2 + branches/sage/cephmds2/mon/Monitor.cc | 502 ++++++++++-------- branches/sage/cephmds2/mon/Monitor.h | 94 ++-- branches/sage/cephmds2/mon/OSDMonitor.cc | 17 +- branches/sage/cephmds2/mon/Paxos.cc | 482 ++++++++++++++--- branches/sage/cephmds2/mon/Paxos.h | 155 +++++- branches/sage/cephmds2/mon/PaxosService.cc | 69 +++ branches/sage/cephmds2/mon/PaxosService.h | 62 +++ .../MMonElectionAck.h => mon/mon_types.h} | 29 +- branches/sage/cephmds2/msg/FakeMessenger.cc | 10 +- branches/sage/cephmds2/msg/Message.cc | 14 +- branches/sage/cephmds2/msg/Message.h | 4 +- branches/sage/cephmds2/osd/OSD.cc | 1 - branches/sage/cephmds2/osd/ObjectStore.h | 46 +- 37 files changed, 2419 insertions(+), 816 deletions(-) create mode 100644 branches/sage/cephmds2/messages/MMonElection.h delete mode 100644 branches/sage/cephmds2/messages/MMonElectionPropose.h delete mode 100644 branches/sage/cephmds2/messages/MMonElectionVictory.h create mode 100644 branches/sage/cephmds2/mon/PaxosService.cc create mode 100644 branches/sage/cephmds2/mon/PaxosService.h rename branches/sage/cephmds2/{messages/MMonElectionAck.h => mon/mon_types.h} (52%) diff --git a/branches/sage/cephmds2/Makefile b/branches/sage/cephmds2/Makefile index 49cfc23c92adb..191aefe268521 100644 --- a/branches/sage/cephmds2/Makefile +++ b/branches/sage/cephmds2/Makefile @@ -38,7 +38,8 @@ EBOFS_OBJS= \ ebofs/BlockDevice.o\ ebofs/BufferCache.o\ ebofs/Ebofs.o\ - ebofs/Allocator.o + ebofs/Allocator.o\ + ebofs/FileJournal.o MDS_OBJS= \ mds/MDS.o\ @@ -72,6 +73,7 @@ OSDC_OBJS= \ MON_OBJS= \ mon/Monitor.o\ mon/Paxos.o\ + mon/PaxosService.o\ mon/OSDMonitor.o\ mon/MDSMonitor.o\ mon/ClientMonitor.o\ diff --git a/branches/sage/cephmds2/client/Client.cc b/branches/sage/cephmds2/client/Client.cc index 7682524f81325..8b65c76c1ebab 100644 --- a/branches/sage/cephmds2/client/Client.cc +++ b/branches/sage/cephmds2/client/Client.cc @@ -875,16 +875,14 @@ void Client::handle_mds_map(MMDSMap* m) if (m->get_source().is_mds()) frommds = m->get_source().num(); - if (mdsmap == 0) + if (mdsmap == 0) { mdsmap = new MDSMap; - if (whoami < 0) { - // mounted! assert(m->get_source().is_mon()); whoami = m->get_dest().num(); dout(1) << "handle_mds_map i am now " << m->get_dest() << endl; messenger->reset_myname(m->get_dest()); - + mount_cond.Signal(); // mount might be waiting for this. } diff --git a/branches/sage/cephmds2/config.cc b/branches/sage/cephmds2/config.cc index b4c8e0edde5fa..e5d7d3d30cfa6 100644 --- a/branches/sage/cephmds2/config.cc +++ b/branches/sage/cephmds2/config.cc @@ -125,7 +125,10 @@ md_config_t g_conf = { // --- mon --- mon_tick_interval: 5, mon_osd_down_out_interval: 5, // seconds - mon_lease: 2.000, // seconds + mon_lease: 5, // seconds + mon_lease_renew_interval: 3, + mon_lease_ack_timeout: 10.0, + mon_accept_timeout: 10.0, mon_stop_with_last_mds: true, // --- client --- diff --git a/branches/sage/cephmds2/config.h b/branches/sage/cephmds2/config.h index b8c6d20d7d6a5..232041b717d92 100644 --- a/branches/sage/cephmds2/config.h +++ b/branches/sage/cephmds2/config.h @@ -114,6 +114,9 @@ struct md_config_t { int mon_tick_interval; int mon_osd_down_out_interval; float mon_lease; + float mon_lease_renew_interval; + float mon_lease_ack_timeout; + float mon_accept_timeout; bool mon_stop_with_last_mds; // client diff --git a/branches/sage/cephmds2/ebofs/Ebofs.cc b/branches/sage/cephmds2/ebofs/Ebofs.cc index a7c192b390100..f315d0385016f 100644 --- a/branches/sage/cephmds2/ebofs/Ebofs.cc +++ b/branches/sage/cephmds2/ebofs/Ebofs.cc @@ -16,6 +16,8 @@ #include "Ebofs.h" +#include "FileJournal.h" + #include #ifndef DARWIN @@ -50,6 +52,7 @@ int Ebofs::mount() ebofs_lock.Lock(); assert(!mounted); + // open dev int r = dev.open(&idle_kicker); if (r < 0) { ebofs_lock.Unlock(); @@ -79,6 +82,8 @@ int Ebofs::mount() dout(3) << "mount epoch " << super_epoch << endl; assert(super_epoch == sb->epoch); + super_fsid = sb->fsid; + free_blocks = sb->free_blocks; limbo_blocks = sb->limbo_blocks; @@ -101,6 +106,43 @@ int Ebofs::mount() allocator.release_limbo(); + + // open journal? + if (journalfn) { + journal = new FileJournal(this, journalfn); + if (journal->open() < 0) { + dout(-3) << "mount journal " << journalfn << " open failed" << endl; + delete journal; + journal = 0; + } else { + dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl; + + while (1) { + bufferlist bl; + epoch_t e; + if (!journal->read_entry(bl, e)) { + dout(-3) << "mount replay: end of journal, done." << endl; + break; + } + + if (e < super_epoch) { + dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl; + } + if (e == super_epoch+1) { + super_epoch++; + dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl; + } + assert(e == super_epoch); + + dout(-3) << "mount replay: applying transaction in epoch " << e << endl; + Transaction t; + int off = 0; + t._decode(bl, off); + _apply_transaction(t); + } + } + } + dout(3) << "mount starting commit+finisher threads" << endl; commit_thread.create(); finisher_thread.create(); @@ -108,6 +150,7 @@ int Ebofs::mount() dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl; mounted = true; + ebofs_lock.Unlock(); return 0; } @@ -126,6 +169,10 @@ int Ebofs::mkfs() block_t num_blocks = dev.get_num_blocks(); + // make a super-random fsid + srand(time(0) ^ getpid()); + super_fsid = (lrand48() << 32) ^ mrand48(); + free_blocks = 0; limbo_blocks = 0; @@ -197,6 +244,18 @@ int Ebofs::mkfs() dev.close(); + + // create journal? + if (journalfn) { + journal = new FileJournal(this, journalfn); + if (journal->create() < 0) { + dout(3) << "mount journal " << journalfn << " created failed" << endl; + delete journal; + } else { + dout(3) << "mount journal " << journalfn << " created" << endl; + } + } + dout(2) << "mkfs: " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl; ebofs_lock.Unlock(); return 0; @@ -272,6 +331,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp) // fill in super memset(&sb, 0, sizeof(sb)); sb.s_magic = EBOFS_MAGIC; + sb.fsid = super_fsid; sb.epoch = epoch; sb.num_blocks = dev.get_num_blocks(); @@ -409,6 +469,7 @@ int Ebofs::commit_thread_entry() << ", max dirty " << g_conf.ebofs_bc_max_dirty << endl; + if (journal) journal->commit_epoch_start(); // (async) write onodes+condes (do this first; it currently involves inode reallocation) commit_inodes_start(); @@ -453,14 +514,14 @@ int Ebofs::commit_thread_entry() alloc_more_node_space(); } + // signal journal + if (journal) journal->commit_epoch_finish(); + // kick waiters dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl; - finisher_lock.Lock(); - finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]); + queue_finishers(commit_waiters[super_epoch-1]); commit_waiters.erase(super_epoch-1); - finisher_cond.Signal(); - finisher_lock.Unlock(); sync_cond.Signal(); @@ -1222,7 +1283,18 @@ void Ebofs::sync(Context *onsafe) ebofs_lock.Lock(); if (onsafe) { dirty = true; - commit_waiters[super_epoch].push_back(onsafe); + + while (1) { + if (journal) { + // journal empty transaction + Transaction t; + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + commit_waiters[super_epoch].push_back(onsafe); + break; + } } ebofs_lock.Unlock(); } @@ -1994,6 +2066,29 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) ebofs_lock.Lock(); dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl; + unsigned r = _apply_transaction(t); + + // journal, wait for commit + if (r != 0 && onsafe) { + delete onsafe; // kill callback, but still journal below (in case transaction had side effects) + onsafe = 0; + } + while (1) { + if (journal) { + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } + + ebofs_lock.Unlock(); + return r; +} + +unsigned Ebofs::_apply_transaction(Transaction& t) +{ // do ops unsigned r = 0; // bit fields indicate which ops failed. int bit = 1; @@ -2028,7 +2123,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_GETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); pair pattrval = t.pattrvals.front(); t.pattrvals.pop_front(); if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) { dout(7) << "apply_transaction fail on _getattr" << endl; @@ -2095,7 +2190,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_SETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -2121,7 +2216,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_RMATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); if (_rmattr(oid, attrname) < 0) { dout(7) << "apply_transaction fail on _rmattr" << endl; r &= bit; @@ -2185,7 +2280,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_COLL_SETATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -2201,7 +2296,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_COLL_RMATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); if (_collection_rmattr(cid, attrname) < 0) { dout(7) << "apply_transaction fail on _collection_rmattr" << endl; r &= bit; @@ -2217,16 +2312,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) bit = bit << 1; } - dout(7) << "apply_transaction finish (r = " << r << ")" << endl; - - // set up commit waiter - //if (r == 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - //} else { - //if (onsafe) delete onsafe; - //} - - ebofs_lock.Unlock(); + dout(7) << "_apply_transaction finish (r = " << r << ")" << endl; return r; } @@ -2295,36 +2381,6 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl) } -/*int Ebofs::write(object_t oid, - off_t off, size_t len, - bufferlist& bl, bool fsync) -{ - // wait? - if (fsync) { - // wait for flush. - Cond cond; - bool done; - int flush = 1; // write never returns positive - Context *c = new C_Cond(&cond, &done, &flush); - int r = write(oid, off, len, bl, c); - if (r < 0) return r; - - ebofs_lock.Lock(); - { - while (!done) - cond.Wait(ebofs_lock); - assert(flush <= 0); - } - ebofs_lock.Unlock(); - if (flush < 0) return flush; - return r; - } else { - // don't wait for flush. - return write(oid, off, len, bl, (Context*)0); - } -} -*/ - int Ebofs::write(object_t oid, off_t off, size_t len, bufferlist& bl, Context *onsafe) @@ -2338,7 +2394,17 @@ int Ebofs::write(object_t oid, // commit waiter if (r > 0) { assert((size_t)r == len); - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.write(oid, off, len, bl); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2370,9 +2436,19 @@ int Ebofs::remove(object_t oid, Context *onsafe) // do it int r = _remove(oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.remove(oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2445,9 +2521,19 @@ int Ebofs::truncate(object_t oid, off_t size, Context *onsafe) int r = _truncate(oid, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.truncate(oid, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2464,9 +2550,19 @@ int Ebofs::clone(object_t from, object_t to, Context *onsafe) int r = _clone(from, to); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.clone(from, to); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2640,9 +2736,19 @@ int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t siz ebofs_lock.Lock(); int r = _setattr(oid, name, value, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.setattr(oid, name, value, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2673,9 +2779,19 @@ int Ebofs::setattrs(object_t oid, map& attrset, Context *onsaf ebofs_lock.Lock(); int r = _setattrs(oid, attrset); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.setattrs(oid, attrset); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2758,9 +2874,19 @@ int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe) int r = _rmattr(oid, name); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.rmattr(oid, name); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2835,9 +2961,19 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe) int r = _create_collection(cid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.create_collection(cid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2882,9 +3018,19 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe) int r = _destroy_collection(cid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.remove_collection(cid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2936,9 +3082,19 @@ int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe) int r = _collection_add(cid, oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_add(cid, oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2977,9 +3133,19 @@ int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe) int r = _collection_remove(cid, oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_remove(cid, oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -3040,9 +3206,19 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s int r = _collection_setattr(cid, name, value, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_setattr(cid, name, value, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -3098,9 +3274,19 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe) int r = _collection_rmattr(cid, name); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_rmattr(cid, name); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } diff --git a/branches/sage/cephmds2/ebofs/Ebofs.h b/branches/sage/cephmds2/ebofs/Ebofs.h index 91c5eb51b3cda..eb20cf8920531 100644 --- a/branches/sage/cephmds2/ebofs/Ebofs.h +++ b/branches/sage/cephmds2/ebofs/Ebofs.h @@ -29,6 +29,7 @@ using namespace __gnu_cxx; #include "nodes.h" #include "Allocator.h" #include "Table.h" +#include "Journal.h" #include "common/Mutex.h" #include "common/Cond.h" @@ -40,20 +41,23 @@ typedef pair coll_object_t; class Ebofs : public ObjectStore { - protected: +protected: Mutex ebofs_lock; // a beautiful global lock // ** debuggy ** bool fake_writes; // ** super ** +public: BlockDevice dev; +protected: bool mounted, unmounting, dirty; bool readonly; version_t super_epoch; bool commit_thread_started, mid_commit; Cond commit_cond; // to wake up the commit thread Cond sync_cond; + uint64_t super_fsid; map > commit_waiters; @@ -71,9 +75,16 @@ class Ebofs : public ObjectStore { } } commit_thread; - +public: + uint64_t get_fsid() { return super_fsid; } + epoch_t get_super_epoch() { return super_epoch; } +protected: + // ** journal ** + char *journalfn; + Journal *journal; + // ** allocator ** block_t free_blocks, limbo_blocks; Allocator allocator; @@ -188,6 +199,21 @@ class Ebofs : public ObjectStore { bool finisher_stop; list finisher_queue; +public: + void queue_finisher(Context *c) { + finisher_lock.Lock(); + finisher_queue.push_back(c); + finisher_cond.Signal(); + finisher_lock.Unlock(); + } + void queue_finishers(list& ls) { + finisher_lock.Lock(); + finisher_queue.splice(finisher_queue.end(), ls); + finisher_cond.Signal(); + finisher_lock.Unlock(); + } +protected: + void *finisher_thread_entry(); class FinisherThread : public Thread { Ebofs *ebofs; @@ -204,12 +230,13 @@ class Ebofs : public ObjectStore { public: - Ebofs(char *devfn) : + Ebofs(char *devfn, char *jfn=0) : fake_writes(false), dev(devfn), mounted(false), unmounting(false), dirty(false), readonly(false), super_epoch(0), commit_thread_started(false), mid_commit(false), commit_thread(this), + journalfn(jfn), journal(0), free_blocks(0), limbo_blocks(0), allocator(this), nodepool(ebofs_lock), @@ -222,6 +249,11 @@ class Ebofs : public ObjectStore { finisher_stop(false), finisher_thread(this) { for (int i=0; i + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ #include "FileJournal.h" #include "Ebofs.h" -#include "config.h" -#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal " -#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal " +#include +#include +#include +#include + +#include "config.h" +#undef dout +#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal " +#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal " -void FileJournal::create() +int FileJournal::create() { dout(1) << "create " << fn << endl; // open/create - fd = ::open(fn.c_str(), O_CREAT|O_WRONLY); + fd = ::open(fn.c_str(), O_RDWR|O_SYNC); + if (fd < 0) { + dout(1) << "create failed " << errno << " " << strerror(errno) << endl; + return -errno; + } assert(fd > 0); - ::ftruncate(fd); - ::fchmod(fd, 0644); + //::ftruncate(fd, 0); + //::fchmod(fd, 0644); + + // get size + struct stat st; + ::fstat(fd, &st); + dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl; + + // write empty header + header.clear(); + header.fsid = ebofs->get_fsid(); + header.max_size = st.st_size; + write_header(); + + // writeable. + read_pos = 0; + write_pos = queue_pos = sizeof(header); ::close(fd); -} + return 0; +} -void FileJournal::open() +int FileJournal::open() { - dout(1) << "open " << fn << endl; + //dout(1) << "open " << fn << endl; // open and file assert(fd == 0); - fd = ::open(fn.c_str(), O_RDWR); + fd = ::open(fn.c_str(), O_RDWR|O_SYNC); + if (fd < 0) { + dout(1) << "open failed " << errno << " " << strerror(errno) << endl; + return -errno; + } assert(fd > 0); - // read header? - // *** + // assume writeable, unless... + read_pos = 0; + write_pos = queue_pos = sizeof(header); + // read header? + read_header(); + if (header.num > 0 && header.fsid == ebofs->get_fsid()) { + // valid header, pick an offset + for (int i=0; iget_super_epoch()) { + dout(2) << "using read_pos header pointer " + << header.epoch[i] << " at " << header.offset[i] + << endl; + read_pos = header.offset[i]; + write_pos = queue_pos = 0; + break; + } + else if (header.epoch[i] < ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", skipping old " << header.epoch[i] << " at " << header.offset[i] + << endl; + } + else if (header.epoch[i] > ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i] + << endl; + break; + } + } + } start_writer(); + + return 0; } void FileJournal::close() @@ -49,7 +119,8 @@ void FileJournal::close() stop_writer(); // close - assert(q.empty()); + assert(writeq.empty()); + assert(commitq.empty()); assert(fd > 0); ::close(fd); fd = 0; @@ -73,12 +144,36 @@ void FileJournal::stop_writer() } +void FileJournal::print_header() +{ + for (int i=0; i::const_iterator it = bl.buffers().begin(); it != bl.buffers().end(); @@ -124,14 +231,21 @@ void FileJournal::write_thread_entry() if ((*it).length() == 0) continue; // blank buffer. ::write(fd, (char*)(*it).c_str(), (*it).length() ); } + + ::write(fd, &h, sizeof(h)); // move position pointer - bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length(); + write_pos += 2*sizeof(entry_header_t) + bl.length(); - // do commit callback if (oncommit) { - oncommit->finish(0); - delete oncommit; + if (1) { + // queue callback + ebofs->queue_finisher(oncommit); + } else { + // callback now + oncommit->finish(0); + delete oncommit; + } } } } @@ -140,61 +254,202 @@ void FileJournal::write_thread_entry() dout(10) << "write_thread_entry finish" << endl; } -void FileJournal::submit_entry(bufferlist& e, Context *oncommit) +bool FileJournal::submit_entry(bufferlist& e, Context *oncommit) { - dout(10) << "submit_entry " << bottom << " : " << e.length() - << " epoch " << ebofs->super_epoch + assert(queue_pos != 0); // bad create(), or journal didn't replay to completion. + + // ** lock ** + Mutex::Locker locker(write_lock); + + // wrap? full? + off_t size = 2*sizeof(entry_header_t) + e.length(); + + if (full) return false; // already marked full. + + if (header.wrap) { + // we're wrapped. don't overwrite ourselves. + if (queue_pos + size >= header.offset[0]) { + dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size + << " >= " << header.offset[0] + << endl; + full = true; + print_header(); + return false; + } + } else { + // we haven't wrapped. + if (queue_pos + size >= header.max_size) { + // is there room if we wrap? + if ((off_t)sizeof(header_t) + size < header.offset[0]) { + // yes! + dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl; + header.wrap = queue_pos; + queue_pos = sizeof(header_t); + header.push(ebofs->get_super_epoch(), queue_pos); + } else { + // no room. + dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size + << " >= " << header.max_size + << endl; + full = true; + return false; + } + } + } + + dout(10) << "submit_entry " << queue_pos << " : " << e.length() + << " epoch " << ebofs->get_super_epoch() << " " << oncommit << endl; // dump on queue - writeq.push_back(pair(ebofs->super_epoch, e)); + writeq.push_back(pair(ebofs->get_super_epoch(), e)); commitq.push_back(oncommit); - + + queue_pos += size; + // kick writer thread write_cond.Signal(); + + return true; } void FileJournal::commit_epoch_start() { - dout(10) << "commit_epoch_start" << endl; + dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 + << " -- new epoch " << ebofs->get_super_epoch() + << endl; - write_lock.Lock(); - { - header.epoch2 = ebofs->super_epoch; - header.top2 = bottom; - write_header(); - } - write_lock.Unlock(); + Mutex::Locker locker(write_lock); + + // was full -> empty -> now usable? + if (full) { + if (header.num != 0) { + dout(1) << " journal FULL, ignoring this epoch" << endl; + return; + } + + dout(1) << " clearing FULL flag, journal now usable" << endl; + full = false; + } + + // note epoch boundary + header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written. + //write_header(); // no need to write it now, though... } void FileJournal::commit_epoch_finish() { - dout(10) << "commit_epoch_finish" << endl; + dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl; write_lock.Lock(); { - // update header - header.epoch1 = ebofs->super_epoch; - header.top1 = header.top2; - header.epoch2 = 0; - header.top2 = 0; + if (full) { + // full journal damage control. + dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << endl; + header.clear(); + write_pos = queue_pos = sizeof(header_t); + } else { + // update header -- trim/discard old (committed) epochs + while (header.epoch[0] < ebofs->get_super_epoch()) + header.pop(); + } write_header(); - // flush any unwritten items in previous epoch - while (!writeq.empty() && - writeq.front().first < ebofs->super_epoch) { - dout(15) << " dropping uncommitted journal item from prior epoch" << endl; - writeq.pop_front(); + // discard any unwritten items in previous epoch, and do callbacks + epoch_t epoch = ebofs->get_super_epoch(); + list callbacks; + while (!writeq.empty() && writeq.front().first < epoch) { + dout(15) << " dropping unwritten and committed " + << write_pos << " : " << writeq.front().second.length() + << " epoch " << writeq.front().first + << endl; + // finisher? Context *oncommit = commitq.front(); + if (oncommit) callbacks.push_back(oncommit); + + write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length(); + + // discard. + writeq.pop_front(); commitq.pop_front(); - - if (oncommit) { - oncommit->finish(0); - delete oncommit; - } } + + // queue the finishers + ebofs->queue_finishers(callbacks); } write_lock.Unlock(); } + + +void FileJournal::make_writeable() +{ + if (read_pos) + write_pos = queue_pos = read_pos; + else + write_pos = queue_pos = sizeof(header_t); + read_pos = 0; +} + + +bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch) +{ + if (!read_pos) { + dout(1) << "read_entry -- not readable" << endl; + make_writeable(); + return false; + } + + if (read_pos == header.wrap) { + // find wrap point + for (int i=1; i= 2 && offset[0] > offset[1]) + wrap = 0; // we're eliminating a wrap + num--; + for (int i=0; iwrite_thread(); + journal->write_thread_entry(); return 0; } } write_thread; public: - FileJournal(Ebofs *e, char *f, off_t sz) : - Journal(e), - fn(f), max_size(sz), - top(0), bottom(0), committing_to(0), + FileJournal(Ebofs *e, char *f) : + Journal(e), fn(f), + full(false), + write_pos(0), queue_pos(0), read_pos(0), fd(0), - write_stop(false), write_thread(this) - { } + write_stop(false), write_thread(this) { } ~FileJournal() {} - void create(); - void open(); + int create(); + int open(); void close(); // writes - void submit_entry(bufferlist& e, Context *oncommit); // submit an item - void commit_epoch_start(); // mark epoch boundary - void commit_epoch_finish(); // mark prior epoch as committed (we can expire) + bool submit_entry(bufferlist& e, Context *oncommit); // submit an item + void commit_epoch_start(); // mark epoch boundary + void commit_epoch_finish(); // mark prior epoch as committed (we can expire) + + bool read_entry(bufferlist& bl, epoch_t& e); // reads }; diff --git a/branches/sage/cephmds2/ebofs/Journal.h b/branches/sage/cephmds2/ebofs/Journal.h index c05bce5955c5f..fb1983c22eafc 100644 --- a/branches/sage/cephmds2/ebofs/Journal.h +++ b/branches/sage/cephmds2/ebofs/Journal.h @@ -16,22 +16,28 @@ #ifndef __EBOFS_JOURNAL_H #define __EBOFS_JOURNAL_H +class Ebofs; + +#include "include/buffer.h" +#include "include/Context.h" class Journal { +protected: Ebofs *ebofs; - public: +public: Journal(Ebofs *e) : ebofs(e) { } virtual ~Journal() { } - virtual void create() = 0; - virtual void open() = 0; + virtual int create() = 0; + virtual int open() = 0; virtual void close() = 0; // writes - virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item + virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item virtual void commit_epoch_start() = 0; // mark epoch boundary - virtual void commit_epoch_finish(list& ls) = 0; // mark prior epoch as committed (we can expire) + virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire) + virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0; // reads/recovery diff --git a/branches/sage/cephmds2/ebofs/test.ebofs.cc b/branches/sage/cephmds2/ebofs/test.ebofs.cc index 704ec16581824..345f49b7a68ca 100644 --- a/branches/sage/cephmds2/ebofs/test.ebofs.cc +++ b/branches/sage/cephmds2/ebofs/test.ebofs.cc @@ -145,6 +145,7 @@ int main(int argc, char **argv) char *filename = args[0]; int seconds = atoi(args[1]); int threads = atoi(args[2]); + if (!threads) threads = 1; cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl; @@ -153,7 +154,7 @@ int main(int argc, char **argv) // explicit tests - if (1) { + if (0) { // verify that clone() plays nice with partial writes object_t oid(1,1); bufferptr bp(10000); diff --git a/branches/sage/cephmds2/ebofs/types.h b/branches/sage/cephmds2/ebofs/types.h index b03bb8a40d9c9..1fa209a3deeb9 100644 --- a/branches/sage/cephmds2/ebofs/types.h +++ b/branches/sage/cephmds2/ebofs/types.h @@ -142,15 +142,16 @@ static const int EBOFS_FREE_BUCKET_BITS = 2; struct ebofs_super { - unsigned s_magic; - - unsigned epoch; // version of this superblock. + uint64_t s_magic; + uint64_t fsid; + + epoch_t epoch; // version of this superblock. - unsigned num_blocks; /* # blocks in filesystem */ + uint64_t num_blocks; /* # blocks in filesystem */ // some basic stats, for kicks - unsigned free_blocks; /* unused blocks */ - unsigned limbo_blocks; /* limbo blocks */ + uint64_t free_blocks; /* unused blocks */ + uint64_t limbo_blocks; /* limbo blocks */ //unsigned num_objects; //unsigned num_fragmented; diff --git a/branches/sage/cephmds2/include/Context.h b/branches/sage/cephmds2/include/Context.h index 7f2f30104407b..d73a1d8c739b6 100644 --- a/branches/sage/cephmds2/include/Context.h +++ b/branches/sage/cephmds2/include/Context.h @@ -44,11 +44,14 @@ inline void finish_contexts(std::list& finished, using std::cout; using std::endl; + list ls; if (finished.empty()) return; - dout(10) << finished.size() << " contexts to finish with " << result << endl; - for (std::list::iterator it = finished.begin(); - it != finished.end(); + ls.swap(finished); // swap out of place to avoid weird loops + + dout(10) << ls.size() << " contexts to finish with " << result << endl; + for (std::list::iterator it = ls.begin(); + it != ls.end(); it++) { Context *c = *it; dout(10) << "---- " << c << endl; diff --git a/branches/sage/cephmds2/include/buffer.h b/branches/sage/cephmds2/include/buffer.h index 1f401513f688c..1f61d3b892ba5 100644 --- a/branches/sage/cephmds2/include/buffer.h +++ b/branches/sage/cephmds2/include/buffer.h @@ -919,6 +919,14 @@ inline void _decode(std::string& s, bufferlist& bl, int& off) off += len+1; } +// const char* (encode only, string compatible) +inline void _encode(const char *s, bufferlist& bl) +{ + uint32_t len = strlen(s); + _encoderaw(len, bl); + bl.append(s, len+1); +} + // bufferptr (encapsulated) inline void _encode(bufferptr& bp, bufferlist& bl) { diff --git a/branches/sage/cephmds2/mds/Migrator.cc b/branches/sage/cephmds2/mds/Migrator.cc index e0cc389bdfd8e..2350b1a518fa1 100644 --- a/branches/sage/cephmds2/mds/Migrator.cc +++ b/branches/sage/cephmds2/mds/Migrator.cc @@ -366,7 +366,7 @@ void Migrator::show_importing() for (map::iterator p = import_state.begin(); p != import_state.end(); p++) { - CDir *dir = mdcache->get_dirfrag(p->first); + CDir *dir = mds->mdcache->get_dirfrag(p->first); if (dir) { dout(10) << "importing: (" << p->second << ") " << get_import_statename(p->second) << " " << p->first @@ -377,6 +377,7 @@ void Migrator::show_importing() << " " << *dir << endl; } + } } void Migrator::show_exporting() @@ -386,7 +387,7 @@ void Migrator::show_exporting() p != export_state.end(); p++) dout(10) << "exporting: (" << p->second << ") " << get_export_statename(p->second) - << " " << p->first->get_dirfrag() + << " " << p->first->dirfrag() << " " << *p->first << endl; } diff --git a/branches/sage/cephmds2/messages/MMonElection.h b/branches/sage/cephmds2/messages/MMonElection.h new file mode 100644 index 0000000000000..14a29af9140f9 --- /dev/null +++ b/branches/sage/cephmds2/messages/MMonElection.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef __MMONELECTION_H +#define __MMONELECTION_H + +#include "msg/Message.h" + + +class MMonElection : public Message { +public: + static const int OP_PROPOSE = 1; + static const int OP_ACK = 2; + static const int OP_NAK = 3; + static const int OP_VICTORY = 4; + static const char *get_opname(int o) { + switch (o) { + case OP_PROPOSE: return "propose"; + case OP_ACK: return "ack"; + case OP_NAK: return "nak"; + case OP_VICTORY: return "victory"; + default: assert(0); return 0; + } + } + + int32_t op; + epoch_t epoch; + + MMonElection() : Message(MSG_MON_ELECTION) {} + MMonElection(int o, epoch_t e) : + Message(MSG_MON_ELECTION), + op(o), epoch(e) {} + + char *get_type_name() { return "election"; } + void print(ostream& out) { + out << "election(" << get_opname(op) << " " << epoch << ")"; + } + + void encode_payload() { + ::_encode(op, payload); + ::_encode(epoch, payload); + } + void decode_payload() { + int off = 0; + ::_decode(op, payload, off); + ::_decode(epoch, payload, off); + } + +}; + +#endif diff --git a/branches/sage/cephmds2/messages/MMonElectionPropose.h b/branches/sage/cephmds2/messages/MMonElectionPropose.h deleted file mode 100644 index 7ec54b67332b9..0000000000000 --- a/branches/sage/cephmds2/messages/MMonElectionPropose.h +++ /dev/null @@ -1,33 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __MMONELECTIONPROPOSE_H -#define __MMONELECTIONPROPOSE_H - -#include "msg/Message.h" - - -class MMonElectionPropose : public Message { - public: - MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {} - - virtual char *get_type_name() { return "election_propose"; } - - void encode_payload() {} - void decode_payload() {} - -}; - -#endif diff --git a/branches/sage/cephmds2/messages/MMonElectionVictory.h b/branches/sage/cephmds2/messages/MMonElectionVictory.h deleted file mode 100644 index 47211ae501db5..0000000000000 --- a/branches/sage/cephmds2/messages/MMonElectionVictory.h +++ /dev/null @@ -1,41 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __MMONELECTIONVICTORY_H -#define __MMONELECTIONVICTORY_H - -#include "msg/Message.h" - - -class MMonElectionVictory : public Message { - public: - //set active_set; - - MMonElectionVictory(/*set& as*/) : Message(MSG_MON_ELECTION_VICTORY)//, - //active_set(as) - {} - - virtual char *get_type_name() { return "election_victory"; } - - void encode_payload() { - //::_encode(active_set, payload); - } - void decode_payload() { - //int off = 0; - //::_decode(active_set, payload, off); - } -}; - -#endif diff --git a/branches/sage/cephmds2/messages/MMonPaxos.h b/branches/sage/cephmds2/messages/MMonPaxos.h index 40cdadc81c57d..b33012336901f 100644 --- a/branches/sage/cephmds2/messages/MMonPaxos.h +++ b/branches/sage/cephmds2/messages/MMonPaxos.h @@ -17,15 +17,18 @@ #define __MMONPAXOS_H #include "msg/Message.h" +#include "mon/mon_types.h" class MMonPaxos : public Message { public: // op types - const static int OP_COLLECT = 1; // proposer: propose round - const static int OP_LAST = 2; // voter: accept proposed round - const static int OP_BEGIN = 4; // proposer: value proposed for this round - const static int OP_ACCEPT = 5; // voter: accept propsed value - const static int OP_COMMIT = 7; // proposer: notify learners of agreed value + const static int OP_COLLECT = 1; // proposer: propose round + const static int OP_LAST = 2; // voter: accept proposed round + const static int OP_BEGIN = 3; // proposer: value proposed for this round + const static int OP_ACCEPT = 4; // voter: accept propsed value + const static int OP_COMMIT = 5; // proposer: notify learners of agreed value + const static int OP_LEASE = 6; // leader: extend peon lease + const static int OP_LEASE_ACK = 7; // peon: lease ack const static char *get_opname(int op) { switch (op) { case OP_COLLECT: return "collect"; @@ -33,52 +36,61 @@ class MMonPaxos : public Message { case OP_BEGIN: return "begin"; case OP_ACCEPT: return "accept"; case OP_COMMIT: return "commit"; + case OP_LEASE: return "lease"; + case OP_LEASE_ACK: return "lease_ack"; default: assert(0); return 0; } } - // which state machine? - int op; - int machine_id; - + epoch_t epoch; // monitor epoch + int op; // paxos op + int machine_id; // which state machine? + version_t last_committed; // i've committed to version_t pn_from; // i promise to accept after version_t pn; // with with proposal version_t old_accepted_pn; // previous pn, if we are a LAST with an uncommitted value + utime_t lease_expire; map values; MMonPaxos() : Message(MSG_MON_PAXOS) {} - MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS), - op(o), machine_id(mid), - last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { } + MMonPaxos(epoch_t e, int o, int mid) : + Message(MSG_MON_PAXOS), + epoch(e), + op(o), machine_id(mid), + last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { } virtual char *get_type_name() { return "paxos"; } void print(ostream& out) { - out << "paxos(m" << machine_id + out << "paxos(" << get_paxos_name(machine_id) << " " << get_opname(op) << " lc " << last_committed << " pn " << pn << " opn " << old_accepted_pn << ")"; } void encode_payload() { + ::_encode(epoch, payload); ::_encode(op, payload); ::_encode(machine_id, payload); ::_encode(last_committed, payload); ::_encode(pn_from, payload); ::_encode(pn, payload); ::_encode(old_accepted_pn, payload); + ::_encode(lease_expire, payload); ::_encode(values, payload); } void decode_payload() { int off = 0; + ::_decode(epoch, payload, off); ::_decode(op, payload, off); ::_decode(machine_id, payload, off); ::_decode(last_committed, payload, off); ::_decode(pn_from, payload, off); ::_decode(pn, payload, off); ::_decode(old_accepted_pn, payload, off); + ::_decode(lease_expire, payload, off); ::_decode(values, payload, off); } }; diff --git a/branches/sage/cephmds2/mon/ClientMonitor.cc b/branches/sage/cephmds2/mon/ClientMonitor.cc index 1b7e4f0f12ac3..51686e8f2bed4 100644 --- a/branches/sage/cephmds2/mon/ClientMonitor.cc +++ b/branches/sage/cephmds2/mon/ClientMonitor.cc @@ -17,6 +17,7 @@ #include "Monitor.h" #include "MDSMonitor.h" #include "OSDMonitor.h" +#include "MonitorStore.h" #include "messages/MClientMount.h" #include "messages/MClientUnmount.h" @@ -30,93 +31,195 @@ +bool ClientMonitor::update_from_paxos() +{ + assert(paxos->is_active()); + version_t paxosv = paxos->get_version(); + dout(10) << "update_from_paxos paxosv " << paxosv + << ", my v " << client_map.version << endl; + + assert(paxosv >= client_map.version); + while (paxosv > client_map.version) { + bufferlist bl; + bool success = paxos->read(client_map.version+1, bl); + if (success) { + dout(10) << "update_from_paxos applying incremental " << client_map.version+1 << endl; + Incremental inc; + int off = 0; + inc._decode(bl, off); + client_map.apply_incremental(inc); + + } else { + dout(10) << "update_from_paxos couldn't read incremental " << client_map.version+1 << endl; + return false; + } + + // save latest + bl.clear(); + client_map._encode(bl); + mon->store->put_bl_ss(bl, "clientmap", "latest"); + + // prepare next inc + prepare_pending(); + } + + return true; +} + +void ClientMonitor::prepare_pending() +{ + pending_inc = Incremental(); + pending_inc.version = client_map.version + 1; + pending_inc.next_client = client_map.next_client; + dout(10) << "prepare_pending v " << pending_inc.version + << ", next is " << pending_inc.next_client + << endl; +} + +void ClientMonitor::propose_pending() +{ + dout(10) << "propose_pending v " << pending_inc.version + << ", next is " << pending_inc.next_client + << endl; + + // apply to paxos + assert(paxos->get_version() + 1 == pending_inc.version); + bufferlist bl; + pending_inc._encode(bl); + paxos->propose_new_value(bl, new C_Commit(this)); +} + + +// ------- + -void ClientMonitor::dispatch(Message *m) +bool ClientMonitor::preprocess_update(Message *m) { + dout(10) << "preprocess_update " << *m << " from " << m->get_source_inst() << endl; + switch (m->get_type()) { + case MSG_CLIENT_MOUNT: + { + // already mounted? + entity_addr_t addr = m->get_source_addr(); + if (client_map.addr_client.count(addr)) { + int client = client_map.addr_client[addr]; + dout(7) << " client" << client << " already mounted" << endl; + _mounted(client, m); + return true; + } + } + return false; + + case MSG_CLIENT_UNMOUNT: + { + // already unmounted? + int client = m->get_source().num(); + if (client_map.client_addr.count(client) == 0) { + dout(7) << " client" << client << " not mounted" << endl; + _unmounted(m); + return true; + } + } + return false; + + default: + assert(0); + delete m; + return true; + } +} + +void ClientMonitor::prepare_update(Message *m) +{ + dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl; + + int client = m->get_source().num(); + entity_addr_t addr = m->get_source_addr(); + + switch (m->get_type()) { case MSG_CLIENT_MOUNT: - handle_client_mount((MClientMount*)m); + { + // choose a client id + if (client < 0 || + (client_map.client_addr.count(client) && + client_map.client_addr[client] != addr)) { + client = pending_inc.next_client; + dout(10) << "mount: assigned client" << client << " to " << addr << endl; + } else { + dout(10) << "mount: client" << client << " requested by " << addr << endl; + } + + pending_inc.add_mount(client, addr); + pending_commit.push_back(new C_Mounted(this, client, m)); + } break; case MSG_CLIENT_UNMOUNT: - handle_client_unmount((MClientUnmount*)m); + { + assert(client_map.client_addr.count(client)); + + pending_inc.add_unmount(client); + pending_commit.push_back(new C_Unmounted(this, m)); + } break; - - + default: assert(0); - } + delete m; + } } -void ClientMonitor::handle_client_mount(MClientMount *m) + +// MOUNT + + +void ClientMonitor::_mounted(int client, Message *m) { - dout(7) << "client_mount from " << m->get_source_inst() << endl; - assert(m->get_source().is_client()); - int from = m->get_source().num(); - - // choose a client id - if (from < 0 || - (client_map.count(from) && - client_map[from] != m->get_source_addr())) { - from = num_clients++; - dout(10) << "client_mount assigned client" << from << endl; - } - - client_map[from] = m->get_source_addr(); - - // reply with latest mds map entity_inst_t to = m->get_source_inst(); - to.name = MSG_ADDR_CLIENT(from); + to.name = MSG_ADDR_CLIENT(client); + + dout(10) << "_mounted client" << client << " at " << to << endl; + + // reply with latest mds, osd maps mon->mdsmon->send_latest(to); mon->osdmon->send_latest(to); + delete m; } -void ClientMonitor::handle_client_unmount(MClientUnmount *m) +void ClientMonitor::_unmounted(Message *m) { - dout(7) << "client_unmount from " << m->get_source() - << " at " << m->get_source_inst() << endl; - assert(m->get_source().is_client()); - int from = m->get_source().num(); - - if (client_map.count(from)) { - client_map.erase(from); - - if (client_map.empty() && - g_conf.mds_shutdown_on_last_unmount) { - dout(1) << "last client unmounted" << endl; - mon->do_stop(); - } - } - - // reply with (same) unmount message to ack + dout(10) << "_unmounted " << m->get_source() << endl; + + // reply with (same) unmount message mon->messenger->send_message(m, m->get_source_inst()); -} + // auto-shutdown? + if (update_from_paxos() && + mon->is_leader() && + client_map.version > 1 && + client_map.client_addr.empty() && + g_conf.mds_shutdown_on_last_unmount) { + dout(1) << "last client unmounted" << endl; + mon->do_stop(); + } +} -/* -void ClientMonitor::handle_mds_shutdown(Message *m) +void ClientMonitor::_commit(int r) { - assert(m->get_source().is_mds()); - int from = m->get_source().num(); - - mdsmap.mds_inst.erase(from); - mdsmap.all_mds.erase(from); + if (r >= 0) { + dout(10) << "_commit success" << endl; + finish_contexts(pending_commit); + } else { + dout(10) << "_commit failed" << endl; + } - dout(7) << "mds_shutdown from " << m->get_source() - << ", still have " << mdsmap.all_mds - << endl; - - // tell someone? - // fixme - - delete m; + finish_contexts(pending_commit, r); } -*/ - /* void ClientMonitor::bcast_latest_mds() { @@ -130,5 +233,24 @@ void ClientMonitor::bcast_latest_mds() send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p)); } } - */ + + +void ClientMonitor::create_initial() +{ + dout(10) << "create_initial" << endl; + + if (!mon->is_leader()) return; + if (paxos->get_version() > 0) return; + + if (paxos->is_writeable()) { + dout(1) << "create_initial -- creating initial map" << endl; + prepare_pending(); + propose_pending(); + } else { + dout(1) << "create_initial -- waiting for writeable" << endl; + paxos->wait_for_writeable(new C_CreateInitial(this)); + } +} + + diff --git a/branches/sage/cephmds2/mon/ClientMonitor.h b/branches/sage/cephmds2/mon/ClientMonitor.h index 1ae9401465c94..ebb1f05e9c8c6 100644 --- a/branches/sage/cephmds2/mon/ClientMonitor.h +++ b/branches/sage/cephmds2/mon/ClientMonitor.h @@ -24,31 +24,173 @@ using namespace std; #include "mds/MDSMap.h" +#include "PaxosService.h" + class Monitor; +class Paxos; + +class ClientMonitor : public PaxosService { +public: + + struct Incremental { + version_t version; + uint32_t next_client; + map mount; + set unmount; + + Incremental() : version(0), next_client() {} + + bool is_empty() { return mount.empty() && unmount.empty(); } + void add_mount(uint32_t client, entity_addr_t addr) { + next_client = MAX(next_client, client+1); + mount[client] = addr; + } + void add_unmount(uint32_t client) { + assert(client < next_client); + if (mount.count(client)) + mount.erase(client); + else + unmount.insert(client); + } + + void _encode(bufferlist &bl) { + ::_encode(version, bl); + ::_encode(next_client, bl); + ::_encode(mount, bl); + ::_encode(unmount, bl); + } + void _decode(bufferlist &bl, int& off) { + ::_decode(version, bl, off); + ::_decode(next_client, bl, off); + ::_decode(mount, bl, off); + ::_decode(unmount, bl, off); + } + }; + + struct Map { + version_t version; + uint32_t next_client; + map client_addr; + hash_map addr_client; + + Map() : version(0), next_client(0) {} + + void reverse() { + addr_client.clear(); + for (map::iterator p = client_addr.begin(); + p != client_addr.end(); + ++p) { + addr_client[p->second] = p->first; + } + } + void apply_incremental(Incremental &inc) { + assert(inc.version == version+1); + version = inc.version; + next_client = inc.next_client; + for (map::iterator p = inc.mount.begin(); + p != inc.mount.end(); + ++p) { + client_addr[p->first] = p->second; + addr_client[p->second] = p->first; + } + + for (set::iterator p = inc.unmount.begin(); + p != inc.unmount.end(); + ++p) { + assert(client_addr.count(*p)); + addr_client.erase(client_addr[*p]); + client_addr.erase(*p); + } + } -class ClientMonitor : public Dispatcher { - Monitor *mon; - Messenger *messenger; - Mutex &lock; + void _encode(bufferlist &bl) { + ::_encode(version, bl); + ::_encode(next_client, bl); + ::_encode(client_addr, bl); + } + void _decode(bufferlist &bl, int& off) { + ::_decode(version, bl, off); + ::_decode(next_client, bl, off); + ::_decode(client_addr, bl, off); + reverse(); + } + }; - private: - int num_clients; - map client_map; + class C_CreateInitial : public Context { + ClientMonitor *cmon; + public: + C_CreateInitial(ClientMonitor *cm) : cmon(cm) {} + void finish(int r) { + cmon->create_initial(); + } + }; - void bcast_latest_mds(); + class C_Mounted : public Context { + ClientMonitor *cmon; + int client; + Message *m; + public: + C_Mounted(ClientMonitor *cm, int c, Message *m_) : + cmon(cm), client(c), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_mounted(client, m); + else + cmon->dispatch(m); + } + }; - //void accept_pending(); // accept pending, new map. - //void send_incremental(epoch_t since, msg_addr_t dest); + class C_Unmounted : public Context { + ClientMonitor *cmon; + Message *m; + public: + C_Unmounted(ClientMonitor *cm, Message *m_) : + cmon(cm), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_unmounted(m); + else + cmon->dispatch(m); + } + }; - void handle_client_mount(class MClientMount *m); - void handle_client_unmount(class MClientUnmount *m); + class C_Commit : public Context { + ClientMonitor *cmon; + public: + C_Commit(ClientMonitor *cm) : + cmon(cm) {} + void finish(int r) { + cmon->_commit(r); + } + }; +private: + Map client_map; + list waiting_for_active; + + // leader + Incremental pending_inc; + list pending_commit; // contributers to pending_inc + + void create_initial(); + bool update_from_paxos(); + void prepare_pending(); // prepare a new pending + void propose_pending(); // propose pending update to peers + + void _mounted(int c, Message *m); + void _unmounted(Message *m); + void _commit(int r); + + void handle_query(Message *m); + bool preprocess_update(Message *m); // true if processed. + void prepare_update(Message *m); + + public: - ClientMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l), - num_clients(0) { } + ClientMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } - void dispatch(Message *m); - void tick(); // check state, take actions + //void tick(); // check state, take actions + }; #endif diff --git a/branches/sage/cephmds2/mon/Elector.cc b/branches/sage/cephmds2/mon/Elector.cc index 43341f1a4a327..cdfce72bb0681 100644 --- a/branches/sage/cephmds2/mon/Elector.cc +++ b/branches/sage/cephmds2/mon/Elector.cc @@ -16,33 +16,58 @@ #include "Monitor.h" #include "common/Timer.h" - -#include "messages/MMonElectionPropose.h" -#include "messages/MMonElectionAck.h" -#include "messages/MMonElectionVictory.h" +#include "MonitorStore.h" +#include "messages/MMonElection.h" #include "config.h" #undef dout -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector " -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector(" << epoch << ") " -void Elector::start() +void Elector::init() { - dout(5) << "start -- can i be leader?" << endl; + epoch = mon->store->get_int("mon_epoch"); + if (!epoch) + epoch = 1; + dout(1) << "init, last seen epoch " << epoch << endl; +} +void Elector::shutdown() +{ + if (expire_event) + mon->timer.cancel_event(expire_event); +} + +void Elector::bump_epoch(epoch_t e) +{ + dout(10) << "bump_epoch " << epoch << " to " << e << endl; + assert(epoch < e); + epoch = e; + mon->store->put_int(epoch, "mon_epoch"); + + // clear up some state + electing_me = false; + acked_me.clear(); leader_acked = -1; +} + +void Elector::start() +{ + dout(5) << "start -- can i be leader?" << endl; + // start by trying to elect me + if (epoch % 2 == 0) + bump_epoch(epoch+1); // odd == election cycle start_stamp = g_clock.now(); - acked_me.clear(); - acked_me.insert(whoami); electing_me = true; + acked_me.insert(whoami); // bcast to everyone else for (int i=0; imonmap->num_mon; ++i) { if (i == whoami) continue; - mon->messenger->send_message(new MMonElectionPropose, + mon->messenger->send_message(new MMonElection(MMonElection::OP_PROPOSE, epoch), mon->monmap->get_inst(i)); } @@ -54,6 +79,7 @@ void Elector::defer(int who) dout(5) << "defer to " << who << endl; if (electing_me) { + // drop out acked_me.clear(); electing_me = false; } @@ -61,7 +87,7 @@ void Elector::defer(int who) // ack them leader_acked = who; ack_stamp = g_clock.now(); - mon->messenger->send_message(new MMonElectionAck, + mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch), mon->monmap->get_inst(who)); // set a timer @@ -69,29 +95,22 @@ void Elector::defer(int who) } -class C_Mon_ElectionExpire : public Context { - Elector *elector; -public: - C_Mon_ElectionExpire(Elector *e) : elector(e) { } - void finish(int r) { - elector->expire(); - } -}; - void Elector::reset_timer(double plus) { // set the timer cancel_timer(); - expire_event = new C_Mon_ElectionExpire(this); - g_timer.add_event_after(g_conf.mon_lease + plus, - expire_event); + expire_event = new C_ElectionExpire(this); + mon->timer.add_event_after(g_conf.mon_lease + plus, + expire_event); } void Elector::cancel_timer() { - if (expire_event) - g_timer.cancel_event(expire_event); + if (expire_event) { + mon->timer.cancel_event(expire_event); + expire_event = 0; + } } void Elector::expire() @@ -114,29 +133,40 @@ void Elector::victory() { leader_acked = -1; electing_me = false; - + set quorum = acked_me; + cancel_timer(); - + + assert(epoch % 2 == 1); // election + bump_epoch(epoch+1); // is over! + // tell everyone - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; - mon->messenger->send_message(new MMonElectionVictory, - mon->monmap->get_inst(i)); + for (set::iterator p = quorum.begin(); + p != quorum.end(); + ++p) { + if (*p == whoami) continue; + mon->messenger->send_message(new MMonElection(MMonElection::OP_VICTORY, epoch), + mon->monmap->get_inst(*p)); } // tell monitor - mon->win_election(acked_me); + mon->win_election(epoch, quorum); } -void Elector::handle_propose(MMonElectionPropose *m) +void Elector::handle_propose(MMonElection *m) { dout(5) << "handle_propose from " << m->get_source() << endl; int from = m->get_source().num(); - if (from > whoami) { - if (leader_acked >= 0 && // we already acked someone - leader_acked < from) { // who would win over them + assert(m->epoch % 2 == 1); // election + if (m->epoch > epoch) + bump_epoch(m->epoch); + + if (whoami < from) { + // i would win over them. + if (leader_acked >= 0) { // we already acked someone + assert(leader_acked < from); // and they still win, of course dout(5) << "no, we already acked " << leader_acked << endl; } else { // wait, i should win! @@ -158,11 +188,21 @@ void Elector::handle_propose(MMonElectionPropose *m) delete m; } -void Elector::handle_ack(MMonElectionAck *m) +void Elector::handle_ack(MMonElection *m) { dout(5) << "handle_ack from " << m->get_source() << endl; int from = m->get_source().num(); + assert(m->epoch % 2 == 1); // election + if (m->epoch > epoch) { + dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << endl; + bump_epoch(m->epoch); + start(); + delete m; + return; + } + assert(m->epoch == epoch); + if (electing_me) { // thanks acked_me.insert(from); @@ -175,26 +215,28 @@ void Elector::handle_ack(MMonElectionAck *m) } } else { // ignore, i'm deferring already. + assert(leader_acked >= 0); } delete m; } -void Elector::handle_victory(MMonElectionVictory *m) + +void Elector::handle_victory(MMonElection *m) { dout(5) << "handle_victory from " << m->get_source() << endl; int from = m->get_source().num(); + + assert(from < whoami); + assert(m->epoch % 2 == 0); + assert(m->epoch == epoch + 1); // i should have seen this election if i'm getting the victory. + bump_epoch(m->epoch); - if (from < whoami) { - // ok, fine, they win - mon->lose_election(from); - - // cancel my timer - cancel_timer(); - } else { - // no, that makes no sense, i should win. start over! - start(); - } + // they win + mon->lose_election(epoch, from); + + // cancel my timer + cancel_timer(); } @@ -203,19 +245,34 @@ void Elector::handle_victory(MMonElectionVictory *m) void Elector::dispatch(Message *m) { switch (m->get_type()) { - case MSG_MON_ELECTION_ACK: - handle_ack((MMonElectionAck*)m); - break; - - case MSG_MON_ELECTION_PROPOSE: - handle_propose((MMonElectionPropose*)m); - break; - - case MSG_MON_ELECTION_VICTORY: - handle_victory((MMonElectionVictory*)m); + + case MSG_MON_ELECTION: + { + MMonElection *em = (MMonElection*)m; + + if (em->epoch < epoch) { + dout(5) << "old epoch, dropping" << endl; + delete em; + break; + } + + switch (em->op) { + case MMonElection::OP_ACK: + handle_ack(em); + break; + case MMonElection::OP_PROPOSE: + handle_propose(em); + break; + case MMonElection::OP_VICTORY: + handle_victory(em); + break; + default: + assert(0); + } + } break; - default: + default: assert(0); } } diff --git a/branches/sage/cephmds2/mon/Elector.h b/branches/sage/cephmds2/mon/Elector.h index 2a10dddf92419..9bfd7cb644fc7 100644 --- a/branches/sage/cephmds2/mon/Elector.h +++ b/branches/sage/cephmds2/mon/Elector.h @@ -39,6 +39,8 @@ class Elector { void reset_timer(double plus=0.0); void cancel_timer(); + epoch_t epoch; // latest epoch we've seen. odd == election, even == stable, + // electing me bool electing_me; utime_t start_stamp; @@ -48,25 +50,42 @@ class Elector { int leader_acked; // who i've acked utime_t ack_stamp; // and when - public: - + void bump_epoch(epoch_t e=0); // i just saw a larger epoch + + class C_ElectionExpire : public Context { + Elector *elector; + public: + C_ElectionExpire(Elector *e) : elector(e) { } + void finish(int r) { + elector->expire(); + } + }; + void start(); // start an electing me void defer(int who); void expire(); // timer goes off void victory(); - void handle_propose(class MMonElectionPropose *m); - void handle_ack(class MMonElectionAck *m); - void handle_victory(class MMonElectionVictory *m); - + void handle_propose(class MMonElection *m); + void handle_ack(class MMonElection *m); + void handle_victory(class MMonElection *m); public: - Elector(Monitor *m, int w) : mon(m), whoami(w) { - // initialize all those values! - // ... - } + Elector(Monitor *m, int w) : mon(m), whoami(w), + expire_event(0), + epoch(0), + electing_me(false), + leader_acked(-1) { } + + void init(); + void shutdown(); void dispatch(Message *m); + + void call_election() { + start(); + } + }; diff --git a/branches/sage/cephmds2/mon/MDSMonitor.cc b/branches/sage/cephmds2/mon/MDSMonitor.cc index c9a680d36a244..8644f769eaaed 100644 --- a/branches/sage/cephmds2/mon/MDSMonitor.cc +++ b/branches/sage/cephmds2/mon/MDSMonitor.cc @@ -36,8 +36,29 @@ /********* MDS map **************/ + class C_RetryMessage : public Context { + Dispatcher *svc; + Message *m; + public: + C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {} + void finish(int r) { + svc->dispatch(m); + } + }; + void MDSMonitor::dispatch(Message *m) { + if (mon->is_peon()) { + dout(1) << "peon, fw to leader" << endl; + mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + return; + } + if (mon->is_starting()) { + dout(1) << "starting, waiting" << endl; + waiting_for_active.push_back(new C_RetryMessage(this, m)); + return; + } + switch (m->get_type()) { case MSG_MDS_BEACON: @@ -68,6 +89,8 @@ void MDSMonitor::election_finished() load_map(); } } + + finish_contexts(waiting_for_active); } diff --git a/branches/sage/cephmds2/mon/MDSMonitor.h b/branches/sage/cephmds2/mon/MDSMonitor.h index 658ba50855b29..5a9dcb65c8484 100644 --- a/branches/sage/cephmds2/mon/MDSMonitor.h +++ b/branches/sage/cephmds2/mon/MDSMonitor.h @@ -38,6 +38,8 @@ class MDSMonitor : public Dispatcher { private: bufferlist encoded_map; + list waiting_for_active; + //map inc_maps; //MDSMap::Incremental pending_inc; diff --git a/branches/sage/cephmds2/mon/Monitor.cc b/branches/sage/cephmds2/mon/Monitor.cc index 402f7359552bb..e92756ba084f0 100644 --- a/branches/sage/cephmds2/mon/Monitor.cc +++ b/branches/sage/cephmds2/mon/Monitor.cc @@ -1,217 +1,240 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ + // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + // vim: ts=8 sw=2 smarttab + /* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + // TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer + + #include "Monitor.h" + + #include "osd/OSDMap.h" + + #include "MonitorStore.h" + + #include "msg/Message.h" + #include "msg/Messenger.h" + + #include "messages/MPing.h" + #include "messages/MPingAck.h" + #include "messages/MGenericMessage.h" + #include "messages/MMonCommand.h" + #include "messages/MMonCommandAck.h" + + #include "messages/MMonPaxos.h" + + #include "common/Timer.h" + #include "common/Clock.h" + + #include "OSDMonitor.h" + #include "MDSMonitor.h" + #include "ClientMonitor.h" + + #include "config.h" + #undef dout + #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " + #define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " + + -// TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer + void Monitor::init() + { + lock.Lock(); -#include "Monitor.h" - -#include "osd/OSDMap.h" - -#include "MonitorStore.h" - -#include "msg/Message.h" -#include "msg/Messenger.h" - -#include "messages/MPing.h" -#include "messages/MPingAck.h" -#include "messages/MGenericMessage.h" -#include "messages/MMonCommand.h" -#include "messages/MMonCommandAck.h" - -#include "messages/MMonPaxos.h" - -#include "common/Timer.h" -#include "common/Clock.h" - -#include "OSDMonitor.h" -#include "MDSMonitor.h" -#include "ClientMonitor.h" - -#include "config.h" -#undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " - - - -void Monitor::init() -{ - lock.Lock(); - - dout(1) << "init" << endl; - - // store - char s[80]; - sprintf(s, "mondata/mon%d", whoami); - store = new MonitorStore(s); - - if (g_conf.mkfs) - store->mkfs(); - - store->mount(); - - // create - osdmon = new OSDMonitor(this, messenger, lock); - mdsmon = new MDSMonitor(this, messenger, lock); - clientmon = new ClientMonitor(this, messenger, lock); - - // i'm ready! - messenger->set_dispatcher(this); - - // start ticker - reset_tick(); - - // call election? - if (monmap->num_mon > 1) { - assert(monmap->num_mon != 2); - call_election(); - } else { - // we're standalone. - set q; - q.insert(whoami); - win_election(q); - } - - lock.Unlock(); -} - -void Monitor::shutdown() -{ - dout(1) << "shutdown" << endl; - - // cancel all events - cancel_tick(); - timer.cancel_all(); - timer.join(); - - // stop osds. - for (set::iterator it = osdmon->osdmap.get_osds().begin(); - it != osdmon->osdmap.get_osds().end(); - it++) { - if (osdmon->osdmap.is_down(*it)) continue; - dout(10) << "sending shutdown to osd" << *it << endl; - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - osdmon->osdmap.get_inst(*it)); - } - osdmon->mark_all_down(); - - // monitors too. - for (int i=0; inum_mon; i++) - if (i != whoami) - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - monmap->get_inst(i)); - - // unmount my local storage - if (store) - delete store; - - // clean up - if (monmap) delete monmap; - if (osdmon) delete osdmon; - if (mdsmon) delete mdsmon; - if (clientmon) delete clientmon; - - // die. - messenger->shutdown(); - delete messenger; -} - - -void Monitor::call_election() -{ - if (monmap->num_mon == 1) return; - - dout(10) << "call_election" << endl; - state = STATE_STARTING; - - elector.start(); - - osdmon->election_starting(); - //mdsmon->election_starting(); -} - -void Monitor::win_election(set& active) -{ - state = STATE_LEADER; - leader = whoami; - quorum = active; - dout(10) << "win_election, quorum is " << quorum << endl; - - // init - osdmon->election_finished(); - mdsmon->election_finished(); - - // init paxos - test_paxos.leader_start(); -} - -void Monitor::lose_election(int l) -{ - state = STATE_PEON; - leader = l; - dout(10) << "lose_election, leader is mon" << leader << endl; -} - - -void Monitor::handle_command(MMonCommand *m) -{ - dout(0) << "handle_command " << *m << endl; - - int r = -1; - string rs = "unrecognized command"; - - if (!m->cmd.empty()) { - if (m->cmd[0] == "stop") { - r = 0; - rs = "stopping"; - do_stop(); - } - else if (m->cmd[0] == "mds") { - mdsmon->handle_command(m, r, rs); - } - else if (m->cmd[0] == "osd") { - - } - } - - // reply - messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); - delete m; -} - - -void Monitor::do_stop() -{ - dout(0) << "do_stop -- shutting down" << endl; - mdsmon->do_stop(); -} - - -void Monitor::dispatch(Message *m) -{ - lock.Lock(); - { - switch (m->get_type()) { - - // misc - case MSG_PING_ACK: - handle_ping_ack((MPingAck*)m); - break; - - case MSG_SHUTDOWN: - assert(m->get_source().is_osd()); - osdmon->dispatch(m); + dout(1) << "init" << endl; + + // store + char s[80]; + sprintf(s, "mondata/mon%d", whoami); + store = new MonitorStore(s); + + if (g_conf.mkfs) + store->mkfs(); + + store->mount(); + + // create + osdmon = new OSDMonitor(this, messenger, lock); + mdsmon = new MDSMonitor(this, messenger, lock); + clientmon = new ClientMonitor(this, &paxos_clientmap); + + // init paxos + paxos_test.init(); + paxos_osdmap.init(); + paxos_mdsmap.init(); + paxos_clientmap.init(); + + // i'm ready! + messenger->set_dispatcher(this); + + // start ticker + reset_tick(); + + // call election? + if (monmap->num_mon > 1) { + assert(monmap->num_mon != 2); + call_election(); + } else { + // we're standalone. + set q; + q.insert(whoami); + win_election(1, q); + } + + lock.Unlock(); + } + + void Monitor::shutdown() + { + dout(1) << "shutdown" << endl; + + elector.shutdown(); + + // cancel all events + cancel_tick(); + timer.cancel_all(); + timer.join(); + + // stop osds. + for (set::iterator it = osdmon->osdmap.get_osds().begin(); + it != osdmon->osdmap.get_osds().end(); + it++) { + if (osdmon->osdmap.is_down(*it)) continue; + dout(10) << "sending shutdown to osd" << *it << endl; + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + osdmon->osdmap.get_inst(*it)); + } + osdmon->mark_all_down(); + + // monitors too. + for (int i=0; inum_mon; i++) + if (i != whoami) + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + monmap->get_inst(i)); + + // unmount my local storage + if (store) + delete store; + + // clean up + if (monmap) delete monmap; + if (osdmon) delete osdmon; + if (mdsmon) delete mdsmon; + if (clientmon) delete clientmon; + + // die. + messenger->shutdown(); + delete messenger; + } + + + void Monitor::call_election() + { + if (monmap->num_mon == 1) return; + + dout(10) << "call_election" << endl; + state = STATE_STARTING; + + elector.call_election(); + + osdmon->election_starting(); + //mdsmon->election_starting(); + } + + void Monitor::win_election(epoch_t epoch, set& active) + { + state = STATE_LEADER; + leader = whoami; + mon_epoch = epoch; + quorum = active; + dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl; + + // init paxos + paxos_test.leader_init(); + paxos_mdsmap.leader_init(); + paxos_osdmap.leader_init(); + paxos_clientmap.leader_init(); + + // init + osdmon->election_finished(); + mdsmon->election_finished(); + clientmon->election_finished(); + } + + void Monitor::lose_election(epoch_t epoch, int l) + { + state = STATE_PEON; + mon_epoch = epoch; + leader = l; + dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl; + + // init paxos + paxos_test.peon_init(); + paxos_mdsmap.peon_init(); + paxos_osdmap.peon_init(); + paxos_clientmap.peon_init(); + } + + + void Monitor::handle_command(MMonCommand *m) + { + dout(0) << "handle_command " << *m << endl; + + int r = -1; + string rs = "unrecognized command"; + + if (!m->cmd.empty()) { + if (m->cmd[0] == "stop") { + r = 0; + rs = "stopping"; + do_stop(); + } + else if (m->cmd[0] == "mds") { + mdsmon->handle_command(m, r, rs); + } + else if (m->cmd[0] == "osd") { + + } + } + + // reply + messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); + delete m; + } + + + void Monitor::do_stop() + { + dout(0) << "do_stop -- shutting down" << endl; + mdsmon->do_stop(); + } + + + void Monitor::dispatch(Message *m) + { + lock.Lock(); + { + switch (m->get_type()) { + + // misc + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; + + case MSG_SHUTDOWN: + if (m->get_source().is_osd()) + osdmon->dispatch(m); + else + handle_shutdown(m); + break; case MSG_MON_COMMAND: @@ -236,7 +259,8 @@ void Monitor::dispatch(Message *m) // hackish: did all mds's shut down? if (g_conf.mon_stop_with_last_mds && - mdsmon->mdsmap.get_num_up_or_failed_mds() == 0) + mdsmon->mdsmap.get_num_up_or_failed_mds() == 0 && + is_leader()) shutdown(); break; @@ -250,23 +274,39 @@ void Monitor::dispatch(Message *m) // paxos case MSG_MON_PAXOS: - // send it to the right paxos instance - switch (((MMonPaxos*)m)->machine_id) { - case PAXOS_TEST: - test_paxos.dispatch(m); - break; - case PAXOS_OSDMAP: - //... - - default: - assert(0); + { + MMonPaxos *pm = (MMonPaxos*)m; + + // sanitize + if (pm->epoch > mon_epoch) + assert(0); //call_election(); // wtf + if (pm->epoch != mon_epoch) { + delete pm; + break; + } + + // send it to the right paxos instance + switch (pm->machine_id) { + case PAXOS_TEST: + paxos_test.dispatch(m); + break; + case PAXOS_OSDMAP: + paxos_osdmap.dispatch(m); + break; + case PAXOS_MDSMAP: + paxos_mdsmap.dispatch(m); + break; + case PAXOS_CLIENTMAP: + paxos_clientmap.dispatch(m); + break; + default: + assert(0); + } } break; // elector messages - case MSG_MON_ELECTION_PROPOSE: - case MSG_MON_ELECTION_ACK: - case MSG_MON_ELECTION_VICTORY: + case MSG_MON_ELECTION: elector.dispatch(m); break; @@ -282,9 +322,13 @@ void Monitor::dispatch(Message *m) void Monitor::handle_shutdown(Message *m) { - dout(1) << "shutdown from " << m->get_source() << endl; - - shutdown(); + assert(m->get_source().is_mon()); + if (m->get_source().num() == get_leader()) { + dout(1) << "shutdown from leader " << m->get_source() << endl; + shutdown(); + } else { + dout(1) << "ignoring shutdown from non-leader " << m->get_source() << endl; + } delete m; } diff --git a/branches/sage/cephmds2/mon/Monitor.h b/branches/sage/cephmds2/mon/Monitor.h index 526f63ab55fae..015e5797ca6df 100644 --- a/branches/sage/cephmds2/mon/Monitor.h +++ b/branches/sage/cephmds2/mon/Monitor.h @@ -31,13 +31,9 @@ class OSDMonitor; class MDSMonitor; class ClientMonitor; -#define PAXOS_TEST 0 -#define PAXOS_OSDMAP 1 -#define PAXOS_MDSMAP 2 -#define PAXOS_CLIENTMAP 3 class Monitor : public Dispatcher { -protected: +public: // me int whoami; Messenger *messenger; @@ -52,63 +48,65 @@ protected: void reset_tick(); friend class C_Mon_Tick; - // my local store - //ObjectStore *store; + // -- local storage -- +public: MonitorStore *store; - const static int INO_ELECTOR = 1; - const static int INO_MON_MAP = 2; - const static int INO_OSD_MAP = 10; - const static int INO_OSD_INC_MAP = 11; - const static int INO_MDS_MAP = 20; - - // elector - Elector elector; - friend class Elector; - - epoch_t mon_epoch; // monitor epoch (election instance) - set quorum; // current active set of monitors (if !starting) - - //void call_election(); - - // paxos - Paxos test_paxos; - friend class Paxos; - - - // monitor state + // -- monitor state -- +private: const static int STATE_STARTING = 0; // electing const static int STATE_LEADER = 1; const static int STATE_PEON = 2; int state; - int leader; // current leader (to best of knowledge) - utime_t last_called_election; // [starting] last time i called an election - +public: bool is_starting() { return state == STATE_STARTING; } bool is_leader() { return state == STATE_LEADER; } bool is_peon() { return state == STATE_PEON; } - // my public services + + // -- elector -- +private: + Elector elector; + friend class Elector; + + epoch_t mon_epoch; // monitor epoch (election instance) + int leader; // current leader (to best of knowledge) + set quorum; // current active set of monitors (if !starting) + utime_t last_called_election; // [starting] last time i called an election + +public: + epoch_t get_epoch() { return mon_epoch; } + int get_leader() { return leader; } + const set& get_quorum() { return quorum; } + + void call_election(); // initiate election + void win_election(epoch_t epoch, set& q); // end election (called by Elector) + void lose_election(epoch_t epoch, int l); // end election (called by Elector) + + + // -- paxos -- + Paxos paxos_test; + Paxos paxos_mdsmap; + Paxos paxos_osdmap; + Paxos paxos_clientmap; + friend class Paxos; + + + // -- services -- OSDMonitor *osdmon; MDSMonitor *mdsmon; ClientMonitor *clientmon; - // messages - void handle_shutdown(Message *m); - void handle_ping_ack(class MPingAck *m); - void handle_command(class MMonCommand *m); - friend class OSDMonitor; friend class MDSMonitor; friend class ClientMonitor; - // initiate election - void call_election(); - // end election (called by Elector) - void win_election(set& q); - void lose_election(int l); + // messages + void handle_shutdown(Message *m); + void handle_ping_ack(class MPingAck *m); + void handle_command(class MMonCommand *m); @@ -119,18 +117,22 @@ protected: monmap(mm), timer(lock), tick_timer(0), store(0), + + state(STATE_STARTING), + elector(this, w), mon_epoch(0), + leader(0), - test_paxos(this, w, PAXOS_TEST, "tester"), // tester state machine + paxos_test(this, w, PAXOS_TEST), + paxos_mdsmap(this, w, PAXOS_MDSMAP), + paxos_osdmap(this, w, PAXOS_OSDMAP), + paxos_clientmap(this, w, PAXOS_CLIENTMAP), - state(STATE_STARTING), - leader(0), osdmon(0), mdsmon(0), clientmon(0) { } - void init(); void shutdown(); void dispatch(Message *m); diff --git a/branches/sage/cephmds2/mon/OSDMonitor.cc b/branches/sage/cephmds2/mon/OSDMonitor.cc index 38b7616722d4e..26696c929c236 100644 --- a/branches/sage/cephmds2/mon/OSDMonitor.cc +++ b/branches/sage/cephmds2/mon/OSDMonitor.cc @@ -232,18 +232,18 @@ void OSDMonitor::create_initial() bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl) { - if (!mon->store->exists_bl_sn("osdmap", epoch)) + if (!mon->store->exists_bl_sn("osdmap_full", epoch)) return false; - int r = mon->store->get_bl_sn(bl, "osdmap", epoch); + int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); assert(r > 0); return true; } bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl) { - if (!mon->store->exists_bl_sn("osdincmap", epoch)) + if (!mon->store->exists_bl_sn("osdmap_inc", epoch)) return false; - int r = mon->store->get_bl_sn(bl, "osdincmap", epoch); + int r = mon->store->get_bl_sn(bl, "osdmap_inc", epoch); assert(r > 0); return true; } @@ -254,7 +254,7 @@ void OSDMonitor::save_map() bufferlist bl; osdmap.encode(bl); - mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch()); + mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch()); mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); } @@ -266,8 +266,8 @@ void OSDMonitor::save_inc_map(OSDMap::Incremental &inc) bufferlist incbl; inc.encode(incbl); - mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch()); - mon->store->put_bl_sn(incbl, "osdincmap", osdmap.get_epoch()); + mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch()); + mon->store->put_bl_sn(incbl, "osdmap_inc", osdmap.get_epoch()); mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); } @@ -400,6 +400,7 @@ void OSDMonitor::handle_osd_boot(MOSDBoot *m) bcast_latest_osd(); bcast_latest_mds(); + send_waiting(); } else { dout(7) << "osd_boot waiting for " << (osdmap.osds.size() - osdmap.osd_inst.size()) @@ -663,7 +664,7 @@ void OSDMonitor::election_finished() epoch_t epoch = mon->store->get_int("osd_epoch"); dout(10) << " last epoch was " << epoch << endl; bufferlist bl, blinc; - int r = mon->store->get_bl_sn(bl, "osdmap", epoch); + int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); assert(r>0); osdmap.decode(bl); diff --git a/branches/sage/cephmds2/mon/Paxos.cc b/branches/sage/cephmds2/mon/Paxos.cc index 78d3d58287bbc..f83bdc57b21ba 100644 --- a/branches/sage/cephmds2/mon/Paxos.cc +++ b/branches/sage/cephmds2/mon/Paxos.cc @@ -20,44 +20,67 @@ #include "config.h" #undef dout -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") " -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << ") " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +void Paxos::init() +{ + // load paxos variables from stable storage + last_pn = mon->store->get_int(machine_name, "last_pn"); + accepted_pn = mon->store->get_int(machine_name, "accepted_pn"); + last_committed = mon->store->get_int(machine_name, "last_committed"); + + dout(10) << "init" << endl; +} + // --------------------------------- // PHASE 1 -// proposer - +// leader void Paxos::collect(version_t oldpn) { + // we're recoverying, it seems! + state = STATE_RECOVERING; + assert(mon->is_leader()); + // reset the number of lasts received accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn)); accepted_pn_from = last_committed; num_last = 1; + old_accepted_v = 0; old_accepted_pn = 0; old_accepted_value.clear(); dout(10) << "collect with pn " << accepted_pn << endl; // send collect - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; - MMonPaxos *collect = new MMonPaxos(MMonPaxos::OP_COLLECT, machine_id); + MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id); collect->last_committed = last_committed; collect->pn = accepted_pn; - mon->messenger->send_message(collect, mon->monmap->get_inst(i)); + mon->messenger->send_message(collect, mon->monmap->get_inst(*p)); } } + +// peon void Paxos::handle_collect(MMonPaxos *collect) { dout(10) << "handle_collect " << *collect << endl; + assert(mon->is_peon()); // mon epoch filter should catch strays + + // we're recoverying, it seems! + state = STATE_RECOVERING; + // reply - MMonPaxos *last = new MMonPaxos(MMonPaxos::OP_LAST, machine_id); + MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id); last->last_committed = last_committed; // do we have an accepted but uncommitted value? @@ -77,6 +100,7 @@ void Paxos::handle_collect(MMonPaxos *collect) accepted_pn = collect->pn; accepted_pn_from = collect->pn_from; dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << endl; + mon->store->put_int(accepted_pn, machine_name, "accepted_pn"); } else { // don't accept! dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from @@ -103,15 +127,22 @@ void Paxos::handle_collect(MMonPaxos *collect) } +// leader void Paxos::handle_last(MMonPaxos *last) { dout(10) << "handle_last " << *last << endl; + if (!mon->is_leader()) { + dout(10) << "not leader, dropping" << endl; + delete last; + return; + } + // share committed values? if (last->last_committed < last_committed) { // share committed values dout(10) << "sending commit to " << last->get_source() << endl; - MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id); + MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); for (version_t v = last->last_committed; v <= last_committed; v++) { @@ -122,7 +153,7 @@ void Paxos::handle_last(MMonPaxos *last) mon->messenger->send_message(commit, last->get_source_inst()); } - // did we receive committed value? + // did we receive a committed value? if (last->last_committed > last_committed) { for (version_t v = last_committed; v <= last->last_committed; @@ -132,37 +163,47 @@ void Paxos::handle_last(MMonPaxos *last) << last->values[v].length() << " bytes" << endl; } last_committed = last->last_committed; - mon->store->put_int(last_committed, machine_name, "last_commtted"); + mon->store->put_int(last_committed, machine_name, "last_committed"); dout(10) << "last_committed now " << last_committed << endl; } // do they accept your pn? if (last->old_accepted_pn > accepted_pn) { - dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl; + // no, try again. + dout(10) << " they had a higher pn than us, picking a new one." << endl; collect(last->old_accepted_pn); } else { - // they accepted our pn. great. + // yes, they accepted our pn. great. num_last++; - dout(10) << "great, they accepted our pn, we now have " << num_last << endl; + dout(10) << " they accepted our pn, we now have " + << num_last << " peons" << endl; // did this person send back an accepted but uncommitted value? if (last->old_accepted_pn && last->old_accepted_pn > old_accepted_pn) { - version_t v = last->last_committed+1; - dout(10) << "we learned an old value for " << v << " pn " << last->old_accepted_pn; + old_accepted_v = last->last_committed+1; old_accepted_pn = last->old_accepted_pn; - old_accepted_value = last->values[v]; + old_accepted_value = last->values[old_accepted_v]; + dout(10) << "we learned an old (possible) value for " << old_accepted_v + << " pn " << old_accepted_pn + << " " << old_accepted_value.length() << " bytes" + << endl; } - // do we have a majority? - if (num_last == mon->monmap->num_mon/2+1) { - // do this once. - + // is that everyone? + if (num_last == mon->get_quorum().size()) { // did we learn an old value? - if (old_accepted_value.length()) { - dout(10) << "begin on old learned value" << endl; + if (old_accepted_v == last_committed+1 && + old_accepted_value.length()) { + dout(10) << "that's everyone. begin on old learned value" << endl; begin(old_accepted_value); - } + } else { + // active! + dout(10) << "that's everyone. active!" << endl; + state = STATE_ACTIVE; + finish_contexts(waiting_for_active); + extend_lease(); + } } } @@ -170,54 +211,84 @@ void Paxos::handle_last(MMonPaxos *last) } +// leader void Paxos::begin(bufferlist& v) { dout(10) << "begin for " << last_committed+1 << " " << new_value.length() << " bytes" << endl; - // we must already have a majority for this to work. - assert(num_last > mon->monmap->num_mon/2); + assert(mon->is_leader()); + + assert(is_active()); + state = STATE_UPDATING; + // we must already have a majority for this to work. + assert(mon->get_quorum().size() == 1 || + num_last > (unsigned)mon->monmap->num_mon/2); + // and no value, yet. assert(new_value.length() == 0); - + // accept it ourselves num_accepted = 1; new_value = v; mon->store->put_bl_sn(new_value, machine_name, last_committed+1); - // ask others to accept it to! - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; + if (mon->get_quorum().size() == 1) { + // we're alone, take it easy + commit(); + state = STATE_ACTIVE; + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_active); + return; + } - dout(10) << " sending begin to mon" << i << endl; - MMonPaxos *begin = new MMonPaxos(MMonPaxos::OP_BEGIN, machine_id); + // ask others to accept it to! + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; + + dout(10) << " sending begin to mon" << *p << endl; + MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN, machine_id); begin->values[last_committed+1] = new_value; + begin->last_committed = last_committed; begin->pn = accepted_pn; - mon->messenger->send_message(begin, mon->monmap->get_inst(i)); + mon->messenger->send_message(begin, mon->monmap->get_inst(*p)); } + + // set timeout event + accept_timeout_event = new C_AcceptTimeout(this); + mon->timer.add_event_after(g_conf.mon_accept_timeout, accept_timeout_event); } +// peon void Paxos::handle_begin(MMonPaxos *begin) { dout(10) << "handle_begin " << *begin << endl; // can we accept this? - if (begin->pn != accepted_pn) { + if (begin->pn < accepted_pn) { dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl; delete begin; return; } + assert(begin->pn == accepted_pn); + assert(begin->last_committed == last_committed); + // set state. + state = STATE_UPDATING; + lease_expire = utime_t(); // cancel lease + // yes. version_t v = last_committed+1; dout(10) << "accepting value for " << v << " pn " << accepted_pn << endl; mon->store->put_bl_sn(begin->values[v], machine_name, v); // reply - MMonPaxos *accept = new MMonPaxos(MMonPaxos::OP_ACCEPT, machine_id); + MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT, machine_id); accept->pn = accepted_pn; accept->last_committed = last_committed; mon->messenger->send_message(accept, begin->get_source_inst()); @@ -225,7 +296,7 @@ void Paxos::handle_begin(MMonPaxos *begin) delete begin; } - +// leader void Paxos::handle_accept(MMonPaxos *accept) { dout(10) << "handle_accept " << *accept << endl; @@ -236,22 +307,46 @@ void Paxos::handle_accept(MMonPaxos *accept) delete accept; return; } - if (accept->last_committed != last_committed) { - dout(10) << " this is from an old round that's already committed, ignoring" << endl; + if (last_committed > 0 && + accept->last_committed < last_committed-1) { + dout(10) << " this is from an old round, ignoring" << endl; delete accept; return; } + assert(accept->last_committed == last_committed || // not committed + accept->last_committed == last_committed-1); // committed + assert(state == STATE_UPDATING); num_accepted++; dout(10) << "now " << num_accepted << " have accepted" << endl; // new majority? - if (num_accepted == mon->monmap->num_mon/2+1) { + if (num_accepted == (unsigned)mon->monmap->num_mon/2+1) { // yay, commit! + // note: this may happen before the lease is reextended (below) dout(10) << "we got a majority, committing too" << endl; commit(); - } + } + // done? + if (num_accepted == mon->get_quorum().size()) { + state = STATE_ACTIVE; + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_active); + extend_lease(); + + // cancel timeout event + mon->timer.cancel_event(accept_timeout_event); + accept_timeout_event = 0; + } +} + +void Paxos::accept_timeout() +{ + dout(5) << "accept timeout, calling fresh election" << endl; + assert(mon->is_leader()); + assert(is_updating()); + mon->call_election(); } void Paxos::commit() @@ -263,20 +358,21 @@ void Paxos::commit() mon->store->put_int(last_committed, machine_name, "last_committed"); // tell everyone - for (int i=0; imonmap->num_mon; ++i) { - if (i == whoami) continue; + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; - dout(10) << " sending commit to mon" << i << endl; - MMonPaxos *commit = new MMonPaxos(MMonPaxos::OP_COMMIT, machine_id); + dout(10) << " sending commit to mon" << *p << endl; + MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); commit->values[last_committed] = new_value; commit->pn = accepted_pn; - mon->messenger->send_message(commit, mon->monmap->get_inst(i)); + mon->messenger->send_message(commit, mon->monmap->get_inst(*p)); } // get ready for a new round. new_value.clear(); - } @@ -284,14 +380,137 @@ void Paxos::handle_commit(MMonPaxos *commit) { dout(10) << "handle_commit on " << commit->last_committed << endl; + if (!mon->is_peon()) { + dout(10) << "not a peon, dropping" << endl; + assert(0); + delete commit; + return; + } + // commit locally. - last_committed = commit->last_committed; - mon->store->put_bl_sn(commit->values[last_committed], machine_name, last_committed); + for (map::iterator p = commit->values.begin(); + p != commit->values.end(); + ++p) { + assert(p->first == last_committed+1); + last_committed = p->first; + mon->store->put_bl_sn(p->second, machine_name, last_committed); + } mon->store->put_int(last_committed, machine_name, "last_committed"); delete commit; } +void Paxos::extend_lease() +{ + assert(mon->is_leader()); + assert(is_active()); + + lease_expire = g_clock.now(); + lease_expire += g_conf.mon_lease; + acked_lease.clear(); + acked_lease.insert(whoami); + + dout(7) << "extend_lease now+" << g_conf.mon_lease << " (" << lease_expire << ")" << endl; + + // bcast + for (set::const_iterator p = mon->get_quorum().begin(); + p != mon->get_quorum().end(); + ++p) { + if (*p == whoami) continue; + MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id); + lease->last_committed = last_committed; + lease->lease_expire = lease_expire; + mon->messenger->send_message(lease, mon->monmap->get_inst(*p)); + } + + // wake people up + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); + + // set renew event + lease_renew_event = new C_LeaseRenew(this); + utime_t at = lease_expire; + at -= g_conf.mon_lease; + at += g_conf.mon_lease_renew_interval; + mon->timer.add_event_at(at, lease_renew_event); + + // set timeout event. + // if old timeout is still in place, leave it. + if (!lease_ack_timeout_event) { + lease_ack_timeout_event = new C_LeaseAckTimeout(this); + mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event); + } +} + + +// peon +void Paxos::handle_lease(MMonPaxos *lease) +{ + // sanity + if (!mon->is_peon() || + last_committed != lease->last_committed) { + dout(10) << "handle_lease i'm not a peon, or they're not the leader, or the last_committed doesn't match, dropping" << endl; + delete lease; + return; + } + + // extend lease + if (lease_expire < lease->lease_expire) + lease_expire = lease->lease_expire; + + state = STATE_ACTIVE; + finish_contexts(waiting_for_active); + + dout(10) << "handle_lease on " << lease->last_committed + << " now " << lease_expire << endl; + + // ack + MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id); + ack->last_committed = last_committed; + ack->lease_expire = lease_expire; + mon->messenger->send_message(ack, lease->get_source_inst()); + + // kick waiters + if (is_readable()) + finish_contexts(waiting_for_readable); + + delete lease; +} + +void Paxos::handle_lease_ack(MMonPaxos *ack) +{ + int from = ack->get_source().num(); + + if (acked_lease.count(from) == 0) { + acked_lease.insert(from); + + if (acked_lease == mon->get_quorum()) { + // yay! + dout(10) << "handle_lease_ack from " << ack->get_source() + << " -- got everyone" << endl; + mon->timer.cancel_event(lease_ack_timeout_event); + lease_ack_timeout_event = 0; + } else { + dout(10) << "handle_lease_ack from " << ack->get_source() + << " -- still need " + << mon->get_quorum().size() - acked_lease.size() + << " more" << endl; + } + } else { + dout(10) << "handle_lease_ack from " << ack->get_source() + << " dup (lagging!), ignoring" << endl; + } + + delete ack; +} + +void Paxos::lease_ack_timeout() +{ + dout(5) << "lease_ack_timeout -- calling new election" << endl; + assert(mon->is_leader()); + assert(is_active()); + mon->call_election(); +} /* @@ -299,37 +518,80 @@ void Paxos::handle_commit(MMonPaxos *commit) */ version_t Paxos::get_new_proposal_number(version_t gt) { - // read last - version_t last = mon->store->get_int("last_paxos_proposal"); - if (last < gt) - last = gt; + if (last_pn < gt) + last_pn = gt; - // update - last /= 100; - last++; - - // make it unique among all monitors. - version_t pn = last*100 + (version_t)whoami; + // update. make it unique among all monitors. + last_pn /= 100; + last_pn++; + last_pn *= 100; + last_pn += (version_t)whoami; // write - mon->store->put_int(pn, "last_paxos_proposal"); + mon->store->put_int(last_pn, machine_name, "last_pn"); - dout(10) << "get_new_proposal_number = " << pn << endl; - return pn; + dout(10) << "get_new_proposal_number = " << last_pn << endl; + return last_pn; } -void Paxos::leader_start() +void Paxos::cancel_events() +{ + if (accept_timeout_event) { + mon->timer.cancel_event(accept_timeout_event); + accept_timeout_event = 0; + } + if (lease_renew_event) { + mon->timer.cancel_event(lease_renew_event); + lease_renew_event = 0; + } + if (lease_ack_timeout_event) { + mon->timer.cancel_event(lease_ack_timeout_event); + lease_ack_timeout_event = 0; + } +} + +void Paxos::leader_init() { - dout(10) << "leader_start -- i am the leader, start paxos" << endl; + if (mon->get_quorum().size() == 1) { + state = STATE_ACTIVE; + return; + } + cancel_events(); + state = STATE_RECOVERING; + lease_expire = utime_t(); + dout(10) << "leader_init -- starting paxos recovery" << endl; collect(0); } +void Paxos::peon_init() +{ + cancel_events(); + state = STATE_RECOVERING; + lease_expire = utime_t(); + dout(10) << "peon_init -- i am a peon" << endl; + + // no chance to write now! + finish_contexts(waiting_for_writeable, -1); + finish_contexts(waiting_for_commit, -1); +} + void Paxos::dispatch(Message *m) { + // election in progress? + if (mon->is_starting()) { + dout(5) << "election in progress, dropping " << *m << endl; + delete m; + return; + } + + // check sanity + assert(mon->is_leader() || + (mon->is_peon() && m->get_source().num() == mon->get_leader())); + switch (m->get_type()) { - + case MSG_MON_PAXOS: { MMonPaxos *pm = (MMonPaxos*)m; @@ -340,23 +602,24 @@ void Paxos::dispatch(Message *m) case MMonPaxos::OP_COLLECT: handle_collect(pm); break; - case MMonPaxos::OP_LAST: handle_last(pm); break; - case MMonPaxos::OP_BEGIN: handle_begin(pm); break; - case MMonPaxos::OP_ACCEPT: handle_accept(pm); break; - case MMonPaxos::OP_COMMIT: handle_commit(pm); break; - + case MMonPaxos::OP_LEASE: + handle_lease(pm); + break; + case MMonPaxos::OP_LEASE_ACK: + handle_lease_ack(pm); + break; default: assert(0); } @@ -368,3 +631,82 @@ void Paxos::dispatch(Message *m) } } + + + +// ----------------- +// service interface + +// -- READ -- + +bool Paxos::is_readable() +{ + if (mon->get_quorum().size() == 1) return true; + return + (mon->is_peon() || mon->is_leader()) && + is_active() && + g_clock.now() < lease_expire; +} + +bool Paxos::read(version_t v, bufferlist &bl) +{ + if (!is_readable()) + return false; + + if (!mon->store->get_bl_sn(bl, machine_name, v)) + return false; + return true; +} + +version_t Paxos::read_current(bufferlist &bl) +{ + if (!is_readable()) + return 0; + if (read(last_committed, bl)) + return last_committed; + return 0; +} + + + + +// -- WRITE -- + +bool Paxos::is_writeable() +{ + if (mon->get_quorum().size() == 1) return true; + return + mon->is_leader() && + is_active() && + g_clock.now() < lease_expire; +} + +bool Paxos::propose_new_value(bufferlist& bl, Context *oncommit) +{ + /* + // writeable? + if (!is_writeable()) { + dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" + << " -- not writeable" << endl; + if (oncommit) { + oncommit->finish(-1); + delete oncommit; + } + return false; + } + */ + + assert(mon->is_leader() && is_active()); + + // cancel lease renewal and timeout events. + cancel_events(); + + // ok! + dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" << endl; + if (oncommit) + waiting_for_commit.push_back(oncommit); + begin(bl); + + return true; +} + diff --git a/branches/sage/cephmds2/mon/Paxos.h b/branches/sage/cephmds2/mon/Paxos.h index 777d175685bc9..6699cc5ad33ad 100644 --- a/branches/sage/cephmds2/mon/Paxos.h +++ b/branches/sage/cephmds2/mon/Paxos.h @@ -35,10 +35,22 @@ e 12v */ + +/* + * NOTE: This libary is based on the Paxos algorithm, but varies in a few key ways: + * 1- Only a single new value is generated at a time, simplifying the recovery logic. + * 2- Nodes track "committed" values, and share them generously (and trustingly) + * 3- A 'leasing' mechism is built-in, allowing nodes to determine when it is safe to + * "read" their copy of the last committed value. + * + * This provides a simple replication substrate that services can be built on top of. + */ + #ifndef __MON_PAXOS_H #define __MON_PAXOS_H #include "include/types.h" +#include "mon_types.h" #include "include/buffer.h" #include "msg/Message.h" @@ -49,6 +61,7 @@ e 12v class Monitor; class MMonPaxos; + // i am one state machine. class Paxos { Monitor *mon; @@ -58,40 +71,160 @@ class Paxos { int machine_id; const char *machine_name; - // phase 1 + friend class PaxosService; + + // LEADER+PEON + + // -- generic state -- +public: + const static int STATE_RECOVERING = 1; // leader|peon: recovering paxos state + const static int STATE_ACTIVE = 2; // leader|peon: idle. peon may or may not have valid lease + const static int STATE_UPDATING = 3; // leader|peon: updating to new value + const char *get_statename(int s) { + switch (s) { + case STATE_RECOVERING: return "recovering"; + case STATE_ACTIVE: return "active"; + case STATE_UPDATING: return "updating"; + default: assert(0); return 0; + } + } + +private: + int state; + +public: + bool is_recovering() { return state == STATE_RECOVERING; } + bool is_active() { return state == STATE_ACTIVE; } + bool is_updating() { return state == STATE_UPDATING; } + +private: + // recovery (phase 1) + version_t last_pn; version_t last_committed; version_t accepted_pn; version_t accepted_pn_from; - - // results from our last replies - int num_last; + + // active (phase 2) + utime_t lease_expire; + list waiting_for_active; + list waiting_for_readable; + + + // -- leader -- + // recovery (paxos phase 1) + unsigned num_last; + version_t old_accepted_v; version_t old_accepted_pn; bufferlist old_accepted_value; - // phase 2 + // active + set acked_lease; + Context *lease_renew_event; + Context *lease_ack_timeout_event; + + // updating (paxos phase 2) bufferlist new_value; - int num_accepted; - + unsigned num_accepted; + + Context *accept_timeout_event; + + list waiting_for_writeable; + list waiting_for_commit; + + class C_AcceptTimeout : public Context { + Paxos *paxos; + public: + C_AcceptTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->accept_timeout(); + } + }; + + class C_LeaseAckTimeout : public Context { + Paxos *paxos; + public: + C_LeaseAckTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->lease_ack_timeout(); + } + }; + + class C_LeaseRenew : public Context { + Paxos *paxos; + public: + C_LeaseRenew(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->extend_lease(); + } + }; + + void collect(version_t oldpn); void handle_collect(MMonPaxos*); void handle_last(MMonPaxos*); void begin(bufferlist& value); void handle_begin(MMonPaxos*); void handle_accept(MMonPaxos*); + void accept_timeout(); void commit(); void handle_commit(MMonPaxos*); + void extend_lease(); + void handle_lease(MMonPaxos*); + void handle_lease_ack(MMonPaxos*); + void lease_ack_timeout(); + + void cancel_events(); version_t get_new_proposal_number(version_t gt=0); public: Paxos(Monitor *m, int w, - int mid,const char *mnm) : mon(m), whoami(w), - machine_id(mid), machine_name(mnm) { - } + int mid) : mon(m), whoami(w), + machine_id(mid), + machine_name(get_paxos_name(mid)), + state(STATE_RECOVERING), + lease_renew_event(0), + lease_ack_timeout_event(0), + accept_timeout_event(0) { } void dispatch(Message *m); - void leader_start(); + void init(); + + void leader_init(); + void peon_init(); + + + // -- service interface -- + /* + void wait_for_active(Context *c) { + assert(!is_active()); + waiting_for_active.push_back(c); + } + */ + + // read + version_t get_version() { return last_committed; } + bool is_readable(); + bool read(version_t v, bufferlist &bl); + version_t read_current(bufferlist &bl); + void wait_for_readable(Context *onreadable) { + assert(!is_readable()); + waiting_for_readable.push_back(onreadable); + } + + // write + bool is_leader(); + bool is_writeable(); + void wait_for_writeable(Context *c) { + assert(!is_writeable()); + waiting_for_writeable.push_back(c); + } + + bool propose_new_value(bufferlist& bl, Context *oncommit=0); + void wait_for_commit(Context *oncommit) { + waiting_for_commit.push_back(oncommit); + } }; diff --git a/branches/sage/cephmds2/mon/PaxosService.cc b/branches/sage/cephmds2/mon/PaxosService.cc new file mode 100644 index 0000000000000..021981b7b148b --- /dev/null +++ b/branches/sage/cephmds2/mon/PaxosService.cc @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "PaxosService.h" +#include "common/Clock.h" +#include "Monitor.h" + + + +#include "config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " " + + + +void PaxosService::dispatch(Message *m) +{ + dout(10) << "dispatch " << *m << " from " << m->get_source_inst() << endl; + + // make sure our map is readable and up to date + if (!paxos->is_readable() || + !update_from_paxos()) { + dout(10) << " waiting for paxos -> readable" << endl; + paxos->wait_for_readable(new C_RetryMessage(this, m)); + return; + } + + // preprocess + if (preprocess_update(m)) + return; // easy! + + // leader? + if (!mon->is_leader()) { + // fw to leader + dout(10) << " fw to leader mon" << mon->get_leader() << endl; + mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + return; + } + + // writeable? + if (!paxos->is_writeable()) { + dout(10) << " waiting for paxos -> writeable" << endl; + paxos->wait_for_writeable(new C_RetryMessage(this, m)); + return; + } + + prepare_update(m); + + // do it now (for now!) *** + propose_pending(); +} + +void PaxosService::election_finished() +{ + if (mon->is_leader() && g_conf.mkfs) + create_initial(); +} diff --git a/branches/sage/cephmds2/mon/PaxosService.h b/branches/sage/cephmds2/mon/PaxosService.h new file mode 100644 index 0000000000000..59bcc770d108f --- /dev/null +++ b/branches/sage/cephmds2/mon/PaxosService.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __PAXOSSERVICE_H +#define __PAXOSSERVICE_H + +#include "msg/Dispatcher.h" +#include "include/Context.h" + +class Monitor; +class Paxos; + +class PaxosService : public Dispatcher { +protected: + Monitor *mon; + Paxos *paxos; + + + class C_RetryMessage : public Context { + Dispatcher *svc; + Message *m; + public: + C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {} + void finish(int r) { + svc->dispatch(m); + } + }; + +public: + PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { } + + // i implement + void dispatch(Message *m); + void election_finished(); + + // you implement + virtual void create_initial() = 0; + virtual bool update_from_paxos() = 0; + virtual void prepare_pending() = 0; + virtual void propose_pending() = 0; + + virtual bool preprocess_update(Message *m) = 0; // true if processed. + virtual void prepare_update(Message *m)= 0; + + virtual void tick() {}; // check state, take actions + + +}; + +#endif + diff --git a/branches/sage/cephmds2/messages/MMonElectionAck.h b/branches/sage/cephmds2/mon/mon_types.h similarity index 52% rename from branches/sage/cephmds2/messages/MMonElectionAck.h rename to branches/sage/cephmds2/mon/mon_types.h index 14f8b7cb49b7b..852e42b8d983f 100644 --- a/branches/sage/cephmds2/messages/MMonElectionAck.h +++ b/branches/sage/cephmds2/mon/mon_types.h @@ -12,21 +12,22 @@ * */ +#ifndef __MON_TYPES_H +#define __MON_TYPES_H -#ifndef __MMONELECTIONACK_H -#define __MMONELECTIONACK_H +#define PAXOS_TEST 0 +#define PAXOS_MDSMAP 1 +#define PAXOS_OSDMAP 2 +#define PAXOS_CLIENTMAP 3 -#include "msg/Message.h" - - -class MMonElectionAck : public Message { - public: - MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {} - - virtual char *get_type_name() { return "election_ack"; } - - void encode_payload() {} - void decode_payload() {} -}; +inline const char *get_paxos_name(int p) { + switch (p) { + case PAXOS_TEST: return "test"; + case PAXOS_MDSMAP: return "mdsmap"; + case PAXOS_OSDMAP: return "osdmap"; + case PAXOS_CLIENTMAP: return "clientmap"; + default: assert(0); return 0; + } +} #endif diff --git a/branches/sage/cephmds2/msg/FakeMessenger.cc b/branches/sage/cephmds2/msg/FakeMessenger.cc index b7f02bdb40624..19f8f4320ed68 100644 --- a/branches/sage/cephmds2/msg/FakeMessenger.cc +++ b/branches/sage/cephmds2/msg/FakeMessenger.cc @@ -71,17 +71,17 @@ void *fakemessenger_thread(void *ptr) { lock.Lock(); while (1) { + if (fm_shutdown) break; + fakemessenger_do_loop_2(); + + if (directory.empty()) break; + dout(20) << "thread waiting" << endl; if (fm_shutdown) break; awake = false; cond.Wait(lock); awake = true; dout(20) << "thread woke up" << endl; - if (fm_shutdown) break; - - fakemessenger_do_loop_2(); - - if (directory.empty()) break; } lock.Unlock(); diff --git a/branches/sage/cephmds2/msg/Message.cc b/branches/sage/cephmds2/msg/Message.cc index 716fafb491719..4c7572bc77a7d 100644 --- a/branches/sage/cephmds2/msg/Message.cc +++ b/branches/sage/cephmds2/msg/Message.cc @@ -15,9 +15,7 @@ using namespace std; #include "messages/MMonCommandAck.h" #include "messages/MMonPaxos.h" -#include "messages/MMonElectionAck.h" -#include "messages/MMonElectionPropose.h" -#include "messages/MMonElectionVictory.h" +#include "messages/MMonElection.h" #include "messages/MPing.h" #include "messages/MPingAck.h" @@ -116,14 +114,8 @@ decode_message(msg_envelope_t& env, bufferlist& payload) m = new MMonPaxos; break; - case MSG_MON_ELECTION_PROPOSE: - m = new MMonElectionPropose; - break; - case MSG_MON_ELECTION_ACK: - m = new MMonElectionAck; - break; - case MSG_MON_ELECTION_VICTORY: - m = new MMonElectionVictory; + case MSG_MON_ELECTION: + m = new MMonElection; break; case MSG_PING: diff --git a/branches/sage/cephmds2/msg/Message.h b/branches/sage/cephmds2/msg/Message.h index 6d66e713cb1fe..05d347e0bbdbe 100644 --- a/branches/sage/cephmds2/msg/Message.h +++ b/branches/sage/cephmds2/msg/Message.h @@ -26,9 +26,7 @@ #define MSG_MON_COMMAND_ACK 14 -#define MSG_MON_ELECTION_ACK 15 -#define MSG_MON_ELECTION_PROPOSE 16 -#define MSG_MON_ELECTION_VICTORY 17 +#define MSG_MON_ELECTION 15 #define MSG_MON_OSDMAP_INFO 20 #define MSG_MON_OSDMAP_LEASE 21 diff --git a/branches/sage/cephmds2/osd/OSD.cc b/branches/sage/cephmds2/osd/OSD.cc index 04f453fff85b7..ec3726503b241 100644 --- a/branches/sage/cephmds2/osd/OSD.cc +++ b/branches/sage/cephmds2/osd/OSD.cc @@ -3402,7 +3402,6 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t, switch (op->get_op()) { case OSD_OP_WRLOCK: { // lock object - //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit); t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t)); } break; diff --git a/branches/sage/cephmds2/osd/ObjectStore.h b/branches/sage/cephmds2/osd/ObjectStore.h index 3a3cec32515ff..74818e0470526 100644 --- a/branches/sage/cephmds2/osd/ObjectStore.h +++ b/branches/sage/cephmds2/osd/ObjectStore.h @@ -102,14 +102,29 @@ public: list offsets; list lengths; list attrnames; + list attrnames2; //list< pair > attrvals; list attrbls; + // for reads only (not encoded) list pbls; list psts; list< pair > pattrvals; list< map* > pattrsets; + const char *get_attrname() { + if (attrnames.empty()) + return attrnames2.front().c_str(); + else + return attrnames.front(); + } + void pop_attrname() { + if (attrnames.empty()) + attrnames2.pop_front(); + else + attrnames.pop_front(); + } + void read(object_t oid, off_t off, size_t len, bufferlist *pbl) { int op = OP_READ; ops.push_back(op); @@ -232,6 +247,27 @@ public: } // etc. + + void _encode(bufferlist& bl) { + ::_encode(ops, bl); + ::_encode(bls, bl); + ::_encode(oids, bl); + ::_encode(cids, bl); + ::_encode(offsets, bl); + ::_encode(lengths, bl); + ::_encode(attrnames, bl); + ::_encode(attrbls, bl); + } + void _decode(bufferlist& bl, int& off) { + ::_decode(ops, bl, off); + ::_decode(bls, bl, off); + ::_decode(oids, bl, off); + ::_decode(cids, bl, off); + ::_decode(offsets, bl, off); + ::_decode(lengths, bl, off); + ::_decode(attrnames2, bl, off); + ::_decode(attrbls, bl, off); + } }; @@ -264,7 +300,7 @@ public: case Transaction::OP_GETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); pair pattrval = t.pattrvals.front(); t.pattrvals.pop_front(); *pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second); } @@ -314,7 +350,7 @@ public: case Transaction::OP_SETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -333,7 +369,7 @@ public: case Transaction::OP_RMATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); rmattr(oid, attrname, 0); } break; @@ -379,7 +415,7 @@ public: case Transaction::OP_COLL_SETATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -391,7 +427,7 @@ public: case Transaction::OP_COLL_RMATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); collection_rmattr(cid, attrname, 0); } break; -- 2.39.5