osd: dissect the EC read pipeline from ECBackend into dedicated class
author     Radosław Zarzyński <rzarzyns@redhat.com>
           Thu, 6 Jul 2023 11:30:53 +0000 (13:30 +0200)
committer  Radoslaw Zarzynski <rzarzyns@redhat.com>
           Tue, 9 Jan 2024 15:09:53 +0000 (15:09 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h
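
In short, the change moves the read-side state (tid_to_read_map, shard_to_read_map, in_progress_client_reads) and the read-path methods out of ECBackend into a nested ECBackend::ReadPipeline, leaving thin forwarding wrappers such as ECBackend::kick_reads() behind. Below is a minimal, self-contained sketch of that delegation pattern, using simplified stand-in names rather than the real Ceph declarations.

// A minimal sketch of the pattern this commit introduces: read-side state
// and methods live in a nested pipeline object, and the enclosing backend
// only forwards to it. Names are simplified stand-ins, not Ceph types.
#include <iostream>
#include <list>
#include <map>
#include <string>

struct Backend {
  struct ReadPipeline {
    // Stand-ins for tid_to_read_map / in_progress_client_reads.
    std::map<unsigned, std::string> tid_to_read_map;
    std::list<std::string> in_progress_client_reads;

    Backend& backend;  // back-reference, as in ReadPipeline::ec_backend
    explicit ReadPipeline(Backend& b) : backend(b) {}

    void kick_reads() {
      while (!in_progress_client_reads.empty()) {
        std::cout << "completing " << in_progress_client_reads.front() << "\n";
        in_progress_client_reads.pop_front();
      }
    }

    void on_change() {  // drop all in-flight read state on interval change
      tid_to_read_map.clear();
      in_progress_client_reads.clear();
    }
  } read_pipeline;

  Backend() : read_pipeline(*this) {}

  // The enclosing class keeps its public interface and just forwards.
  void kick_reads() { read_pipeline.kick_reads(); }
  void on_change() { read_pipeline.on_change(); }
};

int main() {
  Backend b;
  b.read_pipeline.in_progress_client_reads.push_back("read#1");
  b.kick_reads();  // delegates to the pipeline
  b.on_change();
}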

diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc
index 906b1d664504fbbfbcfb42132a58980c925a2228..4bc7695ebb16e33f52e7022be4a33e1538216083 100644
--- a/src/osd/ECBackend.cc
+++ b/src/osd/ECBackend.cc
@@ -58,6 +58,9 @@ static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
 static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) {
   return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
 }
+static ostream& _prefix(std::ostream *_dout, ECBackend::ReadPipeline *read_pipeline) {
+  return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
+}
 
 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
   list<ECBackend::RecoveryOp> ops;
@@ -221,6 +224,7 @@ ECBackend::ECBackend(
   ErasureCodeInterfaceRef ec_impl,
   uint64_t stripe_width)
   : PGBackend(cct, pg, store, coll, ch),
+    read_pipeline(cct, ec_impl, this->sinfo, get_parent(), *this),
     rmw_pipeline(cct, ec_impl, this->sinfo, get_parent(), *this),
     ec_impl(ec_impl),
     sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
@@ -593,7 +597,7 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
 
   if (m.recovery_reads.empty())
     return;
-  start_read_op(
+  read_pipeline.start_read_op(
     priority,
     m.want_to_read,
     m.recovery_reads,
@@ -1208,8 +1212,8 @@ void ECBackend::handle_sub_read_reply(
 {
   trace.event("ec sub read reply");
   dout(10) << __func__ << ": reply " << op << dendl;
-  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
-  if (iter == tid_to_read_map.end()) {
+  map<ceph_tid_t, ReadOp>::iterator iter = read_pipeline.tid_to_read_map.find(op.tid);
+  if (iter == read_pipeline.tid_to_read_map.end()) {
     //canceled
     dout(20) << __func__ << ": dropped " << op << dendl;
     return;
@@ -1265,8 +1269,8 @@ void ECBackend::handle_sub_read_reply(
   }
 
   map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
-                                       shard_to_read_map.find(from);
-  ceph_assert(siter != shard_to_read_map.end());
+                                       read_pipeline.shard_to_read_map.find(from);
+  ceph_assert(siter != read_pipeline.shard_to_read_map.end());
   ceph_assert(siter->second.count(op.tid));
   siter->second.erase(op.tid);
 
@@ -1298,7 +1302,7 @@ void ECBackend::handle_sub_read_reply(
          // During recovery there may be multiple osds with copies of the same shard,
          // so getting EIO from one may result in multiple passes through this code path.
          if (!rop.do_redundant_reads) {
-           int r = send_all_remaining_reads(iter->first, rop);
+           int r = read_pipeline.send_all_remaining_reads(iter->first, rop);
            if (r == 0) {
              // We changed the rop's to_read and not incrementing is_complete
              need_resend = true;
@@ -1337,18 +1341,18 @@ void ECBackend::handle_sub_read_reply(
     }
   }
   if (need_resend) {
-    do_read_op(rop);
+    read_pipeline.do_read_op(rop);
   } else if (rop.in_progress.empty() || 
              is_complete == rop.complete.size()) {
     dout(20) << __func__ << " Complete: " << rop << dendl;
     rop.trace.event("ec read complete");
-    complete_read_op(rop);
+    read_pipeline.complete_read_op(rop);
   } else {
     dout(10) << __func__ << " readop not complete: " << rop << dendl;
   }
 }
 
-void ECBackend::complete_read_op(ReadOp &rop)
+void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
 {
   map<hobject_t, read_request_t>::iterator reqiter =
     rop.to_read.begin();
@@ -1376,17 +1380,18 @@ void ECBackend::complete_read_op(ReadOp &rop)
 }
 
 struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
-  ECBackend *ec;
+  ECBackend::ReadPipeline& read_pipeline;
   ceph_tid_t tid;
-  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
+  FinishReadOp(ECBackend::ReadPipeline& read_pipeline, ceph_tid_t tid)
+    : read_pipeline(read_pipeline), tid(tid) {}
   void finish(ThreadPool::TPHandle&) override {
-    auto ropiter = ec->tid_to_read_map.find(tid);
-    ceph_assert(ropiter != ec->tid_to_read_map.end());
-    ec->complete_read_op(ropiter->second);
+    auto ropiter = read_pipeline.tid_to_read_map.find(tid);
+    ceph_assert(ropiter != read_pipeline.tid_to_read_map.end());
+    read_pipeline.complete_read_op(ropiter->second);
   }
 };
 
-void ECBackend::filter_read_op(
+void ECBackend::ReadPipeline::filter_read_op(
   const OSDMapRef& osdmap,
   ReadOp &op)
 {
@@ -1438,7 +1443,8 @@ void ECBackend::filter_read_op(
 
     op.to_read.erase(*i);
     op.complete.erase(*i);
-    recovery_ops.erase(*i);
+    // TODO: meh, this doesn't look like a part of the read pipeline
+    //recovery_ops.erase(*i);
   }
 
   if (op.in_progress.empty()) {
@@ -1459,21 +1465,22 @@ void ECBackend::filter_read_op(
      */
     get_parent()->schedule_recovery_work(
       get_parent()->bless_unlocked_gencontext(
-        new FinishReadOp(this, op.tid)),
+        new FinishReadOp(*this, op.tid)),
       1);
   }
 }
 
 void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
 {
+  // TODO: dissect into ReadPipeline
   set<ceph_tid_t> tids_to_filter;
   for (map<pg_shard_t, set<ceph_tid_t> >::iterator 
-       i = shard_to_read_map.begin();
-       i != shard_to_read_map.end();
+       i = read_pipeline.shard_to_read_map.begin();
+       i != read_pipeline.shard_to_read_map.end();
        ) {
     if (osdmap->is_down(i->first.osd)) {
       tids_to_filter.insert(i->second.begin(), i->second.end());
-      shard_to_read_map.erase(i++);
+      read_pipeline.shard_to_read_map.erase(i++);
     } else {
       ++i;
     }
@@ -1481,12 +1488,31 @@ void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
   for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
        i != tids_to_filter.end();
        ++i) {
-    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
-    ceph_assert(j != tid_to_read_map.end());
-    filter_read_op(osdmap, j->second);
+    map<ceph_tid_t, ReadOp>::iterator j = read_pipeline.tid_to_read_map.find(*i);
+    ceph_assert(j != read_pipeline.tid_to_read_map.end());
+    read_pipeline.filter_read_op(osdmap, j->second);
   }
 }
 
+void ECBackend::ReadPipeline::on_change()
+{
+  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
+       i != tid_to_read_map.end();
+       ++i) {
+    dout(10) << __func__ << ": cancelling " << i->second << dendl;
+    for (map<hobject_t, read_request_t>::iterator j =
+          i->second.to_read.begin();
+        j != i->second.to_read.end();
+        ++j) {
+      delete j->second.cb;
+      j->second.cb = nullptr;
+    }
+  }
+  tid_to_read_map.clear();
+  shard_to_read_map.clear();
+  in_progress_client_reads.clear();
+}
+
 void ECBackend::RMWPipeline::on_change()
 {
   dout(10) << __func__ << dendl;
@@ -1506,21 +1532,7 @@ void ECBackend::RMWPipeline::on_change()
 void ECBackend::on_change()
 {
   rmw_pipeline.on_change();
-  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
-       i != tid_to_read_map.end();
-       ++i) {
-    dout(10) << __func__ << ": cancelling " << i->second << dendl;
-    for (map<hobject_t, read_request_t>::iterator j =
-          i->second.to_read.begin();
-        j != i->second.to_read.end();
-        ++j) {
-      delete j->second.cb;
-      j->second.cb = nullptr;
-    }
-  }
-  tid_to_read_map.clear();
-  shard_to_read_map.clear();
-  in_progress_client_reads.clear();
+  read_pipeline.on_change();
   clear_recovery_state();
 }
 
@@ -1541,8 +1553,8 @@ void ECBackend::dump_recovery_info(Formatter *f) const
   }
   f->close_section();
   f->open_array_section("read_ops");
-  for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
-       i != tid_to_read_map.end();
+  for (map<ceph_tid_t, ReadOp>::const_iterator i = read_pipeline.tid_to_read_map.begin();
+       i != read_pipeline.tid_to_read_map.end();
        ++i) {
     f->open_object_section("read_op");
     i->second.dump(f);
@@ -1658,7 +1670,7 @@ void ECBackend::RMWPipeline::call_write_ordered(std::function<void(void)> &&cb)
   }
 }
 
-void ECBackend::get_all_avail_shards(
+void ECBackend::ReadPipeline::get_all_avail_shards(
   const hobject_t &hoid,
   const set<pg_shard_t> &error_shards,
   set<int> &have,
@@ -1737,7 +1749,7 @@ int ECBackend::get_min_avail_to_read_shards(
   map<shard_id_t, pg_shard_t> shards;
   set<pg_shard_t> error_shards;
 
-  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
+  read_pipeline.get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
 
   map<int, vector<pair<int, int>>> need;
   int r = ec_impl->minimum_to_decode(want, have, &need);
@@ -1762,7 +1774,7 @@ int ECBackend::get_min_avail_to_read_shards(
   return 0;
 }
 
-int ECBackend::get_remaining_shards(
+int ECBackend::ReadPipeline::get_remaining_shards(
   const hobject_t &hoid,
   const set<int> &avail,
   const set<int> &want,
@@ -1808,7 +1820,7 @@ int ECBackend::get_remaining_shards(
   return 0;
 }
 
-void ECBackend::start_read_op(
+void ECBackend::ReadPipeline::start_read_op(
   int priority,
   map<hobject_t, set<int>> &want_to_read,
   map<hobject_t, read_request_t> &to_read,
@@ -1838,7 +1850,7 @@ void ECBackend::start_read_op(
   do_read_op(op);
 }
 
-void ECBackend::do_read_op(ReadOp &op)
+void ECBackend::ReadPipeline::do_read_op(ReadOp &op)
 {
   int priority = op.priority;
   ceph_tid_t tid = op.tid;
@@ -2404,15 +2416,15 @@ void ECBackend::objects_read_async(
 struct CallClientContexts :
   public GenContext<ECBackend::read_result_t&> {
   hobject_t hoid;
-  ECBackend *ec;
+  ECBackend::ReadPipeline &read_pipeline;
   ECBackend::ClientAsyncReadStatus *status;
   list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
   CallClientContexts(
     hobject_t hoid,
-    ECBackend *ec,
+    ECBackend::ReadPipeline &read_pipeline,
     ECBackend::ClientAsyncReadStatus *status,
     const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
-    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
+      : hoid(hoid), read_pipeline(read_pipeline), status(status), to_read(to_read) {}
   void finish(ECBackend::read_result_t &res) override {
     extent_map result;
     if (res.r != 0)
@@ -2421,7 +2433,7 @@ struct CallClientContexts :
     ceph_assert(res.errors.empty());
     for (auto &&read: to_read) {
       pair<uint64_t, uint64_t> adjusted =
-       ec->sinfo.offset_len_to_stripe_bounds(
+       read_pipeline.sinfo.offset_len_to_stripe_bounds(
          make_pair(read.get<0>(), read.get<1>()));
       ceph_assert(res.returned.front().get<0>() == adjusted.first);
       ceph_assert(res.returned.front().get<1>() == adjusted.second);
@@ -2434,8 +2446,8 @@ struct CallClientContexts :
        to_decode[j->first.shard] = std::move(j->second);
       }
       int r = ECUtil::decode(
-       ec->sinfo,
-       ec->ec_impl,
+       read_pipeline.sinfo,
+       read_pipeline.ec_impl,
        to_decode,
        &bl);
       if (r < 0) {
@@ -2454,7 +2466,7 @@ struct CallClientContexts :
     }
 out:
     status->complete_object(hoid, res.r, std::move(result));
-    ec->kick_reads();
+    read_pipeline.kick_reads();
   }
 };
 
@@ -2465,6 +2477,32 @@ void ECBackend::objects_read_and_reconstruct(
   bool fast_read,
   GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
 {
+  return read_pipeline.objects_read_and_reconstruct(
+    *this, reads, fast_read, std::move(func));
+}
+
+void ECBackend::ReadPipeline::objects_read_and_reconstruct(
+  ECBackend& ecbackend,
+  const map<hobject_t,
+    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+  > &reads,
+  bool fast_read,
+  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+{
+  return [this,
+    kick_reads=[this] (auto...) { return this->kick_reads();},
+    get_want_to_read_shards=[&ecbackend] (auto&&... args) {
+      return ecbackend.get_want_to_read_shards(std::forward<decltype(args)>(args)...);
+    },
+    get_min_avail_to_read_shards=[&ecbackend] (auto&&... args) {
+      return ecbackend.get_min_avail_to_read_shards(std::forward<decltype(args)>(args)...);
+    },
+    cct=(CephContext*)nullptr,
+    // params
+    &reads,
+    fast_read,
+    func=std::move(func)
+  ]() mutable {
   in_progress_client_reads.emplace_back(
     reads.size(), std::move(func));
   if (!reads.size()) {
@@ -2489,7 +2527,7 @@ void ECBackend::objects_read_and_reconstruct(
 
     CallClientContexts *c = new CallClientContexts(
       to_read.first,
-      this,
+      *this,
       &(in_progress_client_reads.back()),
       to_read.second);
     for_read_op.insert(
@@ -2509,11 +2547,12 @@ void ECBackend::objects_read_and_reconstruct(
     for_read_op,
     OpRequestRef(),
     fast_read, false, nullptr);
+  }();
   return;
 }
 
 
-int ECBackend::send_all_remaining_reads(
+int ECBackend::ReadPipeline::send_all_remaining_reads(
   const hobject_t &hoid,
   ReadOp &rop)
 {
@@ -2551,6 +2590,20 @@ int ECBackend::send_all_remaining_reads(
   return 0;
 }
 
+void ECBackend::ReadPipeline::kick_reads()
+{
+  while (in_progress_client_reads.size() &&
+         in_progress_client_reads.front().is_complete()) {
+    in_progress_client_reads.front().run();
+    in_progress_client_reads.pop_front();
+  }
+}
+
+void ECBackend::kick_reads() {
+  read_pipeline.kick_reads();
+}
+
+
 int ECBackend::objects_get_attrs(
   const hobject_t &hoid,
   map<string, bufferlist, less<>> *out)
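
The body of ReadPipeline::objects_read_and_reconstruct above is wrapped in an immediately invoked lambda whose init-captures recreate, under their old names, the helpers the code used to reach as ECBackend members (kick_reads, get_want_to_read_shards, get_min_avail_to_read_shards), so the relocated body needs almost no textual changes. Here is a reduced sketch of that capture-as-alias trick; all names in it are invented for the example.

// Reduced sketch of the "capture-as-alias" trick: wrap relocated code in an
// immediately invoked lambda whose captures re-create the names the code had
// in its old home, so the body can move verbatim. Names are invented.
#include <iostream>
#include <utility>

struct OldHome {
  int base = 40;
  int old_helper(int x) const { return base + x; }  // still lives on the old class
};

struct NewHome {
  OldHome& old_home;  // back-reference, like ReadPipeline::ec_backend

  int relocated(int arg) {
    // The init-capture reintroduces `old_helper` as a local callable, so the
    // lambda body below could be pasted unchanged from OldHome.
    return [old_helper = [this](auto&&... args) {
              return old_home.old_helper(std::forward<decltype(args)>(args)...);
            },
            arg]() {
      // --- body moved unchanged from the old class ---
      return old_helper(arg);
    }();  // invoked immediately
  }
};

int main() {
  OldHome oh;
  NewHome nh{oh};
  std::cout << nh.relocated(2) << "\n";  // prints 42
}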
diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h
index d604a176ac7d62efe5f176ae739dbf201c198e9b..b3951f664a8f1d81167978b991ed0e222a6ac410 100644
--- a/src/osd/ECBackend.h
+++ b/src/osd/ECBackend.h
@@ -168,7 +168,6 @@ public:
       func.release()->complete(std::move(results));
     }
   };
-  std::list<ClientAsyncReadStatus> in_progress_client_reads;
   void objects_read_async(
     const hobject_t &hoid,
     const std::list<std::pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
@@ -176,13 +175,7 @@ public:
     Context *on_complete,
     bool fast_read = false) override;
 
-  void kick_reads() {
-    while (in_progress_client_reads.size() &&
-          in_progress_client_reads.front().is_complete()) {
-      in_progress_client_reads.front().run();
-      in_progress_client_reads.pop_front();
-    }
-  }
+  void kick_reads();
 
 private:
   friend struct ECRecoveryHandle;
@@ -292,12 +285,6 @@ private:
     const PushReplyOp &op,
     pg_shard_t from,
     RecoveryMessages *m);
-  void get_all_avail_shards(
-    const hobject_t &hoid,
-    const std::set<pg_shard_t> &error_shards,
-    std::set<int> &have,
-    std::map<shard_id_t, pg_shard_t> &shards,
-    bool for_recovery);
 
 public:
   /**
@@ -403,28 +390,86 @@ public:
     ReadOp(const ReadOp &) = default;
     ReadOp(ReadOp &&) = default;
   };
-  friend struct FinishReadOp;
-  void filter_read_op(
-    const OSDMapRef& osdmap,
-    ReadOp &op);
-  void complete_read_op(ReadOp &rop);
-  friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
-
-  std::map<ceph_tid_t, ReadOp> tid_to_read_map;
-  std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
-  void start_read_op(
-    int priority,
-    std::map<hobject_t, std::set<int>> &want_to_read,
-    std::map<hobject_t, read_request_t> &to_read,
-    OpRequestRef op,
-    bool do_redundant_reads,
-    bool for_recovery,
-    GenContext<int> *on_complete);
-
-  void do_read_op(ReadOp &rop);
-  int send_all_remaining_reads(
-    const hobject_t &hoid,
-    ReadOp &rop);
+
+  struct ReadPipeline {
+    void objects_read_and_reconstruct(
+      ECBackend& ecbackend,
+      const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+      > &reads,
+      bool fast_read,
+      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
+
+    void filter_read_op(
+      const OSDMapRef& osdmap,
+      ReadOp &op);
+
+    void complete_read_op(ReadOp &rop);
+
+    void start_read_op(
+      int priority,
+      std::map<hobject_t, std::set<int>> &want_to_read,
+      std::map<hobject_t, read_request_t> &to_read,
+      OpRequestRef op,
+      bool do_redundant_reads,
+      bool for_recovery,
+      GenContext<int> *on_complete);
+
+    void do_read_op(ReadOp &rop);
+
+    int send_all_remaining_reads(
+      const hobject_t &hoid,
+      ReadOp &rop);
+
+    void on_change();
+
+    void kick_reads();
+
+    std::map<ceph_tid_t, ReadOp> tid_to_read_map;
+    std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
+    std::list<ClientAsyncReadStatus> in_progress_client_reads;
+
+    CephContext* cct;
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    PGBackend::Listener* parent;
+    // TODO: lay an interface down here
+    ECBackend& ec_backend;
+
+    PGBackend::Listener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+    ReadPipeline(CephContext* cct,
+                ceph::ErasureCodeInterfaceRef ec_impl,
+                const ECUtil::stripe_info_t& sinfo,
+                PGBackend::Listener* parent,
+                ECBackend& ec_backend)
+      : cct(cct),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        parent(parent),
+        ec_backend(ec_backend) {
+    }
+
+    int get_remaining_shards(
+      const hobject_t &hoid,
+      const std::set<int> &avail,
+      const std::set<int> &want,
+      const read_result_t &result,
+      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
+      bool for_recovery);
+
+    void get_all_avail_shards(
+      const hobject_t &hoid,
+      const std::set<pg_shard_t> &error_shards,
+      std::set<int> &have,
+      std::map<shard_id_t, pg_shard_t> &shards,
+      bool for_recovery);
+
+    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+    friend struct FinishReadOp;
+  } read_pipeline;
 
 
   /**
@@ -721,14 +766,6 @@ public:
     std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read   ///< [out] shards, corresponding subchunks to read
     ); ///< @return error code, 0 on success
 
-  int get_remaining_shards(
-    const hobject_t &hoid,
-    const std::set<int> &avail,
-    const std::set<int> &want,
-    const read_result_t &result,
-    std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
-    bool for_recovery);
-
   int objects_get_attrs(
     const hobject_t &hoid,
     std::map<std::string, ceph::buffer::list, std::less<>> *out) override;
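
One subtlety visible in the constructor hunk: the initializer list suggests sinfo is constructed after read_pipeline, yet read_pipeline receives this->sinfo and *this. That is well defined because ReadPipeline only binds references during construction; nothing reads through them until ECBackend is fully built. A stand-alone illustration of why this is safe, with simplified types that are not the Ceph ones:

// Why a member pipeline may bind a reference to a sibling member that is
// constructed later: the constructor only stores the reference, and nothing
// dereferences it until the outer object is fully constructed.
#include <cassert>

struct StripeInfo {
  unsigned chunk_count;
};

struct Backend {
  struct Pipeline {
    const StripeInfo& sinfo;  // bound before sinfo itself is constructed
    Backend& backend;
    Pipeline(const StripeInfo& sinfo, Backend& backend)
      : sinfo(sinfo), backend(backend) {}  // store only, no reads yet
  };

  Pipeline pipeline;  // declared first, so initialized first
  StripeInfo sinfo;   // constructed after pipeline

  explicit Backend(unsigned k)
    : pipeline(this->sinfo, *this),  // ok: merely binds references
      sinfo{k} {}
};

int main() {
  Backend b{4};
  assert(b.pipeline.sinfo.chunk_count == 4);  // safe once construction is done
  return 0;
}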