#include <spawn/spawn.hpp>
+// use explicit executor types instead of the type-erased boost::asio::executor.
+// coroutines wrap the default io_context executor with a strand executor
+using yield_context = spawn::basic_yield_context<
+ boost::asio::executor_binder<void(*)(),
+ boost::asio::strand<boost::asio::io_context::executor_type>>>;
+
/// optional-like wrapper for a spawn::yield_context and its associated
/// boost::asio::io_context. operations that take an optional_yield argument
/// will, when passed a non-empty yield context, suspend this coroutine instead
/// of the blocking the thread of execution
class optional_yield {
boost::asio::io_context *c = nullptr;
- spawn::yield_context *y = nullptr;
+ yield_context *y = nullptr;
public:
/// construct with a valid io and yield_context
explicit optional_yield(boost::asio::io_context& c,
- spawn::yield_context& y) noexcept
+ yield_context& y) noexcept
: c(&c), y(&y) {}
/// type tag to construct an empty object
boost::asio::io_context& get_io_context() const noexcept { return *c; }
/// return a reference to the yield_context. only valid if non-empty
- spawn::yield_context& get_yield_context() const noexcept { return *y; }
+ yield_context& get_yield_context() const noexcept { return *y; }
};
// type tag object to construct an empty optional_yield
template <typename Op>
Aio::OpFunc aio_abstract(Op&& op, boost::asio::io_context& context,
- spawn::yield_context yield) {
+ yield_context yield) {
return [op = std::move(op), &context, yield] (Aio* aio, AioResult& r) mutable {
// arrange for the completion Handler to run on the yield_context's strand
// executor so it can safely call back into Aio without locking
using namespace boost::asio;
- async_completion<spawn::yield_context, void()> init(yield);
+ async_completion<yield_context, void()> init(yield);
auto ex = get_associated_executor(init.completion_handler);
auto& ref = r.obj.get_ref();
// functions must be called within the coroutine strand
class YieldingAioThrottle final : public Aio, private Throttle {
boost::asio::io_context& context;
- spawn::yield_context yield;
+ yield_context yield;
struct Handler;
// completion callback associated with the waiter
public:
YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
- spawn::yield_context yield)
+ yield_context yield)
: Throttle(window), context(context), yield(yield)
{}
class StreamIO : public rgw::asio::ClientIO {
CephContext* const cct;
Stream& stream;
- spawn::yield_context yield;
+ yield_context yield;
parse_buffer& buffer;
ceph::timespan request_timeout;
public:
StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
- spawn::yield_context yield,
- parse_buffer& buffer, bool is_ssl,
+ yield_context yield, parse_buffer& buffer, bool is_ssl,
const tcp::endpoint& local_endpoint,
const tcp::endpoint& remote_endpoint,
ceph::timespan request_timeout)
SharedMutex& pause_mutex,
rgw::dmclock::Scheduler *scheduler,
boost::system::error_code& ec,
- spawn::yield_context yield,
+ yield_context yield,
ceph::timespan request_timeout)
{
// limit header to 4k, since we read it all into a single flat_buffer
#ifdef WITH_RADOSGW_BEAST_OPENSSL
if (l.use_ssl) {
spawn::spawn(context,
- [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+ [this, s=std::move(stream)] (yield_context yield) mutable {
Connection conn{s.socket()};
auto c = connections.add(conn);
// wrap the tcp_stream in an ssl stream
{
#endif // WITH_RADOSGW_BEAST_OPENSSL
spawn::spawn(context,
- [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+ [this, s=std::move(stream)] (yield_context yield) mutable {
Connection conn{s.socket()};
auto c = connections.add(conn);
auto buffer = std::make_unique<parse_buffer>();
}
};
- void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::io_context& context, spawn::yield_context yield,
+ void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::io_context& context, yield_context yield,
std::string& file_path, off_t read_ofs, off_t read_len,
rgw::Aio* aio, rgw::AioResult& r) {
using namespace boost::asio;
- async_completion<spawn::yield_context, void()> init(yield);
+ async_completion<yield_context, void()> init(yield);
auto ex = get_associated_executor(init.completion_handler);
auto& ref = r.obj.get_ref();
pending_tokens(0),
timer(io_context) {}
- void async_wait(spawn::yield_context yield) {
+ void async_wait(yield_context yield) {
if (pending_tokens == 0) {
return;
}
// processing of a specific entry
// return whether processing was successfull (true) or not (false)
- bool process_entry(const cls_queue_entry& entry, spawn::yield_context yield) {
+ bool process_entry(const cls_queue_entry& entry, yield_context yield) {
event_entry_t event_entry;
auto iter = entry.data.cbegin();
try {
}
// clean stale reservation from queue
- void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
+ void cleanup_queue(const std::string& queue_name, yield_context yield) {
while (true) {
ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
const auto now = ceph::coarse_real_time::clock::now();
}
// processing of a specific queue
- void process_queue(const std::string& queue_name, spawn::yield_context yield) {
+ void process_queue(const std::string& queue_name, yield_context yield) {
constexpr auto max_elements = 1024;
auto is_idle = false;
const std::string start_marker;
// start a the cleanup coroutine for the queue
- spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) {
+ spawn::spawn(io_context, [this, queue_name](yield_context yield) {
cleanup_queue(queue_name, yield);
}, make_stack_allocator());
break;
}
// TODO pass entry pointer instead of by-value
- spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](spawn::yield_context yield) {
+ spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
const auto token = waiter.make_token();
if (process_entry(entry, yield)) {
ldpp_dout(this, 20) << "INFO: processing of entry: " <<
// process all queues
// find which of the queues is owned by this daemon and process it
- void process_queues(spawn::yield_context yield) {
+ void process_queues(yield_context yield) {
auto has_error = false;
owned_queues_t owned_queues;
if (owned_queues.insert(queue_name).second) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
// start processing this queue
- spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
+ spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) {
process_queue(queue_name, yield);
// if queue processing ended, it measn that the queue was removed or not owned anymore
// mark it for deletion
stale_reservations_period_s(_stale_reservations_period_s),
reservations_cleanup_period_s(_reservations_cleanup_period_s)
{
- spawn::spawn(io_context, [this](spawn::yield_context yield) {
+ spawn::spawn(io_context, [this] (yield_context yield) {
process_queues(yield);
}, make_stack_allocator());
entry.pipe = pipe;
// fetch remote markers
- spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
+ spawn::spawn(ioctx, [&] (yield_context yield) {
auto y = optional_yield{ioctx, yield};
int r = source_bilog_markers(dpp, store->svc()->zone, entry.pipe,
entry.remote_markers, y);
}
});
// fetch source bucket info
- spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
+ spawn::spawn(ioctx, [&] (yield_context yield) {
auto y = optional_yield{ioctx, yield};
auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
int r = store->getRados()->get_bucket_instance_info(
{
boost::asio::io_context context;
- spawn::spawn(context, [&] (spawn::yield_context yield) {
+ spawn::spawn(context, [&] (yield_context yield) {
ClientCounters counters(g_ceph_context);
AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
[] (client_id client) -> ClientInfo* {
RGWReshardWait waiter(wait_duration);
boost::asio::io_context context;
- spawn::spawn(context, [&] (spawn::yield_context yield) {
+ spawn::spawn(context, [&] (yield_context yield) {
EXPECT_EQ(0, waiter.wait(optional_yield{context, yield}));
});
boost::asio::io_context context;
spawn::spawn(context,
- [&] (spawn::yield_context yield) {
+ [&] (yield_context yield) {
EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
});
// spawn 4 coroutines
boost::asio::io_context context;
{
- auto async_waiter = [&] (spawn::yield_context yield) {
+ auto async_waiter = [&] (yield_context yield) {
EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
};
spawn::spawn(context, async_waiter);
boost::asio::io_context context;
spawn::spawn(context,
- [&] (spawn::yield_context yield) {
+ [&] (yield_context yield) {
YieldingAioThrottle throttle(4, context, yield);
scoped_completion op;
auto c = throttle.get(obj, wait_on(op), 8, 0);
boost::asio::io_context context;
spawn::spawn(context,
- [&] (spawn::yield_context yield) {
+ [&] (yield_context yield) {
YieldingAioThrottle throttle(window, context, yield);
for (uint64_t i = 0; i < total; i++) {
using namespace std::chrono_literals;