From c2c4aae3e0058453af77755e0f326f64954cddeb Mon Sep 17 00:00:00 2001
From: "Adam C. Emerson"
Date: Thu, 4 Aug 2022 18:00:31 -0400
Subject: [PATCH] rgw: Break out RGWDataIncSyncShardCR

This was formerly the function RGWDataSyncShardCR::incremental_sync.
As with full_sync, we transfer responsibility for acquiring the lease
to the top-level RGWDataSyncShardCR coroutine.

Fixes: https://tracker.ceph.com/issues/57063
Signed-off-by: Adam C. Emerson
---
 src/rgw/rgw_data_sync.cc | 210 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 210 insertions(+)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 18c9fa04f951a..5b74150cc8dc8 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1765,6 +1765,216 @@ public:
   }
 };
 
+class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
+  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
+  static constexpr uint32_t retry_backoff_secs = 60;
+
+  ceph::mutex& inc_lock;
+  bc::flat_set<rgw_data_notify_entry>& modified_shards;
+
+  bc::flat_set<rgw_data_notify_entry> current_modified;
+  decltype(current_modified)::iterator modified_iter;
+
+  ceph::coarse_real_time error_retry_time;
+  string error_marker;
+  std::map<std::string, bufferlist> error_entries;
+  decltype(error_entries)::iterator iter;
+  ceph::real_time entry_timestamp;
+  std::optional<uint64_t> gen;
+
+  string next_marker;
+  vector<rgw_data_change_log_entry> log_entries;
+  decltype(log_entries)::iterator log_iter;
+  bool truncated = false;
+
+  utime_t get_idle_interval() const {
+    ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
+    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+      auto now = ceph::coarse_real_clock::now();
+      if (error_retry_time > now) {
+        auto d = error_retry_time - now;
+        if (interval > d) {
+          interval = d;
+        }
+      }
+    }
+    // convert timespan -> time_point -> utime_t
+    return utime_t(ceph::coarse_real_clock::zero() + interval);
+  }
+
+public:
+
+  RGWDataIncSyncShardCR(
+    RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
+    rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
+    const string& status_oid, const rgw_raw_obj& error_repo,
+    const boost::intrusive_ptr<RGWContinuousLeaseCR>& lease_cr,
+    const rgw_data_sync_status& sync_status,
+    const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
+    ceph::mutex& inc_lock,
+    bc::flat_set<rgw_data_notify_entry>& modified_shards)
+    : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
+                             status_oid, error_repo, lease_cr, sync_status,
+                             bucket_shard_cache),
+      inc_lock(inc_lock), modified_shards(modified_shards) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      tn->log(10, "start incremental sync");
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+      do {
+        if (!lease_cr->is_locked()) {
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
+        {
+          current_modified.clear();
+          std::unique_lock il(inc_lock);
+          current_modified.swap(modified_shards);
+          il.unlock();
+        }
+
+        if (current_modified.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+        /* process out of band updates */
+        for (modified_iter = current_modified.begin();
+             modified_iter != current_modified.end();
+             ++modified_iter) {
+          retcode = parse_bucket_key(modified_iter->key, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: "
+                            << modified_iter->key));
+            continue;
+          }
+          tn->log(20, SSTR("received async update notification: "
+                           << modified_iter->key));
+          spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {},
+                                       ceph::real_time{}, lease_cr,
+                                       bucket_shard_cache, &*marker_tracker,
+                                       error_repo, tn, false), false);
+        }
+
+        if (error_retry_time <= ceph::coarse_real_clock::now()) {
+          /* process bucket shards that previously failed */
+          omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+          yield call(new RGWRadosGetOmapValsCR(sc->env->store, error_repo,
+                                               error_marker, max_error_entries,
+                                               omapvals));
+          error_entries = std::move(omapvals->entries);
+          tn->log(20, SSTR("read error repo, got " << error_entries.size()
+                           << " entries"));
+          iter = error_entries.begin();
+          for (; iter != error_entries.end(); ++iter) {
+            error_marker = iter->first;
+            entry_timestamp = rgw::error_repo::decode_value(iter->second);
+            retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
+            if (retcode == -EINVAL) {
+              // backward compatibility for string keys that don't encode a gen
+              retcode = parse_bucket_key(error_marker, source_bs);
+            }
+            if (retcode < 0) {
+              tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
+              spawn(rgw::error_repo::remove_cr(sc->env->store->svc()->rados,
+                                               error_repo, error_marker,
+                                               entry_timestamp),
+                    false);
+              continue;
+            }
+            tn->log(10, SSTR("gen is " << gen));
+            if (!gen) {
+              // write all full sync obligations for the bucket to error repo
+              spawn(new RGWDataIncrementalSyncFullObligationCR(
+                      sc, source_bs, error_marker, entry_timestamp, tn), false);
+            } else {
+              tn->log(20, SSTR("handle error entry key="
+                               << to_string(source_bs, gen)
+                               << " timestamp=" << entry_timestamp));
+              spawn(data_sync_single_entry(sc, source_bs, gen, "",
+                                           entry_timestamp, lease_cr,
+                                           bucket_shard_cache, &*marker_tracker,
+                                           error_repo, tn, true), false);
+            }
+          }
+          if (!omapvals->more) {
+            error_retry_time = ceph::coarse_real_clock::now() +
+              make_timespan(retry_backoff_secs);
+            error_marker.clear();
+          }
+        }
+        omapvals.reset();
+
+        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker="
+                         << sync_marker.marker));
+        yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id,
+                                                   sync_marker.marker,
+                                                   &next_marker, &log_entries,
+                                                   &truncated));
+        if (retcode < 0 && retcode != -ENOENT) {
+          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret="
+                          << retcode));
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
+
+        if (log_entries.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+
+        for (log_iter = log_entries.begin();
+             log_iter != log_entries.end();
+             ++log_iter) {
+          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: "
+                           << log_iter->log_id << ":" << log_iter->log_timestamp
+                           << ":" << log_iter->entry.key));
+          retcode = parse_bucket_key(log_iter->entry.key, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: "
+                            << log_iter->entry.key));
+            marker_tracker->try_update_high_marker(log_iter->log_id, 0,
+                                                   log_iter->log_timestamp);
+            continue;
+          }
+          if (!marker_tracker->start(log_iter->log_id, 0,
+                                     log_iter->log_timestamp)) {
+            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
+                            << ". Duplicate entry?"));
+          } else {
+            tn->log(1, SSTR("incremental sync on " << log_iter->entry.key
+                            << " shard: " << shard_id << " on gen "
+                            << log_iter->entry.gen));
+            yield_spawn_window(
+              data_sync_single_entry(sc, source_bs, log_iter->entry.gen,
+                                     log_iter->log_id, log_iter->log_timestamp,
+                                     lease_cr, bucket_shard_cache,
+                                     &*marker_tracker, error_repo, tn, false),
+              cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+          }
+        }
+
+        tn->log(20, SSTR("shard_id=" << shard_id
+                         << " sync_marker=" << sync_marker.marker
+                         << " next_marker=" << next_marker
+                         << " truncated=" << truncated));
+        if (!next_marker.empty()) {
+          sync_marker.marker = next_marker;
+        } else if (!log_entries.empty()) {
+          sync_marker.marker = log_entries.back().log_id;
+        }
+        if (!truncated) {
+          // we reached the end, wait a while before checking for more
+          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+          yield wait(get_idle_interval());
+        }
+      } while (true);
+    }
+    return 0;
+  }
+};
+
 class RGWDataSyncShardCR : public RGWCoroutine {
   static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
   RGWDataSyncCtx *sc;
-- 
2.39.5
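
Editor's note (not part of the patch): as a rough, standalone illustration of the lease-ownership change described in the commit message, the sketch below models how a parent shard routine can acquire the lease once while the full- and incremental-sync steps only verify it and bail out with -ECANCELED when it is lost. All names here (ShardLease, run_full_sync, run_incremental_sync, run_shard) are hypothetical stand-ins, not the actual Ceph coroutine classes.

// Minimal sketch of the "parent owns the lease" structure; hypothetical names.
#include <atomic>
#include <cerrno>
#include <iostream>

// Stand-in for the cooperative lease that the parent routine now owns.
struct ShardLease {
  std::atomic<bool> locked{false};
  bool is_locked() const { return locked.load(); }
  void acquire() { locked.store(true); }
  void go_down() { locked.store(false); }
};

// Child steps no longer acquire the lease; they only check it, mirroring
// the is_locked() checks in RGWDataIncSyncShardCR::operate().
int run_full_sync(const ShardLease& lease) {
  if (!lease.is_locked()) return -ECANCELED;
  // ... list full-sync obligations and sync each bucket shard ...
  return 0;
}

int run_incremental_sync(const ShardLease& lease) {
  if (!lease.is_locked()) return -ECANCELED;
  // ... poll the remote datalog shard and process each entry ...
  return 0;
}

// The parent holds the lease for the whole shard, as the commit message says
// the top-level RGWDataSyncShardCR coroutine now does.
int run_shard() {
  ShardLease lease;
  lease.acquire();
  int r = run_full_sync(lease);
  if (r >= 0) {
    r = run_incremental_sync(lease);
  }
  lease.go_down();
  return r;
}

int main() {
  std::cout << "shard sync rc=" << run_shard() << std::endl;
  return 0;
}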