]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: dissect ECCommon and ECListener into separated header
authorRadosław Zarzyński <rzarzyns@redhat.com>
Tue, 19 Sep 2023 14:36:50 +0000 (16:36 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:30:07 +0000 (17:30 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.h
src/osd/ECCommon.h [new file with mode: 0644]

index 92a8c0b2091f37bcbbae9873fda75bd26d01badf..df8cc6dc54aa1bc839da1814a3e2ccdadb5f64ac 100644 (file)
@@ -18,6 +18,7 @@
 #include <boost/intrusive/set.hpp>
 #include <boost/intrusive/list.hpp>
 
+#include "ECCommon.h"
 #include "OSD.h"
 #include "PGBackend.h"
 #include "erasure-code/ErasureCodeInterface.h"
@@ -33,519 +34,6 @@ struct ECSubReadReply;
 
 struct RecoveryMessages;
 
-// ECListener -- an interface decoupling the pipelines from
-// particular implementation of ECBackend (crimson vs cassical).
-// https://stackoverflow.com/q/7872958
-struct ECListener {
-  virtual ~ECListener() = default;
-  virtual const OSDMapRef& pgb_get_osdmap() const = 0;
-  virtual epoch_t pgb_get_osdmap_epoch() const = 0;
-  virtual const pg_info_t &get_info() const = 0;
-  /**
-   * Called when a pull on soid cannot be completed due to
-   * down peers
-   */
-  // XXX
-  virtual void cancel_pull(
-    const hobject_t &soid) = 0;
-  // XXX
-  virtual void schedule_recovery_work(
-    GenContext<ThreadPool::TPHandle&> *c,
-    uint64_t cost) = 0;
-
-  virtual epoch_t get_interval_start_epoch() const = 0;
-  virtual const std::set<pg_shard_t> &get_acting_shards() const = 0;
-  virtual const std::set<pg_shard_t> &get_backfill_shards() const = 0;
-  virtual const std::map<hobject_t, std::set<pg_shard_t>> &get_missing_loc_shards()
-    const = 0;
-
-  virtual const std::map<pg_shard_t,
-                        pg_missing_t> &get_shard_missing() const = 0;
-  virtual const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const = 0;
-#if 1
-  virtual const pg_missing_const_i * maybe_get_shard_missing(
-    pg_shard_t peer) const = 0;
-  virtual const pg_info_t &get_shard_info(pg_shard_t peer) const = 0;
-#endif
-  virtual ceph_tid_t get_tid() = 0;
-  virtual pg_shard_t whoami_shard() const = 0;
-#if 0
-  int whoami() const {
-    return whoami_shard().osd;
-  }
-  spg_t whoami_spg_t() const {
-    return get_info().pgid;
-  }
-#endif
-  // XXX
-  virtual void send_message_osd_cluster(
-    std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) = 0;
-
-  virtual std::ostream& gen_dbg_prefix(std::ostream& out) const = 0;
-
-  // RMWPipeline
-  virtual const pg_pool_t &get_pool() const = 0;
-  virtual const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const = 0;
-  // XXX
-  virtual bool should_send_op(
-    pg_shard_t peer,
-    const hobject_t &hoid) = 0;
-  virtual const std::map<pg_shard_t, pg_info_t> &get_shard_info() const = 0;
-  virtual spg_t primary_spg_t() const = 0;
-  virtual const PGLog &get_log() const = 0;
-  virtual DoutPrefixProvider *get_dpp() = 0;
-  // XXX
-  virtual void apply_stats(
-     const hobject_t &soid,
-     const object_stat_sum_t &delta_stats) = 0;
-};
-
-struct ECCommon {
-  virtual ~ECCommon() = default;
-
-  virtual void handle_sub_write(
-    pg_shard_t from,
-    OpRequestRef msg,
-    ECSubWrite &op,
-    const ZTracer::Trace &trace
-    ) = 0;
-
-  virtual void objects_read_and_reconstruct(
-    const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
-    > &reads,
-    bool fast_read,
-    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func) = 0;
-
-  struct read_request_t {
-    const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
-    std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
-    bool want_attrs;
-    read_request_t(
-      const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
-      const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
-      bool want_attrs)
-      : to_read(to_read), need(need), want_attrs(want_attrs) {}
-  };
-  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
-  struct ReadOp;
-  /**
-   * Low level async read mechanism
-   *
-   * To avoid duplicating the logic for requesting and waiting for
-   * multiple object shards, there is a common async read mechanism
-   * taking a std::map of hobject_t->read_request_t which defines callbacks
-   * taking read_result_ts as arguments.
-   *
-   * tid_to_read_map gives open read ops.  check_recovery_sources uses
-   * shard_to_read_map and ReadOp::source_to_obj to restart reads
-   * involving down osds.
-   *
-   * The user is responsible for specifying replicas on which to read
-   * and for reassembling the buffer on the other side since client
-   * reads require the original object buffer while recovery only needs
-   * the missing pieces.
-   *
-   * Rather than handling reads on the primary directly, we simply send
-   * ourselves a message.  This avoids a dedicated primary path for that
-   * part.
-   */
-  struct read_result_t {
-    int r;
-    std::map<pg_shard_t, int> errors;
-    std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
-    std::list<
-      boost::tuple<
-       uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
-    read_result_t() : r(0) {}
-  };
-
-  struct ReadCompleter {
-    virtual void finish_single_request(
-      const hobject_t &hoid,
-      read_result_t &res,
-      std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
-
-    virtual void finish(int priority) && = 0;
-
-    virtual ~ReadCompleter() = default;
-  };
-
-  friend struct CallClientContexts;
-  struct ClientAsyncReadStatus {
-    unsigned objects_to_read;
-    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
-    std::map<hobject_t,std::pair<int, extent_map> > results;
-    explicit ClientAsyncReadStatus(
-      unsigned objects_to_read,
-      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
-      : objects_to_read(objects_to_read), func(std::move(func)) {}
-    void complete_object(
-      const hobject_t &hoid,
-      int err,
-      extent_map &&buffers) {
-      ceph_assert(objects_to_read);
-      --objects_to_read;
-      ceph_assert(!results.count(hoid));
-      results.emplace(hoid, std::make_pair(err, std::move(buffers)));
-    }
-    bool is_complete() const {
-      return objects_to_read == 0;
-    }
-    void run() {
-      func.release()->complete(std::move(results));
-    }
-  };
-
-  struct ReadOp {
-    int priority;
-    ceph_tid_t tid;
-    OpRequestRef op; // may be null if not on behalf of a client
-    // True if redundant reads are issued, false otherwise,
-    // this is useful to tradeoff some resources (redundant ops) for
-    // low latency read, especially on relatively idle cluster
-    bool do_redundant_reads;
-    // True if reading for recovery which could possibly reading only a subset
-    // of the available shards.
-    bool for_recovery;
-    std::unique_ptr<ReadCompleter> on_complete;
-
-    ZTracer::Trace trace;
-
-    std::map<hobject_t, std::set<int>> want_to_read;
-    std::map<hobject_t, read_request_t> to_read;
-    std::map<hobject_t, read_result_t> complete;
-
-    std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
-    std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
-
-    void dump(ceph::Formatter *f) const;
-
-    std::set<pg_shard_t> in_progress;
-
-    ReadOp(
-      int priority,
-      ceph_tid_t tid,
-      bool do_redundant_reads,
-      bool for_recovery,
-      std::unique_ptr<ReadCompleter> _on_complete,
-      OpRequestRef op,
-      std::map<hobject_t, std::set<int>> &&_want_to_read,
-      std::map<hobject_t, read_request_t> &&_to_read)
-      : priority(priority),
-        tid(tid),
-        op(op),
-        do_redundant_reads(do_redundant_reads),
-        for_recovery(for_recovery),
-        on_complete(std::move(_on_complete)),
-        want_to_read(std::move(_want_to_read)),
-       to_read(std::move(_to_read)) {
-      for (auto &&hpair: to_read) {
-       auto &returned = complete[hpair.first].returned;
-       for (auto &&extent: hpair.second.to_read) {
-         returned.push_back(
-           boost::make_tuple(
-             extent.get<0>(),
-             extent.get<1>(),
-             std::map<pg_shard_t, ceph::buffer::list>()));
-       }
-      }
-    }
-    ReadOp() = delete;
-    ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
-    ReadOp(ReadOp &&) = default;
-  };
-  struct ReadPipeline {
-    void objects_read_and_reconstruct(
-      const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
-      > &reads,
-      bool fast_read,
-      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
-
-    template <class F>
-    void filter_read_op(
-      const OSDMapRef& osdmap,
-      ReadOp &op,
-      F&& on_erase);
-
-    template <class F>
-    void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
-
-    void complete_read_op(ReadOp &rop);
-
-    void start_read_op(
-      int priority,
-      std::map<hobject_t, std::set<int>> &want_to_read,
-      std::map<hobject_t, read_request_t> &to_read,
-      OpRequestRef op,
-      bool do_redundant_reads,
-      bool for_recovery,
-      std::unique_ptr<ReadCompleter> on_complete);
-
-    void do_read_op(ReadOp &rop);
-
-    int send_all_remaining_reads(
-      const hobject_t &hoid,
-      ReadOp &rop);
-
-    void on_change();
-
-    void kick_reads();
-
-    std::map<ceph_tid_t, ReadOp> tid_to_read_map;
-    std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
-    std::list<ClientAsyncReadStatus> in_progress_client_reads;
-
-    CephContext* cct;
-    ceph::ErasureCodeInterfaceRef ec_impl;
-    const ECUtil::stripe_info_t& sinfo;
-    // TODO: lay an interface down here
-    ECListener* parent;
-
-    ECListener *get_parent() const { return parent; }
-    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
-    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
-    const pg_info_t &get_info() { return get_parent()->get_info(); }
-
-    ReadPipeline(CephContext* cct,
-                ceph::ErasureCodeInterfaceRef ec_impl,
-                const ECUtil::stripe_info_t& sinfo,
-                ECListener* parent)
-      : cct(cct),
-        ec_impl(std::move(ec_impl)),
-        sinfo(sinfo),
-        parent(parent) {
-    }
-
-    int get_remaining_shards(
-      const hobject_t &hoid,
-      const std::set<int> &avail,
-      const std::set<int> &want,
-      const read_result_t &result,
-      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
-      bool for_recovery);
-
-    void get_all_avail_shards(
-      const hobject_t &hoid,
-      const std::set<pg_shard_t> &error_shards,
-      std::set<int> &have,
-      std::map<shard_id_t, pg_shard_t> &shards,
-      bool for_recovery);
-
-    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
-    friend struct FinishReadOp;
-
-    void get_want_to_read_shards(std::set<int> *want_to_read) const;
-
-    /// Returns to_read replicas sufficient to reconstruct want
-    int get_min_avail_to_read_shards(
-      const hobject_t &hoid,     ///< [in] object
-      const std::set<int> &want,      ///< [in] desired shards
-      bool for_recovery,         ///< [in] true if we may use non-acting replicas
-      bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
-      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read   ///< [out] shards, corresponding subchunks to read
-      ); ///< @return error code, 0 on success
-  };
-
-  /**
-   * Client writes
-   *
-   * ECTransaction is responsible for generating a transaction for
-   * each shard to which we need to send the write.  As required
-   * by the PGBackend interface, the ECBackend write mechanism
-   * passes trim information with the write and last_complete back
-   * with the reply.
-   *
-   * As with client reads, there is a possibility of out-of-order
-   * completions. Thus, callbacks and completion are called in order
-   * on the writing std::list.
-   */
-
-  struct RMWPipeline {
-    struct Op : boost::intrusive::list_base_hook<> {
-      /// From submit_transaction caller, describes operation
-      hobject_t hoid;
-      object_stat_sum_t delta_stats;
-      eversion_t version;
-      eversion_t trim_to;
-      std::optional<pg_hit_set_history_t> updated_hit_set_history;
-      std::vector<pg_log_entry_t> log_entries;
-      ceph_tid_t tid;
-      osd_reqid_t reqid;
-      ZTracer::Trace trace;
-
-      eversion_t roll_forward_to; /// Soon to be generated internally
-
-      /// Ancillary also provided from submit_transaction caller
-      std::map<hobject_t, ObjectContextRef> obc_map;
-
-      /// see call_write_ordered
-      std::list<std::function<void(void)> > on_write;
-
-      /// Generated internally
-      std::set<hobject_t> temp_added;
-      std::set<hobject_t> temp_cleared;
-
-      ECTransaction::WritePlan plan;
-      bool requires_rmw() const { return !plan.to_read.empty(); }
-      bool invalidates_cache() const { return plan.invalidates_cache; }
-
-      // must be true if requires_rmw(), must be false if invalidates_cache()
-      bool using_cache = true;
-
-      /// In progress read state;
-      std::map<hobject_t,extent_set> pending_read; // subset already being read
-      std::map<hobject_t,extent_set> remote_read;  // subset we must read
-      std::map<hobject_t,extent_map> remote_read_result;
-      bool read_in_progress() const {
-        return !remote_read.empty() && remote_read_result.empty();
-      }
-
-      /// In progress write state.
-      std::set<pg_shard_t> pending_commit;
-      // we need pending_apply for pre-mimic peers so that we don't issue a
-      // read on a remote shard before it has applied a previous write.  We can
-      // remove this after nautilus.
-      std::set<pg_shard_t> pending_apply;
-      bool write_in_progress() const {
-        return !pending_commit.empty() || !pending_apply.empty();
-      }
-
-      /// optional, may be null, for tracking purposes
-      OpRequestRef client_op;
-
-      /// pin for cache
-      ExtentCache::write_pin pin;
-
-      /// Callbacks
-      Context *on_all_commit = nullptr;
-      virtual ~Op() {
-        delete on_all_commit;
-      }
-
-      virtual void generate_transactions(
-        ceph::ErasureCodeInterfaceRef &ecimpl,
-        pg_t pgid,
-        const ECUtil::stripe_info_t &sinfo,
-        std::map<hobject_t,extent_map> *written,
-        std::map<shard_id_t, ObjectStore::Transaction> *transactions,
-        DoutPrefixProvider *dpp,
-        const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
-    };
-    using OpRef = std::unique_ptr<Op>;
-    using op_list = boost::intrusive::list<Op>;
-    friend ostream &operator<<(ostream &lhs, const Op &rhs);
-
-    ExtentCache cache;
-    std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
-    /**
-     * We model the possible rmw states as a std::set of waitlists.
-     * All writes at this time complete in order, so a write blocked
-     * at waiting_state blocks all writes behind it as well (same for
-     * other states).
-     *
-     * Future work: We can break this up into a per-object pipeline
-     * (almost).  First, provide an ordering token to submit_transaction
-     * and require that all operations within a single transaction take
-     * place on a subset of hobject_t space partitioned by that token
-     * (the hashid seem about right to me -- even works for temp objects
-     * if you recall that a temp object created for object head foo will
-     * only ever be referenced by other transactions on foo and aren't
-     * reused).  Next, factor this part into a class and maintain one per
-     * ordering token.  Next, fixup PrimaryLogPG's repop queue to be
-     * partitioned by ordering token.  Finally, refactor the op pipeline
-     * so that the log entries passed into submit_transaction aren't
-     * versioned.  We can't assign versions to them until we actually
-     * submit the operation.  That's probably going to be the hard part.
-     */
-    class pipeline_state_t {
-      enum {
-        CACHE_VALID = 0,
-        CACHE_INVALID = 1
-      } pipeline_state = CACHE_VALID;
-    public:
-      bool caching_enabled() const {
-        return pipeline_state == CACHE_VALID;
-      }
-      bool cache_invalid() const {
-        return !caching_enabled();
-      }
-      void invalidate() {
-        pipeline_state = CACHE_INVALID;
-      }
-      void clear() {
-        pipeline_state = CACHE_VALID;
-      }
-      friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
-    } pipeline_state;
-
-    op_list waiting_state;        /// writes waiting on pipe_state
-    op_list waiting_reads;        /// writes waiting on partial stripe reads
-    op_list waiting_commit;       /// writes waiting on initial commit
-    eversion_t completed_to;
-    eversion_t committed_to;
-    void start_rmw(OpRef op);
-    bool try_state_to_reads();
-    bool try_reads_to_commit();
-    bool try_finish_rmw();
-    void check_ops();
-
-    void on_change();
-    void call_write_ordered(std::function<void(void)> &&cb);
-
-    CephContext* cct;
-    ECListener *get_parent() const { return parent; }
-    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
-    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
-    const pg_info_t &get_info() { return get_parent()->get_info(); }
-
-    template <typename Func>
-    void objects_read_async_no_cache(
-      const std::map<hobject_t,extent_set> &to_read,
-      Func &&on_complete
-    ) {
-      std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
-      for (auto &&hpair: to_read) {
-        auto &l = _to_read[hpair.first];
-        for (auto extent: hpair.second) {
-          l.emplace_back(extent.first, extent.second, 0);
-        }
-      }
-      ec_backend.objects_read_and_reconstruct(
-        _to_read,
-        false,
-        make_gen_lambda_context<
-        std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
-            std::forward<Func>(on_complete)));
-    }
-    void handle_sub_write(
-      pg_shard_t from,
-      OpRequestRef msg,
-      ECSubWrite &op,
-      const ZTracer::Trace &trace
-    ) {
-      ec_backend.handle_sub_write(from, std::move(msg), op, trace);
-    }
-    // end of iface
-
-    ceph::ErasureCodeInterfaceRef ec_impl;
-    const ECUtil::stripe_info_t& sinfo;
-    ECListener* parent;
-    ECCommon& ec_backend;
-
-    RMWPipeline(CephContext* cct,
-                ceph::ErasureCodeInterfaceRef ec_impl,
-                const ECUtil::stripe_info_t& sinfo,
-                ECListener* parent,
-                ECCommon& ec_backend)
-      : cct(cct),
-        ec_impl(std::move(ec_impl)),
-        sinfo(sinfo),
-        parent(parent),
-        ec_backend(ec_backend) {
-    }
-  };
-};
-
 class ECBackend : public PGBackend, public ECCommon {
 public:
   RecoveryHandle *open_recovery_op() override;
diff --git a/src/osd/ECCommon.h b/src/osd/ECCommon.h
new file mode 100644 (file)
index 0000000..bdbc059
--- /dev/null
@@ -0,0 +1,545 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/intrusive/set.hpp>
+#include <boost/intrusive/list.hpp>
+
+#include "OSD.h"
+#include "PGBackend.h"
+#include "erasure-code/ErasureCodeInterface.h"
+#include "ECUtil.h"
+#include "ECTransaction.h"
+#include "ExtentCache.h"
+
+//forward declaration
+struct ECSubWrite;
+struct ECSubWriteReply;
+struct ECSubRead;
+struct ECSubReadReply;
+
+// ECListener -- an interface decoupling the pipelines from
+// particular implementation of ECBackend (crimson vs cassical).
+// https://stackoverflow.com/q/7872958
+struct ECListener {
+  virtual ~ECListener() = default;
+  virtual const OSDMapRef& pgb_get_osdmap() const = 0;
+  virtual epoch_t pgb_get_osdmap_epoch() const = 0;
+  virtual const pg_info_t &get_info() const = 0;
+  /**
+   * Called when a pull on soid cannot be completed due to
+   * down peers
+   */
+  // XXX
+  virtual void cancel_pull(
+    const hobject_t &soid) = 0;
+  // XXX
+  virtual void schedule_recovery_work(
+    GenContext<ThreadPool::TPHandle&> *c,
+    uint64_t cost) = 0;
+
+  virtual epoch_t get_interval_start_epoch() const = 0;
+  virtual const std::set<pg_shard_t> &get_acting_shards() const = 0;
+  virtual const std::set<pg_shard_t> &get_backfill_shards() const = 0;
+  virtual const std::map<hobject_t, std::set<pg_shard_t>> &get_missing_loc_shards()
+    const = 0;
+
+  virtual const std::map<pg_shard_t,
+                        pg_missing_t> &get_shard_missing() const = 0;
+  virtual const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const = 0;
+#if 1
+  virtual const pg_missing_const_i * maybe_get_shard_missing(
+    pg_shard_t peer) const = 0;
+  virtual const pg_info_t &get_shard_info(pg_shard_t peer) const = 0;
+#endif
+  virtual ceph_tid_t get_tid() = 0;
+  virtual pg_shard_t whoami_shard() const = 0;
+#if 0
+  int whoami() const {
+    return whoami_shard().osd;
+  }
+  spg_t whoami_spg_t() const {
+    return get_info().pgid;
+  }
+#endif
+  // XXX
+  virtual void send_message_osd_cluster(
+    std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) = 0;
+
+  virtual std::ostream& gen_dbg_prefix(std::ostream& out) const = 0;
+
+  // RMWPipeline
+  virtual const pg_pool_t &get_pool() const = 0;
+  virtual const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const = 0;
+  // XXX
+  virtual bool should_send_op(
+    pg_shard_t peer,
+    const hobject_t &hoid) = 0;
+  virtual const std::map<pg_shard_t, pg_info_t> &get_shard_info() const = 0;
+  virtual spg_t primary_spg_t() const = 0;
+  virtual const PGLog &get_log() const = 0;
+  virtual DoutPrefixProvider *get_dpp() = 0;
+  // XXX
+  virtual void apply_stats(
+     const hobject_t &soid,
+     const object_stat_sum_t &delta_stats) = 0;
+};
+
+struct ECCommon {
+  virtual ~ECCommon() = default;
+
+  virtual void handle_sub_write(
+    pg_shard_t from,
+    OpRequestRef msg,
+    ECSubWrite &op,
+    const ZTracer::Trace &trace
+    ) = 0;
+
+  virtual void objects_read_and_reconstruct(
+    const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+    > &reads,
+    bool fast_read,
+    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func) = 0;
+
+  struct read_request_t {
+    const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+    std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
+    bool want_attrs;
+    read_request_t(
+      const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
+      const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
+      bool want_attrs)
+      : to_read(to_read), need(need), want_attrs(want_attrs) {}
+  };
+  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
+  struct ReadOp;
+  /**
+   * Low level async read mechanism
+   *
+   * To avoid duplicating the logic for requesting and waiting for
+   * multiple object shards, there is a common async read mechanism
+   * taking a std::map of hobject_t->read_request_t which defines callbacks
+   * taking read_result_ts as arguments.
+   *
+   * tid_to_read_map gives open read ops.  check_recovery_sources uses
+   * shard_to_read_map and ReadOp::source_to_obj to restart reads
+   * involving down osds.
+   *
+   * The user is responsible for specifying replicas on which to read
+   * and for reassembling the buffer on the other side since client
+   * reads require the original object buffer while recovery only needs
+   * the missing pieces.
+   *
+   * Rather than handling reads on the primary directly, we simply send
+   * ourselves a message.  This avoids a dedicated primary path for that
+   * part.
+   */
+  struct read_result_t {
+    int r;
+    std::map<pg_shard_t, int> errors;
+    std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
+    std::list<
+      boost::tuple<
+       uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
+    read_result_t() : r(0) {}
+  };
+
+  struct ReadCompleter {
+    virtual void finish_single_request(
+      const hobject_t &hoid,
+      read_result_t &res,
+      std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
+
+    virtual void finish(int priority) && = 0;
+
+    virtual ~ReadCompleter() = default;
+  };
+
+  friend struct CallClientContexts;
+  struct ClientAsyncReadStatus {
+    unsigned objects_to_read;
+    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
+    std::map<hobject_t,std::pair<int, extent_map> > results;
+    explicit ClientAsyncReadStatus(
+      unsigned objects_to_read,
+      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
+      : objects_to_read(objects_to_read), func(std::move(func)) {}
+    void complete_object(
+      const hobject_t &hoid,
+      int err,
+      extent_map &&buffers) {
+      ceph_assert(objects_to_read);
+      --objects_to_read;
+      ceph_assert(!results.count(hoid));
+      results.emplace(hoid, std::make_pair(err, std::move(buffers)));
+    }
+    bool is_complete() const {
+      return objects_to_read == 0;
+    }
+    void run() {
+      func.release()->complete(std::move(results));
+    }
+  };
+
+  struct ReadOp {
+    int priority;
+    ceph_tid_t tid;
+    OpRequestRef op; // may be null if not on behalf of a client
+    // True if redundant reads are issued, false otherwise,
+    // this is useful to tradeoff some resources (redundant ops) for
+    // low latency read, especially on relatively idle cluster
+    bool do_redundant_reads;
+    // True if reading for recovery which could possibly reading only a subset
+    // of the available shards.
+    bool for_recovery;
+    std::unique_ptr<ReadCompleter> on_complete;
+
+    ZTracer::Trace trace;
+
+    std::map<hobject_t, std::set<int>> want_to_read;
+    std::map<hobject_t, read_request_t> to_read;
+    std::map<hobject_t, read_result_t> complete;
+
+    std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
+    std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
+
+    void dump(ceph::Formatter *f) const;
+
+    std::set<pg_shard_t> in_progress;
+
+    ReadOp(
+      int priority,
+      ceph_tid_t tid,
+      bool do_redundant_reads,
+      bool for_recovery,
+      std::unique_ptr<ReadCompleter> _on_complete,
+      OpRequestRef op,
+      std::map<hobject_t, std::set<int>> &&_want_to_read,
+      std::map<hobject_t, read_request_t> &&_to_read)
+      : priority(priority),
+        tid(tid),
+        op(op),
+        do_redundant_reads(do_redundant_reads),
+        for_recovery(for_recovery),
+        on_complete(std::move(_on_complete)),
+        want_to_read(std::move(_want_to_read)),
+       to_read(std::move(_to_read)) {
+      for (auto &&hpair: to_read) {
+       auto &returned = complete[hpair.first].returned;
+       for (auto &&extent: hpair.second.to_read) {
+         returned.push_back(
+           boost::make_tuple(
+             extent.get<0>(),
+             extent.get<1>(),
+             std::map<pg_shard_t, ceph::buffer::list>()));
+       }
+      }
+    }
+    ReadOp() = delete;
+    ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
+    ReadOp(ReadOp &&) = default;
+  };
+  struct ReadPipeline {
+    void objects_read_and_reconstruct(
+      const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+      > &reads,
+      bool fast_read,
+      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
+
+    template <class F>
+    void filter_read_op(
+      const OSDMapRef& osdmap,
+      ReadOp &op,
+      F&& on_erase);
+
+    template <class F>
+    void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
+
+    void complete_read_op(ReadOp &rop);
+
+    void start_read_op(
+      int priority,
+      std::map<hobject_t, std::set<int>> &want_to_read,
+      std::map<hobject_t, read_request_t> &to_read,
+      OpRequestRef op,
+      bool do_redundant_reads,
+      bool for_recovery,
+      std::unique_ptr<ReadCompleter> on_complete);
+
+    void do_read_op(ReadOp &rop);
+
+    int send_all_remaining_reads(
+      const hobject_t &hoid,
+      ReadOp &rop);
+
+    void on_change();
+
+    void kick_reads();
+
+    std::map<ceph_tid_t, ReadOp> tid_to_read_map;
+    std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
+    std::list<ClientAsyncReadStatus> in_progress_client_reads;
+
+    CephContext* cct;
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    // TODO: lay an interface down here
+    ECListener* parent;
+
+    ECListener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+    ReadPipeline(CephContext* cct,
+                ceph::ErasureCodeInterfaceRef ec_impl,
+                const ECUtil::stripe_info_t& sinfo,
+                ECListener* parent)
+      : cct(cct),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        parent(parent) {
+    }
+
+    int get_remaining_shards(
+      const hobject_t &hoid,
+      const std::set<int> &avail,
+      const std::set<int> &want,
+      const read_result_t &result,
+      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
+      bool for_recovery);
+
+    void get_all_avail_shards(
+      const hobject_t &hoid,
+      const std::set<pg_shard_t> &error_shards,
+      std::set<int> &have,
+      std::map<shard_id_t, pg_shard_t> &shards,
+      bool for_recovery);
+
+    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+    friend struct FinishReadOp;
+
+    void get_want_to_read_shards(std::set<int> *want_to_read) const;
+
+    /// Returns to_read replicas sufficient to reconstruct want
+    int get_min_avail_to_read_shards(
+      const hobject_t &hoid,     ///< [in] object
+      const std::set<int> &want,      ///< [in] desired shards
+      bool for_recovery,         ///< [in] true if we may use non-acting replicas
+      bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
+      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read   ///< [out] shards, corresponding subchunks to read
+      ); ///< @return error code, 0 on success
+  };
+
+  /**
+   * Client writes
+   *
+   * ECTransaction is responsible for generating a transaction for
+   * each shard to which we need to send the write.  As required
+   * by the PGBackend interface, the ECBackend write mechanism
+   * passes trim information with the write and last_complete back
+   * with the reply.
+   *
+   * As with client reads, there is a possibility of out-of-order
+   * completions. Thus, callbacks and completion are called in order
+   * on the writing std::list.
+   */
+
+  struct RMWPipeline {
+    struct Op : boost::intrusive::list_base_hook<> {
+      /// From submit_transaction caller, describes operation
+      hobject_t hoid;
+      object_stat_sum_t delta_stats;
+      eversion_t version;
+      eversion_t trim_to;
+      std::optional<pg_hit_set_history_t> updated_hit_set_history;
+      std::vector<pg_log_entry_t> log_entries;
+      ceph_tid_t tid;
+      osd_reqid_t reqid;
+      ZTracer::Trace trace;
+
+      eversion_t roll_forward_to; /// Soon to be generated internally
+
+      /// Ancillary also provided from submit_transaction caller
+      std::map<hobject_t, ObjectContextRef> obc_map;
+
+      /// see call_write_ordered
+      std::list<std::function<void(void)> > on_write;
+
+      /// Generated internally
+      std::set<hobject_t> temp_added;
+      std::set<hobject_t> temp_cleared;
+
+      ECTransaction::WritePlan plan;
+      bool requires_rmw() const { return !plan.to_read.empty(); }
+      bool invalidates_cache() const { return plan.invalidates_cache; }
+
+      // must be true if requires_rmw(), must be false if invalidates_cache()
+      bool using_cache = true;
+
+      /// In progress read state;
+      std::map<hobject_t,extent_set> pending_read; // subset already being read
+      std::map<hobject_t,extent_set> remote_read;  // subset we must read
+      std::map<hobject_t,extent_map> remote_read_result;
+      bool read_in_progress() const {
+        return !remote_read.empty() && remote_read_result.empty();
+      }
+
+      /// In progress write state.
+      std::set<pg_shard_t> pending_commit;
+      // we need pending_apply for pre-mimic peers so that we don't issue a
+      // read on a remote shard before it has applied a previous write.  We can
+      // remove this after nautilus.
+      std::set<pg_shard_t> pending_apply;
+      bool write_in_progress() const {
+        return !pending_commit.empty() || !pending_apply.empty();
+      }
+
+      /// optional, may be null, for tracking purposes
+      OpRequestRef client_op;
+
+      /// pin for cache
+      ExtentCache::write_pin pin;
+
+      /// Callbacks
+      Context *on_all_commit = nullptr;
+      virtual ~Op() {
+        delete on_all_commit;
+      }
+
+      virtual void generate_transactions(
+        ceph::ErasureCodeInterfaceRef &ecimpl,
+        pg_t pgid,
+        const ECUtil::stripe_info_t &sinfo,
+        std::map<hobject_t,extent_map> *written,
+        std::map<shard_id_t, ObjectStore::Transaction> *transactions,
+        DoutPrefixProvider *dpp,
+        const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
+    };
+    using OpRef = std::unique_ptr<Op>;
+    using op_list = boost::intrusive::list<Op>;
+    friend ostream &operator<<(ostream &lhs, const Op &rhs);
+
+    ExtentCache cache;
+    std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
+    /**
+     * We model the possible rmw states as a std::set of waitlists.
+     * All writes at this time complete in order, so a write blocked
+     * at waiting_state blocks all writes behind it as well (same for
+     * other states).
+     *
+     * Future work: We can break this up into a per-object pipeline
+     * (almost).  First, provide an ordering token to submit_transaction
+     * and require that all operations within a single transaction take
+     * place on a subset of hobject_t space partitioned by that token
+     * (the hashid seem about right to me -- even works for temp objects
+     * if you recall that a temp object created for object head foo will
+     * only ever be referenced by other transactions on foo and aren't
+     * reused).  Next, factor this part into a class and maintain one per
+     * ordering token.  Next, fixup PrimaryLogPG's repop queue to be
+     * partitioned by ordering token.  Finally, refactor the op pipeline
+     * so that the log entries passed into submit_transaction aren't
+     * versioned.  We can't assign versions to them until we actually
+     * submit the operation.  That's probably going to be the hard part.
+     */
+    class pipeline_state_t {
+      enum {
+        CACHE_VALID = 0,
+        CACHE_INVALID = 1
+      } pipeline_state = CACHE_VALID;
+    public:
+      bool caching_enabled() const {
+        return pipeline_state == CACHE_VALID;
+      }
+      bool cache_invalid() const {
+        return !caching_enabled();
+      }
+      void invalidate() {
+        pipeline_state = CACHE_INVALID;
+      }
+      void clear() {
+        pipeline_state = CACHE_VALID;
+      }
+      friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
+    } pipeline_state;
+
+    op_list waiting_state;        /// writes waiting on pipe_state
+    op_list waiting_reads;        /// writes waiting on partial stripe reads
+    op_list waiting_commit;       /// writes waiting on initial commit
+    eversion_t completed_to;
+    eversion_t committed_to;
+    void start_rmw(OpRef op);
+    bool try_state_to_reads();
+    bool try_reads_to_commit();
+    bool try_finish_rmw();
+    void check_ops();
+
+    void on_change();
+    void call_write_ordered(std::function<void(void)> &&cb);
+
+    CephContext* cct;
+    ECListener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+    template <typename Func>
+    void objects_read_async_no_cache(
+      const std::map<hobject_t,extent_set> &to_read,
+      Func &&on_complete
+    ) {
+      std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
+      for (auto &&hpair: to_read) {
+        auto &l = _to_read[hpair.first];
+        for (auto extent: hpair.second) {
+          l.emplace_back(extent.first, extent.second, 0);
+        }
+      }
+      ec_backend.objects_read_and_reconstruct(
+        _to_read,
+        false,
+        make_gen_lambda_context<
+        std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
+            std::forward<Func>(on_complete)));
+    }
+    void handle_sub_write(
+      pg_shard_t from,
+      OpRequestRef msg,
+      ECSubWrite &op,
+      const ZTracer::Trace &trace
+    ) {
+      ec_backend.handle_sub_write(from, std::move(msg), op, trace);
+    }
+    // end of iface
+
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    ECListener* parent;
+    ECCommon& ec_backend;
+
+    RMWPipeline(CephContext* cct,
+                ceph::ErasureCodeInterfaceRef ec_impl,
+                const ECUtil::stripe_info_t& sinfo,
+                ECListener* parent,
+                ECCommon& ec_backend)
+      : cct(cct),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        parent(parent),
+        ec_backend(ec_backend) {
+    }
+  };
+};
+