RGWDataSyncEnv *sync_env;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
- std::optional<rgw_bucket_shard> target_bs;
- std::optional<rgw_bucket_shard> source_bs;
-
- std::optional<rgw_bucket> target_bucket;
- std::optional<rgw_bucket> source_bucket;
-
rgw_sync_pipe_info_set pipes;
rgw_sync_pipe_info_set::iterator siter;
RGWRESTConn *conn{nullptr};
rgw_zone_id last_zone;
- int source_num_shards{0};
- int target_num_shards{0};
-
- int num_shards{0};
- int cur_shard{0};
- bool again = false;
std::optional<uint64_t> gen;
rgw_bucket_index_marker_info marker_info;
BucketIndexShardsManager marker_mgr;
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
- std::optional<rgw_bucket_shard> _target_bs,
- std::optional<rgw_bucket_shard> _source_bs,
+ const rgw_bucket_shard& source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
std::optional<uint64_t> gen,
ceph::real_time* progress);
ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
<< ' ' << *state->obligation << dendl;
yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
- std::nullopt, /* target_bs */
state->key, tn,
state->obligation->gen,
&progress));
RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
- std::optional<rgw_bucket_shard> _target_bs,
- std::optional<rgw_bucket_shard> _source_bs,
+ const rgw_bucket_shard& source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
std::optional<uint64_t> gen,
ceph::real_time* progress)
: RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
- lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
+ lease_cr(std::move(lease_cr)),
tn(sync_env->sync_tracer->add_node(
_tn_parent, "bucket_sync_sources",
- SSTR( "target=" << target_bucket.value_or(rgw_bucket()) <<
- ":source_bucket=" << source_bucket.value_or(rgw_bucket()) <<
- ":source_zone=" << sc->source_zone))),
+ SSTR( "source=" << source_bs << ":source_zone=" << sc->source_zone))),
progress(progress),
gen(gen)
{
- if (target_bs) {
- target_bucket = target_bs->bucket;
- }
- if (source_bs) {
- source_bucket = source_bs->bucket;
- }
+ sync_pair.source_bs = source_bs;
}
int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
{
reenter(this) {
- yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
+ yield call(new RGWGetBucketPeersCR(sync_env, std::nullopt, sc->source_zone,
+ sync_pair.source_bs.bucket, &pipes, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
return set_cr_error(retcode);
}
- ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << sync_pair.source_bs << dendl;
if (pipes.empty()) {
ldpp_dout(dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
}
for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
- {
- ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(
- sc, siter->source.get_bucket_info().bucket, &marker_info));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to fetch markers for bucket: "
- << siter->source.get_bucket_info().bucket));
- return set_cr_error(retcode);
- }
- retcode = marker_mgr.from_string(marker_info.max_marker, -1);
- if (retcode < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to decode markers for bucket: "
- << siter->source.get_bucket_info().bucket << dendl;
- return set_cr_error(retcode);
- }
- source_num_shards = marker_mgr.get().size();
+ ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
- target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
- if (source_bs) {
- sync_pair.source_bs = *source_bs;
- } else {
- sync_pair.source_bs.bucket = siter->source.get_bucket();
- }
- sync_pair.dest_bucket = siter->target.get_bucket();
+ sync_pair.dest_bucket = siter->target.get_bucket();
+ sync_pair.handler = siter->handler;
- sync_pair.handler = siter->handler;
+ ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
- if (sync_pair.source_bs.shard_id >= 0) {
- num_shards = 1;
- cur_shard = sync_pair.source_bs.shard_id;
- } else {
- num_shards = std::max<int>(1, source_num_shards);
- cur_shard = std::min<int>(0, source_num_shards);
- }
- }
-
- ldpp_dout(dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
+ cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
- for (; num_shards > 0; --num_shards, ++cur_shard) {
- /*
- * use a negatvie shard_id for backward compatibility,
- * this affects the crafted status oid
- */
- sync_pair.source_bs.shard_id = (source_num_shards > 0 ? cur_shard : -1);
-
- ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
-
- cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
-
- yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
- gen, tn, cur_progress),
- BUCKET_SYNC_SPAWN_WINDOW,
- [&](uint64_t stack_id, int ret) {
- handle_complete_stack(stack_id);
- if (ret < 0) {
- tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
- }
- return ret;
- });
- }
+ yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
+ gen, tn, cur_progress),
+ BUCKET_SYNC_SPAWN_WINDOW,
+ [&](uint64_t stack_id, int ret) {
+ handle_complete_stack(stack_id);
+ if (ret < 0) {
+ tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
+ }
+ return ret;
+ });
}
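// Illustrative sketch, not part of the patch: a minimal standalone analogue of
// the "spawn window" pattern that yield_spawn_window()/drain_all_cb() implement
// above -- keep at most WINDOW operations in flight, harvest completions (and
// their return codes) as you go, then drain whatever is still outstanding.
// This uses plain std::async, not the RGW coroutine framework, and the names
// WINDOW and sync_one_shard() are hypothetical stand-ins (WINDOW plays the role
// of BUCKET_SYNC_SPAWN_WINDOW).  One simplification: it always waits on the
// oldest pending operation, whereas the coroutine framework collects whichever
// spawned stack happens to finish first.
#include <deque>
#include <future>
#include <iostream>

static constexpr size_t WINDOW = 4;  // stand-in for BUCKET_SYNC_SPAWN_WINDOW

// hypothetical per-shard operation; returns 0 on success, <0 on error
int sync_one_shard(int shard) {
  return (shard % 7 == 3) ? -5 : 0;  // pretend one shard fails
}

int main() {
  std::deque<std::future<int>> in_flight;
  int first_error = 0;

  auto harvest_one = [&] {
    int ret = in_flight.front().get();  // wait for the oldest operation
    in_flight.pop_front();
    if (ret < 0) {
      std::cerr << "ERROR: a sync operation returned error: " << ret << "\n";
      if (first_error == 0) {
        first_error = ret;  // remember the first failure, keep going
      }
    }
  };

  for (int shard = 0; shard < 16; ++shard) {
    if (in_flight.size() >= WINDOW) {
      harvest_one();  // the coroutine version yields here instead of blocking
    }
    in_flight.push_back(std::async(std::launch::async, sync_one_shard, shard));
  }

  // analogous to drain_all_cb(): collect every remaining completion
  while (!in_flight.empty()) {
    harvest_one();
  }
  return first_error < 0 ? 1 : 0;
}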
drain_all_cb([&](uint64_t stack_id, int ret) {
handle_complete_stack(stack_id);