From: Casey Bodley Date: Tue, 6 Oct 2020 21:59:28 +0000 (-0400) Subject: rgw: add InitBucketFullSyncStatusCR X-Git-Tag: v18.0.0~787^2~180 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=06c30057f4a83bcf46a6c81255dd3690b666e8ca;p=ceph-ci.git rgw: add InitBucketFullSyncStatusCR a coroutine to initialize a bucket for full sync using a new bucket-wide sync status object Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 750c4ae7a3a..cc045a6777e 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2738,15 +2738,17 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { rgw_bucket_shard_sync_info& status; RGWObjVersionTracker& objv_tracker; rgw_bucket_index_marker_info info; + bool exclusive; public: RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, rgw_bucket_shard_sync_info& _status, - RGWObjVersionTracker& objv_tracker) + RGWObjVersionTracker& objv_tracker, + bool exclusive) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair), sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair)), - status(_status), objv_tracker(objv_tracker) + status(_status), objv_tracker(objv_tracker), exclusive(exclusive) {} int operate(const DoutPrefixProvider *dpp) override { @@ -2787,7 +2789,8 @@ public: if (write_status) { map attrs; status.encode_all_attrs(attrs); - call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker)); + call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + obj, attrs, &objv_tracker, exclusive)); } else { call(new RGWRadosRemoveCR(store, obj, &objv_tracker)); } @@ -2809,11 +2812,13 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, const rgw_zone_id& _source_zone, RGWRESTConn *_conn, const RGWBucketInfo& source_bucket_info, - 
const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env) + const rgw_bucket& dest_bucket) + : dpp(_dpp), sync_env(_sync_env), conn(_conn), source_zone(_source_zone), + full_status_obj(sync_env->svc->zone->get_zone_params().log_pool, + RGWBucketPipeSyncStatusManager::full_status_oid(source_zone, + source_bucket_info.bucket, + dest_bucket)) { - conn = _conn; - source_zone = _source_zone; - int num_shards = (source_bucket_info.layout.current_index.layout.normal.num_shards <= 0 ? 1 : source_bucket_info.layout.current_index.layout.normal.num_shards); @@ -2839,14 +2844,6 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, sc.init(sync_env, conn, source_zone); } -RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker) -{ - if ((size_t)num >= sync_pairs.size()) { - return nullptr; - } - return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker); -} - #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync." 
template @@ -2945,6 +2942,234 @@ int RGWReadBucketPipeSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp) return 0; } +// wrap ReadSyncStatus and set a flag if it's not in incremental +class CheckBucketShardStatusIsIncremental : public RGWReadBucketPipeSyncStatusCoroutine { + bool* result; + rgw_bucket_shard_sync_info status; + public: + CheckBucketShardStatusIsIncremental(RGWDataSyncCtx* sc, + const rgw_bucket_sync_pair_info& sync_pair, + bool* result) + : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr), + result(result) + {} + + int operate(const DoutPrefixProvider *dpp) override { + int r = RGWReadBucketPipeSyncStatusCoroutine::operate(dpp); + if (state == RGWCoroutine_Done && + status.state != rgw_bucket_shard_sync_info::StateIncrementalSync) { + *result = false; + } + return r; + } +}; + +class CheckAllBucketShardStatusIsIncremental : public RGWShardCollectCR { + // start with 1 shard, and only spawn more if we detect an existing shard. + // this makes the backward compatibility check far less expensive in the + // general case where no shards exist + static constexpr int initial_concurrent_shards = 1; + static constexpr int max_concurrent_shards = 16; + + RGWDataSyncCtx* sc; + rgw_bucket_sync_pair_info sync_pair; + const int num_shards; + bool* result; + int shard = 0; + public: + CheckAllBucketShardStatusIsIncremental(RGWDataSyncCtx* sc, + const rgw_bucket_sync_pair_info& sync_pair, + int num_shards, bool* result) + : RGWShardCollectCR(sc->cct, initial_concurrent_shards), + sc(sc), sync_pair(sync_pair), num_shards(num_shards), result(result) + {} + + bool spawn_next() override { + // stop spawning if we saw any errors or non-incremental shards + if (shard >= num_shards || status < 0 || !*result) { + return false; + } + sync_pair.dest_bs.shard_id = shard++; + spawn(new CheckBucketShardStatusIsIncremental(sc, sync_pair, result), false); + return true; + } + + private: + int handle_result(int r) override { + if (r < 0) { + 
ldout(cct, 4) << "failed to read bucket shard status: " + << cpp_strerror(r) << dendl; + } else if (shard == 0) { + // enable concurrency once the first shard succeeds + max_concurrent = max_concurrent_shards; + } + return r; + } +}; + +// wrap InitBucketShardSyncStatus with local storage for 'status' and 'objv' +// and a loop to retry on racing writes +class InitBucketShardStatusCR : public RGWCoroutine { + RGWDataSyncCtx* sc; + const rgw_bucket_sync_pair_info& pair; + rgw_bucket_shard_sync_info status; + RGWObjVersionTracker objv; + int tries = 10; // retry on racing writes + bool exclusive = true; // first try is exclusive + using ReadCR = RGWReadBucketPipeSyncStatusCoroutine; + using InitCR = RGWInitBucketShardSyncStatusCoroutine; + public: + InitBucketShardStatusCR(RGWDataSyncCtx* sc, const rgw_bucket_sync_pair_info& pair) + : RGWCoroutine(sc->cct), sc(sc), pair(pair) + {} + int operate(const DoutPrefixProvider *dpp) { + reenter(this) { + // try exclusive create with empty status + objv.generate_new_write_ver(cct); + yield call(new InitCR(sc, pair, status, objv, exclusive)); + if (retcode >= 0) { + return set_cr_done(); + } else if (retcode != -EEXIST) { + return set_cr_error(retcode); + } + + exclusive = false; + // retry loop to reinitialize + while (--tries) { + objv.clear(); + // read current status and objv + yield call(new ReadCR(sc, pair, &status, &objv)); + if (retcode < 0) { + return set_cr_error(retcode); + } + yield call(new InitCR(sc, pair, status, objv, exclusive)); + if (retcode >= 0) { + return set_cr_done(); + } else if (retcode != -ECANCELED) { + return set_cr_error(retcode); + } + } + return set_cr_error(retcode); + } + return 0; + } +}; + +class InitBucketShardStatusCollectCR : public RGWShardCollectCR { + static constexpr int max_concurrent_shards = 16; + RGWDataSyncCtx* sc; + rgw_bucket_sync_pair_info sync_pair; + const int num_shards; + int shard = 0; + + int handle_result(int r) override { + if (r < 0) { + ldout(cct, 4) << "failed to 
init bucket shard status: " + << cpp_strerror(r) << dendl; + } + return r; + } + public: + InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc, + const rgw_bucket_sync_pair_info& sync_pair, + int num_shards) + : RGWShardCollectCR(sc->cct, max_concurrent_shards), + sc(sc), sync_pair(sync_pair), num_shards(num_shards) + {} + + bool spawn_next() override { + if (shard >= num_shards || status < 0) { // stop spawning on any errors + return false; + } + sync_pair.dest_bs.shard_id = shard++; + spawn(new InitBucketShardStatusCR(sc, sync_pair), false); + return true; + } +}; + +class InitBucketFullSyncStatusCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *sync_env; + + const rgw_bucket_sync_pair_info& sync_pair; + const rgw_raw_obj& status_obj; + rgw_bucket_sync_status& status; + RGWObjVersionTracker& objv; + const int num_shards; + const bool check_compat; + + bool all_incremental = true; +public: + InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc, + const rgw_bucket_sync_pair_info& sync_pair, + const rgw_raw_obj& status_obj, + rgw_bucket_sync_status& status, + RGWObjVersionTracker& objv, + int num_shards, bool check_compat) + : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env), + sync_pair(sync_pair), status_obj(status_obj), + status(status), objv(objv), num_shards(num_shards), + check_compat(check_compat) + {} + + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + status.state = BucketSyncState::Init; + + if (check_compat) { + // try to convert existing per-shard incremental status for backward compatibility + yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental)); + if (retcode < 0) { + return set_cr_error(retcode); + } + if (all_incremental) { + // we can use existing status and resume incremental sync + status.state = BucketSyncState::Incremental; + } + } + + if (status.state != BucketSyncState::Incremental) { + // initialize all shard sync status. 
this will populate the log marker + // positions where incremental sync will resume after full sync + yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, num_shards)); + if (retcode < 0) { + ldout(cct, 20) << "failed to init bucket shard status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + if (sync_env->sync_module->should_full_sync()) { + status.state = BucketSyncState::Full; + } else { + status.state = BucketSyncState::Incremental; + } + } + + ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl; + + // write bucket sync status + using CR = RGWSimpleRadosWriteCR; + yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + status_obj, status, &objv, false)); + if (retcode < 0) { + ldout(cct, 20) << "failed to write bucket shard status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + return set_cr_done(); + } + return 0; + } +}; + +RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv) +{ + constexpr bool check_compat = false; + const int num_shards = num_pipes(); + return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj, + full_status, objv, num_shards, check_compat); +} + #define OMAP_READ_MAX_ENTRIES 10 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine { RGWDataSyncCtx *sc; @@ -4734,7 +4959,7 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) do { if (sync_status.state == rgw_bucket_shard_sync_info::StateInit || sync_status.state == rgw_bucket_shard_sync_info::StateStopped) { - yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker)); + yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker, false)); if (retcode == -ENOENT) { tn->log(0, "bucket sync disabled"); drain_all(); @@ -4853,11 +5078,8 @@ int RGWBucketPipeSyncStatusManager::init_sync_status(const DoutPrefixProvider *d for (auto& mgr : 
source_mgrs) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); - - for (int i = 0; i < mgr->num_pipes(); ++i) { - objvs.emplace_back(); - stack->call(mgr->init_sync_status_cr(i, objvs.back())); - } + objvs.emplace_back(); + stack->call(mgr->init_sync_status_cr(objvs.back())); stacks.push_back(stack); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 2f33a7eb0e5..27885783533 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -648,10 +648,12 @@ class RGWRemoteBucketManager { RGWRESTConn *conn{nullptr}; rgw_zone_id source_zone; + rgw_raw_obj full_status_obj; std::vector sync_pairs; RGWDataSyncCtx sc; - rgw_bucket_shard_sync_info init_status; + rgw_bucket_sync_status full_status; + rgw_bucket_shard_sync_info shard_status; RGWBucketSyncCR *sync_cr{nullptr}; @@ -662,12 +664,8 @@ public: const RGWBucketInfo& source_bucket_info, const rgw_bucket& dest_bucket); - void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, - const rgw_bucket& source_bucket, int shard_id, - const rgw_bucket& dest_bucket); - RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); - RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker); + RGWCoroutine *init_sync_status_cr(RGWObjVersionTracker& objv_tracker); RGWCoroutine *run_sync_cr(int num); int num_pipes() {