}
}
- /* process bucket shards that previously failed */
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
- error_marker, &error_entries,
- max_error_entries));
- ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
- iter = error_entries.begin();
- for (; iter != error_entries.end(); ++iter) {
- error_marker = *iter;
- ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
- spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
- }
- if ((int)error_entries.size() != max_error_entries) {
- if (error_marker.empty() && error_entries.empty()) {
- /* the retry repo is empty, we back off a bit before calling it again */
- retry_backoff_secs *= 2;
- if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
- retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ if (error_retry_time <= ceph::coarse_real_clock::now()) {
+ /* process bucket shards that previously failed */
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+ error_marker, &error_entries,
+ max_error_entries));
+ ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+ iter = error_entries.begin();
+ for (; iter != error_entries.end(); ++iter) {
+ error_marker = *iter;
+ ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
+ }
+ if ((int)error_entries.size() != max_error_entries) {
+ if (error_marker.empty() && error_entries.empty()) {
+ /* the retry repo is empty, we back off a bit before calling it again */
+ retry_backoff_secs *= 2;
+ if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+ }
+ } else {
+ retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
}
- } else {
- retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
+ error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
+ error_marker.clear();
}
- error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
- error_marker.clear();
}
-
#define INCREMENTAL_MAX_ENTRIES 100
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
spawned_keys.clear();
}
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
if (!truncated) {
-#define INCREMENTAL_INTERVAL 20
- yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
- }
+ yield wait(get_idle_interval());
+ }
} while (true);
}
return 0;
}
+
+ utime_t get_idle_interval() const {
+#define INCREMENTAL_INTERVAL 20
+ ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
+ if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+ auto now = ceph::coarse_real_clock::now();
+ if (error_retry_time > now) {
+ auto d = error_retry_time - now;
+ if (interval > d) {
+ interval = d;
+ }
+ }
+ }
+ // convert timespan -> time_point -> utime_t
+ return utime_t(ceph::coarse_real_clock::zero() + interval);
+ }
+
void stop_spawned_services() {
lease_cr->go_down();
if (error_repo) {