#ifndef CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
#define CEPH_COMMON_ASYNC_BLOCKED_COMPLETION_H
-#include <atomic>
#include <condition_variable>
#include <mutex>
#include <optional>
-#include <type_traits>
#include <boost/asio/async_result.hpp>
+#include <boost/asio/redirect_error.hpp>
#include <boost/system/error_code.hpp>
-#include <boost/system/system_error.hpp>
+
+#include <common/async/concepts.h>
namespace ceph::async {
namespace bs = boost::system;
class use_blocked_t {
- use_blocked_t(bs::error_code* ec) : ec(ec) {}
public:
use_blocked_t() = default;
- use_blocked_t operator [](bs::error_code& _ec) const {
- return use_blocked_t(&_ec);
+ auto operator [](bs::error_code& ec) const {
+ return boost::asio::redirect_error(use_blocked_t{}, ec);
}
-
- bs::error_code* ec = nullptr;
};
inline constexpr use_blocked_t use_blocked;
namespace detail {
-
-template<typename... Ts>
+// Obnoxiously repetitive, but it cuts down on the amount of
+// copying/moving/splicing/concatenating of tuples I need to do.
+template<typename ...Ts>
struct blocked_handler
{
- blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+ blocked_handler(std::optional<std::tuple<Ts...>>* vals, std::mutex* m,
+ std::condition_variable* cv, bool* done)
+ : vals(vals), m(m), cv(cv), done(done) {
+ }
- void operator ()(Ts... values) noexcept {
+ template<typename ...Args>
+ void operator ()(Args&& ...args) noexcept {
+ static_assert(sizeof...(Ts) == sizeof...(Args));
std::scoped_lock l(*m);
- *ec = bs::error_code{};
- *value = std::forward_as_tuple(std::move(values)...);
+ *vals = std::tuple<Ts...>(std::forward<Args>(args)...);
*done = true;
cv->notify_one();
}
- void operator ()(bs::error_code ec, Ts... values) noexcept {
+ //private:
+ std::optional<std::tuple<Ts...>>* vals;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<boost::asio::disposition D, typename ...Ts>
+struct blocked_handler<D, Ts...>
+{
+ blocked_handler(D* dispo, std::optional<std::tuple<Ts...>>* vals,
+ std::mutex* m, std::condition_variable* cv, bool* done)
+ : dispo(dispo), vals(vals), m(m), cv(cv), done(done) {
+ }
+
+ template<typename Arg0, typename... Args>
+ void operator ()(Arg0&& arg0, Args&& ...args) noexcept {
+ static_assert(sizeof...(Ts) == sizeof...(Args));
std::scoped_lock l(*m);
- *this->ec = ec;
- *value = std::forward_as_tuple(std::move(values)...);
+ *dispo = std::move(arg0);
+ *vals = std::tuple<Ts...>(std::forward<Args>(args)...);
*done = true;
cv->notify_one();
}
- bs::error_code* ec;
- std::optional<std::tuple<Ts...>>* value = nullptr;
+ //private:
+ D* dispo;
+ std::optional<std::tuple<Ts...>>* vals;
std::mutex* m = nullptr;
std::condition_variable* cv = nullptr;
bool* done = nullptr;
template<typename T>
struct blocked_handler<T>
{
- blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+ blocked_handler(std::optional<T>* val, std::mutex* m,
+ std::condition_variable* cv, bool* done)
+ : val(val), m(m), cv(cv), done(done) {}
- void operator ()(T value) noexcept {
+ template<typename Arg>
+ void operator ()(Arg&& arg) noexcept {
std::scoped_lock l(*m);
- *ec = bs::error_code();
- *this->value = std::move(value);
+ *val = std::forward<Arg>(arg);
*done = true;
cv->notify_one();
}
- void operator ()(bs::error_code ec, T value) noexcept {
+ //private:
+ std::optional<T>* val;
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<boost::asio::disposition D, typename T>
+struct blocked_handler<D, T>
+{
+ blocked_handler(D* dispo, std::optional<T>* val, std::mutex* m,
+ std::condition_variable* cv, bool* done)
+ : dispo(dispo), val(val), m(m), cv(cv), done(done) {}
+
+ template<typename Arg0, typename Arg>
+ void operator ()(Arg0&& arg0, Arg&& arg) noexcept {
std::scoped_lock l(*m);
- *this->ec = ec;
- *this->value = std::move(value);
+ *dispo = std::move(arg0);
+ *val = std::move(arg);
*done = true;
cv->notify_one();
}
//private:
- bs::error_code* ec;
- std::optional<T>* value;
+ D* dispo;
+ std::optional<T>* val;
std::mutex* m = nullptr;
std::condition_variable* cv = nullptr;
bool* done = nullptr;
template<>
struct blocked_handler<void>
{
- blocked_handler(use_blocked_t b) noexcept : ec(b.ec) {}
+ blocked_handler(std::mutex* m, std::condition_variable* cv, bool* done)
+ : m(m), cv(cv), done(done) {}
void operator ()() noexcept {
std::scoped_lock l(*m);
- *ec = bs::error_code{};
*done = true;
cv->notify_one();
}
- void operator ()(bs::error_code ec) noexcept {
+ std::mutex* m = nullptr;
+ std::condition_variable* cv = nullptr;
+ bool* done = nullptr;
+};
+
+template<boost::asio::disposition D>
+struct blocked_handler<D>
+{
+ blocked_handler(D* dispo, std::mutex* m,
+ std::condition_variable* cv, bool* done)
+ : dispo(dispo), m(m), cv(cv), done(done) {}
+
+ template<typename Arg0>
+ void operator ()(Arg0&& arg0) noexcept {
std::scoped_lock l(*m);
- *this->ec = ec;
+ *dispo = std::move(arg0);
*done = true;
cv->notify_one();
}
- bs::error_code* ec;
+ //private:
+ D* dispo;
std::mutex* m = nullptr;
std::condition_variable* cv = nullptr;
bool* done = nullptr;
using completion_handler_type = blocked_handler<Ts...>;
using return_type = std::tuple<Ts...>;
- explicit blocked_result(completion_handler_type& h) noexcept {
- std::scoped_lock l(m);
- out_ec = h.ec;
- if (!out_ec) h.ec = &ec;
- h.value = &value;
- h.m = &m;
- h.cv = &cv;
- h.done = &done;
- }
+ template<typename Initiation, typename... Args>
+ static return_type initiate(Initiation&& init,
+ use_blocked_t,
+ Args&& ...args) {
+ using ttype = std::tuple<Ts...>;
+ std::optional<ttype> vals;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+ static_assert(std::tuple_size_v<ttype> > 1);
+
+ std::move(init)(completion_handler_type(&vals, &m, &cv, &done),
+ std::forward<Args>(args)...);
- return_type get() {
std::unique_lock l(m);
- cv.wait(l, [this]() { return done; });
- if (!out_ec && ec) throw bs::system_error(ec);
- return std::move(*value);
+ cv.wait(l, [&done]() { return done; });
+ return std::move(*vals);
}
blocked_result(const blocked_result&) = delete;
blocked_result& operator =(const blocked_result&) = delete;
blocked_result(blocked_result&&) = delete;
blocked_result& operator =(blocked_result&&) = delete;
+};
-private:
- bs::error_code* out_ec;
- bs::error_code ec;
- std::optional<return_type> value;
- std::mutex m;
- std::condition_variable cv;
- bool done = false;
+template<boost::asio::disposition D, typename... Ts>
+class blocked_result<D, Ts...>
+{
+public:
+ using completion_handler_type = blocked_handler<D, Ts...>;
+ using return_type = std::tuple<Ts...>;
+
+ template<typename Initiation, typename... Args>
+ static return_type initiate(Initiation&& init,
+ use_blocked_t,
+ Args&& ...args) {
+ using ttype = std::tuple<Ts...>;
+ std::optional<ttype> vals;
+ D dispo;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+ static_assert(std::tuple_size_v<ttype> > 1);
+
+ std::move(init)(completion_handler_type(&dispo, &vals, &m, &cv, &done),
+ std::forward<Args>(args)...);
+
+ std::unique_lock l(m);
+ cv.wait(l, [&done]() { return done; });
+ if (dispo != boost::asio::no_error) {
+ boost::asio::disposition_traits<D>::throw_exception(dispo);
+ }
+ return std::move(*vals);
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
};
template<typename T>
using completion_handler_type = blocked_handler<T>;
using return_type = T;
- explicit blocked_result(completion_handler_type& h) noexcept {
- std::scoped_lock l(m);
- out_ec = h.ec;
- if (!out_ec) h.ec = &ec;
- h.value = &value;
- h.m = &m;
- h.cv = &cv;
- h.done = &done;
- }
+ template<typename Initiation, typename... Args>
+ static return_type initiate(Initiation&& init,
+ use_blocked_t,
+ Args&& ...args) {
+ std::optional<T> val;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+
+ std::move(init)(completion_handler_type(&val, &m, &cv, &done),
+ std::forward<Args>(args)...);
- return_type get() {
std::unique_lock l(m);
- cv.wait(l, [this]() { return done; });
- if (!out_ec && ec) throw bs::system_error(ec);
- return std::move(*value);
+ cv.wait(l, [&done]() { return done; });
+ return std::move(*val);
}
blocked_result(const blocked_result&) = delete;
blocked_result& operator =(const blocked_result&) = delete;
blocked_result(blocked_result&&) = delete;
blocked_result& operator =(blocked_result&&) = delete;
+};
+
+template<boost::asio::disposition D, typename T>
+class blocked_result<D, T>
+{
+public:
+ using completion_handler_type = blocked_handler<D, T>;
+ using return_type = T;
+
+ template<typename Initiation, typename... Args>
+ static return_type initiate(Initiation&& init,
+ use_blocked_t,
+ Args&& ...args) {
+ D dispo;
+ std::optional<T> val;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+
+ std::move(init)(completion_handler_type(&dispo, &val, &m, &cv, &done),
+ std::forward<Args>(args)...);
+
+ std::unique_lock l(m);
+ cv.wait(l, [&done]() { return done; });
+ if (dispo != boost::asio::no_error) {
+ boost::asio::disposition_traits<D>::throw_exception(dispo);
+ }
+ return std::move(*val);
+ }
-private:
- bs::error_code* out_ec;
- bs::error_code ec;
- std::optional<return_type> value;
- std::mutex m;
- std::condition_variable cv;
- bool done = false;
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
};
template<>
using completion_handler_type = blocked_handler<void>;
using return_type = void;
- explicit blocked_result(completion_handler_type& h) noexcept {
- std::scoped_lock l(m);
- out_ec = h.ec;
- if (!out_ec) h.ec = &ec;
- h.m = &m;
- h.cv = &cv;
- h.done = &done;
- }
+ template<typename Initiation, typename... Args>
+ static return_type initiate(Initiation&& init,
+ use_blocked_t,
+ Args&& ...args) {
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+
+ std::move(init)(completion_handler_type(&m, &cv, &done),
+ std::forward<Args>(args)...);
- void get() {
std::unique_lock l(m);
- cv.wait(l, [this]() { return done; });
- if (!out_ec && ec) throw bs::system_error(ec);
+ cv.wait(l, [&done]() { return done; });
+ return;
}
blocked_result(const blocked_result&) = delete;
blocked_result& operator =(const blocked_result&) = delete;
blocked_result(blocked_result&&) = delete;
blocked_result& operator =(blocked_result&&) = delete;
+};
+
-private:
- bs::error_code* out_ec;
- bs::error_code ec;
- std::mutex m;
- std::condition_variable cv;
- bool done = false;
+template<boost::asio::disposition D>
+class blocked_result<D>
+{
+public:
+ using completion_handler_type = blocked_handler<D>;
+ using return_type = void;
+
+ template<typename Initiation, typename... Args>
+ static return_type initiate(Initiation&& init,
+ use_blocked_t,
+ Args&& ...args) {
+ D dispo;
+ std::mutex m;
+ std::condition_variable cv;
+ bool done = false;
+
+ std::move(init)(completion_handler_type(&dispo, &m, &cv, &done),
+ std::forward<Args>(args)...);
+
+ std::unique_lock l(m);
+ cv.wait(l, [&done]() { return done; });
+ if (dispo != boost::asio::no_error) {
+ boost::asio::disposition_traits<D>::throw_exception(dispo);
+ }
+ return;
+ }
+
+ blocked_result(const blocked_result&) = delete;
+ blocked_result& operator =(const blocked_result&) = delete;
+ blocked_result(blocked_result&&) = delete;
+ blocked_result& operator =(blocked_result&&) = delete;
};
+
} // namespace detail
} // namespace ceph::async
namespace boost::asio {
-template<typename ReturnType>
-class async_result<ceph::async::use_blocked_t, ReturnType()>
+template<typename R>
+class async_result<ceph::async::use_blocked_t, R()>
: public ceph::async::detail::blocked_result<void>
{
-public:
- explicit async_result(typename ceph::async::detail::blocked_result<void>
- ::completion_handler_type& h)
- : ceph::async::detail::blocked_result<void>(h) {}
};
-template<typename ReturnType, typename... Args>
-class async_result<ceph::async::use_blocked_t, ReturnType(Args...)>
- : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+template<typename R, disposition D>
+class async_result<ceph::async::use_blocked_t, R(D)>
+ : public ceph::async::detail::blocked_result<D>
{
-public:
- explicit async_result(
- typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
- : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
};
-template<typename ReturnType>
-class async_result<ceph::async::use_blocked_t,
- ReturnType(boost::system::error_code)>
- : public ceph::async::detail::blocked_result<void>
+template<typename R, typename Arg>
+class async_result<ceph::async::use_blocked_t, R(Arg)>
+ : public ceph::async::detail::blocked_result<Arg>
{
-public:
- explicit async_result(
- typename ceph::async::detail::blocked_result<void>::completion_handler_type& h)
- : ceph::async::detail::blocked_result<void>(h) {}
};
-template<typename ReturnType, typename... Args>
-class async_result<ceph::async::use_blocked_t,
- ReturnType(boost::system::error_code, Args...)>
- : public ceph::async::detail::blocked_result<std::decay_t<Args>...>
+template<typename R, disposition D, typename Arg>
+class async_result<ceph::async::use_blocked_t, R(D, Arg)>
+ : public ceph::async::detail::blocked_result<D, Arg>
+{
+};
+
+template<typename R, typename... Args>
+class async_result<ceph::async::use_blocked_t, R(Args...)>
+ : public ceph::async::detail::blocked_result<Args...>
+{
+};
+
+template<typename R, disposition D, typename... Args>
+class async_result<ceph::async::use_blocked_t, R(D, Args...)>
+ : public ceph::async::detail::blocked_result<D, Args...>
{
-public:
- explicit async_result(
- typename ceph::async::detail::blocked_result<std::decay_t<Args>...>::completion_handler_type& h)
- : ceph::async::detail::blocked_result<std::decay_t<Args>...>(h) {}
};
}
#ifndef CEPH_ASYNC_FORWARD_HANDLER_H
#define CEPH_ASYNC_FORWARD_HANDLER_H
+#include <utility>
+
#include <boost/asio/associator.hpp>
namespace ceph::async {
#pragma GCC diagnostic pop
#pragma clang diagnostic pop
-inline int from_exception(std::exception_ptr eptr) {
+[[nodiscard]] inline int from_exception(std::exception_ptr eptr) {
if (!eptr) [[likely]] {
return 0;
}
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
+namespace asio = boost::asio;
namespace bs = boost::system;
using std::string;
using namespace std::literals;
monc_lock.lock();
stopping = true;
while (!version_requests.empty()) {
- ceph::async::post(std::move(version_requests.begin()->second),
- monc_errc::shutting_down, 0, 0);
+ asio::dispatch(
+ asio::append(std::move(version_requests.begin()->second),
+ make_error_code(monc_errc::shutting_down), 0, 0));
ldout(cct, 20) << __func__ << " canceling and discarding version request "
<< version_requests.begin()->first << dendl;
version_requests.erase(version_requests.begin());
ceph_assert(auth);
_check_auth_tickets();
} else if (auth_err == -EAGAIN && !active_con) {
- ldout(cct,10) << __func__
+ ldout(cct,10) << __func__
<< " auth returned EAGAIN, reopening the session to try again"
<< dendl;
_reopen_session();
// throw out version check requests
while (!version_requests.empty()) {
- ceph::async::post(std::move(version_requests.begin()->second),
- monc_errc::session_reset, 0, 0);
+ asio::dispatch(asio::append(std::move(version_requests.begin()->second),
+ make_error_code(monc_errc::session_reset),
+ 0, 0));
version_requests.erase(version_requests.begin());
}
if (r->is_tell()) {
++r->send_attempts;
if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
- _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
+ _finish_command(r, make_error_code(monc_errc::mon_unavailable),
+ "mon unavailable", {});
return;
}
// tell-style command
if (r->target_rank >= (int)monmap.size()) {
ldout(cct, 10) << " target " << r->target_rank
<< " >= max mon " << monmap.size() << dendl;
- _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
+ _finish_command(r, make_error_code(monc_errc::rank_dne),
+ "mon rank dne"sv, {});
return;
}
r->target_con = messenger->connect_to_mon(
if (!monmap.contains(r->target_name)) {
ldout(cct, 10) << " target " << r->target_name
<< " not present in monmap" << dendl;
- _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
+ _finish_command(r, make_error_code(monc_errc::mon_dne),
+ "mon dne"sv, {});
return;
}
r->target_con = messenger->connect_to_mon(
if (r->target_rank >= (int)monmap.size()) {
ldout(cct, 10) << " target " << r->target_rank
<< " >= max mon " << monmap.size() << dendl;
- _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
+ _finish_command(r, make_error_code(monc_errc::rank_dne),
+ "mon rank dne"sv, {});
return;
}
_reopen_session(r->target_rank);
if (!monmap.contains(r->target_name)) {
ldout(cct, 10) << " target " << r->target_name
<< " not present in monmap" << dendl;
- _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
+ _finish_command(r, make_error_code(monc_errc::mon_dne),
+ "mon dne"sv, {});
return;
}
_reopen_session(monmap.get_rank(r->target_name));
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
MonCommand *cmd = it->second;
- _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
+ _finish_command(cmd, make_error_code(monc_errc::timed_out),
+ "timed out"sv, {});
return 0;
}
{
ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
<< dendl;
- ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
- std::move(bl));
+ asio::post(service.get_executor(),
+ asio::append(std::move(r->onfinish), ret, std::string(rs),
+ std::move(bl)));
if (r->target_con) {
r->target_con->mark_down();
}
ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
<< m->version << dendl;
version_requests.erase(iter);
- ceph::async::post(std::move(req), bs::error_code(),
- m->version, m->oldest_version);
+ asio::post(service.get_executor(),
+ asio::append(std::move(req), bs::error_code(),
+ m->version, m->oldest_version));
}
m->put();
}
#include <string>
#include <vector>
+#include <boost/asio/append.hpp>
+#include <boost/asio/consign.hpp>
#include <boost/asio/io_context.hpp>
+#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
+#include <boost/asio/executor_work_guard.hpp>
+#include <boost/asio/post.hpp>
#include "msg/Messenger.h"
#include "MonSub.h"
#include "common/admin_socket.h"
-#include "common/async/completion.h"
#include "common/strtol.h" // for strict_strtoll()
#include "common/Timer.h"
#include "common/config.h"
public:
// Error, Newest, Oldest
using VersionSig = void(boost::system::error_code, version_t, version_t);
- using VersionCompletion = ceph::async::Completion<VersionSig>;
+ using VersionCompletion = boost::asio::any_completion_handler<VersionSig>;
using CommandSig = void(boost::system::error_code, std::string,
ceph::buffer::list);
- using CommandCompletion = ceph::async::Completion<CommandSig>;
+ using CommandCompletion = boost::asio::any_completion_handler<CommandSig>;
MonMap monmap;
std::map<std::string,std::string> config_mgr;
uint64_t tid;
std::vector<std::string> cmd;
ceph::buffer::list inbl;
- std::unique_ptr<CommandCompletion> onfinish;
+ CommandCompletion onfinish;
std::optional<boost::asio::steady_timer> cancel_timer;
- MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
+ MonCommand(MonClient& monc, uint64_t t, CommandCompletion onfinish)
: tid(t), onfinish(std::move(onfinish)) {
auto timeout =
monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
public:
template<typename CompletionToken>
- auto start_mon_command(const std::vector<std::string>& cmd,
- const ceph::buffer::list& inbl,
+ auto start_mon_command(std::vector<std::string> cmd,
+ ceph::buffer::list inbl,
CompletionToken&& token) {
+ namespace asio = boost::asio;
ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
- boost::asio::async_completion<CompletionToken, CommandSig> init(token);
- {
- std::scoped_lock l(monc_lock);
- auto h = CommandCompletion::create(service.get_executor(),
- std::move(init.completion_handler));
- if (!initialized || stopping) {
- ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
- bufferlist{});
- } else {
- auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
- r->cmd = cmd;
- r->inbl = inbl;
- mon_commands.emplace(r->tid, r);
- _send_command(r);
- }
- }
- return init.result.get();
+ auto consigned = asio::consign(
+ std::forward<CompletionToken>(token), asio::make_work_guard(
+ asio::get_associated_executor(token, service.get_executor())));
+ return asio::async_initiate<decltype(consigned), CommandSig>(
+ [this, cmd = std::move(cmd),
+ inbl = std::move(inbl)](auto handler) mutable {
+ std::scoped_lock l(monc_lock);
+ if (!initialized || stopping) {
+ asio::dispatch(
+ asio::get_associated_immediate_executor(handler,
+ service.get_executor()),
+ asio::append(std::move(handler),
+ make_error_code(monc_errc::shutting_down),
+ std::string{}, bufferlist{}));
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid,
+ std::move(handler));
+ r->cmd = std::move(cmd);
+ r->inbl = std::move(inbl);
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }, consigned);
}
template<typename CompletionToken>
- auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
- const ceph::buffer::list& inbl, CompletionToken&& token) {
+ auto start_mon_command(int mon_rank, std::vector<std::string> cmd,
+ ceph::buffer::list inbl,
+ CompletionToken&& token) {
+ namespace asio = boost::asio;
+ namespace sys = boost::system;
ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
- boost::asio::async_completion<CompletionToken, CommandSig> init(token);
- {
- std::scoped_lock l(monc_lock);
- auto h = CommandCompletion::create(service.get_executor(),
- std::move(init.completion_handler));
- if (!initialized || stopping) {
- ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
- bufferlist{});
- } else {
- auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
- r->target_rank = mon_rank;
- r->cmd = cmd;
- r->inbl = inbl;
- mon_commands.emplace(r->tid, r);
- _send_command(r);
- }
- }
- return init.result.get();
+ auto consigned = asio::consign(
+ std::forward<CompletionToken>(token), asio::make_work_guard(
+ asio::get_associated_executor(token, service.get_executor())));
+ return asio::async_initiate<decltype(consigned), CommandSig>(
+ [this, mon_rank, cmd = std::move(cmd),
+ inbl = std::move(inbl)](auto handler) mutable {
+ std::scoped_lock l(monc_lock);
+ if (!initialized || stopping) {
+ asio::dispatch(
+ asio::get_associated_immediate_executor(handler,
+ service.get_executor()),
+ asio::append(std::move(handler),
+ make_error_code(monc_errc::shutting_down),
+ std::string{}, bufferlist{}));
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid,
+ std::move(handler));
+ r->target_rank = mon_rank;
+ r->cmd = std::move(cmd);
+ r->inbl = std::move(inbl);
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }, consigned);
}
template<typename CompletionToken>
- auto start_mon_command(const std::string& mon_name,
- const std::vector<std::string>& cmd,
- const ceph::buffer::list& inbl,
+ auto start_mon_command(std::string mon_name,
+ std::vector<std::string> cmd,
+ ceph::buffer::list inbl,
CompletionToken&& token) {
+ namespace asio = boost::asio;
ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
- boost::asio::async_completion<CompletionToken, CommandSig> init(token);
- {
- std::scoped_lock l(monc_lock);
- auto h = CommandCompletion::create(service.get_executor(),
- std::move(init.completion_handler));
- if (!initialized || stopping) {
- ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
- bufferlist{});
- } else {
- auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
- // detect/tolerate mon *rank* passed as a string
- std::string err;
- int rank = strict_strtoll(mon_name.c_str(), 10, &err);
- if (err.size() == 0 && rank >= 0) {
- ldout(cct,10) << __func__ << " interpreting name '" << mon_name
- << "' as rank " << rank << dendl;
- r->target_rank = rank;
+ auto consigned = asio::consign(
+ std::forward<CompletionToken>(token), asio::make_work_guard(
+ asio::get_associated_executor(token, service.get_executor())));
+ return asio::async_initiate<decltype(consigned), CommandSig>(
+ [this, mon_name = std::move(mon_name), cmd = std::move(cmd),
+ inbl = std::move(inbl)](auto handler) mutable {
+ std::scoped_lock l(monc_lock);
+ if (!initialized || stopping) {
+ asio::dispatch(
+ asio::get_associated_immediate_executor(handler,
+ service.get_executor()),
+ asio::append(std::move(handler),
+ make_error_code(monc_errc::shutting_down),
+ std::string{}, bufferlist{}));
} else {
- r->target_name = mon_name;
+ auto r = new MonCommand(*this, ++last_mon_command_tid,
+ std::move(handler));
+ // detect/tolerate mon *rank* passed as a string
+ std::string err;
+ int rank = strict_strtoll(mon_name.c_str(), 10, &err);
+ if (err.size() == 0 && rank >= 0) {
+ ldout(cct,10) << __func__ << " interpreting name '" << mon_name
+ << "' as rank " << rank << dendl;
+ r->target_rank = rank;
+ } else {
+ r->target_name = std::move(mon_name);
+ }
+ r->cmd = std::move(cmd);
+ r->inbl = std::move(inbl);
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
}
- r->cmd = cmd;
- r->inbl = inbl;
- mon_commands.emplace(r->tid, r);
- _send_command(r);
- }
- }
- return init.result.get();
+ }, consigned);
}
class ContextVerter {
}
};
- void start_mon_command(const std::vector<std::string>& cmd, const bufferlist& inbl,
+ void start_mon_command(std::vector<std::string> cmd, bufferlist inbl,
bufferlist *outbl, std::string *outs,
Context *onfinish) {
- start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ start_mon_command(std::move(cmd), std::move(inbl),
+ ContextVerter(outs, outbl, onfinish));
}
- void start_mon_command(int mon_rank,
- const std::vector<std::string>& cmd, const bufferlist& inbl,
- bufferlist *outbl, std::string *outs,
+ void start_mon_command(int mon_rank, std::vector<std::string> cmd,
+ bufferlist inbl, bufferlist *outbl, std::string *outs,
Context *onfinish) {
- start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ start_mon_command(mon_rank, std::move(cmd), std::move(inbl),
+ ContextVerter(outs, outbl, onfinish));
}
- void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix
- const std::vector<std::string>& cmd, const bufferlist& inbl,
+ void start_mon_command(std::string mon_name, ///< mon name, with mon. prefix
+ std::vector<std::string> cmd, bufferlist inbl,
bufferlist *outbl, std::string *outs,
Context *onfinish) {
- start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ start_mon_command(std::move(mon_name), std::move(cmd), std::move(inbl),
+ ContextVerter(outs, outbl, onfinish));
}
*/
template<typename CompletionToken>
auto get_version(std::string&& map, CompletionToken&& token) {
- boost::asio::async_completion<CompletionToken, VersionSig> init(token);
- {
- std::scoped_lock l(monc_lock);
- auto m = ceph::make_message<MMonGetVersion>();
- m->what = std::move(map);
- m->handle = ++version_req_id;
- version_requests.emplace(m->handle,
- VersionCompletion::create(
- service.get_executor(),
- std::move(init.completion_handler)));
- _send_mon_message(m);
- }
- return init.result.get();
+ namespace asio = boost::asio;
+ auto consigned = asio::consign(
+ std::forward<CompletionToken>(token), asio::make_work_guard(
+ asio::get_associated_executor(token, service.get_executor())));
+ return asio::async_initiate<decltype(consigned), VersionSig>(
+ [this, map = std::move(map)](auto handler) mutable {
+ std::scoped_lock l(monc_lock);
+ auto m = ceph::make_message<MMonGetVersion>();
+ m->what = std::move(map);
+ m->handle = ++version_req_id;
+ version_requests.emplace(m->handle, std::move(handler));
+ _send_mon_message(m);
+ }, consigned);
}
/**
private:
- std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
+ std::map<ceph_tid_t, VersionCompletion> version_requests;
ceph_tid_t version_req_id;
void handle_get_version_reply(MMonGetVersionReply* m);
md_config_t::config_callback config_cb;
return osdmap.lookup_pg_pool_name(name);
});
if (ret < 0)
- asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne,
+ asio::dispatch(asio::append(std::move(c),
+ make_error_code(osdc_errc::pool_dne),
std::int64_t(0)));
else
asio::dispatch(asio::append(std::move(c), bs::error_code{}, ret));
return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) {
const auto pgpool = osdmap.get_pg_pool(pool);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
return pgpool->is_unmanaged_snaps_mode();
});
return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) {
int64_t poolid = osdmap.lookup_pg_pool_name(pool);
if (poolid < 0) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
const auto pgpool = osdmap.get_pg_pool(poolid);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
return pgpool->is_unmanaged_snaps_mode();
});
return impl->objecter->with_osdmap([pool](const OSDMap& osdmap) {
int64_t poolid = osdmap.lookup_pg_pool_name(pool);
if (poolid < 0) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
const auto pgpool = osdmap.get_pg_pool(poolid);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
std::vector<std::uint64_t> snaps;
for (const auto& [snapid, snapinfo] : pgpool->snaps) {
return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
const auto pgpool = osdmap.get_pg_pool(pool);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
for (const auto& [id, snapinfo] : pgpool->snaps) {
if (snapinfo.name == snap) return id;
}
- throw bs::system_error(bs::error_code(errc::snap_dne));
+ throw bs::system_error(errc::snap_dne);
});
}
return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
int64_t poolid = osdmap.lookup_pg_pool_name(pool);
if (poolid < 0) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
const auto pgpool = osdmap.get_pg_pool(poolid);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
for (const auto& [id, snapinfo] : pgpool->snaps) {
if (snapinfo.name == snap) return id;
}
- throw bs::system_error(bs::error_code(errc::snap_dne));
+ throw bs::system_error(errc::snap_dne);
});
}
return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
const auto pgpool = osdmap.get_pg_pool(pool);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
- throw bs::system_error(bs::error_code(errc::snap_dne));
+ throw bs::system_error(errc::snap_dne);
} else {
return i->second.name;
}
return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
int64_t poolid = osdmap.lookup_pg_pool_name(pool);
if (poolid < 0) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
const auto pgpool = osdmap.get_pg_pool(poolid);
if (!pgpool) {
throw bs::system_error(bs::error_code(errc::pool_dne));
}
if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
- throw bs::system_error(bs::error_code(errc::snap_dne));
+ throw bs::system_error(errc::snap_dne);
} else {
return i->second.name;
}
return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
const auto pgpool = osdmap.get_pg_pool(pool);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
- throw bs::system_error(bs::error_code(errc::snap_dne));
+ throw bs::system_error(errc::snap_dne);
} else {
return i->second.stamp.to_real_time();
}
return impl->objecter->with_osdmap([pool, snap](const OSDMap& osdmap) {
int64_t poolid = osdmap.lookup_pg_pool_name(pool);
if (poolid < 0) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
const auto pgpool = osdmap.get_pg_pool(poolid);
if (!pgpool) {
- throw bs::system_error(bs::error_code(errc::pool_dne));
+ throw bs::system_error(errc::pool_dne);
}
if (auto i = pgpool->snaps.find(snap); i == pgpool->snaps.cend()) {
- throw bs::system_error(bs::error_code(errc::snap_dne));
+ throw bs::system_error(errc::snap_dne);
} else {
return i->second.stamp.to_real_time();
}
n->add_handler(id, std::move(c));
} catch (const std::bad_any_cast&) {
dispatch(asio::append(std::move(c),
- bs::error_code(errc::polled_callback_watch),
+ make_error_code(errc::polled_callback_watch),
Notification{}));
}
}
<< " dne" << dendl;
if (op->has_completion()) {
num_in_flight--;
- op->complete(osdc_errc::pool_dne, -ENOENT, service.get_executor());
+ op->complete(make_error_code(osdc_errc::pool_dne), -ENOENT, service.get_executor());
}
OSDSession *s = op->session;
<< " has eio" << dendl;
if (op->has_completion()) {
num_in_flight--;
- op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
+ op->complete(make_error_code(osdc_errc::pool_eio), -EIO,
+ service.get_executor());
}
OSDSession *s = op->session;
if (op->on_reg_commit) {
asio::defer(service.get_executor(),
asio::append(std::move(op->on_reg_commit),
- osdc_errc::pool_dne, cb::list{}));
+ make_error_code(osdc_errc::pool_dne),
+ cb::list{}));
op->on_reg_commit = nullptr;
}
if (op->on_notify_finish) {
asio::defer(service.get_executor(),
asio::append(std::move(op->on_notify_finish),
- osdc_errc::pool_dne, cb::list{}));
+ make_error_code(osdc_errc::pool_dne),
+ cb::list{}));
op->on_notify_finish = nullptr;
}
*need_unregister = true;
if (op->on_reg_commit) {
asio::defer(service.get_executor(),
asio::append(std::move(op->on_reg_commit),
- osdc_errc::pool_dne, cb::list{}));
+ make_error_code(osdc_errc::pool_dne), cb::list{}));
}
if (op->on_notify_finish) {
asio::defer(service.get_executor(),
asio::append(std::move(op->on_notify_finish),
- osdc_errc::pool_dne, cb::list{}));
+ make_error_code(osdc_errc::pool_dne), cb::list{}));
}
}
break;
case RECALC_OP_TARGET_POOL_EIO:
if (op->has_completion()) {
- op->complete(osdc_errc::pool_eio, -EIO, service.get_executor());
+ op->complete(make_error_code(osdc_errc::pool_eio), -EIO,
+ service.get_executor());
}
return;
}
const pg_pool_t *p = osdmap->get_pg_pool(pool);
if (!p) {
asio::defer(service.get_executor(),
- asio::append(std::move(onfinish), osdc_errc::pool_dne, cb::list{}));
+ asio::append(std::move(onfinish),
+ make_error_code(osdc_errc::pool_dne), cb::list{}));
return;
}
if (p->snap_exists(snap_name)) {
const pg_pool_t *p = osdmap->get_pg_pool(pool);
if (!p) {
asio::defer(service.get_executor(),
- asio::append(std::move(onfinish), osdc_errc::pool_dne,
+ asio::append(std::move(onfinish),
+ make_error_code(osdc_errc::pool_dne),
cb::list{}));
return;
}
if (!p->snap_exists(snap_name)) {
asio::defer(service.get_executor(),
- asio::append(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{}));
+ asio::append(std::move(onfinish),
+ make_error_code(osdc_errc::snapshot_dne),
+ cb::list{}));
return;
}
if (osdmap->lookup_pg_pool_name(name) >= 0) {
asio::defer(service.get_executor(),
- asio::append(std::move(onfinish), osdc_errc::pool_exists,
+ asio::append(std::move(onfinish),
+ make_error_code(osdc_errc::pool_exists),
cb::list{}));
return;
}
if (!osdmap->have_pg_pool(pool))
asio::defer(service.get_executor(),
- asio::append(std::move(onfinish), osdc_errc::pool_dne,
+ asio::append(std::move(onfinish),
+ make_error_code(osdc_errc::pool_dne),
cb::list{}));
else
_do_delete_pool(pool, std::move(onfinish));
if (pool < 0)
// This only returns one error: -ENOENT.
asio::defer(service.get_executor(),
- asio::append(std::move(onfinish), osdc_errc::pool_dne,
+ asio::append(std::move(onfinish),
+ make_error_code(osdc_errc::pool_dne),
cb::list{}));
else
_do_delete_pool(pool, std::move(onfinish));
#include <variant>
#include <boost/container/small_vector.hpp>
+#include <boost/asio/bind_executor.hpp>
#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/append.hpp>
#include <boost/asio/async_result.hpp>
}, consigned);
}
- auto wait_for_latest_osdmap(std::unique_ptr<ceph::async::Completion<OpSignature>> c) {
+ auto wait_for_latest_osdmap(boost::asio::any_completion_handler<OpSignature> c) {
wait_for_latest_osdmap([c = std::move(c)](boost::system::error_code e) mutable {
- c->dispatch(std::move(c), e);
+ boost::asio::dispatch(boost::asio::append(std::move(c), e));
});
}
op_submit(o);
}
- void mutate(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation&& op, const SnapContext& snapc,
- ceph::real_time mtime, int flags,
- std::unique_ptr<ceph::async::Completion<Op::OpSig>> oncommit,
- version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
- ZTracer::Trace *parent_trace = nullptr) {
- mutate(oid, oloc, std::move(op), snapc, mtime, flags,
- [c = std::move(oncommit)](boost::system::error_code ec) mutable {
- c->dispatch(std::move(c), ec);
- }, objver, reqid, parent_trace);
- }
-
Op *prepare_read_op(
const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
op_submit(o);
}
- void read(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
- int flags, std::unique_ptr<ceph::async::Completion<Op::OpSig>> onack,
- version_t *objver = nullptr, int *data_offset = nullptr,
- uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
- read(oid, oloc, std::move(op), snapid, pbl, flags,
- [c = std::move(onack)](boost::system::error_code e) mutable {
- c->dispatch(std::move(c), e);
- }, objver, data_offset, features, parent_trace);
- }
-
-
Op *prepare_pg_read_op(
uint32_t hash, object_locator_t oloc,
ObjectOperation& op, ceph::buffer::list *pbl, int flags,
rados = &store->get_neorados();
try {
// Blocking in startup code, not ideal, but won't hurt anything.
- std::exception_ptr eptr
- = asio::co_spawn(store->get_io_context(),
- start(dpp, zoneparams.log_pool,
- background_tasks, background_tasks,
- background_tasks),
- async::use_blocked);
- if (eptr) {
- std::rethrow_exception(eptr);
- }
+ asio::co_spawn(store->get_io_context(),
+ start(dpp, zoneparams.log_pool,
+ background_tasks, background_tasks,
+ background_tasks),
+ async::use_blocked);
} catch (const sys::system_error& e) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": Failed to start datalog: " << e.what()
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": Failed to start datalog: " << e.what()
<< dendl;
- return -EIO;
+ return ceph::from_exception(std::current_exception());
}
return 0;
}
const rgw::bucket_log_layout_generation& gen,
int shard_id, optional_yield y)
{
- std::exception_ptr eptr;
- if (y) {
- try {
+ try {
+ if (y) {
add_entry(dpp, bucket_info, gen, shard_id, y.get_yield_context());
- } catch (const std::exception&) {
- eptr = std::current_exception();
+ } else {
+ maybe_warn_about_blocking(dpp);
+ asio::spawn(rados->get_executor(),
+ [this, dpp, &bucket_info, &gen,
+ &shard_id](asio::yield_context y) {
+ add_entry(dpp, bucket_info, gen, shard_id, y);
+ }, async::use_blocked);
}
- } else {
- maybe_warn_about_blocking(dpp);
- eptr = asio::spawn(rados->get_executor(),
- [this, dpp, &bucket_info, &gen,
- &shard_id](asio::yield_context y) {
- add_entry(dpp, bucket_info, gen, shard_id, y);
- },
- async::use_blocked);
- }
- return ceph::from_exception(eptr);
+ } catch (const std::exception&) {
+ return ceph::from_exception(std::current_exception());
+ }
+ return 0;
}
asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
std::string_view marker, std::string* out_marker, bool* truncated,
std::string* errstr, optional_yield y)
{
- std::exception_ptr eptr;
std::tuple<std::span<rgw_data_change_log_entry>,
std::string> out;
if (shard >= num_shards) [[unlikely]] {
if (std::ssize(entries) < max_entries) {
entries.resize(max_entries);
}
- if (y) {
- auto& yield = y.get_yield_context();
- try {
+ try {
+ if (y) {
+ auto& yield = y.get_yield_context();
out = asio::co_spawn(yield.get_executor(),
bes->list(dpp, shard, entries,
std::string{marker}),
yield);
- } catch (const std::exception&) {
- eptr = std::current_exception();
+ } else {
+ maybe_warn_about_blocking(dpp);
+ out = asio::co_spawn(rados->get_executor(),
+ bes->list(dpp, shard, entries,
+ std::string{marker}),
+ async::use_blocked);
}
- } else {
- maybe_warn_about_blocking(dpp);
- std::tie(eptr, out) = asio::co_spawn(rados->get_executor(),
- bes->list(dpp, shard, entries,
- std::string{marker}),
- async::use_blocked);
- }
- if (eptr) {
- return ceph::from_exception(eptr);
+ } catch (const std::exception&) {
+ return ceph::from_exception(std::current_exception());
}
auto& [outries, outmark] = out;
if (auto size = std::ssize(outries); size < std::ssize(entries)) {
RGWDataChangesLogMarker& marker, bool *ptruncated,
optional_yield y)
{
- std::exception_ptr eptr;
std::tuple<std::vector<rgw_data_change_log_entry>,
RGWDataChangesLogMarker> out;
if (std::ssize(entries) < max_entries) {
entries.resize(max_entries);
}
- if (y) {
- auto& yield = y.get_yield_context();
- try {
+ try {
+ if (y) {
+ auto& yield = y.get_yield_context();
out = asio::co_spawn(yield.get_executor(),
list_entries(dpp, max_entries,
RGWDataChangesLogMarker{marker}),
yield);
- } catch (const std::exception&) {
- eptr = std::current_exception();
- }
} else {
- maybe_warn_about_blocking(dpp);
- std::tie(eptr, out) =
- asio::co_spawn(rados->get_executor(),
- list_entries(dpp, max_entries,
- RGWDataChangesLogMarker{marker}),
- async::use_blocked);
- }
- if (eptr) {
- return ceph::from_exception(eptr);
+ maybe_warn_about_blocking(dpp);
+ out = asio::co_spawn(rados->get_executor(),
+ list_entries(dpp, max_entries,
+ RGWDataChangesLogMarker{marker}),
+ async::use_blocked);
+ }
+ } catch (const std::exception&) {
+ return ceph::from_exception(std::current_exception());
}
auto& [outries, outmark] = out;
if (auto size = std::ssize(outries); size < std::ssize(entries)) {
}
}
auto be = bes->head();
- std::exception_ptr eptr;
- if (y) {
- auto& yield = y.get_yield_context();
- try {
+ try {
+ if (y) {
+ auto& yield = y.get_yield_context();
*info = asio::co_spawn(yield.get_executor(),
be->get_info(dpp, shard_id),
yield);
- } catch (const std::exception&) {
- eptr = std::current_exception();
+ } else {
+ maybe_warn_about_blocking(dpp);
+ *info = asio::co_spawn(rados->get_executor(),
+ be->get_info(dpp, shard_id),
+ async::use_blocked);
}
- } else {
- maybe_warn_about_blocking(dpp);
- std::tie(eptr, *info) = asio::co_spawn(rados->get_executor(),
- be->get_info(dpp, shard_id),
- async::use_blocked);
+ } catch (const std::exception&) {
+ return ceph::from_exception(std::current_exception());
}
if (!info->marker.empty()) {
info->marker = gencursor(be->gen_id, info->marker);
}
- return ceph::from_exception(eptr);
+ return 0;
}
asio::awaitable<void> DataLogBackends::trim_entries(
shard_id, num_shards);
}
}
- std::exception_ptr eptr;
- if (y) {
- auto& yield = y.get_yield_context();
- try {
+ try {
+ if (y) {
+ auto& yield = y.get_yield_context();
asio::co_spawn(yield.get_executor(),
bes->trim_entries(dpp, shard_id, marker),
yield);
- } catch (const std::exception& e) {
- eptr = std::current_exception();
+ } else {
+ maybe_warn_about_blocking(dpp);
+ asio::co_spawn(rados->get_executor(),
+ bes->trim_entries(dpp, shard_id, marker),
+ async::use_blocked);
}
- } else {
- maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(rados->get_executor(),
- bes->trim_entries(dpp, shard_id, marker),
- async::use_blocked);
+ } catch (const std::exception& e) {
+ return ceph::from_exception(std::current_exception());
}
- return ceph::from_exception(eptr);
+ return 0;
}
int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
void RGWDataChangesLog::blocking_shutdown() {
if (!down_flag) {
try {
- auto eptr = asio::co_spawn(rados->get_io_context(),
- shutdown_or_timeout(),
- async::use_blocked);
- if (eptr) {
- std::rethrow_exception(eptr);
- }
+ asio::co_spawn(rados->get_io_context(),
+ shutdown_or_timeout(),
+ async::use_blocked);
} catch (const sys::system_error& e) {
lderr(cct) << __PRETTY_FUNCTION__
<< ": Failed to shutting down: " << e.what()
}
int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp,
- log_type type,optional_yield y) {
- std::exception_ptr eptr;
- if (y) {
- auto& yield = y.get_yield_context();
- try {
+ log_type type,optional_yield y)
+{
+ try {
+ if (y) {
+ auto& yield = y.get_yield_context();
asio::co_spawn(yield.get_executor(),
bes->new_backing(dpp, type),
yield);
- } catch (const std::exception&) {
- eptr = std::current_exception();
+ } else {
+ maybe_warn_about_blocking(dpp);
+ asio::co_spawn(rados->get_executor(),
+ bes->new_backing(dpp, type),
+ async::use_blocked);
}
- } else {
- maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(rados->get_executor(),
- bes->new_backing(dpp, type),
- async::use_blocked);
+ } catch (const std::exception&) {
+ return ceph::from_exception(std::current_exception());
}
- return ceph::from_exception(eptr);
+ return 0;;
}
int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
std::optional<uint64_t>& through,
- optional_yield y) {
- std::exception_ptr eptr;
- if (y) {
- auto& yield = y.get_yield_context();
- try {
+ optional_yield y)
+{
+ try {
+ if (y) {
+ auto& yield = y.get_yield_context();
asio::co_spawn(yield.get_executor(),
bes->trim_generations(dpp, through),
yield);
- } catch (const std::exception& e) {
- eptr = std::current_exception();
+ } else {
+ maybe_warn_about_blocking(dpp);
+ asio::co_spawn(rados->get_executor(),
+ bes->trim_generations(dpp, through),
+ async::use_blocked);
}
-
- } else {
- maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(rados->get_executor(),
- bes->trim_generations(dpp, through),
- async::use_blocked);
+ } catch (const std::exception& e) {
+ return ceph::from_exception(std::current_exception());
}
- return ceph::from_exception(eptr);
+
+ return 0;
}
asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
int run_coro(asio::awaitable<void> coro, std::string_view name) {
try {
// Blocking in startup code, not ideal, but won't hurt anything.
- std::exception_ptr eptr
- = asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
- std::move(coro),
- async::use_blocked);
- if (eptr) {
- std::rethrow_exception(eptr);
- }
+ asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
+ std::move(coro),
+ async::use_blocked);
} catch (boost::system::system_error& e) {
ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
return ceph::from_error_code(e.code());
*/
+#include <boost/asio/append.hpp>
+#include <boost/asio/async_result.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
+
#include <boost/system/error_code.hpp>
#include <gtest/gtest.h>
-#include "common/async/bind_handler.h"
#include "common/async/blocked_completion.h"
-#include "common/async/forward_handler.h"
-
using namespace std::literals;
-namespace ba = boost::asio;
-namespace bs = boost::system;
-namespace ca = ceph::async;
+namespace asio = boost::asio;
+namespace sys = boost::system;
+namespace async = ceph::async;
class context_thread {
- ba::io_context c;
- ba::executor_work_guard<ba::io_context::executor_type> guard;
+ asio::io_context c;
+ asio::executor_work_guard<asio::io_context::executor_type> guard;
std::thread th;
public:
context_thread() noexcept
- : guard(ba::make_work_guard(c)),
+ : guard(asio::make_work_guard(c)),
th([this]() noexcept { c.run();}) {}
~context_thread() {
th.join();
}
- ba::io_context& io_context() noexcept {
+ asio::io_context& io_context() noexcept {
return c;
}
- ba::io_context::executor_type get_executor() noexcept {
+ asio::io_context::executor_type get_executor() noexcept {
return c.get_executor();
}
};
auto id(const Executor& executor, CompletionToken&& token,
Args&& ...args)
{
- ba::async_completion<CompletionToken, void(Args...)> init(token);
- boost::asio::post(ca::forward_handler(
- ca::bind_handler(std::move(init.completion_handler),
- std::forward<Args>(args)...)));
- return init.result.get();
+ return asio::async_initiate<CompletionToken, void(Args...)>(
+ []<typename ...Args2>(auto handler, Args2&& ...args2) mutable {
+ asio::post(asio::append(std::move(handler),
+ std::forward<Args2>(args2)...));
+ }, token, std::forward<Args>(args)...);
}
TEST(BlockedCompletion, Void)
{
context_thread t;
- ba::post(t.get_executor(), ca::use_blocked);
+ asio::post(t.get_executor(), async::use_blocked);
}
TEST(BlockedCompletion, Timer)
{
context_thread t;
- ba::steady_timer timer(t.io_context(), 50ms);
- timer.async_wait(ca::use_blocked);
+ asio::steady_timer timer(t.io_context(), 50ms);
+ timer.async_wait(async::use_blocked);
}
TEST(BlockedCompletion, NoError)
{
context_thread t;
- ba::steady_timer timer(t.io_context(), 1s);
- bs::error_code ec;
+ asio::steady_timer timer(t.io_context(), 1s);
+ sys::error_code ec;
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked, bs::error_code{}));
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}));
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked, sys::error_code{}));
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec], sys::error_code{}));
EXPECT_FALSE(ec);
int i;
- EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked,
- bs::error_code{}, 5));
+ EXPECT_NO_THROW(i = id(t.get_executor(), async::use_blocked,
+ sys::error_code{}, 5));
ASSERT_EQ(5, i);
- EXPECT_NO_THROW(i = id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{}, 7));
+ EXPECT_NO_THROW(
+ i = id(t.get_executor(), async::use_blocked[ec], sys::error_code{}, 7));
EXPECT_FALSE(ec);
ASSERT_EQ(7, i);
float j;
- EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked, 9,
+ EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), async::use_blocked, 9,
3.5));
ASSERT_EQ(9, i);
ASSERT_EQ(3.5, j);
- EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), ca::use_blocked[ec],
+ EXPECT_NO_THROW(std::tie(i, j) = id(t.get_executor(), async::use_blocked[ec],
11, 2.25));
EXPECT_FALSE(ec);
ASSERT_EQ(11, i);
TEST(BlockedCompletion, AnError)
{
context_thread t;
- ba::steady_timer timer(t.io_context(), 1s);
- bs::error_code ec;
-
- EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{EDOM, bs::system_category()}),
- bs::system_error);
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{EDOM, bs::system_category()}));
- EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
-
- EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{EDOM, bs::system_category()}, 5),
- bs::system_error);
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{EDOM, bs::system_category()}, 5));
- EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
-
- EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{EDOM, bs::system_category()}, 5, 3),
- bs::system_error);
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{EDOM, bs::system_category()}, 5, 3));
- EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+ asio::steady_timer timer(t.io_context(), 1s);
+ sys::error_code ec;
+
+ EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{EDOM, sys::system_category()}),
+ sys::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{EDOM, sys::system_category()}));
+ EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
+
+ EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{EDOM, sys::system_category()}, 5),
+ sys::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{EDOM, sys::system_category()}, 5));
+ EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
+
+ EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{EDOM, sys::system_category()}, 5, 3),
+ sys::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{EDOM, sys::system_category()}, 5, 3));
+ EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
}
TEST(BlockedCompletion, MoveOnly)
{
context_thread t;
- ba::steady_timer timer(t.io_context(), 1s);
- bs::error_code ec;
+ asio::steady_timer timer(t.io_context(), 1s);
+ sys::error_code ec;
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{}, move_only{}));
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{}, move_only{}));
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{}, move_only{}));
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{}, move_only{}));
EXPECT_FALSE(ec);
{
- auto [i, j] = id(t.get_executor(), ca::use_blocked, move_only{}, 5);
+ auto [i, j] = id(t.get_executor(), async::use_blocked, move_only{}, 5);
EXPECT_EQ(j, 5);
}
{
- auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], move_only{}, 5);
+ auto [i, j] = id(t.get_executor(), async::use_blocked[ec], move_only{}, 5);
EXPECT_EQ(j, 5);
}
EXPECT_FALSE(ec);
- EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{EDOM, bs::system_category()}, move_only{}),
- bs::system_error);
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{EDOM, bs::system_category()}, move_only{}));
- EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+ EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{EDOM, sys::system_category()}, move_only{}),
+ sys::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{EDOM, sys::system_category()}, move_only{}));
+ EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
- EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
- bs::system_error);
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{EDOM, bs::system_category()},
+ EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{EDOM, sys::system_category()}, move_only{}, 3),
+ sys::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{EDOM, sys::system_category()},
move_only{}, 3));
- EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+ EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
}
TEST(BlockedCompletion, DefaultLess)
{
context_thread t;
- ba::steady_timer timer(t.io_context(), 1s);
- bs::error_code ec;
+ asio::steady_timer timer(t.io_context(), 1s);
+ sys::error_code ec;
{
- auto l = id(t.get_executor(), ca::use_blocked, bs::error_code{}, defaultless{5});
+ auto l = id(t.get_executor(), async::use_blocked, sys::error_code{}, defaultless{5});
EXPECT_EQ(5, l.a);
}
{
- auto l = id(t.get_executor(), ca::use_blocked[ec], bs::error_code{}, defaultless{7});
+ auto l = id(t.get_executor(), async::use_blocked[ec], sys::error_code{}, defaultless{7});
EXPECT_EQ(7, l.a);
}
{
- auto [i, j] = id(t.get_executor(), ca::use_blocked, defaultless{3}, 5);
+ auto [i, j] = id(t.get_executor(), async::use_blocked, defaultless{3}, 5);
EXPECT_EQ(i.a, 3);
EXPECT_EQ(j, 5);
}
{
- auto [i, j] = id(t.get_executor(), ca::use_blocked[ec], defaultless{3}, 5);
+ auto [i, j] = id(t.get_executor(), async::use_blocked[ec], defaultless{3}, 5);
EXPECT_EQ(i.a, 3);
EXPECT_EQ(j, 5);
}
EXPECT_FALSE(ec);
- EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{EDOM, bs::system_category()}, move_only{}),
- bs::system_error);
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{EDOM, bs::system_category()}, move_only{}));
- EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
-
- EXPECT_THROW(id(t.get_executor(), ca::use_blocked,
- bs::error_code{EDOM, bs::system_category()}, move_only{}, 3),
- bs::system_error);
- EXPECT_NO_THROW(id(t.get_executor(), ca::use_blocked[ec],
- bs::error_code{EDOM, bs::system_category()},
+ EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{EDOM, sys::system_category()}, move_only{}),
+ sys::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{EDOM, sys::system_category()}, move_only{}));
+ EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
+
+ EXPECT_THROW(id(t.get_executor(), async::use_blocked,
+ sys::error_code{EDOM, sys::system_category()}, move_only{}, 3),
+ sys::system_error);
+ EXPECT_NO_THROW(id(t.get_executor(), async::use_blocked[ec],
+ sys::error_code{EDOM, sys::system_category()},
move_only{}, 3));
- EXPECT_EQ(bs::error_code(EDOM, bs::system_category()), ec);
+ EXPECT_EQ(sys::error_code(EDOM, sys::system_category()), ec);
}