From 9606346592dfd6261aa2daa4cbec56f9a72c65fc Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Fri, 16 Jul 2021 15:31:35 -0400 Subject: [PATCH] rgw: de-conflate shard_id and request_id in CLSRGWConcurrentIO When using asynchronous (concurrent) IO for bucket index requests, there are two int ids that are used that need to be kept separate -- shard id and request id. In many cases they're the same -- shard 0 gets request 0, and so forth. But in preparation for re-requests, those ids can diverge, where request 13 maps to shard 2. The existing code maintained the OIDs that went with each request. This PR also maintains the shard id as well. Documentation has been beefed up to help future developers navigate this. Signed-off-by: J. Eric Ivancich --- src/cls/rgw/cls_rgw_client.cc | 107 ++++++++++++++++++---------------- src/cls/rgw/cls_rgw_client.h | 61 +++++++++++++------ 2 files changed, 99 insertions(+), 69 deletions(-) diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 73bcbac066e..a93b28bd5de 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -47,19 +47,19 @@ public: } }; -void BucketIndexAioManager::do_completion(int id) { +void BucketIndexAioManager::do_completion(const int request_id) { std::lock_guard l{lock}; - auto iter = pendings.find(id); + auto iter = pendings.find(request_id); ceph_assert(iter != pendings.end()); - completions[id] = iter->second; + completions[request_id] = iter->second; pendings.erase(iter); // If the caller needs a list of finished objects, store them // for further processing - auto miter = pending_objs.find(id); + auto miter = pending_objs.find(request_id); if (miter != pending_objs.end()) { - completion_objs[id] = miter->second; + completion_objs.emplace(request_id, miter->second); pending_objs.erase(miter); } @@ -85,7 +85,7 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, if (objs && r == 0) { /* update list of successfully completed objs */ auto liter = completion_objs.find(iter->first); if (liter != completion_objs.end()) { - (*objs)[liter->first] = liter->second; + (*objs)[liter->second.shard_id] = liter->second.oid; } } if (ret_code && (r < 0 && r != valid_ret_code)) @@ -107,38 +107,43 @@ void cls_rgw_bucket_init_index(ObjectWriteOperation& o) } static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, + const int shard_id, const string& oid, BucketIndexAioManager *manager) { bufferlist in; librados::ObjectWriteOperation op; op.create(true); op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx, + const int shard_id, const string& oid, BucketIndexAioManager *manager) { bufferlist in; librados::ObjectWriteOperation op; op.remove(); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, - const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { + const int shard_id, + const string& oid, + uint64_t timeout, + BucketIndexAioManager *manager) { bufferlist in; rgw_cls_tag_timeout_op call; call.tag_timeout = timeout; encode(call, in); ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid) +int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid) { - return issue_bucket_index_init_op(io_ctx, oid, &manager); + return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager); } void CLSRGWIssueBucketIndexInit::cleanup() @@ -149,14 +154,14 @@ void CLSRGWIssueBucketIndexInit::cleanup() } } -int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid) +int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid) { - return issue_bucket_index_clean_op(io_ctx, oid, &manager); + return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager); } -int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid) +int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid) { - return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager); + return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager); } void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, @@ -237,10 +242,11 @@ void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op, } static bool issue_bucket_list_op(librados::IoCtx& io_ctx, - const string& oid, + const int shard_id, + const std::string& oid, const cls_rgw_obj_key& start_obj, - const string& filter_prefix, - const string& delimiter, + const std::string& filter_prefix, + const std::string& delimiter, uint32_t num_entries, bool list_versions, BucketIndexAioManager *manager, @@ -250,12 +256,12 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx, cls_rgw_bucket_list_op(op, start_obj, filter_prefix, delimiter, num_entries, list_versions, pdata); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid) +int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid) { - return issue_bucket_list_op(io_ctx, oid, + return issue_bucket_list_op(io_ctx, shard_id, oid, start_obj, filter_prefix, delimiter, num_entries, list_versions, &manager, &result[shard_id]); @@ -379,7 +385,7 @@ int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid, return 0; } -int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, +int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, bufferlist& olh_tag, bool delete_marker, const string& op_tag, rgw_bucket_dir_entry_meta *meta, uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace) @@ -516,17 +522,17 @@ void cls_rgw_bilog_list(librados::ObjectReadOperation& op, op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx(pdata, ret)); } -static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id, +static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager, cls_rgw_bi_log_list_ret *pdata) { librados::ObjectReadOperation op; cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid) +int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid) { return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); } @@ -544,46 +550,46 @@ void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op, op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in); } -static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id, +static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id, BucketIndexShardsManager& start_marker_mgr, BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { cls_rgw_bi_log_trim_op call; librados::ObjectWriteOperation op; cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""), end_marker_mgr.get(shard_id, "")); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid) +int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid) { return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); } -static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, +static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager, rgw_cls_check_index_ret *pdata) { bufferlist in; librados::ObjectReadOperation op; op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx( pdata, NULL)); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) { - return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]); + return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]); } -static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, +static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) { bufferlist in; librados::ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid) +int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid) { - return issue_bucket_rebuild_index_op(io_ctx, oid, &manager); + return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager); } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -597,40 +603,40 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates); } -int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid) +int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid) { cls_rgw_obj_key empty_key; string empty_prefix; string empty_delimiter; - return issue_bucket_list_op(io_ctx, oid, + return issue_bucket_list_op(io_ctx, shard_id, oid, empty_key, empty_prefix, empty_delimiter, 0, false, &manager, &result[shard_id]); } -static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager) +static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) { bufferlist in; librados::ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueResyncBucketBILog::issue_op(int shard_id, const string& oid) +int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid) { - return issue_resync_bi_log(io_ctx, oid, &manager); + return issue_resync_bi_log(io_ctx, shard_id, oid, &manager); } -static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager) +static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) { bufferlist in; librados::ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueBucketBILogStop::issue_op(int shard_id, const string& oid) +int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid) { - return issue_bi_log_stop(io_ctx, oid, &manager); + return issue_bi_log_stop(io_ctx, shard_id, oid, &manager); } class GetDirHeaderCompletion : public ObjectOperationCompletion { @@ -1072,7 +1078,8 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err) op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in); } -static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, +static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, + const int shard_id, const string& oid, const cls_rgw_bucket_instance_entry& entry, BucketIndexAioManager *manager) { bufferlist in; @@ -1081,10 +1088,10 @@ static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& o encode(call, in); librados::ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in); - return manager->aio_operate(io_ctx, oid, &op); + return manager->aio_operate(io_ctx, shard_id, oid, &op); } -int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid) +int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid) { - return issue_set_bucket_resharding(io_ctx, oid, entry, &manager); + return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager); } diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index b4f0f42349b..bcabd51391f 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -27,16 +27,33 @@ struct BucketIndexAioArg : public RefCountedObject { }; /* - * This class manages AIO completions. This class is not completely thread-safe, - * methods like *get_next* is not thread-safe and is expected to be called from - * within one thread. + * This class manages AIO completions. This class is not completely + * thread-safe, methods like *get_next_request_id* is not thread-safe + * and is expected to be called from within one thread. */ class BucketIndexAioManager { +public: + + // allows us to reaccess the shard id and shard's oid during and + // after the asynchronous call is made + struct RequestObj { + int shard_id; + std::string oid; + + RequestObj(int _shard_id, const std::string& _oid) : + shard_id(_shard_id), oid(_oid) + {/* empty */} + }; + + private: + // NB: the following 4 maps use the request_id as the key; this + // is not the same as the shard_id! std::map pendings; std::map completions; - std::map pending_objs; - std::map completion_objs; + std::map pending_objs; + std::map completion_objs; + int next = 0; ceph::mutex lock = ceph::make_mutex("BucketIndexAioManager::lock"); ceph::condition_variable cond; @@ -54,8 +71,8 @@ private: * * Return next request ID. */ - int get_next() { return next++; } - + int get_next_request_id() { return next++; } + /* * Add a new pending AIO completion instance. * @@ -64,10 +81,11 @@ private: * @param oid - the object id associated with the object, if it is NULL, we don't * track the object id per callback. */ - void add_pending(int id, librados::AioCompletion* completion, const std::string& oid) { - pendings[id] = completion; - pending_objs[id] = oid; + void add_pending(int request_id, librados::AioCompletion* completion, const int shard_id, const std::string& oid) { + pendings[request_id] = completion; + pending_objs.emplace(request_id, RequestObj(shard_id, oid)); } + public: /* * Create a new instance. @@ -77,7 +95,7 @@ public: /* * Do completion for the given AIO request. */ - void do_completion(int id); + void do_completion(int request_id); /* * Wait for AIO completions. @@ -95,13 +113,14 @@ public: /** * Do aio read operation. */ - bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectReadOperation *op) { + bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectReadOperation *op) { std::lock_guard l{lock}; - BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); + const int request_id = get_next_request_id(); + BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this); librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb); int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL); if (r >= 0) { - add_pending(arg->id, c, oid); + add_pending(arg->id, c, shard_id, oid); } else { arg->put(); c->release(); @@ -112,13 +131,14 @@ public: /** * Do aio write operation. */ - bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectWriteOperation *op) { + bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectWriteOperation *op) { std::lock_guard l{lock}; - BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); + const int request_id = get_next_request_id(); + BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this); librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb); int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op); if (r >= 0) { - add_pending(arg->id, c, oid); + add_pending(arg->id, c, shard_id, oid); } else { arg->put(); c->release(); @@ -435,9 +455,11 @@ class CLSRGWIssueBucketList : public CLSRGWConcurrentIO { std::string delimiter; uint32_t num_entries; bool list_versions; - std::map& result; + std::map& result; // request_id -> return value + protected: int issue_op(int shard_id, const std::string& oid) override; + public: CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const cls_rgw_obj_key& _start_obj, @@ -445,7 +467,8 @@ public: const std::string& _delimiter, uint32_t _num_entries, bool _list_versions, - std::map& oids, + std::map& oids, // shard_id -> shard_oid + // shard_id -> return value std::map& list_results, uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, oids, max_aio), -- 2.39.5