git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: switch back to boost::asio for spawn() and yield_context
authorCasey Bodley <cbodley@redhat.com>
Thu, 15 Feb 2024 03:53:47 +0000 (22:53 -0500)
committerliangmingyuan <liangmingyuan@baidu.com>
Sat, 5 Oct 2024 07:28:51 +0000 (15:28 +0800)
a fork of boost::asio::spawn() was introduced in 2020 with spawn::spawn() from #31580. this fork enabled rgw to customize how the coroutine stacks are allocated in order to avoid stack overflows in frontend request coroutines. this customization was based on a StackAllocator concept from the boost::context library

in boost 1.80, that same StackAllocator overload was added to boost::asio::spawn(), along with other improvements like per-op cancellation. now that boost has everything we need, switch back and drop the spawn submodule

this required switching a lot of async functions from async_completion<> to async_initiate<>. similar changes were necessary to enable the c++20 coroutine token boost::asio::use_awaitable

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 9dd892e289b32a90b24d55ab8e1b7d7601af21ca)

32 files changed:
src/cls/CMakeLists.txt
src/common/async/yield_context.h
src/crypto/isa-l/CMakeLists.txt
src/crypto/openssl/CMakeLists.txt
src/crypto/qat/CMakeLists.txt
src/crypto/qat/qcccrypto.cc
src/rgw/CMakeLists.txt
src/rgw/driver/dbstore/CMakeLists.txt
src/rgw/driver/rados/cls_fifo_legacy.h
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/driver/rados/rgw_reshard.h
src/rgw/driver/rados/rgw_tools.cc
src/rgw/driver/rados/topic_migration.cc
src/rgw/driver/rados/topic_migration.h
src/rgw/rgw_aio.cc
src/rgw/rgw_aio_throttle.h
src/rgw/rgw_asio_frontend.cc
src/rgw/rgw_d3n_cacherequest.h
src/rgw/rgw_op.cc
src/rgw/rgw_sync_checkpoint.cc
src/test/CMakeLists.txt
src/test/librados/CMakeLists.txt
src/test/librados/asio.cc
src/test/rgw/CMakeLists.txt
src/test/rgw/bench_rgw_ratelimit.cc
src/test/rgw/test_rgw_dmclock_scheduler.cc
src/test/rgw/test_rgw_reshard_wait.cc
src/test/rgw/test_rgw_throttle.cc

index af2249adeae2d0e74c46b2c494288a9d23937caa..8bfeadd529e63c91d7ef89623ab6f68df169c3c9 100644 (file)
@@ -76,8 +76,7 @@ if (WITH_RADOSGW)
   target_link_libraries(cls_otp OATH::OATH)
   target_include_directories(cls_otp
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
   set_target_properties(cls_otp PROPERTIES
     VERSION "1.0.0"
     SOVERSION "1"
@@ -204,8 +203,7 @@ if (WITH_RADOSGW)
   target_link_libraries(cls_rgw ${FMT_LIB} json_spirit)
   target_include_directories(cls_rgw
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
   set_target_properties(cls_rgw PROPERTIES
     VERSION "1.0.0"
     SOVERSION "1"
@@ -220,8 +218,7 @@ if (WITH_RADOSGW)
   add_library(cls_rgw_client STATIC ${cls_rgw_client_srcs})
   target_include_directories(cls_rgw_client
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
 
 endif (WITH_RADOSGW)
 
@@ -313,8 +310,7 @@ if (WITH_RADOSGW)
   add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs})
   target_include_directories(cls_rgw_gc
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
   set_target_properties(cls_rgw_gc PROPERTIES
     VERSION "1.0.0"
     SOVERSION "1"
@@ -328,8 +324,7 @@ if (WITH_RADOSGW)
   add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
   target_include_directories(cls_rgw_gc_client
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
 endif (WITH_RADOSGW)
 
 
index baa028fa1b4afa7d2d1774af93d899efd2a90c6c..fd9a20901aa5675cbc1ab71497f9d96bcac70805 100644 (file)
 #include <boost/range/begin.hpp>
 #include <boost/range/end.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include "acconfig.h"
 
-#include <spawn/spawn.hpp>
-
-/// optional-like wrapper for a spawn::yield_context and its associated
-/// boost::asio::io_context. operations that take an optional_yield argument
-/// will, when passed a non-empty yield context, suspend this coroutine instead
-/// of the blocking the thread of execution
+/// optional-like wrapper for a boost::asio::yield_context. operations that take
+/// an optional_yield argument will, when passed a non-empty yield context,
+/// suspend this coroutine instead of blocking the thread of execution
 class optional_yield {
-  boost::asio::io_context *c = nullptr;
-  spawn::yield_context *y = nullptr;
+  boost::asio::yield_context *y = nullptr;
  public:
   /// construct with a valid io and yield_context
-  explicit optional_yield(boost::asio::io_context& c,
-                          spawn::yield_context& y) noexcept
-    : c(&c), y(&y) {}
+  optional_yield(boost::asio::yield_context& y) noexcept : y(&y) {}
 
   /// type tag to construct an empty object
   struct empty_t {};
@@ -42,11 +37,8 @@ class optional_yield {
   /// implicit conversion to bool, returns true if non-empty
   operator bool() const noexcept { return y; }
 
-  /// return a reference to the associated io_context. only valid if non-empty
-  boost::asio::io_context& get_io_context() const noexcept { return *c; }
-
   /// return a reference to the yield_context. only valid if non-empty
-  spawn::yield_context& get_yield_context() const noexcept { return *y; }
+  boost::asio::yield_context& get_yield_context() const noexcept { return *y; }
 };
 
 // type tag object to construct an empty optional_yield
index c8d832247d92e924b239e8aa7ab8611ac2dc540a..40da7e495c37495895fdbd52260e9572e039280d 100644 (file)
@@ -30,7 +30,7 @@ endif(HAVE_NASM_X64)
 add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs})
 target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include)
 
-target_link_libraries(ceph_crypto_isal PRIVATE spawn)
+target_link_libraries(ceph_crypto_isal PRIVATE Boost::context)
 
 set_target_properties(ceph_crypto_isal PROPERTIES
   VERSION 1.0.0
index 5365ab9a6ca2241b30e807e0b6c38934f06d4af8..ac9d868939657885b456c4451f5c5f5b1c0aee81 100644 (file)
@@ -8,7 +8,7 @@ add_library(ceph_crypto_openssl SHARED ${openssl_crypto_plugin_srcs})
 target_link_libraries(ceph_crypto_openssl
     PRIVATE OpenSSL::Crypto
     $<$<PLATFORM_ID:Windows>:ceph-common>
-    spawn)
+    Boost::context)
 target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR})
 add_dependencies(crypto_plugins ceph_crypto_openssl)
 set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "")
index 04bc0b7e7f4309fe037a367d100acc7cbdaaf336..85f7ff50e1343c5f758e9e6446bcf8337d0baf68 100644 (file)
@@ -14,7 +14,7 @@ add_dependencies(crypto_plugins ceph_crypto_qat)
 target_link_libraries(ceph_crypto_qat PRIVATE
                       QAT::qat
                       QAT::usdm
-                      spawn)
+                      Boost::context)
 
 add_dependencies(crypto_plugins ceph_crypto_qat)
 set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1)
index d441e2cd6163e42ad7075a72322307a2b4105534..94c98518a90b6a20761602c3c50a68b32e05319a 100644 (file)
@@ -331,7 +331,7 @@ bool QccCrypto::perform_op_batch(unsigned char* out, const unsigned char* in, si
   int avail_inst = NON_INSTANCE;
 
   if (y) {
-    spawn::yield_context yield = y.get_yield_context();
+    boost::asio::yield_context yield = y.get_yield_context();
     avail_inst = async_get_instance(yield);
   } else {
     auto result = async_get_instance(boost::asio::use_future);
@@ -546,7 +546,7 @@ bool QccCrypto::symPerformOp(int avail_inst,
     do {
       poll_retry_num = RETRY_MAX_NUM;
       if (y) {
-        spawn::yield_context yield = y.get_yield_context();
+        boost::asio::yield_context yield = y.get_yield_context();
         status = helper.async_perform_op(std::span<CpaCySymDpOpData*>(pOpDataVec), yield);
       } else {
         auto result = helper.async_perform_op(std::span<CpaCySymDpOpData*>(pOpDataVec), boost::asio::use_future);
index f97b12e81da592b726e0203624adad0e0ac32ba6..b090b2eeeb8fa7bceafda24a0d529ed7e7f511d0 100644 (file)
@@ -297,8 +297,9 @@ target_link_libraries(rgw_common
   PUBLIC
     ${LUA_LIBRARIES}
     RapidJSON::RapidJSON
-    spawn
-    ${FMT_LIB})
+    Boost::context
+    ${FMT_LIB}
+    OpenSSL::SSL)
 target_include_directories(rgw_common
   PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/services"
   PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
@@ -432,7 +433,7 @@ target_link_libraries(rgw_a
     OATH::OATH
   PUBLIC
     rgw_common
-    spawn)
+    Boost::context)
 
 if(WITH_CURL_OPENSSL)
   # used by rgw_http_client_curl.cc
@@ -448,7 +449,7 @@ set(rgw_schedulers_srcs
 
 add_library(rgw_schedulers STATIC ${rgw_schedulers_srcs})
 target_link_libraries(rgw_schedulers
-  PUBLIC dmclock::dmclock spawn)
+  PUBLIC dmclock::dmclock Boost::context)
 
 set(radosgw_srcs
   rgw_main.cc)
index a3aca7a64e4cc1e97424dbb5b29574f4d1658219..f401c912f6751827bf9e8e3264ba09a45152f844 100644 (file)
@@ -29,7 +29,7 @@ target_include_directories(dbstore_lib
     PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
     PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/store/rados"
     PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}")
-set(link_targets spawn)
+set(link_targets Boost::context)
 if(WITH_JAEGER)
   list(APPEND link_targets jaeger_base)
 endif()
index ed23129eb304520cc6a2e6d79ab7b81174ffe37c..85e8f53997536e6787bd49414d316641a8496c5f 100644 (file)
@@ -89,7 +89,7 @@ using part_info = fifo::part_header;
 ///
 /// This library uses optional_yield. Please see
 /// /src/common/async/yield_context.h. In summary, optional_yield
-/// contains either a spawn::yield_context (in which case the current
+/// contains either a boost::asio::yield_context (in which case the current
 /// coroutine is suspended until completion) or null_yield (in which
 /// case the current thread is blocked until completion.)
 ///
index c51e61a2755dc2176544e110d405e478bb1d6d8d..b661b56424409e11cc0a34d13fc228637321fe77 100644 (file)
@@ -588,14 +588,14 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
   const int max_aio = std::max(1, op_state.get_max_aio());
 
   for (int i=0; i<max_aio; i++) {
-    spawn::spawn(context, [&](spawn::yield_context yield) {
+    boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
       while (true) {
         int shard = next_shard;
         next_shard += 1;
         if (shard >= max_shards) {
           return;
         }
-        optional_yield y(context, yield);
+        optional_yield y(yield);
         uint64_t shard_count;
         int r = ::check_index_olh(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
         if (r < 0) {
@@ -608,6 +608,8 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
             " entries " << verb << ")" << dendl;
         }
       }
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   }
   try {
@@ -797,7 +799,7 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
   int next_shard = 0;
   boost::asio::io_context context;
   for (int i=0; i<max_aio; i++) {
-    spawn::spawn(context, [&](spawn::yield_context yield) {
+    boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
       while (true) {
         int shard = next_shard;
         next_shard += 1;
@@ -805,8 +807,7 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
           return;
         }
         uint64_t shard_count;
-        optional_yield y {context, yield};
-        int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
+        int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, yield);
         if (r < 0) {
           ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard << 
             " check_index_unlinked(): " << r << dendl;
@@ -817,6 +818,8 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
             " entries " << verb << ")" << dendl;
         }
       }
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   }
   try {
index 87c0247cf0af883fe6c0386fa618d2b7caec7144..38e3630dd8f8ea25f7ff7e247fcaab842ade5329 100644 (file)
@@ -9,8 +9,8 @@
 #include <boost/asio/basic_waitable_timer.hpp>
 #include <boost/asio/executor_work_guard.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 #include <boost/context/protected_fixedsize_stack.hpp>
-#include <spawn/spawn.hpp>
 #include "include/function2.hpp"
 #include "rgw_sal_rados.h"
 #include "rgw_pubsub.h"
@@ -168,7 +168,7 @@ private:
       pending_tokens(0),
       timer(io_context) {}  
  
-    void async_wait(spawn::yield_context yield) { 
+    void async_wait(boost::asio::yield_context yield) {
       if (pending_tokens == 0) {
         return;
       }
@@ -196,7 +196,7 @@ private:
       const cls_queue_entry& entry,
       RGWPubSubEndpoint* const push_endpoint,
       const rgw_pubsub_topic& topic,
-      spawn::yield_context yield) {
+      boost::asio::yield_context yield) {
     event_entry_t event_entry;
     auto iter = entry.data.cbegin();
     try {
@@ -246,7 +246,7 @@ private:
                         << " retry_number: "
                         << entry_persistency_tracker.retires_num
                         << " current time: " << time_now << dendl;
-    const auto ret = push_endpoint->send(event_entry.event, optional_yield{io_context, yield});
+    const auto ret = push_endpoint->send(event_entry.event, yield);
     if (ret < 0) {
       ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
                          << " failed. error: " << ret
@@ -262,7 +262,7 @@ private:
   }
 
   // clean stale reservation from queue
-  void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
+  void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
     while (!shutdown) {
       ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
       const auto now = ceph::coarse_real_time::clock::now();
@@ -275,7 +275,7 @@ private:
         "" /*no tag*/);
       cls_2pc_queue_expire_reservations(op, stale_time);
       // check ownership and do reservation cleanup in one batch
-      auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
+      auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, yield);
       if (ret == -ENOENT) {
         // queue was deleted
         ldpp_dout(this, 5) << "INFO: queue: " 
@@ -299,12 +299,12 @@ private:
   }
 
   // unlock (lose ownership) queue
-  int unlock_queue(const std::string& queue_name, spawn::yield_context yield) {
+  int unlock_queue(const std::string& queue_name, boost::asio::yield_context yield) {
     librados::ObjectWriteOperation op;
     op.assert_exists();
     rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
     auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
-    const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+    const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
     if (ret == -ENOENT) {
       ldpp_dout(this, 10) << "INFO: queue: " << queue_name
         << ". was removed. nothing to unlock" << dendl;
@@ -321,13 +321,13 @@ private:
   int get_topic_info(const std::string& queue_name,
                      const cls_queue_entry& queue_entry,
                      rgw_pubsub_topic& topic,
-                     spawn::yield_context yield) {
+                     boost::asio::yield_context yield) {
     std::string queue_topic_tenant;
     std::string queue_topic_name;
     parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name);
     rgw_pubsub_topic topic_info;
     RGWPubSub ps(&rados_store, queue_topic_tenant, site);
-    int ret = ps.get_topic(this, queue_topic_name, topic_info, optional_yield{io_context, yield}, nullptr);
+    int ret = ps.get_topic(this, queue_topic_name, topic_info, yield, nullptr);
     if (ret < 0) {
       ldpp_dout(this, 1) << "WARNING: failed to fetch topic: "
                          << queue_topic_name << " error: " << ret
@@ -355,15 +355,18 @@ private:
   }
 
   // processing of a specific queue
-  void process_queue(const std::string& queue_name, spawn::yield_context yield) {
+  void process_queue(const std::string& queue_name, boost::asio::yield_context yield) {
     constexpr auto max_elements = 1024;
     auto is_idle = false;
     const std::string start_marker;
 
     // start a the cleanup coroutine for the queue
-    spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) {
-            cleanup_queue(queue_name, yield);
-            }, make_stack_allocator());
+    boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+            [this, queue_name](boost::asio::yield_context yield) {
+              cleanup_queue(queue_name, yield);
+            }, [] (std::exception_ptr eptr) {
+              if (eptr) std::rethrow_exception(eptr);
+            });
 
     CountersManager queue_counters_container(queue_name, this->get_cct());
 
@@ -394,7 +397,7 @@ private:
           "" /*no tag*/);
         cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
         // check ownership and list entries in one batch
-        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
+        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, yield);
         if (ret == -ENOENT) {
           // queue was deleted
           topics_persistency_tracker.erase(queue_name);
@@ -459,11 +462,10 @@ private:
         }
 
         entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
-        spawn::spawn(yield,[this, &notifs_persistency_tracker, &queue_name, entry_idx,
-           total_entries, &end_marker, &remove_entries, &has_error, &waiter,
-           &entry, &needs_migration_vector,
-           push_endpoint = push_endpoint.get(),
-           &topic_info](spawn::yield_context yield) {
+        boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
+          [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
+           &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector,
+           push_endpoint = push_endpoint.get(), &topic_info](boost::asio::yield_context yield) {
             const auto token = waiter.make_token();
             auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
             auto result =
@@ -487,7 +489,9 @@ private:
               ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
             } 
-        }, make_stack_allocator());
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
         ++entry_idx;
       }
 
@@ -519,7 +523,7 @@ private:
           "" /*no tag*/);
         cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
         // check ownership and deleted entries in one batch
-        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
         if (ret == -ENOENT) {
           // queue was deleted
           ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
@@ -547,7 +551,7 @@ private:
 
           rgw_pubsub_topic topic;
           auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
-                           optional_yield(io_context, yield), nullptr);
+                                               yield, nullptr);
           if (ret_of_get_topic < 0) {
             // we can't migrate entries without topic info
             ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
@@ -579,7 +583,7 @@ private:
           buffer::list obl;
           int rval;
           cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval);
-          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC);
+          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield, librados::OPERATION_RETURNVEC);
           if (ret < 0) {
             ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl;
             return;
@@ -591,7 +595,7 @@ private:
           }
 
           cls_2pc_queue_commit(op, migration_vector, reservation_id);
-          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
           reservation_id = cls_2pc_reservation::NO_ID;
           if (ret < 0) {
             ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". error: " << ret << dendl;
@@ -618,7 +622,7 @@ private:
 
   // process all queues
   // find which of the queues is owned by this daemon and process it
-  void process_queues(spawn::yield_context yield) {
+  void process_queues(boost::asio::yield_context yield) {
     auto has_error = false;
     owned_queues_t owned_queues;
     size_t processed_queue_count = 0;
@@ -646,7 +650,7 @@ private:
       timer.async_wait(yield[ec]);
 
       queues_t queues;
-      auto ret = read_queue_list(queues, optional_yield(io_context, yield));
+      auto ret = read_queue_list(queues, yield);
       if (ret < 0) {
         has_error = true;
         continue;
@@ -665,7 +669,7 @@ private:
               failover_time,
               LOCK_FLAG_MAY_RENEW);
 
-        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
         if (ret == -EBUSY) {
           // lock is already taken by another RGW
           ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -687,7 +691,8 @@ private:
         if (owned_queues.insert(queue_name).second) {
           ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
           // start processing this queue
-          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) {
+          boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+                             [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](boost::asio::yield_context yield) {
             ++processed_queue_count;
             process_queue(queue_name, yield);
             // if queue processing ended, it means that the queue was removed or not owned anymore
@@ -703,7 +708,9 @@ private:
             queue_gc.push_back(queue_name);
             --processed_queue_count;
             ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
-          }, make_stack_allocator());
+          }, [] (std::exception_ptr eptr) {
+            if (eptr) std::rethrow_exception(eptr);
+          });
         } else {
           ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
         }
@@ -741,9 +748,12 @@ public:
   }
 
   void init() {
-    spawn::spawn(io_context, [this](spawn::yield_context yield) {
+    boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+        [this](boost::asio::yield_context yield) {
           process_queues(yield);
-        }, make_stack_allocator());
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
 
     // start the worker threads to do the actual queue processing
     const std::string WORKER_THREAD_NAME = "notif-worker";
index b73da6b42d491dbc1eeeffd040d992689252b955..69eb5c9ff326224e6414edd8c207513f1f5fc53d 100644 (file)
@@ -19,7 +19,7 @@
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
 #include "rgw_kafka.h"
 #endif
-#include <boost/asio/yield.hpp>
+//#include <boost/asio/yield.hpp>
 #include <boost/algorithm/string.hpp>
 #include <functional>
 #include "rgw_perf_counters.h"
@@ -153,7 +153,7 @@ public:
       boost::system::error_code ec;
       auto yield = y.get_yield_context();
       auto&& token = yield[ec];
-      boost::asio::async_initiate<spawn::yield_context, Signature>(
+      boost::asio::async_initiate<boost::asio::yield_context, Signature>(
           [this] (auto handler, auto ex) {
             completion = Completion::create(ex, std::move(handler));
           }, token, yield.get_executor());
index 4ad5ce96e10bf92a2a4973ba94c3a70691be034f..e93ee1e83a2751c11a9346cc150ac169db97be52 100644 (file)
@@ -1363,10 +1363,15 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
     using namespace rgw;
     if (svc.site->is_meta_master() &&
         all_zonegroups_support(*svc.site, zone_features::notification_v2)) {
-      spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) {
+      boost::asio::spawn(v1_topic_migration, [this] (boost::asio::yield_context yield) {
             DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "};
             rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield);
+          }, [] (std::exception_ptr eptr) {
+            if (eptr) std::rethrow_exception(eptr);
           });
+      // TODO: we run this on a separate thread so shutdown can cancel it with
+      // v1_topic_migration.stop(), but we could run it on the global thread
+      // pool and cancel spawn() with a cancellation_signal instead
       v1_topic_migration.start(1);
     }
   }
index c1011bd60a5503ce27aa086f270404c692e21574..d590bff7fcfdea899538e06188660cb61ea74280 100644 (file)
@@ -1153,10 +1153,9 @@ int RGWReshardWait::wait(optional_yield y)
   }
 
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
 
-    Waiter waiter(context);
+    Waiter waiter(yield.get_executor());
     waiters.push_back(waiter);
     lock.unlock();
 
index 0497414566ad28182e52c1cd802cc46bf50c71f7..8e37defa1dbdcc4ce744a4e5f5b3c53ee99399d9 100644 (file)
@@ -252,11 +252,11 @@ class RGWReshardWait {
   ceph::condition_variable cond;
 
   struct Waiter : boost::intrusive::list_base_hook<> {
-    using Executor = boost::asio::io_context::executor_type;
+    using Executor = boost::asio::any_io_executor;
     using Timer = boost::asio::basic_waitable_timer<Clock,
           boost::asio::wait_traits<Clock>, Executor>;
     Timer timer;
-    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
+    explicit Waiter(boost::asio::any_io_executor ex) : timer(ex) {}
   };
   boost::intrusive::list<Waiter> waiters;
 
index 41254f9519e344a70577d552f69837c717bd5a20..eec4a799115654a30ccc4a18fc44e9688eb6f83f 100644 (file)
@@ -203,11 +203,10 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
   // given a yield_context, call async_operate() to yield the coroutine instead
   // of blocking
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
     auto bl = librados::async_operate(
-      context, ioctx, oid, op, flags, trace_info, yield[ec]);
+      yield, ioctx, oid, op, flags, trace_info, yield[ec]);
     if (pbl) {
       *pbl = std::move(bl);
     }
@@ -228,10 +227,9 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
                      int flags, const jspan_context* trace_info)
 {
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
-    librados::async_operate(context, ioctx, oid, op, flags, trace_info, yield[ec]);
+    librados::async_operate(yield, ioctx, oid, op, flags, trace_info, yield[ec]);
     return -ec.value();
   }
   if (is_asio_thread) {
@@ -248,10 +246,9 @@ int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, cons
                      optional_yield y)
 {
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
-    auto reply = librados::async_notify(context, ioctx, oid,
+    auto reply = librados::async_notify(yield, ioctx, oid,
                                         bl, timeout_ms, yield[ec]);
     if (pbl) {
       *pbl = std::move(reply);
index c7dcfc37b4489508999b0d5723787f8e07790d30..0d4238f84312efd4a1f5b9d7c89cd44f4c347af8 100644 (file)
@@ -263,10 +263,8 @@ int migrate_topics(const DoutPrefixProvider* dpp, optional_yield y,
 int migrate(const DoutPrefixProvider* dpp,
             rgw::sal::RadosStore* driver,
             boost::asio::io_context& context,
-            spawn::yield_context yield)
+            boost::asio::yield_context y)
 {
-  auto y = optional_yield{context, yield};
-
   ldpp_dout(dpp, 1) << "starting v1 topic migration.." << dendl;
 
   librados::Rados* rados = driver->getRados()->get_rados_handle();
index 9545fd63c2e66c8957101bf0d50b2ebd8c160abf..2bd2b730c85a5244f587268d80337c8b6756f3c5 100644 (file)
@@ -16,7 +16,7 @@
 #pragma once
 
 #include <boost/asio/io_context.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 
 class DoutPrefixProvider;
 namespace rgw::sal { class RadosStore; }
@@ -29,6 +29,6 @@ namespace rgwrados::topic_migration {
 int migrate(const DoutPrefixProvider* dpp,
             rgw::sal::RadosStore* driver,
             boost::asio::io_context& context,
-            spawn::yield_context yield);
+            boost::asio::yield_context yield);
 
 } // rgwrados::topic_migration
index 1c9a54f072647d599cd2b90ee0b7f4c2eaa7ddcb..37636dd130b206e4a64339be3f63816d0790c6cc 100644 (file)
@@ -89,16 +89,14 @@ struct Handler {
 
 template <typename Op>
 Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op,
-                         boost::asio::io_context& context,
-                         spawn::yield_context yield, jspan_context* trace_ctx = nullptr) {
-  return [ctx = std::move(ctx), op = std::move(op), &context, yield, trace_ctx] (Aio* aio, AioResult& r) mutable {
+                         boost::asio::yield_context yield,
+                         jspan_context* trace_ctx) {
+  return [ctx = std::move(ctx), op = std::move(op), yield, trace_ctx] (Aio* aio, AioResult& r) mutable {
       // arrange for the completion Handler to run on the yield_context's strand
       // executor so it can safely call back into Aio without locking
-      using namespace boost::asio;
-      async_completion<spawn::yield_context, void()> init(yield);
-      auto ex = get_associated_executor(init.completion_handler);
+      auto ex = yield.get_executor();
 
-      librados::async_operate(context, ctx, r.obj.oid, &op, 0, trace_ctx,
+      librados::async_operate(yield, ctx, r.obj.oid, &op, 0, trace_ctx,
                               bind_executor(ex, Handler{aio, ctx, r}));
     };
 }
@@ -110,7 +108,7 @@ Aio::OpFunc d3n_cache_aio_abstract(const DoutPrefixProvider *dpp, optional_yield
     ceph_assert(y);
     auto c = std::make_unique<D3nL1CacheRequest>();
     lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << r.obj.oid << dendl;
-    c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), cache_location, read_ofs, read_len, aio, r);
+    c->file_aio_read_abstract(dpp, y.get_yield_context(), cache_location, read_ofs, read_len, aio, r);
   };
 }
 
@@ -122,7 +120,7 @@ Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, optional_yield y, jspan_c
   static_assert(!std::is_const_v<Op>);
   if (y) {
     return aio_abstract(std::move(ctx), std::forward<Op>(op),
-                        y.get_io_context(), y.get_yield_context(), trace_ctx);
+                        y.get_yield_context(), trace_ctx);
   }
   return aio_abstract(std::move(ctx), std::forward<Op>(op), trace_ctx);
 }
index c0656ef225e6a038e16bb4defcf7b46191de9b4c..87fc980a94c16eff3589911b3f9483c7f6da6594 100644 (file)
@@ -80,8 +80,7 @@ class BlockingAioThrottle final : public Aio, private Throttle {
 // a throttle that yields the coroutine instead of blocking. all public
 // functions must be called within the coroutine strand
 class YieldingAioThrottle final : public Aio, private Throttle {
-  boost::asio::io_context& context;
-  spawn::yield_context yield;
+  boost::asio::yield_context yield;
   struct Handler;
 
   // completion callback associated with the waiter
@@ -94,9 +93,8 @@ class YieldingAioThrottle final : public Aio, private Throttle {
   struct Pending : AioResultEntry { uint64_t cost = 0; };
 
  public:
-  YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
-                      spawn::yield_context yield)
-    : Throttle(window), context(context), yield(yield)
+  YieldingAioThrottle(uint64_t window, boost::asio::yield_context yield)
+    : Throttle(window), yield(yield)
   {}
 
   virtual ~YieldingAioThrottle() override {};
@@ -119,7 +117,6 @@ inline auto make_throttle(uint64_t window_size, optional_yield y)
   std::unique_ptr<Aio> aio;
   if (y) {
     aio = std::make_unique<YieldingAioThrottle>(window_size,
-                                                y.get_io_context(),
                                                 y.get_yield_context());
   } else {
     aio = std::make_unique<BlockingAioThrottle>(window_size);
index 8c64570287daedb511e5b13fb4804995cba7bbe6..ace3b7aff49e9a4d3424952946ceeb8ff4b8aa0f 100644 (file)
@@ -3,6 +3,7 @@
 
 #include <atomic>
 #include <ctime>
+#include <memory>
 #include <vector>
 
 #include <boost/asio/error.hpp>
@@ -16,7 +17,7 @@
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 
 #include <boost/context/protected_fixedsize_stack.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include "common/async/shared_mutex.h"
 #include "common/errno.h"
@@ -69,12 +70,12 @@ class StreamIO : public rgw::asio::ClientIO {
   CephContext* const cct;
   Stream& stream;
   timeout_timer& timeout;
-  spawn::yield_context yield;
+  boost::asio::yield_context yield;
   parse_buffer& buffer;
   boost::system::error_code fatal_ec;
  public:
   StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
-           rgw::asio::parser_type& parser, spawn::yield_context yield,
+           rgw::asio::parser_type& parser, boost::asio::yield_context yield,
            parse_buffer& buffer, bool is_ssl,
            const tcp::endpoint& local_endpoint,
            const tcp::endpoint& remote_endpoint)
@@ -200,7 +201,7 @@ void handle_connection(boost::asio::io_context& context,
                        rgw::dmclock::Scheduler *scheduler,
                        const std::string& uri_prefix,
                        boost::system::error_code& ec,
-                       spawn::yield_context yield)
+                       boost::asio::yield_context yield)
 {
   // don't impose a limit on the body, since we read it in pieces
   static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
@@ -281,7 +282,7 @@ void handle_connection(boost::asio::io_context& context,
       RGWRestfulIO client(cct, &real_client_io);
       optional_yield y = null_yield;
       if (cct->_conf->rgw_beast_enable_async) {
-        y = optional_yield{context, yield};
+        y = optional_yield{yield};
       }
       int http_ret = 0;
       string user = "-";
@@ -1021,8 +1022,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
-    spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+    boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(),
+      [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable {
         auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
         auto c = connections.add(*conn);
         // wrap the tcp stream in an ssl stream
@@ -1049,13 +1050,15 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
         }
 
         conn->socket.shutdown(tcp::socket::shutdown_both, ec);
-      }, make_stack_allocator());
+      }, [] (std::exception_ptr eptr) {
+        if (eptr) std::rethrow_exception(eptr);
+      });
   } else {
 #else
   {
 #endif // WITH_RADOSGW_BEAST_OPENSSL
-    spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+    boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(),
+      [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable {
         auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
         auto c = connections.add(*conn);
         auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
@@ -1064,7 +1067,9 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
                           conn->buffer, false, pause_mutex, scheduler.get(),
                           uri_prefix, ec, yield);
         conn->socket.shutdown(tcp::socket::shutdown_both, ec);
-      }, make_stack_allocator());
+      }, [] (std::exception_ptr eptr) {
+        if (eptr) std::rethrow_exception(eptr);
+      });
   }
 }
 
index 0519c6def3e825a76dbc456ea69296bca903fa7f..54b495f5461f8d5a187918c8d7115ace7c38df49 100644 (file)
@@ -7,6 +7,8 @@
 #include <stdlib.h>
 #include <aio.h>
 
+#include <boost/asio/spawn.hpp>
+
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
 #include "common/async/completion.h"
@@ -135,13 +137,10 @@ struct D3nL1CacheRequest {
     }
   };
 
-  void file_aio_read_abstract(const DoutPrefixProvider *dpp, spawn::yield_context yield,
+  void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::yield_context yield,
                               std::string& cache_location, off_t read_ofs, off_t read_len,
                               rgw::Aio* aio, rgw::AioResult& r) {
-    using namespace boost::asio;
-    async_completion<spawn::yield_context, void()> init(yield);
-    auto ex = get_associated_executor(init.completion_handler);
-
+    auto ex = yield.get_executor();
     ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << r.obj.oid << dendl;
     async_read(dpp, ex, cache_location+"/"+url_encode(r.obj.oid, true), read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r}));
   }
index 24796ea1aa4eb3448356ccc7e5d631b26a5bace1..9ad4672718d0d2e02510da7137f98bbcd8182566 100644 (file)
@@ -6788,7 +6788,8 @@ void RGWDeleteMultiObj::execute(optional_yield y)
   char* buf;
   std::optional<boost::asio::deadline_timer> formatter_flush_cond;
   if (y) {
-    formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(y.get_io_context());  
+    auto ex = y.get_yield_context().get_executor();
+    formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
   }
 
   buf = data.c_str();
@@ -6856,9 +6857,11 @@ void RGWDeleteMultiObj::execute(optional_yield y)
         return aio_count < max_aio;
       });
       aio_count++;
-      spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key, &formatter_flush_cond] (spawn::yield_context yield) {
-        handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }, &*formatter_flush_cond); 
+      boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) {
+        handle_individual_object(obj_key, yield, &*formatter_flush_cond);
         aio_count--;
+      }, [] (std::exception_ptr eptr) {
+        if (eptr) std::rethrow_exception(eptr);
       }); 
     } else {
       handle_individual_object(obj_key, y, nullptr);
index 1394a712a94f18693e03508851548ab69ec8a619..1172e79a48f32af7cecb2de7c53715005bea69fd 100644 (file)
@@ -226,8 +226,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
     entry.pipe = pipe;
 
     // fetch remote markers
-    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
-      auto y = optional_yield{ioctx, yield};
+    boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) {
+      auto y = optional_yield{yield};
       rgw_bucket_index_marker_info info;
       int r = source_bilog_info(dpp, store->svc()->zone, entry.pipe,
                                 info, entry.remote_markers, y);
@@ -237,10 +237,12 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
         throw std::system_error(-r, std::system_category());
       }
       entry.latest_gen = info.latest_gen;
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
     // fetch source bucket info
-    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
-      auto y = optional_yield{ioctx, yield};
+    boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) {
+      auto y = optional_yield{yield};
       int r = store->getRados()->get_bucket_instance_info(
           *entry.pipe.source.bucket, entry.source_bucket_info,
           nullptr, nullptr, y, dpp);
@@ -249,6 +251,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
             << cpp_strerror(r) << dendl;
         throw std::system_error(-r, std::system_category());
       }
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   }
 
index a0c8fcfe823cca7fddfebbb4c53c65775aa6a141..2e756eeb583807710f1d9488d482576f791bd1a1 100644 (file)
@@ -340,7 +340,7 @@ target_link_libraries(ceph_test_librgw_file_nfsns
   ${LUA_LIBRARIES}
   ${ALLOC_LIBS}
   )
-  target_link_libraries(ceph_test_librgw_file_nfsns spawn)
+  target_link_libraries(ceph_test_librgw_file_nfsns Boost::context)
 install(TARGETS ceph_test_librgw_file_nfsns DESTINATION ${CMAKE_INSTALL_BINDIR})
 
 # ceph_test_librgw_file_aw (nfs write transaction [atomic write] tests)
@@ -374,7 +374,7 @@ target_link_libraries(ceph_test_librgw_file_marker
   ${LUA_LIBRARIES}
   ${ALLOC_LIBS}
   )
-  target_link_libraries(ceph_test_librgw_file_marker spawn)
+  target_link_libraries(ceph_test_librgw_file_marker Boost::context)
 install(TARGETS ceph_test_librgw_file_marker DESTINATION ${CMAKE_INSTALL_BINDIR})
 
 # ceph_test_librgw_file_xattr (attribute ops)
@@ -393,7 +393,7 @@ target_link_libraries(ceph_test_librgw_file_xattr
   ${LUA_LIBRARIES}
   ${ALLOC_LIBS}
   )
-target_link_libraries(ceph_test_librgw_file_xattr spawn)
+target_link_libraries(ceph_test_librgw_file_xattr Boost::context)
 
 # ceph_test_librgw_file_rename (mv/rename tests)
 add_executable(ceph_test_librgw_file_rename
index 5d5623f06c829defd9f7f7da982c4f3e5726f7b2..b83f1abf5bca289387285ae2c09d175c6af670fa 100644 (file)
@@ -60,7 +60,7 @@ target_link_libraries(ceph_test_rados_api_aio_pp
 
 add_executable(ceph_test_rados_api_asio asio.cc)
 target_link_libraries(ceph_test_rados_api_asio global
-  librados ${UNITTEST_LIBS} spawn)
+  librados ${UNITTEST_LIBS} Boost::context)
 
 add_executable(ceph_test_rados_api_list
   list.cc
@@ -133,7 +133,7 @@ target_include_directories(ceph_test_rados_api_tier_pp
   PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
 target_link_libraries(ceph_test_rados_api_tier_pp
   librados global ${UNITTEST_LIBS} Boost::system radostest-cxx cls_cas_internal
-  cls_cas_client spawn)
+  cls_cas_client Boost::context)
 
 add_executable(ceph_test_rados_api_snapshots
   snapshots.cc)
index 0ede2e14fb46530d9b6252e4d27911be94316f51..9f8844eb7bb82090c7cb6e40306d4efa065b1a22 100644 (file)
@@ -21,8 +21,8 @@
 
 #include <boost/range/begin.hpp>
 #include <boost/range/end.hpp>
-#include <spawn/spawn.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 #include <boost/asio/use_future.hpp>
 
 #define dout_subsys ceph_subsys_rados
@@ -73,6 +73,10 @@ librados::Rados AsioRados::rados;
 librados::IoCtx AsioRados::io;
 librados::IoCtx AsioRados::snapio;
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST_F(AsioRados, AsyncReadCallback)
 {
   boost::asio::io_context service;
@@ -113,20 +117,20 @@ TEST_F(AsioRados, AsyncReadYield)
 {
   boost::asio::io_context service;
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]);
     EXPECT_FALSE(ec);
     EXPECT_EQ("hello", bl.to_str());
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]);
     EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
@@ -178,22 +182,22 @@ TEST_F(AsioRados, AsyncWriteYield)
   bufferlist bl;
   bl.append("hello");
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     librados::async_write(service, io, "exist", bl, bl.length(), 0,
                           yield[ec]);
     EXPECT_FALSE(ec);
     EXPECT_EQ("hello", bl.to_str());
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
                           yield[ec]);
     EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
@@ -251,7 +255,7 @@ TEST_F(AsioRados, AsyncReadOperationYield)
 {
   boost::asio::io_context service;
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
     boost::system::error_code ec;
@@ -260,9 +264,9 @@ TEST_F(AsioRados, AsyncReadOperationYield)
     EXPECT_FALSE(ec);
     EXPECT_EQ("hello", bl.to_str());
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
     boost::system::error_code ec;
@@ -270,7 +274,7 @@ TEST_F(AsioRados, AsyncReadOperationYield)
                                       yield[ec]);
     EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
@@ -335,23 +339,23 @@ TEST_F(AsioRados, AsyncWriteOperationYield)
   bufferlist bl;
   bl.append("hello");
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectWriteOperation op;
     op.write_full(bl);
     boost::system::error_code ec;
     librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]);
     EXPECT_FALSE(ec);
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectWriteOperation op;
     op.write_full(bl);
     boost::system::error_code ec;
     librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]);
     EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
index dec7ea1c149c4648fed6705e29b110cdfd5b1220..8e4235a1af8896a837a5ff2b5193aa10d0aad1ef 100644 (file)
@@ -35,7 +35,6 @@ target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE
   ${UNITTEST_LIBS}
   ${EXTRALIBS}
   )
-  target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE spawn)
 install(TARGETS ceph_test_rgw_d4n_directory DESTINATION ${CMAKE_INSTALL_BINDIR})
 endif()
   
@@ -54,7 +53,7 @@ target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE
   ${UNITTEST_LIBS}
   ${EXTRALIBS}
   )
-  target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE spawn)
+  target_link_libraries(ceph_test_rgw_d4n_filter PRIVATE Boost::context)
 install(TARGETS ceph_test_rgw_d4n_filter DESTINATION ${CMAKE_INSTALL_BINDIR})
 endif()
 
index 1ea8714f9df00f83e96ea55ebc7a650d3e3943bd..529d8f739fd9307ec967c8dc0a168a5b16183dbb 100644 (file)
@@ -6,7 +6,7 @@
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/executor_work_guard.hpp>
 #include <boost/asio/steady_timer.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 #include <chrono>
 #include <mutex>
 #include <unordered_map>
@@ -37,7 +37,7 @@ struct parameters {
 std::shared_ptr<std::vector<client_info>> ds = std::make_shared<std::vector<client_info>>(std::vector<client_info>());
 
 std::string method[2] = {"PUT", "GET"};
-void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx)
+void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, boost::asio::yield_context& yield, boost::asio::io_context& ioctx)
 {
     auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
     boost::asio::steady_timer timer(ioctx);
@@ -101,7 +101,7 @@ bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared
     it.accepted++;
     return false;
 }
-void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
+void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, boost::asio::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
 {
     for (;;)
     {
@@ -130,11 +130,13 @@ void simulate_clients(boost::asio::io_context& context, std::string tenant, cons
         auto& it = ds->emplace_back(client_info());
         it.tenant = tenant;
         int x = ds->size() - 1;
-        spawn::spawn(context,
-                [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx)
+        boost::asio::spawn(context,
+                [&to_run ,x, ratelimit, info, params, &context](boost::asio::yield_context ctx)
                 {
                     auto& it = ds.get()->operator[](x);
                     simulate_client(it, info, ratelimit, params, ctx, to_run, context);
+                }, [] (std::exception_ptr eptr) {
+                  if (eptr) std::rethrow_exception(eptr);
                 });
     }
 }
index c9b4a853fd4d726c36d51c374705da8f8ddeed20..da748dfa6c9ca1a99dc4c4dcef73dcd0b6064067 100644 (file)
@@ -18,7 +18,7 @@
 #include "rgw_dmclock_async_scheduler.h"
 
 #include <optional>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 #include <gtest/gtest.h>
 #include "acconfig.h"
 #include "global/global_context.h"
@@ -400,7 +400,7 @@ TEST(Queue, SpawnAsyncRequest)
 {
   boost::asio::io_context context;
 
-  spawn::spawn(context, [&] (spawn::yield_context yield) {
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
     ClientCounters counters(g_ceph_context);
     AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
                     [] (client_id client) -> ClientInfo* {
@@ -419,6 +419,8 @@ TEST(Queue, SpawnAsyncRequest)
     auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]);
     EXPECT_EQ(boost::system::errc::success, ec2);
     EXPECT_EQ(PhaseType::priority, p2);
+  }, [] (std::exception_ptr eptr) {
+    if (eptr) std::rethrow_exception(eptr);
   });
 
   context.run_for(std::chrono::milliseconds(50));
index 98b2aa235b95c06ad446d012a0da7bfc9ea3bd03..058828b956c88e358a6e3103dae3a667cb91a2ab 100644 (file)
@@ -13,7 +13,7 @@
  */
 
 #include "rgw_reshard.h"
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include <gtest/gtest.h>
 
@@ -58,15 +58,19 @@ TEST(ReshardWait, stop_block)
   short_waiter.stop();
 }
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST(ReshardWait, wait_yield)
 {
   constexpr ceph::timespan wait_duration = 50ms;
   RGWReshardWait waiter(wait_duration);
 
   boost::asio::io_context context;
-  spawn::spawn(context, [&] (spawn::yield_context yield) {
-      EXPECT_EQ(0, waiter.wait(optional_yield{context, yield}));
-    });
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+      EXPECT_EQ(0, waiter.wait(yield));
+    }, rethrow);
 
   const auto start = Clock::now();
   EXPECT_EQ(1u, context.poll()); // spawn
@@ -89,10 +93,10 @@ TEST(ReshardWait, stop_yield)
   RGWReshardWait short_waiter(short_duration);
 
   boost::asio::io_context context;
-  spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
-      EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
-    });
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      EXPECT_EQ(-ECANCELED, long_waiter.wait(yield));
+    }, rethrow);
 
   const auto start = Clock::now();
   EXPECT_EQ(1u, context.poll()); // spawn
@@ -133,13 +137,13 @@ TEST(ReshardWait, stop_multiple)
   // spawn 4 coroutines
   boost::asio::io_context context;
   {
-    auto async_waiter = [&] (spawn::yield_context yield) {
-        EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
+    auto async_waiter = [&] (boost::asio::yield_context yield) {
+        EXPECT_EQ(-ECANCELED, long_waiter.wait(yield));
       };
-    spawn::spawn(context, async_waiter);
-    spawn::spawn(context, async_waiter);
-    spawn::spawn(context, async_waiter);
-    spawn::spawn(context, async_waiter);
+    boost::asio::spawn(context, async_waiter, rethrow);
+    boost::asio::spawn(context, async_waiter, rethrow);
+    boost::asio::spawn(context, async_waiter, rethrow);
+    boost::asio::spawn(context, async_waiter, rethrow);
   }
 
   const auto start = Clock::now();
index e5df9f84efa1d08b1eb7408a21016b5ffa6bcb6a..18dd8f3ffbc3e9790169524328e2c7a2202a2baf 100644 (file)
@@ -20,9 +20,9 @@
 #include <boost/asio/error.hpp>
 #include <boost/asio/executor_work_guard.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 #include "include/scope_guard.h"
 
-#include <spawn/spawn.hpp>
 #include <gtest/gtest.h>
 
 static rgw_raw_obj make_obj(const std::string& oid)
@@ -143,13 +143,15 @@ TEST(Aio_Throttle, YieldCostOverWindow)
   auto obj = make_obj(__PRETTY_FUNCTION__);
 
   boost::asio::io_context context;
-  spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
-      YieldingAioThrottle throttle(4, context, yield);
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      YieldingAioThrottle throttle(4, yield);
       scoped_completion op;
       auto c = throttle.get(obj, wait_on(op), 8, 0);
       ASSERT_EQ(1u, c.size());
       EXPECT_EQ(-EDEADLK, c.front().result);
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   context.run();
 }
@@ -166,9 +168,9 @@ TEST(Aio_Throttle, YieldingThrottleOverMax)
   uint64_t outstanding = 0;
 
   boost::asio::io_context context;
-  spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
-      YieldingAioThrottle throttle(window, context, yield);
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      YieldingAioThrottle throttle(window, yield);
       for (uint64_t i = 0; i < total; i++) {
         using namespace std::chrono_literals;
         auto c = throttle.get(obj, wait_for(context, 10ms), 1, 0);
@@ -180,6 +182,8 @@ TEST(Aio_Throttle, YieldingThrottleOverMax)
       }
       auto c = throttle.drain();
       outstanding -= c.size();
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   context.poll(); // run until we block
   EXPECT_EQ(window, outstanding);