]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/Transaction: page align write data buffers to improve performance 57740/head
authorBill Scales <bill_scales@uk.ibm.com>
Mon, 24 Mar 2025 10:52:28 +0000 (10:52 +0000)
committerBill Scales <bill_scales@uk.ibm.com>
Fri, 11 Apr 2025 12:31:37 +0000 (13:31 +0100)
Align write data in Objectstore::Transaction encoded buffers so that
RepOp and ECSubOpWrite messages align data on the receiving OSD to
avoid a later memmove. Also fix ECSubOpReadReply messages in a
similar way so that read data is aligned on the receiving OSD.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
Signed-off-by: Bill Scales <bill_scales@uk.ibm.com>
17 files changed:
src/crimson/osd/ops_executer.cc
src/crimson/osd/pg.cc
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h
src/messages/MOSDECSubOpReadReply.h
src/messages/MOSDECSubOpWrite.h
src/messages/MOSDRepOp.h
src/msg/Message.h
src/os/Transaction.h
src/osd/ECCommon.cc
src/osd/ECListener.h
src/osd/ECMsgTypes.cc
src/osd/ECMsgTypes.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/test/objectstore/ObjectStoreTransactionBenchmark.cc
src/test/objectstore/test_transaction.cc

index 7e868ab98d8532059a35d9d0c468dd419b120c1e..eb4311eede83bb4f1f215b2955c5c7aafce83fb6 100644 (file)
@@ -1102,17 +1102,18 @@ void OpsExecuter::apply_stats()
   pg->apply_stats(get_target(), delta_stats);
 }
 
-OpsExecuter::OpsExecuter(Ref<PG> pg,
+OpsExecuter::OpsExecuter(Ref<PG> _pg,
                          ObjectContextRef _obc,
                          const OpInfo& op_info,
                          abstracted_msg_t&& msg,
                          crimson::net::ConnectionXcoreRef conn,
                          const SnapContext& _snapc)
-  : pg(std::move(pg)),
+  : pg(std::move(_pg)),
     obc(std::move(_obc)),
     op_info(op_info),
     msg(std::move(msg)),
     conn(conn),
+    txn(pg->min_peer_features()),
     snapc(_snapc)
 {
   if (op_info.may_write() && should_clone(*obc, snapc)) {
index 2b772e5f799e89f2457f3328f867599abfa58368..5e87fc045f75f49a0decf0adc9809fa219c71a09 100644 (file)
@@ -1221,8 +1221,10 @@ PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
   DEBUGDPP("{}", *this, *req);
 
   ceph::os::Transaction txn;
-  auto encoded_txn = req->get_data().cbegin();
-  decode(txn, encoded_txn);
+  auto encoded_txn_p = req->get_middle().cbegin();
+  auto encoded_txn_d = req->get_data().cbegin();
+  txn.decode(req->get_middle().length() != 0 ? encoded_txn_p : encoded_txn_d,
+             encoded_txn_d);
   auto p = req->logbl.cbegin();
   std::vector<pg_log_entry_t> log_entries;
   decode(log_entries, p);
index 48192d77286e79f29ede574bfc652ccccdbe8a0a..392cb9735ea4d9c2a22611341515f0111c36b2a4 100644 (file)
@@ -39,7 +39,8 @@ ReplicatedBackend::_read(const hobject_t& hoid,
 MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
   const pg_shard_t &pg_shard,
   const hobject_t &hoid,
-  const bufferlist &encoded_txn,
+  bufferlist &encoded_txn_p_bl,
+  bufferlist &encoded_txn_d_bl,
   const osd_op_params_t &osd_op_p,
   epoch_t min_epoch,
   epoch_t map_epoch,
@@ -59,7 +60,13 @@ MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
     tid,
     osd_op_p.at_version);
   if (send_op) {
-    m->set_data(encoded_txn);
+    if (encoded_txn_d_bl.length() != 0) {
+      m->set_txn_payload(encoded_txn_p_bl);
+      m->set_data(encoded_txn_d_bl);
+    } else {
+      // Pre-tentacle format - everything in data
+      m->set_data(encoded_txn_p_bl);
+    }
   } else {
     ceph::os::Transaction t;
     bufferlist bl;
@@ -97,8 +104,8 @@ ReplicatedBackend::submit_transaction(
       pg_shards.size(),
       osd_op_p.at_version,
       pg.get_last_complete()).first;
-  bufferlist encoded_txn;
-  encode(txn, encoded_txn);
+  bufferlist encoded_txn_p_bl, encoded_txn_d_bl;
+  txn.encode(encoded_txn_p_bl, encoded_txn_d_bl, pg.min_peer_features());
 
   bool is_delete = false;
   for (auto &le : log_entries) {
@@ -120,11 +127,11 @@ ReplicatedBackend::submit_transaction(
     MURef<MOSDRepOp> m;
     if (pg.should_send_op(pg_shard, hoid)) {
       m = new_repop_msg(
-       pg_shard, hoid, encoded_txn, osd_op_p,
+       pg_shard, hoid, encoded_txn_p_bl, encoded_txn_d_bl, osd_op_p,
        min_epoch, map_epoch, log_entries, true, tid);
     } else {
       m = new_repop_msg(
-       pg_shard, hoid, encoded_txn, osd_op_p,
+       pg_shard, hoid, encoded_txn_p_bl, encoded_txn_d_bl, osd_op_p,
        min_epoch, map_epoch, log_entries, false, tid);
       if (pg.is_missing_on_peer(pg_shard, hoid)) {
        if (_new_clone) {
index ddd3bed1a6fe464788c52bd34121d534ff6f5733..9f4132f7ae70c96d1925cc8fce14fcf2c202e677 100644 (file)
@@ -70,7 +70,8 @@ private:
   MURef<MOSDRepOp> new_repop_msg(
     const pg_shard_t &pg_shard,
     const hobject_t &hoid,
-    const bufferlist &encoded_txn,
+    bufferlist &encoded_txn_p_bl,
+    bufferlist &encoded_txn_d_bl,
     const osd_op_params_t &osd_op_p,
     epoch_t min_epoch,
     epoch_t map_epoch,
index b71c22db43b5a418feaebcb671174e6453252b9f..184a952ac69b291337c13770bb4399f075c106fd 100644 (file)
@@ -48,9 +48,10 @@ public:
   void decode_payload() override {
     using ceph::decode;
     auto p = payload.cbegin();
+    auto d = data.cbegin();
     decode(pgid, p);
     decode(map_epoch, p);
-    decode(op, p);
+    op.decode(p, d);
     if (header.version >= 2) {
       decode(min_epoch, p);
       decode_trace(p);
@@ -63,7 +64,7 @@ public:
     using ceph::encode;
     encode(pgid, payload);
     encode(map_epoch, payload);
-    encode(op, payload);
+    op.encode(payload, data, features);
     encode(min_epoch, payload);
     encode_trace(payload, features);
   }
index e1e37d096df1d0bf0756ae605a68eae460b40c2e..34f0fb55dd2f05ca11b6298113d7cc222a5039c4 100644 (file)
@@ -52,9 +52,10 @@ public:
   void decode_payload() override {
     using ceph::decode;
     auto p = payload.cbegin();
+    auto d = data.cbegin();
     decode(pgid, p);
     decode(map_epoch, p);
-    decode(op, p);
+    op.decode(p, d);
     if (header.version >= 2) {
       decode(min_epoch, p);
       decode_trace(p);
@@ -67,7 +68,7 @@ public:
     using ceph::encode;
     encode(pgid, payload);
     encode(map_epoch, payload);
-    encode(op, payload);
+    op.encode(payload, data, features);
     encode(min_epoch, payload);
     encode_trace(payload, features);
   }
index 5e8b386ba0a519489e7fe88278c447972c849808..53b3ae342c77b3a5450a39ed0e8b253385f95cd9 100644 (file)
@@ -85,6 +85,8 @@ public:
   /// non-empty if this transaction involves a hit_set history update
   std::optional<pg_hit_set_history_t> updated_hit_set_history;
 
+  bufferlist txn_payload;
+
   epoch_t get_map_epoch() const override {
     return map_epoch;
   }
@@ -99,6 +101,11 @@ public:
     return data.length();
   }
 
+  void set_txn_payload(bufferlist bl)
+  {
+    txn_payload = bl;
+  }
+
   void decode_payload() override {
     using ceph::decode;
     p = payload.cbegin();
@@ -159,6 +166,8 @@ public:
     encode(from, payload);
     encode(updated_hit_set_history, payload);
     encode(pg_committed_to, payload);
+    bufferlist middle(txn_payload);
+    set_middle(middle);
   }
 
   MOSDRepOp()
index 5c90fb832d7982edd1a2691cc24bf389d924459f..b66c8ccafaa03e74b7d571e70e88895e97ffcc37 100644 (file)
@@ -441,6 +441,7 @@ public:
       byte_throttler->take(middle.length());
   }
   ceph::buffer::list& get_middle() { return middle; }
+  const ceph::buffer::list& get_middle() const { return middle; }
 
   void set_data(const ceph::buffer::list &bl) {
     if (byte_throttler)
index 19896c79f95e29f1171256015d1515687eb7b332..91c7811f11542fc75aba115c0375f221d77f076d 100644 (file)
@@ -179,41 +179,29 @@ public:
 
   struct TransactionData {
     ceph_le64 ops;
-    ceph_le32 largest_data_len;
-    ceph_le32 largest_data_off;
-    ceph_le32 largest_data_off_in_data_bl;
+    ceph_le32 unused1;
+    ceph_le32 unused2;
+    ceph_le32 unused3;
     ceph_le32 fadvise_flags;
 
     TransactionData() noexcept :
       ops(0),
-      largest_data_len(0),
-      largest_data_off(0),
-      largest_data_off_in_data_bl(0),
+      unused1(0),
+      unused2(0),
+      unused3(0),
       fadvise_flags(0) { }
 
     // override default move operations to reset default values
     TransactionData(TransactionData&& other) noexcept :
       ops(other.ops),
-      largest_data_len(other.largest_data_len),
-      largest_data_off(other.largest_data_off),
-      largest_data_off_in_data_bl(other.largest_data_off_in_data_bl),
       fadvise_flags(other.fadvise_flags) {
       other.ops = 0;
-      other.largest_data_len = 0;
-      other.largest_data_off = 0;
-      other.largest_data_off_in_data_bl = 0;
       other.fadvise_flags = 0;
     }
     TransactionData& operator=(TransactionData&& other) noexcept {
       ops = other.ops;
-      largest_data_len = other.largest_data_len;
-      largest_data_off = other.largest_data_off;
-      largest_data_off_in_data_bl = other.largest_data_off_in_data_bl;
       fadvise_flags = other.fadvise_flags;
       other.ops = 0;
-      other.largest_data_len = 0;
-      other.largest_data_off = 0;
-      other.largest_data_off_in_data_bl = 0;
       other.fadvise_flags = 0;
       return *this;
     }
@@ -237,8 +225,24 @@ private:
 
   uint32_t coll_id = 0;
   uint32_t object_id = 0;
+  uint64_t data_features = 0;
+
+  /* Transactions are encoded/decoded in two formats. The old format
+   * (v <= 9) does not page align write data in inter-OSD messages and
+   * can degrade performance. New, aligned format (v >= 10) encodes
+   * aligned data at the start of the message. Transactions are encoded
+   * in the format determined by the replica set's common feature bits.
+   * When a message is recevied and decoded the transaction has a fixed
+   * format, thereafter there are limitations for encode and append
+   * operations
+   */
+  ceph::buffer::list data_aligned_bl;
+  ceph::buffer::list data_misaligned_bl;
+
+  bool is_format_aligned() const {
+    return HAVE_FEATURE(data_features, SERVER_TENTACLE);
+  }
 
-  ceph::buffer::list data_bl;
   ceph::buffer::list op_bl;
 
   std::list<Context *> on_applied;
@@ -247,13 +251,8 @@ private:
 
 public:
   Transaction() = default;
-
-  explicit Transaction(ceph::buffer::list::const_iterator &dp) {
-    decode(dp);
-  }
-  explicit Transaction(ceph::buffer::list &nbl) {
-    auto dp = nbl.cbegin();
-    decode(dp);
+  explicit Transaction(uint64_t data_features)
+    : data_features(data_features) {
   }
 
   // override default move operations to reset default values
@@ -263,7 +262,9 @@ public:
     object_index(std::move(other.object_index)),
     coll_id(other.coll_id),
     object_id(other.object_id),
-    data_bl(std::move(other.data_bl)),
+    data_features(other.data_features),
+    data_aligned_bl(std::move(other.data_aligned_bl)),
+    data_misaligned_bl(std::move(other.data_misaligned_bl)),
     op_bl(std::move(other.op_bl)),
     on_applied(std::move(other.on_applied)),
     on_commit(std::move(other.on_commit)),
@@ -276,9 +277,11 @@ public:
     data = std::move(other.data);
     coll_index = std::move(other.coll_index);
     object_index = std::move(other.object_index);
+    data_features = other.data_features;
     coll_id = other.coll_id;
     object_id = other.object_id;
-    data_bl = std::move(other.data_bl);
+    data_aligned_bl = std::move(other.data_aligned_bl);
+    data_misaligned_bl = std::move(other.data_misaligned_bl);
     op_bl = std::move(other.op_bl);
     on_applied = std::move(other.on_applied);
     on_commit = std::move(other.on_commit);
@@ -394,12 +397,14 @@ public:
     std::swap(on_commit, other.on_commit);
     std::swap(on_applied_sync, other.on_applied_sync);
 
+    std::swap(data_features, other.data_features);
     std::swap(coll_index, other.coll_index);
     std::swap(object_index, other.object_index);
     std::swap(coll_id, other.coll_id);
     std::swap(object_id, other.object_id);
     op_bl.swap(other.op_bl);
-    data_bl.swap(other.data_bl);
+    data_aligned_bl.swap(other.data_aligned_bl);
+    data_misaligned_bl.swap(other.data_misaligned_bl);
   }
 
   void _update_op(Op* op,
@@ -518,13 +523,10 @@ public:
   }
   /// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction
   void append(Transaction& other) {
-
+    //appending a transaction in new format with a transaction in old format
+    //or versa versa is not supported.
+    ceph_assert(data_features == other.data_features);
     data.ops = data.ops + other.data.ops;
-    if (other.data.largest_data_len > data.largest_data_len) {
-       data.largest_data_len = other.data.largest_data_len;
-       data.largest_data_off = other.data.largest_data_off;
-       data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl;
-    }
     data.fadvise_flags = data.fadvise_flags | other.data.fadvise_flags;
     on_applied.splice(on_applied.end(), other.on_applied);
     on_commit.splice(on_commit.end(), other.on_commit);
@@ -564,15 +566,17 @@ public:
 
     //append op_bl
     op_bl.append(other_op_bl);
-    //append data_bl
-    data_bl.append(other.data_bl);
+    //append data_bl's
+    data_aligned_bl.append(other.data_aligned_bl);
+    data_misaligned_bl.append(other.data_misaligned_bl);
   }
 
   /** Inquires about the Transaction as a whole. */
 
   /// How big is the encoded Transaction buffer?
   uint64_t get_encoded_bytes() {
-    //layout: data_bl + op_bl + coll_index + object_index + data
+    //layout: data_misaligned_bl + op_bl + coll_index + object_index +
+    //        data + data_features
 
     // coll_index size, object_index size and sizeof(transaction_data)
     // all here, so they may be computed at compile-time
@@ -591,7 +595,9 @@ public:
        final_size += p->first.encoded_size();
     }
 
-    return data_bl.length() +
+    final_size += sizeof(data_features);
+
+    return data_misaligned_bl.length() +
        op_bl.length() +
        final_size;
   }
@@ -599,42 +605,22 @@ public:
   /// Retain old version for regression testing purposes
   uint64_t get_encoded_bytes_test() {
     using ceph::encode;
-    //layout: data_bl + op_bl + coll_index + object_index + data
+    //layout: data_misaligned_bl + op_bl + coll_index + object_index +
+    //        data + data_features
     ceph::buffer::list bl;
     encode(coll_index, bl);
     encode(object_index, bl);
 
-    return data_bl.length() +
+    return data_misaligned_bl.length() +
        op_bl.length() +
        bl.length() +
-       sizeof(data);
+       sizeof(data) +
+       sizeof(data_features);
   }
 
   uint64_t get_num_bytes() {
     return get_encoded_bytes();
   }
-  /// Size of largest data buffer to the "write" operation encountered so far
-  uint32_t get_data_length() {
-    return data.largest_data_len;
-  }
-  /// offset within the encoded buffer to the start of the largest data buffer that's encoded
-  uint32_t get_data_offset() {
-    if (data.largest_data_off_in_data_bl) {
-       return data.largest_data_off_in_data_bl +
-         sizeof(__u8) +      // encode struct_v
-         sizeof(__u8) +      // encode compat_v
-         sizeof(__u32) +     // encode len
-         sizeof(__u32);      // data_bl len
-    }
-    return 0;  // none
-  }
-  /// offset of buffer as aligned to destination within object.
-  int get_data_alignment() {
-    if (!data.largest_data_len)
-       return 0;
-    return (0 - get_data_offset()) & ~CEPH_PAGE_MASK;
-  }
-  /// Is the Transaction empty (no operations)
   bool empty() {
     return !data.ops;
   }
@@ -658,7 +644,15 @@ public:
     uint64_t ops;
     char* op_buffer_p;
 
-    ceph::buffer::list::const_iterator data_bl_p;
+    ceph::buffer::list::const_iterator data_aligned_bl_p;
+    ceph::buffer::list::const_iterator data_misaligned_bl_p;
+#ifdef WITH_CRIMSON
+    bool new_format;
+#else
+    const bool new_format;
+#endif
+
+    Op *op;
 
   public:
     std::vector<coll_t> colls;
@@ -667,7 +661,9 @@ public:
   private:
     explicit iterator(Transaction *t)
       : t(t),
-         data_bl_p(t->data_bl.cbegin()),
+        data_aligned_bl_p(t->data_aligned_bl.cbegin()),
+        data_misaligned_bl_p(t->data_misaligned_bl.cbegin()),
+        new_format(t->is_format_aligned()),
         colls(t->coll_index.size()),
         objects(t->object_index.size()) {
 
@@ -699,39 +695,69 @@ public:
     Op* decode_op() {
       ceph_assert(ops > 0);
 
-      Op* op = reinterpret_cast<Op*>(op_buffer_p);
+      op = reinterpret_cast<Op*>(op_buffer_p);
       op_buffer_p += sizeof(Op);
       ops--;
 
       return op;
     }
     std::string decode_string() {
-       using ceph::decode;
+      using ceph::decode;
       std::string s;
-      decode(s, data_bl_p);
+      decode(s, data_misaligned_bl_p);
       return s;
     }
     void decode_bl(ceph::buffer::list& bl) {
-       using ceph::decode;
-      decode(bl, data_bl_p);
+      using ceph::decode;
+      if (!new_format) {
+        decode(bl, data_misaligned_bl_p);
+       return;
+      }
+      if (op->op != OP_WRITE) {
+        decode(bl, data_misaligned_bl_p);
+       return;
+      }
+      uint64_t alignstart = (0 - op->off) & ~CEPH_PAGE_MASK;
+      if (op->len >= CEPH_PAGE_SIZE + alignstart) {
+        uint64_t alignlen = (op->len - alignstart) & CEPH_PAGE_MASK;
+        uint64_t suffixstart = alignstart + alignlen;
+        if (alignstart!=0) {
+          // Misaligned chunk at start
+          bufferlist prefix;
+          decode_nohead(alignstart, prefix, data_misaligned_bl_p);
+          bl.append(prefix);
+        }
+        // Aligned chunk in middle
+        bufferlist aligned;
+        decode_nohead(alignlen, aligned, data_aligned_bl_p);
+        bl.append(aligned);
+        if (suffixstart != op->len) {
+          // Misaligned chunk at end
+          bufferlist suffix;
+          decode_nohead(op->len-suffixstart, suffix, data_misaligned_bl_p);
+          bl.append(suffix);
+        }
+      } else {
+        decode_nohead(op->len, bl, data_misaligned_bl_p);
+      }
     }
     void decode_attrset(std::map<std::string,ceph::buffer::ptr>& aset) {
-       using ceph::decode;
-      decode(aset, data_bl_p);
+      using ceph::decode;
+      decode(aset, data_misaligned_bl_p);
     }
     void decode_attrset(std::map<std::string,ceph::buffer::list>& aset) {
-       using ceph::decode;
-      decode(aset, data_bl_p);
+      using ceph::decode;
+      decode(aset, data_misaligned_bl_p);
     }
     void decode_attrset_bl(ceph::buffer::list *pbl) {
-       decode_str_str_map_to_bl(data_bl_p, pbl);
+      decode_str_str_map_to_bl(data_misaligned_bl_p, pbl);
     }
     void decode_keyset(std::set<std::string> &keys){
-       using ceph::decode;
-      decode(keys, data_bl_p);
+      using ceph::decode;
+      decode(keys, data_misaligned_bl_p);
     }
     void decode_keyset_bl(ceph::buffer::list *pbl){
-      decode_str_set_to_bl(data_bl_p, pbl);
+      decode_str_set_to_bl(data_misaligned_bl_p, pbl);
     }
 
     const ghobject_t &get_oid(uint32_t oid_id) {
@@ -843,23 +869,42 @@ public:
   void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len,
               const ceph::buffer::list& write_data, uint32_t flags = 0) {
     using ceph::encode;
-    uint32_t orig_len = data_bl.length();
     Op* _op = _get_next_op();
+    uint64_t alignstart = (0 - off) & ~CEPH_PAGE_MASK;
     _op->op = OP_WRITE;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
     _op->off = off;
     _op->len = len;
-    encode(write_data, data_bl);
-
     ceph_assert(len == write_data.length());
     data.fadvise_flags = data.fadvise_flags | flags;
-    if (write_data.length() > data.largest_data_len) {
-       data.largest_data_len = write_data.length();
-       data.largest_data_off = off;
-       data.largest_data_off_in_data_bl = orig_len + sizeof(__u32);  // we are about to
-    }
     data.ops = data.ops + 1;
+    if (!is_format_aligned()) {
+      encode(write_data, data_misaligned_bl);
+      return;
+    }
+    if (len >= CEPH_PAGE_SIZE + alignstart) {
+      uint64_t alignlen = (len - alignstart) & CEPH_PAGE_MASK;
+      uint64_t suffixstart = alignstart + alignlen;
+      if (alignstart != 0) {
+        // Misaligned chunk at start
+        bufferlist prefix;
+        prefix.substr_of(write_data, 0, alignstart);
+        encode_nohead(prefix, data_misaligned_bl);
+      }
+      // Aligned chunk in middle
+      bufferlist aligned;
+      aligned.substr_of(write_data, alignstart, alignlen);
+      encode_nohead(aligned, data_aligned_bl);
+      if (suffixstart != len) {
+        // Misaligned chunk at end
+        bufferlist suffix;
+        suffix.substr_of(write_data, suffixstart, len-suffixstart);
+        encode_nohead(suffix, data_misaligned_bl);
+      }
+    } else {
+      encode_nohead(write_data, data_misaligned_bl);
+    }
   }
   /**
    * zero out the indicated byte range within an object. Some
@@ -909,8 +954,8 @@ public:
     _op->op = OP_SETATTR;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(s, data_bl);
-    encode(val, data_bl);
+    encode(s, data_misaligned_bl);
+    encode(val, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
   /// Set multiple xattrs of an object
@@ -922,7 +967,7 @@ public:
     _op->op = OP_SETATTRS;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(attrset, data_bl);
+    encode(attrset, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
   /// Set multiple xattrs of an object
@@ -934,7 +979,7 @@ public:
     _op->op = OP_SETATTRS;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(attrset, data_bl);
+    encode(attrset, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
   /// remove an xattr from an object
@@ -949,7 +994,7 @@ public:
     _op->op = OP_RMATTR;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(s, data_bl);
+    encode(s, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
   /// remove all xattrs from an object
@@ -1029,7 +1074,7 @@ public:
     _op->op = OP_COLL_HINT;
     _op->cid = _get_coll_id(cid);
     _op->hint = type;
-    encode(hint, data_bl);
+    encode(hint, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1099,7 +1144,7 @@ public:
     _op->op = OP_OMAP_SETKEYS;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(attrset, data_bl);
+    encode(attrset, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1113,7 +1158,7 @@ public:
     _op->op = OP_OMAP_SETKEYS;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    data_bl.append(attrset_bl);
+    data_misaligned_bl.append(attrset_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1128,7 +1173,7 @@ public:
     _op->op = OP_OMAP_RMKEYS;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(keys, data_bl);
+    encode(keys, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1143,8 +1188,8 @@ public:
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
     using ceph::encode;
-    encode((uint32_t)1, data_bl);
-    encode(key, data_bl);
+    encode((uint32_t)1, data_misaligned_bl);
+    encode(key, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1158,7 +1203,7 @@ public:
     _op->op = OP_OMAP_RMKEYS;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    data_bl.append(keys_bl);
+    data_misaligned_bl.append(keys_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1174,8 +1219,8 @@ public:
     _op->op = OP_OMAP_RMKEYRANGE;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(first, data_bl);
-    encode(last, data_bl);
+    encode(first, data_misaligned_bl);
+    encode(last, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1189,7 +1234,7 @@ public:
     _op->op = OP_OMAP_RMKEYRANGE;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    data_bl.append(keys_bl);
+    data_misaligned_bl.append(keys_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1197,14 +1242,14 @@ public:
   void omap_setheader(
     const coll_t &cid,             ///< [in] Collection containing oid
     const ghobject_t &oid,  ///< [in] Object
-    const ceph::buffer::list &bl    ///< [in] Header value
+    const ceph::buffer::list &hdr_bl    ///< [in] Header value
     ) {
     using ceph::encode;
     Op* _op = _get_next_op();
     _op->op = OP_OMAP_SETHEADER;
     _op->cid = _get_coll_id(cid);
     _op->oid = _get_object_id(oid);
-    encode(bl, data_bl);
+    encode(hdr_bl, data_misaligned_bl);
     data.ops = data.ops + 1;
   }
 
@@ -1266,30 +1311,77 @@ public:
     data.ops = data.ops + 1;
   }
 
-  void encode(ceph::buffer::list& bl) const {
-    //layout: data_bl + op_bl + coll_index + object_index + data
-    ENCODE_START(9, 9, bl);
-    encode(data_bl, bl);
-    encode(op_bl, bl);
-    encode(coll_index, bl);
-    encode(object_index, bl);
-    data.encode(bl);
-    ENCODE_FINISH(bl);
+  void encode(ceph::buffer::list& bl) const
+  {
+    encode(bl, bl);
+  }
+
+  void encode(ceph::buffer::list &p_bl,
+             ceph::buffer::list &d_bl,
+             uint64_t features=0) const
+  {
+    //see also get_encoded_bytes which assumes layout version 9
+
+    //layout version 9:
+    // buffer = data_misaligned_bl + op_bl + coll_index + object_index + data
+    //layout version 10 (for inter-OSD messages):
+    // payload = op_bl + coll_index + object_index + data
+    // data = data_aligned_bl + data_misaligned_bl
+
+    uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 10 : 9;
+    if (is_format_aligned()) {
+      //cannot encode a new format transaction in the old format
+      ceph_assert(ver >= 10);
+    }
+    ENCODE_START(ver, ver, p_bl);
+    if (ver < 10) {
+      encode(data_misaligned_bl, p_bl);
+    }
+    encode(op_bl, p_bl);
+    encode(coll_index, p_bl);
+    encode(object_index, p_bl);
+    data.encode(p_bl);
+
+    if (ver >= 10) {
+      encode(data_features, p_bl);
+      encode(data_aligned_bl.length(), p_bl);
+      encode_nohead(data_aligned_bl, d_bl);
+      encode(data_misaligned_bl.length(), p_bl);
+      encode_nohead(data_misaligned_bl, d_bl);
+    }
+    ENCODE_FINISH(p_bl);
   }
 
   void decode(ceph::buffer::list::const_iterator &bl) {
-    DECODE_START(9, bl);
+    decode(bl, bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator &p_bl,
+             ceph::buffer::list::const_iterator &d_bl) {
+    DECODE_START(10, p_bl);
     DECODE_OLDEST(9);
 
-    decode(data_bl, bl);
-    decode(op_bl, bl);
-    decode(coll_index, bl);
-    decode(object_index, bl);
-    data.decode(bl);
+    if (struct_v < 10) {
+      decode(data_misaligned_bl, p_bl);
+      data_features = 0;
+    }
+    decode(op_bl, p_bl);
+    decode(coll_index, p_bl);
+    decode(object_index, p_bl);
+    data.decode(p_bl);
     coll_id = coll_index.size();
     object_id = object_index.size();
 
-    DECODE_FINISH(bl);
+    if (struct_v >= 10) {
+      decode(data_features, p_bl);
+      unsigned length;
+      decode(length, p_bl);
+      decode_nohead(length, data_aligned_bl, d_bl);
+      decode(length, p_bl);
+      decode_nohead(length, data_misaligned_bl, d_bl);
+    }
+
+    DECODE_FINISH(p_bl);
   }
 
   void dump(ceph::Formatter *f);
index 029ef3f1a5c375f4f076c0d5f0c51004c211032e..1b197284161df4f3ed07c10ae1870f721bc5791f 100644 (file)
@@ -833,7 +833,7 @@ bool ECCommon::RMWPipeline::try_reads_to_commit()
         get_parent()->get_acting_recovery_backfill_shards().begin();
        i != get_parent()->get_acting_recovery_backfill_shards().end();
        ++i) {
-    trans[i->shard];
+    trans.emplace(i->shard, get_parent()->min_peer_features());
   }
 
   op->trace.event("start ec write");
index 79a0542661b581886c3f01374c7f26ef67f4a373..39602dcd064da117de690c6cd50fb781d9536e92 100644 (file)
@@ -27,6 +27,7 @@ struct ECListener {
   virtual const OSDMapRef& pgb_get_osdmap() const = 0;
   virtual epoch_t pgb_get_osdmap_epoch() const = 0;
   virtual const pg_info_t &get_info() const = 0;
+  virtual uint64_t min_peer_features() const = 0;
   /**
    * Called when a pull on soid cannot be completed due to
    * down peers
index c3ec354dcd1134df4eaf896eb6cc422ac0b802c8..1ccdc188d0ee977f463a3ab35c57f7ad915264de 100644 (file)
@@ -26,53 +26,73 @@ using namespace std::literals;
 
 void ECSubWrite::encode(bufferlist &bl) const
 {
-  ENCODE_START(4, 1, bl);
-  encode(from, bl);
-  encode(tid, bl);
-  encode(reqid, bl);
-  encode(soid, bl);
-  encode(stats, bl);
-  encode(t, bl);
-  encode(at_version, bl);
-  encode(trim_to, bl);
-  encode(log_entries, bl);
-  encode(temp_added, bl);
-  encode(temp_removed, bl);
-  encode(updated_hit_set_history, bl);
-  encode(pg_committed_to, bl);
-  encode(backfill_or_async_recovery, bl);
-  ENCODE_FINISH(bl);
+  encode(bl, bl);
+}
+
+void ECSubWrite::encode(bufferlist &p_bl, bufferlist &d_bl, uint64_t features) const
+{
+  uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 5 : 4;
+  ENCODE_START(ver, 1, p_bl);
+  encode(from, p_bl);
+  encode(tid, p_bl);
+  encode(reqid, p_bl);
+  encode(soid, p_bl);
+  encode(stats, p_bl);
+  if (ver >= 5) {
+    t.encode(p_bl, d_bl, features);
+  } else {
+    t.encode(p_bl, p_bl, features);
+  }
+  encode(at_version, p_bl);
+  encode(trim_to, p_bl);
+  encode(log_entries, p_bl);
+  encode(temp_added, p_bl);
+  encode(temp_removed, p_bl);
+  encode(updated_hit_set_history, p_bl);
+  encode(pg_committed_to, p_bl);
+  encode(backfill_or_async_recovery, p_bl);
+  ENCODE_FINISH(p_bl);
 }
 
 void ECSubWrite::decode(bufferlist::const_iterator &bl)
 {
-  DECODE_START(4, bl);
-  decode(from, bl);
-  decode(tid, bl);
-  decode(reqid, bl);
-  decode(soid, bl);
-  decode(stats, bl);
-  decode(t, bl);
-  decode(at_version, bl);
-  decode(trim_to, bl);
-  decode(log_entries, bl);
-  decode(temp_added, bl);
-  decode(temp_removed, bl);
+  decode(bl, bl);
+}
+
+void ECSubWrite::decode(bufferlist::const_iterator &p_bl,
+                       bufferlist::const_iterator &d_bl)
+{
+  DECODE_START(5, p_bl);
+  decode(from, p_bl);
+  decode(tid, p_bl);
+  decode(reqid, p_bl);
+  decode(soid, p_bl);
+  decode(stats, p_bl);
+  if (struct_v >= 5) {
+    t.decode(p_bl, d_bl);
+  } else {
+    t.decode(p_bl, p_bl);
+  }
+  decode(at_version, p_bl);
+  decode(trim_to, p_bl);
+  decode(log_entries, p_bl);
+  decode(temp_added, p_bl);
+  decode(temp_removed, p_bl);
   if (struct_v >= 2) {
-    decode(updated_hit_set_history, bl);
+    decode(updated_hit_set_history, p_bl);
   }
   if (struct_v >= 3) {
-    decode(pg_committed_to, bl);
+    decode(pg_committed_to, p_bl);
   } else {
     pg_committed_to = trim_to;
   }
   if (struct_v >= 4) {
-    decode(backfill_or_async_recovery, bl);
+    decode(backfill_or_async_recovery, p_bl);
   } else {
     // The old protocol used an empty transaction to indicate backfill or async_recovery
     backfill_or_async_recovery = t.empty();
   }
-  DECODE_FINISH(bl);
+  DECODE_FINISH(p_bl);
 }
 
 std::ostream &operator<<(
@@ -290,24 +310,82 @@ void ECSubRead::generate_test_instances(list<ECSubRead*>& o)
 
 void ECSubReadReply::encode(bufferlist &bl) const
 {
-  ENCODE_START(1, 1, bl);
-  encode(from, bl);
-  encode(tid, bl);
-  encode(buffers_read, bl);
-  encode(attrs_read, bl);
-  encode(errors, bl);
-  ENCODE_FINISH(bl);
+  encode(bl, bl);
+}
+
+void ECSubReadReply::encode(bufferlist &p_bl,
+                           bufferlist &d_bl,
+                           uint64_t features) const
+{
+  uint8_t ver = HAVE_FEATURE(features, SERVER_TENTACLE) ? 2 : 1;
+  ENCODE_START(ver, ver, p_bl);
+  encode(from, p_bl);
+  encode(tid, p_bl);
+  if (ver >= 2) {
+    // Manual encode of std::map<hobject_t, std::list<std::pair<uint64_t,
+    //   ceph::buffer::list> >> buffers_read;
+    // data is encoded into d_bl to keep it aligned
+    __u32 nmap = (__u32)(buffers_read.size());
+    encode(nmap, p_bl);
+    for (auto [oid, datalist] : buffers_read) {
+      encode(oid, p_bl);
+      __u32 nlist = (__u32)(datalist.size());
+      encode(nlist, p_bl);
+      for (auto [result,bl] : datalist) {
+       encode(result, p_bl);
+       encode(bl.length(), p_bl);
+       encode_nohead(bl, d_bl);
+      }
+    }
+  } else {
+    encode(buffers_read, p_bl);
+  }
+  encode(attrs_read, p_bl);
+  encode(errors, p_bl);
+  ENCODE_FINISH(p_bl);
 }
 
 void ECSubReadReply::decode(bufferlist::const_iterator &bl)
 {
-  DECODE_START(1, bl);
-  decode(from, bl);
-  decode(tid, bl);
-  decode(buffers_read, bl);
-  decode(attrs_read, bl);
-  decode(errors, bl);
-  DECODE_FINISH(bl);
+  decode(bl, bl);
+}
+
+void ECSubReadReply::decode(bufferlist::const_iterator &p_bl,
+                           bufferlist::const_iterator &d_bl)
+{
+  DECODE_START(2, p_bl);
+  decode(from, p_bl);
+  decode(tid, p_bl);
+  if (struct_v < 2) {
+    decode(buffers_read, p_bl);
+  } else {
+    // Manual decode of std::map<hobject_t, std::list<std::pair<uint64_t,
+    //   ceph::buffer::list> >> buffers_read;
+    // data is decoded from d_bl to keep it aligned
+    __u32 nmap;
+    decode(nmap, p_bl);
+    buffers_read.clear();
+    while (nmap--) {
+      hobject_t oid;
+      decode(oid, p_bl);
+      std::list<std::pair<uint64_t,ceph::buffer::list>> datalist;
+      __u32 nlist;
+      decode(nlist, p_bl);
+      while (nlist--) {
+       uint64_t result;
+       decode(result, p_bl);
+       ceph::buffer::list bl;
+       __u32 length;
+       decode(length, p_bl);
+       decode_nohead(length, bl, d_bl);
+       datalist.emplace_back(make_pair(result, bl));
+      }
+      buffers_read[oid] = datalist;
+    }
+  }
+  decode(attrs_read, p_bl);
+  decode(errors, p_bl);
+  DECODE_FINISH(p_bl);
 }
 
 std::ostream &operator<<(
index d0df1ad6fa15371b00eb07a2d58b89b6384e8a2a..147b56c0f0a7cf0fb201b627177a35cf445d6a46 100644 (file)
@@ -80,7 +80,12 @@ struct ECSubWrite {
     backfill_or_async_recovery = other.backfill_or_async_recovery;
   }
   void encode(ceph::buffer::list &bl) const;
+  void encode(ceph::buffer::list &p_bl,
+             ceph::buffer::list &d_bll,
+             uint64_t features=0) const;
   void decode(ceph::buffer::list::const_iterator &bl);
+  void decode(ceph::buffer::list::const_iterator &p_bl,
+             ceph::buffer::list::const_iterator &d_bl);
   void dump(ceph::Formatter *f) const;
   static void generate_test_instances(std::list<ECSubWrite*>& o);
 private:
@@ -88,6 +93,7 @@ private:
   ECSubWrite(ECSubWrite& other);
   const ECSubWrite& operator=(const ECSubWrite& other);
 };
+
 WRITE_CLASS_ENCODER(ECSubWrite)
 
 struct ECSubWriteReply {
@@ -124,7 +130,12 @@ struct ECSubReadReply {
   std::map<hobject_t, std::map<std::string, ceph::buffer::list, std::less<>>> attrs_read;
   std::map<hobject_t, int> errors;
   void encode(ceph::buffer::list &bl) const;
+  void encode(ceph::buffer::list &p_bl,
+             ceph::buffer::list &d_pl,
+             uint64_t features=0) const;
   void decode(ceph::buffer::list::const_iterator &bl);
+  void decode(ceph::buffer::list::const_iterator &p_bl,
+             ceph::buffer::list::const_iterator &d_pl);
   void dump(ceph::Formatter *f) const;
   static void generate_test_instances(std::list<ECSubReadReply*>& o);
 };
index ac34259d6ab9b3c3ad0c4c785d5d7a7782ab5053..e920bfc04956e52ec4dd20405c88972546a0d718 100644 (file)
@@ -899,7 +899,7 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
   op->mark_started();
 
   vector<PushReplyOp> replies;
-  ObjectStore::Transaction t;
+  ObjectStore::Transaction t{get_parent()->min_peer_features()};
   if (get_parent()->check_failsafe_full()) {
     dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
     ceph_abort();
@@ -984,7 +984,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
     ceph_abort();
   }
 
-  ObjectStore::Transaction t;
+  ObjectStore::Transaction t{get_parent()->min_peer_features()};
   list<pull_complete_info> to_continue;
   for (vector<PushOp>::const_iterator i = m->pushes.begin();
        i != m->pushes.end();
@@ -1090,8 +1090,15 @@ Message * ReplicatedBackend::generate_subop(
     ObjectStore::Transaction t;
     encode(t, wr->get_data());
   } else {
-    encode(op_t, wr->get_data());
-    wr->get_header().data_off = op_t.get_data_alignment();
+    bufferlist p, d;
+    op_t.encode(p, d, get_parent()->min_peer_features());
+    if (d.length() != 0) {
+      wr->set_txn_payload(p);
+      wr->set_data(d);
+    } else {
+      // Pre-tentacle format - everything in data
+      wr->set_data(p);
+    }
   }
 
   wr->logbl = log_entries;
@@ -1195,7 +1202,7 @@ void ReplicatedBackend::do_repop(OpRequestRef op)
 
   op->mark_started();
 
-  RepModifyRef rm(std::make_shared<RepModify>());
+  RepModifyRef rm(std::make_shared<RepModify>(get_parent()->min_peer_features()));
   rm->op = op;
   rm->ackerosd = ackerosd;
   rm->last_complete = get_info().last_complete;
@@ -1205,8 +1212,9 @@ void ReplicatedBackend::do_repop(OpRequestRef op)
   // shipped transaction and log entries
   vector<pg_log_entry_t> log;
 
-  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
-  decode(rm->opt, p);
+  auto p = const_cast<bufferlist&>(m->get_middle()).cbegin();
+  auto d = const_cast<bufferlist&>(m->get_data()).cbegin();
+  rm->opt.decode(m->get_middle().length() != 0 ?  p : d, d);
 
   if (m->new_temp_oid != hobject_t()) {
     dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
index dec7b8e34fce95aff7e80d60228bd25d68404a42..7415a40c32521855b755db4e0b3585229acc2139 100644 (file)
@@ -441,8 +441,12 @@ private:
 
     ObjectStore::Transaction opt, localt;
     
-    RepModify() : committed(false), ackerosd(-1),
-                 epoch_started(0) {}
+    RepModify(uint64_t features)
+      : committed(false),
+        ackerosd(-1),
+        epoch_started(0),
+        localt(features) {
+    }
   };
   typedef std::shared_ptr<RepModify> RepModifyRef;
 
index e2ce3b2ef08cb07a00c78de99b8e6135fdd108b8..e20be42983df701357ef319daa336a5ad1ca2a43 100644 (file)
@@ -40,6 +40,11 @@ class Transaction {
       ticks += a;
       count++;
     }
+    void reset()
+    {
+      ticks = 0;
+      count = 0;
+    }
   };
   static Tick write_ticks, setattr_ticks, omap_setkeys_ticks, omap_rmkey_ticks;
   static Tick encode_ticks, decode_ticks, iterate_ticks;
@@ -70,16 +75,18 @@ class Transaction {
     omap_rmkey_ticks.add(Cycles::rdtsc() - start_time);
   }
 
-  void apply_encode_decode() {
-    bufferlist bl;
+  void apply_encode_decode(bool new_format) {
+    bufferlist p_bl;
+    bufferlist d_bl;
     ObjectStore::Transaction d;
     uint64_t start_time = Cycles::rdtsc();
-    t.encode(bl);
+    t.encode(p_bl, new_format ? d_bl : p_bl);
     encode_ticks.add(Cycles::rdtsc() - start_time);
 
-    auto bliter = bl.cbegin();
+    auto p_bliter = p_bl.cbegin();
+    auto d_bliter = d_bl.cbegin();
     start_time = Cycles::rdtsc();
-    d.decode(bliter);
+    d.decode(p_bliter, new_format ? d_bliter : p_bliter);
     decode_ticks.add(Cycles::rdtsc() - start_time);
   }
 
@@ -135,6 +142,15 @@ class Transaction {
     cerr << " decode op: " << Cycles::to_microseconds(Transaction::decode_ticks.ticks) << "us count: " << Transaction::decode_ticks.count << std::endl;
     cerr << " iterate op: " << Cycles::to_microseconds(Transaction::iterate_ticks.ticks) << "us count: " << Transaction::iterate_ticks.count << std::endl;
   }
+  static void reset_stat() {
+    write_ticks.reset();
+    setattr_ticks.reset();
+    omap_setkeys_ticks.reset();
+    omap_rmkey_ticks.reset();
+    encode_ticks.reset();
+    decode_ticks.reset();
+    iterate_ticks.reset();
+  }
 };
 
 class PerfCase {
@@ -187,7 +203,7 @@ class PerfCase {
     data[info_info_attr] = generate_random(560, 1);
   }
 
-  uint64_t rados_write_4k(int times) {
+  uint64_t rados_write_4k(int times,bool new_format) {
     uint64_t ticks = 0;
     uint64_t len = Kib *4;
     for (int i = 0; i < times; i++) {
@@ -199,7 +215,7 @@ class PerfCase {
         t.write(cid, oid, 0, len, data["4k"]);
         t.setattr(cid, oid, attr, data[attr]);
         t.setattr(cid, oid, snapset_attr, data[snapset_attr]);
-        t.apply_encode_decode();
+        t.apply_encode_decode(new_format);
         t.apply_iterate();
         ticks += Cycles::rdtsc() - start_time;
       }
@@ -214,7 +230,7 @@ class PerfCase {
         t.omap_setkeys(meta_cid, pglog_oid, pglog_attrset);
         t.omap_setkeys(meta_cid, info_oid, info_attrset);
         t.omap_rmkey(meta_cid, pglog_oid, pglog_attr);
-        t.apply_encode_decode();
+        t.apply_encode_decode(new_format);
         t.apply_iterate();
         ticks += Cycles::rdtsc() - start_time;
       }
@@ -258,9 +274,15 @@ int main(int argc, char **argv)
 
   uint64_t times = atoi(args[0]);
   PerfCase c;
-  uint64_t ticks = c.rados_write_4k(times);
+  uint64_t ticks1 = c.rados_write_4k(times, false);
+  Transaction::dump_stat();
+  cerr << " Old format total rados op " << times << " run time " << Cycles::to_microseconds(ticks1) << "us." << std::endl;
+
+  Transaction::reset_stat();
+
+  uint64_t ticks2 = c.rados_write_4k(times, true);
   Transaction::dump_stat();
-  cerr << " Total rados op " << times << " run time " << Cycles::to_microseconds(ticks) << "us." << std::endl;
+  cerr << " New format total rados op " << times << " run time " << Cycles::to_microseconds(ticks2) << "us." << std::endl;
 
   return 0;
 }
index a2113addeb95ce1e5f911880a33191fa906ca69e..451ac434b527bd9be243e94ea0a066d46cd366ca 100644 (file)
@@ -211,3 +211,1100 @@ TEST(Transaction, GetNumBytesBenchCurrent)
 {
    bench_num_bytes(false);
 }
+
+/**
+ * create_pattern
+ *
+ * Fill bufferlist generating data from a seed value
+ */
+void create_pattern(bufferlist& bl, unsigned int seed, unsigned int length)
+{
+  ASSERT_TRUE(length % sizeof(int) == 0);
+  for (unsigned int i = 0; i < length / sizeof(int); i++) {
+    encode(seed, bl);
+    seed = (seed << 1) ^ seed;
+  }
+}
+
+/**
+ * check_pattern
+ *
+ * Validate bufferlist contents matches seed value
+ */
+void check_pattern(bufferlist& bl, unsigned int seed, unsigned int length)
+{
+  ceph::buffer::list::const_iterator p;
+  ASSERT_TRUE(length % sizeof(int) == 0);
+  p = bl.cbegin();
+  for (unsigned int i = 0; i < length / sizeof(int); i++) {
+    unsigned int j;
+    decode(j, p);
+    ASSERT_TRUE(j == seed);
+    seed = (seed << 1) ^ seed;
+  }
+}
+void check_pattern(bufferptr& bptr, unsigned int seed, unsigned int length)
+{
+  bufferlist bl;
+  bl.append(bptr);
+  check_pattern(bl, seed, length);
+}
+
+/**
+ * create_check_transaction1
+ *
+ * Construct/validate an instance of every type of Op in an ObjectStore:Transaction
+ */
+void create_check_transaction1(ObjectStore::Transaction& t_in, bool create, bool append)
+{
+  coll_t c1 = coll_t(spg_t(pg_t(0,111), shard_id_t::NO_SHARD));
+  coll_t c2 = coll_t(spg_t(pg_t(0,111), shard_id_t::NO_SHARD));
+  ghobject_t o1 = ghobject_t(hobject_t(sobject_t("testobject1", CEPH_NOSNAP)));
+  ghobject_t o2 = ghobject_t(hobject_t(sobject_t("testobject2", CEPH_NOSNAP)));
+  bufferlist bl1;
+  bufferlist bl2;
+
+  const unsigned int bl1_seed = 1234;
+  const unsigned int bl1_len = 1024;
+  const unsigned int bl2_seed = 6666;
+  const unsigned int bl2_len = 128;
+
+  create_pattern(bl1, bl1_seed, bl1_len);
+  create_pattern(bl2, bl2_seed, bl2_len);
+
+  ObjectStore::Transaction::iterator i = t_in.begin();
+  ObjectStore::Transaction::Op *op = nullptr;
+
+  bool done = false;
+  for (int pos = 0; !done ; ++pos) {
+    ObjectStore::Transaction t_append;
+    ObjectStore::Transaction& t = append ? t_append : t_in;
+    if (!create) {
+      ASSERT_TRUE(i.have_op());
+      op = i.decode_op();
+      cout << " Checking pos " << pos << " op " << op->op << std::endl;
+    }
+    switch (pos) {
+    case 0:
+      // NOP
+      if (create) {
+        t.nop();
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_NOP);
+      }
+      break;
+    case 1:
+      // CREATE
+      if (create) {
+        t.create(c1, o1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CREATE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 2:
+      // TOUCH
+      if (create) {
+        t.touch(c2, o2);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_TOUCH);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+      }
+      break;
+    case 3:
+      // WRITE
+      if (create) {
+        t.write(c1, o1, 0, bl1_len, bl1);
+      }else{
+        bufferlist bl;
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        ASSERT_TRUE(op->off == 0);
+        ASSERT_TRUE(op->len == 1024);
+        i.decode_bl(bl);
+       ASSERT_TRUE(bl.length() == op->len);
+        check_pattern(bl, bl1_seed, bl1_len);
+      }
+      break;
+    case 4:
+      // ZERO
+      if (create) {
+        t.zero(c2, o2, 1111, 2222);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_ZERO);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        ASSERT_TRUE(op->off == 1111);
+        ASSERT_TRUE(op->len == 2222);
+      }
+      break;
+    case 5:
+      // TRUNCATE
+      if (create) {
+        t.truncate(c1, o1, 3333);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_TRUNCATE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        ASSERT_TRUE(op->off = 3333);
+      }
+      break;
+    case 6:
+      // REMOVE
+      if (create) {
+        t.remove(c2, o2);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_REMOVE);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+      }
+      break;
+    case 7:
+      // SETATTR (1)
+      if (create) {
+        t.setattr(c1, o1, "attr1", bl2);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTR);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        bufferlist bl;
+        i.decode_bl(bl);
+        ASSERT_TRUE(name == "attr1");
+       check_pattern(bl, bl2_seed, bl2_len);
+      }
+      break;
+    case 8:
+      // SETATTR (2)
+      if (create) {
+        t.setattr(c2, o2, std::string("attr2"), bl2);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTR);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        bufferlist bl;
+        i.decode_bl(bl);
+        ASSERT_TRUE(name == "attr2");
+       check_pattern(bl, bl2_seed, bl2_len);
+      }
+      break;
+    case 9:
+      // SETATTRS (1)
+      if (create) {
+        map<string,bufferptr,less<>> m;
+        m["a"] = buffer::copy("this", 4);
+        m["b"] = buffer::copy("that", 4);
+        t.setattrs(c1, o1, m);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTRS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        map<string, bufferptr> aset;
+        i.decode_attrset(aset);
+       ASSERT_TRUE(aset.size()==2);
+       auto a = aset.find("a");
+       ASSERT_TRUE(a != aset.end());
+       ASSERT_TRUE(!a->second.cmp(buffer::copy("this",4)));
+       auto b = aset.find("b");
+       ASSERT_TRUE(b != aset.end());
+       ASSERT_TRUE(!b->second.cmp(buffer::copy("that",4)));
+      }
+      break;
+    case 10:
+      // SETATTRS (2)
+      if (create) {
+        map<string,bufferlist,less<>> m;
+        bufferlist bflip, bflop;
+        bflip.append(buffer::copy("flip",4));
+        bflop.append(buffer::copy("flop",4));
+        m["a"] = bflip;
+        m["b"] = bflop;
+        t.setattrs(c2, o2, m);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETATTRS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        map<string, bufferptr> aset;
+        i.decode_attrset(aset);
+       ASSERT_TRUE(aset.size()==2);
+       auto a = aset.find("a");
+       ASSERT_TRUE(a != aset.end());
+       ASSERT_TRUE(!a->second.cmp(buffer::copy("flip",4)));
+       auto b = aset.find("b");
+       ASSERT_TRUE(b != aset.end());
+       ASSERT_TRUE(!b->second.cmp(buffer::copy("flop",4)));
+      }
+      break;
+    case 11:
+      // RMATTR (1)
+      if (create) {
+        t.rmattr(c1, o1, "attr3");
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMATTR);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        ASSERT_TRUE(name == "attr3");
+      }
+      break;
+    case 12:
+      // RMATTR (2)
+      if (create) {
+        t.rmattr(c2, o2, std::string("attr4"));
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMATTR);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        string name = i.decode_string();
+        ASSERT_TRUE(name == "attr4");
+      }
+      break;
+    case 13:
+      // RMATTRS
+      if (create) {
+        t.rmattrs(c1, o1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMATTRS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 14:
+      // CLONE
+      if (create) {
+        t.clone(c2, o2, o1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CLONE);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+       ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+      }
+      break;
+    case 15:
+      // CLONERANGE2
+      if (create) {
+        t.clone_range(c1, o2, o1, 4444, 5555, 6666);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CLONERANGE2);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+       ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+        ASSERT_TRUE(op->off == 4444);
+        ASSERT_TRUE(op->len == 5555);
+        ASSERT_TRUE(op->dest_off == 6666);
+      }
+      break;
+    case 16:
+      // MKCOLL
+      if (create) {
+        t.create_collection(c2, 7777);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_MKCOLL);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(op->split_bits == 7777);
+      }
+      break;
+    case 17:
+      // COLL_HINT
+      if (create) {
+        t.collection_hint(c1, 8888, bl2);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_COLL_HINT);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+        ASSERT_TRUE(op->hint == 8888);
+        bufferlist bl;
+        i.decode_bl(bl);
+       check_pattern(bl, bl2_seed, bl2_len);
+      }
+      break;
+    case 18:
+      // RMCOLL
+      if (create) {
+        t.remove_collection(c1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_RMCOLL);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+      }
+      break;
+    case 19:
+      // COLL_MOVE_RENAME
+      if (create) {
+        t.collection_move_rename(c2, o2, c1, o1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_COLL_MOVE_RENAME);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        ASSERT_TRUE(c1 == i.get_cid(op->dest_cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+      }
+      break;
+    case 20:
+      // TRY_RENAME
+      if (create) {
+        t.try_rename(c2, o2, o1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_TRY_RENAME);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+       ASSERT_TRUE(o1 == i.get_oid(op->dest_oid));
+      }
+      break;
+    case 21:
+      // OMAP_CLEAR
+      if (create) {
+        t.omap_clear(c1, o1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_CLEAR);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 22:
+      // OMAP_SETKEYS (1)
+      if (create) {
+        map<string,bufferlist> m;
+        bufferlist bthis, bthat;
+        bthis.append(buffer::copy("this",4));
+        bthat.append(buffer::copy("that",4));
+        m["a"] = bthis;
+        m["b"] = bthat;
+        t.omap_setkeys(c2, o2, m);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_SETKEYS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+       map<string,bufferlist> aset;
+        i.decode_attrset(aset);
+       ASSERT_TRUE(aset.size()==2);
+       auto a = aset.find("a");
+       ASSERT_TRUE(a != aset.end());
+       ASSERT_TRUE(a->second.contents_equal("this",4));
+       auto b = aset.find("b");
+       ASSERT_TRUE(b != aset.end());
+       ASSERT_TRUE(b->second.contents_equal("that",4));
+      }
+      break;
+    case 23:
+      // OMAP_SETKEYS (2)
+      if (create) {
+        map<string,bufferlist> m;
+        bufferlist bthis, bthat;
+        bthis.append(buffer::copy("this",4));
+        bthat.append(buffer::copy("that",4));
+        m["a"] = bthis;
+        m["b"] = bthat;
+        bufferlist bkeys;
+        encode(m,bkeys);
+        t.omap_setkeys(c1, o1, bkeys);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_SETKEYS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+       map<string,bufferlist> aset;
+        i.decode_attrset(aset);
+       ASSERT_TRUE(aset.size()==2);
+       auto a = aset.find("a");
+       ASSERT_TRUE(a != aset.end());
+       ASSERT_TRUE(a->second.contents_equal("this",4));
+       auto b = aset.find("b");
+       ASSERT_TRUE(b != aset.end());
+       ASSERT_TRUE(b->second.contents_equal("that",4));
+      }
+      break;
+    case 24:
+      // OMAP_RMKEYS (1)
+      if (create) {
+        std::set<std::string> keys;
+        keys.insert(std::string("attr7"));
+        t.omap_rmkeys(c2, o2, keys);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+       bufferlist keys_bl;
+       i.decode_keyset_bl(&keys_bl);
+        std::set<std::string> keys;
+       decode(keys,keys_bl);
+       ASSERT_TRUE(keys.size() == 1);
+       for(auto& str: keys) {
+         ASSERT_TRUE(str == "attr7");
+       }
+      }
+      break;
+    case 25:
+      // OMAP_RMKEYS (2)
+      if (create) {
+        t.omap_rmkey(c1, o1, std::string("attr8"));
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYS);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+       bufferlist keys_bl;
+       i.decode_keyset_bl(&keys_bl);
+        std::set<std::string> keys;
+       decode(keys,keys_bl);
+       ASSERT_TRUE(keys.size() == 1);
+       for(auto& str: keys) {
+         ASSERT_TRUE(str == "attr8");
+       }
+      }
+      break;
+    case 26:
+      // OMAP_RMKEYS (3)
+      if (create) {
+        std::set<std::string> keys;
+        keys.insert(std::string("attr9"));
+        bufferlist bkeys;
+        encode(keys,bkeys);
+        t.omap_rmkeys(c2, o2, bkeys);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+       bufferlist keys_bl;
+       i.decode_keyset_bl(&keys_bl);
+        std::set<std::string> keys;
+       decode(keys,keys_bl);
+       ASSERT_TRUE(keys.size() == 1);
+       for(auto& str: keys) {
+         ASSERT_TRUE(str == "attr9");
+       }
+      }
+      break;
+    case 27:
+      // OMAP_RMKEYRANGE (1)
+      if (create) {
+        t.omap_rmkeyrange(c1, o1, std::string("attr1"), std::string("attr2"));
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYRANGE);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        string first, last;
+        first = i.decode_string();
+        last = i.decode_string();
+        ASSERT_TRUE(first == "attr1");
+        ASSERT_TRUE(last == "attr2");
+      }
+      break;
+    case 28:
+      // OMAP_RMKEYRANGE (2)
+      if (create) {
+        bufferlist brange;
+        encode(std::string("attr3"),brange);
+        encode(std::string("attr4"),brange);
+        t.omap_rmkeyrange(c2, o2, brange);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_RMKEYRANGE);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+       ASSERT_TRUE(o2 == i.get_oid(op->oid));
+        string first, last;
+        first = i.decode_string();
+        last = i.decode_string();
+        ASSERT_TRUE(first == "attr3");
+        ASSERT_TRUE(last == "attr4");
+      }
+      break;
+    case 29:
+      // OMAP_SETHEADER
+      if (create) {
+        t.omap_setheader(c1, o1, bl1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_OMAP_SETHEADER);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+      }
+      break;
+    case 30:
+      // SPLIT_COLLECTION
+      if (create) {
+        t.split_collection(c2, 9999, 1000, c1);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SPLIT_COLLECTION2);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(c1 == i.get_cid(op->dest_cid));
+        ASSERT_TRUE(op->split_bits == 9999);
+        ASSERT_TRUE(op->split_rem == 1000);
+      }
+      break;
+    case 31:
+      // MERGE_COLLECTION
+      if (create) {
+        t.merge_collection(c2, c1, 1100);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_MERGE_COLLECTION);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(c1 == i.get_cid(op->dest_cid));
+        ASSERT_TRUE(op->split_bits == 1100);
+      }
+      break;
+    case 32:
+      // SET_BITS
+      if (create) {
+        t.collection_set_bits(c2, 1200);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_COLL_SET_BITS);
+        ASSERT_TRUE(c2 == i.get_cid(op->cid));
+        ASSERT_TRUE(op->split_bits == 1200);
+      }
+      break;
+    case 33:
+      // SET_ALLOCHINT
+      if (create) {
+        t.set_alloc_hint(c1, o1, 1300, 1400, 1500);
+      }else{
+        ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_SETALLOCHINT);
+        ASSERT_TRUE(c1 == i.get_cid(op->cid));
+       ASSERT_TRUE(o1 == i.get_oid(op->oid));
+        ASSERT_TRUE(op->expected_object_size == 1300);
+        ASSERT_TRUE(op->expected_write_size == 1400);
+        ASSERT_TRUE(op->hint == 1500);
+      }
+      // End of cases
+      done = true;
+      break;
+    }
+    if (append) {
+      t_in.append(t_append);
+    }
+  }
+  if (!create) {
+    ASSERT_TRUE(!i.have_op());
+  }
+}
+
+void create_transaction1(ObjectStore::Transaction& t)
+{
+  create_check_transaction1(t, true, false);
+}
+
+void create_transaction1_using_append(ObjectStore::Transaction& t)
+{
+  create_check_transaction1(t, true, true);
+}
+
+void check_content_transaction1(ObjectStore::Transaction& t)
+{
+  create_check_transaction1(t, false, false);
+}
+
+TEST(Transaction, EncodeDecode)
+{
+  ObjectStore::Transaction source;
+  bufferlist encoded1;
+  bufferlist encoded2p;
+  bufferlist encoded2d;
+  ObjectStore::Transaction decode1;
+  ObjectStore::Transaction decode2;
+  ceph::buffer::list::const_iterator p;
+  ceph::buffer::list::const_iterator d;
+
+  cout << "Creating transaction1" << std::endl;
+  create_transaction1(source);
+  cout << "Checking transaction1" << std::endl;
+  check_content_transaction1(source);
+
+  encode(source, encoded1);
+  p = encoded1.cbegin();
+  decode(decode1, p);
+  cout << "Checking encoded/decoded with 1 buffer transaction1" << std::endl;
+  check_content_transaction1(decode1);
+  source.encode(encoded2p, encoded2d, CEPH_FEATUREMASK_SERVER_TENTACLE);
+  p = encoded2p.cbegin();
+  d = encoded2d.cbegin();
+  decode2.decode(p, d);
+  cout << "Checking encoded/decoded with 2 buffers transaction1" << std::endl;
+  check_content_transaction1(decode2);
+}
+
+/**
+ * create_check_transaction2
+ *
+ * Construct/validate write Ops with different lengths and alignements
+ * in an ObejctStore::Transaction
+ */
+void create_check_transaction2(ObjectStore::Transaction& t,bool create)
+{
+  coll_t c = coll_t();
+  ghobject_t o1 = ghobject_t(hobject_t(sobject_t("testobject", CEPH_NOSNAP)));
+
+  bufferlist blshort;
+  bufferlist blpage;
+  bufferlist blfewpages;
+  bufferlist bllong;
+
+  // Less than 1 page
+  const unsigned int blshort_seed = 4567;
+  const unsigned int blshort_len = CEPH_PAGE_SIZE/4;
+  // 1 page
+  const unsigned int blpage_seed = 7654;
+  const unsigned int blpage_len = CEPH_PAGE_SIZE;
+  // Whole number of pages
+  const unsigned int blfewpages_seed = 1357;
+  const unsigned int blfewpages_len = CEPH_PAGE_SIZE*3;
+  // 1.5 pages
+  const unsigned int bllong_seed = 9876;
+  const unsigned int bllong_len = (CEPH_PAGE_SIZE*3)/2;
+
+  create_pattern(blshort,blshort_seed,blshort_len);
+  create_pattern(blpage,blpage_seed,blpage_len);
+  create_pattern(blfewpages,blfewpages_seed,blfewpages_len);
+  create_pattern(bllong,bllong_seed,bllong_len);
+
+  ObjectStore::Transaction::iterator i = t.begin();
+  ObjectStore::Transaction::Op *op = nullptr;
+
+  bool done = false;
+  for (int pos = 0; !done ; ++pos) {
+    bufferlist bl;
+    if (!create) {
+      ASSERT_TRUE(i.have_op());
+      op = i.decode_op();
+      cout << " Checking pos " << pos << " off " << op->off << " len " << op->len << std::endl;
+      ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+      i.decode_bl(bl);
+      ASSERT_TRUE(bl.length() == op->len);
+    }
+    switch (pos) {
+    case 0:
+      // Short buffer, PAGE aligned offset
+      if (create) {
+        t.write(c, o1, 0, blshort_len, blshort);
+      }else{
+        ASSERT_TRUE(op->off == 0);
+        ASSERT_TRUE(op->len == blshort_len);
+        check_pattern(bl, blshort_seed, blshort_len);
+      }
+      break;
+    case 1:
+      // Short buffer, straddle PAGE offset
+      if (create) {
+        t.write(c, o1, CEPH_PAGE_SIZE-10, blshort_len, blshort);
+      }else{
+        ASSERT_TRUE(op->off == CEPH_PAGE_SIZE-10);
+        ASSERT_TRUE(op->len == blshort_len);
+        check_pattern(bl, blshort_seed, blshort_len);
+      }
+      break;
+    case 2:
+      // Short buffer, end of buffer PAGE aligned
+      if (create) {
+        t.write(c, o1, CEPH_PAGE_SIZE-blshort_len, blshort_len, blshort);
+      }else{
+        ASSERT_TRUE(op->off == CEPH_PAGE_SIZE-blshort_len);
+        ASSERT_TRUE(op->len == blshort_len);
+        check_pattern(bl, blshort_seed, blshort_len);
+      }
+      break;
+    case 3:
+      // Page buffer, PAGE aligned offset
+      if (create) {
+        t.write(c, o1, CEPH_PAGE_SIZE*3, blpage_len, blpage);
+      }else{
+        ASSERT_TRUE(op->off == CEPH_PAGE_SIZE*3);
+        ASSERT_TRUE(op->len == blpage_len);
+        check_pattern(bl, blpage_seed, blpage_len);
+      }
+      break;
+    case 4:
+      // Page buffer, misaligned offset
+      if (create) {
+        t.write(c, o1, CEPH_PAGE_SIZE-10, blpage_len, blpage);
+      }else{
+        ASSERT_TRUE(op->off == CEPH_PAGE_SIZE-10);
+        ASSERT_TRUE(op->len == blpage_len);
+        check_pattern(bl, blpage_seed, blpage_len);
+      }
+      break;
+    case 5:
+      // Multiple page buffer, PAGE aligned offset
+      if (create) {
+        t.write(c, o1, CEPH_PAGE_SIZE*7, blfewpages_len, blfewpages);
+      }else{
+        ASSERT_TRUE(op->off == CEPH_PAGE_SIZE*7);
+        ASSERT_TRUE(op->len == blfewpages_len);
+        check_pattern(bl, blfewpages_seed, blfewpages_len);
+      }
+      break;
+    case 6:
+      // Multiple page buffer, misaligned offset
+      if (create) {
+        t.write(c, o1, CEPH_PAGE_SIZE-10, blfewpages_len, blfewpages);
+      }else{
+        ASSERT_TRUE(op->off == CEPH_PAGE_SIZE-10);
+        ASSERT_TRUE(op->len == blfewpages_len);
+        check_pattern(bl, blfewpages_seed, blfewpages_len);
+      }
+      break;
+    case 7:
+      // Long buffer, PAGE aligned offset
+      if (create) {
+        t.write(c, o1, 0, bllong_len, bllong);
+      }else{
+        ASSERT_TRUE(op->off == 0);
+        ASSERT_TRUE(op->len == bllong_len);
+        check_pattern(bl, bllong_seed, bllong_len);
+      }
+      break;
+    case 8:
+      // Long buffer, misaligned offset
+      if (create) {
+        t.write(c, o1, CEPH_PAGE_SIZE-10, bllong_len, bllong);
+      }else{
+        ASSERT_TRUE(op->off == CEPH_PAGE_SIZE-10);
+        ASSERT_TRUE(op->len == bllong_len);
+        check_pattern(bl, bllong_seed, bllong_len);
+      }
+      break;
+    case 9:
+      // Long buffer, end of buffer PAGE aligned
+      if (create) {
+        t.write(c, o1, 100*CEPH_PAGE_SIZE-bllong_len, bllong_len, bllong);
+      }else{
+        ASSERT_TRUE(op->off == 100*CEPH_PAGE_SIZE-bllong_len);
+        ASSERT_TRUE(op->len == bllong_len);
+        check_pattern(bl, bllong_seed, bllong_len);
+      }
+      // Last op
+      done = true;
+      break;
+    }
+  }
+  if (!create) {
+    ASSERT_TRUE(!i.have_op());
+  }
+}
+
+void create_transaction2(ObjectStore::Transaction& t)
+{
+  create_check_transaction2(t, true);
+}
+
+void check_content_transaction2(ObjectStore::Transaction& t)
+{
+  create_check_transaction2(t, false);
+}
+
+/**
+ * check_alignment_transaction2
+ *
+ * Validate alignment of write Op data buffers in an ObjectStore::Transaction
+ */
+void check_alignment_transaction2(ObjectStore::Transaction& t)
+{
+  ObjectStore::Transaction::iterator i = t.begin();
+  unsigned int longest_aligned = 0;
+  unsigned int longest_misaligned = 0;
+  unsigned int quantity_aligned = 0;
+  unsigned int quantity_misaligned = 0;
+  while (i.have_op()) {
+    ObjectStore::Transaction::Op *op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+    bufferlist bl;
+    i.decode_bl(bl);
+    auto off = op->off;
+    auto len = bl.length();
+    for (const auto& bptr : bl.buffers()) {
+      cout << " Checking off " << off << " len " << bptr.length() << " ptr " << (void*)bptr.c_str() << std::endl;
+      if (off % CEPH_PAGE_SIZE) {
+        unsigned int align = (0-off) & ~CEPH_PAGE_MASK;
+        if (bptr.length() >= CEPH_PAGE_SIZE+align) {
+         cout << "  Should have been split" << std::endl;
+         if (longest_misaligned < bptr.length()-align) {
+           longest_misaligned = bptr.length()-align;
+         }
+         ASSERT_TRUE(false);
+       }else{
+         cout << "  OK - Sub PAGE misaligned offset" << std::endl;
+       }
+       quantity_misaligned += bptr.length();
+      }else{
+       if (bptr.length() >= CEPH_PAGE_SIZE) {
+         if (!bptr.is_aligned(CEPH_PAGE_SIZE)) {
+           cout << "  Should have been aligned" << std::endl;
+            if (longest_misaligned < bptr.length()) {
+              longest_misaligned = bptr.length();
+           }
+            quantity_misaligned += bptr.length();
+            ASSERT_TRUE(false);
+         }else{
+           cout << "  Good alignment" << std::endl;
+            if (longest_aligned < bptr.length()) {
+              longest_aligned = bptr.length();
+           }
+            quantity_aligned += bptr.length();
+         }
+       }else{
+         cout << "  OK - Sub PAGE aligned offset" << std::endl;
+          quantity_misaligned += bptr.length();
+       }
+      }
+      off = off + bptr.length();
+      len = len - bptr.length();
+    }
+  }
+  ASSERT_TRUE(longest_aligned>=longest_misaligned);
+  cout << " Longest segment is aligned" << std::endl;
+  cout << " Bytes aligned "<< quantity_aligned << " Bytes misaligned " << quantity_misaligned << std::endl;
+}
+
+TEST(Transaction, WriteBufferAlignment)
+{
+  ObjectStore::Transaction source{CEPH_FEATURES_ALL};
+  bufferlist encoded1;
+  bufferlist encoded2p;
+  bufferlist encoded2d;
+  ObjectStore::Transaction decode1;
+  ObjectStore::Transaction decode2;
+  ObjectStore::Transaction decode3;
+  ceph::buffer::list::const_iterator p;
+  ceph::buffer::list::const_iterator d;
+
+  cout << "Creating transaction2" << std::endl;
+  create_transaction2(source);
+  cout << "Checking transaction2" << std::endl;
+  check_content_transaction2(source);
+#if 0
+  encode(source, encoded1);
+  p = encoded1.cbegin();
+  decode(decode1, p);
+  cout << "Checking encoded/decoded with 1 buffer transaction2" << std::endl;
+  check_content_transaction2(decode1);
+#endif
+
+  source.encode(encoded2p, encoded2d, CEPH_FEATUREMASK_SERVER_TENTACLE);
+  p = encoded2p.cbegin();
+  d = encoded2d.cbegin();
+  decode2.decode(p, d);
+  cout << "Checking encoded/decoded with 2 buffers transaction2" << std::endl;
+  check_content_transaction2(decode2);
+
+  ceph::buffer::list alignedbuf;
+  ceph::bufferptr ptr(ceph::buffer::create_aligned(encoded2d.length(), CEPH_PAGE_SIZE));
+
+  alignedbuf.push_back(ptr);
+  encoded2d.begin().copy(encoded2d.length(), ptr.c_str());
+  p = encoded2p.cbegin();
+  d = alignedbuf.cbegin();
+  decode3.decode(p, d);
+  cout << "Checking alignment of transaction2" << std::endl;
+  check_alignment_transaction2(decode3);
+}
+
+/**
+ * create_check_transaction3
+ *
+ * Construct/validate an ObjectStore:Transaction for appends
+ */
+void create_check_transaction3(ObjectStore::Transaction& t, bool create, bool part1)
+{
+  coll_t c1 = coll_t(spg_t(pg_t(0,111), shard_id_t::NO_SHARD));
+  ghobject_t o1 = ghobject_t(hobject_t(sobject_t("testobject1", CEPH_NOSNAP)));
+  bufferlist bllong;
+
+  // 1.5 pages
+  const unsigned int bllong_seed = 9876;
+  const unsigned int bllong_len = (CEPH_PAGE_SIZE*3)/2;
+  create_pattern(bllong,bllong_seed,bllong_len);
+
+  if (create) {
+    if (part1) {
+      //Part 1
+      t.create(c1, o1);
+    }else{
+      //Part 2
+      t.zero(c1, o1, 1111, 2222);
+      t.write(c1, o1, 0, bllong_len, bllong);
+    }
+  }else{
+    ObjectStore::Transaction::iterator i = t.begin();
+    ObjectStore::Transaction::Op *op;
+
+    //Part 1
+    ASSERT_TRUE(i.have_op());
+    op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_CREATE);
+    ASSERT_TRUE(c1 == i.get_cid(op->cid));
+    ASSERT_TRUE(o1 == i.get_oid(op->oid));
+    //Part 2
+    ASSERT_TRUE(i.have_op());
+    op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_ZERO);
+    ASSERT_TRUE(c1 == i.get_cid(op->cid));
+    ASSERT_TRUE(o1 == i.get_oid(op->oid));
+    ASSERT_TRUE(op->off == 1111);
+    ASSERT_TRUE(op->len == 2222);
+    op = i.decode_op();
+    ASSERT_TRUE(op->op == ObjectStore::Transaction::OP_WRITE);
+    ASSERT_TRUE(c1 == i.get_cid(op->cid));
+    ASSERT_TRUE(o1 == i.get_oid(op->oid));
+    ASSERT_TRUE(op->off == 0);
+    ASSERT_TRUE(op->len == bllong_len);
+    bufferlist bl;
+    i.decode_bl(bl);
+    ASSERT_TRUE(bl.length() == bllong_len);
+    check_pattern(bl, bllong_seed, bllong_len);
+    ASSERT_FALSE(i.have_op());
+  }
+}
+
+void create_transaction3_part1(ObjectStore::Transaction& t)
+{
+  create_check_transaction3(t, true, true);
+}
+
+void create_transaction3_part2(ObjectStore::Transaction& t)
+{
+  create_check_transaction3(t, true, false);
+}
+
+void check_content_transaction3(ObjectStore::Transaction& t)
+{
+  create_check_transaction3(t, false, false);
+}
+
+void transaction_to_old_format(ObjectStore::Transaction& source,ObjectStore::Transaction& dest)
+{
+  bufferlist encoded;
+  ceph::buffer::list::const_iterator p;
+  encode(source, encoded);
+  p = encoded.cbegin();
+  decode(dest, p);
+}
+
+void transaction_to_new_format(ObjectStore::Transaction& source,ObjectStore::Transaction& dest)
+{
+  bufferlist encodedp;
+  bufferlist encodedd;
+  ceph::buffer::list::const_iterator p;
+  ceph::buffer::list::const_iterator d;
+  source.encode(encodedp, encodedd, CEPH_FEATUREMASK_SERVER_TENTACLE);
+  p = encodedp.cbegin();
+  d = encodedd.cbegin();
+  dest.decode(p, d);
+}
+
+TEST(Transaction, AppendSimple) //FormatDualDual
+{
+  ObjectStore::Transaction t1;
+  ObjectStore::Transaction t2;
+
+  create_transaction3_part1(t1);
+  create_transaction3_part2(t2);
+  t1.append(t2);
+  check_content_transaction3(t1);
+}
+
+TEST(Transaction, AppendFormatDualNew)
+{
+  ObjectStore::Transaction t1_dual;
+  ObjectStore::Transaction t2_dual;;
+  ObjectStore::Transaction t2_new;
+
+  create_transaction3_part1(t1_dual);
+  create_transaction3_part2(t2_dual);
+  transaction_to_new_format(t2_dual,t2_new);
+  t1_dual.append(t2_new);
+  check_content_transaction3(t1_dual);
+}
+
+TEST(Transaction, AppendFormatDualOld)
+{
+  ObjectStore::Transaction t1_dual;
+  ObjectStore::Transaction t2_dual;;
+  ObjectStore::Transaction t2_old;
+
+  create_transaction3_part1(t1_dual);
+  create_transaction3_part2(t2_dual);
+  transaction_to_old_format(t2_dual,t2_old);
+  t1_dual.append(t2_old);
+  check_content_transaction3(t1_dual);
+}
+
+TEST(Transaction, AppendFormatNewDual)
+{
+  ObjectStore::Transaction t1_dual;
+  ObjectStore::Transaction t2_dual;;
+  ObjectStore::Transaction t1_new;
+
+  create_transaction3_part1(t1_dual);
+  create_transaction3_part2(t2_dual);
+  transaction_to_new_format(t1_dual,t1_new);
+  t1_new.append(t2_dual);
+  check_content_transaction3(t1_new);
+}
+
+TEST(Transaction, AppendFormatNewNew)
+{
+  ObjectStore::Transaction t1_dual;
+  ObjectStore::Transaction t2_dual;;
+  ObjectStore::Transaction t1_new;
+  ObjectStore::Transaction t2_new;
+
+  create_transaction3_part1(t1_dual);
+  create_transaction3_part2(t2_dual);
+  transaction_to_new_format(t1_dual,t1_new);
+  transaction_to_new_format(t2_dual,t2_new);
+  t1_new.append(t2_new);
+  check_content_transaction3(t1_new);
+}
+
+TEST(Transaction, AppendFormatOldDual)
+{
+  ObjectStore::Transaction t1_dual;
+  ObjectStore::Transaction t2_dual;;
+  ObjectStore::Transaction t1_old;
+
+  create_transaction3_part1(t1_dual);
+  create_transaction3_part2(t2_dual);
+  transaction_to_old_format(t1_dual,t1_old);
+  t1_old.append(t2_dual);
+  check_content_transaction3(t1_old);
+}
+
+TEST(Transaction, AppendFormatOldOld)
+{
+  ObjectStore::Transaction t1_dual;
+  ObjectStore::Transaction t2_dual;;
+  ObjectStore::Transaction t1_old;
+  ObjectStore::Transaction t2_old;
+
+  create_transaction3_part1(t1_dual);
+  create_transaction3_part2(t2_dual);
+  transaction_to_old_format(t1_dual,t1_old);
+  transaction_to_old_format(t2_dual,t2_old);
+  t1_old.append(t2_old);
+  check_content_transaction3(t1_old);
+}
+
+TEST(Transaction, AppendOpTypes)
+{
+  ObjectStore::Transaction source;
+  bufferlist encoded1;
+  bufferlist encoded2p;
+  bufferlist encoded2d;
+  ObjectStore::Transaction decode1;
+  ObjectStore::Transaction decode2;
+  ceph::buffer::list::const_iterator p;
+  ceph::buffer::list::const_iterator d;
+
+  cout << "Creating transaction1 using append" << std::endl;
+  create_transaction1_using_append(source);
+  cout << "Checking transaction1" << std::endl;
+  check_content_transaction1(source);
+
+  encode(source, encoded1);
+  p = encoded1.cbegin();
+  decode(decode1, p);
+  cout << "Checking encoded/decoded with 1 buffer transaction1" << std::endl;
+  check_content_transaction1(decode1);
+  source.encode(encoded2p, encoded2d, CEPH_FEATUREMASK_SERVER_TENTACLE);
+  p = encoded2p.cbegin();
+  d = encoded2d.cbegin();
+  decode2.decode(p, d);
+  cout << "Checking encoded/decoded with 2 buffers transaction1" << std::endl;
+  check_content_transaction1(decode2);
+}