From 8a2ea310f5591a1b05e70ac88656342d0cd2a59c Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 29 Apr 2019 10:11:01 -0700 Subject: [PATCH] os: break Transaction of of Objectstore, use in crimson Transaction is needed in crimson, but not ObjectStore. Signed-off-by: Samuel Just --- src/crimson/os/CMakeLists.txt | 2 +- src/crimson/os/Transaction.cc | 507 ------------ src/crimson/os/cyan_store.cc | 10 +- src/crimson/osd/osd.cc | 2 +- src/crimson/osd/osd_meta.cc | 2 +- src/crimson/osd/pg.cc | 2 +- src/crimson/osd/pg.h | 2 +- src/crimson/osd/pg_backend.cc | 2 +- src/crimson/osd/pg_backend.h | 2 +- src/os/ObjectStore.cc | 43 - src/os/ObjectStore.h | 1235 +--------------------------- src/os/Transaction.cc | 50 +- src/{crimson => }/os/Transaction.h | 457 +++++----- 13 files changed, 295 insertions(+), 2021 deletions(-) delete mode 100644 src/crimson/os/Transaction.cc rename src/{crimson => }/os/Transaction.h (80%) diff --git a/src/crimson/os/CMakeLists.txt b/src/crimson/os/CMakeLists.txt index 264e35252e3e..6362148bd576 100644 --- a/src/crimson/os/CMakeLists.txt +++ b/src/crimson/os/CMakeLists.txt @@ -2,6 +2,6 @@ add_library(crimson-os cyan_store.cc cyan_collection.cc cyan_object.cc - Transaction.cc) + ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc) target_link_libraries(crimson-os crimson) diff --git a/src/crimson/os/Transaction.cc b/src/crimson/os/Transaction.cc deleted file mode 100644 index acf99c5926cd..000000000000 --- a/src/crimson/os/Transaction.cc +++ /dev/null @@ -1,507 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "Transaction.h" -#include "include/denc.h" - -namespace ceph::os -{ - -void Transaction::iterator::decode_attrset_bl(bufferlist *pbl) -{ - using ceph::decode; - auto start = data_bl_p; - __u32 n; - decode(n, data_bl_p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, data_bl_p); - data_bl_p.advance(l); - len += 4 + l; - decode(l, data_bl_p); - data_bl_p.advance(l); - len += 4 + l; - } - start.copy(len, *pbl); -} - -void Transaction::iterator::decode_keyset_bl(bufferlist *pbl) -{ - using ceph::decode; - auto start = data_bl_p; - __u32 n; - decode(n, data_bl_p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, data_bl_p); - data_bl_p.advance(l); - len += 4 + l; - } - start.copy(len, *pbl); -} - -void Transaction::dump(ceph::Formatter *f) -{ - f->open_array_section("ops"); - iterator i = begin(); - int op_num = 0; - bool stop_looping = false; - while (i.have_op() && !stop_looping) { - Transaction::Op *op = i.decode_op(); - f->open_object_section("op"); - f->dump_int("op_num", op_num); - - switch (op->op) { - case Transaction::OP_NOP: - f->dump_string("op_name", "nop"); - break; - case Transaction::OP_TOUCH: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "touch"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_WRITE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - uint64_t len = op->len; - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "write"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_unsigned("length", len); - f->dump_unsigned("offset", off); - f->dump_unsigned("bufferlist length", bl.length()); - } - break; - - case Transaction::OP_ZERO: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - uint64_t len = op->len; - f->dump_string("op_name", "zero"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_unsigned("offset", off); - f->dump_unsigned("length", len); - } - break; - - case Transaction::OP_TRIMCACHE: - { - // deprecated, no-op - f->dump_string("op_name", "trim_cache"); - } - break; - - case Transaction::OP_TRUNCATE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t off = op->off; - f->dump_string("op_name", "truncate"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_unsigned("offset", off); - } - break; - - case Transaction::OP_REMOVE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "remove"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_SETATTR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string name = i.decode_string(); - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "setattr"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_string("name", name); - f->dump_unsigned("length", bl.length()); - } - break; - - case Transaction::OP_SETATTRS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - map aset; - i.decode_attrset(aset); - f->dump_string("op_name", "setattrs"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->open_object_section("attr_lens"); - for (map::iterator p = aset.begin(); - p != aset.end(); ++p) { - f->dump_unsigned(p->first.c_str(), p->second.length()); - } - f->close_section(); - } - break; - - case Transaction::OP_RMATTR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string name = i.decode_string(); - f->dump_string("op_name", "rmattr"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_string("name", name); - } - break; - - case Transaction::OP_RMATTRS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "rmattrs"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_CLONE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - f->dump_string("op_name", "clone"); - f->dump_stream("collection") << cid; - f->dump_stream("src_oid") << oid; - f->dump_stream("dst_oid") << noid; - } - break; - - case Transaction::OP_CLONERANGE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - uint64_t off = op->off; - uint64_t len = op->len; - f->dump_string("op_name", "clonerange"); - f->dump_stream("collection") << cid; - f->dump_stream("src_oid") << oid; - f->dump_stream("dst_oid") << noid; - f->dump_unsigned("offset", off); - f->dump_unsigned("len", len); - } - break; - - case Transaction::OP_CLONERANGE2: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - ghobject_t noid = i.get_oid(op->dest_oid); - uint64_t srcoff = op->off; - uint64_t len = op->len; - uint64_t dstoff = op->dest_off; - f->dump_string("op_name", "clonerange2"); - f->dump_stream("collection") << cid; - f->dump_stream("src_oid") << oid; - f->dump_stream("dst_oid") << noid; - f->dump_unsigned("src_offset", srcoff); - f->dump_unsigned("len", len); - f->dump_unsigned("dst_offset", dstoff); - } - break; - - case Transaction::OP_MKCOLL: - { - coll_t cid = i.get_cid(op->cid); - f->dump_string("op_name", "mkcoll"); - f->dump_stream("collection") << cid; - } - break; - - case Transaction::OP_COLL_HINT: - { - using ceph::decode; - coll_t cid = i.get_cid(op->cid); - uint32_t type = op->hint_type; - f->dump_string("op_name", "coll_hint"); - f->dump_stream("collection") << cid; - f->dump_unsigned("type", type); - bufferlist hint; - i.decode_bl(hint); - auto hiter = hint.cbegin(); - if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) { - uint32_t pg_num; - uint64_t num_objs; - decode(pg_num, hiter); - decode(num_objs, hiter); - f->dump_unsigned("pg_num", pg_num); - f->dump_unsigned("expected_num_objects", num_objs); - } - } - break; - - case Transaction::OP_COLL_SET_BITS: - { - coll_t cid = i.get_cid(op->cid); - f->dump_string("op_name", "coll_set_bits"); - f->dump_stream("collection") << cid; - f->dump_unsigned("bits", op->split_bits); - } - break; - - case Transaction::OP_RMCOLL: - { - coll_t cid = i.get_cid(op->cid); - f->dump_string("op_name", "rmcoll"); - f->dump_stream("collection") << cid; - } - break; - - case Transaction::OP_COLL_ADD: - { - coll_t ocid = i.get_cid(op->cid); - coll_t ncid = i.get_cid(op->dest_cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "collection_add"); - f->dump_stream("src_collection") << ocid; - f->dump_stream("dst_collection") << ncid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_COLL_REMOVE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "collection_remove"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_COLL_MOVE: - { - coll_t ocid = i.get_cid(op->cid); - coll_t ncid = i.get_cid(op->dest_cid); - ghobject_t oid = i.get_oid(op->oid); - f->open_object_section("collection_move"); - f->dump_stream("src_collection") << ocid; - f->dump_stream("dst_collection") << ncid; - f->dump_stream("oid") << oid; - f->close_section(); - } - break; - - case Transaction::OP_COLL_SETATTR: - { - coll_t cid = i.get_cid(op->cid); - string name = i.decode_string(); - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "collection_setattr"); - f->dump_stream("collection") << cid; - f->dump_string("name", name); - f->dump_unsigned("length", bl.length()); - } - break; - - case Transaction::OP_COLL_RMATTR: - { - coll_t cid = i.get_cid(op->cid); - string name = i.decode_string(); - f->dump_string("op_name", "collection_rmattr"); - f->dump_stream("collection") << cid; - f->dump_string("name", name); - } - break; - - case Transaction::OP_COLL_RENAME: - { - f->dump_string("op_name", "collection_rename"); - } - break; - - case Transaction::OP_OMAP_CLEAR: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - f->dump_string("op_name", "omap_clear"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - } - break; - - case Transaction::OP_OMAP_SETKEYS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - map aset; - i.decode_attrset(aset); - f->dump_string("op_name", "omap_setkeys"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->open_object_section("attr_lens"); - for (map::iterator p = aset.begin(); - p != aset.end(); ++p) { - f->dump_unsigned(p->first.c_str(), p->second.length()); - } - f->close_section(); - } - break; - - case Transaction::OP_OMAP_RMKEYS: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - set keys; - i.decode_keyset(keys); - f->dump_string("op_name", "omap_rmkeys"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->open_array_section("attrs"); - for (auto& k : keys) { - f->dump_string("", k.c_str()); - } - f->close_section(); - } - break; - - case Transaction::OP_OMAP_SETHEADER: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - bufferlist bl; - i.decode_bl(bl); - f->dump_string("op_name", "omap_setheader"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_stream("header_length") << bl.length(); - } - break; - - case Transaction::OP_SPLIT_COLLECTION: - { - coll_t cid = i.get_cid(op->cid); - uint32_t bits = op->split_bits; - uint32_t rem = op->split_rem; - coll_t dest = i.get_cid(op->dest_cid); - f->dump_string("op_name", "op_split_collection_create"); - f->dump_stream("collection") << cid; - f->dump_stream("bits") << bits; - f->dump_stream("rem") << rem; - f->dump_stream("dest") << dest; - } - break; - - case Transaction::OP_SPLIT_COLLECTION2: - { - coll_t cid = i.get_cid(op->cid); - uint32_t bits = op->split_bits; - uint32_t rem = op->split_rem; - coll_t dest = i.get_cid(op->dest_cid); - f->dump_string("op_name", "op_split_collection"); - f->dump_stream("collection") << cid; - f->dump_stream("bits") << bits; - f->dump_stream("rem") << rem; - f->dump_stream("dest") << dest; - } - break; - - case Transaction::OP_MERGE_COLLECTION: - { - coll_t cid = i.get_cid(op->cid); - uint32_t bits = op->split_bits; - coll_t dest = i.get_cid(op->dest_cid); - f->dump_string("op_name", "op_merge_collection"); - f->dump_stream("collection") << cid; - f->dump_stream("dest") << dest; - f->dump_stream("bits") << bits; - } - break; - - case Transaction::OP_OMAP_RMKEYRANGE: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - string first, last; - first = i.decode_string(); - last = i.decode_string(); - f->dump_string("op_name", "op_omap_rmkeyrange"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_string("first", first); - f->dump_string("last", last); - } - break; - - case Transaction::OP_COLL_MOVE_RENAME: - { - coll_t old_cid = i.get_cid(op->cid); - ghobject_t old_oid = i.get_oid(op->oid); - coll_t new_cid = i.get_cid(op->dest_cid); - ghobject_t new_oid = i.get_oid(op->dest_oid); - f->dump_string("op_name", "op_coll_move_rename"); - f->dump_stream("old_collection") << old_cid; - f->dump_stream("old_oid") << old_oid; - f->dump_stream("new_collection") << new_cid; - f->dump_stream("new_oid") << new_oid; - } - break; - - case Transaction::OP_TRY_RENAME: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t old_oid = i.get_oid(op->oid); - ghobject_t new_oid = i.get_oid(op->dest_oid); - f->dump_string("op_name", "op_coll_move_rename"); - f->dump_stream("collection") << cid; - f->dump_stream("old_oid") << old_oid; - f->dump_stream("new_oid") << new_oid; - } - break; - - case Transaction::OP_SETALLOCHINT: - { - coll_t cid = i.get_cid(op->cid); - ghobject_t oid = i.get_oid(op->oid); - uint64_t expected_object_size = op->expected_object_size; - uint64_t expected_write_size = op->expected_write_size; - f->dump_string("op_name", "op_setallochint"); - f->dump_stream("collection") << cid; - f->dump_stream("oid") << oid; - f->dump_stream("expected_object_size") << expected_object_size; - f->dump_stream("expected_write_size") << expected_write_size; - } - break; - - default: - f->dump_string("op_name", "unknown"); - f->dump_unsigned("op_code", op->op); - stop_looping = true; - break; - } - f->close_section(); - op_num++; - } - f->close_section(); -} - -} diff --git a/src/crimson/os/cyan_store.cc b/src/crimson/os/cyan_store.cc index 4319fc148531..4fe9fcb8681d 100644 --- a/src/crimson/os/cyan_store.cc +++ b/src/crimson/os/cyan_store.cc @@ -7,7 +7,7 @@ #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_object.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" namespace { seastar::logger& logger() { @@ -34,9 +34,9 @@ seastar::future<> CyanStore::mount() throw std::runtime_error("read_file"); } - set collections; + std::set collections; auto p = bl.cbegin(); - decode(collections, p); + ceph::decode(collections, p); for (auto& coll : collections) { string fn = fmt::format("{}/{}", path, coll); @@ -69,7 +69,7 @@ seastar::future<> CyanStore::umount() string fn = path + "/collections"; bufferlist bl; - encode(collections, bl); + ceph::encode(collections, bl); if (int r = bl.write_file(fn.c_str()); r < 0) { throw std::runtime_error("write_file"); } @@ -95,7 +95,7 @@ seastar::future<> CyanStore::mkfs() string fn = path + "/collections"; bufferlist bl; set collections; - encode(collections, bl); + ceph::encode(collections, bl); r = bl.write_file(fn.c_str()); if (r < 0) throw std::runtime_error("write_file"); diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index eb73a3f0960d..5cfa29766cdd 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -24,7 +24,7 @@ #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_object.h" #include "crimson/os/cyan_store.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "crimson/osd/heartbeat.h" #include "crimson/osd/osd_meta.h" #include "crimson/osd/pg.h" diff --git a/src/crimson/osd/osd_meta.cc b/src/crimson/osd/osd_meta.cc index 309d70e50bb8..e81455c68c21 100644 --- a/src/crimson/osd/osd_meta.cc +++ b/src/crimson/osd/osd_meta.cc @@ -4,7 +4,7 @@ #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_store.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" void OSDMeta::create(ceph::os::Transaction& t) { diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 6cae617f84d8..81db65b79308 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -28,7 +28,7 @@ #include "crimson/net/Messenger.h" #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_store.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "crimson/osd/exceptions.h" #include "crimson/osd/pg_meta.h" diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 12f6bd730e47..2a2d44ce6f64 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -12,7 +12,7 @@ #include #include "crimson/net/Fwd.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "osd/osd_types.h" #include "osd/osd_internal_types.h" #include "recovery_state.h" diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index fa415d988fcf..79651b78ef85 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -173,7 +173,7 @@ PGBackend::store_object_state( ceph::bufferlist osv; encode(os->oi, osv, 0); // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); - txn.setattr(coll->cid, ghobject_t{os->oi.soid}, OI_ATTR, std::move(osv)); + txn.setattr(coll->cid, ghobject_t{os->oi.soid}, OI_ATTR, osv); } } else { // reset cached ObjectState without enforcing eviction diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index bb44e54052c8..ddde848018c9 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -9,7 +9,7 @@ #include #include "crimson/common/shared_lru.h" -#include "crimson/os/Transaction.h" +#include "os/Transaction.h" #include "osd/osd_types.h" #include "osd/osd_internal_types.h" diff --git a/src/os/ObjectStore.cc b/src/os/ObjectStore.cc index d9b9cf6d1ab6..5c555ec77c53 100644 --- a/src/os/ObjectStore.cc +++ b/src/os/ObjectStore.cc @@ -24,41 +24,6 @@ #endif #include "kstore/KStore.h" -void decode_str_str_map_to_bl(bufferlist::const_iterator& p, - bufferlist *out) -{ - auto start = p; - __u32 n; - decode(n, p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, p); - p.advance(l); - len += 4 + l; - decode(l, p); - p.advance(l); - len += 4 + l; - } - start.copy(len, *out); -} - -void decode_str_set_to_bl(bufferlist::const_iterator& p, - bufferlist *out) -{ - auto start = p; - __u32 n; - decode(n, p); - unsigned len = 4; - while (n--) { - __u32 l; - decode(l, p); - p.advance(l); - len += 4 + l; - } - start.copy(len, *out); -} - ObjectStore *ObjectStore::create(CephContext *cct, const string& type, const string& data, @@ -150,11 +115,3 @@ int ObjectStore::read_meta(const std::string& key, *value = string(buf, r); return 0; } - - - - -ostream& operator<<(ostream& out, const ObjectStore::Transaction& tx) { - - return out << "Transaction(" << &tx << ")"; -} diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index cb84be8faa4f..594af2960bb9 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -22,6 +22,7 @@ #include "common/TrackedOp.h" #include "common/WorkQueue.h" #include "ObjectMap.h" +#include "os/Transaction.h" #include #include @@ -34,8 +35,6 @@ #include /* or */ #endif -#define OPS_PER_PTR 32 - class CephContext; namespace ceph { @@ -54,10 +53,6 @@ static inline void encode(const std::map *attrset encode(*attrset, bl); } -// this isn't the best place for these, but... -void decode_str_str_map_to_bl(ceph::buffer::list::const_iterator& p, ceph::buffer::list *out); -void decode_str_set_to_bl(ceph::buffer::list::const_iterator& p, ceph::buffer::list *out); - // Flag bits typedef uint32_t osflagbits_t; const int SKIP_JOURNAL_REPLAY = 1 << 0; @@ -68,6 +63,8 @@ protected: std::string path; public: + using Transaction = ceph::os::Transaction; + CephContext* cct; /** * create - create an ObjectStore instance. @@ -214,1228 +211,6 @@ public: */ - /********************************* - * transaction - * - * A Transaction represents a sequence of primitive mutation - * operations. - * - * Three events in the life of a Transaction result in - * callbacks. Any Transaction can contain any number of callback - * objects (Context) for any combination of the three classes of - * callbacks: - * - * on_applied_sync, on_applied, and on_commit. - * - * The "on_applied" and "on_applied_sync" callbacks are invoked when - * the modifications requested by the Transaction are visible to - * subsequent ObjectStore operations, i.e., the results are - * readable. The only conceptual difference between on_applied and - * on_applied_sync is the specific thread and locking environment in - * which the callbacks operate. "on_applied_sync" is called - * directly by an ObjectStore execution thread. It is expected to - * execute quickly and must not acquire any locks of the calling - * environment. Conversely, "on_applied" is called from the separate - * Finisher thread, meaning that it can contend for calling - * environment locks. NB, on_applied and on_applied_sync are - * sometimes called on_readable and on_readable_sync. - * - * The "on_commit" callback is also called from the Finisher thread - * and indicates that all of the mutations have been durably - * committed to stable storage (i.e., are now software/hardware - * crashproof). - * - * At the implementation level, each mutation primitive (and its - * associated data) can be serialized to a single buffer. That - * serialization, however, does not copy any data, but (using the - * ceph::buffer::list library) will reference the original buffers. This - * implies that the buffer that contains the data being submitted - * must remain stable until the on_commit callback completes. In - * practice, ceph::buffer::list handles all of this for you and this - * subtlety is only relevant if you are referencing an existing - * buffer via buffer::raw_static. - * - * Some implementations of ObjectStore choose to implement their own - * form of journaling that uses the serialized form of a - * Transaction. This requires that the encode/decode logic properly - * version itself and handle version upgrades that might change the - * format of the encoded Transaction. This has already happened a - * couple of times and the Transaction object contains some helper - * variables that aid in this legacy decoding: - * - * sobject_encoding detects an older/simpler version of oid - * present in pre-bobtail versions of ceph. use_pool_override - * also detects a situation where the pool of an oid can be - * overridden for legacy operations/buffers. For non-legacy - * implementations of ObjectStore, neither of these fields are - * relevant. - * - * - * TRANSACTION ISOLATION - * - * Except as noted above, isolation is the responsibility of the - * caller. In other words, if any storage element (storage element - * == any of the four portions of an object as described above) is - * altered by a transaction (including deletion), the caller - * promises not to attempt to read that element while the - * transaction is pending (here pending means from the time of - * issuance until the "on_applied_sync" callback has been - * received). Violations of isolation need not be detected by - * ObjectStore and there is no corresponding error mechanism for - * reporting an isolation violation (crashing would be the - * appropriate way to report an isolation violation if detected). - * - * Enumeration operations may violate transaction isolation as - * described above when a storage element is being created or - * deleted as part of a transaction. In this case, ObjectStore is - * allowed to consider the enumeration operation to either precede - * or follow the violating transaction element. In other words, the - * presence/absence of the mutated element in the enumeration is - * entirely at the discretion of ObjectStore. The arbitrary ordering - * applies independently to each transaction element. For example, - * if a transaction contains two mutating elements "create A" and - * "delete B". And an enumeration operation is performed while this - * transaction is pending. It is permissible for ObjectStore to - * report any of the four possible combinations of the existence of - * A and B. - * - */ - class Transaction { - public: - enum { - OP_NOP = 0, - OP_TOUCH = 9, // cid, oid - OP_WRITE = 10, // cid, oid, offset, len, bl - OP_ZERO = 11, // cid, oid, offset, len - OP_TRUNCATE = 12, // cid, oid, len - OP_REMOVE = 13, // cid, oid - OP_SETATTR = 14, // cid, oid, attrname, bl - OP_SETATTRS = 15, // cid, oid, attrset - OP_RMATTR = 16, // cid, oid, attrname - OP_CLONE = 17, // cid, oid, newoid - OP_CLONERANGE = 18, // cid, oid, newoid, offset, len - OP_CLONERANGE2 = 30, // cid, oid, newoid, srcoff, len, dstoff - - OP_TRIMCACHE = 19, // cid, oid, offset, len **DEPRECATED** - - OP_MKCOLL = 20, // cid - OP_RMCOLL = 21, // cid - OP_COLL_ADD = 22, // cid, oldcid, oid - OP_COLL_REMOVE = 23, // cid, oid - OP_COLL_SETATTR = 24, // cid, attrname, bl - OP_COLL_RMATTR = 25, // cid, attrname - OP_COLL_SETATTRS = 26, // cid, attrset - OP_COLL_MOVE = 8, // newcid, oldcid, oid - - OP_RMATTRS = 28, // cid, oid - OP_COLL_RENAME = 29, // cid, newcid - - OP_OMAP_CLEAR = 31, // cid - OP_OMAP_SETKEYS = 32, // cid, attrset - OP_OMAP_RMKEYS = 33, // cid, keyset - OP_OMAP_SETHEADER = 34, // cid, header - OP_SPLIT_COLLECTION = 35, // cid, bits, destination - OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination - doesn't create the destination */ - OP_OMAP_RMKEYRANGE = 37, // cid, oid, firstkey, lastkey - OP_COLL_MOVE_RENAME = 38, // oldcid, oldoid, newcid, newoid - - OP_SETALLOCHINT = 39, // cid, oid, object_size, write_size - OP_COLL_HINT = 40, // cid, type, bl - - OP_TRY_RENAME = 41, // oldcid, oldoid, newoid - - OP_COLL_SET_BITS = 42, // cid, bits - - OP_MERGE_COLLECTION = 43, // cid, destination - }; - - // Transaction hint type - enum { - COLL_HINT_EXPECTED_NUM_OBJECTS = 1, - }; - - struct Op { - __le32 op; - __le32 cid; - __le32 oid; - __le64 off; - __le64 len; - __le32 dest_cid; - __le32 dest_oid; //OP_CLONE, OP_CLONERANGE - __le64 dest_off; //OP_CLONERANGE - union { - struct { - __le32 hint_type; //OP_COLL_HINT - }; - struct { - __le32 alloc_hint_flags; //OP_SETALLOCHINT - }; - }; - __le64 expected_object_size; //OP_SETALLOCHINT - __le64 expected_write_size; //OP_SETALLOCHINT - __le32 split_bits; //OP_SPLIT_COLLECTION2,OP_COLL_SET_BITS, - //OP_MKCOLL - __le32 split_rem; //OP_SPLIT_COLLECTION2 - } __attribute__ ((packed)) ; - - struct TransactionData { - __le64 ops; - __le32 largest_data_len; - __le32 largest_data_off; - __le32 largest_data_off_in_data_bl; - __le32 fadvise_flags; - - TransactionData() noexcept : - ops(0), - largest_data_len(0), - largest_data_off(0), - largest_data_off_in_data_bl(0), - fadvise_flags(0) { } - - // override default move operations to reset default values - TransactionData(TransactionData&& other) noexcept : - ops(other.ops), - largest_data_len(other.largest_data_len), - largest_data_off(other.largest_data_off), - largest_data_off_in_data_bl(other.largest_data_off_in_data_bl), - fadvise_flags(other.fadvise_flags) { - other.ops = 0; - other.largest_data_len = 0; - other.largest_data_off = 0; - other.largest_data_off_in_data_bl = 0; - other.fadvise_flags = 0; - } - TransactionData& operator=(TransactionData&& other) noexcept { - ops = other.ops; - largest_data_len = other.largest_data_len; - largest_data_off = other.largest_data_off; - largest_data_off_in_data_bl = other.largest_data_off_in_data_bl; - fadvise_flags = other.fadvise_flags; - other.ops = 0; - other.largest_data_len = 0; - other.largest_data_off = 0; - other.largest_data_off_in_data_bl = 0; - other.fadvise_flags = 0; - return *this; - } - - TransactionData(const TransactionData& other) = default; - TransactionData& operator=(const TransactionData& other) = default; - - void encode(ceph::buffer::list& bl) const { - bl.append((char*)this, sizeof(TransactionData)); - } - void decode(ceph::buffer::list::const_iterator &bl) { - bl.copy(sizeof(TransactionData), (char*)this); - } - } __attribute__ ((packed)) ; - - private: - TransactionData data; - - std::map coll_index; - std::map object_index; - - __le32 coll_id {0}; - __le32 object_id {0}; - - ceph::buffer::list data_bl; - ceph::buffer::list op_bl; - - std::list on_applied; - std::list on_commit; - std::list on_applied_sync; - - public: - Transaction() = default; - - explicit Transaction(ceph::buffer::list::const_iterator &dp) { - decode(dp); - } - explicit Transaction(ceph::buffer::list &nbl) { - auto dp = nbl.cbegin(); - decode(dp); - } - - // override default move operations to reset default values - Transaction(Transaction&& other) noexcept : - data(std::move(other.data)), - coll_index(std::move(other.coll_index)), - object_index(std::move(other.object_index)), - coll_id(other.coll_id), - object_id(other.object_id), - data_bl(std::move(other.data_bl)), - op_bl(std::move(other.op_bl)), - on_applied(std::move(other.on_applied)), - on_commit(std::move(other.on_commit)), - on_applied_sync(std::move(other.on_applied_sync)) { - other.coll_id = 0; - other.object_id = 0; - } - - Transaction& operator=(Transaction&& other) noexcept { - data = std::move(other.data); - coll_index = std::move(other.coll_index); - object_index = std::move(other.object_index); - coll_id = other.coll_id; - object_id = other.object_id; - data_bl = std::move(other.data_bl); - op_bl = std::move(other.op_bl); - on_applied = std::move(other.on_applied); - on_commit = std::move(other.on_commit); - on_applied_sync = std::move(other.on_applied_sync); - other.coll_id = 0; - other.object_id = 0; - return *this; - } - - Transaction(const Transaction& other) = default; - Transaction& operator=(const Transaction& other) = default; - - // expose object_index for FileStore::Op's benefit - const std::map& get_object_index() const { - return object_index; - } - - /* Operations on callback contexts */ - void register_on_applied(Context *c) { - if (!c) return; - on_applied.push_back(c); - } - void register_on_commit(Context *c) { - if (!c) return; - on_commit.push_back(c); - } - void register_on_applied_sync(Context *c) { - if (!c) return; - on_applied_sync.push_back(c); - } - void register_on_complete(Context *c) { - if (!c) return; - RunOnDeleteRef _complete (std::make_shared(c)); - register_on_applied(new ContainerContext(_complete)); - register_on_commit(new ContainerContext(_complete)); - } - bool has_contexts() const { - return - !on_commit.empty() || - !on_applied.empty() || - !on_applied_sync.empty(); - } - - static void collect_contexts( - std::vector& t, - Context **out_on_applied, - Context **out_on_commit, - Context **out_on_applied_sync) { - ceph_assert(out_on_applied); - ceph_assert(out_on_commit); - ceph_assert(out_on_applied_sync); - std::list on_applied, on_commit, on_applied_sync; - for (auto& i : t) { - on_applied.splice(on_applied.end(), i.on_applied); - on_commit.splice(on_commit.end(), i.on_commit); - on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); - } - *out_on_applied = C_Contexts::list_to_context(on_applied); - *out_on_commit = C_Contexts::list_to_context(on_commit); - *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); - } - static void collect_contexts( - std::vector& t, - std::list *out_on_applied, - std::list *out_on_commit, - std::list *out_on_applied_sync) { - ceph_assert(out_on_applied); - ceph_assert(out_on_commit); - ceph_assert(out_on_applied_sync); - for (auto& i : t) { - out_on_applied->splice(out_on_applied->end(), i.on_applied); - out_on_commit->splice(out_on_commit->end(), i.on_commit); - out_on_applied_sync->splice(out_on_applied_sync->end(), - i.on_applied_sync); - } - } - - Context *get_on_applied() { - return C_Contexts::list_to_context(on_applied); - } - Context *get_on_commit() { - return C_Contexts::list_to_context(on_commit); - } - Context *get_on_applied_sync() { - return C_Contexts::list_to_context(on_applied_sync); - } - - void set_fadvise_flags(uint32_t flags) { - data.fadvise_flags = flags; - } - void set_fadvise_flag(uint32_t flag) { - data.fadvise_flags = data.fadvise_flags | flag; - } - uint32_t get_fadvise_flags() { return data.fadvise_flags; } - - void swap(Transaction& other) noexcept { - std::swap(data, other.data); - std::swap(on_applied, other.on_applied); - std::swap(on_commit, other.on_commit); - std::swap(on_applied_sync, other.on_applied_sync); - - std::swap(coll_index, other.coll_index); - std::swap(object_index, other.object_index); - std::swap(coll_id, other.coll_id); - std::swap(object_id, other.object_id); - op_bl.swap(other.op_bl); - data_bl.swap(other.data_bl); - } - - void _update_op(Op* op, - std::vector<__le32> &cm, - std::vector<__le32> &om) { - - switch (op->op) { - case OP_NOP: - break; - - case OP_TOUCH: - case OP_REMOVE: - case OP_SETATTR: - case OP_SETATTRS: - case OP_RMATTR: - case OP_RMATTRS: - case OP_COLL_REMOVE: - case OP_OMAP_CLEAR: - case OP_OMAP_SETKEYS: - case OP_OMAP_RMKEYS: - case OP_OMAP_RMKEYRANGE: - case OP_OMAP_SETHEADER: - case OP_WRITE: - case OP_ZERO: - case OP_TRUNCATE: - case OP_SETALLOCHINT: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - break; - - case OP_CLONERANGE2: - case OP_CLONE: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_MKCOLL: - case OP_RMCOLL: - case OP_COLL_SETATTR: - case OP_COLL_RMATTR: - case OP_COLL_SETATTRS: - case OP_COLL_HINT: - case OP_COLL_SET_BITS: - ceph_assert(op->cid < cm.size()); - op->cid = cm[op->cid]; - break; - - case OP_COLL_ADD: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_cid < om.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - op->oid = om[op->oid]; - break; - - case OP_COLL_MOVE_RENAME: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_cid < cm.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_cid = cm[op->dest_cid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_TRY_RENAME: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->oid < om.size()); - ceph_assert(op->dest_oid < om.size()); - op->cid = cm[op->cid]; - op->oid = om[op->oid]; - op->dest_oid = om[op->dest_oid]; - break; - - case OP_SPLIT_COLLECTION2: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - break; - - case OP_MERGE_COLLECTION: - ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); - op->cid = cm[op->cid]; - op->dest_cid = cm[op->dest_cid]; - break; - - default: - ceph_abort_msg("Unknown OP"); - } - } - void _update_op_bl( - ceph::buffer::list& bl, - std::vector<__le32> &cm, - std::vector<__le32> &om) { - for (auto& bp : bl.buffers()) { - ceph_assert(bp.length() % sizeof(Op) == 0); - - char* raw_p = const_cast(bp.c_str()); - char* raw_end = raw_p + bp.length(); - while (raw_p < raw_end) { - _update_op(reinterpret_cast(raw_p), cm, om); - raw_p += sizeof(Op); - } - } - } - /// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction - void append(Transaction& other) { - - data.ops += other.data.ops; - if (other.data.largest_data_len > data.largest_data_len) { - data.largest_data_len = other.data.largest_data_len; - data.largest_data_off = other.data.largest_data_off; - data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; - } - data.fadvise_flags |= other.data.fadvise_flags; - on_applied.splice(on_applied.end(), other.on_applied); - on_commit.splice(on_commit.end(), other.on_commit); - on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); - - //append coll_index & object_index - std::vector<__le32> cm(other.coll_index.size()); - std::map::iterator coll_index_p; - for (coll_index_p = other.coll_index.begin(); - coll_index_p != other.coll_index.end(); - ++coll_index_p) { - cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); - } - - std::vector<__le32> om(other.object_index.size()); - std::map::iterator object_index_p; - for (object_index_p = other.object_index.begin(); - object_index_p != other.object_index.end(); - ++object_index_p) { - om[object_index_p->second] = _get_object_id(object_index_p->first); - } - - //the other.op_bl SHOULD NOT be changes during append operation, - //we use additional ceph::buffer::list to avoid this problem - ceph::buffer::list other_op_bl; - { - ceph::buffer::ptr other_op_bl_ptr(other.op_bl.length()); - other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str()); - other_op_bl.append(std::move(other_op_bl_ptr)); - } - - //update other_op_bl with cm & om - //When the other is appended to current transaction, all coll_index and - //object_index in other.op_buffer should be updated by new index of the - //combined transaction - _update_op_bl(other_op_bl, cm, om); - - //append op_bl - op_bl.append(other_op_bl); - //append data_bl - data_bl.append(other.data_bl); - } - - /** Inquires about the Transaction as a whole. */ - - /// How big is the encoded Transaction buffer? - uint64_t get_encoded_bytes() { - //layout: data_bl + op_bl + coll_index + object_index + data - - // coll_index size, object_index size and sizeof(transaction_data) - // all here, so they may be computed at compile-time - size_t final_size = sizeof(__u32) * 2 + sizeof(data); - - // coll_index second and object_index second - final_size += (coll_index.size() + object_index.size()) * sizeof(__le32); - - // coll_index first - for (auto p = coll_index.begin(); p != coll_index.end(); ++p) { - final_size += p->first.encoded_size(); - } - - // object_index first - for (auto p = object_index.begin(); p != object_index.end(); ++p) { - final_size += p->first.encoded_size(); - } - - return data_bl.length() + - op_bl.length() + - final_size; - } - - /// Retain old version for regression testing purposes - uint64_t get_encoded_bytes_test() { - using ceph::encode; - //layout: data_bl + op_bl + coll_index + object_index + data - ceph::buffer::list bl; - encode(coll_index, bl); - encode(object_index, bl); - - return data_bl.length() + - op_bl.length() + - bl.length() + - sizeof(data); - } - - uint64_t get_num_bytes() { - return get_encoded_bytes(); - } - /// Size of largest data buffer to the "write" operation encountered so far - uint32_t get_data_length() { - return data.largest_data_len; - } - /// offset within the encoded buffer to the start of the largest data buffer that's encoded - uint32_t get_data_offset() { - if (data.largest_data_off_in_data_bl) { - return data.largest_data_off_in_data_bl + - sizeof(__u8) + // encode struct_v - sizeof(__u8) + // encode compat_v - sizeof(__u32) + // encode len - sizeof(__u32); // data_bl len - } - return 0; // none - } - /// offset of buffer as aligned to destination within object. - int get_data_alignment() { - if (!data.largest_data_len) - return 0; - return (0 - get_data_offset()) & ~CEPH_PAGE_MASK; - } - /// Is the Transaction empty (no operations) - bool empty() { - return !data.ops; - } - /// Number of operations in the transaction - int get_num_ops() { - return data.ops; - } - - /** - * iterator - * - * Helper object to parse Transactions. - * - * ObjectStore instances use this object to step down the encoded - * buffer decoding operation codes and parameters as we go. - * - */ - class iterator { - Transaction *t; - - uint64_t ops; - char* op_buffer_p; - - ceph::buffer::list::const_iterator data_bl_p; - - public: - std::vector colls; - std::vector objects; - - private: - explicit iterator(Transaction *t) - : t(t), - data_bl_p(t->data_bl.cbegin()), - colls(t->coll_index.size()), - objects(t->object_index.size()) { - - ops = t->data.ops; - op_buffer_p = t->op_bl.c_str(); - - std::map::iterator coll_index_p; - for (coll_index_p = t->coll_index.begin(); - coll_index_p != t->coll_index.end(); - ++coll_index_p) { - colls[coll_index_p->second] = coll_index_p->first; - } - - std::map::iterator object_index_p; - for (object_index_p = t->object_index.begin(); - object_index_p != t->object_index.end(); - ++object_index_p) { - objects[object_index_p->second] = object_index_p->first; - } - } - - friend class Transaction; - - public: - - bool have_op() { - return ops > 0; - } - Op* decode_op() { - ceph_assert(ops > 0); - - Op* op = reinterpret_cast(op_buffer_p); - op_buffer_p += sizeof(Op); - ops--; - - return op; - } - std::string decode_string() { - using ceph::decode; - std::string s; - decode(s, data_bl_p); - return s; - } - void decode_bp(ceph::buffer::ptr& bp) { - using ceph::decode; - decode(bp, data_bl_p); - } - void decode_bl(ceph::buffer::list& bl) { - using ceph::decode; - decode(bl, data_bl_p); - } - void decode_attrset(std::map& aset) { - using ceph::decode; - decode(aset, data_bl_p); - } - void decode_attrset(std::map& aset) { - using ceph::decode; - decode(aset, data_bl_p); - } - void decode_attrset_bl(ceph::buffer::list *pbl) { - decode_str_str_map_to_bl(data_bl_p, pbl); - } - void decode_keyset(std::set &keys){ - using ceph::decode; - decode(keys, data_bl_p); - } - void decode_keyset_bl(ceph::buffer::list *pbl){ - decode_str_set_to_bl(data_bl_p, pbl); - } - - const ghobject_t &get_oid(__le32 oid_id) { - ceph_assert(oid_id < objects.size()); - return objects[oid_id]; - } - const coll_t &get_cid(__le32 cid_id) { - ceph_assert(cid_id < colls.size()); - return colls[cid_id]; - } - uint32_t get_fadvise_flags() const { - return t->get_fadvise_flags(); - } - }; - - iterator begin() { - return iterator(this); - } - -private: - void _build_actions_from_tbl(); - - /** - * Helper functions to encode the various mutation elements of a - * transaction. These are 1:1 with the operation codes (see - * enumeration above). These routines ensure that the - * encoder/creator of a transaction gets the right data in the - * right place. Sadly, there's no corresponding version nor any - * form of seat belts for the decoder. - */ - Op* _get_next_op() { - if (op_bl.get_append_buffer_unused_tail_length() < sizeof(Op)) { - op_bl.reserve(sizeof(Op) * OPS_PER_PTR); - } - // append_hole ensures bptr merging. Even huge number of ops - // shouldn't result in overpopulating bl::_buffers. - char* const p = op_bl.append_hole(sizeof(Op)).c_str(); - memset(p, 0, sizeof(Op)); - return reinterpret_cast(p); - } - __le32 _get_coll_id(const coll_t& coll) { - std::map::iterator c = coll_index.find(coll); - if (c != coll_index.end()) - return c->second; - - __le32 index_id = coll_id++; - coll_index[coll] = index_id; - return index_id; - } - __le32 _get_object_id(const ghobject_t& oid) { - std::map::iterator o = object_index.find(oid); - if (o != object_index.end()) - return o->second; - - __le32 index_id = object_id++; - object_index[oid] = index_id; - return index_id; - } - -public: - /// noop. 'nuf said - void nop() { - Op* _op = _get_next_op(); - _op->op = OP_NOP; - data.ops++; - } - /** - * touch - * - * Ensure the existance of an object in a collection. Create an - * empty object if necessary - */ - void touch(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_TOUCH; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /** - * Write data to an offset within an object. If the object is too - * small, it is expanded as needed. It is possible to specify an - * offset beyond the current end of an object and it will be - * expanded as needed. Simple implementations of ObjectStore will - * just zero the data between the old end of the object and the - * newly provided data. More sophisticated implementations of - * ObjectStore will omit the untouched data and store it as a - * "hole" in the file. - * - * Note that a 0-length write does not affect the size of the object. - */ - void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len, - const ceph::buffer::list& write_data, uint32_t flags = 0) { - using ceph::encode; - uint32_t orig_len = data_bl.length(); - Op* _op = _get_next_op(); - _op->op = OP_WRITE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - _op->len = len; - encode(write_data, data_bl); - - ceph_assert(len == write_data.length()); - data.fadvise_flags = data.fadvise_flags | flags; - if (write_data.length() > data.largest_data_len) { - data.largest_data_len = write_data.length(); - data.largest_data_off = off; - data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); // we are about to - } - data.ops++; - } - /** - * zero out the indicated byte range within an object. Some - * ObjectStore instances may optimize this to release the - * underlying storage space. - * - * If the zero range extends beyond the end of the object, the object - * size is extended, just as if we were writing a buffer full of zeros. - * EXCEPT if the length is 0, in which case (just like a 0-length write) - * we do not adjust the object size. - */ - void zero(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len) { - Op* _op = _get_next_op(); - _op->op = OP_ZERO; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - _op->len = len; - data.ops++; - } - /// Discard all data in the object beyond the specified size. - void truncate(const coll_t& cid, const ghobject_t& oid, uint64_t off) { - Op* _op = _get_next_op(); - _op->op = OP_TRUNCATE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->off = off; - data.ops++; - } - /// Remove an object. All four parts of the object are removed. - void remove(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_REMOVE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, const char* name, ceph::buffer::list& val) { - std::string n(name); - setattr(cid, oid, n, val); - } - /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, const std::string& s, ceph::buffer::list& val) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(s, data_bl); - encode(val, data_bl); - data.ops++; - } - /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_SETATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - /// remove an xattr from an object - void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) { - std::string n(name); - rmattr(cid, oid, n); - } - /// remove an xattr from an object - void rmattr(const coll_t& cid, const ghobject_t& oid, const std::string& s) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_RMATTR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(s, data_bl); - data.ops++; - } - /// remove all xattrs from an object - void rmattrs(const coll_t& cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_RMATTRS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /** - * Clone an object into another object. - * - * Low-cost (e.g., O(1)) cloning (if supported) is best, but - * fallback to an O(n) copy is allowed. All four parts of the - * object are cloned (data, xattrs, omap header, omap - * entries). - * - * The destination named object may already exist, in - * which case its previous contents are discarded. - */ - void clone(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid) { - Op* _op = _get_next_op(); - _op->op = OP_CLONE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->dest_oid = _get_object_id(noid); - data.ops++; - } - /** - * Clone a byte range from one object to another. - * - * The data portion of the destination object receives a copy of a - * portion of the data from the source object. None of the other - * three parts of an object is copied from the source. - * - * The destination object size may be extended to the dstoff + len. - * - * The source range *must* overlap with the source object data. If it does - * not the result is undefined. - */ - void clone_range(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid, - uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { - Op* _op = _get_next_op(); - _op->op = OP_CLONERANGE2; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->dest_oid = _get_object_id(noid); - _op->off = srcoff; - _op->len = srclen; - _op->dest_off = dstoff; - data.ops++; - } - - /// Create the collection - void create_collection(const coll_t& cid, int bits) { - Op* _op = _get_next_op(); - _op->op = OP_MKCOLL; - _op->cid = _get_coll_id(cid); - _op->split_bits = bits; - data.ops++; - } - - /** - * Give the collection a hint. - * - * @param cid - collection id. - * @param type - hint type. - * @param hint - the hint payload, which contains the customized - * data along with the hint type. - */ - void collection_hint(const coll_t& cid, uint32_t type, const ceph::buffer::list& hint) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_COLL_HINT; - _op->cid = _get_coll_id(cid); - _op->hint_type = type; - encode(hint, data_bl); - data.ops++; - } - - /// remove the collection, the collection must be empty - void remove_collection(const coll_t& cid) { - Op* _op = _get_next_op(); - _op->op = OP_RMCOLL; - _op->cid = _get_coll_id(cid); - data.ops++; - } - void collection_move(const coll_t& cid, const coll_t &oldcid, const ghobject_t& oid) - __attribute__ ((deprecated)) { - // NOTE: we encode this as a fixed combo of ADD + REMOVE. they - // always appear together, so this is effectively a single MOVE. - Op* _op = _get_next_op(); - _op->op = OP_COLL_ADD; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - _op->dest_cid = _get_coll_id(cid); - data.ops++; - - _op = _get_next_op(); - _op->op = OP_COLL_REMOVE; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - data.ops++; - } - void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, - const coll_t &cid, const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_COLL_MOVE_RENAME; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oldoid); - _op->dest_cid = _get_coll_id(cid); - _op->dest_oid = _get_object_id(oid); - data.ops++; - } - void try_rename(const coll_t &cid, const ghobject_t& oldoid, - const ghobject_t& oid) { - Op* _op = _get_next_op(); - _op->op = OP_TRY_RENAME; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oldoid); - _op->dest_oid = _get_object_id(oid); - data.ops++; - } - - /// Remove omap from oid - void omap_clear( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid ///< [in] Object from which to remove omap - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_CLEAR; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data.ops++; - } - /// Set keys on oid omap. Replaces duplicate keys. - void omap_setkeys( - const coll_t& cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object to update - const std::map &attrset ///< [in] Replacement keys and values - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(attrset, data_bl); - data.ops++; - } - - /// Set keys on an oid omap (ceph::buffer::list variant). - void omap_setkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object to update - const ceph::buffer::list &attrset_bl ///< [in] Replacement keys and values - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data_bl.append(attrset_bl); - data.ops++; - } - - /// Remove keys from oid omap - void omap_rmkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap - const std::set &keys ///< [in] Keys to clear - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(keys, data_bl); - data.ops++; - } - - /// Remove keys from oid omap - void omap_rmkeys( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap - const ceph::buffer::list &keys_bl ///< [in] Keys to clear - ) { - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYS; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - data_bl.append(keys_bl); - data.ops++; - } - - /// Remove key range from oid omap - void omap_rmkeyrange( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object from which to remove the omap keys - const std::string& first, ///< [in] first key in range - const std::string& last ///< [in] first key past range, range is [first,last) - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYRANGE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(first, data_bl); - encode(last, data_bl); - data.ops++; - } - - /// Set omap header - void omap_setheader( - const coll_t &cid, ///< [in] Collection containing oid - const ghobject_t &oid, ///< [in] Object - const ceph::buffer::list &bl ///< [in] Header value - ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_SETHEADER; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(bl, data_bl); - data.ops++; - } - - /// Split collection based on given prefixes, objects matching the specified bits/rem are - /// moved to the new collection - void split_collection( - const coll_t &cid, - uint32_t bits, - uint32_t rem, - const coll_t &destination) { - Op* _op = _get_next_op(); - _op->op = OP_SPLIT_COLLECTION2; - _op->cid = _get_coll_id(cid); - _op->dest_cid = _get_coll_id(destination); - _op->split_bits = bits; - _op->split_rem = rem; - data.ops++; - } - - /// Merge collection into another. - void merge_collection( - coll_t cid, - coll_t destination, - uint32_t bits) { - Op* _op = _get_next_op(); - _op->op = OP_MERGE_COLLECTION; - _op->cid = _get_coll_id(cid); - _op->dest_cid = _get_coll_id(destination); - _op->split_bits = bits; - data.ops++; - } - - void collection_set_bits( - const coll_t &cid, - int bits) { - Op* _op = _get_next_op(); - _op->op = OP_COLL_SET_BITS; - _op->cid = _get_coll_id(cid); - _op->split_bits = bits; - data.ops++; - } - - /// Set allocation hint for an object - /// make 0 values(expected_object_size, expected_write_size) noops for all implementations - void set_alloc_hint( - const coll_t &cid, - const ghobject_t &oid, - uint64_t expected_object_size, - uint64_t expected_write_size, - uint32_t flags - ) { - Op* _op = _get_next_op(); - _op->op = OP_SETALLOCHINT; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - _op->expected_object_size = expected_object_size; - _op->expected_write_size = expected_write_size; - _op->alloc_hint_flags = flags; - data.ops++; - } - - void encode(ceph::buffer::list& bl) const { - //layout: data_bl + op_bl + coll_index + object_index + data - ENCODE_START(9, 9, bl); - encode(data_bl, bl); - encode(op_bl, bl); - encode(coll_index, bl); - encode(object_index, bl); - data.encode(bl); - ENCODE_FINISH(bl); - } - - void decode(ceph::buffer::list::const_iterator &bl) { - DECODE_START(9, bl); - DECODE_OLDEST(9); - - decode(data_bl, bl); - decode(op_bl, bl); - decode(coll_index, bl); - decode(object_index, bl); - data.decode(bl); - coll_id = coll_index.size(); - object_id = object_index.size(); - - DECODE_FINISH(bl); - } - - void dump(ceph::Formatter *f); - static void generate_test_instances(std::list& o); - }; - int queue_transaction(CollectionHandle& ch, Transaction&& t, TrackedOpRef op = TrackedOpRef(), @@ -1920,9 +695,5 @@ public: return false; } }; -WRITE_CLASS_ENCODER(ObjectStore::Transaction) -WRITE_CLASS_ENCODER(ObjectStore::Transaction::TransactionData) - -std::ostream& operator<<(std::ostream& out, const ObjectStore::Transaction& tx); #endif diff --git a/src/os/Transaction.cc b/src/os/Transaction.cc index ad390a1c3eee..5a5ae0cb6a29 100644 --- a/src/os/Transaction.cc +++ b/src/os/Transaction.cc @@ -1,10 +1,47 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -#include "ObjectStore.h" +#include "os/Transaction.h" #include "common/Formatter.h" -void ObjectStore::Transaction::dump(ceph::Formatter *f) +void decode_str_str_map_to_bl(bufferlist::const_iterator& p, + bufferlist *out) +{ + auto start = p; + __u32 n; + decode(n, p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, p); + p.advance(l); + len += 4 + l; + decode(l, p); + p.advance(l); + len += 4 + l; + } + start.copy(len, *out); +} + +void decode_str_set_to_bl(bufferlist::const_iterator& p, + bufferlist *out) +{ + auto start = p; + __u32 n; + decode(n, p); + unsigned len = 4; + while (n--) { + __u32 l; + decode(l, p); + p.advance(l); + len += 4 + l; + } + start.copy(len, *out); +} + +namespace ceph::os { + +void Transaction::dump(ceph::Formatter *f) { f->open_array_section("ops"); iterator i = begin(); @@ -470,7 +507,7 @@ void ObjectStore::Transaction::dump(ceph::Formatter *f) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" -void ObjectStore::Transaction::generate_test_instances(list& o) +void Transaction::generate_test_instances(list& o) { o.push_back(new Transaction); @@ -512,5 +549,12 @@ void ObjectStore::Transaction::generate_test_instances(list on_applied; std::list on_commit; @@ -245,10 +252,10 @@ private: public: Transaction() = default; - explicit Transaction(bufferlist::const_iterator &dp) { + explicit Transaction(ceph::buffer::list::const_iterator &dp) { decode(dp); } - explicit Transaction(bufferlist &nbl) { + explicit Transaction(ceph::buffer::list &nbl) { auto dp = nbl.cbegin(); decode(dp); } @@ -289,7 +296,7 @@ public: Transaction& operator=(const Transaction& other) = default; // expose object_index for FileStore::Op's benefit - const map& get_object_index() const { + const std::map& get_object_index() const { return object_index; } @@ -314,40 +321,42 @@ public: } bool has_contexts() const { return - !on_commit.empty() || - !on_applied.empty() || - !on_applied_sync.empty(); + !on_commit.empty() || + !on_applied.empty() || + !on_applied_sync.empty(); } - static void collect_contexts(vector& t, - Context **out_on_applied, - Context **out_on_commit, - Context **out_on_applied_sync) { + static void collect_contexts( + std::vector& t, + Context **out_on_applied, + Context **out_on_commit, + Context **out_on_applied_sync) { ceph_assert(out_on_applied); ceph_assert(out_on_commit); ceph_assert(out_on_applied_sync); std::list on_applied, on_commit, on_applied_sync; for (auto& i : t) { - on_applied.splice(on_applied.end(), i.on_applied); - on_commit.splice(on_commit.end(), i.on_commit); - on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); + on_applied.splice(on_applied.end(), i.on_applied); + on_commit.splice(on_commit.end(), i.on_commit); + on_applied_sync.splice(on_applied_sync.end(), i.on_applied_sync); } *out_on_applied = C_Contexts::list_to_context(on_applied); *out_on_commit = C_Contexts::list_to_context(on_commit); *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); } - static void collect_contexts(vector& t, - std::list *out_on_applied, - std::list *out_on_commit, - std::list *out_on_applied_sync) { + static void collect_contexts( + std::vector& t, + std::list *out_on_applied, + std::list *out_on_commit, + std::list *out_on_applied_sync) { ceph_assert(out_on_applied); ceph_assert(out_on_commit); ceph_assert(out_on_applied_sync); for (auto& i : t) { - out_on_applied->splice(out_on_applied->end(), i.on_applied); - out_on_commit->splice(out_on_commit->end(), i.on_commit); - out_on_applied_sync->splice(out_on_applied_sync->end(), - i.on_applied_sync); + out_on_applied->splice(out_on_applied->end(), i.on_applied); + out_on_commit->splice(out_on_commit->end(), i.on_commit); + out_on_applied_sync->splice(out_on_applied_sync->end(), + i.on_applied_sync); } } @@ -374,7 +383,7 @@ public: std::swap(on_applied, other.on_applied); std::swap(on_commit, other.on_commit); std::swap(on_applied_sync, other.on_applied_sync); - + std::swap(coll_index, other.coll_index); std::swap(object_index, other.object_index); std::swap(coll_id, other.coll_id); @@ -384,8 +393,8 @@ public: } void _update_op(Op* op, - vector<__le32> &cm, - vector<__le32> &om) { + std::vector<__le32> &cm, + std::vector<__le32> &om) { switch (op->op) { case OP_NOP: @@ -442,7 +451,7 @@ public: op->dest_cid = cm[op->dest_cid]; op->oid = om[op->oid]; break; - + case OP_COLL_MOVE_RENAME: ceph_assert(op->cid < cm.size()); ceph_assert(op->oid < om.size()); @@ -453,7 +462,7 @@ public: op->dest_cid = cm[op->dest_cid]; op->dest_oid = om[op->dest_oid]; break; - + case OP_TRY_RENAME: ceph_assert(op->cid < cm.size()); ceph_assert(op->oid < om.size()); @@ -461,136 +470,135 @@ public: op->cid = cm[op->cid]; op->oid = om[op->oid]; op->dest_oid = om[op->dest_oid]; - break; - + break; + case OP_SPLIT_COLLECTION2: ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); op->cid = cm[op->cid]; op->dest_cid = cm[op->dest_cid]; break; - + case OP_MERGE_COLLECTION: ceph_assert(op->cid < cm.size()); - ceph_assert(op->dest_cid < cm.size()); + ceph_assert(op->dest_cid < cm.size()); op->cid = cm[op->cid]; op->dest_cid = cm[op->dest_cid]; break; - + default: ceph_abort_msg("Unknown OP"); } } void _update_op_bl( - bufferlist& bl, - vector<__le32> &cm, - vector<__le32> &om) { + ceph::buffer::list& bl, + std::vector<__le32> &cm, + std::vector<__le32> &om) { for (auto& bp : bl.buffers()) { ceph_assert(bp.length() % sizeof(Op) == 0); - + char* raw_p = const_cast(bp.c_str()); char* raw_end = raw_p + bp.length(); while (raw_p < raw_end) { - _update_op(reinterpret_cast(raw_p), cm, om); - raw_p += sizeof(Op); + _update_op(reinterpret_cast(raw_p), cm, om); + raw_p += sizeof(Op); } } } - /// Append the operations of the parameter to this Transaction. Those - /// operations are removed from the parameter Transaction + /// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction void append(Transaction& other) { data.ops += other.data.ops; if (other.data.largest_data_len > data.largest_data_len) { - data.largest_data_len = other.data.largest_data_len; - data.largest_data_off = other.data.largest_data_off; - data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; + data.largest_data_len = other.data.largest_data_len; + data.largest_data_off = other.data.largest_data_off; + data.largest_data_off_in_data_bl = data_bl.length() + other.data.largest_data_off_in_data_bl; } data.fadvise_flags |= other.data.fadvise_flags; on_applied.splice(on_applied.end(), other.on_applied); on_commit.splice(on_commit.end(), other.on_commit); on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); - + //append coll_index & object_index - vector<__le32> cm(other.coll_index.size()); - map::iterator coll_index_p; + std::vector<__le32> cm(other.coll_index.size()); + std::map::iterator coll_index_p; for (coll_index_p = other.coll_index.begin(); - coll_index_p != other.coll_index.end(); - ++coll_index_p) { + coll_index_p != other.coll_index.end(); + ++coll_index_p) { cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); } - - vector<__le32> om(other.object_index.size()); - map::iterator object_index_p; + + std::vector<__le32> om(other.object_index.size()); + std::map::iterator object_index_p; for (object_index_p = other.object_index.begin(); - object_index_p != other.object_index.end(); - ++object_index_p) { + object_index_p != other.object_index.end(); + ++object_index_p) { om[object_index_p->second] = _get_object_id(object_index_p->first); - } - + } + //the other.op_bl SHOULD NOT be changes during append operation, - //we use additional bufferlist to avoid this problem - bufferlist other_op_bl; + //we use additional ceph::buffer::list to avoid this problem + ceph::buffer::list other_op_bl; { - bufferptr other_op_bl_ptr(other.op_bl.length()); + ceph::buffer::ptr other_op_bl_ptr(other.op_bl.length()); other.op_bl.copy(0, other.op_bl.length(), other_op_bl_ptr.c_str()); other_op_bl.append(std::move(other_op_bl_ptr)); } - + //update other_op_bl with cm & om //When the other is appended to current transaction, all coll_index and //object_index in other.op_buffer should be updated by new index of the //combined transaction _update_op_bl(other_op_bl, cm, om); - + //append op_bl op_bl.append(other_op_bl); //append data_bl data_bl.append(other.data_bl); } - + /** Inquires about the Transaction as a whole. */ - + /// How big is the encoded Transaction buffer? uint64_t get_encoded_bytes() { //layout: data_bl + op_bl + coll_index + object_index + data - + // coll_index size, object_index size and sizeof(transaction_data) // all here, so they may be computed at compile-time size_t final_size = sizeof(__u32) * 2 + sizeof(data); - + // coll_index second and object_index second final_size += (coll_index.size() + object_index.size()) * sizeof(__le32); - + // coll_index first for (auto p = coll_index.begin(); p != coll_index.end(); ++p) { - final_size += p->first.encoded_size(); + final_size += p->first.encoded_size(); } - + // object_index first for (auto p = object_index.begin(); p != object_index.end(); ++p) { - final_size += p->first.encoded_size(); + final_size += p->first.encoded_size(); } - + return data_bl.length() + - op_bl.length() + - final_size; + op_bl.length() + + final_size; } - + /// Retain old version for regression testing purposes uint64_t get_encoded_bytes_test() { using ceph::encode; //layout: data_bl + op_bl + coll_index + object_index + data - bufferlist bl; + ceph::buffer::list bl; encode(coll_index, bl); encode(object_index, bl); - + return data_bl.length() + - op_bl.length() + - bl.length() + - sizeof(data); + op_bl.length() + + bl.length() + + sizeof(data); } - + uint64_t get_num_bytes() { return get_encoded_bytes(); } @@ -598,22 +606,21 @@ public: uint32_t get_data_length() { return data.largest_data_len; } - /// offset within the encoded buffer to the start of the largest data buffer - /// that's encoded + /// offset within the encoded buffer to the start of the largest data buffer that's encoded uint32_t get_data_offset() { if (data.largest_data_off_in_data_bl) { - return data.largest_data_off_in_data_bl + - sizeof(__u8) + // encode struct_v - sizeof(__u8) + // encode compat_v - sizeof(__u32) + // encode len - sizeof(__u32); // data_bl len + return data.largest_data_off_in_data_bl + + sizeof(__u8) + // encode struct_v + sizeof(__u8) + // encode compat_v + sizeof(__u32) + // encode len + sizeof(__u32); // data_bl len } return 0; // none } /// offset of buffer as aligned to destination within object. int get_data_alignment() { if (!data.largest_data_len) - return 0; + return 0; return (0 - get_data_offset()) & ~CEPH_PAGE_MASK; } /// Is the Transaction empty (no operations) @@ -624,7 +631,7 @@ public: int get_num_ops() { return data.ops; } - + /** * iterator * @@ -636,86 +643,90 @@ public: */ class iterator { Transaction *t; - + uint64_t ops; char* op_buffer_p; - - bufferlist::const_iterator data_bl_p; - + + ceph::buffer::list::const_iterator data_bl_p; + public: - vector colls; - vector objects; - + std::vector colls; + std::vector objects; + private: explicit iterator(Transaction *t) : t(t), - data_bl_p(t->data_bl.cbegin()), - colls(t->coll_index.size()), - objects(t->object_index.size()) { - + data_bl_p(t->data_bl.cbegin()), + colls(t->coll_index.size()), + objects(t->object_index.size()) { + ops = t->data.ops; op_buffer_p = t->op_bl.c_str(); - - map::iterator coll_index_p; + + std::map::iterator coll_index_p; for (coll_index_p = t->coll_index.begin(); - coll_index_p != t->coll_index.end(); - ++coll_index_p) { - colls[coll_index_p->second] = coll_index_p->first; + coll_index_p != t->coll_index.end(); + ++coll_index_p) { + colls[coll_index_p->second] = coll_index_p->first; } - - map::iterator object_index_p; + + std::map::iterator object_index_p; for (object_index_p = t->object_index.begin(); - object_index_p != t->object_index.end(); - ++object_index_p) { - objects[object_index_p->second] = object_index_p->first; + object_index_p != t->object_index.end(); + ++object_index_p) { + objects[object_index_p->second] = object_index_p->first; } } - + friend class Transaction; - + public: - + bool have_op() { return ops > 0; } Op* decode_op() { ceph_assert(ops > 0); - + Op* op = reinterpret_cast(op_buffer_p); op_buffer_p += sizeof(Op); ops--; - + return op; } - string decode_string() { - using ceph::decode; - string s; + std::string decode_string() { + using ceph::decode; + std::string s; decode(s, data_bl_p); return s; } - void decode_bp(bufferptr& bp) { - using ceph::decode; + void decode_bp(ceph::buffer::ptr& bp) { + using ceph::decode; decode(bp, data_bl_p); } - void decode_bl(bufferlist& bl) { - using ceph::decode; + void decode_bl(ceph::buffer::list& bl) { + using ceph::decode; decode(bl, data_bl_p); } - void decode_attrset(map& aset) { - using ceph::decode; + void decode_attrset(std::map& aset) { + using ceph::decode; decode(aset, data_bl_p); } - void decode_attrset(map& aset) { - using ceph::decode; + void decode_attrset(std::map& aset) { + using ceph::decode; decode(aset, data_bl_p); } - void decode_attrset_bl(bufferlist *pbl); - void decode_keyset(set &keys){ - using ceph::decode; + void decode_attrset_bl(ceph::buffer::list *pbl) { + decode_str_str_map_to_bl(data_bl_p, pbl); + } + void decode_keyset(std::set &keys){ + using ceph::decode; decode(keys, data_bl_p); } - void decode_keyset_bl(bufferlist *pbl); - + void decode_keyset_bl(ceph::buffer::list *pbl){ + decode_str_set_to_bl(data_bl_p, pbl); + } + const ghobject_t &get_oid(__le32 oid_id) { ceph_assert(oid_id < objects.size()); return objects[oid_id]; @@ -725,18 +736,17 @@ public: return colls[cid_id]; } uint32_t get_fadvise_flags() const { - return t->get_fadvise_flags(); + return t->get_fadvise_flags(); } }; - + iterator begin() { - return iterator(this); + return iterator(this); } - + private: void _build_actions_from_tbl(); - static constexpr size_t OPS_PER_PTR = 32u; /** * Helper functions to encode the various mutation elements of a * transaction. These are 1:1 with the operation codes (see @@ -756,24 +766,24 @@ private: return reinterpret_cast(p); } __le32 _get_coll_id(const coll_t& coll) { - map::iterator c = coll_index.find(coll); + std::map::iterator c = coll_index.find(coll); if (c != coll_index.end()) return c->second; - + __le32 index_id = coll_id++; coll_index[coll] = index_id; return index_id; } __le32 _get_object_id(const ghobject_t& oid) { - map::iterator o = object_index.find(oid); + std::map::iterator o = object_index.find(oid); if (o != object_index.end()) return o->second; - + __le32 index_id = object_id++; object_index[oid] = index_id; return index_id; } - + public: /// noop. 'nuf said void nop() { @@ -807,7 +817,7 @@ public: * Note that a 0-length write does not affect the size of the object. */ void write(const coll_t& cid, const ghobject_t& oid, uint64_t off, uint64_t len, - const bufferlist& write_data, uint32_t flags = 0) { + const ceph::buffer::list& write_data, uint32_t flags = 0) { using ceph::encode; uint32_t orig_len = data_bl.length(); Op* _op = _get_next_op(); @@ -817,14 +827,13 @@ public: _op->off = off; _op->len = len; encode(write_data, data_bl); - + ceph_assert(len == write_data.length()); data.fadvise_flags = data.fadvise_flags | flags; if (write_data.length() > data.largest_data_len) { - data.largest_data_len = write_data.length(); - data.largest_data_off = off; - // we are about to - data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); + data.largest_data_len = write_data.length(); + data.largest_data_off = off; + data.largest_data_off_in_data_bl = orig_len + sizeof(__u32); // we are about to } data.ops++; } @@ -865,15 +874,12 @@ public: data.ops++; } /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, - const char* name, - bufferlist&& val) { - string n(name); + void setattr(const coll_t& cid, const ghobject_t& oid, const char* name, ceph::buffer::list& val) { + std::string n(name); setattr(cid, oid, n, val); } /// Set an xattr of an object - void setattr(const coll_t& cid, const ghobject_t& oid, - const string& s, bufferlist& val) { + void setattr(const coll_t& cid, const ghobject_t& oid, const std::string& s, ceph::buffer::list& val) { using ceph::encode; Op* _op = _get_next_op(); _op->op = OP_SETATTR; @@ -884,8 +890,7 @@ public: data.ops++; } /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, - const map& attrset) { + void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { using ceph::encode; Op* _op = _get_next_op(); _op->op = OP_SETATTRS; @@ -895,8 +900,7 @@ public: data.ops++; } /// Set multiple xattrs of an object - void setattrs(const coll_t& cid, const ghobject_t& oid, - const map& attrset) { + void setattrs(const coll_t& cid, const ghobject_t& oid, const std::map& attrset) { using ceph::encode; Op* _op = _get_next_op(); _op->op = OP_SETATTRS; @@ -907,11 +911,11 @@ public: } /// remove an xattr from an object void rmattr(const coll_t& cid, const ghobject_t& oid, const char *name) { - string n(name); + std::string n(name); rmattr(cid, oid, n); } /// remove an xattr from an object - void rmattr(const coll_t& cid, const ghobject_t& oid, const string& s) { + void rmattr(const coll_t& cid, const ghobject_t& oid, const std::string& s) { using ceph::encode; Op* _op = _get_next_op(); _op->op = OP_RMATTR; @@ -940,7 +944,7 @@ public: * which case its previous contents are discarded. */ void clone(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid) { + const ghobject_t& noid) { Op* _op = _get_next_op(); _op->op = OP_CLONE; _op->cid = _get_coll_id(cid); @@ -961,8 +965,8 @@ public: * not the result is undefined. */ void clone_range(const coll_t& cid, const ghobject_t& oid, - const ghobject_t& noid, - uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { + const ghobject_t& noid, + uint64_t srcoff, uint64_t srclen, uint64_t dstoff) { Op* _op = _get_next_op(); _op->op = OP_CLONERANGE2; _op->cid = _get_coll_id(cid); @@ -973,7 +977,7 @@ public: _op->dest_off = dstoff; data.ops++; } - + /// Create the collection void create_collection(const coll_t& cid, int bits) { Op* _op = _get_next_op(); @@ -982,7 +986,7 @@ public: _op->split_bits = bits; data.ops++; } - + /** * Give the collection a hint. * @@ -991,7 +995,7 @@ public: * @param hint - the hint payload, which contains the customized * data along with the hint type. */ - void collection_hint(const coll_t& cid, uint32_t type, const bufferlist& hint) { + void collection_hint(const coll_t& cid, uint32_t type, const ceph::buffer::list& hint) { using ceph::encode; Op* _op = _get_next_op(); _op->op = OP_COLL_HINT; @@ -1000,7 +1004,7 @@ public: encode(hint, data_bl); data.ops++; } - + /// remove the collection, the collection must be empty void remove_collection(const coll_t& cid) { Op* _op = _get_next_op(); @@ -1008,26 +1012,25 @@ public: _op->cid = _get_coll_id(cid); data.ops++; } - void collection_move(const coll_t& cid, const coll_t &oldcid, - const ghobject_t& oid) + void collection_move(const coll_t& cid, const coll_t &oldcid, const ghobject_t& oid) __attribute__ ((deprecated)) { - // NOTE: we encode this as a fixed combo of ADD + REMOVE. they - // always appear together, so this is effectively a single MOVE. - Op* _op = _get_next_op(); - _op->op = OP_COLL_ADD; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - _op->dest_cid = _get_coll_id(cid); - data.ops++; - - _op = _get_next_op(); - _op->op = OP_COLL_REMOVE; - _op->cid = _get_coll_id(oldcid); - _op->oid = _get_object_id(oid); - data.ops++; - } + // NOTE: we encode this as a fixed combo of ADD + REMOVE. they + // always appear together, so this is effectively a single MOVE. + Op* _op = _get_next_op(); + _op->op = OP_COLL_ADD; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + _op->dest_cid = _get_coll_id(cid); + data.ops++; + + _op = _get_next_op(); + _op->op = OP_COLL_REMOVE; + _op->cid = _get_coll_id(oldcid); + _op->oid = _get_object_id(oid); + data.ops++; + } void collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid, - const coll_t &cid, const ghobject_t& oid) { + const coll_t &cid, const ghobject_t& oid) { Op* _op = _get_next_op(); _op->op = OP_COLL_MOVE_RENAME; _op->cid = _get_coll_id(oldcid); @@ -1037,7 +1040,7 @@ public: data.ops++; } void try_rename(const coll_t &cid, const ghobject_t& oldoid, - const ghobject_t& oid) { + const ghobject_t& oid) { Op* _op = _get_next_op(); _op->op = OP_TRY_RENAME; _op->cid = _get_coll_id(cid); @@ -1045,7 +1048,7 @@ public: _op->dest_oid = _get_object_id(oid); data.ops++; } - + /// Remove omap from oid void omap_clear( const coll_t &cid, ///< [in] Collection containing oid @@ -1061,7 +1064,7 @@ public: void omap_setkeys( const coll_t& cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object to update - const map &attrset ///< [in] Replacement keys and values + const std::map &attrset ///< [in] Replacement keys and values ) { using ceph::encode; Op* _op = _get_next_op(); @@ -1072,12 +1075,12 @@ public: data.ops++; } - /// Set keys on an oid omap (bufferlist variant). + /// Set keys on an oid omap (ceph::buffer::list variant). void omap_setkeys( const coll_t &cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object to update - const bufferlist &attrset_bl ///< [in] Replacement keys and values - ) { + const ceph::buffer::list &attrset_bl ///< [in] Replacement keys and values + ) { Op* _op = _get_next_op(); _op->op = OP_OMAP_SETKEYS; _op->cid = _get_coll_id(cid); @@ -1090,7 +1093,7 @@ public: void omap_rmkeys( const coll_t &cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object from which to remove the omap - const set &keys ///< [in] Keys to clear + const std::set &keys ///< [in] Keys to clear ) { using ceph::encode; Op* _op = _get_next_op(); @@ -1105,7 +1108,7 @@ public: void omap_rmkeys( const coll_t &cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object from which to remove the omap - const bufferlist &keys_bl ///< [in] Keys to clear + const ceph::buffer::list &keys_bl ///< [in] Keys to clear ) { Op* _op = _get_next_op(); _op->op = OP_OMAP_RMKEYS; @@ -1114,29 +1117,29 @@ public: data_bl.append(keys_bl); data.ops++; } - + /// Remove key range from oid omap void omap_rmkeyrange( const coll_t &cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object from which to remove the omap keys - const string& first, ///< [in] first key in range - const string& last ///< [in] first key past range, range is [first,last) + const std::string& first, ///< [in] first key in range + const std::string& last ///< [in] first key past range, range is [first,last) ) { - using ceph::encode; - Op* _op = _get_next_op(); - _op->op = OP_OMAP_RMKEYRANGE; - _op->cid = _get_coll_id(cid); - _op->oid = _get_object_id(oid); - encode(first, data_bl); - encode(last, data_bl); - data.ops++; - } + using ceph::encode; + Op* _op = _get_next_op(); + _op->op = OP_OMAP_RMKEYRANGE; + _op->cid = _get_coll_id(cid); + _op->oid = _get_object_id(oid); + encode(first, data_bl); + encode(last, data_bl); + data.ops++; + } /// Set omap header void omap_setheader( const coll_t &cid, ///< [in] Collection containing oid const ghobject_t &oid, ///< [in] Object - const bufferlist &bl ///< [in] Header value + const ceph::buffer::list &bl ///< [in] Header value ) { using ceph::encode; Op* _op = _get_next_op(); @@ -1147,8 +1150,8 @@ public: data.ops++; } - /// Split collection based on given prefixes, objects matching the specified - /// bits/rem are moved to the new collection + /// Split collection based on given prefixes, objects matching the specified bits/rem are + /// moved to the new collection void split_collection( const coll_t &cid, uint32_t bits, @@ -1205,7 +1208,7 @@ public: data.ops++; } - void encode(bufferlist& bl) const { + void encode(ceph::buffer::list& bl) const { //layout: data_bl + op_bl + coll_index + object_index + data ENCODE_START(9, 9, bl); encode(data_bl, bl); @@ -1216,7 +1219,7 @@ public: ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) { + void decode(ceph::buffer::list::const_iterator &bl) { DECODE_START(9, bl); DECODE_OLDEST(9); @@ -1232,5 +1235,11 @@ public: } void dump(ceph::Formatter *f); + static void generate_test_instances(std::list& o); }; +WRITE_CLASS_ENCODER(Transaction) +WRITE_CLASS_ENCODER(Transaction::TransactionData) + +std::ostream& operator<<(std::ostream& out, const Transaction& tx); + } -- 2.47.3