]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: add LogMissing support
authorchunmei-liu <chunmei.liu@intel.com>
Wed, 25 May 2022 05:50:25 +0000 (22:50 -0700)
committerchunmei-liu <chunmei.liu@intel.com>
Wed, 8 Jun 2022 22:43:39 +0000 (15:43 -0700)
Signed-off-by: chunmei-liu <chunmei.liu@intel.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operation_external_tracking.h
src/crimson/osd/osd_operations/logmissing_request.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/logmissing_request.h [new file with mode: 0644]
src/crimson/osd/osd_operations/logmissing_request_reply.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/logmissing_request_reply.h [new file with mode: 0644]
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 4313075a8d6d09435cdeec094b442914b52ba93f..389d2243ae30b10646f7dd6ea68c729f1207fd54 100644 (file)
@@ -21,6 +21,8 @@ add_executable(crimson-osd
   osd_operations/peering_event.cc
   osd_operations/pg_advance_map.cc
   osd_operations/replicated_request.cc
+  osd_operations/logmissing_request.cc
+  osd_operations/logmissing_request_reply.cc
   osd_operations/background_recovery.cc
   osd_operations/recovery_subrequest.cc
   pg_recovery.cc
index 4db655e81e959e513130b232d7993f58ea265cea..3c06217ef3f9ae952c420175c0ffd57582926ac3 100644 (file)
@@ -22,6 +22,8 @@
 #include "messages/MOSDMarkMeDown.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDPeeringOp.h"
+#include "messages/MOSDPGUpdateLogMissing.h"
+#include "messages/MOSDPGUpdateLogMissingReply.h"
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDScrub2.h"
 #include "messages/MPGStats.h"
@@ -781,6 +783,12 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
       return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
     case MSG_OSD_SCRUB2:
       return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+    case MSG_OSD_PG_UPDATE_LOG_MISSING:
+      return handle_update_log_missing(conn, boost::static_pointer_cast<
+        MOSDPGUpdateLogMissing>(m));
+    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+      return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
+        MOSDPGUpdateLogMissingReply>(m));
     default:
       dispatched = false;
       return seastar::now();
@@ -1209,6 +1217,28 @@ seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
   return seastar::now();
 }
 
+seastar::future<> OSD::handle_update_log_missing(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDPGUpdateLogMissing> m)
+{
+  m->decode_payload();
+  (void) start_pg_operation<LogMissingRequest>(
+    std::move(conn),
+    std::move(m));
+  return seastar::now();
+}
+
+seastar::future<> OSD::handle_update_log_missing_reply(
+  crimson::net::ConnectionRef conn,
+  Ref<MOSDPGUpdateLogMissingReply> m)
+{
+  m->decode_payload();
+  (void) start_pg_operation<LogMissingRequestReply>(
+    std::move(conn),
+    std::move(m));
+  return seastar::now();
+}
+
 seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
                                            epoch_t first)
 {
index 9902459bf6c64eb90946a68c81847930c9399dc6..b19203527912b7c9703ddd155376c6ed8aec49b5 100644 (file)
@@ -207,7 +207,11 @@ private:
   seastar::future<> handle_command(crimson::net::ConnectionRef conn,
                                   Ref<MCommand> m);
   seastar::future<> start_asok_admin();
-
+  seastar::future<> handle_update_log_missing(crimson::net::ConnectionRef conn,
+                                              Ref<MOSDPGUpdateLogMissing> m);
+  seastar::future<> handle_update_log_missing_reply(
+    crimson::net::ConnectionRef conn,
+    Ref<MOSDPGUpdateLogMissingReply> m);
 public:
   OSD_OSDMapGate osdmap_gate;
 
index 2f9a69d39e6ae5464e60e8245223711a13109bc1..323f7bb59397ce8e4c2077db8b693157f7bc842c 100644 (file)
@@ -44,6 +44,8 @@ enum class OperationTypeCode {
   background_recovery_sub,
   internal_client_request,
   historic_client_request,
+  logmissing_request,
+  logmissing_request_reply,
   last_op
 };
 
@@ -58,6 +60,8 @@ static constexpr const char* const OP_NAMES[] = {
   "background_recovery_sub",
   "internal_client_request",
   "historic_client_request",
+  "logmissing_request",
+  "logmissing_request_reply",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
index bfe0b41f676eb4b3486541a43d60d2995b55953b..b77d0101e4201bd91e25fe3d2b7e9cfc4e744e98 100644 (file)
@@ -279,6 +279,21 @@ struct EventBackendRegistry<osd::RepRequest> {
   }
 };
 
+
+template <>
+struct EventBackendRegistry<osd::LogMissingRequest> {
+  static std::tuple<> get_backends() {
+    return {/* no extenral backends */};
+  }
+};
+
+template <>
+struct EventBackendRegistry<osd::LogMissingRequestReply> {
+  static std::tuple<> get_backends() {
+    return {/* no extenral backends */};
+  }
+};
+
 template <>
 struct EventBackendRegistry<osd::RecoverySubRequest> {
   static std::tuple<> get_backends() {
diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc
new file mode 100644 (file)
index 0000000..84efcfa
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace crimson::osd {
+
+LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn,
+                      Ref<MOSDPGUpdateLogMissing> &&req)
+  : conn{std::move(conn)},
+    req{std::move(req)}
+{}
+
+void LogMissingRequest::print(std::ostream& os) const
+{
+  os << "LogMissingRequest("
+     << "from=" << req->from
+     << " req=" << *req
+     << ")";
+}
+
+void LogMissingRequest::dump_detail(Formatter *f) const
+{
+  f->open_object_section("LogMissingRequest");
+  f->dump_stream("req_tid") << req->get_tid();
+  f->dump_stream("pgid") << req->get_spg();
+  f->dump_unsigned("map_epoch", req->get_map_epoch());
+  f->dump_unsigned("min_epoch", req->get_min_epoch());
+  f->dump_stream("entries") << req->entries;
+  f->dump_stream("from") << req->from;
+  f->close_section();
+}
+
+ConnectionPipeline &LogMissingRequest::get_connection_pipeline()
+{
+  return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+RepRequest::PGPipeline &LogMissingRequest::pp(PG &pg)
+{
+  return pg.replicated_request_pg_pipeline;
+}
+
+seastar::future<> LogMissingRequest::with_pg(
+  ShardServices &shard_services, Ref<PG> pg)
+{
+  logger().debug("{}: LogMissingRequest::with_pg", *this);
+
+  IRef ref = this;
+  return interruptor::with_interruption([this, pg] {
+    return pg->do_update_log_missing(std::move(req));
+  }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/logmissing_request.h b/src/crimson/osd/osd_operations/logmissing_request.h
new file mode 100644 (file)
index 0000000..6fad844
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGUpdateLogMissing.h"
+
+namespace ceph {
+  class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class LogMissingRequest final : public PhasedOperationT<LogMissingRequest> {
+public:
+  class PGPipeline {
+    struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
+      static constexpr auto type_name = "LogMissingRequest::PGPipeline::await_map";
+    } await_map;
+    struct Process : OrderedExclusivePhaseT<Process> {
+      static constexpr auto type_name = "LogMissingRequest::PGPipeline::process";
+    } process;
+    friend LogMissingRequest;
+  };
+  static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request;
+  LogMissingRequest(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissing>&&);
+
+  void print(std::ostream &) const final;
+  void dump_detail(ceph::Formatter* f) const final;
+
+  static constexpr bool can_create() { return false; }
+  spg_t get_pgid() const {
+    return req->get_spg();
+  }
+  ConnectionPipeline &get_connection_pipeline();
+  PipelineHandle &get_handle() { return handle; }
+  epoch_t get_epoch() const { return req->get_min_epoch(); }
+
+  seastar::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
+
+  std::tuple<
+    ConnectionPipeline::AwaitActive::BlockingEvent,
+    ConnectionPipeline::AwaitMap::BlockingEvent,
+    ConnectionPipeline::GetPG::BlockingEvent,
+    PGMap::PGCreationBlockingEvent,
+    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+  > tracking_events;
+
+private:
+  RepRequest::PGPipeline &pp(PG &pg);
+
+  crimson::net::ConnectionRef conn;
+  Ref<MOSDPGUpdateLogMissing> req;
+};
+
+}
diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.cc b/src/crimson/osd/osd_operations/logmissing_request_reply.cc
new file mode 100644 (file)
index 0000000..6233bf5
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "logmissing_request_reply.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/osd_operation_external_tracking.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace crimson::osd {
+
+LogMissingRequestReply::LogMissingRequestReply(
+  crimson::net::ConnectionRef&& conn,
+  Ref<MOSDPGUpdateLogMissingReply> &&req)
+  : conn{std::move(conn)},
+    req{std::move(req)}
+{}
+
+void LogMissingRequestReply::print(std::ostream& os) const
+{
+  os << "LogMissingRequestReply("
+     << "from=" << req->from
+     << " req=" << *req
+     << ")";
+}
+
+void LogMissingRequestReply::dump_detail(Formatter *f) const
+{
+  f->open_object_section("LogMissingRequestReply");
+  f->dump_stream("rep_tid") << req->get_tid();
+  f->dump_stream("pgid") << req->get_spg();
+  f->dump_unsigned("map_epoch", req->get_map_epoch());
+  f->dump_unsigned("min_epoch", req->get_min_epoch());
+  f->dump_stream("from") << req->from;
+  f->close_section();
+}
+
+ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline()
+{
+  return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+RepRequest::PGPipeline &LogMissingRequestReply::pp(PG &pg)
+{
+  return pg.replicated_request_pg_pipeline;
+}
+
+seastar::future<> LogMissingRequestReply::with_pg(
+  ShardServices &shard_services, Ref<PG> pg)
+{
+  logger().debug("{}: LogMissingRequestReply::with_pg", *this);
+
+  IRef ref = this;
+  return interruptor::with_interruption([this, pg] {
+    return pg->do_update_log_missing_reply(std::move(req));
+  }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.h b/src/crimson/osd/osd_operations/logmissing_request_reply.h
new file mode 100644 (file)
index 0000000..5632bb4
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/common/type_helpers.h"
+#include "messages/MOSDPGUpdateLogMissingReply.h"
+
+namespace ceph {
+  class Formatter;
+}
+
+namespace crimson::osd {
+
+class ShardServices;
+
+class OSD;
+class PG;
+
+class LogMissingRequestReply final : public PhasedOperationT<LogMissingRequestReply> {
+public:
+  class PGPipeline {
+    struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
+      static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::await_map";
+    } await_map;
+    struct Process : OrderedExclusivePhaseT<Process> {
+      static constexpr auto type_name = "LogMissingRequestReply::PGPipeline::process";
+    } process;
+    friend LogMissingRequestReply;
+  };
+  static constexpr OperationTypeCode type = OperationTypeCode::logmissing_request_reply;
+  LogMissingRequestReply(crimson::net::ConnectionRef&&, Ref<MOSDPGUpdateLogMissingReply>&&);
+
+  void print(std::ostream &) const final;
+  void dump_detail(ceph::Formatter* f) const final;
+
+  static constexpr bool can_create() { return false; }
+  spg_t get_pgid() const {
+    return req->get_spg();
+  }
+  ConnectionPipeline &get_connection_pipeline();
+  PipelineHandle &get_handle() { return handle; }
+  epoch_t get_epoch() const { return req->get_min_epoch(); }
+
+  seastar::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
+
+  std::tuple<
+    ConnectionPipeline::AwaitActive::BlockingEvent,
+    ConnectionPipeline::AwaitMap::BlockingEvent,
+    ConnectionPipeline::GetPG::BlockingEvent,
+    PGMap::PGCreationBlockingEvent,
+    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
+  > tracking_events;
+
+private:
+  RepRequest::PGPipeline &pp(PG &pg);
+
+  crimson::net::ConnectionRef conn;
+  Ref<MOSDPGUpdateLogMissingReply> req;
+};
+
+}
index 4e888b7a55d44ee40d63c478420fb41f34bf1b62..66680e2824879b1b4cd27e1401a92617d8a3bc69 100644 (file)
@@ -1121,6 +1121,78 @@ void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
   }
 }
 
+PG::interruptible_future<> PG::do_update_log_missing(
+  Ref<MOSDPGUpdateLogMissing> m)
+{
+  if (__builtin_expect(stopping, false)) {
+    return seastar::make_exception_future<>(
+      crimson::common::system_shutdown_exception());
+  }
+
+  ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
+  ObjectStore::Transaction t;
+  std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+  if (m->pg_trim_to != eversion_t())
+    op_trim_to = m->pg_trim_to;
+  if (m->pg_roll_forward_to != eversion_t())
+    op_roll_forward_to = m->pg_roll_forward_to;
+  logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+    op_trim_to, op_roll_forward_to);
+
+  peering_state.append_log_entries_update_missing(
+    m->entries, t, op_trim_to, op_roll_forward_to);
+
+  return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+    coll_ref, std::move(t))).then_interruptible(
+    [m, lcod=peering_state.get_info().last_complete, this] {
+    if (!peering_state.pg_has_reset_since(m->get_epoch())) {
+      peering_state.update_last_complete_ondisk(lcod);
+      auto reply =
+        crimson::make_message<MOSDPGUpdateLogMissingReply>(
+          spg_t(peering_state.get_info().pgid.pgid, get_primary().shard),
+          pg_whoami.shard,
+          m->get_epoch(),
+          m->min_epoch,
+          m->get_tid(),
+          lcod);
+      reply->set_priority(CEPH_MSG_PRIO_HIGH);
+      return m->get_connection()->send(std::move(reply));
+    }
+    return seastar::now();
+  });
+}
+
+
+PG::interruptible_future<> PG::do_update_log_missing_reply(
+  Ref<MOSDPGUpdateLogMissingReply> m)
+{
+  logger().debug("{}: got reply from {}", __func__, m->get_from());
+
+  auto it = log_entry_update_waiting_on.find(m->get_tid());
+  if (it != log_entry_update_waiting_on.end()) {
+    if (it->second.waiting_on.count(m->get_from())) {
+      it->second.waiting_on.erase(m->get_from());
+      if (m->last_complete_ondisk != eversion_t()) {
+        peering_state.update_peer_last_complete_ondisk(
+         m->get_from(), m->last_complete_ondisk);
+      }
+    } else {
+      logger().error("{} : {} got reply {} from shard we are not waiting for ",
+        __func__, peering_state.get_info().pgid, *m, m->get_from());
+    }
+
+    if (it->second.waiting_on.empty()) {
+      it->second.all_committed.set_value();
+      it->second.all_committed = {};
+      log_entry_update_waiting_on.erase(it);
+    }
+  } else {
+    logger().error("{} : {} got reply {} on unknown tid {}",
+      __func__, peering_state.get_info().pgid, *m, m->get_tid());
+  }
+  return seastar::now();
+}
+
 bool PG::old_peering_msg(
   const epoch_t reply_epoch,
   const epoch_t query_epoch) const
index 25f675b9ad4e1a7a0da5ddc6b14df4517264a413..0a23e55d1311adf152c89d8a4fc067c823b498b1 100644 (file)
@@ -26,6 +26,8 @@
 #include "crimson/osd/pg_interval_interrupt_condition.h"
 #include "crimson/osd/ops_executer.h"
 #include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/logmissing_request.h"
+#include "crimson/osd/osd_operations/logmissing_request_reply.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
 #include "crimson/osd/osd_operations/background_recovery.h"
@@ -542,6 +544,10 @@ public:
   interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
   void handle_rep_op_reply(crimson::net::ConnectionRef conn,
                           const MOSDRepOpReply& m);
+  interruptible_future<> do_update_log_missing(Ref<MOSDPGUpdateLogMissing> m);
+  interruptible_future<> do_update_log_missing_reply(
+                         Ref<MOSDPGUpdateLogMissingReply> m);
+
 
   void print(std::ostream& os) const;
   void dump_primary(Formatter*);
@@ -732,6 +738,8 @@ private:
   template <class T>
   friend class PeeringEvent;
   friend class RepRequest;
+  friend class LogMissingRequest;
+  friend class LogMissingRequestReply;
   friend class BackfillRecovery;
   friend struct PGFacade;
   friend class InternalClientRequest;
@@ -761,6 +769,11 @@ private:
   BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;
 
   friend class IOInterruptCondition;
+  struct log_update_t {
+    std::set<pg_shard_t> waiting_on;
+    seastar::shared_promise<> all_committed;
+  };
+  std::map<ceph_tid_t, log_update_t> log_entry_update_waiting_on;
 };
 
 struct PG::do_osd_ops_params_t {