From b5d5da9c4d2a7bb07bdc6b51b9f58831d220d99b Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Tue, 11 Oct 2022 14:49:58 -0400 Subject: [PATCH] rgw: Reduce data sync parallelism in response to RADOS lock latency Lock latency in RGWContinuousLeaseCR gets high enough under load that the locks end up timing out, leading to incorrect behavior. Monitor lock latency and cut concurrent operations in half if it goes above ten seconds. Cut currency to one if it goes about twenty seconds. Signed-off-by: Adam C. Emerson --- src/rgw/driver/rados/rgw_cr_rados.cc | 8 ++++++ src/rgw/driver/rados/rgw_cr_rados.h | 32 +++++++++++++++++++++-- src/rgw/driver/rados/rgw_data_sync.cc | 25 +++++++++--------- src/rgw/driver/rados/rgw_data_sync.h | 37 ++++++++++++++++++++++++++- src/rgw/driver/rados/rgw_sync.cc | 8 +++--- 5 files changed, 91 insertions(+), 19 deletions(-) diff --git a/src/rgw/driver/rados/rgw_cr_rados.cc b/src/rgw/driver/rados/rgw_cr_rados.cc index eb5db86f25a9f..7d9f3957dcbcd 100644 --- a/src/rgw/driver/rados/rgw_cr_rados.cc +++ b/src/rgw/driver/rados/rgw_cr_rados.cc @@ -937,7 +937,11 @@ int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp) reenter(this) { last_renew_try_time = ceph::coarse_mono_clock::now(); while (!going_down) { + current_time = ceph::coarse_mono_clock::now(); yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval)); + if (latency) { + latency->add_latency(ceph::coarse_mono_clock::now() - current_time); + } current_time = ceph::coarse_mono_clock::now(); if (current_time - last_renew_try_time > interval_tolerance) { // renewal should happen between 50%-90% of interval @@ -957,7 +961,11 @@ int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp) yield wait(utime_t(interval / 2, 0)); } set_locked(false); /* moot at this point anyway */ + current_time = ceph::coarse_mono_clock::now(); yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie)); + if (latency) { + latency->add_latency(ceph::coarse_mono_clock::now() - current_time); + } return set_state(RGWCoroutine_Done); } return 0; diff --git a/src/rgw/driver/rados/rgw_cr_rados.h b/src/rgw/driver/rados/rgw_cr_rados.h index 778f66fa2539b..3451376ee6eaa 100644 --- a/src/rgw/driver/rados/rgw_cr_rados.h +++ b/src/rgw/driver/rados/rgw_cr_rados.h @@ -1421,6 +1421,31 @@ public: } }; +/// \brief Collect average latency +/// +/// Used in data sync to back off on concurrency when latency of lock +/// operations rises. +/// +/// \warning This class is not thread safe. We do not use a mutex +/// because all coroutines spawned by RGWDataSyncCR share a single thread. +class LatencyMonitor { + ceph::timespan total; + std::uint64_t count = 0; + +public: + + LatencyMonitor() = default; + void add_latency(ceph::timespan latency) { + total += latency; + ++count; + } + + ceph::timespan avg_latency() { + using namespace std::literals; + return count == 0 ? 0s : total / count; + } +}; + class RGWContinuousLeaseCR : public RGWCoroutine { RGWAsyncRadosProcessor* async_rados; rgw::sal::RadosStore* store; @@ -1444,15 +1469,18 @@ class RGWContinuousLeaseCR : public RGWCoroutine { ceph::coarse_mono_time last_renew_try_time; ceph::coarse_mono_time current_time; + LatencyMonitor* latency; + public: RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados, rgw::sal::RadosStore* _store, rgw_raw_obj obj, std::string lock_name, - int interval, RGWCoroutine* caller) + int interval, RGWCoroutine* caller, + LatencyMonitor* const latency) : RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store), obj(std::move(obj)), lock_name(std::move(lock_name)), interval(interval), interval_tolerance(ceph::make_timespan(9*interval/10)), - ts_interval(ceph::make_timespan(interval)), caller(caller) + ts_interval(ceph::make_timespan(interval)), caller(caller), latency(latency) {} virtual ~RGWContinuousLeaseCR() override; diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc index 1b5ad619beff0..e51690974ebd4 100644 --- a/src/rgw/driver/rados/rgw_data_sync.cc +++ b/src/rgw/driver/rados/rgw_data_sync.cc @@ -561,7 +561,7 @@ public: sc->env->async_rados, sc->env->driver, { sc->env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) }, - string(lock_name), lock_duration, caller); + string(lock_name), lock_duration, caller, &sc->lcc); } int operate(const DoutPrefixProvider *dpp) override { @@ -1534,7 +1534,7 @@ public: tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry")); yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo, rgw::error_repo::encode_key(bs, each->gen), - timestamp), cct->_conf->rgw_data_sync_spawn_window, + timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), [&](uint64_t stack_id, int ret) { if (ret < 0) { retcode = ret; @@ -1660,7 +1660,7 @@ public: tn->log(10, SSTR("Write " << source_bs.shard_id << " to error repo for retry")); yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo, rgw::error_repo::encode_key(source_bs, each->gen), - timestamp), cct->_conf->rgw_data_sync_spawn_window, std::nullopt); + timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt); } else { shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp, lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false); @@ -1669,7 +1669,7 @@ public: yield call(shard_cr); first_shard = false; } else { - yield_spawn_window(shard_cr, cct->_conf->rgw_data_sync_spawn_window, + yield_spawn_window(shard_cr, sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), [&](uint64_t stack_id, int ret) { if (ret < 0) { retcode = ret; @@ -1807,7 +1807,7 @@ public: sc, pool, source_bs, iter->first, sync_status, error_repo, entry_timestamp, lease_cr, bucket_shard_cache, &*marker_tracker, tn), - cct->_conf->rgw_data_sync_spawn_window, + sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt); } sync_marker.marker = iter->first; @@ -2026,7 +2026,7 @@ public: log_iter->log_id, log_iter->log_timestamp, lease_cr,bucket_shard_cache, &*marker_tracker, error_repo, tn, false), - cct->_conf->rgw_data_sync_spawn_window, std::nullopt); + sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt); } } @@ -2179,7 +2179,8 @@ public: auto driver = sync_env->driver; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, driver, rgw_raw_obj(pool, status_oid), - lock_name, lock_duration, this)); + lock_name, lock_duration, this, + &sc->lcc)); lease_stack.reset(spawn(lease_cr.get(), false)); } }; @@ -4511,7 +4512,7 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp) entry->key, &marker_tracker, zones_trace, tn), false); } - drain_with_cb(cct->_conf->rgw_bucket_sync_spawn_window, + drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window), [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); @@ -4908,7 +4909,7 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp) false); } // } - drain_with_cb(cct->_conf->rgw_bucket_sync_spawn_window, + drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window), [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); @@ -5169,7 +5170,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp) yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair, gen, tn, &*cur_shard_progress), - cct->_conf->rgw_bucket_sync_spawn_window, + sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window), [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret)); @@ -5662,7 +5663,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) tn->log(20, "remote bilog indicates that sync was stopped"); if (!bucket_lease_cr) { bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj, - lock_name, lock_duration, this)); + lock_name, lock_duration, this, &sc->lcc)); yield spawn(bucket_lease_cr.get(), false); while (!bucket_lease_cr->is_locked()) { if (bucket_lease_cr->is_done()) { @@ -5720,7 +5721,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) // different shards from duplicating the init and full sync if (!bucket_lease_cr) { bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj, - lock_name, lock_duration, this)); + lock_name, lock_duration, this, &sc->lcc)); yield spawn(bucket_lease_cr.get(), false); while (!bucket_lease_cr->is_locked()) { if (bucket_lease_cr->is_done()) { diff --git a/src/rgw/driver/rados/rgw_data_sync.h b/src/rgw/driver/rados/rgw_data_sync.h index 7514d3f8a2c44..9d1b0e77f8e1e 100644 --- a/src/rgw/driver/rados/rgw_data_sync.h +++ b/src/rgw/driver/rados/rgw_data_sync.h @@ -13,6 +13,7 @@ #include "common/likely.h" #include "rgw_coroutine.h" +#include "rgw_cr_rados.h" #include "rgw_http_client.h" #include "rgw_sal_rados.h" @@ -350,6 +351,37 @@ void pretty_print(const RGWDataSyncEnv* env, const S& fmt, T&& ...t) { } } +/// \brief Adjust concurrency based on latency +/// +/// Keep a running average of operation latency and scale concurrency +/// down when latency rises. +class LatencyConcurrencyControl : public LatencyMonitor { +public: + CephContext* cct; + + LatencyConcurrencyControl(CephContext* cct) + : cct(cct) {} + + /// \brief Lower concurrency when latency rises + /// + /// Since we have multiple spawn windows (data sync overall and + /// bucket), accept a number of concurrent operations to spawn and, + /// if latency is high, cut it in half. If latency is really high, + /// cut it to 1. + int64_t adj_concurrency(int64_t concurrency) { + using namespace std::literals; + auto threshold = (cct->_conf->rgw_sync_lease_period * 1s) / 12; + + if (avg_latency() >= 2 * threshold) [[unlikely]] { + return 1; + } else if (avg_latency() >= threshold) [[unlikely]] { + return concurrency / 2; + } else [[likely]] { + return concurrency; + } + } +}; + struct RGWDataSyncCtx { RGWDataSyncEnv *env{nullptr}; CephContext *cct{nullptr}; @@ -357,12 +389,14 @@ struct RGWDataSyncCtx { RGWRESTConn *conn{nullptr}; rgw_zone_id source_zone; + LatencyConcurrencyControl lcc{nullptr}; + RGWDataSyncCtx() = default; RGWDataSyncCtx(RGWDataSyncEnv* env, RGWRESTConn* conn, const rgw_zone_id& source_zone) - : env(env), cct(env->cct), conn(conn), source_zone(source_zone) {} + : env(env), cct(env->cct), conn(conn), source_zone(source_zone), lcc(cct) {} void init(RGWDataSyncEnv *_env, RGWRESTConn *_conn, @@ -371,6 +405,7 @@ struct RGWDataSyncCtx { env = _env; conn = _conn; source_zone = _source_zone; + lcc.cct = cct; } }; diff --git a/src/rgw/driver/rados/rgw_sync.cc b/src/rgw/driver/rados/rgw_sync.cc index 4a56595fe386c..081bc7772e967 100644 --- a/src/rgw/driver/rados/rgw_sync.cc +++ b/src/rgw/driver/rados/rgw_sync.cc @@ -637,7 +637,7 @@ public: rgw::sal::RadosStore* store = sync_env->store; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()), - lock_name, lock_duration, this)); + lock_name, lock_duration, this, nullptr)); lease_stack.reset(spawn(lease_cr.get(), false)); } while (!lease_cr->is_locked()) { @@ -883,7 +883,7 @@ public: lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()), - lock_name, lock_duration, this)); + lock_name, lock_duration, this, nullptr)); lease_stack.reset(spawn(lease_cr.get(), false)); } while (!lease_cr->is_locked()) { @@ -1553,7 +1553,7 @@ public: rgw::sal::RadosStore* store = sync_env->store; lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)), - lock_name, lock_duration, this)); + lock_name, lock_duration, this, nullptr)); lease_stack.reset(spawn(lease_cr.get(), false)); lost_lock = false; } @@ -1709,7 +1709,7 @@ public: rgw::sal::RadosStore* store = sync_env->store; lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store, rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)), - lock_name, lock_duration, this)); + lock_name, lock_duration, this, nullptr)); lease_stack.reset(spawn(lease_cr.get(), false)); lost_lock = false; } -- 2.39.5