full_status_obj(sync_env->svc->zone->get_zone_params().log_pool,
RGWBucketPipeSyncStatusManager::full_status_oid(source_zone,
source_bucket_info.bucket,
- dest_bucket))
+ dest_bucket)),
+ source_bucket_info(source_bucket_info)
{
rgw_bucket_index_marker_info remote_info;
BucketIndexShardsManager remote_markers;
const rgw_raw_obj& status_obj;
rgw_bucket_sync_status& status;
RGWObjVersionTracker& objv;
- const int num_shards;
+ const RGWBucketInfo& source_info;
const bool check_compat;
const rgw_bucket_index_marker_info& info;
BucketIndexShardsManager marker_mgr;
bool all_incremental = true;
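+  // set when the source has no gen=0 InIndex log whose per-shard status could be converted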
+ bool no_zero = false;
+
public:
InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& sync_pair,
const rgw_raw_obj& status_obj,
rgw_bucket_sync_status& status,
RGWObjVersionTracker& objv,
- int num_shards, bool check_compat,
+ const RGWBucketInfo& source_info,
+ bool check_compat,
const rgw_bucket_index_marker_info& info)
: RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
sync_pair(sync_pair), status_obj(status_obj),
- status(status), objv(objv), num_shards(num_shards),
+ status(status), objv(objv), source_info(source_info),
check_compat(check_compat), info(info)
{}
status.state = BucketSyncState::Init;
if (info.oldest_gen == 0) {
- if (check_compat) {
- // try to convert existing per-shard incremental status for backward compatibility
- yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- if (all_incremental) {
- // we can use existing status and resume incremental sync
- status.state = BucketSyncState::Incremental;
- }
- }
+ if (check_compat) {
+ // use shard count from our log gen=0
+ // try to convert existing per-shard incremental status for backward compatibility
+        if (source_info.layout.logs.empty() ||
+            source_info.layout.logs.front().gen > 0) {
+          ldpp_dout(dpp, 20) << "no generation zero when checking compatibility" << dendl;
+          no_zero = true;
+        } else if (auto& log = source_info.layout.logs.front();
+                   log.layout.type != rgw::BucketLogType::InIndex) {
+          ldpp_dout(dpp, 20) << "unrecognized log layout type when checking compatibility " << log.layout.type << dendl;
+          no_zero = true;
+        }
+ if (!no_zero) {
+ yield {
+ const int num_shards0 =
+ source_info.layout.logs.front().layout.in_index.layout.num_shards;
+ call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair,
+ num_shards0,
+ &all_incremental));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ if (all_incremental) {
+ // we can use existing status and resume incremental sync
+ status.state = BucketSyncState::Incremental;
+ }
+ } else {
+ all_incremental = false;
+ }
+ }
}
if (status.state != BucketSyncState::Incremental) {
- // initialize all shard sync status. this will populate the log marker
+ // initialize all shard sync status. this will populate the log marker
// positions where incremental sync will resume after full sync
- yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
- if (retcode < 0) {
+ yield {
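+      // derive the shard count from marker_mgr instead of the removed num_shards member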
+ const int num_shards = marker_mgr.get().size();
+ call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
+ }
+ if (retcode < 0) {
ldout(cct, 20) << "failed to init bucket shard status: "
- << cpp_strerror(retcode) << dendl;
- return set_cr_error(retcode);
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
}
if (sync_env->sync_module->should_full_sync()) {
}
}
- status.shards_done_with_gen.resize(num_shards);
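+  // one entry per shard of the generation that incremental sync will run on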
+ status.shards_done_with_gen.resize(marker_mgr.get().size());
status.incremental_gen = info.latest_gen;
ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position.to_str() << dendl;
// write bucket sync status
using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
- status_obj, status, &objv, false));
+ status_obj, status, &objv, false));
if (retcode < 0) {
ldout(cct, 20) << "failed to write bucket shard status: "
<< cpp_strerror(retcode) << dendl;
RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv, rgw_bucket_index_marker_info& info)
{
constexpr bool check_compat = false;
- const int num_shards = num_pipes();
return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj,
- full_status, objv, num_shards, check_compat, info);
+ full_status, objv, source_bucket_info,
+ check_compat, info);
}
#define OMAP_READ_MAX_ENTRIES 10
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw::sal::RadosStore* store;
-
+
const int shard_id;
int max_entries;
rgw_bucket_index_marker_info info;
RGWSyncTraceNodeRef tn;
- rgw_bucket_index_marker_info remote_info;
- BucketIndexShardsManager remote_markers;
public:
RGWSyncBucketCR(RGWDataSyncCtx *_sc,
yield set_sleeping(true);
}
}
-
+
// if state was incremental, remove all per-shard status objects
if (bucket_status.state == BucketSyncState::Incremental) {
yield {
yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
status_obj, &bucket_status, false, &objv));
if (retcode < 0) {
- RELEASE_LOCK(bucket_lease_cr);
+ RELEASE_LOCK(bucket_lease_cr);
tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
return set_cr_error(retcode);
}
tn->log(20, SSTR("status after acquiring the lock is: " << bucket_status.state));
- // init sync status
- yield {
- init_check_compat = objv.read_version.ver <= 1; // newly-created
- rgw_read_remote_bilog_info(dpp, sc->conn, sync_pair.source_bs.bucket,
- remote_info, remote_markers, null_yield);
- const int num_shards = remote_markers.get().size();
- call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
- bucket_status, objv, num_shards,
- init_check_compat, info));
- }
+ yield call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
+ bucket_status, objv,
+ sync_pipe.source_bucket_info,
+ init_check_compat, info));
if (retcode < 0) {
tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode));