From 62d562d76e0456cc3c735c6708531c7deb2874da Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Wed, 29 Apr 2015 14:50:15 -0700
Subject: [PATCH] rgw: iterate asynchronously over linked objects

Read objects manifest. So that we could keep the relevant info later.

Signed-off-by: Yehuda Sadeh
(cherry picked from commit 05a953d965a0a6a0e9feddaa62f7429e154e828e)
---
Review notes (not part of the commit message). Changes made on top of the
upstream commit named above, all of them in src/rgw/rgw_orphan.cc:
 * read_entries(): report truncation when a full page was read (the test was
   inverted) and return the omap_get_vals() error instead of always 0.
 * build_linked_oids_for_bucket(): return the list_objects() error unchanged
   (it was negated into a positive value) and log it through lderr like the
   rest of the file.
 * build_linked_oids_index(): restart the omap marker for every index object
   and stop when build_linked_oids_for_bucket() fails.
 * run(): the error message now names build_linked_oids_index().
 * Hunk line counts and the diffstat were updated to match. The blob ids on
   the "index" lines are still those of the upstream commit.
 * Line breaks, indentation, blank context lines and template arguments were
   reconstructed from a flattened copy of this patch, and the author e-mail
   addresses are missing from that copy; check against the tree before
   applying.

 src/rgw/rgw_admin.cc  |   5 +-
 src/rgw/rgw_orphan.cc | 175 ++++++++++++++++++++++++++++++++++++++++--
 src/rgw/rgw_orphan.h  |  10 ++-
 3 files changed, 180 insertions(+), 10 deletions(-)

diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 2736faac3cd2a..d9bbb681e1557 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1150,6 +1150,7 @@ int main(int argc, char **argv)
   string job_id;
   int init_search = false;
   int num_shards = 0;
+  int max_concurrent_ios = 32;
 
   std::string val;
   std::ostringstream errs;
@@ -1253,6 +1254,8 @@ int main(int argc, char **argv)
       end_date = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--num-shards", (char*)NULL)) {
       num_shards = atoi(val.c_str());
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-concurrent-ios", (char*)NULL)) {
+      max_concurrent_ios = atoi(val.c_str());
     } else if (ceph_argparse_witharg(args, i, &val, "--shard-id", (char*)NULL)) {
       shard_id = atoi(val.c_str());
       specified_shard_id = true;
@@ -2575,7 +2578,7 @@ next:
   }
 
   if (opt_cmd == OPT_ORPHANS_FIND) {
-    RGWOrphanSearch search(store);
+    RGWOrphanSearch search(store, max_concurrent_ios);
 
     if (job_id.empty()) {
       cerr << "ERROR: --job-id not specified" << std::endl;
diff --git a/src/rgw/rgw_orphan.cc b/src/rgw/rgw_orphan.cc
index 57d96c057e18d..adef6243ff94f 100644
--- a/src/rgw/rgw_orphan.cc
+++ b/src/rgw/rgw_orphan.cc
@@ -70,7 +70,7 @@ int RGWOrphanStore::init()
   return 0;
 }
 
-int RGWOrphanStore::store_entries(const string& oid, map<string, bufferlist> entries)
+int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
 {
   librados::ObjectWriteOperation op;
   op.omap_set(entries);
@@ -83,6 +83,27 @@ int RGWOrphanStore::store_entries(const string& oid, map<string, bufferlist> ent
   return 0;
 }
 
+/*
+ * Read one page (at most MAX_OMAP_GET keys) of omap entries from oid, starting
+ * after marker. *truncated is set when a full page came back, i.e. when more
+ * entries may follow. Returns 0 on success, or the negative error code from
+ * omap_get_vals() (with *truncated cleared).
+ */
+int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
+{
+#define MAX_OMAP_GET 100
+  int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
+  if (ret < 0) {
+    cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << std::endl;
+    *truncated = false;
+    return ret;
+  }
+
+  *truncated = (entries->size() == MAX_OMAP_GET);
+
+  return 0;
+}
+
 int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
   int r = orphan_store.init();
   if (r < 0) {
@@ -238,7 +259,7 @@ int RGWOrphanSearch::build_buckets_instance_index()
   string section = "bucket.instance";
   int ret = store->meta_mgr->list_keys_init(section, &handle);
   if (ret < 0) {
-    cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
+    lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
     return -ret;
   }
 
@@ -254,21 +275,18 @@ int RGWOrphanSearch::build_buckets_instance_index()
     list<string> keys;
     ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
     if (ret < 0) {
-      cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
+      lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
       return -ret;
     }
 
     for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
-      // ssize_t pos = iter->find(':');
-      // string bucket_id = iter->substr(pos + 1);
-
       int shard = orphan_shard(*iter);
       instances[shard].push_back(*iter);
 
       if (++count >= COUNT_BEFORE_FLUSH) {
         ret = log_oids(buckets_instance_index, instances);
         if (ret < 0) {
-          cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+          lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
           return ret;
         }
         count = 0;
@@ -282,6 +300,142 @@ int RGWOrphanSearch::build_buckets_instance_index()
   return 0;
 }
 
+/*
+ * List all objects (every version) of the given bucket instance and start an
+ * asynchronous stat on each, waiting on the oldest stat whenever
+ * max_concurrent_ios of them are pending. A stat whose wait() fails is only
+ * logged. Returns 0 on success or if the bucket instance is gone, else < 0.
+ */
+int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id)
+{
+  RGWBucketInfo bucket_info;
+  RGWObjectCtx obj_ctx(store);
+  int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
+  if (ret < 0) {
+    if (ret == -ENOENT) {
+      /* probably raced with bucket removal */
+      return 0;
+    }
+    lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  RGWRados::Bucket target(store, bucket_info.bucket);
+  RGWRados::Bucket::List list_op(&target);
+
+  string marker;
+  list_op.params.marker = rgw_obj_key(marker);
+  list_op.params.list_versions = true;
+
+  bool truncated;
+
+  deque<RGWRados::Object::Stat> stat_ops;
+
+  do {
+    vector<RGWObjEnt> result;
+
+#define MAX_LIST_OBJS_ENTRIES 100
+    ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated);
+    if (ret < 0) {
+      lderr(store->ctx()) << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << dendl;
+      return ret; /* keep the error negative: callers test for ret < 0 */
+    }
+
+    for (vector<RGWObjEnt>::iterator iter = result.begin(); iter != result.end(); ++iter) {
+      RGWObjEnt& entry = *iter;
+      if (entry.key.instance.empty()) {
+        ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
+      } else {
+        ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
+      }
+
+      rgw_obj obj(bucket_info.bucket, entry.key);
+
+      RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
+
+      stat_ops.push_back(RGWRados::Object::Stat(&op_target));
+      RGWRados::Object::Stat& op = stat_ops.back();
+
+      /* NOTE(review): the Stat is given &op_target, a loop-local; confirm wait() never uses it */
+      ret = op.stat_async();
+      if (ret < 0) {
+        lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+        return ret;
+      }
+      if (stat_ops.size() >= max_concurrent_ios) {
+        RGWRados::Object::Stat& front_op = stat_ops.front();
+
+        ret = front_op.wait();
+        if (ret < 0) {
+          lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+        }
+
+        stat_ops.pop_front();
+      }
+    }
+  } while (truncated);
+
+  while (!stat_ops.empty()) {
+    RGWRados::Object::Stat& front_op = stat_ops.front();
+
+    ret = front_op.wait();
+    if (ret < 0) {
+      lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+    }
+
+    stat_ops.pop_front();
+  }
+
+  return 0;
+}
+
+/*
+ * For every index object in buckets_instance_index, page through its omap
+ * keys and run build_linked_oids_for_bucket() on each key. Returns 0 on
+ * success or the first negative error code.
+ */
+int RGWOrphanSearch::build_linked_oids_index()
+{
+  map<int, string>::iterator iter;
+  for (iter = buckets_instance_index.begin(); iter != buckets_instance_index.end(); ++iter) {
+    bool truncated;
+
+    string oid = iter->second;
+    string marker; /* per index object: a key from one object must not skip keys in the next */
+
+    do {
+      map<string, bufferlist> entries;
+      int ret = orphan_store.read_entries(oid, marker, &entries, &truncated);
+      if (ret == -ENOENT) {
+        truncated = false;
+        ret = 0;
+      }
+
+      if (ret < 0) {
+        lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
+        return ret;
+      }
+
+      if (entries.empty()) {
+        break;
+      }
+
+      marker = entries.rbegin()->first; /* last entry */
+
+      for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
+        ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
+        ret = build_linked_oids_for_bucket(eiter->first);
+        if (ret < 0) {
+          lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() entry=" << eiter->first << " returned ret=" << ret << dendl;
+          return ret;
+        }
+      }
+    } while (truncated);
+  }
+
+  return 0;
+}
+
 int RGWOrphanSearch::run()
 {
   int r;
@@ -331,6 +485,13 @@ int RGWOrphanSearch::run()
 
 
     case ORPHAN_SEARCH_ITERATE_BI:
+      ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
+      r = build_linked_oids_index();
+      if (r < 0) {
+        lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_index() returned ret=" << r << dendl;
+        return r;
+      }
+
     case ORPHAN_SEARCH_DONE:
       break;
 
diff --git a/src/rgw/rgw_orphan.h b/src/rgw/rgw_orphan.h
index 29f62114b3209..25692005bd996 100644
--- a/src/rgw/rgw_orphan.h
+++ b/src/rgw/rgw_orphan.h
@@ -107,7 +107,8 @@ public:
 
   int write_job(const string& job_name, const RGWOrphanSearchState& state);
 
-  int store_entries(const string& oid, map<string, bufferlist> entries);
+  int store_entries(const string& oid, const map<string, bufferlist>& entries);
+  int read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated);
 };
 
 
@@ -122,9 +123,12 @@ class RGWOrphanSearch {
 
   map<int, string> all_objs_index;
   map<int, string> buckets_instance_index;
+  map<int, string> linked_objs_index;
 
   string index_objs_prefix;
 
+  uint16_t max_concurrent_ios;
+
   struct log_iter_info {
     string oid;
     list<string>::iterator cur;
@@ -139,7 +143,7 @@ class RGWOrphanSearch {
   }
 
 public:
-  RGWOrphanSearch(RGWRados *_store) : store(_store), orphan_store(store) {}
+  RGWOrphanSearch(RGWRados *_store, int _max_ios) : store(_store), orphan_store(store), max_concurrent_ios(_max_ios) {}
 
   int save_state() {
     RGWOrphanSearchState state;
@@ -154,6 +158,8 @@ public:
 
   int build_all_oids_index();
   int build_buckets_instance_index();
+  int build_linked_oids_for_bucket(const string& bucket_instance_id);
+  int build_linked_oids_index();
 
   int run();
 };
-- 
2.39.5