]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
radosgw-admin: 'bucket sync checkpoint' waits for generation to catch up
authorCasey Bodley <cbodley@redhat.com>
Tue, 18 May 2021 19:55:28 +0000 (15:55 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Feb 2022 18:55:52 +0000 (13:55 -0500)
poll on rgw_read_bucket_full_sync_status() until
full_status.incremental_gen catches up to the latest_gen we got from
rgw_read_remote_bilog_info()

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_sync_checkpoint.cc

index e0a82fcd6a9a3534321481b6e0cbb84e22a09f37..9f4b4ca71a53a07d7f22c4d2acfee5784f431726 100644 (file)
@@ -83,25 +83,22 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp,
                                   const RGWBucketInfo& bucket_info,
                                   const RGWBucketInfo& source_bucket_info,
                                   const rgw_sync_bucket_pipe& pipe,
+                                  uint64_t latest_gen,
                                   const BucketIndexShardsManager& remote_markers,
                                   ceph::timespan retry_delay,
                                   ceph::coarse_mono_time timeout_at)
 {
   const auto num_shards = source_bucket_info.layout.current_index.layout.normal.num_shards;
 
-  if (empty(remote_markers, num_shards)) {
-    ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
-    return 0;
-  }
-
   rgw_bucket_sync_status full_status;
   int r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
   if (r < 0 && r != -ENOENT) { // retry on ENOENT
     return r;
   }
 
+  // wait for incremental
   while (full_status.state != BucketSyncState::Incremental) {
-    auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
+    const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
     if (delay_until > timeout_at) {
       lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach incremental sync" << dendl;
       return -ETIMEDOUT;
@@ -115,6 +112,36 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp,
     }
   }
 
+  // wait for latest_gen
+  while (full_status.incremental_gen < latest_gen) {
+    const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
+    if (delay_until > timeout_at) {
+      lderr(store->ctx()) << "bucket checkpoint timed out waiting to reach "
+          "latest generation " << latest_gen << dendl;
+      return -ETIMEDOUT;
+    }
+    ldout(store->ctx(), 1) << "waiting to reach latest gen " << latest_gen
+        << ", on " << full_status.incremental_gen << ".." << dendl;
+    std::this_thread::sleep_until(delay_until);
+
+    r = rgw_read_bucket_full_sync_status(dpp, store, pipe, &full_status, null_yield);
+    if (r < 0 && r != -ENOENT) { // retry on ENOENT
+      return r;
+    }
+  }
+
+  if (full_status.incremental_gen > latest_gen) {
+    ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n"
+        << "        local gen: " << full_status.incremental_gen << '\n'
+        << "       remote gen: " << latest_gen << dendl;
+    return 0;
+  }
+
+  if (empty(remote_markers, num_shards)) {
+    ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
+    return 0;
+  }
+
   std::vector<rgw_bucket_shard_sync_info> status;
   status.resize(std::max<size_t>(1, num_shards));
   r = rgw_read_bucket_inc_sync_status(dpp, store, pipe, bucket_info,
@@ -124,7 +151,7 @@ int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp,
   }
 
   while (status < remote_markers) {
-    auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
+    const auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
     if (delay_until > timeout_at) {
       ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl;
       return -ETIMEDOUT;
@@ -177,6 +204,7 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
 {
   struct sync_source_entry {
     rgw_sync_bucket_pipe pipe;
+    uint64_t latest_gen = 0;
     BucketIndexShardsManager remote_markers;
     RGWBucketInfo source_bucket_info;
   };
@@ -207,6 +235,7 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
             << cpp_strerror(r) << dendl;
         throw std::system_error(-r, std::system_category());
       }
+      entry.latest_gen = info.latest_gen;
     });
     // fetch source bucket info
     spawn::spawn(ioctx, [&] (yield_context yield) {
@@ -230,9 +259,9 @@ int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
   }
 
   // checkpoint each source sequentially
-  for (const auto& [pipe, remote_markers, source_bucket_info] : sources) {
-    int r = bucket_source_sync_checkpoint(dpp, store, info, source_bucket_info,
-                                          pipe, remote_markers,
+  for (const auto& e : sources) {
+    int r = bucket_source_sync_checkpoint(dpp, store, info, e.source_bucket_info,
+                                          e.pipe, e.latest_gen, e.remote_markers,
                                           retry_delay, timeout_at);
     if (r < 0) {
       ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl;