return write_bucket_header(hctx, &header);
}
+/*
+ * cls entry point: trim the reshard log entries stored in this bucket
+ * index shard's omap.  All reshard log keys live in the half-open range
+ * starting at BI_PREFIX_CHAR + bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]
+ * and bounded by the prefix of the next index section.
+ * Returns 0 after removing entries, -ENODATA when the range is already
+ * empty (callers treat that as the "trim finished" signal), or a
+ * negative error code on failure.
+ */
+static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ string key_begin(1, BI_PREFIX_CHAR);
+ key_begin.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]);
+
+ // the exclusive upper bound is the prefix of the *next* index section
+ string key_end;
+ key_end = BI_PREFIX_CHAR;
+ key_end.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX + 1]);
+
+ // list a single key to detect whether the range is empty
+ const size_t max_entries = 1;
+ std::set<std::string> keys;
+ bool more = false;
+
+ int rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc);
+ return rc;
+ }
+
+ // no omap key at or after key_begin: nothing left to trim
+ if (keys.empty()) {
+ CLS_LOG(20, "range is empty key_begin=%s", key_begin.c_str());
+ return -ENODATA;
+ }
+
+ // the first listed key may belong to a later index section; if it sorts
+ // past key_end, the reshard log range itself is already empty
+ const std::string& first_key = *keys.begin();
+ if (key_end < first_key) {
+ CLS_LOG(20, "listed key %s past key_end=%s", first_key.c_str(), key_end.c_str());
+ return -ENODATA;
+ }
+
+ CLS_LOG(20, "listed key %s, removing through %s",
+ first_key.c_str(), key_end.c_str());
+
+ // remove every key from the first reshard log entry up to key_end
+ // (NOTE(review): relies on cls_cxx_map_remove_range's end-bound handling
+ // excluding keys of the next section -- confirm against objclass docs)
+ rc = cls_cxx_map_remove_range(hctx, first_key, key_end);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc);
+ return rc;
+ }
+ return 0;
+}
+
static void usage_record_prefix_by_time(uint64_t epoch, string& key)
{
char buf[32];
cls_method_handle_t h_rgw_bi_get_vals_op;
cls_method_handle_t h_rgw_bi_put_op;
cls_method_handle_t h_rgw_bi_list_op;
+ cls_method_handle_t h_rgw_reshard_log_trim_op;
cls_method_handle_t h_rgw_bi_log_list_op;
cls_method_handle_t h_rgw_bi_log_trim_op;
cls_method_handle_t h_rgw_bi_log_resync_op;
cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
+ cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op);
cls_register_cxx_method(h_class, RGW_BI_LOG_LIST, CLS_METHOD_RD, rgw_bi_log_list, &h_rgw_bi_log_list_op);
cls_register_cxx_method(h_class, RGW_BI_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_log_trim, &h_rgw_bi_log_trim_op);
return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
}
+// Queue one asynchronous RGW_RESHARD_LOG_TRIM call (the op takes no input
+// payload) against a single bucket index shard object, dispatched through
+// the shared BucketIndexAioManager.  Returns the manager's accept/reject
+// result for the aio submission.
+static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ ObjectWriteOperation op;
+ op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
+}
+
+// Per-shard hook invoked by the CLSRGWConcurrentIO machinery:
+// issue one reshard-log trim op for the given shard object.
+int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid)
+{
+ return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
+}
+
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
rgw_cls_check_index_ret *pdata) {
bufferlist in;
virtual ~CLSRGWIssueBILogTrim() override {}
};
+// Concurrent-IO driver that trims the reshard log on every bucket index
+// shard.  Shards are re-issued in additional rounds until the cls op
+// returns -ENODATA (range empty), which is accepted as success.
+class CLSRGWIssueReshardLogTrim : public CLSRGWConcurrentIO {
+protected:
+ int issue_op(int shard_id, const std::string& oid) override;
+ // Trim until -ENODATA is returned.
+ int valid_ret_code() override { return -ENODATA; }
+ // keep issuing further rounds for shards that still returned data
+ bool need_multiple_rounds() override { return true; }
+ // record a shard object that needs another trim round
+ void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
+ // swap in the pending shard set for the next round and restart iteration
+ void reset_container(std::map<int, std::string>& objs) override {
+ objs_container.swap(objs);
+ iter = objs_container.begin();
+ objs.clear();
+ }
+public:
+ CLSRGWIssueReshardLogTrim(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
+ CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
+};
+
/**
* Check the bucket index.
*
#define RGW_BI_PUT "bi_put"
#define RGW_BI_LIST "bi_list"
+#define RGW_RESHARD_LOG_TRIM "reshard_log_trim"
+
#define RGW_BI_LOG_LIST "bi_log_list"
#define RGW_BI_LOG_TRIM "bi_log_trim"
#define RGW_DIR_SUGGEST_CHANGES "dir_suggest_changes"
} else {
ldpp_dout(dpp,20) << __func__ << ": reshard lock success, " <<
"that means the reshard has failed for bucekt " << bucket_info.bucket.bucket_id << dendl;
- // clear the RESHARD_IN_PROGRESS status after reshard failed, also set bucket instance
- // status to CLS_RGW_RESHARD_NONE
+ // clear the RESHARD_IN_PROGRESS status after reshard failed, set bucket instance status
+ // to CLS_RGW_RESHARD_NONE, also clear the reshard log entries
ret = RGWBucketReshard::clear_resharding(this->driver, bucket_info, bucket_attrs, dpp, y);
reshard_lock.unlock();
if (ret < 0) {
return 0;
}
+// Remove all reshard log entries from every shard of the bucket's current
+// index layout.  Opens the index pool and shard objects, then runs the
+// concurrent trim with at most rgw_bucket_index_max_aio outstanding ops.
+// Returns 0 on success or a negative error code (including any error from
+// opening the bucket index).
+int RGWRados::trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y)
+{
+ librados::IoCtx index_pool;
+ map<int, string> bucket_objs;
+
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
+ if (r < 0) {
+ return r;
+ }
+ // functor call executes the queued per-shard ops and waits for completion
+ return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op, optional_yield y)
{
return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, y);
std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y);
int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs);
+ int trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y);
+
int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y);
int cls_obj_usage_log_read(const DoutPrefixProvider *dpp, const std::string& oid, const std::string& user, const std::string& bucket, uint64_t start_epoch,
uint64_t end_epoch, uint32_t max_entries, std::string& read_iter,
{
int ret = store->svc()->bi->init_index(dpp, bucket_info, index, true);
if (ret == -EOPNOTSUPP) {
- ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord" << dendl;
+ ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord, "
+ << "falling back to block reshard mode." << dendl;
support_logrecord = false;
ret = store->svc()->bi->init_index(dpp, bucket_info, index, false);
}
store->svc()->bi->clean_index(dpp, bucket_info, target);
return ret;
}
+
+ // trim the reshard log entries to guarantee that any existing log entries are cleared,
+ // if there are no reshard log entries, this is a no-op that costs little time
+ if (support_logrecord) {
+ ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
+ if (ret == -EOPNOTSUPP) {
+ // not an error, logrecord is not supported, change to block reshard
+ ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not supported"
+ << " logrecord, falling back to block reshard mode." << dendl;
+ bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+ support_logrecord = false;
+ return 0;
+ }
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: "
+ << cpp_strerror(ret) << dendl;
+ return ret;
+ }
+ }
return 0;
} // init_target_layout
"target index with: " << cpp_strerror(ret) << dendl;
ret = 0; // non-fatal error
}
+ // trim the reshard log entries written in logrecord state
+ ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to trim "
+ "reshard log entries: " << cpp_strerror(ret) << dendl;
+ ret = 0; // non-fatal error
+ }
// retry in case of racing writes to the bucket instance metadata
static constexpr auto max_retries = 10;
}
if (ret == -EOPNOTSUPP) {
ldpp_dout(dpp, 0) << "WARNING: " << "set_resharding_status()"
- << " doesn't support logrecords" << dendl;
+ << " doesn't support logrecords,"
+ << " fallback to blocking mode." << dendl;
+ bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
support_logrecord = false;
}
}
if (ret < 0) {
return ret;
}
- }
- // block the client op and complete the resharding
- ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
- int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
- formatter, bucket_info.layout.resharding, dpp, y);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
- return ret;
+ // block the client op and complete the resharding
+ ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+ ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+ formatter, bucket_info.layout.resharding, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
+ return ret;
+ }
+ } else {
+ // setting InProgress state, but doing InLogrecord state
+ ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+ int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+ formatter, rgw::BucketReshardState::InLogrecord, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
+ return ret;
+ }
}
return 0;
} // RGWBucketReshard::do_reshard
cout << " reshard cancel cancel resharding a bucket\n";
cout << " reshard stale-instances list list stale-instances from bucket resharding\n";
cout << " reshard stale-instances delete cleanup stale-instances from bucket resharding\n";
+ cout << " reshardlog list list bucket resharding log\n";
+ cout << " reshardlog purge trim bucket resharding log\n";
cout << " sync error list list sync error\n";
cout << " sync error trim trim sync error\n";
cout << " mfa create create a new MFA TOTP token\n";
MFA_RESYNC,
RESHARD_STALE_INSTANCES_LIST,
RESHARD_STALE_INSTANCES_DELETE,
+ RESHARDLOG_LIST,
+ RESHARDLOG_PURGE,
PUBSUB_TOPIC_LIST,
PUBSUB_TOPIC_GET,
PUBSUB_TOPIC_RM,
{ "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
{ "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
{ "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
+ { "reshardlog list", OPT::RESHARDLOG_LIST},
+ { "reshardlog purge", OPT::RESHARDLOG_PURGE},
{ "topic list", OPT::PUBSUB_TOPIC_LIST },
{ "topic get", OPT::PUBSUB_TOPIC_GET },
{ "topic rm", OPT::PUBSUB_TOPIC_RM },
}
}
+ // 'radosgw-admin reshardlog list': dump the reshard log entry keys of a
+ // bucket's current index, one JSON section per shard
+ if (opt_cmd == OPT::RESHARDLOG_LIST) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket not specified" << std::endl;
+ return EINVAL;
+ }
+ int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ list<rgw_cls_bi_entry> entries;
+ bool is_truncated;
+ // default page size when --max-entries was not given
+ if (max_entries < 0)
+ max_entries = 1000;
+
+ const auto& index = bucket->get_info().layout.current_index;
+ if (index.layout.type == rgw::BucketIndexType::Indexless) {
+ cerr << "ERROR: indexless bucket has no index to purge" << std::endl;
+ return EINVAL;
+ }
+
+ int max_shards = rgw::num_shards(index);
+
+ formatter->open_array_section("entries");
+ // start at the requested shard when --shard-id was given, else shard 0
+ int i = (specified_shard_id ? shard_id : 0);
+ for (; i < max_shards; i++) {
+ formatter->open_object_section("shard");
+ encode_json("shard_id", i, formatter.get());
+ formatter->open_array_section("single shard entries");
+ RGWRados::BucketShard bs(static_cast<rgw::sal::RadosStore*>(driver)->getRados());
+ int ret = bs.init(dpp(), bucket->get_info(), index, i, null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << i << "): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ // paginate through this shard with a fresh marker
+ marker.clear();
+ do {
+ entries.clear();
+ // final 'true' selects reshard log entries only (bi_list reshardlog flag)
+ ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, "", marker, max_entries,
+ &entries, &is_truncated,
+ true, null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ formatter->dump_string("idx", entry.idx);
+ // advance the pagination marker to the last key seen
+ marker = entry.idx;
+ }
+ formatter->flush(cout);
+ } while (is_truncated);
+ formatter->close_section();
+ formatter->close_section();
+ formatter->flush(cout);
+
+ // a specific --shard-id means only that one shard is listed
+ if (specified_shard_id)
+ break;
+ }
+ formatter->close_section();
+ formatter->flush(cout);
+ }
+
+ // 'radosgw-admin reshardlog purge': trim the reshard log entries on every
+ // shard of the bucket's current index via trim_reshard_log_entries()
+ if (opt_cmd == OPT::RESHARDLOG_PURGE) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket not specified" << std::endl;
+ return EINVAL;
+ }
+ int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->trim_reshard_log_entries(dpp(), bucket->get_info(), null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: trim_reshard_log_entries(): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+
if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) {
if (bucket_name.empty()) {
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;