__le64 files, subdirs, rbytes, rfiles, rsubdirs; /* dir stats */
struct ceph_timespec rctime;
struct ceph_frag_tree_head fragtree;
+ __le64 xattr_version;
} __attribute__ ((packed));
/* followed by frag array, then symlink string, then xattr blob */
/* xattrlock */
__le32 xattr_len;
+ __le64 xattr_version;
/* filelock */
__le64 size, max_size;
i = get_projected_inode();
else
i = &inode;
- bufferlist xbl;
+
+ map<string, bufferptr> *pxattrs = &xattrs;
+
if (snapid && is_multiversion()) {
// for now at least, old_inodes is only defined/valid on the auth
<< dendl;
assert(p->second.first <= snapid && snapid <= p->first);
i = &p->second.inode;
- ::encode(p->second.xattrs, xbl);
+ pxattrs = &p->second.xattrs;
}
}
e.rdev = i->rdev;
e.fragtree.nsplits = dirfragtree._splits.size();
- // include capability?
Capability *cap = get_client_cap(client);
+
+ bool had_latest_xattrs = cap && (cap->issued() & CEPH_CAP_XATTR_RDCACHE) &&
+ cap->client_xattr_version == i->xattr_version;
+
+ // include capability?
if (snapid != CEPH_NOSNAP && !cap) {
e.cap.caps = valid ? get_caps_allowed(false) : CEPH_STAT_CAP_INODE;
e.cap.seq = 0;
<< " seq " << e.cap.seq
<< " mseq " << e.cap.mseq << dendl;
+ // xattr
+ bufferlist xbl;
+ e.xattr_version = i->xattr_version;
+ if (!had_latest_xattrs &&
+ cap &&
+ (cap->pending() & CEPH_CAP_XATTR_RDCACHE)) {
+ ::encode(*pxattrs, xbl);
+ if (cap)
+ cap->client_xattr_version = i->xattr_version;
+ dout(10) << "including xattrs version " << i->xattr_version << dendl;
+ }
+
// encode
::encode(e, bl);
for (map<frag_t,int32_t>::iterator p = dirfragtree._splits.begin();
::encode(p->second, bl);
}
::encode(symlink, bl);
-
- if (!xattrs.empty() && xbl.length() == 0)
- ::encode(xattrs, xbl);
::encode(xbl, bl);
return valid;
int releasing; // only allow a single in-progress release (it may be waiting for log to flush)
snapid_t client_follows;
+ version_t client_xattr_version;
xlist<Capability*>::item session_caps_item;
xlist<Capability*> *rdcaps_list;
last_sent(0),
mseq(0),
suppress(0), stale(false), releasing(0),
- client_follows(0),
+ client_follows(0), client_xattr_version(0),
session_caps_item(this), rdcaps_list(rl), rdcaps_item(this), snaprealm_caps_item(this) { }
ceph_seq_t get_mseq() { return mseq; }
<< " seq " << cap->get_last_seq()
<< " new pending " << ccap_string(after) << " was " << ccap_string(before)
<< dendl;
- mds->send_message_client(new MClientCaps(CEPH_CAP_OP_GRANT,
- in->inode,
- in->find_snaprealm()->inode->ino(),
- cap->get_last_seq(),
- after, wanted, 0,
- cap->get_mseq()),
- it->first);
+
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
+ in->inode,
+ in->find_snaprealm()->inode->ino(),
+ cap->get_last_seq(),
+ after, wanted, 0,
+ cap->get_mseq());
+
+ // include xattrs if they're newer than what the client has
+ if ((after & CEPH_CAP_XATTR_RDCACHE) &&
+ in->inode.xattr_version > cap->client_xattr_version) {
+ dout(10) << " including xattrs v " << in->inode.xattr_version << dendl;
+ ::encode(in->xattrs, m->xattrbl);
+ m->head.xattr_version = in->inode.xattr_version;
+ cap->client_xattr_version = in->inode.xattr_version;
+ }
+
+ mds->send_message_client(m, it->first);
}
}
}
inode_t *pi = cur->project_inode();
pi->version = cur->pre_dirty();
pi->ctime = g_clock.real_now();
+ pi->xattr_version++;
// log + wait
mdr->ls = mdlog->get_current_segment();
inode_t *pi = cur->project_inode();
pi->version = cur->pre_dirty();
pi->ctime = g_clock.real_now();
-
+ pi->xattr_version++;
+
// log + wait
mdr->ls = mdlog->get_current_segment();
EUpdate *le = new EUpdate(mdlog, "removexattr");
utime_t atime; // file data access time.
uint32_t time_warp_seq; // count of (potential) mtime/atime timewarps (i.e., utimes())
- // dirfrag, recursive accounting
+ // dirfrag, recursive accountin
frag_info_t dirstat;
nest_info_t rstat, accounted_rstat;
// special stuff
version_t version; // auth only
version_t file_data_version; // auth only
+ version_t xattr_version;
// file type
bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
::encode(version, bl);
::encode(file_data_version, bl);
+ ::encode(xattr_version, bl);
}
void decode(bufferlist::iterator &p) {
::decode(ino, p);
::decode(version, p);
::decode(file_data_version, p);
+ ::decode(xattr_version, p);
}
};
WRITE_CLASS_ENCODER(inode_t)
if (head.time_warp_seq)
out << " tws " << head.time_warp_seq;
+ if (head.xattr_version)
+ out << " xattrs(v=" << head.xattr_version << " l=" << xattrbl.length() << ")";
+
out << ")";
}
}
void encode_payload() {
head.snap_trace_len = snapbl.length();
+ head.xattr_len = xattrbl.length();
::encode(head, payload);
::encode_nohead(snapbl, payload);
::encode_nohead(xattrbl, payload);