#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
-#include "crimson/osd/osd_operation_sequencer.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
namespace crimson::osd {
struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
- OpSequencers op_sequencer;
ConnectionPipeline client_request_conn_pipeline;
ConnectionPipeline peering_request_conn_pipeline;
ConnectionPipeline replicated_request_conn_pipeline;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <map>
-#include <fmt/format.h>
-#include <seastar/core/condition-variable.hh>
-#include "crimson/osd/osd_operations/client_request.h"
-
-namespace crimson::osd {
-
-// when the PG interval changes, we are supposed to interrupt all in-flight
-// ops. but if we just interrupt each op upon seeing a different interval
-// when moving to a new continuation, the order in which the ops get
-// interrupted is not deterministic, as they are scheduled by the seastar
-// scheduler. yet we are supposed to replay the ops from the same client
-// targeting the same PG in the exact order in which they were received.
-//
-// the way we address this problem is to set up a blocker which, if an op is
-// issued in a new pg interval, blocks that op until the preceding one is
-// unblocked.
-//
-// here, the ops from the same client are grouped by PG, and then ordered by
-// their id, which is monotonically increasing and unique on a per-PG basis,
-// so we can keep an op waiting in the case explained above.
-class OpSequencer {
- bool resequencing{false};
-
- static bool is_unfinished(const ClientRequest& this_op) {
- // TODO: kill the tombstone; reuse op status tracking.
- return !this_op.finished;
- }
-
- std::uint64_t get_prev_id(const ClientRequest& this_op,
- const OSDOperationRegistry& registry) {
- // an alternative to iterating till the registry's beginning could be
- // holding a pointer to next(last_completed).
- constexpr auto type_idx = static_cast<size_t>(ClientRequest::type);
- for (auto it = OSDOperationRegistry::op_list::s_iterator_to(this_op);
- it != std::begin(registry.get_registry<type_idx>());
- --it) {
- // we're iterating over the operation registry of all ClientRequests.
- // this list aggregates every single instance in the system, and thus
- // we need to skip operations coming from a different client's session
- // or targeting a different PG.
- // as this is supposed to happen on cold paths only, the overhead is
- // a thing we can live with.
- auto* maybe_prev_op = std::addressof(static_cast<const ClientRequest&>(*it));
- if (maybe_prev_op->same_session_and_pg(this_op)) {
- if (is_unfinished(*maybe_prev_op)) {
- return maybe_prev_op->get_id();
- } else {
- // an early exited one
- }
- }
- }
- // the prev op of this session targeting the same PG as this_op must have
- // been completed and hence already been removed from the list; that's the
- // only way we got here
- return last_completed_id;
- }
-
-public:
- template <typename HandleT,
- typename FuncT,
- typename Result = std::invoke_result_t<FuncT>>
- seastar::futurize_t<Result>
- start_op(const ClientRequest& op,
- HandleT& handle,
- const OSDOperationRegistry& registry,
- FuncT&& do_op) {
- ::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
- __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
- auto have_green_light = seastar::make_ready_future<>();
- if (last_started_id < op.get_id()) {
- // starting a new op, let's advance the last_started!
- last_started_id = op.get_id();
- }
- if (__builtin_expect(resequencing, false)) {
- // this implies that there was a reset condition and there may be some
- // older ops before me, so I have to wait until they are unblocked.
- //
- // I should leave the current pipeline stage while waiting for the
- // blocked ones, so that the following ops can be queued up here. we
- // cannot let the seastar scheduler determine the order in which these
- // ops are performed once they are unblocked after the first op of the
- // same pg interval is scheduled.
- const auto prev_id = get_prev_id(op, registry);
- assert(prev_id >= last_unblocked_id);
- handle.exit();
- ::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::start_op: {} resequencing ({} >= {})",
- op, prev_id, last_unblocked_id);
- have_green_light = unblocked.wait([&op, &registry, this] {
- // wait until the previous op is unblocked
- const bool unblocking =
- get_prev_id(op, registry) == last_unblocked_id;
- if (unblocking) {
- // stop resequencing if everything is handled, which means there is
- // no operation after us. the range could be minimized by
- // snapshotting `last_started` on `maybe_reset()`.
- // `<=` handles the situation when `last_started` has finished
- // out of order.
- resequencing = !(last_started_id <= op.get_id());
- }
- return unblocking;
- });
- }
- return have_green_light.then([&op, do_op=std::move(do_op), this]() mutable {
- auto result = seastar::futurize_invoke(std::move(do_op));
- // unblock the next one
- last_unblocked_id = op.get_id();
- unblocked.broadcast();
- return result;
- });
- }
- void finish_op_in_order(ClientRequest& op) {
- ::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
- __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
- assert(op.get_id() > last_completed_id);
- last_completed_id = op.get_id();
- op.finished = true;
- }
- void finish_op_out_of_order(ClientRequest& op,
- const OSDOperationRegistry& registry) {
- ::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
- __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
- op.finished = true;
- // fix up `last_unblocked_id`; otherwise we wouldn't be able to leave the
- // wait loop in `start_op()`, as any concrete value of `last_unblocked_id`
- // can wake at most one blocked operation (the successor of `op`, if there
- // is any), and above we lowered this number to 0 (`get_prev_id()` there
- // would never return a value matching `last_unblocked_id`).
- if (last_unblocked_id == op.get_id()) {
- last_unblocked_id = get_prev_id(op, registry);
- }
- }
- void maybe_reset(const ClientRequest& op) {
- ::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
- __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
- const auto op_id = op.get_id();
- // the pg interval has changed, so we need to re-enqueue the previously
- // unblocked ops by rewinding the "last_unblocked" ID.
- if (op_id <= last_unblocked_id) {
- ::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::maybe_reset:{} {} <= {}, resetting to {}",
- op, op_id, last_unblocked_id, last_completed_id);
- last_unblocked_id = last_completed_id;
- resequencing = true;
- }
- }
- void abort() {
- ::crimson::get_logger(ceph_subsys_osd).debug(
- "OpSequencer::{}: last_started={}, last_unblocked={}, last_completed={}",
- __func__, last_started_id, last_unblocked_id, last_completed_id);
- // all blocked ops should be canceled, most likely because the osd is no
- // longer primary.
- unblocked.broken();
- }
-private:
- // /--- unblocked (in pg pipeline)
- // | /--- blocked
- // V V
- // |////|.....|.......| <--- last_started
- // ^ ^ ^
- // | | \- prev_op
- // | \--- last_unblocked
- // last_completed
- //
- // the id of the last op that was issued
- std::uint64_t last_started_id = 0;
- // the id of the last op that was unblocked
- std::uint64_t last_unblocked_id = 0;
- // the id of the last op that was completed
- std::uint64_t last_completed_id = 0;
- seastar::condition_variable unblocked;
-
- friend fmt::formatter<OpSequencer>;
-};
-
-
-class OpSequencers {
-public:
- OpSequencer& get(const spg_t& pgid) {
- return pg_ops.at(pgid);
- }
- OpSequencer& operator[](const spg_t& pgid) {
- // TODO: trim pg_ops if there are too many empty sequencers
- return pg_ops[pgid];
- }
-private:
- std::map<spg_t, OpSequencer> pg_ops;
-};
-} // namespace crimson::osd
-
-template <>
-struct fmt::formatter<crimson::osd::OpSequencer> {
- // ignore the format string
- constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
-
- template <typename FormatContext>
- auto format(const crimson::osd::OpSequencer& sequencer,
- FormatContext& ctx)
- {
- return fmt::format_to(ctx.out(),
- "(last_completed={},last_unblocked={},last_started={})",
- sequencer.last_completed_id,
- sequencer.last_unblocked_id,
- sequencer.last_started_id);
- }
-};
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
-#include <seastar/core/future.hh>
-
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
#include "common/Formatter.h"
-#include "crimson/osd/osd_operation_sequencer.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_connection_priv.h"
namespace crimson::osd {
+
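+// called on interval change while this OSD is still primary: every tracked
+// request first leaves its current pipeline stage, then is restarted via
+// with_pg_int() in arrival order, which preserves per-client ordering.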
+void ClientRequest::Orderer::requeue(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ for (auto &req: list) {
+ req.handle.exit();
+ }
+ for (auto &req: list) {
+ logger().debug("{}: {} requeueing {}", __func__, *pg, req);
+ std::ignore = req.with_pg_int(shard_services, pg);
+ }
+}
+
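+// called when this OSD is no longer primary for the PG: complete and drop
+// every queued request.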
+void ClientRequest::Orderer::clear_and_cancel()
+{
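+ // remove_request() unlinks the element and drops its ref, so advance
+ // the iterator before erasing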
+ for (auto i = list.begin(); i != list.end(); ) {
+ logger().debug(
+   "ClientRequest::Orderer::clear_and_cancel: {}",
+   *i);
+ i->complete_request();
+ remove_request(*(i++));
+ }
+}
+
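+// marks this request as finished: records the completion event, leaves the
+// current pipeline stage and resolves the future returned by with_pg().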
+void ClientRequest::complete_request()
+{
+ track_event<CompletionEvent>();
+ handle.exit();
+ on_complete.set_value();
+}
+
ClientRequest::ClientRequest(
OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
: osd(osd),
conn(conn),
- m(m),
- sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()])
+ m(m)
{}
ClientRequest::~ClientRequest()
[](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
}
-seastar::future<seastar::stop_iteration> ClientRequest::with_pg_int(
+seastar::future<> ClientRequest::with_pg_int(
ShardServices &shard_services, Ref<PG> pgref)
{
epoch_t same_interval_since = pgref->get_interval_start_epoch();
if (m->finish_decode()) {
m->clear_payload();
}
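+ // every invocation (the initial one or a requeue) gets a fresh instance
+ // id so log messages can tell the attempts apart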
+ const auto this_instance_id = instance_id++;
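+ // pin this operation until the current instance finishes; released in
+ // the finally() below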
+ OperationRef opref{this};
return interruptor::with_interruption(
- [this, &shard_services, pgref]() mutable {
- return sequencer.start_op(
- *this, handle, shard_services.registry,
- interruptor::wrap_function([pgref, this, &shard_services] {
- PG &pg = *pgref;
- if (pg.can_discard_op(*m)) {
- return osd.send_incremental_map(
- conn, m->get_map_epoch()
- ).then([this, &shard_services] {
- sequencer.finish_op_out_of_order(*this, shard_services.registry);
- return interruptor::now();
- });
- }
- return enter_stage<interruptor>(pp(pg).await_map
- ).then_interruptible([this, &pg] {
- return with_blocking_event<
- PG_OSDMapGate::OSDMapBlocker::BlockingEvent
- >([this, &pg] (auto&& trigger) {
- return pg.osdmap_gate.wait_for_map(std::move(trigger),
- m->get_min_epoch());
- });
- }).then_interruptible([this, &pg](auto map) {
- return enter_stage<interruptor>(pp(pg).wait_for_active);
- }).then_interruptible([this, &pg]() {
- return with_blocking_event<
- PGActivationBlocker::BlockingEvent
- >([&pg] (auto&& trigger) {
- return pg.wait_for_active_blocker.wait(std::move(trigger));
- });
- }).then_interruptible(
- [this, &shard_services, pgref=std::move(pgref)]() mutable {
- if (is_pg_op()) {
- return process_pg_op(
- pgref
- ).then_interruptible([this, &shard_services] {
- sequencer.finish_op_out_of_order(*this, shard_services.registry);
- });
- } else {
- return process_op(
- pgref
- ).then_interruptible([this, &shard_services](const seq_mode_t mode) {
- if (mode == seq_mode_t::IN_ORDER) {
- sequencer.finish_op_in_order(*this);
- } else {
- assert(mode == seq_mode_t::OUT_OF_ORDER);
- sequencer.finish_op_out_of_order(*this, shard_services.registry);
- }
- });
- }
- });
- })).then_interruptible([] {
- return seastar::stop_iteration::yes;
+ [this, pgref, this_instance_id]() mutable {
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
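+ // the op itself is dropped, but the client may still need a newer map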
+ return osd.send_incremental_map(
+ conn, m->get_map_epoch()
+ ).then([this, this_instance_id, pgref] {
+ logger().debug("{}.{}: discarding", *this, this_instance_id);
+ pgref->client_request_orderer.remove_request(*this);
+ complete_request();
+ return interruptor::now();
});
- }, [this, pgref](std::exception_ptr eptr) {
- if (should_abort_request(*this, std::move(eptr))) {
- sequencer.abort();
- return seastar::stop_iteration::yes;
- } else {
- sequencer.maybe_reset(*this);
- return seastar::stop_iteration::no;
}
- }, pgref);
+ return enter_stage<interruptor>(pp(pg).await_map
+ ).then_interruptible([this, this_instance_id, &pg] {
+ logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
+ return with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, &pg] (auto&& trigger) {
+ return pg.osdmap_gate.wait_for_map(std::move(trigger),
+ m->get_min_epoch());
+ });
+ }).then_interruptible([this, this_instance_id, &pg](auto map) {
+ logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
+ return enter_stage<interruptor>(pp(pg).wait_for_active);
+ }).then_interruptible([this, this_instance_id, &pg]() {
+ logger().debug(
+ "{}.{}: after wait_for_active stage", *this, this_instance_id);
+ return with_blocking_event<
+ PGActivationBlocker::BlockingEvent
+ >([&pg] (auto&& trigger) {
+ return pg.wait_for_active_blocker.wait(std::move(trigger));
+ });
+ }).then_interruptible([this, pgref, this_instance_id]() mutable
+ -> interruptible_future<> {
+ logger().debug(
+ "{}.{}: after wait_for_active", *this, this_instance_id);
+ if (is_pg_op()) {
+ return process_pg_op(pgref);
+ } else {
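+        // the seq_mode_t result is discarded; the OpSequencer that used
+        // to consume it is gone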
+ return process_op(
+ pgref
+ ).then_interruptible([](auto){});
+ }
+ }).then_interruptible([this, this_instance_id, pgref] {
+ logger().debug("{}.{}: after process*", *this, this_instance_id);
+ pgref->client_request_orderer.remove_request(*this);
+ complete_request();
+ });
+ }, [this, this_instance_id, pgref](std::exception_ptr eptr) {
+ // TODO: better debug output
+ logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
+ }, pgref).finally([opref=std::move(opref), pgref=std::move(pgref)] {});
}
seastar::future<> ClientRequest::with_pg(
ShardServices &shard_services, Ref<PG> pgref)
{
- return seastar::repeat([this, &shard_services, pgref]() mutable {
- return with_pg_int(shard_services, pgref);
- }).then([this] {
- track_event<CompletionEvent>();
- });
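+ // register with the per-PG orderer and kick off the first instance; the
+ // returned future is resolved by complete_request(), not by with_pg_int()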
+ pgref->client_request_orderer.add_request(*this);
+ auto ret = on_complete.get_future();
+ std::ignore = with_pg_int(
+ shard_services, std::move(pgref)
+ );
+ return ret;
}
ClientRequest::interruptible_future<>
#pragma once
+#include <seastar/core/future.hh>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+
#include "osd/osd_op_util.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/object_context.h"
crimson::net::ConnectionRef conn;
Ref<MOSDOp> m;
OpInfo op_info;
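+  // resolved by complete_request(); its future is what with_pg() returns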
+ seastar::promise<> on_complete;
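+  // bumped on each with_pg_int() invocation, for log correlation across
+  // requeues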
+ unsigned instance_id = 0;
public:
class PGPipeline : public CommonPGPipeline {
friend class LttngBackend;
};
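+  // hook linking this request into its PG's Orderer list below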
+ using ordering_hook_t = boost::intrusive::list_member_hook<>;
+ ordering_hook_t ordering_hook;
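+  // keeps the in-flight ClientRequests targeting one PG in arrival order;
+  // every linked request is pinned by the ref taken in add_request().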
+ class Orderer {
+ using list_t = boost::intrusive::list<
+ ClientRequest,
+ boost::intrusive::member_hook<
+ ClientRequest,
+ typename ClientRequest::ordering_hook_t,
+ &ClientRequest::ordering_hook>
+ >;
+ list_t list;
+
+ public:
+ void add_request(ClientRequest &request) {
+ assert(!request.ordering_hook.is_linked());
+ intrusive_ptr_add_ref(&request);
+ list.push_back(request);
+ }
+ void remove_request(ClientRequest &request) {
+ assert(request.ordering_hook.is_linked());
+ list.erase(list_t::s_iterator_to(request));
+ intrusive_ptr_release(&request);
+ }
+ void requeue(ShardServices &shard_services, Ref<PG> pg);
+ void clear_and_cancel();
+ };
+ void complete_request();
+
static constexpr OperationTypeCode type = OperationTypeCode::client_request;
ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref<MOSDOp> &&m);
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return m->get_min_epoch(); }
- seastar::future<seastar::stop_iteration> with_pg_int(
+ seastar::future<> with_pg_int(
ShardServices &shard_services, Ref<PG> pg);
public:
bool same_session_and_pg(const ClientRequest& other_op) const;
ConnectionPipeline &cp();
PGPipeline &pp(PG &pg);
- class OpSequencer& sequencer;
- // a tombstone used currently by OpSequencer. In the future it's supposed
- // to be replaced with a reusage of OpTracking facilities.
- bool finished = false;
- friend class OpSequencer;
-
template <typename Errorator>
using interruptible_errorator =
::crimson::interruptible::interruptible_errorator<
seastar::future<> start();
std::tuple<
+ ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
- OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
ConnectionPipeline::GetPG::BlockingEvent,
- PGMap::PGCreationBlockingEvent
+ PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
> tracking_events;
private:
}
void PG::on_change(ceph::os::Transaction &t) {
- logger().debug("{}, {}", __func__, *this);
+ logger().debug("{} {}:", *this, __func__);
for (auto& obc : obc_set_accessing) {
obc.interrupt(::crimson::common::actingset_changed(is_primary()));
}
recovery_backend->on_peering_interval_change(t);
backend->on_actingset_changed({ is_primary() });
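+  // wake anyone parked on wait_for_active so the ops can observe the new
+  // interval and be requeued or cancelled below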
+ wait_for_active_blocker.unblock();
+ if (is_primary()) {
+ logger().debug("{} {}: requeueing", *this, __func__);
+ client_request_orderer.requeue(shard_services, this);
+ } else {
+ logger().debug("{} {}: dropping requests", *this, __func__);
+ client_request_orderer.clear_and_cancel();
+ }
}
bool PG::can_discard_op(const MOSDOp& m) const {
}
namespace crimson::osd {
-class ClientRequest;
class OpsExecuter;
class PG : public boost::intrusive_ref_counter<
PGPeeringPipeline peering_request_pg_pipeline;
RepRequest::PGPipeline replicated_request_pg_pipeline;
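+  // arrival-ordered tracking of in-flight client requests on this PG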
+ ClientRequest::Orderer client_request_orderer;
+
spg_t pgid;
pg_shard_t pg_whoami;
crimson::os::CollectionRef coll_ref;