From e11480aa4c550b98e1cc5f584d0d7a697c100fdf Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 3 Sep 2024 15:16:24 -0400 Subject: [PATCH] cls/rgw: add bulk cls_rgw_bi_put_entries() op for reshard adds a bulk api for reshard to write entries to the target index shard object. this takes care of the bucket stats updates so that rgw's reshard logic doesn't have to worry about it Signed-off-by: Casey Bodley --- src/cls/rgw/cls_rgw.cc | 99 +++++++++++++++++++++++++++++ src/cls/rgw/cls_rgw_client.cc | 15 +++++ src/cls/rgw/cls_rgw_client.h | 6 ++ src/cls/rgw/cls_rgw_const.h | 1 + src/cls/rgw/cls_rgw_ops.cc | 6 ++ src/cls/rgw/cls_rgw_ops.h | 29 +++++++++ src/test/cls_rgw/test_cls_rgw.cc | 91 ++++++++++++++++++++++++++ src/tools/ceph-dencoder/rgw_types.h | 1 + 8 files changed, 248 insertions(+) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 30b136e2672..3c6189c3743 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -2880,6 +2880,103 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist * return 0; } +static int rgw_bi_put_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + rgw_cls_bi_put_entries_op op; + try { + auto iter = in->cbegin(); + decode(op, iter); + } catch (const ceph::buffer::error&) { + CLS_LOG(0, "ERROR: %s: failed to decode request", __func__); + return -EINVAL; + } + + const size_t limit = cls_get_config(hctx)->osd_max_omap_entries_per_request; + if (op.entries.size() > limit) { + int r = -E2BIG; + CLS_LOG(0, "ERROR: %s: got too many entries (%zu > %zu), returning %d", + __func__, op.entries.size(), limit, r); + return r; + } + + rgw_bucket_dir_header header; + int r = read_bucket_header(hctx, &header); + if (r < 0) { + CLS_LOG(1, "ERROR: %s: failed to read header", __func__); + return r; + } + + if (op.check_existing) { + // fetch any existing keys and decrement their stats before overwriting + std::set keys; + for (const auto& entry : op.entries) { + keys.insert(entry.idx); + } + + std::map vals; + r = cls_cxx_map_get_vals_by_keys(hctx, keys, &vals); + if (r < 0) { + CLS_LOG(0, "ERROR: %s: cls_cxx_map_get_vals_by_keys() returned r=%d", + __func__, r); + return r; + } + + for (auto& [idx, data] : vals) { + rgw_cls_bi_entry entry; + entry.type = bi_type(idx); + entry.idx = std::move(idx); + entry.data = std::move(data); + + cls_rgw_obj_key key; + RGWObjCategory category; + rgw_bucket_category_stats stats; + const bool account = entry.get_info(&key, &category, &stats); + if (account) { + auto& dest = header.stats[category]; + dest.total_size -= stats.total_size; + dest.total_size_rounded -= stats.total_size_rounded; + dest.num_entries -= stats.num_entries; + dest.actual_size -= stats.actual_size; + } + } // foreach vals + } // if op.check_existing + + std::map new_vals; + + for (auto& entry : op.entries) { + if (entry.type == BIIndexType::ReshardDeleted) { + r = cls_cxx_map_remove_key(hctx, entry.idx); + if (r < 0) { + CLS_LOG(0, "WARNING: %s: cls_cxx_map_remove_key(%s) returned r=%d", + __func__, entry.idx.c_str(), r); + } // not fatal + continue; + } + + cls_rgw_obj_key key; + RGWObjCategory category; + rgw_bucket_category_stats stats; + const bool account = entry.get_info(&key, &category, &stats); + if (account) { + auto& dest = header.stats[category]; + dest.total_size += stats.total_size; + dest.total_size_rounded += stats.total_size_rounded; + dest.num_entries += stats.num_entries; + dest.actual_size += stats.actual_size; + } + + new_vals.emplace(std::move(entry.idx), std::move(entry.data)); + } + + r = cls_cxx_map_set_vals(hctx, &new_vals); + if (r < 0) { + CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_vals() returned r=%d", __func__, r); + return r; + } + + return write_bucket_header(hctx, &header); +} + /* The plain entries in the bucket index are divided into two regions * divided by the special entries that begin with 0x80. Those below * ("Low") are ascii entries. Those above ("High") bring in unicode @@ -4985,6 +5082,7 @@ CLS_INIT(rgw) cls_method_handle_t h_rgw_bi_get_op; cls_method_handle_t h_rgw_bi_get_vals_op; cls_method_handle_t h_rgw_bi_put_op; + cls_method_handle_t h_rgw_bi_put_entries_op; cls_method_handle_t h_rgw_bi_list_op; cls_method_handle_t h_rgw_reshard_log_trim_op; cls_method_handle_t h_rgw_bi_log_list_op; @@ -5043,6 +5141,7 @@ CLS_INIT(rgw) cls_register_cxx_method(h_class, RGW_BI_GET, CLS_METHOD_RD, rgw_bi_get_op, &h_rgw_bi_get_op); cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op); cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op); + cls_register_cxx_method(h_class, RGW_BI_PUT_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_entries, &h_rgw_bi_put_entries_op); cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op); cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op); diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index e4c0ec7be0a..be13a51d330 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -523,6 +523,21 @@ void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi op.exec(RGW_CLASS, RGW_BI_PUT, in); } +void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op, + std::vector entries, + bool check_existing) +{ + const auto call = rgw_cls_bi_put_entries_op{ + .entries = std::move(entries), + .check_existing = check_existing + }; + + bufferlist in; + encode(call, in); + + op.exec(RGW_CLASS, RGW_BI_PUT_ENTRIES, in); +} + /* nb: any entries passed in are replaced with the results of the cls * call, so caller does not need to clear entries between calls */ diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 84431055ded..66aa12f5bd0 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -389,6 +389,12 @@ int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid, std::list *entries); int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry); void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry); +// Write the given array of index entries and update bucket stats accordingly. +// If existing entries may be overwritten, pass check_existing=true to decrement +// their stats first. +void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op, + std::vector entries, + bool check_existing); int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid, const std::string& name, const std::string& marker, uint32_t max, std::list *entries, bool *is_truncated, bool reshardlog = false); diff --git a/src/cls/rgw/cls_rgw_const.h b/src/cls/rgw/cls_rgw_const.h index 9a4e368575d..d67274291d6 100644 --- a/src/cls/rgw/cls_rgw_const.h +++ b/src/cls/rgw/cls_rgw_const.h @@ -35,6 +35,7 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG; #define RGW_BI_GET "bi_get" #define RGW_BI_GET_VALS "bi_get_vals" #define RGW_BI_PUT "bi_put" +#define RGW_BI_PUT_ENTRIES "bi_put_entries" #define RGW_BI_LIST "bi_list" #define RGW_RESHARD_LOG_TRIM "reshard_log_trim" diff --git a/src/cls/rgw/cls_rgw_ops.cc b/src/cls/rgw/cls_rgw_ops.cc index d32448517ce..2c33a2691b5 100644 --- a/src/cls/rgw/cls_rgw_ops.cc +++ b/src/cls/rgw/cls_rgw_ops.cc @@ -580,3 +580,9 @@ void cls_rgw_get_bucket_resharding_op::generate_test_instances( void cls_rgw_get_bucket_resharding_op::dump(Formatter *f) const { } + +void rgw_cls_bi_put_entries_op::dump(Formatter *f) const +{ + encode_json("entries", entries, f); + encode_json("check_existing", check_existing, f); +} diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index 3e36d96c50e..9dc7343f68d 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -777,6 +777,35 @@ struct rgw_cls_bi_put_op { }; WRITE_CLASS_ENCODER(rgw_cls_bi_put_op) +struct rgw_cls_bi_put_entries_op { + std::vector entries; + bool check_existing = false; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(entries, bl); + encode(check_existing, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(entries, bl); + decode(check_existing, bl); + DECODE_FINISH(bl); + } + + void dump(ceph::Formatter *f) const; + + static void generate_test_instances(std::list& o) { + o.push_back(new rgw_cls_bi_put_entries_op); + o.push_back(new rgw_cls_bi_put_entries_op); + o.back()->entries.push_back({.idx = "entry"}); + o.back()->check_existing = true; + } +}; +WRITE_CLASS_ENCODER(rgw_cls_bi_put_entries_op) + struct rgw_cls_bi_list_op { uint32_t max; std::string name_filter; // limit result to one object and its instances diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc index 66b28565c44..23c3576429a 100644 --- a/src/test/cls_rgw/test_cls_rgw.cc +++ b/src/test/cls_rgw/test_cls_rgw.cc @@ -1479,3 +1479,94 @@ TEST_F(cls_rgw, reshardlog_num) index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 2, obj1, meta); reshardlog_entries(ioctx, bucket_oid, 2u); } + +TEST_F(cls_rgw, bi_put_entries) +{ + const string src_bucket = str_int("bi_put_entries", 0); + const string dst_bucket = str_int("bi_put_entries", 1); + + const cls_rgw_obj_key obj1 = str_int("obj", 1); + const cls_rgw_obj_key obj2 = str_int("obj", 2); + const cls_rgw_obj_key obj3 = str_int("obj", 3); + const cls_rgw_obj_key obj4 = str_int("obj", 4); + const string tag = str_int("tag", 0); + const string loc = str_int("loc", 0); + auto meta = rgw_bucket_dir_entry_meta{ + .category = RGWObjCategory::Main, .size = 8192}; + + // prepare src_bucket and add two objects + { + ObjectWriteOperation op; + cls_rgw_bucket_init_index2(op); + ASSERT_EQ(0, ioctx.operate(src_bucket, &op)); + + index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj1, loc); + index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 1, obj1, meta); + + index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc); + index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 2, obj2, meta); + + test_stats(ioctx, src_bucket, RGWObjCategory::Main, 2, 16384); + } + + // prepare dst_bucket and copy the bi entries + { + ObjectWriteOperation op; + cls_rgw_bucket_init_index2(op); + ASSERT_EQ(0, ioctx.operate(dst_bucket, &op)); + } + { + list src_entries; + bool truncated{false}; + ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128, + &src_entries, &truncated)); + ASSERT_EQ(2u, src_entries.size()); + + ObjectWriteOperation op; + cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true); + ASSERT_EQ(0, ioctx.operate(dst_bucket, &op)); + + test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 2, 16384); + } + + { + // start reshard on src_bucket + set_reshard_status(ioctx, src_bucket, cls_rgw_reshard_status::IN_LOGRECORD); + + // delete obj1 and log a ReshardDeleted entry + index_prepare(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, obj1, loc); + index_complete(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, 3, obj1, meta); + + // overwrite obj2 and record its reshardlog entry + index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc); + index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 4, obj2, meta); + + // add two more objects + index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj3, loc); + index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 5, obj3, meta); + + index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj4, loc); + index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 6, obj4, meta); + + test_stats(ioctx, src_bucket, RGWObjCategory::Main, 3, 24576); + } + + // copy the reshardlog entries from src_bucket to dst_bucket + { + list src_entries; + bool truncated{false}; + const bool reshardlog = true; + ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128, + &src_entries, &truncated, reshardlog)); + ASSERT_EQ(4u, src_entries.size()); + + const auto& entry = src_entries.front(); + EXPECT_EQ(BIIndexType::ReshardDeleted, entry.type); + + ObjectWriteOperation op; + cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true); + ASSERT_EQ(0, ioctx.operate(dst_bucket, &op)); + + test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 3, 24576); + } +} diff --git a/src/tools/ceph-dencoder/rgw_types.h b/src/tools/ceph-dencoder/rgw_types.h index af5ebb2f280..7575a8f00ce 100644 --- a/src/tools/ceph-dencoder/rgw_types.h +++ b/src/tools/ceph-dencoder/rgw_types.h @@ -99,6 +99,7 @@ TYPE(rgw_cls_bi_get_ret) TYPE(rgw_cls_bi_list_op) TYPE(rgw_cls_bi_list_ret) TYPE(rgw_cls_bi_put_op) +TYPE(rgw_cls_bi_put_entries_op) TYPE(rgw_cls_obj_check_attrs_prefix) TYPE(rgw_cls_obj_remove_op) TYPE(rgw_cls_obj_store_pg_ver_op) -- 2.39.5