#include "msg/Message.h"
#include "osd/osd_types.h"
#include "include/ceph_features.h"
+#include "common/hobject.h"
#include <atomic>
/*
eversion_t reassert_version;
int32_t retry_attempt; // 0 is first attempt. -1 if we don't know.
- object_t oid;
- object_locator_t oloc;
+ hobject_t hobj;
pg_t pgid;
bufferlist::iterator p;
// Decoding flags. Decoding is only needed for messages catched by pipe reader.
public:
vector<OSDOp> ops;
private:
-
- snapid_t snapid;
snapid_t snap_seq;
vector<snapid_t> snaps;
friend class MOSDOpReply;
ceph_tid_t get_client_tid() { return header.tid; }
- void set_snapid(const snapid_t& s) { snapid = s; }
+ void set_snapid(const snapid_t& s) {
+ hobj.snap = s;
+ }
void set_snaps(const vector<snapid_t>& i) {
snaps = i;
}
assert(!final_decode_needed);
return mtime;
}
- const object_locator_t& get_object_locator() const {
+ object_locator_t get_object_locator() const {
assert(!final_decode_needed);
- return oloc;
+ if (hobj.oid.name.empty())
+ return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
+ else
+ return object_locator_t(hobj);
}
object_t& get_oid() {
assert(!final_decode_needed);
- return oid;
+ return hobj.oid;
}
- const snapid_t& get_snapid() {
+ const hobject_t &get_hobj() const {
+ return hobj;
+ }
+ snapid_t get_snapid() {
assert(!final_decode_needed);
- return snapid;
+ return hobj.snap;
}
const snapid_t& get_snap_seq() const {
assert(!final_decode_needed);
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
client_inc(inc),
osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
- oid(_oid), oloc(_oloc), pgid(_pgid),
+ hobj(_oid, _oloc.key, CEPH_NOSNAP, _pgid.ps(), _pgid.pool(), _oloc.nspace),
+ pgid(_pgid),
+ partial_decode_needed(false),
+ final_decode_needed(false),
+ features(feat) {
+ set_tid(tid);
+
+ // also put the client_inc in reqid.inc, so that get_reqid() can
+ // be used before the full message is decoded.
+ reqid.inc = inc;
+ }
+ MOSDOp(int inc, long tid, const hobject_t& ho, pg_t& _pgid,
+ epoch_t _osdmap_epoch,
+ int _flags, uint64_t feat)
+ : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
+ client_inc(inc),
+ osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
+ hobj(ho),
+ pgid(_pgid),
partial_decode_needed(false),
final_decode_needed(false),
features(feat) {
::encode(mtime, payload);
::encode(reassert_version, payload);
- __u32 oid_len = oid.name.length();
+ __u32 oid_len = hobj.oid.name.length();
::encode(oid_len, payload);
- ::encode(snapid, payload);
+ ::encode(hobj.snap, payload);
::encode(snap_seq, payload);
__u32 num_snaps = snaps.size();
::encode(num_snaps, payload);
for (unsigned i = 0; i < ops.size(); i++)
::encode(ops[i].op, payload);
- ::encode_nohead(oid.name, payload);
+ ::encode_nohead(hobj.oid.name, payload);
::encode_nohead(snaps, payload);
} else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
header.version = 6;
::encode(flags, payload);
::encode(mtime, payload);
::encode(reassert_version, payload);
- ::encode(oloc, payload);
+ ::encode(get_object_locator(), payload);
::encode(pgid, payload);
- ::encode(oid, payload);
+ ::encode(hobj.oid, payload);
__u16 num_ops = ops.size();
::encode(num_ops, payload);
for (unsigned i = 0; i < ops.size(); i++)
::encode(ops[i].op, payload);
- ::encode(snapid, payload);
+ ::encode(hobj.snap, payload);
::encode(snap_seq, payload);
::encode(snaps, payload);
::encode(reqid, payload);
::encode(client_inc, payload);
::encode(mtime, payload);
- ::encode(oloc, payload);
- ::encode(oid, payload);
+ ::encode(get_object_locator(), payload);
+ ::encode(hobj.oid, payload);
__u16 num_ops = ops.size();
::encode(num_ops, payload);
for (unsigned i = 0; i < ops.size(); i++)
::encode(ops[i].op, payload);
- ::encode(snapid, payload);
+ ::encode(hobj.snap, payload);
::encode(snap_seq, payload);
::encode(snaps, payload);
// Always keep here the newest version of decoding order/rule
if (header.version == HEAD_VERSION) {
- ::decode(pgid, p);
- ::decode(osdmap_epoch, p);
- ::decode(flags, p);
- ::decode(reassert_version, p);
- ::decode(reqid, p);
+ ::decode(pgid, p);
+ ::decode(osdmap_epoch, p);
+ ::decode(flags, p);
+ ::decode(reassert_version, p);
+ ::decode(reqid, p);
} else if (header.version < 2) {
// old decode
::decode(client_inc, p);
__u32 su;
::decode(su, p);
- oloc.pool = pgid.pool();
::decode(osdmap_epoch, p);
::decode(flags, p);
__u32 oid_len;
::decode(oid_len, p);
- ::decode(snapid, p);
+ ::decode(hobj.snap, p);
::decode(snap_seq, p);
__u32 num_snaps;
::decode(num_snaps, p);
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
- decode_nohead(oid_len, oid.name, p);
+ decode_nohead(oid_len, hobj.oid.name, p);
decode_nohead(num_snaps, snaps, p);
// recalculate pgid hash value
pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
- oid.name.c_str(),
- oid.name.length()));
+ hobj.oid.name.c_str(),
+ hobj.oid.name.length()));
+ hobj.pool = pgid.pool();
+ hobj.set_hash(pgid.ps());
retry_attempt = -1;
features = 0;
::decode(mtime, p);
::decode(reassert_version, p);
+ object_locator_t oloc;
::decode(oloc, p);
if (header.version < 3) {
::decode(pgid, p);
}
- ::decode(oid, p);
+ ::decode(hobj.oid, p);
//::decode(ops, p);
__u16 num_ops;
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
- ::decode(snapid, p);
+ ::decode(hobj.snap, p);
::decode(snap_seq, p);
::decode(snaps, p);
else
reqid = osd_reqid_t();
+ hobj.pool = pgid.pool();
+ hobj.set_key(oloc.key);
+ hobj.nspace = oloc.nspace;
+ hobj.set_hash(pgid.ps());
+
OSDOp::split_osd_op_vector_in_data(ops, data);
// we did the full decode
::decode(client_inc, p);
::decode(mtime, p);
+ object_locator_t oloc;
::decode(oloc, p);
- ::decode(oid, p);
+ ::decode(hobj.oid, p);
__u16 num_ops;
::decode(num_ops, p);
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
- ::decode(snapid, p);
+ ::decode(hobj.snap, p);
::decode(snap_seq, p);
::decode(snaps, p);
::decode(features, p);
+ hobj.pool = pgid.pool();
+ hobj.set_key(oloc.key);
+ hobj.nspace = oloc.nspace;
+ hobj.set_hash(pgid.ps());
+
OSDOp::split_osd_op_vector_in_data(ops, data);
final_decode_needed = false;
out << pgid;
if (!final_decode_needed) {
out << ' ';
- if (!oloc.nspace.empty())
- out << oloc.nspace << "/";
- out << oid
+ out << hobj
<< " " << ops
<< " snapc " << get_snap_seq() << "=" << snaps;
- if (oloc.key.size())
- out << " " << oloc;
if (is_retry_attempt())
out << " RETRY=" << get_retry_attempt();
} else {