]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/common: parallel_for_each shouldn't accept rvalue reference of containers
authorXuehan Xu <xxhdx1985126@gmail.com>
Sat, 20 Aug 2022 04:30:26 +0000 (12:30 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Wed, 24 Aug 2022 02:26:25 +0000 (10:26 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/common/interruptible_future.h
src/crimson/os/seastore/async_cleaner.cc
src/crimson/os/seastore/transaction_manager.cc
src/crimson/osd/pg_recovery.cc
src/crimson/osd/recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.cc

index 1cb6419700ce1dc431962030da867e57e5b21cac..61660a217bccddeb42fb21c1324129339a9ef9e4 100644 (file)
@@ -1321,7 +1321,7 @@ public:
   }
 
   template <typename Container, typename Func>
-  static inline auto parallel_for_each(Container&& container, Func&& func) noexcept {
+  static inline auto parallel_for_each(Container& container, Func&& func) noexcept {
     return parallel_for_each(
            std::begin(container),
            std::end(container),
index 3ead6ff012770cd8e39de2466214058cd752a78a..e92f2d2092f9e1782d59089b391f2f4dc60a06a1 100644 (file)
@@ -906,27 +906,32 @@ AsyncCleaner::do_reclaim_space(
         // retrieve live extents
         DEBUGT("start, backref_entries={}, backref_extents={}",
                t, backref_entries.size(), extents.size());
-        return trans_intr::parallel_for_each(
-          backref_entries,
-          [this, FNAME, &extents, &t](auto &ent)
-        {
-          TRACET("getting extent of type {} at {}~{}",
-            t,
-            ent.type,
-            ent.paddr,
-            ent.len);
-          return ecb->get_extents_if_live(
-            t, ent.type, ent.paddr, ent.laddr, ent.len
-          ).si_then([FNAME, &extents, &ent, &t](auto list) {
-            if (list.empty()) {
-              TRACET("addr {} dead, skipping", t, ent.paddr);
-            } else {
-              for (auto &e : list) {
-                extents.emplace_back(std::move(e));
-              }
-            }
-          });
-        }).si_then([FNAME, &extents, this, &reclaimed, &t] {
+       return seastar::do_with(
+         std::move(backref_entries),
+         [this, &extents, &t](auto &backref_entries) {
+         return trans_intr::parallel_for_each(
+           backref_entries,
+           [this, &extents, &t](auto &ent)
+         {
+           LOG_PREFIX(AsyncCleaner::do_reclaim_space);
+           TRACET("getting extent of type {} at {}~{}",
+             t,
+             ent.type,
+             ent.paddr,
+             ent.len);
+           return ecb->get_extents_if_live(
+             t, ent.type, ent.paddr, ent.laddr, ent.len
+           ).si_then([FNAME, &extents, &ent, &t](auto list) {
+             if (list.empty()) {
+               TRACET("addr {} dead, skipping", t, ent.paddr);
+             } else {
+               for (auto &e : list) {
+                 extents.emplace_back(std::move(e));
+               }
+             }
+           });
+         });
+       }).si_then([FNAME, &extents, this, &reclaimed, &t] {
           DEBUGT("reclaim {} extents", t, extents.size());
           // rewrite live extents
           auto modify_time = segments[reclaim_state->get_segment_id()].modify_time;
index da95f5351a417b2f4188dfb783f50cd3665b0ccb..d4b2bfc607e775ac4dd7fde76175d0e7c18d06d2 100644 (file)
@@ -1,5 +1,5 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab expandtab
 
 #include "include/denc.h"
 #include "include/intarith.h"
@@ -561,7 +561,7 @@ TransactionManager::get_extents_if_live(
         {
           auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id();
           return trans_intr::parallel_for_each(
-            std::move(pin_list),
+            pin_list,
             [=, this, &list, &t](
               LBAPinRef &pin) -> Cache::get_extent_iertr::future<>
           {
index 78dd07976cbb7a808b661251b7736bd6cc040a4e..abc123c69e480864ea7bbdd0ea3fff55f46c569a 100644 (file)
@@ -59,7 +59,7 @@ PGRecovery::start_recovery_ops(
   }
   using interruptor =
     crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
-  return interruptor::parallel_for_each(std::move(started),
+  return interruptor::parallel_for_each(started,
                                        [] (auto&& ifut) {
     return std::move(ifut);
   }).then_interruptible([this] {
index 76ca1bd8b0dff1b8c1496d0548359b617e86b725..89d74798806bb00d35de3d9e86e25dd5e4f7214b 100644 (file)
@@ -174,35 +174,39 @@ RecoveryBackend::scan_for_backfill(
   return backend->list_objects(start, max).then_interruptible(
     [this, start, version_map] (auto&& ret) {
     auto&& [objects, next] = std::move(ret);
-    return interruptor::parallel_for_each(std::move(objects),
-      [this, version_map] (const hobject_t& object)
-      -> interruptible_future<> {
-      crimson::osd::ObjectContextRef obc;
-      if (pg.is_primary()) {
-        obc = shard_services.maybe_get_cached_obc(object);
-      }
-      if (obc) {
-        if (obc->obs.exists) {
-          logger().debug("scan_for_backfill found (primary): {}  {}",
-                         object, obc->obs.oi.version);
-          version_map->emplace(object, obc->obs.oi.version);
-        } else {
-          // if the object does not exist here, it must have been removed
-          // between the collection_list_partial and here.  This can happen
-          // for the first item in the range, which is usually last_backfill.
-        }
-        return seastar::now();
-      } else {
-        return backend->load_metadata(object).safe_then_interruptible(
-          [version_map, object] (auto md) {
-          if (md->os.exists) {
-            logger().debug("scan_for_backfill found: {}  {}",
-                           object, md->os.oi.version);
-            version_map->emplace(object, md->os.oi.version);
-          }
-          return seastar::now();
-        }, PGBackend::load_metadata_ertr::assert_all{});
-      }
+    return seastar::do_with(
+      std::move(objects),
+      [this, version_map](auto &objects) {
+      return interruptor::parallel_for_each(objects,
+       [this, version_map] (const hobject_t& object)
+       -> interruptible_future<> {
+       crimson::osd::ObjectContextRef obc;
+       if (pg.is_primary()) {
+         obc = shard_services.maybe_get_cached_obc(object);
+       }
+       if (obc) {
+         if (obc->obs.exists) {
+           logger().debug("scan_for_backfill found (primary): {}  {}",
+                          object, obc->obs.oi.version);
+           version_map->emplace(object, obc->obs.oi.version);
+         } else {
+           // if the object does not exist here, it must have been removed
+           // between the collection_list_partial and here.  This can happen
+           // for the first item in the range, which is usually last_backfill.
+         }
+         return seastar::now();
+       } else {
+         return backend->load_metadata(object).safe_then_interruptible(
+           [version_map, object] (auto md) {
+           if (md->os.exists) {
+             logger().debug("scan_for_backfill found: {}  {}",
+                            object, md->os.oi.version);
+             version_map->emplace(object, md->os.oi.version);
+           }
+           return seastar::now();
+         }, PGBackend::load_metadata_ertr::assert_all{});
+       }
+      });
     }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] {
       BackfillInterval bi;
       bi.begin = std::move(start);
index e5660a388f0d703b761bd3128cfb8c61262d2b49..3922e5085c8ff29ad1db841d90ea3969b6ce88e0 100644 (file)
@@ -54,23 +54,28 @@ ReplicatedRecoveryBackend::maybe_push_shards(
   const hobject_t& soid,
   eversion_t need)
 {
-  return interruptor::parallel_for_each(get_shards_to_push(soid),
-    [this, need, soid](auto shard) {
-    return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
-      auto msg = crimson::make_message<MOSDPGPush>();
-      msg->from = pg.get_pg_whoami();
-      msg->pgid = pg.get_pgid();
-      msg->map_epoch = pg.get_osdmap_epoch();
-      msg->min_epoch = pg.get_last_peering_reset();
-      msg->pushes.push_back(std::move(push));
-      msg->set_priority(pg.get_recovery_op_priority());
-      return interruptor::make_interruptible(
-         shard_services.send_to_osd(shard.osd,
-                                    std::move(msg),
-                                    pg.get_osdmap_epoch()))
-      .then_interruptible(
-        [this, soid, shard] {
-       return get_recovering(soid).wait_for_pushes(shard);
+  return seastar::do_with(
+    get_shards_to_push(soid),
+    [this, need, soid](auto &shards) {
+    return interruptor::parallel_for_each(
+      shards,
+      [this, need, soid](auto shard) {
+      return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
+        auto msg = crimson::make_message<MOSDPGPush>();
+        msg->from = pg.get_pg_whoami();
+        msg->pgid = pg.get_pgid();
+        msg->map_epoch = pg.get_osdmap_epoch();
+        msg->min_epoch = pg.get_last_peering_reset();
+        msg->pushes.push_back(std::move(push));
+        msg->set_priority(pg.get_recovery_op_priority());
+        return interruptor::make_interruptible(
+            shard_services.send_to_osd(shard.osd,
+                                       std::move(msg),
+                                       pg.get_osdmap_epoch()))
+        .then_interruptible(
+          [this, soid, shard] {
+          return get_recovering(soid).wait_for_pushes(shard);
+        });
       });
     });
   }).then_interruptible([this, soid] {