From 43d4b188a279a487d7ee03151abe6a1228acc3c4 Mon Sep 17 00:00:00 2001
From: Shilpa Jagannath
Date: Wed, 6 Jul 2022 15:00:52 -0400
Subject: [PATCH] rgw/multisite: handle marker_tracker updates. handle
 retcodes correctly.

Signed-off-by: Shilpa Jagannath
---
 src/rgw/rgw_data_sync.cc | 60 +++++++++++++++++++---------------------
 1 file changed, 28 insertions(+), 32 deletions(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index f191f45d231f8..38b1f5055dbb7 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1413,7 +1413,7 @@ public:
   }
 };
 
-class RGWFullSyncErrorRepoCR: public RGWCoroutine {
+class RGWDataIncrementalSyncFullObligationCR: public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket_shard source_bs;
@@ -1422,20 +1422,19 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
   rgw_bucket_index_marker_info remote_info;
   rgw_pool pool;
-  RGWDataChangesLog *datalog_changes{nullptr};
   rgw_raw_obj error_repo;
   uint32_t sid;
   rgw_bucket_shard bs;
   std::vector<store_gen_shards>::const_iterator each;
 
   rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) {
-    int datalog_shard = datalog_changes->choose_oid(bs);
+    int datalog_shard = sync_env->store->svc()->datalog_rados->choose_oid(bs);
     string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
     return rgw_raw_obj(pool, oid + ".retry");
   }
 
 public:
-  RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker,
+  RGWDataIncrementalSyncFullObligationCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker,
                          ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs),
       error_marker(_error_marker), timestamp(_timestamp), tn(_tn) {
@@ -1461,9 +1460,9 @@ public:
                            timestamp), cct->_conf->rgw_data_sync_spawn_window,
                            [&](uint64_t stack_id, int ret) {
                              if (ret < 0) {
-                               tn->log(10, SSTR("writing to error repo returned error: " << ret));
+                               retcode = ret;
                              }
-                             return ret;
+                             return 0;
                            });
         }
       }
@@ -1474,22 +1473,19 @@ public:
                      return ret;
                    });
 
-    spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
-                                     error_marker, timestamp), false);
-
      return set_cr_done();
     }
     return 0;
   }
 };
 
-RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
+RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                                 std::optional<uint64_t> gen,
                                 const std::string marker,
                                 ceph::real_time timestamp,
                                 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                 boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache,
-                                std::optional<RGWDataSyncShardMarkerTrack> marker_tracker,
+                                RGWDataSyncShardMarkerTrack* marker_tracker,
                                 rgw_raw_obj& error_repo,
                                 RGWSyncTraceNodeRef& tn,
                                 bool retry) {
@@ -1509,6 +1505,7 @@ class RGWDataFullSyncSingleEntryCR : public RGWCoroutine {
   ceph::real_time timestamp;
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
+  RGWDataSyncShardMarkerTrack* marker_tracker;
   RGWSyncTraceNodeRef tn;
   rgw_bucket_index_marker_info remote_info;
   uint32_t sid;
@@ -1516,14 +1513,15 @@ class RGWDataFullSyncSingleEntryCR : public RGWCoroutine {
   uint64_t i{0};
 
 public:
-  RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
-                               const std::string& _key, rgw_raw_obj& _error_repo,
-                               ceph::real_time& _timestamp, boost::intrusive_ptr<const RGWContinuousLeaseCR> _lease_cr,
+  RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& _source_bs,
+                               const std::string& _key, const rgw_raw_obj& _error_repo,
+                               ceph::real_time _timestamp, boost::intrusive_ptr<const RGWContinuousLeaseCR> _lease_cr,
                                boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
+                               RGWDataSyncShardMarkerTrack* _marker_tracker,
                                RGWSyncTraceNodeRef& _tn)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key),
       error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr),
-      bucket_shard_cache(_bucket_shard_cache), tn(_tn) {}
+      bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {}
 
   int operate(const DoutPrefixProvider *dpp) override
   {
@@ -1545,8 +1543,8 @@ public:
 
       //if any of the operations fail at any time, write them into error repo for later retry.
       source_bs.shard_id = 0;
-      yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
-                 lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false));
+      yield call(data_sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
+                 lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false));
       if (retcode < 0) {
         tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen " <<
                          remote_info.oldest_gen << ". Writing to error repo for retry"));
@@ -1595,8 +1593,8 @@ public:
       } else {
         source_bs.shard_id = sid;
         tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
-        yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp,
-                           lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false),
+        yield_spawn_window(data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
+                           lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false),
                            cct->_conf->rgw_data_sync_spawn_window,
                            [&](uint64_t stack_id, int ret) {
                              if (ret < 0) {
@@ -1615,6 +1613,8 @@ public:
                        return ret;
                      });
 
+      yield call(marker_tracker->finish(key));
+
       return set_cr_done();
     }
     return 0;
@@ -1808,9 +1808,8 @@ public:
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
             yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
-                               lease_cr, bucket_shard_cache, tn), cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
-
-            yield call(marker_tracker->finish(iter->first));
+                               lease_cr, bucket_shard_cache, &*marker_tracker, tn),
+                               cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
           }
           sync_marker.marker = iter->first;
         }
@@ -1890,8 +1889,8 @@ public:
           continue;
         }
         tn->log(20, SSTR("received async update notification: " << modified_iter->key));
-        spawn(sync_single_entry(sc, source_bs, modified_iter->gen, string(),
-              ceph::real_time{}, lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false), false);
+        spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, string(),
+              ceph::real_time{}, lease_cr, bucket_shard_cache, &*marker_tracker, error_repo, tn, false), false);
       }
 
       if (error_retry_time <= ceph::coarse_real_clock::now()) {
@@ -1918,14 +1917,11 @@ public:
         }
         if (!gen) {
           // write all full sync obligations for the bucket to error repo
-          spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
-          if (retcode < 0) {
-            tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode));
-          }
+          spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
         } else {
           tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
-          spawn(sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr,
-                bucket_shard_cache, marker_tracker, error_repo, tn, true), false);
+          spawn(data_sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr,
+                bucket_shard_cache, &*marker_tracker, error_repo, tn, true), false);
         }
       }
       if (!omapvals->more) {
@@ -1960,9 +1956,9 @@ public:
         if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
           tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
         } else {
-          yield_spawn_window(sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
+          yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
                              log_iter->log_timestamp, lease_cr,bucket_shard_cache,
-                             marker_tracker, error_repo, tn, false),
+                             &*marker_tracker, error_repo, tn, false),
                              cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
         }
       }
-- 
2.39.5