From: Casey Bodley Date: Tue, 31 Mar 2020 13:22:56 +0000 (-0400) Subject: rgw: data sync tracks timestamps for error repo X-Git-Tag: v16.1.0~2586^2~20 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d03fa0d310c3a610eb84a8de1ec4a649b70b9d02;p=ceph.git rgw: data sync tracks timestamps for error repo Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f2658c9a852..1045e01f242 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -24,6 +24,7 @@ #include "rgw_bucket_sync.h" #include "rgw_metadata.h" #include "rgw_sync_counters.h" +#include "rgw_sync_error_repo.h" #include "rgw_sync_module.h" #include "rgw_sal.h" @@ -1284,6 +1285,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { string raw_key; string entry_marker; + ceph::real_time entry_timestamp; rgw_bucket_shard source_bs; @@ -1296,16 +1298,15 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { boost::intrusive_ptr error_repo; bool remove_from_repo; - set keys; - RGWSyncTraceNodeRef tn; public: RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, - const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker, + const string& _raw_key, const string& _entry_marker, + ceph::real_time entry_timestamp, RGWDataSyncShardMarkerTrack *_marker_tracker, RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), raw_key(_raw_key), entry_marker(_entry_marker), - sync_status(0), + entry_timestamp(entry_timestamp), sync_status(0), marker_tracker(_marker_tracker), error_repo(_error_repo), remove_from_repo(_remove_from_repo) { set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; @@ -1352,12 +1353,16 @@ public: tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode)); } } - if (error_repo && !error_repo->append(raw_key)) { - tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode)); + if (error_repo) { + yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(), + raw_key, entry_timestamp)); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode)); + } } } else if (error_repo && remove_from_repo) { - keys = {raw_key}; - yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys)); + yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(), + raw_key, entry_timestamp)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to remove omap key from error repo (" << error_repo->get_obj() << " retcode=" << retcode)); @@ -1432,6 +1437,7 @@ class RGWDataSyncShardCR : public RGWCoroutine { RGWOmapAppend *error_repo; std::map error_entries; string error_marker; + ceph::real_time entry_timestamp; int max_error_entries; ceph::coarse_real_time error_retry_time; @@ -1544,6 +1550,7 @@ public: oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id); set_marker_tracker(new RGWDataSyncShardMarkerTrack(sc, status_oid, sync_marker, tn)); total_entries = sync_marker.pos; + entry_timestamp = sync_marker.timestamp; // time when full sync started do { if (!lease_cr->is_locked()) { stop_spawned_services(); @@ -1567,11 +1574,11 @@ public: for (; iter != entries.end(); ++iter) { tn->log(20, SSTR("full sync: " << iter->first)); total_entries++; - if (!marker_tracker->start(iter->first, total_entries, real_time())) { + if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?")); } else { // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, marker_tracker, error_repo, false, tn), false); + yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false); } sync_marker.marker = iter->first; @@ -1657,7 +1664,7 @@ public: for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) { yield { tn->log(20, SSTR("received async update notification: " << *modified_iter)); - spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), marker_tracker, nullptr, false, tn), false); + spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false); } } @@ -1671,8 +1678,11 @@ public: iter = error_entries.begin(); for (; iter != error_entries.end(); ++iter) { error_marker = iter->first; - tn->log(20, SSTR("handle error entry: " << error_marker)); - spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false); + entry_timestamp = rgw_error_repo_decode_value(iter->second); + tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp)); + spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, + entry_timestamp, nullptr /* no marker tracker */, + error_repo, true, tn), false); } if (!omapvals->more) { if (error_marker.empty() && error_entries.empty()) { @@ -1714,7 +1724,9 @@ public: if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { - spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false); + spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id, + log_iter->log_timestamp, marker_tracker, + error_repo, false, tn), false); } while ((int)num_spawned() > spawn_window) { set_status() << "num_spawned() > spawn_window";