From f286f14749e2aca998c78aaeecc64a8bf24c6007 Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 2 Oct 2007 00:16:59 +0000 Subject: [PATCH] migrator export now makes second pass over subtree to delay auth change, dirty->clean transition git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1878 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/mds/mds/AnchorClient.cc | 52 ++++--- branches/sage/mds/mds/AnchorClient.h | 20 ++- branches/sage/mds/mds/CDentry.cc | 2 +- branches/sage/mds/mds/CDentry.h | 14 +- branches/sage/mds/mds/CDir.cc | 2 +- branches/sage/mds/mds/CDir.h | 5 +- branches/sage/mds/mds/CInode.cc | 2 +- branches/sage/mds/mds/Locker.cc | 8 +- branches/sage/mds/mds/LogSegment.h | 5 +- branches/sage/mds/mds/MDCache.cc | 75 +++++----- branches/sage/mds/mds/MDCache.h | 8 +- branches/sage/mds/mds/Migrator.cc | 173 +++++++++++++----------- branches/sage/mds/mds/Migrator.h | 9 +- branches/sage/mds/mds/Server.cc | 26 ++-- branches/sage/mds/mds/journal.cc | 24 +++- branches/sage/mds/mds/mdstypes.h | 3 +- branches/sage/mds/messages/MExportDir.h | 3 + 17 files changed, 240 insertions(+), 191 deletions(-) diff --git a/branches/sage/mds/mds/AnchorClient.cc b/branches/sage/mds/mds/AnchorClient.cc index 3ae9db25ffd2e..b2fb1fb50d7bd 100644 --- a/branches/sage/mds/mds/AnchorClient.cc +++ b/branches/sage/mds/mds/AnchorClient.cc @@ -25,6 +25,7 @@ using std::cerr; #include "MDS.h" #include "MDLog.h" +#include "LogSegment.h" #include "events/EAnchorClient.h" #include "messages/MAnchor.h" @@ -79,8 +80,6 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) *pending_create_prepare[ino].patid = atid; pending_create_prepare.erase(ino); - pending_commit.insert(atid); - if (onfinish) { onfinish->finish(0); delete onfinish; @@ -115,8 +114,6 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) *pending_destroy_prepare[ino].patid = atid; pending_destroy_prepare.erase(ino); - pending_commit.insert(atid); - if (onfinish) { onfinish->finish(0); delete onfinish; @@ -151,8 +148,6 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) *pending_update_prepare[ino].patid = atid; pending_update_prepare.erase(ino); - pending_commit.insert(atid); - if (onfinish) { onfinish->finish(0); delete onfinish; @@ -187,17 +182,11 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) // remove from committing list assert(pending_commit.count(atid)); - pending_commit.erase(atid); - + assert(pending_commit[atid]->pending_commit_atids.count(atid)); + // log ACK. - mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid)); - - // kick any waiters - if (ack_waiters.count(atid)) { - dout(15) << "kicking waiters on atid " << atid << dendl; - mds->queue_waiters(ack_waiters[atid]); - ack_waiters.erase(atid); - } + mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid), + new C_LoggedAck(this, atid)); } break; @@ -209,6 +198,24 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) } +void AnchorClient::_logged_ack(version_t atid) +{ + dout(10) << "_logged_ack" << dendl; + + assert(pending_commit.count(atid)); + assert(pending_commit[atid]->pending_commit_atids.count(atid)); + + pending_commit[atid]->pending_commit_atids.erase(atid); + pending_commit.erase(atid); + + // kick any waiters (LogSegment trim) + if (ack_waiters.count(atid)) { + dout(15) << "kicking ack waiters on atid " << atid << dendl; + mds->queue_waiters(ack_waiters[atid]); + ack_waiters.erase(atid); + } +} + /* * public async interface @@ -291,12 +298,13 @@ void AnchorClient::prepare_update(inodeno_t ino, vector& trace, // COMMIT -void AnchorClient::commit(version_t atid) +void AnchorClient::commit(version_t atid, LogSegment *ls) { dout(10) << "commit " << atid << dendl; - assert(pending_commit.count(atid)); - pending_commit.insert(atid); + assert(pending_commit.count(atid) == 0); + pending_commit[atid] = ls; + ls->pending_commit_atids.insert(atid); // send message MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid); @@ -318,11 +326,11 @@ void AnchorClient::finish_recovery() void AnchorClient::resend_commits() { - for (set::iterator p = pending_commit.begin(); + for (map::iterator p = pending_commit.begin(); p != pending_commit.end(); ++p) { - dout(10) << "resending commit on " << *p << dendl; - MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, *p); + dout(10) << "resending commit on " << p->first << dendl; + MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first); mds->send_message_mds(req, mds->mdsmap->get_anchortable(), MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); diff --git a/branches/sage/mds/mds/AnchorClient.h b/branches/sage/mds/mds/AnchorClient.h index 6ec5603b0bc7e..fd790f39c399d 100644 --- a/branches/sage/mds/mds/AnchorClient.h +++ b/branches/sage/mds/mds/AnchorClient.h @@ -27,6 +27,7 @@ using __gnu_cxx::hash_map; class Context; class MDS; +class LogSegment; class AnchorClient : public Dispatcher { MDS *mds; @@ -49,11 +50,22 @@ class AnchorClient : public Dispatcher { hash_map pending_update_prepare; // pending commits - set pending_commit; + map pending_commit; map > ack_waiters; void handle_anchor_reply(class MAnchor *m); + class C_LoggedAck : public Context { + AnchorClient *ac; + version_t atid; + public: + C_LoggedAck(AnchorClient *a, version_t t) : ac(a), atid(t) {} + void finish(int r) { + ac->_logged_ack(atid); + } + }; + void _logged_ack(version_t atid); + public: AnchorClient(MDS *m) : mds(m) {} @@ -66,7 +78,7 @@ public: void prepare_destroy(inodeno_t ino, version_t *atid, Context *onfinish); void prepare_update(inodeno_t ino, vector& trace, version_t *atid, Context *onfinish); - void commit(version_t atid); + void commit(version_t atid, LogSegment *ls); // for recovery (by other nodes) void handle_mds_recovery(int mds); // called when someone else recovers @@ -75,8 +87,8 @@ public: void resend_prepares(hash_map& prepares, int op); // for recovery (by me) - void got_journaled_agree(version_t atid) { - pending_commit.insert(atid); + void got_journaled_agree(version_t atid, LogSegment *ls) { + pending_commit[atid] = ls; } void got_journaled_ack(version_t atid) { pending_commit.erase(atid); diff --git a/branches/sage/mds/mds/CDentry.cc b/branches/sage/mds/mds/CDentry.cc index 1297acb869573..2b6bb3470e8a8 100644 --- a/branches/sage/mds/mds/CDentry.cc +++ b/branches/sage/mds/mds/CDentry.cc @@ -120,7 +120,7 @@ pair CDentry::authority() void CDentry::add_waiter(int tag, Context *c) { // wait on the directory? - if (tag & (WAIT_AUTHPINNABLE|WAIT_SINGLEAUTH)) { + if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) { dir->add_waiter(tag, c); return; } diff --git a/branches/sage/mds/mds/CDentry.h b/branches/sage/mds/mds/CDentry.h index a96ff54d590e7..7991ab9447d7e 100644 --- a/branches/sage/mds/mds/CDentry.h +++ b/branches/sage/mds/mds/CDentry.h @@ -211,13 +211,14 @@ public: // -- exporting // note: this assumes the dentry already exists. // i.e., the name is already extracted... so we just need the other state. - void encode_export_state(bufferlist& bl) { - bl.append((char*)&state, sizeof(state)); - bl.append((char*)&version, sizeof(version)); - bl.append((char*)&projected_version, sizeof(projected_version)); + void encode_export(bufferlist& bl) { + ::_encode_simple(state, bl); + ::_encode_simple(version, bl); + ::_encode_simple(projected_version, bl); lock._encode(bl); - ::_encode(replica_map, bl); - + ::_encode_simple(replica_map, bl); + } + void finish_export() { // twiddle clear_replica_map(); replica_nonce = EXPORT_NONCE; @@ -225,6 +226,7 @@ public: if (is_dirty()) mark_clean(); } + void decode_import_state(bufferlist& bl, int& off, int from, int to, LogSegment *ls) { int nstate; bl.copy(off, sizeof(nstate), (char*)&nstate); diff --git a/branches/sage/mds/mds/CDir.cc b/branches/sage/mds/mds/CDir.cc index 6a3505148e504..d01a0b89889e4 100644 --- a/branches/sage/mds/mds/CDir.cc +++ b/branches/sage/mds/mds/CDir.cc @@ -794,7 +794,7 @@ void CDir::fetch(Context *c, bool ignore_authpinnability) if (!can_auth_pin() && !ignore_authpinnability) { dout(7) << "fetch waiting for authpinnable" << dendl; - add_waiter(WAIT_AUTHPINNABLE, c); + add_waiter(WAIT_UNFREEZE, c); return; } diff --git a/branches/sage/mds/mds/CDir.h b/branches/sage/mds/mds/CDir.h index 1f14590741cea..f8ba1b29ce9d7 100644 --- a/branches/sage/mds/mds/CDir.h +++ b/branches/sage/mds/mds/CDir.h @@ -143,7 +143,7 @@ class CDir : public MDSCacheObject { static const int WAIT_DNLOCK_OFFSET = 4; static const int WAIT_ANY = (0xffffffff); - static const int WAIT_ATFREEZEROOT = (WAIT_AUTHPINNABLE|WAIT_UNFREEZE); + static const int WAIT_ATFREEZEROOT = (WAIT_UNFREEZE); static const int WAIT_ATSUBTREEROOT = (WAIT_SINGLEAUTH); @@ -550,10 +550,11 @@ class CDirExport { st.pop_me = dir->pop_me; st.pop_auth_subtree = dir->pop_auth_subtree; + /* dir->pop_auth_subtree_nested -= dir->pop_auth_subtree; dir->pop_me.zero(now); dir->pop_auth_subtree.zero(now); - + */ rep_by = dir->dir_rep_by; replicas = dir->replica_map; } diff --git a/branches/sage/mds/mds/CInode.cc b/branches/sage/mds/mds/CInode.cc index 3d4c219cc4be7..26f0cb3ea570d 100644 --- a/branches/sage/mds/mds/CInode.cc +++ b/branches/sage/mds/mds/CInode.cc @@ -604,7 +604,7 @@ bool CInode::is_freezing() void CInode::add_waiter(int tag, Context *c) { // wait on the directory? - if (tag & (WAIT_AUTHPINNABLE|WAIT_SINGLEAUTH)) { + if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) { parent->dir->add_waiter(tag, c); return; } diff --git a/branches/sage/mds/mds/Locker.cc b/branches/sage/mds/mds/Locker.cc index 25195443a94b7..f0a29ec51e1c0 100644 --- a/branches/sage/mds/mds/Locker.cc +++ b/branches/sage/mds/mds/Locker.cc @@ -202,7 +202,7 @@ bool Locker::acquire_locks(MDRequest *mdr, if (!object->can_auth_pin()) { // wait dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl; - object->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr)); + object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); mds->locker->drop_locks(mdr); mdr->drop_local_auth_pins(); return false; @@ -979,7 +979,7 @@ void Locker::try_simple_eval(SimpleLock *lock) if (!lock->get_parent()->can_auth_pin()) { dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl; //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) - lock->get_parent()->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_Locker_SimpleEval(this, lock)); + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock)); return; } @@ -1437,7 +1437,7 @@ void Locker::try_scatter_eval(ScatterLock *lock) if (!lock->get_parent()->can_auth_pin()) { dout(7) << "try_scatter_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl; //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) - lock->get_parent()->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_Locker_ScatterEval(this, lock)); + lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_ScatterEval(this, lock)); return; } @@ -2162,7 +2162,7 @@ void Locker::try_file_eval(FileLock *lock) if (!lock->get_parent()->can_auth_pin()) { dout(7) << "try_file_eval can't auth_pin, waiting on " << *in << dendl; //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH)) - in->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_Locker_FileEval(this, lock)); + in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_FileEval(this, lock)); return; } diff --git a/branches/sage/mds/mds/LogSegment.h b/branches/sage/mds/mds/LogSegment.h index 832ac236af1fa..58511c66db904 100644 --- a/branches/sage/mds/mds/LogSegment.h +++ b/branches/sage/mds/mds/LogSegment.h @@ -19,6 +19,9 @@ #include "include/interval_set.h" #include "include/Context.h" +#include +using __gnu_cxx::hash_set; + class CDir; class CInode; class CDentry; @@ -41,7 +44,7 @@ class LogSegment { map > purging_inodes; // committed anchor transactions - interval_set atids; + hash_set pending_commit_atids; // client request ids map last_client_tids; diff --git a/branches/sage/mds/mds/MDCache.cc b/branches/sage/mds/mds/MDCache.cc index a075fa7b3be3e..ac7fc868df305 100644 --- a/branches/sage/mds/mds/MDCache.cc +++ b/branches/sage/mds/mds/MDCache.cc @@ -4477,7 +4477,7 @@ void MDCache::anchor_create(MDRequest *mdr, CInode *in, Context *onfinish) if (!in->can_auth_pin() && !mdr->is_auth_pinned(in)) { dout(7) << "anchor_create not authpinnable, waiting on " << *in << dendl; - in->add_waiter(CInode::WAIT_AUTHPINNABLE, onfinish); + in->add_waiter(CInode::WAIT_UNFREEZE, onfinish); return; } @@ -4510,13 +4510,12 @@ class C_MDC_AnchorCreateLogged : public Context { MDCache *cache; CInode *in; version_t atid; - version_t pdv; LogSegment *ls; public: - C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, version_t v, LogSegment *s) : - cache(c), in(i), atid(t), pdv(v), ls(s) {} + C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, LogSegment *s) : + cache(c), in(i), atid(t), ls(s) {} void finish(int r) { - cache->_anchor_create_logged(in, atid, pdv, ls); + cache->_anchor_create_logged(in, atid, ls); } }; @@ -4525,29 +4524,24 @@ void MDCache::_anchor_create_prepared(CInode *in, version_t atid) dout(10) << "_anchor_create_prepared " << *in << " atid " << atid << dendl; assert(in->inode.anchored == false); - // predirty, prepare log entry - version_t pdv = in->pre_dirty(); - - EUpdate *le = new EUpdate(mds->mdlog, "anchor_create"); - le->metablob.add_dir_context(in->get_parent_dir()); - // update the logged inode copy - inode_t *pi = le->metablob.add_dentry(in->parent, true); + inode_t *pi = in->project_inode(); pi->anchored = true; - pi->version = pdv; + pi->version = in->pre_dirty(); // note anchor transaction + EUpdate *le = new EUpdate(mds->mdlog, "anchor_create"); + le->metablob.add_dir_context(in->get_parent_dir()); + le->metablob.add_primary_dentry(in->parent, true, 0, pi); le->metablob.add_anchor_transaction(atid); - - // log + wait - mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, pdv, + mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, mds->mdlog->get_current_segment())); } -void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls) +void MDCache::_anchor_create_logged(CInode *in, version_t atid, LogSegment *ls) { - dout(10) << "_anchor_create_logged pdv " << pdv << " on " << *in << dendl; + dout(10) << "_anchor_create_logged on " << *in << dendl; // unpin assert(in->state_test(CInode::STATE_ANCHORING)); @@ -4556,11 +4550,10 @@ void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv, L in->auth_unpin(); // apply update to cache - in->inode.anchored = true; - in->mark_dirty(pdv, ls); + in->pop_and_dirty_projected_inode(ls); // tell the anchortable we've committed - mds->anchorclient->commit(atid); + mds->anchorclient->commit(atid, ls); // trigger waiters in->finish_waiting(CInode::WAIT_ANCHORED, 0); @@ -4588,7 +4581,7 @@ void MDCache::anchor_destroy(CInode *in, Context *onfinish) if (!in->can_auth_pin()/* && !mdr->is_auth_pinned(in)*/) { dout(7) << "anchor_destroy not authpinnable, waiting on " << *in << dendl; - in->add_waiter(CInode::WAIT_AUTHPINNABLE, onfinish); + in->add_waiter(CInode::WAIT_UNFREEZE, onfinish); return; } @@ -4618,12 +4611,12 @@ class C_MDC_AnchorDestroyLogged : public Context { MDCache *cache; CInode *in; version_t atid; - version_t pdv; + LogSegment *ls; public: - C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, version_t v) : - cache(c), in(i), atid(t), pdv(v) {} + C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, LogSegment *l) : + cache(c), in(i), atid(t), ls(l) {} void finish(int r) { - cache->_anchor_destroy_logged(in, atid, pdv); + cache->_anchor_destroy_logged(in, atid, ls); } }; @@ -4633,28 +4626,23 @@ void MDCache::_anchor_destroy_prepared(CInode *in, version_t atid) assert(in->inode.anchored == true); - // predirty, prepare log entry - version_t pdv = in->pre_dirty(); - - EUpdate *le = new EUpdate(mds->mdlog, "anchor_destroy"); - le->metablob.add_dir_context(in->get_parent_dir()); - // update the logged inode copy - inode_t *pi = le->metablob.add_dentry(in->parent, true); + inode_t *pi = in->project_inode(); pi->anchored = true; - pi->version = pdv; - - // note anchor transaction - le->metablob.add_anchor_transaction(atid); + pi->version = in->pre_dirty(); // log + wait - mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, pdv)); + EUpdate *le = new EUpdate(mds->mdlog, "anchor_destroy"); + le->metablob.add_dir_context(in->get_parent_dir()); + le->metablob.add_primary_dentry(in->parent, true, 0, pi); + le->metablob.add_anchor_transaction(atid); + mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, mds->mdlog->get_current_segment())); } -void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, version_t pdv) +void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, LogSegment *ls) { - dout(10) << "_anchor_destroy_logged pdv " << pdv << " on " << *in << dendl; + dout(10) << "_anchor_destroy_logged on " << *in << dendl; // unpin assert(in->state_test(CInode::STATE_UNANCHORING)); @@ -4663,11 +4651,10 @@ void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, version_t pdv) in->auth_unpin(); // apply update to cache - in->inode.anchored = false; - in->inode.version = pdv; - + in->pop_and_dirty_projected_inode(ls); + // tell the anchortable we've committed - mds->anchorclient->commit(atid); + mds->anchorclient->commit(atid, ls); // trigger waiters in->finish_waiting(CInode::WAIT_UNANCHORED, 0); diff --git a/branches/sage/mds/mds/MDCache.h b/branches/sage/mds/mds/MDCache.h index 3db01bb6a0ea3..8f9b0e9ac2e7d 100644 --- a/branches/sage/mds/mds/MDCache.h +++ b/branches/sage/mds/mds/MDCache.h @@ -126,6 +126,7 @@ struct MDRequest { version_t dst_reanchor_atid; // dst->stray bufferlist inode_import; version_t inode_import_v; + CInode *inode_export; // inode we're exporting, if any CDentry *srcdn; // srcdn, if auth, on slave // called when slave commits @@ -139,6 +140,7 @@ struct MDRequest { ls(0), done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), + inode_export(0), srcdn(0), slave_commit(0) { } MDRequest(metareqid_t ri, MClientRequest *req) : reqid(ri), client_request(req), ref(0), @@ -146,6 +148,7 @@ struct MDRequest { ls(0), done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), + inode_export(0), srcdn(0), slave_commit(0) { } MDRequest(metareqid_t ri, int by) : reqid(ri), client_request(0), ref(0), @@ -153,6 +156,7 @@ struct MDRequest { ls(0), done_locking(false), committing(false), aborted(false), src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0), + inode_export(0), srcdn(0), slave_commit(0) { } bool is_master() { return slave_to_mds < 0; } @@ -558,9 +562,9 @@ public: void anchor_destroy(CInode *in, Context *onfinish); protected: void _anchor_create_prepared(CInode *in, version_t atid); - void _anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls); + void _anchor_create_logged(CInode *in, version_t atid, LogSegment *ls); void _anchor_destroy_prepared(CInode *in, version_t atid); - void _anchor_destroy_logged(CInode *in, version_t atid, version_t pdv); + void _anchor_destroy_logged(CInode *in, version_t atid, LogSegment *ls); friend class C_MDC_AnchorCreatePrepared; friend class C_MDC_AnchorCreateLogged; diff --git a/branches/sage/mds/mds/Migrator.cc b/branches/sage/mds/mds/Migrator.cc index 3852784974a25..7819bec5f3a67 100644 --- a/branches/sage/mds/mds/Migrator.cc +++ b/branches/sage/mds/mds/Migrator.cc @@ -775,7 +775,6 @@ void Migrator::export_go(CDir *dir) export_warning_ack_waiting.erase(dir); export_state[dir] = EXPORT_EXPORTING; - assert(export_data.count(dir) == 0); assert(dir->get_cum_auth_pins() == 0); // set ambiguous auth @@ -786,22 +785,19 @@ void Migrator::export_go(CDir *dir) // fill export message with cache data utime_t now = g_clock.now(); - C_Contexts *fin = new C_Contexts; // collect all the waiters map exported_client_map; - int num_exported_inodes = encode_export_dir( export_data[dir], - fin, - dir, // base + list export_data; + int num_exported_inodes = encode_export_dir( export_data, dir, // recur start point - dest, exported_client_map, now ); bufferlist bl; ::_encode(exported_client_map, bl); - export_data[dir].push_front(bl); + export_data.push_front(bl); // send the export data! MExportDir *req = new MExportDir(dir->dirfrag()); - req->set_dirstate(export_data[dir]); + req->take_dirstate(export_data); // add bounds to message set bounds; @@ -811,12 +807,9 @@ void Migrator::export_go(CDir *dir) ++p) req->add_export((*p)->dirfrag()); - //s end + // send mds->send_message_mds(req, dest, MDS_PORT_MIGRATOR); - // queue up the finisher - dir->add_waiter( CDir::WAIT_UNFREEZE, fin ); - // stats if (mds->logger) mds->logger->inc("ex"); if (mds->logger) mds->logger->inc("iex", num_exported_inodes); @@ -830,22 +823,39 @@ void Migrator::export_go(CDir *dir) * encode relevant state to be sent over the wire. * used by: encode_export_dir, file_rename (if foreign) */ -void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_auth, +void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, map& exported_client_map, utime_t now) { + dout(7) << "encode_export_inode " << *in << dendl; + assert(!in->is_replica(mds->get_nodeid())); + + CInodeExport istate(in, now); + istate._encode(enc_state); + + // make note of clients named by exported capabilities + for (map::iterator it = in->client_caps.begin(); + it != in->client_caps.end(); + it++) + exported_client_map[it->first] = mds->clientmap.get_inst(it->first); +} + +void Migrator::finish_export_inode(CInode *in, C_Contexts *fin) +{ + dout(12) << "finish_export_inode " << *in << dendl; + // tell (all) clients about migrating caps.. mark STALE for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); it++) { - dout(7) << "encode_export_inode " << *in << " telling client" << it->first << " stale caps" << dendl; + dout(7) << "finish_export_inode telling client" << it->first + << " stale caps on " << *in << dendl; MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_STALE, in->inode, it->second.get_last_seq(), it->second.pending(), it->second.wanted()); entity_inst_t inst = mds->clientmap.get_inst(it->first); - exported_client_map[it->first] = inst; mds->send_message_client_maybe_open(m, inst); } @@ -853,14 +863,7 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_au if (!in->is_replicated()) in->replicate_relax_locks(); - // add inode - assert(!in->is_replica(mds->get_nodeid())); - CInodeExport istate(in, now); - istate._encode( enc_state ); - - // we're export this inode; fix inode state - dout(7) << "encode_export_inode " << *in << dendl; - + // clean if (in->is_dirty()) in->mark_clean(); // clear/unpin cached_by (we're no longer the authority) @@ -878,19 +881,21 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_au in->state_clear(CInode::STATE_AUTH); in->replica_nonce = CInode::EXPORT_NONCE; + // waiters + list waiters; + in->take_waiting(CInode::WAIT_ANY, waiters); + fin->take(waiters); + // *** other state too? // move to end of LRU so we drop out of cache quickly! if (in->get_parent_dn()) cache->lru.lru_bottouch(in->get_parent_dn()); -} +} int Migrator::encode_export_dir(list& dirstatelist, - C_Contexts *fin, - CDir *basedir, CDir *dir, - int newauth, map& exported_client_map, utime_t now) { @@ -902,32 +907,11 @@ int Migrator::encode_export_dir(list& dirstatelist, // dir bufferlist enc_dir; - CDirExport dstate(dir, now); dstate._encode( enc_dir ); - // release open_by - dir->clear_replica_map(); - - // mark - assert(dir->is_auth()); - dir->state_clear(CDir::STATE_AUTH); - dir->replica_nonce = CDir::NONCE_EXPORT; - - list subdirs; - - if (dir->is_dirty()) - dir->mark_clean(); - - // discard most dir state - dir->state &= CDir::MASK_STATE_EXPORT_KEPT; // i only retain a few things. - - // suck up all waiters - list waiting; - dir->take_waiting(CDir::WAIT_ANY, waiting); // all dir waiters - fin->take(waiting); - // dentries + list subdirs; CDir::map_t::iterator it; for (it = dir->begin(); it != dir->end(); it++) { CDentry *dn = it->second; @@ -938,11 +922,11 @@ int Migrator::encode_export_dir(list& dirstatelist, // -- dentry dout(7) << "encode_export_dir exporting " << *dn << dendl; - // name + // dn name ::_encode(it->first, enc_dir); // state - it->second->encode_export_state(enc_dir); + dn->encode_export(enc_dir); // points to... @@ -967,7 +951,7 @@ int Migrator::encode_export_dir(list& dirstatelist, // -- inode enc_dir.append("I", 1); // inode dentry - encode_export_inode(in, enc_dir, newauth, exported_client_map, now); // encode, and (update state for) export + encode_export_inode(in, enc_dir, exported_client_map, now); // encode, and (update state for) export // directory? list dfs; @@ -980,11 +964,6 @@ int Migrator::encode_export_dir(list& dirstatelist, subdirs.push_back(dir); // it's ours, recurse (later) } } - - // waiters - list waiters; - in->take_waiting(CInode::WAIT_ANY, waiters); - fin->take(waiters); } // add to dirstatelist @@ -994,12 +973,62 @@ int Migrator::encode_export_dir(list& dirstatelist, // subdirs for (list::iterator it = subdirs.begin(); it != subdirs.end(); it++) - num_exported += encode_export_dir(dirstatelist, fin, basedir, *it, newauth, - exported_client_map, now); + num_exported += encode_export_dir(dirstatelist, *it, exported_client_map, now); return num_exported; } +void Migrator::finish_export_dir(CDir *dir, C_Contexts *fin, utime_t now) +{ + dout(10) << "finish_export_dir " << *dir << dendl; + + // release open_by + dir->clear_replica_map(); + + // mark + assert(dir->is_auth()); + dir->state_clear(CDir::STATE_AUTH); + dir->replica_nonce = CDir::NONCE_EXPORT; + + if (dir->is_dirty()) + dir->mark_clean(); + + // discard most dir state + dir->state &= CDir::MASK_STATE_EXPORT_KEPT; // i only retain a few things. + + // suck up all waiters + list waiting; + dir->take_waiting(CDir::WAIT_ANY, waiting); // all dir waiters + fin->take(waiting); + + // pop + dir->pop_auth_subtree_nested -= dir->pop_auth_subtree; + dir->pop_me.zero(now); + dir->pop_auth_subtree.zero(now); + + // dentries + list subdirs; + CDir::map_t::iterator it; + for (it = dir->begin(); it != dir->end(); it++) { + CDentry *dn = it->second; + CInode *in = dn->get_inode(); + + // dentry + dn->finish_export(); + + // inode? + if (dn->is_primary()) { + finish_export_inode(in, fin); + + // subdirs? + in->get_nested_dirfrags(subdirs); + } + } + + // subdirs + for (list::iterator it = subdirs.begin(); it != subdirs.end(); it++) + finish_export_dir(*it, fin, now); +} class C_MDS_ExportFinishLogged : public Context { Migrator *migrator; @@ -1027,7 +1056,6 @@ void Migrator::handle_export_ack(MExportDirAck *m) export_warning_ack_waiting.erase(dir); export_state[dir] = EXPORT_LOGGINGFINISH; - export_data.erase(dir); set bounds; cache->get_subtree_bounds(dir, bounds); @@ -1055,6 +1083,8 @@ void Migrator::handle_export_ack(MExportDirAck *m) + + /* * this happens if hte dest failes after i send teh export data but before it is acked * that is, we don't know they safely received and logged it, so we reverse our changes @@ -1065,7 +1095,6 @@ void Migrator::export_reverse(CDir *dir) dout(7) << "export_reverse " << *dir << dendl; assert(export_state[dir] == EXPORT_EXPORTING); - assert(export_data.count(dir)); set bounds; cache->get_subtree_bounds(dir, bounds); @@ -1083,27 +1112,10 @@ void Migrator::export_reverse(CDir *dir) bd->state_clear(CDir::STATE_EXPORTBOUND); } - // re-import the metadata - map imported_client_map; - int off = 0; - ::_decode(imported_client_map, export_data[dir].front(), off); - export_data[dir].pop_front(); - - while (!export_data[dir].empty()) { - decode_import_dir(export_data[dir].front(), - export_peer[dir], - dir, // import root - 0, - imported_client_map, - 0); - export_data[dir].pop_front(); - } - // process delayed expires cache->process_delayed_expire(dir); // some clean up - export_data.erase(dir); export_warning_ack_waiting.erase(dir); export_notify_ack_waiting.erase(dir); @@ -1227,6 +1239,11 @@ void Migrator::export_finish(CDir *dir) dout(7) << "not sending MExportDirFinish, dest has failed" << dendl; } + // finish export (adjust local cache state) + C_Contexts *fin = new C_Contexts; + finish_export_dir(dir, fin, g_clock.now()); + dir->add_waiter(CDir::WAIT_UNFREEZE, fin); + // unfreeze dout(7) << "export_finish unfreezing" << dendl; dir->unfreeze_tree(); diff --git a/branches/sage/mds/mds/Migrator.h b/branches/sage/mds/mds/Migrator.h index f9336668dd7f7..ccfe2666d66ab 100644 --- a/branches/sage/mds/mds/Migrator.h +++ b/branches/sage/mds/mds/Migrator.h @@ -78,7 +78,7 @@ protected: // export fun map export_state; map export_peer; - map > export_data; // only during EXPORTING state + //map > export_data; // only during EXPORTING state map > export_warning_ack_waiting; map > export_notify_ack_waiting; @@ -183,16 +183,15 @@ public: void export_dir_nicely(CDir *dir, int dest); void maybe_do_queued_export(); - void encode_export_inode(CInode *in, bufferlist& enc_state, int newauth, + void encode_export_inode(CInode *in, bufferlist& enc_state, map& exported_client_map, utime_t now); + void finish_export_inode(CInode *in, C_Contexts *fin); int encode_export_dir(list& dirstatelist, - class C_Contexts *fin, - CDir *basedir, CDir *dir, - int newauth, map& exported_client_map, utime_t now); + void finish_export_dir(CDir *dir, class C_Contexts *fin, utime_t now); void add_export_finish_waiter(CDir *dir, Context *c) { export_finish_waiters[dir].push_back(c); diff --git a/branches/sage/mds/mds/Server.cc b/branches/sage/mds/mds/Server.cc index 0e062e3ac3764..066502b588096 100644 --- a/branches/sage/mds/mds/Server.cc +++ b/branches/sage/mds/mds/Server.cc @@ -821,7 +821,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr) !(*p)->can_auth_pin()) { // wait dout(10) << " waiting for authpinnable on " << **p << dendl; - (*p)->add_waiter(CDir::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr)); + (*p)->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); mdr->drop_local_auth_pins(); return; } @@ -1112,7 +1112,7 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, bool want_auth) if (want_auth) { if (ref->is_frozen()) { dout(7) << "waiting for !frozen/authpinnable on " << *ref << dendl; - ref->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr)); + ref->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); return 0; } mdr->auth_pin(ref); @@ -1156,7 +1156,7 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mus // make sure we can auth_pin (or have already authpinned) dir if (dir->is_frozen()) { dout(7) << "waiting for !frozen/authpinnable on " << *dir << dendl; - dir->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr)); + dir->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr)); return 0; } @@ -2449,7 +2449,7 @@ void Server::_unlink_local_finish(MDRequest *mdr, // commit anchor update? if (mdr->dst_reanchor_atid) - mds->anchorclient->commit(mdr->dst_reanchor_atid); + mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls); // bump pop //mds->balancer->hit_dir(mdr->now, dn->dir, META_POP_DWR); @@ -2555,7 +2555,7 @@ void Server::_unlink_remote_finish(MDRequest *mdr, // commit anchor update? if (mdr->dst_reanchor_atid) - mds->anchorclient->commit(mdr->dst_reanchor_atid); + mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls); //mds->balancer->hit_dir(mdr->now, dn->dir, META_POP_DWR); @@ -2946,8 +2946,8 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe _rename_apply(mdr, srcdn, destdn, straydn); // commit anchor updates? - if (mdr->src_reanchor_atid) mds->anchorclient->commit(mdr->src_reanchor_atid); - if (mdr->dst_reanchor_atid) mds->anchorclient->commit(mdr->dst_reanchor_atid); + if (mdr->src_reanchor_atid) mds->anchorclient->commit(mdr->src_reanchor_atid, mdr->ls); + if (mdr->dst_reanchor_atid) mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls); // bump popularity //if (srcdn->is_auth()) @@ -3325,7 +3325,7 @@ void Server::_logged_slave_rename(MDRequest *mdr, // bump popularity //if (srcdn->is_auth()) //mds->balancer->hit_dir(mdr->now, srcdn->get_dir(), META_POP_DWR); - if (destdn->inode->is_auth()) + if (destdn->inode && destdn->inode->is_auth()) mds->balancer->hit_inode(mdr->now, destdn->inode, META_POP_IWR); // done. @@ -3344,6 +3344,12 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, // commit _rename_apply(mdr, srcdn, destdn, straydn); + if (mdr->inode_export) { + C_Contexts *fin = new C_Contexts; + mdcache->migrator->finish_export_inode(mdr->inode_export, fin); + mds->queue_waiter(fin); + } + // write a commit to the journal le = new ESlaveUpdate(mdlog, "slave_rename_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT); } else { @@ -3401,7 +3407,7 @@ void Server::handle_slave_rename_get_inode(MDRequest *mdr) map exported_client_map; bufferlist inodebl; - mdcache->migrator->encode_export_inode(mdr->srcdn->inode, inodebl, mdr->slave_to_mds, + mdcache->migrator->encode_export_inode(mdr->srcdn->inode, inodebl, exported_client_map, mdr->now); ::_encode(exported_client_map, reply->inode_export); @@ -3409,8 +3415,6 @@ void Server::handle_slave_rename_get_inode(MDRequest *mdr) reply->inode_export_v = mdr->srcdn->inode->inode.version; - mdr->inode_import = reply->inode_export; // keep a copy locally, in case we have to rollback - mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER); // clean up. diff --git a/branches/sage/mds/mds/journal.cc b/branches/sage/mds/mds/journal.cc index a43a8c179d7bb..7e0b9f0f9474c 100644 --- a/branches/sage/mds/mds/journal.cc +++ b/branches/sage/mds/mds/journal.cc @@ -91,7 +91,7 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) dir->commit(0, gather->new_sub()); } else { dout(10) << " waiting for unfreeze on " << *dir << dendl; - dir->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub()); + dir->add_waiter(CDir::WAIT_UNFREEZE, gather->new_sub()); } } } @@ -103,7 +103,16 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) (*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub()); } - // + // pending commit atids + for (hash_set::iterator p = pending_commit_atids.begin(); + p != pending_commit_atids.end(); + ++p) { + if (!gather) gather = new C_Gather; + assert(!mds->anchorclient->has_committed(*p)); + dout(10) << " anchor transaction " << *p + << " pending commit (not yet acked), waiting" << dendl; + mds->anchorclient->wait_for_ack(*p, gather->new_sub()); + } return gather; } @@ -345,7 +354,7 @@ void EMetaBlob::expire(MDS *mds, Context *c) else // pbly about to export|split|merge. // just wait for it to unfreeze, then retry - p->first->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub()); + p->first->add_waiter(CDir::WAIT_UNFREEZE, gather->new_sub()); } for (list::iterator p = waitfor_export.begin(); p != waitfor_export.end(); @@ -423,8 +432,9 @@ void EMetaBlob::expire(MDS *mds, Context *c) void EMetaBlob::update_segment(LogSegment *ls) { // atids? - for (list::iterator p = atids.begin(); p != atids.end(); ++p) - ls->atids.insert(*p); + //for (list::iterator p = atids.begin(); p != atids.end(); ++p) + // ls->pending_commit_atids[*p] = ls; + // -> handled directly by AnchorClient // dirty inode mtimes // -> handled directly by Server.cc, replay() @@ -434,7 +444,7 @@ void EMetaBlob::update_segment(LogSegment *ls) ls->allocv = alloc_tablev; // truncated inodes - // -> handled directory by Server.cc + // -> handled directly by Server.cc // client requests // note the newest request per client @@ -593,7 +603,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg) p != atids.end(); ++p) { dout(10) << "EMetaBlob.replay noting anchor transaction " << *p << dendl; - mds->anchorclient->got_journaled_agree(*p); + mds->anchorclient->got_journaled_agree(*p, logseg); } // dirtied inode mtimes diff --git a/branches/sage/mds/mds/mdstypes.h b/branches/sage/mds/mds/mdstypes.h index a0f70ff421e84..7b02ac38d8618 100644 --- a/branches/sage/mds/mds/mdstypes.h +++ b/branches/sage/mds/mds/mdstypes.h @@ -396,8 +396,7 @@ class MDSCacheObject { // -- wait -- const static int WAIT_SINGLEAUTH = (1<<30); - const static int WAIT_AUTHPINNABLE = (1<<29); - const static int WAIT_UNFREEZE = WAIT_AUTHPINNABLE; + const static int WAIT_UNFREEZE = (1<<29); // pka AUTHPINNABLE // ============================================ diff --git a/branches/sage/mds/messages/MExportDir.h b/branches/sage/mds/messages/MExportDir.h index 8fafbe0312636..f00a7fa2507d1 100644 --- a/branches/sage/mds/messages/MExportDir.h +++ b/branches/sage/mds/messages/MExportDir.h @@ -46,6 +46,9 @@ class MExportDir : public Message { void set_dirstate(const list& ls) { dirstate = ls; } + void take_dirstate(list& ls) { + dirstate.swap(ls); + } void add_export(dirfrag_t df) { bounds.push_back(df); } -- 2.39.5