struct {
__le32 frag; /* which dir fragment */
__le32 max_entries; /* how many dentries to grab */
+ __le32 max_bytes; /* byte budget for the reply; 0 lets the MDS pick its own default */
} __attribute__ ((packed)) readdir;
struct {
__le32 mode;
}
-bool CInode::encode_inodestat(bufferlist& bl, Session *session,
+int CInode::encode_inodestat(bufferlist& bl, Session *session,
SnapRealm *realm,
- snapid_t snapid)
+ snapid_t snapid, unsigned max_bytes)
{
int client = session->inst.name.num();
assert(snapid);
i = pxattr ? pi:oi;
bool had_latest_xattrs = cap && (cap->issued() & CEPH_CAP_XATTR_SHARED) &&
cap->client_xattr_version == i->xattr_version;
+
+ // xattr
+ bufferlist xbl;
+ e.xattr_version = i->xattr_version;
+ if (!had_latest_xattrs && cap) {
+ if (!pxattrs)
+ pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
+ ::encode(*pxattrs, xbl);
+ }
+ bufferlist splits;
+ for (map<frag_t,int32_t>::iterator p = dirfragtree._splits.begin();
+ p != dirfragtree._splits.end();
+ p++) {
+ ::encode(p->first, bl);
+ ::encode(p->second, bl);
+ }
+
+ // do we have room?
+ if (max_bytes) {
+ unsigned bytes = sizeof(e);
+ bytes += sizeof(__u32);
+ for (map<frag_t,int32_t>::iterator p = dirfragtree._splits.begin();
+ p != dirfragtree._splits.end();
+ p++)
+ bytes += sizeof(p->first) + sizeof(p->second);
+ bytes += sizeof(__u32) + symlink.length();
+ bytes += sizeof(__u32) + xbl.length();
+
+ if (bytes > max_bytes)
+ return -ENOSPC;
+ }
+
+
// encode caps
if (snapid != CEPH_NOSNAP) {
/*
<< " seq " << e.cap.seq
<< " mseq " << e.cap.mseq << dendl;
- // xattr
- bufferlist xbl;
- e.xattr_version = i->xattr_version;
- if (!had_latest_xattrs &&
- cap &&
- (cap->pending() & CEPH_CAP_XATTR_SHARED)) {
-
- if (!pxattrs)
- pxattrs = pxattr ? get_projected_xattrs() : &xattrs;
-
- ::encode(*pxattrs, xbl);
- if (cap)
+ // include those xattrs?
+ if (xbl.length()) {
+ if (cap && (cap->pending() & CEPH_CAP_XATTR_SHARED)) {
+ dout(10) << "including xattrs version " << i->xattr_version << dendl;
cap->client_xattr_version = i->xattr_version;
- dout(10) << "including xattrs version " << i->xattr_version << dendl;
+ } else {
+ xbl.clear(); // no xattrs
+ }
}
// encode
// for giving to clients
- bool encode_inodestat(bufferlist& bl, Session *session, SnapRealm *realm,
- snapid_t snapid=CEPH_NOSNAP);
+ int encode_inodestat(bufferlist& bl, Session *session, SnapRealm *realm,
+ snapid_t snapid=CEPH_NOSNAP, unsigned max_bytes=0); // nonzero max_bytes: returns -ENOSPC if the estimated encoded size exceeds it; 0 skips the size check
void encode_cap_message(MClientCaps *m, Capability *cap);
unsigned max = req->head.args.readdir.max_entries;
if (!max)
max = dir->get_num_any(); // whatever, something big.
+ unsigned max_bytes = req->head.args.readdir.max_bytes;
+ if (!max_bytes)
+ max_bytes = 512 << 10; // 512 KB?
+ // start final blob
+ bufferlist dirbl;
+ dir->encode_dirstat(dirbl, mds->get_nodeid());
+
+ // count bytes available.
+ // this isn't perfect, but we should capture the main variable/unbounded size items!
+ int front_bytes = dirbl.length() + sizeof(__u32) + sizeof(__u8)*2;
+ int bytes_left = max_bytes - front_bytes;
+ bytes_left -= realm->get_snap_trace().length();
__u32 numfiles = 0;
while (it != dir->end() && numfiles < max) {
}
assert(in);
+ if ((int)(dnbl.length() + dn->name.length() + sizeof(__u32) + sizeof(LeaseStat)) > bytes_left) {
+ dout(10) << " ran out of room, stopping at " << dnbl.length() << " < " << bytes_left << dendl;
+ break;
+ }
+
+ unsigned start_len = dnbl.length();
+
// dentry
dout(12) << "including dn " << *dn << dendl;
::encode(dn->name, dnbl);
// inode
dout(12) << "including inode " << *in << dendl;
- bool valid = in->encode_inodestat(dnbl, mdr->session, realm, snapid);
- assert(valid);
+ int r = in->encode_inodestat(dnbl, mdr->session, realm, snapid, bytes_left - (int)dnbl.length());
+ if (r < 0) {
+ // chop off dn->name, lease
+ dout(10) << " ran out of room, stopping at " << start_len << " < " << bytes_left << dendl;
+ bufferlist keep;
+ keep.substr_of(dnbl, 0, start_len);
+ dnbl.swap(keep);
+ break;
+ }
+ assert(r >= 0);
numfiles++;
// touch dn
__u8 end = (it == dir->end());
__u8 complete = (end && !offset); // FIXME: what purpose does this serve
-
- // final blob
- bufferlist dirbl;
- dir->encode_dirstat(dirbl, mds->get_nodeid());
+
+ // finish final blob
::encode(numfiles, dirbl);
::encode(end, dirbl);
::encode(complete, dirbl);
dir->log_mark_dirty();
// yay, reply
+ dout(10) << "reply to " << *req << " readdir num=" << numfiles
+ << " bytes=" << dirbl.length()
+ << " end=" << (int)end
+ << " complete=" << (int)complete
+ << dendl;
MClientReply *reply = new MClientReply(req, 0);
reply->set_dir_bl(dirbl);
- dout(10) << "reply to " << *req << " readdir num=" << numfiles << " end=" << (int)end
- << " complete=" << (int)complete << dendl;
// bump popularity. NOTE: this doesn't quite capture it.
mds->balancer->hit_dir(g_clock.now(), dir, META_POP_IRD, -1, numfiles);