#include "include/utime.h"
#include "common/Clock.h"
#include "crimson/common/interruptible_future.h"
+#include "crimson/common/smp_helpers.h"
#include "crimson/common/log.h"
namespace ceph {
template <class T>
class PipelineStageIT : public BlockerT<T> {
+ const core_id_t core = seastar::this_shard_id();
public:
+ core_id_t get_core() const { return core; }
+
template <class... Args>
decltype(auto) enter(Args&&... args) {
return static_cast<T*>(this)->enter(std::forward<Args>(args)...);
template <typename OpT, typename T>
seastar::future<>
enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
+ ceph_assert(stage.get_core() == seastar::this_shard_id());
return wait_barrier().then([this, &stage, t=std::move(t)] () mutable {
auto fut = t.maybe_record_blocking(stage.enter(t), stage);
exit();
void exit() final {
if (phase) {
- phase->exit();
+ auto *p = phase;
phase = nullptr;
+ std::ignore = seastar::smp::submit_to(
+ p->get_core(),
+ [p] {
+ p->exit();
+ });
}
}
phase = nullptr;
}
if (phase) {
- phase->mutex.unlock();
- phase = nullptr;
+ std::ignore = seastar::smp::submit_to(
+ phase->get_core(),
+ [this] {
+ phase->mutex.unlock();
+ phase = nullptr;
+ });
}
}