vector<rgw_data_change_log_entry> log_entries;
decltype(log_entries)::iterator log_iter;
bool truncated = false;
+ int cbret = 0;
utime_t get_idle_interval() const {
ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
modified_iter != current_modified.end();
++modified_iter) {
if (!lease_cr->is_locked()) {
- yield call(marker_tracker->flush());
drain_all();
+ yield call(marker_tracker->flush());
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
retcode = parse_bucket_key(modified_iter->key, source_bs);
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
if (!lease_cr->is_locked()) {
- yield call(marker_tracker->flush());
drain_all();
+ yield call(marker_tracker->flush());
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
error_marker = iter->first;
log_iter != log_entries.end();
++log_iter) {
if (!lease_cr->is_locked()) {
- yield call(marker_tracker->flush());
drain_all();
+ yield call(marker_tracker->flush());
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
<< ". Duplicate entry?"));
} else {
- tn->log(1, SSTR("incremental sync on " << log_iter->entry.key
- << "shard: " << shard_id << "on gen "
- << log_iter->entry.gen));
- yield_spawn_window(
- data_sync_single_entry(sc, source_bs,log_iter->entry.gen,
- log_iter->log_id, log_iter->log_timestamp,
- lease_cr,bucket_shard_cache,
- &*marker_tracker, error_repo, tn, false),
- sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
+ tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen));
+ yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
+ log_iter->log_timestamp, lease_cr,bucket_shard_cache,
+ &*marker_tracker, error_repo, tn, false),
+ sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
+ [&](uint64_t stack_id, int ret) {
+ if (ret < 0) {
+ tn->log(10, SSTR("data_sync_single_entry returned error: " << ret));
+ cbret = ret;
+ }
+ return 0;
+ });
}
}
+ if (cbret < 0 ) {
+ retcode = cbret;
+ drain_all();
+ return set_cr_error(retcode);
+ }
tn->log(20, SSTR("shard_id=" << shard_id <<
" sync_marker="<< sync_marker.marker
total_entries = sync_status.full.count;
do {
if (lease_cr && !lease_cr->is_locked()) {
- drain_all();
- yield call(marker_tracker.flush());
tn->log(1, "no lease or lease is lost, abort");
+ drain_all();
+ yield call(marker_tracker.flush());
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
set_status("listing remote bucket");
drain_all();
yield call(marker_tracker.flush());
tn->log(1, "no lease or lease is lost, abort");
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
tn->log(20, SSTR("[full sync] syncing object: "
if (lease_cr && !lease_cr->is_locked()) {
tn->log(1, "no lease or lease is lost, abort");
yield call(marker_tracker.flush());
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
yield call(marker_tracker.flush());
if (retcode < 0) {
- tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
+ tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
/* update sync state to incremental */
reenter(this) {
do {
if (lease_cr && !lease_cr->is_locked()) {
+ tn->log(1, "no lease or lease is lost, abort");
drain_all();
yield call(marker_tracker.flush());
- tn->log(1, "no lease or lease is lost, abort");
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
entries_iter = list_result.begin();
for (; entries_iter != entries_end; ++entries_iter) {
if (lease_cr && !lease_cr->is_locked()) {
- drain_all();
- yield call(marker_tracker.flush());
tn->log(1, "no lease or lease is lost, abort");
+ drain_all();
+ yield call(marker_tracker.flush());
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
return set_cr_error(-ECANCELED);
}
entry = &(*entries_iter);
yield call(marker_tracker.flush());
if (retcode < 0) {
- tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
+ tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
return set_cr_error(retcode);
}
if (sync_status < 0) {