int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent, optional_yield y);
int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh, optional_yield y);
int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry, optional_yield y);
- int bi_get_vals(BucketShard& bs, std::set<std::string> log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
+ int bi_get_vals(BucketShard& bs, std::set<std::string>& log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
int bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry, optional_yield y);
return 0;
}
- int flush(bool process_log = false) {
+ int flush(bool process_log = false, RGWBucketReshard *br = nullptr,
+ const DoutPrefixProvider *dpp = nullptr) {
if (entries.size() == 0) {
return 0;
}
}
entries.clear();
stats.clear();
+
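+    // give the resharder a chance to renew its locks if they are about to
+    // expire, since flush() can be called many times during a long reshard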
+ if (br != nullptr) {
+ ret = br->renew_lock_if_needed(dpp);
+ if (ret < 0) {
+ return ret;
+ }
+ }
return 0;
}
return 0;
}
- int finish(bool process_log = false) {
+ int finish(bool process_log = false, RGWBucketReshard *br = nullptr,
+ const DoutPrefixProvider *dpp = nullptr) {
int ret = 0;
for (auto& shard : target_shards) {
- int r = shard.flush(process_log);
+ int r = shard.flush(process_log, br, dpp);
if (r < 0) {
derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
ret = r;
return ret;
}
- if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress ||
+ if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress &&
bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) {
ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl;
ret = -EINVAL;
return 0;
}
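+// Renew the bucket's reshard lock (and the outer reshard lock, if one is held)
+// once it is close to expiring. Returns 0 on success or a negative error code.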
+int RGWBucketReshard::renew_lock_if_needed(const DoutPrefixProvider *dpp) {
+ int ret = 0;
+ Clock::time_point now = Clock::now();
+ if (reshard_lock.should_renew(now)) {
+ // assume outer locks have timespans at least the size of ours, so
+ // can call inside conditional
+ if (outer_reshard_lock) {
+ ret = outer_reshard_lock->renew(now);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ ret = reshard_lock.renew(now);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
+ return ret;
+ }
+ }
+ return 0;
+}
+
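+// Compute the shard of the target index layout that the given key maps to;
+// multipart .meta objects are placed on the same shard as their head object.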
+int RGWBucketReshard::calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key,
+ int& shard, const DoutPrefixProvider *dpp) {
+ int target_shard_id, ret;
+
+ rgw_obj obj(bucket_info.bucket, key);
+ RGWMPObj mp;
+ if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+ // place the multipart .meta object on the same shard as its head object
+ obj.index_hash_source = mp.get_key();
+ }
+ ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
+ obj.get_hash_object(), &target_shard_id);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+ return ret;
+ }
+ shard = (target_shard_id > 0 ? target_shard_id : 0);
+
+ return 0;
+}
+
int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current,
- int& max_op_entries,
- BucketReshardManager& target_shards_mgr,
- bool verbose_json_out,
- ostream *out,
- Formatter *formatter, rgw::BucketReshardState reshard_stage,
- const DoutPrefixProvider *dpp, optional_yield y)
+ int& max_op_entries,
+ BucketReshardManager& target_shards_mgr,
+ bool verbose_json_out,
+ ostream *out,
+ Formatter *formatter, rgw::BucketReshardState reshard_stage,
+ const DoutPrefixProvider *dpp, optional_yield y)
{
list<rgw_cls_bi_entry> entries;
uint64_t stage_entries = 0;
stage.append(":");
if (!verbose_json_out && out) {
+ (*out) << "start time: " << real_clock::now() << std::endl;
(*out) << stage;
}
marker = entry.idx;
- int target_shard_id;
cls_rgw_obj_key cls_key;
RGWObjCategory category;
rgw_bucket_category_stats stats;
ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
continue;
}
- rgw_obj obj(bucket_info.bucket, key);
- RGWMPObj mp;
- if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
- // place the multipart .meta object on the same shard as its head object
- obj.index_hash_source = mp.get_key();
- }
- ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
- obj.get_hash_object(), &target_shard_id);
+
+ int shard_index;
+ ret = calc_target_shard(bucket_info, key, shard_index, dpp);
if (ret < 0) {
- ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
return ret;
}
- int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
ret = target_shards_mgr.add_entry(shard_index, entry, account,
category, stats, process_log);
if (ret < 0) {
return ret;
}
- Clock::time_point now = Clock::now();
- if (reshard_lock.should_renew(now)) {
- // assume outer locks have timespans at least the size of ours, so
- // can call inside conditional
- if (outer_reshard_lock) {
- ret = outer_reshard_lock->renew(now);
- if (ret < 0) {
- return ret;
- }
- }
- ret = reshard_lock.renew(now);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
- return ret;
- }
+ ret = renew_lock_if_needed(dpp);
+ if (ret < 0) {
+ return ret;
}
+
if (verbose_json_out) {
formatter->close_section();
formatter->flush(*out);
formatter->flush(*out);
} else if (out) {
(*out) << " " << stage_entries << std::endl;
+ (*out) << "end time: " << real_clock::now() << std::endl;
}
- int ret = target_shards_mgr.finish(process_log);
+ int ret = target_shards_mgr.finish(process_log, this, dpp);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
return -EIO;
}
+
return 0;
}
// using an initializer_list as an array in contiguous memory
  // allocated all at once
static const std::initializer_list<uint16_t> reshard_primes;
+
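+  // compute the target-layout shard index for the given object key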
+ int calc_target_shard(const RGWBucketInfo& bucket_info, const rgw_obj_key& key,
+ int& shard, const DoutPrefixProvider *dpp);
int reshard_process(const rgw::bucket_index_layout_generation& current,
int& max_entries,
                      BucketReshardManager& target_shards_mgr,
                      bool verbose_json_out,
std::ostream *out,
Formatter *formatter, rgw::BucketReshardState reshard_stage,
const DoutPrefixProvider *dpp, optional_yield y);
+
int do_reshard(const rgw::bucket_index_layout_generation& current,
const rgw::bucket_index_layout_generation& target,
int max_entries, bool support_logrecord,
RGWReshard *reshard_log = nullptr);
int get_status(const DoutPrefixProvider *dpp, std::list<cls_rgw_bucket_instance_entry> *status);
int cancel(const DoutPrefixProvider* dpp, optional_yield y);
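+  // renew reshard_lock (and outer_reshard_lock, if set) when close to expiring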
+ int renew_lock_if_needed(const DoutPrefixProvider *dpp);
static int clear_resharding(rgw::sal::RadosStore* store,
RGWBucketInfo& bucket_info,