#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/pg_activation_blocker.h"
+#include "crimson/osd/pg_map.h"
namespace crimson::osd {
: ClientRequest::StartEvent::Backend,
ClientRequest::ConnectionPipeline::AwaitMap::BlockingEvent::Backend,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
+ ClientRequest::ConnectionPipeline::GetPG::BlockingEvent::Backend,
+ PGMap::PGCreationBlockingEvent::Backend,
+ ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
+ ClientRequest::PGPipeline::WaitForActive::BlockingEvent::Backend,
+ PGActivationBlocker::BlockingEvent::Backend,
+ ClientRequest::PGPipeline::RecoverMissing::BlockingEvent::Backend,
+ ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
+ ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
+ ClientRequest::PGPipeline::WaitRepop::BlockingEvent::Backend,
+ ClientRequest::PGPipeline::SendReply::BlockingEvent::Backend,
ClientRequest::CompletionEvent::Backend
{
void handle(ClientRequest::StartEvent&,
const OSD_OSDMapGate::OSDMapBlocker&) override {
}
+ void handle(ClientRequest::ConnectionPipeline::GetPG::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::ConnectionPipeline::GetPG& blocker) override {
+ }
+
+ void handle(PGMap::PGCreationBlockingEvent&,
+ const Operation&,
+ const PGMap::PGCreationBlocker&) override {
+ }
+
+ void handle(ClientRequest::PGPipeline::AwaitMap::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::PGPipeline::AwaitMap& blocker) override {
+ }
+
+ void handle(PG_OSDMapGate::OSDMapBlocker::BlockingEvent&,
+ const Operation&,
+ const PG_OSDMapGate::OSDMapBlocker&) override {
+ }
+
+ void handle(ClientRequest::PGPipeline::WaitForActive::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::PGPipeline::WaitForActive& blocker) override {
+ }
+
+ void handle(PGActivationBlocker::BlockingEvent& ev,
+ const Operation& op,
+ const PGActivationBlocker& blocker) override {
+ }
+
+ void handle(ClientRequest::PGPipeline::RecoverMissing::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::PGPipeline::RecoverMissing& blocker) override {
+ }
+
+ void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::PGPipeline::GetOBC& blocker) override {
+ }
+
+ void handle(ClientRequest::PGPipeline::Process::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::PGPipeline::Process& blocker) override {
+ }
+
+ void handle(ClientRequest::PGPipeline::WaitRepop::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::PGPipeline::WaitRepop& blocker) override {
+ }
+
+ void handle(ClientRequest::PGPipeline::SendReply::BlockingEvent& ev,
+ const Operation& op,
+ const ClientRequest::PGPipeline::SendReply& blocker) override {
+ }
+
void handle(ClientRequest::CompletionEvent&,
const Operation&) override {}
};
m->get_min_epoch());
});
}).then([this](epoch_t epoch) {
- return with_blocking_future(handle.enter(cp().get_pg));
+ return enter_stage<>(cp().get_pg);
}).then([this] {
- return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+ return with_blocking_event<PGMap::PGCreationBlockingEvent>(
+ [this] (auto&& trigger) {
+ return osd.wait_for_pg(std::move(trigger), m->get_spg());
+ });
}).then([this](Ref<PG> pgref) mutable {
return interruptor::with_interruption([this, pgref]() mutable {
epoch_t same_interval_since = pgref->get_interval_start_epoch();
return interruptor::now();
});
}
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(pg).await_map)
- ).then_interruptible([this, &pg] {
- return with_blocking_future_interruptible<interruptor::condition>(
- pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
- }).then_interruptible([this, &pg](auto map) {
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(pg).wait_for_active));
+ return enter_stage<interruptor>(
+ pp(pg).await_map
+ ).then_interruptible([this, &pg] {
+ return with_blocking_event<PG_OSDMapGate::OSDMapBlocker::BlockingEvent>(
+ [this, &pg] (auto&& trigger) {
+ return pg.osdmap_gate.wait_for_map(std::move(trigger),
+ m->get_min_epoch());
+ });
+ }).then_interruptible([this, &pg](auto&&) {
+ return enter_stage<>(pp(pg).wait_for_active);
}).then_interruptible([this, &pg]() {
- return with_blocking_future_interruptible<interruptor::condition>(
- pg.wait_for_active_blocker.wait());
+ return with_blocking_event<PGActivationBlocker::BlockingEvent>(
+ [&pg] (auto&& trigger) {
+ return pg.wait_for_active_blocker.wait(std::move(trigger));
+ });
}).then_interruptible([this, pgref=std::move(pgref)]() mutable {
if (is_pg_op()) {
return process_pg_op(pgref).then_interruptible([this] {
ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
ClientRequest::process_op(Ref<PG> &pg)
{
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).recover_missing))
- .then_interruptible(
+ return enter_stage<interruptor>(
+ pp(*pg).recover_missing
+ ).then_interruptible(
[this, pg]() mutable {
return do_recover_missing(pg, m->get_hobj());
}).then_interruptible([this, pg]() mutable {
return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
});
} else {
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).get_obc)).then_interruptible(
+ return enter_stage<interruptor>(pp(*pg).get_obc).then_interruptible(
[this, pg]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
logger().debug("{}: got obc lock", *this);
op_info.set_from_op(&*m, *pg->get_osdmap());
return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) {
return pg->with_locked_obc(m->get_hobj(), op_info,
[this, pg, &mode](auto obc) mutable {
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).process)
- ).then_interruptible([this, pg, obc, &mode]() mutable {
+ return enter_stage<interruptor>(pp(*pg).process).then_interruptible(
+ [this, pg, obc, &mode]() mutable {
return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) {
mode = _mode;
return seastar::now();
return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
[this, pg](auto submitted, auto all_completed) mutable {
- return submitted.then_interruptible(
- [this, pg] {
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).wait_repop));
+ return submitted.then_interruptible([this, pg] {
+ return enter_stage<interruptor>(pp(*pg).wait_repop);
}).then_interruptible(
[this, pg, all_completed=std::move(all_completed)]() mutable {
return all_completed.safe_then_interruptible(
[this, pg](MURef<MOSDOpReply> reply) {
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).send_reply)).then_interruptible(
- [this, reply=std::move(reply)]() mutable{
- return conn->send(std::move(reply)).then([] {
- return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
- });
- });
+ return enter_stage<interruptor>(pp(*pg).send_reply).then_interruptible(
+ [this, reply=std::move(reply)]() mutable {
+ return conn->send(std::move(reply)).then([] {
+ return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
+ });
+ });
}, crimson::ct_error::eagain::handle([this, pg]() mutable {
return process_op(pg);
}));
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/client_request_common.h"
#include "crimson/osd/osd_operations/common/pg_pipeline.h"
+#include "crimson/osd/pg_activation_blocker.h"
+#include "crimson/osd/pg_map.h"
#include "crimson/common/type_helpers.h"
#include "messages/MOSDOp.h"
StartEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ PGPipeline::AwaitMap::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGPipeline::WaitForActive::BlockingEvent,
+ PGActivationBlocker::BlockingEvent,
+ PGPipeline::RecoverMissing::BlockingEvent,
+ PGPipeline::GetOBC::BlockingEvent,
+ PGPipeline::Process::BlockingEvent,
+ PGPipeline::WaitRepop::BlockingEvent,
+ PGPipeline::SendReply::BlockingEvent,
CompletionEvent
> tracking_events;