op_type_t::TRANSACTION,
[this](auto &ctx) {
return with_trans_intr(*ctx.transaction, [&, this](auto &t) {
- return onode_manager->get_or_create_onodes(
- *ctx.transaction, ctx.iter.get_objects()
- ).si_then([this, &ctx](auto &&onodes) {
- return seastar::do_with(std::move(onodes), [this, &ctx](auto& onodes) {
- return trans_intr::repeat(
- [this, &ctx, &onodes]() -> tm_iertr::future<seastar::stop_iteration>
+ return seastar::do_with(std::vector<OnodeRef>(ctx.iter.objects.size()),
+ std::vector<OnodeRef>(),
+ [this, &ctx](auto& onodes, auto& d_onodes) mutable {
+ return trans_intr::repeat(
+ [this, &ctx, &onodes, &d_onodes]() mutable
+ -> tm_iertr::future<seastar::stop_iteration>
{
if (ctx.iter.have_op()) {
return _do_transaction_step(
- ctx, ctx.ch, onodes, ctx.iter
+ ctx, ctx.ch, onodes, d_onodes, ctx.iter
).si_then([] {
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no);
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
};
- }).si_then([this, &ctx, &onodes] {
- return onode_manager->write_dirty(*ctx.transaction, onodes);
+ }).si_then([this, &ctx, &d_onodes] {
+ return onode_manager->write_dirty(*ctx.transaction, d_onodes);
});
- });
}).si_then([this, &ctx] {
return transaction_manager->submit_transaction(*ctx.transaction);
});
internal_context_t &ctx,
CollectionRef &col,
std::vector<OnodeRef> &onodes,
+ std::vector<OnodeRef> &d_onodes,
ceph::os::Transaction::iterator &i)
{
LOG_PREFIX(SeaStore::_do_transaction_step);
- auto get_onode = [&onodes](size_t i) -> OnodeRef& {
- ceph_assert(i < onodes.size());
- return onodes[i];
- };
+ auto op = i.decode_op();
using ceph::os::Transaction;
- try {
- switch (auto op = i.decode_op(); op->op) {
- case Transaction::OP_NOP:
- return tm_iertr::now();
- case Transaction::OP_REMOVE:
- {
- return _remove(ctx, get_onode(op->oid));
- }
- break;
- case Transaction::OP_TOUCH:
- {
- return _touch(ctx, get_onode(op->oid));
- }
- break;
- case Transaction::OP_WRITE:
- {
- uint64_t off = op->off;
- uint64_t len = op->len;
- uint32_t fadvise_flags = i.get_fadvise_flags();
- ceph::bufferlist bl;
- i.decode_bl(bl);
- return _write(
- ctx, get_onode(op->oid), off, len, std::move(bl),
- fadvise_flags);
- }
- break;
- case Transaction::OP_TRUNCATE:
- {
- uint64_t off = op->off;
- return _truncate(ctx, get_onode(op->oid), off);
- }
- break;
- case Transaction::OP_SETATTR:
- {
- std::string name = i.decode_string();
- std::map<std::string, bufferlist> to_set;
- ceph::bufferlist& bl = to_set[name];
- i.decode_bl(bl);
- return _setattrs(ctx, get_onode(op->oid), std::move(to_set));
- }
- break;
- case Transaction::OP_SETATTRS:
- {
- std::map<std::string, bufferlist> to_set;
- i.decode_attrset(to_set);
- return _setattrs(ctx, get_onode(op->oid), std::move(to_set));
- }
- break;
- case Transaction::OP_RMATTR:
- {
- std::string name = i.decode_string();
- return _rmattr(ctx, get_onode(op->oid), name);
- }
- break;
- case Transaction::OP_RMATTRS:
- {
- return _rmattrs(ctx, get_onode(op->oid));
- }
- break;
- case Transaction::OP_MKCOLL:
- {
- coll_t cid = i.get_cid(op->cid);
- return _create_collection(ctx, cid, op->split_bits);
- }
- break;
+ if (op->op == Transaction::OP_NOP)
+ return tm_iertr::now();
+
+ switch (op->op) {
case Transaction::OP_RMCOLL:
{
coll_t cid = i.get_cid(op->cid);
return _remove_collection(ctx, cid);
}
- break;
- case Transaction::OP_OMAP_SETKEYS:
- {
- std::map<std::string, ceph::bufferlist> aset;
- i.decode_attrset(aset);
- return _omap_set_values(ctx, get_onode(op->oid), std::move(aset));
- }
- break;
- case Transaction::OP_OMAP_SETHEADER:
- {
- ceph::bufferlist bl;
- i.decode_bl(bl);
- return _omap_set_header(ctx, get_onode(op->oid), std::move(bl));
- }
- break;
- case Transaction::OP_OMAP_RMKEYS:
- {
- omap_keys_t keys;
- i.decode_keyset(keys);
- return _omap_rmkeys(ctx, get_onode(op->oid), std::move(keys));
- }
- break;
- case Transaction::OP_OMAP_RMKEYRANGE:
- {
- string first, last;
- first = i.decode_string();
- last = i.decode_string();
- return _omap_rmkeyrange(
- ctx, get_onode(op->oid),
- std::move(first), std::move(last));
- }
- break;
- case Transaction::OP_OMAP_CLEAR:
+ case Transaction::OP_MKCOLL:
{
- return _omap_clear(ctx, get_onode(op->oid));
+ coll_t cid = i.get_cid(op->cid);
+ return _create_collection(ctx, cid, op->split_bits);
}
case Transaction::OP_COLL_HINT:
{
i.decode_bl(hint);
return tm_iertr::now();
}
- case Transaction::OP_ZERO:
- {
- objaddr_t off = op->off;
- extent_len_t len = op->len;
- return _zero(ctx, get_onode(op->oid), off, len);
+ }
+
+ using onode_iertr = OnodeManager::get_onode_iertr::extend<
+ crimson::ct_error::value_too_large>;
+ auto fut = onode_iertr::make_ready_future<OnodeRef>(OnodeRef());
+ bool create = false;
+ if (op->op == Transaction::OP_TOUCH ||
+ op->op == Transaction::OP_CREATE ||
+ op->op == Transaction::OP_WRITE ||
+ op->op == Transaction::OP_ZERO) {
+ create = true;
+ }
+ if (!onodes[op->oid]) {
+ if (!create) {
+ fut = onode_manager->get_onode(*ctx.transaction, i.get_oid(op->oid));
+ } else {
+ fut = onode_manager->get_or_create_onode(
+ *ctx.transaction, i.get_oid(op->oid));
}
- default:
- ERROR("bad op {}", static_cast<unsigned>(op->op));
+ }
+ return fut.si_then([&, op, this](auto&& get_onode) -> tm_ret {
+ OnodeRef &o = onodes[op->oid];
+ if (!o) {
+ assert(get_onode);
+ o = get_onode;
+ d_onodes.push_back(get_onode);
+ }
+ try {
+ switch (op->op) {
+ case Transaction::OP_REMOVE:
+ {
+ return _remove(ctx, onodes[op->oid]);
+ }
+ case Transaction::OP_CREATE:
+ case Transaction::OP_TOUCH:
+ {
+ return _touch(ctx, onodes[op->oid]);
+ }
+ case Transaction::OP_WRITE:
+ {
+ uint64_t off = op->off;
+ uint64_t len = op->len;
+ uint32_t fadvise_flags = i.get_fadvise_flags();
+ ceph::bufferlist bl;
+ i.decode_bl(bl);
+ return _write(
+ ctx, onodes[op->oid], off, len, std::move(bl),
+ fadvise_flags);
+ }
+ case Transaction::OP_TRUNCATE:
+ {
+ uint64_t off = op->off;
+ return _truncate(ctx, onodes[op->oid], off);
+ }
+ case Transaction::OP_SETATTR:
+ {
+ std::string name = i.decode_string();
+ std::map<std::string, bufferlist> to_set;
+ ceph::bufferlist& bl = to_set[name];
+ i.decode_bl(bl);
+ return _setattrs(ctx, onodes[op->oid], std::move(to_set));
+ }
+ case Transaction::OP_SETATTRS:
+ {
+ std::map<std::string, bufferlist> to_set;
+ i.decode_attrset(to_set);
+ return _setattrs(ctx, onodes[op->oid], std::move(to_set));
+ }
+ case Transaction::OP_RMATTR:
+ {
+ std::string name = i.decode_string();
+ return _rmattr(ctx, onodes[op->oid], name);
+ }
+ case Transaction::OP_RMATTRS:
+ {
+ return _rmattrs(ctx, onodes[op->oid]);
+ }
+ case Transaction::OP_OMAP_SETKEYS:
+ {
+ std::map<std::string, ceph::bufferlist> aset;
+ i.decode_attrset(aset);
+ return _omap_set_values(ctx, onodes[op->oid], std::move(aset));
+ }
+ case Transaction::OP_OMAP_SETHEADER:
+ {
+ ceph::bufferlist bl;
+ i.decode_bl(bl);
+ return _omap_set_header(ctx, onodes[op->oid], std::move(bl));
+ }
+ case Transaction::OP_OMAP_RMKEYS:
+ {
+ omap_keys_t keys;
+ i.decode_keyset(keys);
+ return _omap_rmkeys(ctx, onodes[op->oid], std::move(keys));
+ }
+ case Transaction::OP_OMAP_RMKEYRANGE:
+ {
+ string first, last;
+ first = i.decode_string();
+ last = i.decode_string();
+ return _omap_rmkeyrange(
+ ctx, onodes[op->oid],
+ std::move(first), std::move(last));
+ }
+ case Transaction::OP_OMAP_CLEAR:
+ {
+ return _omap_clear(ctx, onodes[op->oid]);
+ }
+ case Transaction::OP_ZERO:
+ {
+ objaddr_t off = op->off;
+ extent_len_t len = op->len;
+ return _zero(ctx, onodes[op->oid], off, len);
+ }
+ default:
+ ERROR("bad op {}", static_cast<unsigned>(op->op));
+ return crimson::ct_error::input_output_error::make();
+ }
+ } catch (std::exception &e) {
+ ERROR("got exception {}", e);
return crimson::ct_error::input_output_error::make();
}
- } catch (std::exception &e) {
- ERROR("got exception {}", e);
- return crimson::ct_error::input_output_error::make();
- }
+ }).handle_error_interruptible(
+ tm_iertr::pass_further{},
+ crimson::ct_error::enoent::handle([op] {
+ //OMAP_CLEAR, TRUNCATE, REMOVE etc ops will tolerate absent onode.
+ if (op->op == Transaction::OP_CLONERANGE ||
+ op->op == Transaction::OP_CLONE ||
+ op->op == Transaction::OP_CLONERANGE2 ||
+ op->op == Transaction::OP_COLL_ADD ||
+ op->op == Transaction::OP_SETATTR ||
+ op->op == Transaction::OP_SETATTRS ||
+ op->op == Transaction::OP_RMATTR ||
+ op->op == Transaction::OP_OMAP_SETKEYS ||
+ op->op == Transaction::OP_OMAP_RMKEYS ||
+ op->op == Transaction::OP_OMAP_RMKEYRANGE ||
+ op->op == Transaction::OP_OMAP_SETHEADER) {
+ ceph_abort_msg("unexpected enoent error");
+ }
+ }),
+ crimson::ct_error::assert_all{
+ "Invalid error in SeaStore::do_transaction_step"
+ }
+ );
}
SeaStore::tm_ret SeaStore::_remove(