git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: Deep scrubbing implementation for optimised erasure coding
author Jon Bailey <jonathan.bailey1@ibm.com>
Mon, 12 May 2025 20:29:40 +0000 (21:29 +0100)
committer Jon Bailey <jonathan.bailey1@ibm.com>
Tue, 15 Jul 2025 12:36:10 +0000 (13:36 +0100)
Signed-off-by: Jon Bailey <jonathan.bailey1@ibm.com>
13 files changed:
src/osd/ECBackend.cc
src/osd/ECBackend.h
src/osd/ECBackendL.h
src/osd/ECSwitch.h
src/osd/ECUtil.h
src/osd/PG.h
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/scrubber/scrub_backend.cc
src/osd/scrubber/scrub_backend.h
src/osd/scrubber_common.h
src/test/osd/test_scrubber_be.cc

index ce6fe25246ec62a596593f1501dcfba721c49372..84a315596f76d64ab86474deacb0f944e248afe9 100644 (file)
@@ -1667,6 +1667,57 @@ void ECBackend::objects_read_async(
          cct)));
 }
 
+bool ECBackend::ec_can_decode(const shard_id_set &available_shards) const {
+  if (sinfo.supports_sub_chunks()) {
+    ceph_abort_msg("Interface does not support subchunks");
+    return false;
+  }
+
+  mini_flat_map<shard_id_t, std::vector<std::pair<int, int>>>
+      minimum_sub_chunks{ec_impl->get_chunk_count()};
+  shard_id_set want_to_read = sinfo.get_all_shards();
+  shard_id_set available(available_shards);
+  shard_id_set minimum_set;
+
+  int r = ec_impl->minimum_to_decode(want_to_read, available, minimum_set,
+                                     &minimum_sub_chunks);
+  return (r == 0);
+}
+
+shard_id_map<bufferlist> ECBackend::ec_encode_acting_set(
+    const bufferlist &in_bl) const {
+  shard_id_set want_to_encode;
+  for (raw_shard_id_t raw_shard_id;raw_shard_id < ec_impl->get_chunk_count();
+       ++raw_shard_id) {
+    want_to_encode.insert(sinfo.get_shard(raw_shard_id));
+  }
+  shard_id_map<bufferlist> encoded{ec_impl->get_chunk_count()};
+  ec_impl->encode(want_to_encode, in_bl, &encoded);
+  return encoded;
+}
+
+shard_id_map<bufferlist> ECBackend::ec_decode_acting_set(
+    const shard_id_map<bufferlist> &shard_map, int chunk_size) const {
+  shard_id_set want_to_read;
+  for (raw_shard_id_t raw_shard_id; raw_shard_id < ec_impl->get_chunk_count();
+       ++raw_shard_id) {
+    shard_id_t shard_id = sinfo.get_shard(raw_shard_id);
+    if (!shard_map.contains(shard_id)) want_to_read.insert(shard_id);
+  }
+
+  shard_id_map<bufferlist> decoded_buffers(ec_impl->get_chunk_count());
+  ec_impl->decode(want_to_read, shard_map, &decoded_buffers, chunk_size);
+
+  shard_id_map<bufferlist> decoded_buffer_map{ec_impl->get_chunk_count()};
+  for (auto &[shard_id, bl] : decoded_buffers) {
+    decoded_buffer_map[shard_id] = bl;
+  }
+
+  return decoded_buffer_map;
+}
+
+ECUtil::stripe_info_t ECBackend::ec_get_sinfo() const { return sinfo; }
+
 void ECBackend::objects_read_and_reconstruct(
   const map<hobject_t, std::list<ec_align_t>> &reads,
   bool fast_read,
@@ -1760,7 +1811,16 @@ int ECBackend::be_deep_scrub(
     return -EINPROGRESS;
   }
 
-  o.digest = 0;
+  if (sinfo.supports_encode_decode_crcs()) {
+    // We pass the calculated digest here
+    // This will be used along with the plugin to verify data consistency
+    o.digest = pos.data_hash.digest();
+  }
+  else
+  {
+    o.digest = 0;
+  }
+
   o.digest_present = true;
   o.omap_digest = -1;
   o.omap_digest_present = true;
index d85defc9e59878163c95d721f52604938af71e42..0eafbf4acb69a0efd132c2a8b0300cafd4990a5d 100644 (file)
@@ -175,6 +175,12 @@ class ECBackend : public ECCommon {
       bool fast_read = false
     );
 
+  bool ec_can_decode(const shard_id_set &available_shards) const;
+  shard_id_map<bufferlist> ec_encode_acting_set(const bufferlist &in_bl) const;
+  shard_id_map<bufferlist> ec_decode_acting_set(
+      const shard_id_map<bufferlist> &shard_map, int chunk_size) const;
+  ECUtil::stripe_info_t ec_get_sinfo() const;
+
  private:
   friend struct ECRecoveryHandle;
 
@@ -436,6 +442,10 @@ class ECBackend : public ECCommon {
     return sinfo.get_chunk_size();
   }
 
+  bool get_ec_supports_crc_encode_decode() const {
+    return sinfo.supports_encode_decode_crcs();
+  }
+
   uint64_t object_size_to_shard_size(const uint64_t size, shard_id_t shard
     ) const {
     return sinfo.object_size_to_shard_size(size, shard);
index cd0a1e846aa564ceffcc728ee8568610dbd78243..592e5d744d42b270269c8071f6f64b863c15d9b3 100644 (file)
@@ -374,6 +374,7 @@ IGNORE_DEPRECATED
 END_IGNORE_DEPRECATED
     }
   };
+
   std::unique_ptr<ECRecPred> get_is_recoverable_predicate() const {
     return std::make_unique<ECRecPred>(ec_impl);
   }
@@ -381,9 +382,13 @@ END_IGNORE_DEPRECATED
   unsigned get_ec_data_chunk_count() const {
     return ec_impl->get_data_chunk_count();
   }
+
   int get_ec_stripe_chunk_size() const {
     return sinfo.get_chunk_size();
   }
+
+  bool get_ec_supports_crc_encode_decode() const { return false; }
+
   uint64_t object_size_to_shard_size(const uint64_t size) const {
     if (size == std::numeric_limits<uint64_t>::max()) {
       return size;
index bd0acad9ad7c629a2881d777baac9a0f13902e3c..689071eb08e46f3619a3e7b449b033e927823826 100644 (file)
@@ -328,6 +328,51 @@ public:
     return legacy.get_ec_stripe_chunk_size();
   }
 
+  bool get_ec_supports_crc_encode_decode() const override {
+    if (is_optimized()) {
+      return optimized.get_ec_supports_crc_encode_decode();
+    }
+    return legacy.get_ec_supports_crc_encode_decode();
+  }
+
+  bool ec_can_decode(const shard_id_set &available_shards) const override {
+    if (is_optimized()) {
+      return optimized.ec_can_decode(available_shards);
+    }
+
+    return false;
+  }
+
+  shard_id_map<bufferlist> ec_encode_acting_set(
+      const bufferlist &in_bl) const override {
+    if (is_optimized()) {
+      return optimized.ec_encode_acting_set(in_bl);
+    }
+
+    ceph_abort_msg("This interface is not supported by legacy EC");
+    return {0};
+  }
+
+  shard_id_map<bufferlist> ec_decode_acting_set(
+      const shard_id_map<bufferlist> &shard_map,
+      int chunk_size) const override {
+    if (is_optimized()) {
+      return optimized.ec_decode_acting_set(shard_map, chunk_size);
+    }
+
+    ceph_abort_msg("This interface is not supported by legacy EC");
+    return {0};
+  }
+
+  ECUtil::stripe_info_t ec_get_sinfo() const {
+    if (is_optimized()) {
+      return optimized.ec_get_sinfo();
+    }
+
+    ceph_abort_msg("This interface is not supported by legacy EC");
+    return {0, 0, 0};
+  }
+
   int objects_get_attrs(
     const hobject_t &hoid,
     std::map<std::string, ceph::buffer::list, std::less<>> *out) override
index 8a228795054c7ad0a0a499c6fdf170c0da8c9e04..db254e25e2ae22d688b97ec782780ab13ba73dfc 100644 (file)
@@ -417,7 +417,6 @@ private:
     return all_shards;
   }
 
-
 public:
   stripe_info_t(const ErasureCodeInterfaceRef &ec_impl, const pg_pool_t *pool,
                 uint64_t stripe_width
@@ -550,6 +549,10 @@ public:
     return pool->allows_ecoverwrites();
   }
 
+  bool supports_ec_optimisations() const {
+    return pool->allows_ecoptimizations();
+  }
+
   bool supports_sub_chunks() const {
     return (plugin_flags &
       ErasureCodeInterface::FLAG_EC_PLUGIN_REQUIRE_SUB_CHUNKS) != 0;
index f3e760c1b8fd154c736bccc1838a79f9452b4e91..84c88efc301e47004d121dd27010843ab3871bfa 100644 (file)
@@ -1415,6 +1415,28 @@ public:
                                  shard_id_t shard_id) const final {
    return get_pgbackend()->be_get_ondisk_size(logical_size, shard_id_t(shard_id));
  }
+
+ bool ec_can_decode(const shard_id_set &available_shards) const final {
+   return get_pgbackend()->ec_can_decode(available_shards);
+ }
+
+ shard_id_map<bufferlist> ec_encode_acting_set(
+     const bufferlist &in_bl) const final {
+   return get_pgbackend()->ec_encode_acting_set(in_bl);
+ }
+
+ shard_id_map<bufferlist> ec_decode_acting_set(
+     const shard_id_map<bufferlist> &shard_map, int chunk_size) const final {
+   return get_pgbackend()->ec_decode_acting_set(shard_map, chunk_size);
+ }
+
+ bool get_ec_supports_crc_encode_decode() const final {
+   return get_pgbackend()->get_ec_supports_crc_encode_decode();
+ }
+
+ ECUtil::stripe_info_t get_ec_sinfo() const final {
+   return get_pgbackend()->ec_get_sinfo();
+ }
 };
 
 /**
index acbff0745b23ef9f27d36a34dd1c12de78e8daf3..768eee302cd4527f6140d103c242d944977ca0a1 100644 (file)
@@ -428,6 +428,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
    virtual IsPGReadablePredicate *get_is_readable_predicate() const = 0;
    virtual unsigned int get_ec_data_chunk_count() const { return 0; };
    virtual int get_ec_stripe_chunk_size() const { return 0; };
+   virtual bool get_ec_supports_crc_encode_decode() const = 0;
    virtual uint64_t object_size_to_shard_size(const uint64_t size, shard_id_t shard) const { return size; };
    virtual void dump_recovery_info(ceph::Formatter *f) const = 0;
    virtual bool get_is_nonprimary_shard(shard_id_t shard) const {
@@ -439,6 +440,12 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
    virtual bool get_is_ec_optimized() const {
      return false; // Only EC can have be ec optimized!
    }
+   virtual bool ec_can_decode(const shard_id_set &available_shards) const = 0;
+   virtual shard_id_map<bufferlist> ec_encode_acting_set(
+       const bufferlist &in_bl) const = 0;
+   virtual shard_id_map<bufferlist> ec_decode_acting_set(
+       const shard_id_map<bufferlist> &shard_map, int chunk_size) const = 0;
+   virtual ECUtil::stripe_info_t ec_get_sinfo() const = 0;
 
  private:
    std::set<hobject_t> temp_contents;
index 0ffeb9f23be17508b6fbed53ac7889f2f9eec345..ef84380d12168e54d6bea8de7dabc03fdb6d79d1 100644 (file)
@@ -310,6 +310,34 @@ void ReplicatedBackend::objects_read_async(
   ceph_abort_msg("async read is not used by replica pool");
 }
 
+bool ReplicatedBackend::get_ec_supports_crc_encode_decode() const {
+  ceph_abort_msg("crc encode decode is not used by replica pool");
+  return false;
+}
+
+bool ReplicatedBackend::ec_can_decode(
+    const shard_id_set &available_shards) const {
+  ceph_abort_msg("can decode is not used by replica pool");
+  return false;
+}
+
+shard_id_map<bufferlist> ReplicatedBackend::ec_encode_acting_set(
+    const bufferlist &in_bl) const {
+  ceph_abort_msg("encode is not used by replica pool");
+  return {0};
+}
+
+shard_id_map<bufferlist> ReplicatedBackend::ec_decode_acting_set(
+    const shard_id_map<bufferlist> &shard_map, int chunk_size) const {
+  ceph_abort_msg("decode is not used by replica pool");
+  return {0};
+}
+
+ECUtil::stripe_info_t ReplicatedBackend::ec_get_sinfo() const {
+  ceph_abort_msg("get_ec_sinfo is not used by replica pool");
+  return {0, 0, 0};
+}
+
 class C_OSD_OnOpCommit : public Context {
   ReplicatedBackend *pg;
   ceph::ref_t<ReplicatedBackend::InProgressOp> op;
index db9e3d776e66f3095fa2c50ad60c272e2f7bcdad..a1cd8a84f12a0fdac53224b53e4f77c936f7beed 100644 (file)
@@ -153,8 +153,15 @@ public:
               std::pair<ceph::buffer::list*, Context*> > > &to_read,
                Context *on_complete,
                bool fast_read = false) override;
-
-private:
+  bool get_ec_supports_crc_encode_decode() const override;
+  ECUtil::stripe_info_t ec_get_sinfo() const override;
+  bool ec_can_decode(const shard_id_set &available_shards) const override;
+  shard_id_map<bufferlist> ec_encode_acting_set(
+      const bufferlist &in_bl) const override;
+  shard_id_map<bufferlist> ec_decode_acting_set(
+      const shard_id_map<bufferlist> &shard_map, int chunk_size) const override;
+
+ private:
   // push
   struct push_info_t {
     ObjectRecoveryProgress recovery_progress;
index 2dccc1bb92eab568441b4e63601ba67989e0720c..7911ad7961d1675eb1e8dd3aebf88224fc7a3f7e 100644 (file)
@@ -70,6 +70,7 @@ ScrubBackend::ScrubBackend(ScrubBeListener& scrubber,
                [i_am](const pg_shard_t& shard) { return shard != i_am; });
 
   m_is_replicated = m_pool.info.is_replicated();
+  m_is_optimized_ec = m_pool.info.allows_ecoptimizations();
   m_mode_desc =
     (m_repair ? "repair"sv
               : (m_depth == scrub_level_t::deep ? "deep-scrub"sv : "scrub"sv));
@@ -93,6 +94,7 @@ ScrubBackend::ScrubBackend(ScrubBeListener& scrubber,
 {
   m_formatted_id = m_pg_id.calc_name_sring();
   m_is_replicated = m_pool.info.is_replicated();
+  m_is_optimized_ec = m_pool.info.allows_ecoptimizations();
   m_mode_desc =
     (m_repair ? "repair"sv
               : (m_depth == scrub_level_t::deep ? "deep-scrub"sv : "scrub"sv));
@@ -104,6 +106,19 @@ uint64_t ScrubBackend::logical_to_ondisk_size(uint64_t logical_size,
   return m_pg.logical_to_ondisk_size(logical_size, shard_id);
 }
 
+uint32_t ScrubBackend::generate_zero_buffer_crc(shard_id_t shard_id,
+                                                int length) const {
+  // Shards can have different lengths.
+  // Lengths of zero buffers need to match the length of the shard.
+  // So we initialise a new buffer to the correct length per shard.
+  bufferlist zero_bl;
+  zero_bl.append_zero(logical_to_ondisk_size(length, shard_id));
+
+  bufferhash zero_data_hash(-1);
+  zero_data_hash << zero_bl;
+  return zero_data_hash.digest();
+}
+
 void ScrubBackend::update_repair_status(bool should_repair)
 {
   dout(15) << __func__
@@ -420,10 +435,35 @@ auth_selection_t ScrubBackend::select_auth_object(const hobject_t& ho,
   /// that is auth eligible.
   /// This creates an issue with 'digest_match' that should be handled.
   std::list<pg_shard_t> shards;
+  shard_id_set available_shards;
+  uint32_t digest_map_size = 0;
+  if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode()) {
+    digest_map_size = m_pg.get_ec_sinfo().get_k_plus_m();
+  }
+  shard_id_map<bufferlist> digest_map{digest_map_size};
+
   for (const auto& [srd, smap] : this_chunk->received_maps) {
     if (srd != m_pg_whoami) {
       shards.push_back(srd);
     }
+
+    if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode() &&
+        smap.objects.contains(ho)) {
+      available_shards.insert(srd.shard);
+
+      uint32_t digest = smap.objects.at(ho).digest;
+      constexpr std::size_t length = sizeof(digest);
+      char crc_bytes[length];
+      for (std::size_t i = 0; i < length; i++) {
+        crc_bytes[i] = digest >> (8 * i) & 0xFF;
+      }
+      ceph::bufferptr b = ceph::buffer::create_page_aligned(
+          m_pg.get_ec_sinfo().get_chunk_size());
+      b.copy_in(0, length, crc_bytes);
+
+      digest_map[srd.shard] = bufferlist{};
+      digest_map[srd.shard].append(b);
+    }
   }
   shards.push_front(m_pg_whoami);
 
@@ -510,6 +550,71 @@ auth_selection_t ScrubBackend::select_auth_object(const hobject_t& ho,
     }
   }
 
+  if (auth_version != eversion_t() && !m_is_replicated &&
+      m_pg.get_ec_supports_crc_encode_decode() &&
+      available_shards.size() != 0) {
+    if (m_pg.ec_can_decode(available_shards)) {
+      // Decode missing data shards needed to do an encode
+      // Only bother doing this if the number of missing shards is less than the
+      // number of parity shards
+
+      int missing_shards =
+          std::count_if(m_pg.get_ec_sinfo().get_data_shards().begin(),
+                        m_pg.get_ec_sinfo().get_data_shards().end(),
+                        [&available_shards](const auto& shard_id) {
+                          return available_shards.contains(shard_id);
+                        });
+
+      const int num_redundancy_shards = m_pg.get_ec_sinfo().get_m();
+      if (missing_shards > 0 && missing_shards < num_redundancy_shards) {
+        dout(10) << fmt::format(
+                        "{}: Decoding {} missing shards for pg {} "
+                        "as only received shards were ({}).",
+                        __func__, missing_shards, m_pg_whoami, available_shards)
+                 << dendl;
+        digest_map = m_pg.ec_decode_acting_set(
+            digest_map, m_pg.get_ec_sinfo().get_chunk_size());
+      } else if (missing_shards != 0) {
+        dout(5) << fmt::format(
+                       "{}: Cannot decode {} shards from pg {} "
+                       "when only shards {} were received. Ignoring.",
+                       __func__, missing_shards, m_pg_whoami, available_shards)
+                << dendl;
+      } else {
+        dout(30) << fmt::format(
+                        "{}: All shards received for pg {}. "
+                        "skipping decoding.",
+                        __func__, m_pg_whoami)
+                 << dendl;
+      }
+
+      bufferlist crc_bl;
+      for (const auto& shard_id : m_pg.get_ec_sinfo().get_data_shards()) {
+        uint32_t zero_data_crc = generate_zero_buffer_crc(
+            shard_id, logical_to_ondisk_size(ret_auth.auth_oi.size, shard_id));
+        for (std::size_t i = 0; i < sizeof(zero_data_crc); i++) {
+          digest_map[shard_id].c_str()[i] =
+              digest_map[shard_id][i] ^ ((zero_data_crc >> (8 * i)) & 0xff);
+        }
+
+        crc_bl.append(digest_map[shard_id]);
+      }
+
+      shard_id_map<bufferlist> encoded_crcs = m_pg.ec_encode_acting_set(crc_bl);
+
+      if (encoded_crcs[shard_id_t(m_pg.get_ec_sinfo().get_k())] !=
+          digest_map[shard_id_t(m_pg.get_ec_sinfo().get_k())]) {
+        ret_auth.digest_match = false;
+      }
+    } else {
+      dout(10) << fmt::format(
+                      "{}: Cannot decode missing shards in pg {} "
+                      "when only shards {} were received. Ignoring.",
+                      __func__, m_pg_whoami, available_shards)
+               << dendl;
+    }
+  }
+
   dout(10) << fmt::format("{}: selecting osd {} for obj {} with oi {}",
                           __func__,
                           ret_auth.auth_shard,
@@ -1046,6 +1151,11 @@ ScrubBackend::auth_and_obj_errs_t ScrubBackend::match_in_shards(
 {
   std::list<pg_shard_t> auth_list;     // out "param" to
   std::set<pg_shard_t> object_errors;  // be returned
+  std::size_t digest_size = 0;
+  if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode()) {
+    digest_size = m_pg.get_ec_sinfo().get_k_plus_m();
+  }
+  shard_id_map<bufferlist> digests{digest_size};
 
   for (auto& [srd, smap] : this_chunk->received_maps) {
 
@@ -1071,6 +1181,24 @@ ScrubBackend::auth_and_obj_errs_t ScrubBackend::match_in_shards(
                                                      ho.has_snapset(),
                                                      srd);
 
+      if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode()) {
+        // Create map containing all data shards except current shard and all
+        // parity shards Decode the current data shard Add to set<shard_id>
+        // incorrectly_decoded_shards if the shard did not decode
+
+        constexpr std::size_t length = sizeof(smap.objects[ho].digest);
+        char crc_bytes[length];
+        for (std::size_t i = 0; i < length; i++) {
+          crc_bytes[i] = smap.objects[ho].digest >> (8 * i) & 0xFF;
+        }
+        ceph::bufferptr b = ceph::buffer::create_page_aligned(
+            m_pg.get_ec_sinfo().get_chunk_size());
+        b.copy_in(0, length, crc_bytes);
+
+        digests[srd.shard] = bufferlist{};
+        digests[srd.shard].append(b);
+      }
+
       dout(20) << fmt::format(
                    "{}: {}{} <{}:{}> shards: {} {} {}", __func__,
                    (m_repair ? "repair " : ""),
@@ -1155,6 +1283,66 @@ ScrubBackend::auth_and_obj_errs_t ScrubBackend::match_in_shards(
              << dendl;
   }
 
+  if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode()) {
+    set<shard_id_t> incorrectly_decoded_shards;
+
+    if (std::any_of(
+            digests.begin(), digests.end(),
+            [](const std::pair<const shard_id_t, ceph::bufferlist&>& digest) {
+              return !std::string(digest.second.c_str()).empty();
+            })) {
+      // Unseed all buffers in chunks
+      for (auto& [srd, bl] : digests) {
+        uint32_t zero_data_crc = generate_zero_buffer_crc(
+            srd, logical_to_ondisk_size(auth_sel.auth_oi.size, srd));
+        for (uint32_t i = 0; i < sizeof(zero_data_crc); i++) {
+          bl.c_str()[i] = bl[i] ^ ((zero_data_crc >> (8 * i)) & 0xff);
+        }
+      }
+
+      // For each digest, we will remove it from our map and then redecode it
+      // using the erasure coding plugin for this pool.
+      // When we find it does not decode back correctly, we know we have found
+      // a data consistency issue that should be reported.
+      for (auto& [srd, bl] : digests) {
+        if (m_pg.get_ec_sinfo().get_data_shards().contains(srd)) {
+          bufferlist removed_shard = std::move(bl);
+          digests.erase(srd);
+
+          shard_id_map<bufferlist> decoded_map = m_pg.ec_decode_acting_set(
+              digests, m_pg.get_ec_sinfo().get_chunk_size());
+
+          if (!std::equal(removed_shard.begin(), removed_shard.end(),
+                          decoded_map[srd].begin())) {
+            incorrectly_decoded_shards.insert(srd);
+          }
+
+          digests.insert(srd, std::move(removed_shard));
+        }
+      }
+    }
+
+    if (incorrectly_decoded_shards.size() == 1) {
+      obj_result.set_data_digest_mismatch();
+      this_chunk->m_error_counts.deep_errors++;
+      errstream << m_pg_id << " " << ho << "data digest inconsistent on shard "
+                << *incorrectly_decoded_shards.begin() << "\n";
+    } else if (incorrectly_decoded_shards.size() <
+               m_pg.get_ec_sinfo().get_k()) {
+      for (shard_id_t incorrectly_decoded_shard : incorrectly_decoded_shards) {
+        obj_result.set_data_digest_mismatch();
+        this_chunk->m_error_counts.deep_errors++;
+        errstream << m_pg_id << " " << ho << "data digest inconsistent on shard "
+                  << incorrectly_decoded_shard << "\n";
+      }
+    } else if (incorrectly_decoded_shards.size() ==
+               m_pg.get_ec_sinfo().get_k()) {
+      obj_result.set_data_digest_mismatch();
+      this_chunk->m_error_counts.deep_errors++;
+      errstream << m_pg_id << " " << ho << "data digests are inconsistent\n";
+    }
+  }
+
   dout(15) << fmt::format("{}: auth_list: {} #: {}; obj-errs#: {}",
                           __func__,
                           auth_list,
@@ -1180,7 +1368,7 @@ bool ScrubBackend::compare_obj_details(pg_shard_t auth_shard,
 
   // ------------------------------------------------------------------------
 
-  if (auth.digest_present && candidate.digest_present &&
+  if (m_is_replicated && auth.digest_present && candidate.digest_present &&
       auth.digest != candidate.digest) {
     fmt::format_to(std::back_inserter(out),
                    "data_digest {:#x} != data_digest {:#x} from shard {}",
@@ -1191,7 +1379,8 @@ bool ScrubBackend::compare_obj_details(pg_shard_t auth_shard,
     obj_result.set_data_digest_mismatch();
   }
 
-  if (auth.omap_digest_present && candidate.omap_digest_present &&
+  if (m_is_replicated && auth.omap_digest_present &&
+      candidate.omap_digest_present &&
       auth.omap_digest != candidate.omap_digest) {
     fmt::format_to(std::back_inserter(out),
                    "{}omap_digest {:#x} != omap_digest {:#x} from shard {}",
index 83abd71981cb4cebee6731bb640a9700ce036f93..8c8e17b7ed12916d89edaaf3043e0b5a35fa4e37 100644 (file)
@@ -527,6 +527,7 @@ class ScrubBackend {
   // accessing the PG backend for this translation service
   uint64_t logical_to_ondisk_size(uint64_t logical_size,
                                  shard_id_t shard_id) const;
+  uint32_t generate_zero_buffer_crc(shard_id_t shard_id, int length) const;
 };
 
 namespace fmt {
index d52b57db4bb6976ceac312c818cf487088f29001..b814d01fa85a7baac52ce4ed276a9fce3ec2161c 100644 (file)
@@ -8,6 +8,7 @@
 #include <string_view>
 
 #include <fmt/ranges.h>
+
 #include "common/ceph_time.h"
 #include "common/fmt_common.h"
 #include "common/scrub_types.h"
@@ -17,6 +18,7 @@
 #include "os/ObjectStore.h"
 #include "osd/osd_perf_counters.h" // for osd_counter_idx_t
 
+#include "ECUtil.h"
 #include "OpRequest.h"
 
 namespace ceph {
@@ -279,6 +281,27 @@ struct PgScrubBeListener {
 
   // If true, the EC optimisations have been enabled.
   virtual bool get_is_ec_optimized() const = 0;
+
+  // If true, EC can decode all shards using the available shards
+  virtual bool ec_can_decode(const shard_id_set& available_shards) const = 0;
+
+  // Returns a map of the data + encoded parity shards when supplied with
+  // a bufferlist containing the data shards
+  virtual shard_id_map<bufferlist> ec_encode_acting_set(
+      const bufferlist& in_bl) const = 0;
+
+  // Returns a map of all shards when given a map with missing shards that need
+  // to be decoded
+  virtual shard_id_map<bufferlist> ec_decode_acting_set(
+      const shard_id_map<bufferlist>& shard_map, int chunk_size) const = 0;
+
+  // If true, the EC profile supports passing CRCs through the EC plugin encode
+  // and decode functions to get a resulting CRC that is the same as if you were
+  // to encode or decode the data and take the CRC of the resulting shards
+  virtual bool get_ec_supports_crc_encode_decode() const = 0;
+
+  // Returns the stripe_info_t used by the PG in EC
+  virtual ECUtil::stripe_info_t get_ec_sinfo() const = 0;
 };
 
 // defining a specific subset of performance counters. Each of the members
index fc183a682cf8503a012788fac514727ea09ce6b9..ef154eca4e6d3ea714c1fdd2176aa4ed5f662a40 100644 (file)
@@ -96,11 +96,37 @@ class TestPg : public PgScrubBeListener {
     return logical_size;
   }
 
+  bool ec_can_decode(const shard_id_set& available_shards) const final {
+    return false;
+  };
+
+  shard_id_map<bufferlist> ec_encode_acting_set(
+      const bufferlist& in_bl) const final {
+    return shard_id_map<bufferlist>(0);
+  };
+
+  shard_id_map<bufferlist> ec_decode_acting_set(
+      const shard_id_map<bufferlist>& chunks, int chunk_size) const final {
+    return shard_id_map<bufferlist>(0);
+  }
+
+  bool get_ec_supports_crc_encode_decode() const final {
+    return get_is_ec_optimized();
+  }
+
+  ECUtil::stripe_info_t get_ec_sinfo() const final { return *m_sinfo; }
+
+  void set_stripe_info(unsigned int k, unsigned int m, uint64_t stripe_width,
+                       const pg_pool_t* pool) {
+    m_sinfo.reset(new ECUtil::stripe_info_t{k, m, stripe_width, pool});
+  }
+
   bool is_waiting_for_unreadable_object() const final { return false; }
 
   std::shared_ptr<PGPool> m_pool;
   pg_info_t& m_info;
   pg_shard_t m_pshard;
+  std::unique_ptr<ECUtil::stripe_info_t> m_sinfo;
 
   bool get_is_nonprimary_shard(const pg_shard_t &pg_shard) const final
   {