cct)));
}
+bool ECBackend::ec_can_decode(const shard_id_set &available_shards) const {
+ if (sinfo.supports_sub_chunks()) {
+ ceph_abort_msg("Interface does not support subchunks");
+ return false;
+ }
+
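+ // Probe the plugin for the minimum shard set needed to reconstruct every
+ // shard; a zero return means the available shards are sufficient.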
+ mini_flat_map<shard_id_t, std::vector<std::pair<int, int>>>
+ minimum_sub_chunks{ec_impl->get_chunk_count()};
+ shard_id_set want_to_read = sinfo.get_all_shards();
+ shard_id_set available(available_shards);
+ shard_id_set minimum_set;
+
+ int r = ec_impl->minimum_to_decode(want_to_read, available, minimum_set,
+ &minimum_sub_chunks);
+ return (r == 0);
+}
+
+shard_id_map<bufferlist> ECBackend::ec_encode_acting_set(
+ const bufferlist &in_bl) const {
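+ // Encode the full stripe: request every shard (in raw-shard order) so the
+ // plugin splits in_bl into data chunks and computes all parity chunks.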
+ shard_id_set want_to_encode;
+ for (raw_shard_id_t raw_shard_id; raw_shard_id < ec_impl->get_chunk_count();
+ ++raw_shard_id) {
+ want_to_encode.insert(sinfo.get_shard(raw_shard_id));
+ }
+ shard_id_map<bufferlist> encoded{ec_impl->get_chunk_count()};
+ ec_impl->encode(want_to_encode, in_bl, &encoded);
+ return encoded;
+}
+
+shard_id_map<bufferlist> ECBackend::ec_decode_acting_set(
+ const shard_id_map<bufferlist> &shard_map, int chunk_size) const {
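+ // Ask the plugin only for the shards absent from shard_map; they are
+ // reconstructed from the chunks that are present.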
+ shard_id_set want_to_read;
+ for (raw_shard_id_t raw_shard_id; raw_shard_id < ec_impl->get_chunk_count();
+ ++raw_shard_id) {
+ shard_id_t shard_id = sinfo.get_shard(raw_shard_id);
+ if (!shard_map.contains(shard_id)) want_to_read.insert(shard_id);
+ }
+
+ shard_id_map<bufferlist> decoded_buffers(ec_impl->get_chunk_count());
+ ec_impl->decode(want_to_read, shard_map, &decoded_buffers, chunk_size);
+
+ return decoded_buffers;
+}
+
+ECUtil::stripe_info_t ECBackend::ec_get_sinfo() const { return sinfo; }
+
void ECBackend::objects_read_and_reconstruct(
const map<hobject_t, std::list<ec_align_t>> &reads,
bool fast_read,
return -EINPROGRESS;
}
- o.digest = 0;
+ if (sinfo.supports_encode_decode_crcs()) {
+ // Record the computed data digest; scrub later runs shard digests
+ // through the EC plugin to cross-check data consistency.
+ o.digest = pos.data_hash.digest();
+ } else {
+ o.digest = 0;
+ }
+
o.digest_present = true;
o.omap_digest = -1;
o.omap_digest_present = true;
bool fast_read = false
);
+ bool ec_can_decode(const shard_id_set &available_shards) const;
+ shard_id_map<bufferlist> ec_encode_acting_set(const bufferlist &in_bl) const;
+ shard_id_map<bufferlist> ec_decode_acting_set(
+ const shard_id_map<bufferlist> &shard_map, int chunk_size) const;
+ ECUtil::stripe_info_t ec_get_sinfo() const;
+
private:
friend struct ECRecoveryHandle;
return sinfo.get_chunk_size();
}
+ bool get_ec_supports_crc_encode_decode() const {
+ return sinfo.supports_encode_decode_crcs();
+ }
+
uint64_t object_size_to_shard_size(const uint64_t size, shard_id_t shard
) const {
return sinfo.object_size_to_shard_size(size, shard);
END_IGNORE_DEPRECATED
}
};
+
std::unique_ptr<ECRecPred> get_is_recoverable_predicate() const {
return std::make_unique<ECRecPred>(ec_impl);
}
unsigned get_ec_data_chunk_count() const {
return ec_impl->get_data_chunk_count();
}
+
int get_ec_stripe_chunk_size() const {
return sinfo.get_chunk_size();
}
+
+ bool get_ec_supports_crc_encode_decode() const { return false; }
+
uint64_t object_size_to_shard_size(const uint64_t size) const {
if (size == std::numeric_limits<uint64_t>::max()) {
return size;
return legacy.get_ec_stripe_chunk_size();
}
+ bool get_ec_supports_crc_encode_decode() const override {
+ if (is_optimized()) {
+ return optimized.get_ec_supports_crc_encode_decode();
+ }
+ return legacy.get_ec_supports_crc_encode_decode();
+ }
+
+ bool ec_can_decode(const shard_id_set &available_shards) const override {
+ if (is_optimized()) {
+ return optimized.ec_can_decode(available_shards);
+ }
+
+ return false;
+ }
+
+ shard_id_map<bufferlist> ec_encode_acting_set(
+ const bufferlist &in_bl) const override {
+ if (is_optimized()) {
+ return optimized.ec_encode_acting_set(in_bl);
+ }
+
+ ceph_abort_msg("This interface is not supported by legacy EC");
+ return {0};
+ }
+
+ shard_id_map<bufferlist> ec_decode_acting_set(
+ const shard_id_map<bufferlist> &shard_map,
+ int chunk_size) const override {
+ if (is_optimized()) {
+ return optimized.ec_decode_acting_set(shard_map, chunk_size);
+ }
+
+ ceph_abort_msg("This interface is not supported by legacy EC");
+ return {0};
+ }
+
+ ECUtil::stripe_info_t ec_get_sinfo() const override {
+ if (is_optimized()) {
+ return optimized.ec_get_sinfo();
+ }
+
+ ceph_abort_msg("This interface is not supported by legacy EC");
+ return {0, 0, 0};
+ }
+
int objects_get_attrs(
const hobject_t &hoid,
std::map<std::string, ceph::buffer::list, std::less<>> *out) override
return all_shards;
}
-
public:
stripe_info_t(const ErasureCodeInterfaceRef &ec_impl, const pg_pool_t *pool,
uint64_t stripe_width
return pool->allows_ecoverwrites();
}
+ bool supports_ec_optimisations() const {
+ return pool->allows_ecoptimizations();
+ }
+
bool supports_sub_chunks() const {
return (plugin_flags &
ErasureCodeInterface::FLAG_EC_PLUGIN_REQUIRE_SUB_CHUNKS) != 0;
shard_id_t shard_id) const final {
return get_pgbackend()->be_get_ondisk_size(logical_size, shard_id_t(shard_id));
}
+
+ bool ec_can_decode(const shard_id_set &available_shards) const final {
+ return get_pgbackend()->ec_can_decode(available_shards);
+ }
+
+ shard_id_map<bufferlist> ec_encode_acting_set(
+ const bufferlist &in_bl) const final {
+ return get_pgbackend()->ec_encode_acting_set(in_bl);
+ }
+
+ shard_id_map<bufferlist> ec_decode_acting_set(
+ const shard_id_map<bufferlist> &shard_map, int chunk_size) const final {
+ return get_pgbackend()->ec_decode_acting_set(shard_map, chunk_size);
+ }
+
+ bool get_ec_supports_crc_encode_decode() const final {
+ return get_pgbackend()->get_ec_supports_crc_encode_decode();
+ }
+
+ ECUtil::stripe_info_t get_ec_sinfo() const final {
+ return get_pgbackend()->ec_get_sinfo();
+ }
};
/**
virtual IsPGReadablePredicate *get_is_readable_predicate() const = 0;
virtual unsigned int get_ec_data_chunk_count() const { return 0; };
virtual int get_ec_stripe_chunk_size() const { return 0; };
+ virtual bool get_ec_supports_crc_encode_decode() const = 0;
virtual uint64_t object_size_to_shard_size(const uint64_t size, shard_id_t shard) const { return size; };
virtual void dump_recovery_info(ceph::Formatter *f) const = 0;
virtual bool get_is_nonprimary_shard(shard_id_t shard) const {
virtual bool get_is_ec_optimized() const {
return false; // Only EC can be ec optimized!
}
+ virtual bool ec_can_decode(const shard_id_set &available_shards) const = 0;
+ virtual shard_id_map<bufferlist> ec_encode_acting_set(
+ const bufferlist &in_bl) const = 0;
+ virtual shard_id_map<bufferlist> ec_decode_acting_set(
+ const shard_id_map<bufferlist> &shard_map, int chunk_size) const = 0;
+ virtual ECUtil::stripe_info_t ec_get_sinfo() const = 0;
private:
std::set<hobject_t> temp_contents;
ceph_abort_msg("async read is not used by replica pool");
}
+bool ReplicatedBackend::get_ec_supports_crc_encode_decode() const {
+ ceph_abort_msg("crc encode decode is not used by replica pool");
+ return false;
+}
+
+bool ReplicatedBackend::ec_can_decode(
+ const shard_id_set &available_shards) const {
+ ceph_abort_msg("can decode is not used by replica pool");
+ return false;
+}
+
+shard_id_map<bufferlist> ReplicatedBackend::ec_encode_acting_set(
+ const bufferlist &in_bl) const {
+ ceph_abort_msg("encode is not used by replica pool");
+ return {0};
+}
+
+shard_id_map<bufferlist> ReplicatedBackend::ec_decode_acting_set(
+ const shard_id_map<bufferlist> &shard_map, int chunk_size) const {
+ ceph_abort_msg("decode is not used by replica pool");
+ return {0};
+}
+
+ECUtil::stripe_info_t ReplicatedBackend::ec_get_sinfo() const {
+ ceph_abort_msg("get_ec_sinfo is not used by replica pool");
+ return {0, 0, 0};
+}
+
class C_OSD_OnOpCommit : public Context {
ReplicatedBackend *pg;
ceph::ref_t<ReplicatedBackend::InProgressOp> op;
std::pair<ceph::buffer::list*, Context*> > > &to_read,
Context *on_complete,
bool fast_read = false) override;
-
-private:
+ bool get_ec_supports_crc_encode_decode() const override;
+ ECUtil::stripe_info_t ec_get_sinfo() const override;
+ bool ec_can_decode(const shard_id_set &available_shards) const override;
+ shard_id_map<bufferlist> ec_encode_acting_set(
+ const bufferlist &in_bl) const override;
+ shard_id_map<bufferlist> ec_decode_acting_set(
+ const shard_id_map<bufferlist> &shard_map, int chunk_size) const override;
+
+ private:
// push
struct push_info_t {
ObjectRecoveryProgress recovery_progress;
[i_am](const pg_shard_t& shard) { return shard != i_am; });
m_is_replicated = m_pool.info.is_replicated();
+ m_is_optimized_ec = m_pool.info.allows_ecoptimizations();
m_mode_desc =
(m_repair ? "repair"sv
: (m_depth == scrub_level_t::deep ? "deep-scrub"sv : "scrub"sv));
{
m_formatted_id = m_pg_id.calc_name_sring();
m_is_replicated = m_pool.info.is_replicated();
+ m_is_optimized_ec = m_pool.info.allows_ecoptimizations();
m_mode_desc =
(m_repair ? "repair"sv
: (m_depth == scrub_level_t::deep ? "deep-scrub"sv : "scrub"sv));
return m_pg.logical_to_ondisk_size(logical_size, shard_id);
}
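+// CRC32 is affine in its input: crc(x) = L(x) ^ c, where L is linear over
+// GF(2) and c depends only on the buffer length, so for equal-length a, b:
+//   crc(a ^ b) == crc(a) ^ crc(b) ^ crc(zeros)
+// "Unseeding" a digest (XOR-ing out the same-length zero-buffer CRC) leaves
+// the linear part L(x), which an XOR/GF-linear EC transform carries exactly
+// as it carries the data. This helper computes that per-shard zero CRC.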
+uint32_t ScrubBackend::generate_zero_buffer_crc(shard_id_t shard_id,
+ int length) const {
+ // Shards can have different on-disk lengths, so size the zero buffer to
+ // this shard's length before hashing.
+ bufferlist zero_bl;
+ zero_bl.append_zero(logical_to_ondisk_size(length, shard_id));
+
+ bufferhash zero_data_hash(-1);
+ zero_data_hash << zero_bl;
+ return zero_data_hash.digest();
+}
+
void ScrubBackend::update_repair_status(bool should_repair)
{
dout(15) << __func__
/// that is auth eligible.
/// This creates an issue with 'digest_match' that should be handled.
std::list<pg_shard_t> shards;
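+ // For EC pools whose plugin carries CRCs through encode/decode, gather
+ // each shard's reported data CRC into a chunk-sized buffer so the CRCs
+ // themselves can be run through the plugin below.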
+ shard_id_set available_shards;
+ uint32_t digest_map_size = 0;
+ if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode()) {
+ digest_map_size = m_pg.get_ec_sinfo().get_k_plus_m();
+ }
+ shard_id_map<bufferlist> digest_map{digest_map_size};
+
for (const auto& [srd, smap] : this_chunk->received_maps) {
if (srd != m_pg_whoami) {
shards.push_back(srd);
}
+
+ if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode() &&
+ smap.objects.contains(ho)) {
+ available_shards.insert(srd.shard);
+
+ uint32_t digest = smap.objects.at(ho).digest;
+ constexpr std::size_t length = sizeof(digest);
+ char crc_bytes[length];
+ for (std::size_t i = 0; i < length; i++) {
+ crc_bytes[i] = (digest >> (8 * i)) & 0xFF;
+ }
+ ceph::bufferptr b = ceph::buffer::create_page_aligned(
+ m_pg.get_ec_sinfo().get_chunk_size());
+ // Zero the chunk before copying the CRC bytes in: page-aligned buffers
+ // are not zero-initialised and the tail takes part in decode/compares.
+ b.zero();
+ b.copy_in(0, length, crc_bytes);
+
+ digest_map[srd.shard] = bufferlist{};
+ digest_map[srd.shard].append(b);
+ }
}
shards.push_front(m_pg_whoami);
}
}
+ if (auth_version != eversion_t() && !m_is_replicated &&
+ m_pg.get_ec_supports_crc_encode_decode() &&
+ available_shards.size() != 0) {
+ if (m_pg.ec_can_decode(available_shards)) {
+ // Decode missing data shards needed to do an encode
+ // Only bother doing this if the number of missing shards is less than the
+ // number of parity shards
+
+ int missing_shards =
+ std::count_if(m_pg.get_ec_sinfo().get_data_shards().begin(),
+ m_pg.get_ec_sinfo().get_data_shards().end(),
+ [&available_shards](const auto& shard_id) {
+ return !available_shards.contains(shard_id);
+ });
+
+ const int num_redundancy_shards = m_pg.get_ec_sinfo().get_m();
+ if (missing_shards > 0 && missing_shards < num_redundancy_shards) {
+ dout(10) << fmt::format(
+ "{}: Decoding {} missing shards for pg {} "
+ "as only received shards were ({}).",
+ __func__, missing_shards, m_pg_whoami, available_shards)
+ << dendl;
+ digest_map = m_pg.ec_decode_acting_set(
+ digest_map, m_pg.get_ec_sinfo().get_chunk_size());
+ } else if (missing_shards != 0) {
+ dout(5) << fmt::format(
+ "{}: Cannot decode {} shards from pg {} "
+ "when only shards {} were received. Ignoring.",
+ __func__, missing_shards, m_pg_whoami, available_shards)
+ << dendl;
+ } else {
+ dout(30) << fmt::format(
+ "{}: All shards received for pg {}. "
+ "skipping decoding.",
+ __func__, m_pg_whoami)
+ << dendl;
+ }
+
+ bufferlist crc_bl;
+ for (const auto& shard_id : m_pg.get_ec_sinfo().get_data_shards()) {
+ uint32_t zero_data_crc = generate_zero_buffer_crc(
+ shard_id, ret_auth.auth_oi.size);
+ for (std::size_t i = 0; i < sizeof(zero_data_crc); i++) {
+ digest_map[shard_id].c_str()[i] =
+ digest_map[shard_id][i] ^ ((zero_data_crc >> (8 * i)) & 0xff);
+ }
+
+ crc_bl.append(digest_map[shard_id]);
+ }
+
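+ // Re-encode the unseeded data-shard CRCs and compare the resulting first
+ // parity CRC with the one that shard actually reported: if the plugin
+ // carries CRCs faithfully, a mismatch means the shards' data diverged.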
+ shard_id_map<bufferlist> encoded_crcs = m_pg.ec_encode_acting_set(crc_bl);
+
+ if (encoded_crcs[shard_id_t(m_pg.get_ec_sinfo().get_k())] !=
+ digest_map[shard_id_t(m_pg.get_ec_sinfo().get_k())]) {
+ ret_auth.digest_match = false;
+ }
+ } else {
+ dout(10) << fmt::format(
+ "{}: Cannot decode missing shards in pg {} "
+ "when only shards {} were received. Ignoring.",
+ __func__, m_pg_whoami, available_shards)
+ << dendl;
+ }
+ }
+
dout(10) << fmt::format("{}: selecting osd {} for obj {} with oi {}",
__func__,
ret_auth.auth_shard,
{
std::list<pg_shard_t> auth_list; // out "param" to
std::set<pg_shard_t> object_errors; // be returned
+ std::size_t digest_size = 0;
+ if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode()) {
+ digest_size = m_pg.get_ec_sinfo().get_k_plus_m();
+ }
+ shard_id_map<bufferlist> digests{digest_size};
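+ // One chunk-sized buffer per received shard, holding that shard's
+ // reported data CRC in its first four bytes (mirrors the digest_map
+ // built during authoritative-object selection above).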
for (auto& [srd, smap] : this_chunk->received_maps) {
ho.has_snapset(),
srd);
+ if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode() &&
+ smap.objects.contains(ho)) {
+ // Collect each shard's reported CRC into `digests`. Below, every data
+ // shard is removed in turn and re-decoded from the others; any shard
+ // that does not decode back identically is recorded in
+ // incorrectly_decoded_shards.
+
+ constexpr std::size_t length = sizeof(smap.objects.at(ho).digest);
+ char crc_bytes[length];
+ for (std::size_t i = 0; i < length; i++) {
+ crc_bytes[i] = (smap.objects.at(ho).digest >> (8 * i)) & 0xFF;
+ }
+ ceph::bufferptr b = ceph::buffer::create_page_aligned(
+ m_pg.get_ec_sinfo().get_chunk_size());
+ // Zero first: the tail of the chunk participates in decode comparisons.
+ b.zero();
+ b.copy_in(0, length, crc_bytes);
+
+ digests[srd.shard] = bufferlist{};
+ digests[srd.shard].append(b);
+ }
+
dout(20) << fmt::format(
"{}: {}{} <{}:{}> shards: {} {} {}", __func__,
(m_repair ? "repair " : ""),
<< dendl;
}
+ if (!m_is_replicated && m_pg.get_ec_supports_crc_encode_decode()) {
+ set<shard_id_t> incorrectly_decoded_shards;
+
+ if (!digests.empty()) {
+ // Unseed every collected digest: XOR out the zero-buffer CRC so the
+ // remaining values are linear under the EC transform.
+ for (auto& [srd, bl] : digests) {
+ uint32_t zero_data_crc = generate_zero_buffer_crc(
+ srd, auth_sel.auth_oi.size);
+ for (uint32_t i = 0; i < sizeof(zero_data_crc); i++) {
+ bl.c_str()[i] = bl[i] ^ ((zero_data_crc >> (8 * i)) & 0xff);
+ }
+ }
+
+ // For each digest, we will remove it from our map and then redecode it
+ // using the erasure coding plugin for this pool.
+ // When we find it does not decode back correctly, we know we have found
+ // a data consistency issue that should be reported.
+ for (auto& [srd, bl] : digests) {
+ if (m_pg.get_ec_sinfo().get_data_shards().contains(srd)) {
+ bufferlist removed_shard = std::move(bl);
+ digests.erase(srd);
+
+ shard_id_map<bufferlist> decoded_map = m_pg.ec_decode_acting_set(
+ digests, m_pg.get_ec_sinfo().get_chunk_size());
+
+ if (!std::equal(removed_shard.begin(), removed_shard.end(),
+ decoded_map[srd].begin())) {
+ incorrectly_decoded_shards.insert(srd);
+ }
+
+ digests.insert(srd, std::move(removed_shard));
+ }
+ }
+ }
+
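+ // Classification: a minority of bad data shards (< k) is reported per
+ // shard; if every data shard re-decodes incorrectly (== k), no single
+ // culprit can be identified, so report one blanket digest mismatch.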
+ if (incorrectly_decoded_shards.size() == 1) {
+ obj_result.set_data_digest_mismatch();
+ this_chunk->m_error_counts.deep_errors++;
+ errstream << m_pg_id << " " << ho << " data digest inconsistent on shard "
+ << *incorrectly_decoded_shards.begin() << "\n";
+ } else if (incorrectly_decoded_shards.size() <
+ m_pg.get_ec_sinfo().get_k()) {
+ for (shard_id_t incorrectly_decoded_shard : incorrectly_decoded_shards) {
+ obj_result.set_data_digest_mismatch();
+ this_chunk->m_error_counts.deep_errors++;
+ errstream << m_pg_id << " " << ho << " data digest inconsistent on shard "
+ << incorrectly_decoded_shard << "\n";
+ }
+ } else if (incorrectly_decoded_shards.size() ==
+ m_pg.get_ec_sinfo().get_k()) {
+ obj_result.set_data_digest_mismatch();
+ this_chunk->m_error_counts.deep_errors++;
+ errstream << m_pg_id << " " << ho << " data digests are inconsistent\n";
+ }
+ }
+
dout(15) << fmt::format("{}: auth_list: {} #: {}; obj-errs#: {}",
__func__,
auth_list,
// ------------------------------------------------------------------------
- if (auth.digest_present && candidate.digest_present &&
+ if (m_is_replicated && auth.digest_present && candidate.digest_present &&
auth.digest != candidate.digest) {
fmt::format_to(std::back_inserter(out),
"data_digest {:#x} != data_digest {:#x} from shard {}",
obj_result.set_data_digest_mismatch();
}
- if (auth.omap_digest_present && candidate.omap_digest_present &&
+ if (m_is_replicated && auth.omap_digest_present &&
+ candidate.omap_digest_present &&
auth.omap_digest != candidate.omap_digest) {
fmt::format_to(std::back_inserter(out),
"{}omap_digest {:#x} != omap_digest {:#x} from shard {}",
// accessing the PG backend for this translation service
uint64_t logical_to_ondisk_size(uint64_t logical_size,
shard_id_t shard_id) const;
+ uint32_t generate_zero_buffer_crc(shard_id_t shard_id, int length) const;
};
namespace fmt {
#include <string_view>
#include <fmt/ranges.h>
+
#include "common/ceph_time.h"
#include "common/fmt_common.h"
#include "common/scrub_types.h"
#include "os/ObjectStore.h"
#include "osd/osd_perf_counters.h" // for osd_counter_idx_t
+#include "ECUtil.h"
#include "OpRequest.h"
namespace ceph {
// If true, the EC optimisations have been enabled.
virtual bool get_is_ec_optimized() const = 0;
+
+ // If true, EC can decode all shards using the available shards
+ virtual bool ec_can_decode(const shard_id_set& available_shards) const = 0;
+
+ // Returns a map of the data + encoded parity shards when supplied with
+ // a bufferlist containing the data shards
+ virtual shard_id_map<bufferlist> ec_encode_acting_set(
+ const bufferlist& in_bl) const = 0;
+
+ // Returns a map of all shards when given a map with missing shards that need
+ // to be decoded
+ virtual shard_id_map<bufferlist> ec_decode_acting_set(
+ const shard_id_map<bufferlist>& shard_map, int chunk_size) const = 0;
+
+ // If true, the EC profile supports passing CRCs through the plugin's
+ // encode/decode: the CRCs that come out equal the CRCs of the shards the
+ // plugin would produce from the data itself
+ virtual bool get_ec_supports_crc_encode_decode() const = 0;
+
+ // Returns the stripe_info_t used by the PG in EC
+ virtual ECUtil::stripe_info_t get_ec_sinfo() const = 0;
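+
+ // Illustrative flow only (not a prescribed call sequence): scrub may
+ // check ec_can_decode(available), fill gaps via ec_decode_acting_set(),
+ // then compare a shard's reported CRC against the matching entry that
+ // ec_encode_acting_set() produces from the data-shard CRCs.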
};
// defining a specific subset of performance counters. Each of the members
return logical_size;
}
+ bool ec_can_decode(const shard_id_set& available_shards) const final {
+ return false;
+ }
+
+ shard_id_map<bufferlist> ec_encode_acting_set(
+ const bufferlist& in_bl) const final {
+ return shard_id_map<bufferlist>(0);
+ }
+
+ shard_id_map<bufferlist> ec_decode_acting_set(
+ const shard_id_map<bufferlist>& chunks, int chunk_size) const final {
+ return shard_id_map<bufferlist>(0);
+ }
+
+ bool get_ec_supports_crc_encode_decode() const final {
+ return get_is_ec_optimized();
+ }
+
+ ECUtil::stripe_info_t get_ec_sinfo() const final { return *m_sinfo; }
+
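+ // Lets a test install the stripe geometry that get_ec_sinfo() returns.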
+ void set_stripe_info(unsigned int k, unsigned int m, uint64_t stripe_width,
+ const pg_pool_t* pool) {
+ m_sinfo.reset(new ECUtil::stripe_info_t{k, m, stripe_width, pool});
+ }
+
bool is_waiting_for_unreadable_object() const final { return false; }
std::shared_ptr<PGPool> m_pool;
pg_info_t& m_info;
pg_shard_t m_pshard;
+ std::unique_ptr<ECUtil::stripe_info_t> m_sinfo;
bool get_is_nonprimary_shard(const pg_shard_t &pg_shard) const final
{