namespace crimson::osd {
struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
+ OperationRepeatSequencer<ClientRequest> opSequencer;
ClientRequest::ConnectionPipeline client_request_conn_pipeline;
RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline;
RepRequest::ConnectionPipeline replicated_request_conn_pipeline;
#pragma once
-#include "crimson/common/operation.h"
+#include "crimson/osd/osd_operation_sequencer.h"
#include "crimson/osd/scheduler/scheduler.h"
namespace crimson::osd {
virtual ~OperationT() = default;
private:
+ epoch_t interval_start_epoch = 0;
+ using ops_seq_iter_t =
+ typename OperationRepeatSequencer<T>::ops_sequence_t::iterator;
+ std::optional<ops_seq_iter_t> pos;
virtual void dump_detail(ceph::Formatter *f) const = 0;
+ template <typename>
+ friend class OperationRepeatSequencer;
};
/**
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+
+#include "crimson/common/operation.h"
+#include "osd/osd_types.h"
+
+namespace crimson::osd {
+
+template <typename>
+struct OperationComparator;
+
+/**
+ * OperationRepeatSequencer
+ *
+ * Tracks, per PG, the operations of type T that have been handed to
+ * start_op() and not yet passed to finish_op().  Inside one PG the
+ * operations are ordered by OperationComparator<T>, and every entry carries
+ * a promise that start_op() fulfils right after it has invoked the
+ * operation's callable.  A later operation that observes a newer interval
+ * than its immediate predecessor waits on that predecessor's promise before
+ * its own callable is invoked.
+ */
+template <typename T>
+class OperationRepeatSequencer {
+public:
+  using OpRef = boost::intrusive_ptr<T>;
+  // Must name exactly the container stored in pg_ops: operations cache an
+  // ops_sequence_t::iterator (op->pos) that is assigned from, and passed
+  // back to, the per-PG maps below, so the comparator has to be part of
+  // the alias.
+  using ops_sequence_t =
+    std::map<OpRef, seastar::promise<>, OperationComparator<T>>;
+
+  /**
+   * Registers op in pgid's sequence (first call only, i.e. while op->pos is
+   * unset) and invokes func once the immediately preceding operation, if
+   * any, no longer has to be waited for.
+   *
+   * @param handle              pipeline handle; handle.exit() is called
+   *                            before waiting for the predecessor
+   * @param same_interval_since interval start epoch observed by the caller;
+   *                            stored into op->interval_start_epoch before
+   *                            func is invoked
+   * @param op                  operation being started or repeated
+   * @param pgid                PG whose sequence op belongs to
+   * @param func                callable to run; invoked with no arguments
+   * @return the (futurized) result of func
+   */
+  template <typename Func, typename HandleT, typename Result = std::invoke_result_t<Func>>
+  seastar::futurize_t<Result> start_op(
+      HandleT& handle,
+      epoch_t same_interval_since,
+      OpRef& op,
+      const spg_t& pgid,
+      Func&& func) {
+    auto& ops = pg_ops[pgid];
+    if (!op->pos) {
+      [[maybe_unused]] auto [it, inserted] = ops.emplace(op, seastar::promise<>());
+      assert(inserted);
+      op->pos = it;
+    }
+
+    auto curr_op_pos = *(op->pos);
+    const bool first = (curr_op_pos == ops.begin());
+    // when there is no predecessor, prev_op_pos aliases the current entry and
+    // is only read for its interval_start_epoch; `first` guards any waiting
+    auto prev_op_pos = first ? curr_op_pos : std::prev(curr_op_pos);
+    auto prev_ops_drained = seastar::now();
+    if (epoch_t prev_interval = prev_op_pos->first->interval_start_epoch;
+        !first && same_interval_since > prev_interval) {
+      // need to wait for previous operations,
+      // release the current pipeline stage
+      handle.exit();
+      auto& [prev_op, prev_op_complete] = *prev_op_pos;
+      ::crimson::get_logger(ceph_subsys_osd).debug(
+        "{}, same_interval_since: {}, previous op: {}, last_interval_start: {}",
+        *op, same_interval_since, prev_op, prev_interval);
+      prev_ops_drained = prev_op_complete.get_future();
+    } else {
+      assert(same_interval_since == prev_interval || first);
+    }
+    return prev_ops_drained.then(
+      [op, same_interval_since, func=std::forward<Func>(func)]() mutable {
+      op->interval_start_epoch = same_interval_since;
+      auto fut = seastar::futurize_invoke(func);
+      // the successor is released as soon as func has been invoked, not when
+      // fut resolves; the promise is then re-armed so that a later repeat of
+      // this operation can be waited for again
+      auto curr_op_pos = *(op->pos);
+      curr_op_pos->second.set_value();
+      curr_op_pos->second = seastar::promise<>();
+      return fut;
+    });
+  }
+
+  /**
+   * Removes op from pgid's sequence.
+   *
+   * op must have been registered by start_op() (op->pos set).  The cached
+   * position is cleared afterwards so that it never refers to an erased
+   * element, and the per-PG map is dropped once it becomes empty.
+   * pg_ops.at() throws std::out_of_range if pgid has no sequence.
+   *
+   * NOTE(review): erasing the entry destroys its promise; presumably no
+   * successor is still waiting on it at this point -- confirm with callers.
+   */
+  void finish_op(OpRef& op, const spg_t& pgid) {
+    assert(op->pos);
+    auto& ops = pg_ops.at(pgid);
+    ops.erase(*(op->pos));
+    op->pos.reset();
+    if (ops.empty()) {
+      pg_ops.erase(pgid);
+    }
+  }
+private:
+  // in-flight operations of every PG, keyed by PG id
+  std::map<spg_t, ops_sequence_t> pg_ops;
+};
+/**
+ * Orders operation references by the value returned from get_id(), so that
+ * a map keyed by OpRef iterates in ascending id order.
+ */
+template <typename T>
+struct OperationComparator {
+  bool operator()(
+    const typename OperationRepeatSequencer<T>::OpRef& left,
+    const typename OperationRepeatSequencer<T>::OpRef& right) const {
+    const auto& left_id = left->get_id();
+    const auto& right_id = right->get_id();
+    return left_id < right_id;
+  }
+};
+
+} // namespace crimson::osd
ClientRequest::ClientRequest(
OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
- : osd(osd), conn(conn), m(m)
+ : osd(osd), conn(conn), m(m), ors(get_osd_priv(conn.get()).opSequencer) // ors refers to the opSequencer obtained via get_osd_priv() for this request's connection
{}
+ClientRequest::~ClientRequest()
+{
+ logger().debug("{}: destroying", *this); // debug trace only; the destructor body does no other work
+}
+
void ClientRequest::print(std::ostream &lhs) const
{
lhs << *m;
return crimson::common::handle_system_shutdown(
[this, opref=std::move(opref)]() mutable {
return seastar::repeat([this, opref]() mutable {
+ logger().debug("{}: in repeat", *this);
return with_blocking_future(handle.enter(cp().await_map))
.then([this]() {
return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
return with_blocking_future(handle.enter(cp().get_pg));
}).then([this] {
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
- }).then([this, opref](Ref<PG> pgref) {
- PG &pg = *pgref;
- if (pg.can_discard_op(*m)) {
- return osd.send_incremental_map(conn, m->get_map_epoch());
- }
- return with_blocking_future(
- handle.enter(pp(pg).await_map)
- ).then([this, &pg]() mutable {
- return with_blocking_future(
- pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
- }).then([this, &pg](auto map) mutable {
- return with_blocking_future(
- handle.enter(pp(pg).wait_for_active));
- }).then([this, &pg]() mutable {
- return with_blocking_future(pg.wait_for_active_blocker.wait());
- }).then([this, pgref=std::move(pgref)]() mutable {
- if (m->finish_decode()) {
- m->clear_payload();
- }
- if (is_pg_op()) {
- return process_pg_op(pgref);
- } else {
- return process_op(pgref);
+ }).then([this, opref](Ref<PG> pgref) mutable {
+ epoch_t same_interval_since = pgref->get_interval_start_epoch();
+ logger().debug("{} same_interval_since: {}", *this, same_interval_since);
+ return ors.start_op(
+ handle, same_interval_since, opref, pgref->get_pgid(),
+ [this, opref, pgref] {
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
+ logger().debug("{} op discarded, {}, same_primary_since: {}",
+ *this, pg, pg.get_info().history.same_primary_since);
+ return osd.send_incremental_map(conn, m->get_map_epoch());
}
+ return with_blocking_future(
+ handle.enter(pp(pg).await_map)
+ ).then([this, &pg]() mutable {
+ return with_blocking_future(
+ pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then([this, &pg](auto map) mutable {
+ return with_blocking_future(
+ handle.enter(pp(pg).wait_for_active));
+ }).then([this, &pg]() mutable {
+ return with_blocking_future(pg.wait_for_active_blocker.wait());
+ }).then([this, pgref=std::move(pgref)]() mutable {
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ if (is_pg_op()) {
+ return process_pg_op(pgref);
+ } else {
+ return process_op(pgref);
+ }
+ });
+ }).then([this, opref, pgref]() mutable {
+ ors.finish_op(opref, pgref->get_pgid());
+ return seastar::stop_iteration::yes;
});
- }).then([] {
- return seastar::stop_iteration::yes;
}).handle_exception_type([](crimson::common::actingset_changed& e) {
if (e.is_primary()) {
logger().debug("operation restart, acting set changed");
static constexpr OperationTypeCode type = OperationTypeCode::client_request;
ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref<MOSDOp> &&m);
+ ~ClientRequest();
void print(std::ostream &) const final;
void dump_detail(Formatter *f) const final;
ConnectionPipeline &cp();
PGPipeline &pp(PG &pg);
+ OperationRepeatSequencer<ClientRequest>& ors;
private:
bool is_misdirected(const PG& pg) const;
};
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#pragma once
const map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
return peering_state.get_peer_missing();
}
+  /// Epoch recorded as history.same_interval_since in this PG's info.
+  epoch_t get_interval_start_epoch() const {
+    const auto& pg_history = get_info().history;
+    return pg_history.same_interval_since;
+  }
const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
if (shard == pg_whoami)
return &get_local_missing();