});
}
-AsyncCleaner::retrieve_live_extents_ret
-AsyncCleaner::_retrieve_live_extents(
- Transaction &t,
- std::set<
- backref_entry_t,
- backref_entry_t::cmp_t> &&backrefs,
- std::vector<CachedExtentRef> &extents)
+AsyncCleaner::do_reclaim_space_ret
+AsyncCleaner::do_reclaim_space(
+ const std::vector<CachedExtentRef> &backref_extents,
+ const backref_pin_list_t &pin_list,
+ std::size_t &reclaimed,
+ std::size_t &runs)
{
- return seastar::do_with(
- std::move(backrefs),
- [this, &t, &extents](auto &backrefs) {
- return trans_intr::parallel_for_each(
- backrefs,
- [this, &extents, &t](auto &ent) {
- LOG_PREFIX(AsyncCleaner::_retrieve_live_extents);
- DEBUGT("getting extent of type {} at {}~{}",
- t,
- ent.type,
- ent.paddr,
- ent.len);
- return ecb->get_extents_if_live(
- t, ent.type, ent.paddr, ent.laddr, ent.len
- ).si_then([&extents, &ent, &t](auto list) {
- LOG_PREFIX(AsyncCleaner::_retrieve_live_extents);
- if (list.empty()) {
- DEBUGT("addr {} dead, skipping", t, ent.paddr);
- } else {
- for (auto &e : list) {
- extents.emplace_back(std::move(e));
- }
- }
- return ExtentCallbackInterface::rewrite_extent_iertr::now();
+ return repeat_eagain([this, &backref_extents,
+ &pin_list, &reclaimed, &runs] {
+ reclaimed = 0;
+ runs++;
+ return ecb->with_transaction_intr(
+ Transaction::src_t::CLEANER_RECLAIM,
+ "reclaim_space",
+ [this, &backref_extents, &pin_list, &reclaimed](auto &t)
+ {
+ return seastar::do_with(
+ std::vector<CachedExtentRef>(backref_extents),
+ [this, &t, &reclaimed, &pin_list](auto &extents)
+ {
+ LOG_PREFIX(AsyncCleaner::do_reclaim_space);
+ // calculate live extents
+ auto cached_backref_entries =
+ backref_manager.get_cached_backref_entries_in_range(
+ reclaim_state->start_pos, reclaim_state->end_pos);
+ backref_entry_query_set_t backref_entries;
+ for (auto &pin : pin_list) {
+ backref_entries.emplace(
+ pin->get_key(),
+ pin->get_val(),
+ pin->get_length(),
+ pin->get_type(),
+ JOURNAL_SEQ_NULL);
+ }
+ for (auto &cached_backref : cached_backref_entries) {
+ if (cached_backref.laddr == L_ADDR_NULL) {
+ auto it = backref_entries.find(cached_backref.paddr);
+ assert(it->len == cached_backref.len);
+ backref_entries.erase(it);
+ } else {
+ backref_entries.emplace(cached_backref);
+ }
+ }
+ // retrieve live extents
+ DEBUGT("start, backref_entries={}, backref_extents={}",
+ t, backref_entries.size(), extents.size());
+ return trans_intr::parallel_for_each(
+ backref_entries,
+ [this, FNAME, &extents, &t](auto &ent)
+ {
+ TRACET("getting extent of type {} at {}~{}",
+ t,
+ ent.type,
+ ent.paddr,
+ ent.len);
+ return ecb->get_extents_if_live(
+ t, ent.type, ent.paddr, ent.laddr, ent.len
+ ).si_then([FNAME, &extents, &ent, &t](auto list) {
+ if (list.empty()) {
+ TRACET("addr {} dead, skipping", t, ent.paddr);
+ } else {
+ for (auto &e : list) {
+ extents.emplace_back(std::move(e));
+ }
+ }
+ });
+ }).si_then([FNAME, &extents, this, &reclaimed, &t] {
+ DEBUGT("reclaim {} extents", t, extents.size());
+ // rewrite live extents
+ auto modify_time = segments[reclaim_state->get_segment_id()].modify_time;
+ return trans_intr::do_for_each(
+ extents,
+ [this, modify_time, &t, &reclaimed](auto ext)
+ {
+ reclaimed += ext->get_length();
+ return ecb->rewrite_extent(
+ t, ext, reclaim_state->target_generation, modify_time);
+ });
+ });
+ }).si_then([this, &t] {
+ return ecb->submit_transaction_direct(t);
});
});
});
[this, FNAME, pavail_ratio, start](
auto &backref_extents, auto &pin_list, auto &reclaimed, auto &runs)
{
- return repeat_eagain([this, &backref_extents, &pin_list, &reclaimed, &runs] {
- reclaimed = 0;
- runs++;
- return ecb->with_transaction_intr(
- Transaction::src_t::CLEANER_RECLAIM,
- "reclaim_space",
- [this, &backref_extents, &pin_list, &reclaimed](auto &t)
- {
- return seastar::do_with(
- std::vector<CachedExtentRef>(backref_extents),
- [this, &t, &pin_list, &reclaimed](auto &extents)
- {
- // calculate live extents
- auto cached_backrefs =
- backref_manager.get_cached_backref_entries_in_range(
- reclaim_state->start_pos, reclaim_state->end_pos);
- std::set<
- backref_entry_t,
- backref_entry_t::cmp_t> backrefs;
- for (auto &pin : pin_list) {
- backrefs.emplace(pin->get_key(), pin->get_val(),
- pin->get_length(), pin->get_type(), journal_seq_t());
- }
- for (auto &backref : cached_backrefs) {
- if (backref.laddr == L_ADDR_NULL) {
- auto it = backrefs.find(backref.paddr);
- assert(it->len == backref.len);
- backrefs.erase(it);
- } else {
- backrefs.emplace(backref.paddr, backref.laddr,
- backref.len, backref.type, backref.seq);
- }
- }
- return _retrieve_live_extents(t, std::move(backrefs), extents
- ).si_then([this, &t, &reclaimed, &extents] {
- auto modify_time = segments[reclaim_state->get_segment_id()].modify_time;
- return trans_intr::do_for_each(
- extents,
- [this, modify_time, &t, &reclaimed](auto &ext)
- {
- reclaimed += ext->get_length();
- return ecb->rewrite_extent(
- t, ext, reclaim_state->target_generation, modify_time);
- });
- });
- }).si_then([this, &t] {
- return ecb->submit_transaction_direct(t);
- });
- });
- }).safe_then([this, FNAME, pavail_ratio, start, &reclaimed, &runs] {
+ return do_reclaim_space(
+ backref_extents,
+ pin_list,
+ reclaimed,
+ runs
+ ).safe_then([this, FNAME, pavail_ratio, start, &reclaimed, &runs] {
stats.reclaiming_bytes += reclaimed;
auto d = seastar::lowres_system_clock::now() - start;
- DEBUG("duration: {}, pavail_ratio before: {}, repeats: {}", d, pavail_ratio, runs);
+ DEBUG("duration: {}, pavail_ratio before: {}, repeats: {}",
+ d, pavail_ratio, runs);
if (reclaim_state->is_complete()) {
auto segment_to_release = reclaim_state->get_segment_id();
INFO("reclaim {} finish, reclaimed alive/total={}",