From de54052db49e7bab7fdfa1a9361843565faddfae Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Tue, 28 Jun 2022 12:59:12 -0400 Subject: [PATCH] rgw/multisite: move sync_single_entry() cr function out of RGWDataSyncShardCR Signed-off-by: Shilpa Jagannath --- src/rgw/rgw_data_sync.cc | 45 +++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 392cd2ed66797..7d0a12b7d947f 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1482,6 +1482,23 @@ public: } }; +RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src, + std::optional gen, + const std::string& marker, + ceph::real_time timestamp, + boost::intrusive_ptr lease_cr, + boost::intrusive_ptr bucket_shard_cache, + std::optional marker_tracker, + rgw_raw_obj& error_repo, + RGWSyncTraceNodeRef& tn, + bool retry) { + auto state = bucket_shard_cache->get(src, gen); + auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry}; + return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation), + &*marker_tracker, error_repo, + lease_cr.get(), tn); +} + #define DATA_SYNC_MAX_ERR_ENTRIES 10 class RGWDataSyncShardCR : public RGWCoroutine { @@ -1552,16 +1569,6 @@ class RGWDataSyncShardCR : public RGWCoroutine { &bs.bucket, &bs.shard_id); } - RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src, - std::optional gen, - const std::string& marker, - ceph::real_time timestamp, bool retry) { - auto state = bucket_shard_cache->get(src, gen); - auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry}; - return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation), - &*marker_tracker, error_repo, - lease_cr.get(), tn); - } public: RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool, uint32_t _shard_id, rgw_data_sync_marker& _marker, @@ -1699,7 +1706,8 @@ public: //if any of the operations fail at any time, write them into error repo for later retry. source_bs.shard_id = 0; - yield call(sync_single_entry(source_bs, remote_info.oldest_gen, iter->first, entry_timestamp, false)); + yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, iter->first, entry_timestamp, + lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false)); if (retcode < 0) { tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen " << remote_info.oldest_gen << ". Writing to error repo for retry")); @@ -1717,7 +1725,8 @@ public: for (sid = 0; sid < each->num_shards; sid++) { source_bs.shard_id = sid; tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen)); - yield_spawn_window(sync_single_entry(source_bs, each->gen, iter->first, entry_timestamp, false), + yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, iter->first, entry_timestamp, + lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false), cct->_conf->rgw_data_sync_spawn_window, [&](uint64_t stack_id, int ret) { if (ret < 0) { @@ -1834,8 +1843,8 @@ public: continue; } tn->log(20, SSTR("received async update notification: " << modified_iter->key)); - spawn(sync_single_entry(source_bs, modified_iter->gen, string(), - ceph::real_time{}, false), false); + spawn(sync_single_entry(sc, source_bs, modified_iter->gen, string(), + ceph::real_time{}, lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false), false); } if (error_retry_time <= ceph::coarse_real_clock::now()) { @@ -1871,7 +1880,8 @@ public: } } else { tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp)); - spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false); + spawn(sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr, + bucket_shard_cache, marker_tracker, error_repo, tn, true), false); } } if (!omapvals->more) { @@ -1906,8 +1916,9 @@ public: if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { - yield_spawn_window(sync_single_entry(source_bs, log_iter->entry.gen, log_iter->log_id, - log_iter->log_timestamp, false), + yield_spawn_window(sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id, + log_iter->log_timestamp, lease_cr,bucket_shard_cache, + marker_tracker, error_repo, tn, false), cct->_conf->rgw_data_sync_spawn_window, std::nullopt); } } -- 2.39.5