}
}
- /* process bucket shards that previously failed */
- omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
- error_marker, max_error_entries, omapkeys));
- error_entries = std::move(omapkeys->entries);
- tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
- iter = error_entries.begin();
- for (; iter != error_entries.end(); ++iter) {
- error_marker = *iter;
- tn->log(20, SSTR("handle error entry: " << error_marker));
- spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
- }
- if (!omapkeys->more) {
- if (error_marker.empty() && error_entries.empty()) {
- /* the retry repo is empty, we back off a bit before calling it again */
- retry_backoff_secs *= 2;
- if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
- retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ if (error_retry_time <= ceph::coarse_real_clock::now()) {
+ /* process bucket shards that previously failed */
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+ error_marker, max_error_entries, omapkeys));
+ error_entries = std::move(omapkeys->entries);
+ tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
+ iter = error_entries.begin();
+ for (; iter != error_entries.end(); ++iter) {
+ error_marker = *iter;
+ tn->log(20, SSTR("handle error entry: " << error_marker));
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
+ }
+ if (!omapkeys->more) {
+ if (error_marker.empty() && error_entries.empty()) {
+ /* the retry repo is empty, we back off a bit before calling it again */
+ retry_backoff_secs *= 2;
+ if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ }
+ } else {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
}
- } else {
- retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
+ error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
+ error_marker.clear();
}
- error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
- error_marker.clear();
}
omapkeys.reset();
-
#define INCREMENTAL_MAX_ENTRIES 100
tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
spawned_keys.clear();
if (!truncated) {
// we reached the end, wait a while before checking for more
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-#define INCREMENTAL_INTERVAL 20
- yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
- }
+ yield wait(get_idle_interval());
+ }
} while (true);
}
return 0;
}
+
+ utime_t get_idle_interval() const {
+#define INCREMENTAL_INTERVAL 20
+ ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
+ if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+ auto now = ceph::coarse_real_clock::now();
+ if (error_retry_time > now) {
+ auto d = error_retry_time - now;
+ if (interval > d) {
+ interval = d;
+ }
+ }
+ }
+ // convert timespan -> time_point -> utime_t
+ return utime_t(ceph::coarse_real_clock::zero() + interval);
+ }
+
void stop_spawned_services() {
lease_cr->go_down();
if (error_repo) {