From e81d4eae4e76f9c279cd8f146dd6cc132a5ed51a Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Wed, 6 Aug 2025 16:02:32 -0400 Subject: [PATCH] common/async: Update `use_blocked` for newer asio Reimplement with `initiate` rather than the old style. This necessitates getting rid of the old `async::Completion` in anything that was calling it, and other changes. Also, use disposition for error handling. Signed-off-by: Adam C. Emerson --- src/common/async/blocked_completion.h | 368 ++++++++++++++------- src/common/async/forward_handler.h | 2 + src/common/error_code.h | 2 +- src/mon/MonClient.cc | 41 ++- src/mon/MonClient.h | 219 ++++++------ src/neorados/RADOS.cc | 43 +-- src/osdc/Objecter.cc | 37 ++- src/osdc/Objecter.h | 29 +- src/rgw/driver/rados/rgw_datalog.cc | 197 +++++------ src/rgw/radosgw-admin/radosgw-admin.cc | 10 +- src/test/common/test_blocked_completion.cc | 182 +++++----- 11 files changed, 635 insertions(+), 495 deletions(-) diff --git a/src/common/async/blocked_completion.h b/src/common/async/blocked_completion.h index 23a1319bc0fa3..bdf75d906e5c9 100644 --- a/src/common/async/blocked_completion.h +++ b/src/common/async/blocked_completion.h @@ -16,60 +16,80 @@ #ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H #define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H -#include #include #include #include -#include #include +#include #include -#include + +#include namespace ceph::async { namespace bs = boost::system; class use_blocked_t { - use_blocked_t(bs::error_code* ec) : ec(ec) {} public: use_blocked_t() = default; - use_blocked_t operator [](bs::error_code& _ec) const { - return use_blocked_t(&_ec); + auto operator [](bs::error_code& ec) const { + return boost::asio::redirect_error(use_blocked_t{}, ec); } - - bs::error_code* ec = nullptr; }; inline constexpr use_blocked_t use_blocked; namespace detail { - -template +// Obnoxiously repetitive, but it cuts down on the amount of +// copying/moving/splicing/concatenating of tuples I need to do. +template struct blocked_handler { - blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + blocked_handler(std::optional>* vals, std::mutex* m, + std::condition_variable* cv, bool* done) + : vals(vals), m(m), cv(cv), done(done) { + } - void operator ()(Ts... values) noexcept { + template + void operator ()(Args&& ...args) noexcept { + static_assert(sizeof...(Ts) == sizeof...(Args)); std::scoped_lock l(*m); - *ec = bs::error_code{}; - *value = std::forward_as_tuple(std::move(values)...); + *vals = std::tuple(std::forward(args)...); *done = true; cv->notify_one(); } - void operator ()(bs::error_code ec, Ts... values) noexcept { + //private: + std::optional>* vals; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template +struct blocked_handler +{ + blocked_handler(D* dispo, std::optional>* vals, + std::mutex* m, std::condition_variable* cv, bool* done) + : dispo(dispo), vals(vals), m(m), cv(cv), done(done) { + } + + template + void operator ()(Arg0&& arg0, Args&& ...args) noexcept { + static_assert(sizeof...(Ts) == sizeof...(Args)); std::scoped_lock l(*m); - *this->ec = ec; - *value = std::forward_as_tuple(std::move(values)...); + *dispo = std::move(arg0); + *vals = std::tuple(std::forward(args)...); *done = true; cv->notify_one(); } - bs::error_code* ec; - std::optional>* value = nullptr; + //private: + D* dispo; + std::optional>* vals; std::mutex* m = nullptr; std::condition_variable* cv = nullptr; bool* done = nullptr; @@ -78,27 +98,44 @@ struct blocked_handler template struct blocked_handler { - blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + blocked_handler(std::optional* val, std::mutex* m, + std::condition_variable* cv, bool* done) + : val(val), m(m), cv(cv), done(done) {} - void operator ()(T value) noexcept { + template + void operator ()(Arg&& arg) noexcept { std::scoped_lock l(*m); - *ec = bs::error_code(); - *this->value = std::move(value); + *val = std::forward(arg); *done = true; cv->notify_one(); } - void operator ()(bs::error_code ec, T value) noexcept { + //private: + std::optional* val; + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template +struct blocked_handler +{ + blocked_handler(D* dispo, std::optional* val, std::mutex* m, + std::condition_variable* cv, bool* done) + : dispo(dispo), val(val), m(m), cv(cv), done(done) {} + + template + void operator ()(Arg0&& arg0, Arg&& arg) noexcept { std::scoped_lock l(*m); - *this->ec = ec; - *this->value = std::move(value); + *dispo = std::move(arg0); + *val = std::move(arg); *done = true; cv->notify_one(); } //private: - bs::error_code* ec; - std::optional* value; + D* dispo; + std::optional* val; std::mutex* m = nullptr; std::condition_variable* cv = nullptr; bool* done = nullptr; @@ -107,23 +144,37 @@ struct blocked_handler template<> struct blocked_handler { - blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {} + blocked_handler(std::mutex* m, std::condition_variable* cv, bool* done) + : m(m), cv(cv), done(done) {} void operator ()() noexcept { std::scoped_lock l(*m); - *ec = bs::error_code{}; *done = true; cv->notify_one(); } - void operator ()(bs::error_code ec) noexcept { + std::mutex* m = nullptr; + std::condition_variable* cv = nullptr; + bool* done = nullptr; +}; + +template +struct blocked_handler +{ + blocked_handler(D* dispo, std::mutex* m, + std::condition_variable* cv, bool* done) + : dispo(dispo), m(m), cv(cv), done(done) {} + + template + void operator ()(Arg0&& arg0) noexcept { std::scoped_lock l(*m); - *this->ec = ec; + *dispo = std::move(arg0); *done = true; cv->notify_one(); } - bs::error_code* ec; + //private: + D* dispo; std::mutex* m = nullptr; std::condition_variable* cv = nullptr; bool* done = nullptr; @@ -136,35 +187,65 @@ public: using completion_handler_type = blocked_handler; using return_type = std::tuple; - explicit blocked_result(completion_handler_type& h) noexcept { - std::scoped_lock l(m); - out_ec = h.ec; - if (!out_ec) h.ec = &ec; - h.value = &value; - h.m = &m; - h.cv = &cv; - h.done = &done; - } + template + static return_type initiate(Initiation&& init, + use_blocked_t, + Args&& ...args) { + using ttype = std::tuple; + std::optional vals; + std::mutex m; + std::condition_variable cv; + bool done = false; + static_assert(std::tuple_size_v > 1); + + std::move(init)(completion_handler_type(&vals, &m, &cv, &done), + std::forward(args)...); - return_type get() { std::unique_lock l(m); - cv.wait(l, [this]() { return done; }); - if (!out_ec && ec) throw bs::system_error(ec); - return std::move(*value); + cv.wait(l, [&done]() { return done; }); + return std::move(*vals); } blocked_result(const blocked_result&) = delete; blocked_result& operator =(const blocked_result&) = delete; blocked_result(blocked_result&&) = delete; blocked_result& operator =(blocked_result&&) = delete; +}; -private: - bs::error_code* out_ec; - bs::error_code ec; - std::optional value; - std::mutex m; - std::condition_variable cv; - bool done = false; +template +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = std::tuple; + + template + static return_type initiate(Initiation&& init, + use_blocked_t, + Args&& ...args) { + using ttype = std::tuple; + std::optional vals; + D dispo; + std::mutex m; + std::condition_variable cv; + bool done = false; + static_assert(std::tuple_size_v > 1); + + std::move(init)(completion_handler_type(&dispo, &vals, &m, &cv, &done), + std::forward(args)...); + + std::unique_lock l(m); + cv.wait(l, [&done]() { return done; }); + if (dispo != boost::asio::no_error) { + boost::asio::disposition_traits::throw_exception(dispo); + } + return std::move(*vals); + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; }; template @@ -174,35 +255,61 @@ public: using completion_handler_type = blocked_handler; using return_type = T; - explicit blocked_result(completion_handler_type& h) noexcept { - std::scoped_lock l(m); - out_ec = h.ec; - if (!out_ec) h.ec = &ec; - h.value = &value; - h.m = &m; - h.cv = &cv; - h.done = &done; - } + template + static return_type initiate(Initiation&& init, + use_blocked_t, + Args&& ...args) { + std::optional val; + std::mutex m; + std::condition_variable cv; + bool done = false; + + std::move(init)(completion_handler_type(&val, &m, &cv, &done), + std::forward(args)...); - return_type get() { std::unique_lock l(m); - cv.wait(l, [this]() { return done; }); - if (!out_ec && ec) throw bs::system_error(ec); - return std::move(*value); + cv.wait(l, [&done]() { return done; }); + return std::move(*val); } blocked_result(const blocked_result&) = delete; blocked_result& operator =(const blocked_result&) = delete; blocked_result(blocked_result&&) = delete; blocked_result& operator =(blocked_result&&) = delete; +}; + +template +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = T; + + template + static return_type initiate(Initiation&& init, + use_blocked_t, + Args&& ...args) { + D dispo; + std::optional val; + std::mutex m; + std::condition_variable cv; + bool done = false; + + std::move(init)(completion_handler_type(&dispo, &val, &m, &cv, &done), + std::forward(args)...); + + std::unique_lock l(m); + cv.wait(l, [&done]() { return done; }); + if (dispo != boost::asio::no_error) { + boost::asio::disposition_traits::throw_exception(dispo); + } + return std::move(*val); + } -private: - bs::error_code* out_ec; - bs::error_code ec; - std::optional value; - std::mutex m; - std::condition_variable cv; - bool done = false; + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; }; template<> @@ -212,78 +319,101 @@ public: using completion_handler_type = blocked_handler; using return_type = void; - explicit blocked_result(completion_handler_type& h) noexcept { - std::scoped_lock l(m); - out_ec = h.ec; - if (!out_ec) h.ec = &ec; - h.m = &m; - h.cv = &cv; - h.done = &done; - } + template + static return_type initiate(Initiation&& init, + use_blocked_t, + Args&& ...args) { + std::mutex m; + std::condition_variable cv; + bool done = false; + + std::move(init)(completion_handler_type(&m, &cv, &done), + std::forward(args)...); - void get() { std::unique_lock l(m); - cv.wait(l, [this]() { return done; }); - if (!out_ec && ec) throw bs::system_error(ec); + cv.wait(l, [&done]() { return done; }); + return; } blocked_result(const blocked_result&) = delete; blocked_result& operator =(const blocked_result&) = delete; blocked_result(blocked_result&&) = delete; blocked_result& operator =(blocked_result&&) = delete; +}; + -private: - bs::error_code* out_ec; - bs::error_code ec; - std::mutex m; - std::condition_variable cv; - bool done = false; +template +class blocked_result +{ +public: + using completion_handler_type = blocked_handler; + using return_type = void; + + template + static return_type initiate(Initiation&& init, + use_blocked_t, + Args&& ...args) { + D dispo; + std::mutex m; + std::condition_variable cv; + bool done = false; + + std::move(init)(completion_handler_type(&dispo, &m, &cv, &done), + std::forward(args)...); + + std::unique_lock l(m); + cv.wait(l, [&done]() { return done; }); + if (dispo != boost::asio::no_error) { + boost::asio::disposition_traits::throw_exception(dispo); + } + return; + } + + blocked_result(const blocked_result&) = delete; + blocked_result& operator =(const blocked_result&) = delete; + blocked_result(blocked_result&&) = delete; + blocked_result& operator =(blocked_result&&) = delete; }; + } // namespace detail } // namespace ceph::async namespace boost::asio { -template -class async_result +template +class async_result : public ceph::async::detail::blocked_result { -public: - explicit async_result(typename ceph::async::detail::blocked_result - ::completion_handler_type& h) - : ceph::async::detail::blocked_result(h) {} }; -template -class async_result - : public ceph::async::detail::blocked_result...> +template +class async_result + : public ceph::async::detail::blocked_result { -public: - explicit async_result( - typename ceph::async::detail::blocked_result...>::completion_handler_type& h) - : ceph::async::detail::blocked_result...>(h) {} }; -template -class async_result - : public ceph::async::detail::blocked_result +template +class async_result + : public ceph::async::detail::blocked_result { -public: - explicit async_result( - typename ceph::async::detail::blocked_result::completion_handler_type& h) - : ceph::async::detail::blocked_result(h) {} }; -template -class async_result - : public ceph::async::detail::blocked_result...> +template +class async_result + : public ceph::async::detail::blocked_result +{ +}; + +template +class async_result + : public ceph::async::detail::blocked_result +{ +}; + +template +class async_result + : public ceph::async::detail::blocked_result { -public: - explicit async_result( - typename ceph::async::detail::blocked_result...>::completion_handler_type& h) - : ceph::async::detail::blocked_result...>(h) {} }; } diff --git a/src/common/async/forward_handler.h b/src/common/async/forward_handler.h index e204ca9862c35..04fc68aa5c4f7 100644 --- a/src/common/async/forward_handler.h +++ b/src/common/async/forward_handler.h @@ -15,6 +15,8 @@ #ifndef CEPH_ASYNC_FORWARD_HANDLER_H #define CEPH_ASYNC_FORWARD_HANDLER_H +#include + #include namespace ceph::async { diff --git a/src/common/error_code.h b/src/common/error_code.h index f984a88ad087b..f37d7ce617df4 100644 --- a/src/common/error_code.h +++ b/src/common/error_code.h @@ -94,7 +94,7 @@ inline boost::system::error_condition make_error_condition(errc e) noexcept { #pragma GCC diagnostic pop #pragma clang diagnostic pop -inline int from_exception(std::exception_ptr eptr) { +[[nodiscard]] inline int from_exception(std::exception_ptr eptr) { if (!eptr) [[likely]] { return 0; } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 5770e6a2b0cf0..132be34c63332 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -62,6 +62,7 @@ #undef dout_prefix #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": " +namespace asio = boost::asio; namespace bs = boost::system; using std::string; using namespace std::literals; @@ -534,8 +535,9 @@ void MonClient::shutdown() monc_lock.lock(); stopping = true; while (!version_requests.empty()) { - ceph::async::post(std::move(version_requests.begin()->second), - monc_errc::shutting_down, 0, 0); + asio::dispatch( + asio::append(std::move(version_requests.begin()->second), + make_error_code(monc_errc::shutting_down), 0, 0)); ldout(cct, 20) << __func__ << " canceling and discarding version request " << version_requests.begin()->first << dendl; version_requests.erase(version_requests.begin()); @@ -710,7 +712,7 @@ void MonClient::_finish_auth(int auth_err) ceph_assert(auth); _check_auth_tickets(); } else if (auth_err == -EAGAIN && !active_con) { - ldout(cct,10) << __func__ + ldout(cct,10) << __func__ << " auth returned EAGAIN, reopening the session to try again" << dendl; _reopen_session(); @@ -767,8 +769,9 @@ void MonClient::_reopen_session(int rank) // throw out version check requests while (!version_requests.empty()) { - ceph::async::post(std::move(version_requests.begin()->second), - monc_errc::session_reset, 0, 0); + asio::dispatch(asio::append(std::move(version_requests.begin()->second), + make_error_code(monc_errc::session_reset), + 0, 0)); version_requests.erase(version_requests.begin()); } @@ -1168,7 +1171,8 @@ void MonClient::_send_command(MonCommand *r) if (r->is_tell()) { ++r->send_attempts; if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) { - _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {}); + _finish_command(r, make_error_code(monc_errc::mon_unavailable), + "mon unavailable", {}); return; } // tell-style command @@ -1180,7 +1184,8 @@ void MonClient::_send_command(MonCommand *r) if (r->target_rank >= (int)monmap.size()) { ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl; - _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {}); + _finish_command(r, make_error_code(monc_errc::rank_dne), + "mon rank dne"sv, {}); return; } r->target_con = messenger->connect_to_mon( @@ -1189,7 +1194,8 @@ void MonClient::_send_command(MonCommand *r) if (!monmap.contains(r->target_name)) { ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl; - _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {}); + _finish_command(r, make_error_code(monc_errc::mon_dne), + "mon dne"sv, {}); return; } r->target_con = messenger->connect_to_mon( @@ -1224,7 +1230,8 @@ void MonClient::_send_command(MonCommand *r) if (r->target_rank >= (int)monmap.size()) { ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl; - _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {}); + _finish_command(r, make_error_code(monc_errc::rank_dne), + "mon rank dne"sv, {}); return; } _reopen_session(r->target_rank); @@ -1239,7 +1246,8 @@ void MonClient::_send_command(MonCommand *r) if (!monmap.contains(r->target_name)) { ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl; - _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {}); + _finish_command(r, make_error_code(monc_errc::mon_dne), + "mon dne"sv, {}); return; } _reopen_session(monmap.get_rank(r->target_name)); @@ -1377,7 +1385,8 @@ int MonClient::_cancel_mon_command(uint64_t tid) ldout(cct, 10) << __func__ << " tid " << tid << dendl; MonCommand *cmd = it->second; - _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {}); + _finish_command(cmd, make_error_code(monc_errc::timed_out), + "timed out"sv, {}); return 0; } @@ -1386,8 +1395,9 @@ void MonClient::_finish_command(MonCommand *r, bs::error_code ret, { ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl; - ceph::async::post(std::move(r->onfinish), ret, std::string(rs), - std::move(bl)); + asio::post(service.get_executor(), + asio::append(std::move(r->onfinish), ret, std::string(rs), + std::move(bl))); if (r->target_con) { r->target_con->mark_down(); } @@ -1409,8 +1419,9 @@ void MonClient::handle_get_version_reply(MMonGetVersionReply* m) ldout(cct, 10) << __func__ << " finishing " << iter->first << " version " << m->version << dendl; version_requests.erase(iter); - ceph::async::post(std::move(req), bs::error_code(), - m->version, m->oldest_version); + asio::post(service.get_executor(), + asio::append(std::move(req), bs::error_code(), + m->version, m->oldest_version)); } m->put(); } diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index e725414f8bfc9..2beee15edae9e 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -22,9 +22,14 @@ #include #include +#include +#include #include +#include #include #include +#include +#include #include "msg/Messenger.h" @@ -32,7 +37,6 @@ #include "MonSub.h" #include "common/admin_socket.h" -#include "common/async/completion.h" #include "common/strtol.h" // for strict_strtoll() #include "common/Timer.h" #include "common/config.h" @@ -281,11 +285,11 @@ class MonClient : public Dispatcher, public: // Error, Newest, Oldest using VersionSig = void(boost::system::error_code, version_t, version_t); - using VersionCompletion = ceph::async::Completion; + using VersionCompletion = boost::asio::any_completion_handler; using CommandSig = void(boost::system::error_code, std::string, ceph::buffer::list); - using CommandCompletion = ceph::async::Completion; + using CommandCompletion = boost::asio::any_completion_handler; MonMap monmap; std::map config_mgr; @@ -569,10 +573,10 @@ private: uint64_t tid; std::vector cmd; ceph::buffer::list inbl; - std::unique_ptr onfinish; + CommandCompletion onfinish; std::optional cancel_timer; - MonCommand(MonClient& monc, uint64_t t, std::unique_ptr onfinish) + MonCommand(MonClient& monc, uint64_t t, CommandCompletion onfinish) : tid(t), onfinish(std::move(onfinish)) { auto timeout = monc.cct->_conf.get_val("rados_mon_op_timeout"); @@ -607,86 +611,109 @@ private: public: template - auto start_mon_command(const std::vector& cmd, - const ceph::buffer::list& inbl, + auto start_mon_command(std::vector cmd, + ceph::buffer::list inbl, CompletionToken&& token) { + namespace asio = boost::asio; ldout(cct,10) << __func__ << " cmd=" << cmd << dendl; - boost::asio::async_completion init(token); - { - std::scoped_lock l(monc_lock); - auto h = CommandCompletion::create(service.get_executor(), - std::move(init.completion_handler)); - if (!initialized || stopping) { - ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{}, - bufferlist{}); - } else { - auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h)); - r->cmd = cmd; - r->inbl = inbl; - mon_commands.emplace(r->tid, r); - _send_command(r); - } - } - return init.result.get(); + auto consigned = asio::consign( + std::forward(token), asio::make_work_guard( + asio::get_associated_executor(token, service.get_executor()))); + return asio::async_initiate( + [this, cmd = std::move(cmd), + inbl = std::move(inbl)](auto handler) mutable { + std::scoped_lock l(monc_lock); + if (!initialized || stopping) { + asio::dispatch( + asio::get_associated_immediate_executor(handler, + service.get_executor()), + asio::append(std::move(handler), + make_error_code(monc_errc::shutting_down), + std::string{}, bufferlist{})); + } else { + auto r = new MonCommand(*this, ++last_mon_command_tid, + std::move(handler)); + r->cmd = std::move(cmd); + r->inbl = std::move(inbl); + mon_commands.emplace(r->tid, r); + _send_command(r); + } + }, consigned); } template - auto start_mon_command(int mon_rank, const std::vector& cmd, - const ceph::buffer::list& inbl, CompletionToken&& token) { + auto start_mon_command(int mon_rank, std::vector cmd, + ceph::buffer::list inbl, + CompletionToken&& token) { + namespace asio = boost::asio; + namespace sys = boost::system; ldout(cct,10) << __func__ << " cmd=" << cmd << dendl; - boost::asio::async_completion init(token); - { - std::scoped_lock l(monc_lock); - auto h = CommandCompletion::create(service.get_executor(), - std::move(init.completion_handler)); - if (!initialized || stopping) { - ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{}, - bufferlist{}); - } else { - auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h)); - r->target_rank = mon_rank; - r->cmd = cmd; - r->inbl = inbl; - mon_commands.emplace(r->tid, r); - _send_command(r); - } - } - return init.result.get(); + auto consigned = asio::consign( + std::forward(token), asio::make_work_guard( + asio::get_associated_executor(token, service.get_executor()))); + return asio::async_initiate( + [this, mon_rank, cmd = std::move(cmd), + inbl = std::move(inbl)](auto handler) mutable { + std::scoped_lock l(monc_lock); + if (!initialized || stopping) { + asio::dispatch( + asio::get_associated_immediate_executor(handler, + service.get_executor()), + asio::append(std::move(handler), + make_error_code(monc_errc::shutting_down), + std::string{}, bufferlist{})); + } else { + auto r = new MonCommand(*this, ++last_mon_command_tid, + std::move(handler)); + r->target_rank = mon_rank; + r->cmd = std::move(cmd); + r->inbl = std::move(inbl); + mon_commands.emplace(r->tid, r); + _send_command(r); + } + }, consigned); } template - auto start_mon_command(const std::string& mon_name, - const std::vector& cmd, - const ceph::buffer::list& inbl, + auto start_mon_command(std::string mon_name, + std::vector cmd, + ceph::buffer::list inbl, CompletionToken&& token) { + namespace asio = boost::asio; ldout(cct,10) << __func__ << " cmd=" << cmd << dendl; - boost::asio::async_completion init(token); - { - std::scoped_lock l(monc_lock); - auto h = CommandCompletion::create(service.get_executor(), - std::move(init.completion_handler)); - if (!initialized || stopping) { - ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{}, - bufferlist{}); - } else { - auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h)); - // detect/tolerate mon *rank* passed as a string - std::string err; - int rank = strict_strtoll(mon_name.c_str(), 10, &err); - if (err.size() == 0 && rank >= 0) { - ldout(cct,10) << __func__ << " interpreting name '" << mon_name - << "' as rank " << rank << dendl; - r->target_rank = rank; + auto consigned = asio::consign( + std::forward(token), asio::make_work_guard( + asio::get_associated_executor(token, service.get_executor()))); + return asio::async_initiate( + [this, mon_name = std::move(mon_name), cmd = std::move(cmd), + inbl = std::move(inbl)](auto handler) mutable { + std::scoped_lock l(monc_lock); + if (!initialized || stopping) { + asio::dispatch( + asio::get_associated_immediate_executor(handler, + service.get_executor()), + asio::append(std::move(handler), + make_error_code(monc_errc::shutting_down), + std::string{}, bufferlist{})); } else { - r->target_name = mon_name; + auto r = new MonCommand(*this, ++last_mon_command_tid, + std::move(handler)); + // detect/tolerate mon *rank* passed as a string + std::string err; + int rank = strict_strtoll(mon_name.c_str(), 10, &err); + if (err.size() == 0 && rank >= 0) { + ldout(cct,10) << __func__ << " interpreting name '" << mon_name + << "' as rank " << rank << dendl; + r->target_rank = rank; + } else { + r->target_name = std::move(mon_name); + } + r->cmd = std::move(cmd); + r->inbl = std::move(inbl); + mon_commands.emplace(r->tid, r); + _send_command(r); } - r->cmd = cmd; - r->inbl = inbl; - mon_commands.emplace(r->tid, r); - _send_command(r); - } - } - return init.result.get(); + }, consigned); } class ContextVerter { @@ -715,22 +742,24 @@ public: } }; - void start_mon_command(const std::vector& cmd, const bufferlist& inbl, + void start_mon_command(std::vector cmd, bufferlist inbl, bufferlist *outbl, std::string *outs, Context *onfinish) { - start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish)); + start_mon_command(std::move(cmd), std::move(inbl), + ContextVerter(outs, outbl, onfinish)); } - void start_mon_command(int mon_rank, - const std::vector& cmd, const bufferlist& inbl, - bufferlist *outbl, std::string *outs, + void start_mon_command(int mon_rank, std::vector cmd, + bufferlist inbl, bufferlist *outbl, std::string *outs, Context *onfinish) { - start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish)); + start_mon_command(mon_rank, std::move(cmd), std::move(inbl), + ContextVerter(outs, outbl, onfinish)); } - void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix - const std::vector& cmd, const bufferlist& inbl, + void start_mon_command(std::string mon_name, ///< mon name, with mon. prefix + std::vector cmd, bufferlist inbl, bufferlist *outbl, std::string *outs, Context *onfinish) { - start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish)); + start_mon_command(std::move(mon_name), std::move(cmd), std::move(inbl), + ContextVerter(outs, outbl, onfinish)); } @@ -747,19 +776,19 @@ public: */ template auto get_version(std::string&& map, CompletionToken&& token) { - boost::asio::async_completion init(token); - { - std::scoped_lock l(monc_lock); - auto m = ceph::make_message(); - m->what = std::move(map); - m->handle = ++version_req_id; - version_requests.emplace(m->handle, - VersionCompletion::create( - service.get_executor(), - std::move(init.completion_handler))); - _send_mon_message(m); - } - return init.result.get(); + namespace asio = boost::asio; + auto consigned = asio::consign( + std::forward(token), asio::make_work_guard( + asio::get_associated_executor(token, service.get_executor()))); + return asio::async_initiate( + [this, map = std::move(map)](auto handler) mutable { + std::scoped_lock l(monc_lock); + auto m = ceph::make_message(); + m->what = std::move(map); + m->handle = ++version_req_id; + version_requests.emplace(m->handle, std::move(handler)); + _send_mon_message(m); + }, consigned); } /** @@ -781,7 +810,7 @@ public: private: - std::map> version_requests; + std::map version_requests; ceph_tid_t version_req_id; void handle_get_version_reply(MMonGetVersionReply* m); md_config_t::config_callback config_cb; diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc index 41f389aeb181f..4dcba611907fb 100644 --- a/src/neorados/RADOS.cc +++ b/src/neorados/RADOS.cc @@ -1016,7 +1016,8 @@ void RADOS::lookup_pool_(std::string name, LookupPoolComp c) return osdmap.lookup_pg_pool_name(name); }); if (ret < 0) - asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne, + asio::dispatch(asio::append(std::move(c), + make_error_code(osdc_errc::pool_dne), std::int64_t(0))); else asio::dispatch(asio::append(std::move(c), bs::error_code{}, ret)); @@ -1120,7 +1121,7 @@ bool RADOS::get_self_managed_snaps_mode(std::int64_t pool) const { return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) { const auto pgpool = osdmap.get_pg_pool(pool); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } return pgpool->is_unmanaged_snaps_mode(); }); @@ -1130,11 +1131,11 @@ bool RADOS::get_self_managed_snaps_mode(std::string_view pool) const { return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) { int64_t poolid = osdmap.lookup_pg_pool_name(pool); if (poolid < 0) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } const auto pgpool = osdmap.get_pg_pool(poolid); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } return pgpool->is_unmanaged_snaps_mode(); }); @@ -1158,11 +1159,11 @@ std::vector RADOS::list_snaps(std::string_view pool) const { return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) { int64_t poolid = osdmap.lookup_pg_pool_name(pool); if (poolid < 0) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } const auto pgpool = osdmap.get_pg_pool(poolid); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } std::vector snaps; for (const auto& [snapid, snapinfo] : pgpool->snaps) { @@ -1176,12 +1177,12 @@ std::uint64_t RADOS::lookup_snap(std::int64_t pool, std::string_view snap) const return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) { const auto pgpool = osdmap.get_pg_pool(pool); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } for (const auto& [id, snapinfo] : pgpool->snaps) { if (snapinfo.name == snap) return id; } - throw bs::system_error(bs::error_code(errc::snap_dne)); + throw bs::system_error(errc::snap_dne); }); } @@ -1189,16 +1190,16 @@ std::uint64_t RADOS::lookup_snap(std::string_view pool, std::string_view snap) c return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) { int64_t poolid = osdmap.lookup_pg_pool_name(pool); if (poolid < 0) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } const auto pgpool = osdmap.get_pg_pool(poolid); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } for (const auto& [id, snapinfo] : pgpool->snaps) { if (snapinfo.name == snap) return id; } - throw bs::system_error(bs::error_code(errc::snap_dne)); + throw bs::system_error(errc::snap_dne); }); } @@ -1206,10 +1207,10 @@ std::string RADOS::get_snap_name(std::int64_t pool, std::uint64_t snap) const { return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) { const auto pgpool = osdmap.get_pg_pool(pool); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) { - throw bs::system_error(bs::error_code(errc::snap_dne)); + throw bs::system_error(errc::snap_dne); } else { return i->second.name; } @@ -1220,14 +1221,14 @@ std::string RADOS::get_snap_name(std::string_view pool, return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) { int64_t poolid = osdmap.lookup_pg_pool_name(pool); if (poolid < 0) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } const auto pgpool = osdmap.get_pg_pool(poolid); if (!pgpool) { throw bs::system_error(bs::error_code(errc::pool_dne)); } if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) { - throw bs::system_error(bs::error_code(errc::snap_dne)); + throw bs::system_error(errc::snap_dne); } else { return i->second.name; } @@ -1239,10 +1240,10 @@ ceph::real_time RADOS::get_snap_timestamp(std::int64_t pool, return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) { const auto pgpool = osdmap.get_pg_pool(pool); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) { - throw bs::system_error(bs::error_code(errc::snap_dne)); + throw bs::system_error(errc::snap_dne); } else { return i->second.stamp.to_real_time(); } @@ -1253,14 +1254,14 @@ ceph::real_time RADOS::get_snap_timestamp(std::string_view pool, return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) { int64_t poolid = osdmap.lookup_pg_pool_name(pool); if (poolid < 0) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } const auto pgpool = osdmap.get_pg_pool(poolid); if (!pgpool) { - throw bs::system_error(bs::error_code(errc::pool_dne)); + throw bs::system_error(errc::pool_dne); } if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) { - throw bs::system_error(bs::error_code(errc::snap_dne)); + throw bs::system_error(errc::snap_dne); } else { return i->second.stamp.to_real_time(); } @@ -1587,7 +1588,7 @@ void RADOS::next_notification_(uint64_t cookie, NextNotificationComp c) { n->add_handler(id, std::move(c)); } catch (const std::bad_any_cast&) { dispatch(asio::append(std::move(c), - bs::error_code(errc::polled_callback_watch), + make_error_code(errc::polled_callback_watch), Notification{})); } } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 2194816eeb800..32eb025074c7a 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1598,7 +1598,7 @@ void Objecter::_check_op_pool_dne(Op *op, std::unique_lock *s << " dne" << dendl; if (op->has_completion()) { num_in_flight--; - op->complete(osdc_errc::pool_dne, -ENOENT, service.get_executor()); + op->complete(make_error_code(osdc_errc::pool_dne), -ENOENT, service.get_executor()); } OSDSession *s = op->session; @@ -1633,7 +1633,8 @@ void Objecter::_check_op_pool_eio(Op *op, std::unique_lock *s << " has eio" << dendl; if (op->has_completion()) { num_in_flight--; - op->complete(osdc_errc::pool_eio, -EIO, service.get_executor()); + op->complete(make_error_code(osdc_errc::pool_eio), -EIO, + service.get_executor()); } OSDSession *s = op->session; @@ -1733,13 +1734,15 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister) if (op->on_reg_commit) { asio::defer(service.get_executor(), asio::append(std::move(op->on_reg_commit), - osdc_errc::pool_dne, cb::list{})); + make_error_code(osdc_errc::pool_dne), + cb::list{})); op->on_reg_commit = nullptr; } if (op->on_notify_finish) { asio::defer(service.get_executor(), asio::append(std::move(op->on_notify_finish), - osdc_errc::pool_dne, cb::list{})); + make_error_code(osdc_errc::pool_dne), + cb::list{})); op->on_notify_finish = nullptr; } *need_unregister = true; @@ -1757,12 +1760,12 @@ void Objecter::_check_linger_pool_eio(LingerOp *op) if (op->on_reg_commit) { asio::defer(service.get_executor(), asio::append(std::move(op->on_reg_commit), - osdc_errc::pool_dne, cb::list{})); + make_error_code(osdc_errc::pool_dne), cb::list{})); } if (op->on_notify_finish) { asio::defer(service.get_executor(), asio::append(std::move(op->on_notify_finish), - osdc_errc::pool_dne, cb::list{})); + make_error_code(osdc_errc::pool_dne), cb::list{})); } } @@ -2469,7 +2472,8 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_t break; case RECALC_OP_TARGET_POOL_EIO: if (op->has_completion()) { - op->complete(osdc_errc::pool_eio, -EIO, service.get_executor()); + op->complete(make_error_code(osdc_errc::pool_eio), -EIO, + service.get_executor()); } return; } @@ -4080,7 +4084,8 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name, const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) { asio::defer(service.get_executor(), - asio::append(std::move(onfinish), osdc_errc::pool_dne, cb::list{})); + asio::append(std::move(onfinish), + make_error_code(osdc_errc::pool_dne), cb::list{})); return; } if (p->snap_exists(snap_name)) { @@ -4149,14 +4154,17 @@ void Objecter::delete_pool_snap( const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) { asio::defer(service.get_executor(), - asio::append(std::move(onfinish), osdc_errc::pool_dne, + asio::append(std::move(onfinish), + make_error_code(osdc_errc::pool_dne), cb::list{})); return; } if (!p->snap_exists(snap_name)) { asio::defer(service.get_executor(), - asio::append(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{})); + asio::append(std::move(onfinish), + make_error_code(osdc_errc::snapshot_dne), + cb::list{})); return; } @@ -4197,7 +4205,8 @@ void Objecter::create_pool(std::string_view name, if (osdmap->lookup_pg_pool_name(name) >= 0) { asio::defer(service.get_executor(), - asio::append(std::move(onfinish), osdc_errc::pool_exists, + asio::append(std::move(onfinish), + make_error_code(osdc_errc::pool_exists), cb::list{})); return; } @@ -4222,7 +4231,8 @@ void Objecter::delete_pool(int64_t pool, if (!osdmap->have_pg_pool(pool)) asio::defer(service.get_executor(), - asio::append(std::move(onfinish), osdc_errc::pool_dne, + asio::append(std::move(onfinish), + make_error_code(osdc_errc::pool_dne), cb::list{})); else _do_delete_pool(pool, std::move(onfinish)); @@ -4238,7 +4248,8 @@ void Objecter::delete_pool(std::string_view pool_name, if (pool < 0) // This only returns one error: -ENOENT. asio::defer(service.get_executor(), - asio::append(std::move(onfinish), osdc_errc::pool_dne, + asio::append(std::move(onfinish), + make_error_code(osdc_errc::pool_dne), cb::list{})); else _do_delete_pool(pool, std::move(onfinish)); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 81a85232516c5..51f1687fc4932 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -2904,9 +2905,9 @@ public: }, consigned); } - auto wait_for_latest_osdmap(std::unique_ptr> c) { + auto wait_for_latest_osdmap(boost::asio::any_completion_handler c) { wait_for_latest_osdmap([c = std::move(c)](boost::system::error_code e) mutable { - c->dispatch(std::move(c), e); + boost::asio::dispatch(boost::asio::append(std::move(c), e)); }); } @@ -3081,18 +3082,6 @@ public: op_submit(o); } - void mutate(const object_t& oid, const object_locator_t& oloc, - ObjectOperation&& op, const SnapContext& snapc, - ceph::real_time mtime, int flags, - std::unique_ptr> oncommit, - version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(), - ZTracer::Trace *parent_trace = nullptr) { - mutate(oid, oloc, std::move(op), snapc, mtime, flags, - [c = std::move(oncommit)](boost::system::error_code ec) mutable { - c->dispatch(std::move(c), ec); - }, objver, reqid, parent_trace); - } - Op *prepare_read_op( const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, @@ -3158,18 +3147,6 @@ public: op_submit(o); } - void read(const object_t& oid, const object_locator_t& oloc, - ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl, - int flags, std::unique_ptr> onack, - version_t *objver = nullptr, int *data_offset = nullptr, - uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) { - read(oid, oloc, std::move(op), snapid, pbl, flags, - [c = std::move(onack)](boost::system::error_code e) mutable { - c->dispatch(std::move(c), e); - }, objver, data_offset, features, parent_trace); - } - - Op *prepare_pg_read_op( uint32_t hash, object_locator_t oloc, ObjectOperation& op, ceph::buffer::list *pbl, int flags, diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 942a0e666de81..b1f6643a5d689 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -444,15 +444,11 @@ int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, log_data = zone->log_data; try { // Blocking in startup code, not ideal, but won't hurt anything. - std::exception_ptr eptr - = asio::co_spawn(executor, - start(dpp, zoneparams.log_pool, - background_tasks, background_tasks, - background_tasks), - async::use_blocked); - if (eptr) { - std::rethrow_exception(eptr); - } + asio::co_spawn(executor, + start(dpp, zoneparams.log_pool, + background_tasks, background_tasks, + background_tasks), + async::use_blocked); } catch (const sys::system_error& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": Failed to start datalog: " << e.what() @@ -462,7 +458,7 @@ int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": Failed to start datalog: " << e.what() << dendl; - return -EIO; + return ceph::from_exception(std::current_exception()); } return 0; } @@ -1017,23 +1013,21 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, const rgw::bucket_log_layout_generation& gen, int shard_id, optional_yield y) { - std::exception_ptr eptr; - if (y) { - try { + try { + if (y) { add_entry(dpp, bucket_info, gen, shard_id, y.get_yield_context()); - } catch (const std::exception&) { - eptr = std::current_exception(); + } else { + maybe_warn_about_blocking(dpp); + asio::spawn(rados->get_executor(), + [this, dpp, &bucket_info, &gen, + &shard_id](asio::yield_context y) { + add_entry(dpp, bucket_info, gen, shard_id, y); + }, async::use_blocked); } - } else { - maybe_warn_about_blocking(dpp); - eptr = asio::spawn(rados->get_executor(), - [this, dpp, &bucket_info, &gen, - &shard_id](asio::yield_context y) { - add_entry(dpp, bucket_info, gen, shard_id, y); - }, - async::use_blocked); - } - return ceph::from_exception(eptr); + } catch (const std::exception&) { + return ceph::from_exception(std::current_exception()); + } + return 0; } asio::awaitable, @@ -1116,7 +1110,6 @@ int RGWDataChangesLog::list_entries( std::string_view marker, std::string* out_marker, bool* truncated, std::string* errstr, optional_yield y) { - std::exception_ptr eptr; std::tuple, std::string> out; if (shard >= num_shards) [[unlikely]] { @@ -1129,25 +1122,22 @@ int RGWDataChangesLog::list_entries( if (std::ssize(entries) < max_entries) { entries.resize(max_entries); } - if (y) { - auto& yield = y.get_yield_context(); - try { + try { + if (y) { + auto& yield = y.get_yield_context(); out = asio::co_spawn(yield.get_executor(), bes->list(dpp, shard, entries, std::string{marker}), yield); - } catch (const std::exception&) { - eptr = std::current_exception(); + } else { + maybe_warn_about_blocking(dpp); + out = asio::co_spawn(rados->get_executor(), + bes->list(dpp, shard, entries, + std::string{marker}), + async::use_blocked); } - } else { - maybe_warn_about_blocking(dpp); - std::tie(eptr, out) = asio::co_spawn(rados->get_executor(), - bes->list(dpp, shard, entries, - std::string{marker}), - async::use_blocked); - } - if (eptr) { - return ceph::from_exception(eptr); + } catch (const std::exception&) { + return ceph::from_exception(std::current_exception()); } auto& [outries, outmark] = out; if (auto size = std::ssize(outries); size < std::ssize(entries)) { @@ -1201,32 +1191,27 @@ int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entrie RGWDataChangesLogMarker& marker, bool *ptruncated, optional_yield y) { - std::exception_ptr eptr; std::tuple, RGWDataChangesLogMarker> out; if (std::ssize(entries) < max_entries) { entries.resize(max_entries); } - if (y) { - auto& yield = y.get_yield_context(); - try { + try { + if (y) { + auto& yield = y.get_yield_context(); out = asio::co_spawn(yield.get_executor(), list_entries(dpp, max_entries, RGWDataChangesLogMarker{marker}), yield); - } catch (const std::exception&) { - eptr = std::current_exception(); - } } else { - maybe_warn_about_blocking(dpp); - std::tie(eptr, out) = - asio::co_spawn(rados->get_executor(), - list_entries(dpp, max_entries, - RGWDataChangesLogMarker{marker}), - async::use_blocked); - } - if (eptr) { - return ceph::from_exception(eptr); + maybe_warn_about_blocking(dpp); + out = asio::co_spawn(rados->get_executor(), + list_entries(dpp, max_entries, + RGWDataChangesLogMarker{marker}), + async::use_blocked); + } + } catch (const std::exception&) { + return ceph::from_exception(std::current_exception()); } auto& [outries, outmark] = out; if (auto size = std::ssize(outries); size < std::ssize(entries)) { @@ -1251,26 +1236,25 @@ int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id, } } auto be = bes->head(); - std::exception_ptr eptr; - if (y) { - auto& yield = y.get_yield_context(); - try { + try { + if (y) { + auto& yield = y.get_yield_context(); *info = asio::co_spawn(yield.get_executor(), be->get_info(dpp, shard_id), yield); - } catch (const std::exception&) { - eptr = std::current_exception(); + } else { + maybe_warn_about_blocking(dpp); + *info = asio::co_spawn(rados->get_executor(), + be->get_info(dpp, shard_id), + async::use_blocked); } - } else { - maybe_warn_about_blocking(dpp); - std::tie(eptr, *info) = asio::co_spawn(rados->get_executor(), - be->get_info(dpp, shard_id), - async::use_blocked); + } catch (const std::exception&) { + return ceph::from_exception(std::current_exception()); } if (!info->marker.empty()) { info->marker = gencursor(be->gen_id, info->marker); } - return ceph::from_exception(eptr); + return 0; } asio::awaitable DataLogBackends::trim_entries( @@ -1311,23 +1295,22 @@ int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, shard_id, num_shards); } } - std::exception_ptr eptr; - if (y) { - auto& yield = y.get_yield_context(); - try { + try { + if (y) { + auto& yield = y.get_yield_context(); asio::co_spawn(yield.get_executor(), bes->trim_entries(dpp, shard_id, marker), yield); - } catch (const std::exception& e) { - eptr = std::current_exception(); + } else { + maybe_warn_about_blocking(dpp); + asio::co_spawn(rados->get_executor(), + bes->trim_entries(dpp, shard_id, marker), + async::use_blocked); } - } else { - maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(rados->get_executor(), - bes->trim_entries(dpp, shard_id, marker), - async::use_blocked); + } catch (const std::exception& e) { + return ceph::from_exception(std::current_exception()); } - return ceph::from_exception(eptr); + return 0; } int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id, @@ -1578,47 +1561,47 @@ std::string RGWDataChangesLog::max_marker() const { } int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, - log_type type,optional_yield y) { - std::exception_ptr eptr; - if (y) { - auto& yield = y.get_yield_context(); - try { + log_type type,optional_yield y) +{ + try { + if (y) { + auto& yield = y.get_yield_context(); asio::co_spawn(yield.get_executor(), bes->new_backing(dpp, type), yield); - } catch (const std::exception&) { - eptr = std::current_exception(); + } else { + maybe_warn_about_blocking(dpp); + asio::co_spawn(rados->get_executor(), + bes->new_backing(dpp, type), + async::use_blocked); } - } else { - maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(rados->get_executor(), - bes->new_backing(dpp, type), - async::use_blocked); + } catch (const std::exception&) { + return ceph::from_exception(std::current_exception()); } - return ceph::from_exception(eptr); + return 0;; } int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional& through, - optional_yield y) { - std::exception_ptr eptr; - if (y) { - auto& yield = y.get_yield_context(); - try { + optional_yield y) +{ + try { + if (y) { + auto& yield = y.get_yield_context(); asio::co_spawn(yield.get_executor(), bes->trim_generations(dpp, through), yield); - } catch (const std::exception& e) { - eptr = std::current_exception(); + } else { + maybe_warn_about_blocking(dpp); + asio::co_spawn(rados->get_executor(), + bes->trim_generations(dpp, through), + async::use_blocked); } - - } else { - maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(rados->get_executor(), - bes->trim_generations(dpp, through), - async::use_blocked); + } catch (const std::exception& e) { + return ceph::from_exception(std::current_exception()); } - return ceph::from_exception(eptr); + + return 0; } asio::awaitable, diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index e9cb0a07e6110..62d07867fdbc9 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -3532,13 +3532,9 @@ void init_realm_param(CephContext *cct, string& var, std::optional& opt_ int run_coro(asio::awaitable coro, std::string_view name) { try { // Blocking in startup code, not ideal, but won't hurt anything. - std::exception_ptr eptr - = asio::co_spawn(static_cast(driver)->get_io_context(), - std::move(coro), - async::use_blocked); - if (eptr) { - std::rethrow_exception(eptr); - } + asio::co_spawn(static_cast(driver)->get_io_context(), + std::move(coro), + async::use_blocked); } catch (boost::system::system_error& e) { ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl; return ceph::from_error_code(e.code()); diff --git a/src/test/common/test_blocked_completion.cc b/src/test/common/test_blocked_completion.cc index 14c91e4fbe0de..72def49a1f43d 100644 --- a/src/test/common/test_blocked_completion.cc +++ b/src/test/common/test_blocked_completion.cc @@ -13,31 +13,31 @@ */ +#include +#include #include #include #include + #include #include -#include "common/async/bind_handler.h" #include "common/async/blocked_completion.h" -#include "common/async/forward_handler.h" - using namespace std::literals; -namespace ba = boost::asio; -namespace bs = boost::system; -namespace ca = ceph::async; +namespace asio = boost::asio; +namespace sys = boost::system; +namespace async = ceph::async; class context_thread { - ba::io_context c; - ba::executor_work_guard guard; + asio::io_context c; + asio::executor_work_guard guard; std::thread th; public: context_thread() noexcept - : guard(ba::make_work_guard(c)), + : guard(asio::make_work_guard(c)), th([this]() noexcept { c.run();}) {} ~context_thread() { @@ -45,11 +45,11 @@ public: th.join(); } - ba::io_context& io_context() noexcept { + asio::io_context& io_context() noexcept { return c; } - ba::io_context::executor_type get_executor() noexcept { + asio::io_context::executor_type get_executor() noexcept { return c.get_executor(); } }; @@ -71,53 +71,53 @@ template auto id(const Executor& executor, CompletionToken&& token, Args&& ...args) { - ba::async_completion init(token); - boost::asio::post(ca::forward_handler( - ca::bind_handler(std::move(init.completion_handler), - std::forward(args)...))); - return init.result.get(); + return asio::async_initiate( + [](auto handler, Args2&& ...args2) mutable { + asio::post(asio::append(std::move(handler), + std::forward(args2)...)); + }, token, std::forward(args)...); } TEST(BlockedCompletion, Void) { context_thread t; - ba::post(t.get_executor(), ca::use_blocked); + asio::post(t.get_executor(), async::use_blocked); } TEST(BlockedCompletion, Timer) { context_thread t; - ba::steady_timer timer(t.io_context(), 50ms); - timer.async_wait(ca::use_blocked); + asio::steady_timer timer(t.io_context(), 50ms); + timer.async_wait(async::use_blocked); } TEST(BlockedCompletion, NoError) { context_thread t; - ba::steady_timer timer(t.io_context(), 1s); - bs::error_code ec; + asio::steady_timer timer(t.io_context(), 1s); + sys::error_code ec; - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, bs::error_code{})); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], bs::error_code{})); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked, sys::error_code{})); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], sys::error_code{})); EXPECT_FALSE(ec); int i; - EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked, - bs::error_code{}, 5)); + EXPECT_NO_THROW(i = id(t.get_executor(), async::use_blocked, + sys::error_code{}, 5)); ASSERT_EQ(5, i); - EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{}, 7)); + EXPECT_NO_THROW( + i = id(t.get_executor(), async::use_blocked[ec], sys::error_code{}, 7)); EXPECT_FALSE(ec); ASSERT_EQ(7, i); float j; - EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked, 9, + EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), async::use_blocked, 9, 3.5)); ASSERT_EQ(9, i); ASSERT_EQ(3.5, j); - EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked[ec], + EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), async::use_blocked[ec], 11, 2.25)); EXPECT_FALSE(ec); ASSERT_EQ(11, i); @@ -127,111 +127,111 @@ TEST(BlockedCompletion, NoError) TEST(BlockedCompletion, AnError) { context_thread t; - ba::steady_timer timer(t.io_context(), 1s); - bs::error_code ec; - - EXPECT_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{EDOM, bs::system_category()}), - bs::system_error); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{EDOM, bs::system_category()})); - EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); - - EXPECT_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{EDOM, bs::system_category()}, 5), - bs::system_error); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{EDOM, bs::system_category()}, 5)); - EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); - - EXPECT_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{EDOM, bs::system_category()}, 5, 3), - bs::system_error); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{EDOM, bs::system_category()}, 5, 3)); - EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + asio::steady_timer timer(t.io_context(), 1s); + sys::error_code ec; + + EXPECT_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{EDOM, sys::system_category()}), + sys::system_error); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{EDOM, sys::system_category()})); + EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec); + + EXPECT_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{EDOM, sys::system_category()}, 5), + sys::system_error); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{EDOM, sys::system_category()}, 5)); + EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec); + + EXPECT_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{EDOM, sys::system_category()}, 5, 3), + sys::system_error); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{EDOM, sys::system_category()}, 5, 3)); + EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec); } TEST(BlockedCompletion, MoveOnly) { context_thread t; - ba::steady_timer timer(t.io_context(), 1s); - bs::error_code ec; + asio::steady_timer timer(t.io_context(), 1s); + sys::error_code ec; - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{}, move_only{})); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{}, move_only{})); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{}, move_only{})); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{}, move_only{})); EXPECT_FALSE(ec); { - auto [i, j] = id(t.get_executor(), ca::use_blocked, move_only{}, 5); + auto [i, j] = id(t.get_executor(), async::use_blocked, move_only{}, 5); EXPECT_EQ(j, 5); } { - auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], move_only{}, 5); + auto [i, j] = id(t.get_executor(), async::use_blocked[ec], move_only{}, 5); EXPECT_EQ(j, 5); } EXPECT_FALSE(ec); - EXPECT_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{EDOM, bs::system_category()}, move_only{}), - bs::system_error); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{EDOM, bs::system_category()}, move_only{})); - EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + EXPECT_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{EDOM, sys::system_category()}, move_only{}), + sys::system_error); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{EDOM, sys::system_category()}, move_only{})); + EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec); - EXPECT_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{EDOM, bs::system_category()}, move_only{}, 3), - bs::system_error); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{EDOM, bs::system_category()}, + EXPECT_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{EDOM, sys::system_category()}, move_only{}, 3), + sys::system_error); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{EDOM, sys::system_category()}, move_only{}, 3)); - EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec); } TEST(BlockedCompletion, DefaultLess) { context_thread t; - ba::steady_timer timer(t.io_context(), 1s); - bs::error_code ec; + asio::steady_timer timer(t.io_context(), 1s); + sys::error_code ec; { - auto l = id(t.get_executor(), ca::use_blocked, bs::error_code{}, defaultless{5}); + auto l = id(t.get_executor(), async::use_blocked, sys::error_code{}, defaultless{5}); EXPECT_EQ(5, l.a); } { - auto l = id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}, defaultless{7}); + auto l = id(t.get_executor(), async::use_blocked[ec], sys::error_code{}, defaultless{7}); EXPECT_EQ(7, l.a); } { - auto [i, j] = id(t.get_executor(), ca::use_blocked, defaultless{3}, 5); + auto [i, j] = id(t.get_executor(), async::use_blocked, defaultless{3}, 5); EXPECT_EQ(i.a, 3); EXPECT_EQ(j, 5); } { - auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], defaultless{3}, 5); + auto [i, j] = id(t.get_executor(), async::use_blocked[ec], defaultless{3}, 5); EXPECT_EQ(i.a, 3); EXPECT_EQ(j, 5); } EXPECT_FALSE(ec); - EXPECT_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{EDOM, bs::system_category()}, move_only{}), - bs::system_error); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{EDOM, bs::system_category()}, move_only{})); - EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); - - EXPECT_THROW(id(t.get_executor(), ca::use_blocked, - bs::error_code{EDOM, bs::system_category()}, move_only{}, 3), - bs::system_error); - EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], - bs::error_code{EDOM, bs::system_category()}, + EXPECT_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{EDOM, sys::system_category()}, move_only{}), + sys::system_error); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{EDOM, sys::system_category()}, move_only{})); + EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec); + + EXPECT_THROW(id(t.get_executor(), async::use_blocked, + sys::error_code{EDOM, sys::system_category()}, move_only{}, 3), + sys::system_error); + EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], + sys::error_code{EDOM, sys::system_category()}, move_only{}, 3)); - EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec); + EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec); } -- 2.39.5