From 425d74718c07132af186f04074c8c7cb9d53ea5d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 10 Nov 2020 15:00:40 -0500 Subject: [PATCH] rgw: RGWSyncBucketCR holds lease over Init state too Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 86 ++++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 26 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 8a23b363a8b72..5ab040b999e82 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -4956,6 +4956,8 @@ class RGWSyncBucketCR : public RGWCoroutine { rgw_bucket_sync_pipe sync_pipe; ceph::real_time* progress; + const std::string lock_name = "bucket sync"; + const uint32_t lock_duration; const rgw_raw_obj status_obj; rgw_bucket_sync_status bucket_status; RGWObjVersionTracker objv; @@ -4971,6 +4973,7 @@ public: ceph::real_time* progress) : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env), data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress), + lock_duration(cct->_conf->rgw_sync_lease_period), status_obj(env->svc->zone->get_zone_params().log_pool, RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, sync_pair.source_bs.bucket, @@ -5012,9 +5015,21 @@ int RGWSyncBucketCR::operate() sync_pipe.info = sync_pair; // read bucket sync status - using CR = RGWSimpleRadosReadCR; - yield call(new CR(env->async_rados, env->svc->sysobj, - status_obj, &bucket_status, true, &objv)); + using ReadCR = RGWSimpleRadosReadCR; + yield call(new ReadCR(env->async_rados, env->svc->sysobj, + status_obj, &bucket_status, false, &objv)); + if (retcode == -ENOENT) { + // use exclusive create to set state=Init + objv.generate_new_write_ver(cct); + using WriteCR = RGWSimpleRadosWriteCR; + yield call(new WriteCR(env->async_rados, env->svc->sysobj, + status_obj, bucket_status, &objv, true)); + if (retcode == -EEXIST) { + // raced with another create, read its status + yield call(new ReadCR(env->async_rados, env->svc->sysobj, + status_obj, &bucket_status, false, &objv)); + } + } if (retcode < 0) { return set_cr_error(retcode); } @@ -5022,8 +5037,37 @@ int RGWSyncBucketCR::operate() do { tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state)); + // if the state wasn't Incremental, take a bucket-wide lease to prevent + // different shards from duplicating the init and full sync + if (bucket_status.state != BucketSyncState::Incremental) { + assert(!bucket_lease_cr); + bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj, + lock_name, lock_duration, this)); + yield spawn(bucket_lease_cr.get(), false); + while (!bucket_lease_cr->is_locked()) { + if (bucket_lease_cr->is_done()) { + tn->log(5, "failed to take bucket lease"); + set_status("lease lock failed, early abort"); + drain_all(); + return set_cr_error(bucket_lease_cr->get_ret_status()); + } + tn->log(5, "waiting on bucket lease"); + yield set_sleeping(true); + } + // reread the status after acquiring the lock + yield call(new ReadCR(env->async_rados, env->svc->sysobj, + status_obj, &bucket_status, false, &objv)); + if (retcode < 0) { + bucket_lease_cr->go_down(); + drain_all(); + bucket_lease_cr.reset(); + return set_cr_error(retcode); + } + } + if (bucket_status.state == BucketSyncState::Init || bucket_status.state == BucketSyncState::Stopped) { + assert(bucket_lease_cr); // init sync status yield { init_check_compat = objv.read_version.ver <= 1; // newly-created @@ -5033,43 +5077,33 @@ int RGWSyncBucketCR::operate() init_check_compat)); } if (retcode < 0) { + bucket_lease_cr->go_down(); + drain_all(); + bucket_lease_cr.reset(); return set_cr_error(retcode); } } if (bucket_status.state == BucketSyncState::Full) { - // take a bucket-wide lease for full sync to prevent the bucket shards - // from duplicating the work - yield { - const std::string lock_name = "full sync"; - const uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; - bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj, - lock_name, lock_duration, this)); - spawn(bucket_lease_cr.get(), false); - } - while (!bucket_lease_cr->is_locked()) { - if (bucket_lease_cr->is_done()) { - tn->log(5, "failed to take bucket lease"); - set_status("lease lock failed, early abort"); - drain_all(); - return set_cr_error(bucket_lease_cr->get_ret_status()); - } - tn->log(5, "waiting on bucket lease"); - yield set_sleeping(true); - } - + assert(bucket_lease_cr); yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj, bucket_lease_cr, bucket_status, tn, objv)); - bucket_lease_cr->go_down(); - drain_all(); - bucket_lease_cr.reset(); if (retcode < 0) { + bucket_lease_cr->go_down(); + drain_all(); + bucket_lease_cr.reset(); return set_cr_error(retcode); } } if (bucket_status.state == BucketSyncState::Incremental) { + // lease not required for incremental sync + if (bucket_lease_cr) { + bucket_lease_cr->go_down(); + drain_all(); + bucket_lease_cr.reset(); + } yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair, sync_pipe, bucket_status.state, tn, progress)); -- 2.39.5