From: Yehuda Sadeh Date: Mon, 26 Sep 2016 23:09:34 +0000 (-0700) Subject: rgw_admin: bucket resharding, initial work X-Git-Tag: v0.94.10~8^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=53982a2e61614398de2e37705da12aa6874360a0;p=ceph.git rgw_admin: bucket resharding, initial work Signed-off-by: Yehuda Sadeh Conflicts: src/rgw/rgw_admin.cc --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index d2c04a8fac6..4eed8ed2b02 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1,4 +1,3 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include @@ -224,6 +223,7 @@ enum { OPT_BUCKET_CHECK, OPT_BUCKET_RM, OPT_BUCKET_REWRITE, + OPT_BUCKET_RESHARD, OPT_POLICY, OPT_POOL_ADD, OPT_POOL_RM, @@ -367,6 +367,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) return OPT_BUCKET_RM; if (strcmp(cmd, "rewrite") == 0) return OPT_BUCKET_REWRITE; + if (strcmp(cmd, "reshard") == 0) + return OPT_BUCKET_RESHARD; if (strcmp(cmd, "check") == 0) return OPT_BUCKET_CHECK; } else if (strcmp(prev_cmd, "log") == 0) { @@ -633,16 +635,16 @@ public: }; static int init_bucket(const string& bucket_name, const string& bucket_id, - RGWBucketInfo& bucket_info, rgw_bucket& bucket) + RGWBucketInfo& bucket_info, rgw_bucket& bucket, map *pattrs = NULL) { if (!bucket_name.empty()) { RGWObjectCtx obj_ctx(store); int r; if (bucket_id.empty()) { - r = store->get_bucket_info(obj_ctx, bucket_name, bucket_info, NULL); + r = store->get_bucket_info(obj_ctx, bucket_name, bucket_info, NULL, pattrs); } else { string bucket_instance_id = bucket_name + ":" + bucket_id; - r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL); + r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, pattrs); } if (r < 0) { cerr << "could not get bucket info for bucket=" << bucket_name << std::endl; @@ -1082,6 +1084,51 @@ int 
do_check_object_locator(const string& bucket_name, bool fix, bool remove_bad return 0; } +#define RESHARD_SHARD_WINDOW 64 + +class BucketReshardShard { + RGWRados *store; + RGWBucketInfo& bucket_info; + int num_shard; + RGWRados::BucketShard bs; + vector entries; + +public: + BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, int _num_shard) : store(_store), bucket_info(_bucket_info), bs(store) { + num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); + bs.init(bucket_info.bucket, num_shard); + } + + int add_entry(rgw_cls_bi_entry& entry) { + entries.push_back(entry); + if (entries.size() >= RESHARD_SHARD_WINDOW) { + int ret = flush(); + if (ret < 0) { + return ret; + } + } + return 0; + } + int flush() { + if (entries.size() == 0) { + return 0; + } + + librados::ObjectWriteOperation op; + for (vector::iterator iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + store->bi_put(op, bs, entry); + } + int ret = bs.index_ctx.operate(bs.bucket_obj, &op); + if (ret < 0) { + std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl; + return ret; + } + entries.clear(); + return 0; + } +}; + int main(int argc, char **argv) { @@ -2286,6 +2333,10 @@ next: } if (opt_cmd == OPT_BI_LIST) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket name not specified" << std::endl; + return EINVAL; + } RGWBucketInfo bucket_info; int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket); if (ret < 0) { @@ -2299,25 +2350,36 @@ next: max_entries = 1000; } + int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1); - formatter->open_array_section("entries"); - - do { - entries.clear(); - ret = store->bi_list(bucket, object, marker, max_entries, &entries, &is_truncated); + for (int i = 0; i < max_shards; i++) { + RGWRados::BucketShard bs(store); + int shard_id = (bucket_info.num_shards > 0 ? 
i : -1); + int ret = bs.init(bucket, shard_id); if (ret < 0) { - cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << shard_id << "): " << cpp_strerror(-ret) << std::endl; return -ret; } - list::iterator iter; - for (iter = entries.begin(); iter != entries.end(); ++iter) { - rgw_cls_bi_entry& entry = *iter; - encode_json("entry", entry, formatter); - marker = entry.idx; - } - formatter->flush(cout); - } while (is_truncated); + formatter->open_array_section("entries"); + + do { + entries.clear(); + ret = store->bi_list(bs, marker, max_entries, &entries, &is_truncated); + if (ret < 0) { + cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + encode_json("entry", entry, formatter); + marker = entry.idx; + } + formatter->flush(cout); + } while (is_truncated); + } formatter->close_section(); formatter->flush(cout); } @@ -2477,6 +2539,107 @@ next: formatter->flush(cout); } + if (opt_cmd == OPT_BUCKET_RESHARD) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return EINVAL; + } + + RGWBucketInfo bucket_info; + map attrs; + int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket, &attrs); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + int num_source_shards = (bucket_info.num_shards > 0 ? 
bucket_info.num_shards : 1); + + RGWBucketInfo new_bucket_info(bucket_info); + store->create_bucket_id(&new_bucket_info.bucket.bucket_id); + new_bucket_info.bucket.oid.clear(); + + new_bucket_info.num_shards = num_shards; + new_bucket_info.objv_tracker.clear(); + + ret = store->init_bucket_index(new_bucket_info.bucket, new_bucket_info.num_shards); + if (ret < 0) { + cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + ret = store->put_bucket_instance_info(new_bucket_info, true, 0, &attrs); + if (ret < 0) { + cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + list entries; + + if (max_entries < 0) { + max_entries = 1000; + } + + int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1); + BucketReshardShard *target_shards[num_target_shards]; + for (int i = 0; i < num_target_shards; ++i) { + target_shards[i] = new BucketReshardShard(store, new_bucket_info, i); + } + + formatter->open_array_section("entries"); + + for (int i = 0; i < num_source_shards; ++i) { + bool is_truncated = true; + marker.clear(); + while (is_truncated) { + entries.clear(); + ret = store->bi_list(bucket, i, marker, max_entries, &entries, &is_truncated); + if (ret < 0) { + cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + encode_json("entry", entry, formatter); + marker = entry.idx; + + int target_shard_id; + cls_rgw_obj_key cls_key; + entry.get_key(&cls_key); + rgw_obj_key key(cls_key); + rgw_obj obj(new_bucket_info.bucket, key); + int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id); + if (ret < 0) { + cerr << "ERROR: get_target_shard_id() returned ret=" << ret << std::endl; + return ret; + } + + int shard_index = (target_shard_id > 0 ? 
target_shard_id : 0); + + ret = target_shards[shard_index]->add_entry(entry); + if (ret < 0) { + cerr << "ERROR: target_shards.add_entry(" << key << ") returned error: " << cpp_strerror(-ret) << std::endl; + return ret; + } + } + + formatter->flush(cout); + } + } + formatter->close_section(); + formatter->flush(cout); + + for (int i = 0; i < num_target_shards; ++i) { + int ret = target_shards[i]->flush(); + if (ret < 0) { + cerr << "ERROR: target_shards[" << i << "].flush() returned error: " << cpp_strerror(-ret) << std::endl; + return ret; + } + delete target_shards[i]; + } + } + if (opt_cmd == OPT_OBJECT_UNLINK) { RGWBucketInfo bucket_info; int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket);