]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: lifecycle: consolidate versioned and non-versioned operation
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 17 Dec 2018 15:00:36 +0000 (07:00 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 4 Jan 2019 03:00:23 +0000 (19:00 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_lc.cc

index e77efbb0a8ba6d82b782cbcd80293d4ebce0d9ff..371ac075f75b66bab35aacadd7171ec57fa9be94 100644 (file)
@@ -469,6 +469,7 @@ class LCObjsLister {
   vector<rgw_bucket_dir_entry> objs;
   vector<rgw_bucket_dir_entry>::iterator obj_iter;
   rgw_bucket_dir_entry pre_obj;
+  int64_t delay_ms;
 
 public:
   LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) :
@@ -476,6 +477,7 @@ public:
       target(store, bucket_info), list_op(&target) {
     list_op.params.list_versions = bucket_info.versioned();
     list_op.params.allow_unordered = true;
+    delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
   }
 
   void set_prefix(const string& p) {
@@ -497,8 +499,13 @@ public:
     return 0;
   }
 
+  void delay() {
+    std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+  }
+
   bool get_obj(rgw_bucket_dir_entry *obj) {
     if (obj_iter == objs.end()) {
+      delay();
       return false;
     }
     if (is_truncated && (obj_iter + 1)==objs.end()) {
@@ -511,6 +518,7 @@ public:
       } else {
         obj_iter = objs.begin();
       }
+      delay();
     }
     *obj = *obj_iter;
     return true;
@@ -541,11 +549,10 @@ int RGWLC::bucket_lc_process(string& shard_id)
   RGWLifecycleConfiguration  config(cct);
   RGWBucketInfo bucket_info;
   map<string, bufferlist> bucket_attrs;
-  string next_marker, no_ns, list_versions;
+  string no_ns, list_versions;
   vector<rgw_bucket_dir_entry> objs;
   auto obj_ctx = store->svc.sysobj->init_obj_ctx();
   vector<std::string> result;
-  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
   boost::split(result, shard_id, boost::is_any_of(":"));
   string bucket_tenant = result[0];
   string bucket_name = result[1];
@@ -578,174 +585,110 @@ int RGWLC::bucket_lc_process(string& shard_id)
     }
 
   map<string, lc_op>& prefix_map = config.get_prefix_map();
-  if (!bucket_info.versioned()) {
-    for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
-      if (!prefix_iter->second.status || 
-        (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
-        continue;
-      }
-      if (prefix_iter->second.expiration_date != boost::none && 
-        ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
-        continue;
-      }
-      /* lifecycle processing does not depend on total order, so can
-       * take advantage of unorderd listing optimizations--such as
-       * operating on one shard at a time */
-      ol.set_prefix(prefix_iter->first);
+  rgw_obj_key pre_marker;
+  rgw_obj_key next_marker;
+  for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
+    auto& op = prefix_iter->second;
+    if (!is_valid_op(op)) {
+      continue;
+    }
+    if (prefix_iter != prefix_map.begin() && 
+        (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
+      next_marker = pre_marker;
+    } else {
+      pre_marker = next_marker;
+    }
+    ol.set_prefix(prefix_iter->first);
 
-      ret = ol.init();
+    ret = ol.init();
 
-      if (ret < 0) {
-        if (ret == (-ENOENT))
-          return 0;
-        ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
-        return ret;
-      }
+    if (ret < 0) {
+      if (ret == (-ENOENT))
+        return 0;
+      ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
+      return ret;
+    }
 
-      bool is_expired;
-      rgw_bucket_dir_entry o;
-      for (; ol.get_obj(&o); ol.next()) {
-        rgw_obj_key key(o.key);
-        RGWObjState *state;
-        rgw_obj obj(bucket_info.bucket, key);
-        RGWObjectCtx rctx(store);
-        bool skip;
-        auto& op = prefix_iter->second;
+    ceph::real_time mtime;
+    bool remove_indeed = true;
+    int expiration;
+    bool skip_expiration_check;
+    bool is_expired;
+    rgw_bucket_dir_entry o;
+    for (; ol.get_obj(&o); ol.next()) {
+      skip_expiration_check = false;
+      is_expired = false;
 
-        if (!key.ns.empty()) {
-          continue;
-        }
+      RGWObjectCtx rctx(store);
+      rgw_obj obj(bucket_info.bucket, o.key);
 
+      if (!o.is_delete_marker()) {
+        bool skip;
         int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
         if (ret < 0) {
           return ret;
         }
+
         if (skip) {
           continue;
         }
+      }
 
-        if (op.expiration_date != boost::none) {
-          //we have checked it before
-          is_expired = true;
-        } else {
-          is_expired = obj_has_expired(o.meta.mtime, op.expiration);
+      if (o.is_current()) {
+        if (op.expiration <= 0 &&
+            op.expiration_date == boost::none
+            && !op.dm_expiration) {
+          continue;
         }
-        if (is_expired) {
-          int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
-          if (ret < 0) {
-            return ret;
-          }
-          if (state->mtime != o.meta.mtime) {
-            //Check mtime again to avoid delete a recently update object as much as possible
-            ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << o.meta.mtime << dendl;
+        if (o.is_delete_marker()) {
+          if (ol.next_has_same_name()) {
             continue;
           }
-          ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
-          if (ret < 0) {
-            ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
-          } else {
-            ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl;
-          }
-
-          if (going_down())
-            return 0;
+          skip_expiration_check = op.dm_expiration;
+          remove_indeed = true;   //we should remove the delete marker if it's the only version
+        } else {
+          remove_indeed = !bucket_info.versioned();
         }
-      } /* for objs */
-      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
-    }
-  } else {
-  //bucket versioning is enabled or suspended
-    rgw_obj_key pre_marker;
-    rgw_obj_key next_marker;
-    for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
-      auto& op = prefix_iter->second;
-      if (!is_valid_op(op)) {
-        continue;
-      }
-      if (prefix_iter != prefix_map.begin() && 
-          (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
-       next_marker = pre_marker;
-      } else {
-        pre_marker = next_marker;
-      }
-      ol.set_prefix(prefix_iter->first);
-
-      ret = ol.init();
-
-      if (ret < 0) {
-        if (ret == (-ENOENT))
-          return 0;
-        ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
-        return ret;
-      }
-
-      ceph::real_time mtime;
-      bool remove_indeed = true;
-      int expiration;
-      bool skip_expiration_check;
-      bool is_expired;
-      rgw_bucket_dir_entry o;
-      for (; ol.get_obj(&o); ol.next()) {
-        skip_expiration_check = false;
-        is_expired = false;
-        if (o.is_current()) {
-          if (op.expiration <= 0 &&
-              op.expiration_date == boost::none
-              && !op.dm_expiration) {
-            continue;
-          }
-          if (o.is_delete_marker()) {
-            if (ol.next_has_same_name()) {
+        mtime = o.meta.mtime;
+        expiration = op.expiration;
+        if (!skip_expiration_check) {
+          if (expiration <= 0) {
+            if (op.expiration_date == boost::none) {
               continue;
             }
-            skip_expiration_check = op.dm_expiration;
-            remove_indeed = true;   //we should remove the delete marker if it's the only version
+            is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
           } else {
-            remove_indeed = false;
-          }
-          mtime = o.meta.mtime;
-          expiration = op.expiration;
-          if (!skip_expiration_check) {
-            if (expiration <= 0) {
-              if (op.expiration_date == boost::none) {
-                continue;
-              }
-              is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
-            } else {
-              is_expired = obj_has_expired(mtime, expiration);
-            }
-          }
-        } else { /* a noncurrent obj */
-          if (op.noncur_expiration <= 0) {
-            continue;
+            is_expired = obj_has_expired(mtime, expiration);
           }
-          remove_indeed = true;
-          mtime = ol.get_prev_obj().meta.mtime;
-          expiration = op.noncur_expiration;
-          is_expired = obj_has_expired(mtime, expiration);
         }
-        if (skip_expiration_check || is_expired) {
-          if (o.is_visible()) {
-            RGWObjectCtx rctx(store);
-            rgw_obj obj(bucket_info.bucket, o.key);
-            RGWObjState *state;
-            int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
-            if (ret < 0) {
-              return ret;
-            }
-            if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
-              continue;
-          }
-          ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
+      } else { /* a noncurrent obj */
+        if (op.noncur_expiration <= 0) {
+          continue;
+        }
+        remove_indeed = true;
+        mtime = ol.get_prev_obj().meta.mtime;
+        expiration = op.noncur_expiration;
+        is_expired = obj_has_expired(mtime, expiration);
+      }
+      if (skip_expiration_check || is_expired) {
+        if (o.is_visible()) {
+          RGWObjState *state;
+          int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
           if (ret < 0) {
-            ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
-          } else {
-            ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
+            return ret;
           }
-
-          if (going_down())
-            return 0;
+          if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
+            continue;
         }
+        ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
+        if (ret < 0) {
+          ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
+        } else {
+          ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
+        }
+
+        if (going_down())
+          return 0;
       }
     }
   }