return seastar::make_ready_future<std::map<uint64_t, uint64_t>>();
}
+void SeaStore::on_error(ceph::os::Transaction &t) {
+ logger().error(" transaction dump:\n");
+ JSONFormatter f(true);
+ f.open_object_section("transaction");
+ t.dump(&f);
+ f.close_section();
+ std::stringstream str;
+ f.flush(str);
+ logger().error("{}", str.str());
+ abort();
+}
+
seastar::future<> SeaStore::do_transaction(
CollectionRef _ch,
ceph::os::Transaction&& _t)
{
- return seastar::do_with(
- _t.begin(),
- transaction_manager.create_transaction(),
- std::vector<OnodeRef>(),
+ return repeat_with_internal_context(
+ _ch,
std::move(_t),
- std::move(_ch),
- [this](auto &iter, auto &trans, auto &onodes, auto &t, auto &ch) {
+ [this](auto &ctx) {
return onode_manager->get_or_create_onodes(
- *trans, iter.get_objects()).safe_then(
- [this, &iter, &trans, &onodes, &t, &ch](auto &&read_onodes) {
- onodes = std::move(read_onodes);
- return seastar::do_until(
- [&iter]() { return iter.have_op(); },
- [this, &iter, &trans, &onodes, &t, &ch]() {
- return _do_transaction_step(trans, ch, onodes, iter).safe_then(
- [this, &trans] {
- return transaction_manager.submit_transaction(std::move(trans));
- }).handle_error(
- // TODO: add errorator::do_until
- crimson::ct_error::eagain::handle([]() {
- // TODO retry
- }),
- write_ertr::all_same_way([&t](auto e) {
- logger().error(" transaction dump:\n");
- JSONFormatter f(true);
- f.open_object_section("transaction");
- t.dump(&f);
- f.close_section();
- std::stringstream str;
- f.flush(str);
- logger().error("{}", str.str());
- abort();
- }));
- });
- }).safe_then([this, &trans, &onodes]() {
- return onode_manager->write_dirty(*trans, onodes);
- }).safe_then([]() {
- // TODO: complete transaction!
- return;
- }).handle_error(
- write_ertr::all_same_way([&t](auto e) {
- logger().error(" transaction dump:\n");
- JSONFormatter f(true);
- f.open_object_section("transaction");
- t.dump(&f);
- f.close_section();
- std::stringstream str;
- f.flush(str);
- logger().error("{}", str.str());
- abort();
- })).then([&t]() {
- for (auto i : {
- t.get_on_applied(),
- t.get_on_commit(),
- t.get_on_applied_sync()}) {
- if (i) {
- i->complete(0);
- }
- }
+ *ctx.transaction, ctx.iter.get_objects()
+ ).safe_then([this, &ctx](auto &&read_onodes) {
+ ctx.onodes = std::move(read_onodes);
+ return crimson::do_until(
+ [this, &ctx] {
+ return _do_transaction_step(
+ ctx, ctx.ch, ctx.onodes, ctx.iter
+ ).safe_then([&ctx] {
+ return seastar::make_ready_future<bool>(!ctx.iter.have_op());
});
+ });
+ }).safe_then([this, &ctx] {
+ return onode_manager->write_dirty(*ctx.transaction, ctx.onodes);
+ }).safe_then([this, &ctx] {
+ return transaction_manager.submit_transaction(std::move(ctx.transaction));
+ }).safe_then([&ctx]() {
+ for (auto i : {
+ ctx.ext_transaction.get_on_applied(),
+ ctx.ext_transaction.get_on_commit(),
+ ctx.ext_transaction.get_on_applied_sync()}) {
+ if (i) {
+ i->complete(0);
+ }
+ }
+ return tm_ertr::now();
+ });
});
}
-SeaStore::write_ertr::future<> SeaStore::_do_transaction_step(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_do_transaction_step(
+ internal_context_t &ctx,
CollectionRef &col,
std::vector<OnodeRef> &onodes,
ceph::os::Transaction::iterator &i)
try {
switch (auto op = i.decode_op(); op->op) {
case Transaction::OP_NOP:
- return write_ertr::now();
+ return tm_ertr::now();
case Transaction::OP_REMOVE:
{
- return _remove(trans, get_onode(op->oid));
+ return _remove(ctx, get_onode(op->oid));
}
break;
case Transaction::OP_TOUCH:
{
- return _touch(trans, get_onode(op->oid));
+ return _touch(ctx, get_onode(op->oid));
}
break;
case Transaction::OP_WRITE:
uint32_t fadvise_flags = i.get_fadvise_flags();
ceph::bufferlist bl;
i.decode_bl(bl);
- return _write(trans, get_onode(op->oid), off, len, bl, fadvise_flags);
+ return _write(ctx, get_onode(op->oid), off, len, bl, fadvise_flags);
}
break;
case Transaction::OP_TRUNCATE:
{
uint64_t off = op->off;
- return _truncate(trans, get_onode(op->oid), off);
+ return _truncate(ctx, get_onode(op->oid), off);
}
break;
case Transaction::OP_SETATTR:
i.decode_bl(bl);
std::map<std::string, bufferptr> to_set;
to_set[name] = bufferptr(bl.c_str(), bl.length());
- return _setattrs(trans, get_onode(op->oid), to_set);
+ return _setattrs(ctx, get_onode(op->oid), to_set);
}
break;
case Transaction::OP_MKCOLL:
{
coll_t cid = i.get_cid(op->cid);
- return _create_collection(trans, cid, op->split_bits);
+ return _create_collection(ctx, cid, op->split_bits);
}
break;
case Transaction::OP_OMAP_SETKEYS:
{
std::map<std::string, ceph::bufferlist> aset;
i.decode_attrset(aset);
- return _omap_set_values(trans, get_onode(op->oid), std::move(aset));
+ return _omap_set_values(ctx, get_onode(op->oid), std::move(aset));
}
break;
case Transaction::OP_OMAP_SETHEADER:
{
ceph::bufferlist bl;
i.decode_bl(bl);
- return _omap_set_header(trans, get_onode(op->oid), bl);
+ return _omap_set_header(ctx, get_onode(op->oid), bl);
}
break;
case Transaction::OP_OMAP_RMKEYS:
{
omap_keys_t keys;
i.decode_keyset(keys);
- return _omap_rmkeys(trans, get_onode(op->oid), keys);
+ return _omap_rmkeys(ctx, get_onode(op->oid), keys);
}
break;
case Transaction::OP_OMAP_RMKEYRANGE:
string first, last;
first = i.decode_string();
last = i.decode_string();
- return _omap_rmkeyrange(trans, get_onode(op->oid), first, last);
+ return _omap_rmkeyrange(ctx, get_onode(op->oid), first, last);
}
break;
case Transaction::OP_COLL_HINT:
{
ceph::bufferlist hint;
i.decode_bl(hint);
- return write_ertr::now();
+ return tm_ertr::now();
}
default:
logger().error("bad op {}", static_cast<unsigned>(op->op));
}
}
-SeaStore::write_ertr::future<> SeaStore::_remove(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_remove(
+ internal_context_t &ctx,
OnodeRef &onode)
{
logger().debug("{} onode={}",
__func__, *onode);
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_touch(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_touch(
+ internal_context_t &ctx,
OnodeRef &onode)
{
logger().debug("{} onode={}",
__func__, *onode);
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_write(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_write(
+ internal_context_t &ctx,
OnodeRef &onode,
uint64_t offset, size_t len, const ceph::bufferlist& bl,
uint32_t fadvise_flags)
OnodeManager::open_ertr::pass_further{}
);
*/
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_omap_set_values(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_set_values(
+ internal_context_t &ctx,
OnodeRef &onode,
std::map<std::string, ceph::bufferlist> &&aset)
{
"{}: {} {} keys",
__func__, *onode, aset.size());
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_omap_set_header(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_set_header(
+ internal_context_t &ctx,
OnodeRef &onode,
const ceph::bufferlist &header)
{
logger().debug(
"{}: {} {} bytes",
__func__, *onode, header.length());
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_omap_rmkeys(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_rmkeys(
+ internal_context_t &ctx,
OnodeRef &onode,
const omap_keys_t& aset)
{
logger().debug(
"{} {} {} keys",
__func__, *onode, aset.size());
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_omap_rmkeyrange(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_omap_rmkeyrange(
+ internal_context_t &ctx,
OnodeRef &onode,
const std::string &first,
const std::string &last)
logger().debug(
"{} {} first={} last={}",
__func__, *onode, first, last);
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_truncate(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_truncate(
+ internal_context_t &ctx,
OnodeRef &onode,
uint64_t size)
{
logger().debug("{} onode={} size={}",
__func__, *onode, size);
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_setattrs(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_setattrs(
+ internal_context_t &ctx,
OnodeRef &onode,
std::map<std::string,bufferptr>& aset)
{
logger().debug("{} onode={}",
__func__, *onode);
- return write_ertr::now();
+ return tm_ertr::now();
}
-SeaStore::write_ertr::future<> SeaStore::_create_collection(
- TransactionRef &trans,
+SeaStore::tm_ret SeaStore::_create_collection(
+ internal_context_t &ctx,
const coll_t& cid, int bits)
{
- return write_ertr::now();
+ return tm_ertr::now();
}
boost::intrusive_ptr<SeastoreCollection> SeaStore::_get_collection(const coll_t& cid)
}
private:
+ struct internal_context_t {
+ CollectionRef ch;
+ ceph::os::Transaction ext_transaction;
+
+ internal_context_t(
+ CollectionRef ch,
+ ceph::os::Transaction &&_ext_transaction)
+ : ch(ch), ext_transaction(std::move(_ext_transaction)),
+ iter(ext_transaction.begin()) {}
+
+ TransactionRef transaction;
+ std::vector<OnodeRef> onodes;
+
+ ceph::os::Transaction::iterator iter;
+
+ void reset(TransactionRef &&t) {
+ transaction = std::move(t);
+ onodes.clear();
+ iter = ext_transaction.begin();
+ }
+ };
+
+ static void on_error(ceph::os::Transaction &t);
+
+ template <typename F>
+ auto repeat_with_internal_context(
+ CollectionRef ch,
+ ceph::os::Transaction &&t,
+ F &&f) {
+ return seastar::do_with(
+ internal_context_t{ ch, std::move(t) },
+ std::forward<F>(f),
+ [](auto &ctx, auto &f) {
+ return repeat_eagain([&]() {
+ ctx.reset(make_transaction());
+ return std::invoke(f, ctx);
+ }).handle_error(
+ crimson::ct_error::eagain::pass_further{},
+ crimson::ct_error::all_same_way([&ctx](auto e) {
+ on_error(ctx.ext_transaction);
+ })
+ );
+ });
+ }
+
TransactionManager &transaction_manager;
std::unique_ptr<OnodeManager> onode_manager;
- using write_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- write_ertr::future<> _do_transaction_step(
- TransactionRef &trans,
+ using tm_ertr = TransactionManager::base_ertr;
+ using tm_ret = tm_ertr::future<>;
+ tm_ret _do_transaction_step(
+ internal_context_t &ctx,
CollectionRef &col,
std::vector<OnodeRef> &onodes,
ceph::os::Transaction::iterator &i);
- write_ertr::future<> _remove(
- TransactionRef &trans,
+ tm_ret _remove(
+ internal_context_t &ctx,
OnodeRef &onode);
- write_ertr::future<> _touch(
- TransactionRef &trans,
+ tm_ret _touch(
+ internal_context_t &ctx,
OnodeRef &onode);
- write_ertr::future<> _write(
- TransactionRef &trans,
+ tm_ret _write(
+ internal_context_t &ctx,
OnodeRef &onode,
uint64_t offset, size_t len, const ceph::bufferlist& bl,
uint32_t fadvise_flags);
- write_ertr::future<> _omap_set_values(
- TransactionRef &trans,
+ tm_ret _omap_set_values(
+ internal_context_t &ctx,
OnodeRef &onode,
std::map<std::string, ceph::bufferlist> &&aset);
- write_ertr::future<> _omap_set_header(
- TransactionRef &trans,
+ tm_ret _omap_set_header(
+ internal_context_t &ctx,
OnodeRef &onode,
const ceph::bufferlist &header);
- write_ertr::future<> _omap_rmkeys(
- TransactionRef &trans,
+ tm_ret _omap_rmkeys(
+ internal_context_t &ctx,
OnodeRef &onode,
const omap_keys_t& aset);
- write_ertr::future<> _omap_rmkeyrange(
- TransactionRef &trans,
+ tm_ret _omap_rmkeyrange(
+ internal_context_t &ctx,
OnodeRef &onode,
const std::string &first,
const std::string &last);
- write_ertr::future<> _truncate(
- TransactionRef &trans,
+ tm_ret _truncate(
+ internal_context_t &ctx,
OnodeRef &onode, uint64_t size);
- write_ertr::future<> _setattrs(
- TransactionRef &trans,
+ tm_ret _setattrs(
+ internal_context_t &ctx,
OnodeRef &onode,
std::map<std::string,bufferptr>& aset);
- write_ertr::future<> _create_collection(
- TransactionRef &trans,
+ tm_ret _create_collection(
+ internal_context_t &ctx,
const coll_t& cid, int bits);
boost::intrusive_ptr<SeastoreCollection> _get_collection(const coll_t& cid);