seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m)
{
- (void) shard_services.start_operation<RecoverySubRequest>(
- *this,
- conn,
- std::move(m));
+ std::ignore = start_pg_operation<RecoverySubRequest>(conn, std::move(m));
return seastar::now();
}
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/pg.h"
+#include "crimson/osd/osd_connection_priv.h"
namespace {
seastar::logger& logger() {
namespace crimson::osd {
-seastar::future<> RecoverySubRequest::start() {
- logger().debug("{}: start", *this);
+seastar::future<> RecoverySubRequest::with_pg(
+ ShardServices &shard_services, Ref<PG> pgref)
+{
+ logger().debug("{}: {}", "RecoverySubRequest::with_pg", *this);
track_event<StartEvent>();
IRef opref = this;
- using OSDMapBlockingEvent =
- OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
- return with_blocking_event<OSDMapBlockingEvent>(
- [this] (auto&& trigger) {
- return osd.osdmap_gate.wait_for_map(std::move(trigger), m->get_min_epoch());
- }).then([this] (epoch_t epoch) {
- return with_blocking_event<PGMap::PGCreationBlockingEvent>(
- [this] (auto&& trigger) {
- return osd.wait_for_pg(std::move(trigger), m->get_spg());
- });
- }).then([this, opref=std::move(opref)] (Ref<PG> pgref) {
- return interruptor::with_interruption([this, opref, pgref] {
- return seastar::do_with(std::move(pgref), std::move(opref),
- [this](auto& pgref, auto& opref) {
- return pgref->get_recovery_backend()->handle_recovery_op(m);
- });
- }, [](std::exception_ptr) { return seastar::now(); }, pgref);
- }).then([this] {
+ return interruptor::with_interruption([this, pgref] {
+ return pgref->get_recovery_backend()->handle_recovery_op(m);
+ }, [](std::exception_ptr) {
+ return seastar::now();
+ }, pgref).finally([this, opref, pgref] {
track_event<CompletionEvent>();
});
}
+ConnectionPipeline &RecoverySubRequest::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+}
+
}
#include "osd/osd_op_util.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
-#include "crimson/osd/osd.h"
+#include "crimson/osd/pg.h"
#include "crimson/common/type_helpers.h"
#include "messages/MOSDFastDispatchOp.h"
namespace crimson::osd {
-class OSD;
class PG;
-class RecoverySubRequest final : public TrackableOperationT<RecoverySubRequest> {
+class RecoverySubRequest final : public PhasedOperationT<RecoverySubRequest> {
public:
- static constexpr OperationTypeCode type = OperationTypeCode::background_recovery_sub;
+ static constexpr OperationTypeCode type =
+ OperationTypeCode::background_recovery_sub;
- RecoverySubRequest(OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDFastDispatchOp>&& m)
- : osd(osd), conn(conn), m(m) {}
+ RecoverySubRequest(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp>&& m)
+ : conn(conn), m(m) {}
void print(std::ostream& out) const final
{
{
}
- seastar::future<> start();
-private:
- OSD& osd;
- crimson::net::ConnectionRef conn;
- Ref<MOSDFastDispatchOp> m;
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return m->get_spg();
+ }
+ ConnectionPipeline &get_connection_pipeline();
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return m->get_min_epoch(); }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
-public:
std::tuple<
StartEvent,
- OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
CompletionEvent
> tracking_events;
+
+private:
+ crimson::net::ConnectionRef conn;
+ Ref<MOSDFastDispatchOp> m;
};
}