dout(4) << "inspecting lump " << frag_oid.name << dendl;
+
+ // We will record old fnode version for use in hard link handling
+ // If we don't read an old fnode, take version as zero and write in
+ // all hardlinks we find.
+ version_t old_fnode_version = 0;
+
// Update fnode in omap header of dirfrag object
bool write_fnode = false;
bufferlist old_fnode_bl;
old_fnode.decode(old_fnode_iter);
dout(4) << "frag " << frag_oid.name << " fnode old v" <<
old_fnode.version << " vs new v" << lump.fnode.version << dendl;
- write_fnode = old_fnode.version < lump.fnode.version;
+ old_fnode_version = old_fnode.version;
+ write_fnode = old_fnode_version < lump.fnode.version;
} catch (const buffer::error &err) {
dout(1) << "frag " << frag_oid.name
<< " is corrupt, overwriting" << dendl;
std::set<std::string> read_keys;
- // Compose list of existing dentries we would like to fetch
+ // Compose list of potentially-existing dentries we would like to fetch
list<ceph::shared_ptr<EMetaBlob::fullbit> > const &fb_list =
lump.get_dfull();
for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator fbi =
read_keys.insert(key);
}
+ list<EMetaBlob::remotebit> const &rb_list =
+ lump.get_dremote();
+ for (list<EMetaBlob::remotebit>::const_iterator rbi =
+ rb_list.begin(); rbi != rb_list.end(); ++rbi) {
+ EMetaBlob::remotebit const &rb = *rbi;
+
+ // Get a key like "foobar_head"
+ std::string key;
+ dentry_key_t dn_key(rb.dnlast, rb.dn.c_str());
+ dn_key.encode(key);
+ read_keys.insert(key);
+ }
+
// Perform bulk read of existing dentries
std::map<std::string, bufferlist> read_vals;
r = io.omap_get_vals_by_keys(frag_oid.name, read_keys, &read_vals);
// leave write_dentry false, we have no version to
// compare with in a hardlink, so it's not safe to
// squash over it with what's in this fullbit
- dout(4) << "skipping hardlink dentry in backing store" << dendl;
+ dout(10) << "Existing remote inode in slot to be (maybe) written "
+ << "by a full inode from the journal dn '" << fb.dn.c_str()
+ << "' with lump fnode version " << lump.fnode.version
+ << "vs existing fnode version " << old_fnode_version << dendl;
+ write_dentry = old_fnode_version < lump.fnode.version;
} else if (dentry_type == 'I') {
// Read out inode version to compare with backing store
InodeStore inode;
}
if (write_dentry && !dry_run) {
- dout(4) << "writing dentry " << key << " into frag "
+ dout(4) << "writing I dentry " << key << " into frag "
<< frag_oid.name << dendl;
// Compose: Dentry format is dnfirst, [I|L], InodeStore(bare=true)
}
}
+ for (list<EMetaBlob::remotebit>::const_iterator rbi =
+ rb_list.begin(); rbi != rb_list.end(); ++rbi) {
+ EMetaBlob::remotebit const &rb = *rbi;
+
+ // Get a key like "foobar_head"
+ std::string key;
+ dentry_key_t dn_key(rb.dnlast, rb.dn.c_str());
+ dn_key.encode(key);
+
+ dout(4) << "inspecting remotebit " << frag_oid.name << "/" << rb.dn
+ << dendl;
+ bool write_dentry = false;
+ if (read_vals.find(key) == read_vals.end()) {
+ dout(4) << "dentry did not already exist, will create" << dendl;
+ write_dentry = true;
+ } else {
+ dout(4) << "dentry " << key << " existed already" << dendl;
+ dout(4) << "dentry exists, checking versions..." << dendl;
+ bufferlist &old_dentry = read_vals[key];
+ // Decode dentry+inode
+ bufferlist::iterator q = old_dentry.begin();
+
+ snapid_t dnfirst;
+ ::decode(dnfirst, q);
+ char dentry_type;
+ ::decode(dentry_type, q);
+
+ if (dentry_type == 'L') {
+ dout(10) << "Existing hardlink inode in slot to be (maybe) written "
+ << "by a remote inode from the journal dn '" << rb.dn.c_str()
+ << "' with lump fnode version " << lump.fnode.version
+ << "vs existing fnode version " << old_fnode_version << dendl;
+ write_dentry = old_fnode_version < lump.fnode.version;
+ } else if (dentry_type == 'I') {
+ dout(10) << "Existing full inode in slot to be (maybe) written "
+ << "by a remote inode from the journal dn '" << rb.dn.c_str()
+ << "' with lump fnode version " << lump.fnode.version
+ << "vs existing fnode version " << old_fnode_version << dendl;
+ write_dentry = old_fnode_version < lump.fnode.version;
+ } else {
+ dout(4) << "corrupt dentry in backing store, overwriting from "
+ "journal" << dendl;
+ write_dentry = true;
+ }
+ }
+
+ if (write_dentry && !dry_run) {
+ dout(4) << "writing L dentry " << key << " into frag "
+ << frag_oid.name << dendl;
+
+ // Compose: Dentry format is dnfirst, [I|L], InodeStore(bare=true)
+ bufferlist dentry_bl;
+ ::encode(rb.dnfirst, dentry_bl);
+ ::encode('L', dentry_bl);
+ ::encode(rb.ino, dentry_bl);
+ ::encode(rb.d_type, dentry_bl);
+
+ // Record for writing to RADOS
+ write_vals[key] = dentry_bl;
+ consumed_inos->insert(rb.ino);
+ }
+ }
+
// Write back any new/changed dentries
if (!write_vals.empty()) {
r = io.omap_set(frag_oid.name, write_vals);