]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: Add SplitOp capability to Objecter 65771/head
authorAlex Ainscow <aainscow@uk.ibm.com>
Tue, 14 Oct 2025 08:24:56 +0000 (09:24 +0100)
committerAlex Ainscow <aainscow@uk.ibm.com>
Wed, 26 Nov 2025 11:28:25 +0000 (11:28 +0000)
This will provide the ability for Objecter to split up
certain ops and distribute them to the OSDs directly if
that provides a preformance advantage.

This is experimental code and is switched off unless the
magic pool flags are enabled. These magic pool flags were
pushed in an earlier commit in the same PR.

Signed-off-by: Alex Ainscow <aainscow@uk.ibm.com>
src/CMakeLists.txt
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/osdc/SplitOp.cc [new file with mode: 0644]
src/osdc/SplitOp.h [new file with mode: 0644]

index 06f900d6e6e081af0da219cf51f628239db74b72..ab6dc2576232bc6e00a9c176f4bc5644fa62ae10 100644 (file)
@@ -532,6 +532,7 @@ set(libcommon_files
   osdc/Striper.cc
   osdc/Objecter.cc
   osdc/error_code.cc
+  osdc/SplitOp.cc
   librbd/Features.cc
   librbd/io/IoOperations.cc
   ${mds_files})
index de16750d88d9149f76758aa5eb0377271a51d7a9..e8d8df64092d8efb74288585b387f4bbcca13a3f 100644 (file)
@@ -63,6 +63,8 @@
 
 #include "neorados/RADOSImpl.h"
 
+#include "osdc/SplitOp.h"
+
 using std::list;
 using std::make_pair;
 using std::map;
@@ -2350,7 +2352,12 @@ void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
   if (!ptid)
     ptid = &tid;
   op->trace.event("op submit");
-  _op_submit_with_budget(op, rl, ptid, ctx_budget);
+
+  bool was_split = SplitOp::create(op, *this, rl, ptid, ctx_budget, cct);
+
+  if (!was_split) {
+    _op_submit_with_budget(op, rl, ptid, ctx_budget);
+  }
 }
 
 void Objecter::_op_submit_with_budget(Op *op,
index fffcb9cb71d7f2c10c5207296e9c7ebc6de1cc11..400596acb063acf9e9f840099ece05c54484d5db 100644 (file)
@@ -1691,6 +1691,10 @@ inline std::ostream& operator <<(std::ostream& m, const ObjectOperation& oo) {
 // ----------------
 
 class Objecter : public md_config_obs_t, public Dispatcher {
+  friend class SplitOp;
+  friend class ECSplitOp;
+  friend class ReplicaSplitOp;
+
   using MOSDOp = _mosdop::MOSDOp<osdc_opvec>;
 public:
   using OpSignature = void(boost::system::error_code);
diff --git a/src/osdc/SplitOp.cc b/src/osdc/SplitOp.cc
new file mode 100644 (file)
index 0000000..5363f10
--- /dev/null
@@ -0,0 +1,489 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "osdc/Objecter.h"
+#include "osdc/SplitOp.h"
+#include "osd/osd_types.h"
+
+#define dout_subsys ceph_subsys_objecter
+#define DBG_LVL 20
+
+namespace {
+inline boost::system::error_code osdcode(int r) {
+  return (r < 0) ? boost::system::error_code(-r, osd_category()) : boost::system::error_code();
+}
+}
+
+constexpr static uint64_t kReplicaMinShardReadSize = 128 * 1024;
+constexpr static uint64_t kReplicaMinShardReads = 2;
+constexpr static uint64_t kReplicaMinReadSize = kReplicaMinShardReadSize * kReplicaMinShardReads;
+
+#undef dout_prefix
+#define dout_prefix *_dout << " ECSplitOp::"
+
+std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_read(int ops_index) {
+  bufferlist bl_out;
+  extent_set extents_out;
+
+
+  auto &orig_osd_op = orig_op->ops[ops_index].op;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+  ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
+  ldout(cct, DBG_LVL) << __func__ << " start:"
+    << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
+  std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
+  mini_flat_map<shard_id_t, extents_map::iterator> map_iterators(stripe_view.data_chunk_count);
+
+  for (auto &&chunk_info : stripe_view) {
+    ldout(cct, DBG_LVL) << __func__ << " chunk: " << chunk_info << dendl;
+    auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+
+    if (!map_iterators.contains(chunk_info.shard)) {
+      map_iterators.emplace(chunk_info.shard, details.e->begin());
+    }
+
+    extents_map::iterator &extent_iter = map_iterators.at(chunk_info.shard);
+
+    uint64_t bl_len = 0;
+    while (extent_iter != details.e->end() && extent_iter->first < chunk_info.ro_offset + stripe_view.chunk_size) {
+      auto [off, len] = *extent_iter;
+      ldout(cct, DBG_LVL) << __func__ << " extent=" << off << "~" <<  len << dendl;
+      extents_out.insert(off, len);
+      bl_len += len;
+      ++extent_iter;
+    }
+
+    // We try to keep the buffers together where possible.
+    if (bl_len != 0) {
+      bufferlist bl;
+      bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], bl_len);
+      bl_out.append(bl);
+      buffer_offset[(int)chunk_info.shard] += bl_len;
+    }
+  }
+
+  return std::pair(extents_out, bl_out);
+}
+
+void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+  auto &orig_osd_op = orig_op->ops[ops_index].op;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+  ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
+
+  std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
+  ldout(cct, DBG_LVL) << __func__ << " " << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
+  for (auto &&chunk_info : stripe_view) {
+    ldout(cct, DBG_LVL) << __func__ << " chunk info " << chunk_info << dendl;
+    auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+    bufferlist bl;
+    bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], chunk_info.length);
+    bl_out.append(bl);
+    buffer_offset[(int)chunk_info.shard] += chunk_info.length;
+  }
+}
+
+void ECSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
+  auto &t = orig_op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+  uint64_t offset = op.op.extent.offset;
+  uint64_t length = op.op.extent.length;
+  uint64_t data_chunk_count = pi->nonprimary_shards.size() + 1;
+  uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count;
+  uint64_t start_chunk = offset / chunk_size;
+  // This calculation is wrong for length = 0, but such IOs should not have
+  // reached here!
+  ceph_assert( op.op.extent.length != 0);
+  uint64_t end_chunk = (offset + op.op.extent.length - 1) / chunk_size;
+
+  unsigned count = std::min(data_chunk_count, end_chunk - start_chunk + 1);
+  //FIXME: This is not quite right - the ops.size() > 1 does not necessarily mean
+  // that the primary is required - it could be two reads to the same shard.
+  bool primary_required = count > 1 || orig_op->objver || orig_op->ops.size() > 1;
+
+  int first_shard = start_chunk % data_chunk_count;
+  // Check all shards are online.
+  for (unsigned i = first_shard; i < first_shard + count; i++) {
+    shard_id_t shard(i >= data_chunk_count ? i - data_chunk_count : i);
+    int direct_osd = t.acting[(int)shard];
+    // TODO: This is broken because there might be multiple shards on one OSD.
+    // Will fix in the next PR.
+    if (t.acting_primary == direct_osd) {
+      primary_shard.emplace(shard);
+    }
+    if (!objecter.osdmap->exists(direct_osd)) {
+      ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl;
+      abort = true;
+      return;
+    }
+    if (!sub_reads.contains(shard)) {
+      sub_reads.emplace(shard, orig_op->ops.size());
+    }
+    auto &d = sub_reads.at(shard).details[ops_index];
+    if (sparse) {
+      d.e.emplace();
+      sub_reads.at(shard).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval);
+    } else {
+      sub_reads.at(shard).rd.read(offset, length, &d.ec, &d.bl);
+    }
+  }
+
+  if (primary_required && !primary_shard) {
+    for (unsigned i=0; i < t.acting.size(); ++i) {
+      // TODO: Can't compare OSDs for EC as there may be more than one shard on
+      //       each OSD.
+      // Will fix in next PR.
+      if (t.acting[i] == t.acting_primary) {
+        primary_shard.emplace(i);
+        sub_reads.emplace(*primary_shard, orig_op->ops.size());
+      }
+    }
+
+    // No primary???  Let the normal code paths deal with this.
+    if (!primary_shard) {
+      ldout(cct, DBG_LVL) << __func__ <<" ABORT: Can't find primary" << dendl;
+      abort = true;
+    }
+  }
+}
+
+ECSplitOp::ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) :
+  SplitOp(op, objecter, cct, count) {}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " ReplicaSplitOp::"
+
+std::pair<SplitOp::extent_set, bufferlist> ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) {
+  extent_set extents_out;
+  bufferlist bl_out;
+
+  for (auto && [shard, sr] : sub_reads) {
+    for (auto [off, len] : *sr.details[ops_index].e) {
+      extents_out.insert(off, len);
+    }
+    bl_out.append(sr.details[ops_index].bl);
+  }
+
+  return std::pair(extents_out, bl_out);
+}
+
+void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+  for (auto && [_, sr] : sub_reads) {
+    bl_out.append(sr.details[ops_index].bl);
+  }
+}
+
+ReplicaSplitOp::ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) :
+  SplitOp(op, objecter, cct, pool_size) {
+
+  // This may not actually be the primary, but since all shards are kept current
+  // in replica, it does not actually matter which we choose here. Choose 0 since
+  // there will always be a read to this shard.
+  primary_shard = shard_id_t(0);
+}
+
+void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
+
+  auto &t = orig_op->target;
+
+  std::set<int> osds;
+  for (int direct_osd : t.acting) {
+    if (objecter.osdmap->exists(direct_osd)) {
+      osds.insert(direct_osd);
+    }
+  }
+
+  if (osds.size() < 2) {
+    ldout(cct, DBG_LVL) << __func__ <<" ABORT: No OSDs" << dendl;
+    abort = true;
+    return;
+  }
+
+  uint64_t offset = op.op.extent.offset;
+  uint64_t length = op.op.extent.length;
+  uint64_t slice_count = std::min(length / kReplicaMinShardReadSize, osds.size());
+  uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE);
+
+  for (unsigned i = 0; i < osds.size() && length > 0; i++) {
+
+    shard_id_t shard(i);
+    if (!sub_reads.contains(shard)) {
+      sub_reads.emplace(shard, orig_op->ops.size());
+    }
+    auto &sr = sub_reads.at(shard);
+    auto bl = &sr.details[ops_index].bl;
+    auto rval = &sr.details[ops_index].rval;
+    uint64_t len = std::min(length, chunk_size);
+    if (sparse) {
+      sr.details[ops_index].e.emplace();
+      sr.rd.sparse_read(offset, len, &(*sr.details[ops_index].e), bl, rval);
+    } else {
+      sr.rd.read(offset, len, &sr.details[ops_index].ec, bl);
+    }
+    offset += len;
+    length -= len;
+  }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " SplitOp::"
+
+int SplitOp::assemble_rc() {
+  int rc = 0;
+  bool rc_zero = false;
+
+  // This should only happen on a single thread.
+  for (auto & [_, sub_read] : sub_reads) {
+    if (rc >= 0 && sub_read.rc >= 0) {
+      rc += sub_read.rc;
+      if (sub_read.rc == 0) {
+        rc_zero = true;
+      }
+    } else if (rc >= 0) {
+      rc = sub_read.rc;
+    } // else ignore subsequent errors.
+  }
+
+  if (rc >= 0 && rc_zero) {
+    return 0;
+  }
+
+  return rc;
+}
+
+void SplitOp::complete() {
+  if (abort) {
+    return;
+  }
+  ldout(cct, 20) << __func__ << " entry this=" << this << dendl;
+
+  int rc = assemble_rc();
+  if (rc >= 0) {
+
+    // In a "normal" completion, out_ops is generated in the MOSDOpReply reply
+    // which we do not have here. Here we are going to mimic this behaviour
+    // so as to reproduce as much as possible of the IO completion.
+    std::vector out_ops(orig_op->ops.begin(), orig_op->ops.end());
+
+    for (unsigned ops_index=0; ops_index < out_ops.size(); ++ops_index) {
+      auto &out_osd_op = out_ops[ops_index];
+      switch (out_osd_op.op.op) {
+        case CEPH_OSD_OP_SPARSE_READ: {
+          auto [extents, bl] = assemble_buffer_sparse_read(ops_index);
+          encode(std::move(extents).detach(), out_osd_op.outdata);
+          encode_destructively(bl, out_osd_op.outdata);
+          break;
+        }
+        case CEPH_OSD_OP_READ: {
+          assemble_buffer_read(out_osd_op.outdata, ops_index);
+          break;
+        }
+        case CEPH_OSD_OP_GETXATTRS:
+        case CEPH_OSD_OP_CHECKSUM:
+        case CEPH_OSD_OP_GETXATTR: {
+          out_osd_op.outdata = sub_reads.at(*primary_shard).details[ops_index].bl;
+          out_osd_op.rval = sub_reads.at(*primary_shard).details[ops_index].rval;
+          break;
+        }
+      default: {
+          ceph_abort_msg("Not supported");
+          break;
+        }
+      }
+    }
+
+    objecter.handle_osd_op_reply2(orig_op, out_ops);
+
+    ldout(cct, DBG_LVL) << __func__ << " success this=" << this << " rc=" << rc << dendl;
+    Objecter::Op::complete(std::move(orig_op->onfinish), osdcode(rc), rc, objecter.service.get_executor());
+    objecter._finish_op(orig_op, rc);
+  } else {
+    ldout(cct, DBG_LVL) << __func__ << " retry this=" << this << " rc=" << rc << dendl;
+    objecter.op_post_submit(orig_op);
+  }
+}
+
+static bool validate(Objecter::Op *op, bool is_erasure, CephContext *cct) {
+
+  if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0 ) {
+    ldout(cct, DBG_LVL) << __func__ <<" REJECT: Client rejects balanced read" << dendl;
+    return false;
+  }
+
+  uint64_t suitable_read_found = false;
+  for (auto & o : op->ops) {
+    switch (o.op.op) {
+      case CEPH_OSD_OP_READ:
+      case CEPH_OSD_OP_SPARSE_READ: {
+        uint64_t length = o.op.extent.length;
+        if ((is_erasure && length > 0) ||
+            (!is_erasure && length >= kReplicaMinReadSize)) {
+          suitable_read_found = true;
+        }
+        break;
+      }
+      case CEPH_OSD_OP_GETXATTRS:
+      case CEPH_OSD_OP_CHECKSUM:
+      case CEPH_OSD_OP_GETXATTR: {
+        break; // Do not block validate.
+      }
+      default: {
+        ldout(cct, DBG_LVL) << __func__ <<" REJECT: unsupported op" << dendl;
+        return false;
+      }
+    }
+  }
+
+  return suitable_read_found;
+}
+
+void SplitOp::init(OSDOp &op, int ops_index) {
+  switch (op.op.op) {
+    case CEPH_OSD_OP_SPARSE_READ: {
+      init_read(op, true, ops_index);
+      break;
+    }
+    case CEPH_OSD_OP_READ: {
+      init_read(op, false, ops_index);
+      break;
+    }
+    case CEPH_OSD_OP_GETXATTRS:
+    case CEPH_OSD_OP_CHECKSUM:
+    case CEPH_OSD_OP_GETXATTR: {
+      shard_id_t shard = *primary_shard;
+      Details &d = sub_reads.at(shard).details[ops_index];
+      orig_op->pass_thru_op(sub_reads.at(shard).rd, ops_index, &d.bl, &d.rval);
+      break;
+    }
+    default: {
+      ldout(cct, DBG_LVL) << __func__ <<" ABORT: unsupported" << dendl;
+      abort = true;
+      break;
+    }
+  }
+}
+
+namespace {
+void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct) {
+  auto &t = op->target;
+  ldout(cct, DBG_LVL) << str
+    << " osd=" << t.osd
+    << " shard=" << (t.force_shard ? *t.force_shard : shard_id_t(-1))
+    << " balance_reads=" << ((t.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0)
+    << " ops.size()=" << op->ops.size()
+    << " needs_version=" << (op->objver?"true":"false");
+
+  for (auto && o : op->ops) {
+    *_dout << " op_code=" << ceph_osd_op_name(o.op.op);
+    switch (o.op.op) {
+      case CEPH_OSD_OP_READ:
+      case CEPH_OSD_OP_SPARSE_READ: {
+        *_dout << "(" << o.op.extent.offset << "~" << o.op.extent.length << ")";
+        break;
+      }
+      default:
+      break;
+    }
+  }
+  *_dout << dendl;
+}
+}
+
+
+bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
+  shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct) {
+
+  auto &t = op->target;
+  const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+
+  if (!pi) {
+    ldout(cct, DBG_LVL) << __func__ <<" REJECT: No Pool" << dendl;
+    return false;
+  }
+
+  debug_op_summary("orig_op: ", op, cct);
+
+  // Reject if direct reads not supported by profile.
+  if (!pi->has_flag(pg_pool_t::FLAG_CLIENT_SPLIT_READS)) {
+    ldout(cct, DBG_LVL) << __func__ <<" REJECT: split reads off" << dendl;
+    return false;
+  }
+
+  bool validated = validate(op, pi->is_erasure(), cct);
+
+  if (!validated) {
+    return false;
+  }
+
+  std::shared_ptr<SplitOp> split_read;
+
+  if (pi->is_erasure()) {
+    split_read = std::make_shared<ECSplitOp>(op, objecter, cct, pi->size);
+  } else {
+    split_read = std::make_shared<ReplicaSplitOp>(op, objecter, cct, pi->size);
+  }
+
+  if (split_read->abort) {
+    ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;return false;
+    return false;
+  }
+
+  // Populate the target, to extract the acting set from it.
+  t.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
+  objecter._calc_target(&op->target, op);
+
+  for (unsigned i = 0; i < op->ops.size(); ++i) {
+    split_read->init( op->ops[i], i);
+    if (split_read->abort) {
+      break;
+    }
+  }
+
+  if (split_read->abort) {
+    ldout(cct, DBG_LVL) << __func__ <<" ABORTED 2" << dendl;
+    return false;
+  }
+
+  ldout(cct, DBG_LVL) << __func__ <<" sub_reads ready. count=" << split_read->sub_reads.size() << dendl;
+
+  // We are committed to doing a split read. Any re-attempts should not be either
+  // split or balanced.
+  for (auto && [shard, sub_read] : split_read->sub_reads) {
+    auto fin = new Finisher(split_read, sub_read); // Self-destructs when called.
+
+    version_t *objver = nullptr;
+    if (split_read->primary_shard && shard == *split_read->primary_shard) {
+      objver = split_read->orig_op->objver;
+    }
+
+    auto sub_op = objecter.prepare_read_op(
+      t.base_oid, t.base_oloc, split_read->sub_reads.at(shard).rd, op->snapid,
+      nullptr, split_read->flags, -1, fin, objver);
+    sub_op->target.force_shard.emplace(shard);
+    if (pi->is_erasure()) {
+      sub_op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+    } else {
+      sub_op->target.flags |= CEPH_OSD_FLAG_BALANCE_READS;
+    }
+
+    objecter._op_submit_with_budget(sub_op, sul, ptid, ctx_budget);
+    debug_op_summary("sent_op", sub_op, cct);
+  }
+
+  ceph_assert(split_read->sub_reads.size() > 0);
+
+  return true;
+}
+
+
+#undef dout_prefix
+#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
\ No newline at end of file
diff --git a/src/osdc/SplitOp.h b/src/osdc/SplitOp.h
new file mode 100644 (file)
index 0000000..6f3d9a1
--- /dev/null
@@ -0,0 +1,225 @@
+/*
+* Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#pragma once
+
+#include <ostream>
+#include <ranges>
+
+class SplitOp {
+
+ protected:
+  using extent = std::pair<uint64_t, uint64_t>;
+  using extents_map = std::map<uint64_t, uint64_t>;
+  using extent_set = interval_set<uint64_t, std::map, false>;
+
+  using ExtentPredicate = std::function<bool(const extent&)>;
+  using extent_map_subrange = std::ranges::subrange<extents_map::const_iterator>;
+  using extent_map_subrange_view = std::ranges::take_while_view<extent_map_subrange, ExtentPredicate>;
+  using extent_variant = std::variant<std::ranges::single_view<extent>, extent_map_subrange_view, extent_map_subrange>;
+  using buffer_appender = std::function<void(bufferlist &, uint64_t *, extent_variant)>;
+
+  // A simple struct to hold the data for each step of the iteration.
+  struct ECChunkInfo {
+    uint64_t ro_offset;
+    uint64_t shard_offset;
+    uint64_t length;
+    shard_id_t shard;
+
+    friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &obj) {
+      return os
+          << "ro_offset: " << obj.ro_offset
+          << " shard_offset: " << obj.shard_offset
+          << " length: " << obj.length
+          << " shard: " << obj.shard;
+    }
+  };
+
+  class ECStripeIterator {
+   public:
+    using iterator_category = std::input_iterator_tag;
+    using value_type = ECChunkInfo;
+    using difference_type = std::ptrdiff_t;
+    using pointer = ECChunkInfo*;
+    using reference = ECChunkInfo&;
+
+    ECStripeIterator() = default;
+
+    // Constructor for the "begin" iterator
+    ECStripeIterator(
+      uint64_t start_offset,
+      uint64_t total_len,
+      uint32_t chunk_s,
+      uint32_t data_chunks)
+      : chunk_size(chunk_s),
+        data_chunk_count(data_chunks) {
+      end_offset = start_offset + total_len;
+      current_info.ro_offset = start_offset;
+      uint64_t chunk = start_offset / chunk_size;
+      current_info.length = std::min(total_len, (chunk + 1) * chunk_size - start_offset);
+
+      // Maybe this is paranoia, as compiler would probably detect that this
+      // / and % could be done in a single op.
+      auto chunk_div = std::lldiv(chunk, data_chunk_count);
+      current_info.shard = shard_id_t(chunk_div.rem);
+      current_info.shard_offset = (chunk_div.quot) * chunk_size + start_offset % chunk_size;
+    }
+
+    value_type operator*() const {
+      return current_info;
+    }
+
+    // Pre-increment
+    ECStripeIterator& operator++() {
+      current_info.ro_offset += current_info.length;
+      current_info.shard_offset += current_info.length - chunk_size;
+      current_info.length = std::min(chunk_size, end_offset - current_info.ro_offset);
+      ceph_assert(current_info.ro_offset <= end_offset);
+      ++current_info.shard;
+      if (unsigned(current_info.shard) == data_chunk_count) {
+        current_info.shard_offset += chunk_size;
+        current_info.shard = shard_id_t(0);
+      }
+      return *this;
+    }
+
+    // post-increment
+    ECStripeIterator operator++(int) {
+      ECStripeIterator tmp = *this;
+      ++(*this);
+      return tmp;
+    }
+
+    bool operator!=(const ECStripeIterator& other) const {
+      // This is only here to terminate the loop!
+      return current_info.length != other.current_info.length;
+    }
+
+    bool operator==(const ECStripeIterator& other) const {
+      return !(*this != other);
+    }
+    value_type current_info{};
+
+  private:
+    uint64_t end_offset = 0;
+    uint64_t chunk_size = 0;
+    uint32_t data_chunk_count = 0;
+  };
+
+
+  // The custom range class that provides begin() and end()
+  class ECStripeView {
+   public:
+    ECStripeView(
+      uint64_t offset,
+      uint64_t length,
+      const pg_pool_t *pi)
+      : start_offset(offset),
+        total_length(length),
+        data_chunk_count(pi->nonprimary_shards.size() + 1),
+        chunk_size(pi->get_stripe_width() / data_chunk_count) {
+    }
+
+    ECStripeIterator begin() const {
+      return ECStripeIterator(start_offset, total_length, chunk_size, data_chunk_count);
+    }
+
+    ECStripeIterator end() const {
+      ECStripeIterator end_iter;
+      end_iter.current_info.length = 0;
+      return end_iter;
+    }
+
+    uint64_t start_offset;
+    uint64_t total_length;
+    uint32_t data_chunk_count;
+    uint64_t chunk_size;
+  };
+
+  static_assert(std::input_iterator<ECStripeIterator>,
+                "ECStripeIterator does not conform to the std::input_iterator concept");
+
+  struct Details {
+    bufferlist bl;
+    int rval;
+    boost::system::error_code ec;
+    std::optional<extents_map> e;
+  };
+
+  struct SubRead {
+    ::ObjectOperation rd;
+    mini_flat_map<int, Details> details;
+    int rc = -EIO;
+
+    SubRead(int count) : details(count) {}
+  };
+
+  // This structure self-destructs on each IO completions, using a legacy
+  // C++ pattern (no shared_ptr). We use the finish callback to record the
+  // RC, but otherwise rely on the shared_ptr destroying ec_read to deal with
+  // completion of the parent IO.
+  struct Finisher : Context {
+    std::shared_ptr<SplitOp> split_read;
+    SubRead &sub_read;
+
+    Finisher(std::shared_ptr<SplitOp> split_read, SubRead &sub_read) : split_read(split_read), sub_read(sub_read) {}
+    void finish(int r) override {
+      sub_read.rc = r;
+    }
+  };
+
+  int assemble_rc();
+  virtual std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) = 0;
+  virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) = 0;
+  virtual void init_read(OSDOp &op, bool sparse, int ops_index) = 0;
+  void init(OSDOp &op, int ops_index);
+
+  Objecter::Op *orig_op;
+  Objecter &objecter;
+  mini_flat_map<shard_id_t, SubRead> sub_reads;
+  CephContext *cct;
+  bool abort = false; // Last minute abort... We want to keep this to a minimum.
+  int flags = 0;
+  std::optional<shard_id_t> primary_shard;
+  std::map<shard_id_t, std::vector<int>> op_offset_map;
+
+ public:
+  SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {}
+  virtual ~SplitOp() = default;
+  void complete();
+  static bool create(Objecter::Op *op, Objecter &objecter,
+    shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct);
+};
+
+class ECSplitOp : public SplitOp{
+ public:
+  using SplitOp::SplitOp;
+  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
+  void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+  void init_read(OSDOp &op, bool sparse, int ops_index) override;
+  ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count);
+  ~ECSplitOp() {
+    complete();
+  }
+};
+
+class ReplicaSplitOp : public SplitOp {
+ public:
+  using SplitOp::SplitOp;
+  std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
+  void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+  void init_read(OSDOp &op, bool sparse, int ops_index) override;
+  ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size);
+  ~ReplicaSplitOp() {
+    complete();
+  }
+};
+