From dc02f111807383f17e20e164ad77af5a7df3b0e2 Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Tue, 6 Apr 2021 01:45:45 +0530 Subject: [PATCH] rgw: update bucket sync status after bucket shards finishes current gen Signed-off-by: Shilpa Jagannath --- src/rgw/rgw_data_sync.cc | 119 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 112 insertions(+), 7 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index b1117567c5415..b51e7b939783a 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3183,6 +3183,7 @@ public: } } + status.shards_done_with_gen.resize(num_shards); status.incremental_gen = info.latest_gen; ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl; @@ -4137,12 +4138,88 @@ static bool has_olh_epoch(RGWModifyOp op) { return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE; } +class RGWBucketShardIsDoneCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *sync_env; + rgw_bucket_sync_status bucket_status; + const rgw_raw_obj& bucket_status_obj; + const int shard_id; + RGWObjVersionTracker objv_tracker; + const next_bilog_result& next_log; + const uint64_t generation; + +public: + RGWBucketShardIsDoneCR(RGWDataSyncCtx *_sc, const rgw_raw_obj& _bucket_status_obj, + int _shard_id, const next_bilog_result& _next_log, const uint64_t _gen) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + bucket_status_obj(_bucket_status_obj), + shard_id(_shard_id), next_log(_next_log), generation(_gen) {} + + int operate(const DoutPrefixProvider* dpp) override + { + reenter(this) { + do { + // read bucket sync status + objv_tracker.clear(); + using ReadCR = RGWSimpleRadosReadCR; + yield call(new ReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + bucket_status_obj, &bucket_status, false, &objv_tracker)); + if (retcode < 0) { + ldpp_dout(dpp, 20) << "failed to read bucket shard status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + if (bucket_status.state != BucketSyncState::Incremental) { + // exit with success to avoid stale shard being + // retried in error repo if we lost a race + ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR found sync state = " << bucket_status.state << dendl; + return set_cr_done(); + } + + if (bucket_status.incremental_gen != generation) { + // exit with success to avoid stale shard being + // retried in error repo if we lost a race + ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR expected gen: " << generation + << ", got: " << bucket_status.incremental_gen << dendl; + return set_cr_done(); + } + + yield { + // update bucket_status after a shard is done with current gen + auto& done = bucket_status.shards_done_with_gen; + done[shard_id] = true; + + // increment gen if all shards are already done with current gen + if (std::all_of(done.begin(), done.end(), + [] (const bool done){return done; } )) { + bucket_status.incremental_gen = next_log.generation; + done.clear(); + done.resize(next_log.num_shards, false); + } + using WriteCR = RGWSimpleRadosWriteCR; + call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + bucket_status_obj, bucket_status, &objv_tracker, false)); + } + if (retcode < 0 && retcode != -ECANCELED) { + ldpp_dout(dpp, 20) << "failed to write bucket sync status: " << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } else if (retcode >= 0) { + return set_cr_done(); + } + } while (retcode == -ECANCELED); + } + return 0; + } +}; + class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe& sync_pipe; RGWBucketSyncFlowManager::pipe_rules_ref rules; rgw_bucket_shard& bs; + const rgw_raw_obj& bucket_status_obj; boost::intrusive_ptr lease_cr; bilog_list_result extended_result; list list_result; @@ -4171,7 +4248,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { public: RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, - const std::string& status_oid, + const std::string& shard_status_oid, + const rgw_raw_obj& _bucket_status_obj, boost::intrusive_ptr lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef& _tn_parent, @@ -4179,11 +4257,11 @@ public: ceph::real_time* stable_timestamp) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), - lease_cr(std::move(lease_cr)), sync_info(sync_info), - zone_id(sync_env->svc->zone->get_zone().id), + bucket_status_obj(_bucket_status_obj), lease_cr(std::move(lease_cr)), + sync_info(sync_info), zone_id(sync_env->svc->zone->get_zone().id), tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync", SSTR(bucket_shard_str{bs}))), - marker_tracker(sc, status_oid, sync_info.inc_marker, tn, + marker_tracker(sc, shard_status_oid, sync_info.inc_marker, tn, objv_tracker, stable_timestamp) { set_description() << "bucket shard incremental sync bucket=" @@ -4402,6 +4480,7 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp) return 0; }); } + } while (!list_result.empty() && sync_status == 0 && !syncstopped); drain_all_cb([&](uint64_t stack_id, int ret) { @@ -4431,6 +4510,17 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp) tn->log(10, SSTR("backing out with sync_status=" << sync_status)); return set_cr_error(sync_status); } + + if (!truncated && extended_result.next_log) { + yield call(new RGWBucketShardIsDoneCR(sc, bucket_status_obj, bs.shard_id, *extended_result.next_log, generation)); + if (retcode < 0) { + ldout(cct, 20) << "failed to update bucket sync status: " + << cpp_strerror(retcode) << dendl; + drain_all(); + return set_cr_error(retcode); + } + } + return set_cr_done(); } return 0; @@ -4979,7 +5069,8 @@ class RGWSyncBucketShardCR : public RGWCoroutine { BucketSyncState& bucket_state; ceph::real_time* progress; - const std::string status_oid; + const std::string shard_status_oid; + const rgw_raw_obj bucket_status_obj; rgw_bucket_shard_sync_info sync_status; RGWObjVersionTracker objv_tracker; @@ -4996,7 +5087,11 @@ public: : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), bucket_state(bucket_state), progress(progress), - status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)), + shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)), + bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool, + RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, + sync_pair.source_bs.bucket, + sync_pair.dest_bs.bucket)), tn(tn) { } @@ -5019,7 +5114,7 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) } yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe, - status_oid, lease_cr, + shard_status_oid, bucket_status_obj, lease_cr, sync_status, tn, objv_tracker, progress)); if (retcode < 0) { @@ -5213,6 +5308,16 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) } } + if (sync_pair.source_bs.shard_id >= bucket_status.shards_done_with_gen.size()) { + tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds")); + return set_cr_done(); // return success so we don't retry + } + if (bucket_status.shards_done_with_gen[sync_pair.source_bs.shard_id]) { + tn->log(10, SSTR("bucket shard " << sync_pair.source_bs << " of gen " << + gen << " already synced.")); + return set_cr_done(); + } + yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair, sync_pipe, bucket_status.state, tn, progress)); -- 2.39.5