#undef dout_prefix
#define dout_prefix *_dout << " ECSplitOp::"
+/**
+ * @brief Point reference_sub_read at the primary shard of the target.
+ *
+ * Walks the indices of the target's acting set and selects the first index
+ * that, converted to a shard_id_t, equals target.actual_pgid.shard. If no
+ * index matches, the operation is flagged through abort and
+ * reference_sub_read is left unchanged.
+ *
+ * The acting set is read here, so it must already be filled in; the caller
+ * invokes this right after _calc_target().
+ */
+void ECSplitOp::init_reference_sub_read() {
+  auto &target = orig_op->target;
+  shard_id_t primary_shard = target.actual_pgid.shard;
+  const size_t acting_count = target.acting.size();
+
+  // Advance until the index, viewed as a shard id, equals the primary shard
+  // or the acting set is exhausted.
+  size_t index = 0;
+  while (index < acting_count && !(shard_id_t(index) == primary_shard)) {
+    ++index;
+  }
+
+  if (index == acting_count) {
+    // Nothing matched: there is no sub-read to use as reference, so give up.
+    ldout(cct, DBG_LVL) << __func__ << " WARNING: Could not find primary shard "
+                        << (int)primary_shard << " in acting set" << dendl;
+    abort = true;
+    return;
+  }
+
+  reference_sub_read = index;
+  ldout(cct, DBG_LVL) << __func__ << " set reference_sub_read=" << reference_sub_read
+                      << " for primary_shard=" << (int)primary_shard << dendl;
+}
+
/**
* @brief Assemble sparse read results from EC shards into logical object view.
*
int shard_index = (int)shard;
int direct_osd = target.acting[shard_index];
- if (target.actual_pgid.shard == shard) {
- reference_sub_read = shard_index;
- }
if (!objecter.osdmap->exists(direct_osd)) {
ldout(cct, DBG_LVL) << __func__ <<" ABORT: Missing OSD" << dendl;
abort = true;
}
}
- if (primary_required && reference_sub_read == -1) {
- // _calc_target will have picked the primary by default on EC. The "primary"
- // on replica is an arbitrary shard.
- reference_sub_read = (int)target.actual_pgid.shard;
+ // If primary is required and we haven't created a sub_read for it yet, create one
+ if (primary_required && !sub_reads.contains(reference_sub_read)) {
sub_reads.emplace(reference_sub_read, orig_op->ops.size() + 1);
}
-
- ceph_assert(reference_sub_read != -1);
}
#undef dout_prefix
#define dout_prefix *_dout << " ReplicaSplitOp::"
+/**
+ * @brief Choose a random starting index for reference_sub_read.
+ *
+ * Tallies the acting-set entries whose OSD exists in the current osdmap.
+ * With fewer than two such OSDs the operation is flagged through abort and
+ * reference_sub_read is left unchanged. Otherwise reference_sub_read is set
+ * to a rand()-derived value in [0, number of existing OSDs).
+ *
+ * The acting set is read here, so it must already be filled in; the caller
+ * invokes this right after _calc_target().
+ *
+ * NOTE(review): the chosen value is bounded by the count of existing OSDs
+ * and is not mapped back to a position in the acting set - confirm this is
+ * the indexing the consumers of reference_sub_read expect.
+ */
+void ReplicaSplitOp::init_reference_sub_read() {
+  auto &target = orig_op->target;
+
+  // Tally the acting-set members that the osdmap still knows about.
+  int live_osds = 0;
+  for (int direct_osd : target.acting) {
+    if (objecter.osdmap->exists(direct_osd)) {
+      ++live_osds;
+    }
+  }
+
+  // Fewer than two existing OSDs: flag the operation and stop.
+  if (live_osds < 2) {
+    abort = true;
+    ldout(cct, DBG_LVL) << __func__ << " ABORT: Not enough valid OSDs" << dendl;
+    return;
+  }
+
+  // Random starting point among the counted OSDs.
+  reference_sub_read = rand() % live_osds;
+}
+
/**
* @brief Assemble sparse read results from replicas.
*
std::min(length / replica_min_shard_read_size,
osds.size());
uint64_t chunk_size = p2roundup(length / slice_count, (uint64_t)CEPH_PAGE_SIZE);
- unsigned start = 0;
-
- if (slice_count < osds.size()) {
- start = rand() % osds.size();
- }
-
- for (unsigned i = start; length > 0; i = (i + 1 == osds.size()) ? 0 : i + 1) {
+
+ // Use reference_sub_read (set by init_reference_sub_read()) as the starting shard.
+ // This provides load balancing while ensuring reference_sub_read is always set.
+ for (unsigned i = reference_sub_read; length > 0; i = (i + 1 == osds.size()) ? 0 : i + 1) {
int acting_index = i;
if (!sub_reads.contains(acting_index)) {
sub_reads.emplace(acting_index, orig_op->ops.size() + 1);
- // Set reference_sub_read to the first index we use
- if (reference_sub_read == -1) {
- reference_sub_read = acting_index;
- }
}
auto &sr = sub_reads.at(acting_index);
auto bl = &sr.details[ops_index].bl;
}
default: {
// Invalid ops should have been rejected in validate.
+ // Ensure the reference sub_read entry exists
+ if (!sub_reads.contains(reference_sub_read)) {
+ sub_reads.emplace(reference_sub_read, orig_op->ops.size() + 1);
+ }
Details &d = sub_reads.at(reference_sub_read).details[ops_index];
orig_op->pass_thru_op(sub_reads.at(reference_sub_read).rd, ops_index, &d.bl, &d.rval);
break;
target.flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
objecter._calc_target(&op->target, op);
+ // Initialize reference_sub_read now that acting set is populated
+ split_read->init_reference_sub_read();
+
+ if (split_read->reference_sub_read == -1) {
+ split_read->abort = true;
+ }
+
+ if (split_read->abort) {
+ ldout(cct, DBG_LVL) << __func__ <<" ABORTED after init_reference_sub_read" << dendl;
+ return false;
+ }
+
// STAGE 4: Initialize sub-operations (may set abort if problems detected)
for (unsigned i = 0; i < op->ops.size(); ++i) {
split_read->init( op->ops[i], i);