rgw: bucket sync run walks over generations
author     Adam C. Emerson <aemerson@redhat.com>
           Sat, 14 May 2022 05:11:57 +0000 (01:11 -0400)
committer  Casey Bodley <cbodley@redhat.com>
           Fri, 27 May 2022 19:47:34 +0000 (15:47 -0400)
This should make the troubleshooting use case of bucket sync init/run
usable with multisite reshard.

This also fixes a few issues in the original bucket sync run code by
spawning multiple shards at a time and retrying retryable errors.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
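
The core of the per-shard change is the retry policy: transient errors are
retried up to a fixed budget, the budget is reset whenever the shard reports
progress, and -ECANCELED aborts immediately. The sketch below models that
policy outside the coroutine framework; Result, sync_once(), and
sync_shard_with_retries() are hypothetical stand-ins for a single call to
sync_bucket_shard_cr(), not RGW APIs.

// Minimal sketch of ShardCR's retry policy, outside the coroutine
// framework. Result and sync_once() are hypothetical stand-ins, not
// RGW types.
#include <cerrno>
#include <chrono>
#include <functional>

struct Result {
  int retcode;                                     // 0 on success, -errno on failure
  std::chrono::system_clock::time_point progress;  // last position synced
};

int sync_shard_with_retries(const std::function<Result()>& sync_once,
                            unsigned allowed_retries = 10)
{
  std::chrono::system_clock::time_point prev_progress{};
  int retcode = -EDOM;  // sentinel so the loop runs at least once

  for (unsigned retries = 0;
       retries < allowed_retries && retcode != 0;
       ++retries) {
    const auto r = sync_once();
    retcode = r.retcode;

    if (retcode == -ECANCELED) {
      return retcode;               // not retryable; surface immediately
    } else if (retcode < 0) {
      if (r.progress != prev_progress) {
        retries = 0;                // made progress, so refill the retry budget
      }
      prev_progress = r.progress;
    }
  }
  return retcode;                   // 0 on success, last error if retries ran out
}

Resetting the budget on progress means a slow but advancing shard is never
treated as a hard failure; only a shard that keeps erroring without moving
forward gives up.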
src/rgw/rgw_data_sync.cc

index 40d25a7a4c26f03ab9b5d01294bb42ca54052d99..2794d80c2e4b5ec112ae207129bd0c47fc47d2ed 100644 (file)
@@ -5508,52 +5508,248 @@ RGWBucketPipeSyncStatusManager::read_sync_status(
   return sync_status;
 }
 
-int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
-{
-  list<RGWCoroutinesStack *> stacks;
+namespace rgw::bucket_sync_run {
+// Retry-loop over calls to sync_bucket_shard_cr
+class ShardCR : public RGWCoroutine {
+  static constexpr auto allowed_retries = 10u;
 
-  struct bk {
-    std::vector<rgw_bucket_sync_pair_info> pairs;
-    uint64_t latest_gen;
-  };
-  std::vector<bk> bookkeeping;
+  RGWDataSyncCtx& sc;
+  const rgw_bucket_sync_pair_info& pair;
+  const uint64_t gen;
+  unsigned retries = 0;
 
-  for (auto& s : sources) {
-    auto stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
-    uint64_t num_shards, latest_gen;
-    auto ret = remote_info(dpp, s, nullptr, &latest_gen, &num_shards);
-    if (!ret) {
-      ldpp_dout(this, 5) << "Unable to get remote info: "
-                        << ret << dendl;
-      return ret;
+  ceph::real_time prev_progress;
+  ceph::real_time progress;
+
+public:
+
+  ShardCR(RGWDataSyncCtx& sc, const rgw_bucket_sync_pair_info& pair,
+         const uint64_t gen)
+    : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      // Since all errors (except ECANCELED) are considered retryable,
+      // retry other errors so long as we're making progress.
+      for (retries = 0u, retcode = -EDOM;
+          (retries < allowed_retries) && (retcode != 0);
+          ++retries) {
+       ldpp_dout(dpp, 5) << "ShardCR: syncing bucket shard on: "
+                         << "zone=" << sc.source_zone
+                         << ", bucket=" << pair.source_bs.bucket.name
+                         << ", shard=" << pair.source_bs.shard_id
+                         << ", gen=" << gen
+                         << dendl;
+       yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen,
+                                       sc.env->sync_tracer->root_node,
+                                       &progress));
+
+       if (retcode == -ECANCELED) {
+         ldpp_dout(dpp, -1) << "ERROR: Got -ECANCELED for "
+                            << pair.source_bs << dendl;
+         drain_all();
+         return set_cr_error(retcode);
+       } else if (retcode < 0) {
+         ldpp_dout(dpp, 5) << "WARNING: Got error, retcode=" << retcode << " for "
+                           << pair.source_bs << " on retry "
+                           << retries + 1 << " of " << allowed_retries
+                           << " allowed" << dendl;
+         // Reset the retry counter if we made any progress
+         if (progress != prev_progress) {
+           retries = 0;
+         }
+         prev_progress = progress;
+       }
+      }
+      if (retcode < 0) {
+       ldpp_dout(dpp, -1) << "ERROR: Exhausted retries for "
+                          << pair.source_bs << " retcode="
+                          << retcode << dendl;
+       drain_all();
+       return set_cr_error(retcode);
+      }
+
+      drain_all();
+      return set_cr_done();
     }
-    bookkeeping.emplace_back();
-    auto& cur = bookkeeping.back();
-    cur.latest_gen = latest_gen;
-    cur.pairs.resize(num_shards);
-    for (auto shard = 0u; shard < num_shards; ++shard) {
-      auto& pair = cur.pairs[shard];
-      pair.handler = s.handler;
-      pair.source_bs.bucket = s.info.bucket;
-      pair.dest_bucket = s.dest;
+    return 0;
+  }
+};
+
+// Loop over calls to ShardCR with limited concurrency
+class GenCR : public RGWShardCollectCR {
+  static constexpr auto MAX_CONCURRENT_SHARDS = 64;
+
+  RGWDataSyncCtx& sc;
+  const uint64_t gen;
+
+  std::vector<rgw_bucket_sync_pair_info> pairs;
+  decltype(pairs)::const_iterator iter;
+
+public:
+  GenCR(RGWDataSyncCtx& sc, const rgw_bucket& source, const rgw_bucket& dest,
+       const uint64_t gen, const uint64_t shards,
+       const RGWBucketSyncFlowManager::pipe_handler& handler)
+    : RGWShardCollectCR(sc.cct, MAX_CONCURRENT_SHARDS),
+      sc(sc), gen(gen) {
+    pairs.resize(shards);
+    for (auto shard = 0u; shard < shards; ++shard) {
+      auto& pair = pairs[shard];
+      pair.handler = handler;
+      pair.source_bs.bucket = source;
+      pair.dest_bucket = dest;
       pair.source_bs.shard_id = shard;
-      stack->call(sync_bucket_shard_cr(&s.sc, nullptr, pair,
-                                      cur.latest_gen,
-                                      sync_env.sync_tracer->root_node,
-                                      nullptr));
+    }
+    iter = pairs.cbegin();
+    assert(pairs.size() == shards);
+  }
 
+  virtual bool spawn_next() override {
+    if (iter == pairs.cend()) {
+      return false;
     }
-    stacks.push_back(stack);
+    spawn(new ShardCR(sc, *iter, gen), false);
+    ++iter;
+    return true;
   }
 
-  int ret = cr_mgr.run(dpp, stacks);
-  if (ret < 0) {
-    ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
-        << bucket_str{dest_bucket} << dendl;
-    return ret;
+  int handle_result(int r) override {
+    if (r < 0) {
+      ldpp_dout(sc.env->dpp, 4) << "ERROR: Error syncing shard: "
+                               << cpp_strerror(r) << dendl;
+    }
+    return r;
   }
+};
 
-  return 0;
+// Read sync status, loop over calls to GenCR
+class SourceCR : public RGWCoroutine {
+  RGWDataSyncCtx& sc;
+  const RGWBucketInfo& info;
+  const rgw_bucket& dest;
+  const RGWBucketSyncFlowManager::pipe_handler& handler;
+  const rgw_raw_obj status_obj{
+    sc.env->svc->zone->get_zone_params().log_pool,
+    RGWBucketPipeSyncStatusManager::full_status_oid(sc.source_zone, info.bucket,
+                                                   dest)};
+
+  BucketSyncState state = BucketSyncState::Incremental;
+  uint64_t gen = 0;
+  uint64_t num_shards = 0;
+  rgw_bucket_sync_status status;
+
+public:
+
+  SourceCR(RGWDataSyncCtx& sc, const RGWBucketInfo& info,
+          const rgw_bucket& dest,
+          const RGWBucketSyncFlowManager::pipe_handler& handler)
+    : RGWCoroutine(sc.cct), sc(sc), info(info), dest(dest), handler(handler) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      // Get the source's status. In incremental sync, this gives us
+      // the generation and shard count that need to be run next.
+      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
+                  dpp, sc.env->async_rados, sc.env->svc->sysobj,
+                  status_obj, &status));
+      if (retcode < 0) {
+       ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
+                          << sc.source_zone << " retcode="
+                          << retcode << dendl;
+       drain_all();
+       return set_cr_error(retcode);
+      }
+
+      if (status.state == BucketSyncState::Stopped) {
+       // Nothing to do.
+       ldpp_dout(dpp, 0) << "SourceCR: Bucket is in state Stopped, returning."
+                         << dendl;
+       drain_all();
+       return set_cr_done();
+      }
+
+      do {
+       state = status.state;
+       gen = status.incremental_gen;
+       num_shards = status.shards_done_with_gen.size();
+
+       ldpp_dout(dpp, 5) << "SourceCR: "
+                         << "state=" << state
+                         << ", gen=" << gen
+                         << ", num_shards=" << num_shards
+                         << dendl;
+
+       // Special case to handle full sync. Since full sync no longer
+       // uses shards and has no generations, we sync shard zero but
+       // pass the current generation so a following incremental sync
+       // can carry on.
+       if (state != BucketSyncState::Incremental) {
+         ldpp_dout(dpp, 1)  << "SourceCR: Calling GenCR with "
+                            << "gen=" << gen
+                            << ", num_shards=" << 1
+                            << dendl;
+         yield call(new GenCR(sc, info.bucket, dest, gen, 1, handler));
+       } else {
+         ldpp_dout(dpp, 1) << "SourceCR: Calling GenCR with "
+                           << "gen=" << gen
+                           << ", num_shards=" << num_shards
+                           << dendl;
+         yield call(new GenCR(sc, info.bucket, dest, gen, num_shards,
+                              handler));
+       }
+       if (retcode < 0) {
+         ldpp_dout(dpp, -1) << "ERROR: Giving up syncing from "
+                            << sc.source_zone << " retcode="
+                            << retcode << dendl;
+         drain_all();
+         return set_cr_error(retcode);
+       }
+
+       yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
+                    dpp, sc.env->async_rados, sc.env->svc->sysobj,
+                    status_obj, &status));
+       if (retcode < 0) {
+         ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
+                            << sc.source_zone << " retcode="
+                            << retcode << dendl;
+         drain_all();
+         return set_cr_error(retcode);
+       }
+       // Repeat until we have done an incremental run and the
+       // generation remains unchanged.
+       ldpp_dout(dpp, 5) << "SourceCR: "
+                         << "state=" << state
+                         << ", gen=" << gen
+                         << ", num_shards=" << num_shards
+                         << ", status.state=" << status.state
+                         << ", status.incremental_gen=" << status.incremental_gen
+                         << ", status.shards_done_with_gen.size()=" << status.shards_done_with_gen.size()
+                         << dendl;
+      } while (state != BucketSyncState::Incremental ||
+              gen != status.incremental_gen);
+      drain_all();
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+} // namespace rgw::bucket_sync_run
+
+int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
+{
+  list<RGWCoroutinesStack *> stacks;
+  for (auto& source : sources) {
+    auto stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
+    stack->call(new rgw::bucket_sync_run::SourceCR(source.sc, source.info,
+                                                  source.dest, source.handler));
+    stacks.push_back(stack);
+  }
+  auto ret = cr_mgr.run(dpp, stacks);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "ERROR: Sync unsuccessful on bucket "
+                      << bucket_str{dest_bucket} << dendl;
+  }
+  return ret;
 }
 
 unsigned RGWBucketPipeSyncStatusManager::get_subsys() const