#define CEPH_MDS_PROTOCOL 7 /* cluster internal */
#define CEPH_MON_PROTOCOL 4 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 6 /* public/client */
-#define CEPH_MDSC_PROTOCOL 15 /* public/client */
+#define CEPH_MDSC_PROTOCOL 16 /* public/client */
#define CEPH_MONC_PROTOCOL 11 /* public/client */
} __attribute__ ((packed)) fstat;
struct {
__le32 frag;
+ __le32 max_entries;
} __attribute__ ((packed)) readdir;
struct {
__le32 mode;
int err;
u32 ftype;
struct ceph_mds_reply_info_parsed *rinfo;
+ int complete = 0;
/* set I_READDIR at start of readdir */
if (filp->f_pos == 0)
return err;
}
dout(10, "readdir got and parsed readdir result=%d"
- " on frag %x\n", err, frag);
+ " on frag %x, end=%d, complete=%d\n", err, frag,
+ (int)req->r_reply_info.dir_complete,
+ (int)req->r_reply_info.dir_end);
+ if (req->r_reply_info.dir_complete)
+ complete = 1;
fi->last_readdir = req;
}
* dir contents in our cache.
*/
spin_lock(&inode->i_lock);
- if (ci->i_ceph_flags & CEPH_I_READDIR) {
+ if (complete && (ci->i_ceph_flags & CEPH_I_READDIR)) {
dout(10, " marking %p complete\n", inode);
ci->i_ceph_flags |= CEPH_I_COMPLETE;
ci->i_ceph_flags &= ~CEPH_I_READDIR;
if (*p > end)
goto bad;
- ceph_decode_32_safe(p, end, num, bad);
+ ceph_decode_need(p, end, sizeof(num) + 2, bad);
+ ceph_decode_32(p, num);
+ ceph_decode_8(p, info->dir_end);
+ ceph_decode_8(p, info->dir_complete);
if (num == 0)
goto done;
u32 *dir_dname_len;
struct ceph_mds_reply_lease **dir_dlease;
struct ceph_mds_reply_info_in *dir_in;
+ u8 dir_complete, dir_end;
/* encoded blob describing snapshot contexts for certain
operations (e.g., open) */
bufferlist dirbl, dnbl;
dir->encode_dirstat(dirbl, mds->get_nodeid());
- __u32 numfiles = 0;
CDir::map_t::iterator it = dir->begin();
- while (it != dir->end()) {
+
+ unsigned max = req->head.args.readdir.max_entries;
+ if (!max)
+ max = dir->get_num_any(); // whatever, something big.
+
+ nstring offset_str = req->get_path2();
+ const char *offset = offset_str.length() ? offset_str.c_str() : 0;
+
+ __u32 numfiles = 0;
+ while (it != dir->end() && numfiles < max) {
CDentry *dn = it->second;
it++;
+ if (offset && strcmp(dn->get_name().c_str(), offset) <= 0)
+ continue;
+
bool dnp = dn->use_projected(client, mdr);
CDentry::linkage_t *dnl = dnp ? dn->get_projected_linkage() : dn->get_linkage();
// touch dn
mdcache->lru.lru_touch(dn);
}
+
+ __u8 end = (it == dir->end());
+ __u8 complete = (end && !offset);
+
+ // final blob
::encode(numfiles, dirbl);
+ ::encode(end, dirbl);
+ ::encode(complete, dirbl);
dirbl.claim_append(dnbl);
if (snaps)
// yay, reply
MClientReply *reply = new MClientReply(req, 0);
reply->set_dir_bl(dirbl);
- dout(10) << "reply to " << *req << " readdir " << numfiles << " files" << dendl;
+ dout(10) << "reply to " << *req << " readdir num=" << numfiles << " end=" << (int)end
+ << " complete=" << (int)complete << dendl;
// bump popularity. NOTE: this doesn't quite capture it.
mds->balancer->hit_dir(g_clock.now(), dir, META_POP_IRD, -1, numfiles);