From: Casey Bodley Date: Tue, 31 Mar 2020 13:23:12 +0000 (-0400) Subject: rgw: DataSyncSingleEntry takes cached state X-Git-Tag: v16.1.0~2586^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=04dc0d0d77ae8de199628c3ccf234f3276820869;p=ceph.git rgw: DataSyncSingleEntry takes cached state Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 4b485599dcc..dd7e3b99a5d 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -22,6 +22,7 @@ #include "rgw_http_client.h" #include "rgw_bucket.h" #include "rgw_bucket_sync.h" +#include "rgw_bucket_sync_cache.h" #include "rgw_metadata.h" #include "rgw_sync_counters.h" #include "rgw_sync_error_repo.h" @@ -1288,7 +1289,7 @@ public: class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; - rgw_bucket_shard source_bs; + rgw::bucket_sync::Handle state; // cached bucket-shard state rgw_data_sync_obligation obligation; RGWDataSyncShardMarkerTrack *marker_tracker; boost::intrusive_ptr error_repo; @@ -1297,14 +1298,13 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { ceph::real_time progress; int sync_status = 0; public: - RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs, - rgw_data_sync_obligation obligation, RGWDataSyncShardMarkerTrack *_marker_tracker, + RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state, + rgw_data_sync_obligation obligation, + RGWDataSyncShardMarkerTrack *_marker_tracker, RGWOmapAppend *_error_repo, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sc->cct), - sc(_sc), sync_env(_sc->env), source_bs(source_bs), - obligation(std::move(obligation)), - marker_tracker(_marker_tracker), - error_repo(_error_repo) { + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + state(std::move(state)), obligation(std::move(obligation)), + marker_tracker(_marker_tracker), error_repo(_error_repo) { set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation; tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key); } @@ -1315,11 +1315,11 @@ public: if (marker_tracker) { marker_tracker->reset_need_retry(obligation.key); } - tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs})); + tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key})); yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */ - source_bs, + state->key, tn, &progress)); if (retcode == 0) { tn->log(20, SSTR("RunBucketSources progress=" << progress)); @@ -1400,7 +1400,7 @@ class RGWDataSyncShardCR : public RGWCoroutine { std::string next_marker; list log_entries; list::iterator log_iter; - bool truncated; + bool truncated = false; ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock"); ceph::condition_variable inc_cond; @@ -1414,11 +1414,9 @@ class RGWDataSyncShardCR : public RGWCoroutine { set::iterator modified_iter; - int total_entries; - - int spawn_window; - - bool *reset_backoff; + uint64_t total_entries = 0; + static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW; + bool *reset_backoff = nullptr; boost::intrusive_ptr lease_cr; boost::intrusive_ptr lease_stack; @@ -1426,23 +1424,27 @@ class RGWDataSyncShardCR : public RGWCoroutine { string error_oid; - RGWOmapAppend *error_repo; + RGWOmapAppend *error_repo = nullptr; std::map error_entries; string error_marker; ceph::real_time entry_timestamp; - int max_error_entries; + static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES; ceph::coarse_real_time error_retry_time; #define RETRY_BACKOFF_SECS_MIN 60 #define RETRY_BACKOFF_SECS_DEFAULT 60 #define RETRY_BACKOFF_SECS_MAX 600 - uint32_t retry_backoff_secs; + uint32_t retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT; RGWSyncTraceNodeRef tn; rgw_bucket_shard source_bs; + // target number of entries to cache before recycling idle ones + static constexpr size_t target_cache_size = 256; + boost::intrusive_ptr bucket_shard_cache; + int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const { return rgw_bucket_parse_bucket_key(sync_env->cct, key, &bs.bucket, &bs.shard_id); @@ -1451,24 +1453,19 @@ class RGWDataSyncShardCR : public RGWCoroutine { const std::string& key, const std::string& marker, ceph::real_time timestamp, bool retry) { + auto state = bucket_shard_cache->get(src); auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry}; - return new RGWDataSyncSingleEntryCR(sc, src, std::move(obligation), + return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation), &*marker_tracker, error_repo, tn); } public: - RGWDataSyncShardCR(RGWDataSyncCtx *_sc, - rgw_pool& _pool, + RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool, uint32_t _shard_id, rgw_data_sync_marker& _marker, - RGWSyncTraceNodeRef& _tn, - bool *_reset_backoff) : RGWCoroutine(_sc->cct), - sc(_sc), sync_env(_sc->env), - pool(_pool), - shard_id(_shard_id), - sync_marker(_marker), - truncated(false), - total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL), - lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES), - retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) { + RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + pool(_pool), shard_id(_shard_id), sync_marker(_marker), tn(_tn), + bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size)) + { set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id; status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id); error_oid = status_oid + ".retry";