]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
spawn: use explicit strand executor
authorCasey Bodley <cbodley@redhat.com>
Mon, 1 Nov 2021 17:14:16 +0000 (13:14 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 12 Nov 2021 14:37:09 +0000 (09:37 -0500)
the default spawn::yield_context uses the polymorphic boost::asio::executor
to support any executor type

rgw's beast frontend always uses the same executor type for these
coroutines, so we can use that type directly to avoid the overhead of
type erasure and virtual function calls

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/async/yield_context.h
src/rgw/rgw_aio.cc
src/rgw/rgw_aio_throttle.h
src/rgw/rgw_asio_frontend.cc
src/rgw/rgw_d3n_cacherequest.h
src/rgw/rgw_notify.cc
src/rgw/rgw_sync_checkpoint.cc
src/test/rgw/test_rgw_dmclock_scheduler.cc
src/test/rgw/test_rgw_reshard_wait.cc
src/test/rgw/test_rgw_throttle.cc

index baa028fa1b4afa7d2d1774af93d899efd2a90c6c..05e6ca6140c567d042a2d546a7bb621d54411959 100644 (file)
 
 #include <spawn/spawn.hpp>
 
+// use explicit executor types instead of the type-erased boost::asio::executor.
+// coroutines wrap the default io_context executor with a strand executor
+using yield_context = spawn::basic_yield_context<
+    boost::asio::executor_binder<void(*)(),
+        boost::asio::strand<boost::asio::io_context::executor_type>>>;
+
 /// optional-like wrapper for a spawn::yield_context and its associated
 /// boost::asio::io_context. operations that take an optional_yield argument
 /// will, when passed a non-empty yield context, suspend this coroutine instead
 /// of the blocking the thread of execution
 class optional_yield {
   boost::asio::io_context *c = nullptr;
-  spawn::yield_context *y = nullptr;
+  yield_context *y = nullptr;
  public:
   /// construct with a valid io and yield_context
   explicit optional_yield(boost::asio::io_context& c,
-                          spawn::yield_context& y) noexcept
+                          yield_context& y) noexcept
     : c(&c), y(&y) {}
 
   /// type tag to construct an empty object
@@ -46,7 +52,7 @@ class optional_yield {
   boost::asio::io_context& get_io_context() const noexcept { return *c; }
 
   /// return a reference to the yield_context. only valid if non-empty
-  spawn::yield_context& get_yield_context() const noexcept { return *y; }
+  yield_context& get_yield_context() const noexcept { return *y; }
 };
 
 // type tag object to construct an empty optional_yield
index b55f59d254efd53de35b47cdae429cb12793cccb..afab8b75b0bd52caf013dc589152ef54349dd495 100644 (file)
@@ -80,12 +80,12 @@ struct Handler {
 
 template <typename Op>
 Aio::OpFunc aio_abstract(Op&& op, boost::asio::io_context& context,
-                         spawn::yield_context yield) {
+                         yield_context yield) {
   return [op = std::move(op), &context, yield] (Aio* aio, AioResult& r) mutable {
       // arrange for the completion Handler to run on the yield_context's strand
       // executor so it can safely call back into Aio without locking
       using namespace boost::asio;
-      async_completion<spawn::yield_context, void()> init(yield);
+      async_completion<yield_context, void()> init(yield);
       auto ex = get_associated_executor(init.completion_handler);
 
       auto& ref = r.obj.get_ref();
index 04e93aaa09e890265b402c0dde30e626e8a5c0d7..26a3578cabff552e8d0823a450b1c17cfb1d53ea 100644 (file)
@@ -82,7 +82,7 @@ class BlockingAioThrottle final : public Aio, private Throttle {
 // functions must be called within the coroutine strand
 class YieldingAioThrottle final : public Aio, private Throttle {
   boost::asio::io_context& context;
-  spawn::yield_context yield;
+  yield_context yield;
   struct Handler;
 
   // completion callback associated with the waiter
@@ -96,7 +96,7 @@ class YieldingAioThrottle final : public Aio, private Throttle {
 
  public:
   YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
-                      spawn::yield_context yield)
+                      yield_context yield)
     : Throttle(window), context(context), yield(yield)
   {}
 
index 8287e1125d343193e0bdf0267c16c1a81d9d91d6..a2d3fa10ac6ef56f26d33a2519b189ca3865a8b5 100644 (file)
@@ -56,13 +56,12 @@ template <typename Stream>
 class StreamIO : public rgw::asio::ClientIO {
   CephContext* const cct;
   Stream& stream;
-  spawn::yield_context yield;
+  yield_context yield;
   parse_buffer& buffer;
   ceph::timespan request_timeout;
  public:
   StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
-           spawn::yield_context yield,
-           parse_buffer& buffer, bool is_ssl,
+           yield_context yield, parse_buffer& buffer, bool is_ssl,
            const tcp::endpoint& local_endpoint,
            const tcp::endpoint& remote_endpoint,
            ceph::timespan request_timeout)
@@ -174,7 +173,7 @@ void handle_connection(boost::asio::io_context& context,
                        SharedMutex& pause_mutex,
                        rgw::dmclock::Scheduler *scheduler,
                        boost::system::error_code& ec,
-                       spawn::yield_context yield,
+                       yield_context yield,
                        ceph::timespan request_timeout)
 {
   // limit header to 4k, since we read it all into a single flat_buffer
@@ -954,7 +953,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
     spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+      [this, s=std::move(stream)] (yield_context yield) mutable {
         Connection conn{s.socket()};
         auto c = connections.add(conn);
         // wrap the tcp_stream in an ssl stream
@@ -985,7 +984,7 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   {
 #endif // WITH_RADOSGW_BEAST_OPENSSL
     spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
+      [this, s=std::move(stream)] (yield_context yield) mutable {
         Connection conn{s.socket()};
         auto c = connections.add(conn);
         auto buffer = std::make_unique<parse_buffer>();
index d4acc4c0b1f8fc28173448473f588b37cef89370..ad93a689f9c68afc6295b1a20962f75f0b86a01e 100644 (file)
@@ -131,11 +131,11 @@ struct D3nL1CacheRequest {
     }
   };
 
-  void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::io_context& context, spawn::yield_context yield,
+  void file_aio_read_abstract(const DoutPrefixProvider *dpp, boost::asio::io_context& context, yield_context yield,
                               std::string& file_path, off_t read_ofs, off_t read_len,
                               rgw::Aio* aio, rgw::AioResult& r) {
     using namespace boost::asio;
-    async_completion<spawn::yield_context, void()> init(yield);
+    async_completion<yield_context, void()> init(yield);
     auto ex = get_associated_executor(init.completion_handler);
 
     auto& ref = r.obj.get_ref();
index 428c7174c01c7b1f9c1e86d93c3d27fb3720c816..a210ae45fec2c66a5508d591924cb4d437daaaae 100644 (file)
@@ -144,7 +144,7 @@ class Manager : public DoutPrefixProvider {
       pending_tokens(0),
       timer(io_context) {}  
  
-    void async_wait(spawn::yield_context yield) { 
+    void async_wait(yield_context yield) {
       if (pending_tokens == 0) {
         return;
       }
@@ -161,7 +161,7 @@ class Manager : public DoutPrefixProvider {
 
   // processing of a specific entry
   // return whether processing was successfull (true) or not (false)
-  bool process_entry(const cls_queue_entry& entry, spawn::yield_context yield) {
+  bool process_entry(const cls_queue_entry& entry, yield_context yield) {
     event_entry_t event_entry;
     auto iter = entry.data.cbegin();
     try {
@@ -196,7 +196,7 @@ class Manager : public DoutPrefixProvider {
   }
 
   // clean stale reservation from queue
-  void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
+  void cleanup_queue(const std::string& queue_name, yield_context yield) {
     while (true) {
       ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
       const auto now = ceph::coarse_real_time::clock::now();
@@ -232,13 +232,13 @@ class Manager : public DoutPrefixProvider {
   }
 
   // processing of a specific queue
-  void process_queue(const std::string& queue_name, spawn::yield_context yield) {
+  void process_queue(const std::string& queue_name, yield_context yield) {
     constexpr auto max_elements = 1024;
     auto is_idle = false;
     const std::string start_marker;
 
     // start a the cleanup coroutine for the queue
-    spawn::spawn(io_context, [this, queue_name](spawn::yield_context yield) {
+    spawn::spawn(io_context, [this, queue_name](yield_context yield) {
             cleanup_queue(queue_name, yield);
             }, make_stack_allocator());
     
@@ -311,7 +311,7 @@ class Manager : public DoutPrefixProvider {
           break;
         }
         // TODO pass entry pointer instead of by-value
-        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](spawn::yield_context yield) {
+        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
             const auto token = waiter.make_token();
             if (process_entry(entry, yield)) {
               ldpp_dout(this, 20) << "INFO: processing of entry: " << 
@@ -371,7 +371,7 @@ class Manager : public DoutPrefixProvider {
 
   // process all queues
   // find which of the queues is owned by this daemon and process it
-  void process_queues(spawn::yield_context yield) {
+  void process_queues(yield_context yield) {
     auto has_error = false;
     owned_queues_t owned_queues;
 
@@ -438,7 +438,7 @@ class Manager : public DoutPrefixProvider {
         if (owned_queues.insert(queue_name).second) {
           ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
           // start processing this queue
-          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
+          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) {
             process_queue(queue_name, yield);
             // if queue processing ended, it measn that the queue was removed or not owned anymore
             // mark it for deletion
@@ -488,7 +488,7 @@ public:
     stale_reservations_period_s(_stale_reservations_period_s),
     reservations_cleanup_period_s(_reservations_cleanup_period_s)
     {
-      spawn::spawn(io_context, [this](spawn::yield_context yield) {
+      spawn::spawn(io_context, [this] (yield_context yield) {
             process_queues(yield);
           }, make_stack_allocator());
 
index 83dc68f44c446dda4edca4e8076c4cf93b301c75..ec55eca4adf95a136addb913457f8e10bc42993e 100644 (file)
@@ -177,7 +177,7 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
     entry.pipe = pipe;
 
     // fetch remote markers
-    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
+    spawn::spawn(ioctx, [&] (yield_context yield) {
       auto y = optional_yield{ioctx, yield};
       int r = source_bilog_markers(dpp, store->svc()->zone, entry.pipe,
                                    entry.remote_markers, y);
@@ -188,7 +188,7 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
       }
     });
     // fetch source bucket info
-    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
+    spawn::spawn(ioctx, [&] (yield_context yield) {
       auto y = optional_yield{ioctx, yield};
       auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
       int r = store->getRados()->get_bucket_instance_info(
index 3b9fe1a5574c6f22d8fb39fd3f28706f36080c6a..ca4aa02012d998bb0e82486e657745859cb2d3b1 100644 (file)
@@ -400,7 +400,7 @@ TEST(Queue, SpawnAsyncRequest)
 {
   boost::asio::io_context context;
 
-  spawn::spawn(context, [&] (spawn::yield_context yield) {
+  spawn::spawn(context, [&] (yield_context yield) {
     ClientCounters counters(g_ceph_context);
     AsyncScheduler queue(g_ceph_context, context, std::ref(counters), nullptr,
                     [] (client_id client) -> ClientInfo* {
index fa947c4f3c2c45afd377624c103d7250a6b051d9..1db8bad391a6723f3ce8a31d7074c6c724341902 100644 (file)
@@ -64,7 +64,7 @@ TEST(ReshardWait, wait_yield)
   RGWReshardWait waiter(wait_duration);
 
   boost::asio::io_context context;
-  spawn::spawn(context, [&] (spawn::yield_context yield) {
+  spawn::spawn(context, [&] (yield_context yield) {
       EXPECT_EQ(0, waiter.wait(optional_yield{context, yield}));
     });
 
@@ -90,7 +90,7 @@ TEST(ReshardWait, stop_yield)
 
   boost::asio::io_context context;
   spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
+    [&] (yield_context yield) {
       EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
     });
 
@@ -133,7 +133,7 @@ TEST(ReshardWait, stop_multiple)
   // spawn 4 coroutines
   boost::asio::io_context context;
   {
-    auto async_waiter = [&] (spawn::yield_context yield) {
+    auto async_waiter = [&] (yield_context yield) {
         EXPECT_EQ(-ECANCELED, long_waiter.wait(optional_yield{context, yield}));
       };
     spawn::spawn(context, async_waiter);
index e58ab365ddc63077928377122e0a75b7a66f98ce..7ac82a30762dff8f1eae89e7e1de487c4421a813 100644 (file)
@@ -172,7 +172,7 @@ TEST_F(Aio_Throttle, YieldCostOverWindow)
 
   boost::asio::io_context context;
   spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
+    [&] (yield_context yield) {
       YieldingAioThrottle throttle(4, context, yield);
       scoped_completion op;
       auto c = throttle.get(obj, wait_on(op), 8, 0);
@@ -194,7 +194,7 @@ TEST_F(Aio_Throttle, YieldingThrottleOverMax)
 
   boost::asio::io_context context;
   spawn::spawn(context,
-    [&] (spawn::yield_context yield) {
+    [&] (yield_context yield) {
       YieldingAioThrottle throttle(window, context, yield);
       for (uint64_t i = 0; i < total; i++) {
         using namespace std::chrono_literals;