]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/common: keep ref count of crimson::interruptible::interrupt_cond
authorXuehan Xu <xxhdx1985126@gmail.com>
Thu, 19 Aug 2021 05:33:36 +0000 (13:33 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Mon, 23 Aug 2021 10:19:20 +0000 (18:19 +0800)
Currently, interrupt conditionss are transfered between inner and outer continuation
chains via a tls interrupt_cond variable. This simple strategy leads to problem when it
comes to mixing normal future/continuation procedures and seastar::thread. When seastar::async()
is called, the reactor can directly invoke the passed functor and lead to two different
scenarios:

1.if a seastar::get/yield() inside the passed lambda, the interrupt_cond should be erased at the
end of the continuation execution when it is yielded back;
2.otherwise, the interrupt_cond should be not erased.

There can be so many possible sequences of yielding of several different fibers that we can hardly
judge at the end of the continuation execution whether there was a yielding during the current
execution, which means we can't be able to know whether the tls interrupt_cond should be erased.
There could be other scenarios where the current strategy fails. To end this kind of issues
once and for all, we involve the ref counting mechinary.

Fixes: https://tracker.ceph.com/issues/52305
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/common/interruptible_future.h
src/crimson/os/seastore/transaction.cc
src/test/crimson/test_interruptible_future.cc

index 60cfe48f332d499db3a24d918e00bf085cdace04..2f48d4dc61a79cb0115486e5cf11086317142ae2 100644 (file)
@@ -76,8 +76,43 @@ template <typename InterruptCond>
 using InterruptCondRef = seastar::lw_shared_ptr<InterruptCond>;
 
 template <typename InterruptCond>
-thread_local InterruptCondRef<InterruptCond> interrupt_cond;
-extern template thread_local InterruptCondRef<crimson::os::seastore::TransactionConflictCondition>
+struct interrupt_cond_t {
+  InterruptCondRef<InterruptCond> interrupt_cond;
+  uint64_t ref_count = 0;
+  void set(
+    InterruptCondRef<InterruptCond>& ic) {
+    if (!interrupt_cond) {
+      interrupt_cond = ic;
+    }
+    assert(interrupt_cond.get() == ic.get());
+    ref_count++;
+    INTR_FUT_DEBUG(
+      "{}: interrupt_cond: {}, ref_count: {}",
+      __func__,
+      (void*)interrupt_cond.get(),
+      ref_count);
+  }
+  void reset() {
+    if (--ref_count == 0) {
+      INTR_FUT_DEBUG(
+       "call_with_interruption_impl clearing interrupt_cond: {},{}",
+       (void*)interrupt_cond.get(),
+       typeid(InterruptCond).name());
+      interrupt_cond.release();
+    } else {
+      INTR_FUT_DEBUG(
+       "call_with_interruption_impl end without clearing interrupt_cond: {},{}, ref_count: {}",
+       (void*)interrupt_cond.get(),
+       typeid(InterruptCond).name(),
+       ref_count);
+    }
+  }
+};
+
+template <typename InterruptCond>
+thread_local interrupt_cond_t<InterruptCond> interrupt_cond;
+
+extern template thread_local interrupt_cond_t<crimson::os::seastore::TransactionConflictCondition>
 interrupt_cond<crimson::os::seastore::TransactionConflictCondition>;
 
 template <typename InterruptCond, typename FutureType>
@@ -113,7 +148,6 @@ auto call_with_interruption_impl(
   // In this case, as crimson::do_for_each would directly do futurize_invoke
   // for "call_with_interruption", we have make sure this invocation would
   // errorly release ::crimson::interruptible::interrupt_cond<InterruptCond>
-  bool set_int_cond = false;
 
   // If there exists an interrupt condition, which means "Func" may not be
   // permitted to run as a result of the interruption, test it. If it does
@@ -129,17 +163,12 @@ auto call_with_interruption_impl(
       "global interrupt_cond: {},{}",
       interrupt,
       (void*)interrupt_condition.get(),
-      (void*)interrupt_cond<InterruptCond>.get(),
+      (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
       typeid(InterruptCond).name());
     if (interrupt) {
       return std::move(*fut);
     }
-    if (!interrupt_cond<InterruptCond>) {
-      set_int_cond = true;
-      interrupt_cond<InterruptCond> = interrupt_condition;
-    }
-    ceph_assert(interrupt_cond<InterruptCond>.get()
-      == interrupt_condition.get());
+    interrupt_cond<InterruptCond>.set(interrupt_condition);
   }
 
   auto fut = seastar::futurize_invoke(
@@ -147,18 +176,7 @@ auto call_with_interruption_impl(
       std::forward<Args>(args)...);
   // Clear the global "interrupt_cond" to prevent it from interfering other
   // continuation chains.
-  if (set_int_cond && interrupt_cond<InterruptCond>) {
-    INTR_FUT_DEBUG(
-      "call_with_interruption_impl clearing interrupt_cond: {},{}",
-      (void*)interrupt_cond<InterruptCond>.get(),
-      typeid(InterruptCond).name());
-    interrupt_cond<InterruptCond>.release();
-  } else {
-    INTR_FUT_DEBUG(
-      "call_with_interruption_impl end without clearing interrupt_cond: {},{}",
-      (void*)interrupt_cond<InterruptCond>.get(),
-      typeid(InterruptCond).name());
-  }
+  interrupt_cond<InterruptCond>.reset();
   return fut;
 }
 
@@ -238,21 +256,19 @@ Result non_futurized_call_with_interruption(
   InterruptCondRef<InterruptCond> interrupt_condition,
   Func&& func, T&&... args)
 {
-  bool set_int_cond = false;
-  if (!interrupt_cond<InterruptCond> && interrupt_condition) {
+  if (interrupt_condition) {
     auto [interrupt, fut] = interrupt_condition->template may_interrupt<seastar::future<>>();
-    if (interrupt) {
-      std::rethrow_exception(fut->get_exception());
-    }
-    set_int_cond = true;
     INTR_FUT_DEBUG(
       "non_futurized_call_with_interruption may_interrupt: {}, "
       "interrupt_condition: {}, interrupt_cond: {},{}",
       interrupt,
       (void*)interrupt_condition.get(),
-      (void*)interrupt_cond<InterruptCond>.get(),
+      (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
       typeid(InterruptCond).name());
-    interrupt_cond<InterruptCond> = std::move(interrupt_condition);
+    if (interrupt) {
+      std::rethrow_exception(fut->get_exception());
+    }
+    interrupt_cond<InterruptCond>.set(interrupt_condition);
   }
   try {
     if constexpr (std::is_void_v<Result>) {
@@ -260,50 +276,17 @@ Result non_futurized_call_with_interruption(
 
       // Clear the global "interrupt_cond" to prevent it from interfering other
       // continuation chains.
-      if (set_int_cond && interrupt_cond<InterruptCond>) {
-       INTR_FUT_DEBUG(
-         "non_futurized_call_with_interruption clearing interrupt_cond: {},{}",
-         (void*)interrupt_cond<InterruptCond>.get(),
-         typeid(InterruptCond).name());
-       interrupt_cond<InterruptCond>.release();
-      } else {
-       INTR_FUT_DEBUG(
-         "non_futurized_call_with_interruption end without clearing interrupt_cond: {},{}",
-         (void*)interrupt_cond<InterruptCond>.get(),
-         typeid(InterruptCond).name());
-      }
+      interrupt_cond<InterruptCond>.reset();
       return;
     } else {
       auto&& err = std::invoke(std::forward<Func>(func), std::forward<T>(args)...);
-      if (set_int_cond && interrupt_cond<InterruptCond>) {
-       INTR_FUT_DEBUG(
-         "non_futurized_call_with_interruption clearing interrupt_cond: {},{}",
-         (void*)interrupt_cond<InterruptCond>.get(),
-         typeid(InterruptCond).name());
-       interrupt_cond<InterruptCond>.release();
-      } else {
-       INTR_FUT_DEBUG(
-         "non_futurized_call_with_interruption end without clearing interrupt_cond: {},{}",
-         (void*)interrupt_cond<InterruptCond>.get(),
-         typeid(InterruptCond).name());
-      }
+      interrupt_cond<InterruptCond>.reset();
       return std::forward<Result>(err);
     }
   } catch (std::exception& e) {
     // Clear the global "interrupt_cond" to prevent it from interfering other
     // continuation chains.
-    if (set_int_cond && interrupt_cond<InterruptCond>) {
-      INTR_FUT_DEBUG(
-       "non_futurized_call_with_interruption clearing interrupt_cond: {},{}",
-       (void*)interrupt_cond<InterruptCond>.get(),
-       typeid(InterruptCond).name());
-      interrupt_cond<InterruptCond>.release();
-    } else {
-      INTR_FUT_DEBUG(
-       "non_futurized_call_with_interruption end without clearing interrupt_cond: {},{}",
-       (void*)interrupt_cond<InterruptCond>.get(),
-       typeid(InterruptCond).name());
-    }
+    interrupt_cond<InterruptCond>.reset();
     throw e;
   }
 }
@@ -408,16 +391,17 @@ public:
       return core_type::get();
     } else {
       // destined to wait!
-      auto interruption_condition = interrupt_cond<InterruptCond>;
+      auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
       INTR_FUT_DEBUG(
        "interruptible_future_detail::get() waiting, interrupt_cond: {},{}",
-       (void*)interrupt_cond<InterruptCond>.get(),
+       (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
        typeid(InterruptCond).name());
+      interrupt_cond<InterruptCond>.reset();
       auto&& value = core_type::get();
-      interrupt_cond<InterruptCond> = interruption_condition;
+      interrupt_cond<InterruptCond>.set(interruption_condition);
       INTR_FUT_DEBUG(
        "interruptible_future_detail::get() got, interrupt_cond: {},{}",
-       (void*)interrupt_cond<InterruptCond>.get(),
+       (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
        typeid(InterruptCond).name());
       return std::move(value);
     }
@@ -431,9 +415,10 @@ public:
                std::invoke_result_t<Func, seastar::future<T>>>>
   [[gnu::always_inline]]
   Result then_wrapped_interruptible(Func&& func) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     return core_type::then_wrapped(
-      [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+      [func=std::move(func),
+       interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
       (auto&& fut) mutable {
       return call_with_interruption(
                std::move(interrupt_condition),
@@ -445,10 +430,11 @@ public:
   template <typename Func>
   [[gnu::always_inline]]
   auto then_interruptible(Func&& func) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     if constexpr (std::is_void_v<T>) {
       auto fut = core_type::then(
-       [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+       [func=std::move(func),
+        interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
        () mutable {
        return call_with_interruption(
                  interrupt_condition,
@@ -457,7 +443,8 @@ public:
       return (interrupt_futurize_t<decltype(fut)>)(std::move(fut));
     } else {
       auto fut = core_type::then(
-       [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+       [func=std::move(func),
+        interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
        (T&& arg) mutable {
        return call_with_interruption(
                  interrupt_condition,
@@ -481,10 +468,11 @@ public:
                std::result_of_t<Func(std::exception_ptr)>>>
   [[gnu::always_inline]]
   Result handle_exception_interruptible(Func&& func) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     return core_type::then_wrapped(
       [func=std::forward<Func>(func),
-      interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable {
+       interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+      (auto&& fut) mutable {
       if (!fut.failed()) {
        return seastar::make_ready_future<T>(fut.get());
       } else {
@@ -502,10 +490,11 @@ public:
   [[gnu::always_inline]]
   Result finally_interruptible(Func&& func) {
     if constexpr (may_interrupt) {
-      ceph_assert(interrupt_cond<InterruptCond>);
+      ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
       return core_type::then_wrapped(
        [func=std::forward<Func>(func),
-       interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable {
+        interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+       (auto&& fut) mutable {
          return call_with_interruption(
                    interrupt_condition,
                    std::move(func));
@@ -521,14 +510,14 @@ public:
                  typename seastar::function_traits<Func>::template arg<0>::type)>>>
   [[gnu::always_inline]]
   Result handle_exception_type_interruptible(Func&& func) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     using trait = seastar::function_traits<Func>;
     static_assert(trait::arity == 1, "func can take only one parameter");
     using ex_type = typename trait::template arg<0>::type;
     return core_type::then_wrapped(
       [func=std::forward<Func>(func),
-      interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable
-      -> Result {
+      interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+      (auto&& fut) mutable -> Result {
       if (!fut.failed()) {
        return seastar::make_ready_future<T>(fut.get());
       } else {
@@ -580,7 +569,8 @@ private:
   [[gnu::always_inline]]
   Result then_wrapped(Func&& func) {
     return core_type::then_wrapped(
-      [func=std::move(func), interrupt_condition=interrupt_cond<InterruptCond>]
+      [func=std::move(func),
+      interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
       (auto&& fut) mutable {
       return call_with_interruption(
                interrupt_condition,
@@ -749,16 +739,17 @@ public:
           typename U = T, std::enable_if_t<!std::is_void_v<U> && interruptible, int> = 0>
   [[gnu::always_inline]]
   auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     auto fut = core_type::safe_then(
-      [func=std::move(valfunc), interrupt_condition=interrupt_cond<InterruptCond>]
+      [func=std::move(valfunc),
+      interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
       (T&& args) mutable {
       return call_with_interruption(
                interrupt_condition,
                std::move(func),
                std::forward<T>(args));
       }, [func=std::move(errfunc),
-         interrupt_condition=interrupt_cond<InterruptCond>]
+         interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
          (auto&& err) mutable -> decltype(auto) {
          constexpr bool return_void = std::is_void_v<
            std::invoke_result_t<ErrorVisitorT,
@@ -785,15 +776,16 @@ public:
           typename U = T, std::enable_if_t<std::is_void_v<U> && interruptible, int> = 0>
   [[gnu::always_inline]]
   auto safe_then_interruptible(ValueInterruptCondT&& valfunc, ErrorVisitorT&& errfunc) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     auto fut = core_type::safe_then(
-      [func=std::move(valfunc), interrupt_condition=interrupt_cond<InterruptCond>]
+      [func=std::move(valfunc),
+      interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
       () mutable {
       return call_with_interruption(
                interrupt_condition,
                std::move(func));
       }, [func=std::move(errfunc),
-         interrupt_condition=interrupt_cond<InterruptCond>]
+         interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
          (auto&& err) mutable -> decltype(auto) {
          constexpr bool return_void = std::is_void_v<
            std::invoke_result_t<ErrorVisitorT,
@@ -820,10 +812,10 @@ public:
            typename U = T, std::enable_if_t<std::is_void_v<T> && interruptible, int> = 0>
   [[gnu::always_inline]]
   auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     auto fut = core_type::safe_then(
       [func=std::move(valfunc),
-       interrupt_condition=interrupt_cond<InterruptCond>]
+       interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
       () mutable {
       return call_with_interruption(
                interrupt_condition,
@@ -852,10 +844,10 @@ public:
            typename U = T, std::enable_if_t<!std::is_void_v<T> && interruptible, int> = 0>
   [[gnu::always_inline]]
   auto safe_then_interruptible(ValueInterruptCondT&& valfunc) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     auto fut = core_type::safe_then(
       [func=std::move(valfunc),
-       interrupt_condition=interrupt_cond<InterruptCond>]
+       interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
       (T&& arg) mutable {
       return call_with_interruption(
                interrupt_condition,
@@ -918,10 +910,10 @@ public:
   template <bool interruptible = true, typename ErrorFunc>
   auto handle_error_interruptible(ErrorFunc&& errfunc) {
     if constexpr (interruptible) {
-      ceph_assert(interrupt_cond<InterruptCond>);
+      ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
       auto fut = core_type::handle_error(
        [errfunc=std::move(errfunc),
-        interrupt_condition=interrupt_cond<InterruptCond>]
+        interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
        (auto&& err) mutable -> decltype(auto) {
          constexpr bool return_void = std::is_void_v<
            std::invoke_result_t<ErrorFunc,
@@ -951,7 +943,7 @@ public:
            typename... ErrorFuncTail>
   auto handle_error_interruptible(ErrorFuncHead&& error_func_head,
                                  ErrorFuncTail&&... error_func_tail) {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     static_assert(sizeof...(ErrorFuncTail) > 0);
     return this->handle_error_interruptible(
       ::crimson::composer(
@@ -981,7 +973,8 @@ private:
     using futurator_t = typename ret_ertr_t::template futurize<func_result_t>;
     return core_type::then_wrapped(
       [func=std::move(func),
-       interrupt_condition=interrupt_cond<InterruptCond>](auto&& fut) mutable
+       interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+      (auto&& fut) mutable
       -> typename futurator_t::type {
        if (fut.failed()) {
          std::exception_ptr ex = fut.get_exception();
@@ -1123,7 +1116,8 @@ public:
   static inline auto with_interruption_cond(
     OpFunc&& opfunc, OnInterrupt&& efunc, InterruptCond &&cond, Params&&... params) {
     INTR_FUT_DEBUG(
-      "with_interruption_cond: interrupt_cond: {}", (void*)interrupt_cond<InterruptCond>.get());
+      "with_interruption_cond: interrupt_cond: {}",
+      (void*)interrupt_cond<InterruptCond>.interrupt_cond.get());
     return internal::call_with_interruption_impl(
       seastar::make_lw_shared<InterruptCond>(std::move(cond)),
       std::forward<OpFunc>(opfunc),
@@ -1172,7 +1166,7 @@ public:
   [[gnu::always_inline]]
   static auto wrap_function(Func&& func) {
     return [func=std::forward<Func>(func),
-           interrupt_condition=interrupt_cond<InterruptCond>]() mutable {
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]() mutable {
              return call_with_interruption(
                  interrupt_condition,
                  std::forward<Func>(func));
@@ -1188,7 +1182,7 @@ public:
       return make_interruptible(
          ::seastar::do_for_each(begin, end,
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>]
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
            (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
@@ -1200,7 +1194,7 @@ public:
       return make_interruptible(
          ::crimson::do_for_each(begin, end,
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>]
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
            (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
@@ -1220,7 +1214,7 @@ public:
       return make_interruptible(
          ::seastar::do_for_each(begin, end,
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>]
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
            (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
@@ -1232,7 +1226,7 @@ public:
       return make_interruptible(
          ::crimson::do_for_each(begin, end,
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>]
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
            (typename Iterator::reference x) mutable {
            return call_with_interruption(
                      interrupt_condition,
@@ -1252,7 +1246,7 @@ public:
       return make_interruptible(
          ::seastar::repeat(
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>] {
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action)).to_future();
@@ -1262,7 +1256,7 @@ public:
       return make_interruptible(
          ::crimson::repeat(
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>] {
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action)).to_future();
@@ -1280,7 +1274,7 @@ public:
       return make_interruptible(
          ::seastar::repeat(
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>] {
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action));
@@ -1290,7 +1284,7 @@ public:
       return make_interruptible(
          ::crimson::repeat(
            [action=std::move(action),
-           interrupt_condition=interrupt_cond<InterruptCond>] {
+           interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond] {
            return call_with_interruption(
                      interrupt_condition,
                      std::move(action));
@@ -1309,7 +1303,7 @@ public:
     parallel_for_each_state<InterruptCond, ResultType>* s = nullptr;
     auto decorated_func =
       [func=std::forward<Func>(func),
-      interrupt_condition=interrupt_cond<InterruptCond>]
+      interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
       (decltype(*Iterator())&& x) mutable {
        return call_with_interruption(
                  interrupt_condition,
@@ -1416,42 +1410,45 @@ public:
            typename Result = futurize_t<std::invoke_result_t<Func>>>
   static inline Result async(Func&& func) {
     return seastar::async([func=std::forward<Func>(func),
-                          interrupt_condition=interrupt_cond<InterruptCond>]() mutable {
+                          interrupt_condition=interrupt_cond<InterruptCond>.interrupt_cond]
+                         () mutable {
       return non_futurized_call_with_interruption(
          interrupt_condition, std::forward<Func>(func));
     });
   }
 
   static void yield() {
-    ceph_assert(interrupt_cond<InterruptCond>);
-    auto interruption_condition = interrupt_cond<InterruptCond>;
-    interrupt_cond<InterruptCond>.release();
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
+    auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
     INTR_FUT_DEBUG(
-      "interruptible_future_detail::yield() yielding out, interrupt_cond {},{} cleared",
+      "interruptible_future_detail::yield() yielding out, "
+      "interrupt_cond {},{} cleared",
       (void*)interruption_condition.get(),
       typeid(InterruptCond).name());
+    interrupt_cond<InterruptCond>.reset();
     seastar::thread::yield();
-    interrupt_cond<InterruptCond> = interruption_condition;
+    interrupt_cond<InterruptCond>.set(interruption_condition);
     INTR_FUT_DEBUG(
       "interruptible_future_detail::yield() yield back, interrupt_cond: {},{}",
-      (void*)interrupt_cond<InterruptCond>.get(),
+      (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
       typeid(InterruptCond).name());
   }
 
   static void maybe_yield() {
-    ceph_assert(interrupt_cond<InterruptCond>);
+    ceph_assert(interrupt_cond<InterruptCond>.interrupt_cond);
     if (seastar::thread::should_yield()) {
-      auto interruption_condition = interrupt_cond<InterruptCond>;
-      interrupt_cond<InterruptCond>.release();
+      auto interruption_condition = interrupt_cond<InterruptCond>.interrupt_cond;
       INTR_FUT_DEBUG(
-       "interruptible_future_detail::may_yield() yielding out, interrupt_cond {},{} cleared",
+       "interruptible_future_detail::may_yield() yielding out, "
+       "interrupt_cond {},{} cleared",
        (void*)interruption_condition.get(),
        typeid(InterruptCond).name());
+      interrupt_cond<InterruptCond>.reset();
       seastar::thread::yield();
-      interrupt_cond<InterruptCond> = interruption_condition;
+      interrupt_cond<InterruptCond>.set(interruption_condition);
       INTR_FUT_DEBUG(
        "interruptible_future_detail::may_yield() yield back, interrupt_cond: {},{}",
-       (void*)interrupt_cond<InterruptCond>.get(),
+       (void*)interrupt_cond<InterruptCond>.interrupt_cond.get(),
        typeid(InterruptCond).name());
     }
   }
index ce02a95ef91823c9e5cfa14a565dc3bb03d75e71..4cab476c303f6e6ff5e2f961c134ae2509ada9a8 100644 (file)
@@ -3,6 +3,6 @@
 
 namespace crimson::interruptible {
 template
-thread_local InterruptCondRef<::crimson::os::seastore::TransactionConflictCondition>
+thread_local interrupt_cond_t<::crimson::os::seastore::TransactionConflictCondition>
 interrupt_cond<::crimson::os::seastore::TransactionConflictCondition>;
 }
index c6af5f4957e98eb10b4c91fa8ee9055dde0ad383..8f98a7d98bc3adba6de75a89f5a868743b41608f 100644 (file)
@@ -52,31 +52,31 @@ TEST_F(seastar_test_suite_t, basic)
   run_async([] {
     interruptor::with_interruption(
       [] {
-       ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+       ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
        return interruptor::make_interruptible(seastar::now())
        .then_interruptible([] {
-         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
        }).then_interruptible([] {
-         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
          return errorator<ct_error::enoent>::make_ready_future<>();
        }).safe_then_interruptible([] {
-         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
          return seastar::now();
        }, errorator<ct_error::enoent>::all_same_way([] {
-         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
          })
        );
       }, [](std::exception_ptr) {}, false).get0();
 
     interruptor::with_interruption(
       [] {
-       ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+       ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
        return interruptor::make_interruptible(seastar::now())
        .then_interruptible([] {
-         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
        });
       }, [](std::exception_ptr) {
-       ceph_assert(!interruptible::interrupt_cond<TestInterruptCondition>);
+       ceph_assert(!interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
        return seastar::now();
       }, true).get0();
 
@@ -94,14 +94,14 @@ TEST_F(seastar_test_suite_t, loops)
     interruptor::with_interruption(
       [] {
        std::cout << "interruptiion enabled" << std::endl;
-       ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+       ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
        return interruptor::make_interruptible(seastar::now())
        .then_interruptible([] {
          std::cout << "test seastar future do_for_each" << std::endl;
          std::vector<int> vec = {1, 2};
          return seastar::do_with(std::move(vec), [](auto& vec) {
            return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) {
-             ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+             ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
              return seastar::now();
            });
          });
@@ -110,14 +110,14 @@ TEST_F(seastar_test_suite_t, loops)
          std::vector<int> vec = {1, 2};
          return seastar::do_with(std::move(vec), [](auto& vec) {
            return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) {
-             ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+             ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
              return interruptor::make_interruptible(seastar::now());
            });
          });
        }).then_interruptible([] {
          std::cout << "test seastar future repeat" << std::endl;
          return interruptor::repeat([] {
-           ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+           ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
            return interruptor::make_interruptible(
                seastar::make_ready_future<
                  seastar::stop_iteration>(
@@ -126,7 +126,7 @@ TEST_F(seastar_test_suite_t, loops)
        }).then_interruptible([] {
          std::cout << "test interruptible seastar future repeat" << std::endl;
          return interruptor::repeat([] {
-           ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+           ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
            return seastar::make_ready_future<
                    seastar::stop_iteration>(
                      seastar::stop_iteration::yes);
@@ -138,14 +138,14 @@ TEST_F(seastar_test_suite_t, loops)
            using namespace std::chrono_literals;
            return interruptor::make_interruptible(seastar::now()).then_interruptible([&vec] {
              return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) {
-               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
                return interruptor::make_interruptible(
                  errorator<ct_error::enoent>::make_ready_future<>());
              }).safe_then_interruptible([] {
-               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
                return seastar::now();
              }, errorator<ct_error::enoent>::all_same_way([] {
-               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
              }));
            });
          });
@@ -156,18 +156,18 @@ TEST_F(seastar_test_suite_t, loops)
            using namespace std::chrono_literals;
            return interruptor::make_interruptible(seastar::now()).then_interruptible([&vec] {
              return interruptor::do_for_each(std::begin(vec), std::end(vec), [](int) {
-               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
                return errorator<ct_error::enoent>::make_ready_future<>();
              }).safe_then_interruptible([] {
-               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
                return seastar::now();
              }, errorator<ct_error::enoent>::all_same_way([] {
-               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+               ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
              }));
            });
          });
        }).then_interruptible([] {
-         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>);
+         ceph_assert(interruptible::interrupt_cond<TestInterruptCondition>.interrupt_cond);
          return seastar::now();
        });
       }, [](std::exception_ptr) {}, false).get0();