]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: abstract EC read pipeline's check_recovery_sources() from IO
authorRadosław Zarzyński <rzarzyns@redhat.com>
Fri, 15 Dec 2023 21:59:03 +0000 (22:59 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:30:28 +0000 (17:30 +0000)
For the sake of crimson.

Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECCommon.cc
src/osd/ECCommon.h

index ecc5d5c823747f4235a3f7078925af81849e6893..8ae0fe8c5c5c371375efce3dd8ce38769e1302aa 100644 (file)
@@ -1316,11 +1316,28 @@ void ECBackend::handle_sub_read_reply(
 
 void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
 {
-#if 0
-  read_pipeline.check_recovery_sources(osdmap, [this] (const hobject_t& obj) {
-    recovery_ops.erase(obj);
-  });
-#endif
+  struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
+    ECCommon::ReadPipeline& read_pipeline;
+    ceph_tid_t tid;
+    FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid)
+      : read_pipeline(read_pipeline), tid(tid) {}
+    void finish(ThreadPool::TPHandle&) override {
+      auto ropiter = read_pipeline.tid_to_read_map.find(tid);
+      ceph_assert(ropiter != read_pipeline.tid_to_read_map.end());
+      read_pipeline.complete_read_op(ropiter->second);
+    }
+  };
+  read_pipeline.check_recovery_sources(
+    osdmap,
+    [this] (const hobject_t& obj) {
+      recovery_backend.recovery_ops.erase(obj);
+    },
+    [this] (const ReadOp& op) {
+      get_parent()->schedule_recovery_work(
+        get_parent()->bless_unlocked_gencontext(
+          new FinishReadOp(read_pipeline, op.tid)),
+        1);
+    });
 }
 
 void ECBackend::on_change()
index 5d7c7a48cb201664f45664d01fca9bc657dc082b..2d7acf15d18edeb5a188c1cf7f9a1f05b1568f1c 100644 (file)
@@ -182,31 +182,6 @@ void ECCommon::ReadPipeline::complete_read_op(ReadOp &rop)
   tid_to_read_map.erase(rop.tid);
 }
 
-struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
-  ECCommon::ReadPipeline& read_pipeline;
-  ceph_tid_t tid;
-  FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid)
-    : read_pipeline(read_pipeline), tid(tid) {}
-  void finish(ThreadPool::TPHandle&) override {
-    auto ropiter = read_pipeline.tid_to_read_map.find(tid);
-    ceph_assert(ropiter != read_pipeline.tid_to_read_map.end());
-    read_pipeline.complete_read_op(ropiter->second);
-  }
-};
-
-void ECCommon::ReadPipeline::schedule_recovery_work()
-{
-#ifndef WITH_SEASTAR
-    get_parent()->schedule_recovery_work(
-      get_parent()->bless_unlocked_gencontext(
-        nullptr), //new struct FinishReadOp(*this, op.tid)),
-      1);
-#else
-    // TODO
-    ceph_abort_msg("not yet implemented");
-#endif
-}
-
 void ECCommon::ReadPipeline::on_change()
 {
   for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
index 1e30b6d80ff96caeb0093620665690b10e6807f8..3ceb3d295b2a98204e15b281f8a04af4953cef6e 100644 (file)
@@ -370,14 +370,18 @@ struct ECCommon {
       bool fast_read,
       GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
 
-    template <class F>
+    template <class F, class G>
     void filter_read_op(
       const OSDMapRef& osdmap,
       ReadOp &op,
-      F&& on_erase);
+      F&& on_erase,
+      G&& on_schedule_recovery);
 
-    template <class F>
-    void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
+    template <class F, class G>
+    void check_recovery_sources(
+      const OSDMapRef& osdmap,
+      F&& on_erase,
+      G&& on_schedule_recovery);
 
     void complete_read_op(ReadOp &rop);
 
@@ -698,10 +702,11 @@ template <> struct fmt::formatter<ECCommon::read_result_t> : fmt::ostream_format
 template <> struct fmt::formatter<ECCommon::ReadOp> : fmt::ostream_formatter {};
 template <> struct fmt::formatter<ECCommon::RMWPipeline::Op> : fmt::ostream_formatter {};
 
-template <class F>
+template <class F, class G>
 void ECCommon::ReadPipeline::check_recovery_sources(
   const OSDMapRef& osdmap,
-  F&& on_erase)
+  F&& on_erase,
+  G&& on_schedule_recovery)
 {
   std::set<ceph_tid_t> tids_to_filter;
   for (std::map<pg_shard_t, std::set<ceph_tid_t> >::iterator 
@@ -720,15 +725,16 @@ void ECCommon::ReadPipeline::check_recovery_sources(
        ++i) {
     std::map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
     ceph_assert(j != tid_to_read_map.end());
-    filter_read_op(osdmap, j->second, on_erase);
+    filter_read_op(osdmap, j->second, on_erase, on_schedule_recovery);
   }
 }
 
-template <class F>
+template <class F, class G>
 void ECCommon::ReadPipeline::filter_read_op(
   const OSDMapRef& osdmap,
   ReadOp &op,
-  F&& on_erase)
+  F&& on_erase,
+  G&& on_schedule_recovery)
 {
   std::set<hobject_t> to_cancel;
   for (std::map<pg_shard_t, std::set<hobject_t> >::iterator i = op.source_to_obj.begin();
@@ -790,6 +796,6 @@ void ECCommon::ReadPipeline::filter_read_op(
      *    the pull on the affected objects and pushes from in-memory buffers
      *    on any now complete unaffected objects.
      */
-    schedule_recovery_work();
+    on_schedule_recovery(op);
   }
 }