From: Yehuda Sadeh Date: Tue, 9 May 2017 22:19:10 +0000 (-0700) Subject: rgw: move resharding code to BucketReshard class X-Git-Tag: ses5-milestone6~8^2~7^2~57 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bb0cb4d7a51942288b834515bddc4ad75a3a4e56;p=ceph.git rgw: move resharding code to BucketReshard class Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 478f1124c1d9..a5692985dbdb 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -191,9 +191,9 @@ void _usage() cout << " role-policy get get the specified inline policy document embedded with the given role\n"; cout << " role-policy delete delete policy attached to a role\n"; cout << " reshard add schedule a resharding of a bucket\n"; - cout << " reshard list list all bucket resharding or scheduled to be reshared\n"; - cout << " reshard execute execute resharding of a bucket \n"; - cout << " reshard cancel cancel resharding a bucket\n"; + cout << " reshard list list all bucket resharding or scheduled to be reshared\n"; + cout << " reshard execute execute resharding of a bucket \n"; + cout << " reshard cancel cancel resharding a bucket\n"; cout << "options:\n"; cout << " --tenant= tenant name\n"; cout << " --uid= user id\n"; @@ -2215,176 +2215,6 @@ static void parse_tier_config_param(const string& s, map entries; - map stats; - deque& aio_completions; - - int wait_next_completion() { - librados::AioCompletion *c = aio_completions.front(); - aio_completions.pop_front(); - - c->wait_for_safe(); - - int ret = c->get_return_value(); - c->release(); - - if (ret < 0) { - cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl; - return ret; - } - - return 0; - } - - int get_completion(librados::AioCompletion **c) { - if (aio_completions.size() >= RESHARD_MAX_AIO) { - int ret = wait_next_completion(); - if (ret < 0) { - return ret; - } - } - - *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); - aio_completions.push_back(*c); - - return 0; - } - -public: - BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, - int _num_shard, - deque& _completions) : store(_store), bucket_info(_bucket_info), bs(store), - aio_completions(_completions) { - num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); - bs.init(bucket_info.bucket, num_shard); - } - - int get_num_shard() { - return num_shard; - } - - int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category, - const rgw_bucket_category_stats& entry_stats) { - entries.push_back(entry); - if (account) { - rgw_bucket_category_stats& target = stats[category]; - target.num_entries += entry_stats.num_entries; - target.total_size += entry_stats.total_size; - target.total_size_rounded += entry_stats.total_size_rounded; - } - if (entries.size() >= RESHARD_SHARD_WINDOW) { - int ret = flush(); - if (ret < 0) { - return ret; - } - } - return 0; - } - int flush() { - if (entries.size() == 0) { - return 0; - } - - librados::ObjectWriteOperation op; - for (auto& entry : entries) { - store->bi_put(op, bs, entry); - } - cls_rgw_bucket_update_stats(op, false, stats); - - librados::AioCompletion *c; - int ret = get_completion(&c); - if (ret < 0) { - return ret; - } - ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op); - if (ret < 0) { - std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl; - return ret; - } - entries.clear(); - stats.clear(); - return 0; - } - - int wait_all_aio() { - int ret = 0; - while (!aio_completions.empty()) { - int r = wait_next_completion(); - if (r < 0) { - ret = r; - } - } - return ret; - } -}; - -class BucketReshardManager { - RGWRados *store; - RGWBucketInfo& target_bucket_info; - deque completions; - int num_target_shards; - vector target_shards; - -public: - BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info), - num_target_shards(_num_target_shards) { - target_shards.resize(num_target_shards); - for (int i = 0; i < num_target_shards; ++i) { - target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions); - } - } - - ~BucketReshardManager() { - for (auto& shard : target_shards) { - int ret = shard->wait_all_aio(); - if (ret < 0) { - ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl; - } - } - } - - int add_entry(int shard_index, - rgw_cls_bi_entry& entry, bool account, uint8_t category, - const rgw_bucket_category_stats& entry_stats) { - int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats); - if (ret < 0) { - cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl; - return ret; - } - return 0; - } - - int finish() { - int ret = 0; - for (auto& shard : target_shards) { - int r = shard->flush(); - if (r < 0) { - cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl; - ret = r; - } - } - for (auto& shard : target_shards) { - int r = shard->wait_all_aio(); - if (r < 0) { - cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl; - ret = r; - } - delete shard; - } - target_shards.clear(); - return ret; - } -}; - - int check_reshard_bucket_params(RGWRados *store, const string& bucket_name, const string& tenant, @@ -2427,152 +2257,6 @@ int check_reshard_bucket_params(RGWRados *store, return 0; } -int create_new_bucket_instance(RGWRados *store, - int new_num_shards, - const RGWBucketInfo& bucket_info, - map& attrs, - RGWBucketInfo& new_bucket_info) -{ - - store->create_bucket_id(&new_bucket_info.bucket.bucket_id); - new_bucket_info.bucket.oid.clear(); - - new_bucket_info.num_shards = new_num_shards; - new_bucket_info.objv_tracker.clear(); - - int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards); - if (ret < 0) { - cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - - ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs); - if (ret < 0) { - cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - - return 0; -} - -int reshard_bucket(RGWRados *store, - Formatter *formatter, - int num_shards, - rgw_bucket& bucket, - RGWBucketInfo& bucket_info, - RGWBucketInfo& new_bucket_info, - int max_entries, - RGWBucketAdminOpState& bucket_op, - bool verbose) -{ - - int ret = 0; - - cout << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl; - cout << "*** these will need to be removed manually ***" << std::endl; - cout << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl; - cout << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl; - - list entries; - - if (max_entries < 0) { - max_entries = 1000; - } - - int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1); - - BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards); - - if (verbose) { - formatter->open_array_section("entries"); - } - - uint64_t total_entries = 0; - - if (!verbose) { - cout << "total entries:"; - } - - int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1); - string marker; - for (int i = 0; i < num_source_shards; ++i) { - bool is_truncated = true; - marker.clear(); - while (is_truncated) { - entries.clear(); - ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated); - if (ret < 0) { - cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl; - return -ret; - } - - list::iterator iter; - for (iter = entries.begin(); iter != entries.end(); ++iter) { - rgw_cls_bi_entry& entry = *iter; - if (verbose) { - formatter->open_object_section("entry"); - - encode_json("shard_id", i, formatter); - encode_json("num_entry", total_entries, formatter); - encode_json("entry", entry, formatter); - } - total_entries++; - - marker = entry.idx; - - int target_shard_id; - cls_rgw_obj_key cls_key; - uint8_t category; - rgw_bucket_category_stats stats; - bool account = entry.get_info(&cls_key, &category, &stats); - rgw_obj_key key(cls_key); - rgw_obj obj(new_bucket_info.bucket, key); - int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id); - if (ret < 0) { - cerr << "ERROR: get_target_shard_id() returned ret=" << ret << std::endl; - return ret; - } - - int shard_index = (target_shard_id > 0 ? target_shard_id : 0); - - ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats); - if (ret < 0) { - return ret; - } - if (verbose) { - formatter->close_section(); - formatter->flush(cout); - formatter->flush(cout); - } else if (!(total_entries % 1000)) { - cout << " " << total_entries; - } - } - } - } - if (verbose) { - formatter->close_section(); - formatter->flush(cout); - } else { - cout << " " << total_entries << std::endl; - } - - ret = target_shards_mgr.finish(); - if (ret < 0) { - cerr << "ERROR: failed to reshard" << std::endl; - return EIO; - } - - bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id); - bucket_op.set_user_id(new_bucket_info.owner); - string err; - int r = RGWBucketAdminOp::link(store, bucket_op, &err); - if (r < 0) { - cerr << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << std::endl; - return -r; - } - return 0; -} - #ifdef BUILDING_FOR_EMBEDDED extern "C" int cephd_rgw_admin(int argc, const char **argv) #else @@ -5860,22 +5544,10 @@ next: return ret; } - RGWBucketInfo new_bucket_info(bucket_info); - ret = create_new_bucket_instance(store, num_shards, bucket_info, attrs, - new_bucket_info); - if (ret < 0) { - return ret; - } + RGWBucketReshard br(store, bucket_info, attrs); - return reshard_bucket(store, - formatter, - num_shards, - bucket, - bucket_info, - new_bucket_info, - max_entries, - bucket_op, - verbose); + return br.execute(num_shards, max_entries, + verbose, &cout, formatter); } if (opt_cmd == OPT_RESHARD_ADD) { @@ -5899,7 +5571,7 @@ next: int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1); - RGWReshard reshard(g_ceph_context, store); + RGWReshard reshard(store); cls_rgw_reshard_entry entry; entry.time = real_clock::now(); entry.tenant = tenant; @@ -5921,7 +5593,7 @@ next: max_entries = 1000; } - RGWReshard reshard(g_ceph_context, store); + RGWReshard reshard(store); formatter->open_array_section("reshard"); do { @@ -5944,9 +5616,9 @@ next: return 0; } +#if 0 if (opt_cmd == OPT_RESHARD_EXECUTE) { - RGWReshard reshard(g_ceph_context, store); - + RGWReshard reshard(store); if (bucket_name.empty()) { cerr << "ERROR: bucket not specified" << std::endl; return EINVAL; @@ -5961,6 +5633,8 @@ next: return -ret; } + RGWBucketReshard bucket_reshard(store, bucket_info); + cls_rgw_reshard_entry entry; entry.tenant = tenant; entry.bucket_name = bucket_name; @@ -6016,9 +5690,10 @@ next: return 0; } +#endif if (opt_cmd == OPT_RESHARD_CANCEL) { - RGWReshard reshard(g_ceph_context, store); + RGWReshard reshard(store); if (bucket_name.empty()) { cerr << "ERROR: bucket not specified" << std::endl; diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 14c472faa8cd..b4c32b2c570e 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -210,11 +210,11 @@ struct RGWBucketAdminOpState { void set_max_aio(int value) { max_aio = value; } - void set_user_id(rgw_user& user_id) { + void set_user_id(const rgw_user& user_id) { if (!user_id.empty()) uid = user_id; } - void set_bucket_name(std::string& bucket_str) { + void set_bucket_name(const std::string& bucket_str) { bucket_name = bucket_str; } void set_object(std::string& object_str) { diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 003c1a0db7c9..0f88e99558c9 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -4270,7 +4270,7 @@ int RGWRados::init_complete() obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size); } - reshard = new RGWReshard(cct, this); + reshard = new RGWReshard(this); return ret; } diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 7c2fb7a0f7ff..cac34a3e4327 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -2,10 +2,12 @@ // vim: ts=8 sw=2 smarttab #include "rgw_rados.h" +#include "rgw_bucket.h" #include "rgw_reshard.h" #include "cls/rgw/cls_rgw_client.h" #include "cls/lock/cls_lock_client.h" #include "common/errno.h" +#include "common/ceph_json.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw @@ -14,8 +16,179 @@ const string reshard_oid = "reshard"; const string reshard_lock_name = "reshard_process"; const string bucket_instance_lock_name = "bucket_instance_lock"; -RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info) : - store(_store), bucket_info(_bucket_info), +using namespace std; + +#define RESHARD_SHARD_WINDOW 64 +#define RESHARD_MAX_AIO 128 + +class BucketReshardShard { + RGWRados *store; + const RGWBucketInfo& bucket_info; + int num_shard; + RGWRados::BucketShard bs; + vector entries; + map stats; + deque& aio_completions; + + int wait_next_completion() { + librados::AioCompletion *c = aio_completions.front(); + aio_completions.pop_front(); + + c->wait_for_safe(); + + int ret = c->get_return_value(); + c->release(); + + if (ret < 0) { + cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl; + return ret; + } + + return 0; + } + + int get_completion(librados::AioCompletion **c) { + if (aio_completions.size() >= RESHARD_MAX_AIO) { + int ret = wait_next_completion(); + if (ret < 0) { + return ret; + } + } + + *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + aio_completions.push_back(*c); + + return 0; + } + +public: + BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info, + int _num_shard, + deque& _completions) : store(_store), bucket_info(_bucket_info), bs(store), + aio_completions(_completions) { + num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); + bs.init(bucket_info.bucket, num_shard); + } + + int get_num_shard() { + return num_shard; + } + + int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category, + const rgw_bucket_category_stats& entry_stats) { + entries.push_back(entry); + if (account) { + rgw_bucket_category_stats& target = stats[category]; + target.num_entries += entry_stats.num_entries; + target.total_size += entry_stats.total_size; + target.total_size_rounded += entry_stats.total_size_rounded; + } + if (entries.size() >= RESHARD_SHARD_WINDOW) { + int ret = flush(); + if (ret < 0) { + return ret; + } + } + return 0; + } + int flush() { + if (entries.size() == 0) { + return 0; + } + + librados::ObjectWriteOperation op; + for (auto& entry : entries) { + store->bi_put(op, bs, entry); + } + cls_rgw_bucket_update_stats(op, false, stats); + + librados::AioCompletion *c; + int ret = get_completion(&c); + if (ret < 0) { + return ret; + } + ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op); + if (ret < 0) { + std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl; + return ret; + } + entries.clear(); + stats.clear(); + return 0; + } + + int wait_all_aio() { + int ret = 0; + while (!aio_completions.empty()) { + int r = wait_next_completion(); + if (r < 0) { + ret = r; + } + } + return ret; + } +}; + +class BucketReshardManager { + RGWRados *store; + const RGWBucketInfo& target_bucket_info; + deque completions; + int num_target_shards; + vector target_shards; + +public: + BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info), + num_target_shards(_num_target_shards) { + target_shards.resize(num_target_shards); + for (int i = 0; i < num_target_shards; ++i) { + target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions); + } + } + + ~BucketReshardManager() { + for (auto& shard : target_shards) { + int ret = shard->wait_all_aio(); + if (ret < 0) { + ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl; + } + } + } + + int add_entry(int shard_index, + rgw_cls_bi_entry& entry, bool account, uint8_t category, + const rgw_bucket_category_stats& entry_stats) { + int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats); + if (ret < 0) { + cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl; + return ret; + } + return 0; + } + + int finish() { + int ret = 0; + for (auto& shard : target_shards) { + int r = shard->flush(); + if (r < 0) { + cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl; + ret = r; + } + } + for (auto& shard : target_shards) { + int r = shard->wait_all_aio(); + if (r < 0) { + cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl; + ret = r; + } + delete shard; + } + target_shards.clear(); + return ret; + } +}; + +RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map& _bucket_attrs) : + store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), reshard_lock(reshard_lock_name) { const rgw_bucket& b = bucket_info.bucket; reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; @@ -73,10 +246,189 @@ int RGWBucketReshard::clear_resharding() return 0; } -RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store), - instance_lock(bucket_instance_lock_name) +int RGWBucketReshard::create_new_bucket_instance(int new_num_shards, + RGWBucketInfo& new_bucket_info) +{ + store->create_bucket_id(&new_bucket_info.bucket.bucket_id); + new_bucket_info.bucket.oid.clear(); + + new_bucket_info.num_shards = new_num_shards; + new_bucket_info.objv_tracker.clear(); + + int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards); + if (ret < 0) { + cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &bucket_attrs); + if (ret < 0) { + cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + return 0; +} + +int RGWBucketReshard::do_reshard( + int num_shards, + const RGWBucketInfo& new_bucket_info, + int max_entries, + bool verbose, + ostream *out, + Formatter *formatter) +{ + rgw_bucket& bucket = bucket_info.bucket; + + int ret = 0; + + if (out) { + (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl; + (*out) << "*** these will need to be removed manually ***" << std::endl; + (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl; + (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl; + } + + list entries; + + if (max_entries < 0) { + ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl; + return -EINVAL; + } + + int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1); + + BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards); + + verbose = verbose && (formatter != nullptr); + + if (verbose) { + formatter->open_array_section("entries"); + } + + uint64_t total_entries = 0; + + if (!verbose) { + cout << "total entries:"; + } + + int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1); + string marker; + for (int i = 0; i < num_source_shards; ++i) { + bool is_truncated = true; + marker.clear(); + while (is_truncated) { + entries.clear(); + ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated); + if (ret < 0) { + derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl; + return -ret; + } + + list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + if (verbose) { + formatter->open_object_section("entry"); + + encode_json("shard_id", i, formatter); + encode_json("num_entry", total_entries, formatter); + encode_json("entry", entry, formatter); + } + total_entries++; + + marker = entry.idx; + + int target_shard_id; + cls_rgw_obj_key cls_key; + uint8_t category; + rgw_bucket_category_stats stats; + bool account = entry.get_info(&cls_key, &category, &stats); + rgw_obj_key key(cls_key); + rgw_obj obj(new_bucket_info.bucket, key); + int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id); + if (ret < 0) { + lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl; + return ret; + } + + int shard_index = (target_shard_id > 0 ? target_shard_id : 0); + + ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats); + if (ret < 0) { + return ret; + } + if (verbose) { + formatter->close_section(); + formatter->flush(*out); + formatter->flush(*out); + } else if (out && !(total_entries % 1000)) { + (*out) << " " << total_entries; + } + } + } + } + if (verbose) { + formatter->close_section(); + formatter->flush(*out); + } else if (out) { + (*out) << " " << total_entries << std::endl; + } + + ret = target_shards_mgr.finish(); + if (ret < 0) { + lderr(store->ctx()) << "ERROR: failed to reshard" << dendl; + return EIO; + } + + RGWBucketAdminOpState bucket_op; + + bucket_op.set_bucket_name(new_bucket_info.bucket.name); + bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id); + bucket_op.set_user_id(new_bucket_info.owner); + string err; + int r = RGWBucketAdminOp::link(store, bucket_op, &err); + if (r < 0) { + lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << dendl; + return -r; + } + return 0; +} + +int RGWBucketReshard::execute(int num_shards, int max_op_entries, + bool verbose, ostream *out, Formatter *formatter) + +{ + int ret = lock_bucket(); + if (ret < 0) { + return ret; + } + + RGWBucketInfo new_bucket_info; + + ret = create_new_bucket_instance(num_shards, new_bucket_info); + if (ret < 0) { + return ret; + } + + ret = do_reshard(num_shards, + new_bucket_info, + max_op_entries, + verbose, out, formatter); + + if (ret < 0) { + return ret; + } + + unlock_bucket(); + + return 0; +} + + +RGWReshard::RGWReshard(RGWRados* _store): store(_store), instance_lock(bucket_instance_lock_name) { - max_jobs = cct->_conf->rgw_reshard_max_jobs; + max_jobs = store->ctx()->_conf->rgw_reshard_max_jobs; } @@ -86,7 +438,7 @@ int RGWReshard::add(cls_rgw_reshard_entry& entry) int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); if (ret == -EBUSY) { - ldout(cct, 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl; + ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl; return 0; } if (ret < 0) @@ -108,7 +460,7 @@ int RGWReshard::list(string& marker, uint32_t max, std::listreshard_pool_ctx, reshard_oid); if (ret == -EBUSY) { - ldout(cct, 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl; + ldout(store->ctx(), 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl; return 0; } if (ret < 0) @@ -126,7 +478,7 @@ int RGWReshard::get(cls_rgw_reshard_entry& entry) int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid); if (ret == -EBUSY) { - ldout(cct, 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl; + ldout(store->ctx(), 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl; return 0; } if (ret < 0) @@ -144,7 +496,7 @@ int RGWReshard::remove(cls_rgw_reshard_entry& entry) int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); if (ret == -EBUSY) { - ldout(cct, 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl; + ldout(store->ctx(), 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl; return 0; } if (ret < 0) @@ -169,7 +521,7 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid); if (ret == -EBUSY) { - ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl; + ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl; return 0; } if (ret < 0) @@ -187,7 +539,7 @@ int RGWReshard::lock_bucket_index_shared(const string& oid) { int ret = instance_lock.lock_shared(&store->reshard_pool_ctx, oid); if (ret == -EBUSY) { - ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl; + ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl; return 0; } @@ -217,7 +569,7 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid, ret = cls_rgw_get_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, &entry); if (ret < 0) { - ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" << + ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" << cpp_strerror(-ret)<< dendl; return ret; } @@ -232,13 +584,12 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid, /* needed to unlock as clear resharding uses the same lock */ sleep(default_reshard_sleep_duration); } - ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl; + ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl; return -EAGAIN; } -BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store, - const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) : - cct(_cct),store(_store), +BucketIndexLockGuard::BucketIndexLockGuard(CephContext *_cct, RGWRados* _store, const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) : + cct(_cct), store(_store), l(create_bucket_index_lock_name(bucket_instance_id)), oid(_oid), io_ctx(_io_ctx),locked(false) { diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 8f8a46df7784..c8dcda5dbece 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -37,6 +37,7 @@ protected: class RGWBucketReshard { RGWRados *store; RGWBucketInfo bucket_info; + std::map bucket_attrs; string reshard_oid; rados::cls::lock::Lock reshard_lock; @@ -45,15 +46,26 @@ class RGWBucketReshard { void unlock_bucket(); int init_resharding(const cls_rgw_reshard_entry& entry); int clear_resharding(); + + int create_new_bucket_instance(int new_num_shards, + RGWBucketInfo& new_bucket_info); + int do_reshard(int num_shards, + const RGWBucketInfo& new_bucket_info, + int max_entries, + bool verbose, + ostream *os, + Formatter *formatter); public: - RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info); + RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, + const std::map& _bucket_attrs); - int reshard(); - int abort_reshard(); + int execute(int num_shards, int max_op_entries, + bool verbose = false, ostream *out = nullptr, + Formatter *formatter = nullptr); + int abort(); }; class RGWReshard { - CephContext *cct; RGWRados *store; string lock_name; int max_jobs; @@ -63,7 +75,7 @@ class RGWReshard { int unlock_bucket_index(const string& oid); public: - RGWReshard(CephContext* cct, RGWRados* _store); + RGWReshard(RGWRados* _store); int add(cls_rgw_reshard_entry& entry); int get(cls_rgw_reshard_entry& entry); int remove(cls_rgw_reshard_entry& entry);