From af11694ad1177a20a896dcba9425697f155aff15 Mon Sep 17 00:00:00 2001 From: Matt Benjamin Date: Tue, 30 Nov 2021 12:42:33 -0500 Subject: [PATCH] rgwlc: optimize single-bucket lifecycle processing Looks up the shard index of the corresponding bucket, and only buckets in the corresponding shard are considered for processing. This has a side effect of matching buckets by id, and also adds support for --tenant. Signed-off-by: Matt Benjamin --- doc/man/8/radosgw-admin.rst | 5 +- src/rgw/rgw_admin.cc | 14 ++- src/rgw/rgw_lc.cc | 185 +++++++++++++++++++++++++++--------- src/rgw/rgw_lc.h | 9 +- src/rgw/rgw_rados.cc | 4 +- src/rgw/rgw_rados.h | 2 +- 6 files changed, 165 insertions(+), 54 deletions(-) diff --git a/doc/man/8/radosgw-admin.rst b/doc/man/8/radosgw-admin.rst index 80d9a3468e6..57d7fcfc902 100644 --- a/doc/man/8/radosgw-admin.rst +++ b/doc/man/8/radosgw-admin.rst @@ -365,8 +365,9 @@ which are as follows: List all bucket lifecycle progress. :command:`lc process` - Manually process lifecycle. If bucket specified with --bucket=<bucket>, - only <bucket> is processed. + Manually process lifecycle. If a bucket is specified (e.g., via + --bucket_id or via --bucket and optional --tenant), only that bucket + is processed. :command:`metadata get` Get metadata info. diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index afa8fc8535c..1721bb03003 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -7549,14 +7549,24 @@ next: } if (opt_cmd == OPT::LC_PROCESS) { - int ret = static_cast(store)->getRados()->process_lc(bucket_name); + if ((! bucket_name.empty()) || + (! 
bucket_id.empty())) { + int ret = init_bucket(nullptr, tenant, bucket_name, bucket_id, &bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) + << std::endl; + return ret; + } + } + + int ret = + static_cast(store)->getRados()->process_lc(bucket); if (ret < 0) { cerr << "ERROR: lc processing returned error: " << cpp_strerror(-ret) << std::endl; return 1; } } - if (opt_cmd == OPT::LC_RESHARD_FIX) { ret = RGWBucketAdminOp::fix_lc_shards(store, bucket_op, stream_flusher, dpp()); if (ret < 0) { diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 90a7fe9046f..1556124be4a 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -214,11 +214,11 @@ bool RGWLifecycleConfiguration::valid() void *RGWLC::LCWorker::entry() { do { - std::string bucket_name{""}; // empty restriction, all buckets + std::unique_ptr all_buckets; // empty restriction utime_t start = ceph_clock_now(); if (should_work(start)) { ldpp_dout(dpp, 2) << "life cycle: start" << dendl; - int r = lc->process(this, bucket_name, false /* once */); + int r = lc->process(this, all_buckets, false /* once */); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl; @@ -1813,6 +1813,9 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl; if (result == -ENOENT) { + /* XXXX are we SURE the only way result could == ENOENT is when + * there is no such bucket? It is currently the value returned + * from bucket_lc_process(...) 
*/ ret = sal_lc->rm_entry(obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry " @@ -1882,18 +1885,60 @@ static inline vector random_sequence(uint32_t n) return v; } -int RGWLC::process(LCWorker* worker, const std::string& bucket_name, +static inline int get_lc_index(CephContext *cct, + const std::string& shard_id) +{ + int max_objs = + (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : + cct->_conf->rgw_lc_max_objs); + /* n.b. review hash algo */ + int index = ceph_str_hash_linux(shard_id.c_str(), + shard_id.size()) % HASH_PRIME % max_objs; + return index; +} + +static inline void get_lc_oid(CephContext *cct, + const std::string& shard_id, string *oid) +{ + /* n.b. review hash algo */ + int index = get_lc_index(cct, shard_id); + *oid = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", index); + oid->append(buf); + return; +} + +static std::string get_lc_shard_name(const rgw_bucket& bucket){ + return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker); +} + +int RGWLC::process(LCWorker* worker, + const std::unique_ptr& optional_bucket, bool once = false) { + int ret = 0; int max_secs = cct->_conf->rgw_lc_lock_max_time; - /* generate an index-shard sequence unrelated to any other - * that might be running in parallel */ - vector shard_seq = random_sequence(max_objs); - for (auto index : shard_seq) { - int ret = process(index, max_secs, worker, bucket_name, once); - if (ret < 0) - return ret; + if (optional_bucket) { + /* if a bucket is provided, this is a single-bucket run, and + * can be processed without traversing any state entries (we + * do need the entry {pro,epi}logue which update the state entry + * for this bucket) */ + auto bucket_entry_marker = get_lc_shard_name(optional_bucket->get_key()); + auto index = get_lc_index(store->ctx(), bucket_entry_marker); + ret = process_bucket(index, max_secs, worker, bucket_entry_marker, once); + return ret; + } else { + /* generate an 
index-shard sequence unrelated to any other + * that might be running in parallel */ + std::string all_buckets{""}; + vector shard_seq = random_sequence(max_objs); + for (auto index : shard_seq) { + ret = process(index, max_secs, worker, once); + if (ret < 0) + return ret; + } } return 0; @@ -1925,14 +1970,96 @@ time_t RGWLC::thread_stop_at() return time(nullptr) + interval; } +int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker, + const std::string& bucket_entry_marker, + bool once = false) +{ + ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: " + << "index: " << index << " worker ix: " << worker->ix + << dendl; + + int ret = 0; + std::unique_ptr serializer( + sal_lc->get_serializer(lc_index_lock_name, obj_names[index], + std::string())); + + rgw::sal::Lifecycle::LCEntry entry; + if (max_lock_secs <= 0) { + return -EAGAIN; + } + + utime_t time(max_lock_secs, 0); + ret = serializer->try_lock(this, time, null_yield); + if (ret == -EBUSY || ret == -EEXIST) { + /* already locked by another lc processor */ + ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " + << obj_names[index] << dendl; + return -EBUSY; + } + if (ret < 0) + return 0; + + std::unique_lock lock( + *(serializer.get()), std::adopt_lock); + + if (! 
(cct->_conf->rgw_lc_lock_max_time == 9969)) { + ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, entry); + if (ret >= 0) { + if (entry.status == lc_processing) { + if (expired_session(entry.start_time)) { + ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry + << " index: " << index << " worker ix: " << worker->ix + << " (clearing)" + << dendl; + } else { + ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: " + << entry + << " index: " << index + << " worker ix: " << worker->ix + << dendl; + return ret; + } + } + } + } + + /* do nothing if no bucket */ + if (entry.bucket.empty()) { + return ret; + } + + ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + + entry.status = lc_processing; + ret = sal_lc->set_entry(obj_names[index], entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry " + << obj_names[index] << entry.bucket << entry.status + << dendl; + return ret; + } + + ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + + lock.unlock(); + ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once); + bucket_lc_post(index, max_lock_secs, entry, ret, worker); + + return ret; +} /* RGWLC::process_bucket */ + int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, - const std::string& bucket_name, bool once = false) + bool once = false) { ldpp_dout(this, 5) << "RGWLC::process(): ENTER: " << "index: " << index << " worker ix: " << worker->ix << dendl; - std::string bucket_prefix = fmt::format(":{}:", bucket_name); + int ret = 0; rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name, obj_names[index], std::string()); @@ -1944,14 +2071,13 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, return -EAGAIN; utime_t 
time(max_lock_secs, 0); - - int ret = lock->try_lock(this, time, null_yield); + ret = lock->try_lock(this, time, null_yield); if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " << obj_names[index] << ", sleep 5, try again" << dendl; sleep(5); - continue; + continue; // XXXX really retry forever? } if (ret < 0) return 0; @@ -1997,7 +2123,6 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, } } -next_bucket: ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " @@ -2009,14 +2134,6 @@ next_bucket: if (entry.bucket.empty()) goto exit; - /* skip over bucket if processing for only a single bucket was - * requested, and this one isn't it */ - if (! bucket_name.empty()) { - if (! boost::algorithm::starts_with(entry.bucket, bucket_prefix)) { - goto next_bucket; - } - } - ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; @@ -2166,26 +2283,6 @@ void RGWLifecycleConfiguration::generate_test_instances( o.push_back(new RGWLifecycleConfiguration); } -static inline void get_lc_oid(CephContext *cct, - const string& shard_id, string *oid) -{ - int max_objs = - (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : - cct->_conf->rgw_lc_max_objs); - /* n.b. 
review hash algo */ - int index = ceph_str_hash_linux(shard_id.c_str(), - shard_id.size()) % HASH_PRIME % max_objs; - *oid = lc_oid_prefix; - char buf[32]; - snprintf(buf, 32, ".%d", index); - oid->append(buf); - return; -} - -static std::string get_lc_shard_name(const rgw_bucket& bucket){ - return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker); -} - template static int guard_lc_modify(const DoutPrefixProvider *dpp, rgw::sal::Store* store, diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index 2abea2c3ab9..f79dd23ae7c 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -515,9 +515,12 @@ public: void initialize(CephContext *_cct, rgw::sal::Store* _store); void finalize(); - int process(LCWorker* worker, const std::string& bucket_name, bool once); - int process(int index, int max_secs, LCWorker* worker, - const std::string& bucket_name, bool once); + int process(LCWorker* worker, + const std::unique_ptr& optional_bucket, + bool once); + int process(int index, int max_lock_secs, LCWorker* worker, bool once); + int process_bucket(int index, int max_lock_secs, LCWorker* worker, + const std::string& bucket_entry_marker, bool once); bool if_already_run_today(time_t start_date); bool expired_session(time_t started); time_t thread_stop_at(); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 0b1d705fe7e..d8eb2d05a56 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -8336,12 +8336,12 @@ int RGWRados::list_lc_progress(string& marker, uint32_t max_entries, return lc->list_lc_progress(marker, max_entries, progress_map, index); } -int RGWRados::process_lc(const std::string& bucket_name) +int RGWRados::process_lc(const std::unique_ptr& optional_bucket) { RGWLC lc; lc.initialize(cct, this->store); RGWLC::LCWorker worker(&lc, cct, &lc, 0); - auto ret = lc.process(&worker, bucket_name, true /* once */); + auto ret = lc.process(&worker, optional_bucket, true /* once */); lc.stop_processor(); // sets down_flag, but returns immediately 
return ret; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 5226b053ccc..f4f56583530 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1450,7 +1450,7 @@ public: bool process_expire_objects(const DoutPrefixProvider *dpp); int defer_gc(const DoutPrefixProvider *dpp, void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y); - int process_lc(const std::string& bucket_name); + int process_lc(const std::unique_ptr& optional_bucket); int list_lc_progress(std::string& marker, uint32_t max_entries, std::vector& progress_map, int& index); -- 2.39.5