From 5c2210d9e791d879e2dffa611aea50e7902a03d8 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 8 Nov 2021 09:05:45 -0500 Subject: [PATCH] test/rgw: model multipart uploads in ceph_test_cls_rgw_stats Signed-off-by: Casey Bodley --- src/test/cls_rgw/test_cls_rgw_stats.cc | 354 +++++++++++++++++++------ 1 file changed, 274 insertions(+), 80 deletions(-) diff --git a/src/test/cls_rgw/test_cls_rgw_stats.cc b/src/test/cls_rgw/test_cls_rgw_stats.cc index e1e5c727918..5174c1a08bd 100644 --- a/src/test/cls_rgw/test_cls_rgw_stats.cc +++ b/src/test/cls_rgw/test_cls_rgw_stats.cc @@ -16,6 +16,22 @@ #define dout_subsys ceph_subsys_rgw #define dout_context g_ceph_context + +// simulation parameters: + +// total number of index operations to prepare +constexpr size_t max_operations = 2048; +// total number of object names. each operation chooses one at random +constexpr size_t max_entries = 32; +// maximum number of pending operations. once the limit is reached, the oldest +// pending operation is finished before another can start +constexpr size_t max_pending = 16; +// object size is randomly distributed between 0 and 4M +constexpr size_t max_object_size = 4*1024*1024; +// multipart upload threshold +constexpr size_t max_part_size = 1024*1024; + + // create/destroy a pool that's shared by all tests in the process struct RadosEnv : public ::testing::Environment { static std::optional pool_name; @@ -48,6 +64,8 @@ std::ostream& operator<<(std::ostream& out, const rgw_bucket_category_stats& c) return out << "{count=" << c.num_entries << " size=" << c.total_size << '}'; } + +// librados helpers rgw_bucket_entry_ver last_version(librados::IoCtx& ioctx) { rgw_bucket_entry_ver ver; @@ -79,10 +97,10 @@ int index_prepare(librados::IoCtx& ioctx, const std::string& oid, int index_complete(librados::IoCtx& ioctx, const std::string& oid, const cls_rgw_obj_key& key, const std::string& tag, RGWModifyOp type, const rgw_bucket_entry_ver& ver, - const rgw_bucket_dir_entry_meta& meta) + const rgw_bucket_dir_entry_meta& meta, + std::list* remove_objs) { librados::ObjectWriteOperation op; - constexpr std::list* remove_objs = nullptr; constexpr bool log_op = false; constexpr int flags = 0; constexpr rgw_zone_set* zones = nullptr; @@ -122,6 +140,7 @@ static void unaccount_entry(rgw_bucket_dir_stats& stats, } +// a map of cached dir entries representing the expected state of cls_rgw struct object : rgw_bucket_dir_entry, boost::intrusive::set_base_hook<> { explicit object(const cls_rgw_obj_key& key) { this->key = key; @@ -142,55 +161,51 @@ struct object_map : object_map_base { } }; + +// models a bucket index operation, starting with cls_rgw_bucket_prepare_op(). +// stores all of the state necessary to complete the operation, either with +// cls_rgw_bucket_complete_op() or cls_rgw_suggest_changes(). uploads larger +// than max_part_size are modeled as multipart uploads struct operation { RGWModifyOp type; cls_rgw_obj_key key; std::string tag; rgw_bucket_entry_ver ver; + std::string upload_id; // empty unless multipart rgw_bucket_dir_entry_meta meta; }; -struct config { - size_t max_operations = 0; - size_t max_entries = 0; - size_t max_pending = 0; - size_t max_object_size = 0; -}; class simulator { public: - simulator(librados::IoCtx& ioctx, - std::string oid, - const config& cfg) - : ioctx(ioctx), - oid(std::move(oid)), - cfg(cfg), - pending(cfg.max_pending) + simulator(librados::IoCtx& ioctx, std::string oid) + : ioctx(ioctx), oid(std::move(oid)), pending(max_pending) { // generate a set of object keys. each operation chooses one at random - keys.reserve(cfg.max_entries); - for (size_t i = 0; i < cfg.max_entries; i++) { - keys.emplace_back(gen_rand_alphanumeric(g_ceph_context, 12)); + keys.reserve(max_entries); + for (size_t i = 0; i < max_entries; i++) { + keys.emplace_back(gen_rand_alphanumeric_upper(g_ceph_context, 12)); } } void run(); private: - void step(); void start(); int try_start(const cls_rgw_obj_key& key, const std::string& tag); + void finish(const operation& op); void complete(const operation& op, RGWModifyOp type); void suggest(const operation& op, char suggestion); + int init_multipart(const operation& op); + void complete_multipart(const operation& op); + object_map::iterator find_or_create(const cls_rgw_obj_key& key); - void check_stats(); librados::IoCtx& ioctx; std::string oid; - const config& cfg; std::vector keys; object_map objects; @@ -203,24 +218,32 @@ void simulator::run() { // init the bucket index object ASSERT_EQ(0, index_init(ioctx, oid)); - // for the simulation for N steps - for (size_t i = 0; i < cfg.max_operations; i++) { - step(); - } + // run the simulation for N steps + for (size_t i = 0; i < max_operations; i++) { + if (pending.full()) { + // if we're at max_pending, finish the oldest operation + auto& op = pending.front(); + finish(op); + pending.pop_front(); + + // verify bucket stats + rgw_bucket_dir_stats stored_stats; + read_stats(ioctx, oid, stored_stats); + + const rgw_bucket_dir_stats& expected_stats = stats; + ASSERT_EQ(expected_stats, stored_stats); + } - // TODO: recalc stats and compare -} + // initiate the next operation + start(); -void simulator::step() -{ - if (pending.full()) { - auto& op = pending.front(); - finish(op); - pending.pop_front(); - check_stats(); + // verify bucket stats + rgw_bucket_dir_stats stored_stats; + read_stats(ioctx, oid, stored_stats); + + const rgw_bucket_dir_stats& expected_stats = stats; + ASSERT_EQ(expected_stats, stored_stats); } - start(); - check_stats(); } object_map::iterator simulator::find_or_create(const cls_rgw_obj_key& key) @@ -240,31 +263,45 @@ int simulator::try_start(const cls_rgw_obj_key& key, const std::string& tag) const auto type = static_cast( ceph::util::generate_random_number(CLS_RGW_OP_ADD, CLS_RGW_OP_DEL)); - // prepare operation - int r = index_prepare(ioctx, oid, key, tag, type); - if (r != 0) { - derr << "> failed to prepare operation key=" << key - << " tag=" << tag << " type=" << type - << ": " << cpp_strerror(r) << dendl; - return r; - } - - // on success, initialize the pending operation - auto op = operation{type, key, tag, last_version(ioctx)}; + auto op = operation{type, key, tag}; - op.meta.category = static_cast( - ceph::util::generate_random_number(0, 1)); + op.meta.category = RGWObjCategory::Main; op.meta.size = op.meta.accounted_size = - ceph::util::generate_random_number(1, cfg.max_object_size); + ceph::util::generate_random_number(1, max_object_size); + + if (type == CLS_RGW_OP_ADD && op.meta.size > max_part_size) { + // simulate multipart for uploads over the max_part_size threshold + op.upload_id = gen_rand_alphanumeric_upper(g_ceph_context, 12); + + int r = init_multipart(op); + if (r != 0) { + derr << "> failed to prepare multipart upload key=" << key + << " upload=" << op.upload_id << " tag=" << tag + << " type=" << type << ": " << cpp_strerror(r) << dendl; + return r; + } + + dout(1) << "> prepared multipart upload key=" << key + << " upload=" << op.upload_id << " tag=" << tag + << " type=" << type << " size=" << op.meta.size << dendl; + } else { + // prepare operation + int r = index_prepare(ioctx, oid, op.key, op.tag, op.type); + if (r != 0) { + derr << "> failed to prepare operation key=" << key + << " tag=" << tag << " type=" << type + << ": " << cpp_strerror(r) << dendl; + return r; + } + + dout(1) << "> prepared operation key=" << key + << " tag=" << tag << " type=" << type + << " size=" << op.meta.size << dendl; + } + op.ver = last_version(ioctx); - dout(1) << "> prepared operation key=" << key - << " tag=" << tag << " type=" << type - << " cat=" << op.meta.category - << " size=" << op.meta.size << dendl; ceph_assert(!pending.full()); pending.push_back(std::move(op)); - - find_or_create(key); return 0; } @@ -274,15 +311,21 @@ void simulator::start() const size_t index = ceph::util::generate_random_number(0, keys.size() - 1); const auto& key = keys[index]; // generate a random tag - const auto tag = gen_rand_alphanumeric(g_ceph_context, 12); + const auto tag = gen_rand_alphanumeric_upper(g_ceph_context, 12); - // retry until success + // retry until success. failures don't count towards max_operations while (try_start(key, tag) != 0) ; } void simulator::finish(const operation& op) { + if (op.type == CLS_RGW_OP_ADD && !op.upload_id.empty()) { + // multipart uploads either complete or abort based on part uploads + complete_multipart(op); + return; + } + // complete most operations, but finish some with cancel or dir suggest constexpr int cancel_percent = 10; constexpr int suggest_update_percent = 10; @@ -308,11 +351,11 @@ void simulator::finish(const operation& op) void simulator::complete(const operation& op, RGWModifyOp type) { - int r = index_complete(ioctx, oid, op.key, op.tag, type, op.ver, op.meta); + int r = index_complete(ioctx, oid, op.key, op.tag, type, + op.ver, op.meta, nullptr); if (r != 0) { derr << "< failed to complete operation key=" << op.key << " tag=" << op.tag << " type=" << op.type - << " cat=" << op.meta.category << " size=" << op.meta.size << ": " << cpp_strerror(r) << dendl; return; @@ -321,7 +364,6 @@ void simulator::complete(const operation& op, RGWModifyOp type) if (type == CLS_RGW_OP_CANCEL) { dout(1) << "< canceled operation key=" << op.key << " tag=" << op.tag << " type=" << op.type - << " cat=" << op.meta.category << " size=" << op.meta.size << dendl; } else if (type == CLS_RGW_OP_ADD) { auto obj = find_or_create(op.key); @@ -333,7 +375,6 @@ void simulator::complete(const operation& op, RGWModifyOp type) account_entry(stats, obj->meta); dout(1) << "< completed write operation key=" << op.key << " tag=" << op.tag << " type=" << type - << " cat=" << op.meta.category << " size=" << op.meta.size << dendl; } else { ceph_assert(type == CLS_RGW_OP_DEL); @@ -345,8 +386,7 @@ void simulator::complete(const operation& op, RGWModifyOp type) objects.erase_and_dispose(obj, std::default_delete{}); } dout(1) << "< completed delete operation key=" << op.key - << " tag=" << op.tag << " type=" << type - << " cat=" << op.meta.category << dendl; + << " tag=" << op.tag << " type=" << type << dendl; } } @@ -358,7 +398,6 @@ void simulator::suggest(const operation& op, char suggestion) if (r != 0) { derr << "< no bi entry to suggest for operation key=" << op.key << " tag=" << op.tag << " type=" << op.type - << " cat=" << op.meta.category << " size=" << op.meta.size << ": " << cpp_strerror(r) << dendl; return; @@ -396,7 +435,6 @@ void simulator::suggest(const operation& op, char suggestion) if (r != 0) { derr << "< failed to suggest operation key=" << op.key << " tag=" << op.tag << " type=" << op.type - << " cat=" << op.meta.category << " size=" << op.meta.size << ": " << cpp_strerror(r) << dendl; return; @@ -413,7 +451,6 @@ void simulator::suggest(const operation& op, char suggestion) account_entry(stats, obj->meta); dout(1) << "< suggested update operation key=" << op.key << " tag=" << op.tag << " type=" << op.type - << " cat=" << op.meta.category << " size=" << op.meta.size << dendl; } else { ceph_assert(suggestion == CEPH_RGW_REMOVE); @@ -425,28 +462,185 @@ void simulator::suggest(const operation& op, char suggestion) objects.erase_and_dispose(obj, std::default_delete{}); } dout(1) << "< suggested remove operation key=" << op.key - << " tag=" << op.tag << " type=" << op.type - << " cat=" << op.meta.category << dendl; + << " tag=" << op.tag << " type=" << op.type << dendl; } } -void simulator::check_stats() +int simulator::init_multipart(const operation& op) { - rgw_bucket_dir_stats stored_stats; - read_stats(ioctx, oid, stored_stats); + // create (not just prepare) the meta object + const auto meta_key = cls_rgw_obj_key{ + fmt::format("_multipart_{}.2~{}.meta", op.key.name, op.upload_id)}; + const std::string empty_tag; // empty tag enables complete without prepare + const rgw_bucket_entry_ver empty_ver; + rgw_bucket_dir_entry_meta meta_meta; + meta_meta.category = RGWObjCategory::MultiMeta; + int r = index_complete(ioctx, oid, meta_key, empty_tag, CLS_RGW_OP_ADD, + empty_ver, meta_meta, nullptr); + if (r != 0) { + derr << " < failed to create multipart meta key=" << meta_key + << ": " << cpp_strerror(r) << dendl; + return r; + } else { + // account for meta object + auto obj = find_or_create(meta_key); + if (obj->exists) { + unaccount_entry(stats, obj->meta); + } + obj->exists = true; + obj->meta = meta_meta; + account_entry(stats, obj->meta); + } - const rgw_bucket_dir_stats& expected_stats = stats; - ASSERT_EQ(expected_stats, stored_stats); + // prepare part uploads + std::list remove_objs; + size_t part_id = 0; + + size_t remaining = op.meta.size; + while (remaining > max_part_size) { + remaining -= max_part_size; + const auto part_size = std::min(remaining, max_part_size); + const auto part_key = cls_rgw_obj_key{ + fmt::format("_multipart_{}.2~{}.{}", op.key.name, op.upload_id, part_id)}; + part_id++; + + r = index_prepare(ioctx, oid, part_key, op.tag, op.type); + if (r != 0) { + // if part prepare fails, remove the meta object and remove_objs + [[maybe_unused]] int ignored = + index_complete(ioctx, oid, meta_key, empty_tag, CLS_RGW_OP_DEL, + empty_ver, meta_meta, &remove_objs); + derr << " > failed to prepare part key=" << part_key + << " size=" << part_size << dendl; + return r; // return the error from prepare + } + dout(1) << " > prepared part key=" << part_key + << " size=" << part_size << dendl; + remove_objs.push_back(part_key); + } + return 0; +} + +void simulator::complete_multipart(const operation& op) +{ + const std::string empty_tag; // empty tag enables complete without prepare + const rgw_bucket_entry_ver empty_ver; + + // try to finish part uploads + size_t part_id = 0; + std::list remove_objs; + + RGWModifyOp type = op.type; // OP_ADD, or OP_CANCEL for abort + + size_t remaining = op.meta.size; + while (remaining > max_part_size) { + remaining -= max_part_size; + const auto part_size = std::min(remaining, max_part_size); + const auto part_key = cls_rgw_obj_key{ + fmt::format("_multipart_{}.2~{}.{}", op.key.name, op.upload_id, part_id)}; + part_id++; + + // cancel 10% of part uploads (and abort the multipart upload) + constexpr int cancel_percent = 10; + const int result = ceph::util::generate_random_number(0, 99); + if (result < cancel_percent) { + type = CLS_RGW_OP_CANCEL; // abort multipart + dout(1) << " < canceled part key=" << part_key + << " size=" << part_size << dendl; + } else { + rgw_bucket_dir_entry_meta meta; + meta.category = op.meta.category; + meta.size = meta.accounted_size = part_size; + + int r = index_complete(ioctx, oid, part_key, op.tag, op.type, + empty_ver, meta, nullptr); + if (r != 0) { + derr << " < failed to complete part key=" << part_key + << " size=" << meta.size << ": " << cpp_strerror(r) << dendl; + type = CLS_RGW_OP_CANCEL; // abort multipart + } else { + dout(1) << " < completed part key=" << part_key + << " size=" << meta.size << dendl; + // account for successful part upload + auto obj = find_or_create(part_key); + if (obj->exists) { + unaccount_entry(stats, obj->meta); + } + obj->exists = true; + obj->meta = meta; + account_entry(stats, obj->meta); + } + } + remove_objs.push_back(part_key); + } + + // delete the multipart meta object + const auto meta_key = cls_rgw_obj_key{ + fmt::format("_multipart_{}.2~{}.meta", op.key.name, op.upload_id)}; + rgw_bucket_dir_entry_meta meta_meta; + meta_meta.category = RGWObjCategory::MultiMeta; + + int r = index_complete(ioctx, oid, meta_key, empty_tag, CLS_RGW_OP_DEL, + empty_ver, meta_meta, nullptr); + if (r != 0) { + derr << " < failed to remove multipart meta key=" << meta_key + << ": " << cpp_strerror(r) << dendl; + } else { + // unaccount for meta object + auto obj = objects.find(meta_key, std::less{}); + if (obj != objects.end()) { + if (obj->exists) { + unaccount_entry(stats, obj->meta); + } + objects.erase_and_dispose(obj, std::default_delete{}); + } + } + + // create or cancel the head object + r = index_complete(ioctx, oid, op.key, empty_tag, type, + empty_ver, op.meta, &remove_objs); + if (r != 0) { + derr << "< failed to complete multipart upload key=" << op.key + << " upload=" << op.upload_id << " tag=" << op.tag + << " type=" << type << " size=" << op.meta.size + << ": " << cpp_strerror(r) << dendl; + return; + } + + if (type == CLS_RGW_OP_ADD) { + dout(1) << "< completed multipart upload key=" << op.key + << " upload=" << op.upload_id << " tag=" << op.tag + << " type=" << op.type << " size=" << op.meta.size << dendl; + + // account for head stats + auto obj = find_or_create(op.key); + if (obj->exists) { + unaccount_entry(stats, obj->meta); + } + obj->exists = true; + obj->meta = op.meta; + account_entry(stats, obj->meta); + } else { + dout(1) << "< canceled multipart upload key=" << op.key + << " upload=" << op.upload_id << " tag=" << op.tag + << " type=" << op.type << " size=" << op.meta.size << dendl; + } + + // unaccount for remove_objs + for (const auto& part_key : remove_objs) { + auto obj = objects.find(part_key, std::less{}); + if (obj != objects.end()) { + if (obj->exists) { + unaccount_entry(stats, obj->meta); + } + objects.erase_and_dispose(obj, std::default_delete{}); + } + } } TEST(cls_rgw_stats, simulate) { const char* bucket_oid = __func__; - config cfg; - cfg.max_operations = 2048; - cfg.max_entries = 64; - cfg.max_pending = 16; - cfg.max_object_size = 4*1024*1024; // 4M - auto sim = simulator{RadosEnv::ioctx, bucket_oid, cfg}; + auto sim = simulator{RadosEnv::ioctx, bucket_oid}; sim.run(); } -- 2.39.5