From 9f5473c0281524a0aca04c4c0bf05b938716d070 Mon Sep 17 00:00:00 2001
From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?=
Date: Tue, 26 Sep 2023 17:47:24 +0200
Subject: [PATCH] crimson/osd: implement ECBackend::handle_rep_read_reply()
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Radosław Zarzyński
---
 src/crimson/osd/ec_backend.cc | 135 +++++++++++++++++++++++++++++++++-
 1 file changed, 134 insertions(+), 1 deletion(-)

diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc
index d07caafab3e..9d06eea6361 100644
--- a/src/crimson/osd/ec_backend.cc
+++ b/src/crimson/osd/ec_backend.cc
@@ -281,8 +281,141 @@ ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
 }
 
 ECBackend::ll_read_ierrorator::future<>
-ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply>)
+ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply> m)
 {
+  const auto& from = m->op.from;
+  auto& mop = m->op;
+  //logger().debug("{}: reply {}", __func__, mop);
+  logger().debug("{}: reply {} from {}", __func__, "", from);
+  if (!read_pipeline.tid_to_read_map.contains(mop.tid)) {
+    // canceled
+    logger().debug("{}: canceled", __func__);
+    return ll_read_ierrorator::now();
+  }
+  auto& rop = read_pipeline.tid_to_read_map.at(mop.tid);
+
+  // 1. data
+  for (auto& [obj, buffers] : mop.buffers_read) {
+    // if attribute error we'd better not have sent a buffer
+    assert(!mop.errors.contains(obj));
+    if (!rop.to_read.contains(obj)) {
+      // we canceled this read! @see filter_read_op
+      logger().debug("{}: to_read skipping", __func__);
+      continue;
+    }
+    // would be cool to have C++23's std::views::zip here
+    auto req_iter = std::begin(rop.to_read.find(obj)->second.to_read);
+    auto ret_iter = std::begin(rop.complete[obj].returned);
+    for (auto& [len, buffer] : buffers) {
+      // boilerplate
+      assert(req_iter != std::end(rop.to_read.find(obj)->second.to_read));
+      assert(ret_iter != std::end(rop.complete[obj].returned));
+
+      const auto adjusted =
+        sinfo.chunk_aligned_offset_len_to_chunk(
+          std::make_pair(req_iter->offset, req_iter->size));
+      assert(adjusted.first == len);
+
+      ret_iter->get<2>()[from] = std::move(buffer);
+      // boilerplate
+      ++req_iter;
+      ++ret_iter;
+    }
+  }
+  // 2. attrs
+  for (auto& [obj, attrs] : mop.attrs_read) {
+    assert(!mop.errors.contains(obj)); // if read error we'd better not have sent an attribute
+    if (!rop.to_read.contains(obj)) {
+      // we canceled this read! @see filter_read_op
+      logger().debug("{}: to_read skipping", __func__);
+      continue;
+    }
+    rop.complete[obj].attrs.emplace();
+    (*(rop.complete[obj].attrs)).swap(attrs);
+  }
+  // 3. errors
+  for (auto& [obj, errcode] : mop.errors) {
+    rop.complete[obj].errors.emplace(from, errcode);
+    logger().debug("{} shard={} error={}", __func__, from, errcode);
+  }
+  {
+    auto it = read_pipeline.shard_to_read_map.find(from);
+    assert(it != std::end(read_pipeline.shard_to_read_map));
+    // second is the set of all ongoing requests on this shard
+    auto& txc_registry = it->second;
+    assert(txc_registry.contains(mop.tid));
+    txc_registry.erase(mop.tid);
+  }
+  assert(rop.in_progress.contains(from));
+  rop.in_progress.erase(from);
+  // For redundant reads check for completion as each shard comes in,
+  // or, in a non-recovery read, check for completion once all the shards read.
+  unsigned is_complete = 0;
+  bool need_resend = false;
+  if (rop.do_redundant_reads || rop.in_progress.empty()) {
+    for (const auto& [obj, read_result] : rop.complete) {
+      std::set<int> have;
+      for (const auto& [shard, bl] : read_result.returned.front().get<2>()) {
+        have.emplace(shard.shard);
+        logger().debug("{} have shard={}", __func__, shard);
+      }
+      std::map<int, std::vector<std::pair<int, int>>> dummy_minimum;
+      int err;
+      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[obj], have, &dummy_minimum)) < 0) {
+        logger().debug("{} minimum_to_decode failed {}", __func__, err);
+        if (rop.in_progress.empty()) {
+          // If we don't have enough copies, try other pg_shard_ts if available.
+          // During recovery there may be multiple osds with copies of the same shard,
+          // so getting EIO from one may result in multiple passes through this code path.
+          if (!rop.do_redundant_reads) {
+            // TODO:
+            int r = 0; // read_pipeline.send_all_remaining_reads(obj, rop);
+            if (r == 0) {
+              // We changed the rop's to_read and are not incrementing is_complete
+              need_resend = true;
+              continue;
+            }
+            // Couldn't read any additional shards, so handle as completed with errors
+          }
+          // We don't want to confuse clients / RBD with objectstore error
+          // values, in particular ENOENT. We may have different error returns
+          // from different shards, so we'll return the minimum_to_decode()
+          // error (usually EIO) to the reader. It is likely that an error here
+          // is due to a damaged pg.
+          rop.complete[obj].r = err;
+          ++is_complete;
+        }
+      } else {
+        assert(rop.complete[obj].r == 0);
+        if (!rop.complete[obj].errors.empty()) {
+          using crimson::common::local_conf;
+          if (local_conf()->osd_read_ec_check_for_errors) {
+            logger().info("{}: not ignoring errors, use one shard err={}",
+                          __func__, err);
+            err = rop.complete[obj].errors.begin()->second;
+            rop.complete[obj].r = err;
+          } else {
+            logger().info("{}: error(s) ignored for {} enough copies available",
+                          __func__, obj);
+            rop.complete[obj].errors.clear();
+          }
+        }
+        // avoid re-read for a completed object, as we may send remaining reads
+        // for uncompleted objects
+        rop.to_read.at(obj).need.clear();
+        rop.to_read.at(obj).want_attrs = false;
+        ++is_complete;
+      }
+    }
+  }
+  if (need_resend) {
+    read_pipeline.do_read_op(rop);
+  } else if (rop.in_progress.empty() ||
+             is_complete == rop.complete.size()) {
+    //logger().debug("{}: complete {}", __func__, rop);
+    read_pipeline.complete_read_op(rop);
+  } else {
+    //logger().info("{}: readop not completed: {}", __func__, rop);
+  }
   return ll_read_ierrorator::now();
 }
-- 
2.47.3
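Note (editorial, not part of the patch): the completion logic above rests on
ec_impl->minimum_to_decode(), which answers "can the chunks we want be decoded
from the shards we already have?". Below is a minimal standalone sketch of
that idea, assuming a systematic k+m Reed-Solomon layout in which any k
distinct shards suffice to reconstruct the stripe; the real decision is
delegated to the erasure-code plugin and can be stricter (e.g. for LRC), and
can_decode() here is a hypothetical stand-in, not a Ceph API.

  #include <cassert>
  #include <set>

  // Toy stand-in for minimum_to_decode(): with k data shards and m parity
  // shards in an MDS code such as Reed-Solomon, any k distinct shards are
  // enough to decode.
  bool can_decode(unsigned k, const std::set<int>& have) {
    return have.size() >= k;
  }

  int main() {
    const unsigned k = 2;           // k=2 data shards, m=1 parity (shard id 2)
    std::set<int> have;
    have.insert(0);                 // first reply arrives: data shard 0
    assert(!can_decode(k, have));   // not decodable yet; rop stays in flight
    have.insert(2);                 // second reply: parity shard 2
    assert(can_decode(k, have));    // 2 of 3 shards present => decodable
  }

This is why the handler bumps is_complete only once minimum_to_decode()
succeeds (or has definitively failed with no shards left to try), and why
need_resend re-drives read_pipeline.do_read_op(rop) while additional shards
might still be fetched.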