return rgw_bucket_parse_bucket_key(sync_env->cct, key,
&bs.bucket, &bs.shard_id);
}
+ RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
+ const std::string& key,
+ const std::string& marker,
+ ceph::real_time timestamp, bool retry) {
+ return new RGWDataSyncSingleEntryCR(sc, src, key, marker, timestamp,
+ &*marker_tracker, error_repo, retry, tn);
+ }
public:
RGWDataSyncShardCR(RGWDataSyncCtx *_sc,
rgw_pool& _pool,
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, iter->first, iter->first, entry_timestamp, &*marker_tracker, error_repo, false, tn), false);
+ spawn(sync_single_entry(source_bs, iter->first, iter->first,
+ entry_timestamp, false), false);
}
sync_marker.marker = iter->first;
continue;
}
tn->log(20, SSTR("received async update notification: " << *modified_iter));
- spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, *modified_iter, string(), ceph::real_time{}, &*marker_tracker, nullptr, false, tn), false);
+ spawn(sync_single_entry(source_bs, *modified_iter, string(),
+ ceph::real_time{}, false), false);
}
if (error_retry_time <= ceph::coarse_real_clock::now()) {
continue;
}
tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
- spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, error_marker, error_marker,
- entry_timestamp, nullptr /* no marker tracker */,
- error_repo, true, tn), false);
+ spawn(sync_single_entry(source_bs, error_marker, "",
+ entry_timestamp, true), false);
}
if (!omapvals->more) {
if (error_marker.empty() && error_entries.empty()) {
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
- spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, log_iter->entry.key, log_iter->log_id,
- log_iter->log_timestamp, &*marker_tracker,
- error_repo, false, tn), false);
+ spawn(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
+ log_iter->log_timestamp, false), false);
}
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";