From: Casey Bodley Date: Tue, 31 Mar 2020 13:22:53 +0000 (-0400) Subject: rgw: DataSyncShardCR uses GetOmapVals X-Git-Tag: v16.1.0~2586^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cbd2f7cf13cb070a3bf055617a1f64cef7d7b80f;p=ceph.git rgw: DataSyncShardCR uses GetOmapVals Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e11e7419d345..f2658c9a8523 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1392,9 +1392,9 @@ class RGWDataSyncShardCR : public RGWCoroutine { uint32_t shard_id; rgw_data_sync_marker& sync_marker; - RGWRadosGetOmapKeysCR::ResultPtr omapkeys; - std::set entries; - std::set::iterator iter; + RGWRadosGetOmapValsCR::ResultPtr omapvals; + std::map entries; + std::map::iterator iter; string oid; @@ -1430,7 +1430,7 @@ class RGWDataSyncShardCR : public RGWCoroutine { string error_oid; RGWOmapAppend *error_repo; - std::set error_entries; + std::map error_entries; string error_marker; int max_error_entries; @@ -1550,31 +1550,30 @@ public: drain_all(); return set_cr_error(-ECANCELED); } - omapkeys = std::make_shared(); - yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), - sync_marker.marker, max_entries, omapkeys)); + omapvals = std::make_shared(); + yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid), + sync_marker.marker, max_entries, omapvals)); if (retcode < 0) { - tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode)); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } - entries = std::move(omapkeys->entries); + entries = std::move(omapvals->entries); if (entries.size() > 0) { tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */ } tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync")); iter = entries.begin(); for (; iter != entries.end(); ++iter) { - tn->log(20, SSTR("full sync: " << *iter)); + tn->log(20, SSTR("full sync: " << iter->first)); total_entries++; - if (!marker_tracker->start(*iter, total_entries, real_time())) { - tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?")); + if (!marker_tracker->start(iter->first, total_entries, real_time())) { + tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?")); } else { // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(sc, *iter, *iter, marker_tracker, error_repo, false, tn), false); + yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, marker_tracker, error_repo, false, tn), false); } - sync_marker.marker = *iter; + sync_marker.marker = iter->first; while ((int)num_spawned() > spawn_window) { set_status() << "num_spawned() > spawn_window"; @@ -1587,8 +1586,8 @@ public: } } } - } while (omapkeys->more); - omapkeys.reset(); + } while (omapvals->more); + omapvals.reset(); drain_all_but_stack(lease_stack.get()); @@ -1664,18 +1663,18 @@ public: if (error_retry_time <= ceph::coarse_real_clock::now()) { /* process bucket shards that previously failed */ - omapkeys = std::make_shared(); - yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid), - error_marker, max_error_entries, omapkeys)); - error_entries = std::move(omapkeys->entries); + omapvals = std::make_shared(); + yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, error_oid), + error_marker, max_error_entries, omapvals)); + error_entries = std::move(omapvals->entries); tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries")); iter = error_entries.begin(); for (; iter != error_entries.end(); ++iter) { - error_marker = *iter; + error_marker = iter->first; tn->log(20, SSTR("handle error entry: " << error_marker)); spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false); } - if (!omapkeys->more) { + if (!omapvals->more) { if (error_marker.empty() && error_entries.empty()) { /* the retry repo is empty, we back off a bit before calling it again */ retry_backoff_secs *= 2; @@ -1689,7 +1688,7 @@ public: error_marker.clear(); } } - omapkeys.reset(); + omapvals.reset(); tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker)); yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,