]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/async: Update `use_blocked` for newer asio
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 6 Aug 2025 20:02:32 +0000 (16:02 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Thu, 7 Aug 2025 19:32:15 +0000 (15:32 -0400)
Reimplement with `initiate` rather than the old style. This
necessitates getting rid of the old `async::Completion` in anything
that was calling it, and other changes.

Also, use disposition for error handling.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit e81d4eae4e76f9c279cd8f146dd6cc132a5ed51a)

Conflicts:
src/rgw/driver/rados/rgw_datalog.cc
 - This commit was on top of another bugfix with changes to
   startup/shutdown that has not yet merged.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/common/async/blocked_completion.h
src/common/async/forward_handler.h
src/common/error_code.h
src/mon/MonClient.cc
src/mon/MonClient.h
src/neorados/RADOS.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/radosgw-admin/radosgw-admin.cc
src/test/common/test_blocked_completion.cc

index 23a1319bc0fa378fd680fdfc56f7f10e2f2078ff..bdf75d906e5c9342768befae9d0b8850196b8c12 100644 (file)
 #ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
 #define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
 
-#include <atomic>
 #include <condition_variable>
 #include <mutex>
 #include <optional>
-#include <type_traits>
 
 #include <boost/asio/async_result.hpp>
+#include <boost/asio/redirect_error.hpp>
 
 #include <boost/system/error_code.hpp>
-#include <boost/system/system_error.hpp>
+
+#include <common/async/concepts.h>
 
 namespace ceph::async {
 
 namespace bs = boost::system;
 
 class use_blocked_t {
-  use_blocked_t(bs::error_code* ec) : ec(ec) {}
 public:
   use_blocked_t() = default;
 
-  use_blocked_t operator [](bs::error_code& _ec) const {
-    return use_blocked_t(&_ec);
+  auto operator [](bs::error_code& ec) const {
+    return boost::asio::redirect_error(use_blocked_t{}, ec);
   }
-
-  bs::error_code* ec = nullptr;
 };
 
 inline constexpr use_blocked_t use_blocked;
 
 namespace detail {
-
-template<typename... Ts>
+// Obnoxiously repetitive, but it cuts down on the amount of
+// copying/moving/splicing/concatenating of tuples I need to do.
+template<typename ...Ts>
 struct blocked_handler
 {
-  blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+  blocked_handler(std::optional<std::tuple<Ts...>>* vals, std::mutex* m,
+                 std::condition_variable* cv, bool* done)
+    : vals(vals), m(m), cv(cv), done(done) {
+  }
 
-  void operator ()(Ts... values) noexcept {
+  template<typename ...Args>
+  void operator ()(Args&& ...args) noexcept {
+    static_assert(sizeof...(Ts) == sizeof...(Args));
     std::scoped_lock l(*m);
-    *ec = bs::error_code{};
-    *value = std::forward_as_tuple(std::move(values)...);
+    *vals = std::tuple<Ts...>(std::forward<Args>(args)...);
     *done = true;
     cv->notify_one();
   }
 
-  void operator ()(bs::error_code ec, Ts... values) noexcept {
+  //private:
+  std::optional<std::tuple<Ts...>>* vals;
+  std::mutex* m = nullptr;
+  std::condition_variable* cv = nullptr;
+  bool* done = nullptr;
+};
+
+template<boost::asio::disposition D, typename ...Ts>
+struct blocked_handler<D, Ts...>
+{
+  blocked_handler(D* dispo, std::optional<std::tuple<Ts...>>* vals,
+                 std::mutex* m, std::condition_variable* cv, bool* done)
+    : dispo(dispo), vals(vals), m(m), cv(cv), done(done) {
+  }
+
+  template<typename Arg0, typename... Args>
+  void operator ()(Arg0&& arg0, Args&& ...args) noexcept {
+    static_assert(sizeof...(Ts) == sizeof...(Args));
     std::scoped_lock l(*m);
-    *this->ec = ec;
-    *value = std::forward_as_tuple(std::move(values)...);
+    *dispo = std::move(arg0);
+    *vals = std::tuple<Ts...>(std::forward<Args>(args)...);
     *done = true;
     cv->notify_one();
   }
 
-  bs::error_code* ec;
-  std::optional<std::tuple<Ts...>>* value = nullptr;
+  //private:
+  D* dispo;
+  std::optional<std::tuple<Ts...>>* vals;
   std::mutex* m = nullptr;
   std::condition_variable* cv = nullptr;
   bool* done = nullptr;
@@ -78,27 +98,44 @@ struct blocked_handler
 template<typename T>
 struct blocked_handler<T>
 {
-  blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+  blocked_handler(std::optional<T>* val, std::mutex* m,
+                 std::condition_variable* cv, bool* done)
+    : val(val), m(m), cv(cv), done(done) {}
 
-  void operator ()(T value) noexcept {
+  template<typename Arg>
+  void operator ()(Arg&& arg) noexcept {
     std::scoped_lock l(*m);
-    *ec = bs::error_code();
-    *this->value = std::move(value);
+    *val = std::forward<Arg>(arg);
     *done = true;
     cv->notify_one();
   }
 
-  void operator ()(bs::error_code ec, T value) noexcept {
+  //private:
+  std::optional<T>* val;
+  std::mutex* m = nullptr;
+  std::condition_variable* cv = nullptr;
+  bool* done = nullptr;
+};
+
+template<boost::asio::disposition D, typename T>
+struct blocked_handler<D, T>
+{
+  blocked_handler(D* dispo, std::optional<T>* val, std::mutex* m,
+                 std::condition_variable* cv, bool* done)
+    : dispo(dispo), val(val), m(m), cv(cv), done(done) {}
+
+  template<typename Arg0, typename Arg>
+  void operator ()(Arg0&& arg0, Arg&& arg) noexcept {
     std::scoped_lock l(*m);
-    *this->ec = ec;
-    *this->value = std::move(value);
+    *dispo = std::move(arg0);
+    *val = std::move(arg);
     *done = true;
     cv->notify_one();
   }
 
   //private:
-  bs::error_code* ec;
-  std::optional<T>* value;
+  D* dispo;
+  std::optional<T>* val;
   std::mutex* m = nullptr;
   std::condition_variable* cv = nullptr;
   bool* done = nullptr;
@@ -107,23 +144,37 @@ struct blocked_handler<T>
 template<>
 struct blocked_handler<void>
 {
-  blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+  blocked_handler(std::mutex* m, std::condition_variable* cv, bool* done)
+    : m(m), cv(cv), done(done) {}
 
   void operator ()() noexcept {
     std::scoped_lock l(*m);
-    *ec = bs::error_code{};
     *done = true;
     cv->notify_one();
   }
 
-  void operator ()(bs::error_code ec) noexcept {
+  std::mutex* m = nullptr;
+  std::condition_variable* cv = nullptr;
+  bool* done = nullptr;
+};
+
+template<boost::asio::disposition D>
+struct blocked_handler<D>
+{
+  blocked_handler(D* dispo, std::mutex* m,
+                 std::condition_variable* cv, bool* done)
+    : dispo(dispo), m(m), cv(cv), done(done) {}
+
+  template<typename Arg0>
+  void operator ()(Arg0&& arg0) noexcept {
     std::scoped_lock l(*m);
-    *this->ec = ec;
+    *dispo = std::move(arg0);
     *done = true;
     cv->notify_one();
   }
 
-  bs::error_code* ec;
+  //private:
+  D* dispo;
   std::mutex* m = nullptr;
   std::condition_variable* cv = nullptr;
   bool* done = nullptr;
@@ -136,35 +187,65 @@ public:
   using completion_handler_type = blocked_handler<Ts...>;
   using return_type = std::tuple<Ts...>;
 
-  explicit blocked_result(completion_handler_type& h) noexcept {
-    std::scoped_lock l(m);
-    out_ec = h.ec;
-    if (!out_ec) h.ec = &ec;
-    h.value = &value;
-    h.m = &m;
-    h.cv = &cv;
-    h.done = &done;
-  }
+  template<typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                             use_blocked_t,
+                             Args&& ...args) {
+    using ttype = std::tuple<Ts...>;
+    std::optional<ttype> vals;
+    std::mutex m;
+    std::condition_variable cv;
+    bool done = false;
+    static_assert(std::tuple_size_v<ttype> > 1);
+
+    std::move(init)(completion_handler_type(&vals, &m, &cv, &done),
+                   std::forward<Args>(args)...);
 
-  return_type get() {
     std::unique_lock l(m);
-    cv.wait(l, [this]() { return done; });
-    if (!out_ec && ec) throw bs::system_error(ec);
-    return std::move(*value);
+    cv.wait(l, [&done]() { return done; });
+    return std::move(*vals);
   }
 
   blocked_result(const blocked_result&) = delete;
   blocked_result& operator =(const blocked_result&) = delete;
   blocked_result(blocked_result&&) = delete;
   blocked_result& operator =(blocked_result&&) = delete;
+};
 
-private:
-  bs::error_code* out_ec;
-  bs::error_code ec;
-  std::optional<return_type> value;
-  std::mutex m;
-  std::condition_variable cv;
-  bool done = false;
+template<boost::asio::disposition D, typename... Ts>
+class blocked_result<D, Ts...>
+{
+public:
+  using completion_handler_type = blocked_handler<D, Ts...>;
+  using return_type = std::tuple<Ts...>;
+
+  template<typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                             use_blocked_t,
+                             Args&& ...args) {
+    using ttype = std::tuple<Ts...>;
+    std::optional<ttype> vals;
+    D dispo;
+    std::mutex m;
+    std::condition_variable cv;
+    bool done = false;
+    static_assert(std::tuple_size_v<ttype> > 1);
+
+    std::move(init)(completion_handler_type(&dispo, &vals, &m, &cv, &done),
+                   std::forward<Args>(args)...);
+
+    std::unique_lock l(m);
+    cv.wait(l, [&done]() { return done; });
+    if (dispo != boost::asio::no_error) {
+      boost::asio::disposition_traits<D>::throw_exception(dispo);
+    }
+    return std::move(*vals);
+  }
+
+  blocked_result(const blocked_result&) = delete;
+  blocked_result& operator =(const blocked_result&) = delete;
+  blocked_result(blocked_result&&) = delete;
+  blocked_result& operator =(blocked_result&&) = delete;
 };
 
 template<typename T>
@@ -174,35 +255,61 @@ public:
   using completion_handler_type = blocked_handler<T>;
   using return_type = T;
 
-  explicit blocked_result(completion_handler_type& h) noexcept {
-    std::scoped_lock l(m);
-    out_ec = h.ec;
-    if (!out_ec) h.ec = &ec;
-    h.value = &value;
-    h.m = &m;
-    h.cv = &cv;
-    h.done = &done;
-  }
+  template<typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                             use_blocked_t,
+                             Args&& ...args) {
+    std::optional<T> val;
+    std::mutex m;
+    std::condition_variable cv;
+    bool done = false;
+
+    std::move(init)(completion_handler_type(&val, &m, &cv, &done),
+                   std::forward<Args>(args)...);
 
-  return_type get() {
     std::unique_lock l(m);
-    cv.wait(l, [this]() { return done; });
-    if (!out_ec && ec) throw bs::system_error(ec);
-    return std::move(*value);
+    cv.wait(l, [&done]() { return done; });
+    return std::move(*val);
   }
 
   blocked_result(const blocked_result&) = delete;
   blocked_result& operator =(const blocked_result&) = delete;
   blocked_result(blocked_result&&) = delete;
   blocked_result& operator =(blocked_result&&) = delete;
+};
+
+template<boost::asio::disposition D, typename T>
+class blocked_result<D, T>
+{
+public:
+  using completion_handler_type = blocked_handler<D, T>;
+  using return_type = T;
+
+  template<typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                             use_blocked_t,
+                             Args&& ...args) {
+    D dispo;
+    std::optional<T> val;
+    std::mutex m;
+    std::condition_variable cv;
+    bool done = false;
+
+    std::move(init)(completion_handler_type(&dispo, &val, &m, &cv, &done),
+                   std::forward<Args>(args)...);
+
+    std::unique_lock l(m);
+    cv.wait(l, [&done]() { return done; });
+    if (dispo != boost::asio::no_error) {
+      boost::asio::disposition_traits<D>::throw_exception(dispo);
+    }
+    return std::move(*val);
+  }
 
-private:
-  bs::error_code* out_ec;
-  bs::error_code ec;
-  std::optional<return_type> value;
-  std::mutex m;
-  std::condition_variable cv;
-  bool done = false;
+  blocked_result(const blocked_result&) = delete;
+  blocked_result& operator =(const blocked_result&) = delete;
+  blocked_result(blocked_result&&) = delete;
+  blocked_result& operator =(blocked_result&&) = delete;
 };
 
 template<>
@@ -212,78 +319,101 @@ public:
   using completion_handler_type = blocked_handler<void>;
   using return_type = void;
 
-  explicit blocked_result(completion_handler_type& h) noexcept {
-    std::scoped_lock l(m);
-    out_ec = h.ec;
-    if (!out_ec) h.ec = &ec;
-    h.m = &m;
-    h.cv = &cv;
-    h.done = &done;
-  }
+  template<typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                             use_blocked_t,
+                             Args&& ...args) {
+    std::mutex m;
+    std::condition_variable cv;
+    bool done = false;
+
+    std::move(init)(completion_handler_type(&m, &cv, &done),
+                   std::forward<Args>(args)...);
 
-  void get() {
     std::unique_lock l(m);
-    cv.wait(l, [this]() { return done; });
-    if (!out_ec && ec) throw bs::system_error(ec);
+    cv.wait(l, [&done]() { return done; });
+    return;
   }
 
   blocked_result(const blocked_result&) = delete;
   blocked_result& operator =(const blocked_result&) = delete;
   blocked_result(blocked_result&&) = delete;
   blocked_result& operator =(blocked_result&&) = delete;
+};
+
 
-private:
-  bs::error_code* out_ec;
-  bs::error_code ec;
-  std::mutex m;
-  std::condition_variable cv;
-  bool done = false;
+template<boost::asio::disposition D>
+class blocked_result<D>
+{
+public:
+  using completion_handler_type = blocked_handler<D>;
+  using return_type = void;
+
+  template<typename Initiation, typename... Args>
+  static return_type initiate(Initiation&& init,
+                             use_blocked_t,
+                             Args&& ...args) {
+    D dispo;
+    std::mutex m;
+    std::condition_variable cv;
+    bool done = false;
+
+    std::move(init)(completion_handler_type(&dispo, &m, &cv, &done),
+                   std::forward<Args>(args)...);
+
+    std::unique_lock l(m);
+    cv.wait(l, [&done]() { return done; });
+    if (dispo != boost::asio::no_error) {
+      boost::asio::disposition_traits<D>::throw_exception(dispo);
+    }
+    return;
+  }
+
+  blocked_result(const blocked_result&) = delete;
+  blocked_result& operator =(const blocked_result&) = delete;
+  blocked_result(blocked_result&&) = delete;
+  blocked_result& operator =(blocked_result&&) = delete;
 };
+
 } // namespace detail
 } // namespace ceph::async
 
 
 namespace boost::asio {
-template<typename ReturnType>
-class async_result<ceph::async::use_blocked_t, ReturnType()>
+template<typename R>
+class async_result<ceph::async::use_blocked_t, R()>
   : public ceph::async::detail::blocked_result<void>
 {
-public:
-  explicit async_result(typename ceph::async::detail::blocked_result<void>
-                       ::completion_handler_type& h)
-    : ceph::async::detail::blocked_result<void>(h) {}
 };
 
-template<typename ReturnType, typename... Args>
-class async_result<ceph::async::use_blocked_t, ReturnType(Args...)>
-  : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+template<typename R, disposition D>
+class async_result<ceph::async::use_blocked_t, R(D)>
+  : public ceph::async::detail::blocked_result<D>
 {
-public:
-  explicit async_result(
-    typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
-    : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
 };
 
-template<typename ReturnType>
-class async_result<ceph::async::use_blocked_t,
-                  ReturnType(boost::system::error_code)>
-  : public ceph::async::detail::blocked_result<void>
+template<typename R, typename Arg>
+class async_result<ceph::async::use_blocked_t, R(Arg)>
+  : public ceph::async::detail::blocked_result<Arg>
 {
-public:
-  explicit async_result(
-    typename ceph::async::detail::blocked_result<void>::completion_handler_type& h)
-    : ceph::async::detail::blocked_result<void>(h) {}
 };
 
-template<typename ReturnType, typename... Args>
-class async_result<ceph::async::use_blocked_t,
-                  ReturnType(boost::system::error_code, Args...)>
-  : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+template<typename R, disposition D, typename Arg>
+class async_result<ceph::async::use_blocked_t, R(D, Arg)>
+  : public ceph::async::detail::blocked_result<D, Arg>
+{
+};
+
+template<typename R, typename... Args>
+class async_result<ceph::async::use_blocked_t, R(Args...)>
+  : public ceph::async::detail::blocked_result<Args...>
+{
+};
+
+template<typename R, disposition D, typename... Args>
+class async_result<ceph::async::use_blocked_t, R(D, Args...)>
+  : public ceph::async::detail::blocked_result<D, Args...>
 {
-public:
-  explicit async_result(
-    typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
-    : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
 };
 }
 
index e204ca9862c3569447dcf31614a649c8616cf0dd..04fc68aa5c4f72bf8fd83528cd75c79085d87c07 100644 (file)
@@ -15,6 +15,8 @@
 #ifndef CEPH_ASYNC_FORWARD_HANDLER_H
 #define CEPH_ASYNC_FORWARD_HANDLER_H
 
+#include <utility>
+
 #include <boost/asio/associator.hpp>
 
 namespace ceph::async {
index f984a88ad087b11ac5707ffb85095db09778a49e..f37d7ce617df4b029768480b5c1b091de3911e7c 100644 (file)
@@ -94,7 +94,7 @@ inline boost::system::error_condition make_error_condition(errc e) noexcept {
 #pragma GCC diagnostic pop
 #pragma clang diagnostic pop
 
-inline int from_exception(std::exception_ptr eptr) {
+[[nodiscard]] inline int from_exception(std::exception_ptr eptr) {
   if (!eptr) [[likely]] {
     return 0;
   }
index 5e83a2cf32400011fb63f41d4510cde67b375c22..aebd3776a1040336dbed1696e54088e252eba73d 100644 (file)
@@ -62,6 +62,7 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
 
+namespace asio = boost::asio;
 namespace bs = boost::system;
 using std::string;
 using namespace std::literals;
@@ -534,8 +535,9 @@ void MonClient::shutdown()
   monc_lock.lock();
   stopping = true;
   while (!version_requests.empty()) {
-    ceph::async::post(std::move(version_requests.begin()->second),
-                     monc_errc::shutting_down, 0, 0);
+    asio::dispatch(
+      asio::append(std::move(version_requests.begin()->second),
+                  make_error_code(monc_errc::shutting_down), 0, 0));
     ldout(cct, 20) << __func__ << " canceling and discarding version request "
                   << version_requests.begin()->first << dendl;
     version_requests.erase(version_requests.begin());
@@ -710,7 +712,7 @@ void MonClient::_finish_auth(int auth_err)
     ceph_assert(auth);
     _check_auth_tickets();
   } else if (auth_err == -EAGAIN && !active_con) {
-    ldout(cct,10) << __func__ 
+    ldout(cct,10) << __func__
                   << " auth returned EAGAIN, reopening the session to try again"
                   << dendl;
     _reopen_session();
@@ -767,8 +769,9 @@ void MonClient::_reopen_session(int rank)
 
   // throw out version check requests
   while (!version_requests.empty()) {
-    ceph::async::post(std::move(version_requests.begin()->second),
-                     monc_errc::session_reset, 0, 0);
+    asio::dispatch(asio::append(std::move(version_requests.begin()->second),
+                               make_error_code(monc_errc::session_reset),
+                               0, 0));
     version_requests.erase(version_requests.begin());
   }
 
@@ -1168,7 +1171,8 @@ void MonClient::_send_command(MonCommand *r)
   if (r->is_tell()) {
     ++r->send_attempts;
     if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
-      _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
+      _finish_command(r, make_error_code(monc_errc::mon_unavailable),
+                     "mon unavailable", {});
       return;
     }
     // tell-style command
@@ -1180,7 +1184,8 @@ void MonClient::_send_command(MonCommand *r)
        if (r->target_rank >= (int)monmap.size()) {
          ldout(cct, 10) << " target " << r->target_rank
                         << " >= max mon " << monmap.size() << dendl;
-         _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
+         _finish_command(r, make_error_code(monc_errc::rank_dne),
+                         "mon rank dne"sv, {});
          return;
        }
        r->target_con = messenger->connect_to_mon(
@@ -1189,7 +1194,8 @@ void MonClient::_send_command(MonCommand *r)
        if (!monmap.contains(r->target_name)) {
          ldout(cct, 10) << " target " << r->target_name
                         << " not present in monmap" << dendl;
-         _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
+         _finish_command(r, make_error_code(monc_errc::mon_dne),
+                         "mon dne"sv, {});
          return;
        }
        r->target_con = messenger->connect_to_mon(
@@ -1224,7 +1230,8 @@ void MonClient::_send_command(MonCommand *r)
       if (r->target_rank >= (int)monmap.size()) {
        ldout(cct, 10) << " target " << r->target_rank
                       << " >= max mon " << monmap.size() << dendl;
-       _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
+       _finish_command(r, make_error_code(monc_errc::rank_dne),
+                       "mon rank dne"sv, {});
        return;
       }
       _reopen_session(r->target_rank);
@@ -1239,7 +1246,8 @@ void MonClient::_send_command(MonCommand *r)
       if (!monmap.contains(r->target_name)) {
        ldout(cct, 10) << " target " << r->target_name
                       << " not present in monmap" << dendl;
-       _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
+       _finish_command(r, make_error_code(monc_errc::mon_dne),
+                       "mon dne"sv, {});
        return;
       }
       _reopen_session(monmap.get_rank(r->target_name));
@@ -1377,7 +1385,8 @@ int MonClient::_cancel_mon_command(uint64_t tid)
   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
 
   MonCommand *cmd = it->second;
-  _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
+  _finish_command(cmd, make_error_code(monc_errc::timed_out),
+                 "timed out"sv, {});
   return 0;
 }
 
@@ -1386,8 +1395,9 @@ void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
 {
   ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
                 << dendl;
-  ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
-                   std::move(bl));
+  asio::post(service.get_executor(),
+            asio::append(std::move(r->onfinish), ret, std::string(rs),
+                         std::move(bl)));
   if (r->target_con) {
     r->target_con->mark_down();
   }
@@ -1409,8 +1419,9 @@ void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
     ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
                   << m->version << dendl;
     version_requests.erase(iter);
-    ceph::async::post(std::move(req), bs::error_code(),
-                     m->version, m->oldest_version);
+    asio::post(service.get_executor(),
+              asio::append(std::move(req), bs::error_code(),
+                           m->version, m->oldest_version));
   }
   m->put();
 }
index e725414f8bfc92e3f73ee1a1d4f5617945be9441..2beee15edae9efca25ea5c394e805928025e73ca 100644 (file)
 #include <string>
 #include <vector>
 
+#include <boost/asio/append.hpp>
+#include <boost/asio/consign.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/any_completion_handler.hpp>
 #include <boost/asio/steady_timer.hpp>
 #include <boost/asio/strand.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/post.hpp>
 
 #include "msg/Messenger.h"
 
@@ -32,7 +37,6 @@
 #include "MonSub.h"
 
 #include "common/admin_socket.h"
-#include "common/async/completion.h"
 #include "common/strtol.h" // for strict_strtoll()
 #include "common/Timer.h"
 #include "common/config.h"
@@ -281,11 +285,11 @@ class MonClient : public Dispatcher,
 public:
   // Error, Newest, Oldest
   using VersionSig = void(boost::system::error_code, version_t, version_t);
-  using VersionCompletion = ceph::async::Completion<VersionSig>;
+  using VersionCompletion = boost::asio::any_completion_handler<VersionSig>;
 
   using CommandSig = void(boost::system::error_code, std::string,
                          ceph::buffer::list);
-  using CommandCompletion = ceph::async::Completion<CommandSig>;
+  using CommandCompletion = boost::asio::any_completion_handler<CommandSig>;
 
   MonMap monmap;
   std::map<std::string,std::string> config_mgr;
@@ -569,10 +573,10 @@ private:
     uint64_t tid;
     std::vector<std::string> cmd;
     ceph::buffer::list inbl;
-    std::unique_ptr<CommandCompletion> onfinish;
+    CommandCompletion onfinish;
     std::optional<boost::asio::steady_timer> cancel_timer;
 
-    MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
+    MonCommand(MonClient& monc, uint64_t t, CommandCompletion onfinish)
       : tid(t), onfinish(std::move(onfinish)) {
       auto timeout =
           monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
@@ -607,86 +611,109 @@ private:
 
 public:
   template<typename CompletionToken>
-  auto start_mon_command(const std::vector<std::string>& cmd,
-                         const ceph::buffer::list& inbl,
+  auto start_mon_command(std::vector<std::string> cmd,
+                         ceph::buffer::list inbl,
                         CompletionToken&& token) {
+    namespace asio = boost::asio;
     ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
-    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
-    {
-      std::scoped_lock l(monc_lock);
-      auto h = CommandCompletion::create(service.get_executor(),
-                                        std::move(init.completion_handler));
-      if (!initialized || stopping) {
-       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
-                         bufferlist{});
-      } else {
-       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
-       r->cmd = cmd;
-       r->inbl = inbl;
-       mon_commands.emplace(r->tid, r);
-       _send_command(r);
-      }
-    }
-    return init.result.get();
+    auto consigned = asio::consign(
+      std::forward<CompletionToken>(token), asio::make_work_guard(
+       asio::get_associated_executor(token, service.get_executor())));
+    return asio::async_initiate<decltype(consigned), CommandSig>(
+      [this, cmd = std::move(cmd),
+       inbl = std::move(inbl)](auto handler) mutable {
+       std::scoped_lock l(monc_lock);
+       if (!initialized || stopping) {
+         asio::dispatch(
+           asio::get_associated_immediate_executor(handler,
+                                                   service.get_executor()),
+           asio::append(std::move(handler),
+                        make_error_code(monc_errc::shutting_down),
+                        std::string{}, bufferlist{}));
+       } else {
+         auto r = new MonCommand(*this, ++last_mon_command_tid,
+                                 std::move(handler));
+         r->cmd = std::move(cmd);
+         r->inbl = std::move(inbl);
+         mon_commands.emplace(r->tid, r);
+         _send_command(r);
+       }
+      }, consigned);
   }
 
   template<typename CompletionToken>
-  auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
-                        const ceph::buffer::list& inbl, CompletionToken&& token) {
+  auto start_mon_command(int mon_rank, std::vector<std::string> cmd,
+                        ceph::buffer::list inbl,
+                        CompletionToken&& token) {
+    namespace asio = boost::asio;
+    namespace sys = boost::system;
     ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
-    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
-    {
-      std::scoped_lock l(monc_lock);
-      auto h = CommandCompletion::create(service.get_executor(),
-                                        std::move(init.completion_handler));
-      if (!initialized || stopping) {
-       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
-                         bufferlist{});
-      } else {
-       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
-       r->target_rank = mon_rank;
-       r->cmd = cmd;
-       r->inbl = inbl;
-       mon_commands.emplace(r->tid, r);
-       _send_command(r);
-      }
-    }
-    return init.result.get();
+    auto consigned = asio::consign(
+      std::forward<CompletionToken>(token), asio::make_work_guard(
+       asio::get_associated_executor(token, service.get_executor())));
+    return asio::async_initiate<decltype(consigned), CommandSig>(
+      [this, mon_rank, cmd = std::move(cmd),
+       inbl = std::move(inbl)](auto handler) mutable {
+       std::scoped_lock l(monc_lock);
+       if (!initialized || stopping) {
+         asio::dispatch(
+           asio::get_associated_immediate_executor(handler,
+                                                   service.get_executor()),
+           asio::append(std::move(handler),
+                        make_error_code(monc_errc::shutting_down),
+                        std::string{}, bufferlist{}));
+       } else {
+         auto r = new MonCommand(*this, ++last_mon_command_tid,
+                                 std::move(handler));
+         r->target_rank = mon_rank;
+         r->cmd = std::move(cmd);
+         r->inbl = std::move(inbl);
+         mon_commands.emplace(r->tid, r);
+         _send_command(r);
+       }
+      }, consigned);
   }
 
   template<typename CompletionToken>
-  auto start_mon_command(const std::string& mon_name,
-                         const std::vector<std::string>& cmd,
-                        const ceph::buffer::list& inbl,
+  auto start_mon_command(std::string mon_name,
+                         std::vector<std::string> cmd,
+                        ceph::buffer::list inbl,
                         CompletionToken&& token) {
+    namespace asio = boost::asio;
     ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
-    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
-    {
-      std::scoped_lock l(monc_lock);
-      auto h = CommandCompletion::create(service.get_executor(),
-                                        std::move(init.completion_handler));
-      if (!initialized || stopping) {
-       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
-                         bufferlist{});
-      } else {
-       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
-       // detect/tolerate mon *rank* passed as a string
-       std::string err;
-       int rank = strict_strtoll(mon_name.c_str(), 10, &err);
-       if (err.size() == 0 && rank >= 0) {
-         ldout(cct,10) << __func__ << " interpreting name '" << mon_name
-                       << "' as rank " << rank << dendl;
-         r->target_rank = rank;
+    auto consigned = asio::consign(
+      std::forward<CompletionToken>(token), asio::make_work_guard(
+       asio::get_associated_executor(token, service.get_executor())));
+    return asio::async_initiate<decltype(consigned), CommandSig>(
+      [this, mon_name = std::move(mon_name), cmd = std::move(cmd),
+       inbl = std::move(inbl)](auto handler) mutable {
+       std::scoped_lock l(monc_lock);
+       if (!initialized || stopping) {
+         asio::dispatch(
+           asio::get_associated_immediate_executor(handler,
+                                                   service.get_executor()),
+           asio::append(std::move(handler),
+                        make_error_code(monc_errc::shutting_down),
+                        std::string{}, bufferlist{}));
        } else {
-         r->target_name = mon_name;
+         auto r = new MonCommand(*this, ++last_mon_command_tid,
+                                 std::move(handler));
+         // detect/tolerate mon *rank* passed as a string
+         std::string err;
+         int rank = strict_strtoll(mon_name.c_str(), 10, &err);
+         if (err.size() == 0 && rank >= 0) {
+           ldout(cct,10) << __func__ << " interpreting name '" << mon_name
+                         << "' as rank " << rank << dendl;
+           r->target_rank = rank;
+         } else {
+           r->target_name = std::move(mon_name);
+         }
+         r->cmd = std::move(cmd);
+         r->inbl = std::move(inbl);
+         mon_commands.emplace(r->tid, r);
+         _send_command(r);
        }
-       r->cmd = cmd;
-       r->inbl = inbl;
-       mon_commands.emplace(r->tid, r);
-       _send_command(r);
-      }
-    }
-    return init.result.get();
+      }, consigned);
   }
 
   class ContextVerter {
@@ -715,22 +742,24 @@ public:
     }
   };
 
-  void start_mon_command(const std::vector<std::string>& cmd, const bufferlist& inbl,
+  void start_mon_command(std::vector<std::string> cmd, bufferlist inbl,
                         bufferlist *outbl, std::string *outs,
                         Context *onfinish) {
-    start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
+    start_mon_command(std::move(cmd), std::move(inbl),
+                     ContextVerter(outs, outbl, onfinish));
   }
-  void start_mon_command(int mon_rank,
-                        const std::vector<std::string>& cmd, const bufferlist& inbl,
-                        bufferlist *outbl, std::string *outs,
+  void start_mon_command(int mon_rank, std::vector<std::string> cmd,
+                        bufferlist inbl, bufferlist *outbl, std::string *outs,
                         Context *onfinish) {
-    start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+    start_mon_command(mon_rank, std::move(cmd), std::move(inbl),
+                     ContextVerter(outs, outbl, onfinish));
   }
-  void start_mon_command(const std::string &mon_name,  ///< mon name, with mon. prefix
-                        const std::vector<std::string>& cmd, const bufferlist& inbl,
+  void start_mon_command(std::string mon_name,  ///< mon name, with mon. prefix
+                        std::vector<std::string> cmd, bufferlist inbl,
                         bufferlist *outbl, std::string *outs,
                         Context *onfinish) {
-    start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+    start_mon_command(std::move(mon_name), std::move(cmd), std::move(inbl),
+                     ContextVerter(outs, outbl, onfinish));
   }
 
 
@@ -747,19 +776,19 @@ public:
    */
   template<typename CompletionToken>
   auto get_version(std::string&& map, CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken, VersionSig> init(token);
-    {
-      std::scoped_lock l(monc_lock);
-      auto m = ceph::make_message<MMonGetVersion>();
-      m->what = std::move(map);
-      m->handle = ++version_req_id;
-      version_requests.emplace(m->handle,
-                              VersionCompletion::create(
-                                service.get_executor(),
-                                std::move(init.completion_handler)));
-      _send_mon_message(m);
-    }
-    return init.result.get();
+    namespace asio = boost::asio;
+    auto consigned = asio::consign(
+      std::forward<CompletionToken>(token), asio::make_work_guard(
+       asio::get_associated_executor(token, service.get_executor())));
+    return asio::async_initiate<decltype(consigned), VersionSig>(
+      [this, map = std::move(map)](auto handler) mutable {
+       std::scoped_lock l(monc_lock);
+       auto m = ceph::make_message<MMonGetVersion>();
+       m->what = std::move(map);
+       m->handle = ++version_req_id;
+       version_requests.emplace(m->handle, std::move(handler));
+       _send_mon_message(m);
+      }, consigned);
   }
 
   /**
@@ -781,7 +810,7 @@ public:
 
 private:
 
-  std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
+  std::map<ceph_tid_t, VersionCompletion> version_requests;
   ceph_tid_t version_req_id;
   void handle_get_version_reply(MMonGetVersionReply* m);
   md_config_t::config_callback config_cb;
index f0b100bf96da677b6fec2011d0d3fbad1fd850e8..c48a81ba7646d21a2e4055c711e24e2d0f1a9bff 100644 (file)
@@ -1009,7 +1009,8 @@ void RADOS::lookup_pool_(std::string name, LookupPoolComp c)
            return osdmap.lookup_pg_pool_name(name);
          });
        if (ret < 0)
-         asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne,
+         asio::dispatch(asio::append(std::move(c),
+                                     make_error_code(osdc_errc::pool_dne),
                                      std::int64_t(0)));
        else
          asio::dispatch(asio::append(std::move(c), bs::error_code{}, ret));
@@ -1113,7 +1114,7 @@ bool RADOS::get_self_managed_snaps_mode(std::int64_t pool) const {
   return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) {
     const auto pgpool = osdmap.get_pg_pool(pool);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     return pgpool->is_unmanaged_snaps_mode();
   });
@@ -1123,11 +1124,11 @@ bool RADOS::get_self_managed_snaps_mode(std::string_view pool) const {
   return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) {
     int64_t poolid = osdmap.lookup_pg_pool_name(pool);
     if (poolid < 0) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     const auto pgpool = osdmap.get_pg_pool(poolid);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     return pgpool->is_unmanaged_snaps_mode();
   });
@@ -1151,11 +1152,11 @@ std::vector<std::uint64_t> RADOS::list_snaps(std::string_view pool) const {
   return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) {
     int64_t poolid = osdmap.lookup_pg_pool_name(pool);
     if (poolid < 0) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     const auto pgpool = osdmap.get_pg_pool(poolid);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     std::vector<std::uint64_t> snaps;
     for (const auto& [snapid, snapinfo] : pgpool->snaps) {
@@ -1169,12 +1170,12 @@ std::uint64_t RADOS::lookup_snap(std::int64_t pool, std::string_view snap) const
   return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
     const auto pgpool = osdmap.get_pg_pool(pool);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     for (const auto& [id, snapinfo] : pgpool->snaps) {
       if (snapinfo.name == snap) return id;
     }
-    throw bs::system_error(bs::error_code(errc::snap_dne));
+    throw bs::system_error(errc::snap_dne);
   });
 }
 
@@ -1182,16 +1183,16 @@ std::uint64_t RADOS::lookup_snap(std::string_view pool, std::string_view snap) c
   return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
     int64_t poolid = osdmap.lookup_pg_pool_name(pool);
     if (poolid < 0) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     const auto pgpool = osdmap.get_pg_pool(poolid);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     for (const auto& [id, snapinfo] : pgpool->snaps) {
       if (snapinfo.name == snap) return id;
     }
-    throw bs::system_error(bs::error_code(errc::snap_dne));
+    throw bs::system_error(errc::snap_dne);
   });
 }
 
@@ -1199,10 +1200,10 @@ std::string RADOS::get_snap_name(std::int64_t pool, std::uint64_t snap) const {
   return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
     const auto pgpool = osdmap.get_pg_pool(pool);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
-      throw bs::system_error(bs::error_code(errc::snap_dne));
+      throw bs::system_error(errc::snap_dne);
     } else {
       return i->second.name;
     }
@@ -1213,14 +1214,14 @@ std::string RADOS::get_snap_name(std::string_view pool,
   return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
     int64_t poolid = osdmap.lookup_pg_pool_name(pool);
     if (poolid < 0) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     const auto pgpool = osdmap.get_pg_pool(poolid);
     if (!pgpool) {
       throw bs::system_error(bs::error_code(errc::pool_dne));
     }
     if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
-      throw bs::system_error(bs::error_code(errc::snap_dne));
+      throw bs::system_error(errc::snap_dne);
     } else {
       return i->second.name;
     }
@@ -1232,10 +1233,10 @@ ceph::real_time RADOS::get_snap_timestamp(std::int64_t pool,
   return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
     const auto pgpool = osdmap.get_pg_pool(pool);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
-      throw bs::system_error(bs::error_code(errc::snap_dne));
+      throw bs::system_error(errc::snap_dne);
     } else {
       return i->second.stamp.to_real_time();
     }
@@ -1246,14 +1247,14 @@ ceph::real_time RADOS::get_snap_timestamp(std::string_view pool,
   return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
     int64_t poolid = osdmap.lookup_pg_pool_name(pool);
     if (poolid < 0) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     const auto pgpool = osdmap.get_pg_pool(poolid);
     if (!pgpool) {
-      throw bs::system_error(bs::error_code(errc::pool_dne));
+      throw bs::system_error(errc::pool_dne);
     }
     if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
-      throw bs::system_error(bs::error_code(errc::snap_dne));
+      throw bs::system_error(errc::snap_dne);
     } else {
       return i->second.stamp.to_real_time();
     }
@@ -1575,7 +1576,7 @@ void RADOS::next_notification_(uint64_t cookie, NextNotificationComp c) {
       n->add_handler(id, std::move(c));
     } catch (const std::bad_any_cast&) {
       dispatch(asio::append(std::move(c),
-                           bs::error_code(errc::polled_callback_watch),
+                           make_error_code(errc::polled_callback_watch),
                            Notification{}));
     }
 }
index 203eab51d225f6d0ea6c28a190a5e0cb276f4846..c45770ac9498f4a1626a20ee7cbe62c91d9aa6ba 100644 (file)
@@ -1598,7 +1598,7 @@ void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *s
                     << " dne" << dendl;
       if (op->has_completion()) {
        num_in_flight--;
-       op->complete(osdc_errc::pool_dne, -ENOENT, service.get_executor());
+       op->complete(make_error_code(osdc_errc::pool_dne), -ENOENT, service.get_executor());
       }
 
       OSDSession *s = op->session;
@@ -1633,7 +1633,8 @@ void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *s
                 << " has eio" << dendl;
   if (op->has_completion()) {
     num_in_flight--;
-    op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
+    op->complete(make_error_code(osdc_errc::pool_eio), -EIO,
+                service.get_executor());
   }
 
   OSDSession *s = op->session;
@@ -1733,13 +1734,15 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
       if (op->on_reg_commit) {
        asio::defer(service.get_executor(),
                    asio::append(std::move(op->on_reg_commit),
-                                osdc_errc::pool_dne, cb::list{}));
+                                make_error_code(osdc_errc::pool_dne),
+                                cb::list{}));
        op->on_reg_commit = nullptr;
       }
       if (op->on_notify_finish) {
        asio::defer(service.get_executor(),
                    asio::append(std::move(op->on_notify_finish),
-                                osdc_errc::pool_dne, cb::list{}));
+                                make_error_code(osdc_errc::pool_dne),
+                                cb::list{}));
         op->on_notify_finish = nullptr;
       }
       *need_unregister = true;
@@ -1757,12 +1760,12 @@ void Objecter::_check_linger_pool_eio(LingerOp *op)
   if (op->on_reg_commit) {
     asio::defer(service.get_executor(),
                asio::append(std::move(op->on_reg_commit),
-                            osdc_errc::pool_dne, cb::list{}));
+                            make_error_code(osdc_errc::pool_dne), cb::list{}));
   }
   if (op->on_notify_finish) {
     asio::defer(service.get_executor(),
                asio::append(std::move(op->on_notify_finish),
-                            osdc_errc::pool_dne, cb::list{}));
+                            make_error_code(osdc_errc::pool_dne), cb::list{}));
   }
 }
 
@@ -2456,7 +2459,8 @@ void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_t
     break;
   case RECALC_OP_TARGET_POOL_EIO:
     if (op->has_completion()) {
-      op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
+      op->complete(make_error_code(osdc_errc::pool_eio), -EIO,
+                  service.get_executor());
     }
     return;
   }
@@ -4014,7 +4018,8 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
   if (!p) {
     asio::defer(service.get_executor(),
-               asio::append(std::move(onfinish), osdc_errc::pool_dne, cb::list{}));
+               asio::append(std::move(onfinish),
+                            make_error_code(osdc_errc::pool_dne), cb::list{}));
     return;
   }
   if (p->snap_exists(snap_name)) {
@@ -4083,14 +4088,17 @@ void Objecter::delete_pool_snap(
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
   if (!p) {
     asio::defer(service.get_executor(),
-               asio::append(std::move(onfinish), osdc_errc::pool_dne,
+               asio::append(std::move(onfinish),
+                            make_error_code(osdc_errc::pool_dne),
                             cb::list{}));
     return;
   }
 
   if (!p->snap_exists(snap_name)) {
     asio::defer(service.get_executor(),
-               asio::append(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{}));
+               asio::append(std::move(onfinish),
+                            make_error_code(osdc_errc::snapshot_dne),
+                            cb::list{}));
     return;
   }
 
@@ -4131,7 +4139,8 @@ void Objecter::create_pool(std::string_view name,
 
   if (osdmap->lookup_pg_pool_name(name) >= 0) {
     asio::defer(service.get_executor(),
-               asio::append(std::move(onfinish), osdc_errc::pool_exists,
+               asio::append(std::move(onfinish),
+                            make_error_code(osdc_errc::pool_exists),
                             cb::list{}));
     return;
   }
@@ -4156,7 +4165,8 @@ void Objecter::delete_pool(int64_t pool,
 
   if (!osdmap->have_pg_pool(pool))
     asio::defer(service.get_executor(),
-               asio::append(std::move(onfinish), osdc_errc::pool_dne,
+               asio::append(std::move(onfinish),
+                            make_error_code(osdc_errc::pool_dne),
                             cb::list{}));
   else
     _do_delete_pool(pool, std::move(onfinish));
@@ -4172,7 +4182,8 @@ void Objecter::delete_pool(std::string_view pool_name,
   if (pool < 0)
     // This only returns one error: -ENOENT.
     asio::defer(service.get_executor(),
-               asio::append(std::move(onfinish), osdc_errc::pool_dne,
+               asio::append(std::move(onfinish),
+                            make_error_code(osdc_errc::pool_dne),
                             cb::list{}));
   else
     _do_delete_pool(pool, std::move(onfinish));
index edf0f4d285ec3f37b0680c60fe7d1248a7c16b94..e9fdcf07cb91ca04f35d42c333c3cb459a432596 100644 (file)
@@ -25,6 +25,7 @@
 #include <variant>
 
 #include <boost/container/small_vector.hpp>
+#include <boost/asio/bind_executor.hpp>
 #include <boost/asio/any_completion_handler.hpp>
 #include <boost/asio/append.hpp>
 #include <boost/asio/async_result.hpp>
@@ -2892,9 +2893,9 @@ public:
       }, consigned);
   }
 
-  auto wait_for_latest_osdmap(std::unique_ptr<ceph::async::Completion<OpSignature>> c) {
+  auto wait_for_latest_osdmap(boost::asio::any_completion_handler<OpSignature> c) {
     wait_for_latest_osdmap([c = std::move(c)](boost::system::error_code e) mutable {
-      c->dispatch(std::move(c), e);
+      boost::asio::dispatch(boost::asio::append(std::move(c), e));
     });
   }
 
@@ -3067,18 +3068,6 @@ public:
     op_submit(o);
   }
 
-  void mutate(const object_t& oid, const object_locator_t& oloc,
-             ObjectOperation&& op, const SnapContext& snapc,
-             ceph::real_time mtime, int flags,
-             std::unique_ptr<ceph::async::Completion<Op::OpSig>> oncommit,
-             version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
-             ZTracer::Trace *parent_trace = nullptr) {
-    mutate(oid, oloc, std::move(op), snapc, mtime, flags,
-          [c = std::move(oncommit)](boost::system::error_code ec) mutable {
-            c->dispatch(std::move(c), ec);
-          }, objver, reqid, parent_trace);
-  }
-
   Op *prepare_read_op(
     const object_t& oid, const object_locator_t& oloc,
     ObjectOperation& op,
@@ -3143,18 +3132,6 @@ public:
     op_submit(o);
   }
 
-  void read(const object_t& oid, const object_locator_t& oloc,
-           ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
-           int flags, std::unique_ptr<ceph::async::Completion<Op::OpSig>> onack,
-           version_t *objver = nullptr, int *data_offset = nullptr,
-           uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
-    read(oid, oloc, std::move(op), snapid, pbl, flags,
-        [c = std::move(onack)](boost::system::error_code e) mutable {
-          c->dispatch(std::move(c), e);
-        }, objver, data_offset, features, parent_trace);
-  }
-
-
   Op *prepare_pg_read_op(
     uint32_t hash, object_locator_t oloc,
     ObjectOperation& op, ceph::buffer::list *pbl, int flags,
index ffc6a5c3bd9d2fc6d86e87ed3e9ee7817562819c..521241b9f163037a984e75b93c6890b6359bfd2e 100644 (file)
@@ -446,15 +446,11 @@ int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
   rados = &store->get_neorados();
   try {
     // Blocking in startup code, not ideal, but won't hurt anything.
-    std::exception_ptr eptr
-      = asio::co_spawn(store->get_io_context(),
-                      start(dpp, zoneparams.log_pool,
-                            background_tasks, background_tasks,
-                            background_tasks),
-                      async::use_blocked);
-    if (eptr) {
-      std::rethrow_exception(eptr);
-    }
+    asio::co_spawn(store->get_io_context(),
+                  start(dpp, zoneparams.log_pool,
+                        background_tasks, background_tasks,
+                        background_tasks),
+                  async::use_blocked);
   } catch (const sys::system_error& e) {
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Failed to start datalog: " << e.what()
@@ -464,7 +460,7 @@ int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                       << ": Failed to start datalog: " << e.what()
                       << dendl;
-    return -EIO;
+    return ceph::from_exception(std::current_exception());
   }
   return 0;
 }
@@ -1018,23 +1014,21 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
                                 const rgw::bucket_log_layout_generation& gen,
                                 int shard_id, optional_yield y)
 {
-  std::exception_ptr eptr;
-  if (y) {
-    try {
+  try {
+    if (y) {
       add_entry(dpp, bucket_info, gen, shard_id, y.get_yield_context());
-    } catch (const std::exception&) {
-      eptr = std::current_exception();
+    } else {
+      maybe_warn_about_blocking(dpp);
+      asio::spawn(rados->get_executor(),
+                 [this, dpp, &bucket_info, &gen,
+                  &shard_id](asio::yield_context y) {
+                   add_entry(dpp, bucket_info, gen, shard_id, y);
+                 }, async::use_blocked);
     }
-  } else {
-    maybe_warn_about_blocking(dpp);
-    eptr = asio::spawn(rados->get_executor(),
-                      [this, dpp, &bucket_info, &gen,
-                       &shard_id](asio::yield_context y) {
-                        add_entry(dpp, bucket_info, gen, shard_id, y);
-                      },
-                      async::use_blocked);
-  }
-  return ceph::from_exception(eptr);
+  } catch (const std::exception&) {
+    return ceph::from_exception(std::current_exception());
+  }
+  return 0;
 }
 
 asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
@@ -1117,7 +1111,6 @@ int RGWDataChangesLog::list_entries(
   std::string_view marker, std::string* out_marker, bool* truncated,
   std::string* errstr, optional_yield y)
 {
-  std::exception_ptr eptr;
   std::tuple<std::span<rgw_data_change_log_entry>,
             std::string> out;
   if (shard >= num_shards) [[unlikely]] {
@@ -1130,25 +1123,22 @@ int RGWDataChangesLog::list_entries(
   if (std::ssize(entries) < max_entries) {
     entries.resize(max_entries);
   }
-  if (y) {
-    auto& yield = y.get_yield_context();
-    try {
+  try {
+    if (y) {
+      auto& yield = y.get_yield_context();
       out = asio::co_spawn(yield.get_executor(),
                           bes->list(dpp, shard, entries,
                                     std::string{marker}),
                           yield);
-    } catch (const std::exception&) {
-      eptr = std::current_exception();
+    } else {
+      maybe_warn_about_blocking(dpp);
+      out = asio::co_spawn(rados->get_executor(),
+                          bes->list(dpp, shard, entries,
+                                    std::string{marker}),
+                          async::use_blocked);
     }
-  } else {
-    maybe_warn_about_blocking(dpp);
-    std::tie(eptr, out) = asio::co_spawn(rados->get_executor(),
-                                        bes->list(dpp, shard, entries,
-                                                  std::string{marker}),
-                                        async::use_blocked);
-  }
-  if (eptr) {
-    return ceph::from_exception(eptr);
+  } catch (const std::exception&) {
+    return ceph::from_exception(std::current_exception());
   }
   auto& [outries, outmark] = out;
   if (auto size = std::ssize(outries); size < std::ssize(entries)) {
@@ -1202,32 +1192,27 @@ int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entrie
                                    RGWDataChangesLogMarker& marker, bool *ptruncated,
                                    optional_yield y)
 {
-  std::exception_ptr eptr;
   std::tuple<std::vector<rgw_data_change_log_entry>,
             RGWDataChangesLogMarker> out;
   if (std::ssize(entries) < max_entries) {
     entries.resize(max_entries);
   }
-  if (y) {
-    auto& yield = y.get_yield_context();
-    try {
+  try {
+    if (y) {
+      auto& yield = y.get_yield_context();
       out = asio::co_spawn(yield.get_executor(),
                           list_entries(dpp, max_entries,
                                        RGWDataChangesLogMarker{marker}),
                           yield);
-    } catch (const std::exception&) {
-      eptr = std::current_exception();
-    }
   } else {
-    maybe_warn_about_blocking(dpp);
-    std::tie(eptr, out) =
-      asio::co_spawn(rados->get_executor(),
-                    list_entries(dpp, max_entries,
-                                 RGWDataChangesLogMarker{marker}),
-                    async::use_blocked);
-  }
-  if (eptr) {
-    return ceph::from_exception(eptr);
+      maybe_warn_about_blocking(dpp);
+      out = asio::co_spawn(rados->get_executor(),
+                          list_entries(dpp, max_entries,
+                                       RGWDataChangesLogMarker{marker}),
+                          async::use_blocked);
+    }
+  } catch (const std::exception&) {
+    return ceph::from_exception(std::current_exception());
   }
   auto& [outries, outmark] = out;
   if (auto size = std::ssize(outries); size < std::ssize(entries)) {
@@ -1252,26 +1237,25 @@ int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id,
     }
   }
   auto be = bes->head();
-  std::exception_ptr eptr;
-  if (y) {
-    auto& yield = y.get_yield_context();
-    try {
+  try {
+    if (y) {
+      auto& yield = y.get_yield_context();
       *info = asio::co_spawn(yield.get_executor(),
                             be->get_info(dpp, shard_id),
                             yield);
-    } catch (const std::exception&) {
-      eptr = std::current_exception();
+    } else {
+      maybe_warn_about_blocking(dpp);
+      *info = asio::co_spawn(rados->get_executor(),
+                            be->get_info(dpp, shard_id),
+                            async::use_blocked);
     }
-  } else {
-    maybe_warn_about_blocking(dpp);
-    std::tie(eptr, *info) = asio::co_spawn(rados->get_executor(),
-                                          be->get_info(dpp, shard_id),
-                                          async::use_blocked);
+  } catch (const std::exception&) {
+    return ceph::from_exception(std::current_exception());
   }
   if (!info->marker.empty()) {
     info->marker = gencursor(be->gen_id, info->marker);
   }
-  return ceph::from_exception(eptr);
+  return 0;
 }
 
 asio::awaitable<void> DataLogBackends::trim_entries(
@@ -1312,23 +1296,22 @@ int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
        shard_id, num_shards);
     }
   }
-  std::exception_ptr eptr;
-  if (y) {
-    auto& yield = y.get_yield_context();
-    try {
+  try {
+    if (y) {
+      auto& yield = y.get_yield_context();
       asio::co_spawn(yield.get_executor(),
                     bes->trim_entries(dpp, shard_id, marker),
                     yield);
-    } catch (const std::exception& e) {
-      eptr = std::current_exception();
+    } else {
+      maybe_warn_about_blocking(dpp);
+      asio::co_spawn(rados->get_executor(),
+                    bes->trim_entries(dpp, shard_id, marker),
+                    async::use_blocked);
     }
-  } else {
-    maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(rados->get_executor(),
-                         bes->trim_entries(dpp, shard_id, marker),
-                         async::use_blocked);
+  } catch (const std::exception& e) {
+    return ceph::from_exception(std::current_exception());
   }
-  return ceph::from_exception(eptr);
+  return 0;
 }
 
 int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
@@ -1443,12 +1426,9 @@ RGWDataChangesLog::~RGWDataChangesLog() {
 void RGWDataChangesLog::blocking_shutdown() {
   if (!down_flag) {
     try {
-      auto eptr = asio::co_spawn(rados->get_io_context(),
-                                shutdown_or_timeout(),
-                                async::use_blocked);
-      if (eptr) {
-       std::rethrow_exception(eptr);
-      }
+      asio::co_spawn(rados->get_io_context(),
+                    shutdown_or_timeout(),
+                    async::use_blocked);
     } catch (const sys::system_error& e) {
       lderr(cct) << __PRETTY_FUNCTION__
                 << ": Failed to shutting down: " << e.what()
@@ -1553,47 +1533,47 @@ std::string RGWDataChangesLog::max_marker() const {
 }
 
 int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp,
-                                    log_type type,optional_yield y) {
-  std::exception_ptr eptr;
-  if (y) {
-    auto& yield = y.get_yield_context();
-    try {
+                                    log_type type,optional_yield y)
+{
+  try {
+    if (y) {
+      auto& yield = y.get_yield_context();
       asio::co_spawn(yield.get_executor(),
                     bes->new_backing(dpp, type),
                     yield);
-    } catch (const std::exception&) {
-      eptr = std::current_exception();
+    } else {
+      maybe_warn_about_blocking(dpp);
+      asio::co_spawn(rados->get_executor(),
+                    bes->new_backing(dpp, type),
+                    async::use_blocked);
     }
-  } else {
-    maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(rados->get_executor(),
-                         bes->new_backing(dpp, type),
-                         async::use_blocked);
+  } catch (const std::exception&) {
+    return ceph::from_exception(std::current_exception());
   }
-  return ceph::from_exception(eptr);
+  return 0;;
 }
 
 int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
                                        std::optional<uint64_t>& through,
-                                       optional_yield y) {
-  std::exception_ptr eptr;
-  if (y) {
-    auto& yield = y.get_yield_context();
-    try {
+                                       optional_yield y)
+{
+  try {
+    if (y) {
+      auto& yield = y.get_yield_context();
       asio::co_spawn(yield.get_executor(),
                     bes->trim_generations(dpp, through),
                     yield);
-    } catch (const std::exception& e) {
-      eptr = std::current_exception();
+    } else {
+      maybe_warn_about_blocking(dpp);
+      asio::co_spawn(rados->get_executor(),
+                    bes->trim_generations(dpp, through),
+                    async::use_blocked);
     }
-
-  } else {
-    maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(rados->get_executor(),
-                         bes->trim_generations(dpp, through),
-                         async::use_blocked);
+  } catch (const std::exception& e) {
+    return ceph::from_exception(std::current_exception());
   }
-  return ceph::from_exception(eptr);
+
+  return 0;
 }
 
 asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
index 4e9ac3abfd3b990f3497e6ba464152f5fedc296f..5826ca505ffeeb8aaff90adc792bdf204b6efaaf 100644 (file)
@@ -3532,13 +3532,9 @@ void init_realm_param(CephContext *cct, string& var, std::optional<string>& opt_
 int run_coro(asio::awaitable<void> coro, std::string_view name) {
   try {
     // Blocking in startup code, not ideal, but won't hurt anything.
-    std::exception_ptr eptr
-      = asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
-                      std::move(coro),
-                      async::use_blocked);
-    if (eptr) {
-      std::rethrow_exception(eptr);
-    }
+    asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
+                  std::move(coro),
+                  async::use_blocked);
   } catch (boost::system::system_error& e) {
     ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
     return ceph::from_error_code(e.code());
index 14c91e4fbe0de4fce92cecfe764716a0d9dda81d..72def49a1f43dbcf171447f5adc2ed53f1cf71e2 100644 (file)
  */
 
 
+#include <boost/asio/append.hpp>
+#include <boost/asio/async_result.hpp>
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/post.hpp>
 #include <boost/asio/steady_timer.hpp>
+
 #include <boost/system/error_code.hpp>
 
 #include <gtest/gtest.h>
 
-#include "common/async/bind_handler.h"
 #include "common/async/blocked_completion.h"
-#include "common/async/forward_handler.h"
-
 using namespace std::literals;
 
-namespace ba = boost::asio;
-namespace bs = boost::system;
-namespace ca = ceph::async;
+namespace asio = boost::asio;
+namespace sys = boost::system;
+namespace async = ceph::async;
 
 class context_thread {
-  ba::io_context c;
-  ba::executor_work_guard<ba::io_context::executor_type> guard;
+  asio::io_context c;
+  asio::executor_work_guard<asio::io_context::executor_type> guard;
   std::thread th;
 
 public:
   context_thread() noexcept
-    : guard(ba::make_work_guard(c)),
+    : guard(asio::make_work_guard(c)),
       th([this]() noexcept { c.run();}) {}
 
   ~context_thread() {
@@ -45,11 +45,11 @@ public:
     th.join();
   }
 
-  ba::io_context& io_context() noexcept {
+  asio::io_context& io_context() noexcept {
     return c;
   }
 
-  ba::io_context::executor_type get_executor() noexcept {
+  asio::io_context::executor_type get_executor() noexcept {
     return c.get_executor();
   }
 };
@@ -71,53 +71,53 @@ template<typename Executor, typename CompletionToken, typename... Args>
 auto id(const Executor& executor, CompletionToken&& token,
        Args&& ...args)
 {
-  ba::async_completion<CompletionToken, void(Args...)> init(token);
-  boost::asio::post(ca::forward_handler(
-                 ca::bind_handler(std::move(init.completion_handler),
-                                  std::forward<Args>(args)...)));
-  return init.result.get();
+  return asio::async_initiate<CompletionToken, void(Args...)>(
+    []<typename ...Args2>(auto handler, Args2&& ...args2) mutable {
+      asio::post(asio::append(std::move(handler),
+                             std::forward<Args2>(args2)...));
+    }, token, std::forward<Args>(args)...);
 }
 
 TEST(BlockedCompletion, Void)
 {
   context_thread t;
 
-  ba::post(t.get_executor(), ca::use_blocked);
+  asio::post(t.get_executor(), async::use_blocked);
 }
 
 TEST(BlockedCompletion, Timer)
 {
   context_thread t;
-  ba::steady_timer timer(t.io_context(), 50ms);
-  timer.async_wait(ca::use_blocked);
+  asio::steady_timer timer(t.io_context(), 50ms);
+  timer.async_wait(async::use_blocked);
 }
 
 TEST(BlockedCompletion, NoError)
 {
   context_thread t;
-  ba::steady_timer timer(t.io_context(), 1s);
-  bs::error_code ec;
+  asio::steady_timer timer(t.io_context(), 1s);
+  sys::error_code ec;
 
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, bs::error_code{}));
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}));
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked, sys::error_code{}));
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], sys::error_code{}));
   EXPECT_FALSE(ec);
 
   int i;
-  EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked,
-                        bs::error_code{}, 5));
+  EXPECT_NO_THROW(i = id(t.get_executor(), async::use_blocked,
+                        sys::error_code{}, 5));
   ASSERT_EQ(5, i);
-  EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked[ec],
-                        bs::error_code{}, 7));
+  EXPECT_NO_THROW(
+      i = id(t.get_executor(), async::use_blocked[ec], sys::error_code{}, 7));
   EXPECT_FALSE(ec);
   ASSERT_EQ(7, i);
 
   float j;
 
-  EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked, 9,
+  EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), async::use_blocked, 9,
                                      3.5));
   ASSERT_EQ(9, i);
   ASSERT_EQ(3.5, j);
-  EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked[ec],
+  EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), async::use_blocked[ec],
                                      11, 2.25));
   EXPECT_FALSE(ec);
   ASSERT_EQ(11, i);
@@ -127,111 +127,111 @@ TEST(BlockedCompletion, NoError)
 TEST(BlockedCompletion, AnError)
 {
   context_thread t;
-  ba::steady_timer timer(t.io_context(), 1s);
-  bs::error_code ec;
-
-  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
-                 bs::error_code{EDOM, bs::system_category()}),
-              bs::system_error);
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{EDOM, bs::system_category()}));
-  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
-
-  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
-                 bs::error_code{EDOM, bs::system_category()}, 5),
-              bs::system_error);
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{EDOM, bs::system_category()}, 5));
-  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
-
-  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
-                 bs::error_code{EDOM, bs::system_category()}, 5, 3),
-              bs::system_error);
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{EDOM, bs::system_category()}, 5, 3));
-  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+  asio::steady_timer timer(t.io_context(), 1s);
+  sys::error_code ec;
+
+  EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+                 sys::error_code{EDOM, sys::system_category()}),
+              sys::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{EDOM, sys::system_category()}));
+  EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
+
+  EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+                 sys::error_code{EDOM, sys::system_category()}, 5),
+              sys::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{EDOM, sys::system_category()}, 5));
+  EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
+
+  EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+                 sys::error_code{EDOM, sys::system_category()}, 5, 3),
+              sys::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{EDOM, sys::system_category()}, 5, 3));
+  EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
 }
 
 TEST(BlockedCompletion, MoveOnly)
 {
   context_thread t;
-  ba::steady_timer timer(t.io_context(), 1s);
-  bs::error_code ec;
+  asio::steady_timer timer(t.io_context(), 1s);
+  sys::error_code ec;
 
 
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked,
-                        bs::error_code{}, move_only{}));
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{}, move_only{}));
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked,
+                        sys::error_code{}, move_only{}));
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{}, move_only{}));
   EXPECT_FALSE(ec);
 
   {
-    auto [i, j] = id(t.get_executor(), ca::use_blocked, move_only{}, 5);
+    auto [i, j] = id(t.get_executor(), async::use_blocked, move_only{}, 5);
     EXPECT_EQ(j, 5);
   }
   {
-    auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], move_only{}, 5);
+    auto [i, j] = id(t.get_executor(), async::use_blocked[ec], move_only{}, 5);
     EXPECT_EQ(j, 5);
   }
   EXPECT_FALSE(ec);
 
 
-  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
-                 bs::error_code{EDOM, bs::system_category()}, move_only{}),
-              bs::system_error);
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{EDOM, bs::system_category()}, move_only{}));
-  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+  EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+                 sys::error_code{EDOM, sys::system_category()}, move_only{}),
+              sys::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{EDOM, sys::system_category()}, move_only{}));
+  EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
 
-  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
-                 bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
-              bs::system_error);
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{EDOM, bs::system_category()},
+  EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+                 sys::error_code{EDOM, sys::system_category()}, move_only{}, 3),
+              sys::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{EDOM, sys::system_category()},
                     move_only{}, 3));
-  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+  EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
 }
 
 TEST(BlockedCompletion, DefaultLess)
 {
   context_thread t;
-  ba::steady_timer timer(t.io_context(), 1s);
-  bs::error_code ec;
+  asio::steady_timer timer(t.io_context(), 1s);
+  sys::error_code ec;
 
 
   {
-    auto l = id(t.get_executor(), ca::use_blocked, bs::error_code{}, defaultless{5});
+    auto l = id(t.get_executor(), async::use_blocked, sys::error_code{}, defaultless{5});
     EXPECT_EQ(5, l.a);
   }
   {
-    auto l = id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}, defaultless{7});
+    auto l = id(t.get_executor(), async::use_blocked[ec], sys::error_code{}, defaultless{7});
     EXPECT_EQ(7, l.a);
   }
 
   {
-    auto [i, j] = id(t.get_executor(), ca::use_blocked, defaultless{3}, 5);
+    auto [i, j] = id(t.get_executor(), async::use_blocked, defaultless{3}, 5);
     EXPECT_EQ(i.a, 3);
     EXPECT_EQ(j, 5);
   }
   {
-    auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], defaultless{3}, 5);
+    auto [i, j] = id(t.get_executor(), async::use_blocked[ec], defaultless{3}, 5);
     EXPECT_EQ(i.a, 3);
     EXPECT_EQ(j, 5);
   }
   EXPECT_FALSE(ec);
 
-  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
-                 bs::error_code{EDOM, bs::system_category()}, move_only{}),
-              bs::system_error);
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{EDOM, bs::system_category()}, move_only{}));
-  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
-
-  EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
-                 bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
-              bs::system_error);
-  EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
-                    bs::error_code{EDOM, bs::system_category()},
+  EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+                 sys::error_code{EDOM, sys::system_category()}, move_only{}),
+              sys::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{EDOM, sys::system_category()}, move_only{}));
+  EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
+
+  EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+                 sys::error_code{EDOM, sys::system_category()}, move_only{}, 3),
+              sys::system_error);
+  EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+                    sys::error_code{EDOM, sys::system_category()},
                     move_only{}, 3));
-  EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+  EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
 }