From bcb9b1e400fbb49f9ec467fd5ed7828c826ad876 Mon Sep 17 00:00:00 2001 From: sageweil Date: Sat, 10 Feb 2007 00:48:40 +0000 Subject: [PATCH] surviving exporter will recovery importer failure during export git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1093 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/Makefile | 3 + branches/sage/cephmds2/TODO | 13 +- branches/sage/cephmds2/include/buffer.h | 27 ++ branches/sage/cephmds2/mds/CDir.cc | 1 + branches/sage/cephmds2/mds/CDir.h | 32 +- branches/sage/cephmds2/mds/MDCache.cc | 29 +- branches/sage/cephmds2/mds/MDS.cc | 3 +- branches/sage/cephmds2/mds/Migrator.cc | 349 +++++++++++-------- branches/sage/cephmds2/mds/Migrator.h | 24 +- branches/sage/cephmds2/messages/MExportDir.h | 38 +- branches/sage/cephmds2/mon/ClientMonitor.cc | 4 +- branches/sage/cephmds2/msg/msg_types.h | 8 +- 12 files changed, 323 insertions(+), 208 deletions(-) diff --git a/branches/sage/cephmds2/Makefile b/branches/sage/cephmds2/Makefile index 48d44bd905614..b73d4bb4d1163 100644 --- a/branches/sage/cephmds2/Makefile +++ b/branches/sage/cephmds2/Makefile @@ -206,6 +206,9 @@ count: cat ${SRCS} | wc -l cat ${SRCS} | grep -c \; +TAGS: + etags `find . -name "*.[h|cc]"` + .depend: touch .depend diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 8441672e770ea..b5aaa5b65bd1f 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -1,5 +1,14 @@ -- exporter recovery if importer fails during EXPORT_EXPORTING stage +- how to reliably deliver cache expire messages? + - how should proxy behave? + - exporter failure + - all cacheexpire info has been passed on up until point where export is permanent. no impact. + - importer failure + - exporter collects expire info, so that it can reverse. + - ??? + - maybe hosts should double-up expires until after export is known to have committed? + +/- exporter recovery if importer fails during EXPORT_EXPORTING stage - importer recovery if exporter fails /?- delay response to sending import_map if export in progress? @@ -10,7 +19,7 @@ - osd needs a set_floor_and_read op for safe failover/STOGITH-like semantics. -- falures during recovery stages... rejoin +- failures during recovery stages (resolve, rejoin)... make sure rejoin still works! - fix mds initial osdmap weirdness (which will currently screw up on standby -> almost anything) diff --git a/branches/sage/cephmds2/include/buffer.h b/branches/sage/cephmds2/include/buffer.h index 9fa9663ff1fd3..e9de7a9894b2a 100644 --- a/branches/sage/cephmds2/include/buffer.h +++ b/branches/sage/cephmds2/include/buffer.h @@ -826,6 +826,33 @@ inline void _decode(std::set& s, bufferlist& bl, int& off) assert(s.size() == (unsigned)n); } +// list +inline void _encode(const std::list& s, bufferlist& bl) +{ + int n = s.size(); + bl.append((char*)&n, sizeof(n)); + for (std::list::const_iterator it = s.begin(); + it != s.end(); + it++) { + ::_encode(*it, bl); + n--; + } + assert(n==0); +} +inline void _decode(std::list& s, bufferlist& bl, int& off) +{ + s.clear(); + int n; + bl.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; i template diff --git a/branches/sage/cephmds2/mds/CDir.cc b/branches/sage/cephmds2/mds/CDir.cc index 578cb0c091bb8..c9b9996d91c2d 100644 --- a/branches/sage/cephmds2/mds/CDir.cc +++ b/branches/sage/cephmds2/mds/CDir.cc @@ -60,6 +60,7 @@ ostream& operator<<(ostream& out, CDir& dir) out << " dir_auth=" << dir.get_dir_auth(); out << " state=" << dir.get_state(); + if (dir.state_test(CDIR_STATE_PROXY)) out << "|proxy"; if (dir.state_test(CDIR_STATE_COMPLETE)) out << "|complete"; if (dir.state_test(CDIR_STATE_FREEZINGTREE)) out << "|freezingtree"; if (dir.state_test(CDIR_STATE_FROZENTREE)) out << "|frozentree"; diff --git a/branches/sage/cephmds2/mds/CDir.h b/branches/sage/cephmds2/mds/CDir.h index 48df8c6f411b7..6283bef7c0aff 100644 --- a/branches/sage/cephmds2/mds/CDir.h +++ b/branches/sage/cephmds2/mds/CDir.h @@ -89,12 +89,14 @@ class Context; #define CDIR_MASK_STATE_IMPORT_KEPT (CDIR_STATE_IMPORT\ |CDIR_STATE_EXPORT\ |CDIR_STATE_IMPORTINGEXPORT\ - |CDIR_STATE_FROZENTREE) + |CDIR_STATE_FROZENTREE\ + |CDIR_STATE_PROXY) + #define CDIR_MASK_STATE_EXPORT_KEPT (CDIR_STATE_HASHED\ |CDIR_STATE_FROZENTREE\ |CDIR_STATE_FROZENDIR\ |CDIR_STATE_EXPORT\ - |CDIR_STATE_PROXY) + |CDIR_STATE_PROXY) // common states #define CDIR_STATE_CLEAN 0 @@ -513,21 +515,17 @@ class CDirDiscover { // export -typedef struct { - inodeno_t ino; - __uint64_t nitems; // actual real entries - __uint64_t nden; // num dentries (including null ones) - version_t version; - unsigned state; - meta_load_t popularity_justme; - meta_load_t popularity_curdom; - int dir_auth; - int dir_rep; - // ints follow -} CDirExport_st; - class CDirExport { - CDirExport_st st; + struct { + inodeno_t ino; + long nitems; // actual real entries + long nden; // num dentries (including null ones) + version_t version; + unsigned state; + meta_load_t popularity_justme; + meta_load_t popularity_curdom; + int dir_rep; + } st; map replicas; set rep_by; @@ -543,7 +541,6 @@ class CDirExport { st.nden = dir->items.size(); st.version = dir->version; st.state = dir->state; - st.dir_auth = dir->dir_auth; st.dir_rep = dir->dir_rep; st.popularity_justme.take( dir->popularity[MDS_POP_JUSTME] ); @@ -573,7 +570,6 @@ class CDirExport { else dir->state = (dir->state & CDIR_MASK_STATE_IMPORT_KEPT) | // remember import flag, etc. (st.state & CDIR_MASK_STATE_EXPORTED); - dir->dir_auth = st.dir_auth; dir->dir_rep = st.dir_rep; dir->popularity[MDS_POP_JUSTME] += st.popularity_justme; diff --git a/branches/sage/cephmds2/mds/MDCache.cc b/branches/sage/cephmds2/mds/MDCache.cc index 5306d4b01c885..eb8ad591d6a35 100644 --- a/branches/sage/cephmds2/mds/MDCache.cc +++ b/branches/sage/cephmds2/mds/MDCache.cc @@ -607,6 +607,17 @@ void MDCache::send_cache_rejoins() map rejoins; + // if i am rejoining, send a rejoin to everyone. + // otherwise, just send to others who are rejoining. + for (set::iterator p = recovery_set.begin(); + p != recovery_set.end(); + ++p) { + if (*p == mds->get_nodeid()) continue; // nothing to myself! + if (mds->is_rejoin() || + mds->mdsmap->is_rejoin(*p)) + rejoins[*p] = new MMDSCacheRejoin; + } + // build list of dir_auth regions list dir_auth_regions; for (hash_map::iterator p = inode_map.begin(); @@ -620,14 +631,9 @@ void MDCache::send_cache_rejoins() assert(auth >= 0); if (auth == mds->get_nodeid()) continue; // skip my own regions! - - if (rejoins.count(auth) == 0) { - if (mds->is_rejoin() || // if i am rejoining, - mds->mdsmap->is_rejoin(auth)) // or if they are rejoining, - rejoins[auth] = new MMDSCacheRejoin; // send a rejoin - else - continue; // otherwise, skip this region - } + + if (rejoins.count(auth) == 0) + continue; // don't care about this node's regions // add to list dout(10) << " on mds" << auth << " region " << *p->second << endl; @@ -651,9 +657,16 @@ void MDCache::send_cache_rejoins() mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE); rejoin_ack_gather.insert(p->first); } + + // nothing? + if (rejoins.empty()) { + dout(10) << "nothing to rejoin, going active" << endl; + mds->set_want_state(MDSMap::STATE_ACTIVE); + } } + void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) { dout(10) << "cache_rejoin_walk " << *dir << endl; diff --git a/branches/sage/cephmds2/mds/MDS.cc b/branches/sage/cephmds2/mds/MDS.cc index 3fa5f1535734b..374ce40efa166 100644 --- a/branches/sage/cephmds2/mds/MDS.cc +++ b/branches/sage/cephmds2/mds/MDS.cc @@ -546,7 +546,8 @@ void MDS::handle_mds_map(MMDSMap *m) for (set::iterator p = failed.begin(); p != failed.end(); ++p) { // newly so? if (oldfailed.count(*p)) continue; - // FIXME. + + mdcache->migrator->handle_mds_failure(*p); } } diff --git a/branches/sage/cephmds2/mds/Migrator.cc b/branches/sage/cephmds2/mds/Migrator.cc index 009bb9a3916db..5d14bfbee4283 100644 --- a/branches/sage/cephmds2/mds/Migrator.cc +++ b/branches/sage/cephmds2/mds/Migrator.cc @@ -267,8 +267,20 @@ void Migrator::handle_mds_failure(int who) export_state.erase(dir); export_peer.erase(dir); + // unpin the path + vector trace; + cache->make_trace(trace, dir->inode); + cache->path_unpin(trace, 0); + + // wake up any waiters + mds->queue_finished(export_finish_waiters[dir]); + export_finish_waiters.erase(dir); + // send pending import_maps? mds->mdcache->send_pending_import_maps(); + + mds->mdcache->show_imports(); + mds->mdcache->show_cache(); } else { // third party failed. potential peripheral damage? if (p->second == EXPORT_EXPORTING) { @@ -287,6 +299,51 @@ void Migrator::handle_mds_failure(int who) // next! p = next; } + + + // check my imports + map::iterator q = import_state.begin(); + while (q != import_state.end()) { + map::iterator next = q; + next++; + inodeno_t dirino = q->first; + CInode *diri = mds->mdcache->get_inode(dirino); + CDir *dir = 0; + if (diri) + dir = diri->dir; + + if (import_peer[dirino] == who) { + switch (import_peer[dirino]) { + case IMPORT_DISCOVERED: + + break; + + case IMPORT_PREPPING: + + break; + + case IMPORT_PREPPED: + + break; + + case IMPORT_LOGGINGSTART: + + break; + + case IMPORT_ACKING: + // hrm. make this an ambiguous import, and wait for exporter recovery to disambiguate + // ... + break; + + case IMPORT_LOGGINGFINISH: + // do nothing, exporter is no longer involved. + break; + } + } + + // next! + q = next; + } } @@ -532,22 +589,18 @@ void Migrator::handle_export_dir_prep_ack(MExportDirPrepAck *m) void Migrator::export_dir_go(CDir *dir, - int dest) + int dest) { dout(7) << "export_dir_go " << *dir << " to " << dest << endl; show_imports(); - - // build export message - MExportDir *req = new MExportDir(dir->inode); // include pop - + assert(export_bounds.count(dir) == 0); + assert(export_data.count(dir) == 0); // update imports/exports CDir *containing_import = cache->get_auth_container(dir); - set bounds; - if (containing_import == dir) { dout(7) << " i'm rexporting a previous import" << endl; assert(dir->is_import()); @@ -562,8 +615,7 @@ void Migrator::export_dir_go(CDir *dir, p++; // add to export message - req->add_export(nested); - bounds.insert(nested); + export_bounds[dir].insert(nested); // nested beneath our new export *in; remove! dout(7) << " export " << *nested << " was nested beneath us; removing from export list(s)" << endl; @@ -599,8 +651,7 @@ void Migrator::export_dir_go(CDir *dir, // exports.erase(nested); _walk does this // add to msg - req->add_export(nested); - bounds.insert(nested); + export_bounds[dir].insert(nested); } else { dout(12) << " export " << *nested << " is under other export " << *containing_export << ", which is unrelated" << endl; assert(cache->get_auth_container(containing_export) != containing_import); @@ -608,16 +659,12 @@ void Migrator::export_dir_go(CDir *dir, } } - // note new authority (locally) if (dir->inode->authority() == dest) dir->set_dir_auth( CDIR_AUTH_PARENT ); else dir->set_dir_auth( dest ); - // note bounds - export_bounds[dir].swap(bounds); - // make list of nodes i expect an export_dir_notify_ack from // (everyone w/ this dir open, but me!) @@ -638,13 +685,25 @@ void Migrator::export_dir_go(CDir *dir, // fill export message with cache data C_Contexts *fin = new C_Contexts; // collect all the waiters - int num_exported_inodes = export_dir_walk( req, + int num_exported_inodes = encode_export_dir( export_data[dir], fin, dir, // base dir, // recur start point dest ); // send the export data! + MExportDir *req = new MExportDir(dir->ino()); + + // export state + req->set_dirstate( export_data[dir] ); + + // add bounds + for (set::iterator p = export_bounds[dir].begin(); + p != export_bounds[dir].end(); + ++p) + req->add_export((*p)->ino()); + + //s end mds->send_message_mds(req, dest, MDS_PORT_MIGRATOR); // queue up the finisher @@ -736,11 +795,11 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_au } -int Migrator::export_dir_walk(MExportDir *req, - C_Contexts *fin, - CDir *basedir, - CDir *dir, - int newauth) +int Migrator::encode_export_dir(list& dirstatelist, + C_Contexts *fin, + CDir *basedir, + CDir *dir, + int newauth) { int num_exported = 0; @@ -864,11 +923,14 @@ int Migrator::export_dir_walk(MExportDir *req, } } - req->add_dir( enc_dir ); + // add to dirstatelist + bufferlist bl; + dirstatelist.push_back( bl ); + dirstatelist.back().claim( enc_dir ); // subdirs for (list::iterator it = subdirs.begin(); it != subdirs.end(); it++) - num_exported += export_dir_walk(req, fin, basedir, *it, newauth); + num_exported += encode_export_dir(dirstatelist, fin, basedir, *it, newauth); return num_exported; } @@ -927,7 +989,9 @@ void Migrator::reverse_export(CDir *dir) assert(export_state[dir] == EXPORT_EXPORTING); assert(export_bounds.count(dir)); + assert(export_data.count(dir)); + // re-import it. set bounds; bounds.swap(export_bounds[dir]); export_bounds.erase(dir); @@ -936,13 +1000,14 @@ void Migrator::reverse_export(CDir *dir) // base CDir *im = dir; if (dir->get_inode()->authority() == mds->get_nodeid()) { - // parent is already me. adding to existing import. + // parent is already me. was export, adding back to existing import. im = mds->mdcache->get_auth_container(dir); assert(im); mds->mdcache->nested_exports[im].erase(dir); + mds->mdcache->exports.erase(dir); dir->set_dir_auth( CDIR_AUTH_PARENT ); - dir->state_set(CDIR_STATE_EXPORT); - dir->get(CDir::PIN_EXPORT); + dir->state_clear(CDIR_STATE_EXPORT); + dir->put(CDir::PIN_EXPORT); } else { // parent isn't me. new import. mds->mdcache->imports.insert(dir); @@ -987,19 +1052,38 @@ void Migrator::reverse_export(CDir *dir) } - // -- adjust auth bits -- - - + // reimport the dirs + list imported_subdirs; + int num_imported_inodes = 0; + + for (list::iterator p = export_data[dir].begin(); + p != export_data[dir].end(); + ++p) { + num_imported_inodes += + decode_import_dir(*p, + export_peer[dir], + dir, // import root + imported_subdirs, + 0); + } + // remove proxy bits + clear_export_proxy_pins(dir); + // some clean up + export_data.erase(dir); + export_bounds.erase(dir); + export_notify_ack_waiting.erase(dir); } + void Migrator::export_dir_acked(CDir *dir) { dout(7) << "export_dir_acked " << *dir << endl; export_notify_ack_waiting.erase(dir); export_state[dir] = EXPORT_LOGGINGFINISH; + export_data.erase(dir); export_bounds.erase(dir); // log export completion, then finish (unfreeze, trigger finish context, etc.) @@ -1037,11 +1121,32 @@ void Migrator::export_dir_finish(CDir *dir) cache->path_unpin(trace, 0); // unpin proxies + clear_export_proxy_pins(dir); + + // queue finishers + mds->queue_finished(export_finish_waiters[dir]); + export_finish_waiters.erase(dir); + + // stats + if (mds->logger) mds->logger->set("nex", cache->exports.size()); + + show_imports(); + + // send pending import_maps? + mds->mdcache->send_pending_import_maps(); +} + + +void Migrator::clear_export_proxy_pins(CDir *dir) +{ + dout(10) << "clear_export_proxy_pins " << *dir << endl; + // inodes for (list::iterator it = export_proxy_inos[dir].begin(); it != export_proxy_inos[dir].end(); it++) { CInode *in = cache->get_inode(*it); + dout(15) << " " << *in << endl; in->put(CInode::PIN_PROXY); assert(in->state_test(CInode::STATE_PROXY)); in->state_clear(CInode::STATE_PROXY); @@ -1052,7 +1157,8 @@ void Migrator::export_dir_finish(CDir *dir) for (list::iterator it = export_proxy_dirinos[dir].begin(); it != export_proxy_dirinos[dir].end(); it++) { - CDir *dir = cache->get_inode(*it)->dir; + CDir *dir = cache->get_inode(*it)->dir; + dout(15) << " " << *dir << endl; dir->put(CDir::PIN_PROXY); assert(dir->state_test(CDIR_STATE_PROXY)); dir->state_clear(CDIR_STATE_PROXY); @@ -1073,23 +1179,7 @@ void Migrator::export_dir_finish(CDir *dir) } } export_proxy_dirinos.erase(dir); - - // queue finishers - mds->queue_finished(export_finish_waiters[dir]); - export_finish_waiters.erase(dir); - - // stats - if (mds->logger) mds->logger->set("nex", cache->exports.size()); - - show_imports(); - - // send pending import_maps? - mds->mdcache->send_pending_import_maps(); -} - - - - +} @@ -1163,6 +1253,7 @@ void Migrator::handle_export_dir_discover_2(MExportDirDiscover *m, CInode *in, i in->auth_pin(); import_state[in->ino()] = IMPORT_DISCOVERED; + import_peer[in->ino()] = m->get_source().num(); // reply @@ -1398,37 +1489,24 @@ void Migrator::handle_export_dir(MExportDir *m) le->metablob.add_dir_context(dir); // note new authority (locally) - if (dir->inode->is_auth()) - dir->set_dir_auth( CDIR_AUTH_PARENT ); - else - dir->set_dir_auth( mds->get_nodeid() ); - dout(10) << " set dir_auth to " << dir->get_dir_auth() << endl; - - // update imports/exports - CDir *containing_import; - if (cache->exports.count(dir)) { - // reimporting - dout(7) << " i'm reimporting " << *dir << endl; - cache->exports.erase(dir); - + CDir *im = dir; + if (dir->inode->is_auth()) { + // parent is already me. was export, adding back to existing import. + im = mds->mdcache->get_auth_container(dir); + assert(im); + mds->mdcache->nested_exports[im].erase(dir); + mds->mdcache->exports.erase(dir); + dir->set_dir_auth( CDIR_AUTH_PARENT ); dir->state_clear(CDIR_STATE_EXPORT); - dir->put(CDir::PIN_EXPORT); // unpin, no longer an export - - containing_import = cache->get_auth_container(dir); - dout(7) << " it is nested under import " << *containing_import << endl; - cache->nested_exports[containing_import].erase(dir); + dir->put(CDir::PIN_EXPORT); } else { - // new import - cache->imports.insert(dir); + // parent isn't me. new import. + mds->mdcache->imports.insert(dir); + dir->set_dir_auth( mds->get_nodeid() ); dir->state_set(CDIR_STATE_IMPORT); - dir->get(CDir::PIN_IMPORT); // must keep it pinned - - containing_import = dir; // imported exports nested under *in - - dout(7) << " new import at " << *dir << endl; + dir->get(CDir::PIN_IMPORT); } - // take out my temp pin dir->put(CDir::PIN_IMPORTING); @@ -1437,72 +1515,61 @@ void Migrator::handle_export_dir(MExportDir *m) dir->get_inode()->auth_pin(); dir->state_set(CDIR_STATE_FROZENTREE); - // add any inherited exports + dout(10) << " base " << *dir << endl; + if (dir != im) + dout(10) << " under " << *im << endl; + + // bounds for (list::iterator it = m->get_exports().begin(); it != m->get_exports().end(); it++) { - CInode *exi = cache->get_inode(*it); - assert(exi && exi->dir); - CDir *ex = exi->dir; - - dout(15) << " nested export " << *ex << endl; + CInode *bdi = cache->get_inode(*it); + CDir *bd = bdi->dir; + + if (bd->get_dir_auth() == mds->get_nodeid()) { + // still me. was an import. + assert(bd->is_import()); + mds->mdcache->imports.erase(bd); + bd->set_dir_auth( CDIR_AUTH_PARENT ); + bd->state_clear(CDIR_STATE_IMPORT); + bd->put(CDir::PIN_IMPORT); + // move nested exports. + for (set::iterator q = mds->mdcache->nested_exports[bd].begin(); + q != mds->mdcache->nested_exports[bd].end(); + ++q) + mds->mdcache->nested_exports[im].insert(*q); + mds->mdcache->nested_exports.erase(bd); + } else { + // not me anymore. now an export. + mds->mdcache->exports.insert(bd); + mds->mdcache->nested_exports[im].insert(bd); + assert(bd->get_dir_auth() != CDIR_AUTH_PARENT); + bd->set_dir_auth( CDIR_AUTH_UNKNOWN ); + bd->state_set(CDIR_STATE_EXPORT); + bd->get(CDir::PIN_EXPORT); + } // mark export point frozenleaf - ex->get(CDir::PIN_FREEZELEAF); - ex->state_set(CDIR_STATE_FROZENTREELEAF); + bd->get(CDir::PIN_FREEZELEAF); + bd->state_set(CDIR_STATE_FROZENTREELEAF); assert(import_bounds[dir->ino()].count(*it)); // we took note during prep stage - - // remove our pin - ex->put(CDir::PIN_IMPORTINGEXPORT); - ex->state_clear(CDIR_STATE_IMPORTINGEXPORT); - - - // add... - if (ex->is_import()) { - dout(7) << " importing my import " << *ex << endl; - cache->imports.erase(ex); - ex->state_clear(CDIR_STATE_IMPORT); - - if (mds->logger) mds->logger->inc("imex"); - // move nested exports under containing_import - for (set::iterator it = cache->nested_exports[ex].begin(); - it != cache->nested_exports[ex].end(); - it++) { - dout(7) << " moving nested export " << **it - << " under " << *containing_import << endl; - cache->nested_exports[containing_import].insert(*it); - } - cache->nested_exports.erase(ex); // de-list under old import - - ex->set_dir_auth( CDIR_AUTH_PARENT ); - ex->put(CDir::PIN_IMPORT); // imports are pinned, no longer import + // remove our pin + bd->put(CDir::PIN_IMPORTINGEXPORT); + bd->state_clear(CDIR_STATE_IMPORTINGEXPORT); - } else { - dout(7) << " importing export " << *ex << endl; - - // add it - ex->state_set(CDIR_STATE_EXPORT); - ex->get(CDir::PIN_EXPORT); // all exports are pinned - cache->exports.insert(ex); - cache->nested_exports[containing_import].insert(ex); - if (mds->logger) mds->logger->inc("imex"); - } - + dout(10) << " bound " << *bd << endl; } - - + // add this crap to my cache list imported_subdirs; - bufferlist dir_state; - dir_state.claim( m->get_state() ); - int off = 0; int num_imported_inodes = 0; - for (int i = 0; i < m->get_ndirs(); i++) { + for (list::iterator p = m->get_dirstate().begin(); + p != m->get_dirstate().end(); + ++p) { num_imported_inodes += - import_dir_block(dir_state, - off, + decode_import_dir(*p, oldauth, dir, // import root imported_subdirs, @@ -1620,6 +1687,7 @@ void Migrator::import_dir_logged_finish(CDir *dir) // clear import state (we're done!) import_state.erase(dir->ino()); + import_peer.erase(dir->ino()); import_bounds.erase(dir->ino()); // ok now finish contexts @@ -1676,8 +1744,8 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int ol } - // cached_by - assert(!in->is_replica(oldauth)); + // adjust replica list + //assert(!in->is_replica(oldauth)); // not true on failed export in->add_replica( oldauth, CINODE_EXPORT_NONCE ); if (in->is_replica(mds->get_nodeid())) in->remove_replica(mds->get_nodeid()); @@ -1717,23 +1785,24 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int ol } -int Migrator::import_dir_block(bufferlist& bl, - int& off, +int Migrator::decode_import_dir(bufferlist& bl, int oldauth, CDir *import_root, list& imported_subdirs, EImportStart *le) { + int off = 0; + // set up dir CDirExport dstate; off = dstate._decode(bl, off); - + CInode *diri = cache->get_inode(dstate.get_ino()); assert(diri); CDir *dir = diri->get_or_open_dir(mds->mdcache); assert(dir); - - dout(7) << " import_dir_block " << *dir << " have " << dir->nitems << " items, importing " << dstate.get_nden() << " dentries" << endl; + + dout(7) << "decode_import_dir " << *dir << endl; // add to list if (dir != import_root) @@ -1741,21 +1810,20 @@ int Migrator::import_dir_block(bufferlist& bl, // assimilate state dstate.update_dir( dir ); - if (diri->is_auth()) - dir->set_dir_auth( CDIR_AUTH_PARENT ); // update_dir may hose dir_auth // mark (may already be marked from get_or_open_dir() above) if (!dir->is_auth()) dir->state_set(CDIR_STATE_AUTH); - // open_by - assert(!dir->is_replica(oldauth)); + // adjust replica list + //assert(!dir->is_replica(oldauth)); // not true on failed export dir->add_replica(oldauth); if (dir->is_replica(mds->get_nodeid())) dir->remove_replica(mds->get_nodeid()); // add to journal entry - le->metablob.add_dir(dir, true); // Hmm: false would be okay in some cases + if (le) + le->metablob.add_dir(dir, true); // Hmm: false would be okay in some cases int num_imported = 0; @@ -1830,12 +1898,13 @@ int Migrator::import_dir_block(bufferlist& bl, } // add dentry to journal entry - le->metablob.add_dentry(dn, true); // Hmm: might we do dn->is_dirty() here instead? + if (le) + le->metablob.add_dentry(dn, true); // Hmm: might we do dn->is_dirty() here instead? } } - dout(7) << " import_dir_block done " << *dir << endl; + dout(7) << "decode_import_dir done " << *dir << endl; return num_imported; } @@ -2059,7 +2128,7 @@ void Migrator::import_hashed_content(CDir *dir, bufferlist& bl, int nden, int ol - remember simple rule: dir auth follows inode, unless dir_auth is explicit. - - export_dir_walk and import_dir_block take care with dir_auth: (for import/export) + - export_dir_walk and decode_import_dir take care with dir_auth: (for import/export) - on export, -1 is changed to mds->get_nodeid() - on import, nothing special, actually. diff --git a/branches/sage/cephmds2/mds/Migrator.h b/branches/sage/cephmds2/mds/Migrator.h index a438e8bdb7855..dd2886008d163 100644 --- a/branches/sage/cephmds2/mds/Migrator.h +++ b/branches/sage/cephmds2/mds/Migrator.h @@ -29,12 +29,12 @@ class CDir; class CInode; class CDentry; +class MExportDir; class MExportDirDiscover; class MExportDirDiscoverAck; class MExportDirPrep; class MExportDirPrepAck; class MExportDirWarning; -class MExportDir; class MExportDirNotify; class MExportDirNotifyAck; class MExportDirFinish; @@ -74,6 +74,7 @@ private: map export_state; map export_peer; map > export_bounds; + map > export_data; // only during EXPORTING state map > export_notify_ack_waiting; // nodes i am waiting to get export_notify_ack's from map > export_proxy_inos; map > export_proxy_dirinos; @@ -88,11 +89,12 @@ private: const static int IMPORT_DISCOVERED = 1; // waiting for prep const static int IMPORT_PREPPING = 2; // opening dirs on bounds const static int IMPORT_PREPPED = 3; // opened bounds, waiting for import - const static int IMPORT_LOGGINGSTART = 3; // got import, logging EImportStart - const static int IMPORT_ACKING = 4; // logged, sent acks - const static int IMPORT_LOGGINGFINISH = 5; + const static int IMPORT_LOGGINGSTART = 4; // got import, logging EImportStart + const static int IMPORT_ACKING = 5; // logged, sent acks + const static int IMPORT_LOGGINGFINISH = 6; map import_state; + map import_peer; map > import_bounds; @@ -146,6 +148,7 @@ public: void add_export_finish_waiter(CDir *dir, Context *c) { export_finish_waiters[dir].push_back(c); } + void clear_export_proxy_pins(CDir *dir); protected: void handle_export_dir_discover_ack(MExportDirDiscoverAck *m); @@ -154,7 +157,7 @@ public: void handle_export_dir_prep_ack(MExportDirPrepAck *m); void export_dir_go(CDir *dir, int dest); - int export_dir_walk(MExportDir *req, + int encode_export_dir(list& dirstatelist, class C_Contexts *fin, CDir *basedir, CDir *dir, @@ -177,12 +180,11 @@ public: list &exports); void import_dir_logged_finish(CDir *dir); void handle_export_dir_finish(MExportDirFinish *m); - int import_dir_block(bufferlist& bl, - int& off, - int oldauth, - CDir *import_root, - list& imported_subdirs, - EImportStart *le); + int decode_import_dir(bufferlist& bl, + int oldauth, + CDir *import_root, + list& imported_subdirs, + EImportStart *le); void got_hashed_replica(CDir *import, inodeno_t dir_ino, inodeno_t replica_ino); diff --git a/branches/sage/cephmds2/messages/MExportDir.h b/branches/sage/cephmds2/messages/MExportDir.h index c81a5db0beee8..8fdda89466b1e 100644 --- a/branches/sage/cephmds2/messages/MExportDir.h +++ b/branches/sage/cephmds2/messages/MExportDir.h @@ -21,50 +21,42 @@ class MExportDir : public Message { inodeno_t ino; - int ndirs; - bufferlist state; - - list exports; - - // hashed pre-discovers - //map > hashed_prediscover; + list dirstate; // a bl for reach dir + list exports; public: MExportDir() {} - MExportDir(CInode *in) : - Message(MSG_MDS_EXPORTDIR) { - this->ino = in->inode.ino; - ndirs = 0; + MExportDir(inodeno_t dirino) : + Message(MSG_MDS_EXPORTDIR), + ino(dirino) { } virtual char *get_type_name() { return "Ex"; } inodeno_t get_ino() { return ino; } - int get_ndirs() { return ndirs; } - bufferlist& get_state() { return state; } + list& get_dirstate() { return dirstate; } list& get_exports() { return exports; } - + void add_dir(bufferlist& dir) { - state.claim_append( dir ); - ndirs++; + dirstate.push_back(dir); + } + void set_dirstate(const list& ls) { + dirstate = ls; } - void add_export(CDir *dir) { - exports.push_back(dir->ino()); + void add_export(inodeno_t dirino) { + exports.push_back(dirino); } virtual void decode_payload() { int off = 0; payload.copy(off, sizeof(ino), (char*)&ino); off += sizeof(ino); - payload.copy(off, sizeof(ndirs), (char*)&ndirs); - off += sizeof(ndirs); ::_decode(exports, payload, off); - ::_decode(state, payload, off); + ::_decode(dirstate, payload, off); } virtual void encode_payload() { payload.append((char*)&ino, sizeof(ino)); - payload.append((char*)&ndirs, sizeof(ndirs)); ::_encode(exports, payload); - ::_encode(state, payload); + ::_encode(dirstate, payload); } }; diff --git a/branches/sage/cephmds2/mon/ClientMonitor.cc b/branches/sage/cephmds2/mon/ClientMonitor.cc index f37510cbe77bf..8ab59504d4bae 100644 --- a/branches/sage/cephmds2/mon/ClientMonitor.cc +++ b/branches/sage/cephmds2/mon/ClientMonitor.cc @@ -65,7 +65,9 @@ void ClientMonitor::handle_client_boot(MClientBoot *m) client_map[MSG_ADDR_CLIENT(from)] = m->get_source_addr(); // reply with latest mds map - mon->mdsmon->send_latest(m->get_source_inst()); + entity_inst_t to = m->get_source_inst(); + to.name = MSG_ADDR_CLIENT(from); + mon->mdsmon->send_latest(to); delete m; } diff --git a/branches/sage/cephmds2/msg/msg_types.h b/branches/sage/cephmds2/msg/msg_types.h index aca159a9b9f59..5fd8d780b291e 100644 --- a/branches/sage/cephmds2/msg/msg_types.h +++ b/branches/sage/cephmds2/msg/msg_types.h @@ -19,8 +19,8 @@ // new typed msg_addr_t way! class entity_name_t { - int _type:3; - int _num:29; + int _type; + int _num; public: static const int TYPE_MON = 1; @@ -42,8 +42,8 @@ public: case TYPE_OSD: return "osd"; case TYPE_MON: return "mon"; case TYPE_CLIENT: return "client"; - } - return "unknown"; + default: return "unknown"; + } } bool is_new() const { return num() == NEW; } -- 2.39.5