From 5c9a6ee8a321a1b3c60c3dcca0e56ecfd886a231 Mon Sep 17 00:00:00 2001 From: chunmei-liu Date: Tue, 24 May 2022 22:50:25 -0700 Subject: [PATCH] crimson/osd: add LogMissing support Signed-off-by: chunmei-liu --- src/crimson/osd/CMakeLists.txt | 2 + src/crimson/osd/osd.cc | 30 ++++++++ src/crimson/osd/osd.h | 6 +- src/crimson/osd/osd_operation.h | 4 ++ .../osd/osd_operation_external_tracking.h | 15 ++++ .../osd/osd_operations/logmissing_request.cc | 68 ++++++++++++++++++ .../osd/osd_operations/logmissing_request.h | 68 ++++++++++++++++++ .../logmissing_request_reply.cc | 68 ++++++++++++++++++ .../osd_operations/logmissing_request_reply.h | 68 ++++++++++++++++++ src/crimson/osd/pg.cc | 72 +++++++++++++++++++ src/crimson/osd/pg.h | 13 ++++ 11 files changed, 413 insertions(+), 1 deletion(-) create mode 100644 src/crimson/osd/osd_operations/logmissing_request.cc create mode 100644 src/crimson/osd/osd_operations/logmissing_request.h create mode 100644 src/crimson/osd/osd_operations/logmissing_request_reply.cc create mode 100644 src/crimson/osd/osd_operations/logmissing_request_reply.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 4313075a8d6..389d2243ae3 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -21,6 +21,8 @@ add_executable(crimson-osd osd_operations/peering_event.cc osd_operations/pg_advance_map.cc osd_operations/replicated_request.cc + osd_operations/logmissing_request.cc + osd_operations/logmissing_request_reply.cc osd_operations/background_recovery.cc osd_operations/recovery_subrequest.cc pg_recovery.cc diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 4db655e81e9..3c06217ef3f 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -22,6 +22,8 @@ #include "messages/MOSDMarkMeDown.h" #include "messages/MOSDOp.h" #include "messages/MOSDPeeringOp.h" +#include "messages/MOSDPGUpdateLogMissing.h" +#include "messages/MOSDPGUpdateLogMissingReply.h" #include 
"messages/MOSDRepOpReply.h" #include "messages/MOSDScrub2.h" #include "messages/MPGStats.h" @@ -781,6 +783,12 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) return handle_rep_op_reply(conn, boost::static_pointer_cast(m)); case MSG_OSD_SCRUB2: return handle_scrub(conn, boost::static_pointer_cast(m)); + case MSG_OSD_PG_UPDATE_LOG_MISSING: + return handle_update_log_missing(conn, boost::static_pointer_cast< + MOSDPGUpdateLogMissing>(m)); + case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: + return handle_update_log_missing_reply(conn, boost::static_pointer_cast< + MOSDPGUpdateLogMissingReply>(m)); default: dispatched = false; return seastar::now(); @@ -1209,6 +1217,28 @@ seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn, return seastar::now(); } +seastar::future<> OSD::handle_update_log_missing( + crimson::net::ConnectionRef conn, + Ref m) +{ + m->decode_payload(); + (void) start_pg_operation( + std::move(conn), + std::move(m)); + return seastar::now(); +} + +seastar::future<> OSD::handle_update_log_missing_reply( + crimson::net::ConnectionRef conn, + Ref m) +{ + m->decode_payload(); + (void) start_pg_operation( + std::move(conn), + std::move(m)); + return seastar::now(); +} + seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn, epoch_t first) { diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 9902459bf6c..b1920352791 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -207,7 +207,11 @@ private: seastar::future<> handle_command(crimson::net::ConnectionRef conn, Ref m); seastar::future<> start_asok_admin(); - + seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn, + Ref m); + seastar::future<> handle_update_log_missing_reply( + crimson::net::ConnectionRef conn, + Ref m); public: OSD_OSDMapGate osdmap_gate; diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 2f9a69d39e6..323f7bb5939 100644 --- a/src/crimson/osd/osd_operation.h 
+++ b/src/crimson/osd/osd_operation.h @@ -44,6 +44,8 @@ enum class OperationTypeCode { background_recovery_sub, internal_client_request, historic_client_request, + logmissing_request, + logmissing_request_reply, last_op }; @@ -58,6 +60,8 @@ static constexpr const char* const OP_NAMES[] = { "background_recovery_sub", "internal_client_request", "historic_client_request", + "logmissing_request", + "logmissing_request_reply", }; // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry: diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index bfe0b41f676..b77d0101e42 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -279,6 +279,21 @@ struct EventBackendRegistry { } }; + +template <> +struct EventBackendRegistry { + static std::tuple<> get_backends() { + return {/* no external backends */}; + } +}; + +template <> +struct EventBackendRegistry { + static std::tuple<> get_backends() { + return {/* no external backends */}; + } +}; + template <> struct EventBackendRegistry { static std::tuple<> get_backends() { diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc new file mode 100644 index 00000000000..84efcfa9f6a --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request.cc @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "logmissing_request.h" + +#include "common/Formatter.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/pg.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn, + Ref &&req) + : 
conn{std::move(conn)}, + req{std::move(req)} +{} + +void LogMissingRequest::print(std::ostream& os) const +{ + os << "LogMissingRequest(" + << "from=" << req->from + << " req=" << *req + << ")"; +} + +void LogMissingRequest::dump_detail(Formatter *f) const +{ + f->open_object_section("LogMissingRequest"); + f->dump_stream("req_tid") << req->get_tid(); + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch", req->get_min_epoch()); + f->dump_stream("entries") << req->entries; + f->dump_stream("from") << req->from; + f->close_section(); +} + +ConnectionPipeline &LogMissingRequest::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).replicated_request_conn_pipeline; +} + +RepRequest::PGPipeline &LogMissingRequest::pp(PG &pg) +{ + return pg.replicated_request_pg_pipeline; +} + +seastar::future<> LogMissingRequest::with_pg( + ShardServices &shard_services, Ref pg) +{ + logger().debug("{}: LogMissingRequest::with_pg", *this); + + IRef ref = this; + return interruptor::with_interruption([this, pg] { + return pg->do_update_log_missing(std::move(req)); + }, [ref](std::exception_ptr) { return seastar::now(); }, pg); +} + +} diff --git a/src/crimson/osd/osd_operations/logmissing_request.h b/src/crimson/osd/osd_operations/logmissing_request.h new file mode 100644 index 00000000000..6fad844b678 --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/replicated_request.h" +#include "crimson/osd/pg_map.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDPGUpdateLogMissing.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class ShardServices; + +class OSD; 
+class PG; + +class LogMissingRequest final : public PhasedOperationT { +public: + class PGPipeline { + struct AwaitMap : OrderedExclusivePhaseT { + static constexpr auto type_name = "LogMissingRequest::PGPipeline::await_map"; + } await_map; + struct Process : OrderedExclusivePhaseT { + static constexpr auto type_name = "LogMissingRequest::PGPipeline::process"; + } process; + friend LogMissingRequest; + }; + static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request; + LogMissingRequest(crimson::net::ConnectionRef&&, Ref&&); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return req->get_spg(); + } + ConnectionPipeline &get_connection_pipeline(); + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return req->get_min_epoch(); } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref pg); + + std::tuple< + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent + > tracking_events; + +private: + RepRequest::PGPipeline &pp(PG &pg); + + crimson::net::ConnectionRef conn; + Ref req; +}; + +} diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.cc b/src/crimson/osd/osd_operations/logmissing_request_reply.cc new file mode 100644 index 00000000000..6233bf50eb5 --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.cc @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "logmissing_request_reply.h" + +#include "common/Formatter.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/osd_operation_external_tracking.h" +#include "crimson/osd/pg.h" + +namespace { + 
seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +LogMissingRequestReply::LogMissingRequestReply( + crimson::net::ConnectionRef&& conn, + Ref &&req) + : conn{std::move(conn)}, + req{std::move(req)} +{} + +void LogMissingRequestReply::print(std::ostream& os) const +{ + os << "LogMissingRequestReply(" + << "from=" << req->from + << " req=" << *req + << ")"; +} + +void LogMissingRequestReply::dump_detail(Formatter *f) const +{ + f->open_object_section("LogMissingRequestReply"); + f->dump_stream("rep_tid") << req->get_tid(); + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch", req->get_min_epoch()); + f->dump_stream("from") << req->from; + f->close_section(); +} + +ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline() +{ + return get_osd_priv(conn.get()).replicated_request_conn_pipeline; +} + +RepRequest::PGPipeline &LogMissingRequestReply::pp(PG &pg) +{ + return pg.replicated_request_pg_pipeline; +} + +seastar::future<> LogMissingRequestReply::with_pg( + ShardServices &shard_services, Ref pg) +{ + logger().debug("{}: LogMissingRequestReply::with_pg", *this); + + IRef ref = this; + return interruptor::with_interruption([this, pg] { + return pg->do_update_log_missing_reply(std::move(req)); + }, [ref](std::exception_ptr) { return seastar::now(); }, pg); +} + +} diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.h b/src/crimson/osd/osd_operations/logmissing_request_reply.h new file mode 100644 index 00000000000..5632bb4ab7c --- /dev/null +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/osd_operation.h" +#include 
"crimson/osd/osd_operations/replicated_request.h" +#include "crimson/osd/pg_map.h" +#include "crimson/common/type_helpers.h" +#include "messages/MOSDPGUpdateLogMissingReply.h" + +namespace ceph { + class Formatter; +} + +namespace crimson::osd { + +class ShardServices; + +class OSD; +class PG; + +class LogMissingRequestReply final : public PhasedOperationT { +public: + class PGPipeline { + struct AwaitMap : OrderedExclusivePhaseT { + static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::await_map"; + } await_map; + struct Process : OrderedExclusivePhaseT { + static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::process"; + } process; + friend LogMissingRequestReply; + }; + static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request_reply; + LogMissingRequestReply(crimson::net::ConnectionRef&&, Ref&&); + + void print(std::ostream &) const final; + void dump_detail(ceph::Formatter* f) const final; + + static constexpr bool can_create() { return false; } + spg_t get_pgid() const { + return req->get_spg(); + } + ConnectionPipeline &get_connection_pipeline(); + PipelineHandle &get_handle() { return handle; } + epoch_t get_epoch() const { return req->get_min_epoch(); } + + seastar::future<> with_pg( + ShardServices &shard_services, Ref pg); + + std::tuple< + ConnectionPipeline::AwaitActive::BlockingEvent, + ConnectionPipeline::AwaitMap::BlockingEvent, + ConnectionPipeline::GetPG::BlockingEvent, + PGMap::PGCreationBlockingEvent, + OSD_OSDMapGate::OSDMapBlocker::BlockingEvent + > tracking_events; + +private: + RepRequest::PGPipeline &pp(PG &pg); + + crimson::net::ConnectionRef conn; + Ref req; +}; + +} diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 4e888b7a55d..66680e28248 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1121,6 +1121,78 @@ void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn, } } +PG::interruptible_future<> PG::do_update_log_missing( + Ref m) +{ + if 
(__builtin_expect(stopping, false)) { + return seastar::make_exception_future<>( + crimson::common::system_shutdown_exception()); + } + + ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING); + ObjectStore::Transaction t; + std::optional op_trim_to, op_roll_forward_to; + if (m->pg_trim_to != eversion_t()) + op_trim_to = m->pg_trim_to; + if (m->pg_roll_forward_to != eversion_t()) + op_roll_forward_to = m->pg_roll_forward_to; + logger().debug("op_trim_to = {}, op_roll_forward_to = {}", + op_trim_to, op_roll_forward_to); + + peering_state.append_log_entries_update_missing( + m->entries, t, op_trim_to, op_roll_forward_to); + + return interruptor::make_interruptible(shard_services.get_store().do_transaction( + coll_ref, std::move(t))).then_interruptible( + [m, lcod=peering_state.get_info().last_complete, this] { + if (!peering_state.pg_has_reset_since(m->get_epoch())) { + peering_state.update_last_complete_ondisk(lcod); + auto reply = + crimson::make_message( + spg_t(peering_state.get_info().pgid.pgid, get_primary().shard), + pg_whoami.shard, + m->get_epoch(), + m->min_epoch, + m->get_tid(), + lcod); + reply->set_priority(CEPH_MSG_PRIO_HIGH); + return m->get_connection()->send(std::move(reply)); + } + return seastar::now(); + }); +} + + +PG::interruptible_future<> PG::do_update_log_missing_reply( + Ref m) +{ + logger().debug("{}: got reply from {}", __func__, m->get_from()); + + auto it = log_entry_update_waiting_on.find(m->get_tid()); + if (it != log_entry_update_waiting_on.end()) { + if (it->second.waiting_on.count(m->get_from())) { + it->second.waiting_on.erase(m->get_from()); + if (m->last_complete_ondisk != eversion_t()) { + peering_state.update_peer_last_complete_ondisk( + m->get_from(), m->last_complete_ondisk); + } + } else { + logger().error("{} : {} got reply {} from shard we are not waiting for {}", + __func__, peering_state.get_info().pgid, *m, m->get_from()); /* trailing {} prints the unexpected sender, m->get_from() */ + } + + if (it->second.waiting_on.empty()) { + it->second.all_committed.set_value(); + 
it->second.all_committed = {}; + log_entry_update_waiting_on.erase(it); + } + } else { + logger().error("{} : {} got reply {} on unknown tid {}", + __func__, peering_state.get_info().pgid, *m, m->get_tid()); + } + return seastar::now(); +} + bool PG::old_peering_msg( const epoch_t reply_epoch, const epoch_t query_epoch) const diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 25f675b9ad4..0a23e55d131 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -26,6 +26,8 @@ #include "crimson/osd/pg_interval_interrupt_condition.h" #include "crimson/osd/ops_executer.h" #include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_operations/logmissing_request.h" +#include "crimson/osd/osd_operations/logmissing_request_reply.h" #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/osd_operations/replicated_request.h" #include "crimson/osd/osd_operations/background_recovery.h" @@ -542,6 +544,10 @@ public: interruptible_future<> handle_rep_op(Ref m); void handle_rep_op_reply(crimson::net::ConnectionRef conn, const MOSDRepOpReply& m); + interruptible_future<> do_update_log_missing(Ref m); + interruptible_future<> do_update_log_missing_reply( + Ref m); + void print(std::ostream& os) const; void dump_primary(Formatter*); @@ -732,6 +738,8 @@ private: template friend class PeeringEvent; friend class RepRequest; + friend class LogMissingRequest; + friend class LogMissingRequestReply; friend class BackfillRecovery; friend struct PGFacade; friend class InternalClientRequest; @@ -761,6 +769,11 @@ private: BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline; friend class IOInterruptCondition; + struct log_update_t { + std::set waiting_on; + seastar::shared_promise<> all_committed; + }; + std::map log_entry_update_waiting_on; }; struct PG::do_osd_ops_params_t { -- 2.39.5