From 1604b521730577fc6aa5345127381b1bfc5625b7 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 26 Sep 2016 15:49:37 -0700 Subject: [PATCH] rgw_admin: use aio operations for bucket resharding also created shards manager to make things slightly cleaner Signed-off-by: Yehuda Sadeh (cherry picked from commit 97e7ee9ca213ccf4b8f537e02125bd0c4ef24103) See: http://tracker.ceph.com/issues/17556 See: https://github.com/ceph/ceph/pull/11368 Signed-off-by: Robin H. Johnson --- src/rgw/rgw_admin.cc | 141 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 125 insertions(+), 16 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 09593aa48a018..f3cbbebc77bb0 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2011,6 +2011,7 @@ static void parse_tier_config_param(const string& s, map& out) } #define RESHARD_SHARD_WINDOW 64 +#define RESHARD_MAX_AIO 128 class BucketReshardShard { RGWRados *store; @@ -2019,15 +2020,54 @@ class BucketReshardShard { RGWRados::BucketShard bs; vector entries; map stats; + deque& aio_completions; + + int wait_next_completion() { + librados::AioCompletion *c = aio_completions.front(); + aio_completions.pop_front(); + + c->wait_for_safe(); + + int ret = c->get_return_value(); + c->release(); + + if (ret < 0) { + cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl; + return ret; + } + + return 0; + } + + int get_completion(librados::AioCompletion **c) { + if (aio_completions.size() >= RESHARD_MAX_AIO) { + int ret = wait_next_completion(); + if (ret < 0) { + return ret; + } + } + + *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + aio_completions.push_back(*c); + + return 0; + } public: - BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, int _num_shard) : store(_store), bucket_info(_bucket_info), bs(store) { + BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, + int _num_shard, + deque& _completions) : store(_store), bucket_info(_bucket_info), bs(store), + aio_completions(_completions) { num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); bs.init(bucket_info.bucket, num_shard); } + int get_num_shard() { + return num_shard; + } + int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category, - rgw_bucket_category_stats entry_stats) { + const rgw_bucket_category_stats& entry_stats) { entries.push_back(entry); if (account) { rgw_bucket_category_stats& target = stats[category]; @@ -2053,7 +2093,13 @@ public: store->bi_put(op, bs, entry); } cls_rgw_bucket_update_stats(op, false, stats); - int ret = bs.index_ctx.operate(bs.bucket_obj, &op); + + librados::AioCompletion *c; + int ret = get_completion(&c); + if (ret < 0) { + return ret; + } + ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op); if (ret < 0) { std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl; return ret; @@ -2062,6 +2108,75 @@ public: stats.clear(); return 0; } + + int wait_all_aio() { + int ret = 0; + while (!aio_completions.empty()) { + int r = wait_next_completion(); + if (r < 0) { + ret = r; + } + } + return ret; + } +}; + +class BucketReshardManager { + RGWRados *store; + RGWBucketInfo& target_bucket_info; + deque completions; + int num_target_shards; + vector target_shards; + +public: + BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info), + num_target_shards(_num_target_shards) { + target_shards.resize(num_target_shards); + for (int i = 0; i < num_target_shards; ++i) { + target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions); + } + } + + ~BucketReshardManager() { + for (auto& shard : target_shards) { + int ret = shard->wait_all_aio(); + if (ret < 0) { + ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl; + } + } + } + + int add_entry(int shard_index, + rgw_cls_bi_entry& entry, bool account, uint8_t category, + const rgw_bucket_category_stats& entry_stats) { + int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats); + if (ret < 0) { + cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl; + return ret; + } + return 0; + } + + int finish() { + int ret = 0; + for (auto& shard : target_shards) { + int r = shard->flush(); + if (r < 0) { + cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl; + ret = r; + } + } + for (auto& shard : target_shards) { + int r = shard->wait_all_aio(); + if (r < 0) { + cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl; + ret = r; + } + delete shard; + } + target_shards.clear(); + return ret; + } }; int main(int argc, char **argv) @@ -4961,10 +5076,8 @@ next: } int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1); - BucketReshardShard *target_shards[num_target_shards]; - for (int i = 0; i < num_target_shards; ++i) { - target_shards[i] = new BucketReshardShard(store, new_bucket_info, i); - } + + BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards); formatter->open_array_section("entries"); @@ -5000,9 +5113,8 @@ next: int shard_index = (target_shard_id > 0 ? target_shard_id : 0); - ret = target_shards[shard_index]->add_entry(entry, account, category, stats); + ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats); if (ret < 0) { - cerr << "ERROR: target_shards.add_entry(" << key << ") returned error: " << cpp_strerror(-ret) << std::endl; return ret; } } @@ -5013,13 +5125,10 @@ next: formatter->close_section(); formatter->flush(cout); - for (int i = 0; i < num_target_shards; ++i) { - int ret = target_shards[i]->flush(); - if (ret < 0) { - cerr << "ERROR: target_shards[" << i << "].flush() returned error: " << cpp_strerror(-ret) << std::endl; - return ret; - } - delete target_shards[i]; + ret = target_shards_mgr.finish(); + if (ret < 0) { + cerr << "ERROR: failed to reshard" << std::endl; + return EIO; } bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id); -- 2.39.5