}
};
-static constexpr auto DATA_SYNC_MAX_ERR_ENTRIES = 10;
-
class RGWDataBaseSyncShardCR : public RGWCoroutine {
protected:
RGWDataSyncCtx *const sc;
};
class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
- static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
+ static constexpr int max_error_entries = 10;
static constexpr uint32_t retry_backoff_secs = 60;
ceph::mutex& inc_lock;
};
class RGWDataSyncShardCR : public RGWCoroutine {
- static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
-
- rgw_pool pool;
-
- uint32_t shard_id;
+ RGWDataSyncCtx *const sc;
+ const rgw_pool pool;
+ const uint32_t shard_id;
rgw_data_sync_marker& sync_marker;
rgw_data_sync_status sync_status;
-
- RGWRadosGetOmapValsCR::ResultPtr omapvals;
- std::map<std::string, bufferlist> entries;
- std::map<std::string, bufferlist>::iterator iter;
-
- string oid;
-
- std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
-
- std::string next_marker;
- vector<rgw_data_change_log_entry> log_entries;
- vector<rgw_data_change_log_entry>::iterator log_iter;
- bool truncated = false;
+ const RGWSyncTraceNodeRef tn;
+ bool *reset_backoff; // TODO: we do nothing with this pointer.
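+ // inc_lock guards modified_shards, which is populated out of band by async update notifications and drained by incremental_sync()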
ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
ceph::condition_variable inc_cond;
- boost::asio::coroutine incremental_cr;
- boost::asio::coroutine full_cr;
-
+ RGWDataSyncEnv *const sync_env{ sc->env };
- bc::flat_set<rgw_data_notify_entry> modified_shards;
- bc::flat_set<rgw_data_notify_entry> current_modified;
+ const string status_oid{ RGWDataSyncStatusManager::shard_obj_name(
+ sc->source_zone, shard_id) };
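+ // entries that fail to sync are recorded here; the omap key encodes the bucket shard (and gen), the value its timestamp, so they can be retried later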
+ const rgw_raw_obj error_repo{ pool, status_oid + ".retry" };
- bc::flat_set<rgw_data_notify_entry>::iterator modified_iter;
-
- uint64_t total_entries = 0;
- bool *reset_backoff = nullptr;
+ // target number of entries to cache before recycling idle ones
+ static constexpr size_t target_cache_size = 256;
+ boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache {
+ rgw::bucket_sync::Cache::create(target_cache_size) };
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
- string status_oid;
-
- rgw_raw_obj error_repo;
- std::map<std::string, bufferlist> error_entries;
- string error_marker;
- ceph::real_time entry_timestamp;
- static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
-
- ceph::coarse_real_time error_retry_time;
- static constexpr uint32_t retry_backoff_secs = 60;
-
- RGWSyncTraceNodeRef tn;
-
- rgw_bucket_shard source_bs;
- std::optional<uint64_t> gen;
- // target number of entries to cache before recycling idle ones
- static constexpr size_t target_cache_size = 256;
- boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
-
- int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
- return rgw_bucket_parse_bucket_key(sync_env->cct, key,
- &bs.bucket, &bs.shard_id);
- }
+ bc::flat_set<rgw_data_notify_entry> modified_shards;
public:
- RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
- uint32_t _shard_id, rgw_data_sync_marker& _marker,
- const rgw_data_sync_status& _sync_status,
- RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- pool(_pool), shard_id(_shard_id), sync_marker(_marker), sync_status(_sync_status),
- status_oid(RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
- error_repo(pool, status_oid + ".retry"), tn(_tn),
- bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
- {
- set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
+ RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
+ const uint32_t shard_id, rgw_data_sync_marker& marker,
+ const rgw_data_sync_status& sync_status,
+ RGWSyncTraceNodeRef& tn, bool *reset_backoff)
+ : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
+ sync_marker(marker), sync_status(sync_status), tn(tn),
+ reset_backoff(reset_backoff) {
+ set_description() << "data sync shard source_zone=" << sc->source_zone
+ << " shard_id=" << shard_id;
}
~RGWDataSyncShardCR() override {
if (lease_cr) {
lease_cr->abort();
}
}
void init_lease_cr() {
set_status("acquiring sync lock");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store,
rgw_raw_obj(pool, status_oid),
lock_name, lock_duration, this));
lease_stack.reset(spawn(lease_cr.get(), false));
}
-
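- // Full sync: take the shard lease, then list the omap of the full-sync
- // index object in batches of OMAP_GET_MAX_ENTRIES, spawning one
- // RGWDataFullSyncSingleEntryCR per bucket shard. Once the listing is
- // exhausted, persist the transition to IncrementalSync, remove the
- // index object, and keep the lease for incremental_sync().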
- int full_sync() {
- int max_entries = OMAP_GET_MAX_ENTRIES;
- reenter(&full_cr) {
- tn->log(10, "start full sync");
- yield init_lease_cr();
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- tn->log(5, "failed to take lease");
- set_status("lease lock failed, early abort");
- drain_all();
- return set_cr_error(lease_cr->get_ret_status());
- }
- set_sleeping(true);
- yield;
- }
- tn->log(10, "took lease");
- oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
- marker_tracker.emplace(sc, status_oid, sync_marker, tn);
- total_entries = sync_marker.pos;
- entry_timestamp = sync_marker.timestamp; // time when full sync started
- do {
- if (!lease_cr->is_locked()) {
- lease_cr->go_down();
- drain_all();
- return set_cr_error(-ECANCELED);
- }
- omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
- yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid),
- sync_marker.marker, max_entries, omapvals));
- if (retcode < 0) {
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
- entries = std::move(omapvals->entries);
- if (entries.size() > 0) {
- tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
- }
- tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
- iter = entries.begin();
- for (; iter != entries.end(); ++iter) {
- retcode = parse_bucket_key(iter->first, source_bs);
- if (retcode < 0) {
- tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
- marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp);
- continue;
- }
- tn->log(20, SSTR("full sync: " << iter->first));
- total_entries++;
- if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
- tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
- } else {
- tn->log(10, SSTR("timestamp for " << iter->first << " is :" << entry_timestamp));
- yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, pool, source_bs, iter->first, sync_status,
- error_repo, entry_timestamp, lease_cr, bucket_shard_cache, &*marker_tracker, tn),
- cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
- }
- sync_marker.marker = iter->first;
- }
- } while (omapvals->more);
- omapvals.reset();
-
- drain_all_but_stack(lease_stack.get());
-
- tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-
- yield {
- /* update marker to reflect we're done with full sync */
- sync_marker.state = rgw_data_sync_marker::IncrementalSync;
- sync_marker.marker = sync_marker.next_step_marker;
- sync_marker.next_step_marker.clear();
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
- rgw_raw_obj(pool, status_oid),
- sync_marker));
- }
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
- // clean up full sync index
- yield {
- const auto& pool = sync_env->svc->zone->get_zone_params().log_pool;
- auto oid = full_data_sync_index_shard_oid(sc->source_zone.id, shard_id);
- call(new RGWRadosRemoveCR(sync_env->store, {pool, oid}));
- }
- // keep lease and transition to incremental_sync()
- }
- return 0;
- }
-
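- // Incremental sync: in a loop, drain the modified-shard notifications,
- // retry entries from the error repo once the backoff expires, then read
- // the remote datalog shard from sync_marker.marker and spawn one
- // coroutine per logged bucket shard, advancing the marker as we go.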
- int incremental_sync() {
- reenter(&incremental_cr) {
- tn->log(10, "start incremental sync");
- if (lease_cr) {
- tn->log(10, "lease already held from full sync");
- } else {
- yield init_lease_cr();
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- tn->log(5, "failed to take lease");
- set_status("lease lock failed, early abort");
- drain_all();
- return set_cr_error(lease_cr->get_ret_status());
- }
- set_sleeping(true);
- yield;
- }
- set_status("lease acquired");
- tn->log(10, "took lease");
- }
- marker_tracker.emplace(sc, status_oid, sync_marker, tn);
- do {
- if (!lease_cr->is_locked()) {
- lease_cr->go_down();
- drain_all();
- return set_cr_error(-ECANCELED);
- }
- current_modified.clear();
- inc_lock.lock();
- current_modified.swap(modified_shards);
- inc_lock.unlock();
-
- if (current_modified.size() > 0) {
- tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
- }
- /* process out of band updates */
- for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
- retcode = parse_bucket_key(modified_iter->key, source_bs);
- if (retcode < 0) {
- tn->log(1, SSTR("failed to parse bucket shard: " << modified_iter->key));
- continue;
- }
- tn->log(20, SSTR("received async update notification: " << modified_iter->key));
- spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, string(),
- ceph::real_time{}, lease_cr, bucket_shard_cache, &*marker_tracker, error_repo, tn, false), false);
- }
-
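- // retry bucket shards that previously failed, at most once per retry_backoff_secs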
- if (error_retry_time <= ceph::coarse_real_clock::now()) {
- /* process bucket shards that previously failed */
- omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
- yield call(new RGWRadosGetOmapValsCR(sync_env->store, error_repo,
- error_marker, max_error_entries, omapvals));
- error_entries = std::move(omapvals->entries);
- tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
- iter = error_entries.begin();
- for (; iter != error_entries.end(); ++iter) {
- error_marker = iter->first;
- entry_timestamp = rgw::error_repo::decode_value(iter->second);
- retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
- if (retcode == -EINVAL) {
- // backward compatibility for string keys that don't encode a gen
- retcode = parse_bucket_key(error_marker, source_bs);
- }
- if (retcode < 0) {
- tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
- spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
- error_marker, entry_timestamp), false);
- continue;
- }
- tn->log(10, SSTR("gen is " << gen));
- if (!gen) {
- // write all full sync obligations for the bucket to error repo
- spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
- } else {
- tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
- spawn(data_sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr,
- bucket_shard_cache, &*marker_tracker, error_repo, tn, true), false);
- }
- }
- if (!omapvals->more) {
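- // the whole error repo has been listed; back off before the next retry pass and restart the listing from the beginning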
- error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
- error_marker.clear();
- }
- }
- omapvals.reset();
-
- tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
- yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,
- &next_marker, &log_entries, &truncated));
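- // -ENOENT is tolerated here, as the remote datalog shard may not exist yet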
- if (retcode < 0 && retcode != -ENOENT) {
- tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
-
- if (log_entries.size() > 0) {
- tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
- }
-
- for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
- tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
- retcode = parse_bucket_key(log_iter->entry.key, source_bs);
- if (retcode < 0) {
- tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key));
- marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
- continue;
- }
- if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
- tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
- } else {
- tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << "shard: " << shard_id << "on gen " << log_iter->entry.gen));
- yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
- log_iter->log_timestamp, lease_cr,bucket_shard_cache,
- &*marker_tracker, error_repo, tn, false),
- cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
- }
- }
-
- tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
- << " next_marker=" << next_marker << " truncated=" << truncated));
- if (!next_marker.empty()) {
- sync_marker.marker = next_marker;
- } else if (!log_entries.empty()) {
- sync_marker.marker = log_entries.back().log_id;
- }
- if (!truncated) {
- // we reached the end, wait a while before checking for more
- tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
- yield wait(get_idle_interval());
- }
- } while (true);
- }
- return 0;
- }
-
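- // returns the configured poll interval, shortened if the error-repo retry deadline would arrive sooner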
- utime_t get_idle_interval() const {
- ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
- if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
- auto now = ceph::coarse_real_clock::now();
- if (error_retry_time > now) {
- auto d = error_retry_time - now;
- if (interval > d) {
- interval = d;
- }
- }
- }
- // convert timespan -> time_point -> utime_t
- return utime_t(ceph::coarse_real_clock::zero() + interval);
- }
};
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {