rgw: bucket trim: only fetch status from relevant zones
author Yehuda Sadeh <yehuda@redhat.com>
Sat, 18 Jan 2020 00:12:27 +0000 (16:12 -0800)
committer Yehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:40 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_trim_bilog.cc
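
In outline, the change narrows the set of peers queried for bucket sync status: instead of asking every zone in the zonegroup's data-notify map, the trim coroutine derives the peer set from the bucket's own sync policy. A minimal sketch of the idea, with illustrative stand-in types (zone_id, the map shapes, and both function names are hypothetical, not the actual RGW API):

#include <map>
#include <set>
#include <string>

using zone_id = std::string;  // stand-in for rgw_zone_id

// Before: query every zone we notify about data changes.
std::set<zone_id> peers_before(const std::map<zone_id, int>& data_notify_to_map) {
  std::set<zone_id> peers;
  for (const auto& [zid, conn] : data_notify_to_map) {
    peers.insert(zid);  // every configured zone, relevant or not
  }
  return peers;
}

// After: only zones that appear as a destination in this bucket's sync policy.
std::set<zone_id> peers_after(const std::multimap<zone_id, std::string>& all_dests) {
  std::set<zone_id> peers;
  for (const auto& [zid, pipe] : all_dests) {
    peers.insert(zid);  // duplicates collapse in the set
  }
  return peers;
}
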

diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc
index b62839c0c7899e7029014f701785288dc27463df..5652789021630ee3fa46291873e279796ff96d32 100644
@@ -838,6 +838,32 @@ multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_
   return std::move(m);
 }
 
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests()
+{
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
+
+  for (auto& dest_entry : targets) {
+    auto& zone_id = dest_entry.first;
+
+    auto& pipes = dest_entry.second.pipe_map;
+
+    for (auto& entry : pipes) {
+      auto& pipe = entry.second;
+      m.insert(make_pair(zone_id, pipe));
+    }
+  }
+
+  for (auto& pipe : resolved_dests) {
+    if (!pipe.dest.zone) {
+      continue;
+    }
+
+    m.insert(make_pair(*pipe.dest.zone, pipe));
+  }
+
+  return std::move(m);
+}
+
 void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
                                            std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */
   for (auto& entry : source_pipes.pipe_map) {
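
For reference, get_all_dests() can yield the same zone id several times (once per pipe), and resolved destinations without a concrete zone are skipped. A small consumer sketch assuming only the standard std::multimap behavior (keys iterate in sorted order, so equal keys are adjacent); walk_dests and the string-valued map are illustrative only:

#include <iostream>
#include <iterator>
#include <map>
#include <string>

// Walk a zone->pipe multimap grouped by zone.
void walk_dests(const std::multimap<std::string, std::string>& dests) {
  for (auto it = dests.begin(); it != dests.end(); ) {
    auto [first, last] = dests.equal_range(it->first);
    std::cout << "zone " << it->first << " has "
              << std::distance(first, last) << " pipe(s)\n";
    it = last;  // jump to the next distinct zone id
  }
}
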
diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h
index e24b33af3e982987f670ebd1761a9519b486e1b8..63ec487f336d788c40dac046623eaf92803796bb 100644
@@ -367,6 +367,7 @@ public:
   }
 
   multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources();
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests();
 
   const  map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() {
     return targets;
diff --git a/src/rgw/rgw_trim_bilog.cc b/src/rgw/rgw_trim_bilog.cc
index ac1ca25df5d972a85fcba444bd8ba94ded9d932a..c71f47c7ebae1612fdfcc6fd6f9f7738fa9f29d5 100644
@@ -24,6 +24,7 @@
 #include "rgw_trim_bilog.h"
 #include "rgw_cr_rados.h"
 #include "rgw_cr_rest.h"
+#include "rgw_cr_tools.h"
 #include "rgw_data_sync.h"
 #include "rgw_metadata.h"
 #include "rgw_sal.h"
@@ -418,9 +419,12 @@ class BucketTrimInstanceCR : public RGWCoroutine {
   RGWHTTPManager *const http;
   BucketTrimObserver *const observer;
   std::string bucket_instance;
+  rgw_bucket_get_sync_policy_params get_policy_params;
+  std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
   rgw_bucket bucket;
   const std::string& zone_id; //< my zone id
-  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
+  RGWBucketInfo _bucket_info;
+  const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
   int child_ret = 0;
 
   using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
@@ -434,9 +438,9 @@ class BucketTrimInstanceCR : public RGWCoroutine {
     : RGWCoroutine(store->ctx()), store(store),
       http(http), observer(observer),
       bucket_instance(bucket_instance),
-      zone_id(store->svc()->zone->get_zone().id),
-      peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()) {
+      zone_id(store->svc()->zone->get_zone().id) {
     rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr);
+    source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
   }
 
   int operate() override;
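
The sync-policy result is held in a shared_ptr because the spawned coroutine writes its output asynchronously, outside this object's stack frame; the bucket instance info is then borrowed from the handler through pbucket_info instead of being fetched in a separate RGWGetBucketInstanceInfoCR. A reduced sketch of that ownership pattern, assuming nothing beyond the standard library (Result, Caller, and on_fetch_done are hypothetical stand-ins for the RGW types):

#include <memory>
#include <optional>
#include <string>

struct Result {
  std::optional<std::string> bucket_info;  // filled in by the async op
};

struct Caller {
  std::shared_ptr<Result> result = std::make_shared<Result>();
  const std::string* pinfo = nullptr;  // borrowed; valid while result lives

  void on_fetch_done() {
    if (auto& opt = result->bucket_info; opt) {
      pinfo = &(*opt);  // point into the shared result, no copy
    }
  }
};
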
@@ -447,8 +451,30 @@ int BucketTrimInstanceCR::operate()
   reenter(this) {
     ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
 
+    get_policy_params.zone = zone_id;
+    get_policy_params.bucket = bucket;
+    yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
+                                                   store,
+                                                   get_policy_params,
+                                                   source_policy));
+    if (retcode < 0) {
+      if (retcode != -ENOENT) {
+        ldout(cct, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl;
+      }
+
+      return set_cr_error(retcode);
+    }
+
+    if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
+        opt_bucket_info) {
+      pbucket_info = &(*opt_bucket_info);
+    } else {
+      /* this shouldn't really happen */
+      return set_cr_error(-ENOENT);
+    }
+
     // query peers for sync status
-    set_status("fetching sync status from peers");
+    set_status("fetching sync status from relevant peers");
     yield {
       // query data sync status from each sync peer
       rgw_http_param_pair params[] = {
@@ -459,17 +485,33 @@ int BucketTrimInstanceCR::operate()
         { nullptr, nullptr }
       };
 
+      const auto& all_dests = source_policy->policy_handler->get_all_dests();
+
+      set<rgw_zone_id> target_zones;
+      rgw_zone_id last_zone;
+      for (const auto& entry : all_dests) {
+        if (entry.first != last_zone) {
+          last_zone = entry.first;
+          target_zones.insert(last_zone);
+        }
+      }
+
+      peer_status.resize(target_zones.size());
+
+      auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
+
       auto p = peer_status.begin();
-      for (auto& c : store->svc()->zone->get_zone_data_notify_to_map()) {
+      for (auto& zid : target_zones) {
+        auto ziter = zone_conn_map.find(zid);
+        if (ziter == zone_conn_map.end()) {
+          ldout(cct, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
+          return set_cr_error(-ECANCELED);
+        }
         using StatusCR = RGWReadRESTResourceCR<StatusShards>;
-        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+        spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
               false);
         ++p;
       }
-      // in parallel, read the local bucket instance info
-      spawn(new RGWGetBucketInstanceInfoCR(store->svc()->rados->get_async_processor(), store,
-                                           bucket, &bucket_info, nullptr),
-            false);
     }
     // wait for a response from each peer. all must respond to attempt trim
     while (num_spawned()) {
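
Because std::multimap iterates keys in sorted order, equal zone ids are adjacent, so comparing against the previous key skips redundant set insertions; std::set would deduplicate either way, making last_zone a small optimization rather than a correctness requirement. A standalone sketch of the same extraction (string keys stand in for rgw_zone_id; the helper name is illustrative):

#include <map>
#include <set>
#include <string>

std::set<std::string> unique_keys(const std::multimap<std::string, int>& m) {
  std::set<std::string> keys;
  std::string last;  // assumes keys are non-empty, so the initial value never matches
  for (const auto& [key, value] : m) {
    if (key != last) {     // adjacent entries share a key
      last = key;
      keys.insert(last);   // set::insert is a no-op on repeats anyway
    }
  }
  return keys;
}
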
@@ -483,7 +525,7 @@ int BucketTrimInstanceCR::operate()
 
     // initialize each shard with the maximum marker, which is only used when
     // there are no peers syncing from us
-    min_markers.assign(std::max(1u, bucket_info.num_shards),
+    min_markers.assign(std::max(1u, pbucket_info->num_shards),
                        RGWSyncLogTrimCR::max_marker);
 
     // determine the minimum marker for each shard
@@ -495,10 +537,10 @@ int BucketTrimInstanceCR::operate()
     }
 
     // trim shards with a ShardCollectCR
-    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
+    ldout(cct, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
        << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
     set_status("trimming bilog shards");
-    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
+    yield call(new BucketTrimShardCollectCR(store, *pbucket_info, min_markers));
     // ENODATA just means there were no keys to trim
     if (retcode == -ENODATA) {
       retcode = 0;