]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: move ReadPipeline from ECBackend to shareable ECCommon
authorRadosław Zarzyński <rzarzyns@redhat.com>
Tue, 19 Sep 2023 13:23:31 +0000 (15:23 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:27:12 +0000 (17:27 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h

index 6750a8a538d8080f493726e96ffa0bd99c48aa6b..4e8f0b4e019a4b67b92a72be5de5c57095c9e06d 100644 (file)
@@ -55,7 +55,7 @@ using ceph::Formatter;
 static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
   return pgb->get_parent()->gen_dbg_prefix(*_dout);
 }
-static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) {
+static ostream& _prefix(std::ostream *_dout, ECCommon::RMWPipeline *rmw_pipeline) {
   return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
 }
 static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) {
@@ -66,11 +66,11 @@ struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
   list<ECBackend::RecoveryOp> ops;
 };
 
-ostream &operator<<(ostream &lhs, const ECBackend::RMWPipeline::pipeline_state_t &rhs) {
+ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::pipeline_state_t &rhs) {
   switch (rhs.pipeline_state) {
-  case ECBackend::RMWPipeline::pipeline_state_t::CACHE_VALID:
+  case ECCommon::RMWPipeline::pipeline_state_t::CACHE_VALID:
     return lhs << "CACHE_VALID";
-  case ECBackend::RMWPipeline::pipeline_state_t::CACHE_INVALID:
+  case ECCommon::RMWPipeline::pipeline_state_t::CACHE_INVALID:
     return lhs << "CACHE_INVALID";
   default:
     ceph_abort_msg("invalid pipeline state");
@@ -161,7 +161,7 @@ void ECCommon::ReadOp::dump(Formatter *f) const
   f->dump_stream("in_progress") << in_progress;
 }
 
-ostream &operator<<(ostream &lhs, const ECBackend::RMWPipeline::Op &rhs)
+ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::Op &rhs)
 {
   lhs << "Op(" << rhs.hoid
       << " v=" << rhs.version
@@ -1506,7 +1506,7 @@ void ECCommon::ReadPipeline::on_change()
   in_progress_client_reads.clear();
 }
 
-void ECBackend::RMWPipeline::on_change()
+void ECCommon::RMWPipeline::on_change()
 {
   dout(10) << __func__ << dendl;
 
@@ -1556,7 +1556,7 @@ void ECBackend::dump_recovery_info(Formatter *f) const
   f->close_section();
 }
 
-struct ECClassicalOp : ECBackend::RMWPipeline::Op {
+struct ECClassicalOp : ECCommon::RMWPipeline::Op {
   PGTransactionUPtr t;
 
   void generate_transactions(
@@ -1652,7 +1652,7 @@ void ECBackend::submit_transaction(
   rmw_pipeline.start_rmw(std::move(op));
 }
 
-void ECBackend::RMWPipeline::call_write_ordered(std::function<void(void)> &&cb) {
+void ECCommon::RMWPipeline::call_write_ordered(std::function<void(void)> &&cb) {
   if (!waiting_state.empty()) {
     waiting_state.back().on_write.emplace_back(std::move(cb));
   } else if (!waiting_reads.empty()) {
@@ -1978,7 +1978,7 @@ ECUtil::HashInfoRef ECBackend::get_hash_info(
   return ref;
 }
 
-void ECBackend::RMWPipeline::start_rmw(OpRef op)
+void ECCommon::RMWPipeline::start_rmw(OpRef op)
 {
   ceph_assert(op);
   dout(10) << __func__ << ": " << *op << dendl;
@@ -1989,7 +1989,7 @@ void ECBackend::RMWPipeline::start_rmw(OpRef op)
   check_ops();
 }
 
-bool ECBackend::RMWPipeline::try_state_to_reads()
+bool ECCommon::RMWPipeline::try_state_to_reads()
 {
   if (waiting_state.empty())
     return false;
@@ -2063,7 +2063,7 @@ bool ECBackend::RMWPipeline::try_state_to_reads()
   return true;
 }
 
-bool ECBackend::RMWPipeline::try_reads_to_commit()
+bool ECCommon::RMWPipeline::try_reads_to_commit()
 {
   if (waiting_reads.empty())
     return false;
@@ -2221,7 +2221,7 @@ bool ECBackend::RMWPipeline::try_reads_to_commit()
   return true;
 }
 
-bool ECBackend::RMWPipeline::try_finish_rmw()
+bool ECCommon::RMWPipeline::try_finish_rmw()
 {
   if (waiting_commit.empty())
     return false;
@@ -2270,7 +2270,7 @@ bool ECBackend::RMWPipeline::try_finish_rmw()
   return true;
 }
 
-void ECBackend::RMWPipeline::check_ops()
+void ECCommon::RMWPipeline::check_ops()
 {
   while (try_state_to_reads() ||
         try_reads_to_commit() ||
index a2769e6cb6dc9191b5d241c87aa3acd455261ad4..5fbe8ccbfc676cb3b4ce0bb90b5ef705389aa1b4 100644 (file)
@@ -341,6 +341,205 @@ struct ECCommon {
       std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read   ///< [out] shards, corresponding subchunks to read
       ); ///< @return error code, 0 on success
   };
+
+  /**
+   * Client writes
+   *
+   * ECTransaction is responsible for generating a transaction for
+   * each shard to which we need to send the write.  As required
+   * by the PGBackend interface, the ECBackend write mechanism
+   * passes trim information with the write and last_complete back
+   * with the reply.
+   *
+   * As with client reads, there is a possibility of out-of-order
+   * completions. Thus, callbacks and completion are called in order
+   * on the writing std::list.
+   */
+
+  struct RMWPipeline {
+    struct Op : boost::intrusive::list_base_hook<> {
+      /// From submit_transaction caller, describes operation
+      hobject_t hoid;
+      object_stat_sum_t delta_stats;
+      eversion_t version;
+      eversion_t trim_to;
+      std::optional<pg_hit_set_history_t> updated_hit_set_history;
+      std::vector<pg_log_entry_t> log_entries;
+      ceph_tid_t tid;
+      osd_reqid_t reqid;
+      ZTracer::Trace trace;
+
+      eversion_t roll_forward_to; /// Soon to be generated internally
+
+      /// Ancillary also provided from submit_transaction caller
+      std::map<hobject_t, ObjectContextRef> obc_map;
+
+      /// see call_write_ordered
+      std::list<std::function<void(void)> > on_write;
+
+      /// Generated internally
+      std::set<hobject_t> temp_added;
+      std::set<hobject_t> temp_cleared;
+
+      ECTransaction::WritePlan plan;
+      bool requires_rmw() const { return !plan.to_read.empty(); }
+      bool invalidates_cache() const { return plan.invalidates_cache; }
+
+      // must be true if requires_rmw(), must be false if invalidates_cache()
+      bool using_cache = true;
+
+      /// In progress read state;
+      std::map<hobject_t,extent_set> pending_read; // subset already being read
+      std::map<hobject_t,extent_set> remote_read;  // subset we must read
+      std::map<hobject_t,extent_map> remote_read_result;
+      bool read_in_progress() const {
+        return !remote_read.empty() && remote_read_result.empty();
+      }
+
+      /// In progress write state.
+      std::set<pg_shard_t> pending_commit;
+      // we need pending_apply for pre-mimic peers so that we don't issue a
+      // read on a remote shard before it has applied a previous write.  We can
+      // remove this after nautilus.
+      std::set<pg_shard_t> pending_apply;
+      bool write_in_progress() const {
+        return !pending_commit.empty() || !pending_apply.empty();
+      }
+
+      /// optional, may be null, for tracking purposes
+      OpRequestRef client_op;
+
+      /// pin for cache
+      ExtentCache::write_pin pin;
+
+      /// Callbacks
+      Context *on_all_commit = nullptr;
+      virtual ~Op() {
+        delete on_all_commit;
+      }
+
+      virtual void generate_transactions(
+        ceph::ErasureCodeInterfaceRef &ecimpl,
+        pg_t pgid,
+        const ECUtil::stripe_info_t &sinfo,
+        std::map<hobject_t,extent_map> *written,
+        std::map<shard_id_t, ObjectStore::Transaction> *transactions,
+        DoutPrefixProvider *dpp,
+        const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
+    };
+    using OpRef = std::unique_ptr<Op>;
+    using op_list = boost::intrusive::list<Op>;
+    friend ostream &operator<<(ostream &lhs, const Op &rhs);
+
+    ExtentCache cache;
+    std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
+    /**
+     * We model the possible rmw states as a std::set of waitlists.
+     * All writes at this time complete in order, so a write blocked
+     * at waiting_state blocks all writes behind it as well (same for
+     * other states).
+     *
+     * Future work: We can break this up into a per-object pipeline
+     * (almost).  First, provide an ordering token to submit_transaction
+     * and require that all operations within a single transaction take
+     * place on a subset of hobject_t space partitioned by that token
+     * (the hashid seem about right to me -- even works for temp objects
+     * if you recall that a temp object created for object head foo will
+     * only ever be referenced by other transactions on foo and aren't
+     * reused).  Next, factor this part into a class and maintain one per
+     * ordering token.  Next, fixup PrimaryLogPG's repop queue to be
+     * partitioned by ordering token.  Finally, refactor the op pipeline
+     * so that the log entries passed into submit_transaction aren't
+     * versioned.  We can't assign versions to them until we actually
+     * submit the operation.  That's probably going to be the hard part.
+     */
+    class pipeline_state_t {
+      enum {
+        CACHE_VALID = 0,
+        CACHE_INVALID = 1
+      } pipeline_state = CACHE_VALID;
+    public:
+      bool caching_enabled() const {
+        return pipeline_state == CACHE_VALID;
+      }
+      bool cache_invalid() const {
+        return !caching_enabled();
+      }
+      void invalidate() {
+        pipeline_state = CACHE_INVALID;
+      }
+      void clear() {
+        pipeline_state = CACHE_VALID;
+      }
+      friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
+    } pipeline_state;
+
+    op_list waiting_state;        /// writes waiting on pipe_state
+    op_list waiting_reads;        /// writes waiting on partial stripe reads
+    op_list waiting_commit;       /// writes waiting on initial commit
+    eversion_t completed_to;
+    eversion_t committed_to;
+    void start_rmw(OpRef op);
+    bool try_state_to_reads();
+    bool try_reads_to_commit();
+    bool try_finish_rmw();
+    void check_ops();
+
+    void on_change();
+    void call_write_ordered(std::function<void(void)> &&cb);
+
+    CephContext* cct;
+    ECListener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+    template <typename Func>
+    void objects_read_async_no_cache(
+      const std::map<hobject_t,extent_set> &to_read,
+      Func &&on_complete
+    ) {
+      std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
+      for (auto &&hpair: to_read) {
+        auto &l = _to_read[hpair.first];
+        for (auto extent: hpair.second) {
+          l.emplace_back(extent.first, extent.second, 0);
+        }
+      }
+      ec_backend.objects_read_and_reconstruct(
+        _to_read,
+        false,
+        make_gen_lambda_context<
+        std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
+            std::forward<Func>(on_complete)));
+    }
+    void handle_sub_write(
+      pg_shard_t from,
+      OpRequestRef msg,
+      ECSubWrite &op,
+      const ZTracer::Trace &trace
+    ) {
+      ec_backend.handle_sub_write(from, std::move(msg), op, trace);
+    }
+    // end of iface
+
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    ECListener* parent;
+    ECCommon& ec_backend;
+
+    RMWPipeline(CephContext* cct,
+                ceph::ErasureCodeInterfaceRef ec_impl,
+                const ECUtil::stripe_info_t& sinfo,
+                ECListener* parent,
+                ECCommon& ec_backend)
+      : cct(cct),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        parent(parent),
+        ec_backend(ec_backend) {
+    }
+  };
 };
 
 class ECBackend : public PGBackend, public ECCommon {
@@ -567,206 +766,7 @@ private:
 
 public:
   struct ReadPipeline read_pipeline;
-
-
-  /**
-   * Client writes
-   *
-   * ECTransaction is responsible for generating a transaction for
-   * each shard to which we need to send the write.  As required
-   * by the PGBackend interface, the ECBackend write mechanism
-   * passes trim information with the write and last_complete back
-   * with the reply.
-   *
-   * As with client reads, there is a possibility of out-of-order
-   * completions. Thus, callbacks and completion are called in order
-   * on the writing std::list.
-   */
-
-  struct RMWPipeline {
-    struct Op : boost::intrusive::list_base_hook<> {
-      /// From submit_transaction caller, describes operation
-      hobject_t hoid;
-      object_stat_sum_t delta_stats;
-      eversion_t version;
-      eversion_t trim_to;
-      std::optional<pg_hit_set_history_t> updated_hit_set_history;
-      std::vector<pg_log_entry_t> log_entries;
-      ceph_tid_t tid;
-      osd_reqid_t reqid;
-      ZTracer::Trace trace;
-
-      eversion_t roll_forward_to; /// Soon to be generated internally
-
-      /// Ancillary also provided from submit_transaction caller
-      std::map<hobject_t, ObjectContextRef> obc_map;
-
-      /// see call_write_ordered
-      std::list<std::function<void(void)> > on_write;
-
-      /// Generated internally
-      std::set<hobject_t> temp_added;
-      std::set<hobject_t> temp_cleared;
-
-      ECTransaction::WritePlan plan;
-      bool requires_rmw() const { return !plan.to_read.empty(); }
-      bool invalidates_cache() const { return plan.invalidates_cache; }
-
-      // must be true if requires_rmw(), must be false if invalidates_cache()
-      bool using_cache = true;
-
-      /// In progress read state;
-      std::map<hobject_t,extent_set> pending_read; // subset already being read
-      std::map<hobject_t,extent_set> remote_read;  // subset we must read
-      std::map<hobject_t,extent_map> remote_read_result;
-      bool read_in_progress() const {
-        return !remote_read.empty() && remote_read_result.empty();
-      }
-
-      /// In progress write state.
-      std::set<pg_shard_t> pending_commit;
-      // we need pending_apply for pre-mimic peers so that we don't issue a
-      // read on a remote shard before it has applied a previous write.  We can
-      // remove this after nautilus.
-      std::set<pg_shard_t> pending_apply;
-      bool write_in_progress() const {
-        return !pending_commit.empty() || !pending_apply.empty();
-      }
-
-      /// optional, may be null, for tracking purposes
-      OpRequestRef client_op;
-
-      /// pin for cache
-      ExtentCache::write_pin pin;
-
-      /// Callbacks
-      Context *on_all_commit = nullptr;
-      virtual ~Op() {
-        delete on_all_commit;
-      }
-
-      virtual void generate_transactions(
-        ceph::ErasureCodeInterfaceRef &ecimpl,
-        pg_t pgid,
-        const ECUtil::stripe_info_t &sinfo,
-        std::map<hobject_t,extent_map> *written,
-        std::map<shard_id_t, ObjectStore::Transaction> *transactions,
-        DoutPrefixProvider *dpp,
-        const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
-    };
-    using OpRef = std::unique_ptr<Op>;
-    using op_list = boost::intrusive::list<Op>;
-    friend ostream &operator<<(ostream &lhs, const Op &rhs);
-
-    ExtentCache cache;
-    std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
-    /**
-     * We model the possible rmw states as a std::set of waitlists.
-     * All writes at this time complete in order, so a write blocked
-     * at waiting_state blocks all writes behind it as well (same for
-     * other states).
-     *
-     * Future work: We can break this up into a per-object pipeline
-     * (almost).  First, provide an ordering token to submit_transaction
-     * and require that all operations within a single transaction take
-     * place on a subset of hobject_t space partitioned by that token
-     * (the hashid seem about right to me -- even works for temp objects
-     * if you recall that a temp object created for object head foo will
-     * only ever be referenced by other transactions on foo and aren't
-     * reused).  Next, factor this part into a class and maintain one per
-     * ordering token.  Next, fixup PrimaryLogPG's repop queue to be
-     * partitioned by ordering token.  Finally, refactor the op pipeline
-     * so that the log entries passed into submit_transaction aren't
-     * versioned.  We can't assign versions to them until we actually
-     * submit the operation.  That's probably going to be the hard part.
-     */
-    class pipeline_state_t {
-      enum {
-        CACHE_VALID = 0,
-        CACHE_INVALID = 1
-      } pipeline_state = CACHE_VALID;
-    public:
-      bool caching_enabled() const {
-        return pipeline_state == CACHE_VALID;
-      }
-      bool cache_invalid() const {
-        return !caching_enabled();
-      }
-      void invalidate() {
-        pipeline_state = CACHE_INVALID;
-      }
-      void clear() {
-        pipeline_state = CACHE_VALID;
-      }
-      friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
-    } pipeline_state;
-
-    op_list waiting_state;        /// writes waiting on pipe_state
-    op_list waiting_reads;        /// writes waiting on partial stripe reads
-    op_list waiting_commit;       /// writes waiting on initial commit
-    eversion_t completed_to;
-    eversion_t committed_to;
-    void start_rmw(OpRef op);
-    bool try_state_to_reads();
-    bool try_reads_to_commit();
-    bool try_finish_rmw();
-    void check_ops();
-
-    void on_change();
-    void call_write_ordered(std::function<void(void)> &&cb);
-
-    CephContext* cct;
-    ECListener *get_parent() const { return parent; }
-    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
-    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
-    const pg_info_t &get_info() { return get_parent()->get_info(); }
-
-    template <typename Func>
-    void objects_read_async_no_cache(
-      const std::map<hobject_t,extent_set> &to_read,
-      Func &&on_complete
-    ) {
-      std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
-      for (auto &&hpair: to_read) {
-        auto &l = _to_read[hpair.first];
-        for (auto extent: hpair.second) {
-          l.emplace_back(extent.first, extent.second, 0);
-        }
-      }
-      ec_backend.objects_read_and_reconstruct(
-        _to_read,
-        false,
-        make_gen_lambda_context<
-        std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
-            std::forward<Func>(on_complete)));
-    }
-    void handle_sub_write(
-      pg_shard_t from,
-      OpRequestRef msg,
-      ECSubWrite &op,
-      const ZTracer::Trace &trace
-    ) {
-      ec_backend.handle_sub_write(from, std::move(msg), op, trace);
-    }
-    // end of iface
-
-    ceph::ErasureCodeInterfaceRef ec_impl;
-    const ECUtil::stripe_info_t& sinfo;
-    ECListener* parent;
-    ECCommon& ec_backend;
-
-    RMWPipeline(CephContext* cct,
-                ceph::ErasureCodeInterfaceRef ec_impl,
-                const ECUtil::stripe_info_t& sinfo,
-                ECListener* parent,
-                ECCommon& ec_backend)
-      : cct(cct),
-        ec_impl(std::move(ec_impl)),
-        sinfo(sinfo),
-        parent(parent),
-        ec_backend(ec_backend) {
-    }
-  } rmw_pipeline;
+  struct RMWPipeline rmw_pipeline;
 
   ceph::ErasureCodeInterfaceRef ec_impl;