-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <map>
-
+#include <seastar/core/condition-variable.hh>
#include "crimson/common/operation.h"
#include "osd/osd_types.h"
namespace crimson::osd {
-template <typename>
-struct OperationComparator;
-
-template <typename T>
-class OperationRepeatSequencer {
+// when a PG's interval changes, we are supposed to interrupt all of its
+// in-flight ops. but if we simply interrupt each op once it sees a new
+// interval while moving to its next continuation, the order in which the ops
+// are interrupted is not deterministic: it is decided by the seastar
+// scheduler. yet we are required to replay the ops from the same client
+// targeting the same PG in exactly the order in which they were received.
+//
+// we address this problem by setting up a blocker: an op issued in a new pg
+// interval is blocked until the preceding op has been unblocked.
+//
+// here, the ops from the same client are grouped by PG, and then ordered by
+// their id, which is monotonically increasing and unique on a per-PG basis,
+// so we can keep an op waiting in the case explained above.
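+//
+// as a concrete illustration (the op ids below are made up): suppose ops 1, 2
+// and 3 from one client target the same PG, op 1 is already running, and the
+// pg interval changes before ops 2 and 3 get going. op 1 is restarted, while
+// ops 2 and 3 are blocked: op 2 waits until op 1 has been unblocked in the
+// new interval, and op 3 waits for op 2 in turn, so the client's order
+// 1, 2, 3 is preserved no matter how seastar schedules their continuations.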
+class OpSequencer {
public:
- using OpRef = boost::intrusive_ptr<T>;
- using ops_sequence_t = std::map<OpRef, seastar::promise<>>;
-
- template <typename Func, typename HandleT, typename Result = std::invoke_result_t<Func>>
- seastar::futurize_t<Result> start_op(
- HandleT& handle,
- epoch_t same_interval_since,
- OpRef& op,
- const spg_t& pgid,
- Func&& func) {
- auto& ops = pg_ops[pgid];
- if (!op->pos) {
- [[maybe_unused]] auto [it, inserted] = ops.emplace(op, seastar::promise<>());
- assert(inserted);
- op->pos = it;
+ template <typename HandleT,
+ typename FuncT,
+ typename Result = std::invoke_result_t<FuncT>>
+ seastar::futurize_t<Result>
+ start_op(HandleT& handle,
+ uint64_t prev_op,
+ uint64_t this_op,
+ FuncT&& do_op) {
+ auto have_green_light = seastar::make_ready_future<>();
+ assert(prev_op < this_op);
+ if (last_issued == prev_op) {
+ // starting a new op, let's advance the last_issued!
+ last_issued = this_op;
}
-
- auto curr_op_pos = *(op->pos);
- const bool first = (curr_op_pos == ops.begin());
- auto prev_op_pos = first ? curr_op_pos : std::prev(curr_op_pos);
- auto prev_ops_drained = seastar::now();
- if (epoch_t prev_interval = prev_op_pos->first->interval_start_epoch;
- !first && same_interval_since > prev_interval) {
- // need to wait for previous operations,
- // release the current pipepline stage
+ if (prev_op != last_unblocked) {
+      // this implies that there are blocked ops ahead of this one, so we have
+      // to wait until they are unblocked.
+      //
+      // we should leave the current pipeline stage while waiting for the
+      // blocked ops, so that the ops following us can queue up here. we
+      // cannot let the seastar scheduler determine the order in which these
+      // ops are performed once they are unblocked after the first op of the
+      // same pg interval is scheduled.
+ assert(prev_op > last_unblocked);
handle.exit();
- auto& [prev_op, prev_op_complete] = *prev_op_pos;
::crimson::get_logger(ceph_subsys_osd).debug(
- "{}, same_interval_since: {}, previous op: {}, last_interval_start: {}",
- *op, same_interval_since, prev_op, prev_interval);
- prev_ops_drained = prev_op_complete.get_future();
- } else {
- assert(same_interval_since == prev_interval || first);
+ "OpSequencer::start_op: {} waiting ({} > {})",
+ this_op, prev_op, last_unblocked);
+ have_green_light = unblocked.wait([prev_op, this] {
+ // wait until the previous op is unblocked
+ return last_unblocked == prev_op;
+ });
}
- return prev_ops_drained.then(
- [op, same_interval_since, func=std::forward<Func>(func)]() mutable {
- op->interval_start_epoch = same_interval_since;
- auto fut = seastar::futurize_invoke(func);
- auto curr_op_pos = *(op->pos);
- curr_op_pos->second.set_value();
- curr_op_pos->second = seastar::promise<>();
- return fut;
+ return have_green_light.then([this_op, do_op=std::move(do_op), this] {
+ auto result = seastar::futurize_invoke(do_op);
+ // unblock the next one
+ last_unblocked = this_op;
+ unblocked.broadcast();
+ return result;
});
}
-
- void finish_op(OpRef& op, const spg_t& pgid, bool interrutped) {
- assert(op->pos);
- auto curr_op_pos = *(op->pos);
- if (interrutped) {
- curr_op_pos->second.set_value();
+ uint64_t get_last_issued() const {
+ return last_issued;
+ }
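+  // record that an op has completed. unblocking its successor is not done
+  // here but in start_op()'s continuation; last_completed is only consulted
+  // by maybe_reset() when it rewinds last_unblocked.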
+ void finish_op(uint64_t op_id) {
+ assert(op_id > last_completed);
+ last_completed = op_id;
+ }
+ void maybe_reset(uint64_t op_id) {
+    // the pg interval has changed, so we need to requeue the previously
+    // unblocked ops by rewinding the "last_unblocked" pointer
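+    //
+    // for example (ids made up): if ops up to 3 were already unblocked
+    // (last_unblocked == 3) but only op 1 has completed (last_completed == 1)
+    // when op 2 restarts on an interval change, last_unblocked is rewound to
+    // 1, so ops 2 and 3 have to pass through start_op()'s gate again, in
+    // order.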
+ if (op_id <= last_unblocked) {
+ last_unblocked = last_completed;
}
- pg_ops.at(pgid).erase(curr_op_pos);
+ }
+ void abort() {
+    // all blocked ops should be canceled, most likely because this osd is no
+    // longer the primary.
+ unblocked.broken();
}
private:
- std::map<spg_t, std::map<OpRef, seastar::promise<>, OperationComparator<T>>> pg_ops;
+  //    /--- unblocked (in pg pipeline)
+  //    |    /--- blocked
+  //    V    V
+  //  |////|.....|.......| <--- last_issued
+  //  ^    ^     ^
+  //  |    |     \- prev_op
+  //  |    \--- last_unblocked
+  //  last_completed
+ //
+  // the id of the last op that has been issued
+ uint64_t last_issued = 0;
+  // the id of the last op that has been unblocked
+ uint64_t last_unblocked = 0;
+  // the id of the last op that has completed
+ uint64_t last_completed = 0;
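+  // waited on by ops whose predecessor has not been unblocked yet; broadcast
+  // from start_op()'s continuation whenever an op is unblocked, and broken by
+  // abort().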
+ seastar::condition_variable unblocked;
};
-template <typename T>
-struct OperationComparator {
- bool operator()(
- const typename OperationRepeatSequencer<T>::OpRef& left,
- const typename OperationRepeatSequencer<T>::OpRef& right) const {
- return left->get_id() < right->get_id();
+
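+// maps each PG to its own OpSequencer: ops from the same client only need to
+// be ordered relative to other ops targeting the same PG, so every PG gets an
+// independent sequencer.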
+class OperationRepeatSequencer {
+public:
+ OpSequencer& get(const spg_t& pgid) {
+ return pg_ops.at(pgid);
}
+ OpSequencer& operator[](const spg_t& pgid) {
+ // TODO: trim pg_ops if there are too many empty sequencers
+ return pg_ops[pgid];
+ }
+private:
+ std::map<spg_t, OpSequencer> pg_ops;
};
-
} // namespace crimson::osd
ClientRequest::ClientRequest(
OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
- : osd(osd), conn(conn), m(m), ors(get_osd_priv(conn.get()).op_sequencer)
+ : osd(osd),
+ conn(conn),
+ m(m),
+ sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()]),
+ prev_op_id(sequencer.get_last_issued())
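+    // prev_op_id is latched once, at construction time: it is the id of the
+    // op most recently issued to this PG's sequencer, i.e. the op this
+    // request must stay ordered after, even across restarts that re-enter
+    // start_op().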
{}
ClientRequest::~ClientRequest()
}).then([this, opref](Ref<PG> pgref) mutable {
epoch_t same_interval_since = pgref->get_interval_start_epoch();
logger().debug("{} same_interval_since: {}", *this, same_interval_since);
- return ors.start_op(
- handle, same_interval_since, opref, pgref->get_pgid(),
+ return sequencer.start_op(
+ handle, prev_op_id, opref->get_id(),
[this, opref, pgref] {
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
return process_op(pgref);
}
});
- }).then([this, opref, pgref]() mutable {
- ors.finish_op(opref, pgref->get_pgid(), false);
- return seastar::stop_iteration::yes;
+ }).then([this] {
+ sequencer.finish_op(get_id());
+ return seastar::stop_iteration::yes;
}).handle_exception_type(
- [opref, pgref, this](crimson::common::actingset_changed& e) mutable {
+ [this](crimson::common::actingset_changed& e) mutable {
if (e.is_primary()) {
logger().debug("operation restart, acting set changed");
+ sequencer.maybe_reset(get_id());
return seastar::stop_iteration::no;
} else {
- ors.finish_op(opref, pgref->get_pgid(), true);
+ sequencer.abort();
logger().debug("operation abort, up primary changed");
return seastar::stop_iteration::yes;
}
+ }).handle_exception_type(
+ [](seastar::broken_condition_variable&) {
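+      // the sequencer was aborted (see OpSequencer::abort()), e.g. because
+      // this osd is no longer the primary; give up on this op.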
+ return seastar::stop_iteration::yes;
});
});
});