From: Radoslaw Zarzynski Date: Wed, 21 May 2025 13:40:38 +0000 (+0000) Subject: fixup: crimson/osd: implement ECBackend::handle_rep_read_reply() X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=13015a273488643d3205cba8a2bf59ed09dbbc55;p=ceph-ci.git fixup: crimson/osd: implement ECBackend::handle_rep_read_reply() Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 144c481fb73..5262b7ed817 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -643,7 +643,7 @@ ECBackend::handle_rep_read_reply(ECSubReadReply& mop) auto& rop = read_pipeline.tid_to_read_map.at(mop.tid); // 1. data - for (auto& [obj, buffers] : mop.buffers_read) { + for (auto& [obj, offset_buffer_map] : mop.buffers_read) { // if attribute error we better not have sent a buffer assert(!mop.errors.contains(obj)); if (!rop.to_read.contains(obj)) { @@ -651,40 +651,60 @@ ECBackend::handle_rep_read_reply(ECSubReadReply& mop) logger().debug("{}: to_read skipping", __func__); continue; } - // would be cool to have the C++23's view::zip - auto req_iter = std::begin(rop.to_read.find(obj)->second.to_read); - auto ret_iter = std::begin(rop.complete[obj].returned); - for (auto& [len, buffer] : buffers) { - // bolierplate - assert(req_iter != std::end(rop.to_read.find(obj)->second.to_read)); - assert(ret_iter != std::end(rop.complete[obj].returned)); - - const auto adjusted = - sinfo.chunk_aligned_offset_len_to_chunk( - std::make_pair(req_iter->offset, req_iter->size)); - assert(adjusted.first == len); - - ret_iter->get<2>()[from] = std::move(buffer); - // bolierplate - ++req_iter; - ++ret_iter; + if (!rop.complete.contains(obj)) { + rop.complete.emplace(obj, &sinfo); + } + auto &buffers_read = rop.complete.at(obj).buffers_read; + for (auto &&[offset, buffer_list]: offset_buffer_map) { + buffers_read.insert_in_shard(from.shard, offset, buffer_list); + } + } + for (auto &&[hoid, req]: 
rop.to_read) { + if (!rop.complete.contains(hoid)) { + rop.complete.emplace(hoid, &sinfo); + } + auto &complete = rop.complete.at(hoid); + for (auto &&[shard, read]: std::as_const(req.shard_reads)) { + if (complete.errors.contains(read.pg_shard)) continue; + + complete.processed_read_requests[shard].union_of(read.extents); + + if (!rop.complete.contains(hoid) || + !complete.buffers_read.contains(shard)) { + if (!read.extents.empty()) continue; // Complete the actual read first. + + // If we are first here, populate the completion. + if (!rop.complete.contains(hoid)) { + rop.complete.emplace(hoid, read_result_t(&sinfo)); + } + } } } // 2. attrs - for (auto& [obj, attrs] : mop.attrs_read) { - assert(!mop.errors.contains(obj)); // if read error better not have sent an attribute - if (!rop.to_read.contains(obj)) { + for (auto &&[hoid, attr]: mop.attrs_read) { + assert(!mop.errors.count(hoid)); + // if read error better not have sent an attribute + if (!rop.to_read.count(hoid)) { // we canceled this read! @see filter_read_op logger().debug("{}: to_read skipping", __func__); continue; } - rop.complete[obj].attrs.emplace(); - (*(rop.complete[obj].attrs)).swap(attrs); + if (!rop.complete.contains(hoid)) { + rop.complete.emplace(hoid, &sinfo); + } + rop.complete.at(hoid).attrs.emplace(); + (*(rop.complete.at(hoid).attrs)).swap(attr); } // 3. 
errors - for (auto& [obj, errcode] : mop.errors) { - rop.complete[obj].errors.emplace(from, errcode); - logger().debug("{} shard={} error={}", __func__, from, errcode); + for (auto &&[hoid, err]: mop.errors) { + if (!rop.complete.contains(hoid)) { + rop.complete.emplace(hoid, &sinfo); + } + auto &complete = rop.complete.at(hoid); + complete.errors.emplace(from, err); + complete.buffers_read.erase_shard(from.shard); + complete.processed_read_requests.erase(from.shard); + logger().debug("{} shard={} error={}", __func__, from, err); } { auto it = read_pipeline.shard_to_read_map.find(from); @@ -701,57 +721,65 @@ ECBackend::handle_rep_read_reply(ECSubReadReply& mop) unsigned is_complete = 0; bool need_resend = false; if (rop.do_redundant_reads || rop.in_progress.empty()) { - for (const auto& [obj, read_result] : rop.complete) { - std::set have; - for (const auto& [shard, bl] : read_result.returned.front().get<2>()) { - have.emplace(shard.shard); - logger().debug("{} have shard={}", __func__, shard); - } - std::map>> dummy_minimum; - int err; - if ((err = ec_impl->minimum_to_decode(rop.want_to_read[obj], have, &dummy_minimum)) < 0) { + for (auto &&[oid, read_result]: rop.complete) { + shard_id_set have; + read_result.processed_read_requests.populate_shard_id_set(have); + shard_id_set dummy_minimum; + shard_id_set want_to_read; + rop.to_read.at(oid).shard_want_to_read. + populate_shard_id_set(want_to_read); + + int err = ec_impl->minimum_to_decode(want_to_read, have, dummy_minimum, + nullptr); + if (err) { logger().debug("{} minimum_to_decode failed {}", __func__, err); if (rop.in_progress.empty()) { - // If we don't have enough copies, try other pg_shard_ts if available. - // During recovery there may be multiple osds with copies of the same shard, - // so getting EIO from one may result in multiple passes through this code path. 
- if (!rop.do_redundant_reads) { - // TODO: - int r = 0;// read_pipeline.send_all_remaining_reads(obj, rop); - if (r == 0) { - // We changed the rop's to_read and not incrementing is_complete - need_resend = true; - continue; - } - // Couldn't read any additional shards so handle as completed with errors - } - // We don't want to confuse clients / RBD with objectstore error - // values in particular ENOENT. We may have different error returns - // from different shards, so we'll return minimum_to_decode() error - // (usually EIO) to reader. It is likely an error here is due to a - // damaged pg. - rop.complete[obj].r = err; - ++is_complete; - } - } else { - assert(rop.complete[obj].r == 0); - if (!rop.complete[obj].errors.empty()) { + // If we don't have enough copies, try other pg_shard_ts if available. + // During recovery there may be multiple osds with copies of the same shard, + // so getting EIO from one may result in multiple passes through this code path. + if (!rop.do_redundant_reads) { + int r = read_pipeline.send_all_remaining_reads(oid, rop); + if (r == 0) { + // We found that new reads are required to do a decode. + need_resend = true; + continue; + } else if (r > 0) { + // No new reads were requested. This means that some parity + // shards can be assumed to be zeros. + err = 0; + } + // else insufficient shards are available, keep the errors. + } + // Couldn't read any additional shards so handle as completed with errors + // We don't want to confuse clients / RBD with objectstore error + // values in particular ENOENT. We may have different error returns + // from different shards, so we'll return minimum_to_decode() error + // (usually EIO) to reader. It is likely an error here is due to a + // damaged pg. 
+        rop.complete.at(oid).r = err;
+        ++is_complete;
+      }
+    }
+
+    if (!err) {
+        ceph_assert(rop.complete.at(oid).r == 0);
+        if (!rop.complete.at(oid).errors.empty()) {
           using crimson::common::local_conf;
           if (local_conf()->osd_read_ec_check_for_errors) {
             logger().info("{}: not ignoring errors, use one shard err={}",
                           __func__, err);
-            err = rop.complete[obj].errors.begin()->second;
-            rop.complete[obj].r = err;
+            err = rop.complete.at(oid).errors.begin()->second;
+            rop.complete.at(oid).r = err;
           } else {
             logger().info("{}: error(s) ignored for {} enough copies available",
-                          __func__, obj);
-            rop.complete[obj].errors.clear();
+                          __func__, oid);
+            rop.complete.at(oid).errors.clear();
           }
-        }
-        // avoid re-read for completed object as we may send remaining reads for uncopmpleted objects
-        rop.to_read.at(obj).need.clear();
-        rop.to_read.at(obj).want_attrs = false;
-        ++is_complete;
+      }
+      // avoid re-read for completed object as we may send remaining reads for uncompleted objects
+      rop.to_read.at(oid).shard_reads.clear();
+      rop.to_read.at(oid).want_attrs = false;
+      ++is_complete;
     }
   }
 }
@@ -760,7 +788,7 @@
   } else if (rop.in_progress.empty() ||
              is_complete == rop.complete.size()) {
     logger().debug("{}: complete {}", __func__, rop);
-    read_pipeline.complete_read_op(rop);
+    read_pipeline.complete_read_op(std::move(rop));
   } else {
     logger().info("{}: readop not completed yet: {}", __func__, rop);
   }