const hobject_t &hoid,
ECUtil::shard_extent_map_t &&buffers_read,
std::optional<map<string, bufferlist, less<>>> attrs,
- const ECUtil::shard_extent_set_t &want_to_read,
+ read_request_t &req,
RecoveryMessages *m) {
dout(10) << __func__ << ": returned " << hoid << " " << buffers_read << dendl;
ceph_assert(recovery_ops.contains(hoid));
ceph_assert(op.obc);
op.recovery_info.size = op.obc->obs.oi.size;
op.recovery_info.oi = op.obc->obs.oi;
+
+ // We didn't know the size before, meaning the zero for decode calculations
+ // will be off. Recalculate them!
+ req.object_size = op.obc->obs.oi.size;
+ int r = read_pipeline.get_min_avail_to_read_shards(
+ op.hoid, true, false, req);
+ ceph_assert(r == 0);
}
}
ceph_assert(op.xattrs.size());
sinfo.ro_size_to_read_mask(op.recovery_info.size, read_mask);
ECUtil::shard_extent_set_t shard_want_to_read(sinfo.get_k_plus_m());
- for (auto &[shard, eset] : want_to_read) {
+ for (auto &[shard, eset] : req.shard_want_to_read) {
/* Read buffers do not need recovering! */
if (buffers_read.contains(shard)) {
continue;
uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size);
+ dout(20) << __func__ << " before decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
+ << op.returned_data->debug_string(2048, 0)
+ << dendl;
+
+ op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode);
int r = op.returned_data->decode(ec_impl, shard_want_to_read, aligned_size, get_parent()->get_dpp(), true);
ceph_assert(r == 0);
}
dout(20) << __func__ << ": oid=" << op.hoid << dendl;
- dout(20) << __func__ << "EC_DEBUG_BUFFERS: "
+ dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
<< op.returned_data->debug_string(2048, 0)
<< dendl;
hoid,
std::move(res.buffers_read),
res.attrs,
- req.shard_want_to_read,
+ req,
&rm);
}
rop.debug_log.emplace_back(ECUtil::ERROR, op.from, complete.buffers_read);
complete.buffers_read.erase_shard(from.shard);
complete.processed_read_requests.erase(from.shard);
+ // If we are doing redundant reads, then we must take care that any failed
+ // reads are not replaced with a zero buffer. When fast_reads are disabled,
+ // the send_all_remaining_reads() call will replace the zeros_for_decode
+ // based on the recovery read.
+ if (rop.do_redundant_reads) {
+ rop.to_read.at(hoid).zeros_for_decode.erase(from.shard);
+ }
dout(20) << __func__ << " shard=" << from << " error=" << err << dendl;
}
rop.in_progress.erase(from);
unsigned is_complete = 0;
bool need_resend = false;
+ bool all_sub_reads_done = rop.in_progress.empty();
// For redundant reads check for completion as each shard comes in,
// or in a non-recovery read check for completion once all the shards read.
- if (rop.do_redundant_reads || rop.in_progress.empty()) {
+ if (rop.do_redundant_reads || all_sub_reads_done) {
for (auto &&[oid, read_result]: rop.complete) {
shard_id_set have;
read_result.processed_read_requests.populate_shard_id_set(have);
populate_shard_id_set(want_to_read);
dout(20) << __func__ << " read_result: " << read_result << dendl;
+ // If all reads are done, we can safely assume that zero buffers can
+ // be applied.
+ if (all_sub_reads_done) {
+ rop.to_read.at(oid).zeros_for_decode.populate_shard_id_set(have);
+ }
int err = ec_impl->minimum_to_decode(want_to_read, have, dummy_minimum,
nullptr);
if (err) {
dout(20) << __func__ << " minimum_to_decode failed" << dendl;
- if (rop.in_progress.empty()) {
+ if (all_sub_reads_done) {
// If we don't have enough copies, try other pg_shard_ts if available.
// During recovery there may be multiple osds with copies of the same shard,
// so getting EIO from one may result in multiple passes through this code path.
- if (!rop.do_redundant_reads) {
- rop.debug_log.emplace_back(ECUtil::REQUEST_MISSING, op.from);
- int r = read_pipeline.send_all_remaining_reads(oid, rop);
- if (r == 0) {
- // We found that new reads are required to do a decode.
- need_resend = true;
- continue;
- } else if (r > 0) {
- // No new reads were requested. This means that some parity
- // shards can be assumed to be zeros.
- err = 0;
- }
- // else insufficient shards are available, keep the errors.
+
+ rop.debug_log.emplace_back(ECUtil::REQUEST_MISSING, op.from);
+ int r = read_pipeline.send_all_remaining_reads(oid, rop);
+ if (r == 0 && !rop.do_redundant_reads) {
+ // We found that new reads are required to do a decode.
+ need_resend = true;
+ continue;
}
+ // else insufficient shards are available, keep the errors.
+
// Couldn't read any additional shards so handle as completed with errors
// We don't want to confuse clients / RBD with objectstore error
// values in particular ENOENT. We may have different error returns
dout(20) << __func__ << " Complete: " << rop << dendl;
rop.trace.event("ec read complete");
rop.debug_log.emplace_back(ECUtil::COMPLETE, op.from);
+
+ /* If do_redundant_reads is set then there might be some in progress
+ * reads remaining. We need to make sure that these non-read shards
+ * do not get padded. If there was no in progress read, then the zero
+ * padding is allowed to stay.
+ */
+ for (auto pg_shard : rop.in_progress) {
+ for (auto &&[oid, read] : rop.to_read) {
+ read.zeros_for_decode.erase(pg_shard.shard);
+ }
+ }
read_pipeline.complete_read_op(std::move(rop));
} else {
dout(10) << __func__ << " readop not complete: " << rop << dendl;
ECUtil::shard_extent_map_t &&buffers_read,
std::optional<std::map<std::string, ceph::buffer::list, std::less<>>>
attrs,
- const ECUtil::shard_extent_set_t &want_to_read,
+ read_request_t &req,
RecoveryMessages *m);
void handle_recovery_push(
const PushOp &op,
* redundant reads is set, then we want to have the same reads on
* every extent. Otherwise, we need to read every shard only if the
* necessary shard is missing.
+ * In some (recovery) scenarios, we can "want" a shard, but not "need" to
+ * read it. This typically happens when we do not have a data shard and the
+ * recovery for this will read enough shards to also generate all the parity.
+ *
+ * Since parity shards are often larger than data shards, we must make sure
+ * to read the extra bit!
*/
- if (!have.contains(shard) || do_redundant_reads) {
+ if (!have.contains(shard) || do_redundant_reads ||
+ (want.contains(shard) && !need_set.contains(shard))) {
extra_extents.union_of(extent_set);
}
}
+ read_request.zeros_for_decode.clear();
for (auto &shard: need_set) {
if (!have.contains(shard)) {
continue;
shard_read.extents.intersection_of(extents, read_mask.at(shard));
}
+ if (zero_mask.contains(shard)) {
+ extents.intersection_of(zero_mask.at(shard));
+
+ /* Any remaining extents can be assumed ot be zeros... so record these. */
+ if (!extents.empty()) {
+ read_request.zeros_for_decode.emplace(shard, std::move(extents));
+ }
+ }
+
if (!shard_read.extents.empty()) {
read_request.shard_reads[shard_id] = std::move(shard_read);
}
}
}
- return read_request.shard_reads.empty()?1:0;
+ return 0;
}
void ECCommon::ReadPipeline::start_read_op(
<< res.buffers_read.debug_string(2048, 0)
<< dendl;
/* Decode any missing buffers */
+ res.buffers_read.add_zero_padding_for_decode(req.zeros_for_decode);
int r = res.buffers_read.decode(read_pipeline.ec_impl,
req.shard_want_to_read,
req.object_size,
const std::list<ec_align_t> to_read;
const uint32_t flags = 0;
const ECUtil::shard_extent_set_t shard_want_to_read;
+ ECUtil::shard_extent_set_t zeros_for_decode;
shard_id_map<shard_read_t> shard_reads;
bool want_attrs = false;
uint64_t object_size;
to_read(to_read),
flags(to_read.front().flags),
shard_want_to_read(shard_want_to_read),
+ zeros_for_decode(shard_want_to_read.get_max_shards()),
shard_reads(shard_want_to_read.get_max_shards()),
want_attrs(want_attrs),
object_size(object_size) {}
read_request_t(const ECUtil::shard_extent_set_t &shard_want_to_read,
bool want_attrs, uint64_t object_size) :
shard_want_to_read(shard_want_to_read),
+ zeros_for_decode(shard_want_to_read.get_max_shards()),
shard_reads(shard_want_to_read.get_max_shards()),
want_attrs(want_attrs),
object_size(object_size) {}
os << "read_request_t(to_read=[" << to_read << "]"
<< ", flags=" << flags
<< ", shard_want_to_read=" << shard_want_to_read
+ << ", zeros_for_decode=" << zeros_for_decode
<< ", shard_reads=" << shard_reads
<< ", want_attrs=" << want_attrs
<< ")";
if (!pad_to.contains(shard)) {
continue;
}
- for (auto &[off, length] : pad_to.at(shard)) {
- bufferlist bl;
- bl.push_back(buffer::create_aligned(length, EC_ALIGN_SIZE));
- insert_in_shard(shard, off, bl);
- }
+ pad_on_shard(pad_to.at(shard), shard);
}
}
if (pad_to.size() == 0) {
return;
}
+
for (auto &[off, length] : pad_to) {
bufferlist bl;
- bl.push_back(buffer::create_aligned(length, EC_ALIGN_SIZE));
- insert_in_shard(shard, off, bl);
+ uint64_t start = align_prev(off);
+ uint64_t end = align_next(off + length);
+
+ bl.push_back(buffer::create_aligned(end - start, EC_ALIGN_SIZE));
+ insert_in_shard(shard, start, bl);
}
}
return;
}
for (auto &shard : shards) {
- for (auto &[off, length] : pad_to) {
- bufferlist bl;
- bl.push_back(buffer::create_aligned(length, EC_ALIGN_SIZE));
- insert_in_shard(shard, off, bl);
- }
+ pad_on_shard(pad_to, shard);
}
}
return 0;
}
- if (add_zero_padding_for_decode(object_size, need_set)) {
- // We added some zero buffers, which means our have and need set may change
- extent_maps.populate_bitset_set(have_set);
- need_set = shard_id_set::difference(want_set, have_set);
- }
-
shard_id_set decode_set = shard_id_set::intersection(need_set, sinfo->get_data_shards());
shard_id_set encode_set = shard_id_set::intersection(need_set, sinfo->get_parity_shards());
- shard_extent_set_t read_mask(sinfo->get_k_plus_m());
- sinfo->ro_size_to_read_mask(object_size, read_mask);
+
+ if (!encode_set.empty()) {
+ shard_extent_set_t read_mask(sinfo->get_k_plus_m());
+ sinfo->ro_size_to_read_mask(object_size, read_mask);
+
+ /* The function has been asked to "decode" parity. To achieve this, we
+ * need all the data shards to be present... So first see if there are
+ * any missing...
+ */
+ shard_id_set decode_for_parity_shards = shard_id_set::difference(sinfo->get_data_shards(), have_set);
+ decode_for_parity_shards = shard_id_set::intersection(decode_for_parity_shards, read_mask.get_shard_id_set());
+
+ if (!decode_for_parity_shards.empty()) {
+ /* So there are missing data shards which need decoding before we encode,
+ * We need to add these to the decode set and insert buffers for the
+ * decode to happen.
+ */
+ decode_set.insert(decode_for_parity_shards);
+ need_set.insert(decode_for_parity_shards);
+ extent_set decode_for_parity = get_extent_superset();
+
+ for (auto shard : decode_for_parity_shards) {
+ extent_set parity_pad;
+ parity_pad.intersection_of(decode_for_parity, read_mask.at(shard));
+ pad_on_shard(decode_for_parity, shard);
+ }
+ }
+ }
+
int r = 0;
if (!decode_set.empty()) {
pad_on_shards(want, decode_set);
- /* If we are going to be encoding, we need to make sure all the necessary
- * shards are decoded. The get_min_available functions should have already
- * worked out what needs to be read for this.
- */
- for (auto shard : encode_set) {
- extent_set decode_for_parity;
- decode_for_parity.intersection_of(want.at(shard), read_mask.at(shard));
- pad_on_shard(decode_for_parity, shard);
- }
r = _decode(ec_impl, want_set, decode_set, dpp);
}
if (!r && !encode_set.empty()) {
- pad_on_shards(want, encode_set);
+ pad_on_shards(get_extent_superset(), sinfo->get_parity_shards());
r = encode(ec_impl, dpp, dedup_zeros?&need_set:nullptr);
}
const shard_id_set &need_set,
DoutPrefixProvider *dpp) {
bool rebuild_req = false;
+
for (auto iter = begin_slice_iterator(need_set, dpp); !iter.is_end(); ++iter) {
if (!iter.is_page_aligned()) {
rebuild_req = true;
break;
}
+
shard_id_map<bufferptr> &in = iter.get_in_bufferptrs();
shard_id_map<bufferptr> &out = iter.get_out_bufferptrs();
+ if (out.empty()) {
+ continue;
+ }
+
if (int ret = ec_impl->decode_chunks(want_set, in, out)) {
return ret;
}
}
}
- bool add_zero_padding_for_decode(uint64_t object_size, shard_id_set &exclude_set) {
- shard_extent_set_t zeros(sinfo->get_k_plus_m());
- sinfo->ro_size_to_zero_mask(object_size, zeros);
- extent_set superset = get_extent_superset();
+ void add_zero_padding_for_decode(ECUtil::shard_extent_set_t &zeros) {
bool changed = false;
for (auto &&[shard, z] : zeros) {
- if (exclude_set.contains(shard)) {
- continue;
- }
- z.intersection_of(superset);
for (auto [off, len] : z) {
changed = true;
bufferlist bl;
if (changed) {
compute_ro_range();
}
-
- return changed;
}
template <typename IntervalSetT> requires is_interval_set_v<IntervalSetT>
}
}
+ semap.add_zero_padding_for_decode(read_request.zeros_for_decode);
ASSERT_EQ(0, semap.decode(ec_impl, want, object_size, nullptr, true));
}
acting_set.insert_range(shard_id_t(0), 3);
test_decode(k, m, chunk_size, object_size, want, acting_set);
-}
\ No newline at end of file
+}