From: Radosław Zarzyński Date: Tue, 26 Sep 2023 15:42:55 +0000 (+0200) Subject: crimson/osd: settle RMWPipeline and RMWPipeline within ECBackend X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=25cc7916daa983650eab5d64601e97df3e41cf6d;p=ceph-ci.git crimson/osd: settle RMWPipeline and RMWPipeline within ECBackend Signed-off-by: Radosław Zarzyński --- diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 754a54a736f..d07caafab3e 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -35,12 +35,15 @@ ECBackend::ECBackend(pg_shard_t whoami, uint64_t stripe_width, bool fast_read, bool allows_ecoverwrites, - DoutPrefixProvider &dpp) + DoutPrefixProvider &dpp, + ECListener &eclistener) : PGBackend{whoami, coll, shard_services, dpp}, ec_impl{create_ec_impl(ec_profile)}, sinfo(ec_impl, stripe_width), fast_read{fast_read}, - allows_ecoverwrites{allows_ecoverwrites} + allows_ecoverwrites{allows_ecoverwrites}, + read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener}, + rmw_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this} { } @@ -71,7 +74,7 @@ ECBackend::write_iertr::future<> ECBackend::handle_sub_write( pg_shard_t from, ECSubWrite &&op, - crimson::osd::PG& pg) + ECListener& pg) { LOG_PREFIX(ECBackend::handle_sub_write); logger().info("{} from {}", __func__, from); @@ -124,6 +127,16 @@ ECBackend::handle_sub_write( }); } +void ECBackend::handle_sub_write( + pg_shard_t from, + OpRequestRef msg, + ECSubWrite &op, + const ZTracer::Trace &trace, + ECListener& eclistener) +{ + std::ignore = handle_sub_write(from, std::move(op), eclistener); +} + ECBackend::write_iertr::future<> ECBackend::handle_rep_write_op( Ref m, @@ -202,6 +215,14 @@ ECBackend::maybe_chunked_read( } } +void ECBackend::objects_read_and_reconstruct( + const std::map> &reads, + bool fast_read, + GenContextURef &&func) +{ + // TODO XXX FIXME +} + ECBackend::ll_read_ierrorator::future<> ECBackend::handle_rep_read_op(Ref m) { diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index d685cdcf129..36af2451d41 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -12,6 +12,7 @@ #include "messages/MOSDECSubOpWriteReply.h" #include "messages/MOSDECSubOpRead.h" #include "messages/MOSDECSubOpReadReply.h" +#include "osd/ECCommon.h" #include "osd/ECUtil.h" #include "osd/osd_types.h" #include "pg_backend.h" @@ -20,7 +21,8 @@ namespace crimson::osd { class PG; -class ECBackend : public PGBackend +class ECBackend : public PGBackend, + public ECCommon { static ceph::ErasureCodeInterfaceRef create_ec_impl( const ec_profile_t& ec_profile); @@ -33,7 +35,8 @@ public: uint64_t stripe_width, bool fast_read, bool allows_ecoverwrites, - DoutPrefixProvider &dpp); + DoutPrefixProvider &dpp, + ECListener &eclistener); seastar::future<> stop() final { return seastar::now(); } @@ -64,7 +67,14 @@ private: write_iertr::future<> handle_sub_write( pg_shard_t from, ECSubWrite&& op, - crimson::osd::PG& pg); + ECListener& pg); + + void handle_sub_write( + pg_shard_t from, + OpRequestRef msg, + ECSubWrite &op, + const ZTracer::Trace &trace, + ECListener& eclistener) override; bool is_single_chunk(const hobject_t& obj, const ECSubRead& op); @@ -75,11 +85,19 @@ private: std::uint64_t size, std::uint32_t flags); + void objects_read_and_reconstruct( + const std::map> &reads, + bool fast_read, + GenContextURef &&func) override; + ceph::ErasureCodeInterfaceRef ec_impl; const ECUtil::stripe_info_t sinfo; const bool fast_read; const bool allows_ecoverwrites; + + ECCommon::ReadPipeline read_pipeline; + ECCommon::RMWPipeline rmw_pipeline; }; } diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 7bd5f89f384..9c6b192e45e 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -117,6 +117,7 @@ PG::PG( coll_ref, shard_services, profile, + *this, *this)), recovery_backend( std::make_unique( diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 8029e6982cc..e8729981656 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -17,6 +17,7 @@ #include "messages/MOSDRepOpReply.h" #include "messages/MOSDOpReply.h" #include "os/Transaction.h" +#include "osd/ECCommon.h" #include "osd/osd_types.h" #include "osd/osd_types_fmt.h" #include "crimson/osd/object_context.h" @@ -70,10 +71,10 @@ class PglogBasedRecovery; class PGBackend; class ReplicatedBackend; -class PG : public boost::intrusive_ref_counter< - PG, - boost::thread_unsafe_counter>, +class PG +: public boost::intrusive_ref_counter, public PGRecoveryListener, + public ECListener, PeeringState::PeeringListener, DoutPrefixProvider { @@ -111,6 +112,79 @@ public: ~PG(); + // ECListener begins + const OSDMapRef& pgb_get_osdmap() const override final { + return peering_state.get_osdmap(); + } + epoch_t pgb_get_osdmap_epoch() const override final { + return get_osdmap_epoch(); + } + void cancel_pull(const hobject_t &soid) override { + // TODO + } + const std::set &get_acting_shards() const override { + return get_actingset(); + } + const std::set &get_backfill_shards() const override { + return peering_state.get_backfill_targets(); + } + const std::map &get_shard_info() const override { + return peering_state.get_peer_info(); + } + const pg_info_t &get_shard_info(pg_shard_t peer) const override { + if (peer == get_primary()) { + return get_info(); + } else { + std::map::const_iterator i = + get_shard_info().find(peer); + ceph_assert(i != get_shard_info().end()); + return i->second; + } + } + ceph_tid_t get_tid() override final { + return shard_services.get_tid(); + } + pg_shard_t whoami_shard() const override { + return get_pg_whoami(); + } + void send_message_osd_cluster(std::vector>& messages, + epoch_t from_epoch) override final { + std::ignore = seastar::do_with(std::move(messages), + [this, from_epoch](auto&& messages) { + return seastar::do_for_each(messages, [this, from_epoch] (auto&& im) { + auto& [osd_id, msg] = im; + return shard_services.send_to_osd(osd_id, MessageURef{msg}, from_epoch); + }); + }); + } + std::ostream& gen_dbg_prefix(std::ostream& out) const override final { + return gen_prefix(out); + } + const pg_pool_t &get_pool() const override { + return peering_state.get_pgpool().info; + } + const std::set &get_acting_recovery_backfill_shards() const override { + return get_acting_recovery_backfill(); + } + bool should_send_op(pg_shard_t peer, const hobject_t &hoid) override { + if (peer == get_primary()) { + // TODO XXX FIXME + assert(peer == get_primary()); + return true; + } + abort(); + } + spg_t primary_spg_t() const override { + return spg_t(get_info().pgid.pgid, get_primary().shard); + } + const PGLog &get_log() const override { + return peering_state.get_pg_log(); + } + DoutPrefixProvider *get_dpp() override { + return this; + } + // ECListener ends + const pg_shard_t& get_pg_whoami() const final { return pg_whoami; } @@ -776,7 +850,7 @@ private: public: - cached_map_t get_osdmap() { return peering_state.get_osdmap(); } + cached_map_t get_osdmap() const { return peering_state.get_osdmap(); } eversion_t get_next_version() { return eversion_t(get_osdmap_epoch(), projected_last_update.version + 1); @@ -835,7 +909,7 @@ public: pg_stat_t get_stats() const; void apply_stats( const hobject_t &soid, - const object_stat_sum_t &delta_stats); + const object_stat_sum_t &delta_stats) final; private: std::optional pg_stats; @@ -914,17 +988,23 @@ public: epoch_t get_interval_start_epoch() const { return get_info().history.same_interval_since; } - const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const { - if (shard == pg_whoami) + const pg_missing_const_i* maybe_get_shard_missing(pg_shard_t shard) const { + if (shard == pg_whoami) { return &get_local_missing(); - else { + } else { auto it = peering_state.get_peer_missing().find(shard); - if (it == peering_state.get_peer_missing().end()) + if (it == peering_state.get_peer_missing().end()) { return nullptr; - else + } else { return &it->second; + } } } + const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const override { + auto m = maybe_get_shard_missing(peer); + assert(m); + return *m; + } struct complete_op_t { const version_t user_version; @@ -1013,7 +1093,7 @@ private: bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const; bool can_discard_op(const MOSDOp& m) const; void context_registry_on_change(); - bool is_missing_object(const hobject_t& soid) const { + bool is_missing_object(const hobject_t& soid) const final { return get_local_missing().is_missing(soid); } bool is_unreadable_object(const hobject_t &oid, @@ -1035,10 +1115,10 @@ private: const std::set &get_actingset() const { return peering_state.get_actingset(); } - void add_local_next_event(const pg_log_entry_t& e) { + void add_local_next_event(const pg_log_entry_t& e) override final { peering_state.add_local_next_event(e); } - void op_applied(const eversion_t &applied_version); + void op_applied(const eversion_t &applied_version) override final; private: friend class IOInterruptCondition; diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index d19656935cb..697f3e48aa4 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -55,7 +55,8 @@ PGBackend::create(pg_t pgid, crimson::os::CollectionRef coll, crimson::osd::ShardServices& shard_services, const ec_profile_t& ec_profile, - DoutPrefixProvider &dpp) + DoutPrefixProvider &dpp, + ECListener &eclistener) { switch (pool.type) { case pg_pool_t::TYPE_REPLICATED: @@ -68,7 +69,8 @@ PGBackend::create(pg_t pgid, pool.stripe_width, pool.fast_read, pool.allows_ecoverwrites(), - dpp); + dpp, + eclistener); default: throw runtime_error(seastar::format("unsupported pool type '{}'", pool.type)); diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index 4c5956e3e7e..3ec46ad69b1 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -74,7 +74,8 @@ public: crimson::os::CollectionRef coll, crimson::osd::ShardServices& shard_services, const ec_profile_t& ec_profile, - DoutPrefixProvider &dpp); + DoutPrefixProvider &dpp, + struct ECListener &eclistener); using attrs_t = std::map>; using read_errorator = ll_read_errorator::extend< diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 49fa62977c1..477f29d9ac6 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -319,7 +319,7 @@ ReplicatedRecoveryBackend::recover_delete( for (const auto& shard : pg.get_acting_recovery_backfill()) { if (shard == pg.get_pg_whoami()) continue; - if (pg.get_shard_missing(shard)->is_missing(soid)) { + if (pg.get_shard_missing(shard).is_missing(soid)) { DEBUGDPP( "soid {} needs to be deleted from replica {}", pg, soid, shard); diff --git a/src/osd/ECListener.h b/src/osd/ECListener.h index 8f606c72c45..229d268e77b 100644 --- a/src/osd/ECListener.h +++ b/src/osd/ECListener.h @@ -178,7 +178,7 @@ struct ECListener { const eversion_t &roll_forward_to, const eversion_t &pg_committed_to, bool transaction_applied, - ceph::os::Transaction &t, + ObjectStore::Transaction &t, bool async = false) = 0; virtual void op_applied( const eversion_t &applied_version) = 0;