rgw: Reduce data sync parallelism in response to RADOS lock latency
author     Adam C. Emerson <aemerson@redhat.com>
Tue, 11 Oct 2022 18:49:58 +0000 (14:49 -0400)
committer  Adam C. Emerson <aemerson@redhat.com>
Thu, 12 Jan 2023 23:13:46 +0000 (18:13 -0500)
Lock latency in RGWContinuousLeaseCR gets high enough under load that
the locks end up timing out, leading to incorrect behavior.

Monitor lock latency and cut concurrent operations in half if it goes
above ten seconds.

Cut concurrency to one if it goes above twenty seconds.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_cr_rados.cc
src/rgw/driver/rados/rgw_cr_rados.h
src/rgw/driver/rados/rgw_data_sync.cc
src/rgw/driver/rados/rgw_data_sync.h
src/rgw/driver/rados/rgw_sync.cc
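
For readers skimming the diff, the mechanism reduces to two pieces: a running
average of lock round-trip times, and a step function that shrinks the spawn
window as that average rises. A minimal standalone sketch follows; the names
are illustrative only, and the real types are LatencyMonitor (rgw_cr_rados.h)
and LatencyConcurrencyControl (rgw_data_sync.h) in the hunks below.

#include <chrono>
#include <cstdint>

using timespan = std::chrono::steady_clock::duration;

// Running average of observed lock latencies.
class latency_monitor {
  timespan total = timespan::zero();
  std::uint64_t count = 0;

public:
  void add_latency(timespan l) { total += l; ++count; }
  timespan avg_latency() const {
    return count == 0 ? timespan::zero() : total / count;
  }
};

// With the default 120s lease period the threshold is 10s: halve the
// spawn window above 10s average latency, serialize above 20s.
std::int64_t adj_concurrency(const latency_monitor& m, std::int64_t window,
                             std::chrono::seconds lease_period =
                                 std::chrono::seconds(120)) {
  const timespan threshold = lease_period / 12;
  if (m.avg_latency() >= 2 * threshold)
    return 1;
  if (m.avg_latency() >= threshold)
    return window / 2;
  return window;
}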

diff --git a/src/rgw/driver/rados/rgw_cr_rados.cc b/src/rgw/driver/rados/rgw_cr_rados.cc
index eb5db86f25a9f06795ccee71ad46f24b11005cb1..7d9f3957dcbcd14b8d29165236aee0d7f8d298b8 100644
@@ -937,7 +937,11 @@ int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp)
   reenter(this) {
     last_renew_try_time = ceph::coarse_mono_clock::now();
     while (!going_down) {
+      current_time = ceph::coarse_mono_clock::now();
       yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
+      if (latency) {
+        latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
+      }
       current_time = ceph::coarse_mono_clock::now();
       if (current_time - last_renew_try_time > interval_tolerance) {
         // renewal should happen between 50%-90% of interval
@@ -957,7 +961,11 @@ int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp)
       yield wait(utime_t(interval / 2, 0));
     }
     set_locked(false); /* moot at this point anyway */
+    current_time = ceph::coarse_mono_clock::now();
     yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
+    if (latency) {
+      latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
+    }
     return set_state(RGWCoroutine_Done);
   }
   return 0;
diff --git a/src/rgw/driver/rados/rgw_cr_rados.h b/src/rgw/driver/rados/rgw_cr_rados.h
index 778f66fa2539bcf32dc442ffd1fb7a36c68b83ef..3451376ee6eaa752d7047b8185bef5b7064e3470 100644
@@ -1421,6 +1421,31 @@ public:
   }
 };
 
+/// \brief Collect average latency
+///
+/// Used in data sync to back off on concurrency when latency of lock
+/// operations rises.
+///
+/// \warning This class is not thread safe. We do not use a mutex
+/// because all coroutines spawned by RGWDataSyncCR share a single thread.
+class LatencyMonitor {
+  ceph::timespan total = ceph::timespan::zero();
+  std::uint64_t count = 0;
+
+public:
+
+  LatencyMonitor() = default;
+  void add_latency(ceph::timespan latency) {
+    total += latency;
+    ++count;
+  }
+
+  ceph::timespan avg_latency() {
+    using namespace std::literals;
+    return count == 0 ? 0s : total / count;
+  }
+};
+
 class RGWContinuousLeaseCR : public RGWCoroutine {
   RGWAsyncRadosProcessor* async_rados;
   rgw::sal::RadosStore* store;
@@ -1444,15 +1469,18 @@ class RGWContinuousLeaseCR : public RGWCoroutine {
   ceph::coarse_mono_time last_renew_try_time;
   ceph::coarse_mono_time current_time;
 
+  LatencyMonitor* latency;
+
 public:
   RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados,
                        rgw::sal::RadosStore* _store,
                        rgw_raw_obj obj, std::string lock_name,
-                       int interval, RGWCoroutine* caller)
+                       int interval, RGWCoroutine* caller,
+                       LatencyMonitor* const latency)
     : RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store),
       obj(std::move(obj)), lock_name(std::move(lock_name)),
       interval(interval), interval_tolerance(ceph::make_timespan(9*interval/10)),
-      ts_interval(ceph::make_timespan(interval)), caller(caller)
+      ts_interval(ceph::make_timespan(interval)), caller(caller), latency(latency)
   {}
 
   virtual ~RGWContinuousLeaseCR() override;
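
As a quick illustration of the averaging behavior (hypothetical values, and
assuming the header is built within the usual Ceph source tree):

#include <cassert>
#include <chrono>

#include "rgw_cr_rados.h"  // LatencyMonitor, added above

int main() {
  LatencyMonitor m;
  m.add_latency(std::chrono::seconds(4));
  m.add_latency(std::chrono::seconds(8));
  // Running average is (4s + 8s) / 2 = 6s; an empty monitor reports
  // zero rather than dividing by zero.
  assert(m.avg_latency() == std::chrono::seconds(6));
}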
diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc
index 1b5ad619beff0a59506a9bc4aa5840b62a34f55a..e51690974ebd4a1605030794053400c1f08b11a3 100644
@@ -561,7 +561,7 @@ public:
       sc->env->async_rados, sc->env->driver,
       { sc->env->svc->zone->get_zone_params().log_pool,
        RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) },
-      string(lock_name), lock_duration, caller);
+      string(lock_name), lock_duration, caller, &sc->lcc);
   }
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -1534,7 +1534,7 @@ public:
           tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry"));
           yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                             rgw::error_repo::encode_key(bs, each->gen),
-                            timestamp), cct->_conf->rgw_data_sync_spawn_window,
+                            timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                             [&](uint64_t stack_id, int ret) {
                               if (ret < 0) {
                                 retcode = ret;
@@ -1660,7 +1660,7 @@ public:
             tn->log(10, SSTR("Write " << source_bs.shard_id << " to error repo for retry"));
             yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                 rgw::error_repo::encode_key(source_bs, each->gen),
-                timestamp), cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+                timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
           } else {
           shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
                       lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false);
@@ -1669,7 +1669,7 @@ public:
             yield call(shard_cr);
             first_shard = false;
           } else {
-            yield_spawn_window(shard_cr, cct->_conf->rgw_data_sync_spawn_window,
+            yield_spawn_window(shard_cr, sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                               [&](uint64_t stack_id, int ret) {
                                 if (ret < 0) {
                                   retcode = ret;
@@ -1807,7 +1807,7 @@ public:
                                 sc, pool, source_bs, iter->first, sync_status,
                                 error_repo, entry_timestamp, lease_cr,
                                 bucket_shard_cache, &*marker_tracker, tn),
-                              cct->_conf->rgw_data_sync_spawn_window,
+                              sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                               std::nullopt);
           }
          sync_marker.marker = iter->first;
@@ -2026,7 +2026,7 @@ public:
                                     log_iter->log_id, log_iter->log_timestamp,
                                     lease_cr,bucket_shard_cache,
                                     &*marker_tracker, error_repo, tn, false),
-             cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+             sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
           }
         }
 
@@ -2179,7 +2179,8 @@ public:
     auto driver = sync_env->driver;
     lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, driver,
                                             rgw_raw_obj(pool, status_oid),
-                                            lock_name, lock_duration, this));
+                                            lock_name, lock_duration, this,
+                                            &sc->lcc));
     lease_stack.reset(spawn(lease_cr.get(), false));
   }
 };
@@ -4511,7 +4512,7 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
                                  entry->key, &marker_tracker, zones_trace, tn),
                       false);
         }
-        drain_with_cb(cct->_conf->rgw_bucket_sync_spawn_window,
+        drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
                       [&](uint64_t stack_id, int ret) {
                 if (ret < 0) {
                   tn->log(10, "a sync operation returned error");
@@ -4908,7 +4909,7 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
                   false);
           }
         // }
-        drain_with_cb(cct->_conf->rgw_bucket_sync_spawn_window,
+        drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
                       [&](uint64_t stack_id, int ret) {
                 if (ret < 0) {
                   tn->log(10, "a sync operation returned error");
@@ -5169,7 +5170,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
 
       yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
                                               gen, tn, &*cur_shard_progress),
-                         cct->_conf->rgw_bucket_sync_spawn_window,
+                         sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
                          [&](uint64_t stack_id, int ret) {
                            if (ret < 0) {
                              tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
@@ -5662,7 +5663,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
           tn->log(20, "remote bilog indicates that sync was stopped");
           if (!bucket_lease_cr) {
             bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
-                                                         lock_name, lock_duration, this));
+                                                          lock_name, lock_duration, this, &sc->lcc));
             yield spawn(bucket_lease_cr.get(), false);
             while (!bucket_lease_cr->is_locked()) {
               if (bucket_lease_cr->is_done()) {
@@ -5720,7 +5721,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
         // different shards from duplicating the init and full sync
         if (!bucket_lease_cr) {
           bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
-                                                       lock_name, lock_duration, this));
+                                                        lock_name, lock_duration, this, &sc->lcc));
           yield spawn(bucket_lease_cr.get(), false);
           while (!bucket_lease_cr->is_locked()) {
             if (bucket_lease_cr->is_done()) {
diff --git a/src/rgw/driver/rados/rgw_data_sync.h b/src/rgw/driver/rados/rgw_data_sync.h
index 7514d3f8a2c44f2a6d244622fd5f9db4935436c5..9d1b0e77f8e1e507620cfa821687f29d8c19924a 100644
@@ -13,6 +13,7 @@
 #include "common/likely.h"
 
 #include "rgw_coroutine.h"
+#include "rgw_cr_rados.h"
 #include "rgw_http_client.h"
 #include "rgw_sal_rados.h"
 
@@ -350,6 +351,37 @@ void pretty_print(const RGWDataSyncEnv* env, const S& fmt, T&& ...t) {
   }
 }
 
+/// \brief Adjust concurrency based on latency
+///
+/// Keep a running average of operation latency and scale concurrency
+/// down when latency rises.
+class LatencyConcurrencyControl : public LatencyMonitor {
+public:
+  CephContext* cct;
+
+  LatencyConcurrencyControl(CephContext* cct)
+    : cct(cct)  {}
+
+  /// \brief Lower concurrency when latency rises
+  ///
+  /// Since we have multiple spawn windows (data sync overall and
+  /// bucket), accept a number of concurrent operations to spawn and,
+  /// if latency is high, cut it in half. If latency is really high,
+  /// cut it to 1.
+  int64_t adj_concurrency(int64_t concurrency) {
+    using namespace std::literals;
+    auto threshold = (cct->_conf->rgw_sync_lease_period * 1s) / 12;
+
+    if (avg_latency() >= 2 * threshold) [[unlikely]] {
+      return 1;
+    } else if (avg_latency() >= threshold) [[unlikely]] {
+      return concurrency / 2;
+    } else [[likely]] {
+      return concurrency;
+    }
+  }
+};
+
 struct RGWDataSyncCtx {
   RGWDataSyncEnv *env{nullptr};
   CephContext *cct{nullptr};
@@ -357,12 +389,14 @@ struct RGWDataSyncCtx {
   RGWRESTConn *conn{nullptr};
   rgw_zone_id source_zone;
 
+  LatencyConcurrencyControl lcc{nullptr};
+
   RGWDataSyncCtx() = default;
 
   RGWDataSyncCtx(RGWDataSyncEnv* env,
                 RGWRESTConn* conn,
                 const rgw_zone_id& source_zone)
-    : env(env), cct(env->cct), conn(conn), source_zone(source_zone) {}
+    : env(env), cct(env->cct), conn(conn), source_zone(source_zone), lcc(cct) {}
 
   void init(RGWDataSyncEnv *_env,
             RGWRESTConn *_conn,
@@ -371,6 +405,7 @@ struct RGWDataSyncCtx {
     env = _env;
     conn = _conn;
     source_zone = _source_zone;
+    lcc.cct = cct;
   }
 };
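
Tying the numbers back to the commit message: rgw_sync_lease_period defaults
to 120 seconds (an assumption hard-coded below), so the threshold works out
to ten seconds and the cut-to-one cutoff to twenty, matching the message
above. A quick check of the arithmetic:

#include <chrono>
#include <iostream>

int main() {
  using namespace std::literals;
  const int lease_period = 120;  // rgw_sync_lease_period default, assumed
  const auto threshold = (lease_period * 1s) / 12;
  std::cout << "halve concurrency at avg latency >= " << threshold.count()
            << "s; drop to 1 at >= " << (2 * threshold).count() << "s\n";
  // Prints: halve concurrency at avg latency >= 10s; drop to 1 at >= 20s
}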
 
diff --git a/src/rgw/driver/rados/rgw_sync.cc b/src/rgw/driver/rados/rgw_sync.cc
index 4a56595fe386c208571cad5c4e121276ae6af265..081bc7772e9676d1460ac18d301df449f297480d 100644
@@ -637,7 +637,7 @@ public:
        rgw::sal::RadosStore* store = sync_env->store;
         lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
-                                                lock_name, lock_duration, this));
+                                                lock_name, lock_duration, this, nullptr));
         lease_stack.reset(spawn(lease_cr.get(), false));
       }
       while (!lease_cr->is_locked()) {
@@ -883,7 +883,7 @@ public:
         lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                                 sync_env->store,
                                                 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
-                                                lock_name, lock_duration, this));
+                                                lock_name, lock_duration, this, nullptr));
         lease_stack.reset(spawn(lease_cr.get(), false));
       }
       while (!lease_cr->is_locked()) {
@@ -1553,7 +1553,7 @@ public:
        rgw::sal::RadosStore* store = sync_env->store;
         lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                 rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
-                                                lock_name, lock_duration, this));
+                                                lock_name, lock_duration, this, nullptr));
         lease_stack.reset(spawn(lease_cr.get(), false));
         lost_lock = false;
       }
@@ -1709,7 +1709,7 @@ public:
          rgw::sal::RadosStore* store = sync_env->store;
           lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                    rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
-                                                   lock_name, lock_duration, this));
+                                                   lock_name, lock_duration, this, nullptr));
           lease_stack.reset(spawn(lease_cr.get(), false));
           lost_lock = false;
         }