|| op.dm_expiration));
}
+class LCObjsLister {
+ RGWRados *store;
+ RGWBucketInfo& bucket_info;
+ RGWRados::Bucket target;
+ RGWRados::Bucket::List list_op;
+ bool is_truncated{false};
+ rgw_obj_key next_marker;
+ string prefix;
+ vector<rgw_bucket_dir_entry> objs;
+ vector<rgw_bucket_dir_entry>::iterator obj_iter;
+ rgw_bucket_dir_entry pre_obj;
+
+public:
+ LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) :
+ store(_store), bucket_info(_bucket_info),
+ target(store, bucket_info), list_op(&target) {
+ list_op.params.list_versions = bucket_info.versioned();
+ list_op.params.allow_unordered = true;
+ }
+
+ void set_prefix(const string& p) {
+ prefix = p;
+ }
+
+ int init() {
+ return fetch();
+ }
+
+ int fetch() {
+ int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
+ if (ret < 0) {
+ return ret;
+ }
+
+ obj_iter = objs.begin();
+
+ return 0;
+ }
+
+ bool get_obj(rgw_bucket_dir_entry *obj) {
+ if (obj_iter == objs.end()) {
+ return false;
+ }
+ if (is_truncated && (obj_iter + 1)==objs.end()) {
+ list_op.params.marker = obj_iter->key;
+
+ int ret = fetch();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl;
+ return ret;
+ } else {
+ obj_iter = objs.begin();
+ }
+ }
+ *obj = *obj_iter;
+ return true;
+ }
+
+ rgw_bucket_dir_entry get_prev_obj() {
+ return pre_obj;
+ }
+
+ void next() {
+ pre_obj = *obj_iter;
+ ++obj_iter;
+ }
+
+ bool next_has_same_name()
+ {
+ if ((obj_iter + 1) == objs.end()) {
+ /* this should have been called after get_obj() was called, so this should
+ * only happen if is_truncated is false */
+ return false;
+ }
+ return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
+ }
+};
+
int RGWLC::bucket_lc_process(string& shard_id)
{
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
map<string, bufferlist> bucket_attrs;
string next_marker, no_ns, list_versions;
- bool is_truncated;
vector<rgw_bucket_dir_entry> objs;
auto obj_ctx = store->svc.sysobj->init_obj_ctx();
vector<std::string> result;
}
RGWRados::Bucket target(store, bucket_info);
- RGWRados::Bucket::List list_op(&target);
+ LCObjsLister ol(store, bucket_info);
map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
if (aiter == bucket_attrs.end())
}
map<string, lc_op>& prefix_map = config.get_prefix_map();
- list_op.params.list_versions = bucket_info.versioned();
if (!bucket_info.versioned()) {
for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (!prefix_iter->second.status ||
/* lifecycle processing does not depend on total order, so can
* take advantage of unorderd listing optimizations--such as
* operating on one shard at a time */
- list_op.params.prefix = prefix_iter->first;
- list_op.params.allow_unordered = true;
- do {
- objs.clear();
- list_op.params.marker = list_op.get_next_marker();
- ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
+ ol.set_prefix(prefix_iter->first);
+
+ ret = ol.init();
+ if (ret < 0) {
+ if (ret == (-ENOENT))
+ return 0;
+ ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
+ return ret;
+ }
+
+ bool is_expired;
+ rgw_bucket_dir_entry o;
+ for (; ol.get_obj(&o); ol.next()) {
+ rgw_obj_key key(o.key);
+ RGWObjState *state;
+ rgw_obj obj(bucket_info.bucket, key);
+ RGWObjectCtx rctx(store);
+ bool skip;
+ auto& op = prefix_iter->second;
+
+ if (!key.ns.empty()) {
+ continue;
+ }
+
+ int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
if (ret < 0) {
- if (ret == (-ENOENT))
- return 0;
- ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
return ret;
}
-
- bool is_expired;
- for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
- rgw_obj_key key(obj_iter->key);
- RGWObjState *state;
- rgw_obj obj(bucket_info.bucket, key);
- RGWObjectCtx rctx(store);
- bool skip;
- auto& op = prefix_iter->second;
-
- if (!key.ns.empty()) {
- continue;
- }
+ if (skip) {
+ continue;
+ }
- int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
+ if (op.expiration_date != boost::none) {
+ //we have checked it before
+ is_expired = true;
+ } else {
+ is_expired = obj_has_expired(o.meta.mtime, op.expiration);
+ }
+ if (is_expired) {
+ int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
if (ret < 0) {
return ret;
}
- if (skip) {
+ if (state->mtime != o.meta.mtime) {
+ //Check mtime again to avoid delete a recently update object as much as possible
+ ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << o.meta.mtime << dendl;
continue;
}
-
- if (op.expiration_date != boost::none) {
- //we have checked it before
- is_expired = true;
+ ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
} else {
- is_expired = obj_has_expired(obj_iter->meta.mtime, op.expiration);
+ ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl;
}
- if (is_expired) {
- int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
- if (ret < 0) {
- return ret;
- }
- if (state->mtime != obj_iter->meta.mtime) {
- //Check mtime again to avoid delete a recently update object as much as possible
- ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << obj_iter->meta.mtime << dendl;
- continue;
- }
- ret = remove_expired_obj(bucket_info, obj_iter->key, obj_iter->meta.owner, obj_iter->meta.owner_display_name, true);
- if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
- } else {
- ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl;
- }
- if (going_down())
- return 0;
- }
- } /* for objs */
- std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
- } while (is_truncated);
+ if (going_down())
+ return 0;
+ }
+ } /* for objs */
+ std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
}
} else {
//bucket versioning is enabled or suspended
(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
next_marker = pre_marker;
} else {
- pre_marker = next_marker;
+ pre_marker = next_marker;
}
- list_op.params.prefix = prefix_iter->first;
- rgw_bucket_dir_entry pre_obj;
- do {
- if (!objs.empty()) {
- pre_obj = objs.back();
- }
- objs.clear();
- list_op.params.marker = next_marker;
- ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
+ ol.set_prefix(prefix_iter->first);
- if (ret < 0) {
- if (ret == (-ENOENT))
- return 0;
- ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
- return ret;
- }
+ ret = ol.init();
+
+ if (ret < 0) {
+ if (ret == (-ENOENT))
+ return 0;
+ ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
+ return ret;
+ }
- next_marker = list_op.get_next_marker();
-
- ceph::real_time mtime;
- bool remove_indeed = true;
- int expiration;
- bool skip_expiration_check;
- bool is_expired;
- for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
- skip_expiration_check = false;
- is_expired = false;
- if (obj_iter->is_current()) {
- if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
- && !prefix_iter->second.dm_expiration) {
+ ceph::real_time mtime;
+ bool remove_indeed = true;
+ int expiration;
+ bool skip_expiration_check;
+ bool is_expired;
+ rgw_bucket_dir_entry o;
+ for (; ol.get_obj(&o); ol.next()) {
+ skip_expiration_check = false;
+ is_expired = false;
+ if (o.is_current()) {
+ if (op.expiration <= 0 &&
+ op.expiration_date == boost::none
+ && !op.dm_expiration) {
+ continue;
+ }
+ if (o.is_delete_marker()) {
+ if (ol.next_has_same_name()) {
continue;
}
- if (obj_iter->is_delete_marker()) {
- if ((obj_iter + 1)==objs.end()) {
- if (is_truncated) {
- //deal with it in next round because we can't judge whether this marker is the only version
- next_marker = obj_iter->key;
- break;
- }
- } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
+ skip_expiration_check = op.dm_expiration;
+ remove_indeed = true; //we should remove the delete marker if it's the only version
+ } else {
+ remove_indeed = false;
+ }
+ mtime = o.meta.mtime;
+ expiration = op.expiration;
+ if (!skip_expiration_check) {
+ if (expiration <= 0) {
+ if (op.expiration_date == boost::none) {
continue;
}
- skip_expiration_check = prefix_iter->second.dm_expiration;
- remove_indeed = true; //we should remove the delete marker if it's the only version
+ is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
} else {
- remove_indeed = false;
- }
- mtime = obj_iter->meta.mtime;
- expiration = prefix_iter->second.expiration;
- if (!skip_expiration_check) {
- if (expiration <= 0) {
- if (prefix_iter->second.expiration_date == boost::none) {
- continue;
- }
- is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
- } else {
- is_expired = obj_has_expired(mtime, expiration);
- }
- }
- } else {
- if (prefix_iter->second.noncur_expiration <=0) {
- continue;
+ is_expired = obj_has_expired(mtime, expiration);
}
- remove_indeed = true;
- mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
- expiration = prefix_iter->second.noncur_expiration;
- is_expired = obj_has_expired(mtime, expiration);
}
- if (skip_expiration_check || is_expired) {
- if (obj_iter->is_visible()) {
- RGWObjectCtx rctx(store);
- rgw_obj obj(bucket_info.bucket, obj_iter->key);
- RGWObjState *state;
- int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
- if (ret < 0) {
- return ret;
- }
- if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
- continue;
- }
- ret = remove_expired_obj(bucket_info, obj_iter->key, obj_iter->meta.owner, obj_iter->meta.owner_display_name, remove_indeed);
+ } else { /* a noncurrent obj */
+ if (op.noncur_expiration <= 0) {
+ continue;
+ }
+ remove_indeed = true;
+ mtime = ol.get_prev_obj().meta.mtime;
+ expiration = op.noncur_expiration;
+ is_expired = obj_has_expired(mtime, expiration);
+ }
+ if (skip_expiration_check || is_expired) {
+ if (o.is_visible()) {
+ RGWObjectCtx rctx(store);
+ rgw_obj obj(bucket_info.bucket, o.key);
+ RGWObjState *state;
+ int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
- } else {
- ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
+ return ret;
}
-
- if (going_down())
- return 0;
+ if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
+ continue;
}
+ ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
+ } else {
+ ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
+ }
+
+ if (going_down())
+ return 0;
}
- } while (is_truncated);
+ }
}
}