public:
RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
- const rgw_bucket& bucket,
- rgw_bucket_index_marker_info *_info)
+ const rgw_bucket& bucket,
+ rgw_bucket_index_marker_info *_info)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
instance_key(bucket.get_key()), info(_info) {}
int cur_shard{0};
bool again = false;
std::optional<uint64_t> gen;
+ rgw_bucket_index_marker_info marker_info;
+ BucketIndexShardsManager marker_mgr;
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
{
ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(
+ sc, siter->source.get_bucket_info().bucket, &marker_info));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to fetch markers for bucket: "
+ << siter->source.get_bucket_info().bucket));
+ return set_cr_error(retcode);
+ }
+ retcode = marker_mgr.from_string(marker_info.max_marker, -1);
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to decode markers for bucket: "
+ << siter->source.get_bucket_info().bucket << dendl;
+ return set_cr_error(retcode);
+ }
+ source_num_shards = marker_mgr.get().size();
- source_num_shards = siter->source.get_bucket_info().layout.current_index.layout.normal.num_shards;
target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
if (source_bs) {
sync_pair.source_bs = *source_bs;