class MOSDOp : public Message {
- static const int HEAD_VERSION = 6;
+ static const int HEAD_VERSION = 7;
static const int COMPAT_VERSION = 3;
private:
object_t oid;
object_locator_t oloc;
pg_t pgid;
+ bufferlist::iterator p;
+ // Decoding flags. Decoding is only needed for messages catched by pipe reader.
+ bool partial_decode_needed;
+ bool final_decode_needed;
+ //
public:
vector<OSDOp> ops;
private:
public:
friend class MOSDOpReply;
- // read
- const snapid_t& get_snapid() { return snapid; }
+ ceph_tid_t get_client_tid() { return header.tid; }
void set_snapid(const snapid_t& s) { snapid = s; }
- // writ
- const snapid_t& get_snap_seq() const { return snap_seq; }
- const vector<snapid_t> &get_snaps() const { return snaps; }
void set_snaps(const vector<snapid_t>& i) {
snaps = i;
}
reqid = rid;
}
+ // Fields decoded in partial decoding
+ const pg_t& get_pg() const { assert(!partial_decode_needed); return pgid; }
+ epoch_t get_map_epoch() { assert(!partial_decode_needed); return osdmap_epoch; }
+ int get_flags() const { assert(!partial_decode_needed); return flags; }
+ const eversion_t& get_version() { assert(!partial_decode_needed); return reassert_version; }
osd_reqid_t get_reqid() const {
+ assert(!partial_decode_needed);
if (reqid != osd_reqid_t())
return reqid;
else
client_inc,
header.tid);
}
- int get_client_inc() { return client_inc; }
- ceph_tid_t get_client_tid() { return header.tid; }
-
- object_t& get_oid() { return oid; }
- const pg_t& get_pg() const { return pgid; }
-
- const object_locator_t& get_object_locator() const {
- return oloc;
+ // Fields decoded in final decoding
+ int get_client_inc() { assert(!final_decode_needed); return client_inc; }
+ utime_t get_mtime() { assert(!final_decode_needed); return mtime; }
+ const object_locator_t& get_object_locator() const { assert(!final_decode_needed); return oloc; }
+ object_t& get_oid() { assert(!final_decode_needed); return oid; }
+ const snapid_t& get_snapid() { assert(!final_decode_needed); return snapid; }
+ const snapid_t& get_snap_seq() const { assert(!final_decode_needed); return snap_seq; }
+ const vector<snapid_t> &get_snaps() const { assert(!final_decode_needed); return snaps; }
+ /**
+ * get retry attempt
+ *
+ * 0 is the first attempt.
+ *
+ * @return retry attempt, or -1 if we don't know
+ */
+ int get_retry_attempt() const {
+ return retry_attempt;
+ }
+ uint64_t get_features() const {
+ if (features)
+ return features;
+ return get_connection()->get_features();
}
-
- epoch_t get_map_epoch() { return osdmap_epoch; }
-
- const eversion_t& get_version() { return reassert_version; }
-
- utime_t get_mtime() { return mtime; }
MOSDOp()
- : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { }
+ : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), partial_decode_needed(true), final_decode_needed(true) { }
MOSDOp(int inc, long tid,
object_t& _oid, object_locator_t& _oloc, pg_t& _pgid, epoch_t _osdmap_epoch,
int _flags, uint64_t feat)
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
client_inc(inc),
osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
- oid(_oid), oloc(_oloc), pgid(_pgid),
+ oid(_oid), oloc(_oloc), pgid(_pgid), partial_decode_needed(false), final_decode_needed(false),
features(feat) {
set_tid(tid);
}
add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
}
- uint64_t get_features() const {
- if (features)
- return features;
- return get_connection()->get_features();
- }
-
- // flags
- int get_flags() const { return flags; }
bool has_flag(__u32 flag) { return flags & flag; };
-
bool wants_ack() const { return flags & CEPH_OSD_FLAG_ACK; }
bool wants_ondisk() const { return flags & CEPH_OSD_FLAG_ONDISK; }
bool wants_onnvram() const { return flags & CEPH_OSD_FLAG_ONNVRAM; }
retry_attempt = a;
}
- /**
- * get retry attempt
- *
- * 0 is the first attempt.
- *
- * @return retry attempt, or -1 if we don't know
- */
- int get_retry_attempt() const {
- return retry_attempt;
- }
-
// marshalling
virtual void encode_payload(uint64_t features) {
::encode_nohead(oid.name, payload);
::encode_nohead(snaps, payload);
- } else {
- header.version = HEAD_VERSION;
+ } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
+ header.version = 6;
::encode(client_inc, payload);
::encode(osdmap_epoch, payload);
::encode(flags, payload);
::encode(mtime, payload);
::encode(reassert_version, payload);
-
::encode(oloc, payload);
::encode(pgid, payload);
+
::encode(oid, payload);
__u16 num_ops = ops.size();
::encode(num_ops, payload);
for (unsigned i = 0; i < ops.size(); i++)
- ::encode(ops[i].op, payload);
+ ::encode(ops[i].op, payload);
::encode(snapid, payload);
::encode(snap_seq, payload);
::encode(retry_attempt, payload);
::encode(features, payload);
::encode(reqid, payload);
+ } else {
+ // new, reordered, v7 message encoding
+ header.version = HEAD_VERSION;
+ ::encode(pgid, payload);
+ ::encode(osdmap_epoch, payload);
+ ::encode(flags, payload);
+ ::encode(reassert_version, payload);
+ ::encode(reqid, payload);
+ ::encode(client_inc, payload);
+ ::encode(mtime, payload);
+ ::encode(oloc, payload);
+ ::encode(oid, payload);
+
+ __u16 num_ops = ops.size();
+ ::encode(num_ops, payload);
+ for (unsigned i = 0; i < ops.size(); i++)
+ ::encode(ops[i].op, payload);
+
+ ::encode(snapid, payload);
+ ::encode(snap_seq, payload);
+ ::encode(snaps, payload);
+
+ ::encode(retry_attempt, payload);
+ ::encode(features, payload);
}
}
virtual void decode_payload() {
- bufferlist::iterator p = payload.begin();
+ assert(partial_decode_needed && final_decode_needed);
+ p = payload.begin();
if (header.version < 2) {
// old decode
retry_attempt = -1;
features = 0;
reqid = osd_reqid_t();
- } else {
- // new decode
+ OSDOp::split_osd_op_vector_in_data(ops, data);
+ // In old versions, final decoding is done in first step
+ final_decode_needed = false;
+
+ } else if (header.version < 7) {
::decode(client_inc, p);
::decode(osdmap_epoch, p);
::decode(flags, p);
::decode(pgid, p);
}
+ } else {
+ // new, v7 decode, splitted to partial and final
+ ::decode(pgid, p);
+ ::decode(osdmap_epoch, p);
+ ::decode(flags, p);
+ ::decode(reassert_version, p);
+ ::decode(reqid, p);
+ }
+
+ partial_decode_needed = false;
+
+ }
+
+ void finish_decode() {
+ assert(!partial_decode_needed); // partial decoding required
+
+ if (!final_decode_needed)
+ return; //Message is already final decoded
+
+ if (header.version < 7) {
::decode(oid, p);
//::decode(ops, p);
::decode(num_ops, p);
ops.resize(num_ops);
for (unsigned i = 0; i < num_ops; i++)
- ::decode(ops[i].op, p);
+ ::decode(ops[i].op, p);
::decode(snapid, p);
::decode(snap_seq, p);
::decode(snaps, p);
if (header.version >= 4)
- ::decode(retry_attempt, p);
+ ::decode(retry_attempt, p);
else
- retry_attempt = -1;
+ retry_attempt = -1;
if (header.version >= 5)
- ::decode(features, p);
+ ::decode(features, p);
else
features = 0;
::decode(reqid, p);
else
reqid = osd_reqid_t();
+
+ OSDOp::split_osd_op_vector_in_data(ops, data);
+
+ } else { // final decoding for reordered v7
+ ::decode(client_inc, p);
+ ::decode(mtime, p);
+ ::decode(oloc, p);
+ ::decode(oid, p);
+
+ __u16 num_ops;
+ ::decode(num_ops, p);
+ ops.resize(num_ops);
+ for (unsigned i = 0; i < num_ops; i++)
+ ::decode(ops[i].op, p);
+
+ ::decode(snapid, p);
+ ::decode(snap_seq, p);
+ ::decode(snaps, p);
+
+ ::decode(retry_attempt, p);
+
+ ::decode(features, p);
+
+ OSDOp::split_osd_op_vector_in_data(ops, data);
+
}
- OSDOp::split_osd_op_vector_in_data(ops, data);
+ final_decode_needed = false;
}
void clear_buffers() {
const char *get_type_name() const { return "osd_op"; }
void print(ostream& out) const {
- out << "osd_op(" << get_reqid();
+ if (!partial_decode_needed)
+ out << "osd_op(" << get_reqid();
out << " ";
if (!oloc.nspace.empty())
out << oloc.nspace << "/";
out << " RETRY=" << get_retry_attempt();
if (reassert_version != eversion_t())
out << " reassert_version=" << reassert_version;
- if (get_snap_seq())
+ if (!final_decode_needed)
out << " snapc " << get_snap_seq() << "=" << snaps;
- out << " " << ceph_osd_flag_string(get_flags());
+ if (!partial_decode_needed)
+ out << " " << ceph_osd_flag_string(get_flags());
out << " e" << osdmap_epoch;
out << ")";
}