From: Yuval Lifshitz
Date: Tue, 18 May 2021 15:59:54 +0000 (+0300)
Subject: rgw/multisite: allow bucket sync disable/enable
X-Git-Tag: v18.0.0~787^2~89
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9ebebc3c66cb2e43cc2dc37fbc640b8240860460;p=ceph.git

rgw/multisite: allow bucket sync disable/enable

Signed-off-by: Yuval Lifshitz
---

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 4a96ae46f15a..48b76f2a5c55 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -2746,60 +2746,43 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
   rgw_bucket_shard_sync_info& status;
   RGWObjVersionTracker& objv_tracker;
-  rgw_bucket_index_marker_info& info;
   const BucketIndexShardsManager& marker_mgr;
   bool exclusive;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                         const rgw_bucket_sync_pair_info& _sync_pair,
                                         rgw_bucket_shard_sync_info& _status,
-                                        rgw_bucket_index_marker_info& _info,
+                                        uint64_t latest_gen,
                                         const BucketIndexShardsManager& _marker_mgr,
                                         RGWObjVersionTracker& objv_tracker,
                                         bool exclusive)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pair(_sync_pair),
-      sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, _info.latest_gen)),
-      status(_status), objv_tracker(objv_tracker), info(_info), marker_mgr(_marker_mgr), exclusive(exclusive)
+      sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, latest_gen)),
+      status(_status), objv_tracker(objv_tracker), marker_mgr(_marker_mgr), exclusive(exclusive)
   {}

   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
       yield {
-        auto store = sync_env->store;
         rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);
-        const bool stopped = status.state == rgw_bucket_shard_sync_info::StateStopped;
-        bool write_status = false;
-        auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
-
-        if (info.syncstopped) {
-          if (stopped && !sync_env->sync_module->should_full_sync()) {
-            // preserve our current incremental marker position
-            write_status = true;
-          }
-        } else {
-          // whether or not to do full sync, incremental sync will follow anyway
-          if (sync_env->sync_module->should_full_sync()) {
-            status.inc_marker.position = max_marker;
-          }
-          write_status = true;
-          status.inc_marker.timestamp = ceph::real_clock::now();
-          status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
-        }
-
-        if (write_status) {
-          map<string, bufferlist> attrs;
-          status.encode_all_attrs(attrs);
-          call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                                              obj, attrs, &objv_tracker, exclusive));
-        } else {
-          call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
+        // whether or not to do full sync, incremental sync will follow anyway
+        if (sync_env->sync_module->should_full_sync()) {
+          const auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
+          status.inc_marker.position = max_marker;
         }
+        status.inc_marker.timestamp = ceph::real_clock::now();
+        status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+
+        map<string, bufferlist> attrs;
+        status.encode_all_attrs(attrs);
+        call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                                            obj, attrs, &objv_tracker, exclusive));
+        ldout(cct, 20) << "init marker position: " << status.inc_marker.position <<
+          ". written to shard status object: " << sync_status_oid << dendl;
       }
-      if (info.syncstopped) {
-        retcode = -ENOENT;
-      }
+
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -3007,7 +2990,7 @@ class InitBucketShardStatusCR : public RGWCoroutine {
   rgw_bucket_sync_pair_info pair;
   rgw_bucket_shard_sync_info status;
   RGWObjVersionTracker objv;
-  rgw_bucket_index_marker_info& info;
+  const uint64_t latest_gen;
   const BucketIndexShardsManager& marker_mgr;

   int tries = 10; // retry on racing writes
@@ -3017,15 +3000,15 @@ public:
   InitBucketShardStatusCR(RGWDataSyncCtx* sc,
                           const rgw_bucket_sync_pair_info& pair,
-                          rgw_bucket_index_marker_info& info,
+                          uint64_t latest_gen,
                           const BucketIndexShardsManager& marker_mgr)
-    : RGWCoroutine(sc->cct), sc(sc), pair(pair), info(info), marker_mgr(marker_mgr)
+    : RGWCoroutine(sc->cct), sc(sc), pair(pair), latest_gen(latest_gen), marker_mgr(marker_mgr)
   {}

   int operate(const DoutPrefixProvider *dpp) {
     reenter(this) {
       // try exclusive create with empty status
       objv.generate_new_write_ver(cct);
-      yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
+      yield call(new InitCR(sc, pair, status, latest_gen, marker_mgr, objv, exclusive));
       if (retcode >= 0) {
         return set_cr_done();
       } else if (retcode != -EEXIST) {
@@ -3037,11 +3020,11 @@
       while (--tries) {
         objv.clear();
         // read current status and objv
-        yield call(new ReadCR(sc, pair, &status, &objv, info.latest_gen));
+        yield call(new ReadCR(sc, pair, &status, &objv, latest_gen));
         if (retcode < 0) {
           return set_cr_error(retcode);
         }
-        yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
+        yield call(new InitCR(sc, pair, status, latest_gen, marker_mgr, objv, exclusive));
         if (retcode >= 0) {
           return set_cr_done();
         } else if (retcode != -ECANCELED) {
@@ -3058,7 +3041,7 @@ class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
   static constexpr int max_concurrent_shards = 16;
   RGWDataSyncCtx* sc;
   rgw_bucket_sync_pair_info sync_pair;
-  rgw_bucket_index_marker_info& info;
+  const uint64_t latest_gen;
   const BucketIndexShardsManager& marker_mgr;

   const int num_shards;
@@ -3074,11 +3057,11 @@ public:
   InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
                                  const rgw_bucket_sync_pair_info& sync_pair,
-                                 rgw_bucket_index_marker_info& info,
+                                 uint64_t latest_gen,
                                  const BucketIndexShardsManager& marker_mgr,
                                  int num_shards)
     : RGWShardCollectCR(sc->cct, max_concurrent_shards),
-      sc(sc), sync_pair(sync_pair), info(info), marker_mgr(marker_mgr), num_shards(num_shards)
+      sc(sc), sync_pair(sync_pair), latest_gen(latest_gen), marker_mgr(marker_mgr), num_shards(num_shards)
   {}

   bool spawn_next() override {
@@ -3086,7 +3069,75 @@
       return false;
     }
     sync_pair.source_bs.shard_id = shard++;
-    spawn(new InitBucketShardStatusCR(sc, sync_pair, info, marker_mgr), false);
+    spawn(new InitBucketShardStatusCR(sc, sync_pair, latest_gen, marker_mgr), false);
     return true;
   }
 };
+
+class RemoveBucketShardStatusCR : public RGWCoroutine {
+  RGWDataSyncCtx* const sc;
+  RGWDataSyncEnv* const sync_env;
+
+  rgw_bucket_sync_pair_info sync_pair;
+  rgw_raw_obj obj;
+  RGWObjVersionTracker objv;
+
+public:
+  RemoveBucketShardStatusCR(RGWDataSyncCtx* sc,
+                            const rgw_bucket_sync_pair_info& sync_pair, uint64_t latest_gen)
+    : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
+      sync_pair(sync_pair),
+      obj(sync_env->svc->zone->get_zone_params().log_pool,
+          RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, latest_gen))
+  {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      yield call(new RGWRadosRemoveCR(sync_env->store, obj, &objv));
+      if (retcode < 0 && retcode != -ENOENT) {
+        ldout(cct, 20) << "ERROR: failed to remove bucket shard status for: " << sync_pair <<
+          " with error: " << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+      ldout(cct, 20) << "removed bucket shard status object: " << obj.oid << dendl;
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+class RemoveBucketShardStatusCollectCR : public RGWShardCollectCR {
+  static constexpr int max_concurrent_shards = 16;
+  RGWDataSyncCtx* const sc;
+  RGWDataSyncEnv* const sync_env;
+  rgw_bucket_sync_pair_info sync_pair;
+  const uint64_t latest_gen;
+
+  const int num_shards;
+  int shard = 0;
+
+  int handle_result(int r) override {
+    if (r < 0) {
+      ldout(cct, 4) << "failed to remove bucket shard status object: "
+                    << cpp_strerror(r) << dendl;
+    }
+    return r;
+  }
+ public:
+  RemoveBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
+                                   const rgw_bucket_sync_pair_info& sync_pair,
+                                   uint64_t latest_gen,
+                                   int num_shards)
+    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
+      sc(sc), sync_env(sc->env), sync_pair(sync_pair), latest_gen(latest_gen), num_shards(num_shards)
+  {}
+
+  bool spawn_next() override {
+    if (shard >= num_shards || status < 0) { // stop spawning on any errors
+      return false;
+    }
+    sync_pair.source_bs.shard_id = shard++;
+    spawn(new RemoveBucketShardStatusCR(sc, sync_pair, latest_gen), false);
+    return true;
+  }
+};
@@ -3102,7 +3153,7 @@ class InitBucketFullSyncStatusCR : public RGWCoroutine {
   const int num_shards;
   const bool check_compat;

-  rgw_bucket_index_marker_info info;
+  const rgw_bucket_index_marker_info& info;
   BucketIndexShardsManager marker_mgr;

   bool all_incremental = true;
@@ -3112,22 +3163,16 @@ public:
                              const rgw_raw_obj& status_obj,
                              rgw_bucket_sync_status& status,
                              RGWObjVersionTracker& objv,
-                             int num_shards, bool check_compat)
+                             int num_shards, bool check_compat,
+                             const rgw_bucket_index_marker_info& info)
     : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
       sync_pair(sync_pair), status_obj(status_obj),
       status(status), objv(objv), num_shards(num_shards),
-      check_compat(check_compat)
+      check_compat(check_compat), info(info)
   {}

   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
-      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
-      if (retcode < 0) {
-        lderr(cct) << "failed to read remote bilog info: "
-                   << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-
       retcode = marker_mgr.from_string(info.max_marker, -1);
       if (retcode < 0) {
         lderr(cct) << "failed to parse bilog shard markers: "
@@ -3141,6 +3186,7 @@
       if (check_compat) {
         // try to convert existing per-shard incremental status for backward compatibility
         yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
+        ldout(cct, 20) << "check for 'all incremental' in compatibility mode" << dendl;
         if (retcode < 0) {
           return set_cr_error(retcode);
         }
@@ -3154,7 +3200,7 @@
       if (status.state != BucketSyncState::Incremental) {
         // initialize all shard sync status. this will populate the log marker
        // positions where incremental sync will resume after full sync
-        yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info, marker_mgr, num_shards));
+        yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
         if (retcode < 0) {
           ldout(cct, 20) << "failed to init bucket shard status: "
                          << cpp_strerror(retcode) << dendl;
@@ -3171,7 +3217,7 @@
       status.shards_done_with_gen.resize(num_shards);
       status.incremental_gen = info.latest_gen;

-      ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
+      ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position.to_str() << dendl;

       // write bucket sync status
       using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
@@ -3188,12 +3234,12 @@
   }
 };

-RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv)
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv, rgw_bucket_index_marker_info& info)
 {
   constexpr bool check_compat = false;
   const int num_shards = num_pipes();
   return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj,
-                                        full_status, objv, num_shards, check_compat);
+                                        full_status, objv, num_shards, check_compat, info);
 }

 #define OMAP_READ_MAX_ENTRIES 10
@@ -4302,12 +4348,13 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
     for (; entries_iter != entries_end; ++entries_iter) {
       auto e = *entries_iter;
       if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
-        ldpp_dout(dpp, 20) << "syncstop on " << e.timestamp << dendl;
+        ldpp_dout(dpp, 20) << "syncstop at: " << e.timestamp << ". marker: " << e.id << dendl;
         syncstopped = true;
         entries_end = std::next(entries_iter); // stop after this entry
         break;
       }
       if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
+        ldpp_dout(dpp, 20) << "syncstart at: " << e.timestamp << ". marker: " << e.id << dendl;
         continue;
       }
       if (e.op == CLS_RGW_OP_CANCEL) {
@@ -4688,7 +4735,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
   reenter(this) {
     yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
     if (retcode < 0 && retcode != -ENOENT) {
-      tn->log(0, "ERROR: failed to read sync status for bucket");
+      tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
error: " << retcode)); return set_cr_error(retcode); } @@ -4742,7 +4789,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) [&](uint64_t stack_id, int ret) { handle_complete_stack(stack_id); if (ret < 0) { - tn->log(10, "a sync operation returned error"); + tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret)); } return ret; }); @@ -4751,7 +4798,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) drain_all_cb([&](uint64_t stack_id, int ret) { handle_complete_stack(stack_id); if (ret < 0) { - tn->log(10, "a sync operation returned error"); + tn->log(10, SSTR("a sync operation returned error: " << ret)); } return ret; }); @@ -5046,7 +5093,7 @@ class RGWSyncBucketShardCR : public RGWCoroutine { boost::intrusive_ptr lease_cr; rgw_bucket_sync_pair_info sync_pair; rgw_bucket_sync_pipe& sync_pipe; - BucketSyncState& bucket_state; + bool& bucket_stopped; uint64_t generation; ceph::real_time* progress; @@ -5062,13 +5109,13 @@ public: boost::intrusive_ptr lease_cr, const rgw_bucket_sync_pair_info& _sync_pair, rgw_bucket_sync_pipe& sync_pipe, - BucketSyncState& bucket_state, + bool& bucket_stopped, uint64_t generation, const RGWSyncTraceNodeRef& tn, ceph::real_time* progress) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), - sync_pipe(sync_pipe), bucket_state(bucket_state), generation(generation), progress(progress), + sync_pipe(sync_pipe), bucket_stopped(bucket_stopped), generation(generation), progress(progress), shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, generation)), bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool, RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, @@ -5085,7 +5132,7 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) reenter(this) { yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker, generation)); if (retcode < 0 && retcode != -ENOENT) { - tn->log(0, "ERROR: failed to read sync status for bucket"); + tn->log(0, SSTR("ERROR: failed to read sync status for bucket. 
error: " << retcode)); return set_cr_error(retcode); } @@ -5103,7 +5150,12 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode)); return set_cr_error(retcode); } - // TODO: handle transition to StateStopped + + if (sync_status.state == rgw_bucket_shard_sync_info::StateStopped) { + tn->log(20, SSTR("syncstopped indication for source bucket shard")); + bucket_stopped = true; + } + return set_cr_done(); } @@ -5124,8 +5176,10 @@ class RGWSyncBucketCR : public RGWCoroutine { const uint32_t lock_duration; const rgw_raw_obj status_obj; rgw_bucket_sync_status bucket_status; + bool bucket_stopped = false; RGWObjVersionTracker objv; bool init_check_compat = false; + rgw_bucket_index_marker_info info; RGWSyncTraceNodeRef tn; @@ -5162,6 +5216,9 @@ static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc, gen, tn, progress); } +#define RELEASE_LOCK(cr) \ + if (cr) {cr->go_down(); drain_all(); cr.reset();} + int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) { reenter(this) { @@ -5184,113 +5241,170 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) // read bucket sync status using ReadCR = RGWSimpleRadosReadCR; + using WriteCR = RGWSimpleRadosWriteCR; + yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj, status_obj, &bucket_status, false, &objv)); if (retcode == -ENOENT) { // use exclusive create to set state=Init objv.generate_new_write_ver(cct); - using WriteCR = RGWSimpleRadosWriteCR; yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj, status_obj, bucket_status, &objv, true)); + tn->log(20, "bucket status object does not exist, create a new one"); if (retcode == -EEXIST) { // raced with another create, read its status + tn->log(20, "raced with another create, read its status"); yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj, status_obj, &bucket_status, false, &objv)); } } if (retcode < 0) { + tn->log(20, SSTR("ERROR: failed to read bucket status object. error: " << retcode)); return set_cr_error(retcode); } do { - tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state)); + tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state << + ". lease is: " << (bucket_lease_cr ? "taken" : "not taken") << ". 
+
+      if (bucket_status.state == BucketSyncState::Init ||
+          bucket_status.state == BucketSyncState::Stopped ||
+          bucket_stopped) {
+        // if the state is Init or Stopped, query the remote RGW for its state
+        yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+        if (info.syncstopped) {
+          // remote indicates stopped state
+          tn->log(20, "remote bilog indicates that sync was stopped");
+          if (!bucket_lease_cr) {
+            bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+                                                           lock_name, lock_duration, this));
+            yield spawn(bucket_lease_cr.get(), false);
+            while (!bucket_lease_cr->is_locked()) {
+              if (bucket_lease_cr->is_done()) {
+                tn->log(5, "ERROR: failed to take bucket lease");
+                set_status("lease lock failed, early abort");
+                drain_all();
+                return set_cr_error(bucket_lease_cr->get_ret_status());
+              }
+              tn->log(5, "waiting on bucket lease");
+              yield set_sleeping(true);
+            }
+          }
+
+          // check if local state is "stopped"
+          yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
+                                status_obj, &bucket_status, false, &objv));
+          if (retcode < 0) {
+            tn->log(20, SSTR("ERROR: failed to read status before writing 'stopped'. error: " << retcode));
+            RELEASE_LOCK(bucket_lease_cr);
+            return set_cr_error(retcode);
+          }
+          if (bucket_status.state != BucketSyncState::Stopped) {
+            // make sure that the state is changed to stopped locally
+            bucket_status.state = BucketSyncState::Stopped;
+            yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
+                                   status_obj, bucket_status, &objv, false));
+            if (retcode < 0) {
+              tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode));
+              RELEASE_LOCK(bucket_lease_cr);
+              return set_cr_error(retcode);
+            }
+          }
+          yield {
+            const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+            call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, num_shards));
+          }
+          RELEASE_LOCK(bucket_lease_cr);
+          return set_cr_done();
+        }
+        bucket_stopped = false;
+      }

-      // if the state wasn't Incremental, take a bucket-wide lease to prevent
-      // different shards from duplicating the init and full sync
       if (bucket_status.state != BucketSyncState::Incremental) {
-        assert(!bucket_lease_cr);
-        bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+        // if the state wasn't Incremental, take a bucket-wide lease to prevent
+        // different shards from duplicating the init and full sync
+        if (!bucket_lease_cr) {
+          bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
                                                          lock_name, lock_duration, this));
-        yield spawn(bucket_lease_cr.get(), false);
-        while (!bucket_lease_cr->is_locked()) {
-          if (bucket_lease_cr->is_done()) {
-            tn->log(5, "failed to take bucket lease");
-            set_status("lease lock failed, early abort");
-            drain_all();
-            return set_cr_error(bucket_lease_cr->get_ret_status());
+          yield spawn(bucket_lease_cr.get(), false);
+          while (!bucket_lease_cr->is_locked()) {
+            if (bucket_lease_cr->is_done()) {
+              tn->log(5, "ERROR: failed to take bucket lease");
+              set_status("lease lock failed, early abort");
+              drain_all();
+              return set_cr_error(bucket_lease_cr->get_ret_status());
+            }
+            tn->log(5, "waiting on bucket lease");
+            yield set_sleeping(true);
           }
-          tn->log(5, "waiting on bucket lease");
-          yield set_sleeping(true);
         }
+
+        // reread the status after acquiring the lock
         yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
-                              status_obj, &bucket_status, false, &objv));
+                              status_obj, &bucket_status, false, &objv));
         if (retcode < 0) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
+          RELEASE_LOCK(bucket_lease_cr);
+          tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
           return set_cr_error(retcode);
         }
-      }

-      if (bucket_status.state == BucketSyncState::Init ||
-          bucket_status.state == BucketSyncState::Stopped) {
-        assert(bucket_lease_cr);
         // init sync status
         yield {
           init_check_compat = objv.read_version.ver <= 1; // newly-created
-          int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+          const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
           call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
                                               bucket_status, objv, num_shards,
-                                              init_check_compat));
+                                              init_check_compat, info));
         }
+
         if (retcode < 0) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
+          tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode));
+          RELEASE_LOCK(bucket_lease_cr);
           return set_cr_error(retcode);
         }
       }

+      assert(bucket_status.state == BucketSyncState::Incremental ||
+             bucket_status.state == BucketSyncState::Full);
+
       if (bucket_status.state == BucketSyncState::Full) {
         assert(bucket_lease_cr);
         yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
                                            bucket_lease_cr, bucket_status,
                                            tn, objv));
         if (retcode < 0) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
+          tn->log(20, SSTR("ERROR: full sync failed. error: " << retcode));
+          RELEASE_LOCK(bucket_lease_cr);
           return set_cr_error(retcode);
         }
       }

       if (bucket_status.state == BucketSyncState::Incremental) {
         // lease not required for incremental sync
-        if (bucket_lease_cr) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
-        }
+        RELEASE_LOCK(bucket_lease_cr);

         // if a specific gen was requested, compare that to the sync status
         if (gen) {
           const auto current_gen = bucket_status.incremental_gen;
           if (*gen > current_gen) {
             retcode = -EAGAIN;
-            tn->log(10, SSTR("requested sync of future generation "
+            tn->log(10, SSTR("ERROR: requested sync of future generation "
                              << *gen << " > " << current_gen
                              << ", returning " << retcode << " for later retry"));
             return set_cr_error(retcode);
           } else if (*gen < current_gen) {
-            tn->log(10, SSTR("requested sync of past generation "
+            tn->log(10, SSTR("WARNING: requested sync of past generation "
                              << *gen << " < " << current_gen
                              << ", returning success"));
             return set_cr_done();
           }
         }

-        if (size_t(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
+        assert(sync_pair.source_bs.shard_id >= 0);
+        if (static_cast<size_t>(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
           tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
           return set_cr_done(); // return success so we don't retry
         }
@@ -5301,14 +5415,15 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
         }

         yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
-                                            sync_pipe, bucket_status.state,
+                                            sync_pipe, bucket_stopped,
                                             bucket_status.incremental_gen, tn, progress));
         if (retcode < 0) {
+          tn->log(20, SSTR("ERROR: incremental sync failed. error: " << retcode));
error: " << retcode)); return set_cr_error(retcode); } } // loop back to previous states unless incremental sync returns normally - } while (bucket_status.state != BucketSyncState::Incremental); + } while (bucket_status.state != BucketSyncState::Incremental || bucket_stopped); return set_cr_done(); } @@ -5386,11 +5501,13 @@ int RGWBucketPipeSyncStatusManager::init_sync_status(const DoutPrefixProvider *d list stacks; // pass an empty objv tracker to each so that the version gets incremented std::list objvs; + std::list infos; for (auto& mgr : source_mgrs) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); objvs.emplace_back(); - stack->call(mgr->init_sync_status_cr(objvs.back())); + infos.emplace_back(); + stack->call(mgr->init_sync_status_cr(objvs.back(), infos.back())); stacks.push_back(stack); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index e1b5b2d9cb0a..5f45ecb2f3cb 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -686,7 +686,7 @@ public: const rgw_bucket& dest_bucket); RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); - RGWCoroutine *init_sync_status_cr(RGWObjVersionTracker& objv_tracker); + RGWCoroutine *init_sync_status_cr(RGWObjVersionTracker& objv_tracker, rgw_bucket_index_marker_info& info); RGWCoroutine *run_sync_cr(int num); int num_pipes() {