From: Samuel Just Date: Tue, 26 Jan 2021 00:15:04 +0000 (-0800) Subject: crimson/os/seastore: rework transaction loop X-Git-Tag: v17.1.0~2805^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=02345ad9627ae6e439eaa4379a30bed6b6895d31;p=ceph.git crimson/os/seastore: rework transaction loop Signed-off-by: Samuel Just --- diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc index 601ce429e445..d1c60c49c042 100644 --- a/src/crimson/os/seastore/seastore.cc +++ b/src/crimson/os/seastore/seastore.cc @@ -197,75 +197,58 @@ seastar::future> SeaStore::fiemap( return seastar::make_ready_future>(); } +void SeaStore::on_error(ceph::os::Transaction &t) { + logger().error(" transaction dump:\n"); + JSONFormatter f(true); + f.open_object_section("transaction"); + t.dump(&f); + f.close_section(); + std::stringstream str; + f.flush(str); + logger().error("{}", str.str()); + abort(); +} + seastar::future<> SeaStore::do_transaction( CollectionRef _ch, ceph::os::Transaction&& _t) { - return seastar::do_with( - _t.begin(), - transaction_manager.create_transaction(), - std::vector(), + return repeat_with_internal_context( + _ch, std::move(_t), - std::move(_ch), - [this](auto &iter, auto &trans, auto &onodes, auto &t, auto &ch) { + [this](auto &ctx) { return onode_manager->get_or_create_onodes( - *trans, iter.get_objects()).safe_then( - [this, &iter, &trans, &onodes, &t, &ch](auto &&read_onodes) { - onodes = std::move(read_onodes); - return seastar::do_until( - [&iter]() { return iter.have_op(); }, - [this, &iter, &trans, &onodes, &t, &ch]() { - return _do_transaction_step(trans, ch, onodes, iter).safe_then( - [this, &trans] { - return transaction_manager.submit_transaction(std::move(trans)); - }).handle_error( - // TODO: add errorator::do_until - crimson::ct_error::eagain::handle([]() { - // TODO retry - }), - write_ertr::all_same_way([&t](auto e) { - logger().error(" transaction dump:\n"); - JSONFormatter f(true); - f.open_object_section("transaction"); - t.dump(&f); - f.close_section(); - std::stringstream str; - f.flush(str); - logger().error("{}", str.str()); - abort(); - })); - }); - }).safe_then([this, &trans, &onodes]() { - return onode_manager->write_dirty(*trans, onodes); - }).safe_then([]() { - // TODO: complete transaction! - return; - }).handle_error( - write_ertr::all_same_way([&t](auto e) { - logger().error(" transaction dump:\n"); - JSONFormatter f(true); - f.open_object_section("transaction"); - t.dump(&f); - f.close_section(); - std::stringstream str; - f.flush(str); - logger().error("{}", str.str()); - abort(); - })).then([&t]() { - for (auto i : { - t.get_on_applied(), - t.get_on_commit(), - t.get_on_applied_sync()}) { - if (i) { - i->complete(0); - } - } + *ctx.transaction, ctx.iter.get_objects() + ).safe_then([this, &ctx](auto &&read_onodes) { + ctx.onodes = std::move(read_onodes); + return crimson::do_until( + [this, &ctx] { + return _do_transaction_step( + ctx, ctx.ch, ctx.onodes, ctx.iter + ).safe_then([&ctx] { + return seastar::make_ready_future(!ctx.iter.have_op()); }); + }); + }).safe_then([this, &ctx] { + return onode_manager->write_dirty(*ctx.transaction, ctx.onodes); + }).safe_then([this, &ctx] { + return transaction_manager.submit_transaction(std::move(ctx.transaction)); + }).safe_then([&ctx]() { + for (auto i : { + ctx.ext_transaction.get_on_applied(), + ctx.ext_transaction.get_on_commit(), + ctx.ext_transaction.get_on_applied_sync()}) { + if (i) { + i->complete(0); + } + } + return tm_ertr::now(); + }); }); } -SeaStore::write_ertr::future<> SeaStore::_do_transaction_step( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_do_transaction_step( + internal_context_t &ctx, CollectionRef &col, std::vector &onodes, ceph::os::Transaction::iterator &i) @@ -279,15 +262,15 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step( try { switch (auto op = i.decode_op(); op->op) { case Transaction::OP_NOP: - return write_ertr::now(); + return tm_ertr::now(); case Transaction::OP_REMOVE: { - return _remove(trans, get_onode(op->oid)); + return _remove(ctx, get_onode(op->oid)); } break; case Transaction::OP_TOUCH: { - return _touch(trans, get_onode(op->oid)); + return _touch(ctx, get_onode(op->oid)); } break; case Transaction::OP_WRITE: @@ -297,13 +280,13 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step( uint32_t fadvise_flags = i.get_fadvise_flags(); ceph::bufferlist bl; i.decode_bl(bl); - return _write(trans, get_onode(op->oid), off, len, bl, fadvise_flags); + return _write(ctx, get_onode(op->oid), off, len, bl, fadvise_flags); } break; case Transaction::OP_TRUNCATE: { uint64_t off = op->off; - return _truncate(trans, get_onode(op->oid), off); + return _truncate(ctx, get_onode(op->oid), off); } break; case Transaction::OP_SETATTR: @@ -313,34 +296,34 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step( i.decode_bl(bl); std::map to_set; to_set[name] = bufferptr(bl.c_str(), bl.length()); - return _setattrs(trans, get_onode(op->oid), to_set); + return _setattrs(ctx, get_onode(op->oid), to_set); } break; case Transaction::OP_MKCOLL: { coll_t cid = i.get_cid(op->cid); - return _create_collection(trans, cid, op->split_bits); + return _create_collection(ctx, cid, op->split_bits); } break; case Transaction::OP_OMAP_SETKEYS: { std::map aset; i.decode_attrset(aset); - return _omap_set_values(trans, get_onode(op->oid), std::move(aset)); + return _omap_set_values(ctx, get_onode(op->oid), std::move(aset)); } break; case Transaction::OP_OMAP_SETHEADER: { ceph::bufferlist bl; i.decode_bl(bl); - return _omap_set_header(trans, get_onode(op->oid), bl); + return _omap_set_header(ctx, get_onode(op->oid), bl); } break; case Transaction::OP_OMAP_RMKEYS: { omap_keys_t keys; i.decode_keyset(keys); - return _omap_rmkeys(trans, get_onode(op->oid), keys); + return _omap_rmkeys(ctx, get_onode(op->oid), keys); } break; case Transaction::OP_OMAP_RMKEYRANGE: @@ -348,14 +331,14 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step( string first, last; first = i.decode_string(); last = i.decode_string(); - return _omap_rmkeyrange(trans, get_onode(op->oid), first, last); + return _omap_rmkeyrange(ctx, get_onode(op->oid), first, last); } break; case Transaction::OP_COLL_HINT: { ceph::bufferlist hint; i.decode_bl(hint); - return write_ertr::now(); + return tm_ertr::now(); } default: logger().error("bad op {}", static_cast(op->op)); @@ -367,26 +350,26 @@ SeaStore::write_ertr::future<> SeaStore::_do_transaction_step( } } -SeaStore::write_ertr::future<> SeaStore::_remove( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_remove( + internal_context_t &ctx, OnodeRef &onode) { logger().debug("{} onode={}", __func__, *onode); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_touch( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_touch( + internal_context_t &ctx, OnodeRef &onode) { logger().debug("{} onode={}", __func__, *onode); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_write( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_write( + internal_context_t &ctx, OnodeRef &onode, uint64_t offset, size_t len, const ceph::bufferlist& bl, uint32_t fadvise_flags) @@ -405,11 +388,11 @@ SeaStore::write_ertr::future<> SeaStore::_write( OnodeManager::open_ertr::pass_further{} ); */ - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_omap_set_values( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_omap_set_values( + internal_context_t &ctx, OnodeRef &onode, std::map &&aset) { @@ -417,33 +400,33 @@ SeaStore::write_ertr::future<> SeaStore::_omap_set_values( "{}: {} {} keys", __func__, *onode, aset.size()); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_omap_set_header( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_omap_set_header( + internal_context_t &ctx, OnodeRef &onode, const ceph::bufferlist &header) { logger().debug( "{}: {} {} bytes", __func__, *onode, header.length()); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_omap_rmkeys( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_omap_rmkeys( + internal_context_t &ctx, OnodeRef &onode, const omap_keys_t& aset) { logger().debug( "{} {} {} keys", __func__, *onode, aset.size()); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_omap_rmkeyrange( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_omap_rmkeyrange( + internal_context_t &ctx, OnodeRef &onode, const std::string &first, const std::string &last) @@ -451,34 +434,34 @@ SeaStore::write_ertr::future<> SeaStore::_omap_rmkeyrange( logger().debug( "{} {} first={} last={}", __func__, *onode, first, last); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_truncate( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_truncate( + internal_context_t &ctx, OnodeRef &onode, uint64_t size) { logger().debug("{} onode={} size={}", __func__, *onode, size); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_setattrs( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_setattrs( + internal_context_t &ctx, OnodeRef &onode, std::map& aset) { logger().debug("{} onode={}", __func__, *onode); - return write_ertr::now(); + return tm_ertr::now(); } -SeaStore::write_ertr::future<> SeaStore::_create_collection( - TransactionRef &trans, +SeaStore::tm_ret SeaStore::_create_collection( + internal_context_t &ctx, const coll_t& cid, int bits) { - return write_ertr::now(); + return tm_ertr::now(); } boost::intrusive_ptr SeaStore::_get_collection(const coll_t& cid) diff --git a/src/crimson/os/seastore/seastore.h b/src/crimson/os/seastore/seastore.h index 4e57542a14fd..25ccdd899c7c 100644 --- a/src/crimson/os/seastore/seastore.h +++ b/src/crimson/os/seastore/seastore.h @@ -115,54 +115,99 @@ public: } private: + struct internal_context_t { + CollectionRef ch; + ceph::os::Transaction ext_transaction; + + internal_context_t( + CollectionRef ch, + ceph::os::Transaction &&_ext_transaction) + : ch(ch), ext_transaction(std::move(_ext_transaction)), + iter(ext_transaction.begin()) {} + + TransactionRef transaction; + std::vector onodes; + + ceph::os::Transaction::iterator iter; + + void reset(TransactionRef &&t) { + transaction = std::move(t); + onodes.clear(); + iter = ext_transaction.begin(); + } + }; + + static void on_error(ceph::os::Transaction &t); + + template + auto repeat_with_internal_context( + CollectionRef ch, + ceph::os::Transaction &&t, + F &&f) { + return seastar::do_with( + internal_context_t{ ch, std::move(t) }, + std::forward(f), + [](auto &ctx, auto &f) { + return repeat_eagain([&]() { + ctx.reset(make_transaction()); + return std::invoke(f, ctx); + }).handle_error( + crimson::ct_error::eagain::pass_further{}, + crimson::ct_error::all_same_way([&ctx](auto e) { + on_error(ctx.ext_transaction); + }) + ); + }); + } + TransactionManager &transaction_manager; std::unique_ptr onode_manager; - using write_ertr = crimson::errorator< - crimson::ct_error::input_output_error>; - write_ertr::future<> _do_transaction_step( - TransactionRef &trans, + using tm_ertr = TransactionManager::base_ertr; + using tm_ret = tm_ertr::future<>; + tm_ret _do_transaction_step( + internal_context_t &ctx, CollectionRef &col, std::vector &onodes, ceph::os::Transaction::iterator &i); - write_ertr::future<> _remove( - TransactionRef &trans, + tm_ret _remove( + internal_context_t &ctx, OnodeRef &onode); - write_ertr::future<> _touch( - TransactionRef &trans, + tm_ret _touch( + internal_context_t &ctx, OnodeRef &onode); - write_ertr::future<> _write( - TransactionRef &trans, + tm_ret _write( + internal_context_t &ctx, OnodeRef &onode, uint64_t offset, size_t len, const ceph::bufferlist& bl, uint32_t fadvise_flags); - write_ertr::future<> _omap_set_values( - TransactionRef &trans, + tm_ret _omap_set_values( + internal_context_t &ctx, OnodeRef &onode, std::map &&aset); - write_ertr::future<> _omap_set_header( - TransactionRef &trans, + tm_ret _omap_set_header( + internal_context_t &ctx, OnodeRef &onode, const ceph::bufferlist &header); - write_ertr::future<> _omap_rmkeys( - TransactionRef &trans, + tm_ret _omap_rmkeys( + internal_context_t &ctx, OnodeRef &onode, const omap_keys_t& aset); - write_ertr::future<> _omap_rmkeyrange( - TransactionRef &trans, + tm_ret _omap_rmkeyrange( + internal_context_t &ctx, OnodeRef &onode, const std::string &first, const std::string &last); - write_ertr::future<> _truncate( - TransactionRef &trans, + tm_ret _truncate( + internal_context_t &ctx, OnodeRef &onode, uint64_t size); - write_ertr::future<> _setattrs( - TransactionRef &trans, + tm_ret _setattrs( + internal_context_t &ctx, OnodeRef &onode, std::map& aset); - write_ertr::future<> _create_collection( - TransactionRef &trans, + tm_ret _create_collection( + internal_context_t &ctx, const coll_t& cid, int bits); boost::intrusive_ptr _get_collection(const coll_t& cid);