-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <errno.h>
OPT_BUCKET_CHECK,
OPT_BUCKET_RM,
OPT_BUCKET_REWRITE,
+ OPT_BUCKET_RESHARD,
OPT_POLICY,
OPT_POOL_ADD,
OPT_POOL_RM,
return OPT_BUCKET_RM;
if (strcmp(cmd, "rewrite") == 0)
return OPT_BUCKET_REWRITE;
+ if (strcmp(cmd, "reshard") == 0)
+ return OPT_BUCKET_RESHARD;
if (strcmp(cmd, "check") == 0)
return OPT_BUCKET_CHECK;
} else if (strcmp(prev_cmd, "log") == 0) {
};
static int init_bucket(const string& bucket_name, const string& bucket_id,
- RGWBucketInfo& bucket_info, rgw_bucket& bucket)
+ RGWBucketInfo& bucket_info, rgw_bucket& bucket, map<string, bufferlist> *pattrs = NULL)
{
if (!bucket_name.empty()) {
RGWObjectCtx obj_ctx(store);
int r;
if (bucket_id.empty()) {
- r = store->get_bucket_info(obj_ctx, bucket_name, bucket_info, NULL);
+ r = store->get_bucket_info(obj_ctx, bucket_name, bucket_info, NULL, pattrs);
} else {
string bucket_instance_id = bucket_name + ":" + bucket_id;
- r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
+ r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, pattrs);
}
if (r < 0) {
cerr << "could not get bucket info for bucket=" << bucket_name << std::endl;
return 0;
}
+#define RESHARD_SHARD_WINDOW 64
+
+class BucketReshardShard {
+ RGWRados *store;
+ RGWBucketInfo& bucket_info;
+ int num_shard;
+ RGWRados::BucketShard bs;
+ vector<rgw_cls_bi_entry> entries;
+
+public:
+ BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, int _num_shard) : store(_store), bucket_info(_bucket_info), bs(store) {
+ num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
+ bs.init(bucket_info.bucket, num_shard);
+ }
+
+ int add_entry(rgw_cls_bi_entry& entry) {
+ entries.push_back(entry);
+ if (entries.size() >= RESHARD_SHARD_WINDOW) {
+ int ret = flush();
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ return 0;
+ }
+ int flush() {
+ if (entries.size() == 0) {
+ return 0;
+ }
+
+ librados::ObjectWriteOperation op;
+ for (vector<rgw_cls_bi_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ store->bi_put(op, bs, entry);
+ }
+ int ret = bs.index_ctx.operate(bs.bucket_obj, &op);
+ if (ret < 0) {
+ std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+ entries.clear();
+ return 0;
+ }
+};
+
int main(int argc, char **argv)
{
}
if (opt_cmd == OPT_BI_LIST) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket name not specified" << std::endl;
+ return EINVAL;
+ }
RGWBucketInfo bucket_info;
int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket);
if (ret < 0) {
max_entries = 1000;
}
+ int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
- formatter->open_array_section("entries");
-
- do {
- entries.clear();
- ret = store->bi_list(bucket, object, marker, max_entries, &entries, &is_truncated);
+ for (int i = 0; i < max_shards; i++) {
+ RGWRados::BucketShard bs(store);
+ int shard_id = (bucket_info.num_shards > 0 ? i : -1);
+ int ret = bs.init(bucket, shard_id);
if (ret < 0) {
- cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << shard_id << "): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- list<rgw_cls_bi_entry>::iterator iter;
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
- rgw_cls_bi_entry& entry = *iter;
- encode_json("entry", entry, formatter);
- marker = entry.idx;
- }
- formatter->flush(cout);
- } while (is_truncated);
+ formatter->open_array_section("entries");
+
+ do {
+ entries.clear();
+ ret = store->bi_list(bs, marker, max_entries, &entries, &is_truncated);
+ if (ret < 0) {
+ cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ encode_json("entry", entry, formatter);
+ marker = entry.idx;
+ }
+ formatter->flush(cout);
+ } while (is_truncated);
+ }
formatter->close_section();
formatter->flush(cout);
}
formatter->flush(cout);
}
+ if (opt_cmd == OPT_BUCKET_RESHARD) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket not specified" << std::endl;
+ return EINVAL;
+ }
+
+ RGWBucketInfo bucket_info;
+ map<string, bufferlist> attrs;
+ int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket, &attrs);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+
+ RGWBucketInfo new_bucket_info(bucket_info);
+ store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
+ new_bucket_info.bucket.oid.clear();
+
+ new_bucket_info.num_shards = num_shards;
+ new_bucket_info.objv_tracker.clear();
+
+ ret = store->init_bucket_index(new_bucket_info.bucket, new_bucket_info.num_shards);
+ if (ret < 0) {
+ cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ ret = store->put_bucket_instance_info(new_bucket_info, true, 0, &attrs);
+ if (ret < 0) {
+ cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ list<rgw_cls_bi_entry> entries;
+
+ if (max_entries < 0) {
+ max_entries = 1000;
+ }
+
+ int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
+ BucketReshardShard *target_shards[num_target_shards];
+ for (int i = 0; i < num_target_shards; ++i) {
+ target_shards[i] = new BucketReshardShard(store, new_bucket_info, i);
+ }
+
+ formatter->open_array_section("entries");
+
+ for (int i = 0; i < num_source_shards; ++i) {
+ bool is_truncated = true;
+ marker.clear();
+ while (is_truncated) {
+ entries.clear();
+ ret = store->bi_list(bucket, i, marker, max_entries, &entries, &is_truncated);
+ if (ret < 0) {
+ cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ encode_json("entry", entry, formatter);
+ marker = entry.idx;
+
+ int target_shard_id;
+ cls_rgw_obj_key cls_key;
+ entry.get_key(&cls_key);
+ rgw_obj_key key(cls_key);
+ rgw_obj obj(new_bucket_info.bucket, key);
+ int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
+ if (ret < 0) {
+ cerr << "ERROR: get_target_shard_id() returned ret=" << ret << std::endl;
+ return ret;
+ }
+
+ int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
+
+ ret = target_shards[shard_index]->add_entry(entry);
+ if (ret < 0) {
+ cerr << "ERROR: target_shards.add_entry(" << key << ") returned error: " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+ }
+
+ formatter->flush(cout);
+ }
+ }
+ formatter->close_section();
+ formatter->flush(cout);
+
+ for (int i = 0; i < num_target_shards; ++i) {
+ int ret = target_shards[i]->flush();
+ if (ret < 0) {
+ cerr << "ERROR: target_shards[" << i << "].flush() returned error: " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+ delete target_shards[i];
+ }
+ }
+
if (opt_cmd == OPT_OBJECT_UNLINK) {
RGWBucketInfo bucket_info;
int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket);