From cd81b9035211b1cc59dd0cdb1f6253849e25dba3 Mon Sep 17 00:00:00 2001
From: "Adam C. Emerson" <aemerson@redhat.com>
Date: Thu, 4 Aug 2022 16:52:28 -0400
Subject: [PATCH] rgw: Break out RGWDataFullSyncShardCR

This function was formerly RGWDataSyncShardCR::full_sync. The only
functional difference is that we leave acquiring the lease to the
top-level RGWDataSyncShardCR coroutine class, since the lease should
be held on the transition from full to incremental sync.

Fixes: https://tracker.ceph.com/issues/57063
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
---
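Notes (reviewer commentary; not applied by git am):

The intent is that the top-level RGWDataSyncShardCR acquires the lease
itself and then drives this child CR, so the lease stays locked across
the full -> incremental transition. The dispatch is expected to look
roughly like the fragment below. This is an illustrative sketch under
that assumption, not code taken from this patch: it reuses the
constructor signature introduced here, but the surrounding control
flow in the parent's operate() is hypothetical.

    // Hypothetical fragment inside the parent's reenter() block, after
    // the parent has successfully acquired lease_cr on its own.
    if (sync_marker.state == rgw_data_sync_marker::FullSync) {
      // Run full sync as a child coroutine. On success the child does
      // not call lease_cr->go_down(), so the lease remains locked here.
      yield call(new RGWDataFullSyncShardCR(sc, pool, shard_id,
                                            sync_marker, tn, status_oid,
                                            error_repo, lease_cr,
                                            sync_status,
                                            bucket_shard_cache));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      // Fall through with lease_cr->is_locked() still true, so
      // incremental sync starts without re-acquiring the shard lock.
    }

Also note that the child throttles per-entry work: each omap entry is
handed to an RGWDataFullSyncSingleEntryCR via yield_spawn_window(),
which caps the number of in-flight entry coroutines at
rgw_data_sync_spawn_window before yielding, rather than spawning one
child per entry unbounded.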
 src/rgw/rgw_data_sync.cc | 112 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 111 insertions(+), 1 deletion(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 2c935b817df4f..18c9fa04f951a 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1654,8 +1654,119 @@ protected:
       sync_status(sync_status), bucket_shard_cache(bucket_shard_cache) {}
 };
 
+class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
+  static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
+
+  string oid;
+  uint64_t total_entries = 0;
+  ceph::real_time entry_timestamp;
+  std::map<string, bufferlist> entries;
+  std::map<string, bufferlist>::iterator iter;
+  string error_marker;
+
+public:
+
+  RGWDataFullSyncShardCR(
+    RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
+    rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
+    const string& status_oid, const rgw_raw_obj& error_repo,
+    const boost::intrusive_ptr<const RGWContinuousLeaseCR>& lease_cr,
+    const rgw_data_sync_status& sync_status,
+    const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
+    : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
+                             status_oid, error_repo, lease_cr, sync_status,
+                             bucket_shard_cache) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      tn->log(10, "start full sync");
+      oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+      total_entries = sync_marker.pos;
+      entry_timestamp = sync_marker.timestamp; // time when full sync started
+      do {
+        if (!lease_cr->is_locked()) {
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
+        omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+        yield call(new RGWRadosGetOmapValsCR(sc->env->store,
+                                             rgw_raw_obj(pool, oid),
+                                             sync_marker.marker,
+                                             OMAP_GET_MAX_ENTRIES, omapvals));
+        if (retcode < 0) {
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
+        entries = std::move(omapvals->entries);
+        if (entries.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
+        iter = entries.begin();
+        for (; iter != entries.end(); ++iter) {
+          retcode = parse_bucket_key(iter->first, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
+            marker_tracker->try_update_high_marker(iter->first, 0,
+                                                   entry_timestamp);
+            continue;
+          }
+          tn->log(20, SSTR("full sync: " << iter->first));
+          total_entries++;
+          if (!marker_tracker->start(iter->first, total_entries,
+                                     entry_timestamp)) {
+            tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first
+                            << ". Duplicate entry?"));
+          } else {
+            tn->log(10, SSTR("timestamp for " << iter->first << " is :" << entry_timestamp));
+            yield_spawn_window(new RGWDataFullSyncSingleEntryCR(
+                                 sc, pool, source_bs, iter->first, sync_status,
+                                 error_repo, entry_timestamp, lease_cr,
+                                 bucket_shard_cache, &*marker_tracker, tn),
+                               cct->_conf->rgw_data_sync_spawn_window,
+                               std::nullopt);
+          }
+          sync_marker.marker = iter->first;
+        }
+      } while (omapvals->more);
+      omapvals.reset();
+
+      drain_all();
+
+      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+
+      yield {
+        /* update marker to reflect we're done with full sync */
+        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+        sync_marker.marker = sync_marker.next_step_marker;
+        sync_marker.next_step_marker.clear();
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
+               sc->env->dpp, sc->env->async_rados, sc->env->svc->sysobj,
+               rgw_raw_obj(pool, status_oid), sync_marker));
+      }
+      if (retcode < 0) {
+        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
+        lease_cr->go_down();
+        drain_all();
+        return set_cr_error(retcode);
+      }
+      // clean up full sync index
+      yield {
+        const auto& pool = sc->env->svc->zone->get_zone_params().log_pool;
+        auto oid = full_data_sync_index_shard_oid(sc->source_zone.id, shard_id);
+        call(new RGWRadosRemoveCR(sc->env->store, {pool, oid}));
+      }
+      // keep lease and transition to incremental_sync()
+    }
+    return 0;
+  }
+};
 
 class RGWDataSyncShardCR : public RGWCoroutine {
+  static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
 
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
@@ -1789,7 +1900,6 @@ public:
   }
 
   int full_sync() {
-#define OMAP_GET_MAX_ENTRIES 100
     int max_entries = OMAP_GET_MAX_ENTRIES;
     reenter(&full_cr) {
       tn->log(10, "start full sync");
-- 
2.39.5