]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: dissect crimson-shareable parts of ECBackend into ECCommon
authorRadosław Zarzyński <rzarzyns@redhat.com>
Tue, 19 Sep 2023 12:48:08 +0000 (14:48 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:27:10 +0000 (17:27 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h

index ec8fdc0637a292f480c568381f2727d8ffd67db2..a19e95cba9d1c30727133c4d765ace87aefbb56a 100644 (file)
@@ -58,7 +58,7 @@ static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
 static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) {
   return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
 }
-static ostream& _prefix(std::ostream *_dout, ECBackend::ReadPipeline *read_pipeline) {
+static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) {
   return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
 }
 
@@ -112,7 +112,7 @@ static ostream &operator<<(
             << rhs.get<1>() << ", " << rhs.get<2>() << ")";
 }
 
-ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
+ostream &operator<<(ostream &lhs, const ECCommon::read_request_t &rhs)
 {
   return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
             << ", need=" << rhs.need
@@ -120,7 +120,7 @@ ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
             << ")";
 }
 
-ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
+ostream &operator<<(ostream &lhs, const ECCommon::read_result_t &rhs)
 {
   lhs << "read_result_t(r=" << rhs.r
       << ", errors=" << rhs.errors;
@@ -132,7 +132,7 @@ ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
   return lhs << ", returned=" << rhs.returned << ")";
 }
 
-ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
+ostream &operator<<(ostream &lhs, const ECCommon::ReadOp &rhs)
 {
   lhs << "ReadOp(tid=" << rhs.tid;
   if (rhs.op && rhs.op->get_req()) {
@@ -147,7 +147,7 @@ ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
             << ", in_progress=" << rhs.in_progress << ")";
 }
 
-void ECBackend::ReadOp::dump(Formatter *f) const
+void ECCommon::ReadOp::dump(Formatter *f) const
 {
   f->dump_unsigned("tid", tid);
   if (op && op->get_req()) {
@@ -237,7 +237,7 @@ PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
   return new ECRecoveryHandle;
 }
 
-void ECBackend::_failed_push(const hobject_t &hoid, ECBackend::read_result_t &res)
+void ECBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
 {
   dout(10) << __func__ << ": Read error " << hoid << " r="
           << res.r << " errors=" << res.errors << dendl;
@@ -256,7 +256,7 @@ void ECBackend::_failed_push(const hobject_t &hoid, ECBackend::read_result_t &re
 
 struct RecoveryMessages {
   map<hobject_t,
-      ECBackend::read_request_t> recovery_reads;
+      ECCommon::read_request_t> recovery_reads;
   map<hobject_t, set<int>> want_to_read;
 
   void recovery_read(
@@ -272,7 +272,7 @@ struct RecoveryMessages {
     recovery_reads.insert(
       make_pair(
        hoid,
-       ECBackend::read_request_t(
+       ECCommon::read_request_t(
          to_read,
          need,
          attrs)));
@@ -515,13 +515,13 @@ struct SendPushReplies : public Context {
   }
 };
 
-struct RecoveryReadCompleter : ECBackend::ReadCompleter {
+struct RecoveryReadCompleter : ECCommon::ReadCompleter {
   RecoveryReadCompleter(ECBackend& backend)
     : backend(backend) {}
 
   void finish_single_request(
     const hobject_t &hoid,
-    ECBackend::read_result_t &res,
+    ECCommon::read_result_t &res,
     list<boost::tuple<uint64_t, uint64_t, uint32_t> >) override
   {
     if (!(res.r == 0 && res.errors.empty())) {
@@ -1347,7 +1347,7 @@ void ECBackend::handle_sub_read_reply(
   }
 }
 
-void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
+void ECCommon::ReadPipeline::complete_read_op(ReadOp &rop)
 {
   map<hobject_t, read_request_t>::iterator reqiter =
     rop.to_read.begin();
@@ -1374,9 +1374,9 @@ void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
 }
 
 struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
-  ECBackend::ReadPipeline& read_pipeline;
+  ECCommon::ReadPipeline& read_pipeline;
   ceph_tid_t tid;
-  FinishReadOp(ECBackend::ReadPipeline& read_pipeline, ceph_tid_t tid)
+  FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid)
     : read_pipeline(read_pipeline), tid(tid) {}
   void finish(ThreadPool::TPHandle&) override {
     auto ropiter = read_pipeline.tid_to_read_map.find(tid);
@@ -1386,7 +1386,7 @@ struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
 };
 
 template <class F>
-void ECBackend::ReadPipeline::filter_read_op(
+void ECCommon::ReadPipeline::filter_read_op(
   const OSDMapRef& osdmap,
   ReadOp &op,
   F&& on_erase)
@@ -1462,7 +1462,7 @@ void ECBackend::ReadPipeline::filter_read_op(
 }
 
 template <class F>
-void ECBackend::ReadPipeline::check_recovery_sources(
+void ECCommon::ReadPipeline::check_recovery_sources(
   const OSDMapRef& osdmap,
   F&& on_erase)
 {
@@ -1494,7 +1494,7 @@ void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
   });
 }
 
-void ECBackend::ReadPipeline::on_change()
+void ECCommon::ReadPipeline::on_change()
 {
   for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
        i != tid_to_read_map.end();
@@ -1663,7 +1663,7 @@ void ECBackend::RMWPipeline::call_write_ordered(std::function<void(void)> &&cb)
   }
 }
 
-void ECBackend::ReadPipeline::get_all_avail_shards(
+void ECCommon::ReadPipeline::get_all_avail_shards(
   const hobject_t &hoid,
   const set<pg_shard_t> &error_shards,
   set<int> &have,
@@ -1767,7 +1767,7 @@ int ECBackend::get_min_avail_to_read_shards(
   return 0;
 }
 
-int ECBackend::ReadPipeline::get_remaining_shards(
+int ECCommon::ReadPipeline::get_remaining_shards(
   const hobject_t &hoid,
   const set<int> &avail,
   const set<int> &want,
@@ -1813,14 +1813,14 @@ int ECBackend::ReadPipeline::get_remaining_shards(
   return 0;
 }
 
-void ECBackend::ReadPipeline::start_read_op(
+void ECCommon::ReadPipeline::start_read_op(
   int priority,
   map<hobject_t, set<int>> &want_to_read,
   map<hobject_t, read_request_t> &to_read,
   OpRequestRef _op,
   bool do_redundant_reads,
   bool for_recovery,
-  std::unique_ptr<ECBackend::ReadCompleter> on_complete)
+  std::unique_ptr<ECCommon::ReadCompleter> on_complete)
 {
   ceph_tid_t tid = get_parent()->get_tid();
   ceph_assert(!tid_to_read_map.count(tid));
@@ -1843,7 +1843,7 @@ void ECBackend::ReadPipeline::start_read_op(
   do_read_op(op);
 }
 
-void ECBackend::ReadPipeline::do_read_op(ReadOp &op)
+void ECCommon::ReadPipeline::do_read_op(ReadOp &op)
 {
   int priority = op.priority;
   ceph_tid_t tid = op.tid;
@@ -2406,15 +2406,15 @@ void ECBackend::objects_read_async(
           on_complete)));
 }
 
-struct ClientReadCompleter : ECBackend::ReadCompleter {
-  ClientReadCompleter(ECBackend::ReadPipeline &read_pipeline,
-                      ECBackend::ClientAsyncReadStatus *status)
+struct ClientReadCompleter : ECCommon::ReadCompleter {
+  ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
+                      ECCommon::ClientAsyncReadStatus *status)
     : read_pipeline(read_pipeline),
       status(status) {}
 
   void finish_single_request(
     const hobject_t &hoid,
-    ECBackend::read_result_t &res,
+    ECCommon::read_result_t &res,
     list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
   {
     extent_map result;
@@ -2465,8 +2465,8 @@ out:
     // NOP
   }
 
-  ECBackend::ReadPipeline &read_pipeline;
-  ECBackend::ClientAsyncReadStatus *status;
+  ECCommon::ReadPipeline &read_pipeline;
+  ECCommon::ClientAsyncReadStatus *status;
 };
 
 
@@ -2481,7 +2481,7 @@ void ECBackend::objects_read_and_reconstruct(
     *this, reads, fast_read, std::move(func));
 }
 
-void ECBackend::ReadPipeline::objects_read_and_reconstruct(
+void ECCommon::ReadPipeline::objects_read_and_reconstruct(
   ECBackend& ec_backend,
   const map<hobject_t,
     std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
@@ -2532,7 +2532,7 @@ void ECBackend::ReadPipeline::objects_read_and_reconstruct(
 }
 
 
-int ECBackend::ReadPipeline::send_all_remaining_reads(
+int ECCommon::ReadPipeline::send_all_remaining_reads(
   const hobject_t &hoid,
   ReadOp &rop)
 {
@@ -2568,7 +2568,7 @@ int ECBackend::ReadPipeline::send_all_remaining_reads(
   return 0;
 }
 
-void ECBackend::ReadPipeline::kick_reads()
+void ECCommon::ReadPipeline::kick_reads()
 {
   while (in_progress_client_reads.size() &&
          in_progress_client_reads.front().is_complete()) {
index 2a351113f3fffcab26fea963f65f343212491b92..30b6f017f96817f0dff78c8e95689d802fb7bc95 100644 (file)
@@ -94,7 +94,231 @@ struct RecoveryMessages;
        const hobject_t &soid,
        const object_stat_sum_t &delta_stats) = 0;
   };
-class ECBackend : public PGBackend {
+
+struct ECBackend;
+struct ECCommon {
+  struct read_request_t {
+    const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+    std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
+    bool want_attrs;
+    read_request_t(
+      const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
+      const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
+      bool want_attrs)
+      : to_read(to_read), need(need), want_attrs(want_attrs) {}
+  };
+  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
+  struct ReadOp;
+  /**
+   * Low level async read mechanism
+   *
+   * To avoid duplicating the logic for requesting and waiting for
+   * multiple object shards, there is a common async read mechanism
+   * taking a std::map of hobject_t->read_request_t which defines callbacks
+   * taking read_result_ts as arguments.
+   *
+   * tid_to_read_map gives open read ops.  check_recovery_sources uses
+   * shard_to_read_map and ReadOp::source_to_obj to restart reads
+   * involving down osds.
+   *
+   * The user is responsible for specifying replicas on which to read
+   * and for reassembling the buffer on the other side since client
+   * reads require the original object buffer while recovery only needs
+   * the missing pieces.
+   *
+   * Rather than handling reads on the primary directly, we simply send
+   * ourselves a message.  This avoids a dedicated primary path for that
+   * part.
+   */
+  struct read_result_t {
+    int r;
+    std::map<pg_shard_t, int> errors;
+    std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
+    std::list<
+      boost::tuple<
+       uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
+    read_result_t() : r(0) {}
+  };
+
+  struct ReadCompleter {
+    virtual void finish_single_request(
+      const hobject_t &hoid,
+      read_result_t &res,
+      std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
+
+    virtual void finish(int priority) && = 0;
+
+    virtual ~ReadCompleter() = default;
+  };
+
+  friend struct CallClientContexts;
+  struct ClientAsyncReadStatus {
+    unsigned objects_to_read;
+    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
+    std::map<hobject_t,std::pair<int, extent_map> > results;
+    explicit ClientAsyncReadStatus(
+      unsigned objects_to_read,
+      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
+      : objects_to_read(objects_to_read), func(std::move(func)) {}
+    void complete_object(
+      const hobject_t &hoid,
+      int err,
+      extent_map &&buffers) {
+      ceph_assert(objects_to_read);
+      --objects_to_read;
+      ceph_assert(!results.count(hoid));
+      results.emplace(hoid, std::make_pair(err, std::move(buffers)));
+    }
+    bool is_complete() const {
+      return objects_to_read == 0;
+    }
+    void run() {
+      func.release()->complete(std::move(results));
+    }
+  };
+
+  struct ReadOp {
+    int priority;
+    ceph_tid_t tid;
+    OpRequestRef op; // may be null if not on behalf of a client
+    // True if redundant reads are issued, false otherwise,
+    // this is useful to tradeoff some resources (redundant ops) for
+    // low latency read, especially on relatively idle cluster
+    bool do_redundant_reads;
+    // True if reading for recovery which could possibly reading only a subset
+    // of the available shards.
+    bool for_recovery;
+    std::unique_ptr<ReadCompleter> on_complete;
+
+    ZTracer::Trace trace;
+
+    std::map<hobject_t, std::set<int>> want_to_read;
+    std::map<hobject_t, read_request_t> to_read;
+    std::map<hobject_t, read_result_t> complete;
+
+    std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
+    std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
+
+    void dump(ceph::Formatter *f) const;
+
+    std::set<pg_shard_t> in_progress;
+
+    ReadOp(
+      int priority,
+      ceph_tid_t tid,
+      bool do_redundant_reads,
+      bool for_recovery,
+      std::unique_ptr<ReadCompleter> _on_complete,
+      OpRequestRef op,
+      std::map<hobject_t, std::set<int>> &&_want_to_read,
+      std::map<hobject_t, read_request_t> &&_to_read)
+      : priority(priority),
+        tid(tid),
+        op(op),
+        do_redundant_reads(do_redundant_reads),
+        for_recovery(for_recovery),
+        on_complete(std::move(_on_complete)),
+        want_to_read(std::move(_want_to_read)),
+       to_read(std::move(_to_read)) {
+      for (auto &&hpair: to_read) {
+       auto &returned = complete[hpair.first].returned;
+       for (auto &&extent: hpair.second.to_read) {
+         returned.push_back(
+           boost::make_tuple(
+             extent.get<0>(),
+             extent.get<1>(),
+             std::map<pg_shard_t, ceph::buffer::list>()));
+       }
+      }
+    }
+    ReadOp() = delete;
+    ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
+    ReadOp(ReadOp &&) = default;
+  };
+  struct ReadPipeline {
+    void objects_read_and_reconstruct(
+      ECBackend& ecbackend,
+      const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+      > &reads,
+      bool fast_read,
+      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
+
+    template <class F>
+    void filter_read_op(
+      const OSDMapRef& osdmap,
+      ReadOp &op,
+      F&& on_erase);
+
+    template <class F>
+    void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
+
+    void complete_read_op(ReadOp &rop);
+
+    void start_read_op(
+      int priority,
+      std::map<hobject_t, std::set<int>> &want_to_read,
+      std::map<hobject_t, read_request_t> &to_read,
+      OpRequestRef op,
+      bool do_redundant_reads,
+      bool for_recovery,
+      std::unique_ptr<ReadCompleter> on_complete);
+
+    void do_read_op(ReadOp &rop);
+
+    int send_all_remaining_reads(
+      const hobject_t &hoid,
+      ReadOp &rop);
+
+    void on_change();
+
+    void kick_reads();
+
+    std::map<ceph_tid_t, ReadOp> tid_to_read_map;
+    std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
+    std::list<ClientAsyncReadStatus> in_progress_client_reads;
+
+    CephContext* cct;
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    // TODO: lay an interface down here
+    ECListener* parent;
+
+    ECListener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+    ReadPipeline(CephContext* cct,
+                ceph::ErasureCodeInterfaceRef ec_impl,
+                const ECUtil::stripe_info_t& sinfo,
+                ECListener* parent)
+      : cct(cct),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        parent(parent) {
+    }
+
+    int get_remaining_shards(
+      const hobject_t &hoid,
+      const std::set<int> &avail,
+      const std::set<int> &want,
+      const read_result_t &result,
+      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
+      bool for_recovery);
+
+    void get_all_avail_shards(
+      const hobject_t &hoid,
+      const std::set<pg_shard_t> &error_shards,
+      std::set<int> &have,
+      std::map<shard_id_t, pg_shard_t> &shards,
+      bool for_recovery);
+
+    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+    friend struct FinishReadOp;
+  };
+};
+
+class ECBackend : public PGBackend, public ECCommon {
 public:
   RecoveryHandle *open_recovery_op() override;
 
@@ -205,31 +429,6 @@ public:
     bool fast_read,
     GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
 
-  friend struct CallClientContexts;
-  struct ClientAsyncReadStatus {
-    unsigned objects_to_read;
-    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
-    std::map<hobject_t,std::pair<int, extent_map> > results;
-    explicit ClientAsyncReadStatus(
-      unsigned objects_to_read,
-      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
-      : objects_to_read(objects_to_read), func(std::move(func)) {}
-    void complete_object(
-      const hobject_t &hoid,
-      int err,
-      extent_map &&buffers) {
-      ceph_assert(objects_to_read);
-      --objects_to_read;
-      ceph_assert(!results.count(hoid));
-      results.emplace(hoid, std::make_pair(err, std::move(buffers)));
-    }
-    bool is_complete() const {
-      return objects_to_read == 0;
-    }
-    void run() {
-      func.release()->complete(std::move(results));
-    }
-  };
   void objects_read_async(
     const hobject_t &hoid,
     const std::list<std::pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
@@ -246,6 +445,7 @@ private:
                        sinfo.get_stripe_width());
   }
 
+public:
   void get_want_to_read_shards(std::set<int> *want_to_read) const {
     const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
     for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
@@ -253,6 +453,7 @@ private:
       want_to_read->insert(chunk);
     }
   }
+private:
 
   /**
    * Recovery
@@ -350,199 +551,7 @@ private:
     RecoveryMessages *m);
 
 public:
-  /**
-   * Low level async read mechanism
-   *
-   * To avoid duplicating the logic for requesting and waiting for
-   * multiple object shards, there is a common async read mechanism
-   * taking a std::map of hobject_t->read_request_t which defines callbacks
-   * taking read_result_ts as arguments.
-   *
-   * tid_to_read_map gives open read ops.  check_recovery_sources uses
-   * shard_to_read_map and ReadOp::source_to_obj to restart reads
-   * involving down osds.
-   *
-   * The user is responsible for specifying replicas on which to read
-   * and for reassembling the buffer on the other side since client
-   * reads require the original object buffer while recovery only needs
-   * the missing pieces.
-   *
-   * Rather than handling reads on the primary directly, we simply send
-   * ourselves a message.  This avoids a dedicated primary path for that
-   * part.
-   */
-  struct read_result_t {
-    int r;
-    std::map<pg_shard_t, int> errors;
-    std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
-    std::list<
-      boost::tuple<
-       uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
-    read_result_t() : r(0) {}
-  };
-  struct read_request_t {
-    const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
-    std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
-    bool want_attrs;
-    read_request_t(
-      const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
-      const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
-      bool want_attrs)
-      : to_read(to_read), need(need), want_attrs(want_attrs) {}
-  };
-  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
-
-  struct ReadCompleter {
-    virtual void finish_single_request(
-      const hobject_t &hoid,
-      ECBackend::read_result_t &res,
-      std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
-
-    virtual void finish(int priority) && = 0;
-
-    virtual ~ReadCompleter() = default;
-  };
-
-  struct ReadOp {
-    int priority;
-    ceph_tid_t tid;
-    OpRequestRef op; // may be null if not on behalf of a client
-    // True if redundant reads are issued, false otherwise,
-    // this is useful to tradeoff some resources (redundant ops) for
-    // low latency read, especially on relatively idle cluster
-    bool do_redundant_reads;
-    // True if reading for recovery which could possibly reading only a subset
-    // of the available shards.
-    bool for_recovery;
-    std::unique_ptr<ReadCompleter> on_complete;
-
-    ZTracer::Trace trace;
-
-    std::map<hobject_t, std::set<int>> want_to_read;
-    std::map<hobject_t, read_request_t> to_read;
-    std::map<hobject_t, read_result_t> complete;
-
-    std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
-    std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
-
-    void dump(ceph::Formatter *f) const;
-
-    std::set<pg_shard_t> in_progress;
-
-    ReadOp(
-      int priority,
-      ceph_tid_t tid,
-      bool do_redundant_reads,
-      bool for_recovery,
-      std::unique_ptr<ReadCompleter> _on_complete,
-      OpRequestRef op,
-      std::map<hobject_t, std::set<int>> &&_want_to_read,
-      std::map<hobject_t, read_request_t> &&_to_read)
-      : priority(priority),
-        tid(tid),
-        op(op),
-        do_redundant_reads(do_redundant_reads),
-        for_recovery(for_recovery),
-        on_complete(std::move(_on_complete)),
-        want_to_read(std::move(_want_to_read)),
-       to_read(std::move(_to_read)) {
-      for (auto &&hpair: to_read) {
-       auto &returned = complete[hpair.first].returned;
-       for (auto &&extent: hpair.second.to_read) {
-         returned.push_back(
-           boost::make_tuple(
-             extent.get<0>(),
-             extent.get<1>(),
-             std::map<pg_shard_t, ceph::buffer::list>()));
-       }
-      }
-    }
-    ReadOp() = delete;
-    ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
-    ReadOp(ReadOp &&) = default;
-  };
-
-  struct ReadPipeline {
-    void objects_read_and_reconstruct(
-      ECBackend& ecbackend,
-      const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
-      > &reads,
-      bool fast_read,
-      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
-
-    template <class F>
-    void filter_read_op(
-      const OSDMapRef& osdmap,
-      ReadOp &op,
-      F&& on_erase);
-
-    template <class F>
-    void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
-
-    void complete_read_op(ReadOp &rop);
-
-    void start_read_op(
-      int priority,
-      std::map<hobject_t, std::set<int>> &want_to_read,
-      std::map<hobject_t, read_request_t> &to_read,
-      OpRequestRef op,
-      bool do_redundant_reads,
-      bool for_recovery,
-      std::unique_ptr<ReadCompleter> on_complete);
-
-    void do_read_op(ReadOp &rop);
-
-    int send_all_remaining_reads(
-      const hobject_t &hoid,
-      ReadOp &rop);
-
-    void on_change();
-
-    void kick_reads();
-
-    std::map<ceph_tid_t, ReadOp> tid_to_read_map;
-    std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
-    std::list<ClientAsyncReadStatus> in_progress_client_reads;
-
-    CephContext* cct;
-    ceph::ErasureCodeInterfaceRef ec_impl;
-    const ECUtil::stripe_info_t& sinfo;
-    // TODO: lay an interface down here
-    ECListener* parent;
-
-    ECListener *get_parent() const { return parent; }
-    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
-    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
-    const pg_info_t &get_info() { return get_parent()->get_info(); }
-
-    ReadPipeline(CephContext* cct,
-                ceph::ErasureCodeInterfaceRef ec_impl,
-                const ECUtil::stripe_info_t& sinfo,
-                ECListener* parent)
-      : cct(cct),
-        ec_impl(std::move(ec_impl)),
-        sinfo(sinfo),
-        parent(parent) {
-    }
-
-    int get_remaining_shards(
-      const hobject_t &hoid,
-      const std::set<int> &avail,
-      const std::set<int> &want,
-      const read_result_t &result,
-      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
-      bool for_recovery);
-
-    void get_all_avail_shards(
-      const hobject_t &hoid,
-      const std::set<pg_shard_t> &error_shards,
-      std::set<int> &have,
-      std::map<shard_id_t, pg_shard_t> &shards,
-      bool for_recovery);
-
-    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
-    friend struct FinishReadOp;
-  } read_pipeline;
+  struct ReadPipeline read_pipeline;
 
 
   /**