From 17bc29350af8d191557b8c8c7c03870178b3ccb4 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 5 Jan 2011 15:31:06 -0800 Subject: [PATCH] mds: change refragment journaling/store strategy We had a serious problem before where we were updating the cache and redivvying up the dentries among fragments, but not immediately journaling it. This was okay only if we were lucky and no other update journaled something (e.g. some random child journaling its ancestors). Instead, journal (PREPARE) immediately and in parallel with the new dirfrag stores. When the stores complete, journal again (COMMIT). On journal replay, for any PREPAREs without matching COMMITS we immediately journal a ROLLBACK. Other behavior is essentially unchanged. We don't send the notify until both the PREPARE and STORES complete. But that part doesn't really matter: if we restart and rollback, peers will find out during resolve/rejoin, as before. Signed-off-by: Sage Weil --- src/mds/CDir.cc | 2 +- src/mds/MDCache.cc | 94 +++++++++++++++++++------------------- src/mds/MDCache.h | 13 ++++-- src/mds/MDS.cc | 15 ++++-- src/mds/MDS.h | 1 + src/mds/events/EFragment.h | 29 ++++++++++-- src/mds/journal.cc | 21 ++++++++- 7 files changed, 113 insertions(+), 62 deletions(-) diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc index d146c67b60e12..d3b0b88ff5a12 100644 --- a/src/mds/CDir.cc +++ b/src/mds/CDir.cc @@ -784,7 +784,7 @@ void CDir::merge(list& subs, list& waiters, bool replay) for (list::iterator p = subs.begin(); p != subs.end(); p++) { CDir *dir = *p; dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl; - assert(!dir->is_auth() || dir->is_complete()); + assert(!dir->is_auth() || dir->is_complete() || replay); // steal dentries while (!dir->items.empty()) diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 621a43a326cb3..fbecbd530302c 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -8947,7 +8947,11 @@ void MDCache::merge_dir(CInode *diri, frag_t frag) return; } - C_Gather *gather = new C_Gather(new C_MDC_FragmentFrozen(this, dirs, frag, 0)); + CDir *first = dirs.front(); + int bits = first->get_frag().bits() - frag.bits(); + dout(10) << " we are merginb by " << bits << " bits" << dendl; + + C_Gather *gather = new C_Gather(new C_MDC_FragmentFrozen(this, dirs, frag, bits)); fragment_freeze_dirs(dirs, gather); // initial mark+complete pass @@ -9039,16 +9043,17 @@ void MDCache::fragment_unmark_unfreeze_dirs(list& dirs) } } -class C_MDC_FragmentStored : public Context { +class C_MDC_FragmentLoggedAndStored : public Context { MDCache *mdcache; - list dirs; + Mutation *mut; + list resultfrags; frag_t basefrag; int bits; public: - C_MDC_FragmentStored(MDCache *m, list& d, frag_t bf, int b) : - mdcache(m), dirs(d), basefrag(bf), bits(b) {} + C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list& r, frag_t bf, int bi) : + mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {} virtual void finish(int r) { - mdcache->fragment_stored(dirs, basefrag, bits); + mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits); } }; @@ -9059,7 +9064,7 @@ void MDCache::fragment_frozen(list& dirs, frag_t basefrag, int bits) if (bits > 0) { assert(dirs.size() == 1); } else { - assert(bits == 0); + assert(bits < 0); } dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits @@ -9083,44 +9088,11 @@ void MDCache::fragment_frozen(list& dirs, frag_t basefrag, int bits) adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false); mds->queue_waiters(waiters); - C_Gather *gather = new C_Gather(new C_MDC_FragmentStored(this, resultfrags, basefrag, bits)); - - // freeze, store resulting frags - for (list::iterator p = resultfrags.begin(); - p != resultfrags.end(); - p++) { - CDir *dir = *p; - dout(10) << " result frag " << *dir << dendl; - dir->state_set(CDir::STATE_FRAGMENTING); - dir->commit(0, gather->new_sub(), true); // ignore authpinnability - dir->_freeze_dir(); - } -} - -class C_MDC_FragmentLogged : public Context { - MDCache *mdcache; - Mutation *mut; - list resultfrags; - frag_t basefrag; - int bits; -public: - C_MDC_FragmentLogged(MDCache *m, Mutation *mu, list& r, frag_t bf, int bi) : - mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {} - virtual void finish(int r) { - mdcache->fragment_logged(mut, resultfrags, basefrag, bits); - } -}; - -void MDCache::fragment_stored(list& resultfrags, frag_t basefrag, int bits) -{ - CInode *diri = resultfrags.front()->get_inode(); - dout(10) << "fragment_stored " << resultfrags << " " << basefrag << " by " << bits - << " on " << *diri << dendl; - + // journal Mutation *mut = new Mutation; mut->ls = mds->mdlog->get_current_segment(); - EFragment *le = new EFragment(mds->mdlog, diri->ino(), basefrag, bits); + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits); mds->mdlog->start_entry(le); le->metablob.add_dir_context(*resultfrags.begin()); @@ -9142,27 +9114,38 @@ void MDCache::fragment_stored(list& resultfrags, frag_t basefrag, int bit mut->add_updated_lock(&diri->nestlock); */ - // journal new dirfrag fragstats for each new fragment. + // freeze, journal, and store resulting frags + C_Gather *gather = new C_Gather(new C_MDC_FragmentLoggedAndStored(this, mut, resultfrags, basefrag, bits)); + for (list::iterator p = resultfrags.begin(); p != resultfrags.end(); p++) { CDir *dir = *p; dout(10) << " result frag " << *dir << dendl; le->metablob.add_dir(dir, false); + + // freeze and store them too + dir->state_set(CDir::STATE_FRAGMENTING); + dir->commit(0, gather->new_sub(), true); // ignore authpinnability + dir->_freeze_dir(); } - - mds->mdlog->submit_entry(le, - new C_MDC_FragmentLogged(this, mut, resultfrags, basefrag, bits)); + + mds->mdlog->submit_entry(le, gather->new_sub()); mds->mdlog->flush(); } -void MDCache::fragment_logged(Mutation *mut, list& resultfrags, frag_t basefrag, int bits) +void MDCache::fragment_logged_and_stored(Mutation *mut, list& resultfrags, frag_t basefrag, int bits) { CInode *diri = resultfrags.front()->get_inode(); - dout(10) << "fragment_logged " << resultfrags << " " << basefrag << " bits " << bits + dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits << " on " << *diri << dendl; + // journal commit + EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits); + mds->mdlog->start_entry(le); + mds->mdlog->submit_entry(le); + // tell peers CDir *first = *resultfrags.begin(); for (map::iterator p = first->replica_map.begin(); @@ -9240,6 +9223,21 @@ void MDCache::handle_fragment_notify(MMDSFragmentNotify *notify) } +void MDCache::rollback_uncommitted_fragments() +{ + dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl; + for (set< pair >::iterator p = uncommitted_fragments.begin(); + p != uncommitted_fragments.end(); + ++p) { + CInode *diri = get_inode(p->first.ino); + assert(diri); + dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl; + list resultfrags; + list waiters; + adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true); + } + uncommitted_fragments.clear(); +} diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 6c2fc069c0bd9..12d122bffd04d 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -1096,6 +1096,9 @@ protected: // -- fragmenting -- +public: + set< pair > uncommitted_fragments; // prepared but uncommitted refragmentations + private: void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits, list& frags, list& waiters, bool replay); @@ -1123,12 +1126,14 @@ private: void fragment_mark_and_complete(list& dirs); void fragment_frozen(list& dirs, frag_t basefrag, int bits); void fragment_unmark_unfreeze_dirs(list& dirs); - void fragment_stored(list& resultfrags, frag_t basefrag, int bits); - void fragment_logged(Mutation *mut, list& resultfrags, frag_t basefrag, int bits); + void fragment_logged_and_stored(Mutation *mut, list& resultfrags, frag_t basefrag, int bits); +public: + void rollback_uncommitted_fragments(); +private: + friend class C_MDC_FragmentFrozen; friend class C_MDC_FragmentMarking; - friend class C_MDC_FragmentStored; - friend class C_MDC_FragmentLogged; + friend class C_MDC_FragmentLoggedAndStored; void handle_fragment_notify(MMDSFragmentNotify *m); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index d795ea504dac8..6a4d4334ad140 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -1225,13 +1225,22 @@ void MDS::replay_done() } } +void MDS::reopen_log() +{ + dout(1) << "reopen_log" << dendl; + + // start new segment + mdlog->start_new_segment(0); + + mdcache->rollback_uncommitted_fragments(); +} + void MDS::resolve_start() { dout(1) << "resolve_start" << dendl; - // start new segment - mdlog->start_new_segment(0); + reopen_log(); mdcache->resolve_start(); } @@ -1246,7 +1255,7 @@ void MDS::reconnect_start() dout(1) << "reconnect_start" << dendl; if (last_state == MDSMap::STATE_REPLAY) - mdlog->start_new_segment(0); + reopen_log(); server->reconnect_clients(); finish_contexts(waiting_for_reconnect); diff --git a/src/mds/MDS.h b/src/mds/MDS.h index 8b982cf7202f5..a8609a8ece58f 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -353,6 +353,7 @@ class MDS : public Dispatcher { void creating_done(); void starting_done(); void replay_done(); + void reopen_log(); void resolve_start(); void resolve_done(); diff --git a/src/mds/events/EFragment.h b/src/mds/events/EFragment.h index 72adf0063f724..3c9a93b549d27 100644 --- a/src/mds/events/EFragment.h +++ b/src/mds/events/EFragment.h @@ -21,22 +21,39 @@ class EFragment : public LogEvent { public: EMetaBlob metablob; + __u8 op; inodeno_t ino; frag_t basefrag; __s32 bits; // positive for split (from basefrag), negative for merge (to basefrag) EFragment() : LogEvent(EVENT_FRAGMENT) { } - EFragment(MDLog *mdlog, inodeno_t i, frag_t bf, int b) : + EFragment(MDLog *mdlog, int o, inodeno_t i, frag_t bf, int b) : LogEvent(EVENT_FRAGMENT), metablob(mdlog), - ino(i), basefrag(bf), bits(b) { } + op(o), ino(i), basefrag(bf), bits(b) { } void print(ostream& out) { - out << "EFragment " << ino << " " << basefrag << " by " << bits << " " << metablob; + out << "EFragment " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << " " << metablob; + } + + enum { + OP_PREPARE = 1, + OP_COMMIT = 2, + OP_ROLLBACK = 3, + OP_ONESHOT = 4, // (legacy) PREPARE+COMMIT + }; + const char *op_name(int o) { + switch (o) { + case OP_PREPARE: return "prepare"; + case OP_COMMIT: return "commit"; + case OP_ROLLBACK: return "rollback"; + default: return "???"; + } } void encode(bufferlist &bl) const { - __u8 struct_v = 2; + __u8 struct_v = 3; ::encode(struct_v, bl); ::encode(stamp, bl); + ::encode(op, bl); ::encode(ino, bl); ::encode(basefrag, bl); ::encode(bits, bl); @@ -47,6 +64,10 @@ public: ::decode(struct_v, bl); if (struct_v >= 2) ::decode(stamp, bl); + if (struct_v >= 3) + ::decode(op, bl); + else + op = OP_ONESHOT; ::decode(ino, bl); ::decode(basefrag, bl); ::decode(bits, bl); diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 3cb188bd1c4ae..62228a1b1cd68 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -983,12 +983,29 @@ void ESubtreeMap::replay(MDS *mds) void EFragment::replay(MDS *mds) { - dout(10) << "EFragment.replay " << ino << " " << basefrag << " by " << bits << dendl; + dout(10) << "EFragment.replay " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << dendl; CInode *in = mds->mdcache->get_inode(ino); if (in) { list resultfrags; list waiters; - mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true); + + pair desc(dirfrag_t(ino,basefrag), bits); + + switch (op) { + case OP_PREPARE: + mds->mdcache->uncommitted_fragments.insert(desc); + case OP_ONESHOT: + mds->mdcache->adjust_dir_fragments(in, basefrag, bits, resultfrags, waiters, true); + break; + + case OP_COMMIT: + mds->mdcache->uncommitted_fragments.erase(desc); + break; + + case OP_ROLLBACK: + mds->mdcache->adjust_dir_fragments(in, basefrag, -bits, resultfrags, waiters, true); + break; + } } metablob.replay(mds, _segment); } -- 2.39.5