#include "include/neorados/RADOS_Decodable.hpp"
-// Needed for type erasure and template support. We can't really avoid
-// it.
-
-#include "common/async/completion.h"
-
+#include <boost/asio/any_completion_handler.hpp>
+#include <boost/asio/associated_executor.hpp>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/consign.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+
// These are needed for RGW, but in general as a 'shiny new interface'
// we should try to use forward declarations and provide standard alternatives.
std::size_t size() const;
using Signature = void(boost::system::error_code);
- using Completion = ceph::async::Completion<Signature>;
+ using Completion = boost::asio::any_completion_handler<Signature>;
friend std::ostream& operator <<(std::ostream& m, const Op& o);
protected:
}
using BuildSig = void(boost::system::error_code, RADOS);
- using BuildComp = ceph::async::Completion<BuildSig>;
+ using BuildComp = boost::asio::any_completion_handler<BuildSig>;
class Builder {
std::optional<std::string> conf_files;
std::optional<std::string> cluster;
return *this;
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<BuildSig> CompletionToken>
auto build(boost::asio::io_context& ioctx, CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, BuildSig>(
- [&ioctx, this](auto&& handler) {
- build(ioctx, BuildComp::create(ioctx.get_executor(),
- std::move(handler)));
- }, token);
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, ioctx.get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), BuildSig>(
+ [&ioctx, this](auto handler) {
+ build_(ioctx, std::move(handler));
+ }, consigned);
}
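+  // Usage sketch (illustrative; `ioctx` is assumed to be a running
+  // boost::asio::io_context): any token satisfying
+  // completion_token_for<BuildSig> works, e.g. a plain callback
+  //
+  //   RADOS::Builder{}.build(ioctx,
+  //     [](boost::system::error_code ec, RADOS r) { /* ... */ });
+  //
+  // or, from a C++20 coroutine,
+  //
+  //   RADOS r = co_await RADOS::Builder{}.build(ioctx,
+  //                                             boost::asio::use_awaitable);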
private:
- void build(boost::asio::io_context& ioctx,
- std::unique_ptr<BuildComp> c);
+ void build_(boost::asio::io_context& ioctx,
+ BuildComp c);
};
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<BuildSig> CompletionToken>
static auto make_with_cct(CephContext* cct,
boost::asio::io_context& ioctx,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, BuildSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, ioctx.get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), BuildSig>(
[cct, &ioctx](auto&& handler) {
- make_with_cct(cct, ioctx,
- BuildComp::create(ioctx.get_executor(),
- std::move(handler)));
- }, token);
+ make_with_cct_(cct, ioctx, std::move(handler));
+ }, consigned);
}
static RADOS make_with_librados(librados::Rados& rados);
executor_type get_executor() const;
boost::asio::io_context& get_io_context();
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
auto execute(Object o, IOContext ioc, ReadOp op,
ceph::buffer::list* bl,
CompletionToken&& token, uint64_t* objver = nullptr,
const blkin_trace_info* trace_info = nullptr) {
- return boost::asio::async_initiate<CompletionToken, Op::Signature>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
[o = std::move(o), ioc = std::move(ioc), op = std::move(op),
bl, objver, trace_info, this](auto&& handler) mutable {
- execute(std::move(o), std::move(ioc), std::move(op), bl,
- ReadOp::Completion::create(get_executor(),
- std::move(handler)),
- objver, trace_info);
- }, token);
+ execute_(std::move(o), std::move(ioc), std::move(op), bl,
+ std::move(handler), objver, trace_info);
+ }, consigned);
}
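+  // For example (sketch; `rados`, `obj`, and `ioc` are assumed to be an
+  // existing RADOS handle, Object, and IOContext), reading the first
+  // 4 KiB of an object from a coroutine:
+  //
+  //   ReadOp op;
+  //   ceph::buffer::list bl;
+  //   op.read(0, 4096, &bl);        // offset 0, length 4096, result in bl
+  //   co_await rados.execute(obj, ioc, std::move(op), nullptr,
+  //                          boost::asio::use_awaitable);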
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
auto execute(Object o, IOContext ioc, WriteOp op,
CompletionToken&& token, uint64_t* objver = nullptr,
const blkin_trace_info* trace_info = nullptr) {
- return boost::asio::async_initiate<CompletionToken, Op::Signature>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
[o = std::move(o), ioc = std::move(ioc), op = std::move(op),
objver, trace_info, this](auto&& handler) mutable {
- execute(std::move(o), std::move(ioc), std::move(op),
- WriteOp::Completion::create(get_executor(),
- std::move(handler)),
- objver, trace_info);
- }, token);
+ execute_(std::move(o), std::move(ioc), std::move(op),
+ std::move(handler), objver, trace_info);
+ }, consigned);
}
boost::uuids::uuid get_fsid() const noexcept;
using LookupPoolSig = void(boost::system::error_code,
std::int64_t);
- using LookupPoolComp = ceph::async::Completion<LookupPoolSig>;
- template<typename CompletionToken>
+ using LookupPoolComp = boost::asio::any_completion_handler<LookupPoolSig>;
+ template<boost::asio::completion_token_for<LookupPoolSig> CompletionToken>
auto lookup_pool(std::string name,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, LookupPoolSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), LookupPoolSig>(
[name = std::move(name), this](auto&& handler) mutable {
- lookup_pool(std::move(name),
- LookupPoolComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ lookup_pool_(std::move(name), std::move(handler));
+ }, consigned);
}
std::optional<uint64_t> get_pool_alignment(int64_t pool_id);
using LSPoolsSig = void(std::vector<std::pair<std::int64_t, std::string>>);
- using LSPoolsComp = ceph::async::Completion<LSPoolsSig>;
- template<typename CompletionToken>
+ using LSPoolsComp = boost::asio::any_completion_handler<LSPoolsSig>;
+ template<boost::asio::completion_token_for<LSPoolsSig> CompletionToken>
auto list_pools(CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, LSPoolsSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), LSPoolsSig>(
[this](auto&& handler) {
- list_pools(LSPoolsComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ list_pools_(std::move(handler));
+ }, consigned);
}
using SimpleOpSig = void(boost::system::error_code);
- using SimpleOpComp = ceph::async::Completion<SimpleOpSig>;
- template<typename CompletionToken>
+ using SimpleOpComp = boost::asio::any_completion_handler<SimpleOpSig>;
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto create_pool_snap(int64_t pool, std::string snap_name,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[pool, snap_name = std::move(snap_name), this](auto&& handler) mutable {
- create_pool_snap(pool, std::move(snap_name),
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ create_pool_snap_(pool, std::move(snap_name),
+ std::move(handler));
+ }, consigned);
}
using SMSnapSig = void(boost::system::error_code, std::uint64_t);
- using SMSnapComp = ceph::async::Completion<SMSnapSig>;
- template<typename CompletionToken>
+ using SMSnapComp = boost::asio::any_completion_handler<SMSnapSig>;
+ template<boost::asio::completion_token_for<SMSnapSig> CompletionToken>
auto allocate_selfmanaged_snap(int64_t pool,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SMSnapSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SMSnapSig>(
[pool, this](auto&& handler) mutable {
- allocage_selfmanaged_snap(pool,
- SMSnapComp::create(get_executor(),
- std::move(handler)));
- }, token);
+          allocate_selfmanaged_snap_(pool, std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto delete_pool_snap(int64_t pool, std::string snap_name,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[pool, snap_name = std::move(snap_name), this](auto&& handler) mutable {
- delete_pool_snap(pool, std::move(snap_name),
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ delete_pool_snap_(pool, std::move(snap_name),
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
-  auto delete_selfmanaged_snap(int64_t pool, std::string snap_name,
+  auto delete_selfmanaged_snap(int64_t pool, std::uint64_t snap,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
-      [pool, snap_name = std::move(snap_name), this](auto&& handler) mutable {
+      [pool, snap, this](auto&& handler) mutable {
- delete_selfmanaged_snap(pool, std::move(snap_name),
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+          delete_selfmanaged_snap_(pool, snap, std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto create_pool(std::string name, std::optional<int> crush_rule,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[name = std::move(name), crush_rule, this](auto&& handler) mutable {
- create_pool(std::move(name), crush_rule,
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ create_pool_(std::move(name), crush_rule,
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto delete_pool(std::string name,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[name = std::move(name), this](auto&& handler) mutable {
- delete_pool(std::move(name),
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ delete_pool_(std::move(name), std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto delete_pool(int64_t pool,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[pool, this](auto&& handler) mutable {
- delete_pool(pool,
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ delete_pool_(pool, std::move(handler));
+ }, consigned);
}
using PoolStatSig = void(boost::system::error_code,
boost::container::flat_map<std::string,
PoolStats>, bool);
- using PoolStatComp = ceph::async::Completion<PoolStatSig>;
- template<typename CompletionToken>
+ using PoolStatComp = boost::asio::any_completion_handler<PoolStatSig>;
+ template<boost::asio::completion_token_for<PoolStatSig> CompletionToken>
auto stat_pools(std::vector<std::string> pools,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, PoolStatSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), PoolStatSig>(
[pools = std::move(pools), this](auto&& handler) mutable {
- stat_pools(std::move(pools),
- PoolStatComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ stat_pools_(std::move(pools), std::move(handler));
+ }, consigned);
}
using StatFSSig = void(boost::system::error_code,
FSStats);
- using StatFSComp = ceph::async::Completion<StatFSSig>;
- template<typename CompletionToken>
+ using StatFSComp = boost::asio::any_completion_handler<StatFSSig>;
+ template<boost::asio::completion_token_for<StatFSSig> CompletionToken>
auto statfs(std::optional<int64_t> pool,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, StatFSSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), StatFSSig>(
[pool, this](auto&& handler) mutable {
- statfs(pool, StatFSComp::create(get_executor(),
- std::move(handler)));
- }, token);
+          stat_fs_(pool, std::move(handler));
+ }, consigned);
}
using WatchCB = fu2::unique_function<void(boost::system::error_code,
using WatchSig = void(boost::system::error_code ec,
uint64_t cookie);
- using WatchComp = ceph::async::Completion<WatchSig>;
- template<typename CompletionToken>
+ using WatchComp = boost::asio::any_completion_handler<WatchSig>;
+ template<boost::asio::completion_token_for<WatchSig> CompletionToken>
auto watch(Object o, IOContext ioc,
std::optional<std::chrono::seconds> timeout,
WatchCB cb, CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, WatchSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), WatchSig>(
[o = std::move(o), ioc = std::move(ioc), timeout, cb = std::move(cb),
this](auto&& handler) mutable {
- watch(std::move(o), std::move(ioc), timeout, std::move(cb),
- WatchComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ watch_(std::move(o), std::move(ioc), timeout, std::move(cb),
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto notify_ack(Object o,
IOContext ioc,
uint64_t notify_id,
uint64_t cookie,
ceph::buffer::list bl,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[o = std::move(o), ioc = std::move(ioc), notify_id, cookie,
bl = std::move(bl), this](auto&& handler) mutable {
- notify_ack(std::move(o), std::move(ioc), notify_id, std::move(cookie),
- std::move(bl), SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+          notify_ack_(std::move(o), std::move(ioc), notify_id, cookie,
+                      std::move(bl), std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto unwatch(std::uint64_t cookie, IOContext ioc,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[cookie, ioc = std::move(ioc), this](auto&& handler) mutable {
- unwatch(cookie, std::move(ioc),
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ unwatch_(cookie, std::move(ioc), std::move(handler));
+ }, consigned);
}
// This is one of those places where having to force everything into
// let us separate out the implementation details without
// sacrificing all the benefits of templates.
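+  // In miniature, the idiom used throughout this header (sketch; `some_op`
+  // and `some_op_` are placeholder names):
+  //
+  //   using Sig = void(boost::system::error_code);
+  //   template<boost::asio::completion_token_for<Sig> CompletionToken>
+  //   auto some_op(CompletionToken&& token) {       // public, header-only
+  //     auto consigned = boost::asio::consign(
+  //       std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+  //         boost::asio::get_associated_executor(token, get_executor())));
+  //     return boost::asio::async_initiate<decltype(consigned), Sig>(
+  //       [this](auto handler) { some_op_(std::move(handler)); }, consigned);
+  //   }
+  //   // private, type-erased, implemented in the .cc:
+  //   void some_op_(boost::asio::any_completion_handler<Sig> c);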
using VoidOpSig = void();
- using VoidOpComp = ceph::async::Completion<VoidOpSig>;
- template<typename CompletionToken>
+ using VoidOpComp = boost::asio::any_completion_handler<VoidOpSig>;
+ template<boost::asio::completion_token_for<VoidOpSig> CompletionToken>
auto flush_watch(CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, VoidOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), VoidOpSig>(
[this](auto&& handler) {
- flush_watch(VoidOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ flush_watch_(std::move(handler));
+ }, consigned);
}
using NotifySig = void(boost::system::error_code, ceph::buffer::list);
- using NotifyComp = ceph::async::Completion<NotifySig>;
- template<typename CompletionToken>
+ using NotifyComp = boost::asio::any_completion_handler<NotifySig>;
+ template<boost::asio::completion_token_for<NotifySig> CompletionToken>
auto notify(Object o, IOContext ioc, ceph::buffer::list bl,
std::optional<std::chrono::milliseconds> timeout,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, NotifySig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), NotifySig>(
[o = std::move(o), ioc = std::move(ioc), bl = std::move(bl), timeout,
this](auto&& handler) mutable {
- notify(std::move(o), std::move(ioc), std::move(bl), timeout,
- NotifyComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ notify_(std::move(o), std::move(ioc), std::move(bl), timeout,
+ std::move(handler));
+ }, consigned);
}
// The versions with pointers are fine for coroutines, but
using EnumerateSig = void(boost::system::error_code,
std::vector<Entry>,
Cursor);
- using EnumerateComp = ceph::async::Completion<EnumerateSig>;
- template<typename CompletionToken>
+ using EnumerateComp = boost::asio::any_completion_handler<EnumerateSig>;
+ template<boost::asio::completion_token_for<EnumerateSig> CompletionToken>
auto enumerate_objects(IOContext ioc, Cursor begin,
Cursor end, const std::uint32_t max,
ceph::buffer::list filter,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, EnumerateSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), EnumerateSig>(
[ioc = std::move(ioc), begin = std::move(begin), end = std::move(end),
max, filter = std::move(filter), this](auto&& handler) mutable {
- enumerate_objects(std::move(ioc), std::move(begin), std::move(end),
- std::move(max), std::move(filter),
- EnumerateComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ enumerate_objects_(std::move(ioc), std::move(begin), std::move(end),
+ std::move(max), std::move(filter),
+ std::move(handler));
+ }, consigned);
}
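+  // e.g. (sketch; `rados`, `ioc`, `begin`, and `end` are assumed to exist),
+  // consuming the value-returning form from a coroutine:
+  //
+  //   auto [entries, next] = co_await rados.enumerate_objects(
+  //       ioc, begin, end, 1000, {}, boost::asio::use_awaitable);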
using CommandSig = void(boost::system::error_code,
std::string, ceph::buffer::list);
- using CommandComp = ceph::async::Completion<CommandSig>;
- template<typename CompletionToken>
+ using CommandComp = boost::asio::any_completion_handler<CommandSig>;
+ template<boost::asio::completion_token_for<CommandSig> CompletionToken>
auto osd_command(int osd, std::vector<std::string> cmd,
ceph::buffer::list in, CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, CommandSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), CommandSig>(
[osd, cmd = std::move(cmd), in = std::move(in),
this](auto&& handler) mutable {
- osd_command(osd, std::move(cmd), std::move(in),
- CommandComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ osd_command_(osd, std::move(cmd), std::move(in),
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<CommandSig> CompletionToken>
auto pg_command(PG pg, std::vector<std::string> cmd,
ceph::buffer::list in, CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, CommandSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), CommandSig>(
[pg = std::move(pg), cmd = std::move(cmd), in = std::move(in),
this](auto&& handler) mutable {
- pg_command(std::move(pg), std::move(cmd), std::move(in),
- CommandComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ pg_command_(std::move(pg), std::move(cmd), std::move(in),
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto mon_command(std::vector<std::string> command,
ceph::buffer::list&& bl,
std::string* outs, ceph::buffer::list* outbl,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[command = std::move(command), bl = std::move(bl), outs, outbl,
this](auto&& handler) mutable {
- mon_command(std::move(command), std::move(bl), outs, outbl,
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ mon_command_(std::move(command), std::move(bl), outs, outbl,
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto enable_application(std::string pool, std::string app_name,
bool force, CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[pool = std::move(pool), app_name = std::move(app_name),
force, this](auto&& handler) mutable {
- enable_application(std::move(pool), std::move(app_name), force,
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ enable_application_(std::move(pool), std::move(app_name), force,
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto blocklist_add(std::string client_address,
std::optional<std::chrono::seconds> expire,
CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[client_address = std::move(client_address), expire,
this](auto&& handler) mutable {
- blocklist_add(std::move(client_address), expire,
- SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ blocklist_add_(std::move(client_address), expire,
+ std::move(handler));
+ }, consigned);
}
- template<typename CompletionToken>
+ template<boost::asio::completion_token_for<SimpleOpSig> CompletionToken>
auto wait_for_latest_osd_map(CompletionToken&& token) {
- return boost::asio::async_initiate<CompletionToken, SimpleOpSig>(
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ boost::asio::get_associated_executor(token, get_executor())));
+ return boost::asio::async_initiate<decltype(consigned), SimpleOpSig>(
[this](auto&& handler) {
- wait_for_latest_osd_map(SimpleOpComp::create(get_executor(),
- std::move(handler)));
- }, token);
+ wait_for_latest_osd_map_(std::move(handler));
+ }, consigned);
}
uint64_t instance_id() const;
friend Builder;
RADOS(std::unique_ptr<detail::Client> impl);
- static void make_with_cct(CephContext* cct,
- boost::asio::io_context& ioctx,
- std::unique_ptr<BuildComp> c);
-
- void execute(Object o, IOContext ioc, ReadOp op,
- ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
- uint64_t* objver, const blkin_trace_info* trace_info);
-
- void execute(Object o, IOContext ioc, WriteOp op,
- std::unique_ptr<Op::Completion> c, uint64_t* objver,
- const blkin_trace_info* trace_info);
-
-
- void lookup_pool(std::string name, std::unique_ptr<LookupPoolComp> c);
- void list_pools(std::unique_ptr<LSPoolsComp> c);
- void create_pool_snap(int64_t pool, std::string snap_name,
- std::unique_ptr<SimpleOpComp> c);
- void allocate_selfmanaged_snap(int64_t pool, std::unique_ptr<SMSnapComp> c);
- void delete_pool_snap(int64_t pool, std::string snap_name,
- std::unique_ptr<SimpleOpComp> c);
- void delete_selfmanaged_snap(int64_t pool, std::uint64_t snap,
- std::unique_ptr<SimpleOpComp> c);
- void create_pool(std::string name, std::optional<int> crush_rule,
- std::unique_ptr<SimpleOpComp> c);
- void delete_pool(std::string name,
- std::unique_ptr<SimpleOpComp> c);
- void delete_pool(int64_t pool,
- std::unique_ptr<SimpleOpComp> c);
- void stat_pools(std::vector<std::string> pools,
- std::unique_ptr<PoolStatComp> c);
- void stat_fs(std::optional<std::int64_t> pool,
- std::unique_ptr<StatFSComp> c);
-
- void watch(Object o, IOContext ioc,
- std::optional<std::chrono::seconds> timeout,
- WatchCB cb, std::unique_ptr<WatchComp> c);
+ static void make_with_cct_(CephContext* cct,
+ boost::asio::io_context& ioctx,
+ BuildComp c);
+
+ void execute_(Object o, IOContext ioc, ReadOp op,
+ ceph::buffer::list* bl, Op::Completion c,
+ uint64_t* objver, const blkin_trace_info* trace_info);
+
+ void execute_(Object o, IOContext ioc, WriteOp op,
+ Op::Completion c, uint64_t* objver,
+ const blkin_trace_info* trace_info);
+
+
+ void lookup_pool_(std::string name, LookupPoolComp c);
+ void list_pools_(LSPoolsComp c);
+ void create_pool_snap_(int64_t pool, std::string snap_name,
+ SimpleOpComp c);
+ void allocate_selfmanaged_snap_(int64_t pool, SMSnapComp c);
+ void delete_pool_snap_(int64_t pool, std::string snap_name,
+ SimpleOpComp c);
+ void delete_selfmanaged_snap_(int64_t pool, std::uint64_t snap,
+ SimpleOpComp c);
+ void create_pool_(std::string name, std::optional<int> crush_rule,
+ SimpleOpComp c);
+ void delete_pool_(std::string name,
+ SimpleOpComp c);
+ void delete_pool_(int64_t pool,
+ SimpleOpComp c);
+ void stat_pools_(std::vector<std::string> pools,
+ PoolStatComp c);
+ void stat_fs_(std::optional<std::int64_t> pool,
+ StatFSComp c);
+
+ void watch_(Object o, IOContext ioc,
+ std::optional<std::chrono::seconds> timeout,
+ WatchCB cb, WatchComp c);
tl::expected<ceph::timespan, boost::system::error_code>
- watch_check(uint64_t cookie);
- void notify_ack(Object o, IOContext _ioc,
- uint64_t notify_id,
- uint64_t cookie,
- ceph::buffer::list bl,
- std::unique_ptr<SimpleOpComp>);
- void unwatch(uint64_t cookie, IOContext ioc,
- std::unique_ptr<SimpleOpComp>);
- void notify(Object oid, IOContext ioctx,
- ceph::buffer::list bl,
- std::optional<std::chrono::milliseconds> timeout,
- std::unique_ptr<NotifyComp> c);
- void flush_watch(std::unique_ptr<VoidOpComp>);
-
- void enumerate_objects(IOContext ioc, Cursor begin,
- Cursor end, std::uint32_t max,
- ceph::buffer::list filter,
- std::vector<Entry>* ls,
- Cursor* cursor,
- std::unique_ptr<SimpleOpComp> c);
- void enumerate_objects(IOContext ioc, Cursor begin,
- Cursor end, std::uint32_t max,
- ceph::buffer::list filter,
- std::unique_ptr<EnumerateComp> c);
- void osd_command(int osd, std::vector<std::string> cmd,
- ceph::buffer::list in, std::unique_ptr<CommandComp> c);
- void pg_command(PG pg, std::vector<std::string> cmd,
- ceph::buffer::list in, std::unique_ptr<CommandComp> c);
-
- void mon_command(std::vector<std::string> command,
+ watch_check_(uint64_t cookie);
+ void notify_ack_(Object o, IOContext _ioc,
+ uint64_t notify_id,
+ uint64_t cookie,
ceph::buffer::list bl,
- std::string* outs, ceph::buffer::list* outbl,
- std::unique_ptr<SimpleOpComp> c);
-
- void enable_application(std::string pool, std::string app_name,
- bool force, std::unique_ptr<SimpleOpComp> c);
-
- void blocklist_add(std::string client_address,
- std::optional<std::chrono::seconds> expire,
- std::unique_ptr<SimpleOpComp> c);
-
- void wait_for_latest_osd_map(std::unique_ptr<SimpleOpComp> c);
+ SimpleOpComp);
+ void unwatch_(uint64_t cookie, IOContext ioc,
+ SimpleOpComp);
+ void notify_(Object oid, IOContext ioctx,
+ ceph::buffer::list bl,
+ std::optional<std::chrono::milliseconds> timeout,
+ NotifyComp c);
+ void flush_watch_(VoidOpComp);
+
+ void enumerate_objects_(IOContext ioc, Cursor begin,
+ Cursor end, std::uint32_t max,
+ ceph::buffer::list filter,
+ std::vector<Entry>* ls,
+ Cursor* cursor,
+ SimpleOpComp c);
+ void enumerate_objects_(IOContext ioc, Cursor begin,
+ Cursor end, std::uint32_t max,
+ ceph::buffer::list filter,
+ EnumerateComp c);
+ void osd_command_(int osd, std::vector<std::string> cmd,
+ ceph::buffer::list in, CommandComp c);
+ void pg_command_(PG pg, std::vector<std::string> cmd,
+ ceph::buffer::list in, CommandComp c);
+
+ void mon_command_(std::vector<std::string> command,
+ ceph::buffer::list bl,
+ std::string* outs, ceph::buffer::list* outbl,
+ SimpleOpComp c);
+
+ void enable_application_(std::string pool, std::string app_name,
+ bool force, SimpleOpComp c);
+
+ void blocklist_add_(std::string client_address,
+ std::optional<std::chrono::seconds> expire,
+ SimpleOpComp c);
+
+ void wait_for_latest_osd_map_(SimpleOpComp c);
// Proxy object to provide access to low-level RADOS messaging clients
std::unique_ptr<detail::Client> impl;
extra_op_flags);
C_SaferCond notify_finish_cond;
+ auto e = boost::asio::prefer(
+ objecter->service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
linger_op->on_notify_finish =
- Objecter::LingerOp::OpComp::create(
- objecter->service.get_executor(),
+ boost::asio::bind_executor(
+ std::move(e),
                     CB_notify_Finish(client->cct, &notify_finish_cond,
objecter, linger_op, preply_bl,
preply_buf, preply_buf_len));
c->io = this;
C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
+ auto e = boost::asio::prefer(
+ objecter->service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
linger_op->on_notify_finish =
- Objecter::LingerOp::OpComp::create(
- objecter->service.get_executor(),
+ boost::asio::bind_executor(
+ std::move(e),
CB_notify_Finish(client->cct, oncomplete,
objecter, linger_op,
preply_bl, preply_buf,
// latest map. One day if COMPACT_VERSION of MClientRequest >=3,
// we can remove those code.
mdr->waited_for_osdmap = true;
- mds->objecter->wait_for_latest_osdmap(std::ref(*new C_IO_Wrapper(
- mds, new C_MDS_RetryRequest(mdcache, mdr))));
+ mds->objecter->wait_for_latest_osdmap(
+ [c = new C_IO_Wrapper(mds, new C_MDS_RetryRequest(mdcache, mdr))]
+ (boost::system::error_code ec) {
+ c->complete(ceph::from_error_code(ec));
+ });
return r;
}
}
using namespace std::literals;
+namespace asio = boost::asio;
namespace bc = boost::container;
namespace bs = boost::system;
-namespace ca = ceph::async;
namespace cb = ceph::buffer;
namespace neorados {
return *this;
}
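+// Completion invocation pattern used below (sketch; `ex` stands for the
+// relevant executor): boost::asio::append packs the completion arguments
+// onto the type-erased handler, so
+//
+//   asio::post(ex, asio::append(std::move(c), ec, RADOS{nullptr}));
+//
+// posts a call of c(ec, RADOS{nullptr}) to ex, replacing the old
+// ceph::async::Completion form c->post(std::move(c), ec, RADOS{nullptr}).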
-void RADOS::Builder::build(boost::asio::io_context& ioctx,
- std::unique_ptr<BuildComp> c) {
+void RADOS::Builder::build_(asio::io_context& ioctx,
+ BuildComp c) {
constexpr auto env = CODE_ENVIRONMENT_LIBRARY;
CephInitParameters ci(env);
if (name)
auto r = cct->_conf.parse_config_files(conf_files ? conf_files->data() : nullptr,
&ss, flags);
if (r < 0)
- c->post(std::move(c), ceph::to_error_code(r), RADOS{nullptr});
+ asio::post(ioctx.get_executor(),
+ asio::append(std::move(c), ceph::to_error_code(r),
+ RADOS{nullptr}));
}
cct->_conf.parse_env(cct->get_module_type());
std::stringstream ss;
auto r = cct->_conf.set_val(n, v, &ss);
if (r < 0)
- c->post(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr});
+ asio::post(ioctx.get_executor(),
+ asio::append(std::move(c), ceph::to_error_code(-EINVAL),
+ RADOS{nullptr}));
}
if (!no_mon_conf) {
// TODO This function should return an error code.
auto err = mc_bootstrap.get_monmap_and_config();
if (err < 0)
- c->post(std::move(c), ceph::to_error_code(err), RADOS{nullptr});
+ asio::post(ioctx.get_executor(),
+ asio::append(std::move(c), ceph::to_error_code(err),
+ RADOS{nullptr}));
}
if (!cct->_log->is_started()) {
cct->_log->start();
RADOS::make_with_cct(cct, ioctx, std::move(c));
}
-void RADOS::make_with_cct(CephContext* cct,
- boost::asio::io_context& ioctx,
- std::unique_ptr<BuildComp> c) {
+void RADOS::make_with_cct_(CephContext* cct,
+ asio::io_context& ioctx,
+ BuildComp c) {
try {
auto r = new detail::NeoClient{std::make_unique<detail::RADOS>(ioctx, cct)};
r->objecter->wait_for_osd_map(
[c = std::move(c), r = std::unique_ptr<detail::Client>(r)]() mutable {
- c->dispatch(std::move(c), bs::error_code{},
- RADOS{std::move(r)});
+ asio::dispatch(asio::append(std::move(c), bs::error_code{},
+ RADOS{std::move(r)}));
});
} catch (const bs::system_error& err) {
- c->post(std::move(c), err.code(), RADOS{nullptr});
+ asio::post(ioctx.get_executor(),
+ asio::append(std::move(c), err.code(), RADOS{nullptr}));
}
}
return impl->ioctx.get_executor();
}
-boost::asio::io_context& RADOS::get_io_context() {
+asio::io_context& RADOS::get_io_context() {
return impl->ioctx;
}
-void RADOS::execute(Object o, IOContext _ioc, ReadOp _op,
- cb::list* bl,
- std::unique_ptr<ReadOp::Completion> c, version_t* objver,
- const blkin_trace_info *trace_info) {
+void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op,
+ cb::list* bl,
+ ReadOp::Completion c, version_t* objver,
+ const blkin_trace_info *trace_info) {
auto oid = reinterpret_cast<const object_t*>(&o.impl);
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
auto op = reinterpret_cast<OpImpl*>(&_op.impl);
trace.event("submitted");
}
-void RADOS::execute(Object o, IOContext _ioc, WriteOp _op,
- std::unique_ptr<WriteOp::Completion> c, version_t* objver,
- const blkin_trace_info *trace_info) {
+void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op,
+ WriteOp::Completion c, version_t* objver,
+ const blkin_trace_info *trace_info) {
auto oid = reinterpret_cast<const object_t*>(&o.impl);
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
auto op = reinterpret_cast<OpImpl*>(&_op.impl);
trace.event("submitted");
}
-void RADOS::lookup_pool(std::string name,
- std::unique_ptr<LookupPoolComp> c)
+void RADOS::lookup_pool_(std::string name,
+ LookupPoolComp c)
{
// I kind of want to make lookup_pg_pool return
// std::optional<int64_t> since it can only return one error code.
return osdmap.lookup_pg_pool_name(name);
});
if (ret < 0)
- ca::dispatch(std::move(c), osdc_errc::pool_dne,
- std::int64_t(0));
+ asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne,
+ std::int64_t(0)));
else
- ca::dispatch(std::move(c), bs::error_code{}, ret);
+ asio::dispatch(asio::append(std::move(c), bs::error_code{}, ret));
});
} else if (ret < 0) {
- ca::post(std::move(c), osdc_errc::pool_dne,
- std::int64_t(0));
+ asio::post(get_executor(),
+ asio::append(std::move(c), osdc_errc::pool_dne,
+ std::int64_t(0)));
} else {
- ca::post(std::move(c), bs::error_code{}, ret);
+ asio::post(get_executor(),
+ asio::append(std::move(c), bs::error_code{}, ret));
}
}
});
}
-void RADOS::list_pools(std::unique_ptr<LSPoolsComp> c) {
- ca::dispatch(std::move(c),
- impl->objecter->with_osdmap(
- [&](OSDMap& o) {
- std::vector<std::pair<std::int64_t, std::string>> v;
- for (auto p : o.get_pools())
- v.push_back(std::make_pair(p.first,
- o.get_pool_name(p.first)));
- return v;
- }));
+void RADOS::list_pools_(LSPoolsComp c) {
+ asio::dispatch(asio::append(std::move(c),
+ impl->objecter->with_osdmap(
+ [&](OSDMap& o) {
+ std::vector<std::pair<std::int64_t, std::string>> v;
+ for (auto p : o.get_pools())
+ v.push_back(std::make_pair(p.first,
+ o.get_pool_name(p.first)));
+ return v;
+ })));
}
-void RADOS::create_pool_snap(std::int64_t pool,
- std::string snap_name,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::create_pool_snap_(std::int64_t pool,
+ std::string snap_name,
+ SimpleOpComp c)
{
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
impl->objecter->create_pool_snap(
pool, snap_name,
- Objecter::PoolOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
- ca::dispatch(std::move(c), e);
+ asio::dispatch(asio::append(std::move(c), e));
}));
}
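+// The asio::prefer(..., outstanding_work.tracked) executor used above and in
+// the functions below keeps the io_context's work count up, so its run()
+// does not return before the Objecter invokes the bound callback. The
+// recurring shape, as a sketch (`objecter_call` is a placeholder):
+//
+//   auto e = asio::prefer(get_executor(),
+//                         asio::execution::outstanding_work.tracked);
+//   objecter_call(asio::bind_executor(std::move(e),
+//     [c = std::move(c)](bs::error_code ec, const bufferlist&) mutable {
+//       asio::dispatch(asio::append(std::move(c), ec));
+//     }));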
-void RADOS::allocate_selfmanaged_snap(int64_t pool,
- std::unique_ptr<SMSnapComp> c) {
+void RADOS::allocate_selfmanaged_snap_(int64_t pool,
+ SMSnapComp c) {
+ auto e = asio::prefer(
+ get_executor(),
+ asio::execution::outstanding_work.tracked);
+
impl->objecter->allocate_selfmanaged_snap(
pool,
- ca::Completion<void(bs::error_code, snapid_t)>::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c)](bs::error_code e, snapid_t snap) mutable {
- ca::dispatch(std::move(c), e, snap);
+ asio::dispatch(asio::append(std::move(c), e, snap));
}));
}
-void RADOS::delete_pool_snap(std::int64_t pool,
- std::string snap_name,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_pool_snap_(std::int64_t pool,
+ std::string snap_name,
+ SimpleOpComp c)
{
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
impl->objecter->delete_pool_snap(
pool, snap_name,
- Objecter::PoolOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
- ca::dispatch(std::move(c), e);
+ asio::dispatch(asio::append(std::move(c), e));
}));
}
-void RADOS::delete_selfmanaged_snap(std::int64_t pool,
- std::uint64_t snap,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_selfmanaged_snap_(std::int64_t pool,
+ std::uint64_t snap,
+ SimpleOpComp c)
{
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
impl->objecter->delete_selfmanaged_snap(
pool, snap,
- Objecter::PoolOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
- ca::dispatch(std::move(c), e);
+ asio::dispatch(asio::append(std::move(c), e));
}));
}
-void RADOS::create_pool(std::string name,
- std::optional<int> crush_rule,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::create_pool_(std::string name,
+ std::optional<int> crush_rule,
+ SimpleOpComp c)
{
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
+
impl->objecter->create_pool(
name,
- Objecter::PoolOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
- ca::dispatch(std::move(c), e);
+ asio::dispatch(asio::append(std::move(c), e));
}),
crush_rule.value_or(-1));
}
-void RADOS::delete_pool(std::string name,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_pool_(std::string name,
+ SimpleOpComp c)
{
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
impl->objecter->delete_pool(
name,
- Objecter::PoolOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
- ca::dispatch(std::move(c), e);
+ asio::dispatch(asio::append(std::move(c), e));
}));
}
-void RADOS::delete_pool(std::int64_t pool,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::delete_pool_(std::int64_t pool,
+ SimpleOpComp c)
{
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
impl->objecter->delete_pool(
pool,
- Objecter::PoolOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
- ca::dispatch(std::move(c), e);
+ asio::dispatch(asio::append(std::move(c), e));
}));
}
-void RADOS::stat_pools(std::vector<std::string> pools,
- std::unique_ptr<PoolStatComp> c) {
+void RADOS::stat_pools_(std::vector<std::string> pools,
+ PoolStatComp c) {
impl->objecter->get_pool_stats(
pools,
[c = std::move(c)]
pv.compressed_bytes_alloc = statfs.data_compressed_allocated;
}
- ca::dispatch(std::move(c), ec, std::move(result), per_pool);
+ asio::dispatch(asio::append(std::move(c), ec, std::move(result),
+ per_pool));
});
}
-void RADOS::stat_fs(std::optional<std::int64_t> _pool,
- std::unique_ptr<StatFSComp> c) {
+void RADOS::stat_fs_(std::optional<std::int64_t> _pool,
+ StatFSComp c) {
std::optional<int64_t> pool;
if (_pool)
    pool = *_pool;
pool,
[c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable {
FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects};
- c->dispatch(std::move(c), ec, std::move(fso));
+ asio::dispatch(asio::append(std::move(c), ec, std::move(fso)));
});
}
// --- Watch/Notify
-void RADOS::watch(Object o, IOContext _ioc,
- std::optional<std::chrono::seconds> timeout, WatchCB cb,
- std::unique_ptr<WatchComp> c) {
+void RADOS::watch_(Object o, IOContext _ioc,
+ std::optional<std::chrono::seconds> timeout, WatchCB cb,
+ WatchComp c) {
auto oid = reinterpret_cast<const object_t*>(&o.impl);
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
linger_op->handle = std::move(cb);
op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
bufferlist bl;
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
impl->objecter->linger_watch(
linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
- Objecter::LingerOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
- ca::dispatch(std::move(c), e, cookie);
+ asio::dispatch(asio::append(std::move(c), e, cookie));
}), nullptr);
}
-void RADOS::notify_ack(Object o,
- IOContext _ioc,
- uint64_t notify_id,
- uint64_t cookie,
- bufferlist bl,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::notify_ack_(Object o,
+ IOContext _ioc,
+ uint64_t notify_id,
+ uint64_t cookie,
+ bufferlist bl,
+ SimpleOpComp c)
{
auto oid = reinterpret_cast<const object_t*>(&o.impl);
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
nullptr, ioc->extra_op_flags, std::move(c));
}
-tl::expected<ceph::timespan, bs::error_code> RADOS::watch_check(uint64_t cookie)
+tl::expected<ceph::timespan, bs::error_code> RADOS::watch_check_(uint64_t cookie)
{
Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
return impl->objecter->linger_check(linger_op);
}
-void RADOS::unwatch(uint64_t cookie, IOContext _ioc,
- std::unique_ptr<SimpleOpComp> c)
+void RADOS::unwatch_(uint64_t cookie, IOContext _ioc,
+ SimpleOpComp c)
{
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
ObjectOperation op;
op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op),
ioc->snapc, ceph::real_clock::now(), ioc->extra_op_flags,
- Objecter::Op::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ std::move(e),
[objecter = impl->objecter,
linger_op, c = std::move(c)]
(bs::error_code ec) mutable {
objecter->linger_cancel(linger_op);
- ca::dispatch(std::move(c), ec);
+ asio::dispatch(asio::append(std::move(c), ec));
}));
}
-void RADOS::flush_watch(std::unique_ptr<VoidOpComp> c)
+void RADOS::flush_watch_(VoidOpComp c)
{
impl->objecter->linger_callback_flush([c = std::move(c)]() mutable {
- ca::post(std::move(c));
+ asio::dispatch(std::move(c));
});
}
struct NotifyHandler : std::enable_shared_from_this<NotifyHandler> {
- boost::asio::io_context& ioc;
- boost::asio::strand<boost::asio::io_context::executor_type> strand;
+ asio::io_context& ioc;
+ asio::strand<asio::io_context::executor_type> strand;
Objecter* objecter;
Objecter::LingerOp* op;
- std::unique_ptr<RADOS::NotifyComp> c;
+ RADOS::NotifyComp c;
bool acked = false;
bool finished = false;
bs::error_code res;
bufferlist rbl;
- NotifyHandler(boost::asio::io_context& ioc,
+ NotifyHandler(asio::io_context& ioc,
Objecter* objecter,
Objecter::LingerOp* op,
- std::unique_ptr<RADOS::NotifyComp> c)
- : ioc(ioc), strand(boost::asio::make_strand(ioc)),
+ RADOS::NotifyComp c)
+ : ioc(ioc), strand(asio::make_strand(ioc)),
objecter(objecter), op(op), c(std::move(c)) {}
// Use bind or a lambda to pass this in.
void handle_ack(bs::error_code ec,
bufferlist&&) {
- boost::asio::post(
+ asio::post(
strand,
[this, ec, p = shared_from_this()]() mutable {
acked = true;
void operator()(bs::error_code ec,
bufferlist&& bl) {
- boost::asio::post(
+ asio::post(
strand,
[this, ec, p = shared_from_this()]() mutable {
finished = true;
if ((acked && finished) || res) {
objecter->linger_cancel(op);
ceph_assert(c);
- ca::dispatch(std::move(c), res, std::move(rbl));
+ asio::dispatch(asio::append(std::move(c), res, std::move(rbl)));
}
}
};
-void RADOS::notify(Object o, IOContext _ioc, bufferlist bl,
- std::optional<std::chrono::milliseconds> timeout,
- std::unique_ptr<NotifyComp> c)
+void RADOS::notify_(Object o, IOContext _ioc, bufferlist bl,
+ std::optional<std::chrono::milliseconds> timeout,
+ NotifyComp c)
{
auto oid = reinterpret_cast<const object_t*>(&o.impl);
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter,
linger_op, std::move(c));
+ auto e = asio::prefer(get_executor(),
+ asio::execution::outstanding_work.tracked);
linger_op->on_notify_finish =
- Objecter::LingerOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ e,
[cb](bs::error_code ec, ceph::bufferlist bl) mutable {
(*cb)(ec, std::move(bl));
});
impl->objecter->linger_notify(
linger_op, rd, ioc->snap_seq, inbl,
- Objecter::LingerOp::OpComp::create(
- get_executor(),
+ asio::bind_executor(
+ e,
[cb](bs::error_code ec, ceph::bufferlist bl) mutable {
cb->handle_ack(ec, std::move(bl));
}), nullptr);
return e;
}
-void RADOS::enumerate_objects(IOContext _ioc,
- Cursor begin,
- Cursor end,
- const std::uint32_t max,
- bufferlist filter,
- std::unique_ptr<EnumerateComp> c) {
+void RADOS::enumerate_objects_(IOContext _ioc,
+ Cursor begin,
+ Cursor end,
+ const std::uint32_t max,
+ bufferlist filter,
+ EnumerateComp c) {
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
impl->objecter->enumerate_objects<Entry>(
[c = std::move(c)]
(bs::error_code ec, std::vector<Entry>&& v,
hobject_t&& n) mutable {
- ca::dispatch(std::move(c), ec, std::move(v),
- Cursor(static_cast<void*>(&n)));
+ asio::dispatch(asio::append(std::move(c), ec, std::move(v),
+ Cursor(static_cast<void*>(&n))));
});
}
-void RADOS::osd_command(int osd, std::vector<std::string> cmd,
- ceph::bufferlist in, std::unique_ptr<CommandComp> c) {
- impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr,
- [c = std::move(c)]
- (bs::error_code ec,
- std::string&& s,
- ceph::bufferlist&& b) mutable {
- ca::dispatch(std::move(c), ec,
- std::move(s),
- std::move(b));
- });
+void RADOS::osd_command_(int osd, std::vector<std::string> cmd,
+ ceph::bufferlist in, CommandComp c) {
+ impl->objecter->osd_command(
+ osd, std::move(cmd), std::move(in), nullptr,
+ [c = std::move(c)]
+ (bs::error_code ec, std::string&& s, ceph::bufferlist&& b) mutable {
+ asio::dispatch(asio::append(std::move(c), ec, std::move(s),
+ std::move(b)));
+ });
}
-void RADOS::pg_command(PG pg, std::vector<std::string> cmd,
- ceph::bufferlist in, std::unique_ptr<CommandComp> c) {
- impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr,
- [c = std::move(c)]
- (bs::error_code ec,
- std::string&& s,
- ceph::bufferlist&& b) mutable {
- ca::dispatch(std::move(c), ec,
- std::move(s),
- std::move(b));
- });
+void RADOS::pg_command_(PG pg, std::vector<std::string> cmd,
+ ceph::bufferlist in, CommandComp c) {
+ impl->objecter->pg_command(
+ pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr,
+ [c = std::move(c)]
+ (bs::error_code ec, std::string&& s,
+ ceph::bufferlist&& b) mutable {
+ asio::dispatch(asio::append(std::move(c), ec, std::move(s),
+ std::move(b)));
+ });
}
-void RADOS::enable_application(std::string pool, std::string app_name,
- bool force, std::unique_ptr<SimpleOpComp> c) {
+void RADOS::enable_application_(std::string pool, std::string app_name,
+ bool force, SimpleOpComp c) {
// pre-Luminous clusters will return -EINVAL and application won't be
// preserved until Luminous is configured as minimum version.
if (!impl->get_required_monitor_features().contains_all(
ceph::features::mon::FEATURE_LUMINOUS)) {
- ca::post(std::move(c), ceph::to_error_code(-EOPNOTSUPP));
+ asio::post(get_executor(),
+ asio::append(std::move(c), ceph::to_error_code(-EOPNOTSUPP)));
} else {
impl->monclient.start_mon_command(
{ fmt::format("{{ \"prefix\": \"osd pool application enable\","
force ? " ,\"yes_i_really_mean_it\": true" : "")},
{}, [c = std::move(c)](bs::error_code e,
std::string, cb::list) mutable {
- ca::post(std::move(c), e);
- });
+ asio::dispatch(asio::append(std::move(c), e));
+ });
}
}
-void RADOS::blocklist_add(std::string client_address,
- std::optional<std::chrono::seconds> expire,
- std::unique_ptr<SimpleOpComp> c) {
+void RADOS::blocklist_add_(std::string client_address,
+ std::optional<std::chrono::seconds> expire,
+ SimpleOpComp c) {
auto expire_arg = (expire ?
fmt::format(", \"expire\": \"{}.0\"", expire->count()) : std::string{});
impl->monclient.start_mon_command(
[this, client_address = std::move(client_address), expire_arg,
c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable {
if (ec != bs::errc::invalid_argument) {
- ca::post(std::move(c), ec);
+ asio::post(get_executor(),
+ asio::append(std::move(c), ec));
return;
}
client_address, expire_arg) },
{},
[c = std::move(c)](bs::error_code ec, std::string, cb::list) mutable {
- ca::post(std::move(c), ec);
+ asio::dispatch(asio::append(std::move(c), ec));
});
});
}
-void RADOS::wait_for_latest_osd_map(std::unique_ptr<SimpleOpComp> c) {
+void RADOS::wait_for_latest_osd_map_(SimpleOpComp c) {
impl->objecter->wait_for_latest_osdmap(std::move(c));
}
-void RADOS::mon_command(std::vector<std::string> command,
- cb::list bl,
- std::string* outs, cb::list* outbl,
- std::unique_ptr<SimpleOpComp> c) {
+void RADOS::mon_command_(std::vector<std::string> command,
+ cb::list bl,
+ std::string* outs, cb::list* outbl,
+ SimpleOpComp c) {
impl->monclient.start_mon_command(
command, bl,
*outs = std::move(s);
if (outbl)
*outbl = std::move(bl);
- ca::post(std::move(c), e);
+ asio::dispatch(asio::append(std::move(c), e));
});
}
namespace bs = boost::system;
namespace ca = ceph::async;
namespace cb = ceph::buffer;
+namespace asio = boost::asio;
#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
std::unique_lock wl(info->watch_lock);
ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
if (info->on_reg_commit) {
- info->on_reg_commit->defer(std::move(info->on_reg_commit),
- ec, cb::list{});
- info->on_reg_commit.reset();
+ asio::defer(service.get_executor(),
+ asio::append(std::move(info->on_reg_commit),
+ ec, cb::list{}));
}
if (ec && info->on_notify_finish) {
- info->on_notify_finish->defer(std::move(info->on_notify_finish),
- ec, cb::list{});
- info->on_notify_finish.reset();
+ asio::defer(service.get_executor(),
+ asio::append(std::move(info->on_notify_finish),
+ ec, cb::list{}));
}
// only tell the user the first time we do this
if (!info->last_error) {
ec = _normalize_watch_error(ec);
if (info->handle) {
- boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
+ asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
}
}
}
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
- CB_Linger_Ping(this, info, now),
+ fu2::unique_function<Op::OpSig>{CB_Linger_Ping(this, info, now)},
nullptr, nullptr);
o->target = info->target;
o->should_resend = false;
ec = _normalize_watch_error(ec);
info->last_error = ec;
if (info->handle) {
- boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
+ asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
}
}
} else {
if (!info->last_error) {
info->last_error = bs::error_code(ENOTCONN, osd_category());
if (info->handle) {
- boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
+ asio::defer(finish_strand, CB_DoWatchError(this, info,
info->last_error));
}
}
ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
<< " != " << info->notify_id << ", ignoring" << dendl;
} else if (info->on_notify_finish) {
- info->on_notify_finish->defer(
- std::move(info->on_notify_finish),
- osdcode(m->return_code), std::move(m->get_data()));
-
+ asio::defer(service.get_executor(),
+ asio::append(std::move(info->on_notify_finish),
+ osdcode(m->return_code),
+ std::move(m->get_data())));
// if we race with reconnect we might get a second notify; only
// notify the caller once!
info->on_notify_finish = nullptr;
}
} else {
- boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
+ asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
}
}
p->first <= osdmap->get_epoch()) {
//go through the list and call the onfinish methods
for (auto& [c, ec] : p->second) {
- ca::post(std::move(c), ec);
+ asio::post(service.get_executor(), asio::append(std::move(c), ec));
}
waiting_for_map.erase(p++);
}
<< " dne" << dendl;
if (op->has_completion()) {
num_in_flight--;
- op->complete(osdc_errc::pool_dne, -ENOENT);
+ op->complete(osdc_errc::pool_dne, -ENOENT, service.get_executor());
}
OSDSession *s = op->session;
<< " has eio" << dendl;
if (op->has_completion()) {
num_in_flight--;
- op->complete(osdc_errc::pool_eio, -EIO);
+ op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
}
OSDSession *s = op->session;
if (osdmap->get_epoch() >= op->map_dne_bound) {
std::unique_lock wl{op->watch_lock};
if (op->on_reg_commit) {
- op->on_reg_commit->defer(std::move(op->on_reg_commit),
- osdc_errc::pool_dne, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->on_reg_commit),
+ osdc_errc::pool_dne, cb::list{}));
op->on_reg_commit = nullptr;
}
if (op->on_notify_finish) {
- op->on_notify_finish->defer(std::move(op->on_notify_finish),
- osdc_errc::pool_dne, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->on_notify_finish),
+ osdc_errc::pool_dne, cb::list{}));
op->on_notify_finish = nullptr;
}
*need_unregister = true;
std::unique_lock wl{op->watch_lock};
if (op->on_reg_commit) {
- op->on_reg_commit->defer(std::move(op->on_reg_commit),
- osdc_errc::pool_dne, cb::list{});
- op->on_reg_commit = nullptr;
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->on_reg_commit),
+ osdc_errc::pool_dne, cb::list{}));
}
if (op->on_notify_finish) {
- op->on_notify_finish->defer(std::move(op->on_notify_finish),
- osdc_errc::pool_dne, cb::list{});
- op->on_notify_finish = nullptr;
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->on_notify_finish),
+ osdc_errc::pool_dne, cb::list{}));
}
}
}
ca::waiter<bs::error_code> w;
- waiting_for_map[e].emplace_back(OpCompletion::create(
+ auto ex = boost::asio::prefer(
+ service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
+ waiting_for_map[e].emplace_back(asio::bind_executor(
- service.get_executor(),
+ ex,
w.ref()),
bs::error_code{});
}
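The `prefer(..., execution::outstanding_work.tracked)` plus `bind_executor` pairing used here is what keeps the `io_context` from running out of work while a handler sits in `waiting_for_map`. A reduced sketch of the idea, assuming a hypothetical `track` helper:

```cpp
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/execution.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/prefer.hpp>
#include <utility>

namespace asio = boost::asio;

// Attach work tracking to a handler that will be parked in a container,
// so io_context::run() does not return before the handler is delivered.
template<typename Handler>
auto track(asio::io_context& ctx, Handler&& h) {
  auto ex = asio::prefer(ctx.get_executor(),
                         asio::execution::outstanding_work.tracked);
  return asio::bind_executor(ex, std::forward<Handler>(h));
}
```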
void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
- std::unique_ptr<OpCompletion> fin,
+ OpCompletion fin,
std::unique_lock<ceph::shared_mutex>&& l)
{
ceph_assert(fin);
if (osdmap->get_epoch() >= newest) {
ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
l.unlock();
- ca::defer(std::move(fin), bs::error_code{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(fin), bs::error_code{}));
} else {
ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
_wait_for_new_map(std::move(fin), newest, bs::error_code{});
}
}
-void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch,
+void Objecter::_wait_for_new_map(OpCompletion c, epoch_t epoch,
bs::error_code ec)
{
// rwlock is locked unique
break;
case RECALC_OP_TARGET_POOL_EIO:
if (op->has_completion()) {
- op->complete(osdc_errc::pool_eio, -EIO);
+ op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
}
return;
}
Op *op = p->second;
if (op->has_completion()) {
num_in_flight--;
- op->complete(osdcode(r), r);
+ op->complete(osdcode(r), r, service.get_executor());
}
_op_cancel_map_check(op);
_finish_op(op, r);
// do callbacks
if (Op::has_completion(onfinish)) {
if (rc == 0 && handler_error) {
- Op::complete(std::move(onfinish), handler_error, -EIO);
+ Op::complete(std::move(onfinish), handler_error, -EIO, service.get_executor());
} else {
- Op::complete(std::move(onfinish), osdcode(rc), rc);
+ Op::complete(std::move(onfinish), osdcode(rc), rc, service.get_executor());
}
}
if (completion_lock.mutex()) {
const pg_pool_t *p = osdmap->get_pg_pool(pool);
if (!p) {
- onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(onfinish), osdc_errc::pool_dne, cb::list{}));
return;
}
if (p->snap_exists(snap_name)) {
- onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists,
- cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(onfinish), osdc_errc::snapshot_exists,
+ cb::list{}));
return;
}
}
struct CB_SelfmanagedSnap {
- std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin;
+ asio::any_completion_handler<void(bs::error_code, snapid_t)> fin;
CB_SelfmanagedSnap(decltype(fin)&& fin)
: fin(std::move(fin)) {}
void operator()(bs::error_code ec, const cb::list& bl) {
ec = e.code();
}
}
- fin->defer(std::move(fin), ec, snapid);
+ asio::dispatch(asio::append(std::move(fin), ec, snapid));
}
};
void Objecter::allocate_selfmanaged_snap(
int64_t pool,
- std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish)
+ asio::any_completion_handler<void(bs::error_code, snapid_t)> onfinish)
{
unique_lock wl(rwlock);
ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
auto op = new PoolOp;
op->tid = ++last_tid;
op->pool = pool;
- op->onfinish = PoolOp::OpComp::create(
+ auto e = boost::asio::prefer(
service.get_executor(),
- CB_SelfmanagedSnap(std::move(onfinish)));
+ boost::asio::execution::outstanding_work.tracked);
+ op->onfinish = asio::bind_executor(e, CB_SelfmanagedSnap(std::move(onfinish)));
op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
pool_ops[op->tid] = op;
const pg_pool_t *p = osdmap->get_pg_pool(pool);
if (!p) {
- onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(onfinish), osdc_errc::pool_dne,
+ cb::list{}));
return;
}
if (!p->snap_exists(snap_name)) {
- onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{}));
return;
}
ldout(cct, 10) << "create_pool name=" << name << dendl;
if (osdmap->lookup_pg_pool_name(name) >= 0) {
- onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(onfinish), osdc_errc::pool_exists,
+ cb::list{}));
return;
}
ldout(cct, 10) << "delete_pool " << pool << dendl;
if (!osdmap->have_pg_pool(pool))
- onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(onfinish), osdc_errc::pool_dne,
+ cb::list{}));
else
_do_delete_pool(pool, std::move(onfinish));
}
int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
if (pool < 0)
// This only returns one error: -ENOENT.
- onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(onfinish), osdc_errc::pool_dne,
+ cb::list{}));
else
_do_delete_pool(pool, std::move(onfinish));
}
if (osdmap->get_epoch() < m->epoch) {
ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
<< " before calling back" << dendl;
- _wait_for_new_map(OpCompletion::create(
- service.get_executor(),
+ auto e = boost::asio::prefer(
+ service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
+ _wait_for_new_map(asio::bind_executor(
+ e,
[o = std::move(op->onfinish),
- bl = std::move(bl)](
+ bl = std::move(bl),
+ e = service.get_executor()](
bs::error_code ec) mutable {
- o->defer(std::move(o), ec, bl);
+ asio::defer(e, asio::append(std::move(o), ec, bl));
}),
m->epoch,
ec);
// sneaked in. Do caller-specified callback now or else
// we lose it forever.
ceph_assert(op->onfinish);
- op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->onfinish), ec, std::move(bl)));
}
} else {
ceph_assert(op->onfinish);
- op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl));
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->onfinish), ec, std::move(bl)));
}
op->onfinish = nullptr;
if (!sul.owns_lock()) {
PoolOp *op = it->second;
if (op->onfinish)
- op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{});
+ asio::defer(service.get_executor(), asio::append(std::move(op->onfinish),
+ osdcode(r), cb::list{}));
_finish_pool_op(op, r);
return 0;
// pool stats
-void Objecter::get_pool_stats(
+void Objecter::get_pool_stats_(
const std::vector<std::string>& pools,
decltype(PoolStatOp::onfinish)&& onfinish)
{
if (m->version > last_seen_pgmap_version) {
last_seen_pgmap_version = m->version;
}
- op->onfinish->defer(std::move(op->onfinish), bs::error_code{},
- std::move(m->pool_stats), m->per_pool);
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->onfinish), bs::error_code{},
+ std::move(m->pool_stats), m->per_pool));
_finish_pool_stat_op(op, 0);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
auto op = it->second;
if (op->onfinish)
- op->onfinish->defer(std::move(op->onfinish), osdcode(r),
- bc::flat_map<std::string, pool_stat_t>{}, false);
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->onfinish), osdcode(r),
+ bc::flat_map<std::string, pool_stat_t>{}, false));
_finish_pool_stat_op(op, r);
return 0;
}
delete op;
}
-void Objecter::get_fs_stats(std::optional<int64_t> poolid,
- decltype(StatfsOp::onfinish)&& onfinish)
+void Objecter::get_fs_stats_(std::optional<int64_t> poolid,
+ decltype(StatfsOp::onfinish)&& onfinish)
{
ldout(cct, 10) << "get_fs_stats" << dendl;
unique_lock l(rwlock);
ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
if (m->h.version > last_seen_pgmap_version)
last_seen_pgmap_version = m->h.version;
- op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st);
+ asio::defer(service.get_executor(), asio::append(std::move(op->onfinish),
+ bs::error_code{}, m->h.st));
_finish_statfs_op(op, 0);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
auto op = it->second;
if (op->onfinish)
- op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{});
+ asio::defer(service.get_executor(),
+ asio::append(std::move(op->onfinish),
+ osdcode(r), ceph_statfs{}));
_finish_statfs_op(op, r);
return 0;
}
<< rs << dendl;
if (c->onfinish)
- c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl));
+ asio::defer(service.get_executor(),
+ asio::append(std::move(c->onfinish), ec, std::move(rs),
+ std::move(bl)));
if (c->ontimeout && ec != bs::errc::timed_out)
timer.cancel_event(c->ontimeout);
Objecter::Objecter(CephContext *cct,
Messenger *m, MonClient *mc,
- boost::asio::io_context& service) :
+ asio::io_context& service) :
Dispatcher(cct), messenger(m), monc(mc), service(service)
{
mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
auto pbl = &on_ack->bl;
// Issue. See you later in _enumerate_reply
+ auto e = boost::asio::prefer(
+ service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
pg_read(start.get_hash(),
c->oloc, op, pbl, 0,
- Op::OpComp::create(service.get_executor(),
+ asio::bind_executor(e,
[c = std::move(on_ack)]
(bs::error_code ec) mutable {
(*c)(ec);
#include "include/function2.hpp"
#include "include/neorados/RADOS_Decodable.hpp"
-#include "common/admin_socket.h"
#include "common/async/completion.h"
+#include "common/admin_socket.h"
#include "common/ceph_time.h"
#include "common/ceph_mutex.h"
#include "common/ceph_timer.h"
using MOSDOp = _mosdop::MOSDOp<osdc_opvec>;
public:
using OpSignature = void(boost::system::error_code);
- using OpCompletion = ceph::async::Completion<OpSignature>;
+ using OpCompletion = boost::asio::any_completion_handler<OpSignature>;
// config observer bits
const char** get_tracked_conf_keys() const override;
void dump(ceph::Formatter *f) const;
};
- std::unique_ptr<ceph::async::Completion<void(boost::system::error_code)>>
+ boost::asio::any_completion_handler<void(boost::system::error_code)>
OpContextVert(Context* c) {
- if (c)
- return ceph::async::Completion<void(boost::system::error_code)>::create(
+ if (c) {
+ auto e = boost::asio::prefer(
service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
+
+ return boost::asio::bind_executor(
+ std::move(e),
[c = std::unique_ptr<Context>(c)]
(boost::system::error_code e) mutable {
c.release()->complete(e);
});
+ }
else
return nullptr;
}
template<typename T>
- std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
+ boost::asio::any_completion_handler<void(boost::system::error_code, T)>
OpContextVert(Context* c, T* p) {
- if (c || p)
+ if (c || p) {
+ auto e = boost::asio::prefer(
+ service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
return
- ceph::async::Completion<void(boost::system::error_code, T)>::create(
- service.get_executor(),
+ boost::asio::bind_executor(
+ e,
[c = std::unique_ptr<Context>(c), p]
(boost::system::error_code e, T r) mutable {
if (p)
*p = std::move(r);
if (c)
c.release()->complete(ceph::from_error_code(e));
- });
- else
+ });
+ } else {
return nullptr;
+ }
}
template<typename T>
- std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
+ boost::asio::any_completion_handler<void(boost::system::error_code, T)>
OpContextVert(Context* c, T& p) {
+ if (c) {
+ auto e = boost::asio::prefer(
+ service.get_executor(),
+ boost::asio::execution::outstanding_work.tracked);
+ return boost::asio::bind_executor(
+ e,
+ [c = std::unique_ptr<Context>(c), &p]
+ (boost::system::error_code e, T r) mutable {
+ p = std::move(r);
+ if (c)
+ c.release()->complete(ceph::from_error_code(e));
+ });
+ } else {
+ return nullptr;
+ }
+ }
+
+ boost::asio::any_completion_handler<void(boost::system::error_code)>
+ OpCompletionVert(std::unique_ptr<ceph::async::Completion<
+ void(boost::system::error_code)>> c) {
if (c)
- return ceph::async::Completion<
- void(boost::system::error_code, T)>::create(
- service.get_executor(),
- [c = std::unique_ptr<Context>(c), &p]
- (boost::system::error_code e, T r) mutable {
- p = std::move(r);
- if (c)
- c.release()->complete(ceph::from_error_code(e));
- });
+ return [c = std::move(c)](boost::system::error_code ec) mutable {
+ c->dispatch(std::move(c), ec);
+ };
else
return nullptr;
}
+ template<typename T>
+ boost::asio::any_completion_handler<void(boost::system::error_code, T)>
+ OpCompletionVert(std::unique_ptr<ceph::async::Completion<
+ void(boost::system::error_code, T)>> c) {
+ if (c) {
+ return [c = std::move(c)](boost::system::error_code ec, T t) mutable {
+ c->dispatch(std::move(c), ec, std::move(t));
+ };
+ } else {
+ return nullptr;
+ }
+ }
+
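These `OpCompletionVert` shims lean on the fact that `boost::asio::any_completion_handler<Sig>` is implicitly constructible from any move-only callable matching `Sig` (and from `nullptr`). A reduced sketch of the same bridging idea, assuming the legacy completion type follows the usual `dispatch(std::move(self), args...)` shape:

```cpp
#include <boost/asio/any_completion_handler.hpp>
#include <boost/system/error_code.hpp>
#include <memory>
#include <utility>

// Wrap a heap-allocated legacy completion in a lambda so it can be stored
// anywhere an any_completion_handler is expected. LegacyCompletion is a
// stand-in for ceph::async::Completion-style types.
template<typename LegacyCompletion>
boost::asio::any_completion_handler<void(boost::system::error_code)>
wrap_legacy(std::unique_ptr<LegacyCompletion> c) {
  if (!c)
    return nullptr;
  return [c = std::move(c)](boost::system::error_code ec) mutable {
    c->dispatch(std::move(c), ec);  // consumes the completion, as before
  };
}
```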
struct Op : public RefCountedObject {
OSDSession *session = nullptr;
int incarnation = 0;
int priority = 0;
using OpSig = void(boost::system::error_code);
- using OpComp = ceph::async::Completion<OpSig>;
+ using OpComp = boost::asio::any_completion_handler<OpSig>;
// Due to an irregularity of cmpxattr, we actually need the 'int'
// value for onfinish for legacy librados users. As such just
// preserve the Context* in this one case. That way we can have
//
// Add a function for the linger case, where we want better
// semantics than Context, but still need to be under the completion_lock.
- std::variant<std::unique_ptr<OpComp>, fu2::unique_function<OpSig>,
+ std::variant<OpComp, fu2::unique_function<OpSig>,
Context*> onfinish;
uint64_t ontimeout = 0;
}
static void complete(decltype(onfinish)&& f, boost::system::error_code ec,
- int r) {
- std::visit([ec, r](auto&& arg) {
+ int r, boost::asio::io_context::executor_type e) {
+ std::visit([ec, r, e](auto&& arg) {
if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
Context*>) {
arg->complete(r);
fu2::unique_function<OpSig>>) {
std::move(arg)(ec);
} else {
- arg->defer(std::move(arg), ec);
+ boost::asio::defer(e,
+ boost::asio::append(std::move(arg), ec));
}
}, std::move(f));
}
- void complete(boost::system::error_code ec, int r) {
- complete(std::move(onfinish), ec, r);
+ void complete(boost::system::error_code ec, int r,
+ boost::asio::io_context::executor_type e) {
+ complete(std::move(onfinish), ec, r, e);
}
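`complete` now takes the executor explicitly because only the type-erased handler alternative of the variant needs to be deferred; the other alternatives run inline. A stripped-down model of that dispatch (the `Finisher` alias and the `std::function` stand-ins for the legacy paths are illustrative, not the Objecter types):

```cpp
#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/append.hpp>
#include <boost/asio/defer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/system/error_code.hpp>
#include <functional>
#include <type_traits>
#include <utility>
#include <variant>

namespace asio = boost::asio;
namespace bs = boost::system;

using Handler = asio::any_completion_handler<void(bs::error_code)>;
using IntFn = std::function<void(int)>;             // models the Context* path
using EcFn = std::function<void(bs::error_code)>;   // models fu2::unique_function
using Finisher = std::variant<Handler, EcFn, IntFn>;

void complete(Finisher&& f, bs::error_code ec, int r,
              asio::io_context::executor_type ex) {
  std::visit([&](auto&& arg) {
    using T = std::decay_t<decltype(arg)>;
    if constexpr (std::is_same_v<T, IntFn>) {
      arg(r);                                        // legacy int-result callback
    } else if constexpr (std::is_same_v<T, EcFn>) {
      std::move(arg)(ec);                            // runs inline
    } else {
      asio::defer(ex, asio::append(std::move(arg), ec));  // asio handler
    }
  }, std::move(f));
}
```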
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
- int f, std::unique_ptr<OpComp>&& fin,
- version_t *ov, int *offset = nullptr,
+ int f, OpComp&& fin, version_t *ov, int *offset = nullptr,
ZTracer::Trace *parent_trace = nullptr) :
target(o, ol, f),
ops(std::move(_ops)),
using OpSig = void(boost::system::error_code,
boost::container::flat_map<std::string, pool_stat_t>,
bool);
- using OpComp = ceph::async::Completion<OpSig>;
- std::unique_ptr<OpComp> onfinish;
+ using OpComp = boost::asio::any_completion_handler<OpSig>;
+ OpComp onfinish;
std::uint64_t ontimeout;
ceph::coarse_mono_time last_submit;
};
std::optional<int64_t> data_pool;
using OpSig = void(boost::system::error_code,
const struct ceph_statfs);
- using OpComp = ceph::async::Completion<OpSig>;
+ using OpComp = boost::asio::any_completion_handler<OpSig>;
- std::unique_ptr<OpComp> onfinish;
+ OpComp onfinish;
uint64_t ontimeout;
ceph::coarse_mono_time last_submit;
int64_t pool = 0;
std::string name;
using OpSig = void(boost::system::error_code, ceph::buffer::list);
- using OpComp = ceph::async::Completion<OpSig>;
- std::unique_ptr<OpComp> onfinish;
+ using OpComp = boost::asio::any_completion_handler<OpSig>;
+ OpComp onfinish;
uint64_t ontimeout = 0;
int pool_op = 0;
int16_t crush_rule = 0;
using OpSig = void(boost::system::error_code, std::string,
ceph::buffer::list);
- using OpComp = ceph::async::Completion<OpSig>;
- std::unique_ptr<OpComp> onfinish;
+ using OpComp = boost::asio::any_completion_handler<OpSig>;
+ OpComp onfinish;
uint64_t ontimeout = 0;
ceph::coarse_mono_time last_submit;
bool registered{false};
bool canceled{false};
using OpSig = void(boost::system::error_code, ceph::buffer::list);
- using OpComp = ceph::async::Completion<OpSig>;
- std::unique_ptr<OpComp> on_reg_commit;
- std::unique_ptr<OpComp> on_notify_finish;
+ using OpComp = boost::asio::any_completion_handler<OpSig>;
+ OpComp on_reg_commit;
+ OpComp on_notify_finish;
uint64_t notify_id{0};
fu2::unique_function<void(boost::system::error_code,
std::map<ceph_tid_t, CommandOp*> check_latest_map_commands;
std::map<epoch_t,
- std::vector<std::pair<std::unique_ptr<OpCompletion>,
+ std::vector<std::pair<OpCompletion,
boost::system::error_code>>> waiting_for_map;
ceph::timespan mon_timeout;
public:
template<typename CT>
auto linger_callback_flush(CT&& ct) {
- boost::asio::async_completion<CT, void(void)> init(ct);
- boost::asio::defer(finish_strand, std::move(init.completion_handler));
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CT>(ct), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), void()>(
+ [this](auto handler) {
+ boost::asio::defer(finish_strand, std::move(handler));
+ }, consigned);
}
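`linger_callback_flush` above shows the general shape of the new initiation boilerplate: consign a work guard onto the caller's token so the `io_context` stays alive until completion, then let `async_initiate` hand a concrete handler to the initiation lambda. A generic sketch of that shape, using a hypothetical `async_flush` operation:

```cpp
#include <boost/asio/async_result.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/defer.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <utility>

namespace asio = boost::asio;

// Completes once work already queued on ctx has had a chance to run; the
// consigned work guard keeps ctx.run() from returning in the meantime.
template<typename CompletionToken>
auto async_flush(asio::io_context& ctx, CompletionToken&& token) {
  auto consigned = asio::consign(
      std::forward<CompletionToken>(token),
      asio::make_work_guard(ctx.get_executor()));
  return asio::async_initiate<decltype(consigned), void()>(
      [&ctx](auto handler) {
        asio::defer(ctx.get_executor(), std::move(handler));
      }, consigned);
}
```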
private:
template<typename CompletionToken>
auto wait_for_osd_map(CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, void()> init(token);
- std::unique_lock l(rwlock);
- if (osdmap->get_epoch()) {
- l.unlock();
- boost::asio::post(std::move(init.completion_handler));
- } else {
- waiting_for_map[0].emplace_back(
- OpCompletion::create(
- service.get_executor(),
- [c = std::move(init.completion_handler)]
- (boost::system::error_code) mutable {
- std::move(c)();
- }), boost::system::error_code{});
- l.unlock();
- }
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), void()>(
+ [this](auto handler) {
+ std::unique_lock l(rwlock);
+ if (osdmap->get_epoch()) {
+ l.unlock();
+ boost::asio::post(std::move(handler));
+ } else {
+ auto e = boost::asio::get_associated_executor(
+ handler, service.get_executor());
+ waiting_for_map[0].emplace_back(
+ boost::asio::bind_executor(
+ e, [c = std::move(handler)]
+ (boost::system::error_code) mutable {
+ boost::asio::dispatch(std::move(c));
+ }),
+ boost::system::error_code{});
+ l.unlock();
+ }
+ }, consigned);
}
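The `get_associated_executor(handler, fallback)` lookup used here preserves whatever executor the caller associated with its completion token, falling back to the Objecter's own, before the handler is parked. Reduced to a sketch (the `rebind` helper is illustrative):

```cpp
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <utility>

namespace asio = boost::asio;

// Look up the executor the caller associated with the handler (if any),
// fall back to ours, and re-bind so a later dispatch lands where expected.
template<typename Handler>
auto rebind(asio::io_context& ctx, Handler&& h) {
  auto ex = asio::get_associated_executor(h, ctx.get_executor());
  return asio::bind_executor(ex, std::forward<Handler>(h));
}
```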
struct CB_Objecter_GetVersion {
Objecter *objecter;
- std::unique_ptr<OpCompletion> fin;
+ OpCompletion fin;
- CB_Objecter_GetVersion(Objecter *o, std::unique_ptr<OpCompletion> c)
+ CB_Objecter_GetVersion(Objecter *o, OpCompletion c)
: objecter(o), fin(std::move(c)) {}
void operator()(boost::system::error_code ec, version_t newest,
version_t oldest) {
// try again as instructed
objecter->_wait_for_latest_osdmap(std::move(*this));
} else if (ec) {
- ceph::async::post(std::move(fin), ec);
+ boost::asio::post(objecter->service.get_executor(),
+ boost::asio::append(std::move(fin), ec));
} else {
auto l = std::unique_lock(objecter->rwlock);
objecter->_get_latest_version(oldest, newest, std::move(fin),
template<typename CompletionToken>
auto wait_for_map(epoch_t epoch, CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, OpSignature> init(token);
-
- if (osdmap->get_epoch() >= epoch) {
- boost::asio::post(service,
- ceph::async::bind_handler(
- std::move(init.completion_handler),
- boost::system::error_code()));
- } else {
- monc->get_version("osdmap",
- CB_Objecter_GetVersion(
- this,
- OpCompletion::create(service.get_executor(),
- std::move(init.completion_handler))));
- }
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), OpSignature>(
+ [epoch, this](auto handler) {
+ if (osdmap->get_epoch() >= epoch) {
+ boost::asio::post(boost::asio::append(
+ std::move(handler),
+ boost::system::error_code{}));
+ } else {
+ monc->get_version(
+ "osdmap",
+ CB_Objecter_GetVersion(this, std::move(handler)));
+ }
+ }, consigned);
}
-
- void _wait_for_new_map(std::unique_ptr<OpCompletion>, epoch_t epoch,
+ void _wait_for_new_map(OpCompletion, epoch_t epoch,
boost::system::error_code = {});
private:
template<typename CompletionToken>
auto wait_for_latest_osdmap(CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, OpSignature> init(token);
-
- monc->get_version("osdmap",
- CB_Objecter_GetVersion(
- this,
- OpCompletion::create(service.get_executor(),
- std::move(init.completion_handler))));
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), OpSignature>(
+ [this](auto handler) {
+ monc->get_version("osdmap",
+ CB_Objecter_GetVersion(
+ this,
+ std::move(handler)));
+ }, consigned);
}
- void wait_for_latest_osdmap(std::unique_ptr<OpCompletion> c) {
- monc->get_version("osdmap",
- CB_Objecter_GetVersion(this, std::move(c)));
+ void wait_for_latest_osdmap(
+ std::unique_ptr<ceph::async::Completion<OpSignature>> c) {
+ wait_for_latest_osdmap(
+ [c = std::move(c)](boost::system::error_code e) mutable {
+ c->dispatch(std::move(c), e);
+ });
}
template<typename CompletionToken>
auto get_latest_version(epoch_t oldest, epoch_t newest,
CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, OpSignature> init(token);
- {
- std::unique_lock wl(rwlock);
- _get_latest_version(oldest, newest,
- OpCompletion::create(
- service.get_executor(),
- std::move(init.completion_handler)),
- std::move(wl));
- }
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), OpSignature>(
+ [oldest, newest, this](auto handler) {
+ std::unique_lock wl(rwlock);
+ _get_latest_version(oldest, newest,
+ std::move(handler), std::move(wl));
+ }, consigned);
}
void _get_latest_version(epoch_t oldest, epoch_t newest,
- std::unique_ptr<OpCompletion> fin,
+ OpCompletion fin,
std::unique_lock<ceph::shared_mutex>&& ul);
/** Get the current set of global op flags */
epoch_t op_cancel_writes(int r, int64_t pool=-1);
// commands
- void osd_command(int osd, std::vector<std::string> cmd,
+ void osd_command_(int osd, std::vector<std::string> cmd,
ceph::buffer::list inbl, ceph_tid_t *ptid,
decltype(CommandOp::onfinish)&& onfinish) {
ceph_assert(osd >= 0);
auto osd_command(int osd, std::vector<std::string> cmd,
ceph::buffer::list inbl, ceph_tid_t *ptid,
CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken,
- CommandOp::OpSig> init(token);
- osd_command(osd, std::move(cmd), std::move(inbl), ptid,
- CommandOp::OpComp::create(service.get_executor(),
- std::move(init.completion_handler)));
- return init.result.get();
- }
-
- void pg_command(pg_t pgid, std::vector<std::string> cmd,
- ceph::buffer::list inbl, ceph_tid_t *ptid,
- decltype(CommandOp::onfinish)&& onfinish) {
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), CommandOp::OpSig>(
+ [osd, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this]
+ (auto handler) {
+ osd_command_(osd, std::move(cmd), std::move(inbl), ptid,
+ std::move(handler));
+ }, consigned);
+ }
+
+ void pg_command_(pg_t pgid, std::vector<std::string> cmd,
+ ceph::buffer::list inbl, ceph_tid_t *ptid,
+ decltype(CommandOp::onfinish)&& onfinish) {
auto *c = new CommandOp(
pgid,
std::move(cmd),
auto pg_command(pg_t pgid, std::vector<std::string> cmd,
ceph::buffer::list inbl, ceph_tid_t *ptid,
CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken,
- CommandOp::OpSig> init(token);
- pg_command(pgid, std::move(cmd), std::move(inbl), ptid,
- CommandOp::OpComp::create(service.get_executor(),
- std::move(init.completion_handler)));
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), CommandOp::OpSig>(
+ [pgid, cmd = std::move(cmd), inbl = std::move(inbl), ptid, this]
+ (auto handler) {
+ pg_command_(pgid, std::move(cmd), std::move(inbl), ptid,
+ std::move(handler));
+ }, consigned);
}
// mid-level helpers
void mutate(const object_t& oid, const object_locator_t& oloc,
ObjectOperation&& op, const SnapContext& snapc,
ceph::real_time mtime, int flags,
- std::unique_ptr<Op::OpComp>&& oncommit,
+ Op::OpComp oncommit,
version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
ZTracer::Trace *parent_trace = nullptr) {
Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
op_submit(o);
}
+ void mutate(const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation&& op, const SnapContext& snapc,
+ ceph::real_time mtime, int flags,
+ std::unique_ptr<ceph::async::Completion<Op::OpSig>> oncommit,
+ version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
+ ZTracer::Trace *parent_trace = nullptr) {
+ mutate(oid, oloc, std::move(op), snapc, mtime, flags,
+ [c = std::move(oncommit)](boost::system::error_code ec) mutable {
+ c->dispatch(std::move(c), ec);
+ }, objver, reqid, parent_trace);
+ }
+
Op *prepare_read_op(
const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
void read(const object_t& oid, const object_locator_t& oloc,
ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
- int flags, std::unique_ptr<Op::OpComp>&& onack,
+ int flags, Op::OpComp onack,
version_t *objver = nullptr, int *data_offset = nullptr,
uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
op_submit(o);
}
+ void read(const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
+ int flags, std::unique_ptr<ceph::async::Completion<Op::OpSig>> onack,
+ version_t *objver = nullptr, int *data_offset = nullptr,
+ uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
+ read(oid, oloc, std::move(op), snapid, pbl, flags,
+ [c = std::move(onack)](boost::system::error_code e) mutable {
+ c->dispatch(std::move(c), e);
+ }, objver, data_offset, features, parent_trace);
+ }
+
Op *prepare_pg_read_op(
uint32_t hash, object_locator_t oloc,
ceph_tid_t pg_read(
uint32_t hash, object_locator_t oloc,
ObjectOperation& op, ceph::buffer::list *pbl, int flags,
- std::unique_ptr<Op::OpComp>&& onack, epoch_t *reply_epoch, int *ctx_budget) {
+ Op::OpComp onack, epoch_t *reply_epoch, int *ctx_budget) {
ceph_tid_t tid;
Op *o = new Op(object_t(), oloc,
std::move(op.ops),
return linger_watch(info, op, snapc, mtime, inbl,
OpContextVert<ceph::buffer::list>(onfinish, nullptr), objver);
}
+ ceph_tid_t linger_watch(LingerOp *info,
+ ObjectOperation& op,
+ const SnapContext& snapc, ceph::real_time mtime,
+ ceph::buffer::list& inbl,
+ std::unique_ptr<ceph::async::Completion<
+ void(boost::system::error_code,
+ ceph::buffer::list)>> onfinish,
+ version_t *objver) {
+ return linger_watch(info, op, snapc, mtime, inbl,
+ OpCompletionVert<ceph::buffer::list>(
+ std::move(onfinish)), objver);
+ }
ceph_tid_t linger_notify(LingerOp *info,
ObjectOperation& op,
snapid_t snap, ceph::buffer::list& inbl,
OpContextVert(onack, poutbl),
objver);
}
+ ceph_tid_t linger_notify(LingerOp *info,
+ ObjectOperation& op,
+ snapid_t snap, ceph::buffer::list& inbl,
+ std::unique_ptr<ceph::async::Completion<
+ void(boost::system::error_code,
+ ceph::buffer::list)>> onack,
+ version_t *objver) {
+ return linger_notify(info, op, snap, inbl,
+ OpCompletionVert<ceph::buffer::list>(
+ std::move(onack)), objver);
+ }
tl::expected<ceph::timespan,
boost::system::error_code> linger_check(LingerOp *info);
void linger_cancel(LingerOp *info); // releases a reference
create_pool_snap(pool, snapName,
OpContextVert<ceph::buffer::list>(c, nullptr));
}
+ void create_pool_snap(
+ int64_t pool, std::string_view snapName,
+ std::unique_ptr<ceph::async::Completion<PoolOp::OpSig>> c) {
+ create_pool_snap(pool, snapName,
+ OpCompletionVert<ceph::buffer::list>(std::move(c)));
+ }
void allocate_selfmanaged_snap(int64_t pool,
- std::unique_ptr<ceph::async::Completion<
+ boost::asio::any_completion_handler<
void(boost::system::error_code,
- snapid_t)>> onfinish);
+ snapid_t)> onfinish);
void allocate_selfmanaged_snap(int64_t pool, snapid_t* psnapid,
Context* c) {
allocate_selfmanaged_snap(pool,
OpContextVert(c, psnapid));
}
+ void allocate_selfmanaged_snap(int64_t pool,
+ std::unique_ptr<ceph::async::Completion<void(
+ boost::system::error_code, snapid_t)>> c) {
+ allocate_selfmanaged_snap(pool,
+ OpCompletionVert<snapid_t>(std::move(c)));
+ }
void delete_pool_snap(int64_t pool, std::string_view snapName,
decltype(PoolOp::onfinish)&& onfinish);
void delete_pool_snap(int64_t pool, std::string_view snapName,
delete_pool_snap(pool, snapName,
OpContextVert<ceph::buffer::list>(c, nullptr));
}
+ void delete_pool_snap(int64_t pool, std::string_view snapName,
+ std::unique_ptr<ceph::async::Completion<void(
+ boost::system::error_code, ceph::buffer::list)>> c) {
+ delete_pool_snap(pool, snapName,
+ OpCompletionVert<ceph::buffer::list>(std::move(c)));
+ }
void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
decltype(PoolOp::onfinish)&& onfinish);
delete_selfmanaged_snap(pool, snap,
OpContextVert<ceph::buffer::list>(c, nullptr));
}
+ void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
+ std::unique_ptr<ceph::async::Completion<void(
+ boost::system::error_code, ceph::buffer::list)>> c) {
+ delete_selfmanaged_snap(pool, snap,
+ OpCompletionVert<ceph::buffer::list>(std::move(c)));
+ }
void create_pool(std::string_view name,
OpContextVert<ceph::buffer::list>(onfinish, nullptr),
crush_rule);
}
+ void create_pool(std::string_view name,
+ std::unique_ptr<ceph::async::Completion<void(
+ boost::system::error_code, ceph::buffer::list)>> c,
+ int crush_rule=-1) {
+ create_pool(name,
+ OpCompletionVert<ceph::buffer::list>(std::move(c)),
+ crush_rule);
+ }
void delete_pool(int64_t pool,
decltype(PoolOp::onfinish)&& onfinish);
void delete_pool(int64_t pool,
Context* onfinish) {
delete_pool(pool, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
}
+ void delete_pool(int64_t pool,
+ std::unique_ptr<ceph::async::Completion<void(
+ boost::system::error_code, ceph::buffer::list)>> c) {
+ delete_pool(pool, OpCompletionVert<ceph::buffer::list>(std::move(c)));
+ }
void delete_pool(std::string_view name,
decltype(PoolOp::onfinish)&& onfinish);
Context* onfinish) {
delete_pool(name, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
}
+ void delete_pool(std::string_view name,
+ std::unique_ptr<ceph::async::Completion<void(
+ boost::system::error_code, ceph::buffer::list)>> c) {
+ delete_pool(name, OpCompletionVert<ceph::buffer::list>(std::move(c)));
+ }
void handle_pool_op_reply(MPoolOpReply *m);
int pool_op_cancel(ceph_tid_t tid, int r);
void _poolstat_submit(PoolStatOp *op);
public:
void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
- void get_pool_stats(const std::vector<std::string>& pools,
- decltype(PoolStatOp::onfinish)&& onfinish);
+ void get_pool_stats_(const std::vector<std::string>& pools,
+ decltype(PoolStatOp::onfinish)&& onfinish);
template<typename CompletionToken>
- auto get_pool_stats(const std::vector<std::string>& pools,
+ auto get_pool_stats(std::vector<std::string> pools,
CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken,
- PoolStatOp::OpSig> init(token);
- get_pool_stats(pools,
- PoolStatOp::OpComp::create(
- service.get_executor(),
- std::move(init.completion_handler)));
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), PoolStatOp::OpSig>(
+ [pools = std::move(pools), this](auto handler) {
+ get_pool_stats_(pools, std::move(handler));
+ }, consigned);
}
int pool_stat_op_cancel(ceph_tid_t tid, int r);
void _finish_pool_stat_op(PoolStatOp *op, int r);
void _fs_stats_submit(StatfsOp *op);
public:
void handle_fs_stats_reply(MStatfsReply *m);
- void get_fs_stats(std::optional<int64_t> poolid,
- decltype(StatfsOp::onfinish)&& onfinish);
+ void get_fs_stats_(std::optional<int64_t> poolid,
+ decltype(StatfsOp::onfinish)&& onfinish);
template<typename CompletionToken>
auto get_fs_stats(std::optional<int64_t> poolid,
CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, StatfsOp::OpSig> init(token);
- get_fs_stats(poolid,
- StatfsOp::OpComp::create(service.get_executor(),
- std::move(init.completion_handler)));
- return init.result.get();
+ auto consigned = boost::asio::consign(
+ std::forward<CompletionToken>(token), boost::asio::make_work_guard(
+ service.get_executor()));
+ return boost::asio::async_initiate<decltype(consigned), StatfsOp::OpSig>(
+ [poolid, this](auto handler) {
+ get_fs_stats_(poolid, std::move(handler));
+ }, consigned);
}
void get_fs_stats(struct ceph_statfs& result, std::optional<int64_t> poolid,
Context *onfinish) {
- get_fs_stats(poolid, OpContextVert(onfinish, result));
+ get_fs_stats_(poolid, OpContextVert(onfinish, result));
+ }
+ void get_fs_stats(std::optional<int64_t> poolid,
+ std::unique_ptr<ceph::async::Completion<void(
+ boost::system::error_code, struct ceph_statfs)>> c) {
+ get_fs_stats_(poolid, OpCompletionVert<struct ceph_statfs>(std::move(c)));
}
int statfs_op_cancel(ceph_tid_t tid, int r);
void _finish_statfs_op(StatfsOp *op, int r);
#include <boost/system/system_error.hpp>
namespace bs = boost::system;
+namespace asio = boost::asio;
using namespace std::literals;
using namespace std::placeholders;
namespace {
struct CompletionPayload {
- std::unique_ptr<Op::Completion> c;
+ Op::Completion c;
};
void completion_callback_adapter(rados_completion_t c, void *arg) {
impl->release();
auto payload = reinterpret_cast<CompletionPayload*>(arg);
- payload->c->defer(std::move(payload->c),
- (r < 0) ? bs::error_code(-r, osd_category()) :
- bs::error_code());
+ asio::dispatch(asio::append(std::move(payload->c),
+ (r < 0) ? bs::error_code(-r, osd_category()) :
+ bs::error_code()));
delete payload;
}
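`completion_callback_adapter` follows the usual C-callback bridge: the type-erased handler rides in a heap payload through the `void*` argument, the integer result is converted to an `error_code`, and the handler is dispatched on its associated executor. A minimal sketch with hypothetical names (`Payload`, `on_c_callback`, and `generic_category()` standing in for the Ceph error categories):

```cpp
#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/append.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/system/error_code.hpp>
#include <memory>
#include <utility>

namespace asio = boost::asio;
namespace bs = boost::system;

using Handler = asio::any_completion_handler<void(bs::error_code)>;

// Heap payload smuggled through the C callback's void* argument.
struct Payload { Handler h; };

void on_c_callback(int r, void* arg) {
  auto p = std::unique_ptr<Payload>(static_cast<Payload*>(arg));
  asio::dispatch(asio::append(
      std::move(p->h),
      r < 0 ? bs::error_code(-r, bs::generic_category()) : bs::error_code()));
}
```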
librados::AioCompletionImpl* create_aio_completion(
- std::unique_ptr<Op::Completion>&& c) {
+ Op::Completion&& c) {
auto payload = new CompletionPayload{std::move(c)};
auto impl = new librados::AioCompletionImpl();
return impl->io_context.get_executor();
}
-void RADOS::execute(Object o, IOContext ioc, ReadOp op,
- ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
- uint64_t* objver, const blkin_trace_info* trace_info) {
+void RADOS::execute_(Object o, IOContext ioc, ReadOp op,
+ ceph::buffer::list* bl, Op::Completion c,
+ uint64_t* objver, const blkin_trace_info* trace_info) {
auto io_ctx = impl->get_io_ctx(ioc);
if (io_ctx == nullptr) {
- c->dispatch(std::move(c), osdc_errc::pool_dne);
+ asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));
return;
}
ceph_assert(r == 0);
}
-void RADOS::execute(Object o, IOContext ioc, WriteOp op,
- std::unique_ptr<Op::Completion> c, uint64_t* objver,
- const blkin_trace_info* trace_info) {
+void RADOS::execute_(Object o, IOContext ioc, WriteOp op,
+ Op::Completion c, uint64_t* objver,
+ const blkin_trace_info* trace_info) {
auto io_ctx = impl->get_io_ctx(ioc);
if (io_ctx == nullptr) {
- c->dispatch(std::move(c), osdc_errc::pool_dne);
+ asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));
return;
}
ceph_assert(r == 0);
}
-void RADOS::mon_command(std::vector<std::string> command,
- bufferlist bl,
- std::string* outs, bufferlist* outbl,
- std::unique_ptr<Op::Completion> c) {
+void RADOS::mon_command_(std::vector<std::string> command,
+ bufferlist bl,
+ std::string* outs, bufferlist* outbl,
+ Op::Completion c) {
auto r = impl->test_rados_client->mon_command(command, bl, outbl, outs);
- c->post(std::move(c),
- (r < 0 ? bs::error_code(-r, osd_category()) : bs::error_code()));
+ asio::post(get_executor(),
+ asio::append(std::move(c),
+ (r < 0 ? bs::error_code(-r, osd_category()) :
+ bs::error_code())));
}
-void RADOS::blocklist_add(std::string client_address,
- std::optional<std::chrono::seconds> expire,
- std::unique_ptr<SimpleOpComp> c) {
+void RADOS::blocklist_add_(std::string client_address,
+ std::optional<std::chrono::seconds> expire,
+ SimpleOpComp c) {
auto r = impl->test_rados_client->blocklist_add(
std::string(client_address), expire.value_or(0s).count());
- c->post(std::move(c),
- (r < 0 ? bs::error_code(-r, mon_category()) : bs::error_code()));
+ asio::post(get_executor(),
+ asio::append(std::move(c),
+ (r < 0 ? bs::error_code(-r, mon_category()) :
+ bs::error_code())));
}
-void RADOS::wait_for_latest_osd_map(std::unique_ptr<Op::Completion> c) {
+void RADOS::wait_for_latest_osd_map_(Op::Completion c) {
auto r = impl->test_rados_client->wait_for_latest_osd_map();
- c->dispatch(std::move(c),
- (r < 0 ? bs::error_code(-r, osd_category()) :
- bs::error_code()));
+ asio::dispatch(asio::append(std::move(c),
+ (r < 0 ? bs::error_code(-r, osd_category()) :
+ bs::error_code())));
}
} // namespace neorados