git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite/datalog: Make add_entry a stackful coroutine
authorAdam Emerson <aemerson@redhat.com>
Thu, 16 May 2024 02:42:38 +0000 (22:42 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Convert add_entry to a stackful coroutine, since, outside of testing, it
is only called from stackful coroutines, for now.

Signed-off-by: Adam Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/driver/rados/rgw_log_backing.h

index 45ca67f021cde846d03c2708b90744497a93ef87..603afa30badb7aa01760fa7b1c0edc8951951d1b 100644 (file)
@@ -163,14 +163,13 @@ public:
       asio::use_awaitable);
     co_return;
   }
-  asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
-                            ceph::real_time now, const std::string& key,
-                            buffer::list&& bl) override {
-    co_await r.execute(
-      oids[index], loc,
-      neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))),
-      asio::use_awaitable);
-    co_return;
+  void push(const DoutPrefixProvider *dpp, int index,
+           ceph::real_time now, const std::string& key,
+           buffer::list&& bl, asio::yield_context y) override {
+    r.execute(oids[index], loc,
+             neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))),
+             y);
+    return;
   }
 
   asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
@@ -270,7 +269,7 @@ public:
 };
 
 class RGWDataChangesFIFO final : public RGWDataChangesBE {
-  using centries = std::vector<buffer::list>;
+  using centries = std::deque<buffer::list>;
   tiny_vector<LazyFIFO> fifos;
 
 public:
@@ -296,10 +295,10 @@ public:
                             entries&& items) override {
     co_return co_await fifos[index].push(dpp, std::get<centries>(items));
   }
-  asio::awaitable<void> push(const DoutPrefixProvider* dpp, int index,
-                            ceph::real_time, const std::string&,
-                            buffer::list&& bl) override {
-    co_return co_await fifos[index].push(dpp, std::move(bl));
+  void push(const DoutPrefixProvider* dpp, int index,
+           ceph::real_time, const std::string&,
+           buffer::list&& bl, asio::yield_context y) override {
+    fifos[index].push(dpp, std::move(bl), y);
   }
   asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
                             std::string>>
@@ -854,20 +853,15 @@ int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
   return choose_oid(bs);
 }
 
-asio::awaitable<bool>
-RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
-                                const rgw_bucket& bucket) const
+bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
+                                     const rgw_bucket& bucket,
+                                     asio::yield_context y) const
 {
   if (!bucket_filter) {
-    co_return true;
+    return true;
   }
 
-  co_return co_await asio::spawn(
-    co_await asio::this_coro::executor,
-    [this, dpp, &bucket](asio::yield_context yc) {
-      optional_yield y(yc);
-      return bucket_filter(bucket, y, dpp);
-    }, asio::use_awaitable);
+  return bucket_filter(bucket, y, dpp);
 }
 
 std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
@@ -885,15 +879,29 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
                             const RGWBucketInfo& bucket_info,
                             const rgw::bucket_log_layout_generation& gen,
                             int shard_id)
+{
+  co_await asio::spawn(
+    co_await asio::this_coro::executor,
+    [this, dpp, &bucket_info, &gen, shard_id](asio::yield_context y) {
+      return add_entry(dpp, bucket_info, gen, shard_id, y);
+    }, asio::use_awaitable);
+  co_return;
+}
+
+
+void RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
+                                 const RGWBucketInfo& bucket_info,
+                                 const rgw::bucket_log_layout_generation& gen,
+                                 int shard_id, asio::yield_context y)
 {
   if (!log_data) {
-    co_return;
+    return;
   }
 
   auto& bucket = bucket_info.bucket;
 
-  if (!co_await filter_bucket(dpp, bucket)) {
-    co_return;
+  if (!filter_bucket(dpp, bucket, y)) {
+    return;
   }
 
   if (observer) {
@@ -921,8 +929,8 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
 
     auto be = bes->head();
     // Failure on push is fatal if we're bypassing semaphores.
-    co_await be->push(dpp, index, now, change.key, std::move(bl));
-    co_return;
+    be->push(dpp, index, now, change.key, std::move(bl), y);
+    return;
   }
 
   mark_modified(index, bs, gen.gen);
@@ -950,17 +958,16 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
     if (need_sem_set) {
       using neorados::WriteOp;
       using neorados::cls::sem_set::increment;
-      co_await rados->execute(get_sem_set_oid(index), loc,
-                             WriteOp{}.exec(increment(std::move(key))),
-                             asio::use_awaitable);
+      rados->execute(get_sem_set_oid(index), loc,
+                    WriteOp{}.exec(increment(std::move(key))), y);
     }
-    co_return;
+    return;
   }
 
   if (status->pending) {
-    co_await status->cond.async_wait(sl, asio::use_awaitable);
+    status->cond.async_wait(sl, y);
     sl.unlock();
-    co_return;
+    return;
   }
 
   status->cond.notify(sl);
@@ -987,7 +994,7 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
   auto be = bes->head();
   // Failure on push isn't fatal.
   try {
-    co_await be->push(dpp, index, now, change.key, std::move(bl));
+    be->push(dpp, index, now, change.key, std::move(bl), y);
   } catch (const std::exception& e) {
     ldpp_dout(dpp, 5) << "RGWDataChangesLog::add_entry(): Backend push failed "
                      << "with exception: " << e.what() << dendl;
@@ -1005,7 +1012,7 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
   status->cond.notify(sl);
   sl.unlock();
 
-  co_return;
+  return;
 }
 
 int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
@@ -1015,19 +1022,19 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
 {
   std::exception_ptr eptr;
   if (y) {
-    auto& yield = y.get_yield_context();
     try {
-      asio::co_spawn(yield.get_executor(),
-                    add_entry(dpp, bucket_info, gen, shard_id),
-                    yield);
+      add_entry(dpp, bucket_info, gen, shard_id, y.get_yield_context());
     } catch (const std::exception&) {
       eptr = std::current_exception();
     }
   } else {
     maybe_warn_about_blocking(dpp);
-    eptr = asio::co_spawn(rados->get_executor(),
-                         add_entry(dpp, bucket_info, gen, shard_id),
-                         async::use_blocked);
+    eptr = asio::spawn(rados->get_executor(),
+                      [this, dpp, &bucket_info, &gen,
+                       &shard_id](asio::yield_context y) {
+                        add_entry(dpp, bucket_info, gen, shard_id, y);
+                      },
+                      async::use_blocked);
   }
   return ceph::from_exception(eptr);
 }
index f63a890c54b8ce9176bebcc0337f6d8332d98409..43e9ca446ab03ca05beeb6f05dffb74867bdd6d2 100644 (file)
@@ -415,8 +415,9 @@ class RGWDataChangesLog {
   std::function<bool(const rgw_bucket& bucket, optional_yield y,
                      const DoutPrefixProvider *dpp)> bucket_filter;
   bool going_down() const;
-  asio::awaitable<bool> filter_bucket(const DoutPrefixProvider* dpp,
-                                     const rgw_bucket& bucket) const;
+  bool filter_bucket(const DoutPrefixProvider* dpp,
+                    const rgw_bucket& bucket,
+                    asio::yield_context y) const;
   asio::awaitable<void> renew_entries(const DoutPrefixProvider *dpp);
 
   uint64_t watchcookie = 0;
@@ -451,6 +452,10 @@ public:
                                  const RGWBucketInfo& bucket_info,
                                  const rgw::bucket_log_layout_generation& gen,
                                  int shard_id);
+  void add_entry(const DoutPrefixProvider *dpp,
+                const RGWBucketInfo& bucket_info,
+                const rgw::bucket_log_layout_generation& gen,
+                int shard_id, asio::yield_context y);
   int add_entry(const DoutPrefixProvider *dpp,
                const RGWBucketInfo& bucket_info,
                const rgw::bucket_log_layout_generation& gen,
@@ -540,7 +545,7 @@ protected:
   }
 public:
   using entries = std::variant<std::vector<cls::log::entry>,
-                              std::vector<ceph::buffer::list>>;
+                              std::deque<ceph::buffer::list>>;
 
   const uint64_t gen_id;
 
@@ -555,10 +560,11 @@ public:
                       ceph::buffer::list&& entry, entries& out) = 0;
   virtual asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
                                     entries&& items) = 0;
-  virtual asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
-                                    ceph::real_time now,
-                                    const std::string& key,
-                                    ceph::buffer::list&& bl) = 0;
+  virtual void push(const DoutPrefixProvider *dpp, int index,
+                   ceph::real_time now,
+                   const std::string& key,
+                   ceph::buffer::list&& bl,
+                   asio::yield_context y) = 0;
   virtual asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
                          std::string>>
   list(const DoutPrefixProvider* dpp, int shard,
index 999aa0d98d7c2eb35b010021df165263ed8f03a5..7d5ffee150e91ae5a5f32268415858ef32c8abef 100644 (file)
@@ -9,6 +9,7 @@
 #include <string_view>
 
 #include <boost/asio/awaitable.hpp>
+#include <boost/asio/spawn.hpp>
 #include <boost/asio/strand.hpp>
 #include <boost/asio/use_awaitable.hpp>
 
@@ -277,16 +278,41 @@ class LazyFIFO {
     co_return;
   }
 
+  void lazy_init(const DoutPrefixProvider *dpp, asio::yield_context y) {
+    std::unique_lock l(m);
+    if (fifo) {
+      return;
+    } else {
+      l.unlock();
+      // FIFO supports multiple clients by design, so it's safe to
+      // race to create them.
+      auto fifo_tmp = fifo::FIFO::create(dpp, r, oid, loc, y);
+      l.lock();
+      if (!fifo) {
+       // We won the race
+       fifo = std::move(fifo_tmp);
+      }
+    }
+    l.unlock();
+    return;
+  }
+
 public:
 
   LazyFIFO(neorados::RADOS& r,  std::string oid, neorados::IOContext loc)
     : r(r), oid(std::move(oid)), loc(std::move(loc)) {}
 
-  template <typename... Args>
-  asio::awaitable<void> push(const DoutPrefixProvider *dpp, Args&& ...args) {
+  asio::awaitable<void> push(const DoutPrefixProvider *dpp,
+                            std::deque<ceph::buffer::list> entries) {
     co_await lazy_init(dpp);
-    co_return co_await fifo->push(dpp, std::forward<Args>(args)...,
-                                 asio::use_awaitable);
+    co_return co_await fifo->push(dpp, std::move(entries), asio::use_awaitable);
+  }
+
+  void push(const DoutPrefixProvider *dpp,
+                            ceph::buffer::list entry,
+                            asio::yield_context y) {
+    lazy_init(dpp, y);
+    fifo->push(dpp, std::move(entry), y);
   }
 
   asio::awaitable<std::tuple<std::span<fifo::entry>, std::optional<std::string>>>