git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: settle ReadPipeline and RMWPipeline within ECBackend
author Radosław Zarzyński <rzarzyns@redhat.com>
Tue, 26 Sep 2023 15:42:55 +0000 (17:42 +0200)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 27 Jan 2026 14:37:36 +0000 (14:37 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/replicated_recovery_backend.cc
src/osd/ECListener.h

index 754a54a736f8979e27e6fbc881ea02a52013d7db..d07caafab3e1ea56a11accb3b9c9d1a2330b75b0 100644 (file)
@@ -35,12 +35,15 @@ ECBackend::ECBackend(pg_shard_t whoami,
                      uint64_t stripe_width,
                      bool fast_read,
                      bool allows_ecoverwrites,
-                    DoutPrefixProvider &dpp)
+                    DoutPrefixProvider &dpp,
+                    ECListener &eclistener)
   : PGBackend{whoami, coll, shard_services, dpp},
     ec_impl{create_ec_impl(ec_profile)},
     sinfo(ec_impl, stripe_width),
     fast_read{fast_read},
-    allows_ecoverwrites{allows_ecoverwrites}
+    allows_ecoverwrites{allows_ecoverwrites},
+    read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener},
+    rmw_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this}
 {
 }
 
@@ -71,7 +74,7 @@ ECBackend::write_iertr::future<>
 ECBackend::handle_sub_write(
   pg_shard_t from,
   ECSubWrite &&op,
-  crimson::osd::PG& pg)
+  ECListener& pg)
 {
   LOG_PREFIX(ECBackend::handle_sub_write);
   logger().info("{} from {}", __func__, from);
@@ -124,6 +127,16 @@ ECBackend::handle_sub_write(
   });
 }
 
+void ECBackend::handle_sub_write(
+  pg_shard_t from,
+  OpRequestRef msg,
+  ECSubWrite &op,
+  const ZTracer::Trace &trace,
+  ECListener& eclistener)
+{
+  std::ignore = handle_sub_write(from, std::move(op), eclistener);
+}
+
 ECBackend::write_iertr::future<>
 ECBackend::handle_rep_write_op(
   Ref<MOSDECSubOpWrite> m,
@@ -202,6 +215,14 @@ ECBackend::maybe_chunked_read(
   }
 }
 
+void ECBackend::objects_read_and_reconstruct(
+  const std::map<hobject_t, std::list<ec_align_t>> &reads,
+  bool fast_read,
+  GenContextURef<ec_extents_t &&> &&func)
+{
+  // TODO XXX FIXME
+}
+
 ECBackend::ll_read_ierrorator::future<>
 ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
 {
index d685cdcf1296ab83f5f81d300f49dc621e0217bc..36af2451d41fbd0d813e71f3a91f9c2904867a41 100644 (file)
@@ -12,6 +12,7 @@
 #include "messages/MOSDECSubOpWriteReply.h"
 #include "messages/MOSDECSubOpRead.h"
 #include "messages/MOSDECSubOpReadReply.h"
+#include "osd/ECCommon.h"
 #include "osd/ECUtil.h"
 #include "osd/osd_types.h"
 #include "pg_backend.h"
@@ -20,7 +21,8 @@ namespace crimson::osd {
 
 class PG;
 
-class ECBackend : public PGBackend
+class ECBackend : public PGBackend,
+                 public ECCommon
 {
   static ceph::ErasureCodeInterfaceRef create_ec_impl(
     const ec_profile_t& ec_profile);
@@ -33,7 +35,8 @@ public:
            uint64_t stripe_width,
            bool fast_read,
            bool allows_ecoverwrites,
-           DoutPrefixProvider &dpp);
+           DoutPrefixProvider &dpp,
+           ECListener &eclistener);
   seastar::future<> stop() final {
     return seastar::now();
   }
@@ -64,7 +67,14 @@ private:
   write_iertr::future<> handle_sub_write(
     pg_shard_t from,
     ECSubWrite&& op,
-    crimson::osd::PG& pg);
+    ECListener& pg);
+
+  void handle_sub_write(
+    pg_shard_t from,
+    OpRequestRef msg,
+    ECSubWrite &op,
+    const ZTracer::Trace &trace,
+    ECListener& eclistener) override;
 
   bool is_single_chunk(const hobject_t& obj, const ECSubRead& op);
 
@@ -75,11 +85,19 @@ private:
     std::uint64_t size,
     std::uint32_t flags);
 
+  void objects_read_and_reconstruct(
+    const std::map<hobject_t, std::list<ec_align_t>> &reads,
+    bool fast_read,
+    GenContextURef<ec_extents_t &&> &&func) override;
+
   ceph::ErasureCodeInterfaceRef ec_impl;
   const ECUtil::stripe_info_t sinfo;
 
   const bool fast_read;
   const bool allows_ecoverwrites;
+
+  ECCommon::ReadPipeline read_pipeline;
+  ECCommon::RMWPipeline rmw_pipeline;
 };
 
 }
index 7bd5f89f3842d0a17e66123859eba29218ea0fe1..9c6b192e45e5c6bf57f408e7d9ef3dc13b7c83d0 100644 (file)
@@ -117,6 +117,7 @@ PG::PG(
        coll_ref,
        shard_services,
        profile,
+       *this,
        *this)),
     recovery_backend(
       std::make_unique<ReplicatedRecoveryBackend>(
index 8029e6982cce07486377d78111e48b876c2b62ca..e8729981656ef4d12ee78ab650f6caf9672ce523 100644 (file)
@@ -17,6 +17,7 @@
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDOpReply.h"
 #include "os/Transaction.h"
+#include "osd/ECCommon.h"
 #include "osd/osd_types.h"
 #include "osd/osd_types_fmt.h"
 #include "crimson/osd/object_context.h"
@@ -70,10 +71,10 @@ class PglogBasedRecovery;
 class PGBackend;
 class ReplicatedBackend;
 
-class PG : public boost::intrusive_ref_counter<
-  PG,
-  boost::thread_unsafe_counter>,
+class PG
+: public boost::intrusive_ref_counter<PG, boost::thread_unsafe_counter>,
   public PGRecoveryListener,
+  public ECListener,
   PeeringState::PeeringListener,
   DoutPrefixProvider
 {
@@ -111,6 +112,79 @@ public:
 
   ~PG();
 
+  // ECListener begins
+  const OSDMapRef& pgb_get_osdmap() const override final {
+    return peering_state.get_osdmap();
+  }
+  epoch_t pgb_get_osdmap_epoch() const override final {
+    return get_osdmap_epoch();
+  }
+  void cancel_pull(const hobject_t &soid) override {
+    // TODO
+  }
+  const std::set<pg_shard_t> &get_acting_shards() const override {
+    return get_actingset();
+  }
+  const std::set<pg_shard_t> &get_backfill_shards() const override {
+    return peering_state.get_backfill_targets();
+  }
+  const std::map<pg_shard_t, pg_info_t> &get_shard_info() const override {
+    return peering_state.get_peer_info();
+  }
+  const pg_info_t &get_shard_info(pg_shard_t peer) const override {
+    if (peer == get_primary()) {
+      return get_info();
+    } else {
+      std::map<pg_shard_t, pg_info_t>::const_iterator i =
+        get_shard_info().find(peer);
+      ceph_assert(i != get_shard_info().end());
+      return i->second;
+    }
+  }
+  ceph_tid_t get_tid() override final {
+    return shard_services.get_tid();
+  }
+  pg_shard_t whoami_shard() const override {
+    return get_pg_whoami();
+  }
+  void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages,
+                               epoch_t from_epoch) override final {
+    std::ignore = seastar::do_with(std::move(messages),
+                                   [this, from_epoch](auto&& messages) {
+      return seastar::do_for_each(messages, [this, from_epoch] (auto&& im) {
+        auto& [osd_id, msg] = im;
+        return shard_services.send_to_osd(osd_id, MessageURef{msg}, from_epoch);
+      });
+    });
+  }
+  std::ostream& gen_dbg_prefix(std::ostream& out) const override final {
+    return gen_prefix(out);
+  }
+  const pg_pool_t &get_pool() const override {
+    return peering_state.get_pgpool().info;
+  }
+  const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+    return get_acting_recovery_backfill();
+  }
+  bool should_send_op(pg_shard_t peer, const hobject_t &hoid) override {
+    if (peer == get_primary()) {
+      // TODO XXX FIXME
+      assert(peer == get_primary());
+      return true;
+    }
+    abort();
+  }
+  spg_t primary_spg_t() const override {
+    return spg_t(get_info().pgid.pgid, get_primary().shard);
+  }
+  const PGLog &get_log() const override {
+    return peering_state.get_pg_log();
+  }
+  DoutPrefixProvider *get_dpp() override {
+    return this;
+  }
+  // ECListener ends
+
   const pg_shard_t& get_pg_whoami() const final {
     return pg_whoami;
   }
@@ -776,7 +850,7 @@ private:
 
 
 public:
-  cached_map_t get_osdmap() { return peering_state.get_osdmap(); }
+  cached_map_t get_osdmap() const { return peering_state.get_osdmap(); }
   eversion_t get_next_version() {
     return eversion_t(get_osdmap_epoch(),
                      projected_last_update.version + 1);
@@ -835,7 +909,7 @@ public:
   pg_stat_t get_stats() const;
   void apply_stats(
     const hobject_t &soid,
-    const object_stat_sum_t &delta_stats);
+    const object_stat_sum_t &delta_stats) final;
 
 private:
   std::optional<pg_stat_t> pg_stats;
@@ -914,17 +988,23 @@ public:
   epoch_t get_interval_start_epoch() const {
     return get_info().history.same_interval_since;
   }
-  const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
-    if (shard == pg_whoami)
+  const pg_missing_const_i* maybe_get_shard_missing(pg_shard_t shard) const {
+    if (shard == pg_whoami) {
       return &get_local_missing();
-    else {
+    } else {
       auto it = peering_state.get_peer_missing().find(shard);
-      if (it == peering_state.get_peer_missing().end())
+      if (it == peering_state.get_peer_missing().end()) {
        return nullptr;
-      else
+      } else {
        return &it->second;
+      }
     }
   }
+  const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const override {
+    auto m = maybe_get_shard_missing(peer);
+    ceph_assert(m);
+    return *m;
+  }
 
   struct complete_op_t {
     const version_t user_version;
@@ -1013,7 +1093,7 @@ private:
   bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const;
   bool can_discard_op(const MOSDOp& m) const;
   void context_registry_on_change();
-  bool is_missing_object(const hobject_t& soid) const {
+  bool is_missing_object(const hobject_t& soid) const final {
     return get_local_missing().is_missing(soid);
   }
   bool is_unreadable_object(const hobject_t &oid,
@@ -1035,10 +1115,10 @@ private:
   const std::set<pg_shard_t> &get_actingset() const {
     return peering_state.get_actingset();
   }
-  void add_local_next_event(const pg_log_entry_t& e) {
+  void add_local_next_event(const pg_log_entry_t& e) override final {
     peering_state.add_local_next_event(e);
   }
-  void op_applied(const eversion_t &applied_version);
+  void op_applied(const eversion_t &applied_version) override final;
 
 private:
   friend class IOInterruptCondition;
index d19656935cb7271971684fbbdebb9bb51a34c2b0..697f3e48aa41340603c377687dd2e29a5e2fd9d8 100644 (file)
@@ -55,7 +55,8 @@ PGBackend::create(pg_t pgid,
                  crimson::os::CollectionRef coll,
                  crimson::osd::ShardServices& shard_services,
                  const ec_profile_t& ec_profile,
-                 DoutPrefixProvider &dpp)
+                 DoutPrefixProvider &dpp,
+                 ECListener &eclistener)
 {
   switch (pool.type) {
   case pg_pool_t::TYPE_REPLICATED:
@@ -68,7 +69,8 @@ PGBackend::create(pg_t pgid,
                                        pool.stripe_width,
                                       pool.fast_read,
                                       pool.allows_ecoverwrites(),
-                                      dpp);
+                                      dpp,
+                                      eclistener);
   default:
     throw runtime_error(seastar::format("unsupported pool type '{}'",
                                         pool.type));
index 4c5956e3e7ed9ad8cb40bbf4caddd671d60fc919..3ec46ad69b1358d5cd58345dc95bdef2198cf0d2 100644 (file)
@@ -74,7 +74,8 @@ public:
                                           crimson::os::CollectionRef coll,
                                           crimson::osd::ShardServices& shard_services,
                                           const ec_profile_t& ec_profile,
-                                          DoutPrefixProvider &dpp);
+                                          DoutPrefixProvider &dpp,
+                                          struct ECListener &eclistener);
   using attrs_t =
     std::map<std::string, ceph::bufferptr, std::less<>>;
   using read_errorator = ll_read_errorator::extend<
index 49fa62977c1623c6cc0861462294bda19f351b69..477f29d9ac6ddff211e8179713487eb049ef9b61 100644 (file)
@@ -319,7 +319,7 @@ ReplicatedRecoveryBackend::recover_delete(
        for (const auto& shard : pg.get_acting_recovery_backfill()) {
          if (shard == pg.get_pg_whoami())
            continue;
-         if (pg.get_shard_missing(shard)->is_missing(soid)) {
+         if (pg.get_shard_missing(shard).is_missing(soid)) {
            DEBUGDPP(
              "soid {} needs to be deleted from replica {}",
              pg, soid, shard);
index 8f606c72c45965286b30e5c1a10b9273308b8a9f..229d268e77b05ba59bbd145ad37474818bbe4773 100644 (file)
@@ -178,7 +178,7 @@ struct ECListener {
     const eversion_t &roll_forward_to,
     const eversion_t &pg_committed_to,
     bool transaction_applied,
-    ceph::os::Transaction &t,
+    ObjectStore::Transaction &t,
     bool async = false) = 0;
   virtual void op_applied(
     const eversion_t &applied_version) = 0;