A new test-only inject tells the EC backend to return both data and parity shards to the client so that they can
be checked for consistency by the new tool.
Supports both optimized and unoptimized EC pools.
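
Example invocation of the new checker (option names as defined in
ceph_ec_consistency_checker.cc; an offset and length of 0 check the
whole object; values here are illustrative):

  ceph_ec_consistency_checker -p <pool> -i <oid> -b 4096 -o 0 -l 0 -s 4096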
Signed-off-by: Connor Fawcett <connorfa@uk.ibm.com>
void OSDPoolGetReply::dump(Formatter* f) const {
encode_json("erasure_code_profile", erasure_code_profile, f);
+ encode_json("allow_ec_optimizations", allow_ec_optimizations, f);
}
void OSDPoolGetReply::decode_json(JSONObj* obj) {
JSONDecoder::decode_json("erasure_code_profile", erasure_code_profile, obj);
+ JSONDecoder::decode_json("allow_ec_optimizations", allow_ec_optimizations, obj);
}
void OSDECProfileGetRequest::dump(Formatter* f) const {
- encode_json("prefix", "osd pool get", f);
+ encode_json("prefix", "osd erasure-code-profile get", f);
encode_json("name", name, f);
encode_json("format", format, f);
}
void OSDSetRequest::decode_json(JSONObj* obj) {
JSONDecoder::decode_json("key", key, obj);
JSONDecoder::decode_json("yes_i_really_mean_it", yes_i_really_mean_it, obj);
+}
+
+void InjectECParityRead::dump(Formatter* f) const {
+ encode_json("prefix", "injectparityread", f);
+ encode_json("pool", pool, f);
+ encode_json("objname", objname, f);
+}
+
+void InjectECParityRead::decode_json(JSONObj* obj) {
+ JSONDecoder::decode_json("pool", pool, obj);
+ JSONDecoder::decode_json("objname", objname, obj);
+}
+
+void InjectECClearParityRead::dump(Formatter* f) const {
+ encode_json("prefix", "injectclearparityread", f);
+ encode_json("pool", pool, f);
+ encode_json("objname", objname, f);
+}
+
+void InjectECClearParityRead::decode_json(JSONObj* obj) {
+ JSONDecoder::decode_json("pool", pool, obj);
+ JSONDecoder::decode_json("objname", objname, obj);
}
\ No newline at end of file
struct OSDPoolGetReply {
std::string erasure_code_profile;
-
+  bool allow_ec_optimizations = false;
void dump(Formatter* f) const;
void decode_json(JSONObj* obj);
};
JSONDecoder::decode_json("type", type, obj);
}
};
+struct InjectECParityRead {
+ std::string pool;
+ std::string objname;
+
+ void dump(Formatter* f) const;
+ void decode_json(JSONObj* obj);
+};
+struct InjectECClearParityRead {
+ std::string pool;
+ std::string objname;
+
+ void dump(Formatter* f) const;
+ void decode_json(JSONObj* obj);
+};
} // namespace osd
} // namespace messaging
} // namespace ceph
\ No newline at end of file
add_subdirectory(lrc)
add_subdirectory(shec)
add_subdirectory(clay)
+add_subdirectory(consistency)
if(HAVE_NASM_X64_AVX2 OR HAVE_ARMV8_SIMD)
set(WITH_EC_ISA_PLUGIN TRUE CACHE BOOL "")
--- /dev/null
+add_library(ec_consistency STATIC
+ ECReader.cc
+ ECEncoder.cc
+ ECEncoderSwitch.cc
+ Pool.cc
+ ConsistencyChecker.cc
+ RadosCommands.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/ECUtilL.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/ECUtil.cc
+)
+
+target_link_libraries(ec_consistency
+ librados
+ global
+ json_structures
+)
+
+add_executable(ceph_ec_consistency_checker
+ ${CMAKE_CURRENT_SOURCE_DIR}/ceph_ec_consistency_checker.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/ECUtilL.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/ECUtil.cc)
+target_link_libraries(ceph_ec_consistency_checker
+ librados global ec_consistency)
+install(TARGETS ceph_ec_consistency_checker
+  DESTINATION ${CMAKE_INSTALL_BINDIR})
--- /dev/null
+#include "ConsistencyChecker.h"
+
+#include "RadosCommands.h"
+#include "Pool.h"
+#include "ECReader.h"
+#include "ECEncoder.h"
+#include "ECEncoderSwitch.h"
+
+using ConsistencyChecker = ceph::consistency::ConsistencyChecker;
+
+using Read = ceph::consistency::Read;
+using ReadResult = ceph::consistency::ReadResult;
+using bufferlist = ceph::bufferlist;
+
+ConsistencyChecker::ConsistencyChecker(librados::Rados &rados,
+ boost::asio::io_context& asio,
+ const std::string& pool_name,
+ int stripe_unit) :
+ rados(rados),
+ asio(asio),
+ reader(ceph::consistency::ECReader(rados, asio, pool_name)),
+ commands(ceph::consistency::RadosCommands(rados)),
+ pool(pool_name,
+ commands.get_ec_profile_for_pool(pool_name),
+ commands.get_pool_allow_ec_optimizations(pool_name)),
+  encoder(ceph::consistency::ECEncoderSwitch(pool.get_ec_profile(),
+                                             stripe_unit,
+                                             pool.has_optimizations_enabled())) {}
+
+/**
+ * Perform an end-to-end read and consistency check on a single object.
+ * Current implementation only supports reading the entire object, so length and
+ * offset should normally be 0.
+ *
+ * @param oid string Object ID of the object to read and check
+ * @param block_size int Block size for the data being read
+ * @param offset int Which offset to read from
+ * @param length int How much data of each shard to read
+ * @return bool true if consistent, otherwise false
+ */
+bool ConsistencyChecker::single_read_and_check_consistency(const std::string& oid,
+ int block_size,
+ int offset,
+ int length)
+{
+ clear_results();
+ std::string error_message = "";
+ bool success = true;
+
+ auto read = Read(oid, block_size, offset, length);
+ queue_ec_read(read);
+
+  auto read_results = reader.get_results();
+  int result_count = read_results->size();
+  if (result_count != 1) {
+    error_message = "Incorrect number of RADOS read results returned, count: "
+      + std::to_string(result_count);
+    success = false;
+  }
+
+  // Only dereference the results vector once we know exactly one read returned.
+  if (success) {
+    ReadResult read_result = (*read_results)[0];
+    boost::system::error_code ec = read_result.get_ec();
+    if (ec != boost::system::errc::success) {
+      error_message = "RADOS Read failed, error message: " + ec.message();
+      success = false;
+    }
+
+    if (success && read_result.get_data().length() == 0) {
+      error_message = "Empty object returned from RADOS read.";
+      success = false;
+    }
+
+    if (success && !check_object_consistency(oid, read_result.get_data())) {
+      error_message = "Generated parity did not match read in parity shards.";
+      success = false;
+    }
+  }
+
+ results.push_back({oid, error_message, success});
+ commands.inject_clear_parity_read_on_primary_osd(pool.get_pool_name(),
+ oid);
+ return success;
+}
+
+/**
+ * Queue up an EC read with the parity read inject set
+ *
+ * @param read Object containing information about the read
+ */
+void ConsistencyChecker::queue_ec_read(Read read)
+{
+ commands.inject_parity_read_on_primary_osd(pool.get_pool_name(),
+ read.get_oid());
+ reader.do_read(read);
+}
+
+/**
+ * Generate parities from the data and compare to the parity shards
+ *
+ * @param oid string The object ID of the object being checked
+ * @param inbl bufferlist The entire contents of the object, including parities
+ * @returns bool true if the generated parity matches the parity shards
+ */
+bool ConsistencyChecker::check_object_consistency(const std::string& oid,
+ const bufferlist& inbl)
+{
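+  // Re-encode the data shards locally and compare the result with the
+  // parity shards returned by the injected read.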
+ bool is_optimized = pool.has_optimizations_enabled();
+ std::pair<bufferlist, bufferlist> data_and_parity;
+ data_and_parity = split_data_and_parity(oid, inbl, encoder.get_k(),
+ encoder.get_m(), is_optimized);
+
+ std::optional<bufferlist> outbl;
+ outbl = encoder.do_encode(data_and_parity.first);
+
+ if (!outbl.has_value()) {
+ return false;
+ }
+
+ return buffers_match(outbl.value(), data_and_parity.second);
+}
+
+void ConsistencyChecker::print_results(std::ostream& out)
+{
+ out << "Results:" << std::endl;
+ for (const auto &r : results) {
+ std::string result_str = (r.get_result()) ? "Passed" : "Failed";
+ std::string error_str = r.get_error_message();
+ out << "Object ID " << r.get_oid() << ": " << result_str << std::endl;
+ if (!error_str.empty()) {
+ out << "Error Message: " << error_str << std::endl;
+ }
+ }
+
+ int count = results.size();
+ std::string obj_str = (count == 1) ? "object checked." : "objects checked.";
+ out << "Total: " << count << " " << obj_str << std::endl;
+}
+
+std::pair<bufferlist, bufferlist>
+ ConsistencyChecker::split_data_and_parity(const std::string& oid,
+ const bufferlist& read,
+ int k, int m,
+ bool is_optimized)
+{
+ uint64_t data_size, parity_size;
+
+ // Optimized EC parity read should return the exact object size + parity shards
+ // Legacy EC parity read will return the entire padded data shards + parity shards
+ data_size = is_optimized ? reader.get_object_size(oid) : (read.length() / (k + m)) * k;
+ parity_size = read.length() - data_size;
+
+ bufferlist data, parity;
+ auto it = read.begin();
+ it.copy(data_size, data);
+ it.copy(parity_size, parity);
+ return std::pair<bufferlist, bufferlist>(data, parity);
+}
+
+bool ConsistencyChecker::buffers_match(const bufferlist& b1,
+ const bufferlist& b2)
+{
+ return (b1.contents_equal(b2));
+}
+
+void ConsistencyChecker::clear_results()
+{
+ reader.clear_results();
+ results.clear();
+}
\ No newline at end of file
--- /dev/null
+#pragma once
+
+#include <boost/asio/io_context.hpp>
+#include <boost/program_options.hpp>
+#include "librados/librados_asio.h"
+#include "global/global_init.h"
+#include "global/global_context.h"
+
+#include "Pool.h"
+#include "ECReader.h"
+#include "RadosCommands.h"
+#include "ECEncoder.h"
+#include "ECEncoderSwitch.h"
+
+#define dout_context g_ceph_context
+
+namespace ceph {
+namespace consistency {
+class ConsistencyCheckResult {
+ private:
+ std::string oid;
+ std::string error_message;
+ bool result;
+
+ public:
+ std::string get_oid() const { return oid; }
+ std::string get_error_message() const { return error_message; }
+ bool get_result() const { return result; }
+ ConsistencyCheckResult(std::string oid,
+ std::string error_message,
+ bool result) :
+ oid(oid),
+ error_message(error_message),
+ result(result) {}
+};
+
+class ConsistencyChecker {
+ private:
+ librados::Rados& rados;
+ boost::asio::io_context& asio;
+ ceph::consistency::ECReader reader;
+ ceph::consistency::RadosCommands commands;
+ ceph::consistency::Pool pool;
+ ceph::consistency::ECEncoderSwitch encoder;
+ std::vector<ConsistencyCheckResult> results;
+ bool buffers_match(const bufferlist& b1, const bufferlist& b2);
+ std::pair<bufferlist, bufferlist> split_data_and_parity(const std::string& oid,
+ const bufferlist& read,
+ int k, int m,
+ bool is_optimized);
+
+ public:
+ ConsistencyChecker(librados::Rados& rados,
+ boost::asio::io_context& asio,
+ const std::string& pool_name,
+ int stripe_unit);
+ void queue_ec_read(Read read);
+ bool check_object_consistency(const std::string& oid,
+ const bufferlist& inbl);
+ void print_results(std::ostream& out);
+ void clear_results();
+ bool single_read_and_check_consistency(const std::string& oid,
+ int block_size,
+ int offset,
+ int length);
+};
+}
+}
\ No newline at end of file
--- /dev/null
+#include "ECEncoder.h"
+#include <typeinfo>
+#include "common/errno.h"
+#include "osd/ECUtil.h"
+#include "osd/ECUtilL.h"
+
+using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t;
+using stripe_info_o_t = ECUtil::stripe_info_t;
+
+namespace ceph {
+namespace consistency {
+
+template <typename SInfo>
+ECEncoder<SInfo>::ECEncoder(ceph::ErasureCodeProfile profile, int chunk_size) :
+ profile(profile),
+ chunk_size(chunk_size)
+{
+ int r = ec_init_plugin(ec_impl);
+ if (r < 0) {
+ std::cerr << "Failed to initialize plugin: " << r << std::endl;
+ }
+ stripe_info = ec_init_sinfo(ec_impl);
+}
+
+/**
+ * Initialize the ErasureCodeInterfaceRef needed to perform encode.
+ *
+ * @param ec_impl Pointer to plugin being initialized
+ * @returns int 0 if successful, otherwise 1
+ */
+template <typename SInfo>
+int ECEncoder<SInfo>::ec_init_plugin(ceph::ErasureCodeInterfaceRef &ec_impl)
+{
+ auto plugin = profile.find("plugin");
+ if (plugin == profile.end()) {
+ std::cerr << "Invalid profile: plugin not specified." << std::endl;
+ return 1;
+ }
+
+ std::stringstream ss;
+ std::string dir = g_conf().get_val<std::string>("erasure_code_dir");
+ ceph::ErasureCodePluginRegistry::instance().factory(plugin->second,
+ dir, profile,
+ &ec_impl, &ss);
+ if (!ec_impl) {
+ std::cerr << "Invalid profile: " << ss.str() << std::endl;
+ return 1;
+ }
+
+ return 0;
+}
+
+/**
+ * Initialize the stripe_info_t needed to perform encode. Optimized version for new EC.
+ *
+ * @param ec_impl Pointer to plugin object
+ * @returns Unique pointer to the stripe info object associated with the EC profile
+ */
+template <>
+std::unique_ptr<stripe_info_o_t> ECEncoder<stripe_info_o_t>::ec_init_sinfo(
+ ceph::ErasureCodeInterfaceRef &ec_impl)
+{
+ uint64_t k = std::stol(profile["k"]);
+ ceph_assert(k > 0);
+ uint64_t stripe_width = k * chunk_size;
+ std::unique_ptr<stripe_info_o_t> s(
+ new stripe_info_o_t(ec_impl, nullptr, stripe_width));
+ return s;
+}
+
+/**
+ * Initialize the stripe_info_t needed to perform encode. Legacy version for old EC.
+ *
+ * @param ec_impl Pointer to plugin object
+ * @returns Unique pointer to the stripe info object associated with the EC profile.
+ */
+template <>
+std::unique_ptr<stripe_info_l_t> ECEncoder<stripe_info_l_t>::ec_init_sinfo(
+ ceph::ErasureCodeInterfaceRef &ec_impl)
+{
+  uint64_t k = std::stol(profile["k"]);
+ ceph_assert(k > 0);
+ uint64_t stripe_width = k * chunk_size;
+ std::unique_ptr<stripe_info_l_t> s(new stripe_info_l_t(ec_impl, stripe_width));
+ return s;
+}
+
+/**
+ * Perform encode on the input buffer and place result in the supplied output buffer.
+ * Optimized EC encode function.
+ *
+ * @param inbl Buffer to be encoded
+ * @returns Optional, returns buffer for the encode output if encode is successful
+ */
+template <>
+std::optional<ceph::bufferlist> ECEncoder<stripe_info_o_t>::do_encode(ceph::bufferlist inbl,
+ stripe_info_o_t &sinfo)
+{
+ ECUtil::shard_extent_map_t encoded_data(&sinfo);
+
+ uint64_t stripe_width = sinfo.get_stripe_width();
+ if (inbl.length() % stripe_width != 0) {
+ uint64_t pad = stripe_width - inbl.length() % stripe_width;
+ inbl.append_zero(pad);
+ }
+
+ sinfo.ro_range_to_shard_extent_map(0, inbl.length(), inbl, encoded_data);
+ encoded_data.insert_parity_buffers();
+ int r = encoded_data.encode(ec_impl, nullptr, encoded_data.get_ro_end());
+ if (r < 0) {
+ std::cerr << "Failed to encode: " << cpp_strerror(r) << std::endl;
+ return {};
+ }
+
+ ceph::bufferlist outbl;
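+  // Shards with id >= k hold parity; append only those to the output.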
+ for (const auto &[shard, _] : encoded_data.get_extent_maps()) {
+ if (shard >= sinfo.get_k()) {
+ encoded_data.get_shard_first_buffer(shard, outbl);
+ }
+ }
+
+ return outbl;
+}
+
+/**
+ * Perform encode on the input buffer and place result in the supplied output buffer.
+ * Legacy EC encode function.
+ *
+ * @param inbl Buffer to be encoded
+ * @returns Optional, returns buffer for the encode output if encode is successful
+ */
+template <>
+std::optional<ceph::bufferlist> ECEncoder<stripe_info_l_t>::do_encode(ceph::bufferlist inbl,
+ stripe_info_l_t &sinfo)
+{
+ uint64_t stripe_width = sinfo.get_stripe_width();
+
+ if (inbl.length() % stripe_width != 0) {
+ uint64_t pad = stripe_width - inbl.length() % stripe_width;
+ inbl.append_zero(pad);
+ }
+
+ std::set<int> want;
+ int k_plus_m = sinfo.get_k_plus_m();
+ for (int i = 0; i < k_plus_m; i++) {
+ want.insert(i);
+ }
+
+ std::map<int, ceph::bufferlist> encoded_data;
+ int r = ECLegacy::ECUtilL::encode(sinfo, ec_impl, inbl, want, &encoded_data);
+ if (r < 0) {
+ std::cerr << "Failed to encode, rc: " << r << std::endl;
+ return {};
+ }
+
+ ceph::bufferlist outbl;
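+  // Legacy EC may remap chunks; convert to raw shard ids and keep parity only.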
+ for (const auto &[shard, bl] : encoded_data) {
+ unsigned int raw_shard = sinfo.get_raw_shard(shard);
+ if (raw_shard >= sinfo.get_k()) {
+ bufferlist::const_iterator it = bl.begin();
+ it.copy_all(outbl);
+ }
+ }
+
+ return outbl;
+}
+
+/**
+ * Generic wrapper which calls either the legacy or the optimized version of encode.
+ *
+ * @param inbl Buffer to be encoded
+ * @returns Optional, returns buffer for the encode output if encode is successful
+ */
+template <typename SInfo>
+std::optional<ceph::bufferlist> ECEncoder<SInfo>::do_encode(ceph::bufferlist inbl)
+{
+  return do_encode(inbl, *stripe_info);
+}
+
+/**
+ * Return data shard count for the stripe
+ *
+ * @returns int Number of data shards
+ */
+template <typename SInfo>
+int ECEncoder<SInfo>::get_k()
+{
+ return stripe_info->get_k();
+}
+
+/**
+ * Return parity shard count for the stripe
+ *
+ * @returns int Number of parity shards
+ */
+template <typename SInfo>
+int ECEncoder<SInfo>::get_m()
+{
+ return stripe_info->get_m();
+}
+}
+}
+
+template class ceph::consistency::ECEncoder<stripe_info_l_t>;
+template class ceph::consistency::ECEncoder<stripe_info_o_t>;
--- /dev/null
+#pragma once
+
+#include "include/buffer.h"
+#include "erasure-code/ErasureCode.h"
+#include "erasure-code/ErasureCodePlugin.h"
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include "osd/ECUtil.h"
+#include "osd/ECUtilL.h"
+
+using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t;
+using stripe_info_o_t = ECUtil::stripe_info_t;
+
+namespace ceph {
+namespace consistency {
+template <typename SInfo>
+class ECEncoder {
+ private:
+ ceph::ErasureCodeProfile profile;
+ ceph::ErasureCodeInterfaceRef ec_impl;
+ std::unique_ptr<SInfo> stripe_info;
+ int chunk_size;
+ int ec_init_plugin(ceph::ErasureCodeInterfaceRef &ec_impl);
+ std::unique_ptr<SInfo> ec_init_sinfo(ceph::ErasureCodeInterfaceRef &ec_impl);
+ std::optional<ceph::bufferlist> do_encode(ceph::bufferlist inbl,
+ SInfo &sinfo);
+ public:
+ ECEncoder(ceph::ErasureCodeProfile profile, int chunk_size);
+ std::optional<ceph::bufferlist> do_encode(ceph::bufferlist inbl);
+ int get_k(void);
+ int get_m(void);
+};
+}
+}
\ No newline at end of file
--- /dev/null
+
+#include "ECEncoderSwitch.h"
+#include "ECEncoder.h"
+#include "osd/ECUtil.h"
+#include "osd/ECUtilL.h"
+
+using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t;
+using ECEncoderSwitch = ceph::consistency::ECEncoderSwitch;
+
+ECEncoderSwitch::ECEncoderSwitch(ceph::ErasureCodeProfile profile,
+ int chunk_size,
+ bool optimizations_enabled) :
+ encoder_optimized(ceph::consistency::ECEncoder<ECUtil::stripe_info_t>(profile, chunk_size)),
+ encoder_legacy(ceph::consistency::ECEncoder<stripe_info_l_t>(profile, chunk_size)),
+ optimizations_enabled(optimizations_enabled) {}
+
+/**
+ * Dispatch encode to the legacy or the optimized encoder, depending on
+ * whether EC optimizations are enabled on the pool
+ *
+ * @param inbl Buffer to be encoded
+ * @returns Optional, returns buffer for the encode output if encode is successful
+ */
+std::optional<ceph::bufferlist> ECEncoderSwitch::do_encode(ceph::bufferlist inbl)
+{
+ if (optimizations_enabled) {
+ return encoder_optimized.do_encode(inbl);
+ } else {
+ return encoder_legacy.do_encode(inbl);
+ }
+}
+
+/**
+ * Return data shard count for the stripe from the correct version of the encoder
+ *
+ * @returns int Number of data shards
+ */
+int ECEncoderSwitch::get_k()
+{
+ if (optimizations_enabled) {
+ return encoder_optimized.get_k();
+ } else {
+ return encoder_legacy.get_k();
+ }
+}
+
+/**
+ * Return parity shard count for the stripe from the correct version of the encoder
+ *
+ * @returns int Number of parity shards
+ */
+int ECEncoderSwitch::get_m()
+{
+ if (optimizations_enabled) {
+ return encoder_optimized.get_m();
+ } else {
+ return encoder_legacy.get_m();
+ }
+}
--- /dev/null
+#pragma once
+
+#include "ECEncoder.h"
+#include "include/buffer.h"
+#include "erasure-code/ErasureCode.h"
+#include "erasure-code/ErasureCodePlugin.h"
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include "osd/ECUtil.h"
+#include "osd/ECUtilL.h"
+
+using stripe_info_l_t = ECLegacy::ECUtilL::stripe_info_t;
+
+namespace ceph {
+namespace consistency {
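+// Thin wrapper that owns both encoder variants and forwards each call to
+// the one matching the pool's allow_ec_optimizations setting.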
+class ECEncoderSwitch {
+ private:
+ ceph::consistency::ECEncoder<ECUtil::stripe_info_t> encoder_optimized;
+ ceph::consistency::ECEncoder<stripe_info_l_t> encoder_legacy;
+ bool optimizations_enabled;
+
+ public:
+ ECEncoderSwitch(ceph::ErasureCodeProfile profile,
+ int chunk_size,
+ bool optimizations_enabled);
+ std::optional<ceph::bufferlist> do_encode(ceph::bufferlist inbl);
+ int get_k(void);
+ int get_m(void);
+};
+}
+}
+
--- /dev/null
+#include "ECReader.h"
+
+#include <boost/program_options.hpp>
+
+
+using ECReader = ceph::consistency::ECReader;
+using ReadResult = ceph::consistency::ReadResult;
+using Read = ceph::consistency::Read;
+
+Read::Read(const std::string& oid,
+ uint64_t block_size,
+ uint64_t offset,
+ uint64_t length) :
+oid(oid),
+block_size(block_size),
+offset(offset),
+length(length) {}
+
+std::string Read::get_oid() { return oid; }
+uint64_t Read::get_block_size() { return block_size; }
+uint64_t Read::get_offset() { return offset; }
+uint64_t Read::get_length() { return length; }
+
+
+ECReader::ECReader(librados::Rados& rados,
+ boost::asio::io_context& asio,
+ const std::string& pool_name) :
+ rados(rados),
+ asio(asio),
+ pool_name(pool_name),
+ lock(ceph::make_mutex("ECReader::lock")),
+ outstanding_io(0)
+{
+ int rc;
+ rc = rados.ioctx_create(pool_name.c_str(), io);
+ ceph_assert(rc == 0);
+}
+
+void ECReader::start_io()
+{
+ std::lock_guard l(lock);
+ outstanding_io++;
+}
+
+void ECReader::finish_io()
+{
+ std::lock_guard l(lock);
+ ceph_assert(outstanding_io > 0);
+ outstanding_io--;
+ cond.notify_all();
+}
+
+void ECReader::wait_for_io()
+{
+ std::unique_lock l(lock);
+ while (outstanding_io > 0) {
+ cond.wait(l);
+ }
+}
+
+/**
+ * Send a synchronous stat request to librados.
+ *
+ * @param oid Object ID.
+ * @returns The size of the object, -1 if the stat failed.
+ */
+uint64_t ECReader::get_object_size(std::string oid)
+{
+ uint64_t size;
+ int r = io.stat(oid, &size, nullptr);
+ if (r != 0) {
+ return -1;
+ }
+ return size;
+}
+
+/**
+ * Send an async read request to librados. Push the returned data
+ * and oid into the results vector.
+ *
+ * @param read Read object containing oid, length, offset and block size
+ */
+void ECReader::do_read(Read read)
+{
+ start_io();
+ librados::ObjectReadOperation op;
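+  // Offset and length are given in blocks; scale by block size for RADOS.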
+ op.read(read.get_offset() * read.get_block_size(),
+ read.get_length() * read.get_block_size(),
+ nullptr, nullptr);
+
+ std::string oid = read.get_oid();
+  auto read_cb = [&, oid](boost::system::error_code ec,
+                          version_t ver,
+                          bufferlist outbl) {
+    // Completions can run concurrently on the asio thread; serialize pushes.
+    {
+      std::lock_guard l(lock);
+      results.push_back({oid, ec, outbl});
+    }
+    finish_io();
+  };
+
+ librados::async_operate(asio.get_executor(), io, read.get_oid(),
+ std::move(op), 0, nullptr, read_cb);
+}
+
+/**
+ * Wait for all outstanding reads to complete then return the results vector.
+ * @returns Vector containing the results of all reads
+ */
+std::vector<ReadResult>* ECReader::get_results()
+{
+ wait_for_io();
+ return &results;
+}
+
+/**
+ * Clear the results vector.
+ */
+void ECReader::clear_results()
+{
+ results.clear();
+}
\ No newline at end of file
--- /dev/null
+#pragma once
+
+#include <boost/asio/io_context.hpp>
+#include <boost/program_options.hpp>
+#include "librados/librados_asio.h"
+#include "global/global_init.h"
+#include "global/global_context.h"
+
+#define dout_context g_ceph_context
+
+namespace ceph {
+namespace consistency {
+class ReadResult {
+ private:
+ std::string oid;
+ boost::system::error_code ec;
+ ceph::bufferlist data;
+
+ public:
+ std::string get_oid() const { return oid; }
+ boost::system::error_code get_ec() const { return ec; }
+ ceph::bufferlist get_data() const { return data; }
+ ReadResult(std::string oid,
+ boost::system::error_code ec,
+ ceph::bufferlist data) :
+ oid(oid),
+ ec(ec),
+ data(data) {}
+};
+
+class Read {
+ private:
+ std::string oid;
+ uint64_t block_size;
+ uint64_t offset;
+ uint64_t length;
+
+ public:
+ Read(const std::string& oid,
+ uint64_t block_size,
+ uint64_t offset,
+ uint64_t length);
+ std::string get_oid(void);
+ uint64_t get_block_size(void);
+ uint64_t get_offset(void);
+ uint64_t get_length(void);
+};
+class ECReader {
+ private:
+ librados::Rados& rados;
+ boost::asio::io_context& asio;
+ std::string pool_name;
+ std::string oid;
+ librados::IoCtx io;
+ ceph::condition_variable cond;
+ std::vector<ReadResult> results;
+ ceph::mutex lock;
+ int outstanding_io;
+
+ public:
+ ECReader(librados::Rados& rados,
+ boost::asio::io_context& asio,
+ const std::string& pool);
+ uint64_t get_object_size(std::string oid);
+ void do_read(Read read);
+ void start_io(void);
+ void finish_io(void);
+ void wait_for_io(void);
+ std::vector<ReadResult>* get_results(void);
+ void clear_results(void);
+};
+}
+}
\ No newline at end of file
--- /dev/null
+#include "Pool.h"
+
+using Pool = ceph::consistency::Pool;
+
+Pool::Pool(const std::string& pool_name,
+ const ceph::ErasureCodeProfile& profile,
+ bool optimizations_enabled) :
+ pool_name(pool_name),
+ profile(profile),
+ optimizations_enabled(optimizations_enabled)
+{
+}
+
+ceph::ErasureCodeProfile Pool::get_ec_profile() { return profile; }
+std::string Pool::get_pool_name() { return pool_name; }
+bool Pool::has_optimizations_enabled() { return optimizations_enabled; }
--- /dev/null
+#pragma once
+
+#include "global/global_init.h"
+#include "global/global_context.h"
+#include "erasure-code/ErasureCodePlugin.h"
+
+namespace ceph {
+namespace consistency {
+class Pool {
+ private:
+ std::string pool_name;
+ ceph::ErasureCodeProfile profile;
+ bool optimizations_enabled;
+
+ public:
+ Pool(const std::string& pool_name,
+ const ceph::ErasureCodeProfile& profile,
+ bool optimizations_enabled);
+ ceph::ErasureCodeProfile get_ec_profile(void);
+ std::string get_pool_name(void);
+ bool has_optimizations_enabled(void);
+};
+}
+}
\ No newline at end of file
--- /dev/null
+#include "RadosCommands.h"
+#include "common/ceph_json.h"
+#include "common/json/OSDStructures.h"
+#include "erasure-code/ErasureCodePlugin.h"
+#include <boost/algorithm/string.hpp>
+
+using RadosCommands = ceph::consistency::RadosCommands;
+
+RadosCommands::RadosCommands(librados::Rados& rados) :
+ rados(rados),
+ formatter(new JSONFormatter(true))
+{
+}
+
+/**
+ * Return the index of the acting primary OSD for the given pool
+ * and object name. Assert on failure.
+ *
+ * @param pool_name string Name of the pool to find acting primary of
+ * @param oid string OID of the object to find acting primary of
+ * @returns int ID of the acting primary OSD
+ */
+int RadosCommands::get_primary_osd(const std::string& pool_name,
+ const std::string& oid)
+{
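+  // Query the osdmap for this object; the reply names the acting primary.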
+ ceph::messaging::osd::OSDMapRequest osd_map_request{pool_name, oid, ""};
+ encode_json("OSDMapRequest", osd_map_request, formatter.get());
+
+ std::ostringstream oss;
+ formatter.get()->flush(oss);
+
+ ceph::bufferlist inbl, outbl;
+ int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr);
+ ceph_assert(rc == 0);
+
+ JSONParser p;
+ bool success = p.parse(outbl.c_str(), outbl.length());
+ ceph_assert(success);
+
+ ceph::messaging::osd::OSDMapReply reply;
+ reply.decode_json(&p);
+ int osd = reply.acting_primary;
+ ceph_assert(osd >= 0);
+
+ return osd;
+}
+
+/**
+ * Send a mon command to fetch the value of the 'allow_ec_optimizations' flag for the
+ * specified pool and return it.
+ *
+ * @param pool_name string Name of the pool to query the flag for
+ * @returns bool Whether allow EC optimizations is set on the pool
+ */
+bool RadosCommands::get_pool_allow_ec_optimizations(const std::string& pool_name)
+{
+ ceph::messaging::osd::OSDPoolGetRequest osd_pool_get_request{pool_name, "allow_ec_optimizations"};
+ encode_json("OSDPoolGetRequest", osd_pool_get_request, formatter.get());
+
+ std::ostringstream oss;
+ formatter.get()->flush(oss);
+
+ ceph::bufferlist inbl, outbl;
+ int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr);
+ ceph_assert(rc == 0);
+
+ JSONParser p;
+ bool success = p.parse(outbl.c_str(), outbl.length());
+ ceph_assert(success);
+
+ ceph::messaging::osd::OSDPoolGetReply osd_pool_get_reply;
+ osd_pool_get_reply.decode_json(&p);
+
+ return osd_pool_get_reply.allow_ec_optimizations;
+}
+
+/**
+ * Send a mon command to fetch the name of the erasure code profile for the
+ * specified pool and return it.
+ *
+ * @param pool_name string Name of the pool to get the erasure code profile for
+ * @returns string The erasure code profile for the specified pool
+ */
+std::string RadosCommands::get_pool_ec_profile_name(const std::string& pool_name)
+{
+ ceph::messaging::osd::OSDPoolGetRequest osd_pool_get_request{pool_name};
+ encode_json("OSDPoolGetRequest", osd_pool_get_request, formatter.get());
+
+ std::ostringstream oss;
+ formatter.get()->flush(oss);
+
+ ceph::bufferlist inbl, outbl;
+ int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr);
+ ceph_assert(rc == 0);
+
+ JSONParser p;
+ bool success = p.parse(outbl.c_str(), outbl.length());
+ ceph_assert(success);
+
+ ceph::messaging::osd::OSDPoolGetReply osd_pool_get_reply;
+ osd_pool_get_reply.decode_json(&p);
+
+ return osd_pool_get_reply.erasure_code_profile;
+}
+
+/**
+ * Fetch the erasure code profile for the specified pool and return it.
+ *
+ * @param pool_name string Name of the pool to get the EC profile for
+ * @returns ErasureCodeProfile The EC profile for the specified pool
+ */
+ceph::ErasureCodeProfile RadosCommands::get_ec_profile_for_pool(const std::string& pool_name)
+{
+ ceph::messaging::osd::OSDECProfileGetRequest osd_ec_profile_get_req{
+ get_pool_ec_profile_name(pool_name), "plain"};
+ encode_json("OSDECProfileGetRequest", osd_ec_profile_get_req, formatter.get());
+
+ std::ostringstream oss;
+ formatter.get()->flush(oss);
+
+ ceph::bufferlist inbl, outbl;
+ int rc = rados.mon_command(oss.str(), inbl, &outbl, nullptr);
+ ceph_assert(rc == 0);
+
+ // Parse the string output into an ErasureCodeProfile
+ ceph::ErasureCodeProfile profile;
+ std::string line, key, val, out(outbl.c_str(), outbl.length());
+ std::stringstream ss(out);
+
+ while (std::getline(ss, line)) {
+ key = line.substr(0, line.find("="));
+ val = line.substr(line.find("=") + 1, line.length() - 1);
+ profile.emplace(key, val);
+ }
+
+ return profile;
+}
+
+/**
+ * Set the parity read inject on the acting primary
+ * for the specified object and pool. Assert on failure.
+ *
+ * @param pool_name string Name of the pool to perform inject on
+ * @param oid string OID of the object to perform inject on
+ */
+void RadosCommands::inject_parity_read_on_primary_osd(const std::string& pool_name,
+ const std::string& oid)
+{
+ int primary_osd = get_primary_osd(pool_name, oid);
+ ceph::messaging::osd::InjectECParityRead parity_read_req{pool_name, oid};
+ encode_json("InjectECParityRead", parity_read_req, formatter.get());
+
+ std::ostringstream oss;
+ formatter.get()->flush(oss);
+
+ ceph::bufferlist inbl, outbl;
+ int rc = rados.osd_command(primary_osd, oss.str(), inbl, &outbl, nullptr);
+ ceph_assert(rc == 0);
+}
+
+/**
+ * Clear the parity read inject on the acting primary
+ * for the specified object and pool. Assert on failure.
+ *
+ * @param pool_name string Name of the pool to perform inject on
+ * @param oid string OID of the object to perform inject on
+ */
+void RadosCommands::inject_clear_parity_read_on_primary_osd(const std::string& pool_name,
+ const std::string& oid)
+{
+ int primary_osd = get_primary_osd(pool_name, oid);
+ ceph::messaging::osd::InjectECClearParityRead clear_parity_read_req{pool_name, oid};
+ encode_json("InjectECClearParityRead", clear_parity_read_req, formatter.get());
+
+ std::ostringstream oss;
+ formatter.get()->flush(oss);
+
+ ceph::bufferlist inbl, outbl;
+ int rc = rados.osd_command(primary_osd, oss.str(), inbl, &outbl, nullptr);
+ ceph_assert(rc == 0);
+}
\ No newline at end of file
--- /dev/null
+#pragma once
+
+#include <boost/asio/io_context.hpp>
+#include <boost/program_options.hpp>
+#include "common/ceph_json.h"
+#include "librados/librados_asio.h"
+#include "global/global_init.h"
+#include "global/global_context.h"
+#include "erasure-code/ErasureCodePlugin.h"
+
+#define dout_context g_ceph_context
+
+namespace ceph {
+namespace consistency {
+class RadosCommands {
+ private:
+ librados::Rados& rados;
+ std::unique_ptr<JSONFormatter> formatter;
+
+ public:
+ RadosCommands(librados::Rados& rados);
+ int get_primary_osd(const std::string& pool_name,
+ const std::string& oid);
+ std::string get_pool_ec_profile_name(const std::string& pool_name);
+ bool get_pool_allow_ec_optimizations(const std::string& pool_name);
+ ceph::ErasureCodeProfile get_ec_profile_for_pool(const std::string& pool_name);
+ void inject_parity_read_on_primary_osd(const std::string& pool_name,
+ const std::string& oid);
+ void inject_clear_parity_read_on_primary_osd(const std::string& pool_name,
+ const std::string& oid);
+};
+}
+}
\ No newline at end of file
--- /dev/null
+#include <boost/asio/io_context.hpp>
+#include <boost/program_options.hpp>
+
+#include "global/global_init.h"
+#include "global/global_context.h"
+#include "librados/librados_asio.h"
+#include "common/ceph_argparse.h"
+
+#include "ConsistencyChecker.h"
+
+#define dout_context g_ceph_context
+
+namespace po = boost::program_options;
+using bufferlist = ceph::bufferlist;
+
+int main(int argc, char **argv)
+{
+ auto args = argv_to_vec(argc, argv);
+ env_to_vec(args);
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY, 0);
+ common_init_finish(cct.get());
+
+ librados::Rados rados;
+ boost::asio::io_context asio;
+ std::thread thread;
+ std::optional<boost::asio::executor_work_guard<
+ boost::asio::io_context::executor_type>> guard;
+
+ po::options_description desc("ceph_ec_consistency_checker options");
+
+  desc.add_options()
+    ("help,h", "show help message")
+    ("pool,p", po::value<std::string>()->required(), "pool name")
+    ("oid,i", po::value<std::string>()->required(), "object id")
+    ("blocksize,b", po::value<int>()->required(), "block size")
+    ("offset,o", po::value<int>()->required(), "offset")
+    ("length,l", po::value<int>()->required(), "length")
+    ("stripeunit,s", po::value<int>()->required(), "stripe unit");
+
+ po::variables_map vm;
+ std::vector<std::string> unrecognized_options;
+ try {
+ auto parsed = po::command_line_parser(argc, argv)
+ .options(desc)
+ .allow_unregistered()
+ .run();
+ po::store(parsed, vm);
+ if (vm.count("help")) {
+ std::cout << desc << std::endl;
+ return 0;
+ }
+ po::notify(vm);
+ unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
+ } catch(const po::error& e) {
+ std::cerr << "error: " << e.what() << std::endl;
+ return 1;
+ }
+
+ auto pool = vm["pool"].as<std::string>();
+ auto oid = vm["oid"].as<std::string>();
+ auto blocksize = vm["blocksize"].as<int>();
+ auto offset = vm["offset"].as<int>();
+ auto length = vm["length"].as<int>();
+ auto stripe_unit = vm["stripeunit"].as<int>();
+
+ int rc;
+ rc = rados.init_with_context(g_ceph_context);
+ ceph_assert(rc == 0);
+ rc = rados.connect();
+ ceph_assert(rc == 0);
+
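+  // Run the io_context on a helper thread; the work guard keeps run()
+  // from returning while async reads are outstanding.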
+ guard.emplace(boost::asio::make_work_guard(asio));
+ thread = make_named_thread("io_thread",[&asio] { asio.run(); });
+
+ auto checker = ceph::consistency::ConsistencyChecker(rados, asio, pool, stripe_unit);
+ checker.single_read_and_check_consistency(oid, blocksize, offset, length);
+ checker.print_results(std::cout);
+
+  // Release the work guard so run() can return, then join the io thread.
+  guard.reset();
+  thread.join();
+  return 0;
+}
\ No newline at end of file
list<pair<ec_align_t,
pair<bufferlist*, Context*>>> to_read;
unique_ptr<Context> on_complete;
+ CephContext *cct;
cb(const cb &) = delete;
cb(cb &&) = default;
const hobject_t &hoid,
const list<pair<ec_align_t,
pair<bufferlist*, Context*>>> &to_read,
- Context *on_complete)
+ Context *on_complete,
+ CephContext *cct)
: ec(ec),
hoid(hoid),
to_read(to_read),
- on_complete(on_complete) {}
+ on_complete(on_complete),
+ cct(cct) {}
void operator()(ECCommon::ec_extents_t &&results) {
auto dpp = ec->get_parent()->get_dpp();
ldpp_dout(dpp, 20) << "range offset: " << range_offset << dendl;
ldpp_dout(dpp, 20) << "length: " << length << dendl;
ldpp_dout(dpp, 20) << "range length: " << range_length << dendl;
+ if (cct->_conf->bluestore_debug_inject_read_err &&
+ ECInject::test_parity_read(hoid)) {
+ length = range_length;
+ }
ceph_assert((offset + length) <= (range_offset + range_length));
bufs->substr_of(
range.first.get_val(),
cb(this,
hoid,
to_read,
- on_complete)));
+ on_complete,
+ cct)));
}
void ECBackend::objects_read_and_reconstruct(
list<pair<ec_align_t,
pair<bufferlist*, Context*> > > to_read;
unique_ptr<Context> on_complete;
+ CephContext *cct;
cb(const cb&) = delete;
cb(cb &&) = default;
cb(ECBackendL *ec,
const hobject_t &hoid,
const list<pair<ec_align_t,
pair<bufferlist*, Context*> > > &to_read,
- Context *on_complete)
+ Context *on_complete,
+ CephContext *cct)
: ec(ec),
hoid(hoid),
to_read(to_read),
- on_complete(on_complete) {}
+ on_complete(on_complete),
+ cct(cct) {}
void operator()(ECCommonL::ec_extents_t &&results) {
auto dpp = ec->get_parent()->get_dpp();
ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
ldpp_dout(dpp, 20) << "length: " << length << dendl;
ldpp_dout(dpp, 20) << "range length: " << range_length << dendl;
ceph_assert(offset + length <= range_offset + range_length);
+ if (cct->_conf->bluestore_debug_inject_read_err &&
+ ECInject::test_parity_read(hoid)) {
+ length = range_length;
+ }
read.second.first->substr_of(
range.first.get_val(),
offset - range_offset,
cb(this,
hoid,
to_read,
- on_complete)));
+ on_complete,
+ cct)));
}
void ECBackendL::objects_read_and_reconstruct(
}
}
+void ECCommon::ReadPipeline::get_want_to_read_all_shards(
+ const list<ec_align_t> &to_read,
+ ECUtil::shard_extent_set_t &want_shard_reads)
+{
+ for (const auto &single_region: to_read) {
+ sinfo.ro_range_to_shard_extent_set_with_parity(single_region.offset,
+ single_region.size,
+ want_shard_reads);
+ }
+ dout(20) << __func__ << ": to_read " << to_read
+ << " read_request " << want_shard_reads << dendl;
+}
+
+/**
+ * Create a buffer containing both the ordered data and parity shards
+ *
+ * @param buffers_read shard_extent_map_t of shard indexes and their corresponding extent maps
+ * @param read ec_align_t Read size and offset for rados object
+ * @param outbl Pointer to output buffer
+ */
+void ECCommon::ReadPipeline::create_parity_read_buffer(
+ ECUtil::shard_extent_map_t buffers_read,
+ ec_align_t read,
+ bufferlist *outbl)
+{
+ bufferlist data, parity;
+ data = buffers_read.get_ro_buffer(read.offset, read.size);
+
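+  // Gather the parity shards (raw ids k..k+m-1) to append after the data.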
+ for (raw_shard_id_t raw_shard(sinfo.get_k());
+ raw_shard < sinfo.get_k_plus_m(); ++raw_shard) {
+ shard_id_t shard = sinfo.get_shard(raw_shard);
+ buffers_read.get_shard_first_buffer(shard, parity);
+ }
+
+ outbl->append(data);
+ outbl->append(parity);
+}
+
struct ClientReadCompleter final : ECCommon::ReadCompleter {
ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
ECCommon::ClientAsyncReadStatus *status
#endif
for (auto &&read: req.to_read) {
- result.insert(read.offset, read.size,
- res.buffers_read.get_ro_buffer(read.offset, read.size));
+ // Return a buffer containing both data and parity
+ // if the parity read inject is set
+ if (cct->_conf->bluestore_debug_inject_read_err &&
+ ECInject::test_parity_read(hoid)) {
+ bufferlist data_and_parity;
+ read_pipeline.create_parity_read_buffer(res.buffers_read, read, &data_and_parity);
+ result.insert(read.offset, data_and_parity.length(), data_and_parity);
+ } else {
+ result.insert(read.offset, read.size,
+ res.buffers_read.get_ro_buffer(read.offset, read.size));
+ }
}
}
dout(20) << __func__ << " calling complete_object with result="
map<hobject_t, read_request_t> for_read_op;
for (auto &&[hoid, to_read]: reads) {
ECUtil::shard_extent_set_t want_shard_reads(sinfo.get_k_plus_m());
- get_want_to_read_shards(to_read, want_shard_reads);
+ if (cct->_conf->bluestore_debug_inject_read_err &&
+ ECInject::test_parity_read(hoid)) {
+ get_want_to_read_all_shards(to_read, want_shard_reads);
+    } else {
+ get_want_to_read_shards(to_read, want_shard_reads);
+ }
read_request_t read_request(to_read, want_shard_reads, false, object_size);
const int r = get_min_avail_to_read_shards(
const std::list<ec_align_t> &to_read,
ECUtil::shard_extent_set_t &want_shard_reads);
+ void get_want_to_read_all_shards(
+ const std::list<ec_align_t> &to_read,
+ ECUtil::shard_extent_set_t &want_shard_reads);
+ void create_parity_read_buffer(
+ ECUtil::shard_extent_map_t buffers_read,
+ ec_align_t read,
+ bufferlist *outbl);
+
/// Returns to_read replicas sufficient to reconstruct want
int get_min_avail_to_read_shards(
const hobject_t &hoid, ///< [in] object
}
}
+void ECCommonL::ReadPipeline::get_want_to_read_all_shards(
+ std::set<int> *want_to_read) const
+{
+ for (int i = 0; i < (int)sinfo.get_k_plus_m(); ++i) {
+ want_to_read->insert(sinfo.get_shard(i));
+ }
+}
+
+/**
+ * Create a buffer containing both the ordered data and parity shards
+ *
+ * @param to_decode Map of shard indexes and their corresponding data buffers
+ * @param wanted_to_read Set of shard indexes to be read
+ * @param read_size Size of the client read (currently unused)
+ * @param outbl Pointer to output buffer
+ */
+void ECCommonL::ReadPipeline::create_parity_read_buffer(
+ std::map<int, bufferlist> to_decode,
+ std::set<int> wanted_to_read,
+ uint64_t read_size,
+ bufferlist *outbl)
+{
+ bufferlist data, parity;
+ std::map<int, bufferlist> parities;
+
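+  // Pull the parity shards out of the decode set; keying by raw shard id
+  // keeps them in order when appended below.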
+ for (unsigned int i = 0; i < sinfo.get_k_plus_m(); ++i) {
+ unsigned int raw_shard = sinfo.get_raw_shard(i);
+ if (raw_shard >= sinfo.get_k()) {
+ parities[raw_shard] = to_decode[i];
+ wanted_to_read.erase(i);
+ to_decode.erase(i);
+ }
+ }
+
+ for (auto const& p : parities) {
+ parity.append(p.second);
+ }
+
+ dout(20) << __func__ << " going to decode: "
+ << " wanted_to_read=" << wanted_to_read
+ << " to_decode=" << to_decode
+ << dendl;
+ int r = ECUtilL::decode(
+ sinfo,
+ ec_impl,
+ wanted_to_read,
+ to_decode,
+ &data);
+
+ ceph_assert(r == 0);
+
+ outbl->append(data);
+ outbl->append(parity);
+}
+
struct ClientReadCompleter : ECCommonL::ReadCompleter {
ClientReadCompleter(ECCommonL::ReadPipeline &read_pipeline,
ECCommonL::ClientAsyncReadStatus *status)
++j) {
to_decode[static_cast<int>(j->first.shard)] = std::move(j->second);
}
+
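+    // Parity read inject: return raw data plus parity instead of the
+    // normal decoded result.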
+ if (cct->_conf->bluestore_debug_inject_read_err &&
+ ECInject::test_parity_read(hoid)) {
+ bufferlist outbl;
+ read_pipeline.create_parity_read_buffer(to_decode,
+ wanted_to_read,
+ read.size,
+ &outbl);
+
+ result.insert(
+ read.offset, outbl.length(), std::move(outbl));
+ res.returned.pop_front();
+ goto out;
+ }
+
dout(20) << __func__ << " going to decode: "
<< " wanted_to_read=" << wanted_to_read
<< " to_decode=" << to_decode
map<hobject_t, read_request_t> for_read_op;
for (auto &&to_read: reads) {
set<int> want_to_read;
- if (cct->_conf->osd_ec_partial_reads) {
+ if (cct->_conf->bluestore_debug_inject_read_err &&
+ ECInject::test_parity_read(to_read.first)) {
+ get_want_to_read_all_shards(&want_to_read);
+ } else if (cct->_conf->osd_ec_partial_reads) {
for (const auto& single_region : to_read.second) {
get_min_want_to_read_shards(single_region.offset,
single_region.size,
friend struct FinishReadOp;
void get_want_to_read_shards(std::set<int> *want_to_read) const;
+ void get_want_to_read_all_shards(std::set<int> *want_to_read) const;
+ void create_parity_read_buffer(
+ std::map<int, bufferlist> to_decode,
+ std::set<int> wanted_to_read,
+ uint64_t read_size,
+ bufferlist *outbl
+ );
/// Returns to_read replicas sufficient to reconstruct want
int get_min_avail_to_read_shards(
static std::map<ghobject_t,std::pair<int64_t,int64_t>> write_failures3;
static std::map<ghobject_t,shard_id_t> write_failures0_shard;
static std::set<osd_reqid_t> write_failures0_reqid;
-
+ static std::set<hobject_t> parity_reads;
/**
* Configure a read error inject that typically forces additional reads of
* shards in an EC pool to recover data using the redundancy. With multiple
return result;
}
+ /**
+ * Configure a parity read inject that typically forces parity shards in an
+ * EC pool to be returned along with the data shards.
+ *
+ * @brief Set up a parity read inject for an object in an EC pool.
+ * @param o Target object for the parity read inject.
+ * @return string Result of configuring the inject.
+ */
+ std::string parity_read(const hobject_t& o) {
+ std::lock_guard<ceph::recursive_mutex> l(lock);
+ parity_reads.insert(o);
+ return "ok - parity read inject set";
+ }
+
/**
* @brief Clear a previously configured read error inject.
* @param o Target object for the error inject.
return "ok - " + std::to_string(remaining) + " injects cleared";
}
+ /**
+ * @brief Clear a previously configured parity read inject.
+ * @param o Target object for the clear parity read inject.
+ * @return string Indication of whether the inject was cleared.
+ */
+ std::string clear_parity_read(const hobject_t& o) {
+ std::lock_guard<ceph::recursive_mutex> l(lock);
+ if (!parity_reads.count(o)) {
+ return "no outstanding parity read inject";
+ }
+ parity_reads.erase(o);
+ return "ok - parity read inject cleared";
+ }
+
static bool test_error(const ghobject_t& o,
std::map<ghobject_t,std::pair<int64_t,int64_t>> *failures)
{
&write_failures3);
}
+ /**
+ * @brief Test whether parity read inject is set for a particular object.
+ * @param o Target object for the parity read inject.
+ * @return bool true if set, otherwise false.
+ */
+ bool test_parity_read(const hobject_t& o)
+ {
+ std::lock_guard<ceph::recursive_mutex> l(lock);
+ return (parity_reads.count(o));
+ }
} // ECInject
\ No newline at end of file
// Error inject interfaces
std::string read_error(const ghobject_t& o, const int64_t type, const int64_t when, const int64_t duration);
std::string write_error(const ghobject_t& o, const int64_t type, const int64_t when, const int64_t duration);
+ std::string parity_read(const hobject_t& o);
std::string clear_read_error(const ghobject_t& o, const int64_t type);
std::string clear_write_error(const ghobject_t& o, const int64_t type);
+ std::string clear_parity_read(const hobject_t& o);
bool test_read_error0(const ghobject_t& o);
bool test_read_error1(const ghobject_t& o);
bool test_write_error0(const hobject_t& o,const osd_reqid_t& reqid);
bool test_write_error1(const ghobject_t& o);
bool test_write_error2(const hobject_t& o);
bool test_write_error3(const hobject_t& o);
+ bool test_parity_read(const hobject_t& o);
} // ECInject
test_ops_hook,
"Inject a full disk (optional count times)");
ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "injectparityread " \
+ "name=pool,type=CephString " \
+ "name=objname,type=CephObjectname ",
+ test_ops_hook,
+ "Tell the OSD to return the parity chunks along with the next read");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "injectclearparityread " \
+ "name=pool,type=CephString " \
+ "name=objname,type=CephObjectname ",
+ test_ops_hook,
+ "Clear a parity read inject");
+ ceph_assert(r == 0);
+
r = admin_socket->register_command(
"bench " \
"name=count,type=CephInt,req=false " \
command == "truncobj" ||
command == "injectmdataerr" || command == "injectdataerr" ||
command == "injectecreaderr" || command == "injectecclearreaderr" ||
- command == "injectecwriteerr" || command == "injectecclearwriteerr"
+ command == "injectecwriteerr" || command == "injectecclearwriteerr" ||
+ command == "injectparityread" || command == "injectclearparityread"
) {
pg_t rawpg;
int64_t pool;
(command != "injectecreaderr") &&
(command != "injectecclearreaderr") &&
(command != "injectecwriteerr") &&
- (command != "injectecclearwriteerr")) {
+ (command != "injectecclearwriteerr") &&
+ (command != "injectparityread") &&
+ (command != "injectclearparityread")) {
ss << "Must not call on ec pool";
return;
}
if ((command == "injectecreaderr") ||
(command == "injecteclearreaderr") ||
(command == "injectecwriteerr") ||
- (command == "injecteclearwriteerr")) {
+ (command == "injecteclearwriteerr") ||
+ (command == "injectparityread") ||
+ (command == "injectclearparityread")) {
ss << "Only supported on ec pool";
return;
}
} else {
ss << "bluestore_debug_inject_read_err not enabled";
}
+ } else if (command == "injectparityread") {
+ if (service->cct->_conf->bluestore_debug_inject_read_err) {
+ ss << "injectparityread: " << ECInject::parity_read(obj);
+ } else {
+ ss << "bluestore_debug_inject_read_err not enabled";
+ }
+ } else if (command == "injectclearparityread") {
+ if (service->cct->_conf->bluestore_debug_inject_read_err) {
+ ss << "injectclearparityread: " << ECInject::clear_parity_read(obj);
+ } else {
+ ss << "bluestore_debug_inject_read_err not enabled";
+ }
}
return;
}