--- /dev/null
+#include "osdc/Objecter.h"
+#include "osdc/SplitOp.h"
+#include "osd/osd_types.h"
+
+#define dout_subsys ceph_subsys_objecter
+#define DBG_LVL 20
+
+namespace {
+inline boost::system::error_code osdcode(int r) {
+ return (r < 0) ? boost::system::error_code(-r, osd_category()) : boost::system::error_code();
+}
+}
+
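+// Heuristics for splitting replicated-pool reads: only reads that can be
+// divided into at least kReplicaMinShardReads slices of at least
+// kReplicaMinShardReadSize each are considered worth splitting (see
+// validate()).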
+constexpr static uint64_t kReplicaMinShardReadSize = 128 * 1024;
+constexpr static uint64_t kReplicaMinShardReads = 2;
+constexpr static uint64_t kReplicaMinReadSize = kReplicaMinShardReadSize * kReplicaMinShardReads;
+
+#undef dout_prefix
+#define dout_prefix *_dout << " ECSplitOp::"
+
+std::pair<SplitOp::extent_set, bufferlist> ECSplitOp::assemble_buffer_sparse_read(int ops_index) {
+ bufferlist bl_out;
+ extent_set extents_out;
+
+ auto &orig_osd_op = orig_op->ops[ops_index].op;
+ const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+ ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
+ ldout(cct, DBG_LVL) << __func__ << " start:"
+ << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
+ std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
+ mini_flat_map<shard_id_t, extents_map::iterator> map_iterators(stripe_view.data_chunk_count);
+
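+ // Walk the logical read in RADOS-offset order. For each chunk, consume
+ // the extents the owning shard returned up to the end of the chunk, and
+ // stitch the matching slices of that shard's buffer together in order.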
+ for (auto &&chunk_info : stripe_view) {
+ ldout(cct, DBG_LVL) << __func__ << " chunk: " << chunk_info << dendl;
+ auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+
+ if (!map_iterators.contains(chunk_info.shard)) {
+ map_iterators.emplace(chunk_info.shard, details.e->begin());
+ }
+
+ extents_map::iterator &extent_iter = map_iterators.at(chunk_info.shard);
+
+ uint64_t bl_len = 0;
+ while (extent_iter != details.e->end() && extent_iter->first < chunk_info.ro_offset + stripe_view.chunk_size) {
+ auto [off, len] = *extent_iter;
+ ldout(cct, DBG_LVL) << __func__ << " extent=" << off << "~" << len << dendl;
+ extents_out.insert(off, len);
+ bl_len += len;
+ ++extent_iter;
+ }
+
+ // We try to keep the buffers together where possible.
+ if (bl_len != 0) {
+ bufferlist bl;
+ bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], bl_len);
+ bl_out.append(bl);
+ buffer_offset[(int)chunk_info.shard] += bl_len;
+ }
+ }
+
+ return std::pair(extents_out, bl_out);
+}
+
+void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+ auto &orig_osd_op = orig_op->ops[ops_index].op;
+ const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool);
+ ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi);
+
+ std::vector<uint64_t> buffer_offset(stripe_view.data_chunk_count);
+ ldout(cct, DBG_LVL) << __func__ << " " << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl;
+
+ for (auto &&chunk_info : stripe_view) {
+ ldout(cct, DBG_LVL) << __func__ << " chunk info " << chunk_info << dendl;
+ auto &details = sub_reads.at(chunk_info.shard).details[ops_index];
+ bufferlist bl;
+ bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], chunk_info.length);
+ bl_out.append(bl);
+ buffer_offset[(int)chunk_info.shard] += chunk_info.length;
+ }
+}
+
+void ECSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
+ auto &t = orig_op->target;
+ const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+ uint64_t offset = op.op.extent.offset;
+ uint64_t length = op.op.extent.length;
+ uint64_t data_chunk_count = pi->nonprimary_shards.size() + 1;
+ uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count;
+ uint64_t start_chunk = offset / chunk_size;
+ // This calculation is wrong for length == 0, but such IOs should never
+ // have reached here.
+ ceph_assert(op.op.extent.length != 0);
+ uint64_t end_chunk = (offset + length - 1) / chunk_size;
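+ // Example (hypothetical 4+2 profile, stripe_width 16 KiB, so chunk_size
+ // is 4 KiB): a read of 6144~10240 gives start_chunk = 1 and end_chunk = 3,
+ // so three chunks on shards 1..3 are read.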
+
+ unsigned count = std::min(data_chunk_count, end_chunk - start_chunk + 1);
+ // FIXME: This is not quite right - ops.size() > 1 does not necessarily
+ // mean that the primary is required; it could be two reads to the same
+ // shard.
+ bool primary_required = count > 1 || orig_op->objver || orig_op->ops.size() > 1;
+
+ int first_shard = start_chunk % data_chunk_count;
+ // Check all shards are online.
+ for (unsigned i = first_shard; i < first_shard + count; i++) {
+ shard_id_t shard(i >= data_chunk_count ? i - data_chunk_count : i);
+ int direct_osd = t.acting[(int)shard];
+ if (t.acting_primary == direct_osd) {
+ primary_shard.emplace(shard);
+ }
+ if (!objecter.osdmap->exists(direct_osd)) {
+ ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl;
+ abort = true;
+ return;
+ }
+ if (!sub_reads.contains(shard)) {
+ sub_reads.emplace(shard, orig_op->ops.size());
+ }
+ auto &d = sub_reads.at(shard).details[ops_index];
+ if (sparse) {
+ d.e.emplace();
+ sub_reads.at(shard).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval);
+ } else {
+ sub_reads.at(shard).rd.read(offset, length, &d.ec, &d.bl);
+ }
+ }
+
+ if (primary_required && !primary_shard) {
+ for (unsigned i = 0; i < t.acting.size(); ++i) {
+ if (t.acting[i] == t.acting_primary) {
+ primary_shard.emplace(i);
+ sub_reads.emplace(*primary_shard, orig_op->ops.size());
+ break;
+ }
+ }
+
+ // No primary??? Let the normal code paths deal with this.
+ if (!primary_shard) {
+ ldout(cct, DBG_LVL) << __func__ <<" ABORT: Can't find primary" << dendl;
+ abort = true;
+ }
+ }
+}
+
+ECSplitOp::ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) :
+ SplitOp(op, objecter, cct, count) {}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " ReplicaSplitOp::"
+
+std::pair<SplitOp::extent_set, bufferlist> ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) {
+ extent_set extents_out;
+ bufferlist bl_out;
+
+ for (auto && [shard, sr] : sub_reads) {
+ for (auto [off, len] : *sr.details[ops_index].e) {
+ extents_out.insert(off, len);
+ }
+ bl_out.append(sr.details[ops_index].bl);
+ }
+
+ return std::pair(extents_out, bl_out);
+}
+
+void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) {
+ for (auto && [_, sr] : sub_reads) {
+ bl_out.append(sr.details[ops_index].bl);
+ }
+}
+
+ReplicaSplitOp::ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) :
+ SplitOp(op, objecter, cct, pool_size) {
+
+ // This may not actually be the primary, but since all shards are kept
+ // current in a replicated pool, it does not matter which we choose here.
+ // Choose shard 0, since there will always be a read to it.
+ primary_shard = shard_id_t(0);
+}
+
+void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) {
+
+ auto &t = orig_op->target;
+
+ std::set<int> osds;
+ for (int direct_osd : t.acting) {
+ if (objecter.osdmap->exists(direct_osd)) {
+ osds.insert(direct_osd);
+ }
+ }
+
+ if (osds.size() < 2) {
+ ldout(cct, DBG_LVL) << __func__ << " ABORT: Not enough OSDs to split across" << dendl;
+ abort = true;
+ return;
+ }
+
+ uint64_t offset = op.op.extent.offset;
+ uint64_t length = op.op.extent.length;
+ uint64_t slice_count = std::min<uint64_t>(length / kReplicaMinShardReadSize, osds.size());
+ uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE);
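+ // Example (assuming CEPH_PAGE_SIZE is 4 KiB): a 1 MiB read over three
+ // usable OSDs gives slice_count = min(8, 3) = 3 and chunk_size =
+ // p2roundup(1 MiB / 3, 4 KiB) = 344 KiB, producing sub-reads of 344 KiB,
+ // 344 KiB and 336 KiB.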
+
+ for (unsigned i = 0; i < osds.size() && length > 0; i++) {
+
+ shard_id_t shard(i);
+ if (!sub_reads.contains(shard)) {
+ sub_reads.emplace(shard, orig_op->ops.size());
+ }
+ auto &sr = sub_reads.at(shard);
+ auto bl = &sr.details[ops_index].bl;
+ auto rval = &sr.details[ops_index].rval;
+ uint64_t len = std::min(length, chunk_size);
+ if (sparse) {
+ sr.details[ops_index].e.emplace();
+ sr.rd.sparse_read(offset, len, &(*sr.details[ops_index].e), bl, rval);
+ } else {
+ sr.rd.read(offset, len, &sr.details[ops_index].ec, bl);
+ }
+ offset += len;
+ length -= len;
+ }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << " SplitOp::"
+
+int SplitOp::assemble_rc() {
+ int rc = 0;
+ bool rc_zero = false;
+
+ // This should only ever run on a single thread.
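+ // Merge rule: positive rcs (bytes read) are summed; the first negative rc
+ // (an error) wins; if any sub-read returned 0, report 0 overall.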
+ for (auto & [_, sub_read] : sub_reads) {
+ if (rc >= 0 && sub_read.rc >= 0) {
+ rc += sub_read.rc;
+ if (sub_read.rc == 0) {
+ rc_zero = true;
+ }
+ } else if (rc >= 0) {
+ rc = sub_read.rc;
+ } // else ignore subsequent errors.
+ }
+
+ if (rc >= 0 && rc_zero) {
+ return 0;
+ }
+
+ return rc;
+}
+
+void SplitOp::complete() {
+ if (abort) {
+ return;
+ }
+ ldout(cct, DBG_LVL) << __func__ << " entry this=" << this << dendl;
+
+ int rc = assemble_rc();
+ if (rc >= 0) {
+
+ // In a "normal" completion, out_ops is decoded from the MOSDOpReply,
+ // which we do not have here. Mimic that behaviour so as to reproduce
+ // as much of the usual IO completion path as possible.
+ std::vector out_ops(orig_op->ops.begin(), orig_op->ops.end());
+
+ for (unsigned ops_index=0; ops_index < out_ops.size(); ++ops_index) {
+ auto &out_osd_op = out_ops[ops_index];
+ switch (out_osd_op.op.op) {
+ case CEPH_OSD_OP_SPARSE_READ: {
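+ // A sparse-read reply is encoded as the extent map followed by the data.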
+ auto [extents, bl] = assemble_buffer_sparse_read(ops_index);
+ encode(std::move(extents).detach(), out_osd_op.outdata);
+ encode_destructively(bl, out_osd_op.outdata);
+ break;
+ }
+ case CEPH_OSD_OP_READ: {
+ assemble_buffer_read(out_osd_op.outdata, ops_index);
+ break;
+ }
+ case CEPH_OSD_OP_GETXATTRS:
+ case CEPH_OSD_OP_CHECKSUM:
+ case CEPH_OSD_OP_GETXATTR: {
+ out_osd_op.outdata = sub_reads.at(*primary_shard).details[ops_index].bl;
+ out_osd_op.rval = sub_reads.at(*primary_shard).details[ops_index].rval;
+ break;
+ }
+ default: {
+ ceph_abort_msg("Not supported");
+ break;
+ }
+ }
+ }
+
+ objecter.handle_osd_op_reply2(orig_op, out_ops);
+
+ ldout(cct, DBG_LVL) << __func__ << " success this=" << this << " rc=" << rc << dendl;
+ Objecter::Op::complete(std::move(orig_op->onfinish), osdcode(rc), rc, objecter.service.get_executor());
+ objecter._finish_op(orig_op, rc);
+ } else {
+ ldout(cct, DBG_LVL) << __func__ << " retry this=" << this << " rc=" << rc << dendl;
+ objecter.op_post_submit(orig_op);
+ }
+}
+
+static bool validate(Objecter::Op *op, bool is_erasure, CephContext *cct) {
+
+ if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0) {
+ ldout(cct, DBG_LVL) << __func__ << " REJECT: client did not request balanced reads" << dendl;
+ return false;
+ }
+
+ bool suitable_read_found = false;
+ for (auto & o : op->ops) {
+ switch (o.op.op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_SPARSE_READ: {
+ uint64_t length = o.op.extent.length;
+ if ((is_erasure && length > 0) ||
+ (!is_erasure && length >= kReplicaMinReadSize)) {
+ suitable_read_found = true;
+ }
+ break;
+ }
+ case CEPH_OSD_OP_GETXATTRS:
+ case CEPH_OSD_OP_CHECKSUM:
+ case CEPH_OSD_OP_GETXATTR: {
+ break; // Allowed, but does not count as a suitable read.
+ }
+ default: {
+ ldout(cct, DBG_LVL) << __func__ <<" REJECT: unsupported op" << dendl;
+ return false;
+ }
+ }
+ }
+
+ return suitable_read_found;
+}
+
+void SplitOp::init(OSDOp &op, int ops_index) {
+ switch (op.op.op) {
+ case CEPH_OSD_OP_SPARSE_READ: {
+ init_read(op, true, ops_index);
+ break;
+ }
+ case CEPH_OSD_OP_READ: {
+ init_read(op, false, ops_index);
+ break;
+ }
+ case CEPH_OSD_OP_GETXATTRS:
+ case CEPH_OSD_OP_CHECKSUM:
+ case CEPH_OSD_OP_GETXATTR: {
+ // primary_shard is set by the constructor (replica) or by init_read (EC);
+ // assert it exists rather than dereference an empty optional.
+ ceph_assert(primary_shard);
+ shard_id_t shard = *primary_shard;
+ Details &d = sub_reads.at(shard).details[ops_index];
+ orig_op->pass_thru_op(sub_reads.at(shard).rd, ops_index, &d.bl, &d.rval);
+ break;
+ }
+ default: {
+ ldout(cct, DBG_LVL) << __func__ <<" ABORT: unsupported" << dendl;
+ abort = true;
+ break;
+ }
+ }
+}
+
+namespace {
+void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct) {
+ auto &t = op->target;
+ ldout(cct, DBG_LVL) << str
+ << " osd=" << t.osd
+ << " shard=" << (t.force_shard ? *t.force_shard : shard_id_t(-1))
+ << " balance_reads=" << ((t.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0)
+ << " ops.size()=" << op->ops.size()
+ << " needs_version=" << (op->objver?"true":"false");
+
+ for (auto && o : op->ops) {
+ *_dout << " op_code=" << ceph_osd_op_name(o.op.op);
+ switch (o.op.op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_SPARSE_READ: {
+ *_dout << "(" << o.op.extent.offset << "~" << o.op.extent.length << ")";
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ *_dout << dendl;
+}
+}
+
+
+bool SplitOp::create(Objecter::Op *op, Objecter &objecter,
+ shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct) {
+
+ auto &t = op->target;
+ const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool);
+
+ if (!pi) {
+ ldout(cct, DBG_LVL) << __func__ <<" REJECT: No Pool" << dendl;
+ return false;
+ }
+
+ debug_op_summary("orig_op: ", op, cct);
+
+ // Reject if direct reads not supported by profile.
+ if (!pi->has_flag(pg_pool_t::FLAG_CLIENT_SPLIT_READS)) {
+ ldout(cct, DBG_LVL) << __func__ <<" REJECT: split reads off" << dendl;
+ return false;
+ }
+
+ bool validated = validate(op, pi->is_erasure(), cct);
+
+ if (!validated) {
+ return false;
+ }
+
+ std::shared_ptr<SplitOp> split_read;
+
+ if (pi->is_erasure()) {
+ split_read = std::make_shared<ECSplitOp>(op, objecter, cct, pi->size);
+ } else {
+ split_read = std::make_shared<ReplicaSplitOp>(op, objecter, cct, pi->size);
+ }
+
+ if (split_read->abort) {
+ ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;return false;
+ return false;
+ }
+
+ // Clear the balance-reads flag so any re-attempt of the original op is
+ // not balanced, then populate the target to extract the acting set from it.
+ t.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
+ objecter._calc_target(&op->target, op);
+
+ for (unsigned i = 0; i < op->ops.size(); ++i) {
+ split_read->init(op->ops[i], i);
+ if (split_read->abort) {
+ break;
+ }
+ }
+
+ if (split_read->abort) {
+ ldout(cct, DBG_LVL) << __func__ <<" ABORTED 2" << dendl;
+ return false;
+ }
+
+ ldout(cct, DBG_LVL) << __func__ <<" sub_reads ready. count=" << split_read->sub_reads.size() << dendl;
+
+ // We are committed to doing a split read. Any re-attempts should be
+ // neither split nor balanced.
+ for (auto && [shard, sub_read] : split_read->sub_reads) {
+ auto fin = new Finisher(split_read, sub_read); // Self-destructs when called.
+
+ version_t *objver = nullptr;
+ if (split_read->primary_shard && shard == *split_read->primary_shard) {
+ objver = split_read->orig_op->objver;
+ }
+
+ auto sub_op = objecter.prepare_read_op(
+ t.base_oid, t.base_oloc, split_read->sub_reads.at(shard).rd, op->snapid,
+ nullptr, split_read->flags, -1, fin, objver);
+ sub_op->target.force_shard.emplace(shard);
+ if (pi->is_erasure()) {
+ sub_op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ;
+ } else {
+ sub_op->target.flags |= CEPH_OSD_FLAG_BALANCE_READS;
+ }
+
+ objecter._op_submit_with_budget(sub_op, sul, ptid, ctx_budget);
+ debug_op_summary("sent_op", sub_op, cct);
+ }
+
+ ceph_assert(split_read->sub_reads.size() > 0);
+
+ return true;
+}
+
+
+#undef dout_prefix
+#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
\ No newline at end of file
--- /dev/null
+#pragma once
+
+#include <functional>
+#include <map>
+#include <optional>
+#include <ostream>
+#include <ranges>
+#include <utility>
+#include <variant>
+
+// NOTE: Ceph types used below (Objecter, OSDOp, interval_set, mini_flat_map,
+// shard_id_t, bufferlist, ...) are expected to be declared by headers
+// included before this one; see osdc/SplitOp.cc, which includes
+// osdc/Objecter.h first.
+
+class SplitOp {
+
+ protected:
+ using extent = std::pair<uint64_t, uint64_t>;
+ using extents_map = std::map<uint64_t, uint64_t>;
+ using extent_set = interval_set<uint64_t, std::map, false>;
+
+ using ExtentPredicate = std::function<bool(const extent&)>;
+ using extent_map_subrange = std::ranges::subrange<extents_map::const_iterator>;
+ using extent_map_subrange_view = std::ranges::take_while_view<extent_map_subrange, ExtentPredicate>;
+ using extent_variant = std::variant<std::ranges::single_view<extent>, extent_map_subrange_view, extent_map_subrange>;
+ using buffer_appender = std::function<void(bufferlist &, uint64_t *, extent_variant)>;
+
+ // A simple struct to hold the data for each step of the iteration.
+ struct ECChunkInfo {
+ uint64_t ro_offset;
+ uint64_t shard_offset;
+ uint64_t length;
+ shard_id_t shard;
+
+ friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &obj) {
+ return os
+ << "ro_offset: " << obj.ro_offset
+ << " shard_offset: " << obj.shard_offset
+ << " length: " << obj.length
+ << " shard: " << obj.shard;
+ }
+ };
+
+ class ECStripeIterator {
+ public:
+ using iterator_category = std::input_iterator_tag;
+ using value_type = ECChunkInfo;
+ using difference_type = std::ptrdiff_t;
+ using pointer = ECChunkInfo*;
+ using reference = ECChunkInfo&;
+
+ ECStripeIterator() = default;
+
+ // Constructor for the "begin" iterator
+ ECStripeIterator(
+ uint64_t start_offset,
+ uint64_t total_len,
+ uint32_t chunk_s,
+ uint32_t data_chunks)
+ : chunk_size(chunk_s),
+ data_chunk_count(data_chunks) {
+ end_offset = start_offset + total_len;
+ current_info.ro_offset = start_offset;
+ uint64_t chunk = start_offset / chunk_size;
+ current_info.length = std::min(total_len, (chunk + 1) * chunk_size - start_offset);
+
+ // Using lldiv here may be paranoia; the compiler would likely fuse the
+ // / and % into a single divide anyway.
+ auto chunk_div = std::lldiv(chunk, data_chunk_count);
+ current_info.shard = shard_id_t(chunk_div.rem);
+ current_info.shard_offset = (chunk_div.quot) * chunk_size + start_offset % chunk_size;
+ }
+
+ value_type operator*() const {
+ return current_info;
+ }
+
+ // Pre-increment
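+ // The increment advances ro_offset by the previous chunk's length, then
+ // normalizes shard_offset: within a stripe, consecutive chunks sit at the
+ // same shard_offset on successive shards, so adding (length - chunk_size)
+ // cancels out a full chunk and corrects for a partial first chunk.
+ // Wrapping past the last data shard drops down one chunk row
+ // (shard_offset += chunk_size).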
+ ECStripeIterator& operator++() {
+ current_info.ro_offset += current_info.length;
+ current_info.shard_offset += current_info.length - chunk_size;
+ current_info.length = std::min(chunk_size, end_offset - current_info.ro_offset);
+ ceph_assert(current_info.ro_offset <= end_offset);
+ ++current_info.shard;
+ if (unsigned(current_info.shard) == data_chunk_count) {
+ current_info.shard_offset += chunk_size;
+ current_info.shard = shard_id_t(0);
+ }
+ return *this;
+ }
+
+ // Post-increment
+ ECStripeIterator operator++(int) {
+ ECStripeIterator tmp = *this;
+ ++(*this);
+ return tmp;
+ }
+
+ bool operator!=(const ECStripeIterator& other) const {
+ // Only meaningful when comparing against the end() sentinel, whose
+ // length is 0; a mid-range iterator never has length == 0.
+ return current_info.length != other.current_info.length;
+ }
+
+ bool operator==(const ECStripeIterator& other) const {
+ return !(*this != other);
+ }
+ value_type current_info{}; // Public so ECStripeView::end() can build the sentinel.
+
+ private:
+ uint64_t end_offset = 0;
+ uint64_t chunk_size = 0;
+ uint32_t data_chunk_count = 0;
+ };
+
+
+ // The custom range class that provides begin() and end()
+ class ECStripeView {
+ public:
+ ECStripeView(
+ uint64_t offset,
+ uint64_t length,
+ const pg_pool_t *pi)
+ : start_offset(offset),
+ total_length(length),
+ data_chunk_count(pi->nonprimary_shards.size() + 1),
+ chunk_size(pi->get_stripe_width() / data_chunk_count) {
+ }
+
+ ECStripeIterator begin() const {
+ return ECStripeIterator(start_offset, total_length, chunk_size, data_chunk_count);
+ }
+
+ ECStripeIterator end() const {
+ ECStripeIterator end_iter;
+ end_iter.current_info.length = 0;
+ return end_iter;
+ }
+
+ uint64_t start_offset;
+ uint64_t total_length;
+ uint32_t data_chunk_count;
+ uint64_t chunk_size;
+ };
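+ // Example traversal (hypothetical 4+2 profile, stripe_width = 16 KiB, so
+ // data_chunk_count = 4 and chunk_size = 4 KiB): a read of 6144~10240
+ // yields three chunks:
+ //   ro_offset  6144, shard 1, shard_offset 2048, length 2048
+ //   ro_offset  8192, shard 2, shard_offset    0, length 4096
+ //   ro_offset 12288, shard 3, shard_offset    0, length 4096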
+
+ static_assert(std::input_iterator<ECStripeIterator>,
+ "ECStripeIterator does not conform to the std::input_iterator concept");
+
+ struct Details {
+ bufferlist bl; // Data returned by this sub-read.
+ int rval = 0; // Per-op return value (sparse reads and pass-through ops).
+ boost::system::error_code ec; // Error code (plain reads).
+ std::optional<extents_map> e; // Extent map (sparse reads only).
+ };
+
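+ // One SubRead per shard: the compound sub-operation sent to that shard,
+ // plus per-ops_index buffers and return codes. rc defaults to -EIO so a
+ // sub-read that never completes counts as an error.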
+ struct SubRead {
+ ::ObjectOperation rd;
+ mini_flat_map<int, Details> details;
+ int rc = -EIO;
+
+ SubRead(int count) : details(count) {}
+ };
+
+ // This Context self-destructs when it is called (the legacy Ceph Context
+ // pattern). We use the finish callback to record the rc, but otherwise
+ // rely on the shared_ptr: when the last Finisher releases split_read, the
+ // SplitOp destructor runs complete() for the parent IO.
+ struct Finisher : Context {
+ std::shared_ptr<SplitOp> split_read;
+ SubRead &sub_read;
+
+ Finisher(std::shared_ptr<SplitOp> split_read, SubRead &sub_read) : split_read(split_read), sub_read(sub_read) {}
+ void finish(int r) override {
+ sub_read.rc = r;
+ }
+ };
+
+ int assemble_rc();
+ virtual std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) = 0;
+ virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) = 0;
+ virtual void init_read(OSDOp &op, bool sparse, int ops_index) = 0;
+ void init(OSDOp &op, int ops_index);
+
+ Objecter::Op *orig_op;
+ Objecter &objecter;
+ mini_flat_map<shard_id_t, SubRead> sub_reads;
+ CephContext *cct;
+ bool abort = false; // Last minute abort... We want to keep this to a minimum.
+ int flags = 0;
+ std::optional<shard_id_t> primary_shard;
+ std::map<shard_id_t, std::vector<int>> op_offset_map;
+
+ public:
+ SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {}
+ virtual ~SplitOp() = default;
+ void complete();
+ static bool create(Objecter::Op *op, Objecter &objecter,
+ shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct);
+};
+
+class ECSplitOp : public SplitOp {
+ public:
+ using SplitOp::SplitOp;
+ std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
+ void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+ void init_read(OSDOp &op, bool sparse, int ops_index) override;
+ ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count);
+ ~ECSplitOp() {
+ complete();
+ }
+};
+
+class ReplicaSplitOp : public SplitOp {
+ public:
+ using SplitOp::SplitOp;
+ std::pair<extent_set, bufferlist> assemble_buffer_sparse_read(int ops_index) override;
+ void assemble_buffer_read(bufferlist &bl_out, int ops_index) override;
+ void init_read(OSDOp &op, bool sparse, int ops_index) override;
+ ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size);
+ ~ReplicaSplitOp() {
+ complete();
+ }
+};
+