From c050cc8b15437e62a14e08c43f8a740c2de2a8e5 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Fri, 15 Aug 2025 01:09:08 -0400 Subject: [PATCH] rgw: Update `run_coro` to use `redirect_error` and add tests Should have added tests the first time, but better late than never. Signed-off-by: Adam C. Emerson --- src/rgw/async_utils.h | 113 +++++++-------- src/test/rgw/CMakeLists.txt | 3 + src/test/rgw/test_rgw_async_utils.cc | 207 +++++++++++++++++++++++++++ 3 files changed, 265 insertions(+), 58 deletions(-) create mode 100644 src/test/rgw/test_rgw_async_utils.cc diff --git a/src/rgw/async_utils.h b/src/rgw/async_utils.h index 5bc7de21739..85b43038755 100644 --- a/src/rgw/async_utils.h +++ b/src/rgw/async_utils.h @@ -29,6 +29,7 @@ #include "common/async/blocked_completion.h" #include "common/async/concepts.h" #include "common/async/yield_context.h" +#include "common/async/redirect_error.h" #include "common/dout.h" #include "common/dout_fmt.h" @@ -60,13 +61,10 @@ inline int run_coro( std::string* what ///< Where to store the result of `what()` on exception ) noexcept { - try { - maybe_warn_about_blocking(dpp); - asio::co_spawn(executor, std::move(coro), async::use_blocked); - } catch (const std::exception&) { - return ceph::from_exception(std::current_exception(), what); - } - return 0; + std::exception_ptr e; + maybe_warn_about_blocking(dpp); + asio::co_spawn(executor, std::move(coro), async::use_blocked[e]); + return ceph::from_exception(e, what); } /// Call a coroutine and block until it completes, handling exceptions @@ -107,13 +105,10 @@ int run_coro( std::string* what ///< Where to store the result of `what()`. ) noexcept { - try { - val = asio::co_spawn(executor, std::move(coro), async::use_blocked); - maybe_warn_about_blocking(dpp); - } catch (const std::exception& e) { - return ceph::from_exception(std::current_exception(), what); - } - return 0; + std::exception_ptr e; + maybe_warn_about_blocking(dpp); + val = asio::co_spawn(executor, std::move(coro), async::use_blocked[e]); + return ceph::from_exception(e, what); } /// Call a coroutine and block until it completes, handling exceptions @@ -156,13 +151,10 @@ int run_coro( std::string* what ///< Where to store the result of `what()`. ) noexcept { - try { - maybe_warn_about_blocking(dpp); - vals = asio::co_spawn(executor, std::move(coro), async::use_blocked); - } catch (const std::exception& e) { - return ceph::from_exception(std::current_exception(), what); - } - return 0; + std::exception_ptr e; + maybe_warn_about_blocking(dpp); + vals = asio::co_spawn(executor, std::move(coro), async::use_blocked[e]); + return ceph::from_exception(e, what); } /// Call a coroutine and block until it completes, handling exceptions @@ -203,19 +195,21 @@ int run_coro( int log_level = 5 /// What level to log at ) noexcept { - try { - if (y) { - auto& yield = y.get_yield_context(); - asio::co_spawn(yield.get_executor(), std::move(coro), yield); - } else { - maybe_warn_about_blocking(dpp); - asio::co_spawn(executor, std::move(coro), async::use_blocked); - } - } catch (const std::exception& e) { - ldpp_dout_fmt(dpp, log_level, "{}: failed: {}", name, e.what()); - return ceph::from_exception(std::current_exception()); + std::exception_ptr e; + if (y) { + auto& yield = y.get_yield_context(); + asio::co_spawn(yield.get_executor(), std::move(coro), + async::redirect_error(yield, e)); + } else { + maybe_warn_about_blocking(dpp); + asio::co_spawn(executor, std::move(coro), async::use_blocked[e]); } - return 0; + std::string what; + auto r = ceph::from_exception(e, &what); + if (e) [[unlikely]] { + ldpp_dout_fmt(dpp, log_level, "{}: failed: {}", name, what); + } + return r; } /// Call a coroutine and block until it completes, handling exceptions @@ -260,20 +254,21 @@ int run_coro( int log_level = 5 /// What level to log at ) noexcept { - try { - if (y) { - auto& yield = y.get_yield_context(); - val = asio::co_spawn(yield.get_executor(), std::move(coro), yield); - } else { - maybe_warn_about_blocking(dpp); - val = asio::co_spawn(executor, std::move(coro), async::use_blocked); - } - } catch (const std::exception& e) { - ldpp_dout_fmt(dpp, log_level, "{}: failed: {}", name, e.what()); - return ceph::from_exception(std::current_exception()); + std::exception_ptr e; + if (y) { + auto& yield = y.get_yield_context(); + val = asio::co_spawn(yield.get_executor(), std::move(coro), + async::redirect_error(yield, e)); + } else { + maybe_warn_about_blocking(dpp); + val = asio::co_spawn(executor, std::move(coro), async::use_blocked[e]); } - - return 0; + std::string what; + auto r = ceph::from_exception(e, &what); + if (e) [[unlikely]] { + ldpp_dout_fmt(dpp, log_level, "{}: failed: {}", name, what); + } + return r; } /// Call a coroutine and block until it completes, handling exceptions @@ -317,19 +312,21 @@ int run_coro( int log_level = 5 /// What level to log at ) noexcept { - try { - if (y) { - auto& yield = y.get_yield_context(); - vals = asio::co_spawn(yield.get_executor(), std::move(coro), yield); - } else { - maybe_warn_about_blocking(dpp); - vals = asio::co_spawn(executor, std::move(coro), async::use_blocked); - } - } catch (const std::exception& e) { - ldpp_dout_fmt(dpp, log_level, "{}: failed: {}", name, e.what()); - return ceph::from_exception(std::current_exception()); + std::exception_ptr e; + if (y) { + auto& yield = y.get_yield_context(); + vals = asio::co_spawn(yield.get_executor(), std::move(coro), + async::redirect_error(yield, e)); + } else { + maybe_warn_about_blocking(dpp); + vals = asio::co_spawn(executor, std::move(coro), async::use_blocked[e]); + } + std::string what; + auto r = ceph::from_exception(e, &what); + if (e) [[unlikely]] { + ldpp_dout_fmt(dpp, log_level, "{}: failed: {}", name, what); } - return 0; + return r; } /// Call a coroutine and block until it completes, handling exceptions diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 5899e8dca13..eabbc73fc0a 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -416,3 +416,6 @@ endfunction() add_catch2_test_rgw(rgw_hex) +add_executable(unittest_rgw_async_utils test_rgw_async_utils.cc) +add_ceph_unittest(unittest_rgw_async_utils) +target_link_libraries(unittest_rgw_async_utils ${rgw_libs} ${UNITTEST_LIBS}) diff --git a/src/test/rgw/test_rgw_async_utils.cc b/src/test/rgw/test_rgw_async_utils.cc new file mode 100644 index 00000000000..1ddba911280 --- /dev/null +++ b/src/test/rgw/test_rgw_async_utils.cc @@ -0,0 +1,207 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw/async_utils.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include + +#include "common/async/context_pool.h" +#include "common/async/yield_context.h" + +#include +#include + +namespace asio = boost::asio; +namespace async = ceph::async; +namespace sys = boost::system; + +static auto cct = std::make_unique(CEPH_ENTITY_TYPE_ANY); + +static NoDoutPrefix dp(cct.get(), ceph_subsys_rgw); + +TEST(CoSpawn, CoSpawn) { + async::io_context_pool pool{3}; + + auto maybethrow = [](int code) -> asio::awaitable { + if (code != 0) { + throw sys::system_error{code, sys::generic_category()}; + } + co_return; + }; + + int r = 0; + r = rgw::run_coro(&dp, pool, maybethrow(ENOENT), nullptr); + ASSERT_EQ(-ENOENT, r); + + r = rgw::run_coro(&dp, pool, maybethrow(0), nullptr); + ASSERT_EQ(0, r); + + asio::spawn(pool, + [&](asio::yield_context y) -> void { + r = rgw::run_coro(&dp, pool, maybethrow(ENOENT), "yielding", + y); + }, + async::use_blocked); + ASSERT_EQ(-ENOENT, r); + + asio::spawn(pool, + [&](asio::yield_context y) -> void { + r = rgw::run_coro(&dp, pool, maybethrow(0), "yielding", + y); + }, + async::use_blocked); + ASSERT_EQ(0, r); + + r = rgw::run_coro(&dp, pool, maybethrow(ENOENT), "blocking", + null_yield); + ASSERT_EQ(-ENOENT, r); + + r = rgw::run_coro(&dp, pool, maybethrow(0), "blocking", + null_yield); + ASSERT_EQ(0, r); + + auto maybethrowv = [](int code, V v) -> asio::awaitable { + if (code != 0) { + throw sys::system_error{code, sys::generic_category()}; + } + co_return std::move(v); + }; + + const std::string instr("foo"); + std::string s; + + r = rgw::run_coro(&dp, pool, maybethrowv(ENOENT, instr), + s, nullptr); + ASSERT_EQ(-ENOENT, r); + ASSERT_TRUE(s.empty()); + + r = rgw::run_coro(&dp, pool, maybethrowv(0, instr), s, nullptr); + ASSERT_EQ(0, r); + ASSERT_EQ(instr, s); + + s.clear(); + + asio::spawn(pool, + [&](asio::yield_context y) -> void { + r = rgw::run_coro(&dp, pool, maybethrowv(ENOENT, instr), + s, "yielding", y); + }, + async::use_blocked); + ASSERT_EQ(-ENOENT, r); + ASSERT_TRUE(s.empty()); + + asio::spawn(pool, + [&](asio::yield_context y) -> void { + r = rgw::run_coro(&dp, pool, maybethrowv(0, instr), + s, "yielding", y); + }, + async::use_blocked); + ASSERT_EQ(0, r); + ASSERT_EQ(instr, s); + + s.clear(); + r = rgw::run_coro(&dp, pool, maybethrowv(ENOENT, instr), s, + "blocking", null_yield); + ASSERT_EQ(-ENOENT, r); + ASSERT_TRUE(s.empty()); + + r = rgw::run_coro(&dp, pool, maybethrowv(0, instr), s, + "blocking", null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(instr, s); + + auto maybethrowvs = [](int code, Vs ...vs) + -> asio::awaitable> { + if (code != 0) { + throw sys::system_error{code, sys::generic_category()}; + } + co_return std::make_tuple(std::move(vs)...); + }; + + s.clear(); + std::unique_ptr p; + + r = rgw::run_coro(&dp, pool, maybethrowvs(ENOENT, instr, + std::make_unique(5)), + std::tie(s, p), nullptr); + ASSERT_EQ(-ENOENT, r); + ASSERT_TRUE(s.empty()); + ASSERT_FALSE(p); + + r = rgw::run_coro(&dp, pool, maybethrowvs(0, instr, + std::make_unique(5)), + std::tie(s, p), nullptr); + ASSERT_EQ(0, r); + ASSERT_EQ(instr, s); + ASSERT_TRUE(p); + ASSERT_EQ(5, *p); + + s.clear(); + p.reset(); + + asio::spawn(pool, + [&](asio::yield_context y) -> void { + r = rgw::run_coro(&dp, pool, + maybethrowvs(ENOENT, instr, + std::make_unique(5)), + std::tie(s, p), "yielding", y); + }, + async::use_blocked); + ASSERT_EQ(-ENOENT, r); + ASSERT_TRUE(s.empty()); + ASSERT_FALSE(p); + + asio::spawn(pool, + [&](asio::yield_context y) -> void { + r = rgw::run_coro(&dp, pool, + maybethrowvs(0, instr, + std::make_unique(5)), + std::tie(s, p), "yielding", y); + }, + async::use_blocked); + ASSERT_EQ(0, r); + ASSERT_EQ(instr, s); + ASSERT_TRUE(p); + ASSERT_EQ(5, *p); + + s.clear(); + p.reset(); + r = rgw::run_coro(&dp, pool, maybethrowvs(ENOENT, instr, + std::make_unique(5)), + std::tie(s, p), "blocking", null_yield); + ASSERT_EQ(-ENOENT, r); + ASSERT_TRUE(s.empty()); + ASSERT_FALSE(p); + + r = rgw::run_coro(&dp, pool, maybethrowvs(0, instr, + std::make_unique(5)), + std::tie(s, p), "blocking", null_yield); + ASSERT_EQ(0, r); + ASSERT_EQ(instr, s); + ASSERT_TRUE(p); + ASSERT_EQ(5, *p); +} -- 2.39.5