]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: implement ECBackend::handle_rep_read_reply()
authorRadosław Zarzyński <rzarzyns@redhat.com>
Tue, 26 Sep 2023 15:47:24 +0000 (17:47 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 27 Jan 2026 14:37:36 +0000 (14:37 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/crimson/osd/ec_backend.cc

index d07caafab3e1ea56a11accb3b9c9d1a2330b75b0..9d06eea6361283a8a9de9595b03b05f1282f7832 100644 (file)
@@ -281,8 +281,141 @@ ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
 }
 
 ECBackend::ll_read_ierrorator::future<>
-ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply>)
+ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply> m)
 {
+  const auto& from = m->op.from;
+  auto& mop = m->op;
+  //logger().debug("{}: reply {}", __func__, mop);
+  logger().debug("{}: reply {} from {}", __func__, "", from);
+  if (!read_pipeline.tid_to_read_map.contains(mop.tid)) {
+    //canceled
+    logger().debug("{}: canceled", __func__);
+    return ll_read_ierrorator::now();
+  }
+  auto& rop = read_pipeline.tid_to_read_map.at(mop.tid);
+
+  // 1. data
+  for (auto& [obj, buffers] : mop.buffers_read) {
+    // if attribute error we better not have sent a buffer
+    assert(!mop.errors.contains(obj));
+    if (!rop.to_read.contains(obj)) {
+      // We canceled this read! @see filter_read_op
+      logger().debug("{}: to_read skipping", __func__);
+      continue;
+    }
+    // would be cool to have the C++23's view::zip
+    auto req_iter = std::begin(rop.to_read.find(obj)->second.to_read);
+    auto ret_iter = std::begin(rop.complete[obj].returned);
+    for (auto& [len, buffer] : buffers) {
+      // bolierplate
+      assert(req_iter != std::end(rop.to_read.find(obj)->second.to_read));
+      assert(ret_iter != std::end(rop.complete[obj].returned));
+
+      const auto adjusted =
+       sinfo.chunk_aligned_offset_len_to_chunk(
+         std::make_pair(req_iter->offset, req_iter->size));
+      assert(adjusted.first == len);
+
+      ret_iter->get<2>()[from] = std::move(buffer);
+      // bolierplate
+      ++req_iter;
+      ++ret_iter;
+    }
+  }
+  // 2. attrs
+  for (auto& [obj, attrs] : mop.attrs_read) {
+    assert(!mop.errors.contains(obj)); // if read error better not have sent an attribute
+    if (!rop.to_read.contains(obj)) {
+      // we canceled this read! @see filter_read_op
+      logger().debug("{}: to_read skipping", __func__);
+      continue;
+    }
+    rop.complete[obj].attrs.emplace();
+    (*(rop.complete[obj].attrs)).swap(attrs);
+  }
+  // 3. errors
+  for (auto& [obj, errcode] : mop.errors) {
+    rop.complete[obj].errors.emplace(from, errcode);
+    logger().debug("{} shard={} error={}", __func__, from, errcode);
+  }
+  {
+    auto it = read_pipeline.shard_to_read_map.find(from);
+    assert(it != std::end(read_pipeline.shard_to_read_map));
+    // second is a set of all ongoing requests
+    auto& txc_registry = it->second;
+    assert(txc_registry.contains(mop.tid));
+    txc_registry.erase(mop.tid);
+  }
+  assert(rop.in_progress.contains(from));
+  rop.in_progress.erase(from);
+  // For redundant reads check for completion as each shard comes in,
+  // or in a non-recovery read check for completion once all the shards read.
+  unsigned is_complete = 0;
+  bool need_resend = false;
+  if (rop.do_redundant_reads || rop.in_progress.empty()) {
+    for (const auto& [obj, read_result] : rop.complete) {
+      std::set<int> have;
+      for (const auto& [shard, bl] : read_result.returned.front().get<2>()) {
+        have.emplace(shard.shard);
+       logger().debug("{} have shard={}", __func__, shard);
+      }
+      std::map<int, std::vector<std::pair<int, int>>> dummy_minimum;
+      int err;
+      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[obj], have, &dummy_minimum)) < 0) {
+       logger().debug("{} minimum_to_decode failed {}", __func__, err);
+        if (rop.in_progress.empty()) {
+         // If we don't have enough copies, try other pg_shard_ts if available.
+         // During recovery there may be multiple osds with copies of the same shard,
+         // so getting EIO from one may result in multiple passes through this code path.
+         if (!rop.do_redundant_reads) {
+           // TODO:
+           int r = 0;// read_pipeline.send_all_remaining_reads(obj, rop);
+           if (r == 0) {
+             // We changed the rop's to_read and not incrementing is_complete
+             need_resend = true;
+             continue;
+           }
+           // Couldn't read any additional shards so handle as completed with errors
+         }
+         // We don't want to confuse clients / RBD with objectstore error
+         // values in particular ENOENT.  We may have different error returns
+         // from different shards, so we'll return minimum_to_decode() error
+         // (usually EIO) to reader.  It is likely an error here is due to a
+         // damaged pg.
+         rop.complete[obj].r = err;
+         ++is_complete;
+       }
+      } else {
+        assert(rop.complete[obj].r == 0);
+       if (!rop.complete[obj].errors.empty()) {
+         using crimson::common::local_conf;
+         if (local_conf()->osd_read_ec_check_for_errors) {
+           logger().info("{}: not ignoring errors, use one shard err={}",
+                         __func__, err);
+           err = rop.complete[obj].errors.begin()->second;
+            rop.complete[obj].r = err;
+         } else {
+           logger().info("{}: error(s) ignored for {} enough copies available",
+                         __func__, obj);
+           rop.complete[obj].errors.clear();
+         }
+       }
+       // avoid re-read for completed object as we may send remaining reads for uncopmpleted objects
+       rop.to_read.at(obj).need.clear();
+       rop.to_read.at(obj).want_attrs = false;
+       ++is_complete;
+      }
+    }
+  }
+  if (need_resend) {
+    read_pipeline.do_read_op(rop);
+  } else if (rop.in_progress.empty() ||
+             is_complete == rop.complete.size()) {
+    //logger().debug("{}: complete {}", __func__, rop);
+    read_pipeline.complete_read_op(rop);
+  } else {
+    //logger().info("{}: readop not completed: {}", __func__, rop);
+  }
   return ll_read_ierrorator::now();
 }