object_metadata_helper.cc
ops_executer.cc
osd_operation.cc
+ osd_operations/ecrep_request.cc
osd_operations/client_request.cc
osd_operations/internal_client_request.cc
osd_operations/peering_event.cc
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/ecrep_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pgpct_request.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
case MSG_OSD_PG_PCT:
return handle_pg_pct(conn, boost::static_pointer_cast<
MOSDPGPCT>(m));
+ case MSG_OSD_EC_WRITE:
+ [[fallthrough]];
+ case MSG_OSD_EC_WRITE_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_EC_READ:
+ [[fallthrough]];
+ case MSG_OSD_EC_READ_REPLY:
+ return handle_some_ec_messages(conn, m);
+#if 0
+ case MSG_OSD_PG_PUSH:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH_REPLY:
+ return handle_ec_messages(conn, m);
+#endif
default:
return std::nullopt;
}
}
}
+template <class MessageRefT>
+seastar::future<>
+OSD::handle_some_ec_messages(crimson::net::ConnectionRef conn, MessageRefT&& m)
+{
+ m->decode_payload();
+ //m->set_features(conn->get_features());
+ (void) pg_shard_manager.start_pg_operation<ECRepRequest>(
+ std::move(conn),
+ std::forward<MessageRefT>(m));
+ return seastar::now();
+}
+
+
seastar::future<> OSD::restart()
{
beacon_timer.cancel();
seastar::future<> handle_pg_pct(
crimson::net::ConnectionRef conn,
Ref<MOSDPGPCT> m);
+ template <class MessageRefT>
+ seastar::future<> handle_some_ec_messages(
+ crimson::net::ConnectionRef conn,
+ MessageRefT&& m);
std::vector<DaemonHealthMetric> get_health_metrics();
scrub_reserve_range,
scrub_scan,
pgpct_request,
+ ecrep_request,
last_op
};
"scrub_reserve_range",
"scrub_scan",
"pgpct_request",
+ "ecrep_request",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/ecrep_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pgpct_request.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
namespace crimson {
+template <>
+struct EventBackendRegistry<osd::ECRepRequest> {
+ static std::tuple<> get_backends() {
+ return {/* no extenral backends */};
+ }
+};
+
template <>
struct EventBackendRegistry<osd::ClientRequest> {
static std::tuple<osd::LttngBackend, osd::HistoricBackend> get_backends() {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ecrep_request.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+void ECRepRequest::print(std::ostream& os) const
+{
+ os << "ECRepRequest("
+ << ")";
+}
+
+void ECRepRequest::dump_detail(Formatter *f) const
+{
+#if 0
+ f->open_object_section("ECRepRequest");
+ f->dump_stream("req_tid") << req->get_tid();
+ f->dump_stream("pgid") << get_pgid();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->close_section();
+#endif
+}
+
+ConnectionPipeline &ECRepRequest::get_connection_pipeline()
+{
+ return get_osd_priv(&get_local_connection()
+ ).replicated_request_conn_pipeline;
+}
+
+PerShardPipeline &ECRepRequest::get_pershard_pipeline(
+ ShardServices &shard_services)
+{
+ return shard_services.get_replicated_request_pipeline();
+}
+
+// from https://en.cppreference.com/w/cpp/utility/variant/visit
+// helper type for the visitor #4
+template<class... Ts>
+struct overloaded : Ts... { using Ts::operator()...; };
+// explicit deduction guide (not needed as of C++20)
+template<class... Ts>
+overloaded(Ts...) -> overloaded<Ts...>;
+
+seastar::future<> ECRepRequest::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ logger().debug("{}: ECRepRequest::with_pg", *this);
+
+ IRef ref = this;
+ return interruptor::with_interruption([this, pg] {
+ return interruptor::now();
+ }, [ref, this](std::exception_ptr) {
+ logger().debug("{}: ECRepRequest::exception handling", *this);
+ return seastar::now();
+ }, pg, pg->get_osdmap_epoch());
+}
+
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <variant>
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class ECRepRequest final : public PhasedOperationT<ECRepRequest> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::ecrep_request;
+
+ template <class MessageRefT>
+ ECRepRequest(crimson::net::ConnectionRef&& conn,
+ MessageRefT &&req)
+ : l_conn{std::move(conn)},
+ req{std::forward<MessageRefT>(req)}
+ {}
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return std::visit([] (const auto& concrete_req) {
+ return concrete_req->get_spg();
+ }, req);
+ }
+ epoch_t get_epoch() const {
+ return std::visit([] (const auto& concrete_req) {
+ return concrete_req->get_min_epoch();
+ }, req);
+ }
+
+ epoch_t get_epoch_sent_at() const {
+ return std::visit([] (const auto& concrete_req) {
+ return concrete_req->get_map_epoch();
+ }, req);
+ }
+
+ PipelineHandle &get_handle() { return handle; }
+
+ ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
+ };
+
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
+ }
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
+ }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ std::tuple<
+ StartEvent,
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+ > tracking_events;
+
+private:
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+ // must be after `conn` to ensure the ConnectionPipeline's is alive
+ PipelineHandle handle;
+ std::variant<
+ Ref<MOSDECSubOpWrite>,
+ Ref<MOSDECSubOpWriteReply>,
+ Ref<MOSDECSubOpRead>,
+ Ref<MOSDECSubOpReadReply>
+ > req;
+};
+
+}
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::osd::ECRepRequest> : fmt::ostream_formatter {};
+#endif
PglogBasedRecovery* pglog_based_recovery_op = nullptr;
friend std::ostream& operator<<(std::ostream&, const PG& pg);
+ friend class ECRepRequest;
friend class ClientRequest;
friend struct CommonClientRequest;
friend class PGAdvanceMap;