]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: BucketInstanceTrimCR knows about generations
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 14 May 2021 19:44:01 +0000 (15:44 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 13 Sep 2021 16:27:51 +0000 (12:27 -0400)
Fetch the current generation from remote peers and trim the minimum
marker on the minimum generation.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_rest_conn.h
src/rgw/rgw_trim_bilog.cc

index 1c18d3ea1aa24d5e9f6c48c803b6195e5a5354fc..65ac4e0aa55c22ccdc67ff33503ab3d15ceca75a 100644 (file)
@@ -604,17 +604,19 @@ int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp)
   return 0;
 }
 
-RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp,
-                                         rgw::sal::RadosStore* store,
-                                         const RGWBucketInfo& bucket_info,
-                                         int shard_id,
-                                         const std::string& start_marker,
-                                         const std::string& end_marker)
+RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(
+  const DoutPrefixProvider *dpp,
+  rgw::sal::RadosStore* store,
+  const RGWBucketInfo& bucket_info,
+  int shard_id,
+  const rgw::bucket_index_layout_generation& generation,
+  const std::string& start_marker,
+  const std::string& end_marker)
   : RGWSimpleCoroutine(store->ctx()), bs(store->getRados()),
     start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
     end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
 {
-  bs.init(dpp, bucket_info, bucket_info.layout.current_index, shard_id);
+  bs.init(dpp, bucket_info, generation, shard_id);
 }
 
 int RGWRadosBILogTrimCR::send_request(const DoutPrefixProvider *dpp)
index 8dc5a9d17d270100821fcd42f4325523e416f027..f83080d53e7bba5f6f833d04429be5cea3ce8b25 100644 (file)
@@ -929,7 +929,9 @@ class RGWRadosBILogTrimCR : public RGWSimpleCoroutine {
  public:
   RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp,
                       rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
-                      int shard_id, const std::string& start_marker,
+                      int shard_id,
+                     const rgw::bucket_index_layout_generation& generation,
+                     const std::string& start_marker,
                       const std::string& end_marker);
 
   int send_request(const DoutPrefixProvider *dpp) override;
index e0e1b12abe5d7e5f350a1e69eb8f2331845f67e2..a0be6e2dd2fe386b6da8f41666d2ebafc9004cb3 100644 (file)
@@ -16,8 +16,8 @@ namespace rgw { namespace sal {
 
 class RGWSI_Zone;
 
-template <class T>
-static int parse_decode_json(T& t, bufferlist& bl)
+template<class T>
+inline int parse_decode_json(T& t, bufferlist& bl)
 {
   JSONParser p;
   if (!p.parse(bl.c_str(), bl.length())) {
index 9508d9d02983180a1f10c1980f0ee72d782e3eb9..7848d979dc331050b3ff0006010ce73280625ea1 100644 (file)
@@ -353,32 +353,6 @@ struct BucketTrimObserver {
   virtual bool trimmed_recently(const std::string_view& bucket_instance) = 0;
 };
 
-/// populate the status with the minimum stable marker of each shard
-template <typename Iter>
-int take_min_status(CephContext *cct, Iter first, Iter last,
-                    std::vector<std::string> *status)
-{
-  for (auto peer = first; peer != last; ++peer) {
-    if (peer->size() != status->size()) {
-      // all peers must agree on the number of shards
-      return -EINVAL;
-    }
-    auto m = status->begin();
-    for (auto& shard : *peer) {
-      auto& marker = *m++;
-      // if no sync has started, we can safely trim everything
-      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
-        continue;
-      }
-      // always take the first marker, or any later marker that's smaller
-      if (peer == first || marker > shard.inc_marker.position) {
-        marker = std::move(shard.inc_marker.position);
-      }
-    }
-  }
-  return 0;
-}
-
 /// trim each bilog shard to the given marker, while limiting the number of
 /// concurrent requests
 class BucketTrimShardCollectCR : public RGWShardCollectCR {
@@ -386,6 +360,7 @@ class BucketTrimShardCollectCR : public RGWShardCollectCR {
   const DoutPrefixProvider *dpp;
   rgw::sal::RadosStore* const store;
   const RGWBucketInfo& bucket_info;
+  rgw::bucket_index_layout_generation generation;
   const std::vector<std::string>& markers; //< shard markers to trim
   size_t i{0}; //< index of current shard marker
 
@@ -401,9 +376,11 @@ class BucketTrimShardCollectCR : public RGWShardCollectCR {
  public:
   BucketTrimShardCollectCR(const DoutPrefixProvider *dpp,
                            rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info,
+                          rgw::bucket_index_layout_generation generation,
                            const std::vector<std::string>& markers)
     : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
-      dpp(dpp), store(store), bucket_info(bucket_info), markers(markers)
+      dpp(dpp), store(store), bucket_info(bucket_info),
+      generation(generation), markers(markers)
   {}
   bool spawn_next() override;
 };
@@ -419,7 +396,7 @@ bool BucketTrimShardCollectCR::spawn_next()
       ldpp_dout(dpp, 10) << "trimming bilog shard " << shard_id
           << " of " << bucket_info.bucket << " at marker " << marker << dendl;
       spawn(new RGWRadosBILogTrimCR(dpp, store, bucket_info, shard_id,
-                                    std::string{}, marker),
+                                    generation, std::string{}, marker),
             false);
       return true;
     }
@@ -437,15 +414,60 @@ class BucketTrimInstanceCR : public RGWCoroutine {
   std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
   rgw_bucket bucket;
   const std::string& zone_id; //< my zone id
-  RGWBucketInfo _bucket_info; 
+  RGWBucketInfo _bucket_info;
   const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
   int child_ret = 0;
   const DoutPrefixProvider *dpp;
 
-  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
+public:
+  struct StatusShards {
+    uint64_t generation = 0;
+    std::vector<rgw_bucket_shard_sync_info> shards;
+  };
+private:
   std::vector<StatusShards> peer_status; //< sync status for each peer
   std::vector<std::string> min_markers; //< min marker per shard
 
+  /// The layout of the generation to trim
+  rgw::bucket_index_layout_generation totrim;
+
+  int take_min_generation() {
+    // Initialize the min_generation to the bucket's current
+    // generation, used in case we have no peers.
+    auto min_generation = pbucket_info->layout.logs.back().gen;
+
+    // Determine the minimum generation
+    if (auto m = std::min_element(peer_status.begin(),
+                                 peer_status.end(),
+                                 [](const StatusShards& l,
+                                    const StatusShards& r) {
+                                   return l.generation < r.generation;
+                                 }); m != peer_status.end()) {
+      min_generation = m->generation;
+    }
+
+    auto& logs = pbucket_info->layout.logs;
+    auto log = std::find_if(logs.begin(), logs.end(),
+                           rgw::matches_gen(min_generation));
+    if (log == logs.end()) {
+      ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                       << "ERROR: No log layout for min_generation="
+                       << min_generation << dendl;
+      return -ENOENT;
+    }
+
+    if (log->layout.type == rgw::BucketLogType::InIndex) {
+      totrim = log->layout.in_index;
+    } else {
+      ldpp_dout(dpp, 0) << "Unable to convert log of unknown type "
+                       << log->layout.type
+                       << " to rgw::bucket_index_layout_generation " << dendl;
+      return -EINVAL;
+    }
+
+    return 0;
+  }
+
  public:
   BucketTrimInstanceCR(rgw::sal::RadosStore* store, RGWHTTPManager *http,
                        BucketTrimObserver *observer,
@@ -463,6 +485,69 @@ class BucketTrimInstanceCR : public RGWCoroutine {
   int operate(const DoutPrefixProvider *dpp) override;
 };
 
+namespace {
+/// populate the status with the minimum stable marker of each shard
+int take_min_status(
+  CephContext *cct,
+  const uint64_t min_generation,
+  std::vector<BucketTrimInstanceCR::StatusShards>::const_iterator first,
+  std::vector<BucketTrimInstanceCR::StatusShards>::const_iterator last,
+  std::vector<std::string> *status) {
+  for (auto peer = first; peer != last; ++peer) {
+    if (peer->shards.size() != status->size()) {
+      // all peers must agree on the number of shards
+      return -EINVAL;
+    }
+
+    auto m = status->begin();
+    for (auto& shard : peer->shards) {
+      auto& marker = *m++;
+      // Peers on later generations don't get a say in the matter
+      if (peer->generation > min_generation) {
+       continue;
+      }
+      // if no sync has started, we can safely trim everything
+      if (shard.state == rgw_bucket_shard_sync_info::StateInit) {
+       continue;
+      }
+      // always take the first marker, or any later marker that's smaller
+      if (peer == first || marker > shard.inc_marker.position) {
+       marker = std::move(shard.inc_marker.position);
+      }
+    }
+  }
+  return 0;
+}
+}
+
+template<>
+inline int parse_decode_json<BucketTrimInstanceCR::StatusShards>(
+  BucketTrimInstanceCR::StatusShards& s, bufferlist& bl)
+{
+  JSONParser p;
+  if (!p.parse(bl.c_str(), bl.length())) {
+    return -EINVAL;
+  }
+
+  try {
+    bilog_status_v2 v;
+    decode_json_obj(v, &p);
+    s.generation = v.sync_status.incremental_gen;
+    s.shards = std::move(v.inc_status);
+  } catch (JSONDecoder::err& e) {
+    try {
+      // Fall back if we're talking to an old node that can't give v2
+      // output.
+      s.generation = 0;
+      decode_json_obj(s.shards, &p);
+    } catch (JSONDecoder::err& e) {
+      return -EINVAL;
+    }
+  }
+  return 0;
+}
+
+
 int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
 {
   reenter(this) {
@@ -521,6 +606,7 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
           { "bucket", bucket_instance.c_str() }, /* equal to source-bucket when `options==merge` and source-bucket
                                                     param is not provided */
           { "source-zone", zone_id.c_str() },
+          { "version", "2" },
           { nullptr, nullptr }
         };
 
@@ -529,7 +615,8 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
           ldpp_dout(dpp, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
           return set_cr_error(-ECANCELED);
         }
-        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
+
+       using StatusCR = RGWReadRESTResourceCR<StatusShards>;
         spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
               false);
         ++p;
@@ -550,9 +637,15 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
     min_markers.assign(std::max(1u, pbucket_info->layout.current_index.layout.normal.num_shards),
                        RGWSyncLogTrimCR::max_marker);
 
+    // Determine the minimum generation
+    retcode = take_min_generation();
+    if (retcode < 0) {
+      ldpp_dout(dpp, 4) << "failed to find minimum generation" << dendl;
+      return set_cr_error(retcode);
+    }
     // determine the minimum marker for each shard
-    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
-                              &min_markers);
+    retcode = take_min_status(cct, totrim.gen, peer_status.cbegin(),
+                             peer_status.cend(), &min_markers);
     if (retcode < 0) {
       ldpp_dout(dpp, 4) << "failed to correlate bucket sync status from peers" << dendl;
       return set_cr_error(retcode);
@@ -562,7 +655,8 @@ int BucketTrimInstanceCR::operate(const DoutPrefixProvider *dpp)
     ldpp_dout(dpp, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
        << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
     set_status("trimming bilog shards");
-    yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, min_markers));
+    yield call(new BucketTrimShardCollectCR(dpp, store, *pbucket_info, totrim,
+                                           min_markers));
     // ENODATA just means there were no keys to trim
     if (retcode == -ENODATA) {
       retcode = 0;