const RGWBucketInfo& bucket_info,
const RGWBucketInfo& source_bucket_info,
const rgw_sync_bucket_pipe& pipe,
+ uint64_t latest_gen,
const BucketIndexShardsManager& remote_markers,
ceph::timespan retry_delay,
ceph::coarse_mono_time timeout_at)
{
const auto num_shards = source_bucket_info.layout.current_index.layout.normal.num_shards;
- if (empty(remote_markers, num_shards)) {
- ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
- return 0;
- }
-
rgw_bucket_sync_status full_status;
int r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
if (r < 0 && r != -ENOENT) { // retry on ENOENT
return r;
}
+ // wait for incremental
while (full_status.state != BucketSyncState::Incremental) {
- auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
+ const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
if (delay_until > timeout_at) {
lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach incremental sync" << dendl;
return -ETIMEDOUT;
}
}
+ // wait for latest_gen
+ while (full_status.incremental_gen < latest_gen) {
+ const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
+ if (delay_until > timeout_at) {
+ lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach "
+ "latest generation " << latest_gen << dendl;
+ return -ETIMEDOUT;
+ }
+ ldout(store->ctx(), 1) << "waiting to reach latest gen " << latest_gen
+ << ", on " << full_status.incremental_gen << ".." << dendl;
+ std::this_thread::sleep_until(delay_until);
+
+ r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
+ if (r < 0 && r != -ENOENT) { // retry on ENOENT
+ return r;
+ }
+ }
+
+ if (full_status.incremental_gen > latest_gen) {
+ ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n"
+ << " local gen: " << full_status.incremental_gen << '\n'
+ << " remote gen: " << latest_gen << dendl;
+ return 0;
+ }
+
+ if (empty(remote_markers, num_shards)) {
+ ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
+ return 0;
+ }
+
std::vector<rgw_bucket_shard_sync_info> status;
status.resize(std::max<size_t>(1, num_shards));
r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info,
}
while (status < remote_markers) {
- auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
+ const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
if (delay_until > timeout_at) {
ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl;
return -ETIMEDOUT;
{
struct sync_source_entry {
rgw_sync_bucket_pipe pipe;
+ uint64_t latest_gen = 0;
BucketIndexShardsManager remote_markers;
RGWBucketInfo source_bucket_info;
};
<< cpp_strerror(r) << dendl;
throw std::system_error(-r, std::system_category());
}
+ entry.latest_gen = info.latest_gen;
});
// fetch source bucket info
spawn::spawn(ioctx, [&] (yield_context yield) {
}
// checkpoint each source sequentially
- for (const auto& [pipe, remote_markers, source_bucket_info] : sources) {
- int r = bucket_source_sync_checkpoint(dpp, store, info, source_bucket_info,
- pipe, remote_markers,
+ for (const auto& e : sources) {
+ int r = bucket_source_sync_checkpoint(dpp, store, info, e.source_bucket_info,
+ e.pipe, e.latest_gen, e.remote_markers,
retry_delay, timeout_at);
if (r < 0) {
ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl;