]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: tighter lifecycle processing
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 21 Dec 2018 07:28:44 +0000 (23:28 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 4 Jan 2019 03:00:24 +0000 (19:00 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h

index 371ac075f75b66bab35aacadd7171ec57fa9be94..20e4c7842fa31649ab00b3119c7aaea975b21bdd 100644 (file)
@@ -310,7 +310,7 @@ int RGWLC::bucket_lc_prepare(int index)
   return 0;
 }
 
-bool RGWLC::obj_has_expired(ceph::real_time mtime, int days)
+static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr)
 {
   double timediff, cmp;
   utime_t base_time;
@@ -325,10 +325,15 @@ bool RGWLC::obj_has_expired(ceph::real_time mtime, int days)
   }
   timediff = base_time - ceph::real_clock::to_time_t(mtime);
 
+  if (expire_time) {
+    *expire_time = mtime + make_timespan(cmp);
+  }
+
   return (timediff >= cmp);
 }
 
-int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, const string& owner, const string& owner_display_name, bool remove_indeed)
+static int remove_expired_obj(CephContext *cct, RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj_key obj_key,
+                              const string& owner, const string& owner_display_name, bool remove_indeed)
 {
   if (remove_indeed) {
     return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
@@ -385,7 +390,7 @@ int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<strin
       }
 
       for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
-        if (obj_has_expired(obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
+        if (obj_has_expired(cct, obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
           rgw_obj_key key(obj_iter->key);
           if (!mp_obj.from_meta(key.name)) {
             continue;
@@ -415,40 +420,6 @@ static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& o
   return read_op.get_attr(RGW_ATTR_TAGS, tags_bl);
 }
 
-int RGWLC::check_tags(RGWRados *store, RGWObjectCtx& rctx, RGWBucketInfo& bucket_info, rgw_obj& obj, lc_op& op, bool *skip)
-{
-  if (op.obj_tags != boost::none) {
-    *skip = true;
-
-    bufferlist tags_bl;
-    int ret = read_obj_tags(store, bucket_info, obj, rctx, tags_bl);
-    if (ret < 0) {
-      if (ret != -ENODATA) {
-        ldpp_dout(this, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
-      }
-      return 0;
-    }
-    RGWObjTags dest_obj_tags;
-    try {
-      auto iter = tags_bl.cbegin();
-      dest_obj_tags.decode(iter);
-    } catch (buffer::error& err) {
-      ldpp_dout(this,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
-      return -EIO;
-    }
-
-    if (!includes(dest_obj_tags.get_tags().begin(),
-                  dest_obj_tags.get_tags().end(),
-                  op.obj_tags->get_tags().begin(),
-                  op.obj_tags->get_tags().end())){
-      ldpp_dout(this, 20) << __func__ << "() skipping obj " << obj << " as tags do not match" << dendl;
-      return 0;
-    }
-  }
-  *skip = false;
-  return 0;
-}
-
 static bool is_valid_op(const lc_op& op)
 {
       return (op.status &&
@@ -544,6 +515,292 @@ public:
   }
 };
 
+
+struct op_env {
+  lc_op& op;
+  RGWRados *store;
+  RGWLC *lc;
+  RGWBucketInfo& bucket_info;
+  LCObjsLister& ol;
+
+  op_env(lc_op& _op, RGWRados *_store, RGWLC *_lc, RGWBucketInfo& _bucket_info,
+         LCObjsLister& _ol) : op(_op), store(_store), lc(_lc), bucket_info(_bucket_info), ol(_ol) {}
+};
+
+class LCRuleOp;
+
+struct lc_op_ctx {
+  CephContext *cct;
+  op_env& env;
+  rgw_bucket_dir_entry& o;
+
+  RGWRados *store;
+  RGWBucketInfo& bucket_info;
+  lc_op& op;
+  LCObjsLister& ol;
+
+  rgw_obj obj;
+  RGWObjectCtx rctx;
+
+  lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o) : cct(_env.store->ctx()), env(_env), o(_o),
+                 store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
+                 obj(env.bucket_info.bucket, o.key), rctx(env.store) {}
+};
+
+class LCOpAction {
+public:
+  virtual ~LCOpAction() {}
+
+  virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
+    return false;
+  };
+
+  virtual int process(lc_op_ctx& oc) {
+    return 0;
+  }
+};
+
+class LCOpFilter {
+public:
+virtual ~LCOpFilter() {}
+  virtual bool check(lc_op_ctx& oc) {
+    return false;
+  }
+};
+
+class LCOpRule {
+  friend class LCOpAction;
+
+  op_env& env;
+
+  std::vector<unique_ptr<LCOpFilter> > filters;
+  std::vector<unique_ptr<LCOpAction> > actions;
+
+public:
+  LCOpRule(op_env& _env) : env(_env) {}
+
+  void build();
+  int process(rgw_bucket_dir_entry& o);
+};
+
+static int check_tags(lc_op_ctx& oc, bool *skip)
+{
+  auto& op = oc.op;
+
+  if (op.obj_tags != boost::none) {
+    *skip = true;
+
+    bufferlist tags_bl;
+    int ret = read_obj_tags(oc.store, oc.bucket_info, oc.obj, oc.rctx, tags_bl);
+    if (ret < 0) {
+      if (ret != -ENODATA) {
+        ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
+      }
+      return 0;
+    }
+    RGWObjTags dest_obj_tags;
+    try {
+      auto iter = tags_bl.cbegin();
+      dest_obj_tags.decode(iter);
+    } catch (buffer::error& err) {
+      ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+      return -EIO;
+    }
+
+    if (!includes(dest_obj_tags.get_tags().begin(),
+                  dest_obj_tags.get_tags().end(),
+                  op.obj_tags->get_tags().begin(),
+                  op.obj_tags->get_tags().end())){
+      ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match" << dendl;
+      return 0;
+    }
+  }
+  *skip = false;
+  return 0;
+}
+
+class LCOpFilter_Tags : public LCOpFilter {
+public:
+  bool check(lc_op_ctx& oc) override {
+    auto& o = oc.o;
+
+    if (o.is_delete_marker()) {
+      return true;
+    }
+
+    bool skip;
+
+    int ret = check_tags(oc, &skip);
+    if (ret < 0) {
+      if (ret == -ENOENT) {
+        return false;
+      }
+      ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
+      return false;
+    }
+
+    return !skip;
+  };
+};
+
+class LCOpAction_CurrentExpiration : public LCOpAction {
+public:
+  bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
+    auto& o = oc.o;
+    if (!o.is_current()) {
+      return false;
+    }
+
+    auto& mtime = o.meta.mtime;
+    bool is_expired;
+    auto& op = oc.op;
+    if (op.expiration <= 0) {
+      if (op.expiration_date == boost::none) {
+        return false;
+      }
+      is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
+      *exp_time = *op.expiration_date;
+    } else {
+      is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
+    }
+
+    return is_expired;
+  }
+
+  int process(lc_op_ctx& oc) {
+    auto& o = oc.o;
+    int r = remove_expired_obj(oc.cct, oc.store, oc.bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, !oc.bucket_info.versioned());
+    if (r < 0) {
+      ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+      return r;
+    }
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+    return 0;
+  }
+};
+
+class LCOpAction_NonCurrentExpiration : public LCOpAction {
+public:
+  bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
+    auto& o = oc.o;
+    if (!o.is_current()) {
+      return false;
+    }
+
+    auto mtime = oc.ol.get_prev_obj().meta.mtime;
+    bool expiration = oc.op.noncur_expiration;
+    bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
+
+    return is_expired;
+  }
+
+  int process(lc_op_ctx& oc) {
+    auto& o = oc.o;
+    int r = remove_expired_obj(oc.cct, oc.store, oc.bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
+    if (r < 0) {
+      ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+      return r;
+    }
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+    return 0;
+  }
+};
+
+class LCOpAction_DMExpiration : public LCOpAction {
+public:
+  bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
+    auto& o = oc.o;
+    if (!o.is_delete_marker()) {
+      return false;
+    }
+
+    if (oc.ol.next_has_same_name()) {
+      return false;
+    }
+
+    *exp_time = real_clock::now();
+
+    return true;
+  }
+
+  int process(lc_op_ctx& oc) {
+    auto& o = oc.o;
+    int r = remove_expired_obj(oc.cct, oc.store, oc.bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
+    if (r < 0) {
+      ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+      return r;
+    }
+    ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
+    return 0;
+  }
+};
+
+void LCOpRule::build()
+{
+  filters.emplace_back(new LCOpFilter_Tags);
+
+  auto& op = env.op;
+
+  if (op.expiration > 0 ||
+      op.expiration_date != boost::none) {
+    actions.emplace_back(new LCOpAction_CurrentExpiration);
+  }
+
+  if (op.dm_expiration) {
+    actions.emplace_back(new LCOpAction_DMExpiration);
+  }
+
+  if (op.noncur_expiration > 0) {
+    actions.emplace_back(new LCOpAction_NonCurrentExpiration);
+  }
+}
+
+int LCOpRule::process(rgw_bucket_dir_entry& o)
+{
+  bool cont = false;
+
+  lc_op_ctx ctx(env, o);
+
+  for (auto& f : filters) {
+    if (f->check(ctx)) {
+      cont = true;
+      break;
+    }
+  }
+
+  if (!cont) {
+    ldout(env.store->ctx(), 20) << __func__ << "(): skipping entry: " << o.key << dendl;
+    return 0;
+  }
+
+  unique_ptr<LCOpAction> *selected = nullptr;
+  real_time exp;
+
+  for (auto& a : actions) {
+    real_time action_exp;
+
+    if (a->check(ctx, &action_exp)) {
+      if (action_exp > exp) {
+        exp = action_exp;
+        selected = &a;
+      }
+    }
+  }
+
+  if (selected) {
+    int r = (*selected)->process(ctx);
+    if (r < 0) {
+      ldout(ctx.cct, 0) << "ERROR: remove_expired_obj " << dendl;
+      return r;
+    }
+    ldout(ctx.cct, 2) << "DELETED:" << env.bucket_info.bucket << ":" << o.key << dendl;
+  }
+
+  return 0;
+
+}
+
+
 int RGWLC::bucket_lc_process(string& shard_id)
 {
   RGWLifecycleConfiguration  config(cct);
@@ -609,86 +866,22 @@ int RGWLC::bucket_lc_process(string& shard_id)
       return ret;
     }
 
-    ceph::real_time mtime;
-    bool remove_indeed = true;
-    int expiration;
-    bool skip_expiration_check;
-    bool is_expired;
-    rgw_bucket_dir_entry o;
-    for (; ol.get_obj(&o); ol.next()) {
-      skip_expiration_check = false;
-      is_expired = false;
+    op_env oenv(op, store, this, bucket_info, ol);
 
-      RGWObjectCtx rctx(store);
-      rgw_obj obj(bucket_info.bucket, o.key);
-
-      if (!o.is_delete_marker()) {
-        bool skip;
-        int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
-        if (ret < 0) {
-          return ret;
-        }
+    LCOpRule orule(oenv);
 
-        if (skip) {
-          continue;
-        }
-      }
+    orule.build();
 
-      if (o.is_current()) {
-        if (op.expiration <= 0 &&
-            op.expiration_date == boost::none
-            && !op.dm_expiration) {
-          continue;
-        }
-        if (o.is_delete_marker()) {
-          if (ol.next_has_same_name()) {
-            continue;
-          }
-          skip_expiration_check = op.dm_expiration;
-          remove_indeed = true;   //we should remove the delete marker if it's the only version
-        } else {
-          remove_indeed = !bucket_info.versioned();
-        }
-        mtime = o.meta.mtime;
-        expiration = op.expiration;
-        if (!skip_expiration_check) {
-          if (expiration <= 0) {
-            if (op.expiration_date == boost::none) {
-              continue;
-            }
-            is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
-          } else {
-            is_expired = obj_has_expired(mtime, expiration);
-          }
-        }
-      } else { /* a noncurrent obj */
-        if (op.noncur_expiration <= 0) {
-          continue;
-        }
-        remove_indeed = true;
-        mtime = ol.get_prev_obj().meta.mtime;
-        expiration = op.noncur_expiration;
-        is_expired = obj_has_expired(mtime, expiration);
+    ceph::real_time mtime;
+    rgw_bucket_dir_entry o;
+    for (; ol.get_obj(&o); ol.next()) {
+      int ret = orule.process(o);
+      if (ret < 0) {
+        ldpp_dout(this, 20) << "ERROR: orule.process() returned ret=" << ret << dendl;
       }
-      if (skip_expiration_check || is_expired) {
-        if (o.is_visible()) {
-          RGWObjState *state;
-          int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
-          if (ret < 0) {
-            return ret;
-          }
-          if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
-            continue;
-        }
-        ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
-        if (ret < 0) {
-          ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
-        } else {
-          ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
-        }
 
-        if (going_down())
-          return 0;
+      if (going_down()) {
+        return 0;
       }
     }
   }
index 8a5dd66e3f9f847edb0ae527baf0b6027f9a7c05..01fc01c70055091567251ac147e67c1b0fbacd4f 100644 (file)
@@ -499,9 +499,6 @@ class RGWLC : public DoutPrefixProvider {
 
   private:
 
-  int check_tags(RGWRados *store, RGWObjectCtx& rctx, RGWBucketInfo& bucket_info, rgw_obj& obj, lc_op& op, bool *skip);
-  int remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, const string& owner, const string& owner_display_name, bool remove_indeed = true);
-  bool obj_has_expired(ceph::real_time mtime, int days);
   int handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map);
 };