From 14a11dedc0db05052879771ad849be79678e529a Mon Sep 17 00:00:00 2001 From: Connor Fawcett Date: Tue, 24 Jun 2025 12:45:06 +0100 Subject: [PATCH] Adds a new command-line utility which can check the consistency of objects within an erasure coded pool. A new test-only inject tells the EC backend to return both data and parity shards to the client so that they can be checked for consistency by the new tool. Supports both optimized and unoptimized EC pools. Signed-off-by: Connor Fawcett --- src/common/json/OSDStructures.cc | 26 ++- src/common/json/OSDStructures.h | 16 +- src/erasure-code/CMakeLists.txt | 1 + src/erasure-code/consistency/CMakeLists.txt | 26 +++ .../consistency/ConsistencyChecker.cc | 169 +++++++++++++++ .../consistency/ConsistencyChecker.h | 68 ++++++ src/erasure-code/consistency/ECEncoder.cc | 205 ++++++++++++++++++ src/erasure-code/consistency/ECEncoder.h | 34 +++ .../consistency/ECEncoderSwitch.cc | 59 +++++ .../consistency/ECEncoderSwitch.h | 32 +++ src/erasure-code/consistency/ECReader.cc | 119 ++++++++++ src/erasure-code/consistency/ECReader.h | 73 +++++++ src/erasure-code/consistency/Pool.cc | 16 ++ src/erasure-code/consistency/Pool.h | 24 ++ src/erasure-code/consistency/RadosCommands.cc | 181 ++++++++++++++++ src/erasure-code/consistency/RadosCommands.h | 33 +++ .../ceph_ec_consistency_checker.cc | 81 +++++++ src/osd/ECBackend.cc | 14 +- src/osd/ECBackendL.cc | 14 +- src/osd/ECCommon.cc | 59 ++++- src/osd/ECCommon.h | 8 + src/osd/ECCommonL.cc | 74 ++++++- src/osd/ECCommonL.h | 7 + src/osd/ECInject.cc | 40 +++- src/osd/ECInject.h | 3 + src/osd/OSD.cc | 38 +++- 26 files changed, 1404 insertions(+), 16 deletions(-) create mode 100644 src/erasure-code/consistency/CMakeLists.txt create mode 100644 src/erasure-code/consistency/ConsistencyChecker.cc create mode 100644 src/erasure-code/consistency/ConsistencyChecker.h create mode 100644 src/erasure-code/consistency/ECEncoder.cc create mode 100644 src/erasure-code/consistency/ECEncoder.h create mode 
100644 src/erasure-code/consistency/ECEncoderSwitch.cc create mode 100644 src/erasure-code/consistency/ECEncoderSwitch.h create mode 100644 src/erasure-code/consistency/ECReader.cc create mode 100644 src/erasure-code/consistency/ECReader.h create mode 100644 src/erasure-code/consistency/Pool.cc create mode 100644 src/erasure-code/consistency/Pool.h create mode 100644 src/erasure-code/consistency/RadosCommands.cc create mode 100644 src/erasure-code/consistency/RadosCommands.h create mode 100644 src/erasure-code/consistency/ceph_ec_consistency_checker.cc diff --git a/src/common/json/OSDStructures.cc b/src/common/json/OSDStructures.cc index f9e3d8d75b6..ecf382a7549 100644 --- a/src/common/json/OSDStructures.cc +++ b/src/common/json/OSDStructures.cc @@ -61,14 +61,16 @@ void OSDPoolGetRequest::decode_json(JSONObj* obj) { void OSDPoolGetReply::dump(Formatter* f) const { encode_json("erasure_code_profile", erasure_code_profile, f); + encode_json("allow_ec_optimizations", allow_ec_optimizations, f); } void OSDPoolGetReply::decode_json(JSONObj* obj) { JSONDecoder::decode_json("erasure_code_profile", erasure_code_profile, obj); + JSONDecoder::decode_json("allow_ec_optimizations", allow_ec_optimizations, obj); } void OSDECProfileGetRequest::dump(Formatter* f) const { - encode_json("prefix", "osd pool get", f); + encode_json("prefix", "osd erasure-code-profile get", f); encode_json("name", name, f); encode_json("format", format, f); } @@ -159,4 +161,26 @@ void OSDSetRequest::dump(Formatter* f) const { void OSDSetRequest::decode_json(JSONObj* obj) { JSONDecoder::decode_json("key", key, obj); JSONDecoder::decode_json("yes_i_really_mean_it", yes_i_really_mean_it, obj); +} + +void InjectECParityRead::dump(Formatter* f) const { + encode_json("prefix", "injectparityread", f); + encode_json("pool", pool, f); + encode_json("objname", objname, f); +} + +void InjectECParityRead::decode_json(JSONObj* obj) { + JSONDecoder::decode_json("pool", pool, obj); + 
JSONDecoder::decode_json("objname", objname, obj); +} + +void InjectECClearParityRead::dump(Formatter* f) const { + encode_json("prefix", "injectclearparityread", f); + encode_json("pool", pool, f); + encode_json("objname", objname, f); +} + +void InjectECClearParityRead::decode_json(JSONObj* obj) { + JSONDecoder::decode_json("pool", pool, obj); + JSONDecoder::decode_json("objname", objname, obj); } \ No newline at end of file diff --git a/src/common/json/OSDStructures.h b/src/common/json/OSDStructures.h index 4fc3e307e2a..a521b7ab544 100644 --- a/src/common/json/OSDStructures.h +++ b/src/common/json/OSDStructures.h @@ -50,7 +50,7 @@ struct OSDPoolGetRequest { struct OSDPoolGetReply { std::string erasure_code_profile; - + bool allow_ec_optimizations; void dump(Formatter* f) const; void decode_json(JSONObj* obj); }; @@ -190,6 +190,20 @@ struct InjectECClearErrorRequest { JSONDecoder::decode_json("type", type, obj); } }; +struct InjectECParityRead { + std::string pool; + std::string objname; + + void dump(Formatter* f) const; + void decode_json(JSONObj* obj); +}; +struct InjectECClearParityRead { + std::string pool; + std::string objname; + + void dump(Formatter* f) const; + void decode_json(JSONObj* obj); +}; } // namespace osd } // namespace messaging } // namespace ceph \ No newline at end of file diff --git a/src/erasure-code/CMakeLists.txt b/src/erasure-code/CMakeLists.txt index 18291f64b5c..6ced377fe3c 100644 --- a/src/erasure-code/CMakeLists.txt +++ b/src/erasure-code/CMakeLists.txt @@ -21,6 +21,7 @@ add_subdirectory(jerasure) add_subdirectory(lrc) add_subdirectory(shec) add_subdirectory(clay) +add_subdirectory(consistency) if(HAVE_NASM_X64_AVX2 OR HAVE_ARMV8_SIMD) set(WITH_EC_ISA_PLUGIN TRUE CACHE BOOL "") diff --git a/src/erasure-code/consistency/CMakeLists.txt b/src/erasure-code/consistency/CMakeLists.txt new file mode 100644 index 00000000000..a2020371e6b --- /dev/null +++ b/src/erasure-code/consistency/CMakeLists.txt @@ -0,0 +1,26 @@ 
+add_library(ec_consistency STATIC + ECReader.cc + ECEncoder.cc + ECEncoderSwitch.cc + Pool.cc + ConsistencyChecker.cc + RadosCommands.cc + ${PROJECT_SOURCE_DIR}/src/osd/ECUtilL.cc + ${PROJECT_SOURCE_DIR}/src/osd/ECUtil.cc +) + +target_link_libraries(ec_consistency + librados + global + json_structures +) + +add_executable(ceph_ec_consistency_checker + ${CMAKE_CURRENT_SOURCE_DIR}/ceph_ec_consistency_checker.cc + ${PROJECT_SOURCE_DIR}/src/osd/ECUtilL.cc + ${PROJECT_SOURCE_DIR}/src/osd/ECUtil.cc) +target_link_libraries(ceph_ec_consistency_checker + librados global ec_consistency) +install(TARGETS +ceph_ec_consistency_checker + DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/erasure-code/consistency/ConsistencyChecker.cc b/src/erasure-code/consistency/ConsistencyChecker.cc new file mode 100644 index 00000000000..1957e5263a7 --- /dev/null +++ b/src/erasure-code/consistency/ConsistencyChecker.cc @@ -0,0 +1,169 @@ +#include "ConsistencyChecker.h" + +#include "RadosCommands.h" +#include "Pool.h" +#include "ECReader.h" +#include "ECEncoder.h" +#include "ECEncoderSwitch.h" + +using ConsistencyChecker = ceph::consistency::ConsistencyChecker; + +using Read = ceph::consistency::Read; +using ReadResult = ceph::consistency::ReadResult; +using bufferlist = ceph::bufferlist; + +ConsistencyChecker::ConsistencyChecker(librados::Rados &rados, + boost::asio::io_context& asio, + const std::string& pool_name, + int stripe_unit) : + rados(rados), + asio(asio), + reader(ceph::consistency::ECReader(rados, asio, pool_name)), + commands(ceph::consistency::RadosCommands(rados)), + pool(pool_name, + commands.get_ec_profile_for_pool(pool_name), + commands.get_pool_allow_ec_optimizations(pool_name)), + encoder(ceph::consistency::ECEncoderSwitch(pool.get_ec_profile(), + stripe_unit, + commands.get_pool_allow_ec_optimizations(pool_name) + )) {} + +/** + * Perform an end-to-end read and consistency check on a single object. 
+ * Current implementation only supports reading the entire object, so length and + * offset should normally be 0. + * + * @param oid string Name of the pool to perform inject on + * @param block_size int Block size for the data being read + * @param offset int Which offset to read from + * @param length int How much data of each shard to read + * @return bool true if consistent, otherwise false + */ +bool ConsistencyChecker::single_read_and_check_consistency(const std::string& oid, + int block_size, + int offset, + int length) +{ + clear_results(); + std::string error_message = ""; + bool success = true; + + auto read = Read(oid, block_size, offset, length); + queue_ec_read(read); + + auto read_results = reader.get_results(); + int result_count = read_results->size(); + if (result_count != 1) { + error_message = "Incorrect number of RADOS read results returned, count: " + + std::to_string(result_count); + success = false; + } + + ReadResult read_result = (*read_results)[0]; + boost::system::error_code ec = read_result.get_ec(); + if (success && ec != boost::system::errc::success) { + error_message = "RADOS Read failed, error message: " + ec.message(); + success = false; + } + + if (success && read_result.get_data().length() == 0) { + error_message = "Empty object returned from RADOS read."; + success = false; + } + + if (success && !check_object_consistency(oid, read_result.get_data())) { + error_message = "Generated parity did not match read in parity shards."; + success = false; + } + + results.push_back({oid, error_message, success}); + commands.inject_clear_parity_read_on_primary_osd(pool.get_pool_name(), + oid); + return success; +} + +/** + * Queue up an EC read with the parity read inject set + * + * @param read Object containing information about the read + */ +void ConsistencyChecker::queue_ec_read(Read read) +{ + commands.inject_parity_read_on_primary_osd(pool.get_pool_name(), + read.get_oid()); + reader.do_read(read); +} + +/** + * Generate parities 
from the data and compare to the parity shards + * + * @param oid string The object ID of the object being checked + * @param inbl bufferlist The entire contents of the object, including parities + * @param stripe_unit int The chunk size for the object + */ +bool ConsistencyChecker::check_object_consistency(const std::string& oid, + const bufferlist& inbl) +{ + bool is_optimized = pool.has_optimizations_enabled(); + std::pair data_and_parity; + data_and_parity = split_data_and_parity(oid, inbl, encoder.get_k(), + encoder.get_m(), is_optimized); + + std::optional outbl; + outbl = encoder.do_encode(data_and_parity.first); + + if (!outbl.has_value()) { + return false; + } + + return buffers_match(outbl.value(), data_and_parity.second); +} + +void ConsistencyChecker::print_results(std::ostream& out) +{ + out << "Results:" << std::endl; + for (const auto &r : results) { + std::string result_str = (r.get_result()) ? "Passed" : "Failed"; + std::string error_str = r.get_error_message(); + out << "Object ID " << r.get_oid() << ": " << result_str << std::endl; + if (!error_str.empty()) { + out << "Error Message: " << error_str << std::endl; + } + } + + int count = results.size(); + std::string obj_str = (count == 1) ? "object checked." : "objects checked."; + out << "Total: " << count << " " << obj_str << std::endl; +} + +std::pair + ConsistencyChecker::split_data_and_parity(const std::string& oid, + const bufferlist& read, + int k, int m, + bool is_optimized) +{ + uint64_t data_size, parity_size; + + // Optimized EC parity read should return the exact object size + parity shards + // Legacy EC parity read will return the entire padded data shards + parity shards + data_size = is_optimized ? 
reader.get_object_size(oid) : (read.length() / (k + m)) * k; + parity_size = read.length() - data_size; + + bufferlist data, parity; + auto it = read.begin(); + it.copy(data_size, data); + it.copy(parity_size, parity); + return std::pair(data, parity); +} + +bool ConsistencyChecker::buffers_match(const bufferlist& b1, + const bufferlist& b2) +{ + return (b1.contents_equal(b2)); +} + +void ConsistencyChecker::clear_results() +{ + reader.clear_results(); + results.clear(); +} \ No newline at end of file diff --git a/src/erasure-code/consistency/ConsistencyChecker.h b/src/erasure-code/consistency/ConsistencyChecker.h new file mode 100644 index 00000000000..dd2ee657d90 --- /dev/null +++ b/src/erasure-code/consistency/ConsistencyChecker.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include "librados/librados_asio.h" +#include "global/global_init.h" +#include "global/global_context.h" + +#include "Pool.h" +#include "ECReader.h" +#include "RadosCommands.h" +#include "ECEncoder.h" +#include "ECEncoderSwitch.h" + +#define dout_context g_ceph_context + +namespace ceph { +namespace consistency { +class ConsistencyCheckResult { + private: + std::string oid; + std::string error_message; + bool result; + + public: + std::string get_oid() const { return oid; } + std::string get_error_message() const { return error_message; } + bool get_result() const { return result; } + ConsistencyCheckResult(std::string oid, + std::string error_message, + bool result) : + oid(oid), + error_message(error_message), + result(result) {} +}; + +class ConsistencyChecker { + private: + librados::Rados& rados; + boost::asio::io_context& asio; + ceph::consistency::ECReader reader; + ceph::consistency::RadosCommands commands; + ceph::consistency::Pool pool; + ceph::consistency::ECEncoderSwitch encoder; + std::vector results; + bool buffers_match(const bufferlist& b1, const bufferlist& b2); + std::pair split_data_and_parity(const std::string& oid, + const bufferlist& read, + int k, int m, + 
bool is_optimized); + + public: + ConsistencyChecker(librados::Rados& rados, + boost::asio::io_context& asio, + const std::string& pool_name, + int stripe_unit); + void queue_ec_read(Read read); + bool check_object_consistency(const std::string& oid, + const bufferlist& inbl); + void print_results(std::ostream& out); + void clear_results(); + bool single_read_and_check_consistency(const std::string& oid, + int block_size, + int offset, + int length); +}; +} +} \ No newline at end of file diff --git a/src/erasure-code/consistency/ECEncoder.cc b/src/erasure-code/consistency/ECEncoder.cc new file mode 100644 index 00000000000..1b737a9f2a2 --- /dev/null +++ b/src/erasure-code/consistency/ECEncoder.cc @@ -0,0 +1,205 @@ +#include "ECEncoder.h" +#include +#include "common/errno.h" +#include "osd/ECUtil.h" +#include "osd/ECUtilL.h" + +using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t; +using stripe_info_o_t = ECUtil::stripe_info_t; + +namespace ceph { +namespace consistency { + +template +ECEncoder::ECEncoder(ceph::ErasureCodeProfile profile, int chunk_size) : + profile(profile), + chunk_size(chunk_size) +{ + int r = ec_init_plugin(ec_impl); + if (r < 0) { + std::cerr << "Failed to initialize plugin: " << r << std::endl; + } + stripe_info = ec_init_sinfo(ec_impl); +} + +/** + * Initialize the ErasureCodeInterfaceRef needed to perform encode. + * + * @param ec_impl Pointer to plugin being initialized + * @returns int 0 if successful, otherwise 1 + */ +template +int ECEncoder::ec_init_plugin(ceph::ErasureCodeInterfaceRef &ec_impl) +{ + auto plugin = profile.find("plugin"); + if (plugin == profile.end()) { + std::cerr << "Invalid profile: plugin not specified." 
<< std::endl; + return 1; + } + + std::stringstream ss; + std::string dir = g_conf().get_val("erasure_code_dir"); + ceph::ErasureCodePluginRegistry::instance().factory(plugin->second, + dir, profile, + &ec_impl, &ss); + if (!ec_impl) { + std::cerr << "Invalid profile: " << ss.str() << std::endl; + return 1; + } + + return 0; +} + +/** + * Initialize the stripe_info_t needed to perform encode. Optimized version for new EC. + * + * @param ec_impl Pointer to plugin object + * @returns Unique pointer to the stripe info object associated with the EC profile + */ +template <> +std::unique_ptr ECEncoder::ec_init_sinfo( + ceph::ErasureCodeInterfaceRef &ec_impl) +{ + uint64_t k = std::stol(profile["k"]); + ceph_assert(k > 0); + uint64_t stripe_width = k * chunk_size; + std::unique_ptr s( + new stripe_info_o_t(ec_impl, nullptr, stripe_width)); + return s; +} + +/** + * Initialize the stripe_info_t needed to perform encode. Legacy version for old EC. + * + * @param ec_impl Pointer to plugin object + * @returns Unique pointer to the stripe info object associated with the EC profile. + */ +template <> +std::unique_ptr ECEncoder::ec_init_sinfo( + ceph::ErasureCodeInterfaceRef &ec_impl) +{ + uint64_t k = stol(profile["k"]); + ceph_assert(k > 0); + uint64_t stripe_width = k * chunk_size; + std::unique_ptr s(new stripe_info_l_t(ec_impl, stripe_width)); + return s; +} + +/** + * Perform encode on the input buffer and place result in the supplied output buffer. + * Optimized EC encode function. 
+ * + * @param inbl Buffer to be encoded + * @returns Optional, returns buffer for the encode output if encode is successful + */ +template <> +std::optional ECEncoder::do_encode(ceph::bufferlist inbl, + stripe_info_o_t &sinfo) +{ + ECUtil::shard_extent_map_t encoded_data(&sinfo); + + uint64_t stripe_width = sinfo.get_stripe_width(); + if (inbl.length() % stripe_width != 0) { + uint64_t pad = stripe_width - inbl.length() % stripe_width; + inbl.append_zero(pad); + } + + sinfo.ro_range_to_shard_extent_map(0, inbl.length(), inbl, encoded_data); + encoded_data.insert_parity_buffers(); + int r = encoded_data.encode(ec_impl, nullptr, encoded_data.get_ro_end()); + if (r < 0) { + std::cerr << "Failed to encode: " << cpp_strerror(r) << std::endl; + return {}; + } + + ceph::bufferlist outbl; + for (const auto &[shard, _] : encoded_data.get_extent_maps()) { + if (shard >= sinfo.get_k()) { + encoded_data.get_shard_first_buffer(shard, outbl); + } + } + + return outbl; +} + +/** + * Perform encode on the input buffer and place result in the supplied output buffer. + * Legacy EC encode function. 
+ * + * @param inbl Buffer to be encoded + * @returns Optional, returns buffer for the encode output if encode is successful + */ +template <> +std::optional ECEncoder::do_encode(ceph::bufferlist inbl, + stripe_info_l_t &sinfo) +{ + uint64_t stripe_width = sinfo.get_stripe_width(); + + if (inbl.length() % stripe_width != 0) { + uint64_t pad = stripe_width - inbl.length() % stripe_width; + inbl.append_zero(pad); + } + + std::set want; + int k_plus_m = sinfo.get_k_plus_m(); + for (int i = 0; i < k_plus_m; i++) { + want.insert(i); + } + + std::map encoded_data; + int r = ECLegacy::ECUtilL::encode(sinfo, ec_impl, inbl, want, &encoded_data); + if (r < 0) { + std::cerr << "Failed to encode, rc: " << r << std::endl; + return {}; + } + + ceph::bufferlist outbl; + for (const auto &[shard, bl] : encoded_data) { + unsigned int raw_shard = sinfo.get_raw_shard(shard); + if (raw_shard >= sinfo.get_k()) { + bufferlist::const_iterator it = bl.begin(); + it.copy_all(outbl); + } + } + + return outbl; +} + +/** + * Generic function which call either legacy or optimized version of encode. 
+ * + * @param inbl Buffer to be encoded + * @returns Optional, returns buffer for the encode output if encode is successful + */ +template +std::optional ECEncoder::do_encode(ceph::bufferlist inbl) +{ + return do_encode(inbl, *(stripe_info.get())); +} + +/** + * Return data shard count for the stripe + * + * @returns int Number of data shards + */ +template +int ECEncoder::get_k() +{ + return stripe_info->get_k(); +} + +/** + * Return parity shard count for the stripe + * + * @returns int Number of parity shards + */ +template +int ECEncoder::get_m() +{ + return stripe_info->get_m(); +} +} +} + +template class ceph::consistency::ECEncoder; +template class ceph::consistency::ECEncoder; diff --git a/src/erasure-code/consistency/ECEncoder.h b/src/erasure-code/consistency/ECEncoder.h new file mode 100644 index 00000000000..30f3e6d0b03 --- /dev/null +++ b/src/erasure-code/consistency/ECEncoder.h @@ -0,0 +1,34 @@ +#pragma once + +#include "include/buffer.h" +#include "erasure-code/ErasureCode.h" +#include "erasure-code/ErasureCodePlugin.h" +#include "global/global_context.h" +#include "global/global_init.h" +#include "osd/ECUtil.h" +#include "osd/ECUtilL.h" + +using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t; +using stripe_info_o_t = ECUtil::stripe_info_t; + +namespace ceph { +namespace consistency { +template +class ECEncoder { + private: + ceph::ErasureCodeProfile profile; + ceph::ErasureCodeInterfaceRef ec_impl; + std::unique_ptr stripe_info; + int chunk_size; + int ec_init_plugin(ceph::ErasureCodeInterfaceRef &ec_impl); + std::unique_ptr ec_init_sinfo(ceph::ErasureCodeInterfaceRef &ec_impl); + std::optional do_encode(ceph::bufferlist inbl, + SInfo &sinfo); + public: + ECEncoder(ceph::ErasureCodeProfile profile, int chunk_size); + std::optional do_encode(ceph::bufferlist inbl); + int get_k(void); + int get_m(void); +}; +} +} \ No newline at end of file diff --git a/src/erasure-code/consistency/ECEncoderSwitch.cc 
b/src/erasure-code/consistency/ECEncoderSwitch.cc new file mode 100644 index 00000000000..9ab97ebf9d8 --- /dev/null +++ b/src/erasure-code/consistency/ECEncoderSwitch.cc @@ -0,0 +1,59 @@ + +#include "ECEncoderSwitch.h" +#include "ECEncoder.h" +#include "osd/ECUtil.h" +#include "osd/ECUtilL.h" + +using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t; +using ECEncoderSwitch = ceph::consistency::ECEncoderSwitch; + +ECEncoderSwitch::ECEncoderSwitch(ceph::ErasureCodeProfile profile, + int chunk_size, + bool optimizations_enabled) : + encoder_optimized(ceph::consistency::ECEncoder(profile, chunk_size)), + encoder_legacy(ceph::consistency::ECEncoder(profile, chunk_size)), + optimizations_enabled(optimizations_enabled) {} + +/** + * Generic function which call either legacy or optimized version of encode + * from the correct version of the encoder + * + * @param inbl Buffer to be encoded + * @returns Optional, returns buffer for the encode output if encode is successful + */ +std::optional ECEncoderSwitch::do_encode(ceph::bufferlist inbl) +{ + if (optimizations_enabled) { + return encoder_optimized.do_encode(inbl); + } else { + return encoder_legacy.do_encode(inbl); + } +} + +/** + * Return data shard count for the stripe from the correct version of the encoder + * + * @returns int Number of data shards + */ +int ECEncoderSwitch::get_k() +{ + if (optimizations_enabled) { + return encoder_optimized.get_k(); + } else { + return encoder_legacy.get_k(); + } +} + +/** + * Return parity shard count for the stripe from the correct version of the encoder + * + * @returns int Number of parity shards + */ +int ECEncoderSwitch::get_m() +{ + if (optimizations_enabled) { + return encoder_optimized.get_m(); + } else { + return encoder_legacy.get_m(); + } +} diff --git a/src/erasure-code/consistency/ECEncoderSwitch.h b/src/erasure-code/consistency/ECEncoderSwitch.h new file mode 100644 index 00000000000..d86e911212a --- /dev/null +++ b/src/erasure-code/consistency/ECEncoderSwitch.h @@ 
-0,0 +1,32 @@ +#pragma once + +#include "ECEncoder.h" +#include "include/buffer.h" +#include "erasure-code/ErasureCode.h" +#include "erasure-code/ErasureCodePlugin.h" +#include "global/global_context.h" +#include "global/global_init.h" +#include "osd/ECUtil.h" +#include "osd/ECUtilL.h" + +using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t; + +namespace ceph { +namespace consistency { +class ECEncoderSwitch { + private: + ceph::consistency::ECEncoder encoder_optimized; + ceph::consistency::ECEncoder encoder_legacy; + bool optimizations_enabled; + + public: + ECEncoderSwitch(ceph::ErasureCodeProfile profile, + int chunk_size, + bool optimizations_enabled); + std::optional do_encode(ceph::bufferlist inbl); + int get_k(void); + int get_m(void); +}; +} +} + diff --git a/src/erasure-code/consistency/ECReader.cc b/src/erasure-code/consistency/ECReader.cc new file mode 100644 index 00000000000..b89d963196d --- /dev/null +++ b/src/erasure-code/consistency/ECReader.cc @@ -0,0 +1,119 @@ +#include "ECReader.h" + +#include + + +using ECReader = ceph::consistency::ECReader; +using ReadResult = ceph::consistency::ReadResult; +using Read = ceph::consistency::Read; + +Read::Read(const std::string& oid, + uint64_t block_size, + uint64_t offset, + uint64_t length) : +oid(oid), +block_size(block_size), +offset(offset), +length(length) {} + +std::string Read::get_oid() { return oid; } +uint64_t Read::get_block_size() { return block_size; } +uint64_t Read::get_offset() { return offset; } +uint64_t Read::get_length() { return length; } + + +ECReader::ECReader(librados::Rados& rados, + boost::asio::io_context& asio, + const std::string& pool_name) : + rados(rados), + asio(asio), + pool_name(pool_name), + lock(ceph::make_mutex("ECReader::lock")), + outstanding_io(0) +{ + int rc; + rc = rados.ioctx_create(pool_name.c_str(), io); + ceph_assert(rc == 0); +} + +void ECReader::start_io() +{ + std::lock_guard l(lock); + outstanding_io++; +} + +void ECReader::finish_io() +{ + 
std::lock_guard l(lock); + ceph_assert(outstanding_io > 0); + outstanding_io--; + cond.notify_all(); +} + +void ECReader::wait_for_io() +{ + std::unique_lock l(lock); + while (outstanding_io > 0) { + cond.wait(l); + } +} + +/** + * Send a syncronous stat request to librados. + * + * @param oid Object ID. + * @returns The size of the object, -1 if the stat failed. + */ +uint64_t ECReader::get_object_size(std::string oid) +{ + uint64_t size; + int r = io.stat(oid, &size, nullptr); + if (r != 0) { + return -1; + } + return size; +} + +/** + * Send an async read request to librados. Push the returned data + * and oid into the results vector. + * + * @param read Read object containing oid, length, offset and block size + */ +void ECReader::do_read(Read read) +{ + start_io(); + librados::ObjectReadOperation op; + op.read(read.get_offset() * read.get_block_size(), + read.get_length() * read.get_block_size(), + nullptr, nullptr); + + std::string oid = read.get_oid(); + auto read_cb = [&, oid](boost::system::error_code ec, + version_t ver, + bufferlist outbl) { + results.push_back({oid, ec, outbl}); + finish_io(); + }; + + librados::async_operate(asio.get_executor(), io, read.get_oid(), + std::move(op), 0, nullptr, read_cb); +} + +/** + * Wait for all outstanding reads to complete then return the results vector. + * @returns Vector containing the results of all reads + */ +std::vector* ECReader::get_results() +{ + wait_for_io(); + return &results; +} + +/** + * Clear the results vector. 
+ */ +void ECReader::clear_results() +{ + results.clear(); +} \ No newline at end of file diff --git a/src/erasure-code/consistency/ECReader.h b/src/erasure-code/consistency/ECReader.h new file mode 100644 index 00000000000..a455627b128 --- /dev/null +++ b/src/erasure-code/consistency/ECReader.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include "librados/librados_asio.h" +#include "global/global_init.h" +#include "global/global_context.h" + +#define dout_context g_ceph_context + +namespace ceph { +namespace consistency { +class ReadResult { + private: + std::string oid; + boost::system::error_code ec; + ceph::bufferlist data; + + public: + std::string get_oid() const { return oid; } + boost::system::error_code get_ec() const { return ec; } + ceph::bufferlist get_data() const { return data; } + ReadResult(std::string oid, + boost::system::error_code ec, + ceph::bufferlist data) : + oid(oid), + ec(ec), + data(data) {} +}; + +class Read { + private: + std::string oid; + uint64_t block_size; + uint64_t offset; + uint64_t length; + + public: + Read(const std::string& oid, + uint64_t block_size, + uint64_t offset, + uint64_t length); + std::string get_oid(void); + uint64_t get_block_size(void); + uint64_t get_offset(void); + uint64_t get_length(void); +}; +class ECReader { + private: + librados::Rados& rados; + boost::asio::io_context& asio; + std::string pool_name; + std::string oid; + librados::IoCtx io; + ceph::condition_variable cond; + std::vector results; + ceph::mutex lock; + int outstanding_io; + + public: + ECReader(librados::Rados& rados, + boost::asio::io_context& asio, + const std::string& pool); + uint64_t get_object_size(std::string oid); + void do_read(Read read); + void start_io(void); + void finish_io(void); + void wait_for_io(void); + std::vector* get_results(void); + void clear_results(void); +}; +} +} \ No newline at end of file diff --git a/src/erasure-code/consistency/Pool.cc b/src/erasure-code/consistency/Pool.cc new file mode 100644 
index 00000000000..50178c3f646 --- /dev/null +++ b/src/erasure-code/consistency/Pool.cc @@ -0,0 +1,16 @@ +#include "Pool.h" + +using Pool = ceph::consistency::Pool; + +Pool::Pool(const std::string& pool_name, + const ceph::ErasureCodeProfile& profile, + bool optimizations_enabled) : + pool_name(pool_name), + profile(profile), + optimizations_enabled(optimizations_enabled) +{ +} + +ceph::ErasureCodeProfile Pool::get_ec_profile() { return profile; } +std::string Pool::get_pool_name() { return pool_name; } +bool Pool::has_optimizations_enabled() { return optimizations_enabled; } diff --git a/src/erasure-code/consistency/Pool.h b/src/erasure-code/consistency/Pool.h new file mode 100644 index 00000000000..fb1c61d3a09 --- /dev/null +++ b/src/erasure-code/consistency/Pool.h @@ -0,0 +1,24 @@ +#pragma once + +#include "global/global_init.h" +#include "global/global_context.h" +#include "erasure-code/ErasureCodePlugin.h" + +namespace ceph { +namespace consistency { +class Pool { + private: + std::string pool_name; + ceph::ErasureCodeProfile profile; + bool optimizations_enabled; + + public: + Pool(const std::string& pool_name, + const ceph::ErasureCodeProfile& profile, + bool optimizations_enabled); + ceph::ErasureCodeProfile get_ec_profile(void); + std::string get_pool_name(void); + bool has_optimizations_enabled(void); +}; +} +} \ No newline at end of file diff --git a/src/erasure-code/consistency/RadosCommands.cc b/src/erasure-code/consistency/RadosCommands.cc new file mode 100644 index 00000000000..145ce35e7a2 --- /dev/null +++ b/src/erasure-code/consistency/RadosCommands.cc @@ -0,0 +1,181 @@ +#include "RadosCommands.h" +#include "common/ceph_json.h" +#include "common/json/OSDStructures.h" +#include "erasure-code/ErasureCodePlugin.h" +#include + +using RadosCommands = ceph::consistency::RadosCommands; + +RadosCommands::RadosCommands(librados::Rados& rados) : + rados(rados), + formatter(new JSONFormatter(true)) +{ +} + +/** + * Return the index of the acting primary OSD 
for the given pool + * and object name. Assert on failure. + * + * @param pool_name string Name of the pool to find acting primary of + * @param oid string OID of the object to find acting primary of + * @returns int ID of the acting primary OSD + */ +int RadosCommands::get_primary_osd(const std::string& pool_name, + const std::string& oid) +{ + ceph::messaging::osd::OSDMapRequest osd_map_request{pool_name, oid, ""}; + encode_json("OSDMapRequest", osd_map_request, formatter.get()); + + std::ostringstream oss; + formatter.get()->flush(oss); + + ceph::bufferlist inbl, outbl; + int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr); + ceph_assert(rc == 0); + + JSONParser p; + bool success = p.parse(outbl.c_str(), outbl.length()); + ceph_assert(success); + + ceph::messaging::osd::OSDMapReply reply; + reply.decode_json(&p); + int osd = reply.acting_primary; + ceph_assert(osd >= 0); + + return osd; +} + +/** + * Send a mon command to fetch the value of the 'allow_ec_optimizations' flag for the + * specified pool and return it. 
+ * + * @param pool_name string Name of the pool to get the erasure code profile for + * @returns bool Whether allow EC optimizations is set on the pool + */ +bool RadosCommands::get_pool_allow_ec_optimizations(const std::string& pool_name) +{ + ceph::messaging::osd::OSDPoolGetRequest osd_pool_get_request{pool_name, "allow_ec_optimizations"}; + encode_json("OSDPoolGetRequest", osd_pool_get_request, formatter.get()); + + std::ostringstream oss; + formatter.get()->flush(oss); + + ceph::bufferlist inbl, outbl; + int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr); + ceph_assert(rc == 0); + + JSONParser p; + bool success = p.parse(outbl.c_str(), outbl.length()); + ceph_assert(success); + + ceph::messaging::osd::OSDPoolGetReply osd_pool_get_reply; + osd_pool_get_reply.decode_json(&p); + + return osd_pool_get_reply.allow_ec_optimizations; +} + +/** + * Send a mon command to fetch the name of the erasure code profile for the + * specified pool and return it. + * + * @param pool_name string Name of the pool to get the erasure code profile for + * @returns string The erasure code profile for the specified pool + */ +std::string RadosCommands::get_pool_ec_profile_name(const std::string& pool_name) +{ + ceph::messaging::osd::OSDPoolGetRequest osd_pool_get_request{pool_name}; + encode_json("OSDPoolGetRequest", osd_pool_get_request, formatter.get()); + + std::ostringstream oss; + formatter.get()->flush(oss); + + ceph::bufferlist inbl, outbl; + int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr); + ceph_assert(rc == 0); + + JSONParser p; + bool success = p.parse(outbl.c_str(), outbl.length()); + ceph_assert(success); + + ceph::messaging::osd::OSDPoolGetReply osd_pool_get_reply; + osd_pool_get_reply.decode_json(&p); + + return osd_pool_get_reply.erasure_code_profile; +} + +/** + * Fetch the erasure code profile for the specified pool and return it. 
+ * + * @param pool_name string Name of the pool to get the EC profile for + * @returns ErasureCodeProfile The EC profile for the specified pool + */ +ceph::ErasureCodeProfile RadosCommands::get_ec_profile_for_pool(const std::string& pool_name) +{ + ceph::messaging::osd::OSDECProfileGetRequest osd_ec_profile_get_req{ + get_pool_ec_profile_name(pool_name), "plain"}; + encode_json("OSDECProfileGetRequest", osd_ec_profile_get_req, formatter.get()); + + std::ostringstream oss; + formatter.get()->flush(oss); + + ceph::bufferlist inbl, outbl; + int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr); + ceph_assert(rc == 0); + + // Parse the string output into an ErasureCodeProfile + ceph::ErasureCodeProfile profile; + std::string line, key, val, out(outbl.c_str(), outbl.length()); + std::stringstream ss(out); + + while (std::getline(ss, line)) { + key = line.substr(0, line.find("=")); + val = line.substr(line.find("=") + 1, line.length() - 1); + profile.emplace(key, val); + } + + return profile; +} + +/** + * Send the parity read inject to the acting primary + * for the specified object and pool. Assert on failure. + * + * @param pool_name string Name of the pool to perform inject on + * @param oid string OID of the object to perform inject on + */ +void RadosCommands::inject_parity_read_on_primary_osd(const std::string& pool_name, + const std::string& oid) +{ + int primary_osd = get_primary_osd(pool_name, oid); + ceph::messaging::osd::InjectECParityRead parity_read_req{pool_name, oid}; + encode_json("InjectECParityRead", parity_read_req, formatter.get()); + + std::ostringstream oss; + formatter.get()->flush(oss); + + ceph::bufferlist inbl, outbl; + int rc = rados.osd_command(primary_osd, oss.str(), inbl, &outbl, nullptr); + ceph_assert(rc == 0); +} + +/** + * Send the clear parity read inject to the acting primary + * for the specified object and pool. Assert on failure. 
+ * + * @param pool_name string Name of the pool to perform inject on + * @param oid string OID of the object to perform inject on + */ +void RadosCommands::inject_clear_parity_read_on_primary_osd(const std::string& pool_name, + const std::string& oid) +{ + int primary_osd = get_primary_osd(pool_name, oid); + ceph::messaging::osd::InjectECClearParityRead clear_parity_read_req{pool_name, oid}; + encode_json("InjectECClearParityRead", clear_parity_read_req, formatter.get()); + + std::ostringstream oss; + formatter.get()->flush(oss); + + ceph::bufferlist inbl, outbl; + int rc = rados.osd_command(primary_osd, oss.str(), inbl, &outbl, nullptr); + ceph_assert(rc == 0); +} \ No newline at end of file diff --git a/src/erasure-code/consistency/RadosCommands.h b/src/erasure-code/consistency/RadosCommands.h new file mode 100644 index 00000000000..e24e81e3263 --- /dev/null +++ b/src/erasure-code/consistency/RadosCommands.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include "common/ceph_json.h" +#include "librados/librados_asio.h" +#include "global/global_init.h" +#include "global/global_context.h" +#include "erasure-code/ErasureCodePlugin.h" + +#define dout_context g_ceph_context + +namespace ceph { +namespace consistency { +class RadosCommands { + private: + librados::Rados& rados; + std::unique_ptr formatter; + + public: + RadosCommands(librados::Rados& rados); + int get_primary_osd(const std::string& pool_name, + const std::string& oid); + std::string get_pool_ec_profile_name(const std::string& pool_name); + bool get_pool_allow_ec_optimizations(const std::string& pool_name); + ceph::ErasureCodeProfile get_ec_profile_for_pool(const std::string& pool_name); + void inject_parity_read_on_primary_osd(const std::string& pool_name, + const std::string& oid); + void inject_clear_parity_read_on_primary_osd(const std::string& pool_name, + const std::string& oid); +}; +} +} \ No newline at end of file diff --git a/src/erasure-code/consistency/ceph_ec_consistency_checker.cc 
b/src/erasure-code/consistency/ceph_ec_consistency_checker.cc new file mode 100644 index 00000000000..58a3e0b1f71 --- /dev/null +++ b/src/erasure-code/consistency/ceph_ec_consistency_checker.cc @@ -0,0 +1,81 @@ +#include +#include + +#include "global/global_init.h" +#include "global/global_context.h" +#include "librados/librados_asio.h" +#include "common/ceph_argparse.h" + +#include "ConsistencyChecker.h" + +#define dout_context g_ceph_context + +namespace po = boost::program_options; +using bufferlist = ceph::bufferlist; + +int main(int argc, char **argv) +{ + auto args = argv_to_vec(argc, argv); + env_to_vec(args); + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(cct.get()); + + librados::Rados rados; + boost::asio::io_context asio; + std::thread thread; + std::optional> guard; + + po::options_description desc("ceph_ec_consistency_checker options"); + + desc.add_options() + ("help,h", "show help message") + ("pool,p", po::value(), "pool name") + ("oid,i", po::value(), "object id") + ("blocksize,b", po::value(), "block size") + ("offset,o", po::value(), "offset") + ("length,l", po::value(), "length") + ("stripeunit,s", po::value(), "stripe unit"); + + po::variables_map vm; + std::vector unrecognized_options; + try { + auto parsed = po::command_line_parser(argc, argv) + .options(desc) + .allow_unregistered() + .run(); + po::store(parsed, vm); + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + po::notify(vm); + unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional); + } catch(const po::error& e) { + std::cerr << "error: " << e.what() << std::endl; + return 1; + } + + auto pool = vm["pool"].as(); + auto oid = vm["oid"].as(); + auto blocksize = vm["blocksize"].as(); + auto offset = vm["offset"].as(); + auto length = vm["length"].as(); + auto stripe_unit = vm["stripeunit"].as(); + + int rc; + rc = rados.init_with_context(g_ceph_context); + 
ceph_assert(rc == 0); + rc = rados.connect(); + ceph_assert(rc == 0); + + guard.emplace(boost::asio::make_work_guard(asio)); + thread = make_named_thread("io_thread",[&asio] { asio.run(); }); + + auto checker = ceph::consistency::ConsistencyChecker(rados, asio, pool, stripe_unit); + checker.single_read_and_check_consistency(oid, blocksize, offset, length); + checker.print_results(std::cout); + + exit(0); +} \ No newline at end of file diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index 2b093f3109d..61b8af50e31 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -1643,6 +1643,7 @@ void ECBackend::objects_read_async( list>> to_read; unique_ptr on_complete; + CephContext *cct; cb(const cb &) = delete; cb(cb &&) = default; @@ -1650,11 +1651,13 @@ void ECBackend::objects_read_async( const hobject_t &hoid, const list>> &to_read, - Context *on_complete) + Context *on_complete, + CephContext *cct) : ec(ec), hoid(hoid), to_read(to_read), - on_complete(on_complete) {} + on_complete(on_complete), + cct(cct) {} void operator()(ECCommon::ec_extents_t &&results) { auto dpp = ec->get_parent()->get_dpp(); @@ -1686,6 +1689,10 @@ void ECBackend::objects_read_async( ldpp_dout(dpp, 20) << "range offset: " << range_offset << dendl; ldpp_dout(dpp, 20) << "length: " << length << dendl; ldpp_dout(dpp, 20) << "range length: " << range_length << dendl; + if (cct->_conf->bluestore_debug_inject_read_err && + ECInject::test_parity_read(hoid)) { + length = range_length; + } ceph_assert((offset + length) <= (range_offset + range_length)); bufs->substr_of( range.first.get_val(), @@ -1719,7 +1726,8 @@ void ECBackend::objects_read_async( cb(this, hoid, to_read, - on_complete))); + on_complete, + cct))); } void ECBackend::objects_read_and_reconstruct( diff --git a/src/osd/ECBackendL.cc b/src/osd/ECBackendL.cc index 8d685a1aa32..b692995dc00 100644 --- a/src/osd/ECBackendL.cc +++ b/src/osd/ECBackendL.cc @@ -1609,17 +1609,20 @@ void ECBackendL::objects_read_async( list > > 
to_read; unique_ptr on_complete; + CephContext *cct; cb(const cb&) = delete; cb(cb &&) = default; cb(ECBackendL *ec, const hobject_t &hoid, const list > > &to_read, - Context *on_complete) + Context *on_complete, + CephContext *cct) : ec(ec), hoid(hoid), to_read(to_read), - on_complete(on_complete) {} + on_complete(on_complete), + cct(cct) {} void operator()(ECCommonL::ec_extents_t &&results) { auto dpp = ec->get_parent()->get_dpp(); ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results @@ -1652,6 +1655,10 @@ void ECBackendL::objects_read_async( ldpp_dout(dpp, 20) << "length: " << length << dendl; ldpp_dout(dpp, 20) << "range length: " << range_length << dendl; ceph_assert(offset + length <= range_offset + range_length); + if (cct->_conf->bluestore_debug_inject_read_err && + ECInject::test_parity_read(hoid)) { + length = range_length; + } read.second.first->substr_of( range.first.get_val(), offset - range_offset, @@ -1682,7 +1689,8 @@ void ECBackendL::objects_read_async( cb(this, hoid, to_read, - on_complete))); + on_complete, + cct))); } void ECBackendL::objects_read_and_reconstruct( diff --git a/src/osd/ECCommon.cc b/src/osd/ECCommon.cc index 6a4d64ba415..3b886806a87 100644 --- a/src/osd/ECCommon.cc +++ b/src/osd/ECCommon.cc @@ -471,6 +471,44 @@ void ECCommon::ReadPipeline::get_want_to_read_shards( } } +void ECCommon::ReadPipeline::get_want_to_read_all_shards( + const list &to_read, + ECUtil::shard_extent_set_t &want_shard_reads) +{ + for (const auto &single_region: to_read) { + sinfo.ro_range_to_shard_extent_set_with_parity(single_region.offset, + single_region.size, + want_shard_reads); + } + dout(20) << __func__ << ": to_read " << to_read + << " read_request " << want_shard_reads << dendl; +} + +/** + * Create a buffer containing both the ordered data and parity shards + * + * @param buffers_read shard_extent_map_t of shard indexes and their corresponding extent maps + * @param read ec_align_t Read size and offset for rados object + * @param outbl 
Pointer to output buffer + */ +void ECCommon::ReadPipeline::create_parity_read_buffer( + ECUtil::shard_extent_map_t buffers_read, + ec_align_t read, + bufferlist *outbl) +{ + bufferlist data, parity; + data = buffers_read.get_ro_buffer(read.offset, read.size); + + for (raw_shard_id_t raw_shard(sinfo.get_k()); + raw_shard < sinfo.get_k_plus_m(); ++raw_shard) { + shard_id_t shard = sinfo.get_shard(raw_shard); + buffers_read.get_shard_first_buffer(shard, parity); + } + + outbl->append(data); + outbl->append(parity); +} + struct ClientReadCompleter final : ECCommon::ReadCompleter { ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline, ECCommon::ClientAsyncReadStatus *status @@ -502,8 +540,17 @@ struct ClientReadCompleter final : ECCommon::ReadCompleter { #endif for (auto &&read: req.to_read) { - result.insert(read.offset, read.size, - res.buffers_read.get_ro_buffer(read.offset, read.size)); + // Return a buffer containing both data and parity + // if the parity read inject is set + if (cct->_conf->bluestore_debug_inject_read_err && + ECInject::test_parity_read(hoid)) { + bufferlist data_and_parity; + read_pipeline.create_parity_read_buffer(res.buffers_read, read, &data_and_parity); + result.insert(read.offset, data_and_parity.length(), data_and_parity); + } else { + result.insert(read.offset, read.size, + res.buffers_read.get_ro_buffer(read.offset, read.size)); + } } } dout(20) << __func__ << " calling complete_object with result=" @@ -540,7 +587,13 @@ void ECCommon::ReadPipeline::objects_read_and_reconstruct( map for_read_op; for (auto &&[hoid, to_read]: reads) { ECUtil::shard_extent_set_t want_shard_reads(sinfo.get_k_plus_m()); - get_want_to_read_shards(to_read, want_shard_reads); + if (cct->_conf->bluestore_debug_inject_read_err && + ECInject::test_parity_read(hoid)) { + get_want_to_read_all_shards(to_read, want_shard_reads); + } + else { + get_want_to_read_shards(to_read, want_shard_reads); + } read_request_t read_request(to_read, want_shard_reads, false, 
object_size); const int r = get_min_avail_to_read_shards( diff --git a/src/osd/ECCommon.h b/src/osd/ECCommon.h index c1a490878e6..e9d5491aa7f 100644 --- a/src/osd/ECCommon.h +++ b/src/osd/ECCommon.h @@ -401,6 +401,14 @@ struct ECCommon { const std::list &to_read, ECUtil::shard_extent_set_t &want_shard_reads); + void get_want_to_read_all_shards( + const std::list &to_read, + ECUtil::shard_extent_set_t &want_shard_reads); + void create_parity_read_buffer( + ECUtil::shard_extent_map_t buffers_read, + ec_align_t read, + bufferlist *outbl); + /// Returns to_read replicas sufficient to reconstruct want int get_min_avail_to_read_shards( const hobject_t &hoid, ///< [in] object diff --git a/src/osd/ECCommonL.cc b/src/osd/ECCommonL.cc index da4cd6b35c7..7c9eaf6d3d4 100644 --- a/src/osd/ECCommonL.cc +++ b/src/osd/ECCommonL.cc @@ -505,6 +505,60 @@ void ECCommonL::ReadPipeline::get_want_to_read_shards( } } +void ECCommonL::ReadPipeline::get_want_to_read_all_shards( + std::set *want_to_read) const +{ + for (int i = 0; i < (int)sinfo.get_k_plus_m(); ++i) { + want_to_read->insert(sinfo.get_shard(i)); + } +} + +/** + * Create a buffer containing both the ordered data and parity shards + * + * @param to_decode Map of shard indexes and their corresponding data buffers + * @param wanted_to_read Set of shard indexes to be read + * @param outbl Pointer to output buffer + */ +void ECCommonL::ReadPipeline::create_parity_read_buffer( + std::map to_decode, + std::set wanted_to_read, + uint64_t read_size, + bufferlist *outbl) +{ + bufferlist data, parity; + std::map parities; + + for (unsigned int i = 0; i < sinfo.get_k_plus_m(); ++i) { + unsigned int raw_shard = sinfo.get_raw_shard(i); + if (raw_shard >= sinfo.get_k()) { + parities[raw_shard] = to_decode[i]; + wanted_to_read.erase(i); + to_decode.erase(i); + } + } + + for (auto const& p : parities) { + parity.append(p.second); + } + + dout(20) << __func__ << " going to decode: " + << " wanted_to_read=" << wanted_to_read + << " to_decode=" 
<< to_decode + << dendl; + int r = ECUtilL::decode( + sinfo, + ec_impl, + wanted_to_read, + to_decode, + &data); + + ceph_assert(r == 0); + + outbl->append(data); + outbl->append(parity); +} + struct ClientReadCompleter : ECCommonL::ReadCompleter { ClientReadCompleter(ECCommonL::ReadPipeline &read_pipeline, ECCommonL::ClientAsyncReadStatus *status) @@ -539,6 +593,21 @@ struct ClientReadCompleter : ECCommonL::ReadCompleter { ++j) { to_decode[static_cast(j->first.shard)] = std::move(j->second); } + + if (cct->_conf->bluestore_debug_inject_read_err && + ECInject::test_parity_read(hoid)) { + bufferlist outbl; + read_pipeline.create_parity_read_buffer(to_decode, + wanted_to_read, + read.size, + &outbl); + + result.insert( + read.offset, outbl.length(), std::move(outbl)); + res.returned.pop_front(); + goto out; + } + dout(20) << __func__ << " going to decode: " << " wanted_to_read=" << wanted_to_read << " to_decode=" << to_decode @@ -619,7 +688,10 @@ void ECCommonL::ReadPipeline::objects_read_and_reconstruct( map for_read_op; for (auto &&to_read: reads) { set want_to_read; - if (cct->_conf->osd_ec_partial_reads) { + if (cct->_conf->bluestore_debug_inject_read_err && + ECInject::test_parity_read(to_read.first)) { + get_want_to_read_all_shards(&want_to_read); + } else if (cct->_conf->osd_ec_partial_reads) { for (const auto& single_region : to_read.second) { get_min_want_to_read_shards(single_region.offset, single_region.size, diff --git a/src/osd/ECCommonL.h b/src/osd/ECCommonL.h index e0cbe1a7311..f2e949de103 100644 --- a/src/osd/ECCommonL.h +++ b/src/osd/ECCommonL.h @@ -327,6 +327,13 @@ struct ECCommonL { friend struct FinishReadOp; void get_want_to_read_shards(std::set *want_to_read) const; + void get_want_to_read_all_shards(std::set *want_to_read) const; + void create_parity_read_buffer( + std::map to_decode, + std::set wanted_to_read, + uint64_t read_size, + bufferlist *outbl + ); /// Returns to_read replicas sufficient to reconstruct want int 
get_min_avail_to_read_shards( diff --git a/src/osd/ECInject.cc b/src/osd/ECInject.cc index bdb8f981388..ebcebd03b00 100644 --- a/src/osd/ECInject.cc +++ b/src/osd/ECInject.cc @@ -29,7 +29,7 @@ namespace ECInject { static std::map> write_failures3; static std::map write_failures0_shard; static std::set write_failures0_reqid; - + static std::set parity_reads; /** * Configure a read error inject that typically forces additional reads of * shards in an EC pool to recover data using the redundancy. With multiple @@ -140,6 +140,20 @@ namespace ECInject { return result; } + /** + * Configure a parity read inject that typically forces parity shards in an + * EC pool to be returned along with the data shards. + * + * @brief Set up a parity read inject for an object in an EC pool. + * @param o Target object for the parity read inject. + * @return string Result of configuring the inject. + */ + std::string parity_read(const hobject_t& o) { + std::lock_guard l(lock); + parity_reads.insert(o); + return "ok - parity read inject set"; + } + /** * @brief Clear a previously configured read error inject. * @param o Target object for the error inject. @@ -230,6 +244,20 @@ namespace ECInject { return "ok - " + std::to_string(remaining) + " injects cleared"; } + /** + * @brief Clear a previously configured parity read inject. + * @param o Target object for the clear parity read inject. + * @return string Indication of whether the inject was cleared. + */ + std::string clear_parity_read(const hobject_t& o) { + std::lock_guard l(lock); + if (!parity_reads.count(o)) { + return "no outstanding parity read inject"; + } + parity_reads.erase(o); + return "ok - parity read inject cleared"; + } + static bool test_error(const ghobject_t& o, std::map> *failures) { @@ -320,4 +348,14 @@ namespace ECInject { &write_failures3); } + /** + * @brief Test whether parity read inject is set for a particular object. + * @param o Target object for the parity read inject. 
+ * @return bool true if set, otherwise false. + */ + bool test_parity_read(const hobject_t& o) + { + std::lock_guard l(lock); + return (parity_reads.count(o)); + } } // ECInject \ No newline at end of file diff --git a/src/osd/ECInject.h b/src/osd/ECInject.h index 161c54e0b73..dd2f3fb9494 100644 --- a/src/osd/ECInject.h +++ b/src/osd/ECInject.h @@ -23,13 +23,16 @@ namespace ECInject { // Error inject interfaces std::string read_error(const ghobject_t& o, const int64_t type, const int64_t when, const int64_t duration); std::string write_error(const ghobject_t& o, const int64_t type, const int64_t when, const int64_t duration); + std::string parity_read(const hobject_t& o); std::string clear_read_error(const ghobject_t& o, const int64_t type); std::string clear_write_error(const ghobject_t& o, const int64_t type); + std::string clear_parity_read(const hobject_t& o); bool test_read_error0(const ghobject_t& o); bool test_read_error1(const ghobject_t& o); bool test_write_error0(const hobject_t& o,const osd_reqid_t& reqid); bool test_write_error1(const ghobject_t& o); bool test_write_error2(const hobject_t& o); bool test_write_error3(const hobject_t& o); + bool test_parity_read(const hobject_t& o); } // ECInject diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f55898cac6d..658caf0b329 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4430,6 +4430,21 @@ void OSD::final_init() test_ops_hook, "Inject a full disk (optional count times)"); ceph_assert(r == 0); + r = admin_socket->register_command( + "injectparityread " \ + "name=pool,type=CephString " \ + "name=objname,type=CephObjectname ", + test_ops_hook, + "Tell the OSD to return the parity chunks along with the next read"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "injectclearparityread " \ + "name=pool,type=CephString " \ + "name=objname,type=CephObjectname ", + test_ops_hook, + "Clear a parity read inject"); + ceph_assert(r == 0); + r = admin_socket->register_command( "bench " \ 
"name=count,type=CephInt,req=false " \ @@ -6567,7 +6582,8 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, command == "truncobj" || command == "injectmdataerr" || command == "injectdataerr" || command == "injectecreaderr" || command == "injectecclearreaderr" || - command == "injectecwriteerr" || command == "injectecclearwriteerr" + command == "injectecwriteerr" || command == "injectecclearwriteerr" || + command == "injectparityread" || command == "injectclearparityread" ) { pg_t rawpg; int64_t pool; @@ -6613,7 +6629,9 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, (command != "injectecreaderr") && (command != "injectecclearreaderr") && (command != "injectecwriteerr") && - (command != "injectecclearwriteerr")) { + (command != "injectecclearwriteerr") && + (command != "injectparityread") && + (command != "injectclearparityread")) { ss << "Must not call on ec pool"; return; } @@ -6621,7 +6639,9 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, if ((command == "injectecreaderr") || (command == "injecteclearreaderr") || (command == "injectecwriteerr") || - (command == "injecteclearwriteerr")) { + (command == "injecteclearwriteerr") || + (command == "injectparityread") || + (command == "injectclearparityread")) { ss << "Only supported on ec pool"; return; } @@ -6733,6 +6753,18 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, } else { ss << "bluestore_debug_inject_read_err not enabled"; } + } else if (command == "injectparityread") { + if (service->cct->_conf->bluestore_debug_inject_read_err) { + ss << "injectparityread: " << ECInject::parity_read(obj); + } else { + ss << "bluestore_debug_inject_read_err not enabled"; + } + } else if (command == "injectclearparityread") { + if (service->cct->_conf->bluestore_debug_inject_read_err) { + ss << "injectclearparityread: " << ECInject::clear_parity_read(obj); + } else { + ss << "bluestore_debug_inject_read_err not 
enabled"; + } } return; } -- 2.39.5