From: Yehuda Sadeh Date: Mon, 17 Dec 2018 14:17:02 +0000 (-0800) Subject: rgw: lifecycle: rework listing iteration X-Git-Tag: v14.1.0~314^2~31 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a0fa9f5efc22bd312748fd521b04c302090bf46d;p=ceph.git rgw: lifecycle: rework listing iteration Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index cf363e06fbbb..e77efbb0a8ba 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -458,13 +458,90 @@ static bool is_valid_op(const lc_op& op) || op.dm_expiration)); } +class LCObjsLister { + RGWRados *store; + RGWBucketInfo& bucket_info; + RGWRados::Bucket target; + RGWRados::Bucket::List list_op; + bool is_truncated{false}; + rgw_obj_key next_marker; + string prefix; + vector objs; + vector::iterator obj_iter; + rgw_bucket_dir_entry pre_obj; + +public: + LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) : + store(_store), bucket_info(_bucket_info), + target(store, bucket_info), list_op(&target) { + list_op.params.list_versions = bucket_info.versioned(); + list_op.params.allow_unordered = true; + } + + void set_prefix(const string& p) { + prefix = p; + } + + int init() { + return fetch(); + } + + int fetch() { + int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + if (ret < 0) { + return ret; + } + + obj_iter = objs.begin(); + + return 0; + } + + bool get_obj(rgw_bucket_dir_entry *obj) { + if (obj_iter == objs.end()) { + return false; + } + if (is_truncated && (obj_iter + 1)==objs.end()) { + list_op.params.marker = obj_iter->key; + + int ret = fetch(); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl; + return ret; + } else { + obj_iter = objs.begin(); + } + } + *obj = *obj_iter; + return true; + } + + rgw_bucket_dir_entry get_prev_obj() { + return pre_obj; + } + + void next() { + pre_obj = *obj_iter; + ++obj_iter; + } + + bool next_has_same_name() + { + if ((obj_iter + 1) == objs.end()) { + /* this should have been called after get_obj() was called, so this should + * only happen if is_truncated is false */ + return false; + } + return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0); + } +}; + int RGWLC::bucket_lc_process(string& shard_id) { RGWLifecycleConfiguration config(cct); RGWBucketInfo bucket_info; map bucket_attrs; string next_marker, no_ns, list_versions; - bool is_truncated; vector objs; auto obj_ctx = store->svc.sysobj->init_obj_ctx(); vector result; @@ -486,7 +563,7 @@ int RGWLC::bucket_lc_process(string& shard_id) } RGWRados::Bucket target(store, bucket_info); - RGWRados::Bucket::List list_op(&target); + LCObjsLister ol(store, bucket_info); map::iterator aiter = bucket_attrs.find(RGW_ATTR_LC); if (aiter == bucket_attrs.end()) @@ -501,7 +578,6 @@ int RGWLC::bucket_lc_process(string& shard_id) } map& prefix_map = config.get_prefix_map(); - list_op.params.list_versions = bucket_info.versioned(); if (!bucket_info.versioned()) { for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { if (!prefix_iter->second.status || @@ -515,70 +591,67 @@ int RGWLC::bucket_lc_process(string& shard_id) /* lifecycle processing does not depend on total order, so can * take advantage of unorderd listing optimizations--such as * operating on one shard at a time */ - list_op.params.prefix = prefix_iter->first; - list_op.params.allow_unordered = true; - do { - objs.clear(); - list_op.params.marker = list_op.get_next_marker(); - ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + ol.set_prefix(prefix_iter->first); + + ret = ol.init(); + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldpp_dout(this, 0) << "ERROR: store->list_objects():" <second; + + if (!key.ns.empty()) { + continue; + } + + int ret = check_tags(store, rctx, bucket_info, obj, op, &skip); if (ret < 0) { - if (ret == (-ENOENT)) - return 0; - ldpp_dout(this, 0) << "ERROR: store->list_objects():" <key); - RGWObjState *state; - rgw_obj obj(bucket_info.bucket, key); - RGWObjectCtx rctx(store); - bool skip; - auto& op = prefix_iter->second; - - if (!key.ns.empty()) { - continue; - } + if (skip) { + continue; + } - int ret = check_tags(store, rctx, bucket_info, obj, op, &skip); + if (op.expiration_date != boost::none) { + //we have checked it before + is_expired = true; + } else { + is_expired = obj_has_expired(o.meta.mtime, op.expiration); + } + if (is_expired) { + int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); if (ret < 0) { return ret; } - if (skip) { + if (state->mtime != o.meta.mtime) { + //Check mtime again to avoid delete a recently update object as much as possible + ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << o.meta.mtime << dendl; continue; } - - if (op.expiration_date != boost::none) { - //we have checked it before - is_expired = true; + ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true); + if (ret < 0) { + ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl; } else { - is_expired = obj_has_expired(obj_iter->meta.mtime, op.expiration); + ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl; } - if (is_expired) { - int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); - if (ret < 0) { - return ret; - } - if (state->mtime != obj_iter->meta.mtime) { - //Check mtime again to avoid delete a recently update object as much as possible - ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << obj_iter->meta.mtime << dendl; - continue; - } - ret = remove_expired_obj(bucket_info, obj_iter->key, obj_iter->meta.owner, obj_iter->meta.owner_display_name, true); - if (ret < 0) { - ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl; - } else { - ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl; - } - if (going_down()) - return 0; - } - } /* for objs */ - std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); - } while (is_truncated); + if (going_down()) + return 0; + } + } /* for objs */ + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); } } else { //bucket versioning is enabled or suspended @@ -593,100 +666,87 @@ int RGWLC::bucket_lc_process(string& shard_id) (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) { next_marker = pre_marker; } else { - pre_marker = next_marker; + pre_marker = next_marker; } - list_op.params.prefix = prefix_iter->first; - rgw_bucket_dir_entry pre_obj; - do { - if (!objs.empty()) { - pre_obj = objs.back(); - } - objs.clear(); - list_op.params.marker = next_marker; - ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + ol.set_prefix(prefix_iter->first); - if (ret < 0) { - if (ret == (-ENOENT)) - return 0; - ldpp_dout(this, 0) << "ERROR: store->list_objects():" <list_objects():" <is_current()) { - if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none - && !prefix_iter->second.dm_expiration) { + ceph::real_time mtime; + bool remove_indeed = true; + int expiration; + bool skip_expiration_check; + bool is_expired; + rgw_bucket_dir_entry o; + for (; ol.get_obj(&o); ol.next()) { + skip_expiration_check = false; + is_expired = false; + if (o.is_current()) { + if (op.expiration <= 0 && + op.expiration_date == boost::none + && !op.dm_expiration) { + continue; + } + if (o.is_delete_marker()) { + if (ol.next_has_same_name()) { continue; } - if (obj_iter->is_delete_marker()) { - if ((obj_iter + 1)==objs.end()) { - if (is_truncated) { - //deal with it in next round because we can't judge whether this marker is the only version - next_marker = obj_iter->key; - break; - } - } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing. + skip_expiration_check = op.dm_expiration; + remove_indeed = true; //we should remove the delete marker if it's the only version + } else { + remove_indeed = false; + } + mtime = o.meta.mtime; + expiration = op.expiration; + if (!skip_expiration_check) { + if (expiration <= 0) { + if (op.expiration_date == boost::none) { continue; } - skip_expiration_check = prefix_iter->second.dm_expiration; - remove_indeed = true; //we should remove the delete marker if it's the only version + is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date); } else { - remove_indeed = false; - } - mtime = obj_iter->meta.mtime; - expiration = prefix_iter->second.expiration; - if (!skip_expiration_check) { - if (expiration <= 0) { - if (prefix_iter->second.expiration_date == boost::none) { - continue; - } - is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date); - } else { - is_expired = obj_has_expired(mtime, expiration); - } - } - } else { - if (prefix_iter->second.noncur_expiration <=0) { - continue; + is_expired = obj_has_expired(mtime, expiration); } - remove_indeed = true; - mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime; - expiration = prefix_iter->second.noncur_expiration; - is_expired = obj_has_expired(mtime, expiration); } - if (skip_expiration_check || is_expired) { - if (obj_iter->is_visible()) { - RGWObjectCtx rctx(store); - rgw_obj obj(bucket_info.bucket, obj_iter->key); - RGWObjState *state; - int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); - if (ret < 0) { - return ret; - } - if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible - continue; - } - ret = remove_expired_obj(bucket_info, obj_iter->key, obj_iter->meta.owner, obj_iter->meta.owner_display_name, remove_indeed); + } else { /* a noncurrent obj */ + if (op.noncur_expiration <= 0) { + continue; + } + remove_indeed = true; + mtime = ol.get_prev_obj().meta.mtime; + expiration = op.noncur_expiration; + is_expired = obj_has_expired(mtime, expiration); + } + if (skip_expiration_check || is_expired) { + if (o.is_visible()) { + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, o.key); + RGWObjState *state; + int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false); if (ret < 0) { - ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl; - } else { - ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl; + return ret; } - - if (going_down()) - return 0; + if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible + continue; } + ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed); + if (ret < 0) { + ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl; + } else { + ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl; + } + + if (going_down()) + return 0; } - } while (is_truncated); + } } }