]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: iterate asynchronously over linked objects
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 29 Apr 2015 21:50:15 +0000 (14:50 -0700)
committerLoic Dachary <ldachary@redhat.com>
Sun, 30 Aug 2015 15:55:59 +0000 (17:55 +0200)
Read objects manifest. So that we could keep the relevant info later.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 05a953d965a0a6a0e9feddaa62f7429e154e828e)

src/rgw/rgw_admin.cc
src/rgw/rgw_orphan.cc
src/rgw/rgw_orphan.h

index 2736faac3cd2af74b52b2f675c787e653705ba52..d9bbb681e15571f94cbf918986397e65412de25b 100644 (file)
@@ -1150,6 +1150,7 @@ int main(int argc, char **argv)
   string job_id;
   int init_search = false;
   int num_shards = 0;
+  int max_concurrent_ios = 32;
 
   std::string val;
   std::ostringstream errs;
@@ -1253,6 +1254,8 @@ int main(int argc, char **argv)
       end_date = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--num-shards", (char*)NULL)) {
       num_shards = atoi(val.c_str());
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-concurrent-ios", (char*)NULL)) {
+      max_concurrent_ios = atoi(val.c_str());
     } else if (ceph_argparse_witharg(args, i, &val, "--shard-id", (char*)NULL)) {
       shard_id = atoi(val.c_str());
       specified_shard_id = true;
@@ -2575,7 +2578,7 @@ next:
   }
 
   if (opt_cmd == OPT_ORPHANS_FIND) {
-    RGWOrphanSearch search(store);
+    RGWOrphanSearch search(store, max_concurrent_ios);
 
     if (job_id.empty()) {
       cerr << "ERROR: --job-id not specified" << std::endl;
index 57d96c057e18dc2cc60e6773238c38cc58246333..adef6243ff94f2757eb6c78dcd29589b06a45ae5 100644 (file)
@@ -70,7 +70,7 @@ int RGWOrphanStore::init()
   return 0;
 }
 
-int RGWOrphanStore::store_entries(const string& oid, map<string, bufferlist> entries)
+int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
 {
   librados::ObjectWriteOperation op;
   op.omap_set(entries);
@@ -83,6 +83,19 @@ int RGWOrphanStore::store_entries(const string& oid, map<string, bufferlist> ent
   return 0;
 }
 
+int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
+{
+#define MAX_OMAP_GET 100
+  int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
+  if (ret < 0) {
+    cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << std::endl;
+  }
+
+  *truncated = (entries->size() < MAX_OMAP_GET);
+
+  return 0;
+}
+
 int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
   int r = orphan_store.init();
   if (r < 0) {
@@ -238,7 +251,7 @@ int RGWOrphanSearch::build_buckets_instance_index()
   string section = "bucket.instance";
   int ret = store->meta_mgr->list_keys_init(section, &handle);
   if (ret < 0) {
-    cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
+    lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
     return -ret;
   }
 
@@ -254,21 +267,18 @@ int RGWOrphanSearch::build_buckets_instance_index()
     list<string> keys;
     ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
     if (ret < 0) {
-      cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
+      lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
       return -ret;
     }
 
     for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
-      // ssize_t pos = iter->find(':');
-      // string bucket_id = iter->substr(pos + 1);
-
       int shard = orphan_shard(*iter);
       instances[shard].push_back(*iter);
 
       if (++count >= COUNT_BEFORE_FLUSH) {
         ret = log_oids(buckets_instance_index, instances);
         if (ret < 0) {
-          cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+          lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
           return ret;
         }
         count = 0;
@@ -282,6 +292,128 @@ int RGWOrphanSearch::build_buckets_instance_index()
   return 0;
 }
 
+int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id)
+{
+  RGWBucketInfo bucket_info;
+  RGWObjectCtx obj_ctx(store);
+  int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
+  if (ret < 0) {
+    if (ret == -ENOENT) {
+      /* probably raced with bucket removal */
+      return 0;
+    }
+    lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  RGWRados::Bucket target(store, bucket_info.bucket);
+  RGWRados::Bucket::List list_op(&target);
+
+  string marker;
+  list_op.params.marker = rgw_obj_key(marker);
+  list_op.params.list_versions = true;
+
+  bool truncated;
+
+  deque<RGWRados::Object::Stat> stat_ops;
+
+  do {
+    vector<RGWObjEnt> result;
+
+#define MAX_LIST_OBJS_ENTRIES 100
+    ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated);
+    if (ret < 0) {
+      cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    for (vector<RGWObjEnt>::iterator iter = result.begin(); iter != result.end(); ++iter) {
+      RGWObjEnt& entry = *iter;
+      if (entry.key.instance.empty()) {
+        ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
+      } else {
+        ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
+      }
+
+      rgw_obj obj(bucket_info.bucket, entry.key);
+
+      RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
+
+      stat_ops.push_back(RGWRados::Object::Stat(&op_target));
+      RGWRados::Object::Stat& op = stat_ops.back();
+
+
+      ret = op.stat_async();
+      if (ret < 0) {
+        lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+        return ret;
+      }
+      if (stat_ops.size() >= max_concurrent_ios) {
+        RGWRados::Object::Stat& front_op = stat_ops.front();
+
+        ret = front_op.wait();
+        if (ret < 0) {
+          lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+        }
+
+        stat_ops.pop_front();
+      }
+    }
+  } while (truncated);
+
+  while (!stat_ops.empty()) {
+    RGWRados::Object::Stat& front_op = stat_ops.front();
+
+    ret = front_op.wait();
+    if (ret < 0) {
+      lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+    }
+
+    stat_ops.pop_front();
+  }
+
+  return 0;
+}
+
+int RGWOrphanSearch::build_linked_oids_index()
+{
+  string marker;
+
+  map<int, string>::iterator iter;
+  for (iter = buckets_instance_index.begin(); iter != buckets_instance_index.end(); ++iter) {
+    bool truncated;
+
+    string oid = iter->second;
+
+    do {
+      map<string, bufferlist> entries;
+      int ret = orphan_store.read_entries(oid, marker, &entries, &truncated);
+      if (ret == -ENOENT) {
+        truncated = false;
+        ret = 0;
+      }
+
+      if (ret < 0) {
+        lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
+        return ret;
+      }
+
+      if (entries.empty()) {
+        break;
+      }
+
+      marker = entries.rbegin()->first; /* last entry */
+
+      for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
+        ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
+        ret = build_linked_oids_for_bucket(eiter->first);
+      }
+    } while (truncated);
+  }
+
+  return 0;
+}
+
 int RGWOrphanSearch::run()
 {
   int r;
@@ -331,6 +463,13 @@ int RGWOrphanSearch::run()
 
 
     case ORPHAN_SEARCH_ITERATE_BI:
+      ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
+      r = build_linked_oids_index();
+      if (r < 0) {
+        lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returnr ret=" << r << dendl;
+        return r;
+      }
+
     case ORPHAN_SEARCH_DONE:
       break;
 
index 29f62114b320938358b806e76155e9cff39eabe6..25692005bd9966eb3cc11e0c3a7d595124e59f5f 100644 (file)
@@ -107,7 +107,8 @@ public:
   int write_job(const string& job_name, const RGWOrphanSearchState& state);
 
 
-  int store_entries(const string& oid, map<string, bufferlist> entries);
+  int store_entries(const string& oid, const map<string, bufferlist>& entries);
+  int read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated);
 };
 
 
@@ -122,9 +123,12 @@ class RGWOrphanSearch {
 
   map<int, string> all_objs_index;
   map<int, string> buckets_instance_index;
+  map<int, string> linked_objs_index;
 
   string index_objs_prefix;
 
+  uint16_t max_concurrent_ios;
+
   struct log_iter_info {
     string oid;
     list<string>::iterator cur;
@@ -139,7 +143,7 @@ class RGWOrphanSearch {
   }
 
 public:
-  RGWOrphanSearch(RGWRados *_store) : store(_store), orphan_store(store) {}
+  RGWOrphanSearch(RGWRados *_store, int _max_ios) : store(_store), orphan_store(store), max_concurrent_ios(_max_ios) {}
 
   int save_state() {
     RGWOrphanSearchState state;
@@ -154,6 +158,8 @@ public:
 
   int build_all_oids_index();
   int build_buckets_instance_index();
+  int build_linked_oids_for_bucket(const string& bucket_instance_id);
+  int build_linked_oids_index();
 
   int run();
 };