reenter(this) {
last_renew_try_time = ceph::coarse_mono_clock::now();
while (!going_down) {
+ current_time = ceph::coarse_mono_clock::now();
yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
+ if (latency) {
+ latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
+ }
current_time = ceph::coarse_mono_clock::now();
if (current_time - last_renew_try_time > interval_tolerance) {
// renewal should happen between 50%-90% of interval
yield wait(utime_t(interval / 2, 0));
}
set_locked(false); /* moot at this point anyway */
+ current_time = ceph::coarse_mono_clock::now();
yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
+ if (latency) {
+ latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
+ }
return set_state(RGWCoroutine_Done);
}
return 0;
}
};
+/// \brief Collect average latency
+///
+/// Used in data sync to back off concurrency when the latency of lock
+/// operations rises.
+///
+/// \warning This class is not thread safe. We do not use a mutex
+/// because all coroutines spawned by RGWDataSyncCR share a single thread.
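+///
+/// \par Example
+/// Illustrative usage sketch (not part of the sync path; the latency
+/// values are made up):
+/// \code
+///   LatencyMonitor lm;
+///   lm.add_latency(ceph::make_timespan(2)); // one 2s lock round trip
+///   lm.add_latency(ceph::make_timespan(4)); // one 4s lock round trip
+///   ceph::timespan avg = lm.avg_latency();  // running average: 3s
+/// \endcode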
+class LatencyMonitor {
+  ceph::timespan total = ceph::timespan::zero();
+ std::uint64_t count = 0;
+
+public:
+
+ LatencyMonitor() = default;
+ void add_latency(ceph::timespan latency) {
+ total += latency;
+ ++count;
+ }
+
+ ceph::timespan avg_latency() {
+ using namespace std::literals;
+ return count == 0 ? 0s : total / count;
+ }
+};
+
class RGWContinuousLeaseCR : public RGWCoroutine {
RGWAsyncRadosProcessor* async_rados;
rgw::sal::RadosStore* store;
ceph::coarse_mono_time last_renew_try_time;
ceph::coarse_mono_time current_time;
+ LatencyMonitor* latency;
+
public:
RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados,
rgw::sal::RadosStore* _store,
rgw_raw_obj obj, std::string lock_name,
- int interval, RGWCoroutine* caller)
+ int interval, RGWCoroutine* caller,
+ LatencyMonitor* const latency)
: RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store),
obj(std::move(obj)), lock_name(std::move(lock_name)),
interval(interval), interval_tolerance(ceph::make_timespan(9*interval/10)),
- ts_interval(ceph::make_timespan(interval)), caller(caller)
+ ts_interval(ceph::make_timespan(interval)), caller(caller), latency(latency)
{}
virtual ~RGWContinuousLeaseCR() override;
sc->env->async_rados, sc->env->driver,
{ sc->env->svc->zone->get_zone_params().log_pool,
RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) },
- string(lock_name), lock_duration, caller);
+ string(lock_name), lock_duration, caller, &sc->lcc);
}
int operate(const DoutPrefixProvider *dpp) override {
tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry"));
yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
rgw::error_repo::encode_key(bs, each->gen),
- timestamp), cct->_conf->rgw_data_sync_spawn_window,
+ timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
retcode = ret;
tn->log(10, SSTR("Write " << source_bs.shard_id << " to error repo for retry"));
yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
rgw::error_repo::encode_key(source_bs, each->gen),
- timestamp), cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+ timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
} else {
shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false);
yield call(shard_cr);
first_shard = false;
} else {
- yield_spawn_window(shard_cr, cct->_conf->rgw_data_sync_spawn_window,
+ yield_spawn_window(shard_cr, sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
retcode = ret;
sc, pool, source_bs, iter->first, sync_status,
error_repo, entry_timestamp, lease_cr,
bucket_shard_cache, &*marker_tracker, tn),
- cct->_conf->rgw_data_sync_spawn_window,
+ sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
std::nullopt);
}
sync_marker.marker = iter->first;
log_iter->log_id, log_iter->log_timestamp,
lease_cr,bucket_shard_cache,
&*marker_tracker, error_repo, tn, false),
- cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+ sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
}
}
auto driver = sync_env->driver;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, driver,
rgw_raw_obj(pool, status_oid),
- lock_name, lock_duration, this));
+ lock_name, lock_duration, this,
+ &sc->lcc));
lease_stack.reset(spawn(lease_cr.get(), false));
}
};
entry->key, &marker_tracker, zones_trace, tn),
false);
}
- drain_with_cb(cct->_conf->rgw_bucket_sync_spawn_window,
+ drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
false);
}
// }
- drain_with_cb(cct->_conf->rgw_bucket_sync_spawn_window,
+ drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
gen, tn, &*cur_shard_progress),
- cct->_conf->rgw_bucket_sync_spawn_window,
+ sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
tn->log(20, "remote bilog indicates that sync was stopped");
if (!bucket_lease_cr) {
bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
- lock_name, lock_duration, this));
+ lock_name, lock_duration, this, &sc->lcc));
yield spawn(bucket_lease_cr.get(), false);
while (!bucket_lease_cr->is_locked()) {
if (bucket_lease_cr->is_done()) {
// different shards from duplicating the init and full sync
if (!bucket_lease_cr) {
bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
- lock_name, lock_duration, this));
+ lock_name, lock_duration, this, &sc->lcc));
yield spawn(bucket_lease_cr.get(), false);
while (!bucket_lease_cr->is_locked()) {
if (bucket_lease_cr->is_done()) {
#include "common/likely.h"
#include "rgw_coroutine.h"
+#include "rgw_cr_rados.h"
#include "rgw_http_client.h"
#include "rgw_sal_rados.h"
}
}
+/// \brief Adjust concurrency based on latency
+///
+/// Keep a running average of operation latency and scale concurrency
+/// down when latency rises.
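+///
+/// RGWContinuousLeaseCR feeds each lock/unlock round trip into the
+/// inherited LatencyMonitor, and the data sync spawn windows consult
+/// adj_concurrency() before spawning more work.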
+class LatencyConcurrencyControl : public LatencyMonitor {
+public:
+ CephContext* cct;
+
+ LatencyConcurrencyControl(CephContext* cct)
+ : cct(cct) {}
+
+ /// \brief Lower concurrency when latency rises
+ ///
+ /// Since we have multiple spawn windows (data sync overall and
+ /// bucket), accept a number of concurrent operations to spawn and,
+  /// if the average latency reaches one twelfth of the lease period,
+  /// cut it in half. If it reaches twice that threshold, cut it to 1.
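+  ///
+  /// \par Example
+  /// A worked sketch with an illustrative rgw_sync_lease_period of 120s,
+  /// giving a threshold of 120s / 12 = 10s:
+  /// \code
+  ///   LatencyConcurrencyControl lcc{cct};       // cct: any valid CephContext*
+  ///   lcc.add_latency(ceph::make_timespan(12)); // average latency now 12s
+  ///   lcc.adj_concurrency(20);                  // >= 10s threshold -> returns 10
+  ///   lcc.add_latency(ceph::make_timespan(40)); // average latency now 26s
+  ///   lcc.adj_concurrency(20);                  // >= 20s (2x threshold) -> returns 1
+  /// \endcode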
+ int64_t adj_concurrency(int64_t concurrency) {
+ using namespace std::literals;
+ auto threshold = (cct->_conf->rgw_sync_lease_period * 1s) / 12;
+
+ if (avg_latency() >= 2 * threshold) [[unlikely]] {
+ return 1;
+ } else if (avg_latency() >= threshold) [[unlikely]] {
+ return concurrency / 2;
+ } else [[likely]] {
+ return concurrency;
+ }
+ }
+};
+
struct RGWDataSyncCtx {
RGWDataSyncEnv *env{nullptr};
CephContext *cct{nullptr};
RGWRESTConn *conn{nullptr};
rgw_zone_id source_zone;
+ LatencyConcurrencyControl lcc{nullptr};
+
RGWDataSyncCtx() = default;
RGWDataSyncCtx(RGWDataSyncEnv* env,
RGWRESTConn* conn,
const rgw_zone_id& source_zone)
- : env(env), cct(env->cct), conn(conn), source_zone(source_zone) {}
+ : env(env), cct(env->cct), conn(conn), source_zone(source_zone), lcc(cct) {}
void init(RGWDataSyncEnv *_env,
RGWRESTConn *_conn,
env = _env;
conn = _conn;
source_zone = _source_zone;
+ lcc.cct = cct;
}
};
rgw::sal::RadosStore* store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
- lock_name, lock_duration, this));
+ lock_name, lock_duration, this, nullptr));
lease_stack.reset(spawn(lease_cr.get(), false));
}
while (!lease_cr->is_locked()) {
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
sync_env->store,
rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
- lock_name, lock_duration, this));
+ lock_name, lock_duration, this, nullptr));
lease_stack.reset(spawn(lease_cr.get(), false));
}
while (!lease_cr->is_locked()) {
rgw::sal::RadosStore* store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this));
+ lock_name, lock_duration, this, nullptr));
lease_stack.reset(spawn(lease_cr.get(), false));
lost_lock = false;
}
rgw::sal::RadosStore* store = sync_env->store;
lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this));
+ lock_name, lock_duration, this, nullptr));
lease_stack.reset(spawn(lease_cr.get(), false));
lost_lock = false;
}