return 0;
}
+// cls method "bi_put_entries": write a batch of raw bucket index entries
+// into this shard object's omap while keeping the bucket dir header's
+// per-category stats consistent, all within the same osd transaction.
+// Entries of type ReshardDeleted are removed from the omap instead of
+// written. Returns 0 on success, -EINVAL on decode failure, -E2BIG when
+// the batch exceeds osd_max_omap_entries_per_request, or the error from
+// the omap/header I/O calls.
+static int rgw_bi_put_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ rgw_cls_bi_put_entries_op op;
+ try {
+ auto iter = in->cbegin();
+ decode(op, iter);
+ } catch (const ceph::buffer::error&) {
+ CLS_LOG(0, "ERROR: %s: failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ // refuse batches larger than the osd accepts in a single omap request
+ const size_t limit = cls_get_config(hctx)->osd_max_omap_entries_per_request;
+ if (op.entries.size() > limit) {
+ int r = -E2BIG;
+ CLS_LOG(0, "ERROR: %s: got too many entries (%zu > %zu), returning %d",
+ __func__, op.entries.size(), limit, r);
+ return r;
+ }
+
+ // load the header up front so stats can be adjusted alongside the writes
+ rgw_bucket_dir_header header;
+ int r = read_bucket_header(hctx, &header);
+ if (r < 0) {
+ CLS_LOG(1, "ERROR: %s: failed to read header", __func__);
+ return r;
+ }
+
+ if (op.check_existing) {
+ // fetch any existing keys and decrement their stats before overwriting
+ std::set<std::string> keys;
+ for (const auto& entry : op.entries) {
+ keys.insert(entry.idx);
+ }
+
+ std::map<std::string, ceph::buffer::list> vals;
+ r = cls_cxx_map_get_vals_by_keys(hctx, keys, &vals);
+ if (r < 0) {
+ CLS_LOG(0, "ERROR: %s: cls_cxx_map_get_vals_by_keys() returned r=%d",
+ __func__, r);
+ return r;
+ }
+
+ // reconstruct each existing entry so get_info() can report the stats
+ // it currently contributes, then subtract those from the header
+ for (auto& [idx, data] : vals) {
+ rgw_cls_bi_entry entry;
+ entry.type = bi_type(idx);
+ entry.idx = std::move(idx);
+ entry.data = std::move(data);
+
+ cls_rgw_obj_key key;
+ RGWObjCategory category;
+ rgw_bucket_category_stats stats;
+ const bool account = entry.get_info(&key, &category, &stats);
+ if (account) {
+ auto& dest = header.stats[category];
+ dest.total_size -= stats.total_size;
+ dest.total_size_rounded -= stats.total_size_rounded;
+ dest.num_entries -= stats.num_entries;
+ dest.actual_size -= stats.actual_size;
+ }
+ } // foreach vals
+ } // if op.check_existing
+
+ // accumulate the entries to write so they go down in one omap call
+ std::map<std::string, ceph::buffer::list> new_vals;
+
+ for (auto& entry : op.entries) {
+ if (entry.type == BIIndexType::ReshardDeleted) {
+ // deletion marker: remove the key rather than writing a value
+ r = cls_cxx_map_remove_key(hctx, entry.idx);
+ if (r < 0) {
+ CLS_LOG(0, "WARNING: %s: cls_cxx_map_remove_key(%s) returned r=%d",
+ __func__, entry.idx.c_str(), r);
+ } // not fatal
+ continue;
+ }
+
+ // add the incoming entry's stats (if it is an accounted type)
+ cls_rgw_obj_key key;
+ RGWObjCategory category;
+ rgw_bucket_category_stats stats;
+ const bool account = entry.get_info(&key, &category, &stats);
+ if (account) {
+ auto& dest = header.stats[category];
+ dest.total_size += stats.total_size;
+ dest.total_size_rounded += stats.total_size_rounded;
+ dest.num_entries += stats.num_entries;
+ dest.actual_size += stats.actual_size;
+ }
+
+ new_vals.emplace(std::move(entry.idx), std::move(entry.data));
+ }
+
+ r = cls_cxx_map_set_vals(hctx, &new_vals);
+ if (r < 0) {
+ CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_vals() returned r=%d", __func__, r);
+ return r;
+ }
+
+ // persist the updated stats in the same transaction as the entry writes
+ return write_bucket_header(hctx, &header);
+}
+
/* The plain entries in the bucket index are divided into two regions
* divided by the special entries that begin with 0x80. Those below
* ("Low") are ascii entries. Those above ("High") bring in unicode
cls_method_handle_t h_rgw_bi_get_op;
cls_method_handle_t h_rgw_bi_get_vals_op;
cls_method_handle_t h_rgw_bi_put_op;
+ cls_method_handle_t h_rgw_bi_put_entries_op;
cls_method_handle_t h_rgw_bi_list_op;
cls_method_handle_t h_rgw_reshard_log_trim_op;
cls_method_handle_t h_rgw_bi_log_list_op;
cls_register_cxx_method(h_class, RGW_BI_GET, CLS_METHOD_RD, rgw_bi_get_op, &h_rgw_bi_get_op);
cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
+ cls_register_cxx_method(h_class, RGW_BI_PUT_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_entries, &h_rgw_bi_put_entries_op);
cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op);
op.exec(RGW_CLASS, RGW_BI_PUT, in);
}
+// Client-side wrapper: encode an rgw_cls_bi_put_entries_op and append a
+// "bi_put_entries" cls call to the given write op. `entries` is taken by
+// value and moved into the request, so callers can pass an rvalue to
+// avoid a copy.
+void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op,
+ std::vector<rgw_cls_bi_entry> entries,
+ bool check_existing)
+{
+ const auto call = rgw_cls_bi_put_entries_op{
+ .entries = std::move(entries),
+ .check_existing = check_existing
+ };
+
+ bufferlist in;
+ encode(call, in);
+
+ op.exec(RGW_CLASS, RGW_BI_PUT_ENTRIES, in);
+}
+
/* nb: any entries passed in are replaced with the results of the cls
* call, so caller does not need to clear entries between calls
*/
std::list<rgw_cls_bi_entry> *entries);
int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
+// Write the given array of index entries and update bucket stats accordingly.
+// If existing entries may be overwritten, pass check_existing=true to decrement
+// their stats first.
+void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op,
+ std::vector<rgw_cls_bi_entry> entries,
+ bool check_existing);
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
const std::string& name, const std::string& marker, uint32_t max,
std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog = false);
#define RGW_BI_GET "bi_get"
#define RGW_BI_GET_VALS "bi_get_vals"
#define RGW_BI_PUT "bi_put"
+// bulk index-entry write that also maintains bucket header stats
+#define RGW_BI_PUT_ENTRIES "bi_put_entries"
#define RGW_BI_LIST "bi_list"
#define RGW_RESHARD_LOG_TRIM "reshard_log_trim"
void cls_rgw_get_bucket_resharding_op::dump(Formatter *f) const
{
}
+
+// Formatter/JSON dump of the op's fields (entries batch + check_existing).
+void rgw_cls_bi_put_entries_op::dump(Formatter *f) const
+{
+ encode_json("entries", entries, f);
+ encode_json("check_existing", check_existing, f);
+}
};
WRITE_CLASS_ENCODER(rgw_cls_bi_put_op)
+// Request payload for the "bi_put_entries" cls method: a batch of raw
+// bucket index entries to write in one call, plus a flag selecting
+// whether existing entries at the same keys are read back first so their
+// stats can be decremented before being overwritten.
+struct rgw_cls_bi_put_entries_op {
+ std::vector<rgw_cls_bi_entry> entries; // raw bi entries to write/remove
+ bool check_existing = false; // subtract stats of overwritten entries
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(entries, bl);
+ encode(check_existing, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(entries, bl);
+ decode(check_existing, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(ceph::Formatter *f) const;
+
+ // test instances for dencoder: one default, one populated
+ static void generate_test_instances(std::list<rgw_cls_bi_put_entries_op*>& o) {
+ o.push_back(new rgw_cls_bi_put_entries_op);
+ o.push_back(new rgw_cls_bi_put_entries_op);
+ o.back()->entries.push_back({.idx = "entry"});
+ o.back()->check_existing = true;
+ }
+};
+WRITE_CLASS_ENCODER(rgw_cls_bi_put_entries_op)
+
struct rgw_cls_bi_list_op {
uint32_t max;
std::string name_filter; // limit result to one object and its instances
index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 2, obj1, meta);
reshardlog_entries(ioctx, bucket_oid, 2u);
}
+
+// End-to-end test of bi_put_entries: list the raw bi entries of a source
+// bucket index object and replay them into a destination index object,
+// verifying the destination's stats track the source (2 objs / 16384
+// bytes, then 3 objs / 24576 bytes after the reshard-log replay, which
+// includes a ReshardDeleted marker for the removed obj1).
+TEST_F(cls_rgw, bi_put_entries)
+{
+ const string src_bucket = str_int("bi_put_entries", 0);
+ const string dst_bucket = str_int("bi_put_entries", 1);
+
+ const cls_rgw_obj_key obj1 = str_int("obj", 1);
+ const cls_rgw_obj_key obj2 = str_int("obj", 2);
+ const cls_rgw_obj_key obj3 = str_int("obj", 3);
+ const cls_rgw_obj_key obj4 = str_int("obj", 4);
+ const string tag = str_int("tag", 0);
+ const string loc = str_int("loc", 0);
+ auto meta = rgw_bucket_dir_entry_meta{
+ .category = RGWObjCategory::Main, .size = 8192};
+
+ // prepare src_bucket and add two objects
+ {
+ ObjectWriteOperation op;
+ cls_rgw_bucket_init_index2(op);
+ ASSERT_EQ(0, ioctx.operate(src_bucket, &op));
+
+ index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj1, loc);
+ index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 1, obj1, meta);
+
+ index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc);
+ index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 2, obj2, meta);
+
+ test_stats(ioctx, src_bucket, RGWObjCategory::Main, 2, 16384);
+ }
+
+ // prepare dst_bucket and copy the bi entries
+ {
+ ObjectWriteOperation op;
+ cls_rgw_bucket_init_index2(op);
+ ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
+ }
+ {
+ list<rgw_cls_bi_entry> src_entries;
+ bool truncated{false};
+ ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128,
+ &src_entries, &truncated));
+ ASSERT_EQ(2u, src_entries.size());
+
+ ObjectWriteOperation op;
+ cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true);
+ ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
+
+ // dst stats must now match the two copied 8192-byte entries
+ test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 2, 16384);
+ }
+
+ {
+ // start reshard on src_bucket
+ set_reshard_status(ioctx, src_bucket, cls_rgw_reshard_status::IN_LOGRECORD);
+
+ // delete obj1 and log a ReshardDeleted entry
+ index_prepare(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, obj1, loc);
+ index_complete(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, 3, obj1, meta);
+
+ // overwrite obj2 and record its reshardlog entry
+ index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc);
+ index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 4, obj2, meta);
+
+ // add two more objects
+ index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj3, loc);
+ index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 5, obj3, meta);
+
+ index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj4, loc);
+ index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 6, obj4, meta);
+
+ test_stats(ioctx, src_bucket, RGWObjCategory::Main, 3, 24576);
+ }
+
+ // copy the reshardlog entries from src_bucket to dst_bucket
+ {
+ list<rgw_cls_bi_entry> src_entries;
+ bool truncated{false};
+ const bool reshardlog = true;
+ ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128,
+ &src_entries, &truncated, reshardlog));
+ ASSERT_EQ(4u, src_entries.size());
+
+ // the deletion of obj1 must surface as a ReshardDeleted entry
+ const auto& entry = src_entries.front();
+ EXPECT_EQ(BIIndexType::ReshardDeleted, entry.type);
+
+ ObjectWriteOperation op;
+ cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true);
+ ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
+
+ // check_existing=true decremented the overwritten obj2's stats, and the
+ // ReshardDeleted marker removed obj1, so dst tracks src exactly
+ test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 3, 24576);
+ }
+}
TYPE(rgw_cls_bi_list_op)
TYPE(rgw_cls_bi_list_ret)
TYPE(rgw_cls_bi_put_op)
+TYPE(rgw_cls_bi_put_entries_op)
TYPE(rgw_cls_obj_check_attrs_prefix)
TYPE(rgw_cls_obj_remove_op)
TYPE(rgw_cls_obj_store_pg_ver_op)