pg->apply_stats(get_target(), delta_stats);
}
-OpsExecuter::OpsExecuter(Ref<PG> pg,
+OpsExecuter::OpsExecuter(Ref<PG> _pg,
ObjectContextRef _obc,
const OpInfo& op_info,
abstracted_msg_t&& msg,
crimson::net::ConnectionXcoreRef conn,
const SnapContext& _snapc)
- : pg(std::move(pg)),
+ : pg(std::move(_pg)),
obc(std::move(_obc)),
op_info(op_info),
msg(std::move(msg)),
conn(conn),
+ txn(pg->min_peer_features()),
snapc(_snapc)
{
if (op_info.may_write() && should_clone(*obc, snapc)) {
DEBUGDPP("{}", *this, *req);
ceph::os::Transaction txn;
- auto encoded_txn = req->get_data().cbegin();
- decode(txn, encoded_txn);
+ auto encoded_txn_p = req->get_middle().cbegin();
+ auto encoded_txn_d = req->get_data().cbegin();
+ txn.decode(req->get_middle().length() != 0 ? encoded_txn_p : encoded_txn_d,
+ encoded_txn_d);
auto p = req->logbl.cbegin();
std::vector<pg_log_entry_t> log_entries;
decode(log_entries, p);
MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
const pg_shard_t &pg_shard,
const hobject_t &hoid,
- const bufferlist &encoded_txn,
+ bufferlist &encoded_txn_p_bl,
+ bufferlist &encoded_txn_d_bl,
const osd_op_params_t &osd_op_p,
epoch_t min_epoch,
epoch_t map_epoch,
tid,
osd_op_p.at_version);
if (send_op) {
- m->set_data(encoded_txn);
+ if (encoded_txn_d_bl.length() != 0) {
+ m->set_txn_payload(encoded_txn_p_bl);
+ m->set_data(encoded_txn_d_bl);
+ } else {
+ // Pre-tentacle format - everything in data
+ m->set_data(encoded_txn_p_bl);
+ }
} else {
ceph::os::Transaction t;
bufferlist bl;
pg_shards.size(),
osd_op_p.at_version,
pg.get_last_complete()).first;
- bufferlist encoded_txn;
- encode(txn, encoded_txn);
+ bufferlist encoded_txn_p_bl, encoded_txn_d_bl;
+ txn.encode(encoded_txn_p_bl, encoded_txn_d_bl, pg.min_peer_features());
bool is_delete = false;
for (auto &le : log_entries) {
MURef<MOSDRepOp> m;
if (pg.should_send_op(pg_shard, hoid)) {
m = new_repop_msg(
- pg_shard, hoid, encoded_txn, osd_op_p,
+ pg_shard, hoid, encoded_txn_p_bl, encoded_txn_d_bl, osd_op_p,
min_epoch, map_epoch, log_entries, true, tid);
} else {
m = new_repop_msg(
- pg_shard, hoid, encoded_txn, osd_op_p,
+ pg_shard, hoid, encoded_txn_p_bl, encoded_txn_d_bl, osd_op_p,
min_epoch, map_epoch, log_entries, false, tid);
if (pg.is_missing_on_peer(pg_shard, hoid)) {
if (_new_clone) {
MURef<MOSDRepOp> new_repop_msg(
const pg_shard_t &pg_shard,
const hobject_t &hoid,
- const bufferlist &encoded_txn,
+ bufferlist &encoded_txn_p_bl,
+ bufferlist &encoded_txn_d_bl,
const osd_op_params_t &osd_op_p,
epoch_t min_epoch,
epoch_t map_epoch,
void decode_payload() override {
using ceph::decode;
auto p = payload.cbegin();
+ auto d = data.cbegin();
decode(pgid, p);
decode(map_epoch, p);
- decode(op, p);
+ op.decode(p, d);
if (header.version >= 2) {
decode(min_epoch, p);
decode_trace(p);
using ceph::encode;
encode(pgid, payload);
encode(map_epoch, payload);
- encode(op, payload);
+ op.encode(payload, data, features);
encode(min_epoch, payload);
encode_trace(payload, features);
}
void decode_payload() override {
using ceph::decode;
auto p = payload.cbegin();
+ auto d = data.cbegin();
decode(pgid, p);
decode(map_epoch, p);
- decode(op, p);
+ op.decode(p, d);
if (header.version >= 2) {
decode(min_epoch, p);
decode_trace(p);
using ceph::encode;
encode(pgid, payload);
encode(map_epoch, payload);
- encode(op, payload);
+ op.encode(payload, data, features);
encode(min_epoch, payload);
encode_trace(payload, features);
}
/// non-empty if this transaction involves a hit_set history update
std::optional<pg_hit_set_history_t> updated_hit_set_history;
+ bufferlist txn_payload;
+
epoch_t get_map_epoch() const override {
return map_epoch;
}
return data.length();
}
+ void set_txn_payload(bufferlist bl)
+ {
+ txn_payload = bl;
+ }
+
void decode_payload() override {
using ceph::decode;
p = payload.cbegin();
encode(from, payload);
encode(updated_hit_set_history, payload);
encode(pg_committed_to, payload);
+ bufferlist middle(txn_payload);
+ set_middle(middle);
}
MOSDRepOp()
byte_throttler->take(middle.length());
}
ceph::buffer::list& get_middle() { return middle; }
+ const ceph::buffer::list& get_middle() const { return middle; }
void set_data(const ceph::buffer::list &bl) {
if (byte_throttler)
struct TransactionData {
ceph_le64 ops;
- ceph_le32 largest_data_len;
- ceph_le32 largest_data_off;
- ceph_le32 largest_data_off_in_data_bl;
+ ceph_le32 unused1;
+ ceph_le32 unused2;
+ ceph_le32 unused3;
ceph_le32 fadvise_flags;
TransactionData() noexcept :
ops(0),
- largest_data_len(0),
- largest_data_off(0),
- largest_data_off_in_data_bl(0),
+ unused1(0),
+ unused2(0),
+ unused3(0),
fadvise_flags(0) { }
// override default move operations to reset default values
TransactionData(TransactionData&& other) noexcept :
ops(other.ops),
- largest_data_len(other.largest_data_len),
- largest_data_off(other.largest_data_off),
- largest_data_off_in_data_bl(other.largest_data_off_in_data_bl),
fadvise_flags(other.fadvise_flags) {
other.ops = 0;
- other.largest_data_len = 0;
- other.largest_data_off = 0;
- other.largest_data_off_in_data_bl = 0;
other.fadvise_flags = 0;
}
TransactionData& operator=(TransactionData&& other) noexcept {
ops = other.ops;
- largest_data_len = other.largest_data_len;
- largest_data_off = other.largest_data_off;
- largest_data_off_in_data_bl = other.largest_data_off_in_data_bl;
fadvise_flags = other.fadvise_flags;
other.ops = 0;
- other.largest_data_len = 0;
- other.largest_data_off = 0;
- other.largest_data_off_in_data_bl = 0;
other.fadvise_flags = 0;
return *this;
}
uint32_t coll_id = 0;
uint32_t object_id = 0;
+ uint64_t data_features = 0;
+
+ /* Transactions are encoded/decoded in two formats. The old format
+ * (v <= 9) does not page align write data in inter-OSD messages and
+ * can degrade performance. New, aligned format (v >= 10) encodes
+ * aligned data at the start of the message. Transactions are encoded
+ * in the format determined by the replica set's common feature bits.
+ * When a message is received and decoded the transaction has a fixed
+ * format; thereafter there are limitations for encode and append
+ * operations.
+ */
+ ceph::buffer::list data_aligned_bl;
+ ceph::buffer::list data_misaligned_bl;
+
+ bool is_format_aligned() const {
+ return HAVE_FEATURE(data_features, SERVER_TENTACLE);
+ }
- ceph::buffer::list data_bl;
ceph::buffer::list op_bl;
std::list<Context *> on_applied;
public:
Transaction() = default;
-
- explicit Transaction(ceph::buffer::list::const_iterator &dp) {
- decode(dp);
- }
- explicit Transaction(ceph::buffer::list &nbl) {
- auto dp = nbl.cbegin();
- decode(dp);
+ explicit Transaction(uint64_t data_features)
+ : data_features(data_features) {
}
// override default move operations to reset default values
object_index(std::move(other.object_index)),
coll_id(other.coll_id),
object_id(other.object_id),
- data_bl(std::move(other.data_bl)),
+ data_features(other.data_features),
+ data_aligned_bl(std::move(other.data_aligned_bl)),
+ data_misaligned_bl(std::move(other.data_misaligned_bl)),
op_bl(std::move(other.op_bl)),
on_applied(std::move(other.on_applied)),
on_commit(std::move(other.on_commit)),
data = std::move(other.data);
coll_index = std::move(other.coll_index);
object_index = std::move(other.object_index);
+ data_features = other.data_features;
coll_id = other.coll_id;
object_id = other.object_id;
- data_bl = std::move(other.data_bl);
+ data_aligned_bl = std::move(other.data_aligned_bl);
+ data_misaligned_bl = std::move(other.data_misaligned_bl);
op_bl = std::move(other.op_bl);
on_applied = std::move(other.on_applied);
on_commit = std::move(other.on_commit);
std::swap(on_commit, other.on_commit);
std::swap(on_applied_sync, other.on_applied_sync);
+ std::swap(data_features, other.data_features);
std::swap(coll_index, other.coll_index);
std::swap(object_index, other.object_index);
std::swap(coll_id, other.coll_id);
std::swap(object_id, other.object_id);
op_bl.swap(other.op_bl);
- data_bl.swap(other.data_bl);
+ data_aligned_bl.swap(other.data_aligned_bl);
+ data_misaligned_bl.swap(other.data_misaligned_bl);
}
void _update_op(Op* op,
}
/// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction
void append(Transaction& other) {
-
+ //appending a transaction in new format with a transaction in old format
+ //or vice versa is not supported.
+ ceph_assert(data_features == other.data_features);
data.ops = data.ops + other.data.ops;
- if (other.data.largest_data_len > data.largest_data_len) {
- data.largest_data_len = other.data.largest_data_len;
- data.largest_data_off = other.data.largest_data_off;
- data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl;
- }
data.fadvise_flags = data.fadvise_flags | other.data.fadvise_flags;
on_applied.splice(on_applied.end(), other.on_applied);
on_commit.splice(on_commit.end(), other.on_commit);
//append op_bl
op_bl.append(other_op_bl);
- //append data_bl
- data_bl.append(other.data_bl);
+ //append data_bl's
+ data_aligned_bl.append(other.data_aligned_bl);
+ data_misaligned_bl.append(other.data_misaligned_bl);
}
/** Inquires about the Transaction as a whole. */
/// How big is the encoded Transaction buffer?
uint64_t get_encoded_bytes() {
- //layout: data_bl + op_bl + coll_index + object_index + data
+ //layout: data_misaligned_bl + op_bl + coll_index + object_index +
+ // data + data_features
// coll_index size, object_index size and sizeof(transaction_data)
// all here, so they may be computed at compile-time
final_size += p->first.encoded_size();
}
- return data_bl.length() +
+ final_size += sizeof(data_features);
+
+ return data_misaligned_bl.length() +
op_bl.length() +
final_size;
}
/// Retain old version for regression testing purposes
uint64_t get_encoded_bytes_test() {
using ceph::encode;
- //layout: data_bl + op_bl + coll_index + object_index + data
+ //layout: data_misaligned_bl + op_bl + coll_index + object_index +
+ // data + data_features
ceph::buffer::list bl;
encode(coll_index, bl);
encode(object_index, bl);
- return data_bl.length() +
+ return data_misaligned_bl.length() +
op_bl.length() +
bl.length() +
- sizeof(data);
+ sizeof(data) +
+ sizeof(data_features);
}
uint64_t get_num_bytes() {
return get_encoded_bytes();
}
- /// Size of largest data buffer to the "write" operation encountered so far
- uint32_t get_data_length() {
- return data.largest_data_len;
- }
- /// offset within the encoded buffer to the start of the largest data buffer that's encoded
- uint32_t get_data_offset() {
- if (data.largest_data_off_in_data_bl) {
- return data.largest_data_off_in_data_bl +
- sizeof(__u8) + // encode struct_v
- sizeof(__u8) + // encode compat_v
- sizeof(__u32) + // encode len
- sizeof(__u32); // data_bl len
- }
- return 0; // none
- }
- /// offset of buffer as aligned to destination within object.
- int get_data_alignment() {
- if (!data.largest_data_len)
- return 0;
- return (0 - get_data_offset()) & ~CEPH_PAGE_MASK;
- }
/// Is the Transaction empty (no operations)
bool empty() {
return !data.ops;
}
uint64_t ops;
char* op_buffer_p;
- ceph::buffer::list::const_iterator data_bl_p;
+ ceph::buffer::list::const_iterator data_aligned_bl_p;
+ ceph::buffer::list::const_iterator data_misaligned_bl_p;
+#ifdef WITH_CRIMSON
+ bool new_format;
+#else
+ const bool new_format;
+#endif
+
+ Op *op;
public:
std::vector<coll_t> colls;
private:
explicit iterator(Transaction *t)
: t(t),
- data_bl_p(t->data_bl.cbegin()),
+ data_aligned_bl_p(t->data_aligned_bl.cbegin()),
+ data_misaligned_bl_p(t->data_misaligned_bl.cbegin()),
+ new_format(t->is_format_aligned()),
colls(t->coll_index.size()),
objects(t->object_index.size()) {
Op* decode_op() {
ceph_assert(ops > 0);
- Op* op = reinterpret_cast<Op*>(op_buffer_p);
+ op = reinterpret_cast<Op*>(op_buffer_p);
op_buffer_p += sizeof(Op);
ops--;
return op;
}
std::string decode_string() {
- using ceph::decode;
+ using ceph::decode;
std::string s;
- decode(s, data_bl_p);
+ decode(s, data_misaligned_bl_p);
return s;
}
void decode_bl(ceph::buffer::list& bl) {
- using ceph::decode;
- decode(bl, data_bl_p);
+ using ceph::decode;
+ if (!new_format) {
+ decode(bl, data_misaligned_bl_p);
+ return;
+ }
+ if (op->op != OP_WRITE) {
+ decode(bl, data_misaligned_bl_p);
+ return;
+ }
+ uint64_t alignstart = (0 - op->off) & ~CEPH_PAGE_MASK;
+ if (op->len >= CEPH_PAGE_SIZE + alignstart) {
+ uint64_t alignlen = (op->len - alignstart) & CEPH_PAGE_MASK;
+ uint64_t suffixstart = alignstart + alignlen;
+ if (alignstart!=0) {
+ // Misaligned chunk at start
+ bufferlist prefix;
+ decode_nohead(alignstart, prefix, data_misaligned_bl_p);
+ bl.append(prefix);
+ }
+ // Aligned chunk in middle
+ bufferlist aligned;
+ decode_nohead(alignlen, aligned, data_aligned_bl_p);
+ bl.append(aligned);
+ if (suffixstart != op->len) {
+ // Misaligned chunk at end
+ bufferlist suffix;
+ decode_nohead(op->len-suffixstart, suffix, data_misaligned_bl_p);
+ bl.append(suffix);
+ }
+ } else {
+ decode_nohead(op->len, bl, data_misaligned_bl_p);
+ }
}
void decode_attrset(std::map<std::string,ceph::buffer::ptr>& aset) {
- using ceph::decode;
- decode(aset, data_bl_p);
+ using ceph::decode;
+ decode(aset, data_misaligned_bl_p);
}
void decode_attrset(std::map<std::string,ceph::buffer::list>& aset) {
- using ceph::decode;
- decode(aset, data_bl_p);
+ using ceph::decode;
+ decode(aset, data_misaligned_bl_p);
}
void decode_attrset_bl(ceph::buffer::list *pbl) {
- decode_str_str_map_to_bl(data_bl_p, pbl);
+ decode_str_str_map_to_bl(data_misaligned_bl_p, pbl);
}
void decode_keyset(std::set<std::string> &keys){
- using ceph::decode;
- decode(keys, data_bl_p);
+ using ceph::decode;
+ decode(keys, data_misaligned_bl_p);
}
void decode_keyset_bl(ceph::buffer::list *pbl){
- decode_str_set_to_bl(data_bl_p, pbl);
+ decode_str_set_to_bl(data_misaligned_bl_p, pbl);
}
const ghobject_t &get_oid(uint32_t oid_id) {
void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len,
const ceph::buffer::list& write_data, uint32_t flags = 0) {
using ceph::encode;
- uint32_t orig_len = data_bl.length();
Op* _op = _get_next_op();
+ uint64_t alignstart = (0 - off) & ~CEPH_PAGE_MASK;
_op->op = OP_WRITE;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
_op->off = off;
_op->len = len;
- encode(write_data, data_bl);
-
ceph_assert(len == write_data.length());
data.fadvise_flags = data.fadvise_flags | flags;
- if (write_data.length() > data.largest_data_len) {
- data.largest_data_len = write_data.length();
- data.largest_data_off = off;
- data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); // we are about to
- }
data.ops = data.ops + 1;
+ if (!is_format_aligned()) {
+ encode(write_data, data_misaligned_bl);
+ return;
+ }
+ if (len >= CEPH_PAGE_SIZE + alignstart) {
+ uint64_t alignlen = (len - alignstart) & CEPH_PAGE_MASK;
+ uint64_t suffixstart = alignstart + alignlen;
+ if (alignstart != 0) {
+ // Misaligned chunk at start
+ bufferlist prefix;
+ prefix.substr_of(write_data, 0, alignstart);
+ encode_nohead(prefix, data_misaligned_bl);
+ }
+ // Aligned chunk in middle
+ bufferlist aligned;
+ aligned.substr_of(write_data, alignstart, alignlen);
+ encode_nohead(aligned, data_aligned_bl);
+ if (suffixstart != len) {
+ // Misaligned chunk at end
+ bufferlist suffix;
+ suffix.substr_of(write_data, suffixstart, len-suffixstart);
+ encode_nohead(suffix, data_misaligned_bl);
+ }
+ } else {
+ encode_nohead(write_data, data_misaligned_bl);
+ }
}
/**
* zero out the indicated byte range within an object. Some
_op->op = OP_SETATTR;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(s, data_bl);
- encode(val, data_bl);
+ encode(s, data_misaligned_bl);
+ encode(val, data_misaligned_bl);
data.ops = data.ops + 1;
}
/// Set multiple xattrs of an object
_op->op = OP_SETATTRS;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(attrset, data_bl);
+ encode(attrset, data_misaligned_bl);
data.ops = data.ops + 1;
}
/// Set multiple xattrs of an object
_op->op = OP_SETATTRS;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(attrset, data_bl);
+ encode(attrset, data_misaligned_bl);
data.ops = data.ops + 1;
}
/// remove an xattr from an object
_op->op = OP_RMATTR;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(s, data_bl);
+ encode(s, data_misaligned_bl);
data.ops = data.ops + 1;
}
/// remove all xattrs from an object
_op->op = OP_COLL_HINT;
_op->cid = _get_coll_id(cid);
_op->hint = type;
- encode(hint, data_bl);
+ encode(hint, data_misaligned_bl);
data.ops = data.ops + 1;
}
_op->op = OP_OMAP_SETKEYS;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(attrset, data_bl);
+ encode(attrset, data_misaligned_bl);
data.ops = data.ops + 1;
}
_op->op = OP_OMAP_SETKEYS;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- data_bl.append(attrset_bl);
+ data_misaligned_bl.append(attrset_bl);
data.ops = data.ops + 1;
}
_op->op = OP_OMAP_RMKEYS;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(keys, data_bl);
+ encode(keys, data_misaligned_bl);
data.ops = data.ops + 1;
}
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
using ceph::encode;
- encode((uint32_t)1, data_bl);
- encode(key, data_bl);
+ encode((uint32_t)1, data_misaligned_bl);
+ encode(key, data_misaligned_bl);
data.ops = data.ops + 1;
}
_op->op = OP_OMAP_RMKEYS;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- data_bl.append(keys_bl);
+ data_misaligned_bl.append(keys_bl);
data.ops = data.ops + 1;
}
_op->op = OP_OMAP_RMKEYRANGE;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(first, data_bl);
- encode(last, data_bl);
+ encode(first, data_misaligned_bl);
+ encode(last, data_misaligned_bl);
data.ops = data.ops + 1;
}
_op->op = OP_OMAP_RMKEYRANGE;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- data_bl.append(keys_bl);
+ data_misaligned_bl.append(keys_bl);
data.ops = data.ops + 1;
}
void omap_setheader(
const coll_t &cid, ///< [in] Collection containing oid
const ghobject_t &oid, ///< [in] Object
- const ceph::buffer::list &bl ///< [in] Header value
+ const ceph::buffer::list &hdr_bl ///< [in] Header value
) {
using ceph::encode;
Op* _op = _get_next_op();
_op->op = OP_OMAP_SETHEADER;
_op->cid = _get_coll_id(cid);
_op->oid = _get_object_id(oid);
- encode(bl, data_bl);
+ encode(hdr_bl, data_misaligned_bl);
data.ops = data.ops + 1;
}
data.ops = data.ops + 1;
}
- void encode(ceph::buffer::list& bl) const {
- //layout: data_bl + op_bl + coll_index + object_index + data
- ENCODE_START(9, 9, bl);
- encode(data_bl, bl);
- encode(op_bl, bl);
- encode(coll_index, bl);
- encode(object_index, bl);
- data.encode(bl);
- ENCODE_FINISH(bl);
+ void encode(ceph::buffer::list& bl) const
+ {
+ encode(bl, bl);
+ }
+
+ void encode(ceph::buffer::list &p_bl,
+ ceph::buffer::list &d_bl,
+ uint64_t features=0) const
+ {
+ //see also get_encoded_bytes which assumes layout version 9
+
+ //layout version 9:
+ // buffer = data_misaligned_bl + op_bl + coll_index + object_index + data
+ //layout version 10 (for inter-OSD messages):
+ // payload = op_bl + coll_index + object_index + data
+ // data = data_aligned_bl + data_misaligned_bl
+
+ uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 10 : 9;
+ if (is_format_aligned()) {
+ //cannot encode a new format transaction in the old format
+ ceph_assert(ver >= 10);
+ }
+ ENCODE_START(ver, ver, p_bl);
+ if (ver < 10) {
+ encode(data_misaligned_bl, p_bl);
+ }
+ encode(op_bl, p_bl);
+ encode(coll_index, p_bl);
+ encode(object_index, p_bl);
+ data.encode(p_bl);
+
+ if (ver >= 10) {
+ encode(data_features, p_bl);
+ encode(data_aligned_bl.length(), p_bl);
+ encode_nohead(data_aligned_bl, d_bl);
+ encode(data_misaligned_bl.length(), p_bl);
+ encode_nohead(data_misaligned_bl, d_bl);
+ }
+ ENCODE_FINISH(p_bl);
}
void decode(ceph::buffer::list::const_iterator &bl) {
- DECODE_START(9, bl);
+ decode(bl, bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator &p_bl,
+ ceph::buffer::list::const_iterator &d_bl) {
+ DECODE_START(10, p_bl);
DECODE_OLDEST(9);
- decode(data_bl, bl);
- decode(op_bl, bl);
- decode(coll_index, bl);
- decode(object_index, bl);
- data.decode(bl);
+ if (struct_v < 10) {
+ decode(data_misaligned_bl, p_bl);
+ data_features = 0;
+ }
+ decode(op_bl, p_bl);
+ decode(coll_index, p_bl);
+ decode(object_index, p_bl);
+ data.decode(p_bl);
coll_id = coll_index.size();
object_id = object_index.size();
- DECODE_FINISH(bl);
+ if (struct_v >= 10) {
+ decode(data_features, p_bl);
+ unsigned length;
+ decode(length, p_bl);
+ decode_nohead(length, data_aligned_bl, d_bl);
+ decode(length, p_bl);
+ decode_nohead(length, data_misaligned_bl, d_bl);
+ }
+
+ DECODE_FINISH(p_bl);
}
void dump(ceph::Formatter *f);
get_parent()->get_acting_recovery_backfill_shards().begin();
i != get_parent()->get_acting_recovery_backfill_shards().end();
++i) {
- trans[i->shard];
+ trans.emplace(i->shard, get_parent()->min_peer_features());
}
op->trace.event("start ec write");
virtual const OSDMapRef& pgb_get_osdmap() const = 0;
virtual epoch_t pgb_get_osdmap_epoch() const = 0;
virtual const pg_info_t &get_info() const = 0;
+ virtual uint64_t min_peer_features() const = 0;
/**
* Called when a pull on soid cannot be completed due to
* down peers
void ECSubWrite::encode(bufferlist &bl) const
{
- ENCODE_START(4, 1, bl);
- encode(from, bl);
- encode(tid, bl);
- encode(reqid, bl);
- encode(soid, bl);
- encode(stats, bl);
- encode(t, bl);
- encode(at_version, bl);
- encode(trim_to, bl);
- encode(log_entries, bl);
- encode(temp_added, bl);
- encode(temp_removed, bl);
- encode(updated_hit_set_history, bl);
- encode(pg_committed_to, bl);
- encode(backfill_or_async_recovery, bl);
- ENCODE_FINISH(bl);
+ encode(bl, bl);
+}
+
+void ECSubWrite::encode(bufferlist &p_bl, bufferlist &d_bl, uint64_t features) const
+{
+ uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 5 : 4;
+ ENCODE_START(ver, 1, p_bl);
+ encode(from, p_bl);
+ encode(tid, p_bl);
+ encode(reqid, p_bl);
+ encode(soid, p_bl);
+ encode(stats, p_bl);
+ if (ver >= 5) {
+ t.encode(p_bl, d_bl, features);
+ } else {
+ t.encode(p_bl, p_bl, features);
+ }
+ encode(at_version, p_bl);
+ encode(trim_to, p_bl);
+ encode(log_entries, p_bl);
+ encode(temp_added, p_bl);
+ encode(temp_removed, p_bl);
+ encode(updated_hit_set_history, p_bl);
+ encode(pg_committed_to, p_bl);
+ encode(backfill_or_async_recovery, p_bl);
+ ENCODE_FINISH(p_bl);
}
void ECSubWrite::decode(bufferlist::const_iterator &bl)
{
- DECODE_START(4, bl);
- decode(from, bl);
- decode(tid, bl);
- decode(reqid, bl);
- decode(soid, bl);
- decode(stats, bl);
- decode(t, bl);
- decode(at_version, bl);
- decode(trim_to, bl);
- decode(log_entries, bl);
- decode(temp_added, bl);
- decode(temp_removed, bl);
+ decode(bl, bl);
+}
+
+void ECSubWrite::decode(bufferlist::const_iterator &p_bl,
+ bufferlist::const_iterator &d_bl)
+{
+ DECODE_START(5, p_bl);
+ decode(from, p_bl);
+ decode(tid, p_bl);
+ decode(reqid, p_bl);
+ decode(soid, p_bl);
+ decode(stats, p_bl);
+ if (struct_v >= 5) {
+ t.decode(p_bl, d_bl);
+ } else {
+ t.decode(p_bl, p_bl);
+ }
+ decode(at_version, p_bl);
+ decode(trim_to, p_bl);
+ decode(log_entries, p_bl);
+ decode(temp_added, p_bl);
+ decode(temp_removed, p_bl);
if (struct_v >= 2) {
- decode(updated_hit_set_history, bl);
+ decode(updated_hit_set_history, p_bl);
}
if (struct_v >= 3) {
- decode(pg_committed_to, bl);
+ decode(pg_committed_to, p_bl);
} else {
pg_committed_to = trim_to;
}
if (struct_v >= 4) {
- decode(backfill_or_async_recovery, bl);
+ decode(backfill_or_async_recovery, p_bl);
} else {
// The old protocol used an empty transaction to indicate backfill or async_recovery
backfill_or_async_recovery = t.empty();
}
- DECODE_FINISH(bl);
+ DECODE_FINISH(p_bl);
}
std::ostream &operator<<(
void ECSubReadReply::encode(bufferlist &bl) const
{
- ENCODE_START(1, 1, bl);
- encode(from, bl);
- encode(tid, bl);
- encode(buffers_read, bl);
- encode(attrs_read, bl);
- encode(errors, bl);
- ENCODE_FINISH(bl);
+ encode(bl, bl);
+}
+
+void ECSubReadReply::encode(bufferlist &p_bl,
+ bufferlist &d_bl,
+ uint64_t features) const
+{
+ uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 2 : 1;
+ ENCODE_START(ver, ver, p_bl);
+ encode(from, p_bl);
+ encode(tid, p_bl);
+ if (ver >= 2) {
+ // Manual encode of std::map<hobject_t, std::list<std::pair<uint64_t,
+ // ceph::buffer::list> >> buffers_read;
+ // data is encoded into d_bl to keep it aligned
+ __u32 nmap = (__u32)(buffers_read.size());
+ encode(nmap, p_bl);
+ for (auto [oid, datalist] : buffers_read) {
+ encode(oid, p_bl);
+ __u32 nlist = (__u32)(datalist.size());
+ encode(nlist, p_bl);
+ for (auto [result,bl] : datalist) {
+ encode(result, p_bl);
+ encode(bl.length(), p_bl);
+ encode_nohead(bl, d_bl);
+ }
+ }
+ } else {
+ encode(buffers_read, p_bl);
+ }
+ encode(attrs_read, p_bl);
+ encode(errors, p_bl);
+ ENCODE_FINISH(p_bl);
}
void ECSubReadReply::decode(bufferlist::const_iterator &bl)
{
- DECODE_START(1, bl);
- decode(from, bl);
- decode(tid, bl);
- decode(buffers_read, bl);
- decode(attrs_read, bl);
- decode(errors, bl);
- DECODE_FINISH(bl);
+ decode(bl, bl);
+}
+
+void ECSubReadReply::decode(bufferlist::const_iterator &p_bl,
+ bufferlist::const_iterator &d_bl)
+{
+ DECODE_START(2, p_bl);
+ decode(from, p_bl);
+ decode(tid, p_bl);
+ if (struct_v < 2) {
+ decode(buffers_read, p_bl);
+ } else {
+ // Manual decode of std::map<hobject_t, std::list<std::pair<uint64_t,
+ // ceph::buffer::list> >> buffers_read;
+ // data is decoded from d_bl to keep it aligned
+ __u32 nmap;
+ decode(nmap, p_bl);
+ buffers_read.clear();
+ while (nmap--) {
+ hobject_t oid;
+ decode(oid, p_bl);
+ std::list<std::pair<uint64_t,ceph::buffer::list>> datalist;
+ __u32 nlist;
+ decode(nlist, p_bl);
+ while (nlist--) {
+ uint64_t result;
+ decode(result, p_bl);
+ ceph::buffer::list bl;
+ __u32 length;
+ decode(length, p_bl);
+ decode_nohead(length, bl, d_bl);
+ datalist.emplace_back(make_pair(result, bl));
+ }
+ buffers_read[oid] = datalist;
+ }
+ }
+ decode(attrs_read, p_bl);
+ decode(errors, p_bl);
+ DECODE_FINISH(p_bl);
}
std::ostream &operator<<(
backfill_or_async_recovery = other.backfill_or_async_recovery;
}
void encode(ceph::buffer::list &bl) const;
+ void encode(ceph::buffer::list &p_bl,
+ ceph::buffer::list &d_bll,
+ uint64_t features=0) const;
void decode(ceph::buffer::list::const_iterator &bl);
+ void decode(ceph::buffer::list::const_iterator &p_bl,
+ ceph::buffer::list::const_iterator &d_bl);
void dump(ceph::Formatter *f) const;
static void generate_test_instances(std::list<ECSubWrite*>& o);
private:
ECSubWrite(ECSubWrite& other);
const ECSubWrite& operator=(const ECSubWrite& other);
};
+
WRITE_CLASS_ENCODER(ECSubWrite)
struct ECSubWriteReply {
std::map<hobject_t, std::map<std::string, ceph::buffer::list, std::less<>>> attrs_read;
std::map<hobject_t, int> errors;
void encode(ceph::buffer::list &bl) const;
+ void encode(ceph::buffer::list &p_bl,
+ ceph::buffer::list &d_pl,
+ uint64_t features=0) const;
void decode(ceph::buffer::list::const_iterator &bl);
+ void decode(ceph::buffer::list::const_iterator &p_bl,
+ ceph::buffer::list::const_iterator &d_pl);
void dump(ceph::Formatter *f) const;
static void generate_test_instances(std::list<ECSubReadReply*>& o);
};
op->mark_started();
vector<PushReplyOp> replies;
- ObjectStore::Transaction t;
+ ObjectStore::Transaction t{get_parent()->min_peer_features()};
if (get_parent()->check_failsafe_full()) {
dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
ceph_abort();
ceph_abort();
}
- ObjectStore::Transaction t;
+ ObjectStore::Transaction t{get_parent()->min_peer_features()};
list<pull_complete_info> to_continue;
for (vector<PushOp>::const_iterator i = m->pushes.begin();
i != m->pushes.end();
ObjectStore::Transaction t;
encode(t, wr->get_data());
} else {
- encode(op_t, wr->get_data());
- wr->get_header().data_off = op_t.get_data_alignment();
+ bufferlist p, d;
+ op_t.encode(p, d, get_parent()->min_peer_features());
+ if (d.length() != 0) {
+ wr->set_txn_payload(p);
+ wr->set_data(d);
+ } else {
+ // Pre-tentacle format - everything in data
+ wr->set_data(p);
+ }
}
wr->logbl = log_entries;
op->mark_started();
- RepModifyRef rm(std::make_shared<RepModify>());
+ RepModifyRef rm(std::make_shared<RepModify>(get_parent()->min_peer_features()));
rm->op = op;
rm->ackerosd = ackerosd;
rm->last_complete = get_info().last_complete;
// shipped transaction and log entries
vector<pg_log_entry_t> log;
- auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
- decode(rm->opt, p);
+ auto p = const_cast<bufferlist&>(m->get_middle()).cbegin();
+ auto d = const_cast<bufferlist&>(m->get_data()).cbegin();
+ rm->opt.decode(m->get_middle().length() != 0 ? p : d, d);
if (m->new_temp_oid != hobject_t()) {
dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
ObjectStore::Transaction opt, localt;
- RepModify() : committed(false), ackerosd(-1),
- epoch_started(0) {}
+ RepModify(uint64_t features)
+ : committed(false),
+ ackerosd(-1),
+ epoch_started(0),
+ localt(features) {
+ }
};
typedef std::shared_ptr<RepModify> RepModifyRef;
ticks += a;
count++;
}
+ void reset()
+ {
+ ticks = 0;
+ count = 0;
+ }
};
static Tick write_ticks, setattr_ticks, omap_setkeys_ticks, omap_rmkey_ticks;
static Tick encode_ticks, decode_ticks, iterate_ticks;
omap_rmkey_ticks.add(Cycles::rdtsc() - start_time);
}
- void apply_encode_decode() {
- bufferlist bl;
+ void apply_encode_decode(bool new_format) {
+ bufferlist p_bl;
+ bufferlist d_bl;
ObjectStore::Transaction d;
uint64_t start_time = Cycles::rdtsc();
- t.encode(bl);
+ t.encode(p_bl, new_format ? d_bl : p_bl);
encode_ticks.add(Cycles::rdtsc() - start_time);
- auto bliter = bl.cbegin();
+ auto p_bliter = p_bl.cbegin();
+ auto d_bliter = d_bl.cbegin();
start_time = Cycles::rdtsc();
- d.decode(bliter);
+ d.decode(p_bliter, new_format ? d_bliter : p_bliter);
decode_ticks.add(Cycles::rdtsc() - start_time);
}
cerr << " decode op: " << Cycles::to_microseconds(Transaction::decode_ticks.ticks) << "us count: " << Transaction::decode_ticks.count << std::endl;
cerr << " iterate op: " << Cycles::to_microseconds(Transaction::iterate_ticks.ticks) << "us count: " << Transaction::iterate_ticks.count << std::endl;
}
+ static void reset_stat() {
+ write_ticks.reset();
+ setattr_ticks.reset();
+ omap_setkeys_ticks.reset();
+ omap_rmkey_ticks.reset();
+ encode_ticks.reset();
+ decode_ticks.reset();
+ iterate_ticks.reset();
+ }
};
class PerfCase {
data[info_info_attr] = generate_random(560, 1);
}
- uint64_t rados_write_4k(int times) {
+ uint64_t rados_write_4k(int times,bool new_format) {
uint64_t ticks = 0;
uint64_t len = Kib *4;
for (int i = 0; i < times; i++) {
t.write(cid, oid, 0, len, data["4k"]);
t.setattr(cid, oid, attr, data[attr]);
t.setattr(cid, oid, snapset_attr, data[snapset_attr]);
- t.apply_encode_decode();
+ t.apply_encode_decode(new_format);
t.apply_iterate();
ticks += Cycles::rdtsc() - start_time;
}
t.omap_setkeys(meta_cid, pglog_oid, pglog_attrset);
t.omap_setkeys(meta_cid, info_oid, info_attrset);
t.omap_rmkey(meta_cid, pglog_oid, pglog_attr);
- t.apply_encode_decode();
+ t.apply_encode_decode(new_format);
t.apply_iterate();
ticks += Cycles::rdtsc() - start_time;
}
uint64_t times = atoi(args[0]);
PerfCase c;
- uint64_t ticks = c.rados_write_4k(times);
+ uint64_t ticks1 = c.rados_write_4k(times, false);
+ Transaction::dump_stat();
+ cerr << " Old format total rados op " << times << " run time " << Cycles::to_microseconds(ticks1) << "us." << std::endl;
+
+ Transaction::reset_stat();
+
+ uint64_t ticks2 = c.rados_write_4k(times, true);
Transaction::dump_stat();
- cerr << " Total rados op " << times << " run time " << Cycles::to_microseconds(ticks) << "us." << std::endl;
+ cerr << " New format total rados op " << times << " run time " << Cycles::to_microseconds(ticks2) << "us." << std::endl;
return 0;
}
{
bench_num_bytes(false);
}
+
+/**
+ * create_pattern
+ *
+ * Append length bytes of deterministic data to bl, one encoded
+ * unsigned int at a time. Each value is derived from the previous one,
+ * starting at seed. length must be a multiple of sizeof(int).
+ */
+void create_pattern(bufferlist& bl, unsigned int seed, unsigned int length)
+{
+  ASSERT_TRUE(length % sizeof(int) == 0);
+  for (auto remaining = length / sizeof(int); remaining > 0; --remaining) {
+    encode(seed, bl);
+    // Step the generator: next = (current << 1) ^ current
+    seed ^= seed << 1;
+  }
+}
+
+/**
+ * check_pattern
+ *
+ * Decode length bytes from the start of bl as unsigned ints and assert
+ * that they match the sequence create_pattern() generates for seed.
+ * length must be a multiple of sizeof(int).
+ */
+void check_pattern(bufferlist& bl, unsigned int seed, unsigned int length)
+{
+  ASSERT_TRUE(length % sizeof(int) == 0);
+  ceph::buffer::list::const_iterator p = bl.cbegin();
+  for (auto remaining = length / sizeof(int); remaining > 0; --remaining) {
+    unsigned int j;
+    decode(j, p);
+    ASSERT_TRUE(j == seed);
+    // Same generator step as create_pattern()
+    seed ^= seed << 1;
+  }
+}
+// Overload for a single bufferptr: wrap it in a temporary bufferlist and
+// run the bufferlist check on that.
+void check_pattern(bufferptr& bptr, unsigned int seed, unsigned int length)
+{
+  bufferlist wrapped;
+  wrapped.append(bptr);
+  check_pattern(wrapped, seed, length);
+}
+
+/**
+ * create_check_transaction1
+ *
+ * Construct/validate an instance of every type of Op in an
+ * ObjectStore::Transaction.
+ *
+ * @param t_in   transaction to populate (create) or to walk and verify
+ * @param create true: emit one op per position; false: decode the ops in
+ *               t_in in order and assert each matches what create emits
+ * @param append with create: build each op in a scratch transaction and
+ *               add it to t_in through Transaction::append()
+ *
+ * The checks use gtest ASSERT_* macros, so a failed check returns from
+ * this helper only; it does not by itself abort the calling test body.
+ */
+void create_check_transaction1(ObjectStore::Transaction& t_in, bool create, bool append)
+{
+  // NOTE(review): c1 and c2 are built from identical arguments, so the cid
+  // checks below cannot tell the two apart - confirm whether distinct
+  // collections were intended.
+  coll_t c1 = coll_t(spg_t(pg_t(0,111), shard_id_t::NO_SHARD));
+  coll_t c2 = coll_t(spg_t(pg_t(0,111), shard_id_t::NO_SHARD));
+  ghobject_t o1 = ghobject_t(hobject_t(sobject_t("testobject1", CEPH_NOSNAP)));
+  ghobject_t o2 = ghobject_t(hobject_t(sobject_t("testobject2", CEPH_NOSNAP)));
+  bufferlist bl1;
+  bufferlist bl2;
+
+  const unsigned int bl1_seed = 1234;
+  const unsigned int bl1_len = 1024;
+  const unsigned int bl2_seed = 6666;
+  const unsigned int bl2_len = 128;
+
+  create_pattern(bl1, bl1_seed, bl1_len);
+  create_pattern(bl2, bl2_seed, bl2_len);
+
+  ObjectStore::Transaction::iterator i = t_in.begin();
+  ObjectStore::Transaction::Op *op = nullptr;
+
+  bool done = false;
+  for (int pos = 0; !done; ++pos) {
+    // In append mode every op is built in its own scratch transaction and
+    // appended to t_in at the bottom of the loop.
+    ObjectStore::Transaction t_append;
+    ObjectStore::Transaction& t = append ? t_append : t_in;
+    if (!create) {
+      ASSERT_TRUE(i.have_op());
+      op = i.decode_op();
+      cout << " Checking pos " << pos << " op " << op->op << std::endl;
+    }
+    switch (pos) {
+    case 0:
+      // NOP
+      if (create) {
+        t.nop();
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_NOP);
+      }
+      break;
+    case 1:
+      // CREATE
+      if (create) {
+        t.create(c1, o1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CREATE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 2:
+      // TOUCH
+      if (create) {
+        t.touch(c2, o2);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_TOUCH);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+      }
+      break;
+    case 3:
+      // WRITE
+      if (create) {
+        t.write(c1, o1, 0, bl1_len, bl1);
+      } else {
+        bufferlist bl;
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        ASSERT_TRUE(op->off == 0);
+        ASSERT_TRUE(op->len == bl1_len);
+        i.decode_bl(bl);
+        ASSERT_TRUE(bl.length() == op->len);
+        check_pattern(bl, bl1_seed, bl1_len);
+      }
+      break;
+    case 4:
+      // ZERO
+      if (create) {
+        t.zero(c2, o2, 1111, 2222);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_ZERO);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        ASSERT_TRUE(op->off == 1111);
+        ASSERT_TRUE(op->len == 2222);
+      }
+      break;
+    case 5:
+      // TRUNCATE
+      if (create) {
+        t.truncate(c1, o1, 3333);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_TRUNCATE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        // Compare, do not assign: '=' here would always pass and would
+        // overwrite the decoded offset.
+        ASSERT_TRUE(op->off == 3333);
+      }
+      break;
+    case 6:
+      // REMOVE
+      if (create) {
+        t.remove(c2, o2);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_REMOVE);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+      }
+      break;
+    case 7:
+      // SETATTR (1)
+      if (create) {
+        t.setattr(c1, o1, "attr1", bl2);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTR);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        bufferlist bl;
+        i.decode_bl(bl);
+        ASSERT_TRUE(name == "attr1");
+        check_pattern(bl, bl2_seed, bl2_len);
+      }
+      break;
+    case 8:
+      // SETATTR (2)
+      if (create) {
+        t.setattr(c2, o2, std::string("attr2"), bl2);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTR);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        bufferlist bl;
+        i.decode_bl(bl);
+        ASSERT_TRUE(name == "attr2");
+        check_pattern(bl, bl2_seed, bl2_len);
+      }
+      break;
+    case 9:
+      // SETATTRS (1)
+      if (create) {
+        map<string,bufferptr,less<>> m;
+        m["a"] = buffer::copy("this", 4);
+        m["b"] = buffer::copy("that", 4);
+        t.setattrs(c1, o1, m);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTRS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        map<string, bufferptr> aset;
+        i.decode_attrset(aset);
+        ASSERT_TRUE(aset.size()==2);
+        auto a = aset.find("a");
+        ASSERT_TRUE(a != aset.end());
+        ASSERT_TRUE(!a->second.cmp(buffer::copy("this",4)));
+        auto b = aset.find("b");
+        ASSERT_TRUE(b != aset.end());
+        ASSERT_TRUE(!b->second.cmp(buffer::copy("that",4)));
+      }
+      break;
+    case 10:
+      // SETATTRS (2)
+      if (create) {
+        map<string,bufferlist,less<>> m;
+        bufferlist bflip, bflop;
+        bflip.append(buffer::copy("flip",4));
+        bflop.append(buffer::copy("flop",4));
+        m["a"] = bflip;
+        m["b"] = bflop;
+        t.setattrs(c2, o2, m);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTRS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        map<string, bufferptr> aset;
+        i.decode_attrset(aset);
+        ASSERT_TRUE(aset.size()==2);
+        auto a = aset.find("a");
+        ASSERT_TRUE(a != aset.end());
+        ASSERT_TRUE(!a->second.cmp(buffer::copy("flip",4)));
+        auto b = aset.find("b");
+        ASSERT_TRUE(b != aset.end());
+        ASSERT_TRUE(!b->second.cmp(buffer::copy("flop",4)));
+      }
+      break;
+    case 11:
+      // RMATTR (1)
+      if (create) {
+        t.rmattr(c1, o1, "attr3");
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMATTR);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        ASSERT_TRUE(name == "attr3");
+      }
+      break;
+    case 12:
+      // RMATTR (2)
+      if (create) {
+        t.rmattr(c2, o2, std::string("attr4"));
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMATTR);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        ASSERT_TRUE(name == "attr4");
+      }
+      break;
+    case 13:
+      // RMATTRS
+      if (create) {
+        t.rmattrs(c1, o1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMATTRS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 14:
+      // CLONE
+      if (create) {
+        t.clone(c2, o2, o1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CLONE);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+      }
+      break;
+    case 15:
+      // CLONERANGE2
+      if (create) {
+        t.clone_range(c1, o2, o1, 4444, 5555, 6666);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CLONERANGE2);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+        ASSERT_TRUE(op->off == 4444);
+        ASSERT_TRUE(op->len == 5555);
+        ASSERT_TRUE(op->dest_off == 6666);
+      }
+      break;
+    case 16:
+      // MKCOLL
+      if (create) {
+        t.create_collection(c2, 7777);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_MKCOLL);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(op->split_bits == 7777);
+      }
+      break;
+    case 17:
+      // COLL_HINT
+      if (create) {
+        t.collection_hint(c1, 8888, bl2);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_COLL_HINT);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(op->hint == 8888);
+        bufferlist bl;
+        i.decode_bl(bl);
+        check_pattern(bl, bl2_seed, bl2_len);
+      }
+      break;
+    case 18:
+      // RMCOLL
+      if (create) {
+        t.remove_collection(c1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMCOLL);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+      }
+      break;
+    case 19:
+      // COLL_MOVE_RENAME
+      if (create) {
+        t.collection_move_rename(c2, o2, c1, o1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_COLL_MOVE_RENAME);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        ASSERT_TRUE(c1 == i.get_cid(op->dest_cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+      }
+      break;
+    case 20:
+      // TRY_RENAME
+      if (create) {
+        t.try_rename(c2, o2, o1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_TRY_RENAME);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+      }
+      break;
+    case 21:
+      // OMAP_CLEAR
+      if (create) {
+        t.omap_clear(c1, o1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_CLEAR);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 22:
+      // OMAP_SETKEYS (1)
+      if (create) {
+        map<string,bufferlist> m;
+        bufferlist bthis, bthat;
+        bthis.append(buffer::copy("this",4));
+        bthat.append(buffer::copy("that",4));
+        m["a"] = bthis;
+        m["b"] = bthat;
+        t.omap_setkeys(c2, o2, m);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_SETKEYS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        map<string,bufferlist> aset;
+        i.decode_attrset(aset);
+        ASSERT_TRUE(aset.size()==2);
+        auto a = aset.find("a");
+        ASSERT_TRUE(a != aset.end());
+        ASSERT_TRUE(a->second.contents_equal("this",4));
+        auto b = aset.find("b");
+        ASSERT_TRUE(b != aset.end());
+        ASSERT_TRUE(b->second.contents_equal("that",4));
+      }
+      break;
+    case 23:
+      // OMAP_SETKEYS (2)
+      if (create) {
+        map<string,bufferlist> m;
+        bufferlist bthis, bthat;
+        bthis.append(buffer::copy("this",4));
+        bthat.append(buffer::copy("that",4));
+        m["a"] = bthis;
+        m["b"] = bthat;
+        bufferlist bkeys;
+        encode(m,bkeys);
+        t.omap_setkeys(c1, o1, bkeys);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_SETKEYS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        map<string,bufferlist> aset;
+        i.decode_attrset(aset);
+        ASSERT_TRUE(aset.size()==2);
+        auto a = aset.find("a");
+        ASSERT_TRUE(a != aset.end());
+        ASSERT_TRUE(a->second.contents_equal("this",4));
+        auto b = aset.find("b");
+        ASSERT_TRUE(b != aset.end());
+        ASSERT_TRUE(b->second.contents_equal("that",4));
+      }
+      break;
+    case 24:
+      // OMAP_RMKEYS (1)
+      if (create) {
+        std::set<std::string> keys;
+        keys.insert(std::string("attr7"));
+        t.omap_rmkeys(c2, o2, keys);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        bufferlist keys_bl;
+        i.decode_keyset_bl(&keys_bl);
+        std::set<std::string> keys;
+        decode(keys,keys_bl);
+        ASSERT_TRUE(keys.size() == 1);
+        for(auto& str: keys) {
+          ASSERT_TRUE(str == "attr7");
+        }
+      }
+      break;
+    case 25:
+      // OMAP_RMKEYS (2)
+      if (create) {
+        t.omap_rmkey(c1, o1, std::string("attr8"));
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        bufferlist keys_bl;
+        i.decode_keyset_bl(&keys_bl);
+        std::set<std::string> keys;
+        decode(keys,keys_bl);
+        ASSERT_TRUE(keys.size() == 1);
+        for(auto& str: keys) {
+          ASSERT_TRUE(str == "attr8");
+        }
+      }
+      break;
+    case 26:
+      // OMAP_RMKEYS (3)
+      if (create) {
+        std::set<std::string> keys;
+        keys.insert(std::string("attr9"));
+        bufferlist bkeys;
+        encode(keys,bkeys);
+        t.omap_rmkeys(c2, o2, bkeys);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        bufferlist keys_bl;
+        i.decode_keyset_bl(&keys_bl);
+        std::set<std::string> keys;
+        decode(keys,keys_bl);
+        ASSERT_TRUE(keys.size() == 1);
+        for(auto& str: keys) {
+          ASSERT_TRUE(str == "attr9");
+        }
+      }
+      break;
+    case 27:
+      // OMAP_RMKEYRANGE (1)
+      if (create) {
+        t.omap_rmkeyrange(c1, o1, std::string("attr1"), std::string("attr2"));
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYRANGE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        string first, last;
+        first = i.decode_string();
+        last = i.decode_string();
+        ASSERT_TRUE(first == "attr1");
+        ASSERT_TRUE(last == "attr2");
+      }
+      break;
+    case 28:
+      // OMAP_RMKEYRANGE (2)
+      if (create) {
+        bufferlist brange;
+        encode(std::string("attr3"),brange);
+        encode(std::string("attr4"),brange);
+        t.omap_rmkeyrange(c2, o2, brange);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYRANGE);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        string first, last;
+        first = i.decode_string();
+        last = i.decode_string();
+        ASSERT_TRUE(first == "attr3");
+        ASSERT_TRUE(last == "attr4");
+      }
+      break;
+    case 29:
+      // OMAP_SETHEADER
+      if (create) {
+        t.omap_setheader(c1, o1, bl1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_SETHEADER);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 30:
+      // SPLIT_COLLECTION
+      if (create) {
+        t.split_collection(c2, 9999, 1000, c1);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SPLIT_COLLECTION2);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(c1 == i.get_cid(op->dest_cid));
+        ASSERT_TRUE(op->split_bits == 9999);
+        ASSERT_TRUE(op->split_rem == 1000);
+      }
+      break;
+    case 31:
+      // MERGE_COLLECTION
+      if (create) {
+        t.merge_collection(c2, c1, 1100);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_MERGE_COLLECTION);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(c1 == i.get_cid(op->dest_cid));
+        ASSERT_TRUE(op->split_bits == 1100);
+      }
+      break;
+    case 32:
+      // SET_BITS
+      if (create) {
+        t.collection_set_bits(c2, 1200);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_COLL_SET_BITS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(op->split_bits == 1200);
+      }
+      break;
+    case 33:
+      // SET_ALLOCHINT
+      if (create) {
+        t.set_alloc_hint(c1, o1, 1300, 1400, 1500);
+      } else {
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETALLOCHINT);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        ASSERT_TRUE(op->expected_object_size == 1300);
+        ASSERT_TRUE(op->expected_write_size == 1400);
+        ASSERT_TRUE(op->hint == 1500);
+      }
+      // End of cases
+      done = true;
+      break;
+    }
+    if (append) {
+      t_in.append(t_append);
+    }
+  }
+  if (!create) {
+    // The transaction must not contain anything beyond the last case.
+    ASSERT_TRUE(!i.have_op());
+  }
+}
+
+// Populate t directly with one op of every type.
+void create_transaction1(ObjectStore::Transaction& t)
+{
+  const bool create = true;
+  const bool append = false;
+  create_check_transaction1(t, create, append);
+}
+
+// Populate t with one op of every type, adding each op through
+// Transaction::append() instead of writing it into t directly.
+void create_transaction1_using_append(ObjectStore::Transaction& t)
+{
+  const bool create = true;
+  const bool append = true;
+  create_check_transaction1(t, create, append);
+}
+
+// Walk t and assert that it holds exactly the ops create_transaction1() emits.
+void check_content_transaction1(ObjectStore::Transaction& t)
+{
+  const bool create = false;
+  const bool append = false;
+  create_check_transaction1(t, create, append);
+}
+
+// Round-trip the "every op type" transaction through both encodings:
+// the single-bufferlist encode()/decode() and the two-bufferlist form.
+TEST(Transaction, EncodeDecode)
+{
+  ObjectStore::Transaction source;
+  cout << "Creating transaction1" << std::endl;
+  create_transaction1(source);
+  cout << "Checking transaction1" << std::endl;
+  check_content_transaction1(source);
+
+  // One buffer
+  bufferlist encoded1;
+  encode(source, encoded1);
+  ceph::buffer::list::const_iterator p = encoded1.cbegin();
+  ObjectStore::Transaction decode1;
+  decode(decode1, p);
+  cout << "Checking encoded/decoded with 1 buffer transaction1" << std::endl;
+  check_content_transaction1(decode1);
+
+  // Two buffers
+  bufferlist encoded2p;
+  bufferlist encoded2d;
+  source.encode(encoded2p, encoded2d, CEPH_FEATUREMASK_SERVER_TENTACLE);
+  p = encoded2p.cbegin();
+  ceph::buffer::list::const_iterator d = encoded2d.cbegin();
+  ObjectStore::Transaction decode2;
+  decode2.decode(p, d);
+  cout << "Checking encoded/decoded with 2 buffers transaction1" << std::endl;
+  check_content_transaction1(decode2);
+}
+
+/**
+ * create_check_transaction2
+ *
+ * Construct/validate write Ops with different lengths and alignments
+ * in an ObjectStore::Transaction.
+ *
+ * @param t      transaction to populate (create) or to walk and verify
+ * @param create true: add the write ops; false: decode the ops in t in
+ *               order and assert offset, length and payload of each
+ *
+ * The checks use gtest ASSERT_* macros, so a failed check returns from
+ * this helper only.
+ */
+void create_check_transaction2(ObjectStore::Transaction& t,bool create)
+{
+  coll_t c = coll_t();
+  ghobject_t o1 = ghobject_t(hobject_t(sobject_t("testobject", CEPH_NOSNAP)));
+
+  bufferlist blshort;
+  bufferlist blpage;
+  bufferlist blfewpages;
+  bufferlist bllong;
+
+  // Less than 1 page
+  const unsigned int blshort_seed = 4567;
+  const unsigned int blshort_len = CEPH_PAGE_SIZE/4;
+  // 1 page
+  const unsigned int blpage_seed = 7654;
+  const unsigned int blpage_len = CEPH_PAGE_SIZE;
+  // Whole number of pages
+  const unsigned int blfewpages_seed = 1357;
+  const unsigned int blfewpages_len = CEPH_PAGE_SIZE*3;
+  // 1.5 pages
+  const unsigned int bllong_seed = 9876;
+  const unsigned int bllong_len = (CEPH_PAGE_SIZE*3)/2;
+
+  create_pattern(blshort,blshort_seed,blshort_len);
+  create_pattern(blpage,blpage_seed,blpage_len);
+  create_pattern(blfewpages,blfewpages_seed,blfewpages_len);
+  create_pattern(bllong,bllong_seed,bllong_len);
+
+  // One entry per write op, in the order the ops appear in the transaction.
+  struct WriteCase {
+    unsigned int off;        // object offset of the write
+    unsigned int len;        // length of the write
+    unsigned int seed;       // pattern seed of the payload
+    const bufferlist* data;  // payload used when creating
+    WriteCase(unsigned int o, unsigned int l, unsigned int s, const bufferlist* d)
+      : off(o), len(l), seed(s), data(d) {}
+  };
+  const WriteCase cases[] = {
+    // Short buffer, PAGE aligned offset
+    WriteCase(0, blshort_len, blshort_seed, &blshort),
+    // Short buffer, straddle PAGE offset
+    WriteCase(CEPH_PAGE_SIZE-10, blshort_len, blshort_seed, &blshort),
+    // Short buffer, end of buffer PAGE aligned
+    WriteCase(CEPH_PAGE_SIZE-blshort_len, blshort_len, blshort_seed, &blshort),
+    // Page buffer, PAGE aligned offset
+    WriteCase(CEPH_PAGE_SIZE*3, blpage_len, blpage_seed, &blpage),
+    // Page buffer, misaligned offset
+    WriteCase(CEPH_PAGE_SIZE-10, blpage_len, blpage_seed, &blpage),
+    // Multiple page buffer, PAGE aligned offset
+    WriteCase(CEPH_PAGE_SIZE*7, blfewpages_len, blfewpages_seed, &blfewpages),
+    // Multiple page buffer, misaligned offset
+    WriteCase(CEPH_PAGE_SIZE-10, blfewpages_len, blfewpages_seed, &blfewpages),
+    // Long buffer, PAGE aligned offset
+    WriteCase(0, bllong_len, bllong_seed, &bllong),
+    // Long buffer, misaligned offset
+    WriteCase(CEPH_PAGE_SIZE-10, bllong_len, bllong_seed, &bllong),
+    // Long buffer, end of buffer PAGE aligned
+    WriteCase(100*CEPH_PAGE_SIZE-bllong_len, bllong_len, bllong_seed, &bllong),
+  };
+
+  ObjectStore::Transaction::iterator i = t.begin();
+
+  int pos = 0;
+  for (const WriteCase& wc : cases) {
+    if (create) {
+      t.write(c, o1, wc.off, wc.len, *wc.data);
+    } else {
+      ASSERT_TRUE(i.have_op());
+      ObjectStore::Transaction::Op *op = i.decode_op();
+      cout << " Checking pos " << pos << " off " << op->off << " len " << op->len << std::endl;
+      ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+      bufferlist bl;
+      i.decode_bl(bl);
+      ASSERT_TRUE(bl.length() == op->len);
+      ASSERT_TRUE(op->off == wc.off);
+      ASSERT_TRUE(op->len == wc.len);
+      check_pattern(bl, wc.seed, wc.len);
+    }
+    ++pos;
+  }
+  if (!create) {
+    // The transaction must not contain anything beyond the last write.
+    ASSERT_TRUE(!i.have_op());
+  }
+}
+
+// Populate t with the series of differently sized/aligned write ops.
+void create_transaction2(ObjectStore::Transaction& t)
+{
+  const bool create = true;
+  create_check_transaction2(t, create);
+}
+
+// Walk t and assert that it holds exactly the writes create_transaction2() emits.
+void check_content_transaction2(ObjectStore::Transaction& t)
+{
+  const bool create = false;
+  create_check_transaction2(t, create);
+}
+
+/**
+ * check_alignment_transaction2
+ *
+ * Validate alignment of write Op data buffers in an ObjectStore::Transaction
+ *
+ * Every op in t must be a write. The payload of each write is walked one
+ * buffer segment at a time while tracking the object offset the segment
+ * starts at:
+ *  - segment starts at a non page aligned offset: it fails if it is long
+ *    enough to reach the next page boundary and still hold a whole page
+ *  - segment starts at a page aligned offset and holds at least one page:
+ *    its memory must be page aligned
+ *  - anything shorter is accepted
+ */
+void check_alignment_transaction2(ObjectStore::Transaction& t)
+{
+  ObjectStore::Transaction::iterator i = t.begin();
+  unsigned int longest_aligned = 0;
+  unsigned int longest_misaligned = 0;
+  unsigned int quantity_aligned = 0;
+  unsigned int quantity_misaligned = 0;
+  while (i.have_op()) {
+    ObjectStore::Transaction::Op *op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+    bufferlist bl;
+    i.decode_bl(bl);
+    // Object offset at which the current segment starts.
+    uint64_t off = op->off;
+    for (const auto& bptr : bl.buffers()) {
+      cout << " Checking off " << off << " len " << bptr.length() << " ptr " << static_cast<const void*>(bptr.c_str()) << std::endl;
+      if (off % CEPH_PAGE_SIZE) {
+        // Bytes needed to get from off to the next page boundary.
+        unsigned int align = (0-off) & ~CEPH_PAGE_MASK;
+        if (bptr.length() >= CEPH_PAGE_SIZE+align) {
+          cout << " Should have been split" << std::endl;
+          if (longest_misaligned < bptr.length()-align) {
+            longest_misaligned = bptr.length()-align;
+          }
+          ASSERT_TRUE(false);
+        } else {
+          cout << " OK - Sub PAGE misaligned offset" << std::endl;
+        }
+        quantity_misaligned += bptr.length();
+      } else {
+        if (bptr.length() >= CEPH_PAGE_SIZE) {
+          if (!bptr.is_aligned(CEPH_PAGE_SIZE)) {
+            cout << " Should have been aligned" << std::endl;
+            if (longest_misaligned < bptr.length()) {
+              longest_misaligned = bptr.length();
+            }
+            quantity_misaligned += bptr.length();
+            ASSERT_TRUE(false);
+          } else {
+            cout << " Good alignment" << std::endl;
+            if (longest_aligned < bptr.length()) {
+              longest_aligned = bptr.length();
+            }
+            quantity_aligned += bptr.length();
+          }
+        } else {
+          cout << " OK - Sub PAGE aligned offset" << std::endl;
+          quantity_misaligned += bptr.length();
+        }
+      }
+      off = off + bptr.length();
+    }
+  }
+  ASSERT_TRUE(longest_aligned>=longest_misaligned);
+  cout << " Longest segment is aligned" << std::endl;
+  cout << " Bytes aligned "<< quantity_aligned << " Bytes misaligned " << quantity_misaligned << std::endl;
+}
+
+// Encode the write-heavy transaction with the two-bufferlist encoding,
+// verify its content after decoding, then decode it again from a page
+// aligned copy of the data buffer and check the alignment of the write
+// payloads.
+TEST(Transaction, WriteBufferAlignment)
+{
+  ObjectStore::Transaction source{CEPH_FEATURES_ALL};
+  // Only referenced by the disabled single-buffer round trip below.
+  bufferlist encoded1;
+  ObjectStore::Transaction decode1;
+  ceph::buffer::list::const_iterator p;
+  ceph::buffer::list::const_iterator d;
+
+  cout << "Creating transaction2" << std::endl;
+  create_transaction2(source);
+  cout << "Checking transaction2" << std::endl;
+  check_content_transaction2(source);
+#if 0
+  encode(source, encoded1);
+  p = encoded1.cbegin();
+  decode(decode1, p);
+  cout << "Checking encoded/decoded with 1 buffer transaction2" << std::endl;
+  check_content_transaction2(decode1);
+#endif
+
+  // Two-buffer round trip, content check only.
+  bufferlist encoded2p;
+  bufferlist encoded2d;
+  source.encode(encoded2p, encoded2d, CEPH_FEATUREMASK_SERVER_TENTACLE);
+  p = encoded2p.cbegin();
+  d = encoded2d.cbegin();
+  ObjectStore::Transaction decode2;
+  decode2.decode(p, d);
+  cout << "Checking encoded/decoded with 2 buffers transaction2" << std::endl;
+  check_content_transaction2(decode2);
+
+  // Copy the encoded data into one page aligned buffer and decode from it.
+  ceph::bufferptr ptr(ceph::buffer::create_aligned(encoded2d.length(), CEPH_PAGE_SIZE));
+  ceph::buffer::list alignedbuf;
+  alignedbuf.push_back(ptr);
+  encoded2d.begin().copy(encoded2d.length(), ptr.c_str());
+
+  p = encoded2p.cbegin();
+  d = alignedbuf.cbegin();
+  ObjectStore::Transaction decode3;
+  decode3.decode(p, d);
+  cout << "Checking alignment of transaction2" << std::endl;
+  check_alignment_transaction2(decode3);
+}
+
+/**
+ * create_check_transaction3
+ *
+ * Construct/validate an ObjectStore::Transaction for appends
+ *
+ * @param t      transaction to populate (create) or to verify (!create)
+ * @param create true: add ops to t; false: assert that t holds part 1
+ *               followed by part 2 and nothing else
+ * @param part1  only read when create is true: true adds part 1 (a create
+ *               op), false adds part 2 (a zero op and a write op)
+ *
+ * The checks use gtest ASSERT_* macros, so a failed check returns from
+ * this helper only.
+ */
+void create_check_transaction3(ObjectStore::Transaction& t, bool create, bool part1)
+{
+  coll_t c1 = coll_t(spg_t(pg_t(0,111), shard_id_t::NO_SHARD));
+  ghobject_t o1 = ghobject_t(hobject_t(sobject_t("testobject1", CEPH_NOSNAP)));
+  bufferlist bllong;
+
+  // 1.5 pages
+  const unsigned int bllong_seed = 9876;
+  const unsigned int bllong_len = (CEPH_PAGE_SIZE*3)/2;
+  create_pattern(bllong,bllong_seed,bllong_len);
+
+  if (create) {
+    if (part1) {
+      //Part 1
+      t.create(c1, o1);
+    } else {
+      //Part 2
+      t.zero(c1, o1, 1111, 2222);
+      t.write(c1, o1, 0, bllong_len, bllong);
+    }
+  } else {
+    ObjectStore::Transaction::iterator i = t.begin();
+    ObjectStore::Transaction::Op *op;
+
+    //Part 1
+    ASSERT_TRUE(i.have_op());
+    op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CREATE);
+    ASSERT_TRUE(c1 == i.get_cid(op->cid));
+    ASSERT_TRUE(o1 == i.get_oid(op->oid));
+    //Part 2
+    ASSERT_TRUE(i.have_op());
+    op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_ZERO);
+    ASSERT_TRUE(c1 == i.get_cid(op->cid));
+    ASSERT_TRUE(o1 == i.get_oid(op->oid));
+    ASSERT_TRUE(op->off == 1111);
+    ASSERT_TRUE(op->len == 2222);
+    // Check before decoding, as for the two ops above, so a transaction
+    // that is one op short is reported as a test failure.
+    ASSERT_TRUE(i.have_op());
+    op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+    ASSERT_TRUE(c1 == i.get_cid(op->cid));
+    ASSERT_TRUE(o1 == i.get_oid(op->oid));
+    ASSERT_TRUE(op->off == 0);
+    ASSERT_TRUE(op->len == bllong_len);
+    bufferlist bl;
+    i.decode_bl(bl);
+    ASSERT_TRUE(bl.length() == bllong_len);
+    check_pattern(bl, bllong_seed, bllong_len);
+    ASSERT_FALSE(i.have_op());
+  }
+}
+
+// Add part 1 of transaction3 (the create op) to t.
+void create_transaction3_part1(ObjectStore::Transaction& t)
+{
+  const bool create = true;
+  const bool part1 = true;
+  create_check_transaction3(t, create, part1);
+}
+
+// Add part 2 of transaction3 (the zero op and the write op) to t.
+void create_transaction3_part2(ObjectStore::Transaction& t)
+{
+  const bool create = true;
+  const bool part1 = false;
+  create_check_transaction3(t, create, part1);
+}
+
+// Assert that t holds part 1 followed by part 2 of transaction3.
+void check_content_transaction3(ObjectStore::Transaction& t)
+{
+  const bool create = false;
+  const bool part1 = false;  // not read when checking
+  create_check_transaction3(t, create, part1);
+}
+
+// Round-trip source into dest through the single-bufferlist
+// encode()/decode() pair.
+void transaction_to_old_format(ObjectStore::Transaction& source,ObjectStore::Transaction& dest)
+{
+  bufferlist wire;
+  encode(source, wire);
+  ceph::buffer::list::const_iterator cursor = wire.cbegin();
+  decode(dest, cursor);
+}
+
+// Round-trip source into dest through the two-bufferlist encode/decode,
+// encoding with CEPH_FEATUREMASK_SERVER_TENTACLE.
+void transaction_to_new_format(ObjectStore::Transaction& source,ObjectStore::Transaction& dest)
+{
+  bufferlist wire_p;
+  bufferlist wire_d;
+  source.encode(wire_p, wire_d, CEPH_FEATUREMASK_SERVER_TENTACLE);
+
+  ceph::buffer::list::const_iterator cursor_p = wire_p.cbegin();
+  ceph::buffer::list::const_iterator cursor_d = wire_d.cbegin();
+  dest.decode(cursor_p, cursor_d);
+}
+
+// Append one freshly built transaction to another; neither has been
+// through an encode/decode round trip.
+TEST(Transaction, AppendSimple) //FormatDualDual
+{
+  ObjectStore::Transaction head;
+  create_transaction3_part1(head);
+
+  ObjectStore::Transaction tail;
+  create_transaction3_part2(tail);
+
+  head.append(tail);
+  check_content_transaction3(head);
+}
+
+// Destination: freshly built. Appended part: round-tripped through the
+// two-buffer encoding.
+TEST(Transaction, AppendFormatDualNew)
+{
+  ObjectStore::Transaction head;
+  create_transaction3_part1(head);
+
+  ObjectStore::Transaction tail_built;
+  create_transaction3_part2(tail_built);
+  ObjectStore::Transaction tail;
+  transaction_to_new_format(tail_built, tail);
+
+  head.append(tail);
+  check_content_transaction3(head);
+}
+
+// Destination: freshly built. Appended part: round-tripped through the
+// single-buffer encoding.
+TEST(Transaction, AppendFormatDualOld)
+{
+  ObjectStore::Transaction head;
+  create_transaction3_part1(head);
+
+  ObjectStore::Transaction tail_built;
+  create_transaction3_part2(tail_built);
+  ObjectStore::Transaction tail;
+  transaction_to_old_format(tail_built, tail);
+
+  head.append(tail);
+  check_content_transaction3(head);
+}
+
+// Destination: round-tripped through the two-buffer encoding.
+// Appended part: freshly built.
+TEST(Transaction, AppendFormatNewDual)
+{
+  ObjectStore::Transaction head_built;
+  create_transaction3_part1(head_built);
+  ObjectStore::Transaction tail;
+  create_transaction3_part2(tail);
+
+  ObjectStore::Transaction head;
+  transaction_to_new_format(head_built, head);
+
+  head.append(tail);
+  check_content_transaction3(head);
+}
+
+// Both the destination and the appended part have been round-tripped
+// through the two-buffer encoding.
+TEST(Transaction, AppendFormatNewNew)
+{
+  ObjectStore::Transaction head_built;
+  create_transaction3_part1(head_built);
+  ObjectStore::Transaction tail_built;
+  create_transaction3_part2(tail_built);
+
+  ObjectStore::Transaction head;
+  transaction_to_new_format(head_built, head);
+  ObjectStore::Transaction tail;
+  transaction_to_new_format(tail_built, tail);
+
+  head.append(tail);
+  check_content_transaction3(head);
+}
+
+// Destination: round-tripped through the single-buffer encoding.
+// Appended part: freshly built.
+TEST(Transaction, AppendFormatOldDual)
+{
+  ObjectStore::Transaction head_built;
+  create_transaction3_part1(head_built);
+  ObjectStore::Transaction tail;
+  create_transaction3_part2(tail);
+
+  ObjectStore::Transaction head;
+  transaction_to_old_format(head_built, head);
+
+  head.append(tail);
+  check_content_transaction3(head);
+}
+
+// Both the destination and the appended part have been round-tripped
+// through the single-buffer encoding.
+TEST(Transaction, AppendFormatOldOld)
+{
+  ObjectStore::Transaction head_built;
+  create_transaction3_part1(head_built);
+  ObjectStore::Transaction tail_built;
+  create_transaction3_part2(tail_built);
+
+  ObjectStore::Transaction head;
+  transaction_to_old_format(head_built, head);
+  ObjectStore::Transaction tail;
+  transaction_to_old_format(tail_built, tail);
+
+  head.append(tail);
+  check_content_transaction3(head);
+}
+
+// Build the "every op type" transaction one op at a time through
+// Transaction::append(), then verify it directly and after a round trip
+// through each of the two encodings.
+TEST(Transaction, AppendOpTypes)
+{
+  ObjectStore::Transaction source;
+  cout << "Creating transaction1 using append" << std::endl;
+  create_transaction1_using_append(source);
+  cout << "Checking transaction1" << std::endl;
+  check_content_transaction1(source);
+
+  // One buffer
+  bufferlist encoded1;
+  encode(source, encoded1);
+  ceph::buffer::list::const_iterator p = encoded1.cbegin();
+  ObjectStore::Transaction decode1;
+  decode(decode1, p);
+  cout << "Checking encoded/decoded with 1 buffer transaction1" << std::endl;
+  check_content_transaction1(decode1);
+
+  // Two buffers
+  bufferlist encoded2p;
+  bufferlist encoded2d;
+  source.encode(encoded2p, encoded2d, CEPH_FEATUREMASK_SERVER_TENTACLE);
+  p = encoded2p.cbegin();
+  ceph::buffer::list::const_iterator d = encoded2d.cbegin();
+  ObjectStore::Transaction decode2;
+  decode2.decode(p, d);
+  cout << "Checking encoded/decoded with 2 buffers transaction1" << std::endl;
+  check_content_transaction1(decode2);
+}