}
};
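+
+/* Incremental sync of a single datalog shard: replays out-of-band shard
+ * notifications, retries entries recorded in the error repo, and tails the
+ * remote datalog starting from the shard's sync marker. */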
+class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
+ static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
+ static constexpr uint32_t retry_backoff_secs = 60;
+
+ ceph::mutex& inc_lock;
+ bc::flat_set<rgw_data_notify_entry>& modified_shards;
+
+ bc::flat_set<rgw_data_notify_entry> current_modified;
+ decltype(current_modified)::iterator modified_iter;
+
+ ceph::coarse_real_time error_retry_time;
+ string error_marker;
+ std::map<std::string, bufferlist> error_entries;
+ decltype(error_entries)::iterator iter;
+ ceph::real_time entry_timestamp;
+ std::optional<uint64_t> gen;
+
+ string next_marker;
+ vector<rgw_data_change_log_entry> log_entries;
+ decltype(log_entries)::iterator log_iter;
+ bool truncated = false;
+
+ utime_t get_idle_interval() const {
+ ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
+ if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+ auto now = ceph::coarse_real_clock::now();
+ if (error_retry_time > now) {
+ auto d = error_retry_time - now;
+ if (interval > d) {
+ interval = d;
+ }
+ }
+ }
+ // convert timespan -> time_point -> utime_t
+ return utime_t(ceph::coarse_real_clock::zero() + interval);
+ }
+
+public:
+
+ RGWDataIncSyncShardCR(
+ RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
+ rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
+ const string& status_oid, const rgw_raw_obj& error_repo,
+ const boost::intrusive_ptr<RGWContinuousLeaseCR>& lease_cr,
+ const rgw_data_sync_status& sync_status,
+ const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
+ ceph::mutex& inc_lock,
+ bc::flat_set<rgw_data_notify_entry>& modified_shards)
+ : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
+ status_oid, error_repo, lease_cr, sync_status,
+ bucket_shard_cache),
+ inc_lock(inc_lock), modified_shards(modified_shards) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ tn->log(10, "start incremental sync");
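+ // the marker tracker records in-flight entries and only advances the
+ // persisted shard marker past work that has completed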
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+ do {
+ if (!lease_cr->is_locked()) {
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(-ECANCELED);
+ }
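+ /* swap out any shard updates delivered out of band since the last
+ * iteration; inc_lock is held only for the swap */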
+ {
+ current_modified.clear();
+ std::unique_lock il(inc_lock);
+ current_modified.swap(modified_shards);
+ il.unlock();
+ }
+
+ if (current_modified.size() > 0) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
+ /* process out of band updates */
+ for (modified_iter = current_modified.begin();
+ modified_iter != current_modified.end();
+ ++modified_iter) {
+ retcode = parse_bucket_key(modified_iter->key, source_bs);
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: "
+ << modified_iter->key));
+ continue;
+ }
+ tn->log(20, SSTR("received async update notification: "
+ << modified_iter->key));
+ spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {},
+ ceph::real_time{}, lease_cr,
+ bucket_shard_cache, &*marker_tracker,
+ error_repo, tn, false), false);
+ }
+
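+ // replay previously failed entries from the error repo; error_retry_time
+ // throttles how often the repo is re-listed after a full pass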
+ if (error_retry_time <= ceph::coarse_real_clock::now()) {
+ /* process bucket shards that previously failed */
+ omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+ yield call(new RGWRadosGetOmapValsCR(sc->env->store, error_repo,
+ error_marker, max_error_entries,
+ omapvals));
+ error_entries = std::move(omapvals->entries);
+ tn->log(20, SSTR("read error repo, got " << error_entries.size()
+ << " entries"));
+ iter = error_entries.begin();
+ for (; iter != error_entries.end(); ++iter) {
+ error_marker = iter->first;
+ entry_timestamp = rgw::error_repo::decode_value(iter->second);
+ retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
+ if (retcode == -EINVAL) {
+ // backward compatibility for string keys that don't encode a gen
+ retcode = parse_bucket_key(error_marker, source_bs);
+ }
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
+ spawn(rgw::error_repo::remove_cr(sc->env->store->svc()->rados,
+ error_repo, error_marker,
+ entry_timestamp),
+ false);
+ continue;
+ }
+ tn->log(10, SSTR("gen is " << gen));
+ if (!gen) {
+ // write all full sync obligations for the bucket to error repo
+ spawn(new RGWDataIncrementalSyncFullObligationCR(
+ sc, source_bs, error_marker, entry_timestamp, tn), false);
+ } else {
+ tn->log(20, SSTR("handle error entry key="
+ << to_string(source_bs, gen)
+ << " timestamp=" << entry_timestamp));
+ spawn(data_sync_single_entry(sc, source_bs, gen, "",
+ entry_timestamp, lease_cr,
+ bucket_shard_cache, &*marker_tracker,
+ error_repo, tn, true), false);
+ }
+ }
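+ // finished a full pass over the error repo: back off before re-listing
+ // and restart from the beginning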
+ if (!omapvals->more) {
+ error_retry_time = ceph::coarse_real_clock::now() +
+ make_timespan(retry_backoff_secs);
+ error_marker.clear();
+ }
+ }
+ omapvals.reset();
+
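+ /* read the next window of remote datalog entries for this shard,
+ * starting from the current sync marker */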
+ tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker="
+ << sync_marker.marker));
+ yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id,
+ sync_marker.marker,
+ &next_marker, &log_entries,
+ &truncated));
+ if (retcode < 0 && retcode != -ENOENT) {
+ tn->log(0, SSTR("ERROR: failed to read remote data log info: ret="
+ << retcode));
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+
+ if (log_entries.size() > 0) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
+
+ for (log_iter = log_entries.begin();
+ log_iter != log_entries.end();
+ ++log_iter) {
+ tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: "
+ << log_iter->log_id << ":" << log_iter->log_timestamp
+ << ":" << log_iter->entry.key));
+ retcode = parse_bucket_key(log_iter->entry.key, source_bs);
+ if (retcode < 0) {
+ tn->log(1, SSTR("failed to parse bucket shard: "
+ << log_iter->entry.key));
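+ // nothing can be synced for an unparseable key; just move the
+ // high marker past this entry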
+ marker_tracker->try_update_high_marker(log_iter->log_id, 0,
+ log_iter->log_timestamp);
+ continue;
+ }
+ if (!marker_tracker->start(log_iter->log_id, 0,
+ log_iter->log_timestamp)) {
+ tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
+ << ". Duplicate entry?"));
+ } else {
+ tn->log(1, SSTR("incremental sync on " << log_iter->entry.key
+ << "shard: " << shard_id << "on gen "
+ << log_iter->entry.gen));
+ yield_spawn_window(
+ data_sync_single_entry(sc, source_bs, log_iter->entry.gen,
+ log_iter->log_id, log_iter->log_timestamp,
+ lease_cr, bucket_shard_cache,
+ &*marker_tracker, error_repo, tn, false),
+ cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+ }
+ }
+
+ tn->log(20, SSTR("shard_id=" << shard_id <<
+ " sync_marker="<< sync_marker.marker
+ << " next_marker=" << next_marker
+ << " truncated=" << truncated));
+ if (!next_marker.empty()) {
+ sync_marker.marker = next_marker;
+ } else if (!log_entries.empty()) {
+ sync_marker.marker = log_entries.back().log_id;
+ }
+ if (!truncated) {
+ // we reached the end, wait a while before checking for more
+ tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+ yield wait(get_idle_interval());
+ }
+ } while (true);
+ }
+ return 0;
+ }
+};
+
class RGWDataSyncShardCR : public RGWCoroutine {
static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
RGWDataSyncCtx *sc;