vector<rgw_bucket_dir_entry> objs;
vector<rgw_bucket_dir_entry>::iterator obj_iter;
rgw_bucket_dir_entry pre_obj;
+ int64_t delay_ms;
public:
LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) :
target(store, bucket_info), list_op(&target) {
list_op.params.list_versions = bucket_info.versioned();
list_op.params.allow_unordered = true;
+ delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
}
void set_prefix(const string& p) {
return 0;
}
+ void delay() {
+ std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+ }
+
bool get_obj(rgw_bucket_dir_entry *obj) {
if (obj_iter == objs.end()) {
+ delay();
return false;
}
if (is_truncated && (obj_iter + 1)==objs.end()) {
} else {
obj_iter = objs.begin();
}
+ delay();
}
*obj = *obj_iter;
return true;
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
map<string, bufferlist> bucket_attrs;
- string next_marker, no_ns, list_versions;
+ string no_ns, list_versions;
vector<rgw_bucket_dir_entry> objs;
auto obj_ctx = store->svc.sysobj->init_obj_ctx();
vector<std::string> result;
- auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
boost::split(result, shard_id, boost::is_any_of(":"));
string bucket_tenant = result[0];
string bucket_name = result[1];
}
map<string, lc_op>& prefix_map = config.get_prefix_map();
- if (!bucket_info.versioned()) {
- for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
- if (!prefix_iter->second.status ||
- (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
- continue;
- }
- if (prefix_iter->second.expiration_date != boost::none &&
- ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
- continue;
- }
- /* lifecycle processing does not depend on total order, so can
- * take advantage of unorderd listing optimizations--such as
- * operating on one shard at a time */
- ol.set_prefix(prefix_iter->first);
+ rgw_obj_key pre_marker;
+ rgw_obj_key next_marker;
+ for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
+ auto& op = prefix_iter->second;
+ if (!is_valid_op(op)) {
+ continue;
+ }
+ if (prefix_iter != prefix_map.begin() &&
+ (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
+ next_marker = pre_marker;
+ } else {
+ pre_marker = next_marker;
+ }
+ ol.set_prefix(prefix_iter->first);
- ret = ol.init();
+ ret = ol.init();
- if (ret < 0) {
- if (ret == (-ENOENT))
- return 0;
- ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
- return ret;
- }
+ if (ret < 0) {
+ if (ret == (-ENOENT))
+ return 0;
+ ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
+ return ret;
+ }
- bool is_expired;
- rgw_bucket_dir_entry o;
- for (; ol.get_obj(&o); ol.next()) {
- rgw_obj_key key(o.key);
- RGWObjState *state;
- rgw_obj obj(bucket_info.bucket, key);
- RGWObjectCtx rctx(store);
- bool skip;
- auto& op = prefix_iter->second;
+ ceph::real_time mtime;
+ bool remove_indeed = true;
+ int expiration;
+ bool skip_expiration_check;
+ bool is_expired;
+ rgw_bucket_dir_entry o;
+ for (; ol.get_obj(&o); ol.next()) {
+ skip_expiration_check = false;
+ is_expired = false;
- if (!key.ns.empty()) {
- continue;
- }
+ RGWObjectCtx rctx(store);
+ rgw_obj obj(bucket_info.bucket, o.key);
+ if (!o.is_delete_marker()) {
+ bool skip;
int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
if (ret < 0) {
return ret;
}
+
if (skip) {
continue;
}
+ }
- if (op.expiration_date != boost::none) {
- //we have checked it before
- is_expired = true;
- } else {
- is_expired = obj_has_expired(o.meta.mtime, op.expiration);
+ if (o.is_current()) {
+ if (op.expiration <= 0 &&
+ op.expiration_date == boost::none
+ && !op.dm_expiration) {
+ continue;
}
- if (is_expired) {
- int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
- if (ret < 0) {
- return ret;
- }
- if (state->mtime != o.meta.mtime) {
- //Check mtime again to avoid delete a recently update object as much as possible
- ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << o.meta.mtime << dendl;
+ if (o.is_delete_marker()) {
+ if (ol.next_has_same_name()) {
continue;
}
- ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
- if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
- } else {
- ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl;
- }
-
- if (going_down())
- return 0;
+ skip_expiration_check = op.dm_expiration;
+ remove_indeed = true; //we should remove the delete marker if it's the only version
+ } else {
+ remove_indeed = !bucket_info.versioned();
}
- } /* for objs */
- std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
- }
- } else {
- //bucket versioning is enabled or suspended
- rgw_obj_key pre_marker;
- rgw_obj_key next_marker;
- for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
- auto& op = prefix_iter->second;
- if (!is_valid_op(op)) {
- continue;
- }
- if (prefix_iter != prefix_map.begin() &&
- (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
- next_marker = pre_marker;
- } else {
- pre_marker = next_marker;
- }
- ol.set_prefix(prefix_iter->first);
-
- ret = ol.init();
-
- if (ret < 0) {
- if (ret == (-ENOENT))
- return 0;
- ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
- return ret;
- }
-
- ceph::real_time mtime;
- bool remove_indeed = true;
- int expiration;
- bool skip_expiration_check;
- bool is_expired;
- rgw_bucket_dir_entry o;
- for (; ol.get_obj(&o); ol.next()) {
- skip_expiration_check = false;
- is_expired = false;
- if (o.is_current()) {
- if (op.expiration <= 0 &&
- op.expiration_date == boost::none
- && !op.dm_expiration) {
- continue;
- }
- if (o.is_delete_marker()) {
- if (ol.next_has_same_name()) {
+ mtime = o.meta.mtime;
+ expiration = op.expiration;
+ if (!skip_expiration_check) {
+ if (expiration <= 0) {
+ if (op.expiration_date == boost::none) {
continue;
}
- skip_expiration_check = op.dm_expiration;
- remove_indeed = true; //we should remove the delete marker if it's the only version
+ is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
} else {
- remove_indeed = false;
- }
- mtime = o.meta.mtime;
- expiration = op.expiration;
- if (!skip_expiration_check) {
- if (expiration <= 0) {
- if (op.expiration_date == boost::none) {
- continue;
- }
- is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
- } else {
- is_expired = obj_has_expired(mtime, expiration);
- }
- }
- } else { /* a noncurrent obj */
- if (op.noncur_expiration <= 0) {
- continue;
+ is_expired = obj_has_expired(mtime, expiration);
}
- remove_indeed = true;
- mtime = ol.get_prev_obj().meta.mtime;
- expiration = op.noncur_expiration;
- is_expired = obj_has_expired(mtime, expiration);
}
- if (skip_expiration_check || is_expired) {
- if (o.is_visible()) {
- RGWObjectCtx rctx(store);
- rgw_obj obj(bucket_info.bucket, o.key);
- RGWObjState *state;
- int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
- if (ret < 0) {
- return ret;
- }
- if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
- continue;
- }
- ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
+ } else { /* a noncurrent obj */
+ if (op.noncur_expiration <= 0) {
+ continue;
+ }
+ remove_indeed = true;
+ mtime = ol.get_prev_obj().meta.mtime;
+ expiration = op.noncur_expiration;
+ is_expired = obj_has_expired(mtime, expiration);
+ }
+ if (skip_expiration_check || is_expired) {
+ if (o.is_visible()) {
+ RGWObjState *state;
+ int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
- } else {
- ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
+ return ret;
}
-
- if (going_down())
- return 0;
+ if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
+ continue;
}
+ ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
+ } else {
+ ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
+ }
+
+ if (going_down())
+ return 0;
}
}
}