reshard: small fix and cleanup
author     liangmingyuan <liangmingyuan@baidu.com>
           Fri, 22 Mar 2024 01:14:35 +0000 (09:14 +0800)
committer  liangmingyuan <liangmingyuan@baidu.com>
           Mon, 22 Jul 2024 01:19:25 +0000 (09:19 +0800)
At the end of each stage, finish() is called once to guarantee that
all entries are flushed to the destination shards. This may take a
long time when the number of destination shards is large, especially
in the second stage, so the reshard_lock needs to be renewed during
these flushes as well.

Also fixes the resharding-state check in RGWBucketReshard::cancel()
(the InProgress and InLogrecord comparisons were joined with ||
instead of &&) and factors the per-entry lock renewal and
target-shard calculation out into renew_lock_if_needed() and
calc_target_shard().
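
For illustration, a minimal self-contained sketch of the
renew-during-flush pattern this change introduces (TimedLock, Shard,
and finish_all are hypothetical stand-ins, not the actual RGW types):

#include <chrono>
#include <iostream>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for RGWBucketReshardLock: a lease that must
// be renewed before it expires.
struct TimedLock {
  Clock::time_point expires = Clock::now() + std::chrono::seconds(60);

  // Renew once less than half of the lease period remains.
  bool should_renew(Clock::time_point now) const {
    return now + std::chrono::seconds(30) > expires;
  }
  int renew(Clock::time_point now) {
    expires = now + std::chrono::seconds(60);  // re-take the lease
    return 0;
  }
};

// Hypothetical destination shard whose flush may be slow.
struct Shard {
  int flush() { /* write buffered entries to this shard */ return 0; }
};

// Mirrors RGWBucketReshard::renew_lock_if_needed(): renew only when
// the lease is close to expiring, and propagate renewal errors.
int renew_lock_if_needed(TimedLock& lock) {
  auto now = Clock::now();
  if (lock.should_renew(now)) {
    int r = lock.renew(now);
    if (r < 0) {
      std::cerr << "Error renewing lock: " << r << "\n";
      return r;
    }
  }
  return 0;
}

// Mirrors BucketReshardManager::finish(): flushing every target shard
// can take a long time with many shards, so the lock is renewed after
// each flush instead of only between add_entry() calls.
int finish_all(std::vector<Shard>& shards, TimedLock& lock) {
  int ret = 0;
  for (auto& shard : shards) {
    int r = shard.flush();
    if (r == 0) {
      // flushing can be slow; renew the lease before it can expire
      r = renew_lock_if_needed(lock);
    }
    if (r < 0) {
      ret = r;  // remember the error, keep flushing the rest
    }
  }
  return ret;
}

int main() {
  std::vector<Shard> shards(1999);  // e.g. a large target shard count
  TimedLock lock;
  return finish_all(shards, lock);
}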

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/driver/rados/rgw_reshard.h

src/cls/rgw/cls_rgw.cc
index af520b6bb0e2281fa0bbc61e8f91e5e51d8540cd..8f0190d421814587401c2346ef5e6d100a727c62 100644 (file)
@@ -2914,7 +2914,6 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *
   }
 
   rgw_cls_bi_entry& entry = op.entry;
-
   if (entry.type == BIIndexType::ReshardDeleted) {
     int r = cls_cxx_map_remove_key(hctx, entry.idx);
     if (r < 0) {
src/cls/rgw/cls_rgw_client.cc
index a9b1a5bdb1c474fc31033c31a84ca3e21e1dcb38..c5ac99eada02359528e5461ed5aa930d0a35b309 100644 (file)
@@ -471,12 +471,12 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
 }
 
 int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
-                        std::set<std::string> log_entries_wanted,
+                        std::set<std::string>& log_entries_wanted,
                         std::list<rgw_cls_bi_entry> *entries)
 {
   bufferlist in, out;
   struct rgw_cls_bi_get_vals_op call;
-  call.log_entries_wanted = log_entries_wanted;
+  call.log_entries_wanted = std::move(log_entries_wanted);
   encode(call, in);
   int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET_VALS, in, out);
   if (r < 0)
@@ -490,7 +490,8 @@ int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
     return -EIO;
   }
 
-  entries->swap(op_ret.entries);
+  if (entries)
+    entries->swap(op_ret.entries);
 
   return 0;
 }
src/cls/rgw/cls_rgw_client.h
index c5336030c07c2c3bd64626d15a6bf406511afcc5..86c40dc92787df2ca828c710fc96ce4bfcbf5b27 100644 (file)
@@ -384,7 +384,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const std::string oid,
                    BIIndexType index_type, const cls_rgw_obj_key& key,
                    rgw_cls_bi_entry *entry);
 int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
-                        std::set<std::string> log_entries_wanted,
+                        std::set<std::string>& log_entries_wanted,
                         std::list<rgw_cls_bi_entry> *entries);
 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
 void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
src/rgw/driver/rados/rgw_rados.cc
index 1df24538067f48a7c1b24590a3bdf2800cbbc80d..4be7264d32f6b791352697a2f7bce105a30cb075 100644 (file)
@@ -9239,7 +9239,7 @@ int RGWRados::bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_
   return cls_rgw_bi_get(ref.ioctx, ref.obj.oid, index_type, key, entry);
 }
 
-int RGWRados::bi_get_vals(BucketShard& bs, set<string> log_entries_wanted,
+int RGWRados::bi_get_vals(BucketShard& bs, set<string>& log_entries_wanted,
                           list<rgw_cls_bi_entry> *entries, optional_yield y)
 {
   auto& ref = bs.bucket_obj;
src/rgw/driver/rados/rgw_rados.h
index 4321cddf40ee37ef699a74ffaa3aec19c161cb8b..a2c55b585d4653e4687b712f322645f70c31a759 100644 (file)
@@ -1522,7 +1522,7 @@ public:
   int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent, optional_yield y);
   int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh, optional_yield y);
   int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry, optional_yield y);
-  int bi_get_vals(BucketShard& bs, std::set<std::string> log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
+  int bi_get_vals(BucketShard& bs, std::set<std::string>& log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
   void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
   int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
   int bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry, optional_yield y);
src/rgw/driver/rados/rgw_reshard.cc
index 3ff4915dd75c3775c32fc2e259333bec68cba106..2625b97923339dcd3c79b41afcd1a2f1f8d198c2 100644 (file)
@@ -237,7 +237,8 @@ public:
     return 0;
   }
 
-  int flush(bool process_log = false) {
+  int flush(bool process_log = false, RGWBucketReshard *br = nullptr,
+            const DoutPrefixProvider *dpp = nullptr) {
     if (entries.size() == 0) {
       return 0;
     }
@@ -292,6 +293,13 @@ public:
     }
     entries.clear();
     stats.clear();
+
+    if (br != nullptr) {
+      ret = br->renew_lock_if_needed(dpp);
+      if (ret < 0) {
+        return ret;
+      }
+    }
     return 0;
   }
 
@@ -353,10 +361,11 @@ public:
     return 0;
   }
 
-  int finish(bool process_log = false) {
+  int finish(bool process_log = false, RGWBucketReshard *br = nullptr,
+             const DoutPrefixProvider *dpp = nullptr) {
     int ret = 0;
     for (auto& shard : target_shards) {
-      int r = shard.flush(process_log);
+      int r = shard.flush(process_log, br, dpp);
       if (r < 0) {
         derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
         ret = r;
@@ -934,7 +943,7 @@ int RGWBucketReshard::cancel(const DoutPrefixProvider* dpp, optional_yield y)
     return ret;
   }
 
-  if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress ||
+  if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress &&
       bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) {
     ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl;
     ret = -EINVAL;
@@ -1032,13 +1041,55 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) {
   return 0;
 }
 
+int RGWBucketReshard::renew_lock_if_needed(const DoutPrefixProvider *dpp) {
+  int ret = 0;
+  Clock::time_point now = Clock::now();
+  if (reshard_lock.should_renew(now)) {
+    // assume outer locks have timespans at least the size of ours, so
+    // can call inside conditional
+    if (outer_reshard_lock) {
+      ret = outer_reshard_lock->renew(now);
+      if (ret < 0) {
+        return ret;
+      }
+    }
+    ret = reshard_lock.renew(now);
+    if (ret < 0) {
+      ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
+      return ret;
+    }
+  }
+  return 0;
+}
+
+int RGWBucketReshard::calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key,
+                                        int& shard, const DoutPrefixProvider *dpp) {
+  int target_shard_id, ret;
+
+  rgw_obj obj(bucket_info.bucket, key);
+  RGWMPObj mp;
+  if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+    // place the multipart .meta object on the same shard as its head object
+    obj.index_hash_source = mp.get_key();
+  }
+  ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
+                obj.get_hash_object(), &target_shard_id);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+    return ret;
+  }
+  shard = (target_shard_id > 0 ? target_shard_id : 0);
+
+  return 0;
+}
+
 int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current,
-                                                int& max_op_entries,
-                                                BucketReshardManager& target_shards_mgr,
-                                                bool verbose_json_out,
-                                                ostream *out,
-                                                Formatter *formatter, rgw::BucketReshardState reshard_stage,
-                                                const DoutPrefixProvider *dpp, optional_yield y)
+                                      int& max_op_entries,
+                                      BucketReshardManager& target_shards_mgr,
+                                      bool verbose_json_out,
+                                      ostream *out,
+                                      Formatter *formatter, rgw::BucketReshardState reshard_stage,
+                                      const DoutPrefixProvider *dpp, optional_yield y)
 {
   list<rgw_cls_bi_entry> entries;
 
@@ -1065,6 +1116,7 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
   uint64_t stage_entries = 0;
   stage.append(":");
   if (!verbose_json_out && out) {
+    (*out) << "start time: " << real_clock::now() << std::endl;
     (*out) << stage;
   }
 
@@ -1103,7 +1155,6 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
 
         marker = entry.idx;
 
-        int target_shard_id;
         cls_rgw_obj_key cls_key;
         RGWObjCategory category;
         rgw_bucket_category_stats stats;
@@ -1116,43 +1167,24 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
           ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
           continue;
         }
-        rgw_obj obj(bucket_info.bucket, key);
-        RGWMPObj mp;
-        if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
-          // place the multipart .meta object on the same shard as its head object
-          obj.index_hash_source = mp.get_key();
-        }
-        ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
-                      obj.get_hash_object(), &target_shard_id);
+
+        int shard_index;
+        ret = calc_target_shard(bucket_info, key, shard_index, dpp);
         if (ret < 0) {
-          ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
           return ret;
         }
 
-        int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
         ret = target_shards_mgr.add_entry(shard_index, entry, account,
                   category, stats, process_log);
         if (ret < 0) {
           return ret;
         }
 
-        Clock::time_point now = Clock::now();
-        if (reshard_lock.should_renew(now)) {
-          // assume outer locks have timespans at least the size of ours, so
-          // can call inside conditional
-          if (outer_reshard_lock) {
-            ret = outer_reshard_lock->renew(now);
-            if (ret < 0) {
-              return ret;
-            }
-          }
-          ret = reshard_lock.renew(now);
-          if (ret < 0) {
-            ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
-            return ret;
-          }
+        ret = renew_lock_if_needed(dpp);
+        if (ret < 0) {
+          return ret;
         }
+
         if (verbose_json_out) {
           formatter->close_section();
           formatter->flush(*out);
@@ -1168,13 +1200,15 @@ int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation&
     formatter->flush(*out);
   } else if (out) {
     (*out) << " " << stage_entries << std::endl;
+    (*out) << "end time: " << real_clock::now() << std::endl;
   }
 
-  int ret = target_shards_mgr.finish(process_log);
+  int ret = target_shards_mgr.finish(process_log, this, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
     return -EIO;
   }
+
   return 0;
 }
 
src/rgw/driver/rados/rgw_reshard.h
index ea5bb6e713d5a1bc17903ea9c0019a8e82e5e150..3d056e50f468d9860a4fcef9536ed710b75e7fce 100644 (file)
@@ -84,6 +84,9 @@ class RGWBucketReshard {
   // using an initializer_list as an array in contiguous memory
   // allocated in at once
   static const std::initializer_list<uint16_t> reshard_primes;
+
+  int calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key,
+                        int& shard, const DoutPrefixProvider *dpp);
   int reshard_process(const rgw::bucket_index_layout_generation& current,
                       int& max_entries,
                       BucketReshardManager& target_shards_mgr,
@@ -91,6 +94,7 @@ class RGWBucketReshard {
                       std::ostream *out,
                       Formatter *formatter, rgw::BucketReshardState reshard_stage,
                       const DoutPrefixProvider *dpp, optional_yield y);
+
   int do_reshard(const rgw::bucket_index_layout_generation& current,
                  const rgw::bucket_index_layout_generation& target,
                  int max_entries, bool support_logrecord,
@@ -115,6 +119,7 @@ public:
              RGWReshard *reshard_log = nullptr);
   int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status);
   int cancel(const DoutPrefixProvider* dpp, optional_yield y);
+  int renew_lock_if_needed(const DoutPrefixProvider *dpp);
 
   static int clear_resharding(rgw::sal::RadosStore* store,
                              RGWBucketInfo& bucket_info,