return 0;
}
-bool RGWLC::obj_has_expired(ceph::real_time mtime, int days)
+static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr)
{
double timediff, cmp;
utime_t base_time;
}
timediff = base_time - ceph::real_clock::to_time_t(mtime);
+ if (expire_time) {
+ *expire_time = mtime + make_timespan(cmp);
+ }
+
return (timediff >= cmp);
}
-int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, const string& owner, const string& owner_display_name, bool remove_indeed)
+static int remove_expired_obj(CephContext *cct, RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj_key obj_key,
+ const string& owner, const string& owner_display_name, bool remove_indeed)
{
if (remove_indeed) {
return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
}
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
- if (obj_has_expired(obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
+ if (obj_has_expired(cct, obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
rgw_obj_key key(obj_iter->key);
if (!mp_obj.from_meta(key.name)) {
continue;
return read_op.get_attr(RGW_ATTR_TAGS, tags_bl);
}
-int RGWLC::check_tags(RGWRados *store, RGWObjectCtx& rctx, RGWBucketInfo& bucket_info, rgw_obj& obj, lc_op& op, bool *skip)
-{
- if (op.obj_tags != boost::none) {
- *skip = true;
-
- bufferlist tags_bl;
- int ret = read_obj_tags(store, bucket_info, obj, rctx, tags_bl);
- if (ret < 0) {
- if (ret != -ENODATA) {
- ldpp_dout(this, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
- }
- return 0;
- }
- RGWObjTags dest_obj_tags;
- try {
- auto iter = tags_bl.cbegin();
- dest_obj_tags.decode(iter);
- } catch (buffer::error& err) {
- ldpp_dout(this,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
- return -EIO;
- }
-
- if (!includes(dest_obj_tags.get_tags().begin(),
- dest_obj_tags.get_tags().end(),
- op.obj_tags->get_tags().begin(),
- op.obj_tags->get_tags().end())){
- ldpp_dout(this, 20) << __func__ << "() skipping obj " << obj << " as tags do not match" << dendl;
- return 0;
- }
- }
- *skip = false;
- return 0;
-}
-
static bool is_valid_op(const lc_op& op)
{
return (op.status &&
}
};
+
+struct op_env {
+ lc_op& op;
+ RGWRados *store;
+ RGWLC *lc;
+ RGWBucketInfo& bucket_info;
+ LCObjsLister& ol;
+
+ op_env(lc_op& _op, RGWRados *_store, RGWLC *_lc, RGWBucketInfo& _bucket_info,
+ LCObjsLister& _ol) : op(_op), store(_store), lc(_lc), bucket_info(_bucket_info), ol(_ol) {}
+};
+
+class LCRuleOp;
+
+struct lc_op_ctx {
+ CephContext *cct;
+ op_env& env;
+ rgw_bucket_dir_entry& o;
+
+ RGWRados *store;
+ RGWBucketInfo& bucket_info;
+ lc_op& op;
+ LCObjsLister& ol;
+
+ rgw_obj obj;
+ RGWObjectCtx rctx;
+
+ lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o) : cct(_env.store->ctx()), env(_env), o(_o),
+ store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
+ obj(env.bucket_info.bucket, o.key), rctx(env.store) {}
+};
+
+class LCOpAction {
+public:
+ virtual ~LCOpAction() {}
+
+ virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
+ return false;
+ };
+
+ virtual int process(lc_op_ctx& oc) {
+ return 0;
+ }
+};
+
+class LCOpFilter {
+public:
+virtual ~LCOpFilter() {}
+ virtual bool check(lc_op_ctx& oc) {
+ return false;
+ }
+};
+
+class LCOpRule {
+ friend class LCOpAction;
+
+ op_env& env;
+
+ std::vector<unique_ptr<LCOpFilter> > filters;
+ std::vector<unique_ptr<LCOpAction> > actions;
+
+public:
+ LCOpRule(op_env& _env) : env(_env) {}
+
+ void build();
+ int process(rgw_bucket_dir_entry& o);
+};
+
+static int check_tags(lc_op_ctx& oc, bool *skip)
+{
+ auto& op = oc.op;
+
+ if (op.obj_tags != boost::none) {
+ *skip = true;
+
+ bufferlist tags_bl;
+ int ret = read_obj_tags(oc.store, oc.bucket_info, oc.obj, oc.rctx, tags_bl);
+ if (ret < 0) {
+ if (ret != -ENODATA) {
+ ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
+ }
+ return 0;
+ }
+ RGWObjTags dest_obj_tags;
+ try {
+ auto iter = tags_bl.cbegin();
+ dest_obj_tags.decode(iter);
+ } catch (buffer::error& err) {
+ ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+ return -EIO;
+ }
+
+ if (!includes(dest_obj_tags.get_tags().begin(),
+ dest_obj_tags.get_tags().end(),
+ op.obj_tags->get_tags().begin(),
+ op.obj_tags->get_tags().end())){
+ ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match" << dendl;
+ return 0;
+ }
+ }
+ *skip = false;
+ return 0;
+}
+
+class LCOpFilter_Tags : public LCOpFilter {
+public:
+ bool check(lc_op_ctx& oc) override {
+ auto& o = oc.o;
+
+ if (o.is_delete_marker()) {
+ return true;
+ }
+
+ bool skip;
+
+ int ret = check_tags(oc, &skip);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ return false;
+ }
+ ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
+ return false;
+ }
+
+ return !skip;
+ };
+};
+
+class LCOpAction_CurrentExpiration : public LCOpAction {
+public:
+ bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
+ auto& o = oc.o;
+ if (!o.is_current()) {
+ return false;
+ }
+
+ auto& mtime = o.meta.mtime;
+ bool is_expired;
+ auto& op = oc.op;
+ if (op.expiration <= 0) {
+ if (op.expiration_date == boost::none) {
+ return false;
+ }
+ is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
+ *exp_time = *op.expiration_date;
+ } else {
+ is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
+ }
+
+ return is_expired;
+ }
+
+ int process(lc_op_ctx& oc) {
+ auto& o = oc.o;
+ int r = remove_expired_obj(oc.cct, oc.store, oc.bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, !oc.bucket_info.versioned());
+ if (r < 0) {
+ ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+ return r;
+ }
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+ return 0;
+ }
+};
+
+class LCOpAction_NonCurrentExpiration : public LCOpAction {
+public:
+ bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
+ auto& o = oc.o;
+ if (!o.is_current()) {
+ return false;
+ }
+
+ auto mtime = oc.ol.get_prev_obj().meta.mtime;
+ bool expiration = oc.op.noncur_expiration;
+ bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
+
+ return is_expired;
+ }
+
+ int process(lc_op_ctx& oc) {
+ auto& o = oc.o;
+ int r = remove_expired_obj(oc.cct, oc.store, oc.bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
+ if (r < 0) {
+ ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+ return r;
+ }
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+ return 0;
+ }
+};
+
+class LCOpAction_DMExpiration : public LCOpAction {
+public:
+ bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
+ auto& o = oc.o;
+ if (!o.is_delete_marker()) {
+ return false;
+ }
+
+ if (oc.ol.next_has_same_name()) {
+ return false;
+ }
+
+ *exp_time = real_clock::now();
+
+ return true;
+ }
+
+ int process(lc_op_ctx& oc) {
+ auto& o = oc.o;
+ int r = remove_expired_obj(oc.cct, oc.store, oc.bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
+ if (r < 0) {
+ ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+ return r;
+ }
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+ return 0;
+ }
+};
+
+void LCOpRule::build()
+{
+ filters.emplace_back(new LCOpFilter_Tags);
+
+ auto& op = env.op;
+
+ if (op.expiration > 0 ||
+ op.expiration_date != boost::none) {
+ actions.emplace_back(new LCOpAction_CurrentExpiration);
+ }
+
+ if (op.dm_expiration) {
+ actions.emplace_back(new LCOpAction_DMExpiration);
+ }
+
+ if (op.noncur_expiration > 0) {
+ actions.emplace_back(new LCOpAction_NonCurrentExpiration);
+ }
+}
+
+int LCOpRule::process(rgw_bucket_dir_entry& o)
+{
+ bool cont = false;
+
+ lc_op_ctx ctx(env, o);
+
+ for (auto& f : filters) {
+ if (f->check(ctx)) {
+ cont = true;
+ break;
+ }
+ }
+
+ if (!cont) {
+ ldout(env.store->ctx(), 20) << __func__ << "(): skipping entry: " << o.key << dendl;
+ return 0;
+ }
+
+ unique_ptr<LCOpAction> *selected = nullptr;
+ real_time exp;
+
+ for (auto& a : actions) {
+ real_time action_exp;
+
+ if (a->check(ctx, &action_exp)) {
+ if (action_exp > exp) {
+ exp = action_exp;
+ selected = &a;
+ }
+ }
+ }
+
+ if (selected) {
+ int r = (*selected)->process(ctx);
+ if (r < 0) {
+ ldout(ctx.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+ return r;
+ }
+ ldout(ctx.cct, 2) << "DELETED:" << env.bucket_info.bucket << ":" << o.key << dendl;
+ }
+
+ return 0;
+
+}
+
+
int RGWLC::bucket_lc_process(string& shard_id)
{
RGWLifecycleConfiguration config(cct);
return ret;
}
- ceph::real_time mtime;
- bool remove_indeed = true;
- int expiration;
- bool skip_expiration_check;
- bool is_expired;
- rgw_bucket_dir_entry o;
- for (; ol.get_obj(&o); ol.next()) {
- skip_expiration_check = false;
- is_expired = false;
+ op_env oenv(op, store, this, bucket_info, ol);
- RGWObjectCtx rctx(store);
- rgw_obj obj(bucket_info.bucket, o.key);
-
- if (!o.is_delete_marker()) {
- bool skip;
- int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
- if (ret < 0) {
- return ret;
- }
+ LCOpRule orule(oenv);
- if (skip) {
- continue;
- }
- }
+ orule.build();
- if (o.is_current()) {
- if (op.expiration <= 0 &&
- op.expiration_date == boost::none
- && !op.dm_expiration) {
- continue;
- }
- if (o.is_delete_marker()) {
- if (ol.next_has_same_name()) {
- continue;
- }
- skip_expiration_check = op.dm_expiration;
- remove_indeed = true; //we should remove the delete marker if it's the only version
- } else {
- remove_indeed = !bucket_info.versioned();
- }
- mtime = o.meta.mtime;
- expiration = op.expiration;
- if (!skip_expiration_check) {
- if (expiration <= 0) {
- if (op.expiration_date == boost::none) {
- continue;
- }
- is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
- } else {
- is_expired = obj_has_expired(mtime, expiration);
- }
- }
- } else { /* a noncurrent obj */
- if (op.noncur_expiration <= 0) {
- continue;
- }
- remove_indeed = true;
- mtime = ol.get_prev_obj().meta.mtime;
- expiration = op.noncur_expiration;
- is_expired = obj_has_expired(mtime, expiration);
+ ceph::real_time mtime;
+ rgw_bucket_dir_entry o;
+ for (; ol.get_obj(&o); ol.next()) {
+ int ret = orule.process(o);
+ if (ret < 0) {
+ ldpp_dout(this, 20) << "ERROR: orule.process() returned ret=" << ret << dendl;
}
- if (skip_expiration_check || is_expired) {
- if (o.is_visible()) {
- RGWObjState *state;
- int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
- if (ret < 0) {
- return ret;
- }
- if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
- continue;
- }
- ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
- if (ret < 0) {
- ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
- } else {
- ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
- }
- if (going_down())
- return 0;
+ if (going_down()) {
+ return 0;
}
}
}