}
#define RESHARD_SHARD_WINDOW 64
+#define RESHARD_MAX_AIO 128
class BucketReshardShard {
RGWRados *store;
RGWRados::BucketShard bs;
vector<rgw_cls_bi_entry> entries;
map<uint8_t, rgw_bucket_category_stats> stats;
+ deque<librados::AioCompletion *>& aio_completions;
+
+ int wait_next_completion() {
+ librados::AioCompletion *c = aio_completions.front();
+ aio_completions.pop_front();
+
+ c->wait_for_safe();
+
+ int ret = c->get_return_value();
+ c->release();
+
+ if (ret < 0) {
+ cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+
+ return 0;
+ }
+
+ int get_completion(librados::AioCompletion **c) {
+ if (aio_completions.size() >= RESHARD_MAX_AIO) {
+ int ret = wait_next_completion();
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+ aio_completions.push_back(*c);
+
+ return 0;
+ }
public:
- BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, int _num_shard) : store(_store), bucket_info(_bucket_info), bs(store) {
+ BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info,
+ int _num_shard,
+ deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
+ aio_completions(_completions) {
num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
bs.init(bucket_info.bucket, num_shard);
}
+ int get_num_shard() {
+ return num_shard;
+ }
+
int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
- rgw_bucket_category_stats entry_stats) {
+ const rgw_bucket_category_stats& entry_stats) {
entries.push_back(entry);
if (account) {
rgw_bucket_category_stats& target = stats[category];
store->bi_put(op, bs, entry);
}
cls_rgw_bucket_update_stats(op, false, stats);
- int ret = bs.index_ctx.operate(bs.bucket_obj, &op);
+
+ librados::AioCompletion *c;
+ int ret = get_completion(&c);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
if (ret < 0) {
std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
return ret;
stats.clear();
return 0;
}
+
+ int wait_all_aio() {
+ int ret = 0;
+ while (!aio_completions.empty()) {
+ int r = wait_next_completion();
+ if (r < 0) {
+ ret = r;
+ }
+ }
+ return ret;
+ }
+};
+
+class BucketReshardManager {
+ RGWRados *store;
+ RGWBucketInfo& target_bucket_info;
+ deque<librados::AioCompletion *> completions;
+ int num_target_shards;
+ vector<BucketReshardShard *> target_shards;
+
+public:
+ BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
+ num_target_shards(_num_target_shards) {
+ target_shards.resize(num_target_shards);
+ for (int i = 0; i < num_target_shards; ++i) {
+ target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
+ }
+ }
+
+ ~BucketReshardManager() {
+ for (auto& shard : target_shards) {
+ int ret = shard->wait_all_aio();
+ if (ret < 0) {
+ ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
+ }
+ }
+ }
+
+ int add_entry(int shard_index,
+ rgw_cls_bi_entry& entry, bool account, uint8_t category,
+ const rgw_bucket_category_stats& entry_stats) {
+ int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+ if (ret < 0) {
+ cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+ return 0;
+ }
+
+ int finish() {
+ int ret = 0;
+ for (auto& shard : target_shards) {
+ int r = shard->flush();
+ if (r < 0) {
+ cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
+ ret = r;
+ }
+ }
+ for (auto& shard : target_shards) {
+ int r = shard->wait_all_aio();
+ if (r < 0) {
+ cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
+ ret = r;
+ }
+ delete shard;
+ }
+ target_shards.clear();
+ return ret;
+ }
};
int main(int argc, char **argv)
}
int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
- BucketReshardShard *target_shards[num_target_shards];
- for (int i = 0; i < num_target_shards; ++i) {
- target_shards[i] = new BucketReshardShard(store, new_bucket_info, i);
- }
+
+ BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
formatter->open_array_section("entries");
int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
- ret = target_shards[shard_index]->add_entry(entry, account, category, stats);
+ ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
if (ret < 0) {
- cerr << "ERROR: target_shards.add_entry(" << key << ") returned error: " << cpp_strerror(-ret) << std::endl;
return ret;
}
}
formatter->close_section();
formatter->flush(cout);
- for (int i = 0; i < num_target_shards; ++i) {
- int ret = target_shards[i]->flush();
- if (ret < 0) {
- cerr << "ERROR: target_shards[" << i << "].flush() returned error: " << cpp_strerror(-ret) << std::endl;
- return ret;
- }
- delete target_shards[i];
+ ret = target_shards_mgr.finish();
+ if (ret < 0) {
+ cerr << "ERROR: failed to reshard" << std::endl;
+ return EIO;
}
bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);