]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/reshard: take into account the object stats of dest shards
authorliangmingyuan <liangmingyuan@baidu.com>
Mon, 18 Mar 2024 09:44:21 +0000 (17:44 +0800)
committerliangmingyuan <liangmingyuan@baidu.com>
Sun, 21 Jul 2024 10:03:13 +0000 (18:03 +0800)
In the progress state, some index entries that have already been
copyed to dest shards in logrecord state will be copyed again, we
should subtract their stats in dest shards firstly.

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_const.h
src/cls/rgw/cls_rgw_ops.cc
src/cls/rgw/cls_rgw_ops.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_reshard.cc

index 7bd60bf20c854d7ea7b89d35f9ed265c4a91425e..22d2e863f68aa8c89177eb44c44c4796d16f591d 100644 (file)
@@ -121,7 +121,7 @@ static bool bi_entry_gt(const string& first, const string& second)
 /**
  * return: Plain, Instance, OLH or Invalid
  */
-BIIndexType bi_type(const string& s, const string& prefix)
+BIIndexType bi_type(const string& s, const string& prefix ="")
 {
   int ret = bi_entry_type(s.substr(prefix.size()));
   if (ret < 0) {
@@ -768,6 +768,19 @@ int rgw_bucket_update_stats(cls_method_context_t hctx, bufferlist *in, bufferlis
     }
   }
 
+  for (auto& s : op.dec_stats) {
+    auto& dest = header.stats[s.first];
+    if (op.absolute) {
+      CLS_LOG(0, "ERROR: %s: there can not be decribed stats when setting absolutly", __func__);
+      return -EINVAL;
+    } else {
+      dest.total_size -= s.second.total_size;
+      dest.total_size_rounded -= s.second.total_size_rounded;
+      dest.num_entries -= s.second.num_entries;
+      dest.actual_size -= s.second.actual_size;
+    }
+  }
+
   return write_bucket_header(hctx, &header);
 }
 
@@ -2821,6 +2834,72 @@ static int rgw_bi_get_op(cls_method_context_t hctx, bufferlist *in, bufferlist *
   return 0;
 }
 
+/* gain bi_entry based on reshard log */
+static int rgw_bi_get_vals_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  // decode request
+  rgw_cls_bi_get_vals_op op;
+  auto bl_iter = in->cbegin();
+  try {
+    decode(op, bl_iter);
+  } catch (ceph::buffer::error& err) {
+    CLS_LOG(0, "ERROR: %s: failed to decode request", __func__);
+    return -EINVAL;
+  }
+
+  map<string, bufferlist> keys;
+  int ret = cls_cxx_map_get_vals_by_keys(hctx, op.log_entries_wanted, &keys);
+  if (ret < 0) {
+    return ret;
+  }
+
+  rgw_cls_bi_list_ret op_ret;
+  std::map<string, bufferlist>::iterator iter;
+  for (iter = keys.begin(); iter != keys.end(); ++iter) {
+
+    rgw_cls_bi_entry entry;
+    entry.idx = iter->first;
+    entry.type = bi_type(iter->first);
+    entry.data = iter->second;
+
+    auto biter = entry.data.cbegin();
+
+    switch (entry.type) {
+      case BIIndexType::Plain:
+      case BIIndexType::Instance: {
+        rgw_bucket_dir_entry e;
+        try {
+          decode(e, biter);
+        } catch (ceph::buffer::error& err) {
+          CLS_LOG(0, "ERROR: %s: failed to decode buffer", __func__);
+          return -EIO;
+        }
+        break;
+      }
+      case BIIndexType::OLH: {
+        rgw_bucket_olh_entry e;
+        try {
+          decode(e, biter);
+        } catch (ceph::buffer::error& err) {
+          CLS_LOG(0, "ERROR: %s: failed to decode buffer (size=%d)", __func__, entry.data.length());
+          return -EIO;
+        }
+        break;
+      }
+      default:
+        CLS_LOG(0, "%s: invalid entry type: %d", __func__, int(entry.type));
+        return -EINVAL;
+    }
+    CLS_LOG(20, "%s: entry.idx=%s", __func__, escape_str(entry.idx).c_str());
+
+    op_ret.entries.push_back(entry);
+  }
+
+  encode(op_ret, *out);
+
+  return 0;
+}
+
 static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   CLS_LOG(10, "entered %s", __func__);
@@ -4894,6 +4973,7 @@ CLS_INIT(rgw)
   cls_method_handle_t h_rgw_obj_check_attrs_prefix;
   cls_method_handle_t h_rgw_obj_check_mtime;
   cls_method_handle_t h_rgw_bi_get_op;
+  cls_method_handle_t h_rgw_bi_get_vals_op;
   cls_method_handle_t h_rgw_bi_put_op;
   cls_method_handle_t h_rgw_bi_list_op;
   cls_method_handle_t h_rgw_bi_log_list_op;
@@ -4949,6 +5029,7 @@ CLS_INIT(rgw)
   cls_register_cxx_method(h_class, RGW_OBJ_CHECK_MTIME, CLS_METHOD_RD, rgw_obj_check_mtime, &h_rgw_obj_check_mtime);
 
   cls_register_cxx_method(h_class, RGW_BI_GET, CLS_METHOD_RD, rgw_bi_get_op, &h_rgw_bi_get_op);
+  cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
   cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
   cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
 
index bfdac5259aa6e6fa3a1ae46c3eac51d44bd908d1..f8990dfe02ee029806f870f51c588efbc2155e59 100644 (file)
@@ -253,11 +253,14 @@ int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
 
 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
                                 bool absolute,
-                                 const map<RGWObjCategory, rgw_bucket_category_stats>& stats)
+                                 const map<RGWObjCategory, rgw_bucket_category_stats>& stats,
+                                 const map<RGWObjCategory, rgw_bucket_category_stats>* dec_stats)
 {
   rgw_cls_bucket_update_stats_op call;
   call.absolute = absolute;
   call.stats = stats;
+  if (dec_stats != NULL)
+    call.dec_stats = *dec_stats;
   bufferlist in;
   encode(call, in);
   o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in);
@@ -443,6 +446,30 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
   return 0;
 }
 
+int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
+                        std::set<std::string> log_entries_wanted,
+                        std::list<rgw_cls_bi_entry> *entries)
+{
+  bufferlist in, out;
+  struct rgw_cls_bi_get_vals_op call;
+  call.log_entries_wanted = log_entries_wanted;
+  encode(call, in);
+  int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET_VALS, in, out);
+  if (r < 0)
+    return r;
+
+  struct rgw_cls_bi_list_ret op_ret;
+  auto iter = out.cbegin();
+  try {
+    decode(op_ret, iter);
+  } catch (ceph::buffer::error& err) {
+    return -EIO;
+  }
+
+  entries->swap(op_ret.entries);
+
+  return 0;
+}
 
 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
 {
index 6f39858b10ba422d7b40394214a6511565f12a53..2d4fd46e74aff222c3dea8156d1e4519fdb2b04b 100644 (file)
@@ -346,7 +346,8 @@ public:
 
 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
                                  bool absolute,
-                                 const std::map<RGWObjCategory, rgw_bucket_category_stats>& stats);
+                                 const std::map<RGWObjCategory, rgw_bucket_category_stats>& stats,
+                                 const std::map<RGWObjCategory, rgw_bucket_category_stats>* dec_stats = nullptr);
 
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, const std::string& tag,
                                const cls_rgw_obj_key& key, const std::string& locator, bool log_op,
@@ -368,6 +369,9 @@ void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const ceph::real_time
 int cls_rgw_bi_get(librados::IoCtx& io_ctx, const std::string oid,
                    BIIndexType index_type, const cls_rgw_obj_key& key,
                    rgw_cls_bi_entry *entry);
+int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
+                        std::set<std::string> log_entries_wanted,
+                        std::list<rgw_cls_bi_entry> *entries);
 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
 void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
 int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
index a3398ebd0164a89ccf42a1e763795739b7dcb27c..d8c3f1dcb99e2591184bd95bf422814154fcf6f8 100644 (file)
@@ -33,6 +33,7 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
 #define RGW_OBJ_CHECK_MTIME "obj_check_mtime"
 
 #define RGW_BI_GET "bi_get"
+#define RGW_BI_GET_VALS "bi_get_vals"
 #define RGW_BI_PUT "bi_put"
 #define RGW_BI_LIST "bi_list"
 
index 15bcba33330da80d11c2605fd1ff0992f3d89e21..d32448517ce3d016a1c5f2fb40f98877d6dd8b62 100644 (file)
@@ -373,6 +373,10 @@ void rgw_cls_bucket_update_stats_op::generate_test_instances(list<rgw_cls_bucket
   s.total_size = 1;
   s.total_size_rounded = 4096;
   s.num_entries = 1;
+  rgw_bucket_category_stats& dec_s = r->dec_stats[RGWObjCategory::None];
+  dec_s.total_size = 1;
+  dec_s.total_size_rounded = 4096;
+  dec_s.num_entries = 1;
   o.push_back(r);
 
   o.push_back(new rgw_cls_bucket_update_stats_op);
@@ -386,6 +390,11 @@ void rgw_cls_bucket_update_stats_op::dump(Formatter *f) const
     s[(int)entry.first] = entry.second;
   }
   encode_json("stats", s, f);
+  map<int, rgw_bucket_category_stats> dec_s;
+  for (auto& entry : dec_stats) {
+    dec_s[(int)entry.first] = entry.second;
+  }
+  encode_json("dec_stats", dec_s, f);
 }
 
 void cls_rgw_bi_log_list_op::dump(Formatter *f) const
index dd8a7779aeddbec788ed172c4c7e3a50d9183e42..3e36d96c50eb6fb1f8b96a493ce752e1a8b40fa3 100644 (file)
@@ -493,19 +493,23 @@ struct rgw_cls_bucket_update_stats_op
 {
   bool absolute{false};
   std::map<RGWObjCategory, rgw_bucket_category_stats> stats;
+  std::map<RGWObjCategory, rgw_bucket_category_stats> dec_stats;
 
   rgw_cls_bucket_update_stats_op() {}
 
   void encode(ceph::buffer::list &bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(absolute, bl);
     encode(stats, bl);
+    encode(dec_stats, bl);
     ENCODE_FINISH(bl);
   }
   void decode(ceph::buffer::list::const_iterator &bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(absolute, bl);
     decode(stats, bl);
+    if (struct_v >= 2)
+      decode(dec_stats, bl);
     DECODE_FINISH(bl);
   }
   void dump(ceph::Formatter *f) const;
@@ -727,6 +731,23 @@ struct rgw_cls_bi_get_ret {
 };
 WRITE_CLASS_ENCODER(rgw_cls_bi_get_ret)
 
+struct rgw_cls_bi_get_vals_op {
+  std::set<std::string> log_entries_wanted;
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(log_entries_wanted, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(log_entries_wanted, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_cls_bi_get_vals_op)
+
 struct rgw_cls_bi_put_op {
   rgw_cls_bi_entry entry;
 
index 4939438a68b4fc64994f2a97d98c56238021f830..d79d10238cbf98b79ad0b7ed9f819aa331eadcd5 100644 (file)
@@ -9162,6 +9162,17 @@ int RGWRados::bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_
   return cls_rgw_bi_get(ref.ioctx, ref.obj.oid, index_type, key, entry);
 }
 
+int RGWRados::bi_get_vals(BucketShard& bs, set<string> log_entries_wanted,
+                          list<rgw_cls_bi_entry> *entries, optional_yield y)
+{
+  auto& ref = bs.bucket_obj;
+  int ret = cls_rgw_bi_get_vals(ref.ioctx, ref.obj.oid, log_entries_wanted, entries);
+  if (ret < 0)
+    return ret;
+
+  return 0;
+}
+
 void RGWRados::bi_put(ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y)
 {
   auto& ref = bs.bucket_obj;
index 7d6299d118dedc9bcfc0d0f2926fc0e95ed865d2..67af40f771b4cd2424a5f6039bccecfb07dd5974 100644 (file)
@@ -1515,6 +1515,7 @@ public:
   int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent, optional_yield y);
   int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh, optional_yield y);
   int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry, optional_yield y);
+  int bi_get_vals(BucketShard& bs, std::set<std::string> log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
   void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
   int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
   int bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry, optional_yield y);
index b75bb7bf8cd9de91cf0fc377301fbdf12c6a133a..fce156b69772f0d48c6e93c550986223b7f89e53 100644 (file)
@@ -217,7 +217,8 @@ public:
   }
 
   int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
-                const rgw_bucket_category_stats& entry_stats) {
+                const rgw_bucket_category_stats& entry_stats,
+                bool process_log = false) {
     entries.push_back(entry);
     if (account) {
       rgw_bucket_category_stats& target = stats[category];
@@ -227,7 +228,7 @@ public:
       target.actual_size += entry_stats.actual_size;
     }
     if (entries.size() >= reshard_shard_batch_size) {
-      int ret = flush();
+      int ret = flush(process_log);
       if (ret < 0) {
         return ret;
       }
@@ -236,16 +237,48 @@ public:
     return 0;
   }
 
-  int flush() {
+  int flush(bool process_log = false) {
     if (entries.size() == 0) {
       return 0;
     }
 
     librados::ObjectWriteOperation op;
-    for (auto& entry : entries) {
-      store->getRados()->bi_put(op, bs, entry, null_yield);
+    if (process_log) {
+      map<RGWObjCategory, rgw_bucket_category_stats> dec_stats;
+      list<rgw_cls_bi_entry> dec_entries;
+      set<string> dec_entry_names_wanted;
+      for (auto& entry : entries) {
+        store->getRados()->bi_put(op, bs, entry, null_yield);
+        dec_entry_names_wanted.emplace(entry.idx);
+      }
+
+      // getting the index entry in target shard
+      int ret = store->getRados()->bi_get_vals(bs, dec_entry_names_wanted, &dec_entries, null_yield);
+      if(ret < 0) {
+        derr << "ERROR: bi_get_vals(): " << cpp_strerror(-ret) << dendl;
+        return ret;
+      }
+
+      for (auto& dec_entry : dec_entries) {
+        cls_rgw_obj_key cls_key;
+        RGWObjCategory category;
+        rgw_bucket_category_stats accounted_stats;
+        bool account = dec_entry.get_info(&cls_key, &category, &accounted_stats);
+        if (account) {
+          auto& dest = dec_stats[category];
+          dest.total_size += accounted_stats.total_size;
+          dest.total_size_rounded += accounted_stats.total_size_rounded;
+          dest.num_entries += accounted_stats.num_entries;
+          dest.actual_size += accounted_stats.actual_size;
+        }
+      }
+      cls_rgw_bucket_update_stats(op, false, stats, &dec_stats);
+    } else {
+      for (auto& entry : entries) {
+        store->getRados()->bi_put(op, bs, entry, null_yield);
+      }
+      cls_rgw_bucket_update_stats(op, false, stats);
     }
-    cls_rgw_bucket_update_stats(op, false, stats);
 
     librados::AioCompletion *c;
     int ret = get_completion(&c);
@@ -307,9 +340,10 @@ public:
 
   int add_entry(int shard_index,
                 rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
-                const rgw_bucket_category_stats& entry_stats) {
+                const rgw_bucket_category_stats& entry_stats,
+                bool process_log = false) {
     int ret = target_shards[shard_index].add_entry(entry, account, category,
-                                                  entry_stats);
+                                                  entry_stats, process_log);
     if (ret < 0) {
       derr << "ERROR: target_shards.add_entry(" << entry.idx <<
        ") returned error: " << cpp_strerror(-ret) << dendl;
@@ -319,10 +353,10 @@ public:
     return 0;
   }
 
-  int finish() {
+  int finish(bool process_log = false) {
     int ret = 0;
     for (auto& shard : target_shards) {
-      int r = shard.flush();
+      int r = shard.flush(process_log);
       if (r < 0) {
         derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
         ret = r;
@@ -980,15 +1014,15 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
   list<rgw_cls_bi_entry> entries;
 
   string stage;
-  bool read_reshardlog;
+  bool process_log = false;
   switch (reshard_stage) {
   case rgw::BucketReshardState::InLogrecord:
     stage = "inventory";
-    read_reshardlog = false;
+    process_log = false;
     break;
   case rgw::BucketReshardState::InProgress:
     stage = "inc";
-    read_reshardlog = true;
+    process_log = true;
     break;
   default:
     ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " unknown reshard stage" << dendl;
@@ -1016,7 +1050,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
 
       int ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter,
                                            marker, max_op_entries, &entries,
-                                           &is_truncated, read_reshardlog, y);
+                                           &is_truncated, process_log, y);
       if (ret == -ENOENT) {
         ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to find shard "
             << i << ", skipping" << dendl;
@@ -1069,7 +1103,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
         int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
 
         ret = target_shards_mgr.add_entry(shard_index, entry, account,
-                  category, stats);
+                  category, stats, process_log);
         if (ret < 0) {
           return ret;
         }
@@ -1107,7 +1141,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
     (*out) << " " << stage_entries << std::endl;
   }
 
-  int ret = target_shards_mgr.finish();
+  int ret = target_shards_mgr.finish(process_log);
   if (ret < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
     return -EIO;