From cdbe2c1bb4731395423ab77f3edd2ee4bd053148 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 14 Feb 2024 22:53:47 -0500 Subject: [PATCH] rgw: switch back to boost::asio for spawn() and yield_context a fork of boost::asio::spawn() was introduced in 2020 with spawn::spawn() from #31580. this fork enabled rgw to customize how the coroutine stacks are allocated in order to avoid stack overflows in frontend request coroutines. this customization was based on a StackAllocator concept from the boost::context library in boost 1.80, that same StackAllocator overload was added to boost::asio::spawn(), along with other improvements like per-op cancellation. now that boost has everything we need, switch back and drop the spawn submodule this required switching a lot of async functions from async_completion<> to async_initiate<>. similar changes were necessary to enable the c++20 coroutine token boost::asio::use_awaitable Signed-off-by: Casey Bodley (cherry picked from commit 9dd892e289b32a90b24d55ab8e1b7d7601af21ca) --- src/cls/CMakeLists.txt | 15 ++--- src/common/async/yield_context.h | 22 ++----- src/crypto/isa-l/CMakeLists.txt | 2 +- src/crypto/openssl/CMakeLists.txt | 2 +- src/crypto/qat/CMakeLists.txt | 2 +- src/crypto/qat/qcccrypto.cc | 4 +- src/rgw/CMakeLists.txt | 9 +-- src/rgw/driver/dbstore/CMakeLists.txt | 2 +- src/rgw/driver/rados/cls_fifo_legacy.h | 2 +- src/rgw/driver/rados/rgw_bucket.cc | 13 ++-- src/rgw/driver/rados/rgw_notify.cc | 74 ++++++++++++---------- src/rgw/driver/rados/rgw_pubsub_push.cc | 4 +- src/rgw/driver/rados/rgw_rados.cc | 7 +- src/rgw/driver/rados/rgw_reshard.cc | 3 +- src/rgw/driver/rados/rgw_reshard.h | 4 +- src/rgw/driver/rados/rgw_tools.cc | 9 +-- src/rgw/driver/rados/topic_migration.cc | 4 +- src/rgw/driver/rados/topic_migration.h | 4 +- src/rgw/rgw_aio.cc | 16 ++--- src/rgw/rgw_aio_throttle.h | 9 +-- src/rgw/rgw_asio_frontend.cc | 27 ++++---- src/rgw/rgw_d3n_cacherequest.h | 9 ++- src/rgw/rgw_op.cc | 9 ++- src/rgw/rgw_sync_checkpoint.cc | 12 ++-- src/test/CMakeLists.txt | 6 +- src/test/librados/CMakeLists.txt | 4 +- src/test/librados/asio.cc | 38 ++++++----- src/test/rgw/CMakeLists.txt | 3 +- src/test/rgw/bench_rgw_ratelimit.cc | 12 ++-- src/test/rgw/test_rgw_dmclock_scheduler.cc | 6 +- src/test/rgw/test_rgw_reshard_wait.cc | 32 ++++++---- src/test/rgw/test_rgw_throttle.cc | 18 ++++-- 32 files changed, 202 insertions(+), 181 deletions(-) diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt index af2249adeae..8bfeadd529e 100644 --- a/src/cls/CMakeLists.txt +++ b/src/cls/CMakeLists.txt @@ -76,8 +76,7 @@ if (WITH_RADOSGW) target_link_libraries(cls_otp OATH::OATH) target_include_directories(cls_otp PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") set_target_properties(cls_otp PROPERTIES VERSION "1.0.0" SOVERSION "1" @@ -204,8 +203,7 @@ if (WITH_RADOSGW) target_link_libraries(cls_rgw ${FMT_LIB} json_spirit) target_include_directories(cls_rgw PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") set_target_properties(cls_rgw PROPERTIES VERSION "1.0.0" SOVERSION "1" @@ -220,8 +218,7 @@ if (WITH_RADOSGW) add_library(cls_rgw_client STATIC ${cls_rgw_client_srcs}) target_include_directories(cls_rgw_client PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC 
"${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") endif (WITH_RADOSGW) @@ -313,8 +310,7 @@ if (WITH_RADOSGW) add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs}) target_include_directories(cls_rgw_gc PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") set_target_properties(cls_rgw_gc PROPERTIES VERSION "1.0.0" SOVERSION "1" @@ -328,8 +324,7 @@ if (WITH_RADOSGW) add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs}) target_include_directories(cls_rgw_gc_client PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") endif (WITH_RADOSGW) diff --git a/src/common/async/yield_context.h b/src/common/async/yield_context.h index baa028fa1b4..fd9a20901aa 100644 --- a/src/common/async/yield_context.h +++ b/src/common/async/yield_context.h @@ -17,23 +17,18 @@ #include #include #include +#include #include "acconfig.h" -#include - -/// optional-like wrapper for a spawn::yield_context and its associated -/// boost::asio::io_context. operations that take an optional_yield argument -/// will, when passed a non-empty yield context, suspend this coroutine instead -/// of the blocking the thread of execution +/// optional-like wrapper for a boost::asio::yield_context. operations that take +/// an optional_yield argument will, when passed a non-empty yield context, +/// suspend this coroutine instead of the blocking the thread of execution class optional_yield { - boost::asio::io_context *c = nullptr; - spawn::yield_context *y = nullptr; + boost::asio::yield_context *y = nullptr; public: /// construct with a valid io and yield_context - explicit optional_yield(boost::asio::io_context& c, - spawn::yield_context& y) noexcept - : c(&c), y(&y) {} + optional_yield(boost::asio::yield_context& y) noexcept : y(&y) {} /// type tag to construct an empty object struct empty_t {}; @@ -42,11 +37,8 @@ class optional_yield { /// implicit conversion to bool, returns true if non-empty operator bool() const noexcept { return y; } - /// return a reference to the associated io_context. only valid if non-empty - boost::asio::io_context& get_io_context() const noexcept { return *c; } - /// return a reference to the yield_context. 
only valid if non-empty - spawn::yield_context& get_yield_context() const noexcept { return *y; } + boost::asio::yield_context& get_yield_context() const noexcept { return *y; } }; // type tag object to construct an empty optional_yield diff --git a/src/crypto/isa-l/CMakeLists.txt b/src/crypto/isa-l/CMakeLists.txt index c8d832247d9..40da7e495c3 100644 --- a/src/crypto/isa-l/CMakeLists.txt +++ b/src/crypto/isa-l/CMakeLists.txt @@ -30,7 +30,7 @@ endif(HAVE_NASM_X64) add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs}) target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include) -target_link_libraries(ceph_crypto_isal PRIVATE spawn) +target_link_libraries(ceph_crypto_isal PRIVATE Boost::context) set_target_properties(ceph_crypto_isal PROPERTIES VERSION 1.0.0 diff --git a/src/crypto/openssl/CMakeLists.txt b/src/crypto/openssl/CMakeLists.txt index 5365ab9a6ca..ac9d8689396 100644 --- a/src/crypto/openssl/CMakeLists.txt +++ b/src/crypto/openssl/CMakeLists.txt @@ -8,7 +8,7 @@ add_library(ceph_crypto_openssl SHARED ${openssl_crypto_plugin_srcs}) target_link_libraries(ceph_crypto_openssl PRIVATE OpenSSL::Crypto $<$:ceph-common> - spawn) + Boost::context) target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR}) add_dependencies(crypto_plugins ceph_crypto_openssl) set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "") diff --git a/src/crypto/qat/CMakeLists.txt b/src/crypto/qat/CMakeLists.txt index 04bc0b7e7f4..85f7ff50e13 100644 --- a/src/crypto/qat/CMakeLists.txt +++ b/src/crypto/qat/CMakeLists.txt @@ -14,7 +14,7 @@ add_dependencies(crypto_plugins ceph_crypto_qat) target_link_libraries(ceph_crypto_qat PRIVATE QAT::qat QAT::usdm - spawn) + Boost::context) add_dependencies(crypto_plugins ceph_crypto_qat) set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1) diff --git a/src/crypto/qat/qcccrypto.cc b/src/crypto/qat/qcccrypto.cc index d441e2cd616..94c98518a90 100644 --- a/src/crypto/qat/qcccrypto.cc +++ b/src/crypto/qat/qcccrypto.cc @@ -331,7 +331,7 @@ bool QccCrypto::perform_op_batch(unsigned char* out, const unsigned char* in, si int avail_inst = NON_INSTANCE; if (y) { - spawn::yield_context yield = y.get_yield_context(); + boost::asio::yield_context yield = y.get_yield_context(); avail_inst = async_get_instance(yield); } else { auto result = async_get_instance(boost::asio::use_future); @@ -546,7 +546,7 @@ bool QccCrypto::symPerformOp(int avail_inst, do { poll_retry_num = RETRY_MAX_NUM; if (y) { - spawn::yield_context yield = y.get_yield_context(); + boost::asio::yield_context yield = y.get_yield_context(); status = helper.async_perform_op(std::span(pOpDataVec), yield); } else { auto result = helper.async_perform_op(std::span(pOpDataVec), boost::asio::use_future); diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index f97b12e81da..b090b2eeeb8 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -297,8 +297,9 @@ target_link_libraries(rgw_common PUBLIC ${LUA_LIBRARIES} RapidJSON::RapidJSON - spawn - ${FMT_LIB}) + Boost::context + ${FMT_LIB} + OpenSSL::SSL) target_include_directories(rgw_common PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/services" PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" @@ -432,7 +433,7 @@ target_link_libraries(rgw_a OATH::OATH PUBLIC rgw_common - spawn) + Boost::context) if(WITH_CURL_OPENSSL) # used by rgw_http_client_curl.cc @@ -448,7 +449,7 @@ set(rgw_schedulers_srcs add_library(rgw_schedulers STATIC ${rgw_schedulers_srcs}) target_link_libraries(rgw_schedulers - PUBLIC dmclock::dmclock 
spawn) + PUBLIC dmclock::dmclock Boost::context) set(radosgw_srcs rgw_main.cc) diff --git a/src/rgw/driver/dbstore/CMakeLists.txt b/src/rgw/driver/dbstore/CMakeLists.txt index a3aca7a64e4..f401c912f67 100644 --- a/src/rgw/driver/dbstore/CMakeLists.txt +++ b/src/rgw/driver/dbstore/CMakeLists.txt @@ -29,7 +29,7 @@ target_include_directories(dbstore_lib PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/store/rados" PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}") -set(link_targets spawn) +set(link_targets Boost::context) if(WITH_JAEGER) list(APPEND link_targets jaeger_base) endif() diff --git a/src/rgw/driver/rados/cls_fifo_legacy.h b/src/rgw/driver/rados/cls_fifo_legacy.h index ed23129eb30..85e8f539975 100644 --- a/src/rgw/driver/rados/cls_fifo_legacy.h +++ b/src/rgw/driver/rados/cls_fifo_legacy.h @@ -89,7 +89,7 @@ using part_info = fifo::part_header; /// /// This library uses optional_yield. Please see /// /src/common/async/yield_context.h. In summary, optional_yield -/// contains either a spawn::yield_context (in which case the current +/// contains either a boost::asio::yield_context (in which case the current /// coroutine is suspended until completion) or null_yield (in which /// case the current thread is blocked until completion.) /// diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc index c51e61a2755..b661b564244 100644 --- a/src/rgw/driver/rados/rgw_bucket.cc +++ b/src/rgw/driver/rados/rgw_bucket.cc @@ -588,14 +588,14 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store, const int max_aio = std::max(1, op_state.get_max_aio()); for (int i=0; i= max_shards) { return; } - optional_yield y(context, yield); + optional_yield y(yield); uint64_t shard_count; int r = ::check_index_olh(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y); if (r < 0) { @@ -608,6 +608,8 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store, " entries " << verb << ")" << dendl; } } + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } try { @@ -797,7 +799,7 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store, int next_shard = 0; boost::asio::io_context context; for (int i=0; i #include #include +#include #include -#include #include "include/function2.hpp" #include "rgw_sal_rados.h" #include "rgw_pubsub.h" @@ -168,7 +168,7 @@ private: pending_tokens(0), timer(io_context) {} - void async_wait(spawn::yield_context yield) { + void async_wait(boost::asio::yield_context yield) { if (pending_tokens == 0) { return; } @@ -196,7 +196,7 @@ private: const cls_queue_entry& entry, RGWPubSubEndpoint* const push_endpoint, const rgw_pubsub_topic& topic, - spawn::yield_context yield) { + boost::asio::yield_context yield) { event_entry_t event_entry; auto iter = entry.data.cbegin(); try { @@ -246,7 +246,7 @@ private: << " retry_number: " << entry_persistency_tracker.retires_num << " current time: " << time_now << dendl; - const auto ret = push_endpoint->send(event_entry.event, optional_yield{io_context, yield}); + const auto ret = push_endpoint->send(event_entry.event, yield); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker << " failed. 
error: " << ret @@ -262,7 +262,7 @@ private: } // clean stale reservation from queue - void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) { + void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) { while (!shutdown) { ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl; const auto now = ceph::coarse_real_time::clock::now(); @@ -275,7 +275,7 @@ private: "" /*no tag*/); cls_2pc_queue_expire_reservations(op, stale_time); // check ownership and do reservation cleanup in one batch - auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield)); + auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, yield); if (ret == -ENOENT) { // queue was deleted ldpp_dout(this, 5) << "INFO: queue: " @@ -299,12 +299,12 @@ private: } // unlock (lose ownership) queue - int unlock_queue(const std::string& queue_name, spawn::yield_context yield) { + int unlock_queue(const std::string& queue_name, boost::asio::yield_context yield) { librados::ObjectWriteOperation op; op.assert_exists(); rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie); auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx(); - const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); if (ret == -ENOENT) { ldpp_dout(this, 10) << "INFO: queue: " << queue_name << ". was removed. nothing to unlock" << dendl; @@ -321,13 +321,13 @@ private: int get_topic_info(const std::string& queue_name, const cls_queue_entry& queue_entry, rgw_pubsub_topic& topic, - spawn::yield_context yield) { + boost::asio::yield_context yield) { std::string queue_topic_tenant; std::string queue_topic_name; parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name); rgw_pubsub_topic topic_info; RGWPubSub ps(&rados_store, queue_topic_tenant, site); - int ret = ps.get_topic(this, queue_topic_name, topic_info, optional_yield{io_context, yield}, nullptr); + int ret = ps.get_topic(this, queue_topic_name, topic_info, yield, nullptr); if (ret < 0) { ldpp_dout(this, 1) << "WARNING: failed to fetch topic: " << queue_topic_name << " error: " << ret @@ -355,15 +355,18 @@ private: } // processing of a specific queue - void process_queue(const std::string& queue_name, spawn::yield_context yield) { + void process_queue(const std::string& queue_name, boost::asio::yield_context yield) { constexpr auto max_elements = 1024; auto is_idle = false; const std::string start_marker; // start a the cleanup coroutine for the queue - spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) { - cleanup_queue(queue_name, yield); - }, make_stack_allocator()); + boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(), + [this, queue_name](boost::asio::yield_context yield) { + cleanup_queue(queue_name, yield); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); CountersManager queue_counters_container(queue_name, this->get_cct()); @@ -394,7 +397,7 @@ private: "" /*no tag*/); cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval); // check ownership and list entries in one batch - auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield)); + auto ret = rgw_rados_operate(this, 
rados_ioctx, queue_name, &op, nullptr, yield); if (ret == -ENOENT) { // queue was deleted topics_persistency_tracker.erase(queue_name); @@ -459,11 +462,10 @@ private: } entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name]; - spawn::spawn(yield,[this, ¬ifs_persistency_tracker, &queue_name, entry_idx, - total_entries, &end_marker, &remove_entries, &has_error, &waiter, - &entry, &needs_migration_vector, - push_endpoint = push_endpoint.get(), - &topic_info](spawn::yield_context yield) { + boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(), + [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, + &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector, + push_endpoint = push_endpoint.get(), &topic_info](boost::asio::yield_context yield) { const auto token = waiter.make_token(); auto& persistency_tracker = notifs_persistency_tracker[entry.marker]; auto result = @@ -487,7 +489,9 @@ private: ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl; } - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); ++entry_idx; } @@ -519,7 +523,7 @@ private: "" /*no tag*/); cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove); // check ownership and deleted entries in one batch - auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); if (ret == -ENOENT) { // queue was deleted ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl; @@ -547,7 +551,7 @@ private: rgw_pubsub_topic topic; auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, - optional_yield(io_context, yield), nullptr); + yield, nullptr); if (ret_of_get_topic < 0) { // we can't migrate entries without topic info ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: " @@ -579,7 +583,7 @@ private: buffer::list obl; int rval; cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval); - ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC); + ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield, librados::OPERATION_RETURNVEC); if (ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl; return; @@ -591,7 +595,7 @@ private: } cls_2pc_queue_commit(op, migration_vector, reservation_id); - ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); reservation_id = cls_2pc_reservation::NO_ID; if (ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". 
error: " << ret << dendl; @@ -618,7 +622,7 @@ private: // process all queues // find which of the queues is owned by this daemon and process it - void process_queues(spawn::yield_context yield) { + void process_queues(boost::asio::yield_context yield) { auto has_error = false; owned_queues_t owned_queues; size_t processed_queue_count = 0; @@ -646,7 +650,7 @@ private: timer.async_wait(yield[ec]); queues_t queues; - auto ret = read_queue_list(queues, optional_yield(io_context, yield)); + auto ret = read_queue_list(queues, yield); if (ret < 0) { has_error = true; continue; @@ -665,7 +669,7 @@ private: failover_time, LOCK_FLAG_MAY_RENEW); - ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); if (ret == -EBUSY) { // lock is already taken by another RGW ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl; @@ -687,7 +691,8 @@ private: if (owned_queues.insert(queue_name).second) { ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl; // start processing this queue - spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) { + boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(), + [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](boost::asio::yield_context yield) { ++processed_queue_count; process_queue(queue_name, yield); // if queue processing ended, it means that the queue was removed or not owned anymore @@ -703,7 +708,9 @@ private: queue_gc.push_back(queue_name); --processed_queue_count; ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl; - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); } else { ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl; } @@ -741,9 +748,12 @@ public: } void init() { - spawn::spawn(io_context, [this](spawn::yield_context yield) { + boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(), + [this](boost::asio::yield_context yield) { process_queues(yield); - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); // start the worker threads to do the actual queue processing const std::string WORKER_THREAD_NAME = "notif-worker"; diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index b73da6b42d4..69eb5c9ff32 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -19,7 +19,7 @@ #ifdef WITH_RADOSGW_KAFKA_ENDPOINT #include "rgw_kafka.h" #endif -#include +//#include #include #include #include "rgw_perf_counters.h" @@ -153,7 +153,7 @@ public: boost::system::error_code ec; auto yield = y.get_yield_context(); auto&& token = yield[ec]; - boost::asio::async_initiate( + boost::asio::async_initiate( [this] (auto handler, auto ex) { completion = Completion::create(ex, std::move(handler)); }, token, yield.get_executor()); diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 4ad5ce96e10..e93ee1e83a2 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -1363,10 +1363,15 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y) using namespace rgw; if 
(svc.site->is_meta_master() && all_zonegroups_support(*svc.site, zone_features::notification_v2)) { - spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) { + boost::asio::spawn(v1_topic_migration, [this] (boost::asio::yield_context yield) { DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "}; rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); + // TODO: we run this on a separate thread so shutdown can cancel it with + // v1_topic_migration.stop(), but we could run it on the global thread + // pool and cancel spawn() with a cancellation_signal instead v1_topic_migration.start(1); } } diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index c1011bd60a5..d590bff7fcf 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -1153,10 +1153,9 @@ int RGWReshardWait::wait(optional_yield y) } if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); - Waiter waiter(context); + Waiter waiter(yield.get_executor()); waiters.push_back(waiter); lock.unlock(); diff --git a/src/rgw/driver/rados/rgw_reshard.h b/src/rgw/driver/rados/rgw_reshard.h index 0497414566a..8e37defa1db 100644 --- a/src/rgw/driver/rados/rgw_reshard.h +++ b/src/rgw/driver/rados/rgw_reshard.h @@ -252,11 +252,11 @@ class RGWReshardWait { ceph::condition_variable cond; struct Waiter : boost::intrusive::list_base_hook<> { - using Executor = boost::asio::io_context::executor_type; + using Executor = boost::asio::any_io_executor; using Timer = boost::asio::basic_waitable_timer, Executor>; Timer timer; - explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {} + explicit Waiter(boost::asio::any_io_executor ex) : timer(ex) {} }; boost::intrusive::list waiters; diff --git a/src/rgw/driver/rados/rgw_tools.cc b/src/rgw/driver/rados/rgw_tools.cc index 41254f9519e..eec4a799115 100644 --- a/src/rgw/driver/rados/rgw_tools.cc +++ b/src/rgw/driver/rados/rgw_tools.cc @@ -203,11 +203,10 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con // given a yield_context, call async_operate() to yield the coroutine instead // of blocking if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; auto bl = librados::async_operate( - context, ioctx, oid, op, flags, trace_info, yield[ec]); + yield, ioctx, oid, op, flags, trace_info, yield[ec]); if (pbl) { *pbl = std::move(bl); } @@ -228,10 +227,9 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con int flags, const jspan_context* trace_info) { if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; - librados::async_operate(context, ioctx, oid, op, flags, trace_info, yield[ec]); + librados::async_operate(yield, ioctx, oid, op, flags, trace_info, yield[ec]); return -ec.value(); } if (is_asio_thread) { @@ -248,10 +246,9 @@ int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, cons optional_yield y) { if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; - auto reply = librados::async_notify(context, ioctx, oid, + auto reply = librados::async_notify(yield, ioctx, oid, bl, timeout_ms, yield[ec]); if (pbl) { *pbl = std::move(reply); diff --git a/src/rgw/driver/rados/topic_migration.cc b/src/rgw/driver/rados/topic_migration.cc index 
c7dcfc37b44..0d4238f8431 100644 --- a/src/rgw/driver/rados/topic_migration.cc +++ b/src/rgw/driver/rados/topic_migration.cc @@ -263,10 +263,8 @@ int migrate_topics(const DoutPrefixProvider* dpp, optional_yield y, int migrate(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* driver, boost::asio::io_context& context, - spawn::yield_context yield) + boost::asio::yield_context y) { - auto y = optional_yield{context, yield}; - ldpp_dout(dpp, 1) << "starting v1 topic migration.." << dendl; librados::Rados* rados = driver->getRados()->get_rados_handle(); diff --git a/src/rgw/driver/rados/topic_migration.h b/src/rgw/driver/rados/topic_migration.h index 9545fd63c2e..2bd2b730c85 100644 --- a/src/rgw/driver/rados/topic_migration.h +++ b/src/rgw/driver/rados/topic_migration.h @@ -16,7 +16,7 @@ #pragma once #include -#include +#include class DoutPrefixProvider; namespace rgw::sal { class RadosStore; } @@ -29,6 +29,6 @@ namespace rgwrados::topic_migration { int migrate(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* driver, boost::asio::io_context& context, - spawn::yield_context yield); + boost::asio::yield_context yield); } // rgwrados::topic_migration diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc index 1c9a54f0726..37636dd130b 100644 --- a/src/rgw/rgw_aio.cc +++ b/src/rgw/rgw_aio.cc @@ -89,16 +89,14 @@ struct Handler { template Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, - boost::asio::io_context& context, - spawn::yield_context yield, jspan_context* trace_ctx = nullptr) { - return [ctx = std::move(ctx), op = std::move(op), &context, yield, trace_ctx] (Aio* aio, AioResult& r) mutable { + boost::asio::yield_context yield, + jspan_context* trace_ctx) { + return [ctx = std::move(ctx), op = std::move(op), yield, trace_ctx] (Aio* aio, AioResult& r) mutable { // arrange for the completion Handler to run on the yield_context's strand // executor so it can safely call back into Aio without locking - using namespace boost::asio; - async_completion init(yield); - auto ex = get_associated_executor(init.completion_handler); + auto ex = yield.get_executor(); - librados::async_operate(context, ctx, r.obj.oid, &op, 0, trace_ctx, + librados::async_operate(yield, ctx, r.obj.oid, &op, 0, trace_ctx, bind_executor(ex, Handler{aio, ctx, r})); }; } @@ -110,7 +108,7 @@ Aio::OpFunc d3n_cache_aio_abstract(const DoutPrefixProvider *dpp, optional_yield ceph_assert(y); auto c = std::make_unique(); lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << r.obj.oid << dendl; - c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), cache_location, read_ofs, read_len, aio, r); + c->file_aio_read_abstract(dpp, y.get_yield_context(), cache_location, read_ofs, read_len, aio, r); }; } @@ -122,7 +120,7 @@ Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, optional_yield y, jspan_c static_assert(!std::is_const_v); if (y) { return aio_abstract(std::move(ctx), std::forward(op), - y.get_io_context(), y.get_yield_context(), trace_ctx); + y.get_yield_context(), trace_ctx); } return aio_abstract(std::move(ctx), std::forward(op), trace_ctx); } diff --git a/src/rgw/rgw_aio_throttle.h b/src/rgw/rgw_aio_throttle.h index c0656ef225e..87fc980a94c 100644 --- a/src/rgw/rgw_aio_throttle.h +++ b/src/rgw/rgw_aio_throttle.h @@ -80,8 +80,7 @@ class BlockingAioThrottle final : public Aio, private Throttle { // a throttle that yields the coroutine instead of blocking. 
all public // functions must be called within the coroutine strand class YieldingAioThrottle final : public Aio, private Throttle { - boost::asio::io_context& context; - spawn::yield_context yield; + boost::asio::yield_context yield; struct Handler; // completion callback associated with the waiter @@ -94,9 +93,8 @@ class YieldingAioThrottle final : public Aio, private Throttle { struct Pending : AioResultEntry { uint64_t cost = 0; }; public: - YieldingAioThrottle(uint64_t window, boost::asio::io_context& context, - spawn::yield_context yield) - : Throttle(window), context(context), yield(yield) + YieldingAioThrottle(uint64_t window, boost::asio::yield_context yield) + : Throttle(window), yield(yield) {} virtual ~YieldingAioThrottle() override {}; @@ -119,7 +117,6 @@ inline auto make_throttle(uint64_t window_size, optional_yield y) std::unique_ptr aio; if (y) { aio = std::make_unique(window_size, - y.get_io_context(), y.get_yield_context()); } else { aio = std::make_unique(window_size); diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index 8c64570287d..ace3b7aff49 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -16,7 +17,7 @@ #include #include -#include +#include #include "common/async/shared_mutex.h" #include "common/errno.h" @@ -69,12 +70,12 @@ class StreamIO : public rgw::asio::ClientIO { CephContext* const cct; Stream& stream; timeout_timer& timeout; - spawn::yield_context yield; + boost::asio::yield_context yield; parse_buffer& buffer; boost::system::error_code fatal_ec; public: StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout, - rgw::asio::parser_type& parser, spawn::yield_context yield, + rgw::asio::parser_type& parser, boost::asio::yield_context yield, parse_buffer& buffer, bool is_ssl, const tcp::endpoint& local_endpoint, const tcp::endpoint& remote_endpoint) @@ -200,7 +201,7 @@ void handle_connection(boost::asio::io_context& context, rgw::dmclock::Scheduler *scheduler, const std::string& uri_prefix, boost::system::error_code& ec, - spawn::yield_context yield) + boost::asio::yield_context yield) { // don't impose a limit on the body, since we read it in pieces static constexpr size_t body_limit = std::numeric_limits::max(); @@ -281,7 +282,7 @@ void handle_connection(boost::asio::io_context& context, RGWRestfulIO client(cct, &real_client_io); optional_yield y = null_yield; if (cct->_conf->rgw_beast_enable_async) { - y = optional_yield{context, yield}; + y = optional_yield{yield}; } int http_ret = 0; string user = "-"; @@ -1021,8 +1022,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) // spawn a coroutine to handle the connection #ifdef WITH_RADOSGW_BEAST_OPENSSL if (l.use_ssl) { - spawn::spawn(context, - [this, s=std::move(stream)] (spawn::yield_context yield) mutable { + boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(), + [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable { auto conn = boost::intrusive_ptr{new Connection(std::move(s))}; auto c = connections.add(*conn); // wrap the tcp stream in an ssl stream @@ -1049,13 +1050,15 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) } conn->socket.shutdown(tcp::socket::shutdown_both, ec); - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); } else { #else { #endif // WITH_RADOSGW_BEAST_OPENSSL - spawn::spawn(context, - [this, 
s=std::move(stream)] (spawn::yield_context yield) mutable { + boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(), + [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable { auto conn = boost::intrusive_ptr{new Connection(std::move(s))}; auto c = connections.add(*conn); auto timeout = timeout_timer{context.get_executor(), request_timeout, conn}; @@ -1064,7 +1067,9 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) conn->buffer, false, pause_mutex, scheduler.get(), uri_prefix, ec, yield); conn->socket.shutdown(tcp::socket::shutdown_both, ec); - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); } } diff --git a/src/rgw/rgw_d3n_cacherequest.h b/src/rgw/rgw_d3n_cacherequest.h index 0519c6def3e..54b495f5461 100644 --- a/src/rgw/rgw_d3n_cacherequest.h +++ b/src/rgw/rgw_d3n_cacherequest.h @@ -7,6 +7,8 @@ #include #include +#include + #include "include/rados/librados.hpp" #include "include/Context.h" #include "common/async/completion.h" @@ -135,13 +137,10 @@ struct D3nL1CacheRequest { } }; - void file_aio_read_abstract(const DoutPrefixProvider *dpp, spawn::yield_context yield, + void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::yield_context yield, std::string& cache_location, off_t read_ofs, off_t read_len, rgw::Aio* aio, rgw::AioResult& r) { - using namespace boost::asio; - async_completion init(yield); - auto ex = get_associated_executor(init.completion_handler); - + auto ex = yield.get_executor(); ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << r.obj.oid << dendl; async_read(dpp, ex, cache_location+"/"+url_encode(r.obj.oid, true), read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r})); } diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 24796ea1aa4..9ad4672718d 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -6788,7 +6788,8 @@ void RGWDeleteMultiObj::execute(optional_yield y) char* buf; std::optional formatter_flush_cond; if (y) { - formatter_flush_cond = std::make_optional(y.get_io_context()); + auto ex = y.get_yield_context().get_executor(); + formatter_flush_cond = std::make_optional(ex); } buf = data.c_str(); @@ -6856,9 +6857,11 @@ void RGWDeleteMultiObj::execute(optional_yield y) return aio_count < max_aio; }); aio_count++; - spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key, &formatter_flush_cond] (spawn::yield_context yield) { - handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }, &*formatter_flush_cond); + boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) { + handle_individual_object(obj_key, yield, &*formatter_flush_cond); aio_count--; + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } else { handle_individual_object(obj_key, y, nullptr); diff --git a/src/rgw/rgw_sync_checkpoint.cc b/src/rgw/rgw_sync_checkpoint.cc index 1394a712a94..1172e79a48f 100644 --- a/src/rgw/rgw_sync_checkpoint.cc +++ b/src/rgw/rgw_sync_checkpoint.cc @@ -226,8 +226,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, entry.pipe = pipe; // fetch remote markers - spawn::spawn(ioctx, [&] (spawn::yield_context yield) { - auto y = optional_yield{ioctx, yield}; + boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) { + auto y = optional_yield{yield}; rgw_bucket_index_marker_info info; int r = source_bilog_info(dpp, store->svc()->zone, 
entry.pipe, info, entry.remote_markers, y); @@ -237,10 +237,12 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, throw std::system_error(-r, std::system_category()); } entry.latest_gen = info.latest_gen; + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); // fetch source bucket info - spawn::spawn(ioctx, [&] (spawn::yield_context yield) { - auto y = optional_yield{ioctx, yield}; + boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) { + auto y = optional_yield{yield}; int r = store->getRados()->get_bucket_instance_info( *entry.pipe.source.bucket, entry.source_bucket_info, nullptr, nullptr, y, dpp); @@ -249,6 +251,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, << cpp_strerror(r) << dendl; throw std::system_error(-r, std::system_category()); } + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index a0c8fcfe823..2e756eeb583 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -340,7 +340,7 @@ target_link_libraries(ceph_test_librgw_file_nfsns ${LUA_LIBRARIES} ${ALLOC_LIBS} ) - target_link_libraries(ceph_test_librgw_file_nfsns spawn) + target_link_libraries(ceph_test_librgw_file_nfsns Boost::context) install(TARGETS ceph_test_librgw_file_nfsns DESTINATION ${CMAKE_INSTALL_BINDIR}) # ceph_test_librgw_file_aw (nfs write transaction [atomic write] tests) @@ -374,7 +374,7 @@ target_link_libraries(ceph_test_librgw_file_marker ${LUA_LIBRARIES} ${ALLOC_LIBS} ) - target_link_libraries(ceph_test_librgw_file_marker spawn) + target_link_libraries(ceph_test_librgw_file_marker Boost::context) install(TARGETS ceph_test_librgw_file_marker DESTINATION ${CMAKE_INSTALL_BINDIR}) # ceph_test_librgw_file_xattr (attribute ops) @@ -393,7 +393,7 @@ target_link_libraries(ceph_test_librgw_file_xattr ${LUA_LIBRARIES} ${ALLOC_LIBS} ) -target_link_libraries(ceph_test_librgw_file_xattr spawn) +target_link_libraries(ceph_test_librgw_file_xattr Boost::context) # ceph_test_librgw_file_rename (mv/rename tests) add_executable(ceph_test_librgw_file_rename diff --git a/src/test/librados/CMakeLists.txt b/src/test/librados/CMakeLists.txt index 5d5623f06c8..b83f1abf5bc 100644 --- a/src/test/librados/CMakeLists.txt +++ b/src/test/librados/CMakeLists.txt @@ -60,7 +60,7 @@ target_link_libraries(ceph_test_rados_api_aio_pp add_executable(ceph_test_rados_api_asio asio.cc) target_link_libraries(ceph_test_rados_api_asio global - librados ${UNITTEST_LIBS} spawn) + librados ${UNITTEST_LIBS} Boost::context) add_executable(ceph_test_rados_api_list list.cc @@ -133,7 +133,7 @@ target_include_directories(ceph_test_rados_api_tier_pp PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") target_link_libraries(ceph_test_rados_api_tier_pp librados global ${UNITTEST_LIBS} Boost::system radostest-cxx cls_cas_internal - cls_cas_client spawn) + cls_cas_client Boost::context) add_executable(ceph_test_rados_api_snapshots snapshots.cc) diff --git a/src/test/librados/asio.cc b/src/test/librados/asio.cc index 0ede2e14fb4..9f8844eb7bb 100644 --- a/src/test/librados/asio.cc +++ b/src/test/librados/asio.cc @@ -21,8 +21,8 @@ #include #include -#include #include +#include #include #define dout_subsys ceph_subsys_rados @@ -73,6 +73,10 @@ librados::Rados AsioRados::rados; librados::IoCtx AsioRados::io; librados::IoCtx AsioRados::snapio; +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST_F(AsioRados, AsyncReadCallback) { boost::asio::io_context service; @@ 
-113,20 +117,20 @@ TEST_F(AsioRados, AsyncReadYield) { boost::asio::io_context service; - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]); EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]); EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); }; - spawn::spawn(service, failure_cr); + boost::asio::spawn(service, failure_cr, rethrow); service.run(); } @@ -178,22 +182,22 @@ TEST_F(AsioRados, AsyncWriteYield) bufferlist bl; bl.append("hello"); - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; librados::async_write(service, io, "exist", bl, bl.length(), 0, yield[ec]); EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; librados::async_write(service, snapio, "exist", bl, bl.length(), 0, yield[ec]); EXPECT_EQ(boost::system::errc::read_only_file_system, ec); }; - spawn::spawn(service, failure_cr); + boost::asio::spawn(service, failure_cr, rethrow); service.run(); } @@ -251,7 +255,7 @@ TEST_F(AsioRados, AsyncReadOperationYield) { boost::asio::io_context service; - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); boost::system::error_code ec; @@ -260,9 +264,9 @@ TEST_F(AsioRados, AsyncReadOperationYield) EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); boost::system::error_code ec; @@ -270,7 +274,7 @@ TEST_F(AsioRados, AsyncReadOperationYield) yield[ec]); EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); }; - spawn::spawn(service, failure_cr); + boost::asio::spawn(service, failure_cr, rethrow); service.run(); } @@ -335,23 +339,23 @@ TEST_F(AsioRados, AsyncWriteOperationYield) bufferlist bl; bl.append("hello"); - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { librados::ObjectWriteOperation op; op.write_full(bl); boost::system::error_code ec; librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]); EXPECT_FALSE(ec); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { librados::ObjectWriteOperation op; op.write_full(bl); boost::system::error_code ec; librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]); EXPECT_EQ(boost::system::errc::read_only_file_system, ec); }; - spawn::spawn(service, failure_cr); + 
boost::asio::spawn(service, failure_cr, rethrow); service.run(); } diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index dec7ea1c149..8e4235a1af8 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -35,7 +35,6 @@ target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE ${UNITTEST_LIBS} ${EXTRALIBS} ) - target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE spawn) install(TARGETS ceph_test_rgw_d4n_directory DESTINATION ${CMAKE_INSTALL_BINDIR}) endif() @@ -54,7 +53,7 @@ target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE ${UNITTEST_LIBS} ${EXTRALIBS} ) - target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE spawn) + target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE Boost::context) install(TARGETS ceph_test_rgw_d4n_filter DESTINATION ${CMAKE_INSTALL_BINDIR}) endif() diff --git a/src/test/rgw/bench_rgw_ratelimit.cc b/src/test/rgw/bench_rgw_ratelimit.cc index 1ea8714f9df..529d8f739fd 100644 --- a/src/test/rgw/bench_rgw_ratelimit.cc +++ b/src/test/rgw/bench_rgw_ratelimit.cc @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -37,7 +37,7 @@ struct parameters { std::shared_ptr> ds = std::make_shared>(std::vector()); std::string method[2] = {"PUT", "GET"}; -void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx) +void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr ratelimit, const parameters& params, boost::asio::yield_context& yield, boost::asio::io_context& ioctx) { auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); boost::asio::steady_timer timer(ioctx); @@ -101,7 +101,7 @@ bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared it.accepted++; return false; } -void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx) +void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, boost::asio::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx) { for (;;) { @@ -130,11 +130,13 @@ void simulate_clients(boost::asio::io_context& context, std::string tenant, cons auto& it = ds->emplace_back(client_info()); it.tenant = tenant; int x = ds->size() - 1; - spawn::spawn(context, - [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx) + boost::asio::spawn(context, + [&to_run ,x, ratelimit, info, params, &context](boost::asio::yield_context ctx) { auto& it = ds.get()->operator[](x); simulate_client(it, info, ratelimit, params, ctx, to_run, context); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } } diff --git a/src/test/rgw/test_rgw_dmclock_scheduler.cc b/src/test/rgw/test_rgw_dmclock_scheduler.cc index c9b4a853fd4..da748dfa6c9 100644 --- a/src/test/rgw/test_rgw_dmclock_scheduler.cc +++ b/src/test/rgw/test_rgw_dmclock_scheduler.cc @@ -18,7 +18,7 @@ #include "rgw_dmclock_async_scheduler.h" #include -#include +#include #include #include "acconfig.h" #include "global/global_context.h" @@ -400,7 +400,7 @@ TEST(Queue, SpawnAsyncRequest) { boost::asio::io_context context; - spawn::spawn(context, [&] (spawn::yield_context yield) { + boost::asio::spawn(context, [&] (boost::asio::yield_context yield) { ClientCounters counters(g_ceph_context); 
AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr, [] (client_id client) -> ClientInfo* { @@ -419,6 +419,8 @@ TEST(Queue, SpawnAsyncRequest) auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]); EXPECT_EQ(boost::system::errc::success, ec2); EXPECT_EQ(PhaseType::priority, p2); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); context.run_for(std::chrono::milliseconds(50)); diff --git a/src/test/rgw/test_rgw_reshard_wait.cc b/src/test/rgw/test_rgw_reshard_wait.cc index 98b2aa235b9..058828b956c 100644 --- a/src/test/rgw/test_rgw_reshard_wait.cc +++ b/src/test/rgw/test_rgw_reshard_wait.cc @@ -13,7 +13,7 @@ */ #include "rgw_reshard.h" -#include +#include #include @@ -58,15 +58,19 @@ TEST(ReshardWait, stop_block) short_waiter.stop(); } +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST(ReshardWait, wait_yield) { constexpr ceph::timespan wait_duration = 50ms; RGWReshardWait waiter(wait_duration); boost::asio::io_context context; - spawn::spawn(context, [&] (spawn::yield_context yield) { - EXPECT_EQ(0, waiter.wait(optional_yield{context, yield})); - }); + boost::asio::spawn(context, [&] (boost::asio::yield_context yield) { + EXPECT_EQ(0, waiter.wait(yield)); + }, rethrow); const auto start = Clock::now(); EXPECT_EQ(1u, context.poll()); // spawn @@ -89,10 +93,10 @@ TEST(ReshardWait, stop_yield) RGWReshardWait short_waiter(short_duration); boost::asio::io_context context; - spawn::spawn(context, - [&] (spawn::yield_context yield) { - EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield})); - }); + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + EXPECT_EQ(-ECANCELED, long_waiter.wait(yield)); + }, rethrow); const auto start = Clock::now(); EXPECT_EQ(1u, context.poll()); // spawn @@ -133,13 +137,13 @@ TEST(ReshardWait, stop_multiple) // spawn 4 coroutines boost::asio::io_context context; { - auto async_waiter = [&] (spawn::yield_context yield) { - EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield})); + auto async_waiter = [&] (boost::asio::yield_context yield) { + EXPECT_EQ(-ECANCELED, long_waiter.wait(yield)); }; - spawn::spawn(context, async_waiter); - spawn::spawn(context, async_waiter); - spawn::spawn(context, async_waiter); - spawn::spawn(context, async_waiter); + boost::asio::spawn(context, async_waiter, rethrow); + boost::asio::spawn(context, async_waiter, rethrow); + boost::asio::spawn(context, async_waiter, rethrow); + boost::asio::spawn(context, async_waiter, rethrow); } const auto start = Clock::now(); diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc index e5df9f84efa..18dd8f3ffbc 100644 --- a/src/test/rgw/test_rgw_throttle.cc +++ b/src/test/rgw/test_rgw_throttle.cc @@ -20,9 +20,9 @@ #include #include #include +#include #include "include/scope_guard.h" -#include #include static rgw_raw_obj make_obj(const std::string& oid) @@ -143,13 +143,15 @@ TEST(Aio_Throttle, YieldCostOverWindow) auto obj = make_obj(__PRETTY_FUNCTION__); boost::asio::io_context context; - spawn::spawn(context, - [&] (spawn::yield_context yield) { - YieldingAioThrottle throttle(4, context, yield); + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + YieldingAioThrottle throttle(4, yield); scoped_completion op; auto c = throttle.get(obj, wait_on(op), 8, 0); ASSERT_EQ(1u, c.size()); EXPECT_EQ(-EDEADLK, c.front().result); + }, [] (std::exception_ptr eptr) { + if (eptr) 
std::rethrow_exception(eptr); }); context.run(); } @@ -166,9 +168,9 @@ TEST(Aio_Throttle, YieldingThrottleOverMax) uint64_t outstanding = 0; boost::asio::io_context context; - spawn::spawn(context, - [&] (spawn::yield_context yield) { - YieldingAioThrottle throttle(window, context, yield); + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + YieldingAioThrottle throttle(window, yield); for (uint64_t i = 0; i < total; i++) { using namespace std::chrono_literals; auto c = throttle.get(obj, wait_for(context, 10ms), 1, 0); @@ -180,6 +182,8 @@ TEST(Aio_Throttle, YieldingThrottleOverMax) } auto c = throttle.drain(); outstanding -= c.size(); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); context.poll(); // run until we block EXPECT_EQ(window, outstanding); -- 2.39.5
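
[Not part of the patch] A minimal standalone sketch of the spawn pattern this commit adopts throughout rgw: boost::asio::spawn() with the StackAllocator overload added in boost 1.80, plus a completion handler that rethrows any exception escaping the coroutine. It substitutes boost::context::protected_fixedsize_stack for rgw's make_stack_allocator(); the timer body and the stack size are illustrative assumptions, not code from the patch.

#include <chrono>
#include <exception>
#include <iostream>
#include <memory>   // std::allocator_arg

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>

// build note: link with Boost.Context (e.g. -lboost_context)
int main()
{
  boost::asio::io_context ctx;

  // stand-in for rgw's make_stack_allocator(); since boost 1.80,
  // boost::asio::spawn() accepts any boost::context StackAllocator
  auto stack = boost::context::protected_fixedsize_stack(512 * 1024);

  boost::asio::spawn(boost::asio::make_strand(ctx), std::allocator_arg, stack,
      [&ctx] (boost::asio::yield_context yield) {
        // suspends this stackful coroutine instead of blocking the thread
        boost::asio::steady_timer timer(ctx, std::chrono::milliseconds(10));
        boost::system::error_code ec;
        timer.async_wait(yield[ec]);
        std::cout << "timer completed: " << ec.message() << std::endl;
      },
      // completion handler: surface any exception that escaped the coroutine,
      // mirroring the rethrow handlers added throughout this patch
      [] (std::exception_ptr eptr) {
        if (eptr) std::rethrow_exception(eptr);
      });

  ctx.run();
}

Because the underlying async functions are now initiated through async_initiate<>, the same operations also accept other completion tokens, for example boost::asio::use_future (as in qcccrypto.cc above) or the C++20 token boost::asio::use_awaitable mentioned in the commit message.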