RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
+ rgw_bucket_shard source_bs;
string raw_key;
string entry_marker;
ceph::real_time entry_timestamp;
- rgw_bucket_shard source_bs;
-
int sync_status;
bufferlist md_bl;
RGWSyncTraceNodeRef tn;
public:
- RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc,
+ RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs,
const string& _raw_key, const string& _entry_marker,
ceph::real_time entry_timestamp, RGWDataSyncShardMarkerTrack *_marker_tracker,
- RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- raw_key(_raw_key), entry_marker(_entry_marker),
- entry_timestamp(entry_timestamp), sync_status(0),
- marker_tracker(_marker_tracker),
- error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
+ RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent)
+ : RGWCoroutine(_sc->cct),
+ sc(_sc), sync_env(_sc->env), source_bs(source_bs),
+ raw_key(_raw_key), entry_marker(_entry_marker),
+ entry_timestamp(entry_timestamp), sync_status(0),
+ marker_tracker(_marker_tracker),
+ error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}
int operate() override {
reenter(this) {
do {
- yield {
- int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
- &source_bs.bucket, &source_bs.shard_id);
- if (ret < 0) {
- return set_cr_error(-EIO);
- }
- if (marker_tracker) {
- marker_tracker->reset_need_retry(raw_key);
- }
- tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
-
- call(new RGWRunBucketSourcesSyncCR(sc,
- std::nullopt, /* target_bs */
- source_bs,
- tn));
+ if (marker_tracker) {
+ marker_tracker->reset_need_retry(raw_key);
}
+ tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
+
+ yield call(new RGWRunBucketSourcesSyncCR(sc,
+ std::nullopt, /* target_bs */
+ source_bs,
+ tn));
} while (marker_tracker && marker_tracker->need_retry(raw_key));
sync_status = retcode;
uint32_t retry_backoff_secs;
RGWSyncTraceNodeRef tn;
+
+ rgw_bucket_shard source_bs;
+
+ int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
+ return rgw_bucket_parse_bucket_key(sync_env->cct, key,
+ &bs.bucket, &bs.shard_id);
+ }
public:
RGWDataSyncShardCR(RGWDataSyncCtx *_sc,
rgw_pool& _pool,
tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
iter = entries.begin();
for (; iter != entries.end(); ++iter) {
+ retcode = parse_bucket_key(iter->first, source_bs);
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
+ marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp);
+ continue;
+ }
tn->log(20, SSTR("full sync: " << iter->first));
total_entries++;
if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false);
}
sync_marker.marker = iter->first;
}
/* process out of band updates */
for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
- yield {
- tn->log(20, SSTR("received async update notification: " << *modified_iter));
- spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false);
+ retcode = parse_bucket_key(*modified_iter, source_bs);
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter));
+ continue;
}
+ tn->log(20, SSTR("received async update notification: " << *modified_iter));
+ spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false);
}
if (error_retry_time <= ceph::coarse_real_clock::now()) {
for (; iter != error_entries.end(); ++iter) {
error_marker = iter->first;
entry_timestamp = rgw_error_repo_decode_value(iter->second);
+ retcode = parse_bucket_key(error_marker, source_bs);
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
+ spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, rgw_raw_obj{pool, error_oid},
+ error_marker, entry_timestamp), false);
+ continue;
+ }
tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
- spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker,
+ spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, error_marker, error_marker,
entry_timestamp, nullptr /* no marker tracker */,
error_repo, true, tn), false);
}
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
+ retcode = parse_bucket_key(log_iter->entry.key, source_bs);
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key));
+ marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
+ continue;
+ }
if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
- spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id,
+ spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, log_iter->entry.key, log_iter->log_id,
log_iter->log_timestamp, marker_tracker,
error_repo, false, tn), false);
}