]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osdc: Add SplitOp capability to Objecter
authorAlex Ainscow <aainscow@uk.ibm.com>
Tue, 14 Oct 2025 08:24:56 +0000 (09:24 +0100)
committerAlex Ainscow <aainscow@uk.ibm.com>
Tue, 14 Oct 2025 10:42:14 +0000 (11:42 +0100)
This will provide the ability for Objecter to split up
certain ops and distribute them to the OSDs directly if
that provides a preformance advantage.

This is experimental code and is switched off unless the
magic pool flags are enabled. These magic pool flags were
pushed in an earlier commit in the same PR.

Signed-off-by: Alex Ainscow <aainscow@uk.ibm.com>
src/CMakeLists.txt
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/osdc/SplitOp.cc [new file with mode: 0644]
src/osdc/SplitOp.h [new file with mode: 0644]

index ab832ad7d7914a6041293e21933c5ba8dca5646d..1c8a03e112abe3b978643c48294d54f0452d9df6 100644 (file)
@@ -528,6 +528,7 @@ set(libcommon_files
   osdc/Striper.cc
   osdc/Objecter.cc
   osdc/error_code.cc
+  osdc/SplitOp.cc
   librbd/Features.cc
   librbd/io/IoOperations.cc
   ${mds_files})
index 2d3b3df235a1dcbc502cb434b78010e6f8af28cd..d4a97f2a2ddf6aad76e858608292dba3e605da55 100644 (file)
@@ -63,6 +63,8 @@
 
 #include "neorados/RADOSImpl.h"
 
+#include "osdc/SplitOp.h"
+
 using std::list;
 using std::make_pair;
 using std::map;
@@ -2332,7 +2334,12 @@ void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
   if (!ptid)
     ptid = &tid;
   op->trace.event("op submit");
-  _op_submit_with_budget(op, rl, ptid, ctx_budget);
+
+  bool was_split = SplitOp::create(op, *this, rl, ptid, ctx_budget, cct);
+
+  if (!was_split) {
+    _op_submit_with_budget(op, rl, ptid, ctx_budget);
+  }
 }
 
 void Objecter::_op_submit_with_budget(Op *op,
index 7f03f99041ea23127816945b41692a1cacdb1d86..7734cc0d4493e848601a9c3c5180af8136afd7cb 100644 (file)
@@ -1691,6 +1691,10 @@ inline std::ostream& operator <<(std::ostream& m, const ObjectOperation& oo) {
 // ----------------
 
 class Objecter : public md_config_obs_t, public Dispatcher {
+  friend class SplitOp;
+  friend class ECSplitOp;
+  friend class ReplicaSplitOp;
+
   using MOSDOp = _mosdop::MOSDOp<osdc_opvec>;
 public:
   using OpSignature = void(boost::system::error_code);
diff --git a/src/osdc/SplitOp.cc b/src/osdc/SplitOp.cc
new file mode 100644 (file)
index 0000000..6d3aba5
--- /dev/null
@@ -0,0 +1,473 @@
+#include "osdc/Objecter.h"
+#include "osdc/SplitOp.h"
+#include "osd/osd_types.h"
+
+#define dout_subsys ceph_subsys_objecter
+#define DBG_LVL 20
+
+namespace {
+inline boost::system::error_code osdcode(int r) {
+  return (r < 0) ? boost::system::error_code(-r, osd_category()) : boost::system::error_code();
+}
+}
+
+constexpr static uint64_t kReplicaMinShardReadSize = 128 * 1024;
+constexpr static uint64_t kReplicaMinShardReads = 2;
+constexpr static uint64_t kReplicaMinReadSize = kReplicaMinShardReadSize * kReplicaMinShardReads;
+
+#undef dout_prefix
+#define dout_prefix *_dout << " ECSplitOp::"
+
+std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_read(int ops_index) {
+  bufferlist bl_out;
+  extent_set extents_out;
+
+
+  auto &orig_osd_op = orig_op->ops[ops_index].op;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+  ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
+  ldout(cct, DBG_LVL) << __func__ << " start:"
+    << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
+  std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
+  mini_flat_map<shard_id_t, extents_map::iterator> map_iterators(stripe_view.data_chunk_count);
+
+  for (auto &&chunk_info : stripe_view) {
+    ldout(cct, DBG_LVL) << __func__ << " chunk: " << chunk_info << dendl;
+    auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+
+    if (!map_iterators.contains(chunk_info.shard)) {
+      map_iterators.emplace(chunk_info.shard, details.e->begin());
+    }
+
+    extents_map::iterator &extent_iter = map_iterators.at(chunk_info.shard);
+
+    uint64_t bl_len = 0;
+    while (extent_iter != details.e->end() && extent_iter->first < chunk_info.ro_offset + stripe_view.chunk_size) {
+      auto [off, len] = *extent_iter;
+      ldout(cct, DBG_LVL) << __func__ << " extent=" << off << "~" <<  len << dendl;
+      extents_out.insert(off, len);
+      bl_len += len;
+      ++extent_iter;
+    }
+
+    // We try to keep the buffers together where possible.
+    if (bl_len != 0) {
+      bufferlist bl;
+      bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], bl_len);
+      bl_out.append(bl);
+      buffer_offset[(int)chunk_info.shard] += bl_len;
+    }
+  }
+
+  return std::pair(extents_out, bl_out);
+}
+
+void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+  auto &orig_osd_op = orig_op->ops[ops_index].op;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+  ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
+
+  std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
+  ldout(cct, DBG_LVL) << __func__ << " " << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
+  for (auto &&chunk_info : stripe_view) {
+    ldout(cct, DBG_LVL) << __func__ << " chunk info " << chunk_info << dendl;
+    auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+    bufferlist bl;
+    bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], chunk_info.length);
+    bl_out.append(bl);
+    buffer_offset[(int)chunk_info.shard] += chunk_info.length;
+  }
+}
+
+void ECSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
+  auto &t = orig_op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+  uint64_t offset = op.op.extent.offset;
+  uint64_t length = op.op.extent.length;
+  uint64_t data_chunk_count = pi->nonprimary_shards.size() + 1;
+  uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count;
+  uint64_t start_chunk = offset / chunk_size;
+  // This calculation is wrong for length = 0, but such IOs should not have
+  // reached here!
+  ceph_assert( op.op.extent.length != 0);
+  uint64_t end_chunk = (offset + op.op.extent.length - 1) / chunk_size;
+
+  unsigned count = std::min(data_chunk_count, end_chunk - start_chunk + 1);
+  //FIXME: This is not quite right - the ops.size() > 1 does not necessarily mean
+  // that the primary is required - it could be two reads to the same shard.
+  bool primary_required = count > 1 || orig_op->objver || orig_op->ops.size() > 1;
+
+  int first_shard = start_chunk % data_chunk_count;
+  // Check all shards are online.
+  for (unsigned i = first_shard; i < first_shard + count; i++) {
+    shard_id_t shard(i >= data_chunk_count ? i - data_chunk_count : i);
+    int direct_osd = t.acting[(int)shard];
+    if (t.acting_primary == direct_osd) {
+      primary_shard.emplace(shard);
+    }
+    if (!objecter.osdmap->exists(direct_osd)) {
+      ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl;
+      abort = true;
+      return;
+    }
+    if (!sub_reads.contains(shard)) {
+      sub_reads.emplace(shard, orig_op->ops.size());
+    }
+    auto &d = sub_reads.at(shard).details[ops_index];
+    if (sparse) {
+      d.e.emplace();
+      sub_reads.at(shard).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval);
+    } else {
+      sub_reads.at(shard).rd.read(offset, length, &d.ec, &d.bl);
+    }
+  }
+
+  if (primary_required && !primary_shard) {
+    for (unsigned i=0; i < t.acting.size(); ++i) {
+      if (t.acting[i] == t.acting_primary) {
+        primary_shard.emplace(i);
+        sub_reads.emplace(*primary_shard, orig_op->ops.size());
+      }
+    }
+
+    // No primary???  Let the normal code paths deal with this.
+    if (!primary_shard) {
+      ldout(cct, DBG_LVL) << __func__ <<" ABORT: Can't find primary" << dendl;
+      abort = true;
+    }
+  }
+}
+
+ECSplitOp::ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) :
+  SplitOp(op, objecter, cct, count) {}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " ReplicaSplitOp::"
+
+std::pair<SplitOp::extent_set, bufferlist> ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) {
+  extent_set extents_out;
+  bufferlist bl_out;
+
+  for (auto && [shard, sr] : sub_reads) {
+    for (auto [off, len] : *sr.details[ops_index].e) {
+      extents_out.insert(off, len);
+    }
+    bl_out.append(sr.details[ops_index].bl);
+  }
+
+  return std::pair(extents_out, bl_out);
+}
+
+void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+  for (auto && [_, sr] : sub_reads) {
+    bl_out.append(sr.details[ops_index].bl);
+  }
+}
+
+ReplicaSplitOp::ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) :
+  SplitOp(op, objecter, cct, pool_size) {
+
+  // This may not actually be the primary, but since all shards are kept current
+  // in replica, it does not actually matter which we choose here. Choose 0 since
+  // there will always be a read to this shard.
+  primary_shard = shard_id_t(0);
+}
+
+void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
+
+  auto &t = orig_op->target;
+
+  std::set<int> osds;
+  for (int direct_osd : t.acting) {
+    if (objecter.osdmap->exists(direct_osd)) {
+      osds.insert(direct_osd);
+    }
+  }
+
+  if (osds.size() < 2) {
+    ldout(cct, DBG_LVL) << __func__ <<" ABORT: No OSDs" << dendl;
+    abort = true;
+    return;
+  }
+
+  uint64_t offset = op.op.extent.offset;
+  uint64_t length = op.op.extent.length;
+  uint64_t slice_count = std::min(length / kReplicaMinShardReadSize, osds.size());
+  uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE);
+
+  for (unsigned i = 0; i < osds.size() && length > 0; i++) {
+
+    shard_id_t shard(i);
+    if (!sub_reads.contains(shard)) {
+      sub_reads.emplace(shard, orig_op->ops.size());
+    }
+    auto &sr = sub_reads.at(shard);
+    auto bl = &sr.details[ops_index].bl;
+    auto rval = &sr.details[ops_index].rval;
+    uint64_t len = std::min(length, chunk_size);
+    if (sparse) {
+      sr.details[ops_index].e.emplace();
+      sr.rd.sparse_read(offset, len, &(*sr.details[ops_index].e), bl, rval);
+    } else {
+      sr.rd.read(offset, len, &sr.details[ops_index].ec, bl);
+    }
+    offset += len;
+    length -= len;
+  }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " SplitOp::"
+
+int SplitOp::assemble_rc() {
+  int rc = 0;
+  bool rc_zero = false;
+
+  // This should only happen on a single thread.
+  for (auto & [_, sub_read] : sub_reads) {
+    if (rc >= 0 && sub_read.rc >= 0) {
+      rc += sub_read.rc;
+      if (sub_read.rc == 0) {
+        rc_zero = true;
+      }
+    } else if (rc >= 0) {
+      rc = sub_read.rc;
+    } // else ignore subsequent errors.
+  }
+
+  if (rc >= 0 && rc_zero) {
+    return 0;
+  }
+
+  return rc;
+}
+
+void SplitOp::complete() {
+  if (abort) {
+    return;
+  }
+  ldout(cct, 20) << __func__ << " entry this=" << this << dendl;
+
+  int rc = assemble_rc();
+  if (rc >= 0) {
+
+    // In a "normal" completion, out_ops is generated in the MOSDOpReply reply
+    // which we do not have here. Here we are going to mimic this behaviour
+    // so as to reproduce as much as possible of the IO completion.
+    std::vector out_ops(orig_op->ops.begin(), orig_op->ops.end());
+
+    for (unsigned ops_index=0; ops_index < out_ops.size(); ++ops_index) {
+      auto &out_osd_op = out_ops[ops_index];
+      switch (out_osd_op.op.op) {
+        case CEPH_OSD_OP_SPARSE_READ: {
+          auto [extents, bl] = assemble_buffer_sparse_read(ops_index);
+          encode(std::move(extents).detach(), out_osd_op.outdata);
+          encode_destructively(bl, out_osd_op.outdata);
+          break;
+        }
+        case CEPH_OSD_OP_READ: {
+          assemble_buffer_read(out_osd_op.outdata, ops_index);
+          break;
+        }
+        case CEPH_OSD_OP_GETXATTRS:
+        case CEPH_OSD_OP_CHECKSUM:
+        case CEPH_OSD_OP_GETXATTR: {
+          out_osd_op.outdata = sub_reads.at(*primary_shard).details[ops_index].bl;
+          out_osd_op.rval = sub_reads.at(*primary_shard).details[ops_index].rval;
+          break;
+        }
+      default: {
+          ceph_abort_msg("Not supported");
+          break;
+        }
+      }
+    }
+
+    objecter.handle_osd_op_reply2(orig_op, out_ops);
+
+    ldout(cct, DBG_LVL) << __func__ << " success this=" << this << " rc=" << rc << dendl;
+    Objecter::Op::complete(std::move(orig_op->onfinish), osdcode(rc), rc, objecter.service.get_executor());
+    objecter._finish_op(orig_op, rc);
+  } else {
+    ldout(cct, DBG_LVL) << __func__ << " retry this=" << this << " rc=" << rc << dendl;
+    objecter.op_post_submit(orig_op);
+  }
+}
+
+static bool validate(Objecter::Op *op, bool is_erasure, CephContext *cct) {
+
+  if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0 ) {
+    ldout(cct, DBG_LVL) << __func__ <<" REJECT: Client rejects balanced read" << dendl;
+    return false;
+  }
+
+  uint64_t suitable_read_found = false;
+  for (auto & o : op->ops) {
+    switch (o.op.op) {
+      case CEPH_OSD_OP_READ:
+      case CEPH_OSD_OP_SPARSE_READ: {
+        uint64_t length = o.op.extent.length;
+        if ((is_erasure && length > 0) ||
+            (!is_erasure && length >= kReplicaMinReadSize)) {
+          suitable_read_found = true;
+        }
+        break;
+      }
+      case CEPH_OSD_OP_GETXATTRS:
+      case CEPH_OSD_OP_CHECKSUM:
+      case CEPH_OSD_OP_GETXATTR: {
+        break; // Do not block validate.
+      }
+      default: {
+        ldout(cct, DBG_LVL) << __func__ <<" REJECT: unsupported op" << dendl;
+        return false;
+      }
+    }
+  }
+
+  return suitable_read_found;
+}
+
+void SplitOp::init(OSDOp &op, int ops_index) {
+  switch (op.op.op) {
+    case CEPH_OSD_OP_SPARSE_READ: {
+      init_read(op, true, ops_index);
+      break;
+    }
+    case CEPH_OSD_OP_READ: {
+      init_read(op, false, ops_index);
+      break;
+    }
+    case CEPH_OSD_OP_GETXATTRS:
+    case CEPH_OSD_OP_CHECKSUM:
+    case CEPH_OSD_OP_GETXATTR: {
+      shard_id_t shard = *primary_shard;
+      Details &d = sub_reads.at(shard).details[ops_index];
+      orig_op->pass_thru_op(sub_reads.at(shard).rd, ops_index, &d.bl, &d.rval);
+      break;
+    }
+    default: {
+      ldout(cct, DBG_LVL) << __func__ <<" ABORT: unsupported" << dendl;
+      abort = true;
+      break;
+    }
+  }
+}
+
+namespace {
+void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct) {
+  auto &t = op->target;
+  ldout(cct, DBG_LVL) << str
+    << " osd=" << t.osd
+    << " shard=" << (t.force_shard ? *t.force_shard : shard_id_t(-1))
+    << " balance_reads=" << ((t.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0)
+    << " ops.size()=" << op->ops.size()
+    << " needs_version=" << (op->objver?"true":"false");
+
+  for (auto && o : op->ops) {
+    *_dout << " op_code=" << ceph_osd_op_name(o.op.op);
+    switch (o.op.op) {
+      case CEPH_OSD_OP_READ:
+      case CEPH_OSD_OP_SPARSE_READ: {
+        *_dout << "(" << o.op.extent.offset << "~" << o.op.extent.length << ")";
+        break;
+      }
+      default:
+      break;
+    }
+  }
+  *_dout << dendl;
+}
+}
+
+
+bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
+  shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct) {
+
+  auto &t = op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+
+  if (!pi) {
+    ldout(cct, DBG_LVL) << __func__ <<" REJECT: No Pool" << dendl;
+    return false;
+  }
+
+  debug_op_summary("orig_op: ", op, cct);
+
+  // Reject if direct reads not supported by profile.
+  if (!pi->has_flag(pg_pool_t::FLAG_CLIENT_SPLIT_READS)) {
+    ldout(cct, DBG_LVL) << __func__ <<" REJECT: split reads off" << dendl;
+    return false;
+  }
+
+  bool validated = validate(op, pi->is_erasure(), cct);
+
+  if (!validated) {
+    return false;
+  }
+
+  std::shared_ptr<SplitOp> split_read;
+
+  if (pi->is_erasure()) {
+    split_read = std::make_shared<ECSplitOp>(op, objecter, cct, pi->size);
+  } else {
+    split_read = std::make_shared<ReplicaSplitOp>(op, objecter, cct, pi->size);
+  }
+
+  if (split_read->abort) {
+    ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;return false;
+    return false;
+  }
+
+  // Populate the target, to extract the acting set from it.
+  t.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
+  objecter._calc_target(&op->target, op);
+
+  for (unsigned i = 0; i < op->ops.size(); ++i) {
+    split_read->init( op->ops[i], i);
+    if (split_read->abort) {
+      break;
+    }
+  }
+
+  if (split_read->abort) {
+    ldout(cct, DBG_LVL) << __func__ <<" ABORTED 2" << dendl;
+    return false;
+  }
+
+  ldout(cct, DBG_LVL) << __func__ <<" sub_reads ready. count=" << split_read->sub_reads.size() << dendl;
+
+  // We are committed to doing a split read. Any re-attempts should not be either
+  // split or balanced.
+  for (auto && [shard, sub_read] : split_read->sub_reads) {
+    auto fin = new Finisher(split_read, sub_read); // Self-destructs when called.
+
+    version_t *objver = nullptr;
+    if (split_read->primary_shard && shard == *split_read->primary_shard) {
+      objver = split_read->orig_op->objver;
+    }
+
+    auto sub_op = objecter.prepare_read_op(
+      t.base_oid, t.base_oloc, split_read->sub_reads.at(shard).rd, op->snapid,
+      nullptr, split_read->flags, -1, fin, objver);
+    sub_op->target.force_shard.emplace(shard);
+    if (pi->is_erasure()) {
+      sub_op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+    } else {
+      sub_op->target.flags |= CEPH_OSD_FLAG_BALANCE_READS;
+    }
+
+    objecter._op_submit_with_budget(sub_op, sul, ptid, ctx_budget);
+    debug_op_summary("sent_op", sub_op, cct);
+  }
+
+  ceph_assert(split_read->sub_reads.size() > 0);
+
+  return true;
+}
+
+
+#undef dout_prefix
+#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
\ No newline at end of file
diff --git a/src/osdc/SplitOp.h b/src/osdc/SplitOp.h
new file mode 100644 (file)
index 0000000..1654799
--- /dev/null
@@ -0,0 +1,214 @@
+#pragma once
+
+#include <ostream>
+#include <ranges>
+
+class SplitOp {
+
+ protected:
+  using extent = std::pair<uint64_t, uint64_t>;
+  using extents_map = std::map<uint64_t, uint64_t>;
+  using extent_set = interval_set<uint64_t, std::map, false>;
+
+  using ExtentPredicate = std::function<bool(const extent&)>;
+  using extent_map_subrange = std::ranges::subrange<extents_map::const_iterator>;
+  using extent_map_subrange_view = std::ranges::take_while_view<extent_map_subrange, ExtentPredicate>;
+  using extent_variant = std::variant<std::ranges::single_view<extent>, extent_map_subrange_view, extent_map_subrange>;
+  using buffer_appender = std::function<void(bufferlist &, uint64_t *, extent_variant)>;
+
+  // A simple struct to hold the data for each step of the iteration.
+  struct ECChunkInfo {
+    uint64_t ro_offset;
+    uint64_t shard_offset;
+    uint64_t length;
+    shard_id_t shard;
+
+    friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &obj) {
+      return os
+          << "ro_offset: " << obj.ro_offset
+          << " shard_offset: " << obj.shard_offset
+          << " length: " << obj.length
+          << " shard: " << obj.shard;
+    }
+  };
+
+  class ECStripeIterator {
+   public:
+    using iterator_category = std::input_iterator_tag;
+    using value_type = ECChunkInfo;
+    using difference_type = std::ptrdiff_t;
+    using pointer = ECChunkInfo*;
+    using reference = ECChunkInfo&;
+
+    ECStripeIterator() = default;
+
+    // Constructor for the "begin" iterator
+    ECStripeIterator(
+      uint64_t start_offset,
+      uint64_t total_len,
+      uint32_t chunk_s,
+      uint32_t data_chunks)
+      : chunk_size(chunk_s),
+        data_chunk_count(data_chunks) {
+      end_offset = start_offset + total_len;
+      current_info.ro_offset = start_offset;
+      uint64_t chunk = start_offset / chunk_size;
+      current_info.length = std::min(total_len, (chunk + 1) * chunk_size - start_offset);
+
+      // Maybe this is paranoia, as compiler would probably detect that this
+      // / and % could be done in a single op.
+      auto chunk_div = std::lldiv(chunk, data_chunk_count);
+      current_info.shard = shard_id_t(chunk_div.rem);
+      current_info.shard_offset = (chunk_div.quot) * chunk_size + start_offset % chunk_size;
+    }
+
+    value_type operator*() const {
+      return current_info;
+    }
+
+    // Pre-increment
+    ECStripeIterator& operator++() {
+      current_info.ro_offset += current_info.length;
+      current_info.shard_offset += current_info.length - chunk_size;
+      current_info.length = std::min(chunk_size, end_offset - current_info.ro_offset);
+      ceph_assert(current_info.ro_offset <= end_offset);
+      ++current_info.shard;
+      if (unsigned(current_info.shard) == data_chunk_count) {
+        current_info.shard_offset += chunk_size;
+        current_info.shard = shard_id_t(0);
+      }
+      return *this;
+    }
+
+    // post-increment
+    ECStripeIterator operator++(int) {
+      ECStripeIterator tmp = *this;
+      ++(*this);
+      return tmp;
+    }
+
+    bool operator!=(const ECStripeIterator& other) const {
+      // This is only here to terminate the loop!
+      return current_info.length != other.current_info.length;
+    }
+
+    bool operator==(const ECStripeIterator& other) const {
+      return !(*this != other);
+    }
+    value_type current_info{};
+
+  private:
+    uint64_t end_offset = 0;
+    uint64_t chunk_size = 0;
+    uint32_t data_chunk_count = 0;
+  };
+
+
+  // The custom range class that provides begin() and end()
+  class ECStripeView {
+   public:
+    ECStripeView(
+      uint64_t offset,
+      uint64_t length,
+      const pg_pool_t *pi)
+      : start_offset(offset),
+        total_length(length),
+        data_chunk_count(pi->nonprimary_shards.size() + 1),
+        chunk_size(pi->get_stripe_width() / data_chunk_count) {
+    }
+
+    ECStripeIterator begin() const {
+      return ECStripeIterator(start_offset, total_length, chunk_size, data_chunk_count);
+    }
+
+    ECStripeIterator end() const {
+      ECStripeIterator end_iter;
+      end_iter.current_info.length = 0;
+      return end_iter;
+    }
+
+    uint64_t start_offset;
+    uint64_t total_length;
+    uint32_t data_chunk_count;
+    uint64_t chunk_size;
+  };
+
+  static_assert(std::input_iterator<ECStripeIterator>,
+                "ECStripeIterator does not conform to the std::input_iterator concept");
+
+  struct Details {
+    bufferlist bl;
+    int rval;
+    boost::system::error_code ec;
+    std::optional<extents_map> e;
+  };
+
+  struct SubRead {
+    ::ObjectOperation rd;
+    mini_flat_map<int, Details> details;
+    int rc = -EIO;
+
+    SubRead(int count) : details(count) {}
+  };
+
+  // This structure self-destructs on each IO completions, using a legacy
+  // C++ pattern (no shared_ptr). We use the finish callback to record the
+  // RC, but otherwise rely on the shared_ptr destroying ec_read to deal with
+  // completion of the parent IO.
+  struct Finisher : Context {
+    std::shared_ptr<SplitOp> split_read;
+    SubRead &sub_read;
+
+    Finisher(std::shared_ptr<SplitOp> split_read, SubRead &sub_read) : split_read(split_read), sub_read(sub_read) {}
+    void finish(int r) override {
+      sub_read.rc = r;
+    }
+  };
+
+  int assemble_rc();
+  virtual std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) = 0;
+  virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) = 0;
+  virtual void init_read(OSDOp &op, bool sparse, int ops_index) = 0;
+  void init(OSDOp &op, int ops_index);
+
+  Objecter::Op *orig_op;
+  Objecter &objecter;
+  mini_flat_map<shard_id_t, SubRead> sub_reads;
+  CephContext *cct;
+  bool abort = false; // Last minute abort... We want to keep this to a minimum.
+  int flags = 0;
+  std::optional<shard_id_t> primary_shard;
+  std::map<shard_id_t, std::vector<int>> op_offset_map;
+
+ public:
+  SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {}
+  virtual ~SplitOp() = default;
+  void complete();
+  static bool create(Objecter::Op *op, Objecter &objecter,
+    shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct);
+};
+
+class ECSplitOp : public SplitOp{
+ public:
+  using SplitOp::SplitOp;
+  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
+  void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+  void init_read(OSDOp &op, bool sparse, int ops_index) override;
+  ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count);
+  ~ECSplitOp() {
+    complete();
+  }
+};
+
+class ReplicaSplitOp : public SplitOp {
+ public:
+  using SplitOp::SplitOp;
+  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
+  void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+  void init_read(OSDOp &op, bool sparse, int ops_index) override;
+  ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size);
+  ~ReplicaSplitOp() {
+    complete();
+  }
+};
+