]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os: move more features from MemStore to CyanStore
authorKefu Chai <kchai@redhat.com>
Mon, 21 Jan 2019 11:24:19 +0000 (19:24 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 29 Jan 2019 09:58:18 +0000 (17:58 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/os/CMakeLists.txt
src/crimson/os/Transaction.cc [new file with mode: 0644]
src/crimson/os/Transaction.h [new file with mode: 0644]
src/crimson/os/cyan_collection.cc [new file with mode: 0644]
src/crimson/os/cyan_collection.h [new file with mode: 0644]
src/crimson/os/cyan_object.cc [new file with mode: 0644]
src/crimson/os/cyan_object.h [new file with mode: 0644]
src/crimson/os/cyan_store.cc
src/crimson/os/cyan_store.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h

index edc3f493605b82cb0772d05656455c151218f4c4..264e35252e3eedf1f19048344be5772126969491 100644 (file)
@@ -1,4 +1,7 @@
 add_library(crimson-os
-  cyan_store.cc)
+  cyan_store.cc
+  cyan_collection.cc
+  cyan_object.cc
+  Transaction.cc)
 target_link_libraries(crimson-os
-  fmt::fmt crimson-common)
+  crimson)
diff --git a/src/crimson/os/Transaction.cc b/src/crimson/os/Transaction.cc
new file mode 100644 (file)
index 0000000..acf99c5
--- /dev/null
@@ -0,0 +1,507 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Transaction.h"
+#include "include/denc.h"
+
+namespace ceph::os
+{
+
+/// Copy the still-encoded attribute set found at the data cursor into *pbl.
+///
+/// The set is walked only to measure it: a 32-bit entry count, then for every
+/// entry two length-prefixed blobs (presumably attribute name and value),
+/// each a 32-bit length followed by that many payload bytes. Payloads are
+/// skipped, not decoded. On return data_bl_p sits just past the set and the
+/// measured byte span, taken from where the cursor started, has been copied
+/// to *pbl.
+void Transaction::iterator::decode_attrset_bl(bufferlist *pbl)
+{
+  using ceph::decode;
+  // width in bytes of every count / length prefix in the encoding
+  const unsigned prefix_len = 4;
+  auto begin_p = data_bl_p;
+  __u32 entries;
+  decode(entries, data_bl_p);
+  unsigned total = prefix_len;
+  for (__u32 entry = 0; entry < entries; ++entry) {
+    // two blobs per entry, both framed the same way
+    for (int blob = 0; blob < 2; ++blob) {
+      __u32 blob_len;
+      decode(blob_len, data_bl_p);
+      data_bl_p.advance(blob_len);
+      total += prefix_len + blob_len;
+    }
+  }
+  begin_p.copy(total, *pbl);
+}
+
+/// Copy the still-encoded key set found at the data cursor into *pbl.
+///
+/// The set is a 32-bit entry count followed by one length-prefixed blob per
+/// entry (32-bit length, then payload). Payloads are skipped, not decoded.
+/// On return data_bl_p sits just past the set and the measured byte span,
+/// taken from where the cursor started, has been copied to *pbl.
+void Transaction::iterator::decode_keyset_bl(bufferlist *pbl)
+{
+  using ceph::decode;
+  // width in bytes of every count / length prefix in the encoding
+  const unsigned prefix_len = 4;
+  auto begin_p = data_bl_p;
+  __u32 entries;
+  decode(entries, data_bl_p);
+  unsigned total = prefix_len;
+  for (__u32 entry = 0; entry < entries; ++entry) {
+    __u32 key_len;
+    decode(key_len, data_bl_p);
+    data_bl_p.advance(key_len);
+    total += prefix_len + key_len;
+  }
+  begin_p.copy(total, *pbl);
+}
+
+/// Write a description of every op in this transaction to formatter f.
+///
+/// The output is an array section "ops" holding one "op" object per decoded
+/// op. Each object starts with "op_num" (0-based position in the
+/// transaction) followed by op-specific fields. Payload an op keeps in the
+/// data stream (strings, bufferlists, attr/key sets) is decoded through the
+/// iterator even when only its length is reported.
+///
+/// Iteration stops after the first op code that has no case below; that op
+/// is reported as "unknown" together with its numeric "op_code".
+void Transaction::dump(ceph::Formatter *f)
+{
+  f->open_array_section("ops");
+  iterator i = begin();
+  int op_num = 0;
+  bool stop_looping = false;
+  while (i.have_op() && !stop_looping) {
+    Transaction::Op *op = i.decode_op();
+    f->open_object_section("op");
+    f->dump_int("op_num", op_num);
+
+    // Op fields are copied into locals before being streamed or dumped,
+    // as Op is a packed struct.
+    switch (op->op) {
+    case Transaction::OP_NOP:
+      f->dump_string("op_name", "nop");
+      break;
+
+    case Transaction::OP_TOUCH:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        f->dump_string("op_name", "touch");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+      }
+      break;
+
+    case Transaction::OP_WRITE:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        uint64_t off = op->off;
+        uint64_t len = op->len;
+        bufferlist bl;
+        i.decode_bl(bl);
+        f->dump_string("op_name", "write");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_unsigned("length", len);
+        f->dump_unsigned("offset", off);
+        f->dump_unsigned("bufferlist length", bl.length());
+      }
+      break;
+
+    case Transaction::OP_ZERO:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        uint64_t off = op->off;
+        uint64_t len = op->len;
+        f->dump_string("op_name", "zero");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_unsigned("offset", off);
+        f->dump_unsigned("length", len);
+      }
+      break;
+
+    case Transaction::OP_TRIMCACHE:
+      {
+        // deprecated, no-op
+        f->dump_string("op_name", "trim_cache");
+      }
+      break;
+
+    case Transaction::OP_TRUNCATE:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        uint64_t off = op->off;
+        f->dump_string("op_name", "truncate");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_unsigned("offset", off);
+      }
+      break;
+
+    case Transaction::OP_REMOVE:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        f->dump_string("op_name", "remove");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+      }
+      break;
+
+    case Transaction::OP_SETATTR:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        std::string name = i.decode_string();
+        bufferlist bl;
+        i.decode_bl(bl);
+        f->dump_string("op_name", "setattr");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_string("name", name);
+        f->dump_unsigned("length", bl.length());
+      }
+      break;
+
+    case Transaction::OP_SETATTRS:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        std::map<std::string, bufferptr> aset;
+        i.decode_attrset(aset);
+        f->dump_string("op_name", "setattrs");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        // only the value lengths are reported, keyed by attribute name
+        f->open_object_section("attr_lens");
+        for (const auto& [attr_name, attr_val] : aset) {
+          f->dump_unsigned(attr_name.c_str(), attr_val.length());
+        }
+        f->close_section();
+      }
+      break;
+
+    case Transaction::OP_RMATTR:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        std::string name = i.decode_string();
+        f->dump_string("op_name", "rmattr");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_string("name", name);
+      }
+      break;
+
+    case Transaction::OP_RMATTRS:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        f->dump_string("op_name", "rmattrs");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+      }
+      break;
+
+    case Transaction::OP_CLONE:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        ghobject_t noid = i.get_oid(op->dest_oid);
+        f->dump_string("op_name", "clone");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("src_oid") << oid;
+        f->dump_stream("dst_oid") << noid;
+      }
+      break;
+
+    case Transaction::OP_CLONERANGE:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        ghobject_t noid = i.get_oid(op->dest_oid);
+        uint64_t off = op->off;
+        uint64_t len = op->len;
+        f->dump_string("op_name", "clonerange");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("src_oid") << oid;
+        f->dump_stream("dst_oid") << noid;
+        f->dump_unsigned("offset", off);
+        f->dump_unsigned("len", len);
+      }
+      break;
+
+    case Transaction::OP_CLONERANGE2:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        ghobject_t noid = i.get_oid(op->dest_oid);
+        uint64_t srcoff = op->off;
+        uint64_t len = op->len;
+        uint64_t dstoff = op->dest_off;
+        f->dump_string("op_name", "clonerange2");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("src_oid") << oid;
+        f->dump_stream("dst_oid") << noid;
+        f->dump_unsigned("src_offset", srcoff);
+        f->dump_unsigned("len", len);
+        f->dump_unsigned("dst_offset", dstoff);
+      }
+      break;
+
+    case Transaction::OP_MKCOLL:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        f->dump_string("op_name", "mkcoll");
+        f->dump_stream("collection") << cid;
+      }
+      break;
+
+    case Transaction::OP_COLL_HINT:
+      {
+        using ceph::decode;
+        coll_t cid = i.get_cid(op->cid);
+        uint32_t type = op->hint_type;
+        f->dump_string("op_name", "coll_hint");
+        f->dump_stream("collection") << cid;
+        f->dump_unsigned("type", type);
+        // the hint payload is always consumed; it is only interpreted for
+        // the one hint type known here
+        bufferlist hint;
+        i.decode_bl(hint);
+        auto hiter = hint.cbegin();
+        if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
+          uint32_t pg_num;
+          uint64_t num_objs;
+          decode(pg_num, hiter);
+          decode(num_objs, hiter);
+          f->dump_unsigned("pg_num", pg_num);
+          f->dump_unsigned("expected_num_objects", num_objs);
+        }
+      }
+      break;
+
+    case Transaction::OP_COLL_SET_BITS:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        f->dump_string("op_name", "coll_set_bits");
+        f->dump_stream("collection") << cid;
+        f->dump_unsigned("bits", op->split_bits);
+      }
+      break;
+
+    case Transaction::OP_RMCOLL:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        f->dump_string("op_name", "rmcoll");
+        f->dump_stream("collection") << cid;
+      }
+      break;
+
+    case Transaction::OP_COLL_ADD:
+      {
+        coll_t ocid = i.get_cid(op->cid);
+        coll_t ncid = i.get_cid(op->dest_cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        f->dump_string("op_name", "collection_add");
+        f->dump_stream("src_collection") << ocid;
+        f->dump_stream("dst_collection") << ncid;
+        f->dump_stream("oid") << oid;
+      }
+      break;
+
+    case Transaction::OP_COLL_REMOVE:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        f->dump_string("op_name", "collection_remove");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+      }
+      break;
+
+    case Transaction::OP_COLL_MOVE:
+      {
+        coll_t ocid = i.get_cid(op->cid);
+        coll_t ncid = i.get_cid(op->dest_cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        // NOTE(review): unlike every other case this emits no "op_name";
+        // the fields go into a nested "collection_move" section instead.
+        // Left as is because it changes the output structure - confirm
+        // whether that is intended.
+        f->open_object_section("collection_move");
+        f->dump_stream("src_collection") << ocid;
+        f->dump_stream("dst_collection") << ncid;
+        f->dump_stream("oid") << oid;
+        f->close_section();
+      }
+      break;
+
+    case Transaction::OP_COLL_SETATTR:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        std::string name = i.decode_string();
+        bufferlist bl;
+        i.decode_bl(bl);
+        f->dump_string("op_name", "collection_setattr");
+        f->dump_stream("collection") << cid;
+        f->dump_string("name", name);
+        f->dump_unsigned("length", bl.length());
+      }
+      break;
+
+    case Transaction::OP_COLL_RMATTR:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        std::string name = i.decode_string();
+        f->dump_string("op_name", "collection_rmattr");
+        f->dump_stream("collection") << cid;
+        f->dump_string("name", name);
+      }
+      break;
+
+    case Transaction::OP_COLL_RENAME:
+      {
+        f->dump_string("op_name", "collection_rename");
+      }
+      break;
+
+    case Transaction::OP_OMAP_CLEAR:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        f->dump_string("op_name", "omap_clear");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+      }
+      break;
+
+    case Transaction::OP_OMAP_SETKEYS:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        std::map<std::string, bufferlist> aset;
+        i.decode_attrset(aset);
+        f->dump_string("op_name", "omap_setkeys");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        // only the value lengths are reported, keyed by omap key
+        f->open_object_section("attr_lens");
+        for (const auto& [key, val] : aset) {
+          f->dump_unsigned(key.c_str(), val.length());
+        }
+        f->close_section();
+      }
+      break;
+
+    case Transaction::OP_OMAP_RMKEYS:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        std::set<std::string> keys;
+        i.decode_keyset(keys);
+        f->dump_string("op_name", "omap_rmkeys");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->open_array_section("attrs");
+        for (const auto& k : keys) {
+          f->dump_string("", k.c_str());
+        }
+        f->close_section();
+      }
+      break;
+
+    case Transaction::OP_OMAP_SETHEADER:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        bufferlist bl;
+        i.decode_bl(bl);
+        f->dump_string("op_name", "omap_setheader");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_stream("header_length") << bl.length();
+      }
+      break;
+
+    case Transaction::OP_SPLIT_COLLECTION:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        uint32_t bits = op->split_bits;
+        uint32_t rem = op->split_rem;
+        coll_t dest = i.get_cid(op->dest_cid);
+        f->dump_string("op_name", "op_split_collection_create");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("bits") << bits;
+        f->dump_stream("rem") << rem;
+        f->dump_stream("dest") << dest;
+      }
+      break;
+
+    case Transaction::OP_SPLIT_COLLECTION2:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        uint32_t bits = op->split_bits;
+        uint32_t rem = op->split_rem;
+        coll_t dest = i.get_cid(op->dest_cid);
+        f->dump_string("op_name", "op_split_collection");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("bits") << bits;
+        f->dump_stream("rem") << rem;
+        f->dump_stream("dest") << dest;
+      }
+      break;
+
+    case Transaction::OP_MERGE_COLLECTION:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        uint32_t bits = op->split_bits;
+        coll_t dest = i.get_cid(op->dest_cid);
+        f->dump_string("op_name", "op_merge_collection");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("dest") << dest;
+        f->dump_stream("bits") << bits;
+      }
+      break;
+
+    case Transaction::OP_OMAP_RMKEYRANGE:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        std::string first = i.decode_string();
+        std::string last = i.decode_string();
+        f->dump_string("op_name", "op_omap_rmkeyrange");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_string("first", first);
+        f->dump_string("last", last);
+      }
+      break;
+
+    case Transaction::OP_COLL_MOVE_RENAME:
+      {
+        coll_t old_cid = i.get_cid(op->cid);
+        ghobject_t old_oid = i.get_oid(op->oid);
+        coll_t new_cid = i.get_cid(op->dest_cid);
+        ghobject_t new_oid = i.get_oid(op->dest_oid);
+        f->dump_string("op_name", "op_coll_move_rename");
+        f->dump_stream("old_collection") << old_cid;
+        f->dump_stream("old_oid") << old_oid;
+        f->dump_stream("new_collection") << new_cid;
+        f->dump_stream("new_oid") << new_oid;
+      }
+      break;
+
+    case Transaction::OP_TRY_RENAME:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t old_oid = i.get_oid(op->oid);
+        ghobject_t new_oid = i.get_oid(op->dest_oid);
+        // own name, so this op can be told apart from OP_COLL_MOVE_RENAME
+        f->dump_string("op_name", "op_try_rename");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("old_oid") << old_oid;
+        f->dump_stream("new_oid") << new_oid;
+      }
+      break;
+
+    case Transaction::OP_SETALLOCHINT:
+      {
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->oid);
+        uint64_t expected_object_size = op->expected_object_size;
+        uint64_t expected_write_size = op->expected_write_size;
+        f->dump_string("op_name", "op_setallochint");
+        f->dump_stream("collection") << cid;
+        f->dump_stream("oid") << oid;
+        f->dump_stream("expected_object_size") << expected_object_size;
+        f->dump_stream("expected_write_size") << expected_write_size;
+      }
+      break;
+
+    default:
+      // No case for this code, so any payload it carries is not consumed
+      // here; give up after reporting it.
+      f->dump_string("op_name", "unknown");
+      f->dump_unsigned("op_code", op->op);
+      stop_looping = true;
+      break;
+    }
+    f->close_section();
+    op_num++;
+  }
+  f->close_section();
+}
+
+}
diff --git a/src/crimson/os/Transaction.h b/src/crimson/os/Transaction.h
new file mode 100644 (file)
index 0000000..a2f58b1
--- /dev/null
@@ -0,0 +1,1234 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#include <map>
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "osd/osd_types.h"
+
+/*********************************
+ * transaction
+ *
+ * A Transaction represents a sequence of primitive mutation
+ * operations.
+ *
+ * Three events in the life of a Transaction result in
+ * callbacks. Any Transaction can contain any number of callback
+ * objects (Context) for any combination of the three classes of
+ * callbacks:
+ *
+ *    on_applied_sync, on_applied, and on_commit.
+ *
+ * The "on_applied" and "on_applied_sync" callbacks are invoked when
+ * the modifications requested by the Transaction are visible to
+ * subsequent ObjectStore operations, i.e., the results are
+ * readable. The only conceptual difference between on_applied and
+ * on_applied_sync is the specific thread and locking environment in
+ * which the callbacks operate.  "on_applied_sync" is called
+ * directly by an ObjectStore execution thread. It is expected to
+ * execute quickly and must not acquire any locks of the calling
+ * environment. Conversely, "on_applied" is called from the separate
+ * Finisher thread, meaning that it can contend for calling
+ * environment locks. NB, on_applied and on_applied_sync are
+ * sometimes called on_readable and on_readable_sync.
+ *
+ * The "on_commit" callback is also called from the Finisher thread
+ * and indicates that all of the mutations have been durably
+ * committed to stable storage (i.e., are now software/hardware
+ * crashproof).
+ *
+ * At the implementation level, each mutation primitive (and its
+ * associated data) can be serialized to a single buffer.  That
+ * serialization, however, does not copy any data, but (using the
+ * bufferlist library) will reference the original buffers.  This
+ * implies that the buffer that contains the data being submitted
+ * must remain stable until the on_commit callback completes.  In
+ * practice, bufferlist handles all of this for you and this
+ * subtlety is only relevant if you are referencing an existing
+ * buffer via buffer::raw_static.
+ *
+ * Some implementations of ObjectStore choose to implement their own
+ * form of journaling that uses the serialized form of a
+ * Transaction. This requires that the encode/decode logic properly
+ * version itself and handle version upgrades that might change the
+ * format of the encoded Transaction. This has already happened a
+ * couple of times and the Transaction object contains some helper
+ * variables that aid in this legacy decoding:
+ *
+ *   sobject_encoding detects an older/simpler version of oid
+ *   present in pre-bobtail versions of ceph.  use_pool_override
+ *   also detects a situation where the pool of an oid can be
+ *   overridden for legacy operations/buffers.  For non-legacy
+ *   implementations of ObjectStore, neither of these fields are
+ *   relevant.
+ *
+ *
+ * TRANSACTION ISOLATION
+ *
+ * Except as noted above, isolation is the responsibility of the
+ * caller. In other words, if any storage element (storage element
+ * == any of the four portions of an object as described above) is
+ * altered by a transaction (including deletion), the caller
+ * promises not to attempt to read that element while the
+ * transaction is pending (here pending means from the time of
+ * issuance until the "on_applied_sync" callback has been
+ * received). Violations of isolation need not be detected by
+ * ObjectStore and there is no corresponding error mechanism for
+ * reporting an isolation violation (crashing would be the
+ * appropriate way to report an isolation violation if detected).
+ *
+ * Enumeration operations may violate transaction isolation as
+ * described above when a storage element is being created or
+ * deleted as part of a transaction. In this case, ObjectStore is
+ * allowed to consider the enumeration operation to either precede
+ * or follow the violating transaction element. In other words, the
+ * presence/absence of the mutated element in the enumeration is
+ * entirely at the discretion of ObjectStore. The arbitrary ordering
+ * applies independently to each transaction element. For example,
+ * if a transaction contains two mutating elements, "create A" and
+ * "delete B", and an enumeration operation is performed while this
+ * transaction is pending, it is permissible for ObjectStore to
+ * report any of the four possible combinations of the existence of
+ * A and B.
+ *
+ */
+namespace ceph::os {
+class Transaction {
+public:
+  // Operation codes, stored in Op::op. The trailing comment on each code
+  // lists the operands the op carries. Values are assigned explicitly and
+  // are not contiguous (1-7 and 27 are not used here).
+  enum {
+    OP_NOP =          0,
+    OP_TOUCH =        9,   // cid, oid
+    OP_WRITE =        10,  // cid, oid, offset, len, bl
+    OP_ZERO =         11,  // cid, oid, offset, len
+    OP_TRUNCATE =     12,  // cid, oid, off (dump() reports Op::off as "offset")
+    OP_REMOVE =       13,  // cid, oid
+    OP_SETATTR =      14,  // cid, oid, attrname, bl
+    OP_SETATTRS =     15,  // cid, oid, attrset
+    OP_RMATTR =       16,  // cid, oid, attrname
+    OP_CLONE =        17,  // cid, oid, newoid
+    OP_CLONERANGE =   18,  // cid, oid, newoid, offset, len
+    OP_CLONERANGE2 =  30,  // cid, oid, newoid, srcoff, len, dstoff
+
+    OP_TRIMCACHE =    19,  // cid, oid, offset, len  **DEPRECATED**
+
+    OP_MKCOLL =       20,  // cid
+    OP_RMCOLL =       21,  // cid
+    OP_COLL_ADD =     22,  // cid, oldcid, oid
+    OP_COLL_REMOVE =  23,  // cid, oid
+    OP_COLL_SETATTR = 24,  // cid, attrname, bl
+    OP_COLL_RMATTR =  25,  // cid, attrname
+    OP_COLL_SETATTRS = 26,  // cid, attrset
+    OP_COLL_MOVE =    8,   // newcid, oldcid, oid
+
+    OP_RMATTRS =      28,  // cid, oid
+    OP_COLL_RENAME =       29,  // cid, newcid
+
+    OP_OMAP_CLEAR = 31,   // cid, oid
+    OP_OMAP_SETKEYS = 32, // cid, oid, attrset
+    OP_OMAP_RMKEYS = 33,  // cid, oid, keyset
+    OP_OMAP_SETHEADER = 34, // cid, oid, header
+    OP_SPLIT_COLLECTION = 35, // cid, bits, rem, destination
+    OP_SPLIT_COLLECTION2 = 36, /* cid, bits, rem, destination
+                   doesn't create the destination */
+    OP_OMAP_RMKEYRANGE = 37,  // cid, oid, firstkey, lastkey
+    OP_COLL_MOVE_RENAME = 38,   // oldcid, oldoid, newcid, newoid
+    OP_SETALLOCHINT = 39,  // cid, oid, object_size, write_size
+    OP_COLL_HINT = 40, // cid, type, bl
+
+    OP_TRY_RENAME = 41,   // cid, oldoid, newoid
+
+    OP_COLL_SET_BITS = 42, // cid, bits
+
+    OP_MERGE_COLLECTION = 43, // cid, destination, bits
+  };
+
+  // Transaction hint type (value of Op::hint_type for OP_COLL_HINT)
+  enum {
+    // hint payload holds a 32-bit pg_num followed by a 64-bit object count
+    COLL_HINT_EXPECTED_NUM_OBJECTS = 1,
+  };
+
+  // Fixed-size record describing one operation. The struct is packed, so
+  // field order and widths define its byte layout; do not reorder fields.
+  // Which fields are meaningful depends on `op`. cid/oid/dest_cid/dest_oid
+  // are indexes rather than the ids themselves: _update_op() translates
+  // them through index tables and dump() resolves them with
+  // iterator::get_cid()/get_oid().
+  struct Op {
+    __le32 op;                        // one of the OP_* codes
+    __le32 cid;                       // collection index
+    __le32 oid;                       // object index
+    __le64 off;
+    __le64 len;
+    __le32 dest_cid;                  // destination collection index
+    __le32 dest_oid;                  //OP_CLONE, OP_CLONERANGE (also read for
+                                      //OP_CLONERANGE2, OP_COLL_MOVE_RENAME
+                                      //and OP_TRY_RENAME)
+    __le64 dest_off;                  //OP_CLONERANGE (read for OP_CLONERANGE2)
+    // the two hints share storage; only one is meaningful for a given op
+    union {
+      struct {
+        __le32 hint_type;             //OP_COLL_HINT
+      };
+      struct {
+        __le32 alloc_hint_flags;      //OP_SETALLOCHINT
+      };
+    };
+    __le64 expected_object_size;      //OP_SETALLOCHINT
+    __le64 expected_write_size;       //OP_SETALLOCHINT
+    __le32 split_bits;                //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS,
+                                      //OP_MKCOLL
+    __le32 split_rem;                 //OP_SPLIT_COLLECTION2
+  } __attribute__ ((packed)) ;
+
+  /// Fixed-size summary block of a transaction.
+  ///
+  /// encode()/decode() copy the object's raw bytes, so field order, field
+  /// widths and the packed attribute together define the encoded layout;
+  /// do not reorder or resize fields.
+  struct TransactionData {
+    __le64 ops;                          // presumably the op count - confirm
+    __le32 largest_data_len;
+    __le32 largest_data_off;
+    __le32 largest_data_off_in_data_bl;
+    __le32 fadvise_flags;                // see Transaction::set_fadvise_flags()
+
+    /// All fields start out as zero.
+    TransactionData() noexcept :
+      ops(0),
+      largest_data_len(0),
+      largest_data_off(0),
+      largest_data_off_in_data_bl(0),
+      fadvise_flags(0) { }
+
+    // override default move operations to reset default values
+
+    /// Take over other's values and leave other zeroed.
+    TransactionData(TransactionData&& other) noexcept :
+      ops(other.ops),
+      largest_data_len(other.largest_data_len),
+      largest_data_off(other.largest_data_off),
+      largest_data_off_in_data_bl(other.largest_data_off_in_data_bl),
+      fadvise_flags(other.fadvise_flags) {
+      other.ops = 0;
+      other.largest_data_len = 0;
+      other.largest_data_off = 0;
+      other.largest_data_off_in_data_bl = 0;
+      other.fadvise_flags = 0;
+    }
+
+    /// Take over other's values and leave other zeroed.
+    TransactionData& operator=(TransactionData&& other) noexcept {
+      // Without this guard a self-move would copy the fields onto
+      // themselves and then zero them, silently wiping the object.
+      if (this == &other) {
+        return *this;
+      }
+      ops = other.ops;
+      largest_data_len = other.largest_data_len;
+      largest_data_off = other.largest_data_off;
+      largest_data_off_in_data_bl = other.largest_data_off_in_data_bl;
+      fadvise_flags = other.fadvise_flags;
+      other.ops = 0;
+      other.largest_data_len = 0;
+      other.largest_data_off = 0;
+      other.largest_data_off_in_data_bl = 0;
+      other.fadvise_flags = 0;
+      return *this;
+    }
+
+    TransactionData(const TransactionData& other) = default;
+    TransactionData& operator=(const TransactionData& other) = default;
+
+    /// Append the raw bytes of this object to bl.
+    void encode(bufferlist& bl) const {
+      bl.append((char*)this, sizeof(TransactionData));
+    }
+    /// Overwrite this object with the next sizeof(TransactionData) bytes
+    /// read from bl.
+    void decode(bufferlist::const_iterator &bl) {
+      bl.copy(sizeof(TransactionData), (char*)this);
+    }
+  } __attribute__ ((packed)) ;
+
+private:
+  TransactionData data;
+
+  std::map<coll_t, __le32> coll_index;
+  std::map<ghobject_t, __le32> object_index;
+
+  __le32 coll_id {0};
+  __le32 object_id {0};
+
+  bufferlist data_bl;
+  bufferlist op_bl;
+
+  std::list<Context *> on_applied;
+  std::list<Context *> on_commit;
+  std::list<Context *> on_applied_sync;
+
+public:
+  /// Create an empty transaction.
+  Transaction() = default;
+
+  /// Create a transaction from its encoded form by calling decode(dp).
+  explicit Transaction(bufferlist::const_iterator &dp) {
+    decode(dp);
+  }
+
+  /// Create a transaction from its encoded form, decoding from the start
+  /// of nbl.
+  explicit Transaction(bufferlist &nbl) {
+    auto from_start = nbl.cbegin();
+    decode(from_start);
+  }
+
+  // The move operations are written out by hand so that the plain counters
+  // of the moved-from object are reset as well (a defaulted move would
+  // leave coll_id/object_id untouched in `other`).
+
+  /// Move-construct from other: containers and buffers are moved, and
+  /// other's coll_id/object_id are reset to 0 as they are taken over.
+  Transaction(Transaction&& other) noexcept
+    : data(std::move(other.data)),
+      coll_index(std::move(other.coll_index)),
+      object_index(std::move(other.object_index)),
+      coll_id(std::exchange(other.coll_id, 0)),
+      object_id(std::exchange(other.object_id, 0)),
+      data_bl(std::move(other.data_bl)),
+      op_bl(std::move(other.op_bl)),
+      on_applied(std::move(other.on_applied)),
+      on_commit(std::move(other.on_commit)),
+      on_applied_sync(std::move(other.on_applied_sync))
+  {}
+
+  /// Move-assign from other: containers and buffers are moved, and other's
+  /// coll_id/object_id are reset to 0.
+  Transaction& operator=(Transaction&& other) noexcept {
+    // Without this guard a self-move would zero data, coll_id and
+    // object_id and self-move-assign every container.
+    if (this == &other) {
+      return *this;
+    }
+    data = std::move(other.data);
+    coll_index = std::move(other.coll_index);
+    object_index = std::move(other.object_index);
+    coll_id = other.coll_id;
+    object_id = other.object_id;
+    data_bl = std::move(other.data_bl);
+    op_bl = std::move(other.op_bl);
+    on_applied = std::move(other.on_applied);
+    on_commit = std::move(other.on_commit);
+    on_applied_sync = std::move(other.on_applied_sync);
+    other.coll_id = 0;
+    other.object_id = 0;
+    return *this;
+  }
+
+  Transaction(const Transaction& other) = default;
+  Transaction& operator=(const Transaction& other) = default;
+
+  /// Read-only access to the object -> index map of this transaction.
+  /// (expose object_index for FileStore::Op's benefit)
+  ///
+  /// The return type is spelled std::map, like the member declaration, so
+  /// this header does not rely on a using-directive from another header.
+  const std::map<ghobject_t, __le32>& get_object_index() const {
+    return object_index;
+  }
+
+  /* Operations on callback contexts */
+  /// Append c to the on_applied callbacks; a null c is ignored.
+  void register_on_applied(Context *c) {
+    if (c != nullptr) {
+      on_applied.push_back(c);
+    }
+  }
+  /// Append c to the on_commit callbacks; a null c is ignored.
+  void register_on_commit(Context *c) {
+    if (c != nullptr) {
+      on_commit.push_back(c);
+    }
+  }
+  /// Append c to the on_applied_sync callbacks; a null c is ignored.
+  void register_on_applied_sync(Context *c) {
+    if (c != nullptr) {
+      on_applied_sync.push_back(c);
+    }
+  }
+  /// Register c behind both the on_applied and the on_commit callbacks.
+  ///
+  /// c is wrapped in a single shared RunOnDelete; one ContainerContext
+  /// holding a reference to it is registered on each list. Presumably c
+  /// then runs once, when the last holder is destroyed - confirm against
+  /// RunOnDelete. A null c is ignored.
+  void register_on_complete(Context *c) {
+    if (c == nullptr) {
+      return;
+    }
+    RunOnDeleteRef shared_runner(std::make_shared<RunOnDelete>(c));
+    register_on_applied(new ContainerContext<RunOnDeleteRef>(shared_runner));
+    register_on_commit(new ContainerContext<RunOnDeleteRef>(shared_runner));
+  }
+  /// True if at least one callback is registered on any of the three lists.
+  bool has_contexts() const {
+    const bool none_registered =
+      on_commit.empty() &&
+      on_applied.empty() &&
+      on_applied_sync.empty();
+    return !none_registered;
+  }
+
+  /// Gather the callbacks of every transaction in t, one Context per kind.
+  ///
+  /// The callback lists are spliced out of each transaction (leaving them
+  /// empty there) in the order the transactions appear in t, and each
+  /// combined list is handed to C_Contexts::list_to_context().
+  ///
+  /// @param t                    transactions to drain
+  /// @param out_on_applied       receives the on_applied result; not null
+  /// @param out_on_commit        receives the on_commit result; not null
+  /// @param out_on_applied_sync  receives the on_applied_sync result; not null
+  static void collect_contexts(std::vector<Transaction>& t,
+                               Context **out_on_applied,
+                               Context **out_on_commit,
+                               Context **out_on_applied_sync) {
+    ceph_assert(out_on_applied);
+    ceph_assert(out_on_commit);
+    ceph_assert(out_on_applied_sync);
+    // named differently from the members so nothing is hidden
+    std::list<Context *> applied, committed, applied_sync;
+    for (auto& txn : t) {
+      applied.splice(applied.end(), txn.on_applied);
+      committed.splice(committed.end(), txn.on_commit);
+      applied_sync.splice(applied_sync.end(), txn.on_applied_sync);
+    }
+    *out_on_applied = C_Contexts::list_to_context(applied);
+    *out_on_commit = C_Contexts::list_to_context(committed);
+    *out_on_applied_sync = C_Contexts::list_to_context(applied_sync);
+  }
+  /// Gather the callbacks of every transaction in t into caller-owned lists.
+  ///
+  /// The callback lists are spliced out of each transaction (leaving them
+  /// empty there) and appended to the output lists in the order the
+  /// transactions appear in t. Existing content of the output lists is kept.
+  ///
+  /// @param t                    transactions to drain
+  /// @param out_on_applied       list extended with on_applied; not null
+  /// @param out_on_commit        list extended with on_commit; not null
+  /// @param out_on_applied_sync  list extended with on_applied_sync; not null
+  static void collect_contexts(std::vector<Transaction>& t,
+                               std::list<Context*> *out_on_applied,
+                               std::list<Context*> *out_on_commit,
+                               std::list<Context*> *out_on_applied_sync) {
+    ceph_assert(out_on_applied);
+    ceph_assert(out_on_commit);
+    ceph_assert(out_on_applied_sync);
+    for (auto& txn : t) {
+      out_on_applied->splice(out_on_applied->end(), txn.on_applied);
+      out_on_commit->splice(out_on_commit->end(), txn.on_commit);
+      out_on_applied_sync->splice(out_on_applied_sync->end(),
+                                  txn.on_applied_sync);
+    }
+  }
+
+  // The three getters below hand the matching callback list to
+  // C_Contexts::list_to_context() and return its result. Presumably that
+  // folds the list into one Context and empties the list - confirm against
+  // C_Contexts.
+
+  /// Result of C_Contexts::list_to_context() for the on_applied callbacks.
+  Context *get_on_applied() {
+    Context *combined = C_Contexts::list_to_context(on_applied);
+    return combined;
+  }
+  /// Result of C_Contexts::list_to_context() for the on_commit callbacks.
+  Context *get_on_commit() {
+    Context *combined = C_Contexts::list_to_context(on_commit);
+    return combined;
+  }
+  /// Result of C_Contexts::list_to_context() for the on_applied_sync
+  /// callbacks.
+  Context *get_on_applied_sync() {
+    Context *combined = C_Contexts::list_to_context(on_applied_sync);
+    return combined;
+  }
+
+  /// Replace the stored fadvise flags with `flags`.
+  void set_fadvise_flags(uint32_t flags) {
+    data.fadvise_flags = flags;
+  }
+  /// OR `flag` into the stored fadvise flags, keeping those already set.
+  void set_fadvise_flag(uint32_t flag) {
+    data.fadvise_flags = data.fadvise_flags | flag;
+  }
+  /// Current fadvise flags. Const-qualified so it can also be called on a
+  /// const Transaction; callers holding a non-const object are unaffected.
+  uint32_t get_fadvise_flags() const { return data.fadvise_flags; }
+
+  /// Exchange the complete state of this transaction with other.
+  void swap(Transaction& other) noexcept {
+    // summary block and index bookkeeping
+    std::swap(data, other.data);
+    std::swap(coll_index, other.coll_index);
+    std::swap(object_index, other.object_index);
+    std::swap(coll_id, other.coll_id);
+    std::swap(object_id, other.object_id);
+
+    // encoded payload and ops
+    data_bl.swap(other.data_bl);
+    op_bl.swap(other.op_bl);
+
+    // registered callbacks
+    std::swap(on_applied, other.on_applied);
+    std::swap(on_commit, other.on_commit);
+    std::swap(on_applied_sync, other.on_applied_sync);
+  }
+
+  /// Rewrite the collection/object index fields of one op in place.
+  ///
+  /// @param op  op to patch; which fields are touched depends on op->op
+  /// @param cm  translation table for collection indexes (old -> new)
+  /// @param om  translation table for object indexes (old -> new)
+  ///
+  /// Every index is asserted to lie inside the table it is looked up in
+  /// before any field of the op is replaced. Op codes without a case here
+  /// (e.g. OP_CLONERANGE, OP_COLL_MOVE, OP_SPLIT_COLLECTION) end in
+  /// ceph_abort_msg().
+  void _update_op(Op* op,
+                  std::vector<__le32> &cm,
+                  std::vector<__le32> &om) {
+
+    switch (op->op) {
+    case OP_NOP:
+      break;
+
+    // collection + object
+    case OP_TOUCH:
+    case OP_REMOVE:
+    case OP_SETATTR:
+    case OP_SETATTRS:
+    case OP_RMATTR:
+    case OP_RMATTRS:
+    case OP_COLL_REMOVE:
+    case OP_OMAP_CLEAR:
+    case OP_OMAP_SETKEYS:
+    case OP_OMAP_RMKEYS:
+    case OP_OMAP_RMKEYRANGE:
+    case OP_OMAP_SETHEADER:
+    case OP_WRITE:
+    case OP_ZERO:
+    case OP_TRUNCATE:
+    case OP_SETALLOCHINT:
+      ceph_assert(op->cid < cm.size());
+      ceph_assert(op->oid < om.size());
+      op->cid = cm[op->cid];
+      op->oid = om[op->oid];
+      break;
+
+    // collection + source object + destination object
+    case OP_CLONERANGE2:
+    case OP_CLONE:
+    case OP_TRY_RENAME:
+      ceph_assert(op->cid < cm.size());
+      ceph_assert(op->oid < om.size());
+      ceph_assert(op->dest_oid < om.size());
+      op->cid = cm[op->cid];
+      op->oid = om[op->oid];
+      op->dest_oid = om[op->dest_oid];
+      break;
+
+    // collection only
+    case OP_MKCOLL:
+    case OP_RMCOLL:
+    case OP_COLL_SETATTR:
+    case OP_COLL_RMATTR:
+    case OP_COLL_SETATTRS:
+    case OP_COLL_HINT:
+    case OP_COLL_SET_BITS:
+      ceph_assert(op->cid < cm.size());
+      op->cid = cm[op->cid];
+      break;
+
+    // two collections + object
+    case OP_COLL_ADD:
+      ceph_assert(op->cid < cm.size());
+      ceph_assert(op->oid < om.size());
+      // dest_cid indexes the collection table, so it is checked against cm
+      ceph_assert(op->dest_cid < cm.size());
+      op->cid = cm[op->cid];
+      op->dest_cid = cm[op->dest_cid];
+      op->oid = om[op->oid];
+      break;
+
+    // two collections + two objects
+    case OP_COLL_MOVE_RENAME:
+      ceph_assert(op->cid < cm.size());
+      ceph_assert(op->oid < om.size());
+      ceph_assert(op->dest_cid < cm.size());
+      ceph_assert(op->dest_oid < om.size());
+      op->cid = cm[op->cid];
+      op->oid = om[op->oid];
+      op->dest_cid = cm[op->dest_cid];
+      op->dest_oid = om[op->dest_oid];
+      break;
+
+    // two collections
+    case OP_SPLIT_COLLECTION2:
+    case OP_MERGE_COLLECTION:
+      ceph_assert(op->cid < cm.size());
+      ceph_assert(op->dest_cid < cm.size());
+      op->cid = cm[op->cid];
+      op->dest_cid = cm[op->dest_cid];
+      break;
+
+    default:
+      ceph_abort_msg("Unknown OP");
+    }
+  }
+  void _update_op_bl(
+                    bufferlist& bl,
+                    vector<__le32> &cm,
+                    vector<__le32> &om) {
+    for (auto& bp : bl.buffers()) {
+      ceph_assert(bp.length() % sizeof(Op) == 0);
+      
+      char* raw_p = const_cast<char*>(bp.c_str());
+      char* raw_end = raw_p + bp.length();
+      while (raw_p < raw_end) {
+       _update_op(reinterpret_cast<Op*>(raw_p), cm, om);
+       raw_p += sizeof(Op);
+      }
+    }
+  }
+  /// Append the operations of the parameter to this Transaction. Those
+  /// operations are removed from the parameter Transaction
+  void append(Transaction& other) {
+
+    data.ops += other.data.ops;
+    if (other.data.largest_data_len > data.largest_data_len) {
+      data.largest_data_len = other.data.largest_data_len;
+      data.largest_data_off = other.data.largest_data_off;
+      data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl;
+    }
+    data.fadvise_flags |= other.data.fadvise_flags;
+    on_applied.splice(on_applied.end(), other.on_applied);
+    on_commit.splice(on_commit.end(), other.on_commit);
+    on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync);
+    
+    //append coll_index & object_index
+    vector<__le32> cm(other.coll_index.size());
+    map<coll_t, __le32>::iterator coll_index_p;
+    for (coll_index_p = other.coll_index.begin();
+        coll_index_p != other.coll_index.end();
+        ++coll_index_p) {
+      cm[coll_index_p->second] = _get_coll_id(coll_index_p->first);
+    }
+    
+    vector<__le32> om(other.object_index.size());
+    map<ghobject_t, __le32>::iterator object_index_p;
+    for (object_index_p = other.object_index.begin();
+        object_index_p != other.object_index.end();
+        ++object_index_p) {
+      om[object_index_p->second] = _get_object_id(object_index_p->first);
+    }      
+    
+    //the other.op_bl SHOULD NOT be changes during append operation,
+    //we use additional bufferlist to avoid this problem
+    bufferlist other_op_bl;
+    {
+      bufferptr other_op_bl_ptr(other.op_bl.length());
+      other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str());
+      other_op_bl.append(std::move(other_op_bl_ptr));
+    }
+    
+    //update other_op_bl with cm & om
+    //When the other is appended to current transaction, all coll_index and
+    //object_index in other.op_buffer should be updated by new index of the
+    //combined transaction
+    _update_op_bl(other_op_bl, cm, om);
+    
+    //append op_bl
+    op_bl.append(other_op_bl);
+    //append data_bl
+    data_bl.append(other.data_bl);
+  }
+  
+  /** Inquires about the Transaction as a whole. */
+  
+  /// How big is the encoded Transaction buffer?
+  uint64_t get_encoded_bytes() {
+    //layout: data_bl + op_bl + coll_index + object_index + data
+    
+    // coll_index size, object_index size and sizeof(transaction_data)
+    // all here, so they may be computed at compile-time
+    size_t final_size = sizeof(__u32) * 2 + sizeof(data);
+    
+    // coll_index second and object_index second
+    final_size += (coll_index.size() + object_index.size()) * sizeof(__le32);
+    
+    // coll_index first
+    for (auto p = coll_index.begin(); p != coll_index.end(); ++p) {
+      final_size += p->first.encoded_size();
+    }
+    
+    // object_index first
+    for (auto p = object_index.begin(); p != object_index.end(); ++p) {
+      final_size += p->first.encoded_size();
+    }
+    
+    return data_bl.length() +
+      op_bl.length() +
+      final_size;
+  }
+  
+  /// Retain old version for regression testing purposes.
+  ///
+  /// Measures both index maps by actually encoding them into a scratch
+  /// bufferlist.
+  uint64_t get_encoded_bytes_test() {
+    using ceph::encode;
+    //layout: data_bl + op_bl + coll_index + object_index + data
+    bufferlist scratch;
+    encode(coll_index, scratch);
+    encode(object_index, scratch);
+
+    return data_bl.length() +
+      op_bl.length() +
+      scratch.length() +
+      sizeof(data);
+  }
+  
+  /// Same value as get_encoded_bytes().
+  uint64_t get_num_bytes() {
+    const uint64_t encoded = get_encoded_bytes();
+    return encoded;
+  }
+  /// Size of the largest data buffer passed to a "write" operation so far
+  uint32_t get_data_length()
+  {
+    return this->data.largest_data_len;
+  }
+  /// offset within the encoded buffer to the start of the largest data buffer
+  /// that's encoded
+  uint32_t get_data_offset() {
+    if (data.largest_data_off_in_data_bl) {
+      return data.largest_data_off_in_data_bl +
+       sizeof(__u8) +      // encode struct_v
+       sizeof(__u8) +      // encode compat_v
+       sizeof(__u32) +     // encode len
+       sizeof(__u32);      // data_bl len
+    }
+    return 0;  // none
+  }
+  /// offset of buffer as aligned to destination within object.
+  /// Returns 0 when no data length has been recorded.
+  int get_data_alignment() {
+    if (!data.largest_data_len) {
+      return 0;
+    }
+    const uint32_t data_off = get_data_offset();
+    return (0 - data_off) & ~CEPH_PAGE_MASK;
+  }
+  /// Is the Transaction empty (no operations)
+  bool empty() {
+    return data.ops == 0;
+  }
+  /// Number of operations in the transaction
+  int get_num_ops()
+  {
+    return this->data.ops;
+  }
+  
+  /**
+   * iterator
+   *
+   * Helper object to parse Transactions.
+   *
+   * ObjectStore instances use this object to step down the encoded
+   * buffer decoding operation codes and parameters as we go.
+   *
+   */
+  class iterator {
+    Transaction *t;
+    
+    uint64_t ops;
+    char* op_buffer_p;
+    
+    bufferlist::const_iterator data_bl_p;
+    
+  public:
+    vector<coll_t> colls;
+    vector<ghobject_t> objects;
+    
+  private:
+    explicit iterator(Transaction *t)
+      : t(t),
+       data_bl_p(t->data_bl.cbegin()),
+       colls(t->coll_index.size()),
+       objects(t->object_index.size()) {
+      
+      ops = t->data.ops;
+      op_buffer_p = t->op_bl.c_str();
+      
+      map<coll_t, __le32>::iterator coll_index_p;
+      for (coll_index_p = t->coll_index.begin();
+          coll_index_p != t->coll_index.end();
+          ++coll_index_p) {
+       colls[coll_index_p->second] = coll_index_p->first;
+      }
+      
+      map<ghobject_t, __le32>::iterator object_index_p;
+      for (object_index_p = t->object_index.begin();
+          object_index_p != t->object_index.end();
+          ++object_index_p) {
+       objects[object_index_p->second] = object_index_p->first;
+      }
+    }
+    
+    friend class Transaction;
+    
+  public:
+    
+    bool have_op() {
+      return ops > 0;
+    }
+    Op* decode_op() {
+      ceph_assert(ops > 0);
+      
+      Op* op = reinterpret_cast<Op*>(op_buffer_p);
+      op_buffer_p += sizeof(Op);
+      ops--;
+      
+      return op;
+    }
+    string decode_string() {
+      using ceph::decode;
+      string s;
+      decode(s, data_bl_p);
+      return s;
+    }
+    void decode_bp(bufferptr& bp) {
+      using ceph::decode;
+      decode(bp, data_bl_p);
+    }
+    void decode_bl(bufferlist& bl) {
+      using ceph::decode;
+      decode(bl, data_bl_p);
+    }
+    void decode_attrset(map<string,bufferptr>& aset) {
+      using ceph::decode;
+      decode(aset, data_bl_p);
+    }
+    void decode_attrset(map<string,bufferlist>& aset) {
+      using ceph::decode;
+      decode(aset, data_bl_p);
+    }
+    void decode_attrset_bl(bufferlist *pbl);
+    void decode_keyset(set<string> &keys){
+      using ceph::decode;
+      decode(keys, data_bl_p);
+    }
+    void decode_keyset_bl(bufferlist *pbl);
+    
+    const ghobject_t &get_oid(__le32 oid_id) {
+      ceph_assert(oid_id < objects.size());
+      return objects[oid_id];
+    }
+    const coll_t &get_cid(__le32 cid_id) {
+      ceph_assert(cid_id < colls.size());
+      return colls[cid_id];
+    }
+    uint32_t get_fadvise_flags() const {
+      return t->get_fadvise_flags();
+    }
+  };
+  
+  /// Create an iterator over this transaction's ops.
+  iterator begin() {
+    iterator it(this);
+    return it;
+  }
+  
+private:
+  void _build_actions_from_tbl();
+
+  static constexpr size_t OPS_PER_PTR = 32u;
+  /**
+   * Helper functions to encode the various mutation elements of a
+   * transaction.  These are 1:1 with the operation codes (see
+   * enumeration above).  These routines ensure that the
+   * encoder/creator of a transaction gets the right data in the
+   * right place. Sadly, there's no corresponding version nor any
+   * form of seat belts for the decoder.
+   */
+  /// Append one zero-filled Op record to op_bl and return a pointer to it.
+  Op* _get_next_op() {
+    constexpr size_t op_size = sizeof(Op);
+    // grow in chunks of OPS_PER_PTR records once the tail cannot hold one
+    if (op_bl.get_append_buffer_unused_tail_length() < op_size) {
+      op_bl.reserve(op_size * OPS_PER_PTR);
+    }
+    // append_hole ensures bptr merging. Even huge number of ops
+    // shouldn't result in overpopulating bl::_buffers.
+    char* const slot = op_bl.append_hole(op_size).c_str();
+    memset(slot, 0, op_size);
+    return reinterpret_cast<Op*>(slot);
+  }
+  /// Index id of @p coll; a new id is taken from coll_id and recorded in
+  /// coll_index the first time a collection is seen.
+  __le32 _get_coll_id(const coll_t& coll) {
+    if (auto known = coll_index.find(coll); known != coll_index.end()) {
+      return known->second;
+    }
+
+    __le32 fresh_id = coll_id++;
+    coll_index[coll] = fresh_id;
+    return fresh_id;
+  }
+  /// Index id of @p oid; a new id is taken from object_id and recorded in
+  /// object_index the first time an object is seen.
+  __le32 _get_object_id(const ghobject_t& oid) {
+    if (auto known = object_index.find(oid); known != object_index.end()) {
+      return known->second;
+    }
+
+    __le32 fresh_id = object_id++;
+    object_index[oid] = fresh_id;
+    return fresh_id;
+  }
+  
+public:
+  /// Queue an op that does nothing (OP_NOP).
+  void nop() {
+    Op* const op = _get_next_op();
+    op->op = OP_NOP;
+    data.ops++;
+  }
+  /**
+   * touch
+   *
+   * Ensure the existence of an object in a collection. Create an
+   * empty object if necessary
+   */
+  void touch(const coll_t& cid, const ghobject_t& oid) {
+    Op* const op = _get_next_op();
+    op->op = OP_TOUCH;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    data.ops++;
+  }
+  /**
+   * Write data to an offset within an object. If the object is too
+   * small, it is expanded as needed.  It is possible to specify an
+   * offset beyond the current end of an object and it will be
+   * expanded as needed. Simple implementations of ObjectStore will
+   * just zero the data between the old end of the object and the
+   * newly provided data. More sophisticated implementations of
+   * ObjectStore will omit the untouched data and store it as a
+   * "hole" in the file.
+   *
+   * Note that a 0-length write does not affect the size of the object.
+   */
+  void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len,
+            const bufferlist& write_data, uint32_t flags = 0) {
+    using ceph::encode;
+    uint32_t orig_len = data_bl.length();
+    Op* _op = _get_next_op();
+    _op->op = OP_WRITE;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    _op->off = off;
+    _op->len = len;
+    encode(write_data, data_bl);
+    
+    ceph_assert(len == write_data.length());
+    data.fadvise_flags = data.fadvise_flags | flags;
+    if (write_data.length() > data.largest_data_len) {
+      data.largest_data_len = write_data.length();
+      data.largest_data_off = off;
+      // we are about to
+      data.largest_data_off_in_data_bl = orig_len + sizeof(__u32);  
+    }
+    data.ops++;
+  }
+  /**
+   * zero out the indicated byte range within an object. Some
+   * ObjectStore instances may optimize this to release the
+   * underlying storage space.
+   *
+   * If the zero range extends beyond the end of the object, the object
+   * size is extended, just as if we were writing a buffer full of zeros.
+   * EXCEPT if the length is 0, in which case (just like a 0-length write)
+   * we do not adjust the object size.
+   */
+  void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) {
+    Op* const op = _get_next_op();
+    op->op = OP_ZERO;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    // byte range to clear
+    op->off = off;
+    op->len = len;
+    data.ops++;
+  }
+  /// Discard all data in the object beyond the specified size.
+  /// The size is carried in the op's "off" field.
+  void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) {
+    Op* const op = _get_next_op();
+    op->op = OP_TRUNCATE;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    op->off = off;
+    data.ops++;
+  }
+  /// Remove an object. All four parts of the object are removed.
+  void remove(const coll_t& cid, const ghobject_t& oid) {
+    Op* const op = _get_next_op();
+    op->op = OP_REMOVE;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    data.ops++;
+  }
+  /// Set an xattr of an object (C-string name; forwards to the string
+  /// overload)
+  void setattr(const coll_t& cid, const ghobject_t& oid,
+              const char* name,
+              bufferlist& val) {
+    setattr(cid, oid, string(name), val);
+  }
+  /// Set an xattr of an object.
+  /// The name and then the value are encoded into data_bl.
+  void setattr(const coll_t& cid, const ghobject_t& oid,
+              const string& s, bufferlist& val) {
+    using ceph::encode;
+    Op* const op = _get_next_op();
+    op->op = OP_SETATTR;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    encode(s, data_bl);
+    encode(val, data_bl);
+    data.ops++;
+  }
+  /// Set multiple xattrs of an object (bufferptr values).
+  /// The whole map is encoded into data_bl.
+  void setattrs(const coll_t& cid, const ghobject_t& oid,
+               const map<string,bufferptr>& attrset) {
+    using ceph::encode;
+    Op* const op = _get_next_op();
+    op->op = OP_SETATTRS;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    encode(attrset, data_bl);
+    data.ops++;
+  }
+  /// Set multiple xattrs of an object (bufferlist values).
+  /// The whole map is encoded into data_bl.
+  void setattrs(const coll_t& cid, const ghobject_t& oid,
+               const map<string,bufferlist>& attrset) {
+    using ceph::encode;
+    Op* const op = _get_next_op();
+    op->op = OP_SETATTRS;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    encode(attrset, data_bl);
+    data.ops++;
+  }
+  /// remove an xattr from an object (C-string name; forwards to the
+  /// string overload)
+  void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) {
+    rmattr(cid, oid, string(name));
+  }
+  /// remove an xattr from an object; the name is encoded into data_bl
+  void rmattr(const coll_t& cid, const ghobject_t& oid, const string& s) {
+    using ceph::encode;
+    Op* const op = _get_next_op();
+    op->op = OP_RMATTR;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    encode(s, data_bl);
+    data.ops++;
+  }
+  /// remove all xattrs from an object
+  void rmattrs(const coll_t& cid, const ghobject_t& oid) {
+    Op* const op = _get_next_op();
+    op->op = OP_RMATTRS;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    data.ops++;
+  }
+  /**
+   * Clone an object into another object.
+   *
+   * Low-cost (e.g., O(1)) cloning (if supported) is best, but
+   * fallback to an O(n) copy is allowed.  All four parts of the
+   * object are cloned (data, xattrs, omap header, omap
+   * entries).
+   *
+   * The destination named object may already exist, in
+   * which case its previous contents are discarded.
+   */
+  void clone(const coll_t& cid, const ghobject_t& oid,
+            const ghobject_t& noid) {
+    Op* const op = _get_next_op();
+    op->op = OP_CLONE;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);        // source
+    op->dest_oid = _get_object_id(noid);  // destination
+    data.ops++;
+  }
+  /**
+   * Clone a byte range from one object to another.
+   *
+   * The data portion of the destination object receives a copy of a
+   * portion of the data from the source object. None of the other
+   * three parts of an object is copied from the source.
+   *
+   * The destination object size may be extended to the dstoff + len.
+   *
+   * The source range *must* overlap with the source object data. If it does
+   * not the result is undefined.
+   */
+  void clone_range(const coll_t& cid, const ghobject_t& oid,
+                  const ghobject_t& noid,
+                  uint64_t srcoff, uint64_t srclen, uint64_t dstoff) {
+    Op* _op = _get_next_op();
+    _op->op = OP_CLONERANGE2;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    _op->dest_oid = _get_object_id(noid);
+    _op->off = srcoff;
+    _op->len = srclen;
+    _op->dest_off = dstoff;
+    data.ops++;
+  }
+  
+  /// Create the collection; @p bits is stored in the op's split_bits field
+  void create_collection(const coll_t& cid, int bits) {
+    Op* const op = _get_next_op();
+    op->op = OP_MKCOLL;
+    op->cid = _get_coll_id(cid);
+    op->split_bits = bits;
+    data.ops++;
+  }
+  
+  /**
+   * Give the collection a hint.
+   *
+   * @param cid  - collection id.
+   * @param type - hint type.
+   * @param hint - the hint payload, which contains the customized
+   *               data along with the hint type.
+   */
+  void collection_hint(const coll_t& cid, uint32_t type, const bufferlist& hint) {
+    using ceph::encode;
+    Op* const op = _get_next_op();
+    op->op = OP_COLL_HINT;
+    op->cid = _get_coll_id(cid);
+    op->hint_type = type;
+    // the payload travels in data_bl
+    encode(hint, data_bl);
+    data.ops++;
+  }
+  
+  /// remove the collection, the collection must be empty
+  void remove_collection(const coll_t& cid) {
+    Op* const op = _get_next_op();
+    op->op = OP_RMCOLL;
+    op->cid = _get_coll_id(cid);
+    data.ops++;
+  }
+  void collection_move(const coll_t& cid, const coll_t &oldcid,
+                      const ghobject_t& oid)
+    __attribute__ ((deprecated)) {
+    // NOTE: we encode this as a fixed combo of ADD + REMOVE.  they
+    // always appear together, so this is effectively a single MOVE.
+    Op* _op = _get_next_op();
+    _op->op = OP_COLL_ADD;
+    _op->cid = _get_coll_id(oldcid);
+    _op->oid = _get_object_id(oid);
+    _op->dest_cid = _get_coll_id(cid);
+    data.ops++;
+    
+    _op = _get_next_op();
+    _op->op = OP_COLL_REMOVE;
+    _op->cid = _get_coll_id(oldcid);
+    _op->oid = _get_object_id(oid);
+    data.ops++;
+  }
+  void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
+                             const coll_t &cid, const ghobject_t& oid) {
+    Op* _op = _get_next_op();
+    _op->op = OP_COLL_MOVE_RENAME;
+    _op->cid = _get_coll_id(oldcid);
+    _op->oid = _get_object_id(oldoid);
+    _op->dest_cid = _get_coll_id(cid);
+    _op->dest_oid = _get_object_id(oid);
+    data.ops++;
+  }
+  /// Queue an OP_TRY_RENAME of @p oldoid to @p oid inside collection @p cid.
+  void try_rename(const coll_t &cid, const ghobject_t& oldoid,
+                 const ghobject_t& oid) {
+    Op* const op = _get_next_op();
+    op->op = OP_TRY_RENAME;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oldoid);
+    op->dest_oid = _get_object_id(oid);
+    data.ops++;
+  }
+  
+  /// Remove omap from oid
+  void omap_clear(
+    const coll_t &cid,     ///< [in] Collection containing oid
+    const ghobject_t &oid  ///< [in] Object from which to remove omap
+    ) {
+    Op* const op = _get_next_op();
+    op->op = OP_OMAP_CLEAR;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    data.ops++;
+  }
+  /// Set keys on oid omap.  Replaces duplicate keys.
+  void omap_setkeys(
+    const coll_t& cid,                           ///< [in] Collection containing oid
+    const ghobject_t &oid,                ///< [in] Object to update
+    const map<string, bufferlist> &attrset ///< [in] Replacement keys and values
+    ) {
+    using ceph::encode;
+    Op* _op = _get_next_op();
+    _op->op = OP_OMAP_SETKEYS;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    encode(attrset, data_bl);
+    data.ops++;
+  }
+
+    /// Set keys on an oid omap (bufferlist variant).
+  void omap_setkeys(
+    const coll_t &cid,                           ///< [in] Collection containing oid
+    const ghobject_t &oid,                ///< [in] Object to update
+    const bufferlist &attrset_bl          ///< [in] Replacement keys and values
+  ) {
+    Op* _op = _get_next_op();
+    _op->op = OP_OMAP_SETKEYS;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    data_bl.append(attrset_bl);
+    data.ops++;
+  }
+
+  /// Remove keys from oid omap
+  void omap_rmkeys(
+    const coll_t &cid,             ///< [in] Collection containing oid
+    const ghobject_t &oid,  ///< [in] Object from which to remove the omap
+    const set<string> &keys ///< [in] Keys to clear
+    ) {
+    using ceph::encode;
+    Op* _op = _get_next_op();
+    _op->op = OP_OMAP_RMKEYS;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    encode(keys, data_bl);
+    data.ops++;
+  }
+
+  /// Remove keys from oid omap (bufferlist variant).
+  /// @p keys_bl is appended to data_bl as-is, without being encoded again
+  /// here (presumably it already holds the encoded key set -- confirm
+  /// against callers).
+  void omap_rmkeys(
+    const coll_t &cid,        ///< [in] Collection containing oid
+    const ghobject_t &oid,    ///< [in] Object from which to remove the omap
+    const bufferlist &keys_bl ///< [in] Keys to clear
+    ) {
+    Op* const op = _get_next_op();
+    op->op = OP_OMAP_RMKEYS;
+    op->cid = _get_coll_id(cid);
+    op->oid = _get_object_id(oid);
+    data_bl.append(keys_bl);
+    data.ops++;
+  }
+  
+  /// Remove key range from oid omap
+  void omap_rmkeyrange(
+    const coll_t &cid,             ///< [in] Collection containing oid
+    const ghobject_t &oid,  ///< [in] Object from which to remove the omap keys
+    const string& first,    ///< [in] first key in range
+    const string& last      ///< [in] first key past range, range is [first,last)
+    ) {
+    using ceph::encode;
+    Op* _op = _get_next_op();
+    _op->op = OP_OMAP_RMKEYRANGE;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    encode(first, data_bl);
+    encode(last, data_bl);
+    data.ops++;
+  }
+
+  /// Set omap header
+  void omap_setheader(
+    const coll_t &cid,             ///< [in] Collection containing oid
+    const ghobject_t &oid,  ///< [in] Object
+    const bufferlist &bl    ///< [in] Header value
+    ) {
+    using ceph::encode;
+    Op* _op = _get_next_op();
+    _op->op = OP_OMAP_SETHEADER;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    encode(bl, data_bl);
+    data.ops++;
+  }
+
+  /// Split collection based on given prefixes, objects matching the specified
+  /// bits/rem are moved to the new collection
+  void split_collection(
+    const coll_t &cid,
+    uint32_t bits,
+    uint32_t rem,
+    const coll_t &destination) {
+    Op* const op = _get_next_op();
+    op->op = OP_SPLIT_COLLECTION2;
+    // source and destination collections
+    op->cid = _get_coll_id(cid);
+    op->dest_cid = _get_coll_id(destination);
+    // selection criteria
+    op->split_bits = bits;
+    op->split_rem = rem;
+    data.ops++;
+  }
+
+  /// Merge collection into another.
+  ///
+  /// Queues an OP_MERGE_COLLECTION from @p cid into @p destination;
+  /// @p bits is stored in the op's split_bits field.  The collections are
+  /// taken by const reference (as split_collection() does) because they
+  /// are only passed on to _get_coll_id(), so no copy is needed.
+  void merge_collection(
+    const coll_t &cid,
+    const coll_t &destination,
+    uint32_t bits) {
+    Op* _op = _get_next_op();
+    _op->op = OP_MERGE_COLLECTION;
+    _op->cid = _get_coll_id(cid);
+    _op->dest_cid = _get_coll_id(destination);
+    _op->split_bits = bits;
+    data.ops++;
+  }
+
+  /// Queue an OP_COLL_SET_BITS for @p cid; @p bits is stored in the op's
+  /// split_bits field.
+  void collection_set_bits(
+    const coll_t &cid,
+    int bits) {
+    Op* const op = _get_next_op();
+    op->op = OP_COLL_SET_BITS;
+    op->cid = _get_coll_id(cid);
+    op->split_bits = bits;
+    data.ops++;
+  }
+
+  /// Set allocation hint for an object
+  /// make 0 values(expected_object_size, expected_write_size) noops for all implementations
+  void set_alloc_hint(
+    const coll_t &cid,
+    const ghobject_t &oid,
+    uint64_t expected_object_size,
+    uint64_t expected_write_size,
+    uint32_t flags
+  ) {
+    Op* _op = _get_next_op();
+    _op->op = OP_SETALLOCHINT;
+    _op->cid = _get_coll_id(cid);
+    _op->oid = _get_object_id(oid);
+    _op->expected_object_size = expected_object_size;
+    _op->expected_write_size = expected_write_size;
+    _op->alloc_hint_flags = flags;
+    data.ops++;
+  }
+
+  void encode(bufferlist& bl) const {  // serialise the whole transaction into bl
+    // field order below is the encoded layout: data_bl, op_bl, coll_index, object_index, data
+    ENCODE_START(9, 9, bl);
+    encode(data_bl, bl);
+    encode(op_bl, bl);
+    encode(coll_index, bl);
+    encode(object_index, bl);
+    data.encode(bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator &bl) {  // reads the fields in the order encode() wrote them
+    DECODE_START(9, bl);
+    DECODE_OLDEST(9);
+
+    decode(data_bl, bl);
+    decode(op_bl, bl);
+    decode(coll_index, bl);
+    decode(object_index, bl);
+    data.decode(bl);
+    coll_id = coll_index.size();  // next id _get_coll_id() will hand out
+    object_id = object_index.size();  // next id _get_object_id() will hand out
+
+    DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter *f);
+};
+}
diff --git a/src/crimson/os/cyan_collection.cc b/src/crimson/os/cyan_collection.cc
new file mode 100644 (file)
index 0000000..82403db
--- /dev/null
@@ -0,0 +1,76 @@
+#include "cyan_collection.h"
+
+#include "cyan_object.h"
+
+namespace ceph::os
+{
+
+// Build an empty collection identified by c.
+Collection::Collection(const coll_t& c)
+  : cid(c)
+{
+}
+
+Collection::~Collection() = default;
+
+// Allocate a fresh Object and return it wrapped in an ObjectRef.
+// It is not registered in object_hash/object_map here.
+Collection::ObjectRef Collection::create_object() const
+{
+  ObjectRef fresh{new ceph::os::Object{}};
+  return fresh;
+}
+
+// Look oid up in object_hash; returns an empty ObjectRef when it is absent.
+Collection::ObjectRef Collection::get_object(ghobject_t oid)
+{
+  const auto found = object_hash.find(oid);
+  if (found != object_hash.end()) {
+    return found->second;
+  }
+  return ObjectRef();
+}
+
+// Return the object stored under oid, creating it and registering it in
+// both object_hash and object_map when it does not exist yet.
+Collection::ObjectRef Collection::get_or_create_object(ghobject_t oid)
+{
+  auto [slot, inserted] = object_hash.emplace(oid, ObjectRef{});
+  if (inserted) {
+    // the placeholder went in; fill it, then mirror it into the ordered map
+    slot->second = create_object();
+    object_map[oid] = slot->second;
+  }
+  return slot->second;
+}
+
+// Sum of get_size() over every object in object_map.
+uint64_t Collection::used_bytes() const
+{
+  uint64_t total = 0;
+  for (const auto& entry : object_map) {
+    total += entry.second->get_size();
+  }
+  return total;
+}
+
+// Serialise the collection: xattr, use_page_set, the object count, then
+// each (oid, object) pair in object_map order.
+void Collection::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  encode(xattr, bl);
+  encode(use_page_set, bl);
+  uint32_t object_count = object_map.size();
+  encode(object_count, bl);
+  for (const auto& entry : object_map) {
+    encode(entry.first, bl);
+    entry.second->encode(bl);
+  }
+  ENCODE_FINISH(bl);
+}
+
+// Inverse of encode(): reads xattr, use_page_set and the object count, then
+// decodes that many (oid, object) pairs into object_map and object_hash.
+void Collection::decode(bufferlist::const_iterator& p)
+{
+  DECODE_START(1, p);
+  decode(xattr, p);
+  decode(use_page_set, p);
+  uint32_t object_count;
+  decode(object_count, p);
+  for (uint32_t i = 0; i < object_count; ++i) {
+    ghobject_t oid;
+    decode(oid, p);
+    auto obj = create_object();
+    obj->decode(p);
+    // both containers share the same object
+    object_map.emplace(oid, obj);
+    object_hash.emplace(oid, obj);
+  }
+  DECODE_FINISH(p);
+}
+
+}
diff --git a/src/crimson/os/cyan_collection.h b/src/crimson/os/cyan_collection.h
new file mode 100644 (file)
index 0000000..78f1aa0
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <string>
+#include <unordered_map>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+
+#include "include/buffer.h"
+#include "osd/osd_types.h"
+
+namespace ceph::os {
+
+class Object;
+/**
+ * a collection also orders transactions
+ *
+ * Any transactions queued under a given collection will be applied in
+ * sequence.  Transactions queued under different collections may run
+ * in parallel.
+ *
+ * ObjectStore users may get collection handles with open_collection() (or,
+ * for bootstrapping a new collection, create_new_collection()).
+ */
+struct Collection : public boost::intrusive_ref_counter<
+  Collection,
+  boost::thread_unsafe_counter>
+{
+  using ObjectRef = boost::intrusive_ptr<Object>;
+  const coll_t cid;  ///< id given to the constructor
+  int bits = 0;  ///< not written by encode() nor read by decode()
+  // always use bufferlist object for testing
+  bool use_page_set = false;
+  std::unordered_map<ghobject_t, ObjectRef> object_hash;  ///< for lookup
+  std::map<ghobject_t, ObjectRef> object_map;        ///< for iteration
+  std::map<std::string,bufferptr> xattr;  ///< encoded/decoded together with the objects
+  bool exists = true;
+
+  Collection(const coll_t& c);
+  ~Collection();
+
+  ObjectRef create_object() const;  ///< new Object, not registered in the maps
+  ObjectRef get_object(ghobject_t oid);  ///< empty ref when oid is unknown
+  ObjectRef get_or_create_object(ghobject_t oid);  ///< registers a new Object when oid is unknown
+  uint64_t used_bytes() const;  ///< sum of the sizes of all objects
+
+  const coll_t &get_cid() const {
+    return cid;
+  }
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::const_iterator& p);
+};
+
+}
diff --git a/src/crimson/os/cyan_object.cc b/src/crimson/os/cyan_object.cc
new file mode 100644 (file)
index 0000000..1612ceb
--- /dev/null
@@ -0,0 +1,88 @@
+#include "cyan_object.h"
+#include "include/encoding.h"
+
+namespace ceph::os {
+
+// Length of the object's data buffer.
+size_t Object::get_size() const
+{
+  return this->data.length();
+}
+
+// Point bl at the [offset, offset + len) slice of the object data and
+// return the resulting length of bl.
+// NOTE(review): the range is not clamped to the object size here;
+// presumably callers do that before calling -- confirm.
+int Object::read(uint64_t offset, uint64_t len, bufferlist &bl)
+{
+  bl.substr_of(data, offset, len);
+  const int bytes = bl.length();
+  return bytes;
+}
+
+int Object::write(uint64_t offset, const bufferlist &src)
+{
+  unsigned len = src.length();
+  // before
+  bufferlist newdata;
+  if (get_size() >= offset) {
+    newdata.substr_of(data, 0, offset);
+  } else {
+    if (get_size()) {
+      newdata.substr_of(data, 0, get_size());
+    }
+    newdata.append_zero(offset - get_size());
+  }
+
+  newdata.append(src);
+
+  // after
+  if (get_size() > offset + len) {
+    bufferlist tail;
+    tail.substr_of(data, offset + len, get_size() - (offset + len));
+    newdata.append(tail);
+  }
+
+  data.claim(newdata);
+  return 0;
+}
+
+// Copy len bytes at srcoff of src into this object at dstoff.
+//
+// When the offsets are equal and len is the full size of src, the data
+// buffer is simply assigned; otherwise the slice goes through write().
+int Object::clone(Object *src, uint64_t srcoff, uint64_t len,
+                  uint64_t dstoff)
+{
+  const bool same_offset_full_length =
+    srcoff == dstoff && len == src->get_size();
+  if (same_offset_full_length) {
+    data = src->data;
+    return 0;
+  }
+
+  bufferlist slice;
+  slice.substr_of(src->data, srcoff, len);
+  return write(dstoff, slice);
+}
+
+// Resize the object data to exactly size bytes: cut it when it is longer,
+// zero-extend it when it is shorter.  Always returns 0.
+int Object::truncate(uint64_t size)
+{
+  const size_t old_size = get_size();
+  if (old_size > size) {
+    bufferlist kept;
+    kept.substr_of(data, 0, size);
+    data.claim(kept);
+  } else if (old_size < size) {
+    data.append_zero(size - old_size);
+  }
+  // equal sizes: nothing to do
+  return 0;
+}
+
+void Object::encode(bufferlist& bl) const {  // serialise this object into bl
+  ENCODE_START(1, 1, bl);
+  encode(data, bl);  // field order here must match decode()
+  encode(xattr, bl);
+  encode(omap_header, bl);
+  encode(omap, bl);
+  ENCODE_FINISH(bl);
+}
+
+void Object::decode(bufferlist::const_iterator& p) {  // inverse of encode()
+  DECODE_START(1, p);
+  decode(data, p);  // same field order as encode()
+  decode(xattr, p);
+  decode(omap_header, p);
+  decode(omap, p);
+  DECODE_FINISH(p);
+}
+
+}
diff --git a/src/crimson/os/cyan_object.h b/src/crimson/os/cyan_object.h
new file mode 100644 (file)
index 0000000..6846b1a
--- /dev/null
@@ -0,0 +1,38 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+// Guard against multiple inclusion: this header defines struct Object, so
+// pulling it in twice in one translation unit would be a redefinition.
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <string>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "include/buffer.h"
+
+namespace ceph::os {
+
+// An object held by the store: a data buffer plus extended attributes, an
+// omap header and omap key/value pairs.  Instances are reference counted
+// through boost::intrusive_ptr using a counter that is not thread safe.
+struct Object : public boost::intrusive_ref_counter<
+  Object,
+  boost::thread_unsafe_counter>
+{
+  using bufferlist = ceph::bufferlist;
+
+  // object payload
+  bufferlist data;
+  // extended attributes, keyed by name
+  std::map<std::string,bufferptr> xattr;
+  // omap header blob
+  bufferlist omap_header;
+  // omap key/value pairs
+  std::map<std::string,bufferlist> omap;
+
+  using Ref = boost::intrusive_ptr<Object>;
+
+  Object() = default;
+
+  // interface for object data
+
+  // size of `data` in bytes
+  size_t get_size() const;
+  // fill `bl` with [offset, offset + len) of `data`; returns bl's length
+  int read(uint64_t offset, uint64_t len, bufferlist &bl);
+  // write the bytes of `bl` into `data` at `offset`
+  int write(uint64_t offset, const bufferlist &bl);
+  // copy `len` bytes at `srcoff` of `src` to `dstoff` of this object
+  int clone(Object *src, uint64_t srcoff, uint64_t len,
+            uint64_t dstoff);
+  // shrink or zero-extend `data` to `offset` bytes
+  int truncate(uint64_t offset);
+
+  // serialization of data, xattr, omap_header and omap
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::const_iterator& p);
+};
+}
index 0d7cbdf620b0b076c6d8a48bb84b681c8b63a9c7..ab0a8288d07475da1417e88a6cb894c1772af902 100644 (file)
@@ -4,6 +4,228 @@
 
 #include "common/safe_io.h"
 
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/Transaction.h"
+
+namespace {
+  // File-local accessor for the logger of the filestore subsystem.
+  seastar::logger& logger()
+  {
+    auto& store_logger = ceph::get_logger(ceph_subsys_filestore);
+    return store_logger;
+  }
+}
+
+namespace ceph::os {
+
+using ObjectRef = boost::intrusive_ptr<Object>;
+
+// Remember the directory the store lives in; nothing is read from disk
+// until mount() is called.
+CyanStore::CyanStore(const std::string& path) : path{path}
+{
+}
+
+// Defined out of line, in the translation unit that includes the full
+// definition of Collection.
+CyanStore::~CyanStore() = default;
+
+// Load the store's state from `path`.
+//
+// The encoded set of collection ids is read from "<path>/collections".
+// Each listed collection is then read from "<path>/<collection id>",
+// decoded, added to coll_map, and its used_bytes() is added to the store's
+// running total.
+//
+// Throws std::runtime_error when a file cannot be read.  The message names
+// the file and carries both the error text and the return code produced by
+// bufferlist::read_file(), so a failed mount can be diagnosed from the
+// exception alone.
+seastar::future<> CyanStore::mount()
+{
+  // read `fn` completely, or throw with everything read_file() reported
+  auto read_all = [](const string& fn) {
+    bufferlist contents;
+    string err;
+    if (int r = contents.read_file(fn.c_str(), &err); r < 0) {
+      throw std::runtime_error(
+        fmt::format("read_file({}) failed: {} ({})", fn, err, r));
+    }
+    return contents;
+  };
+
+  bufferlist bl = read_all(path + "/collections");
+  set<coll_t> collections;
+  auto p = bl.cbegin();
+  decode(collections, p);
+
+  for (auto& coll : collections) {
+    bufferlist cbl = read_all(fmt::format("{}/{}", path, coll));
+    CollectionRef c{new Collection{coll}};
+    auto cp = cbl.cbegin();
+    c->decode(cp);
+    coll_map[coll] = c;
+    used_bytes += c->used_bytes();
+  }
+  return seastar::now();
+}
+
+// Persist the store's state under `path`.
+//
+// Every collection in coll_map is encoded and written to
+// "<path>/<collection id>"; afterwards the set of collection ids is encoded
+// and written to "<path>/collections".
+//
+// Throws std::runtime_error when a file cannot be written.  The message
+// names the file and carries the return code of bufferlist::write_file().
+seastar::future<> CyanStore::umount()
+{
+  // write `contents` to `fn`, or throw naming the file and the error code
+  auto write_all = [](bufferlist& contents, const string& fn) {
+    if (int r = contents.write_file(fn.c_str()); r < 0) {
+      throw std::runtime_error(
+        fmt::format("write_file({}) failed: {}", fn, r));
+    }
+  };
+
+  set<coll_t> collections;
+  for (auto& [col, ch] : coll_map) {
+    collections.insert(col);
+    ceph_assert(ch);
+    bufferlist cbl;
+    ch->encode(cbl);
+    write_all(cbl, fmt::format("{}/{}", path, col));
+  }
+
+  bufferlist bl;
+  encode(collections, bl);
+  write_all(bl, path + "/collections");
+  return seastar::now();
+}
+
+// Initialise an empty store under `path`.
+//
+// The "fsid" meta key must not be readable yet: if read_meta() succeeds the
+// existing value is logged and std::runtime_error("mkfs") is thrown; if it
+// fails with anything other than -ENOENT, std::runtime_error("read_meta")
+// is thrown.  Otherwise `osd_fsid` is stored under "fsid", an empty
+// collection set is written to "<path>/collections"
+// (std::runtime_error("write_file") on failure), and the "type" meta key is
+// set to "memstore".
+seastar::future<> CyanStore::mkfs(uuid_d osd_fsid)
+{
+  string existing_fsid;
+  const int meta_r = read_meta("fsid", &existing_fsid);
+  if (meta_r >= 0) {
+    logger().error("{} already has fsid {}", __func__, existing_fsid);
+    throw std::runtime_error("mkfs");
+  }
+  if (meta_r != -ENOENT) {
+    throw std::runtime_error("read_meta");
+  }
+  write_meta("fsid", fmt::format("{}", osd_fsid));
+
+  // start out with no collections at all
+  set<coll_t> no_collections;
+  bufferlist encoded;
+  encode(no_collections, encoded);
+  const string collections_fn = path + "/collections";
+  if (int r = encoded.write_file(collections_fn.c_str()); r < 0) {
+    throw std::runtime_error("write_file");
+  }
+
+  write_meta("type", "memstore");
+  return seastar::now();
+}
+
+// Allocate a Collection for `cid`, store it in new_coll_map under that id
+// (replacing whatever was stored there) and return a reference to it.  The
+// collection is not added to coll_map here.
+CyanStore::CollectionRef CyanStore::create_new_collection(const coll_t& cid)
+{
+  Collection* fresh = new Collection{cid};
+  CollectionRef& slot = new_coll_map[cid];
+  slot = fresh;
+  return slot;
+}
+
+// Look `cid` up in coll_map.  Returns the stored reference, or a
+// default-constructed (null) CollectionRef when `cid` is not present.
+CyanStore::CollectionRef CyanStore::open_collection(const coll_t& cid)
+{
+  if (auto found = coll_map.find(cid); found != coll_map.end()) {
+    return found->second;
+  }
+  return {};
+}
+
+// Return the ids of all collections in coll_map, in the map's iteration
+// order.
+std::vector<coll_t> CyanStore::list_collections()
+{
+  std::vector<coll_t> collections;
+  // the final size is known up front, so allocate once
+  collections.reserve(coll_map.size());
+  for (const auto& [cid, ch] : coll_map) {
+    (void)ch;  // only the key is needed
+    collections.push_back(cid);
+  }
+  return collections;
+}
+
+// Read up to `len` bytes of object `oid` in collection `c`, starting at
+// `offset`.
+//
+// - Throws std::runtime_error("collection does not exist") when `c` is null
+//   or its `exists` flag is not set, and
+//   std::runtime_error("object does not exist") when the collection has no
+//   such object.
+// - An `offset` at or past the end of the object yields an empty buffer.
+// - `len == 0` together with `offset == 0` reads the whole object; in every
+//   other case the length is clamped to what remains after `offset`.
+// - `op_flags` is accepted but not used.
+seastar::future<bufferlist> CyanStore::read(CollectionRef c,
+                                            const ghobject_t& oid,
+                                            uint64_t offset,
+                                            size_t len,
+                                            uint32_t op_flags)
+{
+  // open_collection() returns a null ref for an unknown collection, so
+  // reject that before `c` is dereferenced for logging
+  if (!c) {
+    throw std::runtime_error("collection does not exist");
+  }
+  logger().info("{} {} {} {}~{}",
+                __func__, c->cid, oid, offset, len);
+  if (!c->exists) {
+    throw std::runtime_error("collection does not exist");
+  }
+  ObjectRef o = c->get_object(oid);
+  if (!o) {
+    throw std::runtime_error("object does not exist");
+  }
+  const size_t size = o->get_size();
+  if (offset >= size)
+    return seastar::make_ready_future<bufferlist>();
+  size_t l = len;
+  if (l == 0 && offset == 0)  // note: len == 0 means read the entire object
+    l = size;
+  else if (l > size - offset)  // offset < size here, so this cannot wrap
+    l = size - offset;
+  bufferlist bl;
+  if (int r = o->read(offset, l, bl); r < 0) {
+    throw std::runtime_error("read");
+  }
+  return seastar::make_ready_future<bufferlist>(std::move(bl));
+}
+
+// Apply the operations recorded in `t` one after another.
+//
+// Handled ops: OP_NOP (ignored), OP_WRITE (forwarded to _write()) and
+// OP_MKCOLL (forwarded to _create_collection()).  Any other op is logged
+// and the process is aborted; a negative result from a handled op aborts
+// the process as well.  `ch` is not consulted: each op carries its own
+// collection id.
+seastar::future<> CyanStore::do_transaction(CollectionRef ch,
+                                            Transaction&& t)
+{
+  for (auto i = t.begin(); i.have_op(); ) {
+    Transaction::Op* const op = i.decode_op();
+    int r = 0;
+    switch (op->op) {
+    case Transaction::OP_NOP:
+      break;
+    case Transaction::OP_WRITE:
+      {
+        const coll_t cid = i.get_cid(op->cid);
+        const ghobject_t oid = i.get_oid(op->oid);
+        const uint64_t write_off = op->off;
+        const uint64_t write_len = op->len;
+        const uint32_t fadvise_flags = i.get_fadvise_flags();
+        // the payload follows the op in the transaction's data stream
+        bufferlist payload;
+        i.decode_bl(payload);
+        r = _write(cid, oid, write_off, write_len, payload, fadvise_flags);
+      }
+      break;
+    case Transaction::OP_MKCOLL:
+      {
+        const coll_t cid = i.get_cid(op->cid);
+        r = _create_collection(cid, op->split_bits);
+      }
+      break;
+    default:
+      logger().error("bad op {}", static_cast<unsigned>(op->op));
+      abort();
+    }
+    if (r < 0) {
+      abort();
+    }
+  }
+  return seastar::now();
+}
+
+// Write `bl` (which must be exactly `len` bytes long) to object `oid` of
+// collection `cid` at `offset`, creating the object if it does not exist.
+// used_bytes is adjusted by the change in the object's size.
+//
+// Returns -ENOENT when the collection cannot be opened, 0 otherwise.  A
+// zero-length write still creates the object but leaves its data alone.
+// `fadvise_flags` is accepted but not used.
+int CyanStore::_write(const coll_t& cid, const ghobject_t& oid,
+                       uint64_t offset, size_t len, const bufferlist& bl,
+                       uint32_t fadvise_flags)
+{
+  logger().info("{} {} {} {} ~ {}",
+                __func__, cid, oid, offset, len);
+  assert(len == bl.length());
+
+  auto c = open_collection(cid);
+  if (!c) {
+    return -ENOENT;
+  }
+
+  ObjectRef o = c->get_or_create_object(oid);
+  if (len == 0) {
+    return 0;
+  }
+  const ssize_t size_before = o->get_size();
+  o->write(offset, bl);
+  used_bytes += (o->get_size() - size_before);
+  return 0;
+}
+
+// Promote the collection staged for `cid` in new_coll_map (see
+// create_new_collection()) into coll_map and record `bits` on it.
+//
+// Returns 0 on success, -EEXIST when coll_map already has `cid`, and
+// -ENOENT when nothing is staged for `cid`.  Both maps are left untouched
+// on failure: the staged entry is looked up before coll_map is modified,
+// so no null reference is ever stored there, and the lookup result is
+// checked explicitly rather than through an assert that disappears in
+// NDEBUG builds.
+int CyanStore::_create_collection(const coll_t& cid, int bits)
+{
+  if (coll_map.count(cid) != 0)
+    return -EEXIST;
+  auto p = new_coll_map.find(cid);
+  if (p == new_coll_map.end())
+    return -ENOENT;
+  p->second->bits = bits;
+  coll_map.emplace(cid, p->second);
+  new_coll_map.erase(p);
+  return 0;
+}
+
 void CyanStore::write_meta(const std::string& key,
                            const std::string& value)
 {
@@ -16,16 +238,20 @@ void CyanStore::write_meta(const std::string& key,
   }
 }
 
-std::string CyanStore::read_meta(const std::string& key)
+int CyanStore::read_meta(const std::string& key,
+                          std::string* value)
 {
   char buf[4096];
   int r = safe_read_file(path.c_str(), key.c_str(),
                          buf, sizeof(buf));
-  if (r <= 0)
-    throw std::runtime_error{fmt::format("unable to read_meta({})", key)};
+  if (r <= 0) {
+    return r;
+  }
   // drop trailing newlines
   while (r && isspace(buf[r-1])) {
     --r;
   }
-  return std::string(buf, r);
+  *value = string{buf, static_cast<size_t>(r)};
+  return 0;
+}
 }
index 0683d4e427daf37fe6ebe4280a6c0039d629a7d7..e85b6d076b6b6d3dcf718fb82fc3bab49096c7c8 100644 (file)
@@ -4,17 +4,55 @@
 #pragma once
 
 #include <string>
+#include <unordered_map>
+#include <map>
+#include <vector>
+#include <seastar/core/future.hh>
+#include "osd/osd_types.h"
 #include "include/uuid.h"
 
+namespace ceph::os {
+
+class Collection;
+class Transaction;
+
 // a just-enough store for reading/writing the superblock
 class CyanStore {
+  using CollectionRef = boost::intrusive_ptr<Collection>;
   const std::string path;
+  std::unordered_map<coll_t, CollectionRef> coll_map;
+  std::map<coll_t,CollectionRef> new_coll_map;
+  uint64_t used_bytes = 0;
+
 public:
-  CyanStore(const std::string& path)
-    : path{path}
-  {}
+  CyanStore(const std::string& path);
+  ~CyanStore();
+
+  seastar::future<> mount();
+  seastar::future<> umount();
+
+  seastar::future<> mkfs(uuid_d osd_fsid);
+  seastar::future<bufferlist> read(CollectionRef c,
+                                  const ghobject_t& oid,
+                                  uint64_t offset,
+                                  size_t len,
+                                  uint32_t op_flags = 0);
+  CollectionRef create_new_collection(const coll_t& cid);
+  CollectionRef open_collection(const coll_t& cid);
+  std::vector<coll_t> list_collections();
+
+  seastar::future<> do_transaction(CollectionRef ch,
+                                  Transaction&& txn);
 
   void write_meta(const std::string& key,
                  const std::string& value);
-  std::string read_meta(const std::string& key);
+  int read_meta(const std::string& key, std::string* value);
+
+private:
+  int _write(const coll_t& cid, const ghobject_t& oid,
+            uint64_t offset, size_t len, const bufferlist& bl,
+            uint32_t fadvise_flags);
+  int _create_collection(const coll_t& cid, int bits);
 };
+
+}
index 72ac133c0d1bd3158b8fa98dd26acf13f462d220..82d4f2520134aee9a8024c00f7736f6c6ed1a7d1 100644 (file)
@@ -5,6 +5,10 @@
 #include "messages/MOSDMap.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/SocketMessenger.h"
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_object.h"
+#include "crimson/os/cyan_store.h"
+#include "crimson/os/Transaction.h"
 
 namespace {
   seastar::logger& logger() {
@@ -19,6 +23,7 @@ namespace {
 }
 
 using ceph::common::local_conf;
+using ceph::os::CyanStore;
 
 OSD::OSD(int id, uint32_t nonce)
   : whoami{id},
@@ -48,7 +53,7 @@ OSD::~OSD() = default;
 
 seastar::future<> OSD::mkfs(uuid_d cluster_fsid, int whoami)
 {
-  CyanStore store{local_conf().get_val<std::string>("osd_data")};
+  ceph::os::CyanStore store{local_conf().get_val<std::string>("osd_data")};
   uuid_d osd_fsid;
   osd_fsid.generate_random();
   store.write_meta("fsid", osd_fsid.to_string());
@@ -60,9 +65,12 @@ seastar::future<> OSD::mkfs(uuid_d cluster_fsid, int whoami)
 seastar::future<> OSD::start()
 {
   logger().info("start");
-  auto& conf = local_conf();
-  store = std::make_unique<CyanStore>(conf.get_val<std::string>("osd_data"));
-  return read_superblock().then([this] {
+  const auto data_path = local_conf().get_val<std::string>("osd_data");
+  store = std::make_unique<ceph::os::CyanStore>(data_path);
+  return store->mount().then([this] {
+    meta_coll = store->open_collection(coll_t::meta());
+    return read_superblock();
+  }).then([this] {
     osdmap = get_map(superblock.current_epoch);
     return client_msgr->start(&dispatchers);
   }).then([this] {
@@ -220,15 +228,22 @@ seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
   }
 }
 
+// Encode `superblock` and queue a write of the whole encoding, at offset 0,
+// to OSD_SUPERBLOCK_GOBJECT in the meta collection on transaction `t`.
+// Nothing is applied here; that happens when `t` is executed.
+void OSD::write_superblock(ceph::os::Transaction& t)
+{
+  bufferlist encoded;
+  encode(superblock, encoded);
+  const auto nbytes = encoded.length();
+  t.write(meta_coll->cid, OSD_SUPERBLOCK_GOBJECT, 0, nbytes, encoded);
+}
+
 seastar::future<> OSD::read_superblock()
 {
   // just-enough superblock so mon can ack my MOSDBoot
-  // might want to have a PurpleStore which is able to read the meta data for us.
-  string ceph_fsid = store->read_meta("ceph_fsid");
-  superblock.cluster_fsid.parse(ceph_fsid.c_str());
-  string osd_fsid = store->read_meta("fsid");
-  superblock.osd_fsid.parse(osd_fsid.c_str());
-  return seastar::now();
+  return store->read(meta_coll, OSD_SUPERBLOCK_GOBJECT, 0, 0)
+    .then([this] (bufferlist&& bl) {
+      auto p = bl.cbegin();
+      decode(superblock, p);
+      return seastar::now();
+  });
 }
 
 seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
index 9a1ada416ad47dd23455cb95ed36cba084a0c549..f565bfbf943edc237bd7f5732358e00972c9dff1 100644 (file)
@@ -8,7 +8,6 @@
 
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Dispatcher.h"
-#include "crimson/os/cyan_store.h"
 #include "crimson/osd/chained_dispatchers.h"
 #include "crimson/osd/state.h"
 
@@ -21,6 +20,11 @@ namespace ceph::net {
   class Messenger;
 }
 
+namespace ceph::os {
+  class CyanStore;
+  struct Collection;
+}
+
 class OSD : public ceph::net::Dispatcher {
   seastar::gate gate;
   seastar::timer<seastar::lowres_clock> beacon_timer;
@@ -36,7 +40,9 @@ class OSD : public ceph::net::Dispatcher {
   std::map<epoch_t, seastar::lw_shared_ptr<OSDMap>> osdmaps;
   seastar::lw_shared_ptr<OSDMap> osdmap;
   // TODO: use a wrapper for ObjectStore
-  std::unique_ptr<CyanStore> store;
+  std::unique_ptr<ceph::os::CyanStore> store;
+  using CollectionRef = boost::intrusive_ptr<ceph::os::Collection>;
+  CollectionRef meta_coll;
 
   OSDState state;