osd_operations/peering_event.cc
osd_operations/pg_advance_map.cc
osd_operations/replicated_request.cc
+ osd_operations/logmissing_request.cc
+ osd_operations/logmissing_request_reply.cc
osd_operations/background_recovery.cc
osd_operations/recovery_subrequest.cc
pg_recovery.cc
#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPeeringOp.h"
+#include "messages/MOSDPGUpdateLogMissing.h"
+#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDScrub2.h"
#include "messages/MPGStats.h"
return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
case MSG_OSD_SCRUB2:
return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+ case MSG_OSD_PG_UPDATE_LOG_MISSING:
+ return handle_update_log_missing(conn, boost::static_pointer_cast<
+ MOSDPGUpdateLogMissing>(m));
+ case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
+ MOSDPGUpdateLogMissingReply>(m));
default:
dispatched = false;
return seastar::now();
return seastar::now();
}
+seastar::future<> OSD::handle_update_log_missing(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissing> m)
+{
+ m->decode_payload();
+ (void) start_pg_operation<LogMissingRequest>(
+ std::move(conn),
+ std::move(m));
+ return seastar::now();
+}
+
+seastar::future<> OSD::handle_update_log_missing_reply(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissingReply> m)
+{
+ m->decode_payload();
+ (void) start_pg_operation<LogMissingRequestReply>(
+ std::move(conn),
+ std::move(m));
+ return seastar::now();
+}
+
seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
epoch_t first)
{
seastar::future<> handle_command(crimson::net::ConnectionRef conn,
Ref<MCommand> m);
seastar::future<> start_asok_admin();
-
+ seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissing> m);
+ seastar::future<> handle_update_log_missing_reply(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGUpdateLogMissingReply> m);
public:
OSD_OSDMapGate osdmap_gate;
background_recovery_sub,
internal_client_request,
historic_client_request,
+ logmissing_request,
+ logmissing_request_reply,
last_op
};
"background_recovery_sub",
"internal_client_request",
"historic_client_request",
+ "logmissing_request",
+ "logmissing_request_reply",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
}
};
+
+template <>
+struct EventBackendRegistry<osd::LogMissingRequest> {
+ static std::tuple<> get_backends() {
+ return {/* no extenral backends */};
+ }
+};
+
+template <>
+struct EventBackendRegistry<osd::LogMissingRequestReply> {
+ static std::tuple<> get_backends() {
+ return {/* no extenral backends */};
+ }
+};
+
template <>
struct EventBackendRegistry<osd::RecoverySubRequest> {
static std::tuple<> get_backends() {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn,
+ Ref<MOSDPGUpdateLogMissing> &&req)
+ : conn{std::move(conn)},
+ req{std::move(req)}
+{}
+
+void LogMissingRequest::print(std::ostream& os) const
+{
+ os << "LogMissingRequest("
+ << "from=" << req->from
+ << " req=" << *req
+ << ")";
+}
+
+void LogMissingRequest::dump_detail(Formatter *f) const
+{
+ f->open_object_section("LogMissingRequest");
+ f->dump_stream("req_tid") << req->get_tid();
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("entries") << req->entries;
+ f->dump_stream("from") << req->from;
+ f->close_section();
+}
+
+ConnectionPipeline &LogMissingRequest::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+RepRequest::PGPipeline &LogMissingRequest::pp(PG &pg)
+{
+ return pg.replicated_request_pg_pipeline;
+}
+
+seastar::future<> LogMissingRequest::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ logger().debug("{}: LogMissingRequest::with_pg", *this);
+
+ IRef ref = this;
+ return interruptor::with_interruption([this, pg] {
+ return pg->do_update_log_missing(std::move(req));
+ }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGUpdateLogMissing.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class LogMissingRequest final : public PhasedOperationT<LogMissingRequest> {
+public:
+ class PGPipeline {
+ struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
+ static constexpr auto type_name = "LogMissingRequest::PGPipeline::await_map";
+ } await_map;
+ struct Process : OrderedExclusivePhaseT<Process> {
+ static constexpr auto type_name = "LogMissingRequest::PGPipeline::process";
+ } process;
+ friend LogMissingRequest;
+ };
+ static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request;
+ LogMissingRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissing>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return req->get_spg();
+ }
+ ConnectionPipeline &get_connection_pipeline();
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return req->get_min_epoch(); }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ std::tuple<
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+ > tracking_events;
+
+private:
+ RepRequest::PGPipeline &pp(PG &pg);
+
+ crimson::net::ConnectionRef conn;
+ Ref<MOSDPGUpdateLogMissing> req;
+};
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request_reply.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+LogMissingRequestReply::LogMissingRequestReply(
+ crimson::net::ConnectionRef&& conn,
+ Ref<MOSDPGUpdateLogMissingReply> &&req)
+ : conn{std::move(conn)},
+ req{std::move(req)}
+{}
+
+void LogMissingRequestReply::print(std::ostream& os) const
+{
+ os << "LogMissingRequestReply("
+ << "from=" << req->from
+ << " req=" << *req
+ << ")";
+}
+
+void LogMissingRequestReply::dump_detail(Formatter *f) const
+{
+ f->open_object_section("LogMissingRequestReply");
+ f->dump_stream("rep_tid") << req->get_tid();
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("from") << req->from;
+ f->close_section();
+}
+
+ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline()
+{
+ return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+RepRequest::PGPipeline &LogMissingRequestReply::pp(PG &pg)
+{
+ return pg.replicated_request_pg_pipeline;
+}
+
+seastar::future<> LogMissingRequestReply::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
+{
+ logger().debug("{}: LogMissingRequestReply::with_pg", *this);
+
+ IRef ref = this;
+ return interruptor::with_interruption([this, pg] {
+ return pg->do_update_log_missing_reply(std::move(req));
+ }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGUpdateLogMissingReply.h"
+
+namespace ceph {
+ class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class LogMissingRequestReply final : public PhasedOperationT<LogMissingRequestReply> {
+public:
+ class PGPipeline {
+ struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
+ static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::await_map";
+ } await_map;
+ struct Process : OrderedExclusivePhaseT<Process> {
+ static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::process";
+ } process;
+ friend LogMissingRequestReply;
+ };
+ static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request_reply;
+ LogMissingRequestReply(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissingReply>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(ceph::Formatter* f) const final;
+
+ static constexpr bool can_create() { return false; }
+ spg_t get_pgid() const {
+ return req->get_spg();
+ }
+ ConnectionPipeline &get_connection_pipeline();
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return req->get_min_epoch(); }
+
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
+
+ std::tuple<
+ ConnectionPipeline::AwaitActive::BlockingEvent,
+ ConnectionPipeline::AwaitMap::BlockingEvent,
+ ConnectionPipeline::GetPG::BlockingEvent,
+ PGMap::PGCreationBlockingEvent,
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+ > tracking_events;
+
+private:
+ RepRequest::PGPipeline &pp(PG &pg);
+
+ crimson::net::ConnectionRef conn;
+ Ref<MOSDPGUpdateLogMissingReply> req;
+};
+
+}
}
}
+PG::interruptible_future<> PG::do_update_log_missing(
+ Ref<MOSDPGUpdateLogMissing> m)
+{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
+ ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
+ ObjectStore::Transaction t;
+ std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ if (m->pg_trim_to != eversion_t())
+ op_trim_to = m->pg_trim_to;
+ if (m->pg_roll_forward_to != eversion_t())
+ op_roll_forward_to = m->pg_roll_forward_to;
+ logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+ op_trim_to, op_roll_forward_to);
+
+ peering_state.append_log_entries_update_missing(
+ m->entries, t, op_trim_to, op_roll_forward_to);
+
+ return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+ coll_ref, std::move(t))).then_interruptible(
+ [m, lcod=peering_state.get_info().last_complete, this] {
+ if (!peering_state.pg_has_reset_since(m->get_epoch())) {
+ peering_state.update_last_complete_ondisk(lcod);
+ auto reply =
+ crimson::make_message<MOSDPGUpdateLogMissingReply>(
+ spg_t(peering_state.get_info().pgid.pgid, get_primary().shard),
+ pg_whoami.shard,
+ m->get_epoch(),
+ m->min_epoch,
+ m->get_tid(),
+ lcod);
+ reply->set_priority(CEPH_MSG_PRIO_HIGH);
+ return m->get_connection()->send(std::move(reply));
+ }
+ return seastar::now();
+ });
+}
+
+
+PG::interruptible_future<> PG::do_update_log_missing_reply(
+ Ref<MOSDPGUpdateLogMissingReply> m)
+{
+ logger().debug("{}: got reply from {}", __func__, m->get_from());
+
+ auto it = log_entry_update_waiting_on.find(m->get_tid());
+ if (it != log_entry_update_waiting_on.end()) {
+ if (it->second.waiting_on.count(m->get_from())) {
+ it->second.waiting_on.erase(m->get_from());
+ if (m->last_complete_ondisk != eversion_t()) {
+ peering_state.update_peer_last_complete_ondisk(
+ m->get_from(), m->last_complete_ondisk);
+ }
+ } else {
+ logger().error("{} : {} got reply {} from shard we are not waiting for ",
+ __func__, peering_state.get_info().pgid, *m, m->get_from());
+ }
+
+ if (it->second.waiting_on.empty()) {
+ it->second.all_committed.set_value();
+ it->second.all_committed = {};
+ log_entry_update_waiting_on.erase(it);
+ }
+ } else {
+ logger().error("{} : {} got reply {} on unknown tid {}",
+ __func__, peering_state.get_info().pgid, *m, m->get_tid());
+ }
+ return seastar::now();
+}
+
bool PG::old_peering_msg(
const epoch_t reply_epoch,
const epoch_t query_epoch) const
#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/logmissing_request.h"
+#include "crimson/osd/osd_operations/logmissing_request_reply.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/osd_operations/background_recovery.h"
interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
void handle_rep_op_reply(crimson::net::ConnectionRef conn,
const MOSDRepOpReply& m);
+ interruptible_future<> do_update_log_missing(Ref<MOSDPGUpdateLogMissing> m);
+ interruptible_future<> do_update_log_missing_reply(
+ Ref<MOSDPGUpdateLogMissingReply> m);
+
void print(std::ostream& os) const;
void dump_primary(Formatter*);
template <class T>
friend class PeeringEvent;
friend class RepRequest;
+ friend class LogMissingRequest;
+ friend class LogMissingRequestReply;
friend class BackfillRecovery;
friend struct PGFacade;
friend class InternalClientRequest;
BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;
friend class IOInterruptCondition;
+ struct log_update_t {
+ std::set<pg_shard_t> waiting_on;
+ seastar::shared_promise<> all_committed;
+ };
+ std::map<ceph_tid_t, log_update_t> log_entry_update_waiting_on;
};
struct PG::do_osd_ops_params_t {