sync_status(sync_status), bucket_shard_cache(bucket_shard_cache) {}
};
+class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
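+  // Upper bound on the number of omap entries fetched per listing
+  // call; the full-sync index shard is paged through in batches of
+  // this size.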
+ static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
+
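+  // Loop state is kept in members rather than on the stack, since the
+  // coroutine is re-entered from the top of operate() after every yield.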
+ string oid;
+ uint64_t total_entries = 0;
+ ceph::real_time entry_timestamp;
+ std::map<std::string, bufferlist> entries;
+ std::map<std::string, bufferlist>::iterator iter;
+ string error_marker;
+
+public:
+
+ RGWDataFullSyncShardCR(
+ RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
+ rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
+ const string& status_oid, const rgw_raw_obj& error_repo,
+ const boost::intrusive_ptr<RGWContinuousLeaseCR>& lease_cr,
+ const rgw_data_sync_status& sync_status,
+ const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
+ : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
+ status_oid, error_repo, lease_cr, sync_status,
+ bucket_shard_cache) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ tn->log(10, "start full sync");
+ oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+ total_entries = sync_marker.pos;
+ entry_timestamp = sync_marker.timestamp; // time when full sync started
+ do {
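+        // Bail out if the shard lease was lost; another gateway may
+        // have taken over this shard.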
+ if (!lease_cr->is_locked()) {
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(-ECANCELED);
+ }
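+        // Fetch the next batch of bucket-shard keys from the full-sync
+        // index, resuming after the last persisted marker.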
+ omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+ yield call(new RGWRadosGetOmapValsCR(sc->env->store,
+ rgw_raw_obj(pool, oid),
+ sync_marker.marker,
+ OMAP_GET_MAX_ENTRIES, omapvals));
+ if (retcode < 0) {
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+ entries = std::move(omapvals->entries);
+        if (!entries.empty()) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
+ tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
+ iter = entries.begin();
+ for (; iter != entries.end(); ++iter) {
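+          // Each omap key encodes a bucket shard. Keys that fail to
+          // parse are logged and skipped, but still advance the high
+          // marker so they are not retried.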
+ retcode = parse_bucket_key(iter->first, source_bs);
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
+ marker_tracker->try_update_high_marker(iter->first, 0,
+ entry_timestamp);
+ continue;
+ }
+ tn->log(20, SSTR("full sync: " << iter->first));
+ total_entries++;
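+          // start() registers the entry as in flight with the marker
+          // tracker, so the persisted high marker only advances past
+          // entries whose sync has completed.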
+ if (!marker_tracker->start(iter->first, total_entries,
+ entry_timestamp)) {
+ tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first
+ << ". Duplicate entry?"));
+ } else {
+            tn->log(10, SSTR("timestamp for " << iter->first << " is: "
+                             << entry_timestamp));
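+            // yield_spawn_window() spawns the child coroutine but
+            // suspends once rgw_data_sync_spawn_window children are in
+            // flight, bounding concurrency.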
+ yield_spawn_window(new RGWDataFullSyncSingleEntryCR(
+ sc, pool, source_bs, iter->first, sync_status,
+ error_repo, entry_timestamp, lease_cr,
+ bucket_shard_cache, &*marker_tracker, tn),
+ cct->_conf->rgw_data_sync_spawn_window,
+ std::nullopt);
+ }
+ sync_marker.marker = iter->first;
+ }
+ } while (omapvals->more);
+ omapvals.reset();
+
+ drain_all();
+
+ tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+
+ yield {
+ /* update marker to reflect we're done with full sync */
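+        // next_step_marker was captured when sync was initialized and
+        // records where incremental sync should resume.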
+ sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+ sync_marker.marker = sync_marker.next_step_marker;
+ sync_marker.next_step_marker.clear();
+ call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
+            sc->env->dpp, sc->env->async_rados, sc->env->svc->sysobj,
+ rgw_raw_obj(pool, status_oid), sync_marker));
+ }
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+ // clean up full sync index
+ yield {
+ const auto& pool = sc->env->svc->zone->get_zone_params().log_pool;
+        auto oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
+ call(new RGWRadosRemoveCR(sc->env->store, {pool, oid}));
+ }
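+      // Note: retcode from the removal is not checked; a leftover
+      // index object is assumed to be harmless.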
+ // keep lease and transition to incremental_sync()
+ }
+ return 0;
+ }
+};
class RGWDataSyncShardCR : public RGWCoroutine {
+ static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
}
int full_sync() {
-#define OMAP_GET_MAX_ENTRIES 100
int max_entries = OMAP_GET_MAX_ENTRIES;
reenter(&full_cr) {
tn->log(10, "start full sync");