From: Casey Bodley Date: Thu, 7 Nov 2024 21:10:15 +0000 (-0500) Subject: rgw/rados: index operations use async_reads/writes() X-Git-Tag: v20.3.0~144^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=e4fd504e3fff5f393e5776a261710278f5d7a6d4;p=ceph.git rgw/rados: index operations use async_reads/writes() replace the classes derived from CLSRGWConcurrentIO with classes derived from Reader/Writer/RevertibleWriter and use the async algorithms Signed-off-by: Casey Bodley --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 6c97311eb0ad2..33cd0de48e305 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -230,17 +230,22 @@ static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx, return manager->aio_operate(io_ctx, shard_id, oid, &op); } +void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op, + uint64_t timeout) +{ + const auto call = rgw_cls_tag_timeout_op{.tag_timeout = timeout}; + bufferlist in; + encode(call, in); + op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in); +} + static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, const int shard_id, const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { - bufferlist in; - rgw_cls_tag_timeout_op call; - call.tag_timeout = timeout; - encode(call, in); ObjectWriteOperation op; - op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in); + cls_rgw_bucket_set_tag_timeout(op, timeout); return manager->aio_operate(io_ctx, shard_id, oid, &op); } @@ -725,11 +730,16 @@ int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid) return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); } +void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op) +{ + bufferlist in; + op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in); +} + static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id, BucketIndexAioManager *manager) { - 
bufferlist in; ObjectWriteOperation op; - op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in); + cls_rgw_bucket_reshard_log_trim(op); return manager->aio_operate(io_ctx, shard_id, oid, &op); } @@ -738,6 +748,20 @@ int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid) return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager); } +void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op, + bufferlist& out) +{ + bufferlist in; + op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, &out, nullptr); +} + +void cls_rgw_bucket_check_index_decode(const bufferlist& out, + rgw_cls_check_index_ret& result) +{ + auto p = out.cbegin(); + decode(result, p); +} + static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager, rgw_cls_check_index_ret *pdata) { bufferlist in; @@ -752,11 +776,16 @@ int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]); } +void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op) +{ + bufferlist in; + op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in); +} + static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) { - bufferlist in; librados::ObjectWriteOperation op; - op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in); + cls_rgw_bucket_rebuild_index(op); return manager->aio_operate(io_ctx, shard_id, oid, &op); } @@ -786,11 +815,16 @@ int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid) 0, false, &manager, &result[shard_id]); } -static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) +void cls_rgw_bilog_start(ObjectWriteOperation& op) { bufferlist in; - librados::ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in); +} + +static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const 
int shard_id, const string& oid, BucketIndexAioManager *manager) +{ + librados::ObjectWriteOperation op; + cls_rgw_bilog_start(op); return manager->aio_operate(io_ctx, shard_id, oid, &op); } @@ -799,11 +833,16 @@ int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid return issue_resync_bi_log(io_ctx, shard_id, oid, &manager); } -static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) +void cls_rgw_bilog_stop(ObjectWriteOperation& op) { bufferlist in; - librados::ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in); +} + +static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) +{ + librados::ObjectWriteOperation op; + cls_rgw_bilog_stop(op); return manager->aio_operate(io_ctx, shard_id, oid, &op); } @@ -1214,49 +1253,31 @@ void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_re op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in); } -int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, - const cls_rgw_bucket_instance_entry& entry) -{ - bufferlist in, out; - cls_rgw_set_bucket_resharding_op call; - call.entry = entry; - encode(call, in); - librados::ObjectWriteOperation op; - op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in); - return io_ctx.operate(oid, &op); -} - -int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid) +void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op) { - bufferlist in, out; + bufferlist in; cls_rgw_clear_bucket_resharding_op call; encode(call, in); - librados::ObjectWriteOperation op; op.exec(RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in); - return io_ctx.operate(oid, &op); } -int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, - cls_rgw_bucket_instance_entry *entry) +void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op, + bufferlist& out) { - 
bufferlist in, out; + bufferlist in; cls_rgw_get_bucket_resharding_op call; encode(call, in); - int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out); - if (r < 0) - return r; + op.exec(RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, &out, nullptr); +} +void cls_rgw_get_bucket_resharding_decode(const bufferlist& out, + cls_rgw_bucket_instance_entry& entry) +{ cls_rgw_get_bucket_resharding_ret op_ret; auto iter = out.cbegin(); - try { - decode(op_ret, iter); - } catch (ceph::buffer::error& err) { - return -EIO; - } - - *entry = op_ret.new_instance; + decode(op_ret, iter); - return 0; + entry = std::move(op_ret.new_instance); } void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err) @@ -1268,17 +1289,24 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err) op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in); } -static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, - const int shard_id, const string& oid, - const cls_rgw_bucket_instance_entry& entry, - BucketIndexAioManager *manager) { +void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op, + cls_rgw_reshard_status status) +{ bufferlist in; cls_rgw_set_bucket_resharding_op call; - call.entry = entry; + call.entry.reshard_status = status; encode(call, in); - librados::ObjectWriteOperation op; + op.assert_exists(); // the shard must exist; if not fail rather than recreate op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in); +} + +static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, + const int shard_id, const string& oid, + const cls_rgw_bucket_instance_entry& entry, + BucketIndexAioManager *manager) { + librados::ObjectWriteOperation op; + cls_rgw_set_bucket_resharding(op, entry.reshard_status); return manager->aio_operate(io_ctx, shard_id, oid, &op); } diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 8776c1b5c0446..7cc7558f66bfc 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ 
b/src/cls/rgw/cls_rgw_client.h @@ -302,6 +302,8 @@ public: int operator()(); }; // class CLSRGWConcurrentIO +void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op, + uint64_t timeout); class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO { protected: @@ -542,6 +544,12 @@ public: CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {} }; +void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op, + bufferlist& out); +// decode the response; may throw buffer::error +void cls_rgw_bucket_check_index_decode(const bufferlist& out, + rgw_cls_check_index_ret& result); + /** * Check the bucket index. * @@ -563,6 +571,8 @@ public: virtual ~CLSRGWIssueBucketCheck() override {} }; +void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op); + class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO { protected: int issue_op(int shard_id, const std::string& oid) override; @@ -594,6 +604,9 @@ public: virtual ~CLSRGWIssueSetBucketResharding() override {} }; +void cls_rgw_bilog_start(librados::ObjectWriteOperation& op); +void cls_rgw_bilog_stop(librados::ObjectWriteOperation& op); + class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO { protected: int issue_op(int shard_id, const std::string& oid); @@ -684,12 +697,15 @@ int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const std::string& oid, cls_rgw // cls_rgw in the T+4 (X) release. void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err); -// these overloads which call io_ctx.operate() should not be called in the rgw. 
-// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate() -#ifndef CLS_CLIENT_HIDE_IOCTX -int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid, - const cls_rgw_bucket_instance_entry& entry); -int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid); -int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid, - cls_rgw_bucket_instance_entry *entry); -#endif +void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op, + cls_rgw_reshard_status status); +void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op); +void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op, + bufferlist& out); +// decode the entry; may throw buffer::error +void cls_rgw_get_bucket_resharding_decode(const bufferlist& out, + cls_rgw_bucket_instance_entry& entry); + +// Try to remove all reshard log entries from the bucket index. Return success +// if any entries were removed, and -ENODATA once they're all gone. 
+void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op); diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index 2aababad4d683..225df29fe510f 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -7,9 +7,7 @@ struct rgw_cls_tag_timeout_op { - uint64_t tag_timeout; - - rgw_cls_tag_timeout_op() : tag_timeout(0) {} + uint64_t tag_timeout = 0; void encode(ceph::buffer::list &bl) const { ENCODE_START(1, 1, bl); diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 0cae52de13d92..179694220ee61 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -5993,53 +5993,27 @@ int RGWRados::bucket_check_index(const DoutPrefixProvider *dpp, optional_yield y map *existing_stats, map *calculated_stats) { - librados::IoCtx index_pool; - - // key - bucket index object id - // value - bucket index check OP returned result with the given bucket index object (shard) - map oids; - - int ret = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &oids, nullptr); + std::map buffers; + int ret = svc.bi_rados->check_index(dpp, y, bucket_info, buffers); if (ret < 0) { return ret; } - // declare and pre-populate - map bucket_objs_ret; - for (auto& iter : oids) { - bucket_objs_ret.emplace(iter.first, rgw_cls_check_index_ret()); - } - - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - ret = CLSRGWIssueBucketCheck(index_pool, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); - if (ret < 0) { - return ret; - } + try { + // decode and accumulate the results + for (const auto& kv : buffers) { + rgw_cls_check_index_ret result; + cls_rgw_bucket_check_index_decode(kv.second, result); - // aggregate results (from different shards if there are any) - for (const auto& iter : bucket_objs_ret) { - accumulate_raw_stats(iter.second.existing_header, *existing_stats); - 
accumulate_raw_stats(iter.second.calculated_header, *calculated_stats); + accumulate_raw_stats(result.existing_header, *existing_stats); + accumulate_raw_stats(result.calculated_header, *calculated_stats); + } + } catch (const ceph::buffer::error&) { + return -EIO; } - return 0; } -int RGWRados::bucket_rebuild_index(const DoutPrefixProvider *dpp, optional_yield y, - const RGWBucketInfo& bucket_info) -{ - librados::IoCtx index_pool; - map bucket_objs; - - int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr); - if (r < 0) { - return r; - } - - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - return CLSRGWIssueBucketRebuild(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); -} - static int resync_encrypted_multipart(const DoutPrefixProvider* dpp, optional_yield y, RGWRados* store, RGWBucketInfo& bucket_info, @@ -8341,6 +8315,26 @@ int RGWRados::recover_reshard_logrecord(RGWBucketInfo& bucket_info, return 0; } +static int get_reshard_status(const DoutPrefixProvider* dpp, optional_yield y, + librados::IoCtx& ioctx, const std::string& oid, + cls_rgw_bucket_instance_entry& entry) +{ + librados::ObjectReadOperation op; + bufferlist bl; + cls_rgw_get_bucket_resharding(op, bl); + + int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), nullptr, y); + if (ret < 0) { + return ret; + } + try { + cls_rgw_get_bucket_resharding_decode(bl, entry); + return 0; + } catch (const buffer::error&) { + return -EIO; + } +} + int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, const rgw_obj& obj_instance, RGWBucketInfo& bucket_info, @@ -8389,7 +8383,7 @@ int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, constexpr int num_retries = 10; for (int i = 1; i <= num_retries; i++) { // nb: 1-based for loop auto& ref = bs->bucket_obj; - ret = cls_rgw_get_bucket_resharding(ref.ioctx, ref.obj.oid, &entry); + ret = get_reshard_status(dpp, y, ref.ioctx, 
ref.obj.oid, entry); if (ret == -ENOENT) { ret = fetch_new_bucket_info("get_bucket_resharding_failed"); if (ret < 0) { @@ -10230,11 +10224,9 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp, std::map shard_list_results; cls_rgw_obj_key start_after_key(start_after.name, start_after.instance); - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter, - num_entries_per_shard, - list_versions, shard_oids, shard_list_results, - cct->_conf->rgw_bucket_index_max_aio)(); + r = svc.bi_rados->list_objects(dpp, y, ioctx, shard_oids, start_after_key, + prefix, delimiter, num_entries_per_shard, + list_versions, shard_list_results); if (r < 0) { ldpp_dout(dpp, 0) << __func__ << ": CLSRGWIssueBucketList for " << bucket_info.bucket << diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 57e60ceb7818e..01be66ca6177d 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -1594,8 +1594,6 @@ public: const RGWBucketInfo& bucket_info, std::map *existing_stats, std::map *calculated_stats); - int bucket_rebuild_index(const DoutPrefixProvider *dpp, optional_yield y, - const RGWBucketInfo& bucket_info); // Search the bucket for encrypted multipart uploads, and increase their mtime // slightly to generate a bilog entry to trigger a resync to repair any diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 447e33ef8d115..14d4323ac96a7 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -806,12 +806,12 @@ int RadosBucket::check_index(const DoutPrefixProvider *dpp, optional_yield y, int RadosBucket::rebuild_index(const DoutPrefixProvider *dpp, optional_yield y) { - return store->getRados()->bucket_rebuild_index(dpp, y, info); + return store->svc()->bi_rados->rebuild_index(dpp, y, info); } int RadosBucket::set_tag_timeout(const DoutPrefixProvider 
*dpp, optional_yield y, uint64_t timeout) { - return store->getRados()->cls_obj_set_bucket_tag_timeout(dpp, info, timeout); + return store->svc()->bi_rados->set_tag_timeout(dpp, y, info, timeout); } int RadosBucket::purge_instance(const DoutPrefixProvider* dpp, optional_yield y) diff --git a/src/rgw/services/svc_bi_rados.cc b/src/rgw/services/svc_bi_rados.cc index a82aae180b8d5..e0216400a1751 100644 --- a/src/rgw/services/svc_bi_rados.cc +++ b/src/rgw/services/svc_bi_rados.cc @@ -1,6 +1,9 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp +#include +#include + #include "svc_bi_rados.h" #include "svc_bilog_rados.h" #include "svc_zone.h" @@ -10,12 +13,15 @@ #include "rgw_zone.h" #include "rgw_datalog.h" +#include "driver/rados/shard_io.h" #include "cls/rgw/cls_rgw_client.h" +#include "common/async/blocked_completion.h" #include "common/errno.h" #define dout_subsys ceph_subsys_rgw using namespace std; +using rgwrados::shard_io::Result; static string dir_oid_prefix = ".dir."; @@ -322,6 +328,32 @@ int RGWSI_BucketIndex_RADOS::open_bucket_index_shard(const DoutPrefixProvider *d return 0; } +struct IndexHeadReader : rgwrados::shard_io::RadosReader { + std::map& buffers; + + IndexHeadReader(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + std::map& buffers) + : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers) + {} + void prepare_read(int shard, librados::ObjectReadOperation& op) override { + auto& bl = buffers[shard]; + op.omap_get_header(&bl, nullptr); + } + Result on_complete(int, boost::system::error_code ec) override { + // ignore ENOENT + if (ec && ec != boost::system::errc::no_such_file_or_directory) { + return Result::Error; + } else { + return Result::Success; + } + } + void add_prefix(std::ostream& out) const override { + out << "read dir headers: "; + } +}; + int RGWSI_BucketIndex_RADOS::cls_bucket_head(const DoutPrefixProvider *dpp, const RGWBucketInfo& 
bucket_info, const rgw::bucket_index_layout_generation& idx_layout, @@ -336,24 +368,85 @@ int RGWSI_BucketIndex_RADOS::cls_bucket_head(const DoutPrefixProvider *dpp, if (r < 0) return r; - map list_results; - for (auto& iter : oids) { - list_results.emplace(iter.first, rgw_cls_list_ret()); - } + // read omap headers into bufferlists + std::map buffers; - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - r = CLSRGWIssueGetDirHeader(index_pool, oids, list_results, - cct->_conf->rgw_bucket_index_max_aio)(); - if (r < 0) - return r; + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto reader = IndexHeadReader{*dpp, ex, index_pool, buffers}; + + rgwrados::shard_io::async_reads(reader, oids, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto reader = IndexHeadReader{*dpp, ex, index_pool, buffers}; - map::iterator iter = list_results.begin(); - for(; iter != list_results.end(); ++iter) { - headers->push_back(std::move(iter->second.dir.header)); + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_reads(reader, oids, max_aio, + ceph::async::use_blocked[ec]); + } + if (ec) { + return ceph::from_error_code(ec); + } + + try { + std::transform(buffers.begin(), buffers.end(), + std::back_inserter(*headers), + [] (const auto& kv) { + rgw_bucket_dir_header header; + auto p = kv.second.cbegin(); + decode(header, p); + return header; + }); + } catch (const ceph::buffer::error&) { + return -EIO; } return 0; } +// init_index() is all-or-nothing so if we fail to initialize all shards, +// we undo the creation of others. 
RevertibleWriter provides these semantics +struct IndexInitWriter : rgwrados::shard_io::RadosRevertibleWriter { + bool judge_support_logrecord; + + IndexInitWriter(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + bool judge_support_logrecord) + : RadosRevertibleWriter(dpp, std::move(ex), ioctx), + judge_support_logrecord(judge_support_logrecord) + {} + void prepare_write(int shard, librados::ObjectWriteOperation& op) override { + // don't overwrite. fail with EEXIST if a shard already exists + op.create(true); + if (judge_support_logrecord) { + // fail with EOPNOTSUPP if the osd doesn't support the reshard log + cls_rgw_bucket_init_index2(op); + } else { + cls_rgw_bucket_init_index(op); + } + } + void prepare_revert(int shard, librados::ObjectWriteOperation& op) override { + // on failure, remove any of the shards we successfully created + op.remove(); + } + Result on_complete(int, boost::system::error_code ec) override { + // ignore EEXIST + if (ec && ec != boost::system::errc::file_exists) { + return Result::Error; + } else { + return Result::Success; + } + } + void add_prefix(std::ostream& out) const override { + out << "init index shards: "; + } +}; + int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, @@ -377,18 +470,49 @@ int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp, map bucket_objs; get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards, idx_layout.gen, &bucket_objs); - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - if (judge_support_logrecord) { - return CLSRGWIssueBucketIndexInit2(index_pool, - bucket_objs, - cct->_conf->rgw_bucket_index_max_aio)(); + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + 
auto writer = IndexInitWriter{*dpp, ex, index_pool, judge_support_logrecord}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); } else { - return CLSRGWIssueBucketIndexInit(index_pool, - bucket_objs, - cct->_conf->rgw_bucket_index_max_aio)(); + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = IndexInitWriter{*dpp, ex, index_pool, judge_support_logrecord}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); } + return ceph::from_error_code(ec); } +struct IndexCleanWriter : rgwrados::shard_io::RadosWriter { + IndexCleanWriter(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx) + : RadosWriter(dpp, std::move(ex), ioctx) + {} + void prepare_write(int shard, librados::ObjectWriteOperation& op) override { + op.remove(); + } + Result on_complete(int, boost::system::error_code ec) override { + // ignore ENOENT + if (ec && ec != boost::system::errc::no_such_file_or_directory) { + return Result::Error; + } else { + return Result::Success; + } + } + void add_prefix(std::ostream& out) const override { + out << "clean index shards: "; + } +}; + int RGWSI_BucketIndex_RADOS::clean_index(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, @@ -412,10 +536,25 @@ int RGWSI_BucketIndex_RADOS::clean_index(const DoutPrefixProvider *dpp, get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards, idx_layout.gen, &bucket_objs); - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - return CLSRGWIssueBucketIndexClean(index_pool, - bucket_objs, - cct->_conf->rgw_bucket_index_max_aio)(); + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = 
y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = IndexCleanWriter{*dpp, ex, index_pool}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = IndexCleanWriter{*dpp, ex, index_pool}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); } int RGWSI_BucketIndex_RADOS::read_stats(const DoutPrefixProvider *dpp, @@ -452,6 +591,32 @@ int RGWSI_BucketIndex_RADOS::read_stats(const DoutPrefixProvider *dpp, return 0; } +struct ReshardStatusReader : rgwrados::shard_io::RadosReader { + std::map& buffers; + + ReshardStatusReader(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + std::map& buffers) + : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers) + {} + void prepare_read(int shard, librados::ObjectReadOperation& op) override { + auto& bl = buffers[shard]; + cls_rgw_get_bucket_resharding(op, bl); + } + Result on_complete(int, boost::system::error_code ec) override { + // ignore ENOENT + if (ec && ec != boost::system::errc::no_such_file_or_directory) { + return Result::Error; + } else { + return Result::Success; + } + } + void add_prefix(std::ostream& out) const override { + out << "get resharding status: "; + } +}; + int RGWSI_BucketIndex_RADOS::get_reshard_status(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, @@ -471,28 +636,65 @@ int RGWSI_BucketIndex_RADOS::get_reshard_status(const DoutPrefixProvider *dpp, return r; } - for (auto i : bucket_objs) { - cls_rgw_bucket_instance_entry entry; + std::map buffers; + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and 
suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto reader = ReshardStatusReader{*dpp, ex, index_pool, buffers}; - int ret = cls_rgw_get_bucket_resharding(index_pool, i.second, &entry); - if (ret < 0 && ret != -ENOENT) { - ldpp_dout(dpp, -1) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl; - return ret; - } + rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto reader = ReshardStatusReader{*dpp, ex, index_pool, buffers}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + if (ec) { + return ceph::from_error_code(ec); + } - status->push_back(entry); + try { + std::transform(buffers.begin(), buffers.end(), + std::back_inserter(*status), + [] (const auto& kv) { + cls_rgw_bucket_instance_entry entry; + cls_rgw_get_bucket_resharding_decode(kv.second, entry); + return entry; + }); + } catch (const ceph::buffer::error&) { + return -EIO; } return 0; } +struct ReshardStatusWriter : rgwrados::shard_io::RadosWriter { + cls_rgw_reshard_status status; + ReshardStatusWriter(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + cls_rgw_reshard_status status) + : RadosWriter(dpp, std::move(ex), ioctx), status(status) + {} + void prepare_write(int, librados::ObjectWriteOperation& op) override { + cls_rgw_set_bucket_resharding(op, status); + } + void add_prefix(std::ostream& out) const override { + out << "set resharding status: "; + } +}; + int RGWSI_BucketIndex_RADOS::set_reshard_status(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, cls_rgw_reshard_status status) { - const auto entry = cls_rgw_bucket_instance_entry{.reshard_status = status}; - 
librados::IoCtx index_pool; map bucket_objs; @@ -504,16 +706,47 @@ int RGWSI_BucketIndex_RADOS::set_reshard_status(const DoutPrefixProvider *dpp, return r; } - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - r = CLSRGWIssueSetBucketResharding(index_pool, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)(); - if (r < 0) { - ldpp_dout(dpp, 0) << "ERROR: " << __func__ << - ": unable to issue set bucket resharding, r=" << r << " (" << - cpp_strerror(-r) << ")" << dendl; + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = ReshardStatusWriter{*dpp, ex, index_pool, status}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = ReshardStatusWriter{*dpp, ex, index_pool, status}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); } - return r; + return ceph::from_error_code(ec); } +struct ReshardTrimWriter : rgwrados::shard_io::RadosWriter { + using RadosWriter::RadosWriter; + void prepare_write(int, librados::ObjectWriteOperation& op) override { + cls_rgw_bucket_reshard_log_trim(op); + } + Result on_complete(int, boost::system::error_code ec) override { + // keep trimming until ENODATA (no_message_available) + if (!ec) { + return Result::Retry; + } else if (ec == boost::system::errc::no_message_available) { + return Result::Success; + } else { + return Result::Error; + } + } + void add_prefix(std::ostream& out) const override { + out << "trim reshard logs: "; + } +}; + int RGWSI_BucketIndex_RADOS::trim_reshard_log(const DoutPrefixProvider* dpp, optional_yield y, const 
RGWBucketInfo& bucket_info) @@ -525,7 +758,249 @@ int RGWSI_BucketIndex_RADOS::trim_reshard_log(const DoutPrefixProvider* dpp, if (r < 0) { return r; } - return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); + + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = ReshardTrimWriter{*dpp, ex, index_pool}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = ReshardTrimWriter{*dpp, ex, index_pool}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); +} + +struct TagTimeoutWriter : rgwrados::shard_io::RadosWriter { + uint64_t timeout; + TagTimeoutWriter(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + uint64_t timeout) + : RadosWriter(dpp, std::move(ex), ioctx), timeout(timeout) + {} + void prepare_write(int, librados::ObjectWriteOperation& op) override { + cls_rgw_bucket_set_tag_timeout(op, timeout); + } + void add_prefix(std::ostream& out) const override { + out << "set tag timeouts: "; + } +}; + +int RGWSI_BucketIndex_RADOS::set_tag_timeout(const DoutPrefixProvider* dpp, + optional_yield y, + const RGWBucketInfo& bucket_info, + uint64_t timeout) +{ + librados::IoCtx index_pool; + map bucket_objs; + + int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr); + if (r < 0) { + return r; + } + + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) 
{ + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = TagTimeoutWriter{*dpp, ex, index_pool, timeout}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = TagTimeoutWriter{*dpp, ex, index_pool, timeout}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); +} + +struct CheckReader : rgwrados::shard_io::RadosReader { + std::map<int, bufferlist>& buffers; + + CheckReader(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + std::map<int, bufferlist>& buffers) + : RadosReader(dpp, std::move(ex), ioctx), buffers(buffers) + {} + void prepare_read(int shard, librados::ObjectReadOperation& op) override { + auto& bl = buffers[shard]; + cls_rgw_bucket_check_index(op, bl); + } + void add_prefix(std::ostream& out) const override { + out << "check index shards: "; + } +}; + +int RGWSI_BucketIndex_RADOS::check_index(const DoutPrefixProvider *dpp, optional_yield y, + const RGWBucketInfo& bucket_info, + std::map<int, bufferlist>& buffers) +{ + librados::IoCtx index_pool; + std::map<int, std::string> bucket_objs; + + int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr); + if (r < 0) { + return r; + } + + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto reader = CheckReader{*dpp, ex, index_pool, buffers}; + + rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system
executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto reader = CheckReader{*dpp, ex, index_pool, buffers}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); +} + +struct RebuildWriter : rgwrados::shard_io::RadosWriter { + using RadosWriter::RadosWriter; + void prepare_write(int, librados::ObjectWriteOperation& op) override { + cls_rgw_bucket_rebuild_index(op); + } + void add_prefix(std::ostream& out) const override { + out << "rebuild index shards: "; + } +}; + +int RGWSI_BucketIndex_RADOS::rebuild_index(const DoutPrefixProvider *dpp, + optional_yield y, + const RGWBucketInfo& bucket_info) +{ + librados::IoCtx index_pool; + map<int, string> bucket_objs; + + int r = open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr); + if (r < 0) { + return r; + } + + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = RebuildWriter{*dpp, ex, index_pool}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = RebuildWriter{*dpp, ex, index_pool}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); +} + +struct ListReader : rgwrados::shard_io::RadosReader { + const cls_rgw_obj_key& start_obj; + const std::string& prefix; + const std::string& delimiter; + uint32_t num_entries; + bool list_versions; + std::map<int, rgw_cls_list_ret>& results; + +
ListReader(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + const cls_rgw_obj_key& start_obj, + const std::string& prefix, + const std::string& delimiter, + uint32_t num_entries, bool list_versions, + std::map<int, rgw_cls_list_ret>& results) + : RadosReader(dpp, std::move(ex), ioctx), + start_obj(start_obj), prefix(prefix), delimiter(delimiter), + num_entries(num_entries), list_versions(list_versions), + results(results) + {} + void prepare_read(int shard, librados::ObjectReadOperation& op) override { + // set the marker depending on whether we've already queried this + // shard and gotten a RGWBIAdvanceAndRetryError (defined + // constant) return value; if we have use the marker in the return + // to advance the search, otherwise use the marker passed in by the + // caller + auto& result = results[shard]; + const cls_rgw_obj_key& marker = + result.marker.empty() ? start_obj : result.marker; + cls_rgw_bucket_list_op(op, marker, prefix, delimiter, + num_entries, list_versions, &result); + } + Result on_complete(int, boost::system::error_code ec) override { + if (ec.value() == -RGWBIAdvanceAndRetryError) { + return Result::Retry; + } else if (ec) { + return Result::Error; + } else { + return Result::Success; + } + } + void add_prefix(std::ostream& out) const override { + out << "sharded list objects: "; + } +}; + +int RGWSI_BucketIndex_RADOS::list_objects(const DoutPrefixProvider* dpp, optional_yield y, + librados::IoCtx& index_pool, + const std::map<int, std::string>& bucket_objs, + const cls_rgw_obj_key& start_obj, + const std::string& prefix, + const std::string& delimiter, + uint32_t num_entries, bool list_versions, + std::map<int, rgw_cls_list_ret>& results) +{ + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto reader = ListReader{*dpp, ex, index_pool, start_obj, prefix, delimiter, +
num_entries, list_versions, results}; + + rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto reader = ListReader{*dpp, ex, index_pool, start_obj, prefix, delimiter, + num_entries, list_versions, results}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_reads(reader, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); } int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp, diff --git a/src/rgw/services/svc_bi_rados.h b/src/rgw/services/svc_bi_rados.h index 24a9fa14a68a0..2103c1c08a16d 100644 --- a/src/rgw/services/svc_bi_rados.h +++ b/src/rgw/services/svc_bi_rados.h @@ -24,6 +24,7 @@ #include "svc_tier_rados.h" struct rgw_bucket_dir_header; +struct rgw_cls_list_ret; class RGWSI_BILog_RADOS; @@ -145,6 +146,26 @@ public: int trim_reshard_log(const DoutPrefixProvider* dpp, optional_yield, const RGWBucketInfo& bucket_info); + int set_tag_timeout(const DoutPrefixProvider *dpp, optional_yield y, + const RGWBucketInfo& bucket_info, uint64_t timeout); + + int check_index(const DoutPrefixProvider *dpp, optional_yield y, + const RGWBucketInfo& bucket_info, + std::map<int, bufferlist>& buffers); + + int rebuild_index(const DoutPrefixProvider *dpp, optional_yield y, + const RGWBucketInfo& bucket_info); + + /// Read the requested number of entries from each index shard object.
+ int list_objects(const DoutPrefixProvider* dpp, optional_yield y, + librados::IoCtx& index_pool, + const std::map<int, std::string>& bucket_objs, + const cls_rgw_obj_key& start_obj, + const std::string& prefix, + const std::string& delimiter, + uint32_t num_entries, bool list_versions, + std::map<int, rgw_cls_list_ret>& results); + int handle_overwrite(const DoutPrefixProvider *dpp, const RGWBucketInfo& info, const RGWBucketInfo& orig_info, optional_yield y) override; diff --git a/src/rgw/services/svc_bilog_rados.cc b/src/rgw/services/svc_bilog_rados.cc index 71bcbd5660d58..ac35378f3cfba 100644 --- a/src/rgw/services/svc_bilog_rados.cc +++ b/src/rgw/services/svc_bilog_rados.cc @@ -5,11 +5,14 @@ #include "svc_bi_rados.h" #include "rgw_asio_thread.h" +#include "driver/rados/shard_io.h" #include "cls/rgw/cls_rgw_client.h" +#include "common/async/blocked_completion.h" #define dout_subsys ceph_subsys_rgw using namespace std; +using rgwrados::shard_io::Result; RGWSI_BILog_RADOS::RGWSI_BILog_RADOS(CephContext *cct) : RGWServiceInstance(cct) { @@ -20,6 +23,35 @@ void RGWSI_BILog_RADOS::init(RGWSI_BucketIndex_RADOS *bi_rados_svc) svc.bi = bi_rados_svc; } +struct TrimWriter : rgwrados::shard_io::RadosWriter { + const BucketIndexShardsManager& start; + const BucketIndexShardsManager& end; + + TrimWriter(const DoutPrefixProvider& dpp, + boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, + const BucketIndexShardsManager& start, + const BucketIndexShardsManager& end) + : RadosWriter(dpp, std::move(ex), ioctx), start(start), end(end) + {} + void prepare_write(int shard, librados::ObjectWriteOperation& op) override { + cls_rgw_bilog_trim(op, start.get(shard, ""), end.get(shard, "")); + } + Result on_complete(int, boost::system::error_code ec) override { + // continue trimming until -ENODATA or other error + if (ec == boost::system::errc::no_message_available) { + return Result::Success; + } else if (ec) { + return Result::Error; + } else { + return Result::Retry; + } + } + void add_prefix(std::ostream& out)
const override { + out << "trim bilog shards: "; + } +}; + int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, @@ -49,11 +81,37 @@ int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, optional_yield y, return r; } - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - return CLSRGWIssueBILogTrim(index_pool, start_marker_mgr, end_marker_mgr, bucket_objs, - cct->_conf->rgw_bucket_index_max_aio)(); + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = TrimWriter{*dpp, ex, index_pool, start_marker_mgr, end_marker_mgr}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = TrimWriter{*dpp, ex, index_pool, start_marker_mgr, end_marker_mgr}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); } +struct StartWriter : rgwrados::shard_io::RadosWriter { + using RadosWriter::RadosWriter; + void prepare_write(int, librados::ObjectWriteOperation& op) override { + cls_rgw_bilog_start(op); + } + void add_prefix(std::ostream& out) const override { + out << "restart bilog shards: "; + } +}; + int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, @@ -66,10 +124,37 @@ int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, optional_yield y if (r < 0) return r; - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - 
return CLSRGWIssueResyncBucketBILog(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = StartWriter{*dpp, ex, index_pool}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = StartWriter{*dpp, ex, index_pool}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); } +struct StopWriter : rgwrados::shard_io::RadosWriter { + using RadosWriter::RadosWriter; + void prepare_write(int, librados::ObjectWriteOperation& op) override { + cls_rgw_bilog_stop(op); + } + void add_prefix(std::ostream& out) const override { + out << "stop bilog shards: "; + } +}; + int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, @@ -82,8 +167,25 @@ int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, optional_yield y, if (r < 0) return r; - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - return CLSRGWIssueBucketBILogStop(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); + const size_t max_aio = cct->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto writer = StopWriter{*dpp, ex, index_pool}; + + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, yield[ec]); + } else { + // run a strand 
on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto writer = StopWriter{*dpp, ex, index_pool}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_writes(writer, bucket_objs, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); } static void build_bucket_index_marker(const string& shard_id_str, @@ -96,6 +198,62 @@ } } +struct LogReader : rgwrados::shard_io::RadosReader { + const BucketIndexShardsManager& start; + uint32_t max; + std::map<int, cls_rgw_bi_log_list_ret>& logs; + + LogReader(const DoutPrefixProvider& dpp, boost::asio::any_io_executor ex, + librados::IoCtx& ioctx, const BucketIndexShardsManager& start, + uint32_t max, std::map<int, cls_rgw_bi_log_list_ret>& logs) + : RadosReader(dpp, std::move(ex), ioctx), + start(start), max(max), logs(logs) + {} + void prepare_read(int shard, librados::ObjectReadOperation& op) override { + auto& result = logs[shard]; + cls_rgw_bilog_list(op, start.get(shard, ""), max, &result, nullptr); + } + void add_prefix(std::ostream& out) const override { + out << "list bilog shards: "; + } +}; + +static int bilog_list(const DoutPrefixProvider* dpp, optional_yield y, + RGWSI_BucketIndex_RADOS* svc_bi, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& log_layout, + const BucketIndexShardsManager& start, + int shard_id, uint32_t max, + std::map<int, cls_rgw_bi_log_list_ret>& logs) +{ + librados::IoCtx index_pool; + map<int, string> oids; + const auto& current_index = rgw::log_to_index_layout(log_layout); + int r = svc_bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &oids, nullptr); + if (r < 0) + return r; + + const size_t max_aio = dpp->get_cct()->_conf->rgw_bucket_index_max_aio; + boost::system::error_code ec; + if (y) { + // run on the coroutine's executor and suspend until completion + auto yield = y.get_yield_context(); + auto ex = yield.get_executor(); + auto reader = LogReader{*dpp,
ex, index_pool, start, max, logs}; + + rgwrados::shard_io::async_reads(reader, oids, max_aio, yield[ec]); + } else { + // run a strand on the system executor and block on a condition variable + auto ex = boost::asio::make_strand(boost::asio::system_executor{}); + auto reader = LogReader{*dpp, ex, index_pool, start, max, logs}; + + maybe_warn_about_blocking(dpp); + rgwrados::shard_io::async_reads(reader, oids, max_aio, + ceph::async::use_blocked[ec]); + } + return ceph::from_error_code(ec); +} + int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp, optional_yield y, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, @@ -105,26 +263,19 @@ int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp, optional_yield y, ldpp_dout(dpp, 20) << __func__ << ": " << bucket_info.bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl; result.clear(); - librados::IoCtx index_pool; - map<int, string> oids; - map<int, cls_rgw_bi_log_list_ret> bi_log_lists; - const auto& current_index = rgw::log_to_index_layout(log_layout); - int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &oids, nullptr); - if (r < 0) - return r; - BucketIndexShardsManager marker_mgr; - bool has_shards = (oids.size() > 1 || shard_id >= 0); + const bool has_shards = (shard_id >= 0); // If there are multiple shards for the bucket index object, the marker // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}# // {shard_marker_2}...', if there is no sharding, the bi_log_list should // only contain one record, and the key is the bucket instance id.
- r = marker_mgr.from_string(marker, shard_id); + int r = marker_mgr.from_string(marker, shard_id); if (r < 0) return r; - maybe_warn_about_blocking(dpp); // TODO: use AioTrottle - r = CLSRGWIssueBILogList(index_pool, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)(); + std::map<int, cls_rgw_bi_log_list_ret> bi_log_lists; + r = bilog_list(dpp, y, svc.bi, bucket_info, log_layout, marker_mgr, + shard_id, max, bi_log_lists); if (r < 0) return r;