]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: lay down the ECListener for ECBackend::ReadPipeline
authorRadosław Zarzyński <rzarzyns@redhat.com>
Fri, 15 Sep 2023 12:07:46 +0000 (14:07 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:23:14 +0000 (17:23 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h
src/osd/PGBackend.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index a1d777cba2485bb4a8c7c1c6ec5825b21d1bf243..e597129626f5c0dff782f00d47ef247fa298dc8d 100644 (file)
@@ -224,7 +224,7 @@ ECBackend::ECBackend(
   ErasureCodeInterfaceRef ec_impl,
   uint64_t stripe_width)
   : PGBackend(cct, pg, store, coll, ch),
-    read_pipeline(cct, ec_impl, this->sinfo, get_parent()),
+    read_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener()),
     rmw_pipeline(cct, ec_impl, this->sinfo, get_parent(), *this),
     ec_impl(ec_impl),
     sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
@@ -1897,7 +1897,7 @@ void ECBackend::ReadPipeline::do_read_op(ReadOp &op)
     MOSDECSubOpRead *msg = new MOSDECSubOpRead;
     msg->set_priority(priority);
     msg->pgid = spg_t(
-      get_parent()->whoami_spg_t().pgid,
+      get_info().pgid.pgid,
       i->first.shard);
     msg->map_epoch = get_osdmap_epoch();
     msg->min_epoch = get_parent()->get_interval_start_epoch();
index 0bc4f9f423be7c5e149f3a3ee27f74fbdab2d26e..238895d3e5b346615e847f8ba39ecb477f885835 100644 (file)
@@ -32,6 +32,54 @@ struct ECSubRead;
 struct ECSubReadReply;
 
 struct RecoveryMessages;
+
+  // ECListener -- an interface decoupling the pipelines from
+  // particular implementation of ECBackend (crimson vs cassical).
+  // https://stackoverflow.com/q/7872958
+  struct ECListener {
+    virtual ~ECListener() = default;
+    virtual const OSDMapRef& pgb_get_osdmap() const = 0;
+    virtual epoch_t pgb_get_osdmap_epoch() const = 0;
+    virtual const pg_info_t &get_info() const = 0;
+    /**
+     * Called when a pull on soid cannot be completed due to
+     * down peers
+     */
+    virtual void cancel_pull(
+      const hobject_t &soid) = 0;
+    virtual void schedule_recovery_work(
+      GenContext<ThreadPool::TPHandle&> *c,
+      uint64_t cost) = 0;
+
+    virtual epoch_t get_interval_start_epoch() const = 0;
+    virtual const std::set<pg_shard_t> &get_acting_shards() const = 0;
+    virtual const std::set<pg_shard_t> &get_backfill_shards() const = 0;
+    virtual const std::map<hobject_t, std::set<pg_shard_t>> &get_missing_loc_shards()
+      const = 0;
+
+    virtual const std::map<pg_shard_t,
+                          pg_missing_t> &get_shard_missing() const = 0;
+    virtual const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const = 0;
+#if 1
+    virtual const pg_missing_const_i * maybe_get_shard_missing(
+      pg_shard_t peer) const = 0;
+    virtual const pg_info_t &get_shard_info(pg_shard_t peer) const = 0;
+#endif
+    virtual ceph_tid_t get_tid() = 0;
+    virtual pg_shard_t whoami_shard() const = 0;
+#if 0
+    int whoami() const {
+      return whoami_shard().osd;
+    }
+    spg_t whoami_spg_t() const {
+      return get_info().pgid;
+    }
+#endif
+    virtual void send_message_osd_cluster(
+      std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) = 0;
+
+    virtual std::ostream& gen_dbg_prefix(std::ostream& out) const = 0;
+  };
 class ECBackend : public PGBackend {
 public:
   RecoveryHandle *open_recovery_op() override;
@@ -446,9 +494,9 @@ public:
     ceph::ErasureCodeInterfaceRef ec_impl;
     const ECUtil::stripe_info_t& sinfo;
     // TODO: lay an interface down here
-    PGBackend::Listener* parent;
+    ECListener* parent;
 
-    PGBackend::Listener *get_parent() const { return parent; }
+    ECListener *get_parent() const { return parent; }
     const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
     epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
     const pg_info_t &get_info() { return get_parent()->get_info(); }
@@ -456,7 +504,7 @@ public:
     ReadPipeline(CephContext* cct,
                 ceph::ErasureCodeInterfaceRef ec_impl,
                 const ECUtil::stripe_info_t& sinfo,
-                PGBackend::Listener* parent)
+                ECListener* parent)
       : cct(cct),
         ec_impl(std::move(ec_impl)),
         sinfo(sinfo),
index ac17f05035db059b33fc49ebc4eac3694e7f0685..274d1fb874bcd82636ae316a2252e108c434bf4b 100644 (file)
@@ -171,25 +171,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
      virtual void add_local_next_event(const pg_log_entry_t& e) = 0;
      virtual const std::map<pg_shard_t, pg_missing_t> &get_shard_missing()
        const = 0;
-     virtual const pg_missing_const_i * maybe_get_shard_missing(
-       pg_shard_t peer) const {
-       if (peer == primary_shard()) {
-        return &get_local_missing();
-       } else {
-        std::map<pg_shard_t, pg_missing_t>::const_iterator i =
-          get_shard_missing().find(peer);
-        if (i == get_shard_missing().end()) {
-          return nullptr;
-        } else {
-          return &(i->second);
-        }
-       }
-     }
-     virtual const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const {
-       auto m = maybe_get_shard_missing(peer);
-       ceph_assert(m);
-       return *m;
-     }
+     virtual const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const = 0;
 
      virtual const std::map<pg_shard_t, pg_info_t> &get_shard_info() const = 0;
      virtual const pg_info_t &get_shard_info(pg_shard_t peer) const {
@@ -305,6 +287,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
      virtual void pg_add_num_bytes(int64_t num_bytes) = 0;
      virtual void pg_sub_num_bytes(int64_t num_bytes) = 0;
      virtual bool maybe_preempt_replica_scrub(const hobject_t& oid) = 0;
+     virtual struct ECListener *get_eclistener() = 0;
      virtual ~Listener() {}
    };
    Listener *parent;
index 4ecbe58a1f3889d890bb031b5fa5d883e78fc66d..f3d03df1d36818d96790ee701657571681d33194 100644 (file)
@@ -15799,6 +15799,11 @@ bool PrimaryLogPG::maybe_preempt_replica_scrub(const hobject_t& oid)
   return m_scrubber->write_blocked_by_scrub(oid);
 }
 
+struct ECListener *PrimaryLogPG::get_eclistener()
+{
+  return this;
+}
+
 void intrusive_ptr_add_ref(PrimaryLogPG *pg) { pg->get("intptr"); }
 void intrusive_ptr_release(PrimaryLogPG *pg) { pg->put("intptr"); }
 
index 6ed29927463deb108eaa9ec8d6ef2b176710ebbe..420871b1990ee658d5bbf78591a67401646a4c8f 100644 (file)
@@ -29,6 +29,7 @@
 #include "common/sharedptr_registry.hpp"
 #include "common/shared_cache.hpp"
 #include "ReplicatedBackend.h"
+#include "ECBackend.h"
 #include "PGTransaction.h"
 #include "cls/cas/cls_cas_ops.h"
 
@@ -55,7 +56,9 @@ void put_with_id(PrimaryLogPG *pg, uint64_t id);
 
 struct inconsistent_snapset_wrapper;
 
-class PrimaryLogPG : public PG, public PGBackend::Listener {
+class PrimaryLogPG : public PG,
+                    public PGBackend::Listener,
+                    public ECListener {
   friend class OSD;
   friend class Watch;
   friend class PrimaryLogScrub;
@@ -391,11 +394,24 @@ public:
   const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
     return recovery_state.get_peer_missing();
   }
-  using PGBackend::Listener::get_shard_missing;
+  const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const override {
+    auto m = maybe_get_shard_missing(peer);
+    ceph_assert(m);
+    return *m;
+  }
   const std::map<pg_shard_t, pg_info_t> &get_shard_info() const override {
     return recovery_state.get_peer_info();
   }
-  using PGBackend::Listener::get_shard_info;
+  const pg_info_t &get_shard_info(pg_shard_t peer) const override {
+    if (peer == primary_shard()) {
+      return get_info();
+    } else {
+      std::map<pg_shard_t, pg_info_t>::const_iterator i =
+        get_shard_info().find(peer);
+      ceph_assert(i != get_shard_info().end());
+      return i->second;
+    }
+  }
   const pg_missing_tracker_t &get_local_missing() const override {
     return recovery_state.get_pg_log().get_missing();
   }
@@ -1892,6 +1908,21 @@ public:
   void on_shutdown() override;
   bool check_failsafe_full() override;
   bool maybe_preempt_replica_scrub(const hobject_t& oid) override;
+  struct ECListener *get_eclistener() override;
+  const pg_missing_const_i * maybe_get_shard_missing(
+    pg_shard_t peer) const {
+    if (peer == primary_shard()) {
+      return &get_local_missing();
+    } else {
+      std::map<pg_shard_t, pg_missing_t>::const_iterator i =
+        get_shard_missing().find(peer);
+      if (i == get_shard_missing().end()) {
+        return nullptr;
+      } else {
+        return &(i->second);
+      }
+    }
+  }
   int rep_repair_primary_object(const hobject_t& soid, OpContext *ctx);
 
   // attr cache handling