From 8e350e1ec332eddc2c03a00968e27465ddb58848 Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 13 Jul 2007 17:50:22 +0000 Subject: [PATCH] * merged 1447:1497 from trunk into branches/sage/cephmds2 git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1499 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/TODO | 17 +- branches/sage/cephmds2/client/Client.cc | 50 +- branches/sage/cephmds2/client/Client.h | 18 + branches/sage/cephmds2/cmonctl.cc | 12 +- branches/sage/cephmds2/config.cc | 22 +- branches/sage/cephmds2/config.h | 6 +- branches/sage/cephmds2/cosd.cc | 10 +- branches/sage/cephmds2/doc/Commitdir.txt | 2 + branches/sage/cephmds2/doc/caching.txt | 1 + branches/sage/cephmds2/ebofs/Ebofs.cc | 13 +- branches/sage/cephmds2/ebofs/Ebofs.h | 6 +- branches/sage/cephmds2/ebofs/FileJournal.cc | 3 +- branches/sage/cephmds2/fakesyn.cc | 6 + branches/sage/cephmds2/include/buffer.h | 66 +- branches/sage/cephmds2/include/types.h | 2 +- branches/sage/cephmds2/mds/MDBalancer.cc | 2 +- branches/sage/cephmds2/mds/MDCache.cc | 2 +- branches/sage/cephmds2/mds/MDS.cc | 20 +- branches/sage/cephmds2/mds/MDS.h | 1 - branches/sage/cephmds2/mds/MDSMap.h | 199 ++-- .../sage/cephmds2/messages/MClientMount.h | 14 +- .../sage/cephmds2/messages/MClientUnmount.h | 16 +- branches/sage/cephmds2/messages/MMDSBeacon.h | 22 +- branches/sage/cephmds2/messages/MMonCommand.h | 6 + branches/sage/cephmds2/messages/MMonPaxos.h | 10 +- branches/sage/cephmds2/messages/MOSDBoot.h | 16 +- branches/sage/cephmds2/messages/MOSDFailure.h | 23 +- branches/sage/cephmds2/messages/MOSDGetMap.h | 6 +- branches/sage/cephmds2/messages/MOSDMap.h | 6 +- branches/sage/cephmds2/mon/ClientMonitor.cc | 153 ++- branches/sage/cephmds2/mon/ClientMonitor.h | 48 +- branches/sage/cephmds2/mon/Elector.cc | 25 +- branches/sage/cephmds2/mon/MDSMonitor.cc | 604 +++++++----- branches/sage/cephmds2/mon/MDSMonitor.h | 100 +- branches/sage/cephmds2/mon/MonMap.h | 5 +- branches/sage/cephmds2/mon/Monitor.cc | 487 +++++----- branches/sage/cephmds2/mon/MonitorStore.cc | 1 + branches/sage/cephmds2/mon/OSDMonitor.cc | 910 ++++++++---------- branches/sage/cephmds2/mon/OSDMonitor.h | 113 ++- branches/sage/cephmds2/mon/Paxos.cc | 194 ++-- branches/sage/cephmds2/mon/Paxos.h | 34 +- branches/sage/cephmds2/mon/PaxosService.cc | 87 +- branches/sage/cephmds2/mon/PaxosService.h | 55 +- branches/sage/cephmds2/msg/FakeMessenger.cc | 5 +- branches/sage/cephmds2/newsyn.cc | 63 +- branches/sage/cephmds2/osd/FakeStore.cc | 2 +- branches/sage/cephmds2/osd/FakeStore.h | 2 +- branches/sage/cephmds2/osd/OSD.cc | 14 +- branches/sage/cephmds2/osd/OSDMap.h | 82 +- branches/sage/cephmds2/osd/ObjectStore.h | 10 +- branches/sage/cephmds2/osd/PG.cc | 4 +- branches/sage/cephmds2/osdc/Objecter.cc | 2 +- 52 files changed, 1948 insertions(+), 1629 deletions(-) diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 58b14ac21b9da..1be506037d234 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -1,5 +1,11 @@ +- change same_inst_since to align with "in" set +- tag MClientRequest with mdsmap v +- push new mdsmap to clients on send_message_client, based on the tag? + - hrm, what about exports and stale caps wonkiness... there's a race with the REAP. hmm. + + some smallish projects: - crush rewrite in C @@ -36,6 +42,10 @@ general kernel planning - soft consistency on lookup? - accurate reconstruction of (syscall) path? +software raid layer for EBOFS? +- actually, we just need software raid _awareness_ in the allocator, so + that we can write only full stripes, without fear of clobbering things on + failure. then use MD or similar layer provided by kernel. sage doc @@ -148,15 +158,11 @@ sage mds osdmon -- distribute w/ paxos framework - allow fresh replacement osds. add osd_created in osdmap, probably - monitor needs to monitor some osds... - monitor pg states, notify on out? - watch osd utilization; adjust overload in cluster map -mdsmon -- distribute w/ paxos framework - journaler - fix up for large events (e.g. imports) - use set_floor_and_read for safe takeover from possibly-not-quite-dead otherguy. @@ -229,6 +235,9 @@ osd/rados - pg_bit/pg_num changes - report crashed pgs? +messenger +- fix messenger shutdown.. we shouldn't delete messenger, since the caller may be referencing it, etc. + simplemessenger - close idle connections - buffer sent messages until a receive is acknowledged (handshake!) diff --git a/branches/sage/cephmds2/client/Client.cc b/branches/sage/cephmds2/client/Client.cc index 12c30ecf2368e..e28e71a3047ed 100644 --- a/branches/sage/cephmds2/client/Client.cc +++ b/branches/sage/cephmds2/client/Client.cc @@ -95,13 +95,14 @@ public: // cons/des -Client::Client(Messenger *m, MonMap *mm) +Client::Client(Messenger *m, MonMap *mm) : timer(client_lock) { // which client am i? whoami = m->get_myname().num(); monmap = mm; mounted = false; + mount_timeout_event = 0; unmounting = false; last_tid = 0; @@ -512,11 +513,12 @@ int Client::choose_target_mds(MClientRequest *req) // pick mds if (!diri || g_conf.client_use_random_mds) { // no root info, pick a random MDS - mds = rand() % mdsmap->get_num_mds(); // FIXME: this isn't really correct. + mds = mdsmap->get_random_in_mds(); + if (mds < 0) mds = 0; if (0) { - dout(0) << "hack: sending all requests to mds0" << endl; - mds = 0; + mds = 1; + dout(0) << "hack: sending all requests to mds" << mds << endl; } } else { if (req->auth_is_best()) { @@ -893,10 +895,12 @@ void Client::handle_mds_map(MMDSMap* m) messenger->reset_myname(m->get_dest()); mount_cond.Signal(); // mount might be waiting for this. - } + } dout(1) << "handle_mds_map epoch " << m->get_epoch() << endl; + epoch_t was = mdsmap->get_epoch(); mdsmap->decode(m->get_encoded()); + assert(mdsmap->get_epoch() >= was); // send reconnect? if (frommds >= 0 && @@ -1264,7 +1268,28 @@ void Client::update_caps_wanted(Inode *in) // ------------------- -// fs ops +// MOUNT + +void Client::_try_mount() +{ + dout(10) << "_try_mount" << endl; + int mon = monmap->pick_mon(); + dout(2) << "sending client_mount to mon" << mon << endl; + messenger->send_message(new MClientMount(messenger->get_myaddr()), + monmap->get_inst(mon)); + + // schedule timeout + assert(mount_timeout_event == 0); + mount_timeout_event = new C_MountTimeout(this); + timer.add_event_after(g_conf.client_mount_timeout, mount_timeout_event); +} + +void Client::_mount_timeout() +{ + dout(10) << "_mount_timeout" << endl; + mount_timeout_event = 0; + _try_mount(); +} int Client::mount() { @@ -1272,14 +1297,15 @@ int Client::mount() assert(!mounted); // caller is confused? assert(!mdsmap); - int mon = monmap->pick_mon(); - dout(2) << "sending client_mount to mon" << mon << endl; - messenger->send_message(new MClientMount, monmap->get_inst(mon)); + _try_mount(); while (!mdsmap || !osdmap || osdmap->get_epoch() == 0) mount_cond.Wait(client_lock); + + timer.cancel_event(mount_timeout_event); + mount_timeout_event = 0; mounted = true; @@ -1303,6 +1329,9 @@ int Client::mount() } +// UNMOUNT + + int Client::unmount() { client_lock.Lock(); @@ -1372,7 +1401,8 @@ int Client::unmount() // send unmount! int mon = monmap->pick_mon(); dout(2) << "sending client_unmount to mon" << mon << endl; - messenger->send_message(new MClientUnmount, monmap->get_inst(mon)); + messenger->send_message(new MClientUnmount(messenger->get_myinst()), + monmap->get_inst(mon)); while (mounted) mount_cond.Wait(client_lock); diff --git a/branches/sage/cephmds2/client/Client.h b/branches/sage/cephmds2/client/Client.h index 02601b21e7b2e..6a4521a7f6179 100644 --- a/branches/sage/cephmds2/client/Client.h +++ b/branches/sage/cephmds2/client/Client.h @@ -34,9 +34,11 @@ #include "include/interval_set.h" #include "common/Mutex.h" +#include "common/Timer.h" #include "FileCache.h" + // stl #include #include @@ -328,6 +330,7 @@ class Client : public Dispatcher { MDSMap *mdsmap; OSDMap *osdmap; + SafeTimer timer; protected: Messenger *messenger; @@ -580,6 +583,21 @@ protected: // ---------------------- // fs ops. +private: + void _try_mount(); + void _mount_timeout(); + Context *mount_timeout_event; + + class C_MountTimeout : public Context { + Client *client; + public: + C_MountTimeout(Client *c) : client(c) { } + void finish(int r) { + if (r >= 0) client->_mount_timeout(); + } + }; + +public: int mount(); int unmount(); diff --git a/branches/sage/cephmds2/cmonctl.cc b/branches/sage/cephmds2/cmonctl.cc index 19148942acc76..34bd80f9a4d8f 100644 --- a/branches/sage/cephmds2/cmonctl.cc +++ b/branches/sage/cephmds2/cmonctl.cc @@ -64,8 +64,13 @@ int main(int argc, char **argv, char *envp[]) { int r = monmap.read(".ceph_monmap"); assert(r >= 0); + // start up network + rank.start_rank(); + messenger = rank.register_entity(entity_name_t(entity_name_t::TYPE_ADMIN)); + messenger->set_dispatcher(&dispatcher); + // build command - MMonCommand *m = new MMonCommand; + MMonCommand *m = new MMonCommand(messenger->get_myinst()); string cmd; for (unsigned i=0; iset_dispatcher(&dispatcher); - // send it messenger->send_message(m, monmap.get_inst(mon)); diff --git a/branches/sage/cephmds2/config.cc b/branches/sage/cephmds2/config.cc index 71a0940a20a29..9094625bd2848 100644 --- a/branches/sage/cephmds2/config.cc +++ b/branches/sage/cephmds2/config.cc @@ -96,6 +96,7 @@ md_config_t g_conf = { debug_ns: 0, debug_ms: 0, debug_mon: 0, + debug_paxos: 0, debug_after: 0, @@ -104,7 +105,7 @@ md_config_t g_conf = { // --- clock --- clock_lock: false, - clock_tare: true, + clock_tare: false, // --- messenger --- ms_single_dispatch: false, @@ -128,11 +129,13 @@ md_config_t g_conf = { // --- mon --- mon_tick_interval: 5, mon_osd_down_out_interval: 5, // seconds - mon_lease: 5, // seconds - mon_lease_renew_interval: 3, - mon_lease_ack_timeout: 10.0, - mon_accept_timeout: 10.0, - mon_stop_with_last_mds: true, + mon_lease: 5, // seconds // lease interval + mon_lease_renew_interval: 3, // on leader, to renew the lease + mon_lease_ack_timeout: 10.0, // on leader, if lease isn't acked by all peons + mon_lease_timeout: 10.0, // on peon, if lease isn't extended + mon_accept_timeout: 10.0, // on leader, if paxos update isn't accepted + mon_stop_on_last_unmount: false, + mon_stop_with_last_mds: false, // --- client --- client_cache_size: 300, @@ -148,6 +151,8 @@ md_config_t g_conf = { client_oc_max_dirty: 1024*1024* 5, // MB * n client_oc_max_sync_write: 128*1024, // writes >= this use wrlock + client_mount_timeout: 10.0, // retry every N seconds + client_trace: 0, fuse_direct_io: 0, @@ -199,7 +204,6 @@ md_config_t g_conf = { mds_trim_on_rejoin: true, mds_commit_on_shutdown: true, mds_shutdown_check: 0, //30, - mds_shutdown_on_last_unmount: true, mds_verify_export_dirauth: true, @@ -614,8 +618,6 @@ void parse_config_options(std::vector& args) g_conf.mds_commit_on_shutdown = atoi(args[++i]); else if (strcmp(args[i], "--mds_shutdown_check") == 0) g_conf.mds_shutdown_check = atoi(args[++i]); - else if (strcmp(args[i], "--mds_shutdown_on_last_unmount") == 0) - g_conf.mds_shutdown_on_last_unmount = atoi(args[++i]); else if (strcmp(args[i], "--mds_log_flush_on_shutdown") == 0) g_conf.mds_log_flush_on_shutdown = atoi(args[++i]); @@ -677,6 +679,8 @@ void parse_config_options(std::vector& args) else if (strcmp(args[i], "--mon_osd_down_out_interval") == 0) g_conf.mon_osd_down_out_interval = atoi(args[++i]); + else if (strcmp(args[i], "--mon_stop_on_last_unmount") == 0) + g_conf.mon_stop_on_last_unmount = atoi(args[++i]); else if (strcmp(args[i], "--mon_stop_with_last_mds") == 0) g_conf.mon_stop_with_last_mds = atoi(args[++i]); diff --git a/branches/sage/cephmds2/config.h b/branches/sage/cephmds2/config.h index a77ba355e5e6a..8cb902df4ea90 100644 --- a/branches/sage/cephmds2/config.h +++ b/branches/sage/cephmds2/config.h @@ -81,6 +81,7 @@ struct md_config_t { int debug_ns; int debug_ms; int debug_mon; + int debug_paxos; int debug_after; @@ -116,7 +117,9 @@ struct md_config_t { float mon_lease; float mon_lease_renew_interval; float mon_lease_ack_timeout; + float mon_lease_timeout; float mon_accept_timeout; + bool mon_stop_on_last_unmount; bool mon_stop_with_last_mds; // client @@ -133,6 +136,8 @@ struct md_config_t { int client_oc_max_dirty; size_t client_oc_max_sync_write; + double client_mount_timeout; + /* @@ -197,7 +202,6 @@ struct md_config_t { bool mds_trim_on_rejoin; bool mds_commit_on_shutdown; int mds_shutdown_check; - bool mds_shutdown_on_last_unmount; bool mds_verify_export_dirauth; // debug flag diff --git a/branches/sage/cephmds2/cosd.cc b/branches/sage/cephmds2/cosd.cc index 800eacf5acd9a..4f3c8ab71a19f 100644 --- a/branches/sage/cephmds2/cosd.cc +++ b/branches/sage/cephmds2/cosd.cc @@ -66,7 +66,8 @@ int main(int argc, char **argv) if (g_conf.clock_tare) g_clock.tare(); // osd specific args - char *dev; + char *dev = 0; + char dev_default[20]; int whoami = -1; for (unsigned i=0; i (/usr, /home) + /usr/local -> () /home -> () and on mds1: diff --git a/branches/sage/cephmds2/ebofs/Ebofs.cc b/branches/sage/cephmds2/ebofs/Ebofs.cc index f315d0385016f..2f33b2098b34b 100644 --- a/branches/sage/cephmds2/ebofs/Ebofs.cc +++ b/branches/sage/cephmds2/ebofs/Ebofs.cc @@ -127,6 +127,7 @@ int Ebofs::mount() if (e < super_epoch) { dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl; + continue; } if (e == super_epoch+1) { super_epoch++; @@ -1511,7 +1512,7 @@ void Ebofs::alloc_write(Onode *on, -void Ebofs::apply_write(Onode *on, off_t off, size_t len, bufferlist& bl) +void Ebofs::apply_write(Onode *on, off_t off, size_t len, const bufferlist& bl) { ObjectCache *oc = on->get_oc(&bc); @@ -2318,7 +2319,7 @@ unsigned Ebofs::_apply_transaction(Transaction& t) -int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl) +int Ebofs::_write(object_t oid, off_t offset, size_t length, const bufferlist& bl) { dout(7) << "_write " << oid << " " << offset << "~" << length << endl; assert(bl.length() == length); @@ -2383,7 +2384,7 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl) int Ebofs::write(object_t oid, off_t off, size_t len, - bufferlist& bl, Context *onsafe) + const bufferlist& bl, Context *onsafe) { ebofs_lock.Lock(); assert(len > 0); @@ -2398,9 +2399,9 @@ int Ebofs::write(object_t oid, if (journal) { Transaction t; t.write(oid, off, len, bl); - bufferlist bl; - t._encode(bl); - if (journal->submit_entry(bl, onsafe)) break; + bufferlist tbl; + t._encode(tbl); + if (journal->submit_entry(tbl, onsafe)) break; } if (onsafe) commit_waiters[super_epoch].push_back(onsafe); break; diff --git a/branches/sage/cephmds2/ebofs/Ebofs.h b/branches/sage/cephmds2/ebofs/Ebofs.h index eb20cf8920531..4d95a71f77e4e 100644 --- a/branches/sage/cephmds2/ebofs/Ebofs.h +++ b/branches/sage/cephmds2/ebofs/Ebofs.h @@ -188,7 +188,7 @@ protected: block_t start, block_t len, interval_set& alloc, block_t& old_bfirst, block_t& old_blast); - void apply_write(Onode *on, off_t off, size_t len, bufferlist& bl); + void apply_write(Onode *on, off_t off, size_t len, const bufferlist& bl); bool attempt_read(Onode *on, off_t off, size_t len, bufferlist& bl, Cond *will_wait_on, bool *will_wait_on_bool); @@ -275,7 +275,7 @@ protected: int read(object_t, off_t off, size_t len, bufferlist& bl); int is_cached(object_t oid, off_t off, size_t len); - int write(object_t oid, off_t off, size_t len, bufferlist& bl, Context *onsafe); + int write(object_t oid, off_t off, size_t len, const bufferlist& bl, Context *onsafe); void trim_from_cache(object_t oid, off_t off, size_t len); int truncate(object_t oid, off_t size, Context *onsafe=0); int truncate_front(object_t oid, off_t size, Context *onsafe=0); @@ -339,7 +339,7 @@ private: int _getattrs(object_t oid, map &aset); bool _write_will_block(); - int _write(object_t oid, off_t off, size_t len, bufferlist& bl); + int _write(object_t oid, off_t off, size_t len, const bufferlist& bl); void _trim_from_cache(object_t oid, off_t off, size_t len); int _truncate(object_t oid, off_t size); int _truncate_front(object_t oid, off_t size); diff --git a/branches/sage/cephmds2/ebofs/FileJournal.cc b/branches/sage/cephmds2/ebofs/FileJournal.cc index 8bc942c861b92..40a73a442182d 100644 --- a/branches/sage/cephmds2/ebofs/FileJournal.cc +++ b/branches/sage/cephmds2/ebofs/FileJournal.cc @@ -48,6 +48,7 @@ int FileJournal::create() dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl; // write empty header + memset(&header, 0, sizeof(header)); header.clear(); header.fsid = ebofs->get_fsid(); header.max_size = st.st_size; @@ -274,7 +275,7 @@ bool FileJournal::submit_entry(bufferlist& e, Context *oncommit) << endl; full = true; print_header(); - return false; + return false; } } else { // we haven't wrapped. diff --git a/branches/sage/cephmds2/fakesyn.cc b/branches/sage/cephmds2/fakesyn.cc index 236c881675ddb..75f8917d9cdc9 100644 --- a/branches/sage/cephmds2/fakesyn.cc +++ b/branches/sage/cephmds2/fakesyn.cc @@ -79,6 +79,12 @@ int main(int argc, char **argv) if (g_conf.clock_tare) g_clock.tare(); + + // stop on our own + g_conf.mon_stop_on_last_unmount = true; + g_conf.mon_stop_with_last_mds = true; + + MonMap *monmap = new MonMap(g_conf.num_mon); entity_addr_t a; for (int i=0; idata + _off; } unsigned length() const { return _len; } unsigned offset() const { return _off; } - unsigned unused_tail_length() const { return _raw->len - (_off+_len); } + unsigned start() const { return _off; } + unsigned end() const { return _off + _len; } + unsigned unused_tail_length() const { + if (_raw) + return _raw->len - (_off+_len); + else + return 0; + } const char& operator[](unsigned n) const { assert(_raw); assert(n < _len); @@ -332,6 +339,7 @@ public: // my private bits std::list _buffers; unsigned _len; + ptr append_buffer; // where i put small appends. public: // cons/des @@ -352,10 +360,10 @@ public: const std::list& buffers() const { return _buffers; } unsigned length() const { -#if 0 +#if 1 // DEBUG: verify _len unsigned len = 0; - for (std::list::iterator it = _buffers.begin(); + for (std::list::const_iterator it = _buffers.begin(); it != _buffers.end(); it++) { len += (*it).length(); @@ -508,33 +516,23 @@ public: void append(const char *data, unsigned len) { - if (len == 0) return; - - unsigned alen = 0; - - // copy into the tail buffer? - if (!_buffers.empty()) { - unsigned avail = _buffers.back().unused_tail_length(); - if (avail > 0) { - //std::cout << "copying up to " << len << " into tail " << avail << " bytes of tail buf " << _buffers.back() << std::endl; - if (avail > len) - avail = len; - _buffers.back().append(data, avail); - _len += avail; - data += avail; - len -= avail; + while (len > 0) { + // put what we can into the existing append_buffer. + if (append_buffer.unused_tail_length() > 0) { + unsigned gap = append_buffer.unused_tail_length(); + if (gap > len) gap = len; + append_buffer.append(data, gap); + append(append_buffer, append_buffer.end() - gap, gap); // add segment to the list + len -= gap; + data += gap; } - alen = _buffers.back().length(); + if (len == 0) break; // done! + + // make a new append_buffer! + unsigned alen = BUFFER_PAGE_SIZE * (((len-1) / BUFFER_PAGE_SIZE) + 1); + append_buffer = create_page_aligned(alen); + append_buffer.set_length(0); // unused, so far. } - if (len == 0) return; - - // just add another buffer. - // alloc a bit extra, in case we do a bunch of appends. FIXME be smarter! - if (alen < 4096) alen = 4096; - ptr bp = create(alen); - bp.set_length(len); - bp.copy_in(0, len, data); - push_back(bp); } void append(ptr& bp) { push_back(bp); @@ -545,8 +543,11 @@ public: push_back(tempbp); } void append(const list& bl) { - list temp(bl); // copy list - claim_append(temp); // and append + _len += bl._len; + for (std::list::const_iterator p = bl._buffers.begin(); + p != bl._buffers.end(); + ++p) + _buffers.push_back(*p); } @@ -586,12 +587,12 @@ public: } } - void substr_of(list& other, unsigned off, unsigned len) { + void substr_of(const list& other, unsigned off, unsigned len) { assert(off + len <= other.length()); clear(); // skip off - std::list::iterator curbuf = other._buffers.begin(); + std::list::const_iterator curbuf = other._buffers.begin(); while (off > 0) { assert(curbuf != _buffers.end()); if (off >= (*curbuf).length()) { @@ -953,6 +954,7 @@ inline void _decode(bufferptr& bp, bufferlist& bl, int& off) inline void _encode(const bufferlist& s, bufferlist& bl) { uint32_t len = s.length(); + cout << "_encode bufferlist len " << len << endl; _encoderaw(len, bl); bl.append(s); } diff --git a/branches/sage/cephmds2/include/types.h b/branches/sage/cephmds2/include/types.h index 8b778c90aa7a8..d6bdbff2983db 100644 --- a/branches/sage/cephmds2/include/types.h +++ b/branches/sage/cephmds2/include/types.h @@ -258,7 +258,7 @@ struct inode_t { bool anchored; // auth only? // file (data access) - off_t size, max_size; + off_t size, max_size, allocated_size; utime_t mtime; // file data modify time. utime_t atime; // file data access time. diff --git a/branches/sage/cephmds2/mds/MDBalancer.cc b/branches/sage/cephmds2/mds/MDBalancer.cc index 75e8872a4dc0a..58de2647753e7 100644 --- a/branches/sage/cephmds2/mds/MDBalancer.cc +++ b/branches/sage/cephmds2/mds/MDBalancer.cc @@ -160,7 +160,7 @@ void MDBalancer::send_heartbeat() set up; - mds->get_mds_map()->get_up_mds_set(up); + mds->get_mds_map()->get_in_mds_set(up); for (set::iterator p = up.begin(); p != up.end(); ++p) { if (*p == mds->get_nodeid()) continue; MHeartbeat *hb = new MHeartbeat(load, beat_epoch); diff --git a/branches/sage/cephmds2/mds/MDCache.cc b/branches/sage/cephmds2/mds/MDCache.cc index 7a571ff86b0ad..1e89350473778 100644 --- a/branches/sage/cephmds2/mds/MDCache.cc +++ b/branches/sage/cephmds2/mds/MDCache.cc @@ -3285,7 +3285,7 @@ bool MDCache::shutdown_pass() { dout(7) << "shutdown_pass" << endl; - if (mds->is_out()) { + if (mds->is_stopped()) { dout(7) << " already shut down" << endl; show_cache(); show_subtrees(); diff --git a/branches/sage/cephmds2/mds/MDS.cc b/branches/sage/cephmds2/mds/MDS.cc index 2fbcc72b8d4f2..5faa99b989bf2 100644 --- a/branches/sage/cephmds2/mds/MDS.cc +++ b/branches/sage/cephmds2/mds/MDS.cc @@ -291,11 +291,8 @@ void MDS::send_message_client_maybe_open(Message *m, entity_inst_t clientinst) int MDS::init(bool standby) { mds_lock.Lock(); - - if (standby) - want_state = MDSMap::STATE_STANDBY; - else - want_state = MDSMap::STATE_STARTING; + + want_state = MDSMap::STATE_BOOT; // starting beacon. this will induce an MDSMap from the monitor beacon_start(); @@ -397,7 +394,7 @@ void MDS::beacon_send() beacon_seq_stamp[beacon_last_seq] = g_clock.now(); int mon = monmap->pick_mon(); - messenger->send_message(new MMDSBeacon(want_state, beacon_last_seq), + messenger->send_message(new MMDSBeacon(messenger->get_myinst(), want_state, beacon_last_seq), monmap->get_inst(mon)); // schedule next sender @@ -495,8 +492,6 @@ void MDS::handle_mds_map(MMDSMap *m) mdsmap->get_mds_set(oldactive, MDSMap::STATE_ACTIVE); set oldcreating; mdsmap->get_mds_set(oldcreating, MDSMap::STATE_CREATING); - set oldout; - mdsmap->get_mds_set(oldout, MDSMap::STATE_OUT); set oldstopped; mdsmap->get_mds_set(oldstopped, MDSMap::STATE_STOPPED); @@ -504,7 +499,7 @@ void MDS::handle_mds_map(MMDSMap *m) mdsmap->decode(m->get_encoded()); // see who i am - whoami = mdsmap->get_inst_rank(messenger->get_myaddr()); + whoami = mdsmap->get_addr_rank(messenger->get_myaddr()); if (oldwhoami != whoami) { // update messenger. messenger->reset_myname(MSG_ADDR_MDS(whoami)); @@ -993,7 +988,7 @@ void MDS::shutdown_start() // tell everyone to stop. set active; - mdsmap->get_active_mds_set(active); + mdsmap->get_in_mds_set(active); for (set::iterator p = active.begin(); p != active.end(); p++) { @@ -1051,9 +1046,6 @@ int MDS::shutdown_final() if (logger2) logger2->flush(true); mdlog->flush_logger(); - // send final down:out beacon (it doesn't matter if this arrives) - set_want_state(MDSMap::STATE_OUT); - // stop timers if (beacon_killer) { timer.cancel_event(beacon_killer); @@ -1243,7 +1235,7 @@ void MDS::my_dispatch(Message *m) // shut down? if (is_stopping()) { if (mdcache->shutdown_pass()) { - dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to up:stopped" << endl; + dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to down:stopped" << endl; stopping_done(); } } diff --git a/branches/sage/cephmds2/mds/MDS.h b/branches/sage/cephmds2/mds/MDS.h index c5dd4652aed7e..ae8fb4d618d4e 100644 --- a/branches/sage/cephmds2/mds/MDS.h +++ b/branches/sage/cephmds2/mds/MDS.h @@ -127,7 +127,6 @@ class MDS : public Dispatcher { } bool is_dne() { return state == MDSMap::STATE_DNE; } - bool is_out() { return state == MDSMap::STATE_OUT; } bool is_failed() { return state == MDSMap::STATE_FAILED; } bool is_creating() { return state == MDSMap::STATE_CREATING; } bool is_starting() { return state == MDSMap::STATE_STARTING; } diff --git a/branches/sage/cephmds2/mds/MDSMap.h b/branches/sage/cephmds2/mds/MDSMap.h index 8c47e9ed09681..d72e6a1f21cca 100644 --- a/branches/sage/cephmds2/mds/MDSMap.h +++ b/branches/sage/cephmds2/mds/MDSMap.h @@ -26,41 +26,69 @@ #include using namespace std; + +/* + + beautiful state diagram: + + STOPPED DNE FAILED + / | \ / | | + / | \________ _______/ | | +| v v v v v +| STARTING <--> STANDBY <--> CREATING REPLAY -> RECONNECT -> REJOIN +| \ / / +| \____ ____________/ / + \ v v / + \ ACTIVE <----------------------------------------/ + \ | + \ | + \ v + \-- STOPPING + + + + +*/ + + class MDSMap { public: // mds states - static const int STATE_DNE = 0; // down, never existed. - static const int STATE_OUT = 1; // down, once existed, but no subtrees, empty log. - static const int STATE_FAILED = 2; // down, active subtrees; needs to be recovered. - - static const int STATE_STANDBY = 3; // up, but inactive. waiting for assignment by monitor. - static const int STATE_CREATING = 4; // up, creating MDS instance (new journal, idalloc..) - static const int STATE_STARTING = 5; // up, starting prior out MDS instance. - static const int STATE_REPLAY = 6; // up, scanning journal, recoverying any shared state - static const int STATE_RESOLVE = 7; // up, disambiguating partial distributed operations (import/export, ...rename?) - static const int STATE_RECONNECT = 8; // up, reconnect to clients - static const int STATE_REJOIN = 9; // up, replayed journal, rejoining distributed cache - static const int STATE_ACTIVE = 10; // up, active - static const int STATE_STOPPING = 11; // up, exporting metadata (-> standby or out) - static const int STATE_STOPPED = 12; // up, finished stopping. like standby, but not avail to takeover. + static const int STATE_DNE = 0; // down, never existed. + static const int STATE_STOPPED = -1; // down, once existed, but no subtrees. empty log. + static const int STATE_FAILED = 2; // down, active subtrees; needs to be recovered. + + static const int STATE_BOOT = -3; // up, boot announcement. destiny unknown. + static const int STATE_STANDBY = -4; // up, idle. waiting for assignment by monitor. + static const int STATE_CREATING = -5; // up, creating MDS instance (new journal, idalloc..). + static const int STATE_STARTING = -6; // up, starting prior stopped MDS instance. + + static const int STATE_REPLAY = 7; // up, starting prior failed instance. scanning journal. + static const int STATE_RESOLVE = 8; // up, disambiguating distributed operations (import, rename, etc.) + static const int STATE_RECONNECT = 9; // up, reconnect to clients + static const int STATE_REJOIN = 10; // up, replayed journal, rejoining distributed cache + static const int STATE_ACTIVE = 11; // up, active + static const int STATE_STOPPING = 12; // up, exporting metadata (-> standby or out) static const char *get_state_name(int s) { switch (s) { - // down + // down and out case STATE_DNE: return "down:dne"; - case STATE_OUT: return "down:out"; + case STATE_STOPPED: return "down:stopped"; + // down and in case STATE_FAILED: return "down:failed"; - // up - case STATE_STANDBY: return "up:standby"; + // up and out + case STATE_BOOT: return "up:boot"; case STATE_CREATING: return "up:creating"; case STATE_STARTING: return "up:starting"; + case STATE_STANDBY: return "up:standby"; + // up and in case STATE_REPLAY: return "up:replay"; case STATE_RESOLVE: return "up:resolve"; case STATE_RECONNECT: return "up:reconnect"; case STATE_REJOIN: return "up:rejoin"; case STATE_ACTIVE: return "up:active"; case STATE_STOPPING: return "up:stopping"; - case STATE_STOPPED: return "up:stopped"; default: assert(0); } return 0; @@ -71,6 +99,7 @@ class MDSMap { utime_t created; epoch_t same_inst_since; + int target_num; int anchortable; // which MDS has anchortable (fixme someday) int root; // which MDS has root directory @@ -95,7 +124,9 @@ class MDSMap { int get_root() const { return root; } // counts - int get_num_mds() const { return mds_state.size(); } + int get_num_mds() { + return get_num_in_mds(); + } int get_num_mds(int state) { int n = 0; for (map::const_iterator p = mds_state.begin(); @@ -104,24 +135,13 @@ class MDSMap { if (p->second == state) ++n; return n; } - int get_num_in_mds() { - return get_num_up_mds() - get_num_mds(STATE_STANDBY) - get_num_mds(STATE_STOPPED); - } - int get_num_up_mds() { - int n = 0; - for (map::const_iterator p = mds_state.begin(); - p != mds_state.end(); - p++) - if (is_up(p->first)) ++n; - return n; - } - int get_num_up_or_failed_mds() { + + int get_num_in_mds() { int n = 0; for (map::const_iterator p = mds_state.begin(); p != mds_state.end(); p++) - if (is_up(p->first) || is_failed(p->first)) - ++n; + if (p->second > 0) ++n; return n; } @@ -132,19 +152,24 @@ class MDSMap { p++) s.insert(p->first); } - void get_up_mds_set(set& s) { + void get_mds_set(set& s, int state) { for (map::const_iterator p = mds_state.begin(); p != mds_state.end(); p++) - if (is_up(p->first)) + if (p->second == state) s.insert(p->first); + } + void get_up_mds_set(set& s) { + for (map::const_iterator p = mds_state.begin(); + p != mds_state.end(); + p++) + if (is_up(p->first)) s.insert(p->first); } - void get_mds_set(set& s, int state) { + void get_in_mds_set(set& s) { for (map::const_iterator p = mds_state.begin(); p != mds_state.end(); p++) - if (p->second == state) - s.insert(p->first); + if (is_in(p->first)) s.insert(p->first); } void get_active_mds_set(set& s) { get_mds_set(s, MDSMap::STATE_ACTIVE); @@ -161,15 +186,29 @@ class MDSMap { s.insert(p->first); } + int get_random_in_mds() { + vector v; + for (map::const_iterator p = mds_state.begin(); + p != mds_state.end(); + p++) + if (p->second > 0) v.push_back(p->first); + if (v.empty()) + return -1; + else + return v[rand() % v.size()]; + } + // mds states - bool is_down(int m) { return is_dne(m) || is_out(m) || is_failed(m); } + bool is_down(int m) { return is_dne(m) || is_stopped(m) || is_failed(m); } bool is_up(int m) { return !is_down(m); } + bool is_in(int m) { return mds_state.count(m) && mds_state[m] > 0; } + bool is_out(int m) { return !mds_state.count(m) || mds_state[m] <= 0; } bool is_dne(int m) { return mds_state.count(m) == 0 || mds_state[m] == STATE_DNE; } - bool is_out(int m) { return mds_state.count(m) && mds_state[m] == STATE_OUT; } bool is_failed(int m) { return mds_state.count(m) && mds_state[m] == STATE_FAILED; } + bool is_boot(int m) { return mds_state.count(m) && mds_state[m] == STATE_BOOT; } bool is_standby(int m) { return mds_state.count(m) && mds_state[m] == STATE_STANDBY; } bool is_creating(int m) { return mds_state.count(m) && mds_state[m] == STATE_CREATING; } bool is_starting(int m) { return mds_state.count(m) && mds_state[m] == STATE_STARTING; } @@ -185,29 +224,40 @@ class MDSMap { bool has_created(int m) { return mds_created.count(m); } // cluster states + bool is_full() { + return get_num_in_mds() >= target_num; + } bool is_degraded() { // degraded = some recovery in process. fixes active membership and recovery_set. - return get_num_mds(STATE_REPLAY) + + return + get_num_mds(STATE_REPLAY) + get_num_mds(STATE_RESOLVE) + get_num_mds(STATE_RECONNECT) + get_num_mds(STATE_REJOIN) + get_num_mds(STATE_FAILED); } - /*bool is_resolving() { // nodes are resolving distributed ops - return get_num_mds(STATE_RESOLVE); - }*/ bool is_rejoining() { // nodes are rejoining cache state - return get_num_mds(STATE_REJOIN) > 0 && + return + get_num_mds(STATE_REJOIN) > 0 && + get_num_mds(STATE_REPLAY) == 0 && get_num_mds(STATE_RECONNECT) == 0 && get_num_mds(STATE_RESOLVE) == 0 && - get_num_mds(STATE_REPLAY) == 0 && get_num_mds(STATE_FAILED) == 0; } + bool is_stopped() { + return + get_num_in_mds() == 0 && + get_num_mds(STATE_CREATING) == 0 && + get_num_mds(STATE_STARTING) == 0 && + get_num_mds(STATE_STANDBY) == 0; + } int get_state(int m) { - if (mds_state.count(m)) return mds_state[m]; - return STATE_OUT; + if (mds_state.count(m)) + return mds_state[m]; + else + return STATE_DNE; } // inst @@ -226,7 +276,7 @@ class MDSMap { return false; } - int get_inst_rank(const entity_addr_t& addr) { + int get_addr_rank(const entity_addr_t& addr) { for (map::iterator p = mds_inst.begin(); p != mds_inst.end(); ++p) { @@ -257,36 +307,31 @@ class MDSMap { // serialize, unserialize - void encode(bufferlist& blist) { - blist.append((char*)&epoch, sizeof(epoch)); - blist.append((char*)&created, sizeof(created)); - blist.append((char*)&same_inst_since, sizeof(same_inst_since)); - blist.append((char*)&anchortable, sizeof(anchortable)); - blist.append((char*)&root, sizeof(root)); - - ::_encode(mds_state, blist); - ::_encode(mds_state_seq, blist); - ::_encode(mds_inst, blist); - ::_encode(mds_inc, blist); + void encode(bufferlist& bl) { + ::_encode(epoch, bl); + ::_encode(target_num, bl); + ::_encode(created, bl); + ::_encode(same_inst_since, bl); + ::_encode(anchortable, bl); + ::_encode(root, bl); + ::_encode(mds_state, bl); + ::_encode(mds_state_seq, bl); + ::_encode(mds_inst, bl); + ::_encode(mds_inc, bl); } - void decode(bufferlist& blist) { + void decode(bufferlist& bl) { int off = 0; - blist.copy(off, sizeof(epoch), (char*)&epoch); - off += sizeof(epoch); - blist.copy(off, sizeof(created), (char*)&created); - off += sizeof(created); - blist.copy(off, sizeof(same_inst_since), (char*)&same_inst_since); - off += sizeof(same_inst_since); - blist.copy(off, sizeof(anchortable), (char*)&anchortable); - off += sizeof(anchortable); - blist.copy(off, sizeof(root), (char*)&root); - off += sizeof(root); - - ::_decode(mds_state, blist, off); - ::_decode(mds_state_seq, blist, off); - ::_decode(mds_inst, blist, off); - ::_decode(mds_inc, blist, off); + ::_decode(epoch, bl, off); + ::_decode(target_num, bl, off); + ::_decode(created, bl, off); + ::_decode(same_inst_since, bl, off); + ::_decode(anchortable, bl, off); + ::_decode(root, bl, off); + ::_decode(mds_state, bl, off); + ::_decode(mds_state_seq, bl, off); + ::_decode(mds_inst, bl, off); + ::_decode(mds_inc, bl, off); } diff --git a/branches/sage/cephmds2/messages/MClientMount.h b/branches/sage/cephmds2/messages/MClientMount.h index c3bc000911835..d083d72833830 100644 --- a/branches/sage/cephmds2/messages/MClientMount.h +++ b/branches/sage/cephmds2/messages/MClientMount.h @@ -19,12 +19,22 @@ class MClientMount : public Message { public: + entity_addr_t addr; + MClientMount() : Message(MSG_CLIENT_MOUNT) { } + MClientMount(entity_addr_t a) : + Message(MSG_CLIENT_MOUNT), + addr(a) { } char *get_type_name() { return "client_mount"; } - void decode_payload() { } - void encode_payload() { } + void decode_payload() { + int off = 0; + ::_decode(addr, payload, off); + } + void encode_payload() { + ::_encode(addr, payload); + } }; #endif diff --git a/branches/sage/cephmds2/messages/MClientUnmount.h b/branches/sage/cephmds2/messages/MClientUnmount.h index e8acc50f190e0..42fa07db7ba05 100644 --- a/branches/sage/cephmds2/messages/MClientUnmount.h +++ b/branches/sage/cephmds2/messages/MClientUnmount.h @@ -19,12 +19,22 @@ class MClientUnmount : public Message { public: + entity_inst_t inst; + MClientUnmount() : Message(MSG_CLIENT_UNMOUNT) { } - + MClientUnmount(entity_inst_t i) : + Message(MSG_CLIENT_UNMOUNT), + inst(i) { } + char *get_type_name() { return "client_unmount"; } - void decode_payload() { } - void encode_payload() { } + void decode_payload() { + int off = 0; + ::_decode(inst, payload, off); + } + void encode_payload() { + ::_encode(inst, payload); + } }; #endif diff --git a/branches/sage/cephmds2/messages/MMDSBeacon.h b/branches/sage/cephmds2/messages/MMDSBeacon.h index 4789c809572c4..d8b73a45a3122 100644 --- a/branches/sage/cephmds2/messages/MMDSBeacon.h +++ b/branches/sage/cephmds2/messages/MMDSBeacon.h @@ -22,33 +22,37 @@ #include "mds/MDSMap.h" class MMDSBeacon : public Message { + entity_inst_t inst; int state; version_t seq; public: MMDSBeacon() : Message(MSG_MDS_BEACON) {} - MMDSBeacon(int st, version_t se) : Message(MSG_MDS_BEACON), - state(st), seq(se) { } + MMDSBeacon(entity_inst_t i, int st, version_t se) : + Message(MSG_MDS_BEACON), + inst(i), state(st), seq(se) { } + entity_inst_t& get_mds_inst() { return inst; } int get_state() { return state; } version_t get_seq() { return seq; } char *get_type_name() { return "mdsbeacon"; } void print(ostream& out) { - out << "mdsbeacon(" << MDSMap::get_state_name(state) + out << "mdsbeacon(" << inst + << " " << MDSMap::get_state_name(state) << " seq " << seq << ")"; } void encode_payload() { - payload.append((char*)&state, sizeof(state)); - payload.append((char*)&seq, sizeof(seq)); + ::_encode(inst, payload); + ::_encode(state, payload); + ::_encode(seq, payload); } void decode_payload() { int off = 0; - payload.copy(off, sizeof(state), (char*)&state); - off += sizeof(state); - payload.copy(off, sizeof(seq), (char*)&seq); - off += sizeof(seq); + ::_decode(inst, payload, off); + ::_decode(state, payload, off); + ::_decode(seq, payload, off); } }; diff --git a/branches/sage/cephmds2/messages/MMonCommand.h b/branches/sage/cephmds2/messages/MMonCommand.h index d5fd8ae64017a..19d25dd7a4d77 100644 --- a/branches/sage/cephmds2/messages/MMonCommand.h +++ b/branches/sage/cephmds2/messages/MMonCommand.h @@ -22,9 +22,13 @@ using std::vector; class MMonCommand : public Message { public: + entity_inst_t inst; vector cmd; MMonCommand() : Message(MSG_MON_COMMAND) {} + MMonCommand(entity_inst_t i) : + Message(MSG_MON_COMMAND), + inst(i) { } virtual char *get_type_name() { return "mon_command"; } void print(ostream& o) { @@ -37,10 +41,12 @@ class MMonCommand : public Message { } void encode_payload() { + ::_encode(inst, payload); ::_encode(cmd, payload); } void decode_payload() { int off = 0; + ::_decode(inst, payload, off); ::_decode(cmd, payload, off); } }; diff --git a/branches/sage/cephmds2/messages/MMonPaxos.h b/branches/sage/cephmds2/messages/MMonPaxos.h index b33012336901f..7210b179c9a42 100644 --- a/branches/sage/cephmds2/messages/MMonPaxos.h +++ b/branches/sage/cephmds2/messages/MMonPaxos.h @@ -49,7 +49,7 @@ class MMonPaxos : public Message { version_t last_committed; // i've committed to version_t pn_from; // i promise to accept after version_t pn; // with with proposal - version_t old_accepted_pn; // previous pn, if we are a LAST with an uncommitted value + version_t uncommitted_pn; // previous pn, if we are a LAST with an uncommitted value utime_t lease_expire; map values; @@ -59,14 +59,14 @@ class MMonPaxos : public Message { Message(MSG_MON_PAXOS), epoch(e), op(o), machine_id(mid), - last_committed(0), pn_from(0), pn(0), old_accepted_pn(0) { } + last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { } virtual char *get_type_name() { return "paxos"; } void print(ostream& out) { out << "paxos(" << get_paxos_name(machine_id) << " " << get_opname(op) << " lc " << last_committed - << " pn " << pn << " opn " << old_accepted_pn + << " pn " << pn << " opn " << uncommitted_pn << ")"; } @@ -77,7 +77,7 @@ class MMonPaxos : public Message { ::_encode(last_committed, payload); ::_encode(pn_from, payload); ::_encode(pn, payload); - ::_encode(old_accepted_pn, payload); + ::_encode(uncommitted_pn, payload); ::_encode(lease_expire, payload); ::_encode(values, payload); } @@ -89,7 +89,7 @@ class MMonPaxos : public Message { ::_decode(last_committed, payload, off); ::_decode(pn_from, payload, off); ::_decode(pn, payload, off); - ::_decode(old_accepted_pn, payload, off); + ::_decode(uncommitted_pn, payload, off); ::_decode(lease_expire, payload, off); ::_decode(values, payload, off); } diff --git a/branches/sage/cephmds2/messages/MOSDBoot.h b/branches/sage/cephmds2/messages/MOSDBoot.h index 9ae5a68d6c59a..00c94ad1a2a80 100644 --- a/branches/sage/cephmds2/messages/MOSDBoot.h +++ b/branches/sage/cephmds2/messages/MOSDBoot.h @@ -22,23 +22,29 @@ class MOSDBoot : public Message { public: + entity_inst_t inst; OSDSuperblock sb; MOSDBoot() {} - MOSDBoot(OSDSuperblock& s) : + MOSDBoot(entity_inst_t i, OSDSuperblock& s) : Message(MSG_OSD_BOOT), + inst(i), sb(s) { } - char *get_type_name() { return "oboot"; } + char *get_type_name() { return "osd_boot"; } + void print(ostream& out) { + out << "osd_boot(" << inst << ")"; + } void encode_payload() { - payload.append((char*)&sb, sizeof(sb)); + ::_encode(inst, payload); + ::_encode(sb, payload); } void decode_payload() { int off = 0; - payload.copy(off, sizeof(sb), (char*)&sb); - off += sizeof(sb); + ::_decode(inst, payload, off); + ::_decode(sb, payload, off); } }; diff --git a/branches/sage/cephmds2/messages/MOSDFailure.h b/branches/sage/cephmds2/messages/MOSDFailure.h index 5bc7a5d5ee9f6..965d622a5f5e2 100644 --- a/branches/sage/cephmds2/messages/MOSDFailure.h +++ b/branches/sage/cephmds2/messages/MOSDFailure.h @@ -21,30 +21,35 @@ class MOSDFailure : public Message { public: + entity_inst_t from; entity_inst_t failed; epoch_t epoch; MOSDFailure() {} - MOSDFailure(entity_inst_t f, epoch_t e) : + MOSDFailure(entity_inst_t fr, entity_inst_t f, epoch_t e) : Message(MSG_OSD_FAILURE), - failed(f), epoch(e) {} + from(fr), failed(f), epoch(e) {} + entity_inst_t get_from() { return from; } entity_inst_t get_failed() { return failed; } epoch_t get_epoch() { return epoch; } void decode_payload() { int off = 0; - payload.copy(off, sizeof(failed), (char*)&failed); - off += sizeof(failed); - payload.copy(off, sizeof(epoch), (char*)&epoch); - off += sizeof(epoch); + ::_decode(from, payload, off); + ::_decode(failed, payload, off); + ::_decode(epoch, payload, off); } void encode_payload() { - payload.append((char*)&failed, sizeof(failed)); - payload.append((char*)&epoch, sizeof(epoch)); + ::_encode(from, payload); + ::_encode(failed, payload); + ::_encode(epoch, payload); } - virtual char *get_type_name() { return "osdfail"; } + virtual char *get_type_name() { return "osd_failure"; } + void print(ostream& out) { + out << "osd_failure(" << failed << " e" << epoch << ")"; + } }; #endif diff --git a/branches/sage/cephmds2/messages/MOSDGetMap.h b/branches/sage/cephmds2/messages/MOSDGetMap.h index 5158ce7d3ed83..68e1b7d137dae 100644 --- a/branches/sage/cephmds2/messages/MOSDGetMap.h +++ b/branches/sage/cephmds2/messages/MOSDGetMap.h @@ -23,7 +23,6 @@ class MOSDGetMap : public Message { public: epoch_t since; - //MOSDGetMap() : since(0) {} MOSDGetMap(epoch_t s=0) : Message(MSG_OSD_GETMAP), since(s) { @@ -31,7 +30,10 @@ class MOSDGetMap : public Message { epoch_t get_since() { return since; } - char *get_type_name() { return "getomap"; } + char *get_type_name() { return "get_osd_map"; } + void print(ostream& out) { + out << "get_osd_map(since " << since << ")"; + } void encode_payload() { payload.append((char*)&since, sizeof(since)); diff --git a/branches/sage/cephmds2/messages/MOSDMap.h b/branches/sage/cephmds2/messages/MOSDMap.h index 83929ddd23c28..b6de1b027557c 100644 --- a/branches/sage/cephmds2/messages/MOSDMap.h +++ b/branches/sage/cephmds2/messages/MOSDMap.h @@ -45,10 +45,8 @@ class MOSDMap : public Message { } - MOSDMap() : - Message(MSG_OSD_MAP) {} - MOSDMap(OSDMap *oc) : - Message(MSG_OSD_MAP) { + MOSDMap() : Message(MSG_OSD_MAP) { } + MOSDMap(OSDMap *oc) : Message(MSG_OSD_MAP) { oc->encode(maps[oc->get_epoch()]); } diff --git a/branches/sage/cephmds2/mon/ClientMonitor.cc b/branches/sage/cephmds2/mon/ClientMonitor.cc index 51686e8f2bed4..175f70477f7a5 100644 --- a/branches/sage/cephmds2/mon/ClientMonitor.cc +++ b/branches/sage/cephmds2/mon/ClientMonitor.cc @@ -34,68 +34,85 @@ bool ClientMonitor::update_from_paxos() { assert(paxos->is_active()); + version_t paxosv = paxos->get_version(); dout(10) << "update_from_paxos paxosv " << paxosv << ", my v " << client_map.version << endl; + if (paxosv == client_map.version) return true; assert(paxosv >= client_map.version); + + if (client_map.version == 0 && paxosv > 1 && + mon->store->exists_bl_ss("clientmap","latest")) { + // starting up: load latest + dout(7) << "update_from_paxos startup: loading latest full clientmap" << endl; + bufferlist bl; + mon->store->get_bl_ss(bl, "clientmap", "latest"); + int off = 0; + client_map._decode(bl, off); + } + + // walk through incrementals while (paxosv > client_map.version) { bufferlist bl; bool success = paxos->read(client_map.version+1, bl); if (success) { - dout(10) << "update_from_paxos applying incremental " << client_map.version+1 << endl; + dout(7) << "update_from_paxos applying incremental " << client_map.version+1 << endl; Incremental inc; int off = 0; inc._decode(bl, off); client_map.apply_incremental(inc); - + } else { - dout(10) << "update_from_paxos couldn't read incremental " << client_map.version+1 << endl; + dout(7) << "update_from_paxos couldn't read incremental " << client_map.version+1 << endl; return false; } - - // save latest - bl.clear(); - client_map._encode(bl); - mon->store->put_bl_ss(bl, "clientmap", "latest"); - - // prepare next inc - prepare_pending(); } + // save latest + bufferlist bl; + client_map._encode(bl); + mon->store->put_bl_ss(bl, "clientmap", "latest"); + return true; } -void ClientMonitor::prepare_pending() +void ClientMonitor::create_pending() { + assert(mon->is_leader()); pending_inc = Incremental(); pending_inc.version = client_map.version + 1; pending_inc.next_client = client_map.next_client; - dout(10) << "prepare_pending v " << pending_inc.version + dout(10) << "create_pending v " << pending_inc.version << ", next is " << pending_inc.next_client << endl; } -void ClientMonitor::propose_pending() +void ClientMonitor::create_initial() { - dout(10) << "propose_pending v " << pending_inc.version + dout(1) << "create_initial -- creating initial map" << endl; +} + + + +void ClientMonitor::encode_pending(bufferlist &bl) +{ + assert(mon->is_leader()); + dout(10) << "encode_pending v " << pending_inc.version << ", next is " << pending_inc.next_client << endl; - // apply to paxos assert(paxos->get_version() + 1 == pending_inc.version); - bufferlist bl; pending_inc._encode(bl); - paxos->propose_new_value(bl, new C_Commit(this)); } // ------- -bool ClientMonitor::preprocess_update(Message *m) +bool ClientMonitor::preprocess_query(Message *m) { - dout(10) << "preprocess_update " << *m << " from " << m->get_source_inst() << endl; + dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl; switch (m->get_type()) { case MSG_CLIENT_MOUNT: @@ -105,7 +122,7 @@ bool ClientMonitor::preprocess_update(Message *m) if (client_map.addr_client.count(addr)) { int client = client_map.addr_client[addr]; dout(7) << " client" << client << " already mounted" << endl; - _mounted(client, m); + _mounted(client, (MClientMount*)m); return true; } } @@ -117,7 +134,7 @@ bool ClientMonitor::preprocess_update(Message *m) int client = m->get_source().num(); if (client_map.client_addr.count(client) == 0) { dout(7) << " client" << client << " not mounted" << endl; - _unmounted(m); + _unmounted((MClientUnmount*)m); return true; } } @@ -131,16 +148,19 @@ bool ClientMonitor::preprocess_update(Message *m) } } -void ClientMonitor::prepare_update(Message *m) +bool ClientMonitor::prepare_update(Message *m) { dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl; - - int client = m->get_source().num(); - entity_addr_t addr = m->get_source_addr(); - + switch (m->get_type()) { case MSG_CLIENT_MOUNT: { + MClientMount *mount = (MClientMount*)m; + entity_addr_t addr = mount->addr; + int client = -1; + if (mount->get_source().is_client()) + client = mount->get_source().num(); + // choose a client id if (client < 0 || (client_map.client_addr.count(client) && @@ -152,32 +172,39 @@ void ClientMonitor::prepare_update(Message *m) } pending_inc.add_mount(client, addr); - pending_commit.push_back(new C_Mounted(this, client, m)); + paxos->wait_for_commit(new C_Mounted(this, client, mount)); } - break; + return true; case MSG_CLIENT_UNMOUNT: { + MClientUnmount *unmount = (MClientUnmount*)m; + assert(unmount->inst.name.is_client()); + int client = unmount->inst.name.num(); + assert(client_map.client_addr.count(client)); pending_inc.add_unmount(client); - pending_commit.push_back(new C_Unmounted(this, m)); + paxos->wait_for_commit(new C_Unmounted(this, unmount)); } - break; - + return true; + default: assert(0); delete m; + return false; } + } // MOUNT -void ClientMonitor::_mounted(int client, Message *m) +void ClientMonitor::_mounted(int client, MClientMount *m) { - entity_inst_t to = m->get_source_inst(); + entity_inst_t to; + to.addr = m->addr; to.name = MSG_ADDR_CLIENT(client); dout(10) << "_mounted client" << client << " at " << to << endl; @@ -189,68 +216,22 @@ void ClientMonitor::_mounted(int client, Message *m) delete m; } -void ClientMonitor::_unmounted(Message *m) +void ClientMonitor::_unmounted(MClientUnmount *m) { - dout(10) << "_unmounted " << m->get_source() << endl; + dout(10) << "_unmounted " << m->inst << endl; // reply with (same) unmount message - mon->messenger->send_message(m, m->get_source_inst()); + mon->messenger->send_message(m, m->inst); // auto-shutdown? - if (update_from_paxos() && - mon->is_leader() && + // (hack for fakesyn/newsyn, mostly) + if (mon->is_leader() && client_map.version > 1 && client_map.client_addr.empty() && - g_conf.mds_shutdown_on_last_unmount) { + g_conf.mon_stop_on_last_unmount) { dout(1) << "last client unmounted" << endl; mon->do_stop(); } } -void ClientMonitor::_commit(int r) -{ - if (r >= 0) { - dout(10) << "_commit success" << endl; - finish_contexts(pending_commit); - } else { - dout(10) << "_commit failed" << endl; - } - - finish_contexts(pending_commit, r); -} - -/* -void ClientMonitor::bcast_latest_mds() -{ - dout(10) << "bcast_latest_mds " << mdsmap.get_epoch() << endl; - - // tell mds - for (set::iterator p = mdsmap.get_mds().begin(); - p != mdsmap.get_mds().end(); - p++) { - if (mdsmap.is_down(*p)) continue; - send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p)); - } -} -*/ - - -void ClientMonitor::create_initial() -{ - dout(10) << "create_initial" << endl; - - if (!mon->is_leader()) return; - if (paxos->get_version() > 0) return; - - if (paxos->is_writeable()) { - dout(1) << "create_initial -- creating initial map" << endl; - prepare_pending(); - propose_pending(); - } else { - dout(1) << "create_initial -- waiting for writeable" << endl; - paxos->wait_for_writeable(new C_CreateInitial(this)); - } -} - - diff --git a/branches/sage/cephmds2/mon/ClientMonitor.h b/branches/sage/cephmds2/mon/ClientMonitor.h index ebb1f05e9c8c6..8321202fc24f1 100644 --- a/branches/sage/cephmds2/mon/ClientMonitor.h +++ b/branches/sage/cephmds2/mon/ClientMonitor.h @@ -28,6 +28,8 @@ using namespace std; class Monitor; class Paxos; +class MClientMount; +class MClientUnmount; class ClientMonitor : public PaxosService { public: @@ -116,74 +118,52 @@ public: } }; - class C_CreateInitial : public Context { - ClientMonitor *cmon; - public: - C_CreateInitial(ClientMonitor *cm) : cmon(cm) {} - void finish(int r) { - cmon->create_initial(); - } - }; - class C_Mounted : public Context { ClientMonitor *cmon; int client; - Message *m; + MClientMount *m; public: - C_Mounted(ClientMonitor *cm, int c, Message *m_) : + C_Mounted(ClientMonitor *cm, int c, MClientMount *m_) : cmon(cm), client(c), m(m_) {} void finish(int r) { if (r >= 0) cmon->_mounted(client, m); else - cmon->dispatch(m); + cmon->dispatch((Message*)m); } }; class C_Unmounted : public Context { ClientMonitor *cmon; - Message *m; + MClientUnmount *m; public: - C_Unmounted(ClientMonitor *cm, Message *m_) : + C_Unmounted(ClientMonitor *cm, MClientUnmount *m_) : cmon(cm), m(m_) {} void finish(int r) { if (r >= 0) cmon->_unmounted(m); else - cmon->dispatch(m); + cmon->dispatch((Message*)m); } }; - class C_Commit : public Context { - ClientMonitor *cmon; - public: - C_Commit(ClientMonitor *cm) : - cmon(cm) {} - void finish(int r) { - cmon->_commit(r); - } - }; private: Map client_map; - list waiting_for_active; // leader Incremental pending_inc; - list pending_commit; // contributers to pending_inc void create_initial(); bool update_from_paxos(); - void prepare_pending(); // prepare a new pending - void propose_pending(); // propose pending update to peers + void create_pending(); // prepare a new pending + void encode_pending(bufferlist &bl); // propose pending update to peers - void _mounted(int c, Message *m); - void _unmounted(Message *m); - void _commit(int r); + void _mounted(int c, MClientMount *m); + void _unmounted(MClientUnmount *m); - void handle_query(Message *m); - bool preprocess_update(Message *m); // true if processed. - void prepare_update(Message *m); + bool preprocess_query(Message *m); // true if processed. + bool prepare_update(Message *m); public: diff --git a/branches/sage/cephmds2/mon/Elector.cc b/branches/sage/cephmds2/mon/Elector.cc index cdfce72bb0681..816946d3cbfe3 100644 --- a/branches/sage/cephmds2/mon/Elector.cc +++ b/branches/sage/cephmds2/mon/Elector.cc @@ -160,9 +160,17 @@ void Elector::handle_propose(MMonElection *m) int from = m->get_source().num(); assert(m->epoch % 2 == 1); // election - if (m->epoch > epoch) + if (m->epoch > epoch) { bump_epoch(m->epoch); - + } + else if (m->epoch < epoch && // got an "old" propose, + epoch % 2 == 0 && // in a non-election cycle + mon->quorum.count(from) == 0) { // from someone outside the quorum + // a mon just started up, call a new election so they can rejoin! + dout(5) << " got propose from old epoch, " << m->get_source() << " must have just started" << endl; + start(); + } + if (whoami < from) { // i would win over them. if (leader_acked >= 0) { // we already acked someone @@ -250,6 +258,12 @@ void Elector::dispatch(Message *m) { MMonElection *em = (MMonElection*)m; + switch (em->op) { + case MMonElection::OP_PROPOSE: + handle_propose(em); + return; + } + if (em->epoch < epoch) { dout(5) << "old epoch, dropping" << endl; delete em; @@ -259,13 +273,10 @@ void Elector::dispatch(Message *m) switch (em->op) { case MMonElection::OP_ACK: handle_ack(em); - break; - case MMonElection::OP_PROPOSE: - handle_propose(em); - break; + return; case MMonElection::OP_VICTORY: handle_victory(em); - break; + return; default: assert(0); } diff --git a/branches/sage/cephmds2/mon/MDSMonitor.cc b/branches/sage/cephmds2/mon/MDSMonitor.cc index 995b6053f1c60..7578ddc99c356 100644 --- a/branches/sage/cephmds2/mon/MDSMonitor.cc +++ b/branches/sage/cephmds2/mon/MDSMonitor.cc @@ -16,12 +16,17 @@ #include "MDSMonitor.h" #include "Monitor.h" #include "MonitorStore.h" +#include "OSDMonitor.h" #include "messages/MMDSMap.h" #include "messages/MMDSGetMap.h" #include "messages/MMDSBeacon.h" #include "messages/MMonCommand.h" +#include "messages/MMonCommandAck.h" + +#include "messages/MGenericMessage.h" + #include "common/Timer.h" @@ -34,307 +39,372 @@ -/********* MDS map **************/ - - class C_RetryMessage : public Context { - Dispatcher *svc; - Message *m; - public: - C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {} - void finish(int r) { - svc->dispatch(m); - } - }; +// my methods -void MDSMonitor::dispatch(Message *m) +void MDSMonitor::print_map(MDSMap &m) { - if (mon->is_peon()) { - dout(1) << "peon, fw to leader" << endl; - mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); - return; - } - if (mon->is_starting()) { - dout(1) << "starting, waiting" << endl; - waiting_for_active.push_back(new C_RetryMessage(this, m)); - return; + dout(7) << "print_map epoch " << m.get_epoch() << " target_num " << m.target_num << endl; + entity_inst_t blank; + set all; + m.get_mds_set(all); + for (set::iterator p = all.begin(); + p != all.end(); + ++p) { + dout(7) << " mds" << *p << "." << m.mds_inc[*p] + << " : " << MDSMap::get_state_name(m.get_state(*p)) + << " : " << (m.have_inst(*p) ? m.get_inst(*p) : blank) + << endl; } - - switch (m->get_type()) { - - case MSG_MDS_BEACON: - handle_mds_beacon((MMDSBeacon*)m); - break; - - case MSG_MDS_GETMAP: - handle_mds_getmap((MMDSGetMap*)m); - break; - - default: - assert(0); - } } -void MDSMonitor::election_finished() -{ - if (mon->is_leader()) { - - // FIXME be smarter later. - - if (g_conf.mkfs) { - create_initial(); - save_map(); - } else { - load_map(); - } - } - - finish_contexts(waiting_for_active); -} - +// service methods void MDSMonitor::create_initial() { - mdsmap.epoch = 0; // until everyone boots - mdsmap.created = g_clock.now(); - - mdsmap.encode(encoded_map); - - print_map(); + dout(10) << "create_initial" << endl; + pending_mdsmap.target_num = g_conf.num_mds; + pending_mdsmap.created = g_clock.now(); + print_map(pending_mdsmap); } -void MDSMonitor::load_map() +bool MDSMonitor::update_from_paxos() { - int r = mon->store->get_bl_ss(encoded_map, "mdsmap", "current"); - assert(r > 0); - mdsmap.decode(encoded_map); - dout(7) << "load_map epoch " << mdsmap.get_epoch() << endl; + assert(paxos->is_active()); + + version_t paxosv = paxos->get_version(); + dout(10) << "update_from_paxos paxosv " << paxosv + << ", my e " << mdsmap.epoch << endl; + + if (paxosv == mdsmap.epoch) return true; + assert(paxosv >= mdsmap.epoch); + + // read and decode + mdsmap_bl.clear(); + bool success = paxos->read(paxosv, mdsmap_bl); + assert(success); + dout(10) << "update_from_paxos got " << paxosv << endl; + mdsmap.decode(mdsmap_bl); + + // new map + print_map(mdsmap); + + // bcast map to mds, waiters + if (mon->is_leader()) + bcast_latest_mds(); + send_to_waiting(); + + // hackish: did all mds's shut down? + if (mon->is_leader() && + g_conf.mon_stop_with_last_mds && + mdsmap.get_epoch() > 1 && + mdsmap.is_stopped()) + mon->messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + mon->monmap->get_inst(mon->whoami)); + + return true; } -void MDSMonitor::save_map() +void MDSMonitor::create_pending() { - dout(7) << "save_map epoch " << mdsmap.get_epoch() << endl; - - int r = mon->store->put_bl_ss(encoded_map, "mdsmap", "current"); - assert(r>=0); + pending_mdsmap = mdsmap; + pending_mdsmap.epoch++; + dout(10) << "create_pending e" << pending_mdsmap.epoch << endl; } -void MDSMonitor::print_map() +void MDSMonitor::encode_pending(bufferlist &bl) { - dout(7) << "print_map epoch " << mdsmap.get_epoch() << " num_mds " << g_conf.num_mds << endl; - entity_inst_t blank; - set all; - mdsmap.get_mds_set(all); - for (set::iterator p = all.begin(); - p != all.end(); - ++p) { - dout(7) << " mds" << *p << "." << mdsmap.mds_inc[*p] - << " : " << MDSMap::get_state_name(mdsmap.get_state(*p)) - << " : " << (mdsmap.have_inst(*p) ? mdsmap.get_inst(*p) : blank) - << endl; - } -} - -void MDSMonitor::issue_map() -{ - mdsmap.inc_epoch(); - encoded_map.clear(); - mdsmap.encode(encoded_map); - - dout(7) << "issue_map epoch " << mdsmap.get_epoch() << endl; - - save_map(); - print_map(); + dout(10) << "encode_pending e" << pending_mdsmap.epoch << endl; - // bcast map - bcast_latest_mds(); - send_current(); + print_map(pending_mdsmap); + + // apply to paxos + assert(paxos->get_version() + 1 == pending_mdsmap.epoch); + pending_mdsmap.encode(bl); } -void MDSMonitor::handle_command(MMonCommand *m, int& r, string& rs) +bool MDSMonitor::preprocess_query(Message *m) { - stringstream ss; - if (m->cmd.size() > 1) { - if (m->cmd[1] == "stop" && m->cmd.size() > 2) { - int who = atoi(m->cmd[2].c_str()); - if (mdsmap.is_active(who)) { - r = 0; - ss << "telling mds" << who << " to stop"; - getline(ss,rs); + dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl; - // hack - mdsmap.mds_state[who] = MDSMap::STATE_STOPPING; - issue_map(); + switch (m->get_type()) { + + case MSG_MDS_BEACON: + return preprocess_beacon((MMDSBeacon*)m); + + case MSG_MDS_GETMAP: + send_full(m->get_source_inst()); + return true; - } else { - ss << "mds" << who << " not active (" << mdsmap.get_state_name(mdsmap.get_state(who)) << ")"; - getline(ss,rs); - } - } - else if (m->cmd[1] == "setnum" && m->cmd.size() > 2) { - g_conf.num_mds = atoi(m->cmd[2].c_str()); - ss << "g_conf.num_mds = " << g_conf.num_mds << endl; - getline(ss,rs); - print_map(); - } + case MSG_MON_COMMAND: + return false; + + default: + assert(0); + delete m; + return true; } } - -void MDSMonitor::handle_mds_beacon(MMDSBeacon *m) +bool MDSMonitor::preprocess_beacon(MMDSBeacon *m) { - dout(12) << "mds_beacon " << *m - << " from " << m->get_source() - << " " << m->get_source_inst() - << endl; - int from = m->get_source().num(); + dout(12) << "preprocess_beacon " << *m + << " from " << m->get_mds_inst() + << endl; + + // fw to leader? + if (!mon->is_leader()) { + dout(10) << "fw to leader" << endl; + mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + return true; + } + + // let's see. + int from = m->get_mds_inst().name.num(); int state = m->get_state(); version_t seq = m->get_seq(); - // initial boot? - bool booted = false; + // can i handle this query without a map update? - // choose an MDS id - if (from >= 0) { - // wants to be (or already is) a specific MDS. - if (mdsmap.is_down(from)) { - dout(10) << "mds_beacon assigning requested mds" << from << endl; - booted = true; - } else if (mdsmap.get_inst(from) != m->get_source_inst()) { - dout(10) << "mds_beacon not assigning requested mds" << from - << ", that mds is up and someone else" << endl; - from = -1; - } - } - if (from < 0) { - // pick a failed mds? - set failed; - mdsmap.get_failed_mds_set(failed); - if (!failed.empty()) { - from = *failed.begin(); - dout(10) << "mds_beacon assigned failed mds" << from << endl; - booted = true; - } - } - if (from < 0) { - // ok, just pick any unused mds id. - for (from=0; ; ++from) { - if (mdsmap.is_dne(from) || - mdsmap.is_out(from)) { - dout(10) << "mds_beacon assigned out|dne mds" << from << endl; - booted = true; - break; - } - } + // boot? + if (state == MDSMap::STATE_BOOT) { + // already booted? + int already = mdsmap.get_addr_rank(m->get_mds_inst().addr); + if (already < 0) + return false; // need to update map + + // already booted. just reply to beacon, as per usual. + from = already; } - - // old beacon? + // reply to beacon if (mdsmap.mds_state_seq[from] > seq) { dout(7) << "mds_beacon " << *m << " has old seq, ignoring" << endl; delete m; - return; + return true; } // reply to beacon? - if (state != MDSMap::STATE_OUT) { + if (state != MDSMap::STATE_STOPPED) { last_beacon[from] = g_clock.now(); // note time - messenger->send_message(new MMDSBeacon(state, seq), - m->get_source_inst()); + mon->messenger->send_message(new MMDSBeacon(m->get_mds_inst(), state, seq), + m->get_mds_inst()); } + + // is there a state change here? + if (mdsmap.mds_state.count(from) == 0 || + mdsmap.mds_state[from] != state) + return false; // yep, need to update map. + + // we're done. + delete m; + return true; +} - // make sure it's in the map - if (booted) { - mdsmap.mds_inst[from].addr = m->get_source_addr(); - mdsmap.mds_inst[from].name = MSG_ADDR_MDS(from); - mdsmap.mds_inc[from]++; - - // someone (new) joined the cluster - mdsmap.same_inst_since = mdsmap.epoch+1; - - // starting -> creating|starting|replay - if (mdsmap.is_degraded() && - !mdsmap.is_failed(from)) { - dout(10) << "mds_beacon currently degraded, mds" << from << " will be standby" << endl; - state = MDSMap::STATE_STANDBY; - } - /* - else if (from >= g_conf.num_mds) { - dout(10) << "mds_beacon already have " << g_conf.num_mds << " mds's, standby (increase with 'mds setnum xxx')" << endl; - state = MDSMap::STATE_STANDBY; - } - */ - else if (state == MDSMap::STATE_STARTING) { +bool MDSMonitor::prepare_update(Message *m) +{ + dout(10) << "prepare_update " << *m << endl; + + switch (m->get_type()) { + + case MSG_MDS_BEACON: + return handle_beacon((MMDSBeacon*)m); + + case MSG_MON_COMMAND: + return handle_command((MMonCommand*)m); + + default: + assert(0); + delete m; + } + + return true; +} + +bool MDSMonitor::should_propose_now() +{ + return true; +} + + +bool MDSMonitor::handle_beacon(MMDSBeacon *m) +{ + // -- this is an update -- + dout(12) << "handle_beacon " << *m + << " from " << m->get_mds_inst() + << endl; + int from = m->get_mds_inst().name.num(); + int state = m->get_state(); + version_t seq = m->get_seq(); + + assert(state != mdsmap.get_state(from)); + + // boot? + if (state == MDSMap::STATE_BOOT) { + // assign a name. + if (from >= 0) { + // wants to be (or already is) a specific MDS. if (mdsmap.is_failed(from)) { - dout(10) << "mds_beacon will recover mds" << from << endl; + dout(10) << "mds_beacon boot: mds" << from << " was failed, replaying" << endl; state = MDSMap::STATE_REPLAY; - } - else if (mdsmap.is_out(from)) { - dout(10) << "mds_beacon will start mds" << from << endl; + } else if (mdsmap.is_stopped(from)) { + dout(10) << "mds_beacon boot: mds" << from << " was stopped, starting" << endl; state = MDSMap::STATE_STARTING; - } - else { - dout(10) << "mds_beacon will create mds" << from << endl; - state = MDSMap::STATE_CREATING; - } + } else if (!mdsmap.have_inst(from) || mdsmap.get_inst(from) != m->get_mds_inst()) { + dout(10) << "mds_beacon boot: mds" << from << " is someone else" << endl; + from = -1; + } + } + if (from < 0) { + from = pending_mdsmap.get_addr_rank(m->get_mds_inst().addr); + if (from >= 0) { + state = pending_mdsmap.mds_state[from]; + dout(10) << "mds_beacon boot: already pending mds" << from + << " " << MDSMap::get_state_name(state) << endl; + delete m; + return false; + } + } + if (from < 0) { + // pick a failed mds? + set failed; + pending_mdsmap.get_failed_mds_set(failed); + if (!failed.empty()) { + from = *failed.begin(); + dout(10) << "mds_beacon boot: assigned failed mds" << from << endl; + state = MDSMap::STATE_REPLAY; + } + } + if (from < 0) { + // ok, just pick any unused mds id. + for (from=0; ; ++from) { + if (pending_mdsmap.is_dne(from)) { + dout(10) << "mds_beacon boot: assigned new mds" << from << endl; + state = MDSMap::STATE_CREATING; + break; + } else if (pending_mdsmap.is_stopped(from)) { + dout(10) << "mds_beacon boot: assigned stopped mds" << from << endl; + state = MDSMap::STATE_STARTING; + break; + } + } } + + assert(state == MDSMap::STATE_CREATING || + state == MDSMap::STATE_STARTING || + state == MDSMap::STATE_REPLAY); + + // put it in the map. + pending_mdsmap.mds_inst[from].addr = m->get_mds_inst().addr; + pending_mdsmap.mds_inst[from].name = MSG_ADDR_MDS(from); + pending_mdsmap.mds_inc[from]++; + + // someone (new) has joined the cluster. + pending_mdsmap.same_inst_since = pending_mdsmap.epoch; } - // can't go from stopping -> active - if (state == MDSMap::STATE_ACTIVE && mdsmap.mds_state[from] == MDSMap::STATE_STOPPING) - state = MDSMap::STATE_STOPPING; // dummy - - // if creating -> active, go to standby instead - if (state == MDSMap::STATE_ACTIVE && mdsmap.is_creating(from)) { - mdsmap.mds_created.insert(from); + // created? + if (state == MDSMap::STATE_ACTIVE && + mdsmap.is_creating(from)) { + pending_mdsmap.mds_created.insert(from); dout(10) << "mds_beacon created mds" << from << endl; - - if (mdsmap.is_degraded()) { - dout(10) << "mds_beacon current degraded, marking mds" << from << " as standby" << endl; - state = MDSMap::STATE_STANDBY; - } + } + + // if starting|creating and degraded|full, go to standby + if ((state == MDSMap::STATE_STARTING || + state == MDSMap::STATE_CREATING || + mdsmap.is_starting(from) || + mdsmap.is_creating(from)) && + (pending_mdsmap.is_degraded() || + pending_mdsmap.is_full())) { + dout(10) << "mds_beacon cluster degraded|full, mds" << from << " will be standby" << endl; + state = MDSMap::STATE_STANDBY; } + // update the map + dout(10) << "mds_beacon mds" << from << " " << MDSMap::get_state_name(mdsmap.mds_state[from]) + << " -> " << MDSMap::get_state_name(state) + << endl; - // did we update the map? - if (mdsmap.mds_state.count(from) == 0 || - mdsmap.mds_state[from] != state) { - // update mds state - dout(10) << "mds_beacon mds" << from << " " << MDSMap::get_state_name(mdsmap.mds_state[from]) - << " -> " << MDSMap::get_state_name(state) - << endl; - // did someone leave the cluster? - if (state == MDSMap::STATE_OUT && mdsmap.mds_state[from] != MDSMap::STATE_OUT) - mdsmap.same_inst_since = mdsmap.epoch+1; - - // change the state - mdsmap.mds_state[from] = state; - if (mdsmap.is_up(from)) - mdsmap.mds_state_seq[from] = seq; - else - mdsmap.mds_state_seq.erase(from); - - issue_map(); - } + // did someone leave the cluster? + if (state == MDSMap::STATE_STOPPED && + !mdsmap.is_stopped(from)) + pending_mdsmap.same_inst_since = pending_mdsmap.epoch; + + // change the state + pending_mdsmap.mds_state[from] = state; + if (pending_mdsmap.is_up(from)) + pending_mdsmap.mds_state_seq[from] = seq; + else + pending_mdsmap.mds_state_seq.erase(from); + + dout(7) << "pending map now:" << endl; + print_map(pending_mdsmap); + paxos->wait_for_commit(new C_Updated(this, from, m)); + + return true; +} + + +void MDSMonitor::_updated(int from, MMDSBeacon *m) +{ + if (m->get_state() == MDSMap::STATE_BOOT) { + dout(10) << "_updated (booted) mds" << from << " " << *m << endl; + mon->osdmon->send_latest(mdsmap.get_inst(from)); + } else { + dout(10) << "_updated mds" << from << " " << *m << endl; + } + if (m->get_state() == MDSMap::STATE_STOPPED) { + // send the map manually (they're out of the map, so they won't get it automatic) + send_latest(m->get_mds_inst()); + } delete m; } -void MDSMonitor::handle_mds_getmap(MMDSGetMap *m) + +bool MDSMonitor::handle_command(MMonCommand *m) { - dout(7) << "mds_getmap from " << m->get_source() << " " << m->get_source_inst() << endl; - if (mdsmap.get_epoch() > 0) - send_full(m->get_source_inst()); - else - awaiting_map.push_back( m->get_source_inst() ); + int r = -EINVAL; + stringstream ss; + + if (m->cmd.size() > 1) { + if (m->cmd[1] == "stop" && m->cmd.size() > 2) { + int who = atoi(m->cmd[2].c_str()); + if (mdsmap.is_active(who)) { + r = 0; + ss << "telling mds" << who << " to stop"; + pending_mdsmap.mds_state[who] = MDSMap::STATE_STOPPING; + } else { + r = -EEXIST; + ss << "mds" << who << " not active (" << mdsmap.get_state_name(mdsmap.get_state(who)) << ")"; + } + } + else if (m->cmd[1] == "set_target_num" && m->cmd.size() > 2) { + pending_mdsmap.target_num = atoi(m->cmd[2].c_str()); + r = 0; + ss << "target_num = " << pending_mdsmap.target_num << endl; + } + } + if (r == -EINVAL) { + ss << "unrecognized command"; + } + + // reply + string rs; + getline(ss,rs); + mon->messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); + delete m; + return r >= 0; } + void MDSMonitor::bcast_latest_mds() { dout(10) << "bcast_latest_mds " << mdsmap.get_epoch() << endl; @@ -351,26 +421,25 @@ void MDSMonitor::bcast_latest_mds() void MDSMonitor::send_full(entity_inst_t dest) { dout(11) << "send_full to " << dest << endl; - messenger->send_message(new MMDSMap(&mdsmap), dest); + mon->messenger->send_message(new MMDSMap(&mdsmap), dest); } -void MDSMonitor::send_current() +void MDSMonitor::send_to_waiting() { - dout(10) << "mds_send_current " << mdsmap.get_epoch() << endl; - for (list::iterator i = awaiting_map.begin(); - i != awaiting_map.end(); + dout(10) << "send_to_waiting " << mdsmap.get_epoch() << endl; + for (list::iterator i = waiting_for_map.begin(); + i != waiting_for_map.end(); i++) send_full(*i); - awaiting_map.clear(); + waiting_for_map.clear(); } void MDSMonitor::send_latest(entity_inst_t dest) { - // FIXME: check if we're locked, etc. - if (mdsmap.get_epoch() > 0) + if (paxos->is_readable()) send_full(dest); else - awaiting_map.push_back(dest); + waiting_for_map.push_back(dest); } @@ -378,6 +447,11 @@ void MDSMonitor::tick() { // make sure mds's are still alive utime_t now = g_clock.now(); + + // ...if i am an active leader + if (!mon->is_leader()) return; + if (!paxos->is_active()) return; + if (now > g_conf.mds_beacon_grace) { utime_t cutoff = now; cutoff -= g_conf.mds_beacon_grace; @@ -396,16 +470,20 @@ void MDSMonitor::tick() // failure! int newstate; switch (mdsmap.get_state(*p)) { + case MDSMap::STATE_STANDBY: + if (mdsmap.has_created(*p)) + newstate = MDSMap::STATE_STOPPED; + else + newstate = MDSMap::STATE_DNE; + break; + case MDSMap::STATE_CREATING: // didn't finish creating newstate = MDSMap::STATE_DNE; break; - case MDSMap::STATE_STANDBY: - if (mdsmap.has_created(*p)) - newstate = MDSMap::STATE_OUT; - else - newstate = MDSMap::STATE_DNE; + case MDSMap::STATE_STARTING: + newstate = MDSMap::STATE_STOPPED; break; case MDSMap::STATE_REPLAY: @@ -416,11 +494,6 @@ void MDSMonitor::tick() newstate = MDSMap::STATE_FAILED; break; - case MDSMap::STATE_STARTING: - case MDSMap::STATE_STOPPED: - newstate = MDSMap::STATE_OUT; - break; - default: assert(0); } @@ -430,8 +503,8 @@ void MDSMonitor::tick() << endl; // update map - mdsmap.mds_state[*p] = newstate; - mdsmap.mds_state_seq.erase(*p); + pending_mdsmap.mds_state[*p] = newstate; + pending_mdsmap.mds_state_seq.erase(*p); changed = true; } } else { @@ -440,20 +513,29 @@ void MDSMonitor::tick() } } - if (changed) { - issue_map(); - } + if (changed) + propose_pending(); } } void MDSMonitor::do_stop() { + // hrm... + if (!mon->is_leader() || + !paxos->is_active()) { + dout(-10) << "do_stop can't stop right now, mdsmap not writeable" << endl; + return; + } + + dout(10) << "do_stop stopping active mds nodes" << endl; + + print_map(mdsmap); for (map::iterator p = mdsmap.mds_state.begin(); p != mdsmap.mds_state.end(); ++p) if (mdsmap.is_active(p->first)) - mdsmap.mds_state[p->first] = MDSMap::STATE_STOPPING; + pending_mdsmap.mds_state[p->first] = MDSMap::STATE_STOPPING; - issue_map(); + propose_pending(); } diff --git a/branches/sage/cephmds2/mon/MDSMonitor.h b/branches/sage/cephmds2/mon/MDSMonitor.h index 5a9dcb65c8484..082423aec33a0 100644 --- a/branches/sage/cephmds2/mon/MDSMonitor.h +++ b/branches/sage/cephmds2/mon/MDSMonitor.h @@ -24,69 +24,71 @@ using namespace std; #include "mds/MDSMap.h" -class Monitor; +#include "PaxosService.h" -class MDSMonitor : public Dispatcher { - Monitor *mon; - Messenger *messenger; - Mutex &lock; +class MMDSBeacon; - // mds maps +class MDSMonitor : public PaxosService { public: - MDSMap mdsmap; - - private: - bufferlist encoded_map; - - list waiting_for_active; - - //map inc_maps; - //MDSMap::Incremental pending_inc; + // mds maps + MDSMap mdsmap; // current + bufferlist mdsmap_bl; // encoded + + MDSMap pending_mdsmap; // current + pending updates + + // my helpers + void print_map(MDSMap &m); + + class C_Updated : public Context { + MDSMonitor *mm; + int mds; + MMDSBeacon *m; + public: + C_Updated(MDSMonitor *a, int b, MMDSBeacon *c) : + mm(a), mds(b), m(c) {} + void finish(int r) { + if (r >= 0) + mm->_updated(mds, m); // success + else + mm->dispatch((Message*)m); // try again + } + }; + + + // service methods + void create_initial(); + bool update_from_paxos(); + void create_pending(); + void encode_pending(bufferlist &bl); - list awaiting_map; + void _updated(int m, MMDSBeacon *m); + + bool preprocess_query(Message *m); // true if processed. + bool prepare_update(Message *m); + bool should_propose_now(); + + bool preprocess_beacon(class MMDSBeacon *m); + bool handle_beacon(class MMDSBeacon *m); + bool handle_command(class MMonCommand *m); // beacons map last_beacon; - bool is_alive(int mds); +public: + MDSMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } + // sending the map +private: + list waiting_for_map; - // maps - void create_initial(); - void send_current(); // send current map to waiters. - void send_full(entity_inst_t dest); void bcast_latest_mds(); + void send_full(entity_inst_t dest); + void send_to_waiting(); - void issue_map(); - - void save_map(); - void load_map(); - void print_map(); - - //void accept_pending(); // accept pending, new map. - //void send_incremental(epoch_t since, msg_addr_t dest); - - void handle_mds_state(class MMDSState *m); - void handle_mds_beacon(class MMDSBeacon *m); - //void handle_mds_failure(class MMDSFailure *m); - void handle_mds_getmap(class MMDSGetMap *m); - - - - public: - MDSMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l) { - } - - void dispatch(Message *m); - void tick(); // check state, take actions - - void election_starting(); - void election_finished(); - +public: void send_latest(entity_inst_t dest); - void handle_command(class MMonCommand *m, int& r, string& rs); - + void tick(); // check state, take actions void do_stop(); }; diff --git a/branches/sage/cephmds2/mon/MonMap.h b/branches/sage/cephmds2/mon/MonMap.h index cd77bbf3488e6..eb18579cd7e99 100644 --- a/branches/sage/cephmds2/mon/MonMap.h +++ b/branches/sage/cephmds2/mon/MonMap.h @@ -24,7 +24,7 @@ class MonMap { public: - epoch_t epoch; // what epoch of the osd cluster descriptor is this + epoch_t epoch; // what epoch/version of the monmap int num_mon; vector mon_inst; @@ -41,7 +41,7 @@ class MonMap { // choice should be stable, unless we explicitly ask for a new one. int pick_mon(bool newmon=false) { if (newmon || (last_mon < 0)) { - last_mon = 0; //last_mon = rand() % num_mon; + last_mon = rand() % num_mon; } return last_mon; } @@ -68,6 +68,7 @@ class MonMap { _decode(mon_inst, blist, off); } + // read from/write to a file int write(char *fn) { // encode bufferlist bl; diff --git a/branches/sage/cephmds2/mon/Monitor.cc b/branches/sage/cephmds2/mon/Monitor.cc index e92756ba084f0..55af22cb3439b 100644 --- a/branches/sage/cephmds2/mon/Monitor.cc +++ b/branches/sage/cephmds2/mon/Monitor.cc @@ -1,242 +1,252 @@ - // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- - // vim: ts=8 sw=2 smarttab - /* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - // TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer - - #include "Monitor.h" - - #include "osd/OSDMap.h" - - #include "MonitorStore.h" - - #include "msg/Message.h" - #include "msg/Messenger.h" - - #include "messages/MPing.h" - #include "messages/MPingAck.h" - #include "messages/MGenericMessage.h" - #include "messages/MMonCommand.h" - #include "messages/MMonCommandAck.h" - - #include "messages/MMonPaxos.h" - - #include "common/Timer.h" - #include "common/Clock.h" - - #include "OSDMonitor.h" - #include "MDSMonitor.h" - #include "ClientMonitor.h" - - #include "config.h" - #undef dout - #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " - #define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " - - +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ - void Monitor::init() - { - lock.Lock(); +// TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer - dout(1) << "init" << endl; - - // store - char s[80]; - sprintf(s, "mondata/mon%d", whoami); - store = new MonitorStore(s); - - if (g_conf.mkfs) - store->mkfs(); - - store->mount(); - - // create - osdmon = new OSDMonitor(this, messenger, lock); - mdsmon = new MDSMonitor(this, messenger, lock); - clientmon = new ClientMonitor(this, &paxos_clientmap); - - // init paxos - paxos_test.init(); - paxos_osdmap.init(); - paxos_mdsmap.init(); - paxos_clientmap.init(); - - // i'm ready! - messenger->set_dispatcher(this); - - // start ticker - reset_tick(); - - // call election? - if (monmap->num_mon > 1) { - assert(monmap->num_mon != 2); - call_election(); - } else { - // we're standalone. - set q; - q.insert(whoami); - win_election(1, q); - } - - lock.Unlock(); - } - - void Monitor::shutdown() - { - dout(1) << "shutdown" << endl; - - elector.shutdown(); - - // cancel all events - cancel_tick(); - timer.cancel_all(); - timer.join(); - - // stop osds. - for (set::iterator it = osdmon->osdmap.get_osds().begin(); - it != osdmon->osdmap.get_osds().end(); - it++) { - if (osdmon->osdmap.is_down(*it)) continue; - dout(10) << "sending shutdown to osd" << *it << endl; - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - osdmon->osdmap.get_inst(*it)); - } - osdmon->mark_all_down(); - - // monitors too. - for (int i=0; inum_mon; i++) - if (i != whoami) - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - monmap->get_inst(i)); - - // unmount my local storage - if (store) - delete store; - - // clean up - if (monmap) delete monmap; - if (osdmon) delete osdmon; - if (mdsmon) delete mdsmon; - if (clientmon) delete clientmon; - - // die. - messenger->shutdown(); - delete messenger; - } - - - void Monitor::call_election() - { - if (monmap->num_mon == 1) return; - - dout(10) << "call_election" << endl; - state = STATE_STARTING; - - elector.call_election(); - - osdmon->election_starting(); - //mdsmon->election_starting(); - } - - void Monitor::win_election(epoch_t epoch, set& active) - { - state = STATE_LEADER; - leader = whoami; - mon_epoch = epoch; - quorum = active; - dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl; - - // init paxos - paxos_test.leader_init(); - paxos_mdsmap.leader_init(); - paxos_osdmap.leader_init(); - paxos_clientmap.leader_init(); - - // init - osdmon->election_finished(); - mdsmon->election_finished(); - clientmon->election_finished(); - } - - void Monitor::lose_election(epoch_t epoch, int l) - { - state = STATE_PEON; - mon_epoch = epoch; - leader = l; - dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl; - - // init paxos - paxos_test.peon_init(); - paxos_mdsmap.peon_init(); - paxos_osdmap.peon_init(); - paxos_clientmap.peon_init(); - } - - - void Monitor::handle_command(MMonCommand *m) - { - dout(0) << "handle_command " << *m << endl; - - int r = -1; - string rs = "unrecognized command"; - - if (!m->cmd.empty()) { - if (m->cmd[0] == "stop") { - r = 0; - rs = "stopping"; - do_stop(); - } - else if (m->cmd[0] == "mds") { - mdsmon->handle_command(m, r, rs); - } - else if (m->cmd[0] == "osd") { - - } - } - - // reply - messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); - delete m; - } - - - void Monitor::do_stop() - { - dout(0) << "do_stop -- shutting down" << endl; - mdsmon->do_stop(); - } - - - void Monitor::dispatch(Message *m) - { - lock.Lock(); - { - switch (m->get_type()) { - - // misc - case MSG_PING_ACK: - handle_ping_ack((MPingAck*)m); - break; - - case MSG_SHUTDOWN: - if (m->get_source().is_osd()) +#include "Monitor.h" + +#include "osd/OSDMap.h" + +#include "MonitorStore.h" + +#include "msg/Message.h" +#include "msg/Messenger.h" + +#include "messages/MPing.h" +#include "messages/MPingAck.h" +#include "messages/MGenericMessage.h" +#include "messages/MMonCommand.h" +#include "messages/MMonCommandAck.h" + +#include "messages/MMonPaxos.h" + +#include "common/Timer.h" +#include "common/Clock.h" + +#include "OSDMonitor.h" +#include "MDSMonitor.h" +#include "ClientMonitor.h" + +#include "config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " + + + +void Monitor::init() +{ + lock.Lock(); + + dout(1) << "init" << endl; + + // store + char s[80]; + sprintf(s, "mondata/mon%d", whoami); + store = new MonitorStore(s); + + if (g_conf.mkfs) + store->mkfs(); + + store->mount(); + + // create + osdmon = new OSDMonitor(this, &paxos_osdmap); + mdsmon = new MDSMonitor(this, &paxos_mdsmap); + clientmon = new ClientMonitor(this, &paxos_clientmap); + + // init paxos + paxos_test.init(); + paxos_osdmap.init(); + paxos_mdsmap.init(); + paxos_clientmap.init(); + + // i'm ready! + messenger->set_dispatcher(this); + + // start ticker + reset_tick(); + + // call election? + if (monmap->num_mon > 1) { + assert(monmap->num_mon != 2); + call_election(); + } else { + // we're standalone. + set q; + q.insert(whoami); + win_election(1, q); + } + + lock.Unlock(); +} + +void Monitor::shutdown() +{ + dout(1) << "shutdown" << endl; + + elector.shutdown(); + + if (is_leader()) { + // stop osds. + for (set::iterator it = osdmon->osdmap.get_osds().begin(); + it != osdmon->osdmap.get_osds().end(); + it++) { + if (osdmon->osdmap.is_down(*it)) continue; + dout(10) << "sending shutdown to osd" << *it << endl; + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + osdmon->osdmap.get_inst(*it)); + } + osdmon->mark_all_down(); + + // monitors too. + for (int i=0; inum_mon; i++) + if (i != whoami) + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + monmap->get_inst(i)); + } + + // cancel all events + cancel_tick(); + timer.cancel_all(); + timer.join(); + + // unmount my local storage + if (store) + delete store; + + // clean up + if (osdmon) delete osdmon; + if (mdsmon) delete mdsmon; + if (clientmon) delete clientmon; + + // die. + messenger->shutdown(); + delete messenger; +} + + +void Monitor::call_election() +{ + if (monmap->num_mon == 1) return; + + dout(10) << "call_election" << endl; + state = STATE_STARTING; + + // tell paxos + paxos_test.election_starting(); + paxos_mdsmap.election_starting(); + paxos_osdmap.election_starting(); + paxos_clientmap.election_starting(); + + // call a new election + elector.call_election(); +} + +void Monitor::win_election(epoch_t epoch, set& active) +{ + state = STATE_LEADER; + leader = whoami; + mon_epoch = epoch; + quorum = active; + dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl; + + // init paxos + paxos_test.leader_init(); + paxos_mdsmap.leader_init(); + paxos_osdmap.leader_init(); + paxos_clientmap.leader_init(); + + // init + osdmon->election_finished(); + mdsmon->election_finished(); + clientmon->election_finished(); +} + +void Monitor::lose_election(epoch_t epoch, int l) +{ + state = STATE_PEON; + mon_epoch = epoch; + leader = l; + dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl; + + // init paxos + paxos_test.peon_init(); + paxos_mdsmap.peon_init(); + paxos_osdmap.peon_init(); + paxos_clientmap.peon_init(); + + // init + osdmon->election_finished(); + mdsmon->election_finished(); + clientmon->election_finished(); +} + + +void Monitor::handle_command(MMonCommand *m) +{ + dout(0) << "handle_command " << *m << endl; + + int r = -1; + string rs = "unrecognized command"; + + if (!m->cmd.empty()) { + if (m->cmd[0] == "stop") { + r = 0; + rs = "stopping"; + do_stop(); + } + else if (m->cmd[0] == "mds") { + mdsmon->dispatch(m); + return; + } + else if (m->cmd[0] == "osd") { + + } + } + + // reply + messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); + delete m; +} + + +void Monitor::do_stop() +{ + dout(0) << "do_stop -- shutting down" << endl; + mdsmon->do_stop(); +} + + +void Monitor::dispatch(Message *m) +{ + lock.Lock(); + { + switch (m->get_type()) { + + // misc + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; + + case MSG_SHUTDOWN: + if (m->get_source().is_osd()) osdmon->dispatch(m); - else + else handle_shutdown(m); - break; - + case MSG_MON_COMMAND: handle_command((MMonCommand*)m); break; @@ -256,13 +266,6 @@ case MSG_MDS_BEACON: case MSG_MDS_GETMAP: mdsmon->dispatch(m); - - // hackish: did all mds's shut down? - if (g_conf.mon_stop_with_last_mds && - mdsmon->mdsmap.get_num_up_or_failed_mds() == 0 && - is_leader()) - shutdown(); - break; // clients @@ -279,7 +282,7 @@ // sanitize if (pm->epoch > mon_epoch) - assert(0); //call_election(); // wtf + call_election(); if (pm->epoch != mon_epoch) { delete pm; break; diff --git a/branches/sage/cephmds2/mon/MonitorStore.cc b/branches/sage/cephmds2/mon/MonitorStore.cc index cbbfba0892898..d260dfd7604e4 100644 --- a/branches/sage/cephmds2/mon/MonitorStore.cc +++ b/branches/sage/cephmds2/mon/MonitorStore.cc @@ -134,6 +134,7 @@ bool MonitorStore::exists_bl_ss(const char *a, const char *b) struct stat st; int r = ::stat(fn, &st); + //dout(15) << "exists_bl stat " << fn << " r=" << r << " errno " << errno << " " << strerror(errno) << endl; return r == 0; } diff --git a/branches/sage/cephmds2/mon/OSDMonitor.cc b/branches/sage/cephmds2/mon/OSDMonitor.cc index 26696c929c236..11b2f59d1771d 100644 --- a/branches/sage/cephmds2/mon/OSDMonitor.cc +++ b/branches/sage/cephmds2/mon/OSDMonitor.cc @@ -36,10 +36,12 @@ #include "config.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(" << (state == STATE_INIT ? (const char*)"init":(state == STATE_SYNC ? (const char*)"sync":(state == STATE_LOCK ? (const char*)"lock":(state == STATE_UPDATING ? (const char*)"updating":(const char*)"?\?")))) << ") e" << osdmap.get_epoch() << " " -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(" << (state == STATE_INIT ? (const char*)"init":(state == STATE_SYNC ? (const char*)"sync":(state == STATE_LOCK ? (const char*)"lock":(state == STATE_UPDATING ? (const char*)"updating":(const char*)"?\?")))) << ") e" << osdmap.get_epoch() << " " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(e" << osdmap.get_epoch() << ") " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".osd(e" << osdmap.get_epoch() << ") " +// FAKING + class C_Mon_FakeOSDFailure : public Context { OSDMonitor *mon; int osd; @@ -51,16 +53,30 @@ public: } }; +void OSDMonitor::fake_osd_failure(int osd, bool down) +{ + if (down) { + dout(1) << "fake_osd_failure DOWN osd" << osd << endl; + pending_inc.new_down[osd] = osdmap.osd_inst[osd]; + } else { + dout(1) << "fake_osd_failure OUT osd" << osd << endl; + pending_inc.new_out.push_back(osd); + } + propose_pending(); + + // fixme + //bcast_latest_osd(); + //bcast_latest_mds(); +} void OSDMonitor::fake_osdmap_update() { dout(1) << "fake_osdmap_update" << endl; - accept_pending(); + propose_pending(); // tell a random osd int osd = rand() % g_conf.num_osd; - send_incremental(osdmap.get_epoch()-1, // ick! FIXME - osdmap.get_inst(osd)); + send_latest(osdmap.get_inst(osd)); } @@ -76,56 +92,28 @@ void OSDMonitor::fake_reorg() pending_inc.new_out.push_back(r); } - accept_pending(); - - // tell him! - send_incremental(osdmap.get_epoch()-1, osdmap.get_inst(r)); - - // do it again? - /* - if (g_conf.num_osd - d > 4 && - g_conf.num_osd - d > g_conf.num_osd/2) - mon->timer.add_event_after(g_conf.fake_osdmap_expand, - new C_Mon_Faker(this)); - */ + propose_pending(); + send_latest(osdmap.get_inst(r)); // after } -/* -void OSDMonitor::init() -{ - // start with blank map - - // load my last state from the store - bufferlist bl; - if (get_map_bl(0, bl)) { // FIXME - // yay! - osdmap.decode(bl); - dout(1) << "init got epoch " << osdmap.get_epoch() << " from store" << endl; - - // set up pending_inc - pending_inc.epoch = osdmap.get_epoch()+1; - } -} -*/ - - - - /************ MAPS ****************/ - void OSDMonitor::create_initial() { - dout(1) << "create_initial generating osdmap from g_conf" << endl; + assert(mon->is_leader()); + assert(paxos->get_version() == 0); + + dout(1) << "create_initial -- creating initial osdmap from g_conf" << endl; // - osdmap.mon_epoch = mon->mon_epoch; - osdmap.ctime = g_clock.now(); + OSDMap newmap; + newmap.mon_epoch = mon->mon_epoch; + newmap.ctime = g_clock.now(); if (g_conf.osd_pg_bits) { - osdmap.set_pg_bits(g_conf.osd_pg_bits); + newmap.set_pg_bits(g_conf.osd_pg_bits); } else { // figure out how many bits worth of osds we have. // 1 osd -> 0 bits @@ -140,12 +128,12 @@ void OSDMonitor::create_initial() } // 7 bits per osd. - osdmap.set_pg_bits(osdbits + 4); // FIXME + newmap.set_pg_bits(osdbits + 4); // FIXME } - // start at epoch 0 until all osds boot - //osdmap.inc_epoch(); // = 1 - //assert(osdmap.get_epoch() == 1); + // start at epoch 1 until all osds boot + newmap.inc_epoch(); // = 1 + assert(newmap.get_epoch() == 1); if (g_conf.num_osd >= 12) { int ndom = g_conf.osd_max_rep; @@ -153,7 +141,7 @@ void OSDMonitor::create_initial() int domid[ndom]; for (int i=0; iadd_item(i, 1.0); //cerr << "osd" << i << " in domain " << dom << endl; i++; @@ -176,40 +165,43 @@ void OSDMonitor::create_initial() //cerr << "dom " << i << " w " << domain[i]->get_weight() << endl; root->add_item(domid[i], domain[i]->get_weight()); } - int nroot = osdmap.crush.add_bucket(root); + int nroot = newmap.crush.add_bucket(root); // rules for (int i=1; i<=ndom; i++) { - osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); - osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1)); - osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0)); - osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + newmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); + newmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1)); + newmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0)); + newmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } // test //vector out; - //osdmap.pg_to_osds(0x40200000110ULL, out); + //newmap.pg_to_osds(0x40200000110ULL, out); } else { // one bucket Bucket *b = new UniformBucket(1, 0); - int root = osdmap.crush.add_bucket(b); + int root = newmap.crush.add_bucket(b); for (int i=0; iadd_item(i, 1.0); } for (int i=1; i<=g_conf.osd_max_rep; i++) { - osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); - osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0)); - osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + newmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + newmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0)); + newmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } } if (g_conf.mds_local_osd) { // add mds osds, but don't put them in the crush mapping func - for (int i=0; i @@ -227,215 +219,321 @@ void OSDMonitor::create_initial() dout(0) << "will fake osd" << i->first << " OUT after " << i->second << endl; mon->timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 0)); } -} - -bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl) -{ - if (!mon->store->exists_bl_sn("osdmap_full", epoch)) - return false; - int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); - assert(r > 0); - return true; + // encode into pending incremental + newmap.encode(pending_inc.fullmap); } -bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl) +bool OSDMonitor::update_from_paxos() { - if (!mon->store->exists_bl_sn("osdmap_inc", epoch)) - return false; - int r = mon->store->get_bl_sn(bl, "osdmap_inc", epoch); - assert(r > 0); - return true; -} + assert(paxos->is_active()); + version_t paxosv = paxos->get_version(); + dout(15) << "update_from_paxos paxos e " << paxosv + << ", my e " << osdmap.epoch << endl; -void OSDMonitor::save_map() -{ - bufferlist bl; - osdmap.encode(bl); + if (paxosv == osdmap.epoch) return true; + assert(paxosv >= osdmap.epoch); - mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch()); - mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); + if (osdmap.epoch == 0 && paxosv > 1) { + // startup: just load latest full map + epoch_t lastfull = mon->store->get_int("osdmap_full","last_epoch"); + if (lastfull) { + dout(7) << "update_from_paxos startup: loading latest full map e" << lastfull << endl; + bufferlist bl; + mon->store->get_bl_sn(bl, "osdmap_full", lastfull); + osdmap.decode(bl); + } + } + + // walk through incrementals + while (paxosv > osdmap.epoch) { + bufferlist bl; + bool success = paxos->read(osdmap.epoch+1, bl); + assert(success); + + dout(7) << "update_from_paxos applying incremental " << osdmap.epoch+1 << endl; + OSDMap::Incremental inc; + int off = 0; + inc.decode(bl, off); + osdmap.apply_incremental(inc); + + // write out the full map, too. + bl.clear(); + osdmap.encode(bl); + mon->store->put_bl_sn(bl, "osdmap_full", osdmap.epoch); + } + mon->store->put_int(osdmap.epoch, "osdmap_full","last_epoch"); + + // new map! + bcast_latest_mds(); + + return true; } -void OSDMonitor::save_inc_map(OSDMap::Incremental &inc) + +void OSDMonitor::create_pending() { - bufferlist bl; - osdmap.encode(bl); + pending_inc = OSDMap::Incremental(osdmap.epoch+1); + dout(10) << "create_pending e " << pending_inc.epoch + << endl; +} - bufferlist incbl; - inc.encode(incbl); +void OSDMonitor::encode_pending(bufferlist &bl) +{ + dout(10) << "encode_pending e " << pending_inc.epoch + << endl; + + // finish up pending_inc + pending_inc.ctime = g_clock.now(); + pending_inc.mon_epoch = mon->mon_epoch; + + // tell me about it + for (map::iterator i = pending_inc.new_up.begin(); + i != pending_inc.new_up.end(); + i++) { + dout(0) << " osd" << i->first << " UP " << i->second << endl; + derr(0) << " osd" << i->first << " UP " << i->second << endl; + } + for (map::iterator i = pending_inc.new_down.begin(); + i != pending_inc.new_down.end(); + i++) { + dout(0) << " osd" << i->first << " DOWN " << i->second << endl; + derr(0) << " osd" << i->first << " DOWN " << i->second << endl; + mon->messenger->mark_down(i->second.addr); + } + for (list::iterator i = pending_inc.new_in.begin(); + i != pending_inc.new_in.end(); + i++) { + dout(0) << " osd" << *i << " IN" << endl; + derr(0) << " osd" << *i << " IN" << endl; + } + for (list::iterator i = pending_inc.new_out.begin(); + i != pending_inc.new_out.end(); + i++) { + dout(0) << " osd" << *i << " OUT" << endl; + derr(0) << " osd" << *i << " OUT" << endl; + } - mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch()); - mon->store->put_bl_sn(incbl, "osdmap_inc", osdmap.get_epoch()); - mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); + // encode + assert(paxos->get_version() + 1 == pending_inc.epoch); + pending_inc.encode(bl); } +// ------------- -void OSDMonitor::dispatch(Message *m) +bool OSDMonitor::preprocess_query(Message *m) { + dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl; + switch (m->get_type()) { - - // services + // READs case MSG_OSD_GETMAP: handle_osd_getmap((MOSDGetMap*)m); - break; + return true; + + // damp updates case MSG_OSD_FAILURE: - handle_osd_failure((MOSDFailure*)m); - break; + return preprocess_failure((MOSDFailure*)m); case MSG_OSD_BOOT: - handle_osd_boot((MOSDBoot*)m); - break; + return preprocess_boot((MOSDBoot*)m); + /* case MSG_OSD_IN: - handle_osd_in((MOSDIn*)m); - break; + return preprocess_in((MOSDIn*)m); case MSG_OSD_OUT: - handle_osd_out((MOSDOut*)m); - break; - - // replication - case MSG_MON_OSDMAP_INFO: - handle_info((MMonOSDMapInfo*)m); - break; - case MSG_MON_OSDMAP_LEASE: - handle_lease((MMonOSDMapLease*)m); - break; - case MSG_MON_OSDMAP_LEASE_ACK: - handle_lease_ack((MMonOSDMapLeaseAck*)m); - break; - case MSG_MON_OSDMAP_UPDATE_PREPARE: - handle_update_prepare((MMonOSDMapUpdatePrepare*)m); - break; - case MSG_MON_OSDMAP_UPDATE_ACK: - handle_update_ack((MMonOSDMapUpdateAck*)m); - break; - case MSG_MON_OSDMAP_UPDATE_COMMIT: - handle_update_commit((MMonOSDMapUpdateCommit*)m); - break; + return preprocess_out((MOSDOut*)m); + */ default: assert(0); + delete m; + return true; + } +} + +bool OSDMonitor::prepare_update(Message *m) +{ + dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl; + + switch (m->get_type()) { + // damp updates + case MSG_OSD_FAILURE: + return prepare_failure((MOSDFailure*)m); + case MSG_OSD_BOOT: + return prepare_boot((MOSDBoot*)m); + + /* + case MSG_OSD_IN: + return prepare_in((MOSDIn*)m); + case MSG_OSD_OUT: + return prepare_out((MOSDOut*)m); + */ + + default: + assert(0); + delete m; } + + return false; +} + +bool OSDMonitor::should_propose_now() +{ + // don't propose initial map until _all_ osds boot. + //dout(10) << "should_propose_now " << pending_inc.new_up.size() << " vs " << osdmap.get_osds().size() << endl; + if (osdmap.epoch == 1 && + pending_inc.new_up.size() < osdmap.get_osds().size()) + return false; // not all up (yet) + + // FIXME do somethihng smart here. + return true; } -void OSDMonitor::handle_osd_failure(MOSDFailure *m) +// --------------------------- +// READs + +void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) { - dout(1) << "osd failure: " << m->get_failed() << " from " << m->get_source() << endl; + dout(7) << "osd_getmap from " << m->get_source() << " since " << m->get_since() << endl; - // FIXME - // take their word for it - int from = m->get_failed().name.num(); - if (osdmap.is_up(from) && - (osdmap.osd_inst.count(from) == 0 || - osdmap.osd_inst[from] == m->get_failed())) { - pending_inc.new_down[from] = m->get_failed(); - - if (osdmap.is_in(from)) - down_pending_out[from] = g_clock.now(); - - //awaiting_maps[pending_inc.epoch][m->get_source()] = - - accept_pending(); - - send_incremental(m->get_epoch(), m->get_source_inst()); - - send_waiting(); - bcast_latest_mds(); - } + //if (m->get_since()) + send_incremental(m->get_since(), m->get_source_inst()); + //else + //send_full(m->get_source_inst()); delete m; } -void OSDMonitor::fake_osd_failure(int osd, bool down) + +// --------------------------- +// UPDATEs + +// failure -- + +bool OSDMonitor::preprocess_failure(MOSDFailure *m) { - if (down) { - dout(1) << "fake_osd_failure DOWN osd" << osd << endl; - pending_inc.new_down[osd] = osdmap.osd_inst[osd]; - } else { - dout(1) << "fake_osd_failure OUT osd" << osd << endl; - pending_inc.new_out.push_back(osd); + int badboy = m->get_failed().name.num(); + + // weird? + if (!osdmap.have_inst(badboy)) { + dout(5) << "preprocess_failure dne(/dup?): " << m->get_failed() << ", from " << m->get_from() << endl; + send_incremental(m->get_epoch(), m->get_from()); + return true; } - accept_pending(); - bcast_latest_osd(); - bcast_latest_mds(); + if (osdmap.get_inst(badboy) != m->get_failed()) { + dout(5) << "preprocess_failure wrong osd: report " << m->get_failed() << " != map's " << osdmap.get_inst(badboy) + << ", from " << m->get_from() << endl; + send_incremental(m->get_epoch(), m->get_from()); + return true; + } + // already reported? + if (osdmap.is_down(badboy)) { + dout(5) << "preprocess_failure dup: " << m->get_failed() << ", from " << m->get_from() << endl; + send_incremental(m->get_epoch(), m->get_from()); + return true; + } + + dout(10) << "preprocess_failure new: " << m->get_failed() << ", from " << m->get_from() << endl; + return false; } -void OSDMonitor::mark_all_down() +bool OSDMonitor::prepare_failure(MOSDFailure *m) { - dout(7) << "mark_all_down" << endl; + dout(1) << "prepare_failure " << m->get_failed() << " from " << m->get_from() << endl; + + // FIXME + // take their word for it + int badboy = m->get_failed().name.num(); + assert(osdmap.is_up(badboy)); + assert(osdmap.osd_inst[badboy] == m->get_failed()); + + pending_inc.new_down[badboy] = m->get_failed(); + + if (osdmap.is_in(badboy)) + down_pending_out[badboy] = g_clock.now(); - for (set::iterator it = osdmap.get_osds().begin(); - it != osdmap.get_osds().end(); - it++) { - if (osdmap.is_down(*it)) continue; - pending_inc.new_down[*it] = osdmap.get_inst(*it); - } - accept_pending(); + paxos->wait_for_commit(new C_Reported(this, m)); + + return true; } +void OSDMonitor::_reported_failure(MOSDFailure *m) +{ + dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_from() << endl; + send_latest(m->get_from(), m->get_epoch()); +} +// boot -- -void OSDMonitor::handle_osd_boot(MOSDBoot *m) +bool OSDMonitor::preprocess_boot(MOSDBoot *m) { - dout(7) << "osd_boot from " << m->get_source() << endl; - assert(m->get_source().is_osd()); - int from = m->get_source().num(); + assert(m->inst.name.is_osd()); + int from = m->inst.name.num(); - if (osdmap.get_epoch() == 0) { - // waiting for boot! - osdmap.osd_inst[from] = m->get_source_inst(); - - if (osdmap.osd_inst.size() == osdmap.osds.size()) { - dout(-7) << "osd_boot all osds booted." << endl; - osdmap.inc_epoch(); - - save_map(); - - pending_inc.epoch = osdmap.get_epoch()+1; // 2 - - bcast_latest_osd(); - bcast_latest_mds(); - send_waiting(); - } else { - dout(7) << "osd_boot waiting for " - << (osdmap.osds.size() - osdmap.osd_inst.size()) - << " osds to boot" << endl; - } + // already booted? + if (osdmap.is_up(from) && + osdmap.get_inst(from) == m->inst) { + // yup. + dout(7) << "preprocess_boot dup from " << m->inst << endl; + _booted(m); + return true; + } + + dout(10) << "preprocess_boot from " << m->inst << endl; + return false; +} +bool OSDMonitor::prepare_boot(MOSDBoot *m) +{ + dout(7) << "prepare_boot from " << m->inst << endl; + assert(m->inst.name.is_osd()); + int from = m->inst.name.num(); + + // does this osd exist? + if (!osdmap.exists(from)) { + dout(1) << "boot from non-existent osd" << from << endl; delete m; - return; + return true; } - + // already up? mark down first? if (osdmap.is_up(from)) { + assert(osdmap.get_inst(from) != m->inst); // preproces should have caught it + + // mark previous guy down pending_inc.new_down[from] = osdmap.osd_inst[from]; - accept_pending(); } - // mark up. - down_pending_out.erase(from); - assert(osdmap.is_down(from)); - pending_inc.new_up[from] = m->get_source_inst(); + // mark new guy up. + down_pending_out.erase(from); // if any + pending_inc.new_up[from] = m->inst; // mark in? if (osdmap.out_osds.count(from)) pending_inc.new_in.push_back(from); - accept_pending(); - - // the booting osd will spread word - send_incremental(m->sb.current_epoch, m->get_source_inst()); - delete m; + // wait + paxos->wait_for_commit(new C_Booted(this, m)); - // tell mds - bcast_latest_mds(); + return true; +} + +void OSDMonitor::_booted(MOSDBoot *m) +{ + dout(7) << "_booted " << m->inst << endl; + send_latest(m->inst, m->sb.current_epoch); + delete m; } + +// in -- + +/* void OSDMonitor::handle_osd_in(MOSDIn *m) { dout(7) << "osd_in from " << m->get_source() << endl; @@ -457,74 +555,16 @@ void OSDMonitor::handle_osd_out(MOSDOut *m) send_incremental(m->map_epoch, m->get_source_inst()); } } - -void OSDMonitor::handle_osd_getmap(MOSDGetMap *m) -{ - dout(7) << "osd_getmap from " << m->get_source() << " since " << m->get_since() << endl; - - if (osdmap.get_epoch() == 0) { - awaiting_map[m->get_source()].first = m->get_source_inst(); - awaiting_map[m->get_source()].second = m->get_since(); - } else { - //if (m->get_since()) - send_incremental(m->get_since(), m->get_source_inst()); - //else - //send_full(m->get_source(), m->get_source_inst()); - } - delete m; -} - +*/ -void OSDMonitor::accept_pending() -{ - dout(-10) << "accept_pending " << osdmap.get_epoch() << " -> " << pending_inc.epoch << endl; - // accept pending into a new map! - pending_inc.ctime = g_clock.now(); - pending_inc.mon_epoch = mon->mon_epoch; +// --------------- +// map helpers - // advance! - osdmap.apply_incremental(pending_inc); - - // save it. - save_inc_map( pending_inc ); - - // tell me about it - for (map::iterator i = pending_inc.new_up.begin(); - i != pending_inc.new_up.end(); - i++) { - dout(0) << "osd" << i->first << " UP " << i->second << endl; - derr(0) << "osd" << i->first << " UP " << i->second << endl; - } - for (map::iterator i = pending_inc.new_down.begin(); - i != pending_inc.new_down.end(); - i++) { - dout(0) << "osd" << i->first << " DOWN " << i->second << endl; - derr(0) << "osd" << i->first << " DOWN " << i->second << endl; - messenger->mark_down(i->second.addr); - } - for (list::iterator i = pending_inc.new_in.begin(); - i != pending_inc.new_in.end(); - i++) { - dout(0) << "osd" << *i << " IN" << endl; - derr(0) << "osd" << *i << " IN" << endl; - } - for (list::iterator i = pending_inc.new_out.begin(); - i != pending_inc.new_out.end(); - i++) { - dout(0) << "osd" << *i << " OUT" << endl; - derr(0) << "osd" << *i << " OUT" << endl; - } - - // clear new pending - OSDMap::Incremental next(osdmap.get_epoch() + 1); - pending_inc = next; -} - -void OSDMonitor::send_waiting() +void OSDMonitor::send_to_waiting() { - dout(10) << "send_waiting " << osdmap.get_epoch() << endl; + dout(10) << "send_to_waiting " << osdmap.get_epoch() << endl; for (map >::iterator i = awaiting_map.begin(); i != awaiting_map.end(); @@ -536,26 +576,31 @@ void OSDMonitor::send_waiting() } } - -void OSDMonitor::send_latest(entity_inst_t who) +void OSDMonitor::send_latest(entity_inst_t who, epoch_t since) { - // FIXME this is super naive - if (osdmap.get_epoch() == 0) { - awaiting_map[who.name].first = who; - awaiting_map[who.name].second = 0; + if (paxos->is_readable()) { + dout(5) << "send_latest to " << who << " now" << endl; + if (since == (epoch_t)(-1)) + send_full(who); + else + send_incremental(since, who); } else { - send_full(who); + dout(5) << "send_latest to " << who << " later" << endl; + awaiting_map[who.name].first = who; + awaiting_map[who.name].second = since; } } + void OSDMonitor::send_full(entity_inst_t who) { - messenger->send_message(new MOSDMap(&osdmap), who); + dout(5) << "send_full to " << who << endl; + mon->messenger->send_message(new MOSDMap(&osdmap), who); } void OSDMonitor::send_incremental(epoch_t since, entity_inst_t dest) { - dout(5) << "osd_send_incremental " << since << " -> " << osdmap.get_epoch() + dout(5) << "send_incremental " << since << " -> " << osdmap.get_epoch() << " to " << dest << endl; MOSDMap *m = new MOSDMap; @@ -564,12 +609,12 @@ void OSDMonitor::send_incremental(epoch_t since, entity_inst_t dest) e > since; e--) { bufferlist bl; - if (get_inc_map_bl(e, bl)) { - dout(10) << "osd_send_incremental inc " << e << endl; + if (mon->store->get_bl_sn(bl, "osdmap", e) > 0) { + dout(20) << "send_incremental inc " << e << " " << bl.length() << " bytes" << endl; m->incremental_maps[e] = bl; } - else if (get_map_bl(e, bl)) { - dout(10) << "osd_send_incremental full " << e << endl; + else if (mon->store->get_bl_sn(bl, "osdmap_full", e) > 0) { + dout(20) << "send_incremental full " << e << endl; m->maps[e] = bl; } else { @@ -577,11 +622,10 @@ void OSDMonitor::send_incremental(epoch_t since, entity_inst_t dest) } } - messenger->send_message(m, dest); + mon->messenger->send_message(m, dest); } - void OSDMonitor::bcast_latest_mds() { epoch_t e = osdmap.get_epoch(); @@ -614,6 +658,24 @@ void OSDMonitor::bcast_latest_osd() } } +void OSDMonitor::bcast_full_osd() +{ + epoch_t e = osdmap.get_epoch(); + dout(1) << "bcast_full_osd epoch " << e << endl; + + // tell osds + set osds; + osdmap.get_all_osds(osds); + for (set::iterator it = osds.begin(); + it != osds.end(); + it++) { + if (osdmap.is_down(*it)) continue; + send_full(osdmap.get_inst(*it)); + } +} + + +// TICK void OSDMonitor::tick() @@ -639,281 +701,93 @@ void OSDMonitor::tick() pending_inc.new_out.push_back( *i ); } if (!mark_out.empty()) { - accept_pending(); - - // hrmpf. bcast map for now. FIXME FIXME. - bcast_latest_osd(); + propose_pending(); } } -void OSDMonitor::election_starting() -{ - dout(10) << "election_starting" << endl; -} -void OSDMonitor::election_finished() -{ - dout(10) << "election_finished" << endl; - if (mon->is_leader()) { - if (g_conf.mkfs) { - create_initial(); - save_map(); - } else { - // - epoch_t epoch = mon->store->get_int("osd_epoch"); - dout(10) << " last epoch was " << epoch << endl; - bufferlist bl, blinc; - int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); - assert(r>0); - osdmap.decode(bl); - // pending_inc - pending_inc.epoch = epoch+1; - } - } +/* +void OSDMonitor::init() +{ + // start with blank map - /* - state = STATE_INIT; + // load my last state from the store + bufferlist bl; + if (get_map_bl(0, bl)) { // FIXME + // yay! + osdmap.decode(bl); + dout(1) << "init got epoch " << osdmap.get_epoch() << " from store" << endl; - // map? - if (osdmap.get_epoch() == 0 && - mon->is_leader()) { - create_initial(); + // set up pending_inc + pending_inc.epoch = osdmap.get_epoch()+1; } +} +*/ - if (mon->is_leader()) { - // leader. - if (mon->monmap->num_mon == 1) { - // hmm, it's just me! - state = STATE_SYNC; - } - } - else if (mon->is_peon()) { - // peon. send info - //messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch), - // mon->monmap->get_inst(mon->leader)); - } - */ -} +void OSDMonitor::mark_all_down() +{ + assert(mon->is_leader()); + dout(7) << "mark_all_down" << endl; -void OSDMonitor::handle_info(MMonOSDMapInfo *m) -{ - dout(10) << "handle_info from " << m->get_source() - << " epoch " << m->get_epoch() << " in mon_epoch " << m->get_mon_epoch() - << endl; - - epoch_t epoch = m->get_epoch(); - - // did they have anything? - if (epoch > 0) { - // make sure it's current. - if (epoch == osdmap.get_epoch()) { - if (osdmap.mon_epoch != m->get_mon_epoch()) { - dout(10) << "handle_info had divergent epoch " << m->get_epoch() - << ", mon_epoch " << m->get_mon_epoch() << " != " << osdmap.mon_epoch << endl; - epoch--; - } - } else { - bufferlist bl; - get_map_bl(epoch, bl); - - OSDMap old; - old.decode(bl); - - if (old.mon_epoch != m->get_mon_epoch()) { - dout(10) << "handle_info had divergent epoch " << m->get_epoch() - << ", mon_epoch " << m->get_mon_epoch() << " != " << old.mon_epoch << endl; - epoch--; - } - } + for (set::iterator it = osdmap.get_osds().begin(); + it != osdmap.get_osds().end(); + it++) { + if (osdmap.is_down(*it)) continue; + pending_inc.new_down[*it] = osdmap.get_inst(*it); } - - // bring up to date - if (epoch < osdmap.get_epoch()) - send_incremental(epoch, m->get_source_inst()); - - delete m; + + propose_pending(); } -void OSDMonitor::issue_leases() -{ - dout(10) << "issue_leases" << endl; - assert(mon->is_leader()); - // set lease endpoint - lease_expire = g_clock.now(); - lease_expire += g_conf.mon_lease; - pending_ack.clear(); - - for (set::iterator i = mon->quorum.begin(); - i != mon->quorum.end(); - i++) { - if (*i == mon->whoami) continue; - messenger->send_message(new MMonOSDMapLease(osdmap.get_epoch(), lease_expire), - mon->monmap->get_inst(*i)); - pending_ack.insert(*i); - } -} -void OSDMonitor::handle_lease(MMonOSDMapLease *m) -{ - if (m->get_epoch() != osdmap.get_epoch() + 1) { - dout(10) << "map_lease from " << m->get_source() - << " on epoch " << m->get_epoch() << ", but i am " << osdmap.get_epoch() << endl; - assert(0); - delete m; - return; - } - - dout(10) << "map_lease from " << m->get_source() << " expires " << lease_expire << endl; - lease_expire = m->get_lease_expire(); - - delete m; -} -void OSDMonitor::handle_lease_ack(MMonOSDMapLeaseAck *m) -{ - // right epoch? - if (m->get_epoch() != osdmap.get_epoch()) { - dout(10) << "map_lease_ack from " << m->get_source() - << " on old epoch " << m->get_epoch() << ", dropping" << endl; - delete m; - return; - } - - // within time limit? - if (g_clock.now() >= lease_expire) { - dout(10) << "map_lease_ack from " << m->get_source() - << ", but lease expired, calling election" << endl; - mon->call_election(); - delete m; - return; - } - - assert(m->get_source().is_mon()); - int from = m->get_source().num(); - assert(pending_ack.count(from)); - pending_ack.erase(from); - if (pending_ack.empty()) { - dout(10) << "map_lease_ack from " << m->get_source() - << ", last one" << endl; - } else { - dout(10) << "map_lease_ack from " << m->get_source() - << ", still waiting on " << pending_ack << endl; - } - - delete m; -} -void OSDMonitor::update_map() -{ - // lock map - state = STATE_UPDATING; - pending_ack.clear(); - - // set lease endpoint - lease_expire += g_conf.mon_lease; - // send prepare - epoch_t epoch = osdmap.get_epoch(); - bufferlist map_bl, inc_map_bl; - if (!get_inc_map_bl(epoch, inc_map_bl)) - get_map_bl(epoch, map_bl); - for (set::iterator i = mon->quorum.begin(); - i != mon->quorum.end(); - i++) { - if (*i == mon->whoami) continue; - messenger->send_message(new MMonOSDMapUpdatePrepare(epoch, - map_bl, inc_map_bl), - mon->monmap->get_inst(*i)); - pending_ack.insert(*i); - } -} -void OSDMonitor::handle_update_prepare(MMonOSDMapUpdatePrepare *m) -{ - dout(10) << "map_update_prepare from " << m->get_source() << " epoch " << m->get_epoch() << endl; - // accept map - assert(m->get_epoch() == osdmap.get_epoch() + 1); - - if (m->inc_map_bl.length()) { - int off = 0; - pending_inc.decode(m->inc_map_bl, off); - accept_pending(); - } else { - osdmap.decode(m->map_bl); - } - - // state - state = STATE_LOCK; - //lease_expire = m->lease_expire; - - // ack - messenger->send_message(new MMonOSDMapUpdateAck(osdmap.get_epoch()), - m->get_source_inst()); - delete m; -} +/* -void OSDMonitor::handle_update_ack(MMonOSDMapUpdateAck *m) + +void OSDMonitor::election_finished() { - /* - // right epoch? - if (m->get_epoch() != osdmap.get_epoch()) { - dout(10) << "map_update_ack from " << m->get_source() - << " on old epoch " << m->get_epoch() << ", dropping" << endl; - delete m; - return; - } + dout(10) << "election_finished" << endl; - // within time limit? - if (g_clock.now() >= lease_expire) { - dout(10) << "map_update_ack from " << m->get_source() - << ", but lease expired, calling election" << endl; - state = STATE_SYNC; - mon->call_election(); - return; - } + if (mon->is_leader()) { + if (g_conf.mkfs) { + create_initial(); + save_map(); + } else { + // + epoch_t epoch = mon->store->get_int("osd_epoch"); + dout(10) << " last epoch was " << epoch << endl; + bufferlist bl, blinc; + int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); + assert(r>0); + osdmap.decode(bl); - assert(m->get_source().is_mon()); - int from = m->get_source().num(); + // pending_inc + pending_inc.epoch = epoch+1; + } - assert(pending_lease_ack.count(from)); - pending_lease_ack.erase(from); - - if (pending_lease_ack.empty()) { - dout(10) << "map_update_ack from " << m->get_source() - << ", last one" << endl; - state = STATE_SYNC; - - // send lease commit - for (map::iterator i = mon->quorum.begin(); - i != mon->quorum.end(); - i++) { - if (i == mon->whoami) continue; - messenger->send_message(new MMonOSDMapLeaseCommit(osdmap), - MSG_ADDR_MON(*i), mon->monmap->get_inst(*i)); - } - } else { - dout(10) << "map_update_ack from " << m->get_source() - << ", still waiting on " << pending_lease_ack << endl; } -*/ -} -void OSDMonitor::handle_update_commit(MMonOSDMapUpdateCommit *m) -{ } + + + +*/ diff --git a/branches/sage/cephmds2/mon/OSDMonitor.h b/branches/sage/cephmds2/mon/OSDMonitor.h index 000a79f4024bc..59424a6fbe9e8 100644 --- a/branches/sage/cephmds2/mon/OSDMonitor.h +++ b/branches/sage/cephmds2/mon/OSDMonitor.h @@ -25,85 +25,96 @@ using namespace std; #include "osd/OSDMap.h" -class Monitor; +#include "PaxosService.h" -class OSDMonitor : public Dispatcher { - Monitor *mon; - Messenger *messenger; - Mutex &lock; +class Monitor; +class MOSDBoot; - // osd maps +class OSDMonitor : public PaxosService { public: OSDMap osdmap; private: map > awaiting_map; - - void create_initial(); - bool get_map_bl(epoch_t epoch, bufferlist &bl); - bool get_inc_map_bl(epoch_t epoch, bufferlist &bl); - - void save_map(); - void save_inc_map(OSDMap::Incremental &inc); // [leader] OSDMap::Incremental pending_inc; map down_pending_out; // osd down -> out - set pending_ack; + // svc + void create_initial(); + bool update_from_paxos(); + void create_pending(); // prepare a new pending + void encode_pending(bufferlist &bl); - // we are distributed - const static int STATE_INIT = 0; // startup - const static int STATE_SYNC = 1; // sync map copy (readonly) - const static int STATE_LOCK = 2; // [peon] map locked - const static int STATE_UPDATING = 3; // [leader] map locked, waiting for peon ack + void handle_query(Message *m); + bool preprocess_query(Message *m); // true if processed. + bool prepare_update(Message *m); + bool should_propose_now(); - int state; - utime_t lease_expire; // when lease expires + // ... + bool get_map_bl(epoch_t epoch, bufferlist &bl); + bool get_inc_map_bl(epoch_t epoch, bufferlist &bl); - //void init(); - - // maps - void accept_pending(); // accept pending, new map. - void send_waiting(); // send current map to waiters. + void send_to_waiting(); // send current map to waiters. void send_full(entity_inst_t dest); void send_incremental(epoch_t since, entity_inst_t dest); void bcast_latest_mds(); void bcast_latest_osd(); - - void update_map(); - - void handle_osd_boot(class MOSDBoot *m); - void handle_osd_in(class MOSDIn *m); - void handle_osd_out(class MOSDOut *m); - void handle_osd_failure(class MOSDFailure *m); + void bcast_full_osd(); + void handle_osd_getmap(class MOSDGetMap *m); - void handle_info(class MMonOSDMapInfo*); - void handle_lease(class MMonOSDMapLease*); - void handle_lease_ack(class MMonOSDMapLeaseAck*); - void handle_update_prepare(class MMonOSDMapUpdatePrepare*); - void handle_update_ack(class MMonOSDMapUpdateAck*); - void handle_update_commit(class MMonOSDMapUpdateCommit*); + bool preprocess_failure(class MOSDFailure *m); + bool prepare_failure(class MOSDFailure *m); + void _reported_failure(MOSDFailure *m); + + bool preprocess_boot(class MOSDBoot *m); + bool prepare_boot(class MOSDBoot *m); + void _booted(MOSDBoot *m); + + class C_Booted : public Context { + OSDMonitor *cmon; + MOSDBoot *m; + public: + C_Booted(OSDMonitor *cm, MOSDBoot *m_) : + cmon(cm), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_booted(m); + else + cmon->dispatch((Message*)m); + } + }; + class C_Reported : public Context { + OSDMonitor *cmon; + MOSDFailure *m; + public: + C_Reported(OSDMonitor *cm, MOSDFailure *m_) : + cmon(cm), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_reported_failure(m); + else + cmon->dispatch((Message*)m); + } + }; + + bool preprocess_in(class MOSDIn *m); + bool prepare_in(class MOSDIn *m); + + bool preprocess_out(class MOSDOut *m); + bool prepare_out(class MOSDOut *m); public: - OSDMonitor(Monitor *mn, Messenger *m, Mutex& l) : - mon(mn), messenger(m), lock(l), - state(STATE_SYNC) { - //init(); - } + OSDMonitor(Monitor *mn, Paxos *p) : + PaxosService(mn, p) { } - void dispatch(Message *m); void tick(); // check state, take actions - void election_starting(); // abort whatever. - void election_finished(); // reinitialize whatever. - - void issue_leases(); - void mark_all_down(); - void send_latest(entity_inst_t i); + void send_latest(entity_inst_t i, epoch_t since=(epoch_t)(-1)); void fake_osd_failure(int osd, bool down); void fake_osdmap_update(); diff --git a/branches/sage/cephmds2/mon/Paxos.cc b/branches/sage/cephmds2/mon/Paxos.cc index f83bdc57b21ba..0ecf0f5a6caf8 100644 --- a/branches/sage/cephmds2/mon/Paxos.cc +++ b/branches/sage/cephmds2/mon/Paxos.cc @@ -20,8 +20,8 @@ #include "config.h" #undef dout -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_paxos) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_paxos) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " void Paxos::init() @@ -46,13 +46,24 @@ void Paxos::collect(version_t oldpn) assert(mon->is_leader()); // reset the number of lasts received + uncommitted_v = 0; + uncommitted_pn = 0; + uncommitted_value.clear(); + + // look for uncommitted value + if (mon->store->exists_bl_sn(machine_name, last_committed+1)) { + uncommitted_v = last_committed+1; + uncommitted_pn = accepted_pn; + mon->store->get_bl_sn(uncommitted_value, machine_name, last_committed+1); + dout(10) << "learned uncommitted " << (last_committed+1) + << " (" << uncommitted_value.length() << " bytes) from myself" + << endl; + } + + // pick new pn accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn)); accepted_pn_from = last_committed; num_last = 1; - old_accepted_v = 0; - old_accepted_pn = 0; - old_accepted_value.clear(); - dout(10) << "collect with pn " << accepted_pn << endl; // send collect @@ -66,6 +77,7 @@ void Paxos::collect(version_t oldpn) collect->pn = accepted_pn; mon->messenger->send_message(collect, mon->monmap->get_inst(*p)); } + } @@ -89,9 +101,10 @@ void Paxos::handle_collect(MMonPaxos *collect) if (mon->store->exists_bl_sn(machine_name, last_committed+1)) { mon->store->get_bl_sn(bl, machine_name, last_committed+1); assert(bl.length() > 0); - dout(10) << "sharing our accepted but uncommitted value for " << last_committed+1 << endl; + dout(10) << " sharing our accepted but uncommitted value for " << last_committed+1 + << " (" << bl.length() << " bytes)" << endl; last->values[last_committed+1] = bl; - last->old_accepted_pn = accepted_pn; + last->uncommitted_pn = accepted_pn; } // can we accept this pn? @@ -111,13 +124,13 @@ void Paxos::handle_collect(MMonPaxos *collect) last->pn_from = accepted_pn_from; // and share whatever data we have - for (version_t v = collect->last_committed; + for (version_t v = collect->last_committed+1; v <= last_committed; v++) { if (mon->store->exists_bl_sn(machine_name, v)) { mon->store->get_bl_sn(last->values[v], machine_name, v); - dout(10) << " sharing " << v << " " - << last->values[v].length() << " bytes" << endl; + dout(10) << " sharing " << v << " (" + << last->values[v].length() << " bytes)" << endl; } } @@ -143,19 +156,20 @@ void Paxos::handle_last(MMonPaxos *last) // share committed values dout(10) << "sending commit to " << last->get_source() << endl; MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id); - for (version_t v = last->last_committed; + for (version_t v = last->last_committed+1; v <= last_committed; v++) { mon->store->get_bl_sn(commit->values[v], machine_name, v); - dout(10) << "sharing " << v << " " - << commit->values[v].length() << " bytes" << endl; + dout(10) << " sharing " << v << " (" + << commit->values[v].length() << " bytes)" << endl; } + commit->last_committed = last_committed; mon->messenger->send_message(commit, last->get_source_inst()); } // did we receive a committed value? if (last->last_committed > last_committed) { - for (version_t v = last_committed; + for (version_t v = last_committed+1; v <= last->last_committed; v++) { mon->store->put_bl_sn(last->values[v], machine_name, v); @@ -168,10 +182,10 @@ void Paxos::handle_last(MMonPaxos *last) } // do they accept your pn? - if (last->old_accepted_pn > accepted_pn) { + if (last->pn > accepted_pn) { // no, try again. dout(10) << " they had a higher pn than us, picking a new one." << endl; - collect(last->old_accepted_pn); + collect(last->pn); } else { // yes, they accepted our pn. great. num_last++; @@ -179,30 +193,36 @@ void Paxos::handle_last(MMonPaxos *last) << num_last << " peons" << endl; // did this person send back an accepted but uncommitted value? - if (last->old_accepted_pn && - last->old_accepted_pn > old_accepted_pn) { - old_accepted_v = last->last_committed+1; - old_accepted_pn = last->old_accepted_pn; - old_accepted_value = last->values[old_accepted_v]; - dout(10) << "we learned an old (possible) value for " << old_accepted_v - << " pn " << old_accepted_pn - << " " << old_accepted_value.length() << " bytes" + if (last->uncommitted_pn && + last->uncommitted_pn > uncommitted_pn) { + uncommitted_v = last->last_committed+1; + uncommitted_pn = last->uncommitted_pn; + uncommitted_value = last->values[uncommitted_v]; + dout(10) << "we learned an uncommitted value for " << uncommitted_v + << " pn " << uncommitted_pn + << " " << uncommitted_value.length() << " bytes" << endl; } // is that everyone? if (num_last == mon->get_quorum().size()) { + // almost... + state = STATE_ACTIVE; + // did we learn an old value? - if (old_accepted_v == last_committed+1 && - old_accepted_value.length()) { + if (uncommitted_v == last_committed+1 && + uncommitted_value.length()) { dout(10) << "that's everyone. begin on old learned value" << endl; - begin(old_accepted_value); + begin(uncommitted_value); } else { // active! dout(10) << "that's everyone. active!" << endl; - state = STATE_ACTIVE; - finish_contexts(waiting_for_active); extend_lease(); + + // wake people up + finish_contexts(waiting_for_active); + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); } } } @@ -215,11 +235,10 @@ void Paxos::handle_last(MMonPaxos *last) void Paxos::begin(bufferlist& v) { dout(10) << "begin for " << last_committed+1 << " " - << new_value.length() << " bytes" + << v.length() << " bytes" << endl; assert(mon->is_leader()); - assert(is_active()); state = STATE_UPDATING; @@ -231,7 +250,8 @@ void Paxos::begin(bufferlist& v) assert(new_value.length() == 0); // accept it ourselves - num_accepted = 1; + accepted.clear(); + accepted.insert(whoami); new_value = v; mon->store->put_bl_sn(new_value, machine_name, last_committed+1); @@ -239,8 +259,10 @@ void Paxos::begin(bufferlist& v) // we're alone, take it easy commit(); state = STATE_ACTIVE; - finish_contexts(waiting_for_commit); finish_contexts(waiting_for_active); + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); return; } @@ -300,7 +322,8 @@ void Paxos::handle_begin(MMonPaxos *begin) void Paxos::handle_accept(MMonPaxos *accept) { dout(10) << "handle_accept " << *accept << endl; - + int from = accept->get_source().num(); + if (accept->pn != accepted_pn) { // we accepted a higher pn, from some other leader dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << endl; @@ -317,35 +340,44 @@ void Paxos::handle_accept(MMonPaxos *accept) accept->last_committed == last_committed-1); // committed assert(state == STATE_UPDATING); - num_accepted++; - dout(10) << "now " << num_accepted << " have accepted" << endl; + assert(accepted.count(from) == 0); + accepted.insert(from); + dout(10) << " now " << accepted << " have accepted" << endl; // new majority? - if (num_accepted == (unsigned)mon->monmap->num_mon/2+1) { + if (accepted.size() == (unsigned)mon->monmap->num_mon/2+1) { // yay, commit! // note: this may happen before the lease is reextended (below) - dout(10) << "we got a majority, committing too" << endl; + dout(10) << " got majority, committing" << endl; commit(); } // done? - if (num_accepted == mon->get_quorum().size()) { - state = STATE_ACTIVE; - finish_contexts(waiting_for_commit); - finish_contexts(waiting_for_active); - extend_lease(); - + if (accepted == mon->get_quorum()) { + dout(10) << " got quorum, done with update" << endl; // cancel timeout event mon->timer.cancel_event(accept_timeout_event); accept_timeout_event = 0; + + // yay! + state = STATE_ACTIVE; + extend_lease(); + + // wake people up + finish_contexts(waiting_for_active); + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); } } void Paxos::accept_timeout() { dout(5) << "accept timeout, calling fresh election" << endl; + accept_timeout_event = 0; assert(mon->is_leader()); assert(is_updating()); + cancel_events(); mon->call_election(); } @@ -393,12 +425,13 @@ void Paxos::handle_commit(MMonPaxos *commit) ++p) { assert(p->first == last_committed+1); last_committed = p->first; + dout(10) << " storing " << last_committed << " (" << p->second.length() << " bytes)" << endl; mon->store->put_bl_sn(p->second, machine_name, last_committed); } mon->store->put_int(last_committed, machine_name, "last_committed"); delete commit; -} +} void Paxos::extend_lease() { @@ -423,9 +456,12 @@ void Paxos::extend_lease() mon->messenger->send_message(lease, mon->monmap->get_inst(*p)); } - // wake people up - finish_contexts(waiting_for_readable); - finish_contexts(waiting_for_writeable); + // set timeout event. + // if old timeout is still in place, leave it. + if (!lease_ack_timeout_event) { + lease_ack_timeout_event = new C_LeaseAckTimeout(this); + mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event); + } // set renew event lease_renew_event = new C_LeaseRenew(this); @@ -433,13 +469,6 @@ void Paxos::extend_lease() at -= g_conf.mon_lease; at += g_conf.mon_lease_renew_interval; mon->timer.add_event_at(at, lease_renew_event); - - // set timeout event. - // if old timeout is still in place, leave it. - if (!lease_ack_timeout_event) { - lease_ack_timeout_event = new C_LeaseAckTimeout(this); - mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event); - } } @@ -457,9 +486,8 @@ void Paxos::handle_lease(MMonPaxos *lease) // extend lease if (lease_expire < lease->lease_expire) lease_expire = lease->lease_expire; - + state = STATE_ACTIVE; - finish_contexts(waiting_for_active); dout(10) << "handle_lease on " << lease->last_committed << " now " << lease_expire << endl; @@ -469,8 +497,15 @@ void Paxos::handle_lease(MMonPaxos *lease) ack->last_committed = last_committed; ack->lease_expire = lease_expire; mon->messenger->send_message(ack, lease->get_source_inst()); + + // (re)set timeout event. + if (lease_timeout_event) + mon->timer.cancel_event(lease_timeout_event); + lease_timeout_event = new C_LeaseTimeout(this); + mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_timeout_event); // kick waiters + finish_contexts(waiting_for_active); if (is_readable()) finish_contexts(waiting_for_readable); @@ -481,7 +516,10 @@ void Paxos::handle_lease_ack(MMonPaxos *ack) { int from = ack->get_source().num(); - if (acked_lease.count(from) == 0) { + if (!lease_ack_timeout_event) { + dout(10) << "handle_lease_ack from " << ack->get_source() << " -- stray (probably since revoked)" << endl; + } + else if (acked_lease.count(from) == 0) { acked_lease.insert(from); if (acked_lease == mon->get_quorum()) { @@ -509,9 +547,28 @@ void Paxos::lease_ack_timeout() dout(5) << "lease_ack_timeout -- calling new election" << endl; assert(mon->is_leader()); assert(is_active()); + + lease_ack_timeout_event = 0; + cancel_events(); + mon->call_election(); +} + +void Paxos::lease_timeout() +{ + dout(5) << "lease_timeout -- calling new election" << endl; + assert(mon->is_peon()); + + lease_timeout_event = 0; + cancel_events(); mon->call_election(); } +void Paxos::lease_renew_timeout() +{ + lease_renew_event = 0; + extend_lease(); +} + /* * return a globally unique, monotonically increasing proposal number @@ -549,6 +606,10 @@ void Paxos::cancel_events() mon->timer.cancel_event(lease_ack_timeout_event); lease_ack_timeout_event = 0; } + if (lease_timeout_event) { + mon->timer.cancel_event(lease_timeout_event); + lease_timeout_event = 0; + } } void Paxos::leader_init() @@ -576,6 +637,15 @@ void Paxos::peon_init() finish_contexts(waiting_for_commit, -1); } +void Paxos::election_starting() +{ + dout(10) << "election_starting -- canceling timeouts" << endl; + cancel_events(); + new_value.clear(); + + finish_contexts(waiting_for_commit, -1); +} + void Paxos::dispatch(Message *m) { @@ -641,11 +711,13 @@ void Paxos::dispatch(Message *m) bool Paxos::is_readable() { - if (mon->get_quorum().size() == 1) return true; + //dout(15) << "is_readable now=" << g_clock.now() << " lease_expire=" << lease_expire << endl; return (mon->is_peon() || mon->is_leader()) && is_active() && - g_clock.now() < lease_expire; + last_committed > 0 && // must have a value + (mon->get_quorum().size() == 1 || // alone, or + g_clock.now() < lease_expire); // have lease } bool Paxos::read(version_t v, bufferlist &bl) diff --git a/branches/sage/cephmds2/mon/Paxos.h b/branches/sage/cephmds2/mon/Paxos.h index 6699cc5ad33ad..403e6d6eeaf96 100644 --- a/branches/sage/cephmds2/mon/Paxos.h +++ b/branches/sage/cephmds2/mon/Paxos.h @@ -113,18 +113,19 @@ private: // -- leader -- // recovery (paxos phase 1) unsigned num_last; - version_t old_accepted_v; - version_t old_accepted_pn; - bufferlist old_accepted_value; + version_t uncommitted_v; + version_t uncommitted_pn; + bufferlist uncommitted_value; // active set acked_lease; Context *lease_renew_event; Context *lease_ack_timeout_event; + Context *lease_timeout_event; // updating (paxos phase 2) bufferlist new_value; - unsigned num_accepted; + set accepted; Context *accept_timeout_event; @@ -149,12 +150,21 @@ private: } }; + class C_LeaseTimeout : public Context { + Paxos *paxos; + public: + C_LeaseTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->lease_timeout(); + } + }; + class C_LeaseRenew : public Context { Paxos *paxos; public: C_LeaseRenew(Paxos *p) : paxos(p) {} void finish(int r) { - paxos->extend_lease(); + paxos->lease_renew_timeout(); } }; @@ -171,8 +181,11 @@ private: void extend_lease(); void handle_lease(MMonPaxos*); void handle_lease_ack(MMonPaxos*); - void lease_ack_timeout(); - + + void lease_ack_timeout(); // on leader, if lease isn't acked by all peons + void lease_renew_timeout(); // on leader, to renew the lease + void lease_timeout(); // on peon, if lease isn't extended + void cancel_events(); version_t get_new_proposal_number(version_t gt=0); @@ -185,23 +198,23 @@ public: state(STATE_RECOVERING), lease_renew_event(0), lease_ack_timeout_event(0), + lease_timeout_event(0), accept_timeout_event(0) { } void dispatch(Message *m); void init(); + void election_starting(); void leader_init(); void peon_init(); // -- service interface -- - /* void wait_for_active(Context *c) { assert(!is_active()); waiting_for_active.push_back(c); } - */ // read version_t get_version() { return last_committed; } @@ -225,6 +238,9 @@ public: void wait_for_commit(Context *oncommit) { waiting_for_commit.push_back(oncommit); } + void wait_for_commit_front(Context *oncommit) { + waiting_for_commit.push_front(oncommit); + } }; diff --git a/branches/sage/cephmds2/mon/PaxosService.cc b/branches/sage/cephmds2/mon/PaxosService.cc index 021981b7b148b..6f4fba2d6c27d 100644 --- a/branches/sage/cephmds2/mon/PaxosService.cc +++ b/branches/sage/cephmds2/mon/PaxosService.cc @@ -20,8 +20,9 @@ #include "config.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " " -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_paxos) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxosservice(" << get_paxos_name(paxos->machine_id) << ") " +//#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << "." << get_paxos_name(paxos->machine_id) << " " + @@ -30,15 +31,17 @@ void PaxosService::dispatch(Message *m) dout(10) << "dispatch " << *m << " from " << m->get_source_inst() << endl; // make sure our map is readable and up to date - if (!paxos->is_readable() || - !update_from_paxos()) { + if (!paxos->is_readable()) { dout(10) << " waiting for paxos -> readable" << endl; paxos->wait_for_readable(new C_RetryMessage(this, m)); return; } + // make sure service has latest from paxos. + update_from_paxos(); + // preprocess - if (preprocess_update(m)) + if (preprocess_query(m)) return; // easy! // leader? @@ -56,14 +59,78 @@ void PaxosService::dispatch(Message *m) return; } - prepare_update(m); + // update + if (prepare_update(m) && + should_propose_now()) + propose_pending(); +} + +void PaxosService::_commit() +{ + dout(7) << "_commit" << endl; + update_from_paxos(); // notify service of new paxos state + + if (mon->is_leader()) { + dout(7) << "_commit creating new pending" << endl; + assert(have_pending == false); + create_pending(); + have_pending = true; + } +} + + +void PaxosService::propose_pending() +{ + dout(10) << "propose_pending" << endl; + assert(have_pending); + + // finish and encode + bufferlist bl; + encode_pending(bl); + have_pending = false; - // do it now (for now!) *** - propose_pending(); + // apply to paxos + paxos->wait_for_commit_front(new C_Commit(this)); + paxos->propose_new_value(bl); } + void PaxosService::election_finished() { - if (mon->is_leader() && g_conf.mkfs) - create_initial(); + dout(10) << "election_finished" << endl; + + if (have_pending && + !mon->is_leader()) { + discard_pending(); + have_pending = false; + } + + // make sure we update our state + if (paxos->is_active()) + _active(); + else + paxos->wait_for_active(new C_Active(this)); +} + +void PaxosService::_active() +{ + dout(10) << "_active" << endl; + assert(paxos->is_active()); + + // pull latest from paxos + update_from_paxos(); + + // create pending state? + if (mon->is_leader()) { + if (!have_pending) { + create_pending(); + have_pending = true; + } + + if (g_conf.mkfs && + paxos->get_version() == 0) { + create_initial(); + propose_pending(); + } + } } diff --git a/branches/sage/cephmds2/mon/PaxosService.h b/branches/sage/cephmds2/mon/PaxosService.h index 59bcc770d108f..32bcb3e4b11fb 100644 --- a/branches/sage/cephmds2/mon/PaxosService.h +++ b/branches/sage/cephmds2/mon/PaxosService.h @@ -25,36 +25,65 @@ class PaxosService : public Dispatcher { protected: Monitor *mon; Paxos *paxos; - class C_RetryMessage : public Context { - Dispatcher *svc; + PaxosService *svc; Message *m; public: - C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {} + C_RetryMessage(PaxosService *s, Message *m_) : svc(s), m(m_) {} void finish(int r) { svc->dispatch(m); } }; + class C_Active : public Context { + PaxosService *svc; + public: + C_Active(PaxosService *s) : svc(s) {} + void finish(int r) { + if (r >= 0) + svc->_active(); + } + }; + class C_Commit : public Context { + PaxosService *svc; + public: + C_Commit(PaxosService *s) : svc(s) {} + void finish(int r) { + if (r >= 0) + svc->_commit(); + } + }; + friend class C_Update; + +private: + bool have_pending; public: - PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { } + PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p), + have_pending(false) { } - // i implement + // i implement and you ignore void dispatch(Message *m); void election_finished(); - // you implement - virtual void create_initial() = 0; - virtual bool update_from_paxos() = 0; - virtual void prepare_pending() = 0; - virtual void propose_pending() = 0; +private: + void _active(); + void _commit(); - virtual bool preprocess_update(Message *m) = 0; // true if processed. - virtual void prepare_update(Message *m)= 0; +public: + // i implement and you use + void propose_pending(); // propose current pending as new paxos state - virtual void tick() {}; // check state, take actions + // you implement + virtual bool update_from_paxos() = 0; // assimilate latest paxos state + virtual void create_pending() = 0; // [leader] create new pending structures + virtual void create_initial() = 0; // [leader] populate pending with initial state (1) + virtual void encode_pending(bufferlist& bl) = 0; // [leader] finish and encode pending for next paxos state + virtual void discard_pending() { } // [leader] discard pending + virtual bool preprocess_query(Message *m) = 0; // true if processed (e.g., read-only) + virtual bool prepare_update(Message *m) = 0; + virtual bool should_propose_now() { return true; } }; diff --git a/branches/sage/cephmds2/msg/FakeMessenger.cc b/branches/sage/cephmds2/msg/FakeMessenger.cc index 00adff44eda32..dcfc40a986a3a 100644 --- a/branches/sage/cephmds2/msg/FakeMessenger.cc +++ b/branches/sage/cephmds2/msg/FakeMessenger.cc @@ -233,7 +233,7 @@ int fakemessenger_do_loop_2() } } - // deal with shutdowns.. dleayed to avoid concurrent directory modification + // deal with shutdowns.. delayed to avoid concurrent directory modification if (!shutdown_set.empty()) { for (set::iterator it = shutdown_set.begin(); it != shutdown_set.end(); @@ -377,7 +377,8 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr #endif // queue - if (directory.count(inst.addr)) { + if (directory.count(inst.addr) && + shutdown_set.count(inst.addr) == 0) { dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << " -- " << m << endl; directory[inst.addr]->queue_incoming(m); diff --git a/branches/sage/cephmds2/newsyn.cc b/branches/sage/cephmds2/newsyn.cc index b7b8d43042191..dc967ada1caaf 100644 --- a/branches/sage/cephmds2/newsyn.cc +++ b/branches/sage/cephmds2/newsyn.cc @@ -12,6 +12,7 @@ * */ +#define intabs(x) ((x) >= 0 ? (x):(-(x))) #include #include @@ -32,10 +33,6 @@ using namespace std; #include "common/Timer.h" -#define NUMMDS g_conf.num_mds -#define NUMOSD g_conf.num_osd -#define NUMCLIENT g_conf.num_client - class C_Test : public Context { public: void finish(int r) { @@ -180,6 +177,22 @@ int main(int argc, char **argv) parse_config_options(args); parse_syn_options(args); + + //int start_mon = g_conf.num_mon > 0 ? g_conf.num_mon:0; + int start_mds = g_conf.num_mds > 0 ? g_conf.num_mds:0; + int start_osd = g_conf.num_osd > 0 ? g_conf.num_osd:0; + int start_client = g_conf.num_client > 0 ? g_conf.num_client:0; + + //g_conf.num_mon = intabs(g_conf.num_mon); + g_conf.num_mds = intabs(g_conf.num_mds); + g_conf.num_client = intabs(g_conf.num_client); + g_conf.num_osd = intabs(g_conf.num_osd); + + // stop on our own + g_conf.mon_stop_on_last_unmount = true; + g_conf.mon_stop_with_last_mds = true; + + if (g_conf.kill_after) g_timer.add_event_after(g_conf.kill_after, new C_Die); if (g_conf.debug_after) @@ -213,19 +226,19 @@ int main(int argc, char **argv) int need = 0; if (g_conf.ms_skip_rank0) need++; - need += NUMMDS; + need += start_mds; if (g_conf.ms_stripe_osds) need++; else - need += NUMOSD; - if (NUMCLIENT) { + need += start_osd; + if (start_client) { if (!g_conf.ms_overlay_clients) need += 1; } assert(need <= world); if (myrank == 0) - cerr << "nummds " << NUMMDS << " numosd " << NUMOSD << " numclient " << NUMCLIENT << " .. need " << need << ", have " << world << endl; + cerr << "nummds " << start_mds << " numosd " << start_osd << " numclient " << start_client << " .. need " << need << ", have " << world << endl; char hostname[100]; @@ -253,7 +266,7 @@ int main(int argc, char **argv) // create mds map mds; map mdsosd; - for (int i=0; i osd; - int max_osd_nodes = world - NUMMDS - g_conf.ms_skip_rank0; // assumes 0 clients, if we stripe. - int osds_per_node = (NUMOSD-1)/max_osd_nodes + 1; - for (int i=0; i 0) clients_per_node = (NUMCLIENT-1) / client_nodes + 1; + if (start_client && client_nodes > 0) clients_per_node = (start_client-1) / client_nodes + 1; set clientlist; - map client;//[NUMCLIENT]; - map syn;//[NUMCLIENT]; + map client;//[start_client]; + map syn;//[start_client]; int nclients = 0; - for (int i=0; isecond; */ /* - for (int i=0; ipick_mon(); - messenger->send_message(new MOSDBoot(superblock), monmap->get_inst(mon)); + messenger->send_message(new MOSDBoot(messenger->get_myinst(), superblock), monmap->get_inst(mon)); // start the heart timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this)); @@ -744,7 +744,7 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst) << ", dropping and reporting to mon" << mon << " " << *m << dendl; - messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()), + messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()), monmap->get_inst(mon)); delete m; } else if (dest.is_mon()) { @@ -894,10 +894,13 @@ void OSD::handle_osd_map(MOSDMap *m) dout(10) << "handle_osd_map decoding inc map epoch " << cur+1 << dendl; bufferlist bl; - if (m->incremental_maps.count(cur+1)) + if (m->incremental_maps.count(cur+1)) { + dout(10) << " using provided inc map" << endl; bl = m->incremental_maps[cur+1]; - else + } else { + dout(10) << " using my locally stored inc map" << endl; get_inc_map_bl(cur+1, bl); + } OSDMap::Incremental inc; int off = 0; @@ -1018,7 +1021,6 @@ void OSD::advance_map(ObjectStore::Transaction& t) ps_t maxps = 1ULL << osdmap->get_pg_bits(); ps_t maxlps = 1ULL << osdmap->get_localized_pg_bits(); dout(1) << "mkfs on " << osdmap->get_pg_bits() << " bits, " << maxps << " pgs" << dendl; - assert(osdmap->get_epoch() == 1); //cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << dendl; logger->set_start( osdmap->get_ctime() ); @@ -1317,7 +1319,7 @@ void OSD::get_map(epoch_t epoch, OSDMap &m) incs.push_front(inc); } } - assert(e > 0); + assert(e >= 0); // apply incrementals for (e++; e <= epoch; e++) { diff --git a/branches/sage/cephmds2/osd/OSDMap.h b/branches/sage/cephmds2/osd/OSDMap.h index c9a4a8b528600..5803d55da2687 100644 --- a/branches/sage/cephmds2/osd/OSDMap.h +++ b/branches/sage/cephmds2/osd/OSDMap.h @@ -70,6 +70,11 @@ public: epoch_t epoch; // new epoch; we are a diff from epoch-1 to epoch epoch_t mon_epoch; // monitor epoch (election iteration) utime_t ctime; + + // full (rare) + bufferlist fullmap; // in leiu of below. + + // incremental map new_up; map new_down; list new_in; @@ -78,27 +83,26 @@ public: list old_overload; // no longer overload void encode(bufferlist& bl) { - bl.append((char*)&epoch, sizeof(epoch)); - bl.append((char*)&mon_epoch, sizeof(mon_epoch)); - bl.append((char*)&ctime, sizeof(ctime)); + ::_encode(epoch, bl); + ::_encode(mon_epoch, bl); + ::_encode(ctime, bl); ::_encode(new_up, bl); ::_encode(new_down, bl); ::_encode(new_in, bl); ::_encode(new_out, bl); ::_encode(new_overload, bl); + ::_encode(fullmap, bl); } void decode(bufferlist& bl, int& off) { - bl.copy(off, sizeof(epoch), (char*)&epoch); - off += sizeof(epoch); - bl.copy(off, sizeof(mon_epoch), (char*)&mon_epoch); - off += sizeof(mon_epoch); - bl.copy(off, sizeof(ctime), (char*)&ctime); - off += sizeof(ctime); + ::_decode(epoch, bl, off); + ::_decode(mon_epoch, bl, off); + ::_decode(ctime, bl, off); ::_decode(new_up, bl, off); ::_decode(new_down, bl, off); ::_decode(new_in, bl, off); ::_decode(new_out, bl, off); ::_decode(new_overload, bl, off); + ::_decode(fullmap, bl, off); } Incremental(epoch_t e=0) : epoch(e), mon_epoch(0) {} @@ -136,8 +140,8 @@ private: const utime_t& get_ctime() const { return ctime; } - bool is_mkfs() const { return epoch == 1; } - //void set_mkfs() { assert(epoch == 1); } + bool is_mkfs() const { return epoch == 2; } + bool post_mkfs() const { return epoch > 2; } /***** cluster state *****/ int num_osds() { return osds.size(); } @@ -148,11 +152,15 @@ private: const set& get_out_osds() { return out_osds; } const map& get_overload_osds() { return overload_osds; } + bool exists(int osd) { return osds.count(osd); } bool is_down(int osd) { return down_osds.count(osd); } - bool is_up(int osd) { return !is_down(osd); } + bool is_up(int osd) { return exists(osd) && !is_down(osd); } bool is_out(int osd) { return out_osds.count(osd); } - bool is_in(int osd) { return !is_out(osd); } + bool is_in(int osd) { return exists(osd) && !is_out(osd); } + bool have_inst(int osd) { + return osd_inst.count(osd); + } const entity_inst_t& get_inst(int osd) { assert(osd_inst.count(osd)); return osd_inst[osd]; @@ -177,15 +185,13 @@ private: mon_epoch = inc.mon_epoch; ctime = inc.ctime; - for (map::iterator i = inc.new_up.begin(); - i != inc.new_up.end(); - i++) { - assert(down_osds.count(i->first)); - down_osds.erase(i->first); - assert(osd_inst.count(i->first) == 0); - osd_inst[i->first] = i->second; - //cout << "epoch " << epoch << " up osd" << i->first << endl; + // full map? + if (inc.fullmap.length()) { + decode(inc.fullmap); + return; } + + // nope, incremental. for (map::iterator i = inc.new_down.begin(); i != inc.new_down.end(); i++) { @@ -196,13 +202,6 @@ private: osd_inst.erase(i->first); //cout << "epoch " << epoch << " down osd" << i->first << endl; } - for (list::iterator i = inc.new_in.begin(); - i != inc.new_in.end(); - i++) { - assert(out_osds.count(*i)); - out_osds.erase(*i); - //cout << "epoch " << epoch << " in osd" << *i << endl; - } for (list::iterator i = inc.new_out.begin(); i != inc.new_out.end(); i++) { @@ -210,17 +209,34 @@ private: out_osds.insert(*i); //cout << "epoch " << epoch << " out osd" << *i << endl; } - for (map::iterator i = inc.new_overload.begin(); - i != inc.new_overload.end(); - i++) { - overload_osds[i->first] = i->second; - } for (list::iterator i = inc.old_overload.begin(); i != inc.old_overload.end(); i++) { assert(overload_osds.count(*i)); overload_osds.erase(*i); } + + for (map::iterator i = inc.new_up.begin(); + i != inc.new_up.end(); + i++) { + assert(down_osds.count(i->first)); + down_osds.erase(i->first); + assert(osd_inst.count(i->first) == 0); + osd_inst[i->first] = i->second; + //cout << "epoch " << epoch << " up osd" << i->first << endl; + } + for (list::iterator i = inc.new_in.begin(); + i != inc.new_in.end(); + i++) { + assert(out_osds.count(*i)); + out_osds.erase(*i); + //cout << "epoch " << epoch << " in osd" << *i << endl; + } + for (map::iterator i = inc.new_overload.begin(); + i != inc.new_overload.end(); + i++) { + overload_osds[i->first] = i->second; + } } // serialize, unserialize diff --git a/branches/sage/cephmds2/osd/ObjectStore.h b/branches/sage/cephmds2/osd/ObjectStore.h index 74818e0470526..7f6dc7f0afae7 100644 --- a/branches/sage/cephmds2/osd/ObjectStore.h +++ b/branches/sage/cephmds2/osd/ObjectStore.h @@ -153,7 +153,7 @@ public: pattrsets.push_back(&aset); } - void write(object_t oid, off_t off, size_t len, bufferlist& bl) { + void write(object_t oid, off_t off, size_t len, const bufferlist& bl) { int op = OP_WRITE; ops.push_back(op); oids.push_back(oid); @@ -474,15 +474,9 @@ public: virtual int read(object_t oid, off_t offset, size_t len, bufferlist& bl) = 0; - - /*virtual int write(object_t oid, - off_t offset, size_t len, - bufferlist& bl, - bool fsync=true) = 0; - */ virtual int write(object_t oid, off_t offset, size_t len, - bufferlist& bl, + const bufferlist& bl, Context *onsafe) = 0;//{ return -1; } virtual void trim_from_cache(object_t oid, off_t offset, size_t len) { } diff --git a/branches/sage/cephmds2/osd/PG.cc b/branches/sage/cephmds2/osd/PG.cc index 38e10e96183a5..7948e166757ab 100644 --- a/branches/sage/cephmds2/osd/PG.cc +++ b/branches/sage/cephmds2/osd/PG.cc @@ -649,7 +649,7 @@ void PG::peer(ObjectStore::Transaction& t, } else { dout(10) << " still active from last started: " << last_started << dendl; } - } else if (osd->osdmap->get_epoch() > 1) { + } else if (osd->osdmap->post_mkfs()) { dout(10) << " crashed since epoch " << last_epoch_started_any << dendl; state_set(STATE_CRASHED); } @@ -875,7 +875,7 @@ void PG::activate(ObjectStore::Transaction& t) // if primary.. if (role == 0 && - osd->osdmap->get_epoch() > 1) { + osd->osdmap->post_mkfs()) { // who is clean? clean_set.clear(); if (info.is_clean()) diff --git a/branches/sage/cephmds2/osdc/Objecter.cc b/branches/sage/cephmds2/osdc/Objecter.cc index e27f62ddac6c8..43c43a4ee6b5c 100644 --- a/branches/sage/cephmds2/osdc/Objecter.cc +++ b/branches/sage/cephmds2/osdc/Objecter.cc @@ -833,7 +833,7 @@ void Objecter::ms_handle_failure(Message *m, entity_name_t dest, const entity_in dout(0) << "ms_handle_failure " << dest << " inst " << inst << ", dropping and reporting to mon" << mon << endl; - messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()), + messenger->send_message(new MOSDFailure(messenger->get_myinst(), inst, osdmap->get_epoch()), monmap->get_inst(mon)); delete m; } else { -- 2.39.5