}
}
- /* process bucket shards that previously failed */
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
- error_marker, &error_entries,
- max_error_entries));
- tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
- iter = error_entries.begin();
- for (; iter != error_entries.end(); ++iter) {
- error_marker = *iter;
- tn->log(20, SSTR("handle error entry: " << error_marker));
- spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
- }
- if ((int)error_entries.size() != max_error_entries) {
- if (error_marker.empty() && error_entries.empty()) {
- /* the retry repo is empty, we back off a bit before calling it again */
- retry_backoff_secs *= 2;
- if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
- retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ if (error_retry_time <= ceph::coarse_real_clock::now()) {
+ /* process bucket shards that previously failed */
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+ error_marker, &error_entries,
+ max_error_entries));
+ tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
+ iter = error_entries.begin();
+ for (; iter != error_entries.end(); ++iter) {
+ error_marker = *iter;
+ tn->log(20, SSTR("handle error entry: " << error_marker));
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
+ }
+ if ((int)error_entries.size() != max_error_entries) {
+ if (error_marker.empty() && error_entries.empty()) {
+ /* the retry repo is empty, we back off a bit before calling it again */
+ retry_backoff_secs *= 2;
+ if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ }
+ } else {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
}
- } else {
- retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
+ error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
+ error_marker.clear();
}
- error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
- error_marker.clear();
}
-
yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to fetch remote data log info: ret=" << retcode));
tn->log(20, SSTR("shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker));
if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-#define INCREMENTAL_INTERVAL 20
- yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
+ yield wait(get_idle_interval());
}
} while (true);
}
return 0;
}
+
+ /// Return how long the incremental sync loop should sleep when idle.
+ /// Defaults to INCREMENTAL_INTERVAL (20s), but is shortened so the
+ /// coroutine wakes no later than error_retry_time when an error-repo
+ /// retry is pending (error_retry_time gates the retry pass at the top
+ /// of the loop; it is zero when no retry has been scheduled).
+ utime_t get_idle_interval() const {
+#define INCREMENTAL_INTERVAL 20
+ ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
+ if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+ // a retry is scheduled; if it comes due before the default interval
+ // elapses, shorten the sleep so the retry isn't delayed
+ auto now = ceph::coarse_real_clock::now();
+ if (error_retry_time > now) {
+ auto d = error_retry_time - now;
+ if (interval > d) {
+ interval = d;
+ }
+ }
+ }
+ // convert timespan -> time_point -> utime_t
+ return utime_t(ceph::coarse_real_clock::zero() + interval);
+ }
+
void stop_spawned_services() {
lease_cr->go_down();
if (error_repo) {