]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/rgw: add bulk cls_rgw_bi_put_entries() op for reshard
authorCasey Bodley <cbodley@redhat.com>
Tue, 3 Sep 2024 19:16:24 +0000 (15:16 -0400)
committerCasey Bodley <cbodley@redhat.com>
Thu, 5 Sep 2024 17:23:41 +0000 (13:23 -0400)
adds a bulk api for reshard to write entries to the target index shard
object. this takes care of the bucket stats updates so that rgw's
reshard logic doesn't have to worry about it

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_const.h
src/cls/rgw/cls_rgw_ops.cc
src/cls/rgw/cls_rgw_ops.h
src/test/cls_rgw/test_cls_rgw.cc
src/tools/ceph-dencoder/rgw_types.h

index 30b136e267263c8c69a2aff0ccc3af3dee41661f..3c6189c37438340c4c5757b9cdcbe41231d86098 100644 (file)
@@ -2880,6 +2880,103 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *
   return 0;
 }
 
+static int rgw_bi_put_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  rgw_cls_bi_put_entries_op op;
+  try {
+    auto iter = in->cbegin();
+    decode(op, iter);
+  } catch (const ceph::buffer::error&) {
+    CLS_LOG(0, "ERROR: %s: failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  const size_t limit = cls_get_config(hctx)->osd_max_omap_entries_per_request;
+  if (op.entries.size() > limit) {
+    int r = -E2BIG;
+    CLS_LOG(0, "ERROR: %s: got too many entries (%zu > %zu), returning %d",
+            __func__, op.entries.size(), limit, r);
+    return r;
+  }
+
+  rgw_bucket_dir_header header;
+  int r = read_bucket_header(hctx, &header);
+  if (r < 0) {
+    CLS_LOG(1, "ERROR: %s: failed to read header", __func__);
+    return r;
+  }
+
+  if (op.check_existing) {
+    // fetch any existing keys and decrement their stats before overwriting
+    std::set<std::string> keys;
+    for (const auto& entry : op.entries) {
+      keys.insert(entry.idx);
+    }
+
+    std::map<std::string, ceph::buffer::list> vals;
+    r = cls_cxx_map_get_vals_by_keys(hctx, keys, &vals);
+    if (r < 0) {
+      CLS_LOG(0, "ERROR: %s: cls_cxx_map_get_vals_by_keys() returned r=%d",
+              __func__, r);
+      return r;
+    }
+
+    for (auto& [idx, data] : vals) {
+      rgw_cls_bi_entry entry;
+      entry.type = bi_type(idx);
+      entry.idx = std::move(idx);
+      entry.data = std::move(data);
+
+      cls_rgw_obj_key key;
+      RGWObjCategory category;
+      rgw_bucket_category_stats stats;
+      const bool account = entry.get_info(&key, &category, &stats);
+      if (account) {
+        auto& dest = header.stats[category];
+        dest.total_size -= stats.total_size;
+        dest.total_size_rounded -= stats.total_size_rounded;
+        dest.num_entries -= stats.num_entries;
+        dest.actual_size -= stats.actual_size;
+      }
+    } // foreach vals
+  } // if op.check_existing
+
+  std::map<std::string, ceph::buffer::list> new_vals;
+
+  for (auto& entry : op.entries) {
+    if (entry.type == BIIndexType::ReshardDeleted) {
+      r = cls_cxx_map_remove_key(hctx, entry.idx);
+      if (r < 0) {
+        CLS_LOG(0, "WARNING: %s: cls_cxx_map_remove_key(%s) returned r=%d",
+                __func__, entry.idx.c_str(), r);
+      } // not fatal
+      continue;
+    }
+
+    cls_rgw_obj_key key;
+    RGWObjCategory category;
+    rgw_bucket_category_stats stats;
+    const bool account = entry.get_info(&key, &category, &stats);
+    if (account) {
+      auto& dest = header.stats[category];
+      dest.total_size += stats.total_size;
+      dest.total_size_rounded += stats.total_size_rounded;
+      dest.num_entries += stats.num_entries;
+      dest.actual_size += stats.actual_size;
+    }
+
+    new_vals.emplace(std::move(entry.idx), std::move(entry.data));
+  }
+
+  r = cls_cxx_map_set_vals(hctx, &new_vals);
+  if (r < 0) {
+    CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_vals() returned r=%d", __func__, r);
+    return r;
+  }
+
+  return write_bucket_header(hctx, &header);
+}
+
 /* The plain entries in the bucket index are divided into two regions
  * divided by the special entries that begin with 0x80. Those below
  * ("Low") are ascii entries. Those above ("High") bring in unicode
@@ -4985,6 +5082,7 @@ CLS_INIT(rgw)
   cls_method_handle_t h_rgw_bi_get_op;
   cls_method_handle_t h_rgw_bi_get_vals_op;
   cls_method_handle_t h_rgw_bi_put_op;
+  cls_method_handle_t h_rgw_bi_put_entries_op;
   cls_method_handle_t h_rgw_bi_list_op;
   cls_method_handle_t h_rgw_reshard_log_trim_op;
   cls_method_handle_t h_rgw_bi_log_list_op;
@@ -5043,6 +5141,7 @@ CLS_INIT(rgw)
   cls_register_cxx_method(h_class, RGW_BI_GET, CLS_METHOD_RD, rgw_bi_get_op, &h_rgw_bi_get_op);
   cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
   cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
+  cls_register_cxx_method(h_class, RGW_BI_PUT_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_entries, &h_rgw_bi_put_entries_op);
   cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
   cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op);
 
index e4c0ec7be0ab807dfb09b244128f4f12211a7aac..be13a51d33047a44106d2448f2c69c4132510abf 100644 (file)
@@ -523,6 +523,21 @@ void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi
   op.exec(RGW_CLASS, RGW_BI_PUT, in);
 }
 
+void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op,
+                            std::vector<rgw_cls_bi_entry> entries,
+                            bool check_existing)
+{
+  const auto call = rgw_cls_bi_put_entries_op{
+    .entries = std::move(entries),
+    .check_existing = check_existing
+  };
+
+  bufferlist in;
+  encode(call, in);
+
+  op.exec(RGW_CLASS, RGW_BI_PUT_ENTRIES, in);
+}
+
 /* nb: any entries passed in are replaced with the results of the cls
  * call, so caller does not need to clear entries between calls
  */
index 84431055deda40955733989f23649d97fa1ce89e..66aa12f5bd04aae33d737294881359f5bc54c3e4 100644 (file)
@@ -389,6 +389,12 @@ int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
                         std::list<rgw_cls_bi_entry> *entries);
 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
 void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
+// Write the given array of index entries and update bucket stats accordingly.
+// If existing entries may be overwritten, pass check_existing=true to decrement
+// their stats first.
+void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op,
+                            std::vector<rgw_cls_bi_entry> entries,
+                            bool check_existing);
 int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
                    const std::string& name, const std::string& marker, uint32_t max,
                    std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog = false);
index 9a4e368575d9965cef8fbfee732b7196a3404885..d67274291d67f4429d40cd161c90b0c37bae0803 100644 (file)
@@ -35,6 +35,7 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
 #define RGW_BI_GET "bi_get"
 #define RGW_BI_GET_VALS "bi_get_vals"
 #define RGW_BI_PUT "bi_put"
+#define RGW_BI_PUT_ENTRIES "bi_put_entries"
 #define RGW_BI_LIST "bi_list"
 
 #define RGW_RESHARD_LOG_TRIM "reshard_log_trim"
index d32448517ce3d016a1c5f2fb40f98877d6dd8b62..2c33a2691b5bb2f783e93793cf54201361aa3d73 100644 (file)
@@ -580,3 +580,9 @@ void cls_rgw_get_bucket_resharding_op::generate_test_instances(
 void cls_rgw_get_bucket_resharding_op::dump(Formatter *f) const
 {
 }
+
+void rgw_cls_bi_put_entries_op::dump(Formatter *f) const
+{
+  encode_json("entries", entries, f);
+  encode_json("check_existing", check_existing, f);
+}
index 3e36d96c50eb6fb1f8b96a493ce752e1a8b40fa3..9dc7343f68db9d60a273056693d3af43b739caf3 100644 (file)
@@ -777,6 +777,35 @@ struct rgw_cls_bi_put_op {
 };
 WRITE_CLASS_ENCODER(rgw_cls_bi_put_op)
 
+struct rgw_cls_bi_put_entries_op {
+  std::vector<rgw_cls_bi_entry> entries;
+  bool check_existing = false;
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(entries, bl);
+    encode(check_existing, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(entries, bl);
+    decode(check_existing, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter *f) const;
+
+  static void generate_test_instances(std::list<rgw_cls_bi_put_entries_op*>& o) {
+    o.push_back(new rgw_cls_bi_put_entries_op);
+    o.push_back(new rgw_cls_bi_put_entries_op);
+    o.back()->entries.push_back({.idx = "entry"});
+    o.back()->check_existing = true;
+  }
+};
+WRITE_CLASS_ENCODER(rgw_cls_bi_put_entries_op)
+
 struct rgw_cls_bi_list_op {
   uint32_t max;
   std::string name_filter; // limit result to one object and its instances
index 66b28565c44646be3c797dd3fd2aa191a97ecbf5..23c3576429a694ce1bd661a8ac8595e4255e5c35 100644 (file)
@@ -1479,3 +1479,94 @@ TEST_F(cls_rgw, reshardlog_num)
   index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 2, obj1, meta);
   reshardlog_entries(ioctx, bucket_oid, 2u);
 }
+
+TEST_F(cls_rgw, bi_put_entries)
+{
+  const string src_bucket = str_int("bi_put_entries", 0);
+  const string dst_bucket = str_int("bi_put_entries", 1);
+
+  const cls_rgw_obj_key obj1 = str_int("obj", 1);
+  const cls_rgw_obj_key obj2 = str_int("obj", 2);
+  const cls_rgw_obj_key obj3 = str_int("obj", 3);
+  const cls_rgw_obj_key obj4 = str_int("obj", 4);
+  const string tag = str_int("tag", 0);
+  const string loc = str_int("loc", 0);
+  auto meta = rgw_bucket_dir_entry_meta{
+    .category = RGWObjCategory::Main, .size = 8192};
+
+  // prepare src_bucket and add two objects
+  {
+    ObjectWriteOperation op;
+    cls_rgw_bucket_init_index2(op);
+    ASSERT_EQ(0, ioctx.operate(src_bucket, &op));
+
+    index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj1, loc);
+    index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 1, obj1, meta);
+
+    index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc);
+    index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 2, obj2, meta);
+
+    test_stats(ioctx, src_bucket, RGWObjCategory::Main, 2, 16384);
+  }
+
+  // prepare dst_bucket and copy the bi entries
+  {
+    ObjectWriteOperation op;
+    cls_rgw_bucket_init_index2(op);
+    ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
+  }
+  {
+    list<rgw_cls_bi_entry> src_entries;
+    bool truncated{false};
+    ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128,
+                                 &src_entries, &truncated));
+    ASSERT_EQ(2u, src_entries.size());
+
+    ObjectWriteOperation op;
+    cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true);
+    ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
+
+    test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 2, 16384);
+  }
+
+  {
+    // start reshard on src_bucket
+    set_reshard_status(ioctx, src_bucket, cls_rgw_reshard_status::IN_LOGRECORD);
+
+    // delete obj1 and log a ReshardDeleted entry
+    index_prepare(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, obj1, loc);
+    index_complete(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, 3, obj1, meta);
+
+    // overwrite obj2 and record its reshardlog entry
+    index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc);
+    index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 4, obj2, meta);
+
+    // add two more objects
+    index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj3, loc);
+    index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 5, obj3, meta);
+
+    index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj4, loc);
+    index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 6, obj4, meta);
+
+    test_stats(ioctx, src_bucket, RGWObjCategory::Main, 3, 24576);
+  }
+
+  // copy the reshardlog entries from src_bucket to dst_bucket
+  {
+    list<rgw_cls_bi_entry> src_entries;
+    bool truncated{false};
+    const bool reshardlog = true;
+    ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128,
+                                 &src_entries, &truncated, reshardlog));
+    ASSERT_EQ(4u, src_entries.size());
+
+    const auto& entry = src_entries.front();
+    EXPECT_EQ(BIIndexType::ReshardDeleted, entry.type);
+
+    ObjectWriteOperation op;
+    cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true);
+    ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
+
+    test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 3, 24576);
+  }
+}
index af5ebb2f280c2fdbaddd51660422335443f931a7..7575a8f00cebb04f5d26ccba9188bc72abb70f5e 100644 (file)
@@ -99,6 +99,7 @@ TYPE(rgw_cls_bi_get_ret)
 TYPE(rgw_cls_bi_list_op)
 TYPE(rgw_cls_bi_list_ret)
 TYPE(rgw_cls_bi_put_op)
+TYPE(rgw_cls_bi_put_entries_op)
 TYPE(rgw_cls_obj_check_attrs_prefix)
 TYPE(rgw_cls_obj_remove_op)
 TYPE(rgw_cls_obj_store_pg_ver_op)