add_extent(root);
}
-Cache::mkfs_ertr::future<> Cache::mkfs(Transaction &t)
+Cache::mkfs_iertr::future<> Cache::mkfs(Transaction &t)
{
- return with_trans_intr(
- t,
- [this](auto &t) {
- return get_root(t).si_then([this, &t](auto croot) {
- duplicate_for_write(t, croot);
- return base_ertr::now();
- });
- }).handle_error(
- mkfs_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Cache::mkfs"
- }
- );
+ return get_root(t).si_then([this, &t](auto croot) {
+ duplicate_for_write(t, croot);
+ return mkfs_iertr::now();
+ }).handle_error_interruptible(
+ mkfs_iertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in Cache::mkfs"
+ }
+ );
}
Cache::close_ertr::future<> Cache::close()
* Alloc initial root node and add to t. The intention is for other
* components to use t to adjust the resulting root ref prior to commit.
*/
- using mkfs_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- mkfs_ertr::future<> mkfs(Transaction &t);
+ using mkfs_iertr = base_iertr;
+ mkfs_iertr::future<> mkfs(Transaction &t);
/**
* close
*/
class LBAManager {
public:
- using base_ertr = Cache::base_ertr;
using base_iertr = Cache::base_iertr;
- using mkfs_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- using mkfs_ret = mkfs_ertr::future<>;
+ using mkfs_iertr = base_iertr;
+ using mkfs_ret = mkfs_iertr::future<>;
virtual mkfs_ret mkfs(
Transaction &t
) = 0;
*
* Future will not resolve until all pins have resolved (set_paddr called)
*/
- using get_mappings_iertr = trans_iertr<base_ertr>;
+ using get_mappings_iertr = base_iertr;
using get_mappings_ret = get_mappings_iertr::future<lba_pin_list_t>;
virtual get_mappings_ret get_mappings(
Transaction &t,
*
* Future will not resolve until the pin has resolved (set_paddr called)
*/
- using get_mapping_ertr = base_ertr::extend<
- crimson::ct_error::enoent>;
using get_mapping_iertr = base_iertr::extend<
crimson::ct_error::enoent>;
using get_mapping_ret = get_mapping_iertr::future<LBAPinRef>;
/**
* Finds unmapped laddr extent of len len
*/
- using find_hole_ertr = base_ertr;
using find_hole_iertr = base_iertr;
using find_hole_ret = find_hole_iertr::future<
std::pair<laddr_t, extent_len_t>
* This mapping will block from transaction submission until set_paddr
* is called on the LBAPin.
*/
- using alloc_extent_ertr = base_ertr;
using alloc_extent_iertr = base_iertr;
using alloc_extent_ret = alloc_extent_iertr::future<LBAPinRef>;
virtual alloc_extent_ret alloc_extent(
paddr_t addr;
extent_len_t length = 0;
};
- using ref_ertr = base_ertr::extend<
- crimson::ct_error::enoent>;
using ref_iertr = base_iertr::extend<
crimson::ct_error::enoent>;
using ref_ret = ref_iertr::future<ref_update_result_t>;
Transaction &t)
{
logger().debug("BtreeLBAManager::mkfs");
- return with_trans_intr(
- t,
- [this](auto &t) {
- return cache.get_root(t).si_then([this, &t](auto croot) {
- auto root_leaf = cache.alloc_new_extent<LBALeafNode>(
- t,
- LBA_BLOCK_SIZE);
- root_leaf->set_size(0);
- lba_node_meta_t meta{0, L_ADDR_MAX, 1};
- root_leaf->set_meta(meta);
- root_leaf->pin.set_range(meta);
- croot->get_root().lba_root =
- lba_root_t{root_leaf->get_paddr(), 1u};
- return mkfs_ertr::now();
- });
- }).handle_error(
- mkfs_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in BtreeLBAManager::mkfs"
- }
- );
+ return cache.get_root(t).si_then([this, &t](auto croot) {
+ auto root_leaf = cache.alloc_new_extent<LBALeafNode>(
+ t,
+ LBA_BLOCK_SIZE);
+ root_leaf->set_size(0);
+ lba_node_meta_t meta{0, L_ADDR_MAX, 1};
+ root_leaf->set_meta(meta);
+ root_leaf->pin.set_range(meta);
+ croot->get_root().lba_root =
+ lba_root_t{root_leaf->get_paddr(), 1u};
+ return mkfs_iertr::now();
+ }).handle_error_interruptible(
+ mkfs_iertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in BtreeLBAManager::mkfs"
+ }
+ );
}
BtreeLBAManager::get_root_ret
namespace crimson::os::seastore {
class OnodeManager {
- using base_ertr = crimson::errorator<
- crimson::ct_error::eagain>;
-
- using base_iertr = trans_iertr<base_ertr>;
+ using base_iertr = TransactionManager::base_iertr;
public:
- using mkfs_ertr = base_ertr;
- using mkfs_ret = mkfs_ertr::future<>;
+ using mkfs_iertr = base_iertr;
+ using mkfs_ret = mkfs_iertr::future<>;
virtual mkfs_ret mkfs(Transaction &t) = 0;
using contains_onode_iertr = base_iertr;
namespace crimson::os::seastore::onode {
-using eagain_ertr = crimson::errorator<
- crimson::ct_error::eagain>;
-template <class ValueT=void>
-using eagain_future = eagain_ertr::future<ValueT>;
-
-using eagain_iertr = trans_iertr<eagain_ertr>;
+using eagain_iertr = trans_iertr<
+ crimson::errorator<crimson::ct_error::input_output_error> >;
template <class ValueT=void>
using eagain_ifuture = eagain_iertr::future<ValueT>;
make_root(c, std::move(_super));
}
-eagain_future<> Node::mkfs(context_t c, RootNodeTracker& root_tracker)
+eagain_ifuture<> Node::mkfs(context_t c, RootNodeTracker& root_tracker)
{
LOG_PREFIX(OTree::Node::mkfs);
- return with_trans_intr(
- c.t,
- [c, &root_tracker](auto &t) {
- return LeafNode::allocate_root(c, root_tracker);
- }
- ).safe_then([c, FNAME](auto ret) {
+ return LeafNode::allocate_root(c, root_tracker
+ ).si_then([c, FNAME](auto ret) {
INFOT("allocated root {}", c.t, ret->get_name());
});
}
ceph_abort("fatal error");
})
).si_then([c, &root_tracker, FNAME](auto&& _super) {
+ assert(_super);
auto root_addr = _super->get_root_laddr();
assert(root_addr != L_ADDR_NULL);
TRACET("loading root_addr={:x} ...", c.t, root_addr);
})
).si_then([FNAME, c, addr, expect_is_level_tail](auto extent)
-> eagain_ifuture<Ref<Node>> {
+ assert(extent);
auto header = extent->get_header();
auto field_type = header.get_field_type();
if (!field_type) {
eagain_iertr::pass_further{},
crimson::ct_error::assert_all{"Invalid error during test clone"}
).si_then([c_other, cloned_root](auto&& super_other) {
+ assert(super_other);
cloned_root->make_root_new(c_other, std::move(super_other));
return cloned_root;
});
eagain_iertr::pass_further{},
crimson::ct_error::assert_all{"Invalid error during test clone"}
).si_then([c_other, cloned_root](auto&& super_other) {
+ assert(super_other);
cloned_root->make_root_new(c_other, std::move(super_other));
});
}).si_then([this_ref]{});
ceph_abort("fatal error");
})
).si_then([c, root](auto&& super) {
+ assert(super);
root->make_root_new(c, std::move(super));
return root;
});
const std::string& get_name() const;
/// Initializes the tree by allocating an empty root node.
- static eagain_future<> mkfs(context_t, RootNodeTracker&);
+ static eagain_ifuture<> mkfs(context_t, RootNodeTracker&);
/// Loads the tree root. The tree must be initialized.
static eagain_ifuture<Ref<Node>> load_root(context_t, RootNodeTracker&);
).si_then([this, c, FNAME] (auto fresh_extent) {
DEBUGT("update addr from {:#x} to {:#x} ...",
c.t, extent->get_laddr(), fresh_extent->get_laddr());
+ assert(fresh_extent);
assert(fresh_extent->is_initial_pending());
assert(fresh_extent->get_recorder() == nullptr);
assert(get_length() == fresh_extent->get_length());
ceph_abort("fatal error");
})
);
- }).si_then([this] {
+ }).si_then([this, c] {
+ assert(!c.t.is_conflicted());
return *mut;
});
}
ERRORT("ENOENT -- addr={:x}", c.t, addr);
ceph_abort("fatal error");
})
+#ifndef NDEBUG
+ ).si_then([this, c] {
+ assert(!c.t.is_conflicted());
+ }
+#endif
);
}
using crimson::os::seastore::TransactionManager;
class NodeExtentManager {
- using base_ertr = eagain_ertr::extend<
- crimson::ct_error::input_output_error>;
-
- using eagain_iertr = trans_iertr<eagain_ertr>;
- using base_iertr = eagain_iertr::extend<
- crimson::ct_error::input_output_error>;
+ using base_iertr = TransactionManager::base_iertr;
public:
virtual ~NodeExtentManager() = default;
if constexpr (INJECT_EAGAIN) {
if (trigger_eagain()) {
DEBUGT("reading at {:#x}: trigger eagain", t, addr);
- return crimson::ct_error::eagain::make();
+ t.test_set_conflict();
+ return read_iertr::make_ready_future<NodeExtentRef>();
}
}
return tm.read_extent<SeastoreNodeExtent>(t, addr
if constexpr (INJECT_EAGAIN) {
if (trigger_eagain()) {
DEBUGT("allocating {}B: trigger eagain", t, len);
- return crimson::ct_error::eagain::make();
+ t.test_set_conflict();
+ return alloc_iertr::make_ready_future<NodeExtentRef>();
}
}
return tm.alloc_extent<SeastoreNodeExtent>(t, addr_min, len
if (trigger_eagain()) {
DEBUGT("retiring {}B at {:#x} -- {} : trigger eagain",
t, len, addr, *extent);
- return crimson::ct_error::eagain::make();
+ t.test_set_conflict();
+ return retire_iertr::now();
}
}
return tm.dec_ref(t, extent).si_then([addr, len, &t] (unsigned cnt) {
if constexpr (INJECT_EAGAIN) {
if (trigger_eagain()) {
DEBUGT("get root: trigger eagain", t);
- return crimson::ct_error::eagain::make();
+ t.test_set_conflict();
+ return getsuper_iertr::make_ready_future<Super::URef>();
}
}
return tm.read_onode_root(t).si_then([this, &t, &tracker](auto root_addr) {
ceph_abort("fatal error");
})
).si_then([is_level_tail, level](auto extent) {
+ assert(extent);
assert(extent->is_initial_pending());
auto mut = extent->get_mutable();
node_stage_t::bootstrap_extent(
Btree& operator=(const Btree&) = delete;
Btree& operator=(Btree&&) = delete;
- eagain_future<> mkfs(Transaction& t) {
+ eagain_ifuture<> mkfs(Transaction& t) {
return Node::mkfs(get_context(t), *root_tracker);
}
}).safe_then([this] {
return transaction_manager->mount();
}).safe_then([this] {
- return seastar::do_with(
- transaction_manager->create_transaction(
- Transaction::src_t::MUTATE),
- [this](auto &t) {
- return onode_manager->mkfs(*t
- ).safe_then([this, &t] {
- return with_trans_intr(
- *t,
- [this](auto &t) {
- return collection_manager->mkfs(t);
- });
- }).safe_then([this, &t](auto coll_root) {
- transaction_manager->write_collection_root(
- *t,
- coll_root);
- return transaction_manager->submit_transaction(
- *t);
- });
+ return transaction_manager->with_transaction_intr(
+ Transaction::src_t::MUTATE, [this](auto& t) {
+ return onode_manager->mkfs(t
+ ).si_then([this, &t] {
+ return collection_manager->mkfs(t);
+ }).si_then([this, &t](auto coll_root) {
+ transaction_manager->write_collection_root(
+ t, coll_root);
+ return transaction_manager->submit_transaction(t);
});
+ });
}).safe_then([this] {
return umount();
}).handle_error(
return seastar::do_with(
RetType(),
[this, start, end, limit] (auto& ret) {
- return repeat_eagain2([this, start, end, limit, &ret] {
- return seastar::do_with(
- transaction_manager->create_transaction(
- Transaction::src_t::READ),
- [this, start, end, limit, &ret] (auto& t) {
- return with_trans_intr(
- *t,
- [this, start, end, limit](auto &t) {
- return onode_manager->list_onodes(t, start, end, limit);
- }).safe_then([&ret] (auto&& _ret) {
- ret = std::move(_ret);
- });
+ return repeat_eagain([this, start, end, limit, &ret] {
+ return transaction_manager->with_transaction_intr(
+ Transaction::src_t::READ,
+ [this, start, end, limit](auto &t) {
+ return onode_manager->list_onodes(t, start, end, limit);
+ }).safe_then([&ret](auto&& _ret) {
+ ret = std::move(_ret);
});
- }).then([&ret] {
+ }).safe_then([&ret] {
return std::move(ret);
- });
+ }).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in SeaStore::list_objects"
+ }
+ );
});
}
std::vector<coll_t>(),
[this](auto &ret) {
return repeat_eagain([this, &ret] {
-
- return seastar::do_with(
- transaction_manager->create_transaction(
- Transaction::src_t::READ),
- [this, &ret](auto &t) {
- return transaction_manager->read_collection_root(*t
- ).safe_then([this, &t](auto coll_root) {
- return with_trans_intr(
- *t,
- [this, &coll_root](auto &t) {
- return collection_manager->list(
- coll_root,
- t);
- });
- }).safe_then([&ret](auto colls) {
- ret.resize(colls.size());
- std::transform(
- colls.begin(), colls.end(), ret.begin(),
- [](auto p) { return p.first; });
- });
- });
+ return transaction_manager->with_transaction_intr(
+ Transaction::src_t::READ,
+ [this, &ret](auto& t) {
+ return transaction_manager->read_collection_root(t
+ ).si_then([this, &t](auto coll_root) {
+ return collection_manager->list(coll_root, t);
+ }).si_then([&ret](auto colls) {
+ ret.resize(colls.size());
+ std::transform(
+ colls.begin(), colls.end(), ret.begin(),
+ [](auto p) { return p.first; });
+ });
+ });
}).safe_then([&ret] {
- return seastar::make_ready_future<std::vector<coll_t>>(ret);
+ return seastar::make_ready_future<std::vector<coll_t>>(ret);
});
- }).handle_error(
- crimson::ct_error::assert_all{
- "Invalid error in SeaStore::list_collections"
- }
- );
+ }
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in SeaStore::list_collections"
+ }
+ );
}
SeaStore::read_errorator::future<ceph::bufferlist> SeaStore::read(
oid,
Transaction::src_t::READ,
op_type_t::READ,
- [=](auto &t, auto &onode)
- -> with_trans_ertr<ObjectDataHandler::read_iertr>::future<bufferlist> {
+ [=](auto &t, auto &onode) -> ObjectDataHandler::read_ret {
size_t size = onode.get_layout().size;
if (offset >= size) {
size - offset :
std::min(size - offset, len);
- return with_trans_intr(t, [&](auto &t) {
- return ObjectDataHandler().read(
- ObjectDataHandler::context_t{
- transaction_manager->get_tm(),
- t,
- onode,
- },
- offset,
- corrected_len);
- });
+ return ObjectDataHandler().read(
+ ObjectDataHandler::context_t{
+ *transaction_manager,
+ t,
+ onode,
+ },
+ offset,
+ corrected_len);
});
}
oid,
Transaction::src_t::READ,
op_type_t::GET_ATTR,
- [=](auto &t, auto& onode) -> _omap_get_value_ertr::future<ceph::bufferlist> {
+ [=](auto &t, auto& onode) -> _omap_get_value_ret {
auto& layout = onode.get_layout();
if (name == OI_ATTR && layout.oi_size) {
ceph::bufferlist bl;
auto& layout = onode.get_layout();
return _omap_list(layout.xattr_root, t, std::nullopt,
OMapManager::omap_list_config_t::with_inclusive(false)
- ).safe_then([&layout](auto p) {
+ ).si_then([&layout](auto p) {
auto& attrs = std::get<1>(p);
ceph::bufferlist bl;
if (layout.oi_size) {
std::string_view key) const
{
return seastar::do_with(
- BtreeOMapManager(transaction_manager->get_tm()),
+ BtreeOMapManager(*transaction_manager),
std::move(root),
std::string(key),
[&t](auto &manager, auto& root, auto& key) -> _omap_get_value_ret {
if (root.is_null()) {
- return crimson::ct_error::enodata::make();
+ return crimson::ct_error::enodata::make();
}
- return with_trans_intr(
- t,
- [&](auto &t) {
- return manager.omap_get_value(
- root, t, key
- );
- }).safe_then([](auto opt) -> _omap_get_value_ret {
- if (!opt) {
- return crimson::ct_error::enodata::make();
- }
- return seastar::make_ready_future<ceph::bufferlist>(std::move(*opt));
- });
- });
+ return manager.omap_get_value(root, t, key
+ ).si_then([](auto opt) -> _omap_get_value_ret {
+ if (!opt) {
+ return crimson::ct_error::enodata::make();
+ }
+ return seastar::make_ready_future<ceph::bufferlist>(std::move(*opt));
+ });
+ }
+ );
}
SeaStore::_omap_get_values_ret SeaStore::_omap_get_values(
return seastar::make_ready_future<omap_values_t>();
}
return seastar::do_with(
- BtreeOMapManager(transaction_manager->get_tm()),
+ BtreeOMapManager(*transaction_manager),
std::move(omap_root),
omap_values_t(),
[&](auto &manager, auto &root, auto &ret) {
- return with_trans_intr(
- t,
- [&](auto &t) {
- return trans_intr::do_for_each(
- keys.begin(),
- keys.end(),
- [&](auto &key) {
- return manager.omap_get_value(
- root,
- t,
- key
- ).si_then([&ret, &key](auto &&p) {
- if (p) {
- bufferlist bl;
- bl.append(*p);
- ret.emplace(
- std::move(key),
- std::move(bl));
- }
- return seastar::now();
- });
- }).si_then([&ret] {
- return std::move(ret);
- });
- });
- });
+ return trans_intr::do_for_each(
+ keys.begin(),
+ keys.end(),
+ [&](auto &key) {
+ return manager.omap_get_value(
+ root,
+ t,
+ key
+ ).si_then([&ret, &key](auto &&p) {
+ if (p) {
+ bufferlist bl;
+ bl.append(*p);
+ ret.emplace(
+ std::move(key),
+ std::move(bl));
+ }
+ return seastar::now();
+ });
+ }
+ ).si_then([&ret] {
+ return std::move(ret);
+ });
+ }
+ );
}
SeaStore::_omap_list_ret SeaStore::_omap_list(
);
}
return seastar::do_with(
- BtreeOMapManager(transaction_manager->get_tm()),
+ BtreeOMapManager(*transaction_manager),
root,
start,
[&t, config](auto &manager, auto& root, auto& start) {
- return with_trans_intr(
- t,
- [&](auto &t) {
- return manager.omap_list(root, t, start, config);
- });
+ return manager.omap_list(root, t, start, config);
});
}
Transaction::src_t::MUTATE,
op_type_t::TRANSACTION,
[this](auto &ctx) {
- return with_trans_intr(
- *ctx.transaction,
- [&](auto &t) {
+ return with_trans_intr(*ctx.transaction, [&, this](auto &t) {
return onode_manager->get_or_create_onodes(
- *ctx.transaction, ctx.iter.get_objects());
- }
- ).safe_then([this, &ctx](auto &&read_onodes) {
- ctx.onodes = std::move(read_onodes);
- return crimson::repeat(
- [this, &ctx]() -> tm_ertr::future<seastar::stop_iteration> {
- if (ctx.iter.have_op()) {
- return _do_transaction_step(
- ctx, ctx.ch, ctx.onodes, ctx.iter
- ).safe_then([] {
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- } else {
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- };
- });
- }).safe_then([this, &ctx] {
- return with_trans_intr(
- *ctx.transaction,
- [&](auto &t) {
- return onode_manager->write_dirty(*ctx.transaction, ctx.onodes);
- }
- );
- }).safe_then([this, &ctx] {
- // There are some validations in onode tree during onode value
- // destruction in debug mode, which need to be done before calling
- // submit_transaction().
- ctx.onodes.clear();
- return transaction_manager->submit_transaction(*ctx.transaction);
+ *ctx.transaction, ctx.iter.get_objects()
+ ).si_then([this, &ctx](auto &&read_onodes) {
+ ctx.onodes = std::move(read_onodes);
+ return trans_intr::repeat(
+ [this, &ctx]() -> tm_iertr::future<seastar::stop_iteration> {
+ if (ctx.iter.have_op()) {
+ return _do_transaction_step(
+ ctx, ctx.ch, ctx.onodes, ctx.iter
+ ).si_then([] {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ };
+ }
+ );
+ }).si_then([this, &ctx] {
+ return onode_manager->write_dirty(*ctx.transaction, ctx.onodes);
+ }).si_then([this, &ctx] {
+ // There are some validations in onode tree during onode value
+ // destruction in debug mode, which need to be done before calling
+ // submit_transaction().
+ ctx.onodes.clear();
+ return transaction_manager->submit_transaction(*ctx.transaction);
+ });
}).safe_then([&ctx]() {
- for (auto i : {
- ctx.ext_transaction.get_on_applied(),
- ctx.ext_transaction.get_on_commit(),
- ctx.ext_transaction.get_on_applied_sync()}) {
- if (i) {
- i->complete(0);
- }
- }
- return tm_ertr::now();
+ for (auto i : {
+ ctx.ext_transaction.get_on_applied(),
+ ctx.ext_transaction.get_on_commit(),
+ ctx.ext_transaction.get_on_applied_sync()}) {
+ if (i) {
+ i->complete(0);
+ }
+ }
+ return seastar::now();
});
});
}
try {
switch (auto op = i.decode_op(); op->op) {
case Transaction::OP_NOP:
- return tm_ertr::now();
+ return tm_iertr::now();
case Transaction::OP_REMOVE:
{
return _remove(ctx, get_onode(op->oid));
{
ceph::bufferlist hint;
i.decode_bl(hint);
- return tm_ertr::now();
+ return tm_iertr::now();
}
default:
ERROR("bad op {}", static_cast<unsigned>(op->op));
{
LOG_PREFIX(SeaStore::_remove);
DEBUGT("onode={}", *ctx.transaction, *onode);
- return with_trans_intr(
- *ctx.transaction,
- [&](auto &t) {
- return onode_manager->erase_onode(*ctx.transaction, onode);
- }
- );
+ return onode_manager->erase_onode(*ctx.transaction, onode);
}
SeaStore::tm_ret SeaStore::_touch(
{
LOG_PREFIX(SeaStore::_touch);
DEBUGT("onode={}", *ctx.transaction, *onode);
- return tm_ertr::now();
+ return tm_iertr::now();
}
SeaStore::tm_ret SeaStore::_write(
return seastar::do_with(
std::move(_bl),
[=, &ctx, &onode](auto &bl) {
- return with_trans_intr(*ctx.transaction, [&](auto &t) {
- return ObjectDataHandler().write(
- ObjectDataHandler::context_t{
- transaction_manager->get_tm(),
- t,
- *onode,
- },
- offset,
- bl);
- });
+ return ObjectDataHandler().write(
+ ObjectDataHandler::context_t{
+ *transaction_manager,
+ *ctx.transaction,
+ *onode,
+ },
+ offset,
+ bl);
});
}
std::map<std::string, ceph::bufferlist>&& kvs)
{
return seastar::do_with(
- BtreeOMapManager(transaction_manager->get_tm()),
+ BtreeOMapManager(*transaction_manager),
omap_root.get(),
[&, keys=std::move(kvs)](auto &omap_manager, auto &root) {
- return with_trans_intr(
- t,
- [&](auto &t) {
- tm_iertr::future<> maybe_create_root =
- !root.is_null() ?
- tm_iertr::now() :
- omap_manager.initialize_omap(
- t
- ).si_then([&root](auto new_root) {
- root = new_root;
- });
- return maybe_create_root.si_then(
- [&, keys=std::move(keys)]() mutable {
- return omap_manager.omap_set_keys(root, t, std::move(keys));
- }).si_then([&] {
- return tm_iertr::make_ready_future<omap_root_t>(std::move(root));
- }).si_then([&mutable_omap_root](auto root) {
- if (root.must_update()) {
- mutable_omap_root.update(root);
- }
- });
- });
- });
+ tm_iertr::future<> maybe_create_root =
+ !root.is_null() ?
+ tm_iertr::now() :
+ omap_manager.initialize_omap(
+ t
+ ).si_then([&root](auto new_root) {
+ root = new_root;
+ });
+ return maybe_create_root.si_then(
+ [&, keys=std::move(keys)]() mutable {
+ return omap_manager.omap_set_keys(root, t, std::move(keys));
+ }).si_then([&] {
+ return tm_iertr::make_ready_future<omap_root_t>(std::move(root));
+ }).si_then([&mutable_omap_root](auto root) {
+ if (root.must_update()) {
+ mutable_omap_root.update(root);
+ }
+ });
+ }
+ );
}
SeaStore::tm_ret SeaStore::_omap_set_values(
LOG_PREFIX(SeaStore::_omap_set_header);
DEBUGT("{} {} bytes", *ctx.transaction, *onode, header.length());
assert(0 == "not supported yet");
- return tm_ertr::now();
+ return tm_iertr::now();
}
SeaStore::tm_ret SeaStore::_omap_rmkeys(
return seastar::now();
} else {
return seastar::do_with(
- BtreeOMapManager(transaction_manager->get_tm()),
+ BtreeOMapManager(*transaction_manager),
onode->get_layout().omap_root.get(),
std::move(keys),
[&ctx, &onode](
auto &omap_manager,
auto &omap_root,
auto &keys) {
- return with_trans_intr(
- *ctx.transaction,
- [&](auto &t) {
- return trans_intr::do_for_each(
- keys.begin(),
- keys.end(),
- [&](auto &p) {
- return omap_manager.omap_rm_key(
- omap_root,
- *ctx.transaction,
- p);
- }).si_then([&] {
- if (omap_root.must_update()) {
- onode->get_mutable_layout(*ctx.transaction
- ).omap_root.update(omap_root);
- }
- });
- });
- });
+ return trans_intr::do_for_each(
+ keys.begin(),
+ keys.end(),
+ [&](auto &p) {
+ return omap_manager.omap_rm_key(
+ omap_root,
+ *ctx.transaction,
+ p);
+ }
+ ).si_then([&] {
+ if (omap_root.must_update()) {
+ onode->get_mutable_layout(*ctx.transaction
+ ).omap_root.update(omap_root);
+ }
+ });
+ }
+ );
}
}
LOG_PREFIX(SeaStore::_omap_rmkeyrange);
DEBUGT("{} first={} last={}", *ctx.transaction, *onode, first, last);
assert(0 == "not supported yet");
- return tm_ertr::now();
+ return tm_iertr::now();
}
SeaStore::tm_ret SeaStore::_truncate(
LOG_PREFIX(SeaStore::_truncate);
DEBUGT("onode={} size={}", *ctx.transaction, *onode, size);
onode->get_mutable_layout(*ctx.transaction).size = size;
- return with_trans_intr(*ctx.transaction, [&](auto &t) {
- return ObjectDataHandler().truncate(
- ObjectDataHandler::context_t{
- transaction_manager->get_tm(),
- t,
- *onode
- },
- size);
- });
+ return ObjectDataHandler().truncate(
+ ObjectDataHandler::context_t{
+ *transaction_manager,
+ *ctx.transaction,
+ *onode
+ },
+ size);
}
SeaStore::tm_ret SeaStore::_setattrs(
}
if (aset.empty()) {
- return tm_ertr::now();
+ return tm_iertr::now();
}
return _omap_set_kvs(
{
return transaction_manager->read_collection_root(
*ctx.transaction
- ).safe_then([=, &ctx](auto _cmroot) {
+ ).si_then([=, &ctx](auto _cmroot) {
return seastar::do_with(
_cmroot,
[=, &ctx](auto &cmroot) {
- return with_trans_intr(
- *ctx.transaction,
- [=, &cmroot](auto &t) {
- return collection_manager->create(
- cmroot,
- t,
- cid,
- bits);
- }).safe_then([=, &ctx, &cmroot] {
- if (cmroot.must_update()) {
- transaction_manager->write_collection_root(
- *ctx.transaction,
- cmroot);
- }
- });
- });
- }).handle_error(
- tm_ertr::pass_further{},
+ return collection_manager->create(
+ cmroot,
+ *ctx.transaction,
+ cid,
+ bits
+ ).si_then([=, &ctx, &cmroot] {
+ if (cmroot.must_update()) {
+ transaction_manager->write_collection_root(
+ *ctx.transaction,
+ cmroot);
+ }
+ });
+ }
+ );
+ }).handle_error_interruptible(
+ tm_iertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in SeaStore::_create_collection"
}
{
return transaction_manager->read_collection_root(
*ctx.transaction
- ).safe_then([=, &ctx](auto _cmroot) {
+ ).si_then([=, &ctx](auto _cmroot) {
return seastar::do_with(
_cmroot,
[=, &ctx](auto &cmroot) {
- return with_trans_intr(
- *ctx.transaction,
- [=, &cmroot](auto &t) {
- return collection_manager->remove(
- cmroot,
- t,
- cid);
- }).safe_then([=, &ctx, &cmroot] {
- // param here denotes whether it already existed, probably error
- if (cmroot.must_update()) {
- transaction_manager->write_collection_root(
- *ctx.transaction,
- cmroot);
- }
- });
+ return collection_manager->remove(
+ cmroot,
+ *ctx.transaction,
+ cid
+ ).si_then([=, &ctx, &cmroot] {
+ // param here denotes whether it already existed, probably error
+ if (cmroot.must_update()) {
+ transaction_manager->write_collection_root(
+ *ctx.transaction,
+ cmroot);
+ }
+ });
});
- }).handle_error(
- tm_ertr::pass_further{},
+ }).handle_error_interruptible(
+ tm_iertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in SeaStore::_create_collection"
}
LOG_PREFIX(SeaStore::write_meta);
DEBUG("key: {}; value: {}", key, value);
return seastar::do_with(
- TransactionRef(),
- key,
- value,
- [this, FNAME](auto &t, auto& key, auto& value) {
- return repeat_eagain([this, FNAME, &t, &key, &value] {
- t = transaction_manager->create_transaction(
- Transaction::src_t::MUTATE);
- DEBUGT("Have transaction, key: {}; value: {}", *t, key, value);
+ key, value,
+ [this, FNAME](auto& key, auto& value) {
+ return repeat_eagain([this, FNAME, &key, &value] {
+ return transaction_manager->with_transaction_intr(
+ Transaction::src_t::MUTATE,
+ [this, FNAME, &key, &value](auto& t) {
+ DEBUGT("Have transaction, key: {}; value: {}", t, key, value);
return transaction_manager->update_root_meta(
- *t, key, value
- ).safe_then([this, &t] {
- return transaction_manager->submit_transaction(*t);
- });
+ t, key, value
+ ).si_then([this, &t] {
+ return transaction_manager->submit_transaction(t);
+ });
});
- }).handle_error(
- crimson::ct_error::assert_all{"Invalid error in SeaStore::write_meta"}
- );
+ });
+ }).handle_error(
+ crimson::ct_error::assert_all{"Invalid error in SeaStore::write_meta"}
+ );
}
seastar::future<std::tuple<int, std::string>> SeaStore::read_meta(const std::string& key)
LOG_PREFIX(SeaStore::read_meta);
DEBUG("key: {}", key);
return seastar::do_with(
- std::tuple<int, std::string>(),
- TransactionRef(),
- key,
- [this](auto &ret, auto &t, auto& key) {
- return repeat_eagain([this, &ret, &t, &key] {
- t = transaction_manager->create_transaction(
- Transaction::src_t::READ);
- return transaction_manager->read_root_meta(
- *t, key
- ).safe_then([&ret](auto v) {
- if (v) {
- ret = std::make_tuple(0, std::move(*v));
- } else {
- ret = std::make_tuple(-1, std::string(""));
- }
- });
- }).safe_then([&ret] {
- return std::move(ret);
+ std::tuple<int, std::string>(), key,
+ [this](auto &ret, auto& key) {
+ return repeat_eagain([this, &ret, &key] {
+ return transaction_manager->with_transaction_intr(
+ Transaction::src_t::READ,
+ [this, &ret, &key](auto& t) {
+ return transaction_manager->read_root_meta(
+ t, key
+ ).si_then([&ret](auto v) {
+ if (v) {
+ ret = std::make_tuple(0, std::move(*v));
+ } else {
+ ret = std::make_tuple(-1, std::string(""));
+ }
+ });
});
- }).handle_error(
- crimson::ct_error::assert_all{"Invalid error in SeaStore::read_meta"}
- );
+ }).safe_then([&ret] {
+ return std::move(ret);
+ });
+ }).handle_error(
+ crimson::ct_error::assert_all{"Invalid error in SeaStore::read_meta"}
+ );
}
uuid_d SeaStore::get_fsid() const
ceph::os::Transaction::iterator iter;
std::chrono::steady_clock::time_point begin_timestamp = std::chrono::steady_clock::now();
- template <typename TM>
- void reset_preserve_handle(TM &tm) {
- tm->reset_transaction_preserve_handle(*transaction);
+ void reset_preserve_handle(TransactionManager &tm) {
+ tm.reset_transaction_preserve_handle(*transaction);
onodes.clear();
iter = ext_transaction.begin();
}
static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
).then([&, this] {
return repeat_eagain([&, this] {
- ctx.reset_preserve_handle(transaction_manager);
+ ctx.reset_preserve_handle(*transaction_manager);
return std::invoke(f, ctx);
}).handle_error(
crimson::ct_error::eagain::pass_further{},
add_latency_sample(op_type,
std::chrono::steady_clock::now() - ctx.begin_timestamp);
});
- });
+ }
+ );
}
template <typename Ret, typename F>
F &&f) const {
auto begin_time = std::chrono::steady_clock::now();
return seastar::do_with(
- oid,
- Ret{},
- TransactionRef(),
- OnodeRef(),
- std::forward<F>(f),
- [=](auto &oid, auto &ret, auto &t, auto &onode, auto &f) {
- return repeat_eagain([&, this, src] {
- t = transaction_manager->create_transaction(src);
- return with_trans_intr(
- *t,
- [&](auto &t) {
- return onode_manager->get_onode(t, oid);
- }
- ).safe_then([&](auto onode_ret) {
- onode = std::move(onode_ret);
- return f(*t, *onode);
- }).safe_then([&ret](auto _ret) {
- ret = _ret;
- });
- }).safe_then([&ret, op_type, begin_time, this] {
- const_cast<SeaStore*>(this)->add_latency_sample(op_type,
- std::chrono::steady_clock::now() - begin_time);
- return seastar::make_ready_future<Ret>(ret);
- });
+ oid, Ret{}, OnodeRef(), std::forward<F>(f),
+ [this, src, op_type, begin_time](auto &oid, auto &ret, auto &onode, auto &f) {
+ return repeat_eagain([&, this, src] {
+ return transaction_manager->with_transaction_intr(
+ src, [&, this](auto& t) {
+ return onode_manager->get_onode(t, oid
+ ).si_then([&](auto onode_ret) {
+ onode = std::move(onode_ret);
+ return f(t, *onode);
+ }).si_then([&ret](auto _ret) {
+ ret = _ret;
+ });
+ });
+ }).safe_then([&ret, op_type, begin_time, this] {
+ const_cast<SeaStore*>(this)->add_latency_sample(op_type,
+ std::chrono::steady_clock::now() - begin_time);
+ return seastar::make_ready_future<Ret>(ret);
});
+ });
}
- using _omap_get_value_ertr = with_trans_ertr<OMapManager::base_iertr>::extend<
+ using _omap_get_value_iertr = OMapManager::base_iertr::extend<
crimson::ct_error::enodata
>;
- using _omap_get_value_ret = _omap_get_value_ertr::future<ceph::bufferlist>;
+ using _omap_get_value_ret = _omap_get_value_iertr::future<ceph::bufferlist>;
_omap_get_value_ret _omap_get_value(
Transaction &t,
omap_root_t &&root,
std::string_view key) const;
- using _omap_get_values_ertr = with_trans_ertr<OMapManager::base_iertr>;
- using _omap_get_values_ret = _omap_get_values_ertr::future<omap_values_t>;
+ using _omap_get_values_iertr = OMapManager::base_iertr;
+ using _omap_get_values_ret = _omap_get_values_iertr::future<omap_values_t>;
_omap_get_values_ret _omap_get_values(
Transaction &t,
omap_root_t &&root,
const omap_keys_t &keys) const;
using _omap_list_bare_ret = OMapManager::omap_list_bare_ret;
- using _omap_list_ret =
- _omap_get_values_ertr::future<OMapManager::omap_list_bare_ret>;
+ using _omap_list_ret = OMapManager::omap_list_ret;
_omap_list_ret _omap_list(
const omap_root_le_t& omap_root,
Transaction& t,
OMapManager::omap_list_config_t config);
SegmentManagerRef segment_manager;
- InterruptedTMRef transaction_manager;
+ TransactionManagerRef transaction_manager;
CollectionManagerRef collection_manager;
OnodeManagerRef onode_manager;
using tm_iertr = TransactionManager::base_iertr;
- using tm_ertr = with_trans_ertr<tm_iertr>;
- using tm_ret = tm_ertr::future<>;
+ using tm_ret = tm_iertr::future<>;
tm_ret _do_transaction_step(
internal_context_t &ctx,
CollectionRef &col,
tm_ret _remove_collection(
internal_context_t &ctx,
const coll_t& cid);
- using omap_set_kvs_ret = tm_ertr::future<>;
+ using omap_set_kvs_ret = tm_iertr::future<>;
omap_set_kvs_ret _omap_set_kvs(
const omap_root_le_t& omap_root,
Transaction& t,
SegmentCleaner::gc_trim_journal_ret SegmentCleaner::gc_trim_journal()
{
- return repeat_eagain(
- [this] {
- return seastar::do_with(
- ecb->create_transaction(Transaction::src_t::CLEANER),
- [this](auto &tref) {
- return with_trans_intr(*tref, [this](auto &t) {
- return rewrite_dirty(t, get_dirty_tail()
- ).si_then([this, &t] {
- return ecb->submit_transaction_direct(
- t);
- });
- });
- });
+ return repeat_eagain([this] {
+ return ecb->with_transaction_intr(
+ Transaction::src_t::CLEANER, [this](auto& t) {
+ return rewrite_dirty(t, get_dirty_tail()
+ ).si_then([this, &t] {
+ return ecb->submit_transaction_direct(t);
+ });
});
+ });
}
SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
config.reclaim_bytes_stride
).safe_then([this](auto &&_extents) {
return seastar::do_with(
- std::move(_extents),
- [this](auto &extents) {
- return repeat_eagain([this, &extents]() mutable {
- logger().debug(
- "SegmentCleaner::gc_reclaim_space: processing {} extents",
- extents.size());
- return seastar::do_with(
- ecb->create_transaction(Transaction::src_t::CLEANER),
- [this, &extents](auto &tref) mutable {
- return with_trans_intr(*tref, [this, &extents](auto &t) {
- return trans_intr::do_for_each(
- extents,
- [this, &t](auto &extent) {
- auto &[addr, info] = extent;
- logger().debug(
- "SegmentCleaner::gc_reclaim_space: checking extent {}",
- info);
- return ecb->get_extent_if_live(
- t,
- info.type,
- addr,
- info.addr,
- info.len
- ).si_then([addr=addr, &t, this](CachedExtentRef ext) {
- if (!ext) {
- logger().debug(
- "SegmentCleaner::gc_reclaim_space: addr {} dead, skipping",
- addr);
- return ExtentCallbackInterface::rewrite_extent_iertr::now();
- } else {
- logger().debug(
- "SegmentCleaner::gc_reclaim_space: addr {} alive, gc'ing {}",
- addr,
- *ext);
- return ecb->rewrite_extent(
- t,
- ext);
- }
- });
- }
- ).si_then([this, &t] {
- if (scan_cursor->is_complete()) {
- t.mark_segment_to_release(scan_cursor->get_offset().segment);
- }
- return ecb->submit_transaction_direct(t);
- });
- });
- });
- });
+ std::move(_extents),
+ [this](auto &extents) {
+ return repeat_eagain([this, &extents]() mutable {
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: processing {} extents",
+ extents.size());
+ return ecb->with_transaction_intr(
+ Transaction::src_t::CLEANER,
+ [this, &extents](auto& t) {
+ return trans_intr::do_for_each(
+ extents,
+ [this, &t](auto &extent) {
+ auto &[addr, info] = extent;
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: checking extent {}",
+ info);
+ return ecb->get_extent_if_live(
+ t,
+ info.type,
+ addr,
+ info.addr,
+ info.len
+ ).si_then([addr=addr, &t, this](CachedExtentRef ext) {
+ if (!ext) {
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: addr {} dead, skipping",
+ addr);
+ return ExtentCallbackInterface::rewrite_extent_iertr::now();
+ } else {
+ logger().debug(
+ "SegmentCleaner::gc_reclaim_space: addr {} alive, gc'ing {}",
+ addr,
+ *ext);
+ return ecb->rewrite_extent(
+ t,
+ ext);
+ }
+ });
+ }).si_then([this, &t] {
+ if (scan_cursor->is_complete()) {
+ t.mark_segment_to_release(scan_cursor->get_offset().segment);
+ }
+ return ecb->submit_transaction_direct(t);
+ });
+ });
});
+ });
}).safe_then([this] {
if (scan_cursor->is_complete()) {
scan_cursor.reset();
virtual TransactionRef create_transaction(Transaction::src_t) = 0;
+ /// Creates empty transaction with interruptible context
+ template <typename Func>
+ auto with_transaction_intr(Transaction::src_t src, Func &&f) {
+ return seastar::do_with(
+ create_transaction(src),
+ [f=std::forward<Func>(f)](auto &ref_t) mutable {
+ return with_trans_intr(
+ *ref_t,
+ [f=std::forward<Func>(f)](auto& t) mutable {
+ return f(t);
+ }
+ );
+ }
+ );
+ }
+
/**
* get_next_dirty_extent
*
return weak;
}
+ void test_set_conflict() {
+ conflicted = true;
+ }
+
bool is_conflicted() const {
return conflicted;
}
return journal->open_for_write().safe_then([this, FNAME](auto addr) {
DEBUG("about to do_with");
segment_cleaner->init_mkfs(addr);
- return seastar::do_with(
- create_transaction(Transaction::src_t::INIT),
- [this, FNAME](auto &transaction) {
- DEBUGT(
- "about to cache->mkfs",
- *transaction);
- cache->init();
- return cache->mkfs(*transaction
- ).safe_then([this, &transaction] {
- return lba_manager->mkfs(*transaction);
- }).safe_then([this, FNAME, &transaction] {
- DEBUGT("about to submit_transaction", *transaction);
- return with_trans_intr(
- *transaction,
- [this, &transaction](auto&) {
- return submit_transaction_direct(*transaction);
- }
- ).handle_error(
- crimson::ct_error::eagain::handle([] {
- ceph_assert(0 == "eagain impossible");
- return mkfs_ertr::now();
- }),
- mkfs_ertr::pass_further{}
- );
- });
+ return with_transaction_intr(
+ Transaction::src_t::INIT, [this, FNAME](auto& t) {
+ DEBUGT("about to cache->mkfs", t);
+ cache->init();
+ return cache->mkfs(t
+ ).si_then([this, &t] {
+ return lba_manager->mkfs(t);
+ }).si_then([this, FNAME, &t] {
+ DEBUGT("about to submit_transaction", t);
+ return submit_transaction_direct(t);
});
+ }).handle_error(
+ crimson::ct_error::eagain::handle([] {
+ ceph_assert(0 == "eagain impossible");
+ return mkfs_ertr::now();
+ }),
+ mkfs_ertr::pass_further{}
+ );
}).safe_then([this] {
return close();
});
});
}
-// non-errorated version
-template <typename F>
-auto repeat_eagain2(F &&f) {
- LOG_PREFIX("repeat_eagain");
- return seastar::do_with(
- std::forward<F>(f),
- [FNAME](auto &f) {
- return seastar::repeat(
- [FNAME, &f] {
- return std::invoke(f
- ).safe_then([] {
- return seastar::stop_iteration::yes;
- }).handle_error(
- [FNAME](const crimson::ct_error::eagain &e) {
- DEBUG("hit eagain, restarting");
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- }
- );
- });
- });
-}
-
/**
* TransactionManager
*
*
* Get the logical pin at offset
*/
- using get_pin_ertr = LBAManager::get_mapping_ertr;
using get_pin_iertr = LBAManager::get_mapping_iertr;
using get_pin_ret = LBAManager::get_mapping_iertr::future<LBAPinRef>;
get_pin_ret get_pin(
logger().debug("Writing offset {}", offset);
assert(offset % segment_manager->get_block_size() == 0);
assert((ptr.length() % (size_t)segment_manager->get_block_size()) == 0);
- return repeat_eagain([this, offset, ptr=std::move(ptr)] {
- return seastar::do_with(
- tm->create_transaction(Transaction::src_t::MUTATE),
- ptr,
- [this, offset](auto &t, auto &ptr) mutable {
- return tm->dec_ref(
- *t,
- offset
- ).safe_then([](auto){}).handle_error(
- crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
- crimson::ct_error::pass_further_all{}
- ).safe_then([=, &t, &ptr] {
- logger().debug("dec_ref complete");
- return tm->alloc_extent<TestBlock>(
- *t,
- offset,
- ptr.length());
- }).safe_then([=, &t, &ptr](auto ext) mutable {
- assert(ext->get_laddr() == (size_t)offset);
- assert(ext->get_bptr().length() == ptr.length());
- ext->get_bptr().swap(ptr);
- logger().debug("submitting transaction");
- return tm->submit_transaction(*t);
- });
+ return seastar::do_with(ptr, [this, offset](auto& ptr) {
+ return repeat_eagain([this, offset, &ptr] {
+ return tm->with_transaction_intr(
+ Transaction::src_t::MUTATE,
+ [this, offset, &ptr](auto& t) {
+ return tm->dec_ref(t, offset
+ ).si_then([](auto){}).handle_error_interruptible(
+ crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
+ crimson::ct_error::pass_further_all{}
+ ).si_then([this, offset, &t, &ptr] {
+ logger().debug("dec_ref complete");
+ return tm->alloc_extent<TestBlock>(t, offset, ptr.length());
+ }).si_then([this, offset, &t, &ptr](auto ext) {
+ assert(ext->get_laddr() == (size_t)offset);
+ assert(ext->get_bptr().length() == ptr.length());
+ ext->get_bptr().swap(ptr);
+ logger().debug("submitting transaction");
+ return tm->submit_transaction(t);
+ });
});
+ });
}).handle_error(
crimson::ct_error::assert_all{"store-nbd write"}
);
[this, &t, offset, length](auto &pins, auto &ret) {
return tm->get_pins(
t, offset, length
- ).safe_then([this, &t, &pins, &ret](auto _pins) {
+ ).si_then([this, &t, &pins, &ret](auto _pins) {
_pins.swap(pins);
logger().debug("read_extents: mappings {}", pins);
- return crimson::do_for_each(
+ return trans_intr::do_for_each(
pins.begin(),
pins.end(),
[this, &t, &ret](auto &&pin) {
return tm->pin_to_extent<TestBlock>(
t,
std::move(pin)
- ).safe_then([this, &ret](auto ref) mutable {
+ ).si_then([this, &ret](auto ref) mutable {
ret.push_back(std::make_pair(ref->get_laddr(), ref));
logger().debug(
"read_extents: got extent {}",
*ref);
return seastar::now();
});
- }).safe_then([&ret] {
+ }).si_then([&ret] {
return std::move(ret);
});
});
auto blptrret = std::make_unique<bufferlist>();
auto &blret = *blptrret;
return repeat_eagain([=, &blret] {
- return seastar::do_with(
- tm->create_transaction(Transaction::src_t::READ),
- [=, &blret](auto &t) {
- return read_extents(*t, offset, size
- ).safe_then([=, &blret](auto ext_list) mutable {
- size_t cur = offset;
- for (auto &i: ext_list) {
- if (cur != i.first) {
- assert(cur < i.first);
- blret.append_zero(i.first - cur);
- cur = i.first;
- }
- blret.append(i.second->get_bptr());
- cur += i.second->get_bptr().length();
- }
- if (blret.length() != size) {
- assert(blret.length() < size);
- blret.append_zero(size - blret.length());
- }
- });
+ return tm->with_transaction_intr(
+ Transaction::src_t::READ,
+ [=, &blret](auto& t) {
+ return read_extents(t, offset, size
+ ).si_then([=, &blret](auto ext_list) {
+ size_t cur = offset;
+ for (auto &i: ext_list) {
+ if (cur != i.first) {
+ assert(cur < i.first);
+ blret.append_zero(i.first - cur);
+ cur = i.first;
+ }
+ blret.append(i.second->get_bptr());
+ cur += i.second->get_bptr().length();
+ }
+ if (blret.length() != size) {
+ assert(blret.length() < size);
+ blret.append_zero(size - blret.length());
+ }
});
+ });
}).handle_error(
crimson::ct_error::assert_all{"store-nbd read"}
).then([blptrret=std::move(blptrret)]() mutable {
journal->set_segment_provider(&*segment_cleaner);
- tm = InterruptedTMRef(
+ tm = std::make_unique<TransactionManager>(
*segment_manager,
std::move(segment_cleaner),
std::move(journal),
std::unique_ptr<BlockSegmentManager> segment_manager;
using TransactionManager = crimson::os::seastore::TransactionManager;
- using TMRef = crimson::os::seastore::InterruptedTMRef;
- TMRef tm;
+ using TransactionManagerRef = crimson::os::seastore::TransactionManagerRef;
+ TransactionManagerRef tm;
seastar::future<> mkfs();
void init();
void clear();
- using read_extents_ertr = crimson::os::seastore::with_trans_ertr<
- TransactionManager::read_extent_iertr>;
- using read_extents_ret = read_extents_ertr::future<
+ using read_extents_iertr = TransactionManager::read_extent_iertr;
+ using read_extents_ret = read_extents_iertr::future<
crimson::os::seastore::lextent_list_t<crimson::os::seastore::TestBlock>
>;
read_extents_ret read_extents(
).safe_then([this] {
return seastar::do_with(
create_mutate_transaction(),
- [this](auto &t) {
- return manager->mkfs(*t
- ).safe_then([this, &t] {
- return submit_transaction_fut(*t);
+ [this](auto &ref_t) {
+ return with_trans_intr(*ref_t, [&](auto &t) {
+ return manager->mkfs(t
+ ).si_then([this, &t] {
+ return submit_transaction_fut(t);
+ });
});
});
}).safe_then([this] {
tree{std::move(moved_nm)} {}
seastar::future<> set_up_fut() override final {
- return tree.mkfs(t).handle_error(
+ return INTR(tree.mkfs, t).handle_error(
crimson::ct_error::all_same_way([] {
ASSERT_FALSE("Unable to mkfs");
})
std::pair<unsigned, unsigned> range_0,
size_t value_size) {
return seastar::async([this, range_2, range_1, range_0, value_size] {
- tree.mkfs(t).unsafe_get0();
+ INTR(tree.mkfs, t).unsafe_get0();
//logger().info("\n---------------------------------------------"
// "\nbefore leaf node split:\n");
auto keys = build_key_set(range_2, range_1, range_0);
seastar::future<> build_tree(
const std::vector<ghobject_t>& keys, const std::vector<test_item_t>& values) {
return seastar::async([this, keys, values] {
- tree.mkfs(t).unsafe_get0();
+ INTR(tree.mkfs, t).unsafe_get0();
//logger().info("\n---------------------------------------------"
// "\nbefore leaf node split:\n");
ASSERT_EQ(keys.size(), values.size());
DummyChildPool() = default;
~DummyChildPool() { reset(); }
- eagain_future<> build_tree(const std::set<ghobject_t>& keys) {
+ auto build_tree(const std::set<ghobject_t>& keys) {
reset();
// create tree
auto ref_dummy = NodeExtentManager::create_dummy(IS_DUMMY_SYNC);
auto iter = kvs.begin();
while (iter != kvs.end()) {
++num_ops;
- repeat_eagain2([this, &tree, &num_ops_eagain, &iter] {
+ repeat_eagain([this, &tree, &num_ops_eagain, &iter] {
++num_ops_eagain;
auto t = create_read_transaction();
return INTR_R(tree->validate_one, *t, iter
).safe_then([t=std::move(t)]{});
- }).get0();
+ }).unsafe_get0();
++iter;
}
}
}).safe_then([this](auto addr) {
return seastar::do_with(
cache.create_transaction(Transaction::src_t::MUTATE),
- [this](auto &transaction) {
- cache.init();
- return cache.mkfs(*transaction
- ).safe_then([this, &transaction] {
- return lba_manager->mkfs(*transaction);
- }).safe_then([this, &transaction] {
- return submit_transaction(std::move(transaction));
+ [this](auto &ref_t) {
+ return with_trans_intr(*ref_t, [&](auto &t) {
+ cache.init();
+ return cache.mkfs(t
+ ).si_then([this, &t] {
+ return lba_manager->mkfs(t);
+ });
+ }).safe_then([this, &ref_t] {
+ return submit_transaction(std::move(ref_t));
});
});
}).handle_error(
seastar::future<> set_up_fut() final {
return segment_manager->init(
- ).safe_then(
- [this] {
- return seastar::do_with(
- get_transaction(),
- [this](auto &transaction) {
- cache.init();
- return cache.mkfs(*transaction).safe_then(
- [this, &transaction] {
- return submit_transaction(std::move(transaction)).then(
- [](auto p) {});
- });
- });
- }).handle_error(
- crimson::ct_error::all_same_way([](auto e) {
- ASSERT_FALSE("failed to submit");
- })
- );
+ ).safe_then([this] {
+ return seastar::do_with(
+ get_transaction(),
+ [this](auto &ref_t) {
+ cache.init();
+ return with_trans_intr(*ref_t, [&](auto &t) {
+ return cache.mkfs(t);
+ }).safe_then([this, &ref_t] {
+ return submit_transaction(std::move(ref_t)
+ ).then([](auto p) {});
+ });
+ });
+ }).handle_error(
+ crimson::ct_error::all_same_way([](auto e) {
+ ASSERT_FALSE("failed to submit");
+ })
+ );
}
seastar::future<> tear_down_fut() final {