]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
fixup: crimson/osd: implement ECBackend::handle_rep_read_reply()
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 21 May 2025 13:40:38 +0000 (13:40 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 27 Jan 2026 19:42:59 +0000 (19:42 +0000)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ec_backend.cc

index 144c481fb73774501095625cbbad44e05c92d64b..5262b7ed8177409a93f5b9915561bdd8db1fffc1 100644 (file)
@@ -643,7 +643,7 @@ ECBackend::handle_rep_read_reply(ECSubReadReply& mop)
   auto& rop = read_pipeline.tid_to_read_map.at(mop.tid);
 
   // 1. data
-  for (auto& [obj, buffers] : mop.buffers_read) {
+  for (auto& [obj, offset_buffer_map] : mop.buffers_read) {
     // if attribute error we better not have sent a buffer
     assert(!mop.errors.contains(obj));
     if (!rop.to_read.contains(obj)) {
@@ -651,40 +651,60 @@ ECBackend::handle_rep_read_reply(ECSubReadReply& mop)
       logger().debug("{}: to_read skipping", __func__);
       continue;
     }
-    // would be cool to have the C++23's view::zip
-    auto req_iter = std::begin(rop.to_read.find(obj)->second.to_read);
-    auto ret_iter = std::begin(rop.complete[obj].returned);
-    for (auto& [len, buffer] : buffers) {
-      // bolierplate
-      assert(req_iter != std::end(rop.to_read.find(obj)->second.to_read));
-      assert(ret_iter != std::end(rop.complete[obj].returned));
-
-      const auto adjusted =
-       sinfo.chunk_aligned_offset_len_to_chunk(
-         std::make_pair(req_iter->offset, req_iter->size));
-      assert(adjusted.first == len);
-
-      ret_iter->get<2>()[from] = std::move(buffer);
-      // bolierplate
-      ++req_iter;
-      ++ret_iter;
+    if (!rop.complete.contains(obj)) {
+      rop.complete.emplace(obj, &sinfo);
+    }
+    auto &buffers_read = rop.complete.at(obj).buffers_read;
+    for (auto &&[offset, buffer_list]: offset_buffer_map) {
+      buffers_read.insert_in_shard(from.shard, offset, buffer_list);
+    }
+  }
+  for (auto &&[hoid, req]: rop.to_read) {
+    if (!rop.complete.contains(hoid)) {
+      rop.complete.emplace(hoid, &sinfo);
+    }
+    auto &complete = rop.complete.at(hoid);
+    for (auto &&[shard, read]: std::as_const(req.shard_reads)) {
+      if (complete.errors.contains(read.pg_shard)) continue;
+
+      complete.processed_read_requests[shard].union_of(read.extents);
+
+      if (!rop.complete.contains(hoid) ||
+        !complete.buffers_read.contains(shard)) {
+        if (!read.extents.empty()) continue; // Complete the actual read first.
+
+        // If we are first here, populate the completion.
+        if (!rop.complete.contains(hoid)) {
+          rop.complete.emplace(hoid, read_result_t(&sinfo));
+        }
+      }
     }
   }
   // 2. attrs
-  for (auto& [obj, attrs] : mop.attrs_read) {
-    assert(!mop.errors.contains(obj)); // if read error better not have sent an attribute
-    if (!rop.to_read.contains(obj)) {
+  for (auto &&[hoid, attr]: mop.attrs_read) {
+    assert(!mop.errors.count(hoid));
+    // if read error better not have sent an attribute
+    if (!rop.to_read.count(hoid)) {
       // we canceled this read! @see filter_read_op
       logger().debug("{}: to_read skipping", __func__);
       continue;
     }
-    rop.complete[obj].attrs.emplace();
-    (*(rop.complete[obj].attrs)).swap(attrs);
+    if (!rop.complete.contains(hoid)) {
+      rop.complete.emplace(hoid, &sinfo);
+    }
+    rop.complete.at(hoid).attrs.emplace();
+    (*(rop.complete.at(hoid).attrs)).swap(attr);
   }
   // 3. errors
-  for (auto& [obj, errcode] : mop.errors) {
-    rop.complete[obj].errors.emplace(from, errcode);
-    logger().debug("{} shard={} error={}", __func__, from, errcode);
+  for (auto &&[hoid, err]: mop.errors) {
+    if (!rop.complete.contains(hoid)) {
+      rop.complete.emplace(hoid, &sinfo);
+    }
+    auto &complete = rop.complete.at(hoid);
+    complete.errors.emplace(from, err);
+    complete.buffers_read.erase_shard(from.shard);
+    complete.processed_read_requests.erase(from.shard);
+    logger().debug("{} shard={} error={}", __func__, from, err);
   }
   {
     auto it = read_pipeline.shard_to_read_map.find(from);
@@ -701,57 +721,65 @@ ECBackend::handle_rep_read_reply(ECSubReadReply& mop)
   unsigned is_complete = 0;
   bool need_resend = false;
   if (rop.do_redundant_reads || rop.in_progress.empty()) {
-    for (const auto& [obj, read_result] : rop.complete) {
-      std::set<int> have;
-      for (const auto& [shard, bl] : read_result.returned.front().get<2>()) {
-        have.emplace(shard.shard);
-       logger().debug("{} have shard={}", __func__, shard);
-      }
-      std::map<int, std::vector<std::pair<int, int>>> dummy_minimum;
-      int err;
-      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[obj], have, &dummy_minimum)) < 0) {
+    for (auto &&[oid, read_result]: rop.complete) {
+      shard_id_set have;
+      read_result.processed_read_requests.populate_shard_id_set(have);
+      shard_id_set dummy_minimum;
+      shard_id_set want_to_read;
+      rop.to_read.at(oid).shard_want_to_read.
+          populate_shard_id_set(want_to_read);
+
+      int err = ec_impl->minimum_to_decode(want_to_read, have, dummy_minimum,
+                                            nullptr);
+      if (err) {
        logger().debug("{} minimum_to_decode failed {}", __func__, err);
         if (rop.in_progress.empty()) {
-         // If we don't have enough copies, try other pg_shard_ts if available.
-         // During recovery there may be multiple osds with copies of the same shard,
-         // so getting EIO from one may result in multiple passes through this code path.
-         if (!rop.do_redundant_reads) {
-           // TODO:
-           int r = 0;// read_pipeline.send_all_remaining_reads(obj, rop);
-           if (r == 0) {
-             // We changed the rop's to_read and not incrementing is_complete
-             need_resend = true;
-             continue;
-           }
-           // Couldn't read any additional shards so handle as completed with errors
-         }
-         // We don't want to confuse clients / RBD with objectstore error
-         // values in particular ENOENT.  We may have different error returns
-         // from different shards, so we'll return minimum_to_decode() error
-         // (usually EIO) to reader.  It is likely an error here is due to a
-         // damaged pg.
-         rop.complete[obj].r = err;
-         ++is_complete;
-       }
-      } else {
-        assert(rop.complete[obj].r == 0);
-       if (!rop.complete[obj].errors.empty()) {
+          // If we don't have enough copies, try other pg_shard_ts if available.
+          // During recovery there may be multiple osds with copies of the same shard,
+          // so getting EIO from one may result in multiple passes through this code path.
+          if (!rop.do_redundant_reads) {
+            int r = read_pipeline.send_all_remaining_reads(oid, rop);
+            if (r == 0) {
+              // We found that new reads are required to do a decode.
+              need_resend = true;
+              continue;
+            } else if (r >  0) {
+              // No new reads were requested. This means that some parity
+              // shards can be assumed to be zeros.
+              err = 0;
+            }
+            // else insufficient shards are available, keep the errors.
+          }
+          // Couldn't read any additional shards so handle as completed with errors
+          // We don't want to confuse clients / RBD with objectstore error
+          // values in particular ENOENT.  We may have different error returns
+          // from different shards, so we'll return minimum_to_decode() error
+          // (usually EIO) to reader.  It is likely an error here is due to a
+          // damaged pg.
+          rop.complete.at(oid).r = err;
+          ++is_complete;
+        }
+      }
+
+      if (!err) {
+        ceph_assert(rop.complete.at(oid).r == 0);
+        if (!rop.complete.at(oid).errors.empty()) {
          using crimson::common::local_conf;
          if (local_conf()->osd_read_ec_check_for_errors) {
            logger().info("{}: not ignoring errors, use one shard err={}",
                          __func__, err);
-           err = rop.complete[obj].errors.begin()->second;
-            rop.complete[obj].r = err;
+            err = rop.complete.at(oid).errors.begin()->second;
+            rop.complete.at(oid).r = err;
          } else {
            logger().info("{}: error(s) ignored for {} enough copies available",
-                         __func__, obj);
-           rop.complete[obj].errors.clear();
+                         __func__, oid);
+            rop.complete.at(oid).errors.clear();
          }
-       }
-       // avoid re-read for completed object as we may send remaining reads for uncopmpleted objects
-       rop.to_read.at(obj).need.clear();
-       rop.to_read.at(obj).want_attrs = false;
-       ++is_complete;
+        }
+        // avoid re-read for completed object as we may send remaining reads for uncopmpleted objects
+        rop.to_read.at(oid).shard_reads.clear();
+        rop.to_read.at(oid).want_attrs = false;
+        ++is_complete;
       }
     }
   }
@@ -760,7 +788,7 @@ ECBackend::handle_rep_read_reply(ECSubReadReply& mop)
   } else if (rop.in_progress.empty() ||
              is_complete == rop.complete.size()) {
     logger().debug("{}: complete {}", __func__, rop);
-    read_pipeline.complete_read_op(rop);
+    read_pipeline.complete_read_op(std::move(rop));
   } else {
     logger().info("{}: readop not completed yet: {}", __func__, rop);
   }