// as parallel transactions may split the extent at the same time.
ceph_assert(paddr.is_absolute_segmented());
- return cache->get_extent_if_cached(t, paddr, len, type
- ).si_then([this, FNAME, type, paddr, laddr, len, &t](auto extent)
- -> get_extents_if_live_ret {
- if (extent) {
- DEBUGT("{} {}~0x{:x} {} is cached and alive -- {}",
- t, type, laddr, len, paddr, *extent);
- assert(extent->get_length() == len);
- std::list<CachedExtentRef> res;
- res.emplace_back(std::move(extent));
- return get_extents_if_live_ret(
- interruptible::ready_future_marker{},
- res);
- }
+ auto extent = co_await cache->get_extent_if_cached(t, paddr, len, type);
+ std::list<CachedExtentRef> res;
+ if (extent) {
+ DEBUGT("{} {}~0x{:x} {} is cached and alive -- {}",
+ t, type, laddr, len, paddr, *extent);
+ assert(extent->get_length() == len);
+ std::list<CachedExtentRef> res;
+ res.emplace_back(std::move(extent));
+ } else if (is_logical_type(type)) {
+ auto pin_list = co_await lba_manager->get_mappings(
+ t,
+ laddr,
+ len
+ );
+ auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id();
+ for (auto &pin : pin_list) {
+ DEBUGT("got pin, try read in parallel ... -- {}", t, pin);
+ auto pin_paddr = pin.get_val();
+ if (!pin_paddr.is_absolute_segmented()) {
+ continue;
+ }
+ auto &pin_seg_paddr = pin_paddr.as_seg_paddr();
+ auto pin_paddr_seg_id = pin_seg_paddr.get_segment_id();
+ // auto pin_len = pin->get_length();
+ if (pin_paddr_seg_id != paddr_seg_id) {
+ continue;
+ }
- if (is_logical_type(type)) {
- return lba_manager->get_mappings(
- t,
- laddr,
- len
- ).si_then([this, FNAME, type, paddr, laddr, len, &t](lba_mapping_list_t pin_list) {
- return seastar::do_with(
- std::list<CachedExtentRef>(),
- std::move(pin_list),
- [this, FNAME, type, paddr, laddr, len, &t]
- (std::list<CachedExtentRef> &extent_list, auto& pin_list)
- {
- auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id();
- return trans_intr::parallel_for_each(
- pin_list,
- [this, FNAME, type, paddr_seg_id, &extent_list, &t](
- LBAMapping& pin) -> Cache::get_extent_iertr::future<>
- {
- DEBUGT("got pin, try read in parallel ... -- {}", t, pin);
- auto pin_paddr = pin.get_val();
- if (!pin_paddr.is_absolute_segmented()) {
- return seastar::now();
- }
- auto &pin_seg_paddr = pin_paddr.as_seg_paddr();
- auto pin_paddr_seg_id = pin_seg_paddr.get_segment_id();
- // auto pin_len = pin->get_length();
- if (pin_paddr_seg_id != paddr_seg_id) {
- return seastar::now();
- }
-
- // pin may be out of the range paddr~len, consider the following scene:
- // 1. Trans.A writes the final record of Segment S, in which it overwrite
- // another extent E in the same segment S;
- // 2. Before Trans.A "complete_commit", Trans.B tries to rewrite new
- // records and roll the segments, which closes Segment S;
- // 3. Before Trans.A "complete_commit", a new cleaner Transaction C tries
- // to clean the segment;
- //
- // In this scenario, C might see a part of extent E's laddr space mapped
- // to another location within the same segment S.
- //
- // FIXME: this assert should be re-enabled once we have space reclaiming
- // recognize committed segments: https://tracker.ceph.com/issues/66941
- // ceph_assert(pin_seg_paddr >= paddr &&
- // pin_seg_paddr.add_offset(pin_len) <= paddr.add_offset(len));
- return read_pin_by_type(t, std::move(pin), type
- ).si_then([&extent_list](auto ret) {
- extent_list.emplace_back(std::move(ret));
- return seastar::now();
- });
- }).si_then([&extent_list, &t, FNAME, type, laddr, len, paddr] {
- DEBUGT("{} {}~0x{:x} {} is alive as {} extents",
- t, type, laddr, len, paddr, extent_list.size());
- return get_extents_if_live_ret(
- interruptible::ready_future_marker{},
- std::move(extent_list));
- });
- });
- }).handle_error_interruptible(crimson::ct_error::enoent::handle([] {
- return get_extents_if_live_ret(
- interruptible::ready_future_marker{},
- std::list<CachedExtentRef>());
- }), crimson::ct_error::pass_further_all{});
+ // pin may be out of the range paddr~len, consider the following scene:
+ // 1. Trans.A writes the final record of Segment S, in which it overwrite
+ // another extent E in the same segment S;
+ // 2. Before Trans.A "complete_commit", Trans.B tries to rewrite new
+ // records and roll the segments, which closes Segment S;
+ // 3. Before Trans.A "complete_commit", a new cleaner Transaction C tries
+ // to clean the segment;
+ //
+ // In this scenario, C might see a part of extent E's laddr space mapped
+ // to another location within the same segment S.
+ //
+ // FIXME: this assert should be re-enabled once we have space reclaiming
+ // recognize committed segments: https://tracker.ceph.com/issues/66941
+ // ceph_assert(pin_seg_paddr >= paddr &&
+ // pin_seg_paddr.add_offset(pin_len) <= paddr.add_offset(len));
+ auto ret = co_await read_pin_by_type(t, std::move(pin), type);
+ res.emplace_back(std::move(ret));
+ }
+ DEBUGT("{} {}~0x{:x} {} is alive as {} extents",
+ t, type, laddr, len, paddr, res.size());
+ } else {
+ auto ext = co_await lba_manager->get_physical_extent_if_live(
+ t,
+ type,
+ paddr,
+ laddr,
+ len
+ );
+ if (ext) {
+ DEBUGT("{} {}~0x{:x} {} is absent and alive as physical extent -- {}",
+ t, type, laddr, len, paddr, *ext);
+ res.emplace_back(std::move(ext));
} else {
- return lba_manager->get_physical_extent_if_live(
- t,
- type,
- paddr,
- laddr,
- len
- ).si_then([=, &t](auto ret) {
- std::list<CachedExtentRef> res;
- if (ret) {
- DEBUGT("{} {}~0x{:x} {} is absent and alive as physical extent -- {}",
- t, type, laddr, len, paddr, *ret);
- res.emplace_back(std::move(ret));
- } else {
- DEBUGT("{} {}~0x{:x} {} is not alive as physical extent",
- t, type, laddr, len, paddr);
- }
- return get_extents_if_live_ret(
- interruptible::ready_future_marker{},
- std::move(res));
- });
+ DEBUGT("{} {}~0x{:x} {} is not alive as physical extent",
+ t, type, laddr, len, paddr);
}
- });
+ }
+ co_return res;
}
TransactionManager::~TransactionManager() {}