RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw::bucket_sync::Handle state; // cached bucket-shard state
- rgw_data_sync_obligation obligation;
+ rgw_data_sync_obligation obligation; // input obligation
+ std::optional<rgw_data_sync_obligation> complete; // obligation to complete
RGWDataSyncShardMarkerTrack *marker_tracker;
boost::intrusive_ptr<RGWOmapAppend> error_repo;
RGWSyncTraceNodeRef tn;
int operate() override {
reenter(this) {
- do {
- if (marker_tracker) {
- marker_tracker->reset_need_retry(obligation.key);
+ if (state->obligation) {
+ // this is already syncing in another DataSyncSingleEntryCR
+ if (state->obligation->timestamp < obligation.timestamp) {
+ // cancel existing obligation and overwrite it
+ tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
+ complete = std::move(*state->obligation);
+ *state->obligation = std::move(obligation);
+ } else {
+ // cancel new obligation
+ tn->log(10, SSTR("canceling new obligation " << obligation));
+ complete = std::move(obligation);
}
- tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key}));
+ } else {
+ // start syncing a new obligation
+ state->obligation = obligation;
- yield call(new RGWRunBucketSourcesSyncCR(sc,
- std::nullopt, /* target_bs */
- state->key,
- tn, &progress));
- if (retcode == 0) {
- tn->log(20, SSTR("RunBucketSources progress=" << progress));
- }
- } while (marker_tracker && marker_tracker->need_retry(obligation.key));
+ do {
+ yield {
+ tn->log(4, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key} << " " << *state->obligation));
+
+ call(new RGWRunBucketSourcesSyncCR(sc,
+ std::nullopt, /* target_bs */
+ state->key,
+ tn, &progress));
+ }
+ } while (marker_tracker && marker_tracker->need_retry(obligation.key));
+ tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}));
+
+ // any new obligations will process themselves
+ complete = std::move(*state->obligation);
+ state->obligation.reset();
+ }
sync_status = retcode;
if (sync_status == -ENOENT) {
// this was added when 'tenant/' was added to datalog entries, because
// preexisting tenant buckets could never sync and would stay in the
// error_repo forever
- tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << obligation.key));
+ tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key));
sync_status = 0;
}
if (sync_status < 0) {
// write actual sync failures for 'radosgw-admin sync error list'
if (sync_status != -EBUSY && sync_status != -EAGAIN) {
- yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", obligation.key,
+ yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", complete->key,
-sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
}
}
- if (error_repo) {
+ if (error_repo && complete->timestamp != ceph::real_time{}) {
+ tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
- obligation.key, obligation.timestamp));
+ complete->key, complete->timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
}
}
- } else if (error_repo && obligation.retry) {
+ } else if (error_repo && complete->retry) {
yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
- obligation.key, obligation.timestamp));
+ complete->key, complete->timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
<< error_repo->get_obj() << " retcode=" << retcode));
}
}
/* FIXME: what do do in case of error */
- if (marker_tracker && !obligation.marker.empty()) {
+ if (marker_tracker && !complete->marker.empty()) {
/* update marker */
- yield call(marker_tracker->finish(obligation.marker));
+ yield call(marker_tracker->finish(complete->marker));
}
if (sync_status == 0) {
sync_status = retcode;