]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/common: deal with seastar::thread in interruptible_future 37536/head
authorXuehan Xu <xxhdx1985126@gmail.com>
Tue, 10 Nov 2020 09:14:44 +0000 (17:14 +0800)
committerXuehan Xu <xxhdx1985126@gmail.com>
Sun, 7 Mar 2021 06:24:51 +0000 (14:24 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/common/interruptible_future.h
src/crimson/osd/ops_executer.cc

index 65fdb7d427124845e7a27ee1c2dd1fd0d82a1861..0d66ca843c4a16bad402a50f37f07bec84e232bc 100644 (file)
@@ -6,6 +6,7 @@
 #include <seastar/core/future-util.hh>
 #include <seastar/core/do_with.hh>
 #include <seastar/core/when_all.hh>
+#include <seastar/core/thread.hh>
 
 #include "crimson/common/errorator.h"
 
@@ -254,7 +255,15 @@ public:
 
   [[gnu::always_inline]]
   value_type&& get() {
-    return std::move(core_type::get());
+    if (core_type::available()) {
+      return core_type::get();
+    } else {
+      // destined to wait!
+      auto interruption_condition = interrupt_cond<InterruptCond>;
+      auto&& value = core_type::get();
+      interrupt_cond<InterruptCond> = interruption_condition;
+      return std::move(value);
+    }
   }
 
   using core_type::available;
@@ -1173,6 +1182,34 @@ public:
     return ::seastar::internal::when_all_succeed_impl(
        futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
   }
+
+  template <typename Func,
+           typename Result = futurize_t<std::invoke_result_t<Func>>>
+  static inline Result async(Func&& func) {
+    return seastar::async([func=std::forward<Func>(func),
+                          interrupt_condition=interrupt_cond<InterruptCond>]() mutable {
+      return non_futurized_call_with_interruption(
+         interrupt_condition, std::forward<Func>(func));
+    });
+  }
+
+  static void yield() {
+    assert(interrupt_cond<InterruptCond>);
+    auto interruption_condition = interrupt_cond<InterruptCond>;
+    interrupt_cond<InterruptCond>.release();
+    seastar::thread::yield();
+    interrupt_cond<InterruptCond> = interruption_condition;
+  }
+
+  static void maybe_yield() {
+    assert(interrupt_cond<InterruptCond>);
+    if (seastar::thread::should_yield()) {
+      auto interruption_condition = interrupt_cond<InterruptCond>;
+      interrupt_cond<InterruptCond>.release();
+      seastar::thread::yield();
+      interrupt_cond<InterruptCond> = interruption_condition;
+    }
+  }
 private:
   // return true if an new interrupt condition is created and false otherwise
   template <typename... Args>
index 63ca81af9ba65a41571b4b17ed06017097381829..fd9731312d312d03641ba7a888aaf1981026b599 100644 (file)
@@ -79,14 +79,14 @@ OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
                  cname, mname, num_read, num_write);
   const auto prev_rd = num_read;
   const auto prev_wr = num_write;
-  return seastar::async(
+  return interruptor::async(
     [this, method, indata=std::move(indata)]() mutable {
       ceph::bufferlist outdata;
       auto cls_context = reinterpret_cast<cls_method_context_t>(this);
       const auto ret = method->exec(cls_context, indata, outdata);
       return std::make_pair(ret, std::move(outdata));
     }
-  ).then(
+  ).then_interruptible(
     [this, prev_rd, prev_wr, &osd_op, flags]
     (auto outcome) -> call_errorator::future<> {
       auto& [ret, outdata] = outcome;