#include "crimson/osd/pg.h"
#include "crimson/osd/shard_services.h"
#include "ec_backend.h"
+#include "include/Context.h"
#include "osd/PGTransaction.h"
#include "osd/ECTransaction.h"
sinfo(ec_impl, stripe_width),
fast_read{fast_read},
allows_ecoverwrites{allows_ecoverwrites},
- read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener},
+ read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this},
rmw_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this}
{
}
// Read `len` bytes starting at `off` from the erasure-coded object `hoid`.
//
// The request is widened via sinfo.ro_offset_len_to_stripe_ro_offset_len()
// and handed to objects_read_and_reconstruct() together with `fast_read` and
// `object_size`. The completion callback cuts the client's original
// [off, off + len) window out of the returned data and fulfils the promise
// whose future is returned to the caller.
//
// Error handling is not implemented yet: a negative per-object error aborts.
ECBackend::ll_read_ierrorator::future<ceph::bufferlist>
ECBackend::_read(const hobject_t& hoid,
+ const uint64_t object_size,
const uint64_t off,
const uint64_t len,
const uint32_t flags)
{
- // todo
- return seastar::make_ready_future<bufferlist>();
+ LOG_PREFIX(ECBackend::_read);
// Presumably rounds the range out to stripe boundaries (judging by the
// helper's name) -- confirm against stripe_info_t.
+ const auto [aligned_off, aligned_len] =
+ sinfo.ro_offset_len_to_stripe_ro_offset_len(off, len);
// A single read request for a single object.
+ std::map<hobject_t, std::list<ec_align_t>> reads;
+ reads[hoid].emplace_back(
+ ec_align_t{aligned_off, aligned_len, flags});
// The promise is moved into the completion callback; `ret` is the caller's
// handle on the eventual result.
+ seastar::promise<ceph::bufferlist> promise;
+ auto ret = promise.get_future();
+ objects_read_and_reconstruct(
+ reads,
+ fast_read,
+ object_size,
+ make_gen_lambda_context<ec_extents_t &&>(
+ [hoid, off, len, promise=std::move(promise), FNAME](auto&& results) mutable {
// Exactly one object was requested, so exactly one result keyed by `hoid`
// is expected.
+ ceph_assert(results.size() == 1);
+ ceph_assert(results.count(hoid) == 1);
+ auto& got = results.at(hoid);
+ if (got.err < 0) {
// Placeholder: read errors abort instead of being propagated.
// NOTE(review): the promise is never fulfilled on this path; the `return`
// only matters if ceph_abort_msg() can return -- confirm.
+ ceph_abort_msg("implement error handling");
+ return;
+ }
// Locate the extent(s) of the returned extent map that contain the client's
// original (unwidened) range.
+ auto range = got.emap.get_containing_range(off, len);
+ ceph_assert(range.first != range.second);
+ ceph_assert(range.first.get_off() <= off);
+ DEBUG("offset: {}", off);
+ DEBUG("range offset: {}", range.first.get_off());
+ DEBUG("length: {}", len);
+ DEBUG("range length: {}", range.first.get_len());
// The first extent alone must cover the whole requested window.
+ ceph_assert(
+ (off + len) <=
+ (range.first.get_off() + range.first.get_len()));
// Trim the extent's data down to exactly the bytes the client asked for.
+ ceph::bufferlist clients_bl;
+ clients_bl.substr_of(
+ range.first.get_val(),
+ off - range.first.get_off(),
+ len);
+ promise.set_value(std::move(clients_bl));
+ }));
+ return ret;
}
struct ECCrimsonOp : ECCommon::RMWPipeline::Op {
});
}
// Serve the sub-read `op` via handle_rep_read_op() and feed the resulting
// reply straight into handle_rep_read_reply().
//
// `op` is moved into storage owned by seastar::do_with, so the caller's
// object is left moved-from. `from` and the trace argument are unused.
// The resulting future is discarded (std::ignore): this function returns
// without waiting for the chain to finish, and its outcome is not observed
// here.
+void ECBackend::handle_sub_read_n_reply(
+ pg_shard_t from,
+ ECSubRead &op,
+ const ZTracer::Trace &)
+{
+ std::ignore = seastar::do_with(std::move(op), [this](auto&& op) {
// NOTE(review): `reply` is a continuation parameter passed on by reference.
// If handle_rep_read_reply(ECSubReadReply&) ever uses it after suspending,
// it must be kept alive (e.g. via do_with) -- confirm it completes
// synchronously.
+ return handle_rep_read_op(op).si_then([this](auto&& reply) {
+ return this->handle_rep_read_reply(reply);
+ });
+ });
+}
+
void ECBackend::handle_sub_write(
pg_shard_t from,
OpRequestRef msg,
reads, fast_read, std::move(func));
}
// Message-based overload: forwards the ECSubRead carried by `m` to the
// ECSubRead& overload and yields the ECSubReadReply it produces.
-ECBackend::ll_read_ierrorator::future<>
+ECBackend::ll_read_ierrorator::future<ECSubReadReply>
ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
+{
// The empty finally() continuation only holds on to `m`, so the message --
// and with it `m->op`, which the callee captures by reference -- stays alive
// until the returned future resolves.
+ return handle_rep_read_op(m->op).finally([m=std::move(m)] {});
+}
+
// Execute the sub-read `op` and resolve to the ECSubReadReply built for it.
//
// `op` is captured by reference in the continuations below, so the caller
// must keep it alive until the returned future resolves.
+ECBackend::ll_read_ierrorator::future<ECSubReadReply>
+ECBackend::handle_rep_read_op(ECSubRead& op)
{
LOG_PREFIX(ECBackend::handle_rep_read_op);
// The reply lives in do_with-owned storage for the duration of the chain.
return seastar::do_with(ECSubReadReply{},
- [m=std::move(m), FNAME, this] (auto&& reply) {
- const ECSubRead &op = m->op;
+ [&op, FNAME, this] (auto&& reply) {
reply.from = whoami;
reply.tid = op.tid;
using read_ertr = crimson::os::FuturizedStore::Shard::read_errorator;
+ DEBUG("op_list {}", op.to_read);
// Outer loop: one entry per object; inner loop: one entry per
// (offset, size, flags) read requested on that object.
return interruptor::do_for_each(op.to_read, [FNAME, &op, &reply, this] (auto read_item) {
const auto& [obj, op_list] = read_item;
- return interruptor::do_for_each(op_list, [FNAME, &op, &reply, obj, this] (auto op_spec) {
+ // `obj=obj` is a workaround for Clang's bug:
+ // https://www.reddit.com/r/LLVM/comments/s0ykcj/why_does_clang_fail_with_error_reference_to_local/?rdt=36162
+ return interruptor::do_for_each(op_list, [FNAME, &op, &reply, obj=obj, this] (auto op_spec) {
const auto& [off, size, flags] = op_spec;
return maybe_chunked_read(
obj, op, off, size, flags
return read_ertr::now();
}));
});
// Once every read has completed, move the accumulated reply out as the
// future's value.
+ }).si_then([&reply] {
+ return read_ertr::make_ready_future<ECSubReadReply>(std::move(reply));
});
});
}
// Message-based overload: forwards the ECSubReadReply carried by `m` to the
// ECSubReadReply& overload.
ECBackend::ll_read_ierrorator::future<>
ECBackend::handle_rep_read_reply(Ref<MOSDECSubOpReadReply> m)
{
- const auto& from = m->op.from;
- auto& mop = m->op;
// The empty finally() continuation only holds on to `m`, keeping `m->op`
// alive until the returned future resolves.
+ return handle_rep_read_reply(m->op).finally([m=std::move(m)] {});
+}
+
+ECBackend::ll_read_ierrorator::future<>
+ECBackend::handle_rep_read_reply(ECSubReadReply& mop)
+{
+ const auto& from = mop.from;
logger().debug("{}: reply {} from {}", __func__, mop, from);
if (!read_pipeline.tid_to_read_map.contains(mop.tid)) {
//canceled
Ref<MOSDECSubOpWrite>,
crimson::osd::PG& pg);
write_iertr::future<> handle_rep_write_reply(ECSubWriteReply&& op);
// Sub-read handling comes in two flavours. The reference overloads operate
// on a bare ECSubRead / ECSubReadReply that the caller keeps alive; the Ref
// overloads forward the message's `op` to them and hold the message until
// the returned future resolves. handle_rep_read_op resolves to the reply it
// built.
- ll_read_ierrorator::future<> handle_rep_read_op(Ref<MOSDECSubOpRead>);
+ ll_read_ierrorator::future<ECSubReadReply> handle_rep_read_op(ECSubRead&);
+ ll_read_ierrorator::future<ECSubReadReply> handle_rep_read_op(Ref<MOSDECSubOpRead>);
+ ll_read_ierrorator::future<> handle_rep_read_reply(ECSubReadReply& mop);
ll_read_ierrorator::future<> handle_rep_read_reply(Ref<MOSDECSubOpReadReply>);
private:
friend class ECRecoveryBackend;
// Reads `len` bytes at `off` of object `hoid` and resolves to that data.
// `object_size` is passed through to objects_read_and_reconstruct().
ll_read_ierrorator::future<ceph::bufferlist>
- _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override;
+ _read(const hobject_t& hoid,
+ uint64_t object_size,
+ uint64_t off,
+ uint64_t len,
+ uint32_t flags) final;
rep_op_fut_t
submit_transaction(const std::set<pg_shard_t> &pg_shards,
crimson::osd::ObjectContextRef&& obc,
const ZTracer::Trace &trace,
ECListener& eclistener) override;
// Chains handle_rep_read_op(op) into handle_rep_read_reply() for the reply
// it yields. `op` is moved from, and the resulting future is not awaited.
+ void handle_sub_read_n_reply(
+ pg_shard_t from,
+ ECSubRead &op,
+ const ZTracer::Trace &trace) override;
+
bool is_single_chunk(const hobject_t& obj, const ECSubRead& op);
ll_read_errorator::future<ceph::bufferlist> maybe_chunked_read(