in->dirfragtree.swap(fragtree);
in->xattrs.swap(xattrs);
- in->decode_snap(snapbl);
+ in->decode_snap_blob(snapbl);
// add
cache->add_inode( in );
if (in->is_symlink()) {
// include symlink destination!
- dout(18) << " inlcuding symlink ptr " << in->symlink << dendl;
+ dout(18) << " including symlink ptr " << in->symlink << dendl;
::encode(in->symlink, bl);
}
::encode(in->dirfragtree, bl);
::encode(in->xattrs, bl);
- in->encode_snap(bl);
+ bufferlist snapbl;
+ in->encode_snap_blob(snapbl);
+ ::encode(snapbl, bl);
}
}
assert(n == 0);
return cur->snaprealm;
}
-void CInode::encode_snap(bufferlist &bl)
+void CInode::encode_snap_blob(bufferlist &snapbl)
{
- bufferlist snapbl;
- if (snaprealm)
- ::encode(snaprealm, snapbl);
- ::encode(snapbl, bl);
+ if (snaprealm) {
+ ::encode(*snaprealm, snapbl);
+ dout(20) << "encode_snap_blob " << *snaprealm << dendl;
+ }
}
-
-void CInode::decode_snap(bufferlist& snapbl)
+void CInode::decode_snap_blob(bufferlist& snapbl)
{
if (snapbl.length()) {
open_snaprealm();
bufferlist::iterator p = snapbl.begin();
::decode(*snaprealm, p);
+ dout(20) << "decode_snap_blob " << *snaprealm << dendl;
}
}
void open_snaprealm();
void close_snaprealm();
SnapRealm *find_snaprealm();
- void encode_snap(bufferlist &bl);
+ void encode_snap_blob(bufferlist &bl);
+ void decode_snap_blob(bufferlist &bl);
+ void encode_snap(bufferlist& bl) {
+ bufferlist snapbl;
+ encode_snap_blob(snapbl);
+ ::encode(snapbl, bl);
+ }
void decode_snap(bufferlist::iterator& p) {
bufferlist snapbl;
::decode(snapbl, p);
- if (snapbl.length())
- decode_snap(snapbl);
+ decode_snap_blob(snapbl);
}
- void decode_snap(bufferlist &bl);
// -- caps -- (new)
// client caps
follows = m->get_snaps()[0];
dout(7) << "handle_client_file_caps on " << m->get_ino()
<< " follows " << follows
- << " op " << m->get_op() << dendl;
+ << " op " << ceph_cap_op_name(m->get_op()) << dendl;
CInode *head_in = mdcache->get_inode(m->get_ino());
if (!head_in) {
<< " client" << client << " on " << *in << dendl;
int had = cap->confirm_receipt(m->get_seq(), m->get_caps());
int has = cap->confirmed();
- if (in->last != 0 && in->last < CEPH_NOSNAP) {
+
+ // this cap now follows a later snap (i.e. the one initiating this flush, or later)
+ cap->client_follows = follows+1;
+
+ if (in->last && in->last <= follows) {
dout(10) << " flushsnap releasing cloned cap" << dendl;
in->remove_client_cap(client);
} else {
dout(10) << " flushsnap NOT releasing live cap" << dendl;
}
- _do_cap_update(in, has|had, 0, follows, m);
+ _do_cap_update(in, has|had, in->get_caps_wanted(), follows, m);
} else {
// for this and all subsequent versions of this inode,
<< ", has " << cap_string(has)
<< " on " << *in << dendl;
- _do_cap_update(in, had, in->get_caps_wanted() | wanted, follows, m);
-
if (m->get_seq() < cap->get_last_open()) {
/* client may be trying to release caps (i.e. inode closed, etc.)
* by setting reducing wanted set. but it may also be opening the
<< " -> " << cap_string(wanted) << dendl;
cap->set_wanted(wanted);
}
+
+ _do_cap_update(in, had, in->get_caps_wanted() | wanted, follows, m);
// done?
if (in->last == CEPH_NOSNAP || in->last == 0)
inode_t *pi = in->project_inode();
pi->version = in->pre_dirty();
if (change_max) {
- dout(7) << " max_size " << pi->max_size << " -> " << new_max << dendl;
+ dout(7) << " max_size " << pi->max_size << " -> " << new_max << dendl;
pi->max_size = new_max;
}
if (dirty_mtime) {
predirty_nested(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, false);
mdcache->journal_dirty_inode(&le->metablob, in, follows);
- //le->metablob.add_primary_dentry(in->parent, true, 0, pi);
mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, change_max));
}
realm = cur->find_snaprealm();
else if (cur->snaprealm)
realm = cur->snaprealm;
- mds->mdcache->journal_dirty_inode(blob, cur, realm->get_latest_snap());
+ mds->mdcache->journal_dirty_inode(blob, cur);
//inode_t *pi = cur->get_projected_inode();
//blob->add_primary_dentry(cur->get_projected_parent_dn(), true, 0, pi);
}
return in;
}
-CInode *MDCache::cow_inode(CInode *in, snapid_t last)
+CInode *MDCache::cow_inode(CInode *in, snapid_t last, bool write_to_clone)
{
assert(last >= in->first);
CInode *oldin = new CInode(this);
- oldin->inode = *in->get_previous_projected_inode();
+ if (write_to_clone)
+ oldin->inode = *in->get_projected_inode();
+ else
+ oldin->inode = *in->get_previous_projected_inode();
oldin->symlink = in->symlink;
oldin->xattrs = in->xattrs;
p++) {
Capability *cap = p->second;
if ((cap->issued() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) &&
- cap->client_follows <= last) {
+ cap->client_follows < last) {
// clone to oldin
int client = p->first;
Capability *newcap = oldin->add_client_cap(client, in->containing_realm);
}
if (oldin->is_any_caps())
oldin->filelock.set_state(LOCK_LOCK);
+ else
+ oldin->inode.max_size = 0;
return oldin;
}
// nothing to cow on a null dentry
assert(!dn->is_null());
+ /*
+ * normally, we write to the head, and make a clone of the previous
+ * dentry+inode state. unless the follows snapid is specified.
+ */
+ bool write_to_clone = false;
+
if (dn->is_primary() && dn->inode->is_multiversion()) {
// multiversion inode.
CInode *in = dn->inode;
if (follows == CEPH_NOSNAP || follows == 0)
follows = in->find_snaprealm()->get_latest_snap();
+ //else
+ //write_to_clone = true;
// already cloned?
if (follows < in->first)
old_inode_t &old = in->old_inodes[follows];
old.first = in->first;
- old.inode = *in->get_previous_projected_inode();
+ if (write_to_clone)
+ old.inode = *in->get_projected_inode();
+ else
+ old.inode = *in->get_previous_projected_inode();
old.xattrs = in->xattrs;
in->first = follows+1;
} else {
if (follows == CEPH_NOSNAP)
follows = dn->dir->inode->find_snaprealm()->get_latest_snap();
+ //else
+ //write_to_clone = true;
// already cloned?
if (follows < dn->first)
dout(10) << " dn " << *dn << dendl;
if (dn->is_primary()) {
assert(oldfirst == dn->inode->first);
- CInode *oldin = cow_inode(dn->inode, follows);
+ CInode *oldin = cow_inode(dn->inode, follows, write_to_clone);
CDentry *olddn = dn->dir->add_primary_dentry(dn->name, oldin, oldfirst, follows);
dout(10) << " olddn " << *olddn << dendl;
metablob->add_primary_dentry(olddn, true);
void MDCache::journal_dirty_inode(EMetaBlob *metablob, CInode *in, snapid_t follows)
{
- journal_cow_inode(metablob, in, follows);
- metablob->add_primary_dentry(in->get_projected_parent_dn(), true, in, in->get_projected_inode());
+ CDentry *dn = in->get_projected_parent_dn();
+ journal_cow_dentry(metablob, dn, follows);
+ metablob->add_primary_dentry(dn, true, in, in->get_projected_inode());
}
// journal helpers
CInode *pick_inode_snap(CInode *in, snapid_t follows);
- CInode *cow_inode(CInode *in, snapid_t last);
+ CInode *cow_inode(CInode *in, snapid_t last, bool write_to_clone=false);
void journal_cow_dentry(EMetaBlob *metablob, CDentry *dn, snapid_t follows=CEPH_NOSNAP);
void journal_cow_inode(EMetaBlob *metablob, CInode *in, snapid_t follows=CEPH_NOSNAP);
void journal_dirty_inode(EMetaBlob *metablob, CInode *in, snapid_t follows=CEPH_NOSNAP);
bool have_inode(inodeno_t ino, snapid_t snap=0) { return have_inode(vinodeno_t(ino, snap)); }
bool have_inode(vinodeno_t vino) { return inode_map.count(vino) ? true:false; }
CInode* get_inode(inodeno_t ino, snapid_t s=0) {
+ if (s == CEPH_NOSNAP)
+ s = 0; // ugly hack.
vinodeno_t vino(ino,s);
if (have_inode(vino))
return inode_map[vino];
mds->mdcache->add_uncommitted_master(mdr->reqid, mdr->ls, mdr->more()->slaves);
}
- snapid_t dnfollows = dn->dir->inode->find_snaprealm()->get_latest_snap();
if (inc) {
+ snapid_t dnfollows = dn->dir->inode->find_snaprealm()->get_latest_snap();
dn->first = dnfollows + 1;
dn->pre_dirty();
mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, 1);
} else {
dn->pre_dirty();
mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, -1);
- mdcache->journal_cow_dentry(&le->metablob, dn, dnfollows);
+ mdcache->journal_cow_dentry(&le->metablob, dn);
le->metablob.add_null_dentry(dn, true);
}
le->metablob.add_client_req(req->get_reqid());
le->metablob.add_table_transaction(TABLE_SNAP, stid);
mds->locker->predirty_nested(mdr, &le->metablob, diri, 0, PREDIRTY_PRIMARY, false);
- mdcache->journal_dirty_inode(&le->metablob, diri, diri->find_snaprealm()->get_latest_snap());
+ mdcache->journal_cow_inode(&le->metablob, diri);
+
+ // project the snaprealm
+ bufferlist snapbl;
+ if (diri->snaprealm) {
+ diri->snaprealm->snaps[snapid] = info;
+ diri->encode_snap_blob(snapbl);
+ diri->snaprealm->snaps.erase(snapid);
+ } else {
+ SnapRealm t(mdcache, diri);
+ t.created = snapid;
+ t.snaps[snapid] = info;
+ ::encode(t, snapbl);
+ }
+ le->metablob.add_primary_dentry(diri->get_projected_parent_dn(), true, 0, pi, 0, &snapbl);
mdlog->submit_entry(le, new C_MDS_mksnap_finish(mds, mdr, diri, info));
}
// return remote pointer to to-be-journaled inode
inode_t *add_primary_dentry(CDentry *dn, bool dirty,
- CInode *in=0, inode_t *pi=0, fragtree_t *pdft=0) {
+ CInode *in=0, inode_t *pi=0, fragtree_t *pdft=0, bufferlist *psnapbl=0) {
return add_primary_dentry(add_dir(dn->get_dir(), false),
- dn, dirty, in, pi, pdft);
+ dn, dirty, in, pi, pdft, psnapbl);
}
inode_t *add_primary_dentry(dirlump& lump, CDentry *dn, bool dirty,
- CInode *in=0, inode_t *pi=0, fragtree_t *pdft=0) {
+ CInode *in=0, inode_t *pi=0, fragtree_t *pdft=0, bufferlist *psnapbl=0) {
if (!in)
in = dn->get_inode();
//cout << "journaling " << in->inode.ino << " at " << my_offset << std::endl;
bufferlist snapbl;
- in->encode_snap(snapbl);
+ if (psnapbl)
+ snapbl = *psnapbl;
+ else
+ in->encode_snap_blob(snapbl);
lump.nfull++;
if (dirty) {
p != lump.get_dfull().end();
p++) {
CDentry *dn = dir->lookup(p->dn, p->dnlast);
- if (dn && dn->first < p->dnfirst) {
- dn->last = p->dnfirst-1;
- dout(10) << "EMetaBlob.replay versioned " << *dn << dendl;
- dn = 0;
- }
if (!dn) {
dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast);
dn->set_version(p->dnv);
if (p->dirty) dn->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay added " << *dn << dendl;
} else {
- assert(p->dnfirst == dn->first);
- dn->last = p->dnlast;
dn->set_version(p->dnv);
if (p->dirty) dn->_mark_dirty(logseg);
- dout(10) << "EMetaBlob.replay had " << *dn << dendl;
+ dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl;
+ dn->first = p->dnfirst;
+ assert(dn->last == p->dnlast);
}
- CInode *in = mds->mdcache->get_inode(p->inode.ino);
+ CInode *in = mds->mdcache->get_inode(p->inode.ino, p->dnlast);
if (!in) {
in = new CInode(mds->mdcache);
+ in->first = p->dnfirst;
+ if (p->dnlast < CEPH_NOSNAP)
+ in->last = p->dnlast;
in->inode = p->inode;
- in->dirfragtree = p->dirfragtree;
in->xattrs = p->xattrs;
+ if (in->inode.is_dir()) {
+ in->dirfragtree = p->dirfragtree;
+ in->decode_snap_blob(p->snapbl);
+ }
if (in->inode.is_symlink()) in->symlink = p->symlink;
mds->mdcache->add_inode(in);
if (!dn->is_null()) {
if (in->get_parent_dn() && in->inode.anchored != p->inode.anchored)
in->get_parent_dn()->adjust_nested_anchors( (int)p->inode.anchored - (int)in->inode.anchored );
in->inode = p->inode;
- in->dirfragtree = p->dirfragtree;
in->xattrs = p->xattrs;
+ if (in->inode.is_dir()) {
+ in->dirfragtree = p->dirfragtree;
+ in->decode_snap_blob(p->snapbl);
+ }
if (in->inode.is_symlink()) in->symlink = p->symlink;
if (p->dirty) in->_mark_dirty(logseg);
if (dn->get_inode() != in) {
dir->link_primary_inode(dn, in);
dout(10) << "EMetaBlob.replay linked " << *in << dendl;
} else {
- dout(10) << "EMetaBlob.replay had " << *in << dendl;
+ dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *in << dendl;
}
+ in->first = p->dnfirst;
}
}
p != lump.get_dremote().end();
p++) {
CDentry *dn = dir->lookup(p->dn, p->dnlast);
- if (dn && dn->first < p->dnfirst) {
- dn->last = p->dnfirst-1;
- dn->_mark_dirty(logseg);
- dout(10) << "EMetaBlob.replay versioned " << *dn << dendl;
- dn = 0;
- }
if (!dn) {
dn = dir->add_remote_dentry(p->dn, p->ino, p->d_type, p->dnfirst, p->dnlast);
dn->set_version(p->dnv);
if (p->dirty) dn->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay added " << *dn << dendl;
} else {
- assert(p->dnfirst == dn->first);
- dn->last = p->dnlast;
if (!dn->is_null()) {
dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
dir->unlink_inode(dn);
dn->set_remote(p->ino, p->d_type);
dn->set_version(p->dnv);
if (p->dirty) dn->_mark_dirty(logseg);
- dout(10) << "EMetaBlob.replay had " << *dn << dendl;
+ dout(10) << "EMetaBlob.replay for [" << p->dnfirst << "," << p->dnlast << "] had " << *dn << dendl;
+ dn->first = p->dnfirst;
+ assert(dn->last == p->dnlast);
}
}
p != lump.get_dnull().end();
p++) {
CDentry *dn = dir->lookup(p->dn, p->dnfirst);
- if (dn && dn->first < p->dnfirst) {
- dn->last = p->dnfirst-1;
- dn->_mark_dirty(logseg);
- dout(10) << "EMetaBlob.replay versioned " << *dn << dendl;
- dn = 0;
- }
if (!dn) {
dn = dir->add_null_dentry(p->dn, p->dnfirst, p->dnlast);
dn->set_version(p->dnv);
if (p->dirty) dn->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay added " << *dn << dendl;
} else {
- assert(p->dnfirst == dn->first);
- dn->last = p->dnlast;
+ dn->first = p->dnfirst;
if (!dn->is_null()) {
dout(10) << "EMetaBlob.replay unlinking " << *dn << dendl;
dir->unlink_inode(dn);
dn->set_version(p->dnv);
if (p->dirty) dn->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay had " << *dn << dendl;
+ assert(dn->last == p->dnlast);
}
}
}