RGWSyncTraceNodeRef tn;
ceph::real_time* progress;
- std::map<uint64_t, ceph::real_time> shard_progress;
-
- ceph::real_time *cur_progress{nullptr};
- std::optional<ceph::real_time> min_progress;
+ std::vector<ceph::real_time> shard_progress;
+ std::vector<ceph::real_time>::iterator cur_shard_progress;
RGWRESTConn *conn{nullptr};
rgw_zone_id last_zone;
ceph::real_time* progress);
int operate(const DoutPrefixProvider *dpp) override;
-
- void handle_complete_stack(uint64_t stack_id) {
- auto iter = shard_progress.find(stack_id);
- if (iter == shard_progress.end()) {
- lderr(cct) << "ERROR: RGWRunBucketSourcesSyncCR::handle_complete_stack(): stack_id=" << stack_id << " not found! Likely a bug" << dendl;
- return;
- }
- if (progress) {
- if (!min_progress) {
- min_progress = iter->second;
- } else {
- if (iter->second < *min_progress) {
- min_progress = iter->second;
- }
- }
- }
-
- shard_progress.erase(stack_id);
- }
};
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
- cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
+ shard_progress.resize(1);
+ cur_shard_progress = shard_progress.begin();
yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
- gen, tn, cur_progress),
+ gen, tn, &*cur_shard_progress),
BUCKET_SYNC_SPAWN_WINDOW,
[&](uint64_t stack_id, int ret) {
- handle_complete_stack(stack_id);
if (ret < 0) {
tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
}
});
}
drain_all_cb([&](uint64_t stack_id, int ret) {
- handle_complete_stack(stack_id);
if (ret < 0) {
tn->log(10, SSTR("a sync operation returned error: " << ret));
}
return ret;
});
- if (progress && min_progress) {
- *progress = *min_progress;
+ if (progress) {
+ *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
}
return set_cr_done();
}