]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite: move out full sync trigger handling block to a new cr class.
authorShilpa Jagannath <smanjara@redhat.com>
Wed, 29 Jun 2022 14:57:47 +0000 (10:57 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 8 Aug 2022 19:44:15 +0000 (15:44 -0400)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
src/rgw/rgw_data_sync.cc

index 7d0a12b7d947f21f4cb8fa38b1e305409e8346cc..8c9fe5189a406583517f8ffcd81b19fc11976e29 100644 (file)
@@ -1484,7 +1484,7 @@ public:
 
 RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                                 std::optional<uint64_t> gen,
-                                const std::string& marker,
+                                const std::string marker,
                                 ceph::real_time timestamp,
                                 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
                                 boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache,
@@ -1499,6 +1499,116 @@ RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                                       lease_cr.get(), tn);
 }
 
+class RGWHandleFullSyncCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket_shard source_bs;
+  const std::string key;
+  rgw_raw_obj error_repo;
+  ceph::real_time timestamp;
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
+  std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
+  RGWSyncTraceNodeRef tn;
+  rgw_bucket_index_marker_info remote_info;
+  uint32_t sid;
+  std::vector<store_gen_shards>::iterator each;
+
+public:
+  RGWHandleFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+                      const std::string& _key, rgw_raw_obj& _error_repo,
+                      ceph::real_time& _timestamp, boost::intrusive_ptr<RGWContinuousLeaseCR> _lease_cr,
+                      boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
+                      std::optional<RGWDataSyncShardMarkerTrack> _marker_tracker,
+                      RGWSyncTraceNodeRef& _tn)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key),
+      error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr),
+      bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {}
+
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
+      if (retcode < 0) {
+        tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
+                        << source_bs.shard_id << " to error repo for retry"));
+        yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                            rgw::error_repo::encode_key(source_bs, std::nullopt),
+                                            timestamp));
+        if (retcode < 0) {
+          tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
+        }
+        return set_cr_error(retcode);
+      }
+
+      //wait to sync the first shard of the oldest generation and then sync all other shards.
+      //if any of the operations fail at any time, write them into error repo for later retry.
+
+      source_bs.shard_id = 0;
+      yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
+                                  lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
+      if (retcode < 0) {
+        tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
+                        << remote_info.oldest_gen << ". Writing to error repo for retry"));
+        yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                            rgw::error_repo::encode_key(source_bs, remote_info.oldest_gen),
+                                            timestamp));
+        if (retcode < 0) {
+          tn->log(0, SSTR("ERROR: failed to write " << remote_info.oldest_gen << ":" << source_bs.shard_id
+                         << " in error repo: retcode=" << retcode));
+          return set_cr_error(retcode);
+        }
+      }
+      each = remote_info.generations.begin();
+      for (; each != remote_info.generations.end(); each++) {
+        for (sid = 0; sid < each->num_shards; sid++) {
+          source_bs.shard_id = sid;
+          tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
+          yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp,
+                            lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
+                            cct->_conf->rgw_data_sync_spawn_window,
+                            [&](uint64_t stack_id, int ret) {
+                            if (ret < 0) {
+                              sid = source_bs.shard_id;
+                              for (; sid < each->num_shards; sid++) {
+                                source_bs.shard_id = sid;
+                                spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                        rgw::error_repo::encode_key(source_bs, each->gen),
+                                        timestamp), false);
+                                if (retcode < 0) {
+                                  tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+                                  << sid << " to error repo: retcode=" << retcode));
+                                }
+                              }
+                              auto i = std::distance(remote_info.generations.begin(), each);
+                              for (each[i]; each != remote_info.generations.end(); each++) {
+                                for (sid = 0; sid < each->num_shards; sid++){
+                                  spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                          rgw::error_repo::encode_key(source_bs, each->gen),
+                                          timestamp), false);
+                                  if (retcode < 0) {
+                                    tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+                                    << sid << " to error repo: retcode=" << retcode));
+                                  }
+                                }
+                              }
+                            }
+                            return 0;
+                            });
+          drain_all_cb([&](uint64_t stack_id, int ret) {
+            if (ret < 0) {
+              tn->log(10, SSTR("a sync operation returned error: " << ret));
+            }
+            return ret;
+          });
+        }
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
 #define DATA_SYNC_MAX_ERR_ENTRIES 10
 
 class RGWDataSyncShardCR : public RGWCoroutine {
@@ -1689,81 +1799,8 @@ public:
           if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
-            yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
-            if (retcode < 0) {
-              tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
-                              << source_bs.shard_id << " to error repo for retry"));
-              yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                                  rgw::error_repo::encode_key(source_bs, std::nullopt),
-                                                  entry_timestamp));
-              if (retcode < 0) {
-                tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
-              }
-              return set_cr_error(retcode);
-            }
-
-            //wait to sync the first shard of the oldest generation and then sync all other shards.
-            //if any of the operations fail at any time, write them into error repo for later retry.
-
-            source_bs.shard_id = 0;
-            yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, iter->first, entry_timestamp,
-                                        lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
-            if (retcode < 0) {
-              tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
-                              << remote_info.oldest_gen << ". Writing to error repo for retry"));
-              yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                                  rgw::error_repo::encode_key(source_bs, remote_info.oldest_gen),
-                                                  entry_timestamp));
-              if (retcode < 0) {
-                tn->log(0, SSTR("ERROR: failed to write " << remote_info.oldest_gen << ":" << source_bs.shard_id
-                              << " in error repo: retcode=" << retcode));
-                return set_cr_error(retcode);
-              }
-            }
-            each = remote_info.generations.begin();
-            for (; each != remote_info.generations.end(); each++) {
-              for (sid = 0; sid < each->num_shards; sid++) {
-                source_bs.shard_id = sid;
-                tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
-                yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, iter->first, entry_timestamp,
-                                  lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
-                                  cct->_conf->rgw_data_sync_spawn_window,
-                                  [&](uint64_t stack_id, int ret) {
-                                  if (ret < 0) {
-                                    sid = source_bs.shard_id;
-                                    for (; sid < each->num_shards; sid++) {
-                                      source_bs.shard_id = sid;
-                                      spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                              rgw::error_repo::encode_key(source_bs, each->gen),
-                                              entry_timestamp), false);
-                                      if (retcode < 0) {
-                                        tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
-                                        << sid << " to error repo: retcode=" << retcode));
-                                      }
-                                    }
-                                    auto i = std::distance(remote_info.generations.begin(), each);
-                                    for (each[i]; each != remote_info.generations.end(); each++) {
-                                      for (sid = 0; sid < each->num_shards; sid++){
-                                        spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                                rgw::error_repo::encode_key(source_bs, each->gen),
-                                                entry_timestamp), false);
-                                        if (retcode < 0) {
-                                          tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
-                                          << sid << " to error repo: retcode=" << retcode));
-                                        }
-                                      }
-                                    }
-                                  }
-                                  return 0;
-                                  });
-                drain_all_cb([&](uint64_t stack_id, int ret) {
-                  if (ret < 0) {
-                    tn->log(10, SSTR("a sync operation returned error: " << ret));
-                  }
-                  return ret;
-                });
-              }
-            }
+            yield call(new RGWHandleFullSyncCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
+                            lease_cr, bucket_shard_cache, marker_tracker, tn));
           }
          sync_marker.marker = iter->first;
         }