From 6d903804e2379fcecac74fa430c0d36356e943ae Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 17 Oct 2007 23:58:49 +0000 Subject: [PATCH] delay caps import until after EImportStart is journaled; "export" all caps in import_reverse git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1962 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/mds/mds/CInode.cc | 19 +++++- branches/sage/mds/mds/CInode.h | 6 +- branches/sage/mds/mds/MDCache.h | 3 + branches/sage/mds/mds/Migrator.cc | 98 ++++++++++++++++++++++--------- branches/sage/mds/mds/Migrator.h | 17 +++--- branches/sage/mds/mds/Server.cc | 76 +++++++++++++++++------- branches/sage/mds/mds/Server.h | 6 +- 7 files changed, 163 insertions(+), 62 deletions(-) diff --git a/branches/sage/mds/mds/CInode.cc b/branches/sage/mds/mds/CInode.cc index 3bdfc89e3f1fa..ccb1359329aba 100644 --- a/branches/sage/mds/mds/CInode.cc +++ b/branches/sage/mds/mds/CInode.cc @@ -803,7 +803,7 @@ void CInode::finish_export(utime_t now) } void CInode::decode_import(bufferlist::iterator& p, - set& new_client_caps, + map >& imported_cap_map, LogSegment *ls) { utime_t old_mtime = inode.mtime; @@ -828,7 +828,10 @@ void CInode::decode_import(bufferlist::iterator& p, map cap_map; ::_decode_simple(cap_map, p); - merge_client_caps(cap_map, new_client_caps); + if (!cap_map.empty()) { + imported_cap_map[this].swap(cap_map); + get(PIN_IMPORTINGCAPS); + } authlock._decode(p); linklock._decode(p); @@ -836,3 +839,15 @@ void CInode::decode_import(bufferlist::iterator& p, filelock._decode(p); dirlock._decode(p); } + +/* + * the cap import is delayed so that we can journal before + * contacting clients + */ +void CInode::import_caps(map& cap_map, + set& new_caps) +{ + merge_client_caps(cap_map, new_caps); + put(PIN_IMPORTINGCAPS); +} + diff --git a/branches/sage/mds/mds/CInode.h b/branches/sage/mds/mds/CInode.h index 8f453472a0477..2089ee18e1e5f 100644 --- a/branches/sage/mds/mds/CInode.h +++ b/branches/sage/mds/mds/CInode.h @@ -68,6 +68,7 @@ class CInode : public MDSCacheObject { static const int PIN_PURGING = -12; static const int PIN_FREEZING = 13; static const int PIN_FROZEN = 14; + static const int PIN_IMPORTINGCAPS = 15; const char *pin_name(int p) { switch (p) { @@ -83,6 +84,7 @@ class CInode : public MDSCacheObject { case PIN_STICKYDIRS: return "stickydirs"; case PIN_FREEZING: return "freezing"; case PIN_FROZEN: return "frozen"; + case PIN_IMPORTINGCAPS: return "importingcaps"; default: return generic_pin_name(p); } } @@ -306,8 +308,10 @@ public: put(PIN_TEMPEXPORTING); } void decode_import(bufferlist::iterator& p, - set& new_client_caps, + map >& imported_cap_map, LogSegment *ls); + void import_caps(map& cap_map, + set& new_caps); // -- locks -- diff --git a/branches/sage/mds/mds/MDCache.h b/branches/sage/mds/mds/MDCache.h index 86e3b894c6c8d..896ee791c367b 100644 --- a/branches/sage/mds/mds/MDCache.h +++ b/branches/sage/mds/mds/MDCache.h @@ -132,6 +132,9 @@ struct MDRequest { version_t inode_import_v; CInode* destdn_was_remote_inode; bool was_link_merge; + + map imported_client_map; + map > cap_imports; // called when slave commits or aborts Context *slave_commit; diff --git a/branches/sage/mds/mds/Migrator.cc b/branches/sage/mds/mds/Migrator.cc index 6ef64436318eb..e76fcec029815 100644 --- a/branches/sage/mds/mds/Migrator.cc +++ b/branches/sage/mds/mds/Migrator.cc @@ -838,12 +838,8 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, exported_client_map[it->first] = mds->clientmap.get_inst(it->first); } -void Migrator::finish_export_inode(CInode *in, utime_t now, list& finished) +void Migrator::finish_export_inode_caps(CInode *in) { - dout(12) << "finish_export_inode " << *in << dendl; - - in->finish_export(now); - // tell (all) clients about migrating caps.. mark STALE for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); @@ -858,6 +854,15 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list& fini mds->send_message_client(m, it->first); } in->clear_client_caps(); +} + +void Migrator::finish_export_inode(CInode *in, utime_t now, list& finished) +{ + dout(12) << "finish_export_inode " << *in << dendl; + + in->finish_export(now); + + finish_export_inode_caps(in); // relax locks? if (!in->is_replicated()) @@ -1571,11 +1576,13 @@ class C_MDS_ImportDirLoggedStart : public Context { CDir *dir; int from; public: + map imported_client_map; + C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) : migrator(m), dir(d), from(f) { } void finish(int r) { - migrator->import_logged_start(dir, from); + migrator->import_logged_start(dir, from, imported_client_map); } }; @@ -1590,6 +1597,8 @@ void Migrator::handle_export_dir(MExportDir *m) cache->show_subtrees(); + C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num()); + // start the journal entry EImportStart *le = new EImportStart(dir->dirfrag(), m->get_bounds()); le->metablob.add_dir_context(dir); @@ -1598,11 +1607,11 @@ void Migrator::handle_export_dir(MExportDir *m) cache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth); // add this crap to my cache - map imported_client_map; bufferlist::iterator blp = m->get_dirstate().begin(); - ::_decode_simple(imported_client_map, blp); - mds->server->force_open_sessions(imported_client_map); + // new client sessions, open these after we journal + ::_decode_simple(onlogged->imported_client_map, blp); + mds->server->prepare_force_open_sessions(onlogged->imported_client_map); int num_imported_inodes = 0; while (!blp.end()) { @@ -1612,6 +1621,7 @@ void Migrator::handle_export_dir(MExportDir *m) dir, // import root le, mds->mdlog->get_current_segment(), + import_caps[dir], import_updated_scatterlocks[dir]); } dout(10) << " " << m->get_bounds().size() << " imported bounds" << dendl; @@ -1633,8 +1643,7 @@ void Migrator::handle_export_dir(MExportDir *m) dout(7) << "handle_export_dir did " << *dir << dendl; // log it - mds->mdlog->submit_entry(le, - new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num())); + mds->mdlog->submit_entry(le, onlogged); // note state import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART; @@ -1743,6 +1752,20 @@ void Migrator::import_reverse(CDir *dir) } } + // reexport caps + for (map >::iterator p = import_caps[dir].begin(); + p != import_caps[dir].end(); + ++p) { + CInode *in = p->first; + /* + * bleh.. just export all caps for this inode. the auth mds + * will pick them up during recovery. + */ + map cap_map; // throw this away + in->export_client_caps(cap_map); + finish_export_inode_caps(in); + } + // log our failure mds->mdlog->submit_entry(new EImportFinish(dir, false)); // log failure @@ -1794,6 +1817,7 @@ void Migrator::import_reverse_final(CDir *dir) import_bystanders.erase(dir); import_bound_ls.erase(dir); import_updated_scatterlocks.erase(dir); + import_caps.erase(dir); // send pending import_maps? mds->mdcache->maybe_send_pending_resolves(); @@ -1803,13 +1827,42 @@ void Migrator::import_reverse_final(CDir *dir) } -void Migrator::import_logged_start(CDir *dir, int from) +void Migrator::finish_import_caps(CInode *in, int from, map &cap_map) +{ + set new_caps; + in->import_caps(cap_map, new_caps); + + for (set::iterator it = new_caps.begin(); + it != new_caps.end(); + it++) { + dout(0) << "merged caps for client" << *it << " on " << *in << dendl; + MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_REAP, + in->inode, + in->client_caps[*it].get_last_seq(), + in->client_caps[*it].pending(), + in->client_caps[*it].wanted()); + caps->set_mds(from); // reap from whom? + mds->send_message_client(caps, *it); + } +} + +void Migrator::import_logged_start(CDir *dir, int from, + map &imported_client_map) { dout(7) << "import_logged " << *dir << dendl; // note state import_state[dir->dirfrag()] = IMPORT_ACKING; + // force open client sessions and finish cap import + mds->server->finish_force_open_sessions(imported_client_map); + + for (map >::iterator p = import_caps[dir].begin(); + p != import_caps[dir].end(); + ++p) { + finish_import_caps(p->first, from, p->second); + } + // send notify's etc. dout(7) << "sending ack for " << *dir << " to old auth mds" << from << dendl; mds->send_message_mds(new MExportDirAck(dir->dirfrag()), @@ -1855,6 +1908,7 @@ void Migrator::import_finish(CDir *dir) import_peer.erase(dir->dirfrag()); import_bystanders.erase(dir); import_bound_ls.erase(dir); + import_caps.erase(dir); import_updated_scatterlocks.erase(dir); // process delayed expires @@ -1880,6 +1934,7 @@ void Migrator::import_finish(CDir *dir) void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks) { dout(15) << "decode_import_inode on " << *dn << dendl; @@ -1897,8 +1952,7 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o } // state after link -- or not! -sage - set merged_client_caps; - in->decode_import(blp, merged_client_caps, ls); + in->decode_import(blp, cap_imports, ls); // cap imports are noted for later action // link before state -- or not! -sage if (dn->inode != in) { @@ -1930,19 +1984,6 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o if (in->is_replica(mds->get_nodeid())) in->remove_replica(mds->get_nodeid()); - // caps - for (set::iterator it = merged_client_caps.begin(); - it != merged_client_caps.end(); - it++) { - dout(0) << "merged caps for client" << *it << " on " << *in << dendl; - MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_REAP, - in->inode, - in->client_caps[*it].get_last_seq(), - in->client_caps[*it].pending(), - in->client_caps[*it].wanted()); - caps->set_mds( oldauth ); // reap from whom? - mds->send_message_client(caps, *it); - } } @@ -1951,6 +1992,7 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp, CDir *import_root, EImportStart *le, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks) { // set up dir @@ -2045,7 +2087,7 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp, } else if (icode == 'I') { // inode - decode_import_inode(dn, blp, oldauth, ls, updated_scatterlocks); + decode_import_inode(dn, blp, oldauth, ls, cap_imports, updated_scatterlocks); } // add dentry to journal entry diff --git a/branches/sage/mds/mds/Migrator.h b/branches/sage/mds/mds/Migrator.h index 92f824e691c55..26dadbb15ed68 100644 --- a/branches/sage/mds/mds/Migrator.h +++ b/branches/sage/mds/mds/Migrator.h @@ -114,14 +114,7 @@ protected: map > import_bystanders; map > import_bound_ls; map > import_updated_scatterlocks; - - /* - // -- hashing madness -- - multimap unhash_waiting; // nodes i am waiting for UnhashDirAck's from - multimap import_hashed_replicate_waiting; // nodes i am waiting to discover to complete my import of a hashed dir - // maps frozen_dir_ino's to waiting-for-discover ino's. - multimap import_hashed_frozen_waiting; // dirs i froze (for the above) - */ + map > > import_caps; public: @@ -189,6 +182,8 @@ public: void encode_export_inode(CInode *in, bufferlist& enc_state, map& exported_client_map); void finish_export_inode(CInode *in, utime_t now, list& finished); + void finish_export_inode_caps(CInode *in); + int encode_export_dir(bufferlist& exportbl, CDir *dir, map& exported_client_map, @@ -224,13 +219,16 @@ public: public: void decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks); int decode_import_dir(bufferlist::iterator& blp, int oldauth, CDir *import_root, EImportStart *le, LogSegment *ls, + map >& cap_imports, list& updated_scatterlocks); + void finish_import_caps(CInode *in, int from, map &cap_map); public: void import_reverse(CDir *dir); @@ -239,7 +237,8 @@ protected: void import_reverse_unfreeze(CDir *dir); void import_reverse_final(CDir *dir); void import_notify_abort(CDir *dir, set& bounds); - void import_logged_start(CDir *dir, int from); + void import_logged_start(CDir *dir, int from, + map &imported_client_map); void handle_export_finish(MExportDirFinish *m); public: void import_finish(CDir *dir); diff --git a/branches/sage/mds/mds/Server.cc b/branches/sage/mds/mds/Server.cc index b60159269a6c9..92b71888e28e3 100644 --- a/branches/sage/mds/mds/Server.cc +++ b/branches/sage/mds/mds/Server.cc @@ -173,6 +173,7 @@ void Server::handle_client_session(MClientSession *m) // journal it version_t cmapv = mds->clientmap.inc_projected(); + dout(10) << " clientmap v " << mds->clientmap.get_version() << " pv " << cmapv << dendl; mdlog->submit_entry(new ESession(m->get_source_inst(), open, cmapv), new C_MDS_session_finish(mds, m->get_source_inst(), open, cmapv)); delete m; @@ -211,10 +212,21 @@ void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cma mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst); } -void Server::force_open_sessions(map& cm) +void Server::prepare_force_open_sessions(map& cm) +{ + version_t cmapv = mds->clientmap.inc_projected(); + dout(10) << "prepare_force_open_sessions " << cmapv + << " on " << cm.size() << " clients" + << dendl; + for (map::iterator p = cm.begin(); p != cm.end(); ++p) { + mds->clientmap.add_opening(p->first); + } +} + +void Server::finish_force_open_sessions(map& cm) { - dout(10) << "force_open_sessions on " << cm.size() << " clients" << dendl; version_t v = mds->clientmap.get_version(); + dout(10) << "finish_force_open_sessions on " << cm.size() << " clients, v " << v << " -> " << (v+1) << dendl; for (map::iterator p = cm.begin(); p != cm.end(); ++p) { if (mds->clientmap.is_closing(p->first)) { dout(15) << "force_open_sessions canceling close on " << p->second << dendl; @@ -225,11 +237,11 @@ void Server::force_open_sessions(map& cm) dout(15) << "force_open_sessions have session " << p->second << dendl; continue; } - + dout(10) << "force_open_sessions opening " << p->second << dendl; mds->clientmap.open_session(p->second); mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), p->second); - } + } mds->clientmap.set_version(v+1); } @@ -2927,7 +2939,7 @@ void Server::handle_client_rename(MDRequest *mdr) EUpdate *le = new EUpdate(mdlog, "rename"); le->metablob.add_client_req(mdr->reqid); - _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn); + _rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn); if (!srcdn->is_auth() && srcdn->is_primary()) { // importing inode; also journal imported client map @@ -3013,7 +3025,7 @@ void Server::_rename_prepare_witness(MDRequest *mdr, int who, CDentry *srcdn, CD void Server::_rename_prepare(MDRequest *mdr, - EMetaBlob *metablob, + EMetaBlob *metablob, bufferlist *client_map_bl, CDentry *srcdn, CDentry *destdn, CDentry *straydn) { dout(10) << "_rename_prepare " << *mdr << " " << *srcdn << " " << *destdn << dendl; @@ -3079,8 +3091,30 @@ void Server::_rename_prepare(MDRequest *mdr, version_t siv; if (srcdn->is_auth()) siv = srcdn->inode->get_projected_version(); - else + else { siv = mdr->more()->inode_import_v; + + /* import node */ + bufferlist::iterator blp = mdr->more()->inode_import.begin(); + + // imported caps + ::_decode_simple(mdr->more()->imported_client_map, blp); + ::_encode_simple(mdr->more()->imported_client_map, *client_map_bl); + prepare_force_open_sessions(mdr->more()->imported_client_map); + + list updated_scatterlocks; // we clear_updated explicitly below + + mdcache->migrator->decode_import_inode(srcdn, blp, + srcdn->authority().first, + mdr->ls, + mdr->more()->cap_imports, updated_scatterlocks); + srcdn->inode->dirlock.clear_updated(); + + + // hack: force back to !auth, temporarily + srcdn->inode->state_clear(CInode::STATE_AUTH); + + } mdr->more()->pvmap[destdn] = destdn->pre_dirty(siv+1); } metablob->add_primary_dentry(destdn, true, srcdn->inode); @@ -3223,16 +3257,16 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen // srcdn inode import? if (!srcdn->is_auth() && destdn->is_auth()) { assert(mdr->more()->inode_import.length() > 0); - bufferlist::iterator blp = mdr->more()->inode_import.begin(); - map imported_client_map; - list updated_scatterlocks; // we clear_updated explicitly below - ::_decode_simple(imported_client_map, blp); - force_open_sessions(imported_client_map); - mdcache->migrator->decode_import_inode(destdn, blp, - srcdn->authority().first, - mdr->ls, - updated_scatterlocks); - destdn->inode->dirlock.clear_updated(); + assert(destdn->inode->is_dirty()); + + // finish cap imports + finish_force_open_sessions(mdr->more()->imported_client_map); + if (mdr->more()->cap_imports.count(destdn->inode)) + mds->mdcache->migrator->finish_import_caps(destdn->inode, srcdn->authority().first, + mdr->more()->cap_imports[destdn->inode]); + + // hack: fix auth bit + destdn->inode->state_set(CInode::STATE_AUTH); } if (destdn->inode->is_auth()) destdn->inode->mark_dirty(mdr->more()->pvmap[destdn], mdr->ls); @@ -3397,7 +3431,8 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) } // commit case - _rename_prepare(mdr, &le->commit, srcdn, destdn, straydn); + bufferlist blah; + _rename_prepare(mdr, &le->commit, &blah, srcdn, destdn, straydn); mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn)); } else { @@ -3405,8 +3440,9 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) dout(10) << "not journaling, i'm not auth for anything, and srci isn't open" << dendl; // prepare anyway; this may twiddle dir_auth - EMetaBlob blah; - _rename_prepare(mdr, &blah, srcdn, destdn, straydn); + EMetaBlob blob; + bufferlist blah; + _rename_prepare(mdr, &blob, &blah, srcdn, destdn, straydn); _logged_slave_rename(mdr, srcdn, destdn, straydn); } } diff --git a/branches/sage/mds/mds/Server.h b/branches/sage/mds/mds/Server.h index 6eda7251c4025..d2252f33df7bc 100644 --- a/branches/sage/mds/mds/Server.h +++ b/branches/sage/mds/mds/Server.h @@ -22,6 +22,7 @@ class LogEvent; class C_MDS_rename_finish; class MDRequest; class EMetaBlob; +class EUpdate; class PVList; class MMDSSlaveRequest; @@ -56,7 +57,8 @@ public: void handle_client_session(class MClientSession *m); void _session_logged(entity_inst_t ci, bool open, version_t cmapv); - void force_open_sessions(map &cm); + void prepare_force_open_sessions(map &cm); + void finish_force_open_sessions(map &cm); void terminate_sessions(); void reconnect_clients(); void handle_client_reconnect(class MClientReconnect *m); @@ -167,7 +169,7 @@ public: void _rename_prepare_witness(MDRequest *mdr, int who, CDentry *srcdn, CDentry *destdn, CDentry *straydn); void _rename_prepare(MDRequest *mdr, - EMetaBlob *metablob, + EMetaBlob *metablob, bufferlist *client_map_bl, CDentry *srcdn, CDentry *destdn, CDentry *straydn); void _rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn); -- 2.39.5