[this, &reclaimed, &runs, &pin_list]() mutable {
reclaimed = 0;
runs++;
- return seastar::do_with(
- JOURNAL_SEQ_NULL,
- [this, &reclaimed, &pin_list](
- auto &seq) {
- return ecb->with_transaction_intr(
- Transaction::src_t::CLEANER_RECLAIM,
- "reclaim_space",
- [this, &seq, &reclaimed, &pin_list](auto &t) {
- return seastar::do_with(
- std::vector<CachedExtentRef>(),
- [this, &reclaimed, &t, &seq, &pin_list]
- (auto &extents) {
- return backref_manager.retrieve_backref_extents(
- t,
- backref_manager.get_cached_backref_extents_in_range(
- reclaim_state->start_pos, reclaim_state->end_pos),
- extents
- ).si_then([this, &extents, &t, &pin_list] {
- // calculate live extents
- auto backref_set =
- backref_manager.get_cached_backrefs_in_range(
- reclaim_state->start_pos, reclaim_state->end_pos);
- std::set<
- backref_buf_entry_t,
- backref_buf_entry_t::cmp_t> backrefs;
- for (auto &pin : pin_list) {
- backrefs.emplace(pin->get_key(), pin->get_val(),
- pin->get_length(), pin->get_type(), journal_seq_t());
- }
- for (auto &backref : backref_set) {
- if (backref.laddr == L_ADDR_NULL) {
- auto it = backrefs.find(backref.paddr);
- assert(it->len == backref.len);
- backrefs.erase(it);
- } else {
- backrefs.emplace(backref.paddr, backref.laddr,
- backref.len, backref.type, backref.seq);
- }
- }
- // retrieve live extents
- return _retrieve_live_extents(
- t, std::move(backrefs), extents);
- }).si_then([this, &t, &seq] {
- // we need to get the backref_set in range again, because
- // it can change during live extents retrieval
- auto backref_set =
- backref_manager.get_cached_backrefs_in_range(
- reclaim_state->start_pos, reclaim_state->end_pos);
- // calculate the journal seq up to which the backref merge
- // should run
- for (auto &backref : backref_set) {
- if (backref.seq != JOURNAL_SEQ_NULL &&
- (backref.seq > seq || seq == JOURNAL_SEQ_NULL)) {
- seq = backref.seq;
- }
- }
- auto fut = BackrefManager::merge_cached_backrefs_iertr::now();
- if (seq != JOURNAL_SEQ_NULL) {
- fut = backref_manager.merge_cached_backrefs(
- t, seq, std::numeric_limits<uint64_t>::max()
- ).si_then([](auto) {
- return BackrefManager::merge_cached_backrefs_iertr::now();
- });
+ return ecb->with_transaction_intr(
+ Transaction::src_t::CLEANER_RECLAIM,
+ "reclaim_space",
+ [this, &reclaimed, &pin_list](auto &t) {
+ return seastar::do_with(
+ std::vector<CachedExtentRef>(),
+ [this, &reclaimed, &t, &pin_list]
+ (auto &extents) {
+ return backref_manager.retrieve_backref_extents(
+ t,
+ backref_manager.get_cached_backref_extents_in_range(
+ reclaim_state->start_pos, reclaim_state->end_pos),
+ extents
+ ).si_then([this, &extents, &t, &pin_list] {
+ // calculate live extents
+ auto cached_backrefs =
+ backref_manager.get_cached_backrefs_in_range(
+ reclaim_state->start_pos, reclaim_state->end_pos);
+ std::set<
+ backref_buf_entry_t,
+ backref_buf_entry_t::cmp_t> backrefs;
+ for (auto &pin : pin_list) {
+ backrefs.emplace(pin->get_key(), pin->get_val(),
+ pin->get_length(), pin->get_type(), journal_seq_t());
+ }
+ for (auto &backref : cached_backrefs) {
+ if (backref.laddr == L_ADDR_NULL) {
+ auto it = backrefs.find(backref.paddr);
+ assert(it->len == backref.len);
+ backrefs.erase(it);
+ } else {
+ backrefs.emplace(backref.paddr, backref.laddr,
+ backref.len, backref.type, backref.seq);
}
- return fut;
- }).si_then([&extents, this, &t, &reclaimed] {
- auto modify_time = segments[reclaim_state->get_segment_id()].modify_time;
- return trans_intr::do_for_each(
- extents,
- [this, modify_time, &t, &reclaimed](auto &ext) {
- reclaimed += ext->get_length();
- return ecb->rewrite_extent(
- t, ext, reclaim_state->target_generation, modify_time);
- });
- });
- }).si_then([this, &t, &seq] {
- if (reclaim_state->is_complete()) {
- t.mark_segment_to_release(reclaim_state->get_segment_id());
}
- return ecb->submit_transaction_direct(
- t, std::make_optional<journal_seq_t>(std::move(seq)),
- std::make_optional<std::pair<paddr_t, paddr_t>>(
- {reclaim_state->start_pos, reclaim_state->end_pos}));
+ return _retrieve_live_extents(
+ t, std::move(backrefs), extents);
+ }).si_then([&extents, this, &t, &reclaimed] {
+ auto modify_time = segments[reclaim_state->get_segment_id()].modify_time;
+ return trans_intr::do_for_each(
+ extents,
+ [this, modify_time, &t, &reclaimed](auto &ext) {
+ reclaimed += ext->get_length();
+ return ecb->rewrite_extent(
+ t, ext, reclaim_state->target_generation, modify_time);
+ });
});
+ }).si_then([this, &t] {
+ if (reclaim_state->is_complete()) {
+ t.mark_segment_to_release(reclaim_state->get_segment_id());
+ }
+ return ecb->submit_transaction_direct(t);
});
});
});
TransactionManager::submit_transaction_direct_ret
TransactionManager::submit_transaction_direct(
Transaction &tref,
- std::optional<journal_seq_t> seq_to_trim,
- std::optional<std::pair<paddr_t, paddr_t>> gc_range)
+ std::optional<journal_seq_t> seq_to_trim)
{
LOG_PREFIX(TransactionManager::submit_transaction_direct);
SUBTRACET(seastore_t, "start", tref);
}).si_then([this, FNAME, &tref] {
SUBTRACET(seastore_t, "about to prepare", tref);
return tref.get_handle().enter(write_pipeline.prepare);
- }).si_then([this, FNAME, &tref, seq_to_trim=std::move(seq_to_trim),
- gc_range=std::move(gc_range)]() mutable
+ }).si_then([this, FNAME, &tref, seq_to_trim=std::move(seq_to_trim)]() mutable
-> submit_transaction_iertr::future<> {
if (seq_to_trim && *seq_to_trim != JOURNAL_SEQ_NULL) {
cache->trim_backref_bufs(*seq_to_trim);
}
-#ifndef NDEBUG
- if (gc_range) {
- auto backref_set =
- backref_manager->get_cached_backrefs_in_range(
- gc_range->first, gc_range->second);
- for (auto &backref : backref_set) {
- ERRORT("unexpected backref: {}~{}, {}, {}, {}",
- tref, backref.paddr, backref.len, backref.laddr,
- backref.type, backref.seq);
- ceph_abort("impossible");
- }
- }
-#endif
auto record = cache->prepare_record(tref, async_cleaner.get());
tref.get_handle().maybe_release_collection_lock();
return rewrite_extent_iertr::now();
}
- auto fut = rewrite_extent_iertr::now();
if (extent->is_logical()) {
- fut = rewrite_logical_extent(t, extent->cast<LogicalCachedExtent>());
+ return rewrite_logical_extent(t, extent->cast<LogicalCachedExtent>());
} else {
DEBUGT("rewriting physical extent -- {}", t, *extent);
- fut = lba_manager->rewrite_extent(t, extent);
+ return lba_manager->rewrite_extent(t, extent);
}
-
- return fut.si_then([this, extent, &t] {
- t.dont_record_release(extent);
- return backref_manager->remove_mapping(
- t, extent->get_paddr()).si_then([](auto) {
- return seastar::now();
- }).handle_error_interruptible(
- crimson::ct_error::input_output_error::pass_further(),
- crimson::ct_error::assert_all()
- );
- });
}
TransactionManager::get_extents_if_live_ret TransactionManager::get_extents_if_live(