From d1f31bcb3da2d049ed01cccd6a448f2662d480ba Mon Sep 17 00:00:00 2001
From: Casey Bodley
Date: Thu, 5 Jun 2025 15:53:31 -0400
Subject: [PATCH] rgw/lc: replace WorkPool with ceph::async::spawn_throttle

use spawn_throttle to spawn the work functions as coroutines, instead of
passing WorkItems to separate WorkQ threads for processing.

the spawn_throttle concurrency limit uses the same rgw_lc_max_wp_worker
value that previously controlled the number of WorkQ threads

Signed-off-by: Casey Bodley
---
 src/common/options/rgw.yaml.in |  10 --
 src/rgw/rgw_lc.cc              | 261 ++++++++-------------------------
 src/rgw/rgw_lc.h               |   9 +-
 3 files changed, 66 insertions(+), 214 deletions(-)

diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in
index 2fda1086d0e..309e7639297 100644
--- a/src/common/options/rgw.yaml.in
+++ b/src/common/options/rgw.yaml.in
@@ -450,16 +450,6 @@ options:
   services:
   - rgw
   with_legacy: true
-- name: rgw_lc_wp_worker_max_aio
-  type: int
-  level: advanced
-  desc: Max number of concurrent lifecycle handlings per workpool thread.
-  default: 1
-  services:
-  - rgw
-  min: 1
-  max: 128
-  with_legacy: true
 - name: rgw_lc_max_objs
   type: int
   level: advanced
diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc
index e9f4fca9f07..4c37f40e97b 100644
--- a/src/rgw/rgw_lc.cc
+++ b/src/rgw/rgw_lc.cc
@@ -12,12 +12,11 @@
 #include
 #include
 #include
-#include
-#include
 #include "include/scope_guard.h"
 #include "include/function2.hpp"
 #include "common/Clock.h" // for ceph_clock_now()
+#include "common/async/spawn_throttle.h"
 #include "common/Formatter.h"
 #include "common/containers.h"
 #include "common/split.h"
@@ -25,6 +24,7 @@
 #include "include/random.h"
 #include "cls/lock/cls_lock_client.h"
 #include "rgw_perf_counters.h"
+#include "rgw_asio_thread.h"
 #include "rgw_common.h"
 #include "rgw_bucket.h"
 #include "rgw_bucket_layout.h"
@@ -486,7 +486,6 @@
 }; /* op_env */
 
 class LCRuleOp;
-class WorkQ;
 
 struct lc_op_ctx {
   CephContext *cct;
@@ -761,173 +760,10 @@ public:
               optional_yield y);
 }; /* LCOpRule */
 
-using WorkItem =
-  std::variant<void*,
-               std::tuple<LCOpRule, rgw_bucket_dir_entry>,
-               /* uncompleted MPU expiration */
-               std::tuple<lc_op, rgw_bucket_dir_entry>,
-               rgw_bucket_dir_entry>;
-
-class WorkQ : public Thread
-{
-public:
-  using unique_lock = std::unique_lock<std::mutex>;
-  using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&, optional_yield)>;
-  using dequeue_result = std::list<WorkItem>;
-
-  static constexpr uint32_t FLAG_NONE =        0x0000;
-  static constexpr uint32_t FLAG_EWAIT_SYNC =  0x0001;
-  static constexpr uint32_t FLAG_DWAIT_SYNC =  0x0002;
-  static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
-
-private:
-  const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi, optional_yield) {};
-  RGWLC::LCWorker* wk;
-  uint32_t qmax;
-  int ix;
-  std::mutex mtx;
-  std::condition_variable cv;
-  uint32_t flags;
-  std::list<WorkItem> items;
-  work_f f;
-
-public:
-  WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
-    : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
-  {
-    create(thr_name().c_str());
-  }
-
-  std::string thr_name() {
-    return std::string{"wp_thrd: "}
-      + std::to_string(wk->ix) + ", " + std::to_string(ix);
-  }
-
-  void setf(work_f _f) {
-    f = _f;
-  }
-
-  void enqueue(WorkItem&& item) {
-    unique_lock uniq(mtx);
-    while ((!wk->get_lc()->going_down()) &&
-           (items.size() > qmax)) {
-      flags |= FLAG_EWAIT_SYNC;
-      cv.wait_for(uniq, 200ms);
-    }
-    items.push_back(item);
-    if (flags & FLAG_DWAIT_SYNC) {
-      flags &= ~FLAG_DWAIT_SYNC;
-      cv.notify_one();
-    }
-  }
-
-  void drain() {
-    unique_lock uniq(mtx);
-    flags |= FLAG_EDRAIN_SYNC;
-    while (flags & FLAG_EDRAIN_SYNC) {
-      cv.wait_for(uniq, 200ms);
-    }
-  }
-
-private:
-  dequeue_result dequeue(size_t max_items=1) {
-    unique_lock uniq(mtx);
-    while ((!wk->get_lc()->going_down()) &&
-           (items.size() == 0)) {
-      /* clear drain state, as we are NOT doing work and qlen==0 */
-      if (flags & FLAG_EDRAIN_SYNC) {
-        flags &= ~FLAG_EDRAIN_SYNC;
-      }
-      flags |= FLAG_DWAIT_SYNC;
-      cv.wait_for(uniq, 200ms);
-    }
-    if (items.size() > 0) {
-      size_t split_size = std::min(max_items, items.size());
-      dequeue_result result;
-      result.splice(result.begin(), items, items.begin(),
-                    std::next(items.begin(), split_size));
-      if (flags & FLAG_EWAIT_SYNC) {
-        flags &= ~FLAG_EWAIT_SYNC;
-        cv.notify_one();
-      }
-      return result;
-    }
-    return dequeue_result{};
-  }
-
-  void* entry() override {
-    while (!wk->get_lc()->going_down()) {
-      boost::asio::io_context context;
-      for(auto& item : items) {
-        if(item.index() != 0) {
-          boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
-            try {
-              optional_yield y(yield);
-              f(wk, item, y);
-            } catch (const std::exception& e) {
-              ldpp_dout(wk->dpp, 0) << "Coroutine error: " << e.what() << dendl;
-            }
-          });
-        }
-      }
-      try {
-        context.run();
-      } catch (const std::system_error& e) {
-        ldpp_dout(wk->dpp, 0) << "ERROR: WorkQ context run returned error r="
-                              << -e.code().value() << dendl;
-      }
-    }
-    return nullptr;
-  }
-}; /* WorkQ */
-
-class RGWLC::WorkPool
-{
-  using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
-  TVector wqs;
-  uint64_t ix;
-
-public:
-  WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
-    : wqs(TVector{
-        n_threads,
-        [&](const size_t ix, auto emplacer) {
-          emplacer.emplace(wk, ix, qmax);
-        }}),
-      ix(0)
-    {}
-
-  ~WorkPool() {
-    for (auto& wq : wqs) {
-      wq.join();
-    }
-  }
-
-  void setf(WorkQ::work_f _f) {
-    for (auto& wq : wqs) {
-      wq.setf(_f);
-    }
-  }
-
-  void enqueue(WorkItem item) {
-    const auto tix = ix;
-    ix = (ix+1) % wqs.size();
-    (wqs[tix]).enqueue(std::move(item));
-  }
-
-  void drain() {
-    for (auto& wq : wqs) {
-      wq.drain();
-    }
-  }
-}; /* WorkPool */
-
 RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
                           RGWLC *lc, int ix)
   : dpp(dpp), cct(cct), lc(lc), ix(ix)
 {
-  auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
-  workpool = new WorkPool(this, wpw, 512);
 }
 
 static inline bool worker_should_stop(time_t stop_at, bool once)
@@ -937,6 +773,7 @@
 
 int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
                                        const multimap<string, lc_op>& prefix_map,
+                                       ceph::async::spawn_throttle& workpool,
                                        LCWorker* worker, time_t stop_at, bool once)
 {
   int ret;
@@ -956,10 +793,9 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
   params.ns = RGW_OBJ_NS_MULTIPART;
   params.access_list_filter = MultipartMetaFilter;
 
-  auto pf = [&](RGWLC::LCWorker *wk, WorkItem &wi, optional_yield y) {
+  auto pf = [this, target] (optional_yield y, const lc_op& rule,
+                            const rgw_bucket_dir_entry& obj) {
     int ret{0};
-    auto wt = std::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
-    auto& [rule, obj] = wt;
 
     if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
       rgw_obj_key key(obj.key);
@@ -987,21 +823,17 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
         }
       } else {
         if (ret == -ERR_NO_SUCH_UPLOAD) {
-          ldpp_dout(wk->get_lc(), 5) << "ERROR: abort_multipart_upload "
-            "failed, ret=" << ret
-            << ", meta:" << obj.key << dendl;
+          ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret="
+            << ret << ", meta:" << obj.key << dendl;
         } else {
-          ldpp_dout(wk->get_lc(), 0) << "ERROR: abort_multipart_upload "
-            "failed, ret=" << ret
-            << ", meta:" << obj.key << dendl;
+          ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret="
+            << ret << ", meta:" << obj.key << dendl;
         }
       } /* abort failed */
     } /* expired */
     return ret;
   };
 
-  worker->workpool->setf(pf);
-
   for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
        ++prefix_iter) {
@@ -1028,9 +860,10 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
     }
     for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end();
          ++obj_iter, ++offset) {
-      std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
-        {prefix_iter->second, *obj_iter};
-      worker->workpool->enqueue(WorkItem{t1});
+      workpool.spawn([pf, op=prefix_iter->second, obj=*obj_iter]
+                     (boost::asio::yield_context yield) mutable {
+                       pf(yield, op, obj);
+                     });
       if (going_down()) {
         return 0;
       }
@@ -1049,7 +882,6 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
       }
     } while(results.is_truncated);
   } /* for prefix_map */
 
-  worker->workpool->drain();
   return 0;
 } /* RGWLC::handle_multipart_expiration */
 
@@ -1351,7 +1183,7 @@ public:
 };
 
 class LCOpAction_Transition : public LCOpAction {
-  const transition_action& transition;
+  transition_action transition;
   bool need_to_process{false};
 
 protected:
@@ -1747,7 +1579,8 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
 }
 
 int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
-                             time_t stop_at, bool once)
+                             time_t stop_at, bool once,
+                             boost::asio::yield_context yield)
 {
   RGWLifecycleConfiguration config(cct);
   std::unique_ptr<rgw::sal::Bucket> bucket;
@@ -1765,17 +1598,20 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
   }
 
   int ret = driver->load_bucket(this, rgw_bucket(bucket_tenant, bucket_name),
-                                &bucket, null_yield);
+                                &bucket, yield);
   if (ret < 0) {
     ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
                        << " failed" << dendl;
     return ret;
   }
 
+  // use a limited number of coroutines for concurrent processing
+  size_t limit = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
+  auto workpool = ceph::async::spawn_throttle{yield, limit};
   auto stack_guard = make_scope_guard(
-    [&worker]
+    [&workpool]
       {
-        worker->workpool->drain();
+        workpool.wait();
       }
     );
@@ -1808,22 +1644,18 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
   /* fetch information for zone checks */
   rgw::sal::Zone* zone = driver->get_zone();
 
-  auto pf = [&bucket_name](RGWLC::LCWorker* wk, WorkItem& wi, optional_yield y) {
-    auto wt =
-      std::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
-    auto& [op_rule, o] = wt;
-
-    ldpp_dout(wk->get_lc(), 20)
+  auto pf = [&bucket_name](const DoutPrefixProvider* dpp, optional_yield y,
+                           LCOpRule& op_rule, rgw_bucket_dir_entry& o) {
+    ldpp_dout(dpp, 20)
       << __func__ << "(): key=" << o.key << dendl;
-    int ret = op_rule.process(o, wk->dpp, y);
+    int ret = op_rule.process(o, dpp, y);
     if (ret < 0) {
-      ldpp_dout(wk->get_lc(), 20)
+      ldpp_dout(dpp, 20)
        << "ERROR: orule.process() returned ret=" << ret
        << " bucket=" << bucket_name << dendl;
     }
   };
-  worker->workpool->setf(pf);
 
   multimap<string, lc_op>& prefix_map = config.get_prefix_map();
   ldpp_dout(this, 10) << __func__ <<  "() prefix_map size="
@@ -1879,8 +1711,10 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
     rgw_bucket_dir_entry* o{nullptr};
     for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) {
       orule.update();
-      std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
-      worker->workpool->enqueue(WorkItem{t1});
+      workpool.spawn([&pf, dpp=this, orule, o=*o]
+                     (boost::asio::yield_context yield) mutable {
+                       pf(dpp, yield, orule, o);
+                     });
       if ((offset % 100) == 0) {
        if (worker_should_stop(stop_at, once)) {
          ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker="
@@ -1890,10 +1724,38 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
         }
       }
     }
-    worker->workpool->drain();
   }
 
-  ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
+  ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,
+                                    worker, stop_at, once);
+  return ret;
+}
+
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
+                             time_t stop_at, bool once)
+{
+  int ret = 0;
+
+  // spawn a coroutine for bucket_lc_process() so it can use spawn_throttle
+  // for concurrent operations
+  boost::asio::io_context context;
+  boost::asio::spawn(context,
+      [this, &shard_id, worker, stop_at, once] (boost::asio::yield_context yield) {
+        return bucket_lc_process(shard_id, worker, stop_at, once, yield);
+      },
+      [&ret] (std::exception_ptr eptr, int result) {
+        if (eptr) {
+          std::rethrow_exception(eptr);
+        } else {
+          ret = result;
+        }
+      });
+
+  // warn about any blocking operations called from this coroutine
+  auto enable_warnings = warn_about_blocking_in_scope{};
+
+  context.run();
+
   return ret;
 }
 
@@ -2665,7 +2527,6 @@ int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
 
 RGWLC::LCWorker::~LCWorker()
 {
-  delete workpool;
 } /* ~LCWorker */
 
 list RGWLifecycleConfiguration::generate_test_instances()
diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h
index 533ef871594..1b9def49214 100644
--- a/src/rgw/rgw_lc.h
+++ b/src/rgw/rgw_lc.h
@@ -562,6 +562,8 @@ public:
 };
 WRITE_CLASS_ENCODER(RGWLifecycleConfiguration)
 
+namespace ceph::async { class spawn_throttle; }
+
 class RGWLC : public DoutPrefixProvider {
   CephContext *cct;
   rgw::sal::Driver* driver;
@@ -574,8 +576,6 @@ class RGWLC : public DoutPrefixProvider {
 
 public:
 
-  class WorkPool;
-
   class LCWorker : public Thread
   {
     const DoutPrefixProvider *dpp;
@@ -584,7 +584,6 @@ public:
     int ix;
     std::mutex lock;
    std::condition_variable cond;
-    WorkPool* workpool{nullptr};
    /* save the target bucket names created as part of object transition
     * to cloud. This list is maintained for the duration of each RGWLC::process()
     * post which it is discarded. */
@@ -612,7 +611,6 @@ public:
 
    friend class RGWRados;
    friend class RGWLC;
-    friend class WorkQ;
  }; /* LCWorker */
 
  friend class RGWRados;
@@ -647,6 +645,8 @@ public:
  int list_lc_progress(std::string& marker, uint32_t max_entries,
                       std::vector&, int& index);
+  int bucket_lc_process(std::string& shard_id, LCWorker* worker, time_t stop_at,
+                        bool once, boost::asio::yield_context yield);
  int bucket_lc_process(std::string& shard_id, LCWorker* worker, time_t stop_at,
                        bool once);
  int bucket_lc_post(int index, int max_lock_sec,
@@ -673,6 +673,7 @@ public:
 
  int handle_multipart_expiration(rgw::sal::Bucket* target,
                                  const std::multimap<std::string, lc_op>& prefix_map,
+                                  ceph::async::spawn_throttle& workpool,
                                  LCWorker* worker, time_t stop_at, bool once);
 };
 
-- 
2.39.5
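
For reviewers unfamiliar with ceph::async::spawn_throttle, the shape of the
change is: bucket_lc_process() now runs as a stackful coroutine on a private
boost::asio::io_context, builds a spawn_throttle from its yield_context and
the rgw_lc_max_wp_worker limit, spawns each per-object work function as a
child coroutine, and waits for all children before returning. A minimal
sketch of that pattern under the same assumptions as the patch (Ceph build
tree for the spawn_throttle header; process_entry() and max_concurrency are
illustrative placeholders, not names from this patch):

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include "common/async/spawn_throttle.h"

// illustrative stand-in for the per-object work functions (pf) above
void process_entry(boost::asio::yield_context yield, int item)
{
  // lifecycle I/O would suspend on `yield` here instead of blocking a
  // dedicated WorkQ thread
}

int run_shard(int num_items, size_t max_concurrency)
{
  int ret = 0;
  boost::asio::io_context context;

  // parent coroutine, analogous to the new bucket_lc_process() wrapper
  boost::asio::spawn(context,
      [&] (boost::asio::yield_context yield) {
        // the throttle caps how many child coroutines run at once, playing
        // the role that rgw_lc_max_wp_worker plays in the patch
        auto throttle = ceph::async::spawn_throttle{yield, max_concurrency};
        for (int i = 0; i < num_items; ++i) {
          throttle.spawn([i] (boost::asio::yield_context y) {
              process_entry(y, i);
            });
        }
        throttle.wait(); // suspend the parent until all children complete
        return 0;
      },
      [&ret] (std::exception_ptr eptr, int result) {
        if (eptr) {
          std::rethrow_exception(eptr);
        } else {
          ret = result;
        }
      });

  context.run(); // drive the parent and all children on the calling thread
  return ret;
}

The single io_context is run on the calling thread, so the concurrency is
cooperative: children interleave at suspension points rather than across the
pool of OS threads the old WorkQ design used.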