]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: optimize single-bucket lifecycle processing 44139/head
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 30 Nov 2021 17:42:33 +0000 (12:42 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Wed, 1 Dec 2021 19:25:20 +0000 (14:25 -0500)
Looks up the shard index of the corresponding bucket, and only
buckets in the corresponding shard are considered for processing.
This has a side effect of matching buckets by id, and also adds
support for --tenant.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
doc/man/8/radosgw-admin.rst
src/rgw/rgw_admin.cc
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 80d9a3468e691fd642627bbd2075e3059537c2f3..57d7fcfc902384dbbbdcf0355113de0a01a2b5d6 100644 (file)
@@ -365,8 +365,9 @@ which are as follows:
   List all bucket lifecycle progress.
 
 :command:`lc process`
-  Manually process lifecycle.  If bucket specified with --bucket=<bucket>,
-  only <bucket> is processed.
+  Manually process lifecycle.  If a bucket is specified (e.g., via
+  --bucket_id or via --bucket and optional --tenant), only that bucket
+  is processed.
 
 :command:`metadata get`
   Get metadata info.
index afa8fc8535c1b5be7d88db5df9dc53481bad5fb7..1721bb0300393add62fd3f53eb774b31b1148361 100644 (file)
@@ -7549,14 +7549,24 @@ next:
   }
 
   if (opt_cmd == OPT::LC_PROCESS) {
-    int ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->process_lc(bucket_name);
+    if ((! bucket_name.empty()) ||
+       (! bucket_id.empty())) {
+        int ret = init_bucket(nullptr, tenant, bucket_name, bucket_id, &bucket);
+       if (ret < 0) {
+         cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret)
+              << std::endl;
+         return ret;
+       }
+    }
+
+    int ret =
+      static_cast<rgw::sal::RadosStore*>(store)->getRados()->process_lc(bucket);
     if (ret < 0) {
       cerr << "ERROR: lc processing returned error: " << cpp_strerror(-ret) << std::endl;
       return 1;
     }
   }
 
-
   if (opt_cmd == OPT::LC_RESHARD_FIX) {
     ret = RGWBucketAdminOp::fix_lc_shards(store, bucket_op, stream_flusher, dpp());
     if (ret < 0) {
index 90a7fe9046fa0d0c5aa28a338f756d3fc2b82482..1556124be4a44717a0c6a0a795a2255e2bec06d7 100644 (file)
@@ -214,11 +214,11 @@ bool RGWLifecycleConfiguration::valid()
 
 void *RGWLC::LCWorker::entry() {
   do {
-    std::string bucket_name{""}; // empty restriction, all buckets
+    std::unique_ptr<rgw::sal::Bucket> all_buckets; // empty restriction
     utime_t start = ceph_clock_now();
     if (should_work(start)) {
       ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
-      int r = lc->process(this, bucket_name, false /* once */);
+      int r = lc->process(this, all_buckets, false /* once */);
       if (r < 0) {
         ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
                          << r << dendl;
@@ -1813,6 +1813,9 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec,
     ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
                        << dendl;
     if (result ==  -ENOENT) {
+      /* XXXX are we SURE the only way result could == ENOENT is when
+       * there is no such bucket?  It is currently the value returned
+       * from bucket_lc_process(...) */
       ret = sal_lc->rm_entry(obj_names[index],  entry);
       if (ret < 0) {
         ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
@@ -1882,18 +1885,60 @@ static inline vector<int> random_sequence(uint32_t n)
   return v;
 }
 
-int RGWLC::process(LCWorker* worker, const std::string& bucket_name,
+static inline int get_lc_index(CephContext *cct,
+                              const std::string& shard_id)
+{
+  int max_objs =
+    (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
+     cct->_conf->rgw_lc_max_objs);
+  /* n.b. review hash algo */
+  int index = ceph_str_hash_linux(shard_id.c_str(),
+                                 shard_id.size()) % HASH_PRIME % max_objs;
+  return index;
+}
+
+static inline void get_lc_oid(CephContext *cct,
+                             const std::string& shard_id, string *oid)
+{
+  /* n.b. review hash algo */
+  int index = get_lc_index(cct, shard_id);
+  *oid = lc_oid_prefix;
+  char buf[32];
+  snprintf(buf, 32, ".%d", index);
+  oid->append(buf);
+  return;
+}
+
+static std::string get_lc_shard_name(const rgw_bucket& bucket){
+  return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
+}
+
+int RGWLC::process(LCWorker* worker,
+                  const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
                   bool once = false)
 {
+  int ret = 0;
   int max_secs = cct->_conf->rgw_lc_lock_max_time;
 
-  /* generate an index-shard sequence unrelated to any other
-   * that might be running in parallel */
-  vector<int> shard_seq = random_sequence(max_objs);
-  for (auto index : shard_seq) {
-    int ret = process(index, max_secs, worker, bucket_name, once);
-    if (ret < 0)
-      return ret;
+  if (optional_bucket) {
+    /* if a bucket is provided, this is a single-bucket run, and
+     * can be processed without traversing any state entries (we
+     * do need the entry {pro,epi}logue which update the state entry
+     * for this bucket) */
+    auto bucket_entry_marker = get_lc_shard_name(optional_bucket->get_key());
+    auto index = get_lc_index(store->ctx(), bucket_entry_marker);
+    ret = process_bucket(index, max_secs, worker, bucket_entry_marker, once);
+    return ret;
+  } else {
+    /* generate an index-shard sequence unrelated to any other
+     * that might be running in parallel */
+    std::string all_buckets{""};
+    vector<int> shard_seq = random_sequence(max_objs);
+    for (auto index : shard_seq) {
+      ret = process(index, max_secs, worker, once);
+      if (ret < 0)
+       return ret;
+    }
   }
 
   return 0;
@@ -1925,14 +1970,96 @@ time_t RGWLC::thread_stop_at()
   return time(nullptr) + interval;
 }
 
+int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker,
+                         const std::string& bucket_entry_marker,
+                         bool once = false)
+{
+  ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
+         << "index: " << index << " worker ix: " << worker->ix
+         << dendl;
+
+  int ret = 0;
+  std::unique_ptr<rgw::sal::LCSerializer> serializer(
+    sal_lc->get_serializer(lc_index_lock_name, obj_names[index],
+                          std::string()));
+
+  rgw::sal::Lifecycle::LCEntry entry;
+  if (max_lock_secs <= 0) {
+    return -EAGAIN;
+  }
+
+  utime_t time(max_lock_secs, 0);
+  ret = serializer->try_lock(this, time, null_yield);
+  if (ret == -EBUSY || ret == -EEXIST) {
+    /* already locked by another lc processor */
+    ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
+                      << obj_names[index] << dendl;
+    return -EBUSY;
+  }
+  if (ret < 0)
+    return 0;
+
+  std::unique_lock<rgw::sal::LCSerializer> lock(
+    *(serializer.get()), std::adopt_lock);
+
+  if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
+    ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, entry);
+    if (ret >= 0) {
+      if (entry.status == lc_processing) {
+       if (expired_session(entry.start_time)) {
+         ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
+                            << " index: " << index << " worker ix: " << worker->ix
+                            << " (clearing)"
+                            << dendl;
+       } else {
+         ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
+                            << entry
+                            << " index: " << index
+                            << " worker ix: " << worker->ix
+                            << dendl;
+         return ret;
+       }
+      }
+    }
+  }
+
+  /* do nothing if no bucket */
+  if (entry.bucket.empty()) {
+    return ret;
+  }
+
+  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
+                    << " index: " << index << " worker ix: " << worker->ix
+                    << dendl;
+
+  entry.status = lc_processing;
+  ret = sal_lc->set_entry(obj_names[index], entry);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
+                      << obj_names[index] << entry.bucket << entry.status
+                      << dendl;
+    return ret;
+  }
+
+  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
+                    << " index: " << index << " worker ix: " << worker->ix
+                    << dendl;
+
+  lock.unlock();
+  ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
+  bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+
+  return ret;
+} /* RGWLC::process_bucket */
+
 int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
-                  const std::string& bucket_name, bool once = false)
+                  bool once = false)
 {
   ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
          << "index: " << index << " worker ix: " << worker->ix
          << dendl;
 
-  std::string bucket_prefix = fmt::format(":{}:", bucket_name);
+  int ret = 0;
   rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
                                                        obj_names[index],
                                                        std::string());
@@ -1944,14 +2071,13 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
       return -EAGAIN;
 
     utime_t time(max_lock_secs, 0);
-
-    int ret = lock->try_lock(this, time, null_yield);
+    ret = lock->try_lock(this, time, null_yield);
     if (ret == -EBUSY || ret == -EEXIST) {
       /* already locked by another lc processor */
       ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
           << obj_names[index] << ", sleep 5, try again" << dendl;
       sleep(5);
-      continue;
+      continue; // XXXX really retry forever?
     }
     if (ret < 0)
       return 0;
@@ -1997,7 +2123,6 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
       }
     }
 
-next_bucket:          
     ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
     if (ret < 0) {
       ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
@@ -2009,14 +2134,6 @@ next_bucket:
     if (entry.bucket.empty())
       goto exit;
 
-    /* skip over bucket if processing for only a single bucket was
-     * requested, and this one isn't it */
-    if (! bucket_name.empty()) {
-      if (! boost::algorithm::starts_with(entry.bucket, bucket_prefix)) {
-       goto next_bucket;
-      }
-    }
-
     ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
            << " index: " << index << " worker ix: " << worker->ix
            << dendl;
@@ -2166,26 +2283,6 @@ void RGWLifecycleConfiguration::generate_test_instances(
   o.push_back(new RGWLifecycleConfiguration);
 }
 
-static inline void get_lc_oid(CephContext *cct,
-                             const string& shard_id, string *oid)
-{
-  int max_objs =
-    (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
-     cct->_conf->rgw_lc_max_objs);
-  /* n.b. review hash algo */
-  int index = ceph_str_hash_linux(shard_id.c_str(),
-                                 shard_id.size()) % HASH_PRIME % max_objs;
-  *oid = lc_oid_prefix;
-  char buf[32];
-  snprintf(buf, 32, ".%d", index);
-  oid->append(buf);
-  return;
-}
-
-static std::string get_lc_shard_name(const rgw_bucket& bucket){
-  return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
-}
-
 template<typename F>
 static int guard_lc_modify(const DoutPrefixProvider *dpp,
                            rgw::sal::Store* store,
index 2abea2c3ab9439b8a5e3c2a219aefff0a12e0422..f79dd23ae7c565c3a16c04d56f4cffdacfc129e2 100644 (file)
@@ -515,9 +515,12 @@ public:
   void initialize(CephContext *_cct, rgw::sal::Store* _store);
   void finalize();
 
-  int process(LCWorker* worker, const std::string& bucket_name, bool once);
-  int process(int index, int max_secs, LCWorker* worker,
-             const std::string& bucket_name, bool once);
+  int process(LCWorker* worker,
+             const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
+             bool once);
+  int process(int index, int max_lock_secs, LCWorker* worker, bool once);
+  int process_bucket(int index, int max_lock_secs, LCWorker* worker,
+                    const std::string& bucket_entry_marker, bool once);
   bool if_already_run_today(time_t start_date);
   bool expired_session(time_t started);
   time_t thread_stop_at();
index 0b1d705fe7e9aef9d4cbca38da8f4ac2b07295a6..d8eb2d05a565567fae22bdc2a342fd9d8fe8845b 100644 (file)
@@ -8336,12 +8336,12 @@ int RGWRados::list_lc_progress(string& marker, uint32_t max_entries,
   return lc->list_lc_progress(marker, max_entries, progress_map, index);
 }
 
-int RGWRados::process_lc(const std::string& bucket_name)
+int RGWRados::process_lc(const std::unique_ptr<rgw::sal::Bucket>& optional_bucket)
 {
   RGWLC lc;
   lc.initialize(cct, this->store);
   RGWLC::LCWorker worker(&lc, cct, &lc, 0);
-  auto ret = lc.process(&worker, bucket_name, true /* once */);
+  auto ret = lc.process(&worker, optional_bucket, true /* once */);
   lc.stop_processor(); // sets down_flag, but returns immediately
   return ret;
 }
index 5226b053ccc5e2239435ba6b291eaad6d24787e4..f4f5658353092706e858c07bfd34ed86a2b6692e 100644 (file)
@@ -1450,7 +1450,7 @@ public:
   bool process_expire_objects(const DoutPrefixProvider *dpp);
   int defer_gc(const DoutPrefixProvider *dpp, void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y);
 
-  int process_lc(const std::string& bucket_name);
+  int process_lc(const std::unique_ptr<rgw::sal::Bucket>& optional_bucket);
   int list_lc_progress(std::string& marker, uint32_t max_entries,
                       std::vector<rgw::sal::Lifecycle::LCEntry>& progress_map, int& index);