From c0baf3eb34c6c1de7e4de2e35cb62e219c174b0b Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 25 Jun 2020 14:04:52 -0700 Subject: [PATCH] rgw: bucket sync: track progress by stack id Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 082916d97ac8..300a5d862dd5 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1217,8 +1217,10 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; ceph::real_time* progress; - std::vector shard_progress; - std::vector::iterator cur_shard_progress; + std::map shard_progress; + + ceph::real_time *cur_progress{nullptr}; + std::optional min_progress; RGWRESTConn *conn{nullptr}; rgw_zone_id last_zone; @@ -1241,6 +1243,25 @@ public: ceph::real_time* progress); int operate() override; + + void handle_complete_stack(uint64_t stack_id) { + auto iter = shard_progress.find(stack_id); + if (iter == shard_progress.end()) { + lderr(cct) << "ERROR: RGWRunBucketSourcesSyncCR::handle_complete_stack(): stack_id=" << stack_id << " not found! Likely a bug" << dendl; + return; + } + if (progress) { + if (!min_progress) { + min_progress = iter->second; + } else { + if (iter->second < *min_progress) { + min_progress = iter->second; + } + } + } + + shard_progress.erase(stack_id); + } }; class RGWDataSyncSingleEntryCR : public RGWCoroutine { @@ -4332,9 +4353,7 @@ int RGWRunBucketSourcesSyncCR::operate() ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl; - shard_progress.resize(num_shards); - cur_shard_progress = shard_progress.begin(); - for (; num_shards > 0; --num_shards, ++cur_shard, ++cur_shard_progress) { + for (; num_shards > 0; --num_shards, ++cur_shard) { /* * use a negatvie shard_id for backward compatibility, * this affects the crafted status oid @@ -4348,10 +4367,13 @@ int RGWRunBucketSourcesSyncCR::operate() ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl; + cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr); + yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn, - &*cur_shard_progress), + cur_progress), BUCKET_SYNC_SPAWN_WINDOW, [&](uint64_t stack_id, int ret) { + handle_complete_stack(stack_id); if (ret < 0) { tn->log(10, "a sync operation returned error"); } @@ -4360,13 +4382,14 @@ int RGWRunBucketSourcesSyncCR::operate() } } drain_all_cb([&](uint64_t stack_id, int ret) { + handle_complete_stack(stack_id); if (ret < 0) { tn->log(10, "a sync operation returned error"); } return ret; }); - if (progress) { - *progress = *std::min_element(shard_progress.begin(), shard_progress.end()); + if (progress && min_progress) { + *progress = *min_progress; } return set_cr_done(); } -- 2.47.3