From 49ded69ba9ace37ddee778dc709d9d5153320323 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Wed, 29 Mar 2023 01:35:22 -0400 Subject: [PATCH] neorados: Use `asio::any_completion_handler` As we'd like to reduce (and eliminate) internal Ceph dependencies to the extent possible, now that Boost.Asio has a type-erased handler type, let's use it. Signed-off-by: Adam C. Emerson --- src/include/neorados/RADOS.hpp | 556 ++++++++++-------- src/librados/IoCtxImpl.cc | 14 +- src/mds/Server.cc | 7 +- src/neorados/RADOS.cc | 362 ++++++------ src/osdc/Objecter.cc | 170 +++--- src/osdc/Objecter.h | 437 +++++++++----- .../librados_test_stub/NeoradosTestStub.cc | 61 +- 7 files changed, 938 insertions(+), 669 deletions(-) diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp index b6bdbb460b64d..15bff7df7c40e 100644 --- a/src/include/neorados/RADOS.hpp +++ b/src/include/neorados/RADOS.hpp @@ -50,11 +50,6 @@ #include "include/neorados/RADOS_Decodable.hpp" -// Needed for type erasure and template support. We can't really avoid -// it. - -#include "common/async/completion.h" - // These are needed for RGW, but in general as a 'shiny new interface' // we should try to use forward declarations and provide standard alternatives. @@ -282,7 +277,7 @@ public: std::size_t size() const; using Signature = void(boost::system::error_code); - using Completion = ceph::async::Completion; + using Completion = boost::asio::any_completion_handler; friend std::ostream& operator <<(std::ostream& m, const Op& o); protected: @@ -492,7 +487,7 @@ public: } using BuildSig = void(boost::system::error_code, RADOS); - using BuildComp = ceph::async::Completion; + using BuildComp = boost::asio::any_completion_handler; class Builder { std::optional conf_files; std::optional cluster; @@ -525,31 +520,34 @@ public: return *this; } - template + template CompletionToken> auto build(boost::asio::io_context& ioctx, CompletionToken&& token) { - return boost::asio::async_initiate( - [&ioctx, this](auto&& handler) { - build(ioctx, BuildComp::create(ioctx.get_executor(), - std::move(handler))); - }, token); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, ioctx.get_executor()))); + return boost::asio::async_initiate( + [&ioctx, this](auto handler) { + build_(ioctx, std::move(handler)); + }, consigned); } private: - void build(boost::asio::io_context& ioctx, - std::unique_ptr c); + void build_(boost::asio::io_context& ioctx, + BuildComp c); }; - template + template CompletionToken> static auto make_with_cct(CephContext* cct, boost::asio::io_context& ioctx, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, ioctx.get_executor()))); + return boost::asio::async_initiate( [cct, &ioctx](auto&& handler) { - make_with_cct(cct, ioctx, - BuildComp::create(ioctx.get_executor(), - std::move(handler))); - }, token); + make_with_cct_(cct, ioctx, std::move(handler)); + }, consigned); } static RADOS make_with_librados(librados::Rados& rados); @@ -568,171 +566,190 @@ public: executor_type get_executor() const; boost::asio::io_context& get_io_context(); - template + template CompletionToken> auto execute(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, CompletionToken&& token, uint64_t* objver = nullptr, const blkin_trace_info* trace_info = nullptr) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), op = std::move(op), bl, objver, trace_info, this](auto&& handler) mutable { - execute(std::move(o), std::move(ioc), std::move(op), bl, - ReadOp::Completion::create(get_executor(), - std::move(handler)), - objver, trace_info); - }, token); + execute_(std::move(o), std::move(ioc), std::move(op), bl, + std::move(handler), objver, trace_info); + }, consigned); } - template + template CompletionToken> auto execute(Object o, IOContext ioc, WriteOp op, CompletionToken&& token, uint64_t* objver = nullptr, const blkin_trace_info* trace_info = nullptr) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), op = std::move(op), objver, trace_info, this](auto&& handler) mutable { - execute(std::move(o), std::move(ioc), std::move(op), - WriteOp::Completion::create(get_executor(), - std::move(handler)), - objver, trace_info); - }, token); + execute_(std::move(o), std::move(ioc), std::move(op), + std::move(handler), objver, trace_info); + }, consigned); } boost::uuids::uuid get_fsid() const noexcept; using LookupPoolSig = void(boost::system::error_code, std::int64_t); - using LookupPoolComp = ceph::async::Completion; - template + using LookupPoolComp = boost::asio::any_completion_handler; + template CompletionToken> auto lookup_pool(std::string name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [name = std::move(name), this](auto&& handler) mutable { - lookup_pool(std::move(name), - LookupPoolComp::create(get_executor(), - std::move(handler))); - }, token); + lookup_pool_(std::move(name), std::move(handler)); + }, consigned); } std::optional get_pool_alignment(int64_t pool_id); using LSPoolsSig = void(std::vector>); - using LSPoolsComp = ceph::async::Completion; - template + using LSPoolsComp = boost::asio::any_completion_handler; + template CompletionToken> auto list_pools(CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [this](auto&& handler) { - list_pools(LSPoolsComp::create(get_executor(), - std::move(handler))); - }, token); + list_pools_(std::move(handler)); + }, consigned); } using SimpleOpSig = void(boost::system::error_code); - using SimpleOpComp = ceph::async::Completion; - template + using SimpleOpComp = boost::asio::any_completion_handler; + template CompletionToken> auto create_pool_snap(int64_t pool, std::string snap_name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool, snap_name = std::move(snap_name), this](auto&& handler) mutable { - create_pool_snap(pool, std::move(snap_name), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + create_pool_snap_(pool, std::move(snap_name), + std::move(handler)); + }, consigned); } using SMSnapSig = void(boost::system::error_code, std::uint64_t); - using SMSnapComp = ceph::async::Completion; - template + using SMSnapComp = boost::asio::any_completion_handler; + template CompletionToken> auto allocate_selfmanaged_snap(int64_t pool, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool, this](auto&& handler) mutable { - allocage_selfmanaged_snap(pool, - SMSnapComp::create(get_executor(), - std::move(handler))); - }, token); + allocage_selfmanaged_snap_(pool, std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_pool_snap(int64_t pool, std::string snap_name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool, snap_name = std::move(snap_name), this](auto&& handler) mutable { - delete_pool_snap(pool, std::move(snap_name), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + delete_pool_snap_(pool, std::move(snap_name), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_selfmanaged_snap(int64_t pool, std::string snap_name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool, snap_name = std::move(snap_name), this](auto&& handler) mutable { - delete_selfmanaged_snap(pool, std::move(snap_name), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + delete_selfmanaged_snap_(pool, std::move(snap_name), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto create_pool(std::string name, std::optional crush_rule, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [name = std::move(name), crush_rule, this](auto&& handler) mutable { - create_pool(std::move(name), crush_rule, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + create_pool_(std::move(name), crush_rule, + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_pool(std::string name, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [name = std::move(name), this](auto&& handler) mutable { - delete_pool(std::move(name), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + delete_pool_(std::move(name), std::move(handler)); + }, consigned); } - template + template CompletionToken> auto delete_pool(int64_t pool, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool, this](auto&& handler) mutable { - delete_pool(pool, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + delete_pool_(pool, std::move(handler)); + }, consigned); } using PoolStatSig = void(boost::system::error_code, boost::container::flat_map, bool); - using PoolStatComp = ceph::async::Completion; - template + using PoolStatComp = boost::asio::any_completion_handler; + template CompletionToken> auto stat_pools(std::vector pools, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pools = std::move(pools), this](auto&& handler) mutable { - stat_pools(std::move(pools), - PoolStatComp::create(get_executor(), - std::move(handler))); - }, token); + stat_pools_(std::move(pools), std::move(handler)); + }, consigned); } using StatFSSig = void(boost::system::error_code, FSStats); - using StatFSComp = ceph::async::Completion; - template + using StatFSComp = boost::asio::any_completion_handler; + template CompletionToken> auto statfs(std::optional pool, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool, this](auto&& handler) mutable { - statfs(pool, StatFSComp::create(get_executor(), - std::move(handler))); - }, token); + statfs_(pool, std::move(handler)); + }, consigned); } using WatchCB = fu2::unique_function; - template + using WatchComp = boost::asio::any_completion_handler; + template CompletionToken> auto watch(Object o, IOContext ioc, std::optional timeout, WatchCB cb, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), timeout, cb = std::move(cb), this](auto&& handler) mutable { - watch(std::move(o), std::move(ioc), timeout, std::move(cb), - WatchComp::create(get_executor(), - std::move(handler))); - }, token); + watch_(std::move(o), std::move(ioc), timeout, std::move(cb), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto notify_ack(Object o, IOContext ioc, uint64_t notify_id, uint64_t cookie, ceph::buffer::list bl, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), notify_id, cookie, bl = std::move(bl), this](auto&& handler) mutable { - notify_ack(std::move(o), std::move(ioc), notify_id, std::move(cookie), - std::move(bl), SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + notify_ack_(std::move(o), std::move(ioc), std::move(notify_id), + std::move(cookie), std::move(bl), std::move(handler)); + }, consigned); } - template + template CompletionToken> auto unwatch(std::uint64_t cookie, IOContext ioc, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [cookie, ioc = std::move(ioc), this](auto&& handler) mutable { - unwatch(cookie, std::move(ioc), - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + unwatch_(cookie, std::move(ioc), std::move(handler)); + }, consigned); } // This is one of those places where having to force everything into @@ -789,29 +811,33 @@ public: // let us separate out the implementation details without // sacrificing all the benefits of templates. using VoidOpSig = void(); - using VoidOpComp = ceph::async::Completion; - template + using VoidOpComp = boost::asio::any_completion_handler; + template CompletionToken> auto flush_watch(CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [this](auto&& handler) { - flush_watch(VoidOpComp::create(get_executor(), - std::move(handler))); - }, token); + flush_watch_(std::move(handler)); + }, consigned); } using NotifySig = void(boost::system::error_code, ceph::buffer::list); - using NotifyComp = ceph::async::Completion; - template + using NotifyComp = boost::asio::any_completion_handler; + template CompletionToken> auto notify(Object o, IOContext ioc, ceph::buffer::list bl, std::optional timeout, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [o = std::move(o), ioc = std::move(ioc), bl = std::move(bl), timeout, this](auto&& handler) mutable { - notify(std::move(o), std::move(ioc), std::move(bl), timeout, - NotifyComp::create(get_executor(), - std::move(handler))); - }, token); + notify_(std::move(o), std::move(ioc), std::move(bl), timeout, + std::move(handler)); + }, consigned); } // The versions with pointers are fine for coroutines, but @@ -819,94 +845,108 @@ public: using EnumerateSig = void(boost::system::error_code, std::vector, Cursor); - using EnumerateComp = ceph::async::Completion; - template + using EnumerateComp = boost::asio::any_completion_handler; + template CompletionToken> auto enumerate_objects(IOContext ioc, Cursor begin, Cursor end, const std::uint32_t max, ceph::buffer::list filter, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [ioc = std::move(ioc), begin = std::move(begin), end = std::move(end), max, filter = std::move(filter), this](auto&& handler) mutable { - enumerate_objects(std::move(ioc), std::move(begin), std::move(end), - std::move(max), std::move(filter), - EnumerateComp::create(get_executor(), - std::move(handler))); - }, token); + enumerate_objects_(std::move(ioc), std::move(begin), std::move(end), + std::move(max), std::move(filter), + std::move(handler)); + }, consigned); } using CommandSig = void(boost::system::error_code, std::string, ceph::buffer::list); - using CommandComp = ceph::async::Completion; - template + using CommandComp = boost::asio::any_completion_handler; + template CompletionToken> auto osd_command(int osd, std::vector cmd, ceph::buffer::list in, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [osd, cmd = std::move(cmd), in = std::move(in), this](auto&& handler) mutable { - osd_command(osd, std::move(cmd), std::move(in), - CommandComp::create(get_executor(), - std::move(handler))); - }, token); + osd_command_(osd, std::move(cmd), std::move(in), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto pg_command(PG pg, std::vector cmd, ceph::buffer::list in, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pg = std::move(pg), cmd = std::move(cmd), in = std::move(in), this](auto&& handler) mutable { - pg_command(std::move(pg), std::move(cmd), std::move(in), - CommandComp::create(get_executor(), - std::move(handler))); - }, token); + pg_command_(std::move(pg), std::move(cmd), std::move(in), + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto mon_command(std::vector command, ceph::buffer::list&& bl, std::string* outs, ceph::buffer::list* outbl, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [command = std::move(command), bl = std::move(bl), outs, outbl, this](auto&& handler) mutable { - mon_command(std::move(command), std::move(bl), outs, outbl, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + mon_command_(std::move(command), std::move(bl), outs, outbl, + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto enable_application(std::string pool, std::string app_name, bool force, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [pool = std::move(pool), app_name = std::move(app_name), force, this](auto&& handler) mutable { - enable_application(std::move(pool), std::move(app_name), force, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + enable_application_(std::move(pool), std::move(app_name), force, + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto blocklist_add(std::string client_address, std::optional expire, CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [client_address = std::move(client_address), expire, this](auto&& handler) mutable { - blocklist_add(std::move(client_address), expire, - SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + blocklist_add_(std::move(client_address), expire, + std::move(handler)); + }, consigned); } - template + template CompletionToken> auto wait_for_latest_osd_map(CompletionToken&& token) { - return boost::asio::async_initiate( + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + boost::asio::get_associated_executor(token, get_executor()))); + return boost::asio::async_initiate( [this](auto&& handler) { - wait_for_latest_osd_map(SimpleOpComp::create(get_executor(), - std::move(handler))); - }, token); + wait_for_latest_osd_map_(std::move(handler)); + }, consigned); } uint64_t instance_id() const; @@ -918,85 +958,85 @@ private: friend Builder; RADOS(std::unique_ptr impl); - static void make_with_cct(CephContext* cct, - boost::asio::io_context& ioctx, - std::unique_ptr c); - - void execute(Object o, IOContext ioc, ReadOp op, - ceph::buffer::list* bl, std::unique_ptr c, - uint64_t* objver, const blkin_trace_info* trace_info); - - void execute(Object o, IOContext ioc, WriteOp op, - std::unique_ptr c, uint64_t* objver, - const blkin_trace_info* trace_info); - - - void lookup_pool(std::string name, std::unique_ptr c); - void list_pools(std::unique_ptr c); - void create_pool_snap(int64_t pool, std::string snap_name, - std::unique_ptr c); - void allocate_selfmanaged_snap(int64_t pool, std::unique_ptr c); - void delete_pool_snap(int64_t pool, std::string snap_name, - std::unique_ptr c); - void delete_selfmanaged_snap(int64_t pool, std::uint64_t snap, - std::unique_ptr c); - void create_pool(std::string name, std::optional crush_rule, - std::unique_ptr c); - void delete_pool(std::string name, - std::unique_ptr c); - void delete_pool(int64_t pool, - std::unique_ptr c); - void stat_pools(std::vector pools, - std::unique_ptr c); - void stat_fs(std::optional pool, - std::unique_ptr c); - - void watch(Object o, IOContext ioc, - std::optional timeout, - WatchCB cb, std::unique_ptr c); + static void make_with_cct_(CephContext* cct, + boost::asio::io_context& ioctx, + BuildComp c); + + void execute_(Object o, IOContext ioc, ReadOp op, + ceph::buffer::list* bl, Op::Completion c, + uint64_t* objver, const blkin_trace_info* trace_info); + + void execute_(Object o, IOContext ioc, WriteOp op, + Op::Completion c, uint64_t* objver, + const blkin_trace_info* trace_info); + + + void lookup_pool_(std::string name, LookupPoolComp c); + void list_pools_(LSPoolsComp c); + void create_pool_snap_(int64_t pool, std::string snap_name, + SimpleOpComp c); + void allocate_selfmanaged_snap_(int64_t pool, SMSnapComp c); + void delete_pool_snap_(int64_t pool, std::string snap_name, + SimpleOpComp c); + void delete_selfmanaged_snap_(int64_t pool, std::uint64_t snap, + SimpleOpComp c); + void create_pool_(std::string name, std::optional crush_rule, + SimpleOpComp c); + void delete_pool_(std::string name, + SimpleOpComp c); + void delete_pool_(int64_t pool, + SimpleOpComp c); + void stat_pools_(std::vector pools, + PoolStatComp c); + void stat_fs_(std::optional pool, + StatFSComp c); + + void watch_(Object o, IOContext ioc, + std::optional timeout, + WatchCB cb, WatchComp c); tl::expected - watch_check(uint64_t cookie); - void notify_ack(Object o, IOContext _ioc, - uint64_t notify_id, - uint64_t cookie, - ceph::buffer::list bl, - std::unique_ptr); - void unwatch(uint64_t cookie, IOContext ioc, - std::unique_ptr); - void notify(Object oid, IOContext ioctx, - ceph::buffer::list bl, - std::optional timeout, - std::unique_ptr c); - void flush_watch(std::unique_ptr); - - void enumerate_objects(IOContext ioc, Cursor begin, - Cursor end, std::uint32_t max, - ceph::buffer::list filter, - std::vector* ls, - Cursor* cursor, - std::unique_ptr c); - void enumerate_objects(IOContext ioc, Cursor begin, - Cursor end, std::uint32_t max, - ceph::buffer::list filter, - std::unique_ptr c); - void osd_command(int osd, std::vector cmd, - ceph::buffer::list in, std::unique_ptr c); - void pg_command(PG pg, std::vector cmd, - ceph::buffer::list in, std::unique_ptr c); - - void mon_command(std::vector command, + watch_check_(uint64_t cookie); + void notify_ack_(Object o, IOContext _ioc, + uint64_t notify_id, + uint64_t cookie, ceph::buffer::list bl, - std::string* outs, ceph::buffer::list* outbl, - std::unique_ptr c); - - void enable_application(std::string pool, std::string app_name, - bool force, std::unique_ptr c); - - void blocklist_add(std::string client_address, - std::optional expire, - std::unique_ptr c); - - void wait_for_latest_osd_map(std::unique_ptr c); + SimpleOpComp); + void unwatch_(uint64_t cookie, IOContext ioc, + SimpleOpComp); + void notify_(Object oid, IOContext ioctx, + ceph::buffer::list bl, + std::optional timeout, + NotifyComp c); + void flush_watch_(VoidOpComp); + + void enumerate_objects_(IOContext ioc, Cursor begin, + Cursor end, std::uint32_t max, + ceph::buffer::list filter, + std::vector* ls, + Cursor* cursor, + SimpleOpComp c); + void enumerate_objects_(IOContext ioc, Cursor begin, + Cursor end, std::uint32_t max, + ceph::buffer::list filter, + EnumerateComp c); + void osd_command_(int osd, std::vector cmd, + ceph::buffer::list in, CommandComp c); + void pg_command_(PG pg, std::vector cmd, + ceph::buffer::list in, CommandComp c); + + void mon_command_(std::vector command, + ceph::buffer::list bl, + std::string* outs, ceph::buffer::list* outbl, + SimpleOpComp c); + + void enable_application_(std::string pool, std::string app_name, + bool force, SimpleOpComp c); + + void blocklist_add_(std::string client_address, + std::optional expire, + SimpleOpComp c); + + void wait_for_latest_osd_map_(SimpleOpComp c); // Proxy object to provide access to low-level RADOS messaging clients std::unique_ptr impl; diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index e1d38fd014a4b..d66b56560f9c3 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1788,9 +1788,12 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, extra_op_flags); C_SaferCond notify_finish_cond; + auto e = boost::asio::prefer( + objecter->service.get_executor(), + boost::asio::execution::outstanding_work.tracked); linger_op->on_notify_finish = - Objecter::LingerOp::OpComp::create( - objecter->service.get_executor(), + boost::asio::bind_executor( + std::move(e), CB_notify_Finish(client->cct, ¬ify_finish_cond, objecter, linger_op, preply_bl, preply_buf, preply_buf_len)); @@ -1844,9 +1847,12 @@ int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c, c->io = this; C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op); + auto e = boost::asio::prefer( + objecter->service.get_executor(), + boost::asio::execution::outstanding_work.tracked); linger_op->on_notify_finish = - Objecter::LingerOp::OpComp::create( - objecter->service.get_executor(), + boost::asio::bind_executor( + std::move(e), CB_notify_Finish(client->cct, oncomplete, objecter, linger_op, preply_bl, preply_buf, diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 90b8f185950ee..b69d33e600453 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -6008,8 +6008,11 @@ int Server::check_layout_vxattr(MDRequestRef& mdr, // latest map. One day if COMPACT_VERSION of MClientRequest >=3, // we can remove those code. mdr->waited_for_osdmap = true; - mds->objecter->wait_for_latest_osdmap(std::ref(*new C_IO_Wrapper( - mds, new C_MDS_RetryRequest(mdcache, mdr)))); + mds->objecter->wait_for_latest_osdmap( + [c = new C_IO_Wrapper(mds, new C_MDS_RetryRequest(mdcache, mdr))] + (boost::system::error_code ec) { + c->complete(ceph::from_error_code(ec)); + }); return r; } } diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc index d156a49e8a501..ece5697a03fcf 100644 --- a/src/neorados/RADOS.cc +++ b/src/neorados/RADOS.cc @@ -39,9 +39,9 @@ using namespace std::literals; +namespace asio = boost::asio; namespace bc = boost::container; namespace bs = boost::system; -namespace ca = ceph::async; namespace cb = ceph::buffer; namespace neorados { @@ -742,8 +742,8 @@ RADOS::Builder& RADOS::Builder::add_conf_file(std::string_view f) { return *this; } -void RADOS::Builder::build(boost::asio::io_context& ioctx, - std::unique_ptr c) { +void RADOS::Builder::build_(asio::io_context& ioctx, + BuildComp c) { constexpr auto env = CODE_ENVIRONMENT_LIBRARY; CephInitParameters ci(env); if (name) @@ -771,7 +771,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, auto r = cct->_conf.parse_config_files(conf_files ? conf_files->data() : nullptr, &ss, flags); if (r < 0) - c->post(std::move(c), ceph::to_error_code(r), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), ceph::to_error_code(r), + RADOS{nullptr})); } cct->_conf.parse_env(cct->get_module_type()); @@ -780,7 +782,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, std::stringstream ss; auto r = cct->_conf.set_val(n, v, &ss); if (r < 0) - c->post(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), ceph::to_error_code(-EINVAL), + RADOS{nullptr})); } if (!no_mon_conf) { @@ -788,7 +792,9 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, // TODO This function should return an error code. auto err = mc_bootstrap.get_monmap_and_config(); if (err < 0) - c->post(std::move(c), ceph::to_error_code(err), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), ceph::to_error_code(err), + RADOS{nullptr})); } if (!cct->_log->is_started()) { cct->_log->start(); @@ -798,18 +804,19 @@ void RADOS::Builder::build(boost::asio::io_context& ioctx, RADOS::make_with_cct(cct, ioctx, std::move(c)); } -void RADOS::make_with_cct(CephContext* cct, - boost::asio::io_context& ioctx, - std::unique_ptr c) { +void RADOS::make_with_cct_(CephContext* cct, + asio::io_context& ioctx, + BuildComp c) { try { auto r = new detail::NeoClient{std::make_unique(ioctx, cct)}; r->objecter->wait_for_osd_map( [c = std::move(c), r = std::unique_ptr(r)]() mutable { - c->dispatch(std::move(c), bs::error_code{}, - RADOS{std::move(r)}); + asio::dispatch(asio::append(std::move(c), bs::error_code{}, + RADOS{std::move(r)})); }); } catch (const bs::system_error& err) { - c->post(std::move(c), err.code(), RADOS{nullptr}); + asio::post(ioctx.get_executor(), + asio::append(std::move(c), err.code(), RADOS{nullptr})); } } @@ -831,14 +838,14 @@ RADOS::executor_type RADOS::get_executor() const { return impl->ioctx.get_executor(); } -boost::asio::io_context& RADOS::get_io_context() { +asio::io_context& RADOS::get_io_context() { return impl->ioctx; } -void RADOS::execute(Object o, IOContext _ioc, ReadOp _op, - cb::list* bl, - std::unique_ptr c, version_t* objver, - const blkin_trace_info *trace_info) { +void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op, + cb::list* bl, + ReadOp::Completion c, version_t* objver, + const blkin_trace_info *trace_info) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); auto op = reinterpret_cast(&_op.impl); @@ -858,9 +865,9 @@ void RADOS::execute(Object o, IOContext _ioc, ReadOp _op, trace.event("submitted"); } -void RADOS::execute(Object o, IOContext _ioc, WriteOp _op, - std::unique_ptr c, version_t* objver, - const blkin_trace_info *trace_info) { +void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op, + WriteOp::Completion c, version_t* objver, + const blkin_trace_info *trace_info) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); auto op = reinterpret_cast(&_op.impl); @@ -885,8 +892,8 @@ void RADOS::execute(Object o, IOContext _ioc, WriteOp _op, trace.event("submitted"); } -void RADOS::lookup_pool(std::string name, - std::unique_ptr c) +void RADOS::lookup_pool_(std::string name, + LookupPoolComp c) { // I kind of want to make lookup_pg_pool return // std::optional since it can only return one error code. @@ -903,16 +910,18 @@ void RADOS::lookup_pool(std::string name, return osdmap.lookup_pg_pool_name(name); }); if (ret < 0) - ca::dispatch(std::move(c), osdc_errc::pool_dne, - std::int64_t(0)); + asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne, + std::int64_t(0))); else - ca::dispatch(std::move(c), bs::error_code{}, ret); + asio::dispatch(asio::append(std::move(c), bs::error_code{}, ret)); }); } else if (ret < 0) { - ca::post(std::move(c), osdc_errc::pool_dne, - std::int64_t(0)); + asio::post(get_executor(), + asio::append(std::move(c), osdc_errc::pool_dne, + std::int64_t(0))); } else { - ca::post(std::move(c), bs::error_code{}, ret); + asio::post(get_executor(), + asio::append(std::move(c), bs::error_code{}, ret)); } } @@ -933,108 +942,125 @@ std::optional RADOS::get_pool_alignment(int64_t pool_id) }); } -void RADOS::list_pools(std::unique_ptr c) { - ca::dispatch(std::move(c), - impl->objecter->with_osdmap( - [&](OSDMap& o) { - std::vector> v; - for (auto p : o.get_pools()) - v.push_back(std::make_pair(p.first, - o.get_pool_name(p.first))); - return v; - })); +void RADOS::list_pools_(LSPoolsComp c) { + asio::dispatch(asio::append(std::move(c), + impl->objecter->with_osdmap( + [&](OSDMap& o) { + std::vector> v; + for (auto p : o.get_pools()) + v.push_back(std::make_pair(p.first, + o.get_pool_name(p.first))); + return v; + }))); } -void RADOS::create_pool_snap(std::int64_t pool, - std::string snap_name, - std::unique_ptr c) +void RADOS::create_pool_snap_(std::int64_t pool, + std::string snap_name, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->create_pool_snap( pool, snap_name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::allocate_selfmanaged_snap(int64_t pool, - std::unique_ptr c) { +void RADOS::allocate_selfmanaged_snap_(int64_t pool, + SMSnapComp c) { + auto e = asio::prefer( + get_executor(), + asio::execution::outstanding_work.tracked); + impl->objecter->allocate_selfmanaged_snap( pool, - ca::Completion::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, snapid_t snap) mutable { - ca::dispatch(std::move(c), e, snap); + asio::dispatch(asio::append(std::move(c), e, snap)); })); } -void RADOS::delete_pool_snap(std::int64_t pool, - std::string snap_name, - std::unique_ptr c) +void RADOS::delete_pool_snap_(std::int64_t pool, + std::string snap_name, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_pool_snap( pool, snap_name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::delete_selfmanaged_snap(std::int64_t pool, - std::uint64_t snap, - std::unique_ptr c) +void RADOS::delete_selfmanaged_snap_(std::int64_t pool, + std::uint64_t snap, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_selfmanaged_snap( pool, snap, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::create_pool(std::string name, - std::optional crush_rule, - std::unique_ptr c) +void RADOS::create_pool_(std::string name, + std::optional crush_rule, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); + impl->objecter->create_pool( name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); }), crush_rule.value_or(-1)); } -void RADOS::delete_pool(std::string name, - std::unique_ptr c) +void RADOS::delete_pool_(std::string name, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_pool( name, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::delete_pool(std::int64_t pool, - std::unique_ptr c) +void RADOS::delete_pool_(std::int64_t pool, + SimpleOpComp c) { + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->delete_pool( pool, - Objecter::PoolOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { - ca::dispatch(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); })); } -void RADOS::stat_pools(std::vector pools, - std::unique_ptr c) { +void RADOS::stat_pools_(std::vector pools, + PoolStatComp c) { impl->objecter->get_pool_stats( pools, [c = std::move(c)] @@ -1073,12 +1099,13 @@ void RADOS::stat_pools(std::vector pools, pv.compressed_bytes_alloc = statfs.data_compressed_allocated; } - ca::dispatch(std::move(c), ec, std::move(result), per_pool); + asio::dispatch(asio::append(std::move(c), ec, std::move(result), + per_pool)); }); } -void RADOS::stat_fs(std::optional _pool, - std::unique_ptr c) { +void RADOS::stat_fs_(std::optional _pool, + StatFSComp c) { std::optional pool; if (_pool) pool = *pool; @@ -1086,15 +1113,15 @@ void RADOS::stat_fs(std::optional _pool, pool, [c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable { FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects}; - c->dispatch(std::move(c), ec, std::move(fso)); + asio::dispatch(asio::append(std::move(c), ec, std::move(fso))); }); } // --- Watch/Notify -void RADOS::watch(Object o, IOContext _ioc, - std::optional timeout, WatchCB cb, - std::unique_ptr c) { +void RADOS::watch_(Object o, IOContext _ioc, + std::optional timeout, WatchCB cb, + WatchComp c) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); @@ -1106,21 +1133,23 @@ void RADOS::watch(Object o, IOContext _ioc, linger_op->handle = std::move(cb); op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count()); bufferlist bl; + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->linger_watch( linger_op, op, ioc->snapc, ceph::real_clock::now(), bl, - Objecter::LingerOp::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [c = std::move(c), cookie](bs::error_code e, cb::list) mutable { - ca::dispatch(std::move(c), e, cookie); + asio::dispatch(asio::append(std::move(c), e, cookie)); }), nullptr); } -void RADOS::notify_ack(Object o, - IOContext _ioc, - uint64_t notify_id, - uint64_t cookie, - bufferlist bl, - std::unique_ptr c) +void RADOS::notify_ack_(Object o, + IOContext _ioc, + uint64_t notify_id, + uint64_t cookie, + bufferlist bl, + SimpleOpComp c) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); @@ -1132,14 +1161,14 @@ void RADOS::notify_ack(Object o, nullptr, ioc->extra_op_flags, std::move(c)); } -tl::expected RADOS::watch_check(uint64_t cookie) +tl::expected RADOS::watch_check_(uint64_t cookie) { Objecter::LingerOp *linger_op = reinterpret_cast(cookie); return impl->objecter->linger_check(linger_op); } -void RADOS::unwatch(uint64_t cookie, IOContext _ioc, - std::unique_ptr c) +void RADOS::unwatch_(uint64_t cookie, IOContext _ioc, + SimpleOpComp c) { auto ioc = reinterpret_cast(&_ioc.impl); @@ -1147,48 +1176,50 @@ void RADOS::unwatch(uint64_t cookie, IOContext _ioc, ObjectOperation op; op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op), ioc->snapc, ceph::real_clock::now(), ioc->extra_op_flags, - Objecter::Op::OpComp::create( - get_executor(), + asio::bind_executor( + std::move(e), [objecter = impl->objecter, linger_op, c = std::move(c)] (bs::error_code ec) mutable { objecter->linger_cancel(linger_op); - ca::dispatch(std::move(c), ec); + asio::dispatch(asio::append(std::move(c), ec)); })); } -void RADOS::flush_watch(std::unique_ptr c) +void RADOS::flush_watch_(VoidOpComp c) { impl->objecter->linger_callback_flush([c = std::move(c)]() mutable { - ca::post(std::move(c)); + asio::dispatch(std::move(c)); }); } struct NotifyHandler : std::enable_shared_from_this { - boost::asio::io_context& ioc; - boost::asio::strand strand; + asio::io_context& ioc; + asio::strand strand; Objecter* objecter; Objecter::LingerOp* op; - std::unique_ptr c; + RADOS::NotifyComp c; bool acked = false; bool finished = false; bs::error_code res; bufferlist rbl; - NotifyHandler(boost::asio::io_context& ioc, + NotifyHandler(asio::io_context& ioc, Objecter* objecter, Objecter::LingerOp* op, - std::unique_ptr c) - : ioc(ioc), strand(boost::asio::make_strand(ioc)), + RADOS::NotifyComp c) + : ioc(ioc), strand(asio::make_strand(ioc)), objecter(objecter), op(op), c(std::move(c)) {} // Use bind or a lambda to pass this in. void handle_ack(bs::error_code ec, bufferlist&&) { - boost::asio::post( + asio::post( strand, [this, ec, p = shared_from_this()]() mutable { acked = true; @@ -1200,7 +1231,7 @@ struct NotifyHandler : std::enable_shared_from_this { void operator()(bs::error_code ec, bufferlist&& bl) { - boost::asio::post( + asio::post( strand, [this, ec, p = shared_from_this()]() mutable { finished = true; @@ -1215,14 +1246,14 @@ struct NotifyHandler : std::enable_shared_from_this { if ((acked && finished) || res) { objecter->linger_cancel(op); ceph_assert(c); - ca::dispatch(std::move(c), res, std::move(rbl)); + asio::dispatch(asio::append(std::move(c), res, std::move(rbl))); } } }; -void RADOS::notify(Object o, IOContext _ioc, bufferlist bl, - std::optional timeout, - std::unique_ptr c) +void RADOS::notify_(Object o, IOContext _ioc, bufferlist bl, + std::optional timeout, + NotifyComp c) { auto oid = reinterpret_cast(&o.impl); auto ioc = reinterpret_cast(&_ioc.impl); @@ -1231,9 +1262,11 @@ void RADOS::notify(Object o, IOContext _ioc, bufferlist bl, auto cb = std::make_shared(impl->ioctx, impl->objecter, linger_op, std::move(c)); + auto e = asio::prefer(get_executor(), + asio::execution::outstanding_work.tracked); linger_op->on_notify_finish = - Objecter::LingerOp::OpComp::create( - get_executor(), + asio::bind_executor( + e, [cb](bs::error_code ec, ceph::bufferlist bl) mutable { (*cb)(ec, std::move(bl)); }); @@ -1246,8 +1279,8 @@ void RADOS::notify(Object o, IOContext _ioc, bufferlist bl, impl->objecter->linger_notify( linger_op, rd, ioc->snap_seq, inbl, - Objecter::LingerOp::OpComp::create( - get_executor(), + asio::bind_executor( + e, [cb](bs::error_code ec, ceph::bufferlist bl) mutable { cb->handle_ack(ec, std::move(bl)); }), nullptr); @@ -1354,12 +1387,12 @@ Cursor::from_str(const std::string& s) { return e; } -void RADOS::enumerate_objects(IOContext _ioc, - Cursor begin, - Cursor end, - const std::uint32_t max, - bufferlist filter, - std::unique_ptr c) { +void RADOS::enumerate_objects_(IOContext _ioc, + Cursor begin, + Cursor end, + const std::uint32_t max, + bufferlist filter, + EnumerateComp c) { auto ioc = reinterpret_cast(&_ioc.impl); impl->objecter->enumerate_objects( @@ -1372,45 +1405,43 @@ void RADOS::enumerate_objects(IOContext _ioc, [c = std::move(c)] (bs::error_code ec, std::vector&& v, hobject_t&& n) mutable { - ca::dispatch(std::move(c), ec, std::move(v), - Cursor(static_cast(&n))); + asio::dispatch(asio::append(std::move(c), ec, std::move(v), + Cursor(static_cast(&n)))); }); } -void RADOS::osd_command(int osd, std::vector cmd, - ceph::bufferlist in, std::unique_ptr c) { - impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr, - [c = std::move(c)] - (bs::error_code ec, - std::string&& s, - ceph::bufferlist&& b) mutable { - ca::dispatch(std::move(c), ec, - std::move(s), - std::move(b)); - }); +void RADOS::osd_command_(int osd, std::vector cmd, + ceph::bufferlist in, CommandComp c) { + impl->objecter->osd_command( + osd, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, std::string&& s, ceph::bufferlist&& b) mutable { + asio::dispatch(asio::append(std::move(c), ec, std::move(s), + std::move(b))); + }); } -void RADOS::pg_command(PG pg, std::vector cmd, - ceph::bufferlist in, std::unique_ptr c) { - impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr, - [c = std::move(c)] - (bs::error_code ec, - std::string&& s, - ceph::bufferlist&& b) mutable { - ca::dispatch(std::move(c), ec, - std::move(s), - std::move(b)); - }); +void RADOS::pg_command_(PG pg, std::vector cmd, + ceph::bufferlist in, CommandComp c) { + impl->objecter->pg_command( + pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, std::string&& s, + ceph::bufferlist&& b) mutable { + asio::dispatch(asio::append(std::move(c), ec, std::move(s), + std::move(b))); + }); } -void RADOS::enable_application(std::string pool, std::string app_name, - bool force, std::unique_ptr c) { +void RADOS::enable_application_(std::string pool, std::string app_name, + bool force, SimpleOpComp c) { // pre-Luminous clusters will return -EINVAL and application won't be // preserved until Luminous is configured as minimum version. if (!impl->get_required_monitor_features().contains_all( ceph::features::mon::FEATURE_LUMINOUS)) { - ca::post(std::move(c), ceph::to_error_code(-EOPNOTSUPP)); + asio::post(get_executor(), + asio::append(std::move(c), ceph::to_error_code(-EOPNOTSUPP))); } else { impl->monclient.start_mon_command( { fmt::format("{{ \"prefix\": \"osd pool application enable\"," @@ -1419,14 +1450,14 @@ void RADOS::enable_application(std::string pool, std::string app_name, force ? " ,\"yes_i_really_mean_it\": true" : "")}, {}, [c = std::move(c)](bs::error_code e, std::string, cb::list) mutable { - ca::post(std::move(c), e); - }); + asio::dispatch(asio::append(std::move(c), e)); + }); } } -void RADOS::blocklist_add(std::string client_address, - std::optional expire, - std::unique_ptr c) { +void RADOS::blocklist_add_(std::string client_address, + std::optional expire, + SimpleOpComp c) { auto expire_arg = (expire ? fmt::format(", \"expire\": \"{}.0\"", expire->count()) : std::string{}); impl->monclient.start_mon_command( @@ -1439,7 +1470,8 @@ void RADOS::blocklist_add(std::string client_address, [this, client_address = std::move(client_address), expire_arg, c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable { if (ec != bs::errc::invalid_argument) { - ca::post(std::move(c), ec); + asio::post(get_executor(), + asio::append(std::move(c), ec)); return; } @@ -1452,19 +1484,19 @@ void RADOS::blocklist_add(std::string client_address, client_address, expire_arg) }, {}, [c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable { - ca::post(std::move(c), ec); + asio::dispatch(asio::append(std::move(c), ec)); }); }); } -void RADOS::wait_for_latest_osd_map(std::unique_ptr c) { +void RADOS::wait_for_latest_osd_map_(SimpleOpComp c) { impl->objecter->wait_for_latest_osdmap(std::move(c)); } -void RADOS::mon_command(std::vector command, - cb::list bl, - std::string* outs, cb::list* outbl, - std::unique_ptr c) { +void RADOS::mon_command_(std::vector command, + cb::list bl, + std::string* outs, cb::list* outbl, + SimpleOpComp c) { impl->monclient.start_mon_command( command, bl, @@ -1474,7 +1506,7 @@ void RADOS::mon_command(std::vector command, *outs = std::move(s); if (outbl) *outbl = std::move(bl); - ca::post(std::move(c), e); + asio::dispatch(asio::append(std::move(c), e)); }); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index eee03c1189877..3421da8d59d26 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -95,6 +95,7 @@ namespace bc = boost::container; namespace bs = boost::system; namespace ca = ceph::async; namespace cb = ceph::buffer; +namespace asio = boost::asio; #define dout_subsys ceph_subsys_objecter #undef dout_prefix @@ -604,14 +605,14 @@ void Objecter::_linger_commit(LingerOp *info, bs::error_code ec, std::unique_lock wl(info->watch_lock); ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl; if (info->on_reg_commit) { - info->on_reg_commit->defer(std::move(info->on_reg_commit), - ec, cb::list{}); - info->on_reg_commit.reset(); + asio::defer(service.get_executor(), + asio::append(std::move(info->on_reg_commit), + ec, cb::list{})); } if (ec && info->on_notify_finish) { - info->on_notify_finish->defer(std::move(info->on_notify_finish), - ec, cb::list{}); - info->on_notify_finish.reset(); + asio::defer(service.get_executor(), + asio::append(std::move(info->on_notify_finish), + ec, cb::list{})); } // only tell the user the first time we do this @@ -673,7 +674,7 @@ void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec) if (!info->last_error) { ec = _normalize_watch_error(ec); if (info->handle) { - boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); + asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); } } } @@ -708,7 +709,7 @@ void Objecter::_send_linger_ping(LingerOp *info) Op *o = new Op(info->target.base_oid, info->target.base_oloc, std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ, - CB_Linger_Ping(this, info, now), + fu2::unique_function{CB_Linger_Ping(this, info, now)}, nullptr, nullptr); o->target = info->target; o->should_resend = false; @@ -736,7 +737,7 @@ void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono ec = _normalize_watch_error(ec); info->last_error = ec; if (info->handle) { - boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); + asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); } } } else { @@ -924,7 +925,7 @@ void Objecter::handle_watch_notify(MWatchNotify *m) if (!info->last_error) { info->last_error = bs::error_code(ENOTCONN, osd_category()); if (info->handle) { - boost::asio::defer(finish_strand, CB_DoWatchError(this, info, + asio::defer(finish_strand, CB_DoWatchError(this, info, info->last_error)); } } @@ -937,16 +938,16 @@ void Objecter::handle_watch_notify(MWatchNotify *m) ldout(cct, 10) << __func__ << " reply notify " << m->notify_id << " != " << info->notify_id << ", ignoring" << dendl; } else if (info->on_notify_finish) { - info->on_notify_finish->defer( - std::move(info->on_notify_finish), - osdcode(m->return_code), std::move(m->get_data())); - + asio::defer(service.get_executor(), + asio::append(std::move(info->on_notify_finish), + osdcode(m->return_code), + std::move(m->get_data()))); // if we race with reconnect we might get a second notify; only // notify the caller once! info->on_notify_finish = nullptr; } } else { - boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m)); + asio::defer(finish_strand, CB_DoWatchNotify(this, info, m)); } } @@ -1379,7 +1380,7 @@ void Objecter::handle_osd_map(MOSDMap *m) p->first <= osdmap->get_epoch()) { //go through the list and call the onfinish methods for (auto& [c, ec] : p->second) { - ca::post(std::move(c), ec); + asio::post(service.get_executor(), asio::append(std::move(c), ec)); } waiting_for_map.erase(p++); } @@ -1568,7 +1569,7 @@ void Objecter::_check_op_pool_dne(Op *op, std::unique_lock *s << " dne" << dendl; if (op->has_completion()) { num_in_flight--; - op->complete(osdc_errc::pool_dne, -ENOENT); + op->complete(osdc_errc::pool_dne, -ENOENT, service.get_executor()); } OSDSession *s = op->session; @@ -1603,7 +1604,7 @@ void Objecter::_check_op_pool_eio(Op *op, std::unique_lock *s << " has eio" << dendl; if (op->has_completion()) { num_in_flight--; - op->complete(osdc_errc::pool_eio, -EIO); + op->complete(osdc_errc::pool_eio, -EIO, service.get_executor()); } OSDSession *s = op->session; @@ -1701,13 +1702,15 @@ void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister) if (osdmap->get_epoch() >= op->map_dne_bound) { std::unique_lock wl{op->watch_lock}; if (op->on_reg_commit) { - op->on_reg_commit->defer(std::move(op->on_reg_commit), - osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(op->on_reg_commit), + osdc_errc::pool_dne, cb::list{})); op->on_reg_commit = nullptr; } if (op->on_notify_finish) { - op->on_notify_finish->defer(std::move(op->on_notify_finish), - osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(op->on_notify_finish), + osdc_errc::pool_dne, cb::list{})); op->on_notify_finish = nullptr; } *need_unregister = true; @@ -1723,14 +1726,14 @@ void Objecter::_check_linger_pool_eio(LingerOp *op) std::unique_lock wl{op->watch_lock}; if (op->on_reg_commit) { - op->on_reg_commit->defer(std::move(op->on_reg_commit), - osdc_errc::pool_dne, cb::list{}); - op->on_reg_commit = nullptr; + asio::defer(service.get_executor(), + asio::append(std::move(op->on_reg_commit), + osdc_errc::pool_dne, cb::list{})); } if (op->on_notify_finish) { - op->on_notify_finish->defer(std::move(op->on_notify_finish), - osdc_errc::pool_dne, cb::list{}); - op->on_notify_finish = nullptr; + asio::defer(service.get_executor(), + asio::append(std::move(op->on_notify_finish), + osdc_errc::pool_dne, cb::list{})); } } @@ -1984,7 +1987,10 @@ void Objecter::wait_for_osd_map(epoch_t e) } ca::waiter w; - waiting_for_map[e].emplace_back(OpCompletion::create( + auto ex = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + waiting_for_map[e].emplace_back(asio::bind_executor( service.get_executor(), w.ref()), bs::error_code{}); @@ -1993,14 +1999,15 @@ void Objecter::wait_for_osd_map(epoch_t e) } void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, - std::unique_ptr fin, + OpCompletion fin, std::unique_lock&& l) { ceph_assert(fin); if (osdmap->get_epoch() >= newest) { ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl; l.unlock(); - ca::defer(std::move(fin), bs::error_code{}); + asio::defer(service.get_executor(), + asio::append(std::move(fin), bs::error_code{})); } else { ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl; _wait_for_new_map(std::move(fin), newest, bs::error_code{}); @@ -2034,7 +2041,7 @@ void Objecter::_maybe_request_map() } } -void Objecter::_wait_for_new_map(std::unique_ptr c, epoch_t epoch, +void Objecter::_wait_for_new_map(OpCompletion c, epoch_t epoch, bs::error_code ec) { // rwlock is locked unique @@ -2399,7 +2406,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_t break; case RECALC_OP_TARGET_POOL_EIO: if (op->has_completion()) { - op->complete(osdc_errc::pool_eio, -EIO); + op->complete(osdc_errc::pool_eio, -EIO, service.get_executor()); } return; } @@ -2510,7 +2517,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) Op *op = p->second; if (op->has_completion()) { num_in_flight--; - op->complete(osdcode(r), r); + op->complete(osdcode(r), r, service.get_executor()); } _op_cancel_map_check(op); _finish_op(op, r); @@ -3611,9 +3618,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // do callbacks if (Op::has_completion(onfinish)) { if (rc == 0 && handler_error) { - Op::complete(std::move(onfinish), handler_error, -EIO); + Op::complete(std::move(onfinish), handler_error, -EIO, service.get_executor()); } else { - Op::complete(std::move(onfinish), osdcode(rc), rc); + Op::complete(std::move(onfinish), osdcode(rc), rc, service.get_executor()); } } if (completion_lock.mutex()) { @@ -3914,12 +3921,14 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name, const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) { - onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, cb::list{})); return; } if (p->snap_exists(snap_name)) { - onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists, - cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::snapshot_exists, + cb::list{})); return; } @@ -3935,7 +3944,7 @@ void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name, } struct CB_SelfmanagedSnap { - std::unique_ptr> fin; + asio::any_completion_handler fin; CB_SelfmanagedSnap(decltype(fin)&& fin) : fin(std::move(fin)) {} void operator()(bs::error_code ec, const cb::list& bl) { @@ -3948,22 +3957,23 @@ struct CB_SelfmanagedSnap { ec = e.code(); } } - fin->defer(std::move(fin), ec, snapid); + asio::dispatch(asio::append(std::move(fin), ec, snapid)); } }; void Objecter::allocate_selfmanaged_snap( int64_t pool, - std::unique_ptr> onfinish) + asio::any_completion_handler onfinish) { unique_lock wl(rwlock); ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl; auto op = new PoolOp; op->tid = ++last_tid; op->pool = pool; - op->onfinish = PoolOp::OpComp::create( + auto e = boost::asio::prefer( service.get_executor(), - CB_SelfmanagedSnap(std::move(onfinish))); + boost::asio::execution::outstanding_work.tracked); + op->onfinish = asio::bind_executor(e, CB_SelfmanagedSnap(std::move(onfinish))); op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP; pool_ops[op->tid] = op; @@ -3980,12 +3990,15 @@ void Objecter::delete_pool_snap( const pg_pool_t *p = osdmap->get_pg_pool(pool); if (!p) { - onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, + cb::list{})); return; } if (!p->snap_exists(snap_name)) { - onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{})); return; } @@ -4025,7 +4038,9 @@ void Objecter::create_pool(std::string_view name, ldout(cct, 10) << "create_pool name=" << name << dendl; if (osdmap->lookup_pg_pool_name(name) >= 0) { - onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_exists, + cb::list{})); return; } @@ -4048,7 +4063,9 @@ void Objecter::delete_pool(int64_t pool, ldout(cct, 10) << "delete_pool " << pool << dendl; if (!osdmap->have_pg_pool(pool)) - onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, + cb::list{})); else _do_delete_pool(pool, std::move(onfinish)); } @@ -4062,7 +4079,9 @@ void Objecter::delete_pool(std::string_view pool_name, int64_t pool = osdmap->lookup_pg_pool_name(pool_name); if (pool < 0) // This only returns one error: -ENOENT. - onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + asio::defer(service.get_executor(), + asio::append(std::move(onfinish), osdc_errc::pool_dne, + cb::list{})); else _do_delete_pool(pool, std::move(onfinish)); } @@ -4148,12 +4167,16 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) if (osdmap->get_epoch() < m->epoch) { ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl; - _wait_for_new_map(OpCompletion::create( - service.get_executor(), + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + _wait_for_new_map(asio::bind_executor( + e, [o = std::move(op->onfinish), - bl = std::move(bl)]( + bl = std::move(bl), + e = service.get_executor()]( bs::error_code ec) mutable { - o->defer(std::move(o), ec, bl); + asio::defer(e, asio::append(std::move(o), ec, bl)); }), m->epoch, ec); @@ -4162,11 +4185,11 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m) // sneaked in. Do caller-specified callback now or else // we lose it forever. ceph_assert(op->onfinish); - op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl)); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), ec, std::move(bl))); } } else { ceph_assert(op->onfinish); - op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl)); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), ec, std::move(bl))); } op->onfinish = nullptr; if (!sul.owns_lock()) { @@ -4205,7 +4228,8 @@ int Objecter::pool_op_cancel(ceph_tid_t tid, int r) PoolOp *op = it->second; if (op->onfinish) - op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{}); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), + osdcode(r), cb::list{})); _finish_pool_op(op, r); return 0; @@ -4226,7 +4250,7 @@ void Objecter::_finish_pool_op(PoolOp *op, int r) // pool stats -void Objecter::get_pool_stats( +void Objecter::get_pool_stats_( const std::vector& pools, decltype(PoolStatOp::onfinish)&& onfinish) { @@ -4283,8 +4307,9 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) if (m->version > last_seen_pgmap_version) { last_seen_pgmap_version = m->version; } - op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, - std::move(m->pool_stats), m->per_pool); + asio::defer(service.get_executor(), + asio::append(std::move(op->onfinish), bs::error_code{}, + std::move(m->pool_stats), m->per_pool)); _finish_pool_stat_op(op, 0); } else { ldout(cct, 10) << "unknown request " << tid << dendl; @@ -4309,8 +4334,9 @@ int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r) auto op = it->second; if (op->onfinish) - op->onfinish->defer(std::move(op->onfinish), osdcode(r), - bc::flat_map{}, false); + asio::defer(service.get_executor(), + asio::append(std::move(op->onfinish), osdcode(r), + bc::flat_map{}, false)); _finish_pool_stat_op(op, r); return 0; } @@ -4328,8 +4354,8 @@ void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r) delete op; } -void Objecter::get_fs_stats(std::optional poolid, - decltype(StatfsOp::onfinish)&& onfinish) +void Objecter::get_fs_stats_(std::optional poolid, + decltype(StatfsOp::onfinish)&& onfinish) { ldout(cct, 10) << "get_fs_stats" << dendl; unique_lock l(rwlock); @@ -4382,7 +4408,8 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m) ldout(cct, 10) << "have request " << tid << " at " << op << dendl; if (m->h.version > last_seen_pgmap_version) last_seen_pgmap_version = m->h.version; - op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st); + asio::defer(service.get_executor(), asio::append(std::move(op->onfinish), + bs::error_code{}, m->h.st)); _finish_statfs_op(op, 0); } else { ldout(cct, 10) << "unknown request " << tid << dendl; @@ -4407,7 +4434,9 @@ int Objecter::statfs_op_cancel(ceph_tid_t tid, int r) auto op = it->second; if (op->onfinish) - op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{}); + asio::defer(service.get_executor(), + asio::append(std::move(op->onfinish), + osdcode(r), ceph_statfs{})); _finish_statfs_op(op, r); return 0; } @@ -5008,7 +5037,9 @@ void Objecter::_finish_command(CommandOp *c, bs::error_code ec, << rs << dendl; if (c->onfinish) - c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl)); + asio::defer(service.get_executor(), + asio::append(std::move(c->onfinish), ec, std::move(rs), + std::move(bl))); if (c->ontimeout && ec != bs::errc::timed_out) timer.cancel_event(c->ontimeout); @@ -5031,7 +5062,7 @@ Objecter::OSDSession::~OSDSession() Objecter::Objecter(CephContext *cct, Messenger *m, MonClient *mc, - boost::asio::io_context& service) : + asio::io_context& service) : Dispatcher(cct), messenger(m), monc(mc), service(service) { mon_timeout = cct->_conf.get_val("rados_mon_op_timeout"); @@ -5235,9 +5266,12 @@ void Objecter::_issue_enumerate(hobject_t start, auto pbl = &on_ack->bl; // Issue. See you later in _enumerate_reply + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); pg_read(start.get_hash(), c->oloc, op, pbl, 0, - Op::OpComp::create(service.get_executor(), + asio::bind_executor(e, [c = std::move(on_ack)] (bs::error_code ec) mutable { (*c)(ec); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 45250c4361b46..34cb9db74ff46 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -41,8 +41,8 @@ #include "include/function2.hpp" #include "include/neorados/RADOS_Decodable.hpp" -#include "common/admin_socket.h" #include "common/async/completion.h" +#include "common/admin_socket.h" #include "common/ceph_time.h" #include "common/ceph_mutex.h" #include "common/ceph_timer.h" @@ -1626,7 +1626,7 @@ class Objecter : public md_config_obs_t, public Dispatcher { using MOSDOp = _mosdop::MOSDOp; public: using OpSignature = void(boost::system::error_code); - using OpCompletion = ceph::async::Completion; + using OpCompletion = boost::asio::any_completion_handler; // config observer bits const char** get_tracked_conf_keys() const override; @@ -1843,55 +1843,91 @@ public: void dump(ceph::Formatter *f) const; }; - std::unique_ptr> + boost::asio::any_completion_handler OpContextVert(Context* c) { - if (c) - return ceph::async::Completion::create( + if (c) { + auto e = boost::asio::prefer( service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + + return boost::asio::bind_executor( + std::move(e), [c = std::unique_ptr(c)] (boost::system::error_code e) mutable { c.release()->complete(e); }); + } else return nullptr; } template - std::unique_ptr> + boost::asio::any_completion_handler OpContextVert(Context* c, T* p) { - if (c || p) + if (c || p) { + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); return - ceph::async::Completion::create( - service.get_executor(), + boost::asio::bind_executor( + e, [c = std::unique_ptr(c), p] (boost::system::error_code e, T r) mutable { if (p) *p = std::move(r); if (c) c.release()->complete(ceph::from_error_code(e)); - }); - else + }); + } else { return nullptr; + } } template - std::unique_ptr> + boost::asio::any_completion_handler OpContextVert(Context* c, T& p) { + if (c) { + auto e = boost::asio::prefer( + service.get_executor(), + boost::asio::execution::outstanding_work.tracked); + return boost::asio::bind_executor( + e, + [c = std::unique_ptr(c), &p] + (boost::system::error_code e, T r) mutable { + p = std::move(r); + if (c) + c.release()->complete(ceph::from_error_code(e)); + }); + } else { + return nullptr; + } + } + + boost::asio::any_completion_handler + OpCompletionVert(std::unique_ptr> c) { if (c) - return ceph::async::Completion< - void(boost::system::error_code, T)>::create( - service.get_executor(), - [c = std::unique_ptr(c), &p] - (boost::system::error_code e, T r) mutable { - p = std::move(r); - if (c) - c.release()->complete(ceph::from_error_code(e)); - }); + return [c = std::move(c)](boost::system::error_code ec) mutable { + c->dispatch(std::move(c), ec); + }; else return nullptr; } + template + boost::asio::any_completion_handler + OpCompletionVert(std::unique_ptr> c) { + if (c) { + return [c = std::move(c)](boost::system::error_code ec, T t) mutable { + c->dispatch(std::move(c), ec, std::move(t)); + }; + } else { + return nullptr; + } + } + struct Op : public RefCountedObject { OSDSession *session = nullptr; int incarnation = 0; @@ -1919,7 +1955,7 @@ public: int priority = 0; using OpSig = void(boost::system::error_code); - using OpComp = ceph::async::Completion; + using OpComp = boost::asio::any_completion_handler; // Due to an irregularity of cmpxattr, we actualy need the 'int' // value for onfinish for legacy librados users. As such just // preserve the Context* in this one case. That way we can have @@ -1929,7 +1965,7 @@ public: // // Add a function for the linger case, where we want better // semantics than Context, but still need to be under the completion_lock. - std::variant, fu2::unique_function, + std::variant, Context*> onfinish; uint64_t ontimeout = 0; @@ -1967,8 +2003,8 @@ public: } static void complete(decltype(onfinish)&& f, boost::system::error_code ec, - int r) { - std::visit([ec, r](auto&& arg) { + int r, boost::asio::io_context::executor_type e) { + std::visit([ec, r, e](auto&& arg) { if constexpr (std::is_same_v, Context*>) { arg->complete(r); @@ -1976,17 +2012,18 @@ public: fu2::unique_function>) { std::move(arg)(ec); } else { - arg->defer(std::move(arg), ec); + boost::asio::defer(e, + boost::asio::append(std::move(arg), ec)); } }, std::move(f)); } - void complete(boost::system::error_code ec, int r) { - complete(std::move(onfinish), ec, r); + void complete(boost::system::error_code ec, int r, + boost::asio::io_context::executor_type e) { + complete(std::move(onfinish), ec, r, e); } Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, - int f, std::unique_ptr&& fin, - version_t *ov, int *offset = nullptr, + int f, OpComp&& fin, version_t *ov, int *offset = nullptr, ZTracer::Trace *parent_trace = nullptr) : target(o, ol, f), ops(std::move(_ops)), @@ -2168,8 +2205,8 @@ public: using OpSig = void(boost::system::error_code, boost::container::flat_map, bool); - using OpComp = ceph::async::Completion; - std::unique_ptr onfinish; + using OpComp = boost::asio::any_completion_handler; + OpComp onfinish; std::uint64_t ontimeout; ceph::coarse_mono_time last_submit; }; @@ -2179,9 +2216,9 @@ public: std::optional data_pool; using OpSig = void(boost::system::error_code, const struct ceph_statfs); - using OpComp = ceph::async::Completion; + using OpComp = boost::asio::any_completion_handler; - std::unique_ptr onfinish; + OpComp onfinish; uint64_t ontimeout; ceph::coarse_mono_time last_submit; @@ -2192,8 +2229,8 @@ public: int64_t pool = 0; std::string name; using OpSig = void(boost::system::error_code, ceph::buffer::list); - using OpComp = ceph::async::Completion; - std::unique_ptr onfinish; + using OpComp = boost::asio::any_completion_handler; + OpComp onfinish; uint64_t ontimeout = 0; int pool_op = 0; int16_t crush_rule = 0; @@ -2222,8 +2259,8 @@ public: using OpSig = void(boost::system::error_code, std::string, ceph::buffer::list); - using OpComp = ceph::async::Completion; - std::unique_ptr onfinish; + using OpComp = boost::asio::any_completion_handler; + OpComp onfinish; uint64_t ontimeout = 0; ceph::coarse_mono_time last_submit; @@ -2289,9 +2326,9 @@ public: bool registered{false}; bool canceled{false}; using OpSig = void(boost::system::error_code, ceph::buffer::list); - using OpComp = ceph::async::Completion; - std::unique_ptr on_reg_commit; - std::unique_ptr on_notify_finish; + using OpComp = boost::asio::any_completion_handler; + OpComp on_reg_commit; + OpComp on_notify_finish; uint64_t notify_id{0}; fu2::unique_function check_latest_map_commands; std::map, + std::vector>> waiting_for_map; ceph::timespan mon_timeout; @@ -2516,9 +2553,13 @@ public: public: template auto linger_callback_flush(CT&& ct) { - boost::asio::async_completion init(ct); - boost::asio::defer(finish_strand, std::move(init.completion_handler)); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(ct), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [this](auto handler) { + boost::asio::defer(finish_strand, std::move(handler)); + }, consigned); } private: @@ -2669,22 +2710,28 @@ private: template auto wait_for_osd_map(CompletionToken&& token) { - boost::asio::async_completion init(token); - std::unique_lock l(rwlock); - if (osdmap->get_epoch()) { - l.unlock(); - boost::asio::post(std::move(init.completion_handler)); - } else { - waiting_for_map[0].emplace_back( - OpCompletion::create( - service.get_executor(), - [c = std::move(init.completion_handler)] - (boost::system::error_code) mutable { - std::move(c)(); - }), boost::system::error_code{}); - l.unlock(); - } - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [this](auto handler) { + std::unique_lock l(rwlock); + if (osdmap->get_epoch()) { + l.unlock(); + boost::asio::post(std::move(handler)); + } else { + auto e = boost::asio::get_associated_executor( + handler, service.get_executor()); + waiting_for_map[0].emplace_back( + boost::asio::bind_executor( + e, [c = std::move(handler)] + (boost::system::error_code) mutable { + boost::asio::dispatch(std::move(c)); + }), + boost::system::error_code{}); + l.unlock(); + } + }, consigned); } @@ -2751,9 +2798,9 @@ public: struct CB_Objecter_GetVersion { Objecter *objecter; - std::unique_ptr fin; + OpCompletion fin; - CB_Objecter_GetVersion(Objecter *o, std::unique_ptr c) + CB_Objecter_GetVersion(Objecter *o, OpCompletion c) : objecter(o), fin(std::move(c)) {} void operator()(boost::system::error_code ec, version_t newest, version_t oldest) { @@ -2761,7 +2808,8 @@ public: // try again as instructed objecter->_wait_for_latest_osdmap(std::move(*this)); } else if (ec) { - ceph::async::post(std::move(fin), ec); + boost::asio::post(objecter->service.get_executor(), + boost::asio::append(std::move(fin), ec)); } else { auto l = std::unique_lock(objecter->rwlock); objecter->_get_latest_version(oldest, newest, std::move(fin), @@ -2772,24 +2820,23 @@ public: template auto wait_for_map(epoch_t epoch, CompletionToken&& token) { - boost::asio::async_completion init(token); - - if (osdmap->get_epoch() >= epoch) { - boost::asio::post(service, - ceph::async::bind_handler( - std::move(init.completion_handler), - boost::system::error_code())); - } else { - monc->get_version("osdmap", - CB_Objecter_GetVersion( - this, - OpCompletion::create(service.get_executor(), - std::move(init.completion_handler)))); - } - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [epoch, this](auto handler) { + if (osdmap->get_epoch() >= epoch) { + boost::asio::post(boost::asio::append( + std::move(handler), + boost::system::error_code{})); + } else { + monc->get_version( + "osdmap", + CB_Objecter_GetVersion(this, std::move(handler))); + } + }, consigned); } - - void _wait_for_new_map(std::unique_ptr, epoch_t epoch, + void _wait_for_new_map(OpCompletion, epoch_t epoch, boost::system::error_code = {}); private: @@ -2801,38 +2848,40 @@ public: template auto wait_for_latest_osdmap(CompletionToken&& token) { - boost::asio::async_completion init(token); - - monc->get_version("osdmap", - CB_Objecter_GetVersion( - this, - OpCompletion::create(service.get_executor(), - std::move(init.completion_handler)))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + boost::asio::async_initiate( + [this](auto handler) { + monc->get_version("osdmap", + CB_Objecter_GetVersion( + this, + std::move(handler))); + }, consigned); } - void wait_for_latest_osdmap(std::unique_ptr c) { - monc->get_version("osdmap", - CB_Objecter_GetVersion(this, std::move(c))); + auto wait_for_latest_osdmap(std::unique_ptr> c) { + wait_for_latest_osdmap([c = std::move(c)](boost::system::error_code e) mutable { + c->dispatch(std::move(c), e); + }); } template auto get_latest_version(epoch_t oldest, epoch_t newest, CompletionToken&& token) { - boost::asio::async_completion init(token); - { - std::unique_lock wl(rwlock); - _get_latest_version(oldest, newest, - OpCompletion::create( - service.get_executor(), - std::move(init.completion_handler)), - std::move(wl)); - } - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [oldest, newest, this](auto handler) { + std::unique_lock wl(rwlock); + _get_latest_version(oldest, newest, + std::move(handler), std::move(wl)); + }, consigned); } void _get_latest_version(epoch_t oldest, epoch_t neweset, - std::unique_ptr fin, + OpCompletion fin, std::unique_lock&& ul); /** Get the current set of global op flags */ @@ -2865,7 +2914,7 @@ public: epoch_t op_cancel_writes(int r, int64_t pool=-1); // commands - void osd_command(int osd, std::vector cmd, + void osd_command_(int osd, std::vector cmd, ceph::buffer::list inbl, ceph_tid_t *ptid, decltype(CommandOp::onfinish)&& onfinish) { ceph_assert(osd >= 0); @@ -2880,17 +2929,20 @@ public: auto osd_command(int osd, std::vector cmd, ceph::buffer::list inbl, ceph_tid_t *ptid, CompletionToken&& token) { - boost::asio::async_completion init(token); - osd_command(osd, std::move(cmd), std::move(inbl), ptid, - CommandOp::OpComp::create(service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); - } - - void pg_command(pg_t pgid, std::vector cmd, - ceph::buffer::list inbl, ceph_tid_t *ptid, - decltype(CommandOp::onfinish)&& onfinish) { + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [osd, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this] + (auto handler) { + osd_command_(osd, std::move(cmd), std::move(inbl), ptid, + std::move(handler)); + }, consigned); + } + + void pg_command_(pg_t pgid, std::vector cmd, + ceph::buffer::list inbl, ceph_tid_t *ptid, + decltype(CommandOp::onfinish)&& onfinish) { auto *c = new CommandOp( pgid, std::move(cmd), @@ -2903,12 +2955,14 @@ public: auto pg_command(pg_t pgid, std::vector cmd, ceph::buffer::list inbl, ceph_tid_t *ptid, CompletionToken&& token) { - boost::asio::async_completion init(token); - pg_command(pgid, std::move(cmd), std::move(inbl), ptid, - CommandOp::OpComp::create(service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard(service.get_executor())); + return async_initiate ( + [pgid, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this] + (auto handler) { + pg_command_(pgid, std::move(cmd), std::move(inbl), ptid, + std::move(handler)); + }, consigned); } // mid-level helpers @@ -2949,7 +3003,7 @@ public: void mutate(const object_t& oid, const object_locator_t& oloc, ObjectOperation&& op, const SnapContext& snapc, ceph::real_time mtime, int flags, - std::unique_ptr&& oncommit, + Op::OpComp oncommit, version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(), ZTracer::Trace *parent_trace = nullptr) { Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | @@ -2967,6 +3021,18 @@ public: op_submit(o); } + void mutate(const object_t& oid, const object_locator_t& oloc, + ObjectOperation&& op, const SnapContext& snapc, + ceph::real_time mtime, int flags, + std::unique_ptr> oncommit, + version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(), + ZTracer::Trace *parent_trace = nullptr) { + mutate(oid, oloc, std::move(op), snapc, mtime, flags, + [c = std::move(oncommit)](boost::system::error_code ec) mutable { + c->dispatch(std::move(c), ec); + }, objver, reqid, parent_trace); + } + Op *prepare_read_op( const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, @@ -3008,7 +3074,7 @@ public: void read(const object_t& oid, const object_locator_t& oloc, ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl, - int flags, std::unique_ptr&& onack, + int flags, Op::OpComp onack, version_t *objver = nullptr, int *data_offset = nullptr, uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) { Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | @@ -3031,6 +3097,17 @@ public: op_submit(o); } + void read(const object_t& oid, const object_locator_t& oloc, + ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl, + int flags, std::unique_ptr> onack, + version_t *objver = nullptr, int *data_offset = nullptr, + uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) { + read(oid, oloc, std::move(op), snapid, pbl, flags, + [c = std::move(onack)](boost::system::error_code e) mutable { + c->dispatch(std::move(c), e); + }, objver, data_offset, features, parent_trace); + } + Op *prepare_pg_read_op( uint32_t hash, object_locator_t oloc, @@ -3074,7 +3151,7 @@ public: ceph_tid_t pg_read( uint32_t hash, object_locator_t oloc, ObjectOperation& op, ceph::buffer::list *pbl, int flags, - std::unique_ptr&& onack, epoch_t *reply_epoch, int *ctx_budget) { + Op::OpComp onack, epoch_t *reply_epoch, int *ctx_budget) { ceph_tid_t tid; Op *o = new Op(object_t(), oloc, std::move(op.ops), @@ -3118,6 +3195,18 @@ public: return linger_watch(info, op, snapc, mtime, inbl, OpContextVert(onfinish, nullptr), objver); } + ceph_tid_t linger_watch(LingerOp *info, + ObjectOperation& op, + const SnapContext& snapc, ceph::real_time mtime, + ceph::buffer::list& inbl, + std::unique_ptr> onfinish, + version_t *objver) { + return linger_watch(info, op, snapc, mtime, inbl, + OpCompletionVert( + std::move(onfinish)), objver); + } ceph_tid_t linger_notify(LingerOp *info, ObjectOperation& op, snapid_t snap, ceph::buffer::list& inbl, @@ -3133,6 +3222,17 @@ public: OpContextVert(onack, poutbl), objver); } + ceph_tid_t linger_notify(LingerOp *info, + ObjectOperation& op, + snapid_t snap, ceph::buffer::list& inbl, + std::unique_ptr> onack, + version_t *objver) { + return linger_notify(info, op, snap, inbl, + OpCompletionVert( + std::move(onack)), objver); + } tl::expected linger_check(LingerOp *info); void linger_cancel(LingerOp *info); // releases a reference @@ -3713,15 +3813,27 @@ public: create_pool_snap(pool, snapName, OpContextVert(c, nullptr)); } + void create_pool_snap( + int64_t pool, std::string_view snapName, + std::unique_ptr> c) { + create_pool_snap(pool, snapName, + OpCompletionVert(std::move(c))); + } void allocate_selfmanaged_snap(int64_t pool, - std::unique_ptr> onfinish); + snapid_t)> onfinish); void allocate_selfmanaged_snap(int64_t pool, snapid_t* psnapid, Context* c) { allocate_selfmanaged_snap(pool, OpContextVert(c, psnapid)); } + void allocate_selfmanaged_snap(int64_t pool, + std::unique_ptr> c) { + allocate_selfmanaged_snap(pool, + OpCompletionVert(std::move(c))); + } void delete_pool_snap(int64_t pool, std::string_view snapName, decltype(PoolOp::onfinish)&& onfinish); void delete_pool_snap(int64_t pool, std::string_view snapName, @@ -3729,6 +3841,12 @@ public: delete_pool_snap(pool, snapName, OpContextVert(c, nullptr)); } + void delete_pool_snap(int64_t pool, std::string_view snapName, + std::unique_ptr> c) { + delete_pool_snap(pool, snapName, + OpCompletionVert(std::move(c))); + } void delete_selfmanaged_snap(int64_t pool, snapid_t snap, decltype(PoolOp::onfinish)&& onfinish); @@ -3737,6 +3855,12 @@ public: delete_selfmanaged_snap(pool, snap, OpContextVert(c, nullptr)); } + void delete_selfmanaged_snap(int64_t pool, snapid_t snap, + std::unique_ptr> c) { + delete_selfmanaged_snap(pool, snap, + OpCompletionVert(std::move(c))); + } void create_pool(std::string_view name, @@ -3748,12 +3872,25 @@ public: OpContextVert(onfinish, nullptr), crush_rule); } + void create_pool(std::string_view name, + std::unique_ptr> c, + int crush_rule=-1) { + create_pool(name, + OpCompletionVert(std::move(c)), + crush_rule); + } void delete_pool(int64_t pool, decltype(PoolOp::onfinish)&& onfinish); void delete_pool(int64_t pool, Context* onfinish) { delete_pool(pool, OpContextVert(onfinish, nullptr)); } + void delete_pool(int64_t pool, + std::unique_ptr> c) { + delete_pool(pool, OpCompletionVert(std::move(c))); + } void delete_pool(std::string_view name, decltype(PoolOp::onfinish)&& onfinish); @@ -3762,6 +3899,11 @@ public: Context* onfinish) { delete_pool(name, OpContextVert(onfinish, nullptr)); } + void delete_pool(std::string_view name, + std::unique_ptr> c) { + delete_pool(name, OpCompletionVert(std::move(c))); + } void handle_pool_op_reply(MPoolOpReply *m); int pool_op_cancel(ceph_tid_t tid, int r); @@ -3772,18 +3914,18 @@ private: void _poolstat_submit(PoolStatOp *op); public: void handle_get_pool_stats_reply(MGetPoolStatsReply *m); - void get_pool_stats(const std::vector& pools, - decltype(PoolStatOp::onfinish)&& onfinish); + void get_pool_stats_(const std::vector& pools, + decltype(PoolStatOp::onfinish)&& onfinish); template - auto get_pool_stats(const std::vector& pools, + auto get_pool_stats(std::vector pools, CompletionToken&& token) { - boost::asio::async_completion init(token); - get_pool_stats(pools, - PoolStatOp::OpComp::create( - service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [pools = std::move(pools), this](auto handler) { + get_pool_stats_(pools, std::move(handler)); + }, consigned); } int pool_stat_op_cancel(ceph_tid_t tid, int r); void _finish_pool_stat_op(PoolStatOp *op, int r); @@ -3794,20 +3936,27 @@ private: void _fs_stats_submit(StatfsOp *op); public: void handle_fs_stats_reply(MStatfsReply *m); - void get_fs_stats(std::optional poolid, - decltype(StatfsOp::onfinish)&& onfinish); + void get_fs_stats_(std::optional poolid, + decltype(StatfsOp::onfinish)&& onfinish); template auto get_fs_stats(std::optional poolid, CompletionToken&& token) { - boost::asio::async_completion init(token); - get_fs_stats(poolid, - StatfsOp::OpComp::create(service.get_executor(), - std::move(init.completion_handler))); - return init.result.get(); + auto consigned = boost::asio::consign( + std::forward(token), boost::asio::make_work_guard( + service.get_executor())); + return boost::asio::async_initiate( + [poolid, this](auto handler) { + get_fs_stats_(poolid, std::move(handler)); + }, consigned); } void get_fs_stats(struct ceph_statfs& result, std::optional poolid, Context *onfinish) { - get_fs_stats(poolid, OpContextVert(onfinish, result)); + get_fs_stats_(poolid, OpContextVert(onfinish, result)); + } + void get_fs_stats(std::optional poolid, + std::unique_ptr> c) { + get_fs_stats_(poolid, OpCompletionVert(std::move(c))); } int statfs_op_cancel(ceph_tid_t tid, int r); void _finish_statfs_op(StatfsOp *op, int r); diff --git a/src/test/librados_test_stub/NeoradosTestStub.cc b/src/test/librados_test_stub/NeoradosTestStub.cc index 4bf4fc037ecf1..c1b29bad6c476 100644 --- a/src/test/librados_test_stub/NeoradosTestStub.cc +++ b/src/test/librados_test_stub/NeoradosTestStub.cc @@ -22,6 +22,7 @@ #include namespace bs = boost::system; +namespace asio = boost::asio; using namespace std::literals; using namespace std::placeholders; @@ -82,7 +83,7 @@ public: namespace { struct CompletionPayload { - std::unique_ptr c; + Op::Completion c; }; void completion_callback_adapter(rados_completion_t c, void *arg) { @@ -91,14 +92,14 @@ void completion_callback_adapter(rados_completion_t c, void *arg) { impl->release(); auto payload = reinterpret_cast(arg); - payload->c->defer(std::move(payload->c), - (r < 0) ? bs::error_code(-r, osd_category()) : - bs::error_code()); + asio::dispatch(asio::append(std::move(payload->c), + (r < 0) ? bs::error_code(-r, osd_category()) : + bs::error_code())); delete payload; } librados::AioCompletionImpl* create_aio_completion( - std::unique_ptr&& c) { + Op::Completion&& c) { auto payload = new CompletionPayload{std::move(c)}; auto impl = new librados::AioCompletionImpl(); @@ -588,12 +589,12 @@ boost::asio::io_context::executor_type neorados::RADOS::get_executor() const { return impl->io_context.get_executor(); } -void RADOS::execute(Object o, IOContext ioc, ReadOp op, - ceph::buffer::list* bl, std::unique_ptr c, - uint64_t* objver, const blkin_trace_info* trace_info) { +void RADOS::execute_(Object o, IOContext ioc, ReadOp op, + ceph::buffer::list* bl, Op::Completion c, + uint64_t* objver, const blkin_trace_info* trace_info) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { - c->dispatch(std::move(c), osdc_errc::pool_dne); + asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne)); return; } @@ -607,12 +608,12 @@ void RADOS::execute(Object o, IOContext ioc, ReadOp op, ceph_assert(r == 0); } -void RADOS::execute(Object o, IOContext ioc, WriteOp op, - std::unique_ptr c, uint64_t* objver, - const blkin_trace_info* trace_info) { +void RADOS::execute_(Object o, IOContext ioc, WriteOp op, + Op::Completion c, uint64_t* objver, + const blkin_trace_info* trace_info) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { - c->dispatch(std::move(c), osdc_errc::pool_dne); + asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne)); return; } @@ -629,29 +630,33 @@ void RADOS::execute(Object o, IOContext ioc, WriteOp op, ceph_assert(r == 0); } -void RADOS::mon_command(std::vector command, - bufferlist bl, - std::string* outs, bufferlist* outbl, - std::unique_ptr c) { +void RADOS::mon_command_(std::vector command, + bufferlist bl, + std::string* outs, bufferlist* outbl, + Op::Completion c) { auto r = impl->test_rados_client->mon_command(command, bl, outbl, outs); - c->post(std::move(c), - (r < 0 ? bs::error_code(-r, osd_category()) : bs::error_code())); + asio::post(get_executor(), + asio::append(std::move(c), + (r < 0 ? bs::error_code(-r, osd_category()) : + bs::error_code()))); } -void RADOS::blocklist_add(std::string client_address, - std::optional expire, - std::unique_ptr c) { +void RADOS::blocklist_add_(std::string client_address, + std::optional expire, + SimpleOpComp c) { auto r = impl->test_rados_client->blocklist_add( std::string(client_address), expire.value_or(0s).count()); - c->post(std::move(c), - (r < 0 ? bs::error_code(-r, mon_category()) : bs::error_code())); + asio::post(get_executor(), + asio::append(std::move(c), + (r < 0 ? bs::error_code(-r, mon_category()) : + bs::error_code()))); } -void RADOS::wait_for_latest_osd_map(std::unique_ptr c) { +void RADOS::wait_for_latest_osd_map_(Op::Completion c) { auto r = impl->test_rados_client->wait_for_latest_osd_map(); - c->dispatch(std::move(c), - (r < 0 ? bs::error_code(-r, osd_category()) : - bs::error_code())); + asio::dispatch(asio::append(std::move(c), + (r < 0 ? bs::error_code(-r, osd_category()) : + bs::error_code()))); } } // namespace neorados -- 2.39.5