From 2f0e4f796a60fb9cc07f17dc4a706675ece85956 Mon Sep 17 00:00:00 2001 From: Adam Emerson Date: Wed, 15 May 2024 22:42:38 -0400 Subject: [PATCH] rgw/multisite/datalog: Make add_entry a stackful coroutine Since, outside of testing, it's only called from stackful coroutines, for now. Signed-off-by: Adam Emerson --- src/rgw/driver/rados/rgw_datalog.cc | 93 ++++++++++++++------------ src/rgw/driver/rados/rgw_datalog.h | 20 ++++-- src/rgw/driver/rados/rgw_log_backing.h | 34 ++++++++-- 3 files changed, 93 insertions(+), 54 deletions(-) diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 45ca67f021c..603afa30bad 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -163,14 +163,13 @@ public: asio::use_awaitable); co_return; } - asio::awaitable push(const DoutPrefixProvider *dpp, int index, - ceph::real_time now, const std::string& key, - buffer::list&& bl) override { - co_await r.execute( - oids[index], loc, - neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))), - asio::use_awaitable); - co_return; + void push(const DoutPrefixProvider *dpp, int index, + ceph::real_time now, const std::string& key, + buffer::list&& bl, asio::yield_context y) override { + r.execute(oids[index], loc, + neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))), + y); + return; } asio::awaitable, @@ -270,7 +269,7 @@ public: }; class RGWDataChangesFIFO final : public RGWDataChangesBE { - using centries = std::vector; + using centries = std::deque; tiny_vector fifos; public: @@ -296,10 +295,10 @@ public: entries&& items) override { co_return co_await fifos[index].push(dpp, std::get(items)); } - asio::awaitable push(const DoutPrefixProvider* dpp, int index, - ceph::real_time, const std::string&, - buffer::list&& bl) override { - co_return co_await fifos[index].push(dpp, std::move(bl)); + void push(const DoutPrefixProvider* dpp, int index, + ceph::real_time, const std::string&, + buffer::list&& bl, asio::yield_context y) override { + fifos[index].push(dpp, std::move(bl), y); } asio::awaitable, std::string>> @@ -854,20 +853,15 @@ int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) { return choose_oid(bs); } -asio::awaitable -RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp, - const rgw_bucket& bucket) const +bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp, + const rgw_bucket& bucket, + asio::yield_context y) const { if (!bucket_filter) { - co_return true; + return true; } - co_return co_await asio::spawn( - co_await asio::this_coro::executor, - [this, dpp, &bucket](asio::yield_context yc) { - optional_yield y(yc); - return bucket_filter(bucket, y, dpp); - }, asio::use_awaitable); + return bucket_filter(bucket, y, dpp); } std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const { @@ -885,15 +879,29 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& gen, int shard_id) +{ + co_await asio::spawn( + co_await asio::this_coro::executor, + [this, dpp, &bucket_info, &gen, shard_id](asio::yield_context y) { + return add_entry(dpp, bucket_info, gen, shard_id, y); + }, asio::use_awaitable); + co_return; +} + + +void RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, + int shard_id, asio::yield_context y) { if (!log_data) { - co_return; + return; } auto& bucket = bucket_info.bucket; - if (!co_await filter_bucket(dpp, bucket)) { - co_return; + if (!filter_bucket(dpp, bucket, y)) { + return; } if (observer) { @@ -921,8 +929,8 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, auto be = bes->head(); // Failure on push is fatal if we're bypassing semaphores. - co_await be->push(dpp, index, now, change.key, std::move(bl)); - co_return; + be->push(dpp, index, now, change.key, std::move(bl), y); + return; } mark_modified(index, bs, gen.gen); @@ -950,17 +958,16 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, if (need_sem_set) { using neorados::WriteOp; using neorados::cls::sem_set::increment; - co_await rados->execute(get_sem_set_oid(index), loc, - WriteOp{}.exec(increment(std::move(key))), - asio::use_awaitable); + rados->execute(get_sem_set_oid(index), loc, + WriteOp{}.exec(increment(std::move(key))), y); } - co_return; + return; } if (status->pending) { - co_await status->cond.async_wait(sl, asio::use_awaitable); + status->cond.async_wait(sl, y); sl.unlock(); - co_return; + return; } status->cond.notify(sl); @@ -987,7 +994,7 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, auto be = bes->head(); // Failure on push isn't fatal. try { - co_await be->push(dpp, index, now, change.key, std::move(bl)); + be->push(dpp, index, now, change.key, std::move(bl), y); } catch (const std::exception& e) { ldpp_dout(dpp, 5) << "RGWDataChangesLog::add_entry(): Backend push failed " << "with exception: " << e.what() << dendl; @@ -1005,7 +1012,7 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, status->cond.notify(sl); sl.unlock(); - co_return; + return; } int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, @@ -1015,19 +1022,19 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, { std::exception_ptr eptr; if (y) { - auto& yield = y.get_yield_context(); try { - asio::co_spawn(yield.get_executor(), - add_entry(dpp, bucket_info, gen, shard_id), - yield); + add_entry(dpp, bucket_info, gen, shard_id, y.get_yield_context()); } catch (const std::exception&) { eptr = std::current_exception(); } } else { maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(rados->get_executor(), - add_entry(dpp, bucket_info, gen, shard_id), - async::use_blocked); + eptr = asio::spawn(rados->get_executor(), + [this, dpp, &bucket_info, &gen, + &shard_id](asio::yield_context y) { + add_entry(dpp, bucket_info, gen, shard_id, y); + }, + async::use_blocked); } return ceph::from_exception(eptr); } diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index f63a890c54b..43e9ca446ab 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -415,8 +415,9 @@ class RGWDataChangesLog { std::function bucket_filter; bool going_down() const; - asio::awaitable filter_bucket(const DoutPrefixProvider* dpp, - const rgw_bucket& bucket) const; + bool filter_bucket(const DoutPrefixProvider* dpp, + const rgw_bucket& bucket, + asio::yield_context y) const; asio::awaitable renew_entries(const DoutPrefixProvider *dpp); uint64_t watchcookie = 0; @@ -451,6 +452,10 @@ public: const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& gen, int shard_id); + void add_entry(const DoutPrefixProvider *dpp, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, + int shard_id, asio::yield_context y); int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& gen, @@ -540,7 +545,7 @@ protected: } public: using entries = std::variant, - std::vector>; + std::deque>; const uint64_t gen_id; @@ -555,10 +560,11 @@ public: ceph::buffer::list&& entry, entries& out) = 0; virtual asio::awaitable push(const DoutPrefixProvider *dpp, int index, entries&& items) = 0; - virtual asio::awaitable push(const DoutPrefixProvider *dpp, int index, - ceph::real_time now, - const std::string& key, - ceph::buffer::list&& bl) = 0; + virtual void push(const DoutPrefixProvider *dpp, int index, + ceph::real_time now, + const std::string& key, + ceph::buffer::list&& bl, + asio::yield_context y) = 0; virtual asio::awaitable, std::string>> list(const DoutPrefixProvider* dpp, int shard, diff --git a/src/rgw/driver/rados/rgw_log_backing.h b/src/rgw/driver/rados/rgw_log_backing.h index 999aa0d98d7..7d5ffee150e 100644 --- a/src/rgw/driver/rados/rgw_log_backing.h +++ b/src/rgw/driver/rados/rgw_log_backing.h @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -277,16 +278,41 @@ class LazyFIFO { co_return; } + void lazy_init(const DoutPrefixProvider *dpp, asio::yield_context y) { + std::unique_lock l(m); + if (fifo) { + return; + } else { + l.unlock(); + // FIFO supports multiple clients by design, so it's safe to + // race to create them. + auto fifo_tmp = fifo::FIFO::create(dpp, r, oid, loc, y); + l.lock(); + if (!fifo) { + // We won the race + fifo = std::move(fifo_tmp); + } + } + l.unlock(); + return; + } + public: LazyFIFO(neorados::RADOS& r, std::string oid, neorados::IOContext loc) : r(r), oid(std::move(oid)), loc(std::move(loc)) {} - template - asio::awaitable push(const DoutPrefixProvider *dpp, Args&& ...args) { + asio::awaitable push(const DoutPrefixProvider *dpp, + std::deque entries) { co_await lazy_init(dpp); - co_return co_await fifo->push(dpp, std::forward(args)..., - asio::use_awaitable); + co_return co_await fifo->push(dpp, std::move(entries), asio::use_awaitable); + } + + void push(const DoutPrefixProvider *dpp, + ceph::buffer::list entry, + asio::yield_context y) { + lazy_init(dpp, y); + fifo->push(dpp, std::move(entry), y); } asio::awaitable, std::optional>> -- 2.39.5