}, tracking_events);
}
+ConnectionPipeline &ClientRequest::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).client_request_conn_pipeline;
+}
+
ConnectionPipeline &ClientRequest::cp()
{
return get_osd_priv(conn.get()).client_request_conn_pipeline;
[](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
}
-template <typename FuncT>
-ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func)
+seastar::future<seastar::stop_iteration> ClientRequest::with_pg_int(
+ ShardServices &shard_services, Ref<PG> pgref)
{
- return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward<FuncT>(func));
+ epoch_t same_interval_since = pgref->get_interval_start_epoch();
+ logger().debug("{} same_interval_since: {}", *this, same_interval_since);
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ return interruptor::with_interruption(
+ [this, &shard_services, pgref]() mutable {
+ return sequencer.start_op(
+ *this, handle, shard_services.registry,
+ interruptor::wrap_function([pgref, this, &shard_services] {
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
+ return osd.send_incremental_map(
+ conn, m->get_map_epoch()
+ ).then([this, &shard_services] {
+ sequencer.finish_op_out_of_order(*this, shard_services.registry);
+ return interruptor::now();
+ });
+ }
+ return enter_stage<interruptor>(pp(pg).await_map
+ ).then_interruptible([this, &pg] {
+ return with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, &pg] (auto&& trigger) {
+ return pg.osdmap_gate.wait_for_map(std::move(trigger),
+ m->get_min_epoch());
+ });
+ }).then_interruptible([this, &pg](auto map) {
+ return enter_stage<interruptor>(pp(pg).wait_for_active);
+ }).then_interruptible([this, &pg]() {
+ return with_blocking_event<
+ PGActivationBlocker::BlockingEvent
+ >([&pg] (auto&& trigger) {
+ return pg.wait_for_active_blocker.wait(std::move(trigger));
+ });
+ }).then_interruptible(
+ [this, &shard_services, pgref=std::move(pgref)]() mutable {
+ if (is_pg_op()) {
+ return process_pg_op(
+ pgref
+ ).then_interruptible([this, &shard_services] {
+ sequencer.finish_op_out_of_order(*this, shard_services.registry);
+ });
+ } else {
+ return process_op(
+ pgref
+ ).then_interruptible([this, &shard_services](const seq_mode_t mode) {
+ if (mode == seq_mode_t::IN_ORDER) {
+ sequencer.finish_op_in_order(*this);
+ } else {
+ assert(mode == seq_mode_t::OUT_OF_ORDER);
+ sequencer.finish_op_out_of_order(*this, shard_services.registry);
+ }
+ });
+ }
+ });
+ })).then_interruptible([] {
+ return seastar::stop_iteration::yes;
+ });
+ }, [this, pgref](std::exception_ptr eptr) {
+ if (should_abort_request(*this, std::move(eptr))) {
+ sequencer.abort();
+ return seastar::stop_iteration::yes;
+ } else {
+ sequencer.maybe_reset(*this);
+ return seastar::stop_iteration::no;
+ }
+ }, pgref);
}
-seastar::future<> ClientRequest::start()
+seastar::future<> ClientRequest::with_pg(
+ ShardServices &shard_services, Ref<PG> pgref)
{
- logger().debug("{}: start", *this);
-
- track_event<StartEvent>();
- return seastar::repeat([this, opref=IRef{this}]() mutable {
- logger().debug("{}: in repeat", *this);
- return enter_stage<>(cp().await_map).then([this]() {
- return with_blocking_event<OSD_OSDMapGate::OSDMapBlocker::BlockingEvent>(
- [this](auto&& trigger) {
- return osd.osdmap_gate.wait_for_map(std::move(trigger),
- m->get_min_epoch());
- });
- }).then([this](epoch_t epoch) {
- return enter_stage<>(cp().get_pg);
- }).then([this] {
- return with_blocking_event<PGMap::PGCreationBlockingEvent>(
- [this] (auto&& trigger) {
- return osd.wait_for_pg(std::move(trigger), m->get_spg());
- });
- }).then([this](Ref<PG> pgref) mutable {
- return interruptor::with_interruption([this, pgref]() mutable {
- epoch_t same_interval_since = pgref->get_interval_start_epoch();
- logger().debug("{} same_interval_since: {}", *this, same_interval_since);
- if (m->finish_decode()) {
- m->clear_payload();
- }
- return with_sequencer(interruptor::wrap_function([pgref, this] {
- PG &pg = *pgref;
- if (pg.can_discard_op(*m)) {
- return osd.send_incremental_map(
- conn, m->get_map_epoch()).then([this] {
- sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
- return interruptor::now();
- });
- }
- return enter_stage<interruptor>(
- pp(pg).await_map
- ).then_interruptible([this, &pg] {
- return with_blocking_event<PG_OSDMapGate::OSDMapBlocker::BlockingEvent>(
- [this, &pg] (auto&& trigger) {
- return pg.osdmap_gate.wait_for_map(std::move(trigger),
- m->get_min_epoch());
- });
- }).then_interruptible([this, &pg](auto&&) {
- return enter_stage<>(pp(pg).wait_for_active);
- }).then_interruptible([this, &pg]() {
- return with_blocking_event<PGActivationBlocker::BlockingEvent>(
- [&pg] (auto&& trigger) {
- return pg.wait_for_active_blocker.wait(std::move(trigger));
- });
- }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
- if (is_pg_op()) {
- return process_pg_op(pgref).then_interruptible([this] {
- sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
- });
- } else {
- return process_op(pgref).then_interruptible([this] (const seq_mode_t mode) {
- if (mode == seq_mode_t::IN_ORDER) {
- sequencer.finish_op_in_order(*this);
- } else {
- assert(mode == seq_mode_t::OUT_OF_ORDER);
- sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
- }
- });
- }
- });
- })).then_interruptible([pgref] {
- return seastar::stop_iteration::yes;
- });
- }, [this, pgref](std::exception_ptr eptr) {
- if (should_abort_request(*this, std::move(eptr))) {
- sequencer.abort();
- return seastar::stop_iteration::yes;
- } else {
- sequencer.maybe_reset(*this);
- return seastar::stop_iteration::no;
- }
- }, pgref);
- });
- }).then([this] {
- track_event<CompletionEvent>();
- });
+ return seastar::repeat([this, &shard_services, pgref]() mutable {
+ return with_pg_int(shard_services, pgref);
+ }).then([this] {
+ track_event<CompletionEvent>();
+ });
}
ClientRequest::interruptible_future<>
ClientRequest::process_pg_op(
Ref<PG> &pg)
{
- return pg->do_pg_ops(m)
- .then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
- return conn->send(std::move(reply));
- });
+ return pg->do_pg_ops(
+ m
+ ).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
+ return conn->send(std::move(reply));
+ });
}
ClientRequest::interruptible_future<ClientRequest::seq_mode_t>