From 6bf0a9e2501089c2cd0c10ec61a84d9a5fa10f74 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 6 Oct 2020 17:59:29 -0400 Subject: [PATCH] rgw: split SyncBucket from SyncBucketShard Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 146 +++++++++++++++++++++++++++++---------- 1 file changed, 111 insertions(+), 35 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index cc045a6777ee..292b7a5a1271 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -4891,13 +4891,12 @@ class RGWSyncBucketShardCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; boost::intrusive_ptr lease_cr; rgw_bucket_sync_pair_info sync_pair; - rgw_bucket_sync_pipe sync_pipe; - rgw_bucket_shard_sync_info sync_status; - RGWMetaSyncEnv meta_sync_env; - RGWObjVersionTracker objv_tracker; + rgw_bucket_sync_pipe& sync_pipe; ceph::real_time* progress; const std::string status_oid; + rgw_bucket_shard_sync_info sync_status; + RGWObjVersionTracker objv_tracker; RGWSyncTraceNodeRef tn; @@ -4905,27 +4904,18 @@ public: RGWSyncBucketShardCR(RGWDataSyncCtx *_sc, boost::intrusive_ptr lease_cr, const rgw_bucket_sync_pair_info& _sync_pair, - const RGWSyncTraceNodeRef& _tn_parent, + rgw_bucket_sync_pipe& sync_pipe, + const RGWSyncTraceNodeRef& tn, ceph::real_time* progress) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress), + lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), progress(progress), status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)), - tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket", - SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) { + tn(tn) { } int operate(const DoutPrefixProvider *dpp) override; }; -static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, - boost::intrusive_ptr lease, - const rgw_bucket_sync_pair_info& sync_pair, - const RGWSyncTraceNodeRef& tn, - ceph::real_time* progress) -{ - return new RGWSyncBucketShardCR(sc, std::move(lease), sync_pair, tn, progress); -} - int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) { reenter(this) { @@ -4938,24 +4928,6 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) tn->log(20, SSTR("sync status for source bucket: " << sync_status.state)); - yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, - &sync_pipe.source_bucket_attrs, tn)); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket})); - drain_all(); - return set_cr_error(retcode); - } - - yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, - &sync_pipe.dest_bucket_attrs, tn)); - if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.dest_bs.bucket})); - drain_all(); - return set_cr_error(retcode); - } - - sync_pipe.info = sync_pair; - do { if (sync_status.state == rgw_bucket_shard_sync_info::StateInit || sync_status.state == rgw_bucket_shard_sync_info::StateStopped) { @@ -5007,6 +4979,110 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) return 0; } +class RGWSyncBucketCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *env; + boost::intrusive_ptr lease_cr; + rgw_bucket_sync_pair_info sync_pair; + rgw_bucket_sync_pipe sync_pipe; + ceph::real_time* progress; + + const rgw_raw_obj status_obj; + rgw_bucket_sync_status bucket_status; + RGWObjVersionTracker objv; + + RGWSyncTraceNodeRef tn; + +public: + RGWSyncBucketCR(RGWDataSyncCtx *_sc, + boost::intrusive_ptr lease_cr, + const rgw_bucket_sync_pair_info& _sync_pair, + const RGWSyncTraceNodeRef& _tn_parent, + ceph::real_time* progress) + : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env), + lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress), + status_obj(env->svc->zone->get_zone_params().log_pool, + RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, + sync_pair.source_bs.bucket, + sync_pair.dest_bs.bucket)), + tn(env->sync_tracer->add_node(_tn_parent, "bucket", + SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) { + } + + int operate(const DoutPrefixProvider *dpp) override; +}; + +static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, + boost::intrusive_ptr lease, + const rgw_bucket_sync_pair_info& sync_pair, + const RGWSyncTraceNodeRef& tn, + ceph::real_time* progress) +{ + return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, tn, progress); +} + +int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) +{ + reenter(this) { + // read source/destination bucket info + yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, + &sync_pipe.source_bucket_attrs, tn)); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket})); + return set_cr_error(retcode); + } + + yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, + &sync_pipe.dest_bucket_attrs, tn)); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket})); + return set_cr_error(retcode); + } + + sync_pipe.info = sync_pair; + + // read bucket sync status + using CR = RGWSimpleRadosReadCR; + yield call(new CR(dpp, env->async_rados, env->svc->sysobj, + status_obj, &bucket_status, true, &objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + if (bucket_status.state == BucketSyncState::Init) { + // init sync status + yield { + // use exclusive create if it didn't exist. if we lose the race to + // create it, we'll fail with EEXIST and RGWSyncBucketCR() will come + // back later and read the new status + const bool exclusive = objv.version_for_check() == nullptr; + int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards; + call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj, + bucket_status, objv, num_shards, + exclusive)); + } + if (retcode < 0) { + return set_cr_error(retcode); + } + } + + if (bucket_status.state == BucketSyncState::Full) { + // TODO: full sync + } + + if (bucket_status.state == BucketSyncState::Incremental) { + yield call(new RGWSyncBucketShardCR(sc, lease_cr, sync_pair, + sync_pipe, tn, progress)); + if (retcode < 0) { + return set_cr_error(retcode); + } + } + return set_cr_done(); + } + + return 0; +} + RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num) { if ((size_t)num >= sync_pairs.size()) { -- 2.47.3