From 532abb588d399e850af654097de9add057736a09 Mon Sep 17 00:00:00 2001
From: "Adam C. Emerson"
Date: Mon, 8 Aug 2022 15:03:03 -0400
Subject: [PATCH] rgw: Clean up RGWDataSyncShardCR

Remove no-longer-used functions and data members they depended on.

Fixes: https://tracker.ceph.com/issues/57063
Signed-off-by: Adam C. Emerson
---
 src/rgw/rgw_data_sync.cc | 333 +++------------------------------------
 1 file changed, 24 insertions(+), 309 deletions(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 32829f0387656..16c8726b11f71 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1617,8 +1617,6 @@ public:
   }
 };
 
-static constexpr auto DATA_SYNC_MAX_ERR_ENTRIES = 10;
-
 class RGWDataBaseSyncShardCR : public RGWCoroutine {
 protected:
   RGWDataSyncCtx *const sc;
@@ -1766,7 +1764,7 @@ public:
 };
 
 class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
-  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
+  static constexpr int max_error_entries = 10;
   static constexpr uint32_t retry_backoff_secs = 60;
 
   ceph::mutex& inc_lock;
@@ -1976,83 +1974,43 @@ public:
 };
 
 class RGWDataSyncShardCR : public RGWCoroutine {
-  static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
-  RGWDataSyncCtx *sc;
-  RGWDataSyncEnv *sync_env;
-
-  rgw_pool pool;
-
-  uint32_t shard_id;
+  RGWDataSyncCtx *const sc;
+  const rgw_pool pool;
+  const uint32_t shard_id;
   rgw_data_sync_marker& sync_marker;
   rgw_data_sync_status sync_status;
-
-  RGWRadosGetOmapValsCR::ResultPtr omapvals;
-  std::map<string, bufferlist> entries;
-  std::map<string, bufferlist>::iterator iter;
-
-  string oid;
-
-  std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
-
-  std::string next_marker;
-  vector<rgw_data_change_log_entry> log_entries;
-  vector<rgw_data_change_log_entry>::iterator log_iter;
-  bool truncated = false;
+  const RGWSyncTraceNodeRef tn;
+  bool *reset_backoff; // TODO We do nothing with this pointer.
 
   ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
   ceph::condition_variable inc_cond;
 
-  boost::asio::coroutine incremental_cr;
-  boost::asio::coroutine full_cr;
-
+  RGWDataSyncEnv *const sync_env{ sc->env };
-  bc::flat_set<rgw_data_notify_entry> modified_shards;
-  bc::flat_set<rgw_data_notify_entry> current_modified;
+  const string status_oid{ RGWDataSyncStatusManager::shard_obj_name(
+      sc->source_zone, shard_id) };
+  const rgw_raw_obj error_repo{ pool, status_oid + ".retry" };
-  bc::flat_set<rgw_data_notify_entry>::iterator modified_iter;
-
-  uint64_t total_entries = 0;
-  bool *reset_backoff = nullptr;
+  // target number of entries to cache before recycling idle ones
+  static constexpr size_t target_cache_size = 256;
+  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache {
+    rgw::bucket_sync::Cache::create(target_cache_size) };
 
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
 
-  string status_oid;
-
-  rgw_raw_obj error_repo;
-  std::map<string, bufferlist> error_entries;
-  string error_marker;
-  ceph::real_time entry_timestamp;
-  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
-
-  ceph::coarse_real_time error_retry_time;
-  static constexpr uint32_t retry_backoff_secs = 60;
-
-  RGWSyncTraceNodeRef tn;
-
-  rgw_bucket_shard source_bs;
-  std::optional<uint64_t> gen;
-  // target number of entries to cache before recycling idle ones
-  static constexpr size_t target_cache_size = 256;
-  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
-
-  int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
-    return rgw_bucket_parse_bucket_key(sync_env->cct, key,
-                                       &bs.bucket, &bs.shard_id);
-  }
+  bc::flat_set<rgw_data_notify_entry> modified_shards;
 
 public:
-  RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
-                     uint32_t _shard_id, rgw_data_sync_marker& _marker,
-                     const rgw_data_sync_status& _sync_status,
-                     RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      pool(_pool), shard_id(_shard_id), sync_marker(_marker), sync_status(_sync_status),
-      status_oid(RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
-      error_repo(pool, status_oid + ".retry"), tn(_tn),
-      bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
-  {
-    set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
+  RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
+                     const uint32_t shard_id, rgw_data_sync_marker& marker,
+                     const rgw_data_sync_status& sync_status,
+                     RGWSyncTraceNodeRef& tn, bool *reset_backoff)
+    : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
+      sync_marker(marker), sync_status(sync_status), tn(tn),
+      reset_backoff(reset_backoff) {
+    set_description() << "data sync shard source_zone=" << sc->source_zone
+                      << " shard_id=" << shard_id;
   }
 
   ~RGWDataSyncShardCR() override {
@@ -2146,249 +2104,6 @@ public:
                                             lock_name, lock_duration, this));
     lease_stack.reset(spawn(lease_cr.get(), false));
   }
-
-  int full_sync() {
-    int max_entries = OMAP_GET_MAX_ENTRIES;
-    reenter(&full_cr) {
-      tn->log(10, "start full sync");
-      yield init_lease_cr();
-      while (!lease_cr->is_locked()) {
-        if (lease_cr->is_done()) {
-          tn->log(5, "failed to take lease");
-          set_status("lease lock failed, early abort");
-          drain_all();
-          return set_cr_error(lease_cr->get_ret_status());
-        }
-        set_sleeping(true);
-        yield;
-      }
-      tn->log(10, "took lease");
-      oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
-      total_entries = sync_marker.pos;
-      entry_timestamp = sync_marker.timestamp; // time when full sync started
-      do {
-        if (!lease_cr->is_locked()) {
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(-ECANCELED);
-        }
-        omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
-        yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             sync_marker.marker, max_entries, omapvals));
-        if (retcode < 0) {
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(retcode);
-        }
-        entries = std::move(omapvals->entries);
-        if (entries.size() > 0) {
-          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
-        }
-        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
-        iter = entries.begin();
-        for (; iter != entries.end(); ++iter) {
-          retcode = parse_bucket_key(iter->first, source_bs);
-          if (retcode < 0) {
-            tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
-            marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp);
-            continue;
-          }
-          tn->log(20, SSTR("full sync: " << iter->first));
-          total_entries++;
-          if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
-            tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
-          } else {
-            tn->log(10, SSTR("timestamp for " << iter->first << " is :" << entry_timestamp));
-            yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, pool, source_bs, iter->first, sync_status,
-                                                                error_repo, entry_timestamp, lease_cr, bucket_shard_cache, &*marker_tracker, tn),
-                               cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
-          }
-          sync_marker.marker = iter->first;
-        }
-      } while (omapvals->more);
-      omapvals.reset();
-
-      drain_all_but_stack(lease_stack.get());
-
-      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-
-      yield {
-        /* update marker to reflect we're done with full sync */
-        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
-        sync_marker.marker = sync_marker.next_step_marker;
-        sync_marker.next_step_marker.clear();
-        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                                                             rgw_raw_obj(pool, status_oid),
-                                                             sync_marker));
-      }
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
-      }
-      // clean up full sync index
-      yield {
-        const auto& pool = sync_env->svc->zone->get_zone_params().log_pool;
-        auto oid = full_data_sync_index_shard_oid(sc->source_zone.id, shard_id);
-        call(new RGWRadosRemoveCR(sync_env->store, {pool, oid}));
-      }
-      // keep lease and transition to incremental_sync()
-    }
-    return 0;
-  }
-
-  int incremental_sync() {
-    reenter(&incremental_cr) {
-      tn->log(10, "start incremental sync");
-      if (lease_cr) {
-        tn->log(10, "lease already held from full sync");
-      } else {
-        yield init_lease_cr();
-        while (!lease_cr->is_locked()) {
-          if (lease_cr->is_done()) {
-            tn->log(5, "failed to take lease");
-            set_status("lease lock failed, early abort");
-            drain_all();
-            return set_cr_error(lease_cr->get_ret_status());
-          }
-          set_sleeping(true);
-          yield;
-        }
-        set_status("lease acquired");
-        tn->log(10, "took lease");
-      }
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
-      do {
-        if (!lease_cr->is_locked()) {
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(-ECANCELED);
-        }
-        current_modified.clear();
-        inc_lock.lock();
-        current_modified.swap(modified_shards);
-        inc_lock.unlock();
-
-        if (current_modified.size() > 0) {
-          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
-        }
-        /* process out of band updates */
-        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
-          retcode = parse_bucket_key(modified_iter->key, source_bs);
-          if (retcode < 0) {
-            tn->log(1, SSTR("failed to parse bucket shard: " << modified_iter->key));
-            continue;
-          }
-          tn->log(20, SSTR("received async update notification: " << modified_iter->key));
-          spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, string(),
-                                       ceph::real_time{}, lease_cr, bucket_shard_cache, &*marker_tracker, error_repo, tn, false), false);
-        }
-
-        if (error_retry_time <= ceph::coarse_real_clock::now()) {
-          /* process bucket shards that previously failed */
-          omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
-          yield call(new RGWRadosGetOmapValsCR(sync_env->store, error_repo,
-                                               error_marker, max_error_entries, omapvals));
-          error_entries = std::move(omapvals->entries);
-          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
-          iter = error_entries.begin();
-          for (; iter != error_entries.end(); ++iter) {
-            error_marker = iter->first;
-            entry_timestamp = rgw::error_repo::decode_value(iter->second);
-            retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
-            if (retcode == -EINVAL) {
-              // backward compatibility for string keys that don't encode a gen
-              retcode = parse_bucket_key(error_marker, source_bs);
-            }
-            if (retcode < 0) {
-              tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
-              spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
-                                               error_marker, entry_timestamp), false);
-              continue;
-            }
-            tn->log(10, SSTR("gen is " << gen));
-            if (!gen) {
-              // write all full sync obligations for the bucket to error repo
-              spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
-            } else {
-              tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
-              spawn(data_sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr,
-                                           bucket_shard_cache, &*marker_tracker, error_repo, tn, true), false);
-            }
-          }
-          if (!omapvals->more) {
-            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
-            error_marker.clear();
-          }
-        }
-        omapvals.reset();
-
-        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
-        yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,
-                                                   &next_marker, &log_entries, &truncated));
-        if (retcode < 0 && retcode != -ENOENT) {
-          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(retcode);
-        }
-
-        if (log_entries.size() > 0) {
-          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
-        }
-
-        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
-          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
-          retcode = parse_bucket_key(log_iter->entry.key, source_bs);
-          if (retcode < 0) {
-            tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key));
-            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
-            continue;
-          }
-          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
-            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
-          } else {
-            tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen));
-            yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
-                                                      log_iter->log_timestamp, lease_cr,bucket_shard_cache,
-                                                      &*marker_tracker, error_repo, tn, false),
-                               cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
-          }
-        }
-
-        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
-                         << " next_marker=" << next_marker << " truncated=" << truncated));
-        if (!next_marker.empty()) {
-          sync_marker.marker = next_marker;
-        } else if (!log_entries.empty()) {
-          sync_marker.marker = log_entries.back().log_id;
-        }
-        if (!truncated) {
-          // we reached the end, wait a while before checking for more
-          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-          yield wait(get_idle_interval());
-        }
-      } while (true);
-    }
-    return 0;
-  }
-
-  utime_t get_idle_interval() const {
-    ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
-    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
-      auto now = ceph::coarse_real_clock::now();
-      if (error_retry_time > now) {
-        auto d = error_retry_time - now;
-        if (interval > d) {
-          interval = d;
-        }
-      }
-    }
-    // convert timespan -> time_point -> utime_t
-    return utime_t(ceph::coarse_real_clock::zero() + interval);
-  }
 };
 
 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
-- 
2.39.5