// their id which is monotonically increasing and unique on per PG basis, so we
// can keep an op waiting in the case explained above.
class OpSequencer {
+ bool resequencing{false}; // set by maybe_reset() on a rewind; start_op() clears it once the op being unblocked is not older than last_started_id
+
+ static bool is_unfinished(const ClientRequest& this_op) {
+ // `finished` is the tombstone set by the finish_op_*() helpers. TODO: kill the tombstone; reuse op status tracking.
+ return !this_op.finished;
+ }
+
+ // get_prev_id() finds the op `this_op` has to be ordered after: the closest
+ // preceding registry entry that shares its session and PG and is not finished
+ // yet. when no such entry exists, `last_completed_id` is returned instead.
+ std::uint64_t get_prev_id(const ClientRequest& this_op,
+ const OSDOperationRegistry& registry) {
+ // an alternative to iterating till the registry's beginning could be
+ // holding a pointer to next(last_completed).
+ constexpr auto type_idx = static_cast<size_t>(ClientRequest::type);
+ auto it = OSDOperationRegistry::op_list::s_iterator_to(this_op);
+ while (it != std::begin(registry.get_registry<type_idx>())) {
+ // step back before dereferencing. `it` starts at `this_op` itself, which
+ // must never be reported as its own predecessor (it is typically still
+ // unfinished when we are called from `start_op()`), whereas the very
+ // first registry entry is a legitimate candidate and has to be examined.
+ --it;
+ // we're iterating over the operation registry of all ClientRequests.
+ // this list aggregates every single instance in the system, and thus
+ // we need to skip operations coming from different client's session
+ // or targeting different PGs.
+ // as this is supposed to happen on cold paths only, the overhead is
+ // a thing we can live with.
+ auto* maybe_prev_op = std::addressof(static_cast<const ClientRequest&>(*it));
+ // finished entries of the same session and PG are early exited ones;
+ // they are skipped and the walk continues towards the beginning.
+ if (maybe_prev_op->same_session_and_pg(this_op) &&
+ is_unfinished(*maybe_prev_op)) {
+ return maybe_prev_op->get_id();
+ }
+ }
+ // the prev op of this session targeting the same PG as this_op must have
+ // been completed and hence already has been removed from the list, that's
+ // the only way we got here
+ return last_completed_id;
+ }
+
public:
template <typename HandleT,
typename FuncT,
seastar::futurize_t<Result>
start_op(const ClientRequest& op,
HandleT& handle,
+ const OSDOperationRegistry& registry,
FuncT&& do_op) {
- const uint64_t prev_op = op.get_prev_id();
- const uint64_t this_op = op.get_id();
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+ __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
auto have_green_light = seastar::make_ready_future<>();
- assert(prev_op < this_op);
- if (last_issued == prev_op) {
- // starting a new op, let's advance the last_issued!
- last_issued = this_op;
+ if (last_started_id < op.get_id()) {
+ // starting a new op, let's advance the last_started!
+ last_started_id = op.get_id();
}
- if (prev_op != last_unblocked) {
- // this implies that there are some blocked ops before me, so i have to
- // wait until they are unblocked.
+ if (__builtin_expect(resequencing, false)) {
+ // this implies that there was a reset condition and there may me some
+ // older ops before me, so i have to wait until they are unblocked.
//
// i should leave the current pipeline stage when waiting for the blocked
// ones, so that the following ops can be queued up here. we cannot let
// the seastar scheduler to determine the order of performing these ops,
// once they are unblocked after the first op of the same pg interval is
// scheduled.
- assert(prev_op > last_unblocked);
+ const auto prev_id = get_prev_id(op, registry);
+ assert(prev_id >= last_unblocked_id);
handle.exit();
::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::start_op: {} waiting ({} > {})",
- op, prev_op, last_unblocked);
- have_green_light = unblocked.wait([prev_op, this] {
+ "OpSequencer::start_op: {} resequencing ({} >= {})",
+ op, prev_id, last_unblocked_id);
+ have_green_light = unblocked.wait([&op, ®istry, this] {
// wait until the previous op is unblocked
- return last_unblocked == prev_op;
+ const bool unblocking =
+ get_prev_id(op, registry) == last_unblocked_id;
+ if (unblocking) {
+ // stop resequencing if everything is handled which means there is no
+ // operation after us. the range could be minimized by snapshotting
+ // `last_started` on `maybe_reset()`.
+ // `<=` is to handle the situation when `last_started` has finished out-
+ // of-the-order.
+ resequencing = !(last_started_id <= op.get_id());
+ }
+ return unblocking;
});
}
- return have_green_light.then([this_op, do_op=std::move(do_op), this]() mutable {
+ return have_green_light.then([&op, do_op=std::move(do_op), this]() mutable {
auto result = seastar::futurize_invoke(std::move(do_op));
// unblock the next one
- last_unblocked = this_op;
+ last_unblocked_id = op.get_id();
unblocked.broadcast();
return result;
});
}
- uint64_t get_last_issued() const {
- return last_issued;
+ void finish_op_in_order(ClientRequest& op) {
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+ __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
+ assert(op.get_id() > last_completed_id); // in-order completion: the completed id may only grow
+ last_completed_id = op.get_id();
+ op.finished = true; // tombstone read by is_unfinished()
}
- void finish_op(const ClientRequest& op) {
- assert(op.get_id() > last_completed);
- last_completed = op.get_id();
+ void finish_op_out_of_order(ClientRequest& op,
+ const OSDOperationRegistry& registry) {
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+ __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
+ op.finished = true;
+ // fix the `last_unblocked_id`. a waiter in `start_op()` only proceeds when
+ // `get_prev_id()` of the waiting op equals `last_unblocked_id`, and `op` is
+ // finished now, so `get_prev_id()` skips it and never returns its id again.
+ // if `last_unblocked_id` kept pointing at `op`, its successor (if there is
+ // any) could not leave the wait loop; rewind to what `get_prev_id(op)` reports.
+ if (last_unblocked_id == op.get_id()) {
+ last_unblocked_id = get_prev_id(op, registry);
+ }
}
void maybe_reset(const ClientRequest& op) {
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+ __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
const auto op_id = op.get_id();
// pg interval changes, so we need to reenqueue the previously unblocked
- // ops by rewinding the "last_unblock" pointer
- if (op_id <= last_unblocked) {
+ // ops by rewinding the "last_unblocked" ID to the last completed one.
+ if (op_id <= last_unblocked_id) {
::crimson::get_logger(ceph_subsys_osd).debug(
"OpSequencer::maybe_reset:{} {} <= {}, resetting to {}",
- op, op_id, last_unblocked, last_completed);
- last_unblocked = last_completed;
+ op, op_id, last_unblocked_id, last_completed_id);
+ last_unblocked_id = last_completed_id;
+ resequencing = true; // makes start_op() take its ordered wait path
}
}
void abort() {
+ ::crimson::get_logger(ceph_subsys_osd).debug(
+ "OpSequencer::{}: last_started={}, last_unblocked={}, last_completed={}",
+ __func__, last_started_id, last_unblocked_id, last_completed_id);
// all blocked ops should be canceled, likely due to the osd is not primary
// anymore.
unblocked.broken();
// /--- unblocked (in pg pipeline)
// | /--- blocked
// V V
- // |////|.....|.......| <--- last_issued
+ // |////|.....|.......| <--- last_started
// ^ ^ ^
// | | \- prev_op
// | \--- last_unblocked
// last_completed
//
// the id of last op which is issued
- uint64_t last_issued = 0;
+ std::uint64_t last_started_id = 0;
// the id of last op which is unblocked
- uint64_t last_unblocked = 0;
+ std::uint64_t last_unblocked_id = 0;
// the id of last op which is completed
- uint64_t last_completed = 0;
+ std::uint64_t last_completed_id = 0;
seastar::condition_variable unblocked;
friend fmt::formatter<OpSequencer>;
FormatContext& ctx)
{
return fmt::format_to(ctx.out(),
- "(last_completed={},last_unblocked={},last_issued={})",
- sequencer.last_completed,
- sequencer.last_unblocked,
- sequencer.last_issued);
+ "(last_completed={},last_unblocked={},last_started={})",
+ sequencer.last_completed_id,
+ sequencer.last_unblocked_id,
+ sequencer.last_started_id);
}
};
void ClientRequest::print(std::ostream &lhs) const
{
- lhs << "m=[" << *m << "], prev_op_id=" << prev_op_id;
+ lhs << "m=[" << *m << "]"; // streams only the message `*m`, wrapped in "m=[...]"
}
void ClientRequest::dump_detail(Formatter *f) const
return pg.client_request_pg_pipeline;
}
+bool ClientRequest::same_session_and_pg(const ClientRequest& other_op) const
+{
+ return &get_osd_priv(conn.get()) == &get_osd_priv(other_op.conn.get()) && // both connections yield the very same osd_priv object (address comparison)
+ m->get_spg() == other_op.m->get_spg(); // and both messages report an equal spg
+}
+
bool ClientRequest::is_pg_op() const
{
return std::any_of(
[](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
}
-void ClientRequest::may_set_prev_op()
-{
- // set prev_op_id if it's not set yet
- if (__builtin_expect(!prev_op_id.has_value(), true)) {
- prev_op_id.emplace(sequencer.get_last_issued());
- }
-}
-
template <typename FuncT>
ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func)
{
- may_set_prev_op();
- return sequencer.start_op(*this, handle, std::forward<FuncT>(func))
- .then_interruptible([this] {
- sequencer.finish_op(*this);
- });
+ return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward<FuncT>(func)); // no finish call here: `func` is expected to report completion through sequencer.finish_op_*()
}
seastar::future<> ClientRequest::start()
if (m->finish_decode()) {
m->clear_payload();
}
- const bool has_pg_op = is_pg_op();
- auto interruptible_do_op = interruptor::wrap_function([=] {
+ return with_sequencer(interruptor::wrap_function([pgref, this] {
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
- return interruptible_future<>(
- osd.send_incremental_map(conn, m->get_map_epoch()));
+ return osd.send_incremental_map(
+ conn, m->get_map_epoch()).then([this] {
+ sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
+ return interruptor::now();
+ });
}
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(pg).await_map)
}).then_interruptible([this, &pg]() {
return with_blocking_future_interruptible<IOInterruptCondition>(
pg.wait_for_active_blocker.wait());
- }).then_interruptible([this,
- has_pg_op,
- pgref=std::move(pgref)]() mutable {
- return (has_pg_op ?
- process_pg_op(pgref) :
- process_op(pgref));
+ }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
+ if (is_pg_op()) {
+ return process_pg_op(pgref).then_interruptible([this] {
+ sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
+ });
+ } else {
+ return process_op(pgref).then_interruptible([this] {
+ // NOTE: this assumes process_op() handles everything
+ // in-order which I'm not sure about.
+ sequencer.finish_op_in_order(*this);
+ });
+ }
});
+ })).then_interruptible([pgref] {
+ return seastar::stop_iteration::yes;
});
- // keep the ordering of non-pg ops when across pg internvals
- return (has_pg_op ?
- interruptible_do_op() :
- with_sequencer(std::move(interruptible_do_op)))
- .then_interruptible([pgref]() {
- return seastar::stop_iteration::yes;
- });
}, [this, pgref](std::exception_ptr eptr) {
if (should_abort_request(*this, std::move(eptr))) {
sequencer.abort();