}
};
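+// Expands one full-sync obligation from the error repo into per-shard retry
+// entries: one key per shard of every remote log generation.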
-class RGWFullSyncErrorRepoCR: public RGWCoroutine {
+class RGWDataIncrementalSyncFullObligationCR: public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_shard source_bs;
RGWSyncTraceNodeRef tn;
rgw_bucket_index_marker_info remote_info;
rgw_pool pool;
- RGWDataChangesLog *datalog_changes{nullptr};
  rgw_raw_obj error_repo;
  std::string error_marker;
  ceph::real_time timestamp;
uint32_t sid;
rgw_bucket_shard bs;
std::vector<store_gen_shards>::const_iterator each;
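  // map a bucket shard to the ".retry" error-repo object that lives alongside
  // its datalog shard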
rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) {
- int datalog_shard = datalog_changes->choose_oid(bs);
+ int datalog_shard = sync_env->store->svc()->datalog_rados->choose_oid(bs);
string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
return rgw_raw_obj(pool, oid + ".retry");
}
public:
- RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker,
+ RGWDataIncrementalSyncFullObligationCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker,
ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      source_bs(_source_bs), error_marker(_error_marker), timestamp(_timestamp), tn(_tn) {}
@@ ... @@ int RGWDataIncrementalSyncFullObligationCR::operate(const DoutPrefixProvider *dpp)
          yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
                               rgw::error_repo::encode_key(bs, each->gen),
                               timestamp), cct->_conf->rgw_data_sync_spawn_window,
                             [&](uint64_t stack_id, int ret) {
                               if (ret < 0) {
-                                tn->log(10, SSTR("writing to error repo returned error: " << ret));
+                                retcode = ret;
                               }
-                              return ret;
+                              return 0;
                             });
        }
      }
      drain_all_cb([&](uint64_t stack_id, int ret) {
                     if (ret < 0) {
                       tn->log(10, SSTR("writing to error repo returned error: " << ret));
                     }
                     return ret;
                   });
- spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
- error_marker, timestamp), false);
-
return set_cr_done();
}
return 0;
}
};
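+// Builds a sync obligation for a single bucket shard/gen and returns the
+// coroutine that services it. marker_tracker may be nullptr when the caller
+// finishes the marker itself, as full sync does.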
-RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
+RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
std::optional<uint64_t> gen,
const std::string marker,
ceph::real_time timestamp,
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache,
- std::optional<RGWDataSyncShardMarkerTrack> marker_tracker,
+ RGWDataSyncShardMarkerTrack* marker_tracker,
rgw_raw_obj& error_repo,
RGWSyncTraceNodeRef& tn,
bool retry) {
@@ ... @@ class RGWDataFullSyncSingleEntryCR: public RGWCoroutine
  ceph::real_time timestamp;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
+ RGWDataSyncShardMarkerTrack* marker_tracker;
RGWSyncTraceNodeRef tn;
rgw_bucket_index_marker_info remote_info;
uint32_t sid;
uint64_t i{0};
public:
- RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
- const std::string& _key, rgw_raw_obj& _error_repo,
- ceph::real_time& _timestamp, boost::intrusive_ptr<RGWContinuousLeaseCR> _lease_cr,
+ RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& _source_bs,
+ const std::string& _key, const rgw_raw_obj& _error_repo,
+ ceph::real_time _timestamp, boost::intrusive_ptr<RGWContinuousLeaseCR> _lease_cr,
boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
+ RGWDataSyncShardMarkerTrack* _marker_tracker,
RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key),
error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr),
- bucket_shard_cache(_bucket_shard_cache), tn(_tn) {}
+ bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {}
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      // fetch remote log info so we know the full set of generations
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      // if any of the operations fail at any point, write them into the error repo for later retry
      source_bs.shard_id = 0;
-      yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
-                 lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false));
+      yield call(data_sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
+                 lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false));
      if (retcode < 0) {
        tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
                         << remote_info.oldest_gen << ". Writing to error repo for retry"));
      } else {
        // the oldest gen synced cleanly; dispatch every shard of each newer generation
        each = remote_info.generations.cbegin();
        for (; each != remote_info.generations.cend(); each++) {
          for (sid = 0; sid < each->num_shards; sid++) {
            source_bs.shard_id = sid;
            tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
- yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp,
- lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false),
+ yield_spawn_window(data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
+ lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false),
                             cct->_conf->rgw_data_sync_spawn_window,
                             [&](uint64_t stack_id, int ret) {
                               if (ret < 0) {
                                 retcode = ret;
                               }
                               return ret;
                             });
          }
        }
      }
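+      // every generation of this entry has been dispatched; finish the key so
+      // the full-sync marker can advance past it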
+ yield call(marker_tracker->finish(key));
+
return set_cr_done();
}
    return 0;
  }
};
@@ ... @@ RGWDataSyncShardCR::full_sync
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
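+          // the marker for iter->first is finished inside
+          // RGWDataFullSyncSingleEntryCR once all generations have synced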
yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
- lease_cr, bucket_shard_cache, tn), cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
-
- yield call(marker_tracker->finish(iter->first));
+ lease_cr, bucket_shard_cache, &*marker_tracker, tn),
+ cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
}
sync_marker.marker = iter->first;
}
@@ ... @@ RGWDataSyncShardCR::incremental_sync
          continue;
}
tn->log(20, SSTR("received async update notification: " << modified_iter->key));
- spawn(sync_single_entry(sc, source_bs, modified_iter->gen, string(),
- ceph::real_time{}, lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false), false);
+ spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, string(),
+ ceph::real_time{}, lease_cr, bucket_shard_cache, &*marker_tracker, error_repo, tn, false), false);
}
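      // once the retry backoff expires, re-drive the entries recorded in the
      // error repo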
if (error_retry_time <= ceph::coarse_real_clock::now()) {
}
if (!gen) {
// write all full sync obligations for the bucket to error repo
- spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode));
- }
+ spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
} else {
tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
- spawn(sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr,
- bucket_shard_cache, marker_tracker, error_repo, tn, true), false);
+ spawn(data_sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr,
+ bucket_shard_cache, &*marker_tracker, error_repo, tn, true), false);
}
}
if (!omapvals->more) {
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
- yield_spawn_window(sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
+ yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
                                       log_iter->log_timestamp, lease_cr, bucket_shard_cache,
- marker_tracker, error_repo, tn, false),
+ &*marker_tracker, error_repo, tn, false),
cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
}
}