From 5384d60f1b6e238e9ef2f42ad2638ebf38a56822 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 18 May 2021 15:55:28 -0400 Subject: [PATCH] radosgw-admin: 'bucket sync checkpoint' waits for generation to catch up poll on rgw_read_bucket_full_sync_status() until full_status.incremental_gen catches up to the latest_gen we got from rgw_read_remote_bilog_info() Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_checkpoint.cc | 49 +++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/src/rgw/rgw_sync_checkpoint.cc b/src/rgw/rgw_sync_checkpoint.cc index e0a82fcd6a9a3..9f4b4ca71a53a 100644 --- a/src/rgw/rgw_sync_checkpoint.cc +++ b/src/rgw/rgw_sync_checkpoint.cc @@ -83,25 +83,22 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp, const RGWBucketInfo& bucket_info, const RGWBucketInfo& source_bucket_info, const rgw_sync_bucket_pipe& pipe, + uint64_t latest_gen, const BucketIndexShardsManager& remote_markers, ceph::timespan retry_delay, ceph::coarse_mono_time timeout_at) { const auto num_shards = source_bucket_info.layout.current_index.layout.normal.num_shards; - if (empty(remote_markers, num_shards)) { - ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl; - return 0; - } - rgw_bucket_sync_status full_status; int r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield); if (r < 0 && r != -ENOENT) { // retry on ENOENT return r; } + // wait for incremental while (full_status.state != BucketSyncState::Incremental) { - auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; + const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; if (delay_until > timeout_at) { lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach incremental sync" << dendl; return -ETIMEDOUT; @@ -115,6 +112,36 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp, } } + // wait for latest_gen + while (full_status.incremental_gen < latest_gen) { + const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; + if (delay_until > timeout_at) { + lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach " + "latest generation " << latest_gen << dendl; + return -ETIMEDOUT; + } + ldout(store->ctx(), 1) << "waiting to reach latest gen " << latest_gen + << ", on " << full_status.incremental_gen << ".." << dendl; + std::this_thread::sleep_until(delay_until); + + r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield); + if (r < 0 && r != -ENOENT) { // retry on ENOENT + return r; + } + } + + if (full_status.incremental_gen > latest_gen) { + ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n" + << " local gen: " << full_status.incremental_gen << '\n' + << " remote gen: " << latest_gen << dendl; + return 0; + } + + if (empty(remote_markers, num_shards)) { + ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl; + return 0; + } + std::vector status; status.resize(std::max(1, num_shards)); r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info, @@ -124,7 +151,7 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp, } while (status < remote_markers) { - auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; + const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay; if (delay_until > timeout_at) { ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl; return -ETIMEDOUT; @@ -177,6 +204,7 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, { struct sync_source_entry { rgw_sync_bucket_pipe pipe; + uint64_t latest_gen = 0; BucketIndexShardsManager remote_markers; RGWBucketInfo source_bucket_info; }; @@ -207,6 +235,7 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, << cpp_strerror(r) << dendl; throw std::system_error(-r, std::system_category()); } + entry.latest_gen = info.latest_gen; }); // fetch source bucket info spawn::spawn(ioctx, [&] (yield_context yield) { @@ -230,9 +259,9 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp, } // checkpoint each source sequentially - for (const auto& [pipe, remote_markers, source_bucket_info] : sources) { - int r = bucket_source_sync_checkpoint(dpp, store, info, source_bucket_info, - pipe, remote_markers, + for (const auto& e : sources) { + int r = bucket_source_sync_checkpoint(dpp, store, info, e.source_bucket_info, + e.pipe, e.latest_gen, e.remote_markers, retry_delay, timeout_at); if (r < 0) { ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl; -- 2.39.5