From ecd1f81e46bd43678ed09e99e0e0334c410c0f1c Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Fri, 18 Apr 2025 03:31:35 -0400 Subject: [PATCH] rgw: Use `run_coro` to call coroutines at use This avoids having two entry points with different error checking preparation, etc. to get out of sync or have a fix get forgotten. Signed-off-by: Adam C. Emerson (cherry picked from commit 47329444b8c62e99a9c823c3fbba31f26a7aab7a) Signed-off-by: Adam C. Emerson --- src/rgw/driver/rados/rgw_datalog.cc | 225 ++++------------------- src/rgw/driver/rados/rgw_datalog.h | 38 ++-- src/rgw/driver/rados/rgw_rest_log.cc | 33 +++- src/rgw/driver/rados/rgw_trim_datalog.cc | 6 +- src/rgw/radosgw-admin/radosgw-admin.cc | 92 +++++---- src/test/rgw/test_datalog.cc | 2 +- 6 files changed, 135 insertions(+), 261 deletions(-) diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 521241b9f1630..562ba1bd7ef1f 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -1084,7 +1084,7 @@ DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, } asio::awaitable, - std::string>> + std::string, bool>> RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard, int max_entries, std::string marker) { @@ -1096,71 +1096,24 @@ RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard, } if (max_entries <= 0) { co_return std::make_tuple(std::vector{}, - std::string{}); + std::string{}, false); } std::vector entries(max_entries); entries.resize(max_entries); auto [spanentries, outmark] = co_await bes->list(dpp, shard, entries, marker); entries.resize(spanentries.size()); - co_return std::make_tuple(std::move(entries), std::move(outmark)); -} - -int RGWDataChangesLog::list_entries( - const DoutPrefixProvider *dpp, int shard, - int max_entries, std::vector& entries, - std::string_view marker, std::string* out_marker, bool* truncated, - std::string* errstr, optional_yield y) -{ - std::tuple, - std::string> out; - if (shard >= num_shards) [[unlikely]] { - if (errstr) { - *errstr = fmt::format("{} is not a valid shard. Valid shards are integers in [0, {})", - shard, num_shards); - } - return -EINVAL; - } - if (std::ssize(entries) < max_entries) { - entries.resize(max_entries); - } - try { - if (y) { - auto& yield = y.get_yield_context(); - out = asio::co_spawn(yield.get_executor(), - bes->list(dpp, shard, entries, - std::string{marker}), - yield); - } else { - maybe_warn_about_blocking(dpp); - out = asio::co_spawn(rados->get_executor(), - bes->list(dpp, shard, entries, - std::string{marker}), - async::use_blocked); - } - } catch (const std::exception&) { - return ceph::from_exception(std::current_exception()); - } - auto& [outries, outmark] = out; - if (auto size = std::ssize(outries); size < std::ssize(entries)) { - entries.resize(size); - } - if (truncated) { - *truncated = !outmark.empty(); - } - if (out_marker) { - *out_marker = std::move(outmark); - } - return 0; + bool truncated = !outmark.empty(); + co_return std::make_tuple(std::move(entries), std::move(outmark), truncated); } asio::awaitable, - RGWDataChangesLogMarker>> + RGWDataChangesLogMarker, bool>> RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries, RGWDataChangesLogMarker marker) { if (max_entries <= 0) { co_return std::make_tuple(std::vector{}, - RGWDataChangesLogMarker{}); + RGWDataChangesLogMarker{}, false); } std::vector entries(max_entries); @@ -1184,78 +1137,25 @@ RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, if (!remaining.empty()) { entries.resize(entries.size() - remaining.size()); } - co_return std::make_tuple(std::move(entries), std::move(marker)); -} - -int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entries, - std::vector& entries, - RGWDataChangesLogMarker& marker, bool *ptruncated, - optional_yield y) -{ - std::tuple, - RGWDataChangesLogMarker> out; - if (std::ssize(entries) < max_entries) { - entries.resize(max_entries); - } - try { - if (y) { - auto& yield = y.get_yield_context(); - out = asio::co_spawn(yield.get_executor(), - list_entries(dpp, max_entries, - RGWDataChangesLogMarker{marker}), - yield); - } else { - maybe_warn_about_blocking(dpp); - out = asio::co_spawn(rados->get_executor(), - list_entries(dpp, max_entries, - RGWDataChangesLogMarker{marker}), - async::use_blocked); - } - } catch (const std::exception&) { - return ceph::from_exception(std::current_exception()); - } - auto& [outries, outmark] = out; - if (auto size = std::ssize(outries); size < std::ssize(entries)) { - entries.resize(size); - } - if (ptruncated) { - *ptruncated = (outmark.shard > 0 || !outmark.marker.empty()); - } - marker = std::move(outmark); - return 0; + bool truncated = marker; + co_return std::make_tuple(std::move(entries), std::move(marker), truncated); } -int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id, - RGWDataChangesLogInfo* info, - std::string* errstr, optional_yield y) +asio::awaitable +RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id) { if (shard_id >= num_shards) [[unlikely]] { - if (errstr) { - *errstr = fmt::format( + throw sys::system_error{-EINVAL, sys::generic_category(), + fmt::format( "{} is not a valid shard. Valid shards are integers in [0, {})", - shard_id, num_shards); - } + shard_id, num_shards)}; } auto be = bes->head(); - try { - if (y) { - auto& yield = y.get_yield_context(); - *info = asio::co_spawn(yield.get_executor(), - be->get_info(dpp, shard_id), - yield); - } else { - maybe_warn_about_blocking(dpp); - *info = asio::co_spawn(rados->get_executor(), - be->get_info(dpp, shard_id), - async::use_blocked); - } - } catch (const std::exception&) { - return ceph::from_exception(std::current_exception()); - } - if (!info->marker.empty()) { - info->marker = gencursor(be->gen_id, info->marker); + auto info = co_await be->get_info(dpp, shard_id); + if (!info.marker.empty()) { + info.marker = gencursor(be->gen_id, info.marker); } - return 0; + co_return info; } asio::awaitable DataLogBackends::trim_entries( @@ -1285,46 +1185,27 @@ asio::awaitable DataLogBackends::trim_entries( co_return; } -int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, - std::string_view marker, std::string* errstr, - optional_yield y) +asio::awaitable +RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, + std::string_view marker) { if (shard_id >= num_shards) [[unlikely]] { - if (errstr) { - *errstr = fmt::format( + throw sys::system_error{-EINVAL, sys::generic_category(), + fmt::format( "{} is not a valid shard. Valid shards are integers in [0, {})", - shard_id, num_shards); - } + shard_id, num_shards)}; } - try { - if (y) { - auto& yield = y.get_yield_context(); - asio::co_spawn(yield.get_executor(), - bes->trim_entries(dpp, shard_id, marker), - yield); - } else { - maybe_warn_about_blocking(dpp); - asio::co_spawn(rados->get_executor(), - bes->trim_entries(dpp, shard_id, marker), - async::use_blocked); - } - } catch (const std::exception& e) { - return ceph::from_exception(std::current_exception()); - } - return 0; + auto be = bes->head(); + co_return co_await bes->trim_entries(dpp, shard_id, marker); } -int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id, - std::string_view marker, - librados::AioCompletion* c) +void RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id, + std::string_view marker, + librados::AioCompletion* c) { - if (shard_id >= num_shards) [[unlikely]] { - return -EINVAL; - } asio::co_spawn(rados->get_executor(), - bes->trim_entries(dpp, shard_id, marker), + trim_entries(dpp, shard_id, marker), c); - return 0; } @@ -1460,7 +1341,7 @@ asio::awaitable RGWDataChangesLog::renew_run(decltype(renew_signal)) { std::optional through; ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl; operation = "trim_generations"sv; - co_await bes->trim_generations(&dp, through); + co_await trim_generations(&dp, through); operation = {}; if (through) { ldpp_dout(&dp, 2) @@ -1532,54 +1413,22 @@ std::string RGWDataChangesLog::max_marker() const { "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); } -int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, - log_type type,optional_yield y) +asio::awaitable +RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type) { - try { - if (y) { - auto& yield = y.get_yield_context(); - asio::co_spawn(yield.get_executor(), - bes->new_backing(dpp, type), - yield); - } else { - maybe_warn_about_blocking(dpp); - asio::co_spawn(rados->get_executor(), - bes->new_backing(dpp, type), - async::use_blocked); - } - } catch (const std::exception&) { - return ceph::from_exception(std::current_exception()); - } - return 0;; + co_return co_await bes->new_backing(dpp, type); } -int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, - std::optional& through, - optional_yield y) +asio::awaitable +RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, + std::optional& through) { - try { - if (y) { - auto& yield = y.get_yield_context(); - asio::co_spawn(yield.get_executor(), - bes->trim_generations(dpp, through), - yield); - } else { - maybe_warn_about_blocking(dpp); - asio::co_spawn(rados->get_executor(), - bes->trim_generations(dpp, through), - async::use_blocked); - } - } catch (const std::exception& e) { - return ceph::from_exception(std::current_exception()); - } - - return 0; + co_return co_await bes->trim_generations(dpp, through); } asio::awaitable, std::string>> RGWDataChangesLog::read_sems(int index, std::string cursor) { - namespace sem_set = neorados::cls::sem_set; bc::flat_map out; try { co_await rados->execute( diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index 81772d7388d83..5afca080b462a 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -463,30 +463,25 @@ public: int shard_id, optional_yield y); int get_log_shard_id(rgw_bucket& bucket, int shard_id); asio::awaitable, - std::string>> + std::string, bool>> list_entries(const DoutPrefixProvider* dpp, int shard, int max_entries, std::string marker); - int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries, - std::vector& entries, - std::string_view marker, std::string* out_marker, - bool* truncated, std::string* errstr, optional_yield y); asio::awaitable, - RGWDataChangesLogMarker>> + RGWDataChangesLogMarker, bool>> list_entries(const DoutPrefixProvider *dpp, int max_entries, RGWDataChangesLogMarker marker); - int list_entries(const DoutPrefixProvider *dpp, int max_entries, - std::vector& entries, - RGWDataChangesLogMarker& marker, bool* ptruncated, - optional_yield y); - - int trim_entries(const DoutPrefixProvider *dpp, int shard_id, - std::string_view marker, std::string* errstr, optional_yield y); - int trim_entries(const DoutPrefixProvider *dpp, int shard_id, - std::string_view marker, librados::AioCompletion* c); - int get_info(const DoutPrefixProvider *dpp, int shard_id, - RGWDataChangesLogInfo *info, std::string* errstr, - optional_yield y); - + asio::awaitable + get_info(const DoutPrefixProvider* dpp, int shard_id); + asio::awaitable + trim_entries(const DoutPrefixProvider *dpp, int shard_id, + std::string_view marker); + void trim_entries(const DoutPrefixProvider *dpp, int shard_id, + std::string_view marker, librados::AioCompletion* c); + asio::awaitable + trim_generations(const DoutPrefixProvider *dpp, + std::optional& through); + asio::awaitable + change_format(const DoutPrefixProvider *dpp, log_type type); void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen); auto read_clear_modified() { std::unique_lock wl{modified_lock}; @@ -509,11 +504,6 @@ public: std::string get_sem_set_oid(int shard_id) const; - int change_format(const DoutPrefixProvider *dpp, log_type type, - optional_yield y); - int trim_generations(const DoutPrefixProvider *dpp, - std::optional& through, - optional_yield y); asio::awaitable, std::string>> read_sems(int index, std::string cursor); diff --git a/src/rgw/driver/rados/rgw_rest_log.cc b/src/rgw/driver/rados/rgw_rest_log.cc index d3d71465d0a18..61a54d9a02446 100644 --- a/src/rgw/driver/rados/rgw_rest_log.cc +++ b/src/rgw/driver/rados/rgw_rest_log.cc @@ -15,6 +15,7 @@ #include "common/ceph_json.h" #include "common/strtol.h" +#include "rgw/async_utils.h" #include "rgw_rest.h" #include "rgw_op.h" #include "rgw_rest_s3.h" @@ -688,13 +689,22 @@ void RGWOp_DATALog_List::execute(optional_yield y) { // Note that last_marker is updated to be the marker of the last // entry listed - op_ret = static_cast(driver)->svc()-> - datalog_rados->list_entries(this, shard_id, max_entries, entries, - marker, &last_marker, &truncated, nullptr, y); + auto store = static_cast(driver); + op_ret = rgw::run_coro( + this, + store->get_io_context(), + store->svc()->datalog_rados->list_entries(this, shard_id, + max_entries, marker), + std::tie(entries, last_marker, truncated), + "RGWDataChangesLog::list_entries", y); + RGWDataChangesLogInfo info; - op_ret = static_cast(driver)->svc()-> - datalog_rados->get_info(this, shard_id, &info, nullptr, y); + op_ret = rgw::run_coro( + this, + store->get_io_context(), + store->svc()->datalog_rados->get_info(this, shard_id), + info, "RGWDataChangesLog::get_info", y); last_update = info.last_update; } @@ -756,8 +766,10 @@ void RGWOp_DATALog_ShardInfo::execute(optional_yield y) { return; } - op_ret = static_cast(driver)->svc()-> - datalog_rados->get_info(this, shard_id, &info, nullptr, y); + auto store = static_cast(driver); + op_ret = rgw::run_coro(this, store->get_io_context(), + store->svc()->datalog_rados->get_info(this, shard_id), + info, "RGWDataChangesLog::get_info", y); } void RGWOp_DATALog_ShardInfo::send_response() { @@ -906,8 +918,11 @@ void RGWOp_DATALog_Delete::execute(optional_yield y) { return; } - op_ret = static_cast(driver)->svc()-> - datalog_rados->trim_entries(this, shard_id, marker, nullptr, y); + auto store = static_cast(driver); + op_ret = rgw::run_coro( + this, store->get_io_context(), + store->svc()->datalog_rados->trim_entries(this, shard_id, marker), + "RGWDataChangesLog::trim_entries", y); } // not in header to avoid pulling in rgw_sync.h diff --git a/src/rgw/driver/rados/rgw_trim_datalog.cc b/src/rgw/driver/rados/rgw_trim_datalog.cc index 5dcddb659e1e5..4f1462ce669e9 100644 --- a/src/rgw/driver/rados/rgw_trim_datalog.cc +++ b/src/rgw/driver/rados/rgw_trim_datalog.cc @@ -45,8 +45,10 @@ class DatalogTrimImplCR : public RGWSimpleCoroutine { int send_request(const DoutPrefixProvider *dpp) override { set_status() << "sending request"; cn = stack->create_completion_notifier(); - return store->svc()->datalog_rados->trim_entries(dpp, shard, marker, - cn->completion()); + // Call cannot fail, all errors will be reported through the completion + store->svc()->datalog_rados->trim_entries(dpp, shard, marker, + cn->completion()); + return 0; } int request_complete() override { int r = cn->completion()->get_return_value(); diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index 5826ca505ffee..9211a56bd40e3 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -2,7 +2,7 @@ // vim: ts=8 sw=2 smarttab ft=cpp /* - * Copyright (C) 2025 IBM + * Copyright (C) 2025 IBM */ #include @@ -48,6 +48,9 @@ extern "C" { #include "radosgw-admin/orphan.h" #include "radosgw-admin/sync_checkpoint.h" + +#include "rgw/async_utils.h" + #include "rgw_user.h" #include "rgw_otp.h" #include "rgw_rados.h" @@ -123,6 +126,7 @@ static const DoutPrefixProvider* dpp() { } while (0) using namespace std; +using rgw::run_coro; inline int posix_errortrans(int r) { @@ -3529,23 +3533,6 @@ void init_realm_param(CephContext *cct, string& var, std::optional& opt_ } } -int run_coro(asio::awaitable coro, std::string_view name) { - try { - // Blocking in startup code, not ideal, but won't hurt anything. - asio::co_spawn(static_cast(driver)->get_io_context(), - std::move(coro), - async::use_blocked); - } catch (boost::system::system_error& e) { - ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl; - return ceph::from_error_code(e.code()); - } catch (std::exception& e) { - ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl; - return -EIO; - } - return 0; -} - - // This has an uncaught exception. Even if the exception is caught, the program // would need to be terminated, so the warning is simply suppressed. // coverity[root_function:SUPPRESS] @@ -10989,10 +10976,13 @@ next: if (specified_shard_id) { shard = shard_id; } - ret = run_coro(datalog->admin_sem_list(shard, max_entries, marker, + std::string err; + ret = run_coro(dpp(), context_pool, + datalog->admin_sem_list(shard, max_entries, marker, cout, *formatter), - "datalog seamphore list"); + &err); if (ret < 0) { + std::cerr << "datalog semaphore list: " << err << std::endl; return ret; } } @@ -11002,18 +10992,21 @@ next: std::cerr << "Specify the semaphore key with --marker." << std::endl; return -EINVAL; } + std::string errstr; auto datalog = static_cast(driver) ->svc()->datalog_rados; - ret = run_coro(datalog->admin_sem_reset(marker, count.value_or(0)), - "datalog seamphore reset"); + ret = rgw::run_coro(dpp(), context_pool, + datalog->admin_sem_reset(marker, count.value_or(0)), + &errstr); if (ret < 0) { + std::cerr << "datalog semaphore reset: " << errstr << std::endl; return ret; } } if (opt_cmd == OPT::DATALOG_LIST) { formatter->open_array_section("entries"); - bool truncated; + bool truncated = false; int count = 0; if (max_entries < 0) max_entries = 1000; @@ -11045,13 +11038,20 @@ next: do { std::vector entries; if (specified_shard_id) { - ret = datalog_svc->list_entries(dpp(), shard_id, max_entries - count, - entries, marker, - &marker, &truncated, - &errstr, null_yield); + ret = run_coro( + dpp(), + context_pool, + datalog_svc->list_entries(dpp(), shard_id, max_entries - count, + marker), + std::tie(entries, marker, truncated), + &errstr); } else { - ret = datalog_svc->list_entries(dpp(), max_entries - count, entries, - log_marker, &truncated, null_yield); + ret = run_coro( + dpp(), + context_pool, + datalog_svc->list_entries(dpp(), max_entries - count, log_marker), + std::tie(entries, log_marker, truncated), + &errstr); } if (ret < 0) { cerr << "ERROR: datalog_svc->list_entries(): " << errstr << ": " @@ -11082,9 +11082,18 @@ next: for (; i < g_ceph_context->_conf->rgw_data_log_num_shards; i++) { vector entries; + std::string errstr; RGWDataChangesLogInfo info; - static_cast(driver)->svc()-> - datalog_rados->get_info(dpp(), i, &info, nullptr, null_yield); + + int r = run_coro(dpp(), context_pool, + static_cast(driver)->svc()-> + datalog_rados->get_info(dpp(), i), + info, &errstr); + + if (r < 0) { + std::cerr << "datalog status: " << errstr << std::endl; + return -r; + } ::encode_json("info", info, formatter.get()); @@ -11146,11 +11155,14 @@ next: return EINVAL; } + std::string errstr; auto datalog = static_cast(driver)->svc()->datalog_rados; - ret = datalog->trim_entries(dpp(), shard_id, marker, nullptr, null_yield); + ret = run_coro(dpp(), context_pool, + datalog->trim_entries(dpp(), shard_id, marker), + &errstr); if (ret < 0 && ret != -ENODATA) { - cerr << "ERROR: trim_entries(): " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: trim_entries(): " << errstr << std::endl; return -ret; } } @@ -11161,9 +11173,12 @@ next: return -EINVAL; } auto datalog = static_cast(driver)->svc()->datalog_rados; - ret = datalog->change_format(dpp(), *opt_log_type, null_yield); + std::string errstr; + ret = run_coro(dpp(), context_pool, + datalog->change_format(dpp(), *opt_log_type), + &errstr); if (ret < 0) { - cerr << "ERROR: change_format(): " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: change_format(): " << errstr << std::endl; return -ret; } } @@ -11171,10 +11186,13 @@ next: if (opt_cmd == OPT::DATALOG_PRUNE) { auto datalog = static_cast(driver)->svc()->datalog_rados; std::optional through; - ret = datalog->trim_generations(dpp(), through, null_yield); + std::string errstr; + ret = run_coro(dpp(), context_pool, + datalog->trim_generations(dpp(), through), + &errstr); if (ret < 0) { - cerr << "ERROR: trim_generations(): " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: trim_generations(): " << errstr << std::endl; return -ret; } diff --git a/src/test/rgw/test_datalog.cc b/src/test/rgw/test_datalog.cc index 2fea326c81a62..dd00db7ec3828 100644 --- a/src/test/rgw/test_datalog.cc +++ b/src/test/rgw/test_datalog.cc @@ -132,7 +132,7 @@ protected: RGWDataChangesLogMarker marker; do { std::vector entries; - std::tie(entries, marker) = + std::tie(entries, marker, std::ignore) = co_await datalog->list_entries(dpp, 1'000, std::move(marker)); for (const auto& entry : entries) { -- 2.39.5