void *RGWLC::LCWorker::entry() {
do {
- std::string bucket_name{""}; // empty restriction, all buckets
+ std::unique_ptr<rgw::sal::Bucket> all_buckets; // empty restriction
utime_t start = ceph_clock_now();
if (should_work(start)) {
ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
- int r = lc->process(this, bucket_name, false /* once */);
+ int r = lc->process(this, all_buckets, false /* once */);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
<< r << dendl;
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
<< dendl;
if (result == -ENOENT) {
+ /* XXXX are we SURE the only way result could == ENOENT is when
+ * there is no such bucket? It is currently the value returned
+ * from bucket_lc_process(...) */
ret = sal_lc->rm_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
return v;
}
-int RGWLC::process(LCWorker* worker, const std::string& bucket_name,
+static inline int get_lc_index(CephContext *cct,
+ const std::string& shard_id)
+{
+ int max_objs =
+ (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
+ cct->_conf->rgw_lc_max_objs);
+ /* n.b. review hash algo */
+ int index = ceph_str_hash_linux(shard_id.c_str(),
+ shard_id.size()) % HASH_PRIME % max_objs;
+ return index;
+}
+
+static inline void get_lc_oid(CephContext *cct,
+ const std::string& shard_id, string *oid)
+{
+ /* n.b. review hash algo */
+ int index = get_lc_index(cct, shard_id);
+ *oid = lc_oid_prefix;
+ char buf[32];
+ snprintf(buf, 32, ".%d", index);
+ oid->append(buf);
+ return;
+}
+
+static std::string get_lc_shard_name(const rgw_bucket& bucket){
+ return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
+}
+
+int RGWLC::process(LCWorker* worker,
+ const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
bool once = false)
{
+ int ret = 0;
int max_secs = cct->_conf->rgw_lc_lock_max_time;
- /* generate an index-shard sequence unrelated to any other
- * that might be running in parallel */
- vector<int> shard_seq = random_sequence(max_objs);
- for (auto index : shard_seq) {
- int ret = process(index, max_secs, worker, bucket_name, once);
- if (ret < 0)
- return ret;
+ if (optional_bucket) {
+ /* if a bucket is provided, this is a single-bucket run, and
+ * can be processed without traversing any state entries (we
+ * do need the entry {pro,epi}logue which update the state entry
+ * for this bucket) */
+ auto bucket_entry_marker = get_lc_shard_name(optional_bucket->get_key());
+ auto index = get_lc_index(store->ctx(), bucket_entry_marker);
+ ret = process_bucket(index, max_secs, worker, bucket_entry_marker, once);
+ return ret;
+ } else {
+ /* generate an index-shard sequence unrelated to any other
+ * that might be running in parallel */
+ std::string all_buckets{""};
+ vector<int> shard_seq = random_sequence(max_objs);
+ for (auto index : shard_seq) {
+ ret = process(index, max_secs, worker, once);
+ if (ret < 0)
+ return ret;
+ }
}
return 0;
return time(nullptr) + interval;
}
+int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker,
+ const std::string& bucket_entry_marker,
+ bool once = false)
+{
+ ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
+ << "index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
+ int ret = 0;
+ std::unique_ptr<rgw::sal::LCSerializer> serializer(
+ sal_lc->get_serializer(lc_index_lock_name, obj_names[index],
+ std::string()));
+
+ rgw::sal::Lifecycle::LCEntry entry;
+ if (max_lock_secs <= 0) {
+ return -EAGAIN;
+ }
+
+ utime_t time(max_lock_secs, 0);
+ ret = serializer->try_lock(this, time, null_yield);
+ if (ret == -EBUSY || ret == -EEXIST) {
+ /* already locked by another lc processor */
+ ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
+ << obj_names[index] << dendl;
+ return -EBUSY;
+ }
+ if (ret < 0)
+ return 0;
+
+ std::unique_lock<rgw::sal::LCSerializer> lock(
+ *(serializer.get()), std::adopt_lock);
+
+ if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
+ ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, entry);
+ if (ret >= 0) {
+ if (entry.status == lc_processing) {
+ if (expired_session(entry.start_time)) {
+ ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << " (clearing)"
+ << dendl;
+ } else {
+ ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
+ << entry
+ << " index: " << index
+ << " worker ix: " << worker->ix
+ << dendl;
+ return ret;
+ }
+ }
+ }
+ }
+
+ /* do nothing if no bucket */
+ if (entry.bucket.empty()) {
+ return ret;
+ }
+
+ ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
+ entry.status = lc_processing;
+ ret = sal_lc->set_entry(obj_names[index], entry);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
+ << obj_names[index] << entry.bucket << entry.status
+ << dendl;
+ return ret;
+ }
+
+ ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
+ lock.unlock();
+ ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
+ bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+
+ return ret;
+} /* RGWLC::process_bucket */
+
int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
- const std::string& bucket_name, bool once = false)
+ bool once = false)
{
ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
<< "index: " << index << " worker ix: " << worker->ix
<< dendl;
- std::string bucket_prefix = fmt::format(":{}:", bucket_name);
+ int ret = 0;
rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
obj_names[index],
std::string());
return -EAGAIN;
utime_t time(max_lock_secs, 0);
-
- int ret = lock->try_lock(this, time, null_yield);
+ ret = lock->try_lock(this, time, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
<< obj_names[index] << ", sleep 5, try again" << dendl;
sleep(5);
- continue;
+ continue; // XXXX really retry forever?
}
if (ret < 0)
return 0;
}
}
-next_bucket:
ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
if (entry.bucket.empty())
goto exit;
- /* skip over bucket if processing for only a single bucket was
- * requested, and this one isn't it */
- if (! bucket_name.empty()) {
- if (! boost::algorithm::starts_with(entry.bucket, bucket_prefix)) {
- goto next_bucket;
- }
- }
-
ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;
o.push_back(new RGWLifecycleConfiguration);
}
-static inline void get_lc_oid(CephContext *cct,
- const string& shard_id, string *oid)
-{
- int max_objs =
- (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
- cct->_conf->rgw_lc_max_objs);
- /* n.b. review hash algo */
- int index = ceph_str_hash_linux(shard_id.c_str(),
- shard_id.size()) % HASH_PRIME % max_objs;
- *oid = lc_oid_prefix;
- char buf[32];
- snprintf(buf, 32, ".%d", index);
- oid->append(buf);
- return;
-}
-
-static std::string get_lc_shard_name(const rgw_bucket& bucket){
- return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
-}
-
template<typename F>
static int guard_lc_modify(const DoutPrefixProvider *dpp,
rgw::sal::Store* store,