From: Samuel Just Date: Mon, 29 Apr 2019 17:11:01 +0000 (-0700) Subject: os: break Transaction of of Objectstore, use in crimson X-Git-Tag: v15.1.0~2702^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8a2ea310f5591a1b05e70ac88656342d0cd2a59c;p=ceph.git os: break Transaction of of Objectstore, use in crimson Transaction is needed in crimson, but not ObjectStore. Signed-off-by: Samuel Just --- diff --git a/src/crimson/os/CMakeLists.txt b/src/crimson/os/CMakeLists.txt index 264e35252e3e..6362148bd576 100644 --- a/src/crimson/os/CMakeLists.txt +++ b/src/crimson/os/CMakeLists.txt @@ -2,6 +2,6 @@ add_library(crimson-os cyan_store.cc cyan_collection.cc cyan_object.cc - Transaction.cc) + ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc) target_link_libraries(crimson-os crimson) diff --git a/src/crimson/os/Transaction.cc b/src/crimson/os/Transaction.cc deleted file mode 100644 index acf99c5926cd..000000000000 --- a/src/crimson/os/Transaction.cc +++ /dev/null @@ -1,507 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "Transaction.h" -#include "include/denc.h" - -namespace ceph::os -{ - -void Transaction::iterator::decode_attrset_bl(bufferlist *pbl) -{ - using ceph::decode; - auto start = data_bl_p; - __u32 n; - decode(n, data_bl_p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, data_bl_p); - data_bl_p.advance(l); - len += 4 + l; - decode(l, data_bl_p); - data_bl_p.advance(l); - len += 4 + l; - } - start.copy(len, *pbl); -} - -void Transaction::iterator::decode_keyset_bl(bufferlist *pbl) -{ - using ceph::decode; - auto start = data_bl_p; - __u32 n; - decode(n, data_bl_p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, data_bl_p); - data_bl_p.advance(l); - len += 4 + l; - } - start.copy(len, *pbl); -} - -void Transaction::dump(ceph::Formatter *f) -{ - f->open_array_section("ops"); - iterator i = begin(); - int op_num = 0; - bool stop_looping = false; - while (i.have_op() && !stop_looping) { - Transaction::Op *op = i.decode_op(); - f->open_object_section("op"); - f->dump_int("op_num", op_num); - - switch (op->op) { - case Transaction::OP_NOP: - f->dump_string("op_name", "nop"); - break; - case Transaction::OP_TOUCH: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "touch"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_WRITE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - uint64_t len = op->len; - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "write"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_unsigned("length", len); - f->dump_unsigned("offset", off); - f->dump_unsigned("bufferlist length", bl.length()); - } - break; - - case Transaction::OP_ZERO: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - uint64_t len = op->len; - f->dump_string("op_name", "zero"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_unsigned("offset", off); - f->dump_unsigned("length", len); - } - break; - - case Transaction::OP_TRIMCACHE: - { - // deprecated, no-op - f->dump_string("op_name", "trim_cache"); - } - break; - - case Transaction::OP_TRUNCATE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - f->dump_string("op_name", "truncate"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_unsigned("offset", off); - } - break; - - case Transaction::OP_REMOVE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "remove"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_SETATTR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string name = i.decode_string(); - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "setattr"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_string("name", name); - f->dump_unsigned("length", bl.length()); - } - break; - - case Transaction::OP_SETATTRS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - map aset; - i.decode_attrset(aset); - f->dump_string("op_name", "setattrs"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->open_object_section("attr_lens"); - for (map::iterator p = aset.begin(); - p != aset.end(); ++p) { - f->dump_unsigned(p->first.c_str(), p->second.length()); - } - f->close_section(); - } - break; - - case Transaction::OP_RMATTR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string name = i.decode_string(); - f->dump_string("op_name", "rmattr"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_string("name", name); - } - break; - - case Transaction::OP_RMATTRS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "rmattrs"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_CLONE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - f->dump_string("op_name", "clone"); - f->dump_stream("collection") << cid; - f->dump_stream("src_oid") << oid; - f->dump_stream("dst_oid") << noid; - } - break; - - case Transaction::OP_CLONERANGE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - uint64_t off = op->off; - uint64_t len = op->len; - f->dump_string("op_name", "clonerange"); - f->dump_stream("collection") << cid; - f->dump_stream("src_oid") << oid; - f->dump_stream("dst_oid") << noid; - f->dump_unsigned("offset", off); - f->dump_unsigned("len", len); - } - break; - - case Transaction::OP_CLONERANGE2: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - uint64_t srcoff = op->off; - uint64_t len = op->len; - uint64_t dstoff = op->dest_off; - f->dump_string("op_name", "clonerange2"); - f->dump_stream("collection") << cid; - f->dump_stream("src_oid") << oid; - f->dump_stream("dst_oid") << noid; - f->dump_unsigned("src_offset", srcoff); - f->dump_unsigned("len", len); - f->dump_unsigned("dst_offset", dstoff); - } - break; - - case Transaction::OP_MKCOLL: - { - coll_t cid = i.get_cid(op->cid); - f->dump_string("op_name", "mkcoll"); - f->dump_stream("collection") << cid; - } - break; - - case Transaction::OP_COLL_HINT: - { - using ceph::decode; - coll_t cid = i.get_cid(op->cid); - uint32_t type = op->hint_type; - f->dump_string("op_name", "coll_hint"); - f->dump_stream("collection") << cid; - f->dump_unsigned("type", type); - bufferlist hint; - i.decode_bl(hint); - auto hiter = hint.cbegin(); - if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) { - uint32_t pg_num; - uint64_t num_objs; - decode(pg_num, hiter); - decode(num_objs, hiter); - f->dump_unsigned("pg_num", pg_num); - f->dump_unsigned("expected_num_objects", num_objs); - } - } - break; - - case Transaction::OP_COLL_SET_BITS: - { - coll_t cid = i.get_cid(op->cid); - f->dump_string("op_name", "coll_set_bits"); - f->dump_stream("collection") << cid; - f->dump_unsigned("bits", op->split_bits); - } - break; - - case Transaction::OP_RMCOLL: - { - coll_t cid = i.get_cid(op->cid); - f->dump_string("op_name", "rmcoll"); - f->dump_stream("collection") << cid; - } - break; - - case Transaction::OP_COLL_ADD: - { - coll_t ocid = i.get_cid(op->cid); - coll_t ncid = i.get_cid(op->dest_cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "collection_add"); - f->dump_stream("src_collection") << ocid; - f->dump_stream("dst_collection") << ncid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_COLL_REMOVE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "collection_remove"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_COLL_MOVE: - { - coll_t ocid = i.get_cid(op->cid); - coll_t ncid = i.get_cid(op->dest_cid); - ghobject_t oid = i.get_oid(op->oid); - f->open_object_section("collection_move"); - f->dump_stream("src_collection") << ocid; - f->dump_stream("dst_collection") << ncid; - f->dump_stream("oid") << oid; - f->close_section(); - } - break; - - case Transaction::OP_COLL_SETATTR: - { - coll_t cid = i.get_cid(op->cid); - string name = i.decode_string(); - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "collection_setattr"); - f->dump_stream("collection") << cid; - f->dump_string("name", name); - f->dump_unsigned("length", bl.length()); - } - break; - - case Transaction::OP_COLL_RMATTR: - { - coll_t cid = i.get_cid(op->cid); - string name = i.decode_string(); - f->dump_string("op_name", "collection_rmattr"); - f->dump_stream("collection") << cid; - f->dump_string("name", name); - } - break; - - case Transaction::OP_COLL_RENAME: - { - f->dump_string("op_name", "collection_rename"); - } - break; - - case Transaction::OP_OMAP_CLEAR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "omap_clear"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_OMAP_SETKEYS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - map aset; - i.decode_attrset(aset); - f->dump_string("op_name", "omap_setkeys"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->open_object_section("attr_lens"); - for (map::iterator p = aset.begin(); - p != aset.end(); ++p) { - f->dump_unsigned(p->first.c_str(), p->second.length()); - } - f->close_section(); - } - break; - - case Transaction::OP_OMAP_RMKEYS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - set keys; - i.decode_keyset(keys); - f->dump_string("op_name", "omap_rmkeys"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->open_array_section("attrs"); - for (auto& k : keys) { - f->dump_string("", k.c_str()); - } - f->close_section(); - } - break; - - case Transaction::OP_OMAP_SETHEADER: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "omap_setheader"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_stream("header_length") << bl.length(); - } - break; - - case Transaction::OP_SPLIT_COLLECTION: - { - coll_t cid = i.get_cid(op->cid); - uint32_t bits = op->split_bits; - uint32_t rem = op->split_rem; - coll_t dest = i.get_cid(op->dest_cid); - f->dump_string("op_name", "op_split_collection_create"); - f->dump_stream("collection") << cid; - f->dump_stream("bits") << bits; - f->dump_stream("rem") << rem; - f->dump_stream("dest") << dest; - } - break; - - case Transaction::OP_SPLIT_COLLECTION2: - { - coll_t cid = i.get_cid(op->cid); - uint32_t bits = op->split_bits; - uint32_t rem = op->split_rem; - coll_t dest = i.get_cid(op->dest_cid); - f->dump_string("op_name", "op_split_collection"); - f->dump_stream("collection") << cid; - f->dump_stream("bits") << bits; - f->dump_stream("rem") << rem; - f->dump_stream("dest") << dest; - } - break; - - case Transaction::OP_MERGE_COLLECTION: - { - coll_t cid = i.get_cid(op->cid); - uint32_t bits = op->split_bits; - coll_t dest = i.get_cid(op->dest_cid); - f->dump_string("op_name", "op_merge_collection"); - f->dump_stream("collection") << cid; - f->dump_stream("dest") << dest; - f->dump_stream("bits") << bits; - } - break; - - case Transaction::OP_OMAP_RMKEYRANGE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string first, last; - first = i.decode_string(); - last = i.decode_string(); - f->dump_string("op_name", "op_omap_rmkeyrange"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_string("first", first); - f->dump_string("last", last); - } - break; - - case Transaction::OP_COLL_MOVE_RENAME: - { - coll_t old_cid = i.get_cid(op->cid); - ghobject_t old_oid = i.get_oid(op->oid); - coll_t new_cid = i.get_cid(op->dest_cid); - ghobject_t new_oid = i.get_oid(op->dest_oid); - f->dump_string("op_name", "op_coll_move_rename"); - f->dump_stream("old_collection") << old_cid; - f->dump_stream("old_oid") << old_oid; - f->dump_stream("new_collection") << new_cid; - f->dump_stream("new_oid") << new_oid; - } - break; - - case Transaction::OP_TRY_RENAME: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t old_oid = i.get_oid(op->oid); - ghobject_t new_oid = i.get_oid(op->dest_oid); - f->dump_string("op_name", "op_coll_move_rename"); - f->dump_stream("collection") << cid; - f->dump_stream("old_oid") << old_oid; - f->dump_stream("new_oid") << new_oid; - } - break; - - case Transaction::OP_SETALLOCHINT: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t expected_object_size = op->expected_object_size; - uint64_t expected_write_size = op->expected_write_size; - f->dump_string("op_name", "op_setallochint"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_stream("expected_object_size") << expected_object_size; - f->dump_stream("expected_write_size") << expected_write_size; - } - break; - - default: - f->dump_string("op_name", "unknown"); - f->dump_unsigned("op_code", op->op); - stop_looping = true; - break; - } - f->close_section(); - op_num++; - } - f->close_section(); -} - -} diff --git a/src/crimson/os/Transaction.h b/src/crimson/os/Transaction.h deleted file mode 100644 index 3187bad706b7..000000000000 --- a/src/crimson/os/Transaction.h +++ /dev/null @@ -1,1236 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#pragma once - -#include - -#include "include/int_types.h" -#include "include/buffer.h" -#include "osd/osd_types.h" - -/********************************* - * transaction - * - * A Transaction represents a sequence of primitive mutation - * operations. - * - * Three events in the life of a Transaction result in - * callbacks. Any Transaction can contain any number of callback - * objects (Context) for any combination of the three classes of - * callbacks: - * - * on_applied_sync, on_applied, and on_commit. - * - * The "on_applied" and "on_applied_sync" callbacks are invoked when - * the modifications requested by the Transaction are visible to - * subsequent ObjectStore operations, i.e., the results are - * readable. The only conceptual difference between on_applied and - * on_applied_sync is the specific thread and locking environment in - * which the callbacks operate. "on_applied_sync" is called - * directly by an ObjectStore execution thread. It is expected to - * execute quickly and must not acquire any locks of the calling - * environment. Conversely, "on_applied" is called from the separate - * Finisher thread, meaning that it can contend for calling - * environment locks. NB, on_applied and on_applied_sync are - * sometimes called on_readable and on_readable_sync. - * - * The "on_commit" callback is also called from the Finisher thread - * and indicates that all of the mutations have been durably - * committed to stable storage (i.e., are now software/hardware - * crashproof). - * - * At the implementation level, each mutation primitive (and its - * associated data) can be serialized to a single buffer. That - * serialization, however, does not copy any data, but (using the - * bufferlist library) will reference the original buffers. This - * implies that the buffer that contains the data being submitted - * must remain stable until the on_commit callback completes. In - * practice, bufferlist handles all of this for you and this - * subtlety is only relevant if you are referencing an existing - * buffer via buffer::raw_static. - * - * Some implementations of ObjectStore choose to implement their own - * form of journaling that uses the serialized form of a - * Transaction. This requires that the encode/decode logic properly - * version itself and handle version upgrades that might change the - * format of the encoded Transaction. This has already happened a - * couple of times and the Transaction object contains some helper - * variables that aid in this legacy decoding: - * - * sobject_encoding detects an older/simpler version of oid - * present in pre-bobtail versions of ceph. use_pool_override - * also detects a situation where the pool of an oid can be - * overridden for legacy operations/buffers. For non-legacy - * implementations of ObjectStore, neither of these fields are - * relevant. - * - * - * TRANSACTION ISOLATION - * - * Except as noted above, isolation is the responsibility of the - * caller. In other words, if any storage element (storage element - * == any of the four portions of an object as described above) is - * altered by a transaction (including deletion), the caller - * promises not to attempt to read that element while the - * transaction is pending (here pending means from the time of - * issuance until the "on_applied_sync" callback has been - * received). Violations of isolation need not be detected by - * ObjectStore and there is no corresponding error mechanism for - * reporting an isolation violation (crashing would be the - * appropriate way to report an isolation violation if detected). - * - * Enumeration operations may violate transaction isolation as - * described above when a storage element is being created or - * deleted as part of a transaction. In this case, ObjectStore is - * allowed to consider the enumeration operation to either precede - * or follow the violating transaction element. In other words, the - * presence/absence of the mutated element in the enumeration is - * entirely at the discretion of ObjectStore. The arbitrary ordering - * applies independently to each transaction element. For example, - * if a transaction contains two mutating elements "create A" and - * "delete B". And an enumeration operation is performed while this - * transaction is pending. It is permissible for ObjectStore to - * report any of the four possible combinations of the existence of - * A and B. - * - */ -namespace ceph::os { -class Transaction { -public: - enum { - OP_NOP = 0, - OP_TOUCH = 9, // cid, oid - OP_WRITE = 10, // cid, oid, offset, len, bl - OP_ZERO = 11, // cid, oid, offset, len - OP_TRUNCATE = 12, // cid, oid, len - OP_REMOVE = 13, // cid, oid - OP_SETATTR = 14, // cid, oid, attrname, bl - OP_SETATTRS = 15, // cid, oid, attrset - OP_RMATTR = 16, // cid, oid, attrname - OP_CLONE = 17, // cid, oid, newoid - OP_CLONERANGE = 18, // cid, oid, newoid, offset, len - OP_CLONERANGE2 = 30, // cid, oid, newoid, srcoff, len, dstoff - - OP_TRIMCACHE = 19, // cid, oid, offset, len **DEPRECATED** - - OP_MKCOLL = 20, // cid - OP_RMCOLL = 21, // cid - OP_COLL_ADD = 22, // cid, oldcid, oid - OP_COLL_REMOVE = 23, // cid, oid - OP_COLL_SETATTR = 24, // cid, attrname, bl - OP_COLL_RMATTR = 25, // cid, attrname - OP_COLL_SETATTRS = 26, // cid, attrset - OP_COLL_MOVE = 8, // newcid, oldcid, oid - - OP_RMATTRS = 28, // cid, oid - OP_COLL_RENAME = 29, // cid, newcid - - OP_OMAP_CLEAR = 31, // cid - OP_OMAP_SETKEYS = 32, // cid, attrset - OP_OMAP_RMKEYS = 33, // cid, keyset - OP_OMAP_SETHEADER = 34, // cid, header - OP_SPLIT_COLLECTION = 35, // cid, bits, destination - OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination - doesn't create the destination */ - OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey - OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid - OP_SETALLOCHINT = 39, // cid, oid, object_size, write_size - OP_COLL_HINT = 40, // cid, type, bl - - OP_TRY_RENAME = 41, // oldcid, oldoid, newoid - - OP_COLL_SET_BITS = 42, // cid, bits - - OP_MERGE_COLLECTION = 43, // cid, destination - }; - - // Transaction hint type - enum { - COLL_HINT_EXPECTED_NUM_OBJECTS = 1, - }; - - struct Op { - __le32 op; - __le32 cid; - __le32 oid; - __le64 off; - __le64 len; - __le32 dest_cid; - __le32 dest_oid; //OP_CLONE, OP_CLONERANGE - __le64 dest_off; //OP_CLONERANGE - union { - struct { - __le32 hint_type; //OP_COLL_HINT - }; - struct { - __le32 alloc_hint_flags; //OP_SETALLOCHINT - }; - }; - __le64 expected_object_size; //OP_SETALLOCHINT - __le64 expected_write_size; //OP_SETALLOCHINT - __le32 split_bits; //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS, - //OP_MKCOLL - __le32 split_rem; //OP_SPLIT_COLLECTION2 - } __attribute__ ((packed)) ; - - struct TransactionData { - __le64 ops; - __le32 largest_data_len; - __le32 largest_data_off; - __le32 largest_data_off_in_data_bl; - __le32 fadvise_flags; - - TransactionData() noexcept : - ops(0), - largest_data_len(0), - largest_data_off(0), - largest_data_off_in_data_bl(0), - fadvise_flags(0) { } - - // override default move operations to reset default values - TransactionData(TransactionData&& other) noexcept : - ops(other.ops), - largest_data_len(other.largest_data_len), - largest_data_off(other.largest_data_off), - largest_data_off_in_data_bl(other.largest_data_off_in_data_bl), - fadvise_flags(other.fadvise_flags) { - other.ops = 0; - other.largest_data_len = 0; - other.largest_data_off = 0; - other.largest_data_off_in_data_bl = 0; - other.fadvise_flags = 0; - } - TransactionData& operator=(TransactionData&& other) noexcept { - ops = other.ops; - largest_data_len = other.largest_data_len; - largest_data_off = other.largest_data_off; - largest_data_off_in_data_bl = other.largest_data_off_in_data_bl; - fadvise_flags = other.fadvise_flags; - other.ops = 0; - other.largest_data_len = 0; - other.largest_data_off = 0; - other.largest_data_off_in_data_bl = 0; - other.fadvise_flags = 0; - return *this; - } - - TransactionData(const TransactionData& other) = default; - TransactionData& operator=(const TransactionData& other) = default; - - void encode(bufferlist& bl) const { - bl.append((char*)this, sizeof(TransactionData)); - } - void decode(bufferlist::const_iterator &bl) { - bl.copy(sizeof(TransactionData), (char*)this); - } - } __attribute__ ((packed)) ; - -private: - TransactionData data; - - std::map coll_index; - std::map object_index; - - __le32 coll_id {0}; - __le32 object_id {0}; - - bufferlist data_bl; - bufferlist op_bl; - - std::list on_applied; - std::list on_commit; - std::list on_applied_sync; - -public: - Transaction() = default; - - explicit Transaction(bufferlist::const_iterator &dp) { - decode(dp); - } - explicit Transaction(bufferlist &nbl) { - auto dp = nbl.cbegin(); - decode(dp); - } - - // override default move operations to reset default values - Transaction(Transaction&& other) noexcept : - data(std::move(other.data)), - coll_index(std::move(other.coll_index)), - object_index(std::move(other.object_index)), - coll_id(other.coll_id), - object_id(other.object_id), - data_bl(std::move(other.data_bl)), - op_bl(std::move(other.op_bl)), - on_applied(std::move(other.on_applied)), - on_commit(std::move(other.on_commit)), - on_applied_sync(std::move(other.on_applied_sync)) { - other.coll_id = 0; - other.object_id = 0; - } - - Transaction& operator=(Transaction&& other) noexcept { - data = std::move(other.data); - coll_index = std::move(other.coll_index); - object_index = std::move(other.object_index); - coll_id = other.coll_id; - object_id = other.object_id; - data_bl = std::move(other.data_bl); - op_bl = std::move(other.op_bl); - on_applied = std::move(other.on_applied); - on_commit = std::move(other.on_commit); - on_applied_sync = std::move(other.on_applied_sync); - other.coll_id = 0; - other.object_id = 0; - return *this; - } - - Transaction(const Transaction& other) = default; - Transaction& operator=(const Transaction& other) = default; - - // expose object_index for FileStore::Op's benefit - const map& get_object_index() const { - return object_index; - } - - /* Operations on callback contexts */ - void register_on_applied(Context *c) { - if (!c) return; - on_applied.push_back(c); - } - void register_on_commit(Context *c) { - if (!c) return; - on_commit.push_back(c); - } - void register_on_applied_sync(Context *c) { - if (!c) return; - on_applied_sync.push_back(c); - } - void register_on_complete(Context *c) { - if (!c) return; - RunOnDeleteRef _complete (std::make_shared(c)); - register_on_applied(new ContainerContext(_complete)); - register_on_commit(new ContainerContext(_complete)); - } - bool has_contexts() const { - return - !on_commit.empty() || - !on_applied.empty() || - !on_applied_sync.empty(); - } - - static void collect_contexts(vector& t, - Context **out_on_applied, - Context **out_on_commit, - Context **out_on_applied_sync) { - ceph_assert(out_on_applied); - ceph_assert(out_on_commit); - ceph_assert(out_on_applied_sync); - std::list on_applied, on_commit, on_applied_sync; - for (auto& i : t) { - on_applied.splice(on_applied.end(), i.on_applied); - on_commit.splice(on_commit.end(), i.on_commit); - on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); - } - *out_on_applied = C_Contexts::list_to_context(on_applied); - *out_on_commit = C_Contexts::list_to_context(on_commit); - *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); - } - static void collect_contexts(vector& t, - std::list *out_on_applied, - std::list *out_on_commit, - std::list *out_on_applied_sync) { - ceph_assert(out_on_applied); - ceph_assert(out_on_commit); - ceph_assert(out_on_applied_sync); - for (auto& i : t) { - out_on_applied->splice(out_on_applied->end(), i.on_applied); - out_on_commit->splice(out_on_commit->end(), i.on_commit); - out_on_applied_sync->splice(out_on_applied_sync->end(), - i.on_applied_sync); - } - } - - Context *get_on_applied() { - return C_Contexts::list_to_context(on_applied); - } - Context *get_on_commit() { - return C_Contexts::list_to_context(on_commit); - } - Context *get_on_applied_sync() { - return C_Contexts::list_to_context(on_applied_sync); - } - - void set_fadvise_flags(uint32_t flags) { - data.fadvise_flags = flags; - } - void set_fadvise_flag(uint32_t flag) { - data.fadvise_flags = data.fadvise_flags | flag; - } - uint32_t get_fadvise_flags() { return data.fadvise_flags; } - - void swap(Transaction& other) noexcept { - std::swap(data, other.data); - std::swap(on_applied, other.on_applied); - std::swap(on_commit, other.on_commit); - std::swap(on_applied_sync, other.on_applied_sync); - - std::swap(coll_index, other.coll_index); - std::swap(object_index, other.object_index); - std::swap(coll_id, other.coll_id); - std::swap(object_id, other.object_id); - op_bl.swap(other.op_bl); - data_bl.swap(other.data_bl); - } - - void _update_op(Op* op, - vector<__le32> &cm, - vector<__le32> &om) { - - switch (op->op) { - case OP_NOP: - break; - - case OP_TOUCH: - case OP_REMOVE: - case OP_SETATTR: - case OP_SETATTRS: - case OP_RMATTR: - case OP_RMATTRS: - case OP_COLL_REMOVE: - case OP_OMAP_CLEAR: - case OP_OMAP_SETKEYS: - case OP_OMAP_RMKEYS: - case OP_OMAP_RMKEYRANGE: - case OP_OMAP_SETHEADER: - case OP_WRITE: - case OP_ZERO: - case OP_TRUNCATE: - case OP_SETALLOCHINT: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - break; - - case OP_CLONERANGE2: - case OP_CLONE: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_MKCOLL: - case OP_RMCOLL: - case OP_COLL_SETATTR: - case OP_COLL_RMATTR: - case OP_COLL_SETATTRS: - case OP_COLL_HINT: - case OP_COLL_SET_BITS: - ceph_assert(op->cid < cm.size()); - op->cid = cm[op->cid]; - break; - - case OP_COLL_ADD: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_cid < om.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - op->oid = om[op->oid]; - break; - - case OP_COLL_MOVE_RENAME: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_cid < cm.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_cid = cm[op->dest_cid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_TRY_RENAME: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_SPLIT_COLLECTION2: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - break; - - case OP_MERGE_COLLECTION: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - break; - - default: - ceph_abort_msg("Unknown OP"); - } - } - void _update_op_bl( - bufferlist& bl, - vector<__le32> &cm, - vector<__le32> &om) { - for (auto& bp : bl.buffers()) { - ceph_assert(bp.length() % sizeof(Op) == 0); - - char* raw_p = const_cast(bp.c_str()); - char* raw_end = raw_p + bp.length(); - while (raw_p < raw_end) { - _update_op(reinterpret_cast(raw_p), cm, om); - raw_p += sizeof(Op); - } - } - } - /// Append the operations of the parameter to this Transaction. Those - /// operations are removed from the parameter Transaction - void append(Transaction& other) { - - data.ops += other.data.ops; - if (other.data.largest_data_len > data.largest_data_len) { - data.largest_data_len = other.data.largest_data_len; - data.largest_data_off = other.data.largest_data_off; - data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; - } - data.fadvise_flags |= other.data.fadvise_flags; - on_applied.splice(on_applied.end(), other.on_applied); - on_commit.splice(on_commit.end(), other.on_commit); - on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); - - //append coll_index & object_index - vector<__le32> cm(other.coll_index.size()); - map::iterator coll_index_p; - for (coll_index_p = other.coll_index.begin(); - coll_index_p != other.coll_index.end(); - ++coll_index_p) { - cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); - } - - vector<__le32> om(other.object_index.size()); - map::iterator object_index_p; - for (object_index_p = other.object_index.begin(); - object_index_p != other.object_index.end(); - ++object_index_p) { - om[object_index_p->second] = _get_object_id(object_index_p->first); - } - - //the other.op_bl SHOULD NOT be changes during append operation, - //we use additional bufferlist to avoid this problem - bufferlist other_op_bl; - { - bufferptr other_op_bl_ptr(other.op_bl.length()); - other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str()); - other_op_bl.append(std::move(other_op_bl_ptr)); - } - - //update other_op_bl with cm & om - //When the other is appended to current transaction, all coll_index and - //object_index in other.op_buffer should be updated by new index of the - //combined transaction - _update_op_bl(other_op_bl, cm, om); - - //append op_bl - op_bl.append(other_op_bl); - //append data_bl - data_bl.append(other.data_bl); - } - - /** Inquires about the Transaction as a whole. */ - - /// How big is the encoded Transaction buffer? - uint64_t get_encoded_bytes() { - //layout: data_bl + op_bl + coll_index + object_index + data - - // coll_index size, object_index size and sizeof(transaction_data) - // all here, so they may be computed at compile-time - size_t final_size = sizeof(__u32) * 2 + sizeof(data); - - // coll_index second and object_index second - final_size += (coll_index.size() + object_index.size()) * sizeof(__le32); - - // coll_index first - for (auto p = coll_index.begin(); p != coll_index.end(); ++p) { - final_size += p->first.encoded_size(); - } - - // object_index first - for (auto p = object_index.begin(); p != object_index.end(); ++p) { - final_size += p->first.encoded_size(); - } - - return data_bl.length() + - op_bl.length() + - final_size; - } - - /// Retain old version for regression testing purposes - uint64_t get_encoded_bytes_test() { - using ceph::encode; - //layout: data_bl + op_bl + coll_index + object_index + data - bufferlist bl; - encode(coll_index, bl); - encode(object_index, bl); - - return data_bl.length() + - op_bl.length() + - bl.length() + - sizeof(data); - } - - uint64_t get_num_bytes() { - return get_encoded_bytes(); - } - /// Size of largest data buffer to the "write" operation encountered so far - uint32_t get_data_length() { - return data.largest_data_len; - } - /// offset within the encoded buffer to the start of the largest data buffer - /// that's encoded - uint32_t get_data_offset() { - if (data.largest_data_off_in_data_bl) { - return data.largest_data_off_in_data_bl + - sizeof(__u8) + // encode struct_v - sizeof(__u8) + // encode compat_v - sizeof(__u32) + // encode len - sizeof(__u32); // data_bl len - } - return 0; // none - } - /// offset of buffer as aligned to destination within object. - int get_data_alignment() { - if (!data.largest_data_len) - return 0; - return (0 - get_data_offset()) & ~CEPH_PAGE_MASK; - } - /// Is the Transaction empty (no operations) - bool empty() { - return !data.ops; - } - /// Number of operations in the transaction - int get_num_ops() { - return data.ops; - } - - /** - * iterator - * - * Helper object to parse Transactions. - * - * ObjectStore instances use this object to step down the encoded - * buffer decoding operation codes and parameters as we go. - * - */ - class iterator { - Transaction *t; - - uint64_t ops; - char* op_buffer_p; - - bufferlist::const_iterator data_bl_p; - - public: - vector colls; - vector objects; - - private: - explicit iterator(Transaction *t) - : t(t), - data_bl_p(t->data_bl.cbegin()), - colls(t->coll_index.size()), - objects(t->object_index.size()) { - - ops = t->data.ops; - op_buffer_p = t->op_bl.c_str(); - - map::iterator coll_index_p; - for (coll_index_p = t->coll_index.begin(); - coll_index_p != t->coll_index.end(); - ++coll_index_p) { - colls[coll_index_p->second] = coll_index_p->first; - } - - map::iterator object_index_p; - for (object_index_p = t->object_index.begin(); - object_index_p != t->object_index.end(); - ++object_index_p) { - objects[object_index_p->second] = object_index_p->first; - } - } - - friend class Transaction; - - public: - - bool have_op() { - return ops > 0; - } - Op* decode_op() { - ceph_assert(ops > 0); - - Op* op = reinterpret_cast(op_buffer_p); - op_buffer_p += sizeof(Op); - ops--; - - return op; - } - string decode_string() { - using ceph::decode; - string s; - decode(s, data_bl_p); - return s; - } - void decode_bp(bufferptr& bp) { - using ceph::decode; - decode(bp, data_bl_p); - } - void decode_bl(bufferlist& bl) { - using ceph::decode; - decode(bl, data_bl_p); - } - void decode_attrset(map& aset) { - using ceph::decode; - decode(aset, data_bl_p); - } - void decode_attrset(map& aset) { - using ceph::decode; - decode(aset, data_bl_p); - } - void decode_attrset_bl(bufferlist *pbl); - void decode_keyset(set &keys){ - using ceph::decode; - decode(keys, data_bl_p); - } - void decode_keyset_bl(bufferlist *pbl); - - const ghobject_t &get_oid(__le32 oid_id) { - ceph_assert(oid_id < objects.size()); - return objects[oid_id]; - } - const coll_t &get_cid(__le32 cid_id) { - ceph_assert(cid_id < colls.size()); - return colls[cid_id]; - } - uint32_t get_fadvise_flags() const { - return t->get_fadvise_flags(); - } - }; - - iterator begin() { - return iterator(this); - } - -private: - void _build_actions_from_tbl(); - - static constexpr size_t OPS_PER_PTR = 32u; - /** - * Helper functions to encode the various mutation elements of a - * transaction. These are 1:1 with the operation codes (see - * enumeration above). These routines ensure that the - * encoder/creator of a transaction gets the right data in the - * right place. Sadly, there's no corresponding version nor any - * form of seat belts for the decoder. - */ - Op* _get_next_op() { - if (op_bl.get_append_buffer_unused_tail_length() < sizeof(Op)) { - op_bl.reserve(sizeof(Op) * OPS_PER_PTR); - } - // append_hole ensures bptr merging. Even huge number of ops - // shouldn't result in overpopulating bl::_buffers. - char* const p = op_bl.append_hole(sizeof(Op)).c_str(); - memset(p, 0, sizeof(Op)); - return reinterpret_cast(p); - } - __le32 _get_coll_id(const coll_t& coll) { - map::iterator c = coll_index.find(coll); - if (c != coll_index.end()) - return c->second; - - __le32 index_id = coll_id++; - coll_index[coll] = index_id; - return index_id; - } - __le32 _get_object_id(const ghobject_t& oid) { - map::iterator o = object_index.find(oid); - if (o != object_index.end()) - return o->second; - - __le32 index_id = object_id++; - object_index[oid] = index_id; - return index_id; - } - -public: - /// noop. 'nuf said - void nop() { - Op* _op = _get_next_op(); - _op->op = OP_NOP; - data.ops++; - } - /** - * touch - * - * Ensure the existance of an object in a collection. Create an - * empty object if necessary - */ - void touch(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_TOUCH; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /** - * Write data to an offset within an object. If the object is too - * small, it is expanded as needed. It is possible to specify an - * offset beyond the current end of an object and it will be - * expanded as needed. Simple implementations of ObjectStore will - * just zero the data between the old end of the object and the - * newly provided data. More sophisticated implementations of - * ObjectStore will omit the untouched data and store it as a - * "hole" in the file. - * - * Note that a 0-length write does not affect the size of the object. - */ - void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len, - const bufferlist& write_data, uint32_t flags = 0) { - using ceph::encode; - uint32_t orig_len = data_bl.length(); - Op* _op = _get_next_op(); - _op->op = OP_WRITE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - _op->len = len; - encode(write_data, data_bl); - - ceph_assert(len == write_data.length()); - data.fadvise_flags = data.fadvise_flags | flags; - if (write_data.length() > data.largest_data_len) { - data.largest_data_len = write_data.length(); - data.largest_data_off = off; - // we are about to - data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); - } - data.ops++; - } - /** - * zero out the indicated byte range within an object. Some - * ObjectStore instances may optimize this to release the - * underlying storage space. - * - * If the zero range extends beyond the end of the object, the object - * size is extended, just as if we were writing a buffer full of zeros. - * EXCEPT if the length is 0, in which case (just like a 0-length write) - * we do not adjust the object size. - */ - void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) { - Op* _op = _get_next_op(); - _op->op = OP_ZERO; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - _op->len = len; - data.ops++; - } - /// Discard all data in the object beyond the specified size. - void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) { - Op* _op = _get_next_op(); - _op->op = OP_TRUNCATE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - data.ops++; - } - /// Remove an object. All four parts of the object are removed. - void remove(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_REMOVE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, - const char* name, - bufferlist&& val) { - string n(name); - setattr(cid, oid, n, val); - } - /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, - const string& s, bufferlist& val) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(s, data_bl); - encode(val, data_bl); - data.ops++; - } - /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, - const map& attrset) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, - const map& attrset) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - /// remove an xattr from an object - void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) { - string n(name); - rmattr(cid, oid, n); - } - /// remove an xattr from an object - void rmattr(const coll_t& cid, const ghobject_t& oid, const string& s) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_RMATTR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(s, data_bl); - data.ops++; - } - /// remove all xattrs from an object - void rmattrs(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_RMATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /** - * Clone an object into another object. - * - * Low-cost (e.g., O(1)) cloning (if supported) is best, but - * fallback to an O(n) copy is allowed. All four parts of the - * object are cloned (data, xattrs, omap header, omap - * entries). - * - * The destination named object may already exist, in - * which case its previous contents are discarded. - */ - void clone(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid) { - Op* _op = _get_next_op(); - _op->op = OP_CLONE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->dest_oid = _get_object_id(noid); - data.ops++; - } - /** - * Clone a byte range from one object to another. - * - * The data portion of the destination object receives a copy of a - * portion of the data from the source object. None of the other - * three parts of an object is copied from the source. - * - * The destination object size may be extended to the dstoff + len. - * - * The source range *must* overlap with the source object data. If it does - * not the result is undefined. - */ - void clone_range(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid, - uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { - Op* _op = _get_next_op(); - _op->op = OP_CLONERANGE2; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->dest_oid = _get_object_id(noid); - _op->off = srcoff; - _op->len = srclen; - _op->dest_off = dstoff; - data.ops++; - } - - /// Create the collection - void create_collection(const coll_t& cid, int bits) { - Op* _op = _get_next_op(); - _op->op = OP_MKCOLL; - _op->cid = _get_coll_id(cid); - _op->split_bits = bits; - data.ops++; - } - - /** - * Give the collection a hint. - * - * @param cid - collection id. - * @param type - hint type. - * @param hint - the hint payload, which contains the customized - * data along with the hint type. - */ - void collection_hint(const coll_t& cid, uint32_t type, const bufferlist& hint) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_COLL_HINT; - _op->cid = _get_coll_id(cid); - _op->hint_type = type; - encode(hint, data_bl); - data.ops++; - } - - /// remove the collection, the collection must be empty - void remove_collection(const coll_t& cid) { - Op* _op = _get_next_op(); - _op->op = OP_RMCOLL; - _op->cid = _get_coll_id(cid); - data.ops++; - } - void collection_move(const coll_t& cid, const coll_t &oldcid, - const ghobject_t& oid) - __attribute__ ((deprecated)) { - // NOTE: we encode this as a fixed combo of ADD + REMOVE. they - // always appear together, so this is effectively a single MOVE. - Op* _op = _get_next_op(); - _op->op = OP_COLL_ADD; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - _op->dest_cid = _get_coll_id(cid); - data.ops++; - - _op = _get_next_op(); - _op->op = OP_COLL_REMOVE; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - data.ops++; - } - void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, - const coll_t &cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_COLL_MOVE_RENAME; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oldoid); - _op->dest_cid = _get_coll_id(cid); - _op->dest_oid = _get_object_id(oid); - data.ops++; - } - void try_rename(const coll_t &cid, const ghobject_t& oldoid, - const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_TRY_RENAME; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oldoid); - _op->dest_oid = _get_object_id(oid); - data.ops++; - } - - /// Remove omap from oid - void omap_clear( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid ///< [in] Object from which to remove omap - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_CLEAR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /// Set keys on oid omap. Replaces duplicate keys. - void omap_setkeys( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object to update - const map &attrset ///< [in] Replacement keys and values - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - - /// Set keys on an oid omap (bufferlist variant). - void omap_setkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object to update - const bufferlist &attrset_bl ///< [in] Replacement keys and values - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data_bl.append(attrset_bl); - data.ops++; - } - - /// Remove keys from oid omap - void omap_rmkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap - const set &keys ///< [in] Keys to clear - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(keys, data_bl); - data.ops++; - } - - /// Remove keys from oid omap - void omap_rmkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap - const bufferlist &keys_bl ///< [in] Keys to clear - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data_bl.append(keys_bl); - data.ops++; - } - - /// Remove key range from oid omap - void omap_rmkeyrange( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap keys - const string& first, ///< [in] first key in range - const string& last ///< [in] first key past range, range is [first,last) - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYRANGE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(first, data_bl); - encode(last, data_bl); - data.ops++; - } - - /// Set omap header - void omap_setheader( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object - const bufferlist &bl ///< [in] Header value - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETHEADER; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(bl, data_bl); - data.ops++; - } - - /// Split collection based on given prefixes, objects matching the specified - /// bits/rem are moved to the new collection - void split_collection( - const coll_t &cid, - uint32_t bits, - uint32_t rem, - const coll_t &destination) { - Op* _op = _get_next_op(); - _op->op = OP_SPLIT_COLLECTION2; - _op->cid = _get_coll_id(cid); - _op->dest_cid = _get_coll_id(destination); - _op->split_bits = bits; - _op->split_rem = rem; - data.ops++; - } - - /// Merge collection into another. - void merge_collection( - coll_t cid, - coll_t destination, - uint32_t bits) { - Op* _op = _get_next_op(); - _op->op = OP_MERGE_COLLECTION; - _op->cid = _get_coll_id(cid); - _op->dest_cid = _get_coll_id(destination); - _op->split_bits = bits; - data.ops++; - } - - void collection_set_bits( - const coll_t &cid, - int bits) { - Op* _op = _get_next_op(); - _op->op = OP_COLL_SET_BITS; - _op->cid = _get_coll_id(cid); - _op->split_bits = bits; - data.ops++; - } - - /// Set allocation hint for an object - /// make 0 values(expected_object_size, expected_write_size) noops for all implementations - void set_alloc_hint( - const coll_t &cid, - const ghobject_t &oid, - uint64_t expected_object_size, - uint64_t expected_write_size, - uint32_t flags - ) { - Op* _op = _get_next_op(); - _op->op = OP_SETALLOCHINT; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->expected_object_size = expected_object_size; - _op->expected_write_size = expected_write_size; - _op->alloc_hint_flags = flags; - data.ops++; - } - - void encode(bufferlist& bl) const { - //layout: data_bl + op_bl + coll_index + object_index + data - ENCODE_START(9, 9, bl); - encode(data_bl, bl); - encode(op_bl, bl); - encode(coll_index, bl); - encode(object_index, bl); - data.encode(bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator &bl) { - DECODE_START(9, bl); - DECODE_OLDEST(9); - - decode(data_bl, bl); - decode(op_bl, bl); - decode(coll_index, bl); - decode(object_index, bl); - data.decode(bl); - coll_id = coll_index.size(); - object_id = object_index.size(); - - DECODE_FINISH(bl); - } - - void dump(ceph::Formatter *f); -}; -} diff --git a/src/crimson/os/cyan_store.cc b/src/crimson/os/cyan_store.cc index 4319fc148531..4fe9fcb8681d 100644 --- a/src/crimson/os/cyan_store.cc +++ b/src/crimson/os/cyan_store.cc @@ -7,7 +7,7 @@ #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_object.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" namespace { seastar::logger& logger() { @@ -34,9 +34,9 @@ seastar::future<> CyanStore::mount() throw std::runtime_error("read_file"); } - set collections; + std::set collections; auto p = bl.cbegin(); - decode(collections, p); + ceph::decode(collections, p); for (auto& coll : collections) { string fn = fmt::format("{}/{}", path, coll); @@ -69,7 +69,7 @@ seastar::future<> CyanStore::umount() string fn = path + "/collections"; bufferlist bl; - encode(collections, bl); + ceph::encode(collections, bl); if (int r = bl.write_file(fn.c_str()); r < 0) { throw std::runtime_error("write_file"); } @@ -95,7 +95,7 @@ seastar::future<> CyanStore::mkfs() string fn = path + "/collections"; bufferlist bl; set collections; - encode(collections, bl); + ceph::encode(collections, bl); r = bl.write_file(fn.c_str()); if (r < 0) throw std::runtime_error("write_file"); diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index eb73a3f0960d..5cfa29766cdd 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -24,7 +24,7 @@ #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_object.h" #include "crimson/os/cyan_store.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "crimson/osd/heartbeat.h" #include "crimson/osd/osd_meta.h" #include "crimson/osd/pg.h" diff --git a/src/crimson/osd/osd_meta.cc b/src/crimson/osd/osd_meta.cc index 309d70e50bb8..e81455c68c21 100644 --- a/src/crimson/osd/osd_meta.cc +++ b/src/crimson/osd/osd_meta.cc @@ -4,7 +4,7 @@ #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_store.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" void OSDMeta::create(ceph::os::Transaction& t) { diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 6cae617f84d8..81db65b79308 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -28,7 +28,7 @@ #include "crimson/net/Messenger.h" #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_store.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "crimson/osd/exceptions.h" #include "crimson/osd/pg_meta.h" diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 12f6bd730e47..2a2d44ce6f64 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -12,7 +12,7 @@ #include #include "crimson/net/Fwd.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "osd/osd_types.h" #include "osd/osd_internal_types.h" #include "recovery_state.h" diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index fa415d988fcf..79651b78ef85 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -173,7 +173,7 @@ PGBackend::store_object_state( ceph::bufferlist osv; encode(os->oi, osv, 0); // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); - txn.setattr(coll->cid, ghobject_t{os->oi.soid}, OI_ATTR, std::move(osv)); + txn.setattr(coll->cid, ghobject_t{os->oi.soid}, OI_ATTR, osv); } } else { // reset cached ObjectState without enforcing eviction diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index bb44e54052c8..ddde848018c9 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -9,7 +9,7 @@ #include #include "crimson/common/shared_lru.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "osd/osd_types.h" #include "osd/osd_internal_types.h" diff --git a/src/os/ObjectStore.cc b/src/os/ObjectStore.cc index d9b9cf6d1ab6..5c555ec77c53 100644 --- a/src/os/ObjectStore.cc +++ b/src/os/ObjectStore.cc @@ -24,41 +24,6 @@ #endif #include "kstore/KStore.h" -void decode_str_str_map_to_bl(bufferlist::const_iterator& p, - bufferlist *out) -{ - auto start = p; - __u32 n; - decode(n, p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, p); - p.advance(l); - len += 4 + l; - decode(l, p); - p.advance(l); - len += 4 + l; - } - start.copy(len, *out); -} - -void decode_str_set_to_bl(bufferlist::const_iterator& p, - bufferlist *out) -{ - auto start = p; - __u32 n; - decode(n, p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, p); - p.advance(l); - len += 4 + l; - } - start.copy(len, *out); -} - ObjectStore *ObjectStore::create(CephContext *cct, const string& type, const string& data, @@ -150,11 +115,3 @@ int ObjectStore::read_meta(const std::string& key, *value = string(buf, r); return 0; } - - - - -ostream& operator<<(ostream& out, const ObjectStore::Transaction& tx) { - - return out << "Transaction(" << &tx << ")"; -} diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index cb84be8faa4f..594af2960bb9 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -22,6 +22,7 @@ #include "common/TrackedOp.h" #include "common/WorkQueue.h" #include "ObjectMap.h" +#include "os/Transaction.h" #include #include @@ -34,8 +35,6 @@ #include /* or */ #endif -#define OPS_PER_PTR 32 - class CephContext; namespace ceph { @@ -54,10 +53,6 @@ static inline void encode(const std::map *attrset encode(*attrset, bl); } -// this isn't the best place for these, but... -void decode_str_str_map_to_bl(ceph::buffer::list::const_iterator& p, ceph::buffer::list *out); -void decode_str_set_to_bl(ceph::buffer::list::const_iterator& p, ceph::buffer::list *out); - // Flag bits typedef uint32_t osflagbits_t; const int SKIP_JOURNAL_REPLAY = 1 << 0; @@ -68,6 +63,8 @@ protected: std::string path; public: + using Transaction = ceph::os::Transaction; + CephContext* cct; /** * create - create an ObjectStore instance. @@ -214,1228 +211,6 @@ public: */ - /********************************* - * transaction - * - * A Transaction represents a sequence of primitive mutation - * operations. - * - * Three events in the life of a Transaction result in - * callbacks. Any Transaction can contain any number of callback - * objects (Context) for any combination of the three classes of - * callbacks: - * - * on_applied_sync, on_applied, and on_commit. - * - * The "on_applied" and "on_applied_sync" callbacks are invoked when - * the modifications requested by the Transaction are visible to - * subsequent ObjectStore operations, i.e., the results are - * readable. The only conceptual difference between on_applied and - * on_applied_sync is the specific thread and locking environment in - * which the callbacks operate. "on_applied_sync" is called - * directly by an ObjectStore execution thread. It is expected to - * execute quickly and must not acquire any locks of the calling - * environment. Conversely, "on_applied" is called from the separate - * Finisher thread, meaning that it can contend for calling - * environment locks. NB, on_applied and on_applied_sync are - * sometimes called on_readable and on_readable_sync. - * - * The "on_commit" callback is also called from the Finisher thread - * and indicates that all of the mutations have been durably - * committed to stable storage (i.e., are now software/hardware - * crashproof). - * - * At the implementation level, each mutation primitive (and its - * associated data) can be serialized to a single buffer. That - * serialization, however, does not copy any data, but (using the - * ceph::buffer::list library) will reference the original buffers. This - * implies that the buffer that contains the data being submitted - * must remain stable until the on_commit callback completes. In - * practice, ceph::buffer::list handles all of this for you and this - * subtlety is only relevant if you are referencing an existing - * buffer via buffer::raw_static. - * - * Some implementations of ObjectStore choose to implement their own - * form of journaling that uses the serialized form of a - * Transaction. This requires that the encode/decode logic properly - * version itself and handle version upgrades that might change the - * format of the encoded Transaction. This has already happened a - * couple of times and the Transaction object contains some helper - * variables that aid in this legacy decoding: - * - * sobject_encoding detects an older/simpler version of oid - * present in pre-bobtail versions of ceph. use_pool_override - * also detects a situation where the pool of an oid can be - * overridden for legacy operations/buffers. For non-legacy - * implementations of ObjectStore, neither of these fields are - * relevant. - * - * - * TRANSACTION ISOLATION - * - * Except as noted above, isolation is the responsibility of the - * caller. In other words, if any storage element (storage element - * == any of the four portions of an object as described above) is - * altered by a transaction (including deletion), the caller - * promises not to attempt to read that element while the - * transaction is pending (here pending means from the time of - * issuance until the "on_applied_sync" callback has been - * received). Violations of isolation need not be detected by - * ObjectStore and there is no corresponding error mechanism for - * reporting an isolation violation (crashing would be the - * appropriate way to report an isolation violation if detected). - * - * Enumeration operations may violate transaction isolation as - * described above when a storage element is being created or - * deleted as part of a transaction. In this case, ObjectStore is - * allowed to consider the enumeration operation to either precede - * or follow the violating transaction element. In other words, the - * presence/absence of the mutated element in the enumeration is - * entirely at the discretion of ObjectStore. The arbitrary ordering - * applies independently to each transaction element. For example, - * if a transaction contains two mutating elements "create A" and - * "delete B". And an enumeration operation is performed while this - * transaction is pending. It is permissible for ObjectStore to - * report any of the four possible combinations of the existence of - * A and B. - * - */ - class Transaction { - public: - enum { - OP_NOP = 0, - OP_TOUCH = 9, // cid, oid - OP_WRITE = 10, // cid, oid, offset, len, bl - OP_ZERO = 11, // cid, oid, offset, len - OP_TRUNCATE = 12, // cid, oid, len - OP_REMOVE = 13, // cid, oid - OP_SETATTR = 14, // cid, oid, attrname, bl - OP_SETATTRS = 15, // cid, oid, attrset - OP_RMATTR = 16, // cid, oid, attrname - OP_CLONE = 17, // cid, oid, newoid - OP_CLONERANGE = 18, // cid, oid, newoid, offset, len - OP_CLONERANGE2 = 30, // cid, oid, newoid, srcoff, len, dstoff - - OP_TRIMCACHE = 19, // cid, oid, offset, len **DEPRECATED** - - OP_MKCOLL = 20, // cid - OP_RMCOLL = 21, // cid - OP_COLL_ADD = 22, // cid, oldcid, oid - OP_COLL_REMOVE = 23, // cid, oid - OP_COLL_SETATTR = 24, // cid, attrname, bl - OP_COLL_RMATTR = 25, // cid, attrname - OP_COLL_SETATTRS = 26, // cid, attrset - OP_COLL_MOVE = 8, // newcid, oldcid, oid - - OP_RMATTRS = 28, // cid, oid - OP_COLL_RENAME = 29, // cid, newcid - - OP_OMAP_CLEAR = 31, // cid - OP_OMAP_SETKEYS = 32, // cid, attrset - OP_OMAP_RMKEYS = 33, // cid, keyset - OP_OMAP_SETHEADER = 34, // cid, header - OP_SPLIT_COLLECTION = 35, // cid, bits, destination - OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination - doesn't create the destination */ - OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey - OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid - - OP_SETALLOCHINT = 39, // cid, oid, object_size, write_size - OP_COLL_HINT = 40, // cid, type, bl - - OP_TRY_RENAME = 41, // oldcid, oldoid, newoid - - OP_COLL_SET_BITS = 42, // cid, bits - - OP_MERGE_COLLECTION = 43, // cid, destination - }; - - // Transaction hint type - enum { - COLL_HINT_EXPECTED_NUM_OBJECTS = 1, - }; - - struct Op { - __le32 op; - __le32 cid; - __le32 oid; - __le64 off; - __le64 len; - __le32 dest_cid; - __le32 dest_oid; //OP_CLONE, OP_CLONERANGE - __le64 dest_off; //OP_CLONERANGE - union { - struct { - __le32 hint_type; //OP_COLL_HINT - }; - struct { - __le32 alloc_hint_flags; //OP_SETALLOCHINT - }; - }; - __le64 expected_object_size; //OP_SETALLOCHINT - __le64 expected_write_size; //OP_SETALLOCHINT - __le32 split_bits; //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS, - //OP_MKCOLL - __le32 split_rem; //OP_SPLIT_COLLECTION2 - } __attribute__ ((packed)) ; - - struct TransactionData { - __le64 ops; - __le32 largest_data_len; - __le32 largest_data_off; - __le32 largest_data_off_in_data_bl; - __le32 fadvise_flags; - - TransactionData() noexcept : - ops(0), - largest_data_len(0), - largest_data_off(0), - largest_data_off_in_data_bl(0), - fadvise_flags(0) { } - - // override default move operations to reset default values - TransactionData(TransactionData&& other) noexcept : - ops(other.ops), - largest_data_len(other.largest_data_len), - largest_data_off(other.largest_data_off), - largest_data_off_in_data_bl(other.largest_data_off_in_data_bl), - fadvise_flags(other.fadvise_flags) { - other.ops = 0; - other.largest_data_len = 0; - other.largest_data_off = 0; - other.largest_data_off_in_data_bl = 0; - other.fadvise_flags = 0; - } - TransactionData& operator=(TransactionData&& other) noexcept { - ops = other.ops; - largest_data_len = other.largest_data_len; - largest_data_off = other.largest_data_off; - largest_data_off_in_data_bl = other.largest_data_off_in_data_bl; - fadvise_flags = other.fadvise_flags; - other.ops = 0; - other.largest_data_len = 0; - other.largest_data_off = 0; - other.largest_data_off_in_data_bl = 0; - other.fadvise_flags = 0; - return *this; - } - - TransactionData(const TransactionData& other) = default; - TransactionData& operator=(const TransactionData& other) = default; - - void encode(ceph::buffer::list& bl) const { - bl.append((char*)this, sizeof(TransactionData)); - } - void decode(ceph::buffer::list::const_iterator &bl) { - bl.copy(sizeof(TransactionData), (char*)this); - } - } __attribute__ ((packed)) ; - - private: - TransactionData data; - - std::map coll_index; - std::map object_index; - - __le32 coll_id {0}; - __le32 object_id {0}; - - ceph::buffer::list data_bl; - ceph::buffer::list op_bl; - - std::list on_applied; - std::list on_commit; - std::list on_applied_sync; - - public: - Transaction() = default; - - explicit Transaction(ceph::buffer::list::const_iterator &dp) { - decode(dp); - } - explicit Transaction(ceph::buffer::list &nbl) { - auto dp = nbl.cbegin(); - decode(dp); - } - - // override default move operations to reset default values - Transaction(Transaction&& other) noexcept : - data(std::move(other.data)), - coll_index(std::move(other.coll_index)), - object_index(std::move(other.object_index)), - coll_id(other.coll_id), - object_id(other.object_id), - data_bl(std::move(other.data_bl)), - op_bl(std::move(other.op_bl)), - on_applied(std::move(other.on_applied)), - on_commit(std::move(other.on_commit)), - on_applied_sync(std::move(other.on_applied_sync)) { - other.coll_id = 0; - other.object_id = 0; - } - - Transaction& operator=(Transaction&& other) noexcept { - data = std::move(other.data); - coll_index = std::move(other.coll_index); - object_index = std::move(other.object_index); - coll_id = other.coll_id; - object_id = other.object_id; - data_bl = std::move(other.data_bl); - op_bl = std::move(other.op_bl); - on_applied = std::move(other.on_applied); - on_commit = std::move(other.on_commit); - on_applied_sync = std::move(other.on_applied_sync); - other.coll_id = 0; - other.object_id = 0; - return *this; - } - - Transaction(const Transaction& other) = default; - Transaction& operator=(const Transaction& other) = default; - - // expose object_index for FileStore::Op's benefit - const std::map& get_object_index() const { - return object_index; - } - - /* Operations on callback contexts */ - void register_on_applied(Context *c) { - if (!c) return; - on_applied.push_back(c); - } - void register_on_commit(Context *c) { - if (!c) return; - on_commit.push_back(c); - } - void register_on_applied_sync(Context *c) { - if (!c) return; - on_applied_sync.push_back(c); - } - void register_on_complete(Context *c) { - if (!c) return; - RunOnDeleteRef _complete (std::make_shared(c)); - register_on_applied(new ContainerContext(_complete)); - register_on_commit(new ContainerContext(_complete)); - } - bool has_contexts() const { - return - !on_commit.empty() || - !on_applied.empty() || - !on_applied_sync.empty(); - } - - static void collect_contexts( - std::vector& t, - Context **out_on_applied, - Context **out_on_commit, - Context **out_on_applied_sync) { - ceph_assert(out_on_applied); - ceph_assert(out_on_commit); - ceph_assert(out_on_applied_sync); - std::list on_applied, on_commit, on_applied_sync; - for (auto& i : t) { - on_applied.splice(on_applied.end(), i.on_applied); - on_commit.splice(on_commit.end(), i.on_commit); - on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); - } - *out_on_applied = C_Contexts::list_to_context(on_applied); - *out_on_commit = C_Contexts::list_to_context(on_commit); - *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); - } - static void collect_contexts( - std::vector& t, - std::list *out_on_applied, - std::list *out_on_commit, - std::list *out_on_applied_sync) { - ceph_assert(out_on_applied); - ceph_assert(out_on_commit); - ceph_assert(out_on_applied_sync); - for (auto& i : t) { - out_on_applied->splice(out_on_applied->end(), i.on_applied); - out_on_commit->splice(out_on_commit->end(), i.on_commit); - out_on_applied_sync->splice(out_on_applied_sync->end(), - i.on_applied_sync); - } - } - - Context *get_on_applied() { - return C_Contexts::list_to_context(on_applied); - } - Context *get_on_commit() { - return C_Contexts::list_to_context(on_commit); - } - Context *get_on_applied_sync() { - return C_Contexts::list_to_context(on_applied_sync); - } - - void set_fadvise_flags(uint32_t flags) { - data.fadvise_flags = flags; - } - void set_fadvise_flag(uint32_t flag) { - data.fadvise_flags = data.fadvise_flags | flag; - } - uint32_t get_fadvise_flags() { return data.fadvise_flags; } - - void swap(Transaction& other) noexcept { - std::swap(data, other.data); - std::swap(on_applied, other.on_applied); - std::swap(on_commit, other.on_commit); - std::swap(on_applied_sync, other.on_applied_sync); - - std::swap(coll_index, other.coll_index); - std::swap(object_index, other.object_index); - std::swap(coll_id, other.coll_id); - std::swap(object_id, other.object_id); - op_bl.swap(other.op_bl); - data_bl.swap(other.data_bl); - } - - void _update_op(Op* op, - std::vector<__le32> &cm, - std::vector<__le32> &om) { - - switch (op->op) { - case OP_NOP: - break; - - case OP_TOUCH: - case OP_REMOVE: - case OP_SETATTR: - case OP_SETATTRS: - case OP_RMATTR: - case OP_RMATTRS: - case OP_COLL_REMOVE: - case OP_OMAP_CLEAR: - case OP_OMAP_SETKEYS: - case OP_OMAP_RMKEYS: - case OP_OMAP_RMKEYRANGE: - case OP_OMAP_SETHEADER: - case OP_WRITE: - case OP_ZERO: - case OP_TRUNCATE: - case OP_SETALLOCHINT: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - break; - - case OP_CLONERANGE2: - case OP_CLONE: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_MKCOLL: - case OP_RMCOLL: - case OP_COLL_SETATTR: - case OP_COLL_RMATTR: - case OP_COLL_SETATTRS: - case OP_COLL_HINT: - case OP_COLL_SET_BITS: - ceph_assert(op->cid < cm.size()); - op->cid = cm[op->cid]; - break; - - case OP_COLL_ADD: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_cid < om.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - op->oid = om[op->oid]; - break; - - case OP_COLL_MOVE_RENAME: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_cid < cm.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_cid = cm[op->dest_cid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_TRY_RENAME: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_SPLIT_COLLECTION2: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - break; - - case OP_MERGE_COLLECTION: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - break; - - default: - ceph_abort_msg("Unknown OP"); - } - } - void _update_op_bl( - ceph::buffer::list& bl, - std::vector<__le32> &cm, - std::vector<__le32> &om) { - for (auto& bp : bl.buffers()) { - ceph_assert(bp.length() % sizeof(Op) == 0); - - char* raw_p = const_cast(bp.c_str()); - char* raw_end = raw_p + bp.length(); - while (raw_p < raw_end) { - _update_op(reinterpret_cast(raw_p), cm, om); - raw_p += sizeof(Op); - } - } - } - /// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction - void append(Transaction& other) { - - data.ops += other.data.ops; - if (other.data.largest_data_len > data.largest_data_len) { - data.largest_data_len = other.data.largest_data_len; - data.largest_data_off = other.data.largest_data_off; - data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; - } - data.fadvise_flags |= other.data.fadvise_flags; - on_applied.splice(on_applied.end(), other.on_applied); - on_commit.splice(on_commit.end(), other.on_commit); - on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); - - //append coll_index & object_index - std::vector<__le32> cm(other.coll_index.size()); - std::map::iterator coll_index_p; - for (coll_index_p = other.coll_index.begin(); - coll_index_p != other.coll_index.end(); - ++coll_index_p) { - cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); - } - - std::vector<__le32> om(other.object_index.size()); - std::map::iterator object_index_p; - for (object_index_p = other.object_index.begin(); - object_index_p != other.object_index.end(); - ++object_index_p) { - om[object_index_p->second] = _get_object_id(object_index_p->first); - } - - //the other.op_bl SHOULD NOT be changes during append operation, - //we use additional ceph::buffer::list to avoid this problem - ceph::buffer::list other_op_bl; - { - ceph::buffer::ptr other_op_bl_ptr(other.op_bl.length()); - other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str()); - other_op_bl.append(std::move(other_op_bl_ptr)); - } - - //update other_op_bl with cm & om - //When the other is appended to current transaction, all coll_index and - //object_index in other.op_buffer should be updated by new index of the - //combined transaction - _update_op_bl(other_op_bl, cm, om); - - //append op_bl - op_bl.append(other_op_bl); - //append data_bl - data_bl.append(other.data_bl); - } - - /** Inquires about the Transaction as a whole. */ - - /// How big is the encoded Transaction buffer? - uint64_t get_encoded_bytes() { - //layout: data_bl + op_bl + coll_index + object_index + data - - // coll_index size, object_index size and sizeof(transaction_data) - // all here, so they may be computed at compile-time - size_t final_size = sizeof(__u32) * 2 + sizeof(data); - - // coll_index second and object_index second - final_size += (coll_index.size() + object_index.size()) * sizeof(__le32); - - // coll_index first - for (auto p = coll_index.begin(); p != coll_index.end(); ++p) { - final_size += p->first.encoded_size(); - } - - // object_index first - for (auto p = object_index.begin(); p != object_index.end(); ++p) { - final_size += p->first.encoded_size(); - } - - return data_bl.length() + - op_bl.length() + - final_size; - } - - /// Retain old version for regression testing purposes - uint64_t get_encoded_bytes_test() { - using ceph::encode; - //layout: data_bl + op_bl + coll_index + object_index + data - ceph::buffer::list bl; - encode(coll_index, bl); - encode(object_index, bl); - - return data_bl.length() + - op_bl.length() + - bl.length() + - sizeof(data); - } - - uint64_t get_num_bytes() { - return get_encoded_bytes(); - } - /// Size of largest data buffer to the "write" operation encountered so far - uint32_t get_data_length() { - return data.largest_data_len; - } - /// offset within the encoded buffer to the start of the largest data buffer that's encoded - uint32_t get_data_offset() { - if (data.largest_data_off_in_data_bl) { - return data.largest_data_off_in_data_bl + - sizeof(__u8) + // encode struct_v - sizeof(__u8) + // encode compat_v - sizeof(__u32) + // encode len - sizeof(__u32); // data_bl len - } - return 0; // none - } - /// offset of buffer as aligned to destination within object. - int get_data_alignment() { - if (!data.largest_data_len) - return 0; - return (0 - get_data_offset()) & ~CEPH_PAGE_MASK; - } - /// Is the Transaction empty (no operations) - bool empty() { - return !data.ops; - } - /// Number of operations in the transaction - int get_num_ops() { - return data.ops; - } - - /** - * iterator - * - * Helper object to parse Transactions. - * - * ObjectStore instances use this object to step down the encoded - * buffer decoding operation codes and parameters as we go. - * - */ - class iterator { - Transaction *t; - - uint64_t ops; - char* op_buffer_p; - - ceph::buffer::list::const_iterator data_bl_p; - - public: - std::vector colls; - std::vector objects; - - private: - explicit iterator(Transaction *t) - : t(t), - data_bl_p(t->data_bl.cbegin()), - colls(t->coll_index.size()), - objects(t->object_index.size()) { - - ops = t->data.ops; - op_buffer_p = t->op_bl.c_str(); - - std::map::iterator coll_index_p; - for (coll_index_p = t->coll_index.begin(); - coll_index_p != t->coll_index.end(); - ++coll_index_p) { - colls[coll_index_p->second] = coll_index_p->first; - } - - std::map::iterator object_index_p; - for (object_index_p = t->object_index.begin(); - object_index_p != t->object_index.end(); - ++object_index_p) { - objects[object_index_p->second] = object_index_p->first; - } - } - - friend class Transaction; - - public: - - bool have_op() { - return ops > 0; - } - Op* decode_op() { - ceph_assert(ops > 0); - - Op* op = reinterpret_cast(op_buffer_p); - op_buffer_p += sizeof(Op); - ops--; - - return op; - } - std::string decode_string() { - using ceph::decode; - std::string s; - decode(s, data_bl_p); - return s; - } - void decode_bp(ceph::buffer::ptr& bp) { - using ceph::decode; - decode(bp, data_bl_p); - } - void decode_bl(ceph::buffer::list& bl) { - using ceph::decode; - decode(bl, data_bl_p); - } - void decode_attrset(std::map& aset) { - using ceph::decode; - decode(aset, data_bl_p); - } - void decode_attrset(std::map& aset) { - using ceph::decode; - decode(aset, data_bl_p); - } - void decode_attrset_bl(ceph::buffer::list *pbl) { - decode_str_str_map_to_bl(data_bl_p, pbl); - } - void decode_keyset(std::set &keys){ - using ceph::decode; - decode(keys, data_bl_p); - } - void decode_keyset_bl(ceph::buffer::list *pbl){ - decode_str_set_to_bl(data_bl_p, pbl); - } - - const ghobject_t &get_oid(__le32 oid_id) { - ceph_assert(oid_id < objects.size()); - return objects[oid_id]; - } - const coll_t &get_cid(__le32 cid_id) { - ceph_assert(cid_id < colls.size()); - return colls[cid_id]; - } - uint32_t get_fadvise_flags() const { - return t->get_fadvise_flags(); - } - }; - - iterator begin() { - return iterator(this); - } - -private: - void _build_actions_from_tbl(); - - /** - * Helper functions to encode the various mutation elements of a - * transaction. These are 1:1 with the operation codes (see - * enumeration above). These routines ensure that the - * encoder/creator of a transaction gets the right data in the - * right place. Sadly, there's no corresponding version nor any - * form of seat belts for the decoder. - */ - Op* _get_next_op() { - if (op_bl.get_append_buffer_unused_tail_length() < sizeof(Op)) { - op_bl.reserve(sizeof(Op) * OPS_PER_PTR); - } - // append_hole ensures bptr merging. Even huge number of ops - // shouldn't result in overpopulating bl::_buffers. - char* const p = op_bl.append_hole(sizeof(Op)).c_str(); - memset(p, 0, sizeof(Op)); - return reinterpret_cast(p); - } - __le32 _get_coll_id(const coll_t& coll) { - std::map::iterator c = coll_index.find(coll); - if (c != coll_index.end()) - return c->second; - - __le32 index_id = coll_id++; - coll_index[coll] = index_id; - return index_id; - } - __le32 _get_object_id(const ghobject_t& oid) { - std::map::iterator o = object_index.find(oid); - if (o != object_index.end()) - return o->second; - - __le32 index_id = object_id++; - object_index[oid] = index_id; - return index_id; - } - -public: - /// noop. 'nuf said - void nop() { - Op* _op = _get_next_op(); - _op->op = OP_NOP; - data.ops++; - } - /** - * touch - * - * Ensure the existance of an object in a collection. Create an - * empty object if necessary - */ - void touch(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_TOUCH; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /** - * Write data to an offset within an object. If the object is too - * small, it is expanded as needed. It is possible to specify an - * offset beyond the current end of an object and it will be - * expanded as needed. Simple implementations of ObjectStore will - * just zero the data between the old end of the object and the - * newly provided data. More sophisticated implementations of - * ObjectStore will omit the untouched data and store it as a - * "hole" in the file. - * - * Note that a 0-length write does not affect the size of the object. - */ - void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len, - const ceph::buffer::list& write_data, uint32_t flags = 0) { - using ceph::encode; - uint32_t orig_len = data_bl.length(); - Op* _op = _get_next_op(); - _op->op = OP_WRITE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - _op->len = len; - encode(write_data, data_bl); - - ceph_assert(len == write_data.length()); - data.fadvise_flags = data.fadvise_flags | flags; - if (write_data.length() > data.largest_data_len) { - data.largest_data_len = write_data.length(); - data.largest_data_off = off; - data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); // we are about to - } - data.ops++; - } - /** - * zero out the indicated byte range within an object. Some - * ObjectStore instances may optimize this to release the - * underlying storage space. - * - * If the zero range extends beyond the end of the object, the object - * size is extended, just as if we were writing a buffer full of zeros. - * EXCEPT if the length is 0, in which case (just like a 0-length write) - * we do not adjust the object size. - */ - void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) { - Op* _op = _get_next_op(); - _op->op = OP_ZERO; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - _op->len = len; - data.ops++; - } - /// Discard all data in the object beyond the specified size. - void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) { - Op* _op = _get_next_op(); - _op->op = OP_TRUNCATE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - data.ops++; - } - /// Remove an object. All four parts of the object are removed. - void remove(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_REMOVE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, const char* name, ceph::buffer::list& val) { - std::string n(name); - setattr(cid, oid, n, val); - } - /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, const std::string& s, ceph::buffer::list& val) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(s, data_bl); - encode(val, data_bl); - data.ops++; - } - /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - /// remove an xattr from an object - void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) { - std::string n(name); - rmattr(cid, oid, n); - } - /// remove an xattr from an object - void rmattr(const coll_t& cid, const ghobject_t& oid, const std::string& s) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_RMATTR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(s, data_bl); - data.ops++; - } - /// remove all xattrs from an object - void rmattrs(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_RMATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /** - * Clone an object into another object. - * - * Low-cost (e.g., O(1)) cloning (if supported) is best, but - * fallback to an O(n) copy is allowed. All four parts of the - * object are cloned (data, xattrs, omap header, omap - * entries). - * - * The destination named object may already exist, in - * which case its previous contents are discarded. - */ - void clone(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid) { - Op* _op = _get_next_op(); - _op->op = OP_CLONE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->dest_oid = _get_object_id(noid); - data.ops++; - } - /** - * Clone a byte range from one object to another. - * - * The data portion of the destination object receives a copy of a - * portion of the data from the source object. None of the other - * three parts of an object is copied from the source. - * - * The destination object size may be extended to the dstoff + len. - * - * The source range *must* overlap with the source object data. If it does - * not the result is undefined. - */ - void clone_range(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid, - uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { - Op* _op = _get_next_op(); - _op->op = OP_CLONERANGE2; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->dest_oid = _get_object_id(noid); - _op->off = srcoff; - _op->len = srclen; - _op->dest_off = dstoff; - data.ops++; - } - - /// Create the collection - void create_collection(const coll_t& cid, int bits) { - Op* _op = _get_next_op(); - _op->op = OP_MKCOLL; - _op->cid = _get_coll_id(cid); - _op->split_bits = bits; - data.ops++; - } - - /** - * Give the collection a hint. - * - * @param cid - collection id. - * @param type - hint type. - * @param hint - the hint payload, which contains the customized - * data along with the hint type. - */ - void collection_hint(const coll_t& cid, uint32_t type, const ceph::buffer::list& hint) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_COLL_HINT; - _op->cid = _get_coll_id(cid); - _op->hint_type = type; - encode(hint, data_bl); - data.ops++; - } - - /// remove the collection, the collection must be empty - void remove_collection(const coll_t& cid) { - Op* _op = _get_next_op(); - _op->op = OP_RMCOLL; - _op->cid = _get_coll_id(cid); - data.ops++; - } - void collection_move(const coll_t& cid, const coll_t &oldcid, const ghobject_t& oid) - __attribute__ ((deprecated)) { - // NOTE: we encode this as a fixed combo of ADD + REMOVE. they - // always appear together, so this is effectively a single MOVE. - Op* _op = _get_next_op(); - _op->op = OP_COLL_ADD; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - _op->dest_cid = _get_coll_id(cid); - data.ops++; - - _op = _get_next_op(); - _op->op = OP_COLL_REMOVE; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - data.ops++; - } - void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, - const coll_t &cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_COLL_MOVE_RENAME; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oldoid); - _op->dest_cid = _get_coll_id(cid); - _op->dest_oid = _get_object_id(oid); - data.ops++; - } - void try_rename(const coll_t &cid, const ghobject_t& oldoid, - const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_TRY_RENAME; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oldoid); - _op->dest_oid = _get_object_id(oid); - data.ops++; - } - - /// Remove omap from oid - void omap_clear( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid ///< [in] Object from which to remove omap - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_CLEAR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /// Set keys on oid omap. Replaces duplicate keys. - void omap_setkeys( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object to update - const std::map &attrset ///< [in] Replacement keys and values - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - - /// Set keys on an oid omap (ceph::buffer::list variant). - void omap_setkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object to update - const ceph::buffer::list &attrset_bl ///< [in] Replacement keys and values - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data_bl.append(attrset_bl); - data.ops++; - } - - /// Remove keys from oid omap - void omap_rmkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap - const std::set &keys ///< [in] Keys to clear - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(keys, data_bl); - data.ops++; - } - - /// Remove keys from oid omap - void omap_rmkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap - const ceph::buffer::list &keys_bl ///< [in] Keys to clear - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data_bl.append(keys_bl); - data.ops++; - } - - /// Remove key range from oid omap - void omap_rmkeyrange( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap keys - const std::string& first, ///< [in] first key in range - const std::string& last ///< [in] first key past range, range is [first,last) - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYRANGE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(first, data_bl); - encode(last, data_bl); - data.ops++; - } - - /// Set omap header - void omap_setheader( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object - const ceph::buffer::list &bl ///< [in] Header value - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETHEADER; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(bl, data_bl); - data.ops++; - } - - /// Split collection based on given prefixes, objects matching the specified bits/rem are - /// moved to the new collection - void split_collection( - const coll_t &cid, - uint32_t bits, - uint32_t rem, - const coll_t &destination) { - Op* _op = _get_next_op(); - _op->op = OP_SPLIT_COLLECTION2; - _op->cid = _get_coll_id(cid); - _op->dest_cid = _get_coll_id(destination); - _op->split_bits = bits; - _op->split_rem = rem; - data.ops++; - } - - /// Merge collection into another. - void merge_collection( - coll_t cid, - coll_t destination, - uint32_t bits) { - Op* _op = _get_next_op(); - _op->op = OP_MERGE_COLLECTION; - _op->cid = _get_coll_id(cid); - _op->dest_cid = _get_coll_id(destination); - _op->split_bits = bits; - data.ops++; - } - - void collection_set_bits( - const coll_t &cid, - int bits) { - Op* _op = _get_next_op(); - _op->op = OP_COLL_SET_BITS; - _op->cid = _get_coll_id(cid); - _op->split_bits = bits; - data.ops++; - } - - /// Set allocation hint for an object - /// make 0 values(expected_object_size, expected_write_size) noops for all implementations - void set_alloc_hint( - const coll_t &cid, - const ghobject_t &oid, - uint64_t expected_object_size, - uint64_t expected_write_size, - uint32_t flags - ) { - Op* _op = _get_next_op(); - _op->op = OP_SETALLOCHINT; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->expected_object_size = expected_object_size; - _op->expected_write_size = expected_write_size; - _op->alloc_hint_flags = flags; - data.ops++; - } - - void encode(ceph::buffer::list& bl) const { - //layout: data_bl + op_bl + coll_index + object_index + data - ENCODE_START(9, 9, bl); - encode(data_bl, bl); - encode(op_bl, bl); - encode(coll_index, bl); - encode(object_index, bl); - data.encode(bl); - ENCODE_FINISH(bl); - } - - void decode(ceph::buffer::list::const_iterator &bl) { - DECODE_START(9, bl); - DECODE_OLDEST(9); - - decode(data_bl, bl); - decode(op_bl, bl); - decode(coll_index, bl); - decode(object_index, bl); - data.decode(bl); - coll_id = coll_index.size(); - object_id = object_index.size(); - - DECODE_FINISH(bl); - } - - void dump(ceph::Formatter *f); - static void generate_test_instances(std::list& o); - }; - int queue_transaction(CollectionHandle& ch, Transaction&& t, TrackedOpRef op = TrackedOpRef(), @@ -1920,9 +695,5 @@ public: return false; } }; -WRITE_CLASS_ENCODER(ObjectStore::Transaction) -WRITE_CLASS_ENCODER(ObjectStore::Transaction::TransactionData) - -std::ostream& operator<<(std::ostream& out, const ObjectStore::Transaction& tx); #endif diff --git a/src/os/Transaction.cc b/src/os/Transaction.cc index ad390a1c3eee..5a5ae0cb6a29 100644 --- a/src/os/Transaction.cc +++ b/src/os/Transaction.cc @@ -1,10 +1,47 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -#include "ObjectStore.h" +#include "os/Transaction.h" #include "common/Formatter.h" -void ObjectStore::Transaction::dump(ceph::Formatter *f) +void decode_str_str_map_to_bl(bufferlist::const_iterator& p, + bufferlist *out) +{ + auto start = p; + __u32 n; + decode(n, p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, p); + p.advance(l); + len += 4 + l; + decode(l, p); + p.advance(l); + len += 4 + l; + } + start.copy(len, *out); +} + +void decode_str_set_to_bl(bufferlist::const_iterator& p, + bufferlist *out) +{ + auto start = p; + __u32 n; + decode(n, p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, p); + p.advance(l); + len += 4 + l; + } + start.copy(len, *out); +} + +namespace ceph::os { + +void Transaction::dump(ceph::Formatter *f) { f->open_array_section("ops"); iterator i = begin(); @@ -470,7 +507,7 @@ void ObjectStore::Transaction::dump(ceph::Formatter *f) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" -void ObjectStore::Transaction::generate_test_instances(list& o) +void Transaction::generate_test_instances(list& o) { o.push_back(new Transaction); @@ -512,5 +549,12 @@ void ObjectStore::Transaction::generate_test_instances(list + +#include "include/int_types.h" +#include "include/buffer.h" +#include "osd/osd_types.h" + +#define OPS_PER_PTR 32 + +void decode_str_str_map_to_bl(ceph::buffer::list::const_iterator& p, ceph::buffer::list *out); +void decode_str_set_to_bl(ceph::buffer::list::const_iterator& p, ceph::buffer::list *out); + + +/********************************* + * transaction + * + * A Transaction represents a sequence of primitive mutation + * operations. + * + * Three events in the life of a Transaction result in + * callbacks. Any Transaction can contain any number of callback + * objects (Context) for any combination of the three classes of + * callbacks: + * + * on_applied_sync, on_applied, and on_commit. + * + * The "on_applied" and "on_applied_sync" callbacks are invoked when + * the modifications requested by the Transaction are visible to + * subsequent ObjectStore operations, i.e., the results are + * readable. The only conceptual difference between on_applied and + * on_applied_sync is the specific thread and locking environment in + * which the callbacks operate. "on_applied_sync" is called + * directly by an ObjectStore execution thread. It is expected to + * execute quickly and must not acquire any locks of the calling + * environment. Conversely, "on_applied" is called from the separate + * Finisher thread, meaning that it can contend for calling + * environment locks. NB, on_applied and on_applied_sync are + * sometimes called on_readable and on_readable_sync. + * + * The "on_commit" callback is also called from the Finisher thread + * and indicates that all of the mutations have been durably + * committed to stable storage (i.e., are now software/hardware + * crashproof). + * + * At the implementation level, each mutation primitive (and its + * associated data) can be serialized to a single buffer. That + * serialization, however, does not copy any data, but (using the + * ceph::buffer::list library) will reference the original buffers. This + * implies that the buffer that contains the data being submitted + * must remain stable until the on_commit callback completes. In + * practice, ceph::buffer::list handles all of this for you and this + * subtlety is only relevant if you are referencing an existing + * buffer via buffer::raw_static. + * + * Some implementations of ObjectStore choose to implement their own + * form of journaling that uses the serialized form of a + * Transaction. This requires that the encode/decode logic properly + * version itself and handle version upgrades that might change the + * format of the encoded Transaction. This has already happened a + * couple of times and the Transaction object contains some helper + * variables that aid in this legacy decoding: + * + * sobject_encoding detects an older/simpler version of oid + * present in pre-bobtail versions of ceph. use_pool_override + * also detects a situation where the pool of an oid can be + * overridden for legacy operations/buffers. For non-legacy + * implementations of ObjectStore, neither of these fields are + * relevant. + * + * + * TRANSACTION ISOLATION + * + * Except as noted above, isolation is the responsibility of the + * caller. In other words, if any storage element (storage element + * == any of the four portions of an object as described above) is + * altered by a transaction (including deletion), the caller + * promises not to attempt to read that element while the + * transaction is pending (here pending means from the time of + * issuance until the "on_applied_sync" callback has been + * received). Violations of isolation need not be detected by + * ObjectStore and there is no corresponding error mechanism for + * reporting an isolation violation (crashing would be the + * appropriate way to report an isolation violation if detected). + * + * Enumeration operations may violate transaction isolation as + * described above when a storage element is being created or + * deleted as part of a transaction. In this case, ObjectStore is + * allowed to consider the enumeration operation to either precede + * or follow the violating transaction element. In other words, the + * presence/absence of the mutated element in the enumeration is + * entirely at the discretion of ObjectStore. The arbitrary ordering + * applies independently to each transaction element. For example, + * if a transaction contains two mutating elements "create A" and + * "delete B". And an enumeration operation is performed while this + * transaction is pending. It is permissible for ObjectStore to + * report any of the four possible combinations of the existence of + * A and B. + * + */ +namespace ceph::os { +class Transaction { +public: + enum { + OP_NOP = 0, + OP_TOUCH = 9, // cid, oid + OP_WRITE = 10, // cid, oid, offset, len, bl + OP_ZERO = 11, // cid, oid, offset, len + OP_TRUNCATE = 12, // cid, oid, len + OP_REMOVE = 13, // cid, oid + OP_SETATTR = 14, // cid, oid, attrname, bl + OP_SETATTRS = 15, // cid, oid, attrset + OP_RMATTR = 16, // cid, oid, attrname + OP_CLONE = 17, // cid, oid, newoid + OP_CLONERANGE = 18, // cid, oid, newoid, offset, len + OP_CLONERANGE2 = 30, // cid, oid, newoid, srcoff, len, dstoff + + OP_TRIMCACHE = 19, // cid, oid, offset, len **DEPRECATED** + + OP_MKCOLL = 20, // cid + OP_RMCOLL = 21, // cid + OP_COLL_ADD = 22, // cid, oldcid, oid + OP_COLL_REMOVE = 23, // cid, oid + OP_COLL_SETATTR = 24, // cid, attrname, bl + OP_COLL_RMATTR = 25, // cid, attrname + OP_COLL_SETATTRS = 26, // cid, attrset + OP_COLL_MOVE = 8, // newcid, oldcid, oid + + OP_RMATTRS = 28, // cid, oid + OP_COLL_RENAME = 29, // cid, newcid + + OP_OMAP_CLEAR = 31, // cid + OP_OMAP_SETKEYS = 32, // cid, attrset + OP_OMAP_RMKEYS = 33, // cid, keyset + OP_OMAP_SETHEADER = 34, // cid, header + OP_SPLIT_COLLECTION = 35, // cid, bits, destination + OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination + doesn't create the destination */ + OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey + OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid + + OP_SETALLOCHINT = 39, // cid, oid, object_size, write_size + OP_COLL_HINT = 40, // cid, type, bl + + OP_TRY_RENAME = 41, // oldcid, oldoid, newoid + + OP_COLL_SET_BITS = 42, // cid, bits + + OP_MERGE_COLLECTION = 43, // cid, destination + }; + + // Transaction hint type + enum { + COLL_HINT_EXPECTED_NUM_OBJECTS = 1, + }; + + struct Op { + __le32 op; + __le32 cid; + __le32 oid; + __le64 off; + __le64 len; + __le32 dest_cid; + __le32 dest_oid; //OP_CLONE, OP_CLONERANGE + __le64 dest_off; //OP_CLONERANGE + union { + struct { + __le32 hint_type; //OP_COLL_HINT + }; + struct { + __le32 alloc_hint_flags; //OP_SETALLOCHINT + }; + }; + __le64 expected_object_size; //OP_SETALLOCHINT + __le64 expected_write_size; //OP_SETALLOCHINT + __le32 split_bits; //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS, + //OP_MKCOLL + __le32 split_rem; //OP_SPLIT_COLLECTION2 + } __attribute__ ((packed)) ; + + struct TransactionData { + __le64 ops; + __le32 largest_data_len; + __le32 largest_data_off; + __le32 largest_data_off_in_data_bl; + __le32 fadvise_flags; + + TransactionData() noexcept : + ops(0), + largest_data_len(0), + largest_data_off(0), + largest_data_off_in_data_bl(0), + fadvise_flags(0) { } + + // override default move operations to reset default values + TransactionData(TransactionData&& other) noexcept : + ops(other.ops), + largest_data_len(other.largest_data_len), + largest_data_off(other.largest_data_off), + largest_data_off_in_data_bl(other.largest_data_off_in_data_bl), + fadvise_flags(other.fadvise_flags) { + other.ops = 0; + other.largest_data_len = 0; + other.largest_data_off = 0; + other.largest_data_off_in_data_bl = 0; + other.fadvise_flags = 0; + } + TransactionData& operator=(TransactionData&& other) noexcept { + ops = other.ops; + largest_data_len = other.largest_data_len; + largest_data_off = other.largest_data_off; + largest_data_off_in_data_bl = other.largest_data_off_in_data_bl; + fadvise_flags = other.fadvise_flags; + other.ops = 0; + other.largest_data_len = 0; + other.largest_data_off = 0; + other.largest_data_off_in_data_bl = 0; + other.fadvise_flags = 0; + return *this; + } + + TransactionData(const TransactionData& other) = default; + TransactionData& operator=(const TransactionData& other) = default; + + void encode(ceph::buffer::list& bl) const { + bl.append((char*)this, sizeof(TransactionData)); + } + void decode(ceph::buffer::list::const_iterator &bl) { + bl.copy(sizeof(TransactionData), (char*)this); + } + } __attribute__ ((packed)) ; + +private: + TransactionData data; + + std::map coll_index; + std::map object_index; + + __le32 coll_id {0}; + __le32 object_id {0}; + + ceph::buffer::list data_bl; + ceph::buffer::list op_bl; + + std::list on_applied; + std::list on_commit; + std::list on_applied_sync; + +public: + Transaction() = default; + + explicit Transaction(ceph::buffer::list::const_iterator &dp) { + decode(dp); + } + explicit Transaction(ceph::buffer::list &nbl) { + auto dp = nbl.cbegin(); + decode(dp); + } + + // override default move operations to reset default values + Transaction(Transaction&& other) noexcept : + data(std::move(other.data)), + coll_index(std::move(other.coll_index)), + object_index(std::move(other.object_index)), + coll_id(other.coll_id), + object_id(other.object_id), + data_bl(std::move(other.data_bl)), + op_bl(std::move(other.op_bl)), + on_applied(std::move(other.on_applied)), + on_commit(std::move(other.on_commit)), + on_applied_sync(std::move(other.on_applied_sync)) { + other.coll_id = 0; + other.object_id = 0; + } + + Transaction& operator=(Transaction&& other) noexcept { + data = std::move(other.data); + coll_index = std::move(other.coll_index); + object_index = std::move(other.object_index); + coll_id = other.coll_id; + object_id = other.object_id; + data_bl = std::move(other.data_bl); + op_bl = std::move(other.op_bl); + on_applied = std::move(other.on_applied); + on_commit = std::move(other.on_commit); + on_applied_sync = std::move(other.on_applied_sync); + other.coll_id = 0; + other.object_id = 0; + return *this; + } + + Transaction(const Transaction& other) = default; + Transaction& operator=(const Transaction& other) = default; + + // expose object_index for FileStore::Op's benefit + const std::map& get_object_index() const { + return object_index; + } + + /* Operations on callback contexts */ + void register_on_applied(Context *c) { + if (!c) return; + on_applied.push_back(c); + } + void register_on_commit(Context *c) { + if (!c) return; + on_commit.push_back(c); + } + void register_on_applied_sync(Context *c) { + if (!c) return; + on_applied_sync.push_back(c); + } + void register_on_complete(Context *c) { + if (!c) return; + RunOnDeleteRef _complete (std::make_shared(c)); + register_on_applied(new ContainerContext(_complete)); + register_on_commit(new ContainerContext(_complete)); + } + bool has_contexts() const { + return + !on_commit.empty() || + !on_applied.empty() || + !on_applied_sync.empty(); + } + + static void collect_contexts( + std::vector& t, + Context **out_on_applied, + Context **out_on_commit, + Context **out_on_applied_sync) { + ceph_assert(out_on_applied); + ceph_assert(out_on_commit); + ceph_assert(out_on_applied_sync); + std::list on_applied, on_commit, on_applied_sync; + for (auto& i : t) { + on_applied.splice(on_applied.end(), i.on_applied); + on_commit.splice(on_commit.end(), i.on_commit); + on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); + } + *out_on_applied = C_Contexts::list_to_context(on_applied); + *out_on_commit = C_Contexts::list_to_context(on_commit); + *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); + } + static void collect_contexts( + std::vector& t, + std::list *out_on_applied, + std::list *out_on_commit, + std::list *out_on_applied_sync) { + ceph_assert(out_on_applied); + ceph_assert(out_on_commit); + ceph_assert(out_on_applied_sync); + for (auto& i : t) { + out_on_applied->splice(out_on_applied->end(), i.on_applied); + out_on_commit->splice(out_on_commit->end(), i.on_commit); + out_on_applied_sync->splice(out_on_applied_sync->end(), + i.on_applied_sync); + } + } + + Context *get_on_applied() { + return C_Contexts::list_to_context(on_applied); + } + Context *get_on_commit() { + return C_Contexts::list_to_context(on_commit); + } + Context *get_on_applied_sync() { + return C_Contexts::list_to_context(on_applied_sync); + } + + void set_fadvise_flags(uint32_t flags) { + data.fadvise_flags = flags; + } + void set_fadvise_flag(uint32_t flag) { + data.fadvise_flags = data.fadvise_flags | flag; + } + uint32_t get_fadvise_flags() { return data.fadvise_flags; } + + void swap(Transaction& other) noexcept { + std::swap(data, other.data); + std::swap(on_applied, other.on_applied); + std::swap(on_commit, other.on_commit); + std::swap(on_applied_sync, other.on_applied_sync); + + std::swap(coll_index, other.coll_index); + std::swap(object_index, other.object_index); + std::swap(coll_id, other.coll_id); + std::swap(object_id, other.object_id); + op_bl.swap(other.op_bl); + data_bl.swap(other.data_bl); + } + + void _update_op(Op* op, + std::vector<__le32> &cm, + std::vector<__le32> &om) { + + switch (op->op) { + case OP_NOP: + break; + + case OP_TOUCH: + case OP_REMOVE: + case OP_SETATTR: + case OP_SETATTRS: + case OP_RMATTR: + case OP_RMATTRS: + case OP_COLL_REMOVE: + case OP_OMAP_CLEAR: + case OP_OMAP_SETKEYS: + case OP_OMAP_RMKEYS: + case OP_OMAP_RMKEYRANGE: + case OP_OMAP_SETHEADER: + case OP_WRITE: + case OP_ZERO: + case OP_TRUNCATE: + case OP_SETALLOCHINT: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + break; + + case OP_CLONERANGE2: + case OP_CLONE: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_MKCOLL: + case OP_RMCOLL: + case OP_COLL_SETATTR: + case OP_COLL_RMATTR: + case OP_COLL_SETATTRS: + case OP_COLL_HINT: + case OP_COLL_SET_BITS: + ceph_assert(op->cid < cm.size()); + op->cid = cm[op->cid]; + break; + + case OP_COLL_ADD: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_cid < om.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + op->oid = om[op->oid]; + break; + + case OP_COLL_MOVE_RENAME: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_cid < cm.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_cid = cm[op->dest_cid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_TRY_RENAME: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->oid < om.size()); + ceph_assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_SPLIT_COLLECTION2: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + break; + + case OP_MERGE_COLLECTION: + ceph_assert(op->cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + break; + + default: + ceph_abort_msg("Unknown OP"); + } + } + void _update_op_bl( + ceph::buffer::list& bl, + std::vector<__le32> &cm, + std::vector<__le32> &om) { + for (auto& bp : bl.buffers()) { + ceph_assert(bp.length() % sizeof(Op) == 0); + + char* raw_p = const_cast(bp.c_str()); + char* raw_end = raw_p + bp.length(); + while (raw_p < raw_end) { + _update_op(reinterpret_cast(raw_p), cm, om); + raw_p += sizeof(Op); + } + } + } + /// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction + void append(Transaction& other) { + + data.ops += other.data.ops; + if (other.data.largest_data_len > data.largest_data_len) { + data.largest_data_len = other.data.largest_data_len; + data.largest_data_off = other.data.largest_data_off; + data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; + } + data.fadvise_flags |= other.data.fadvise_flags; + on_applied.splice(on_applied.end(), other.on_applied); + on_commit.splice(on_commit.end(), other.on_commit); + on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); + + //append coll_index & object_index + std::vector<__le32> cm(other.coll_index.size()); + std::map::iterator coll_index_p; + for (coll_index_p = other.coll_index.begin(); + coll_index_p != other.coll_index.end(); + ++coll_index_p) { + cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); + } + + std::vector<__le32> om(other.object_index.size()); + std::map::iterator object_index_p; + for (object_index_p = other.object_index.begin(); + object_index_p != other.object_index.end(); + ++object_index_p) { + om[object_index_p->second] = _get_object_id(object_index_p->first); + } + + //the other.op_bl SHOULD NOT be changes during append operation, + //we use additional ceph::buffer::list to avoid this problem + ceph::buffer::list other_op_bl; + { + ceph::buffer::ptr other_op_bl_ptr(other.op_bl.length()); + other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str()); + other_op_bl.append(std::move(other_op_bl_ptr)); + } + + //update other_op_bl with cm & om + //When the other is appended to current transaction, all coll_index and + //object_index in other.op_buffer should be updated by new index of the + //combined transaction + _update_op_bl(other_op_bl, cm, om); + + //append op_bl + op_bl.append(other_op_bl); + //append data_bl + data_bl.append(other.data_bl); + } + + /** Inquires about the Transaction as a whole. */ + + /// How big is the encoded Transaction buffer? + uint64_t get_encoded_bytes() { + //layout: data_bl + op_bl + coll_index + object_index + data + + // coll_index size, object_index size and sizeof(transaction_data) + // all here, so they may be computed at compile-time + size_t final_size = sizeof(__u32) * 2 + sizeof(data); + + // coll_index second and object_index second + final_size += (coll_index.size() + object_index.size()) * sizeof(__le32); + + // coll_index first + for (auto p = coll_index.begin(); p != coll_index.end(); ++p) { + final_size += p->first.encoded_size(); + } + + // object_index first + for (auto p = object_index.begin(); p != object_index.end(); ++p) { + final_size += p->first.encoded_size(); + } + + return data_bl.length() + + op_bl.length() + + final_size; + } + + /// Retain old version for regression testing purposes + uint64_t get_encoded_bytes_test() { + using ceph::encode; + //layout: data_bl + op_bl + coll_index + object_index + data + ceph::buffer::list bl; + encode(coll_index, bl); + encode(object_index, bl); + + return data_bl.length() + + op_bl.length() + + bl.length() + + sizeof(data); + } + + uint64_t get_num_bytes() { + return get_encoded_bytes(); + } + /// Size of largest data buffer to the "write" operation encountered so far + uint32_t get_data_length() { + return data.largest_data_len; + } + /// offset within the encoded buffer to the start of the largest data buffer that's encoded + uint32_t get_data_offset() { + if (data.largest_data_off_in_data_bl) { + return data.largest_data_off_in_data_bl + + sizeof(__u8) + // encode struct_v + sizeof(__u8) + // encode compat_v + sizeof(__u32) + // encode len + sizeof(__u32); // data_bl len + } + return 0; // none + } + /// offset of buffer as aligned to destination within object. + int get_data_alignment() { + if (!data.largest_data_len) + return 0; + return (0 - get_data_offset()) & ~CEPH_PAGE_MASK; + } + /// Is the Transaction empty (no operations) + bool empty() { + return !data.ops; + } + /// Number of operations in the transaction + int get_num_ops() { + return data.ops; + } + + /** + * iterator + * + * Helper object to parse Transactions. + * + * ObjectStore instances use this object to step down the encoded + * buffer decoding operation codes and parameters as we go. + * + */ + class iterator { + Transaction *t; + + uint64_t ops; + char* op_buffer_p; + + ceph::buffer::list::const_iterator data_bl_p; + + public: + std::vector colls; + std::vector objects; + + private: + explicit iterator(Transaction *t) + : t(t), + data_bl_p(t->data_bl.cbegin()), + colls(t->coll_index.size()), + objects(t->object_index.size()) { + + ops = t->data.ops; + op_buffer_p = t->op_bl.c_str(); + + std::map::iterator coll_index_p; + for (coll_index_p = t->coll_index.begin(); + coll_index_p != t->coll_index.end(); + ++coll_index_p) { + colls[coll_index_p->second] = coll_index_p->first; + } + + std::map::iterator object_index_p; + for (object_index_p = t->object_index.begin(); + object_index_p != t->object_index.end(); + ++object_index_p) { + objects[object_index_p->second] = object_index_p->first; + } + } + + friend class Transaction; + + public: + + bool have_op() { + return ops > 0; + } + Op* decode_op() { + ceph_assert(ops > 0); + + Op* op = reinterpret_cast(op_buffer_p); + op_buffer_p += sizeof(Op); + ops--; + + return op; + } + std::string decode_string() { + using ceph::decode; + std::string s; + decode(s, data_bl_p); + return s; + } + void decode_bp(ceph::buffer::ptr& bp) { + using ceph::decode; + decode(bp, data_bl_p); + } + void decode_bl(ceph::buffer::list& bl) { + using ceph::decode; + decode(bl, data_bl_p); + } + void decode_attrset(std::map& aset) { + using ceph::decode; + decode(aset, data_bl_p); + } + void decode_attrset(std::map& aset) { + using ceph::decode; + decode(aset, data_bl_p); + } + void decode_attrset_bl(ceph::buffer::list *pbl) { + decode_str_str_map_to_bl(data_bl_p, pbl); + } + void decode_keyset(std::set &keys){ + using ceph::decode; + decode(keys, data_bl_p); + } + void decode_keyset_bl(ceph::buffer::list *pbl){ + decode_str_set_to_bl(data_bl_p, pbl); + } + + const ghobject_t &get_oid(__le32 oid_id) { + ceph_assert(oid_id < objects.size()); + return objects[oid_id]; + } + const coll_t &get_cid(__le32 cid_id) { + ceph_assert(cid_id < colls.size()); + return colls[cid_id]; + } + uint32_t get_fadvise_flags() const { + return t->get_fadvise_flags(); + } + }; + + iterator begin() { + return iterator(this); + } + +private: + void _build_actions_from_tbl(); + + /** + * Helper functions to encode the various mutation elements of a + * transaction. These are 1:1 with the operation codes (see + * enumeration above). These routines ensure that the + * encoder/creator of a transaction gets the right data in the + * right place. Sadly, there's no corresponding version nor any + * form of seat belts for the decoder. + */ + Op* _get_next_op() { + if (op_bl.get_append_buffer_unused_tail_length() < sizeof(Op)) { + op_bl.reserve(sizeof(Op) * OPS_PER_PTR); + } + // append_hole ensures bptr merging. Even huge number of ops + // shouldn't result in overpopulating bl::_buffers. + char* const p = op_bl.append_hole(sizeof(Op)).c_str(); + memset(p, 0, sizeof(Op)); + return reinterpret_cast(p); + } + __le32 _get_coll_id(const coll_t& coll) { + std::map::iterator c = coll_index.find(coll); + if (c != coll_index.end()) + return c->second; + + __le32 index_id = coll_id++; + coll_index[coll] = index_id; + return index_id; + } + __le32 _get_object_id(const ghobject_t& oid) { + std::map::iterator o = object_index.find(oid); + if (o != object_index.end()) + return o->second; + + __le32 index_id = object_id++; + object_index[oid] = index_id; + return index_id; + } + +public: + /// noop. 'nuf said + void nop() { + Op* _op = _get_next_op(); + _op->op = OP_NOP; + data.ops++; + } + /** + * touch + * + * Ensure the existance of an object in a collection. Create an + * empty object if necessary + */ + void touch(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_TOUCH; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops++; + } + /** + * Write data to an offset within an object. If the object is too + * small, it is expanded as needed. It is possible to specify an + * offset beyond the current end of an object and it will be + * expanded as needed. Simple implementations of ObjectStore will + * just zero the data between the old end of the object and the + * newly provided data. More sophisticated implementations of + * ObjectStore will omit the untouched data and store it as a + * "hole" in the file. + * + * Note that a 0-length write does not affect the size of the object. + */ + void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len, + const ceph::buffer::list& write_data, uint32_t flags = 0) { + using ceph::encode; + uint32_t orig_len = data_bl.length(); + Op* _op = _get_next_op(); + _op->op = OP_WRITE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + _op->len = len; + encode(write_data, data_bl); + + ceph_assert(len == write_data.length()); + data.fadvise_flags = data.fadvise_flags | flags; + if (write_data.length() > data.largest_data_len) { + data.largest_data_len = write_data.length(); + data.largest_data_off = off; + data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); // we are about to + } + data.ops++; + } + /** + * zero out the indicated byte range within an object. Some + * ObjectStore instances may optimize this to release the + * underlying storage space. + * + * If the zero range extends beyond the end of the object, the object + * size is extended, just as if we were writing a buffer full of zeros. + * EXCEPT if the length is 0, in which case (just like a 0-length write) + * we do not adjust the object size. + */ + void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) { + Op* _op = _get_next_op(); + _op->op = OP_ZERO; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + _op->len = len; + data.ops++; + } + /// Discard all data in the object beyond the specified size. + void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) { + Op* _op = _get_next_op(); + _op->op = OP_TRUNCATE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->off = off; + data.ops++; + } + /// Remove an object. All four parts of the object are removed. + void remove(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_REMOVE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops++; + } + /// Set an xattr of an object + void setattr(const coll_t& cid, const ghobject_t& oid, const char* name, ceph::buffer::list& val) { + std::string n(name); + setattr(cid, oid, n, val); + } + /// Set an xattr of an object + void setattr(const coll_t& cid, const ghobject_t& oid, const std::string& s, ceph::buffer::list& val) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(s, data_bl); + encode(val, data_bl); + data.ops++; + } + /// Set multiple xattrs of an object + void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops++; + } + /// Set multiple xattrs of an object + void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_SETATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops++; + } + /// remove an xattr from an object + void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) { + std::string n(name); + rmattr(cid, oid, n); + } + /// remove an xattr from an object + void rmattr(const coll_t& cid, const ghobject_t& oid, const std::string& s) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_RMATTR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(s, data_bl); + data.ops++; + } + /// remove all xattrs from an object + void rmattrs(const coll_t& cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_RMATTRS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops++; + } + /** + * Clone an object into another object. + * + * Low-cost (e.g., O(1)) cloning (if supported) is best, but + * fallback to an O(n) copy is allowed. All four parts of the + * object are cloned (data, xattrs, omap header, omap + * entries). + * + * The destination named object may already exist, in + * which case its previous contents are discarded. + */ + void clone(const coll_t& cid, const ghobject_t& oid, + const ghobject_t& noid) { + Op* _op = _get_next_op(); + _op->op = OP_CLONE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->dest_oid = _get_object_id(noid); + data.ops++; + } + /** + * Clone a byte range from one object to another. + * + * The data portion of the destination object receives a copy of a + * portion of the data from the source object. None of the other + * three parts of an object is copied from the source. + * + * The destination object size may be extended to the dstoff + len. + * + * The source range *must* overlap with the source object data. If it does + * not the result is undefined. + */ + void clone_range(const coll_t& cid, const ghobject_t& oid, + const ghobject_t& noid, + uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { + Op* _op = _get_next_op(); + _op->op = OP_CLONERANGE2; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->dest_oid = _get_object_id(noid); + _op->off = srcoff; + _op->len = srclen; + _op->dest_off = dstoff; + data.ops++; + } + + /// Create the collection + void create_collection(const coll_t& cid, int bits) { + Op* _op = _get_next_op(); + _op->op = OP_MKCOLL; + _op->cid = _get_coll_id(cid); + _op->split_bits = bits; + data.ops++; + } + + /** + * Give the collection a hint. + * + * @param cid - collection id. + * @param type - hint type. + * @param hint - the hint payload, which contains the customized + * data along with the hint type. + */ + void collection_hint(const coll_t& cid, uint32_t type, const ceph::buffer::list& hint) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_COLL_HINT; + _op->cid = _get_coll_id(cid); + _op->hint_type = type; + encode(hint, data_bl); + data.ops++; + } + + /// remove the collection, the collection must be empty + void remove_collection(const coll_t& cid) { + Op* _op = _get_next_op(); + _op->op = OP_RMCOLL; + _op->cid = _get_coll_id(cid); + data.ops++; + } + void collection_move(const coll_t& cid, const coll_t &oldcid, const ghobject_t& oid) + __attribute__ ((deprecated)) { + // NOTE: we encode this as a fixed combo of ADD + REMOVE. they + // always appear together, so this is effectively a single MOVE. + Op* _op = _get_next_op(); + _op->op = OP_COLL_ADD; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + _op->dest_cid = _get_coll_id(cid); + data.ops++; + + _op = _get_next_op(); + _op->op = OP_COLL_REMOVE; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + data.ops++; + } + void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, + const coll_t &cid, const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_COLL_MOVE_RENAME; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oldoid); + _op->dest_cid = _get_coll_id(cid); + _op->dest_oid = _get_object_id(oid); + data.ops++; + } + void try_rename(const coll_t &cid, const ghobject_t& oldoid, + const ghobject_t& oid) { + Op* _op = _get_next_op(); + _op->op = OP_TRY_RENAME; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oldoid); + _op->dest_oid = _get_object_id(oid); + data.ops++; + } + + /// Remove omap from oid + void omap_clear( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid ///< [in] Object from which to remove omap + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_CLEAR; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data.ops++; + } + /// Set keys on oid omap. Replaces duplicate keys. + void omap_setkeys( + const coll_t& cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object to update + const std::map &attrset ///< [in] Replacement keys and values + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(attrset, data_bl); + data.ops++; + } + + /// Set keys on an oid omap (ceph::buffer::list variant). + void omap_setkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object to update + const ceph::buffer::list &attrset_bl ///< [in] Replacement keys and values + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data_bl.append(attrset_bl); + data.ops++; + } + + /// Remove keys from oid omap + void omap_rmkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap + const std::set &keys ///< [in] Keys to clear + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(keys, data_bl); + data.ops++; + } + + /// Remove keys from oid omap + void omap_rmkeys( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap + const ceph::buffer::list &keys_bl ///< [in] Keys to clear + ) { + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYS; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + data_bl.append(keys_bl); + data.ops++; + } + + /// Remove key range from oid omap + void omap_rmkeyrange( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object from which to remove the omap keys + const std::string& first, ///< [in] first key in range + const std::string& last ///< [in] first key past range, range is [first,last) + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYRANGE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(first, data_bl); + encode(last, data_bl); + data.ops++; + } + + /// Set omap header + void omap_setheader( + const coll_t &cid, ///< [in] Collection containing oid + const ghobject_t &oid, ///< [in] Object + const ceph::buffer::list &bl ///< [in] Header value + ) { + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_SETHEADER; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(bl, data_bl); + data.ops++; + } + + /// Split collection based on given prefixes, objects matching the specified bits/rem are + /// moved to the new collection + void split_collection( + const coll_t &cid, + uint32_t bits, + uint32_t rem, + const coll_t &destination) { + Op* _op = _get_next_op(); + _op->op = OP_SPLIT_COLLECTION2; + _op->cid = _get_coll_id(cid); + _op->dest_cid = _get_coll_id(destination); + _op->split_bits = bits; + _op->split_rem = rem; + data.ops++; + } + + /// Merge collection into another. + void merge_collection( + coll_t cid, + coll_t destination, + uint32_t bits) { + Op* _op = _get_next_op(); + _op->op = OP_MERGE_COLLECTION; + _op->cid = _get_coll_id(cid); + _op->dest_cid = _get_coll_id(destination); + _op->split_bits = bits; + data.ops++; + } + + void collection_set_bits( + const coll_t &cid, + int bits) { + Op* _op = _get_next_op(); + _op->op = OP_COLL_SET_BITS; + _op->cid = _get_coll_id(cid); + _op->split_bits = bits; + data.ops++; + } + + /// Set allocation hint for an object + /// make 0 values(expected_object_size, expected_write_size) noops for all implementations + void set_alloc_hint( + const coll_t &cid, + const ghobject_t &oid, + uint64_t expected_object_size, + uint64_t expected_write_size, + uint32_t flags + ) { + Op* _op = _get_next_op(); + _op->op = OP_SETALLOCHINT; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + _op->expected_object_size = expected_object_size; + _op->expected_write_size = expected_write_size; + _op->alloc_hint_flags = flags; + data.ops++; + } + + void encode(ceph::buffer::list& bl) const { + //layout: data_bl + op_bl + coll_index + object_index + data + ENCODE_START(9, 9, bl); + encode(data_bl, bl); + encode(op_bl, bl); + encode(coll_index, bl); + encode(object_index, bl); + data.encode(bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator &bl) { + DECODE_START(9, bl); + DECODE_OLDEST(9); + + decode(data_bl, bl); + decode(op_bl, bl); + decode(coll_index, bl); + decode(object_index, bl); + data.decode(bl); + coll_id = coll_index.size(); + object_id = object_index.size(); + + DECODE_FINISH(bl); + } + + void dump(ceph::Formatter *f); + static void generate_test_instances(std::list& o); +}; +WRITE_CLASS_ENCODER(Transaction) +WRITE_CLASS_ENCODER(Transaction::TransactionData) + +std::ostream& operator<<(std::ostream& out, const Transaction& tx); + +}