From: Yehuda Sadeh Date: Mon, 17 Dec 2018 15:00:36 +0000 (-0800) Subject: rgw: lifecycle: consolidate versioned and non-versioned operation X-Git-Tag: v14.1.0~314^2~30 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cad2c2abd81b27e76bea5ea4c5a56e60d4db513a;p=ceph.git rgw: lifecycle: consolidate versioned and non-versioned operation Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index e77efbb0a8ba..371ac075f75b 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -469,6 +469,7 @@ class LCObjsLister { vector objs; vector::iterator obj_iter; rgw_bucket_dir_entry pre_obj; + int64_t delay_ms; public: LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) : @@ -476,6 +477,7 @@ public: target(store, bucket_info), list_op(&target) { list_op.params.list_versions = bucket_info.versioned(); list_op.params.allow_unordered = true; + delay_ms = store->ctx()->_conf.get_val("rgw_lc_thread_delay"); } void set_prefix(const string& p) { @@ -497,8 +499,13 @@ public: return 0; } + void delay() { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } + bool get_obj(rgw_bucket_dir_entry *obj) { if (obj_iter == objs.end()) { + delay(); return false; } if (is_truncated && (obj_iter + 1)==objs.end()) { @@ -511,6 +518,7 @@ public: } else { obj_iter = objs.begin(); } + delay(); } *obj = *obj_iter; return true; @@ -541,11 +549,10 @@ int RGWLC::bucket_lc_process(string& shard_id) RGWLifecycleConfiguration config(cct); RGWBucketInfo bucket_info; map bucket_attrs; - string next_marker, no_ns, list_versions; + string no_ns, list_versions; vector objs; auto obj_ctx = store->svc.sysobj->init_obj_ctx(); vector result; - auto delay_ms = cct->_conf.get_val("rgw_lc_thread_delay"); boost::split(result, shard_id, boost::is_any_of(":")); string bucket_tenant = result[0]; string bucket_name = result[1]; @@ -578,174 +585,110 @@ int RGWLC::bucket_lc_process(string& shard_id) } map& prefix_map = config.get_prefix_map(); - if (!bucket_info.versioned()) { - for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { - if (!prefix_iter->second.status || - (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) { - continue; - } - if (prefix_iter->second.expiration_date != boost::none && - ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) { - continue; - } - /* lifecycle processing does not depend on total order, so can - * take advantage of unorderd listing optimizations--such as - * operating on one shard at a time */ - ol.set_prefix(prefix_iter->first); + rgw_obj_key pre_marker; + rgw_obj_key next_marker; + for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + auto& op = prefix_iter->second; + if (!is_valid_op(op)) { + continue; + } + if (prefix_iter != prefix_map.begin() && + (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) { + next_marker = pre_marker; + } else { + pre_marker = next_marker; + } + ol.set_prefix(prefix_iter->first); - ret = ol.init(); + ret = ol.init(); - if (ret < 0) { - if (ret == (-ENOENT)) - return 0; - ldpp_dout(this, 0) << "ERROR: store->list_objects():" <list_objects():" <second; + ceph::real_time mtime; + bool remove_indeed = true; + int expiration; + bool skip_expiration_check; + bool is_expired; + rgw_bucket_dir_entry o; + for (; ol.get_obj(&o); ol.next()) { + skip_expiration_check = false; + is_expired = false; - if (!key.ns.empty()) { - continue; - } + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, o.key); + if (!o.is_delete_marker()) { + bool skip; int ret = check_tags(store, rctx, bucket_info, obj, op, &skip); if (ret < 0) { return ret; } + if (skip) { continue; } + } - if (op.expiration_date != boost::none) { - //we have checked it before - is_expired = true; - } else { - is_expired = obj_has_expired(o.meta.mtime, op.expiration); + if (o.is_current()) { + if (op.expiration <= 0 && + op.expiration_date == boost::none + && !op.dm_expiration) { + continue; } - if (is_expired) { - int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); - if (ret < 0) { - return ret; - } - if (state->mtime != o.meta.mtime) { - //Check mtime again to avoid delete a recently update object as much as possible - ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << o.meta.mtime << dendl; + if (o.is_delete_marker()) { + if (ol.next_has_same_name()) { continue; } - ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true); - if (ret < 0) { - ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl; - } else { - ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl; - } - - if (going_down()) - return 0; + skip_expiration_check = op.dm_expiration; + remove_indeed = true; //we should remove the delete marker if it's the only version + } else { + remove_indeed = !bucket_info.versioned(); } - } /* for objs */ - std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); - } - } else { - //bucket versioning is enabled or suspended - rgw_obj_key pre_marker; - rgw_obj_key next_marker; - for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { - auto& op = prefix_iter->second; - if (!is_valid_op(op)) { - continue; - } - if (prefix_iter != prefix_map.begin() && - (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) { - next_marker = pre_marker; - } else { - pre_marker = next_marker; - } - ol.set_prefix(prefix_iter->first); - - ret = ol.init(); - - if (ret < 0) { - if (ret == (-ENOENT)) - return 0; - ldpp_dout(this, 0) << "ERROR: store->list_objects():" <= ceph::real_clock::to_time_t(*op.expiration_date); } else { - remove_indeed = false; - } - mtime = o.meta.mtime; - expiration = op.expiration; - if (!skip_expiration_check) { - if (expiration <= 0) { - if (op.expiration_date == boost::none) { - continue; - } - is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date); - } else { - is_expired = obj_has_expired(mtime, expiration); - } - } - } else { /* a noncurrent obj */ - if (op.noncur_expiration <= 0) { - continue; + is_expired = obj_has_expired(mtime, expiration); } - remove_indeed = true; - mtime = ol.get_prev_obj().meta.mtime; - expiration = op.noncur_expiration; - is_expired = obj_has_expired(mtime, expiration); } - if (skip_expiration_check || is_expired) { - if (o.is_visible()) { - RGWObjectCtx rctx(store); - rgw_obj obj(bucket_info.bucket, o.key); - RGWObjState *state; - int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); - if (ret < 0) { - return ret; - } - if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible - continue; - } - ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed); + } else { /* a noncurrent obj */ + if (op.noncur_expiration <= 0) { + continue; + } + remove_indeed = true; + mtime = ol.get_prev_obj().meta.mtime; + expiration = op.noncur_expiration; + is_expired = obj_has_expired(mtime, expiration); + } + if (skip_expiration_check || is_expired) { + if (o.is_visible()) { + RGWObjState *state; + int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); if (ret < 0) { - ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl; - } else { - ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl; + return ret; } - - if (going_down()) - return 0; + if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible + continue; } + ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed); + if (ret < 0) { + ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl; + } else { + ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl; + } + + if (going_down()) + return 0; } } }