]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: enable thread-parallelism in RGWLC
authorMatt Benjamin <mbenjamin@redhat.com>
Sun, 26 Jan 2020 16:45:26 +0000 (11:45 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Tue, 21 Apr 2020 17:39:05 +0000 (13:39 -0400)
Allow RGWLC to spawn >1 LCWorker threads, controlled by option
rgw_lc_max_worker (default to 3).

LCWorker parallelism exactly resembles previous cross-instance
parallelism (and extends it), with no new locking.

Replace in-order index shard enumeration (w/random start) with
a number sequence to avoid convoying.

Fixes: https://tracker.ceph.com/issues/43841
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/common/legacy_config_opts.h
src/common/options.cc
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h
src/rgw/rgw_perf_counters.cc
src/rgw/rgw_perf_counters.h
src/rgw/rgw_rados.cc

index bfbf4f293ffe40808e3714482a2386b84ebfb21d..b344dd5b8d42e809ca296c89cef3298010bc0563 100644 (file)
@@ -1289,6 +1289,7 @@ OPTION(rgw_service_provider_name, OPT_STR) //service provider name which is cont
 OPTION(rgw_content_length_compat, OPT_BOOL) // Check both HTTP_CONTENT_LENGTH and CONTENT_LENGTH in fcgi env
 OPTION(rgw_lifecycle_work_time, OPT_STR) //job process lc  at 00:00-06:00s
 OPTION(rgw_lc_lock_max_time, OPT_INT)  // total run time for a single lc processor work
+OPTION(rgw_lc_max_worker, OPT_INT)// number of (parellized) LCWorker threads
 OPTION(rgw_lc_max_objs, OPT_INT)
 OPTION(rgw_lc_max_rules, OPT_U32)  // Max rules set on one bucket
 OPTION(rgw_lc_debug_interval, OPT_INT)  // Debug run interval, in seconds
index 330f165328463b8e5b20fbe93d50e2f35e9dcfff..b586d495c5b5a2869322d7fdd1df2946a9f5a05a 100644 (file)
@@ -5555,6 +5555,13 @@ std::vector<Option> get_rgw_options() {
     .set_default(0)
     .set_description("Delay after processing of bucket listing chunks (i.e., per 1000 entries) in milliseconds"),
 
+    Option("rgw_lc_max_worker", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+    .set_default(3)
+    .set_description("Number of LCWorker tasks that will be run in parallel")
+    .set_long_description(
+      "Number of LCWorker tasks that will be run in parallel--used to permit >1 bucket/index shards "
+      "to be processed simultaneously"),
+
     Option("rgw_lc_max_objs", Option::TYPE_INT, Option::LEVEL_ADVANCED)
     .set_default(32)
     .set_description("Number of lifecycle data shards")
index a38169655a433b21dfdaf63e49ed81edc9e1bc40..aba753c891fe79613286b71364f498869777a551 100644 (file)
@@ -4,6 +4,7 @@
 #include <string.h>
 #include <iostream>
 #include <map>
+#include <algorithm>
 
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string.hpp>
@@ -14,6 +15,7 @@
 #include "include/random.h"
 #include "cls/rgw/cls_rgw_client.h"
 #include "cls/lock/cls_lock_client.h"
+#include "rgw_perf_counters.h"
 #include "rgw_common.h"
 #include "rgw_bucket.h"
 #include "rgw_lc.h"
@@ -48,11 +50,13 @@ bool LCRule::valid() const
   if (id.length() > MAX_ID_LEN) {
     return false;
   }
-  else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration &&
+  else if(expiration.empty() && noncur_expiration.empty() &&
+         mp_expiration.empty() && !dm_expiration &&
           transitions.empty() && noncur_transitions.empty()) {
     return false;
   }
-  else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
+  else if (!expiration.valid() || !noncur_expiration.valid() ||
+          !mp_expiration.valid()) {
     return false;
   }
   if (!transitions.empty()) {
@@ -78,7 +82,8 @@ bool LCRule::valid() const
   return true;
 }
 
-void LCRule::init_simple_days_rule(std::string_view _id, std::string_view _prefix, int num_days)
+void LCRule::init_simple_days_rule(std::string_view _id,
+                                  std::string_view _prefix, int num_days)
 {
   id = _id;
   prefix = _prefix;
@@ -118,7 +123,8 @@ bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
     } else {
       action.date = ceph::from_iso_8601(elem.second.get_date());
     }
-    action.storage_class = rgw_placement_rule::get_canonical_storage_class(elem.first);
+    action.storage_class
+      = rgw_placement_rule::get_canonical_storage_class(elem.first);
     op.transitions.emplace(elem.first, std::move(action));
   }
   for (const auto &elem : rule.get_noncur_transitions()) {
@@ -151,7 +157,8 @@ int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
   if (rule_map.find(id) != rule_map.end()) {  //id shouldn't be the same 
     return -EINVAL;
   }
-  if (rule.get_filter().has_tags() && (rule.get_dm_expiration() || !rule.get_mp_expiration().empty())) {
+  if (rule.get_filter().has_tags() && (rule.get_dm_expiration() ||
+                                      !rule.get_mp_expiration().empty())) {
     return -ERR_INVALID_REQUEST;
   }
   rule_map.insert(pair<string, LCRule>(id, rule));
@@ -162,7 +169,8 @@ int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
   return 0;
 }
 
-bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
+bool RGWLifecycleConfiguration::has_same_action(const lc_op& first,
+                                               const lc_op& second) {
   if ((first.expiration > 0 || first.expiration_date != boost::none) && 
     (second.expiration > 0 || second.expiration_date != boost::none)) {
     return true;
@@ -176,9 +184,11 @@ bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op&
         return true;
       }
     }
-  } else if (!first.noncur_transitions.empty() && !second.noncur_transitions.empty()) {
+  } else if (!first.noncur_transitions.empty() &&
+            !second.noncur_transitions.empty()) {
     for (auto &elem : first.noncur_transitions) {
-      if (second.noncur_transitions.find(elem.first) != second.noncur_transitions.end()) {
+      if (second.noncur_transitions.find(elem.first) !=
+         second.noncur_transitions.end()) {
         return true;
       }
     }
@@ -193,12 +203,16 @@ bool RGWLifecycleConfiguration::valid()
   return true;
 }
 
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
+                         RGWLC *_lc)
+  : dpp(_dpp), cct(_cct), lc(_lc) {}
+
 void *RGWLC::LCWorker::entry() {
   do {
     utime_t start = ceph_clock_now();
     if (should_work(start)) {
       ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
-      int r = lc->process();
+      int r = lc->process(this);
       if (r < 0) {
         ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
       }
@@ -272,7 +286,7 @@ bool RGWLC::if_already_run_today(time_t& start_date)
     return false;
 }
 
-int RGWLC::bucket_lc_prepare(int index)
+int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
 {
   map<string, int > entries;
 
@@ -280,13 +294,15 @@ int RGWLC::bucket_lc_prepare(int index)
 
 #define MAX_LC_LIST_ENTRIES 100
   do {
-    int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
+    int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
+                             marker, MAX_LC_LIST_ENTRIES, entries);
     if (ret < 0)
       return ret;
     map<string, int>::iterator iter;
     for (iter = entries.begin(); iter != entries.end(); ++iter) {
       pair<string, int > entry(iter->first, lc_uninitial);
-      ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index],  entry);
+      ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
+                                obj_names[index],  entry);
       if (ret < 0) {
         ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
             << obj_names[index] << dendl;
@@ -302,7 +318,8 @@ int RGWLC::bucket_lc_prepare(int index)
   return 0;
 }
 
-static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr)
+static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days,
+                           ceph::real_time *expire_time = nullptr)
 {
   double timediff, cmp;
   utime_t base_time;
@@ -325,7 +342,8 @@ static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, c
   return (timediff >= cmp);
 }
 
-static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx)
+static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
+                                  rgw_obj& obj, RGWObjectCtx& ctx)
 {
   if (!bucket_info.obj_lock_enabled()) {
     return true;
@@ -351,7 +369,8 @@ static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
         ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
         return false;
       }
-      if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > ceph_clock_now()) {
+      if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
+         ceph_clock_now()) {
         return false;
       }
     }
@@ -385,12 +404,13 @@ int RGWLC::handle_multipart_expiration(
   auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
   list_op.params.list_versions = false;
   /* lifecycle processing does not depend on total order, so can
-   * take advantage of unorderd listing optimizations--such as
+   * take advantage of unordered listing optimizations--such as
    * operating on one shard at a time */
   list_op.params.allow_unordered = true;
   list_op.params.ns = RGW_OBJ_NS_MULTIPART;
   list_op.params.filter = &mp_filter;
-  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
+  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
+       ++prefix_iter) {
     if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
       continue;
     }
@@ -407,12 +427,14 @@ int RGWLC::handle_multipart_expiration(
       }
 
       for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
-        if (obj_has_expired(cct, obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
+        if (obj_has_expired(cct, obj_iter->meta.mtime,
+                           prefix_iter->second.mp_expiration)) {
           rgw_obj_key key(obj_iter->key);
           if (!mp_obj.from_meta(key.name)) {
             continue;
           }
           RGWObjectCtx rctx(store);
+         /* XXXX where is this defined? */
           ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
           if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
             ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
@@ -429,7 +451,8 @@ int RGWLC::handle_multipart_expiration(
   return 0;
 }
 
-static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
+static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
+                        rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
 {
   RGWRados::Object op_target(store, bucket_info, ctx, obj);
   RGWRados::Object::Read read_op(&op_target);
@@ -500,7 +523,8 @@ public:
   }
 
   int fetch() {
-    int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
+    int ret = list_op.list_objects(
+      1000, &objs, NULL, &is_truncated, null_yield);
     if (ret < 0) {
       return ret;
     }
@@ -556,15 +580,20 @@ public:
 
 
 struct op_env {
+
+  using LCWorker = RGWLC::LCWorker;
+
   lc_op& op;
   rgw::sal::RGWRadosStore *store;
-  RGWLC *lc;
+  LCWorker* worker;
   RGWBucketInfo& bucket_info;
   LCObjsLister& ol;
 
-  op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, RGWLC *_lc, RGWBucketInfo& _bucket_info,
-         LCObjsLister& _ol) : op(_op), store(_store), lc(_lc), bucket_info(_bucket_info), ol(_ol) {}
-};
+  op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, LCWorker* _worker,
+        RGWBucketInfo& _bucket_info, LCObjsLister& _ol)
+    : op(_op), store(_store), worker(_worker), bucket_info(_bucket_info),
+      ol(_ol) {}
+}; /* op_env */
 
 class LCRuleOp;
 
@@ -582,10 +611,12 @@ struct lc_op_ctx {
   RGWObjectCtx rctx;
   const DoutPrefixProvider *dpp;
 
-  lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o, const DoutPrefixProvider *_dpp) : cct(_env.store->ctx()), env(_env), o(_o),
-                 store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
-                 obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
-};
+  lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o,
+           const DoutPrefixProvider *_dpp)
+    : cct(_env.store->ctx()), env(_env), o(_o),
+      store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
+      obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
+}; /* lc_op_ctx */
 
 static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
 {
@@ -614,6 +645,10 @@ static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
   del_op.params.obj_owner = obj_owner;
   del_op.params.unmod_since = meta.mtime;
 
+  if (perfcounter) {
+    perfcounter->inc(l_rgw_lc_remove_expired, 1);
+  }
+
   return del_op.delete_obj(null_yield);
 }
 
@@ -676,7 +711,8 @@ static int check_tags(lc_op_ctx& oc, bool *skip)
     *skip = true;
 
     bufferlist tags_bl;
-    int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj, oc.rctx, tags_bl);
+    int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj,
+                           oc.rctx, tags_bl);
     if (ret < 0) {
       if (ret != -ENODATA) {
         ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
@@ -750,7 +786,8 @@ public:
         ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
         return false;
       }
-      is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
+      is_expired = ceph_clock_now() >=
+       ceph::real_clock::to_time_t(*op.expiration_date);
       *exp_time = *op.expiration_date;
     } else {
       is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
@@ -793,7 +830,9 @@ public:
     bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
 
     ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
-    return is_expired && pass_object_lock_check(oc.store->getRados(), oc.bucket_info, oc.obj, oc.rctx);
+    return is_expired &&
+      pass_object_lock_check(oc.store->getRados(),
+                            oc.bucket_info, oc.obj, oc.rctx);
   }
 
   int process(lc_op_ctx& oc) {
@@ -851,7 +890,8 @@ protected:
   virtual bool check_current_state(bool is_current) = 0;
   virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
 public:
-  LCOpAction_Transition(const transition_action& _transition) : transition(_transition) {}
+  LCOpAction_Transition(const transition_action& _transition)
+    : transition(_transition) {}
 
   bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
     auto& o = oc.o;
@@ -871,7 +911,8 @@ public:
         ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
         return false;
       }
-      is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*transition.date);
+      is_expired = ceph_clock_now() >=
+       ceph::real_clock::to_time_t(*transition.date);
       *exp_time = *transition.date;
     } else {
       is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
@@ -879,7 +920,9 @@ public:
 
     ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
 
-    need_to_process = (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != transition.storage_class);
+    need_to_process =
+      (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
+       transition.storage_class);
 
     return is_expired;
   }
@@ -895,15 +938,17 @@ public:
     target_placement.inherit_from(oc.bucket_info.placement_rule);
     target_placement.storage_class = transition.storage_class;
 
-    if (!oc.store->svc()->zone->get_zone_params().valid_placement(target_placement)) {
+    if (!oc.store->svc()->zone->get_zone_params().
+       valid_placement(target_placement)) {
       ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement
                            << " bucket="<< oc.bucket_info.bucket
                            << " rule_id=" << oc.op.id << dendl;
       return -EINVAL;
     }
 
-    int r = oc.store->getRados()->transition_obj(oc.rctx, oc.bucket_info, oc.obj,
-                                     target_placement, o.meta.mtime, o.versioned_epoch, oc.dpp, null_yield);
+    int r = oc.store->getRados()->transition_obj(
+      oc.rctx, oc.bucket_info, oc.obj, target_placement, o.meta.mtime,
+      o.versioned_epoch, oc.dpp, null_yield);
     if (r < 0) {
       ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " 
       << oc.bucket_info.bucket << ":" << o.key 
@@ -926,7 +971,8 @@ protected:
     return oc.o.meta.mtime;
   }
 public:
-  LCOpAction_CurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
+  LCOpAction_CurrentTransition(const transition_action& _transition)
+    : LCOpAction_Transition(_transition) {}
 };
 
 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
@@ -939,7 +985,8 @@ protected:
     return oc.ol.get_prev_obj().meta.mtime;
   }
 public:
-  LCOpAction_NonCurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
+  LCOpAction_NonCurrentTransition(const transition_action& _transition)
+    : LCOpAction_Transition(_transition) {}
 };
 
 void LCOpRule::build()
@@ -1027,7 +1074,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
 
 }
 
-int RGWLC::bucket_lc_process(string& shard_id)
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
 {
   RGWLifecycleConfiguration  config(cct);
   RGWBucketInfo bucket_info;
@@ -1039,7 +1086,9 @@ int RGWLC::bucket_lc_process(string& shard_id)
   string bucket_tenant = result[0];
   string bucket_name = result[1];
   string bucket_marker = result[2];
-  int ret = store->getRados()->get_bucket_info(store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield, &bucket_attrs);
+  int ret = store->getRados()->get_bucket_info(
+    store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
+    &bucket_attrs);
   if (ret < 0) {
     ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
     return ret;
@@ -1074,14 +1123,16 @@ int RGWLC::bucket_lc_process(string& shard_id)
 
   rgw_obj_key pre_marker;
   rgw_obj_key next_marker;
-  for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
+  for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
+      ++prefix_iter) {
     auto& op = prefix_iter->second;
     if (!is_valid_op(op)) {
       continue;
     }
     ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
     if (prefix_iter != prefix_map.begin() && 
-        (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
+        (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
+                                   prev(prefix_iter)->first) == 0)) {
       next_marker = pre_marker;
     } else {
       pre_marker = next_marker;
@@ -1099,7 +1150,7 @@ int RGWLC::bucket_lc_process(string& shard_id)
       return ret;
     }
 
-    op_env oenv(op, store, this, bucket_info, ol);
+    op_env oenv(op, store, worker, bucket_info, ol);
 
     LCOpRule orule(oenv);
 
@@ -1126,7 +1177,9 @@ int RGWLC::bucket_lc_process(string& shard_id)
   return ret;
 }
 
-int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
+int RGWLC::bucket_lc_post(int index, int max_lock_sec,
+                         pair<string, int>& entry, int& result,
+                         LCWorker* worker)
 {
   utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
 
@@ -1135,7 +1188,8 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry
   l.set_duration(lock_duration);
 
   do {
-    int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, obj_names[index]);
+    int ret = l.lock_exclusive(
+      &store->getRados()->lc_pool_ctx, obj_names[index]);
     if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
           << obj_names[index] << ", sleep 5, try again" << dendl;
@@ -1146,7 +1200,8 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry
       return 0;
     ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
     if (result ==  -ENOENT) {
-      ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx, obj_names[index],  entry);
+      ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
+                               obj_names[index],  entry);
       if (ret < 0) {
         ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
             << obj_names[index] << dendl;
@@ -1158,7 +1213,8 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry
       entry.second = lc_complete;
     }
 
-    ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index],  entry);
+    ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
+                              obj_names[index],  entry);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
           << obj_names[index] << dendl;
@@ -1170,13 +1226,16 @@ clean:
   } while (true);
 }
 
-int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
+int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries,
+                           map<string, int>* progress_map)
 {
   int index = 0;
   progress_map->clear();
   for(; index <max_objs; index++) {
     map<string, int > entries;
-    int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
+    int ret =
+      cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
+                     max_entries, entries);
     if (ret < 0) {
       if (ret == -ENOENT) {
         ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
@@ -1194,15 +1253,26 @@ int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<stri
   return 0;
 }
 
-int RGWLC::process()
+static inline vector<int> random_sequence(uint32_t n)
 {
-  int max_secs = cct->_conf->rgw_lc_lock_max_time;
+  vector<int> v(n-1, 0);
+  std::generate(v.begin(), v.end(),
+    [ix = 0]() mutable {
+      return ix++;
+    });
+  std::random_shuffle(v.begin(), v.end());
+  return v;
+}
 
-  const int start = ceph::util::generate_random_number(0, max_objs - 1);
+int RGWLC::process(LCWorker* worker)
+{
+  int max_secs = cct->_conf->rgw_lc_lock_max_time;
 
-  for (int i = 0; i < max_objs; i++) {
-    int index = (i + start) % max_objs;
-    int ret = process(index, max_secs);
+  /* generate an index-shard sequence unrelated to any other
+   * that might be running in parallel */
+  vector<int> shard_seq = random_sequence(max_objs);
+  for (auto index : shard_seq) {
+    int ret = process(index, max_secs, worker);
     if (ret < 0)
       return ret;
   }
@@ -1210,7 +1280,7 @@ int RGWLC::process()
   return 0;
 }
 
-int RGWLC::process(int index, int max_lock_secs)
+int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
 {
   rados::cls::lock::Lock l(lc_index_lock_name);
   do {
@@ -1222,7 +1292,8 @@ int RGWLC::process(int index, int max_lock_secs)
     utime_t time(max_lock_secs, 0);
     l.set_duration(time);
 
-    int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, obj_names[index]);
+    int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
+                              obj_names[index]);
     if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
           << obj_names[index] << ", sleep 5, try again" << dendl;
@@ -1233,7 +1304,8 @@ int RGWLC::process(int index, int max_lock_secs)
       return 0;
 
     cls_rgw_lc_obj_head head;
-    ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index], head);
+    ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index],
+                             head);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
           << obj_names[index] << ", ret=" << ret << dendl;
@@ -1243,42 +1315,53 @@ int RGWLC::process(int index, int max_lock_secs)
     if(!if_already_run_today(head.start_date)) {
       head.start_date = now;
       head.marker.clear();
-      ret = bucket_lc_prepare(index);
+      ret = bucket_lc_prepare(index, worker);
       if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
-          << obj_names[index] << ", ret=" << ret << dendl;
+                        << obj_names[index]
+                        << ", ret=" << ret
+                        << dendl;
       goto exit;
       }
     }
 
-    ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx, obj_names[index], head.marker, entry);
+    ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx,
+                                   obj_names[index], head.marker, entry);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
           << obj_names[index] << dendl;
       goto exit;
     }
 
+    /* termination condition (eof) */
     if (entry.first.empty())
       goto exit;
 
     entry.second = lc_processing;
-    ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index],  entry);
+    ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
+                              obj_names[index],  entry);
     if (ret < 0) {
-      ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names[index]
-          << " (" << entry.first << "," << entry.second << ")" << dendl;
+      ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
+                        << obj_names[index]
+                        << " (" << entry.first << ","
+                        << entry.second << ")"
+                        << dendl;
       goto exit;
     }
 
     head.marker = entry.first;
-    ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index],  head);
+    ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index],
+                             head);
     if (ret < 0) {
-      ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
+      ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                        << obj_names[index]
+                        << dendl;
       goto exit;
     }
     l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
-    ret = bucket_lc_process(entry.first);
-    bucket_lc_post(index, max_lock_secs, entry, ret);
-  }while(1);
+    ret = bucket_lc_process(entry.first, worker);
+    bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+  } while(1);
 
 exit:
     l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
@@ -1287,22 +1370,26 @@ exit:
 
 void RGWLC::start_processor()
 {
-  worker = new LCWorker(this, cct, this);
-  worker->create("lifecycle_thr");
+  auto maxw = cct->_conf->rgw_lc_max_worker;
+  workers.reserve(maxw);
+  for (int ix = 0; ix < maxw; ++ix) {
+    auto worker  =
+      std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this);
+    worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
+    workers.emplace_back(std::move(worker));
+  }
 }
 
 void RGWLC::stop_processor()
 {
   down_flag = true;
-  if (worker) {
+  for (auto& worker : workers) {
     worker->stop();
     worker->join();
   }
-  delete worker;
-  worker = NULL;
+  workers.clear();
 }
 
-
 unsigned RGWLC::get_subsys() const
 {
   return dout_subsys;
@@ -1331,7 +1418,8 @@ bool RGWLC::LCWorker::should_work(utime_t& now)
   int end_hour;
   int end_minute;
   string worktime = cct->_conf->rgw_lifecycle_work_time;
-  sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
+  sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute,
+        &end_hour, &end_minute);
   struct tm bdt;
   time_t tt = now.sec();
   localtime_r(&tt, &bdt);
@@ -1364,7 +1452,8 @@ int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
   int end_hour;
   int end_minute;
   string worktime = cct->_conf->rgw_lifecycle_work_time;
-  sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
+  sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour,
+        &end_minute);
   struct tm bdt;
   time_t tt = now.sec();
   time_t nt;
@@ -1378,15 +1467,20 @@ int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
   return secs>0 ? secs : secs+24*60*60;
 }
 
-void RGWLifecycleConfiguration::generate_test_instances(list<RGWLifecycleConfiguration*>& o)
+void RGWLifecycleConfiguration::generate_test_instances(
+  list<RGWLifecycleConfiguration*>& o)
 {
   o.push_back(new RGWLifecycleConfiguration);
 }
 
 void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
 {
-  int max_objs = (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : cct->_conf->rgw_lc_max_objs);
-  int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
+  int max_objs =
+    (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
+     cct->_conf->rgw_lc_max_objs);
+  /* XXXX oh noes!!! */
+  int index = ceph_str_hash_linux(shard_id.c_str(),
+                                 shard_id.size()) % HASH_PRIME % max_objs;
   *oid = lc_oid_prefix;
   char buf[32];
   snprintf(buf, 32, ".%d", index);
@@ -1394,14 +1488,14 @@ void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
   return;
 }
 
-
-
 static std::string get_lc_shard_name(const rgw_bucket& bucket){
   return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
 }
 
 template<typename F>
-static int guard_lc_modify(rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket, const string& cookie, const F& f) {
+static int guard_lc_modify(
+  rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket,
+  const string& cookie, const F& f) {
   CephContext *cct = store->ctx();
 
   string shard_id = get_lc_shard_name(bucket);
@@ -1454,17 +1548,17 @@ int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info,
 
   attrs[RGW_ATTR_LC] = std::move(lc_bl);
 
-  int ret = store->ctl()->bucket->set_bucket_instance_attrs(bucket_info, attrs,
-                                                        &bucket_info.objv_tracker,
-                                                        null_yield);
+  int ret =
+    store->ctl()->bucket->set_bucket_instance_attrs(
+      bucket_info, attrs, &bucket_info.objv_tracker, null_yield);
   if (ret < 0)
     return ret;
 
   rgw_bucket& bucket = bucket_info.bucket;
 
-
-  ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
-                                                   const pair<string, int>& entry) {
+  ret = guard_lc_modify(store, bucket, cookie,
+                       [&](librados::IoCtx *ctx, const string& oid,
+                           const pair<string, int>& entry) {
     return cls_rgw_lc_set_entry(*ctx, oid, entry);
   });
 
@@ -1476,9 +1570,9 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
 {
   map<string, bufferlist> attrs = bucket_attrs;
   attrs.erase(RGW_ATTR_LC);
-  int ret = store->ctl()->bucket->set_bucket_instance_attrs(bucket_info, attrs,
-                                                        &bucket_info.objv_tracker,
-                                                        null_yield);
+  int ret =
+    store->ctl()->bucket->set_bucket_instance_attrs(
+      bucket_info, attrs, &bucket_info.objv_tracker, null_yield);
 
   rgw_bucket& bucket = bucket_info.bucket;
 
@@ -1489,8 +1583,9 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
   }
 
 
-  ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
-                                                   const pair<string, int>& entry) {
+  ret = guard_lc_modify(store, bucket, cookie,
+                       [&](librados::IoCtx *ctx, const string& oid,
+                           const pair<string, int>& entry) {
     return cls_rgw_lc_rm_entry(*ctx, oid, entry);
   });
 
@@ -1499,7 +1594,8 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
 
 namespace rgw::lc {
 
-int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, const RGWBucketInfo& bucket_info,
+int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
+                      const RGWBucketInfo& bucket_info,
                       const map<std::string,bufferlist>& battrs)
 {
   if (auto aiter = battrs.find(RGW_ATTR_LC);
@@ -1533,12 +1629,12 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, const RGWBucketInfo& buck
     gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
     std::string cookie = cookie_buf;
 
-    ret = guard_lc_modify(store, bucket_info.bucket, cookie,
-                         [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
-                                           const pair<string, int>& entry) {
-                           return cls_rgw_lc_set_entry(*lc_pool_ctx,
-                                                       lc_oid, entry);
-                         });
+    ret = guard_lc_modify(
+      store, bucket_info.bucket, cookie,
+      [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
+                             const pair<string, int>& entry) {
+       return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
+      });
 
   }
 
index 32de9581664cd143ebc98b224946a39f48d96ee7..b583aa4e3652b2eee6e4083da1b32e0c6b38e10e 100644 (file)
@@ -458,6 +458,7 @@ class RGWLC : public DoutPrefixProvider {
   std::atomic<bool> down_flag = { false };
   string cookie;
 
+public:
   class LCWorker : public Thread {
     const DoutPrefixProvider *dpp;
     CephContext *cct;
@@ -466,16 +467,20 @@ class RGWLC : public DoutPrefixProvider {
     ceph::condition_variable cond;
 
   public:
-    LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc) : dpp(_dpp), cct(_cct), lc(_lc) {}
+    LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWLC *_lc);
+    RGWLC* get_lc() { return lc; }
     void *entry() override;
     void stop();
     bool should_work(utime_t& now);
     int schedule_next_start_time(utime_t& start, utime_t& now);
-  };
-  
-  public:
-  LCWorker *worker;
-  RGWLC() : cct(NULL), store(NULL), worker(NULL) {}
+    friend class RGWRados;
+  }; /* LCWorker */
+
+  friend class RGWRados;
+
+  std::vector<std::unique_ptr<RGWLC::LCWorker>> workers;
+
+  RGWLC() : cct(nullptr), store(nullptr) {}
   ~RGWLC() {
     stop_processor();
     finalize();
@@ -484,13 +489,13 @@ class RGWLC : public DoutPrefixProvider {
   void initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store);
   void finalize();
 
-  int process();
-  int process(int index, int max_secs);
+  int process(LCWorker* worker);
+  int process(int index, int max_secs, LCWorker* worker);
   bool if_already_run_today(time_t& start_date);
   int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map);
-  int bucket_lc_prepare(int index);
-  int bucket_lc_process(string& shard_id);
-  int bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result);
+  int bucket_lc_prepare(int index, LCWorker* worker);
+  int bucket_lc_process(string& shard_id, LCWorker* worker);
+  int bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result, LCWorker* worker);
   bool going_down();
   void start_processor();
   void stop_processor();
index 77cb134b46b575ce617c03c0d3c6fdb23de660b2..951bf54fee7b26175ada432156c576fd85e0b8cb 100644 (file)
@@ -35,6 +35,7 @@ int rgw_perf_start(CephContext *cct)
   plb.add_u64_counter(l_rgw_keystone_token_cache_miss, "keystone_token_cache_miss", "Keystone token cache miss");
 
   plb.add_u64_counter(l_rgw_gc_retire, "gc_retire_object", "GC object retires");
+  plb.add_u64_counter(l_rgw_lc_remove_expired, "lc_remove_expired", "LC removed objects");
 
   plb.add_u64_counter(l_rgw_pubsub_event_triggered, "pubsub_event_triggered", "Pubsub events with at least one topic");
   plb.add_u64_counter(l_rgw_pubsub_event_lost, "pubsub_event_lost", "Pubsub events lost");
index 1307d36d7043b5e3e016f44f0c4c8491a67ae13b..e07a3d9897c2a27b817a57346d0af473b3c8c93f 100644 (file)
@@ -32,6 +32,7 @@ enum {
   l_rgw_keystone_token_cache_miss,
 
   l_rgw_gc_retire,
+  l_rgw_lc_remove_expired,
 
   l_rgw_pubsub_event_triggered,
   l_rgw_pubsub_event_lost,
index 4f3e2f0c2e17f5a1f9750b1149918c7dec2d79d3..9acf875a2db20fbe2396d7cee03ec1c2d3d02e73 100644 (file)
@@ -8047,7 +8047,7 @@ int RGWRados::list_lc_progress(const string& marker, uint32_t max_entries, map<s
 
 int RGWRados::process_lc()
 {
-  return lc->process();
+  return lc->process(nullptr);
 }
 
 bool RGWRados::process_expire_objects()