osd_operations/peering_event.cc
osd_operations/pg_advance_map.cc
osd_operations/replicated_request.cc
+ osd_operations/replicated_request_reply.cc
osd_operations/logmissing_request.cc
osd_operations/logmissing_request_reply.cc
osd_operations/background_recovery.cc
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/osd_operations/replicated_request_reply.h"
#include "crimson/osd/osd_operations/scrub_events.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/crush/CrushLocation.h"
crimson::net::ConnectionRef conn,
Ref<MOSDRepOpReply> m)
{
- LOG_PREFIX(OSD::handle_rep_op_reply);
- spg_t pgid = m->get_spg();
- return pg_shard_manager.with_pg(
- pgid,
- [FNAME, m=std::move(m)](auto &&pg) {
- if (pg) {
- m->finish_decode();
- pg->handle_rep_op_reply(*m);
- } else {
- DEBUG("stale reply: {}", *m);
- }
- return seastar::now();
- });
+ return pg_shard_manager.start_pg_operation_active<ReplicatedRequestReply>(
+ std::move(conn),
+ std::move(m));
}
seastar::future<> OSD::handle_scrub_command(
pg_advance_map,
pg_creation,
replicated_request,
+ replicated_request_reply,
background_recovery,
background_recovery_sub,
internal_client_request,
"pg_advance_map",
"pg_creation",
"replicated_request",
+ "replicated_request_reply",
"background_recovery",
"background_recovery_sub",
"internal_client_request",
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "replicated_request_reply.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/replicated_backend.h"
+
+SET_SUBSYS(osd);
+
+namespace crimson::osd {
+
+ReplicatedRequestReply::ReplicatedRequestReply(
+ crimson::net::ConnectionRef&& conn,
+ Ref<MOSDRepOpReply> &&req)
+ : RemoteOperation{std::move(conn)},
+ req{std::move(req)}
+{}
+
+void ReplicatedRequestReply::print(std::ostream& os) const
+{
+ os << "ReplicatedRequestReply("
+ << " req=" << *req
+ << ")";
+}
+
+void ReplicatedRequestReply::dump_detail(Formatter *f) const
+{
+ f->open_object_section("ReplicatedRequestReply");
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->close_section();
+}
+
+seastar::future<> ReplicatedRequestReply::with_pg(
+ ShardServices &shard_services, Ref<PG> pgref)
+{
+ LOG_PREFIX(ReplicatedRequestReply::with_pg);
+ DEBUGDPP("{}", *pgref, *this);
+ req->finish_decode();
+ pgref->handle_rep_op_reply(*req);
+ return seastar::now();
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDRepOpReply.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class ReplicatedRequestReply final
+ : public OperationT<ReplicatedRequestReply>,
+ public RemoteOperation
+{
+public:
+ static constexpr OperationTypeCode type =
+ OperationTypeCode::replicated_request_reply;
+ ReplicatedRequestReply(crimson::net::ConnectionRef&&, Ref<MOSDRepOpReply>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+
+ spg_t get_pgid() const {
+ return req->get_spg();
+ }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ PipelineHandle &get_handle() { return handle; }
+private:
+ PipelineHandle handle;
+ Ref<MOSDRepOpReply> req;
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::ReplicatedRequestReply> : fmt::ostream_formatter {};
+#endif