RGWSyncTraceNodeRef tn;
ceph::real_time* progress;
- std::vector<ceph::real_time> shard_progress;
- std::vector<ceph::real_time>::iterator cur_shard_progress;
+ std::map<uint64_t, ceph::real_time> shard_progress;
+
+ ceph::real_time *cur_progress{nullptr};
+ std::optional<ceph::real_time> min_progress;
RGWRESTConn *conn{nullptr};
rgw_zone_id last_zone;
ceph::real_time* progress);
int operate() override;
+
+ void handle_complete_stack(uint64_t stack_id) {
+ auto iter = shard_progress.find(stack_id);
+ if (iter == shard_progress.end()) {
+ lderr(cct) << "ERROR: RGWRunBucketSourcesSyncCR::handle_complete_stack(): stack_id=" << stack_id << " not found! Likely a bug" << dendl;
+ return;
+ }
+ if (progress) {
+ if (!min_progress) {
+ min_progress = iter->second;
+ } else {
+ if (iter->second < *min_progress) {
+ min_progress = iter->second;
+ }
+ }
+ }
+
+ shard_progress.erase(stack_id);
+ }
};
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
- shard_progress.resize(num_shards);
- cur_shard_progress = shard_progress.begin();
- for (; num_shards > 0; --num_shards, ++cur_shard, ++cur_shard_progress) {
+ for (; num_shards > 0; --num_shards, ++cur_shard) {
/*
* use a negatvie shard_id for backward compatibility,
* this affects the crafted status oid
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
+ cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
+
yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
- &*cur_shard_progress),
+ cur_progress),
BUCKET_SYNC_SPAWN_WINDOW,
[&](uint64_t stack_id, int ret) {
+ handle_complete_stack(stack_id);
if (ret < 0) {
tn->log(10, "a sync operation returned error");
}
}
}
drain_all_cb([&](uint64_t stack_id, int ret) {
+ handle_complete_stack(stack_id);
if (ret < 0) {
tn->log(10, "a sync operation returned error");
}
return ret;
});
- if (progress) {
- *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
+ if (progress && min_progress) {
+ *progress = *min_progress;
}
return set_cr_done();
}