return;
}
- C_Gather *gather = new C_Gather(new C_MDC_FragmentFrozen(this, dirs, frag, 0));
+ CDir *first = dirs.front();
+ int bits = first->get_frag().bits() - frag.bits();
+ dout(10) << " we are merginb by " << bits << " bits" << dendl;
+
+ C_Gather *gather = new C_Gather(new C_MDC_FragmentFrozen(this, dirs, frag, bits));
fragment_freeze_dirs(dirs, gather);
// initial mark+complete pass
}
}
-class C_MDC_FragmentStored : public Context {
+class C_MDC_FragmentLoggedAndStored : public Context {
MDCache *mdcache;
- list<CDir*> dirs;
+ Mutation *mut;
+ list<CDir*> resultfrags;
frag_t basefrag;
int bits;
public:
- C_MDC_FragmentStored(MDCache *m, list<CDir*>& d, frag_t bf, int b) :
- mdcache(m), dirs(d), basefrag(bf), bits(b) {}
+ C_MDC_FragmentLoggedAndStored(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) :
+ mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
virtual void finish(int r) {
- mdcache->fragment_stored(dirs, basefrag, bits);
+ mdcache->fragment_logged_and_stored(mut, resultfrags, basefrag, bits);
}
};
if (bits > 0) {
assert(dirs.size() == 1);
} else {
- assert(bits == 0);
+ assert(bits < 0);
}
dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
adjust_dir_fragments(diri, dirs, basefrag, bits, resultfrags, waiters, false);
mds->queue_waiters(waiters);
- C_Gather *gather = new C_Gather(new C_MDC_FragmentStored(this, resultfrags, basefrag, bits));
-
- // freeze, store resulting frags
- for (list<CDir*>::iterator p = resultfrags.begin();
- p != resultfrags.end();
- p++) {
- CDir *dir = *p;
- dout(10) << " result frag " << *dir << dendl;
- dir->state_set(CDir::STATE_FRAGMENTING);
- dir->commit(0, gather->new_sub(), true); // ignore authpinnability
- dir->_freeze_dir();
- }
-}
-
-class C_MDC_FragmentLogged : public Context {
- MDCache *mdcache;
- Mutation *mut;
- list<CDir*> resultfrags;
- frag_t basefrag;
- int bits;
-public:
- C_MDC_FragmentLogged(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) :
- mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
- virtual void finish(int r) {
- mdcache->fragment_logged(mut, resultfrags, basefrag, bits);
- }
-};
-
-void MDCache::fragment_stored(list<CDir*>& resultfrags, frag_t basefrag, int bits)
-{
- CInode *diri = resultfrags.front()->get_inode();
- dout(10) << "fragment_stored " << resultfrags << " " << basefrag << " by " << bits
- << " on " << *diri << dendl;
-
+ // journal
Mutation *mut = new Mutation;
mut->ls = mds->mdlog->get_current_segment();
- EFragment *le = new EFragment(mds->mdlog, diri->ino(), basefrag, bits);
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_PREPARE, diri->ino(), basefrag, bits);
mds->mdlog->start_entry(le);
le->metablob.add_dir_context(*resultfrags.begin());
mut->add_updated_lock(&diri->nestlock);
*/
- // journal new dirfrag fragstats for each new fragment.
+ // freeze, journal, and store resulting frags
+ C_Gather *gather = new C_Gather(new C_MDC_FragmentLoggedAndStored(this, mut, resultfrags, basefrag, bits));
+
for (list<CDir*>::iterator p = resultfrags.begin();
p != resultfrags.end();
p++) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
le->metablob.add_dir(dir, false);
+
+ // freeze and store them too
+ dir->state_set(CDir::STATE_FRAGMENTING);
+ dir->commit(0, gather->new_sub(), true); // ignore authpinnability
+ dir->_freeze_dir();
}
-
- mds->mdlog->submit_entry(le,
- new C_MDC_FragmentLogged(this, mut, resultfrags, basefrag, bits));
+
+ mds->mdlog->submit_entry(le, gather->new_sub());
mds->mdlog->flush();
}
-void MDCache::fragment_logged(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
+void MDCache::fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
{
CInode *diri = resultfrags.front()->get_inode();
- dout(10) << "fragment_logged " << resultfrags << " " << basefrag << " bits " << bits
+ dout(10) << "fragment_logged_and_stored " << resultfrags << " " << basefrag << " bits " << bits
<< " on " << *diri << dendl;
+ // journal commit
+ EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, diri->ino(), basefrag, bits);
+ mds->mdlog->start_entry(le);
+ mds->mdlog->submit_entry(le);
+
// tell peers
CDir *first = *resultfrags.begin();
for (map<int,int>::iterator p = first->replica_map.begin();
}
+void MDCache::rollback_uncommitted_fragments()
+{
+ dout(10) << "rollback_uncommitted_fragments: " << uncommitted_fragments.size() << " pending" << dendl;
+ for (set< pair<dirfrag_t,int> >::iterator p = uncommitted_fragments.begin();
+ p != uncommitted_fragments.end();
+ ++p) {
+ CInode *diri = get_inode(p->first.ino);
+ assert(diri);
+ dout(10) << " rolling back " << p->first << " refragment by " << p->second << " bits" << dendl;
+ list<CDir*> resultfrags;
+ list<Context*> waiters;
+ adjust_dir_fragments(diri, p->first.frag, -p->second, resultfrags, waiters, true);
+ }
+ uncommitted_fragments.clear();
+}
// -- fragmenting --
+public:
+ set< pair<dirfrag_t,int> > uncommitted_fragments; // prepared but uncommitted refragmentations
+
private:
void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
list<CDir*>& frags, list<Context*>& waiters, bool replay);
void fragment_mark_and_complete(list<CDir*>& dirs);
void fragment_frozen(list<CDir*>& dirs, frag_t basefrag, int bits);
void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
- void fragment_stored(list<CDir*>& resultfrags, frag_t basefrag, int bits);
- void fragment_logged(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits);
+ void fragment_logged_and_stored(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits);
+public:
+ void rollback_uncommitted_fragments();
+private:
+
friend class C_MDC_FragmentFrozen;
friend class C_MDC_FragmentMarking;
- friend class C_MDC_FragmentStored;
- friend class C_MDC_FragmentLogged;
+ friend class C_MDC_FragmentLoggedAndStored;
void handle_fragment_notify(MMDSFragmentNotify *m);
class EFragment : public LogEvent {
public:
EMetaBlob metablob;
+ __u8 op;
inodeno_t ino;
frag_t basefrag;
__s32 bits; // positive for split (from basefrag), negative for merge (to basefrag)
EFragment() : LogEvent(EVENT_FRAGMENT) { }
- EFragment(MDLog *mdlog, inodeno_t i, frag_t bf, int b) :
+ EFragment(MDLog *mdlog, int o, inodeno_t i, frag_t bf, int b) :
LogEvent(EVENT_FRAGMENT), metablob(mdlog),
- ino(i), basefrag(bf), bits(b) { }
+ op(o), ino(i), basefrag(bf), bits(b) { }
void print(ostream& out) {
- out << "EFragment " << ino << " " << basefrag << " by " << bits << " " << metablob;
+ out << "EFragment " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << " " << metablob;
+ }
+
+ enum {
+ OP_PREPARE = 1,
+ OP_COMMIT = 2,
+ OP_ROLLBACK = 3,
+ OP_ONESHOT = 4, // (legacy) PREPARE+COMMIT
+ };
+ const char *op_name(int o) {
+ switch (o) {
+ case OP_PREPARE: return "prepare";
+ case OP_COMMIT: return "commit";
+ case OP_ROLLBACK: return "rollback";
+ default: return "???";
+ }
}
void encode(bufferlist &bl) const {
- __u8 struct_v = 2;
+ __u8 struct_v = 3;
::encode(struct_v, bl);
::encode(stamp, bl);
+ ::encode(op, bl);
::encode(ino, bl);
::encode(basefrag, bl);
::encode(bits, bl);
::decode(struct_v, bl);
if (struct_v >= 2)
::decode(stamp, bl);
+ if (struct_v >= 3)
+ ::decode(op, bl);
+ else
+ op = OP_ONESHOT;
::decode(ino, bl);
::decode(basefrag, bl);
::decode(bits, bl);