From: Alex Ainscow Date: Thu, 5 Feb 2026 14:45:04 +0000 (+0000) Subject: osdc: Refactor SplitOp X-Git-Tag: v21.0.1~73^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d628f853834b04286f9a5d24aa35c144ecfc36de;p=ceph.git osdc: Refactor SplitOp There are a large number of changes in this commit which were found through development and testing of split ops. I have split out all the objecter updates carefully, but since the split op code is not currently used in production, I have not documented every change and have made significant refactors/rearrangements. Signed-off-by: Alex Ainscow --- diff --git a/src/osdc/SplitOp.cc b/src/osdc/SplitOp.cc index 5363f1000ec..50e1b2e75e4 100644 --- a/src/osdc/SplitOp.cc +++ b/src/osdc/SplitOp.cc @@ -9,10 +9,12 @@ * Foundation. See file COPYING. */ -#include "osdc/Objecter.h" #include "osdc/SplitOp.h" +#include "osdc/Objecter.h" #include "osd/osd_types.h" +using namespace std::literals; + #define dout_subsys ceph_subsys_objecter #define DBG_LVL 20 @@ -22,41 +24,54 @@ inline boost::system::error_code osdcode(int r) { } } -constexpr static uint64_t kReplicaMinShardReadSize = 128 * 1024; constexpr static uint64_t kReplicaMinShardReads = 2; -constexpr static uint64_t kReplicaMinReadSize = kReplicaMinShardReadSize * kReplicaMinShardReads; #undef dout_prefix #define dout_prefix *_dout << " ECSplitOp::" -std::pair ECSplitOp::assemble_buffer_sparse_read(int ops_index) { +/** + * @brief Assemble sparse read results from EC shards into logical object view. + * + * Iterates through the EC stripe using ECStripeView, collecting extent maps + * and data buffers from each shard. The resulting buffer is built up in order, + * only extending (never inserting into the middle), which maintains buffer + * efficiency and allows for zero-copy operations where possible. 
+ * + * @param ops_index Index of the operation in the operation list + * @return Pair containing the extent set and assembled buffer list + */ +std::pair ECSplitOp::assemble_buffer_sparse_read(int ops_index) const { bufferlist bl_out; extent_set extents_out; - auto &orig_osd_op = orig_op->ops[ops_index].op; const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool); + ceph_assert(pi); ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi); - ldout(cct, DBG_LVL) << __func__ << " start:" + ldout(cct, DBG_LVL) << __func__ << " START -->" << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent=" << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl; std::vector buffer_offset(stripe_view.data_chunk_count); - mini_flat_map map_iterators(stripe_view.data_chunk_count); + mini_flat_map map_iterators(stripe_view.data_chunk_count); for (auto &&chunk_info : stripe_view) { - ldout(cct, DBG_LVL) << __func__ << " chunk: " << chunk_info << dendl; - auto &details = sub_reads.at(chunk_info.shard).details[ops_index]; - - if (!map_iterators.contains(chunk_info.shard)) { - map_iterators.emplace(chunk_info.shard, details.e->begin()); + shard_id_t shard = pi->get_shard(chunk_info.raw_shard); + ceph_assert(shard != shard_id_t::NO_SHARD); + int shard_index = (int)shard; + ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " chunk: " << chunk_info + << " shard: " << shard << dendl; + auto &details = sub_reads.at(shard_index).details.at(ops_index); + + if (!map_iterators.contains(shard_index)) { + map_iterators.emplace(shard_index, details.e->begin()); } - extents_map::iterator &extent_iter = map_iterators.at(chunk_info.shard); + extents_map::const_iterator &extent_iter = map_iterators.at(shard_index); uint64_t bl_len = 0; while (extent_iter != details.e->end() && extent_iter->first < chunk_info.ro_offset + stripe_view.chunk_size) { auto 
[off, len] = *extent_iter; - ldout(cct, DBG_LVL) << __func__ << " extent=" << off << "~" << len << dendl; + ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent=" << off << "~" << len << dendl; extents_out.insert(off, len); bl_len += len; ++extent_iter; @@ -65,138 +80,213 @@ std::pair ECSplitOp::assemble_buffer_sparse_rea // We try to keep the buffers together where possible. if (bl_len != 0) { bufferlist bl; - bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], bl_len); + bl.substr_of(details.bl, buffer_offset[(int)chunk_info.raw_shard], bl_len); bl_out.append(bl); - buffer_offset[(int)chunk_info.shard] += bl_len; + buffer_offset[(int)chunk_info.raw_shard] += bl_len; } } + ldout(cct, DBG_LVL) << __func__ << " END -->" << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent=" + << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl; + return std::pair(extents_out, bl_out); } -void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) { +/** + * @brief Assemble dense read results from EC shards into contiguous buffer. + * + * Iterates through the EC stripe, extracting data from each shard in order. + * The output buffer is built sequentially by appending data, never inserting + * into the middle, which maintains buffer efficiency. 
+ * + * @param bl_out Output buffer to append assembled data + * @param ops_index Index of the operation in the operation list + */ +void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) const { auto &orig_osd_op = orig_op->ops[ops_index].op; const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool); + ceph_assert(pi); ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi); std::vector buffer_offset(stripe_view.data_chunk_count); - ldout(cct, DBG_LVL) << __func__ << " " << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl; + ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent=" + << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl; for (auto &&chunk_info : stripe_view) { - ldout(cct, DBG_LVL) << __func__ << " chunk info " << chunk_info << dendl; - auto &details = sub_reads.at(chunk_info.shard).details[ops_index]; + ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " chunk info " << chunk_info << dendl; + shard_id_t shard = pi->get_shard(chunk_info.raw_shard); + ceph_assert(shard != shard_id_t::NO_SHARD); + int shard_index = (int)shard; + auto &details = sub_reads.at(shard_index).details.at(ops_index); + uint64_t src_len = details.bl.length(); + uint64_t buf_off = buffer_offset[(int)chunk_info.raw_shard]; + if (src_len <= buf_off) { + // Early termination: we've exhausted the available buffer data. + // This is a normal completion scenario, not an error condition. 
+ break; + } + uint64_t len = std::min(chunk_info.length, src_len - buf_off); bufferlist bl; - bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], chunk_info.length); + bl.substr_of(details.bl, buf_off, len); bl_out.append(bl); - buffer_offset[(int)chunk_info.shard] += chunk_info.length; + buffer_offset[(int)chunk_info.raw_shard] += len; } } +/** + * @brief Initialize read sub-operations for erasure-coded pool. + * + * Determines which EC shards need to be read based on stripe geometry and + * creates sub-operations for each required shard. Sets the abort flag if + * any required OSD is unavailable. + * + * Primary shard is required when reading from multiple chunks, when version + * information is requested, or when non-read operations are present. + * + * @param op Operation descriptor containing offset and length + * @param sparse Whether this is a sparse read operation + * @param ops_index Index of the operation in the operation list + */ void ECSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) { - auto &t = orig_op->target; - const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool); + auto &target = orig_op->target; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool); + ceph_assert(pi); + uint64_t offset = op.op.extent.offset; uint64_t length = op.op.extent.length; - uint64_t data_chunk_count = pi->nonprimary_shards.size() + 1; + uint64_t data_chunk_count = pi->get_ec_data_shard_count(); uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count; uint64_t start_chunk = offset / chunk_size; // This calculation is wrong for length = 0, but such IOs should not have - // reached here! + // reached here! Zero-length reads are rejected earlier in the validate() + // function (see lines 551-556), which returns {false, false} for any + // operation with length == 0, preventing it from being split and processed. 
ceph_assert( op.op.extent.length != 0); + // No overflow: length >= 1 (asserted above), so offset + length - 1 <= offset + length. + // Since offset and length are uint64_t, their sum cannot exceed UINT64_MAX in practice + // due to object size limits and memory constraints. Even if offset + length == UINT64_MAX + 1 + // (impossible in uint64_t), subtracting 1 brings it back to UINT64_MAX (valid). uint64_t end_chunk = (offset + op.op.extent.length - 1) / chunk_size; unsigned count = std::min(data_chunk_count, end_chunk - start_chunk + 1); - //FIXME: This is not quite right - the ops.size() > 1 does not necessarily mean - // that the primary is required - it could be two reads to the same shard. - bool primary_required = count > 1 || orig_op->objver || orig_op->ops.size() > 1; + // Primary is required if: + // 1. Reading from multiple chunks (count > 1) + // 2. Version information is needed (orig_op->objver) + // 3. There are non-read operations that need the primary + bool has_non_read_ops = false; + for (const auto &op : orig_op->ops) { + if (op.op.op != CEPH_OSD_OP_READ && op.op.op != CEPH_OSD_OP_SPARSE_READ) { + has_non_read_ops = true; + break; + } + } + bool primary_required = count > 1 || orig_op->objver || has_non_read_ops; int first_shard = start_chunk % data_chunk_count; // Check all shards are online. for (unsigned i = first_shard; i < first_shard + count; i++) { - shard_id_t shard(i >= data_chunk_count ? i - data_chunk_count : i); - int direct_osd = t.acting[(int)shard]; - // TODO: This is broken because there might be multiple shards on one OSD. - // Will fix in the next PR. - if (t.acting_primary == direct_osd) { - primary_shard.emplace(shard); + raw_shard_id_t raw_shard(i >= data_chunk_count ? 
i - data_chunk_count : i); + shard_id_t shard(pi->get_shard(raw_shard)); + if (shard == shard_id_t::NO_SHARD) { + abort = true; + return; + } + int shard_index = (int)shard; + + int direct_osd = target.acting[shard_index]; + if (target.actual_pgid.shard == shard) { + reference_sub_read = shard_index; } if (!objecter.osdmap->exists(direct_osd)) { ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl; abort = true; return; } - if (!sub_reads.contains(shard)) { - sub_reads.emplace(shard, orig_op->ops.size()); + if (!sub_reads.contains(shard_index)) { + sub_reads.emplace(shard_index, orig_op->ops.size() + 1); } - auto &d = sub_reads.at(shard).details[ops_index]; + auto &d = sub_reads.at(shard_index).details[ops_index]; if (sparse) { d.e.emplace(); - sub_reads.at(shard).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval); + sub_reads.at(shard_index).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval); } else { - sub_reads.at(shard).rd.read(offset, length, &d.ec, &d.bl); + sub_reads.at(shard_index).rd.read(offset, length, &d.ec, &d.bl); } } - if (primary_required && !primary_shard) { - for (unsigned i=0; i < t.acting.size(); ++i) { - // TODO: Can't compare OSDs for EC as there may be more than one shard on - // each OSD. - // Will fix in next PR. - if (t.acting[i] == t.acting_primary) { - primary_shard.emplace(i); - sub_reads.emplace(*primary_shard, orig_op->ops.size()); - } - } - - // No primary??? Let the normal code paths deal with this. - if (!primary_shard) { - ldout(cct, DBG_LVL) << __func__ <<" ABORT: Can't find primary" << dendl; - abort = true; - } + if (primary_required && reference_sub_read == -1) { + // _calc_target will have picked the primary by default on EC. The "primary" + // on replica is an arbitrary shard. 
+ reference_sub_read = (int)target.actual_pgid.shard; + sub_reads.emplace(reference_sub_read, orig_op->ops.size() + 1); } + + ceph_assert(reference_sub_read != -1); } -ECSplitOp::ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : - SplitOp(op, objecter, cct, count) {} - #undef dout_prefix #define dout_prefix *_dout << " ReplicaSplitOp::" -std::pair ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) { +/** + * @brief Assemble sparse read results from replicas. + * + * Collects extent maps and buffers from each sub-operation. Results are + * assembled in order by appending, maintaining buffer efficiency. + * + * @param ops_index Index of the operation in the operation list + * @return Pair containing the combined extent set and buffer list + */ +std::pair ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) const { extent_set extents_out; bufferlist bl_out; - for (auto && [shard, sr] : sub_reads) { - for (auto [off, len] : *sr.details[ops_index].e) { + for (auto && [acting_index, sr] : sub_reads) { + for (auto [off, len] : *sr.details.at(ops_index).e) { extents_out.insert(off, len); } - bl_out.append(sr.details[ops_index].bl); + bl_out.append(sr.details.at(ops_index).bl); } return std::pair(extents_out, bl_out); } -void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) { - for (auto && [_, sr] : sub_reads) { - bl_out.append(sr.details[ops_index].bl); +/** + * @brief Assemble dense read results from replicas. + * + * Appends buffers from each sub-operation in order. 
+ * + * @param bl_out Output buffer to append assembled data + * @param ops_index Index of the operation in the operation list + */ +void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) const { + for (auto && [acting_index, sr] : sub_reads) { + bl_out.append(sr.details.at(ops_index).bl); } } -ReplicaSplitOp::ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) : - SplitOp(op, objecter, cct, pool_size) { - - // This may not actually be the primary, but since all shards are kept current - // in replica, it does not actually matter which we choose here. Choose 0 since - // there will always be a read to this shard. - primary_shard = shard_id_t(0); -} - +/** + * @brief Initialize read sub-operations for replicated pool. + * + * Divides the operation into chunks and distributes them across available + * replicas for parallel execution. Chunk size is calculated based on the + * minimum shard read size configuration and number of available OSDs. + * Chunks are aligned to page boundaries for efficiency. + * + * If the operation is too small to benefit from splitting across all replicas, + * fewer OSDs are used with a random starting offset for load balancing. 
+ * + * @param op Operation descriptor containing offset and length + * @param sparse Whether this is a sparse read operation + * @param ops_index Index of the operation in the operation list + */ void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) { - auto &t = orig_op->target; + auto &target = orig_op->target; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool); + ceph_assert(pi); std::set osds; - for (int direct_osd : t.acting) { + for (int direct_osd : target.acting) { if (objecter.osdmap->exists(direct_osd)) { osds.insert(direct_osd); } @@ -208,18 +298,31 @@ void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) { return; } + uint64_t replica_min_shard_read_size + = objecter.get_min_split_replica_read_size(); + uint64_t offset = op.op.extent.offset; uint64_t length = op.op.extent.length; - uint64_t slice_count = std::min(length / kReplicaMinShardReadSize, osds.size()); + uint64_t slice_count = replica_min_shard_read_size == 0 ? 1 : + std::min(length / replica_min_shard_read_size, + osds.size()); uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE); + unsigned start = 0; - for (unsigned i = 0; i < osds.size() && length > 0; i++) { + if (slice_count < osds.size()) { + start = rand() % osds.size(); + } - shard_id_t shard(i); - if (!sub_reads.contains(shard)) { - sub_reads.emplace(shard, orig_op->ops.size()); + for (unsigned i = start; length > 0; i = (i + 1 == osds.size()) ? 
0 : i + 1) { + int acting_index = i; + if (!sub_reads.contains(acting_index)) { + sub_reads.emplace(acting_index, orig_op->ops.size() + 1); + // Set reference_sub_read to the first index we use + if (reference_sub_read == -1) { + reference_sub_read = acting_index; + } } - auto &sr = sub_reads.at(shard); + auto &sr = sub_reads.at(acting_index); auto bl = &sr.details[ops_index].bl; auto rval = &sr.details[ops_index].rval; uint64_t len = std::min(length, chunk_size); @@ -237,34 +340,149 @@ void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) { #undef dout_prefix #define dout_prefix *_dout << " SplitOp::" -int SplitOp::assemble_rc() { +/** + * @brief Assemble the final return code from all sub-operations. + * + * Returns the first non-EAGAIN error encountered, or EAGAIN if any + * sub-operation returned EAGAIN or if version mismatch is detected. + * Otherwise returns the reference sub-operation's return code. + * + * @return Combined return code for the operation + */ +int SplitOp::assemble_rc() const { int rc = 0; - bool rc_zero = false; - - // This should only happen on a single thread. - for (auto & [_, sub_read] : sub_reads) { - if (rc >= 0 && sub_read.rc >= 0) { - rc += sub_read.rc; - if (sub_read.rc == 0) { - rc_zero = true; - } - } else if (rc >= 0) { + bool eagain = false; + + // Sub-reads which use a single op should re-use original op. + ceph_assert(sub_reads.size() > 1); + + // Pick the first bad RC, otherwise return 0. + for (auto & [index, sub_read] : sub_reads) { + if (sub_read.rc == -EAGAIN) { + eagain = true; + } else if (sub_read.rc < 0) { + return sub_read.rc; + } + + // The non-reference indices only get reads, which only ever have zero RCs. + if (index == reference_sub_read) { rc = sub_read.rc; - } // else ignore subsequent errors. + } } - if (rc >= 0 && rc_zero) { - return 0; + if (eagain || version_mismatch()) { + return -EAGAIN; } - return rc; } +/** + * @brief Check for version mismatches across EC shards. 
+ * + * Implements torn read detection by comparing internal versions returned + * by each shard. The reference shard maintains a map of all shard versions, + * allowing detection of inconsistencies across the EC stripe. + * + * @return true if versions mismatch, false if consistent + */ +bool ECSplitOp::version_mismatch() const { + // First we need to decode the version list from the reference. + ceph_assert(reference_sub_read != -1); + ceph_assert(sub_reads.at(reference_sub_read).internal_version.has_value()); + + std::map ref_vers; + decode(ref_vers, sub_reads.at(reference_sub_read).internal_version->bl); + + for (auto & [shard_index, sub_read] : sub_reads) { + // Reference shard can't be different to itself. + if (shard_index == reference_sub_read) { + continue; + } + + ceph_assert(sub_read.internal_version.has_value()); + std::map shard_vers; + decode(shard_vers, sub_read.internal_version->bl); + + shard_id_t shard(shard_index); + if (!ref_vers.contains(shard)) { + ldout(cct, DBG_LVL) << __func__ << ": " + << "Reference shard version missing, failing split op." << dendl; + return true; + } + if (!shard_vers.contains(shard)) { + ldout(cct, DBG_LVL) << __func__ << ": " + << "Shard version missing, failing split op." << dendl; + return true; + } + if (ref_vers.at(shard) != shard_vers.at(shard)) { + ldout(cct, DBG_LVL) << __func__ << ": " + << "Primary version (" << ref_vers.at(shard) << ") != " + << "shard version (" << shard_vers.at(shard) <<") " + << "for shard " << shard << dendl; + return true; + } + } + return false; +} + +/** + * @brief Check for version mismatches across replicas. + * + * For replicated pools, checks that all replicas returned the same version. + * No single replica maintains a reference version list, so the first replica's + * version becomes the reference for comparison. 
+ * + * @return true if versions mismatch, false if consistent + */ +bool ReplicaSplitOp::version_mismatch() const { + std::optional ref_version; + constexpr shard_id_t NO_SHARD(-1); + + for (const auto& [acting_index, sub_read] : sub_reads) { + ceph_assert(sub_read.internal_version.has_value()); + std::map shard_vers; + decode(shard_vers, sub_read.internal_version->bl); + + if (!shard_vers.contains(NO_SHARD)) { + ldout(cct, DBG_LVL) << __func__ << ": " + << "Replica version missing for acting index, failing split op." << dendl; + return true; + } + + if (!ref_version) { + ref_version = shard_vers.at(NO_SHARD); + } else if (*ref_version != shard_vers.at(NO_SHARD)) { + return true; + } + } + + return false; +} + +/** + * @brief Complete the split operation by assembling and returning results. + * + * Assembles results from all sub-operations and invokes the original operation's + * completion handlers. For successful operations, results are assembled based on + * operation type (SPARSE_READ, READ, or other). For failures or version mismatches, + * triggers retry through the normal operation path. + * + * The buffer handling maintains compatibility with librados expectations, including + * special handling for pre-allocated buffers used by RadosStriper. + * + * @note Only called for successfully sent operations where abort=false + */ void SplitOp::complete() { + // STAGE 6: complete() only runs for successfully sent operations. + // If abort was set during creation, the split op was discarded and + // complete() is never called. This check handles the edge case where + // abort might be set after creation but before sending (though this + // should not happen in the current implementation). 
if (abort) { return; } - ldout(cct, 20) << __func__ << " entry this=" << this << dendl; + ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " entry this=" << this << dendl; + boost::system::error_code handler_error; int rc = assemble_rc(); if (rc >= 0) { @@ -274,6 +492,13 @@ void SplitOp::complete() { // so as to reproduce as much as possible of the IO completion. std::vector out_ops(orig_op->ops.begin(), orig_op->ops.end()); + std::optional read_bl; + if (orig_op->outbl) { + read_bl = bufferlist(); + } + + // Copy is necessary: process_op_reply_handlers() reads out_ops while + // modifying orig_op state. Overhead is acceptable for typical 1-3 ops. for (unsigned ops_index=0; ops_index < out_ops.size(); ++ops_index) { auto &out_osd_op = out_ops[ops_index]; switch (out_osd_op.op.op) { @@ -281,104 +506,307 @@ void SplitOp::complete() { auto [extents, bl] = assemble_buffer_sparse_read(ops_index); encode(std::move(extents).detach(), out_osd_op.outdata); encode_destructively(bl, out_osd_op.outdata); + if (read_bl) { + read_bl->append(out_osd_op.outdata); + } break; } case CEPH_OSD_OP_READ: { assemble_buffer_read(out_osd_op.outdata, ops_index); + if (read_bl) { + read_bl->append(out_osd_op.outdata); + } break; } - case CEPH_OSD_OP_GETXATTRS: - case CEPH_OSD_OP_CHECKSUM: - case CEPH_OSD_OP_GETXATTR: { - out_osd_op.outdata = sub_reads.at(*primary_shard).details[ops_index].bl; - out_osd_op.rval = sub_reads.at(*primary_shard).details[ops_index].rval; + default: { + out_osd_op.outdata = std::move(sub_reads.at(reference_sub_read).details[ops_index].bl); + out_osd_op.rval = sub_reads.at(reference_sub_read).details[ops_index].rval; break; } - default: { - ceph_abort_msg("Not supported"); - break; + } + } + + // Copied from Objecter::handle_osd_op_reply() to match correct API behaviour. + // This is policed in the librados test harness. 
+ if (orig_op->outbl) { + ceph_assert(read_bl); + // Note: Objecter will check for a single buffer here - this will always + // be true. Here, there will frequently be multiple buffers due + // to the splitting and the original buffer still needs to be + // honoured. + if (orig_op->outbl->length() == read_bl->length()) { + // this is here to keep previous users to *relied* on getting data + // read into existing buffers happy. Notably, + // libradosstriper::RadosStriperImpl::aio_read(). + ldout(cct,10) << __func__ << " copying resulting " << read_bl->length() + << " into existing ceph::buffer of length " << orig_op->outbl->length() + << dendl; + // The following seems a little convoluted, but the assumption is that + // there is a good reason why Sage Weil wrote it this way in Objecter. + bufferlist t; + t = std::move(*orig_op->outbl); + t.invalidate_crc(); // we're overwriting the raw buffers via c_str() + read_bl->begin().copy(read_bl->length(), t.c_str()); + orig_op->outbl->substr_of(t, 0, read_bl->length()); + } else { + // librados insists that if it provided a buffer and the client is + // going to be returning a buffer, that this buffer must be + // contiguous. + if (orig_op->outbl->length() != 0) { + read_bl->rebuild(); } + orig_op->outbl->substr_of(*read_bl, 0, read_bl->length()); } + orig_op->outbl = 0; } - objecter.handle_osd_op_reply2(orig_op, out_ops); + handler_error = objecter.process_op_reply_handlers(orig_op, out_ops); + ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " success this=" << this << " rc=" << rc << dendl; + } else { + ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " retry this=" << this << " rc=" << rc << dendl; + } + objecter.op_post_split_op_complete(orig_op, handler_error, rc); +} + +/** + * @brief Add version tracking to sub-operations for torn read protection. + * + * Adds internal version queries to all sub-operations to enable detection of + * version mismatches. 
If sub-operations read different versions of the object + * (torn read), the operation will be retried. + */ +void SplitOp::protect_torn_reads() { + // If multiple reads are emitted from objecter, then it is essential that + // each read reads the same version. It is not possible to efficiently + // guarantee this, so instead read the version along with the data and if they + // are different, then repeat the read to the primary. Such version mismatches + // should be rare enough that this is not a significant performance impact. + for (auto&& [index, sr] : sub_reads) { + auto &internal_version = sr.internal_version; + internal_version = std::make_optional(); + sr.rd.get_internal_versions(&internal_version->ec, &internal_version->bl); + } +} - ldout(cct, DBG_LVL) << __func__ << " success this=" << this << " rc=" << rc << dendl; - Objecter::Op::complete(std::move(orig_op->onfinish), osdcode(rc), rc, objecter.service.get_executor()); - objecter._finish_op(orig_op, rc); +void SplitOp::init(OSDOp &op, int ops_index) { + switch (op.op.op) { + case CEPH_OSD_OP_SPARSE_READ: { + init_read(op, true, ops_index); + break; + } + case CEPH_OSD_OP_READ: { + init_read(op, false, ops_index); + break; + } + default: { + // Invalid ops should have been rejected in validate. + Details &d = sub_reads.at(reference_sub_read).details[ops_index]; + orig_op->pass_thru_op(sub_reads.at(reference_sub_read).rd, ops_index, &d.bl, &d.rval); + break; + } + } +} + +#undef dout_prefix +#define dout_prefix *_dout << " SplitOp::" + +namespace { +std::pair is_single_chunk(const pg_pool_t *pi, uint64_t offset, uint64_t len) { + if (!pi->is_erasure()) { + return {false, false}; + } + + uint64_t stripe_width = pi->get_stripe_width(); + + // Optimization: Use stripe_width / 2 as a threshold to quickly reject requests + // that cannot fit in a single chunk. Since k (data chunks) is at least 2, + // chunk_size = stripe_width / k <= stripe_width / 2. 
This early check avoids + // the more expensive division operation (stripe_width / data_chunk_count) below. + if (len > stripe_width / 2) { + return {false, false}; + } + uint64_t data_chunk_count = pi->get_ec_data_shard_count(); + uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count; + + // Chunk_size should never be zero, so this is paranoia. + if (len > chunk_size || chunk_size == 0) { + return {false, false}; + } + + uint64_t offset_to_end_of_chunk; + + // Chunk size is normally, but not always a power of 2. + if (std::has_single_bit(chunk_size)) { + offset_to_end_of_chunk = chunk_size - (offset & (chunk_size - 1)); } else { - ldout(cct, DBG_LVL) << __func__ << " retry this=" << this << " rc=" << rc << dendl; - objecter.op_post_submit(orig_op); + offset_to_end_of_chunk = chunk_size - (offset % chunk_size); } + + if (len > offset_to_end_of_chunk) { + return {false, false}; + } + + return {true, offset % stripe_width < chunk_size}; } -static bool validate(Objecter::Op *op, bool is_erasure, CephContext *cct) { +/** + * Validate operation flags for split operation eligibility. + * + * Checks that the operation has the required BALANCE_READS flag and + * is not a write operation. + * + * @param pi The pool to validate against + * @param op The operation to validate + * @param cct CephContext for logging + * @return true if flags are valid, false otherwise + */ +bool validate_flags(const pg_pool_t *pi, Objecter::Op *op, CephContext *cct) { + if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0) { + ldout(cct, DBG_LVL) << __func__ << " REJECT: Client rejects balanced read" << dendl; + return false; + } + + // We really should not have a WRITE flagged as balanced read, but out of + // paranoia ignore it. + if (op->target.flags & CEPH_OSD_FLAG_WRITE) { + ldout(cct, DBG_LVL) << __func__ << " REJECT: Flagged as write!" 
<< dendl; + return false; + } - if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0 ) { - ldout(cct, DBG_LVL) << __func__ <<" REJECT: Client rejects balanced read" << dendl; + if (pi->has_flag(pg_pool_t::FLAG_CRIMSON)) { + ldout(cct, DBG_LVL) << __func__ <<" ABORT: Crimson doesn't support" + " direct reads" << dendl; return false; } - uint64_t suitable_read_found = false; - for (auto & o : op->ops) { + return true; +} + +/** + * Validate operation types and check read size constraints. + * + * Processes all operations in the op list, checking for: + * - Supported operation types (READ, SPARSE_READ, and various metadata ops) + * - Valid read sizes (non-zero length, meets minimum size requirements) + * - Single chunk constraints for erasure coded pools + * + * @param op The operation to validate + * @param pi Pool information + * @param is_erasure Whether the pool is erasure coded + * @param replica_min_read_size Minimum read size for replica pools + * @param cct CephContext for logging + * @param[out] has_primary_ops Set to true if primary-only ops are found + * @param[out] single_direct_op Set to true if op can be sent directly to single OSD + * @return true if a suitable read operation was found, false otherwise + */ +bool validate_operations(Objecter::Op *op, const pg_pool_t *pi, bool is_erasure, + uint64_t replica_min_read_size, CephContext *cct, + bool &has_primary_ops, bool &single_direct_op) { + bool is_first_chunk = true; + bool suitable_read_found = false; + + for (auto &o : op->ops) { switch (o.op.op) { case CEPH_OSD_OP_READ: case CEPH_OSD_OP_SPARSE_READ: { uint64_t length = o.op.extent.length; + if (length == 0) { + // length of zero actually means "the whole object". This code cannot + // know the size of the object efficiently, so reject the op. 
+ ldout(cct, DBG_LVL) << __func__ << " REJECT: Zero length read" << dendl; + return false; + } if ((is_erasure && length > 0) || - (!is_erasure && length >= kReplicaMinReadSize)) { + (!is_erasure && length >= replica_min_read_size)) { suitable_read_found = true; } + if (single_direct_op) { + auto [single_chunk, first_chunk] = is_single_chunk(pi, o.op.extent.offset, o.op.extent.length); + is_first_chunk = is_first_chunk && first_chunk; + single_direct_op = single_direct_op && single_chunk; + } break; } case CEPH_OSD_OP_GETXATTRS: case CEPH_OSD_OP_CHECKSUM: - case CEPH_OSD_OP_GETXATTR: { + case CEPH_OSD_OP_GETXATTR: + case CEPH_OSD_OP_STAT: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_CALL: { + has_primary_ops = true; break; // Do not block validate. } default: { - ldout(cct, DBG_LVL) << __func__ <<" REJECT: unsupported op" << dendl; + ldout(cct, DBG_LVL) << __func__ << " REJECT: unsupported op" << dendl; return false; } } } + if (single_direct_op && has_primary_ops) { + single_direct_op = is_first_chunk; + } + return suitable_read_found; } -void SplitOp::init(OSDOp &op, int ops_index) { - switch (op.op.op) { - case CEPH_OSD_OP_SPARSE_READ: { - init_read(op, true, ops_index); - break; - } - case CEPH_OSD_OP_READ: { - init_read(op, false, ops_index); - break; - } - case CEPH_OSD_OP_GETXATTRS: - case CEPH_OSD_OP_CHECKSUM: - case CEPH_OSD_OP_GETXATTR: { - shard_id_t shard = *primary_shard; - Details &d = sub_reads.at(shard).details[ops_index]; - orig_op->pass_thru_op(sub_reads.at(shard).rd, ops_index, &d.bl, &d.rval); - break; - } - default: { - ldout(cct, DBG_LVL) << __func__ <<" ABORT: unsupported" << dendl; - abort = true; - break; - } +/** + * Validate if an operation is eligible for split operation optimization. + * + * This function performs a multi-stage validation to determine if an operation + * can be split across multiple OSDs for improved read performance: + * 1. Validates operation flags (BALANCE_READS required, no WRITE flag) + * 2. 
For replicated pools, rejects if min_split_replica_read_size is 0 (splitting disabled) + * 3. Validates operation types and read size constraints + * + * @param op The operation to validate + * @param objecter Objecter instance for configuration access + * @param pi Pool information + * @param cct CephContext for logging + * @return A pair of bools: {suitable_read_found, single_direct_op} + * - suitable_read_found: true if operation contains valid read(s) + * - single_direct_op: true if operation can be sent to single OSD + */ +std::pair validate(Objecter::Op *op, Objecter &objecter, + const pg_pool_t *pi, CephContext *cct) { + bool is_erasure = pi->is_erasure(); + + // Validate flags + if (!validate_flags(pi, op, cct)) { + return {false, false}; } + + // Initialize state for operation validation + bool has_primary_ops = nullptr != op->objver; + bool single_direct_op = is_erasure; + + uint64_t replica_min_shard_read_size = objecter.get_min_split_replica_read_size(); + + if (!is_erasure && replica_min_shard_read_size == 0) { + ldout(cct, DBG_LVL) << __func__ << " REJECT: splitting disabled (min_split_replica_read_size=0)" << dendl; + return {false, false}; + } + + uint64_t replica_min_read_size = replica_min_shard_read_size * kReplicaMinShardReads; + + // Validate operations and read sizes + bool suitable_read_found = validate_operations(op, pi, is_erasure, + replica_min_read_size, cct, + has_primary_ops, single_direct_op); + + return {suitable_read_found, single_direct_op}; } -namespace { void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct) { - auto &t = op->target; + auto &target = op->target; ldout(cct, DBG_LVL) << str - << " osd=" << t.osd - << " shard=" << (t.force_shard ? 
*t.force_shard : shard_id_t(-1)) - << " balance_reads=" << ((t.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0) + << " object_id=" << target.base_oid + << " tid=" << op->tid + << " pool=" << target.base_oloc.pool + << " pgid=" << target.actual_pgid + << " osd=" << target.osd + << " force_osd=" << ((target.flags & CEPH_OSD_FLAG_FORCE_OSD) != 0) + << " balance_reads=" << ((target.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0) << " ops.size()=" << op->ops.size() << " needs_version=" << (op->objver?"true":"false"); @@ -398,19 +826,106 @@ void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct } } +/** + * @brief Prepare a single-chunk operation for direct execution. + * + * When an operation can be satisfied by reading from a single OSD (e.g., + * single chunk in EC pool), configures the operation for direct execution + * without splitting. Sets EC_DIRECT_READ and FORCE_OSD flags and updates + * the target OSD and shard ID. + * + * This optimization avoids split operation overhead when data can be + * retrieved from a single shard. + * + * @param op Operation to prepare + * @param objecter Objecter instance + * @param cct CephContext for logging + */ +void SplitOp::prepare_single_op(Objecter::Op *op, Objecter &objecter, CephContext *cct) { + auto &target = op->target; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool); + ceph_assert(pi); + + objecter._calc_target(&op->target, op); + uint64_t data_chunk_count = pi->get_ec_data_shard_count(); + uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count; + // Find the first read to work out where the IO goes. 
+ for (auto o : op->ops) { + if (o.op.op == CEPH_OSD_OP_SPARSE_READ || + o.op.op == CEPH_OSD_OP_READ) { + raw_shard_id_t raw_shard((o.op.extent.offset) / chunk_size % data_chunk_count); + shard_id_t shard = pi->get_shard(raw_shard); + if (shard != shard_id_t::NO_SHARD) { + int acting_index = (int)shard; + if (objecter.osdmap->exists(op->target.acting[acting_index])) { + op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ; + op->target.flags |= CEPH_OSD_FLAG_FORCE_OSD; + target.osd = target.acting[acting_index]; + target.actual_pgid.reset_shard(shard); + target.used_replica = (target.acting_primary != target.osd); + } + } + break; + } + } + debug_op_summary("reuse_op:", op, cct); +} + +/** + * Create and initialize a split operation for parallel reads. + * + * This function implements the abort flag pattern to efficiently handle + * validation and creation failures: + * + * STAGE 1: Cheap validation tests + * - Check pool exists + * - Check split reads are enabled + * - Validate operation types and parameters + * - Return false immediately if validation fails (no split op created) + * + * STAGE 2: Create split op object + * - Allocate ECSplitOp or ReplicaSplitOp based on pool type + * - Constructor may detect issues and set abort flag + * + * STAGE 3: Check abort after construction + * - If abort is set during construction, discard split op and return false + * - This catches issues that can only be detected during object creation + * + * STAGE 4: Initialize sub-operations + * - Call init() for each operation in the request + * - init_read() may set abort if OSDs are missing or state is invalid + * - Break early if abort is detected to avoid unnecessary work + * + * STAGE 5: Final abort check + * - If abort was set during initialization, discard split op and return false + * - Allows fallback to normal (non-split) operation path + * + * STAGE 6: Send operations + * - Only 
reached if abort=false, meaning split op is valid + * - complete() will only run for these successfully sent operations + * + * @return true if split op was created and sent, false to use normal operation + */ bool SplitOp::create(Objecter::Op *op, Objecter &objecter, - shunique_lock& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct) { + shunique_lock& sul, CephContext *cct) { - auto &t = op->target; - const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool); + auto &target = op->target; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool); + // STAGE 1: Cheap validation tests run first before creating split op if (!pi) { ldout(cct, DBG_LVL) << __func__ <<" REJECT: No Pool" << dendl; return false; } - debug_op_summary("orig_op: ", op, cct); + debug_op_summary("orig_op:", op, cct); + + // Reject if the operation is a snapshot operation + if (op->snapid != CEPH_NOSNAP || !op->snapc.empty()) { + ldout(cct, DBG_LVL) << __func__ <<" REJECT: snapshot operation" << dendl; + return false; + } // Reject if direct reads not supported by profile. 
if (!pi->has_flag(pg_pool_t::FLAG_CLIENT_SPLIT_READS)) { @@ -418,12 +933,19 @@ bool SplitOp::create(Objecter::Op *op, Objecter &objecter, return false; } - bool validated = validate(op, pi->is_erasure(), cct); + auto [validated, single_op] = validate(op, objecter, pi, cct); if (!validated) { return false; } + if (single_op) { + ldout(cct, DBG_LVL) << __func__ <<" reusing original op " << dendl; + prepare_single_op(op, objecter, cct); + return false; + } + + // STAGE 2: Create split op object (may set abort during construction) std::shared_ptr split_read; if (pi->is_erasure()) { @@ -432,15 +954,17 @@ bool SplitOp::create(Objecter::Op *op, Objecter &objecter, split_read = std::make_shared(op, objecter, cct, pi->size); } + // STAGE 3: Check if abort was set during construction if (split_read->abort) { - ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;return false; + ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl; return false; } // Populate the target, to extract the acting set from it. - t.flags &= ~CEPH_OSD_FLAG_BALANCE_READS; + target.flags &= ~CEPH_OSD_FLAG_BALANCE_READS; objecter._calc_target(&op->target, op); + // STAGE 4: Initialize sub-operations (may set abort if problems detected) for (unsigned i = 0; i < op->ops.size(); ++i) { split_read->init( op->ops[i], i); if (split_read->abort) { @@ -448,39 +972,69 @@ bool SplitOp::create(Objecter::Op *op, Objecter &objecter, } } + // STAGE 5: Final abort check - discard split op if problems were detected if (split_read->abort) { ldout(cct, DBG_LVL) << __func__ <<" ABORTED 2" << dendl; return false; } - ldout(cct, DBG_LVL) << __func__ <<" sub_reads ready. count=" << split_read->sub_reads.size() << dendl; + // Ideally, the validate should detect any single-op read. However, if that + // fails, then this will catch the cases (albeit less efficiently). 
+ if (split_read->sub_reads.size() <= 1) { + ldout(cct, DBG_LVL) << __func__ <<" reusing original op - inefficient" << dendl; + prepare_single_op(op, objecter, cct); + split_read->abort = true; // Required for destructor. + return false; + } + + objecter.add_op_to_splitop_session(op); + + split_read->protect_torn_reads(); + + + op->split_op_tids = std::make_unique>(split_read->sub_reads.size()); + auto &tids = *op->split_op_tids; + + int i=0; // We are committed to doing a split read. Any re-attempts should not be either // split or balanced. - for (auto && [shard, sub_read] : split_read->sub_reads) { + for (auto && [index, sub_read] : split_read->sub_reads) { auto fin = new Finisher(split_read, sub_read); // Self-destructs when called. version_t *objver = nullptr; - if (split_read->primary_shard && shard == *split_read->primary_shard) { + if (index == split_read->reference_sub_read) { objver = split_read->orig_op->objver; } auto sub_op = objecter.prepare_read_op( - t.base_oid, t.base_oloc, split_read->sub_reads.at(shard).rd, op->snapid, + target.base_oid, target.base_oloc, split_read->sub_reads.at(index).rd, op->snapid, nullptr, split_read->flags, -1, fin, objver); - sub_op->target.force_shard.emplace(shard); + + auto &st = sub_op->target; + st = target; // Target can start off in same state as parent. 
+ st.flags |= CEPH_OSD_FLAG_FORCE_OSD; + st.flags |= CEPH_OSD_FLAG_FAIL_ON_EAGAIN; if (pi->is_erasure()) { - sub_op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ; + st.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ; + st.actual_pgid.reset_shard(shard_id_t(index)); } else { - sub_op->target.flags |= CEPH_OSD_FLAG_BALANCE_READS; + st.flags |= CEPH_OSD_FLAG_BALANCE_READS; } + st.osd = st.acting[index]; + + target.used_replica = (st.acting_primary != st.osd); + + objecter._op_submit(sub_op, sul, &tids[i++]); - objecter._op_submit_with_budget(sub_op, sul, ptid, ctx_budget); debug_op_summary("sent_op", sub_op, cct); } ceph_assert(split_read->sub_reads.size() > 0); + ldout(cct, DBG_LVL) << __func__ << " object_id=" << target.base_oid + << " assigned parent tid=" << op->tid << dendl; + return true; } diff --git a/src/osdc/SplitOp.h b/src/osdc/SplitOp.h index 6f3d9a199ec..9ae19d8566c 100644 --- a/src/osdc/SplitOp.h +++ b/src/osdc/SplitOp.h @@ -11,9 +11,59 @@ #pragma once +#include +#include +#include +#include +#include #include -#include +#include +#include + #include + +#include "include/buffer_fwd.h" +#include "include/ceph_assert.h" +#include "include/interval_set.h" +#include "include/types.h" + +#include "common/ceph_context.h" +#include "common/ceph_mutex.h" +#include "common/error_code.h" +#include "common/mini_flat_map.h" +#include "common/shunique_lock.h" + +#include "osd/ECTypes.h" +#include "osd/osd_types.h" + +#include "Objecter.h" + +/** + * @class SplitOp + * @brief Base class for splitting operations across multiple OSDs for improved performance. + * + * SplitOp implements a pattern for parallelizing operations by distributing them across + * multiple OSDs in a pool. This optimization is particularly effective for: + * - Erasure-coded pools: operations can be distributed across data shards + * - Replicated pools: large operations can be split across replicas + * + * The split operation pattern works as follows: + * 1. 
Validation: Check if the operation is eligible for splitting + * 2. Creation: Create sub-operations targeting different OSDs + * 3. Execution: Send sub-operations in parallel + * 4. Assembly: Reconstruct results from sub-operation responses + * 5. Completion: Return assembled results to the client + * + * Key features: + * - Consistency protection: Ensures all sub-operations see the same object version + * - Automatic fallback: Falls back to normal operation if splitting is not beneficial + * - Error handling: Properly handles partial failures and version mismatches + * + * Currently optimized for read operations, with potential for future write support. + * + * @see ECSplitOp for erasure-coded pool implementation + * @see ReplicaSplitOp for replicated pool implementation + */ class SplitOp { protected: @@ -21,28 +71,51 @@ class SplitOp { using extents_map = std::map; using extent_set = interval_set; - using ExtentPredicate = std::function; - using extent_map_subrange = std::ranges::subrange; - using extent_map_subrange_view = std::ranges::take_while_view; - using extent_variant = std::variant, extent_map_subrange_view, extent_map_subrange>; - using buffer_appender = std::function; - - // A simple struct to hold the data for each step of the iteration. + /** + * @struct ECChunkInfo + * @brief Information about a single chunk in an erasure-coded stripe iteration. + * + * This structure holds the mapping between logical object offsets and physical + * shard locations for a single chunk within an EC stripe. Used by ECStripeIterator + * to track position during stripe traversal. 
+ */ struct ECChunkInfo { uint64_t ro_offset; uint64_t shard_offset; uint64_t length; - shard_id_t shard; + raw_shard_id_t raw_shard; - friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &obj) { + friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &chunk_info) { return os - << "ro_offset: " << obj.ro_offset - << " shard_offset: " << obj.shard_offset - << " length: " << obj.length - << " shard: " << obj.shard; + << "ro_offset: " << chunk_info.ro_offset + << " shard_offset: " << chunk_info.shard_offset + << " length: " << chunk_info.length + << " raw_shard: " << (int)chunk_info.raw_shard; } }; + /** + * @class ECStripeIterator + * @brief Iterator for traversing erasure-coded stripes chunk by chunk. + * + * ECStripeIterator provides a standard C++ iterator interface for walking through + * an erasure-coded stripe, yielding information about each chunk's location in both + * the logical object space and the physical shard space. + * + * The iterator handles the complex mapping between: + * - Logical object offsets (ro_offset) + * - Physical shard offsets (shard_offset) + * - Shard identifiers (raw_shard) + * - Chunk boundaries and lengths + * + * Algorithm: + * - Starts at a given offset and iterates through chunks sequentially + * - Automatically wraps around shards when reaching the end of a stripe + * - Handles partial chunks at the beginning and end of the range + * - Terminates when all requested data has been covered + * + * @note Conforms to std::input_iterator concept + */ class ECStripeIterator { public: using iterator_category = std::input_iterator_tag; @@ -66,11 +139,9 @@ class SplitOp { uint64_t chunk = start_offset / chunk_size; current_info.length = std::min(total_len, (chunk + 1) * chunk_size - start_offset); - // Maybe this is paranoia, as compiler would probably detect that this - // / and % could be done in a single op. 
- auto chunk_div = std::lldiv(chunk, data_chunk_count); - current_info.shard = shard_id_t(chunk_div.rem); - current_info.shard_offset = (chunk_div.quot) * chunk_size + start_offset % chunk_size; + current_info.raw_shard = raw_shard_id_t(chunk % data_chunk_count); + current_info.shard_offset = (chunk / data_chunk_count) * chunk_size + + start_offset % chunk_size; } value_type operator*() const { @@ -83,10 +154,10 @@ class SplitOp { current_info.shard_offset += current_info.length - chunk_size; current_info.length = std::min(chunk_size, end_offset - current_info.ro_offset); ceph_assert(current_info.ro_offset <= end_offset); - ++current_info.shard; - if (unsigned(current_info.shard) == data_chunk_count) { + ++current_info.raw_shard; + if (std::cmp_equal((int)current_info.raw_shard, data_chunk_count)) { current_info.shard_offset += chunk_size; - current_info.shard = shard_id_t(0); + current_info.raw_shard = raw_shard_id_t(0); } return *this; } @@ -114,8 +185,26 @@ class SplitOp { uint32_t data_chunk_count = 0; }; - - // The custom range class that provides begin() and end() + /** + * @class ECStripeView + * @brief Range view for iterating over erasure-coded stripes. + * + * ECStripeView provides a range-based interface for traversing an erasure-coded + * stripe. It encapsulates the stripe geometry (chunk size, data chunk count) and + * provides begin()/end() iterators for use in range-based for loops. 
+ * + * Usage: + * @code + * ECStripeView stripe_view(offset, length, pool_info); + * for (auto chunk_info : stripe_view) { + * // Process each chunk + * } + * @endcode + * + * @param offset Starting offset in the logical object + * @param length Total length to iterate over + * @param pi Pool information containing stripe geometry + */ class ECStripeView { public: ECStripeView( @@ -124,7 +213,7 @@ class SplitOp { const pg_pool_t *pi) : start_offset(offset), total_length(length), - data_chunk_count(pi->nonprimary_shards.size() + 1), + data_chunk_count(pi->get_ec_data_shard_count()), chunk_size(pi->get_stripe_width() / data_chunk_count) { } @@ -147,6 +236,13 @@ class SplitOp { static_assert(std::input_iterator, "ECStripeIterator does not conform to the std::input_iterator concept"); + /** + * @struct Details + * @brief Holds response data and metadata for a single operation in a sub-read. + * + * Contains the buffer, return value, error code, and optional extent map + * returned from executing one operation within a SubRead. + */ struct Details { bufferlist bl; int rval; @@ -154,18 +250,46 @@ class SplitOp { std::optional e; }; + /** + * @struct InternalVersion + * @brief Holds internal version information for torn read protection. + * + * Used to detect version mismatches across parallel sub-operations, + * ensuring all operations see the same object version. + */ + struct InternalVersion { + boost::system::error_code ec; + bufferlist bl; + }; + + /** + * @struct SubRead + * @brief Represents a single sub-operation sent to one OSD. + * + * Contains the operation descriptor, response details for each operation, + * return code, and optional version information for consistency checking. 
+ * + * @param count Number of operations in this sub-read + */ struct SubRead { ::ObjectOperation rd; mini_flat_map details; int rc = -EIO; + std::optional internal_version; SubRead(int count) : details(count) {} }; - // This structure self-destructs on each IO completions, using a legacy - // C++ pattern (no shared_ptr). We use the finish callback to record the - // RC, but otherwise rely on the shared_ptr destroying ec_read to deal with - // completion of the parent IO. + /** + * @struct Finisher + * @brief Completion callback for sub-operations. + * + * This structure self-destructs on IO completion, using a legacy C++ pattern + * (no shared_ptr). The finish callback records the return code, while the + * shared_ptr to the parent SplitOp handles overall completion. + * + * @note Self-destructs when called + */ struct Finisher : Context { std::shared_ptr split_read; SubRead &sub_read; @@ -176,48 +300,263 @@ class SplitOp { } }; - int assemble_rc(); - virtual std::pair assemble_buffer_sparse_read(int ops_index) = 0; - virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) = 0; + int assemble_rc() const; + virtual std::pair assemble_buffer_sparse_read(int ops_index) const = 0; + virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) const = 0; virtual void init_read(OSDOp &op, bool sparse, int ops_index) = 0; + virtual bool version_mismatch() const = 0; void init(OSDOp &op, int ops_index); Objecter::Op *orig_op; Objecter &objecter; - mini_flat_map sub_reads; + mini_flat_map sub_reads; CephContext *cct; - bool abort = false; // Last minute abort... We want to keep this to a minimum. + + /** + * Abort flag pattern for split operation creation: + * + * This flag implements a multi-stage validation and creation pattern that + * minimizes wasted work when a split operation cannot be completed: + * + * 1. 
Cheap validation tests run first in validate() and create() before + * creating the split op object (e.g., checking pool flags, operation types) + * + * 2. If validation fails, return false immediately without creating split op + * + * 3. If validation passes, create the split op and begin initialization + * + * 4. During init_read() and init(), set abort=true if problems are detected + * that prevent successful operation (e.g., missing OSDs, invalid state) + * + * 5. After initialization, check abort flag in create() and discard the + * split op if set (return false to fall back to normal operation) + * + * 6. The complete() method only runs for successfully sent operations where + * abort=false, ensuring cleanup only happens for valid split ops + * + * This pattern ensures expensive initialization work is only done when likely + * to succeed, while still catching edge cases that can only be detected during + * the creation process itself. + */ + bool abort = false; int flags = 0; - std::optional primary_shard; - std::map> op_offset_map; + int reference_sub_read = -1; + std::map> op_offset_map; public: - SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {} - virtual ~SplitOp() = default; - void complete(); - static bool create(Objecter::Op *op, Objecter &objecter, - shunique_lock& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct); + /** + * @brief Construct a SplitOp. + * @param op Original operation to be split + * @param objecter Objecter instance for OSD communication + * @param cct CephContext for logging and configuration + * @param count Number of sub-operations to create + */ + SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {} + + virtual ~SplitOp() = default; + + /** + * @brief Complete the split operation by assembling results. 
+ * + * Called when all sub-operations have completed. Assembles the results + * from individual sub-operations, handles version mismatches, and invokes + * the original operation's completion handlers. + */ + void complete(); + + /** + * @brief Prepare a single-chunk operation for direct execution. + * + * When an operation can be satisfied by reading from a single OSD (e.g., + * single chunk in EC pool), this method configures the operation for + * direct execution without splitting. + * + * @param op Operation to prepare + * @param objecter Objecter instance + * @param cct CephContext for logging + */ + static void prepare_single_op(Objecter::Op *op, Objecter &objecter, CephContext *cct); + + /** + * @brief Add version tracking to sub-operations for consistency. + * + * Ensures all sub-operations read the same object version by adding + * internal version queries. If versions mismatch, the operation is retried. + */ + void protect_torn_reads(); + + /** + * @brief Create and initialize a split operation. + * + * Main entry point for split operation creation. Validates the operation, + * creates appropriate sub-operations, and sends them to target OSDs. + * + * Uses a multi-stage abort pattern: + * 1. Cheap validation (pool checks, operation types) + * 2. Object creation (may set abort flag) + * 3. Initialization (may set abort if OSDs missing) + * 4. Final validation before sending + * + * @param op Operation to potentially split + * @param objecter Objecter instance + * @param sul Shared lock for OSD map access + * @param cct CephContext for logging + * @return true if operation was split and sent, false to use normal path + */ + static bool create(Objecter::Op *op, Objecter &objecter, + shunique_lock& sul, CephContext *cct); }; +/** + * @class ECSplitOp + * @brief Split operation implementation for erasure-coded pools. + * + * ECSplitOp handles splitting operations across data shards in an erasure-coded + * pool. 
It implements the stripe iteration and buffer assembly logic specific to + * EC pools, where data is distributed across multiple shards according to the + * erasure coding scheme. + * + * Key responsibilities: + * - Iterate through EC stripes to determine which shards to read + * - Assemble data from multiple shards in the correct order + * - Handle sparse reads with extent maps + * - Verify version consistency across shards + * + * @see ECStripeIterator for stripe traversal algorithm + * @see ECStripeView for range-based stripe iteration + */ class ECSplitOp : public SplitOp{ public: using SplitOp::SplitOp; - std::pair assemble_buffer_sparse_read(int ops_index) override; - void assemble_buffer_read(bufferlist &bl_out, int ops_index) override; + + /** + * @brief Assemble sparse read results from EC shards. + * + * Iterates through the EC stripe, collecting extent maps and data buffers + * from each shard in the correct order to reconstruct the logical object view. + * + * @param ops_index Index of the operation in the operation list + * @return Pair of extent set and assembled buffer + */ + std::pair assemble_buffer_sparse_read(int ops_index) const override; + + /** + * @brief Assemble dense read results from EC shards. + * + * Iterates through the EC stripe, collecting data buffers from each shard + * in the correct order to reconstruct the contiguous logical object data. + * + * @param bl_out Output buffer to append assembled data + * @param ops_index Index of the operation in the operation list + */ + void assemble_buffer_read(bufferlist &bl_out, int ops_index) const override; + + /** + * @brief Initialize read sub-operations for EC pool. + * + * Determines which shards need to be read based on the stripe geometry, + * creates sub-operations for each required shard, and validates that all + * target OSDs are available. 
+ * + * @param op Operation descriptor + * @param sparse Whether this is a sparse read + * @param ops_index Index of the operation in the operation list + */ void init_read(OSDOp &op, bool sparse, int ops_index) override; - ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count); + + /** + * @brief Check for version mismatches across EC shards. + * + * Compares the internal versions returned by each shard to ensure all + * sub-operations read the same object version. Returns true if any + * mismatch is detected. + * + * @return true if versions mismatch, false if consistent + */ + bool version_mismatch() const override; + ~ECSplitOp() { complete(); } }; +/** + * @class ReplicaSplitOp + * @brief Split operation implementation for replicated pools. + * + * ReplicaSplitOp handles splitting large operations across replicas in a + * replicated pool. It divides the operation into chunks and distributes them + * across available replicas for parallel execution. + * + * Key responsibilities: + * - Divide operations into appropriately-sized chunks + * - Distribute chunks across available replicas + * - Assemble results in the correct order + * - Verify version consistency across replicas + * + * The chunk size is determined by configuration and the number of available + * replicas, with a minimum threshold to ensure splitting is beneficial. + */ class ReplicaSplitOp : public SplitOp { public: using SplitOp::SplitOp; - std::pair assemble_buffer_sparse_read(int ops_index) override; - void assemble_buffer_read(bufferlist &bl_out, int ops_index) override; + + /** + * @brief Assemble sparse read results from replicas. + * + * Collects extent maps and data buffers from each replica sub-operation + * and combines them into a single result. 
+ * + * @param ops_index Index of the operation in the operation list + * @return Pair of extent set and assembled buffer + */ + std::pair assemble_buffer_sparse_read(int ops_index) const override; + + /** + * @brief Assemble dense read results from replicas. + * + * Collects data buffers from each replica sub-operation and appends them + * in order to reconstruct the complete result. + * + * @param bl_out Output buffer to append assembled data + * @param ops_index Index of the operation in the operation list + */ + void assemble_buffer_read(bufferlist &bl_out, int ops_index) const override; + + /** + * @brief Initialize read sub-operations for replicated pool. + * + * Divides the operation into chunks and creates sub-operations targeting + * different replicas. The chunk size and distribution are determined by + * configuration and the number of available OSDs. + * + * @param op Operation descriptor + * @param sparse Whether this is a sparse read + * @param ops_index Index of the operation in the operation list + */ void init_read(OSDOp &op, bool sparse, int ops_index) override; - ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size); + + /** + * @brief Check for version mismatches across replicas. + * + * Compares the versions returned by each replica to ensure all + * sub-operations read the same object version. Returns true if any + * mismatch is detected. + * + * @return true if versions mismatch, false if consistent + */ + bool version_mismatch() const override; + + /** + * @brief Construct a ReplicaSplitOp. + * @param op Original operation to be split + * @param objecter Objecter instance + * @param cct CephContext for logging + * @param pool_size Number of replicas in the pool + */ + ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) : + SplitOp(op, objecter, cct, pool_size) {} + ~ReplicaSplitOp() { complete(); }