From 4222eb610e2019bad89c8fb190358bef35905ec9 Mon Sep 17 00:00:00 2001
From: Casey Bodley <cbodley@redhat.com>
Date: Tue, 14 Aug 2018 11:16:16 -0400
Subject: [PATCH] rgw: data sync respects error_retry_time for backoff on
 error_repo

don't restart processing the error_repo until error_retry_time. when
data sync is otherwise idle, don't sleep past error_retry_time

Fixes: http://tracker.ceph.com/issues/26938

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit eb655323781ce4d23d6983aa5164d9dc367497e9)

Conflicts:
	src/rgw/rgw_data_sync.cc
---
 src/rgw/rgw_data_sync.cc | 67 +++++++++++++++++++++++++---------------
 1 file changed, 42 insertions(+), 25 deletions(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index f9090024f4714..52a1f6efa89a2 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1330,32 +1330,33 @@ public:
         }
       }
 
-        /* process bucket shards that previously failed */
-        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
-                                             error_marker, &error_entries,
-                                             max_error_entries));
-        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
-        iter = error_entries.begin();
-        for (; iter != error_entries.end(); ++iter) {
-          error_marker = *iter;
-          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
-          spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
-        }
-        if ((int)error_entries.size() != max_error_entries) {
-          if (error_marker.empty() && error_entries.empty()) {
-            /* the retry repo is empty, we back off a bit before calling it again */
-            retry_backoff_secs *= 2;
-            if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
-              retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+        if (error_retry_time <= ceph::coarse_real_clock::now()) {
+          /* process bucket shards that previously failed */
+          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+                                               error_marker, &error_entries,
+                                               max_error_entries));
+          ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+          iter = error_entries.begin();
+          for (; iter != error_entries.end(); ++iter) {
+            error_marker = *iter;
+            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
+          }
+          if ((int)error_entries.size() != max_error_entries) {
+            if (error_marker.empty() && error_entries.empty()) {
+              /* the retry repo is empty, we back off a bit before calling it again */
+              retry_backoff_secs *= 2;
+              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
+                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
+              }
+            } else {
+              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
             }
-          } else {
-            retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
+            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
+            error_marker.clear();
           }
-          error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
-          error_marker.clear();
         }
-
 #define INCREMENTAL_MAX_ENTRIES 100
         ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
         spawned_keys.clear();
@@ -1405,13 +1406,29 @@ public:
         }
         ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
<< " truncated=" << truncated << dendl; if (!truncated) { -#define INCREMENTAL_INTERVAL 20 - yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); - } + yield wait(get_idle_interval()); + } } while (true); } return 0; } + + utime_t get_idle_interval() const { +#define INCREMENTAL_INTERVAL 20 + ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL); + if (!ceph::coarse_real_clock::is_zero(error_retry_time)) { + auto now = ceph::coarse_real_clock::now(); + if (error_retry_time > now) { + auto d = error_retry_time - now; + if (interval > d) { + interval = d; + } + } + } + // convert timespan -> time_point -> utime_t + return utime_t(ceph::coarse_real_clock::zero() + interval); + } + void stop_spawned_services() { lease_cr->go_down(); if (error_repo) { -- 2.39.5