RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_shard source_bs;
+ std::string error_marker;
ceph::real_time timestamp;
- int *error_result;
RGWSyncTraceNodeRef tn;
rgw_bucket_index_marker_info remote_info;
rgw_pool pool;
}
public:
- RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
- ceph::real_time& _timestamp, int *_error_result, RGWSyncTraceNodeRef& _tn)
+ RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker,
+ ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- source_bs(_source_bs), timestamp(_timestamp), error_result(_error_result), tn(_tn) {
+      source_bs(_source_bs), error_marker(std::move(_error_marker)), timestamp(_timestamp), tn(_tn) {
tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs)));
}
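+  // completion callback for a spawned error-repo write: log the failure and
+  // propagate it to the collector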
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, SSTR("writing to error repo returned error: " << ret));
- *error_result = ret;
}
- return 0;
- });
+ return ret;
+ });
}
}
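+      // wait for any remaining error-repo writes and surface their status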
drain_all_cb([&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, SSTR("writing to error repo returned error: " << ret));
- *error_result = ret;
}
- return 0;
+ return ret;
});
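+      // all obligations are recorded; drop the error entry that triggered
+      // this retry so it is not picked up again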
+ spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+ error_marker, timestamp), false);
+
return set_cr_done();
}
return 0;
lease_cr.get(), tn);
}
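+// Syncs one full-sync entry (a bucket) across every generation and shard of
+// its bucket index log, writing failed shards to the error repo for retry.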
-class RGWHandleFullSyncCR : public RGWCoroutine {
+class RGWDataFullSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_shard source_bs;
ceph::real_time timestamp;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
- std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
RGWSyncTraceNodeRef tn;
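+  // index log layout of the source bucket (one entry per generation)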
rgw_bucket_index_marker_info remote_info;
uint32_t sid;
std::vector<store_gen_shards>::iterator each;
public:
- RGWHandleFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+ RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
const std::string& _key, rgw_raw_obj& _error_repo,
ceph::real_time& _timestamp, boost::intrusive_ptr<RGWContinuousLeaseCR> _lease_cr,
boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
- std::optional<RGWDataSyncShardMarkerTrack> _marker_tracker,
RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key),
error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr),
- bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {}
+ bucket_shard_cache(_bucket_shard_cache), tn(_tn) {}
int operate(const DoutPrefixProvider *dpp) override {
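+      // sync the first shard of the oldest generation up front and wait for
+      // the result before fanning out to the remaining shards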
source_bs.shard_id = 0;
yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
- lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
+ lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false));
      if (retcode < 0) {
        tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
            << remote_info.oldest_gen << ". Writing to error repo for retry"));
+      }
+      // walk every shard of every generation: while retcode is non-negative,
+      // sync each shard; once anything fails, write the remaining shards to
+      // the error repo for retry instead
      each = remote_info.generations.begin();
for (; each != remote_info.generations.end(); each++) {
for (sid = 0; sid < each->num_shards; sid++) {
+ if (retcode < 0) {
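+              // a sync failed: rewind to the failed shard and write it, plus
+              // every remaining shard of this generation, to the error repo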
+ sid = source_bs.shard_id;
+ for (; sid < each->num_shards; sid++) {
+ source_bs.shard_id = sid;
+ yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+ rgw::error_repo::encode_key(source_bs, each->gen),
+ timestamp), cct->_conf->rgw_data_sync_spawn_window,
+ [&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+ << sid << " to error repo: retcode=" << ret));
+ }
+ return ret;
+ });
+ }
+              // then record every shard of each later generation as well
+              for (++each; each != remote_info.generations.end(); each++) {
+                for (sid = 0; sid < each->num_shards; sid++) {
+                  source_bs.shard_id = sid;
+ yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+ rgw::error_repo::encode_key(source_bs, each->gen),
+ timestamp), cct->_conf->rgw_data_sync_spawn_window,
+ [&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+ << sid << " to error repo: retcode=" << ret));
+ }
+ return ret;
+ });
+ }
+ }
+              // nothing is left to sync once the obligations are recorded;
+              // wait for the writes and finish
+              drain_all_cb([&](uint64_t stack_id, int ret) {
+                if (ret < 0) {
+                  tn->log(10, SSTR("a sync operation returned error: " << ret));
+                }
+                return ret;
+              });
+              return set_cr_done();
+ } else {
source_bs.shard_id = sid;
tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp,
- lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
+ lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false),
cct->_conf->rgw_data_sync_spawn_window,
[&](uint64_t stack_id, int ret) {
- if (ret < 0) {
- sid = source_bs.shard_id;
- for (; sid < each->num_shards; sid++) {
- source_bs.shard_id = sid;
- spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(source_bs, each->gen),
- timestamp), false);
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
- << sid << " to error repo: retcode=" << retcode));
- }
- }
- auto i = std::distance(remote_info.generations.begin(), each);
- for (each[i]; each != remote_info.generations.end(); each++) {
- for (sid = 0; sid < each->num_shards; sid++){
- spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
- rgw::error_repo::encode_key(source_bs, each->gen),
- timestamp), false);
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
- << sid << " to error repo: retcode=" << retcode));
- }
- }
+                  if (ret < 0) {
+                    // remember the failure so the next iteration switches to
+                    // the error-repo path for the remaining shards
+                    retcode = ret;
}
- }
- return 0;
- });
- drain_all_cb([&](uint64_t stack_id, int ret) {
- if (ret < 0) {
- tn->log(10, SSTR("a sync operation returned error: " << ret));
- }
- return ret;
- });
+                  return ret;
+ });
+ }
}
}
+
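+      // drain all outstanding syncs and error-repo writes before completing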
+ drain_all_cb([&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ tn->log(10, SSTR("a sync operation returned error: " << ret));
+ }
+ return ret;
+ });
+
return set_cr_done();
}
return 0;
rgw_bucket_shard source_bs;
std::optional<uint64_t> gen;
- int error_result{0};
// target number of entries to cache before recycling idle ones
static constexpr size_t target_cache_size = 256;
boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
- rgw_bucket_index_marker_info remote_info;
- uint32_t sid;
- std::vector<store_gen_shards>::iterator each;
int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
return rgw_bucket_parse_bucket_key(sync_env->cct, key,
if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
- yield call(new RGWHandleFullSyncCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
- lease_cr, bucket_shard_cache, marker_tracker, tn));
+            yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, source_bs, iter->first,
+                error_repo, entry_timestamp, lease_cr, bucket_shard_cache, tn),
+                cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+
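+              // the single-entry CR no longer drives the marker tracker, so
+              // finish the marker here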
+ yield call(marker_tracker->finish(iter->first));
}
sync_marker.marker = iter->first;
}
}
if (!gen) {
// write all full sync obligations for the bucket to error repo
- spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, entry_timestamp, &error_result, tn), false);
- if (error_result == 0) {
- spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
- error_marker, entry_timestamp), false);
- } else {
- tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << error_result));
+          // RGWFullSyncErrorRepoCR logs its own write failures and removes the
+          // error marker itself, so there is no retcode to check after spawn()
+          spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
} else {
tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));