rgw: handle older/newer generations after reading bucket sync status
author    Casey Bodley <cbodley@redhat.com>
          Tue, 9 Feb 2021 23:00:14 +0000 (18:00 -0500)
committer Adam C. Emerson <aemerson@redhat.com>
          Mon, 13 Sep 2021 16:27:50 +0000 (12:27 -0400)
Wait until we've read the bucket sync status and found that we're in
incremental sync before we start using incremental_gen for comparison.

Signed-off-by: Casey Bodley <cbodley@redhat.com>
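
For context, a minimal, self-contained sketch of the generation check this
commit moves behind the status read. The names (GenCheck, check_generation,
requested_gen, status_gen) are illustrative, not the actual RGW identifiers;
the real logic lives inline in RGWSyncBucketCR::operate() below:

#include <cstdint>
#include <optional>

enum class GenCheck {
  Sync,        // generations match: run the bucket shard sync
  RetryLater,  // requested gen is ahead of the status: fail with -EAGAIN, retry later
  Skip         // requested gen is behind the status: already handled, report success
};

// Only meaningful once the bucket sync status has been read and the bucket
// is known to be in incremental sync; before that, the status'
// incremental_gen has no valid value to compare against.
GenCheck check_generation(std::optional<uint64_t> requested_gen,
                          uint64_t status_gen)
{
  if (!requested_gen) {
    return GenCheck::Sync;        // no specific gen requested: sync the current one
  }
  if (*requested_gen > status_gen) {
    return GenCheck::RetryLater;  // we haven't caught up to this generation yet
  }
  if (*requested_gen < status_gen) {
    return GenCheck::Skip;        // this generation was already completed
  }
  return GenCheck::Sync;
}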
src/rgw/rgw_data_sync.cc

index a1d2e2e9e289351883a6b3e11ab5bdbb73f96bd1..b1117567c5415d6e6549be358771e96126a5fc1b 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1233,8 +1233,7 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   int num_shards{0};
   int cur_shard{0};
   bool again = false;
-  std::uint64_t syncing_gen = 0; // TODO: Fill this in from bucket sync status
-  std::optional<uint64_t> entry_gen;
+  std::optional<uint64_t> gen;
 
 public:
   RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
@@ -1242,7 +1241,7 @@ public:
                             std::optional<rgw_bucket_shard> _target_bs,
                             std::optional<rgw_bucket_shard> _source_bs,
                             const RGWSyncTraceNodeRef& _tn_parent,
-                           std::optional<uint64_t> entry_gen,
+                           std::optional<uint64_t> gen,
                             ceph::real_time* progress);
 
   int operate(const DoutPrefixProvider *dpp) override;
@@ -4580,6 +4579,7 @@ std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs)
 static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
                                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
                                           const rgw_bucket_sync_pair_info& sync_pair,
+                                          std::optional<uint64_t> gen,
                                           const RGWSyncTraceNodeRef& tn,
                                           ceph::real_time* progress);
 
@@ -4588,7 +4588,7 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                                                      std::optional<rgw_bucket_shard> _target_bs,
                                                      std::optional<rgw_bucket_shard> _source_bs,
                                                      const RGWSyncTraceNodeRef& _tn_parent,
-                                                    std::optional<uint64_t> entry_gen,
+                                                    std::optional<uint64_t> gen,
                                                      ceph::real_time* progress)
   : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
     lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
@@ -4598,7 +4598,7 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
               ":source_bucket=" << source_bucket.value_or(rgw_bucket()) <<
               ":source_zone=" << sc->source_zone))),
     progress(progress),
-    entry_gen(entry_gen)
+    gen(gen)
 {
   if (target_bs) {
     target_bucket = target_bs->bucket;
@@ -4629,15 +4629,6 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
         ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
 
         source_num_shards = siter->source.get_bucket_info().layout.current_index.layout.normal.num_shards;
-       if (entry_gen) {
-         if (*entry_gen > syncing_gen) {
-           tn->log(10, "Future generation in datalog entry. Returning error so we'll retry.");
-           return set_cr_error(-EAGAIN);
-         } else if (*entry_gen < syncing_gen) {
-           tn->log(10, "Future generation in datalog entry. Returning error so we'll retry.");
-           return 0;
-         }
-       }
        target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
         if (source_bs) {
           sync_pair.source_bs = *source_bs;
@@ -4675,8 +4666,8 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
 
         cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
 
-        yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, tn,
-                                                cur_progress),
+        yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
+                                                gen, tn, cur_progress),
                            BUCKET_SYNC_SPAWN_WINDOW,
                            [&](uint64_t stack_id, int ret) {
                              handle_complete_stack(stack_id);
@@ -5049,6 +5040,7 @@ class RGWSyncBucketCR : public RGWCoroutine {
   boost::intrusive_ptr<RGWContinuousLeaseCR> bucket_lease_cr;
   rgw_bucket_sync_pair_info sync_pair;
   rgw_bucket_sync_pipe sync_pipe;
+  std::optional<uint64_t> gen;
   ceph::real_time* progress;
 
   const std::string lock_name = "bucket sync";
@@ -5064,10 +5056,12 @@ public:
   RGWSyncBucketCR(RGWDataSyncCtx *_sc,
                   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                   const rgw_bucket_sync_pair_info& _sync_pair,
+                  std::optional<uint64_t> gen,
                   const RGWSyncTraceNodeRef& _tn_parent,
                   ceph::real_time* progress)
     : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
-      data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+      data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
+      gen(gen), progress(progress),
       lock_duration(cct->_conf->rgw_sync_lease_period),
       status_obj(env->svc->zone->get_zone_params().log_pool,
                  RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
@@ -5083,10 +5077,12 @@ public:
 static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
                                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
                                           const rgw_bucket_sync_pair_info& sync_pair,
+                                          std::optional<uint64_t> gen,
                                           const RGWSyncTraceNodeRef& tn,
                                           ceph::real_time* progress)
 {
-  return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, tn, progress);
+  return new RGWSyncBucketCR(sc, std::move(lease), sync_pair,
+                             gen, tn, progress);
 }
 
 int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
@@ -5199,6 +5195,24 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
           drain_all();
           bucket_lease_cr.reset();
         }
+
+        // if a specific gen was requested, compare that to the sync status
+        if (gen) {
+          const auto current_gen = bucket_status.incremental_gen;
+          if (*gen > current_gen) {
+            retcode = -EAGAIN;
+            tn->log(10, SSTR("requested sync of future generation "
+                             << *gen << " > " << current_gen
+                             << ", returning " << retcode << " for later retry"));
+            return set_cr_error(retcode);
+          } else if (*gen < current_gen) {
+            tn->log(10, SSTR("requested sync of past generation "
+                             << *gen << " < " << current_gen
+                             << ", returning success"));
+            return set_cr_done();
+          }
+        }
+
         yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
                                             sync_pipe, bucket_status.state,
                                             tn, progress));
@@ -5221,7 +5235,9 @@ RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
     return nullptr;
   }
 
-  return sync_bucket_shard_cr(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
+  constexpr std::optional<uint64_t> gen; // sync current gen
+  return sync_bucket_shard_cr(&sc, nullptr, sync_pairs[num], gen,
+                              sync_env->sync_tracer->root_node, nullptr);
 }
 
 int RGWBucketPipeSyncStatusManager::init(const DoutPrefixProvider *dpp)
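
A hedged sketch of how a caller might map the coroutine's outcome back onto
datalog entry handling, assuming the usual convention that -EAGAIN leaves the
entry to be retried. The names here are hypothetical; the real data sync
layer drives this through coroutine completions rather than a direct call:

#include <cerrno>

enum class EntryAction { Done, Retry };

EntryAction on_shard_sync_ret(int ret)
{
  if (ret == -EAGAIN) {
    // the requested generation is ahead of the bucket's sync status:
    // keep the datalog entry and retry once the status catches up
    return EntryAction::Retry;
  }
  // success, including the past-generation case (which returns 0):
  // the entry can be marked complete
  return EntryAction::Done;
}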