From 85c89155dcf255d40378cc177314ee70446bdee0 Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Thu, 19 Aug 2021 13:33:36 +0800 Subject: [PATCH] crimson/common: keep ref count of crimson::interruptible::interrupt_cond Currently, interrupt conditionss are transfered between inner and outer continuation chains via a tls interrupt_cond variable. This simple strategy leads to problem when it comes to mixing normal future/continuation procedures and seastar::thread. When seastar::async() is called, the reactor can directly invoke the passed functor and lead to two different scenarios: 1.if a seastar::get/yield() inside the passed lambda, the interrupt_cond should be erased at the end of the continuation execution when it is yielded back; 2.otherwise, the interrupt_cond should be not erased. There can be so many possible sequences of yielding of several different fibers that we can hardly judge at the end of the continuation execution whether there was a yielding during the current execution, which means we can't be able to know whether the tls interrupt_cond should be erased. There could be other scenarios where the current strategy fails. To end this kind of issues once and for all, we involve the ref counting mechinary. Fixes: https://tracker.ceph.com/issues/52305 Signed-off-by: Xuehan Xu --- src/crimson/common/interruptible_future.h | 239 +++++++++--------- src/crimson/os/seastore/transaction.cc | 2 +- src/test/crimson/test_interruptible_future.cc | 40 +-- 3 files changed, 139 insertions(+), 142 deletions(-) diff --git a/src/crimson/common/interruptible_future.h b/src/crimson/common/interruptible_future.h index 60cfe48f332d4..2f48d4dc61a79 100644 --- a/src/crimson/common/interruptible_future.h +++ b/src/crimson/common/interruptible_future.h @@ -76,8 +76,43 @@ template using InterruptCondRef = seastar::lw_shared_ptr; template -thread_local InterruptCondRef interrupt_cond; -extern template thread_local InterruptCondRef +struct interrupt_cond_t { + InterruptCondRef interrupt_cond; + uint64_t ref_count = 0; + void set( + InterruptCondRef& ic) { + if (!interrupt_cond) { + interrupt_cond = ic; + } + assert(interrupt_cond.get() == ic.get()); + ref_count++; + INTR_FUT_DEBUG( + "{}: interrupt_cond: {}, ref_count: {}", + __func__, + (void*)interrupt_cond.get(), + ref_count); + } + void reset() { + if (--ref_count == 0) { + INTR_FUT_DEBUG( + "call_with_interruption_impl clearing interrupt_cond: {},{}", + (void*)interrupt_cond.get(), + typeid(InterruptCond).name()); + interrupt_cond.release(); + } else { + INTR_FUT_DEBUG( + "call_with_interruption_impl end without clearing interrupt_cond: {},{}, ref_count: {}", + (void*)interrupt_cond.get(), + typeid(InterruptCond).name(), + ref_count); + } + } +}; + +template +thread_local interrupt_cond_t interrupt_cond; + +extern template thread_local interrupt_cond_t interrupt_cond; template @@ -113,7 +148,6 @@ auto call_with_interruption_impl( // In this case, as crimson::do_for_each would directly do futurize_invoke // for "call_with_interruption", we have make sure this invocation would // errorly release ::crimson::interruptible::interrupt_cond - bool set_int_cond = false; // If there exists an interrupt condition, which means "Func" may not be // permitted to run as a result of the interruption, test it. If it does @@ -129,17 +163,12 @@ auto call_with_interruption_impl( "global interrupt_cond: {},{}", interrupt, (void*)interrupt_condition.get(), - (void*)interrupt_cond.get(), + (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); if (interrupt) { return std::move(*fut); } - if (!interrupt_cond) { - set_int_cond = true; - interrupt_cond = interrupt_condition; - } - ceph_assert(interrupt_cond.get() - == interrupt_condition.get()); + interrupt_cond.set(interrupt_condition); } auto fut = seastar::futurize_invoke( @@ -147,18 +176,7 @@ auto call_with_interruption_impl( std::forward(args)...); // Clear the global "interrupt_cond" to prevent it from interfering other // continuation chains. - if (set_int_cond && interrupt_cond) { - INTR_FUT_DEBUG( - "call_with_interruption_impl clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - interrupt_cond.release(); - } else { - INTR_FUT_DEBUG( - "call_with_interruption_impl end without clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - } + interrupt_cond.reset(); return fut; } @@ -238,21 +256,19 @@ Result non_futurized_call_with_interruption( InterruptCondRef interrupt_condition, Func&& func, T&&... args) { - bool set_int_cond = false; - if (!interrupt_cond && interrupt_condition) { + if (interrupt_condition) { auto [interrupt, fut] = interrupt_condition->template may_interrupt>(); - if (interrupt) { - std::rethrow_exception(fut->get_exception()); - } - set_int_cond = true; INTR_FUT_DEBUG( "non_futurized_call_with_interruption may_interrupt: {}, " "interrupt_condition: {}, interrupt_cond: {},{}", interrupt, (void*)interrupt_condition.get(), - (void*)interrupt_cond.get(), + (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); - interrupt_cond = std::move(interrupt_condition); + if (interrupt) { + std::rethrow_exception(fut->get_exception()); + } + interrupt_cond.set(interrupt_condition); } try { if constexpr (std::is_void_v) { @@ -260,50 +276,17 @@ Result non_futurized_call_with_interruption( // Clear the global "interrupt_cond" to prevent it from interfering other // continuation chains. - if (set_int_cond && interrupt_cond) { - INTR_FUT_DEBUG( - "non_futurized_call_with_interruption clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - interrupt_cond.release(); - } else { - INTR_FUT_DEBUG( - "non_futurized_call_with_interruption end without clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - } + interrupt_cond.reset(); return; } else { auto&& err = std::invoke(std::forward(func), std::forward(args)...); - if (set_int_cond && interrupt_cond) { - INTR_FUT_DEBUG( - "non_futurized_call_with_interruption clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - interrupt_cond.release(); - } else { - INTR_FUT_DEBUG( - "non_futurized_call_with_interruption end without clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - } + interrupt_cond.reset(); return std::forward(err); } } catch (std::exception& e) { // Clear the global "interrupt_cond" to prevent it from interfering other // continuation chains. - if (set_int_cond && interrupt_cond) { - INTR_FUT_DEBUG( - "non_futurized_call_with_interruption clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - interrupt_cond.release(); - } else { - INTR_FUT_DEBUG( - "non_futurized_call_with_interruption end without clearing interrupt_cond: {},{}", - (void*)interrupt_cond.get(), - typeid(InterruptCond).name()); - } + interrupt_cond.reset(); throw e; } } @@ -408,16 +391,17 @@ public: return core_type::get(); } else { // destined to wait! - auto interruption_condition = interrupt_cond; + auto interruption_condition = interrupt_cond.interrupt_cond; INTR_FUT_DEBUG( "interruptible_future_detail::get() waiting, interrupt_cond: {},{}", - (void*)interrupt_cond.get(), + (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); + interrupt_cond.reset(); auto&& value = core_type::get(); - interrupt_cond = interruption_condition; + interrupt_cond.set(interruption_condition); INTR_FUT_DEBUG( "interruptible_future_detail::get() got, interrupt_cond: {},{}", - (void*)interrupt_cond.get(), + (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); return std::move(value); } @@ -431,9 +415,10 @@ public: std::invoke_result_t>>> [[gnu::always_inline]] Result then_wrapped_interruptible(Func&& func) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); return core_type::then_wrapped( - [func=std::move(func), interrupt_condition=interrupt_cond] + [func=std::move(func), + interrupt_condition=interrupt_cond.interrupt_cond] (auto&& fut) mutable { return call_with_interruption( std::move(interrupt_condition), @@ -445,10 +430,11 @@ public: template [[gnu::always_inline]] auto then_interruptible(Func&& func) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); if constexpr (std::is_void_v) { auto fut = core_type::then( - [func=std::move(func), interrupt_condition=interrupt_cond] + [func=std::move(func), + interrupt_condition=interrupt_cond.interrupt_cond] () mutable { return call_with_interruption( interrupt_condition, @@ -457,7 +443,8 @@ public: return (interrupt_futurize_t)(std::move(fut)); } else { auto fut = core_type::then( - [func=std::move(func), interrupt_condition=interrupt_cond] + [func=std::move(func), + interrupt_condition=interrupt_cond.interrupt_cond] (T&& arg) mutable { return call_with_interruption( interrupt_condition, @@ -481,10 +468,11 @@ public: std::result_of_t>> [[gnu::always_inline]] Result handle_exception_interruptible(Func&& func) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); return core_type::then_wrapped( [func=std::forward(func), - interrupt_condition=interrupt_cond](auto&& fut) mutable { + interrupt_condition=interrupt_cond.interrupt_cond] + (auto&& fut) mutable { if (!fut.failed()) { return seastar::make_ready_future(fut.get()); } else { @@ -502,10 +490,11 @@ public: [[gnu::always_inline]] Result finally_interruptible(Func&& func) { if constexpr (may_interrupt) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); return core_type::then_wrapped( [func=std::forward(func), - interrupt_condition=interrupt_cond](auto&& fut) mutable { + interrupt_condition=interrupt_cond.interrupt_cond] + (auto&& fut) mutable { return call_with_interruption( interrupt_condition, std::move(func)); @@ -521,14 +510,14 @@ public: typename seastar::function_traits::template arg<0>::type)>>> [[gnu::always_inline]] Result handle_exception_type_interruptible(Func&& func) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); using trait = seastar::function_traits; static_assert(trait::arity == 1, "func can take only one parameter"); using ex_type = typename trait::template arg<0>::type; return core_type::then_wrapped( [func=std::forward(func), - interrupt_condition=interrupt_cond](auto&& fut) mutable - -> Result { + interrupt_condition=interrupt_cond.interrupt_cond] + (auto&& fut) mutable -> Result { if (!fut.failed()) { return seastar::make_ready_future(fut.get()); } else { @@ -580,7 +569,8 @@ private: [[gnu::always_inline]] Result then_wrapped(Func&& func) { return core_type::then_wrapped( - [func=std::move(func), interrupt_condition=interrupt_cond] + [func=std::move(func), + interrupt_condition=interrupt_cond.interrupt_cond] (auto&& fut) mutable { return call_with_interruption( interrupt_condition, @@ -749,16 +739,17 @@ public: typename U = T, std::enable_if_t && interruptible, int> = 0> [[gnu::always_inline]] auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); auto fut = core_type::safe_then( - [func=std::move(valfunc), interrupt_condition=interrupt_cond] + [func=std::move(valfunc), + interrupt_condition=interrupt_cond.interrupt_cond] (T&& args) mutable { return call_with_interruption( interrupt_condition, std::move(func), std::forward(args)); }, [func=std::move(errfunc), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (auto&& err) mutable -> decltype(auto) { constexpr bool return_void = std::is_void_v< std::invoke_result_t && interruptible, int> = 0> [[gnu::always_inline]] auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); auto fut = core_type::safe_then( - [func=std::move(valfunc), interrupt_condition=interrupt_cond] + [func=std::move(valfunc), + interrupt_condition=interrupt_cond.interrupt_cond] () mutable { return call_with_interruption( interrupt_condition, std::move(func)); }, [func=std::move(errfunc), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (auto&& err) mutable -> decltype(auto) { constexpr bool return_void = std::is_void_v< std::invoke_result_t && interruptible, int> = 0> [[gnu::always_inline]] auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); auto fut = core_type::safe_then( [func=std::move(valfunc), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] () mutable { return call_with_interruption( interrupt_condition, @@ -852,10 +844,10 @@ public: typename U = T, std::enable_if_t && interruptible, int> = 0> [[gnu::always_inline]] auto safe_then_interruptible(ValueInterruptCondT&& valfunc) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); auto fut = core_type::safe_then( [func=std::move(valfunc), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (T&& arg) mutable { return call_with_interruption( interrupt_condition, @@ -918,10 +910,10 @@ public: template auto handle_error_interruptible(ErrorFunc&& errfunc) { if constexpr (interruptible) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); auto fut = core_type::handle_error( [errfunc=std::move(errfunc), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (auto&& err) mutable -> decltype(auto) { constexpr bool return_void = std::is_void_v< std::invoke_result_t auto handle_error_interruptible(ErrorFuncHead&& error_func_head, ErrorFuncTail&&... error_func_tail) { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); static_assert(sizeof...(ErrorFuncTail) > 0); return this->handle_error_interruptible( ::crimson::composer( @@ -981,7 +973,8 @@ private: using futurator_t = typename ret_ertr_t::template futurize; return core_type::then_wrapped( [func=std::move(func), - interrupt_condition=interrupt_cond](auto&& fut) mutable + interrupt_condition=interrupt_cond.interrupt_cond] + (auto&& fut) mutable -> typename futurator_t::type { if (fut.failed()) { std::exception_ptr ex = fut.get_exception(); @@ -1123,7 +1116,8 @@ public: static inline auto with_interruption_cond( OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCond &&cond, Params&&... params) { INTR_FUT_DEBUG( - "with_interruption_cond: interrupt_cond: {}", (void*)interrupt_cond.get()); + "with_interruption_cond: interrupt_cond: {}", + (void*)interrupt_cond.interrupt_cond.get()); return internal::call_with_interruption_impl( seastar::make_lw_shared(std::move(cond)), std::forward(opfunc), @@ -1172,7 +1166,7 @@ public: [[gnu::always_inline]] static auto wrap_function(Func&& func) { return [func=std::forward(func), - interrupt_condition=interrupt_cond]() mutable { + interrupt_condition=interrupt_cond.interrupt_cond]() mutable { return call_with_interruption( interrupt_condition, std::forward(func)); @@ -1188,7 +1182,7 @@ public: return make_interruptible( ::seastar::do_for_each(begin, end, [action=std::move(action), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (typename Iterator::reference x) mutable { return call_with_interruption( interrupt_condition, @@ -1200,7 +1194,7 @@ public: return make_interruptible( ::crimson::do_for_each(begin, end, [action=std::move(action), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (typename Iterator::reference x) mutable { return call_with_interruption( interrupt_condition, @@ -1220,7 +1214,7 @@ public: return make_interruptible( ::seastar::do_for_each(begin, end, [action=std::move(action), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (typename Iterator::reference x) mutable { return call_with_interruption( interrupt_condition, @@ -1232,7 +1226,7 @@ public: return make_interruptible( ::crimson::do_for_each(begin, end, [action=std::move(action), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (typename Iterator::reference x) mutable { return call_with_interruption( interrupt_condition, @@ -1252,7 +1246,7 @@ public: return make_interruptible( ::seastar::repeat( [action=std::move(action), - interrupt_condition=interrupt_cond] { + interrupt_condition=interrupt_cond.interrupt_cond] { return call_with_interruption( interrupt_condition, std::move(action)).to_future(); @@ -1262,7 +1256,7 @@ public: return make_interruptible( ::crimson::repeat( [action=std::move(action), - interrupt_condition=interrupt_cond] { + interrupt_condition=interrupt_cond.interrupt_cond] { return call_with_interruption( interrupt_condition, std::move(action)).to_future(); @@ -1280,7 +1274,7 @@ public: return make_interruptible( ::seastar::repeat( [action=std::move(action), - interrupt_condition=interrupt_cond] { + interrupt_condition=interrupt_cond.interrupt_cond] { return call_with_interruption( interrupt_condition, std::move(action)); @@ -1290,7 +1284,7 @@ public: return make_interruptible( ::crimson::repeat( [action=std::move(action), - interrupt_condition=interrupt_cond] { + interrupt_condition=interrupt_cond.interrupt_cond] { return call_with_interruption( interrupt_condition, std::move(action)); @@ -1309,7 +1303,7 @@ public: parallel_for_each_state* s = nullptr; auto decorated_func = [func=std::forward(func), - interrupt_condition=interrupt_cond] + interrupt_condition=interrupt_cond.interrupt_cond] (decltype(*Iterator())&& x) mutable { return call_with_interruption( interrupt_condition, @@ -1416,42 +1410,45 @@ public: typename Result = futurize_t>> static inline Result async(Func&& func) { return seastar::async([func=std::forward(func), - interrupt_condition=interrupt_cond]() mutable { + interrupt_condition=interrupt_cond.interrupt_cond] + () mutable { return non_futurized_call_with_interruption( interrupt_condition, std::forward(func)); }); } static void yield() { - ceph_assert(interrupt_cond); - auto interruption_condition = interrupt_cond; - interrupt_cond.release(); + ceph_assert(interrupt_cond.interrupt_cond); + auto interruption_condition = interrupt_cond.interrupt_cond; INTR_FUT_DEBUG( - "interruptible_future_detail::yield() yielding out, interrupt_cond {},{} cleared", + "interruptible_future_detail::yield() yielding out, " + "interrupt_cond {},{} cleared", (void*)interruption_condition.get(), typeid(InterruptCond).name()); + interrupt_cond.reset(); seastar::thread::yield(); - interrupt_cond = interruption_condition; + interrupt_cond.set(interruption_condition); INTR_FUT_DEBUG( "interruptible_future_detail::yield() yield back, interrupt_cond: {},{}", - (void*)interrupt_cond.get(), + (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); } static void maybe_yield() { - ceph_assert(interrupt_cond); + ceph_assert(interrupt_cond.interrupt_cond); if (seastar::thread::should_yield()) { - auto interruption_condition = interrupt_cond; - interrupt_cond.release(); + auto interruption_condition = interrupt_cond.interrupt_cond; INTR_FUT_DEBUG( - "interruptible_future_detail::may_yield() yielding out, interrupt_cond {},{} cleared", + "interruptible_future_detail::may_yield() yielding out, " + "interrupt_cond {},{} cleared", (void*)interruption_condition.get(), typeid(InterruptCond).name()); + interrupt_cond.reset(); seastar::thread::yield(); - interrupt_cond = interruption_condition; + interrupt_cond.set(interruption_condition); INTR_FUT_DEBUG( "interruptible_future_detail::may_yield() yield back, interrupt_cond: {},{}", - (void*)interrupt_cond.get(), + (void*)interrupt_cond.interrupt_cond.get(), typeid(InterruptCond).name()); } } diff --git a/src/crimson/os/seastore/transaction.cc b/src/crimson/os/seastore/transaction.cc index ce02a95ef9182..4cab476c303f6 100644 --- a/src/crimson/os/seastore/transaction.cc +++ b/src/crimson/os/seastore/transaction.cc @@ -3,6 +3,6 @@ namespace crimson::interruptible { template -thread_local InterruptCondRef<::crimson::os::seastore::TransactionConflictCondition> +thread_local interrupt_cond_t<::crimson::os::seastore::TransactionConflictCondition> interrupt_cond<::crimson::os::seastore::TransactionConflictCondition>; } diff --git a/src/test/crimson/test_interruptible_future.cc b/src/test/crimson/test_interruptible_future.cc index c6af5f4957e98..8f98a7d98bc3a 100644 --- a/src/test/crimson/test_interruptible_future.cc +++ b/src/test/crimson/test_interruptible_future.cc @@ -52,31 +52,31 @@ TEST_F(seastar_test_suite_t, basic) run_async([] { interruptor::with_interruption( [] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return interruptor::make_interruptible(seastar::now()) .then_interruptible([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); }).then_interruptible([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return errorator::make_ready_future<>(); }).safe_then_interruptible([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return seastar::now(); }, errorator::all_same_way([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); }) ); }, [](std::exception_ptr) {}, false).get0(); interruptor::with_interruption( [] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return interruptor::make_interruptible(seastar::now()) .then_interruptible([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); }); }, [](std::exception_ptr) { - ceph_assert(!interruptible::interrupt_cond); + ceph_assert(!interruptible::interrupt_cond.interrupt_cond); return seastar::now(); }, true).get0(); @@ -94,14 +94,14 @@ TEST_F(seastar_test_suite_t, loops) interruptor::with_interruption( [] { std::cout << "interruptiion enabled" << std::endl; - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return interruptor::make_interruptible(seastar::now()) .then_interruptible([] { std::cout << "test seastar future do_for_each" << std::endl; std::vector vec = {1, 2}; return seastar::do_with(std::move(vec), [](auto& vec) { return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return seastar::now(); }); }); @@ -110,14 +110,14 @@ TEST_F(seastar_test_suite_t, loops) std::vector vec = {1, 2}; return seastar::do_with(std::move(vec), [](auto& vec) { return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return interruptor::make_interruptible(seastar::now()); }); }); }).then_interruptible([] { std::cout << "test seastar future repeat" << std::endl; return interruptor::repeat([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return interruptor::make_interruptible( seastar::make_ready_future< seastar::stop_iteration>( @@ -126,7 +126,7 @@ TEST_F(seastar_test_suite_t, loops) }).then_interruptible([] { std::cout << "test interruptible seastar future repeat" << std::endl; return interruptor::repeat([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return seastar::make_ready_future< seastar::stop_iteration>( seastar::stop_iteration::yes); @@ -138,14 +138,14 @@ TEST_F(seastar_test_suite_t, loops) using namespace std::chrono_literals; return interruptor::make_interruptible(seastar::now()).then_interruptible([&vec] { return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return interruptor::make_interruptible( errorator::make_ready_future<>()); }).safe_then_interruptible([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return seastar::now(); }, errorator::all_same_way([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); })); }); }); @@ -156,18 +156,18 @@ TEST_F(seastar_test_suite_t, loops) using namespace std::chrono_literals; return interruptor::make_interruptible(seastar::now()).then_interruptible([&vec] { return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return errorator::make_ready_future<>(); }).safe_then_interruptible([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return seastar::now(); }, errorator::all_same_way([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); })); }); }); }).then_interruptible([] { - ceph_assert(interruptible::interrupt_cond); + ceph_assert(interruptible::interrupt_cond.interrupt_cond); return seastar::now(); }); }, [](std::exception_ptr) {}, false).get0(); -- 2.39.5