#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_bucket_sync.h"
+#include "rgw_bucket_sync_cache.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_error_repo.h"
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
- rgw_bucket_shard source_bs;
+ rgw::bucket_sync::Handle state; // cached bucket-shard state
rgw_data_sync_obligation obligation;
RGWDataSyncShardMarkerTrack *marker_tracker;
boost::intrusive_ptr<RGWOmapAppend> error_repo;
ceph::real_time progress;
int sync_status = 0;
public:
- RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs,
- rgw_data_sync_obligation obligation, RGWDataSyncShardMarkerTrack *_marker_tracker,
+ RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
+ rgw_data_sync_obligation obligation,
+ RGWDataSyncShardMarkerTrack *_marker_tracker,
RGWOmapAppend *_error_repo, const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env), source_bs(source_bs),
- obligation(std::move(obligation)),
- marker_tracker(_marker_tracker),
- error_repo(_error_repo) {
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+ state(std::move(state)), obligation(std::move(obligation)),
+ marker_tracker(_marker_tracker), error_repo(_error_repo) {
set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
}
if (marker_tracker) {
marker_tracker->reset_need_retry(obligation.key);
}
- tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
+ tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key}));
yield call(new RGWRunBucketSourcesSyncCR(sc,
std::nullopt, /* target_bs */
- source_bs,
+ state->key,
tn, &progress));
if (retcode == 0) {
tn->log(20, SSTR("RunBucketSources progress=" << progress));
std::string next_marker;
list<rgw_data_change_log_entry> log_entries;
list<rgw_data_change_log_entry>::iterator log_iter;
- bool truncated;
+ bool truncated = false;
ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
ceph::condition_variable inc_cond;
set<string>::iterator modified_iter;
- int total_entries;
-
- int spawn_window;
-
- bool *reset_backoff;
+ uint64_t total_entries = 0;
+ static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW;
+ bool *reset_backoff = nullptr;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
string error_oid;
- RGWOmapAppend *error_repo;
+ RGWOmapAppend *error_repo = nullptr;
std::map<std::string, bufferlist> error_entries;
string error_marker;
ceph::real_time entry_timestamp;
- int max_error_entries;
+ static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
ceph::coarse_real_time error_retry_time;
#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
- uint32_t retry_backoff_secs;
+ uint32_t retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
RGWSyncTraceNodeRef tn;
rgw_bucket_shard source_bs;
+ // target number of entries to cache before recycling idle ones
+ static constexpr size_t target_cache_size = 256;
+ boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
+
int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
return rgw_bucket_parse_bucket_key(sync_env->cct, key,
&bs.bucket, &bs.shard_id);
const std::string& key,
const std::string& marker,
ceph::real_time timestamp, bool retry) {
+ auto state = bucket_shard_cache->get(src);
auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
- return new RGWDataSyncSingleEntryCR(sc, src, std::move(obligation),
+ return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
&*marker_tracker, error_repo, tn);
}
public:
- RGWDataSyncShardCR(RGWDataSyncCtx *_sc,
- rgw_pool& _pool,
+ RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
uint32_t _shard_id, rgw_data_sync_marker& _marker,
- RGWSyncTraceNodeRef& _tn,
- bool *_reset_backoff) : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
- pool(_pool),
- shard_id(_shard_id),
- sync_marker(_marker),
- truncated(false),
- total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
- lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
- retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
+ RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+ pool(_pool), shard_id(_shard_id), sync_marker(_marker), tn(_tn),
+ bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
+ {
set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
error_oid = status_oid + ".retry";