]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: Refactor SplitOp
authorAlex Ainscow <aainscow@uk.ibm.com>
Thu, 5 Feb 2026 14:45:04 +0000 (14:45 +0000)
committerJon Bailey <jonathan.bailey1@ibm.com>
Thu, 28 May 2026 14:15:50 +0000 (15:15 +0100)
There are large number of changes in this commit which were found through
development and testing of split ops.

I have split out all the objecter updates carefully, but since the split op
code is not currently used in production, I have not documented every change
and made significant refactors/rearrangements.

Signed-off-by: Alex Ainscow <aainscow@uk.ibm.com>
src/osdc/SplitOp.cc
src/osdc/SplitOp.h

index 5363f1000eccb5cbbafde5f89d82fd13de43c5aa..50e1b2e75e4958488755b84209485a879cbaf415 100644 (file)
@@ -9,10 +9,12 @@
  * Foundation.  See file COPYING.
  */
 
-#include "osdc/Objecter.h"
 #include "osdc/SplitOp.h"
+#include "osdc/Objecter.h"
 #include "osd/osd_types.h"
 
+using namespace std::literals;
+
 #define dout_subsys ceph_subsys_objecter
 #define DBG_LVL 20
 
@@ -22,41 +24,54 @@ inline boost::system::error_code osdcode(int r) {
 }
 }
 
-constexpr static uint64_t kReplicaMinShardReadSize = 128 * 1024;
 constexpr static uint64_t kReplicaMinShardReads = 2;
-constexpr static uint64_t kReplicaMinReadSize = kReplicaMinShardReadSize * kReplicaMinShardReads;
 
 #undef dout_prefix
 #define dout_prefix *_dout << " ECSplitOp::"
 
-std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_read(int ops_index) {
+/**
+ * @brief Assemble sparse read results from EC shards into logical object view.
+ *
+ * Iterates through the EC stripe using ECStripeView, collecting extent maps
+ * and data buffers from each shard. The resulting buffer is built up in order,
+ * only extending (never inserting into the middle), which maintains buffer
+ * efficiency and allows for zero-copy operations where possible.
+ *
+ * @param ops_index Index of the operation in the operation list
+ * @return Pair containing the extent set and assembled buffer list
+ */
+std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_read(int ops_index) const {
   bufferlist bl_out;
   extent_set extents_out;
 
-
   auto &orig_osd_op = orig_op->ops[ops_index].op;
   const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+  ceph_assert(pi);
   ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
-  ldout(cct, DBG_LVL) << __func__ << " start:"
+  ldout(cct, DBG_LVL) << __func__ << " START -->" << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent="
     << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
 
   std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
-  mini_flat_map<shard_id_t, extents_map::iterator> map_iterators(stripe_view.data_chunk_count);
+  mini_flat_map<int, extents_map::const_iterator> map_iterators(stripe_view.data_chunk_count);
 
   for (auto &&chunk_info : stripe_view) {
-    ldout(cct, DBG_LVL) << __func__ << " chunk: " << chunk_info << dendl;
-    auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
-
-    if (!map_iterators.contains(chunk_info.shard)) {
-      map_iterators.emplace(chunk_info.shard, details.e->begin());
+    shard_id_t shard = pi->get_shard(chunk_info.raw_shard);
+    ceph_assert(shard != shard_id_t::NO_SHARD);
+    int shard_index = (int)shard;
+    ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " chunk: " << chunk_info
+        << " shard: " << shard << dendl;
+    auto &details = sub_reads.at(shard_index).details.at(ops_index);
+
+    if (!map_iterators.contains(shard_index)) {
+      map_iterators.emplace(shard_index, details.e->begin());
     }
 
-    extents_map::iterator &extent_iter = map_iterators.at(chunk_info.shard);
+    extents_map::const_iterator &extent_iter = map_iterators.at(shard_index);
 
     uint64_t bl_len = 0;
     while (extent_iter != details.e->end() && extent_iter->first < chunk_info.ro_offset + stripe_view.chunk_size) {
       auto [off, len] = *extent_iter;
-      ldout(cct, DBG_LVL) << __func__ << " extent=" << off << "~" <<  len << dendl;
+      ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent=" << off << "~" <<  len << dendl;
       extents_out.insert(off, len);
       bl_len += len;
       ++extent_iter;
@@ -65,138 +80,213 @@ std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_rea
     // We try to keep the buffers together where possible.
     if (bl_len != 0) {
       bufferlist bl;
-      bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], bl_len);
+      bl.substr_of(details.bl, buffer_offset[(int)chunk_info.raw_shard], bl_len);
       bl_out.append(bl);
-      buffer_offset[(int)chunk_info.shard] += bl_len;
+      buffer_offset[(int)chunk_info.raw_shard] += bl_len;
     }
   }
 
+  ldout(cct, DBG_LVL) << __func__ << " END -->" << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent="
+    << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
   return std::pair(extents_out, bl_out);
 }
 
-void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+/**
+ * @brief Assemble dense read results from EC shards into contiguous buffer.
+ *
+ * Iterates through the EC stripe, extracting data from each shard in order.
+ * The output buffer is built sequentially by appending data, never inserting
+ * into the middle, which maintains buffer efficiency.
+ *
+ * @param bl_out Output buffer to append assembled data
+ * @param ops_index Index of the operation in the operation list
+ */
+void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) const {
   auto &orig_osd_op = orig_op->ops[ops_index].op;
   const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+  ceph_assert(pi);
   ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
 
   std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
-  ldout(cct, DBG_LVL) << __func__ << " " << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+  ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " extent="
+    << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
 
   for (auto &&chunk_info : stripe_view) {
-    ldout(cct, DBG_LVL) << __func__ << " chunk info " << chunk_info << dendl;
-    auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+    ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " tid=" << orig_op->tid << " chunk info " << chunk_info << dendl;
+    shard_id_t shard = pi->get_shard(chunk_info.raw_shard);
+    ceph_assert(shard != shard_id_t::NO_SHARD);
+    int shard_index = (int)shard;
+    auto &details = sub_reads.at(shard_index).details.at(ops_index);
+    uint64_t src_len = details.bl.length();
+    uint64_t buf_off = buffer_offset[(int)chunk_info.raw_shard];
+    if (src_len <= buf_off) {
+      // Early termination: we've exhausted the available buffer data.
+      // This is a normal completion scenario, not an error condition.
+      break;
+    }
+    uint64_t len = std::min(chunk_info.length, src_len - buf_off);
     bufferlist bl;
-    bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], chunk_info.length);
+    bl.substr_of(details.bl, buf_off, len);
     bl_out.append(bl);
-    buffer_offset[(int)chunk_info.shard] += chunk_info.length;
+    buffer_offset[(int)chunk_info.raw_shard] += len;
   }
 }
 
+/**
+ * @brief Initialize read sub-operations for erasure-coded pool.
+ *
+ * Determines which EC shards need to be read based on stripe geometry and
+ * creates sub-operations for each required shard. Sets the abort flag if
+ * any required OSD is unavailable.
+ *
+ * Primary shard is required when reading from multiple chunks, when version
+ * information is requested, or when non-read operations are present.
+ *
+ * @param op Operation descriptor containing offset and length
+ * @param sparse Whether this is a sparse read operation
+ * @param ops_index Index of the operation in the operation list
+ */
 void ECSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
-  auto &t = orig_op->target;
-  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+  auto &target = orig_op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
+  ceph_assert(pi);
+
   uint64_t offset = op.op.extent.offset;
   uint64_t length = op.op.extent.length;
-  uint64_t data_chunk_count = pi->nonprimary_shards.size() + 1;
+  uint64_t data_chunk_count = pi->get_ec_data_shard_count();
   uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count;
   uint64_t start_chunk = offset / chunk_size;
   // This calculation is wrong for length = 0, but such IOs should not have
-  // reached here!
+  // reached here! Zero-length reads are rejected earlier in the validate()
+  // function (see lines 551-556), which returns {false, false} for any
+  // operation with length == 0, preventing it from being split and processed.
   ceph_assert( op.op.extent.length != 0);
+  // No overflow: length >= 1 (asserted above), so offset + length - 1 <= offset + length.
+  // Since offset and length are uint64_t, their sum cannot exceed UINT64_MAX in practice
+  // due to object size limits and memory constraints. Even if offset + length == UINT64_MAX + 1
+  // (impossible in uint64_t), subtracting 1 brings it back to UINT64_MAX (valid).
   uint64_t end_chunk = (offset + op.op.extent.length - 1) / chunk_size;
 
   unsigned count = std::min(data_chunk_count, end_chunk - start_chunk + 1);
-  //FIXME: This is not quite right - the ops.size() > 1 does not necessarily mean
-  // that the primary is required - it could be two reads to the same shard.
-  bool primary_required = count > 1 || orig_op->objver || orig_op->ops.size() > 1;
+  // Primary is required if:
+  // 1. Reading from multiple chunks (count > 1)
+  // 2. Version information is needed (orig_op->objver)
+  // 3. There are non-read operations that need the primary
+  bool has_non_read_ops = false;
+  for (const auto &op : orig_op->ops) {
+    if (op.op.op != CEPH_OSD_OP_READ && op.op.op != CEPH_OSD_OP_SPARSE_READ) {
+      has_non_read_ops = true;
+      break;
+    }
+  }
+  bool primary_required = count > 1 || orig_op->objver || has_non_read_ops;
 
   int first_shard = start_chunk % data_chunk_count;
   // Check all shards are online.
   for (unsigned i = first_shard; i < first_shard + count; i++) {
-    shard_id_t shard(i >= data_chunk_count ? i - data_chunk_count : i);
-    int direct_osd = t.acting[(int)shard];
-    // TODO: This is broken because there might be multiple shards on one OSD.
-    // Will fix in the next PR.
-    if (t.acting_primary == direct_osd) {
-      primary_shard.emplace(shard);
+    raw_shard_id_t raw_shard(i >= data_chunk_count ? i - data_chunk_count : i);
+    shard_id_t shard(pi->get_shard(raw_shard));
+    if (shard == shard_id_t::NO_SHARD) {
+      abort = true;
+      return;
+    }
+    int shard_index = (int)shard;
+
+    int direct_osd = target.acting[shard_index];
+    if (target.actual_pgid.shard == shard) {
+      reference_sub_read = shard_index;
     }
     if (!objecter.osdmap->exists(direct_osd)) {
       ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl;
       abort = true;
       return;
     }
-    if (!sub_reads.contains(shard)) {
-      sub_reads.emplace(shard, orig_op->ops.size());
+    if (!sub_reads.contains(shard_index)) {
+      sub_reads.emplace(shard_index, orig_op->ops.size() + 1);
     }
-    auto &d = sub_reads.at(shard).details[ops_index];
+    auto &d = sub_reads.at(shard_index).details[ops_index];
     if (sparse) {
       d.e.emplace();
-      sub_reads.at(shard).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval);
+      sub_reads.at(shard_index).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval);
     } else {
-      sub_reads.at(shard).rd.read(offset, length, &d.ec, &d.bl);
+      sub_reads.at(shard_index).rd.read(offset, length, &d.ec, &d.bl);
     }
   }
 
-  if (primary_required && !primary_shard) {
-    for (unsigned i=0; i < t.acting.size(); ++i) {
-      // TODO: Can't compare OSDs for EC as there may be more than one shard on
-      //       each OSD.
-      // Will fix in next PR.
-      if (t.acting[i] == t.acting_primary) {
-        primary_shard.emplace(i);
-        sub_reads.emplace(*primary_shard, orig_op->ops.size());
-      }
-    }
-
-    // No primary???  Let the normal code paths deal with this.
-    if (!primary_shard) {
-      ldout(cct, DBG_LVL) << __func__ <<" ABORT: Can't find primary" << dendl;
-      abort = true;
-    }
+  if (primary_required && reference_sub_read == -1) {
+    // _calc_target will have picked the primary by default on EC. The "primary"
+    // on replica is an arbitrary shard.
+    reference_sub_read = (int)target.actual_pgid.shard;
+    sub_reads.emplace(reference_sub_read, orig_op->ops.size() + 1);
   }
+  
+  ceph_assert(reference_sub_read != -1);
 }
 
-ECSplitOp::ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) :
-  SplitOp(op, objecter, cct, count) {}
-
 #undef dout_prefix
 #define dout_prefix *_dout << " ReplicaSplitOp::"
 
-std::pair<SplitOp::extent_set, bufferlist> ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) {
+/**
+ * @brief Assemble sparse read results from replicas.
+ *
+ * Collects extent maps and buffers from each sub-operation. Results are
+ * assembled in order by appending, maintaining buffer efficiency.
+ *
+ * @param ops_index Index of the operation in the operation list
+ * @return Pair containing the combined extent set and buffer list
+ */
+std::pair<SplitOp::extent_set, bufferlist> ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) const {
   extent_set extents_out;
   bufferlist bl_out;
 
-  for (auto && [shard, sr] : sub_reads) {
-    for (auto [off, len] : *sr.details[ops_index].e) {
+  for (auto && [acting_index, sr] : sub_reads) {
+    for (auto [off, len] : *sr.details.at(ops_index).e) {
       extents_out.insert(off, len);
     }
-    bl_out.append(sr.details[ops_index].bl);
+    bl_out.append(sr.details.at(ops_index).bl);
   }
 
   return std::pair(extents_out, bl_out);
 }
 
-void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
-  for (auto && [_, sr] : sub_reads) {
-    bl_out.append(sr.details[ops_index].bl);
+/**
+ * @brief Assemble dense read results from replicas.
+ *
+ * Appends buffers from each sub-operation in order.
+ *
+ * @param bl_out Output buffer to append assembled data
+ * @param ops_index Index of the operation in the operation list
+ */
+void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) const {
+  for (auto && [acting_index, sr] : sub_reads) {
+    bl_out.append(sr.details.at(ops_index).bl);
   }
 }
 
-ReplicaSplitOp::ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) :
-  SplitOp(op, objecter, cct, pool_size) {
-
-  // This may not actually be the primary, but since all shards are kept current
-  // in replica, it does not actually matter which we choose here. Choose 0 since
-  // there will always be a read to this shard.
-  primary_shard = shard_id_t(0);
-}
-
+/**
+ * @brief Initialize read sub-operations for replicated pool.
+ *
+ * Divides the operation into chunks and distributes them across available
+ * replicas for parallel execution. Chunk size is calculated based on the
+ * minimum shard read size configuration and number of available OSDs.
+ * Chunks are aligned to page boundaries for efficiency.
+ *
+ * If the operation is too small to benefit from splitting across all replicas,
+ * fewer OSDs are used with a random starting offset for load balancing.
+ *
+ * @param op Operation descriptor containing offset and length
+ * @param sparse Whether this is a sparse read operation
+ * @param ops_index Index of the operation in the operation list
+ */
 void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
 
-  auto &t = orig_op->target;
+  auto &target = orig_op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
+  ceph_assert(pi);
 
   std::set<int> osds;
-  for (int direct_osd : t.acting) {
+  for (int direct_osd : target.acting) {
     if (objecter.osdmap->exists(direct_osd)) {
       osds.insert(direct_osd);
     }
@@ -208,18 +298,31 @@ void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
     return;
   }
 
+  uint64_t replica_min_shard_read_size
+    = objecter.get_min_split_replica_read_size();
+
   uint64_t offset = op.op.extent.offset;
   uint64_t length = op.op.extent.length;
-  uint64_t slice_count = std::min(length / kReplicaMinShardReadSize, osds.size());
+  uint64_t slice_count = replica_min_shard_read_size == 0 ? 1 :
+                          std::min(length / replica_min_shard_read_size,
+                                   osds.size());
   uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE);
+  unsigned start = 0;
 
-  for (unsigned i = 0; i < osds.size() && length > 0; i++) {
+  if (slice_count < osds.size()) {
+    start = rand() % osds.size();
+  }
 
-    shard_id_t shard(i);
-    if (!sub_reads.contains(shard)) {
-      sub_reads.emplace(shard, orig_op->ops.size());
+  for (unsigned i = start; length > 0; i = (i + 1 == osds.size()) ? 0 : i + 1) {
+    int acting_index = i;
+    if (!sub_reads.contains(acting_index)) {
+      sub_reads.emplace(acting_index, orig_op->ops.size() + 1);
+      // Set reference_sub_read to the first index we use
+      if (reference_sub_read == -1) {
+        reference_sub_read = acting_index;
+      }
     }
-    auto &sr = sub_reads.at(shard);
+    auto &sr = sub_reads.at(acting_index);
     auto bl = &sr.details[ops_index].bl;
     auto rval = &sr.details[ops_index].rval;
     uint64_t len = std::min(length, chunk_size);
@@ -237,34 +340,149 @@ void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
 #undef dout_prefix
 #define dout_prefix *_dout << " SplitOp::"
 
-int SplitOp::assemble_rc() {
+/**
+ * @brief Assemble the final return code from all sub-operations.
+ *
+ * Returns the first non-EAGAIN error encountered, or EAGAIN if any
+ * sub-operation returned EAGAIN or if version mismatch is detected.
+ * Otherwise returns the reference sub-operation's return code.
+ *
+ * @return Combined return code for the operation
+ */
+int SplitOp::assemble_rc() const {
   int rc = 0;
-  bool rc_zero = false;
-
-  // This should only happen on a single thread.
-  for (auto & [_, sub_read] : sub_reads) {
-    if (rc >= 0 && sub_read.rc >= 0) {
-      rc += sub_read.rc;
-      if (sub_read.rc == 0) {
-        rc_zero = true;
-      }
-    } else if (rc >= 0) {
+  bool eagain = false;
+
+  // Sub-reads which use a single op should re-use original op.
+  ceph_assert(sub_reads.size() > 1);
+
+  // Pick the first bad RC, otherwise return 0.
+  for (auto & [index, sub_read] : sub_reads) {
+    if (sub_read.rc == -EAGAIN) {
+      eagain = true;
+    } else if (sub_read.rc < 0) {
+      return sub_read.rc;
+    }
+
+    // The non-reference indices only get reads, which only ever have zero RCs.
+    if (index == reference_sub_read) {
       rc = sub_read.rc;
-    } // else ignore subsequent errors.
+    }
   }
 
-  if (rc >= 0 && rc_zero) {
-    return 0;
+  if (eagain || version_mismatch()) {
+    return -EAGAIN;
   }
-
   return rc;
 }
 
+/**
+ * @brief Check for version mismatches across EC shards.
+ *
+ * Implements torn read detection by comparing internal versions returned
+ * by each shard. The reference shard maintains a map of all shard versions,
+ * allowing detection of inconsistencies across the EC stripe.
+ *
+ * @return true if versions mismatch, false if consistent
+ */
+bool ECSplitOp::version_mismatch() const {
+  // First we need to decode the version list from the reference.
+  ceph_assert(reference_sub_read != -1);
+  ceph_assert(sub_reads.at(reference_sub_read).internal_version.has_value());
+
+  std::map<shard_id_t, eversion_t> ref_vers;
+  decode(ref_vers, sub_reads.at(reference_sub_read).internal_version->bl);
+
+  for (auto & [shard_index, sub_read] : sub_reads) {
+      // Reference shard can't be different to itself.
+    if (shard_index == reference_sub_read) {
+      continue;
+    }
+
+    ceph_assert(sub_read.internal_version.has_value());
+    std::map<shard_id_t, eversion_t> shard_vers;
+    decode(shard_vers, sub_read.internal_version->bl);
+
+    shard_id_t shard(shard_index);
+    if (!ref_vers.contains(shard)) {
+      ldout(cct, DBG_LVL) << __func__ << ": "
+         << "Reference shard version missing, failing split op." << dendl;
+      return true;
+    }
+    if (!shard_vers.contains(shard)) {
+      ldout(cct, DBG_LVL) << __func__ << ": "
+         << "Shard version missing, failing split op." << dendl;
+      return true;
+    }
+    if (ref_vers.at(shard) != shard_vers.at(shard)) {
+      ldout(cct, DBG_LVL) << __func__ << ": "
+               << "Primary version (" << ref_vers.at(shard) << ") != "
+               << "shard version (" << shard_vers.at(shard) <<") "
+               << "for shard " << shard << dendl;
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * @brief Check for version mismatches across replicas.
+ *
+ * For replicated pools, checks that all replicas returned the same version.
+ * No single replica maintains a reference version list, so the first replica's
+ * version becomes the reference for comparison.
+ *
+ * @return true if versions mismatch, false if consistent
+ */
+bool ReplicaSplitOp::version_mismatch() const {
+  std::optional<eversion_t> ref_version;
+  constexpr shard_id_t NO_SHARD(-1);
+
+  for (const auto& [acting_index, sub_read] : sub_reads) {
+    ceph_assert(sub_read.internal_version.has_value());
+    std::map<shard_id_t, eversion_t> shard_vers;
+    decode(shard_vers, sub_read.internal_version->bl);
+
+    if (!shard_vers.contains(NO_SHARD)) {
+      ldout(cct, DBG_LVL) << __func__ << ": "
+        << "Replica version missing for acting index, failing split op." << dendl;
+      return true;
+    }
+
+    if (!ref_version) {
+      ref_version = shard_vers.at(NO_SHARD);
+    } else if (*ref_version != shard_vers.at(NO_SHARD)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+/**
+ * @brief Complete the split operation by assembling and returning results.
+ *
+ * Assembles results from all sub-operations and invokes the original operation's
+ * completion handlers. For successful operations, results are assembled based on
+ * operation type (SPARSE_READ, READ, or other). For failures or version mismatches,
+ * triggers retry through the normal operation path.
+ *
+ * The buffer handling maintains compatibility with librados expectations, including
+ * special handling for pre-allocated buffers used by RadosStriper.
+ *
+ * @note Only called for successfully sent operations where abort=false
+ */
 void SplitOp::complete() {
+  // STAGE 6: complete() only runs for successfully sent operations.
+  // If abort was set during creation, the split op was discarded and
+  // complete() is never called. This check handles the edge case where
+  // abort might be set after creation but before sending (though this
+  // should not happen in the current implementation).
   if (abort) {
     return;
   }
-  ldout(cct, 20) << __func__ << " entry this=" << this << dendl;
+  ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " entry this=" << this << dendl;
+  boost::system::error_code handler_error;
 
   int rc = assemble_rc();
   if (rc >= 0) {
@@ -274,6 +492,13 @@ void SplitOp::complete() {
     // so as to reproduce as much as possible of the IO completion.
     std::vector out_ops(orig_op->ops.begin(), orig_op->ops.end());
 
+    std::optional<bufferlist> read_bl;
+    if (orig_op->outbl) {
+      read_bl = bufferlist();
+    }
+
+    // Copy is necessary: process_op_reply_handlers() reads out_ops while
+    // modifying orig_op state. Overhead is acceptable for typical 1-3 ops.
     for (unsigned ops_index=0; ops_index < out_ops.size(); ++ops_index) {
       auto &out_osd_op = out_ops[ops_index];
       switch (out_osd_op.op.op) {
@@ -281,104 +506,307 @@ void SplitOp::complete() {
           auto [extents, bl] = assemble_buffer_sparse_read(ops_index);
           encode(std::move(extents).detach(), out_osd_op.outdata);
           encode_destructively(bl, out_osd_op.outdata);
+          if (read_bl) {
+            read_bl->append(out_osd_op.outdata);
+          }
           break;
         }
         case CEPH_OSD_OP_READ: {
           assemble_buffer_read(out_osd_op.outdata, ops_index);
+          if (read_bl) {
+            read_bl->append(out_osd_op.outdata);
+          }
           break;
         }
-        case CEPH_OSD_OP_GETXATTRS:
-        case CEPH_OSD_OP_CHECKSUM:
-        case CEPH_OSD_OP_GETXATTR: {
-          out_osd_op.outdata = sub_reads.at(*primary_shard).details[ops_index].bl;
-          out_osd_op.rval = sub_reads.at(*primary_shard).details[ops_index].rval;
+        default: {
+          out_osd_op.outdata = std::move(sub_reads.at(reference_sub_read).details[ops_index].bl);
+          out_osd_op.rval = sub_reads.at(reference_sub_read).details[ops_index].rval;
           break;
         }
-      default: {
-          ceph_abort_msg("Not supported");
-          break;
+      }
+    }
+
+    // Copied from Objecter::handle_osd_op_reply() to match correct API behaviour.
+    // This is policed in the librados test harness.
+    if (orig_op->outbl) {
+      ceph_assert(read_bl);
+      // Note: Objecter will check for a single buffer here - this will always
+      //       be true. Here, there will frequently be multiple buffers due
+      //       to the splitting and the original buffer still needs to be
+      //       honoured.
+      if (orig_op->outbl->length() == read_bl->length()) {
+        // this is here to keep previous users to *relied* on getting data
+        // read into existing buffers happy.  Notably,
+        // libradosstriper::RadosStriperImpl::aio_read().
+        ldout(cct,10) << __func__ << " copying resulting " << read_bl->length()
+                      << " into existing ceph::buffer of length " << orig_op->outbl->length()
+                      << dendl;
+        // The following seems a little convoluted, but the assumption is that
+        // there is a good reason why Sage Weil wrote it this way in Objecter.
+        bufferlist t;
+        t = std::move(*orig_op->outbl);
+        t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
+        read_bl->begin().copy(read_bl->length(), t.c_str());
+        orig_op->outbl->substr_of(t, 0, read_bl->length());
+      } else {
+        // librados insists that if it provided a buffer and the client is
+        // going to be returning a buffer, that this buffer must be
+        // contiguous.
+        if (orig_op->outbl->length() != 0) {
+          read_bl->rebuild();
         }
+        orig_op->outbl->substr_of(*read_bl, 0, read_bl->length());
       }
+      orig_op->outbl = 0;
     }
 
-    objecter.handle_osd_op_reply2(orig_op, out_ops);
+    handler_error = objecter.process_op_reply_handlers(orig_op, out_ops);
+    ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " success this=" << this << " rc=" << rc << dendl;
+  } else {
+    ldout(cct, DBG_LVL) << __func__ << " object_id=" << orig_op->target.base_oid << " retry this=" << this << " rc=" << rc << dendl;
+  }
+  objecter.op_post_split_op_complete(orig_op, handler_error, rc);
+}
+
+/**
+ * @brief Add version tracking to sub-operations for torn read protection.
+ *
+ * Adds internal version queries to all sub-operations to enable detection of
+ * version mismatches. If sub-operations read different versions of the object
+ * (torn read), the operation will be retried.
+ */
+void SplitOp::protect_torn_reads() {
+  // If multiple reads are emitted from objecter, then it is essential that
+  // each read reads the same version. It is not possible to efficiently
+  // guarantee this, so instead read the version along with the data and if they
+  // are different, then repeat the read to the primary. Such version mismatches
+  // should be rare enough that this is not a significant performance impact.
+  for (auto&& [index, sr] : sub_reads) {
+    auto &internal_version = sr.internal_version;
+    internal_version = std::make_optional<InternalVersion>();
+    sr.rd.get_internal_versions(&internal_version->ec, &internal_version->bl);
+  }
+}
 
-    ldout(cct, DBG_LVL) << __func__ << " success this=" << this << " rc=" << rc << dendl;
-    Objecter::Op::complete(std::move(orig_op->onfinish), osdcode(rc), rc, objecter.service.get_executor());
-    objecter._finish_op(orig_op, rc);
+void SplitOp::init(OSDOp &op, int ops_index) {
+  switch (op.op.op) {
+  case CEPH_OSD_OP_SPARSE_READ: {
+    init_read(op, true, ops_index);
+    break;
+  }
+  case CEPH_OSD_OP_READ: {
+    init_read(op, false, ops_index);
+    break;
+  }
+  default: {
+    // Invalid ops should have been rejected in validate.
+    Details &d = sub_reads.at(reference_sub_read).details[ops_index];
+    orig_op->pass_thru_op(sub_reads.at(reference_sub_read).rd, ops_index, &d.bl, &d.rval);
+    break;
+  }
+  }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " SplitOp::"
+
+namespace {
+std::pair<bool, bool> is_single_chunk(const pg_pool_t *pi, uint64_t offset, uint64_t len) {
+  if (!pi->is_erasure()) {
+    return {false, false};
+  }
+
+  uint64_t stripe_width = pi->get_stripe_width();
+
+  // Optimization: Use stripe_width / 2 as a threshold to quickly reject requests
+  // that cannot fit in a single chunk. Since k (data chunks) is at least 2,
+  // chunk_size = stripe_width / k <= stripe_width / 2. This early check avoids
+  // the more expensive division operation (stripe_width / data_chunk_count) below.
+  if (len > stripe_width / 2) {
+    return {false, false};
+  }
+  uint64_t data_chunk_count = pi->get_ec_data_shard_count();
+  uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count;
+
+  // Chunk_size should never be zero, so this is paranoia.
+  if (len > chunk_size || chunk_size == 0) {
+    return {false, false};
+  }
+
+  uint64_t offset_to_end_of_chunk;
+
+  // Chunk size is normally, but not always a power of 2.
+  if (std::has_single_bit(chunk_size)) {
+    offset_to_end_of_chunk = chunk_size - (offset & (chunk_size - 1));
   } else {
-    ldout(cct, DBG_LVL) << __func__ << " retry this=" << this << " rc=" << rc << dendl;
-    objecter.op_post_submit(orig_op);
+    offset_to_end_of_chunk = chunk_size - (offset % chunk_size);
   }
+
+  if (len > offset_to_end_of_chunk) {
+    return {false, false};
+  }
+
+  return {true, offset % stripe_width < chunk_size};
 }
 
-static bool validate(Objecter::Op *op, bool is_erasure, CephContext *cct) {
+/**
+ * Validate operation flags for split operation eligibility.
+ *
+ * Checks that the operation has the required BALANCE_READS flag and
+ * is not a write operation.
+ *
+ * @param pi The pool to validate against
+ * @param op The operation to validate
+ * @param cct CephContext for logging
+ * @return true if flags are valid, false otherwise
+ */
+bool validate_flags(const pg_pool_t *pi, Objecter::Op *op, CephContext *cct) {
+  if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0) {
+    ldout(cct, DBG_LVL) << __func__ << " REJECT: Client rejects balanced read" << dendl;
+    return false;
+  }
+
+  // We really should not have a WRITE flagged as balanced read, but out of
+  // paranoia ignore it.
+  if (op->target.flags & CEPH_OSD_FLAG_WRITE) {
+    ldout(cct, DBG_LVL) << __func__ << " REJECT: Flagged as write!" << dendl;
+    return false;
+  }
 
-  if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0 ) {
-    ldout(cct, DBG_LVL) << __func__ <<" REJECT: Client rejects balanced read" << dendl;
+  if (pi->has_flag(pg_pool_t::FLAG_CRIMSON)) {
+    ldout(cct, DBG_LVL) << __func__ <<" ABORT: Crimson doesn't support"
+                                      " direct reads" << dendl;
     return false;
   }
 
-  uint64_t suitable_read_found = false;
-  for (auto & o : op->ops) {
+  return true;
+}
+
+/**
+ * Validate operation types and check read size constraints.
+ *
+ * Processes all operations in the op list, checking for:
+ * - Supported operation types (READ, SPARSE_READ, and various metadata ops)
+ * - Valid read sizes (non-zero length, meets minimum size requirements)
+ * - Single chunk constraints for erasure coded pools
+ *
+ * @param op The operation to validate
+ * @param pi Pool information
+ * @param is_erasure Whether the pool is erasure coded
+ * @param replica_min_read_size Minimum read size for replica pools
+ * @param cct CephContext for logging
+ * @param[out] has_primary_ops Set to true if primary-only ops are found
+ * @param[out] single_direct_op Set to true if op can be sent directly to single OSD
+ * @return true if a suitable read operation was found, false otherwise
+ */
+bool validate_operations(Objecter::Op *op, const pg_pool_t *pi, bool is_erasure,
+                        uint64_t replica_min_read_size, CephContext *cct,
+                        bool &has_primary_ops, bool &single_direct_op) {
+  bool is_first_chunk = true;
+  bool suitable_read_found = false;
+
+  for (auto &o : op->ops) {
     switch (o.op.op) {
       case CEPH_OSD_OP_READ:
       case CEPH_OSD_OP_SPARSE_READ: {
         uint64_t length = o.op.extent.length;
+        if (length == 0) {
+          // length of zero actually means "the whole object". This code cannot
+          // know the size of the object efficiently, so reject the op.
+          ldout(cct, DBG_LVL) << __func__ << " REJECT: Zero length read" << dendl;
+          return false;
+        }
         if ((is_erasure && length > 0) ||
-            (!is_erasure && length >= kReplicaMinReadSize)) {
+            (!is_erasure && length >= replica_min_read_size)) {
           suitable_read_found = true;
         }
+        if (single_direct_op) {
+          auto [single_chunk, first_chunk] = is_single_chunk(pi, o.op.extent.offset, o.op.extent.length);
+          is_first_chunk = is_first_chunk && first_chunk;
+          single_direct_op = single_direct_op && single_chunk;
+        }
         break;
       }
       case CEPH_OSD_OP_GETXATTRS:
       case CEPH_OSD_OP_CHECKSUM:
-      case CEPH_OSD_OP_GETXATTR: {
+      case CEPH_OSD_OP_GETXATTR:
+      case CEPH_OSD_OP_STAT:
+      case CEPH_OSD_OP_CMPXATTR:
+      case CEPH_OSD_OP_CALL: {
+        has_primary_ops = true;
         break; // Do not block validate.
       }
       default: {
-        ldout(cct, DBG_LVL) << __func__ <<" REJECT: unsupported op" << dendl;
+        ldout(cct, DBG_LVL) << __func__ << " REJECT: unsupported op" << dendl;
         return false;
       }
     }
   }
 
+  if (single_direct_op && has_primary_ops) {
+    single_direct_op = is_first_chunk;
+  }
+
   return suitable_read_found;
 }
 
-void SplitOp::init(OSDOp &op, int ops_index) {
-  switch (op.op.op) {
-    case CEPH_OSD_OP_SPARSE_READ: {
-      init_read(op, true, ops_index);
-      break;
-    }
-    case CEPH_OSD_OP_READ: {
-      init_read(op, false, ops_index);
-      break;
-    }
-    case CEPH_OSD_OP_GETXATTRS:
-    case CEPH_OSD_OP_CHECKSUM:
-    case CEPH_OSD_OP_GETXATTR: {
-      shard_id_t shard = *primary_shard;
-      Details &d = sub_reads.at(shard).details[ops_index];
-      orig_op->pass_thru_op(sub_reads.at(shard).rd, ops_index, &d.bl, &d.rval);
-      break;
-    }
-    default: {
-      ldout(cct, DBG_LVL) << __func__ <<" ABORT: unsupported" << dendl;
-      abort = true;
-      break;
-    }
+/**
+ * Validate if an operation is eligible for split operation optimization.
+ *
+ * This function performs a multi-stage validation to determine if an operation
+ * can be split across multiple OSDs for improved read performance:
+ * 1. Validates operation flags (BALANCE_READS required, no WRITE flag)
+ * 2. For replicated pools, rejects if min_split_replica_read_size is 0 (splitting disabled)
+ * 3. Validates operation types and read size constraints
+ *
+ * @param op The operation to validate
+ * @param objecter Objecter instance for configuration access
+ * @param pi Pool information
+ * @param cct CephContext for logging
+ * @return A pair of bools: {suitable_read_found, single_direct_op}
+ *         - suitable_read_found: true if operation contains valid read(s)
+ *         - single_direct_op: true if operation can be sent to single OSD
+ */
+std::pair<bool, bool> validate(Objecter::Op *op, Objecter &objecter,
+                               const pg_pool_t *pi, CephContext *cct) {
+  bool is_erasure = pi->is_erasure();
+
+  // Validate flags
+  if (!validate_flags(pi, op, cct)) {
+    return {false, false};
   }
+
+  // Initialize state for operation validation
+  bool has_primary_ops = nullptr != op->objver;
+  bool single_direct_op = is_erasure;
+
+  uint64_t replica_min_shard_read_size = objecter.get_min_split_replica_read_size();
+  
+  if (!is_erasure && replica_min_shard_read_size == 0) {
+    ldout(cct, DBG_LVL) << __func__ << " REJECT: splitting disabled (min_split_replica_read_size=0)" << dendl;
+    return {false, false};
+  }
+  
+  uint64_t replica_min_read_size = replica_min_shard_read_size * kReplicaMinShardReads;
+
+  // Validate operations and read sizes
+  bool suitable_read_found = validate_operations(op, pi, is_erasure,
+                                                 replica_min_read_size, cct,
+                                                 has_primary_ops, single_direct_op);
+
+  return {suitable_read_found, single_direct_op};
 }
 
-namespace {
 void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct) {
-  auto &t = op->target;
+  auto &target = op->target;
   ldout(cct, DBG_LVL) << str
-    << " osd=" << t.osd
-    << " shard=" << (t.force_shard ? *t.force_shard : shard_id_t(-1))
-    << " balance_reads=" << ((t.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0)
+    << " object_id=" << target.base_oid
+    << " tid=" << op->tid
+    << " pool=" << target.base_oloc.pool
+    << " pgid=" << target.actual_pgid
+    << " osd=" << target.osd
+    << " force_osd=" << ((target.flags & CEPH_OSD_FLAG_FORCE_OSD) != 0)
+    << " balance_reads=" << ((target.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0)
     << " ops.size()=" << op->ops.size()
     << " needs_version=" << (op->objver?"true":"false");
 
@@ -398,19 +826,106 @@ void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct
 }
 }
 
+/**
+ * @brief Prepare a single-chunk operation for direct execution.
+ *
+ * When an operation can be satisfied by reading from a single OSD (e.g.,
+ * single chunk in EC pool), configures the operation for direct execution
+ * without splitting. Sets EC_DIRECT_READ and FORCE_OSD flags and updates
+ * the target OSD and shard ID.
+ *
+ * This optimization avoids split operation overhead when data can be
+ * retrieved from a single shard.
+ *
+ * @param op Operation to prepare
+ * @param objecter Objecter instance
+ * @param cct CephContext for logging
+ */
+void SplitOp::prepare_single_op(Objecter::Op *op, Objecter &objecter, CephContext *cct) {
+  auto &target = op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
+  ceph_assert(pi);
+
+  objecter._calc_target(&op->target, op);
+  uint64_t data_chunk_count = pi->get_ec_data_shard_count();
+  uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count;
 
+  // Find the first read to work out where the IO goes.
+  for (auto o : op->ops) {
+    if (o.op.op == CEPH_OSD_OP_SPARSE_READ ||
+        o.op.op == CEPH_OSD_OP_READ) {
+      raw_shard_id_t raw_shard((o.op.extent.offset) / chunk_size % data_chunk_count);
+      shard_id_t shard = pi->get_shard(raw_shard);
+      if (shard != shard_id_t::NO_SHARD) {
+        int acting_index = (int)shard;
+        if (objecter.osdmap->exists(op->target.acting[acting_index])) {
+          op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+          op->target.flags |= CEPH_OSD_FLAG_FORCE_OSD;
+          target.osd = target.acting[acting_index];
+          target.actual_pgid.reset_shard(shard);
+          target.used_replica = (target.acting_primary != target.osd);
+        }
+      }
+      break;
+    }
+  }
+  debug_op_summary("reuse_op:", op, cct);
+}
+
+/**
+ * Create and initialize a split operation for parallel reads.
+ *
+ * This function implements the abort flag pattern to efficiently handle
+ * validation and creation failures:
+ *
+ * STAGE 1: Cheap validation tests (lines 643-666)
+ * - Check pool exists
+ * - Check split reads are enabled
+ * - Validate operation types and parameters
+ * - Return false immediately if validation fails (no split op created)
+ *
+ * STAGE 2: Create split op object (lines 668-674)
+ * - Allocate ECSplitOp or ReplicaSplitOp based on pool type
+ * - Constructor may detect issues and set abort flag
+ *
+ * STAGE 3: Check abort after construction (lines 676-679)
+ * - If abort is set during construction, discard split op and return false
+ * - This catches issues that can only be detected during object creation
+ *
+ * STAGE 4: Initialize sub-operations (lines 681-690)
+ * - Call init() for each operation in the request
+ * - init_read() may set abort if OSDs are missing or state is invalid
+ * - Break early if abort is detected to avoid unnecessary work
+ *
+ * STAGE 5: Final abort check (lines 692-695)
+ * - If abort was set during initialization, discard split op and return false
+ * - Allows fallback to normal (non-split) operation path
+ *
+ * STAGE 6: Send operations (lines 697-747)
+ * - Only reached if abort=false, meaning split op is valid
+ * - complete() will only run for these successfully sent operations
+ *
+ * @return true if split op was created and sent, false to use normal operation
+ */
 bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
-  shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct) {
+  shunique_lock<ceph::shared_mutex>& sul, CephContext *cct) {
 
-  auto &t = op->target;
-  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+  auto &target = op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(target.base_oloc.pool);
 
+  // STAGE 1: Cheap validation tests run first before creating split op
   if (!pi) {
     ldout(cct, DBG_LVL) << __func__ <<" REJECT: No Pool" << dendl;
     return false;
   }
 
-  debug_op_summary("orig_op: ", op, cct);
+  debug_op_summary("orig_op:", op, cct);
+
+  // Reject if the operation is a snapshot operation
+  if (op->snapid != CEPH_NOSNAP || !op->snapc.empty()) {
+    ldout(cct, DBG_LVL) << __func__ <<" REJECT: snapshot operation" << dendl;
+    return false;
+  }
 
   // Reject if direct reads not supported by profile.
   if (!pi->has_flag(pg_pool_t::FLAG_CLIENT_SPLIT_READS)) {
@@ -418,12 +933,19 @@ bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
     return false;
   }
 
-  bool validated = validate(op, pi->is_erasure(), cct);
+  auto [validated, single_op] = validate(op, objecter, pi, cct);
 
   if (!validated) {
     return false;
   }
 
+  if (single_op) {
+    ldout(cct, DBG_LVL) << __func__ <<" reusing original op " << dendl;
+    prepare_single_op(op, objecter, cct);
+    return false;
+  }
+
+  // STAGE 2: Create split op object (may set abort during construction)
   std::shared_ptr<SplitOp> split_read;
 
   if (pi->is_erasure()) {
@@ -432,15 +954,17 @@ bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
     split_read = std::make_shared<ReplicaSplitOp>(op, objecter, cct, pi->size);
   }
 
+  // STAGE 3: Check if abort was set during construction
   if (split_read->abort) {
-    ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;return false;
+    ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;
     return false;
   }
 
   // Populate the target, to extract the acting set from it.
-  t.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
+  target.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
   objecter._calc_target(&op->target, op);
 
+  // STAGE 4: Initialize sub-operations (may set abort if problems detected)
   for (unsigned i = 0; i < op->ops.size(); ++i) {
     split_read->init( op->ops[i], i);
     if (split_read->abort) {
@@ -448,39 +972,69 @@ bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
     }
   }
 
+  // STAGE 5: Final abort check - discard split op if problems were detected
   if (split_read->abort) {
     ldout(cct, DBG_LVL) << __func__ <<" ABORTED 2" << dendl;
     return false;
   }
 
-  ldout(cct, DBG_LVL) << __func__ <<" sub_reads ready. count=" << split_read->sub_reads.size() << dendl;
+  // Ideally, the validate should detect any single-op read. However, if that
+  // fails, then this will catch the cases (albeit less efficiently).
+  if (split_read->sub_reads.size() <= 1) {
+    ldout(cct, DBG_LVL) << __func__ <<" reusing original op - inefficient" << dendl;
+    prepare_single_op(op, objecter, cct);
+    split_read->abort = true; // Required for destructor.
+    return false;
+  }
+
+  objecter.add_op_to_splitop_session(op);
+
+  split_read->protect_torn_reads();
+
+
+  op->split_op_tids = std::make_unique<std::vector<ceph_tid_t>>(split_read->sub_reads.size());
+  auto &tids = *op->split_op_tids;
+
+  int i=0;
 
   // We are committed to doing a split read. Any re-attempts should not be either
   // split or balanced.
-  for (auto && [shard, sub_read] : split_read->sub_reads) {
+  for (auto && [index, sub_read] : split_read->sub_reads) {
     auto fin = new Finisher(split_read, sub_read); // Self-destructs when called.
 
     version_t *objver = nullptr;
-    if (split_read->primary_shard && shard == *split_read->primary_shard) {
+    if (index == split_read->reference_sub_read) {
       objver = split_read->orig_op->objver;
     }
 
     auto sub_op = objecter.prepare_read_op(
-      t.base_oid, t.base_oloc, split_read->sub_reads.at(shard).rd, op->snapid,
+      target.base_oid, target.base_oloc, split_read->sub_reads.at(index).rd, op->snapid,
       nullptr, split_read->flags, -1, fin, objver);
-    sub_op->target.force_shard.emplace(shard);
+
+    auto &st = sub_op->target;
+    st = target; // Target can start off in same state as parent.
+    st.flags |= CEPH_OSD_FLAG_FORCE_OSD;
+    st.flags |= CEPH_OSD_FLAG_FAIL_ON_EAGAIN;
     if (pi->is_erasure()) {
-      sub_op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+      st.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+      st.actual_pgid.reset_shard(shard_id_t(index));
     } else {
-      sub_op->target.flags |= CEPH_OSD_FLAG_BALANCE_READS;
+      st.flags |= CEPH_OSD_FLAG_BALANCE_READS;
     }
+    st.osd = st.acting[index];
+
+    target.used_replica = (st.acting_primary != st.osd);
+
+    objecter._op_submit(sub_op, sul, &tids[i++]);
 
-    objecter._op_submit_with_budget(sub_op, sul, ptid, ctx_budget);
     debug_op_summary("sent_op", sub_op, cct);
   }
 
   ceph_assert(split_read->sub_reads.size() > 0);
 
+  ldout(cct, DBG_LVL) << __func__ << " object_id=" << target.base_oid
+    << " assigned parent tid=" << op->tid << dendl;
+
   return true;
 }
 
index 6f3d9a199ec88c450319576b7bf608f7cdf74409..9ae19d8566ceee09b7d0d5b029c1c6fd1a4a03f4 100644 (file)
 
 #pragma once
 
+#include <algorithm>
+#include <iterator>
+#include <map>
+#include <memory>
+#include <optional>
 #include <ostream>
-#include <ranges>
+#include <utility>
+#include <vector>
 
+ #include <boost/system/error_code.hpp>
+
+#include "include/buffer_fwd.h"
+#include "include/ceph_assert.h"
+#include "include/interval_set.h"
+#include "include/types.h"
+
+#include "common/ceph_context.h"
+#include "common/ceph_mutex.h"
+#include "common/error_code.h"
+#include "common/mini_flat_map.h"
+#include "common/shunique_lock.h"
+
+#include "osd/ECTypes.h"
+#include "osd/osd_types.h"
+
+#include "Objecter.h"
+
+/**
+ * @class SplitOp
+ * @brief Base class for splitting operations across multiple OSDs for improved performance.
+ *
+ * SplitOp implements a pattern for parallelizing operations by distributing them across
+ * multiple OSDs in a pool. This optimization is particularly effective for:
+ * - Erasure-coded pools: operations can be distributed across data shards
+ * - Replicated pools: large operations can be split across replicas
+ *
+ * The split operation pattern works as follows:
+ * 1. Validation: Check if the operation is eligible for splitting
+ * 2. Creation: Create sub-operations targeting different OSDs
+ * 3. Execution: Send sub-operations in parallel
+ * 4. Assembly: Reconstruct results from sub-operation responses
+ * 5. Completion: Return assembled results to the client
+ *
+ * Key features:
+ * - Consistency protection: Ensures all sub-operations see the same object version
+ * - Automatic fallback: Falls back to normal operation if splitting is not beneficial
+ * - Error handling: Properly handles partial failures and version mismatches
+ *
+ * Currently optimized for read operations, with potential for future write support.
+ *
+ * @see ECSplitOp for erasure-coded pool implementation
+ * @see ReplicaSplitOp for replicated pool implementation
+ */
 class SplitOp {
 
  protected:
@@ -21,28 +71,51 @@ class SplitOp {
   using extents_map = std::map<uint64_t, uint64_t>;
   using extent_set = interval_set<uint64_t, std::map, false>;
 
-  using ExtentPredicate = std::function<bool(const extent&)>;
-  using extent_map_subrange = std::ranges::subrange<extents_map::const_iterator>;
-  using extent_map_subrange_view = std::ranges::take_while_view<extent_map_subrange, ExtentPredicate>;
-  using extent_variant = std::variant<std::ranges::single_view<extent>, extent_map_subrange_view, extent_map_subrange>;
-  using buffer_appender = std::function<void(bufferlist &, uint64_t *, extent_variant)>;
-
-  // A simple struct to hold the data for each step of the iteration.
+  /**
+   * @struct ECChunkInfo
+   * @brief Information about a single chunk in an erasure-coded stripe iteration.
+   *
+   * This structure holds the mapping between logical object offsets and physical
+   * shard locations for a single chunk within an EC stripe. Used by ECStripeIterator
+   * to track position during stripe traversal.
+   */
   struct ECChunkInfo {
     uint64_t ro_offset;
     uint64_t shard_offset;
     uint64_t length;
-    shard_id_t shard;
+    raw_shard_id_t raw_shard;
 
-    friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &obj) {
+    friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &chunk_info) {
       return os
-          << "ro_offset: " << obj.ro_offset
-          << " shard_offset: " << obj.shard_offset
-          << " length: " << obj.length
-          << " shard: " << obj.shard;
+          << "ro_offset: " << chunk_info.ro_offset
+          << " shard_offset: " << chunk_info.shard_offset
+          << " length: " << chunk_info.length
+          << " raw_shard: " << (int)chunk_info.raw_shard;
     }
   };
 
+  /**
+   * @class ECStripeIterator
+   * @brief Iterator for traversing erasure-coded stripes chunk by chunk.
+   *
+   * ECStripeIterator provides a standard C++ iterator interface for walking through
+   * an erasure-coded stripe, yielding information about each chunk's location in both
+   * the logical object space and the physical shard space.
+   *
+   * The iterator handles the complex mapping between:
+   * - Logical object offsets (ro_offset)
+   * - Physical shard offsets (shard_offset)
+   * - Shard identifiers (raw_shard)
+   * - Chunk boundaries and lengths
+   *
+   * Algorithm:
+   * - Starts at a given offset and iterates through chunks sequentially
+   * - Automatically wraps around shards when reaching the end of a stripe
+   * - Handles partial chunks at the beginning and end of the range
+   * - Terminates when all requested data has been covered
+   *
+   * @note Conforms to std::input_iterator concept
+   */
   class ECStripeIterator {
    public:
     using iterator_category = std::input_iterator_tag;
@@ -66,11 +139,9 @@ class SplitOp {
       uint64_t chunk = start_offset / chunk_size;
       current_info.length = std::min(total_len, (chunk + 1) * chunk_size - start_offset);
 
-      // Maybe this is paranoia, as compiler would probably detect that this
-      // / and % could be done in a single op.
-      auto chunk_div = std::lldiv(chunk, data_chunk_count);
-      current_info.shard = shard_id_t(chunk_div.rem);
-      current_info.shard_offset = (chunk_div.quot) * chunk_size + start_offset % chunk_size;
+      current_info.raw_shard = raw_shard_id_t(chunk % data_chunk_count);
+      current_info.shard_offset = (chunk / data_chunk_count) * chunk_size +
+        start_offset % chunk_size;
     }
 
     value_type operator*() const {
@@ -83,10 +154,10 @@ class SplitOp {
       current_info.shard_offset += current_info.length - chunk_size;
       current_info.length = std::min(chunk_size, end_offset - current_info.ro_offset);
       ceph_assert(current_info.ro_offset <= end_offset);
-      ++current_info.shard;
-      if (unsigned(current_info.shard) == data_chunk_count) {
+      ++current_info.raw_shard;
+      if (std::cmp_equal((int)current_info.raw_shard, data_chunk_count)) {
         current_info.shard_offset += chunk_size;
-        current_info.shard = shard_id_t(0);
+        current_info.raw_shard = raw_shard_id_t(0);
       }
       return *this;
     }
@@ -114,8 +185,26 @@ class SplitOp {
     uint32_t data_chunk_count = 0;
   };
 
-
-  // The custom range class that provides begin() and end()
+  /**
+   * @class ECStripeView
+   * @brief Range view for iterating over erasure-coded stripes.
+   *
+   * ECStripeView provides a range-based interface for traversing an erasure-coded
+   * stripe. It encapsulates the stripe geometry (chunk size, data chunk count) and
+   * provides begin()/end() iterators for use in range-based for loops.
+   *
+   * Usage:
+   * @code
+   * ECStripeView stripe_view(offset, length, pool_info);
+   * for (auto chunk_info : stripe_view) {
+   *   // Process each chunk
+   * }
+   * @endcode
+   *
+   * @param offset Starting offset in the logical object
+   * @param length Total length to iterate over
+   * @param pi Pool information containing stripe geometry
+   */
   class ECStripeView {
    public:
     ECStripeView(
@@ -124,7 +213,7 @@ class SplitOp {
       const pg_pool_t *pi)
       : start_offset(offset),
         total_length(length),
-        data_chunk_count(pi->nonprimary_shards.size() + 1),
+        data_chunk_count(pi->get_ec_data_shard_count()),
         chunk_size(pi->get_stripe_width() / data_chunk_count) {
     }
 
@@ -147,6 +236,13 @@ class SplitOp {
   static_assert(std::input_iterator<ECStripeIterator>,
                 "ECStripeIterator does not conform to the std::input_iterator concept");
 
+  /**
+   * @struct Details
+   * @brief Holds response data and metadata for a single operation in a sub-read.
+   *
+   * Contains the buffer, return value, error code, and optional extent map
+   * returned from executing one operation within a SubRead.
+   */
   struct Details {
     bufferlist bl;
     int rval;
@@ -154,18 +250,46 @@ class SplitOp {
     std::optional<extents_map> e;
   };
 
+  /**
+   * @struct InternalVersion
+   * @brief Holds internal version information for torn read protection.
+   *
+   * Used to detect version mismatches across parallel sub-operations,
+   * ensuring all operations see the same object version.
+   */
+  struct InternalVersion {
+    boost::system::error_code ec;
+    bufferlist bl;
+  };
+
+  /**
+   * @struct SubRead
+   * @brief Represents a single sub-operation sent to one OSD.
+   *
+   * Contains the operation descriptor, response details for each operation,
+   * return code, and optional version information for consistency checking.
+   *
+   * @param count Number of operations in this sub-read
+   */
   struct SubRead {
     ::ObjectOperation rd;
     mini_flat_map<int, Details> details;
     int rc = -EIO;
+    std::optional<InternalVersion> internal_version;
 
     SubRead(int count) : details(count) {}
   };
 
-  // This structure self-destructs on each IO completions, using a legacy
-  // C++ pattern (no shared_ptr). We use the finish callback to record the
-  // RC, but otherwise rely on the shared_ptr destroying ec_read to deal with
-  // completion of the parent IO.
+  /**
+   * @struct Finisher
+   * @brief Completion callback for sub-operations.
+   *
+   * This structure self-destructs on IO completion, using a legacy C++ pattern
+   * (no shared_ptr). The finish callback records the return code, while the
+   * shared_ptr to the parent SplitOp handles overall completion.
+   *
+   * @note Self-destructs when called
+   */
   struct Finisher : Context {
     std::shared_ptr<SplitOp> split_read;
     SubRead &sub_read;
@@ -176,48 +300,263 @@ class SplitOp {
     }
   };
 
-  int assemble_rc();
-  virtual std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) = 0;
-  virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) = 0;
+  int assemble_rc() const;
+  virtual std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) const = 0;
+  virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) const = 0;
   virtual void init_read(OSDOp &op, bool sparse, int ops_index) = 0;
+  virtual bool version_mismatch() const = 0;
   void init(OSDOp &op, int ops_index);
 
   Objecter::Op *orig_op;
   Objecter &objecter;
-  mini_flat_map<shard_id_t, SubRead> sub_reads;
+  mini_flat_map<int, SubRead> sub_reads;
   CephContext *cct;
-  bool abort = false; // Last minute abort... We want to keep this to a minimum.
+  
+  /**
+   * Abort flag pattern for split operation creation:
+   *
+   * This flag implements a multi-stage validation and creation pattern that
+   * minimizes wasted work when a split operation cannot be completed:
+   *
+   * 1. Cheap validation tests run first in validate() and create() before
+   *    creating the split op object (e.g., checking pool flags, operation types)
+   *
+   * 2. If validation fails, return false immediately without creating split op
+   *
+   * 3. If validation passes, create the split op and begin initialization
+   *
+   * 4. During init_read() and init(), set abort=true if problems are detected
+   *    that prevent successful operation (e.g., missing OSDs, invalid state)
+   *
+   * 5. After initialization, check abort flag in create() and discard the
+   *    split op if set (return false to fall back to normal operation)
+   *
+   * 6. The complete() method only runs for successfully sent operations where
+   *    abort=false, ensuring cleanup only happens for valid split ops
+   *
+   * This pattern ensures expensive initialization work is only done when likely
+   * to succeed, while still catching edge cases that can only be detected during
+   * the creation process itself.
+   */
+  bool abort = false;
   int flags = 0;
-  std::optional<shard_id_t> primary_shard;
-  std::map<shard_id_t, std::vector<int>> op_offset_map;
+  int reference_sub_read = -1;
+  std::map<int, std::vector<int>> op_offset_map;
 
  public:
-  SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {}
-  virtual ~SplitOp() = default;
-  void complete();
-  static bool create(Objecter::Op *op, Objecter &objecter,
-    shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct);
+ /**
+  * @brief Construct a SplitOp.
+  * @param op Original operation to be split
+  * @param objecter Objecter instance for OSD communication
+  * @param cct CephContext for logging and configuration
+  * @param count Number of sub-operations to create
+  */
+ SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {}
+ virtual ~SplitOp() = default;
+ /**
+  * @brief Complete the split operation by assembling results.
+  *
+  * Called when all sub-operations have completed. Assembles the results
+  * from individual sub-operations, handles version mismatches, and invokes
+  * the original operation's completion handlers.
+  */
+ void complete();
+ /**
+  * @brief Prepare a single-chunk operation for direct execution.
+  *
+  * When an operation can be satisfied by reading from a single OSD (e.g.,
+  * single chunk in EC pool), this method configures the operation for
+  * direct execution without splitting.
+  *
+  * @param op Operation to prepare
+  * @param objecter Objecter instance
+  * @param cct CephContext for logging
+  */
+ static void prepare_single_op(Objecter::Op *op, Objecter &objecter, CephContext *cct);
+ /**
+  * @brief Add version tracking to sub-operations for consistency.
+  *
+  * Ensures all sub-operations read the same object version by adding
+  * internal version queries. If versions mismatch, the operation is retried.
+  */
+ void protect_torn_reads();
+ /**
+  * @brief Create and initialize a split operation.
+  *
+  * Main entry point for split operation creation. Validates the operation,
+  * creates appropriate sub-operations, and sends them to target OSDs.
+  *
+  * Uses a multi-stage abort pattern:
+  * 1. Cheap validation (pool checks, operation types)
+  * 2. Object creation (may set abort flag)
+  * 3. Initialization (may set abort if OSDs missing)
+  * 4. Final validation before sending
+  *
+  * @param op Operation to potentially split
+  * @param objecter Objecter instance
+  * @param sul Shared lock for OSD map access
+  * @param cct CephContext for logging
+  * @return true if operation was split and sent, false to use normal path
+  */
+ static bool create(Objecter::Op *op, Objecter &objecter,
+   shunique_lock<ceph::shared_mutex>& sul, CephContext *cct);
 };
 
+/**
+ * @class ECSplitOp
+ * @brief Split operation implementation for erasure-coded pools.
+ *
+ * ECSplitOp handles splitting operations across data shards in an erasure-coded
+ * pool. It implements the stripe iteration and buffer assembly logic specific to
+ * EC pools, where data is distributed across multiple shards according to the
+ * erasure coding scheme.
+ *
+ * Key responsibilities:
+ * - Iterate through EC stripes to determine which shards to read
+ * - Assemble data from multiple shards in the correct order
+ * - Handle sparse reads with extent maps
+ * - Verify version consistency across shards
+ *
+ * @see ECStripeIterator for stripe traversal algorithm
+ * @see ECStripeView for range-based stripe iteration
+ */
 class ECSplitOp : public SplitOp{
  public:
   using SplitOp::SplitOp;
-  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
-  void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+  
+  /**
+   * @brief Assemble sparse read results from EC shards.
+   *
+   * Iterates through the EC stripe, collecting extent maps and data buffers
+   * from each shard in the correct order to reconstruct the logical object view.
+   *
+   * @param ops_index Index of the operation in the operation list
+   * @return Pair of extent set and assembled buffer
+   */
+  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) const override;
+  
+  /**
+   * @brief Assemble dense read results from EC shards.
+   *
+   * Iterates through the EC stripe, collecting data buffers from each shard
+   * in the correct order to reconstruct the contiguous logical object data.
+   *
+   * @param bl_out Output buffer to append assembled data
+   * @param ops_index Index of the operation in the operation list
+   */
+  void assemble_buffer_read(bufferlist &bl_out, int ops_index) const override;
+  
+  /**
+   * @brief Initialize read sub-operations for EC pool.
+   *
+   * Determines which shards need to be read based on the stripe geometry,
+   * creates sub-operations for each required shard, and validates that all
+   * target OSDs are available.
+   *
+   * @param op Operation descriptor
+   * @param sparse Whether this is a sparse read
+   * @param ops_index Index of the operation in the operation list
+   */
   void init_read(OSDOp &op, bool sparse, int ops_index) override;
-  ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count);
+  
+  /**
+   * @brief Check for version mismatches across EC shards.
+   *
+   * Compares the internal versions returned by each shard to ensure all
+   * sub-operations read the same object version. Returns true if any
+   * mismatch is detected.
+   *
+   * @return true if versions mismatch, false if consistent
+   */
+  bool version_mismatch() const override;
+  
   ~ECSplitOp() {
     complete();
   }
 };
 
+/**
+ * @class ReplicaSplitOp
+ * @brief Split operation implementation for replicated pools.
+ *
+ * ReplicaSplitOp handles splitting large operations across replicas in a
+ * replicated pool. It divides the operation into chunks and distributes them
+ * across available replicas for parallel execution.
+ *
+ * Key responsibilities:
+ * - Divide operations into appropriately-sized chunks
+ * - Distribute chunks across available replicas
+ * - Assemble results in the correct order
+ * - Verify version consistency across replicas
+ *
+ * The chunk size is determined by configuration and the number of available
+ * replicas, with a minimum threshold to ensure splitting is beneficial.
+ */
 class ReplicaSplitOp : public SplitOp {
  public:
   using SplitOp::SplitOp;
-  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
-  void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+  
+  /**
+   * @brief Assemble sparse read results from replicas.
+   *
+   * Collects extent maps and data buffers from each replica sub-operation
+   * and combines them into a single result.
+   *
+   * @param ops_index Index of the operation in the operation list
+   * @return Pair of extent set and assembled buffer
+   */
+  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) const override;
+  
+  /**
+   * @brief Assemble dense read results from replicas.
+   *
+   * Collects data buffers from each replica sub-operation and appends them
+   * in order to reconstruct the complete result.
+   *
+   * @param bl_out Output buffer to append assembled data
+   * @param ops_index Index of the operation in the operation list
+   */
+  void assemble_buffer_read(bufferlist &bl_out, int ops_index) const override;
+  
+  /**
+   * @brief Initialize read sub-operations for replicated pool.
+   *
+   * Divides the operation into chunks and creates sub-operations targeting
+   * different replicas. The chunk size and distribution are determined by
+   * configuration and the number of available OSDs.
+   *
+   * @param op Operation descriptor
+   * @param sparse Whether this is a sparse read
+   * @param ops_index Index of the operation in the operation list
+   */
   void init_read(OSDOp &op, bool sparse, int ops_index) override;
-  ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size);
+  
+  /**
+   * @brief Check for version mismatches across replicas.
+   *
+   * Compares the versions returned by each replica to ensure all
+   * sub-operations read the same object version. Returns true if any
+   * mismatch is detected.
+   *
+   * @return true if versions mismatch, false if consistent
+   */
+  bool version_mismatch() const override;
+  
+  /**
+   * @brief Construct a ReplicaSplitOp.
+   * @param op Original operation to be split
+   * @param objecter Objecter instance
+   * @param cct CephContext for logging
+   * @param pool_size Number of replicas in the pool
+   */
+  ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) :
+    SplitOp(op, objecter, cct, pool_size) {}
+  
   ~ReplicaSplitOp() {
     complete();
   }