From a73bbfac36491bd738b81b910c2ac5d88e2c73e9 Mon Sep 17 00:00:00 2001 From: sageweil Date: Sun, 4 Feb 2007 03:26:56 +0000 Subject: [PATCH] basic mds recovery now working, mostly missing graceful handling of surviving mds; synclient 'walk' tweaks git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1075 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/TODO | 23 +- .../sage/cephmds2/client/SyntheticClient.cc | 48 ++- branches/sage/cephmds2/include/buffer.h | 28 ++ branches/sage/cephmds2/mds/CDentry.h | 2 +- branches/sage/cephmds2/mds/CDir.cc | 30 +- branches/sage/cephmds2/mds/CDir.h | 3 +- branches/sage/cephmds2/mds/Lock.h | 2 +- branches/sage/cephmds2/mds/MDCache.cc | 360 ++++++++++++++---- branches/sage/cephmds2/mds/MDCache.h | 15 +- branches/sage/cephmds2/mds/MDS.cc | 85 +++-- branches/sage/cephmds2/mds/MDS.h | 3 +- branches/sage/cephmds2/mds/MDSMap.h | 36 +- branches/sage/cephmds2/mds/Migrator.cc | 327 +++++++++++++--- branches/sage/cephmds2/mds/Migrator.h | 67 +++- .../sage/cephmds2/messages/MMDSCacheRejoin.h | 39 +- .../cephmds2/messages/MMDSCacheRejoinAck.h | 82 ++++ .../sage/cephmds2/messages/MMDSImportMap.h | 2 +- branches/sage/cephmds2/mon/MDSMonitor.cc | 2 +- branches/sage/cephmds2/msg/Message.cc | 4 + branches/sage/cephmds2/msg/Message.h | 1 + 20 files changed, 922 insertions(+), 237 deletions(-) create mode 100644 branches/sage/cephmds2/messages/MMDSCacheRejoinAck.h diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 4031e9a0ee2b1..1144efdf58b7e 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -1,10 +1,23 @@ -- preemptive mdsmap sharing -- how to disambiguate importstart w/o an importfinish? -- der chicken/egg headache on immediately sending import_map, since imports are ambiguous. - +huh: +- how to keep mds osd op ids unique after a failover? + - (and, how to flush out failed mds) + + +- handle exporter recovery if importer fails during EXPORT_EXPORTING stage + +- delay response to sending import_map if export in progress? +- finish export before sending import_map? (if is_recoverying or whatever...) + - ambiguous imports on active node should include in-progress imports! +- how to effectively trim cache after resolve but before rejoin + - we need to eliminate unneed non-auth metadata, without hosing potentially useful auth metadata + + +- falures during recovery stages... rejoin + + importmap only sent after exports have completed. failures update export ack waitlists, so exports will compelte if unrelated nodes fail. @@ -13,7 +26,7 @@ failure of exporter induces some cleanup on importer. will disambiguate when it failure of importer induces cleanup on exporter. no ambiguity. -- no new mds may join if cluster is in a recovery state. creating|replay -> standby (unless failed) +- no new mds may join if cluster is in a recovery state. starting -> standby (unless failed) recoverer: - enter rejoin state diff --git a/branches/sage/cephmds2/client/SyntheticClient.cc b/branches/sage/cephmds2/client/SyntheticClient.cc index b0569d52e553e..1f8966e127b91 100644 --- a/branches/sage/cephmds2/client/SyntheticClient.cc +++ b/branches/sage/cephmds2/client/SyntheticClient.cc @@ -105,7 +105,7 @@ void parse_syn_options(vector& args) syn_iargs.push_back( atoi(args[++i]) ); syn_iargs.push_back( atoi(args[++i]) ); - } else if (strcmp(args[i],"fullwalk") == 0) { + } else if (strcmp(args[i],"walk") == 0) { syn_modes.push_back( SYNCLIENT_MODE_FULLWALK ); //syn_sargs.push_back( atoi(args[++i]) ); } else if (strcmp(args[i],"randomwalk") == 0) { @@ -366,7 +366,7 @@ int SyntheticClient::run() case SYNCLIENT_MODE_FULLWALK: { - string sarg1 = get_sarg(0); + string sarg1;// = get_sarg(0); if (run_me()) { dout(2) << "fullwalk" << sarg1 << endl; full_walk(sarg1); @@ -721,27 +721,39 @@ int SyntheticClient::full_walk(string& basedir) { if (time_to_stop()) return -1; - // read dir - map contents; - int r = client->getdir(basedir.c_str(), contents); - if (r < 0) { - dout(1) << "readdir on " << basedir << " returns " << r << endl; - return r; - } + list dirq; + dirq.push_back(basedir); - for (map::iterator it = contents.begin(); - it != contents.end(); - it++) { - string file = basedir + "/" + it->first; + while (!dirq.empty()) { + string dir = dirq.front(); + dirq.pop_front(); - struct stat st; - int r = client->lstat(file.c_str(), &st); + // read dir + map contents; + int r = client->getdir(dir.c_str(), contents); if (r < 0) { - dout(1) << "stat error on " << file << " r=" << r << endl; + dout(1) << "readdir on " << dir << " returns " << r << endl; continue; } - - if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) full_walk(file); + + for (map::iterator it = contents.begin(); + it != contents.end(); + it++) { + if (it->first == ".") continue; + if (it->first == "..") continue; + string file = dir + "/" + it->first; + + struct stat st; + int r = client->lstat(file.c_str(), &st); + if (r < 0) { + dout(1) << "stat error on " << file << " r=" << r << endl; + continue; + } + + if ((st.st_mode & INODE_TYPE_MASK) == INODE_MODE_DIR) { + dirq.push_back(file); + } + } } return 0; diff --git a/branches/sage/cephmds2/include/buffer.h b/branches/sage/cephmds2/include/buffer.h index 06bad7443f1c2..9fa9663ff1fd3 100644 --- a/branches/sage/cephmds2/include/buffer.h +++ b/branches/sage/cephmds2/include/buffer.h @@ -799,6 +799,34 @@ inline void _decode(bufferlist& s, bufferlist& bl, int& off) #include #include +// set +inline void _encode(const std::set& s, bufferlist& bl) +{ + int n = s.size(); + bl.append((char*)&n, sizeof(n)); + for (std::set::const_iterator it = s.begin(); + it != s.end(); + it++) { + ::_encode(*it, bl); + n--; + } + assert(n==0); +} +inline void _decode(std::set& s, bufferlist& bl, int& off) +{ + s.clear(); + int n; + bl.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; i template inline void _encode(const std::set& s, bufferlist& bl) diff --git a/branches/sage/cephmds2/mds/CDentry.h b/branches/sage/cephmds2/mds/CDentry.h index 3080a0b0c201a..65b9155ce69f9 100644 --- a/branches/sage/cephmds2/mds/CDentry.h +++ b/branches/sage/cephmds2/mds/CDentry.h @@ -32,7 +32,7 @@ class CDir; #define DN_LOCK_SYNC 0 #define DN_LOCK_PREXLOCK 1 #define DN_LOCK_XLOCK 2 -#define DN_LOCK_UNPINNING 3 // waiting for pins to go away +#define DN_LOCK_UNPINNING 3 // waiting for pins to go away .. FIXME REVIEW THIS CODE .. #define DN_XLOCK_FOREIGN ((Message*)0x1) // not 0, not a valid pointer. diff --git a/branches/sage/cephmds2/mds/CDir.cc b/branches/sage/cephmds2/mds/CDir.cc index 8f5137514e00f..578cb0c091bb8 100644 --- a/branches/sage/cephmds2/mds/CDir.cc +++ b/branches/sage/cephmds2/mds/CDir.cc @@ -60,6 +60,13 @@ ostream& operator<<(ostream& out, CDir& dir) out << " dir_auth=" << dir.get_dir_auth(); out << " state=" << dir.get_state(); + if (dir.state_test(CDIR_STATE_COMPLETE)) out << "|complete"; + if (dir.state_test(CDIR_STATE_FREEZINGTREE)) out << "|freezingtree"; + if (dir.state_test(CDIR_STATE_FROZENTREE)) out << "|frozentree"; + if (dir.state_test(CDIR_STATE_FROZENTREELEAF)) out << "|frozentreeleaf"; + if (dir.state_test(CDIR_STATE_FROZENDIR)) out << "|frozendir"; + if (dir.state_test(CDIR_STATE_FREEZINGDIR)) out << "|freezingdir"; + out << " sz=" << dir.get_nitems() << "+" << dir.get_nnull(); if (dir.get_num_ref()) { @@ -716,13 +723,24 @@ void CDir::freeze_tree_finish(Context *c) void CDir::unfreeze_tree() { dout(10) << "unfreeze_tree " << *this << endl; - state_clear(CDIR_STATE_FROZENTREE); - - // unpin (may => FREEZEABLE) FIXME: is this order good? - inode->auth_unpin(); - // waiters? - finish_waiting(CDIR_WAIT_UNFREEZE); + if (state_test(CDIR_STATE_FROZENTREE)) { + // frozen. unfreeze. + state_clear(CDIR_STATE_FROZENTREE); + + // unpin (may => FREEZEABLE) FIXME: is this order good? + inode->auth_unpin(); + + // waiters? + finish_waiting(CDIR_WAIT_UNFREEZE); + } else { + // freezing. stop it. + assert(state_test(CDIR_STATE_FREEZINGTREE)); + state_clear(CDIR_STATE_FREEZINGTREE); + + // cancel freeze waiters + finish_waiting(CDIR_WAIT_FREEZEABLE, -1); + } } bool CDir::is_freezing_tree() diff --git a/branches/sage/cephmds2/mds/CDir.h b/branches/sage/cephmds2/mds/CDir.h index fa21c9cdcf3ef..48df8c6f411b7 100644 --- a/branches/sage/cephmds2/mds/CDir.h +++ b/branches/sage/cephmds2/mds/CDir.h @@ -88,7 +88,8 @@ class Context; |CDIR_STATE_DIRTY) #define CDIR_MASK_STATE_IMPORT_KEPT (CDIR_STATE_IMPORT\ |CDIR_STATE_EXPORT\ - |CDIR_STATE_IMPORTINGEXPORT) + |CDIR_STATE_IMPORTINGEXPORT\ + |CDIR_STATE_FROZENTREE) #define CDIR_MASK_STATE_EXPORT_KEPT (CDIR_STATE_HASHED\ |CDIR_STATE_FROZENTREE\ |CDIR_STATE_FROZENDIR\ diff --git a/branches/sage/cephmds2/mds/Lock.h b/branches/sage/cephmds2/mds/Lock.h index 76fb9f5484d49..0d9dabb61b669 100644 --- a/branches/sage/cephmds2/mds/Lock.h +++ b/branches/sage/cephmds2/mds/Lock.h @@ -85,7 +85,7 @@ class CLock { public: CLock() : - state(LOCK_LOCK), + state(LOCK_SYNC), nread(0), wrlock_by(0) { } diff --git a/branches/sage/cephmds2/mds/MDCache.cc b/branches/sage/cephmds2/mds/MDCache.cc index 491b353326b0f..6e351d3f9a4f4 100644 --- a/branches/sage/cephmds2/mds/MDCache.cc +++ b/branches/sage/cephmds2/mds/MDCache.cc @@ -47,6 +47,7 @@ #include "messages/MMDSImportMap.h" #include "messages/MMDSCacheRejoin.h" +#include "messages/MMDSCacheRejoinAck.h" #include "messages/MDiscover.h" #include "messages/MDiscoverReply.h" @@ -249,18 +250,6 @@ void MDCache::log_import_map(Context *onsync) // ===================== // recovery stuff -void MDCache::send_import_maps() -{ - dout(10) << "send_import_maps" << endl; - - for (set::iterator p = want_import_map.begin(); - p != want_import_map.end(); - ++p) - send_import_map(*p); - - want_import_map.clear(); -} - void MDCache::send_import_map(int who) { dout(10) << "send_import_map to mds" << who << endl; @@ -272,14 +261,22 @@ void MDCache::send_import_map(int who) p != imports.end(); p++) { CDir *im = *p; - m->add_import(im->ino()); - - if (nested_exports.count(im)) { - for (set::iterator q = nested_exports[im].begin(); - q != nested_exports[im].end(); - ++q) { - CDir *ex = *q; - m->add_import_export(im->ino(), ex->ino()); + + if (migrator->is_importing(im->ino())) { + // ambiguous (mid-import) + m->add_ambiguous_import(im->ino(), + migrator->get_import_bounds(im->ino())); + } else { + // not ambiguous. + m->add_import(im->ino()); + + if (nested_exports.count(im)) { + for (set::iterator q = nested_exports[im].begin(); + q != nested_exports[im].end(); + ++q) { + CDir *ex = *q; + m->add_import_export(im->ino(), ex->ino()); + } } } } @@ -295,15 +292,18 @@ void MDCache::send_import_map(int who) } + +/* + * during resolve state, we share import_maps to determine who + * is authoritative for which trees. we expect to get an import_map + * from _everyone_ in the recovery_set (the mds cluster at the time of + * the first failure). + */ void MDCache::handle_import_map(MMDSImportMap *m) { dout(7) << "handle_import_map from " << m->get_source() << endl; int from = m->get_source().num(); - MMDSCacheRejoin *rejoin; - if (mds->is_active() || mds->is_stopping()) - rejoin = new MMDSCacheRejoin; - // FIXME: check if we are a surviving ambiguous importer // update my dir_auth values @@ -328,11 +328,6 @@ void MDCache::handle_import_map(MMDSImportMap *m) if (ex->get_dir_auth() == CDIR_AUTH_PARENT) ex->set_dir_auth(CDIR_AUTH_UNKNOWN); } - - if (mds->is_active()) { - // walk my cache to fill out CacheRejoin - cache_rejoin_walk(im, rejoin); - } } // note ambiguous imports too @@ -340,22 +335,23 @@ void MDCache::handle_import_map(MMDSImportMap *m) pi != m->ambiguous_imap.end(); ++pi) mds->mdcache->other_ambiguous_imports[from][pi->first].swap( pi->second ); - - if (mds->is_rejoin()) { - assert(want_import_map.count(from)); - want_import_map.erase(from); - if (want_import_map.empty()) { - dout(10) << "got all import maps" << endl; - disambiguate_imports(); - recalc_auth_bits(); - send_cache_rejoins(); - } else { - dout(10) << "still waiting for importmaps from " << want_import_map << endl; - } - } else if (mds->is_active() || mds->is_stopping()) { - mds->send_message_mds(rejoin, from, MDS_PORT_CACHE); - } + // did i get them all? + got_import_map.insert(from); + + if (got_import_map == recovery_set) { + dout(10) << "got all import maps, ready to rejoin" << endl; + disambiguate_imports(); + recalc_auth_bits(); + + // move to rejoin state + mds->set_want_state(MDSMap::STATE_REJOIN); + + } else { + dout(10) << "still waiting for more importmaps, got " << got_import_map + << ", need " << recovery_set << endl; + } + delete m; } @@ -569,12 +565,66 @@ void MDCache::finish_ambiguous_export(inodeno_t dirino, set& bounds) +/* + * rejoin phase! + * we start out by sending rejoins to everyone in the recovery set. + * + * if _were_ are rejoining, send for all regions in our cache. + * if we are active|stopping, send only to nodes that are are rejoining. + */ void MDCache::send_cache_rejoins() { dout(10) << "send_cache_rejoins " << endl; + + map rejoins; + // build list of dir_auth regions + list dir_auth_regions; + for (hash_map::iterator p = inode_map.begin(); + p != inode_map.end(); + ++p) { + if (!p->second->is_dir()) continue; + if (!p->second->dir) continue; + if (p->second->dir->get_dir_auth() == CDIR_AUTH_PARENT) continue; + + int auth = p->second->dir->get_dir_auth(); + assert(auth >= 0); + + if (auth == mds->get_nodeid()) continue; // skip my own regions! + + if (rejoins.count(auth) == 0) { + if (mds->is_rejoin() || // if i am rejoining, + mds->mdsmap->is_rejoin(auth)) // or if they are rejoining, + rejoins[auth] = new MMDSCacheRejoin; // send a rejoin + else + continue; // otherwise, skip this region + } + + // add to list + dout(10) << " on mds" << auth << " region " << *p->second << endl; + dir_auth_regions.push_back(p->second->dir); + } + + // walk the regions + for (list::iterator p = dir_auth_regions.begin(); + p != dir_auth_regions.end(); + ++p) { + CDir *dir = *p; + int to = dir->authority(); + cache_rejoin_walk(dir, rejoins[to]); + } + + // send the messages + assert(rejoin_ack_gather.empty()); + for (map::iterator p = rejoins.begin(); + p != rejoins.end(); + ++p) { + mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE); + rejoin_ack_gather.insert(p->first); + } } + void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) { dout(10) << "cache_rejoin_walk " << *dir << endl; @@ -587,22 +637,13 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) p != dir->items.end(); ++p) { // dentry - if (mds->is_rejoin()) - rejoin->add_dentry(dir->ino(), p->first, -1); - else - rejoin->add_dentry(dir->ino(), p->first, p->second->lockstate); - + rejoin->add_dentry(dir->ino(), p->first); + // inode? if (p->second->is_primary() && p->second->get_inode()) { CInode *in = p->second->get_inode(); - if (mds->is_rejoin()) - rejoin->add_inode(in->ino(), - -1, -1, - in->get_caps_wanted()); - else - rejoin->add_inode(in->ino(), - in->hardlock.get_state(), in->filelock.get_state(), - in->get_caps_wanted()); + rejoin->add_inode(in->ino(), + in->get_caps_wanted()); // dir? if (in->dir && @@ -619,20 +660,169 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) } +/* + * i got a rejoin. + * + * - reply with the lockstate + * + * if i am active|stopping, + * - remove source from replica list for everything not referenced here. + */ void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m) { dout(7) << "handle_cache_rejoin from " << m->get_source() << endl; - //int from = m->get_source().num(); + int from = m->get_source().num(); + + MMDSCacheRejoinAck *ack = new MMDSCacheRejoinAck; + + if (mds->is_active() || mds->is_stopping()) { + dout(10) << "removing stale cache replicas" << endl; + // first, scour cache of replica references + for (hash_map::iterator p = inode_map.begin(); + p != inode_map.end(); + ++p) { + // inode + CInode *in = p->second; + if (in->is_replica(from) && m->inodes.count(p->first) == 0) { + inode_remove_replica(in, from); + dout(10) << " rem " << *in << endl; + } + + // dentry + if (in->parent) { + CDentry *dn = in->parent; + if (dn->is_replica(from) && + (m->dentries.count(dn->get_dir()->ino()) == 0 || + m->dentries[dn->get_dir()->ino()].count(dn->get_name()) == 0)) { + dn->remove_replica(from); + dout(10) << " rem " << *dn << endl; + } + } + // dir + if (in->dir) { + CDir *dir = in->dir; + if (dir->is_replica(from) && m->dirs.count(p->first) == 0) { + dir->remove_replica(from); + dout(10) << " rem " << *dir << endl; + } + } + } + } else { + assert(mds->is_rejoin()); + } + + // dirs + for (set::iterator p = m->dirs.begin(); + p != m->dirs.end(); + ++p) { + CInode *diri = get_inode(*p); + assert(diri); + CDir *dir = diri->dir; + assert(dir); + int nonce = dir->add_replica(from); + dout(10) << " has " << *dir << endl; + ack->add_dir(*p, nonce); + + // dentries + for (set::iterator q = m->dentries[*p].begin(); + q != m->dentries[*p].end(); + ++q) { + CDentry *dn = dir->lookup(*q); + assert(dn); + int nonce = dn->add_replica(from); + dout(10) << " has " << *dn << endl; + ack->add_dentry(*p, *q, dn->get_lockstate(), nonce); + } + } + + // inodes + for (map::iterator p = m->inodes.begin(); + p != m->inodes.end(); + ++p) { + CInode *in = get_inode(p->first); + assert(in); + int nonce = in->add_replica(from); + if (p->second) + in->mds_caps_wanted[from] = p->second; + else + in->mds_caps_wanted.erase(from); + in->hardlock.gather_set.erase(from); // just in case + in->filelock.gather_set.erase(from); // just in case + dout(10) << " has " << *in << endl; + ack->add_inode(p->first, + in->hardlock.get_replica_state(), in->filelock.get_replica_state(), + nonce); + } + + // send ack + mds->send_message_mds(ack, from, MDS_PORT_CACHE); + delete m; +} + + +void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m) +{ + dout(7) << "handle_cache_rejoin from " << m->get_source() << endl; + int from = m->get_source().num(); + + // dirs + for (list::iterator p = m->dirs.begin(); + p != m->dirs.end(); + ++p) { + CInode *diri = get_inode(p->dirino); + CDir *dir = diri->dir; + assert(dir); + dir->set_replica_nonce(p->nonce); + dout(10) << " got " << *dir << endl; + + // dentries + for (map::iterator q = m->dentries[p->dirino].begin(); + q != m->dentries[p->dirino].end(); + ++q) { + CDentry *dn = dir->lookup(q->first); + assert(dn); + dn->set_replica_nonce(q->second.nonce); + dn->set_lockstate(q->second.lock); + dout(10) << " got " << *dn << endl; + } + } + + // inodes + for (list::iterator p = m->inodes.begin(); + p != m->inodes.end(); + ++p) { + CInode *in = get_inode(p->ino); + assert(in); + in->set_replica_nonce(p->nonce); + in->hardlock.set_state(p->hardlock); + in->filelock.set_state(p->filelock); + dout(10) << " got " << *in << endl; + } delete m; + + // done? + rejoin_ack_gather.erase(from); + if (rejoin_ack_gather.empty()) { + dout(7) << "all done, going active!" << endl; + show_imports(); + show_cache(); + mds->set_want_state(MDSMap::STATE_ACTIVE); + } else { + dout(7) << "still need rejoin_ack from " << rejoin_ack_gather << endl; + } + } + +// =============================================================================== + void MDCache::rename_file(CDentry *srcdn, CDentry *destdn) { @@ -1209,6 +1399,14 @@ void MDCache::dispatch(Message *m) handle_import_map((MMDSImportMap*)m); break; + case MSG_MDS_CACHEREJOIN: + handle_cache_rejoin((MMDSCacheRejoin*)m); + break; + case MSG_MDS_CACHEREJOINACK: + handle_cache_rejoin_ack((MMDSCacheRejoinAck*)m); + break; + + case MSG_MDS_DISCOVER: handle_discover((MDiscover*)m); break; @@ -2554,27 +2752,7 @@ void MDCache::handle_cache_expire(MCacheExpire *m) else if (nonce == in->get_replica_nonce(from)) { // remove from our cached_by dout(7) << "inode expire on " << *in << " from mds" << from << " cached_by was " << in->get_replicas() << endl; - in->remove_replica(from); - in->mds_caps_wanted.erase(from); - - // note: this code calls _eval more often than it needs to! - // fix lock - if (in->hardlock.is_gathering(from)) { - in->hardlock.gather_set.erase(from); - if (in->hardlock.gather_set.size() == 0) - mds->locker->inode_hard_eval(in); - } - if (in->filelock.is_gathering(from)) { - in->filelock.gather_set.erase(from); - if (in->filelock.gather_set.size() == 0) - mds->locker->inode_file_eval(in); - } - - // alone now? - if (!in->is_replicated()) { - mds->locker->inode_hard_eval(in); - mds->locker->inode_file_eval(in); - } + inode_remove_replica(in, from); } else { @@ -2693,6 +2871,30 @@ void MDCache::handle_cache_expire(MCacheExpire *m) delete m; } +void MDCache::inode_remove_replica(CInode *in, int from) +{ + in->remove_replica(from); + in->mds_caps_wanted.erase(from); + + // note: this code calls _eval more often than it needs to! + // fix lock + if (in->hardlock.is_gathering(from)) { + in->hardlock.gather_set.erase(from); + if (in->hardlock.gather_set.size() == 0) + mds->locker->inode_hard_eval(in); + } + if (in->filelock.is_gathering(from)) { + in->filelock.gather_set.erase(from); + if (in->filelock.gather_set.size() == 0) + mds->locker->inode_file_eval(in); + } + + // alone now? + if (!in->is_replicated()) { + mds->locker->inode_hard_eval(in); + mds->locker->inode_file_eval(in); + } +} int MDCache::send_dir_updates(CDir *dir, bool bcast) diff --git a/branches/sage/cephmds2/mds/MDCache.h b/branches/sage/cephmds2/mds/MDCache.h index 7715f9e50d963..c388bc8f921c1 100644 --- a/branches/sage/cephmds2/mds/MDCache.h +++ b/branches/sage/cephmds2/mds/MDCache.h @@ -131,22 +131,23 @@ protected: // from MMDSImportMaps map > > other_ambiguous_imports; - set want_import_map; // nodes i need to send my import map to (when exports finish) - set import_map_gather; // nodes i need an import_map from + set recovery_set; + set got_import_map; // nodes i need to send my import map to (when exports finish) + set rejoin_ack_gather; // nodes i need a rejoin ack from void handle_import_map(MMDSImportMap *m); void handle_cache_rejoin(MMDSCacheRejoin *m); void handle_cache_rejoin_ack(MMDSCacheRejoinAck *m); void disambiguate_imports(); - void send_cache_rejoins(); void cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin); void send_cache_rejoin_acks(); public: void send_import_map(int who); void send_import_maps(); + void send_cache_rejoins(); void set_recovery_set(set& s) { - want_import_map.swap(s); + recovery_set = s; } // ambiguous imports @@ -194,8 +195,7 @@ public: void log_import_map(Context *onsync=0); - - + // cache void set_cache_size(size_t max) { lru.lru_set_max(max); } size_t get_cache_size() { return lru.lru_get_size(); } @@ -243,6 +243,9 @@ public: else lru.lru_midtouch(dn); } + + void inode_remove_replica(CInode *in, int rep); + void rename_file(CDentry *srcdn, CDentry *destdn); public: diff --git a/branches/sage/cephmds2/mds/MDS.cc b/branches/sage/cephmds2/mds/MDS.cc index c8c67949b5e89..7d0a2b874255c 100644 --- a/branches/sage/cephmds2/mds/MDS.cc +++ b/branches/sage/cephmds2/mds/MDS.cc @@ -422,7 +422,7 @@ void MDS::handle_mds_map(MMDSMap *m) // is it new? if (epoch <= mdsmap->get_epoch()) { - dout(1) << " old map epoch " << epoch << " < " << mdsmap->get_epoch() + dout(1) << " old map epoch " << epoch << " <= " << mdsmap->get_epoch() << ", discarding" << endl; delete m; return; @@ -431,8 +431,9 @@ void MDS::handle_mds_map(MMDSMap *m) // note some old state int oldwhoami = whoami; int oldstate = state; - set oldrejoin; - mdsmap->get_mds_set(oldrejoin, MDSMap::STATE_REJOIN); + set oldresolve; + mdsmap->get_mds_set(oldresolve, MDSMap::STATE_RESOLVE); + bool wasrejoining = mdsmap->is_rejoining(); set oldfailed; mdsmap->get_mds_set(oldfailed, MDSMap::STATE_FAILED); @@ -512,19 +513,26 @@ void MDS::handle_mds_map(MMDSMap *m) } - - // is anybody rejoining? - if (is_rejoin() || is_active() || is_stopping()) { - set rejoin; - mdsmap->get_mds_set(rejoin, MDSMap::STATE_REJOIN); - dout(10) << "rejoin set is " << rejoin << ", was " << oldrejoin << endl; - for (set::iterator p = rejoin.begin(); p != rejoin.end(); ++p) { + // is anyone resolving? + if (is_resolve() || is_rejoin() || is_active() || is_stopping()) { + set resolve; + mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE); + if (oldresolve != resolve) + dout(10) << "resolve set is " << resolve << ", was " << oldresolve << endl; + for (set::iterator p = resolve.begin(); p != resolve.end(); ++p) { if (*p == whoami) continue; - if (oldrejoin.count(*p) == 0 || // if other guy newly rejoin, or - oldstate == MDSMap::STATE_REPLAY) // if i'm newly rejoin, + if (oldresolve.count(*p) == 0 || // if other guy newly resolve, or + oldstate == MDSMap::STATE_REPLAY) // if i'm newly resolve, mdcache->send_import_map(*p); // share my import map } } + + // is everybody finally rejoining? + if (is_rejoin() || is_active() || is_stopping()) { + if (!wasrejoining && mdsmap->is_rejoining()) { + mdcache->send_cache_rejoins(); + } + } // did anyone go down? if (is_active() || is_stopping()) { @@ -655,7 +663,7 @@ void MDS::boot_finish() assert(mdlog->get_read_pos() == mdlog->get_write_pos()); } - mark_active(); + set_want_state(MDSMap::STATE_ACTIVE); } @@ -705,26 +713,28 @@ void MDS::boot_replay(int step) case 6: // done with replay! - if (mdsmap->get_num_mds(MDSMap::STATE_REJOIN) == 0 && + if (mdsmap->get_num_mds(MDSMap::STATE_ACTIVE) == 0 && + mdsmap->get_num_mds(MDSMap::STATE_STOPPING) == 0 && + mdsmap->get_num_mds(MDSMap::STATE_RESOLVE) == 0 && + mdsmap->get_num_mds(MDSMap::STATE_REJOIN) == 0 && mdsmap->get_num_mds(MDSMap::STATE_REPLAY) == 1 && // me mdsmap->get_num_mds(MDSMap::STATE_FAILED) == 0) { dout(2) << "boot_replay " << step << ": i am alone, moving to state active" << endl; - want_state = MDSMap::STATE_ACTIVE; + set_want_state(MDSMap::STATE_ACTIVE); } else { - dout(2) << "boot_replay " << step << ": i am not alone, moving to state rejoin" << endl; - want_state = MDSMap::STATE_REJOIN; + dout(2) << "boot_replay " << step << ": i am not alone, moving to state resolve" << endl; + set_want_state(MDSMap::STATE_RESOLVE); } - beacon_send(); break; } } -void MDS::mark_active() +void MDS::set_want_state(int s) { - dout(3) << "mark_active" << endl; - want_state = MDSMap::STATE_ACTIVE; + dout(3) << "set_want_state " << MDSMap::get_state_name(s) << endl; + want_state = s; beacon_send(); } @@ -750,8 +760,7 @@ int MDS::shutdown_start() } // go - want_state = MDSMap::STATE_STOPPING; - beacon_send(); + set_want_state(MDSMap::STATE_STOPPING); return 0; } @@ -761,8 +770,7 @@ void MDS::handle_shutdown_start(Message *m) dout(1) << " handle_shutdown_start" << endl; // set flag - want_state = MDSMap::STATE_STOPPING; - beacon_send(); + set_want_state(MDSMap::STATE_STOPPING); delete m; } @@ -774,8 +782,7 @@ int MDS::shutdown_final() dout(1) << "shutdown_final" << endl; // send final down:out beacon (it doesn't matter if this arrives) - want_state = MDSMap::STATE_OUT; - beacon_send(); + set_want_state(MDSMap::STATE_OUT); // stop timers if (beacon_killer) { @@ -817,6 +824,24 @@ void MDS::dispatch(Message *m) void MDS::my_dispatch(Message *m) { + // from bad mds? + if (m->get_source().is_mds()) { + int from = m->get_source().num(); + if (!mdsmap->have_inst(from) || + mdsmap->get_inst(from) != m->get_source_inst()) { + // bogus mds? + if (m->get_type() != MSG_MDS_MAP) { + dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source() + << ", dropping" << endl; + delete m; + return; + } else { + dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source() + << ", but it's an mdsmap, looking at it" << endl; + } + } + } + switch (m->get_dest_port()) { @@ -916,8 +941,7 @@ void MDS::my_dispatch(Message *m) dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to up:stopped" << endl; // tell monitor we shut down cleanly. - want_state = MDSMap::STATE_STOPPED; - beacon_send(); + set_want_state(MDSMap::STATE_STOPPED); } } @@ -957,6 +981,9 @@ void MDS::proc_message(Message *m) case MSG_PING: handle_ping((MPing*)m); return; + + default: + assert(0); } } diff --git a/branches/sage/cephmds2/mds/MDS.h b/branches/sage/cephmds2/mds/MDS.h index 85723572134d4..aed6f9b1dd3b2 100644 --- a/branches/sage/cephmds2/mds/MDS.h +++ b/branches/sage/cephmds2/mds/MDS.h @@ -152,12 +152,13 @@ class MDS : public Dispatcher { bool is_starting() { return state == MDSMap::STATE_STARTING; } bool is_standby() { return state == MDSMap::STATE_STANDBY; } bool is_replay() { return state == MDSMap::STATE_REPLAY; } + bool is_resolve() { return state == MDSMap::STATE_RESOLVE; } bool is_rejoin() { return state == MDSMap::STATE_REJOIN; } bool is_active() { return state == MDSMap::STATE_ACTIVE; } bool is_stopping() { return state == MDSMap::STATE_STOPPING; } bool is_stopped() { return state == MDSMap::STATE_STOPPED; } - void mark_active(); + void set_want_state(int s); // -- waiters -- diff --git a/branches/sage/cephmds2/mds/MDSMap.h b/branches/sage/cephmds2/mds/MDSMap.h index f2803142d2c09..21577475c9ac7 100644 --- a/branches/sage/cephmds2/mds/MDSMap.h +++ b/branches/sage/cephmds2/mds/MDSMap.h @@ -27,6 +27,7 @@ using namespace std; class MDSMap { public: + // mds states static const int STATE_DNE = 0; // down, never existed. static const int STATE_OUT = 1; // down, once existed, but no imports, empty log. static const int STATE_FAILED = 2; // down, holds (er, held) metadata; needs to be recovered. @@ -35,10 +36,11 @@ class MDSMap { static const int STATE_CREATING = 4; // up, creating MDS instance (new journal, idalloc..) static const int STATE_STARTING = 5; // up, starting prior out MDS instance. static const int STATE_REPLAY = 6; // up, scanning journal, recoverying any shared state - static const int STATE_REJOIN = 7; // up, replayed journal, rejoining distributed cache - static const int STATE_ACTIVE = 8; // up, active - static const int STATE_STOPPING = 9; // up, exporting metadata (-> standby or out) - static const int STATE_STOPPED = 10; // up, finished stopping. like standby, but not avail to takeover. + static const int STATE_RESOLVE = 7; // up, disambiguating partial distributed operations (import/export, ...rename?) + static const int STATE_REJOIN = 8; // up, replayed journal, rejoining distributed cache + static const int STATE_ACTIVE = 9; // up, active + static const int STATE_STOPPING = 10; // up, exporting metadata (-> standby or out) + static const int STATE_STOPPED = 11; // up, finished stopping. like standby, but not avail to takeover. static const char *get_state_name(int s) { switch (s) { @@ -51,6 +53,7 @@ class MDSMap { case STATE_CREATING: return "up:creating"; case STATE_STARTING: return "up:starting"; case STATE_REPLAY: return "up:replay"; + case STATE_RESOLVE: return "up:resolve"; case STATE_REJOIN: return "up:rejoin"; case STATE_ACTIVE: return "up:active"; case STATE_STOPPING: return "up:stopping"; @@ -148,13 +151,13 @@ class MDSMap { p != mds_state.end(); p++) if (is_failed(p->first) || - is_replay(p->first) || is_rejoin(p->first) || + is_replay(p->first) || is_resolve(p->first) || is_rejoin(p->first) || is_active(p->first) || is_stopping(p->first)) s.insert(p->first); } - // state + // mds states bool is_down(int m) { return is_dne(m) || is_out(m) || is_failed(m); } bool is_up(int m) { return !is_down(m); } @@ -166,18 +169,33 @@ class MDSMap { bool is_creating(int m) { return mds_state.count(m) && mds_state[m] == STATE_CREATING; } bool is_starting(int m) { return mds_state.count(m) && mds_state[m] == STATE_STARTING; } bool is_replay(int m) { return mds_state.count(m) && mds_state[m] == STATE_REPLAY; } + bool is_resolve(int m) { return mds_state.count(m) && mds_state[m] == STATE_RESOLVE; } bool is_rejoin(int m) { return mds_state.count(m) && mds_state[m] == STATE_REJOIN; } bool is_active(int m) { return mds_state.count(m) && mds_state[m] == STATE_ACTIVE; } bool is_stopping(int m) { return mds_state.count(m) && mds_state[m] == STATE_STOPPING; } bool is_stopped(int m) { return mds_state.count(m) && mds_state[m] == STATE_STOPPED; } + bool has_created(int m) { return mds_created.count(m); } + + // cluster states bool is_degraded() { - return get_num_mds(STATE_REPLAY) + get_num_mds(STATE_REJOIN) + get_num_mds(STATE_FAILED); + return get_num_mds(STATE_REPLAY) + + get_num_mds(STATE_RESOLVE) + + get_num_mds(STATE_REJOIN) + + get_num_mds(STATE_FAILED); } - bool is_created(int m) { - return mds_created.count(m); + /*bool is_resolving() { // nodes are resolving distributed ops + return get_num_mds(STATE_RESOLVE); + }*/ + bool is_rejoining() { + // nodes are rejoining cache state + return get_num_mds(STATE_REJOIN) > 0 && + get_num_mds(STATE_RESOLVE) == 0 && + get_num_mds(STATE_REPLAY) == 0 && + get_num_mds(STATE_FAILED) == 0; } + int get_state(int m) { if (mds_state.count(m)) return mds_state[m]; return STATE_OUT; diff --git a/branches/sage/cephmds2/mds/Migrator.cc b/branches/sage/cephmds2/mds/Migrator.cc index 40d4e7db837aa..7e4b0b19391bd 100644 --- a/branches/sage/cephmds2/mds/Migrator.cc +++ b/branches/sage/cephmds2/mds/Migrator.cc @@ -210,8 +210,90 @@ void Migrator::export_empty_import(CDir *dir) + // ========================================================== -// IMPORT/EXPORT +// mds failure handling + +void Migrator::handle_mds_failure(int who) +{ + dout(5) << "handle_mds_failure mds" << who << endl; + + // check my exports + map::iterator p = export_state.begin(); + while (p != export_state.end()) { + map::iterator next = p; + next++; + CDir *dir = p->first; + + if (export_peer[dir] == who) { + // the guy i'm exporting to failed. + // clean up. + dout(10) << "cleaning up export state " << p->second << " of " << *dir << endl; + + switch (p->second) { + case EXPORT_DISCOVERING: + dout(10) << "state discovering : canceling freeze and removing auth_pin" << endl; + dir->unfreeze_tree(); // cancel the freeze + dir->auth_unpin(); // remove the auth_pin (that was holding up the freeze) + break; + + case EXPORT_FREEZING: + dout(10) << "state freezing : canceling freeze" << endl; + dir->unfreeze_tree(); // cancel the freeze + break; + + case EXPORT_LOGGINGSTART: + case EXPORT_PREPPING: + dout(10) << "state loggingstart|prepping : logging EExportFinish(false)" << endl; + mds->mdlog->submit_entry(new EExportFinish(dir,false)); + // logger will unfreeze. + break; + + case EXPORT_EXPORTING: + dout(10) << "state exporting : logging EExportFinish(false), reversing, and unfreezing" << endl; + mds->mdlog->submit_entry(new EExportFinish(dir,false)); + reverse_export(dir); + dir->unfreeze_tree(); + break; + + case EXPORT_LOGGINGFINISH: + dout(10) << "state loggingfinish : doing nothing, we were successful." << endl; + break; + + default: + assert(0); + } + + export_state.erase(dir); + export_peer.erase(dir); + + } else { + // third party failed. potential peripheral damage? + if (p->second == EXPORT_EXPORTING) { + // yeah, i'm waiting for acks, let's fake theirs. + if (export_notify_ack_waiting[dir].count(who)) { + dout(10) << "faking export_dir_notify_ack from mds" << who + << " on " << *dir << " to mds" << export_peer[dir] + << endl; + export_notify_ack_waiting[dir].erase(who); + if (export_notify_ack_waiting[dir].empty()) + export_dir_acked(dir); + } + } + } + + // next! + p = next; + } +} + + + + + + +// ========================================================== +// EXPORT class C_MDC_ExportFreeze : public Context { @@ -223,7 +305,8 @@ public: C_MDC_ExportFreeze(Migrator *m, CDir *e, int d) : mig(m), ex(e), dest(d) {} virtual void finish(int r) { - mig->export_dir_frozen(ex, dest); + if (r >= 0) + mig->export_dir_frozen(ex, dest); } }; @@ -266,8 +349,9 @@ void Migrator::export_dir(CDir *dir, } // ok, let's go. - - exporting.insert(dir); + assert(export_state.count(dir) == 0); + export_state[dir] = EXPORT_DISCOVERING; + export_peer[dir] = dest; // send ExportDirDiscover (ask target) mds->send_message_mds(new MExportDirDiscover(dir->inode), dest, MDS_PORT_MIGRATOR); @@ -294,6 +378,9 @@ void Migrator::handle_export_dir_discover_ack(MExportDirDiscoverAck *m) dout(7) << "export_dir_discover_ack from " << m->get_source() << " on " << *dir << ", releasing auth_pin" << endl; + + export_state[dir] = EXPORT_FREEZING; + dir->auth_unpin(); // unpin to allow freeze to complete delete m; // done @@ -318,6 +405,7 @@ void Migrator::export_dir_frozen(CDir *dir, { // subtree is now frozen! dout(7) << "export_dir_frozen on " << *dir << " to " << dest << endl; + export_state[dir] = EXPORT_LOGGINGSTART; show_imports(); @@ -396,6 +484,17 @@ void Migrator::export_dir_frozen(CDir *dir, void Migrator::export_dir_frozen_logged(CDir *dir, MExportDirPrep *prep, int dest) { dout(7) << "export_dir_frozen_logged " << *dir << endl; + + if (export_state.count(dir) == 0 || + export_state[dir] != EXPORT_LOGGINGSTART) { + // export must have aborted. + dout(7) << "export must have aborted, unfreezing and deleting me old prep message" << endl; + delete prep; + dir->unfreeze_tree(); // cancel the freeze + return; + } + + export_state[dir] = EXPORT_PREPPING; mds->send_message_mds(prep, dest, MDS_PORT_MIGRATOR); } @@ -408,7 +507,16 @@ void Migrator::handle_export_dir_prep_ack(MExportDirPrepAck *m) dout(7) << "export_dir_prep_ack " << *dir << ", starting export" << endl; + if (export_state.count(dir) == 0 || + export_state[dir] != EXPORT_PREPPING) { + // export must have aborted. + dout(7) << "export must have aborted, unfreezing" << endl; + dir->unfreeze_tree(); + return; + } + // start export. + export_state[dir] = EXPORT_EXPORTING; export_dir_go(dir, m->get_source().num()); // done @@ -431,6 +539,8 @@ void Migrator::export_dir_go(CDir *dir, // update imports/exports CDir *containing_import = cache->get_auth_container(dir); + set bounds; + if (containing_import == dir) { dout(7) << " i'm rexporting a previous import" << endl; assert(dir->is_import()); @@ -446,6 +556,7 @@ void Migrator::export_dir_go(CDir *dir, // add to export message req->add_export(nested); + bounds.insert(nested); // nested beneath our new export *in; remove! dout(7) << " export " << *nested << " was nested beneath us; removing from export list(s)" << endl; @@ -482,6 +593,7 @@ void Migrator::export_dir_go(CDir *dir, // add to msg req->add_export(nested); + bounds.insert(nested); } else { dout(12) << " export " << *nested << " is under other export " << *containing_export << ", which is unrelated" << endl; assert(cache->get_auth_container(containing_export) != containing_import); @@ -489,12 +601,17 @@ void Migrator::export_dir_go(CDir *dir, } } + // note new authority (locally) if (dir->inode->authority() == dest) dir->set_dir_auth( CDIR_AUTH_PARENT ); else dir->set_dir_auth( dest ); + // note bounds + export_bounds[dir].swap(bounds); + + // make list of nodes i expect an export_dir_notify_ack from // (everyone w/ this dir open, but me!) assert(export_notify_ack_waiting[dir].empty()); @@ -513,7 +630,7 @@ void Migrator::export_dir_go(CDir *dir, assert(export_notify_ack_waiting[dir].count( dest )); // fill export message with cache data - C_Contexts *fin = new C_Contexts; + C_Contexts *fin = new C_Contexts; // collect all the waiters int num_exported_inodes = export_dir_walk( req, fin, dir, // base @@ -776,41 +893,136 @@ void Migrator::handle_export_dir_notify_ack(MExportDirNotifyAck *m) assert(export_notify_ack_waiting[dir].count(from)); export_notify_ack_waiting[dir].erase(from); + dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from + << ", still need (" << export_notify_ack_waiting[dir] << ")" << endl; + // done? if (export_notify_ack_waiting[dir].empty()) { - export_notify_ack_waiting.erase(dir); - + export_dir_acked(dir); + } else { dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from - << ", last one!" << endl; + << ", still waiting for " << export_notify_ack_waiting[dir] << endl; + } + + delete m; +} + - // log export completion, then finish (unfreeze, trigger finish context, etc.) - mds->mdlog->submit_entry(new EExportFinish(dir, true), - new C_MDS_ExportFinishLogged(this, dir)); +/* + * this happens if hte dest failes after i send teh export data but before it is acked + * that is, we don't know they safely received and logged it, so we reverse our changes + * and go on. + */ +void Migrator::reverse_export(CDir *dir) +{ + dout(7) << "reverse_export " << *dir << endl; + + assert(export_state[dir] == EXPORT_EXPORTING); + assert(export_bounds.count(dir)); + + set bounds; + bounds.swap(export_bounds[dir]); + export_bounds.erase(dir); + + // -- adjust dir_auth -- + // base + CDir *im = dir; + if (dir->get_inode()->authority() == mds->get_nodeid()) { + // parent is already me. adding to existing import. + im = mds->mdcache->get_auth_container(dir); + assert(im); + mds->mdcache->nested_exports[im].erase(dir); + dir->set_dir_auth( CDIR_AUTH_PARENT ); + dir->state_set(CDIR_STATE_EXPORT); + dir->get(CDir::PIN_EXPORT); } else { - dout(7) << "handle_export_dir_notify_ack on " << *dir << " from " << from - << ", still waiting for " << export_notify_ack_waiting[dir] << endl; + // parent isn't me. new import. + mds->mdcache->imports.insert(dir); + dir->set_dir_auth( mds->get_nodeid() ); + dir->state_set(CDIR_STATE_IMPORT); + dir->get(CDir::PIN_IMPORT); } - delete m; + dout(10) << " base " << *dir << endl; + if (dir != im) + dout(10) << " under " << *im << endl; + + // bounds + for (set::iterator p = bounds.begin(); + p != bounds.end(); + ++p) { + CDir *bd = *p; + + if (bd->get_dir_auth() == mds->get_nodeid()) { + // still me. was an import. + mds->mdcache->imports.erase(bd); + bd->set_dir_auth( CDIR_AUTH_PARENT ); + bd->state_clear(CDIR_STATE_IMPORT); + bd->put(CDir::PIN_IMPORT); + // move nested exports. + for (set::iterator q = mds->mdcache->nested_exports[bd].begin(); + q != mds->mdcache->nested_exports[bd].end(); + ++q) + mds->mdcache->nested_exports[im].insert(*q); + mds->mdcache->nested_exports.erase(bd); + } else { + // not me anymore. now an export. + mds->mdcache->exports.insert(bd); + mds->mdcache->nested_exports[im].insert(bd); + assert(bd->get_dir_auth() != CDIR_AUTH_PARENT); + bd->set_dir_auth( CDIR_AUTH_UNKNOWN ); + bd->state_set(CDIR_STATE_EXPORT); + bd->get(CDir::PIN_EXPORT); + } + + dout(10) << " bound " << *bd << endl; + } + + + // -- adjust auth bits -- + + + + } +void Migrator::export_dir_acked(CDir *dir) +{ + dout(7) << "export_dir_acked " << *dir << endl; + export_notify_ack_waiting.erase(dir); + + export_state[dir] = EXPORT_LOGGINGFINISH; + export_bounds.erase(dir); + + // log export completion, then finish (unfreeze, trigger finish context, etc.) + mds->mdlog->submit_entry(new EExportFinish(dir, true), + new C_MDS_ExportFinishLogged(this, dir)); +} + /* * once i get all teh notify_acks i can finish */ void Migrator::export_dir_finish(CDir *dir) { - // send finish/commit to new auth - mds->send_message_mds(new MExportDirFinish(dir->ino()), dir->authority(), MDS_PORT_MIGRATOR); - - // remove from exporting list - exporting.erase(dir); - + dout(7) << "export_dir_finish " << *dir << endl; + + if (export_state.count(dir)) { + // send finish/commit to new auth + mds->send_message_mds(new MExportDirFinish(dir->ino()), dir->authority(), MDS_PORT_MIGRATOR); + + // remove from exporting list + export_state.erase(dir); + export_peer.erase(dir); + } else { + dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << endl; + } + // unfreeze - dout(7) << "export_dir_finish " << *dir << ", unfreezing" << endl; + dout(7) << "export_dir_finish unfreezing" << endl; dir->unfreeze_tree(); - + // unpin path dout(7) << "export_dir_finish unpinning path" << endl; vector trace; @@ -874,10 +1086,10 @@ void Migrator::export_dir_finish(CDir *dir) +// ========================================================== +// IMPORT -// IMPORTS - class C_MDC_ExportDirDiscover : public Context { Migrator *mig; MExportDirDiscover *m; @@ -939,6 +1151,9 @@ void Migrator::handle_export_dir_discover_2(MExportDirDiscover *m, CInode *in, i // pin auth too, until the import completes. in->auth_pin(); + + import_state[in->ino()] = IMPORT_DISCOVERED; + // reply dout(7) << " sending export_dir_discover_ack on " << *in << endl; @@ -993,6 +1208,9 @@ void Migrator::handle_export_dir_prep(MExportDirPrep *m) // auth pin too dir->auth_pin(); diri->auth_unpin(); + + // change import state + import_state[diri->ino()] = IMPORT_PREPPING; // assimilate traces to exports for (list::iterator it = m->get_inodes().begin(); @@ -1040,6 +1258,9 @@ void Migrator::handle_export_dir_prep(MExportDirPrep *m) CInode *in = cache->get_inode(*it); assert(in); + // note bound. + import_bounds[dir->ino()].insert(*it); + if (!in->dir) { dout(7) << " opening nested export on " << *in << endl; cache->open_remote_dir(in, @@ -1089,7 +1310,10 @@ void Migrator::handle_export_dir_prep(MExportDirPrep *m) dout(7) << " all ready, sending export_dir_prep_ack on " << *dir << endl; mds->send_message_mds(new MExportDirPrepAck(dir->ino()), m->get_source().num(), MDS_PORT_MIGRATOR); - + + // note new state + import_state[diri->ino()] = IMPORT_PREPPED; + // done delete m; } @@ -1216,8 +1440,8 @@ void Migrator::handle_export_dir(MExportDir *m) // mark export point frozenleaf ex->get(CDir::PIN_FREEZELEAF); ex->state_set(CDIR_STATE_FROZENTREELEAF); - import_freeze_leaves[dir->ino()].insert(ex); // and take note! - + assert(import_bounds[dir->ino()].count(*it)); // we took note during prep stage + // remove our pin ex->put(CDir::PIN_IMPORTINGEXPORT); ex->state_clear(CDIR_STATE_IMPORTINGEXPORT); @@ -1281,11 +1505,16 @@ void Migrator::handle_export_dir(MExportDir *m) // adjust popularity mds->balancer->add_import(dir); + dout(7) << "handle_export_dir did " << *dir << endl; + // log it mds->mdlog->submit_entry(le, new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num(), imported_subdirs, m->get_exports())); + // note state + import_state[dir->ino()] = IMPORT_LOGGINGSTART; + // some stats if (mds->logger) { mds->logger->inc("im"); @@ -1303,7 +1532,8 @@ void Migrator::import_dir_logged_start(CDir *dir, int from, { dout(7) << "import_dir_logged " << *dir << endl; - assert(0); // test die + // note state + import_state[dir->ino()] = IMPORT_ACKING; // send notify's etc. dout(7) << "sending notifyack for " << *dir << " to old auth mds" << from << endl; @@ -1349,6 +1579,9 @@ void Migrator::handle_export_dir_finish(MExportDirFinish *m) dout(7) << "handle_export_dir_finish logging import_finish on " << *dir << endl; assert(dir->is_auth()); + // note state + import_state[dir->ino()] = IMPORT_LOGGINGFINISH; + // log mds->mdlog->submit_entry(new EImportFinish(dir, true), new C_MDS_ImportDirLoggedFinish(this,dir)); @@ -1357,34 +1590,38 @@ void Migrator::handle_export_dir_finish(MExportDirFinish *m) void Migrator::import_dir_logged_finish(CDir *dir) { - dout(7) << "import_dir_finish" << endl; - - dout(5) << "done with import of " << *dir << endl; - show_imports(); - if (mds->logger) { - mds->logger->set("nex", cache->exports.size()); - mds->logger->set("nim", cache->imports.size()); - } + dout(7) << "import_dir_logged_finish " << *dir << endl; // un auth pin (other exports can now proceed) dir->auth_unpin(); // unfreeze! - for (set::iterator p = import_freeze_leaves[dir->ino()].begin(); - p != import_freeze_leaves[dir->ino()].end(); + for (set::iterator p = import_bounds[dir->ino()].begin(); + p != import_bounds[dir->ino()].end(); ++p) { - (*p)->put(CDir::PIN_FREEZELEAF); - (*p)->state_clear(CDIR_STATE_FROZENTREELEAF); + CInode *diri = mds->mdcache->get_inode(*p); + CDir *dir = diri->dir; + assert(dir->state_test(CDIR_STATE_FROZENTREELEAF)); + dir->put(CDir::PIN_FREEZELEAF); + dir->state_clear(CDIR_STATE_FROZENTREELEAF); } - import_freeze_leaves.erase(dir->ino()); dir->unfreeze_tree(); + // clear import state (we're done!) + import_state.erase(dir->ino()); + import_bounds.erase(dir->ino()); // ok now finish contexts dout(5) << "finishing any waiters on imported data" << endl; dir->finish_waiting(CDIR_WAIT_IMPORTED); + // log it + if (mds->logger) { + mds->logger->set("nex", cache->exports.size()); + mds->logger->set("nim", cache->imports.size()); + } + show_imports(); // is it empty? if (dir->get_size() == 0 && @@ -1510,10 +1747,11 @@ int Migrator::import_dir_block(bufferlist& bl, // add to journal entry le->metablob.add_dir(dir, true); // Hmm: false would be okay in some cases + int num_imported = 0; + if (dir->is_hashed()) { // do nothing; dir is hashed - return 0; } else { // take all waiters on this dir // NOTE: a pass of imported data is guaranteed to get all of my waiters because @@ -1529,7 +1767,6 @@ int Migrator::import_dir_block(bufferlist& bl, dout(15) << "doing contents" << endl; // contents - int num_imported = 0; long nden = dstate.get_nden(); for (; nden>0; nden--) { @@ -1586,8 +1823,10 @@ int Migrator::import_dir_block(bufferlist& bl, le->metablob.add_dentry(dn, true); // Hmm: might we do dn->is_dirty() here instead? } - return num_imported; } + + dout(7) << " import_dir_block done " << *dir << endl; + return num_imported; } diff --git a/branches/sage/cephmds2/mds/Migrator.h b/branches/sage/cephmds2/mds/Migrator.h index 033885b4e4f7c..a438e8bdb7855 100644 --- a/branches/sage/cephmds2/mds/Migrator.h +++ b/branches/sage/cephmds2/mds/Migrator.h @@ -61,9 +61,20 @@ private: MDS *mds; MDCache *cache; + // -- exports -- + // export stages. used to clean up intelligently if there's a failure. + const static int EXPORT_DISCOVERING = 1; // dest is disovering export dir + const static int EXPORT_FREEZING = 2; // we're freezing the dir tree + const static int EXPORT_LOGGINGSTART = 3; // we're logging EExportStart + const static int EXPORT_PREPPING = 4; // sending dest spanning tree to export bounds + const static int EXPORT_EXPORTING = 5; // sent actual export, waiting for acks + const static int EXPORT_LOGGINGFINISH = 6; // logging EExportFinish + // export fun - set exporting; - map > export_notify_ack_waiting; // nodes i am waiting to get export_notify_ack's from + map export_state; + map export_peer; + map > export_bounds; + map > export_notify_ack_waiting; // nodes i am waiting to get export_notify_ack's from map > export_proxy_inos; map > export_proxy_dirinos; @@ -72,29 +83,56 @@ private: set stray_export_warnings; // notifies i haven't seen map stray_export_notifies; - // import muck - map > import_freeze_leaves; - // hashing madness + // -- imports -- + const static int IMPORT_DISCOVERED = 1; // waiting for prep + const static int IMPORT_PREPPING = 2; // opening dirs on bounds + const static int IMPORT_PREPPED = 3; // opened bounds, waiting for import + const static int IMPORT_LOGGINGSTART = 3; // got import, logging EImportStart + const static int IMPORT_ACKING = 4; // logged, sent acks + const static int IMPORT_LOGGINGFINISH = 5; + + map import_state; + map > import_bounds; + + + // -- hashing madness -- multimap unhash_waiting; // nodes i am waiting for UnhashDirAck's from multimap import_hashed_replicate_waiting; // nodes i am waiting to discover to complete my import of a hashed dir // maps frozen_dir_ino's to waiting-for-discover ino's. multimap import_hashed_frozen_waiting; // dirs i froze (for the above) + + public: // -- cons -- Migrator(MDS *m, MDCache *c) : mds(m), cache(c) {} void dispatch(Message*); - //bool is_importing(); - bool is_exporting(CDir *dir = 0) { - if (dir) - return exporting.count(dir); - else - return !exporting.empty(); + + // -- status -- + int is_exporting(CDir *dir) { + if (export_state.count(dir)) return export_state[dir]; + return 0; + } + bool is_exporting() { return !export_state.empty(); } + int is_importing(inodeno_t dirino) { + if (import_state.count(dirino)) return import_state[dirino]; + return 0; + } + bool is_importing() { return !import_state.empty(); } + const set& get_import_bounds(inodeno_t base) { + assert(import_bounds.count(base)); + return import_bounds[base]; } + + // -- misc -- + void handle_mds_failure(int who); + void show_imports(); + + // -- import/export -- // exporter public: @@ -121,9 +159,11 @@ public: CDir *basedir, CDir *dir, int newauth); - void export_dir_finish(CDir *dir); void handle_export_dir_notify_ack(MExportDirNotifyAck *m); - + void reverse_export(CDir *dir); + void export_dir_acked(CDir *dir); + void export_dir_finish(CDir *dir); + friend class C_MDC_ExportFreeze; friend class C_MDC_ExportStartLogged; friend class C_MDS_ExportFinishLogged; @@ -155,7 +195,6 @@ public: void handle_export_dir_warning(MExportDirWarning *m); void handle_export_dir_notify(MExportDirNotify *m); - void show_imports(); // -- hashed directories -- diff --git a/branches/sage/cephmds2/messages/MMDSCacheRejoin.h b/branches/sage/cephmds2/messages/MMDSCacheRejoin.h index 7d5554c9ca8b5..2789e30844743 100644 --- a/branches/sage/cephmds2/messages/MMDSCacheRejoin.h +++ b/branches/sage/cephmds2/messages/MMDSCacheRejoin.h @@ -18,21 +18,14 @@ #include "include/types.h" +// sent from replica to auth class MMDSCacheRejoin : public Message { - - struct InodeState { - int hardlock; // hardlock state - int filelock; // filelock state - int caps_wanted; // what caps bits i want - InodeState(int cw=0, int hl=-1, int fl=-1) : hardlock(hl), filelock(fl), caps_wanted(cw) {} - }; - - map inodes; - map > dentries; + public: + map inodes; // ino -> caps_wanted set dirs; + map > dentries; // dir -> (dentries...) - public: MMDSCacheRejoin() : Message(MSG_MDS_CACHEREJOIN) {} char *get_type_name() { return "cache_rejoin"; } @@ -42,23 +35,27 @@ class MMDSCacheRejoin : public Message { } void add_dir(inodeno_t dirino) { - dirs.insert(dirino); + dirs.insert(dirino); } - void add_dentry(inodeno_t dirino, const string& dn, int ls) { - dentries[dirino][dn] = ls; + void add_dentry(inodeno_t dirino, const string& dn) { + dentries[dirino].insert(dn); } - void add_inode(inodeno_t ino, int hl, int fl, int cw) { - inodes[ino] = InodeState(cw,hl,fl); + void add_inode(inodeno_t ino, int cw) { + inodes[ino] = cw; } void encode_payload() { - ::_encode(inodes, payload); - ::_encode(dirs, payload); + ::_encode(inodes, payload); + ::_encode(dirs, payload); + for (set::iterator p = dirs.begin(); p != dirs.end(); ++p) + ::_encode(dentries[*p], payload); } void decode_payload() { - int off = 0; - ::_decode(inodes, payload, off); - ::_decode(dirs, payload, off); + int off = 0; + ::_decode(inodes, payload, off); + ::_decode(dirs, payload, off); + for (set::iterator p = dirs.begin(); p != dirs.end(); ++p) + ::_decode(dentries[*p], payload, off); } }; diff --git a/branches/sage/cephmds2/messages/MMDSCacheRejoinAck.h b/branches/sage/cephmds2/messages/MMDSCacheRejoinAck.h new file mode 100644 index 0000000000000..b8f0d23ebbba0 --- /dev/null +++ b/branches/sage/cephmds2/messages/MMDSCacheRejoinAck.h @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MMDSCACHEREJOINACK_H +#define __MMDSCACHEREJOINACK_H + +#include "msg/Message.h" + +#include "include/types.h" + +// sent from auth back to replica + +class MMDSCacheRejoinAck : public Message { + public: + struct inodeinfo { + inodeno_t ino; + int hardlock; + int filelock; + int nonce; + inodeinfo() {} + inodeinfo(inodeno_t i, int h, int f, int n) : ino(i), hardlock(h), filelock(f), nonce(n) {} + }; + struct dninfo { + int lock; + int nonce; + dninfo() {} + dninfo(int l, int n) : lock(l), nonce(n) {} + }; + struct dirinfo { + inodeno_t dirino; + int nonce; + dirinfo() {} + dirinfo(inodeno_t i, int n) : dirino(i), nonce(n) {} + }; + list inodes; + map > dentries; + list dirs; + + MMDSCacheRejoinAck() : Message(MSG_MDS_CACHEREJOINACK) {} + + char *get_type_name() { return "cache_rejoin_ack"; } + + void print(ostream& out) { + out << "cache_rejoin" << endl; + } + + void add_dir(inodeno_t dirino, int nonce) { + dirs.push_back(dirinfo(dirino,nonce)); + } + void add_dentry(inodeno_t dirino, const string& dn, int ls, int nonce) { + dentries[dirino][dn] = dninfo(ls, nonce); + } + void add_inode(inodeno_t ino, int hl, int fl, int nonce) { + inodes.push_back(inodeinfo(ino, hl, fl, nonce)); + } + + void encode_payload() { + ::_encode(inodes, payload); + ::_encode(dirs, payload); + for (list::iterator p = dirs.begin(); p != dirs.end(); ++p) + ::_encode(dentries[p->dirino], payload); + } + void decode_payload() { + int off = 0; + ::_decode(inodes, payload, off); + ::_decode(dirs, payload, off); + for (list::iterator p = dirs.begin(); p != dirs.end(); ++p) + ::_decode(dentries[p->dirino], payload, off); + } +}; + +#endif diff --git a/branches/sage/cephmds2/messages/MMDSImportMap.h b/branches/sage/cephmds2/messages/MMDSImportMap.h index 3469f1b4f9007..22774cdabc2ec 100644 --- a/branches/sage/cephmds2/messages/MMDSImportMap.h +++ b/branches/sage/cephmds2/messages/MMDSImportMap.h @@ -41,7 +41,7 @@ class MMDSImportMap : public Message { imap[im].insert(ex); } - void add_ambiguous_import(inodeno_t im, set& m) { + void add_ambiguous_import(inodeno_t im, const set& m) { ambiguous_imap[im] = m; } diff --git a/branches/sage/cephmds2/mon/MDSMonitor.cc b/branches/sage/cephmds2/mon/MDSMonitor.cc index c03c44243891a..3fca23c9215c5 100644 --- a/branches/sage/cephmds2/mon/MDSMonitor.cc +++ b/branches/sage/cephmds2/mon/MDSMonitor.cc @@ -262,7 +262,7 @@ void MDSMonitor::tick() break; case MDSMap::STATE_STANDBY: - if (mdsmap.is_created(*p)) + if (mdsmap.has_created(*p)) newstate = MDSMap::STATE_OUT; else newstate = MDSMap::STATE_DNE; diff --git a/branches/sage/cephmds2/msg/Message.cc b/branches/sage/cephmds2/msg/Message.cc index 7576a56696a86..1f563a7cdaddd 100644 --- a/branches/sage/cephmds2/msg/Message.cc +++ b/branches/sage/cephmds2/msg/Message.cc @@ -53,6 +53,7 @@ using namespace std; #include "messages/MMDSBeacon.h" #include "messages/MMDSImportMap.h" #include "messages/MMDSCacheRejoin.h" +#include "messages/MMDSCacheRejoinAck.h" #include "messages/MDirUpdate.h" #include "messages/MDiscover.h" @@ -256,6 +257,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_MDS_CACHEREJOIN: m = new MMDSCacheRejoin; break; + case MSG_MDS_CACHEREJOINACK: + m = new MMDSCacheRejoinAck; + break; case MSG_MDS_DIRUPDATE: m = new MDirUpdate(); diff --git a/branches/sage/cephmds2/msg/Message.h b/branches/sage/cephmds2/msg/Message.h index 7d9c1d8bae56c..523ff70932cc9 100644 --- a/branches/sage/cephmds2/msg/Message.h +++ b/branches/sage/cephmds2/msg/Message.h @@ -94,6 +94,7 @@ #define MSG_MDS_IMPORTMAP 106 #define MSG_MDS_CACHEREJOIN 107 +#define MSG_MDS_CACHEREJOINACK 108 #define MSG_MDS_DISCOVER 110 #define MSG_MDS_DISCOVERREPLY 111 -- 2.39.5