git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: move templated ReadPipeline::check_recovery_sources() to .h
author Radosław Zarzyński <rzarzyns@redhat.com>
Wed, 13 Dec 2023 14:08:08 +0000 (15:08 +0100)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:30:28 +0000 (17:30 +0000)
Beside shuffling code around, this commit also adds `std` namespace
qualifiers and dissects `schedule_recovery_work()` to keep it in .cc.

Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECCommon.cc
src/osd/ECCommon.h
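Why the move: a member function template such as check_recovery_sources()
is instantiated separately for every callback type F, so its definition must
be visible in each translation unit that instantiates it; hence the
relocation from ECCommon.cc to ECCommon.h. A minimal single-file sketch of
the same pattern, with hypothetical names, not Ceph code:

    #include <iostream>
    #include <set>

    struct Pipeline {
      std::set<int> sources;

      template <class F>            // deduced callback type, as with on_erase
      void check_sources(F&& on_erase) {
        for (auto i = sources.begin(); i != sources.end(); ) {
          if (*i < 0) {             // stand-in for osdmap->is_down(...)
            on_erase(*i);
            i = sources.erase(i);   // erase() hands back the next iterator
          } else {
            ++i;
          }
        }
      }
    };

    int main() {
      Pipeline p{{-2, 1, -3, 4}};
      p.check_sources([](int s) { std::cout << "erased " << s << "\n"; });
    }

Each distinct lambda passed as on_erase stamps out a fresh instantiation,
which the compiler can only generate if it sees the template body.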

index 2c6534d081f157803f4036c3e6c8d5d154b00c54..5d7c7a48cb201664f45664d01fca9bc657dc082b 100644 (file)
@@ -194,111 +194,17 @@ struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
   }
 };
 
-template <class F>
-void ECCommon::ReadPipeline::filter_read_op(
-  const OSDMapRef& osdmap,
-  ReadOp &op,
-  F&& on_erase)
+void ECCommon::ReadPipeline::schedule_recovery_work(ceph_tid_t tid)
 {
-  set<hobject_t> to_cancel;
-  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
-       i != op.source_to_obj.end();
-       ++i) {
-    if (osdmap->is_down(i->first.osd)) {
-      to_cancel.insert(i->second.begin(), i->second.end());
-      op.in_progress.erase(i->first);
-      continue;
-    }
-  }
-
-  if (to_cancel.empty())
-    return;
-
-  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
-       i != op.source_to_obj.end();
-       ) {
-    for (set<hobject_t>::iterator j = i->second.begin();
-        j != i->second.end();
-        ) {
-      if (to_cancel.count(*j))
-       i->second.erase(j++);
-      else
-       ++j;
-    }
-    if (i->second.empty()) {
-      op.source_to_obj.erase(i++);
-    } else {
-      ceph_assert(!osdmap->is_down(i->first.osd));
-      ++i;
-    }
-  }
-
-  for (set<hobject_t>::iterator i = to_cancel.begin();
-       i != to_cancel.end();
-       ++i) {
-    get_parent()->cancel_pull(*i);
-
-    ceph_assert(op.to_read.count(*i));
-    read_request_t &req = op.to_read.find(*i)->second;
-    dout(10) << __func__ << ": canceling " << req
-            << "  for obj " << *i << dendl;
-    op.to_read.erase(*i);
-    op.complete.erase(*i);
-    on_erase(*i);
-  }
-
-  if (op.in_progress.empty()) {
-    /* This case is odd.  filter_read_op gets called while processing
-     * an OSDMap.  Normal, non-recovery reads only happen from acting
-     * set osds.  For this op to have had a read source go down and
-     * there not be an interval change, it must be part of a pull during
-     * log-based recovery.
-     *
-     * This callback delays calling complete_read_op until later to avoid
-     * dealing with recovery while handling an OSDMap.  We assign a
-     * cost here of 1 because:
-     * 1) This should be very rare, and the operation itself was already
-     *    throttled.
-     * 2) It shouldn't result in IO, rather it should result in restarting
-     *    the pull on the affected objects and pushes from in-memory buffers
-     *    on any now complete unaffected objects.
-     */
 #ifndef WITH_SEASTAR
     get_parent()->schedule_recovery_work(
       get_parent()->bless_unlocked_gencontext(
-        new FinishReadOp(*this, op.tid)),
+        new FinishReadOp(*this, tid)),
       1);
 #else
     // TODO
     ceph_abort_msg("not yet implemented");
 #endif
-  }
-}
-
-template <class F>
-void ECCommon::ReadPipeline::check_recovery_sources(
-  const OSDMapRef& osdmap,
-  F&& on_erase)
-{
-  set<ceph_tid_t> tids_to_filter;
-  for (map<pg_shard_t, set<ceph_tid_t> >::iterator 
-       i = shard_to_read_map.begin();
-       i != shard_to_read_map.end();
-       ) {
-    if (osdmap->is_down(i->first.osd)) {
-      tids_to_filter.insert(i->second.begin(), i->second.end());
-      shard_to_read_map.erase(i++);
-    } else {
-      ++i;
-    }
-  }
-  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
-       i != tids_to_filter.end();
-       ++i) {
-    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
-    ceph_assert(j != tid_to_read_map.end());
-    filter_read_op(osdmap, j->second, on_erase);
-  }
 }
 
 void ECCommon::ReadPipeline::on_change()
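The flip side of the split: schedule_recovery_work() is an ordinary,
non-template member function, so its body can stay in the .cc, where the
file-local FinishReadOp type is visible. A single-file sketch of this
header/source division, again with hypothetical names, not Ceph code:

    #include <iostream>

    // "header" half: what every translation unit sees
    struct PipelineLike {
      void schedule_recovery_work(long tid);  // non-template: body in the .cc

      template <class F>                      // template: body must be visible
      void filter(long tid, F&& on_erase) {
        on_erase(tid);
        schedule_recovery_work(tid);          // delegate to the .cc half
      }
    };

    // ".cc" half: implementation detail hidden from the header
    namespace {
    struct FinishLike {                       // stand-in for FinishReadOp
      long tid;
      void finish() const { std::cout << "finish tid " << tid << "\n"; }
    };
    }

    void PipelineLike::schedule_recovery_work(long tid) {
      FinishLike{tid}.finish();               // only this file knows the type
    }

    int main() {
      PipelineLike p;
      p.filter(42, [](long t) { std::cout << "erase for tid " << t << "\n"; });
    }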
index f074a79abaf88b830cbbbaffb612c0d06e1444d4..1e30b6d80ff96caeb0093620665690b10e6807f8 100644 (file)
@@ -453,6 +453,8 @@ struct ECCommon {
       bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
       std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read   ///< [out] shards, corresponding subchunks to read
       ); ///< @return error code, 0 on success
+
+    void schedule_recovery_work(ceph_tid_t tid);
   };
 
   /**
@@ -695,3 +697,99 @@ template <> struct fmt::formatter<ECCommon::read_request_t> : fmt::ostream_forma
 template <> struct fmt::formatter<ECCommon::read_result_t> : fmt::ostream_formatter {};
 template <> struct fmt::formatter<ECCommon::ReadOp> : fmt::ostream_formatter {};
 template <> struct fmt::formatter<ECCommon::RMWPipeline::Op> : fmt::ostream_formatter {};
+
+template <class F>
+void ECCommon::ReadPipeline::check_recovery_sources(
+  const OSDMapRef& osdmap,
+  F&& on_erase)
+{
+  std::set<ceph_tid_t> tids_to_filter;
+  for (std::map<pg_shard_t, std::set<ceph_tid_t> >::iterator 
+       i = shard_to_read_map.begin();
+       i != shard_to_read_map.end();
+       ) {
+    if (osdmap->is_down(i->first.osd)) {
+      tids_to_filter.insert(i->second.begin(), i->second.end());
+      shard_to_read_map.erase(i++);
+    } else {
+      ++i;
+    }
+  }
+  for (std::set<ceph_tid_t>::iterator i = tids_to_filter.begin();
+       i != tids_to_filter.end();
+       ++i) {
+    std::map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
+    ceph_assert(j != tid_to_read_map.end());
+    filter_read_op(osdmap, j->second, on_erase);
+  }
+}
+
+template <class F>
+void ECCommon::ReadPipeline::filter_read_op(
+  const OSDMapRef& osdmap,
+  ReadOp &op,
+  F&& on_erase)
+{
+  std::set<hobject_t> to_cancel;
+  for (std::map<pg_shard_t, std::set<hobject_t> >::iterator i = op.source_to_obj.begin();
+       i != op.source_to_obj.end();
+       ++i) {
+    if (osdmap->is_down(i->first.osd)) {
+      to_cancel.insert(i->second.begin(), i->second.end());
+      op.in_progress.erase(i->first);
+      continue;
+    }
+  }
+
+  if (to_cancel.empty())
+    return;
+
+  for (std::map<pg_shard_t, std::set<hobject_t> >::iterator i = op.source_to_obj.begin();
+       i != op.source_to_obj.end();
+       ) {
+    for (std::set<hobject_t>::iterator j = i->second.begin();
+        j != i->second.end();
+        ) {
+      if (to_cancel.count(*j))
+       i->second.erase(j++);
+      else
+       ++j;
+    }
+    if (i->second.empty()) {
+      op.source_to_obj.erase(i++);
+    } else {
+      ceph_assert(!osdmap->is_down(i->first.osd));
+      ++i;
+    }
+  }
+
+  for (std::set<hobject_t>::iterator i = to_cancel.begin();
+       i != to_cancel.end();
+       ++i) {
+    get_parent()->cancel_pull(*i);
+
+    ceph_assert(op.to_read.count(*i));
+    op.to_read.erase(*i);
+    op.complete.erase(*i);
+    on_erase(*i);
+  }
+
+  if (op.in_progress.empty()) {
+    /* This case is odd.  filter_read_op gets called while processing
+     * an OSDMap.  Normal, non-recovery reads only happen from acting
+     * set osds.  For this op to have had a read source go down and
+     * there not be an interval change, it must be part of a pull during
+     * log-based recovery.
+     *
+     * This callback delays calling complete_read_op until later to avoid
+     * dealing with recovery while handling an OSDMap.  We assign a
+     * cost here of 1 because:
+     * 1) This should be very rare, and the operation itself was already
+     *    throttled.
+     * 2) It shouldn't result in IO, rather it should result in restarting
+     *    the pull on the affected objects and pushes from in-memory buffers
+     *    on any now complete unaffected objects.
+     */
+    schedule_recovery_work(op.tid);
+  }
+}
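
Both cleanup loops in filter_read_op() above rely on the classic pre-C++11
idiom for erasing from an associative container while iterating over it:
erase through a post-incremented iterator, so the loop variable has already
advanced before the erased node is invalidated. A standalone illustration
of the idiom:

    #include <iostream>
    #include <map>

    int main() {
      std::map<int, char> m{{1, 'a'}, {2, 'b'}, {3, 'c'}, {4, 'd'}};
      for (auto i = m.begin(); i != m.end(); ) {
        if (i->first % 2 == 0)
          m.erase(i++);  // i++ yields the doomed iterator; i has moved on
        else
          ++i;
      }
      for (const auto& [k, v] : m)
        std::cout << k << ':' << v << ' ';   // prints: 1:a 3:c
      std::cout << "\n";
    }

Since C++11, map::erase(iterator) also returns the next iterator, so
i = m.erase(i) is the equivalent modern spelling.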