From: Adam C. Emerson Date: Wed, 29 Mar 2023 05:35:22 +0000 (-0400) Subject: neorados: Use `asio::any_completion_handler` X-Git-Tag: v19.3.0~349^2~23 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ea67f3dee2a3f8fcdcbb0bc0e80e38ec70378f05;p=ceph.git neorados: Use `asio::any_completion_handler` As we'd like to reduce (and eliminate) internal Ceph dependencies to the extent possible, now that Boost.Asio has a type-erased handler type, let's use it. Signed-off-by: Adam C. Emerson --- diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp index 1b0443f30cc..9bbe10b6670 100644 --- a/src/include/neorados/RADOS.hpp +++ b/src/include/neorados/RADOS.hpp @@ -27,7 +27,11 @@ #include #include +#include +#include #include +#include +#include #include #include @@ -48,11 +52,6 @@ #include "include/neorados/RADOS_Decodable.hpp" -// Needed for type erasure and template support. We can't really avoid -// it. - -#include "common/async/completion.h" - // These are needed for RGW, but in general as a 'shiny new interface' // we should try to use forward declarations and provide standard alternatives. 
@@ -279,7 +278,7 @@ public: std::size_t size() const; using Signature = void(boost::system::error_code); - using Completion = ceph::async::Completion; + using Completion = boost::asio::any_completion_handler; friend std::ostream& operator <<(std::ostream& m, const Op& o); protected: @@ -489,7 +488,7 @@ public: } using BuildSig = void(boost::system::error_code, RADOS); - using BuildComp = ceph::async::Completion; + using BuildComp = boost::asio::any_completion_handler; class Builder { std::optional conf_files; std::optional cluster; @@ -522,31 +521,34 @@ public: return *this; } - template + template CompletionToken> auto build(boost::asio::io_context& ioctx, CompletionToken&& token) { - return boost::asio::async_initiate( - [&ioctx, this](auto&& handler) { - build(ioctx, BuildComp::create(ioctx.get_executor(), - std::move(handler))); - }, token); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, ioctx.get_executor()))); + return boost::asio::async_initiate( + [&ioctx, this](auto handler) { + build_(ioctx, std::move(handler)); + }, consigned); } private: - void build(boost::asio::io_context& ioctx, - std::unique_ptr c); + void build_(boost::asio::io_context& ioctx, + BuildComp c); }; - template + template CompletionToken> static auto make_with_cct(CephContext* cct, boost::asio::io_context& ioctx, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, ioctx.get_executor()))); + return boost::asio::async_initiate( [cct, &ioctx](auto&& handler) { - make_with_cct(cct, ioctx, - BuildComp::create(ioctx.get_executor(), - std::move(handler))); - }, token); + make_with_cct_(cct, ioctx, std::move(handler)); + }, consigned); } static RADOS make_with_librados(librados::Rados& rados); @@ -565,164 +567,183 @@ public: executor_type get_executor() 
const; boost::asio::io_context& get_io_context(); - template + template CompletionToken> auto execute(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, CompletionToken&& token, uint64_t* objver = nullptr, const blkin_trace_info* trace_info = nullptr) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), op = std::move(op), bl, objver, trace_info, this](auto&& handler) mutable { - execute(std::move(o), std::move(ioc), std::move(op), bl, - ReadOp::Completion::create(get_executor(), - std::move(handler)), - objver, trace_info); - }, token); + execute_(std::move(o), std::move(ioc), std::move(op), bl, + std::move(handler), objver, trace_info); + }, consigned); } - template + template CompletionToken> auto execute(Object o, IOContext ioc, WriteOp op, CompletionToken&& token, uint64_t* objver = nullptr, const blkin_trace_info* trace_info = nullptr) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), op = std::move(op), objver, trace_info, this](auto&& handler) mutable { - execute(std::move(o), std::move(ioc), std::move(op), - WriteOp::Completion::create(get_executor(), - std::move(handler)), - objver, trace_info); - }, token); + execute_(std::move(o), std::move(ioc), std::move(op), + std::move(handler), objver, trace_info); + }, consigned); } boost::uuids::uuid get_fsid() const noexcept; using LookupPoolSig = void(boost::system::error_code, std::int64_t); - using LookupPoolComp = ceph::async::Completion; - template + using LookupPoolComp = boost::asio::any_completion_handler; + template CompletionToken> auto 
lookup_pool(std::string name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [name = std::move(name), this](auto&& handler) mutable { - lookup_pool(std::move(name), - LookupPoolComp::create(get_executor(), std::move(handler))); - }, token); + lookup_pool_(std::move(name), std::move(handler)); + }, consigned); } std::optional get_pool_alignment(int64_t pool_id); using LSPoolsSig = void(std::vector>); - using LSPoolsComp = ceph::async::Completion; - template + using LSPoolsComp = boost::asio::any_completion_handler; + template CompletionToken> auto list_pools(CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [this](auto&& handler) { - list_pools(LSPoolsComp::create(get_executor(), - std::move(handler))); - }, token); + list_pools_(std::move(handler)); + }, consigned); } using SimpleOpSig = void(boost::system::error_code); - using SimpleOpComp = ceph::async::Completion; - template + using SimpleOpComp = boost::asio::any_completion_handler; + template CompletionToken> auto create_pool_snap(int64_t pool, std::string snap_name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [snap_name = std::move(snap_name), pool, this](auto&& handler) mutable { - create_pool_snap(pool, std::move(snap_name), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + create_pool_snap_(pool, std::move(snap_name), + std::move(handler)); + }, 
consigned); } using SMSnapSig = void(boost::system::error_code, std::uint64_t); - using SMSnapComp = ceph::async::Completion; - template + using SMSnapComp = boost::asio::any_completion_handler; + template CompletionToken> auto allocate_selfmanaged_snap(int64_t pool, CompletionToken&& token) { - return boost::asio::async_initiate( - [pool, this](auto&& handler) { - allocage_selfmanaged_snap(pool, - SMSnapComp::create(get_executor(), - std::move(handler))); - }, token); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( + [pool, this](auto&& handler) mutable { + allocate_selfmanaged_snap_(pool, std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_pool_snap(int64_t pool, std::string snap_name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [snap_name = std::move(snap_name), pool, this](auto&& handler) mutable { - delete_pool_snap(pool, std::move(snap_name), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + delete_pool_snap_(pool, std::move(snap_name), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_selfmanaged_snap(int64_t pool, std::uint64_t snap, CompletionToken&& token) { - return boost::asio::async_initiate( - [snap, pool, this](auto&& handler) { - delete_selfmanaged_snap(pool, std::move(snap), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( + [pool, snap, this](auto&& handler)
mutable { + delete_selfmanaged_snap_(pool, snap, std::move(handler)); + }, consigned); } - template + template CompletionToken> auto create_pool(std::string name, std::optional crush_rule, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [name = std::move(name), crush_rule, this](auto&& handler) mutable { - create_pool(std::move(name), crush_rule, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + create_pool_(std::move(name), crush_rule, + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_pool(std::string name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [name = std::move(name), this](auto&& handler) mutable { - delete_pool(std::move(name), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + delete_pool_(std::move(name), std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_pool(int64_t pool, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool, this](auto&& handler) mutable { - delete_pool(pool, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + delete_pool_(pool, std::move(handler)); + }, consigned); } using PoolStatSig = void(boost::system::error_code, boost::container::flat_map, bool); - using PoolStatComp = ceph::async::Completion; - template + PoolStats>, bool); + using 
PoolStatComp = boost::asio::any_completion_handler; + template CompletionToken> auto stat_pools(std::vector pools, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pools = std::move(pools), this](auto&& handler) mutable { - stat_pools(std::move(pools), - PoolStatComp::create(get_executor(), - std::move(handler))); - }, token); + stat_pools_(std::move(pools), std::move(handler)); + }, consigned); } using StatFSSig = void(boost::system::error_code, FSStats); - using StatFSComp = ceph::async::Completion; - template + using StatFSComp = boost::asio::any_completion_handler; + template CompletionToken> auto statfs(std::optional pool, CompletionToken&& token) { - return boost::asio::async_initiate( - [pool, this](auto&& handler) { - statfs(pool, StatFSComp::create(get_executor(), - std::move(handler))); - }, token); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( + [pool, this](auto&& handler) mutable { + stat_fs_(pool, std::move(handler)); + }, consigned); } using WatchCB = fu2::unique_function; - template + using WatchComp = boost::asio::any_completion_handler; + template CompletionToken> auto watch(Object o, IOContext ioc, std::optional timeout, WatchCB cb, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), timeout, cb = std::move(cb), this](auto&& handler) mutable { - watch(std::move(o), std::move(ioc), timeout, std::move(cb), - WatchComp::create(get_executor(),
std::move(handler))); - }, token); + watch_(std::move(o), std::move(ioc), timeout, std::move(cb), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto notify_ack(Object o, IOContext ioc, - uint64_t notify_id, - uint64_t cookie, + uint64_t notify_id, uint64_t cookie, ceph::buffer::list bl, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), notify_id, cookie, bl = std::move(bl), this](auto&& handler) mutable { - notify_ack(std::move(o), std::move(ioc), notify_id, cookie, - std::move(bl), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + notify_ack_(std::move(o), std::move(ioc), std::move(notify_id), + std::move(cookie), std::move(bl), std::move(handler)); + }, consigned); } - template - auto unwatch(uint64_t cookie, IOContext ioc, + template CompletionToken> + auto unwatch(std::uint64_t cookie, IOContext ioc, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [cookie, ioc = std::move(ioc), this](auto&& handler) mutable { - unwatch(cookie, std::move(ioc), - SimpleOpComp::create(get_executor(), std::move(handler))); - }, token); + unwatch_(cookie, std::move(ioc), std::move(handler)); + }, consigned); } // This is one of those places where having to force everything into @@ -777,29 +803,33 @@ public: // let us separate out the implementation details without // sacrificing all the benefits of templates. 
using VoidOpSig = void(); - using VoidOpComp = ceph::async::Completion; - template + using VoidOpComp = boost::asio::any_completion_handler; + template CompletionToken> auto flush_watch(CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [this](auto&& handler) { - flush_watch(VoidOpComp::create(get_executor(), - std::move(handler))); - }, token); + flush_watch_(std::move(handler)); + }, consigned); } using NotifySig = void(boost::system::error_code, ceph::buffer::list); - using NotifyComp = ceph::async::Completion; - template - auto notify(Object oid, IOContext ioc, ceph::buffer::list bl, + using NotifyComp = boost::asio::any_completion_handler; + template CompletionToken> + auto notify(Object o, IOContext ioc, ceph::buffer::list bl, std::optional timeout, CompletionToken&& token) { - return boost::asio::async_initiate( - [oid = std::move(oid), ioc = std::move(ioc), - bl = std::move(bl), timeout, this](auto&& handler) mutable { - notify(std::move(oid), std::move(ioc), std::move(bl), timeout, - NotifyComp::create(get_executor(), - std::move(handler))); - }, token); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( + [o = std::move(o), ioc = std::move(ioc), bl = std::move(bl), timeout, + this](auto&& handler) mutable { + notify_(std::move(o), std::move(ioc), std::move(bl), timeout, + std::move(handler)); + }, consigned); } // The versions with pointers are fine for coroutines, but @@ -807,94 +837,108 @@ public: using EnumerateSig = void(boost::system::error_code, std::vector, Cursor); - using EnumerateComp = ceph::async::Completion; - template + using EnumerateComp = boost::asio::any_completion_handler; + template 
CompletionToken> auto enumerate_objects(IOContext ioc, Cursor begin, Cursor end, std::uint32_t max, ceph::buffer::list filter, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [ioc = std::move(ioc), begin = std::move(begin), end = std::move(end), max, filter = std::move(filter), this](auto&& handler) mutable { - enumerate_objects(std::move(ioc), std::move(begin), std::move(end), - max, std::move(filter), - EnumerateComp::create(get_executor(), - std::move(handler))); - }, token); + enumerate_objects_(std::move(ioc), std::move(begin), std::move(end), + std::move(max), std::move(filter), + std::move(handler)); + }, consigned); } using CommandSig = void(boost::system::error_code, std::string, ceph::buffer::list); - using CommandComp = ceph::async::Completion; - template + using CommandComp = boost::asio::any_completion_handler; + template CompletionToken> auto osd_command(int osd, std::vector cmd, ceph::buffer::list in, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [osd, cmd = std::move(cmd), in = std::move(in), this](auto&& handler) mutable { - osd_command(osd, std::move(cmd), std::move(in), - CommandComp::create(get_executor(), - std::move(handler))); - }, token); + osd_command_(osd, std::move(cmd), std::move(in), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto pg_command(PG pg, std::vector cmd, ceph::buffer::list in, CompletionToken&& token) { - return boost::asio::async_initiate( - [pg, cmd = std::move(cmd), in = std::move(in), + auto consigned = boost::asio::consign( + std::forward(token), 
boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( + [pg = std::move(pg), cmd = std::move(cmd), in = std::move(in), this](auto&& handler) mutable { - pg_command(pg, std::move(cmd), std::move(in), - CommandComp::create(get_executor(), - std::move(handler))); - }, token); + pg_command_(std::move(pg), std::move(cmd), std::move(in), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto mon_command(std::vector command, ceph::buffer::list bl, std::string* outs, ceph::buffer::list* outbl, CompletionToken&& token) { - return boost::asio::async_initiate( - [command = std::move(command), bl = std::move(bl), outs, - outbl, this](auto&& handler) mutable { - mon_command(std::move(command), std::move(bl), outs, outbl, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( + [command = std::move(command), bl = std::move(bl), outs, outbl, + this](auto&& handler) mutable { + mon_command_(std::move(command), std::move(bl), outs, outbl, + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto enable_application(std::string pool, std::string app_name, bool force, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool = std::move(pool), app_name = std::move(app_name), force, this](auto&& handler) mutable { - enable_application(std::move(pool), std::move(app_name), force, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + enable_application_(std::move(pool), std::move(app_name), force, + 
std::move(handler)); + }, consigned); } - template + template CompletionToken> auto blocklist_add(std::string client_address, std::optional expire, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [client_address = std::move(client_address), expire, this](auto&& handler) mutable { - blocklist_add(std::move(client_address), expire, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + blocklist_add_(std::move(client_address), expire, + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto wait_for_latest_osd_map(CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [this](auto&& handler) { - wait_for_latest_osd_map(SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + wait_for_latest_osd_map_(std::move(handler)); + }, consigned); } uint64_t instance_id() const; @@ -906,77 +950,83 @@ private: friend Builder; RADOS(std::unique_ptr impl); - static void make_with_cct(CephContext* cct, - boost::asio::io_context& ioctx, - std::unique_ptr c); - - void execute(Object o, IOContext ioc, ReadOp op, - ceph::buffer::list* bl, std::unique_ptr c, - uint64_t* objver, const blkin_trace_info* trace_info); - - void execute(Object o, IOContext ioc, WriteOp op, - std::unique_ptr c, uint64_t* objver, - const blkin_trace_info* trace_info); - - void lookup_pool(std::string name, std::unique_ptr c); - void list_pools(std::unique_ptr c); - void create_pool_snap(int64_t pool, std::string snap_name, - std::unique_ptr c); - void allocate_selfmanaged_snap(int64_t pool, std::unique_ptr c); - void 
delete_pool_snap(int64_t pool, std::string snap_name, - std::unique_ptr c); - void delete_selfmanaged_snap(int64_t pool, std::uint64_t snap, - std::unique_ptr c); - void create_pool(std::string name, std::optional crush_rule, - std::unique_ptr c); - void delete_pool(std::string name, - std::unique_ptr c); - void delete_pool(int64_t pool, std::unique_ptr c); - void stat_pools(std::vector pools, - std::unique_ptr c); - void stat_fs(std::optional pool, - std::unique_ptr c); - - void watch(Object o, IOContext ioc, - std::optional timeout, - WatchCB cb, std::unique_ptr c); + static void make_with_cct_(CephContext* cct, + boost::asio::io_context& ioctx, + BuildComp c); + + void execute_(Object o, IOContext ioc, ReadOp op, + ceph::buffer::list* bl, Op::Completion c, + uint64_t* objver, const blkin_trace_info* trace_info); + + void execute_(Object o, IOContext ioc, WriteOp op, + Op::Completion c, uint64_t* objver, + const blkin_trace_info* trace_info); + + void lookup_pool_(std::string name, LookupPoolComp c); + void list_pools_(LSPoolsComp c); + void create_pool_snap_(int64_t pool, std::string snap_name, + SimpleOpComp c); + void allocate_selfmanaged_snap_(int64_t pool, SMSnapComp c); + void delete_pool_snap_(int64_t pool, std::string snap_name, + SimpleOpComp c); + void delete_selfmanaged_snap_(int64_t pool, std::uint64_t snap, + SimpleOpComp c); + void create_pool_(std::string name, std::optional crush_rule, + SimpleOpComp c); + void delete_pool_(std::string name, + SimpleOpComp c); + void delete_pool_(int64_t pool, + SimpleOpComp c); + void stat_pools_(std::vector pools, + PoolStatComp c); + void stat_fs_(std::optional pool, + StatFSComp c); + void watch_(Object o, IOContext ioc, + std::optional timeout, + WatchCB cb, WatchComp c); tl::expected - watch_check(uint64_t cookie); - void notify_ack(Object o, IOContext _ioc, - uint64_t notify_id, - uint64_t cookie, - ceph::buffer::list bl, - std::unique_ptr); - void unwatch(uint64_t cookie, IOContext ioc, - std::unique_ptr); 
- void notify(Object oid, IOContext ioctx, - ceph::buffer::list bl, - std::optional timeout, - std::unique_ptr c); - void flush_watch(std::unique_ptr); - - void enumerate_objects(IOContext ioc, Cursor begin, - Cursor end, std::uint32_t max, - ceph::buffer::list filter, - std::unique_ptr c); - void osd_command(int osd, std::vector cmd, - ceph::buffer::list in, std::unique_ptr c); - void pg_command(PG pg, std::vector cmd, - ceph::buffer::list in, std::unique_ptr c); - - void mon_command(std::vector command, + watch_check_(uint64_t cookie); + void notify_ack_(Object o, IOContext _ioc, + uint64_t notify_id, + uint64_t cookie, ceph::buffer::list bl, - std::string* outs, ceph::buffer::list* outbl, - std::unique_ptr c); - - void enable_application(std::string pool, std::string app_name, - bool force, std::unique_ptr c); - - void blocklist_add(std::string client_address, - std::optional expire, - std::unique_ptr c); - - void wait_for_latest_osd_map(std::unique_ptr c); + SimpleOpComp); + void unwatch_(uint64_t cookie, IOContext ioc, + SimpleOpComp); + void notify_(Object oid, IOContext ioctx, + ceph::buffer::list bl, + std::optional timeout, + NotifyComp c); + void flush_watch_(VoidOpComp); + + void enumerate_objects_(IOContext ioc, Cursor begin, + Cursor end, std::uint32_t max, + ceph::buffer::list filter, + std::vector* ls, + Cursor* cursor, + SimpleOpComp c); + void enumerate_objects_(IOContext ioc, Cursor begin, + Cursor end, std::uint32_t max, + ceph::buffer::list filter, + EnumerateComp c); + void osd_command_(int osd, std::vector cmd, + ceph::buffer::list in, CommandComp c); + void pg_command_(PG pg, std::vector cmd, + ceph::buffer::list in, CommandComp c); + + void mon_command_(std::vector command, + ceph::buffer::list bl, + std::string* outs, ceph::buffer::list* outbl, + SimpleOpComp c); + + void enable_application_(std::string pool, std::string app_name, + bool force, SimpleOpComp c); + + void blocklist_add_(std::string client_address, + std::optional expire, + 
SimpleOpComp c); + + void wait_for_latest_osd_map_(SimpleOpComp c); // Proxy object to provide access to low-level RADOS messaging clients std::unique_ptr impl; diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index e1d38fd014a..d66b56560f9 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1788,9 +1788,12 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, extra_op_flags); C_SaferCond notify_finish_cond; + auto e = boost::asio::prefer( + objecter->service.get_executor(), + boost::asio::execution::outstanding_work.tracked); linger_op->on_notify_finish = - Objecter::LingerOp::OpComp::create( - objecter->service.get_executor(), + boost::asio::bind_executor( + std::move(e), CB_notify_Finish(client->cct, ¬ify_finish_cond, objecter, linger_op, preply_bl, preply_buf, preply_buf_len)); @@ -1844,9 +1847,12 @@ int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c, c->io = this; C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op); + auto e = boost::asio::prefer( + objecter->service.get_executor(), + boost::asio::execution::outstanding_work.tracked); linger_op->on_notify_finish = - Objecter::LingerOp::OpComp::create( - objecter->service.get_executor(), + boost::asio::bind_executor( + std::move(e), CB_notify_Finish(client->cct, oncomplete, objecter, linger_op, preply_bl, preply_buf, diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 30861aace0d..60d922d825f 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -6027,8 +6027,11 @@ int Server::check_layout_vxattr(MDRequestRef& mdr, // latest map. One day if COMPACT_VERSION of MClientRequest >=3, // we can remove those code. 
mdr->waited_for_osdmap = true; - mds->objecter->wait_for_latest_osdmap(std::ref(*new C_IO_Wrapper( - mds, new C_MDS_RetryRequest(mdcache, mdr)))); + mds->objecter->wait_for_latest_osdmap( + [c = new C_IO_Wrapper(mds, new C_MDS_RetryRequest(mdcache, mdr))] + (boost::system::error_code ec) { + c->complete(ceph::from_error_code(ec)); + }); return r; } } diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc index fabf2d5b357..1a7c5cd37db 100644 --- a/src/neorados/RADOS.cc +++ b/src/neorados/RADOS.cc @@ -39,9 +39,9 @@ using namespace std::literals; +namespace asio = boost::asio; namespace bc = boost::container; namespace bs = boost::system; -namespace ca = ceph::async; namespace cb = ceph::buffer; namespace neorados { @@ -703,8 +703,8 @@ RADOS::Builder& RADOS::Builder::add_conf_file(std::string_view f) { return *this; } -void RADOS::Builder::build(boost::asio::io_context& ioctx, - std::unique_ptr c) { +void RADOS::Builder::build_(asio::io_context& ioctx, + BuildComp c) { constexpr auto env = CODE_ENVIRONMENT_LIBRARY; CephInitParameters ci(env); if (name) @@ -732,7 +732,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, auto r = cct->_conf.parse_config_files(conf_files ? conf_files->data() : nullptr, &ss, flags); if (r < 0) - c->post(std::move(c), ceph::to_error_code(r), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), ceph::to_error_code(r), + RADOS{nullptr})); } cct->_conf.parse_env(cct->get_module_type()); @@ -741,7 +743,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, std::stringstream ss; auto r = cct->_conf.set_val(n, v, &ss); if (r < 0) - c->post(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), ceph::to_error_code(-EINVAL), + RADOS{nullptr})); } if (!no_mon_conf) { @@ -749,7 +753,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, // TODO This function should return an error code. 
auto err = mc_bootstrap.get_monmap_and_config(); if (err < 0) - c->post(std::move(c), ceph::to_error_code(err), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), ceph::to_error_code(err), + RADOS{nullptr})); } if (!cct->_log->is_started()) { cct->_log->start(); @@ -759,18 +765,19 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, RADOS::make_with_cct(cct, ioctx, std::move(c)); } -void RADOS::make_with_cct(CephContext* cct, - boost::asio::io_context& ioctx, - std::unique_ptr c) { +void RADOS::make_with_cct_(CephContext* cct, + asio::io_context& ioctx, + BuildComp c) { try { auto r = new detail::NeoClient{std::make_unique(ioctx, cct)}; r->objecter->wait_for_osd_map( [c = std::move(c), r = std::unique_ptr(r)]() mutable { - c->dispatch(std::move(c), bs::error_code{}, - RADOS{std::move(r)}); + asio::dispatch(asio::append(std::move(c), bs::error_code{}, + RADOS{std::move(r)})); }); } catch (const bs::system_error& err) { - c->post(std::move(c), err.code(), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), err.code(), RADOS{nullptr})); } } @@ -792,14 +799,14 @@ RADOS::executor_type RADOS::get_executor() const { return impl->ioctx.get_executor(); } -boost::asio::io_context& RADOS::get_io_context() { +asio::io_context& RADOS::get_io_context() { return impl->ioctx; } -void RADOS::execute(Object o, IOContext _ioc, ReadOp _op, - cb::list* bl, - std::unique_ptr c, version_t* objver, - const blkin_trace_info *trace_info) { +void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op, + cb::list* bl, + ReadOp::Completion c, version_t* objver, + const blkin_trace_info *trace_info) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); auto op = reinterpret_cast(&_op.impl); @@ -819,9 +826,9 @@ void RADOS::execute(Object o, IOContext _ioc, ReadOp _op, trace.event("submitted"); } -void RADOS::execute(Object o, IOContext _ioc, WriteOp _op, - std::unique_ptr c, version_t* objver, - const 
blkin_trace_info *trace_info) { +void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op, + WriteOp::Completion c, version_t* objver, + const blkin_trace_info *trace_info) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); auto op = reinterpret_cast(&_op.impl); @@ -850,8 +857,7 @@ boost::uuids::uuid RADOS::get_fsid() const noexcept { return impl->monclient.get_fsid().uuid; } - -void RADOS::lookup_pool(std::string name, std::unique_ptr c) +void RADOS::lookup_pool_(std::string name, LookupPoolComp c) { // I kind of want to make lookup_pg_pool return // std::optional since it can only return one error code. @@ -867,13 +873,14 @@ void RADOS::lookup_pool(std::string name, std::unique_ptr c) return osdmap.lookup_pg_pool_name(name); }); if (ret < 0) - ca::dispatch(std::move(c), osdc_errc::pool_dne, - std::int64_t(0)); + asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne, + std::int64_t(0))); else - ca::dispatch(std::move(c), bs::error_code{}, ret); + asio::dispatch(asio::append(std::move(c), bs::error_code{}, ret)); }); } else { - ca::post(std::move(c), bs::error_code{}, ret); + asio::post(get_executor(), + asio::append(std::move(c), bs::error_code{}, ret)); } } @@ -894,107 +901,124 @@ std::optional RADOS::get_pool_alignment(int64_t pool_id) }); } -void RADOS::list_pools(std::unique_ptr c) { - ca::dispatch(std::move(c), - impl->objecter->with_osdmap( - [&](OSDMap& o) { - std::vector> v; - for (auto p : o.get_pools()) - v.push_back(std::make_pair(p.first, - o.get_pool_name(p.first))); - return v; - })); +void RADOS::list_pools_(LSPoolsComp c) { + asio::dispatch(asio::append(std::move(c), + impl->objecter->with_osdmap( + [&](OSDMap& o) { + std::vector> v; + for (auto p : o.get_pools()) + v.push_back(std::make_pair(p.first, + o.get_pool_name(p.first))); + return v; + }))); } -void RADOS::create_pool_snap(std::int64_t pool, - std::string snap_name, - std::unique_ptr c) +void RADOS::create_pool_snap_(std::int64_t pool, + std::string 
snap_name, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->create_pool_snap( pool, snap_name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::allocate_selfmanaged_snap(int64_t pool, - std::unique_ptr c) { +void RADOS::allocate_selfmanaged_snap_(int64_t pool, + SMSnapComp c) { + auto e = asio::prefer( + get_executor(), + asio::execution::outstanding_work.tracked); + impl->objecter->allocate_selfmanaged_snap( pool, - ca::Completion::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, snapid_t snap) mutable { - ca::dispatch(std::move(c), e, snap); + asio::dispatch(asio::append(std::move(c), e, snap)); })); } -void RADOS::delete_pool_snap(std::int64_t pool, - std::string snap_name, - std::unique_ptr c) +void RADOS::delete_pool_snap_(std::int64_t pool, + std::string snap_name, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_pool_snap( pool, snap_name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::delete_selfmanaged_snap(std::int64_t pool, - std::uint64_t snap, - std::unique_ptr c) +void RADOS::delete_selfmanaged_snap_(std::int64_t pool, + std::uint64_t snap, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_selfmanaged_snap( pool, snap, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const 
bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::create_pool(std::string name, - std::optional crush_rule, - std::unique_ptr c) +void RADOS::create_pool_(std::string name, + std::optional crush_rule, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); + impl->objecter->create_pool( name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); }), crush_rule.value_or(-1)); } -void RADOS::delete_pool(std::string name, std::unique_ptr c) +void RADOS::delete_pool_(std::string name, SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_pool( name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::delete_pool(std::int64_t pool, - std::unique_ptr c) +void RADOS::delete_pool_(std::int64_t pool, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_pool( pool, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::stat_pools(std::vector pools, - std::unique_ptr c) { +void RADOS::stat_pools_(std::vector pools, + PoolStatComp c) { impl->objecter->get_pool_stats( pools, [c = std::move(c)] @@ -1033,12 +1057,13 @@ void RADOS::stat_pools(std::vector pools, pv.compressed_bytes_alloc = statfs.data_compressed_allocated; 
} - ca::dispatch(std::move(c), ec, std::move(result), per_pool); + asio::dispatch(asio::append(std::move(c), ec, std::move(result), + per_pool)); }); } -void RADOS::stat_fs(std::optional _pool, - std::unique_ptr c) { +void RADOS::stat_fs_(std::optional _pool, + StatFSComp c) { std::optional pool; if (_pool) pool = *pool; @@ -1046,15 +1071,15 @@ void RADOS::stat_fs(std::optional _pool, pool, [c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable { FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects}; - c->dispatch(std::move(c), ec, std::move(fso)); + asio::dispatch(asio::append(std::move(c), ec, std::move(fso))); }); } // --- Watch/Notify -void RADOS::watch(Object o, IOContext _ioc, - std::optional timeout, WatchCB cb, - std::unique_ptr c) { +void RADOS::watch_(Object o, IOContext _ioc, + std::optional timeout, WatchCB cb, + WatchComp c) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); @@ -1066,20 +1091,22 @@ void RADOS::watch(Object o, IOContext _ioc, linger_op->handle = std::move(cb); op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count()); bufferlist bl; + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->linger_watch( linger_op, op, ioc->snapc, ceph::real_clock::now(), bl, - Objecter::LingerOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c), cookie](bs::error_code e, cb::list) mutable { - ca::dispatch(std::move(c), e, cookie); + asio::dispatch(asio::append(std::move(c), e, cookie)); }), nullptr); } -void RADOS::notify_ack(Object o, IOContext _ioc, - uint64_t notify_id, - uint64_t cookie, - bufferlist bl, - std::unique_ptr c) +void RADOS::notify_ack_(Object o, IOContext _ioc, + uint64_t notify_id, + uint64_t cookie, + bufferlist bl, + SimpleOpComp c) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); @@ -1091,14 +1118,14 @@ void RADOS::notify_ack(Object o, IOContext _ioc, 
nullptr, ioc->extra_op_flags, std::move(c)); } -tl::expected RADOS::watch_check(uint64_t cookie) +tl::expected RADOS::watch_check_(uint64_t cookie) { Objecter::LingerOp *linger_op = reinterpret_cast(cookie); return impl->objecter->linger_check(linger_op); } -void RADOS::unwatch(uint64_t cookie, IOContext _ioc, - std::unique_ptr c) +void RADOS::unwatch_(uint64_t cookie, IOContext _ioc, + SimpleOpComp c) { auto ioc = reinterpret_cast(&_ioc.impl); @@ -1106,48 +1133,50 @@ void RADOS::unwatch(uint64_t cookie, IOContext _ioc, ObjectOperation op; op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op), ioc->snapc, ceph::real_clock::now(), ioc->extra_op_flags, - Objecter::Op::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [objecter = impl->objecter, linger_op, c = std::move(c)] (bs::error_code ec) mutable { objecter->linger_cancel(linger_op); - ca::dispatch(std::move(c), ec); + asio::dispatch(asio::append(std::move(c), ec)); })); } -void RADOS::flush_watch(std::unique_ptr c) +void RADOS::flush_watch_(VoidOpComp c) { impl->objecter->linger_callback_flush([c = std::move(c)]() mutable { - ca::post(std::move(c)); + asio::dispatch(std::move(c)); }); } struct NotifyHandler : std::enable_shared_from_this { - boost::asio::io_context& ioc; - boost::asio::strand strand; + asio::io_context& ioc; + asio::strand strand; Objecter* objecter; Objecter::LingerOp* op; - std::unique_ptr c; + RADOS::NotifyComp c; bool acked = false; bool finished = false; bs::error_code res; bufferlist rbl; - NotifyHandler(boost::asio::io_context& ioc, + NotifyHandler(asio::io_context& ioc, Objecter* objecter, Objecter::LingerOp* op, - std::unique_ptr c) - : ioc(ioc), strand(boost::asio::make_strand(ioc)), + RADOS::NotifyComp c) + : ioc(ioc), strand(asio::make_strand(ioc)), objecter(objecter), op(op), c(std::move(c)) {} // Use bind or 
a lambda to pass this in. void handle_ack(bs::error_code ec, bufferlist&&) { - boost::asio::post( + asio::post( strand, [this, ec, p = shared_from_this()]() mutable { acked = true; @@ -1159,7 +1188,7 @@ struct NotifyHandler : std::enable_shared_from_this { void operator()(bs::error_code ec, bufferlist&& bl) { - boost::asio::post( + asio::post( strand, [this, ec, p = shared_from_this()]() mutable { finished = true; @@ -1174,14 +1203,14 @@ struct NotifyHandler : std::enable_shared_from_this { if ((acked && finished) || res) { objecter->linger_cancel(op); ceph_assert(c); - ca::dispatch(std::move(c), res, std::move(rbl)); + asio::dispatch(asio::append(std::move(c), res, std::move(rbl))); } } }; -void RADOS::notify(Object o, IOContext _ioc, bufferlist bl, - std::optional timeout, - std::unique_ptr c) +void RADOS::notify_(Object o, IOContext _ioc, bufferlist bl, + std::optional timeout, + NotifyComp c) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); @@ -1190,9 +1219,11 @@ void RADOS::notify(Object o, IOContext _ioc, bufferlist bl, auto cb = std::make_shared(impl->ioctx, impl->objecter, linger_op, std::move(c)); + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); linger_op->on_notify_finish = - Objecter::LingerOp::OpComp::create( - get_executor(), + asio::bind_executor( + e, [cb](bs::error_code ec, ceph::bufferlist bl) mutable { (*cb)(ec, std::move(bl)); }); @@ -1205,8 +1236,8 @@ void RADOS::notify(Object o, IOContext _ioc, bufferlist bl, impl->objecter->linger_notify( linger_op, rd, ioc->snap_seq, inbl, - Objecter::LingerOp::OpComp::create( - get_executor(), + asio::bind_executor( + e, [cb](bs::error_code ec, ceph::bufferlist bl) mutable { cb->handle_ack(ec, std::move(bl)); }), nullptr); @@ -1313,9 +1344,10 @@ Cursor::from_str(const std::string& s) { return e; } -void RADOS::enumerate_objects(IOContext _ioc, Cursor begin, Cursor end, - std::uint32_t max, ceph::buffer::list filter, - std::unique_ptr c) { 
+void RADOS::enumerate_objects_(IOContext _ioc, Cursor begin, Cursor end, + const std::uint32_t max, + bufferlist filter, + EnumerateComp c) { auto ioc = reinterpret_cast(&_ioc.impl); impl->objecter->enumerate_objects( ioc->oloc.pool, @@ -1327,43 +1359,42 @@ void RADOS::enumerate_objects(IOContext _ioc, Cursor begin, Cursor end, [c = std::move(c)] (bs::error_code ec, std::vector&& v, hobject_t&& n) mutable { - ca::dispatch(std::move(c), ec, std::move(v), - Cursor(static_cast(&n))); + asio::dispatch(asio::append(std::move(c), ec, std::move(v), + Cursor(static_cast(&n)))); + }); +} + +void RADOS::osd_command_(int osd, std::vector cmd, + ceph::bufferlist in, CommandComp c) { + impl->objecter->osd_command( + osd, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, std::string&& s, ceph::bufferlist&& b) mutable { + asio::dispatch(asio::append(std::move(c), ec, std::move(s), + std::move(b))); }); } -void RADOS::osd_command(int osd, std::vector cmd, - ceph::buffer::list in, std::unique_ptr c) { - impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr, - [c = std::move(c)] - (bs::error_code ec, - std::string&& s, - ceph::bufferlist&& b) mutable { - ca::dispatch(std::move(c), ec, - std::move(s), - std::move(b)); - }); -} -void RADOS::pg_command(PG pg, std::vector cmd, - ceph::buffer::list in, std::unique_ptr c) { - impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr, - [c = std::move(c)] - (bs::error_code ec, - std::string&& s, - ceph::bufferlist&& b) mutable { - ca::dispatch(std::move(c), ec, - std::move(s), - std::move(b)); - }); -} - -void RADOS::enable_application(std::string pool, std::string app_name, - bool force, std::unique_ptr c) { +void RADOS::pg_command_(PG pg, std::vector cmd, + ceph::bufferlist in, CommandComp c) { + impl->objecter->pg_command( + pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, std::string&& s, + 
ceph::bufferlist&& b) mutable { + asio::dispatch(asio::append(std::move(c), ec, std::move(s), + std::move(b))); + }); +} + +void RADOS::enable_application_(std::string pool, std::string app_name, + bool force, SimpleOpComp c) { // pre-Luminous clusters will return -EINVAL and application won't be // preserved until Luminous is configured as minimum version. if (!impl->get_required_monitor_features().contains_all( ceph::features::mon::FEATURE_LUMINOUS)) { - ca::post(std::move(c), ceph::to_error_code(-EOPNOTSUPP)); + asio::post(get_executor(), + asio::append(std::move(c), ceph::to_error_code(-EOPNOTSUPP))); } else { impl->monclient.start_mon_command( { fmt::format("{{ \"prefix\": \"osd pool application enable\"," @@ -1372,14 +1403,14 @@ void RADOS::enable_application(std::string pool, std::string app_name, force ? " ,\"yes_i_really_mean_it\": true" : "")}, {}, [c = std::move(c)](bs::error_code e, std::string, cb::list) mutable { - ca::post(std::move(c), e); - }); + asio::dispatch(asio::append(std::move(c), e)); + }); } } -void RADOS::blocklist_add(std::string client_address, - std::optional expire, - std::unique_ptr c) { +void RADOS::blocklist_add_(std::string client_address, + std::optional expire, + SimpleOpComp c) { auto expire_arg = (expire ? 
fmt::format(", \"expire\": \"{}.0\"", expire->count()) : std::string{}); impl->monclient.start_mon_command( @@ -1392,7 +1423,8 @@ void RADOS::blocklist_add(std::string client_address, [this, client_address = std::string(client_address), expire_arg, c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable { if (ec != bs::errc::invalid_argument) { - ca::post(std::move(c), ec); + asio::post(get_executor(), + asio::append(std::move(c), ec)); return; } @@ -1405,18 +1437,18 @@ void RADOS::blocklist_add(std::string client_address, client_address, expire_arg) }, {}, [c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable { - ca::post(std::move(c), ec); + asio::dispatch(asio::append(std::move(c), ec)); }); }); } -void RADOS::wait_for_latest_osd_map(std::unique_ptr c) { +void RADOS::wait_for_latest_osd_map_(SimpleOpComp c) { impl->objecter->wait_for_latest_osdmap(std::move(c)); } -void RADOS::mon_command(std::vector command, - cb::list bl, std::string* outs, cb::list* outbl, - std::unique_ptr c) { +void RADOS::mon_command_(std::vector command, + cb::list bl, std::string* outs, cb::list* outbl, + SimpleOpComp c) { impl->monclient.start_mon_command( command, bl, @@ -1426,7 +1458,7 @@ void RADOS::mon_command(std::vector command, *outs = std::move(s); if (outbl) *outbl = std::move(bl); - ca::post(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); }); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 6bc713fcbf5..a74d21b0f15 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -95,6 +95,7 @@ namespace bc = boost::container; namespace bs = boost::system; namespace ca = ceph::async; namespace cb = ceph::buffer; +namespace asio = boost::asio; #define dout_subsys ceph_subsys_objecter #undef dout_prefix @@ -604,14 +605,14 @@ void Objecter::_linger_commit(LingerOp *info, bs::error_code ec, std::unique_lock wl(info->watch_lock); ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl; if (info->on_reg_commit) { - 
info->on_reg_commit->defer(std::move(info->on_reg_commit), - ec, cb::list{}); - info->on_reg_commit.reset(); + asio::defer(service.get_executor(), + asio::append(std::move(info->on_reg_commit), + ec, cb::list{})); } if (ec && info->on_notify_finish) { - info->on_notify_finish->defer(std::move(info->on_notify_finish), - ec, cb::list{}); - info->on_notify_finish.reset(); + asio::defer(service.get_executor(), + asio::append(std::move(info->on_notify_finish), + ec, cb::list{})); } // only tell the user the first time we do this @@ -673,7 +674,7 @@ void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec) if (!info->last_error) { ec = _normalize_watch_error(ec); if (info->handle) { - boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); + asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); } } } @@ -708,7 +709,7 @@ void Objecter::_send_linger_ping(LingerOp *info) Op *o = new Op(info->target.base_oid, info->target.base_oloc, std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ, - CB_Linger_Ping(this, info, now), + fu2::unique_function{CB_Linger_Ping(this, info, now)}, nullptr, nullptr); o->target = info->target; o->should_resend = false; @@ -736,7 +737,7 @@ void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono ec = _normalize_watch_error(ec); info->last_error = ec; if (info->handle) { - boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); + asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); } } } else { @@ -924,7 +925,7 @@ void Objecter::handle_watch_notify(MWatchNotify *m) if (!info->last_error) { info->last_error = bs::error_code(ENOTCONN, osd_category()); if (info->handle) { - boost::asio::defer(finish_strand, CB_DoWatchError(this, info, + asio::defer(finish_strand, CB_DoWatchError(this, info, info->last_error)); } } @@ -937,16 +938,16 @@ void Objecter::handle_watch_notify(MWatchNotify *m) ldout(cct, 10) << __func__ << " reply notify " << m->notify_id << " != " << info->notify_id 
<< ", ignoring" << dendl; } else if (info->on_notify_finish) { - info->on_notify_finish->defer( - std::move(info->on_notify_finish), - osdcode(m->return_code), std::move(m->get_data())); - + asio::defer(service.get_executor(), + asio::append(std::move(info->on_notify_finish), + osdcode(m->return_code), + std::move(m->get_data()))); // if we race with reconnect we might get a second notify; only // notify the caller once! info->on_notify_finish = nullptr; } } else { - boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m)); + asio::defer(finish_strand, CB_DoWatchNotify(this, info, m)); } } @@ -1379,7 +1380,7 @@ void Objecter::handle_osd_map(MOSDMap *m) p->first <= osdmap->get_epoch()) { //go through the list and call the onfinish methods for (auto& [c, ec] : p->second) { - ca::post(std::move(c), ec); + asio::post(service.get_executor(), asio::append(std::move(c), ec)); } waiting_for_map.erase(p++); } @@ -1568,7 +1569,7 @@ void Objecter::_check_op_pool_dne(Op *op, std::unique_lock *s << " dne" << dendl; if (op->has_completion()) { num_in_flight--; - op->complete(osdc_errc::pool_dne, -ENOENT); + op->complete(osdc_errc::pool_dne, -ENOENT, service.get_executor()); } OSDSession *s = op->session; @@ -1603,7 +1604,7 @@ void Objecter::_check_op_pool_eio(Op *op, std::unique_lock *s << " has eio" << dendl; if (op->has_completion()) { num_in_flight--; - op->complete(osdc_errc::pool_eio, -EIO); + op->complete(osdc_errc::pool_eio, -EIO, service.get_executor()); } OSDSession *s = op->session; @@ -1701,13 +1702,15 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister) if (osdmap->get_epoch() >= op->map_dne_bound) { std::unique_lock wl{op->watch_lock}; if (op->on_reg_commit) { - op->on_reg_commit->defer(std::move(op->on_reg_commit), - osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(op->on_reg_commit), + osdc_errc::pool_dne, cb::list{})); op->on_reg_commit = nullptr; } if (op->on_notify_finish) { - 
op->on_notify_finish->defer(std::move(op->on_notify_finish), - osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(op->on_notify_finish), + osdc_errc::pool_dne, cb::list{})); op->on_notify_finish = nullptr; } *need_unregister = true; @@ -1723,14 +1726,14 @@ void Objecter::_check_linger_pool_eio(LingerOp *op) std::unique_lock wl{op->watch_lock}; if (op->on_reg_commit) { - op->on_reg_commit->defer(std::move(op->on_reg_commit), - osdc_errc::pool_dne, cb::list{}); - op->on_reg_commit = nullptr; + asio::defer(service.get_executor(), + asio::append(std::move(op->on_reg_commit), + osdc_errc::pool_dne, cb::list{})); } if (op->on_notify_finish) { - op->on_notify_finish->defer(std::move(op->on_notify_finish), - osdc_errc::pool_dne, cb::list{}); - op->on_notify_finish = nullptr; + asio::defer(service.get_executor(), + asio::append(std::move(op->on_notify_finish), + osdc_errc::pool_dne, cb::list{})); } } @@ -1984,7 +1987,10 @@ void Objecter::wait_for_osd_map(epoch_t e) } ca::waiter w; - waiting_for_map[e].emplace_back(OpCompletion::create( + auto ex = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + waiting_for_map[e].emplace_back(asio::bind_executor( service.get_executor(), w.ref()), bs::error_code{}); @@ -1993,14 +1999,15 @@ void Objecter::wait_for_osd_map(epoch_t e) } void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, - std::unique_ptr fin, + OpCompletion fin, std::unique_lock&& l) { ceph_assert(fin); if (osdmap->get_epoch() >= newest) { ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl; l.unlock(); - ca::defer(std::move(fin), bs::error_code{}); + asio::defer(service.get_executor(), + asio::append(std::move(fin), bs::error_code{})); } else { ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl; _wait_for_new_map(std::move(fin), newest, bs::error_code{}); @@ -2034,7 +2041,7 @@ void Objecter::_maybe_request_map() } } 
-void Objecter::_wait_for_new_map(std::unique_ptr c, epoch_t epoch, +void Objecter::_wait_for_new_map(OpCompletion c, epoch_t epoch, bs::error_code ec) { // rwlock is locked unique @@ -2399,7 +2406,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_t break; case RECALC_OP_TARGET_POOL_EIO: if (op->has_completion()) { - op->complete(osdc_errc::pool_eio, -EIO); + op->complete(osdc_errc::pool_eio, -EIO, service.get_executor()); } return; } @@ -2510,7 +2517,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) Op *op = p->second; if (op->has_completion()) { num_in_flight--; - op->complete(osdcode(r), r); + op->complete(osdcode(r), r, service.get_executor()); } _op_cancel_map_check(op); _finish_op(op, r); @@ -3625,9 +3632,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // do callbacks if (Op::has_completion(onfinish)) { if (rc == 0 && handler_error) { - Op::complete(std::move(onfinish), handler_error, -EIO); + Op::complete(std::move(onfinish), handler_error, -EIO, service.get_executor()); } else { - Op::complete(std::move(onfinish), osdcode(rc), rc); + Op::complete(std::move(onfinish), osdcode(rc), rc, service.get_executor()); } } if (completion_lock.mutex()) { @@ -3928,12 +3935,14 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name, const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) { - onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, cb::list{})); return; } if (p->snap_exists(snap_name)) { - onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists, - cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::snapshot_exists, + cb::list{})); return; } @@ -3949,7 +3958,7 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name, } struct CB_SelfmanagedSnap { - std::unique_ptr> fin; + asio::any_completion_handler fin; 
CB_SelfmanagedSnap(decltype(fin)&& fin) : fin(std::move(fin)) {} void operator()(bs::error_code ec, const cb::list& bl) { @@ -3962,22 +3971,23 @@ struct CB_SelfmanagedSnap { ec = e.code(); } } - fin->defer(std::move(fin), ec, snapid); + asio::dispatch(asio::append(std::move(fin), ec, snapid)); } }; void Objecter::allocate_selfmanaged_snap( int64_t pool, - std::unique_ptr> onfinish) + asio::any_completion_handler onfinish) { unique_lock wl(rwlock); ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl; auto op = new PoolOp; op->tid = ++last_tid; op->pool = pool; - op->onfinish = PoolOp::OpComp::create( + auto e = boost::asio::prefer( service.get_executor(), - CB_SelfmanagedSnap(std::move(onfinish))); + boost::asio::execution::outstanding_work.tracked); + op->onfinish = asio::bind_executor(e, CB_SelfmanagedSnap(std::move(onfinish))); op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP; pool_ops[op->tid] = op; @@ -3994,12 +4004,15 @@ void Objecter::delete_pool_snap( const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) { - onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, + cb::list{})); return; } if (!p->snap_exists(snap_name)) { - onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{})); return; } @@ -4039,7 +4052,9 @@ void Objecter::create_pool(std::string_view name, ldout(cct, 10) << "create_pool name=" << name << dendl; if (osdmap->lookup_pg_pool_name(name) >= 0) { - onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_exists, + cb::list{})); return; } @@ -4062,7 +4077,9 @@ void Objecter::delete_pool(int64_t pool, ldout(cct, 10) << "delete_pool " << pool << dendl; if (!osdmap->have_pg_pool(pool)) - 
onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, + cb::list{})); else _do_delete_pool(pool, std::move(onfinish)); } @@ -4076,7 +4093,9 @@ void Objecter::delete_pool(std::string_view pool_name, int64_t pool = osdmap->lookup_pg_pool_name(pool_name); if (pool < 0) // This only returns one error: -ENOENT. - onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, + cb::list{})); else _do_delete_pool(pool, std::move(onfinish)); } @@ -4162,12 +4181,16 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) if (osdmap->get_epoch() < m->epoch) { ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl; - _wait_for_new_map(OpCompletion::create( - service.get_executor(), + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + _wait_for_new_map(asio::bind_executor( + e, [o = std::move(op->onfinish), - bl = std::move(bl)]( + bl = std::move(bl), + e = service.get_executor()]( bs::error_code ec) mutable { - o->defer(std::move(o), ec, bl); + asio::defer(e, asio::append(std::move(o), ec, bl)); }), m->epoch, ec); @@ -4176,11 +4199,11 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) // sneaked in. Do caller-specified callback now or else // we lose it forever. 
ceph_assert(op->onfinish); - op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl)); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), ec, std::move(bl))); } } else { ceph_assert(op->onfinish); - op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl)); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), ec, std::move(bl))); } op->onfinish = nullptr; if (!sul.owns_lock()) { @@ -4219,7 +4242,8 @@ int Objecter::pool_op_cancel(ceph_tid_t tid, int r) PoolOp *op = it->second; if (op->onfinish) - op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{}); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), + osdcode(r), cb::list{})); _finish_pool_op(op, r); return 0; @@ -4240,7 +4264,7 @@ void Objecter::_finish_pool_op(PoolOp *op, int r) // pool stats -void Objecter::get_pool_stats( +void Objecter::get_pool_stats_( const std::vector& pools, decltype(PoolStatOp::onfinish)&& onfinish) { @@ -4297,8 +4321,9 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) if (m->version > last_seen_pgmap_version) { last_seen_pgmap_version = m->version; } - op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, - std::move(m->pool_stats), m->per_pool); + asio::defer(service.get_executor(), + asio::append(std::move(op->onfinish), bs::error_code{}, + std::move(m->pool_stats), m->per_pool)); _finish_pool_stat_op(op, 0); } else { ldout(cct, 10) << "unknown request " << tid << dendl; @@ -4323,8 +4348,9 @@ int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r) auto op = it->second; if (op->onfinish) - op->onfinish->defer(std::move(op->onfinish), osdcode(r), - bc::flat_map{}, false); + asio::defer(service.get_executor(), + asio::append(std::move(op->onfinish), osdcode(r), + bc::flat_map{}, false)); _finish_pool_stat_op(op, r); return 0; } @@ -4342,8 +4368,8 @@ void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r) delete op; } -void 
Objecter::get_fs_stats(std::optional poolid, - decltype(StatfsOp::onfinish)&& onfinish) +void Objecter::get_fs_stats_(std::optional poolid, + decltype(StatfsOp::onfinish)&& onfinish) { ldout(cct, 10) << "get_fs_stats" << dendl; unique_lock l(rwlock); @@ -4396,7 +4422,8 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) ldout(cct, 10) << "have request " << tid << " at " << op << dendl; if (m->h.version > last_seen_pgmap_version) last_seen_pgmap_version = m->h.version; - op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), + bs::error_code{}, m->h.st)); _finish_statfs_op(op, 0); } else { ldout(cct, 10) << "unknown request " << tid << dendl; @@ -4421,7 +4448,9 @@ int Objecter::statfs_op_cancel(ceph_tid_t tid, int r) auto op = it->second; if (op->onfinish) - op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{}); + asio::defer(service.get_executor(), + asio::append(std::move(op->onfinish), + osdcode(r), ceph_statfs{})); _finish_statfs_op(op, r); return 0; } @@ -5022,7 +5051,9 @@ void Objecter::_finish_command(CommandOp *c, bs::error_code ec, << rs << dendl; if (c->onfinish) - c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl)); + asio::defer(service.get_executor(), + asio::append(std::move(c->onfinish), ec, std::move(rs), + std::move(bl))); if (c->ontimeout && ec != bs::errc::timed_out) timer.cancel_event(c->ontimeout); @@ -5045,7 +5076,7 @@ Objecter::OSDSession::~OSDSession() Objecter::Objecter(CephContext *cct, Messenger *m, MonClient *mc, - boost::asio::io_context& service) : + asio::io_context& service) : Dispatcher(cct), messenger(m), monc(mc), service(service) { mon_timeout = cct->_conf.get_val("rados_mon_op_timeout"); @@ -5249,9 +5280,12 @@ void Objecter::_issue_enumerate(hobject_t start, auto pbl = &on_ack->bl; // Issue. 
See you later in _enumerate_reply + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); pg_read(start.get_hash(), c->oloc, op, pbl, 0, - Op::OpComp::create(service.get_executor(), + asio::bind_executor(e, [c = std::move(on_ack)] (bs::error_code ec) mutable { (*c)(ec); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 9ca09b47647..870083a29b6 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -27,7 +27,10 @@ #include #include +#include +#include #include +#include #include #include #include @@ -45,8 +48,8 @@ #include "include/function2.hpp" #include "include/neorados/RADOS_Decodable.hpp" -#include "common/admin_socket.h" #include "common/async/completion.h" +#include "common/admin_socket.h" #include "common/ceph_time.h" #include "common/ceph_mutex.h" #include "common/ceph_timer.h" @@ -1630,7 +1633,7 @@ class Objecter : public md_config_obs_t, public Dispatcher { using MOSDOp = _mosdop::MOSDOp; public: using OpSignature = void(boost::system::error_code); - using OpCompletion = ceph::async::Completion; + using OpCompletion = boost::asio::any_completion_handler; // config observer bits const char** get_tracked_conf_keys() const override; @@ -1847,55 +1850,91 @@ public: void dump(ceph::Formatter *f) const; }; - std::unique_ptr> + boost::asio::any_completion_handler OpContextVert(Context* c) { - if (c) - return ceph::async::Completion::create( + if (c) { + auto e = boost::asio::prefer( service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + + return boost::asio::bind_executor( + std::move(e), [c = std::unique_ptr(c)] (boost::system::error_code e) mutable { c.release()->complete(e); }); + } else return nullptr; } template - std::unique_ptr> + boost::asio::any_completion_handler OpContextVert(Context* c, T* p) { - if (c || p) + if (c || p) { + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); return - 
ceph::async::Completion::create( - service.get_executor(), + boost::asio::bind_executor( + e, [c = std::unique_ptr(c), p] (boost::system::error_code e, T r) mutable { if (p) *p = std::move(r); if (c) c.release()->complete(ceph::from_error_code(e)); - }); - else + }); + } else { return nullptr; + } } template - std::unique_ptr> + boost::asio::any_completion_handler OpContextVert(Context* c, T& p) { + if (c) { + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + return boost::asio::bind_executor( + e, + [c = std::unique_ptr(c), &p] + (boost::system::error_code e, T r) mutable { + p = std::move(r); + if (c) + c.release()->complete(ceph::from_error_code(e)); + }); + } else { + return nullptr; + } + } + + boost::asio::any_completion_handler + OpCompletionVert(std::unique_ptr> c) { if (c) - return ceph::async::Completion< - void(boost::system::error_code, T)>::create( - service.get_executor(), - [c = std::unique_ptr(c), &p] - (boost::system::error_code e, T r) mutable { - p = std::move(r); - if (c) - c.release()->complete(ceph::from_error_code(e)); - }); + return [c = std::move(c)](boost::system::error_code ec) mutable { + c->dispatch(std::move(c), ec); + }; else return nullptr; } + template + boost::asio::any_completion_handler + OpCompletionVert(std::unique_ptr> c) { + if (c) { + return [c = std::move(c)](boost::system::error_code ec, T t) mutable { + c->dispatch(std::move(c), ec, std::move(t)); + }; + } else { + return nullptr; + } + } + struct Op : public RefCountedObject { OSDSession *session = nullptr; int incarnation = 0; @@ -1923,7 +1962,7 @@ public: int priority = 0; using OpSig = void(boost::system::error_code); - using OpComp = ceph::async::Completion; + using OpComp = boost::asio::any_completion_handler; // Due to an irregularity of cmpxattr, we actualy need the 'int' // value for onfinish for legacy librados users. As such just // preserve the Context* in this one case. 
That way we can have @@ -1933,7 +1972,7 @@ public: // // Add a function for the linger case, where we want better // semantics than Context, but still need to be under the completion_lock. - std::variant, fu2::unique_function, + std::variant, Context*> onfinish; uint64_t ontimeout = 0; @@ -1971,8 +2010,8 @@ public: } static void complete(decltype(onfinish)&& f, boost::system::error_code ec, - int r) { - std::visit([ec, r](auto&& arg) { + int r, boost::asio::io_context::executor_type e) { + std::visit([ec, r, e](auto&& arg) { if constexpr (std::is_same_v, Context*>) { arg->complete(r); @@ -1980,17 +2019,18 @@ public: fu2::unique_function>) { std::move(arg)(ec); } else { - arg->defer(std::move(arg), ec); + boost::asio::defer(e, + boost::asio::append(std::move(arg), ec)); } }, std::move(f)); } - void complete(boost::system::error_code ec, int r) { - complete(std::move(onfinish), ec, r); + void complete(boost::system::error_code ec, int r, + boost::asio::io_context::executor_type e) { + complete(std::move(onfinish), ec, r, e); } Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, - int f, std::unique_ptr&& fin, - version_t *ov, int *offset = nullptr, + int f, OpComp&& fin, version_t *ov, int *offset = nullptr, ZTracer::Trace *parent_trace = nullptr) : target(o, ol, f), ops(std::move(_ops)), @@ -2172,8 +2212,8 @@ public: using OpSig = void(boost::system::error_code, boost::container::flat_map, bool); - using OpComp = ceph::async::Completion; - std::unique_ptr onfinish; + using OpComp = boost::asio::any_completion_handler; + OpComp onfinish; std::uint64_t ontimeout; ceph::coarse_mono_time last_submit; }; @@ -2183,9 +2223,9 @@ public: std::optional data_pool; using OpSig = void(boost::system::error_code, const struct ceph_statfs); - using OpComp = ceph::async::Completion; + using OpComp = boost::asio::any_completion_handler; - std::unique_ptr onfinish; + OpComp onfinish; uint64_t ontimeout; ceph::coarse_mono_time last_submit; @@ -2196,8 +2236,8 @@ public: 
int64_t pool = 0; std::string name; using OpSig = void(boost::system::error_code, ceph::buffer::list); - using OpComp = ceph::async::Completion; - std::unique_ptr onfinish; + using OpComp = boost::asio::any_completion_handler; + OpComp onfinish; uint64_t ontimeout = 0; int pool_op = 0; int16_t crush_rule = 0; @@ -2226,8 +2266,8 @@ public: using OpSig = void(boost::system::error_code, std::string, ceph::buffer::list); - using OpComp = ceph::async::Completion; - std::unique_ptr onfinish; + using OpComp = boost::asio::any_completion_handler; + OpComp onfinish; uint64_t ontimeout = 0; ceph::coarse_mono_time last_submit; @@ -2293,9 +2333,9 @@ public: bool registered{false}; bool canceled{false}; using OpSig = void(boost::system::error_code, ceph::buffer::list); - using OpComp = ceph::async::Completion; - std::unique_ptr on_reg_commit; - std::unique_ptr on_notify_finish; + using OpComp = boost::asio::any_completion_handler; + OpComp on_reg_commit; + OpComp on_notify_finish; uint64_t notify_id{0}; fu2::unique_function check_latest_map_commands; std::map, + std::vector>> waiting_for_map; ceph::timespan mon_timeout; @@ -2520,9 +2560,13 @@ public: public: template auto linger_callback_flush(CT&& ct) { - boost::asio::async_completion init(ct); - boost::asio::defer(finish_strand, std::move(init.completion_handler)); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(ct), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [this](auto handler) { + boost::asio::defer(finish_strand, std::move(handler)); + }, consigned); } private: @@ -2673,22 +2717,28 @@ private: template auto wait_for_osd_map(CompletionToken&& token) { - boost::asio::async_completion init(token); - std::unique_lock l(rwlock); - if (osdmap->get_epoch()) { - l.unlock(); - boost::asio::post(std::move(init.completion_handler)); - } else { - waiting_for_map[0].emplace_back( - OpCompletion::create( - service.get_executor(), - [c = 
std::move(init.completion_handler)] - (boost::system::error_code) mutable { - std::move(c)(); - }), boost::system::error_code{}); - l.unlock(); - } - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [this](auto handler) { + std::unique_lock l(rwlock); + if (osdmap->get_epoch()) { + l.unlock(); + boost::asio::post(std::move(handler)); + } else { + auto e = boost::asio::get_associated_executor( + handler, service.get_executor()); + waiting_for_map[0].emplace_back( + boost::asio::bind_executor( + e, [c = std::move(handler)] + (boost::system::error_code) mutable { + boost::asio::dispatch(std::move(c)); + }), + boost::system::error_code{}); + l.unlock(); + } + }, consigned); } @@ -2755,9 +2805,9 @@ public: struct CB_Objecter_GetVersion { Objecter *objecter; - std::unique_ptr fin; + OpCompletion fin; - CB_Objecter_GetVersion(Objecter *o, std::unique_ptr c) + CB_Objecter_GetVersion(Objecter *o, OpCompletion c) : objecter(o), fin(std::move(c)) {} void operator()(boost::system::error_code ec, version_t newest, version_t oldest) { @@ -2765,7 +2815,8 @@ public: // try again as instructed objecter->_wait_for_latest_osdmap(std::move(*this)); } else if (ec) { - ceph::async::post(std::move(fin), ec); + boost::asio::post(objecter->service.get_executor(), + boost::asio::append(std::move(fin), ec)); } else { auto l = std::unique_lock(objecter->rwlock); objecter->_get_latest_version(oldest, newest, std::move(fin), @@ -2776,24 +2827,23 @@ public: template auto wait_for_map(epoch_t epoch, CompletionToken&& token) { - boost::asio::async_completion init(token); - - if (osdmap->get_epoch() >= epoch) { - boost::asio::post(service, - ceph::async::bind_handler( - std::move(init.completion_handler), - boost::system::error_code())); - } else { - monc->get_version("osdmap", - CB_Objecter_GetVersion( - this, - OpCompletion::create(service.get_executor(), - 
std::move(init.completion_handler)))); - } - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [epoch, this](auto handler) { + if (osdmap->get_epoch() >= epoch) { + boost::asio::post(boost::asio::append( + std::move(handler), + boost::system::error_code{})); + } else { + monc->get_version( + "osdmap", + CB_Objecter_GetVersion(this, std::move(handler))); + } + }, consigned); } - - void _wait_for_new_map(std::unique_ptr, epoch_t epoch, + void _wait_for_new_map(OpCompletion, epoch_t epoch, boost::system::error_code = {}); private: @@ -2805,38 +2855,40 @@ public: template auto wait_for_latest_osdmap(CompletionToken&& token) { - boost::asio::async_completion init(token); - - monc->get_version("osdmap", - CB_Objecter_GetVersion( - this, - OpCompletion::create(service.get_executor(), - std::move(init.completion_handler)))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + boost::asio::async_initiate( + [this](auto handler) { + monc->get_version("osdmap", + CB_Objecter_GetVersion( + this, + std::move(handler))); + }, consigned); } - void wait_for_latest_osdmap(std::unique_ptr c) { - monc->get_version("osdmap", - CB_Objecter_GetVersion(this, std::move(c))); + auto wait_for_latest_osdmap(std::unique_ptr> c) { + wait_for_latest_osdmap([c = std::move(c)](boost::system::error_code e) mutable { + c->dispatch(std::move(c), e); + }); } template auto get_latest_version(epoch_t oldest, epoch_t newest, CompletionToken&& token) { - boost::asio::async_completion init(token); - { - std::unique_lock wl(rwlock); - _get_latest_version(oldest, newest, - OpCompletion::create( - service.get_executor(), - std::move(init.completion_handler)), - std::move(wl)); - } - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), 
boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [oldest, newest, this](auto handler) { + std::unique_lock wl(rwlock); + _get_latest_version(oldest, newest, + std::move(handler), std::move(wl)); + }, consigned); } void _get_latest_version(epoch_t oldest, epoch_t neweset, - std::unique_ptr fin, + OpCompletion fin, std::unique_lock&& ul); /** Get the current set of global op flags */ @@ -2869,7 +2921,7 @@ public: epoch_t op_cancel_writes(int r, int64_t pool=-1); // commands - void osd_command(int osd, std::vector cmd, + void osd_command_(int osd, std::vector cmd, ceph::buffer::list inbl, ceph_tid_t *ptid, decltype(CommandOp::onfinish)&& onfinish) { ceph_assert(osd >= 0); @@ -2884,17 +2936,20 @@ public: auto osd_command(int osd, std::vector cmd, ceph::buffer::list inbl, ceph_tid_t *ptid, CompletionToken&& token) { - boost::asio::async_completion init(token); - osd_command(osd, std::move(cmd), std::move(inbl), ptid, - CommandOp::OpComp::create(service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); - } - - void pg_command(pg_t pgid, std::vector cmd, - ceph::buffer::list inbl, ceph_tid_t *ptid, - decltype(CommandOp::onfinish)&& onfinish) { + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [osd, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this] + (auto handler) { + osd_command_(osd, std::move(cmd), std::move(inbl), ptid, + std::move(handler)); + }, consigned); + } + + void pg_command_(pg_t pgid, std::vector cmd, + ceph::buffer::list inbl, ceph_tid_t *ptid, + decltype(CommandOp::onfinish)&& onfinish) { auto *c = new CommandOp( pgid, std::move(cmd), @@ -2907,12 +2962,14 @@ public: auto pg_command(pg_t pgid, std::vector cmd, ceph::buffer::list inbl, ceph_tid_t *ptid, CompletionToken&& token) { - boost::asio::async_completion init(token); - pg_command(pgid, std::move(cmd), 
std::move(inbl), ptid, - CommandOp::OpComp::create(service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard(service.get_executor())); + return async_initiate ( + [pgid, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this] + (auto handler) { + pg_command_(pgid, std::move(cmd), std::move(inbl), ptid, + std::move(handler)); + }, consigned); } // mid-level helpers @@ -2953,7 +3010,7 @@ public: void mutate(const object_t& oid, const object_locator_t& oloc, ObjectOperation&& op, const SnapContext& snapc, ceph::real_time mtime, int flags, - std::unique_ptr&& oncommit, + Op::OpComp oncommit, version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(), ZTracer::Trace *parent_trace = nullptr) { Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | @@ -2971,6 +3028,18 @@ public: op_submit(o); } + void mutate(const object_t& oid, const object_locator_t& oloc, + ObjectOperation&& op, const SnapContext& snapc, + ceph::real_time mtime, int flags, + std::unique_ptr> oncommit, + version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(), + ZTracer::Trace *parent_trace = nullptr) { + mutate(oid, oloc, std::move(op), snapc, mtime, flags, + [c = std::move(oncommit)](boost::system::error_code ec) mutable { + c->dispatch(std::move(c), ec); + }, objver, reqid, parent_trace); + } + Op *prepare_read_op( const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, @@ -3012,7 +3081,7 @@ public: void read(const object_t& oid, const object_locator_t& oloc, ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl, - int flags, std::unique_ptr&& onack, + int flags, Op::OpComp onack, version_t *objver = nullptr, int *data_offset = nullptr, uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) { Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | @@ -3035,6 +3104,17 @@ public: op_submit(o); } + void 
read(const object_t& oid, const object_locator_t& oloc, + ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl, + int flags, std::unique_ptr> onack, + version_t *objver = nullptr, int *data_offset = nullptr, + uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) { + read(oid, oloc, std::move(op), snapid, pbl, flags, + [c = std::move(onack)](boost::system::error_code e) mutable { + c->dispatch(std::move(c), e); + }, objver, data_offset, features, parent_trace); + } + Op *prepare_pg_read_op( uint32_t hash, object_locator_t oloc, @@ -3078,7 +3158,7 @@ public: ceph_tid_t pg_read( uint32_t hash, object_locator_t oloc, ObjectOperation& op, ceph::buffer::list *pbl, int flags, - std::unique_ptr&& onack, epoch_t *reply_epoch, int *ctx_budget) { + Op::OpComp onack, epoch_t *reply_epoch, int *ctx_budget) { ceph_tid_t tid; Op *o = new Op(object_t(), oloc, std::move(op.ops), @@ -3122,6 +3202,18 @@ public: return linger_watch(info, op, snapc, mtime, inbl, OpContextVert(onfinish, nullptr), objver); } + ceph_tid_t linger_watch(LingerOp *info, + ObjectOperation& op, + const SnapContext& snapc, ceph::real_time mtime, + ceph::buffer::list& inbl, + std::unique_ptr> onfinish, + version_t *objver) { + return linger_watch(info, op, snapc, mtime, inbl, + OpCompletionVert( + std::move(onfinish)), objver); + } ceph_tid_t linger_notify(LingerOp *info, ObjectOperation& op, snapid_t snap, ceph::buffer::list& inbl, @@ -3137,6 +3229,17 @@ public: OpContextVert(onack, poutbl), objver); } + ceph_tid_t linger_notify(LingerOp *info, + ObjectOperation& op, + snapid_t snap, ceph::buffer::list& inbl, + std::unique_ptr> onack, + version_t *objver) { + return linger_notify(info, op, snap, inbl, + OpCompletionVert( + std::move(onack)), objver); + } tl::expected linger_check(LingerOp *info); void linger_cancel(LingerOp *info); // releases a reference @@ -3717,15 +3820,27 @@ public: create_pool_snap(pool, snapName, OpContextVert(c, nullptr)); } + void create_pool_snap( + int64_t pool, 
std::string_view snapName, + std::unique_ptr> c) { + create_pool_snap(pool, snapName, + OpCompletionVert(std::move(c))); + } void allocate_selfmanaged_snap(int64_t pool, - std::unique_ptr> onfinish); + snapid_t)> onfinish); void allocate_selfmanaged_snap(int64_t pool, snapid_t* psnapid, Context* c) { allocate_selfmanaged_snap(pool, OpContextVert(c, psnapid)); } + void allocate_selfmanaged_snap(int64_t pool, + std::unique_ptr> c) { + allocate_selfmanaged_snap(pool, + OpCompletionVert(std::move(c))); + } void delete_pool_snap(int64_t pool, std::string_view snapName, decltype(PoolOp::onfinish)&& onfinish); void delete_pool_snap(int64_t pool, std::string_view snapName, @@ -3733,6 +3848,12 @@ public: delete_pool_snap(pool, snapName, OpContextVert(c, nullptr)); } + void delete_pool_snap(int64_t pool, std::string_view snapName, + std::unique_ptr> c) { + delete_pool_snap(pool, snapName, + OpCompletionVert(std::move(c))); + } void delete_selfmanaged_snap(int64_t pool, snapid_t snap, decltype(PoolOp::onfinish)&& onfinish); @@ -3741,6 +3862,12 @@ public: delete_selfmanaged_snap(pool, snap, OpContextVert(c, nullptr)); } + void delete_selfmanaged_snap(int64_t pool, snapid_t snap, + std::unique_ptr> c) { + delete_selfmanaged_snap(pool, snap, + OpCompletionVert(std::move(c))); + } void create_pool(std::string_view name, @@ -3752,12 +3879,25 @@ public: OpContextVert(onfinish, nullptr), crush_rule); } + void create_pool(std::string_view name, + std::unique_ptr> c, + int crush_rule=-1) { + create_pool(name, + OpCompletionVert(std::move(c)), + crush_rule); + } void delete_pool(int64_t pool, decltype(PoolOp::onfinish)&& onfinish); void delete_pool(int64_t pool, Context* onfinish) { delete_pool(pool, OpContextVert(onfinish, nullptr)); } + void delete_pool(int64_t pool, + std::unique_ptr> c) { + delete_pool(pool, OpCompletionVert(std::move(c))); + } void delete_pool(std::string_view name, decltype(PoolOp::onfinish)&& onfinish); @@ -3766,6 +3906,11 @@ public: Context* onfinish) { 
delete_pool(name, OpContextVert(onfinish, nullptr)); } + void delete_pool(std::string_view name, + std::unique_ptr> c) { + delete_pool(name, OpCompletionVert(std::move(c))); + } void handle_pool_op_reply(MPoolOpReply *m); int pool_op_cancel(ceph_tid_t tid, int r); @@ -3776,18 +3921,18 @@ private: void _poolstat_submit(PoolStatOp *op); public: void handle_get_pool_stats_reply(MGetPoolStatsReply *m); - void get_pool_stats(const std::vector& pools, - decltype(PoolStatOp::onfinish)&& onfinish); + void get_pool_stats_(const std::vector& pools, + decltype(PoolStatOp::onfinish)&& onfinish); template - auto get_pool_stats(const std::vector& pools, + auto get_pool_stats(std::vector pools, CompletionToken&& token) { - boost::asio::async_completion init(token); - get_pool_stats(pools, - PoolStatOp::OpComp::create( - service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [pools = std::move(pools), this](auto handler) { + get_pool_stats_(pools, std::move(handler)); + }, consigned); } int pool_stat_op_cancel(ceph_tid_t tid, int r); void _finish_pool_stat_op(PoolStatOp *op, int r); @@ -3798,20 +3943,27 @@ private: void _fs_stats_submit(StatfsOp *op); public: void handle_fs_stats_reply(MStatfsReply *m); - void get_fs_stats(std::optional poolid, - decltype(StatfsOp::onfinish)&& onfinish); + void get_fs_stats_(std::optional poolid, + decltype(StatfsOp::onfinish)&& onfinish); template auto get_fs_stats(std::optional poolid, CompletionToken&& token) { - boost::asio::async_completion init(token); - get_fs_stats(poolid, - StatfsOp::OpComp::create(service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return 
boost::asio::async_initiate( + [poolid, this](auto handler) { + get_fs_stats_(poolid, std::move(handler)); + }, consigned); } void get_fs_stats(struct ceph_statfs& result, std::optional poolid, Context *onfinish) { - get_fs_stats(poolid, OpContextVert(onfinish, result)); + get_fs_stats_(poolid, OpContextVert(onfinish, result)); + } + void get_fs_stats(std::optional poolid, + std::unique_ptr> c) { + get_fs_stats_(poolid, OpCompletionVert(std::move(c))); } int statfs_op_cancel(ceph_tid_t tid, int r); void _finish_statfs_op(StatfsOp *op, int r); diff --git a/src/test/librados_test_stub/NeoradosTestStub.cc b/src/test/librados_test_stub/NeoradosTestStub.cc index 4b6866669fb..e6a87c1617b 100644 --- a/src/test/librados_test_stub/NeoradosTestStub.cc +++ b/src/test/librados_test_stub/NeoradosTestStub.cc @@ -14,14 +14,20 @@ #include "test/librados_test_stub/TestClassHandler.h" #include "test/librados_test_stub/TestIoCtxImpl.h" #include "test/librados_test_stub/TestRadosClient.h" + #include #include #include #include #include + +#include +#include +#include #include namespace bs = boost::system; +namespace asio = boost::asio; using namespace std::literals; using namespace std::placeholders; @@ -82,7 +88,7 @@ public: namespace { struct CompletionPayload { - std::unique_ptr c; + Op::Completion c; }; void completion_callback_adapter(rados_completion_t c, void *arg) { @@ -91,14 +97,14 @@ void completion_callback_adapter(rados_completion_t c, void *arg) { impl->release(); auto payload = reinterpret_cast(arg); - payload->c->defer(std::move(payload->c), - (r < 0) ? bs::error_code(-r, osd_category()) : - bs::error_code()); + asio::dispatch(asio::append(std::move(payload->c), + (r < 0) ? 
bs::error_code(-r, osd_category()) : + bs::error_code())); delete payload; } librados::AioCompletionImpl* create_aio_completion( - std::unique_ptr&& c) { + Op::Completion&& c) { auto payload = new CompletionPayload{std::move(c)}; auto impl = new librados::AioCompletionImpl(); @@ -557,12 +563,12 @@ boost::asio::io_context::executor_type neorados::RADOS::get_executor() const { return impl->io_context.get_executor(); } -void RADOS::execute(Object o, IOContext ioc, ReadOp op, - ceph::buffer::list* bl, std::unique_ptr c, - uint64_t* objver, const blkin_trace_info* trace_info) { +void RADOS::execute_(Object o, IOContext ioc, ReadOp op, + ceph::buffer::list* bl, Op::Completion c, + uint64_t* objver, const blkin_trace_info* trace_info) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { - c->dispatch(std::move(c), osdc_errc::pool_dne); + asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne)); return; } @@ -576,12 +582,12 @@ void RADOS::execute(Object o, IOContext ioc, ReadOp op, ceph_assert(r == 0); } -void RADOS::execute(Object o, IOContext ioc, WriteOp op, - std::unique_ptr c, uint64_t* objver, - const blkin_trace_info* trace_info) { +void RADOS::execute_(Object o, IOContext ioc, WriteOp op, + Op::Completion c, uint64_t* objver, + const blkin_trace_info* trace_info) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { - c->dispatch(std::move(c), osdc_errc::pool_dne); + asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne)); return; } @@ -599,29 +605,33 @@ void RADOS::execute(Object o, IOContext ioc, WriteOp op, ceph_assert(r == 0); } -void RADOS::mon_command(std::vector command, - bufferlist bl, - std::string* outs, bufferlist* outbl, - std::unique_ptr c) { +void RADOS::mon_command_(std::vector command, + bufferlist bl, + std::string* outs, bufferlist* outbl, + Op::Completion c) { auto r = impl->test_rados_client->mon_command(command, bl, outbl, outs); - c->post(std::move(c), - (r < 0 ? 
bs::error_code(-r, osd_category()) : bs::error_code())); + asio::post(get_executor(), + asio::append(std::move(c), + (r < 0 ? bs::error_code(-r, osd_category()) : + bs::error_code()))); } -void RADOS::blocklist_add(std::string client_address, - std::optional expire, - std::unique_ptr c) { +void RADOS::blocklist_add_(std::string client_address, + std::optional expire, + SimpleOpComp c) { auto r = impl->test_rados_client->blocklist_add( std::string(client_address), expire.value_or(0s).count()); - c->post(std::move(c), - (r < 0 ? bs::error_code(-r, mon_category()) : bs::error_code())); + asio::post(get_executor(), + asio::append(std::move(c), + (r < 0 ? bs::error_code(-r, mon_category()) : + bs::error_code()))); } -void RADOS::wait_for_latest_osd_map(std::unique_ptr c) { +void RADOS::wait_for_latest_osd_map_(Op::Completion c) { auto r = impl->test_rados_client->wait_for_latest_osd_map(); - c->dispatch(std::move(c), - (r < 0 ? bs::error_code(-r, osd_category()) : - bs::error_code())); + asio::dispatch(asio::append(std::move(c), + (r < 0 ? bs::error_code(-r, osd_category()) : + bs::error_code()))); } } // namespace neorados