target_link_libraries(cls_otp OATH::OATH)
target_include_directories(cls_otp
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+ PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
set_target_properties(cls_otp PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
target_link_libraries(cls_rgw ${FMT_LIB} json_spirit)
target_include_directories(cls_rgw
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+ PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
set_target_properties(cls_rgw PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
add_library(cls_rgw_client STATIC ${cls_rgw_client_srcs})
target_include_directories(cls_rgw_client
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+ PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
endif (WITH_RADOSGW)
add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs})
target_include_directories(cls_rgw_gc
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+ PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
set_target_properties(cls_rgw_gc PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
target_include_directories(cls_rgw_gc_client
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
- PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+ PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
endif (WITH_RADOSGW)
#include <boost/range/begin.hpp>
#include <boost/range/end.hpp>
#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
#include "acconfig.h"
-#include <spawn/spawn.hpp>
-
-/// optional-like wrapper for a spawn::yield_context and its associated
-/// boost::asio::io_context. operations that take an optional_yield argument
-/// will, when passed a non-empty yield context, suspend this coroutine instead
-/// of the blocking the thread of execution
+/// optional-like wrapper for a boost::asio::yield_context. operations that take
+/// an optional_yield argument will, when passed a non-empty yield context,
+/// suspend this coroutine instead of the blocking the thread of execution
class optional_yield {
- boost::asio::io_context *c = nullptr;
- spawn::yield_context *y = nullptr;
+ boost::asio::yield_context *y = nullptr;
public:
/// construct with a valid io and yield_context
- explicit optional_yield(boost::asio::io_context& c,
- spawn::yield_context& y) noexcept
- : c(&c), y(&y) {}
+ optional_yield(boost::asio::yield_context& y) noexcept : y(&y) {}
/// type tag to construct an empty object
struct empty_t {};
/// implicit conversion to bool, returns true if non-empty
operator bool() const noexcept { return y; }
- /// return a reference to the associated io_context. only valid if non-empty
- boost::asio::io_context& get_io_context() const noexcept { return *c; }
-
/// return a reference to the yield_context. only valid if non-empty
- spawn::yield_context& get_yield_context() const noexcept { return *y; }
+ boost::asio::yield_context& get_yield_context() const noexcept { return *y; }
};
// type tag object to construct an empty optional_yield
add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs})
target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include)
-target_link_libraries(ceph_crypto_isal PRIVATE spawn)
+target_link_libraries(ceph_crypto_isal PRIVATE Boost::context)
set_target_properties(ceph_crypto_isal PROPERTIES
VERSION 1.0.0
target_link_libraries(ceph_crypto_openssl
PRIVATE OpenSSL::Crypto
$<$<PLATFORM_ID:Windows>:ceph-common>
- spawn)
+ Boost::context)
target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR})
add_dependencies(crypto_plugins ceph_crypto_openssl)
set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "")
target_link_libraries(ceph_crypto_qat PRIVATE
QAT::qat
QAT::usdm
- spawn)
+ Boost::context)
add_dependencies(crypto_plugins ceph_crypto_qat)
set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1)
int avail_inst = NON_INSTANCE;
if (y) {
- spawn::yield_context yield = y.get_yield_context();
+ boost::asio::yield_context yield = y.get_yield_context();
avail_inst = async_get_instance(yield);
} else {
auto result = async_get_instance(boost::asio::use_future);
do {
poll_retry_num = RETRY_MAX_NUM;
if (y) {
- spawn::yield_context yield = y.get_yield_context();
+ boost::asio::yield_context yield = y.get_yield_context();
status = helper.async_perform_op(std::span<CpaCySymDpOpData*>(pOpDataVec), yield);
} else {
auto result = helper.async_perform_op(std::span<CpaCySymDpOpData*>(pOpDataVec), boost::asio::use_future);
PUBLIC
${LUA_LIBRARIES}
RapidJSON::RapidJSON
- spawn
- ${FMT_LIB})
+ Boost::context
+ ${FMT_LIB}
+ OpenSSL::SSL)
target_include_directories(rgw_common
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/services"
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
OATH::OATH
PUBLIC
rgw_common
- spawn)
+ Boost::context)
if(WITH_CURL_OPENSSL)
# used by rgw_http_client_curl.cc
add_library(rgw_schedulers STATIC ${rgw_schedulers_srcs})
target_link_libraries(rgw_schedulers
- PUBLIC dmclock::dmclock spawn)
+ PUBLIC dmclock::dmclock Boost::context)
set(radosgw_srcs
rgw_main.cc)
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/store/rados"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}")
-set(link_targets spawn)
+set(link_targets Boost::context)
if(WITH_JAEGER)
list(APPEND link_targets jaeger_base)
endif()
///
/// This library uses optional_yield. Please see
/// /src/common/async/yield_context.h. In summary, optional_yield
-/// contains either a spawn::yield_context (in which case the current
+/// contains either a boost::asio::yield_context (in which case the current
/// coroutine is suspended until completion) or null_yield (in which
/// case the current thread is blocked until completion.)
///
const int max_aio = std::max(1, op_state.get_max_aio());
for (int i=0; i<max_aio; i++) {
- spawn::spawn(context, [&](spawn::yield_context yield) {
+ boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
while (true) {
int shard = next_shard;
next_shard += 1;
if (shard >= max_shards) {
return;
}
- optional_yield y(context, yield);
+ optional_yield y(yield);
uint64_t shard_count;
int r = ::check_index_olh(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
if (r < 0) {
" entries " << verb << ")" << dendl;
}
}
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
}
try {
int next_shard = 0;
boost::asio::io_context context;
for (int i=0; i<max_aio; i++) {
- spawn::spawn(context, [&](spawn::yield_context yield) {
+ boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
while (true) {
int shard = next_shard;
next_shard += 1;
return;
}
uint64_t shard_count;
- optional_yield y {context, yield};
- int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
+ int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, yield);
if (r < 0) {
ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard <<
" check_index_unlinked(): " << r << dendl;
" entries " << verb << ")" << dendl;
}
}
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
}
try {
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
-#include <spawn/spawn.hpp>
#include "include/function2.hpp"
#include "rgw_sal_rados.h"
#include "rgw_pubsub.h"
pending_tokens(0),
timer(io_context) {}
- void async_wait(spawn::yield_context yield) {
+ void async_wait(boost::asio::yield_context yield) {
if (pending_tokens == 0) {
return;
}
const cls_queue_entry& entry,
RGWPubSubEndpoint* const push_endpoint,
const rgw_pubsub_topic& topic,
- spawn::yield_context yield) {
+ boost::asio::yield_context yield) {
event_entry_t event_entry;
auto iter = entry.data.cbegin();
try {
<< " retry_number: "
<< entry_persistency_tracker.retires_num
<< " current time: " << time_now << dendl;
- const auto ret = push_endpoint->send(event_entry.event, optional_yield{io_context, yield});
+ const auto ret = push_endpoint->send(event_entry.event, yield);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
<< " failed. error: " << ret
}
// clean stale reservation from queue
- void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
+ void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
while (!shutdown) {
ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
const auto now = ceph::coarse_real_time::clock::now();
"" /*no tag*/);
cls_2pc_queue_expire_reservations(op, stale_time);
// check ownership and do reservation cleanup in one batch
- auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
+ auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, yield);
if (ret == -ENOENT) {
// queue was deleted
ldpp_dout(this, 5) << "INFO: queue: "
}
// unlock (lose ownership) queue
- int unlock_queue(const std::string& queue_name, spawn::yield_context yield) {
+ int unlock_queue(const std::string& queue_name, boost::asio::yield_context yield) {
librados::ObjectWriteOperation op;
op.assert_exists();
rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
- const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
if (ret == -ENOENT) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< ". was removed. nothing to unlock" << dendl;
int get_topic_info(const std::string& queue_name,
const cls_queue_entry& queue_entry,
rgw_pubsub_topic& topic,
- spawn::yield_context yield) {
+ boost::asio::yield_context yield) {
std::string queue_topic_tenant;
std::string queue_topic_name;
parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name);
rgw_pubsub_topic topic_info;
RGWPubSub ps(&rados_store, queue_topic_tenant, site);
- int ret = ps.get_topic(this, queue_topic_name, topic_info, optional_yield{io_context, yield}, nullptr);
+ int ret = ps.get_topic(this, queue_topic_name, topic_info, yield, nullptr);
if (ret < 0) {
ldpp_dout(this, 1) << "WARNING: failed to fetch topic: "
<< queue_topic_name << " error: " << ret
}
// processing of a specific queue
- void process_queue(const std::string& queue_name, spawn::yield_context yield) {
+ void process_queue(const std::string& queue_name, boost::asio::yield_context yield) {
constexpr auto max_elements = 1024;
auto is_idle = false;
const std::string start_marker;
// start a the cleanup coroutine for the queue
- spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) {
- cleanup_queue(queue_name, yield);
- }, make_stack_allocator());
+ boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+ [this, queue_name](boost::asio::yield_context yield) {
+ cleanup_queue(queue_name, yield);
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
CountersManager queue_counters_container(queue_name, this->get_cct());
"" /*no tag*/);
cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
// check ownership and list entries in one batch
- auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
+ auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, yield);
if (ret == -ENOENT) {
// queue was deleted
topics_persistency_tracker.erase(queue_name);
}
entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
- spawn::spawn(yield,[this, ¬ifs_persistency_tracker, &queue_name, entry_idx,
- total_entries, &end_marker, &remove_entries, &has_error, &waiter,
- &entry, &needs_migration_vector,
- push_endpoint = push_endpoint.get(),
- &topic_info](spawn::yield_context yield) {
+ boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
+ [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
+ &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector,
+ push_endpoint = push_endpoint.get(), &topic_info](boost::asio::yield_context yield) {
const auto token = waiter.make_token();
auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
auto result =
ldpp_dout(this, 20) << "INFO: processing of entry: " <<
entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
}
- }, make_stack_allocator());
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
++entry_idx;
}
"" /*no tag*/);
cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
// check ownership and deleted entries in one batch
- auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
if (ret == -ENOENT) {
// queue was deleted
ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
rgw_pubsub_topic topic;
auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
- optional_yield(io_context, yield), nullptr);
+ yield, nullptr);
if (ret_of_get_topic < 0) {
// we can't migrate entries without topic info
ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
buffer::list obl;
int rval;
cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval);
- ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC);
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl;
return;
}
cls_2pc_queue_commit(op, migration_vector, reservation_id);
- ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
reservation_id = cls_2pc_reservation::NO_ID;
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". error: " << ret << dendl;
// process all queues
// find which of the queues is owned by this daemon and process it
- void process_queues(spawn::yield_context yield) {
+ void process_queues(boost::asio::yield_context yield) {
auto has_error = false;
owned_queues_t owned_queues;
size_t processed_queue_count = 0;
timer.async_wait(yield[ec]);
queues_t queues;
- auto ret = read_queue_list(queues, optional_yield(io_context, yield));
+ auto ret = read_queue_list(queues, yield);
if (ret < 0) {
has_error = true;
continue;
failover_time,
LOCK_FLAG_MAY_RENEW);
- ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
if (ret == -EBUSY) {
// lock is already taken by another RGW
ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
if (owned_queues.insert(queue_name).second) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
// start processing this queue
- spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) {
+ boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+ [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](boost::asio::yield_context yield) {
++processed_queue_count;
process_queue(queue_name, yield);
// if queue processing ended, it means that the queue was removed or not owned anymore
queue_gc.push_back(queue_name);
--processed_queue_count;
ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
- }, make_stack_allocator());
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
} else {
ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
}
}
void init() {
- spawn::spawn(io_context, [this](spawn::yield_context yield) {
+ boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+ [this](boost::asio::yield_context yield) {
process_queues(yield);
- }, make_stack_allocator());
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
// start the worker threads to do the actual queue processing
const std::string WORKER_THREAD_NAME = "notif-worker";
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
#include "rgw_kafka.h"
#endif
-#include <boost/asio/yield.hpp>
+//#include <boost/asio/yield.hpp>
#include <boost/algorithm/string.hpp>
#include <functional>
#include "rgw_perf_counters.h"
boost::system::error_code ec;
auto yield = y.get_yield_context();
auto&& token = yield[ec];
- boost::asio::async_initiate<spawn::yield_context, Signature>(
+ boost::asio::async_initiate<boost::asio::yield_context, Signature>(
[this] (auto handler, auto ex) {
completion = Completion::create(ex, std::move(handler));
}, token, yield.get_executor());
using namespace rgw;
if (svc.site->is_meta_master() &&
all_zonegroups_support(*svc.site, zone_features::notification_v2)) {
- spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) {
+ boost::asio::spawn(v1_topic_migration, [this] (boost::asio::yield_context yield) {
DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "};
rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield);
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
+ // TODO: we run this on a separate thread so shutdown can cancel it with
+ // v1_topic_migration.stop(), but we could run it on the global thread
+ // pool and cancel spawn() with a cancellation_signal instead
v1_topic_migration.start(1);
}
}
}
if (y) {
- auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
- Waiter waiter(context);
+ Waiter waiter(yield.get_executor());
waiters.push_back(waiter);
lock.unlock();
ceph::condition_variable cond;
struct Waiter : boost::intrusive::list_base_hook<> {
- using Executor = boost::asio::io_context::executor_type;
+ using Executor = boost::asio::any_io_executor;
using Timer = boost::asio::basic_waitable_timer<Clock,
boost::asio::wait_traits<Clock>, Executor>;
Timer timer;
- explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
+ explicit Waiter(boost::asio::any_io_executor ex) : timer(ex) {}
};
boost::intrusive::list<Waiter> waiters;
// given a yield_context, call async_operate() to yield the coroutine instead
// of blocking
if (y) {
- auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
boost::system::error_code ec;
auto bl = librados::async_operate(
- context, ioctx, oid, op, flags, trace_info, yield[ec]);
+ yield, ioctx, oid, op, flags, trace_info, yield[ec]);
if (pbl) {
*pbl = std::move(bl);
}
int flags, const jspan_context* trace_info)
{
if (y) {
- auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
boost::system::error_code ec;
- librados::async_operate(context, ioctx, oid, op, flags, trace_info, yield[ec]);
+ librados::async_operate(yield, ioctx, oid, op, flags, trace_info, yield[ec]);
return -ec.value();
}
if (is_asio_thread) {
optional_yield y)
{
if (y) {
- auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
boost::system::error_code ec;
- auto reply = librados::async_notify(context, ioctx, oid,
+ auto reply = librados::async_notify(yield, ioctx, oid,
bl, timeout_ms, yield[ec]);
if (pbl) {
*pbl = std::move(reply);
int migrate(const DoutPrefixProvider* dpp,
rgw::sal::RadosStore* driver,
boost::asio::io_context& context,
- spawn::yield_context yield)
+ boost::asio::yield_context y)
{
- auto y = optional_yield{context, yield};
-
ldpp_dout(dpp, 1) << "starting v1 topic migration.." << dendl;
librados::Rados* rados = driver->getRados()->get_rados_handle();
#pragma once
#include <boost/asio/io_context.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
class DoutPrefixProvider;
namespace rgw::sal { class RadosStore; }
int migrate(const DoutPrefixProvider* dpp,
rgw::sal::RadosStore* driver,
boost::asio::io_context& context,
- spawn::yield_context yield);
+ boost::asio::yield_context yield);
} // rgwrados::topic_migration
template <typename Op>
Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op,
- boost::asio::io_context& context,
- spawn::yield_context yield, jspan_context* trace_ctx = nullptr) {
- return [ctx = std::move(ctx), op = std::move(op), &context, yield, trace_ctx] (Aio* aio, AioResult& r) mutable {
+ boost::asio::yield_context yield,
+ jspan_context* trace_ctx) {
+ return [ctx = std::move(ctx), op = std::move(op), yield, trace_ctx] (Aio* aio, AioResult& r) mutable {
// arrange for the completion Handler to run on the yield_context's strand
// executor so it can safely call back into Aio without locking
- using namespace boost::asio;
- async_completion<spawn::yield_context, void()> init(yield);
- auto ex = get_associated_executor(init.completion_handler);
+ auto ex = yield.get_executor();
- librados::async_operate(context, ctx, r.obj.oid, &op, 0, trace_ctx,
+ librados::async_operate(yield, ctx, r.obj.oid, &op, 0, trace_ctx,
bind_executor(ex, Handler{aio, ctx, r}));
};
}
ceph_assert(y);
auto c = std::make_unique<D3nL1CacheRequest>();
lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << r.obj.oid << dendl;
- c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), cache_location, read_ofs, read_len, aio, r);
+ c->file_aio_read_abstract(dpp, y.get_yield_context(), cache_location, read_ofs, read_len, aio, r);
};
}
static_assert(!std::is_const_v<Op>);
if (y) {
return aio_abstract(std::move(ctx), std::forward<Op>(op),
- y.get_io_context(), y.get_yield_context(), trace_ctx);
+ y.get_yield_context(), trace_ctx);
}
return aio_abstract(std::move(ctx), std::forward<Op>(op), trace_ctx);
}
// a throttle that yields the coroutine instead of blocking. all public
// functions must be called within the coroutine strand
class YieldingAioThrottle final : public Aio, private Throttle {
- boost::asio::io_context& context;
- spawn::yield_context yield;
+ boost::asio::yield_context yield;
struct Handler;
// completion callback associated with the waiter
struct Pending : AioResultEntry { uint64_t cost = 0; };
public:
- YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
- spawn::yield_context yield)
- : Throttle(window), context(context), yield(yield)
+ YieldingAioThrottle(uint64_t window, boost::asio::yield_context yield)
+ : Throttle(window), yield(yield)
{}
virtual ~YieldingAioThrottle() override {};
std::unique_ptr<Aio> aio;
if (y) {
aio = std::make_unique<YieldingAioThrottle>(window_size,
- y.get_io_context(),
y.get_yield_context());
} else {
aio = std::make_unique<BlockingAioThrottle>(window_size);
#include <atomic>
#include <ctime>
+#include <memory>
#include <vector>
#include <boost/asio/error.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
#include "common/async/shared_mutex.h"
#include "common/errno.h"
CephContext* const cct;
Stream& stream;
timeout_timer& timeout;
- spawn::yield_context yield;
+ boost::asio::yield_context yield;
parse_buffer& buffer;
boost::system::error_code fatal_ec;
public:
StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
- rgw::asio::parser_type& parser, spawn::yield_context yield,
+ rgw::asio::parser_type& parser, boost::asio::yield_context yield,
parse_buffer& buffer, bool is_ssl,
const tcp::endpoint& local_endpoint,
const tcp::endpoint& remote_endpoint)
rgw::dmclock::Scheduler *scheduler,
const std::string& uri_prefix,
boost::system::error_code& ec,
- spawn::yield_context yield)
+ boost::asio::yield_context yield)
{
// don't impose a limit on the body, since we read it in pieces
static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
RGWRestfulIO client(cct, &real_client_io);
optional_yield y = null_yield;
if (cct->_conf->rgw_beast_enable_async) {
- y = optional_yield{context, yield};
+ y = optional_yield{yield};
}
int http_ret = 0;
string user = "-";
// spawn a coroutine to handle the connection
#ifdef WITH_RADOSGW_BEAST_OPENSSL
if (l.use_ssl) {
- spawn::spawn(context,
- [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+ boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(),
+ [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable {
auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
auto c = connections.add(*conn);
// wrap the tcp stream in an ssl stream
}
conn->socket.shutdown(tcp::socket::shutdown_both, ec);
- }, make_stack_allocator());
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
} else {
#else
{
#endif // WITH_RADOSGW_BEAST_OPENSSL
- spawn::spawn(context,
- [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+ boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(),
+ [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable {
auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
auto c = connections.add(*conn);
auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
conn->buffer, false, pause_mutex, scheduler.get(),
uri_prefix, ec, yield);
conn->socket.shutdown(tcp::socket::shutdown_both, ec);
- }, make_stack_allocator());
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
}
}
#include <stdlib.h>
#include <aio.h>
+#include <boost/asio/spawn.hpp>
+
#include "include/rados/librados.hpp"
#include "include/Context.h"
#include "common/async/completion.h"
}
};
- void file_aio_read_abstract(const DoutPrefixProvider *dpp, spawn::yield_context yield,
+ void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::yield_context yield,
std::string& cache_location, off_t read_ofs, off_t read_len,
rgw::Aio* aio, rgw::AioResult& r) {
- using namespace boost::asio;
- async_completion<spawn::yield_context, void()> init(yield);
- auto ex = get_associated_executor(init.completion_handler);
-
+ auto ex = yield.get_executor();
ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << r.obj.oid << dendl;
async_read(dpp, ex, cache_location+"/"+url_encode(r.obj.oid, true), read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r}));
}
char* buf;
std::optional<boost::asio::deadline_timer> formatter_flush_cond;
if (y) {
- formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(y.get_io_context());
+ auto ex = y.get_yield_context().get_executor();
+ formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
}
buf = data.c_str();
return aio_count < max_aio;
});
aio_count++;
- spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key, &formatter_flush_cond] (spawn::yield_context yield) {
- handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }, &*formatter_flush_cond);
+ boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) {
+ handle_individual_object(obj_key, yield, &*formatter_flush_cond);
aio_count--;
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
} else {
handle_individual_object(obj_key, y, nullptr);
entry.pipe = pipe;
// fetch remote markers
- spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
- auto y = optional_yield{ioctx, yield};
+ boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) {
+ auto y = optional_yield{yield};
rgw_bucket_index_marker_info info;
int r = source_bilog_info(dpp, store->svc()->zone, entry.pipe,
info, entry.remote_markers, y);
throw std::system_error(-r, std::system_category());
}
entry.latest_gen = info.latest_gen;
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
// fetch source bucket info
- spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
- auto y = optional_yield{ioctx, yield};
+ boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) {
+ auto y = optional_yield{yield};
int r = store->getRados()->get_bucket_instance_info(
*entry.pipe.source.bucket, entry.source_bucket_info,
nullptr, nullptr, y, dpp);
<< cpp_strerror(r) << dendl;
throw std::system_error(-r, std::system_category());
}
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
}
${LUA_LIBRARIES}
${ALLOC_LIBS}
)
- target_link_libraries(ceph_test_librgw_file_nfsns spawn)
+ target_link_libraries(ceph_test_librgw_file_nfsns Boost::context)
install(TARGETS ceph_test_librgw_file_nfsns DESTINATION ${CMAKE_INSTALL_BINDIR})
# ceph_test_librgw_file_aw (nfs write transaction [atomic write] tests)
${LUA_LIBRARIES}
${ALLOC_LIBS}
)
- target_link_libraries(ceph_test_librgw_file_marker spawn)
+ target_link_libraries(ceph_test_librgw_file_marker Boost::context)
install(TARGETS ceph_test_librgw_file_marker DESTINATION ${CMAKE_INSTALL_BINDIR})
# ceph_test_librgw_file_xattr (attribute ops)
${LUA_LIBRARIES}
${ALLOC_LIBS}
)
-target_link_libraries(ceph_test_librgw_file_xattr spawn)
+target_link_libraries(ceph_test_librgw_file_xattr Boost::context)
# ceph_test_librgw_file_rename (mv/rename tests)
add_executable(ceph_test_librgw_file_rename
add_executable(ceph_test_rados_api_asio asio.cc)
target_link_libraries(ceph_test_rados_api_asio global
- librados ${UNITTEST_LIBS} spawn)
+ librados ${UNITTEST_LIBS} Boost::context)
add_executable(ceph_test_rados_api_list
list.cc
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
target_link_libraries(ceph_test_rados_api_tier_pp
librados global ${UNITTEST_LIBS} Boost::system radostest-cxx cls_cas_internal
- cls_cas_client spawn)
+ cls_cas_client Boost::context)
add_executable(ceph_test_rados_api_snapshots
snapshots.cc)
#include <boost/range/begin.hpp>
#include <boost/range/end.hpp>
-#include <spawn/spawn.hpp>
#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>
#define dout_subsys ceph_subsys_rados
librados::IoCtx AsioRados::io;
librados::IoCtx AsioRados::snapio;
+void rethrow(std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+}
+
TEST_F(AsioRados, AsyncReadCallback)
{
boost::asio::io_context service;
{
boost::asio::io_context service;
- auto success_cr = [&] (spawn::yield_context yield) {
+ auto success_cr = [&] (boost::asio::yield_context yield) {
boost::system::error_code ec;
auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]);
EXPECT_FALSE(ec);
EXPECT_EQ("hello", bl.to_str());
};
- spawn::spawn(service, success_cr);
+ boost::asio::spawn(service, success_cr, rethrow);
- auto failure_cr = [&] (spawn::yield_context yield) {
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
boost::system::error_code ec;
auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]);
EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
};
- spawn::spawn(service, failure_cr);
+ boost::asio::spawn(service, failure_cr, rethrow);
service.run();
}
bufferlist bl;
bl.append("hello");
- auto success_cr = [&] (spawn::yield_context yield) {
+ auto success_cr = [&] (boost::asio::yield_context yield) {
boost::system::error_code ec;
librados::async_write(service, io, "exist", bl, bl.length(), 0,
yield[ec]);
EXPECT_FALSE(ec);
EXPECT_EQ("hello", bl.to_str());
};
- spawn::spawn(service, success_cr);
+ boost::asio::spawn(service, success_cr, rethrow);
- auto failure_cr = [&] (spawn::yield_context yield) {
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
boost::system::error_code ec;
librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
yield[ec]);
EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
};
- spawn::spawn(service, failure_cr);
+ boost::asio::spawn(service, failure_cr, rethrow);
service.run();
}
{
boost::asio::io_context service;
- auto success_cr = [&] (spawn::yield_context yield) {
+ auto success_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
boost::system::error_code ec;
EXPECT_FALSE(ec);
EXPECT_EQ("hello", bl.to_str());
};
- spawn::spawn(service, success_cr);
+ boost::asio::spawn(service, success_cr, rethrow);
- auto failure_cr = [&] (spawn::yield_context yield) {
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectReadOperation op;
op.read(0, 0, nullptr, nullptr);
boost::system::error_code ec;
yield[ec]);
EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
};
- spawn::spawn(service, failure_cr);
+ boost::asio::spawn(service, failure_cr, rethrow);
service.run();
}
bufferlist bl;
bl.append("hello");
- auto success_cr = [&] (spawn::yield_context yield) {
+ auto success_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectWriteOperation op;
op.write_full(bl);
boost::system::error_code ec;
librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]);
EXPECT_FALSE(ec);
};
- spawn::spawn(service, success_cr);
+ boost::asio::spawn(service, success_cr, rethrow);
- auto failure_cr = [&] (spawn::yield_context yield) {
+ auto failure_cr = [&] (boost::asio::yield_context yield) {
librados::ObjectWriteOperation op;
op.write_full(bl);
boost::system::error_code ec;
librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]);
EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
};
- spawn::spawn(service, failure_cr);
+ boost::asio::spawn(service, failure_cr, rethrow);
service.run();
}
${UNITTEST_LIBS}
${EXTRALIBS}
)
- target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE spawn)
install(TARGETS ceph_test_rgw_d4n_directory DESTINATION ${CMAKE_INSTALL_BINDIR})
endif()
${UNITTEST_LIBS}
${EXTRALIBS}
)
- target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE spawn)
+ target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE Boost::context)
install(TARGETS ceph_test_rgw_d4n_filter DESTINATION ${CMAKE_INSTALL_BINDIR})
endif()
#include <boost/asio/io_context.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/steady_timer.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
#include <chrono>
#include <mutex>
#include <unordered_map>
std::shared_ptr<std::vector<client_info>> ds = std::make_shared<std::vector<client_info>>(std::vector<client_info>());
std::string method[2] = {"PUT", "GET"};
-void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx)
+void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, boost::asio::yield_context& yield, boost::asio::io_context& ioctx)
{
auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
boost::asio::steady_timer timer(ioctx);
it.accepted++;
return false;
}
-void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
+void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, boost::asio::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
{
for (;;)
{
auto& it = ds->emplace_back(client_info());
it.tenant = tenant;
int x = ds->size() - 1;
- spawn::spawn(context,
- [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx)
+ boost::asio::spawn(context,
+ [&to_run ,x, ratelimit, info, params, &context](boost::asio::yield_context ctx)
{
auto& it = ds.get()->operator[](x);
simulate_client(it, info, ratelimit, params, ctx, to_run, context);
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
}
}
#include "rgw_dmclock_async_scheduler.h"
#include <optional>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
#include <gtest/gtest.h>
#include "acconfig.h"
#include "global/global_context.h"
{
boost::asio::io_context context;
- spawn::spawn(context, [&] (spawn::yield_context yield) {
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
ClientCounters counters(g_ceph_context);
AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
[] (client_id client) -> ClientInfo* {
auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]);
EXPECT_EQ(boost::system::errc::success, ec2);
EXPECT_EQ(PhaseType::priority, p2);
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
context.run_for(std::chrono::milliseconds(50));
*/
#include "rgw_reshard.h"
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
#include <gtest/gtest.h>
short_waiter.stop();
}
+void rethrow(std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+}
+
TEST(ReshardWait, wait_yield)
{
constexpr ceph::timespan wait_duration = 50ms;
RGWReshardWait waiter(wait_duration);
boost::asio::io_context context;
- spawn::spawn(context, [&] (spawn::yield_context yield) {
- EXPECT_EQ(0, waiter.wait(optional_yield{context, yield}));
- });
+ boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+ EXPECT_EQ(0, waiter.wait(yield));
+ }, rethrow);
const auto start = Clock::now();
EXPECT_EQ(1u, context.poll()); // spawn
RGWReshardWait short_waiter(short_duration);
boost::asio::io_context context;
- spawn::spawn(context,
- [&] (spawn::yield_context yield) {
- EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
- });
+ boost::asio::spawn(context,
+ [&] (boost::asio::yield_context yield) {
+ EXPECT_EQ(-ECANCELED, long_waiter.wait(yield));
+ }, rethrow);
const auto start = Clock::now();
EXPECT_EQ(1u, context.poll()); // spawn
// spawn 4 coroutines
boost::asio::io_context context;
{
- auto async_waiter = [&] (spawn::yield_context yield) {
- EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
+ auto async_waiter = [&] (boost::asio::yield_context yield) {
+ EXPECT_EQ(-ECANCELED, long_waiter.wait(yield));
};
- spawn::spawn(context, async_waiter);
- spawn::spawn(context, async_waiter);
- spawn::spawn(context, async_waiter);
- spawn::spawn(context, async_waiter);
+ boost::asio::spawn(context, async_waiter, rethrow);
+ boost::asio::spawn(context, async_waiter, rethrow);
+ boost::asio::spawn(context, async_waiter, rethrow);
+ boost::asio::spawn(context, async_waiter, rethrow);
}
const auto start = Clock::now();
#include <boost/asio/error.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
#include "include/scope_guard.h"
-#include <spawn/spawn.hpp>
#include <gtest/gtest.h>
static rgw_raw_obj make_obj(const std::string& oid)
auto obj = make_obj(__PRETTY_FUNCTION__);
boost::asio::io_context context;
- spawn::spawn(context,
- [&] (spawn::yield_context yield) {
- YieldingAioThrottle throttle(4, context, yield);
+ boost::asio::spawn(context,
+ [&] (boost::asio::yield_context yield) {
+ YieldingAioThrottle throttle(4, yield);
scoped_completion op;
auto c = throttle.get(obj, wait_on(op), 8, 0);
ASSERT_EQ(1u, c.size());
EXPECT_EQ(-EDEADLK, c.front().result);
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
context.run();
}
uint64_t outstanding = 0;
boost::asio::io_context context;
- spawn::spawn(context,
- [&] (spawn::yield_context yield) {
- YieldingAioThrottle throttle(window, context, yield);
+ boost::asio::spawn(context,
+ [&] (boost::asio::yield_context yield) {
+ YieldingAioThrottle throttle(window, yield);
for (uint64_t i = 0; i < total; i++) {
using namespace std::chrono_literals;
auto c = throttle.get(obj, wait_for(context, 10ms), 1, 0);
}
auto c = throttle.drain();
outstanding -= c.size();
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
});
context.poll(); // run until we block
EXPECT_EQ(window, outstanding);