class MClientCapRelease : public Message {
+ static const int HEAD_VERSION = 2;
+ static const int COMPAT_VERSION = 1;
public:
struct ceph_mds_cap_release head;
vector<ceph_mds_cap_item> caps;
+ // The message receiver must wait for this OSD epoch
+ // before actioning this cap release.
+ epoch_t osd_epoch_barrier;
+
MClientCapRelease() :
- Message(CEPH_MSG_CLIENT_CAPRELEASE) {
+ Message(CEPH_MSG_CLIENT_CAPRELEASE, HEAD_VERSION, COMPAT_VERSION),
+ osd_epoch_barrier(0)
+ {
memset(&head, 0, sizeof(head));
}
private:
bufferlist::iterator p = payload.begin();
::decode(head, p);
::decode_nohead(head.num, caps, p);
+ if (header.version >= 2) {
+ ::decode(osd_epoch_barrier, p);
+ }
}
void encode_payload(uint64_t features) {
head.num = caps.size();
::encode(head, payload);
::encode_nohead(caps, payload);
+ ::encode(osd_epoch_barrier, payload);
}
};
class MClientCaps : public Message {
-
- static const int HEAD_VERSION = 4; // added flock metadata, inline data
+ static const int HEAD_VERSION = 5;
static const int COMPAT_VERSION = 1;
public:
version_t inline_version;
bufferlist inline_data;
+ // Receivers may not use their new caps until they have this OSD map
+ epoch_t osd_epoch_barrier;
+
int get_caps() { return head.caps; }
int get_wanted() { return head.wanted; }
int get_dirty() { return head.dirty; }
}
MClientCaps()
- : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION) {
+ : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION),
+ osd_epoch_barrier(0) {
inline_version = 0;
}
MClientCaps(int op,
int caps,
int wanted,
int dirty,
- int mseq)
- : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION) {
+ int mseq,
+ epoch_t oeb)
+ : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION),
+ osd_epoch_barrier(oeb) {
memset(&head, 0, sizeof(head));
head.op = op;
head.ino = ino;
}
MClientCaps(int op,
inodeno_t ino, inodeno_t realm,
- uint64_t id, int mseq)
- : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION) {
+ uint64_t id, int mseq, epoch_t oeb)
+ : Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION),
+ osd_epoch_barrier(oeb){
memset(&head, 0, sizeof(head));
head.op = op;
head.ino = ino;
} else {
inline_version = CEPH_INLINE_NONE;
}
+
+ if (header.version >= 5) {
+ ::decode(osd_epoch_barrier, p);
+ }
}
void encode_payload(uint64_t features) {
head.snap_trace_len = snapbl.length();
header.version = 3;
return;
}
+
+ ::encode(osd_epoch_barrier, payload);
}
};