/* bucket index */
cls_register_cxx_method(h_class, RGW_BUCKET_INIT_INDEX, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_init_index, &h_rgw_bucket_init_index);
+ cls_register_cxx_method(h_class, RGW_BUCKET_INIT_INDEX2, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_init_index, &h_rgw_bucket_init_index);
cls_register_cxx_method(h_class, RGW_BUCKET_SET_TAG_TIMEOUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bucket_set_tag_timeout, &h_rgw_bucket_set_tag_timeout);
cls_register_cxx_method(h_class, RGW_BUCKET_LIST, CLS_METHOD_RD, rgw_bucket_list, &h_rgw_bucket_list);
cls_register_cxx_method(h_class, RGW_BUCKET_CHECK_INDEX, CLS_METHOD_RD, rgw_bucket_check_index, &h_rgw_bucket_check_index);
/* resharding attribute */
cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding);
+ cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING2, CLS_METHOD_RD | CLS_METHOD_WR,
+ rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding);
cls_register_cxx_method(h_class, RGW_CLEAR_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
rgw_clear_bucket_resharding, &h_rgw_clear_bucket_resharding);
cls_register_cxx_method(h_class, RGW_GUARD_BUCKET_RESHARDING, CLS_METHOD_RD ,
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
+// Queue an async create+init operation for one bucket index shard object,
+// using the v2 cls method (RGW_BUCKET_INIT_INDEX2).  The v2 method lets
+// callers detect whether the OSD supports the reshard-logrecord protocol:
+// an old OSD without it fails the exec with -EOPNOTSUPP.
+static bool issue_bucket_index_init_op2(librados::IoCtx& io_ctx,
+ const int shard_id,
+ const string& oid,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ librados::ObjectWriteOperation op;
+ op.create(true); // exclusive create; -EEXIST is accepted by the caller's valid_ret_code()
+ op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX2, in);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
+}
+
static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
const int shard_id,
const string& oid,
return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
}
+// CLSRGWConcurrentIO hook: issue the v2 (logrecord-aware) init op for a
+// single shard object.
+int CLSRGWIssueBucketIndexInit2::issue_op(const int shard_id, const string& oid)
+{
+ return issue_bucket_index_init_op2(io_ctx, shard_id, oid, &manager);
+}
+
void CLSRGWIssueBucketIndexInit::cleanup()
{
// Do best effort removal
}
}
+// On failure, roll back by deleting the shard objects created so far.
+// NOTE(review): `iter` is presumably the base class's progress cursor over
+// objs_container (how far issue_op() got) — confirm in CLSRGWConcurrentIO.
+// remove() return codes are deliberately ignored: best-effort cleanup only.
+void CLSRGWIssueBucketIndexInit2::cleanup()
+{
+ // Do best effort removal
+ for (auto citer = objs_container.begin(); citer != iter; ++citer) {
+ io_ctx.remove(citer->second);
+ }
+}
+
int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
{
return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
+// Queue an async "set bucket resharding status" op on one index shard using
+// the v2 cls method (RGW_SET_BUCKET_RESHARDING2).  Encodes `entry` into the
+// op payload; an old OSD lacking the v2 method fails with -EOPNOTSUPP so the
+// caller can fall back to the v1 method.
+static bool issue_set_bucket_resharding2(librados::IoCtx& io_ctx,
+ const int shard_id, const string& oid,
+ const cls_rgw_bucket_instance_entry& entry,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ cls_rgw_set_bucket_resharding_op call;
+ call.entry = entry;
+ encode(call, in);
+ librados::ObjectWriteOperation op;
+ op.assert_exists(); // the shard must exist; if not fail rather than recreate
+ op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING2, in);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
+}
+
// CLSRGWConcurrentIO hook: set the resharding status on one shard via the
// original (v1) cls method.
int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
{
return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
}
+
+// CLSRGWConcurrentIO hook: set the resharding status on one shard via the
+// v2 (logrecord-aware) cls method.
+int CLSRGWIssueSetBucketResharding2::issue_op(const int shard_id, const string& oid)
+{
+ return issue_set_bucket_resharding2(io_ctx, shard_id, oid, entry, &manager);
+}
};
+// Concurrent-IO driver that initializes all bucket index shard objects with
+// the v2 (logrecord-aware) cls call.  Mirrors CLSRGWIssueBucketIndexInit,
+// differing only in which cls method issue_op() invokes.
+class CLSRGWIssueBucketIndexInit2 : public CLSRGWConcurrentIO {
+protected:
+ int issue_op(int shard_id, const std::string& oid) override;
+ // A shard object that already exists is not an error for init.
+ int valid_ret_code() override { return -EEXIST; }
+ void cleanup() override;
+public:
+ CLSRGWIssueBucketIndexInit2(librados::IoCtx& ioc,
+ std::map<int, std::string>& _bucket_objs,
+ uint32_t _max_aio) :
+ CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+ virtual ~CLSRGWIssueBucketIndexInit2() override {}
+};
+
+
class CLSRGWIssueBucketIndexClean : public CLSRGWConcurrentIO {
protected:
int issue_op(int shard_id, const std::string& oid) override;
virtual ~CLSRGWIssueSetBucketResharding() override {}
};
+// Concurrent-IO driver that stamps `entry` (the resharding status) onto all
+// bucket index shards using the v2 cls call.  Mirrors
+// CLSRGWIssueSetBucketResharding, differing only in the cls method used.
+class CLSRGWIssueSetBucketResharding2 : public CLSRGWConcurrentIO {
+ cls_rgw_bucket_instance_entry entry; // status entry written to every shard
+protected:
+ int issue_op(int shard_id, const std::string& oid) override;
+public:
+ CLSRGWIssueSetBucketResharding2(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
+ const cls_rgw_bucket_instance_entry& _entry,
+ uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), entry(_entry) {}
+ virtual ~CLSRGWIssueSetBucketResharding2() override {}
+};
+
class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO {
protected:
int issue_op(int shard_id, const std::string& oid);
/* bucket index */
#define RGW_BUCKET_INIT_INDEX "bucket_init_index"
-
+// "2" variants are the reshard-logrecord-aware cls methods; probing them
+// on an old OSD yields -EOPNOTSUPP, signalling fallback to the v1 method.
+#define RGW_BUCKET_INIT_INDEX2 "bucket_init_index2"
#define RGW_BUCKET_SET_TAG_TIMEOUT "bucket_set_tag_timeout"
#define RGW_BUCKET_LIST "bucket_list"
/* resharding attribute */
#define RGW_SET_BUCKET_RESHARDING "set_bucket_resharding"
+#define RGW_SET_BUCKET_RESHARDING2 "set_bucket_resharding2"
#define RGW_CLEAR_BUCKET_RESHARDING "clear_bucket_resharding"
#define RGW_GUARD_BUCKET_RESHARDING "guard_bucket_resharding"
#define RGW_GET_BUCKET_RESHARDING "get_bucket_resharding"
enum class cls_rgw_reshard_status : uint8_t {
NOT_RESHARDING = 0,
- IN_LOGRECORD = 1,
- IN_PROGRESS = 2,
- DONE = 3
+ // Keep IN_PROGRESS/DONE at their pre-logrecord encoded values (1/2) so
+ // entries written by older daemons decode correctly; the new IN_LOGRECORD
+ // state is appended at the end instead of inserted in the middle.
+ IN_PROGRESS = 1,
+ DONE = 2,
+ IN_LOGRECORD = 3
};
std::ostream& operator<<(std::ostream&, cls_rgw_reshard_status);
return 0;
}
-int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
+int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp,
+ const RGWBucketInfo& bucket_info,
+ const cls_rgw_bucket_instance_entry& entry,
+ bool judge_support_logrecord)
{
librados::IoCtx index_pool;
map<int, string> bucket_objs;
return r;
}
- r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+ if (judge_support_logrecord)
+ r = CLSRGWIssueSetBucketResharding2(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+ else
+ r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
": unable to issue set bucket resharding, r=" << r << " (" <<
const std::string& marker,
RGWFormatterFlusher& flusher);
- int bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
+ int bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+ const cls_rgw_bucket_instance_entry& entry,
+ bool judge_support_logrecord = false);
int remove_objs_from_index(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
const std::list<rgw_obj_index_key>& oid_list);
static int set_resharding_status(const DoutPrefixProvider *dpp,
rgw::sal::RadosStore* store,
const RGWBucketInfo& bucket_info,
- cls_rgw_reshard_status status)
+ cls_rgw_reshard_status status,
+ bool judge_support_logrecord = false)
{
cls_rgw_bucket_instance_entry instance_entry;
instance_entry.set_status(status);
- int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry);
+ int ret = store->getRados()->bucket_set_reshard(dpp, bucket_info, instance_entry, judge_support_logrecord);
if (ret < 0) {
ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
<< cpp_strerror(-ret) << dendl;
static int init_target_index(rgw::sal::RadosStore* store,
RGWBucketInfo& bucket_info,
const rgw::bucket_index_layout_generation& index,
+ bool& support_logrecord,
const DoutPrefixProvider* dpp)
{
- int ret = store->svc()->bi->init_index(dpp, bucket_info, index);
+ int ret = store->svc()->bi->init_index(dpp, bucket_info, index, true);
+ if (ret == -EOPNOTSUPP) {
+ ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord" << dendl;
+ support_logrecord = false;
+ ret = store->svc()->bi->init_index(dpp, bucket_info, index, false);
+ }
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to initialize "
"target index shard objects: " << cpp_strerror(ret) << dendl;
std::map<std::string, bufferlist>& bucket_attrs,
ReshardFaultInjector& fault,
const uint32_t new_num_shards,
+ bool& support_logrecord,
const DoutPrefixProvider* dpp, optional_yield y)
{
auto prev = bucket_info.layout; // make a copy for cleanup
}
// create the index shard objects
- int ret = init_target_index(store, bucket_info, target, dpp);
+ int ret = init_target_index(store, bucket_info, target, support_logrecord, dpp);
if (ret < 0) {
return ret;
}
// update resharding state
bucket_info.layout.target_index = target;
- bucket_info.layout.resharding = rgw::BucketReshardState::InLogrecord;
+ if (support_logrecord) {
+ bucket_info.layout.resharding = rgw::BucketReshardState::InLogrecord;
+ } else {
+ bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+ }
// update the judge time meanwhile
bucket_info.layout.judge_reshard_lock_time = ceph::real_clock::now();
std::map<std::string, bufferlist>& bucket_attrs,
ReshardFaultInjector& fault,
const uint32_t new_num_shards,
+ bool& support_logrecord,
const DoutPrefixProvider *dpp, optional_yield y)
{
if (new_num_shards == 0) {
return -EINVAL;
}
- int ret = init_target_layout(store, bucket_info, bucket_attrs, fault, new_num_shards, dpp, y);
+ int ret = init_target_layout(store, bucket_info, bucket_attrs, fault, new_num_shards,
+ support_logrecord, dpp, y);
if (ret < 0) {
return ret;
}
- if (ret = fault.check("logrecord_writes");
- ret == 0) { // no fault injected, record log with writing to the current index shards
- ret = set_resharding_status(dpp, store, bucket_info,
- cls_rgw_reshard_status::IN_LOGRECORD);
+ if (support_logrecord) {
+ if (ret = fault.check("logrecord_writes");
+ ret == 0) { // no fault injected, record log with writing to the current index shards
+ ret = set_resharding_status(dpp, store, bucket_info,
+ cls_rgw_reshard_status::IN_LOGRECORD,
+ true);
+ }
+ if (ret == -EOPNOTSUPP) {
+ ldpp_dout(dpp, 0) << "WARNING: " << "set_resharding_status()"
+ << " doesn't support logrecords" << dendl;
+ support_logrecord = false;
+ }
}
+ if (!support_logrecord) {
+ ret = set_resharding_status(dpp, store, bucket_info,
+ cls_rgw_reshard_status::IN_PROGRESS,
+ false);
+ }
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to pause "
"writes to the current index: " << cpp_strerror(ret) << dendl;
int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& current,
const rgw::bucket_index_layout_generation& target,
int max_op_entries, // max num to process per op
+ bool support_logrecord,
bool verbose,
ostream *out,
Formatter *formatter,
bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
- // a log is written to shard going with client op at this state
- ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord);
- int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
- formatter, bucket_info.layout.resharding, dpp, y);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
- return ret;
- }
+ if (support_logrecord) {
+ // a log is written to shard going with client op at this state
+ ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord);
+ int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+ formatter, bucket_info.layout.resharding, dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
+ return ret;
+ }
- ret = change_reshard_state(store, bucket_info, bucket_attrs, fault, dpp, y);
- if (ret < 0) {
- return ret;
+ ret = change_reshard_state(store, bucket_info, bucket_attrs, fault, dpp, y);
+ if (ret < 0) {
+ return ret;
+ }
}
// block the client op and complete the resharding
ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
- ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
- formatter, bucket_info.layout.resharding, dpp, y);
+ int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+ formatter, bucket_info.layout.resharding, dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
return ret;
}
}
+ bool support_logrecord = true;
// prepare the target index and add its layout the bucket info
- ret = init_reshard(store, bucket_info, bucket_attrs, fault, num_shards, dpp, y);
+ ret = init_reshard(store, bucket_info, bucket_attrs, fault, num_shards,
+ support_logrecord, dpp, y);
if (ret < 0) {
return ret;
}
ret == 0) { // no fault injected, do the reshard
ret = do_reshard(bucket_info.layout.current_index,
*bucket_info.layout.target_index,
- max_op_entries, verbose, out, formatter, fault, dpp, y);
+ max_op_entries, support_logrecord,
+ verbose, out, formatter, fault, dpp, y);
}
if (ret < 0) {
const DoutPrefixProvider *dpp, optional_yield y);
int do_reshard(const rgw::bucket_index_layout_generation& current,
const rgw::bucket_index_layout_generation& target,
- int max_entries,
+ int max_entries, bool support_logrecord,
bool verbose,
std::ostream *os,
Formatter *formatter,
enum class BucketReshardState : uint8_t {
None,
- InLogrecord,
InProgress,
+ // Appended after InProgress (not inserted before it) so InProgress keeps
+ // the ordinal it had before the logrecord feature was introduced.
+ InLogrecord,
};
std::string_view to_string(const BucketReshardState& s);
bool parse(std::string_view str, BucketReshardState& s);
RGWSI_BucketIndex(CephContext *cct) : RGWServiceInstance(cct) {}
virtual ~RGWSI_BucketIndex() {}
- virtual int init_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout) = 0;
- virtual int clean_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout) = 0;
+ virtual int init_index(const DoutPrefixProvider *dpp,
+ RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ bool judge_support_logrecord = false) = 0;
+ virtual int clean_index(const DoutPrefixProvider *dpp,
+ RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout) = 0;
virtual int read_stats(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
return 0;
}
-int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout)
+int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,
+ RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ bool judge_support_logrecord)
{
librados::IoCtx index_pool;
map<int, string> bucket_objs;
get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards, idx_layout.gen, &bucket_objs);
- return CLSRGWIssueBucketIndexInit(index_pool,
- bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ if (judge_support_logrecord) {
+ return CLSRGWIssueBucketIndexInit2(index_pool,
+ bucket_objs,
+ cct->_conf->rgw_bucket_index_max_aio)();
+ } else {
+ return CLSRGWIssueBucketIndexInit(index_pool,
+ bucket_objs,
+ cct->_conf->rgw_bucket_index_max_aio)();
+ }
}
int RGWSI_BucketIndex_RADOS::clean_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout)
return bucket_shard_index(sharding_key, num_shards);
}
- int init_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info,const rgw::bucket_index_layout_generation& idx_layout) override;
- int clean_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout) override;
+ int init_index(const DoutPrefixProvider *dpp,
+ RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ bool judge_support_logrecord = false) override;
+ int clean_index(const DoutPrefixProvider *dpp,
+ RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout) override;
/* RADOS specific */
{
map<int, string> bucket_objs;
bucket_objs[0] = oid;
- int r = CLSRGWIssueSetBucketResharding(ioctx, bucket_objs, entry, 1)();
+ int r = CLSRGWIssueSetBucketResharding2(ioctx, bucket_objs, entry, 1)();
ASSERT_EQ(0, r);
}