From: Casey Bodley
Date: Fri, 17 Aug 2018 17:15:49 +0000 (-0400)
Subject: rgw: RGWRadosGetOmapKeysCR takes result by shared_ptr
X-Git-Tag: v13.2.3~47^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F24912%2Fhead;p=ceph.git

rgw: RGWRadosGetOmapKeysCR takes result by shared_ptr

Fixes: http://tracker.ceph.com/issues/21154

Signed-off-by: Casey Bodley
(cherry picked from commit fd77ff74ae47bf09553186a5a5e79ec13a9de16d)

Conflicts:
	src/rgw/rgw_data_sync.cc
	  - in mimic, the entire "process bucket shards that previously
	    failed" code block is enclosed in an if statement - in master,
	    this is not the case
---

diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index 706e84c4ab0d..2a839b325358 100644
--- a/src/rgw/rgw_cr_rados.cc
+++ b/src/rgw/rgw_cr_rados.cc
@@ -256,19 +256,18 @@ int RGWRadosSetOmapKeysCR::request_complete()
 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
                       const rgw_raw_obj& _obj,
                       const string& _marker,
-                      std::set<string> *_entries,
-                      int _max_entries, bool *_pmore) : RGWSimpleCoroutine(_store->ctx()),
-                                                store(_store),
-                                                marker(_marker),
-                                                entries(_entries), max_entries(_max_entries),
-                                                pmore(_pmore),
-                                                obj(_obj), cn(NULL)
+                      int _max_entries,
+                      ResultPtr _result)
+  : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
+    marker(_marker), max_entries(_max_entries),
+    result(std::move(_result))
 {
+  ceph_assert(result); // must be allocated
   set_description() << "get omap keys dest=" << obj << " marker=" << marker;
 }
 
 int RGWRadosGetOmapKeysCR::send_request() {
-  int r = store->get_raw_obj_ref(obj, &ref);
+  int r = store->get_raw_obj_ref(obj, &result->ref);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
     return r;
@@ -277,10 +276,10 @@ int RGWRadosGetOmapKeysCR::send_request() {
   set_status() << "send request";
 
   librados::ObjectReadOperation op;
-  op.omap_get_keys2(marker, max_entries, entries, pmore, nullptr);
+  op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
 
-  cn = stack->create_completion_notifier();
-  return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
+  cn = stack->create_completion_notifier(result);
+  return result->ref.ioctx.aio_operate(result->ref.oid, cn->completion(), &op, NULL);
 }
 
 int RGWRadosGetOmapKeysCR::request_complete()
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index 37b5bdd911ec..b8ff6128ed58 100644
--- a/src/rgw/rgw_cr_rados.h
+++ b/src/rgw/rgw_cr_rados.h
@@ -394,28 +394,28 @@ public:
 };
 
 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
-  RGWRados *store;
+ public:
+  struct Result {
+    rgw_rados_ref ref;
+    std::set<std::string> entries;
+    bool more = false;
+  };
+  using ResultPtr = std::shared_ptr<Result>;
 
-  string marker;
-  std::set<string> *entries;
-  int max_entries;
-  bool *pmore;
+  RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj,
+                        const string& _marker, int _max_entries,
+                        ResultPtr result);
 
-  rgw_rados_ref ref;
+  int send_request() override;
+  int request_complete() override;
 
+ private:
+  RGWRados *store;
   rgw_raw_obj obj;
-
+  string marker;
+  int max_entries;
+  ResultPtr result;
   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
-
-public:
-  RGWRadosGetOmapKeysCR(RGWRados *_store,
-                        const rgw_raw_obj& _obj,
-                        const string& _marker,
-                        std::set<string> *_entries,
-                        int _max_entries, bool *pmore);
-
-  int send_request() override;
-  int request_complete() override;
 };
 
 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
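With this interface a caller allocates the Result up front, hands the same shared pointer to the coroutine, and reads entries and more from it after the call completes. The sketch below is not part of the patch: ExampleListOmapKeysCR and its members are invented for illustration, assuming the mimic-era RGWCoroutine interface (reenter/yield call/retcode/set_cr_*) used elsewhere in this diff.

    // Illustrative only -- shows the calling pattern for the new ResultPtr API.
    #include "rgw_cr_rados.h"

    class ExampleListOmapKeysCR : public RGWCoroutine {
      RGWRados *store;
      rgw_raw_obj obj;
      std::string marker;
      RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
    public:
      ExampleListOmapKeysCR(RGWRados *store, const rgw_raw_obj& obj)
        : RGWCoroutine(store->ctx()), store(store), obj(obj) {}

      int operate() override {
        reenter(this) {
          do {
            // each pass gets a freshly allocated result; the CR asserts it is non-null
            omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
            yield call(new RGWRadosGetOmapKeysCR(store, obj, marker,
                                                 100 /* max_entries */, omapkeys));
            if (retcode < 0) {
              return set_cr_error(retcode);
            }
            if (!omapkeys->entries.empty()) {
              marker = *omapkeys->entries.rbegin(); // resume after the last key read
            }
          } while (omapkeys->more);
          return set_cr_done();
        }
        return 0;
      }
    };
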
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 819fc0dd08ee..ca2f1aec2b94 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -163,16 +163,16 @@ class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
 
   uint64_t max_entries;
   int num_shards;
-  int shard_id{0};;
+  int shard_id{0};
   string marker;
 
-  map<int, std::set<string>> &entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
 
 public:
   RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
-                                    map<int, std::set<string>>& _entries_map)
+                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
     : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
-      max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
   {}
   bool spawn_next() override;
 };
@@ -183,8 +183,10 @@ bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
     return false;
 
   string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+  auto& shard_keys = omapkeys[shard_id];
+  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
   spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
-                                  marker, &entries_map[shard_id], max_entries, nullptr), false);
+                                  marker, max_entries, shard_keys), false);
 
   ++shard_id;
   return true;
@@ -722,15 +724,16 @@ int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& rec
   }
   RGWDataSyncEnv sync_env_local = sync_env;
   sync_env_local.http_manager = &http_manager;
-  map<int, std::set<string>> entries_map;
+  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
+  omapkeys.resize(num_shards);
   uint64_t max_entries{1};
-  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
   http_manager.stop();
 
   if (ret == 0) {
-    for (const auto& entry : entries_map) {
-      if (entry.second.size() != 0) {
-        recovering_shards.insert(entry.first);
+    for (int i = 0; i < num_shards; i++) {
+      if (omapkeys[i]->entries.size() != 0) {
+        recovering_shards.insert(i);
       }
     }
   }
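The recovering-shards path above uses the same interface, but with one shared Result per shard: the vector is sized up front, each spawn_next() allocates the slot it passes to its RGWRadosGetOmapKeysCR, and the per-shard results are only inspected after the collection coroutine has run. A condensed sketch of that shape follows; names are taken from the hunks above, and the null check is an extra guard added here for illustration only, it is not in the patch.

    // Condensed sketch of the per-shard collection in read_recovering_shards().
    std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
    omapkeys.resize(num_shards);          // one slot per shard, filled when spawned

    // each spawn_next() call does, for its shard:
    //   omapkeys[shard_id] = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
    //   spawn(new RGWRadosGetOmapKeysCR(..., omapkeys[shard_id]), false);

    std::set<int> recovering_shards;
    for (int i = 0; i < num_shards; i++) {
      // null check is illustrative; the patch assumes every slot was spawned
      if (omapkeys[i] && !omapkeys[i]->entries.empty()) {
        recovering_shards.insert(i);      // shard i still has .retry entries queued
      }
    }
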
@@ -1154,9 +1157,9 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   std::set<string> entries;
   std::set<string>::iterator iter;
-  bool more = false;
 
   string oid;
 
@@ -1317,15 +1320,16 @@ public:
           drain_all();
           return set_cr_error(-ECANCELED);
         }
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             sync_marker.marker, &entries,
-                                             max_entries, &more));
+                                             sync_marker.marker, max_entries, omapkeys));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
+        entries = std::move(omapkeys->entries);
         if (entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
         }
@@ -1353,7 +1357,8 @@ public:
             }
           }
         }
-      } while (more);
+      } while (omapkeys->more);
+      omapkeys.reset();
 
       drain_all_but_stack(lease_stack.get());
 
@@ -1430,9 +1435,10 @@ public:
 
         if (error_retry_time <= ceph::coarse_real_clock::now()) {
           /* process bucket shards that previously failed */
+          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
           yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
-                                               error_marker, &error_entries,
-                                               max_error_entries, &more));
+                                               error_marker, max_error_entries, omapkeys));
+          error_entries = std::move(omapkeys->entries);
           tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
@@ -1440,7 +1446,7 @@ public:
             tn->log(20, SSTR("handle error entry: " << error_marker));
             spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
           }
-          if (!more) {
+          if (!omapkeys->more) {
             if (error_marker.empty() && error_entries.empty()) {
               /* the retry repo is empty, we back off a bit before calling it again */
               retry_backoff_secs *= 2;
@@ -1454,6 +1460,7 @@ public:
             error_marker.clear();
           }
         }
+        omapkeys.reset();
 
 #define INCREMENTAL_MAX_ENTRIES 100
         tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
@@ -2123,9 +2130,9 @@ class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
   string marker;
   string error_oid;
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   set<string> error_entries;
   int max_omap_entries;
-  bool more = false;
   int count;
 
 public:
@@ -2147,8 +2154,9 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
     //read recovering bucket shards
     count = 0;
     do {
+      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
       yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
-                                           marker, &error_entries, max_omap_entries, &more));
+                                           marker, max_omap_entries, omapkeys));
 
       if (retcode == -ENOENT) {
        break;
@@ -2160,6 +2168,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
         return set_cr_error(retcode);
       }
 
+      error_entries = std::move(omapkeys->entries);
       if (error_entries.empty()) {
         break;
       }
@@ -2168,7 +2177,7 @@ int RGWReadRecoveringBucketShardsCoroutine::operate()
       marker = *error_entries.rbegin();
      recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
                                 std::make_move_iterator(error_entries.end()));
-    } while(more && count < max_entries);
+    } while (omapkeys->more && count < max_entries);
 
     return set_cr_done();
   }
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
index 11953426bc12..c4979bd53461 100644
--- a/src/rgw/rgw_sync.cc
+++ b/src/rgw/rgw_sync.cc
@@ -1384,8 +1384,8 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   string max_marker;
   const std::string& period_marker; //< max marker stored in next period
 
+  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
   std::set<std::string> entries;
-  bool more = false;
   std::set<std::string>::iterator iter;
 
   string oid;
@@ -1572,8 +1572,9 @@ public:
           lost_lock = true;
           break;
         }
+        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
         yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             marker, &entries, max_entries, &more));
+                                             marker, max_entries, omapkeys));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
           tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
@@ -1581,6 +1582,7 @@ public:
           drain_all();
           return retcode;
         }
+        entries = std::move(omapkeys->entries);
         tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
         if (entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
@@ -1603,7 +1605,7 @@ public:
           }
         }
         collect_children();
-      } while (more && can_adjust_marker);
+      } while (omapkeys->more && can_adjust_marker);
 
       tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
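The shared_ptr is about ownership across an asynchronous completion: the coroutine passes the same shared result into stack->create_completion_notifier(result), which suggests the completion path holds its own reference, so the storage the librados callback writes into stays valid even if the calling coroutine goes away first. Below is a minimal standalone illustration of that ownership idea, independent of rgw internals; every name in it is hypothetical.

    #include <functional>
    #include <memory>
    #include <set>
    #include <string>

    struct OmapResult {                 // stand-in for RGWRadosGetOmapKeysCR::Result
      std::set<std::string> entries;
      bool more = false;
    };

    // The async layer captures its own reference to the result, mirroring how the
    // patch hands the shared result to the completion notifier.
    std::function<void()> start_async_read(std::shared_ptr<OmapResult> result) {
      return [result] {                 // completion may run after the caller is gone
        result->entries.insert("key");  // writes into storage that is still valid
        result->more = false;
      };
    }

    int main() {
      auto completion = [] {
        auto result = std::make_shared<OmapResult>();
        return start_async_read(result);
      }();                              // the "caller" and its local pointer are gone here
      completion();                     // with a raw out-pointer this would be a use-after-free
      return 0;
    }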