offset_hash = (__u32)req->head.args.snapdiff.offset_hash;
}
- dout(10) << " frag " << fg << " offset '" << offset_str << "'"
+ dout(10) << __func__ << " frag " << fg << " offset '" << offset_str << "'"
<< " offset_hash " << offset_hash << " flags " << req_flags << dendl;
// does the frag exist?
std::swap(snapid, snapid_prev);
}
bool from_the_beginning = !offset_hash && offset_str.empty();
- // skip all dns < dentry_key_t(snapid, offset_str, offset_hash)
- dentry_key_t skip_key(snapid_prev, offset_str.c_str(), offset_hash);
+ // skip all dns <= dentry_key_t(*, offset_str, offset_hash)
+ dentry_key_t skip_key(CEPH_NOSNAP, offset_str.c_str(), offset_hash);
+
+ // We need to rollback all the entries with the same name
+ // when some entries with this name don't fit into the same fragment.
+ // This is caused by the limited ability for offset provisioning between
+ // fragments - there is no way to identify specific snapshot for the last entry.
+ // The following vars denote the potential rollback position for such a case.
+ // Fixes: https://tracker.ceph.com/issues/72518
+ string last_name;
+ size_t rollback_pos = 0;
+ size_t rollback_num = 0;
bool end = build_snap_diff(
mdr,
effective_snapid = exists ? snapid : snapid_prev;
name.append(dn_name);
if ((int)(dnbl.length() + name.length() + sizeof(__u32) + sizeof(LeaseStat)) > bytes_left) {
- dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
+ dout(10) << " ran out of room for name, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
+ if (name == last_name) {
+ bufferlist keep;
+ keep.substr_of(dnbl, 0, rollback_pos);
+ dnbl.swap(keep);
+ last_name.clear();
+ rollback_pos = 0;
+ numfiles = rollback_num;
+ rollback_num = 0;
+ }
return false;
}
unsigned start_len = dnbl.length();
dout(10) << "inc dn " << *dn << " as " << name
<< std::hex << " hash 0x" << hash << std::dec
+ << " " << effective_snapid
<< dendl;
encode(name, dnbl);
mds->locker->issue_client_lease(dn, in, mdr, now, dnbl);
dout(10) << " ran out of room, stopping at "
<< start_len << " < " << bytes_left << dendl;
bufferlist keep;
- keep.substr_of(dnbl, 0, start_len);
+
+ keep.substr_of(dnbl, 0,
+ name == last_name ? rollback_pos : start_len);
dnbl.swap(keep);
+
+ last_name.clear();
+ rollback_pos = 0;
+ numfiles = rollback_num;
+ rollback_num = 0;
return false;
}
+ // set rollback position
+ if (name != last_name) {
+ last_name = name;
+ rollback_pos = start_len;
+ rollback_num = numfiles;
+ }
// touch dn
mdcache->lru.lru_touch(dn);
++numfiles;
return r;
};
- auto it = !skip_key ? dir->begin() : dir->lower_bound(*skip_key);
+ auto it = !skip_key ? dir->begin() : dir->upper_bound(*skip_key);
while(it != dir->end()) {
CDentry* dn = it->second;
dout(20) << __func__ << " not in range, skipping" << dendl;
continue;
}
- if (skip_key) {
- skip_key->snapid = dn->last;
- if (!(*skip_key < dn->key()))
- continue;
- }
CInode* in = dnl->get_inode();
if (in && in->ino() == CEPH_INO_CEPH)
ceph_assert(in);
utime_t mtime = in->get_inode()->mtime;
-
if (in->is_dir()) {
// we need to maintain the order of entries (determined by their name hashes)