}
ECBackend::ll_read_ierrorator::future<>
-ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply>)
+ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply> m)
{
+ const auto& from = m->op.from;
+ auto& mop = m->op;
+ // TODO: log the full reply once ECSubReadReply gets a formatter
+ logger().debug("{}: reply tid={} from {}", __func__, mop.tid, from);
+ if (!read_pipeline.tid_to_read_map.contains(mop.tid)) {
+ //canceled
+ logger().debug("{}: canceled", __func__);
+ return ll_read_ierrorator::now();
+ }
+ auto& rop = read_pipeline.tid_to_read_map.at(mop.tid);
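+ // rop carries the state of this in-flight read: the extents requested,
+ // the per-shard buffers collected so far and any errors reported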
+
+ // 1. data
+ for (auto& [obj, buffers] : mop.buffers_read) {
+ // if there was an attribute error, we'd better not have sent a buffer
+ assert(!mop.errors.contains(obj));
+ if (!rop.to_read.contains(obj)) {
+ // We canceled this read! @see filter_read_op
+ logger().debug("{}: to_read skipping", __func__);
+ continue;
+ }
+ // would be nice to have C++23's std::views::zip here
+ auto req_iter = std::begin(rop.to_read.find(obj)->second.to_read);
+ auto ret_iter = std::begin(rop.complete[obj].returned);
+ for (auto& [off, buffer] : buffers) {
+ // boilerplate: the request and result sequences must stay in lockstep
+ assert(req_iter != std::end(rop.to_read.find(obj)->second.to_read));
+ assert(ret_iter != std::end(rop.complete[obj].returned));
+
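+ // the requested extent was chunk-aligned before it went out to the
+ // shards; recompute that mapping so the reply can be matched against it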
+ const auto adjusted =
+ sinfo.chunk_aligned_offset_len_to_chunk(
+ std::make_pair(req_iter->offset, req_iter->size));
+ assert(adjusted.first == off);
+
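+ // stash this shard's chunk under the sender's shard id; decoding happens
+ // once enough shards have been collected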
+ ret_iter->get<2>()[from] = std::move(buffer);
+ // boilerplate: advance both cursors together
+ ++req_iter;
+ ++ret_iter;
+ }
+ }
+ // 2. attrs
+ for (auto& [obj, attrs] : mop.attrs_read) {
+ // if there was a read error, we'd better not have sent an attribute
+ assert(!mop.errors.contains(obj));
+ if (!rop.to_read.contains(obj)) {
+ // we canceled this read! @see filter_read_op
+ logger().debug("{}: to_read skipping", __func__);
+ continue;
+ }
+ rop.complete[obj].attrs.emplace();
+ rop.complete[obj].attrs->swap(attrs);
+ }
+ // 3. errors
+ for (auto& [obj, errcode] : mop.errors) {
+ rop.complete[obj].errors.emplace(from, errcode);
+ logger().debug("{} shard={} error={}", __func__, from, errcode);
+ }
+ {
+ auto it = read_pipeline.shard_to_read_map.find(from);
+ assert(it != std::end(read_pipeline.shard_to_read_map));
+ // it->second is the set of tids of all requests in flight on this shard
+ auto& txc_registry = it->second;
+ assert(txc_registry.contains(mop.tid));
+ txc_registry.erase(mop.tid);
+ }
+ assert(rop.in_progress.contains(from));
+ rop.in_progress.erase(from);
+ // For redundant reads check for completion as each shard comes in;
+ // for a regular read check for completion once all the shards have replied.
+ unsigned is_complete = 0;
+ bool need_resend = false;
+ if (rop.do_redundant_reads || rop.in_progress.empty()) {
+ for (const auto& [obj, read_result] : rop.complete) {
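+ // collect the shard ids that actually returned data for this object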
+ std::set<int> have;
+ for (const auto& [shard, bl] : read_result.returned.front().get<2>()) {
+ have.emplace(shard.shard);
+ logger().debug("{} have shard={}", __func__, shard);
+ }
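+ // ask the EC plugin whether the shards at hand are decodable; the
+ // concrete minimum set is irrelevant here, hence the "dummy" output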
+ std::map<int, std::vector<std::pair<int, int>>> dummy_minimum;
+ int err;
+ if ((err = ec_impl->minimum_to_decode(rop.want_to_read[obj], have, &dummy_minimum)) < 0) {
+ logger().debug("{} minimum_to_decode failed {}", __func__, err);
+ if (rop.in_progress.empty()) {
+ // If we don't have enough copies, try other pg_shard_ts if available.
+ // During recovery there may be multiple osds with copies of the same shard,
+ // so getting EIO from one may result in multiple passes through this code path.
+ if (!rop.do_redundant_reads) {
+ // TODO: read_pipeline.send_all_remaining_reads(obj, rop);
+ // stubbed to 0 (success) until the call above is wired up
+ int r = 0;
+ if (r == 0) {
+ // we changed the rop's to_read, so don't increment is_complete
+ need_resend = true;
+ continue;
+ }
+ // Couldn't read any additional shards so handle as completed with errors
+ }
+ // We don't want to confuse clients / RBD with objectstore error
+ // values, in particular ENOENT. We may have different error returns
+ // from different shards, so we'll return minimum_to_decode() error
+ // (usually EIO) to reader. It is likely an error here is due to a
+ // damaged pg.
+ rop.complete[obj].r = err;
+ ++is_complete;
+ }
+ } else {
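+ // we have enough shards to decode; resolve any per-shard errors
+ // according to configuration before counting the object complete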
+ assert(rop.complete[obj].r == 0);
+ if (!rop.complete[obj].errors.empty()) {
+ using crimson::common::local_conf;
+ if (local_conf()->osd_read_ec_check_for_errors) {
+ logger().info("{}: not ignoring errors, use one shard err={}",
+ __func__, err);
+ err = rop.complete[obj].errors.begin()->second;
+ rop.complete[obj].r = err;
+ } else {
+ logger().info("{}: error(s) ignored for {} enough copies available",
+ __func__, obj);
+ rop.complete[obj].errors.clear();
+ }
+ }
+ // avoid re-reading a completed object, as we may send remaining reads
+ // for uncompleted objects
+ rop.to_read.at(obj).need.clear();
+ rop.to_read.at(obj).want_attrs = false;
+ ++is_complete;
+ }
+ }
+ }
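+ // dispatch: resend the adjusted reads, finish the whole read op, or keep
+ // waiting for the remaining shards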
+ if (need_resend) {
+ read_pipeline.do_read_op(rop);
+ } else if (rop.in_progress.empty() ||
+ is_complete == rop.complete.size()) {
+ //logger().debug("{}: complete {}", __func__, rop);
+ read_pipeline.complete_read_op(rop);
+ } else {
+ //logger().info("{}: readop not completed: {}", __func__, rop);
+ }
return ll_read_ierrorator::now();
}