git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: update bucket sync status after bucket shards finish current gen
author Shilpa Jagannath <smanjara@redhat.com>
Mon, 5 Apr 2021 20:15:45 +0000 (01:45 +0530)
committer Adam C. Emerson <aemerson@redhat.com>
Mon, 13 Sep 2021 16:27:50 +0000 (12:27 -0400)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
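
This commit adds an RGWBucketShardIsDoneCR coroutine that records, per bucket shard, when the shard has finished syncing the current bilog generation, and advances the bucket's incremental generation once every shard is done. The stand-alone sketch below (hypothetical names, not the actual RGW types or coroutine framework) illustrates that bookkeeping:

    // Minimal sketch of the per-shard "done with current generation" flags:
    // when the last shard reports done, the generation advances and the flags
    // are resized for the next generation's shard count.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct bucket_sync_status_sketch {          // stand-in for rgw_bucket_sync_status
      uint64_t incremental_gen = 0;
      std::vector<bool> shards_done_with_gen;
    };

    // stand-in for the update performed by RGWBucketShardIsDoneCR
    void mark_shard_done(bucket_sync_status_sketch& status, int shard_id,
                         uint64_t next_gen, uint64_t next_num_shards)
    {
      status.shards_done_with_gen[shard_id] = true;

      // advance the generation only once every shard has reported done
      if (std::all_of(status.shards_done_with_gen.begin(),
                      status.shards_done_with_gen.end(),
                      [](bool done) { return done; })) {
        status.incremental_gen = next_gen;
        status.shards_done_with_gen.assign(next_num_shards, false);
      }
    }

    int main()
    {
      bucket_sync_status_sketch status;
      status.shards_done_with_gen.resize(2, false);

      mark_shard_done(status, 0, 1, 4);   // first shard done: gen stays at 0
      mark_shard_done(status, 1, 1, 4);   // last shard done: gen advances to 1
      std::cout << "gen=" << status.incremental_gen
                << " shards=" << status.shards_done_with_gen.size() << '\n';
    }
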
src/rgw/rgw_data_sync.cc

index b1117567c5415d6e6549be358771e96126a5fc1b..b51e7b939783a26ba433a16c6ee5c8c7cbc77c2f 100644 (file)
@@ -3183,6 +3183,7 @@ public:
         }
       }
 
+      status.shards_done_with_gen.resize(num_shards);
       status.incremental_gen = info.latest_gen;
 
       ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
@@ -4137,12 +4138,88 @@ static bool has_olh_epoch(RGWModifyOp op) {
   return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
 }
 
+class RGWBucketShardIsDoneCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket_sync_status bucket_status;
+  const rgw_raw_obj& bucket_status_obj;
+  const int shard_id;
+  RGWObjVersionTracker objv_tracker;
+  const next_bilog_result& next_log;
+  const uint64_t generation;
+
+public:
+  RGWBucketShardIsDoneCR(RGWDataSyncCtx *_sc, const rgw_raw_obj& _bucket_status_obj,
+                         int _shard_id, const next_bilog_result& _next_log, const uint64_t _gen)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+      bucket_status_obj(_bucket_status_obj),
+      shard_id(_shard_id), next_log(_next_log), generation(_gen) {}
+
+  int operate(const DoutPrefixProvider* dpp) override
+  {
+    reenter(this) {
+      do {
+        // read bucket sync status
+        objv_tracker.clear();
+        using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+        yield call(new ReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                              bucket_status_obj, &bucket_status, false, &objv_tracker));
+        if (retcode < 0) {
+          ldpp_dout(dpp, 20) << "failed to read bucket shard status: "
+              << cpp_strerror(retcode) << dendl;
+          return set_cr_error(retcode);
+        }
+
+        if (bucket_status.state != BucketSyncState::Incremental) {
+          // exit with success to avoid stale shard being
+          // retried in error repo if we lost a race
+          ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR found sync state = " << bucket_status.state << dendl;
+          return set_cr_done();
+        }
+
+        if (bucket_status.incremental_gen != generation) {
+          // exit with success to avoid stale shard being
+          // retried in error repo if we lost a race
+          ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR expected gen: " << generation
+              << ", got: " << bucket_status.incremental_gen << dendl;
+          return set_cr_done();
+        }
+
+        yield {
+          // update bucket_status after a shard is done with current gen
+          auto& done = bucket_status.shards_done_with_gen;
+          done[shard_id] = true;
+
+          // increment gen if all shards are already done with current gen
+          if (std::all_of(done.begin(), done.end(),
+            [] (const bool done){return done; } )) {
+            bucket_status.incremental_gen = next_log.generation;
+            done.clear();
+            done.resize(next_log.num_shards, false);
+          }
+          using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+          call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                            bucket_status_obj, bucket_status, &objv_tracker, false));
+        }
+        if (retcode < 0 && retcode != -ECANCELED) {
+          ldpp_dout(dpp, 20) << "failed to write bucket sync status: " << cpp_strerror(retcode) << dendl;
+          return set_cr_error(retcode);
+        } else if (retcode >= 0) {
+          return set_cr_done();
+        }
+      } while (retcode == -ECANCELED);
+    }
+    return 0;
+  }
+};
+
 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket_sync_pipe& sync_pipe;
   RGWBucketSyncFlowManager::pipe_rules_ref rules;
   rgw_bucket_shard& bs;
+  const rgw_raw_obj& bucket_status_obj;
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   bilog_list_result extended_result;
   list<rgw_bi_log_entry> list_result;
@@ -4171,7 +4248,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
 public:
   RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
                                   rgw_bucket_sync_pipe& _sync_pipe,
-                                  const std::string& status_oid,
+                                  const std::string& shard_status_oid,
+                                  const rgw_raw_obj& _bucket_status_obj,
                                   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                   rgw_bucket_shard_sync_info& sync_info,
                                   RGWSyncTraceNodeRef& _tn_parent,
@@ -4179,11 +4257,11 @@ public:
                                   ceph::real_time* stable_timestamp)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
-      lease_cr(std::move(lease_cr)), sync_info(sync_info),
-      zone_id(sync_env->svc->zone->get_zone().id),
+      bucket_status_obj(_bucket_status_obj), lease_cr(std::move(lease_cr)),
+      sync_info(sync_info), zone_id(sync_env->svc->zone->get_zone().id),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
                                          SSTR(bucket_shard_str{bs}))),
-      marker_tracker(sc, status_oid, sync_info.inc_marker, tn,
+      marker_tracker(sc, shard_status_oid, sync_info.inc_marker, tn,
                      objv_tracker, stable_timestamp)
   {
     set_description() << "bucket shard incremental sync bucket="
@@ -4402,6 +4480,7 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
                 return 0;
               });
       }
+
     } while (!list_result.empty() && sync_status == 0 && !syncstopped);
 
     drain_all_cb([&](uint64_t stack_id, int ret) {
@@ -4431,6 +4510,17 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
       tn->log(10, SSTR("backing out with sync_status=" << sync_status));
       return set_cr_error(sync_status);
     }
+
+    if (!truncated && extended_result.next_log) {
+      yield call(new RGWBucketShardIsDoneCR(sc, bucket_status_obj, bs.shard_id, *extended_result.next_log, generation));
+      if (retcode < 0) {
+        ldout(cct, 20) << "failed to update bucket sync status: "
+            << cpp_strerror(retcode) << dendl;
+        drain_all();
+        return set_cr_error(retcode);
+      }
+    }
+
     return set_cr_done();
   }
   return 0;
@@ -4979,7 +5069,8 @@ class RGWSyncBucketShardCR : public RGWCoroutine {
   BucketSyncState& bucket_state;
   ceph::real_time* progress;
 
-  const std::string status_oid;
+  const std::string shard_status_oid;
+  const rgw_raw_obj bucket_status_obj;
   rgw_bucket_shard_sync_info sync_status;
   RGWObjVersionTracker objv_tracker;
 
@@ -4996,7 +5087,11 @@ public:
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
       sync_pipe(sync_pipe), bucket_state(bucket_state), progress(progress),
-      status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
+      shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
+      bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
+                 RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
+                                                                 sync_pair.source_bs.bucket,
+                                                                 sync_pair.dest_bs.bucket)),
       tn(tn) {
   }
 
@@ -5019,7 +5114,7 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
     }
 
     yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
-                                                   status_oid, lease_cr,
+                                                   shard_status_oid, bucket_status_obj, lease_cr,
                                                    sync_status, tn,
                                                    objv_tracker, progress));
     if (retcode < 0) {
@@ -5213,6 +5308,16 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
           }
         }
 
+        if (sync_pair.source_bs.shard_id >= bucket_status.shards_done_with_gen.size()) {
+          tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
+          return set_cr_done(); // return success so we don't retry
+        }
+        if (bucket_status.shards_done_with_gen[sync_pair.source_bs.shard_id]) {
+          tn->log(10, SSTR("bucket shard " << sync_pair.source_bs << " of gen " <<
+                          gen << " already synced."));
+          return set_cr_done();
+        }
+
         yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
                                             sync_pipe, bucket_status.state,
                                             tn, progress));