// can keep an op waiting in the case explained above.
class OpSequencer {
public:
- template <typename HandleT,
+ template <typename OpT,
+ typename HandleT,
typename FuncT,
typename Result = std::invoke_result_t<FuncT>>
seastar::futurize_t<Result>
- start_op(HandleT& handle,
- uint64_t prev_op,
- uint64_t this_op,
+ start_op(const OpT& op,
+ HandleT& handle,
FuncT&& do_op) {
+ const uint64_t prev_op = op.get_prev_id();
+ const uint64_t this_op = op.get_id();
auto have_green_light = seastar::make_ready_future<>();
assert(prev_op < this_op);
if (last_issued == prev_op) {
handle.exit();
::crimson::get_logger(ceph_subsys_osd).debug(
"OpSequencer::start_op: {} waiting ({} > {})",
- this_op, prev_op, last_unblocked);
+ op, prev_op, last_unblocked);
have_green_light = unblocked.wait([prev_op, this] {
// wait until the previous op is unblocked
return last_unblocked == prev_op;
uint64_t get_last_issued() const {
return last_issued;
}
- void finish_op(uint64_t op_id) {
- assert(op_id > last_completed);
- last_completed = op_id;
+ template <class OpT>
+ void finish_op(const OpT& op) {
+ assert(op.get_id() > last_completed);
+ last_completed = op.get_id();
}
- void maybe_reset(uint64_t op_id) {
+ template <class OpT>
+ void maybe_reset(const OpT& op) {
+ const auto op_id = op.get_id();
// pg interval changes, so we need to reenqueue the previously unblocked
// ops by rewinding the "last_unblock" pointer
if (op_id <= last_unblocked) {
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "OpSequencer::maybe_reset:{} {} <= {}, resetting to {}",
+ op, op_id, last_unblocked, last_completed);
last_unblocked = last_completed;
}
}
epoch_t same_interval_since = pgref->get_interval_start_epoch();
logger().debug("{} same_interval_since: {}", *this, same_interval_since);
may_set_prev_op();
- assert(prev_op_id.has_value());
return sequencer.start_op(
- handle, *prev_op_id, get_id(),
+ *this,
+ handle,
interruptor::wrap_function(
[this, pgref]() mutable -> interruptible_future<> {
PG &pg = *pgref;
});
})
).then_interruptible([this, pgref]() {
- sequencer.finish_op(get_id());
+ sequencer.finish_op(*this);
return seastar::stop_iteration::yes;
});
}, [this, pgref](std::exception_ptr eptr) {
if (should_abort_request(*this, std::move(eptr))) {
return seastar::stop_iteration::yes;
} else {
- sequencer.maybe_reset(get_id());
+ sequencer.maybe_reset(*this);
return seastar::stop_iteration::no;
}
}, pgref);