]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/lc: pass optional_yield to LCObjsLister
authorCasey Bodley <cbodley@redhat.com>
Tue, 12 Aug 2025 15:16:03 +0000 (11:16 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 21 Oct 2025 13:08:04 +0000 (09:08 -0400)
avoid blocking operations in LCObjsLister under bucket_lc_process()
because it's serviced by a single thread

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index 4c37f40e97b336aafae9f0a152dcd7c8f418beaa..7e38e8be04bea48e54c2bf8fea095eed7e191f99 100644 (file)
@@ -394,14 +394,14 @@ public:
     list_params.prefix = prefix;
   }
 
-  int init(const DoutPrefixProvider *dpp) {
-    return fetch(dpp);
+  int init(const DoutPrefixProvider *dpp, optional_yield y) {
+    return fetch(dpp, y);
   }
 
-  int fetch(const DoutPrefixProvider *dpp) {
+  int fetch(const DoutPrefixProvider *dpp, optional_yield y) {
     CephContext* cct = dpp->get_cct();
     int cnt = cct->_conf.get_val<uint64_t>("rgw_lc_list_cnt");
-    int ret = bucket->list(dpp, list_params, cnt, list_results, null_yield);
+    int ret = bucket->list(dpp, list_params, cnt, list_results, y);
     if (ret < 0) {
       return ret;
     }
@@ -411,28 +411,32 @@ public:
     return 0;
   }
 
-  void delay() {
-    std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+  void delay(const DoutPrefixProvider* dpp) {
+    if (delay_ms) {
+      maybe_warn_about_blocking(dpp);
+      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+    }
   }
 
-  bool get_obj(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry **obj,
+  bool get_obj(const DoutPrefixProvider *dpp, optional_yield y,
+              rgw_bucket_dir_entry **obj,
               std::function<void(void)> fetch_barrier
               = []() { /* nada */}) {
     if (obj_iter == list_results.objs.end()) {
       if (!list_results.is_truncated) {
-        delay();
+        delay(dpp);
         return false;
       } else {
        fetch_barrier();
         list_params.marker = pre_obj.key;
-        int ret = fetch(dpp);
+        int ret = fetch(dpp, y);
         if (ret < 0) {
           ldpp_dout(dpp, 0) << "ERROR: list_op returned ret=" << ret
                                 << dendl;
           return false;
         }
       }
-      delay();
+      delay(dpp);
     }
 
     if (obj_iter->key.name == pre_obj.key.name) {
@@ -774,12 +778,12 @@ static inline bool worker_should_stop(time_t stop_at, bool once)
 int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
                                       const multimap<string, lc_op>& prefix_map,
                                       ceph::async::spawn_throttle& workpool,
+                                      boost::asio::yield_context yield,
                                       LCWorker* worker, time_t stop_at, bool once)
 {
   int ret;
   rgw::sal::Bucket::ListParams params;
   rgw::sal::Bucket::ListResults results;
-  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
   params.list_versions = false;
   /* lifecycle processing does not depend on total order, so can
    * take advantage of unordered listing optimizations--such as
@@ -851,7 +855,7 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
     do {
       auto offset = 0;
       results.objs.clear();
-      ret = target->list(this, params, 1000, results, null_yield);
+      ret = target->list(this, params, 1000, results, yield);
       if (ret < 0) {
           if (ret == (-ENOENT))
             return 0;
@@ -877,8 +881,6 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
          return 0;
        }
       }
-
-      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
     } while(results.is_truncated);
   } /* for prefix_map */
 
@@ -1697,7 +1699,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
       continue;
     }
 
-    ret = ol.init(this);
+    ret = ol.init(this, yield);
     if (ret < 0) {
       if (ret == (-ENOENT))
         return 0;
@@ -1709,7 +1711,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
     LCOpRule orule(oenv);
     orule.build(); // why can't ctor do it?
     rgw_bucket_dir_entry* o{nullptr};
-    for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) {
+    for (auto offset = 0; ol.get_obj(this, yield, &o /* , fetch_barrier */); ++offset, ol.next()) {
       orule.update();
       workpool.spawn([&pf, dpp=this, orule, o=*o]
                      (boost::asio::yield_context yield) mutable {
@@ -1727,7 +1729,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
   }
 
   ret = handle_multipart_expiration(bucket.get(), prefix_map, workpool,
-                                    worker, stop_at, once);
+                                    yield, worker, stop_at, once);
   return ret;
 }
 
index 1b9def492144175654b9ff9ea4b696930bdd731a..ba8d6b46a4f7f8ef93201fb84490bbae44ebc5f9 100644 (file)
@@ -674,6 +674,7 @@ public:
   int handle_multipart_expiration(rgw::sal::Bucket* target,
                                  const std::multimap<std::string, lc_op>& prefix_map,
                                  ceph::async::spawn_throttle& workpool,
+                                 boost::asio::yield_context yield,
                                  LCWorker* worker, time_t stop_at, bool once);
 };