From 0fa436b3360936ea51459e68f8c0d5b31d7382f2 Mon Sep 17 00:00:00 2001 From: Karthik Keshavamurthy Date: Mon, 2 Jun 2025 14:48:49 -0400 Subject: [PATCH] supports lc aio using coroutines Signed-off-by: mheler --- src/common/options/rgw.yaml.in | 10 ++++ src/rgw/rgw_lc.cc | 93 ++++++++++++++++++++-------------- 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 309e7639297..2fda1086d0e 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -450,6 +450,16 @@ options: services: - rgw with_legacy: true +- name: rgw_lc_wp_worker_max_aio + type: int + level: advanced + desc: Max number of concurrent lifecycle handlings per workpool thread. + default: 1 + services: + - rgw + min: 1 + max: 128 + with_legacy: true - name: rgw_lc_max_objs type: int level: advanced diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index cd915c4ee80..4397c5b5502 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include #include "include/scope_guard.h" @@ -303,13 +304,13 @@ static bool obj_has_expired( return (timediff >= cmp); } -static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp) +static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y) { if (!obj->get_bucket()->get_info().obj_lock_enabled()) { return true; } std::unique_ptr read_op = obj->get_read_op(); - int ret = read_op->prepare(null_yield, dpp); + int ret = read_op->prepare(y, dpp); if (ret < 0) { if (ret == -ENOENT) { return true; @@ -697,7 +698,7 @@ class LCOpAction { public: virtual ~LCOpAction() {} - virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) { + virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) { return false; } @@ -716,7 +717,7 @@ public: return true; } - virtual int process(lc_op_ctx& oc) { + virtual int process(lc_op_ctx& oc, optional_yield y) { return 0; } @@ -726,7 +727,7 @@ public: class LCOpFilter { public: virtual ~LCOpFilter() {} - virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) { + virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y) { return false; } }; /* LCOpFilter */ @@ -756,7 +757,7 @@ public: void build(); void update(); int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, - WorkQ* wq); + WorkQ* wq, optional_yield y); }; /* LCOpRule */ using WorkItem = @@ -771,8 +772,8 @@ class WorkQ : public Thread { public: using unique_lock = std::unique_lock; - using work_f = std::function; - using dequeue_result = std::variant; + using work_f = std::function; + using dequeue_result = std::list; static constexpr uint32_t FLAG_NONE = 0x0000; static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001; @@ -780,14 +781,14 @@ public: static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004; private: - const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {}; + const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi, optional_yield) {}; RGWLC::LCWorker* wk; uint32_t qmax; int ix; std::mutex mtx; std::condition_variable cv; uint32_t flags; - vector items; + std::list items; work_f f; public: @@ -829,7 +830,7 @@ public: } private: - dequeue_result dequeue() { + dequeue_result dequeue(size_t max_items=1) { unique_lock uniq(mtx); while ((!wk->get_lc()->going_down()) && (items.size() == 0)) { @@ -841,25 +842,39 @@ private: cv.wait_for(uniq, 200ms); } if (items.size() > 0) { - auto item = items.back(); - items.pop_back(); + size_t split_size = std::min(max_items, items.size()); + dequeue_result result; + result.splice(result.begin(), items, items.begin(), std::next(items.begin(), split_size)); if (flags & FLAG_EWAIT_SYNC) { flags &= ~FLAG_EWAIT_SYNC; cv.notify_one(); } - return {item}; + return result; } - return nullptr; + return dequeue_result{}; } void* entry() override { while (!wk->get_lc()->going_down()) { - auto item = dequeue(); - if (item.index() == 0) { - /* going down */ - break; + boost::asio::io_context context; + for(auto& item : items) { + if(item.index() != 0) { + boost::asio::spawn(context, [&](boost::asio::yield_context yield) { + try { + optional_yield y(yield); + f(wk, this, item, y); + } catch (const std::exception& e) { + ldpp_dout(wk->dpp, 0) << "Coroutine error: " << e.what() << dendl; + } + }); + } + } + try { + context.run(); + } catch (const std::system_error& e) { + ldpp_dout(wk->dpp, 0) << "ERROR: WorkQ context run returned error r=" + << -e.code().value() << dendl; } - f(wk, this, std::get(item)); } return nullptr; } @@ -1039,11 +1054,11 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, return 0; } /* RGWLC::handle_multipart_expiration */ -static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl) +static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl, optional_yield y) { std::unique_ptr rop = obj->get_read_op(); - return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield); + return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, y); } static bool is_valid_op(const lc_op& op) @@ -1089,7 +1104,7 @@ static inline bool has_all_tags(const lc_op& rule_action, return tag_count == rule_action.obj_tags->count(); } -static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip) +static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip, optional_yield y) { auto& op = oc.op; @@ -1097,7 +1112,7 @@ static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip) *skip = true; bufferlist tags_bl; - int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl); + int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl, y); if (ret < 0) { if (ret != -ENODATA) { ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r=" @@ -1129,7 +1144,7 @@ static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip) class LCOpFilter_Tags : public LCOpFilter { public: - bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override { + bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y) override { auto& o = oc.o; if (o.is_delete_marker()) { @@ -1138,7 +1153,7 @@ public: bool skip; - int ret = check_tags(dpp, oc, &skip); + int ret = check_tags(dpp, oc, &skip, y); if (ret < 0) { if (ret == -ENOENT) { return false; @@ -1157,7 +1172,7 @@ class LCOpAction_CurrentExpiration : public LCOpAction { public: LCOpAction_CurrentExpiration(op_env& env) {} - bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override { auto& o = oc.o; if (!o.is_current()) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key @@ -1252,7 +1267,7 @@ public: LCOpAction_NonCurrentExpiration(op_env& env) {} - bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override { auto& o = oc.o; if (o.is_current()) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key @@ -1305,7 +1320,7 @@ class LCOpAction_DMExpiration : public LCOpAction { public: LCOpAction_DMExpiration(op_env& env) {} - bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override { auto& o = oc.o; if (!o.is_delete_marker()) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key @@ -1360,7 +1375,7 @@ public: LCOpAction_Transition(const transition_action& _transition) : transition(_transition) {} - bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override { auto& o = oc.o; if (o.is_delete_marker()) { @@ -1405,7 +1420,7 @@ public: return need_to_process; } - int delete_tier_obj(lc_op_ctx& oc) { + int delete_tier_obj(lc_op_ctx& oc, optional_yield y) { int ret = 0; /* If bucket has versioning enabled, create delete_marker for current version @@ -1479,7 +1494,7 @@ public: } if (delete_object) { - ret = delete_tier_obj(oc); + ret = delete_tier_obj(oc, y); if (ret < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl; return ret; @@ -1554,13 +1569,13 @@ public: if (!r && oc.tier->is_tier_type_s3()) { ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl; if (!oc.o.is_current() && - !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp)) { + !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp, y)) { /* Skip objects which has object lock enabled. */ ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl; return 0; } - r = transition_obj_to_cloud(oc); + r = transition_obj_to_cloud(oc, y); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")" << dendl; @@ -1692,7 +1707,7 @@ void LCOpRule::update() int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, - WorkQ* wq) + WorkQ* wq, optional_yield y) { lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, wq); shared_ptr *selected = nullptr; // n.b., req'd by sharing @@ -1701,7 +1716,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, for (auto& a : actions) { real_time action_exp; - if (a->check(ctx, &action_exp, dpp)) { + if (a->check(ctx, &action_exp, dpp, y)) { if (action_exp > exp) { exp = action_exp; selected = &a; @@ -1723,7 +1738,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, bool cont = false; for (auto& f : filters) { - if (f->check(dpp, ctx)) { + if (f->check(dpp, ctx, y)) { cont = true; break; } @@ -1736,7 +1751,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, return 0; } - int r = (*selected)->process(ctx); + int r = (*selected)->process(ctx, y); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " << env.bucket << ":" << o.key @@ -1822,7 +1837,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, ldpp_dout(wk->get_lc(), 20) << __func__ << "(): key=" << o.key << wq->thr_name() << dendl; - int ret = op_rule.process(o, wk->dpp, wq); + int ret = op_rule.process(o, wk->dpp, wq, y); if (ret < 0) { ldpp_dout(wk->get_lc(), 20) << "ERROR: orule.process() returned ret=" << ret -- 2.39.5