From a419ff600656aa240fc1cb2907de958c7bc6fc36 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 4 Jul 2007 04:34:48 +0000 Subject: [PATCH] merged r1409:1471 from trunk/ceph into branches/sage/pgs (the rest) git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1473 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/pgs/Makefile | 4 +- branches/sage/pgs/TODO | 7 - branches/sage/pgs/client/Client.cc | 49 ++- branches/sage/pgs/client/Client.h | 18 + branches/sage/pgs/cmonctl.cc | 12 +- branches/sage/pgs/config.cc | 18 +- branches/sage/pgs/config.h | 8 +- branches/sage/pgs/cosd.cc | 10 +- branches/sage/pgs/doc/Commitdir.txt | 2 + branches/sage/pgs/doc/caching.txt | 1 + branches/sage/pgs/ebofs/BlockDevice.cc | 2 +- branches/sage/pgs/ebofs/Ebofs.cc | 336 ++++++++++++---- branches/sage/pgs/ebofs/Ebofs.h | 40 +- branches/sage/pgs/ebofs/FileJournal.cc | 373 +++++++++++++++--- branches/sage/pgs/ebofs/FileJournal.h | 99 ++++- branches/sage/pgs/ebofs/Journal.h | 16 +- branches/sage/pgs/ebofs/test.ebofs.cc | 3 +- branches/sage/pgs/ebofs/types.h | 13 +- branches/sage/pgs/include/Context.h | 9 +- branches/sage/pgs/include/buffer.h | 8 + branches/sage/pgs/mds/MDS.cc | 11 +- branches/sage/pgs/mds/MDSMap.h | 25 +- branches/sage/pgs/messages/MClientMount.h | 14 +- branches/sage/pgs/messages/MClientUnmount.h | 16 +- branches/sage/pgs/messages/MMDSBeacon.h | 22 +- branches/sage/pgs/messages/MMonCommand.h | 6 + branches/sage/pgs/messages/MMonElection.h | 63 +++ branches/sage/pgs/messages/MMonElectionAck.h | 32 -- .../sage/pgs/messages/MMonElectionPropose.h | 33 -- .../sage/pgs/messages/MMonElectionVictory.h | 41 -- branches/sage/pgs/messages/MMonPaxos.h | 46 ++- branches/sage/pgs/messages/MOSDBoot.h | 16 +- branches/sage/pgs/messages/MOSDFailure.h | 23 +- branches/sage/pgs/messages/MOSDGetMap.h | 6 +- branches/sage/pgs/msg/FakeMessenger.cc | 15 +- branches/sage/pgs/msg/Message.cc | 14 +- branches/sage/pgs/msg/Message.h | 4 +- branches/sage/pgs/osd/OSD.cc | 7 +- branches/sage/pgs/osd/OSDMap.h | 67 ++-- branches/sage/pgs/osd/ObjectStore.h | 46 ++- branches/sage/pgs/osd/PG.cc | 4 +- branches/sage/pgs/osdc/Objecter.cc | 2 +- 42 files changed, 1106 insertions(+), 435 deletions(-) create mode 100644 branches/sage/pgs/messages/MMonElection.h delete mode 100644 branches/sage/pgs/messages/MMonElectionAck.h delete mode 100644 branches/sage/pgs/messages/MMonElectionPropose.h delete mode 100644 branches/sage/pgs/messages/MMonElectionVictory.h diff --git a/branches/sage/pgs/Makefile b/branches/sage/pgs/Makefile index 2321d7068063d..9c2c757032eaf 100644 --- a/branches/sage/pgs/Makefile +++ b/branches/sage/pgs/Makefile @@ -38,7 +38,8 @@ EBOFS_OBJS= \ ebofs/BlockDevice.o\ ebofs/BufferCache.o\ ebofs/Ebofs.o\ - ebofs/Allocator.o + ebofs/Allocator.o\ + ebofs/FileJournal.o MDS_OBJS= \ mds/MDS.o\ @@ -74,6 +75,7 @@ OSDC_OBJS= \ MON_OBJS= \ mon/Monitor.o\ mon/Paxos.o\ + mon/PaxosService.o\ mon/OSDMonitor.o\ mon/MDSMonitor.o\ mon/ClientMonitor.o\ diff --git a/branches/sage/pgs/TODO b/branches/sage/pgs/TODO index d636339800220..2a92d5834e778 100644 --- a/branches/sage/pgs/TODO +++ b/branches/sage/pgs/TODO @@ -150,19 +150,12 @@ foreign rename - rejoin will need to explicitly resolve uncommitted items. - fully implement link/unlink first, and use that as a model? -monitor -- finish generic paxos - osdmon -- distribute w/ paxos framework - allow fresh replacement osds. add osd_created in osdmap, probably - monitor needs to monitor some osds... - monitor pg states, notify on out? - watch osd utilization; adjust overload in cluster map -mdsmon -- distribute w/ paxos framework - journaler - fix up for large events (e.g. imports) - use set_floor_and_read for safe takeover from possibly-not-quite-dead otherguy. diff --git a/branches/sage/pgs/client/Client.cc b/branches/sage/pgs/client/Client.cc index 90e37a58a6ac2..11fd9149df92f 100644 --- a/branches/sage/pgs/client/Client.cc +++ b/branches/sage/pgs/client/Client.cc @@ -95,13 +95,14 @@ public: // cons/des -Client::Client(Messenger *m, MonMap *mm) +Client::Client(Messenger *m, MonMap *mm) : timer(client_lock) { // which client am i? whoami = m->get_myname().num(); monmap = mm; mounted = false; + mount_timeout_event = 0; unmounting = false; last_tid = 0; @@ -875,21 +876,21 @@ void Client::handle_mds_map(MMDSMap* m) if (m->get_source().is_mds()) frommds = m->get_source().num(); - if (mdsmap == 0) + if (mdsmap == 0) { mdsmap = new MDSMap; - if (whoami < 0) { - // mounted! assert(m->get_source().is_mon()); whoami = m->get_dest().num(); dout(1) << "handle_mds_map i am now " << m->get_dest() << endl; messenger->reset_myname(m->get_dest()); - + mount_cond.Signal(); // mount might be waiting for this. - } + } dout(1) << "handle_mds_map epoch " << m->get_epoch() << endl; + epoch_t was = mdsmap->get_epoch(); mdsmap->decode(m->get_encoded()); + assert(mdsmap->get_epoch() >= was); // send reconnect? if (frommds >= 0 && @@ -1252,7 +1253,28 @@ void Client::update_caps_wanted(Inode *in) // ------------------- -// fs ops +// MOUNT + +void Client::_try_mount() +{ + dout(10) << "_try_mount" << endl; + int mon = monmap->pick_mon(); + dout(2) << "sending client_mount to mon" << mon << endl; + messenger->send_message(new MClientMount(messenger->get_myaddr()), + monmap->get_inst(mon)); + + // schedule timeout + assert(mount_timeout_event == 0); + mount_timeout_event = new C_MountTimeout(this); + timer.add_event_after(g_conf.client_mount_timeout, mount_timeout_event); +} + +void Client::_mount_timeout() +{ + dout(10) << "_mount_timeout" << endl; + mount_timeout_event = 0; + _try_mount(); +} int Client::mount() { @@ -1260,14 +1282,15 @@ int Client::mount() assert(!mounted); // caller is confused? assert(!mdsmap); - int mon = monmap->pick_mon(); - dout(2) << "sending client_mount to mon" << mon << endl; - messenger->send_message(new MClientMount, monmap->get_inst(mon)); + _try_mount(); while (!mdsmap || !osdmap || osdmap->get_epoch() == 0) mount_cond.Wait(client_lock); + + timer.cancel_event(mount_timeout_event); + mount_timeout_event = 0; mounted = true; @@ -1291,6 +1314,9 @@ int Client::mount() } +// UNMOUNT + + int Client::unmount() { client_lock.Lock(); @@ -1359,7 +1385,8 @@ int Client::unmount() // send unmount! int mon = monmap->pick_mon(); dout(2) << "sending client_unmount to mon" << mon << endl; - messenger->send_message(new MClientUnmount, monmap->get_inst(mon)); + messenger->send_message(new MClientUnmount(messenger->get_myinst()), + monmap->get_inst(mon)); while (mounted) mount_cond.Wait(client_lock); diff --git a/branches/sage/pgs/client/Client.h b/branches/sage/pgs/client/Client.h index 7e66efe7b6c85..700ae3f1e11cc 100644 --- a/branches/sage/pgs/client/Client.h +++ b/branches/sage/pgs/client/Client.h @@ -34,9 +34,11 @@ #include "include/interval_set.h" #include "common/Mutex.h" +#include "common/Timer.h" #include "FileCache.h" + // stl #include #include @@ -333,6 +335,7 @@ class Client : public Dispatcher { MDSMap *mdsmap; OSDMap *osdmap; + SafeTimer timer; protected: Messenger *messenger; @@ -574,6 +577,21 @@ protected: // ---------------------- // fs ops. +private: + void _try_mount(); + void _mount_timeout(); + Context *mount_timeout_event; + + class C_MountTimeout : public Context { + Client *client; + public: + C_MountTimeout(Client *c) : client(c) { } + void finish(int r) { + if (r >= 0) client->_mount_timeout(); + } + }; + +public: int mount(); int unmount(); diff --git a/branches/sage/pgs/cmonctl.cc b/branches/sage/pgs/cmonctl.cc index 19148942acc76..34bd80f9a4d8f 100644 --- a/branches/sage/pgs/cmonctl.cc +++ b/branches/sage/pgs/cmonctl.cc @@ -64,8 +64,13 @@ int main(int argc, char **argv, char *envp[]) { int r = monmap.read(".ceph_monmap"); assert(r >= 0); + // start up network + rank.start_rank(); + messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN)); + messenger->set_dispatcher(&dispatcher); + // build command - MMonCommand *m = new MMonCommand; + MMonCommand *m = new MMonCommand(messenger->get_myinst()); string cmd; for (unsigned i=0; iset_dispatcher(&dispatcher); - // send it messenger->send_message(m, monmap.get_inst(mon)); diff --git a/branches/sage/pgs/config.cc b/branches/sage/pgs/config.cc index d6a21af1d03cd..cef79bb9dd63e 100644 --- a/branches/sage/pgs/config.cc +++ b/branches/sage/pgs/config.cc @@ -101,7 +101,7 @@ md_config_t g_conf = { // --- clock --- clock_lock: false, - clock_tare: true, + clock_tare: false, // --- messenger --- ms_single_dispatch: false, @@ -125,8 +125,13 @@ md_config_t g_conf = { // --- mon --- mon_tick_interval: 5, mon_osd_down_out_interval: 5, // seconds - mon_lease: 2.000, // seconds - mon_stop_with_last_mds: true, + mon_lease: 5, // seconds // lease interval + mon_lease_renew_interval: 3, // on leader, to renew the lease + mon_lease_ack_timeout: 10.0, // on leader, if lease isn't acked by all peons + mon_lease_timeout: 10.0, // on peon, if lease isn't extended + mon_accept_timeout: 10.0, // on leader, if paxos update isn't accepted + mon_stop_on_last_unmount: false, + mon_stop_with_last_mds: false, // --- client --- client_cache_size: 300, @@ -142,6 +147,8 @@ md_config_t g_conf = { client_oc_max_dirty: 1024*1024* 5, // MB * n client_oc_max_sync_write: 128*1024, // writes >= this use wrlock + client_mount_timeout: 10.0, // retry every N seconds + client_hack_balance_reads: false, client_trace: 0, @@ -194,7 +201,6 @@ md_config_t g_conf = { mds_trim_on_rejoin: true, mds_commit_on_shutdown: true, mds_shutdown_check: 0, //30, - mds_shutdown_on_last_unmount: true, mds_verify_export_dirauth: true, @@ -600,8 +606,6 @@ void parse_config_options(std::vector& args) g_conf.mds_commit_on_shutdown = atoi(args[++i]); else if (strcmp(args[i], "--mds_shutdown_check") == 0) g_conf.mds_shutdown_check = atoi(args[++i]); - else if (strcmp(args[i], "--mds_shutdown_on_last_unmount") == 0) - g_conf.mds_shutdown_on_last_unmount = atoi(args[++i]); else if (strcmp(args[i], "--mds_log_flush_on_shutdown") == 0) g_conf.mds_log_flush_on_shutdown = atoi(args[++i]); @@ -663,6 +667,8 @@ void parse_config_options(std::vector& args) else if (strcmp(args[i], "--mon_osd_down_out_interval") == 0) g_conf.mon_osd_down_out_interval = atoi(args[++i]); + else if (strcmp(args[i], "--mon_stop_on_last_unmount") == 0) + g_conf.mon_stop_on_last_unmount = atoi(args[++i]); else if (strcmp(args[i], "--mon_stop_with_last_mds") == 0) g_conf.mon_stop_with_last_mds = atoi(args[++i]); diff --git a/branches/sage/pgs/config.h b/branches/sage/pgs/config.h index 7110a5ad6ea86..0831e123ddbee 100644 --- a/branches/sage/pgs/config.h +++ b/branches/sage/pgs/config.h @@ -115,6 +115,11 @@ struct md_config_t { int mon_tick_interval; int mon_osd_down_out_interval; float mon_lease; + float mon_lease_renew_interval; + float mon_lease_ack_timeout; + float mon_lease_timeout; + float mon_accept_timeout; + bool mon_stop_on_last_unmount; bool mon_stop_with_last_mds; // client @@ -131,6 +136,8 @@ struct md_config_t { int client_oc_max_dirty; size_t client_oc_max_sync_write; + double client_mount_timeout; + // hack bool client_hack_balance_reads; @@ -196,7 +203,6 @@ struct md_config_t { bool mds_trim_on_rejoin; bool mds_commit_on_shutdown; int mds_shutdown_check; - bool mds_shutdown_on_last_unmount; bool mds_verify_export_dirauth; // debug flag diff --git a/branches/sage/pgs/cosd.cc b/branches/sage/pgs/cosd.cc index 800eacf5acd9a..4f3c8ab71a19f 100644 --- a/branches/sage/pgs/cosd.cc +++ b/branches/sage/pgs/cosd.cc @@ -66,7 +66,8 @@ int main(int argc, char **argv) if (g_conf.clock_tare) g_clock.tare(); // osd specific args - char *dev; + char *dev = 0; + char dev_default[20]; int whoami = -1; for (unsigned i=0; i (/usr, /home) + /usr/local -> () /home -> () and on mds1: diff --git a/branches/sage/pgs/ebofs/BlockDevice.cc b/branches/sage/pgs/ebofs/BlockDevice.cc index 29071fd5bd8da..2c2b4ab18f092 100644 --- a/branches/sage/pgs/ebofs/BlockDevice.cc +++ b/branches/sage/pgs/ebofs/BlockDevice.cc @@ -639,7 +639,7 @@ int BlockDevice::_write(int fd, unsigned bno, unsigned num, bufferlist& bl) iov[n].iov_base = (void*)i->c_str(); iov[n].iov_len = MIN(left, i->length()); - assert((((unsigned long long)iov[n].iov_base) & 4095ULL) == 0); + assert((((intptr_t)iov[n].iov_base) & ((intptr_t)4095ULL)) == 0); assert((iov[n].iov_len & 4095) == 0); left -= iov[n].iov_len; diff --git a/branches/sage/pgs/ebofs/Ebofs.cc b/branches/sage/pgs/ebofs/Ebofs.cc index 7d03dc074c30e..a30f031f10fac 100644 --- a/branches/sage/pgs/ebofs/Ebofs.cc +++ b/branches/sage/pgs/ebofs/Ebofs.cc @@ -16,6 +16,8 @@ #include "Ebofs.h" +#include "FileJournal.h" + #include #ifndef DARWIN @@ -50,6 +52,7 @@ int Ebofs::mount() ebofs_lock.Lock(); assert(!mounted); + // open dev int r = dev.open(&idle_kicker); if (r < 0) { ebofs_lock.Unlock(); @@ -79,6 +82,8 @@ int Ebofs::mount() dout(3) << "mount epoch " << super_epoch << endl; assert(super_epoch == sb->epoch); + super_fsid = sb->fsid; + free_blocks = sb->free_blocks; limbo_blocks = sb->limbo_blocks; @@ -101,6 +106,43 @@ int Ebofs::mount() allocator.release_limbo(); + + // open journal? + if (journalfn) { + journal = new FileJournal(this, journalfn); + if (journal->open() < 0) { + dout(-3) << "mount journal " << journalfn << " open failed" << endl; + delete journal; + journal = 0; + } else { + dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl; + + while (1) { + bufferlist bl; + epoch_t e; + if (!journal->read_entry(bl, e)) { + dout(-3) << "mount replay: end of journal, done." << endl; + break; + } + + if (e < super_epoch) { + dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl; + } + if (e == super_epoch+1) { + super_epoch++; + dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl; + } + assert(e == super_epoch); + + dout(-3) << "mount replay: applying transaction in epoch " << e << endl; + Transaction t; + int off = 0; + t._decode(bl, off); + _apply_transaction(t); + } + } + } + dout(3) << "mount starting commit+finisher threads" << endl; commit_thread.create(); finisher_thread.create(); @@ -108,6 +150,7 @@ int Ebofs::mount() dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl; mounted = true; + ebofs_lock.Unlock(); return 0; } @@ -126,6 +169,10 @@ int Ebofs::mkfs() block_t num_blocks = dev.get_num_blocks(); + // make a super-random fsid + srand(time(0) ^ getpid()); + super_fsid = (lrand48() << 32) ^ mrand48(); + free_blocks = 0; limbo_blocks = 0; @@ -197,6 +244,18 @@ int Ebofs::mkfs() dev.close(); + + // create journal? + if (journalfn) { + journal = new FileJournal(this, journalfn); + if (journal->create() < 0) { + dout(3) << "mount journal " << journalfn << " created failed" << endl; + delete journal; + } else { + dout(3) << "mount journal " << journalfn << " created" << endl; + } + } + dout(2) << "mkfs: " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl; ebofs_lock.Unlock(); return 0; @@ -272,6 +331,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp) // fill in super memset(&sb, 0, sizeof(sb)); sb.s_magic = EBOFS_MAGIC; + sb.fsid = super_fsid; sb.epoch = epoch; sb.num_blocks = dev.get_num_blocks(); @@ -409,6 +469,7 @@ int Ebofs::commit_thread_entry() << ", max dirty " << g_conf.ebofs_bc_max_dirty << endl; + if (journal) journal->commit_epoch_start(); // (async) write onodes+condes (do this first; it currently involves inode reallocation) commit_inodes_start(); @@ -453,14 +514,14 @@ int Ebofs::commit_thread_entry() alloc_more_node_space(); } + // signal journal + if (journal) journal->commit_epoch_finish(); + // kick waiters dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl; - finisher_lock.Lock(); - finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]); + queue_finishers(commit_waiters[super_epoch-1]); commit_waiters.erase(super_epoch-1); - finisher_cond.Signal(); - finisher_lock.Unlock(); sync_cond.Signal(); @@ -1222,7 +1283,18 @@ void Ebofs::sync(Context *onsafe) ebofs_lock.Lock(); if (onsafe) { dirty = true; - commit_waiters[super_epoch].push_back(onsafe); + + while (1) { + if (journal) { + // journal empty transaction + Transaction t; + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + commit_waiters[super_epoch].push_back(onsafe); + break; + } } ebofs_lock.Unlock(); } @@ -1994,6 +2066,29 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) ebofs_lock.Lock(); dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl; + unsigned r = _apply_transaction(t); + + // journal, wait for commit + if (r != 0 && onsafe) { + delete onsafe; // kill callback, but still journal below (in case transaction had side effects) + onsafe = 0; + } + while (1) { + if (journal) { + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } + + ebofs_lock.Unlock(); + return r; +} + +unsigned Ebofs::_apply_transaction(Transaction& t) +{ // do ops unsigned r = 0; // bit fields indicate which ops failed. int bit = 1; @@ -2028,7 +2123,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_GETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); pair pattrval = t.pattrvals.front(); t.pattrvals.pop_front(); if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) { dout(7) << "apply_transaction fail on _getattr" << endl; @@ -2095,7 +2190,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_SETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -2121,7 +2216,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_RMATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); if (_rmattr(oid, attrname) < 0) { dout(7) << "apply_transaction fail on _rmattr" << endl; r &= bit; @@ -2185,7 +2280,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_COLL_SETATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -2201,7 +2296,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) case Transaction::OP_COLL_RMATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); if (_collection_rmattr(cid, attrname) < 0) { dout(7) << "apply_transaction fail on _collection_rmattr" << endl; r &= bit; @@ -2217,16 +2312,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe) bit = bit << 1; } - dout(7) << "apply_transaction finish (r = " << r << ")" << endl; - - // set up commit waiter - //if (r == 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); - //} else { - //if (onsafe) delete onsafe; - //} - - ebofs_lock.Unlock(); + dout(7) << "_apply_transaction finish (r = " << r << ")" << endl; return r; } @@ -2295,36 +2381,6 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl) } -/*int Ebofs::write(object_t oid, - off_t off, size_t len, - bufferlist& bl, bool fsync) -{ - // wait? - if (fsync) { - // wait for flush. - Cond cond; - bool done; - int flush = 1; // write never returns positive - Context *c = new C_Cond(&cond, &done, &flush); - int r = write(oid, off, len, bl, c); - if (r < 0) return r; - - ebofs_lock.Lock(); - { - while (!done) - cond.Wait(ebofs_lock); - assert(flush <= 0); - } - ebofs_lock.Unlock(); - if (flush < 0) return flush; - return r; - } else { - // don't wait for flush. - return write(oid, off, len, bl, (Context*)0); - } -} -*/ - int Ebofs::write(object_t oid, off_t off, size_t len, bufferlist& bl, Context *onsafe) @@ -2338,7 +2394,17 @@ int Ebofs::write(object_t oid, // commit waiter if (r > 0) { assert((size_t)r == len); - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.write(oid, off, len, bl); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2370,9 +2436,19 @@ int Ebofs::remove(object_t oid, Context *onsafe) // do it int r = _remove(oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.remove(oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2445,9 +2521,19 @@ int Ebofs::truncate(object_t oid, off_t size, Context *onsafe) int r = _truncate(oid, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.truncate(oid, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2464,9 +2550,19 @@ int Ebofs::clone(object_t from, object_t to, Context *onsafe) int r = _clone(from, to); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.clone(from, to); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2640,9 +2736,19 @@ int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t siz ebofs_lock.Lock(); int r = _setattr(oid, name, value, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.setattr(oid, name, value, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2673,9 +2779,19 @@ int Ebofs::setattrs(object_t oid, map& attrset, Context *onsaf ebofs_lock.Lock(); int r = _setattrs(oid, attrset); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.setattrs(oid, attrset); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2758,9 +2874,19 @@ int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe) int r = _rmattr(oid, name); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.rmattr(oid, name); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2835,9 +2961,19 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe) int r = _create_collection(cid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.create_collection(cid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2882,9 +3018,19 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe) int r = _destroy_collection(cid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.remove_collection(cid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2936,9 +3082,19 @@ int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe) int r = _collection_add(cid, oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_add(cid, oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -2977,9 +3133,19 @@ int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe) int r = _collection_remove(cid, oid); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_remove(cid, oid); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -3040,9 +3206,19 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s int r = _collection_setattr(cid, name, value, size); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_setattr(cid, name, value, size); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } @@ -3098,9 +3274,19 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe) int r = _collection_rmattr(cid, name); - // set up commit waiter + // journal, wait for commit if (r >= 0) { - if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + while (1) { + if (journal) { + Transaction t; + t.collection_rmattr(cid, name); + bufferlist bl; + t._encode(bl); + if (journal->submit_entry(bl, onsafe)) break; + } + if (onsafe) commit_waiters[super_epoch].push_back(onsafe); + break; + } } else { if (onsafe) delete onsafe; } diff --git a/branches/sage/pgs/ebofs/Ebofs.h b/branches/sage/pgs/ebofs/Ebofs.h index 91c5eb51b3cda..eb20cf8920531 100644 --- a/branches/sage/pgs/ebofs/Ebofs.h +++ b/branches/sage/pgs/ebofs/Ebofs.h @@ -29,6 +29,7 @@ using namespace __gnu_cxx; #include "nodes.h" #include "Allocator.h" #include "Table.h" +#include "Journal.h" #include "common/Mutex.h" #include "common/Cond.h" @@ -40,20 +41,23 @@ typedef pair coll_object_t; class Ebofs : public ObjectStore { - protected: +protected: Mutex ebofs_lock; // a beautiful global lock // ** debuggy ** bool fake_writes; // ** super ** +public: BlockDevice dev; +protected: bool mounted, unmounting, dirty; bool readonly; version_t super_epoch; bool commit_thread_started, mid_commit; Cond commit_cond; // to wake up the commit thread Cond sync_cond; + uint64_t super_fsid; map > commit_waiters; @@ -71,9 +75,16 @@ class Ebofs : public ObjectStore { } } commit_thread; - +public: + uint64_t get_fsid() { return super_fsid; } + epoch_t get_super_epoch() { return super_epoch; } +protected: + // ** journal ** + char *journalfn; + Journal *journal; + // ** allocator ** block_t free_blocks, limbo_blocks; Allocator allocator; @@ -188,6 +199,21 @@ class Ebofs : public ObjectStore { bool finisher_stop; list finisher_queue; +public: + void queue_finisher(Context *c) { + finisher_lock.Lock(); + finisher_queue.push_back(c); + finisher_cond.Signal(); + finisher_lock.Unlock(); + } + void queue_finishers(list& ls) { + finisher_lock.Lock(); + finisher_queue.splice(finisher_queue.end(), ls); + finisher_cond.Signal(); + finisher_lock.Unlock(); + } +protected: + void *finisher_thread_entry(); class FinisherThread : public Thread { Ebofs *ebofs; @@ -204,12 +230,13 @@ class Ebofs : public ObjectStore { public: - Ebofs(char *devfn) : + Ebofs(char *devfn, char *jfn=0) : fake_writes(false), dev(devfn), mounted(false), unmounting(false), dirty(false), readonly(false), super_epoch(0), commit_thread_started(false), mid_commit(false), commit_thread(this), + journalfn(jfn), journal(0), free_blocks(0), limbo_blocks(0), allocator(this), nodepool(ebofs_lock), @@ -222,6 +249,11 @@ class Ebofs : public ObjectStore { finisher_stop(false), finisher_thread(this) { for (int i=0; i + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ #include "FileJournal.h" #include "Ebofs.h" -#include "config.h" -#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal " -#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal " +#include +#include +#include +#include + +#include "config.h" +#undef dout +#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal " +#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal " -void FileJournal::create() +int FileJournal::create() { dout(1) << "create " << fn << endl; // open/create - fd = ::open(fn.c_str(), O_CREAT|O_WRONLY); + fd = ::open(fn.c_str(), O_RDWR|O_SYNC); + if (fd < 0) { + dout(1) << "create failed " << errno << " " << strerror(errno) << endl; + return -errno; + } assert(fd > 0); - ::ftruncate(fd); - ::fchmod(fd, 0644); + //::ftruncate(fd, 0); + //::fchmod(fd, 0644); + + // get size + struct stat st; + ::fstat(fd, &st); + dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl; + + // write empty header + header.clear(); + header.fsid = ebofs->get_fsid(); + header.max_size = st.st_size; + write_header(); + + // writeable. + read_pos = 0; + write_pos = queue_pos = sizeof(header); ::close(fd); -} + return 0; +} -void FileJournal::open() +int FileJournal::open() { - dout(1) << "open " << fn << endl; + //dout(1) << "open " << fn << endl; // open and file assert(fd == 0); - fd = ::open(fn.c_str(), O_RDWR); + fd = ::open(fn.c_str(), O_RDWR|O_SYNC); + if (fd < 0) { + dout(1) << "open failed " << errno << " " << strerror(errno) << endl; + return -errno; + } assert(fd > 0); - // read header? - // *** + // assume writeable, unless... + read_pos = 0; + write_pos = queue_pos = sizeof(header); + // read header? + read_header(); + if (header.num > 0 && header.fsid == ebofs->get_fsid()) { + // valid header, pick an offset + for (int i=0; iget_super_epoch()) { + dout(2) << "using read_pos header pointer " + << header.epoch[i] << " at " << header.offset[i] + << endl; + read_pos = header.offset[i]; + write_pos = queue_pos = 0; + break; + } + else if (header.epoch[i] < ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", skipping old " << header.epoch[i] << " at " << header.offset[i] + << endl; + } + else if (header.epoch[i] > ebofs->get_super_epoch()) { + dout(2) << "super_epoch is " << ebofs->get_super_epoch() + << ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i] + << endl; + break; + } + } + } start_writer(); + + return 0; } void FileJournal::close() @@ -49,7 +119,8 @@ void FileJournal::close() stop_writer(); // close - assert(q.empty()); + assert(writeq.empty()); + assert(commitq.empty()); assert(fd > 0); ::close(fd); fd = 0; @@ -73,12 +144,36 @@ void FileJournal::stop_writer() } +void FileJournal::print_header() +{ + for (int i=0; i::const_iterator it = bl.buffers().begin(); it != bl.buffers().end(); @@ -124,14 +231,21 @@ void FileJournal::write_thread_entry() if ((*it).length() == 0) continue; // blank buffer. ::write(fd, (char*)(*it).c_str(), (*it).length() ); } + + ::write(fd, &h, sizeof(h)); // move position pointer - bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length(); + write_pos += 2*sizeof(entry_header_t) + bl.length(); - // do commit callback if (oncommit) { - oncommit->finish(0); - delete oncommit; + if (1) { + // queue callback + ebofs->queue_finisher(oncommit); + } else { + // callback now + oncommit->finish(0); + delete oncommit; + } } } } @@ -140,61 +254,202 @@ void FileJournal::write_thread_entry() dout(10) << "write_thread_entry finish" << endl; } -void FileJournal::submit_entry(bufferlist& e, Context *oncommit) +bool FileJournal::submit_entry(bufferlist& e, Context *oncommit) { - dout(10) << "submit_entry " << bottom << " : " << e.length() - << " epoch " << ebofs->super_epoch + assert(queue_pos != 0); // bad create(), or journal didn't replay to completion. + + // ** lock ** + Mutex::Locker locker(write_lock); + + // wrap? full? + off_t size = 2*sizeof(entry_header_t) + e.length(); + + if (full) return false; // already marked full. + + if (header.wrap) { + // we're wrapped. don't overwrite ourselves. + if (queue_pos + size >= header.offset[0]) { + dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size + << " >= " << header.offset[0] + << endl; + full = true; + print_header(); + return false; + } + } else { + // we haven't wrapped. + if (queue_pos + size >= header.max_size) { + // is there room if we wrap? + if ((off_t)sizeof(header_t) + size < header.offset[0]) { + // yes! + dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl; + header.wrap = queue_pos; + queue_pos = sizeof(header_t); + header.push(ebofs->get_super_epoch(), queue_pos); + } else { + // no room. + dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size + << " >= " << header.max_size + << endl; + full = true; + return false; + } + } + } + + dout(10) << "submit_entry " << queue_pos << " : " << e.length() + << " epoch " << ebofs->get_super_epoch() << " " << oncommit << endl; // dump on queue - writeq.push_back(pair(ebofs->super_epoch, e)); + writeq.push_back(pair(ebofs->get_super_epoch(), e)); commitq.push_back(oncommit); - + + queue_pos += size; + // kick writer thread write_cond.Signal(); + + return true; } void FileJournal::commit_epoch_start() { - dout(10) << "commit_epoch_start" << endl; + dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1 + << " -- new epoch " << ebofs->get_super_epoch() + << endl; - write_lock.Lock(); - { - header.epoch2 = ebofs->super_epoch; - header.top2 = bottom; - write_header(); - } - write_lock.Unlock(); + Mutex::Locker locker(write_lock); + + // was full -> empty -> now usable? + if (full) { + if (header.num != 0) { + dout(1) << " journal FULL, ignoring this epoch" << endl; + return; + } + + dout(1) << " clearing FULL flag, journal now usable" << endl; + full = false; + } + + // note epoch boundary + header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written. + //write_header(); // no need to write it now, though... } void FileJournal::commit_epoch_finish() { - dout(10) << "commit_epoch_finish" << endl; + dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl; write_lock.Lock(); { - // update header - header.epoch1 = ebofs->super_epoch; - header.top1 = header.top2; - header.epoch2 = 0; - header.top2 = 0; + if (full) { + // full journal damage control. + dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << endl; + header.clear(); + write_pos = queue_pos = sizeof(header_t); + } else { + // update header -- trim/discard old (committed) epochs + while (header.epoch[0] < ebofs->get_super_epoch()) + header.pop(); + } write_header(); - // flush any unwritten items in previous epoch - while (!writeq.empty() && - writeq.front().first < ebofs->super_epoch) { - dout(15) << " dropping uncommitted journal item from prior epoch" << endl; - writeq.pop_front(); + // discard any unwritten items in previous epoch, and do callbacks + epoch_t epoch = ebofs->get_super_epoch(); + list callbacks; + while (!writeq.empty() && writeq.front().first < epoch) { + dout(15) << " dropping unwritten and committed " + << write_pos << " : " << writeq.front().second.length() + << " epoch " << writeq.front().first + << endl; + // finisher? Context *oncommit = commitq.front(); + if (oncommit) callbacks.push_back(oncommit); + + write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length(); + + // discard. + writeq.pop_front(); commitq.pop_front(); - - if (oncommit) { - oncommit->finish(0); - delete oncommit; - } } + + // queue the finishers + ebofs->queue_finishers(callbacks); } write_lock.Unlock(); } + + +void FileJournal::make_writeable() +{ + if (read_pos) + write_pos = queue_pos = read_pos; + else + write_pos = queue_pos = sizeof(header_t); + read_pos = 0; +} + + +bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch) +{ + if (!read_pos) { + dout(1) << "read_entry -- not readable" << endl; + make_writeable(); + return false; + } + + if (read_pos == header.wrap) { + // find wrap point + for (int i=1; i= 2 && offset[0] > offset[1]) + wrap = 0; // we're eliminating a wrap + num--; + for (int i=0; iwrite_thread(); + journal->write_thread_entry(); return 0; } } write_thread; public: - FileJournal(Ebofs *e, char *f, off_t sz) : - Journal(e), - fn(f), max_size(sz), - top(0), bottom(0), committing_to(0), + FileJournal(Ebofs *e, char *f) : + Journal(e), fn(f), + full(false), + write_pos(0), queue_pos(0), read_pos(0), fd(0), - write_stop(false), write_thread(this) - { } + write_stop(false), write_thread(this) { } ~FileJournal() {} - void create(); - void open(); + int create(); + int open(); void close(); // writes - void submit_entry(bufferlist& e, Context *oncommit); // submit an item - void commit_epoch_start(); // mark epoch boundary - void commit_epoch_finish(); // mark prior epoch as committed (we can expire) + bool submit_entry(bufferlist& e, Context *oncommit); // submit an item + void commit_epoch_start(); // mark epoch boundary + void commit_epoch_finish(); // mark prior epoch as committed (we can expire) + + bool read_entry(bufferlist& bl, epoch_t& e); // reads }; diff --git a/branches/sage/pgs/ebofs/Journal.h b/branches/sage/pgs/ebofs/Journal.h index c05bce5955c5f..fb1983c22eafc 100644 --- a/branches/sage/pgs/ebofs/Journal.h +++ b/branches/sage/pgs/ebofs/Journal.h @@ -16,22 +16,28 @@ #ifndef __EBOFS_JOURNAL_H #define __EBOFS_JOURNAL_H +class Ebofs; + +#include "include/buffer.h" +#include "include/Context.h" class Journal { +protected: Ebofs *ebofs; - public: +public: Journal(Ebofs *e) : ebofs(e) { } virtual ~Journal() { } - virtual void create() = 0; - virtual void open() = 0; + virtual int create() = 0; + virtual int open() = 0; virtual void close() = 0; // writes - virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item + virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item virtual void commit_epoch_start() = 0; // mark epoch boundary - virtual void commit_epoch_finish(list& ls) = 0; // mark prior epoch as committed (we can expire) + virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire) + virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0; // reads/recovery diff --git a/branches/sage/pgs/ebofs/test.ebofs.cc b/branches/sage/pgs/ebofs/test.ebofs.cc index 704ec16581824..345f49b7a68ca 100644 --- a/branches/sage/pgs/ebofs/test.ebofs.cc +++ b/branches/sage/pgs/ebofs/test.ebofs.cc @@ -145,6 +145,7 @@ int main(int argc, char **argv) char *filename = args[0]; int seconds = atoi(args[1]); int threads = atoi(args[2]); + if (!threads) threads = 1; cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl; @@ -153,7 +154,7 @@ int main(int argc, char **argv) // explicit tests - if (1) { + if (0) { // verify that clone() plays nice with partial writes object_t oid(1,1); bufferptr bp(10000); diff --git a/branches/sage/pgs/ebofs/types.h b/branches/sage/pgs/ebofs/types.h index b03bb8a40d9c9..1fa209a3deeb9 100644 --- a/branches/sage/pgs/ebofs/types.h +++ b/branches/sage/pgs/ebofs/types.h @@ -142,15 +142,16 @@ static const int EBOFS_FREE_BUCKET_BITS = 2; struct ebofs_super { - unsigned s_magic; - - unsigned epoch; // version of this superblock. + uint64_t s_magic; + uint64_t fsid; + + epoch_t epoch; // version of this superblock. - unsigned num_blocks; /* # blocks in filesystem */ + uint64_t num_blocks; /* # blocks in filesystem */ // some basic stats, for kicks - unsigned free_blocks; /* unused blocks */ - unsigned limbo_blocks; /* limbo blocks */ + uint64_t free_blocks; /* unused blocks */ + uint64_t limbo_blocks; /* limbo blocks */ //unsigned num_objects; //unsigned num_fragmented; diff --git a/branches/sage/pgs/include/Context.h b/branches/sage/pgs/include/Context.h index 7f2f30104407b..d73a1d8c739b6 100644 --- a/branches/sage/pgs/include/Context.h +++ b/branches/sage/pgs/include/Context.h @@ -44,11 +44,14 @@ inline void finish_contexts(std::list& finished, using std::cout; using std::endl; + list ls; if (finished.empty()) return; - dout(10) << finished.size() << " contexts to finish with " << result << endl; - for (std::list::iterator it = finished.begin(); - it != finished.end(); + ls.swap(finished); // swap out of place to avoid weird loops + + dout(10) << ls.size() << " contexts to finish with " << result << endl; + for (std::list::iterator it = ls.begin(); + it != ls.end(); it++) { Context *c = *it; dout(10) << "---- " << c << endl; diff --git a/branches/sage/pgs/include/buffer.h b/branches/sage/pgs/include/buffer.h index 1f401513f688c..1f61d3b892ba5 100644 --- a/branches/sage/pgs/include/buffer.h +++ b/branches/sage/pgs/include/buffer.h @@ -919,6 +919,14 @@ inline void _decode(std::string& s, bufferlist& bl, int& off) off += len+1; } +// const char* (encode only, string compatible) +inline void _encode(const char *s, bufferlist& bl) +{ + uint32_t len = strlen(s); + _encoderaw(len, bl); + bl.append(s, len+1); +} + // bufferptr (encapsulated) inline void _encode(bufferptr& bp, bufferlist& bl) { diff --git a/branches/sage/pgs/mds/MDS.cc b/branches/sage/pgs/mds/MDS.cc index e4b0d21d6959b..f2e40bff824a4 100644 --- a/branches/sage/pgs/mds/MDS.cc +++ b/branches/sage/pgs/mds/MDS.cc @@ -255,11 +255,8 @@ public: int MDS::init(bool standby) { mds_lock.Lock(); - - if (standby) - want_state = MDSMap::STATE_STANDBY; - else - want_state = MDSMap::STATE_STARTING; + + want_state = MDSMap::STATE_BOOT; // starting beacon. this will induce an MDSMap from the monitor beacon_start(); @@ -367,7 +364,7 @@ void MDS::beacon_send() beacon_seq_stamp[beacon_last_seq] = g_clock.now(); int mon = monmap->pick_mon(); - messenger->send_message(new MMDSBeacon(want_state, beacon_last_seq), + messenger->send_message(new MMDSBeacon(messenger->get_myinst(), want_state, beacon_last_seq), monmap->get_inst(mon)); // schedule next sender @@ -481,7 +478,7 @@ void MDS::handle_mds_map(MMDSMap *m) mdsmap->decode(m->get_encoded()); // see who i am - whoami = mdsmap->get_inst_rank(messenger->get_myaddr()); + whoami = mdsmap->get_addr_rank(messenger->get_myaddr()); if (oldwhoami != whoami) { // update messenger. messenger->reset_myname(MSG_ADDR_MDS(whoami)); diff --git a/branches/sage/pgs/mds/MDSMap.h b/branches/sage/pgs/mds/MDSMap.h index 8c47e9ed09681..46d856316740c 100644 --- a/branches/sage/pgs/mds/MDSMap.h +++ b/branches/sage/pgs/mds/MDSMap.h @@ -33,16 +33,17 @@ class MDSMap { static const int STATE_OUT = 1; // down, once existed, but no subtrees, empty log. static const int STATE_FAILED = 2; // down, active subtrees; needs to be recovered. - static const int STATE_STANDBY = 3; // up, but inactive. waiting for assignment by monitor. - static const int STATE_CREATING = 4; // up, creating MDS instance (new journal, idalloc..) - static const int STATE_STARTING = 5; // up, starting prior out MDS instance. - static const int STATE_REPLAY = 6; // up, scanning journal, recoverying any shared state - static const int STATE_RESOLVE = 7; // up, disambiguating partial distributed operations (import/export, ...rename?) - static const int STATE_RECONNECT = 8; // up, reconnect to clients - static const int STATE_REJOIN = 9; // up, replayed journal, rejoining distributed cache - static const int STATE_ACTIVE = 10; // up, active - static const int STATE_STOPPING = 11; // up, exporting metadata (-> standby or out) - static const int STATE_STOPPED = 12; // up, finished stopping. like standby, but not avail to takeover. + static const int STATE_BOOT = 3; // up, started, joining cluster. + static const int STATE_STANDBY = 4; // up, but inactive. waiting for assignment by monitor. + static const int STATE_CREATING = 5; // up, creating MDS instance (new journal, idalloc..) + static const int STATE_STARTING = 6; // up, starting prior out MDS instance. + static const int STATE_REPLAY = 7; // up, scanning journal, recoverying any shared state + static const int STATE_RESOLVE = 8; // up, disambiguating partial distributed operations (import/export, ...rename?) + static const int STATE_RECONNECT = 9; // up, reconnect to clients + static const int STATE_REJOIN = 10; // up, replayed journal, rejoining distributed cache + static const int STATE_ACTIVE = 11; // up, active + static const int STATE_STOPPING = 12; // up, exporting metadata (-> standby or out) + static const int STATE_STOPPED = 13; // up, finished stopping. like standby, but not avail to takeover. static const char *get_state_name(int s) { switch (s) { @@ -51,6 +52,7 @@ class MDSMap { case STATE_OUT: return "down:out"; case STATE_FAILED: return "down:failed"; // up + case STATE_BOOT: return "up:boot"; case STATE_STANDBY: return "up:standby"; case STATE_CREATING: return "up:creating"; case STATE_STARTING: return "up:starting"; @@ -170,6 +172,7 @@ class MDSMap { bool is_out(int m) { return mds_state.count(m) && mds_state[m] == STATE_OUT; } bool is_failed(int m) { return mds_state.count(m) && mds_state[m] == STATE_FAILED; } + bool is_boot(int m) { return mds_state.count(m) && mds_state[m] == STATE_BOOT; } bool is_standby(int m) { return mds_state.count(m) && mds_state[m] == STATE_STANDBY; } bool is_creating(int m) { return mds_state.count(m) && mds_state[m] == STATE_CREATING; } bool is_starting(int m) { return mds_state.count(m) && mds_state[m] == STATE_STARTING; } @@ -226,7 +229,7 @@ class MDSMap { return false; } - int get_inst_rank(const entity_addr_t& addr) { + int get_addr_rank(const entity_addr_t& addr) { for (map::iterator p = mds_inst.begin(); p != mds_inst.end(); ++p) { diff --git a/branches/sage/pgs/messages/MClientMount.h b/branches/sage/pgs/messages/MClientMount.h index c3bc000911835..d083d72833830 100644 --- a/branches/sage/pgs/messages/MClientMount.h +++ b/branches/sage/pgs/messages/MClientMount.h @@ -19,12 +19,22 @@ class MClientMount : public Message { public: + entity_addr_t addr; + MClientMount() : Message(MSG_CLIENT_MOUNT) { } + MClientMount(entity_addr_t a) : + Message(MSG_CLIENT_MOUNT), + addr(a) { } char *get_type_name() { return "client_mount"; } - void decode_payload() { } - void encode_payload() { } + void decode_payload() { + int off = 0; + ::_decode(addr, payload, off); + } + void encode_payload() { + ::_encode(addr, payload); + } }; #endif diff --git a/branches/sage/pgs/messages/MClientUnmount.h b/branches/sage/pgs/messages/MClientUnmount.h index e8acc50f190e0..42fa07db7ba05 100644 --- a/branches/sage/pgs/messages/MClientUnmount.h +++ b/branches/sage/pgs/messages/MClientUnmount.h @@ -19,12 +19,22 @@ class MClientUnmount : public Message { public: + entity_inst_t inst; + MClientUnmount() : Message(MSG_CLIENT_UNMOUNT) { } - + MClientUnmount(entity_inst_t i) : + Message(MSG_CLIENT_UNMOUNT), + inst(i) { } + char *get_type_name() { return "client_unmount"; } - void decode_payload() { } - void encode_payload() { } + void decode_payload() { + int off = 0; + ::_decode(inst, payload, off); + } + void encode_payload() { + ::_encode(inst, payload); + } }; #endif diff --git a/branches/sage/pgs/messages/MMDSBeacon.h b/branches/sage/pgs/messages/MMDSBeacon.h index 4789c809572c4..d8b73a45a3122 100644 --- a/branches/sage/pgs/messages/MMDSBeacon.h +++ b/branches/sage/pgs/messages/MMDSBeacon.h @@ -22,33 +22,37 @@ #include "mds/MDSMap.h" class MMDSBeacon : public Message { + entity_inst_t inst; int state; version_t seq; public: MMDSBeacon() : Message(MSG_MDS_BEACON) {} - MMDSBeacon(int st, version_t se) : Message(MSG_MDS_BEACON), - state(st), seq(se) { } + MMDSBeacon(entity_inst_t i, int st, version_t se) : + Message(MSG_MDS_BEACON), + inst(i), state(st), seq(se) { } + entity_inst_t& get_mds_inst() { return inst; } int get_state() { return state; } version_t get_seq() { return seq; } char *get_type_name() { return "mdsbeacon"; } void print(ostream& out) { - out << "mdsbeacon(" << MDSMap::get_state_name(state) + out << "mdsbeacon(" << inst + << " " << MDSMap::get_state_name(state) << " seq " << seq << ")"; } void encode_payload() { - payload.append((char*)&state, sizeof(state)); - payload.append((char*)&seq, sizeof(seq)); + ::_encode(inst, payload); + ::_encode(state, payload); + ::_encode(seq, payload); } void decode_payload() { int off = 0; - payload.copy(off, sizeof(state), (char*)&state); - off += sizeof(state); - payload.copy(off, sizeof(seq), (char*)&seq); - off += sizeof(seq); + ::_decode(inst, payload, off); + ::_decode(state, payload, off); + ::_decode(seq, payload, off); } }; diff --git a/branches/sage/pgs/messages/MMonCommand.h b/branches/sage/pgs/messages/MMonCommand.h index d5fd8ae64017a..19d25dd7a4d77 100644 --- a/branches/sage/pgs/messages/MMonCommand.h +++ b/branches/sage/pgs/messages/MMonCommand.h @@ -22,9 +22,13 @@ using std::vector; class MMonCommand : public Message { public: + entity_inst_t inst; vector cmd; MMonCommand() : Message(MSG_MON_COMMAND) {} + MMonCommand(entity_inst_t i) : + Message(MSG_MON_COMMAND), + inst(i) { } virtual char *get_type_name() { return "mon_command"; } void print(ostream& o) { @@ -37,10 +41,12 @@ class MMonCommand : public Message { } void encode_payload() { + ::_encode(inst, payload); ::_encode(cmd, payload); } void decode_payload() { int off = 0; + ::_decode(inst, payload, off); ::_decode(cmd, payload, off); } }; diff --git a/branches/sage/pgs/messages/MMonElection.h b/branches/sage/pgs/messages/MMonElection.h new file mode 100644 index 0000000000000..14a29af9140f9 --- /dev/null +++ b/branches/sage/pgs/messages/MMonElection.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef __MMONELECTION_H +#define __MMONELECTION_H + +#include "msg/Message.h" + + +class MMonElection : public Message { +public: + static const int OP_PROPOSE = 1; + static const int OP_ACK = 2; + static const int OP_NAK = 3; + static const int OP_VICTORY = 4; + static const char *get_opname(int o) { + switch (o) { + case OP_PROPOSE: return "propose"; + case OP_ACK: return "ack"; + case OP_NAK: return "nak"; + case OP_VICTORY: return "victory"; + default: assert(0); return 0; + } + } + + int32_t op; + epoch_t epoch; + + MMonElection() : Message(MSG_MON_ELECTION) {} + MMonElection(int o, epoch_t e) : + Message(MSG_MON_ELECTION), + op(o), epoch(e) {} + + char *get_type_name() { return "election"; } + void print(ostream& out) { + out << "election(" << get_opname(op) << " " << epoch << ")"; + } + + void encode_payload() { + ::_encode(op, payload); + ::_encode(epoch, payload); + } + void decode_payload() { + int off = 0; + ::_decode(op, payload, off); + ::_decode(epoch, payload, off); + } + +}; + +#endif diff --git a/branches/sage/pgs/messages/MMonElectionAck.h b/branches/sage/pgs/messages/MMonElectionAck.h deleted file mode 100644 index 14f8b7cb49b7b..0000000000000 --- a/branches/sage/pgs/messages/MMonElectionAck.h +++ /dev/null @@ -1,32 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __MMONELECTIONACK_H -#define __MMONELECTIONACK_H - -#include "msg/Message.h" - - -class MMonElectionAck : public Message { - public: - MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {} - - virtual char *get_type_name() { return "election_ack"; } - - void encode_payload() {} - void decode_payload() {} -}; - -#endif diff --git a/branches/sage/pgs/messages/MMonElectionPropose.h b/branches/sage/pgs/messages/MMonElectionPropose.h deleted file mode 100644 index 7ec54b67332b9..0000000000000 --- a/branches/sage/pgs/messages/MMonElectionPropose.h +++ /dev/null @@ -1,33 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __MMONELECTIONPROPOSE_H -#define __MMONELECTIONPROPOSE_H - -#include "msg/Message.h" - - -class MMonElectionPropose : public Message { - public: - MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {} - - virtual char *get_type_name() { return "election_propose"; } - - void encode_payload() {} - void decode_payload() {} - -}; - -#endif diff --git a/branches/sage/pgs/messages/MMonElectionVictory.h b/branches/sage/pgs/messages/MMonElectionVictory.h deleted file mode 100644 index 47211ae501db5..0000000000000 --- a/branches/sage/pgs/messages/MMonElectionVictory.h +++ /dev/null @@ -1,41 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef __MMONELECTIONVICTORY_H -#define __MMONELECTIONVICTORY_H - -#include "msg/Message.h" - - -class MMonElectionVictory : public Message { - public: - //set active_set; - - MMonElectionVictory(/*set& as*/) : Message(MSG_MON_ELECTION_VICTORY)//, - //active_set(as) - {} - - virtual char *get_type_name() { return "election_victory"; } - - void encode_payload() { - //::_encode(active_set, payload); - } - void decode_payload() { - //int off = 0; - //::_decode(active_set, payload, off); - } -}; - -#endif diff --git a/branches/sage/pgs/messages/MMonPaxos.h b/branches/sage/pgs/messages/MMonPaxos.h index 40cdadc81c57d..7210b179c9a42 100644 --- a/branches/sage/pgs/messages/MMonPaxos.h +++ b/branches/sage/pgs/messages/MMonPaxos.h @@ -17,15 +17,18 @@ #define __MMONPAXOS_H #include "msg/Message.h" +#include "mon/mon_types.h" class MMonPaxos : public Message { public: // op types - const static int OP_COLLECT = 1; // proposer: propose round - const static int OP_LAST = 2; // voter: accept proposed round - const static int OP_BEGIN = 4; // proposer: value proposed for this round - const static int OP_ACCEPT = 5; // voter: accept propsed value - const static int OP_COMMIT = 7; // proposer: notify learners of agreed value + const static int OP_COLLECT = 1; // proposer: propose round + const static int OP_LAST = 2; // voter: accept proposed round + const static int OP_BEGIN = 3; // proposer: value proposed for this round + const static int OP_ACCEPT = 4; // voter: accept propsed value + const static int OP_COMMIT = 5; // proposer: notify learners of agreed value + const static int OP_LEASE = 6; // leader: extend peon lease + const static int OP_LEASE_ACK = 7; // peon: lease ack const static char *get_opname(int op) { switch (op) { case OP_COLLECT: return "collect"; @@ -33,52 +36,61 @@ class MMonPaxos : public Message { case OP_BEGIN: return "begin"; case OP_ACCEPT: return "accept"; case OP_COMMIT: return "commit"; + case OP_LEASE: return "lease"; + case OP_LEASE_ACK: return "lease_ack"; default: assert(0); return 0; } } - // which state machine? - int op; - int machine_id; - + epoch_t epoch; // monitor epoch + int op; // paxos op + int machine_id; // which state machine? + version_t last_committed; // i've committed to version_t pn_from; // i promise to accept after version_t pn; // with with proposal - version_t old_accepted_pn; // previous pn, if we are a LAST with an uncommitted value + version_t uncommitted_pn; // previous pn, if we are a LAST with an uncommitted value + utime_t lease_expire; map values; MMonPaxos() : Message(MSG_MON_PAXOS) {} - MMonPaxos(int o, int mid) : Message(MSG_MON_PAXOS), - op(o), machine_id(mid), - last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { } + MMonPaxos(epoch_t e, int o, int mid) : + Message(MSG_MON_PAXOS), + epoch(e), + op(o), machine_id(mid), + last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { } virtual char *get_type_name() { return "paxos"; } void print(ostream& out) { - out << "paxos(m" << machine_id + out << "paxos(" << get_paxos_name(machine_id) << " " << get_opname(op) << " lc " << last_committed - << " pn " << pn << " opn " << old_accepted_pn + << " pn " << pn << " opn " << uncommitted_pn << ")"; } void encode_payload() { + ::_encode(epoch, payload); ::_encode(op, payload); ::_encode(machine_id, payload); ::_encode(last_committed, payload); ::_encode(pn_from, payload); ::_encode(pn, payload); - ::_encode(old_accepted_pn, payload); + ::_encode(uncommitted_pn, payload); + ::_encode(lease_expire, payload); ::_encode(values, payload); } void decode_payload() { int off = 0; + ::_decode(epoch, payload, off); ::_decode(op, payload, off); ::_decode(machine_id, payload, off); ::_decode(last_committed, payload, off); ::_decode(pn_from, payload, off); ::_decode(pn, payload, off); - ::_decode(old_accepted_pn, payload, off); + ::_decode(uncommitted_pn, payload, off); + ::_decode(lease_expire, payload, off); ::_decode(values, payload, off); } }; diff --git a/branches/sage/pgs/messages/MOSDBoot.h b/branches/sage/pgs/messages/MOSDBoot.h index 9ae5a68d6c59a..00c94ad1a2a80 100644 --- a/branches/sage/pgs/messages/MOSDBoot.h +++ b/branches/sage/pgs/messages/MOSDBoot.h @@ -22,23 +22,29 @@ class MOSDBoot : public Message { public: + entity_inst_t inst; OSDSuperblock sb; MOSDBoot() {} - MOSDBoot(OSDSuperblock& s) : + MOSDBoot(entity_inst_t i, OSDSuperblock& s) : Message(MSG_OSD_BOOT), + inst(i), sb(s) { } - char *get_type_name() { return "oboot"; } + char *get_type_name() { return "osd_boot"; } + void print(ostream& out) { + out << "osd_boot(" << inst << ")"; + } void encode_payload() { - payload.append((char*)&sb, sizeof(sb)); + ::_encode(inst, payload); + ::_encode(sb, payload); } void decode_payload() { int off = 0; - payload.copy(off, sizeof(sb), (char*)&sb); - off += sizeof(sb); + ::_decode(inst, payload, off); + ::_decode(sb, payload, off); } }; diff --git a/branches/sage/pgs/messages/MOSDFailure.h b/branches/sage/pgs/messages/MOSDFailure.h index 5bc7a5d5ee9f6..965d622a5f5e2 100644 --- a/branches/sage/pgs/messages/MOSDFailure.h +++ b/branches/sage/pgs/messages/MOSDFailure.h @@ -21,30 +21,35 @@ class MOSDFailure : public Message { public: + entity_inst_t from; entity_inst_t failed; epoch_t epoch; MOSDFailure() {} - MOSDFailure(entity_inst_t f, epoch_t e) : + MOSDFailure(entity_inst_t fr, entity_inst_t f, epoch_t e) : Message(MSG_OSD_FAILURE), - failed(f), epoch(e) {} + from(fr), failed(f), epoch(e) {} + entity_inst_t get_from() { return from; } entity_inst_t get_failed() { return failed; } epoch_t get_epoch() { return epoch; } void decode_payload() { int off = 0; - payload.copy(off, sizeof(failed), (char*)&failed); - off += sizeof(failed); - payload.copy(off, sizeof(epoch), (char*)&epoch); - off += sizeof(epoch); + ::_decode(from, payload, off); + ::_decode(failed, payload, off); + ::_decode(epoch, payload, off); } void encode_payload() { - payload.append((char*)&failed, sizeof(failed)); - payload.append((char*)&epoch, sizeof(epoch)); + ::_encode(from, payload); + ::_encode(failed, payload); + ::_encode(epoch, payload); } - virtual char *get_type_name() { return "osdfail"; } + virtual char *get_type_name() { return "osd_failure"; } + void print(ostream& out) { + out << "osd_failure(" << failed << " e" << epoch << ")"; + } }; #endif diff --git a/branches/sage/pgs/messages/MOSDGetMap.h b/branches/sage/pgs/messages/MOSDGetMap.h index 5158ce7d3ed83..68e1b7d137dae 100644 --- a/branches/sage/pgs/messages/MOSDGetMap.h +++ b/branches/sage/pgs/messages/MOSDGetMap.h @@ -23,7 +23,6 @@ class MOSDGetMap : public Message { public: epoch_t since; - //MOSDGetMap() : since(0) {} MOSDGetMap(epoch_t s=0) : Message(MSG_OSD_GETMAP), since(s) { @@ -31,7 +30,10 @@ class MOSDGetMap : public Message { epoch_t get_since() { return since; } - char *get_type_name() { return "getomap"; } + char *get_type_name() { return "get_osd_map"; } + void print(ostream& out) { + out << "get_osd_map(since " << since << ")"; + } void encode_payload() { payload.append((char*)&since, sizeof(since)); diff --git a/branches/sage/pgs/msg/FakeMessenger.cc b/branches/sage/pgs/msg/FakeMessenger.cc index fa8ca2d05c455..76a7576ee6066 100644 --- a/branches/sage/pgs/msg/FakeMessenger.cc +++ b/branches/sage/pgs/msg/FakeMessenger.cc @@ -71,17 +71,17 @@ void *fakemessenger_thread(void *ptr) { lock.Lock(); while (1) { + if (fm_shutdown) break; + fakemessenger_do_loop_2(); + + if (directory.empty()) break; + dout(20) << "thread waiting" << endl; if (fm_shutdown) break; awake = false; cond.Wait(lock); awake = true; dout(20) << "thread woke up" << endl; - if (fm_shutdown) break; - - fakemessenger_do_loop_2(); - - if (directory.empty()) break; } lock.Unlock(); @@ -185,7 +185,7 @@ int fakemessenger_do_loop_2() } } - // deal with shutdowns.. dleayed to avoid concurrent directory modification + // deal with shutdowns.. delayed to avoid concurrent directory modification if (!shutdown_set.empty()) { for (set::iterator it = shutdown_set.begin(); it != shutdown_set.end(); @@ -311,7 +311,8 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr #endif // queue - if (directory.count(inst.addr)) { + if (directory.count(inst.addr) && + shutdown_set.count(inst.addr) == 0) { dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << endl; directory[inst.addr]->queue_incoming(m); } else { diff --git a/branches/sage/pgs/msg/Message.cc b/branches/sage/pgs/msg/Message.cc index 39510c47b9bd3..cfcde208867f0 100644 --- a/branches/sage/pgs/msg/Message.cc +++ b/branches/sage/pgs/msg/Message.cc @@ -15,9 +15,7 @@ using namespace std; #include "messages/MMonCommandAck.h" #include "messages/MMonPaxos.h" -#include "messages/MMonElectionAck.h" -#include "messages/MMonElectionPropose.h" -#include "messages/MMonElectionVictory.h" +#include "messages/MMonElection.h" #include "messages/MPing.h" #include "messages/MPingAck.h" @@ -122,14 +120,8 @@ decode_message(msg_envelope_t& env, bufferlist& payload) m = new MMonPaxos; break; - case MSG_MON_ELECTION_PROPOSE: - m = new MMonElectionPropose; - break; - case MSG_MON_ELECTION_ACK: - m = new MMonElectionAck; - break; - case MSG_MON_ELECTION_VICTORY: - m = new MMonElectionVictory; + case MSG_MON_ELECTION: + m = new MMonElection; break; case MSG_PING: diff --git a/branches/sage/pgs/msg/Message.h b/branches/sage/pgs/msg/Message.h index c7e76297553ed..8d43684e66e77 100644 --- a/branches/sage/pgs/msg/Message.h +++ b/branches/sage/pgs/msg/Message.h @@ -26,9 +26,7 @@ #define MSG_MON_COMMAND_ACK 14 -#define MSG_MON_ELECTION_ACK 15 -#define MSG_MON_ELECTION_PROPOSE 16 -#define MSG_MON_ELECTION_VICTORY 17 +#define MSG_MON_ELECTION 15 #define MSG_MON_OSDMAP_INFO 20 #define MSG_MON_OSDMAP_LEASE 21 diff --git a/branches/sage/pgs/osd/OSD.cc b/branches/sage/pgs/osd/OSD.cc index 4f9d001a3c459..355855e2afc91 100644 --- a/branches/sage/pgs/osd/OSD.cc +++ b/branches/sage/pgs/osd/OSD.cc @@ -281,7 +281,7 @@ int OSD::init() // announce to monitor i exist and have booted. int mon = monmap->pick_mon(); - messenger->send_message(new MOSDBoot(superblock), monmap->get_inst(mon)); + messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock), monmap->get_inst(mon)); // start the heart timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this)); @@ -898,7 +898,7 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst) << ", dropping and reporting to mon" << mon << " " << *m << dendl; - messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()), + messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()), monmap->get_inst(mon)); delete m; } else if (dest.is_mon()) { @@ -1166,7 +1166,6 @@ void OSD::advance_map(ObjectStore::Transaction& t) int maxraid = g_conf.osd_max_raid_width; dout(1) << "mkfs " << minrep << ".." << maxrep << " replicas, " << minraid << ".." << maxraid << " osd raid groups" << dendl; - assert(osdmap->get_epoch() == 1); //cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << dendl; logger->set_start( osdmap->get_ctime() ); @@ -1490,7 +1489,7 @@ void OSD::get_map(epoch_t epoch, OSDMap &m) incs.push_front(inc); } } - assert(e > 0); + assert(e >= 0); // apply incrementals for (e++; e <= epoch; e++) { diff --git a/branches/sage/pgs/osd/OSDMap.h b/branches/sage/pgs/osd/OSDMap.h index f8c5b3ebdd036..6086ea09daae0 100644 --- a/branches/sage/pgs/osd/OSDMap.h +++ b/branches/sage/pgs/osd/OSDMap.h @@ -87,6 +87,11 @@ public: epoch_t epoch; // new epoch; we are a diff from epoch-1 to epoch epoch_t mon_epoch; // monitor epoch (election iteration) utime_t ctime; + + // full (rare) + bufferlist fullmap; // in leiu of below. + + // incremental map new_up; map new_down; list new_in; @@ -103,6 +108,7 @@ public: ::_encode(new_in, bl); ::_encode(new_out, bl); ::_encode(new_overload, bl); + ::_encode(fullmap, bl); } void decode(bufferlist& bl, int& off) { bl.copy(off, sizeof(epoch), (char*)&epoch); @@ -116,6 +122,7 @@ public: ::_decode(new_in, bl, off); ::_decode(new_out, bl, off); ::_decode(new_overload, bl, off); + ::_decode(fullmap, bl, off); } Incremental(epoch_t e=0) : epoch(e), mon_epoch(0) {} @@ -164,8 +171,8 @@ private: const utime_t& get_ctime() const { return ctime; } - bool is_mkfs() const { return epoch == 1; } - //void set_mkfs() { assert(epoch == 1); } + bool is_mkfs() const { return epoch == 2; } + bool post_mkfs() const { return epoch > 2; } /***** cluster state *****/ int num_osds() { return osds.size(); } @@ -176,11 +183,15 @@ private: const set& get_out_osds() { return out_osds; } const map& get_overload_osds() { return overload_osds; } + bool exists(int osd) { return osds.count(osd); } bool is_down(int osd) { return down_osds.count(osd); } - bool is_up(int osd) { return !is_down(osd); } + bool is_up(int osd) { return exists(osd) && !is_down(osd); } bool is_out(int osd) { return out_osds.count(osd); } - bool is_in(int osd) { return !is_out(osd); } + bool is_in(int osd) { return exists(osd) && !is_out(osd); } + bool have_inst(int osd) { + return osd_inst.count(osd); + } const entity_inst_t& get_inst(int osd) { assert(osd_inst.count(osd)); return osd_inst[osd]; @@ -205,15 +216,13 @@ private: mon_epoch = inc.mon_epoch; ctime = inc.ctime; - for (map::iterator i = inc.new_up.begin(); - i != inc.new_up.end(); - i++) { - assert(down_osds.count(i->first)); - down_osds.erase(i->first); - assert(osd_inst.count(i->first) == 0); - osd_inst[i->first] = i->second; - //cout << "epoch " << epoch << " up osd" << i->first << endl; + // full map? + if (inc.fullmap.length()) { + decode(inc.fullmap); + return; } + + // nope, incremental. for (map::iterator i = inc.new_down.begin(); i != inc.new_down.end(); i++) { @@ -224,13 +233,6 @@ private: osd_inst.erase(i->first); //cout << "epoch " << epoch << " down osd" << i->first << endl; } - for (list::iterator i = inc.new_in.begin(); - i != inc.new_in.end(); - i++) { - assert(out_osds.count(*i)); - out_osds.erase(*i); - //cout << "epoch " << epoch << " in osd" << *i << endl; - } for (list::iterator i = inc.new_out.begin(); i != inc.new_out.end(); i++) { @@ -238,17 +240,34 @@ private: out_osds.insert(*i); //cout << "epoch " << epoch << " out osd" << *i << endl; } - for (map::iterator i = inc.new_overload.begin(); - i != inc.new_overload.end(); - i++) { - overload_osds[i->first] = i->second; - } for (list::iterator i = inc.old_overload.begin(); i != inc.old_overload.end(); i++) { assert(overload_osds.count(*i)); overload_osds.erase(*i); } + + for (map::iterator i = inc.new_up.begin(); + i != inc.new_up.end(); + i++) { + assert(down_osds.count(i->first)); + down_osds.erase(i->first); + assert(osd_inst.count(i->first) == 0); + osd_inst[i->first] = i->second; + //cout << "epoch " << epoch << " up osd" << i->first << endl; + } + for (list::iterator i = inc.new_in.begin(); + i != inc.new_in.end(); + i++) { + assert(out_osds.count(*i)); + out_osds.erase(*i); + //cout << "epoch " << epoch << " in osd" << *i << endl; + } + for (map::iterator i = inc.new_overload.begin(); + i != inc.new_overload.end(); + i++) { + overload_osds[i->first] = i->second; + } } // serialize, unserialize diff --git a/branches/sage/pgs/osd/ObjectStore.h b/branches/sage/pgs/osd/ObjectStore.h index d97652b548778..b21cbd26cffc2 100644 --- a/branches/sage/pgs/osd/ObjectStore.h +++ b/branches/sage/pgs/osd/ObjectStore.h @@ -102,14 +102,29 @@ public: list offsets; list lengths; list attrnames; + list attrnames2; //list< pair > attrvals; list attrbls; + // for reads only (not encoded) list pbls; list psts; list< pair > pattrvals; list< map* > pattrsets; + const char *get_attrname() { + if (attrnames.empty()) + return attrnames2.front().c_str(); + else + return attrnames.front(); + } + void pop_attrname() { + if (attrnames.empty()) + attrnames2.pop_front(); + else + attrnames.pop_front(); + } + void read(object_t oid, off_t off, size_t len, bufferlist *pbl) { int op = OP_READ; ops.push_back(op); @@ -232,6 +247,27 @@ public: } // etc. + + void _encode(bufferlist& bl) { + ::_encode(ops, bl); + ::_encode(bls, bl); + ::_encode(oids, bl); + ::_encode(cids, bl); + ::_encode(offsets, bl); + ::_encode(lengths, bl); + ::_encode(attrnames, bl); + ::_encode(attrbls, bl); + } + void _decode(bufferlist& bl, int& off) { + ::_decode(ops, bl, off); + ::_decode(bls, bl, off); + ::_decode(oids, bl, off); + ::_decode(cids, bl, off); + ::_decode(offsets, bl, off); + ::_decode(lengths, bl, off); + ::_decode(attrnames2, bl, off); + ::_decode(attrbls, bl, off); + } }; @@ -264,7 +300,7 @@ public: case Transaction::OP_GETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); pair pattrval = t.pattrvals.front(); t.pattrvals.pop_front(); *pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second); } @@ -314,7 +350,7 @@ public: case Transaction::OP_SETATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -333,7 +369,7 @@ public: case Transaction::OP_RMATTR: { object_t oid = t.oids.front(); t.oids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); rmattr(oid, attrname, 0); } break; @@ -379,7 +415,7 @@ public: case Transaction::OP_COLL_SETATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); //pair attrval = t.attrvals.front(); t.attrvals.pop_front(); bufferlist bl; bl.claim( t.attrbls.front() ); @@ -391,7 +427,7 @@ public: case Transaction::OP_COLL_RMATTR: { coll_t cid = t.cids.front(); t.cids.pop_front(); - const char *attrname = t.attrnames.front(); t.attrnames.pop_front(); + const char *attrname = t.get_attrname(); t.pop_attrname(); collection_rmattr(cid, attrname, 0); } break; diff --git a/branches/sage/pgs/osd/PG.cc b/branches/sage/pgs/osd/PG.cc index 2592bd9ca69f8..c2d1290102e8b 100644 --- a/branches/sage/pgs/osd/PG.cc +++ b/branches/sage/pgs/osd/PG.cc @@ -650,7 +650,7 @@ void PG::peer(ObjectStore::Transaction& t, } else { dout(10) << " still active from last started: " << last_started << dendl; } - } else if (osd->osdmap->get_epoch() > 1) { + } else if (osd->osdmap->post_mkfs()) { dout(10) << " crashed since epoch " << last_epoch_started_any << dendl; state_set(STATE_CRASHED); } @@ -876,7 +876,7 @@ void PG::activate(ObjectStore::Transaction& t) // if primary.. if (role == 0 && - osd->osdmap->get_epoch() > 1) { + osd->osdmap->post_mkfs()) { // who is clean? clean_set.clear(); if (info.is_clean()) diff --git a/branches/sage/pgs/osdc/Objecter.cc b/branches/sage/pgs/osdc/Objecter.cc index 6039c90327140..64d2374b5bd99 100644 --- a/branches/sage/pgs/osdc/Objecter.cc +++ b/branches/sage/pgs/osdc/Objecter.cc @@ -841,7 +841,7 @@ void Objecter::ms_handle_failure(Message *m, entity_name_t dest, const entity_in dout(0) << "ms_handle_failure " << dest << " inst " << inst << ", dropping and reporting to mon" << mon << endl; - messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()), + messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()), monmap->get_inst(mon)); delete m; } else { -- 2.39.5