rgwlc: permit per-bucket thread parallelism
author    Matt Benjamin <mbenjamin@redhat.com>
          Wed, 18 Mar 2020 01:47:17 +0000 (21:47 -0400)
committer Matt Benjamin <mbenjamin@redhat.com>
          Tue, 21 Apr 2020 17:39:09 +0000 (13:39 -0400)
Introduces type-safe work queues to process lifecycle actions
in parallel.  Currently optimized for copy avoidance, which limits
fetch overlap--because rgw_bucket_dir_entry is b.i.g.

Oh, heck.  Open it up.

Define bsf and f in the right order.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
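The core pattern here is a tagged-union work item (boost::variant) that the
per-bucket pool threads unpack and dispatch on. A minimal standalone sketch of
that dispatch style--the types below (lc_rule, process) are illustrative
stand-ins, not the RGW ones; the real WorkItem/WorkQ are in rgw_lc.cc below:

    #include <boost/variant.hpp>
    #include <iostream>
    #include <string>
    #include <tuple>

    struct lc_rule { std::string id; };  /* stand-in for lc_op/LCOpRule */

    /* index 0 (void*) doubles as a shutdown sentinel, mirroring the
     * convention WorkQ::dequeue() uses below */
    using WorkItem = boost::variant<void*, std::tuple<lc_rule, std::string>>;

    static void process(WorkItem& wi) {
      if (wi.which() == 0) {
        return;  /* sentinel: going down, nothing to do */
      }
      auto& [rule, key] = boost::get<std::tuple<lc_rule, std::string>>(wi);
      std::cout << "applying rule " << rule.id << " to " << key << std::endl;
    }

    int main() {
      WorkItem wi{std::make_tuple(lc_rule{"expire-30d"}, std::string{"obj1"})};
      process(wi);
      return 0;
    }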
src/common/legacy_config_opts.h
src/common/options.cc
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index b344dd5b8d42e809ca296c89cef3298010bc0563..847991b3cff02209f31f201c2aa7ef6dd1c02af2 100644 (file)
@@ -1290,6 +1290,7 @@ OPTION(rgw_content_length_compat, OPT_BOOL) // Check both HTTP_CONTENT_LENGTH an
 OPTION(rgw_lifecycle_work_time, OPT_STR) // lc processing window, e.g., 00:00-06:00
 OPTION(rgw_lc_lock_max_time, OPT_INT)  // max lock time for a single lc processor run
 OPTION(rgw_lc_max_worker, OPT_INT)  // number of (parallelized) LCWorker threads
+OPTION(rgw_lc_max_wp_worker, OPT_INT)  // number of per-LCWorker pool threads
 OPTION(rgw_lc_max_objs, OPT_INT)
 OPTION(rgw_lc_max_rules, OPT_U32)  // Max rules set on one bucket
 OPTION(rgw_lc_debug_interval, OPT_INT)  // Debug run interval, in seconds
index b586d495c5b5a2869322d7fdd1df2946a9f5a05a..016a5c32638d7c6e8ea6ce00c6a36dc0e513740c 100644 (file)
@@ -5548,7 +5548,7 @@ std::vector<Option> get_rgw_options() {
     .set_long_description("Local time window in which the lifecycle maintenance thread can work."),
 
     Option("rgw_lc_lock_max_time", Option::TYPE_INT, Option::LEVEL_DEV)
-    .set_default(60)
+    .set_default(90)
     .set_description(""),
 
     Option("rgw_lc_thread_delay", Option::TYPE_INT, Option::LEVEL_ADVANCED)
@@ -5559,16 +5559,23 @@ std::vector<Option> get_rgw_options() {
     .set_default(3)
     .set_description("Number of LCWorker tasks that will be run in parallel")
     .set_long_description(
-      "Number of LCWorker tasks that will be run in parallel--used to permit >1 bucket/index shards "
-      "to be processed simultaneously"),
+      "Number of LCWorker tasks that will run in parallel--used to permit >1 "
+      "bucket/index shards to be processed simultaneously"),
+
+    Option("rgw_lc_max_wp_worker", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+    .set_default(3)
+    .set_description("Number of workpool threads per LCWorker")
+    .set_long_description(
+      "Number of threads in per-LCWorker workpools--used to accelerate "
+      "per-bucket processing"),
 
     Option("rgw_lc_max_objs", Option::TYPE_INT, Option::LEVEL_ADVANCED)
     .set_default(32)
     .set_description("Number of lifecycle data shards")
     .set_long_description(
-          "Number of RADOS objects to use for storing lifecycle index. This can affect "
-          "concurrency of lifecycle maintenance, but requires multiple RGW processes "
-          "running on the zone to be utilized."),
+          "Number of RADOS objects to use for storing lifecycle index. This "
+         "affects concurrency of lifecycle maintenance, as shards can be "
+          "processed in parallel."),
 
     Option("rgw_lc_max_rules", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(1000)
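The two knobs compose: with the defaults above (rgw_lc_max_worker=3,
rgw_lc_max_wp_worker=3), one RGW instance can apply lifecycle actions on up to
9 threads. A hypothetical ceph.conf excerpt raising both--section name and
values are illustrative only:

    [client.rgw.gateway1]
        rgw_lc_max_worker = 5
        rgw_lc_max_wp_worker = 8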
index aba753c891fe79613286b71364f498869777a551..9a3def0a10822db2f13f44cf231b524dd4aba9cb 100644 (file)
@@ -5,12 +5,16 @@
 #include <iostream>
 #include <map>
 #include <algorithm>
+#include <tuple>
+#include <functional>
 
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/predicate.hpp>
+#include <boost/variant.hpp>
 
 #include "common/Formatter.h"
+#include "common/containers.h"
 #include <common/errno.h>
 #include "include/random.h"
 #include "cls/rgw/cls_rgw_client.h"
@@ -203,10 +207,6 @@ bool RGWLifecycleConfiguration::valid()
   return true;
 }
 
-RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
-                         RGWLC *_lc)
-  : dpp(_dpp), cct(_cct), lc(_lc) {}
-
 void *RGWLC::LCWorker::entry() {
   do {
     utime_t start = ceph_clock_now();
@@ -391,106 +391,6 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
   }
 }
 
-int RGWLC::handle_multipart_expiration(
-  RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map)
-{
-  MultipartMetaFilter mp_filter;
-  vector<rgw_bucket_dir_entry> objs;
-  RGWMPObj mp_obj;
-  bool is_truncated;
-  int ret;
-  RGWBucketInfo& bucket_info = target->get_bucket_info();
-  RGWRados::Bucket::List list_op(target);
-  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
-  list_op.params.list_versions = false;
-  /* lifecycle processing does not depend on total order, so can
-   * take advantage of unordered listing optimizations--such as
-   * operating on one shard at a time */
-  list_op.params.allow_unordered = true;
-  list_op.params.ns = RGW_OBJ_NS_MULTIPART;
-  list_op.params.filter = &mp_filter;
-  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
-       ++prefix_iter) {
-    if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
-      continue;
-    }
-    list_op.params.prefix = prefix_iter->first;
-    do {
-      objs.clear();
-      list_op.params.marker = list_op.get_next_marker();
-      ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
-      if (ret < 0) {
-          if (ret == (-ENOENT))
-            return 0;
-          ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
-          return ret;
-      }
-
-      for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
-        if (obj_has_expired(cct, obj_iter->meta.mtime,
-                           prefix_iter->second.mp_expiration)) {
-          rgw_obj_key key(obj_iter->key);
-          if (!mp_obj.from_meta(key.name)) {
-            continue;
-          }
-          RGWObjectCtx rctx(store);
-         /* XXXX where is this defined? */
-          ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
-          if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
-            ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
-          } else if (ret == -ERR_NO_SUCH_UPLOAD) {
-            ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
-          }
-          if (going_down())
-            return 0;
-        }
-      } /* for objs */
-      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
-    } while(is_truncated);
-  }
-  return 0;
-}
-
-static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
-                        rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
-{
-  RGWRados::Object op_target(store, bucket_info, ctx, obj);
-  RGWRados::Object::Read read_op(&op_target);
-
-  return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
-}
-
-static bool is_valid_op(const lc_op& op)
-{
-      return (op.status &&
-              (op.expiration > 0 
-               || op.expiration_date != boost::none
-               || op.noncur_expiration > 0
-               || op.dm_expiration
-               || !op.transitions.empty()
-               || !op.noncur_transitions.empty()));
-}
-
-static inline bool has_all_tags(const lc_op& rule_action,
-                               const RGWObjTags& object_tags)
-{
-  if(! rule_action.obj_tags)
-    return false;
-  if(object_tags.count() < rule_action.obj_tags->count())
-    return false;
-  size_t tag_count = 0;
-  for (const auto& tag : object_tags.get_tags()) {
-    const auto& rule_tags = rule_action.obj_tags->get_tags();
-    const auto& iter = rule_tags.find(tag.first);
-    if(iter->second == tag.second)
-    {
-      tag_count++;
-    }
-  /* all tags in the rule appear in obj tags */
-  }
-  return tag_count == rule_action.obj_tags->count();
-}
-
 class LCObjsLister {
   rgw::sal::RGWRadosStore *store;
   RGWBucketInfo& bucket_info;
@@ -538,23 +438,27 @@ public:
     std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
   }
 
-  bool get_obj(rgw_bucket_dir_entry *obj) {
+  bool get_obj(rgw_bucket_dir_entry **obj,
+              std::function<void(void)> fetch_barrier
+              = []() { /* nada */}) {
     if (obj_iter == objs.end()) {
       if (!is_truncated) {
         delay();
         return false;
       } else {
+       fetch_barrier();
         list_op.params.marker = pre_obj.key;
-
         int ret = fetch();
         if (ret < 0) {
-          ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl;
+          ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret
+                                << dendl;
           return false;
         }
       }
       delay();
     }
-    *obj = *obj_iter;
+    /* returning address of entry in objs */
+    *obj = &(*obj_iter);
     return obj_iter != objs.end();
   }
 
@@ -576,8 +480,7 @@ public:
     }
     return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
   }
-};
-
+}; /* LCObjsLister */
 
 struct op_env {
 
@@ -650,7 +553,7 @@ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
   }
 
   return del_op.delete_obj(null_yield);
-}
+} /* remove_expired_obj */
 
 class LCOpAction {
 public:
@@ -678,7 +581,7 @@ public:
   virtual int process(lc_op_ctx& oc) {
     return 0;
   }
-};
+}; /* LCOpAction */
 
 class LCOpFilter {
 public:
@@ -686,7 +589,7 @@ virtual ~LCOpFilter() {}
   virtual bool check(lc_op_ctx& oc) {
     return false;
   }
-};
+}; /* LCOpFilter */
 
 class LCOpRule {
   friend class LCOpAction;
@@ -701,7 +604,251 @@ public:
 
   void build();
   int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp);
-};
+}; /* LCOpRule */
+
+using WorkItem =
+  boost::variant<void*,
+                /* out-of-line delete */
+                std::tuple<LCOpRule&, rgw_bucket_dir_entry>,
+                /* uncompleted MPU expiration */
+                std::tuple<const lc_op&, rgw_bucket_dir_entry>,
+                rgw_bucket_dir_entry>;
+
+class WorkQ : public Thread
+{
+public:
+  using unique_lock = std::unique_lock<std::mutex>;
+  using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&)>;
+  using dequeue_result = boost::variant<void*, WorkItem>;
+
+private:
+  const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi) {};
+  RGWLC::LCWorker* wk;
+  uint32_t qmax;
+  std::mutex mtx;
+  std::condition_variable cv;
+  vector<WorkItem> items;
+  work_f f;
+
+public:
+  WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
+    : wk(wk), qmax(qmax), f(bsf)
+    {
+      create((string{"workpool_thr_"} + to_string(ix)).c_str());
+    }
+
+  void setf(work_f _f) {
+    f = _f;
+  }
+
+  void enqueue(WorkItem&& item) {
+    unique_lock uniq(mtx);
+    while ((!wk->get_lc()->going_down()) &&
+          (items.size() > qmax)) {
+      cv.wait_for(uniq, 200ms);
+    }
+    items.push_back(std::move(item));
+  }
+
+  void drain() {
+    unique_lock uniq(mtx);
+    while ((!wk->get_lc()->going_down()) &&
+          (items.size() > 0)) {
+      cv.wait_for(uniq, 200ms);
+    }
+  }
+
+private:
+  dequeue_result dequeue() {
+    unique_lock uniq(mtx);
+    while ((!wk->get_lc()->going_down()) &&
+          (items.size() == 0)) {
+      cv.wait_for(uniq, 200ms);
+    }
+    if (items.size() > 0) {
+      auto item = items.back();
+      items.pop_back();
+      return {item};
+    }
+    return nullptr;
+  }
+
+  void* entry() override {
+    while (!wk->get_lc()->going_down()) {
+      auto item = dequeue();
+      if (item.which() == 0) {
+       /* going down */
+       break;
+      }
+      f(wk, boost::get<WorkItem>(item));
+    }
+    return nullptr;
+  }
+}; /* WorkQ */
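+
WorkQ blocks producers once the queue passes qmax and blocks consumers while it
is empty, but it relies on timed waits (cv.wait_for, 200ms) rather than
explicit notifies, so both sides also poll going_down(). A stripped-down sketch
of that shutdown-aware polling pattern, independent of the Ceph Thread wrapper
(all names illustrative):

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <vector>

    using namespace std::chrono_literals;

    template <typename T>
    class PollingQ {
      std::atomic<bool>& down;  // external shutdown flag, like going_down()
      const size_t qmax;
      std::mutex mtx;
      std::condition_variable cv;
      std::vector<T> items;

    public:
      PollingQ(std::atomic<bool>& _down, size_t _qmax)
        : down(_down), qmax(_qmax) {}

      void enqueue(T&& item) {
        std::unique_lock<std::mutex> uniq(mtx);
        while (!down && items.size() > qmax) {
          cv.wait_for(uniq, 200ms);  // timed wait: re-check flag periodically
        }
        items.push_back(std::move(item));
      }

      bool dequeue(T& out) {
        std::unique_lock<std::mutex> uniq(mtx);
        while (!down && items.empty()) {
          cv.wait_for(uniq, 200ms);
        }
        if (items.empty()) {
          return false;  // woke because we are going down
        }
        out = std::move(items.back());  // LIFO, as in WorkQ::dequeue()
        items.pop_back();
        return true;
      }
    };

The timed waits trade up to 200ms of wakeup latency for never having to call
cv.notify_*() on enqueue/dequeue transitions; the committed WorkQ makes the
same trade.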
+
+class RGWLC::WorkPool
+{
+  using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
+  TVector wqs;
+  uint64_t ix;
+
+public:
+  WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
+    : wqs(TVector{
+       n_threads,
+       [&](const size_t ix, auto emplacer) {
+         emplacer.emplace(wk, ix, qmax);
+       }}),
+      ix(0)
+    {}
+
+  void setf(WorkQ::work_f _f) {
+    for (auto& wq : wqs) {
+      wq.setf(_f);
+    }
+  }
+
+  void enqueue(WorkItem item) {
+    const auto tix = ix;
+    ix = (ix+1) % wqs.size();
+    (wqs[tix]).enqueue(std::move(item));
+  }
+
+  void drain() {
+    for (auto& wq : wqs) {
+      wq.drain();
+    }
+  }
+}; /* WorkPool */
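+
WorkPool fans items out round-robin across its WorkQs with no per-item
coordination; drain() is the only barrier. A minimal standalone illustration of
just the rotation used by enqueue() (illustrative types, not the RGW ones):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    struct Item { int v; };

    class RoundRobinPool {
      std::vector<std::vector<Item>> qs;  // stand-ins for per-thread WorkQs
      uint64_t ix{0};

    public:
      explicit RoundRobinPool(size_t n) : qs(n) {}

      void enqueue(Item item) {
        const auto tix = ix;
        ix = (ix + 1) % qs.size();  // same rotation as WorkPool::enqueue()
        qs[tix].push_back(item);
      }

      void dump() const {
        for (size_t i = 0; i < qs.size(); ++i) {
          std::printf("q%zu: %zu items\n", i, qs[i].size());
        }
      }
    };

    int main() {
      RoundRobinPool pool(3);
      for (int i = 0; i < 10; ++i) {
        pool.enqueue(Item{i});
      }
      pool.dump();  // 4/3/3: even spread with no shared counters or locks
      return 0;
    }

Since ix is only touched by the single enqueuing thread (one per WorkPool in
this design), the rotation itself needs no lock; only the queues do.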
+
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
+                         RGWLC *_lc)
+  : dpp(_dpp), cct(_cct), lc(_lc)
+{
+  auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
+  workpool = new WorkPool(this, wpw, 512);
+}
+
+int RGWLC::handle_multipart_expiration(
+  RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
+  LCWorker* worker)
+{
+  MultipartMetaFilter mp_filter;
+  vector<rgw_bucket_dir_entry> objs;
+  bool is_truncated;
+  int ret;
+  RGWBucketInfo& bucket_info = target->get_bucket_info();
+  RGWRados::Bucket::List list_op(target);
+  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
+  list_op.params.list_versions = false;
+  /* lifecycle processing does not depend on total order, so can
+   * take advantage of unordered listing optimizations--such as
+   * operating on one shard at a time */
+  list_op.params.allow_unordered = true;
+  list_op.params.ns = RGW_OBJ_NS_MULTIPART;
+  list_op.params.filter = &mp_filter;
+
+  auto pf = [&](RGWLC::LCWorker* wk, WorkItem& wi) {
+    auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
+    auto& [rule, obj] = wt;
+    RGWMPObj mp_obj;
+    if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) {
+      rgw_obj_key key(obj.key);
+      if (!mp_obj.from_meta(key.name)) {
+       return;
+      }
+      RGWObjectCtx rctx(store);
+      int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
+      if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
+       ldpp_dout(wk->get_lc(), 0)
+         << "ERROR: abort_multipart_upload failed, ret=" << ret
+         << ", meta:" << obj.key
+         << dendl;
+      } else if (ret == -ERR_NO_SUCH_UPLOAD) {
+       ldpp_dout(wk->get_lc(), 5)
+         << "ERROR: abort_multipart_upload failed, ret=" << ret
+         << ", meta:" << obj.key
+         << dendl;
+      }
+    } /* expired */
+  };
+
+  worker->workpool->setf(pf);
+
+  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
+       ++prefix_iter) {
+    if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
+      continue;
+    }
+    list_op.params.prefix = prefix_iter->first;
+    do {
+      objs.clear();
+      list_op.params.marker = list_op.get_next_marker();
+      ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
+      if (ret < 0) {
+          if (ret == (-ENOENT))
+            return 0;
+          ldpp_dout(this, 0) << "ERROR: store->list_objects(): ret=" << ret << dendl;
+          return ret;
+      }
+
+      for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
+       std::tuple<const lc_op&, rgw_bucket_dir_entry> t1 =
+         {prefix_iter->second, *obj_iter};
+       worker->workpool->enqueue(WorkItem{t1});
+       if (going_down()) {
+         worker->workpool->drain();
+         return 0;
+       }
+      } /* for objs */
+
+      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+    } while(is_truncated);
+  } /* for prefix_map */
+
+  worker->workpool->drain();
+  return 0;
+}
+
+static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
+                        rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
+{
+  RGWRados::Object op_target(store, bucket_info, ctx, obj);
+  RGWRados::Object::Read read_op(&op_target);
+
+  return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
+}
+
+static bool is_valid_op(const lc_op& op)
+{
+      return (op.status &&
+              (op.expiration > 0
+               || op.expiration_date != boost::none
+               || op.noncur_expiration > 0
+               || op.dm_expiration
+               || !op.transitions.empty()
+               || !op.noncur_transitions.empty()));
+}
+
+static inline bool has_all_tags(const lc_op& rule_action,
+                               const RGWObjTags& object_tags)
+{
+  if(! rule_action.obj_tags)
+    return false;
+  if(object_tags.count() < rule_action.obj_tags->count())
+    return false;
+  size_t tag_count = 0;
+  for (const auto& tag : object_tags.get_tags()) {
+    const auto& rule_tags = rule_action.obj_tags->get_tags();
+    const auto& iter = rule_tags.find(tag.first);
+    if(iter->second == tag.second)
+    {
+      tag_count++;
+    }
+  /* all tags in the rule appear in obj tags */
+  }
+  return tag_count == rule_action.obj_tags->count();
+}
 
 static int check_tags(lc_op_ctx& oc, bool *skip)
 {
@@ -1115,8 +1262,23 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
       return -1;
     }
 
-  multimap<string, lc_op>& prefix_map = config.get_prefix_map();
+  auto pf = [](RGWLC::LCWorker* wk, WorkItem& wi) {
+    auto wt =
+      boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
+    auto& [op_rule, o] = wt;
+    ldpp_dout(wk->get_lc(), 20)
+      << __func__ << "(): key=" << o.key << dendl;
+    std::cout << "KEY2: " << o.key << std::endl;
+    int ret = op_rule.process(o, wk->dpp);
+    if (ret < 0) {
+      ldpp_dout(wk->get_lc(), 20)
+       << "ERROR: orule.process() returned ret=" << ret
+       << dendl;
+    }
+  };
+  worker->workpool->setf(pf);
 
+  multimap<string, lc_op>& prefix_map = config.get_prefix_map();
   ldpp_dout(this, 10) << __func__ <<  "() prefix_map size="
                      << prefix_map.size()
                      << dendl;
@@ -1142,7 +1304,6 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
     ol.set_prefix(prefix_iter->first);
 
     ret = ol.init();
-
     if (ret < 0) {
       if (ret == (-ENOENT))
         return 0;
@@ -1151,29 +1312,22 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
     }
 
     op_env oenv(op, store, worker, bucket_info, ol);
-
     LCOpRule orule(oenv);
-
-    orule.build();
-
-    rgw_bucket_dir_entry o;
-    for (; ol.get_obj(&o); ol.next()) {
-      ldpp_dout(this, 20) << __func__ << "(): key=" << o.key << dendl;
-      int ret = orule.process(o, this);
-      if (ret < 0) {
-        ldpp_dout(this, 20) << "ERROR: orule.process() returned ret="
-                           << ret
-                           << dendl;
-      }
-
-      if (going_down()) {
-        return 0;
-      }
+    orule.build(); // why can't ctor do it?
+#if 0
+    /* would permit passing o by reference, removes fetch overlap */
+    auto fetch_barrier = [&worker]()
+                          { worker->workpool->drain(); };
+#endif
+    rgw_bucket_dir_entry* o{nullptr};
+    for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
+      std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
+      worker->workpool->enqueue(WorkItem{t1});
     }
+    worker->workpool->drain();
   }
 
-  ret = handle_multipart_expiration(&target, prefix_map);
-
+  ret = handle_multipart_expiration(&target, prefix_map, worker);
   return ret;
 }
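The #if 0 fetch_barrier block above is the copy-avoidance tension from the
commit message: if workers held pointers into LCObjsLister's entry vector,
every fetch() refill would first have to drain the pool, serializing listing
against processing. As committed, each rgw_bucket_dir_entry is copied into its
WorkItem instead, so listing overlaps processing at the cost of copying a
large struct. A self-contained sketch of the hazard the barrier would
otherwise guard against (illustrative types):

    #include <string>
    #include <vector>

    struct entry { std::string key; };

    int main() {
      std::vector<entry> objs{{"a"}, {"b"}};  // the lister's entry vector
      entry* held = &objs[0];   // what a worker would hold under by-reference
      objs.clear();             // the next fetch() refill...
      objs.push_back({"c"});
      /* *held is now dangling: without drain() as a barrier before each
       * refill, a worker dereferencing it is a use-after-free */
      (void)held;
      return 0;
    }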
 
@@ -1467,6 +1621,12 @@ int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
   return secs>0 ? secs : secs+24*60*60;
 }
 
+RGWLC::LCWorker::~LCWorker()
+{
+  workpool->drain();
+  delete workpool;
+} /* ~LCWorker */
+
 void RGWLifecycleConfiguration::generate_test_instances(
   list<RGWLifecycleConfiguration*>& o)
 {
@@ -1590,7 +1750,13 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
   });
 
   return ret;
-}
+} /* RGWLC::remove_bucket_config */
+
+RGWLC::~RGWLC()
+{
+  stop_processor();
+  finalize();
+} /* ~RGWLC() */
 
 namespace rgw::lc {
 
index b583aa4e3652b2eee6e4083da1b32e0c6b38e10e..57f02a63e81f61f5046b2ddf015d812a93e1628b 100644 (file)
@@ -459,12 +459,17 @@ class RGWLC : public DoutPrefixProvider {
   string cookie;
 
 public:
-  class LCWorker : public Thread {
+
+  class WorkPool;
+
+  class LCWorker : public Thread
+  {
     const DoutPrefixProvider *dpp;
     CephContext *cct;
     RGWLC *lc;
     ceph::mutex lock = ceph::make_mutex("LCWorker");
     ceph::condition_variable cond;
+    WorkPool* workpool{nullptr};
 
   public:
     LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc);
@@ -473,7 +478,10 @@ public:
     void stop();
     bool should_work(utime_t& now);
     int schedule_next_start_time(utime_t& start, utime_t& now);
+    ~LCWorker();
+
     friend class RGWRados;
+    friend class RGWLC;
   }; /* LCWorker */
 
   friend class RGWRados;
@@ -481,10 +489,7 @@ public:
   std::vector<std::unique_ptr<RGWLC::LCWorker>> workers;
 
   RGWLC() : cct(nullptr), store(nullptr) {}
-  ~RGWLC() {
-    stop_processor();
-    finalize();
-  }
+  ~RGWLC();
 
   void initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store);
   void finalize();
@@ -512,7 +517,8 @@ public:
   private:
 
   int handle_multipart_expiration(RGWRados::Bucket *target,
-                                 const multimap<string, lc_op>& prefix_map);
+                                 const multimap<string, lc_op>& prefix_map,
+                                 LCWorker* worker);
 };
 
 namespace rgw::lc {