#include "include/ceph_assert.h"
#include "include/utime.h"
#include "common/Clock.h"
+#include "common/Formatter.h"
#include "crimson/common/interruptible_future.h"
#include "crimson/common/smp_helpers.h"
#include "crimson/common/log.h"
return std::forward<FutureT>(fut);
}
+ const OpT &get_op() { return op; }
+
protected:
void record_blocking(const T& blocker) override {
this->event.trigger(op, blocker);
Operation, boost::thread_unsafe_counter> {
public:
using id_t = uint64_t;
+ static constexpr id_t NULL_ID = std::numeric_limits<uint64_t>::max();
id_t get_id() const {
return id;
}
*/
template <class T>
class OrderedExclusivePhaseT : public PipelineStageIT<T> {
- void dump_detail(ceph::Formatter *f) const final {}
+ void dump_detail(ceph::Formatter *f) const final {
+ f->dump_unsigned("waiting", waiting);
+ if (held_by != Operation::NULL_ID) {
+ f->dump_unsigned("held_by_operation_id", held_by);
+ }
+ }
class ExitBarrier final : public PipelineExitBarrierI {
OrderedExclusivePhaseT *phase;
+ Operation::id_t op_id;
public:
- ExitBarrier(OrderedExclusivePhaseT *phase) : phase(phase) {}
+ ExitBarrier(OrderedExclusivePhaseT *phase, Operation::id_t id)
+ : phase(phase), op_id(id) {}
seastar::future<> wait() final {
return seastar::now();
void exit() final {
if (phase) {
auto *p = phase;
+ auto id = op_id;
phase = nullptr;
std::ignore = seastar::smp::submit_to(
p->get_core(),
- [p] {
- p->exit();
+ [p, id] {
+ p->exit(id);
});
}
}
}
};
- void exit() {
+ void exit(Operation::id_t op_id) {
+ clear_held_by(op_id);
mutex.unlock();
}
public:
- template <class... IgnoreArgs>
- seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
- return mutex.lock().then([this] {
- return PipelineExitBarrierI::Ref(new ExitBarrier{this});
+ template <class TriggerT>
+ seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
+ waiting++;
+ return mutex.lock().then([this, op_id=t.get_op().get_id()] {
+ ceph_assert_always(waiting > 0);
+ --waiting;
+ set_held_by(op_id);
+ return PipelineExitBarrierI::Ref(new ExitBarrier{this, op_id});
});
}
private:
+ void set_held_by(Operation::id_t id) {
+ ceph_assert_always(held_by == Operation::NULL_ID);
+ held_by = id;
+ }
+
+ void clear_held_by(Operation::id_t id) {
+ ceph_assert_always(held_by == id);
+ held_by = Operation::NULL_ID;
+ }
+
+ unsigned waiting = 0;
seastar::shared_mutex mutex;
+ Operation::id_t held_by = Operation::NULL_ID;
};
/**