void ClientRequest::Orderer::requeue(
ShardServices &shard_services, Ref<PG> pg)
{
- for (auto &req: list) {
- req.handle.exit();
- }
for (auto &req: list) {
logger().debug("{}: {} requeueing {}", __func__, *pg, req);
+ req.reset_instance_handle();
std::ignore = req.with_pg_int(shard_services, pg);
}
}
void ClientRequest::complete_request()
{
track_event<CompletionEvent>();
- handle.exit();
on_complete.set_value();
}
OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
: osd(osd),
conn(std::move(conn)),
- m(std::move(m))
+ m(std::move(m)),
+ instance_handle(seastar::make_lw_shared<instance_handle_t>())
{}
ClientRequest::~ClientRequest()
}
const auto this_instance_id = instance_id++;
OperationRef opref{this};
+ auto instance_handle = get_instance_handle();
+ auto &ihref = *instance_handle;
return interruptor::with_interruption(
- [this, pgref, this_instance_id]() mutable {
+ [this, pgref, this_instance_id, &ihref]() mutable {
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
return osd.send_incremental_map(
return interruptor::now();
});
}
- return enter_stage<interruptor>(pp(pg).await_map
- ).then_interruptible([this, this_instance_id, &pg] {
+ return ihref.enter_stage<interruptor>(pp(pg).await_map, *this
+ ).then_interruptible([this, this_instance_id, &pg, &ihref] {
logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
- return with_blocking_event<
- PG_OSDMapGate::OSDMapBlocker::BlockingEvent
- >([this, &pg] (auto&& trigger) {
- return pg.osdmap_gate.wait_for_map(
- std::move(trigger),
- m->get_min_epoch());
- });
- }).then_interruptible([this, this_instance_id, &pg](auto map) {
+ return ihref.enter_blocker(
+ *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
+ m->get_min_epoch(), nullptr);
+ }).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) {
logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
- return enter_stage<interruptor>(pp(pg).wait_for_active);
- }).then_interruptible([this, this_instance_id, &pg]() {
+ return ihref.enter_stage<interruptor>(pp(pg).wait_for_active, *this);
+ }).then_interruptible([this, this_instance_id, &pg, &ihref]() {
logger().debug(
"{}.{}: after wait_for_active stage", *this, this_instance_id);
- return with_blocking_event<
- PGActivationBlocker::BlockingEvent
- >([&pg] (auto&& trigger) {
- return pg.wait_for_active_blocker.wait(std::move(trigger));
- });
- }).then_interruptible([this, pgref, this_instance_id]() mutable
+ return ihref.enter_blocker(
+ *this,
+ pg.wait_for_active_blocker,
+ &decltype(pg.wait_for_active_blocker)::wait);
+ }).then_interruptible([this, pgref, this_instance_id, &ihref]() mutable
-> interruptible_future<> {
logger().debug(
"{}.{}: after wait_for_active", *this, this_instance_id);
return process_pg_op(pgref);
} else {
return process_op(
+ ihref,
pgref
).then_interruptible([](auto){});
}
}, [this, this_instance_id, pgref](std::exception_ptr eptr) {
// TODO: better debug output
logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
- }, pgref).finally([opref=std::move(opref), pgref=std::move(pgref)] {});
+ }, pgref).finally(
+ [opref=std::move(opref), pgref=std::move(pgref),
+ instance_handle=std::move(instance_handle), &ihref] {
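+ // the moved-in instance_handle pins this invocation's state; release the
+ // pipeline stage only once the whole chain has completed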
+ ihref.handle.exit();
+ });
}
seastar::future<> ClientRequest::with_pg(
}
ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
-ClientRequest::process_op(Ref<PG> &pg)
+ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
{
- return enter_stage<interruptor>(
- pp(*pg).recover_missing
+ return ihref.enter_stage<interruptor>(
+ pp(*pg).recover_missing,
+ *this
).then_interruptible(
[this, pg]() mutable {
return do_recover_missing(pg, m->get_hobj());
- }).then_interruptible([this, pg]() mutable {
+ }).then_interruptible([this, pg, &ihref]() mutable {
return pg->already_complete(m->get_reqid()).then_unpack_interruptible(
- [this, pg](bool completed, int ret) mutable
+ [this, pg, &ihref](bool completed, int ret) mutable
-> PG::load_obc_iertr::future<seq_mode_t> {
if (completed) {
auto reply = crimson::make_message<MOSDOpReply>(
return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
});
} else {
- return enter_stage<interruptor>(pp(*pg).get_obc).then_interruptible(
- [this, pg]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
+ return ihref.enter_stage<interruptor>(pp(*pg).get_obc, *this
+ ).then_interruptible(
+ [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
logger().debug("{}: got obc lock", *this);
op_info.set_from_op(&*m, *pg->get_osdmap());
// XXX: `do_with()` is just a workaround for `with_obc_func_t` imposing
// `future<void>`.
- return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) {
- return pg->with_locked_obc(m->get_hobj(), op_info,
- [this, pg, &mode](auto obc) mutable {
- return enter_stage<interruptor>(pp(*pg).process).then_interruptible(
- [this, pg, obc, &mode]() mutable {
- return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) {
- mode = _mode;
- return seastar::now();
- });
- });
- }).safe_then_interruptible([&mode] {
- return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
- });
- });
- });
+ return seastar::do_with(
+ seq_mode_t{},
+ [this, &pg, &ihref](seq_mode_t& mode) {
+ return pg->with_locked_obc(
+ m->get_hobj(), op_info,
+ [this, pg, &mode, &ihref](auto obc) mutable {
+ return ihref.enter_stage<interruptor>(pp(*pg).process, *this
+ ).then_interruptible(
+ [this, pg, obc, &mode, &ihref]() mutable {
+ return do_process(ihref, pg, obc
+ ).then_interruptible([&mode] (seq_mode_t _mode) {
+ mode = _mode;
+ return seastar::now();
+ });
+ });
+ }).safe_then_interruptible([&mode] {
+ return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
+ });
+ });
+ });
}
- });
+ });
}).safe_then_interruptible([pg=std::move(pg)] (const seq_mode_t mode) {
return seastar::make_ready_future<seq_mode_t>(mode);
}, PG::load_obc_ertr::all_same_way([](auto &code) {
}
ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
-ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
+ClientRequest::do_process(
+ instance_handle_t &ihref,
+ Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
{
if (!pg->is_primary()) {
// primary can handle both normal ops and balanced reads
}
return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
- [this, pg](auto submitted, auto all_completed) mutable {
- return submitted.then_interruptible([this, pg] {
- return enter_stage<interruptor>(pp(*pg).wait_repop);
+ [this, pg, &ihref](auto submitted, auto all_completed) mutable {
+ return submitted.then_interruptible([this, pg, &ihref] {
+ return ihref.enter_stage<interruptor>(pp(*pg).wait_repop, *this);
}).then_interruptible(
- [this, pg, all_completed=std::move(all_completed)]() mutable {
+ [this, pg, all_completed=std::move(all_completed), &ihref]() mutable {
return all_completed.safe_then_interruptible(
- [this, pg](MURef<MOSDOpReply> reply) {
- return enter_stage<interruptor>(pp(*pg).send_reply).then_interruptible(
+ [this, pg, &ihref](MURef<MOSDOpReply> reply) {
+ return ihref.enter_stage<interruptor>(pp(*pg).send_reply, *this
+ ).then_interruptible(
[this, reply=std::move(reply)]() mutable {
return conn->send(std::move(reply)).then([] {
return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
});
});
- }, crimson::ct_error::eagain::handle([this, pg]() mutable {
- return process_op(pg);
+ }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
+ return process_op(ihref, pg);
}));
});
- }, crimson::ct_error::eagain::handle([this, pg]() mutable {
- return process_op(pg);
+ }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
+ return process_op(ihref, pg);
}));
}
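// Note: the eagain handlers above rerun process_op with the same ihref; a
// fresh instance_handle_t is only created on requeue via Orderer::requeue.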
#include "crimson/osd/pg_activation_blocker.h"
#include "crimson/osd/pg_map.h"
#include "crimson/common/type_helpers.h"
+#include "crimson/common/utility.h"
#include "messages/MOSDOp.h"
namespace crimson::osd {
OSD &osd;
const crimson::net::ConnectionRef conn;
// must be after conn due to ConnectionPipeline's life-time
- PipelineHandle handle;
Ref<MOSDOp> m;
OpInfo op_info;
seastar::promise<> on_complete;
friend class HistoricBackend;
};
+ /**
+ * instance_handle_t
+ *
+ * Client request is, at present, the only Operation which can be requeued.
+ * This is, mostly, fine. However, reusing the PipelineHandle or
+ * BlockingEvent structures before proving that the prior instance has stopped
+ * can create hangs or crashes due to violations of the BlockerT and
+ * PipelineHandle invariants.
+ *
+ * To solve this, we create an instance_handle_t which contains the events
+ * for the portion of execution that can be rerun as well as the
+ * PipelineHandle. ClientRequest::with_pg_int grabs a reference to the current
+ * instance_handle_t and releases its PipelineHandle in the finally block.
+ * On requeue, we create a new instance_handle_t with a fresh PipelineHandle
+ * and events tuple and use it for the next invocation of
+ * with_pg_int.
+ */
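+ //
+ // A minimal standalone sketch of the pattern (hypothetical names, with
+ // std::shared_ptr standing in for seastar::lw_shared_ptr; not the actual
+ // crimson types; requires <memory>):
+ //
+ //   struct attempt_state_t {
+ //     // per-attempt pipeline handle and event records would live here
+ //   };
+ //
+ //   struct requeueable_op_t {
+ //     std::shared_ptr<attempt_state_t> state =
+ //       std::make_shared<attempt_state_t>();
+ //     void requeue() {
+ //       // the prior attempt keeps its shared_ptr until it finishes
+ //       // unwinding; the next attempt starts from untouched state
+ //       state = std::make_shared<attempt_state_t>();
+ //     }
+ //   };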
+ std::tuple<
+ StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ CompletionEvent
+ > tracking_events;
+
+ class instance_handle_t
+ : public seastar::enable_lw_shared_from_this<instance_handle_t> {
+ public:
+ using ref_t = seastar::lw_shared_ptr<instance_handle_t>;
+ PipelineHandle handle;
+
+ std::tuple<
+ PGPipeline::AwaitMap::BlockingEvent,
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ PGPipeline::WaitForActive::BlockingEvent,
+ PGActivationBlocker::BlockingEvent,
+ PGPipeline::RecoverMissing::BlockingEvent,
+ PGPipeline::GetOBC::BlockingEvent,
+ PGPipeline::Process::BlockingEvent,
+ PGPipeline::WaitRepop::BlockingEvent,
+ PGPipeline::SendReply::BlockingEvent,
+ CompletionEvent
+ > pg_tracking_events;
+
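+ // runs f with a Trigger bound to this instance's copy of BlockingEventT;
+ // if InterruptorT is non-void, the result is futurized under it so the
+ // caller stays inside the interruptible chain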
+ template <typename BlockingEventT, typename InterruptorT=void, typename F>
+ auto with_blocking_event(F &&f, ClientRequest &op) {
+ auto ret = std::forward<F>(f)(
+ typename BlockingEventT::template Trigger<ClientRequest>{
+ std::get<BlockingEventT>(pg_tracking_events), op
+ });
+ if constexpr (std::is_same_v<InterruptorT, void>) {
+ return ret;
+ } else {
+ using ret_t = decltype(ret);
+ return typename InterruptorT::template futurize_t<ret_t>{std::move(ret)};
+ }
+ }
+
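+ // enters a pipeline stage through this instance's PipelineHandle while
+ // recording the stage's BlockingEvent in pg_tracking_events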
+ template <typename InterruptorT=void, typename StageT>
+ auto enter_stage(StageT &stage, ClientRequest &op) {
+ return this->template with_blocking_event<
+ typename StageT::BlockingEvent,
+ InterruptorT>(
+ [&stage, this](auto &&trigger) {
+ return handle.template enter<ClientRequest>(
+ stage, std::move(trigger));
+ }, op);
+ }
+
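+ // invokes obj.*method(trigger, args...) so waits on gates and blockers
+ // are tracked in this instance's event tuple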
+ template <
+ typename InterruptorT=void, typename BlockingObj, typename Method,
+ typename... Args>
+ auto enter_blocker(
+ ClientRequest &op, BlockingObj &obj, Method method, Args&&... args) {
+ return this->template with_blocking_event<
+ typename BlockingObj::Blocker::BlockingEvent,
+ InterruptorT>(
+ [&obj, method,
+ args=std::forward_as_tuple(std::move(args)...)](auto &&trigger) mutable {
+ return apply_method_to_tuple(
+ obj, method,
+ std::tuple_cat(
+ std::forward_as_tuple(std::move(trigger)),
+ std::move(args))
+ );
+ }, op);
+ }
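+
+ // apply_method_to_tuple comes from crimson/common/utility.h (included
+ // above); a minimal sketch of such a helper (illustrative only; the real
+ // definition may differ) is std::apply adapted to member-function
+ // pointers, requiring <tuple> and <utility>:
+ //
+ //   template <typename Obj, typename Method, typename Tuple>
+ //   decltype(auto) apply_method_to_tuple(Obj &obj, Method m, Tuple &&t) {
+ //     return std::apply(
+ //       [&obj, m](auto &&...args) -> decltype(auto) {
+ //         return (obj.*m)(std::forward<decltype(args)>(args)...);
+ //       }, std::forward<Tuple>(t));
+ //   }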
+ };
+ instance_handle_t::ref_t instance_handle;
+ void reset_instance_handle() {
+ instance_handle = seastar::make_lw_shared<instance_handle_t>();
+ }
+ auto get_instance_handle() { return instance_handle; }
+
using ordering_hook_t = boost::intrusive::list_member_hook<>;
ordering_hook_t ordering_hook;
class Orderer {
return m->get_spg();
}
ConnectionPipeline &get_connection_pipeline();
- PipelineHandle &get_handle() { return handle; }
+ PipelineHandle &get_handle() { return instance_handle->handle; }
epoch_t get_epoch() const { return m->get_min_epoch(); }
seastar::future<> with_pg_int(
};
interruptible_future<seq_mode_t> do_process(
+ instance_handle_t &ihref,
Ref<PG>& pg,
crimson::osd::ObjectContextRef obc);
::crimson::interruptible::interruptible_future<
Ref<PG> &pg);
::crimson::interruptible::interruptible_future<
::crimson::osd::IOInterruptCondition, seq_mode_t> process_op(
- Ref<PG> &pg);
+ instance_handle_t &ihref,
+ Ref<PG> &pg);
bool is_pg_op() const;
ConnectionPipeline &cp();
bool is_misdirected(const PG& pg) const;
public:
- std::tuple<
- StartEvent,
- ConnectionPipeline::AwaitActive::BlockingEvent,
- ConnectionPipeline::AwaitMap::BlockingEvent,
- OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
- ConnectionPipeline::GetPG::BlockingEvent,
- PGMap::PGCreationBlockingEvent,
- PGPipeline::AwaitMap::BlockingEvent,
- PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
- PGPipeline::WaitForActive::BlockingEvent,
- PGActivationBlocker::BlockingEvent,
- PGPipeline::RecoverMissing::BlockingEvent,
- PGPipeline::GetOBC::BlockingEvent,
- PGPipeline::Process::BlockingEvent,
- PGPipeline::WaitRepop::BlockingEvent,
- PGPipeline::SendReply::BlockingEvent,
- CompletionEvent
- > tracking_events;
friend class LttngBackend;
friend class HistoricBackend;