return 0;
}
-int RGWOrphanStore::store_entries(const string& oid, map<string, bufferlist> entries)
+int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
{
librados::ObjectWriteOperation op;
op.omap_set(entries);
return 0;
}
+int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
+{
+#define MAX_OMAP_GET 100
+ int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
+ if (ret < 0) {
+ cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << std::endl;
+ }
+
+ *truncated = (entries->size() < MAX_OMAP_GET);
+
+ return 0;
+}
+
int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
int r = orphan_store.init();
if (r < 0) {
string section = "bucket.instance";
int ret = store->meta_mgr->list_keys_init(section, &handle);
if (ret < 0) {
- cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
+ lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
return -ret;
}
list<string> keys;
ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
if (ret < 0) {
- cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
+ lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
return -ret;
}
for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
- // ssize_t pos = iter->find(':');
- // string bucket_id = iter->substr(pos + 1);
-
int shard = orphan_shard(*iter);
instances[shard].push_back(*iter);
if (++count >= COUNT_BEFORE_FLUSH) {
ret = log_oids(buckets_instance_index, instances);
if (ret < 0) {
- cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
return ret;
}
count = 0;
return 0;
}
+int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id)
+{
+ RGWBucketInfo bucket_info;
+ RGWObjectCtx obj_ctx(store);
+ int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ /* probably raced with bucket removal */
+ return 0;
+ }
+ lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ RGWRados::Bucket target(store, bucket_info.bucket);
+ RGWRados::Bucket::List list_op(&target);
+
+ string marker;
+ list_op.params.marker = rgw_obj_key(marker);
+ list_op.params.list_versions = true;
+
+ bool truncated;
+
+ deque<RGWRados::Object::Stat> stat_ops;
+
+ do {
+ vector<RGWObjEnt> result;
+
+#define MAX_LIST_OBJS_ENTRIES 100
+ ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated);
+ if (ret < 0) {
+ cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ for (vector<RGWObjEnt>::iterator iter = result.begin(); iter != result.end(); ++iter) {
+ RGWObjEnt& entry = *iter;
+ if (entry.key.instance.empty()) {
+ ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
+ } else {
+ ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
+ }
+
+ rgw_obj obj(bucket_info.bucket, entry.key);
+
+ RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
+
+ stat_ops.push_back(RGWRados::Object::Stat(&op_target));
+ RGWRados::Object::Stat& op = stat_ops.back();
+
+
+ ret = op.stat_async();
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ if (stat_ops.size() >= max_concurrent_ios) {
+ RGWRados::Object::Stat& front_op = stat_ops.front();
+
+ ret = front_op.wait();
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+ }
+
+ stat_ops.pop_front();
+ }
+ }
+ } while (truncated);
+
+ while (!stat_ops.empty()) {
+ RGWRados::Object::Stat& front_op = stat_ops.front();
+
+ ret = front_op.wait();
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+ }
+
+ stat_ops.pop_front();
+ }
+
+ return 0;
+}
+
+int RGWOrphanSearch::build_linked_oids_index()
+{
+ string marker;
+
+ map<int, string>::iterator iter;
+ for (iter = buckets_instance_index.begin(); iter != buckets_instance_index.end(); ++iter) {
+ bool truncated;
+
+ string oid = iter->second;
+
+ do {
+ map<string, bufferlist> entries;
+ int ret = orphan_store.read_entries(oid, marker, &entries, &truncated);
+ if (ret == -ENOENT) {
+ truncated = false;
+ ret = 0;
+ }
+
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ if (entries.empty()) {
+ break;
+ }
+
+ marker = entries.rbegin()->first; /* last entry */
+
+ for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
+ ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
+ ret = build_linked_oids_for_bucket(eiter->first);
+ }
+ } while (truncated);
+ }
+
+ return 0;
+}
+
int RGWOrphanSearch::run()
{
int r;
case ORPHAN_SEARCH_ITERATE_BI:
+ ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
+ r = build_linked_oids_index();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returnr ret=" << r << dendl;
+ return r;
+ }
+
case ORPHAN_SEARCH_DONE:
break;
int write_job(const string& job_name, const RGWOrphanSearchState& state);
- int store_entries(const string& oid, map<string, bufferlist> entries);
+ int store_entries(const string& oid, const map<string, bufferlist>& entries);
+ int read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated);
};
map<int, string> all_objs_index;
map<int, string> buckets_instance_index;
+ map<int, string> linked_objs_index;
string index_objs_prefix;
+ uint16_t max_concurrent_ios;
+
struct log_iter_info {
string oid;
list<string>::iterator cur;
}
public:
- RGWOrphanSearch(RGWRados *_store) : store(_store), orphan_store(store) {}
+ RGWOrphanSearch(RGWRados *_store, int _max_ios) : store(_store), orphan_store(store), max_concurrent_ios(_max_ios) {}
int save_state() {
RGWOrphanSearchState state;
int build_all_oids_index();
int build_buckets_instance_index();
+ int build_linked_oids_for_bucket(const string& bucket_instance_id);
+ int build_linked_oids_index();
int run();
};