]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
neorados: Use `asio::any_completion_handler`
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 29 Mar 2023 05:35:22 +0000 (01:35 -0400)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:38:12 +0000 (17:38 -0400)
As we'd like to reduce (and eliminate) internal Ceph dependencies to
the extent possible, now that Boost.Asio has a type-erased handler
type, let's use it.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/include/neorados/RADOS.hpp
src/librados/IoCtxImpl.cc
src/mds/Server.cc
src/neorados/RADOS.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/test/librados_test_stub/NeoradosTestStub.cc

index b6bdbb460b64d0cd92439ef2cc6d4ed1a9b7c690..15bff7df7c40e3bcf6f2e71959caba1117e9a088 100644 (file)
 
 #include "include/neorados/RADOS_Decodable.hpp"
 
-// Needed for type erasure and template support. We can't really avoid
-// it.
-
-#include "common/async/completion.h"
-
 // These are needed for RGW, but in general as a 'shiny new interface'
 // we should try to use forward declarations and provide standard alternatives.
 
@@ -282,7 +277,7 @@ public:
 
   std::size_t size() const;
   using Signature = void(boost::system::error_code);
-  using Completion = ceph::async::Completion<Signature>;
+  using Completion = boost::asio::any_completion_handler<Signature>;
 
   friend std::ostream& operator <<(std::ostream& m, const Op& o);
 protected:
@@ -492,7 +487,7 @@ public:
   }
 
   using BuildSig = void(boost::system::error_code, RADOS);
-  using BuildComp = ceph::async::Completion<BuildSig>;
+  using BuildComp = boost::asio::any_completion_handler<BuildSig>;
   class Builder {
     std::optional<std::string> conf_files;
     std::optional<std::string> cluster;
@@ -525,31 +520,34 @@ public:
       return *this;
     }
 
-    template<typename CompletionToken>
+    template<boost::asio::completion_token_for<BuildSig> CompletionToken>
     auto build(boost::asio::io_context& ioctx, CompletionToken&& token) {
-      return boost::asio::async_initiate<CompletionToken, BuildSig>(
-       [&ioctx, this](auto&& handler) {
-         build(ioctx, BuildComp::create(ioctx.get_executor(),
-                                        std::move(handler)));
-       }, token);
+      auto consigned = boost::asio::consign(
+       std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+         boost::asio::get_associated_executor(token, ioctx.get_executor())));
+      return boost::asio::async_initiate<decltype(consigned), BuildSig>(
+       [&ioctx, this](auto handler) {
+         build_(ioctx, std::move(handler));
+       }, consigned);
     }
 
   private:
-    void build(boost::asio::io_context& ioctx,
-              std::unique_ptr<BuildComp> c);
+    void build_(boost::asio::io_context& ioctx,
+               BuildComp c);
   };
 
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<BuildSig> CompletionToken>
   static auto make_with_cct(CephContext* cct,
                            boost::asio::io_context& ioctx,
                            CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, BuildSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, ioctx.get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), BuildSig>(
       [cct, &ioctx](auto&& handler) {
-       make_with_cct(cct, ioctx,
-                     BuildComp::create(ioctx.get_executor(),
-                                       std::move(handler)));
-      }, token);
+       make_with_cct_(cct, ioctx, std::move(handler));
+      }, consigned);
   }
 
   static RADOS make_with_librados(librados::Rados& rados);
@@ -568,171 +566,190 @@ public:
   executor_type get_executor() const;
   boost::asio::io_context& get_io_context();
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
   auto execute(Object o, IOContext ioc, ReadOp op,
               ceph::buffer::list* bl,
               CompletionToken&& token, uint64_t* objver = nullptr,
               const blkin_trace_info* trace_info = nullptr) {
-    return boost::asio::async_initiate<CompletionToken, Op::Signature>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
       [o = std::move(o), ioc = std::move(ioc), op = std::move(op),
        bl, objver, trace_info, this](auto&& handler) mutable {
-       execute(std::move(o), std::move(ioc), std::move(op), bl,
-               ReadOp::Completion::create(get_executor(),
-                                          std::move(handler)),
-               objver, trace_info);
-      }, token);
+       execute_(std::move(o), std::move(ioc), std::move(op), bl,
+                std::move(handler), objver, trace_info);
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
   auto execute(Object o, IOContext ioc, WriteOp op,
               CompletionToken&& token, uint64_t* objver = nullptr,
               const blkin_trace_info* trace_info = nullptr) {
-    return boost::asio::async_initiate<CompletionToken, Op::Signature>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
       [o = std::move(o), ioc = std::move(ioc), op = std::move(op),
        objver, trace_info, this](auto&& handler) mutable {
-       execute(std::move(o), std::move(ioc), std::move(op),
-               WriteOp::Completion::create(get_executor(),
-                                           std::move(handler)),
-               objver, trace_info);
-      }, token);
+       execute_(std::move(o), std::move(ioc), std::move(op),
+                std::move(handler), objver, trace_info);
+      }, consigned);
   }
 
   boost::uuids::uuid get_fsid() const noexcept;
 
   using LookupPoolSig = void(boost::system::error_code,
                             std::int64_t);
-  using LookupPoolComp = ceph::async::Completion<LookupPoolSig>;
-  template<typename CompletionToken>
+  using LookupPoolComp = boost::asio::any_completion_handler<LookupPoolSig>;
+  template<boost::asio::completion_token_for<LookupPoolSig> CompletionToken>
   auto lookup_pool(std::string name,
                   CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, LookupPoolSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), LookupPoolSig>(
       [name = std::move(name), this](auto&& handler) mutable {
-       lookup_pool(std::move(name),
-                   LookupPoolComp::create(get_executor(),
-                                          std::move(handler)));
-      }, token);
+       lookup_pool_(std::move(name), std::move(handler));
+      }, consigned);
   }
 
   std::optional<uint64_t> get_pool_alignment(int64_t pool_id);
 
   using LSPoolsSig = void(std::vector<std::pair<std::int64_t, std::string>>);
-  using LSPoolsComp = ceph::async::Completion<LSPoolsSig>;
-  template<typename CompletionToken>
+  using LSPoolsComp = boost::asio::any_completion_handler<LSPoolsSig>;
+  template<boost::asio::completion_token_for<LSPoolsSig> CompletionToken>
   auto list_pools(CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, LSPoolsSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), LSPoolsSig>(
       [this](auto&& handler) {
-       list_pools(LSPoolsComp::create(get_executor(),
-                                      std::move(handler)));
-      }, token);
+       list_pools_(std::move(handler));
+      }, consigned);
   }
 
   using SimpleOpSig = void(boost::system::error_code);
-  using SimpleOpComp = ceph::async::Completion<SimpleOpSig>;
-  template<typename CompletionToken>
+  using SimpleOpComp = boost::asio::any_completion_handler<SimpleOpSig>;
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto create_pool_snap(int64_t pool, std::string snap_name,
                        CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, snap_name = std::move(snap_name), this](auto&& handler) mutable {
-       create_pool_snap(pool, std::move(snap_name),
-                        SimpleOpComp::create(get_executor(),
-                                             std::move(handler)));
-      }, token);
+       create_pool_snap_(pool, std::move(snap_name),
+                         std::move(handler));
+      }, consigned);
   }
 
   using SMSnapSig = void(boost::system::error_code, std::uint64_t);
-  using SMSnapComp = ceph::async::Completion<SMSnapSig>;
-  template<typename CompletionToken>
+  using SMSnapComp = boost::asio::any_completion_handler<SMSnapSig>;
+  template<boost::asio::completion_token_for<SMSnapSig> CompletionToken>
   auto allocate_selfmanaged_snap(int64_t pool,
                                 CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SMSnapSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SMSnapSig>(
       [pool, this](auto&& handler) mutable {
-       allocage_selfmanaged_snap(pool,
-                                 SMSnapComp::create(get_executor(),
-                                                    std::move(handler)));
-      }, token);
+       allocage_selfmanaged_snap_(pool, std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_pool_snap(int64_t pool, std::string snap_name,
                        CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, snap_name = std::move(snap_name), this](auto&& handler) mutable {
-       delete_pool_snap(pool, std::move(snap_name),
-                        SimpleOpComp::create(get_executor(),
-                                             std::move(handler)));
-      }, token);
+       delete_pool_snap_(pool, std::move(snap_name),
+                         std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_selfmanaged_snap(int64_t pool, std::string snap_name,
                               CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, snap_name = std::move(snap_name), this](auto&& handler) mutable {
-       delete_selfmanaged_snap(pool, std::move(snap_name),
-                               SimpleOpComp::create(get_executor(),
-                                                    std::move(handler)));
-      }, token);
+       delete_selfmanaged_snap_(pool, std::move(snap_name),
+                                std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto create_pool(std::string name, std::optional<int> crush_rule,
                   CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [name = std::move(name), crush_rule, this](auto&& handler) mutable {
-       create_pool(std::move(name), crush_rule,
-                   SimpleOpComp::create(get_executor(),
-                                        std::move(handler)));
-      }, token);
+       create_pool_(std::move(name), crush_rule,
+                    std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_pool(std::string name,
                   CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [name = std::move(name), this](auto&& handler) mutable {
-       delete_pool(std::move(name),
-                   SimpleOpComp::create(get_executor(),
-                                        std::move(handler)));
-      }, token);
+       delete_pool_(std::move(name), std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto delete_pool(int64_t pool,
                   CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool, this](auto&& handler) mutable {
-       delete_pool(pool,
-                   SimpleOpComp::create(get_executor(),
-                                        std::move(handler)));
-      }, token);
+       delete_pool_(pool, std::move(handler));
+      }, consigned);
   }
 
   using PoolStatSig = void(boost::system::error_code,
                           boost::container::flat_map<std::string,
                                                      PoolStats>, bool);
-  using PoolStatComp = ceph::async::Completion<PoolStatSig>;
-  template<typename CompletionToken>
+  using PoolStatComp = boost::asio::any_completion_handler<PoolStatSig>;
+  template<boost::asio::completion_token_for<PoolStatSig> CompletionToken>
   auto stat_pools(std::vector<std::string> pools,
                  CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, PoolStatSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), PoolStatSig>(
       [pools = std::move(pools), this](auto&& handler) mutable {
-       stat_pools(std::move(pools),
-                  PoolStatComp::create(get_executor(),
-                                       std::move(handler)));
-      }, token);
+       stat_pools_(std::move(pools), std::move(handler));
+      }, consigned);
   }
 
   using StatFSSig = void(boost::system::error_code,
                         FSStats);
-  using StatFSComp = ceph::async::Completion<StatFSSig>;
-  template<typename CompletionToken>
+  using StatFSComp = boost::asio::any_completion_handler<StatFSSig>;
+  template<boost::asio::completion_token_for<StatFSSig> CompletionToken>
   auto statfs(std::optional<int64_t> pool,
              CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, StatFSSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), StatFSSig>(
       [pool, this](auto&& handler) mutable {
-       statfs(pool, StatFSComp::create(get_executor(),
-                                       std::move(handler)));
-      }, token);
+       statfs_(pool, std::move(handler));
+      }, consigned);
   }
 
   using WatchCB = fu2::unique_function<void(boost::system::error_code,
@@ -743,45 +760,50 @@ public:
 
   using WatchSig = void(boost::system::error_code ec,
                        uint64_t cookie);
-  using WatchComp = ceph::async::Completion<WatchSig>;
-  template<typename CompletionToken>
+  using WatchComp = boost::asio::any_completion_handler<WatchSig>;
+  template<boost::asio::completion_token_for<WatchSig> CompletionToken>
   auto watch(Object o, IOContext ioc,
             std::optional<std::chrono::seconds> timeout,
             WatchCB cb, CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, WatchSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), WatchSig>(
       [o = std::move(o), ioc = std::move(ioc), timeout, cb = std::move(cb),
        this](auto&& handler) mutable {
-       watch(std::move(o), std::move(ioc), timeout, std::move(cb),
-             WatchComp::create(get_executor(),
-                               std::move(handler)));
-      }, token);
+       watch_(std::move(o), std::move(ioc), timeout, std::move(cb),
+              std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto notify_ack(Object o,
                  IOContext ioc,
                  uint64_t notify_id,
                  uint64_t cookie,
                  ceph::buffer::list bl,
                  CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [o = std::move(o), ioc = std::move(ioc), notify_id, cookie,
        bl = std::move(bl), this](auto&& handler) mutable {
-       notify_ack(std::move(o), std::move(ioc), notify_id, std::move(cookie),
-                  std::move(bl), SimpleOpComp::create(get_executor(),
-                                                      std::move(handler)));
-      }, token);
+       notify_ack_(std::move(o), std::move(ioc), std::move(notify_id),
+                   std::move(cookie), std::move(bl), std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto unwatch(std::uint64_t cookie, IOContext ioc,
               CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [cookie, ioc = std::move(ioc), this](auto&& handler) mutable {
-       unwatch(cookie, std::move(ioc),
-               SimpleOpComp::create(get_executor(),
-                                    std::move(handler)));
-      }, token);
+       unwatch_(cookie, std::move(ioc), std::move(handler));
+      }, consigned);
   }
 
   // This is one of those places where having to force everything into
@@ -789,29 +811,33 @@ public:
   // let us separate out the implementation details without
   // sacrificing all the benefits of templates.
   using VoidOpSig = void();
-  using VoidOpComp = ceph::async::Completion<VoidOpSig>;
-  template<typename CompletionToken>
+  using VoidOpComp = boost::asio::any_completion_handler<VoidOpSig>;
+  template<boost::asio::completion_token_for<VoidOpSig> CompletionToken>
   auto flush_watch(CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, VoidOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), VoidOpSig>(
       [this](auto&& handler) {
-       flush_watch(VoidOpComp::create(get_executor(),
-                                      std::move(handler)));
-      }, token);
+       flush_watch_(std::move(handler));
+      }, consigned);
   }
 
   using NotifySig = void(boost::system::error_code, ceph::buffer::list);
-  using NotifyComp = ceph::async::Completion<NotifySig>;
-  template<typename CompletionToken>
+  using NotifyComp = boost::asio::any_completion_handler<NotifySig>;
+  template<boost::asio::completion_token_for<NotifySig> CompletionToken>
   auto notify(Object o, IOContext ioc, ceph::buffer::list bl,
              std::optional<std::chrono::milliseconds> timeout,
              CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, NotifySig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), NotifySig>(
       [o = std::move(o), ioc = std::move(ioc), bl = std::move(bl), timeout,
        this](auto&& handler) mutable {
-       notify(std::move(o), std::move(ioc), std::move(bl), timeout,
-              NotifyComp::create(get_executor(),
-                                 std::move(handler)));
-      }, token);
+       notify_(std::move(o), std::move(ioc), std::move(bl), timeout,
+               std::move(handler));
+      }, consigned);
   }
 
   // The versions with pointers are fine for coroutines, but
@@ -819,94 +845,108 @@ public:
   using EnumerateSig = void(boost::system::error_code,
                            std::vector<Entry>,
                            Cursor);
-  using EnumerateComp = ceph::async::Completion<EnumerateSig>;
-  template<typename CompletionToken>
+  using EnumerateComp = boost::asio::any_completion_handler<EnumerateSig>;
+  template<boost::asio::completion_token_for<EnumerateSig> CompletionToken>
   auto enumerate_objects(IOContext ioc, Cursor begin,
                         Cursor end, const std::uint32_t max,
                         ceph::buffer::list filter,
                         CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, EnumerateSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), EnumerateSig>(
       [ioc = std::move(ioc), begin = std::move(begin), end = std::move(end),
        max, filter = std::move(filter), this](auto&& handler) mutable {
-       enumerate_objects(std::move(ioc), std::move(begin), std::move(end),
-                         std::move(max), std::move(filter),
-                         EnumerateComp::create(get_executor(),
-                                               std::move(handler)));
-      }, token);
+       enumerate_objects_(std::move(ioc), std::move(begin), std::move(end),
+                          std::move(max), std::move(filter),
+                          std::move(handler));
+      }, consigned);
   }
 
   using CommandSig = void(boost::system::error_code,
                          std::string, ceph::buffer::list);
-  using CommandComp = ceph::async::Completion<CommandSig>;
-  template<typename CompletionToken>
+  using CommandComp = boost::asio::any_completion_handler<CommandSig>;
+  template<boost::asio::completion_token_for<CommandSig> CompletionToken>
   auto osd_command(int osd, std::vector<std::string> cmd,
                   ceph::buffer::list in, CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, CommandSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), CommandSig>(
       [osd, cmd = std::move(cmd), in = std::move(in),
        this](auto&& handler) mutable {
-       osd_command(osd, std::move(cmd), std::move(in),
-                   CommandComp::create(get_executor(),
-                                       std::move(handler)));
-      }, token);
+       osd_command_(osd, std::move(cmd), std::move(in),
+                    std::move(handler));
+      }, consigned);
   }
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<CommandSig> CompletionToken>
   auto pg_command(PG pg, std::vector<std::string> cmd,
                  ceph::buffer::list in, CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, CommandSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), CommandSig>(
       [pg = std::move(pg), cmd = std::move(cmd), in = std::move(in),
        this](auto&& handler) mutable {
-       pg_command(std::move(pg), std::move(cmd), std::move(in),
-                  CommandComp::create(get_executor(),
-                                      std::move(handler)));
-      }, token);
+       pg_command_(std::move(pg), std::move(cmd), std::move(in),
+                   std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto mon_command(std::vector<std::string> command,
                   ceph::buffer::list&& bl,
                   std::string* outs, ceph::buffer::list* outbl,
                   CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [command = std::move(command), bl = std::move(bl), outs, outbl,
        this](auto&& handler) mutable {
-       mon_command(std::move(command), std::move(bl), outs, outbl,
-                   SimpleOpComp::create(get_executor(),
-                                        std::move(handler)));
-      }, token);
+       mon_command_(std::move(command), std::move(bl), outs, outbl,
+                    std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto enable_application(std::string pool, std::string app_name,
                          bool force, CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [pool = std::move(pool), app_name = std::move(app_name),
        force, this](auto&& handler) mutable {
-       enable_application(std::move(pool), std::move(app_name), force,
-                          SimpleOpComp::create(get_executor(),
-                                               std::move(handler)));
-      }, token);
+       enable_application_(std::move(pool), std::move(app_name), force,
+                           std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto blocklist_add(std::string client_address,
                      std::optional<std::chrono::seconds> expire,
                      CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [client_address = std::move(client_address), expire,
        this](auto&& handler) mutable {
-       blocklist_add(std::move(client_address), expire,
-                     SimpleOpComp::create(get_executor(),
-                                          std::move(handler)));
-      }, token);
+       blocklist_add_(std::move(client_address), expire,
+                      std::move(handler));
+      }, consigned);
   }
 
-  template<typename CompletionToken>
+  template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
   auto wait_for_latest_osd_map(CompletionToken&& token) {
-    return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       boost::asio::get_associated_executor(token, get_executor())));
+    return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
       [this](auto&& handler) {
-       wait_for_latest_osd_map(SimpleOpComp::create(get_executor(),
-                                                    std::move(handler)));
-      }, token);
+       wait_for_latest_osd_map_(std::move(handler));
+      }, consigned);
   }
 
   uint64_t instance_id() const;
@@ -918,85 +958,85 @@ private:
   friend Builder;
 
   RADOS(std::unique_ptr<detail::Client> impl);
-  static void make_with_cct(CephContext* cct,
-                           boost::asio::io_context& ioctx,
-                   std::unique_ptr<BuildComp> c);
-
-  void execute(Object o, IOContext ioc, ReadOp op,
-              ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
-              uint64_t* objver, const blkin_trace_info* trace_info);
-
-  void execute(Object o, IOContext ioc, WriteOp op,
-              std::unique_ptr<Op::Completion> c, uint64_t* objver,
-              const blkin_trace_info* trace_info);
-
-
-  void lookup_pool(std::string name, std::unique_ptr<LookupPoolComp> c);
-  void list_pools(std::unique_ptr<LSPoolsComp> c);
-  void create_pool_snap(int64_t pool, std::string snap_name,
-                       std::unique_ptr<SimpleOpComp> c);
-  void allocate_selfmanaged_snap(int64_t pool, std::unique_ptr<SMSnapComp> c);
-  void delete_pool_snap(int64_t pool, std::string snap_name,
-                       std::unique_ptr<SimpleOpComp> c);
-  void delete_selfmanaged_snap(int64_t pool, std::uint64_t snap,
-                              std::unique_ptr<SimpleOpComp> c);
-  void create_pool(std::string name, std::optional<int> crush_rule,
-                  std::unique_ptr<SimpleOpComp> c);
-  void delete_pool(std::string name,
-                  std::unique_ptr<SimpleOpComp> c);
-  void delete_pool(int64_t pool,
-                  std::unique_ptr<SimpleOpComp> c);
-  void stat_pools(std::vector<std::string> pools,
-                 std::unique_ptr<PoolStatComp> c);
-  void stat_fs(std::optional<std::int64_t> pool,
-              std::unique_ptr<StatFSComp> c);
-
-  void watch(Object o, IOContext ioc,
-            std::optional<std::chrono::seconds> timeout,
-            WatchCB cb, std::unique_ptr<WatchComp> c);
+  static void make_with_cct_(CephContext* cct,
+                            boost::asio::io_context& ioctx,
+                            BuildComp c);
+
+  void execute_(Object o, IOContext ioc, ReadOp op,
+               ceph::buffer::list* bl, Op::Completion c,
+               uint64_t* objver, const blkin_trace_info* trace_info);
+
+  void execute_(Object o, IOContext ioc, WriteOp op,
+               Op::Completion c, uint64_t* objver,
+               const blkin_trace_info* trace_info);
+
+
+  void lookup_pool_(std::string name, LookupPoolComp c);
+  void list_pools_(LSPoolsComp c);
+  void create_pool_snap_(int64_t pool, std::string snap_name,
+                        SimpleOpComp c);
+  void allocate_selfmanaged_snap_(int64_t pool, SMSnapComp c);
+  void delete_pool_snap_(int64_t pool, std::string snap_name,
+                        SimpleOpComp c);
+  void delete_selfmanaged_snap_(int64_t pool, std::uint64_t snap,
+                               SimpleOpComp c);
+  void create_pool_(std::string name, std::optional<int> crush_rule,
+                   SimpleOpComp c);
+  void delete_pool_(std::string name,
+                   SimpleOpComp c);
+  void delete_pool_(int64_t pool,
+                   SimpleOpComp c);
+  void stat_pools_(std::vector<std::string> pools,
+                  PoolStatComp c);
+  void stat_fs_(std::optional<std::int64_t> pool,
+               StatFSComp c);
+
+  void watch_(Object o, IOContext ioc,
+             std::optional<std::chrono::seconds> timeout,
+             WatchCB cb, WatchComp c);
   tl::expected<ceph::timespan, boost::system::error_code>
-  watch_check(uint64_t cookie);
-  void notify_ack(Object o, IOContext _ioc,
-                 uint64_t notify_id,
-                 uint64_t cookie,
-                 ceph::buffer::list bl,
-                 std::unique_ptr<SimpleOpComp>);
-  void unwatch(uint64_t cookie, IOContext ioc,
-              std::unique_ptr<SimpleOpComp>);
-  void notify(Object oid, IOContext ioctx,
-             ceph::buffer::list bl,
-             std::optional<std::chrono::milliseconds> timeout,
-             std::unique_ptr<NotifyComp> c);
-  void flush_watch(std::unique_ptr<VoidOpComp>);
-
-  void enumerate_objects(IOContext ioc, Cursor begin,
-                        Cursor end, std::uint32_t max,
-                        ceph::buffer::list filter,
-                        std::vector<Entry>* ls,
-                        Cursor* cursor,
-                        std::unique_ptr<SimpleOpComp> c);
-  void enumerate_objects(IOContext ioc, Cursor begin,
-                        Cursor end, std::uint32_t max,
-                        ceph::buffer::list filter,
-                        std::unique_ptr<EnumerateComp> c);
-  void osd_command(int osd, std::vector<std::string> cmd,
-                  ceph::buffer::list in, std::unique_ptr<CommandComp> c);
-  void pg_command(PG pg, std::vector<std::string> cmd,
-                 ceph::buffer::list in, std::unique_ptr<CommandComp> c);
-
-  void mon_command(std::vector<std::string> command,
+  watch_check_(uint64_t cookie);
+  void notify_ack_(Object o, IOContext _ioc,
+                  uint64_t notify_id,
+                  uint64_t cookie,
                   ceph::buffer::list bl,
-                  std::string* outs, ceph::buffer::list* outbl,
-                  std::unique_ptr<SimpleOpComp> c);
-
-  void enable_application(std::string pool, std::string app_name,
-                         bool force, std::unique_ptr<SimpleOpComp> c);
-
-  void blocklist_add(std::string client_address,
-                     std::optional<std::chrono::seconds> expire,
-                     std::unique_ptr<SimpleOpComp> c);
-
-  void wait_for_latest_osd_map(std::unique_ptr<SimpleOpComp> c);
+                  SimpleOpComp);
+  void unwatch_(uint64_t cookie, IOContext ioc,
+               SimpleOpComp);
+  void notify_(Object oid, IOContext ioctx,
+              ceph::buffer::list bl,
+              std::optional<std::chrono::milliseconds> timeout,
+              NotifyComp c);
+  void flush_watch_(VoidOpComp);
+
+  void enumerate_objects_(IOContext ioc, Cursor begin,
+                         Cursor end, std::uint32_t max,
+                         ceph::buffer::list filter,
+                         std::vector<Entry>* ls,
+                         Cursor* cursor,
+                         SimpleOpComp c);
+  void enumerate_objects_(IOContext ioc, Cursor begin,
+                         Cursor end, std::uint32_t max,
+                         ceph::buffer::list filter,
+                         EnumerateComp c);
+  void osd_command_(int osd, std::vector<std::string> cmd,
+                   ceph::buffer::list in, CommandComp c);
+  void pg_command_(PG pg, std::vector<std::string> cmd,
+                  ceph::buffer::list in, CommandComp c);
+
+  void mon_command_(std::vector<std::string> command,
+                   ceph::buffer::list bl,
+                   std::string* outs, ceph::buffer::list* outbl,
+                   SimpleOpComp c);
+
+  void enable_application_(std::string pool, std::string app_name,
+                          bool force, SimpleOpComp c);
+
+  void blocklist_add_(std::string client_address,
+                     std::optional<std::chrono::seconds> expire,
+                     SimpleOpComp c);
+
+  void wait_for_latest_osd_map_(SimpleOpComp c);
 
   // Proxy object to provide access to low-level RADOS messaging clients
   std::unique_ptr<detail::Client> impl;
index e1d38fd014a4ba30349987a38dbd53a2f21f633a..d66b56560f9c3bdba853b5f1bb475d43a40a6fcc 100644 (file)
@@ -1788,9 +1788,12 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
                                                             extra_op_flags);
 
   C_SaferCond notify_finish_cond;
+  auto e = boost::asio::prefer(
+    objecter->service.get_executor(),
+    boost::asio::execution::outstanding_work.tracked);
   linger_op->on_notify_finish =
-    Objecter::LingerOp::OpComp::create(
-      objecter->service.get_executor(),
+    boost::asio::bind_executor(
+      std::move(e),
       CB_notify_Finish(client->cct, &notify_finish_cond,
                        objecter, linger_op, preply_bl,
                        preply_buf, preply_buf_len));
@@ -1844,9 +1847,12 @@ int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
   c->io = this;
 
   C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
+  auto e = boost::asio::prefer(
+    objecter->service.get_executor(),
+    boost::asio::execution::outstanding_work.tracked);
   linger_op->on_notify_finish =
-    Objecter::LingerOp::OpComp::create(
-      objecter->service.get_executor(),
+    boost::asio::bind_executor(
+      std::move(e),
       CB_notify_Finish(client->cct, oncomplete,
                        objecter, linger_op,
                        preply_bl, preply_buf,
index 90b8f185950ee68f03aa5e7fbc86d3d1b64b65f3..b69d33e60045347e13dcc467440165f2dc2f31eb 100644 (file)
@@ -6008,8 +6008,11 @@ int Server::check_layout_vxattr(MDRequestRef& mdr,
       // latest map. One day if COMPACT_VERSION of MClientRequest >=3,
       // we can remove those code.
       mdr->waited_for_osdmap = true;
-      mds->objecter->wait_for_latest_osdmap(std::ref(*new C_IO_Wrapper(
-        mds, new C_MDS_RetryRequest(mdcache, mdr))));
+      mds->objecter->wait_for_latest_osdmap(
+       [c = new C_IO_Wrapper(mds, new C_MDS_RetryRequest(mdcache, mdr))]
+       (boost::system::error_code ec) {
+         c->complete(ceph::from_error_code(ec));
+       });
       return r;
     }
   }
index d156a49e8a501546092027b9837c40b19be66a2a..ece5697a03fcff5e92f09531c3c5bec220f27204 100644 (file)
@@ -39,9 +39,9 @@
 
 using namespace std::literals;
 
+namespace asio = boost::asio;
 namespace bc = boost::container;
 namespace bs = boost::system;
-namespace ca = ceph::async;
 namespace cb = ceph::buffer;
 
 namespace neorados {
@@ -742,8 +742,8 @@ RADOS::Builder& RADOS::Builder::add_conf_file(std::string_view f) {
   return *this;
 }
 
-void RADOS::Builder::build(boost::asio::io_context& ioctx,
-                          std::unique_ptr<BuildComp> c) {
+void RADOS::Builder::build_(asio::io_context& ioctx,
+                           BuildComp c) {
   constexpr auto env = CODE_ENVIRONMENT_LIBRARY;
   CephInitParameters ci(env);
   if (name)
@@ -771,7 +771,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx,
     auto r = cct->_conf.parse_config_files(conf_files ? conf_files->data() : nullptr,
                                           &ss, flags);
     if (r < 0)
-      c->post(std::move(c), ceph::to_error_code(r), RADOS{nullptr});
+      asio::post(ioctx.get_executor(),
+                asio::append(std::move(c), ceph::to_error_code(r),
+                             RADOS{nullptr}));
   }
 
   cct->_conf.parse_env(cct->get_module_type());
@@ -780,7 +782,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx,
     std::stringstream ss;
     auto r = cct->_conf.set_val(n, v, &ss);
     if (r < 0)
-      c->post(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr});
+      asio::post(ioctx.get_executor(),
+                asio::append(std::move(c), ceph::to_error_code(-EINVAL),
+                             RADOS{nullptr}));
   }
 
   if (!no_mon_conf) {
@@ -788,7 +792,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx,
     // TODO This function should return an error code.
     auto err = mc_bootstrap.get_monmap_and_config();
     if (err < 0)
-      c->post(std::move(c), ceph::to_error_code(err), RADOS{nullptr});
+      asio::post(ioctx.get_executor(),
+                asio::append(std::move(c), ceph::to_error_code(err),
+                             RADOS{nullptr}));
   }
   if (!cct->_log->is_started()) {
     cct->_log->start();
@@ -798,18 +804,19 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx,
   RADOS::make_with_cct(cct, ioctx, std::move(c));
 }
 
-void RADOS::make_with_cct(CephContext* cct,
-                         boost::asio::io_context& ioctx,
-                         std::unique_ptr<BuildComp> c) {
+void RADOS::make_with_cct_(CephContext* cct,
+                          asio::io_context& ioctx,
+                          BuildComp c) {
   try {
     auto r = new detail::NeoClient{std::make_unique<detail::RADOS>(ioctx, cct)};
     r->objecter->wait_for_osd_map(
       [c = std::move(c), r = std::unique_ptr<detail::Client>(r)]() mutable {
-       c->dispatch(std::move(c), bs::error_code{},
-                   RADOS{std::move(r)});
+       asio::dispatch(asio::append(std::move(c), bs::error_code{},
+                                   RADOS{std::move(r)}));
       });
   } catch (const bs::system_error& err) {
-    c->post(std::move(c), err.code(), RADOS{nullptr});
+    asio::post(ioctx.get_executor(),
+              asio::append(std::move(c), err.code(), RADOS{nullptr}));
   }
 }
 
@@ -831,14 +838,14 @@ RADOS::executor_type RADOS::get_executor() const {
   return impl->ioctx.get_executor();
 }
 
-boost::asio::io_context& RADOS::get_io_context() {
+asio::io_context& RADOS::get_io_context() {
   return impl->ioctx;
 }
 
-void RADOS::execute(Object o, IOContext _ioc, ReadOp _op,
-                   cb::list* bl,
-                   std::unique_ptr<ReadOp::Completion> c, version_t* objver,
-                   const blkin_trace_info *trace_info) {
+void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op,
+                    cb::list* bl,
+                    ReadOp::Completion c, version_t* objver,
+                    const blkin_trace_info *trace_info) {
   auto oid = reinterpret_cast<const object_t*>(&o.impl);
   auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
   auto op = reinterpret_cast<OpImpl*>(&_op.impl);
@@ -858,9 +865,9 @@ void RADOS::execute(Object o, IOContext _ioc, ReadOp _op,
   trace.event("submitted");
 }
 
-void RADOS::execute(Object o, IOContext _ioc, WriteOp _op,
-                   std::unique_ptr<WriteOp::Completion> c, version_t* objver,
-                   const blkin_trace_info *trace_info) {
+void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op,
+                    WriteOp::Completion c, version_t* objver,
+                    const blkin_trace_info *trace_info) {
   auto oid = reinterpret_cast<const object_t*>(&o.impl);
   auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
   auto op = reinterpret_cast<OpImpl*>(&_op.impl);
@@ -885,8 +892,8 @@ void RADOS::execute(Object o, IOContext _ioc, WriteOp _op,
   trace.event("submitted");
 }
 
-void RADOS::lookup_pool(std::string name,
-                       std::unique_ptr<LookupPoolComp> c)
+void RADOS::lookup_pool_(std::string name,
+                        LookupPoolComp c)
 {
   // I kind of want to make lookup_pg_pool return
   // std::optional<int64_t> since it can only return one error code.
@@ -903,16 +910,18 @@ void RADOS::lookup_pool(std::string name,
            return osdmap.lookup_pg_pool_name(name);
          });
        if (ret < 0)
-         ca::dispatch(std::move(c), osdc_errc::pool_dne,
-                      std::int64_t(0));
+         asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne,
+                                     std::int64_t(0)));
        else
-         ca::dispatch(std::move(c), bs::error_code{}, ret);
+         asio::dispatch(asio::append(std::move(c), bs::error_code{}, ret));
       });
   } else if (ret < 0) {
-    ca::post(std::move(c), osdc_errc::pool_dne,
-                std::int64_t(0));
+    asio::post(get_executor(),
+              asio::append(std::move(c), osdc_errc::pool_dne,
+                           std::int64_t(0)));
   } else {
-    ca::post(std::move(c), bs::error_code{}, ret);
+    asio::post(get_executor(),
+              asio::append(std::move(c), bs::error_code{}, ret));
   }
 }
 
@@ -933,108 +942,125 @@ std::optional<uint64_t> RADOS::get_pool_alignment(int64_t pool_id)
     });
 }
 
-void RADOS::list_pools(std::unique_ptr<LSPoolsComp> c) {
-  ca::dispatch(std::move(c),
-              impl->objecter->with_osdmap(
-                [&](OSDMap& o) {
-                  std::vector<std::pair<std::int64_t, std::string>> v;
-                  for (auto p : o.get_pools())
-                    v.push_back(std::make_pair(p.first,
-                                               o.get_pool_name(p.first)));
-                  return v;
-                }));
+void RADOS::list_pools_(LSPoolsComp c) {
+  asio::dispatch(asio::append(std::move(c),
+                             impl->objecter->with_osdmap(
+                               [&](OSDMap& o) {
+                                 std::vector<std::pair<std::int64_t, std::string>> v;
+                                 for (auto p : o.get_pools())
+                                   v.push_back(std::make_pair(p.first,
+                                                              o.get_pool_name(p.first)));
+                                 return v;
+                               })));
 }
 
-void RADOS::create_pool_snap(std::int64_t pool,
-                            std::string snap_name,
-                            std::unique_ptr<SimpleOpComp> c)
+void RADOS::create_pool_snap_(std::int64_t pool,
+                             std::string snap_name,
+                             SimpleOpComp c)
 {
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   impl->objecter->create_pool_snap(
     pool, snap_name,
-    Objecter::PoolOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
-       ca::dispatch(std::move(c), e);
+       asio::dispatch(asio::append(std::move(c), e));
       }));
 }
 
-void RADOS::allocate_selfmanaged_snap(int64_t pool,
-                                     std::unique_ptr<SMSnapComp> c) {
+void RADOS::allocate_selfmanaged_snap_(int64_t pool,
+                                      SMSnapComp c) {
+  auto e = asio::prefer(
+    get_executor(),
+    asio::execution::outstanding_work.tracked);
+
   impl->objecter->allocate_selfmanaged_snap(
     pool,
-    ca::Completion<void(bs::error_code, snapid_t)>::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c)](bs::error_code e, snapid_t snap) mutable {
-       ca::dispatch(std::move(c), e, snap);
+       asio::dispatch(asio::append(std::move(c), e, snap));
       }));
 }
 
-void RADOS::delete_pool_snap(std::int64_t pool,
-                            std::string snap_name,
-                            std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_pool_snap_(std::int64_t pool,
+                             std::string snap_name,
+                             SimpleOpComp c)
 {
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   impl->objecter->delete_pool_snap(
     pool, snap_name,
-    Objecter::PoolOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
-       ca::dispatch(std::move(c), e);
+       asio::dispatch(asio::append(std::move(c), e));
       }));
 }
 
-void RADOS::delete_selfmanaged_snap(std::int64_t pool,
-                                   std::uint64_t snap,
-                                   std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_selfmanaged_snap_(std::int64_t pool,
+                                    std::uint64_t snap,
+                                    SimpleOpComp c)
 {
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   impl->objecter->delete_selfmanaged_snap(
     pool, snap,
-    Objecter::PoolOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
-       ca::dispatch(std::move(c), e);
+       asio::dispatch(asio::append(std::move(c), e));
       }));
 }
 
-void RADOS::create_pool(std::string name,
-                       std::optional<int> crush_rule,
-                       std::unique_ptr<SimpleOpComp> c)
+void RADOS::create_pool_(std::string name,
+                        std::optional<int> crush_rule,
+                        SimpleOpComp c)
 {
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
+
   impl->objecter->create_pool(
     name,
-    Objecter::PoolOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
-       ca::dispatch(std::move(c), e);
+       asio::dispatch(asio::append(std::move(c), e));
       }),
       crush_rule.value_or(-1));
 }
 
-void RADOS::delete_pool(std::string name,
-                       std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_pool_(std::string name,
+                        SimpleOpComp c)
 {
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   impl->objecter->delete_pool(
     name,
-    Objecter::PoolOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
-       ca::dispatch(std::move(c), e);
+       asio::dispatch(asio::append(std::move(c), e));
       }));
 }
 
-void RADOS::delete_pool(std::int64_t pool,
-                       std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_pool_(std::int64_t pool,
+                        SimpleOpComp c)
 {
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   impl->objecter->delete_pool(
     pool,
-    Objecter::PoolOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
-       ca::dispatch(std::move(c), e);
+       asio::dispatch(asio::append(std::move(c), e));
       }));
 }
 
-void RADOS::stat_pools(std::vector<std::string> pools,
-                      std::unique_ptr<PoolStatComp> c) {
+void RADOS::stat_pools_(std::vector<std::string> pools,
+                       PoolStatComp c) {
   impl->objecter->get_pool_stats(
     pools,
     [c = std::move(c)]
@@ -1073,12 +1099,13 @@ void RADOS::stat_pools(std::vector<std::string> pools,
        pv.compressed_bytes_alloc = statfs.data_compressed_allocated;
       }
 
-      ca::dispatch(std::move(c), ec, std::move(result), per_pool);
+      asio::dispatch(asio::append(std::move(c), ec, std::move(result),
+                                 per_pool));
     });
 }
 
-void RADOS::stat_fs(std::optional<std::int64_t> _pool,
-                   std::unique_ptr<StatFSComp> c) {
+void RADOS::stat_fs_(std::optional<std::int64_t> _pool,
+                    StatFSComp c) {
   std::optional<int64_t> pool;
   if (_pool)
     pool = *pool;
@@ -1086,15 +1113,15 @@ void RADOS::stat_fs(std::optional<std::int64_t> _pool,
     pool,
     [c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable {
       FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects};
-      c->dispatch(std::move(c), ec, std::move(fso));
+      asio::dispatch(asio::append(std::move(c), ec, std::move(fso)));
     });
 }
 
 // --- Watch/Notify
 
-void RADOS::watch(Object o, IOContext _ioc,
-                 std::optional<std::chrono::seconds> timeout, WatchCB cb,
-                 std::unique_ptr<WatchComp> c) {
+void RADOS::watch_(Object o, IOContext _ioc,
+                  std::optional<std::chrono::seconds> timeout, WatchCB cb,
+                  WatchComp c) {
   auto oid = reinterpret_cast<const object_t*>(&o.impl);
   auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
 
@@ -1106,21 +1133,23 @@ void RADOS::watch(Object o, IOContext _ioc,
   linger_op->handle = std::move(cb);
   op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
   bufferlist bl;
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   impl->objecter->linger_watch(
     linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
-    Objecter::LingerOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      std::move(e),
       [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
-       ca::dispatch(std::move(c), e, cookie);
+       asio::dispatch(asio::append(std::move(c), e, cookie));
       }), nullptr);
 }
 
-void RADOS::notify_ack(Object o,
-                      IOContext _ioc,
-                      uint64_t notify_id,
-                      uint64_t cookie,
-                      bufferlist bl,
-                      std::unique_ptr<SimpleOpComp> c)
+void RADOS::notify_ack_(Object o,
+                       IOContext _ioc,
+                       uint64_t notify_id,
+                       uint64_t cookie,
+                       bufferlist bl,
+                       SimpleOpComp c)
 {
   auto oid = reinterpret_cast<const object_t*>(&o.impl);
   auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
@@ -1132,14 +1161,14 @@ void RADOS::notify_ack(Object o,
                       nullptr, ioc->extra_op_flags, std::move(c));
 }
 
-tl::expected<ceph::timespan, bs::error_code> RADOS::watch_check(uint64_t cookie)
+tl::expected<ceph::timespan, bs::error_code> RADOS::watch_check_(uint64_t cookie)
 {
   Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
   return impl->objecter->linger_check(linger_op);
 }
 
-void RADOS::unwatch(uint64_t cookie, IOContext _ioc,
-                   std::unique_ptr<SimpleOpComp> c)
+void RADOS::unwatch_(uint64_t cookie, IOContext _ioc,
+                    SimpleOpComp c)
 {
   auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
 
@@ -1147,48 +1176,50 @@ void RADOS::unwatch(uint64_t cookie, IOContext _ioc,
 
   ObjectOperation op;
   op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op),
                         ioc->snapc, ceph::real_clock::now(), ioc->extra_op_flags,
-                        Objecter::Op::OpComp::create(
-                          get_executor(),
+                        asio::bind_executor(
+                          std::move(e),
                           [objecter = impl->objecter,
                            linger_op, c = std::move(c)]
                           (bs::error_code ec) mutable {
                             objecter->linger_cancel(linger_op);
-                            ca::dispatch(std::move(c), ec);
+                            asio::dispatch(asio::append(std::move(c), ec));
                           }));
 }
 
-void RADOS::flush_watch(std::unique_ptr<VoidOpComp> c)
+void RADOS::flush_watch_(VoidOpComp c)
 {
   impl->objecter->linger_callback_flush([c = std::move(c)]() mutable {
-                                         ca::post(std::move(c));
+                                         asio::dispatch(std::move(c));
                                        });
 }
 
 struct NotifyHandler : std::enable_shared_from_this<NotifyHandler> {
-  boost::asio::io_context& ioc;
-  boost::asio::strand<boost::asio::io_context::executor_type> strand;
+  asio::io_context& ioc;
+  asio::strand<asio::io_context::executor_type> strand;
   Objecter* objecter;
   Objecter::LingerOp* op;
-  std::unique_ptr<RADOS::NotifyComp> c;
+  RADOS::NotifyComp c;
 
   bool acked = false;
   bool finished = false;
   bs::error_code res;
   bufferlist rbl;
 
-  NotifyHandler(boost::asio::io_context& ioc,
+  NotifyHandler(asio::io_context& ioc,
                Objecter* objecter,
                Objecter::LingerOp* op,
-               std::unique_ptr<RADOS::NotifyComp> c)
-    : ioc(ioc), strand(boost::asio::make_strand(ioc)),
+               RADOS::NotifyComp c)
+    : ioc(ioc), strand(asio::make_strand(ioc)),
       objecter(objecter), op(op), c(std::move(c)) {}
 
   // Use bind or a lambda to pass this in.
   void handle_ack(bs::error_code ec,
                  bufferlist&&) {
-    boost::asio::post(
+    asio::post(
       strand,
       [this, ec, p = shared_from_this()]() mutable {
        acked = true;
@@ -1200,7 +1231,7 @@ struct NotifyHandler : std::enable_shared_from_this<NotifyHandler> {
 
   void operator()(bs::error_code ec,
                  bufferlist&& bl) {
-    boost::asio::post(
+    asio::post(
       strand,
       [this, ec, p = shared_from_this()]() mutable {
        finished = true;
@@ -1215,14 +1246,14 @@ struct NotifyHandler : std::enable_shared_from_this<NotifyHandler> {
     if ((acked && finished) || res) {
       objecter->linger_cancel(op);
       ceph_assert(c);
-      ca::dispatch(std::move(c), res, std::move(rbl));
+      asio::dispatch(asio::append(std::move(c), res, std::move(rbl)));
     }
   }
 };
 
-void RADOS::notify(Object o, IOContext _ioc, bufferlist bl,
-                  std::optional<std::chrono::milliseconds> timeout,
-                  std::unique_ptr<NotifyComp> c)
+void RADOS::notify_(Object o, IOContext _ioc, bufferlist bl,
+                   std::optional<std::chrono::milliseconds> timeout,
+                   NotifyComp c)
 {
   auto oid = reinterpret_cast<const object_t*>(&o.impl);
   auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
@@ -1231,9 +1262,11 @@ void RADOS::notify(Object o, IOContext _ioc, bufferlist bl,
 
   auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter,
                                             linger_op, std::move(c));
+  auto e = asio::prefer(get_executor(),
+                       asio::execution::outstanding_work.tracked);
   linger_op->on_notify_finish =
-    Objecter::LingerOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      e,
       [cb](bs::error_code ec, ceph::bufferlist bl) mutable {
        (*cb)(ec, std::move(bl));
       });
@@ -1246,8 +1279,8 @@ void RADOS::notify(Object o, IOContext _ioc, bufferlist bl,
 
   impl->objecter->linger_notify(
     linger_op, rd, ioc->snap_seq, inbl,
-    Objecter::LingerOp::OpComp::create(
-      get_executor(),
+    asio::bind_executor(
+      e,
       [cb](bs::error_code ec, ceph::bufferlist bl) mutable {
        cb->handle_ack(ec, std::move(bl));
       }), nullptr);
@@ -1354,12 +1387,12 @@ Cursor::from_str(const std::string& s) {
   return e;
 }
 
-void RADOS::enumerate_objects(IOContext _ioc,
-                             Cursor begin,
-                             Cursor end,
-                             const std::uint32_t max,
-                             bufferlist filter,
-                             std::unique_ptr<EnumerateComp> c) {
+void RADOS::enumerate_objects_(IOContext _ioc,
+                              Cursor begin,
+                              Cursor end,
+                              const std::uint32_t max,
+                              bufferlist filter,
+                              EnumerateComp c) {
   auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
 
   impl->objecter->enumerate_objects<Entry>(
@@ -1372,45 +1405,43 @@ void RADOS::enumerate_objects(IOContext _ioc,
     [c = std::move(c)]
     (bs::error_code ec, std::vector<Entry>&& v,
      hobject_t&& n) mutable {
-      ca::dispatch(std::move(c), ec, std::move(v),
-                  Cursor(static_cast<void*>(&n)));
+      asio::dispatch(asio::append(std::move(c), ec, std::move(v),
+                                 Cursor(static_cast<void*>(&n))));
     });
 }
 
 
-void RADOS::osd_command(int osd, std::vector<std::string> cmd,
-                       ceph::bufferlist in, std::unique_ptr<CommandComp> c) {
-  impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr,
-                             [c = std::move(c)]
-                             (bs::error_code ec,
-                              std::string&& s,
-                              ceph::bufferlist&& b) mutable {
-                               ca::dispatch(std::move(c), ec,
-                                            std::move(s),
-                                            std::move(b));
-                             });
+void RADOS::osd_command_(int osd, std::vector<std::string> cmd,
+                        ceph::bufferlist in, CommandComp c) {
+  impl->objecter->osd_command(
+    osd, std::move(cmd), std::move(in), nullptr,
+    [c = std::move(c)]
+    (bs::error_code ec, std::string&& s, ceph::bufferlist&& b) mutable {
+      asio::dispatch(asio::append(std::move(c), ec, std::move(s),
+                                 std::move(b)));
+    });
 }
 
-void RADOS::pg_command(PG pg, std::vector<std::string> cmd,
-                      ceph::bufferlist in, std::unique_ptr<CommandComp> c) {
-  impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr,
-                            [c = std::move(c)]
-                            (bs::error_code ec,
-                             std::string&& s,
-                             ceph::bufferlist&& b) mutable {
-                              ca::dispatch(std::move(c), ec,
-                                           std::move(s),
-                                           std::move(b));
-                            });
+void RADOS::pg_command_(PG pg, std::vector<std::string> cmd,
+                       ceph::bufferlist in, CommandComp c) {
+  impl->objecter->pg_command(
+    pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr,
+    [c = std::move(c)]
+    (bs::error_code ec, std::string&& s,
+     ceph::bufferlist&& b) mutable {
+      asio::dispatch(asio::append(std::move(c), ec, std::move(s),
+                                 std::move(b)));
+    });
 }
 
-void RADOS::enable_application(std::string pool, std::string app_name,
-                              bool force, std::unique_ptr<SimpleOpComp> c) {
+void RADOS::enable_application_(std::string pool, std::string app_name,
+                               bool force, SimpleOpComp c) {
   // pre-Luminous clusters will return -EINVAL and application won't be
   // preserved until Luminous is configured as minimum version.
   if (!impl->get_required_monitor_features().contains_all(
        ceph::features::mon::FEATURE_LUMINOUS)) {
-    ca::post(std::move(c), ceph::to_error_code(-EOPNOTSUPP));
+    asio::post(get_executor(),
+              asio::append(std::move(c), ceph::to_error_code(-EOPNOTSUPP)));
   } else {
     impl->monclient.start_mon_command(
       { fmt::format("{{ \"prefix\": \"osd pool application enable\","
@@ -1419,14 +1450,14 @@ void RADOS::enable_application(std::string pool, std::string app_name,
                    force ? " ,\"yes_i_really_mean_it\": true" : "")},
       {}, [c = std::move(c)](bs::error_code e,
                             std::string, cb::list) mutable {
-           ca::post(std::move(c), e);
-         });
+       asio::dispatch(asio::append(std::move(c), e));
+      });
   }
 }
 
-void RADOS::blocklist_add(std::string client_address,
-                          std::optional<std::chrono::seconds> expire,
-                          std::unique_ptr<SimpleOpComp> c) {
+void RADOS::blocklist_add_(std::string client_address,
+                          std::optional<std::chrono::seconds> expire,
+                          SimpleOpComp c) {
   auto expire_arg = (expire ?
     fmt::format(", \"expire\": \"{}.0\"", expire->count()) : std::string{});
   impl->monclient.start_mon_command(
@@ -1439,7 +1470,8 @@ void RADOS::blocklist_add(std::string client_address,
     [this, client_address = std::move(client_address), expire_arg,
      c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable {
       if (ec != bs::errc::invalid_argument) {
-        ca::post(std::move(c), ec);
+        asio::post(get_executor(),
+                  asio::append(std::move(c), ec));
         return;
       }
 
@@ -1452,19 +1484,19 @@ void RADOS::blocklist_add(std::string client_address,
                       client_address, expire_arg) },
         {},
         [c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable {
-          ca::post(std::move(c), ec);
+          asio::dispatch(asio::append(std::move(c), ec));
         });
     });
 }
 
-void RADOS::wait_for_latest_osd_map(std::unique_ptr<SimpleOpComp> c) {
+void RADOS::wait_for_latest_osd_map_(SimpleOpComp c) {
   impl->objecter->wait_for_latest_osdmap(std::move(c));
 }
 
-void RADOS::mon_command(std::vector<std::string> command,
-                       cb::list bl,
-                       std::string* outs, cb::list* outbl,
-                       std::unique_ptr<SimpleOpComp> c) {
+void RADOS::mon_command_(std::vector<std::string> command,
+                        cb::list bl,
+                        std::string* outs, cb::list* outbl,
+                        SimpleOpComp c) {
 
   impl->monclient.start_mon_command(
     command, bl,
@@ -1474,7 +1506,7 @@ void RADOS::mon_command(std::vector<std::string> command,
        *outs = std::move(s);
       if (outbl)
        *outbl = std::move(bl);
-      ca::post(std::move(c), e);
+      asio::dispatch(asio::append(std::move(c), e));
     });
 }
 
index eee03c1189877e5b93bdcfdc2ce83fb0118ac3de..3421da8d59d2692325a72ea957043c2e362c1262 100644 (file)
@@ -95,6 +95,7 @@ namespace bc = boost::container;
 namespace bs = boost::system;
 namespace ca = ceph::async;
 namespace cb = ceph::buffer;
+namespace asio = boost::asio;
 
 #define dout_subsys ceph_subsys_objecter
 #undef dout_prefix
@@ -604,14 +605,14 @@ void Objecter::_linger_commit(LingerOp *info, bs::error_code ec,
   std::unique_lock wl(info->watch_lock);
   ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
   if (info->on_reg_commit) {
-    info->on_reg_commit->defer(std::move(info->on_reg_commit),
-                              ec, cb::list{});
-    info->on_reg_commit.reset();
+    asio::defer(service.get_executor(),
+               asio::append(std::move(info->on_reg_commit),
+                            ec, cb::list{}));
   }
   if (ec && info->on_notify_finish) {
-    info->on_notify_finish->defer(std::move(info->on_notify_finish),
-                                 ec, cb::list{});
-    info->on_notify_finish.reset();
+    asio::defer(service.get_executor(),
+               asio::append(std::move(info->on_notify_finish),
+                            ec, cb::list{}));
   }
 
   // only tell the user the first time we do this
@@ -673,7 +674,7 @@ void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec)
     if (!info->last_error) {
       ec = _normalize_watch_error(ec);
       if (info->handle) {
-       boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
+       asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
       }
     }
   }
@@ -708,7 +709,7 @@ void Objecter::_send_linger_ping(LingerOp *info)
 
   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
-                CB_Linger_Ping(this, info, now),
+                fu2::unique_function<Op::OpSig>{CB_Linger_Ping(this, info, now)},
                 nullptr, nullptr);
   o->target = info->target;
   o->should_resend = false;
@@ -736,7 +737,7 @@ void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono
       ec = _normalize_watch_error(ec);
       info->last_error = ec;
       if (info->handle) {
-       boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
+       asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
       }
     }
   } else {
@@ -924,7 +925,7 @@ void Objecter::handle_watch_notify(MWatchNotify *m)
     if (!info->last_error) {
       info->last_error = bs::error_code(ENOTCONN, osd_category());
       if (info->handle) {
-       boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
+       asio::defer(finish_strand, CB_DoWatchError(this, info,
                                                          info->last_error));
       }
     }
@@ -937,16 +938,16 @@ void Objecter::handle_watch_notify(MWatchNotify *m)
       ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
                     << " != " << info->notify_id << ", ignoring" << dendl;
     } else if (info->on_notify_finish) {
-      info->on_notify_finish->defer(
-       std::move(info->on_notify_finish),
-       osdcode(m->return_code), std::move(m->get_data()));
-
+      asio::defer(service.get_executor(),
+                 asio::append(std::move(info->on_notify_finish),
+                              osdcode(m->return_code),
+                              std::move(m->get_data())));
       // if we race with reconnect we might get a second notify; only
       // notify the caller once!
       info->on_notify_finish = nullptr;
     }
   } else {
-    boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
+    asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
   }
 }
 
@@ -1379,7 +1380,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
         p->first <= osdmap->get_epoch()) {
     //go through the list and call the onfinish methods
     for (auto& [c, ec] : p->second) {
-      ca::post(std::move(c), ec);
+      asio::post(service.get_executor(), asio::append(std::move(c), ec));
     }
     waiting_for_map.erase(p++);
   }
@@ -1568,7 +1569,7 @@ void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *s
                     << " dne" << dendl;
       if (op->has_completion()) {
        num_in_flight--;
-       op->complete(osdc_errc::pool_dne, -ENOENT);
+       op->complete(osdc_errc::pool_dne, -ENOENT, service.get_executor());
       }
 
       OSDSession *s = op->session;
@@ -1603,7 +1604,7 @@ void Objecter::_check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *s
                 << " has eio" << dendl;
   if (op->has_completion()) {
     num_in_flight--;
-    op->complete(osdc_errc::pool_eio, -EIO);
+    op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
   }
 
   OSDSession *s = op->session;
@@ -1701,13 +1702,15 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
     if (osdmap->get_epoch() >= op->map_dne_bound) {
       std::unique_lock wl{op->watch_lock};
       if (op->on_reg_commit) {
-       op->on_reg_commit->defer(std::move(op->on_reg_commit),
-                                osdc_errc::pool_dne, cb::list{});
+       asio::defer(service.get_executor(),
+                   asio::append(std::move(op->on_reg_commit),
+                                osdc_errc::pool_dne, cb::list{}));
        op->on_reg_commit = nullptr;
       }
       if (op->on_notify_finish) {
-       op->on_notify_finish->defer(std::move(op->on_notify_finish),
-                                   osdc_errc::pool_dne, cb::list{});
+       asio::defer(service.get_executor(),
+                   asio::append(std::move(op->on_notify_finish),
+                                osdc_errc::pool_dne, cb::list{}));
         op->on_notify_finish = nullptr;
       }
       *need_unregister = true;
@@ -1723,14 +1726,14 @@ void Objecter::_check_linger_pool_eio(LingerOp *op)
 
   std::unique_lock wl{op->watch_lock};
   if (op->on_reg_commit) {
-    op->on_reg_commit->defer(std::move(op->on_reg_commit),
-                            osdc_errc::pool_dne, cb::list{});
-    op->on_reg_commit = nullptr;
+    asio::defer(service.get_executor(),
+               asio::append(std::move(op->on_reg_commit),
+                            osdc_errc::pool_dne, cb::list{}));
   }
   if (op->on_notify_finish) {
-    op->on_notify_finish->defer(std::move(op->on_notify_finish),
-                               osdc_errc::pool_dne, cb::list{});
-    op->on_notify_finish = nullptr;
+    asio::defer(service.get_executor(),
+               asio::append(std::move(op->on_notify_finish),
+                            osdc_errc::pool_dne, cb::list{}));
   }
 }
 
@@ -1984,7 +1987,10 @@ void Objecter::wait_for_osd_map(epoch_t e)
   }
 
   ca::waiter<bs::error_code> w;
-  waiting_for_map[e].emplace_back(OpCompletion::create(
+  auto ex = boost::asio::prefer(
+    service.get_executor(),
+    boost::asio::execution::outstanding_work.tracked);
+  waiting_for_map[e].emplace_back(asio::bind_executor(
                                    service.get_executor(),
                                    w.ref()),
                                  bs::error_code{});
@@ -1993,14 +1999,15 @@ void Objecter::wait_for_osd_map(epoch_t e)
 }
 
 void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
-                                  std::unique_ptr<OpCompletion> fin,
+                                  OpCompletion fin,
                                   std::unique_lock<ceph::shared_mutex>&& l)
 {
   ceph_assert(fin);
   if (osdmap->get_epoch() >= newest) {
     ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
     l.unlock();
-    ca::defer(std::move(fin), bs::error_code{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(fin), bs::error_code{}));
   } else {
     ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
     _wait_for_new_map(std::move(fin), newest, bs::error_code{});
@@ -2034,7 +2041,7 @@ void Objecter::_maybe_request_map()
   }
 }
 
-void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
+void Objecter::_wait_for_new_map(OpCompletion c, epoch_t epoch,
                                 bs::error_code ec)
 {
   // rwlock is locked unique
@@ -2399,7 +2406,7 @@ void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_t
     break;
   case RECALC_OP_TARGET_POOL_EIO:
     if (op->has_completion()) {
-      op->complete(osdc_errc::pool_eio, -EIO);
+      op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
     }
     return;
   }
@@ -2510,7 +2517,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
   Op *op = p->second;
   if (op->has_completion()) {
     num_in_flight--;
-    op->complete(osdcode(r), r);
+    op->complete(osdcode(r), r, service.get_executor());
   }
   _op_cancel_map_check(op);
   _finish_op(op, r);
@@ -3611,9 +3618,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   // do callbacks
   if (Op::has_completion(onfinish)) {
     if (rc == 0 && handler_error) {
-      Op::complete(std::move(onfinish), handler_error, -EIO);
+      Op::complete(std::move(onfinish), handler_error, -EIO, service.get_executor());
     } else {
-      Op::complete(std::move(onfinish), osdcode(rc), rc);
+      Op::complete(std::move(onfinish), osdcode(rc), rc, service.get_executor());
     }
   }
   if (completion_lock.mutex()) {
@@ -3914,12 +3921,14 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
 
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
   if (!p) {
-    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(onfinish), osdc_errc::pool_dne, cb::list{}));
     return;
   }
   if (p->snap_exists(snap_name)) {
-    onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
-                   cb::list{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(onfinish), osdc_errc::snapshot_exists,
+                            cb::list{}));
     return;
   }
 
@@ -3935,7 +3944,7 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name,
 }
 
 struct CB_SelfmanagedSnap {
-  std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
+  asio::any_completion_handler<void(bs::error_code, snapid_t)> fin;
   CB_SelfmanagedSnap(decltype(fin)&& fin)
     : fin(std::move(fin)) {}
   void operator()(bs::error_code ec, const cb::list& bl) {
@@ -3948,22 +3957,23 @@ struct CB_SelfmanagedSnap {
         ec = e.code();
       }
     }
-    fin->defer(std::move(fin), ec, snapid);
+    asio::dispatch(asio::append(std::move(fin), ec, snapid));
   }
 };
 
 void Objecter::allocate_selfmanaged_snap(
   int64_t pool,
-  std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
+  asio::any_completion_handler<void(bs::error_code, snapid_t)> onfinish)
 {
   unique_lock wl(rwlock);
   ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
   auto op = new PoolOp;
   op->tid = ++last_tid;
   op->pool = pool;
-  op->onfinish = PoolOp::OpComp::create(
+  auto e = boost::asio::prefer(
     service.get_executor(),
-    CB_SelfmanagedSnap(std::move(onfinish)));
+    boost::asio::execution::outstanding_work.tracked);
+  op->onfinish = asio::bind_executor(e, CB_SelfmanagedSnap(std::move(onfinish)));
   op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
   pool_ops[op->tid] = op;
 
@@ -3980,12 +3990,15 @@ void Objecter::delete_pool_snap(
 
   const pg_pool_t *p = osdmap->get_pg_pool(pool);
   if (!p) {
-    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(onfinish), osdc_errc::pool_dne,
+                            cb::list{}));
     return;
   }
 
   if (!p->snap_exists(snap_name)) {
-    onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{}));
     return;
   }
 
@@ -4025,7 +4038,9 @@ void Objecter::create_pool(std::string_view name,
   ldout(cct, 10) << "create_pool name=" << name << dendl;
 
   if (osdmap->lookup_pg_pool_name(name) >= 0) {
-    onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(onfinish), osdc_errc::pool_exists,
+                            cb::list{}));
     return;
   }
 
@@ -4048,7 +4063,9 @@ void Objecter::delete_pool(int64_t pool,
   ldout(cct, 10) << "delete_pool " << pool << dendl;
 
   if (!osdmap->have_pg_pool(pool))
-    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(onfinish), osdc_errc::pool_dne,
+                            cb::list{}));
   else
     _do_delete_pool(pool, std::move(onfinish));
 }
@@ -4062,7 +4079,9 @@ void Objecter::delete_pool(std::string_view pool_name,
   int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
   if (pool < 0)
     // This only returns one error: -ENOENT.
-    onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(onfinish), osdc_errc::pool_dne,
+                            cb::list{}));
   else
     _do_delete_pool(pool, std::move(onfinish));
 }
@@ -4148,12 +4167,16 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
       if (osdmap->get_epoch() < m->epoch) {
        ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
                       << " before calling back" << dendl;
-       _wait_for_new_map(OpCompletion::create(
-                           service.get_executor(),
+       auto e = boost::asio::prefer(
+         service.get_executor(),
+         boost::asio::execution::outstanding_work.tracked);
+       _wait_for_new_map(asio::bind_executor(
+                           e,
                            [o = std::move(op->onfinish),
-                            bl = std::move(bl)](
+                            bl = std::move(bl),
+                            e = service.get_executor()](
                              bs::error_code ec) mutable {
-                             o->defer(std::move(o), ec, bl);
+                             asio::defer(e, asio::append(std::move(o), ec, bl));
                            }),
                          m->epoch,
                          ec);
@@ -4162,11 +4185,11 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
        // sneaked in. Do caller-specified callback now or else
        // we lose it forever.
        ceph_assert(op->onfinish);
-       op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
+       asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), ec, std::move(bl)));
       }
     } else {
       ceph_assert(op->onfinish);
-      op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
+      asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), ec, std::move(bl)));
     }
     op->onfinish = nullptr;
     if (!sul.owns_lock()) {
@@ -4205,7 +4228,8 @@ int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
 
   PoolOp *op = it->second;
   if (op->onfinish)
-    op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
+    asio::defer(service.get_executor(), asio::append(std::move(op->onfinish),
+                                                    osdcode(r), cb::list{}));
 
   _finish_pool_op(op, r);
   return 0;
@@ -4226,7 +4250,7 @@ void Objecter::_finish_pool_op(PoolOp *op, int r)
 
 // pool stats
 
-void Objecter::get_pool_stats(
+void Objecter::get_pool_stats_(
   const std::vector<std::string>& pools,
   decltype(PoolStatOp::onfinish)&& onfinish)
 {
@@ -4283,8 +4307,9 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
     if (m->version > last_seen_pgmap_version) {
       last_seen_pgmap_version = m->version;
     }
-    op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
-                       std::move(m->pool_stats), m->per_pool);
+    asio::defer(service.get_executor(),
+               asio::append(std::move(op->onfinish), bs::error_code{},
+                            std::move(m->pool_stats), m->per_pool));
     _finish_pool_stat_op(op, 0);
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
@@ -4309,8 +4334,9 @@ int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
 
   auto op = it->second;
   if (op->onfinish)
-    op->onfinish->defer(std::move(op->onfinish), osdcode(r),
-                       bc::flat_map<std::string, pool_stat_t>{}, false);
+    asio::defer(service.get_executor(),
+               asio::append(std::move(op->onfinish), osdcode(r),
+                            bc::flat_map<std::string, pool_stat_t>{}, false));
   _finish_pool_stat_op(op, r);
   return 0;
 }
@@ -4328,8 +4354,8 @@ void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
   delete op;
 }
 
-void Objecter::get_fs_stats(std::optional<int64_t> poolid,
-                           decltype(StatfsOp::onfinish)&& onfinish)
+void Objecter::get_fs_stats_(std::optional<int64_t> poolid,
+                            decltype(StatfsOp::onfinish)&& onfinish)
 {
   ldout(cct, 10) << "get_fs_stats" << dendl;
   unique_lock l(rwlock);
@@ -4382,7 +4408,8 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
     ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
     if (m->h.version > last_seen_pgmap_version)
       last_seen_pgmap_version = m->h.version;
-    op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
+    asio::defer(service.get_executor(), asio::append(std::move(op->onfinish),
+                                                    bs::error_code{}, m->h.st));
     _finish_statfs_op(op, 0);
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
@@ -4407,7 +4434,9 @@ int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
 
   auto op = it->second;
   if (op->onfinish)
-    op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
+    asio::defer(service.get_executor(),
+               asio::append(std::move(op->onfinish),
+                            osdcode(r), ceph_statfs{}));
   _finish_statfs_op(op, r);
   return 0;
 }
@@ -5008,7 +5037,9 @@ void Objecter::_finish_command(CommandOp *c, bs::error_code ec,
                 << rs << dendl;
 
   if (c->onfinish)
-    c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
+    asio::defer(service.get_executor(),
+               asio::append(std::move(c->onfinish), ec, std::move(rs),
+                            std::move(bl)));
 
   if (c->ontimeout && ec != bs::errc::timed_out)
     timer.cancel_event(c->ontimeout);
@@ -5031,7 +5062,7 @@ Objecter::OSDSession::~OSDSession()
 
 Objecter::Objecter(CephContext *cct,
                   Messenger *m, MonClient *mc,
-                  boost::asio::io_context& service) :
+                  asio::io_context& service) :
   Dispatcher(cct), messenger(m), monc(mc), service(service)
 {
   mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
@@ -5235,9 +5266,12 @@ void Objecter::_issue_enumerate(hobject_t start,
   auto pbl = &on_ack->bl;
 
   // Issue.  See you later in _enumerate_reply
+  auto e = boost::asio::prefer(
+    service.get_executor(),
+    boost::asio::execution::outstanding_work.tracked);
   pg_read(start.get_hash(),
          c->oloc, op, pbl, 0,
-         Op::OpComp::create(service.get_executor(),
+         asio::bind_executor(e,
                             [c = std::move(on_ack)]
                             (bs::error_code ec) mutable {
                               (*c)(ec);
index 45250c4361b46b1e14d559c72a6b2375e999ddf5..34cb9db74ff46c0fedc5e08a7d6f2bf4a40a389a 100644 (file)
@@ -41,8 +41,8 @@
 #include "include/function2.hpp"
 #include "include/neorados/RADOS_Decodable.hpp"
 
-#include "common/admin_socket.h"
 #include "common/async/completion.h"
+#include "common/admin_socket.h"
 #include "common/ceph_time.h"
 #include "common/ceph_mutex.h"
 #include "common/ceph_timer.h"
@@ -1626,7 +1626,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
   using MOSDOp = _mosdop::MOSDOp<osdc_opvec>;
 public:
   using OpSignature = void(boost::system::error_code);
-  using OpCompletion = ceph::async::Completion<OpSignature>;
+  using OpCompletion = boost::asio::any_completion_handler<OpSignature>;
 
   // config observer bits
   const char** get_tracked_conf_keys() const override;
@@ -1843,55 +1843,91 @@ public:
     void dump(ceph::Formatter *f) const;
   };
 
-  std::unique_ptr<ceph::async::Completion<void(boost::system::error_code)>>
+  boost::asio::any_completion_handler<void(boost::system::error_code)>
   OpContextVert(Context* c) {
-    if (c)
-      return ceph::async::Completion<void(boost::system::error_code)>::create(
+    if (c) {
+      auto e = boost::asio::prefer(
        service.get_executor(),
+       boost::asio::execution::outstanding_work.tracked);
+
+      return boost::asio::bind_executor(
+       std::move(e),
        [c = std::unique_ptr<Context>(c)]
        (boost::system::error_code e) mutable {
          c.release()->complete(e);
        });
+    }
     else
       return nullptr;
   }
 
   template<typename T>
-  std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
+  boost::asio::any_completion_handler<void(boost::system::error_code, T)>
   OpContextVert(Context* c, T* p) {
 
-    if (c || p)
+    if (c || p) {
+      auto e = boost::asio::prefer(
+       service.get_executor(),
+       boost::asio::execution::outstanding_work.tracked);
       return
-       ceph::async::Completion<void(boost::system::error_code, T)>::create(
-         service.get_executor(),
+       boost::asio::bind_executor(
+         e,
          [c = std::unique_ptr<Context>(c), p]
          (boost::system::error_code e, T r) mutable {
              if (p)
                *p = std::move(r);
              if (c)
                c.release()->complete(ceph::from_error_code(e));
-           });
-    else
+         });
+    } else {
       return nullptr;
+    }
   }
 
   template<typename T>
-  std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
+  boost::asio::any_completion_handler<void(boost::system::error_code, T)>
   OpContextVert(Context* c, T& p) {
+    if (c) {
+      auto e = boost::asio::prefer(
+       service.get_executor(),
+       boost::asio::execution::outstanding_work.tracked);
+      return boost::asio::bind_executor(
+       e,
+       [c = std::unique_ptr<Context>(c), &p]
+       (boost::system::error_code e, T r) mutable {
+         p = std::move(r);
+         if (c)
+           c.release()->complete(ceph::from_error_code(e));
+       });
+    } else {
+      return nullptr;
+    }
+  }
+
+  boost::asio::any_completion_handler<void(boost::system::error_code)>
+  OpCompletionVert(std::unique_ptr<ceph::async::Completion<
+                    void(boost::system::error_code)>> c) {
     if (c)
-      return ceph::async::Completion<
-       void(boost::system::error_code, T)>::create(
-         service.get_executor(),
-         [c = std::unique_ptr<Context>(c), &p]
-         (boost::system::error_code e, T r) mutable {
-           p = std::move(r);
-           if (c)
-             c.release()->complete(ceph::from_error_code(e));
-         });
+      return [c = std::move(c)](boost::system::error_code ec) mutable {
+       c->dispatch(std::move(c), ec);
+      };
     else
       return nullptr;
   }
 
+  template<typename T>
+  boost::asio::any_completion_handler<void(boost::system::error_code, T)>
+  OpCompletionVert(std::unique_ptr<ceph::async::Completion<
+                    void(boost::system::error_code, T)>> c) {
+    if (c) {
+      return [c = std::move(c)](boost::system::error_code ec, T t) mutable {
+       c->dispatch(std::move(c), ec, std::move(t));
+      };
+    } else {
+      return nullptr;
+    }
+  }
+
   struct Op : public RefCountedObject {
     OSDSession *session = nullptr;
     int incarnation = 0;
@@ -1919,7 +1955,7 @@ public:
 
     int priority = 0;
     using OpSig = void(boost::system::error_code);
-    using OpComp = ceph::async::Completion<OpSig>;
+    using OpComp = boost::asio::any_completion_handler<OpSig>;
     // Due to an irregularity of cmpxattr, we actualy need the 'int'
     // value for onfinish for legacy librados users. As such just
     // preserve the Context* in this one case. That way we can have
@@ -1929,7 +1965,7 @@ public:
     //
     // Add a function for the linger case, where we want better
     // semantics than Context, but still need to be under the completion_lock.
-    std::variant<std::unique_ptr<OpComp>, fu2::unique_function<OpSig>,
+    std::variant<OpComp, fu2::unique_function<OpSig>,
                 Context*> onfinish;
     uint64_t ontimeout = 0;
 
@@ -1967,8 +2003,8 @@ public:
     }
 
     static void complete(decltype(onfinish)&& f, boost::system::error_code ec,
-                        int r) {
-      std::visit([ec, r](auto&& arg) {
+                        int r, boost::asio::io_context::executor_type e) {
+      std::visit([ec, r, e](auto&& arg) {
                   if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
                                 Context*>) {
                     arg->complete(r);
@@ -1976,17 +2012,18 @@ public:
                              fu2::unique_function<OpSig>>) {
                     std::move(arg)(ec);
                    } else {
-                    arg->defer(std::move(arg), ec);
+                    boost::asio::defer(e,
+                                       boost::asio::append(std::move(arg), ec));
                   }
                 }, std::move(f));
     }
-    void complete(boost::system::error_code ec, int r) {
-      complete(std::move(onfinish), ec, r);
+    void complete(boost::system::error_code ec, int r,
+                 boost::asio::io_context::executor_type e) {
+      complete(std::move(onfinish), ec, r, e);
     }
 
     Op(const object_t& o, const object_locator_t& ol,  osdc_opvec&& _ops,
-       int f, std::unique_ptr<OpComp>&& fin,
-       version_t *ov, int *offset = nullptr,
+       int f, OpComp&& fin, version_t *ov, int *offset = nullptr,
        ZTracer::Trace *parent_trace = nullptr) :
       target(o, ol, f),
       ops(std::move(_ops)),
@@ -2168,8 +2205,8 @@ public:
     using OpSig = void(boost::system::error_code,
                       boost::container::flat_map<std::string, pool_stat_t>,
                       bool);
-    using OpComp = ceph::async::Completion<OpSig>;
-    std::unique_ptr<OpComp> onfinish;
+    using OpComp = boost::asio::any_completion_handler<OpSig>;
+    OpComp onfinish;
     std::uint64_t ontimeout;
     ceph::coarse_mono_time last_submit;
   };
@@ -2179,9 +2216,9 @@ public:
     std::optional<int64_t> data_pool;
     using OpSig = void(boost::system::error_code,
                       const struct ceph_statfs);
-    using OpComp = ceph::async::Completion<OpSig>;
+    using OpComp = boost::asio::any_completion_handler<OpSig>;
 
-    std::unique_ptr<OpComp> onfinish;
+    OpComp onfinish;
     uint64_t ontimeout;
 
     ceph::coarse_mono_time last_submit;
@@ -2192,8 +2229,8 @@ public:
     int64_t pool = 0;
     std::string name;
     using OpSig = void(boost::system::error_code, ceph::buffer::list);
-    using OpComp = ceph::async::Completion<OpSig>;
-    std::unique_ptr<OpComp> onfinish;
+    using OpComp = boost::asio::any_completion_handler<OpSig>;
+    OpComp onfinish;
     uint64_t ontimeout = 0;
     int pool_op = 0;
     int16_t crush_rule = 0;
@@ -2222,8 +2259,8 @@ public:
 
     using OpSig = void(boost::system::error_code, std::string,
                       ceph::buffer::list);
-    using OpComp = ceph::async::Completion<OpSig>;
-    std::unique_ptr<OpComp> onfinish;
+    using OpComp = boost::asio::any_completion_handler<OpSig>;
+    OpComp onfinish;
 
     uint64_t ontimeout = 0;
     ceph::coarse_mono_time last_submit;
@@ -2289,9 +2326,9 @@ public:
     bool registered{false};
     bool canceled{false};
     using OpSig = void(boost::system::error_code, ceph::buffer::list);
-    using OpComp = ceph::async::Completion<OpSig>;
-    std::unique_ptr<OpComp> on_reg_commit;
-    std::unique_ptr<OpComp> on_notify_finish;
+    using OpComp = boost::asio::any_completion_handler<OpSig>;
+    OpComp on_reg_commit;
+    OpComp on_notify_finish;
     uint64_t notify_id{0};
 
     fu2::unique_function<void(boost::system::error_code,
@@ -2449,7 +2486,7 @@ public:
   std::map<ceph_tid_t, CommandOp*> check_latest_map_commands;
 
   std::map<epoch_t,
-          std::vector<std::pair<std::unique_ptr<OpCompletion>,
+          std::vector<std::pair<OpCompletion,
                                 boost::system::error_code>>> waiting_for_map;
 
   ceph::timespan mon_timeout;
@@ -2516,9 +2553,13 @@ public:
 public:
   template<typename CT>
   auto linger_callback_flush(CT&& ct) {
-    boost::asio::async_completion<CT, void(void)> init(ct);
-    boost::asio::defer(finish_strand, std::move(init.completion_handler));
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CT>(ct), boost::asio::make_work_guard(
+       service.get_executor()));
+    return boost::asio::async_initiate<decltype(consigned), void()>(
+      [this](auto handler) {
+       boost::asio::defer(finish_strand, std::move(handler));
+      }, consigned);
   }
 
 private:
@@ -2669,22 +2710,28 @@ private:
 
   template<typename CompletionToken>
   auto wait_for_osd_map(CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken, void()> init(token);
-    std::unique_lock l(rwlock);
-    if (osdmap->get_epoch()) {
-      l.unlock();
-      boost::asio::post(std::move(init.completion_handler));
-    } else {
-      waiting_for_map[0].emplace_back(
-       OpCompletion::create(
-         service.get_executor(),
-         [c = std::move(init.completion_handler)]
-         (boost::system::error_code) mutable {
-           std::move(c)();
-         }), boost::system::error_code{});
-      l.unlock();
-    }
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       service.get_executor()));
+    return boost::asio::async_initiate<decltype(consigned), void()>(
+      [this](auto handler) {
+       std::unique_lock l(rwlock);
+       if (osdmap->get_epoch()) {
+         l.unlock();
+         boost::asio::post(std::move(handler));
+       } else {
+         auto e = boost::asio::get_associated_executor(
+           handler, service.get_executor());
+         waiting_for_map[0].emplace_back(
+           boost::asio::bind_executor(
+             e, [c = std::move(handler)]
+             (boost::system::error_code) mutable {
+               boost::asio::dispatch(std::move(c));
+             }),
+           boost::system::error_code{});
+         l.unlock();
+       }
+      }, consigned);
   }
 
 
@@ -2751,9 +2798,9 @@ public:
 
   struct CB_Objecter_GetVersion {
     Objecter *objecter;
-    std::unique_ptr<OpCompletion> fin;
+    OpCompletion fin;
 
-    CB_Objecter_GetVersion(Objecter *o, std::unique_ptr<OpCompletion> c)
+    CB_Objecter_GetVersion(Objecter *o, OpCompletion c)
       : objecter(o), fin(std::move(c)) {}
     void operator()(boost::system::error_code ec, version_t newest,
                    version_t oldest) {
@@ -2761,7 +2808,8 @@ public:
        // try again as instructed
        objecter->_wait_for_latest_osdmap(std::move(*this));
       } else if (ec) {
-       ceph::async::post(std::move(fin), ec);
+       boost::asio::post(objecter->service.get_executor(),
+                         boost::asio::append(std::move(fin), ec));
       } else {
        auto l = std::unique_lock(objecter->rwlock);
        objecter->_get_latest_version(oldest, newest, std::move(fin),
@@ -2772,24 +2820,23 @@ public:
 
   template<typename CompletionToken>
   auto wait_for_map(epoch_t epoch, CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken, OpSignature> init(token);
-
-    if (osdmap->get_epoch() >= epoch) {
-      boost::asio::post(service,
-                       ceph::async::bind_handler(
-                         std::move(init.completion_handler),
-                         boost::system::error_code()));
-    } else {
-      monc->get_version("osdmap",
-                       CB_Objecter_GetVersion(
-                         this,
-                         OpCompletion::create(service.get_executor(),
-                                              std::move(init.completion_handler))));
-    }
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       service.get_executor()));
+    return boost::asio::async_initiate<decltype(consigned), OpSignature>(
+      [epoch, this](auto handler) {
+       if (osdmap->get_epoch() >= epoch) {
+         boost::asio::post(boost::asio::append(
+                             std::move(handler),
+                             boost::system::error_code{}));
+       } else {
+         monc->get_version(
+           "osdmap",
+           CB_Objecter_GetVersion(this, std::move(handler)));
+       }
+      }, consigned);
   }
-
-  void _wait_for_new_map(std::unique_ptr<OpCompletion>, epoch_t epoch,
+  void _wait_for_new_map(OpCompletion, epoch_t epoch,
                         boost::system::error_code = {});
 
 private:
@@ -2801,38 +2848,40 @@ public:
 
   template<typename CompletionToken>
   auto wait_for_latest_osdmap(CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken, OpSignature> init(token);
-
-    monc->get_version("osdmap",
-                     CB_Objecter_GetVersion(
-                       this,
-                       OpCompletion::create(service.get_executor(),
-                                            std::move(init.completion_handler))));
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       service.get_executor()));
+    boost::asio::async_initiate<decltype(consigned), OpSignature>(
+      [this](auto handler) {
+       monc->get_version("osdmap",
+                         CB_Objecter_GetVersion(
+                           this,
+                           std::move(handler)));
+      }, consigned);
   }
 
-  void wait_for_latest_osdmap(std::unique_ptr<OpCompletion> c) {
-    monc->get_version("osdmap",
-                     CB_Objecter_GetVersion(this, std::move(c)));
+  auto wait_for_latest_osdmap(std::unique_ptr<ceph::async::Completion<OpSignature>> c) {
+    wait_for_latest_osdmap([c = std::move(c)](boost::system::error_code e) mutable {
+      c->dispatch(std::move(c), e);
+    });
   }
 
   template<typename CompletionToken>
   auto get_latest_version(epoch_t oldest, epoch_t newest,
                          CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken, OpSignature> init(token);
-    {
-      std::unique_lock wl(rwlock);
-      _get_latest_version(oldest, newest,
-                         OpCompletion::create(
-                           service.get_executor(),
-                           std::move(init.completion_handler)),
-                         std::move(wl));
-    }
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       service.get_executor()));
+    return boost::asio::async_initiate<decltype(consigned), OpSignature>(
+      [oldest, newest, this](auto handler) {
+       std::unique_lock wl(rwlock);
+       _get_latest_version(oldest, newest,
+                           std::move(handler), std::move(wl));
+      }, consigned);
   }
 
   void _get_latest_version(epoch_t oldest, epoch_t neweset,
-                          std::unique_ptr<OpCompletion> fin,
+                          OpCompletion fin,
                           std::unique_lock<ceph::shared_mutex>&& ul);
 
   /** Get the current set of global op flags */
@@ -2865,7 +2914,7 @@ public:
   epoch_t op_cancel_writes(int r, int64_t pool=-1);
 
   // commands
-  void osd_command(int osd, std::vector<std::string> cmd,
+  void osd_command_(int osd, std::vector<std::string> cmd,
                   ceph::buffer::list inbl, ceph_tid_t *ptid,
                   decltype(CommandOp::onfinish)&& onfinish) {
     ceph_assert(osd >= 0);
@@ -2880,17 +2929,20 @@ public:
   auto osd_command(int osd, std::vector<std::string> cmd,
                   ceph::buffer::list inbl, ceph_tid_t *ptid,
                   CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken,
-                                 CommandOp::OpSig> init(token);
-    osd_command(osd, std::move(cmd), std::move(inbl), ptid,
-               CommandOp::OpComp::create(service.get_executor(),
-                                         std::move(init.completion_handler)));
-    return init.result.get();
-  }
-
-  void pg_command(pg_t pgid, std::vector<std::string> cmd,
-                 ceph::buffer::list inbl, ceph_tid_t *ptid,
-                 decltype(CommandOp::onfinish)&& onfinish) {
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       service.get_executor()));
+    return boost::asio::async_initiate<decltype(consigned), CommandOp::OpSig>(
+      [osd, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this]
+      (auto handler) {
+       osd_command_(osd, std::move(cmd), std::move(inbl), ptid,
+                    std::move(handler));
+      }, consigned);
+  }
+
+  void pg_command_(pg_t pgid, std::vector<std::string> cmd,
+                  ceph::buffer::list inbl, ceph_tid_t *ptid,
+                  decltype(CommandOp::onfinish)&& onfinish) {
     auto *c = new CommandOp(
       pgid,
       std::move(cmd),
@@ -2903,12 +2955,14 @@ public:
   auto pg_command(pg_t pgid, std::vector<std::string> cmd,
                  ceph::buffer::list inbl, ceph_tid_t *ptid,
                  CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken,
-                                 CommandOp::OpSig> init(token);
-    pg_command(pgid, std::move(cmd), std::move(inbl), ptid,
-              CommandOp::OpComp::create(service.get_executor(),
-                                        std::move(init.completion_handler)));
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(service.get_executor()));
+    return async_initiate<decltype(consigned), CommandOp::OpSig> (
+      [pgid, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this]
+      (auto handler) {
+       pg_command_(pgid, std::move(cmd), std::move(inbl), ptid,
+                   std::move(handler));
+      }, consigned);
   }
 
   // mid-level helpers
@@ -2949,7 +3003,7 @@ public:
   void mutate(const object_t& oid, const object_locator_t& oloc,
              ObjectOperation&& op, const SnapContext& snapc,
              ceph::real_time mtime, int flags,
-             std::unique_ptr<Op::OpComp>&& oncommit,
+             Op::OpComp oncommit,
              version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
              ZTracer::Trace *parent_trace = nullptr) {
     Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
@@ -2967,6 +3021,18 @@ public:
     op_submit(o);
   }
 
+  void mutate(const object_t& oid, const object_locator_t& oloc,
+             ObjectOperation&& op, const SnapContext& snapc,
+             ceph::real_time mtime, int flags,
+             std::unique_ptr<ceph::async::Completion<Op::OpSig>> oncommit,
+             version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
+             ZTracer::Trace *parent_trace = nullptr) {
+    mutate(oid, oloc, std::move(op), snapc, mtime, flags,
+          [c = std::move(oncommit)](boost::system::error_code ec) mutable {
+            c->dispatch(std::move(c), ec);
+          }, objver, reqid, parent_trace);
+  }
+
   Op *prepare_read_op(
     const object_t& oid, const object_locator_t& oloc,
     ObjectOperation& op,
@@ -3008,7 +3074,7 @@ public:
 
   void read(const object_t& oid, const object_locator_t& oloc,
            ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
-           int flags, std::unique_ptr<Op::OpComp>&& onack,
+           int flags, Op::OpComp onack,
            version_t *objver = nullptr, int *data_offset = nullptr,
            uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
     Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
@@ -3031,6 +3097,17 @@ public:
     op_submit(o);
   }
 
+  void read(const object_t& oid, const object_locator_t& oloc,
+           ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
+           int flags, std::unique_ptr<ceph::async::Completion<Op::OpSig>> onack,
+           version_t *objver = nullptr, int *data_offset = nullptr,
+           uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
+    read(oid, oloc, std::move(op), snapid, pbl, flags,
+        [c = std::move(onack)](boost::system::error_code e) mutable {
+          c->dispatch(std::move(c), e);
+        }, objver, data_offset, features, parent_trace);
+  }
+
 
   Op *prepare_pg_read_op(
     uint32_t hash, object_locator_t oloc,
@@ -3074,7 +3151,7 @@ public:
   ceph_tid_t pg_read(
     uint32_t hash, object_locator_t oloc,
     ObjectOperation& op, ceph::buffer::list *pbl, int flags,
-    std::unique_ptr<Op::OpComp>&& onack, epoch_t *reply_epoch, int *ctx_budget) {
+    Op::OpComp onack, epoch_t *reply_epoch, int *ctx_budget) {
     ceph_tid_t tid;
     Op *o = new Op(object_t(), oloc,
                   std::move(op.ops),
@@ -3118,6 +3195,18 @@ public:
     return linger_watch(info, op, snapc, mtime, inbl,
                        OpContextVert<ceph::buffer::list>(onfinish, nullptr), objver);
   }
+  ceph_tid_t linger_watch(LingerOp *info,
+                         ObjectOperation& op,
+                         const SnapContext& snapc, ceph::real_time mtime,
+                         ceph::buffer::list& inbl,
+                         std::unique_ptr<ceph::async::Completion<
+                           void(boost::system::error_code,
+                                ceph::buffer::list)>> onfinish,
+                         version_t *objver) {
+    return linger_watch(info, op, snapc, mtime, inbl,
+                       OpCompletionVert<ceph::buffer::list>(
+                         std::move(onfinish)), objver);
+  }
   ceph_tid_t linger_notify(LingerOp *info,
                           ObjectOperation& op,
                           snapid_t snap, ceph::buffer::list& inbl,
@@ -3133,6 +3222,17 @@ public:
                         OpContextVert(onack, poutbl),
                         objver);
   }
+  ceph_tid_t linger_notify(LingerOp *info,
+                          ObjectOperation& op,
+                          snapid_t snap, ceph::buffer::list& inbl,
+                          std::unique_ptr<ceph::async::Completion<
+                            void(boost::system::error_code,
+                                 ceph::buffer::list)>> onack,
+                          version_t *objver) {
+    return linger_notify(info, op, snap, inbl,
+                        OpCompletionVert<ceph::buffer::list>(
+                          std::move(onack)), objver);
+  }
   tl::expected<ceph::timespan,
               boost::system::error_code> linger_check(LingerOp *info);
   void linger_cancel(LingerOp *info);  // releases a reference
@@ -3713,15 +3813,27 @@ public:
     create_pool_snap(pool, snapName,
                     OpContextVert<ceph::buffer::list>(c, nullptr));
   }
+  void create_pool_snap(
+    int64_t pool, std::string_view snapName,
+    std::unique_ptr<ceph::async::Completion<PoolOp::OpSig>> c) {
+    create_pool_snap(pool, snapName,
+                    OpCompletionVert<ceph::buffer::list>(std::move(c)));
+  }
   void allocate_selfmanaged_snap(int64_t pool,
-                                std::unique_ptr<ceph::async::Completion<
+                                boost::asio::any_completion_handler<
                                 void(boost::system::error_code,
-                                     snapid_t)>> onfinish);
+                                     snapid_t)> onfinish);
   void allocate_selfmanaged_snap(int64_t pool, snapid_t* psnapid,
                                 Context* c) {
     allocate_selfmanaged_snap(pool,
                              OpContextVert(c, psnapid));
   }
+  void allocate_selfmanaged_snap(int64_t pool,
+                                std::unique_ptr<ceph::async::Completion<void(
+                                  boost::system::error_code, snapid_t)>> c) {
+    allocate_selfmanaged_snap(pool,
+                             OpCompletionVert<snapid_t>(std::move(c)));
+  }
   void delete_pool_snap(int64_t pool, std::string_view snapName,
                        decltype(PoolOp::onfinish)&& onfinish);
   void delete_pool_snap(int64_t pool, std::string_view snapName,
@@ -3729,6 +3841,12 @@ public:
     delete_pool_snap(pool, snapName,
                     OpContextVert<ceph::buffer::list>(c, nullptr));
   }
+  void delete_pool_snap(int64_t pool, std::string_view snapName,
+                       std::unique_ptr<ceph::async::Completion<void(
+                          boost::system::error_code, ceph::buffer::list)>> c) {
+    delete_pool_snap(pool, snapName,
+                    OpCompletionVert<ceph::buffer::list>(std::move(c)));
+  }
 
   void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
                               decltype(PoolOp::onfinish)&& onfinish);
@@ -3737,6 +3855,12 @@ public:
     delete_selfmanaged_snap(pool, snap,
                            OpContextVert<ceph::buffer::list>(c, nullptr));
   }
+  void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
+                              std::unique_ptr<ceph::async::Completion<void(
+                                 boost::system::error_code, ceph::buffer::list)>> c) {
+    delete_selfmanaged_snap(pool, snap,
+                           OpCompletionVert<ceph::buffer::list>(std::move(c)));
+  }
 
 
   void create_pool(std::string_view name,
@@ -3748,12 +3872,25 @@ public:
                OpContextVert<ceph::buffer::list>(onfinish, nullptr),
                crush_rule);
   }
+  void create_pool(std::string_view name,
+                  std::unique_ptr<ceph::async::Completion<void(
+                     boost::system::error_code, ceph::buffer::list)>> c,
+                  int crush_rule=-1) {
+    create_pool(name,
+               OpCompletionVert<ceph::buffer::list>(std::move(c)),
+               crush_rule);
+  }
   void delete_pool(int64_t pool,
                   decltype(PoolOp::onfinish)&& onfinish);
   void delete_pool(int64_t pool,
                   Context* onfinish) {
     delete_pool(pool, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
   }
+  void delete_pool(int64_t pool,
+                  std::unique_ptr<ceph::async::Completion<void(
+                    boost::system::error_code, ceph::buffer::list)>> c) {
+    delete_pool(pool, OpCompletionVert<ceph::buffer::list>(std::move(c)));
+  }
 
   void delete_pool(std::string_view name,
                   decltype(PoolOp::onfinish)&& onfinish);
@@ -3762,6 +3899,11 @@ public:
                   Context* onfinish) {
     delete_pool(name, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
   }
+  void delete_pool(std::string_view name,
+                  std::unique_ptr<ceph::async::Completion<void(
+                     boost::system::error_code, ceph::buffer::list)>> c) {
+    delete_pool(name, OpCompletionVert<ceph::buffer::list>(std::move(c)));
+  }
 
   void handle_pool_op_reply(MPoolOpReply *m);
   int pool_op_cancel(ceph_tid_t tid, int r);
@@ -3772,18 +3914,18 @@ private:
   void _poolstat_submit(PoolStatOp *op);
 public:
   void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
-  void get_pool_stats(const std::vector<std::string>& pools,
-                     decltype(PoolStatOp::onfinish)&& onfinish);
+  void get_pool_stats_(const std::vector<std::string>& pools,
+                      decltype(PoolStatOp::onfinish)&& onfinish);
   template<typename CompletionToken>
-  auto get_pool_stats(const std::vector<std::string>& pools,
+  auto get_pool_stats(std::vector<std::string> pools,
                      CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken,
-                                 PoolStatOp::OpSig> init(token);
-    get_pool_stats(pools,
-                  PoolStatOp::OpComp::create(
-                    service.get_executor(),
-                    std::move(init.completion_handler)));
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       service.get_executor()));
+    return boost::asio::async_initiate<decltype(consigned), PoolStatOp::OpSig>(
+      [pools = std::move(pools), this](auto handler) {
+       get_pool_stats_(pools, std::move(handler));
+      }, consigned);
   }
   int pool_stat_op_cancel(ceph_tid_t tid, int r);
   void _finish_pool_stat_op(PoolStatOp *op, int r);
@@ -3794,20 +3936,27 @@ private:
   void _fs_stats_submit(StatfsOp *op);
 public:
   void handle_fs_stats_reply(MStatfsReply *m);
-  void get_fs_stats(std::optional<int64_t> poolid,
-                   decltype(StatfsOp::onfinish)&& onfinish);
+  void get_fs_stats_(std::optional<int64_t> poolid,
+                    decltype(StatfsOp::onfinish)&& onfinish);
   template<typename CompletionToken>
   auto get_fs_stats(std::optional<int64_t> poolid,
                    CompletionToken&& token) {
-    boost::asio::async_completion<CompletionToken, StatfsOp::OpSig> init(token);
-    get_fs_stats(poolid,
-                StatfsOp::OpComp::create(service.get_executor(),
-                                         std::move(init.completion_handler)));
-    return init.result.get();
+    auto consigned = boost::asio::consign(
+      std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+       service.get_executor()));
+    return boost::asio::async_initiate<decltype(consigned), StatfsOp::OpSig>(
+      [poolid, this](auto handler) {
+       get_fs_stats_(poolid, std::move(handler));
+      }, consigned);
   }
   void get_fs_stats(struct ceph_statfs& result, std::optional<int64_t> poolid,
                    Context *onfinish) {
-    get_fs_stats(poolid, OpContextVert(onfinish, result));
+    get_fs_stats_(poolid, OpContextVert(onfinish, result));
+  }
+  void get_fs_stats(std::optional<int64_t> poolid,
+                   std::unique_ptr<ceph::async::Completion<void(
+                      boost::system::error_code, struct ceph_statfs)>> c) {
+    get_fs_stats_(poolid, OpCompletionVert<struct ceph_statfs>(std::move(c)));
   }
   int statfs_op_cancel(ceph_tid_t tid, int r);
   void _finish_statfs_op(StatfsOp *op, int r);
index 4bf4fc037ecf1bd1bcf75e7a4352dfc570fd3389..c1b29bad6c476765588e4ca784184cb43c86403a 100644 (file)
@@ -22,6 +22,7 @@
 #include <boost/system/system_error.hpp>
 
 namespace bs = boost::system;
+namespace asio = boost::asio;
 using namespace std::literals;
 using namespace std::placeholders;
 
@@ -82,7 +83,7 @@ public:
 namespace {
 
 struct CompletionPayload {
-  std::unique_ptr<Op::Completion> c;
+  Op::Completion c;
 };
 
 void completion_callback_adapter(rados_completion_t c, void *arg) {
@@ -91,14 +92,14 @@ void completion_callback_adapter(rados_completion_t c, void *arg) {
   impl->release();
 
   auto payload = reinterpret_cast<CompletionPayload*>(arg);
-  payload->c->defer(std::move(payload->c),
-                    (r < 0) ? bs::error_code(-r, osd_category()) :
-                              bs::error_code());
+  asio::dispatch(asio::append(std::move(payload->c),
+                             (r < 0) ? bs::error_code(-r, osd_category()) :
+                             bs::error_code()));
   delete payload;
 }
 
 librados::AioCompletionImpl* create_aio_completion(
-    std::unique_ptr<Op::Completion>&& c) {
+  Op::Completion&& c) {
   auto payload = new CompletionPayload{std::move(c)};
 
   auto impl = new librados::AioCompletionImpl();
@@ -588,12 +589,12 @@ boost::asio::io_context::executor_type neorados::RADOS::get_executor() const {
   return impl->io_context.get_executor();
 }
 
-void RADOS::execute(Object o, IOContext ioc, ReadOp op,
-                    ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
-                    uint64_t* objver, const blkin_trace_info* trace_info) {
+void RADOS::execute_(Object o, IOContext ioc, ReadOp op,
+                    ceph::buffer::list* bl, Op::Completion c,
+                    uint64_t* objver, const blkin_trace_info* trace_info) {
   auto io_ctx = impl->get_io_ctx(ioc);
   if (io_ctx == nullptr) {
-    c->dispatch(std::move(c), osdc_errc::pool_dne);
+    asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));
     return;
   }
 
@@ -607,12 +608,12 @@ void RADOS::execute(Object o, IOContext ioc, ReadOp op,
   ceph_assert(r == 0);
 }
 
-void RADOS::execute(Object o, IOContext ioc, WriteOp op,
-                    std::unique_ptr<Op::Completion> c, uint64_t* objver,
-                    const blkin_trace_info* trace_info) {
+void RADOS::execute_(Object o, IOContext ioc, WriteOp op,
+                    Op::Completion c, uint64_t* objver,
+                    const blkin_trace_info* trace_info) {
   auto io_ctx = impl->get_io_ctx(ioc);
   if (io_ctx == nullptr) {
-    c->dispatch(std::move(c), osdc_errc::pool_dne);
+    asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));
     return;
   }
 
@@ -629,29 +630,33 @@ void RADOS::execute(Object o, IOContext ioc, WriteOp op,
   ceph_assert(r == 0);
 }
 
-void RADOS::mon_command(std::vector<std::string> command,
-                        bufferlist bl,
-                        std::string* outs, bufferlist* outbl,
-                        std::unique_ptr<Op::Completion> c) {
+void RADOS::mon_command_(std::vector<std::string> command,
+                        bufferlist bl,
+                        std::string* outs, bufferlist* outbl,
+                        Op::Completion c) {
   auto r = impl->test_rados_client->mon_command(command, bl, outbl, outs);
-  c->post(std::move(c),
-          (r < 0 ? bs::error_code(-r, osd_category()) : bs::error_code()));
+  asio::post(get_executor(),
+            asio::append(std::move(c),
+                         (r < 0 ? bs::error_code(-r, osd_category()) :
+                          bs::error_code())));
 }
 
-void RADOS::blocklist_add(std::string client_address,
-                          std::optional<std::chrono::seconds> expire,
-                          std::unique_ptr<SimpleOpComp> c) {
+void RADOS::blocklist_add_(std::string client_address,
+                          std::optional<std::chrono::seconds> expire,
+                          SimpleOpComp c) {
   auto r = impl->test_rados_client->blocklist_add(
     std::string(client_address), expire.value_or(0s).count());
-  c->post(std::move(c),
-          (r < 0 ? bs::error_code(-r, mon_category()) : bs::error_code()));
+  asio::post(get_executor(),
+            asio::append(std::move(c),
+                         (r < 0 ? bs::error_code(-r, mon_category()) :
+                          bs::error_code())));
 }
 
-void RADOS::wait_for_latest_osd_map(std::unique_ptr<Op::Completion> c) {
+void RADOS::wait_for_latest_osd_map_(Op::Completion c) {
   auto r = impl->test_rados_client->wait_for_latest_osd_map();
-  c->dispatch(std::move(c),
-              (r < 0 ? bs::error_code(-r, osd_category()) :
-                       bs::error_code()));
+  asio::dispatch(asio::append(std::move(c),
+                             (r < 0 ? bs::error_code(-r, osd_category()) :
+                              bs::error_code())));
 }
 
 } // namespace neorados