]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/reshard: stop recording logs when reshard failed
authorliangmingyuan <liangmingyuan@baidu.com>
Tue, 19 Mar 2024 04:37:05 +0000 (12:37 +0800)
committerliangmingyuan <liangmingyuan@baidu.com>
Mon, 22 Jul 2024 01:19:24 +0000 (09:19 +0800)
A log will be recorded together with index entry in the logrecord
state, ant it should be stopped when reshard fails. Add a judge
every certain time at the logrecord state, the reshard has been
failed if the reshard_lock can be held, then clear the state and
stop recording logs.

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
doc/radosgw/dynamicresharding.rst
src/common/options/rgw.yaml.in
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/rgw_bucket_layout.cc
src/rgw/rgw_bucket_layout.h
src/rgw/rgw_common.cc

index 27bf070b104b146671d565a7ea767dcb1c74fe8c..822251bbb485d7737ff4ba146a43c83ef443d251 100644 (file)
@@ -52,6 +52,10 @@ Configuration
 .. confval:: rgw_reshard_thread_interval
 .. confval:: rgw_reshard_num_logs
 
+- ``rgw_reshard_progress_judge_interval``: interval of judging if bucket reshard failed in reshard log process state, default: 120 seconds
+
+- ``rgw_reshard_progress_judge_ratio``: ratio of reshard progress judge interval to randomly vary, default: 0.5
+
 Admin commands
 ==============
 
index 583fe4bcaf647316e54fb8326e2feba0f28eb231..d23ad6cfe1ca58134e6156bb8f1af58cfea792d6 100644 (file)
@@ -2711,6 +2711,25 @@ options:
   with_legacy: true
   services:
   - rgw
+- name: rgw_reshard_progress_judge_interval
+  type: uint
+  level: dev
+  desc: interval (in seconds) of judging if bucket reshard failed in block state
+  default: 120
+  services:
+  - rgw
+- name: rgw_reshard_progress_judge_ratio
+  type: float
+  level: dev
+  desc: ratio of reshard progress judge interval to randomly vary
+  long_desc: Add a random delay to rgw_reshard_progress_judge_interval for deciding when
+    to judge the reshard process. The default setting spreads judge time window of
+    [1, 1.5] * rgw_reshard_progress_judge_interval.
+  default: 0.5
+  services:
+  - rgw
+  see_also:
+  - rgw_reshard_progress_judge_interval
 - name: rgw_debug_inject_set_olh_err
   type: uint
   level: dev
index f002dd78daa1770b8dbb4e2d688e38b74e444698..dc560cee2b92b067d180f1eab978b18adb226074 100644 (file)
@@ -1472,6 +1472,7 @@ static int bucket_stats(rgw::sal::Driver* driver,
 
   utime_t ut(bucket->get_modification_time());
   utime_t ctime_ut(bucket->get_creation_time());
+  utime_t logrecord_ut(bucket->get_info().layout.judge_reshard_lock_time);
 
   formatter->open_object_section("stats");
   formatter->dump_string("bucket", bucket->get_name());
@@ -1489,7 +1490,8 @@ static int bucket_stats(rgw::sal::Driver* driver,
   formatter->dump_int("index_generation", bucket_info.layout.current_index.gen);
   formatter->dump_int("num_shards",
                      bucket_info.layout.current_index.layout.normal.num_shards);
-  formatter->dump_string("reshard_status", to_string(bucket_info.reshard_status));
+  formatter->dump_string("reshard_status", to_string(bucket_info.layout.resharding));
+  logrecord_ut.gmtime(formatter->dump_stream("judge_reshard_lock_time"));
   formatter->dump_bool("object_lock_enabled", bucket_info.obj_lock_enabled());
   formatter->dump_bool("mfa_enabled", bucket_info.mfa_enabled());
   ::encode_json("owner", bucket_info.owner, formatter);
index d79d10238cbf98b79ad0b7ed9f819aa331eadcd5..c9a4ebd7b93b1f65e50ecb8168366ef27840b15c 100644 (file)
@@ -6963,6 +6963,10 @@ int RGWRados::Bucket::UpdateIndex::guard_reshard(const DoutPrefixProvider *dpp,
     *pbs = bs;
   }
 
+  if (target->bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) {
+    store->reshard_failed_while_logrecord(target->bucket_info, y, dpp);
+  }
+
   return 0;
 }
 
@@ -7656,9 +7660,76 @@ int RGWRados::guard_reshard(const DoutPrefixProvider *dpp,
     return r;
   }
 
+  if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord) {
+    reshard_failed_while_logrecord(bucket_info, y, dpp);
+  }
+
   return 0;
 }
 
+int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info, optional_yield y,
+                                             const DoutPrefixProvider *dpp)
+{
+  real_time now = real_clock::now();
+  double r = rand() / (double)RAND_MAX;
+  double reshard_progress_judge_interval = cct->_conf.get_val<uint64_t>("rgw_reshard_progress_judge_interval");
+  // avoid getting reshard_lock simultaneously by mass differrent operation
+  reshard_progress_judge_interval +=
+    reshard_progress_judge_interval * cct->_conf.get_val<double>("rgw_reshard_progress_judge_ratio") * r;
+  if (now - bucket_info.layout.judge_reshard_lock_time >= make_timespan(reshard_progress_judge_interval)) {
+
+    map<string, bufferlist> bucket_attrs;
+    int ret = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.bucket.name,
+                              bucket_info, nullptr, y, dpp, &bucket_attrs);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__ <<
+        " ERROR: failed to refresh bucket info : " << cpp_strerror(-ret) << dendl;
+      return ret;
+    }
+    if (bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord &&
+        now - bucket_info.layout.judge_reshard_lock_time >= make_timespan(reshard_progress_judge_interval))
+      return reshard_failed_while_logrecord(bucket_info, bucket_attrs, y, dpp);
+  }
+  return 0;
+}
+
+int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info,
+                                            map<string, bufferlist>& bucket_attrs,
+                                            optional_yield y,
+                                            const DoutPrefixProvider *dpp)
+{
+  RGWBucketReshardLock reshard_lock(this->driver, bucket_info, true);
+  int ret = reshard_lock.lock(dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, 20) << __func__ <<
+      " INFO: failed to take reshard lock for bucket " <<
+      bucket_info.bucket.bucket_id << "; expected if resharding underway" << dendl;
+    // update the judge time
+    bucket_info.layout.judge_reshard_lock_time = real_clock::now();
+    ret = put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
+        " ERROR: error putting bucket instance info: " << cpp_strerror(-ret) << dendl;
+    }
+  } else {
+    ldpp_dout(dpp,20) << __func__ << ": reshard lock success, " <<
+      "that means the reshard has failed for bucekt " << bucket_info.bucket.bucket_id << dendl;
+    // clear the RESHARD_IN_PROGRESS status after reshard failed, also set bucket instance
+    // status to CLS_RGW_RESHARD_NONE
+    ret = RGWBucketReshard::clear_resharding(this->driver, bucket_info, bucket_attrs, dpp, y);
+    reshard_lock.unlock();
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__ <<
+        " ERROR: failed to clear resharding flags for bucket " <<
+        bucket_info.bucket.bucket_id << dendl;
+    } else {
+      ldpp_dout(dpp, 5) << __func__ <<
+        " INFO: apparently successfully cleared resharding flags for "
+        "bucket " << bucket_info.bucket.bucket_id << dendl;
+    } // if clear resharding succeeded
+  } // if taking of lock succeeded
+  return 0;
+}
 
 int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
                                      const rgw_obj& obj_instance,
index 67af40f771b4cd2424a5f6039bccecfb07dd5974..8f88650abd1999855ae90ea69e37dfbceb2a35db 100644 (file)
@@ -1336,6 +1336,13 @@ public:
                    const rgw_obj& obj_instance,
                    RGWBucketInfo& bucket_info,
                    std::function<int(BucketShard *)> call, optional_yield y);
+  /* clear the progress flag when reshard failed */
+  int reshard_failed_while_logrecord(RGWBucketInfo& bucket_info, optional_yield y,
+                                     const DoutPrefixProvider *dpp);
+  int reshard_failed_while_logrecord(RGWBucketInfo& bucket_info,
+                                     std::map<std::string, bufferlist>& bucket_attrs,
+                                     optional_yield y,
+                                     const DoutPrefixProvider *dpp);
   int block_while_resharding(RGWRados::BucketShard *bs,
                              const rgw_obj& obj_instance,
                             RGWBucketInfo& bucket_info,
index fce156b69772f0d48c6e93c550986223b7f89e53..ac85bdecddc3ecaeaf6ffadf8fefa3359891dc23 100644 (file)
@@ -505,6 +505,8 @@ static int init_target_layout(rgw::sal::RadosStore* store,
     bucket_info.layout.target_index = target;
     bucket_info.layout.resharding = rgw::BucketReshardState::InLogrecord;
 
+    // update the judge time meanwhile
+    bucket_info.layout.judge_reshard_lock_time = ceph::real_clock::now();
     if (ret = fault.check("set_target_layout");
         ret == 0) { // no fault injected, write the bucket instance metadata
       ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
index 57d37bf381f4eac4c005e647a0c699e709bdda79..f8c485d89c33b9ca9628c253b308f971146163e0 100644 (file)
@@ -15,6 +15,7 @@
 
 #include <boost/algorithm/string.hpp>
 #include "rgw_bucket_layout.h"
+#include "include/utime.h"
 
 namespace rgw {
 
@@ -335,16 +336,17 @@ void decode_json_obj(BucketReshardState& s, JSONObj *obj)
 // BucketLayout
 void encode(const BucketLayout& l, bufferlist& bl, uint64_t f)
 {
-  ENCODE_START(2, 1, bl);
+  ENCODE_START(3, 1, bl);
   encode(l.resharding, bl);
   encode(l.current_index, bl);
   encode(l.target_index, bl);
   encode(l.logs, bl);
+  encode(l.judge_reshard_lock_time, bl);
   ENCODE_FINISH(bl);
 }
 void decode(BucketLayout& l, bufferlist::const_iterator& bl)
 {
-  DECODE_START(2, bl);
+  DECODE_START(3, bl);
   decode(l.resharding, bl);
   decode(l.current_index, bl);
   decode(l.target_index, bl);
@@ -357,6 +359,9 @@ void decode(BucketLayout& l, bufferlist::const_iterator& bl)
   } else {
     decode(l.logs, bl);
   }
+  if (struct_v >= 3) {
+    decode(l.judge_reshard_lock_time, bl);
+  }
   DECODE_FINISH(bl);
 }
 void encode_json_impl(const char *name, const BucketLayout& l, ceph::Formatter *f)
@@ -371,6 +376,8 @@ void encode_json_impl(const char *name, const BucketLayout& l, ceph::Formatter *
   for (const auto& log : l.logs) {
     encode_json("log", log, f);
   }
+  utime_t jt(l.judge_reshard_lock_time);
+  encode_json("judge_reshard_lock_time", jt, f);
   f->close_section(); // logs[]
   f->close_section();
 }
@@ -380,6 +387,9 @@ void decode_json_obj(BucketLayout& l, JSONObj *obj)
   JSONDecoder::decode_json("current_index", l.current_index, obj);
   JSONDecoder::decode_json("target_index", l.target_index, obj);
   JSONDecoder::decode_json("logs", l.logs, obj);
+  utime_t ut;
+  JSONDecoder::decode_json("judge_reshard_lock_time", ut, obj);
+  l.judge_reshard_lock_time = ut.to_real_time();
 }
 
 } // namespace rgw
index 25dc30c3b5f0188b95b88117a82aa35fcfe819e0..daf890162f774916d9e61304ec4fee16d7f7a639 100644 (file)
@@ -241,6 +241,10 @@ struct BucketLayout {
   // generation at the back()
   std::vector<bucket_log_layout_generation> logs;
 
+  // via this time to judge if the bucket is resharding, when the reshard status
+  // of bucket changed or the reshard status is read, this time will be updated
+  ceph::real_time judge_reshard_lock_time;
+
   friend std::ostream& operator<<(std::ostream& out, const BucketLayout& l) {
     std::stringstream ss;
     if (l.target_index) {
@@ -250,7 +254,8 @@ struct BucketLayout {
     }
     out << "resharding=" << to_string(l.resharding) <<
       ", current_index=[" << l.current_index << "], target_index=[" <<
-      ss.str() << "], logs.size()=" << l.logs.size();
+      ss.str() << "], logs.size()=" << l.logs.size() <<
+      ", judge_reshard_lock_time=" << l.judge_reshard_lock_time;
 
     return out;
   }
index 433c6f4f8fc26de640ccbb2a8a6604aee1ef6160..86514691ed38310114834c7253560ed171ea665b 100644 (file)
@@ -2300,7 +2300,6 @@ void RGWBucketInfo::encode(bufferlist& bl) const {
     encode(empty, bl);
   }
   ceph::versioned_variant::encode(owner, bl); // v24
-
   ENCODE_FINISH(bl);
 }