From: Radosław Zarzyński Date: Mon, 11 Sep 2023 14:26:58 +0000 (+0200) Subject: crimson/osd: add support for ECWrites on replica X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2264c86e569002a650651ca63cfa564e52447292;p=ceph-ci.git crimson/osd: add support for ECWrites on replica Signed-off-by: Radosław Zarzyński --- diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 217351deb35..754a54a736f 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -1,6 +1,7 @@ #include #include "crimson/common/log.h" +#include "crimson/osd/pg.h" #include "crimson/osd/shard_services.h" #include "ec_backend.h" @@ -67,9 +68,76 @@ ECBackend::submit_transaction(const std::set &pg_shards, } ECBackend::write_iertr::future<> -ECBackend::handle_rep_write_op(Ref) +ECBackend::handle_sub_write( + pg_shard_t from, + ECSubWrite &&op, + crimson::osd::PG& pg) { - return write_iertr::now(); + LOG_PREFIX(ECBackend::handle_sub_write); + logger().info("{} from {}", __func__, from); + if (!op.temp_added.empty()) { + add_temp_obj(std::begin(op.temp_added), std::end(op.temp_added)); + } + ceph::os::Transaction txn; + if (op.backfill_or_async_recovery) { + for (const auto& obj : op.temp_removed) { + logger().info("{}: removing object {} since we won't get the transaction", + __func__, obj); + txn.remove(coll->get_cid(), + ghobject_t{obj, ghobject_t::NO_GEN, get_shard()}); + } + } + clear_temp_objs(op.temp_removed); + logger().debug("{}: missing before {}", __func__, ""); + + // flag set to true during async recovery + bool async = false; + if (pg.is_missing_object(op.soid)) { + async = true; + logger().debug("{}: {} is missing", __func__, op.soid); + for (const auto& e: op.log_entries) { + logger().debug("{}: add_next_event entry {}, is_delete {}", + __func__, e, e.is_delete()); + pg.add_local_next_event(e); + } + } + pg.log_operation( + std::move(op.log_entries), + op.updated_hit_set_history, + op.trim_to, + op.pg_committed_to, + op.pg_committed_to, + !op.backfill_or_async_recovery, + txn, + async); + txn.append(op.t); // hack warn + logger().debug("{}:{}", __func__, __LINE__); + if (op.at_version != eversion_t()) { + // dummy rollforward transaction doesn't get at_version + // (and doesn't advance it) + pg.op_applied(op.at_version); + } + logger().debug("{}:{}", __func__, __LINE__); + return store->do_transaction(coll, std::move(txn)).then([FNAME] { + DEBUG("transaction commited!"); + return write_iertr::now(); + }); +} + +ECBackend::write_iertr::future<> +ECBackend::handle_rep_write_op( + Ref m, + crimson::osd::PG& pg) +{ + LOG_PREFIX(ECBackend::handle_rep_write_op); + const auto tid = m->op.tid; + DEBUG("tid {} from {}", tid, m->op.from); + return handle_sub_write( + m->op.from, std::move(m->op), pg + ).si_then([&pg] { + assert(!pg.pgb_is_primary()); + return write_iertr::now(); + }, crimson::ct_error::assert_all{}); } ECBackend::write_iertr::future<> diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index 3b637a4d346..d685cdcf129 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -18,6 +18,8 @@ namespace crimson::osd { +class PG; + class ECBackend : public PGBackend { static ceph::ErasureCodeInterfaceRef create_ec_impl( @@ -37,7 +39,9 @@ public: } void on_actingset_changed(bool same_primary) final {} - write_iertr::future<> handle_rep_write_op(Ref); + write_iertr::future<> handle_rep_write_op( + Ref, + crimson::osd::PG& pg); write_iertr::future<> handle_rep_write_reply(Ref); ll_read_ierrorator::future<> handle_rep_read_op(Ref); ll_read_ierrorator::future<> handle_rep_read_reply(Ref); @@ -52,12 +56,16 @@ private: osd_op_params_t&& req, epoch_t min_epoch, epoch_t max_epoch, std::vector&& log_entries) final; - CollectionRef coll; seastar::future<> request_committed(const osd_reqid_t& reqid, const eversion_t& version) final { return seastar::now(); } + write_iertr::future<> handle_sub_write( + pg_shard_t from, + ECSubWrite&& op, + crimson::osd::PG& pg); + bool is_single_chunk(const hobject_t& obj, const ECSubRead& op); ll_read_errorator::future maybe_chunked_read( diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index bda0b314b83..7bd5f89f384 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1433,11 +1433,34 @@ void PG::handle_rep_op_reply(const MOSDRepOpReply& m) PG::interruptible_future<> PG::handle_rep_write_op(Ref m) { + logger().debug("{}", __func__); + if (!is_primary()) { + peering_state.update_stats([&new_stats=m->op.stats](auto&, auto &stats) { + stats = new_stats; + return false; + }); + } auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend()); assert(ec_backend); + const auto tid = m->op.tid; return ec_backend->handle_rep_write_op( - std::move(m) - ).handle_error_interruptible(crimson::ct_error::assert_all{}); + std::move(m), + *this + ).si_then([this, then_lcod=peering_state.get_info().last_complete, tid] { + logger().debug("{} sending response", "handle_rep_write_op"); + peering_state.update_last_complete_ondisk(then_lcod); + auto r = crimson::make_message(); + r->pgid = spg_t(peering_state.get_info().pgid.pgid, get_primary().shard); + r->map_epoch = get_osdmap_epoch(); + r->min_epoch = peering_state.get_info().history.same_interval_since; + r->op.tid = tid; + r->op.last_complete = then_lcod; + r->op.committed = true; + r->op.applied = true; + r->op.from = pg_whoami; + r->set_priority(CEPH_MSG_PRIO_HIGH); + return shard_services.send_to_osd(get_primary().osd, std::move(r), get_osdmap_epoch()); + }).handle_error_interruptible(crimson::ct_error::assert_all{}); } PG::interruptible_future<> PG::handle_rep_read_op(Ref m) @@ -1723,6 +1746,22 @@ bool PG::should_send_op( // by crimson yet } +void PG::op_applied(const eversion_t &applied_version) +{ + logger().info("{}: op_applied version {}", __func__, applied_version); + assert(applied_version != eversion_t()); + assert(applied_version <= peering_state.get_info().last_update); + peering_state.local_write_applied(applied_version); + +#if 0 + if (is_primary() && m_scrubber) { + // if there's a scrub operation waiting for the selected chunk to be fully updated - + // allow it to continue + m_scrubber->on_applied_when_primary(recovery_state.get_last_update_applied()); + } +#endif +} + PG::interruptible_future> PG::already_complete(const osd_reqid_t& reqid) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 9b35246635a..8029e6982cc 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -47,6 +47,7 @@ class MQuery; class OSDMap; +class ECBackend; class PGPeeringEvent; class osd_op_params_t; @@ -997,6 +998,7 @@ private: friend class WatchTimeoutRequest; friend class SnapTrimEvent; friend class SnapTrimObjSubEvent; + friend ECBackend; private: void enqueue_push_for_backfill( @@ -1033,6 +1035,10 @@ private: const std::set &get_actingset() const { return peering_state.get_actingset(); } + void add_local_next_event(const pg_log_entry_t& e) { + peering_state.add_local_next_event(e); + } + void op_applied(const eversion_t &applied_version); private: friend class IOInterruptCondition; diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index b4f65a3e84e..d19656935cb 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -17,6 +17,8 @@ #include "os/Transaction.h" #include "common/Checksummer.h" #include "common/Clock.h" +#include "erasure-code/ErasureCodeInterface.h" +#include "erasure-code/ErasureCodePlugin.h" #include "crimson/common/coroutine.h" #include "crimson/common/exception.h" @@ -82,7 +84,9 @@ PGBackend::PGBackend(pg_shard_t whoami, shard_services{shard_services}, dpp{dpp}, store{&shard_services.get_store()} -{} +{ + logger().info("initialized PGBackend::store with {}", (void*)this->store); +} PGBackend::load_metadata_iertr::future diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index c9427ef9645..49fa62977c1 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -601,7 +601,7 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op( PushOp* push_op) { LOG_PREFIX(ReplicatedRecoveryBackend::read_metadata_for_push_op); - DEBUGDPP("{}", pg, oid); + DEBUGDPP("{} progress.first {}", pg, oid, progress.first); if (!progress.first) { return seastar::make_ready_future(ver); }