class RGWDataSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
-
rgw_bucket_shard source_bs;
- string raw_key;
- string entry_marker;
- ceph::real_time entry_timestamp;
-
- int sync_status;
-
- bufferlist md_bl;
-
+ rgw_data_sync_obligation obligation;
RGWDataSyncShardMarkerTrack *marker_tracker;
-
boost::intrusive_ptr<RGWOmapAppend> error_repo;
- bool remove_from_repo;
-
RGWSyncTraceNodeRef tn;
+
+ int sync_status = 0;
public:
RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs,
- const string& _raw_key, const string& _entry_marker,
- ceph::real_time entry_timestamp, RGWDataSyncShardMarkerTrack *_marker_tracker,
- RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent)
+ rgw_data_sync_obligation obligation, RGWDataSyncShardMarkerTrack *_marker_tracker,
+ RGWOmapAppend *_error_repo, const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env), source_bs(source_bs),
- raw_key(_raw_key), entry_marker(_entry_marker),
- entry_timestamp(entry_timestamp), sync_status(0),
+ obligation(std::move(obligation)),
marker_tracker(_marker_tracker),
- error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
- set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
- tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
+ error_repo(_error_repo) {
+ set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
+ tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
}
int operate() override {
reenter(this) {
do {
if (marker_tracker) {
- marker_tracker->reset_need_retry(raw_key);
+ marker_tracker->reset_need_retry(obligation.key);
}
tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
std::nullopt, /* target_bs */
source_bs,
tn));
- } while (marker_tracker && marker_tracker->need_retry(raw_key));
+ } while (marker_tracker && marker_tracker->need_retry(obligation.key));
sync_status = retcode;
// this was added when 'tenant/' was added to datalog entries, because
// preexisting tenant buckets could never sync and would stay in the
// error_repo forever
- tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
+ tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << obligation.key));
sync_status = 0;
}
if (sync_status < 0) {
// write actual sync failures for 'radosgw-admin sync error list'
if (sync_status != -EBUSY && sync_status != -EAGAIN) {
- yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", raw_key,
+ yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", obligation.key,
-sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
}
if (error_repo) {
yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
- raw_key, entry_timestamp));
+ obligation.key, obligation.timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
}
}
- } else if (error_repo && remove_from_repo) {
+ } else if (error_repo && obligation.retry) {
yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
- raw_key, entry_timestamp));
+ obligation.key, obligation.timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
<< error_repo->get_obj() << " retcode=" << retcode));
}
}
/* FIXME: what do do in case of error */
- if (marker_tracker && !entry_marker.empty()) {
+ if (marker_tracker && !obligation.marker.empty()) {
/* update marker */
- yield call(marker_tracker->finish(entry_marker));
+ yield call(marker_tracker->finish(obligation.marker));
}
if (sync_status == 0) {
sync_status = retcode;
const std::string& key,
const std::string& marker,
ceph::real_time timestamp, bool retry) {
- return new RGWDataSyncSingleEntryCR(sc, src, key, marker, timestamp,
- &*marker_tracker, error_repo, retry, tn);
+ auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
+ return new RGWDataSyncSingleEntryCR(sc, src, std::move(obligation),
+ &*marker_tracker, error_repo, tn);
}
public:
RGWDataSyncShardCR(RGWDataSyncCtx *_sc,