Consumers of TransactionManager use wrapper classes InterruptedTransactionManager
and InterruptedTMRef for now until we convert them.
Also converts users of InterruptedCache etc and removes.
Signed-off-by: Samuel Just <sjust@redhat.com>
};
using CacheRef = std::unique_ptr<Cache>;
-#define FORWARD(METHOD) \
- template <typename... Args> \
- auto METHOD(Args&&... args) const { \
- return cache.METHOD(std::forward<Args>(args)...); \
- }
-
-#define PARAM_FORWARD(METHOD) \
- template <typename T, typename... Args> \
- auto METHOD(Args&&... args) const { \
- return cache.METHOD<T>(std::forward<Args>(args)...); \
- }
-
-#define INT_FORWARD(METHOD) \
- template <typename... Args> \
- auto METHOD(Transaction &t, Args&&... args) const { \
- return with_trans_intr( \
- t, \
- [this](auto&&... args) { \
- return cache.METHOD(args...); \
- }, \
- std::forward<Args>(args)...); \
- }
-
-#define PARAM_INT_FORWARD(METHOD) \
- template <typename T, typename... Args> \
- auto METHOD(Transaction &t, Args&&... args) const { \
- return with_trans_intr( \
- t, \
- [this](auto&&... args) { \
- return cache.METHOD<T>(args...); \
- }, \
- std::forward<Args>(args)...); \
- }
-
-/// Temporary translator to non-interruptible futures
-class InterruptedCache {
- Cache &cache;
-public:
- InterruptedCache(Cache &cache) : cache(cache) {}
-
- FORWARD(init)
- FORWARD(mkfs)
- FORWARD(replay_delta)
- FORWARD(init_cached_extents)
- FORWARD(drop_from_cache)
- FORWARD(create_transaction)
- FORWARD(create_weak_transaction)
- FORWARD(try_construct_record)
- FORWARD(complete_commit)
- FORWARD(close)
- FORWARD(dump_contents)
- FORWARD(get_next_dirty_extents)
- FORWARD(update_extent_from_transaction)
- INT_FORWARD(get_extent_if_cached)
- FORWARD(get_oldest_dirty_from)
- PARAM_FORWARD(alloc_new_extent)
- FORWARD(alloc_new_extent_by_type)
- INT_FORWARD(get_extent_by_type)
- INT_FORWARD(retire_extent_addr)
- FORWARD(retire_extent)
- INT_FORWARD(get_root)
- FORWARD(get_root_fast)
- FORWARD(duplicate_for_write)
- PARAM_INT_FORWARD(get_extent)
-};
-
-class InterruptedCacheRef {
- std::unique_ptr<Cache> ref;
- InterruptedCache icache;
-public:
- template <typename... T>
- InterruptedCacheRef(std::unique_ptr<Cache> cache)
- : ref(std::move(cache)), icache(*ref) {}
-
- auto &operator*() {
- return icache;
- }
-
- auto operator->() {
- return &icache;
- }
-};
-
}
namespace crimson::os::seastore::collection_manager {
-CollectionManagerRef create_coll_manager(TransactionManager &trans_manager) {
+CollectionManagerRef create_coll_manager(InterruptedTransactionManager trans_manager) {
return CollectionManagerRef(new FlatCollectionManager(trans_manager));
}
/// Interface for maintaining set of collections
class CollectionManager {
public:
- using base_ertr = TransactionManager::read_extent_ertr;
+ using base_ertr = with_trans_ertr<
+ TransactionManager::read_extent_iertr>;
+
/// Initialize collection manager instance for an empty store
- using mkfs_ertr = TransactionManager::alloc_extent_ertr;
+ using mkfs_ertr = with_trans_ertr<
+ TransactionManager::alloc_extent_iertr>;
using mkfs_ret = mkfs_ertr::future<coll_root_t>;
virtual mkfs_ret mkfs(
Transaction &t) = 0;
namespace collection_manager {
/* creat CollectionMapManager for Collection */
CollectionManagerRef create_coll_manager(
- TransactionManager &trans_manager);
+ InterruptedTransactionManager trans_manager);
}
namespace crimson::os::seastore::collection_manager {
struct coll_context_t {
- TransactionManager &tm;
+ InterruptedTransactionManager tm;
Transaction &t;
};
[[maybe_unused]] constexpr static segment_off_t MAX_FLAT_BLOCK_SIZE = 4<<20;
FlatCollectionManager::FlatCollectionManager(
- TransactionManager &tm)
+ InterruptedTransactionManager tm)
: tm(tm) {}
FlatCollectionManager::mkfs_ret
namespace crimson::os::seastore::collection_manager {
class FlatCollectionManager : public CollectionManager {
- TransactionManager &tm;
+ InterruptedTransactionManager tm;
coll_context_t get_coll_context(Transaction &t) {
return coll_context_t{tm, t};
}
- using get_root_ertr = TransactionManager::read_extent_ertr;
+ using get_root_ertr = base_ertr;
using get_root_ret = get_root_ertr::future<CollectionNodeRef>;
get_root_ret get_coll_root(const coll_root_t &coll_root, Transaction &t);
public:
- explicit FlatCollectionManager(TransactionManager &tm);
+ explicit FlatCollectionManager(InterruptedTransactionManager tm);
mkfs_ret mkfs(Transaction &t) final;
namespace crimson::os::seastore {
-#define LBA_INT_FORWARD(METHOD) \
- template <typename... Args> \
- auto METHOD(Transaction &t, Args&&... args) { \
- return with_trans_intr( \
- t, \
- [this](auto&&... args) { \
- return this->_##METHOD(args...); \
- }, \
- std::forward<Args>(args)...); \
- }
-
/**
* Abstract interface for managing the logical to physical mapping
*/
*
* Future will not resolve until all pins have resolved (set_paddr called)
*/
- using get_mappings_ertr = base_ertr;
using get_mappings_iertr = trans_iertr<base_ertr>;
using get_mappings_ret = get_mappings_iertr::future<lba_pin_list_t>;
- virtual get_mappings_ret _get_mappings(
+ virtual get_mappings_ret get_mappings(
Transaction &t,
laddr_t offset, extent_len_t length) = 0;
*
* Future will not resolve until all pins have resolved (set_paddr called)
*/
- virtual get_mappings_ret _get_mappings(
+ virtual get_mappings_ret get_mappings(
Transaction &t,
laddr_list_t &&extent_lisk) = 0;
- LBA_INT_FORWARD(get_mappings)
/**
* Fetches the mapping for laddr_t
using get_mapping_iertr = base_iertr::extend<
crimson::ct_error::enoent>;
using get_mapping_ret = get_mapping_iertr::future<LBAPinRef>;
- virtual get_mapping_ret _get_mapping(
+ virtual get_mapping_ret get_mapping(
Transaction &t,
laddr_t offset) = 0;
- LBA_INT_FORWARD(get_mapping)
/**
* Finds unmapped laddr extent of len len
using find_hole_ret = find_hole_iertr::future<
std::pair<laddr_t, extent_len_t>
>;
- virtual find_hole_ret _find_hole(
+ virtual find_hole_ret find_hole(
Transaction &t,
laddr_t hint,
extent_len_t) = 0;
- LBA_INT_FORWARD(find_hole)
/**
* Allocates a new mapping referenced by LBARef
using alloc_extent_ertr = base_ertr;
using alloc_extent_iertr = base_iertr;
using alloc_extent_ret = alloc_extent_iertr::future<LBAPinRef>;
- virtual alloc_extent_ret _alloc_extent(
+ virtual alloc_extent_ret alloc_extent(
Transaction &t,
laddr_t hint,
extent_len_t len,
paddr_t addr) = 0;
- LBA_INT_FORWARD(alloc_extent)
/**
* Creates a new absolute mapping.
using set_extent_iertr = base_iertr::extend<
crimson::ct_error::invarg>;
using set_extent_ret = set_extent_iertr::future<LBAPinRef>;
- virtual set_extent_ret _set_extent(
+ virtual set_extent_ret set_extent(
Transaction &t,
laddr_t off, extent_len_t len, paddr_t addr) = 0;
- LBA_INT_FORWARD(set_extent)
-
struct ref_update_result_t {
unsigned refcount = 0;
*
* @return returns resulting refcount
*/
- virtual ref_ret _decref_extent(
+ virtual ref_ret decref_extent(
Transaction &t,
laddr_t addr) = 0;
- LBA_INT_FORWARD(decref_extent)
/**
* Increments ref count on extent
*
* @return returns resulting refcount
*/
- virtual ref_ret _incref_extent(
+ virtual ref_ret incref_extent(
Transaction &t,
laddr_t addr) = 0;
- LBA_INT_FORWARD(incref_extent)
virtual void complete_transaction(
Transaction &t) = 0;
*/
using init_cached_extent_iertr = base_iertr;
using init_cached_extent_ret = init_cached_extent_iertr::future<>;
- virtual init_cached_extent_ret _init_cached_extent(
+ virtual init_cached_extent_ret init_cached_extent(
Transaction &t,
CachedExtentRef e) = 0;
- LBA_INT_FORWARD(init_cached_extent)
/**
* Calls f for each mapping in [begin, end)
using scan_mappings_ret = scan_mappings_iertr::future<>;
using scan_mappings_func_t = std::function<
void(laddr_t, paddr_t, extent_len_t)>;
- virtual scan_mappings_ret _scan_mappings(
+ virtual scan_mappings_ret scan_mappings(
Transaction &t,
laddr_t begin,
laddr_t end,
scan_mappings_func_t &&f) = 0;
- LBA_INT_FORWARD(scan_mappings)
/**
* Calls f for each mapped space usage in [begin, end)
using scan_mapped_space_ret = scan_mapped_space_iertr::future<>;
using scan_mapped_space_func_t = std::function<
void(paddr_t, extent_len_t)>;
- virtual scan_mapped_space_ret _scan_mapped_space(
+ virtual scan_mapped_space_ret scan_mapped_space(
Transaction &t,
scan_mapped_space_func_t &&f) = 0;
- LBA_INT_FORWARD(scan_mapped_space)
/**
* rewrite_extent
*/
using rewrite_extent_iertr = base_iertr;
using rewrite_extent_ret = rewrite_extent_iertr::future<>;
- virtual rewrite_extent_ret _rewrite_extent(
+ virtual rewrite_extent_ret rewrite_extent(
Transaction &t,
CachedExtentRef extent) = 0;
- LBA_INT_FORWARD(rewrite_extent)
/**
* get_physical_extent_if_live
using get_physical_extent_if_live_iertr = base_iertr;
using get_physical_extent_if_live_ret =
get_physical_extent_if_live_iertr::future<CachedExtentRef>;
- virtual get_physical_extent_if_live_ret _get_physical_extent_if_live(
+ virtual get_physical_extent_if_live_ret get_physical_extent_if_live(
Transaction &t,
extent_types_t type,
paddr_t addr,
laddr_t laddr,
segment_off_t len) = 0;
- LBA_INT_FORWARD(get_physical_extent_if_live)
virtual void add_pin(LBAPin &pin) = 0;
}
BtreeLBAManager::get_mappings_ret
-BtreeLBAManager::_get_mappings(
+BtreeLBAManager::get_mappings(
Transaction &t,
laddr_t offset, extent_len_t length)
{
BtreeLBAManager::get_mappings_ret
-BtreeLBAManager::_get_mappings(
+BtreeLBAManager::get_mappings(
Transaction &t,
laddr_list_t &&list)
{
l->begin(),
l->end(),
[this, &t, &ret](const auto &p) {
- return _get_mappings(t, p.first, p.second).si_then(
+ return get_mappings(t, p.first, p.second).si_then(
[&ret](auto res) {
ret.splice(ret.end(), res, res.begin(), res.end());
return get_mappings_iertr::now();
}
BtreeLBAManager::get_mapping_ret
-BtreeLBAManager::_get_mapping(
+BtreeLBAManager::get_mapping(
Transaction &t,
laddr_t offset)
{
}
BtreeLBAManager::find_hole_ret
-BtreeLBAManager::_find_hole(
+BtreeLBAManager::find_hole(
Transaction &t,
laddr_t hint,
extent_len_t len)
}
BtreeLBAManager::alloc_extent_ret
-BtreeLBAManager::_alloc_extent(
+BtreeLBAManager::alloc_extent(
Transaction &t,
laddr_t hint,
extent_len_t len,
}
BtreeLBAManager::set_extent_ret
-BtreeLBAManager::_set_extent(
+BtreeLBAManager::set_extent(
Transaction &t,
laddr_t off, extent_len_t len, paddr_t addr)
{
}
}
-BtreeLBAManager::init_cached_extent_ret BtreeLBAManager::_init_cached_extent(
+BtreeLBAManager::init_cached_extent_ret BtreeLBAManager::init_cached_extent(
Transaction &t,
CachedExtentRef e)
{
});
}
-BtreeLBAManager::scan_mappings_ret BtreeLBAManager::_scan_mappings(
+BtreeLBAManager::scan_mappings_ret BtreeLBAManager::scan_mappings(
Transaction &t,
laddr_t begin,
laddr_t end,
});
}
-BtreeLBAManager::scan_mapped_space_ret BtreeLBAManager::_scan_mapped_space(
+BtreeLBAManager::scan_mapped_space_ret BtreeLBAManager::scan_mapped_space(
Transaction &t,
scan_mapped_space_func_t &&f)
{
});
}
-BtreeLBAManager::rewrite_extent_ret BtreeLBAManager::_rewrite_extent(
+BtreeLBAManager::rewrite_extent_ret BtreeLBAManager::rewrite_extent(
Transaction &t,
CachedExtentRef extent)
{
- if (extent->has_been_invalidated()) {
- logger().debug(
- "BTreeLBAManager::rewrite_extent: {} is invalid, returning eagain",
- *extent
- );
- return crimson::ct_error::eagain::make();
- }
+ assert(!extent->has_been_invalidated());
logger().debug(
"{}: rewriting {}",
}
BtreeLBAManager::get_physical_extent_if_live_ret
-BtreeLBAManager::_get_physical_extent_if_live(
+BtreeLBAManager::get_physical_extent_if_live(
Transaction &t,
extent_types_t type,
paddr_t addr,
mkfs_ret mkfs(
Transaction &t) final;
- get_mappings_ret _get_mappings(
+ get_mappings_ret get_mappings(
Transaction &t,
laddr_t offset, extent_len_t length) final;
- get_mappings_ret _get_mappings(
+ get_mappings_ret get_mappings(
Transaction &t,
laddr_list_t &&list) final;
- get_mapping_ret _get_mapping(
+ get_mapping_ret get_mapping(
Transaction &t,
laddr_t offset) final;
- find_hole_ret _find_hole(
+ find_hole_ret find_hole(
Transaction &t,
laddr_t hint,
extent_len_t) final;
- alloc_extent_ret _alloc_extent(
+ alloc_extent_ret alloc_extent(
Transaction &t,
laddr_t hint,
extent_len_t len,
paddr_t addr) final;
- set_extent_ret _set_extent(
+ set_extent_ret set_extent(
Transaction &t,
laddr_t off, extent_len_t len, paddr_t addr) final;
- ref_ret _decref_extent(
+ ref_ret decref_extent(
Transaction &t,
laddr_t addr) final {
return update_refcount(t, addr, -1);
}
- ref_ret _incref_extent(
+ ref_ret incref_extent(
Transaction &t,
laddr_t addr) final {
return update_refcount(t, addr, 1);
void complete_transaction(
Transaction &t) final;
- init_cached_extent_ret _init_cached_extent(
+ init_cached_extent_ret init_cached_extent(
Transaction &t,
CachedExtentRef e) final;
- scan_mappings_ret _scan_mappings(
+ scan_mappings_ret scan_mappings(
Transaction &t,
laddr_t begin,
laddr_t end,
scan_mappings_func_t &&f) final;
- scan_mapped_space_ret _scan_mapped_space(
+ scan_mapped_space_ret scan_mapped_space(
Transaction &t,
scan_mapped_space_func_t &&f) final;
- rewrite_extent_ret _rewrite_extent(
+ rewrite_extent_ret rewrite_extent(
Transaction &t,
CachedExtentRef extent) final;
- get_physical_extent_if_live_ret _get_physical_extent_if_live(
+ get_physical_extent_if_live_ret get_physical_extent_if_live(
Transaction &t,
extent_types_t type,
paddr_t addr,
class ObjectDataHandler {
public:
- using base_ertr = TransactionManager::base_ertr;
+ using base_ertr = with_trans_ertr<TransactionManager::base_iertr>;
struct context_t {
- TransactionManager &tm;
+ InterruptedTransactionManager tm;
Transaction &t;
Onode &onode;
};
namespace crimson::os::seastore::omap_manager {
OMapManagerRef create_omap_manager(
- TransactionManager &trans_manager) {
+ InterruptedTransactionManager trans_manager) {
return OMapManagerRef(new BtreeOMapManager(trans_manager));
}
* until these functions future resolved.
*/
public:
- using base_ertr = TransactionManager::base_ertr;
+ using base_ertr = with_trans_ertr<TransactionManager::base_iertr>;
/**
* allocate omap tree root node
namespace omap_manager {
OMapManagerRef create_omap_manager (
- TransactionManager &trans_manager);
+ InterruptedTransactionManager trans_manager);
}
}
TransactionManager &tm)
: tm(tm) {}
+BtreeOMapManager::BtreeOMapManager(
+ InterruptedTransactionManager tm)
+ : tm(tm) {}
+
BtreeOMapManager::initialize_omap_ret
BtreeOMapManager::initialize_omap(Transaction &t)
{
*/
class BtreeOMapManager : public OMapManager {
- TransactionManager &tm;
+ InterruptedTransactionManager tm;
omap_context_t get_omap_context(
Transaction &t) {
OMapNode:: mutation_result_t mresult);
public:
+ explicit BtreeOMapManager(InterruptedTransactionManager tm);
explicit BtreeOMapManager(TransactionManager &tm);
initialize_omap_ret initialize_omap(Transaction &t) final;
namespace crimson::os::seastore::omap_manager{
struct omap_context_t {
- TransactionManager &tm;
+ InterruptedTransactionManager tm;
Transaction &t;
};
OnodeTree tree;
public:
- FLTreeOnodeManager(TransactionManager &tm) :
+ FLTreeOnodeManager(InterruptedTransactionManager tm) :
tree(NodeExtentManager::create_seastore(tm)) {}
mkfs_ret mkfs(Transaction &t) {
}
NodeExtentManagerURef NodeExtentManager::create_seastore(
- TransactionManager& tm, laddr_t min_laddr, double p_eagain)
+ InterruptedTransactionManager tm, laddr_t min_laddr, double p_eagain)
{
if (p_eagain == 0.0) {
return NodeExtentManagerURef(
static NodeExtentManagerURef create_dummy(bool is_sync);
static NodeExtentManagerURef create_seastore(
- TransactionManager& tm, laddr_t min_laddr = L_ADDR_MIN, double p_eagain = 0.0);
+ InterruptedTransactionManager tm, laddr_t min_laddr = L_ADDR_MIN, double p_eagain = 0.0);
};
inline std::ostream& operator<<(std::ostream& os, const NodeExtentManager& nm) {
return nm.print(os);
class SeastoreSuper final: public Super {
public:
SeastoreSuper(Transaction& t, RootNodeTracker& tracker,
- laddr_t root_addr, TransactionManager& tm)
+ laddr_t root_addr, InterruptedTransactionManager& tm)
: Super(t, tracker), root_addr{root_addr}, tm{tm} {}
~SeastoreSuper() override = default;
protected:
}
private:
laddr_t root_addr;
- TransactionManager& tm;
+ InterruptedTransactionManager tm;
};
class SeastoreNodeExtent final: public NodeExtent {
class TransactionManagerHandle : public NodeExtentManager {
public:
- TransactionManagerHandle(TransactionManager& tm) : tm{tm} {}
- TransactionManager& tm;
+ TransactionManagerHandle(InterruptedTransactionManager tm) : tm{tm} {}
+ InterruptedTransactionManager tm;
};
template <bool INJECT_EAGAIN=false>
class SeastoreNodeExtentManager final: public TransactionManagerHandle {
public:
SeastoreNodeExtentManager(
- TransactionManager& tm, laddr_t min, double p_eagain)
+ InterruptedTransactionManager tm, laddr_t min, double p_eagain)
: TransactionManagerHandle(tm), addr_min{min}, p_eagain{p_eagain} {
if constexpr (INJECT_EAGAIN) {
assert(p_eagain > 0.0 && p_eagain < 1.0);
auto c = static_cast<SeastoreCollection*>(ch.get());
LOG_PREFIX(SeaStore::get_attr);
DEBUG("{} {}", c->get_cid(), oid);
- using get_attr_ertr = TransactionManager::base_ertr::extend<
- crimson::ct_error::enodata>;
return repeat_with_onode<ceph::bufferlist>(
c, oid, [=](auto &t, auto& onode)
- -> get_attr_ertr::future<ceph::bufferlist> {
+ -> _omap_get_value_ertr::future<ceph::bufferlist> {
auto& layout = onode.get_layout();
if (name == OI_ATTR && layout.oi_size) {
ceph::bufferlist bl;
OMapManager::omap_list_config_t config);
SegmentManagerRef segment_manager;
- TransactionManagerRef transaction_manager;
+ InterruptedTMRef transaction_manager;
CollectionManagerRef collection_manager;
OnodeManagerRef onode_manager;
- using tm_ertr = TransactionManager::base_ertr;
+ using tm_ertr = with_trans_ertr<TransactionManager::base_iertr>;
using tm_ret = tm_ertr::future<>;
tm_ret _do_transaction_step(
internal_context_t &ctx,
Transaction &t,
journal_seq_t limit)
{
- return ecb->get_next_dirty_extents(
- limit,
- config.journal_rewrite_per_cycle
- ).then([=, &t](auto dirty_list) {
+ return trans_intr::make_interruptible(
+ ecb->get_next_dirty_extents(
+ limit,
+ config.journal_rewrite_per_cycle)
+ ).then_interruptible([=, &t](auto dirty_list) {
return seastar::do_with(
std::move(dirty_list),
[this, &t](auto &dirty_list) {
- return crimson::do_for_each(
+ return trans_intr::do_for_each(
dirty_list,
[this, &t](auto &e) {
logger().debug(
[this] {
return seastar::do_with(
ecb->create_transaction(),
- [this](auto &t) {
- return rewrite_dirty(*t, get_dirty_tail()
- ).safe_then([this, &t] {
- return ecb->submit_transaction_direct(
- std::move(t));
+ [this](auto &tref) {
+ return with_trans_intr(*tref, [this, &tref](auto &t) {
+ return rewrite_dirty(t, get_dirty_tail()
+ ).si_then([this, &tref] {
+ return ecb->submit_transaction_direct(
+ std::move(tref));
+ });
});
});
});
extents.size());
return seastar::do_with(
ecb->create_transaction(),
- [this, &extents](auto &t) mutable {
- return crimson::do_for_each(
- extents,
- [this, &t](auto &extent) {
- auto &[addr, info] = extent;
- logger().debug(
- "SegmentCleaner::gc_reclaim_space: checking extent {}",
- info);
- return ecb->get_extent_if_live(
- *t,
- info.type,
- addr,
- info.addr,
- info.len
- ).safe_then([addr=addr, &t, this](CachedExtentRef ext) {
- if (!ext) {
- logger().debug(
- "SegmentCleaner::gc_reclaim_space: addr {} dead, skipping",
- addr);
- return ExtentCallbackInterface::rewrite_extent_ertr::now();
- } else {
- logger().debug(
- "SegmentCleaner::gc_reclaim_space: addr {} alive, gc'ing {}",
- addr,
- *ext);
- return ecb->rewrite_extent(
- *t,
- ext);
- }
- });
- }
- ).safe_then([this, &t] {
- if (scan_cursor->is_complete()) {
- t->mark_segment_to_release(scan_cursor->get_offset().segment);
- }
- return ecb->submit_transaction_direct(std::move(t));
+ [this, &extents](auto &tref) mutable {
+ return with_trans_intr(*tref, [this, &extents, &tref](auto &t) {
+ return trans_intr::do_for_each(
+ extents,
+ [this, &t](auto &extent) {
+ auto &[addr, info] = extent;
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: checking extent {}",
+ info);
+ return ecb->get_extent_if_live(
+ t,
+ info.type,
+ addr,
+ info.addr,
+ info.len
+ ).si_then([addr=addr, &t, this](CachedExtentRef ext) {
+ if (!ext) {
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: addr {} dead, skipping",
+ addr);
+ return ExtentCallbackInterface::rewrite_extent_iertr::now();
+ } else {
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: addr {} alive, gc'ing {}",
+ addr,
+ *ext);
+ return ecb->rewrite_extent(
+ t,
+ ext);
+ }
+ });
+ }
+ ).si_then([this, &tref] {
+ if (scan_cursor->is_complete()) {
+ tref->mark_segment_to_release(scan_cursor->get_offset().segment);
+ }
+ return ecb->submit_transaction_direct(std::move(tref));
+ });
});
});
});
*
* returns all extents with dirty_from < bound
*/
- using get_next_dirty_extents_ertr = crimson::errorator<>;
- using get_next_dirty_extents_ret = get_next_dirty_extents_ertr::future<
+ using get_next_dirty_extents_iertr = crimson::errorator<>;
+ using get_next_dirty_extents_ret = get_next_dirty_extents_iertr::future<
std::vector<CachedExtentRef>>;
virtual get_next_dirty_extents_ret get_next_dirty_extents(
journal_seq_t bound,///< [in] return extents with dirty_from < bound
size_t max_bytes ///< [in] return up to max_bytes of extents
) = 0;
+
using extent_mapping_ertr = crimson::errorator<
crimson::ct_error::input_output_error,
crimson::ct_error::eagain>;
+ using extent_mapping_iertr = trans_iertr<
+ crimson::errorator<
+ crimson::ct_error::input_output_error>
+ >;
/**
* rewrite_extent
* handle finding the current instance if it is still alive and
* otherwise ignore it.
*/
- using rewrite_extent_ertr = extent_mapping_ertr;
- using rewrite_extent_ret = rewrite_extent_ertr::future<>;
+ using rewrite_extent_iertr = extent_mapping_iertr;
+ using rewrite_extent_ret = rewrite_extent_iertr::future<>;
virtual rewrite_extent_ret rewrite_extent(
Transaction &t,
CachedExtentRef extent) = 0;
* See TransactionManager::get_extent_if_live and
* LBAManager::get_physical_extent_if_live.
*/
- using get_extent_if_live_ertr = extent_mapping_ertr;
- using get_extent_if_live_ret = get_extent_if_live_ertr::future<
+ using get_extent_if_live_iertr = extent_mapping_iertr;
+ using get_extent_if_live_ret = get_extent_if_live_iertr::future<
CachedExtentRef>;
virtual get_extent_if_live_ret get_extent_if_live(
Transaction &t,
*
* Submits transaction without any space throttling.
*/
- using submit_transaction_direct_ertr = crimson::errorator<
- crimson::ct_error::eagain,
- crimson::ct_error::input_output_error
+ using submit_transaction_direct_iertr = trans_iertr<
+ crimson::errorator<
+ crimson::ct_error::input_output_error>
>;
using submit_transaction_direct_ret =
- submit_transaction_direct_ertr::future<>;
+ submit_transaction_direct_iertr::future<>;
virtual submit_transaction_direct_ret submit_transaction_direct(
TransactionRef t) = 0;
};
}
using work_ertr = ExtentCallbackInterface::extent_mapping_ertr;
+ using work_iertr = ExtentCallbackInterface::extent_mapping_iertr;
private:
*
* Writes out dirty blocks dirtied earlier than limit.
*/
- using rewrite_dirty_ertr = ExtentCallbackInterface::extent_mapping_ertr;
- using rewrite_dirty_ret = rewrite_dirty_ertr::future<>;
+ using rewrite_dirty_iertr = work_iertr;
+ using rewrite_dirty_ret = rewrite_dirty_iertr::future<>;
rewrite_dirty_ret rewrite_dirty(
Transaction &t,
journal_seq_t limit);
}
} gc_process;
- using gc_ertr = ExtentCallbackInterface::extent_mapping_ertr::extend_ertr<
+ using gc_ertr = work_ertr::extend_ertr<
ExtentCallbackInterface::scan_extents_ertr
>;
return lba_manager->mkfs(*transaction);
}).safe_then([this, FNAME, &transaction] {
DEBUGT("about to submit_transaction", *transaction);
- return submit_transaction_direct(std::move(transaction)).handle_error(
+ return with_trans_intr(
+ *transaction,
+ [this, FNAME, &transaction](auto&) {
+ return submit_transaction_direct(std::move(transaction));
+ }
+ ).handle_error(
crimson::ct_error::eagain::handle([] {
ceph_assert(0 == "eagain impossible");
return mkfs_ertr::now();
segment_cleaner->set_journal_head(addr);
return seastar::do_with(
create_weak_transaction(),
- [this, FNAME](auto &t) {
- return cache->init_cached_extents(*t, [this](auto &t, auto &e) {
- return lba_manager->init_cached_extent(t, e);
- }).safe_then([this, FNAME, &t] {
- assert(segment_cleaner->debug_check_space(
- *segment_cleaner->get_empty_space_tracker()));
- return lba_manager->scan_mapped_space(
- *t,
- [this, FNAME, &t](paddr_t addr, extent_len_t len) {
- TRACET(
- "marking {}~{} used",
+ [this, FNAME](auto &tref) {
+ return with_trans_intr(
+ *tref,
+ [this, FNAME](auto &t) {
+ return cache->init_cached_extents(t, [this](auto &t, auto &e) {
+ return lba_manager->init_cached_extent(t, e);
+ }).si_then([this, FNAME, &t] {
+ assert(segment_cleaner->debug_check_space(
+ *segment_cleaner->get_empty_space_tracker()));
+ return lba_manager->scan_mapped_space(
t,
- addr,
- len);
- if (addr.is_real()) {
- segment_cleaner->mark_space_used(
- addr,
- len ,
- /* init_scan = */ true);
- }
- });
- });
+ [this, FNAME, &t](paddr_t addr, extent_len_t len) {
+ TRACET(
+ "marking {}~{} used",
+ t,
+ addr,
+ len);
+ if (addr.is_real()) {
+ segment_cleaner->mark_space_used(
+ addr,
+ len ,
+ /* init_scan = */ true);
+ }
+ });
+ });
+ });
});
}).safe_then([this] {
segment_cleaner->complete_init();
Transaction &t,
LogicalCachedExtentRef &ref)
{
- return lba_manager->incref_extent(t, ref->get_laddr()).safe_then([](auto r) {
+ return lba_manager->incref_extent(t, ref->get_laddr()).si_then([](auto r) {
return r.refcount;
- }).handle_error(
- ref_ertr::pass_further{},
+ }).handle_error_interruptible(
+ ref_iertr::pass_further{},
ct_error::all_same_way([](auto e) {
ceph_assert(0 == "unhandled error, TODO");
}));
Transaction &t,
laddr_t offset)
{
- return lba_manager->incref_extent(t, offset).safe_then([](auto result) {
+ return lba_manager->incref_extent(t, offset).si_then([](auto result) {
return result.refcount;
});
}
{
LOG_PREFIX(TransactionManager::dec_ref);
return lba_manager->decref_extent(t, ref->get_laddr()
- ).safe_then([this, FNAME, &t, ref](auto ret) {
+ ).si_then([this, FNAME, &t, ref](auto ret) {
if (ret.refcount == 0) {
DEBUGT(
"extent {} refcount 0",
{
LOG_PREFIX(TransactionManager::dec_ref);
return lba_manager->decref_extent(t, offset
- ).safe_then([this, FNAME, offset, &t](auto result) -> ref_ret {
+ ).si_then([this, FNAME, offset, &t](auto result) -> ref_ret {
if (result.refcount == 0 && !result.addr.is_zero()) {
DEBUGT("offset {} refcount 0", t, offset);
return cache->retire_extent_addr(
t, result.addr, result.length
- ).safe_then([result, this] {
+ ).si_then([result, this] {
stats.extents_retired_total++;
stats.extents_retired_bytes += result.length;
return ref_ret(
- ref_ertr::ready_future_marker{},
+ interruptible::ready_future_marker{},
0);
});
} else {
return ref_ret(
- ref_ertr::ready_future_marker{},
+ interruptible::ready_future_marker{},
result.refcount);
}
});
{
return seastar::do_with(std::move(offsets), std::vector<unsigned>(),
[this, &t] (auto &&offsets, auto &refcnt) {
- return crimson::do_for_each(offsets.begin(), offsets.end(),
+ return trans_intr::do_for_each(offsets.begin(), offsets.end(),
[this, &t, &refcnt] (auto &laddr) {
- return this->dec_ref(t, laddr).safe_then([&refcnt] (auto ref) {
+ return this->dec_ref(t, laddr).si_then([&refcnt] (auto ref) {
refcnt.push_back(ref);
- return ref_ertr::now();
+ return ref_iertr::now();
});
- }).safe_then([&refcnt] {
- return ref_ertr::make_ready_future<std::vector<unsigned>>(std::move(refcnt));
+ }).si_then([&refcnt] {
+ return ref_iertr::make_ready_future<std::vector<unsigned>>(std::move(refcnt));
});
});
}
-TransactionManager::submit_transaction_ertr::future<>
+TransactionManager::submit_transaction_iertr::future<>
TransactionManager::submit_transaction(
TransactionRef t)
{
LOG_PREFIX(TransactionManager::submit_transaction);
DEBUGT("about to await throttle", *t);
auto &tref = *t;
- return tref.handle.enter(write_pipeline.wait_throttle
- ).then([this] {
- return segment_cleaner->await_hard_limits();
- }).then([this, t=std::move(t)]() mutable {
+ return trans_intr::make_interruptible(
+ tref.handle.enter(write_pipeline.wait_throttle)
+ ).then_interruptible([this] {
+ return trans_intr::make_interruptible(segment_cleaner->await_hard_limits());
+ }).then_interruptible([this, t=std::move(t)]() mutable {
return submit_transaction_direct(std::move(t));
});
}
LOG_PREFIX(TransactionManager::submit_transaction_direct);
DEBUGT("about to prepare", *t);
auto &tref = *t;
- return tref.handle.enter(write_pipeline.prepare
- ).then([this, FNAME, &tref]() mutable
- -> submit_transaction_ertr::future<> {
+ return trans_intr::make_interruptible(
+ tref.handle.enter(write_pipeline.prepare)
+ ).then_interruptible([this, FNAME, &tref]() mutable
+ -> submit_transaction_iertr::future<> {
auto record = cache->try_construct_record(tref);
- if (!record) {
- DEBUGT("conflict detected, returning eagain.", tref);
- return crimson::ct_error::eagain::make();
- }
+ assert(record); // interruptible future would have already failed
DEBUGT("about to submit to journal", tref);
}).safe_then([&tref] {
return tref.handle.complete();
}).handle_error(
- submit_transaction_ertr::pass_further{},
+ submit_transaction_iertr::pass_further{},
crimson::ct_error::all_same_way([](auto e) {
ceph_assert(0 == "Hit error submitting to journal");
}));
auto updated = cache->update_extent_from_transaction(t, extent);
if (!updated) {
DEBUGT("{} is already retired, skipping", t, *extent);
- return rewrite_extent_ertr::now();
+ return rewrite_extent_iertr::now();
}
extent = updated;
}
if (extent->get_type() == extent_types_t::ROOT) {
DEBUGT("marking root {} for rewrite", t, *extent);
cache->duplicate_for_write(t, extent);
- return rewrite_extent_ertr::now();
+ return rewrite_extent_iertr::now();
}
return lba_manager->rewrite_extent(t, extent);
}
DEBUGT("type {}, addr {}, laddr {}, len {}", t, type, addr, laddr, len);
return cache->get_extent_if_cached(t, addr
- ).safe_then([this, FNAME, &t, type, addr, laddr, len](auto extent)
- -> get_extent_if_live_ret {
+ ).si_then([this, FNAME, &t, type, addr, laddr, len](auto extent)
+ -> get_extent_if_live_ret {
if (extent) {
- return get_extent_if_live_ret(
- get_extent_if_live_ertr::ready_future_marker{},
+ return get_extent_if_live_ret (
+ interruptible::ready_future_marker{},
extent);
}
if (is_logical_type(type)) {
+ using inner_ret = LBAManager::get_mapping_iertr::future<CachedExtentRef>;
return lba_manager->get_mapping(
t,
- laddr).safe_then([=, &t] (LBAPinRef pin) {
+ laddr).si_then([=, &t] (LBAPinRef pin) -> inner_ret {
ceph_assert(pin->get_laddr() == laddr);
ceph_assert(pin->get_length() == (extent_len_t)len);
if (pin->get_paddr() == addr) {
type,
addr,
laddr,
- len).safe_then(
- [this, pin=std::move(pin)](CachedExtentRef ret) mutable
- -> get_extent_if_live_ret {
+ len).si_then(
+ [this, pin=std::move(pin)](CachedExtentRef ret) mutable {
auto lref = ret->cast<LogicalCachedExtent>();
if (!lref->has_pin()) {
- if (pin->has_been_invalidated() ||
- lref->has_been_invalidated()) {
- return crimson::ct_error::eagain::make();
- } else {
- lref->set_pin(std::move(pin));
- lba_manager->add_pin(lref->get_pin());
- }
+ assert(!(pin->has_been_invalidated() ||
+ lref->has_been_invalidated()));
+ lref->set_pin(std::move(pin));
+ lba_manager->add_pin(lref->get_pin());
}
- return get_extent_if_live_ret(
- get_extent_if_live_ertr::ready_future_marker{},
+ return inner_ret(
+ interruptible::ready_future_marker{},
ret);
});
} else {
- return get_extent_if_live_ret(
- get_extent_if_live_ertr::ready_future_marker{},
+ return inner_ret(
+ interruptible::ready_future_marker{},
CachedExtentRef());
}
- }).handle_error(crimson::ct_error::enoent::handle([] {
- return get_extent_if_live_ret(
- get_extent_if_live_ertr::ready_future_marker{},
- CachedExtentRef());
+ }).handle_error_interruptible(crimson::ct_error::enoent::handle([] {
+ return CachedExtentRef();
}), crimson::ct_error::pass_further_all{});
} else {
DEBUGT("non-logical extent {}", t, addr);
class TransactionManager : public SegmentCleaner::ExtentCallbackInterface {
public:
using base_ertr = Cache::base_ertr;
+ using base_iertr = Cache::base_iertr;
TransactionManager(
SegmentManager &segment_manager,
LBAManagerRef lba_manager);
/// Writes initial metadata to disk
- using mkfs_ertr = crimson::errorator<
- crimson::ct_error::input_output_error
- >;
+ using mkfs_ertr = base_ertr;
mkfs_ertr::future<> mkfs();
/// Reads initial metadata from disk
- using mount_ertr = crimson::errorator<
- crimson::ct_error::input_output_error
- >;
+ using mount_ertr = base_ertr;
mount_ertr::future<> mount();
/// Closes transaction_manager
- using close_ertr = crimson::errorator<
- crimson::ct_error::input_output_error
- >;
+ using close_ertr = base_ertr;
close_ertr::future<> close();
/// Creates empty transaction
* Get the logical pin at offset
*/
using get_pin_ertr = LBAManager::get_mapping_ertr;
- using get_pin_ret = LBAManager::get_mapping_ertr::future<LBAPinRef>;
+ using get_pin_iertr = LBAManager::get_mapping_iertr;
+ using get_pin_ret = LBAManager::get_mapping_iertr::future<LBAPinRef>;
get_pin_ret get_pin(
Transaction &t,
laddr_t offset) {
*
* Get logical pins overlapping offset~length
*/
- using get_pins_ertr = LBAManager::get_mappings_ertr;
- using get_pins_ret = get_pins_ertr::future<lba_pin_list_t>;
+ using get_pins_iertr = LBAManager::get_mappings_iertr;
+ using get_pins_ret = get_pins_iertr::future<lba_pin_list_t>;
get_pins_ret get_pins(
Transaction &t,
laddr_t offset,
*
* Get extent mapped at pin.
*/
- using pin_to_extent_ertr = get_pin_ertr::extend_ertr<
+ using pin_to_extent_iertr = get_pin_iertr::extend_ertr<
SegmentManager::read_ertr>;
template <typename T>
- using pin_to_extent_ret = pin_to_extent_ertr::future<
+ using pin_to_extent_ret = pin_to_extent_iertr::future<
TCachedExtentRef<T>>;
template <typename T>
pin_to_extent_ret<T> pin_to_extent(
t,
pin->get_paddr(),
pin->get_length()
- ).safe_then([this, FNAME, &t, pin=std::move(pin)](auto ref) mutable -> ret {
+ ).si_then([this, FNAME, &t, pin=std::move(pin)](auto ref) mutable -> ret {
if (!ref->has_pin()) {
- if (pin->has_been_invalidated() || ref->has_been_invalidated()) {
- return crimson::ct_error::eagain::make();
- } else {
- ref->set_pin(std::move(pin));
- lba_manager->add_pin(ref->get_pin());
- }
+ assert(!(pin->has_been_invalidated() || ref->has_been_invalidated()));
+ ref->set_pin(std::move(pin));
+ lba_manager->add_pin(ref->get_pin());
}
DEBUGT("got extent {}", t, *ref);
return pin_to_extent_ret<T>(
- pin_to_extent_ertr::ready_future_marker{},
+ interruptible::ready_future_marker{},
std::move(ref));
});
}
*
* Read extent of type T at offset~length
*/
- using read_extent_ertr = get_pin_ertr::extend_ertr<
+ using read_extent_iertr = get_pin_iertr::extend_ertr<
SegmentManager::read_ertr>;
template <typename T>
- using read_extent_ret = read_extent_ertr::future<
+ using read_extent_ret = read_extent_iertr::future<
TCachedExtentRef<T>>;
template <typename T>
read_extent_ret<T> read_extent(
LOG_PREFIX(TransactionManager::read_extent);
return get_pin(
t, offset
- ).safe_then([this, FNAME, &t, offset, length] (auto pin) {
+ ).si_then([this, FNAME, &t, offset, length] (auto pin) {
if (length != pin->get_length() || !pin->get_paddr().is_real()) {
ERRORT("offset {} len {} got wrong pin {}",
t, offset, length, *pin);
LOG_PREFIX(TransactionManager::read_extent);
return get_pin(
t, offset
- ).safe_then([this, FNAME, &t, offset] (auto pin) {
+ ).si_then([this, FNAME, &t, offset] (auto pin) {
if (!pin->get_paddr().is_real()) {
ERRORT("offset {} got wrong pin {}",
t, offset, *pin);
}
- using ref_ertr = LBAManager::ref_ertr;
- using ref_ret = ref_ertr::future<unsigned>;
+ using ref_iertr = LBAManager::ref_iertr;
+ using ref_ret = ref_iertr::future<unsigned>;
/// Add refcount for ref
ref_ret inc_ref(
laddr_t offset);
/// remove refcount for list of offset
- using refs_ret = ref_ertr::future<std::vector<unsigned>>;
+ using refs_ret = ref_iertr::future<std::vector<unsigned>>;
refs_ret dec_ref(
Transaction &t,
std::vector<laddr_t> offsets);
* Allocates a new block of type T with the minimum lba range of size len
* greater than hint.
*/
- using alloc_extent_ertr = LBAManager::alloc_extent_ertr;
+ using alloc_extent_iertr = LBAManager::alloc_extent_iertr;
template <typename T>
- using alloc_extent_ret = alloc_extent_ertr::future<TCachedExtentRef<T>>;
+ using alloc_extent_ret = alloc_extent_iertr::future<TCachedExtentRef<T>>;
template <typename T>
alloc_extent_ret<T> alloc_extent(
Transaction &t,
hint,
len,
ext->get_paddr()
- ).safe_then([ext=std::move(ext), len, this](auto &&ref) mutable {
+ ).si_then([ext=std::move(ext), len, this](auto &&ref) mutable {
ext->set_pin(std::move(ref));
stats.extents_allocated_total++;
stats.extents_allocated_bytes += len;
- return alloc_extent_ertr::make_ready_future<TCachedExtentRef<T>>(
+ return alloc_extent_iertr::make_ready_future<TCachedExtentRef<T>>(
std::move(ext));
});
}
- using reserve_extent_ertr = alloc_extent_ertr;
- using reserve_extent_ret = reserve_extent_ertr::future<LBAPinRef>;
+ using reserve_extent_iertr = alloc_extent_iertr;
+ using reserve_extent_ret = reserve_extent_iertr::future<LBAPinRef>;
reserve_extent_ret reserve_region(
Transaction &t,
laddr_t hint,
zero_paddr());
}
- using find_hole_ertr = LBAManager::find_hole_ertr;
- using find_hole_ret = LBAManager::find_hole_ertr::future<
+ using find_hole_iertr = LBAManager::find_hole_iertr;
+ using find_hole_ret = LBAManager::find_hole_iertr::future<
std::pair<laddr_t, extent_len_t>
>;
find_hole_ret find_hole(
*
* allocates more than one new blocks of type T.
*/
- using alloc_extents_ertr = alloc_extent_ertr;
+ using alloc_extents_iertr = alloc_extent_iertr;
template<class T>
- alloc_extents_ertr::future<std::vector<TCachedExtentRef<T>>>
+ alloc_extents_iertr::future<std::vector<TCachedExtentRef<T>>>
alloc_extents(
Transaction &t,
laddr_t hint,
int num) {
return seastar::do_with(std::vector<TCachedExtentRef<T>>(),
[this, &t, hint, len, num] (auto &extents) {
- return crimson::do_for_each(
+ return trans_intr::do_for_each(
boost::make_counting_iterator(0),
boost::make_counting_iterator(num),
[this, &t, len, hint, &extents] (auto i) {
- return alloc_extent<T>(t, hint, len).safe_then(
+ return alloc_extent<T>(t, hint, len).si_then(
[&extents](auto &&node) {
extents.push_back(node);
});
- }).safe_then([&extents] {
- return alloc_extents_ertr::make_ready_future
+ }).si_then([&extents] {
+ return alloc_extents_iertr::make_ready_future
<std::vector<TCachedExtentRef<T>>>(std::move(extents));
});
});
*
* Atomically submits transaction to persistence
*/
- using submit_transaction_ertr = crimson::errorator<
- crimson::ct_error::eagain, // Caller should retry transaction from beginning
- crimson::ct_error::input_output_error // Media error
- >;
- submit_transaction_ertr::future<> submit_transaction(TransactionRef);
+ using submit_transaction_iertr = base_iertr;
+ submit_transaction_iertr::future<> submit_transaction(TransactionRef);
/// SegmentCleaner::ExtentCallbackInterface
using SegmentCleaner::ExtentCallbackInterface::submit_transaction_direct_ret;
*
* Read root block meta entry for key.
*/
- using read_root_meta_ertr = base_ertr;
+ using read_root_meta_iertr = base_iertr;
using read_root_meta_bare = std::optional<std::string>;
- using read_root_meta_ret = read_root_meta_ertr::future<
+ using read_root_meta_ret = read_root_meta_iertr::future<
read_root_meta_bare>;
read_root_meta_ret read_root_meta(
Transaction &t,
const std::string &key) {
return cache->get_root(
t
- ).safe_then([&key](auto root) {
+ ).si_then([&key](auto root) {
auto meta = root->root.get_meta();
auto iter = meta.find(key);
if (iter == meta.end()) {
*
* Update root block meta entry for key to value.
*/
- using update_root_meta_ertr = base_ertr;
- using update_root_meta_ret = update_root_meta_ertr::future<>;
+ using update_root_meta_iertr = base_iertr;
+ using update_root_meta_ret = update_root_meta_iertr::future<>;
update_root_meta_ret update_root_meta(
Transaction& t,
const std::string& key,
const std::string& value) {
return cache->get_root(
t
- ).safe_then([this, &t, &key, &value](RootBlockRef root) {
+ ).si_then([this, &t, &key, &value](RootBlockRef root) {
root = cache->duplicate_for_write(t, root)->cast<RootBlock>();
auto meta = root->root.get_meta();
*
* Get onode-tree root logical address
*/
- using read_onode_root_ertr = base_ertr;
- using read_onode_root_ret = read_onode_root_ertr::future<laddr_t>;
+ using read_onode_root_iertr = base_iertr;
+ using read_onode_root_ret = read_onode_root_iertr::future<laddr_t>;
read_onode_root_ret read_onode_root(Transaction &t) {
- return cache->get_root(t).safe_then([](auto croot) {
+ return cache->get_root(t).si_then([](auto croot) {
laddr_t ret = croot->get_root().onode_root;
return ret;
});
*
* Get collection root addr
*/
- using read_collection_root_ertr = base_ertr;
- using read_collection_root_ret = read_collection_root_ertr::future<
+ using read_collection_root_iertr = base_iertr;
+ using read_collection_root_ret = read_collection_root_iertr::future<
coll_root_t>;
read_collection_root_ret read_collection_root(Transaction &t) {
- return cache->get_root(t).safe_then([](auto croot) {
+ return cache->get_root(t).si_then([](auto croot) {
return croot->get_root().collection_root.get();
});
}
SegmentManager &segment_manager;
SegmentCleanerRef segment_cleaner;
- InterruptedCacheRef cache;
+ CacheRef cache;
LBAManagerRef lba_manager;
JournalRef journal;
};
using TransactionManagerRef = std::unique_ptr<TransactionManager>;
+#define FORWARD(METHOD) \
+ template <typename... Args> \
+ auto METHOD(Args&&... args) const { \
+ return tm.METHOD(std::forward<Args>(args)...); \
+ }
+
+#define PARAM_FORWARD(METHOD) \
+ template <typename T, typename... Args> \
+ auto METHOD(Args&&... args) const { \
+ return tm.METHOD<T>(std::forward<Args>(args)...); \
+ }
+
+#define INT_FORWARD(METHOD) \
+ template <typename... Args> \
+ auto METHOD(Transaction &t, Args&&... args) const { \
+ return with_trans_intr( \
+ t, \
+ [this](auto&&... args) { \
+ return tm.METHOD(std::forward<decltype(args)>(args)...); \
+ }, \
+ std::forward<Args>(args)...); \
+ }
+
+#define PARAM_INT_FORWARD(METHOD) \
+ template <typename T, typename... Args> \
+ auto METHOD(Transaction &t, Args&&... args) const { \
+ return with_trans_intr( \
+ t, \
+ [this](auto&&... args) { \
+ return tm.METHOD<T>(std::forward<decltype(args)>(args)...); \
+ }, \
+ std::forward<Args>(args)...); \
+ }
+
+/// Temporary translator to non-interruptible futures
+class InterruptedTransactionManager {
+ TransactionManager &tm;
+public:
+ InterruptedTransactionManager(const InterruptedTransactionManager &) = default;
+ InterruptedTransactionManager(InterruptedTransactionManager &&) = default;
+ InterruptedTransactionManager(TransactionManager &tm) : tm(tm) {}
+
+ FORWARD(mkfs)
+ FORWARD(mount)
+ FORWARD(close)
+ FORWARD(create_transaction)
+ FORWARD(create_weak_transaction)
+ INT_FORWARD(get_pin)
+ INT_FORWARD(get_pins)
+ PARAM_INT_FORWARD(pin_to_extent)
+ PARAM_INT_FORWARD(read_extent)
+ FORWARD(get_mutable_extent)
+ INT_FORWARD(inc_ref)
+ INT_FORWARD(dec_ref)
+ PARAM_INT_FORWARD(alloc_extent)
+ INT_FORWARD(reserve_region)
+ INT_FORWARD(find_hole)
+ PARAM_INT_FORWARD(alloc_extents)
+
+
+ auto submit_transaction(TransactionRef t) const {
+ return with_trans_intr(
+ *t,
+ [this, t=std::move(t)](auto &) mutable {
+ return tm.submit_transaction(std::move(t));
+ });
+ }
+
+ INT_FORWARD(read_root_meta)
+ INT_FORWARD(update_root_meta)
+ INT_FORWARD(read_onode_root)
+ FORWARD(write_onode_root)
+ INT_FORWARD(read_collection_root)
+ FORWARD(write_collection_root)
+ FORWARD(get_block_size)
+ FORWARD(store_stat)
+
+ FORWARD(get_segment_cleaner)
+ FORWARD(get_lba_manager)
+};
+
+class InterruptedTMRef {
+ std::unique_ptr<TransactionManager> ref;
+ std::optional<InterruptedTransactionManager> itm;
+public:
+ InterruptedTMRef() {}
+
+ template <typename... T>
+ InterruptedTMRef(T&&... args)
+ : ref(std::make_unique<TransactionManager>(std::forward<T>(args)...)),
+ itm(*ref) {}
+
+ InterruptedTMRef(std::unique_ptr<TransactionManager> tm)
+ : ref(std::move(tm)), itm(*ref) {}
+
+ InterruptedTMRef(InterruptedTMRef &&itmr)
+ : ref(std::move(itmr.ref)), itm(*ref) {}
+
+ InterruptedTMRef &operator=(std::unique_ptr<TransactionManager> tm) {
+ this->~InterruptedTMRef();
+ new (this) InterruptedTMRef(std::move(tm));
+ return *this;
+ }
+
+ InterruptedTMRef &operator=(InterruptedTMRef &&rhs) {
+ this->~InterruptedTMRef();
+ new (this) InterruptedTMRef(std::move(rhs));
+ return *this;
+ }
+
+ void reset() {
+ itm = std::nullopt;
+ ref.reset();
+ }
+
+ auto &operator*() const {
+ return *itm;
+ }
+
+ auto operator->() const {
+ return &*itm;
+ }
+};
+
+
+
}
journal->set_segment_provider(&*segment_cleaner);
- tm = std::make_unique<TransactionManager>(
+ tm = InterruptedTMRef(
*segment_manager,
std::move(segment_cleaner),
std::move(journal),
std::unique_ptr<BlockSegmentManager> segment_manager;
using TransactionManager = crimson::os::seastore::TransactionManager;
- std::unique_ptr<TransactionManager> tm;
+ using TMRef = crimson::os::seastore::InterruptedTMRef;
+ TMRef tm;
seastar::future<> mkfs();
void init();
void clear();
- using read_extents_ertr = TransactionManager::read_extent_ertr;
+ using read_extents_ertr = crimson::os::seastore::with_trans_ertr<
+ TransactionManager::read_extent_iertr>;
using read_extents_ret = read_extents_ertr::future<
crimson::os::seastore::lextent_list_t<crimson::os::seastore::TestBlock>
>;
laddr_t hint,
size_t len,
paddr_t paddr) {
- auto ret = lba_manager->alloc_extent(*t.t, hint, len, paddr).unsafe_get0();
+ auto ret = with_trans_intr(
+ *t.t,
+ [=](auto &t) {
+ return lba_manager->alloc_extent(t, hint, len, paddr);
+ }).unsafe_get0();
logger().debug("alloc'd: {}", *ret);
EXPECT_EQ(len, ret->get_length());
auto [b, e] = get_overlap(t, ret->get_laddr(), len);
auto [b, e] = get_overlap(t, addr, len);
EXPECT_EQ(b, e);
- auto ret = lba_manager->set_extent(*t.t, addr, len, paddr).unsafe_get0();
+ auto ret = with_trans_intr(
+ *t.t,
+ [=](auto &t) {
+ return lba_manager->set_extent(t, addr, len, paddr);
+ }).unsafe_get0();
EXPECT_EQ(addr, ret->get_laddr());
EXPECT_EQ(len, ret->get_length());
EXPECT_EQ(paddr, ret->get_paddr());
ceph_assert(target->second.refcount > 0);
target->second.refcount--;
- auto refcnt = lba_manager->decref_extent(
+ auto refcnt = with_trans_intr(
*t.t,
- target->first).unsafe_get0().refcount;
+ [=](auto &t) {
+ return lba_manager->decref_extent(
+ t,
+ target->first);
+ }).unsafe_get0().refcount;
EXPECT_EQ(refcnt, target->second.refcount);
if (target->second.refcount == 0) {
t.mappings.erase(target);
test_lba_mapping_t::iterator target) {
ceph_assert(target->second.refcount > 0);
target->second.refcount++;
- auto refcnt = lba_manager->incref_extent(
+ auto refcnt = with_trans_intr(
*t.t,
- target->first).unsafe_get0().refcount;
+ [=](auto &t) {
+ return lba_manager->incref_extent(
+ t,
+ target->first);
+ }).unsafe_get0().refcount;
EXPECT_EQ(refcnt, target->second.refcount);
}
auto laddr = i.first;
auto len = i.second.len;
- auto ret_list = lba_manager->get_mappings(
- *t.t, laddr, len
- ).unsafe_get0();
+ auto ret_list = with_trans_intr(
+ *t.t,
+ [=](auto &t) {
+ return lba_manager->get_mappings(
+ t, laddr, len);
+ }).unsafe_get0();
EXPECT_EQ(ret_list.size(), 1);
auto &ret = *ret_list.begin();
EXPECT_EQ(i.second.addr, ret->get_paddr());
EXPECT_EQ(laddr, ret->get_laddr());
EXPECT_EQ(len, ret->get_length());
- auto ret_pin = lba_manager->get_mapping(
- *t.t, laddr).unsafe_get0();
+ auto ret_pin = with_trans_intr(
+ *t.t,
+ [=](auto &t) {
+ return lba_manager->get_mapping(
+ t, laddr);
+ }).unsafe_get0();
EXPECT_EQ(i.second.addr, ret_pin->get_paddr());
EXPECT_EQ(laddr, ret_pin->get_laddr());
EXPECT_EQ(len, ret_pin->get_length());
}
- lba_manager->scan_mappings(
+ with_trans_intr(
*t.t,
- 0,
- L_ADDR_MAX,
- [iter=t.mappings.begin(), &t](auto l, auto p, auto len) mutable {
- EXPECT_NE(iter, t.mappings.end());
- EXPECT_EQ(l, iter->first);
- EXPECT_EQ(p, iter->second.addr);
- EXPECT_EQ(len, iter->second.len);
- ++iter;
+ [=, &t](auto &) {
+ return lba_manager->scan_mappings(
+ *t.t,
+ 0,
+ L_ADDR_MAX,
+ [iter=t.mappings.begin(), &t](auto l, auto p, auto len) mutable {
+ EXPECT_NE(iter, t.mappings.end());
+ EXPECT_EQ(l, iter->first);
+ EXPECT_EQ(p, iter->second.addr);
+ EXPECT_EQ(len, iter->second.len);
+ ++iter;
+ });
}).unsafe_get();
}
};
bool check_usage() {
auto t = create_weak_transaction();
SpaceTrackerIRef tracker(segment_cleaner->get_empty_space_tracker());
- lba_manager->scan_mapped_space(
+ with_trans_intr(
*t.t,
- [&tracker](auto offset, auto len) {
- tracker->allocate(
- offset.segment,
- offset.offset,
- len);
+ [this, &tracker](auto &t) {
+ return lba_manager->scan_mapped_space(
+ t,
+ [&tracker](auto offset, auto len) {
+ tracker->allocate(
+ offset.segment,
+ offset.offset,
+ len);
+ });
}).unsafe_get0();
return segment_cleaner->debug_check_space(*tracker);
}
ceph_assert(test_mappings.contains(addr, t.mapping_delta));
ceph_assert(test_mappings.get(addr, t.mapping_delta).desc.len == len);
- using ertr = TransactionManager::read_extent_ertr;
+ using ertr = with_trans_ertr<TransactionManager::read_extent_iertr>;
using ret = ertr::future<TestBlockRef>;
auto ext = tm->read_extent<TestBlock>(
*t.t, addr, len
auto ext = get_extent(t, i.first, i.second.desc.len);
EXPECT_EQ(i.second, ext->get_desc());
}
- lba_manager->scan_mappings(
+ with_trans_intr(
*t.t,
- 0,
- L_ADDR_MAX,
- [iter=overlay.begin(), &overlay](auto l, auto p, auto len) mutable {
- EXPECT_NE(iter, overlay.end());
- logger().debug(
- "check_mappings: scan {}",
- l);
- EXPECT_EQ(l, iter->first);
- ++iter;
+ [this, &overlay](auto &t) {
+ return lba_manager->scan_mappings(
+ t,
+ 0,
+ L_ADDR_MAX,
+ [iter=overlay.begin(), &overlay](auto l, auto p, auto len) mutable {
+ EXPECT_NE(iter, overlay.end());
+ logger().debug(
+ "check_mappings: scan {}",
+ l);
+ EXPECT_EQ(l, iter->first);
+ ++iter;
+ });
}).unsafe_get0();
}
bool try_submit_transaction(test_transaction_t t) {
- using ertr = TransactionManager::submit_transaction_ertr;
+ using ertr = with_trans_ertr<TransactionManager::submit_transaction_iertr>;
using ret = ertr::future<bool>;
bool success = tm->submit_transaction(std::move(t.t)
).safe_then([]() -> ret {
class TMTestState : public EphemeralTestState {
protected:
- std::unique_ptr<TransactionManager> tm;
+ InterruptedTMRef tm;
LBAManager *lba_manager;
SegmentCleaner *segment_cleaner;