cout << " role-policy get get the specified inline policy document embedded with the given role\n";
cout << " role-policy delete delete policy attached to a role\n";
cout << " reshard add schedule a resharding of a bucket\n";
- cout << " reshard list list all bucket resharding or scheduled to be reshared\n";
- cout << " reshard execute execute resharding of a bucket \n";
- cout << " reshard cancel cancel resharding a bucket\n";
+ cout << " reshard list list all bucket resharding or scheduled to be reshared\n";
+ cout << " reshard execute execute resharding of a bucket \n";
+ cout << " reshard cancel cancel resharding a bucket\n";
cout << "options:\n";
cout << " --tenant=<tenant> tenant name\n";
cout << " --uid=<id> user id\n";
}
}
-#define RESHARD_SHARD_WINDOW 64
-#define RESHARD_MAX_AIO 128
-
-class BucketReshardShard {
- RGWRados *store;
- RGWBucketInfo& bucket_info;
- int num_shard;
- RGWRados::BucketShard bs;
- vector<rgw_cls_bi_entry> entries;
- map<uint8_t, rgw_bucket_category_stats> stats;
- deque<librados::AioCompletion *>& aio_completions;
-
- int wait_next_completion() {
- librados::AioCompletion *c = aio_completions.front();
- aio_completions.pop_front();
-
- c->wait_for_safe();
-
- int ret = c->get_return_value();
- c->release();
-
- if (ret < 0) {
- cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
- return ret;
- }
-
- return 0;
- }
-
- int get_completion(librados::AioCompletion **c) {
- if (aio_completions.size() >= RESHARD_MAX_AIO) {
- int ret = wait_next_completion();
- if (ret < 0) {
- return ret;
- }
- }
-
- *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
- aio_completions.push_back(*c);
-
- return 0;
- }
-
-public:
- BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info,
- int _num_shard,
- deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
- aio_completions(_completions) {
- num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
- bs.init(bucket_info.bucket, num_shard);
- }
-
- int get_num_shard() {
- return num_shard;
- }
-
- int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
- const rgw_bucket_category_stats& entry_stats) {
- entries.push_back(entry);
- if (account) {
- rgw_bucket_category_stats& target = stats[category];
- target.num_entries += entry_stats.num_entries;
- target.total_size += entry_stats.total_size;
- target.total_size_rounded += entry_stats.total_size_rounded;
- }
- if (entries.size() >= RESHARD_SHARD_WINDOW) {
- int ret = flush();
- if (ret < 0) {
- return ret;
- }
- }
- return 0;
- }
- int flush() {
- if (entries.size() == 0) {
- return 0;
- }
-
- librados::ObjectWriteOperation op;
- for (auto& entry : entries) {
- store->bi_put(op, bs, entry);
- }
- cls_rgw_bucket_update_stats(op, false, stats);
-
- librados::AioCompletion *c;
- int ret = get_completion(&c);
- if (ret < 0) {
- return ret;
- }
- ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
- if (ret < 0) {
- std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
- return ret;
- }
- entries.clear();
- stats.clear();
- return 0;
- }
-
- int wait_all_aio() {
- int ret = 0;
- while (!aio_completions.empty()) {
- int r = wait_next_completion();
- if (r < 0) {
- ret = r;
- }
- }
- return ret;
- }
-};
-
-class BucketReshardManager {
- RGWRados *store;
- RGWBucketInfo& target_bucket_info;
- deque<librados::AioCompletion *> completions;
- int num_target_shards;
- vector<BucketReshardShard *> target_shards;
-
-public:
- BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
- num_target_shards(_num_target_shards) {
- target_shards.resize(num_target_shards);
- for (int i = 0; i < num_target_shards; ++i) {
- target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
- }
- }
-
- ~BucketReshardManager() {
- for (auto& shard : target_shards) {
- int ret = shard->wait_all_aio();
- if (ret < 0) {
- ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
- }
- }
- }
-
- int add_entry(int shard_index,
- rgw_cls_bi_entry& entry, bool account, uint8_t category,
- const rgw_bucket_category_stats& entry_stats) {
- int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
- if (ret < 0) {
- cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
- return ret;
- }
- return 0;
- }
-
- int finish() {
- int ret = 0;
- for (auto& shard : target_shards) {
- int r = shard->flush();
- if (r < 0) {
- cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
- ret = r;
- }
- }
- for (auto& shard : target_shards) {
- int r = shard->wait_all_aio();
- if (r < 0) {
- cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
- ret = r;
- }
- delete shard;
- }
- target_shards.clear();
- return ret;
- }
-};
-
-
int check_reshard_bucket_params(RGWRados *store,
const string& bucket_name,
const string& tenant,
return 0;
}
-int create_new_bucket_instance(RGWRados *store,
- int new_num_shards,
- const RGWBucketInfo& bucket_info,
- map<string, bufferlist>& attrs,
- RGWBucketInfo& new_bucket_info)
-{
-
- store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
- new_bucket_info.bucket.oid.clear();
-
- new_bucket_info.num_shards = new_num_shards;
- new_bucket_info.objv_tracker.clear();
-
- int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
- if (ret < 0) {
- cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
-
- ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
- if (ret < 0) {
- cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
-
- return 0;
-}
-
-int reshard_bucket(RGWRados *store,
- Formatter *formatter,
- int num_shards,
- rgw_bucket& bucket,
- RGWBucketInfo& bucket_info,
- RGWBucketInfo& new_bucket_info,
- int max_entries,
- RGWBucketAdminOpState& bucket_op,
- bool verbose)
-{
-
- int ret = 0;
-
- cout << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
- cout << "*** these will need to be removed manually ***" << std::endl;
- cout << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
- cout << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
-
- list<rgw_cls_bi_entry> entries;
-
- if (max_entries < 0) {
- max_entries = 1000;
- }
-
- int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
-
- BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
-
- if (verbose) {
- formatter->open_array_section("entries");
- }
-
- uint64_t total_entries = 0;
-
- if (!verbose) {
- cout << "total entries:";
- }
-
- int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
- string marker;
- for (int i = 0; i < num_source_shards; ++i) {
- bool is_truncated = true;
- marker.clear();
- while (is_truncated) {
- entries.clear();
- ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
- if (ret < 0) {
- cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
-
- list<rgw_cls_bi_entry>::iterator iter;
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
- rgw_cls_bi_entry& entry = *iter;
- if (verbose) {
- formatter->open_object_section("entry");
-
- encode_json("shard_id", i, formatter);
- encode_json("num_entry", total_entries, formatter);
- encode_json("entry", entry, formatter);
- }
- total_entries++;
-
- marker = entry.idx;
-
- int target_shard_id;
- cls_rgw_obj_key cls_key;
- uint8_t category;
- rgw_bucket_category_stats stats;
- bool account = entry.get_info(&cls_key, &category, &stats);
- rgw_obj_key key(cls_key);
- rgw_obj obj(new_bucket_info.bucket, key);
- int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
- if (ret < 0) {
- cerr << "ERROR: get_target_shard_id() returned ret=" << ret << std::endl;
- return ret;
- }
-
- int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
- ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
- if (ret < 0) {
- return ret;
- }
- if (verbose) {
- formatter->close_section();
- formatter->flush(cout);
- formatter->flush(cout);
- } else if (!(total_entries % 1000)) {
- cout << " " << total_entries;
- }
- }
- }
- }
- if (verbose) {
- formatter->close_section();
- formatter->flush(cout);
- } else {
- cout << " " << total_entries << std::endl;
- }
-
- ret = target_shards_mgr.finish();
- if (ret < 0) {
- cerr << "ERROR: failed to reshard" << std::endl;
- return EIO;
- }
-
- bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);
- bucket_op.set_user_id(new_bucket_info.owner);
- string err;
- int r = RGWBucketAdminOp::link(store, bucket_op, &err);
- if (r < 0) {
- cerr << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << std::endl;
- return -r;
- }
- return 0;
-}
-
#ifdef BUILDING_FOR_EMBEDDED
extern "C" int cephd_rgw_admin(int argc, const char **argv)
#else
return ret;
}
- RGWBucketInfo new_bucket_info(bucket_info);
- ret = create_new_bucket_instance(store, num_shards, bucket_info, attrs,
- new_bucket_info);
- if (ret < 0) {
- return ret;
- }
+ RGWBucketReshard br(store, bucket_info, attrs);
- return reshard_bucket(store,
- formatter,
- num_shards,
- bucket,
- bucket_info,
- new_bucket_info,
- max_entries,
- bucket_op,
- verbose);
+ return br.execute(num_shards, max_entries,
+ verbose, &cout, formatter);
}
if (opt_cmd == OPT_RESHARD_ADD) {
int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
- RGWReshard reshard(g_ceph_context, store);
+ RGWReshard reshard(store);
cls_rgw_reshard_entry entry;
entry.time = real_clock::now();
entry.tenant = tenant;
max_entries = 1000;
}
- RGWReshard reshard(g_ceph_context, store);
+ RGWReshard reshard(store);
formatter->open_array_section("reshard");
do {
return 0;
}
+#if 0
if (opt_cmd == OPT_RESHARD_EXECUTE) {
- RGWReshard reshard(g_ceph_context, store);
-
+ RGWReshard reshard(store);
if (bucket_name.empty()) {
cerr << "ERROR: bucket not specified" << std::endl;
return EINVAL;
return -ret;
}
+ RGWBucketReshard bucket_reshard(store, bucket_info);
+
cls_rgw_reshard_entry entry;
entry.tenant = tenant;
entry.bucket_name = bucket_name;
return 0;
}
+#endif
if (opt_cmd == OPT_RESHARD_CANCEL) {
- RGWReshard reshard(g_ceph_context, store);
+ RGWReshard reshard(store);
if (bucket_name.empty()) {
cerr << "ERROR: bucket not specified" << std::endl;
// vim: ts=8 sw=2 smarttab
#include "rgw_rados.h"
+#include "rgw_bucket.h"
#include "rgw_reshard.h"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/lock/cls_lock_client.h"
#include "common/errno.h"
+#include "common/ceph_json.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
const string reshard_lock_name = "reshard_process";
const string bucket_instance_lock_name = "bucket_instance_lock";
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info) :
- store(_store), bucket_info(_bucket_info),
+using namespace std;
+
+#define RESHARD_SHARD_WINDOW 64
+#define RESHARD_MAX_AIO 128
+
+class BucketReshardShard {
+ RGWRados *store;
+ const RGWBucketInfo& bucket_info;
+ int num_shard;
+ RGWRados::BucketShard bs;
+ vector<rgw_cls_bi_entry> entries;
+ map<uint8_t, rgw_bucket_category_stats> stats;
+ deque<librados::AioCompletion *>& aio_completions;
+
+ int wait_next_completion() {
+ librados::AioCompletion *c = aio_completions.front();
+ aio_completions.pop_front();
+
+ c->wait_for_safe();
+
+ int ret = c->get_return_value();
+ c->release();
+
+ if (ret < 0) {
+ cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+
+ return 0;
+ }
+
+ int get_completion(librados::AioCompletion **c) {
+ if (aio_completions.size() >= RESHARD_MAX_AIO) {
+ int ret = wait_next_completion();
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+ aio_completions.push_back(*c);
+
+ return 0;
+ }
+
+public:
+ BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
+ int _num_shard,
+ deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
+ aio_completions(_completions) {
+ num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
+ bs.init(bucket_info.bucket, num_shard);
+ }
+
+ int get_num_shard() {
+ return num_shard;
+ }
+
+ int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
+ const rgw_bucket_category_stats& entry_stats) {
+ entries.push_back(entry);
+ if (account) {
+ rgw_bucket_category_stats& target = stats[category];
+ target.num_entries += entry_stats.num_entries;
+ target.total_size += entry_stats.total_size;
+ target.total_size_rounded += entry_stats.total_size_rounded;
+ }
+ if (entries.size() >= RESHARD_SHARD_WINDOW) {
+ int ret = flush();
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ return 0;
+ }
+ int flush() {
+ if (entries.size() == 0) {
+ return 0;
+ }
+
+ librados::ObjectWriteOperation op;
+ for (auto& entry : entries) {
+ store->bi_put(op, bs, entry);
+ }
+ cls_rgw_bucket_update_stats(op, false, stats);
+
+ librados::AioCompletion *c;
+ int ret = get_completion(&c);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
+ if (ret < 0) {
+ std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+ entries.clear();
+ stats.clear();
+ return 0;
+ }
+
+ int wait_all_aio() {
+ int ret = 0;
+ while (!aio_completions.empty()) {
+ int r = wait_next_completion();
+ if (r < 0) {
+ ret = r;
+ }
+ }
+ return ret;
+ }
+};
+
+class BucketReshardManager {
+ RGWRados *store;
+ const RGWBucketInfo& target_bucket_info;
+ deque<librados::AioCompletion *> completions;
+ int num_target_shards;
+ vector<BucketReshardShard *> target_shards;
+
+public:
+ BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
+ num_target_shards(_num_target_shards) {
+ target_shards.resize(num_target_shards);
+ for (int i = 0; i < num_target_shards; ++i) {
+ target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
+ }
+ }
+
+ ~BucketReshardManager() {
+ for (auto& shard : target_shards) {
+ int ret = shard->wait_all_aio();
+ if (ret < 0) {
+ ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
+ }
+ }
+ }
+
+ int add_entry(int shard_index,
+ rgw_cls_bi_entry& entry, bool account, uint8_t category,
+ const rgw_bucket_category_stats& entry_stats) {
+ int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+ if (ret < 0) {
+ cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+ return 0;
+ }
+
+ int finish() {
+ int ret = 0;
+ for (auto& shard : target_shards) {
+ int r = shard->flush();
+ if (r < 0) {
+ cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
+ ret = r;
+ }
+ }
+ for (auto& shard : target_shards) {
+ int r = shard->wait_all_aio();
+ if (r < 0) {
+ cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
+ ret = r;
+ }
+ delete shard;
+ }
+ target_shards.clear();
+ return ret;
+ }
+};
+
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
+ store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
reshard_lock(reshard_lock_name) {
const rgw_bucket& b = bucket_info.bucket;
reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
return 0;
}
-RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store),
- instance_lock(bucket_instance_lock_name)
+int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
+ RGWBucketInfo& new_bucket_info)
+{
+ store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
+ new_bucket_info.bucket.oid.clear();
+
+ new_bucket_info.num_shards = new_num_shards;
+ new_bucket_info.objv_tracker.clear();
+
+ int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
+ if (ret < 0) {
+ cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &bucket_attrs);
+ if (ret < 0) {
+ cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ return 0;
+}
+
+int RGWBucketReshard::do_reshard(
+ int num_shards,
+ const RGWBucketInfo& new_bucket_info,
+ int max_entries,
+ bool verbose,
+ ostream *out,
+ Formatter *formatter)
+{
+ rgw_bucket& bucket = bucket_info.bucket;
+
+ int ret = 0;
+
+ if (out) {
+ (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
+ (*out) << "*** these will need to be removed manually ***" << std::endl;
+ (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
+ (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
+ }
+
+ list<rgw_cls_bi_entry> entries;
+
+ if (max_entries < 0) {
+ ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl;
+ return -EINVAL;
+ }
+
+ int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
+
+ BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
+
+ verbose = verbose && (formatter != nullptr);
+
+ if (verbose) {
+ formatter->open_array_section("entries");
+ }
+
+ uint64_t total_entries = 0;
+
+ if (!verbose) {
+ cout << "total entries:";
+ }
+
+ int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+ string marker;
+ for (int i = 0; i < num_source_shards; ++i) {
+ bool is_truncated = true;
+ marker.clear();
+ while (is_truncated) {
+ entries.clear();
+ ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
+ if (ret < 0) {
+ derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
+ return -ret;
+ }
+
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ if (verbose) {
+ formatter->open_object_section("entry");
+
+ encode_json("shard_id", i, formatter);
+ encode_json("num_entry", total_entries, formatter);
+ encode_json("entry", entry, formatter);
+ }
+ total_entries++;
+
+ marker = entry.idx;
+
+ int target_shard_id;
+ cls_rgw_obj_key cls_key;
+ uint8_t category;
+ rgw_bucket_category_stats stats;
+ bool account = entry.get_info(&cls_key, &category, &stats);
+ rgw_obj_key key(cls_key);
+ rgw_obj obj(new_bucket_info.bucket, key);
+ int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
+
+ ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
+ if (ret < 0) {
+ return ret;
+ }
+ if (verbose) {
+ formatter->close_section();
+ formatter->flush(*out);
+ formatter->flush(*out);
+ } else if (out && !(total_entries % 1000)) {
+ (*out) << " " << total_entries;
+ }
+ }
+ }
+ }
+ if (verbose) {
+ formatter->close_section();
+ formatter->flush(*out);
+ } else if (out) {
+ (*out) << " " << total_entries << std::endl;
+ }
+
+ ret = target_shards_mgr.finish();
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
+ return EIO;
+ }
+
+ RGWBucketAdminOpState bucket_op;
+
+ bucket_op.set_bucket_name(new_bucket_info.bucket.name);
+ bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);
+ bucket_op.set_user_id(new_bucket_info.owner);
+ string err;
+ int r = RGWBucketAdminOp::link(store, bucket_op, &err);
+ if (r < 0) {
+ lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << dendl;
+ return -r;
+ }
+ return 0;
+}
+
+int RGWBucketReshard::execute(int num_shards, int max_op_entries,
+ bool verbose, ostream *out, Formatter *formatter)
+
+{
+ int ret = lock_bucket();
+ if (ret < 0) {
+ return ret;
+ }
+
+ RGWBucketInfo new_bucket_info;
+
+ ret = create_new_bucket_instance(num_shards, new_bucket_info);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = do_reshard(num_shards,
+ new_bucket_info,
+ max_op_entries,
+ verbose, out, formatter);
+
+ if (ret < 0) {
+ return ret;
+ }
+
+ unlock_bucket();
+
+ return 0;
+}
+
+
+RGWReshard::RGWReshard(RGWRados* _store): store(_store), instance_lock(bucket_instance_lock_name)
{
- max_jobs = cct->_conf->rgw_reshard_max_jobs;
+ max_jobs = store->ctx()->_conf->rgw_reshard_max_jobs;
}
int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
if (ret == -EBUSY) {
- ldout(cct, 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
+ ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
return 0;
}
if (ret < 0)
int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
if (ret == -EBUSY) {
- ldout(cct, 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
+ ldout(store->ctx(), 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
return 0;
}
if (ret < 0)
int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
if (ret == -EBUSY) {
- ldout(cct, 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
+ ldout(store->ctx(), 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
return 0;
}
if (ret < 0)
int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
if (ret == -EBUSY) {
- ldout(cct, 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
+ ldout(store->ctx(), 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
return 0;
}
if (ret < 0)
int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
if (ret == -EBUSY) {
- ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
+ ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
return 0;
}
if (ret < 0)
{
int ret = instance_lock.lock_shared(&store->reshard_pool_ctx, oid);
if (ret == -EBUSY) {
- ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
+ ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
return 0;
}
ret = cls_rgw_get_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, &entry);
if (ret < 0) {
- ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" <<
+ ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" <<
cpp_strerror(-ret)<< dendl;
return ret;
}
/* needed to unlock as clear resharding uses the same lock */
sleep(default_reshard_sleep_duration);
}
- ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+ ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
return -EAGAIN;
}
-BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store,
- const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
- cct(_cct),store(_store),
+BucketIndexLockGuard::BucketIndexLockGuard(CephContext *_cct, RGWRados* _store, const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
+ cct(_cct), store(_store),
l(create_bucket_index_lock_name(bucket_instance_id)),
oid(_oid), io_ctx(_io_ctx),locked(false)
{