]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
supports lc aio using coroutines
authorKarthik Keshavamurthy <kkeshava@akamai.com>
Mon, 2 Jun 2025 18:48:49 +0000 (14:48 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 21 Oct 2025 13:08:04 +0000 (09:08 -0400)
Signed-off-by: mheler <mheler@akamai.com>
src/common/options/rgw.yaml.in
src/rgw/rgw_lc.cc

index 309e7639297b9a8862add1de3f3d81c0e0b6177d..2fda1086d0e468dc87313e5ef5886e721f951f95 100644 (file)
@@ -450,6 +450,16 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_lc_wp_worker_max_aio
+  type: int
+  level: advanced
+  desc: Max number of concurrent lifecycle handlings per workpool thread.
+  default: 1
+  services:
+  - rgw
+  min: 1
+  max: 128
+  with_legacy: true
 - name: rgw_lc_max_objs
   type: int
   level: advanced
index cd915c4ee80a453d4aa7a5897ad318d53b9adee4..4397c5b550262bc5b0709fd632a1ebf811cf6df1 100644 (file)
@@ -12,6 +12,7 @@
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/predicate.hpp>
+#include <boost/asio/spawn.hpp>
 #include <boost/variant.hpp>
 
 #include "include/scope_guard.h"
@@ -303,13 +304,13 @@ static bool obj_has_expired(
   return (timediff >= cmp);
 }
 
-static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp)
+static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y)
 {
   if (!obj->get_bucket()->get_info().obj_lock_enabled()) {
     return true;
   }
   std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op();
-  int ret = read_op->prepare(null_yield, dpp);
+  int ret = read_op->prepare(y, dpp);
   if (ret < 0) {
     if (ret == -ENOENT) {
       return true;
@@ -697,7 +698,7 @@ class LCOpAction {
 public:
   virtual ~LCOpAction() {}
 
-  virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) {
+  virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) {
     return false;
   }
 
@@ -716,7 +717,7 @@ public:
     return true;
   }
 
-  virtual int process(lc_op_ctx& oc) {
+  virtual int process(lc_op_ctx& oc, optional_yield y) {
     return 0;
   }
 
@@ -726,7 +727,7 @@ public:
 class LCOpFilter {
 public:
 virtual ~LCOpFilter() {}
-  virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) {
+  virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y) {
     return false;
   }
 }; /* LCOpFilter */
@@ -756,7 +757,7 @@ public:
   void build();
   void update();
   int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
-             WorkQ* wq);
+             WorkQ* wq, optional_yield y);
 }; /* LCOpRule */
 
 using WorkItem =
@@ -771,8 +772,8 @@ class WorkQ : public Thread
 {
 public:
   using unique_lock = std::unique_lock<std::mutex>;
-  using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
-  using dequeue_result = std::variant<void*, WorkItem>;
+  using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&, optional_yield)>;
+  using dequeue_result = std::list<WorkItem>;
 
   static constexpr uint32_t FLAG_NONE =        0x0000;
   static constexpr uint32_t FLAG_EWAIT_SYNC =  0x0001;
@@ -780,14 +781,14 @@ public:
   static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
 
 private:
-  const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
+  const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi, optional_yield) {};
   RGWLC::LCWorker* wk;
   uint32_t qmax;
   int ix;
   std::mutex mtx;
   std::condition_variable cv;
   uint32_t flags;
-  vector<WorkItem> items;
+  std::list<WorkItem> items;
   work_f f;
 
 public:
@@ -829,7 +830,7 @@ public:
   }
 
 private:
-  dequeue_result dequeue() {
+  dequeue_result dequeue(size_t max_items=1) {
     unique_lock uniq(mtx);
     while ((!wk->get_lc()->going_down()) &&
           (items.size() == 0)) {
@@ -841,25 +842,39 @@ private:
       cv.wait_for(uniq, 200ms);
     }
     if (items.size() > 0) {
-      auto item = items.back();
-      items.pop_back();
+      size_t split_size = std::min(max_items, items.size());
+      dequeue_result result;
+      result.splice(result.begin(), items, items.begin(), std::next(items.begin(), split_size));
       if (flags & FLAG_EWAIT_SYNC) {
        flags &= ~FLAG_EWAIT_SYNC;
        cv.notify_one();
       }
-      return {item};
+      return result;
     }
-    return nullptr;
+    return dequeue_result{};
   }
 
   void* entry() override {
     while (!wk->get_lc()->going_down()) {
-      auto item = dequeue();
-      if (item.index() == 0) {
-       /* going down */
-       break;
+      boost::asio::io_context context;
+      for(auto& item : items) {
+        if(item.index() != 0) {
+          boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
+            try {
+              optional_yield y(yield);
+              f(wk, this, item, y);
+            } catch (const std::exception& e) {
+              ldpp_dout(wk->dpp, 0) << "Coroutine error: " << e.what() << dendl;
+            }
+          });
+        }
+      }
+      try {
+        context.run();
+      } catch (const std::system_error& e) {
+        ldpp_dout(wk->dpp, 0) << "ERROR: WorkQ context run returned error r="
+                              << -e.code().value() << dendl;
       }
-      f(wk, this, std::get<WorkItem>(item));
     }
     return nullptr;
   }
@@ -1039,11 +1054,11 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
   return 0;
 } /* RGWLC::handle_multipart_expiration */
 
-static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl)
+static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl, optional_yield y)
 {
   std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op();
 
-  return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield);
+  return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, y);
 }
 
 static bool is_valid_op(const lc_op& op)
@@ -1089,7 +1104,7 @@ static inline bool has_all_tags(const lc_op& rule_action,
   return tag_count == rule_action.obj_tags->count();
 }
 
-static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
+static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip, optional_yield y)
 {
   auto& op = oc.op;
 
@@ -1097,7 +1112,7 @@ static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
     *skip = true;
 
     bufferlist tags_bl;
-    int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl);
+    int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl, y);
     if (ret < 0) {
       if (ret != -ENODATA) {
         ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r="
@@ -1129,7 +1144,7 @@ static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
 
 class LCOpFilter_Tags : public LCOpFilter {
 public:
-  bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override {
+  bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y) override {
     auto& o = oc.o;
 
     if (o.is_delete_marker()) {
@@ -1138,7 +1153,7 @@ public:
 
     bool skip;
 
-    int ret = check_tags(dpp, oc, &skip);
+    int ret = check_tags(dpp, oc, &skip, y);
     if (ret < 0) {
       if (ret == -ENOENT) {
         return false;
@@ -1157,7 +1172,7 @@ class LCOpAction_CurrentExpiration : public LCOpAction {
 public:
   LCOpAction_CurrentExpiration(op_env& env) {}
 
-  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
     auto& o = oc.o;
     if (!o.is_current()) {
       ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
@@ -1252,7 +1267,7 @@ public:
   LCOpAction_NonCurrentExpiration(op_env& env)
     {}
 
-  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
     auto& o = oc.o;
     if (o.is_current()) {
       ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
@@ -1305,7 +1320,7 @@ class LCOpAction_DMExpiration : public LCOpAction {
 public:
   LCOpAction_DMExpiration(op_env& env) {}
 
-  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
     auto& o = oc.o;
     if (!o.is_delete_marker()) {
       ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
@@ -1360,7 +1375,7 @@ public:
   LCOpAction_Transition(const transition_action& _transition)
     : transition(_transition) {}
 
-  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
+  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y) override {
     auto& o = oc.o;
 
     if (o.is_delete_marker()) {
@@ -1405,7 +1420,7 @@ public:
     return need_to_process;
   }
 
-  int delete_tier_obj(lc_op_ctx& oc) {
+  int delete_tier_obj(lc_op_ctx& oc, optional_yield y) {
     int ret = 0;
 
     /* If bucket has versioning enabled, create delete_marker for current version
@@ -1479,7 +1494,7 @@ public:
     }
 
     if (delete_object) {
-      ret = delete_tier_obj(oc);
+      ret = delete_tier_obj(oc, y);
       if (ret < 0) {
         ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
         return ret;
@@ -1554,13 +1569,13 @@ public:
     if (!r && oc.tier->is_tier_type_s3()) {
       ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
       if (!oc.o.is_current() &&
-          !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp)) {
+          !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp, y)) {
         /* Skip objects which has object lock enabled. */
         ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
         return 0;
       }
 
-      r = transition_obj_to_cloud(oc);
+      r = transition_obj_to_cloud(oc, y);
       if (r < 0) {
         ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")"
                              << dendl;
@@ -1692,7 +1707,7 @@ void LCOpRule::update()
 
 int LCOpRule::process(rgw_bucket_dir_entry& o,
                      const DoutPrefixProvider *dpp,
-                     WorkQ* wq)
+                     WorkQ* wq, optional_yield y)
 {
   lc_op_ctx ctx(env, o, next_key_name, num_noncurrent, effective_mtime, dpp, wq);
   shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
@@ -1701,7 +1716,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
   for (auto& a : actions) {
     real_time action_exp;
 
-    if (a->check(ctx, &action_exp, dpp)) {
+    if (a->check(ctx, &action_exp, dpp, y)) {
       if (action_exp > exp) {
         exp = action_exp;
         selected = &a;
@@ -1723,7 +1738,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
 
     bool cont = false;
     for (auto& f : filters) {
-      if (f->check(dpp, ctx)) {
+      if (f->check(dpp, ctx, y)) {
         cont = true;
         break;
       }
@@ -1736,7 +1751,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
       return 0;
     }
 
-    int r = (*selected)->process(ctx);
+    int r = (*selected)->process(ctx, y);
     if (r < 0) {
       ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " 
                        << env.bucket << ":" << o.key
@@ -1822,7 +1837,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
     ldpp_dout(wk->get_lc(), 20)
       << __func__ << "(): key=" << o.key << wq->thr_name() 
       << dendl;
-    int ret = op_rule.process(o, wk->dpp, wq);
+    int ret = op_rule.process(o, wk->dpp, wq, y);
     if (ret < 0) {
       ldpp_dout(wk->get_lc(), 20)
        << "ERROR: orule.process() returned ret=" << ret