* Foundation. See file COPYING.
*/
-#include "osdc/Objecter.h"
#include "osdc/SplitOp.h"
+#include "osdc/Objecter.h"
#include "osd/osd_types.h"
+using namespace std::literals;
+
#define dout_subsys ceph_subsys_objecter
#define DBG_LVL 20
}
}
-constexpr static uint64_t kReplicaMinShardReadSize = 128 * 1024;
constexpr static uint64_t kReplicaMinShardReads = 2;
-constexpr static uint64_t kReplicaMinReadSize = kReplicaMinShardReadSize * kReplicaMinShardReads;
#undef dout_prefix
#define dout_prefix *_dout << " ECSplitOp::"
-std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_read(int ops_index) {
+/**
+ * @brief Assemble sparse read results from EC shards into logical object view.
+ *
+ * Iterates through the EC stripe using ECStripeView, collecting extent maps
+ * and data buffers from each shard. The resulting buffer is built up in order,
+ * only extending (never inserting into the middle), which maintains buffer
+ * efficiency and allows for zero-copy operations where possible.
+ *
+ * @param ops_index Index of the operation in the operation list
+ * @return Pair containing the extent set and assembled buffer list
+ */
+std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_read(int ops_index) const {
bufferlist bl_out;
extent_set extents_out;
-
auto &orig_osd_op = orig_op->ops[ops_index].op;
const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+ ceph_assert(pi);
ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
- ldout(cct, DBG_LVL) << __func__ << " start:"
+ ldout(cct, DBG_LVL) << __func__ << " START -->" << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent="
<< orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
- mini_flat_map<shard_id_t, extents_map::iterator> map_iterators(stripe_view.data_chunk_count);
+ mini_flat_map<int, extents_map::const_iterator> map_iterators(stripe_view.data_chunk_count);
for (auto &&chunk_info : stripe_view) {
- ldout(cct, DBG_LVL) << __func__ << " chunk: " << chunk_info << dendl;
- auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
-
- if (!map_iterators.contains(chunk_info.shard)) {
- map_iterators.emplace(chunk_info.shard, details.e->begin());
+ shard_id_t shard = pi->get_shard(chunk_info.raw_shard);
+ ceph_assert(shard != shard_id_t::NO_SHARD);
+ int shard_index = (int)shard;
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " chunk: " << chunk_info
+ << " shard: " << shard << dendl;
+ auto &details = sub_reads.at(shard_index).details.at(ops_index);
+
+ if (!map_iterators.contains(shard_index)) {
+ map_iterators.emplace(shard_index, details.e->begin());
}
- extents_map::iterator &extent_iter = map_iterators.at(chunk_info.shard);
+ extents_map::const_iterator &extent_iter = map_iterators.at(shard_index);
uint64_t bl_len = 0;
while (extent_iter != details.e->end() && extent_iter->first < chunk_info.ro_offset + stripe_view.chunk_size) {
auto [off, len] = *extent_iter;
- ldout(cct, DBG_LVL) << __func__ << " extent=" << off << "~" << len << dendl;
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent=" << off << "~" << len << dendl;
extents_out.insert(off, len);
bl_len += len;
++extent_iter;
// We try to keep the buffers together where possible.
if (bl_len != 0) {
bufferlist bl;
- bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], bl_len);
+ bl.substr_of(details.bl, buffer_offset[(int)chunk_info.raw_shard], bl_len);
bl_out.append(bl);
- buffer_offset[(int)chunk_info.shard] += bl_len;
+ buffer_offset[(int)chunk_info.raw_shard] += bl_len;
}
}
+ ldout(cct, DBG_LVL) << __func__ << " END -->" << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent="
+ << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
return std::pair(extents_out, bl_out);
}
-void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+/**
+ * @brief Assemble dense read results from EC shards into contiguous buffer.
+ *
+ * Iterates through the EC stripe, extracting data from each shard in order.
+ * The output buffer is built sequentially by appending data, never inserting
+ * into the middle, which maintains buffer efficiency.
+ *
+ * Short reads are tolerated: each chunk is clamped to the bytes that are
+ * still unconsumed in its shard's reply buffer, and assembly stops at the
+ * first chunk whose shard buffer has already been fully consumed.
+ *
+ * Asserts that the pool exists in the osdmap and that every raw shard of the
+ * stripe maps to a real shard.
+ *
+ * @param bl_out Output buffer to append assembled data
+ * @param ops_index Index of the operation in the operation list
+ */
+void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) const {
auto &orig_osd_op = orig_op->ops[ops_index].op;
const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+ ceph_assert(pi);
ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count); // bytes already consumed, indexed by raw shard
- ldout(cct, DBG_LVL) << __func__ << " " << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent="
+ << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
for (auto &&chunk_info : stripe_view) {
- ldout(cct, DBG_LVL) << __func__ << " chunk info " << chunk_info << dendl;
- auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " chunk info " << chunk_info << dendl;
+ shard_id_t shard = pi->get_shard(chunk_info.raw_shard);
+ ceph_assert(shard != shard_id_t::NO_SHARD);
+ int shard_index = (int)shard;
+ auto &details = sub_reads.at(shard_index).details.at(ops_index);
+ uint64_t src_len = details.bl.length();
+ uint64_t buf_off = buffer_offset[(int)chunk_info.raw_shard];
+ if (src_len <= buf_off) {
+ // Early termination: we've exhausted the available buffer data.
+ // This is a normal completion scenario, not an error condition.
+ break;
+ }
+ uint64_t len = std::min(chunk_info.length, src_len - buf_off); // clamp to what the shard actually returned
bufferlist bl;
- bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], chunk_info.length);
+ bl.substr_of(details.bl, buf_off, len);
bl_out.append(bl);
- buffer_offset[(int)chunk_info.shard] += chunk_info.length;
+ buffer_offset[(int)chunk_info.raw_shard] += len;
}
}
+/**
+ * @brief Initialize read sub-operations for erasure-coded pool.
+ *
+ * Determines which EC shards need to be read based on stripe geometry and
+ * creates sub-operations for each required shard. Sets the abort flag and
+ * returns early if a raw shard has no mapped shard or if the OSD serving a
+ * required shard does not exist in the osdmap.
+ *
+ * Primary shard is required when reading from multiple chunks, when version
+ * information is requested, or when non-read operations are present. If it is
+ * required and none of the shards being read is the target's own shard
+ * (actual_pgid.shard), an extra sub-read is created for that shard and it
+ * becomes the reference sub-read.
+ *
+ * @param op Operation descriptor containing offset and length
+ * @param sparse Whether this is a sparse read operation
+ * @param ops_index Index of the operation in the operation list
+ */
void ECSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
- auto &t = orig_op->target;
- const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+ auto &target = orig_op->target;
+ const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
+ ceph_assert(pi);
+
uint64_t offset = op.op.extent.offset;
uint64_t length = op.op.extent.length;
- uint64_t data_chunk_count = pi->nonprimary_shards.size() + 1;
+ uint64_t data_chunk_count = pi->get_ec_data_shard_count();
uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count;
uint64_t start_chunk = offset / chunk_size;
// This calculation is wrong for length = 0, but such IOs should not have
- // reached here!
+ // reached here! validate_operations() rejects any READ/SPARSE_READ whose
+ // length is zero (zero means "the whole object"), so such an op is never
+ // split and never reaches init_read().
ceph_assert( op.op.extent.length != 0);
+ // length >= 1 (asserted above), so the "- 1" itself cannot underflow.
+ // offset + length is plain uint64_t arithmetic and is not checked for
+ // wrap-around here; presumably object size limits keep it far below
+ // UINT64_MAX - TODO confirm.
uint64_t end_chunk = (offset + op.op.extent.length - 1) / chunk_size;
unsigned count = std::min(data_chunk_count, end_chunk - start_chunk + 1);
- //FIXME: This is not quite right - the ops.size() > 1 does not necessarily mean
- // that the primary is required - it could be two reads to the same shard.
- bool primary_required = count > 1 || orig_op->objver || orig_op->ops.size() > 1;
+ // Primary is required if:
+ // 1. Reading from multiple chunks (count > 1)
+ // 2. Version information is needed (orig_op->objver)
+ // 3. There are non-read operations that need the primary
+ bool has_non_read_ops = false;
+ // Named sub_op so that it does not shadow this function's `op` parameter.
+ for (const auto &sub_op : orig_op->ops) {
+ if (sub_op.op.op != CEPH_OSD_OP_READ && sub_op.op.op != CEPH_OSD_OP_SPARSE_READ) {
+ has_non_read_ops = true;
+ break;
+ }
+ }
+ bool primary_required = count > 1 || orig_op->objver || has_non_read_ops;
int first_shard = start_chunk % data_chunk_count;
// Check all shards are online.
for (unsigned i = first_shard; i < first_shard + count; i++) {
- shard_id_t shard(i >= data_chunk_count ? i - data_chunk_count : i);
- int direct_osd = t.acting[(int)shard];
- // TODO: This is broken because there might be multiple shards on one OSD.
- // Will fix in the next PR.
- if (t.acting_primary == direct_osd) {
- primary_shard.emplace(shard);
+ raw_shard_id_t raw_shard(i >= data_chunk_count ? i - data_chunk_count : i);
+ shard_id_t shard(pi->get_shard(raw_shard));
+ if (shard == shard_id_t::NO_SHARD) {
+ abort = true;
+ return;
+ }
+ int shard_index = (int)shard;
+
+ int direct_osd = target.acting[shard_index];
+ if (target.actual_pgid.shard == shard) {
+ reference_sub_read = shard_index;
}
if (!objecter.osdmap->exists(direct_osd)) {
ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl;
abort = true;
return;
}
- if (!sub_reads.contains(shard)) {
- sub_reads.emplace(shard, orig_op->ops.size());
+ if (!sub_reads.contains(shard_index)) {
+ sub_reads.emplace(shard_index, orig_op->ops.size() + 1);
}
- auto &d = sub_reads.at(shard).details[ops_index];
+ auto &d = sub_reads.at(shard_index).details[ops_index];
if (sparse) {
d.e.emplace();
- sub_reads.at(shard).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval);
+ sub_reads.at(shard_index).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval);
} else {
- sub_reads.at(shard).rd.read(offset, length, &d.ec, &d.bl);
+ sub_reads.at(shard_index).rd.read(offset, length, &d.ec, &d.bl);
}
}
- if (primary_required && !primary_shard) {
- for (unsigned i=0; i < t.acting.size(); ++i) {
- // TODO: Can't compare OSDs for EC as there may be more than one shard on
- // each OSD.
- // Will fix in next PR.
- if (t.acting[i] == t.acting_primary) {
- primary_shard.emplace(i);
- sub_reads.emplace(*primary_shard, orig_op->ops.size());
- }
- }
-
- // No primary??? Let the normal code paths deal with this.
- if (!primary_shard) {
- ldout(cct, DBG_LVL) << __func__ <<" ABORT: Can't find primary" << dendl;
- abort = true;
- }
+ if (primary_required && reference_sub_read == -1) {
+ // _calc_target will have picked the primary by default on EC. The "primary"
+ // on replica is an arbitrary shard.
+ reference_sub_read = (int)target.actual_pgid.shard;
+ sub_reads.emplace(reference_sub_read, orig_op->ops.size() + 1);
}
+
+ // NOTE(review): when primary_required is false and none of the shards read
+ // above is actual_pgid.shard, reference_sub_read is still -1 here and this
+ // assert fires (e.g. a single-chunk read that is the first of several reads
+ // in one op). Confirm that callers never split such an op.
+ ceph_assert(reference_sub_read != -1);
}
-ECSplitOp::ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) :
- SplitOp(op, objecter, cct, count) {}
-
#undef dout_prefix
#define dout_prefix *_dout << " ReplicaSplitOp::"
-std::pair<SplitOp::extent_set, bufferlist> ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) {
+/**
+ * @brief Assemble sparse read results from replicas.
+ *
+ * Collects extent maps and buffers from each sub-operation. Results are
+ * assembled in order by appending, maintaining buffer efficiency.
+ *
+ * Sub-reads are visited in sub_reads iteration order: every (offset, length)
+ * entry of a sub-read's extent map is inserted into the combined extent set,
+ * then that sub-read's data buffer is appended to the output.
+ *
+ * NOTE(review): the output buffer is only correct if sub_reads iterates in
+ * ascending object-offset order. ReplicaSplitOp::init_read may start at a
+ * random index and wrap around to 0, so confirm that key order still matches
+ * offset order in that case.
+ *
+ * @param ops_index Index of the operation in the operation list
+ * @return Pair containing the combined extent set and buffer list
+ */
+std::pair<SplitOp::extent_set, bufferlist> ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) const {
extent_set extents_out;
bufferlist bl_out;
- for (auto && [shard, sr] : sub_reads) {
- for (auto [off, len] : *sr.details[ops_index].e) {
+ for (auto && [acting_index, sr] : sub_reads) {
+ for (auto [off, len] : *sr.details.at(ops_index).e) { // e is dereferenced unchecked; presumably emplaced by init_read for sparse reads
extents_out.insert(off, len);
}
- bl_out.append(sr.details[ops_index].bl);
+ bl_out.append(sr.details.at(ops_index).bl);
}
return std::pair(extents_out, bl_out);
}
-void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
- for (auto && [_, sr] : sub_reads) {
- bl_out.append(sr.details[ops_index].bl);
+/**
+ * @brief Assemble dense read results from replicas.
+ *
+ * Appends the data buffer of every sub-operation for this op, in sub_reads
+ * iteration order.
+ *
+ * NOTE(review): this is only correct if sub_reads iterates in ascending
+ * object-offset order. ReplicaSplitOp::init_read may start at a random index
+ * and wrap around to 0, so confirm that key order still matches offset order
+ * in that case.
+ *
+ * @param bl_out Output buffer to append assembled data
+ * @param ops_index Index of the operation in the operation list
+ */
+void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) const {
+ for (auto && [acting_index, sr] : sub_reads) {
+ bl_out.append(sr.details.at(ops_index).bl);
}
}
-ReplicaSplitOp::ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) :
- SplitOp(op, objecter, cct, pool_size) {
-
- // This may not actually be the primary, but since all shards are kept current
- // in replica, it does not actually matter which we choose here. Choose 0 since
- // there will always be a read to this shard.
- primary_shard = shard_id_t(0);
-}
-
+/**
+ * @brief Initialize read sub-operations for replicated pool.
+ *
+ * Divides the operation into chunks and distributes them across available
+ * replicas for parallel execution. Chunk size is calculated based on the
+ * minimum shard read size configuration and number of available OSDs.
+ * Chunks are aligned to page boundaries for efficiency.
+ *
+ * If the operation is too small to benefit from splitting across all replicas,
+ * fewer OSDs are used with a random starting offset for load balancing.
+ *
+ * @param op Operation descriptor containing offset and length
+ * @param sparse Whether this is a sparse read operation
+ * @param ops_index Index of the operation in the operation list
+ */
void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
- auto &t = orig_op->target;
+ auto &target = orig_op->target;
+ const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
+ ceph_assert(pi);
std::set<int> osds;
- for (int direct_osd : t.acting) {
+ for (int direct_osd : target.acting) {
if (objecter.osdmap->exists(direct_osd)) {
osds.insert(direct_osd);
}
return;
}
+ uint64_t replica_min_shard_read_size
+ = objecter.get_min_split_replica_read_size();
+
uint64_t offset = op.op.extent.offset;
uint64_t length = op.op.extent.length;
- uint64_t slice_count = std::min(length / kReplicaMinShardReadSize, osds.size());
+ uint64_t slice_count = replica_min_shard_read_size == 0 ? 1 :
+ std::min(length / replica_min_shard_read_size,
+ osds.size());
uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE);
+ unsigned start = 0;
- for (unsigned i = 0; i < osds.size() && length > 0; i++) {
+ if (slice_count < osds.size()) {
+ start = rand() % osds.size();
+ }
- shard_id_t shard(i);
- if (!sub_reads.contains(shard)) {
- sub_reads.emplace(shard, orig_op->ops.size());
+ for (unsigned i = start; length > 0; i = (i + 1 == osds.size()) ? 0 : i + 1) {
+ int acting_index = i;
+ if (!sub_reads.contains(acting_index)) {
+ sub_reads.emplace(acting_index, orig_op->ops.size() + 1);
+ // Set reference_sub_read to the first index we use
+ if (reference_sub_read == -1) {
+ reference_sub_read = acting_index;
+ }
}
- auto &sr = sub_reads.at(shard);
+ auto &sr = sub_reads.at(acting_index);
auto bl = &sr.details[ops_index].bl;
auto rval = &sr.details[ops_index].rval;
uint64_t len = std::min(length, chunk_size);
#undef dout_prefix
#define dout_prefix *_dout << " SplitOp::"
-int SplitOp::assemble_rc() {
+/**
+ * @brief Assemble the final return code from all sub-operations.
+ *
+ * Returns the first non-EAGAIN error encountered, or EAGAIN if any
+ * sub-operation returned EAGAIN or if version mismatch is detected.
+ * Otherwise returns the reference sub-operation's return code.
+ *
+ * Precedence is therefore: hard error, then EAGAIN / version mismatch, then
+ * the reference sub-read's rc. version_mismatch() is only evaluated when no
+ * sub-read reported a hard error or EAGAIN.
+ *
+ * Asserts that more than one sub-read exists.
+ *
+ * @return Combined return code for the operation
+ */
+int SplitOp::assemble_rc() const {
int rc = 0;
- bool rc_zero = false;
-
- // This should only happen on a single thread.
- for (auto & [_, sub_read] : sub_reads) {
- if (rc >= 0 && sub_read.rc >= 0) {
- rc += sub_read.rc;
- if (sub_read.rc == 0) {
- rc_zero = true;
- }
- } else if (rc >= 0) {
+ bool eagain = false;
+
+ // Sub-reads which use a single op should re-use original op.
+ ceph_assert(sub_reads.size() > 1);
+
+ // The first hard (non-EAGAIN) error wins; EAGAIN is only remembered for later.
+ for (auto & [index, sub_read] : sub_reads) {
+ if (sub_read.rc == -EAGAIN) {
+ eagain = true;
+ } else if (sub_read.rc < 0) {
+ return sub_read.rc;
+ }
+
+ // The non-reference indices only get reads, which only ever have zero RCs.
+ if (index == reference_sub_read) {
rc = sub_read.rc;
- } // else ignore subsequent errors.
+ }
}
- if (rc >= 0 && rc_zero) {
- return 0;
+ if (eagain || version_mismatch()) { // short-circuit: versions are not decoded once EAGAIN was seen
+ return -EAGAIN;
}
-
return rc;
}
+/**
+ * @brief Check for version mismatches across EC shards.
+ *
+ * Implements torn read detection by comparing internal versions returned
+ * by each shard. The reference shard maintains a map of all shard versions,
+ * allowing detection of inconsistencies across the EC stripe.
+ *
+ * For every non-reference sub-read, the entry for that shard in the
+ * reference's decoded map is compared with the entry for the same shard in
+ * the sub-read's own decoded map. An entry missing on either side is treated
+ * as a mismatch.
+ *
+ * Asserts that a reference sub-read has been chosen and that every sub-read
+ * carries an internal version (see protect_torn_reads()).
+ *
+ * NOTE(review): presumably decode() throws on a truncated or empty buffer;
+ * nothing here catches that - confirm callers only get here after all
+ * sub-reads succeeded.
+ *
+ * @return true if versions mismatch, false if consistent
+ */
+bool ECSplitOp::version_mismatch() const {
+ // First we need to decode the version list from the reference.
+ ceph_assert(reference_sub_read != -1);
+ ceph_assert(sub_reads.at(reference_sub_read).internal_version.has_value());
+
+ std::map<shard_id_t, eversion_t> ref_vers;
+ decode(ref_vers, sub_reads.at(reference_sub_read).internal_version->bl);
+
+ for (auto & [shard_index, sub_read] : sub_reads) {
+ // Reference shard can't be different to itself.
+ if (shard_index == reference_sub_read) {
+ continue;
+ }
+
+ ceph_assert(sub_read.internal_version.has_value());
+ std::map<shard_id_t, eversion_t> shard_vers;
+ decode(shard_vers, sub_read.internal_version->bl);
+
+ shard_id_t shard(shard_index); // sub_reads is keyed by the shard id cast to int
+ if (!ref_vers.contains(shard)) {
+ ldout(cct, DBG_LVL) << __func__ << ": "
+ << "Reference shard version missing, failing split op." << dendl;
+ return true;
+ }
+ if (!shard_vers.contains(shard)) {
+ ldout(cct, DBG_LVL) << __func__ << ": "
+ << "Shard version missing, failing split op." << dendl;
+ return true;
+ }
+ if (ref_vers.at(shard) != shard_vers.at(shard)) {
+ ldout(cct, DBG_LVL) << __func__ << ": "
+ << "Primary version (" << ref_vers.at(shard) << ") != " // "Primary" here is the reference sub-read's view
+ << "shard version (" << shard_vers.at(shard) <<") "
+ << "for shard " << shard << dendl;
+ return true;
+ }
+ }
+ return false;
+}
+
+/**
+ * @brief Check for version mismatches across replicas.
+ *
+ * For replicated pools, checks that all replicas returned the same version.
+ * No single replica maintains a reference version list, so the first replica's
+ * version becomes the reference for comparison.
+ *
+ * Each sub-read's internal-version buffer is decoded into a map and the
+ * entry stored under shard_id_t::NO_SHARD is used as that replica's version.
+ * A sub-read without such an entry is treated as a mismatch.
+ *
+ * Asserts that every sub-read carries an internal version (see
+ * protect_torn_reads()).
+ *
+ * @return true if versions mismatch, false if consistent
+ */
+bool ReplicaSplitOp::version_mismatch() const {
+ std::optional<eversion_t> ref_version;
+
+ for (const auto& [acting_index, sub_read] : sub_reads) {
+ ceph_assert(sub_read.internal_version.has_value());
+ std::map<shard_id_t, eversion_t> shard_vers;
+ decode(shard_vers, sub_read.internal_version->bl);
+
+ // Use the shared shard_id_t::NO_SHARD constant (as the EC code paths do)
+ // rather than a local copy, and look the entry up only once.
+ auto ver = shard_vers.find(shard_id_t::NO_SHARD);
+ if (ver == shard_vers.end()) {
+ ldout(cct, DBG_LVL) << __func__ << ": "
+ << "Replica version missing for acting index " << acting_index
+ << ", failing split op." << dendl;
+ return true;
+ }
+
+ if (!ref_version) {
+ // First replica visited: its version is what all others must match.
+ ref_version = ver->second;
+ } else if (*ref_version != ver->second) {
+ // Log the mismatch like ECSplitOp::version_mismatch() does, so that a
+ // retried split read can be diagnosed from the log.
+ ldout(cct, DBG_LVL) << __func__ << ": "
+ << "Reference version (" << *ref_version << ") != "
+ << "replica version (" << ver->second << ") "
+ << "for acting index " << acting_index << dendl;
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/**
+ * @brief Complete the split operation by assembling and returning results.
+ *
+ * Assembles results from all sub-operations and invokes the original operation's
+ * completion handlers. For successful operations, results are assembled based on
+ * operation type (SPARSE_READ, READ, or other). For failures or version mismatches,
+ * triggers retry through the normal operation path.
+ *
+ * The buffer handling maintains compatibility with librados expectations, including
+ * special handling for pre-allocated buffers used by RadosStriper.
+ *
+ * @note Only called for successfully sent operations where abort=false
+ */
void SplitOp::complete() {
+ // STAGE 6: complete() only runs for successfully sent operations.
+ // If abort was set during creation, the split op was discarded and
+ // complete() is never called. This check handles the edge case where
+ // abort might be set after creation but before sending (though this
+ // should not happen in the current implementation).
if (abort) {
return;
}
- ldout(cct, 20) << __func__ << " entry this=" << this << dendl;
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " entry this=" << this << dendl;
+ boost::system::error_code handler_error;
int rc = assemble_rc();
if (rc >= 0) {
// so as to reproduce as much as possible of the IO completion.
std::vector out_ops(orig_op->ops.begin(), orig_op->ops.end());
+ std::optional<bufferlist> read_bl;
+ if (orig_op->outbl) {
+ read_bl = bufferlist();
+ }
+
+ // Copy is necessary: process_op_reply_handlers() reads out_ops while
+ // modifying orig_op state. Overhead is acceptable for typical 1-3 ops.
for (unsigned ops_index=0; ops_index < out_ops.size(); ++ops_index) {
auto &out_osd_op = out_ops[ops_index];
switch (out_osd_op.op.op) {
auto [extents, bl] = assemble_buffer_sparse_read(ops_index);
encode(std::move(extents).detach(), out_osd_op.outdata);
encode_destructively(bl, out_osd_op.outdata);
+ if (read_bl) {
+ read_bl->append(out_osd_op.outdata);
+ }
break;
}
case CEPH_OSD_OP_READ: {
assemble_buffer_read(out_osd_op.outdata, ops_index);
+ if (read_bl) {
+ read_bl->append(out_osd_op.outdata);
+ }
break;
}
- case CEPH_OSD_OP_GETXATTRS:
- case CEPH_OSD_OP_CHECKSUM:
- case CEPH_OSD_OP_GETXATTR: {
- out_osd_op.outdata = sub_reads.at(*primary_shard).details[ops_index].bl;
- out_osd_op.rval = sub_reads.at(*primary_shard).details[ops_index].rval;
+ default: {
+ out_osd_op.outdata = std::move(sub_reads.at(reference_sub_read).details[ops_index].bl);
+ out_osd_op.rval = sub_reads.at(reference_sub_read).details[ops_index].rval;
break;
}
- default: {
- ceph_abort_msg("Not supported");
- break;
+ }
+ }
+
+ // Copied from Objecter::handle_osd_op_reply() to match correct API behaviour.
+ // This is policed in the librados test harness.
+ if (orig_op->outbl) {
+ ceph_assert(read_bl);
+ // Note: Objecter will check for a single buffer here - this will always
+ // be true. Here, there will frequently be multiple buffers due
+ // to the splitting and the original buffer still needs to be
+ // honoured.
+ if (orig_op->outbl->length() == read_bl->length()) {
+ // this is here to keep previous users to *relied* on getting data
+ // read into existing buffers happy. Notably,
+ // libradosstriper::RadosStriperImpl::aio_read().
+ ldout(cct,10) << __func__ << " copying resulting " << read_bl->length()
+ << " into existing ceph::buffer of length " << orig_op->outbl->length()
+ << dendl;
+ // The following seems a little convoluted, but the assumption is that
+ // there is a good reason why Sage Weil wrote it this way in Objecter.
+ bufferlist t;
+ t = std::move(*orig_op->outbl);
+ t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
+ read_bl->begin().copy(read_bl->length(), t.c_str());
+ orig_op->outbl->substr_of(t, 0, read_bl->length());
+ } else {
+ // librados insists that if it provided a buffer and the client is
+ // going to be returning a buffer, that this buffer must be
+ // contiguous.
+ if (orig_op->outbl->length() != 0) {
+ read_bl->rebuild();
}
+ orig_op->outbl->substr_of(*read_bl, 0, read_bl->length());
}
+ orig_op->outbl = 0;
}
- objecter.handle_osd_op_reply2(orig_op, out_ops);
+ handler_error = objecter.process_op_reply_handlers(orig_op, out_ops);
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " success this=" << this << " rc=" << rc << dendl;
+ } else {
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " retry this=" << this << " rc=" << rc << dendl;
+ }
+ objecter.op_post_split_op_complete(orig_op, handler_error, rc);
+}
+
+/**
+ * @brief Add version tracking to sub-operations for torn read protection.
+ *
+ * Adds internal version queries to all sub-operations to enable detection of
+ * version mismatches. If sub-operations read different versions of the object
+ * (torn read), the operation will be retried.
+ *
+ * Every sub-read's internal_version is replaced with a fresh InternalVersion,
+ * and a get_internal_versions query writing into its ec and bl members is
+ * added to that sub-read's rd operation.
+ *
+ * NOTE(review): rd is handed raw pointers into the sub-read's
+ * InternalVersion; presumably sub_reads must not be resized or reassigned
+ * between this call and completion - confirm the call order.
+ */
+void SplitOp::protect_torn_reads() {
+ // If multiple reads are emitted from objecter, then it is essential that
+ // each read reads the same version. It is not possible to efficiently
+ // guarantee this, so instead read the version along with the data and if they
+ // are different, then repeat the read to the primary. Such version mismatches
+ // should be rare enough that this is not a significant performance impact.
+ for (auto&& [index, sr] : sub_reads) {
+ auto &internal_version = sr.internal_version;
+ internal_version = std::make_optional<InternalVersion>(); // discards any previous value
+ sr.rd.get_internal_versions(&internal_version->ec, &internal_version->bl);
+ }
+}
- ldout(cct, DBG_LVL) << __func__ << " success this=" << this << " rc=" << rc << dendl;
- Objecter::Op::complete(std::move(orig_op->onfinish), osdcode(rc), rc, objecter.service.get_executor());
- objecter._finish_op(orig_op, rc);
+/**
+ * @brief Route one op of the original request into the sub-reads.
+ *
+ * READ and SPARSE_READ ops are handed to init_read(), which distributes them
+ * over the sub-reads. Every other op is forwarded with pass_thru_op() to the
+ * reference sub-read, with its output buffer and rval captured in that
+ * sub-read's Details slot for ops_index.
+ *
+ * Asserts that a reference sub-read exists before a non-read op is forwarded.
+ *
+ * @param op Operation to initialise
+ * @param ops_index Index of the operation in the operation list
+ */
+void SplitOp::init(OSDOp &op, int ops_index) {
+ switch (op.op.op) {
+ case CEPH_OSD_OP_SPARSE_READ: {
+ init_read(op, true, ops_index);
+ break;
+ }
+ case CEPH_OSD_OP_READ: {
+ init_read(op, false, ops_index);
+ break;
+ }
+ default: {
+ // Invalid ops should have been rejected in validate.
+ // reference_sub_read is -1 until a sub-read has been chosen as reference;
+ // fail with a clear assertion (as the other users of reference_sub_read
+ // do) instead of indexing sub_reads with -1.
+ ceph_assert(reference_sub_read != -1);
+ auto &reference = sub_reads.at(reference_sub_read);
+ Details &d = reference.details[ops_index];
+ orig_op->pass_thru_op(reference.rd, ops_index, &d.bl, &d.rval);
+ break;
+ }
+ }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " SplitOp::"
+
+namespace {
+/**
+ * Decide whether a read fits entirely inside one EC chunk.
+ *
+ * @param pi Pool the read targets; non-erasure pools always yield {false, false}
+ * @param offset Object offset of the read
+ * @param len Length of the read
+ * @return {single_chunk, first_chunk}
+ * - single_chunk: true if [offset, offset + len) does not cross a chunk
+ * boundary
+ * - first_chunk: true if, additionally, offset lies within the first
+ * chunk_size bytes of its stripe; always false when
+ * single_chunk is false
+ */
+std::pair<bool, bool> is_single_chunk(const pg_pool_t *pi, uint64_t offset, uint64_t len) {
+ if (!pi->is_erasure()) {
+ return {false, false};
+ }
+
+ uint64_t stripe_width = pi->get_stripe_width();
+
+ // Optimization: Use stripe_width / 2 as a threshold to quickly reject requests
+ // that cannot fit in a single chunk. Since k (data chunks) is at least 2,
+ // chunk_size = stripe_width / k <= stripe_width / 2. This early check avoids
+ // the more expensive division operation (stripe_width / data_chunk_count) below.
+ if (len > stripe_width / 2) {
+ return {false, false};
+ }
+ uint64_t data_chunk_count = pi->get_ec_data_shard_count();
+
+ // A zero shard count would divide by zero below. Like a zero chunk_size
+ // this should never happen, so this is paranoia.
+ if (data_chunk_count == 0) {
+ return {false, false};
+ }
+ uint32_t chunk_size = stripe_width / data_chunk_count;
+
+ // Chunk_size should never be zero, so this is paranoia.
+ if (chunk_size == 0 || len > chunk_size) {
+ return {false, false};
+ }
+
+ uint64_t offset_to_end_of_chunk;
+
+ // Chunk size is normally, but not always a power of 2.
+ if (std::has_single_bit(chunk_size)) {
+ offset_to_end_of_chunk = chunk_size - (offset & (chunk_size - 1));
} else {
- ldout(cct, DBG_LVL) << __func__ << " retry this=" << this << " rc=" << rc << dendl;
- objecter.op_post_submit(orig_op);
+ offset_to_end_of_chunk = chunk_size - (offset % chunk_size);
}
+
+ // The read would spill into the next chunk.
+ if (len > offset_to_end_of_chunk) {
+ return {false, false};
+ }
+
+ return {true, offset % stripe_width < chunk_size};
}
-static bool validate(Objecter::Op *op, bool is_erasure, CephContext *cct) {
+/**
+ * Validate operation flags for split operation eligibility.
+ *
+ * Checks that the operation has the required BALANCE_READS flag and
+ * is not a write operation. Also rejects the operation when the pool has
+ * FLAG_CRIMSON set, since direct reads are not supported there.
+ *
+ * Each rejection is logged at DBG_LVL.
+ *
+ * @param pi The pool to validate against
+ * @param op The operation to validate
+ * @param cct CephContext for logging
+ * @return true if flags are valid, false otherwise
+ */
+bool validate_flags(const pg_pool_t *pi, Objecter::Op *op, CephContext *cct) {
+ if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0) {
+ ldout(cct, DBG_LVL) << __func__ << " REJECT: Client rejects balanced read" << dendl;
+ return false;
+ }
+
+ // We really should not have a WRITE flagged as balanced read, but out of
+ // paranoia ignore it.
+ if (op->target.flags & CEPH_OSD_FLAG_WRITE) {
+ ldout(cct, DBG_LVL) << __func__ << " REJECT: Flagged as write!" << dendl;
+ return false;
+ }
- if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0 ) {
- ldout(cct, DBG_LVL) << __func__ <<" REJECT: Client rejects balanced read" << dendl;
+ if (pi->has_flag(pg_pool_t::FLAG_CRIMSON)) {
+ ldout(cct, DBG_LVL) << __func__ <<" ABORT: Crimson doesn't support"
+ " direct reads" << dendl;
return false;
}
- uint64_t suitable_read_found = false;
- for (auto & o : op->ops) {
+ return true;
+}
+
+/**
+ * Validate operation types and check read size constraints.
+ *
+ * Processes all operations in the op list, checking for:
+ * - Supported operation types (READ, SPARSE_READ, and various metadata ops)
+ * - Valid read sizes (non-zero length, meets minimum size requirements)
+ * - Single chunk constraints for erasure coded pools
+ *
+ * Returns false immediately on a zero-length read or an unsupported op; in
+ * that case the in/out parameters may have been only partially updated and
+ * should not be relied upon.
+ *
+ * @param op The operation to validate
+ * @param pi Pool information
+ * @param is_erasure Whether the pool is erasure coded
+ * @param replica_min_read_size Minimum read size for replica pools
+ * @param cct CephContext for logging
+ * @param[in,out] has_primary_ops Set to true if primary-only ops are found;
+ * never reset to false, so the caller's initial value is kept
+ * @param[in,out] single_direct_op On entry, whether a direct send to a single
+ * OSD is still possible; cleared if any read spans more than one chunk,
+ * or if primary-only ops are present and some read does not start in
+ * the first chunk of its stripe
+ * @return true if a suitable read operation was found, false otherwise
+ */
+bool validate_operations(Objecter::Op *op, const pg_pool_t *pi, bool is_erasure,
+ uint64_t replica_min_read_size, CephContext *cct,
+ bool &has_primary_ops, bool &single_direct_op) {
+ bool is_first_chunk = true;
+ bool suitable_read_found = false;
+
+ for (auto &o : op->ops) {
switch (o.op.op) {
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_SPARSE_READ: {
uint64_t length = o.op.extent.length;
+ if (length == 0) {
+ // length of zero actually means "the whole object". This code cannot
+ // know the size of the object efficiently, so reject the op.
+ ldout(cct, DBG_LVL) << __func__ << " REJECT: Zero length read" << dendl;
+ return false;
+ }
- if ((is_erasure && length > 0) ||
- (!is_erasure && length >= kReplicaMinReadSize)) {
+ // length is known to be non-zero here, so every EC read qualifies; replica
+ // reads must additionally reach the minimum split size.
+ if (is_erasure || length >= replica_min_read_size) {
suitable_read_found = true;
}
+ if (single_direct_op) {
+ auto [single_chunk, first_chunk] = is_single_chunk(pi, o.op.extent.offset, length);
+ is_first_chunk = is_first_chunk && first_chunk;
+ single_direct_op = single_direct_op && single_chunk;
+ }
break;
}
case CEPH_OSD_OP_GETXATTRS:
case CEPH_OSD_OP_CHECKSUM:
- case CEPH_OSD_OP_GETXATTR: {
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_STAT:
+ case CEPH_OSD_OP_CMPXATTR:
+ case CEPH_OSD_OP_CALL: {
+ has_primary_ops = true;
break; // Do not block validate.
}
default: {
- ldout(cct, DBG_LVL) << __func__ <<" REJECT: unsupported op" << dendl;
+ ldout(cct, DBG_LVL) << __func__ << " REJECT: unsupported op" << dendl;
return false;
}
}
}
+ // With primary-only ops present, a direct send is only kept when every read
+ // seen so far starts in the first chunk of its stripe.
+ if (single_direct_op && has_primary_ops) {
+ single_direct_op = is_first_chunk;
+ }
+
return suitable_read_found;
}
-void SplitOp::init(OSDOp &op, int ops_index) {
- switch (op.op.op) {
- case CEPH_OSD_OP_SPARSE_READ: {
- init_read(op, true, ops_index);
- break;
- }
- case CEPH_OSD_OP_READ: {
- init_read(op, false, ops_index);
- break;
- }
- case CEPH_OSD_OP_GETXATTRS:
- case CEPH_OSD_OP_CHECKSUM:
- case CEPH_OSD_OP_GETXATTR: {
- shard_id_t shard = *primary_shard;
- Details &d = sub_reads.at(shard).details[ops_index];
- orig_op->pass_thru_op(sub_reads.at(shard).rd, ops_index, &d.bl, &d.rval);
- break;
- }
- default: {
- ldout(cct, DBG_LVL) << __func__ <<" ABORT: unsupported" << dendl;
- abort = true;
- break;
- }
+/**
+ * Validate if an operation is eligible for split operation optimization.
+ *
+ * This function performs a multi-stage validation to determine if an operation
+ * can be split across multiple OSDs for improved read performance:
+ * 1. Validates operation flags (BALANCE_READS required, no WRITE flag)
+ * 2. For replicated pools, rejects if min_split_replica_read_size is 0 (splitting disabled)
+ * 3. Validates operation types and read size constraints
+ *
+ * @param op The operation to validate
+ * @param objecter Objecter instance for configuration access
+ * @param pi Pool information
+ * @param cct CephContext for logging
+ * @return A pair of bools: {suitable_read_found, single_direct_op}
+ * - suitable_read_found: true if operation contains valid read(s)
+ * - single_direct_op: true if operation can be sent to single OSD;
+ * always false when suitable_read_found is false
+ */
+std::pair<bool, bool> validate(Objecter::Op *op, Objecter &objecter,
+ const pg_pool_t *pi, CephContext *cct) {
+ bool is_erasure = pi->is_erasure();
+
+ // Validate flags
+ if (!validate_flags(pi, op, cct)) {
+ return {false, false};
}
+
+ // Initialize state for operation validation
+ bool has_primary_ops = nullptr != op->objver;
+ bool single_direct_op = is_erasure;
+
+ uint64_t replica_min_shard_read_size = objecter.get_min_split_replica_read_size();
+
+ if (!is_erasure && replica_min_shard_read_size == 0) {
+ ldout(cct, DBG_LVL) << __func__ << " REJECT: splitting disabled (min_split_replica_read_size=0)" << dendl;
+ return {false, false};
+ }
+
+ uint64_t replica_min_read_size = replica_min_shard_read_size * kReplicaMinShardReads;
+
+ // Validate operations and read sizes
+ bool suitable_read_found = validate_operations(op, pi, is_erasure,
+ replica_min_read_size, cct,
+ has_primary_ops, single_direct_op);
+
+ // validate_operations() returns early on a zero-length read or an
+ // unsupported op, leaving single_direct_op at whatever it was at that point
+ // (initially true on EC pools). A rejected op must never be reported as
+ // directly sendable, so normalise every rejection to {false, false}.
+ if (!suitable_read_found) {
+ return {false, false};
+ }
+
+ return {suitable_read_found, single_direct_op};
}
-namespace {
void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct) {
- auto &t = op->target;
+ auto &target = op->target;
ldout(cct, DBG_LVL) << str
- << " osd=" << t.osd
- << " shard=" << (t.force_shard ? *t.force_shard : shard_id_t(-1))
- << " balance_reads=" << ((t.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0)
+ << " object_id=" << target.base_oid
+ << " tid=" << op->tid
+ << " pool=" << target.base_oloc.pool
+ << " pgid=" << target.actual_pgid
+ << " osd=" << target.osd
+ << " force_osd=" << ((target.flags & CEPH_OSD_FLAG_FORCE_OSD) != 0)
+ << " balance_reads=" << ((target.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0)
<< " ops.size()=" << op->ops.size()
<< " needs_version=" << (op->objver?"true":"false");
}
}
+/**
+ * @brief Prepare a single-chunk operation for direct execution.
+ *
+ * Used when a read does not need to be split: instead of creating
+ * sub-operations, the original op is retargeted in place at the OSD that
+ * holds the shard containing the first READ/SPARSE_READ extent.
+ *
+ * Steps:
+ *  1. Recalculate the op's target so the acting set is populated.
+ *  2. Derive the chunk size from the pool's stripe width and data shard count.
+ *  3. Locate the first READ/SPARSE_READ op and map its offset to a shard.
+ *  4. If that shard maps to an existing OSD in the acting set, set the
+ *     EC_DIRECT_READ and FORCE_OSD flags and update the target OSD, the
+ *     shard of actual_pgid, and used_replica.
+ *
+ * If any step cannot be completed (no stripe geometry, no read op, shard not
+ * mapped, acting set too short, OSD missing) the op is left exactly as
+ * _calc_target() produced it, i.e. it goes down the normal path.
+ *
+ * @param op Operation to prepare (its target is modified in place)
+ * @param objecter Objecter instance supplying the osdmap and target calculation
+ * @param cct CephContext for logging
+ */
+void SplitOp::prepare_single_op(Objecter::Op *op, Objecter &objecter, CephContext *cct) {
+  auto &target = op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
+  ceph_assert(pi);
+
+  objecter._calc_target(&target, op);
+
+  const uint64_t data_chunk_count = pi->get_ec_data_shard_count();
+  // Guard both divisions below: a pool without stripe geometry would
+  // otherwise cause a division by zero. NOTE(review): create() calls this
+  // from its sub_reads.size() <= 1 fallback without checking the pool type,
+  // so a zero stripe width looks reachable - confirm.
+  const uint32_t chunk_size =
+      data_chunk_count != 0 ? pi->get_stripe_width() / data_chunk_count : 0;
+
+  if (chunk_size != 0) {
+    // Find the first read to work out where the IO goes. Iterate by
+    // reference: copying an OSDOp would also copy its data buffers.
+    for (const auto &o : op->ops) {
+      if (o.op.op != CEPH_OSD_OP_SPARSE_READ &&
+          o.op.op != CEPH_OSD_OP_READ) {
+        continue;
+      }
+      raw_shard_id_t raw_shard((o.op.extent.offset) / chunk_size % data_chunk_count);
+      shard_id_t shard = pi->get_shard(raw_shard);
+      if (shard != shard_id_t::NO_SHARD) {
+        const int acting_index = static_cast<int>(shard);
+        // Never index past the acting set; fall back to the normal path.
+        const bool in_range =
+            acting_index >= 0 &&
+            static_cast<size_t>(acting_index) < target.acting.size();
+        if (in_range && objecter.osdmap->exists(target.acting[acting_index])) {
+          target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+          target.flags |= CEPH_OSD_FLAG_FORCE_OSD;
+          target.osd = target.acting[acting_index];
+          target.actual_pgid.reset_shard(shard);
+          target.used_replica = (target.acting_primary != target.osd);
+        }
+      }
+      // Only the first read decides the destination.
+      break;
+    }
+  }
+  debug_op_summary("reuse_op:", op, cct);
+}
+
+
+/**
+ * Create and initialize a split operation for parallel reads.
+ *
+ * This function implements the abort flag pattern to efficiently handle
+ * validation and creation failures. Stages 1-5 are marked by matching
+ * "STAGE n" comments in the function body:
+ *
+ * STAGE 1: Cheap validation tests
+ * - Check pool exists
+ * - Check split reads are enabled
+ * - Validate operation types and parameters
+ * - Return false immediately if validation fails (no split op created)
+ *
+ * STAGE 2: Create split op object
+ * - Allocate ECSplitOp or ReplicaSplitOp based on pool type
+ * - Constructor may detect issues and set abort flag
+ *
+ * STAGE 3: Check abort after construction
+ * - If abort is set during construction, discard split op and return false
+ * - This catches issues that can only be detected during object creation
+ *
+ * STAGE 4: Initialize sub-operations
+ * - Call init() for each operation in the request
+ * - init_read() may set abort if OSDs are missing or state is invalid
+ * - Break early if abort is detected to avoid unnecessary work
+ *
+ * STAGE 5: Final abort check
+ * - If abort was set during initialization, discard split op and return false
+ * - Allows fallback to normal (non-split) operation path
+ *
+ * STAGE 6: Send operations
+ * - Only reached if abort=false, meaning split op is valid
+ * - complete() will only run for these successfully sent operations
+ *
+ * @return true if split op was created and sent, false to use normal operation
+ *         (this includes the case where the original op was retargeted in
+ *         place by prepare_single_op() instead of being split)
+ */
bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
- shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct) {
+ shunique_lock<ceph::shared_mutex>& sul, CephContext *cct) {
- auto &t = op->target;
- const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+ auto &target = op->target;
+ const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
+ // STAGE 1: Cheap validation tests run first before creating split op
if (!pi) {
ldout(cct, DBG_LVL) << __func__ <<" REJECT: No Pool" << dendl;
return false;
}
- debug_op_summary("orig_op: ", op, cct);
+ debug_op_summary("orig_op:", op, cct);
+
+ // Reject if the operation is a snapshot operation
+ if (op->snapid != CEPH_NOSNAP || !op->snapc.empty()) {
+ ldout(cct, DBG_LVL) << __func__ <<" REJECT: snapshot operation" << dendl;
+ return false;
+ }
// Reject if direct reads not supported by profile.
if (!pi->has_flag(pg_pool_t::FLAG_CLIENT_SPLIT_READS)) {
return false;
}
- bool validated = validate(op, pi->is_erasure(), cct);
+ auto [validated, single_op] = validate(op, objecter, pi, cct);
if (!validated) {
return false;
}
+ if (single_op) {
+ ldout(cct, DBG_LVL) << __func__ <<" reusing original op " << dendl;
+ prepare_single_op(op, objecter, cct);
+ return false;
+ }
+
+ // STAGE 2: Create split op object (may set abort during construction)
std::shared_ptr<SplitOp> split_read;
if (pi->is_erasure()) {
split_read = std::make_shared<ReplicaSplitOp>(op, objecter, cct, pi->size);
}
+ // STAGE 3: Check if abort was set during construction
if (split_read->abort) {
- ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;return false;
+ ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;
return false;
}
// Populate the target, to extract the acting set from it.
- t.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
+ target.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
objecter._calc_target(&op->target, op);
+ // STAGE 4: Initialize sub-operations (may set abort if problems detected)
for (unsigned i = 0; i < op->ops.size(); ++i) {
split_read->init( op->ops[i], i);
if (split_read->abort) {
}
}
+ // STAGE 5: Final abort check - discard split op if problems were detected
if (split_read->abort) {
ldout(cct, DBG_LVL) << __func__ <<" ABORTED 2" << dendl;
return false;
}
- ldout(cct, DBG_LVL) << __func__ <<" sub_reads ready. count=" << split_read->sub_reads.size() << dendl;
+  // Ideally, validate() detects every read that needs only a single
+  // sub-read. If it misses one, this check catches it (albeit less efficiently).
+ if (split_read->sub_reads.size() <= 1) {
+ ldout(cct, DBG_LVL) << __func__ <<" reusing original op - inefficient" << dendl;
+ prepare_single_op(op, objecter, cct);
+ split_read->abort = true; // Required for destructor.
+ return false;
+ }
+
+ objecter.add_op_to_splitop_session(op);
+
+ split_read->protect_torn_reads();
+
+
+ op->split_op_tids = std::make_unique<std::vector<ceph_tid_t>>(split_read->sub_reads.size());
+ auto &tids = *op->split_op_tids;
+
+ int i=0;
// We are committed to doing a split read. Any re-attempts should not be either
// split or balanced.
- for (auto && [shard, sub_read] : split_read->sub_reads) {
+ for (auto && [index, sub_read] : split_read->sub_reads) {
auto fin = new Finisher(split_read, sub_read); // Self-destructs when called.
version_t *objver = nullptr;
- if (split_read->primary_shard && shard == *split_read->primary_shard) {
+ if (index == split_read->reference_sub_read) {
objver = split_read->orig_op->objver;
}
auto sub_op = objecter.prepare_read_op(
- t.base_oid, t.base_oloc, split_read->sub_reads.at(shard).rd, op->snapid,
+ target.base_oid, target.base_oloc, split_read->sub_reads.at(index).rd, op->snapid,
nullptr, split_read->flags, -1, fin, objver);
- sub_op->target.force_shard.emplace(shard);
+
+ auto &st = sub_op->target;
+ st = target; // Target can start off in same state as parent.
+ st.flags |= CEPH_OSD_FLAG_FORCE_OSD;
+ st.flags |= CEPH_OSD_FLAG_FAIL_ON_EAGAIN;
if (pi->is_erasure()) {
- sub_op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+ st.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+ st.actual_pgid.reset_shard(shard_id_t(index));
} else {
- sub_op->target.flags |= CEPH_OSD_FLAG_BALANCE_READS;
+ st.flags |= CEPH_OSD_FLAG_BALANCE_READS;
}
+ st.osd = st.acting[index];
+
+ target.used_replica = (st.acting_primary != st.osd);
+
+ objecter._op_submit(sub_op, sul, &tids[i++]);
- objecter._op_submit_with_budget(sub_op, sul, ptid, ctx_budget);
debug_op_summary("sent_op", sub_op, cct);
}
ceph_assert(split_read->sub_reads.size() > 0);
+ ldout(cct, DBG_LVL) << __func__ << " object_id=" << target.base_oid
+ << " assigned parent tid=" << op->tid << dendl;
+
return true;
}
#pragma once
+#include <algorithm>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <optional>
#include <ostream>
-#include <ranges>
+#include <utility>
+#include <vector>
+ #include <boost/system/error_code.hpp>
+
+#include "include/buffer_fwd.h"
+#include "include/ceph_assert.h"
+#include "include/interval_set.h"
+#include "include/types.h"
+
+#include "common/ceph_context.h"
+#include "common/ceph_mutex.h"
+#include "common/error_code.h"
+#include "common/mini_flat_map.h"
+#include "common/shunique_lock.h"
+
+#include "osd/ECTypes.h"
+#include "osd/osd_types.h"
+
+#include "Objecter.h"
+
+/**
+ * @class SplitOp
+ * @brief Base class for splitting operations across multiple OSDs for improved performance.
+ *
+ * SplitOp implements a pattern for parallelizing operations by distributing them across
+ * multiple OSDs in a pool. This optimization is particularly effective for:
+ * - Erasure-coded pools: operations can be distributed across data shards
+ * - Replicated pools: large operations can be split across replicas
+ *
+ * The split operation pattern works as follows:
+ * 1. Validation: Check if the operation is eligible for splitting
+ * 2. Creation: Create sub-operations targeting different OSDs
+ * 3. Execution: Send sub-operations in parallel
+ * 4. Assembly: Reconstruct results from sub-operation responses
+ * 5. Completion: Return assembled results to the client
+ *
+ * Key features:
+ * - Consistency protection: Ensures all sub-operations see the same object version
+ * - Automatic fallback: Falls back to normal operation if splitting is not beneficial
+ * - Error handling: Properly handles partial failures and version mismatches
+ *
+ * Currently optimized for read operations, with potential for future write support.
+ *
+ * @see ECSplitOp for erasure-coded pool implementation
+ * @see ReplicaSplitOp for replicated pool implementation
+ */
class SplitOp {
protected:
using extents_map = std::map<uint64_t, uint64_t>;
using extent_set = interval_set<uint64_t, std::map, false>;
- using ExtentPredicate = std::function<bool(const extent&)>;
- using extent_map_subrange = std::ranges::subrange<extents_map::const_iterator>;
- using extent_map_subrange_view = std::ranges::take_while_view<extent_map_subrange, ExtentPredicate>;
- using extent_variant = std::variant<std::ranges::single_view<extent>, extent_map_subrange_view, extent_map_subrange>;
- using buffer_appender = std::function<void(bufferlist &, uint64_t *, extent_variant)>;
-
- // A simple struct to hold the data for each step of the iteration.
+ /**
+ * @struct ECChunkInfo
+ * @brief Information about a single chunk in an erasure-coded stripe iteration.
+ *
+ * This structure holds the mapping between logical object offsets and physical
+ * shard locations for a single chunk within an EC stripe. Used by ECStripeIterator
+ * to track position during stripe traversal.
+ */
struct ECChunkInfo {
uint64_t ro_offset;
uint64_t shard_offset;
uint64_t length;
- shard_id_t shard;
+ raw_shard_id_t raw_shard;
- friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &obj) {
+ friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &chunk_info) {
return os
- << "ro_offset: " << obj.ro_offset
- << " shard_offset: " << obj.shard_offset
- << " length: " << obj.length
- << " shard: " << obj.shard;
+ << "ro_offset: " << chunk_info.ro_offset
+ << " shard_offset: " << chunk_info.shard_offset
+ << " length: " << chunk_info.length
+ << " raw_shard: " << (int)chunk_info.raw_shard;
}
};
+ /**
+ * @class ECStripeIterator
+ * @brief Iterator for traversing erasure-coded stripes chunk by chunk.
+ *
+ * ECStripeIterator provides a standard C++ iterator interface for walking through
+ * an erasure-coded stripe, yielding information about each chunk's location in both
+ * the logical object space and the physical shard space.
+ *
+ * The iterator handles the complex mapping between:
+ * - Logical object offsets (ro_offset)
+ * - Physical shard offsets (shard_offset)
+ * - Shard identifiers (raw_shard)
+ * - Chunk boundaries and lengths
+ *
+ * Algorithm:
+ * - Starts at a given offset and iterates through chunks sequentially
+ * - Automatically wraps around shards when reaching the end of a stripe
+ * - Handles partial chunks at the beginning and end of the range
+ * - Terminates when all requested data has been covered
+ *
+ * @note Conforms to std::input_iterator concept
+ */
class ECStripeIterator {
public:
using iterator_category = std::input_iterator_tag;
uint64_t chunk = start_offset / chunk_size;
current_info.length = std::min(total_len, (chunk + 1) * chunk_size - start_offset);
- // Maybe this is paranoia, as compiler would probably detect that this
- // / and % could be done in a single op.
- auto chunk_div = std::lldiv(chunk, data_chunk_count);
- current_info.shard = shard_id_t(chunk_div.rem);
- current_info.shard_offset = (chunk_div.quot) * chunk_size + start_offset % chunk_size;
+ current_info.raw_shard = raw_shard_id_t(chunk % data_chunk_count);
+ current_info.shard_offset = (chunk / data_chunk_count) * chunk_size +
+ start_offset % chunk_size;
}
value_type operator*() const {
current_info.shard_offset += current_info.length - chunk_size;
current_info.length = std::min(chunk_size, end_offset - current_info.ro_offset);
ceph_assert(current_info.ro_offset <= end_offset);
- ++current_info.shard;
- if (unsigned(current_info.shard) == data_chunk_count) {
+ ++current_info.raw_shard;
+ if (std::cmp_equal((int)current_info.raw_shard, data_chunk_count)) {
current_info.shard_offset += chunk_size;
- current_info.shard = shard_id_t(0);
+ current_info.raw_shard = raw_shard_id_t(0);
}
return *this;
}
uint32_t data_chunk_count = 0;
};
-
- // The custom range class that provides begin() and end()
+ /**
+ * @class ECStripeView
+ * @brief Range view for iterating over erasure-coded stripes.
+ *
+ * ECStripeView provides a range-based interface for traversing an erasure-coded
+ * stripe. It encapsulates the stripe geometry (chunk size, data chunk count) and
+ * provides begin()/end() iterators for use in range-based for loops.
+ *
+ * Usage:
+ * @code
+ * ECStripeView stripe_view(offset, length, pool_info);
+ * for (auto chunk_info : stripe_view) {
+ * // Process each chunk
+ * }
+ * @endcode
+ *
+ * @param offset Starting offset in the logical object
+ * @param length Total length to iterate over
+ * @param pi Pool information containing stripe geometry
+ */
class ECStripeView {
public:
ECStripeView(
const pg_pool_t *pi)
: start_offset(offset),
total_length(length),
- data_chunk_count(pi->nonprimary_shards.size() + 1),
+ data_chunk_count(pi->get_ec_data_shard_count()),
chunk_size(pi->get_stripe_width() / data_chunk_count) {
}
static_assert(std::input_iterator<ECStripeIterator>,
"ECStripeIterator does not conform to the std::input_iterator concept");
+ /**
+ * @struct Details
+ * @brief Holds response data and metadata for a single operation in a sub-read.
+ *
+ * Contains the buffer, return value, error code, and optional extent map
+ * returned from executing one operation within a SubRead.
+ */
struct Details {
bufferlist bl;
int rval;
std::optional<extents_map> e;
};
+ /**
+ * @struct InternalVersion
+ * @brief Holds internal version information for torn read protection.
+ *
+ * Used to detect version mismatches across parallel sub-operations,
+ * ensuring all operations see the same object version.
+ */
+ struct InternalVersion {
+ boost::system::error_code ec;
+ bufferlist bl;
+ };
+
+ /**
+ * @struct SubRead
+ * @brief Represents a single sub-operation sent to one OSD.
+ *
+ * Contains the operation descriptor, response details for each operation,
+ * return code, and optional version information for consistency checking.
+ *
+ * @param count Number of operations in this sub-read
+ */
struct SubRead {
::ObjectOperation rd;
mini_flat_map<int, Details> details;
int rc = -EIO;
+ std::optional<InternalVersion> internal_version;
SubRead(int count) : details(count) {}
};
- // This structure self-destructs on each IO completions, using a legacy
- // C++ pattern (no shared_ptr). We use the finish callback to record the
- // RC, but otherwise rely on the shared_ptr destroying ec_read to deal with
- // completion of the parent IO.
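+  /**
+   * @struct Finisher
+   * @brief Completion callback for sub-operations.
+   *
+   * A Finisher is allocated with a raw `new` and self-destructs on IO
+   * completion (the legacy Context pattern), so the Finisher object itself is
+   * not owned by a smart pointer. The finish callback records the return
+   * code. Each Finisher also holds a shared_ptr to the parent SplitOp, and
+   * overall completion of the parent IO is handled through that shared
+   * ownership rather than by the Finisher directly.
+   *
+   * @note Self-destructs when called
+   */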
struct Finisher : Context {
std::shared_ptr<SplitOp> split_read;
SubRead &sub_read;
}
};
- int assemble_rc();
- virtual std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) = 0;
- virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) = 0;
+ int assemble_rc() const;
+ virtual std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) const = 0;
+ virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) const = 0;
virtual void init_read(OSDOp &op, bool sparse, int ops_index) = 0;
+ virtual bool version_mismatch() const = 0;
void init(OSDOp &op, int ops_index);
Objecter::Op *orig_op;
Objecter &objecter;
- mini_flat_map<shard_id_t, SubRead> sub_reads;
+ mini_flat_map<int, SubRead> sub_reads;
CephContext *cct;
- bool abort = false; // Last minute abort... We want to keep this to a minimum.
+
+ /**
+ * Abort flag pattern for split operation creation:
+ *
+ * This flag implements a multi-stage validation and creation pattern that
+ * minimizes wasted work when a split operation cannot be completed:
+ *
+ * 1. Cheap validation tests run first in validate() and create() before
+ * creating the split op object (e.g., checking pool flags, operation types)
+ *
+ * 2. If validation fails, return false immediately without creating split op
+ *
+ * 3. If validation passes, create the split op and begin initialization
+ *
+ * 4. During init_read() and init(), set abort=true if problems are detected
+ * that prevent successful operation (e.g., missing OSDs, invalid state)
+ *
+ * 5. After initialization, check abort flag in create() and discard the
+ * split op if set (return false to fall back to normal operation)
+ *
+ * 6. The complete() method only runs for successfully sent operations where
+ * abort=false, ensuring cleanup only happens for valid split ops
+ *
+ * This pattern ensures expensive initialization work is only done when likely
+ * to succeed, while still catching edge cases that can only be detected during
+ * the creation process itself.
+ */
+ bool abort = false;
int flags = 0;
- std::optional<shard_id_t> primary_shard;
- std::map<shard_id_t, std::vector<int>> op_offset_map;
+ int reference_sub_read = -1;
+ std::map<int, std::vector<int>> op_offset_map;
public:
- SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {}
- virtual ~SplitOp() = default;
- void complete();
- static bool create(Objecter::Op *op, Objecter &objecter,
- shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct);
+ /**
+ * @brief Construct a SplitOp.
+ * @param op Original operation to be split
+ * @param objecter Objecter instance for OSD communication
+ * @param cct CephContext for logging and configuration
+ * @param count Number of sub-operations to create
+ */
+ SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {}
+
+ virtual ~SplitOp() = default;
+
+ /**
+ * @brief Complete the split operation by assembling results.
+ *
+ * Called when all sub-operations have completed. Assembles the results
+ * from individual sub-operations, handles version mismatches, and invokes
+ * the original operation's completion handlers.
+ */
+ void complete();
+
+ /**
+ * @brief Prepare a single-chunk operation for direct execution.
+ *
+ * When an operation can be satisfied by reading from a single OSD (e.g.,
+ * single chunk in EC pool), this method configures the operation for
+ * direct execution without splitting.
+ *
+ * @param op Operation to prepare
+ * @param objecter Objecter instance
+ * @param cct CephContext for logging
+ */
+ static void prepare_single_op(Objecter::Op *op, Objecter &objecter, CephContext *cct);
+
+ /**
+ * @brief Add version tracking to sub-operations for consistency.
+ *
+ * Ensures all sub-operations read the same object version by adding
+ * internal version queries. If versions mismatch, the operation is retried.
+ */
+ void protect_torn_reads();
+
+ /**
+ * @brief Create and initialize a split operation.
+ *
+ * Main entry point for split operation creation. Validates the operation,
+ * creates appropriate sub-operations, and sends them to target OSDs.
+ *
+ * Uses a multi-stage abort pattern:
+ * 1. Cheap validation (pool checks, operation types)
+ * 2. Object creation (may set abort flag)
+ * 3. Initialization (may set abort if OSDs missing)
+ * 4. Final validation before sending
+ *
+ * @param op Operation to potentially split
+ * @param objecter Objecter instance
+ * @param sul Shared lock for OSD map access
+ * @param cct CephContext for logging
+ * @return true if operation was split and sent, false to use normal path
+ */
+ static bool create(Objecter::Op *op, Objecter &objecter,
+ shunique_lock<ceph::shared_mutex>& sul, CephContext *cct);
};
+/**
+ * @class ECSplitOp
+ * @brief Split operation implementation for erasure-coded pools.
+ *
+ * ECSplitOp handles splitting operations across data shards in an erasure-coded
+ * pool. It implements the stripe iteration and buffer assembly logic specific to
+ * EC pools, where data is distributed across multiple shards according to the
+ * erasure coding scheme.
+ *
+ * Key responsibilities:
+ * - Iterate through EC stripes to determine which shards to read
+ * - Assemble data from multiple shards in the correct order
+ * - Handle sparse reads with extent maps
+ * - Verify version consistency across shards
+ *
+ * @see ECStripeIterator for stripe traversal algorithm
+ * @see ECStripeView for range-based stripe iteration
+ */
class ECSplitOp : public SplitOp{
public:
using SplitOp::SplitOp;
- std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
- void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+
+ /**
+ * @brief Assemble sparse read results from EC shards.
+ *
+ * Iterates through the EC stripe, collecting extent maps and data buffers
+ * from each shard in the correct order to reconstruct the logical object view.
+ *
+ * @param ops_index Index of the operation in the operation list
+ * @return Pair of extent set and assembled buffer
+ */
+ std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) const override;
+
+ /**
+ * @brief Assemble dense read results from EC shards.
+ *
+ * Iterates through the EC stripe, collecting data buffers from each shard
+ * in the correct order to reconstruct the contiguous logical object data.
+ *
+ * @param bl_out Output buffer to append assembled data
+ * @param ops_index Index of the operation in the operation list
+ */
+ void assemble_buffer_read(bufferlist &bl_out, int ops_index) const override;
+
+ /**
+ * @brief Initialize read sub-operations for EC pool.
+ *
+ * Determines which shards need to be read based on the stripe geometry,
+ * creates sub-operations for each required shard, and validates that all
+ * target OSDs are available.
+ *
+ * @param op Operation descriptor
+ * @param sparse Whether this is a sparse read
+ * @param ops_index Index of the operation in the operation list
+ */
void init_read(OSDOp &op, bool sparse, int ops_index) override;
- ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count);
+
+ /**
+ * @brief Check for version mismatches across EC shards.
+ *
+ * Compares the internal versions returned by each shard to ensure all
+ * sub-operations read the same object version. Returns true if any
+ * mismatch is detected.
+ *
+ * @return true if versions mismatch, false if consistent
+ */
+ bool version_mismatch() const override;
+
~ECSplitOp() {
complete();
}
};
+/**
+ * @class ReplicaSplitOp
+ * @brief Split operation implementation for replicated pools.
+ *
+ * ReplicaSplitOp handles splitting large operations across replicas in a
+ * replicated pool. It divides the operation into chunks and distributes them
+ * across available replicas for parallel execution.
+ *
+ * Key responsibilities:
+ * - Divide operations into appropriately-sized chunks
+ * - Distribute chunks across available replicas
+ * - Assemble results in the correct order
+ * - Verify version consistency across replicas
+ *
+ * The chunk size is determined by configuration and the number of available
+ * replicas, with a minimum threshold to ensure splitting is beneficial.
+ */
class ReplicaSplitOp : public SplitOp {
public:
using SplitOp::SplitOp;
- std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
- void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+
+ /**
+ * @brief Assemble sparse read results from replicas.
+ *
+ * Collects extent maps and data buffers from each replica sub-operation
+ * and combines them into a single result.
+ *
+ * @param ops_index Index of the operation in the operation list
+ * @return Pair of extent set and assembled buffer
+ */
+ std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) const override;
+
+ /**
+ * @brief Assemble dense read results from replicas.
+ *
+ * Collects data buffers from each replica sub-operation and appends them
+ * in order to reconstruct the complete result.
+ *
+ * @param bl_out Output buffer to append assembled data
+ * @param ops_index Index of the operation in the operation list
+ */
+ void assemble_buffer_read(bufferlist &bl_out, int ops_index) const override;
+
+ /**
+ * @brief Initialize read sub-operations for replicated pool.
+ *
+ * Divides the operation into chunks and creates sub-operations targeting
+ * different replicas. The chunk size and distribution are determined by
+ * configuration and the number of available OSDs.
+ *
+ * @param op Operation descriptor
+ * @param sparse Whether this is a sparse read
+ * @param ops_index Index of the operation in the operation list
+ */
void init_read(OSDOp &op, bool sparse, int ops_index) override;
- ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size);
+
+ /**
+ * @brief Check for version mismatches across replicas.
+ *
+ * Compares the versions returned by each replica to ensure all
+ * sub-operations read the same object version. Returns true if any
+ * mismatch is detected.
+ *
+ * @return true if versions mismatch, false if consistent
+ */
+ bool version_mismatch() const override;
+
+ /**
+ * @brief Construct a ReplicaSplitOp.
+ * @param op Original operation to be split
+ * @param objecter Objecter instance
+ * @param cct CephContext for logging
+ * @param pool_size Number of replicas in the pool
+ */
+ ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) :
+ SplitOp(op, objecter, cct, pool_size) {}
+
~ReplicaSplitOp() {
complete();
}