frag_t fg = (unsigned)request->head.args.readdir.frag;
unsigned readdir_offset = dirp->next_offset;
string readdir_start = dirp->last_name;
+ assert(!readdir_start.empty() || readdir_offset == 2);
unsigned last_hash = 0;
- if (!readdir_start.empty())
- last_hash = ceph_frag_value(diri->hash_dentry_name(readdir_start));
+ if (hash_order) {
+ if (!readdir_start.empty()) {
+ last_hash = ceph_frag_value(diri->hash_dentry_name(readdir_start));
+ } else if (flags & CEPH_READDIR_OFFSET_HASH) {
+ /* mds understands offset_hash */
+ last_hash = (unsigned)request->head.args.readdir.offset_hash;
+ }
+ }
if (fg != dst.frag) {
ldout(cct, 10) << "insert_trace got new frag " << fg << " -> " << dst.frag << dendl;
}
ldout(cct, 10) << __func__ << " " << numdn << " readdir items, end=" << end
- << ", hash_order=" << hash_order << ", offset " << readdir_offset
- << ", readdir_start " << readdir_start << dendl;
+ << ", hash_order=" << hash_order
+ << ", readdir_start " << readdir_start
+ << ", last_hash " << last_hash
+ << ", next_offset " << readdir_offset << dendl;
if (diri->snapid != CEPH_SNAPDIR &&
- fg.is_leftmost() && readdir_offset == 2 && readdir_start.empty()) {
+ fg.is_leftmost() && readdir_offset == 2 &&
+ !(hash_order && last_hash)) {
dirp->release_count = diri->dir_release_count;
dirp->ordered_count = diri->dir_ordered_count;
dirp->start_shared_gen = diri->shared_gen;
req->head.args.readdir.flags = CEPH_READDIR_REPLY_BITFLAGS;
if (dirp->last_name.length()) {
req->path2.set_path(dirp->last_name.c_str());
+ } else if (dirp->hash_order()) {
+ req->head.args.readdir.offset_hash = dirp->offset_high();
}
req->dirp = dirp;
#define CEPH_READDIR_FRAG_END (1<<0)
#define CEPH_READDIR_FRAG_COMPLETE (1<<8)
#define CEPH_READDIR_HASH_ORDER (1<<9)
+#define CEPH_READDIR_OFFSET_HASH (1<<10)
/* Note that this is embedded within ceph_mds_request_head_legacy. */
union ceph_mds_request_args_legacy {
__le32 max_entries; /* how many dentries to grab */
__le32 max_bytes;
__le16 flags;
+ __le32 offset_hash;
} __attribute__ ((packed)) readdir;
struct {
__le32 mode;
__le32 max_entries; /* how many dentries to grab */
__le32 max_bytes;
__le16 flags;
+ __le32 offset_hash;
} __attribute__ ((packed)) readdir;
struct {
__le32 mode;
frag_t fg = (__u32)req->head.args.readdir.frag;
unsigned req_flags = (__u32)req->head.args.readdir.flags;
string offset_str = req->get_path2();
- dout(10) << " frag " << fg << " offset '" << offset_str << "'"
- << " flags " << req_flags << dendl;
__u32 offset_hash = 0;
if (!offset_str.empty())
offset_hash = ceph_frag_value(diri->hash_dentry_name(offset_str));
+ else
+ offset_hash = (__u32)req->head.args.readdir.offset_hash;
+
+ dout(10) << " frag " << fg << " offset '" << offset_str << "'"
+ << " offset_hash " << offset_hash << " flags " << req_flags << dendl;
// does the frag exist?
if (diri->dirfragtree[fg.value()] != fg) {
// build dir contents
bufferlist dnbl;
__u32 numfiles = 0;
+ bool start = !offset_hash && offset_str.empty();
bool end = (dir->begin() == dir->end());
// skip all dns < dentry_key_t(snapid, offset_str, offset_hash)
dentry_key_t skip_key(snapid, offset_str.c_str(), offset_hash);
- for (CDir::map_t::iterator it = offset_str.empty() ? dir->begin() : dir->lower_bound(skip_key);
+ for (CDir::map_t::iterator it = start ? dir->begin() : dir->lower_bound(skip_key);
!end && numfiles < max;
end = (it == dir->end())) {
CDentry *dn = it->second;
continue;
}
- if (!offset_str.empty()) {
+ if (!start) {
dentry_key_t offset_key(dn->last, offset_str.c_str(), offset_hash);
if (!(offset_key < dn->key()))
continue;
mdcache->lru.lru_touch(dn);
}
- bool complete = false;
__u16 flags = 0;
if (end) {
flags = CEPH_READDIR_FRAG_END;
- complete = offset_str.empty(); // FIXME: what purpose does this serve
- if (complete)
- flags |= CEPH_READDIR_FRAG_COMPLETE;
+ if (start)
+ flags |= CEPH_READDIR_FRAG_COMPLETE; // FIXME: what purpose does this serve
}
// client only understands END and COMPLETE flags?
if (req_flags & CEPH_READDIR_REPLY_BITFLAGS) {
- flags |= CEPH_READDIR_HASH_ORDER;
+ flags |= CEPH_READDIR_HASH_ORDER | CEPH_READDIR_OFFSET_HASH;
}
// finish final blob
// yay, reply
dout(10) << "reply to " << *req << " readdir num=" << numfiles
<< " bytes=" << dirbl.length()
+ << " start=" << (int)start
<< " end=" << (int)end
- << " complete=" << (int)complete
<< dendl;
mdr->reply_extra_bl = dirbl;