From bd2fd97ff9efb3891a541c6476a4befe7a4c0622 Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Fri, 17 Jan 2020 16:12:27 -0800
Subject: [PATCH] rgw: bucket trim: only fetch status from relevant zones

Signed-off-by: Yehuda Sadeh
---
 src/rgw/rgw_bucket_sync.cc | 26 +++++++++++++++
 src/rgw/rgw_bucket_sync.h  |  1 +
 src/rgw/rgw_trim_bilog.cc  | 68 ++++++++++++++++++++++++++++++--------
 3 files changed, 82 insertions(+), 13 deletions(-)

diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc
index b62839c0c7899..5652789021630 100644
--- a/src/rgw/rgw_bucket_sync.cc
+++ b/src/rgw/rgw_bucket_sync.cc
@@ -838,6 +838,32 @@ multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_
   return std::move(m);
 }
 
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests()
+{
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
+
+  for (auto& dest_entry : targets) {
+    auto& zone_id = dest_entry.first;
+
+    auto& pipes = dest_entry.second.pipe_map;
+
+    for (auto& entry : pipes) {
+      auto& pipe = entry.second;
+      m.insert(make_pair(zone_id, pipe));
+    }
+  }
+
+  for (auto& pipe : resolved_dests) {
+    if (!pipe.dest.zone) {
+      continue;
+    }
+
+    m.insert(make_pair(*pipe.dest.zone, pipe));
+  }
+
+  return std::move(m);
+}
+
 void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
                                            std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */
   for (auto& entry : source_pipes.pipe_map) {
diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h
index e24b33af3e982..63ec487f336d7 100644
--- a/src/rgw/rgw_bucket_sync.h
+++ b/src/rgw/rgw_bucket_sync.h
@@ -367,6 +367,7 @@ public:
   }
 
   multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources();
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests();
 
   const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() {
     return targets;
diff --git a/src/rgw/rgw_trim_bilog.cc b/src/rgw/rgw_trim_bilog.cc
index ac1ca25df5d97..c71f47c7ebae1 100644
--- a/src/rgw/rgw_trim_bilog.cc
+++ b/src/rgw/rgw_trim_bilog.cc
@@ -24,6 +24,7 @@
 #include "rgw_trim_bilog.h"
 #include "rgw_cr_rados.h"
 #include "rgw_cr_rest.h"
+#include "rgw_cr_tools.h"
 #include "rgw_data_sync.h"
 #include "rgw_metadata.h"
 #include "rgw_sal.h"
@@ -418,9 +419,12 @@ class BucketTrimInstanceCR : public RGWCoroutine {
   RGWHTTPManager *const http;
   BucketTrimObserver *const observer;
   std::string bucket_instance;
+  rgw_bucket_get_sync_policy_params get_policy_params;
+  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
   rgw_bucket bucket;
   const std::string& zone_id; //< my zone id
-  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
+  RGWBucketInfo _bucket_info;
+  const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
   int child_ret = 0;
 
   using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
@@ -434,9 +438,9 @@ class BucketTrimInstanceCR : public RGWCoroutine {
     : RGWCoroutine(store->ctx()), store(store),
       http(http), observer(observer),
       bucket_instance(bucket_instance),
-      zone_id(store->svc()->zone->get_zone().id),
-      peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()) {
+      zone_id(store->svc()->zone->get_zone().id) {
     rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr);
+    source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
   }
 
   int operate() override;
@@ -447,8 +451,30 @@ int BucketTrimInstanceCR::operate()
   reenter(this) {
     ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
 
+    get_policy_params.zone = zone_id;
+    get_policy_params.bucket = bucket;
+    yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
+                                                   store,
+                                                   get_policy_params,
+                                                   source_policy));
+    if (retcode < 0) {
+      if (retcode != -ENOENT) {
+        ldout(cct, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl;
+      }
+
+      return set_cr_error(retcode);
+    }
+
+    if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
+        opt_bucket_info) {
+      pbucket_info = &(*opt_bucket_info);
+    } else {
+      /* this shouldn't really happen */
+      return set_cr_error(-ENOENT);
+    }
+
     // query peers for sync status
-    set_status("fetching sync status from peers");
+    set_status("fetching sync status from relevant peers");
     yield {
       // query data sync status from each sync peer
       rgw_http_param_pair params[] = {
@@ -459,17 +485,33 @@ int BucketTrimInstanceCR::operate()
         { nullptr, nullptr }
       };
 
+      const auto& all_dests = source_policy->policy_handler->get_all_dests();
+
+      set<rgw_zone_id> target_zones;
+      rgw_zone_id last_zone;
+      for (const auto& entry : all_dests) {
+        if (entry.first != last_zone) {
+          last_zone = entry.first;
+          target_zones.insert(last_zone);
+        }
+      }
+
+      peer_status.resize(target_zones.size());
+
+      auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
+
       auto p = peer_status.begin();
-      for (auto& c : store->svc()->zone->get_zone_data_notify_to_map()) {
+      for (auto& zid : target_zones) {
+        auto ziter = zone_conn_map.find(zid);
+        if (ziter == zone_conn_map.end()) {
+          ldout(cct, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
+          return set_cr_error(-ECANCELED);
+        }
         using StatusCR = RGWReadRESTResourceCR<StatusShards>;
-        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+        spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
               false);
         ++p;
       }
-      // in parallel, read the local bucket instance info
-      spawn(new RGWGetBucketInstanceInfoCR(store->svc()->rados->get_async_processor(), store,
-                                           bucket, &bucket_info, nullptr),
-            false);
     }
     // wait for a response from each peer. all must respond to attempt trim
     while (num_spawned()) {
@@ -483,7 +525,7 @@ int BucketTrimInstanceCR::operate()
 
     // initialize each shard with the maximum marker, which is only used when
     // there are no peers syncing from us
-    min_markers.assign(std::max(1u, bucket_info.num_shards),
+    min_markers.assign(std::max(1u, pbucket_info->num_shards),
                        RGWSyncLogTrimCR::max_marker);
 
     // determine the minimum marker for each shard
@@ -495,10 +537,10 @@ int BucketTrimInstanceCR::operate()
     }
 
     // trim shards with a ShardCollectCR
-    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
+    ldout(cct, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
                    << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
     set_status("trimming bilog shards");
-    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
+    yield call(new BucketTrimShardCollectCR(store, *pbucket_info, min_markers));
     // ENODATA just means there were no keys to trim
     if (retcode == -ENODATA) {
       retcode = 0;
-- 
2.39.5
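
Illustrative sketch (not part of the patch): the core of the change is collapsing
the (zone -> pipe) multimap returned by the new get_all_dests() into the unique
set of destination zones, so that bucket sync status is only fetched from zones
that actually sync from this bucket rather than from every zone in the
data-notify map. A minimal standalone version of that deduplication step is
sketched below; ZoneId and Pipe are simplified stand-ins for illustration, not
the real rgw_zone_id / rgw_sync_bucket_pipe types.

    #include <map>
    #include <set>
    #include <string>

    // Stand-in types for illustration only.
    using ZoneId = std::string;
    struct Pipe { std::string source_bucket, dest_bucket; };

    // Collapse the (zone -> pipe) multimap into the unique set of destination
    // zones, mirroring the loop added in BucketTrimInstanceCR::operate():
    // a multimap is ordered by key, so equal zones are adjacent and a single
    // pass tracking the last-seen key is enough.
    std::set<ZoneId> unique_target_zones(const std::multimap<ZoneId, Pipe>& all_dests)
    {
      std::set<ZoneId> target_zones;
      ZoneId last_zone;
      for (const auto& entry : all_dests) {
        if (entry.first != last_zone) {
          last_zone = entry.first;
          target_zones.insert(last_zone);
        }
      }
      return target_zones;
    }

In the patched coroutine, peer_status is then resized to the number of unique
target zones, and the trim is aborted with -ECANCELED if any of those zones has
no entry in the zone connection map.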