From: Xuehan Xu Date: Sat, 20 Aug 2022 04:30:26 +0000 (+0800) Subject: crimson/common: parallel_for_each shouldn't accept rvalue reference of containers X-Git-Tag: v18.0.0~175^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d050a85ab426e369f9bbaae0d30616309caa174e;p=ceph-ci.git crimson/common: parallel_for_each shouldn't accept rvalue reference of containers Signed-off-by: Xuehan Xu --- diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h index 1cb6419700c..61660a217bc 100644 --- a/src/crimson/common/interruptible_future.h +++ b/src/crimson/common/interruptible_future.h @@ -1321,7 +1321,7 @@ public: } template - static inline auto parallel_for_each(Container&& container, Func&& func) noexcept { + static inline auto parallel_for_each(Container& container, Func&& func) noexcept { return parallel_for_each( std::begin(container), std::end(container), diff --git a/src/crimson/os/seastore/async_cleaner.cc b/src/crimson/os/seastore/async_cleaner.cc index 3ead6ff0127..e92f2d2092f 100644 --- a/src/crimson/os/seastore/async_cleaner.cc +++ b/src/crimson/os/seastore/async_cleaner.cc @@ -906,27 +906,32 @@ AsyncCleaner::do_reclaim_space( // retrieve live extents DEBUGT("start, backref_entries={}, backref_extents={}", t, backref_entries.size(), extents.size()); - return trans_intr::parallel_for_each( - backref_entries, - [this, FNAME, &extents, &t](auto &ent) - { - TRACET("getting extent of type {} at {}~{}", - t, - ent.type, - ent.paddr, - ent.len); - return ecb->get_extents_if_live( - t, ent.type, ent.paddr, ent.laddr, ent.len - ).si_then([FNAME, &extents, &ent, &t](auto list) { - if (list.empty()) { - TRACET("addr {} dead, skipping", t, ent.paddr); - } else { - for (auto &e : list) { - extents.emplace_back(std::move(e)); - } - } - }); - }).si_then([FNAME, &extents, this, &reclaimed, &t] { + return seastar::do_with( + std::move(backref_entries), + [this, &extents, &t](auto &backref_entries) { + return trans_intr::parallel_for_each( + backref_entries, + [this, &extents, &t](auto &ent) + { + LOG_PREFIX(AsyncCleaner::do_reclaim_space); + TRACET("getting extent of type {} at {}~{}", + t, + ent.type, + ent.paddr, + ent.len); + return ecb->get_extents_if_live( + t, ent.type, ent.paddr, ent.laddr, ent.len + ).si_then([FNAME, &extents, &ent, &t](auto list) { + if (list.empty()) { + TRACET("addr {} dead, skipping", t, ent.paddr); + } else { + for (auto &e : list) { + extents.emplace_back(std::move(e)); + } + } + }); + }); + }).si_then([FNAME, &extents, this, &reclaimed, &t] { DEBUGT("reclaim {} extents", t, extents.size()); // rewrite live extents auto modify_time = segments[reclaim_state->get_segment_id()].modify_time; diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index da95f5351a4..d4b2bfc607e 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -1,5 +1,5 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// vim: ts=8 sw=2 smarttab expandtab #include "include/denc.h" #include "include/intarith.h" @@ -561,7 +561,7 @@ TransactionManager::get_extents_if_live( { auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id(); return trans_intr::parallel_for_each( - std::move(pin_list), + pin_list, [=, this, &list, &t]( LBAPinRef &pin) -> Cache::get_extent_iertr::future<> { diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc index 78dd07976cb..abc123c69e4 100644 --- a/src/crimson/osd/pg_recovery.cc +++ b/src/crimson/osd/pg_recovery.cc @@ -59,7 +59,7 @@ PGRecovery::start_recovery_ops( } using interruptor = crimson::interruptible::interruptor; - return interruptor::parallel_for_each(std::move(started), + return interruptor::parallel_for_each(started, [] (auto&& ifut) { return std::move(ifut); }).then_interruptible([this] { diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index 76ca1bd8b0d..89d74798806 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -174,35 +174,39 @@ RecoveryBackend::scan_for_backfill( return backend->list_objects(start, max).then_interruptible( [this, start, version_map] (auto&& ret) { auto&& [objects, next] = std::move(ret); - return interruptor::parallel_for_each(std::move(objects), - [this, version_map] (const hobject_t& object) - -> interruptible_future<> { - crimson::osd::ObjectContextRef obc; - if (pg.is_primary()) { - obc = shard_services.maybe_get_cached_obc(object); - } - if (obc) { - if (obc->obs.exists) { - logger().debug("scan_for_backfill found (primary): {} {}", - object, obc->obs.oi.version); - version_map->emplace(object, obc->obs.oi.version); - } else { - // if the object does not exist here, it must have been removed - // between the collection_list_partial and here. This can happen - // for the first item in the range, which is usually last_backfill. - } - return seastar::now(); - } else { - return backend->load_metadata(object).safe_then_interruptible( - [version_map, object] (auto md) { - if (md->os.exists) { - logger().debug("scan_for_backfill found: {} {}", - object, md->os.oi.version); - version_map->emplace(object, md->os.oi.version); - } - return seastar::now(); - }, PGBackend::load_metadata_ertr::assert_all{}); - } + return seastar::do_with( + std::move(objects), + [this, version_map](auto &objects) { + return interruptor::parallel_for_each(objects, + [this, version_map] (const hobject_t& object) + -> interruptible_future<> { + crimson::osd::ObjectContextRef obc; + if (pg.is_primary()) { + obc = shard_services.maybe_get_cached_obc(object); + } + if (obc) { + if (obc->obs.exists) { + logger().debug("scan_for_backfill found (primary): {} {}", + object, obc->obs.oi.version); + version_map->emplace(object, obc->obs.oi.version); + } else { + // if the object does not exist here, it must have been removed + // between the collection_list_partial and here. This can happen + // for the first item in the range, which is usually last_backfill. + } + return seastar::now(); + } else { + return backend->load_metadata(object).safe_then_interruptible( + [version_map, object] (auto md) { + if (md->os.exists) { + logger().debug("scan_for_backfill found: {} {}", + object, md->os.oi.version); + version_map->emplace(object, md->os.oi.version); + } + return seastar::now(); + }, PGBackend::load_metadata_ertr::assert_all{}); + } + }); }).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] { BackfillInterval bi; bi.begin = std::move(start); diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index e5660a388f0..3922e5085c8 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -54,23 +54,28 @@ ReplicatedRecoveryBackend::maybe_push_shards( const hobject_t& soid, eversion_t need) { - return interruptor::parallel_for_each(get_shards_to_push(soid), - [this, need, soid](auto shard) { - return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) { - auto msg = crimson::make_message(); - msg->from = pg.get_pg_whoami(); - msg->pgid = pg.get_pgid(); - msg->map_epoch = pg.get_osdmap_epoch(); - msg->min_epoch = pg.get_last_peering_reset(); - msg->pushes.push_back(std::move(push)); - msg->set_priority(pg.get_recovery_op_priority()); - return interruptor::make_interruptible( - shard_services.send_to_osd(shard.osd, - std::move(msg), - pg.get_osdmap_epoch())) - .then_interruptible( - [this, soid, shard] { - return get_recovering(soid).wait_for_pushes(shard); + return seastar::do_with( + get_shards_to_push(soid), + [this, need, soid](auto &shards) { + return interruptor::parallel_for_each( + shards, + [this, need, soid](auto shard) { + return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) { + auto msg = crimson::make_message(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->pushes.push_back(std::move(push)); + msg->set_priority(pg.get_recovery_op_priority()); + return interruptor::make_interruptible( + shard_services.send_to_osd(shard.osd, + std::move(msg), + pg.get_osdmap_epoch())) + .then_interruptible( + [this, soid, shard] { + return get_recovering(soid).wait_for_pushes(shard); + }); }); }); }).then_interruptible([this, soid] {