From eb655323781ce4d23d6983aa5164d9dc367497e9 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 14 Aug 2018 11:16:16 -0400 Subject: [PATCH] rgw: data sync respects error_retry_time for backoff on error_repo don't restart processing the error_repo until error_retry_time. when data sync is otherwise idle, don't sleep past error_retry_time Fixes: http://tracker.ceph.com/issues/26938 Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 69 +++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 26 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 1614b51030059..5ccbdc7f459ec 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1410,34 +1410,35 @@ public: } } - /* process bucket shards that previously failed */ - omapkeys = std::make_shared(); - yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid), - error_marker, max_error_entries, omapkeys)); - error_entries = std::move(omapkeys->entries); - tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries")); - iter = error_entries.begin(); - for (; iter != error_entries.end(); ++iter) { - error_marker = *iter; - tn->log(20, SSTR("handle error entry: " << error_marker)); - spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false); - } - if (!omapkeys->more) { - if (error_marker.empty() && error_entries.empty()) { - /* the retry repo is empty, we back off a bit before calling it again */ - retry_backoff_secs *= 2; - if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) { - retry_backoff_secs = RETRY_BACKOFF_SECS_MAX; + if (error_retry_time <= ceph::coarse_real_clock::now()) { + /* process bucket shards that previously failed */ + omapkeys = std::make_shared(); + yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid), + error_marker, max_error_entries, omapkeys)); + error_entries = std::move(omapkeys->entries); + tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries")); + iter = error_entries.begin(); + for (; iter != error_entries.end(); ++iter) { + error_marker = *iter; + tn->log(20, SSTR("handle error entry: " << error_marker)); + spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false); + } + if (!omapkeys->more) { + if (error_marker.empty() && error_entries.empty()) { + /* the retry repo is empty, we back off a bit before calling it again */ + retry_backoff_secs *= 2; + if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) { + retry_backoff_secs = RETRY_BACKOFF_SECS_MAX; + } + } else { + retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT; } - } else { - retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT; + error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs); + error_marker.clear(); } - error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs); - error_marker.clear(); } omapkeys.reset(); - #define INCREMENTAL_MAX_ENTRIES 100 tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker)); spawned_keys.clear(); @@ -1494,13 +1495,29 @@ public: if (!truncated) { // we reached the end, wait a while before checking for more tn->unset_flag(RGW_SNS_FLAG_ACTIVE); -#define INCREMENTAL_INTERVAL 20 - yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); - } + yield wait(get_idle_interval()); + } } while (true); } return 0; } + + utime_t get_idle_interval() const { +#define INCREMENTAL_INTERVAL 20 + ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL); + if (!ceph::coarse_real_clock::is_zero(error_retry_time)) { + auto now = ceph::coarse_real_clock::now(); + if (error_retry_time > now) { + auto d = error_retry_time - now; + if (interval > d) { + interval = d; + } + } + } + // convert timespan -> time_point -> utime_t + return utime_t(ceph::coarse_real_clock::zero() + interval); + } + void stop_spawned_services() { lease_cr->go_down(); if (error_repo) { -- 2.39.5