]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/lc: replace WorkPool with ceph::async::spawn_throttle
authorCasey Bodley <cbodley@redhat.com>
Thu, 5 Jun 2025 19:53:31 +0000 (15:53 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 21 Oct 2025 13:08:04 +0000 (09:08 -0400)
use spawn_throttle to spawn the work functions as coroutines instead of
passing WorkItems to separate WorkQ threads for processing. the
spawn_throttle concurrency limit uses the same rgw_lc_max_wp_worker
value that previously controlled the number of WorkQ threads

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/common/options/rgw.yaml.in
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index 2fda1086d0e468dc87313e5ef5886e721f951f95..309e7639297b9a8862add1de3f3d81c0e0b6177d 100644 (file)
@@ -450,16 +450,6 @@ options:
   services:
   - rgw
   with_legacy: true
-- name: rgw_lc_wp_worker_max_aio
-  type: int
-  level: advanced
-  desc: Max number of concurrent lifecycle handlings per workpool thread.
-  default: 1
-  services:
-  - rgw
-  min: 1
-  max: 128
-  with_legacy: true
 - name: rgw_lc_max_objs
   type: int
   level: advanced
index e9f4fca9f0765bc3a7e2cbba069a687ffd742327..4c37f40e97b336aafae9f0a152dcd7c8f418beaa 100644 (file)
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/predicate.hpp>
-#include <boost/asio/spawn.hpp>
-#include <boost/variant.hpp>
 
 #include "include/scope_guard.h"
 #include "include/function2.hpp"
 #include "common/Clock.h" // for ceph_clock_now()
+#include "common/async/spawn_throttle.h"
 #include "common/Formatter.h"
 #include "common/containers.h"
 #include "common/split.h"
@@ -25,6 +24,7 @@
 #include "include/random.h"
 #include "cls/lock/cls_lock_client.h"
 #include "rgw_perf_counters.h"
+#include "rgw_asio_thread.h"
 #include "rgw_common.h"
 #include "rgw_bucket.h"
 #include "rgw_bucket_layout.h"
@@ -486,7 +486,6 @@ struct op_env {
 }; /* op_env */
 
 class LCRuleOp;
-class WorkQ;
 
 struct lc_op_ctx {
   CephContext *cct;
@@ -761,173 +760,10 @@ public:
              optional_yield y);
 }; /* LCOpRule */
 
-using WorkItem =
-  std::variant<void*,
-              /* out-of-line delete */
-              std::tuple<LCOpRule, rgw_bucket_dir_entry>,
-              /* uncompleted MPU expiration */
-              std::tuple<lc_op, rgw_bucket_dir_entry>,
-              rgw_bucket_dir_entry>;
-
-class WorkQ : public Thread
-{
-public:
-  using unique_lock = std::unique_lock<std::mutex>;
-  using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&, optional_yield)>;
-  using dequeue_result = std::list<WorkItem>;
-
-  static constexpr uint32_t FLAG_NONE =        0x0000;
-  static constexpr uint32_t FLAG_EWAIT_SYNC =  0x0001;
-  static constexpr uint32_t FLAG_DWAIT_SYNC =  0x0002;
-  static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
-
-private:
-  const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi, optional_yield) {};
-  RGWLC::LCWorker* wk;
-  uint32_t qmax;
-  int ix;
-  std::mutex mtx;
-  std::condition_variable cv;
-  uint32_t flags;
-  std::list<WorkItem> items;
-  work_f f;
-
-public:
-  WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
-    : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
-    {
-      create(thr_name().c_str());
-    }
-
-  std::string thr_name() {
-    return std::string{"wp_thrd: "}
-    + std::to_string(wk->ix) + ", " + std::to_string(ix);
-  }
-
-  void setf(work_f _f) {
-    f = _f;
-  }
-
-  void enqueue(WorkItem&& item) {
-    unique_lock uniq(mtx);
-    while ((!wk->get_lc()->going_down()) &&
-          (items.size() > qmax)) {
-      flags |= FLAG_EWAIT_SYNC;
-      cv.wait_for(uniq, 200ms);
-    }
-    items.push_back(item);
-    if (flags & FLAG_DWAIT_SYNC) {
-      flags &= ~FLAG_DWAIT_SYNC;
-      cv.notify_one();
-    }
-  }
-
-  void drain() {
-    unique_lock uniq(mtx);
-    flags |= FLAG_EDRAIN_SYNC;
-    while (flags & FLAG_EDRAIN_SYNC) {
-      cv.wait_for(uniq, 200ms);
-    }
-  }
-
-private:
-  dequeue_result dequeue(size_t max_items=1) {
-    unique_lock uniq(mtx);
-    while ((!wk->get_lc()->going_down()) &&
-          (items.size() == 0)) {
-      /* clear drain state, as we are NOT doing work and qlen==0 */
-      if (flags & FLAG_EDRAIN_SYNC) {
-       flags &= ~FLAG_EDRAIN_SYNC;
-      }
-      flags |= FLAG_DWAIT_SYNC;
-      cv.wait_for(uniq, 200ms);
-    }
-    if (items.size() > 0) {
-      size_t split_size = std::min(max_items, items.size());
-      dequeue_result result;
-      result.splice(result.begin(), items, items.begin(), std::next(items.begin(), split_size));
-      if (flags & FLAG_EWAIT_SYNC) {
-       flags &= ~FLAG_EWAIT_SYNC;
-       cv.notify_one();
-      }
-      return result;
-    }
-    return dequeue_result{};
-  }
-
-  void* entry() override {
-    while (!wk->get_lc()->going_down()) {
-      boost::asio::io_context context;
-      for(auto& item : items) {
-        if(item.index() != 0) {
-          boost::asio::spawn(context, [&](boost::asio::yield_context yield) {
-            try {
-              optional_yield y(yield);
-              f(wk, item, y);
-            } catch (const std::exception& e) {
-              ldpp_dout(wk->dpp, 0) << "Coroutine error: " << e.what() << dendl;
-            }
-          });
-        }
-      }
-      try {
-        context.run();
-      } catch (const std::system_error& e) {
-        ldpp_dout(wk->dpp, 0) << "ERROR: WorkQ context run returned error r="
-                              << -e.code().value() << dendl;
-      }
-    }
-    return nullptr;
-  }
-}; /* WorkQ */
-
-class RGWLC::WorkPool
-{
-  using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
-  TVector wqs;
-  uint64_t ix;
-
-public:
-  WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
-    : wqs(TVector{
-       n_threads,
-       [&](const size_t ix, auto emplacer) {
-         emplacer.emplace(wk, ix, qmax);
-       }}),
-      ix(0)
-    {}
-
-  ~WorkPool() {
-    for (auto& wq : wqs) {
-      wq.join();
-    }
-  }
-
-  void setf(WorkQ::work_f _f) {
-    for (auto& wq : wqs) {
-      wq.setf(_f);
-    }
-  }
-
-  void enqueue(WorkItem item) {
-    const auto tix = ix;
-    ix = (ix+1) % wqs.size();
-    (wqs[tix]).enqueue(std::move(item));
-  }
-
-  void drain() {
-    for (auto& wq : wqs) {
-      wq.drain();
-    }
-  }
-}; /* WorkPool */
-
 RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
                          RGWLC *lc, int ix)
   : dpp(dpp), cct(cct), lc(lc), ix(ix)
 {
-  auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
-  workpool = new WorkPool(this, wpw, 512);
 }
 
 static inline bool worker_should_stop(time_t stop_at, bool once)
@@ -937,6 +773,7 @@ static inline bool worker_should_stop(time_t stop_at, bool once)
 
 int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
                                       const multimap<string, lc_op>& prefix_map,
+                                      ceph::async::spawn_throttle& workpool,
                                       LCWorker* worker, time_t stop_at, bool once)
 {
   int ret;
@@ -956,10 +793,9 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
   params.ns = RGW_OBJ_NS_MULTIPART;
   params.access_list_filter = MultipartMetaFilter;
 
-  auto pf = [&](RGWLC::LCWorker *wk, WorkItem &wi, optional_yield y) {
+  auto pf = [this, target] (optional_yield y, const lc_op& rule,
+                            const rgw_bucket_dir_entry& obj) {
     int ret{0};
-    auto wt = std::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
-    auto& [rule, obj] = wt;
 
     if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
       rgw_obj_key key(obj.key);
@@ -987,21 +823,17 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
         }
       } else {
         if (ret == -ERR_NO_SUCH_UPLOAD) {
-          ldpp_dout(wk->get_lc(), 5) << "ERROR: abort_multipart_upload "
-                                        "failed, ret=" << ret
-                                     << ", meta:" << obj.key << dendl;
+          ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret="
+              << ret << ", meta:" << obj.key << dendl;
         } else {
-          ldpp_dout(wk->get_lc(), 0) << "ERROR: abort_multipart_upload "
-                                        "failed, ret=" << ret
-                                     << ", meta:" << obj.key << dendl;
+          ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret="
+              << ret << ", meta:" << obj.key << dendl;
         }
       } /* abort failed */
     }   /* expired */
                return ret;
   };
 
-  worker->workpool->setf(pf);
-
   for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
        ++prefix_iter) {
 
@@ -1028,9 +860,10 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
       }
 
       for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end(); ++obj_iter, ++offset) {
-       std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
-         {prefix_iter->second, *obj_iter};
-       worker->workpool->enqueue(WorkItem{t1});
+        workpool.spawn([pf, op=prefix_iter->second, obj=*obj_iter]
+                       (boost::asio::yield_context yield) mutable {
+            pf(yield, op, obj);
+          });
        if (going_down()) {
          return 0;
        }
@@ -1049,7 +882,6 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
     } while(results.is_truncated);
   } /* for prefix_map */
 
-  worker->workpool->drain();
   return 0;
 } /* RGWLC::handle_multipart_expiration */
 
@@ -1351,7 +1183,7 @@ public:
 };
 
 class LCOpAction_Transition : public LCOpAction {
-  const transition_action& transition;
+  transition_action transition;
   bool need_to_process{false};
 
 protected:
@@ -1747,7 +1579,8 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
 }
 
 int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
-                            time_t stop_at, bool once)
+                            time_t stop_at, bool once,
+                            boost::asio::yield_context yield)
 {
   RGWLifecycleConfiguration  config(cct);
   std::unique_ptr<rgw::sal::Bucket> bucket;
@@ -1765,17 +1598,20 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
   }
 
   int ret = driver->load_bucket(this, rgw_bucket(bucket_tenant, bucket_name),
-                                &bucket, null_yield);
+                                &bucket, yield);
   if (ret < 0) {
     ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
                       << " failed" << dendl;
     return ret;
   }
 
+  // use a limited number of coroutines for concurrent processing
+  size_t limit = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
+  auto workpool = ceph::async::spawn_throttle{yield, limit};
   auto stack_guard = make_scope_guard(
-    [&worker]
+    [&workpool]
       {
-       worker->workpool->drain();
+       workpool.wait();
       }
     );
 
@@ -1808,22 +1644,18 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
   /* fetch information for zone checks */
   rgw::sal::Zone* zone = driver->get_zone();
 
-  auto pf = [&bucket_name](RGWLC::LCWorker* wk, WorkItem& wi, optional_yield y) {
-    auto wt =
-      std::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
-    auto& [op_rule, o] = wt;
-
-    ldpp_dout(wk->get_lc(), 20)
+  auto pf = [&bucket_name](const DoutPrefixProvider* dpp, optional_yield y,
+                           LCOpRule& op_rule, rgw_bucket_dir_entry& o) {
+    ldpp_dout(dpp, 20)
       << __func__ << "(): key=" << o.key << dendl;
-    int ret = op_rule.process(o, wk->dpp, y);
+    int ret = op_rule.process(o, dpp, y);
     if (ret < 0) {
-      ldpp_dout(wk->get_lc(), 20)
+      ldpp_dout(dpp, 20)
        << "ERROR: orule.process() returned ret=" << ret
        << " bucket=" << bucket_name
        << dendl;
     }
   };
-  worker->workpool->setf(pf);
 
   multimap<string, lc_op>& prefix_map = config.get_prefix_map();
   ldpp_dout(this, 10) << __func__ <<  "() prefix_map size="
@@ -1879,8 +1711,10 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
     rgw_bucket_dir_entry* o{nullptr};
     for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) {
       orule.update();
-      std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
-      worker->workpool->enqueue(WorkItem{t1});
+      workpool.spawn([&pf, dpp=this, orule, o=*o]
+                     (boost::asio::yield_context yield) mutable {
+          pf(dpp, yield, orule, o);
+        });
       if ((offset % 100) == 0) {
        if (worker_should_stop(stop_at, once)) {
          ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker="
@@ -1890,10 +1724,38 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
        }
       }
     }
-    worker->workpool->drain();
   }
 
-  ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
+  ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,
+                                    worker, stop_at, once);
+  return ret;
+}
+
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
+                            time_t stop_at, bool once)
+{
+  int ret = 0;
+
+  // spawn a coroutine for bucket_lc_process() so it can use spawn_throttle
+  // for concurrent operations
+  boost::asio::io_context context;
+  boost::asio::spawn(context,
+      [this, &shard_id, worker, stop_at, once] (boost::asio::yield_context yield) {
+        return bucket_lc_process(shard_id, worker, stop_at, once, yield);
+      },
+      [&ret] (std::exception_ptr eptr, int result) {
+        if (eptr) {
+          std::rethrow_exception(eptr);
+        } else {
+          ret = result;
+        }
+      });
+
+  // warn about any blocking operations called from this coroutine
+  auto enable_warnings = warn_about_blocking_in_scope{};
+
+  context.run();
+
   return ret;
 }
 
@@ -2665,7 +2527,6 @@ int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
 
 RGWLC::LCWorker::~LCWorker()
 {
-  delete workpool;
 } /* ~LCWorker */
 
 list<RGWLifecycleConfiguration> RGWLifecycleConfiguration::generate_test_instances()
index 533ef871594cda6f5f1cfeedbc4bebd35ec896a1..1b9def492144175654b9ff9ea4b696930bdd731a 100644 (file)
@@ -562,6 +562,8 @@ public:
 };
 WRITE_CLASS_ENCODER(RGWLifecycleConfiguration)
 
+namespace ceph::async { class spawn_throttle; }
+
 class RGWLC : public DoutPrefixProvider {
   CephContext *cct;
   rgw::sal::Driver* driver;
@@ -574,8 +576,6 @@ class RGWLC : public DoutPrefixProvider {
 
 public:
 
-  class WorkPool;
-
   class LCWorker : public Thread
   {
     const DoutPrefixProvider *dpp;
@@ -584,7 +584,6 @@ public:
     int ix;
     std::mutex lock;
     std::condition_variable cond;
-    WorkPool* workpool{nullptr};
     /* save the target bucket names created as part of object transition
      * to cloud. This list is maintained for the duration of each RGWLC::process()
      * post which it is discarded. */
@@ -612,7 +611,6 @@ public:
 
     friend class RGWRados;
     friend class RGWLC;
-    friend class WorkQ;
   }; /* LCWorker */
 
   friend class RGWRados;
@@ -647,6 +645,8 @@ public:
   int list_lc_progress(std::string& marker, uint32_t max_entries,
                       std::vector<rgw::sal::LCEntry>&,
                       int& index);
+  int bucket_lc_process(std::string& shard_id, LCWorker* worker, time_t stop_at,
+                       bool once, boost::asio::yield_context yield);
   int bucket_lc_process(std::string& shard_id, LCWorker* worker, time_t stop_at,
                        bool once);
   int bucket_lc_post(int index, int max_lock_sec,
@@ -673,6 +673,7 @@ public:
 
   int handle_multipart_expiration(rgw::sal::Bucket* target,
                                  const std::multimap<std::string, lc_op>& prefix_map,
+                                 ceph::async::spawn_throttle& workpool,
                                  LCWorker* worker, time_t stop_at, bool once);
 };