#include "replicated_request.h"
#include "common/Formatter.h"
-#include "messages/MOSDRepOp.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/osd_connection_priv.h"
namespace crimson::osd {
-RepRequest::RepRequest(OSD &osd,
- crimson::net::ConnectionRef&& conn,
+RepRequest::RepRequest(crimson::net::ConnectionRef&& conn,
Ref<MOSDRepOp> &&req)
- : osd{osd},
- conn{std::move(conn)},
+ : conn{std::move(conn)},
req{req}
{}
f->close_section();
}
-ConnectionPipeline &RepRequest::cp()
+ConnectionPipeline &RepRequest::get_connection_pipeline()
{
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
return pg.replicated_request_pg_pipeline;
}
-seastar::future<> RepRequest::start()
+seastar::future<> RepRequest::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
{
- logger().debug("{} start", *this);
- IRef ref = this;
+ logger().debug("{}: RepRequest::with_pg", *this);
- return enter_stage<>(cp().await_map).then([this] {
- return with_blocking_event<OSD_OSDMapGate::OSDMapBlocker::BlockingEvent>(
- [this] (auto&& trigger) {
- return osd.osdmap_gate.wait_for_map(std::move(trigger),
- req->get_min_epoch());
- });
- }).then([this](epoch_t epoch) {
- return enter_stage<>(cp().get_pg);
- }).then([this] {
- return with_blocking_event<PGMap::PGCreationBlockingEvent>(
- [this] (auto&& trigger) {
- return osd.wait_for_pg(std::move(trigger), req->get_spg());
- });
- }).then([this, ref=std::move(ref)](Ref<PG> pg) {
- return interruptor::with_interruption([this, ref, pg] {
- return pg->handle_rep_op(std::move(req));
- }, [](std::exception_ptr) { return seastar::now(); }, pg);
- });
+ IRef ref = this;
+ return interruptor::with_interruption([this, pg] {
+ return pg->handle_rep_op(std::move(req));
+ }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
}
+
}
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/pg_map.h"
#include "crimson/common/type_helpers.h"
-
-class MOSDRepOp;
+#include "messages/MOSDRepOp.h"
namespace ceph {
class Formatter;
namespace crimson::osd {
+class ShardServices;
+
class OSD;
class PG;
friend RepRequest;
};
static constexpr OperationTypeCode type = OperationTypeCode::replicated_request;
- RepRequest(OSD&, crimson::net::ConnectionRef&&, Ref<MOSDRepOp>&&);
+ RepRequest(crimson::net::ConnectionRef&&, Ref<MOSDRepOp>&&);
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
- seastar::future<> start();
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return req->get_spg();
+ }
+ ConnectionPipeline &get_connection_pipeline();
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return req->get_min_epoch(); }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
std::tuple<
ConnectionPipeline::AwaitActive::BlockingEvent,
> tracking_events;
private:
- ConnectionPipeline &cp();
PGPipeline &pp(PG &pg);
- OSD &osd;
crimson::net::ConnectionRef conn;
Ref<MOSDRepOp> req;
};