auto& rop = read_pipeline.tid_to_read_map.at(mop.tid);
// 1. data
- for (auto& [obj, buffers] : mop.buffers_read) {
+ for (auto& [obj, offset_buffer_map] : mop.buffers_read) {
// if attribute error we better not have sent a buffer
assert(!mop.errors.contains(obj));
if (!rop.to_read.contains(obj)) {
logger().debug("{}: to_read skipping", __func__);
continue;
}
- // would be cool to have the C++23's view::zip
- auto req_iter = std::begin(rop.to_read.find(obj)->second.to_read);
- auto ret_iter = std::begin(rop.complete[obj].returned);
- for (auto& [len, buffer] : buffers) {
- // bolierplate
- assert(req_iter != std::end(rop.to_read.find(obj)->second.to_read));
- assert(ret_iter != std::end(rop.complete[obj].returned));
-
- const auto adjusted =
- sinfo.chunk_aligned_offset_len_to_chunk(
- std::make_pair(req_iter->offset, req_iter->size));
- assert(adjusted.first == len);
-
- ret_iter->get<2>()[from] = std::move(buffer);
- // bolierplate
- ++req_iter;
- ++ret_iter;
+ if (!rop.complete.contains(obj)) {
+ rop.complete.emplace(obj, &sinfo);
+ }
+ auto &buffers_read = rop.complete.at(obj).buffers_read;
+ for (auto &&[offset, buffer_list]: offset_buffer_map) {
+ buffers_read.insert_in_shard(from.shard, offset, buffer_list);
+ }
+ }
+ for (auto &&[hoid, req]: rop.to_read) {
+ if (!rop.complete.contains(hoid)) {
+ rop.complete.emplace(hoid, &sinfo);
+ }
+ auto &complete = rop.complete.at(hoid);
+ for (auto &&[shard, read]: std::as_const(req.shard_reads)) {
+ if (complete.errors.contains(read.pg_shard)) continue;
+
+ complete.processed_read_requests[shard].union_of(read.extents);
+
+ if (!rop.complete.contains(hoid) ||
+ !complete.buffers_read.contains(shard)) {
+ if (!read.extents.empty()) continue; // Complete the actual read first.
+
+ // If we are first here, populate the completion.
+ if (!rop.complete.contains(hoid)) {
+ rop.complete.emplace(hoid, read_result_t(&sinfo));
+ }
+ }
}
}
// 2. attrs
- for (auto& [obj, attrs] : mop.attrs_read) {
- assert(!mop.errors.contains(obj)); // if read error better not have sent an attribute
- if (!rop.to_read.contains(obj)) {
+ for (auto &&[hoid, attr]: mop.attrs_read) {
+ assert(!mop.errors.count(hoid));
+ // if read error better not have sent an attribute
+ if (!rop.to_read.count(hoid)) {
// we canceled this read! @see filter_read_op
logger().debug("{}: to_read skipping", __func__);
continue;
}
- rop.complete[obj].attrs.emplace();
- (*(rop.complete[obj].attrs)).swap(attrs);
+ if (!rop.complete.contains(hoid)) {
+ rop.complete.emplace(hoid, &sinfo);
+ }
+ rop.complete.at(hoid).attrs.emplace();
+ (*(rop.complete.at(hoid).attrs)).swap(attr);
}
// 3. errors
- for (auto& [obj, errcode] : mop.errors) {
- rop.complete[obj].errors.emplace(from, errcode);
- logger().debug("{} shard={} error={}", __func__, from, errcode);
+ for (auto &&[hoid, err]: mop.errors) {
+ if (!rop.complete.contains(hoid)) {
+ rop.complete.emplace(hoid, &sinfo);
+ }
+ auto &complete = rop.complete.at(hoid);
+ complete.errors.emplace(from, err);
+ complete.buffers_read.erase_shard(from.shard);
+ complete.processed_read_requests.erase(from.shard);
+ logger().debug("{} shard={} error={}", __func__, from, err);
}
{
auto it = read_pipeline.shard_to_read_map.find(from);
unsigned is_complete = 0;
bool need_resend = false;
if (rop.do_redundant_reads || rop.in_progress.empty()) {
- for (const auto& [obj, read_result] : rop.complete) {
- std::set<int> have;
- for (const auto& [shard, bl] : read_result.returned.front().get<2>()) {
- have.emplace(shard.shard);
- logger().debug("{} have shard={}", __func__, shard);
- }
- std::map<int, std::vector<std::pair<int, int>>> dummy_minimum;
- int err;
- if ((err = ec_impl->minimum_to_decode(rop.want_to_read[obj], have, &dummy_minimum)) < 0) {
+ for (auto &&[oid, read_result]: rop.complete) {
+ shard_id_set have;
+ read_result.processed_read_requests.populate_shard_id_set(have);
+ shard_id_set dummy_minimum;
+ shard_id_set want_to_read;
+ rop.to_read.at(oid).shard_want_to_read.
+ populate_shard_id_set(want_to_read);
+
+ int err = ec_impl->minimum_to_decode(want_to_read, have, dummy_minimum,
+ nullptr);
+ if (err) {
logger().debug("{} minimum_to_decode failed {}", __func__, err);
if (rop.in_progress.empty()) {
- // If we don't have enough copies, try other pg_shard_ts if available.
- // During recovery there may be multiple osds with copies of the same shard,
- // so getting EIO from one may result in multiple passes through this code path.
- if (!rop.do_redundant_reads) {
- // TODO:
- int r = 0;// read_pipeline.send_all_remaining_reads(obj, rop);
- if (r == 0) {
- // We changed the rop's to_read and not incrementing is_complete
- need_resend = true;
- continue;
- }
- // Couldn't read any additional shards so handle as completed with errors
- }
- // We don't want to confuse clients / RBD with objectstore error
- // values in particular ENOENT. We may have different error returns
- // from different shards, so we'll return minimum_to_decode() error
- // (usually EIO) to reader. It is likely an error here is due to a
- // damaged pg.
- rop.complete[obj].r = err;
- ++is_complete;
- }
- } else {
- assert(rop.complete[obj].r == 0);
- if (!rop.complete[obj].errors.empty()) {
+ // If we don't have enough copies, try other pg_shard_ts if available.
+ // During recovery there may be multiple osds with copies of the same shard,
+ // so getting EIO from one may result in multiple passes through this code path.
+ if (!rop.do_redundant_reads) {
+ int r = read_pipeline.send_all_remaining_reads(oid, rop);
+ if (r == 0) {
+ // We found that new reads are required to do a decode.
+ need_resend = true;
+ continue;
+ } else if (r > 0) {
+ // No new reads were requested. This means that some parity
+ // shards can be assumed to be zeros.
+ err = 0;
+ }
+ // else insufficient shards are available, keep the errors.
+ }
+ // Couldn't read any additional shards so handle as completed with errors
+ // We don't want to confuse clients / RBD with objectstore error
+ // values in particular ENOENT. We may have different error returns
+ // from different shards, so we'll return minimum_to_decode() error
+ // (usually EIO) to reader. It is likely an error here is due to a
+ // damaged pg.
+ rop.complete.at(oid).r = err;
+ ++is_complete;
+ }
+ }
+
+ if (!err) {
+ ceph_assert(rop.complete.at(oid).r == 0);
+ if (!rop.complete.at(oid).errors.empty()) {
using crimson::common::local_conf;
if (local_conf()->osd_read_ec_check_for_errors) {
logger().info("{}: not ignoring errors, use one shard err={}",
__func__, err);
- err = rop.complete[obj].errors.begin()->second;
- rop.complete[obj].r = err;
+ err = rop.complete.at(oid).errors.begin()->second;
+ rop.complete.at(oid).r = err;
} else {
logger().info("{}: error(s) ignored for {} enough copies available",
- __func__, obj);
- rop.complete[obj].errors.clear();
+ __func__, oid);
+ rop.complete.at(oid).errors.clear();
}
- }
- // avoid re-read for completed object as we may send remaining reads for uncopmpleted objects
- rop.to_read.at(obj).need.clear();
- rop.to_read.at(obj).want_attrs = false;
- ++is_complete;
+ }
+      // avoid re-read for completed object as we may send remaining reads for uncompleted objects
+ rop.to_read.at(oid).shard_reads.clear();
+ rop.to_read.at(oid).want_attrs = false;
+ ++is_complete;
}
}
}
} else if (rop.in_progress.empty() ||
is_complete == rop.complete.size()) {
logger().debug("{}: complete {}", __func__, rop);
- read_pipeline.complete_read_op(rop);
+ read_pipeline.complete_read_op(std::move(rop));
} else {
logger().info("{}: readop not completed yet: {}", __func__, rop);
}