std::string incremental_marker(const rgw_bucket_shard_sync_info& info)
{
-  if (info.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
-    return "";
-  }
  return BucketIndexShardsManager::get_shard_marker(info.inc_marker.position);
}
return 0;
}
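+  // read the bucket-wide full sync status; ENOENT is not fatal and falls through to the polling loop below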
+  rgw_bucket_sync_status full_status;
+  int r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
+  if (r < 0 && r != -ENOENT) { // retry on ENOENT
+    return r;
+  }
+
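+  // poll until the bucket reaches incremental sync, giving up once timeout_at is exceeded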
+  while (full_status.state != BucketSyncState::Incremental) {
+    auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
+    if (delay_until > timeout_at) {
+      lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach incremental sync" << dendl;
+      return -ETIMEDOUT;
+    }
+    ldout(store->ctx(), 1) << "waiting to reach incremental sync.." << dendl;
+    std::this_thread::sleep_until(delay_until);
+
+    r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
+    if (r < 0 && r != -ENOENT) { // retry on ENOENT
+      return r;
+    }
+  }
+
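+  // once incremental, read the per-shard incremental sync status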
  std::vector<rgw_bucket_shard_sync_info> status;
  status.resize(std::max<size_t>(1, num_shards));
-  int r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info,
-                                          &source_bucket_info, &status);
+  r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info,
+                                      &source_bucket_info, &status);
  if (r < 0) {
    return r;
  }