]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: lifecycle: rework listing iteration
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 17 Dec 2018 14:17:02 +0000 (06:17 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 4 Jan 2019 03:00:23 +0000 (19:00 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_lc.cc

index cf363e06fbbbe337a08a87a1ab604a41bb2933ba..e77efbb0a8ba6d82b782cbcd80293d4ebce0d9ff 100644 (file)
@@ -458,13 +458,90 @@ static bool is_valid_op(const lc_op& op)
                || op.dm_expiration));
 }
 
+class LCObjsLister {
+  RGWRados *store;
+  RGWBucketInfo& bucket_info;
+  RGWRados::Bucket target;
+  RGWRados::Bucket::List list_op;
+  bool is_truncated{false};
+  rgw_obj_key next_marker;
+  string prefix;
+  vector<rgw_bucket_dir_entry> objs;
+  vector<rgw_bucket_dir_entry>::iterator obj_iter;
+  rgw_bucket_dir_entry pre_obj;
+
+public:
+  LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) :
+      store(_store), bucket_info(_bucket_info),
+      target(store, bucket_info), list_op(&target) {
+    list_op.params.list_versions = bucket_info.versioned();
+    list_op.params.allow_unordered = true;
+  }
+
+  void set_prefix(const string& p) {
+    prefix = p;
+  }
+
+  int init() {
+    return fetch();
+  }
+
+  int fetch() {
+    int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
+    if (ret < 0) {
+      return ret;
+    }
+
+    obj_iter = objs.begin();
+
+    return 0;
+  }
+
+  bool get_obj(rgw_bucket_dir_entry *obj) {
+    if (obj_iter == objs.end()) {
+      return false;
+    }
+    if (is_truncated && (obj_iter + 1)==objs.end()) {
+      list_op.params.marker = obj_iter->key;
+
+      int ret = fetch();
+      if (ret < 0) {
+        ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl;
+        return ret;
+      } else {
+        obj_iter = objs.begin();
+      }
+    }
+    *obj = *obj_iter;
+    return true;
+  }
+
+  rgw_bucket_dir_entry get_prev_obj() {
+    return pre_obj;
+  }
+
+  void next() {
+    pre_obj = *obj_iter;
+    ++obj_iter;
+  }
+
+  bool next_has_same_name()
+  {
+    if ((obj_iter + 1) == objs.end()) {
+      /* this should have been called after get_obj() was called, so this should
+       * only happen if is_truncated is false */
+      return false;
+    }
+    return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
+  }
+};
+
 int RGWLC::bucket_lc_process(string& shard_id)
 {
   RGWLifecycleConfiguration  config(cct);
   RGWBucketInfo bucket_info;
   map<string, bufferlist> bucket_attrs;
   string next_marker, no_ns, list_versions;
-  bool is_truncated;
   vector<rgw_bucket_dir_entry> objs;
   auto obj_ctx = store->svc.sysobj->init_obj_ctx();
   vector<std::string> result;
@@ -486,7 +563,7 @@ int RGWLC::bucket_lc_process(string& shard_id)
   }
 
   RGWRados::Bucket target(store, bucket_info);
-  RGWRados::Bucket::List list_op(&target);
+  LCObjsLister ol(store, bucket_info);
 
   map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
   if (aiter == bucket_attrs.end())
@@ -501,7 +578,6 @@ int RGWLC::bucket_lc_process(string& shard_id)
     }
 
   map<string, lc_op>& prefix_map = config.get_prefix_map();
-  list_op.params.list_versions = bucket_info.versioned();
   if (!bucket_info.versioned()) {
     for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
       if (!prefix_iter->second.status || 
@@ -515,70 +591,67 @@ int RGWLC::bucket_lc_process(string& shard_id)
       /* lifecycle processing does not depend on total order, so can
        * take advantage of unorderd listing optimizations--such as
        * operating on one shard at a time */
-      list_op.params.prefix = prefix_iter->first;
-      list_op.params.allow_unordered = true;
-      do {
-        objs.clear();
-        list_op.params.marker = list_op.get_next_marker();
-        ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
+      ol.set_prefix(prefix_iter->first);
+
+      ret = ol.init();
 
+      if (ret < 0) {
+        if (ret == (-ENOENT))
+          return 0;
+        ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
+        return ret;
+      }
+
+      bool is_expired;
+      rgw_bucket_dir_entry o;
+      for (; ol.get_obj(&o); ol.next()) {
+        rgw_obj_key key(o.key);
+        RGWObjState *state;
+        rgw_obj obj(bucket_info.bucket, key);
+        RGWObjectCtx rctx(store);
+        bool skip;
+        auto& op = prefix_iter->second;
+
+        if (!key.ns.empty()) {
+          continue;
+        }
+
+        int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
         if (ret < 0) {
-          if (ret == (-ENOENT))
-            return 0;
-          ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
           return ret;
         }
-        
-        bool is_expired;
-        for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
-          rgw_obj_key key(obj_iter->key);
-          RGWObjState *state;
-          rgw_obj obj(bucket_info.bucket, key);
-          RGWObjectCtx rctx(store);
-          bool skip;
-          auto& op = prefix_iter->second;
-
-          if (!key.ns.empty()) {
-            continue;
-          }
+        if (skip) {
+          continue;
+        }
 
-          int ret = check_tags(store, rctx, bucket_info, obj, op, &skip);
+        if (op.expiration_date != boost::none) {
+          //we have checked it before
+          is_expired = true;
+        } else {
+          is_expired = obj_has_expired(o.meta.mtime, op.expiration);
+        }
+        if (is_expired) {
+          int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
           if (ret < 0) {
             return ret;
           }
-          if (skip) {
+          if (state->mtime != o.meta.mtime) {
+            //Check mtime again to avoid delete a recently update object as much as possible
+            ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << o.meta.mtime << dendl;
             continue;
           }
-
-          if (op.expiration_date != boost::none) {
-            //we have checked it before
-            is_expired = true;
+          ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, true);
+          if (ret < 0) {
+            ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
           } else {
-            is_expired = obj_has_expired(obj_iter->meta.mtime, op.expiration);
+            ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl;
           }
-          if (is_expired) {
-            int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
-            if (ret < 0) {
-              return ret;
-            }
-            if (state->mtime != obj_iter->meta.mtime) {
-              //Check mtime again to avoid delete a recently update object as much as possible
-              ldpp_dout(this, 20) << __func__ << "() skipping removal: state->mtime " << state->mtime << " obj->mtime " << obj_iter->meta.mtime << dendl;
-              continue;
-            }
-            ret = remove_expired_obj(bucket_info, obj_iter->key, obj_iter->meta.owner, obj_iter->meta.owner_display_name, true);
-            if (ret < 0) {
-              ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
-            } else {
-              ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << key << dendl;
-            }
 
-            if (going_down())
-              return 0;
-          }
-        } /* for objs */
-       std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
-      } while (is_truncated);
+          if (going_down())
+            return 0;
+        }
+      } /* for objs */
+      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
     }
   } else {
   //bucket versioning is enabled or suspended
@@ -593,100 +666,87 @@ int RGWLC::bucket_lc_process(string& shard_id)
           (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
        next_marker = pre_marker;
       } else {
-       pre_marker = next_marker;
+        pre_marker = next_marker;
       }
-      list_op.params.prefix = prefix_iter->first;
-      rgw_bucket_dir_entry pre_obj;
-      do {
-        if (!objs.empty()) {
-          pre_obj = objs.back();
-        }
-        objs.clear();
-        list_op.params.marker = next_marker;
-        ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
+      ol.set_prefix(prefix_iter->first);
 
-        if (ret < 0) {
-          if (ret == (-ENOENT))
-            return 0;
-          ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
-          return ret;
-        }
+      ret = ol.init();
+
+      if (ret < 0) {
+        if (ret == (-ENOENT))
+          return 0;
+        ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
+        return ret;
+      }
 
-        next_marker = list_op.get_next_marker();
-
-        ceph::real_time mtime;
-        bool remove_indeed = true;
-        int expiration;
-        bool skip_expiration_check;
-        bool is_expired;
-        for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
-          skip_expiration_check = false;
-          is_expired = false;
-          if (obj_iter->is_current()) {
-            if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
-              && !prefix_iter->second.dm_expiration) {
+      ceph::real_time mtime;
+      bool remove_indeed = true;
+      int expiration;
+      bool skip_expiration_check;
+      bool is_expired;
+      rgw_bucket_dir_entry o;
+      for (; ol.get_obj(&o); ol.next()) {
+        skip_expiration_check = false;
+        is_expired = false;
+        if (o.is_current()) {
+          if (op.expiration <= 0 &&
+              op.expiration_date == boost::none
+              && !op.dm_expiration) {
+            continue;
+          }
+          if (o.is_delete_marker()) {
+            if (ol.next_has_same_name()) {
               continue;
             }
-            if (obj_iter->is_delete_marker()) {
-              if ((obj_iter + 1)==objs.end()) {
-                if (is_truncated) {
-                  //deal with it in next round because we can't judge whether this marker is the only version
-                  next_marker = obj_iter->key;
-                  break;
-                }
-              } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) {   //*obj_iter is delete marker and isn't the only version, do nothing.
+            skip_expiration_check = op.dm_expiration;
+            remove_indeed = true;   //we should remove the delete marker if it's the only version
+          } else {
+            remove_indeed = false;
+          }
+          mtime = o.meta.mtime;
+          expiration = op.expiration;
+          if (!skip_expiration_check) {
+            if (expiration <= 0) {
+              if (op.expiration_date == boost::none) {
                 continue;
               }
-              skip_expiration_check = prefix_iter->second.dm_expiration;
-              remove_indeed = true;   //we should remove the delete marker if it's the only version
+              is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
             } else {
-              remove_indeed = false;
-            }
-            mtime = obj_iter->meta.mtime;
-            expiration = prefix_iter->second.expiration;
-            if (!skip_expiration_check) {
-              if (expiration <= 0) {
-                if (prefix_iter->second.expiration_date == boost::none) {
-                  continue;
-                }
-                is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
-              } else {
-                is_expired = obj_has_expired(mtime, expiration);
-              }
-            }
-          } else {
-            if (prefix_iter->second.noncur_expiration <=0) {
-              continue;
+              is_expired = obj_has_expired(mtime, expiration);
             }
-            remove_indeed = true;
-            mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
-            expiration = prefix_iter->second.noncur_expiration;
-            is_expired = obj_has_expired(mtime, expiration);
           }
-          if (skip_expiration_check || is_expired) {
-            if (obj_iter->is_visible()) {
-              RGWObjectCtx rctx(store);
-              rgw_obj obj(bucket_info.bucket, obj_iter->key);
-              RGWObjState *state;
-              int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
-              if (ret < 0) {
-                return ret;
-              }
-              if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
-                continue;
-            }
-            ret = remove_expired_obj(bucket_info, obj_iter->key, obj_iter->meta.owner, obj_iter->meta.owner_display_name, remove_indeed);
+        } else { /* a noncurrent obj */
+          if (op.noncur_expiration <= 0) {
+            continue;
+          }
+          remove_indeed = true;
+          mtime = ol.get_prev_obj().meta.mtime;
+          expiration = op.noncur_expiration;
+          is_expired = obj_has_expired(mtime, expiration);
+        }
+        if (skip_expiration_check || is_expired) {
+          if (o.is_visible()) {
+            RGWObjectCtx rctx(store);
+            rgw_obj obj(bucket_info.bucket, o.key);
+            RGWObjState *state;
+            int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
             if (ret < 0) {
-              ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
-            } else {
-              ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
+              return ret;
             }
-
-            if (going_down())
-              return 0;
+            if (state->mtime != o.meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
+              continue;
           }
+          ret = remove_expired_obj(bucket_info, o.key, o.meta.owner, o.meta.owner_display_name, remove_indeed);
+          if (ret < 0) {
+            ldpp_dout(this, 0) << "ERROR: remove_expired_obj " << dendl;
+          } else {
+            ldpp_dout(this, 2) << "DELETED:" << bucket_name << ":" << o.key << dendl;
+          }
+
+          if (going_down())
+            return 0;
         }
-      } while (is_truncated);
+      }
     }
   }