CollectionRef _ch,
ceph::os::Transaction&& _t)
{
+ LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks);
++(shard_stats.io_num);
++(shard_stats.pending_io_num);
++(shard_stats.starting_io_num);
- // repeat_with_internal_context ensures ordering via collection lock
- auto num_bytes = _t.get_num_bytes();
- return repeat_with_internal_context(
- _ch,
- std::move(_t),
- Transaction::src_t::MUTATE,
- "do_transaction",
- op_type_t::DO_TRANSACTION,
- [this, num_bytes](auto &ctx) {
- LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks);
- return with_trans_intr(*ctx.transaction, [&ctx, this, FNAME, num_bytes](auto &t) {
- DEBUGT("cid={}, {} operations, 0x{:x} bytes, {} colls, {} objects ...",
- t, ctx.ch->get_cid(),
- ctx.ext_transaction.get_num_ops(),
- num_bytes,
- ctx.iter.colls.size(),
- ctx.iter.objects.size());
+ auto flags = _t.get_fadvise_flags();
+ internal_context_t ctx{
+ _ch, std::move(_t),
+ transaction_manager->create_transaction(
+ Transaction::src_t::MUTATE,
+ "do_transaction",
+ flags)
+ };
+
+ assert(shard_stats.starting_io_num);
+ --(shard_stats.starting_io_num);
+ ++(shard_stats.waiting_collock_io_num);
+
+ co_await ctx.transaction->get_handle().take_collection_lock(
+ static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
+ );
+
+ assert(shard_stats.waiting_collock_io_num);
+ --(shard_stats.waiting_collock_io_num);
+ ++(shard_stats.waiting_throttler_io_num);
+
+ co_await throttler.get(1);
+
+ assert(shard_stats.waiting_throttler_io_num);
+ --(shard_stats.waiting_throttler_io_num);
+ ++(shard_stats.processing_inlock_io_num);
+
+ co_await with_repeat_trans_intr(
+ *ctx.transaction,
+ seastar::coroutine::lambda([&ctx, this, FNAME](auto &t)
+ -> tm_ret {
+ ++(shard_stats.repeat_io_num);
#ifndef NDEBUG
- TRACET(" transaction dump:\n", t);
- JSONFormatter f(true);
- f.open_object_section("transaction");
- ctx.ext_transaction.dump(&f);
- f.close_section();
- std::stringstream str;
- f.flush(str);
- TRACET("{}", t, str.str());
+ TRACET(" transaction dump:\n", t);
+ JSONFormatter f(true);
+ f.open_object_section("transaction");
+ ctx.ext_transaction.dump(&f);
+ f.close_section();
+ std::stringstream str;
+ f.flush(str);
+ TRACET("{}", t, str.str());
#endif
- return seastar::do_with(
- std::vector<OnodeRef>(ctx.iter.objects.size()),
- [this, &ctx](auto& onodes)
- {
- return trans_intr::repeat(
- [this, &ctx, &onodes]()
- -> tm_iertr::future<seastar::stop_iteration>
- {
- if (ctx.iter.have_op()) {
- return _do_transaction_step(
- ctx, ctx.ch, onodes, ctx.iter
- ).si_then([] {
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- } else {
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- };
- });
- }).si_then([this, &ctx] {
- return transaction_manager->submit_transaction(*ctx.transaction);
- });
- }).safe_then([FNAME, &ctx] {
- DEBUGT("done", *ctx.transaction);
- });
- }
- ).finally([this] {
- assert(shard_stats.pending_io_num);
- --(shard_stats.pending_io_num);
- // XXX: it's wrong to assume no failure
- --(shard_stats.processing_postlock_io_num);
- });
+
+ DEBUGT("cid={}, {} operations, 0x{:x} bytes, {} colls, {} objects ...",
+ t, ctx.ch->get_cid(),
+ ctx.ext_transaction.get_num_ops(),
+ ctx.ext_transaction.get_num_bytes(),
+ ctx.iter.colls.size(),
+ ctx.iter.objects.size());
+
+ ctx.reset_preserve_handle(*transaction_manager);
+ std::vector<OnodeRef> onodes(ctx.iter.objects.size());
+ while (ctx.iter.have_op()) {
+ co_await _do_transaction_step(
+ ctx, ctx.ch, onodes, ctx.iter);
+ }
+
+ co_await transaction_manager->submit_transaction(*ctx.transaction);
+ })
+ ).handle_error(
+ crimson::ct_error::all_same_way([&ctx](auto e) {
+ on_error(ctx.ext_transaction);
+ return seastar::now();
+ })
+ );
+
+ DEBUGT("done", *ctx.transaction);
+ add_latency_sample(
+ op_type_t::DO_TRANSACTION,
+ std::chrono::steady_clock::now() - ctx.begin_timestamp);
+
+ throttler.put();
+
+ assert(shard_stats.pending_io_num);
+ --(shard_stats.pending_io_num);
+ // XXX: it's wrong to assume no failure
+ --(shard_stats.processing_postlock_io_num);
}
static void on_error(ceph::os::Transaction &t);
- template <typename F>
- auto repeat_with_internal_context(
- CollectionRef ch,
- ceph::os::Transaction &&t,
- Transaction::src_t src,
- const char* tname,
- op_type_t op_type,
- F &&f) {
- // The below repeat_io_num requires MUTATE
- assert(src == Transaction::src_t::MUTATE);
- return seastar::do_with(
- internal_context_t(
- ch, std::move(t),
- transaction_manager->create_transaction(
- src, tname, t.get_fadvise_flags())),
- std::forward<F>(f),
- [this, op_type](auto &ctx, auto &f) {
- assert(shard_stats.starting_io_num);
- --(shard_stats.starting_io_num);
- ++(shard_stats.waiting_collock_io_num);
-
- return ctx.transaction->get_handle().take_collection_lock(
- static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
- ).then([this] {
- assert(shard_stats.waiting_collock_io_num);
- --(shard_stats.waiting_collock_io_num);
- ++(shard_stats.waiting_throttler_io_num);
-
- return throttler.get(1);
- }).then([&, this] {
- assert(shard_stats.waiting_throttler_io_num);
- --(shard_stats.waiting_throttler_io_num);
- ++(shard_stats.processing_inlock_io_num);
-
- return repeat_eagain([&, this] {
- ++(shard_stats.repeat_io_num);
-
- ctx.reset_preserve_handle(*transaction_manager);
- return std::invoke(f, ctx);
- }).handle_error(
- crimson::ct_error::all_same_way([&ctx](auto e) {
- on_error(ctx.ext_transaction);
- return seastar::now();
- })
- );
- }).then([this, op_type, &ctx] {
- add_latency_sample(op_type,
- std::chrono::steady_clock::now() - ctx.begin_timestamp);
- }).finally([this] {
- throttler.put();
- });
- });
- }
-
template <typename Ret, typename F>
auto repeat_with_onode(
CollectionRef ch,