]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Break out RGWDataFullSyncShardCR
authorAdam C. Emerson <aemerson@redhat.com>
Thu, 4 Aug 2022 20:52:28 +0000 (16:52 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 9 Aug 2022 19:29:21 +0000 (15:29 -0400)
This function was formerly RGWDataSyncShardCR::full_sync. The only
functional difference is that we leave acquiring the lease to the top
level RGWDataSyncShardCR coroutine class, since the lease should be
held on the transition from full to incremental sync.

Fixes: https://tracker.ceph.com/issues/57063
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_data_sync.cc

index 2c935b817df4fa43d6db37097dce52e1b1597527..18c9fa04f951a5b341ac27bdaffbf4274dab5da3 100644 (file)
@@ -1654,8 +1654,119 @@ protected:
       sync_status(sync_status), bucket_shard_cache(bucket_shard_cache) {}
 };
 
+class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
+  static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
+
+  string oid;
+  uint64_t total_entries = 0;
+  ceph::real_time entry_timestamp;
+  std::map<std::string, bufferlist> entries;
+  std::map<std::string, bufferlist>::iterator iter;
+  string error_marker;
+
+public:
+
+  RGWDataFullSyncShardCR(
+    RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
+    rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
+    const string& status_oid, const rgw_raw_obj& error_repo,
+    const boost::intrusive_ptr<RGWContinuousLeaseCR>& lease_cr,
+    const rgw_data_sync_status& sync_status,
+    const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
+    : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
+                            status_oid, error_repo, lease_cr, sync_status,
+                            bucket_shard_cache) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      tn->log(10, "start full sync");
+      oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+      total_entries = sync_marker.pos;
+      entry_timestamp = sync_marker.timestamp; // time when full sync started
+      do {
+        if (!lease_cr->is_locked()) {
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
+        omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+        yield call(new RGWRadosGetOmapValsCR(sc->env->store,
+                                            rgw_raw_obj(pool, oid),
+                                             sync_marker.marker,
+                                            OMAP_GET_MAX_ENTRIES, omapvals));
+        if (retcode < 0) {
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
+        entries = std::move(omapvals->entries);
+        if (entries.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
+        iter = entries.begin();
+        for (; iter != entries.end(); ++iter) {
+          retcode = parse_bucket_key(iter->first, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
+            marker_tracker->try_update_high_marker(iter->first, 0,
+                                                  entry_timestamp);
+            continue;
+          }
+          tn->log(20, SSTR("full sync: " << iter->first));
+          total_entries++;
+          if (!marker_tracker->start(iter->first, total_entries,
+                                    entry_timestamp)) {
+            tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first
+                           << ". Duplicate entry?"));
+          } else {
+            tn->log(10, SSTR("timestamp for " << iter->first << " is :" << entry_timestamp));
+            yield_spawn_window(new RGWDataFullSyncSingleEntryCR(
+                                sc, pool, source_bs, iter->first, sync_status,
+                                error_repo, entry_timestamp, lease_cr,
+                                bucket_shard_cache, &*marker_tracker, tn),
+                              cct->_conf->rgw_data_sync_spawn_window,
+                              std::nullopt);
+          }
+         sync_marker.marker = iter->first;
+        }
+      } while (omapvals->more);
+      omapvals.reset();
+
+      drain_all();
+
+      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+
+      yield {
+        /* update marker to reflect we're done with full sync */
+        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+        sync_marker.marker = sync_marker.next_step_marker;
+        sync_marker.next_step_marker.clear();
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
+              sc->env->dpp,sc->env->async_rados, sc->env->svc->sysobj,
+              rgw_raw_obj(pool, status_oid), sync_marker));
+      }
+      if (retcode < 0) {
+        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
+        lease_cr->go_down();
+        drain_all();
+        return set_cr_error(retcode);
+      }
+      // clean up full sync index
+      yield {
+        const auto& pool = sc->env->svc->zone->get_zone_params().log_pool;
+        auto oid = full_data_sync_index_shard_oid(sc->source_zone.id, shard_id);
+        call(new RGWRadosRemoveCR(sc->env->store, {pool, oid}));
+      }
+      // keep lease and transition to incremental_sync()
+    }
+    return 0;
+  }
+};
 
 class RGWDataSyncShardCR : public RGWCoroutine {
+  static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
 
@@ -1789,7 +1900,6 @@ public:
   }
 
   int full_sync() {
-#define OMAP_GET_MAX_ENTRIES 100
     int max_entries = OMAP_GET_MAX_ENTRIES;
     reenter(&full_cr) {
       tn->log(10, "start full sync");