]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: bucket autotrim: fetch aggregated bilog info of all bucket targets
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 18 Jan 2020 01:33:41 +0000 (17:33 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:40 +0000 (10:20 -0800)
Add a new option to the bilog status request that allows returning the
min status for all buckets in a zone that sync from a specific bucket.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rest_log.cc
src/rgw/rgw_trim_bilog.cc

index a1b44bf5665aefd283cb3441dedeca3c53a1cd9f..0fdb6e5976cf325b3ea3a101c9f31927d53e14b4 100644 (file)
@@ -2266,9 +2266,22 @@ static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZo
     out << indented{width} << "does not sync from zone\n";
     return 0;
   }
+
+  if (!pipe.source.bucket) {
+    lderr(store->ctx()) << __func__ << "(): missing source bucket" << dendl;
+    return -EINVAL;
+  }
+
   RGWBucketInfo source_bucket_info;
+  rgw_bucket source_bucket;
+  int r = init_bucket(*pipe.source.bucket, source_bucket_info, source_bucket);
+  if (r < 0) {
+    lderr(store->ctx()) << "failed to read source bucket info: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
   std::vector<rgw_bucket_shard_sync_info> status;
-  int r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &source_bucket_info, &status);
+  r = rgw_bucket_sync_status(dpp(), store, pipe, bucket_info, &source_bucket_info, &status);
   if (r < 0) {
     lderr(store->ctx()) << "failed to read bucket sync status: " << cpp_strerror(r) << dendl;
     return r;
index 5652789021630ee3fa46291873e279796ff96d32..da4556a8b6de5dbfc692fc2e5d6bd17c687c4f0f 100644 (file)
@@ -812,7 +812,7 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
   }
 }
 
-multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources()
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const
 {
   multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
 
@@ -838,7 +838,7 @@ multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_
   return std::move(m);
 }
 
-multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests()
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const
 {
   multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
 
@@ -861,7 +861,33 @@ multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_
     m.insert(make_pair(*pipe.dest.zone, pipe));
   }
 
-  return std::move(m);
+  return m;
+}
+
+multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const
+{
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> m;
+
+  auto iter = targets.find(zone_id);
+  if (iter != targets.end()) {
+    auto& pipes = iter->second.pipe_map;
+
+    for (auto& entry : pipes) {
+      auto& pipe = entry.second;
+      m.insert(make_pair(zone_id, pipe));
+    }
+  }
+
+  for (auto& pipe : resolved_dests) {
+    if (!pipe.dest.zone ||
+        *pipe.dest.zone != zone_id) {
+      continue;
+    }
+
+    m.insert(make_pair(*pipe.dest.zone, pipe));
+  }
+
+  return m;
 }
 
 void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets,
index 63ec487f336d788c40dac046623eaf92803796bb..f6a19261eba5cfc3a03f3278bd44646b2abf0435 100644 (file)
@@ -366,8 +366,9 @@ public:
     return sources;
   }
 
-  multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources();
-  multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests();
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources() const;
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests() const;
+  multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests_in_zone(const rgw_zone_id& zone_id) const;
 
   const  map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() {
     return targets;
index 91fc9e1b252cff92b5127d4630665333e4c79649..adfc6babc2e4bd9dd0d35589de764233676a9048 100644 (file)
@@ -4895,7 +4895,7 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
                            rgw::sal::RGWRadosStore *store,
                            const rgw_sync_bucket_pipe& pipe,
                            const RGWBucketInfo& dest_bucket_info,
-                           RGWBucketInfo *psource_bucket_info,
+                           const RGWBucketInfo *psource_bucket_info,
                            std::vector<rgw_bucket_shard_sync_info> *status)
 {
   if (!pipe.source.zone ||
@@ -4914,18 +4914,19 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
 
   RGWBucketInfo source_bucket_info;
 
-  auto& bucket_ctl = store->getRados()->ctl.bucket;
+  if (!psource_bucket_info) {
+    auto& bucket_ctl = store->getRados()->ctl.bucket;
 
-  int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield);
-  if (ret < 0) {
-    ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
-    return ret;
-  }
+    int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
+      return ret;
+    }
 
-  if (psource_bucket_info) {
-    *psource_bucket_info = source_bucket_info;
+    psource_bucket_info = &source_bucket_info;
   }
 
+
   RGWDataSyncEnv env;
   RGWSyncModuleInstanceRef module; // null sync module
   env.init(dpp, store->ctx(), store, store->svc(), store->svc()->rados->get_async_processor(),
@@ -4936,7 +4937,7 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
 
   RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
   return crs.run(new RGWCollectBucketSyncStatusCR(store, &sc,
-                                                  source_bucket_info,
+                                                  *psource_bucket_info,
                                                   dest_bucket_info,
                                                   status));
 }
index 6e841ca8a5ab5915baf159a455987e5b55392f42..29267ca40c45c741cc211949d01e744a3930894e 100644 (file)
@@ -650,7 +650,7 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
                            rgw::sal::RGWRadosStore *store,
                            const rgw_sync_bucket_pipe& pipe,
                            const RGWBucketInfo& dest_bucket_info,
-                           RGWBucketInfo *psource_bucket_info,
+                           const RGWBucketInfo *psource_bucket_info,
                            std::vector<rgw_bucket_shard_sync_info> *status);
 
 class RGWDefaultSyncModule : public RGWSyncModule {
index 53ac3f386536b97a9a7689e0b5e3c4dc34590e71..735e652014a0cad3fa2c00e3c547d6d54517e891 100644 (file)
@@ -832,8 +832,14 @@ public:
 
 void RGWOp_BILog_Status::execute()
 {
+  const auto options = s->info.args.get("options");
+  bool merge = (options == "merge");
   const auto source_zone = s->info.args.get("source-zone");
-  const auto key = s->info.args.get("bucket");
+  const auto source_key = s->info.args.get("source-bucket");
+  auto key = s->info.args.get("bucket");
+  if (key.empty()) {
+    key = source_key;
+  }
   if (key.empty()) {
     ldpp_dout(s, 4) << "no 'bucket' provided" << dendl;
     http_ret = -EINVAL;
@@ -844,7 +850,7 @@ void RGWOp_BILog_Status::execute()
   int shard_id{-1}; // unused
   http_ret = rgw_bucket_parse_bucket_key(s->cct, key, &bucket, &shard_id);
   if (http_ret < 0) {
-    ldpp_dout(s, 4) << "no 'bucket' provided" << dendl;
+    ldpp_dout(s, 4) << "invalid 'bucket' provided" << dendl;
     http_ret = -EINVAL;
     return;
   }
@@ -857,12 +863,109 @@ void RGWOp_BILog_Status::execute()
     ldpp_dout(s, 4) << "failed to read bucket info: " << cpp_strerror(http_ret) << dendl;
     return;
   }
-  rgw_sync_bucket_pipe pipe;
-  pipe.source.zone = source_zone;
-  pipe.source.bucket = info.bucket;
-  pipe.dest.zone = store->svc()->zone->zone_id();
-  pipe.dest.bucket = info.bucket;
-  http_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status);
+
+  rgw_bucket source_bucket;
+
+  if (source_key.empty()) {
+    source_bucket = info.bucket;
+  } else {
+    http_ret = rgw_bucket_parse_bucket_key(s->cct, source_key, &source_bucket, nullptr);
+    if (http_ret < 0) {
+      ldpp_dout(s, 4) << "invalid 'source-bucket' provided (key=" << source_key << ")" << dendl;
+      return;
+    }
+  }
+
+  const auto& local_zone_id = store->svc()->zone->zone_id();
+
+  if (!merge) {
+    rgw_sync_bucket_pipe pipe;
+    pipe.source.zone = source_zone;
+    pipe.source.bucket = source_bucket;
+    pipe.dest.zone = local_zone_id;
+    pipe.dest.bucket = info.bucket;
+
+    ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl;
+
+    http_ret = rgw_bucket_sync_status(this, store, pipe, info, nullptr, &status);
+
+    if (http_ret < 0) {
+      lderr(s->cct) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << http_ret << dendl;
+    }
+    return;
+  }
+
+  rgw_zone_id source_zone_id(source_zone);
+
+  RGWBucketSyncPolicyHandlerRef source_handler;
+  http_ret = store->ctl()->bucket->get_sync_policy_handler(source_zone_id, source_bucket, &source_handler, null_yield);
+  if (http_ret < 0) {
+    lderr(s->cct) << "could not get bucket sync policy handler (r=" << http_ret << ")" << dendl;
+    return;
+  }
+
+  auto local_dests = source_handler->get_all_dests_in_zone(local_zone_id);
+
+  std::vector<rgw_bucket_shard_sync_info> current_status;
+  for (auto& entry : local_dests) {
+    auto pipe = entry.second;
+
+    ldout(s->cct, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe << dendl;
+
+    RGWBucketInfo *pinfo = &info;
+    std::optional<RGWBucketInfo> opt_dest_info;
+
+    if (!pipe.dest.bucket) {
+      /* Uh oh, something went wrong */
+      ldout(s->cct, 20) << "ERROR: RGWOp_BILog_Status::execute(): BUG: pipe.dest.bucket was not initialized" << pipe << dendl;
+      http_ret = -EIO;
+      return;
+    }
+
+    if (*pipe.dest.bucket != info.bucket) {
+      opt_dest_info.emplace();
+      pinfo = &(*opt_dest_info);
+
+      /* dest bucket might not have a bucket id */
+      http_ret = store->ctl()->bucket->read_bucket_info(*pipe.dest.bucket,
+                                                        pinfo,
+                                                        s->yield,
+                                                        RGWBucketCtl::BucketInstance::GetParams(),
+                                                        nullptr);
+      if (http_ret < 0) {
+        ldpp_dout(s, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(http_ret) << dendl;
+        return;
+      }
+
+      pipe.dest.bucket = pinfo->bucket;
+    }
+
+    int r = rgw_bucket_sync_status(this, store, pipe, *pinfo, &info, &current_status);
+    if (r < 0) {
+      lderr(s->cct) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe << " returned ret=" << r << dendl;
+      http_ret = r;
+      return;
+    }
+
+    if (status.empty()) {
+      status = std::move(current_status);
+    } else {
+      if (current_status.size() !=
+          status.size()) {
+        http_ret = -EINVAL;
+        lderr(s->cct) << "ERROR: different number of shards for sync status of buckets syncing from the same source: status.size()= " << status.size() << " current_status.size()=" << current_status.size() << dendl;
+        return;
+      }
+      auto m = status.begin();
+      for (auto& cur_shard_status : current_status) {
+        auto& result_shard_status = *m++;
+        // always take the first marker, or any later marker that's smaller
+        if (cur_shard_status.inc_marker.position < result_shard_status.inc_marker.position) {
+          result_shard_status = std::move(cur_shard_status);
+        }
+      }
+    }
+  }
 }
 
 void RGWOp_BILog_Status::send_response()
index c71f47c7ebae1612fdfcc6fd6f9f7738fa9f29d5..abfba41331b0688d3438e2a9c7b250ad8c4a26ab 100644 (file)
@@ -476,32 +476,35 @@ int BucketTrimInstanceCR::operate()
     // query peers for sync status
     set_status("fetching sync status from relevant peers");
     yield {
-      // query data sync status from each sync peer
-      rgw_http_param_pair params[] = {
-        { "type", "bucket-index" },
-        { "status", nullptr },
-        { "bucket", bucket_instance.c_str() },
-        { "source-zone", zone_id.c_str() },
-        { nullptr, nullptr }
-      };
-
       const auto& all_dests = source_policy->policy_handler->get_all_dests();
 
-      set<rgw_zone_id> target_zones;
-      rgw_zone_id last_zone;
-      for (const auto& entry : all_dests) {
-        if (entry.first != last_zone) {
-          last_zone = entry.first;
-          target_zones.insert(last_zone);
+      vector<rgw_zone_id> zids;
+      rgw_zone_id last_zid;
+      for (auto& diter : all_dests) {
+        const auto& zid = diter.first;
+        if (zid == last_zid) {
+          continue;
         }
+        last_zid = zid;
+        zids.push_back(zid);
       }
 
-      peer_status.resize(target_zones.size());
+      peer_status.resize(zids.size());
 
       auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
 
       auto p = peer_status.begin();
-      for (auto& zid : target_zones) {
+      for (auto& zid : zids) {
+        // query data sync status from each sync peer
+        rgw_http_param_pair params[] = {
+          { "type", "bucket-index" },
+          { "status", nullptr },
+          { "options", "merge" },
+          { "source-bucket", bucket_instance.c_str() },
+          { "source-zone", zone_id.c_str() },
+          { nullptr, nullptr }
+        };
+
         auto ziter = zone_conn_map.find(zid);
         if (ziter == zone_conn_map.end()) {
           ldout(cct, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;