int n = 0;
for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
CDir *f = new CDir(inode, *p, cache, is_auth());
- f->state_set(state & MASK_STATE_FRAGMENT_KEPT);
+ f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
f->replica_map = replica_map;
f->dir_auth = dir_auth;
f->init_fragment_pins();
inode->close_dirfrag(dir->get_frag());
}
+ if (is_auth() && !replay)
+ mark_complete();
+
// FIXME: merge dirty old rstat
fnode.rstat.version = rstat_version;
fnode.accounted_rstat = fnode.rstat;
|STATE_FROZENDIR
|STATE_STICKY);
static const unsigned MASK_STATE_FRAGMENT_KEPT =
- (STATE_DIRTY |
- STATE_COMPLETE |
+ (STATE_DIRTY|
STATE_EXPORTBOUND |
STATE_IMPORTBOUND);
uf.bits = bits;
uf.ls = ls;
ls->uncommitted_fragments.insert(basedirfrag);
- if (rollback)
+ if (rollback) {
uf.rollback.swap(*rollback);
+ // preserve COMPLETE flag for newly created dirfrag
+ if (bits > 0 && basedirfrag.frag == frag_t()) {
+ CDir *dir = get_dirfrag(basedirfrag);
+ if (dir && dir->is_complete())
+ uf.complete = true;
+ }
+ }
}
void MDCache::finish_uncommitted_fragment(dirfrag_t basedirfrag, int op)
dir->set_version(rollback.fnode.version);
dir->fnode = rollback.fnode;
+ if (uf.complete)
+ dir->mark_complete();
dir->_mark_dirty(ls);
if (!(dir->fnode.rstat == dir->fnode.accounted_rstat)) {
le->add_orig_frag(dir->get_frag());
le->metablob.add_dir_context(dir);
- le->metablob.add_dir(dir, true);
+ le->metablob.add_dir(dir, true, uf.complete);
}
}
struct ufragment {
int bits;
bool committed;
+ bool complete;
LogSegment *ls;
list<Context*> waiters;
list<frag_t> old_frags;
bufferlist rollback;
- ufragment() : bits(0), committed(false), ls(NULL) {}
+ ufragment() : bits(0), committed(false), complete(false), ls(NULL) {}
};
map<dirfrag_t, ufragment> uncommitted_fragments;