From 5275f32c018e9682c4d9fc2d4958992264a0298f Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Thu, 2 Sep 2021 17:36:09 -0400 Subject: [PATCH] rgw: InitBucketFullSyncStatusCR gets num shards from remote As specified in rgw_bucket_index_marker_info, unless we're doing the compatibility check, in which case we look at generation 0. Signed-off-by: Adam C. Emerson --- src/rgw/rgw_data_sync.cc | 97 ++++++++++++++++++++++++---------------- src/rgw/rgw_data_sync.h | 2 + 2 files changed, 60 insertions(+), 39 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 4362819167819..4d3f62688b854 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2812,7 +2812,8 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, full_status_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWBucketPipeSyncStatusManager::full_status_oid(source_zone, source_bucket_info.bucket, - dest_bucket)) + dest_bucket)), + source_bucket_info(source_bucket_info) { rgw_bucket_index_marker_info remote_info; BucketIndexShardsManager remote_markers; @@ -3140,24 +3141,27 @@ class InitBucketFullSyncStatusCR : public RGWCoroutine { const rgw_raw_obj& status_obj; rgw_bucket_sync_status& status; RGWObjVersionTracker& objv; - const int num_shards; + const RGWBucketInfo& source_info; const bool check_compat; const rgw_bucket_index_marker_info& info; BucketIndexShardsManager marker_mgr; bool all_incremental = true; + bool no_zero = false; + public: InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc, const rgw_bucket_sync_pair_info& sync_pair, const rgw_raw_obj& status_obj, rgw_bucket_sync_status& status, RGWObjVersionTracker& objv, - int num_shards, bool check_compat, + const RGWBucketInfo& source_info, + bool check_compat, const rgw_bucket_index_marker_info& info) : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env), sync_pair(sync_pair), status_obj(status_obj), - status(status), objv(objv), num_shards(num_shards), + status(status), objv(objv), source_info(source_info), check_compat(check_compat), info(info) {} @@ -3173,27 +3177,50 @@ public: status.state = BucketSyncState::Init; if (info.oldest_gen == 0) { - if (check_compat) { - // try to convert existing per-shard incremental status for backward compatibility - yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental)); - if (retcode < 0) { - return set_cr_error(retcode); - } - if (all_incremental) { - // we can use existing status and resume incremental sync - status.state = BucketSyncState::Incremental; - } - } + if (check_compat) { + // use shard count from our log gen=0 + // try to convert existing per-shard incremental status for backward compatibility + if (source_info.layout.logs.front().gen > 0) { + ldpp_dout(dpp, 20) << "no generation zero when checking compatibility" << dendl; + no_zero = true; + } + if (auto& log = source_info.layout.logs.front(); + log.layout.type != rgw::BucketLogType::InIndex) { + ldpp_dout(dpp, 20) << "unrecognized log layout type when checking compatibility " << log.layout.type << dendl; + no_zero = true; + } + if (!no_zero) { + yield { + const int num_shards0 = + source_info.layout.logs.front().layout.in_index.layout.num_shards; + call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, + num_shards0, + &all_incremental)); + } + if (retcode < 0) { + return set_cr_error(retcode); + } + if (all_incremental) { + // we can use existing status and resume incremental sync + status.state = BucketSyncState::Incremental; + } + } else { + all_incremental = false; + } + } } if (status.state != BucketSyncState::Incremental) { - // initialize all shard sync status. this will populate the log marker + // initialize all shard sync status. this will populate the log marker // positions where incremental sync will resume after full sync - yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards)); - if (retcode < 0) { + yield { + const int num_shards = marker_mgr.get().size(); + call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards)); + } + if (retcode < 0) { ldout(cct, 20) << "failed to init bucket shard status: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); } if (sync_env->sync_module->should_full_sync()) { @@ -3203,7 +3230,7 @@ public: } } - status.shards_done_with_gen.resize(num_shards); + status.shards_done_with_gen.resize(marker_mgr.get().size()); status.incremental_gen = info.latest_gen; ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position.to_str() << dendl; @@ -3211,7 +3238,7 @@ public: // write bucket sync status using CR = RGWSimpleRadosWriteCR; yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj, - status_obj, status, &objv, false)); + status_obj, status, &objv, false)); if (retcode < 0) { ldout(cct, 20) << "failed to write bucket shard status: " << cpp_strerror(retcode) << dendl; @@ -3226,9 +3253,9 @@ public: RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv, rgw_bucket_index_marker_info& info) { constexpr bool check_compat = false; - const int num_shards = num_pipes(); return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj, - full_status, objv, num_shards, check_compat, info); + full_status, objv, source_bucket_info, + check_compat, info); } #define OMAP_READ_MAX_ENTRIES 10 @@ -3303,7 +3330,7 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw::sal::RadosStore* store; - + const int shard_id; int max_entries; @@ -5171,8 +5198,6 @@ class RGWSyncBucketCR : public RGWCoroutine { rgw_bucket_index_marker_info info; RGWSyncTraceNodeRef tn; - rgw_bucket_index_marker_info remote_info; - BucketIndexShardsManager remote_markers; public: RGWSyncBucketCR(RGWDataSyncCtx *_sc, @@ -5284,7 +5309,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) yield set_sleeping(true); } } - + // if state was incremental, remove all per-shard status objects if (bucket_status.state == BucketSyncState::Incremental) { yield { @@ -5346,22 +5371,16 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj, status_obj, &bucket_status, false, &objv)); if (retcode < 0) { - RELEASE_LOCK(bucket_lease_cr); + RELEASE_LOCK(bucket_lease_cr); tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode)); return set_cr_error(retcode); } tn->log(20, SSTR("status after acquiring the lock is: " << bucket_status.state)); - // init sync status - yield { - init_check_compat = objv.read_version.ver <= 1; // newly-created - rgw_read_remote_bilog_info(dpp, sc->conn, sync_pair.source_bs.bucket, - remote_info, remote_markers, null_yield); - const int num_shards = remote_markers.get().size(); - call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj, - bucket_status, objv, num_shards, - init_check_compat, info)); - } + yield call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj, + bucket_status, objv, + sync_pipe.source_bucket_info, + init_check_compat, info)); if (retcode < 0) { tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode)); diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 5f45ecb2f3cbd..7c2abed736acc 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -674,10 +674,12 @@ class RGWRemoteBucketManager { RGWDataSyncCtx sc; rgw_bucket_sync_status full_status; + const RGWBucketInfo source_bucket_info; rgw_bucket_shard_sync_info shard_status; RGWBucketSyncCR *sync_cr{nullptr}; + public: RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, RGWDataSyncEnv *_sync_env, -- 2.39.5