From 2f60449098fdb93b84827ca3e69c9f2abee015eb Mon Sep 17 00:00:00 2001 From: Alex Ainscow Date: Tue, 14 Oct 2025 09:24:56 +0100 Subject: [PATCH] osdc: Add SplitOp capability to Objecter This will provide the ability for Objecter to split up certain ops and distribute them to the OSDs directly if that provides a preformance advantage. This is experimental code and is switched off unless the magic pool flags are enabled. These magic pool flags were pushed in an earlier commit in the same PR. Signed-off-by: Alex Ainscow --- src/CMakeLists.txt | 1 + src/osdc/Objecter.cc | 9 +- src/osdc/Objecter.h | 4 + src/osdc/SplitOp.cc | 473 +++++++++++++++++++++++++++++++++++++++++++ src/osdc/SplitOp.h | 214 ++++++++++++++++++++ 5 files changed, 700 insertions(+), 1 deletion(-) create mode 100644 src/osdc/SplitOp.cc create mode 100644 src/osdc/SplitOp.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 06f900d6e6e..ab6dc257623 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -532,6 +532,7 @@ set(libcommon_files osdc/Striper.cc osdc/Objecter.cc osdc/error_code.cc + osdc/SplitOp.cc librbd/Features.cc librbd/io/IoOperations.cc ${mds_files}) diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 2d3b3df235a..d4a97f2a2dd 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -63,6 +63,8 @@ #include "neorados/RADOSImpl.h" +#include "osdc/SplitOp.h" + using std::list; using std::make_pair; using std::map; @@ -2332,7 +2334,12 @@ void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget) if (!ptid) ptid = &tid; op->trace.event("op submit"); - _op_submit_with_budget(op, rl, ptid, ctx_budget); + + bool was_split = SplitOp::create(op, *this, rl, ptid, ctx_budget, cct); + + if (!was_split) { + _op_submit_with_budget(op, rl, ptid, ctx_budget); + } } void Objecter::_op_submit_with_budget(Op *op, diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 7f03f99041e..7734cc0d449 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1691,6 +1691,10 @@ inline std::ostream& operator <<(std::ostream& m, const ObjectOperation& oo) { // ---------------- class Objecter : public md_config_obs_t, public Dispatcher { + friend class SplitOp; + friend class ECSplitOp; + friend class ReplicaSplitOp; + using MOSDOp = _mosdop::MOSDOp; public: using OpSignature = void(boost::system::error_code); diff --git a/src/osdc/SplitOp.cc b/src/osdc/SplitOp.cc new file mode 100644 index 00000000000..884b140a953 --- /dev/null +++ b/src/osdc/SplitOp.cc @@ -0,0 +1,473 @@ +#include "osdc/Objecter.h" +#include "osdc/SplitOp.h" +#include "osd/osd_types.h" + +#define dout_subsys ceph_subsys_objecter +#define DBG_LVL 0 + +namespace { +inline boost::system::error_code osdcode(int r) { + return (r < 0) ? boost::system::error_code(-r, osd_category()) : boost::system::error_code(); +} +} + +constexpr static uint64_t kReplicaMinShardReadSize = 128 * 1024; +constexpr static uint64_t kReplicaMinShardReads = 2; +constexpr static uint64_t kReplicaMinReadSize = kReplicaMinShardReadSize * kReplicaMinShardReads; + +#undef dout_prefix +#define dout_prefix *_dout << " ECSplitOp::" + +std::pair ECSplitOp::assemble_buffer_sparse_read(int ops_index) { + bufferlist bl_out; + extent_set extents_out; + + + auto &orig_osd_op = orig_op->ops[ops_index].op; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool); + ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi); + ldout(cct, DBG_LVL) << __func__ << " start:" + << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl; + + std::vector buffer_offset(stripe_view.data_chunk_count); + mini_flat_map map_iterators(stripe_view.data_chunk_count); + + for (auto &&chunk_info : stripe_view) { + ldout(cct, DBG_LVL) << __func__ << " chunk: " << chunk_info << dendl; + auto &details = sub_reads.at(chunk_info.shard).details[ops_index]; + + if (!map_iterators.contains(chunk_info.shard)) { + map_iterators.emplace(chunk_info.shard, details.e->begin()); + } + + extents_map::iterator &extent_iter = map_iterators.at(chunk_info.shard); + + uint64_t bl_len = 0; + while (extent_iter != details.e->end() && extent_iter->first < chunk_info.ro_offset + stripe_view.chunk_size) { + auto [off, len] = *extent_iter; + ldout(cct, DBG_LVL) << __func__ << " extent=" << off << "~" << len << dendl; + extents_out.insert(off, len); + bl_len += len; + ++extent_iter; + } + + // We try to keep the buffers together where possible. + if (bl_len != 0) { + bufferlist bl; + bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], bl_len); + bl_out.append(bl); + buffer_offset[(int)chunk_info.shard] += bl_len; + } + } + + return std::pair(extents_out, bl_out); +} + +void ECSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) { + auto &orig_osd_op = orig_op->ops[ops_index].op; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(orig_op->target.base_oloc.pool); + ECStripeView stripe_view(orig_osd_op.extent.offset, orig_osd_op.extent.length, pi); + + std::vector buffer_offset(stripe_view.data_chunk_count); + ldout(cct, DBG_LVL) << __func__ << " " << orig_osd_op.extent.offset << "~" << orig_osd_op.extent.length << dendl; + + for (auto &&chunk_info : stripe_view) { + ldout(cct, DBG_LVL) << __func__ << " chunk info " << chunk_info << dendl; + auto &details = sub_reads.at(chunk_info.shard).details[ops_index]; + bufferlist bl; + bl.substr_of(details.bl, buffer_offset[(int)chunk_info.shard], chunk_info.length); + bl_out.append(bl); + buffer_offset[(int)chunk_info.shard] += chunk_info.length; + } +} + +void ECSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) { + auto &t = orig_op->target; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool); + uint64_t offset = op.op.extent.offset; + uint64_t length = op.op.extent.length; + uint64_t data_chunk_count = pi->nonprimary_shards.size() + 1; + uint32_t chunk_size = pi->get_stripe_width() / data_chunk_count; + uint64_t start_chunk = offset / chunk_size; + // This calculation is wrong for length = 0, but such IOs should not have + // reached here! + ceph_assert( op.op.extent.length != 0); + uint64_t end_chunk = (offset + op.op.extent.length - 1) / chunk_size; + + unsigned count = std::min(data_chunk_count, end_chunk - start_chunk + 1); + //FIXME: This is not quite right - the ops.size() > 1 does not necessarily mean + // that the primary is required - it could be two reads to the same shard. + bool primary_required = count > 1 || orig_op->objver || orig_op->ops.size() > 1; + + int first_shard = start_chunk % data_chunk_count; + // Check all shards are online. + for (unsigned i = first_shard; i < first_shard + count; i++) { + shard_id_t shard(i >= data_chunk_count ? i - data_chunk_count : i); + int direct_osd = t.acting[(int)shard]; + if (t.acting_primary == direct_osd) { + primary_shard.emplace(shard); + } + if (!objecter.osdmap->exists(direct_osd)) { + ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl; + abort = true; + return; + } + if (!sub_reads.contains(shard)) { + sub_reads.emplace(shard, orig_op->ops.size()); + } + auto &d = sub_reads.at(shard).details[ops_index]; + if (sparse) { + d.e.emplace(); + sub_reads.at(shard).rd.sparse_read(offset, length, &(*d.e), &d.bl, &d.rval); + } else { + sub_reads.at(shard).rd.read(offset, length, &d.ec, &d.bl); + } + } + + if (primary_required && !primary_shard) { + for (unsigned i=0; i < t.acting.size(); ++i) { + if (t.acting[i] == t.acting_primary) { + primary_shard.emplace(i); + sub_reads.emplace(*primary_shard, orig_op->ops.size()); + } + } + + // No primary??? Let the normal code paths deal with this. + if (!primary_shard) { + ldout(cct, DBG_LVL) << __func__ <<" ABORT: Can't find primary" << dendl; + abort = true; + } + } +} + +ECSplitOp::ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : + SplitOp(op, objecter, cct, count) {} + +#undef dout_prefix +#define dout_prefix *_dout << " ReplicaSplitOp::" + +std::pair ReplicaSplitOp::assemble_buffer_sparse_read(int ops_index) { + extent_set extents_out; + bufferlist bl_out; + + for (auto && [shard, sr] : sub_reads) { + for (auto [off, len] : *sr.details[ops_index].e) { + extents_out.insert(off, len); + } + bl_out.append(sr.details[ops_index].bl); + } + + return std::pair(extents_out, bl_out); +} + +void ReplicaSplitOp::assemble_buffer_read(bufferlist &bl_out, int ops_index) { + for (auto && [_, sr] : sub_reads) { + bl_out.append(sr.details[ops_index].bl); + } +} + +ReplicaSplitOp::ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size) : + SplitOp(op, objecter, cct, pool_size) { + + // This may not actually be the primary, but since all shards are kept current + // in replica, it does not actually matter which we choose here. Choose 0 since + // there will always be a read to this shard. + primary_shard = shard_id_t(0); +} + +void ReplicaSplitOp::init_read(OSDOp &op, bool sparse, int ops_index) { + + auto &t = orig_op->target; + + std::set osds; + for (int direct_osd : t.acting) { + if (objecter.osdmap->exists(direct_osd)) { + osds.insert(direct_osd); + } + } + + if (osds.size() < 2) { + ldout(cct, DBG_LVL) << __func__ <<" ABORT: No OSDs" << dendl; + abort = true; + return; + } + + uint64_t offset = op.op.extent.offset; + uint64_t length = op.op.extent.length; + uint64_t slice_count = std::min(length / kReplicaMinShardReadSize, osds.size()); + uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE); + + for (unsigned i = 0; i < osds.size() && length > 0; i++) { + + shard_id_t shard(i); + if (!sub_reads.contains(shard)) { + sub_reads.emplace(shard, orig_op->ops.size()); + } + auto &sr = sub_reads.at(shard); + auto bl = &sr.details[ops_index].bl; + auto rval = &sr.details[ops_index].rval; + uint64_t len = std::min(length, chunk_size); + if (sparse) { + sr.details[ops_index].e.emplace(); + sr.rd.sparse_read(offset, len, &(*sr.details[ops_index].e), bl, rval); + } else { + sr.rd.read(offset, len, &sr.details[ops_index].ec, bl); + } + offset += len; + length -= len; + } +} + +#undef dout_prefix +#define dout_prefix *_dout << " SplitOp::" + +int SplitOp::assemble_rc() { + int rc = 0; + bool rc_zero = false; + + // This should only happen on a single thread. + for (auto & [_, sub_read] : sub_reads) { + if (rc >= 0 && sub_read.rc >= 0) { + rc += sub_read.rc; + if (sub_read.rc == 0) { + rc_zero = true; + } + } else if (rc >= 0) { + rc = sub_read.rc; + } // else ignore subsequent errors. + } + + if (rc >= 0 && rc_zero) { + return 0; + } + + return rc; +} + +void SplitOp::complete() { + if (abort) { + return; + } + ldout(cct, 20) << __func__ << " entry this=" << this << dendl; + + int rc = assemble_rc(); + if (rc >= 0) { + + // In a "normal" completion, out_ops is generated in the MOSDOpReply reply + // which we do not have here. Here we are going to mimic this behaviour + // so as to reproduce as much as possible of the IO completion. + std::vector out_ops(orig_op->ops.begin(), orig_op->ops.end()); + + for (unsigned ops_index=0; ops_index < out_ops.size(); ++ops_index) { + auto &out_osd_op = out_ops[ops_index]; + switch (out_osd_op.op.op) { + case CEPH_OSD_OP_SPARSE_READ: { + auto [extents, bl] = assemble_buffer_sparse_read(ops_index); + encode(std::move(extents).detach(), out_osd_op.outdata); + encode_destructively(bl, out_osd_op.outdata); + break; + } + case CEPH_OSD_OP_READ: { + assemble_buffer_read(out_osd_op.outdata, ops_index); + break; + } + case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_CHECKSUM: + case CEPH_OSD_OP_GETXATTR: { + out_osd_op.outdata = sub_reads.at(*primary_shard).details[ops_index].bl; + out_osd_op.rval = sub_reads.at(*primary_shard).details[ops_index].rval; + break; + } + default: { + ceph_abort_msg("Not supported"); + break; + } + } + } + + objecter.handle_osd_op_reply2(orig_op, out_ops); + + ldout(cct, DBG_LVL) << __func__ << " success this=" << this << " rc=" << rc << dendl; + Objecter::Op::complete(std::move(orig_op->onfinish), osdcode(rc), rc, objecter.service.get_executor()); + objecter._finish_op(orig_op, rc); + } else { + ldout(cct, DBG_LVL) << __func__ << " retry this=" << this << " rc=" << rc << dendl; + objecter.op_post_submit(orig_op); + } +} + +static bool validate(Objecter::Op *op, bool is_erasure, CephContext *cct) { + + if ((op->target.flags & CEPH_OSD_FLAG_BALANCE_READS) == 0 ) { + ldout(cct, DBG_LVL) << __func__ <<" REJECT: Client rejects balanced read" << dendl; + return false; + } + + uint64_t suitable_read_found = false; + for (auto & o : op->ops) { + switch (o.op.op) { + case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: { + uint64_t length = o.op.extent.length; + if ((is_erasure && length > 0) || + (!is_erasure && length >= kReplicaMinReadSize)) { + suitable_read_found = true; + } + break; + } + case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_CHECKSUM: + case CEPH_OSD_OP_GETXATTR: { + break; // Do not block validate. + } + default: { + ldout(cct, DBG_LVL) << __func__ <<" REJECT: unsupported op" << dendl; + return false; + } + } + } + + return suitable_read_found; +} + +void SplitOp::init(OSDOp &op, int ops_index) { + switch (op.op.op) { + case CEPH_OSD_OP_SPARSE_READ: { + init_read(op, true, ops_index); + break; + } + case CEPH_OSD_OP_READ: { + init_read(op, false, ops_index); + break; + } + case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_CHECKSUM: + case CEPH_OSD_OP_GETXATTR: { + shard_id_t shard = *primary_shard; + Details &d = sub_reads.at(shard).details[ops_index]; + orig_op->pass_thru_op(sub_reads.at(shard).rd, ops_index, &d.bl, &d.rval); + break; + } + default: { + ldout(cct, DBG_LVL) << __func__ <<" ABORT: unsupported" << dendl; + abort = true; + break; + } + } +} + +namespace { +void debug_op_summary(const std::string &str, Objecter::Op *op, CephContext *cct) { + auto &t = op->target; + ldout(cct, DBG_LVL) << str + << " osd=" << t.osd + << " shard=" << (t.force_shard ? *t.force_shard : shard_id_t(-1)) + << " balance_reads=" << ((t.flags & CEPH_OSD_FLAG_BALANCE_READS) != 0) + << " ops.size()=" << op->ops.size() + << " needs_version=" << (op->objver?"true":"false"); + + for (auto && o : op->ops) { + *_dout << " op_code=" << ceph_osd_op_name(o.op.op); + switch (o.op.op) { + case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SPARSE_READ: { + *_dout << "(" << o.op.extent.offset << "~" << o.op.extent.length << ")"; + break; + } + default: + break; + } + } + *_dout << dendl; +} +} + + +bool SplitOp::create(Objecter::Op *op, Objecter &objecter, + shunique_lock& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct) { + + auto &t = op->target; + const pg_pool_t *pi = objecter.osdmap->get_pg_pool(t.base_oloc.pool); + + if (!pi) { + ldout(cct, DBG_LVL) << __func__ <<" REJECT: No Pool" << dendl; + return false; + } + + debug_op_summary("orig_op: ", op, cct); + + // Reject if direct reads not supported by profile. + if (!pi->has_flag(pg_pool_t::FLAG_CLIENT_SPLIT_READS)) { + ldout(cct, DBG_LVL) << __func__ <<" REJECT: split reads off" << dendl; + return false; + } + + bool validated = validate(op, pi->is_erasure(), cct); + + if (!validated) { + return false; + } + + std::shared_ptr split_read; + + if (pi->is_erasure()) { + split_read = std::make_shared(op, objecter, cct, pi->size); + } else { + split_read = std::make_shared(op, objecter, cct, pi->size); + } + + if (split_read->abort) { + ldout(cct, DBG_LVL) << __func__ <<" ABORTED 1" << dendl;return false; + return false; + } + + // Populate the target, to extract the acting set from it. + t.flags &= ~CEPH_OSD_FLAG_BALANCE_READS; + objecter._calc_target(&op->target, op); + + for (unsigned i = 0; i < op->ops.size(); ++i) { + split_read->init( op->ops[i], i); + if (split_read->abort) { + break; + } + } + + if (split_read->abort) { + ldout(cct, DBG_LVL) << __func__ <<" ABORTED 2" << dendl; + return false; + } + + ldout(cct, DBG_LVL) << __func__ <<" sub_reads ready. count=" << split_read->sub_reads.size() << dendl; + + // We are committed to doing a split read. Any re-attempts should not be either + // split or balanced. + for (auto && [shard, sub_read] : split_read->sub_reads) { + auto fin = new Finisher(split_read, sub_read); // Self-destructs when called. + + version_t *objver = nullptr; + if (split_read->primary_shard && shard == *split_read->primary_shard) { + objver = split_read->orig_op->objver; + } + + auto sub_op = objecter.prepare_read_op( + t.base_oid, t.base_oloc, split_read->sub_reads.at(shard).rd, op->snapid, + nullptr, split_read->flags, -1, fin, objver); + sub_op->target.force_shard.emplace(shard); + if (pi->is_erasure()) { + sub_op->target.flags |= CEPH_OSD_FLAG_EC_DIRECT_READ; + } else { + sub_op->target.flags |= CEPH_OSD_FLAG_BALANCE_READS; + } + + objecter._op_submit_with_budget(sub_op, sul, ptid, ctx_budget); + debug_op_summary("sent_op", sub_op, cct); + } + + ceph_assert(split_read->sub_reads.size() > 0); + + return true; +} + + +#undef dout_prefix +#define dout_prefix *_dout << messenger->get_myname() << ".objecter " \ No newline at end of file diff --git a/src/osdc/SplitOp.h b/src/osdc/SplitOp.h new file mode 100644 index 00000000000..16547990de6 --- /dev/null +++ b/src/osdc/SplitOp.h @@ -0,0 +1,214 @@ +#pragma once + +#include +#include + +class SplitOp { + + protected: + using extent = std::pair; + using extents_map = std::map; + using extent_set = interval_set; + + using ExtentPredicate = std::function; + using extent_map_subrange = std::ranges::subrange; + using extent_map_subrange_view = std::ranges::take_while_view; + using extent_variant = std::variant, extent_map_subrange_view, extent_map_subrange>; + using buffer_appender = std::function; + + // A simple struct to hold the data for each step of the iteration. + struct ECChunkInfo { + uint64_t ro_offset; + uint64_t shard_offset; + uint64_t length; + shard_id_t shard; + + friend std::ostream & operator<<(std::ostream &os, const ECChunkInfo &obj) { + return os + << "ro_offset: " << obj.ro_offset + << " shard_offset: " << obj.shard_offset + << " length: " << obj.length + << " shard: " << obj.shard; + } + }; + + class ECStripeIterator { + public: + using iterator_category = std::input_iterator_tag; + using value_type = ECChunkInfo; + using difference_type = std::ptrdiff_t; + using pointer = ECChunkInfo*; + using reference = ECChunkInfo&; + + ECStripeIterator() = default; + + // Constructor for the "begin" iterator + ECStripeIterator( + uint64_t start_offset, + uint64_t total_len, + uint32_t chunk_s, + uint32_t data_chunks) + : chunk_size(chunk_s), + data_chunk_count(data_chunks) { + end_offset = start_offset + total_len; + current_info.ro_offset = start_offset; + uint64_t chunk = start_offset / chunk_size; + current_info.length = std::min(total_len, (chunk + 1) * chunk_size - start_offset); + + // Maybe this is paranoia, as compiler would probably detect that this + // / and % could be done in a single op. + auto chunk_div = std::lldiv(chunk, data_chunk_count); + current_info.shard = shard_id_t(chunk_div.rem); + current_info.shard_offset = (chunk_div.quot) * chunk_size + start_offset % chunk_size; + } + + value_type operator*() const { + return current_info; + } + + // Pre-increment + ECStripeIterator& operator++() { + current_info.ro_offset += current_info.length; + current_info.shard_offset += current_info.length - chunk_size; + current_info.length = std::min(chunk_size, end_offset - current_info.ro_offset); + ceph_assert(current_info.ro_offset <= end_offset); + ++current_info.shard; + if (unsigned(current_info.shard) == data_chunk_count) { + current_info.shard_offset += chunk_size; + current_info.shard = shard_id_t(0); + } + return *this; + } + + // post-increment + ECStripeIterator operator++(int) { + ECStripeIterator tmp = *this; + ++(*this); + return tmp; + } + + bool operator!=(const ECStripeIterator& other) const { + // This is only here to terminate the loop! + return current_info.length != other.current_info.length; + } + + bool operator==(const ECStripeIterator& other) const { + return !(*this != other); + } + value_type current_info{}; + + private: + uint64_t end_offset = 0; + uint64_t chunk_size = 0; + uint32_t data_chunk_count = 0; + }; + + + // The custom range class that provides begin() and end() + class ECStripeView { + public: + ECStripeView( + uint64_t offset, + uint64_t length, + const pg_pool_t *pi) + : start_offset(offset), + total_length(length), + data_chunk_count(pi->nonprimary_shards.size() + 1), + chunk_size(pi->get_stripe_width() / data_chunk_count) { + } + + ECStripeIterator begin() const { + return ECStripeIterator(start_offset, total_length, chunk_size, data_chunk_count); + } + + ECStripeIterator end() const { + ECStripeIterator end_iter; + end_iter.current_info.length = 0; + return end_iter; + } + + uint64_t start_offset; + uint64_t total_length; + uint32_t data_chunk_count; + uint64_t chunk_size; + }; + + static_assert(std::input_iterator, + "ECStripeIterator does not conform to the std::input_iterator concept"); + + struct Details { + bufferlist bl; + int rval; + boost::system::error_code ec; + std::optional e; + }; + + struct SubRead { + ::ObjectOperation rd; + mini_flat_map details; + int rc = -EIO; + + SubRead(int count) : details(count) {} + }; + + // This structure self-destructs on each IO completions, using a legacy + // C++ pattern (no shared_ptr). We use the finish callback to record the + // RC, but otherwise rely on the shared_ptr destroying ec_read to deal with + // completion of the parent IO. + struct Finisher : Context { + std::shared_ptr split_read; + SubRead &sub_read; + + Finisher(std::shared_ptr split_read, SubRead &sub_read) : split_read(split_read), sub_read(sub_read) {} + void finish(int r) override { + sub_read.rc = r; + } + }; + + int assemble_rc(); + virtual std::pair assemble_buffer_sparse_read(int ops_index) = 0; + virtual void assemble_buffer_read(bufferlist &bl_out, int ops_index) = 0; + virtual void init_read(OSDOp &op, bool sparse, int ops_index) = 0; + void init(OSDOp &op, int ops_index); + + Objecter::Op *orig_op; + Objecter &objecter; + mini_flat_map sub_reads; + CephContext *cct; + bool abort = false; // Last minute abort... We want to keep this to a minimum. + int flags = 0; + std::optional primary_shard; + std::map> op_offset_map; + + public: + SplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count) : orig_op(op), objecter(objecter), sub_reads(count), cct(cct) {} + virtual ~SplitOp() = default; + void complete(); + static bool create(Objecter::Op *op, Objecter &objecter, + shunique_lock& sul, ceph_tid_t *ptid, int *ctx_budget, CephContext *cct); +}; + +class ECSplitOp : public SplitOp{ + public: + using SplitOp::SplitOp; + std::pair assemble_buffer_sparse_read(int ops_index) override; + void assemble_buffer_read(bufferlist &bl_out, int ops_index) override; + void init_read(OSDOp &op, bool sparse, int ops_index) override; + ECSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int count); + ~ECSplitOp() { + complete(); + } +}; + +class ReplicaSplitOp : public SplitOp { + public: + using SplitOp::SplitOp; + std::pair assemble_buffer_sparse_read(int ops_index) override; + void assemble_buffer_read(bufferlist &bl_out, int ops_index) override; + void init_read(OSDOp &op, bool sparse, int ops_index) override; + ReplicaSplitOp(Objecter::Op *op, Objecter &objecter, CephContext *cct, int pool_size); + ~ReplicaSplitOp() { + complete(); + } +}; + -- 2.39.5