reshard: guarantee no duplicated index entries exist before starting
author     liangmingyuan <liangmingyuan@baidu.com>
           Tue, 26 Mar 2024 01:12:08 +0000 (09:12 +0800)
committer  liangmingyuan <liangmingyuan@baidu.com>
           Fri, 26 Jul 2024 00:50:49 +0000 (08:50 +0800)

Duplicated index entries may remain after a failed reshard, and they can
lead to redundant copies in a subsequent reshard process. What's more,
if a duplicated entry is a delete operation and the same entry is
written again before the new reshard, the destination index entry may
be wrongly deleted. So duplicated index entries should be cleared
automatically after a reshard fails and before a new reshard starts.
For convenience, radosgw-admin can also list and purge the reshard logs
manually.
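
For example, the new subcommands can be driven roughly as follows (the
--bucket option mirrors the other bucket-scoped admin commands; treat
this as an illustration rather than the definitive syntax):

  radosgw-admin reshardlog list --bucket=<bucket>
  radosgw-admin reshardlog purge --bucket=<bucket>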

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_const.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/rgw_admin.cc

src/cls/rgw/cls_rgw.cc
index 8f0190d421814587401c2346ef5e6d100a727c62..9f4fd54a67da184308fc49ff1834c4cf496b4d09 100644
@@ -3822,6 +3822,48 @@ static int rgw_bi_log_stop(cls_method_context_t hctx, bufferlist *in, bufferlist
   return write_bucket_header(hctx, &header);
 }
 
+static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string key_begin(1, BI_PREFIX_CHAR);
+  key_begin.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]);
+
+  string key_end;
+  key_end = BI_PREFIX_CHAR;
+  key_end.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX + 1]);
+
+  // list a single key to detect whether the range is empty
+  const size_t max_entries = 1;
+  std::set<std::string> keys;
+  bool more = false;
+
+  int rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc);
+    return rc;
+  }
+
+  if (keys.empty()) {
+    CLS_LOG(20, "range is empty key_begin=%s", key_begin.c_str());
+    return -ENODATA;
+  }
+
+  const std::string& first_key = *keys.begin();
+  if (key_end < first_key) {
+    CLS_LOG(20, "listed key %s past key_end=%s", first_key.c_str(), key_end.c_str());
+    return -ENODATA;
+  }
+
+  CLS_LOG(20, "listed key %s, removing through %s",
+          first_key.c_str(), key_end.c_str());
+
+  rc = cls_cxx_map_remove_range(hctx, first_key, key_end);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc);
+    return rc;
+  }
+  return 0;
+}
+
 static void usage_record_prefix_by_time(uint64_t epoch, string& key)
 {
   char buf[32];
@@ -4975,6 +5017,7 @@ CLS_INIT(rgw)
   cls_method_handle_t h_rgw_bi_get_vals_op;
   cls_method_handle_t h_rgw_bi_put_op;
   cls_method_handle_t h_rgw_bi_list_op;
+  cls_method_handle_t h_rgw_reshard_log_trim_op;
   cls_method_handle_t h_rgw_bi_log_list_op;
   cls_method_handle_t h_rgw_bi_log_trim_op;
   cls_method_handle_t h_rgw_bi_log_resync_op;
@@ -5032,6 +5075,7 @@ CLS_INIT(rgw)
   cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
   cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
   cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
+  cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op);
 
   cls_register_cxx_method(h_class, RGW_BI_LOG_LIST, CLS_METHOD_RD, rgw_bi_log_list, &h_rgw_bi_log_list_op);
   cls_register_cxx_method(h_class, RGW_BI_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_log_trim, &h_rgw_bi_log_trim_op);
src/cls/rgw/cls_rgw_client.cc
index c5ac99eada02359528e5461ed5aa930d0a35b309..400b7f768a9f0beda559f87b4ccb1b4451c2e13d 100644
@@ -733,6 +733,19 @@ int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
   return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
 }
 
+static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+                                   BucketIndexAioManager *manager) {
+  bufferlist in;
+  ObjectWriteOperation op;
+  op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
+}
+
+int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid)
+{
+  return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
+}
+
 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
     rgw_cls_check_index_ret *pdata) {
   bufferlist in;
src/cls/rgw/cls_rgw_client.h
index 86c40dc92787df2ca828c710fc96ce4bfcbf5b27..e43aa981a37cb0acfa77a52b116b0ea8833045b8 100644
@@ -522,6 +522,23 @@ public:
   virtual ~CLSRGWIssueBILogTrim() override {}
 };
 
+class CLSRGWIssueReshardLogTrim : public CLSRGWConcurrentIO {
+protected:
+  int issue_op(int shard_id, const std::string& oid) override;
+  // Trim until -ENODATA is returned.
+  int valid_ret_code() override { return -ENODATA; }
+  bool need_multiple_rounds() override { return true; }
+  void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
+  void reset_container(std::map<int, std::string>& objs) override {
+    objs_container.swap(objs);
+    iter = objs_container.begin();
+    objs.clear();
+  }
+public:
+  CLSRGWIssueReshardLogTrim(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
+      CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
+};
+
 /**
  * Check the bucket index.
  *
src/cls/rgw/cls_rgw_const.h
index 4c8f20ffa9e897f7120fce335a3777edd208f54f..9a4e368575d9965cef8fbfee732b7196a3404885 100644
@@ -37,6 +37,8 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
 #define RGW_BI_PUT "bi_put"
 #define RGW_BI_LIST "bi_list"
 
+#define RGW_RESHARD_LOG_TRIM "reshard_log_trim"
+
 #define RGW_BI_LOG_LIST "bi_log_list"
 #define RGW_BI_LOG_TRIM "bi_log_trim"
 #define RGW_DIR_SUGGEST_CHANGES "dir_suggest_changes"
src/rgw/driver/rados/rgw_rados.cc
index 4be7264d32f6b791352697a2f7bce105a30cb075..aa5e646ed518c3dbedb151e496dd3093b9c5403d 100644
@@ -7720,8 +7720,8 @@ int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info,
   } else {
     ldpp_dout(dpp,20) << __func__ << ": reshard lock success, " <<
       "that means the reshard has failed for bucekt " << bucket_info.bucket.bucket_id << dendl;
-    // clear the RESHARD_IN_PROGRESS status after reshard failed, also set bucket instance
-    // status to CLS_RGW_RESHARD_NONE
+    // clear the RESHARD_IN_PROGRESS status after a failed reshard, set the bucket instance
+    // status to CLS_RGW_RESHARD_NONE, and also clear the reshard log entries
     ret = RGWBucketReshard::clear_resharding(this->driver, bucket_info, bucket_attrs, dpp, y);
     reshard_lock.unlock();
     if (ret < 0) {
@@ -9351,6 +9351,18 @@ int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs)
   return 0;
 }
 
+int RGWRados::trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y)
+{
+  librados::IoCtx index_pool;
+  map<int, string> bucket_objs;
+
+  int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+  if (r < 0) {
+    return r;
+  }
+  return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
 int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op, optional_yield y)
 {
   return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, y);
src/rgw/driver/rados/rgw_rados.h
index a2c55b585d4653e4687b712f322645f70c31a759..713e8f351a48919da9a727a74730c9d4a3100264 100644
@@ -1540,6 +1540,8 @@ public:
               std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y);
   int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs);
 
+  int trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y);
+
   int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y);
   int cls_obj_usage_log_read(const DoutPrefixProvider *dpp, const std::string& oid, const std::string& user, const std::string& bucket, uint64_t start_epoch,
                              uint64_t end_epoch, uint32_t max_entries, std::string& read_iter,
src/rgw/driver/rados/rgw_reshard.cc
index 2625b97923339dcd3c79b41afcd1a2f1f8d198c2..84c76f2d52d24a6e938c0e95e17c68ed336502ea 100644
@@ -436,7 +436,8 @@ static int init_target_index(rgw::sal::RadosStore* store,
 {
   int ret = store->svc()->bi->init_index(dpp, bucket_info, index, true);
   if (ret == -EOPNOTSUPP) {
-    ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord" << dendl;
+    ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord, "
+                      << "falling back to block reshard mode." << dendl;
     support_logrecord = false;
     ret = store->svc()->bi->init_index(dpp, bucket_info, index, false);
   }
@@ -571,6 +572,25 @@ static int init_target_layout(rgw::sal::RadosStore* store,
     store->svc()->bi->clean_index(dpp, bucket_info, target);
     return ret;
   }
+
+  // trim the reshard log entries to guarantee that any leftover entries are cleared;
+  // if there are no reshard log entries, this is a cheap no-op
+  if (support_logrecord) {
+    ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
+    if (ret == -EOPNOTSUPP) {
+      // not an error, logrecord is not supported, change to block reshard
+      ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not supported"
+                        << " logrecord, falling back to block reshard mode." << dendl;
+      bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+      support_logrecord = false;
+      return 0;
+    }
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: "
+          << cpp_strerror(ret) << dendl;
+      return ret;
+    }
+  }
   return 0;
 } // init_target_layout
 
@@ -591,6 +611,13 @@ static int revert_target_layout(rgw::sal::RadosStore* store,
         "target index with: " << cpp_strerror(ret) << dendl;
     ret = 0; // non-fatal error
   }
+  // trim the reshard log entries written in logrecord state
+  ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to trim "
+        "reshard log entries: " << cpp_strerror(ret) << dendl;
+    ret = 0; // non-fatal error
+  }
 
   // retry in case of racing writes to the bucket instance metadata
   static constexpr auto max_retries = 10;
@@ -677,7 +704,9 @@ static int init_reshard(rgw::sal::RadosStore* store,
     }
     if (ret == -EOPNOTSUPP) {
       ldpp_dout(dpp, 0) << "WARNING: " << "set_resharding_status()"
-                        << " doesn't support logrecords" << dendl;
+                        << " doesn't support logrecords,"
+                        << " fallback to blocking mode." << dendl;
+      bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
       support_logrecord = false;
     }
   }
@@ -1251,15 +1280,24 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr
     if (ret < 0) {
       return ret;
     }
-  }
 
-  // block the client op and complete the resharding
-  ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
-  int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
-                            formatter, bucket_info.layout.resharding, dpp, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
-    return ret;
+    // block the client op and complete the resharding
+    ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+    ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+                              formatter, bucket_info.layout.resharding, dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
+      return ret;
+    }
+  } else {
+    // the layout records the InProgress state, but this pass is processed as InLogrecord
+    ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+    int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+                              formatter, rgw::BucketReshardState::InLogrecord, dpp, y);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
+      return ret;
+    }
   }
   return 0;
 } // RGWBucketReshard::do_reshard
src/rgw/rgw_admin.cc
index ee8f1451bfa49aab0da9d5a2c055b1e3ad99be72..a40b8e242129ad4ffb2342f5de6b9174b0481f3d 100644
@@ -317,6 +317,8 @@ void usage()
   cout << "  reshard cancel                   cancel resharding a bucket\n";
   cout << "  reshard stale-instances list     list stale-instances from bucket resharding\n";
   cout << "  reshard stale-instances delete   cleanup stale-instances from bucket resharding\n";
+  cout << "  reshardlog list                  list bucket resharding log\n";
+  cout << "  reshardlog purge                 trim bucket resharding log\n";
   cout << "  sync error list                  list sync error\n";
   cout << "  sync error trim                  trim sync error\n";
   cout << "  mfa create                       create a new MFA TOTP token\n";
@@ -863,6 +865,8 @@ enum class OPT {
   MFA_RESYNC,
   RESHARD_STALE_INSTANCES_LIST,
   RESHARD_STALE_INSTANCES_DELETE,
+  RESHARDLOG_LIST,
+  RESHARDLOG_PURGE,
   PUBSUB_TOPIC_LIST,
   PUBSUB_TOPIC_GET,
   PUBSUB_TOPIC_RM,
@@ -1112,6 +1116,8 @@ static SimpleCmd::Commands all_cmds = {
   { "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
   { "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
   { "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
+  { "reshardlog list", OPT::RESHARDLOG_LIST},
+  { "reshardlog purge", OPT::RESHARDLOG_PURGE},
   { "topic list", OPT::PUBSUB_TOPIC_LIST },
   { "topic get", OPT::PUBSUB_TOPIC_GET },
   { "topic rm", OPT::PUBSUB_TOPIC_RM },
@@ -11039,6 +11045,90 @@ next:
    }
  }
 
+  if (opt_cmd == OPT::RESHARDLOG_LIST) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+    int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    list<rgw_cls_bi_entry> entries;
+    bool is_truncated;
+    if (max_entries < 0)
+      max_entries = 1000;
+
+    const auto& index = bucket->get_info().layout.current_index;
+    if (index.layout.type == rgw::BucketIndexType::Indexless) {
+      cerr << "ERROR: indexless bucket has no index to purge" << std::endl;
+      return EINVAL;
+    }
+
+    int max_shards = rgw::num_shards(index);
+
+    formatter->open_array_section("entries");
+    int i = (specified_shard_id ? shard_id : 0);
+    for (; i < max_shards; i++) {
+      formatter->open_object_section("shard");
+      encode_json("shard_id", i, formatter.get());
+      formatter->open_array_section("single shard entries");
+      RGWRados::BucketShard bs(static_cast<rgw::sal::RadosStore*>(driver)->getRados());
+      int ret = bs.init(dpp(), bucket->get_info(), index, i, null_yield);
+      if (ret < 0) {
+        cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << i << "): " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+
+      marker.clear();
+      do {
+        entries.clear();
+        ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, "", marker, max_entries,
+                                                                              &entries, &is_truncated,
+                                                                              true, null_yield);
+        if (ret < 0) {
+          cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+          return -ret;
+        }
+
+        list<rgw_cls_bi_entry>::iterator iter;
+        for (iter = entries.begin(); iter != entries.end(); ++iter) {
+          rgw_cls_bi_entry& entry = *iter;
+          formatter->dump_string("idx", entry.idx);
+          marker = entry.idx;
+        }
+        formatter->flush(cout);
+      } while (is_truncated);
+      formatter->close_section();
+      formatter->close_section();
+      formatter->flush(cout);
+
+      if (specified_shard_id)
+        break;
+    }
+    formatter->close_section();
+    formatter->flush(cout);
+  }
+
+  if (opt_cmd == OPT::RESHARDLOG_PURGE) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+    int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->trim_reshard_log_entries(dpp(), bucket->get_info(), null_yield);
+    if (ret < 0) {
+      cerr << "ERROR: trim_reshard_log_entries(): " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+  }
+
   if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) {
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;