RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
std::optional<uint64_t> gen,
- const std::string& marker,
+ const std::string marker,
ceph::real_time timestamp,
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache,
lease_cr.get(), tn);
}
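+// RGWHandleFullSyncCR: runs full sync for a single datalog entry. It reads
+// the remote bucket's index log info, syncs the first shard of the oldest
+// generation, then syncs every shard of every generation, writing any
+// failures to the error repo for later retry.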
+class RGWHandleFullSyncCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+ rgw_bucket_shard source_bs;
+ const std::string key;
+ rgw_raw_obj error_repo;
+ ceph::real_time timestamp;
+ boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+ boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
+ std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
+ RGWSyncTraceNodeRef tn;
+ rgw_bucket_index_marker_info remote_info;
+ uint32_t sid;
+ std::vector<store_gen_shards>::iterator each;
+
+public:
+ RGWHandleFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+ const std::string& _key, rgw_raw_obj& _error_repo,
+ ceph::real_time& _timestamp, boost::intrusive_ptr<RGWContinuousLeaseCR> _lease_cr,
+ boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
+ std::optional<RGWDataSyncShardMarkerTrack> _marker_tracker,
+ RGWSyncTraceNodeRef& _tn)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key),
+ error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr),
+ bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
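+      // fetch the remote bucket index log info: the oldest generation and
+      // the shard count of each generation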
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
+ if (retcode < 0) {
+ tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
+ << source_bs.shard_id << " to error repo for retry"));
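+        // no generation is encoded in the error key because the remote
+        // info could not be read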
+ yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+ rgw::error_repo::encode_key(source_bs, std::nullopt),
+ timestamp));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
+ }
+ return set_cr_error(retcode);
+ }
+
+      // Sync the first shard of the oldest generation and wait for it to
+      // complete, then sync the remaining shards concurrently. If any
+      // operation fails, write the affected shards to the error repo for
+      // later retry.
+
+ source_bs.shard_id = 0;
+ yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
+ lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
+ if (retcode < 0) {
+ tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
+ << remote_info.oldest_gen << ". Writing to error repo for retry"));
+ yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+ rgw::error_repo::encode_key(source_bs, remote_info.oldest_gen),
+ timestamp));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to write " << remote_info.oldest_gen << ":" << source_bs.shard_id
+ << " in error repo: retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
+ }
+      each = remote_info.generations.begin();
+      for (; each != remote_info.generations.end(); each++) {
+        for (sid = 0; sid < each->num_shards; sid++) {
+          source_bs.shard_id = sid;
+          tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
+          yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp,
+                             lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
+                             cct->_conf->rgw_data_sync_spawn_window,
+                             [&](uint64_t stack_id, int ret) {
+                               if (ret < 0) {
+                                 // on failure, record the current shard and everything after it
+                                 // (the rest of this generation, then every shard of each newer
+                                 // generation) in the error repo for later retry. local copies
+                                 // keep the outer loop state intact; failures of these spawned
+                                 // writes are reported by drain_all_cb() below
+                                 rgw_bucket_shard bs = source_bs;
+                                 for (auto gen = each; gen != remote_info.generations.end(); ++gen) {
+                                   for (uint32_t s = (gen == each ? sid : 0); s < gen->num_shards; ++s) {
+                                     bs.shard_id = s;
+                                     spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                                                     rgw::error_repo::encode_key(bs, gen->gen),
+                                                                     timestamp), false);
+                                   }
+                                 }
+                               }
+                               return 0;
+                             });
+        }
+      }
+      // wait for all spawned stacks to complete before reporting success
+      drain_all_cb([&](uint64_t stack_id, int ret) {
+        if (ret < 0) {
+          tn->log(10, SSTR("a sync operation returned error: " << ret));
+        }
+        return ret;
+      });
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
#define DATA_SYNC_MAX_ERR_ENTRIES 10
class RGWDataSyncShardCR : public RGWCoroutine {
if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
- if (retcode < 0) {
- tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
- << source_bs.shard_id << " to error repo for retry"));
- yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(source_bs, std::nullopt),
- entry_timestamp));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
- }
- return set_cr_error(retcode);
- }
-
- //wait to sync the first shard of the oldest generation and then sync all other shards.
- //if any of the operations fail at any time, write them into error repo for later retry.
-
- source_bs.shard_id = 0;
- yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, iter->first, entry_timestamp,
- lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
- if (retcode < 0) {
- tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
- << remote_info.oldest_gen << ". Writing to error repo for retry"));
- yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(source_bs, remote_info.oldest_gen),
- entry_timestamp));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to write " << remote_info.oldest_gen << ":" << source_bs.shard_id
- << " in error repo: retcode=" << retcode));
- return set_cr_error(retcode);
- }
- }
- each = remote_info.generations.begin();
- for (; each != remote_info.generations.end(); each++) {
- for (sid = 0; sid < each->num_shards; sid++) {
- source_bs.shard_id = sid;
- tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
- yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, iter->first, entry_timestamp,
- lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
- cct->_conf->rgw_data_sync_spawn_window,
- [&](uint64_t stack_id, int ret) {
- if (ret < 0) {
- sid = source_bs.shard_id;
- for (; sid < each->num_shards; sid++) {
- source_bs.shard_id = sid;
- spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(source_bs, each->gen),
- entry_timestamp), false);
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
- << sid << " to error repo: retcode=" << retcode));
- }
- }
- auto i = std::distance(remote_info.generations.begin(), each);
- for (each[i]; each != remote_info.generations.end(); each++) {
- for (sid = 0; sid < each->num_shards; sid++){
- spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(source_bs, each->gen),
- entry_timestamp), false);
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
- << sid << " to error repo: retcode=" << retcode));
- }
- }
- }
- }
- return 0;
- });
- drain_all_cb([&](uint64_t stack_id, int ret) {
- if (ret < 0) {
- tn->log(10, SSTR("a sync operation returned error: " << ret));
- }
- return ret;
- });
- }
- }
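+          // full sync for this entry runs in its own coroutine, which keeps
+          // its own remote_info and generation/shard iteration state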
+ yield call(new RGWHandleFullSyncCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
+ lease_cr, bucket_shard_cache, marker_tracker, tn));
}
sync_marker.marker = iter->first;
}