From 9dd892e289b32a90b24d55ab8e1b7d7601af21ca Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 14 Feb 2024 22:53:47 -0500 Subject: [PATCH] rgw: switch back to boost::asio for spawn() and yield_context a fork of boost::asio::spawn() was introduced in 2020 with spawn::spawn() from #31580. this fork enabled rgw to customize how the coroutine stacks are allocated in order to avoid stack overflows in frontend request coroutines. this customization was based on a StackAllocator concept from the boost::context library in boost 1.80, that same StackAllocator overload was added to boost::asio::spawn(), along with other improvements like per-op cancellation. now that boost has everything we need, switch back and drop the spawn submodule this required switching a lot of async functions from async_completion<> to async_initiate<>. similar changes were necessary to enable the c++20 coroutine token boost::asio::use_awaitable Signed-off-by: Casey Bodley --- src/cls/CMakeLists.txt | 15 +-- src/common/async/yield_context.h | 22 ++--- src/crypto/isa-l/CMakeLists.txt | 2 +- src/crypto/openssl/CMakeLists.txt | 2 +- src/crypto/qat/CMakeLists.txt | 2 +- src/crypto/qat/qcccrypto.cc | 4 +- src/rgw/CMakeLists.txt | 6 +- src/rgw/driver/d4n/d4n_directory.cc | 2 +- src/rgw/driver/d4n/d4n_policy.cc | 7 +- src/rgw/driver/dbstore/CMakeLists.txt | 2 +- src/rgw/driver/rados/cls_fifo_legacy.h | 2 +- src/rgw/driver/rados/rgw_bucket.cc | 21 ++-- src/rgw/driver/rados/rgw_notify.cc | 66 +++++++------ src/rgw/driver/rados/rgw_pubsub_push.cc | 4 +- src/rgw/driver/rados/rgw_rados.cc | 7 +- src/rgw/driver/rados/rgw_reshard.cc | 3 +- src/rgw/driver/rados/rgw_reshard.h | 4 +- src/rgw/driver/rados/rgw_tools.cc | 9 +- src/rgw/driver/rados/topic_migration.cc | 4 +- src/rgw/driver/rados/topic_migration.h | 4 +- src/rgw/rgw_aio.cc | 16 ++- src/rgw/rgw_aio_throttle.h | 9 +- src/rgw/rgw_asio_frontend.cc | 27 +++--- src/rgw/rgw_d3n_cacherequest.h | 9 +- src/rgw/rgw_op.cc | 9 +- src/rgw/rgw_redis_driver.cc | 14 ++- src/rgw/rgw_ssd_driver.cc | 15 ++- src/rgw/rgw_sync_checkpoint.cc | 12 ++- src/test/CMakeLists.txt | 6 +- src/test/librados/CMakeLists.txt | 4 +- src/test/librados/asio.cc | 38 ++++---- src/test/rgw/CMakeLists.txt | 4 - src/test/rgw/bench_rgw_ratelimit.cc | 12 ++- src/test/rgw/test_d4n_directory.cc | 94 +++++++++--------- src/test/rgw/test_d4n_policy.cc | 40 ++++---- src/test/rgw/test_redis_driver.cc | 108 +++++++++++---------- src/test/rgw/test_rgw_dmclock_scheduler.cc | 6 +- src/test/rgw/test_rgw_reshard_wait.cc | 32 +++--- src/test/rgw/test_rgw_throttle.cc | 18 ++-- src/test/rgw/test_ssd_driver.cc | 94 +++++++++--------- 40 files changed, 393 insertions(+), 362 deletions(-) diff --git a/src/cls/CMakeLists.txt b/src/cls/CMakeLists.txt index 08590a43989c2..953ac83195f26 100644 --- a/src/cls/CMakeLists.txt +++ b/src/cls/CMakeLists.txt @@ -76,8 +76,7 @@ if (WITH_RADOSGW) target_link_libraries(cls_otp OATH::OATH) target_include_directories(cls_otp PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") set_target_properties(cls_otp PROPERTIES VERSION "1.0.0" SOVERSION "1" @@ -204,8 +203,7 @@ if (WITH_RADOSGW) target_link_libraries(cls_rgw ${FMT_LIB} json_spirit) target_include_directories(cls_rgw PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") set_target_properties(cls_rgw PROPERTIES VERSION "1.0.0" SOVERSION "1" @@ -220,8 +218,7 @@ if (WITH_RADOSGW) add_library(cls_rgw_client STATIC ${cls_rgw_client_srcs}) target_include_directories(cls_rgw_client PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") endif (WITH_RADOSGW) @@ -313,8 +310,7 @@ if (WITH_RADOSGW) add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs}) target_include_directories(cls_rgw_gc PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") set_target_properties(cls_rgw_gc PROPERTIES VERSION "1.0.0" SOVERSION "1" @@ -328,8 +324,7 @@ if (WITH_RADOSGW) add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs}) target_include_directories(cls_rgw_gc_client PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados" - PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include") + PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") endif (WITH_RADOSGW) diff --git a/src/common/async/yield_context.h b/src/common/async/yield_context.h index baa028fa1b4af..fd9a20901aa56 100644 --- a/src/common/async/yield_context.h +++ b/src/common/async/yield_context.h @@ -17,23 +17,18 @@ #include #include #include +#include #include "acconfig.h" -#include - -/// optional-like wrapper for a spawn::yield_context and its associated -/// boost::asio::io_context. operations that take an optional_yield argument -/// will, when passed a non-empty yield context, suspend this coroutine instead -/// of the blocking the thread of execution +/// optional-like wrapper for a boost::asio::yield_context. operations that take +/// an optional_yield argument will, when passed a non-empty yield context, +/// suspend this coroutine instead of the blocking the thread of execution class optional_yield { - boost::asio::io_context *c = nullptr; - spawn::yield_context *y = nullptr; + boost::asio::yield_context *y = nullptr; public: /// construct with a valid io and yield_context - explicit optional_yield(boost::asio::io_context& c, - spawn::yield_context& y) noexcept - : c(&c), y(&y) {} + optional_yield(boost::asio::yield_context& y) noexcept : y(&y) {} /// type tag to construct an empty object struct empty_t {}; @@ -42,11 +37,8 @@ class optional_yield { /// implicit conversion to bool, returns true if non-empty operator bool() const noexcept { return y; } - /// return a reference to the associated io_context. only valid if non-empty - boost::asio::io_context& get_io_context() const noexcept { return *c; } - /// return a reference to the yield_context. only valid if non-empty - spawn::yield_context& get_yield_context() const noexcept { return *y; } + boost::asio::yield_context& get_yield_context() const noexcept { return *y; } }; // type tag object to construct an empty optional_yield diff --git a/src/crypto/isa-l/CMakeLists.txt b/src/crypto/isa-l/CMakeLists.txt index c8d832247d92e..40da7e495c374 100644 --- a/src/crypto/isa-l/CMakeLists.txt +++ b/src/crypto/isa-l/CMakeLists.txt @@ -30,7 +30,7 @@ endif(HAVE_NASM_X64) add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs}) target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include) -target_link_libraries(ceph_crypto_isal PRIVATE spawn) +target_link_libraries(ceph_crypto_isal PRIVATE Boost::context) set_target_properties(ceph_crypto_isal PROPERTIES VERSION 1.0.0 diff --git a/src/crypto/openssl/CMakeLists.txt b/src/crypto/openssl/CMakeLists.txt index 5365ab9a6ca22..ac9d868939657 100644 --- a/src/crypto/openssl/CMakeLists.txt +++ b/src/crypto/openssl/CMakeLists.txt @@ -8,7 +8,7 @@ add_library(ceph_crypto_openssl SHARED ${openssl_crypto_plugin_srcs}) target_link_libraries(ceph_crypto_openssl PRIVATE OpenSSL::Crypto $<$:ceph-common> - spawn) + Boost::context) target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR}) add_dependencies(crypto_plugins ceph_crypto_openssl) set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "") diff --git a/src/crypto/qat/CMakeLists.txt b/src/crypto/qat/CMakeLists.txt index 04bc0b7e7f430..85f7ff50e1343 100644 --- a/src/crypto/qat/CMakeLists.txt +++ b/src/crypto/qat/CMakeLists.txt @@ -14,7 +14,7 @@ add_dependencies(crypto_plugins ceph_crypto_qat) target_link_libraries(ceph_crypto_qat PRIVATE QAT::qat QAT::usdm - spawn) + Boost::context) add_dependencies(crypto_plugins ceph_crypto_qat) set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1) diff --git a/src/crypto/qat/qcccrypto.cc b/src/crypto/qat/qcccrypto.cc index d441e2cd6163e..94c98518a90b6 100644 --- a/src/crypto/qat/qcccrypto.cc +++ b/src/crypto/qat/qcccrypto.cc @@ -331,7 +331,7 @@ bool QccCrypto::perform_op_batch(unsigned char* out, const unsigned char* in, si int avail_inst = NON_INSTANCE; if (y) { - spawn::yield_context yield = y.get_yield_context(); + boost::asio::yield_context yield = y.get_yield_context(); avail_inst = async_get_instance(yield); } else { auto result = async_get_instance(boost::asio::use_future); @@ -546,7 +546,7 @@ bool QccCrypto::symPerformOp(int avail_inst, do { poll_retry_num = RETRY_MAX_NUM; if (y) { - spawn::yield_context yield = y.get_yield_context(); + boost::asio::yield_context yield = y.get_yield_context(); status = helper.async_perform_op(std::span(pOpDataVec), yield); } else { auto result = helper.async_perform_op(std::span(pOpDataVec), boost::asio::use_future); diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index cf214b39c957e..728dc6dba217b 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -301,7 +301,7 @@ target_link_libraries(rgw_common PUBLIC ${LUA_LIBRARIES} RapidJSON::RapidJSON - spawn + Boost::context ${FMT_LIB} OpenSSL::SSL) target_include_directories(rgw_common @@ -433,7 +433,7 @@ target_link_libraries(rgw_a OATH::OATH PUBLIC rgw_common - spawn) + Boost::context) if(WITH_CURL_OPENSSL) # used by rgw_http_client_curl.cc @@ -449,7 +449,7 @@ set(rgw_schedulers_srcs add_library(rgw_schedulers STATIC ${rgw_schedulers_srcs}) target_link_libraries(rgw_schedulers - PUBLIC dmclock::dmclock spawn) + PUBLIC dmclock::dmclock Boost::context) set(radosgw_srcs rgw_main.cc) diff --git a/src/rgw/driver/d4n/d4n_directory.cc b/src/rgw/driver/d4n/d4n_directory.cc index 2e9f9ad80cf8e..35b34a2c1d9ae 100644 --- a/src/rgw/driver/d4n/d4n_directory.cc +++ b/src/rgw/driver/d4n/d4n_directory.cc @@ -17,7 +17,7 @@ struct initiate_exec { { auto h = boost::asio::consign(std::move(handler), conn); return boost::asio::dispatch(get_executor(), - [c = conn, &req, &resp, h = std::move(h)] { + [c = conn, &req, &resp, h = std::move(h)] () mutable { return c->async_exec(req, resp, std::move(h)); }); } diff --git a/src/rgw/driver/d4n/d4n_policy.cc b/src/rgw/driver/d4n/d4n_policy.cc index 9e2fa358f81bd..b42228078d0d9 100644 --- a/src/rgw/driver/d4n/d4n_policy.cc +++ b/src/rgw/driver/d4n/d4n_policy.cc @@ -17,9 +17,10 @@ struct initiate_exec { void operator()(Handler handler, const boost::redis::request& req, Response& resp) { auto h = asio::consign(std::move(handler), conn); - return asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] { - c->async_exec(req, resp, std::move(h)); - }); + return asio::dispatch(get_executor(), + [c=conn, &req, &resp, h=std::move(h)] () mutable { + c->async_exec(req, resp, std::move(h)); + }); } }; diff --git a/src/rgw/driver/dbstore/CMakeLists.txt b/src/rgw/driver/dbstore/CMakeLists.txt index a3aca7a64e4cc..f401c912f6751 100644 --- a/src/rgw/driver/dbstore/CMakeLists.txt +++ b/src/rgw/driver/dbstore/CMakeLists.txt @@ -29,7 +29,7 @@ target_include_directories(dbstore_lib PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/store/rados" PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}") -set(link_targets spawn) +set(link_targets Boost::context) if(WITH_JAEGER) list(APPEND link_targets jaeger_base) endif() diff --git a/src/rgw/driver/rados/cls_fifo_legacy.h b/src/rgw/driver/rados/cls_fifo_legacy.h index ed23129eb3045..85e8f53997536 100644 --- a/src/rgw/driver/rados/cls_fifo_legacy.h +++ b/src/rgw/driver/rados/cls_fifo_legacy.h @@ -89,7 +89,7 @@ using part_info = fifo::part_header; /// /// This library uses optional_yield. Please see /// /src/common/async/yield_context.h. In summary, optional_yield -/// contains either a spawn::yield_context (in which case the current +/// contains either a boost::asio::yield_context (in which case the current /// coroutine is suspended until completion) or null_yield (in which /// case the current thread is blocked until completion.) /// diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc index 7cf6fbd56f1a2..ff6325e222a7d 100644 --- a/src/rgw/driver/rados/rgw_bucket.cc +++ b/src/rgw/driver/rados/rgw_bucket.cc @@ -513,17 +513,15 @@ int RGWBucket::check_bad_index_multipart(rgw::sal::RadosStore* const rados_store const int max_aio = std::max(1, op_state.get_max_aio()); int any_error = 0; // first error encountered if any for (int i = 0; i < max_aio; i++) { - spawn::spawn(context, [&](spawn::yield_context yield) { + boost::asio::spawn(context, [&](boost::asio::yield_context yield) { while (true) { const int shard = next_shard++; if (shard >= num_shards) { return; } - optional_yield y(context, yield); - int r = ::check_bad_index_multipart(rados_store, &*bucket, dpp, - op_state, flusher, shard, y); + op_state, flusher, shard, yield); if (r < 0) { ldpp_dout(dpp, -1) << "WARNING: error processing shard " << shard << " check_bad_index_multipart(): " << r << "; skipping" << dendl; @@ -533,6 +531,8 @@ int RGWBucket::check_bad_index_multipart(rgw::sal::RadosStore* const rados_store } } } // while + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } // for @@ -738,14 +738,14 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store, const int max_aio = std::max(1, op_state.get_max_aio()); for (int i=0; i= max_shards) { return; } - optional_yield y(context, yield); + optional_yield y(yield); uint64_t shard_count; int r = ::check_index_olh(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y); if (r < 0) { @@ -758,6 +758,8 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store, " entries " << verb << ")" << dendl; } } + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } try { @@ -947,7 +949,7 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store, int next_shard = 0; boost::asio::io_context context; for (int i=0; i #include #include +#include #include -#include #include "include/function2.hpp" #include "rgw_sal_rados.h" #include "rgw_pubsub.h" @@ -218,7 +218,7 @@ private: pending_tokens(0), timer(io_context) {} - void async_wait(spawn::yield_context yield) { + void async_wait(boost::asio::yield_context yield) { if (pending_tokens == 0) { return; } @@ -241,7 +241,7 @@ private: // processing of a specific entry // return whether processing was successful (true) or not (false) EntryProcessingResult process_entry(const ConfigProxy& conf, persistency_tracker& entry_persistency_tracker, - const cls_queue_entry& entry, spawn::yield_context yield) { + const cls_queue_entry& entry, boost::asio::yield_context yield) { event_entry_t event_entry; auto iter = entry.data.cbegin(); try { @@ -290,7 +290,7 @@ private: cct); ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint << " for entry: " << entry.marker << dendl; - const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield)); + const auto ret = push_endpoint->send(event_entry.event, yield); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker << " failed. error: " << ret @@ -311,7 +311,7 @@ private: } // clean stale reservation from queue - void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) { + void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) { while (!shutdown) { ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl; const auto now = ceph::coarse_real_time::clock::now(); @@ -324,7 +324,7 @@ private: "" /*no tag*/); cls_2pc_queue_expire_reservations(op, stale_time); // check ownership and do reservation cleanup in one batch - auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield)); + auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, yield); if (ret == -ENOENT) { // queue was deleted ldpp_dout(this, 10) << "INFO: queue: " << queue_name @@ -351,12 +351,12 @@ private: } // unlock (lose ownership) queue - int unlock_queue(const std::string& queue_name, spawn::yield_context yield) { + int unlock_queue(const std::string& queue_name, boost::asio::yield_context yield) { librados::ObjectWriteOperation op; op.assert_exists(); rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie); auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx(); - const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); if (ret == -ENOENT) { ldpp_dout(this, 10) << "INFO: queue: " << queue_name << ". was removed. nothing to unlock" << dendl; @@ -371,15 +371,18 @@ private: } // processing of a specific queue - void process_queue(const std::string& queue_name, spawn::yield_context yield) { + void process_queue(const std::string& queue_name, boost::asio::yield_context yield) { constexpr auto max_elements = 1024; auto is_idle = false; const std::string start_marker; // start a the cleanup coroutine for the queue - spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) { - cleanup_queue(queue_name, yield); - }, make_stack_allocator()); + boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(), + [this, queue_name](boost::asio::yield_context yield) { + cleanup_queue(queue_name, yield); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); CountersManager queue_counters_container(queue_name, this->get_cct()); @@ -410,7 +413,7 @@ private: "" /*no tag*/); cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval); // check ownership and list entries in one batch - auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield)); + auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, yield); if (ret == -ENOENT) { // queue was deleted topics_persistency_tracker.erase(queue_name); @@ -460,8 +463,9 @@ private: } entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name]; - spawn::spawn(yield, [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, - &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](spawn::yield_context yield) { + boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(), + [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker, + &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](boost::asio::yield_context yield) { const auto token = waiter.make_token(); auto& persistency_tracker = notifs_persistency_tracker[entry.marker]; auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield); @@ -483,7 +487,9 @@ private: ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl; } - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); ++entry_idx; } @@ -515,7 +521,7 @@ private: "" /*no tag*/); cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove); // check ownership and deleted entries in one batch - auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); if (ret == -ENOENT) { // queue was deleted ldpp_dout(this, 10) << "INFO: queue: " << queue_name @@ -547,7 +553,7 @@ private: rgw_pubsub_topic topic; auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, - optional_yield(io_context, yield), nullptr); + yield, nullptr); if (ret_of_get_topic < 0) { // we can't migrate entries without topic info ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: " @@ -579,7 +585,7 @@ private: buffer::list obl; int rval; cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval); - ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC); + ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield, librados::OPERATION_RETURNVEC); if (ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl; return; @@ -591,7 +597,7 @@ private: } cls_2pc_queue_commit(op, migration_vector, reservation_id); - ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); reservation_id = cls_2pc_reservation::NO_ID; if (ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". error: " << ret << dendl; @@ -618,7 +624,7 @@ private: // process all queues // find which of the queues is owned by this daemon and process it - void process_queues(spawn::yield_context yield) { + void process_queues(boost::asio::yield_context yield) { auto has_error = false; owned_queues_t owned_queues; size_t processed_queue_count = 0; @@ -646,7 +652,7 @@ private: timer.async_wait(yield[ec]); queues_t queues; - auto ret = read_queue_list(queues, optional_yield(io_context, yield)); + auto ret = read_queue_list(queues, yield); if (ret < 0) { has_error = true; continue; @@ -665,7 +671,7 @@ private: failover_time, LOCK_FLAG_MAY_RENEW); - ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield); if (ret == -EBUSY) { // lock is already taken by another RGW ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl; @@ -687,7 +693,8 @@ private: if (owned_queues.insert(queue_name).second) { ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl; // start processing this queue - spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) { + boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(), + [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](boost::asio::yield_context yield) { ++processed_queue_count; process_queue(queue_name, yield); // if queue processing ended, it means that the queue was removed or not owned anymore @@ -703,7 +710,9 @@ private: queue_gc.push_back(queue_name); --processed_queue_count; ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl; - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); } else { ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl; } @@ -741,9 +750,12 @@ public: } void init() { - spawn::spawn(io_context, [this](spawn::yield_context yield) { + boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(), + [this](boost::asio::yield_context yield) { process_queues(yield); - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); // start the worker threads to do the actual queue processing const std::string WORKER_THREAD_NAME = "notif-worker"; diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 8906d2e6cb956..ce765853ed4f4 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -19,7 +19,7 @@ #ifdef WITH_RADOSGW_KAFKA_ENDPOINT #include "rgw_kafka.h" #endif -#include +//#include #include #include #include "rgw_perf_counters.h" @@ -153,7 +153,7 @@ public: boost::system::error_code ec; auto yield = y.get_yield_context(); auto&& token = yield[ec]; - boost::asio::async_initiate( + boost::asio::async_initiate( [this] (auto handler, auto ex) { completion = Completion::create(ex, std::move(handler)); }, token, yield.get_executor()); diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 45d50452e03c5..c30caf3f53356 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -1363,10 +1363,15 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y) using namespace rgw; if (svc.site->is_meta_master() && all_zonegroups_support(*svc.site, zone_features::notification_v2)) { - spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) { + boost::asio::spawn(v1_topic_migration, [this] (boost::asio::yield_context yield) { DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "}; rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); + // TODO: we run this on a separate thread so shutdown can cancel it with + // v1_topic_migration.stop(), but we could run it on the global thread + // pool and cancel spawn() with a cancellation_signal instead v1_topic_migration.start(1); } } diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index c1011bd60a550..d590bff7fcfde 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -1153,10 +1153,9 @@ int RGWReshardWait::wait(optional_yield y) } if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); - Waiter waiter(context); + Waiter waiter(yield.get_executor()); waiters.push_back(waiter); lock.unlock(); diff --git a/src/rgw/driver/rados/rgw_reshard.h b/src/rgw/driver/rados/rgw_reshard.h index 0497414566ad2..8e37defa1dbdc 100644 --- a/src/rgw/driver/rados/rgw_reshard.h +++ b/src/rgw/driver/rados/rgw_reshard.h @@ -252,11 +252,11 @@ class RGWReshardWait { ceph::condition_variable cond; struct Waiter : boost::intrusive::list_base_hook<> { - using Executor = boost::asio::io_context::executor_type; + using Executor = boost::asio::any_io_executor; using Timer = boost::asio::basic_waitable_timer, Executor>; Timer timer; - explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {} + explicit Waiter(boost::asio::any_io_executor ex) : timer(ex) {} }; boost::intrusive::list waiters; diff --git a/src/rgw/driver/rados/rgw_tools.cc b/src/rgw/driver/rados/rgw_tools.cc index 41254f9519e34..eec4a79911565 100644 --- a/src/rgw/driver/rados/rgw_tools.cc +++ b/src/rgw/driver/rados/rgw_tools.cc @@ -203,11 +203,10 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con // given a yield_context, call async_operate() to yield the coroutine instead // of blocking if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; auto bl = librados::async_operate( - context, ioctx, oid, op, flags, trace_info, yield[ec]); + yield, ioctx, oid, op, flags, trace_info, yield[ec]); if (pbl) { *pbl = std::move(bl); } @@ -228,10 +227,9 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con int flags, const jspan_context* trace_info) { if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; - librados::async_operate(context, ioctx, oid, op, flags, trace_info, yield[ec]); + librados::async_operate(yield, ioctx, oid, op, flags, trace_info, yield[ec]); return -ec.value(); } if (is_asio_thread) { @@ -248,10 +246,9 @@ int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, cons optional_yield y) { if (y) { - auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; - auto reply = librados::async_notify(context, ioctx, oid, + auto reply = librados::async_notify(yield, ioctx, oid, bl, timeout_ms, yield[ec]); if (pbl) { *pbl = std::move(reply); diff --git a/src/rgw/driver/rados/topic_migration.cc b/src/rgw/driver/rados/topic_migration.cc index c7dcfc37b4489..0d4238f84312e 100644 --- a/src/rgw/driver/rados/topic_migration.cc +++ b/src/rgw/driver/rados/topic_migration.cc @@ -263,10 +263,8 @@ int migrate_topics(const DoutPrefixProvider* dpp, optional_yield y, int migrate(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* driver, boost::asio::io_context& context, - spawn::yield_context yield) + boost::asio::yield_context y) { - auto y = optional_yield{context, yield}; - ldpp_dout(dpp, 1) << "starting v1 topic migration.." << dendl; librados::Rados* rados = driver->getRados()->get_rados_handle(); diff --git a/src/rgw/driver/rados/topic_migration.h b/src/rgw/driver/rados/topic_migration.h index 9545fd63c2e66..2bd2b730c85a5 100644 --- a/src/rgw/driver/rados/topic_migration.h +++ b/src/rgw/driver/rados/topic_migration.h @@ -16,7 +16,7 @@ #pragma once #include -#include +#include class DoutPrefixProvider; namespace rgw::sal { class RadosStore; } @@ -29,6 +29,6 @@ namespace rgwrados::topic_migration { int migrate(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* driver, boost::asio::io_context& context, - spawn::yield_context yield); + boost::asio::yield_context yield); } // rgwrados::topic_migration diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc index 5b2ce1569b33a..a239171f104d7 100644 --- a/src/rgw/rgw_aio.cc +++ b/src/rgw/rgw_aio.cc @@ -90,16 +90,14 @@ struct Handler { template Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, - boost::asio::io_context& context, - spawn::yield_context yield, jspan_context* trace_ctx = nullptr) { - return [ctx = std::move(ctx), op = std::move(op), &context, yield, trace_ctx] (Aio* aio, AioResult& r) mutable { + boost::asio::yield_context yield, + jspan_context* trace_ctx) { + return [ctx = std::move(ctx), op = std::move(op), yield, trace_ctx] (Aio* aio, AioResult& r) mutable { // arrange for the completion Handler to run on the yield_context's strand // executor so it can safely call back into Aio without locking - using namespace boost::asio; - async_completion init(yield); - auto ex = get_associated_executor(init.completion_handler); + auto ex = yield.get_executor(); - librados::async_operate(context, ctx, r.obj.oid, &op, 0, trace_ctx, + librados::async_operate(yield, ctx, r.obj.oid, &op, 0, trace_ctx, bind_executor(ex, Handler{aio, ctx, r})); }; } @@ -111,7 +109,7 @@ Aio::OpFunc d3n_cache_aio_abstract(const DoutPrefixProvider *dpp, optional_yield ceph_assert(y); auto c = std::make_unique(); lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << r.obj.oid << dendl; - c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), cache_location, read_ofs, read_len, aio, r); + c->file_aio_read_abstract(dpp, y.get_yield_context(), cache_location, read_ofs, read_len, aio, r); }; } @@ -123,7 +121,7 @@ Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, optional_yield y, jspan_c static_assert(!std::is_const_v); if (y) { return aio_abstract(std::move(ctx), std::forward(op), - y.get_io_context(), y.get_yield_context(), trace_ctx); + y.get_yield_context(), trace_ctx); } return aio_abstract(std::move(ctx), std::forward(op), trace_ctx); } diff --git a/src/rgw/rgw_aio_throttle.h b/src/rgw/rgw_aio_throttle.h index c0656ef225e6a..87fc980a94c16 100644 --- a/src/rgw/rgw_aio_throttle.h +++ b/src/rgw/rgw_aio_throttle.h @@ -80,8 +80,7 @@ class BlockingAioThrottle final : public Aio, private Throttle { // a throttle that yields the coroutine instead of blocking. all public // functions must be called within the coroutine strand class YieldingAioThrottle final : public Aio, private Throttle { - boost::asio::io_context& context; - spawn::yield_context yield; + boost::asio::yield_context yield; struct Handler; // completion callback associated with the waiter @@ -94,9 +93,8 @@ class YieldingAioThrottle final : public Aio, private Throttle { struct Pending : AioResultEntry { uint64_t cost = 0; }; public: - YieldingAioThrottle(uint64_t window, boost::asio::io_context& context, - spawn::yield_context yield) - : Throttle(window), context(context), yield(yield) + YieldingAioThrottle(uint64_t window, boost::asio::yield_context yield) + : Throttle(window), yield(yield) {} virtual ~YieldingAioThrottle() override {}; @@ -119,7 +117,6 @@ inline auto make_throttle(uint64_t window_size, optional_yield y) std::unique_ptr aio; if (y) { aio = std::make_unique(window_size, - y.get_io_context(), y.get_yield_context()); } else { aio = std::make_unique(window_size); diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc index 8c64570287dae..ace3b7aff49e9 100644 --- a/src/rgw/rgw_asio_frontend.cc +++ b/src/rgw/rgw_asio_frontend.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -16,7 +17,7 @@ #include #include -#include +#include #include "common/async/shared_mutex.h" #include "common/errno.h" @@ -69,12 +70,12 @@ class StreamIO : public rgw::asio::ClientIO { CephContext* const cct; Stream& stream; timeout_timer& timeout; - spawn::yield_context yield; + boost::asio::yield_context yield; parse_buffer& buffer; boost::system::error_code fatal_ec; public: StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout, - rgw::asio::parser_type& parser, spawn::yield_context yield, + rgw::asio::parser_type& parser, boost::asio::yield_context yield, parse_buffer& buffer, bool is_ssl, const tcp::endpoint& local_endpoint, const tcp::endpoint& remote_endpoint) @@ -200,7 +201,7 @@ void handle_connection(boost::asio::io_context& context, rgw::dmclock::Scheduler *scheduler, const std::string& uri_prefix, boost::system::error_code& ec, - spawn::yield_context yield) + boost::asio::yield_context yield) { // don't impose a limit on the body, since we read it in pieces static constexpr size_t body_limit = std::numeric_limits::max(); @@ -281,7 +282,7 @@ void handle_connection(boost::asio::io_context& context, RGWRestfulIO client(cct, &real_client_io); optional_yield y = null_yield; if (cct->_conf->rgw_beast_enable_async) { - y = optional_yield{context, yield}; + y = optional_yield{yield}; } int http_ret = 0; string user = "-"; @@ -1021,8 +1022,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) // spawn a coroutine to handle the connection #ifdef WITH_RADOSGW_BEAST_OPENSSL if (l.use_ssl) { - spawn::spawn(context, - [this, s=std::move(stream)] (spawn::yield_context yield) mutable { + boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(), + [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable { auto conn = boost::intrusive_ptr{new Connection(std::move(s))}; auto c = connections.add(*conn); // wrap the tcp stream in an ssl stream @@ -1049,13 +1050,15 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) } conn->socket.shutdown(tcp::socket::shutdown_both, ec); - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); } else { #else { #endif // WITH_RADOSGW_BEAST_OPENSSL - spawn::spawn(context, - [this, s=std::move(stream)] (spawn::yield_context yield) mutable { + boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(), + [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable { auto conn = boost::intrusive_ptr{new Connection(std::move(s))}; auto c = connections.add(*conn); auto timeout = timeout_timer{context.get_executor(), request_timeout, conn}; @@ -1064,7 +1067,9 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec) conn->buffer, false, pause_mutex, scheduler.get(), uri_prefix, ec, yield); conn->socket.shutdown(tcp::socket::shutdown_both, ec); - }, make_stack_allocator()); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); + }); } } diff --git a/src/rgw/rgw_d3n_cacherequest.h b/src/rgw/rgw_d3n_cacherequest.h index 0519c6def3e82..54b495f5461f8 100644 --- a/src/rgw/rgw_d3n_cacherequest.h +++ b/src/rgw/rgw_d3n_cacherequest.h @@ -7,6 +7,8 @@ #include #include +#include + #include "include/rados/librados.hpp" #include "include/Context.h" #include "common/async/completion.h" @@ -135,13 +137,10 @@ struct D3nL1CacheRequest { } }; - void file_aio_read_abstract(const DoutPrefixProvider *dpp, spawn::yield_context yield, + void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::yield_context yield, std::string& cache_location, off_t read_ofs, off_t read_len, rgw::Aio* aio, rgw::AioResult& r) { - using namespace boost::asio; - async_completion init(yield); - auto ex = get_associated_executor(init.completion_handler); - + auto ex = yield.get_executor(); ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << r.obj.oid << dendl; async_read(dpp, ex, cache_location+"/"+url_encode(r.obj.oid, true), read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r})); } diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 61191b14e4990..76fbb332e6bc1 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -6789,7 +6789,8 @@ void RGWDeleteMultiObj::execute(optional_yield y) char* buf; std::optional formatter_flush_cond; if (y) { - formatter_flush_cond = std::make_optional(y.get_io_context()); + auto ex = y.get_yield_context().get_executor(); + formatter_flush_cond = std::make_optional(ex); } buf = data.c_str(); @@ -6857,9 +6858,11 @@ void RGWDeleteMultiObj::execute(optional_yield y) return aio_count < max_aio; }); aio_count++; - spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key, &formatter_flush_cond] (spawn::yield_context yield) { - handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }, &*formatter_flush_cond); + boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) { + handle_individual_object(obj_key, yield, &*formatter_flush_cond); aio_count--; + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } else { handle_individual_object(obj_key, y, nullptr); diff --git a/src/rgw/rgw_redis_driver.cc b/src/rgw/rgw_redis_driver.cc index 183f5351e2fbb..0d8e462365eee 100644 --- a/src/rgw/rgw_redis_driver.cc +++ b/src/rgw/rgw_redis_driver.cc @@ -35,9 +35,9 @@ struct initiate_exec { { auto h = boost::asio::consign(std::move(handler), conn); return boost::asio::dispatch(get_executor(), - [c=conn, &req, &resp, h=std::move(h)] { + [c=conn, &req, &resp, h=std::move(h)] () mutable { return c->async_exec(req, resp, std::move(h)); - }); + }); } }; @@ -555,9 +555,8 @@ Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr init(yield); - auto ex = get_associated_executor(init.completion_handler); + yield_context yield = y.get_yield_context(); + auto ex = yield.get_executor(); // TODO: Make unique pointer once support is added auto s = std::make_shared(); @@ -574,9 +573,8 @@ Aio::OpFunc RedisDriver::redis_write_op(optional_yield y, std::shared_ptr init(yield); - auto ex = get_associated_executor(init.completion_handler); + yield_context yield = y.get_yield_context(); + auto ex = yield.get_executor(); auto redisAttrs = build_attrs(attrs); diff --git a/src/rgw/rgw_ssd_driver.cc b/src/rgw/rgw_ssd_driver.cc index 24726ebcf62e1..5d7a5a97119c2 100644 --- a/src/rgw/rgw_ssd_driver.cc +++ b/src/rgw/rgw_ssd_driver.cc @@ -98,9 +98,8 @@ int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const boost::system::error_code ec; if (y) { using namespace boost::asio; - spawn::yield_context yield = y.get_yield_context(); - async_completion init(yield); - auto ex = get_associated_executor(init.completion_handler); + yield_context yield = y.get_yield_context(); + auto ex = yield.get_executor(); this->put_async(dpp, ex, key, bl, len, attrs, yield[ec]); } else { auto ex = boost::asio::system_executor{}; @@ -285,9 +284,8 @@ rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, opt ldpp_dout(dpp, 20) << "SSDCache: cache_read_op(): Read From Cache, oid=" << r.obj.oid << dendl; using namespace boost::asio; - spawn::yield_context yield = y.get_yield_context(); - async_completion init(yield); - auto ex = get_associated_executor(init.completion_handler); + yield_context yield = y.get_yield_context(); + auto ex = yield.get_executor(); ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl; this->get_async(dpp, ex, key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_read_handler{aio, r})); @@ -301,9 +299,8 @@ rgw::Aio::OpFunc SSDDriver::ssd_cache_write_op(const DoutPrefixProvider *dpp, op ldpp_dout(dpp, 20) << "SSDCache: cache_write_op(): Write to Cache, oid=" << r.obj.oid << dendl; using namespace boost::asio; - spawn::yield_context yield = y.get_yield_context(); - async_completion init(yield); - auto ex = get_associated_executor(init.completion_handler); + yield_context yield = y.get_yield_context(); + auto ex = yield.get_executor(); ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl; this->put_async(dpp, ex, key, bl, len, attrs, bind_executor(ex, SSDDriver::libaio_write_handler{aio, r})); diff --git a/src/rgw/rgw_sync_checkpoint.cc b/src/rgw/rgw_sync_checkpoint.cc index 1394a712a94f1..1172e79a48f32 100644 --- a/src/rgw/rgw_sync_checkpoint.cc +++ b/src/rgw/rgw_sync_checkpoint.cc @@ -226,8 +226,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, entry.pipe = pipe; // fetch remote markers - spawn::spawn(ioctx, [&] (spawn::yield_context yield) { - auto y = optional_yield{ioctx, yield}; + boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) { + auto y = optional_yield{yield}; rgw_bucket_index_marker_info info; int r = source_bilog_info(dpp, store->svc()->zone, entry.pipe, info, entry.remote_markers, y); @@ -237,10 +237,12 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, throw std::system_error(-r, std::system_category()); } entry.latest_gen = info.latest_gen; + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); // fetch source bucket info - spawn::spawn(ioctx, [&] (spawn::yield_context yield) { - auto y = optional_yield{ioctx, yield}; + boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) { + auto y = optional_yield{yield}; int r = store->getRados()->get_bucket_instance_info( *entry.pipe.source.bucket, entry.source_bucket_info, nullptr, nullptr, y, dpp); @@ -249,6 +251,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, << cpp_strerror(r) << dendl; throw std::system_error(-r, std::system_category()); } + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index a0c8fcfe823cc..2e756eeb58380 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -340,7 +340,7 @@ target_link_libraries(ceph_test_librgw_file_nfsns ${LUA_LIBRARIES} ${ALLOC_LIBS} ) - target_link_libraries(ceph_test_librgw_file_nfsns spawn) + target_link_libraries(ceph_test_librgw_file_nfsns Boost::context) install(TARGETS ceph_test_librgw_file_nfsns DESTINATION ${CMAKE_INSTALL_BINDIR}) # ceph_test_librgw_file_aw (nfs write transaction [atomic write] tests) @@ -374,7 +374,7 @@ target_link_libraries(ceph_test_librgw_file_marker ${LUA_LIBRARIES} ${ALLOC_LIBS} ) - target_link_libraries(ceph_test_librgw_file_marker spawn) + target_link_libraries(ceph_test_librgw_file_marker Boost::context) install(TARGETS ceph_test_librgw_file_marker DESTINATION ${CMAKE_INSTALL_BINDIR}) # ceph_test_librgw_file_xattr (attribute ops) @@ -393,7 +393,7 @@ target_link_libraries(ceph_test_librgw_file_xattr ${LUA_LIBRARIES} ${ALLOC_LIBS} ) -target_link_libraries(ceph_test_librgw_file_xattr spawn) +target_link_libraries(ceph_test_librgw_file_xattr Boost::context) # ceph_test_librgw_file_rename (mv/rename tests) add_executable(ceph_test_librgw_file_rename diff --git a/src/test/librados/CMakeLists.txt b/src/test/librados/CMakeLists.txt index 69f511fecca55..e80f646065781 100644 --- a/src/test/librados/CMakeLists.txt +++ b/src/test/librados/CMakeLists.txt @@ -60,7 +60,7 @@ target_link_libraries(ceph_test_rados_api_aio_pp add_executable(ceph_test_rados_api_asio asio.cc) target_link_libraries(ceph_test_rados_api_asio global - librados ${UNITTEST_LIBS} spawn) + librados ${UNITTEST_LIBS} Boost::context) add_executable(ceph_test_rados_api_list list.cc @@ -133,7 +133,7 @@ target_include_directories(ceph_test_rados_api_tier_pp PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw") target_link_libraries(ceph_test_rados_api_tier_pp librados global ${UNITTEST_LIBS} Boost::system radostest-cxx cls_cas_internal - cls_cas_client spawn) + cls_cas_client Boost::context) add_executable(ceph_test_rados_api_snapshots snapshots.cc) diff --git a/src/test/librados/asio.cc b/src/test/librados/asio.cc index 0ede2e14fb465..9f8844eb7bb82 100644 --- a/src/test/librados/asio.cc +++ b/src/test/librados/asio.cc @@ -21,8 +21,8 @@ #include #include -#include #include +#include #include #define dout_subsys ceph_subsys_rados @@ -73,6 +73,10 @@ librados::Rados AsioRados::rados; librados::IoCtx AsioRados::io; librados::IoCtx AsioRados::snapio; +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST_F(AsioRados, AsyncReadCallback) { boost::asio::io_context service; @@ -113,20 +117,20 @@ TEST_F(AsioRados, AsyncReadYield) { boost::asio::io_context service; - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]); EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]); EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); }; - spawn::spawn(service, failure_cr); + boost::asio::spawn(service, failure_cr, rethrow); service.run(); } @@ -178,22 +182,22 @@ TEST_F(AsioRados, AsyncWriteYield) bufferlist bl; bl.append("hello"); - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; librados::async_write(service, io, "exist", bl, bl.length(), 0, yield[ec]); EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { boost::system::error_code ec; librados::async_write(service, snapio, "exist", bl, bl.length(), 0, yield[ec]); EXPECT_EQ(boost::system::errc::read_only_file_system, ec); }; - spawn::spawn(service, failure_cr); + boost::asio::spawn(service, failure_cr, rethrow); service.run(); } @@ -251,7 +255,7 @@ TEST_F(AsioRados, AsyncReadOperationYield) { boost::asio::io_context service; - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); boost::system::error_code ec; @@ -260,9 +264,9 @@ TEST_F(AsioRados, AsyncReadOperationYield) EXPECT_FALSE(ec); EXPECT_EQ("hello", bl.to_str()); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { librados::ObjectReadOperation op; op.read(0, 0, nullptr, nullptr); boost::system::error_code ec; @@ -270,7 +274,7 @@ TEST_F(AsioRados, AsyncReadOperationYield) yield[ec]); EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec); }; - spawn::spawn(service, failure_cr); + boost::asio::spawn(service, failure_cr, rethrow); service.run(); } @@ -335,23 +339,23 @@ TEST_F(AsioRados, AsyncWriteOperationYield) bufferlist bl; bl.append("hello"); - auto success_cr = [&] (spawn::yield_context yield) { + auto success_cr = [&] (boost::asio::yield_context yield) { librados::ObjectWriteOperation op; op.write_full(bl); boost::system::error_code ec; librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]); EXPECT_FALSE(ec); }; - spawn::spawn(service, success_cr); + boost::asio::spawn(service, success_cr, rethrow); - auto failure_cr = [&] (spawn::yield_context yield) { + auto failure_cr = [&] (boost::asio::yield_context yield) { librados::ObjectWriteOperation op; op.write_full(bl); boost::system::error_code ec; librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]); EXPECT_EQ(boost::system::errc::read_only_file_system, ec); }; - spawn::spawn(service, failure_cr); + boost::asio::spawn(service, failure_cr, rethrow); service.run(); } diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 93deb5ff1ac40..c96e9012790f6 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -34,7 +34,6 @@ target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE ${UNITTEST_LIBS} ${EXTRALIBS} ) - target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE spawn) install(TARGETS ceph_test_rgw_d4n_directory DESTINATION ${CMAKE_INSTALL_BINDIR}) add_executable(ceph_test_rgw_d4n_policy @@ -50,7 +49,6 @@ target_link_libraries(ceph_test_rgw_d4n_policy PRIVATE ${UNITTEST_LIBS} ${EXTRALIBS} ) - target_link_libraries(ceph_test_rgw_d4n_policy PRIVATE spawn) install(TARGETS ceph_test_rgw_d4n_policy DESTINATION ${CMAKE_INSTALL_BINDIR}) add_executable(ceph_test_rgw_redis_driver @@ -66,7 +64,6 @@ target_link_libraries(ceph_test_rgw_redis_driver PRIVATE ${UNITTEST_LIBS} ${EXTRALIBS} ) - target_link_libraries(ceph_test_rgw_redis_driver PRIVATE spawn) install(TARGETS ceph_test_rgw_redis_driver DESTINATION ${CMAKE_INSTALL_BINDIR}) add_executable(ceph_test_rgw_ssd_driver @@ -82,7 +79,6 @@ target_link_libraries(ceph_test_rgw_ssd_driver PRIVATE ${UNITTEST_LIBS} ${EXTRALIBS} ) - target_link_libraries(ceph_test_rgw_ssd_driver PRIVATE spawn) install(TARGETS ceph_test_rgw_ssd_driver DESTINATION ${CMAKE_INSTALL_BINDIR}) endif() diff --git a/src/test/rgw/bench_rgw_ratelimit.cc b/src/test/rgw/bench_rgw_ratelimit.cc index 1ea8714f9df00..529d8f739fd93 100644 --- a/src/test/rgw/bench_rgw_ratelimit.cc +++ b/src/test/rgw/bench_rgw_ratelimit.cc @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -37,7 +37,7 @@ struct parameters { std::shared_ptr> ds = std::make_shared>(std::vector()); std::string method[2] = {"PUT", "GET"}; -void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx) +void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr ratelimit, const parameters& params, boost::asio::yield_context& yield, boost::asio::io_context& ioctx) { auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: "); boost::asio::steady_timer timer(ioctx); @@ -101,7 +101,7 @@ bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared it.accepted++; return false; } -void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx) +void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr ratelimit, const parameters& params, boost::asio::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx) { for (;;) { @@ -130,11 +130,13 @@ void simulate_clients(boost::asio::io_context& context, std::string tenant, cons auto& it = ds->emplace_back(client_info()); it.tenant = tenant; int x = ds->size() - 1; - spawn::spawn(context, - [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx) + boost::asio::spawn(context, + [&to_run ,x, ratelimit, info, params, &context](boost::asio::yield_context ctx) { auto& it = ds.get()->operator[](x); simulate_client(it, info, ratelimit, params, ctx, to_run, context); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); } } diff --git a/src/test/rgw/test_d4n_directory.cc b/src/test/rgw/test_d4n_directory.cc index c518eb50c7dfa..05ad839914123 100644 --- a/src/test/rgw/test_d4n_directory.cc +++ b/src/test/rgw/test_d4n_directory.cc @@ -138,10 +138,14 @@ class BlockDirectoryFixture: public ::testing::Test { "objName", "bucketName", "creationTime", "dirty", "objHosts"}; }; +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST_F(ObjectDirectoryFixture, SetYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(obj, yield)); boost::system::error_code ec; request req; @@ -156,15 +160,15 @@ TEST_F(ObjectDirectoryFixture, SetYield) ASSERT_EQ((bool)ec, false); EXPECT_EQ(std::get<0>(resp).value(), vals); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(ObjectDirectoryFixture, GetYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(obj, yield)); { boost::system::error_code ec; @@ -178,7 +182,7 @@ TEST_F(ObjectDirectoryFixture, GetYield) EXPECT_EQ(std::get<0>(resp).value(), 0); } - ASSERT_EQ(0, dir->get(obj, optional_yield{io, yield})); + ASSERT_EQ(0, dir->get(obj, yield)); EXPECT_EQ(obj->objName, "newoid"); { @@ -191,16 +195,16 @@ TEST_F(ObjectDirectoryFixture, GetYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(ObjectDirectoryFixture, CopyYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); - ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(obj, yield)); + ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", yield)); boost::system::error_code ec; request req; @@ -222,15 +226,15 @@ TEST_F(ObjectDirectoryFixture, CopyYield) EXPECT_EQ(std::get<1>(resp).value(), copyVals); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(ObjectDirectoryFixture, DelYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(obj, yield)); { boost::system::error_code ec; @@ -244,7 +248,7 @@ TEST_F(ObjectDirectoryFixture, DelYield) EXPECT_EQ(std::get<0>(resp).value(), 1); } - ASSERT_EQ(0, dir->del(obj, optional_yield{io, yield})); + ASSERT_EQ(0, dir->del(obj, yield)); { boost::system::error_code ec; @@ -260,17 +264,17 @@ TEST_F(ObjectDirectoryFixture, DelYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(ObjectDirectoryFixture, UpdateFieldYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield})); - ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", optional_yield{io, yield})); - ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(obj, yield)); + ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", yield)); + ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", yield)); boost::system::error_code ec; request req; @@ -286,7 +290,7 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield) EXPECT_EQ(std::get<0>(resp).value()[1], "127.0.0.1:6379_127.0.0.1:5000"); conn->cancel(); - }); + }, rethrow); io.run(); } @@ -294,8 +298,8 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield) TEST_F(BlockDirectoryFixture, SetYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(block, yield)); boost::system::error_code ec; request req; @@ -310,15 +314,15 @@ TEST_F(BlockDirectoryFixture, SetYield) ASSERT_EQ((bool)ec, false); EXPECT_EQ(std::get<0>(resp).value(), vals); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(BlockDirectoryFixture, GetYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(block, yield)); { boost::system::error_code ec; @@ -332,7 +336,7 @@ TEST_F(BlockDirectoryFixture, GetYield) EXPECT_EQ(std::get<0>(resp).value(), 0); } - ASSERT_EQ(0, dir->get(block, optional_yield{io, yield})); + ASSERT_EQ(0, dir->get(block, yield)); EXPECT_EQ(block->cacheObj.objName, "newoid"); { @@ -345,16 +349,16 @@ TEST_F(BlockDirectoryFixture, GetYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(BlockDirectoryFixture, CopyYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); - ASSERT_EQ(0, dir->copy(block, "copyTestName", "copyBucketName", optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(block, yield)); + ASSERT_EQ(0, dir->copy(block, "copyTestName", "copyBucketName", yield)); boost::system::error_code ec; request req; @@ -376,15 +380,15 @@ TEST_F(BlockDirectoryFixture, CopyYield) EXPECT_EQ(std::get<1>(resp).value(), copyVals); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(BlockDirectoryFixture, DelYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(block, yield)); { boost::system::error_code ec; @@ -398,7 +402,7 @@ TEST_F(BlockDirectoryFixture, DelYield) EXPECT_EQ(std::get<0>(resp).value(), 1); } - ASSERT_EQ(0, dir->del(block, optional_yield{io, yield})); + ASSERT_EQ(0, dir->del(block, yield)); { boost::system::error_code ec; @@ -414,17 +418,17 @@ TEST_F(BlockDirectoryFixture, DelYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(BlockDirectoryFixture, UpdateFieldYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); - ASSERT_EQ(0, dir->update_field(block, "objName", "newTestName", optional_yield{io, yield})); - ASSERT_EQ(0, dir->update_field(block, "blockHosts", "127.0.0.1:5000", optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, dir->set(block, yield)); + ASSERT_EQ(0, dir->update_field(block, "objName", "newTestName", yield)); + ASSERT_EQ(0, dir->update_field(block, "blockHosts", "127.0.0.1:5000", yield)); boost::system::error_code ec; request req; @@ -440,17 +444,17 @@ TEST_F(BlockDirectoryFixture, UpdateFieldYield) EXPECT_EQ(std::get<0>(resp).value()[1], "127.0.0.1:6379_127.0.0.1:5000"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(BlockDirectoryFixture, RemoveHostYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { block->hostsList.push_back("127.0.0.1:6000"); - ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); - ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6379", optional_yield{io, yield})); + ASSERT_EQ(0, dir->set(block, yield)); + ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6379", yield)); { boost::system::error_code ec; @@ -466,7 +470,7 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield) EXPECT_EQ(std::get<1>(resp).value(), "127.0.0.1:6000"); } - ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6000", optional_yield{io, yield})); + ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6000", yield)); { boost::system::error_code ec; @@ -482,7 +486,7 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield) } conn->cancel(); - }); + }, rethrow); io.run(); } diff --git a/src/test/rgw/test_d4n_policy.cc b/src/test/rgw/test_d4n_policy.cc index fe27753600360..dea349af514ed 100644 --- a/src/test/rgw/test_d4n_policy.cc +++ b/src/test/rgw/test_d4n_policy.cc @@ -156,14 +156,18 @@ class LFUDAPolicyFixture : public ::testing::Test { rgw::sal::Attrs attrs; }; +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size); - ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield})); - policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield}); + ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, yield)); + policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", yield); - ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); + ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, yield), 0); cacheDriver->shutdown(); @@ -179,14 +183,14 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield) ASSERT_EQ((bool)ec, false); EXPECT_EQ(std::get<0>(resp).value(), "2"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { /* Set victim block for eviction */ rgw::d4n::CacheBlock victim = rgw::d4n::CacheBlock{ .cacheObj = { @@ -210,10 +214,10 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) attrVal.append("testBucket"); attrs.insert({"bucket_name", attrVal}); - ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield})); + ASSERT_EQ(0, dir->set(&victim, yield)); std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size); - ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield})); - policyDriver->get_cache_policy()->update(env->dpp, victimKey, 0, bl.length(), "", optional_yield{io, yield}); + ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, yield)); + policyDriver->get_cache_policy()->update(env->dpp, victimKey, 0, bl.length(), "", yield); /* Remote block */ block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */ @@ -222,9 +226,9 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) block->hostsList.push_back("127.0.0.1:6000"); block->cacheObj.hostsList.push_back("127.0.0.1:6000"); - ASSERT_EQ(0, dir->set(block, optional_yield{io, yield})); + ASSERT_EQ(0, dir->set(block, yield)); - ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); + ASSERT_GE(lfuda(env->dpp, block, cacheDriver, yield), 0); cacheDriver->shutdown(); @@ -246,15 +250,15 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield) EXPECT_EQ(std::get<1>(resp).value(), 0); EXPECT_EQ(std::get<2>(resp).value(), "1"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(LFUDAPolicyFixture, BackendGetBlockYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_GE(lfuda(env->dpp, block, cacheDriver, yield), 0); cacheDriver->shutdown(); @@ -267,16 +271,16 @@ TEST_F(LFUDAPolicyFixture, BackendGetBlockYield) conn->async_exec(req, resp, yield[ec]); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(LFUDAPolicyFixture, RedisSyncTest) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { env->cct->_conf->rgw_lfuda_sync_frequency = 1; - dynamic_cast(policyDriver->get_cache_policy())->save_y(optional_yield{io, yield}); + dynamic_cast(policyDriver->get_cache_policy())->save_y(yield); policyDriver->get_cache_policy()->init(env->cct, env->dpp, io); cacheDriver->shutdown(); @@ -308,7 +312,7 @@ TEST_F(LFUDAPolicyFixture, RedisSyncTest) delete policyDriver; policyDriver = nullptr; - }); + }, rethrow); io.run(); } diff --git a/src/test/rgw/test_redis_driver.cc b/src/test/rgw/test_redis_driver.cc index e8ca1de6d55e9..be5f1496a52c2 100644 --- a/src/test/rgw/test_redis_driver.cc +++ b/src/test/rgw/test_redis_driver.cc @@ -122,10 +122,14 @@ class RedisDriverFixture: public ::testing::Test { rgw::sal::Attrs attrs; }; +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST_F(RedisDriverFixture, PutYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); cacheDriver->shutdown(); boost::system::error_code ec; @@ -140,15 +144,15 @@ TEST_F(RedisDriverFixture, PutYield) ASSERT_EQ((bool)ec, false); EXPECT_EQ(std::get<0>(resp).value(), "attrVal"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, GetYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); { boost::system::error_code ec; @@ -165,7 +169,7 @@ TEST_F(RedisDriverFixture, GetYield) bufferlist ret; rgw::sal::Attrs retAttrs; - ASSERT_EQ(0, cacheDriver->get(env->dpp, "testName", 0, bl.length(), ret, retAttrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get(env->dpp, "testName", 0, bl.length(), ret, retAttrs, yield)); EXPECT_EQ(ret.to_str(), "new data"); EXPECT_EQ(retAttrs.begin()->second.to_str(), "newVal"); cacheDriver->shutdown(); @@ -182,16 +186,16 @@ TEST_F(RedisDriverFixture, GetYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, PutAsyncYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - std::unique_ptr aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield}); - auto completed = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", bl, bl.length(), attrs, 0, 0); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + std::unique_ptr aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, yield); + auto completed = cacheDriver->put_async(env->dpp, yield, aio.get(), "testName", bl, bl.length(), attrs, 0, 0); drain(env->dpp, aio.get()); cacheDriver->shutdown(); @@ -208,18 +212,18 @@ TEST_F(RedisDriverFixture, PutAsyncYield) EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal"); EXPECT_EQ(std::get<0>(resp).value()[1], "test data"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, GetAsyncYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); - std::unique_ptr aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield}); - auto completed = cacheDriver->get_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", 0, bl.length(), 0, 0); + std::unique_ptr aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, yield); + auto completed = cacheDriver->get_async(env->dpp, yield, aio.get(), "testName", 0, bl.length(), 0, 0); drain(env->dpp, aio.get()); cacheDriver->shutdown(); @@ -236,15 +240,15 @@ TEST_F(RedisDriverFixture, GetAsyncYield) EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal"); EXPECT_EQ(std::get<0>(resp).value()[1], "test data"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, DelYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); { boost::system::error_code ec; @@ -258,7 +262,7 @@ TEST_F(RedisDriverFixture, DelYield) EXPECT_EQ(std::get<0>(resp).value(), 1); } - ASSERT_EQ(0, cacheDriver->del(env->dpp, "testName", optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->del(env->dpp, "testName", yield)); cacheDriver->shutdown(); { @@ -275,15 +279,15 @@ TEST_F(RedisDriverFixture, DelYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, AppendDataYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); { boost::system::error_code ec; @@ -300,7 +304,7 @@ TEST_F(RedisDriverFixture, AppendDataYield) bufferlist val; val.append(" has been written"); - ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testName", val, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testName", val, yield)); cacheDriver->shutdown(); { @@ -317,15 +321,15 @@ TEST_F(RedisDriverFixture, AppendDataYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, DeleteDataYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); { boost::system::error_code ec; @@ -339,7 +343,7 @@ TEST_F(RedisDriverFixture, DeleteDataYield) EXPECT_EQ(std::get<0>(resp).value(), 1); } - ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testName", optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testName", yield)); cacheDriver->shutdown(); { @@ -356,15 +360,15 @@ TEST_F(RedisDriverFixture, DeleteDataYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, SetAttrsYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); rgw::sal::Attrs newAttrs; bufferlist newVal; @@ -375,7 +379,7 @@ TEST_F(RedisDriverFixture, SetAttrsYield) newVal.append("nextVal"); newAttrs.insert({"nextAttr", newVal}); - ASSERT_EQ(0, cacheDriver->set_attrs(env->dpp, "testName", newAttrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->set_attrs(env->dpp, "testName", newAttrs, yield)); cacheDriver->shutdown(); boost::system::error_code ec; @@ -392,20 +396,20 @@ TEST_F(RedisDriverFixture, SetAttrsYield) EXPECT_EQ(std::get<0>(resp).value()[0], "newVal"); EXPECT_EQ(std::get<0>(resp).value()[1], "nextVal"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, GetAttrsYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { rgw::sal::Attrs nextAttrs = attrs; bufferlist nextVal; nextVal.append("nextVal"); nextAttrs.insert({"nextAttr", nextVal}); - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), nextAttrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), nextAttrs, yield)); { boost::system::error_code ec; @@ -421,7 +425,7 @@ TEST_F(RedisDriverFixture, GetAttrsYield) rgw::sal::Attrs retAttrs; - ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testName", retAttrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testName", retAttrs, yield)); auto it = retAttrs.begin(); EXPECT_EQ(it->second.to_str(), "newVal1"); @@ -441,22 +445,22 @@ TEST_F(RedisDriverFixture, GetAttrsYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, UpdateAttrsYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); rgw::sal::Attrs newAttrs; bufferlist newVal; newVal.append("newVal"); newAttrs.insert({"attr", newVal}); - ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testName", newAttrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testName", newAttrs, yield)); cacheDriver->shutdown(); boost::system::error_code ec; @@ -471,15 +475,15 @@ TEST_F(RedisDriverFixture, UpdateAttrsYield) EXPECT_EQ(std::get<0>(resp).value(), "newVal"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, DeleteAttrsYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); { boost::system::error_code ec; @@ -497,7 +501,7 @@ TEST_F(RedisDriverFixture, DeleteAttrsYield) bufferlist delVal; delAttrs.insert({"attr", delVal}); - ASSERT_GE(cacheDriver->delete_attrs(env->dpp, "testName", delAttrs, optional_yield{io, yield}), 0); + ASSERT_GE(cacheDriver->delete_attrs(env->dpp, "testName", delAttrs, yield), 0); cacheDriver->shutdown(); { @@ -514,16 +518,16 @@ TEST_F(RedisDriverFixture, DeleteAttrsYield) } conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, SetAttrYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); - ASSERT_GE(cacheDriver->set_attr(env->dpp, "testName", "newAttr", "newVal", optional_yield{io, yield}), 0); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); + ASSERT_GE(cacheDriver->set_attr(env->dpp, "testName", "newAttr", "newVal", yield), 0); cacheDriver->shutdown(); boost::system::error_code ec; @@ -538,15 +542,15 @@ TEST_F(RedisDriverFixture, SetAttrYield) ASSERT_EQ((bool)ec, false); EXPECT_EQ(std::get<0>(resp).value(), "newVal"); conn->cancel(); - }); + }, rethrow); io.run(); } TEST_F(RedisDriverFixture, GetAttrYield) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield)); { boost::system::error_code ec; @@ -560,7 +564,7 @@ TEST_F(RedisDriverFixture, GetAttrYield) EXPECT_EQ(std::get<0>(resp).value(), 0); } std::string attr_val; - ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testName", "attr", attr_val, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testName", "attr", attr_val, yield)); ASSERT_EQ("newVal", attr_val); cacheDriver->shutdown(); @@ -576,7 +580,7 @@ TEST_F(RedisDriverFixture, GetAttrYield) } conn->cancel(); - }); + }, rethrow); io.run(); } diff --git a/src/test/rgw/test_rgw_dmclock_scheduler.cc b/src/test/rgw/test_rgw_dmclock_scheduler.cc index c9b4a853fd4d7..da748dfa6c9ca 100644 --- a/src/test/rgw/test_rgw_dmclock_scheduler.cc +++ b/src/test/rgw/test_rgw_dmclock_scheduler.cc @@ -18,7 +18,7 @@ #include "rgw_dmclock_async_scheduler.h" #include -#include +#include #include #include "acconfig.h" #include "global/global_context.h" @@ -400,7 +400,7 @@ TEST(Queue, SpawnAsyncRequest) { boost::asio::io_context context; - spawn::spawn(context, [&] (spawn::yield_context yield) { + boost::asio::spawn(context, [&] (boost::asio::yield_context yield) { ClientCounters counters(g_ceph_context); AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr, [] (client_id client) -> ClientInfo* { @@ -419,6 +419,8 @@ TEST(Queue, SpawnAsyncRequest) auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]); EXPECT_EQ(boost::system::errc::success, ec2); EXPECT_EQ(PhaseType::priority, p2); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); context.run_for(std::chrono::milliseconds(50)); diff --git a/src/test/rgw/test_rgw_reshard_wait.cc b/src/test/rgw/test_rgw_reshard_wait.cc index 98b2aa235b95c..058828b956c88 100644 --- a/src/test/rgw/test_rgw_reshard_wait.cc +++ b/src/test/rgw/test_rgw_reshard_wait.cc @@ -13,7 +13,7 @@ */ #include "rgw_reshard.h" -#include +#include #include @@ -58,15 +58,19 @@ TEST(ReshardWait, stop_block) short_waiter.stop(); } +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST(ReshardWait, wait_yield) { constexpr ceph::timespan wait_duration = 50ms; RGWReshardWait waiter(wait_duration); boost::asio::io_context context; - spawn::spawn(context, [&] (spawn::yield_context yield) { - EXPECT_EQ(0, waiter.wait(optional_yield{context, yield})); - }); + boost::asio::spawn(context, [&] (boost::asio::yield_context yield) { + EXPECT_EQ(0, waiter.wait(yield)); + }, rethrow); const auto start = Clock::now(); EXPECT_EQ(1u, context.poll()); // spawn @@ -89,10 +93,10 @@ TEST(ReshardWait, stop_yield) RGWReshardWait short_waiter(short_duration); boost::asio::io_context context; - spawn::spawn(context, - [&] (spawn::yield_context yield) { - EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield})); - }); + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + EXPECT_EQ(-ECANCELED, long_waiter.wait(yield)); + }, rethrow); const auto start = Clock::now(); EXPECT_EQ(1u, context.poll()); // spawn @@ -133,13 +137,13 @@ TEST(ReshardWait, stop_multiple) // spawn 4 coroutines boost::asio::io_context context; { - auto async_waiter = [&] (spawn::yield_context yield) { - EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield})); + auto async_waiter = [&] (boost::asio::yield_context yield) { + EXPECT_EQ(-ECANCELED, long_waiter.wait(yield)); }; - spawn::spawn(context, async_waiter); - spawn::spawn(context, async_waiter); - spawn::spawn(context, async_waiter); - spawn::spawn(context, async_waiter); + boost::asio::spawn(context, async_waiter, rethrow); + boost::asio::spawn(context, async_waiter, rethrow); + boost::asio::spawn(context, async_waiter, rethrow); + boost::asio::spawn(context, async_waiter, rethrow); } const auto start = Clock::now(); diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc index e5df9f84efa1d..18dd8f3ffbc3e 100644 --- a/src/test/rgw/test_rgw_throttle.cc +++ b/src/test/rgw/test_rgw_throttle.cc @@ -20,9 +20,9 @@ #include #include #include +#include #include "include/scope_guard.h" -#include #include static rgw_raw_obj make_obj(const std::string& oid) @@ -143,13 +143,15 @@ TEST(Aio_Throttle, YieldCostOverWindow) auto obj = make_obj(__PRETTY_FUNCTION__); boost::asio::io_context context; - spawn::spawn(context, - [&] (spawn::yield_context yield) { - YieldingAioThrottle throttle(4, context, yield); + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + YieldingAioThrottle throttle(4, yield); scoped_completion op; auto c = throttle.get(obj, wait_on(op), 8, 0); ASSERT_EQ(1u, c.size()); EXPECT_EQ(-EDEADLK, c.front().result); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); context.run(); } @@ -166,9 +168,9 @@ TEST(Aio_Throttle, YieldingThrottleOverMax) uint64_t outstanding = 0; boost::asio::io_context context; - spawn::spawn(context, - [&] (spawn::yield_context yield) { - YieldingAioThrottle throttle(window, context, yield); + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + YieldingAioThrottle throttle(window, yield); for (uint64_t i = 0; i < total; i++) { using namespace std::chrono_literals; auto c = throttle.get(obj, wait_for(context, 10ms), 1, 0); @@ -180,6 +182,8 @@ TEST(Aio_Throttle, YieldingThrottleOverMax) } auto c = throttle.drain(); outstanding -= c.size(); + }, [] (std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); }); context.poll(); // run until we block EXPECT_EQ(window, outstanding); diff --git a/src/test/rgw/test_ssd_driver.cc b/src/test/rgw/test_ssd_driver.cc index 682a12ae700e8..bbd394a0096ea 100644 --- a/src/test/rgw/test_ssd_driver.cc +++ b/src/test/rgw/test_ssd_driver.cc @@ -112,159 +112,163 @@ class SSDDriverFixture: public ::testing::Test { rgw::sal::Attrs del_attrs; }; +void rethrow(std::exception_ptr eptr) { + if (eptr) std::rethrow_exception(eptr); +} + TEST_F(SSDDriverFixture, PutAndGet) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { rgw::sal::Attrs attrs = {}; - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testPutGet", bl, bl.length(), attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testPutGet", bl, bl.length(), attrs, yield)); bufferlist ret; rgw::sal::Attrs get_attrs; - ASSERT_EQ(0, cacheDriver->get(env->dpp, "testPutGet", 0, bl.length(), ret, get_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get(env->dpp, "testPutGet", 0, bl.length(), ret, get_attrs, yield)); EXPECT_EQ(ret, bl); EXPECT_EQ(get_attrs.size(), 0); - }); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, AppendData) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { rgw::sal::Attrs attrs = {}; - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testAppend", bl, bl.length(), attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testAppend", bl, bl.length(), attrs, yield)); bufferlist bl_append; bl_append.append(" xyz"); - ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testAppend", bl_append, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testAppend", bl_append, yield)); bufferlist ret; bl.append(bl_append); rgw::sal::Attrs get_attrs; - ASSERT_EQ(0, cacheDriver->get(env->dpp, "testAppend", 0, bl.length(), ret, get_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get(env->dpp, "testAppend", 0, bl.length(), ret, get_attrs, yield)); EXPECT_EQ(ret, bl); EXPECT_EQ(get_attrs.size(), 0); - }); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, SetGetAttrs) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttrs", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttrs", bl, bl.length(), attrs, yield)); bufferlist ret; rgw::sal::Attrs ret_attrs; - ASSERT_EQ(0, cacheDriver->get(env->dpp, "testSetGetAttrs", 0, bl.length(), ret, ret_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get(env->dpp, "testSetGetAttrs", 0, bl.length(), ret, ret_attrs, yield)); EXPECT_EQ(ret, bl); EXPECT_EQ(ret_attrs.size(), 1); for (auto& it : ret_attrs) { EXPECT_EQ(it.first, "user.rgw.attrName"); EXPECT_EQ(it.second, attrVal); } - }); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, UpdateAttrs) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testUpdateAttrs", bl, bl.length(), attrs, optional_yield{io, yield})); - ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testUpdateAttrs", update_attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testUpdateAttrs", bl, bl.length(), attrs, yield)); + ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testUpdateAttrs", update_attrs, yield)); rgw::sal::Attrs get_attrs; - ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testUpdateAttrs", get_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testUpdateAttrs", get_attrs, yield)); EXPECT_EQ(get_attrs.size(), 2); EXPECT_EQ(get_attrs["user.rgw.attrName"], updateAttrVal1); EXPECT_EQ(get_attrs["user.rgw.testAttr"], updateAttrVal2); - }); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, SetGetAttr) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { rgw::sal::Attrs attrs = {}; - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttr", bl, bl.length(), attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttr", bl, bl.length(), attrs, yield)); std::string attr_name = "user.ssd.testattr"; std::string attr_val = "testattrVal"; - ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testSetGetAttr", attr_name, attr_val, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testSetGetAttr", attr_name, attr_val, yield)); std::string attr_val_ret; - ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testSetGetAttr", attr_name, attr_val_ret, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testSetGetAttr", attr_name, attr_val_ret, yield)); ASSERT_EQ(attr_val, attr_val_ret); - }); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, DeleteAttr) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { rgw::sal::Attrs attrs = {}; - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, yield)); std::string attr_name = "user.ssd.testattr"; std::string attr_val = "testattrVal"; - ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testDeleteAttr", attr_name, attr_val, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testDeleteAttr", attr_name, attr_val, yield)); std::string attr_val_ret; - ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, yield)); ASSERT_EQ(attr_val, attr_val_ret); attr_val_ret.clear(); ASSERT_EQ(0, cacheDriver->delete_attr(env->dpp, "testDeleteAttr", attr_name)); - ASSERT_EQ(ENODATA, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, optional_yield{io, yield})); + ASSERT_EQ(ENODATA, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, yield)); ASSERT_EQ("", attr_val_ret); - }); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, DeleteAttrs) { - spawn::spawn(io, [this] (spawn::yield_context yield) { - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, optional_yield{io, yield})); + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, yield)); rgw::sal::Attrs ret_attrs; - ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", ret_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", ret_attrs, yield)); EXPECT_EQ(ret_attrs.size(), 1); for (auto& it : ret_attrs) { EXPECT_EQ(it.first, "user.rgw.attrName"); EXPECT_EQ(it.second, attrVal); } - ASSERT_EQ(0, cacheDriver->delete_attrs(env->dpp, "testDeleteAttr", del_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->delete_attrs(env->dpp, "testDeleteAttr", del_attrs, yield)); ret_attrs.clear(); - ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", del_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", del_attrs, yield)); EXPECT_EQ(ret_attrs.size(), 0); - }); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, DeleteData) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { rgw::sal::Attrs attrs = {}; - ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteData", bl, bl.length(), attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteData", bl, bl.length(), attrs, yield)); bufferlist ret; rgw::sal::Attrs get_attrs; - ASSERT_EQ(0, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, optional_yield{io, yield})); + ASSERT_EQ(0, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, yield)); EXPECT_EQ(ret, bl); EXPECT_EQ(get_attrs.size(), 0); - ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testDeleteData", optional_yield{io, yield})); - ASSERT_EQ(-ENOENT, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, optional_yield{io, yield})); - }); + ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testDeleteData", yield)); + ASSERT_EQ(-ENOENT, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, yield)); + }, rethrow); io.run(); } TEST_F(SSDDriverFixture, PutAsync) { - spawn::spawn(io, [this] (spawn::yield_context yield) { + boost::asio::spawn(io, [this] (boost::asio::yield_context yield) { rgw::sal::Attrs attrs = {}; const uint64_t window_size = env->cct->_conf->rgw_put_obj_min_window_size; - std::unique_ptr aio = rgw::make_throttle(window_size, optional_yield{io, yield}); - auto results = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testPutAsync", bl, bl.length(), attrs, bl.length(), 0); + std::unique_ptr aio = rgw::make_throttle(window_size, yield); + auto results = cacheDriver->put_async(env->dpp, yield, aio.get(), "testPutAsync", bl, bl.length(), attrs, bl.length(), 0); drain(env->dpp, aio.get()); - }); + }, rethrow); io.run(); } -- 2.39.5