From e320830a7123f915ea30370cb7fc4248e42796d7 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 20 Jan 2017 11:52:27 -0500 Subject: [PATCH] rgw: use get_omap_keys2 Making looping less fragile. Signed-off-by: Sage Weil Signed-off-by: Casey Bodley --- src/rgw/rgw_cr_rados.cc | 6 ++++-- src/rgw/rgw_cr_rados.h | 4 +++- src/rgw/rgw_data_sync.cc | 18 +++++++++++------- src/rgw/rgw_sync.cc | 5 +++-- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index b1bf4fa66d9..48a421f95f1 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -249,10 +249,12 @@ int RGWRadosSetOmapKeysCR::request_complete() RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj, const string& _marker, - std::set *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()), + std::set *_entries, + int _max_entries, bool *_pmore) : RGWSimpleCoroutine(_store->ctx()), store(_store), marker(_marker), entries(_entries), max_entries(_max_entries), + pmore(_pmore), obj(_obj), cn(NULL) { set_description() << "get omap keys dest=" << obj << " marker=" << marker; @@ -268,7 +270,7 @@ int RGWRadosGetOmapKeysCR::send_request() { set_status() << "send request"; librados::ObjectReadOperation op; - op.omap_get_keys2(marker, max_entries, entries, nullptr, nullptr); + op.omap_get_keys2(marker, max_entries, entries, pmore, nullptr); cn = stack->create_completion_notifier(); return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL); diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 28db351308b..13d6a29c325 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -412,6 +412,7 @@ class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine { string marker; std::set *entries; int max_entries; + bool *pmore; rgw_rados_ref ref; @@ -423,7 +424,8 @@ public: RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj, const string& _marker, - std::set *_entries, int _max_entries); + std::set *_entries, + int _max_entries, bool *pmore); int send_request() override; int request_complete() override; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 246f67b24e2..a19dc8c7b06 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -181,7 +181,7 @@ bool RGWReadDataSyncRecoveringShardsCR::spawn_next() string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry"; spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid), - marker, &entries_map[shard_id], max_entries), false); + marker, &entries_map[shard_id], max_entries, nullptr), false); ++shard_id; return true; @@ -1162,6 +1162,7 @@ class RGWDataSyncShardCR : public RGWCoroutine { std::set entries; std::set::iterator iter; + bool more = false; string oid; @@ -1321,7 +1322,9 @@ public: set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn)); total_entries = sync_marker.pos; do { - yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries)); + yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), + sync_marker.marker, &entries, + max_entries, &more)); if (retcode < 0) { tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode)); lease_cr->go_down(); @@ -1349,7 +1352,7 @@ public: } sync_marker.marker = *iter; } - } while ((int)entries.size() == max_entries); + } while (more); tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */ @@ -1416,7 +1419,7 @@ public: /* process bucket shards that previously failed */ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid), error_marker, &error_entries, - max_error_entries)); + max_error_entries, &more)); tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries")); iter = error_entries.begin(); for (; iter != error_entries.end(); ++iter) { @@ -1424,7 +1427,7 @@ public: tn->log(20, SSTR("handle error entry: " << error_marker)); spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false); } - if ((int)error_entries.size() != max_error_entries) { + if (!more) { if (error_marker.empty() && error_entries.empty()) { /* the retry repo is empty, we back off a bit before calling it again */ retry_backoff_secs *= 2; @@ -2103,6 +2106,7 @@ class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine { set error_entries; int max_omap_entries; + bool more = false; int count; public: @@ -2125,7 +2129,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate() count = 0; do { yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid), - marker, &error_entries, max_omap_entries)); + marker, &error_entries, max_omap_entries, &more)); if (retcode == -ENOENT) { break; @@ -2145,7 +2149,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate() marker = *error_entries.rbegin(); recovering_buckets.insert(std::make_move_iterator(error_entries.begin()), std::make_move_iterator(error_entries.end())); - }while((int)error_entries.size() == max_omap_entries && count < max_entries); + } while(more && count < max_entries); return set_cr_done(); } diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 17ac58ca9ec..bf0dde2dec5 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1385,6 +1385,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine { const std::string& period_marker; //< max marker stored in next period std::set entries; + bool more = false; std::set::iterator iter; string oid; @@ -1572,7 +1573,7 @@ public: break; } yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), - marker, &entries, max_entries)); + marker, &entries, max_entries, &more)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl; tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode)); @@ -1602,7 +1603,7 @@ public: } } collect_children(); - } while ((int)entries.size() == max_entries && can_adjust_marker); + } while (more && can_adjust_marker); tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */ -- 2.39.5