}
template <typename Container, typename Func>
- static inline auto parallel_for_each(Container&& container, Func&& func) noexcept {
+ static inline auto parallel_for_each(Container& container, Func&& func) noexcept {
return parallel_for_each(
std::begin(container),
std::end(container),
// retrieve live extents
DEBUGT("start, backref_entries={}, backref_extents={}",
t, backref_entries.size(), extents.size());
- return trans_intr::parallel_for_each(
- backref_entries,
- [this, FNAME, &extents, &t](auto &ent)
- {
- TRACET("getting extent of type {} at {}~{}",
- t,
- ent.type,
- ent.paddr,
- ent.len);
- return ecb->get_extents_if_live(
- t, ent.type, ent.paddr, ent.laddr, ent.len
- ).si_then([FNAME, &extents, &ent, &t](auto list) {
- if (list.empty()) {
- TRACET("addr {} dead, skipping", t, ent.paddr);
- } else {
- for (auto &e : list) {
- extents.emplace_back(std::move(e));
- }
- }
- });
- }).si_then([FNAME, &extents, this, &reclaimed, &t] {
+ return seastar::do_with(
+ std::move(backref_entries),
+ [this, &extents, &t](auto &backref_entries) {
+ return trans_intr::parallel_for_each(
+ backref_entries,
+ [this, &extents, &t](auto &ent)
+ {
+ LOG_PREFIX(AsyncCleaner::do_reclaim_space);
+ TRACET("getting extent of type {} at {}~{}",
+ t,
+ ent.type,
+ ent.paddr,
+ ent.len);
+ return ecb->get_extents_if_live(
+ t, ent.type, ent.paddr, ent.laddr, ent.len
+ ).si_then([FNAME, &extents, &ent, &t](auto list) {
+ if (list.empty()) {
+ TRACET("addr {} dead, skipping", t, ent.paddr);
+ } else {
+ for (auto &e : list) {
+ extents.emplace_back(std::move(e));
+ }
+ }
+ });
+ });
+ }).si_then([FNAME, &extents, this, &reclaimed, &t] {
DEBUGT("reclaim {} extents", t, extents.size());
// rewrite live extents
auto modify_time = segments[reclaim_state->get_segment_id()].modify_time;
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab expandtab
#include "include/denc.h"
#include "include/intarith.h"
{
auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id();
return trans_intr::parallel_for_each(
- std::move(pin_list),
+ pin_list,
[=, this, &list, &t](
LBAPinRef &pin) -> Cache::get_extent_iertr::future<>
{
}
using interruptor =
crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
- return interruptor::parallel_for_each(std::move(started),
+ return interruptor::parallel_for_each(started,
[] (auto&& ifut) {
return std::move(ifut);
}).then_interruptible([this] {
return backend->list_objects(start, max).then_interruptible(
[this, start, version_map] (auto&& ret) {
auto&& [objects, next] = std::move(ret);
- return interruptor::parallel_for_each(std::move(objects),
- [this, version_map] (const hobject_t& object)
- -> interruptible_future<> {
- crimson::osd::ObjectContextRef obc;
- if (pg.is_primary()) {
- obc = shard_services.maybe_get_cached_obc(object);
- }
- if (obc) {
- if (obc->obs.exists) {
- logger().debug("scan_for_backfill found (primary): {} {}",
- object, obc->obs.oi.version);
- version_map->emplace(object, obc->obs.oi.version);
- } else {
- // if the object does not exist here, it must have been removed
- // between the collection_list_partial and here. This can happen
- // for the first item in the range, which is usually last_backfill.
- }
- return seastar::now();
- } else {
- return backend->load_metadata(object).safe_then_interruptible(
- [version_map, object] (auto md) {
- if (md->os.exists) {
- logger().debug("scan_for_backfill found: {} {}",
- object, md->os.oi.version);
- version_map->emplace(object, md->os.oi.version);
- }
- return seastar::now();
- }, PGBackend::load_metadata_ertr::assert_all{});
- }
+ return seastar::do_with(
+ std::move(objects),
+ [this, version_map](auto &objects) {
+ return interruptor::parallel_for_each(objects,
+ [this, version_map] (const hobject_t& object)
+ -> interruptible_future<> {
+ crimson::osd::ObjectContextRef obc;
+ if (pg.is_primary()) {
+ obc = shard_services.maybe_get_cached_obc(object);
+ }
+ if (obc) {
+ if (obc->obs.exists) {
+ logger().debug("scan_for_backfill found (primary): {} {}",
+ object, obc->obs.oi.version);
+ version_map->emplace(object, obc->obs.oi.version);
+ } else {
+ // if the object does not exist here, it must have been removed
+ // between the collection_list_partial and here. This can happen
+ // for the first item in the range, which is usually last_backfill.
+ }
+ return seastar::now();
+ } else {
+ return backend->load_metadata(object).safe_then_interruptible(
+ [version_map, object] (auto md) {
+ if (md->os.exists) {
+ logger().debug("scan_for_backfill found: {} {}",
+ object, md->os.oi.version);
+ version_map->emplace(object, md->os.oi.version);
+ }
+ return seastar::now();
+ }, PGBackend::load_metadata_ertr::assert_all{});
+ }
+ });
}).then_interruptible([version_map, start=std::move(start), next=std::move(next), this] {
BackfillInterval bi;
bi.begin = std::move(start);
const hobject_t& soid,
eversion_t need)
{
- return interruptor::parallel_for_each(get_shards_to_push(soid),
- [this, need, soid](auto shard) {
- return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
- auto msg = crimson::make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->pushes.push_back(std::move(push));
- msg->set_priority(pg.get_recovery_op_priority());
- return interruptor::make_interruptible(
- shard_services.send_to_osd(shard.osd,
- std::move(msg),
- pg.get_osdmap_epoch()))
- .then_interruptible(
- [this, soid, shard] {
- return get_recovering(soid).wait_for_pushes(shard);
+ return seastar::do_with(
+ get_shards_to_push(soid),
+ [this, need, soid](auto &shards) {
+ return interruptor::parallel_for_each(
+ shards,
+ [this, need, soid](auto shard) {
+ return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
+ auto msg = crimson::make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->pushes.push_back(std::move(push));
+ msg->set_priority(pg.get_recovery_op_priority());
+ return interruptor::make_interruptible(
+ shard_services.send_to_osd(shard.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()))
+ .then_interruptible(
+ [this, soid, shard] {
+ return get_recovering(soid).wait_for_pushes(shard);
+ });
});
});
}).then_interruptible([this, soid] {