git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: add support for reads over EC pool
author Radoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 2 May 2024 13:29:52 +0000 (13:29 +0000)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 27 Jan 2026 14:47:44 +0000 (14:47 +0000)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/pg.cc

index d57ebfe3372779bcdbeccabf93f6143e2e287adf..4696deaf6f4b401ac2600a3e9adc7fe086d760db 100644 (file)
@@ -4,6 +4,7 @@
 #include "crimson/osd/pg.h"
 #include "crimson/osd/shard_services.h"
 #include "ec_backend.h"
+#include "include/Context.h"
 
 #include "osd/PGTransaction.h"
 #include "osd/ECTransaction.h"
@@ -47,19 +48,57 @@ ECBackend::ECBackend(pg_shard_t whoami,
     sinfo(ec_impl, stripe_width),
     fast_read{fast_read},
     allows_ecoverwrites{allows_ecoverwrites},
-    read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener},
+    read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this},
     rmw_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this}
 {
 }
 
 ECBackend::ll_read_ierrorator::future<ceph::bufferlist>
 ECBackend::_read(const hobject_t& hoid,
+                 const uint64_t object_size,
                  const uint64_t off,
                  const uint64_t len,
                  const uint32_t flags)
 {
-  // todo
-  return seastar::make_ready_future<bufferlist>();
+  LOG_PREFIX(ECBackend::_read);
+  const auto [aligned_off, aligned_len] =
+    sinfo.ro_offset_len_to_stripe_ro_offset_len(off, len);
+  std::map<hobject_t, std::list<ec_align_t>> reads;
+  reads[hoid].emplace_back(
+    ec_align_t{aligned_off, aligned_len, flags});
+  seastar::promise<ceph::bufferlist> promise;
+  auto ret = promise.get_future();
+  objects_read_and_reconstruct(
+    reads,
+    fast_read,
+    object_size,
+    make_gen_lambda_context<ec_extents_t &&>(
+      [hoid, off, len, promise=std::move(promise), FNAME](auto&& results) mutable {
+        ceph_assert(results.size() == 1);
+        ceph_assert(results.count(hoid) == 1);
+        auto& got = results.at(hoid);
+        if (got.err < 0) {
+          ceph_abort_msg("implement error handling");
+          return;
+        }
+       auto range = got.emap.get_containing_range(off, len);
+       ceph_assert(range.first != range.second);
+       ceph_assert(range.first.get_off() <= off);
+        DEBUG("offset: {}", off);
+        DEBUG("range offset: {}", range.first.get_off());
+        DEBUG("length: {}", len);
+        DEBUG("range length: {}", range.first.get_len());
+       ceph_assert(
+         (off + len) <=
+         (range.first.get_off() + range.first.get_len()));
+        ceph::bufferlist clients_bl;
+       clients_bl.substr_of(
+         range.first.get_val(),
+         off - range.first.get_off(),
+         len);
+        promise.set_value(std::move(clients_bl));
+      }));
+  return ret;
 }
 
 struct ECCrimsonOp : ECCommon::RMWPipeline::Op {
@@ -358,6 +397,18 @@ ECBackend::handle_sub_write(
   });
 }
 
+void ECBackend::handle_sub_read_n_reply(
+  pg_shard_t from,
+  ECSubRead &op,
+  const ZTracer::Trace &)
+{
+  std::ignore = seastar::do_with(std::move(op), [this](auto&& op) {
+    return handle_rep_read_op(op).si_then([this](auto&& reply) {
+      return this->handle_rep_read_reply(reply);
+    });
+  });
+}
+
 void ECBackend::handle_sub_write(
   pg_shard_t from,
   OpRequestRef msg,
@@ -499,19 +550,27 @@ void ECBackend::objects_read_and_reconstruct(
     reads, fast_read, std::move(func));
 }
 
-ECBackend::ll_read_ierrorator::future<>
+ECBackend::ll_read_ierrorator::future<ECSubReadReply>
 ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
+{
+  return handle_rep_read_op(m->op).finally([m=std::move(m)] {});
+}
+
+ECBackend::ll_read_ierrorator::future<ECSubReadReply>
+ECBackend::handle_rep_read_op(ECSubRead& op)
 {
   LOG_PREFIX(ECBackend::handle_rep_read_op);
   return seastar::do_with(ECSubReadReply{},
-                         [m=std::move(m), FNAME, this] (auto&& reply) {
-    const ECSubRead &op = m->op;
+                         [&op, FNAME, this] (auto&& reply) {
     reply.from = whoami;
     reply.tid = op.tid;
     using read_ertr = crimson::os::FuturizedStore::Shard::read_errorator;
+    DEBUG("op_list {}", op.to_read);
     return interruptor::do_for_each(op.to_read, [FNAME, &op, &reply, this] (auto read_item) {
       const auto& [obj, op_list] = read_item;
-      return interruptor::do_for_each(op_list, [FNAME, &op, &reply, obj, this] (auto op_spec) {
+      // `obj=obj` is a workaround for a Clang bug:
+      // https://www.reddit.com/r/LLVM/comments/s0ykcj/why_does_clang_fail_with_error_reference_to_local/?rdt=36162
+      return interruptor::do_for_each(op_list, [FNAME, &op, &reply, obj=obj, this] (auto op_spec) {
         const auto& [off, size, flags] = op_spec;
         return maybe_chunked_read(
           obj, op, off, size, flags
@@ -553,6 +612,8 @@ ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
           return read_ertr::now();
        }));
       });
+    }).si_then([&reply] {
+      return read_ertr::make_ready_future<ECSubReadReply>(std::move(reply));
     });
   });
 }
@@ -560,8 +621,13 @@ ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
 ECBackend::ll_read_ierrorator::future<>
 ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply> m)
 {
-  const auto& from = m->op.from;
-  auto& mop = m->op;
+  return handle_rep_read_reply(m->op).finally([m=std::move(m)] {});
+}
+
+ECBackend::ll_read_ierrorator::future<>
+ECBackend::handle_rep_read_reply(ECSubReadReply& mop)
+{
+  const auto& from = mop.from;
   logger().debug("{}: reply {} from {}", __func__, mop, from);
   if (!read_pipeline.tid_to_read_map.contains(mop.tid)) {
     //canceled
index c228e01456d8ea20964aacb98b38882b5c3a7000..bbf2fff70c0102e5b01b749e4e66db40812762a2 100644 (file)
@@ -46,14 +46,20 @@ public:
     Ref<MOSDECSubOpWrite>,
     crimson::osd::PG& pg);
   write_iertr::future<> handle_rep_write_reply(ECSubWriteReply&& op);
-  ll_read_ierrorator::future<> handle_rep_read_op(Ref<MOSDECSubOpRead>);
+  ll_read_ierrorator::future<ECSubReadReply> handle_rep_read_op(ECSubRead&);
+  ll_read_ierrorator::future<ECSubReadReply> handle_rep_read_op(Ref<MOSDECSubOpRead>);
+  ll_read_ierrorator::future<> handle_rep_read_reply(ECSubReadReply& mop);
   ll_read_ierrorator::future<> handle_rep_read_reply(Ref<MOSDECSubOpReadReply>);
 
 private:
   friend class ECRecoveryBackend;
 
   ll_read_ierrorator::future<ceph::bufferlist>
-  _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override;
+  _read(const hobject_t& hoid,
+        uint64_t object_size,
+        uint64_t off,
+        uint64_t len,
+        uint32_t flags) final;
   rep_op_fut_t
   submit_transaction(const std::set<pg_shard_t> &pg_shards,
                     crimson::osd::ObjectContextRef&& obc,
@@ -79,6 +85,11 @@ private:
     const ZTracer::Trace &trace,
     ECListener& eclistener) override;
 
+  void handle_sub_read_n_reply(
+    pg_shard_t from,
+    ECSubRead &op,
+    const ZTracer::Trace &trace) override;
+
   bool is_single_chunk(const hobject_t& obj, const ECSubRead& op);
 
   ll_read_errorator::future<ceph::bufferlist> maybe_chunked_read(
index dfb99101e032773f82a57b48468af92d157d2260..7c3c27f9510e04c2049f82d499ada877f9ad80fe 100644 (file)
@@ -1518,7 +1518,16 @@ PG::interruptible_future<> PG::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
   assert(ec_backend);
   return ec_backend->handle_rep_read_op(
     std::move(m)
-  ).handle_error_interruptible(crimson::ct_error::assert_all{});
+  ).si_then([then_lcod=peering_state.get_info().last_complete,
+             this](auto&& rep) {
+    auto reply = crimson::make_message<MOSDECSubOpReadReply>();
+    reply->pgid = spg_t(peering_state.get_info().pgid.pgid, get_primary().shard);
+    reply->map_epoch = get_osdmap_epoch();
+    reply->min_epoch = get_interval_start_epoch();
+    reply->op = std::move(rep);
+    return shard_services.send_to_osd(
+      get_primary().osd, std::move(reply), get_osdmap_epoch());
+  }).handle_error_interruptible(crimson::ct_error::assert_all{});
 }
 
 PG::interruptible_future<> PG::do_update_log_missing(