From: Radoslaw Zarzynski Date: Thu, 2 May 2024 13:29:52 +0000 (+0000) Subject: crimson/osd: add support for reads over EC pool X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f3df30d0f63b5c207cdad4ffd4094f13c4db4f79;p=ceph-ci.git crimson/osd: add support for reads over EC pool Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index d57ebfe3372..4696deaf6f4 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -4,6 +4,7 @@ #include "crimson/osd/pg.h" #include "crimson/osd/shard_services.h" #include "ec_backend.h" +#include "include/Context.h" #include "osd/PGTransaction.h" #include "osd/ECTransaction.h" @@ -47,19 +48,57 @@ ECBackend::ECBackend(pg_shard_t whoami, sinfo(ec_impl, stripe_width), fast_read{fast_read}, allows_ecoverwrites{allows_ecoverwrites}, - read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener}, + read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this}, rmw_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this} { } ECBackend::ll_read_ierrorator::future ECBackend::_read(const hobject_t& hoid, + const uint64_t object_size, const uint64_t off, const uint64_t len, const uint32_t flags) { - // todo - return seastar::make_ready_future(); + LOG_PREFIX(ECBackend::_read); + const auto [aligned_off, aligned_len] = + sinfo.ro_offset_len_to_stripe_ro_offset_len(off, len); + std::map> reads; + reads[hoid].emplace_back( + ec_align_t{aligned_off, aligned_len, flags}); + seastar::promise promise; + auto ret = promise.get_future(); + objects_read_and_reconstruct( + reads, + fast_read, + object_size, + make_gen_lambda_context( + [hoid, off, len, promise=std::move(promise), FNAME](auto&& results) mutable { + ceph_assert(results.size() == 1); + ceph_assert(results.count(hoid) == 1); + auto& got = results.at(hoid); + if (got.err < 0) { + ceph_abort_msg("implement error handling"); + return; + } + auto range = got.emap.get_containing_range(off, len); + ceph_assert(range.first != range.second); + ceph_assert(range.first.get_off() <= off); + DEBUG("offset: {}", off); + DEBUG("range offset: {}", range.first.get_off()); + DEBUG("length: {}", len); + DEBUG("range length: {}", range.first.get_len()); + ceph_assert( + (off + len) <= + (range.first.get_off() + range.first.get_len())); + ceph::bufferlist clients_bl; + clients_bl.substr_of( + range.first.get_val(), + off - range.first.get_off(), + len); + promise.set_value(std::move(clients_bl)); + })); + return ret; } struct ECCrimsonOp : ECCommon::RMWPipeline::Op { @@ -358,6 +397,18 @@ ECBackend::handle_sub_write( }); } +void ECBackend::handle_sub_read_n_reply( + pg_shard_t from, + ECSubRead &op, + const ZTracer::Trace &) +{ + std::ignore = seastar::do_with(std::move(op), [this](auto&& op) { + return handle_rep_read_op(op).si_then([this](auto&& reply) { + return this->handle_rep_read_reply(reply); + }); + }); +} + void ECBackend::handle_sub_write( pg_shard_t from, OpRequestRef msg, @@ -499,19 +550,27 @@ void ECBackend::objects_read_and_reconstruct( reads, fast_read, std::move(func)); } -ECBackend::ll_read_ierrorator::future<> +ECBackend::ll_read_ierrorator::future ECBackend::handle_rep_read_op(Ref m) +{ + return handle_rep_read_op(m->op).finally([m=std::move(m)] {}); +} + +ECBackend::ll_read_ierrorator::future +ECBackend::handle_rep_read_op(ECSubRead& op) { LOG_PREFIX(ECBackend::handle_rep_read_op); return seastar::do_with(ECSubReadReply{}, - [m=std::move(m), FNAME, this] (auto&& reply) { - const ECSubRead &op = m->op; + [&op, FNAME, this] (auto&& reply) { reply.from = whoami; reply.tid = op.tid; using read_ertr = crimson::os::FuturizedStore::Shard::read_errorator; + DEBUG("op_list {}", op.to_read); return interruptor::do_for_each(op.to_read, [FNAME, &op, &reply, this] (auto read_item) { const auto& [obj, op_list] = read_item; - return interruptor::do_for_each(op_list, [FNAME, &op, &reply, obj, this] (auto op_spec) { + // `obj=obj` is workaround for Clang's bug: + // https://www.reddit.com/r/LLVM/comments/s0ykcj/why_does_clang_fail_with_error_reference_to_local/?rdt=36162 + return interruptor::do_for_each(op_list, [FNAME, &op, &reply, obj=obj, this] (auto op_spec) { const auto& [off, size, flags] = op_spec; return maybe_chunked_read( obj, op, off, size, flags @@ -553,6 +612,8 @@ ECBackend::handle_rep_read_op(Ref m) return read_ertr::now(); })); }); + }).si_then([&reply] { + return read_ertr::make_ready_future(std::move(reply)); }); }); } @@ -560,8 +621,13 @@ ECBackend::handle_rep_read_op(Ref m) ECBackend::ll_read_ierrorator::future<> ECBackend::handle_rep_read_reply(Ref m) { - const auto& from = m->op.from; - auto& mop = m->op; + return handle_rep_read_reply(m->op).finally([m=std::move(m)] {}); +} + +ECBackend::ll_read_ierrorator::future<> +ECBackend::handle_rep_read_reply(ECSubReadReply& mop) +{ + const auto& from = mop.from; logger().debug("{}: reply {} from {}", __func__, mop, from); if (!read_pipeline.tid_to_read_map.contains(mop.tid)) { //canceled diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index c228e01456d..bbf2fff70c0 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -46,14 +46,20 @@ public: Ref, crimson::osd::PG& pg); write_iertr::future<> handle_rep_write_reply(ECSubWriteReply&& op); - ll_read_ierrorator::future<> handle_rep_read_op(Ref); + ll_read_ierrorator::future handle_rep_read_op(ECSubRead&); + ll_read_ierrorator::future handle_rep_read_op(Ref); + ll_read_ierrorator::future<> handle_rep_read_reply(ECSubReadReply& mop); ll_read_ierrorator::future<> handle_rep_read_reply(Ref); private: friend class ECRecoveryBackend; ll_read_ierrorator::future - _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override; + _read(const hobject_t& hoid, + uint64_t object_size, + uint64_t off, + uint64_t len, + uint32_t flags) final; rep_op_fut_t submit_transaction(const std::set &pg_shards, crimson::osd::ObjectContextRef&& obc, @@ -79,6 +85,11 @@ private: const ZTracer::Trace &trace, ECListener& eclistener) override; + void handle_sub_read_n_reply( + pg_shard_t from, + ECSubRead &op, + const ZTracer::Trace &trace) override; + bool is_single_chunk(const hobject_t& obj, const ECSubRead& op); ll_read_errorator::future maybe_chunked_read( diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index dfb99101e03..7c3c27f9510 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1518,7 +1518,16 @@ PG::interruptible_future<> PG::handle_rep_read_op(Ref m) assert(ec_backend); return ec_backend->handle_rep_read_op( std::move(m) - ).handle_error_interruptible(crimson::ct_error::assert_all{}); + ).si_then([then_lcod=peering_state.get_info().last_complete, + this](auto&& rep) { + auto reply = crimson::make_message(); + reply->pgid = spg_t(peering_state.get_info().pgid.pgid, get_primary().shard); + reply->map_epoch = get_osdmap_epoch(); + reply->min_epoch = get_interval_start_epoch(); + reply->op = std::move(rep); + return shard_services.send_to_osd( + get_primary().osd, std::move(reply), get_osdmap_epoch()); + }).handle_error_interruptible(crimson::ct_error::assert_all{}); } PG::interruptible_future<> PG::do_update_log_missing(