]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: switch back to boost::asio for spawn() and yield_context
authorCasey Bodley <cbodley@redhat.com>
Thu, 15 Feb 2024 03:53:47 +0000 (22:53 -0500)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 May 2024 16:57:01 +0000 (12:57 -0400)
a fork of boost::asio::spawn() was introduced in 2020 with spawn::spawn() from #31580. this fork enabled rgw to customize how the coroutine stacks are allocated in order to avoid stack overflows in frontend request coroutines. this customization was based on a StackAllocator concept from the boost::context library

in boost 1.80, that same StackAllocator overload was added to boost::asio::spawn(), along with other improvements like per-op cancellation. now that boost has everything we need, switch back and drop the spawn submodule

this required switching a lot of async functions from async_completion<> to async_initiate<>. similar changes were necessary to enable the c++20 coroutine token boost::asio::use_awaitable

Signed-off-by: Casey Bodley <cbodley@redhat.com>
40 files changed:
src/cls/CMakeLists.txt
src/common/async/yield_context.h
src/crypto/isa-l/CMakeLists.txt
src/crypto/openssl/CMakeLists.txt
src/crypto/qat/CMakeLists.txt
src/crypto/qat/qcccrypto.cc
src/rgw/CMakeLists.txt
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/dbstore/CMakeLists.txt
src/rgw/driver/rados/cls_fifo_legacy.h
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/driver/rados/rgw_reshard.h
src/rgw/driver/rados/rgw_tools.cc
src/rgw/driver/rados/topic_migration.cc
src/rgw/driver/rados/topic_migration.h
src/rgw/rgw_aio.cc
src/rgw/rgw_aio_throttle.h
src/rgw/rgw_asio_frontend.cc
src/rgw/rgw_d3n_cacherequest.h
src/rgw/rgw_op.cc
src/rgw/rgw_redis_driver.cc
src/rgw/rgw_ssd_driver.cc
src/rgw/rgw_sync_checkpoint.cc
src/test/CMakeLists.txt
src/test/librados/CMakeLists.txt
src/test/librados/asio.cc
src/test/rgw/CMakeLists.txt
src/test/rgw/bench_rgw_ratelimit.cc
src/test/rgw/test_d4n_directory.cc
src/test/rgw/test_d4n_policy.cc
src/test/rgw/test_redis_driver.cc
src/test/rgw/test_rgw_dmclock_scheduler.cc
src/test/rgw/test_rgw_reshard_wait.cc
src/test/rgw/test_rgw_throttle.cc
src/test/rgw/test_ssd_driver.cc

index 08590a43989c268ea92fb5357b8b2370327806e5..953ac83195f26ee7f6d39dd81c2c3122bb7faa32 100644 (file)
@@ -76,8 +76,7 @@ if (WITH_RADOSGW)
   target_link_libraries(cls_otp OATH::OATH)
   target_include_directories(cls_otp
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
   set_target_properties(cls_otp PROPERTIES
     VERSION "1.0.0"
     SOVERSION "1"
@@ -204,8 +203,7 @@ if (WITH_RADOSGW)
   target_link_libraries(cls_rgw ${FMT_LIB} json_spirit)
   target_include_directories(cls_rgw
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
   set_target_properties(cls_rgw PROPERTIES
     VERSION "1.0.0"
     SOVERSION "1"
@@ -220,8 +218,7 @@ if (WITH_RADOSGW)
   add_library(cls_rgw_client STATIC ${cls_rgw_client_srcs})
   target_include_directories(cls_rgw_client
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
 
 endif (WITH_RADOSGW)
 
@@ -313,8 +310,7 @@ if (WITH_RADOSGW)
   add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs})
   target_include_directories(cls_rgw_gc
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
   set_target_properties(cls_rgw_gc PROPERTIES
     VERSION "1.0.0"
     SOVERSION "1"
@@ -328,8 +324,7 @@ if (WITH_RADOSGW)
   add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
   target_include_directories(cls_rgw_gc_client
          PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/driver/rados"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-         PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
+         PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
 endif (WITH_RADOSGW)
 
 
index baa028fa1b4afa7d2d1774af93d899efd2a90c6c..fd9a20901aa5675cbc1ab71497f9d96bcac70805 100644 (file)
 #include <boost/range/begin.hpp>
 #include <boost/range/end.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include "acconfig.h"
 
-#include <spawn/spawn.hpp>
-
-/// optional-like wrapper for a spawn::yield_context and its associated
-/// boost::asio::io_context. operations that take an optional_yield argument
-/// will, when passed a non-empty yield context, suspend this coroutine instead
-/// of the blocking the thread of execution
+/// optional-like wrapper for a boost::asio::yield_context. operations that take
+/// an optional_yield argument will, when passed a non-empty yield context,
+/// suspend this coroutine instead of the blocking the thread of execution
 class optional_yield {
-  boost::asio::io_context *c = nullptr;
-  spawn::yield_context *y = nullptr;
+  boost::asio::yield_context *y = nullptr;
  public:
   /// construct with a valid io and yield_context
-  explicit optional_yield(boost::asio::io_context& c,
-                          spawn::yield_context& y) noexcept
-    : c(&c), y(&y) {}
+  optional_yield(boost::asio::yield_context& y) noexcept : y(&y) {}
 
   /// type tag to construct an empty object
   struct empty_t {};
@@ -42,11 +37,8 @@ class optional_yield {
   /// implicit conversion to bool, returns true if non-empty
   operator bool() const noexcept { return y; }
 
-  /// return a reference to the associated io_context. only valid if non-empty
-  boost::asio::io_context& get_io_context() const noexcept { return *c; }
-
   /// return a reference to the yield_context. only valid if non-empty
-  spawn::yield_context& get_yield_context() const noexcept { return *y; }
+  boost::asio::yield_context& get_yield_context() const noexcept { return *y; }
 };
 
 // type tag object to construct an empty optional_yield
index c8d832247d92e924b239e8aa7ab8611ac2dc540a..40da7e495c37495895fdbd52260e9572e039280d 100644 (file)
@@ -30,7 +30,7 @@ endif(HAVE_NASM_X64)
 add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs})
 target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include)
 
-target_link_libraries(ceph_crypto_isal PRIVATE spawn)
+target_link_libraries(ceph_crypto_isal PRIVATE Boost::context)
 
 set_target_properties(ceph_crypto_isal PROPERTIES
   VERSION 1.0.0
index 5365ab9a6ca2241b30e807e0b6c38934f06d4af8..ac9d868939657885b456c4451f5c5f5b1c0aee81 100644 (file)
@@ -8,7 +8,7 @@ add_library(ceph_crypto_openssl SHARED ${openssl_crypto_plugin_srcs})
 target_link_libraries(ceph_crypto_openssl
     PRIVATE OpenSSL::Crypto
     $<$<PLATFORM_ID:Windows>:ceph-common>
-    spawn)
+    Boost::context)
 target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR})
 add_dependencies(crypto_plugins ceph_crypto_openssl)
 set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "")
index 04bc0b7e7f4309fe037a367d100acc7cbdaaf336..85f7ff50e1343c5f758e9e6446bcf8337d0baf68 100644 (file)
@@ -14,7 +14,7 @@ add_dependencies(crypto_plugins ceph_crypto_qat)
 target_link_libraries(ceph_crypto_qat PRIVATE
                       QAT::qat
                       QAT::usdm
-                      spawn)
+                      Boost::context)
 
 add_dependencies(crypto_plugins ceph_crypto_qat)
 set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1)
index d441e2cd6163e42ad7075a72322307a2b4105534..94c98518a90b6a20761602c3c50a68b32e05319a 100644 (file)
@@ -331,7 +331,7 @@ bool QccCrypto::perform_op_batch(unsigned char* out, const unsigned char* in, si
   int avail_inst = NON_INSTANCE;
 
   if (y) {
-    spawn::yield_context yield = y.get_yield_context();
+    boost::asio::yield_context yield = y.get_yield_context();
     avail_inst = async_get_instance(yield);
   } else {
     auto result = async_get_instance(boost::asio::use_future);
@@ -546,7 +546,7 @@ bool QccCrypto::symPerformOp(int avail_inst,
     do {
       poll_retry_num = RETRY_MAX_NUM;
       if (y) {
-        spawn::yield_context yield = y.get_yield_context();
+        boost::asio::yield_context yield = y.get_yield_context();
         status = helper.async_perform_op(std::span<CpaCySymDpOpData*>(pOpDataVec), yield);
       } else {
         auto result = helper.async_perform_op(std::span<CpaCySymDpOpData*>(pOpDataVec), boost::asio::use_future);
index cf214b39c957e05f7ddcce0f177c597ba17617cf..728dc6dba217bde2dda0c62aef1fa53f51ade084 100644 (file)
@@ -301,7 +301,7 @@ target_link_libraries(rgw_common
   PUBLIC
     ${LUA_LIBRARIES}
     RapidJSON::RapidJSON
-    spawn
+    Boost::context
     ${FMT_LIB}
     OpenSSL::SSL)
 target_include_directories(rgw_common
@@ -433,7 +433,7 @@ target_link_libraries(rgw_a
     OATH::OATH
   PUBLIC
     rgw_common
-    spawn)
+    Boost::context)
 
 if(WITH_CURL_OPENSSL)
   # used by rgw_http_client_curl.cc
@@ -449,7 +449,7 @@ set(rgw_schedulers_srcs
 
 add_library(rgw_schedulers STATIC ${rgw_schedulers_srcs})
 target_link_libraries(rgw_schedulers
-  PUBLIC dmclock::dmclock spawn)
+  PUBLIC dmclock::dmclock Boost::context)
 
 set(radosgw_srcs
   rgw_main.cc)
index 2e9f9ad80cf8e37c5b1ef0156649394ef4234d65..35b34a2c1d9ae65514819eacead9fd5bb6324ac6 100644 (file)
@@ -17,7 +17,7 @@ struct initiate_exec {
   {
     auto h = boost::asio::consign(std::move(handler), conn);
     return boost::asio::dispatch(get_executor(),
-        [c = conn, &req, &resp, h = std::move(h)] {
+        [c = conn, &req, &resp, h = std::move(h)] () mutable {
             return c->async_exec(req, resp, std::move(h));
     });
   }
index 9e2fa358f81bdad0c070282a59bbe625c308bdf4..b42228078d0d9e0654ca92631826259284b75566 100644 (file)
@@ -17,9 +17,10 @@ struct initiate_exec {
   void operator()(Handler handler, const boost::redis::request& req, Response& resp)
   {
     auto h = asio::consign(std::move(handler), conn);
-    return asio::dispatch(get_executor(), [c=conn, &req, &resp, h=std::move(h)] {
-      c->async_exec(req, resp, std::move(h));
-    });
+    return asio::dispatch(get_executor(),
+        [c=conn, &req, &resp, h=std::move(h)] () mutable {
+          c->async_exec(req, resp, std::move(h));
+        });
   }
 };
 
index a3aca7a64e4cc1e97424dbb5b29574f4d1658219..f401c912f6751827bf9e8e3264ba09a45152f844 100644 (file)
@@ -29,7 +29,7 @@ target_include_directories(dbstore_lib
     PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
     PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/store/rados"
     PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}")
-set(link_targets spawn)
+set(link_targets Boost::context)
 if(WITH_JAEGER)
   list(APPEND link_targets jaeger_base)
 endif()
index ed23129eb304520cc6a2e6d79ab7b81174ffe37c..85e8f53997536e6787bd49414d316641a8496c5f 100644 (file)
@@ -89,7 +89,7 @@ using part_info = fifo::part_header;
 ///
 /// This library uses optional_yield. Please see
 /// /src/common/async/yield_context.h. In summary, optional_yield
-/// contains either a spawn::yield_context (in which case the current
+/// contains either a boost::asio::yield_context (in which case the current
 /// coroutine is suspended until completion) or null_yield (in which
 /// case the current thread is blocked until completion.)
 ///
index 7cf6fbd56f1a2ad39e28ca3f6543bde67fc68638..ff6325e222a7d49b6d191dcdcde4cfe53649f0f1 100644 (file)
@@ -513,17 +513,15 @@ int RGWBucket::check_bad_index_multipart(rgw::sal::RadosStore* const rados_store
   const int max_aio = std::max(1, op_state.get_max_aio());
   int any_error = 0; // first error encountered if any
   for (int i = 0; i < max_aio; i++) {
-    spawn::spawn(context, [&](spawn::yield_context yield) {
+    boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
       while (true) {
         const int shard = next_shard++;
         if (shard >= num_shards) {
           return;
         }
 
-        optional_yield y(context, yield);
-
         int r = ::check_bad_index_multipart(rados_store, &*bucket, dpp,
-                                           op_state, flusher, shard, y);
+                                           op_state, flusher, shard, yield);
         if (r < 0) {
           ldpp_dout(dpp, -1) << "WARNING: error processing shard " << shard <<
             " check_bad_index_multipart(): " << r << "; skipping" << dendl;
@@ -533,6 +531,8 @@ int RGWBucket::check_bad_index_multipart(rgw::sal::RadosStore* const rados_store
          }
         }
       } // while
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   } // for
 
@@ -738,14 +738,14 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
   const int max_aio = std::max(1, op_state.get_max_aio());
 
   for (int i=0; i<max_aio; i++) {
-    spawn::spawn(context, [&](spawn::yield_context yield) {
+    boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
       while (true) {
         int shard = next_shard;
         next_shard += 1;
         if (shard >= max_shards) {
           return;
         }
-        optional_yield y(context, yield);
+        optional_yield y(yield);
         uint64_t shard_count;
         int r = ::check_index_olh(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
         if (r < 0) {
@@ -758,6 +758,8 @@ int RGWBucket::check_index_olh(rgw::sal::RadosStore* const rados_store,
             " entries " << verb << ")" << dendl;
         }
       }
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   }
   try {
@@ -947,7 +949,7 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
   int next_shard = 0;
   boost::asio::io_context context;
   for (int i=0; i<max_aio; i++) {
-    spawn::spawn(context, [&](spawn::yield_context yield) {
+    boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
       while (true) {
         int shard = next_shard;
         next_shard += 1;
@@ -955,8 +957,7 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
           return;
         }
         uint64_t shard_count;
-        optional_yield y {context, yield};
-        int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, y);
+        int r = ::check_index_unlinked(rados_store, &*bucket, dpp, op_state, flusher, shard, &shard_count, yield);
         if (r < 0) {
           ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard << 
             " check_index_unlinked(): " << r << dendl;
@@ -967,6 +968,8 @@ int RGWBucket::check_index_unlinked(rgw::sal::RadosStore* const rados_store,
             " entries " << verb << ")" << dendl;
         }
       }
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   }
   try {
index c70693895bf83d61ba5d4de98c5720435fc0423a..e7478f1911443ff02b7f91e99a06c51af6efd9e6 100644 (file)
@@ -9,8 +9,8 @@
 #include <boost/asio/basic_waitable_timer.hpp>
 #include <boost/asio/executor_work_guard.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 #include <boost/context/protected_fixedsize_stack.hpp>
-#include <spawn/spawn.hpp>
 #include "include/function2.hpp"
 #include "rgw_sal_rados.h"
 #include "rgw_pubsub.h"
@@ -218,7 +218,7 @@ private:
       pending_tokens(0),
       timer(io_context) {}  
  
-    void async_wait(spawn::yield_context yield) { 
+    void async_wait(boost::asio::yield_context yield) {
       if (pending_tokens == 0) {
         return;
       }
@@ -241,7 +241,7 @@ private:
   // processing of a specific entry
   // return whether processing was successful (true) or not (false)
   EntryProcessingResult process_entry(const ConfigProxy& conf, persistency_tracker& entry_persistency_tracker,
-                                      const cls_queue_entry& entry, spawn::yield_context yield) {
+                                      const cls_queue_entry& entry, boost::asio::yield_context yield) {
     event_entry_t event_entry;
     auto iter = entry.data.cbegin();
     try {
@@ -290,7 +290,7 @@ private:
           cct);
       ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
         " for entry: " << entry.marker << dendl;
-      const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield));
+      const auto ret = push_endpoint->send(event_entry.event, yield);
       if (ret < 0) {
         ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
                            << " failed. error: " << ret
@@ -311,7 +311,7 @@ private:
   }
 
   // clean stale reservation from queue
-  void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
+  void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
     while (!shutdown) {
       ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
       const auto now = ceph::coarse_real_time::clock::now();
@@ -324,7 +324,7 @@ private:
         "" /*no tag*/);
       cls_2pc_queue_expire_reservations(op, stale_time);
       // check ownership and do reservation cleanup in one batch
-      auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
+      auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, yield);
       if (ret == -ENOENT) {
         // queue was deleted
         ldpp_dout(this, 10) << "INFO: queue: " << queue_name
@@ -351,12 +351,12 @@ private:
   }
 
   // unlock (lose ownership) queue
-  int unlock_queue(const std::string& queue_name, spawn::yield_context yield) {
+  int unlock_queue(const std::string& queue_name, boost::asio::yield_context yield) {
     librados::ObjectWriteOperation op;
     op.assert_exists();
     rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
     auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
-    const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+    const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
     if (ret == -ENOENT) {
       ldpp_dout(this, 10) << "INFO: queue: " << queue_name
         << ". was removed. nothing to unlock" << dendl;
@@ -371,15 +371,18 @@ private:
   }
 
   // processing of a specific queue
-  void process_queue(const std::string& queue_name, spawn::yield_context yield) {
+  void process_queue(const std::string& queue_name, boost::asio::yield_context yield) {
     constexpr auto max_elements = 1024;
     auto is_idle = false;
     const std::string start_marker;
 
     // start a the cleanup coroutine for the queue
-    spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) {
-            cleanup_queue(queue_name, yield);
-            }, make_stack_allocator());
+    boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+            [this, queue_name](boost::asio::yield_context yield) {
+              cleanup_queue(queue_name, yield);
+            }, [] (std::exception_ptr eptr) {
+              if (eptr) std::rethrow_exception(eptr);
+            });
 
     CountersManager queue_counters_container(queue_name, this->get_cct());
 
@@ -410,7 +413,7 @@ private:
           "" /*no tag*/);
         cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
         // check ownership and list entries in one batch
-        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
+        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, yield);
         if (ret == -ENOENT) {
           // queue was deleted
           topics_persistency_tracker.erase(queue_name);
@@ -460,8 +463,9 @@ private:
         }
 
         entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
-        spawn::spawn(yield, [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
-                             &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](spawn::yield_context yield) {
+        boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
+          [this, &notifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
+           &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](boost::asio::yield_context yield) {
             const auto token = waiter.make_token();
             auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
             auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
@@ -483,7 +487,9 @@ private:
               ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                 entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
             } 
-        }, make_stack_allocator());
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
         ++entry_idx;
       }
 
@@ -515,7 +521,7 @@ private:
           "" /*no tag*/);
         cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
         // check ownership and deleted entries in one batch
-        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
         if (ret == -ENOENT) {
           // queue was deleted
           ldpp_dout(this, 10) << "INFO: queue: " << queue_name
@@ -547,7 +553,7 @@ private:
 
           rgw_pubsub_topic topic;
           auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
-                           optional_yield(io_context, yield), nullptr);
+                                               yield, nullptr);
           if (ret_of_get_topic < 0) {
             // we can't migrate entries without topic info
             ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
@@ -579,7 +585,7 @@ private:
           buffer::list obl;
           int rval;
           cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval);
-          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC);
+          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield, librados::OPERATION_RETURNVEC);
           if (ret < 0) {
             ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl;
             return;
@@ -591,7 +597,7 @@ private:
           }
 
           cls_2pc_queue_commit(op, migration_vector, reservation_id);
-          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+          ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
           reservation_id = cls_2pc_reservation::NO_ID;
           if (ret < 0) {
             ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". error: " << ret << dendl;
@@ -618,7 +624,7 @@ private:
 
   // process all queues
   // find which of the queues is owned by this daemon and process it
-  void process_queues(spawn::yield_context yield) {
+  void process_queues(boost::asio::yield_context yield) {
     auto has_error = false;
     owned_queues_t owned_queues;
     size_t processed_queue_count = 0;
@@ -646,7 +652,7 @@ private:
       timer.async_wait(yield[ec]);
 
       queues_t queues;
-      auto ret = read_queue_list(queues, optional_yield(io_context, yield));
+      auto ret = read_queue_list(queues, yield);
       if (ret < 0) {
         has_error = true;
         continue;
@@ -665,7 +671,7 @@ private:
               failover_time,
               LOCK_FLAG_MAY_RENEW);
 
-        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, yield);
         if (ret == -EBUSY) {
           // lock is already taken by another RGW
           ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -687,7 +693,8 @@ private:
         if (owned_queues.insert(queue_name).second) {
           ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
           // start processing this queue
-          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) {
+          boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+                             [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](boost::asio::yield_context yield) {
             ++processed_queue_count;
             process_queue(queue_name, yield);
             // if queue processing ended, it means that the queue was removed or not owned anymore
@@ -703,7 +710,9 @@ private:
             queue_gc.push_back(queue_name);
             --processed_queue_count;
             ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
-          }, make_stack_allocator());
+          }, [] (std::exception_ptr eptr) {
+            if (eptr) std::rethrow_exception(eptr);
+          });
         } else {
           ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
         }
@@ -741,9 +750,12 @@ public:
   }
 
   void init() {
-    spawn::spawn(io_context, [this](spawn::yield_context yield) {
+    boost::asio::spawn(make_strand(io_context), std::allocator_arg, make_stack_allocator(),
+        [this](boost::asio::yield_context yield) {
           process_queues(yield);
-        }, make_stack_allocator());
+        }, [] (std::exception_ptr eptr) {
+          if (eptr) std::rethrow_exception(eptr);
+        });
 
     // start the worker threads to do the actual queue processing
     const std::string WORKER_THREAD_NAME = "notif-worker";
index 8906d2e6cb95685eab8790f634a87ddac45a1ec5..ce765853ed4f48d1f463cdeecc8b63abf5fe8c42 100644 (file)
@@ -19,7 +19,7 @@
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
 #include "rgw_kafka.h"
 #endif
-#include <boost/asio/yield.hpp>
+//#include <boost/asio/yield.hpp>
 #include <boost/algorithm/string.hpp>
 #include <functional>
 #include "rgw_perf_counters.h"
@@ -153,7 +153,7 @@ public:
       boost::system::error_code ec;
       auto yield = y.get_yield_context();
       auto&& token = yield[ec];
-      boost::asio::async_initiate<spawn::yield_context, Signature>(
+      boost::asio::async_initiate<boost::asio::yield_context, Signature>(
           [this] (auto handler, auto ex) {
             completion = Completion::create(ex, std::move(handler));
           }, token, yield.get_executor());
index 45d50452e03c50a8d81f6c0bc8a112c18eac9ce0..c30caf3f5335684188bef4b655835fad1d88530c 100644 (file)
@@ -1363,10 +1363,15 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
     using namespace rgw;
     if (svc.site->is_meta_master() &&
         all_zonegroups_support(*svc.site, zone_features::notification_v2)) {
-      spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) {
+      boost::asio::spawn(v1_topic_migration, [this] (boost::asio::yield_context yield) {
             DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "};
             rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield);
+          }, [] (std::exception_ptr eptr) {
+            if (eptr) std::rethrow_exception(eptr);
           });
+      // TODO: we run this on a separate thread so shutdown can cancel it with
+      // v1_topic_migration.stop(), but we could run it on the global thread
+      // pool and cancel spawn() with a cancellation_signal instead
       v1_topic_migration.start(1);
     }
   }
index c1011bd60a5503ce27aa086f270404c692e21574..d590bff7fcfdea899538e06188660cb61ea74280 100644 (file)
@@ -1153,10 +1153,9 @@ int RGWReshardWait::wait(optional_yield y)
   }
 
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
 
-    Waiter waiter(context);
+    Waiter waiter(yield.get_executor());
     waiters.push_back(waiter);
     lock.unlock();
 
index 0497414566ad28182e52c1cd802cc46bf50c71f7..8e37defa1dbdcc4ce744a4e5f5b3c53ee99399d9 100644 (file)
@@ -252,11 +252,11 @@ class RGWReshardWait {
   ceph::condition_variable cond;
 
   struct Waiter : boost::intrusive::list_base_hook<> {
-    using Executor = boost::asio::io_context::executor_type;
+    using Executor = boost::asio::any_io_executor;
     using Timer = boost::asio::basic_waitable_timer<Clock,
           boost::asio::wait_traits<Clock>, Executor>;
     Timer timer;
-    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
+    explicit Waiter(boost::asio::any_io_executor ex) : timer(ex) {}
   };
   boost::intrusive::list<Waiter> waiters;
 
index 41254f9519e344a70577d552f69837c717bd5a20..eec4a799115654a30ccc4a18fc44e9688eb6f83f 100644 (file)
@@ -203,11 +203,10 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
   // given a yield_context, call async_operate() to yield the coroutine instead
   // of blocking
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
     auto bl = librados::async_operate(
-      context, ioctx, oid, op, flags, trace_info, yield[ec]);
+      yield, ioctx, oid, op, flags, trace_info, yield[ec]);
     if (pbl) {
       *pbl = std::move(bl);
     }
@@ -228,10 +227,9 @@ int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, con
                      int flags, const jspan_context* trace_info)
 {
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
-    librados::async_operate(context, ioctx, oid, op, flags, trace_info, yield[ec]);
+    librados::async_operate(yield, ioctx, oid, op, flags, trace_info, yield[ec]);
     return -ec.value();
   }
   if (is_asio_thread) {
@@ -248,10 +246,9 @@ int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, cons
                      optional_yield y)
 {
   if (y) {
-    auto& context = y.get_io_context();
     auto& yield = y.get_yield_context();
     boost::system::error_code ec;
-    auto reply = librados::async_notify(context, ioctx, oid,
+    auto reply = librados::async_notify(yield, ioctx, oid,
                                         bl, timeout_ms, yield[ec]);
     if (pbl) {
       *pbl = std::move(reply);
index c7dcfc37b4489508999b0d5723787f8e07790d30..0d4238f84312efd4a1f5b9d7c89cd44f4c347af8 100644 (file)
@@ -263,10 +263,8 @@ int migrate_topics(const DoutPrefixProvider* dpp, optional_yield y,
 int migrate(const DoutPrefixProvider* dpp,
             rgw::sal::RadosStore* driver,
             boost::asio::io_context& context,
-            spawn::yield_context yield)
+            boost::asio::yield_context y)
 {
-  auto y = optional_yield{context, yield};
-
   ldpp_dout(dpp, 1) << "starting v1 topic migration.." << dendl;
 
   librados::Rados* rados = driver->getRados()->get_rados_handle();
index 9545fd63c2e66c8957101bf0d50b2ebd8c160abf..2bd2b730c85a5244f587268d80337c8b6756f3c5 100644 (file)
@@ -16,7 +16,7 @@
 #pragma once
 
 #include <boost/asio/io_context.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 
 class DoutPrefixProvider;
 namespace rgw::sal { class RadosStore; }
@@ -29,6 +29,6 @@ namespace rgwrados::topic_migration {
 int migrate(const DoutPrefixProvider* dpp,
             rgw::sal::RadosStore* driver,
             boost::asio::io_context& context,
-            spawn::yield_context yield);
+            boost::asio::yield_context yield);
 
 } // rgwrados::topic_migration
index 5b2ce1569b33ae18f3d45abd07f50ebc89466c6e..a239171f104d77ff4963c1539fab5ff1d529d4fb 100644 (file)
@@ -90,16 +90,14 @@ struct Handler {
 
 template <typename Op>
 Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op,
-                         boost::asio::io_context& context,
-                         spawn::yield_context yield, jspan_context* trace_ctx = nullptr) {
-  return [ctx = std::move(ctx), op = std::move(op), &context, yield, trace_ctx] (Aio* aio, AioResult& r) mutable {
+                         boost::asio::yield_context yield,
+                         jspan_context* trace_ctx) {
+  return [ctx = std::move(ctx), op = std::move(op), yield, trace_ctx] (Aio* aio, AioResult& r) mutable {
       // arrange for the completion Handler to run on the yield_context's strand
       // executor so it can safely call back into Aio without locking
-      using namespace boost::asio;
-      async_completion<spawn::yield_context, void()> init(yield);
-      auto ex = get_associated_executor(init.completion_handler);
+      auto ex = yield.get_executor();
 
-      librados::async_operate(context, ctx, r.obj.oid, &op, 0, trace_ctx,
+      librados::async_operate(yield, ctx, r.obj.oid, &op, 0, trace_ctx,
                               bind_executor(ex, Handler{aio, ctx, r}));
     };
 }
@@ -111,7 +109,7 @@ Aio::OpFunc d3n_cache_aio_abstract(const DoutPrefixProvider *dpp, optional_yield
     ceph_assert(y);
     auto c = std::make_unique<D3nL1CacheRequest>();
     lsubdout(g_ceph_context, rgw_datacache, 20) << "D3nDataCache: d3n_cache_aio_abstract(): libaio Read From Cache, oid=" << r.obj.oid << dendl;
-    c->file_aio_read_abstract(dpp, y.get_io_context(), y.get_yield_context(), cache_location, read_ofs, read_len, aio, r);
+    c->file_aio_read_abstract(dpp, y.get_yield_context(), cache_location, read_ofs, read_len, aio, r);
   };
 }
 
@@ -123,7 +121,7 @@ Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, optional_yield y, jspan_c
   static_assert(!std::is_const_v<Op>);
   if (y) {
     return aio_abstract(std::move(ctx), std::forward<Op>(op),
-                        y.get_io_context(), y.get_yield_context(), trace_ctx);
+                        y.get_yield_context(), trace_ctx);
   }
   return aio_abstract(std::move(ctx), std::forward<Op>(op), trace_ctx);
 }
index c0656ef225e6a038e16bb4defcf7b46191de9b4c..87fc980a94c16eff3589911b3f9483c7f6da6594 100644 (file)
@@ -80,8 +80,7 @@ class BlockingAioThrottle final : public Aio, private Throttle {
 // a throttle that yields the coroutine instead of blocking. all public
 // functions must be called within the coroutine strand
 class YieldingAioThrottle final : public Aio, private Throttle {
-  boost::asio::io_context& context;
-  spawn::yield_context yield;
+  boost::asio::yield_context yield;
   struct Handler;
 
   // completion callback associated with the waiter
@@ -94,9 +93,8 @@ class YieldingAioThrottle final : public Aio, private Throttle {
   struct Pending : AioResultEntry { uint64_t cost = 0; };
 
  public:
-  YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
-                      spawn::yield_context yield)
-    : Throttle(window), context(context), yield(yield)
+  YieldingAioThrottle(uint64_t window, boost::asio::yield_context yield)
+    : Throttle(window), yield(yield)
   {}
 
   virtual ~YieldingAioThrottle() override {};
@@ -119,7 +117,6 @@ inline auto make_throttle(uint64_t window_size, optional_yield y)
   std::unique_ptr<Aio> aio;
   if (y) {
     aio = std::make_unique<YieldingAioThrottle>(window_size,
-                                                y.get_io_context(),
                                                 y.get_yield_context());
   } else {
     aio = std::make_unique<BlockingAioThrottle>(window_size);
index 8c64570287daedb511e5b13fb4804995cba7bbe6..ace3b7aff49e9a4d3424952946ceeb8ff4b8aa0f 100644 (file)
@@ -3,6 +3,7 @@
 
 #include <atomic>
 #include <ctime>
+#include <memory>
 #include <vector>
 
 #include <boost/asio/error.hpp>
@@ -16,7 +17,7 @@
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 
 #include <boost/context/protected_fixedsize_stack.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include "common/async/shared_mutex.h"
 #include "common/errno.h"
@@ -69,12 +70,12 @@ class StreamIO : public rgw::asio::ClientIO {
   CephContext* const cct;
   Stream& stream;
   timeout_timer& timeout;
-  spawn::yield_context yield;
+  boost::asio::yield_context yield;
   parse_buffer& buffer;
   boost::system::error_code fatal_ec;
  public:
   StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
-           rgw::asio::parser_type& parser, spawn::yield_context yield,
+           rgw::asio::parser_type& parser, boost::asio::yield_context yield,
            parse_buffer& buffer, bool is_ssl,
            const tcp::endpoint& local_endpoint,
            const tcp::endpoint& remote_endpoint)
@@ -200,7 +201,7 @@ void handle_connection(boost::asio::io_context& context,
                        rgw::dmclock::Scheduler *scheduler,
                        const std::string& uri_prefix,
                        boost::system::error_code& ec,
-                       spawn::yield_context yield)
+                       boost::asio::yield_context yield)
 {
   // don't impose a limit on the body, since we read it in pieces
   static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
@@ -281,7 +282,7 @@ void handle_connection(boost::asio::io_context& context,
       RGWRestfulIO client(cct, &real_client_io);
       optional_yield y = null_yield;
       if (cct->_conf->rgw_beast_enable_async) {
-        y = optional_yield{context, yield};
+        y = optional_yield{yield};
       }
       int http_ret = 0;
       string user = "-";
@@ -1021,8 +1022,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
-    spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+    boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(),
+      [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable {
         auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
         auto c = connections.add(*conn);
         // wrap the tcp stream in an ssl stream
@@ -1049,13 +1050,15 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
         }
 
         conn->socket.shutdown(tcp::socket::shutdown_both, ec);
-      }, make_stack_allocator());
+      }, [] (std::exception_ptr eptr) {
+        if (eptr) std::rethrow_exception(eptr);
+      });
   } else {
 #else
   {
 #endif // WITH_RADOSGW_BEAST_OPENSSL
-    spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+    boost::asio::spawn(make_strand(context), std::allocator_arg, make_stack_allocator(),
+      [this, s=std::move(stream)] (boost::asio::yield_context yield) mutable {
         auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
         auto c = connections.add(*conn);
         auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
@@ -1064,7 +1067,9 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
                           conn->buffer, false, pause_mutex, scheduler.get(),
                           uri_prefix, ec, yield);
         conn->socket.shutdown(tcp::socket::shutdown_both, ec);
-      }, make_stack_allocator());
+      }, [] (std::exception_ptr eptr) {
+        if (eptr) std::rethrow_exception(eptr);
+      });
   }
 }
 
index 0519c6def3e825a76dbc456ea69296bca903fa7f..54b495f5461f8d5a187918c8d7115ace7c38df49 100644 (file)
@@ -7,6 +7,8 @@
 #include <stdlib.h>
 #include <aio.h>
 
+#include <boost/asio/spawn.hpp>
+
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
 #include "common/async/completion.h"
@@ -135,13 +137,10 @@ struct D3nL1CacheRequest {
     }
   };
 
-  void file_aio_read_abstract(const DoutPrefixProvider *dpp, spawn::yield_context yield,
+  void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::yield_context yield,
                               std::string& cache_location, off_t read_ofs, off_t read_len,
                               rgw::Aio* aio, rgw::AioResult& r) {
-    using namespace boost::asio;
-    async_completion<spawn::yield_context, void()> init(yield);
-    auto ex = get_associated_executor(init.completion_handler);
-
+    auto ex = yield.get_executor();
     ldpp_dout(dpp, 20) << "D3nDataCache: " << __func__ << "(): oid=" << r.obj.oid << dendl;
     async_read(dpp, ex, cache_location+"/"+url_encode(r.obj.oid, true), read_ofs, read_len, bind_executor(ex, d3n_libaio_handler{aio, r}));
   }
index 61191b14e4990b0c6ba4818188c419ce1770af11..76fbb332e6bc162f2f706d8695ff1e0ead9edb58 100644 (file)
@@ -6789,7 +6789,8 @@ void RGWDeleteMultiObj::execute(optional_yield y)
   char* buf;
   std::optional<boost::asio::deadline_timer> formatter_flush_cond;
   if (y) {
-    formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(y.get_io_context());  
+    auto ex = y.get_yield_context().get_executor();
+    formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
   }
 
   buf = data.c_str();
@@ -6857,9 +6858,11 @@ void RGWDeleteMultiObj::execute(optional_yield y)
         return aio_count < max_aio;
       });
       aio_count++;
-      spawn::spawn(y.get_yield_context(), [this, &y, &aio_count, obj_key, &formatter_flush_cond] (spawn::yield_context yield) {
-        handle_individual_object(obj_key, optional_yield { y.get_io_context(), yield }, &*formatter_flush_cond); 
+      boost::asio::spawn(y.get_yield_context(), [this, &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) {
+        handle_individual_object(obj_key, yield, &*formatter_flush_cond);
         aio_count--;
+      }, [] (std::exception_ptr eptr) {
+        if (eptr) std::rethrow_exception(eptr);
       }); 
     } else {
       handle_individual_object(obj_key, y, nullptr);
index 183f5351e2fbb89a97049150101477124e68f45b..0d8e462365eeed862edc693413ef1da3901d2ac4 100644 (file)
@@ -35,9 +35,9 @@ struct initiate_exec {
   {
     auto h = boost::asio::consign(std::move(handler), conn);
     return boost::asio::dispatch(get_executor(),
-        [c=conn, &req, &resp, h=std::move(h)] {
+        [c=conn, &req, &resp, h=std::move(h)] () mutable {
           return c->async_exec(req, resp, std::move(h));
-          });
+        });
   } 
 };
 
@@ -555,9 +555,8 @@ Aio::OpFunc RedisDriver::redis_read_op(optional_yield y, std::shared_ptr<connect
 {
   return [y, conn, &key] (Aio* aio, AioResult& r) mutable {
     using namespace boost::asio;
-    spawn::yield_context yield = y.get_yield_context();
-    async_completion<spawn::yield_context, void()> init(yield);
-    auto ex = get_associated_executor(init.completion_handler);
+    yield_context yield = y.get_yield_context();
+    auto ex = yield.get_executor();
 
     // TODO: Make unique pointer once support is added
     auto s = std::make_shared<RedisDriver::redis_response>();
@@ -574,9 +573,8 @@ Aio::OpFunc RedisDriver::redis_write_op(optional_yield y, std::shared_ptr<connec
 {
   return [y, conn, &bl, &attrs, &key] (Aio* aio, AioResult& r) mutable {
     using namespace boost::asio;
-    spawn::yield_context yield = y.get_yield_context();
-    async_completion<spawn::yield_context, void()> init(yield);
-    auto ex = get_associated_executor(init.completion_handler);
+    yield_context yield = y.get_yield_context();
+    auto ex = yield.get_executor();
 
     auto redisAttrs = build_attrs(attrs);
 
index 24726ebcf62e1713e6407a5f036344477a23f720..5d7a5a97119c252fafd48c824cf7b9a21480f8ff 100644 (file)
@@ -98,9 +98,8 @@ int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const
     boost::system::error_code ec;
     if (y) {
         using namespace boost::asio;
-        spawn::yield_context yield = y.get_yield_context();
-        async_completion<spawn::yield_context, void()> init(yield);
-        auto ex = get_associated_executor(init.completion_handler);
+        yield_context yield = y.get_yield_context();
+        auto ex = yield.get_executor();
         this->put_async(dpp, ex, key, bl, len, attrs, yield[ec]);
     } else {
       auto ex = boost::asio::system_executor{};
@@ -285,9 +284,8 @@ rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, opt
     ldpp_dout(dpp, 20) << "SSDCache: cache_read_op(): Read From Cache, oid=" << r.obj.oid << dendl;
 
     using namespace boost::asio;
-    spawn::yield_context yield = y.get_yield_context();
-    async_completion<spawn::yield_context, void()> init(yield);
-    auto ex = get_associated_executor(init.completion_handler);
+    yield_context yield = y.get_yield_context();
+    auto ex = yield.get_executor();
 
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
     this->get_async(dpp, ex, key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_read_handler{aio, r}));
@@ -301,9 +299,8 @@ rgw::Aio::OpFunc SSDDriver::ssd_cache_write_op(const DoutPrefixProvider *dpp, op
     ldpp_dout(dpp, 20) << "SSDCache: cache_write_op(): Write to Cache, oid=" << r.obj.oid << dendl;
 
     using namespace boost::asio;
-    spawn::yield_context yield = y.get_yield_context();
-    async_completion<spawn::yield_context, void()> init(yield);
-    auto ex = get_associated_executor(init.completion_handler);
+    yield_context yield = y.get_yield_context();
+    auto ex = yield.get_executor();
 
     ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
     this->put_async(dpp, ex, key, bl, len, attrs, bind_executor(ex, SSDDriver::libaio_write_handler{aio, r}));
index 1394a712a94f18693e03508851548ab69ec8a619..1172e79a48f32af7cecb2de7c53715005bea69fd 100644 (file)
@@ -226,8 +226,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
     entry.pipe = pipe;
 
     // fetch remote markers
-    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
-      auto y = optional_yield{ioctx, yield};
+    boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) {
+      auto y = optional_yield{yield};
       rgw_bucket_index_marker_info info;
       int r = source_bilog_info(dpp, store->svc()->zone, entry.pipe,
                                 info, entry.remote_markers, y);
@@ -237,10 +237,12 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
         throw std::system_error(-r, std::system_category());
       }
       entry.latest_gen = info.latest_gen;
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
     // fetch source bucket info
-    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
-      auto y = optional_yield{ioctx, yield};
+    boost::asio::spawn(ioctx, [&] (boost::asio::yield_context yield) {
+      auto y = optional_yield{yield};
       int r = store->getRados()->get_bucket_instance_info(
           *entry.pipe.source.bucket, entry.source_bucket_info,
           nullptr, nullptr, y, dpp);
@@ -249,6 +251,8 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
             << cpp_strerror(r) << dendl;
         throw std::system_error(-r, std::system_category());
       }
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   }
 
index a0c8fcfe823cca7fddfebbb4c53c65775aa6a141..2e756eeb583807710f1d9488d482576f791bd1a1 100644 (file)
@@ -340,7 +340,7 @@ target_link_libraries(ceph_test_librgw_file_nfsns
   ${LUA_LIBRARIES}
   ${ALLOC_LIBS}
   )
-  target_link_libraries(ceph_test_librgw_file_nfsns spawn)
+  target_link_libraries(ceph_test_librgw_file_nfsns Boost::context)
 install(TARGETS ceph_test_librgw_file_nfsns DESTINATION ${CMAKE_INSTALL_BINDIR})
 
 # ceph_test_librgw_file_aw (nfs write transaction [atomic write] tests)
@@ -374,7 +374,7 @@ target_link_libraries(ceph_test_librgw_file_marker
   ${LUA_LIBRARIES}
   ${ALLOC_LIBS}
   )
-  target_link_libraries(ceph_test_librgw_file_marker spawn)
+  target_link_libraries(ceph_test_librgw_file_marker Boost::context)
 install(TARGETS ceph_test_librgw_file_marker DESTINATION ${CMAKE_INSTALL_BINDIR})
 
 # ceph_test_librgw_file_xattr (attribute ops)
@@ -393,7 +393,7 @@ target_link_libraries(ceph_test_librgw_file_xattr
   ${LUA_LIBRARIES}
   ${ALLOC_LIBS}
   )
-target_link_libraries(ceph_test_librgw_file_xattr spawn)
+target_link_libraries(ceph_test_librgw_file_xattr Boost::context)
 
 # ceph_test_librgw_file_rename (mv/rename tests)
 add_executable(ceph_test_librgw_file_rename
index 69f511fecca55b2013bce268346c44a12ecfa6a6..e80f646065781e9b6fc0d34ec0a1b71eb01e347a 100644 (file)
@@ -60,7 +60,7 @@ target_link_libraries(ceph_test_rados_api_aio_pp
 
 add_executable(ceph_test_rados_api_asio asio.cc)
 target_link_libraries(ceph_test_rados_api_asio global
-  librados ${UNITTEST_LIBS} spawn)
+  librados ${UNITTEST_LIBS} Boost::context)
 
 add_executable(ceph_test_rados_api_list
   list.cc
@@ -133,7 +133,7 @@ target_include_directories(ceph_test_rados_api_tier_pp
   PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")
 target_link_libraries(ceph_test_rados_api_tier_pp
   librados global ${UNITTEST_LIBS} Boost::system radostest-cxx cls_cas_internal
-  cls_cas_client spawn)
+  cls_cas_client Boost::context)
 
 add_executable(ceph_test_rados_api_snapshots
   snapshots.cc)
index 0ede2e14fb46530d9b6252e4d27911be94316f51..9f8844eb7bb82090c7cb6e40306d4efa065b1a22 100644 (file)
@@ -21,8 +21,8 @@
 
 #include <boost/range/begin.hpp>
 #include <boost/range/end.hpp>
-#include <spawn/spawn.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 #include <boost/asio/use_future.hpp>
 
 #define dout_subsys ceph_subsys_rados
@@ -73,6 +73,10 @@ librados::Rados AsioRados::rados;
 librados::IoCtx AsioRados::io;
 librados::IoCtx AsioRados::snapio;
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST_F(AsioRados, AsyncReadCallback)
 {
   boost::asio::io_context service;
@@ -113,20 +117,20 @@ TEST_F(AsioRados, AsyncReadYield)
 {
   boost::asio::io_context service;
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     auto bl = librados::async_read(service, io, "exist", 256, 0, yield[ec]);
     EXPECT_FALSE(ec);
     EXPECT_EQ("hello", bl.to_str());
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     auto bl = librados::async_read(service, io, "noexist", 256, 0, yield[ec]);
     EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
@@ -178,22 +182,22 @@ TEST_F(AsioRados, AsyncWriteYield)
   bufferlist bl;
   bl.append("hello");
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     librados::async_write(service, io, "exist", bl, bl.length(), 0,
                           yield[ec]);
     EXPECT_FALSE(ec);
     EXPECT_EQ("hello", bl.to_str());
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     boost::system::error_code ec;
     librados::async_write(service, snapio, "exist", bl, bl.length(), 0,
                           yield[ec]);
     EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
@@ -251,7 +255,7 @@ TEST_F(AsioRados, AsyncReadOperationYield)
 {
   boost::asio::io_context service;
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
     boost::system::error_code ec;
@@ -260,9 +264,9 @@ TEST_F(AsioRados, AsyncReadOperationYield)
     EXPECT_FALSE(ec);
     EXPECT_EQ("hello", bl.to_str());
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectReadOperation op;
     op.read(0, 0, nullptr, nullptr);
     boost::system::error_code ec;
@@ -270,7 +274,7 @@ TEST_F(AsioRados, AsyncReadOperationYield)
                                       yield[ec]);
     EXPECT_EQ(boost::system::errc::no_such_file_or_directory, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
@@ -335,23 +339,23 @@ TEST_F(AsioRados, AsyncWriteOperationYield)
   bufferlist bl;
   bl.append("hello");
 
-  auto success_cr = [&] (spawn::yield_context yield) {
+  auto success_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectWriteOperation op;
     op.write_full(bl);
     boost::system::error_code ec;
     librados::async_operate(service, io, "exist", &op, 0, nullptr, yield[ec]);
     EXPECT_FALSE(ec);
   };
-  spawn::spawn(service, success_cr);
+  boost::asio::spawn(service, success_cr, rethrow);
 
-  auto failure_cr = [&] (spawn::yield_context yield) {
+  auto failure_cr = [&] (boost::asio::yield_context yield) {
     librados::ObjectWriteOperation op;
     op.write_full(bl);
     boost::system::error_code ec;
     librados::async_operate(service, snapio, "exist", &op, 0, nullptr, yield[ec]);
     EXPECT_EQ(boost::system::errc::read_only_file_system, ec);
   };
-  spawn::spawn(service, failure_cr);
+  boost::asio::spawn(service, failure_cr, rethrow);
 
   service.run();
 }
index 93deb5ff1ac408963cd1239a1434ca11b02ecbad..c96e9012790f6cf3d1191daee4ea69b20bfd4497 100644 (file)
@@ -34,7 +34,6 @@ target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE
   ${UNITTEST_LIBS}
   ${EXTRALIBS}
   )
-  target_link_libraries(ceph_test_rgw_d4n_directory PRIVATE spawn)
 install(TARGETS ceph_test_rgw_d4n_directory DESTINATION ${CMAKE_INSTALL_BINDIR})
   
 add_executable(ceph_test_rgw_d4n_policy
@@ -50,7 +49,6 @@ target_link_libraries(ceph_test_rgw_d4n_policy PRIVATE
   ${UNITTEST_LIBS}
   ${EXTRALIBS}
   )
-  target_link_libraries(ceph_test_rgw_d4n_policy PRIVATE spawn)
 install(TARGETS ceph_test_rgw_d4n_policy DESTINATION ${CMAKE_INSTALL_BINDIR})
 
 add_executable(ceph_test_rgw_redis_driver
@@ -66,7 +64,6 @@ target_link_libraries(ceph_test_rgw_redis_driver PRIVATE
   ${UNITTEST_LIBS}
   ${EXTRALIBS}
   )
-  target_link_libraries(ceph_test_rgw_redis_driver PRIVATE spawn)
 install(TARGETS ceph_test_rgw_redis_driver DESTINATION ${CMAKE_INSTALL_BINDIR})
 
 add_executable(ceph_test_rgw_ssd_driver
@@ -82,7 +79,6 @@ target_link_libraries(ceph_test_rgw_ssd_driver PRIVATE
   ${UNITTEST_LIBS}
   ${EXTRALIBS}
   )
-  target_link_libraries(ceph_test_rgw_ssd_driver PRIVATE spawn)
 install(TARGETS ceph_test_rgw_ssd_driver DESTINATION ${CMAKE_INSTALL_BINDIR})
 endif()
 
index 1ea8714f9df00f83e96ea55ebc7a650d3e3943bd..529d8f739fd9307ec967c8dc0a168a5b16183dbb 100644 (file)
@@ -6,7 +6,7 @@
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/executor_work_guard.hpp>
 #include <boost/asio/steady_timer.hpp>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 #include <chrono>
 #include <mutex>
 #include <unordered_map>
@@ -37,7 +37,7 @@ struct parameters {
 std::shared_ptr<std::vector<client_info>> ds = std::make_shared<std::vector<client_info>>(std::vector<client_info>());
 
 std::string method[2] = {"PUT", "GET"};
-void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& yield, boost::asio::io_context& ioctx)
+void simulate_transfer(client_info& it, const RGWRateLimitInfo* info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, boost::asio::yield_context& yield, boost::asio::io_context& ioctx)
 {
     auto dout = DoutPrefix(g_ceph_context, ceph_subsys_rgw, "rate limiter: ");
     boost::asio::steady_timer timer(ioctx);
@@ -101,7 +101,7 @@ bool simulate_request(client_info& it, const RGWRateLimitInfo& info, std::shared
     it.accepted++;
     return false;
 }
-void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, spawn::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
+void simulate_client(client_info& it, const RGWRateLimitInfo& info, std::shared_ptr<RateLimiter> ratelimit, const parameters& params, boost::asio::yield_context& ctx, bool& to_run, boost::asio::io_context& ioctx)
 {
     for (;;)
     {
@@ -130,11 +130,13 @@ void simulate_clients(boost::asio::io_context& context, std::string tenant, cons
         auto& it = ds->emplace_back(client_info());
         it.tenant = tenant;
         int x = ds->size() - 1;
-        spawn::spawn(context,
-                [&to_run ,x, ratelimit, info, params, &context](spawn::yield_context ctx)
+        boost::asio::spawn(context,
+                [&to_run ,x, ratelimit, info, params, &context](boost::asio::yield_context ctx)
                 {
                     auto& it = ds.get()->operator[](x);
                     simulate_client(it, info, ratelimit, params, ctx, to_run, context);
+                }, [] (std::exception_ptr eptr) {
+                  if (eptr) std::rethrow_exception(eptr);
                 });
     }
 }
index c518eb50c7dfa98ad19e158979145c278886d08b..05ad83991412390009ae89575a06ff74380380c4 100644 (file)
@@ -138,10 +138,14 @@ class BlockDirectoryFixture: public ::testing::Test {
                                     "objName", "bucketName", "creationTime", "dirty", "objHosts"};
 };
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST_F(ObjectDirectoryFixture, SetYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(obj, yield));
 
     boost::system::error_code ec;
     request req;
@@ -156,15 +160,15 @@ TEST_F(ObjectDirectoryFixture, SetYield)
     ASSERT_EQ((bool)ec, false);
     EXPECT_EQ(std::get<0>(resp).value(), vals);
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(ObjectDirectoryFixture, GetYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(obj, yield));
 
     {
       boost::system::error_code ec;
@@ -178,7 +182,7 @@ TEST_F(ObjectDirectoryFixture, GetYield)
       EXPECT_EQ(std::get<0>(resp).value(), 0);
     }
 
-    ASSERT_EQ(0, dir->get(obj, optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->get(obj, yield));
     EXPECT_EQ(obj->objName, "newoid");
 
     {
@@ -191,16 +195,16 @@ TEST_F(ObjectDirectoryFixture, GetYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(ObjectDirectoryFixture, CopyYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
-    ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(obj, yield));
+    ASSERT_EQ(0, dir->copy(obj, "copyTestName", "copyBucketName", yield));
 
     boost::system::error_code ec;
     request req;
@@ -222,15 +226,15 @@ TEST_F(ObjectDirectoryFixture, CopyYield)
     EXPECT_EQ(std::get<1>(resp).value(), copyVals);
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(ObjectDirectoryFixture, DelYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(obj, yield));
 
     {
       boost::system::error_code ec;
@@ -244,7 +248,7 @@ TEST_F(ObjectDirectoryFixture, DelYield)
       EXPECT_EQ(std::get<0>(resp).value(), 1);
     }
 
-    ASSERT_EQ(0, dir->del(obj, optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->del(obj, yield));
 
     {
       boost::system::error_code ec;
@@ -260,17 +264,17 @@ TEST_F(ObjectDirectoryFixture, DelYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(ObjectDirectoryFixture, UpdateFieldYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(obj, optional_yield{io, yield}));
-    ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", optional_yield{io, yield}));
-    ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(obj, yield));
+    ASSERT_EQ(0, dir->update_field(obj, "objName", "newTestName", yield));
+    ASSERT_EQ(0, dir->update_field(obj, "objHosts", "127.0.0.1:5000", yield));
 
     boost::system::error_code ec;
     request req;
@@ -286,7 +290,7 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield)
     EXPECT_EQ(std::get<0>(resp).value()[1], "127.0.0.1:6379_127.0.0.1:5000");
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
@@ -294,8 +298,8 @@ TEST_F(ObjectDirectoryFixture, UpdateFieldYield)
 
 TEST_F(BlockDirectoryFixture, SetYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(block, yield));
 
     boost::system::error_code ec;
     request req;
@@ -310,15 +314,15 @@ TEST_F(BlockDirectoryFixture, SetYield)
     ASSERT_EQ((bool)ec, false);
     EXPECT_EQ(std::get<0>(resp).value(), vals);
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(BlockDirectoryFixture, GetYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(block, yield));
 
     {
       boost::system::error_code ec;
@@ -332,7 +336,7 @@ TEST_F(BlockDirectoryFixture, GetYield)
       EXPECT_EQ(std::get<0>(resp).value(), 0);
     }
 
-    ASSERT_EQ(0, dir->get(block, optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->get(block, yield));
     EXPECT_EQ(block->cacheObj.objName, "newoid");
 
     {
@@ -345,16 +349,16 @@ TEST_F(BlockDirectoryFixture, GetYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(BlockDirectoryFixture, CopyYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
-    ASSERT_EQ(0, dir->copy(block, "copyTestName", "copyBucketName", optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(block, yield));
+    ASSERT_EQ(0, dir->copy(block, "copyTestName", "copyBucketName", yield));
 
     boost::system::error_code ec;
     request req;
@@ -376,15 +380,15 @@ TEST_F(BlockDirectoryFixture, CopyYield)
     EXPECT_EQ(std::get<1>(resp).value(), copyVals);
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(BlockDirectoryFixture, DelYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(block, yield));
 
     {
       boost::system::error_code ec;
@@ -398,7 +402,7 @@ TEST_F(BlockDirectoryFixture, DelYield)
       EXPECT_EQ(std::get<0>(resp).value(), 1);
     }
 
-    ASSERT_EQ(0, dir->del(block, optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->del(block, yield));
 
     {
       boost::system::error_code ec;
@@ -414,17 +418,17 @@ TEST_F(BlockDirectoryFixture, DelYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(BlockDirectoryFixture, UpdateFieldYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
-    ASSERT_EQ(0, dir->update_field(block, "objName", "newTestName", optional_yield{io, yield}));
-    ASSERT_EQ(0, dir->update_field(block, "blockHosts", "127.0.0.1:5000", optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, dir->set(block, yield));
+    ASSERT_EQ(0, dir->update_field(block, "objName", "newTestName", yield));
+    ASSERT_EQ(0, dir->update_field(block, "blockHosts", "127.0.0.1:5000", yield));
 
     boost::system::error_code ec;
     request req;
@@ -440,17 +444,17 @@ TEST_F(BlockDirectoryFixture, UpdateFieldYield)
     EXPECT_EQ(std::get<0>(resp).value()[1], "127.0.0.1:6379_127.0.0.1:5000");
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(BlockDirectoryFixture, RemoveHostYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
     block->hostsList.push_back("127.0.0.1:6000");
-    ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
-    ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6379", optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->set(block, yield));
+    ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6379", yield));
 
     {
       boost::system::error_code ec;
@@ -466,7 +470,7 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield)
       EXPECT_EQ(std::get<1>(resp).value(), "127.0.0.1:6000");
     }
 
-    ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6000", optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->remove_host(block, "127.0.0.1:6000", yield));
 
     {
       boost::system::error_code ec;
@@ -482,7 +486,7 @@ TEST_F(BlockDirectoryFixture, RemoveHostYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
index fe277536003608e4d97960d1751c6a852249b9b8..dea349af514edcb71305e3f99e91432fbef033e6 100644 (file)
@@ -156,14 +156,18 @@ class LFUDAPolicyFixture : public ::testing::Test {
     rgw::sal::Attrs attrs;
 };
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
     std::string key = block->cacheObj.bucketName + "_" + block->cacheObj.objName + "_" + std::to_string(block->blockID) + "_" + std::to_string(block->size);
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, optional_yield{io, yield}));
-    policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", optional_yield{io, yield});
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, key, bl, bl.length(), attrs, yield));
+    policyDriver->get_cache_policy()->update(env->dpp, key, 0, bl.length(), "", yield);
 
-    ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+    ASSERT_EQ(lfuda(env->dpp, block, cacheDriver, yield), 0);
 
     cacheDriver->shutdown();
 
@@ -179,14 +183,14 @@ TEST_F(LFUDAPolicyFixture, LocalGetBlockYield)
     ASSERT_EQ((bool)ec, false);
     EXPECT_EQ(std::get<0>(resp).value(), "2");
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
     /* Set victim block for eviction */
     rgw::d4n::CacheBlock victim = rgw::d4n::CacheBlock{
       .cacheObj = {
@@ -210,10 +214,10 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     attrVal.append("testBucket");
     attrs.insert({"bucket_name", attrVal});
 
-    ASSERT_EQ(0, dir->set(&victim, optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->set(&victim, yield));
     std::string victimKey = victim.cacheObj.bucketName + "_" + victim.cacheObj.objName + "_" + std::to_string(victim.blockID) + "_" + std::to_string(victim.size);
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, optional_yield{io, yield}));
-    policyDriver->get_cache_policy()->update(env->dpp, victimKey, 0, bl.length(), "", optional_yield{io, yield});
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, victimKey, bl, bl.length(), attrs, yield));
+    policyDriver->get_cache_policy()->update(env->dpp, victimKey, 0, bl.length(), "", yield);
 
     /* Remote block */
     block->size = cacheDriver->get_free_space(env->dpp) + 1; /* To trigger eviction */
@@ -222,9 +226,9 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     block->hostsList.push_back("127.0.0.1:6000");
     block->cacheObj.hostsList.push_back("127.0.0.1:6000");
 
-    ASSERT_EQ(0, dir->set(block, optional_yield{io, yield}));
+    ASSERT_EQ(0, dir->set(block, yield));
 
-    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, yield), 0);
 
     cacheDriver->shutdown();
 
@@ -246,15 +250,15 @@ TEST_F(LFUDAPolicyFixture, RemoteGetBlockYield)
     EXPECT_EQ(std::get<1>(resp).value(), 0);
     EXPECT_EQ(std::get<2>(resp).value(), "1");
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(LFUDAPolicyFixture, BackendGetBlockYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, optional_yield{io, yield}), 0);
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_GE(lfuda(env->dpp, block, cacheDriver, yield), 0);
 
     cacheDriver->shutdown();
 
@@ -267,16 +271,16 @@ TEST_F(LFUDAPolicyFixture, BackendGetBlockYield)
     conn->async_exec(req, resp, yield[ec]);
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(LFUDAPolicyFixture, RedisSyncTest)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
     env->cct->_conf->rgw_lfuda_sync_frequency = 1;
-    dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->save_y(optional_yield{io, yield});
+    dynamic_cast<rgw::d4n::LFUDAPolicy*>(policyDriver->get_cache_policy())->save_y(yield);
     policyDriver->get_cache_policy()->init(env->cct, env->dpp, io);
   
     cacheDriver->shutdown();
@@ -308,7 +312,7 @@ TEST_F(LFUDAPolicyFixture, RedisSyncTest)
     
     delete policyDriver; 
     policyDriver = nullptr;
-  });
+  }, rethrow);
 
   io.run();
 }
index e8ca1de6d55e9c61959f8af2f13b5e750d49691c..be5f1496a52c2afb65cf9a15dee88bc1bc7c3004 100644 (file)
@@ -122,10 +122,14 @@ class RedisDriverFixture: public ::testing::Test {
     rgw::sal::Attrs attrs;
 };
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST_F(RedisDriverFixture, PutYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
     cacheDriver->shutdown();
 
     boost::system::error_code ec;
@@ -140,15 +144,15 @@ TEST_F(RedisDriverFixture, PutYield)
     ASSERT_EQ((bool)ec, false);
     EXPECT_EQ(std::get<0>(resp).value(), "attrVal");
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, GetYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     {
       boost::system::error_code ec;
@@ -165,7 +169,7 @@ TEST_F(RedisDriverFixture, GetYield)
     bufferlist ret;
     rgw::sal::Attrs retAttrs;
 
-    ASSERT_EQ(0, cacheDriver->get(env->dpp, "testName", 0, bl.length(), ret, retAttrs, optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->get(env->dpp, "testName", 0, bl.length(), ret, retAttrs, yield));
     EXPECT_EQ(ret.to_str(), "new data");
     EXPECT_EQ(retAttrs.begin()->second.to_str(), "newVal");
     cacheDriver->shutdown();
@@ -182,16 +186,16 @@ TEST_F(RedisDriverFixture, GetYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, PutAsyncYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
-    auto completed = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", bl, bl.length(), attrs, 0, 0);
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, yield);
+    auto completed = cacheDriver->put_async(env->dpp, yield, aio.get(), "testName", bl, bl.length(), attrs, 0, 0);
     drain(env->dpp, aio.get());
 
     cacheDriver->shutdown();
@@ -208,18 +212,18 @@ TEST_F(RedisDriverFixture, PutAsyncYield)
     EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
     EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, GetAsyncYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
-    std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, optional_yield{io, yield});
-    auto completed = cacheDriver->get_async(env->dpp, optional_yield{io, yield}, aio.get(), "testName", 0, bl.length(), 0, 0);
+    std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(env->cct->_conf->rgw_get_obj_window_size, yield);
+    auto completed = cacheDriver->get_async(env->dpp, yield, aio.get(), "testName", 0, bl.length(), 0, 0);
     drain(env->dpp, aio.get());
 
     cacheDriver->shutdown();
@@ -236,15 +240,15 @@ TEST_F(RedisDriverFixture, GetAsyncYield)
     EXPECT_EQ(std::get<0>(resp).value()[0], "attrVal");
     EXPECT_EQ(std::get<0>(resp).value()[1], "test data");
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, DelYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     {
       boost::system::error_code ec;
@@ -258,7 +262,7 @@ TEST_F(RedisDriverFixture, DelYield)
       EXPECT_EQ(std::get<0>(resp).value(), 1);
     }
 
-    ASSERT_EQ(0, cacheDriver->del(env->dpp, "testName", optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->del(env->dpp, "testName", yield));
     cacheDriver->shutdown();
 
     {
@@ -275,15 +279,15 @@ TEST_F(RedisDriverFixture, DelYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, AppendDataYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     {
       boost::system::error_code ec;
@@ -300,7 +304,7 @@ TEST_F(RedisDriverFixture, AppendDataYield)
     bufferlist val;
     val.append(" has been written");
 
-    ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testName", val, optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testName", val, yield));
     cacheDriver->shutdown();
 
     {
@@ -317,15 +321,15 @@ TEST_F(RedisDriverFixture, AppendDataYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, DeleteDataYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     {
       boost::system::error_code ec;
@@ -339,7 +343,7 @@ TEST_F(RedisDriverFixture, DeleteDataYield)
       EXPECT_EQ(std::get<0>(resp).value(), 1);
     }
 
-    ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testName", optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testName", yield));
     cacheDriver->shutdown();
 
     {
@@ -356,15 +360,15 @@ TEST_F(RedisDriverFixture, DeleteDataYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, SetAttrsYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     rgw::sal::Attrs newAttrs;
     bufferlist newVal;
@@ -375,7 +379,7 @@ TEST_F(RedisDriverFixture, SetAttrsYield)
     newVal.append("nextVal");
     newAttrs.insert({"nextAttr", newVal});
 
-    ASSERT_EQ(0, cacheDriver->set_attrs(env->dpp, "testName", newAttrs, optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->set_attrs(env->dpp, "testName", newAttrs, yield));
     cacheDriver->shutdown();
 
     boost::system::error_code ec;
@@ -392,20 +396,20 @@ TEST_F(RedisDriverFixture, SetAttrsYield)
     EXPECT_EQ(std::get<0>(resp).value()[0], "newVal");
     EXPECT_EQ(std::get<0>(resp).value()[1], "nextVal");
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, GetAttrsYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
     rgw::sal::Attrs nextAttrs = attrs;
     bufferlist nextVal;
     nextVal.append("nextVal");
     nextAttrs.insert({"nextAttr", nextVal});
 
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), nextAttrs, optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), nextAttrs, yield));
 
     {
       boost::system::error_code ec;
@@ -421,7 +425,7 @@ TEST_F(RedisDriverFixture, GetAttrsYield)
 
     rgw::sal::Attrs retAttrs;
 
-    ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testName", retAttrs, optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testName", retAttrs, yield));
    
     auto it = retAttrs.begin();
     EXPECT_EQ(it->second.to_str(), "newVal1");
@@ -441,22 +445,22 @@ TEST_F(RedisDriverFixture, GetAttrsYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, UpdateAttrsYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     rgw::sal::Attrs newAttrs;
     bufferlist newVal;
     newVal.append("newVal");
     newAttrs.insert({"attr", newVal});
 
-    ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testName", newAttrs, optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testName", newAttrs, yield));
     cacheDriver->shutdown();
 
     boost::system::error_code ec;
@@ -471,15 +475,15 @@ TEST_F(RedisDriverFixture, UpdateAttrsYield)
     EXPECT_EQ(std::get<0>(resp).value(), "newVal");
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, DeleteAttrsYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     {
       boost::system::error_code ec;
@@ -497,7 +501,7 @@ TEST_F(RedisDriverFixture, DeleteAttrsYield)
     bufferlist delVal;
     delAttrs.insert({"attr", delVal});
 
-    ASSERT_GE(cacheDriver->delete_attrs(env->dpp, "testName", delAttrs, optional_yield{io, yield}), 0);
+    ASSERT_GE(cacheDriver->delete_attrs(env->dpp, "testName", delAttrs, yield), 0);
     cacheDriver->shutdown();
 
     {
@@ -514,16 +518,16 @@ TEST_F(RedisDriverFixture, DeleteAttrsYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, SetAttrYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
-    ASSERT_GE(cacheDriver->set_attr(env->dpp, "testName", "newAttr", "newVal", optional_yield{io, yield}), 0);
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
+    ASSERT_GE(cacheDriver->set_attr(env->dpp, "testName", "newAttr", "newVal", yield), 0);
     cacheDriver->shutdown();
 
     boost::system::error_code ec;
@@ -538,15 +542,15 @@ TEST_F(RedisDriverFixture, SetAttrYield)
     ASSERT_EQ((bool)ec, false);
     EXPECT_EQ(std::get<0>(resp).value(), "newVal");
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
 
 TEST_F(RedisDriverFixture, GetAttrYield)
 {
-  spawn::spawn(io, [this] (spawn::yield_context yield) {
-    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, optional_yield{io, yield}));
+  boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+    ASSERT_EQ(0, cacheDriver->put(env->dpp, "testName", bl, bl.length(), attrs, yield));
 
     {
       boost::system::error_code ec;
@@ -560,7 +564,7 @@ TEST_F(RedisDriverFixture, GetAttrYield)
       EXPECT_EQ(std::get<0>(resp).value(), 0);
     }
     std::string attr_val;
-    ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testName", "attr", attr_val, optional_yield{io, yield}));
+    ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testName", "attr", attr_val, yield));
     ASSERT_EQ("newVal", attr_val);
     cacheDriver->shutdown();
 
@@ -576,7 +580,7 @@ TEST_F(RedisDriverFixture, GetAttrYield)
     }
 
     conn->cancel();
-  });
+  }, rethrow);
 
   io.run();
 }
index c9b4a853fd4d726c36d51c374705da8f8ddeed20..da748dfa6c9ca1a99dc4c4dcef73dcd0b6064067 100644 (file)
@@ -18,7 +18,7 @@
 #include "rgw_dmclock_async_scheduler.h"
 
 #include <optional>
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 #include <gtest/gtest.h>
 #include "acconfig.h"
 #include "global/global_context.h"
@@ -400,7 +400,7 @@ TEST(Queue, SpawnAsyncRequest)
 {
   boost::asio::io_context context;
 
-  spawn::spawn(context, [&] (spawn::yield_context yield) {
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
     ClientCounters counters(g_ceph_context);
     AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
                     [] (client_id client) -> ClientInfo* {
@@ -419,6 +419,8 @@ TEST(Queue, SpawnAsyncRequest)
     auto p2 = queue.async_request(client_id::auth, {}, get_time(), 1, yield[ec2]);
     EXPECT_EQ(boost::system::errc::success, ec2);
     EXPECT_EQ(PhaseType::priority, p2);
+  }, [] (std::exception_ptr eptr) {
+    if (eptr) std::rethrow_exception(eptr);
   });
 
   context.run_for(std::chrono::milliseconds(50));
index 98b2aa235b95c06ad446d012a0da7bfc9ea3bd03..058828b956c88e358a6e3103dae3a667cb91a2ab 100644 (file)
@@ -13,7 +13,7 @@
  */
 
 #include "rgw_reshard.h"
-#include <spawn/spawn.hpp>
+#include <boost/asio/spawn.hpp>
 
 #include <gtest/gtest.h>
 
@@ -58,15 +58,19 @@ TEST(ReshardWait, stop_block)
   short_waiter.stop();
 }
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST(ReshardWait, wait_yield)
 {
   constexpr ceph::timespan wait_duration = 50ms;
   RGWReshardWait waiter(wait_duration);
 
   boost::asio::io_context context;
-  spawn::spawn(context, [&] (spawn::yield_context yield) {
-      EXPECT_EQ(0, waiter.wait(optional_yield{context, yield}));
-    });
+  boost::asio::spawn(context, [&] (boost::asio::yield_context yield) {
+      EXPECT_EQ(0, waiter.wait(yield));
+    }, rethrow);
 
   const auto start = Clock::now();
   EXPECT_EQ(1u, context.poll()); // spawn
@@ -89,10 +93,10 @@ TEST(ReshardWait, stop_yield)
   RGWReshardWait short_waiter(short_duration);
 
   boost::asio::io_context context;
-  spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
-      EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
-    });
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      EXPECT_EQ(-ECANCELED, long_waiter.wait(yield));
+    }, rethrow);
 
   const auto start = Clock::now();
   EXPECT_EQ(1u, context.poll()); // spawn
@@ -133,13 +137,13 @@ TEST(ReshardWait, stop_multiple)
   // spawn 4 coroutines
   boost::asio::io_context context;
   {
-    auto async_waiter = [&] (spawn::yield_context yield) {
-        EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
+    auto async_waiter = [&] (boost::asio::yield_context yield) {
+        EXPECT_EQ(-ECANCELED, long_waiter.wait(yield));
       };
-    spawn::spawn(context, async_waiter);
-    spawn::spawn(context, async_waiter);
-    spawn::spawn(context, async_waiter);
-    spawn::spawn(context, async_waiter);
+    boost::asio::spawn(context, async_waiter, rethrow);
+    boost::asio::spawn(context, async_waiter, rethrow);
+    boost::asio::spawn(context, async_waiter, rethrow);
+    boost::asio::spawn(context, async_waiter, rethrow);
   }
 
   const auto start = Clock::now();
index e5df9f84efa1d08b1eb7408a21016b5ffa6bcb6a..18dd8f3ffbc3e9790169524328e2c7a2202a2baf 100644 (file)
@@ -20,9 +20,9 @@
 #include <boost/asio/error.hpp>
 #include <boost/asio/executor_work_guard.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/spawn.hpp>
 #include "include/scope_guard.h"
 
-#include <spawn/spawn.hpp>
 #include <gtest/gtest.h>
 
 static rgw_raw_obj make_obj(const std::string& oid)
@@ -143,13 +143,15 @@ TEST(Aio_Throttle, YieldCostOverWindow)
   auto obj = make_obj(__PRETTY_FUNCTION__);
 
   boost::asio::io_context context;
-  spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
-      YieldingAioThrottle throttle(4, context, yield);
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      YieldingAioThrottle throttle(4, yield);
       scoped_completion op;
       auto c = throttle.get(obj, wait_on(op), 8, 0);
       ASSERT_EQ(1u, c.size());
       EXPECT_EQ(-EDEADLK, c.front().result);
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   context.run();
 }
@@ -166,9 +168,9 @@ TEST(Aio_Throttle, YieldingThrottleOverMax)
   uint64_t outstanding = 0;
 
   boost::asio::io_context context;
-  spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
-      YieldingAioThrottle throttle(window, context, yield);
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      YieldingAioThrottle throttle(window, yield);
       for (uint64_t i = 0; i < total; i++) {
         using namespace std::chrono_literals;
         auto c = throttle.get(obj, wait_for(context, 10ms), 1, 0);
@@ -180,6 +182,8 @@ TEST(Aio_Throttle, YieldingThrottleOverMax)
       }
       auto c = throttle.drain();
       outstanding -= c.size();
+    }, [] (std::exception_ptr eptr) {
+      if (eptr) std::rethrow_exception(eptr);
     });
   context.poll(); // run until we block
   EXPECT_EQ(window, outstanding);
index 682a12ae700e80e9e177a5df95370c3716a2ae1b..bbd394a0096ea6616f95d03de02f64e7cae09a8e 100644 (file)
@@ -112,159 +112,163 @@ class SSDDriverFixture: public ::testing::Test {
     rgw::sal::Attrs del_attrs;
 };
 
+void rethrow(std::exception_ptr eptr) {
+  if (eptr) std::rethrow_exception(eptr);
+}
+
 TEST_F(SSDDriverFixture, PutAndGet)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
         rgw::sal::Attrs attrs = {};
-        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testPutGet", bl, bl.length(), attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testPutGet", bl, bl.length(), attrs, yield));
         bufferlist ret;
         rgw::sal::Attrs get_attrs;
-        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testPutGet", 0, bl.length(), ret, get_attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testPutGet", 0, bl.length(), ret, get_attrs, yield));
         EXPECT_EQ(ret, bl);
         EXPECT_EQ(get_attrs.size(), 0);
-    });
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, AppendData)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
         rgw::sal::Attrs attrs = {};
-        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testAppend", bl, bl.length(), attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testAppend", bl, bl.length(), attrs, yield));
     
         bufferlist bl_append;
         bl_append.append(" xyz");
-        ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testAppend", bl_append, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->append_data(env->dpp, "testAppend", bl_append, yield));
     
         bufferlist ret;
         bl.append(bl_append);
         rgw::sal::Attrs get_attrs;
-        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testAppend", 0, bl.length(), ret, get_attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testAppend", 0, bl.length(), ret, get_attrs, yield));
         EXPECT_EQ(ret, bl);
         EXPECT_EQ(get_attrs.size(), 0);
-    });
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, SetGetAttrs)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
-        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttrs", bl, bl.length(), attrs, optional_yield{io, yield}));
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttrs", bl, bl.length(), attrs, yield));
         bufferlist ret;
         rgw::sal::Attrs ret_attrs;
-        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testSetGetAttrs", 0, bl.length(), ret, ret_attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testSetGetAttrs", 0, bl.length(), ret, ret_attrs, yield));
         EXPECT_EQ(ret, bl);
         EXPECT_EQ(ret_attrs.size(), 1);
         for (auto& it : ret_attrs) {
           EXPECT_EQ(it.first, "user.rgw.attrName");
           EXPECT_EQ(it.second, attrVal);
         }
-    });
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, UpdateAttrs)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
-        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testUpdateAttrs", bl, bl.length(), attrs, optional_yield{io, yield}));
-        ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testUpdateAttrs", update_attrs, optional_yield{io, yield}));
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testUpdateAttrs", bl, bl.length(), attrs, yield));
+        ASSERT_EQ(0, cacheDriver->update_attrs(env->dpp, "testUpdateAttrs", update_attrs, yield));
         rgw::sal::Attrs get_attrs;
-        ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testUpdateAttrs", get_attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testUpdateAttrs", get_attrs, yield));
         EXPECT_EQ(get_attrs.size(), 2);
         EXPECT_EQ(get_attrs["user.rgw.attrName"], updateAttrVal1);
         EXPECT_EQ(get_attrs["user.rgw.testAttr"], updateAttrVal2);
-    });
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, SetGetAttr)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
       rgw::sal::Attrs attrs = {};
-      ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttr", bl, bl.length(), attrs, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->put(env->dpp, "testSetGetAttr", bl, bl.length(), attrs, yield));
       std::string attr_name = "user.ssd.testattr";
       std::string attr_val = "testattrVal";
-      ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testSetGetAttr", attr_name, attr_val, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testSetGetAttr", attr_name, attr_val, yield));
       std::string attr_val_ret;
-      ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testSetGetAttr", attr_name, attr_val_ret, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testSetGetAttr", attr_name, attr_val_ret, yield));
       ASSERT_EQ(attr_val, attr_val_ret);
-    });
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, DeleteAttr)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
       rgw::sal::Attrs attrs = {};
-      ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, yield));
       std::string attr_name = "user.ssd.testattr";
       std::string attr_val = "testattrVal";
-      ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testDeleteAttr", attr_name, attr_val, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->set_attr(env->dpp, "testDeleteAttr", attr_name, attr_val, yield));
       std::string attr_val_ret;
-      ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, yield));
       ASSERT_EQ(attr_val, attr_val_ret);
 
       attr_val_ret.clear();
       ASSERT_EQ(0, cacheDriver->delete_attr(env->dpp, "testDeleteAttr", attr_name));
-      ASSERT_EQ(ENODATA, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, optional_yield{io, yield}));
+      ASSERT_EQ(ENODATA, cacheDriver->get_attr(env->dpp, "testDeleteAttr", attr_name, attr_val_ret, yield));
       ASSERT_EQ("", attr_val_ret);
-    });
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, DeleteAttrs)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
-      ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, optional_yield{io, yield}));
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
+      ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteAttr", bl, bl.length(), attrs, yield));
       rgw::sal::Attrs ret_attrs;
-      ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", ret_attrs, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", ret_attrs, yield));
       EXPECT_EQ(ret_attrs.size(), 1);
       for (auto& it : ret_attrs) {
         EXPECT_EQ(it.first, "user.rgw.attrName");
         EXPECT_EQ(it.second, attrVal);
       }
 
-      ASSERT_EQ(0, cacheDriver->delete_attrs(env->dpp, "testDeleteAttr", del_attrs, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->delete_attrs(env->dpp, "testDeleteAttr", del_attrs, yield));
       ret_attrs.clear();
-      ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", del_attrs, optional_yield{io, yield}));
+      ASSERT_EQ(0, cacheDriver->get_attrs(env->dpp, "testDeleteAttr", del_attrs, yield));
       EXPECT_EQ(ret_attrs.size(), 0);
-    });
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, DeleteData)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
         rgw::sal::Attrs attrs = {};
-        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteData", bl, bl.length(), attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->put(env->dpp, "testDeleteData", bl, bl.length(), attrs, yield));
         bufferlist ret;
         rgw::sal::Attrs get_attrs;
-        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, optional_yield{io, yield}));
+        ASSERT_EQ(0, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, yield));
         EXPECT_EQ(ret, bl);
         EXPECT_EQ(get_attrs.size(), 0);
-        ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testDeleteData", optional_yield{io, yield}));
-        ASSERT_EQ(-ENOENT, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, optional_yield{io, yield}));
-    });
+        ASSERT_EQ(0, cacheDriver->delete_data(env->dpp, "testDeleteData", yield));
+        ASSERT_EQ(-ENOENT, cacheDriver->get(env->dpp, "testDeleteData", 0, bl.length(), ret, get_attrs, yield));
+    }, rethrow);
 
     io.run();
 }
 
 TEST_F(SSDDriverFixture, PutAsync)
 {
-    spawn::spawn(io, [this] (spawn::yield_context yield) {
+    boost::asio::spawn(io, [this] (boost::asio::yield_context yield) {
         rgw::sal::Attrs attrs = {};
         const uint64_t window_size = env->cct->_conf->rgw_put_obj_min_window_size;
-        std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(window_size, optional_yield{io, yield});
-        auto results = cacheDriver->put_async(env->dpp, optional_yield{io, yield}, aio.get(), "testPutAsync", bl, bl.length(), attrs, bl.length(), 0);
+        std::unique_ptr<rgw::Aio> aio = rgw::make_throttle(window_size, yield);
+        auto results = cacheDriver->put_async(env->dpp, yield, aio.get(), "testPutAsync", bl, bl.length(), attrs, bl.length(), 0);
         drain(env->dpp, aio.get());
-    });
+    }, rethrow);
 
     io.run();
 }