const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
-
-int CLSRGWConcurrentIO::operator()() {
- int ret = 0;
- iter = objs_container.begin();
- for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
- ret = issue_op(iter->first, iter->second);
- if (ret < 0)
- break;
- }
-
- int num_completions = 0, r = 0;
- std::map<int, std::string> completed_objs;
- std::map<int, std::string> retry_objs;
- while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
- need_multiple_rounds() ? &completed_objs : nullptr,
- !need_multiple_rounds() ? &retry_objs : nullptr)) {
- if (r >= 0 && ret >= 0) {
- for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
- int issue_ret = issue_op(iter->first, iter->second);
- if (issue_ret < 0) {
- ret = issue_ret;
- break;
- }
- }
- } else if (ret >= 0) {
- ret = r;
- }
-
- // if we're at the end with this round, see if another round is needed
- if (iter == objs_container.end()) {
- if (need_multiple_rounds() && !completed_objs.empty()) {
- // For those objects which need another round, use them to reset
- // the container
- reset_container(completed_objs);
- iter = objs_container.begin();
- } else if (! need_multiple_rounds() && !retry_objs.empty()) {
- reset_container(retry_objs);
- iter = objs_container.begin();
- }
-
- // re-issue ops if container was reset above (i.e., iter !=
- // objs_container.end()); if it was not reset above (i.e., iter
- // == objs_container.end()) the loop will exit immediately
- // without iterating
- for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
- int issue_ret = issue_op(iter->first, iter->second);
- if (issue_ret < 0) {
- ret = issue_ret;
- break;
- }
- }
- }
- }
-
- if (ret < 0) {
- cleanup();
- }
- return ret;
-} // CLSRGWConcurrentIO::operator()()
-
-
/**
* This class represents the bucket index object operation callback context.
*/
o.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
}
-static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
- const int shard_id,
- const string& oid,
- BucketIndexAioManager *manager) {
- bufferlist in;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_bucket_init_index(op);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
void cls_rgw_bucket_init_index2(ObjectWriteOperation& o)
{
bufferlist in;
o.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX2, in);
}
-static bool issue_bucket_index_init_op2(librados::IoCtx& io_ctx,
- const int shard_id,
- const string& oid,
- BucketIndexAioManager *manager) {
- bufferlist in;
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_rgw_bucket_init_index2(op);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
- const int shard_id,
- const string& oid,
- BucketIndexAioManager *manager) {
- bufferlist in;
- librados::ObjectWriteOperation op;
- op.remove();
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
uint64_t timeout)
{
op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
}
-static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
- const int shard_id,
- const string& oid,
- uint64_t timeout,
- BucketIndexAioManager *manager) {
- ObjectWriteOperation op;
- cls_rgw_bucket_set_tag_timeout(op, timeout);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
-{
- return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
-}
-
-int CLSRGWIssueBucketIndexInit2::issue_op(const int shard_id, const string& oid)
-{
- return issue_bucket_index_init_op2(io_ctx, shard_id, oid, &manager);
-}
-
-void CLSRGWIssueBucketIndexInit::cleanup()
-{
- // Do best effort removal
- for (auto citer = objs_container.begin(); citer != iter; ++citer) {
- io_ctx.remove(citer->second);
- }
-}
-
-void CLSRGWIssueBucketIndexInit2::cleanup()
-{
- // Do best effort removal
- for (auto citer = objs_container.begin(); citer != iter; ++citer) {
- io_ctx.remove(citer->second);
- }
-}
-
-int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
-{
- return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
-}
-
-int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
-{
- return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager);
-}
-
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
bool absolute,
const map<RGWObjCategory, rgw_bucket_category_stats>& stats,
new ClsBucketIndexOpCtx<rgw_cls_list_ret>(result, NULL));
}
-static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
- const int shard_id,
- const std::string& oid,
- const cls_rgw_obj_key& start_obj,
- const std::string& filter_prefix,
- const std::string& delimiter,
- uint32_t num_entries,
- bool list_versions,
- BucketIndexAioManager *manager,
- rgw_cls_list_ret *pdata)
-{
- librados::ObjectReadOperation op;
- cls_rgw_bucket_list_op(op,
- start_obj, filter_prefix, delimiter,
- num_entries, list_versions, pdata);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
-{
- // set the marker depending on whether we've already queried this
- // shard and gotten a RGWBIAdvanceAndRetryError (defined
- // constant) return value; if we have use the marker in the return
- // to advance the search, otherwise use the marker passed in by the
- // caller
- cls_rgw_obj_key marker;
- auto iter = result.find(shard_id);
- if (iter != result.end()) {
- marker = iter->second.marker;
- } else {
- marker = start_obj;
- }
-
- return issue_bucket_list_op(io_ctx, shard_id, oid,
- marker, filter_prefix, delimiter,
- num_entries, list_versions, &manager,
- &result[shard_id]);
-}
-
-
-void CLSRGWIssueBucketList::reset_container(std::map<int, std::string>& objs)
-{
- objs_container.swap(objs);
- iter = objs_container.begin();
- objs.clear();
-}
-
-
void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes)
{
bufferlist in;
op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
}
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
- BucketIndexShardsManager& marker_mgr, uint32_t max,
- BucketIndexAioManager *manager,
- cls_rgw_bi_log_list_ret *pdata)
-{
- librados::ObjectReadOperation op;
- cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
-{
- return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
-}
-
void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
const std::string& start_marker,
const std::string& end_marker)
op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
}
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
- BucketIndexShardsManager& start_marker_mgr,
- BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
- cls_rgw_bi_log_trim_op call;
- librados::ObjectWriteOperation op;
- cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
- end_marker_mgr.get(shard_id, ""));
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
-{
- return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
-}
-
void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op)
{
bufferlist in;
op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
}
-static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
- BucketIndexAioManager *manager) {
- ObjectWriteOperation op;
- cls_rgw_bucket_reshard_log_trim(op);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid)
-{
- return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
-}
-
void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
bufferlist& out)
{
decode(result, p);
}
-static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
- rgw_cls_check_index_ret *pdata) {
- bufferlist in;
- librados::ObjectReadOperation op;
- op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
- pdata, NULL));
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
-{
- return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
-}
-
void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op)
{
bufferlist in;
op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
}
-static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
- BucketIndexAioManager *manager) {
- librados::ObjectWriteOperation op;
- cls_rgw_bucket_rebuild_index(op);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
-{
- return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager);
-}
-
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
{
updates.append(op);
o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
}
-int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
-{
- cls_rgw_obj_key empty_key;
- string empty_prefix;
- string empty_delimiter;
- return issue_bucket_list_op(io_ctx, shard_id, oid,
- empty_key, empty_prefix, empty_delimiter,
- 0, false, &manager, &result[shard_id]);
-}
-
void cls_rgw_bilog_start(ObjectWriteOperation& op)
{
bufferlist in;
op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
}
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
-{
- librados::ObjectWriteOperation op;
- cls_rgw_bilog_start(op);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
-{
- return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
-}
-
void cls_rgw_bilog_stop(ObjectWriteOperation& op)
{
bufferlist in;
op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
}
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
-{
- librados::ObjectWriteOperation op;
- cls_rgw_bilog_stop(op);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
-{
- return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
-}
-
class GetDirHeaderCompletion : public ObjectOperationCompletion {
boost::intrusive_ptr<RGWGetDirHeader_CB> cb;
public:
op.assert_exists(); // the shard must exist; if not fail rather than recreate
op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
}
-
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
- const int shard_id, const string& oid,
- const cls_rgw_bucket_instance_entry& entry,
- BucketIndexAioManager *manager) {
- librados::ObjectWriteOperation op;
- cls_rgw_set_bucket_resharding(op, entry.reshard_status);
- return manager->aio_operate(io_ctx, shard_id, oid, &op);
-}
-
-int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
-{
- return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
-}
-
void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o);
void cls_rgw_bucket_init_index2(librados::ObjectWriteOperation& o);
-class CLSRGWConcurrentIO {
-protected:
- librados::IoCtx& io_ctx;
-
- // map of shard # to oid; the shards that are remaining to be processed
- std::map<int, std::string>& objs_container;
- // iterator to work through objs_container
- std::map<int, std::string>::iterator iter;
-
- uint32_t max_aio;
- BucketIndexAioManager manager;
-
- virtual int issue_op(int shard_id, const std::string& oid) = 0;
-
- virtual void cleanup() {}
- virtual int valid_ret_code() { return 0; }
- // Return true if multiple rounds of OPs might be needed, this happens when
- // OP needs to be re-send until a certain code is returned.
- virtual bool need_multiple_rounds() { return false; }
- // Add a new object to the end of the container.
- virtual void add_object(int shard, const std::string& oid) {}
- virtual void reset_container(std::map<int, std::string>& objs) {}
-
-public:
-
- CLSRGWConcurrentIO(librados::IoCtx& ioc,
- std::map<int, std::string>& _objs_container,
- uint32_t _max_aio) :
- io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio)
- {}
-
- virtual ~CLSRGWConcurrentIO() {}
-
- int operator()();
-}; // class CLSRGWConcurrentIO
-
void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
uint64_t timeout);
-class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
- int valid_ret_code() override { return -EEXIST; }
- void cleanup() override;
-public:
- CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
- std::map<int, std::string>& _bucket_objs,
- uint32_t _max_aio) :
- CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
- virtual ~CLSRGWIssueBucketIndexInit() override {}
-};
-
-
-class CLSRGWIssueBucketIndexInit2 : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
- int valid_ret_code() override { return -EEXIST; }
- void cleanup() override;
-public:
- CLSRGWIssueBucketIndexInit2(librados::IoCtx& ioc,
- std::map<int, std::string>& _bucket_objs,
- uint32_t _max_aio) :
- CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
- virtual ~CLSRGWIssueBucketIndexInit2() override {}
-};
-
-
-class CLSRGWIssueBucketIndexClean : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
- int valid_ret_code() override {
- return -ENOENT;
- }
-
-public:
- CLSRGWIssueBucketIndexClean(librados::IoCtx& ioc,
- std::map<int, std::string>& _bucket_objs,
- uint32_t _max_aio) :
- CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio)
- {}
- virtual ~CLSRGWIssueBucketIndexClean() override {}
-};
-
-
-class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
- uint64_t tag_timeout;
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
-public:
- CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
- uint32_t _max_aio, uint64_t _tag_timeout) :
- CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
- virtual ~CLSRGWIssueSetTagTimeout() override {}
-};
-
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
bool absolute,
const std::map<RGWObjCategory, rgw_bucket_category_stats>& stats,
uint64_t start_epoch, uint64_t end_epoch);
#endif
-
-/**
- * Std::list the bucket with the starting object and filter prefix.
- * NOTE: this method do listing requests for each bucket index shards identified by
- * the keys of the *list_results* std::map, which means the std::map should be populated
- * by the caller to fill with each bucket index object id.
- *
- * io_ctx - IO context for rados.
- * start_obj - marker for the listing.
- * filter_prefix - filter prefix.
- * num_entries - number of entries to request for each object (note the total
- * amount of entries returned depends on the number of shardings).
- * list_results - the std::list results keyed by bucket index object id.
- * max_aio - the maximum number of AIO (for throttling).
- *
- * Return 0 on success, a failure code otherwise.
-*/
-
-class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
- cls_rgw_obj_key start_obj;
- std::string filter_prefix;
- std::string delimiter;
- uint32_t num_entries;
- bool list_versions;
- std::map<int, rgw_cls_list_ret>& result; // request_id -> return value
-
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
- void reset_container(std::map<int, std::string>& objs) override;
-
-public:
- CLSRGWIssueBucketList(librados::IoCtx& io_ctx,
- const cls_rgw_obj_key& _start_obj,
- const std::string& _filter_prefix,
- const std::string& _delimiter,
- uint32_t _num_entries,
- bool _list_versions,
- std::map<int, std::string>& oids, // shard_id -> shard_oid
- // shard_id -> return value
- std::map<int, rgw_cls_list_ret>& list_results,
- uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, oids, max_aio),
- start_obj(_start_obj), filter_prefix(_filter_prefix), delimiter(_delimiter),
- num_entries(_num_entries), list_versions(_list_versions),
- result(list_results)
- {}
-};
-
void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op,
const cls_rgw_obj_key& start_obj,
const std::string& filter_prefix,
const std::string& marker, uint32_t max,
cls_rgw_bi_log_list_ret *pdata, int *ret = nullptr);
-class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
- std::map<int, cls_rgw_bi_log_list_ret>& result;
- BucketIndexShardsManager& marker_mgr;
- uint32_t max;
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
-public:
- CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
- std::map<int, std::string>& oids,
- std::map<int, cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
- marker_mgr(_marker_mgr), max(_max) {}
- virtual ~CLSRGWIssueBILogList() override {}
-};
-
void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
const std::string& start_marker,
const std::string& end_marker);
-class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
- BucketIndexShardsManager& start_marker_mgr;
- BucketIndexShardsManager& end_marker_mgr;
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
- // Trim until -ENODATA is returned.
- int valid_ret_code() override { return -ENODATA; }
- bool need_multiple_rounds() override { return true; }
- void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
- void reset_container(std::map<int, std::string>& objs) override {
- objs_container.swap(objs);
- iter = objs_container.begin();
- objs.clear();
- }
-public:
- CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
- BucketIndexShardsManager& _end_marker_mgr, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
- start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
- virtual ~CLSRGWIssueBILogTrim() override {}
-};
-
-class CLSRGWIssueReshardLogTrim : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
- // Trim until -ENODATA is returned.
- int valid_ret_code() override { return -ENODATA; }
- bool need_multiple_rounds() override { return true; }
- void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
- void reset_container(std::map<int, std::string>& objs) override {
- objs_container.swap(objs);
- iter = objs_container.begin();
- objs.clear();
- }
-public:
- CLSRGWIssueReshardLogTrim(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
-};
-
void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
bufferlist& out);
// decode the response; may throw buffer::error
void cls_rgw_bucket_check_index_decode(const bufferlist& out,
rgw_cls_check_index_ret& result);
-/**
- * Check the bucket index.
- *
- * io_ctx - IO context for rados.
- * bucket_objs_ret - check result for all shards.
- * max_aio - the maximum number of AIO (for throttling).
- *
- * Return 0 on success, a failure code otherwise.
- */
-class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<std::map<std::string, rgw_cls_check_index_ret> >*/ {
- std::map<int, rgw_cls_check_index_ret>& result;
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
-public:
- CLSRGWIssueBucketCheck(librados::IoCtx& ioc, std::map<int, std::string>& oids,
- std::map<int, rgw_cls_check_index_ret>& bucket_objs_ret,
- uint32_t _max_aio) :
- CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
- virtual ~CLSRGWIssueBucketCheck() override {}
-};
-
void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op);
-class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
-public:
- CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, std::map<int, std::string>& bucket_objs,
- uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
- virtual ~CLSRGWIssueBucketRebuild() override {}
-};
-
-class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
- std::map<int, rgw_cls_list_ret>& result;
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
-public:
- CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, std::map<int, std::string>& oids, std::map<int, rgw_cls_list_ret>& dir_headers,
- uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
- virtual ~CLSRGWIssueGetDirHeader() override {}
-};
-
-class CLSRGWIssueSetBucketResharding : public CLSRGWConcurrentIO {
- cls_rgw_bucket_instance_entry entry;
-protected:
- int issue_op(int shard_id, const std::string& oid) override;
-public:
- CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
- const cls_rgw_bucket_instance_entry& _entry,
- uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), entry(_entry) {}
- virtual ~CLSRGWIssueSetBucketResharding() override {}
-};
-
void cls_rgw_bilog_start(librados::ObjectWriteOperation& op);
void cls_rgw_bilog_stop(librados::ObjectWriteOperation& op);
-class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const std::string& oid);
-public:
- CLSRGWIssueResyncBucketBILog(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
- virtual ~CLSRGWIssueResyncBucketBILog() override {}
-};
-
-class CLSRGWIssueBucketBILogStop : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const std::string& oid);
-public:
- CLSRGWIssueBucketBILogStop(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
- virtual ~CLSRGWIssueBucketBILogStop() override {}
-};
-
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, const std::string& oid,
boost::intrusive_ptr<RGWGetDirHeader_CB> cb);
return r;
}
- /* remove bucket index objects asynchronously by best effort */
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- (void) CLSRGWIssueBucketIndexClean(index_pool,
- bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ /* remove bucket index objects asynchronously by best effort */
+ std::ignore = svc.bi->clean_index(dpp, y, bucket_info, bucket_info.layout.current_index);
} else {
// set 'deleted' flag for multisite replication to handle bucket instance removal
r = store_delete_bucket_info_flag(bucket_info, attrs, y, dpp);
zones_trace, log_op);
}
-int RGWRados::cls_obj_set_bucket_tag_timeout(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, uint64_t timeout)
-{
- librados::IoCtx index_pool;
- map<int, string> bucket_objs;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
- if (r < 0)
- return r;
-
- maybe_warn_about_blocking(dpp); // TODO: use AioTrottle
- return CLSRGWIssueSetTagTimeout(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
-}
-
// returns 0 if there is an error in calculation
uint32_t RGWRados::calc_ordered_bucket_list_per_shard(uint32_t num_entries,
int cls_obj_complete_cancel(BucketShard& bs, std::string& tag, rgw_obj& obj,
std::list<rgw_obj_index_key> *remove_objs,
uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr, bool log_op = true);
- int cls_obj_set_bucket_tag_timeout(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, uint64_t timeout);
using ent_map_t =
boost::container::flat_map<std::string, rgw_bucket_dir_entry>;
return s;
}
-void test_stats(librados::IoCtx& ioctx, const string& oid, RGWObjCategory category, uint64_t num_entries, uint64_t total_size)
+static int read_header(librados::IoCtx& ioctx, const string& oid,
+ rgw_bucket_dir_header& header)
{
- map<int, struct rgw_cls_list_ret> results;
- map<int, string> oids;
- oids[0] = oid;
- ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)());
-
- uint64_t entries = 0;
- uint64_t size = 0;
- map<int, struct rgw_cls_list_ret>::iterator iter = results.begin();
- for (; iter != results.end(); ++iter) {
- entries += (iter->second).dir.header.stats[category].num_entries;
- size += (iter->second).dir.header.stats[category].total_size;
+ bufferlist bl;
+ librados::ObjectReadOperation op;
+ op.omap_get_header(&bl, nullptr);
+ int r = ioctx.operate(oid, &op, nullptr);
+ if (r < 0) {
+ return r;
}
- ASSERT_EQ(total_size, size);
- ASSERT_EQ(num_entries, entries);
+ auto p = bl.cbegin();
+ decode(header, p);
+ return r;
+}
+
+void test_stats(librados::IoCtx& ioctx, const string& oid, RGWObjCategory category, uint64_t num_entries, uint64_t total_size)
+{
+ rgw_bucket_dir_header header;
+ ASSERT_EQ(0, read_header(ioctx, oid, header));
+ ASSERT_EQ(total_size, header.stats[category].total_size);
+ ASSERT_EQ(num_entries, header.stats[category].num_entries);
}
void index_prepare(librados::IoCtx& ioctx, const string& oid, RGWModifyOp index_op,
cls_rgw_encode_suggestion(suggest_op, dirent, updates);
}
- map<int, string> bucket_objs;
- bucket_objs[0] = bucket_oid;
- int r = CLSRGWIssueSetTagTimeout(ioctx, bucket_objs, 8 /* max aio */, 1)();
- ASSERT_EQ(0, r);
+ {
+ librados::ObjectWriteOperation op;
+ cls_rgw_bucket_set_tag_timeout(op, 1);
+ ASSERT_EQ(0, ioctx.operate(bucket_oid, &op));
+ }
sleep(1);
static void list_entries(librados::IoCtx& ioctx,
const std::string& oid,
uint32_t num_entries,
- std::map<int, rgw_cls_list_ret>& results)
+ rgw_cls_list_ret& result,
+ const cls_rgw_obj_key& start_key = {},
+ const std::string& delimiter = "")
{
std::map<int, std::string> oids = { {0, oid} };
- cls_rgw_obj_key start_key;
string empty_prefix;
- string empty_delimiter;
- ASSERT_EQ(0, CLSRGWIssueBucketList(ioctx, start_key, empty_prefix,
- empty_delimiter, num_entries,
- true, oids, results, 1)());
+ constexpr bool list_versions = true;
+ librados::ObjectReadOperation op;
+ cls_rgw_bucket_list_op(op, start_key, empty_prefix, delimiter,
+ num_entries, list_versions, &result);
+ ASSERT_EQ(0, ioctx.operate(oid, &op, nullptr));
}
TEST_F(cls_rgw, index_suggest_complete)
// list entry before completion
rgw_bucket_dir_entry dirent;
{
- std::map<int, rgw_cls_list_ret> listing;
+ rgw_cls_list_ret listing;
list_entries(ioctx, bucket_oid, 1, listing);
- ASSERT_EQ(1, listing.size());
- const auto& entries = listing.begin()->second.dir.m;
+ const auto& entries = listing.dir.m;
ASSERT_EQ(1, entries.size());
dirent = entries.begin()->second;
ASSERT_EQ(obj, dirent.key);
}
// list entry again, verify that suggested removal was not applied
{
- std::map<int, rgw_cls_list_ret> listing;
+ rgw_cls_list_ret listing;
list_entries(ioctx, bucket_oid, 1, listing);
- ASSERT_EQ(1, listing.size());
- const auto& entries = listing.begin()->second.dir.m;
+ const auto& entries = listing.dir.m;
ASSERT_EQ(1, entries.size());
EXPECT_TRUE(entries.begin()->second.exists);
}
test_stats(ioctx, bucket_oid, RGWObjCategory::None,
num_objs, obj_size * num_objs);
- map<int, string> oids = { {0, bucket_oid} };
- map<int, struct rgw_cls_list_ret> list_results;
- cls_rgw_obj_key start_key("", "");
- string empty_prefix;
- string empty_delimiter;
- int r = CLSRGWIssueBucketList(ioctx, start_key,
- empty_prefix, empty_delimiter,
- 1000, true, oids, list_results, 1)();
- ASSERT_EQ(r, 0);
- ASSERT_EQ(1u, list_results.size());
-
- auto it = list_results.begin();
- auto m = (it->second).dir.m;
+ rgw_cls_list_ret listing;
+ list_entries(ioctx, bucket_oid, 1000, listing);
+ const auto& m = listing.dir.m;
ASSERT_EQ(4u, m.size());
int i = 0;
}
}
- map<int, string> oids = { {0, bucket_oid} };
- map<int, struct rgw_cls_list_ret> list_results;
+ rgw_cls_list_ret listing;
cls_rgw_obj_key start_key("", "");
- const string empty_prefix;
const string delimiter = "/";
- int r = CLSRGWIssueBucketList(ioctx, start_key,
- empty_prefix, delimiter,
- 1000, true, oids, list_results, 1)();
- ASSERT_EQ(r, 0);
- ASSERT_EQ(1u, list_results.size()) <<
- "Because we only have one bucket index shard, we should "
- "only get one list_result.";
-
- auto it = list_results.begin();
- auto id_entry_map = it->second.dir.m;
- bool truncated = it->second.is_truncated;
+ list_entries(ioctx, bucket_oid, 1000, listing, start_key, delimiter);
+ auto id_entry_map = listing.dir.m;
// the cls code will make 4 tries to get 1000 entries; however
// because each of the subdirectories is so large, each attempt will
ASSERT_EQ(48u, id_entry_map.size()) <<
"We should get 40 top-level entries and the tops of 8 \"subdirectories\".";
- ASSERT_EQ(true, truncated) << "We did not get all entries.";
+ ASSERT_EQ(true, listing.is_truncated) << "We did not get all entries.";
ASSERT_EQ("a-0", id_entry_map.cbegin()->first);
ASSERT_EQ("p/", id_entry_map.crbegin()->first);
// now let's get the rest of the entries
- list_results.clear();
-
+ listing = {};
cls_rgw_obj_key start_key2("p/", "");
- r = CLSRGWIssueBucketList(ioctx, start_key2,
- empty_prefix, delimiter,
- 1000, true, oids, list_results, 1)();
- ASSERT_EQ(r, 0);
-
- it = list_results.begin();
- id_entry_map = it->second.dir.m;
- truncated = it->second.is_truncated;
+ list_entries(ioctx, bucket_oid, 1000, listing, start_key2, delimiter);
+ id_entry_map = listing.dir.m;
ASSERT_EQ(17u, id_entry_map.size()) <<
"We should get 15 top-level entries and the tops of 2 \"subdirectories\".";
- ASSERT_EQ(false, truncated) << "We now have all entries.";
+ ASSERT_EQ(false, listing.is_truncated) << "We now have all entries.";
ASSERT_EQ("q-0", id_entry_map.cbegin()->first);
ASSERT_EQ("u-4", id_entry_map.crbegin()->first);
// list to verify no pending ops
{
- std::map<int, rgw_cls_list_ret> results;
- list_entries(ioctx, bucket_oid, 1, results);
- ASSERT_EQ(1, results.size());
- const auto& entries = results.begin()->second.dir.m;
+ rgw_cls_list_ret listing;
+ list_entries(ioctx, bucket_oid, 1, listing);
+ const auto& entries = listing.dir.m;
ASSERT_EQ(1, entries.size());
dirent = std::move(entries.begin()->second);
ASSERT_EQ(obj, dirent.key);
// complete on tag2
index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag2, ++epoch, obj, meta);
{
- std::map<int, rgw_cls_list_ret> results;
- list_entries(ioctx, bucket_oid, 1, results);
- ASSERT_EQ(1, results.size());
- const auto& entries = results.begin()->second.dir.m;
+ rgw_cls_list_ret listing;
+ list_entries(ioctx, bucket_oid, 1, listing);
+ const auto& entries = listing.dir.m;
ASSERT_EQ(1, entries.size());
dirent = std::move(entries.begin()->second);
ASSERT_EQ(obj, dirent.key);
// cancel on tag1
index_complete(ioctx, bucket_oid, CLS_RGW_OP_CANCEL, tag1, ++epoch, obj, meta);
{
- std::map<int, rgw_cls_list_ret> results;
- list_entries(ioctx, bucket_oid, 1, results);
- ASSERT_EQ(1, results.size());
- const auto& entries = results.begin()->second.dir.m;
+ rgw_cls_list_ret listing;
+ list_entries(ioctx, bucket_oid, 1, listing);
+ const auto& entries = listing.dir.m;
ASSERT_EQ(1, entries.size());
dirent = std::move(entries.begin()->second);
ASSERT_EQ(obj, dirent.key);
// verify that the key was removed
{
- std::map<int, rgw_cls_list_ret> results;
- list_entries(ioctx, bucket_oid, 1, results);
- EXPECT_EQ(1, results.size());
- const auto& entries = results.begin()->second.dir.m;
+ rgw_cls_list_ret listing;
+ list_entries(ioctx, bucket_oid, 1, listing);
+ const auto& entries = listing.dir.m;
ASSERT_EQ(0, entries.size());
}
void set_reshard_status(librados::IoCtx& ioctx, const std::string& oid,
cls_rgw_reshard_status status)
{
- map<int, string> bucket_objs;
- bucket_objs[0] = oid;
- const auto entry = cls_rgw_bucket_instance_entry{.reshard_status = status};
- int r = CLSRGWIssueSetBucketResharding(ioctx, bucket_objs, entry, 1)();
- ASSERT_EQ(0, r);
+ librados::ObjectWriteOperation op;
+ cls_rgw_set_bucket_resharding(op, status);
+ ASSERT_EQ(0, ioctx.operate(oid, &op));
}
static int reshardlog_list(librados::IoCtx& ioctx, const std::string& oid,
void reshardlog_entries(librados::IoCtx& ioctx, const std::string& oid, uint32_t num_entries)
{
- map<int, struct rgw_cls_list_ret> results;
- map<int, string> oids;
- oids[0] = oid;
- ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)());
-
- uint32_t entries = 0;
- map<int, struct rgw_cls_list_ret>::iterator iter = results.begin();
- for (; iter != results.end(); ++iter) {
- entries += (iter->second).dir.header.reshardlog_entries;
- }
- ASSERT_EQ(entries, num_entries);
+ rgw_bucket_dir_header header;
+ ASSERT_EQ(0, read_header(ioctx, oid, header));
+ ASSERT_EQ(num_entries, header.reshardlog_entries);
}
TEST_F(cls_rgw, reshardlog_num)
void read_stats(librados::IoCtx& ioctx, const std::string& oid,
rgw_bucket_dir_stats& stats)
{
- auto oids = std::map<int, std::string>{{0, oid}};
- std::map<int, rgw_cls_list_ret> results;
- ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)());
- ASSERT_EQ(1, results.size());
- stats = std::move(results.begin()->second.dir.header.stats);
+ bufferlist bl;
+ librados::ObjectReadOperation op;
+ op.omap_get_header(&bl, nullptr);
+ ASSERT_EQ(0, ioctx.operate(oid, &op, nullptr));
+
+ rgw_bucket_dir_header header;
+ auto p = bl.cbegin();
+ decode(header, p);
+
+ stats = std::move(header.stats);
}
static void account_entry(rgw_bucket_dir_stats& stats,