]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/scrub: reserve replicas one by one, and in consistent order
authorRonen Friedman <rfriedma@redhat.com>
Sun, 24 Sep 2023 12:34:14 +0000 (07:34 -0500)
committerRonen Friedman <rfriedma@redhat.com>
Sat, 14 Oct 2023 18:49:01 +0000 (21:49 +0300)
Issuing the reservation requests one by one - waiting for
approval from the secondary before the next request is sent.

The requests are sent in ascending target pg-shard-id order, reducing the
chance of having two PGs repeatedly competing for the same set of
OSDs - and doing so in an interleaved sequence.

Modifying the Session state in the scrubber FSM to react to interval
changes by discarding replica reservations.

Signed-off-by: Ronen Friedman <rfriedma@redhat.com>
src/osd/PG.cc
src/osd/scrubber/pg_scrubber.cc
src/osd/scrubber/pg_scrubber.h
src/osd/scrubber/scrub_machine.cc
src/osd/scrubber/scrub_machine.h
src/osd/scrubber/scrub_machine_lstnr.h
src/osd/scrubber_common.h

index 0d1f8d44e1c447cf57b8bfb0e9ffd19a985693ea..580b3add8484a87d9b8044b060fb9da5006d16a9 100644 (file)
@@ -359,7 +359,7 @@ void PG::clear_primary_state()
   release_pg_backoffs();
 
   if (m_scrubber) {
-    m_scrubber->discard_replica_reservations();
+    m_scrubber->on_new_interval();
   }
   scrub_after_recovery = false;
 
index e27e50b70d41a577c0b03622f87ab8e8ed03a117..294d050268d0359f7f4f71ee634d68b565aa9084 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <cmath>
 #include <iostream>
+#include <span>
 #include <vector>
 
 #include <fmt/ranges.h>
@@ -469,7 +470,7 @@ unsigned int PgScrubber::scrub_requeue_priority(
 
 /* on_new_interval
  *
- * Responsible for restting any scrub state and releasing any resources.
+ * Responsible for resetting any scrub state and releasing any resources.
  * Any inflight events will be ignored via check_interval/should_drop_message
  * or canceled.
  */
@@ -481,11 +482,13 @@ void PgScrubber::on_new_interval()
                  is_scrub_active(), is_queued_or_active())
           << dendl;
 
+  // If in active session - the IntervalChanged handler takes care of
+  // discarding the remote reservations, and transitioning out of Session.
+  // That resets both the scrubber and the FSM.
+  m_fsm->process_event(IntervalChanged{});
+
+  // The 'FullReset' is only relevant if we are not an active Primary
   m_fsm->process_event(FullReset{});
-  // we may be the primary
-  if (is_queued_or_active()) {
-    clear_pgscrub_state();
-  }
   rm_from_osd_scrubbing();
 }
 
@@ -1569,7 +1572,7 @@ void PgScrubber::handle_scrub_reserve_request(OpRequestRef op)
    * replicas.  Unconditionally clear any existing state prior to handling
    * the new reservation. */
   m_fsm->process_event(FullReset{});
-  
+
   bool granted{false};
   if (m_pg->cct->_conf->osd_scrub_during_recovery ||
       !m_osds->is_recovery_active()) {
@@ -1600,6 +1603,9 @@ void PgScrubber::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
   dout(10) << __func__ << " " << *op->get_req() << dendl;
   op->mark_started();
 
+  if (should_drop_message(op)) {
+    return;
+  }
   if (m_reservations.has_value()) {
     m_reservations->handle_reserve_grant(op, from);
   } else {
@@ -1613,6 +1619,9 @@ void PgScrubber::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
   dout(10) << __func__ << " " << *op->get_req() << dendl;
   op->mark_started();
 
+  if (should_drop_message(op)) {
+    return;
+  }
   if (m_reservations.has_value()) {
     // there is an active reservation process. No action is required otherwise.
     m_reservations->handle_reserve_reject(op, from);
@@ -1623,6 +1632,11 @@ void PgScrubber::handle_scrub_reserve_release(OpRequestRef op)
 {
   dout(10) << __func__ << " " << *op->get_req() << dendl;
   op->mark_started();
+  if (should_drop_message(op)) {
+    // we might have turned into a Primary in the meantime. The interval
+    // change should have been noticed already, and caused us to reset.
+    return;
+  }
 
   /*
    * this specific scrub session has terminated. All incoming events carrying
@@ -1635,7 +1649,7 @@ void PgScrubber::discard_replica_reservations()
 {
   dout(10) << __func__ << dendl;
   if (m_reservations.has_value()) {
-    m_reservations->discard_all();
+    m_reservations->discard_remote_reservations();
   }
 }
 
@@ -1646,47 +1660,6 @@ void PgScrubber::clear_scrub_reservations()
   m_local_osd_resource.reset();          // the local reservation
 }
 
-void PgScrubber::message_all_replicas(int32_t opcode, std::string_view op_text)
-{
-  ceph_assert(m_pg->recovery_state.get_backfill_targets().empty());
-
-  std::vector<pair<int, Message*>> messages;
-  messages.reserve(m_pg->get_actingset().size());
-
-  epoch_t epch = get_osdmap_epoch();
-
-  for (auto& p : m_pg->get_actingset()) {
-
-    if (p == m_pg_whoami)
-      continue;
-
-    dout(10) << "scrub requesting " << op_text << " from osd." << p
-            << " Epoch: " << epch << dendl;
-    Message* m = new MOSDScrubReserve(spg_t(m_pg->info.pgid.pgid, p.shard),
-                                     epch,
-                                     opcode,
-                                     m_pg_whoami);
-    messages.push_back(std::make_pair(p.osd, m));
-  }
-
-  if (!messages.empty()) {
-    m_osds->send_message_osd_cluster(messages, epch);
-  }
-}
-
-void PgScrubber::unreserve_replicas()
-{
-  dout(10) << __func__ << dendl;
-  m_reservations.reset();
-}
-
-void PgScrubber::on_replica_reservation_timeout()
-{
-  if (m_reservations) {
-    m_reservations->handle_no_reply_timeout();
-  }
-}
-
 bool PgScrubber::set_reserving_now() {
   return m_osds->get_scrub_services().set_reserving_now(m_pg_id,
                                                         ceph_clock_now());
@@ -1738,6 +1711,11 @@ void PgScrubber::clear_scrub_blocked()
   m_pg->publish_stats_to_osd();
 }
 
+void PgScrubber::flag_reservations_failure()
+{
+  m_scrub_job->resources_failure = true;
+}
+
 /*
  * note: only called for the Primary.
  */
@@ -2195,8 +2173,7 @@ void PgScrubber::set_scrub_duration()
 void PgScrubber::reserve_replicas()
 {
   dout(10) << __func__ << dendl;
-  m_reservations.emplace(
-    m_pg, m_pg_whoami, m_scrub_job, m_pg->get_cct()->_conf);
+  m_reservations.emplace(*this);
 }
 
 void PgScrubber::cleanup_on_finish()
@@ -2454,121 +2431,71 @@ void PgScrubber::preemption_data_t::reset()
 
 
 // ///////////////////// ReplicaReservations //////////////////////////////////
-namespace Scrub {
 
-void ReplicaReservations::release_replica(pg_shard_t peer, epoch_t epoch)
+#undef dout_prefix
+#define dout_prefix _prefix_fn(_dout, this, __func__)
+template <class T>
+static std::ostream& _prefix_fn(std::ostream* _dout, T* t, std::string fn = "")
 {
-  auto m = new MOSDScrubReserve(spg_t(m_pg_info.pgid.pgid, peer.shard),
-                               epoch,
-                               MOSDScrubReserve::RELEASE,
-                               m_pg->pg_whoami);
-  m_osds->send_message_osd_cluster(peer.osd, m, epoch);
+  return t->gen_prefix(*_dout, fn);
 }
 
-ReplicaReservations::ReplicaReservations(
-  PG* pg,
-  pg_shard_t whoami,
-  Scrub::ScrubJobRef scrubjob,
-  const ConfigProxy& conf)
-    : m_pg{pg}
-    , m_acting_set{pg->get_actingset()}
+namespace Scrub {
+
+ReplicaReservations::ReplicaReservations(ScrubMachineListener& scrbr)
+
+    : m_scrubber{scrbr}
+    , m_pg{m_scrubber.get_pg()}
+    , m_pgid{m_scrubber.get_spgid().pgid}
     , m_osds{m_pg->get_pg_osd(ScrubberPasskey())}
-    , m_pending{static_cast<int>(m_acting_set.size()) - 1}
-    , m_pg_info{m_pg->get_pg_info(ScrubberPasskey())}
-    , m_scrub_job{scrubjob}
-    , m_conf{conf}
 {
+  // the acting set is sorted by pg_shard_t. The reservations are to be issued
+  // in this order, so that the OSDs will receive the requests in a consistent
+  // order. This is done to reduce the chance of having two PGs that share some
+  // of their acting-set OSDs, consistently interfering with each other's
+  // reservation process.
+  auto acting = m_pg->get_actingset();
+  m_sorted_secondaries.reserve(acting.size());
+  std::copy_if(
+      acting.cbegin(), acting.cend(), std::back_inserter(m_sorted_secondaries),
+      [whoami = m_pg->pg_whoami](const pg_shard_t& shard) {
+       return shard != whoami;
+      });
+
+  m_next_to_request = m_sorted_secondaries.cbegin();
+  // send out the 1'st request (unless we have no replicas)
+  send_next_reservation_or_complete();
+}
+
+void ReplicaReservations::release_all()
+{
+  std::span<const pg_shard_t> replicas{
+      m_sorted_secondaries.cbegin(), m_next_to_request};
+  dout(10) << fmt::format("releasing {}", replicas) << dendl;
   epoch_t epoch = m_pg->get_osdmap_epoch();
-  m_log_msg_prefix = fmt::format(
-      "osd.{} ep: {} scrubber::ReplicaReservations pg[{}]: ", m_osds->whoami,
-      epoch, pg->pg_id);
 
-  m_timeout = conf.get_val<std::chrono::milliseconds>(
-      "osd_scrub_slow_reservation_response");
-
-  if (m_pending <= 0) {
-    // A special case of no replicas.
-    // just signal the scrub state-machine to continue
-    send_all_done();
-
-  } else {
-    // send the reservation requests
-    for (auto p : m_acting_set) {
-      if (p == whoami)
-       continue;
-      auto m = new MOSDScrubReserve(
-       spg_t(m_pg_info.pgid.pgid, p.shard), epoch, MOSDScrubReserve::REQUEST,
+  // send 'release' messages to all replicas we have managed to reserve
+  for (const auto& peer : replicas) {
+    auto m = make_message<MOSDScrubReserve>(
+       spg_t{m_pgid, peer.shard}, epoch, MOSDScrubReserve::RELEASE,
        m_pg->pg_whoami);
-      m_osds->send_message_osd_cluster(p.osd, m, epoch);
-      m_waited_for_peers.push_back(p);
-      dout(10) << __func__ << ": reserve " << p.osd << dendl;
-    }
+    m_pg->send_cluster_message(peer.osd, m, epoch, false);
   }
-}
-
-void ReplicaReservations::send_all_done()
-{
-  // stop any pending timeout timer
-  m_osds->queue_for_scrub_granted(m_pg, scrub_prio_t::low_priority);
-}
 
-void ReplicaReservations::send_reject()
-{
-  // stop any pending timeout timer
-  m_scrub_job->resources_failure = true;
-  m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority);
+  m_sorted_secondaries.clear();
+  m_next_to_request = m_sorted_secondaries.cbegin();
 }
 
-void ReplicaReservations::discard_all()
+void ReplicaReservations::discard_remote_reservations()
 {
-  dout(10) << __func__ << ": " << m_reserved_peers << dendl;
-
-  m_had_rejections = true;  // preventing late-coming responses from triggering
-                           // events
-  m_reserved_peers.clear();
-  m_waited_for_peers.clear();
-}
-
-/*
- * The following holds when update_latecomers() is called:
- * - we are still waiting for replies from some of the replicas;
- * - we might have already set a timer. If so, we should restart it.
- * - we might have received responses from 50% of the replicas.
- */
-std::optional<ReplicaReservations::tpoint_t>
-ReplicaReservations::update_latecomers(tpoint_t now_is)
-{
-  if (m_reserved_peers.size() > m_waited_for_peers.size()) {
-    // at least half of the replicas have already responded. Time we flag
-    // latecomers.
-    return now_is + m_timeout;
-  } else {
-    return std::nullopt;
-  }
+  dout(10) << "reset w/o issuing messages" << dendl;
+  m_sorted_secondaries.clear();
+  m_next_to_request = m_sorted_secondaries.cbegin();
 }
 
 ReplicaReservations::~ReplicaReservations()
 {
-  m_had_rejections = true;  // preventing late-coming responses from triggering
-                           // events
-
-  // send un-reserve messages to all reserved replicas. We do not wait for
-  // answer (there wouldn't be one). Other incoming messages will be discarded
-  // on the way, by our owner.
-  epoch_t epoch = m_pg->get_osdmap_epoch();
-
-  for (auto& p : m_reserved_peers) {
-    release_replica(p, epoch);
-  }
-  m_reserved_peers.clear();
-
-  // note: the release will follow on the heels of the request. When tried
-  // otherwise, grants that followed a reject arrived after the whole scrub
-  // machine-state was reset, causing leaked reservations.
-  for (auto& p : m_waited_for_peers) {
-    release_replica(p, epoch);
-  }
-  m_waited_for_peers.clear();
+  release_all();
 }
 
 /**
@@ -2577,108 +2504,121 @@ ReplicaReservations::~ReplicaReservations()
  */
 void ReplicaReservations::handle_reserve_grant(OpRequestRef op, pg_shard_t from)
 {
-  dout(10) << __func__ << ": granted by " << from << dendl;
-  op->mark_started();
-
-  {
-    // reduce the amount of extra release messages. Not a must, but the log is
-    // cleaner
-    auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from);
-    if (w != m_waited_for_peers.end())
-      m_waited_for_peers.erase(w);
+  // verify that the grant is from the peer we expected. If not?
+  // for now - abort the OSD. \todo reconsider the reaction.
+  if (!get_last_sent().has_value() || from != *get_last_sent()) {
+    dout(1) << fmt::format(
+                  "unexpected grant from {} (expected {})", from,
+                  get_last_sent().value_or(pg_shard_t{}))
+           << dendl;
+    ceph_assert(from == get_last_sent());
+    return;
   }
 
-  // are we forced to reject the reservation?
-  if (m_had_rejections) {
+  auto elapsed = clock::now() - m_last_request_sent_at;
 
-    dout(10) << __func__ << ": rejecting late-coming reservation from " << from
-            << dendl;
-    release_replica(from, m_pg->get_osdmap_epoch());
-
-  } else if (std::find(m_reserved_peers.begin(),
-                      m_reserved_peers.end(),
-                      from) != m_reserved_peers.end()) {
+  // log a warning if the response was slow to arrive
+  auto warn_timeout = m_scrubber.get_pg_cct()->_conf.get_val<milliseconds>(
+      "osd_scrub_slow_reservation_response");
+  if (!m_slow_response_warned && (elapsed > warn_timeout)) {
+    dout(1) << fmt::format(
+                  "slow reservation response from {} ({}ms)", from,
+                  duration_cast<milliseconds>(elapsed).count())
+           << dendl;
+    // prevent additional warnings
+    m_slow_response_warned = true;
+  }
+  dout(10) << fmt::format(
+                 "granted by {} ({} of {}) in {}ms", from,
+                 active_requests_cnt(), m_sorted_secondaries.size(),
+                 duration_cast<milliseconds>(elapsed).count())
+          << dendl;
+  send_next_reservation_or_complete();
+}
 
-    dout(10) << __func__ << ": already had osd." << from << " reserved"
-            << dendl;
+void ReplicaReservations::send_next_reservation_or_complete()
+{
+  if (m_next_to_request == m_sorted_secondaries.cend()) {
+    // granted by all replicas
+    dout(10) << "remote reservation complete" << dendl;
+    m_osds->queue_for_scrub_granted(m_pg, scrub_prio_t::low_priority);
 
   } else {
-
-    dout(10) << __func__ << ": osd." << from << " scrub reserve = success"
+    // send the next reservation request
+    const auto peer = *m_next_to_request;
+    const auto epoch = m_pg->get_osdmap_epoch();
+    auto m = make_message<MOSDScrubReserve>(
+       spg_t{m_pgid, peer.shard}, epoch, MOSDScrubReserve::REQUEST,
+       m_pg->pg_whoami);
+    m_pg->send_cluster_message(peer.osd, m, epoch, false);
+    m_last_request_sent_at = clock::now();
+    dout(10) << fmt::format(
+                   "reserving {} (the {} of {} replicas)", *m_next_to_request,
+                   active_requests_cnt()+1, m_sorted_secondaries.size())
             << dendl;
-    m_reserved_peers.push_back(from);
-
-    // was this response late?
-    auto now_is = clock::now();
-    if (m_timeout_point && (now_is > *m_timeout_point)) {
-      m_osds->clog->warn() << fmt::format(
-       "osd.{} scrubber pg[{}]: late reservation from osd.{}",
-       m_osds->whoami,
-       m_pg->pg_id,
-       from);
-      m_timeout_point.reset();
-    } else {
-      // possibly set a timer to warn about late-coming reservations
-      m_timeout_point = update_latecomers(now_is);
-    }
-
-    if (--m_pending == 0) {
-      send_all_done();
-    }
+    m_next_to_request++;
   }
 }
 
-void ReplicaReservations::handle_reserve_reject(OpRequestRef op,
-                                               pg_shard_t from)
+// temporary comment: the part of handle_reserve_reject() that will not
+// be delegated to the FSM in the following commits
+void ReplicaReservations::verify_rejections_source(
+    OpRequestRef op,
+    pg_shard_t from)
 {
-  dout(10) << __func__ << ": rejected by " << from << dendl;
-  dout(15) << __func__ << ": " << *op->get_req() << dendl;
-  op->mark_started();
-
-  {
-    // reduce the amount of extra release messages. Not a must, but the log is
-    // cleaner
-    auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from);
-    if (w != m_waited_for_peers.end())
-      m_waited_for_peers.erase(w);
-  }
-
-  if (m_had_rejections) {
-
-    // our failure was already handled when the first rejection arrived
-    dout(15) << __func__ << ": ignoring late-coming rejection from " << from
-            << dendl;
-
-  } else if (std::find(m_reserved_peers.begin(),
-                      m_reserved_peers.end(),
-                      from) != m_reserved_peers.end()) {
+  // a convenient log message for the reservation process conclusion
+  // (matches the one in send_next_reservation_or_complete())
+  dout(10) << fmt::format(
+                 "remote reservation failure. Rejected by {} ({})", from,
+                 *op->get_req())
+          << dendl;
 
-    dout(10) << __func__ << ": already had osd." << from << " reserved"
-            << dendl;
+  // verify that the denial is from the peer we expected. If not?
+  // we should treat it as though the *correct* peer has rejected the request,
+  // but remember to release that peer, too.
 
+  ceph_assert(get_last_sent().has_value());
+  const auto expected = *get_last_sent();
+  if (from != expected) {
+    dout(1) << fmt::format(
+                  "unexpected rejection from {} (expected {})", from, expected)
+           << dendl;
   } else {
-
-    dout(10) << __func__ << ": osd." << from << " scrub reserve = fail"
-            << dendl;
-    m_had_rejections = true;  // preventing any additional notifications
-    send_reject();
+    // correct peer, wrong answer...
+    m_next_to_request--;  // no need to release this one
   }
 }
 
-void ReplicaReservations::handle_no_reply_timeout()
+// to be delegated to the FSM in the following commits
+void ReplicaReservations::handle_reserve_reject(
+    OpRequestRef op,
+    pg_shard_t from)
+{
+  verify_rejections_source(op, from);
+  release_all();
+  m_scrubber.flag_reservations_failure();
+  m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority);
+}
+
+std::optional<pg_shard_t> ReplicaReservations::get_last_sent() const
 {
-  dout(1) << fmt::format(
-              "{}: timeout! no reply from {}", __func__, m_waited_for_peers)
-         << dendl;
+  if (m_next_to_request == m_sorted_secondaries.cbegin()) {
+    return std::nullopt;
+  }
+  return *(m_next_to_request - 1);
+}
 
-  // treat reply timeout as if a REJECT was received
-  m_had_rejections = true;  // preventing any additional notifications
-  send_reject();
+size_t ReplicaReservations::active_requests_cnt() const
+{
+  return m_next_to_request - m_sorted_secondaries.cbegin();
 }
 
-std::ostream& ReplicaReservations::gen_prefix(std::ostream& out) const
+std::ostream& ReplicaReservations::gen_prefix(
+    std::ostream& out,
+    std::string fn) const
 {
-  return out << m_log_msg_prefix;
+  return m_pg->gen_prefix(out)
+        << fmt::format("scrubber::ReplicaReservations:{}: ", fn);
 }
 
 
index 52428599514e7d0b01cea8ca0369b3b6d97a7bfd..6f56390e627c060bc89bf50050ece9f3a8e17172 100644 (file)
@@ -90,90 +90,119 @@ struct BuildMap;
 /**
  * Reserving/freeing scrub resources at the replicas.
  *
- * When constructed - sends reservation requests to the acting_set.
+ * When constructed - sends reservation requests to the acting_set OSDs, one
+ * by one.
+ * Once a replica's OSD replies with a 'grant'ed reservation, we send a
+ * reservation request to the next replica.
  * A rejection triggers a "couldn't acquire the replicas' scrub resources"
- * event. All previous requests, whether already granted or not, are explicitly
- * released.
+ * event. All granted reservations are released.
+ *
+ * Reserved replicas should be released at the end of the scrub session. The
+ * one exception is if the scrub terminates upon an interval change. In that
+ * scenario - the replicas discard their reservations on their own accord
+ * when noticing the change in interval, and there is no need (and no
+ * guaranteed way) to send them the release message.
  *
  * Timeouts:
  *
  *  Slow-Secondary Warning:
- *  Once at least half of the replicas have accepted the reservation, we start
- *  reporting any secondary that takes too long (more than <conf> milliseconds
- *  after the previous response received) to respond to the reservation request.
- *  (Why? because we have encountered real-life situations where a specific OSD
- *  was systematically very slow (e.g. 5 seconds) to respond to the reservation
- *  requests, slowing the scrub process to a crawl).
+ *  Warn if a replica takes more than <conf> milliseconds to reply to a
+ *  reservation request. Only one warning is issued per session.
  *
  *  Reservation Timeout:
  *  We limit the total time we wait for the replicas to respond to the
- *  reservation request. If we don't get all the responses (either Grant or
- *  Reject) within <conf> milliseconds, we give up and release all the
- *  reservations we have acquired so far.
+ *  reservation request. If the reservation back-and-forth does not complete
+ *  within <conf> milliseconds, we give up and release all the reservations
+ *  that have been acquired until that moment.
  *  (Why? because we have encountered instances where a reservation request was
  *  lost - either due to a bug or due to a network issue.)
- *
- * A note re performance: I've measured a few container alternatives for
- * m_reserved_peers, with its specific usage pattern. Std::set is extremely
- * slow, as expected. flat_set is only slightly better. Surprisingly -
- * std::vector (with no sorting) is better than boost::small_vec. And for
- * std::vector: no need to pre-reserve.
  */
 class ReplicaReservations {
-  using clock = std::chrono::system_clock;
-  using tpoint_t = std::chrono::time_point<clock>;
+  using clock = ceph::coarse_real_clock;
 
+  ScrubMachineListener& m_scrubber;
   PG* m_pg;
-  std::set<pg_shard_t> m_acting_set;
-  OSDService* m_osds;
-  std::vector<pg_shard_t> m_waited_for_peers;
-  std::vector<pg_shard_t> m_reserved_peers;
-  bool m_had_rejections{false};
-  int m_pending{-1};
-  const pg_info_t& m_pg_info;
-  Scrub::ScrubJobRef m_scrub_job;      ///< a ref to this PG's scrub job
-  const ConfigProxy& m_conf;
 
-  // detecting slow peers (see 'slow-secondary' above)
-  std::chrono::milliseconds m_timeout;
-  std::optional<tpoint_t> m_timeout_point;
+  /// shorthand for m_scrubber.get_spgid().pgid
+  const pg_t m_pgid;
+
+  /// for dout && when queueing messages to the FSM
+  OSDService* m_osds;
 
-  void release_replica(pg_shard_t peer, epoch_t epoch);
+  /// the acting set (not including myself), sorted by pg_shard_t
+  std::vector<pg_shard_t> m_sorted_secondaries;
 
-  void send_all_done();         ///< all reservations are granted
+  /// the next replica to which we will send a reservation request
+  std::vector<pg_shard_t>::const_iterator m_next_to_request;
 
-  /// notify the scrubber that we have failed to reserve replicas' resources
-  void send_reject();
+  /// for logs, and for detecting slow peers
+  clock::time_point m_last_request_sent_at;
 
-  std::optional<tpoint_t> update_latecomers(tpoint_t now_is);
+  /// used to prevent multiple "slow response" warnings
+  bool m_slow_response_warned{false};
 
  public:
-  std::string m_log_msg_prefix;
+  ReplicaReservations(ScrubMachineListener& scrubber);
+
+  ~ReplicaReservations();
 
   /**
-   *  quietly discard all knowledge about existing reservations. No messages
-   *  are sent to peers.
-   *  To be used upon interval change, as we know the the running scrub is no
-   *  longer relevant, and that the replicas had reset the reservations on
-   *  their side.
+   * The OK received from the replica (after verifying that it is indeed
+   * the replica we are expecting a reply from) is noted, and triggers
+   * one of two: either sending a reservation request to the next replica,
+   * or notifying the scrubber that we have reserved them all.
    */
-  void discard_all();
+  void handle_reserve_grant(OpRequestRef op, pg_shard_t from);
 
-  ReplicaReservations(PG* pg,
-                      pg_shard_t whoami,
-                      Scrub::ScrubJobRef scrubjob,
-                      const ConfigProxy& conf); 
+  /**
+   * Verify that the sender of the received rejection is the replica we
+   * were expecting a reply from.
+   * If this is so - just mark the fact that the specific peer need not
+   * be released.
+   *
+   * Note - the actual handling of scrub session termination and of
+   * releasing the reserved replicas is done by the caller (the FSM).
+   */
+  void verify_rejections_source(OpRequestRef op, pg_shard_t from);
 
-  ~ReplicaReservations();
+  void handle_reserve_reject(OpRequestRef op, pg_shard_t from);
 
-  void handle_reserve_grant(OpRequestRef op, pg_shard_t from);
+  /**
+   * Notifies implementation that it is no longer responsible for releasing
+   * tracked remote reservations.
+   *
+   * The intended usage is upon interval change.  In general, replicas are
+   * responsible for releasing their own resources upon interval change without
+   * coordination from the primary.
+   *
+   * Sends no messages.
+   */
+  void discard_remote_reservations();
 
-  void handle_reserve_reject(OpRequestRef op, pg_shard_t from);
+  // note: 'public', as accessed via the 'standard' dout_prefix() macro
+  std::ostream& gen_prefix(std::ostream& out, std::string fn) const;
+
+ private:
+  /// send 'release' messages to all replicas we have managed to reserve
+  void release_all();
+
+  /// send a reservation request to a replica's OSD
+  void send_request_to_replica(pg_shard_t peer, epoch_t epoch);
+
+  /// let the scrubber know that we have reserved all the replicas
+  void send_all_done();
+
+  /// the only replica we are expecting a reply from
+  std::optional<pg_shard_t> get_last_sent() const;
 
-  // if timing out on receiving replies from our replicas:
-  void handle_no_reply_timeout();
+  /// The number of requests that have been sent (and not rejected) so far.
+  size_t active_requests_cnt() const;
 
-  std::ostream& gen_prefix(std::ostream& out) const;
+  /**
+   * Either send a reservation request to the next replica, or notify the
+   * scrubber that we have reserved all the replicas.
+   */
+  void send_next_reservation_or_complete();
 };
 
 /**
@@ -348,9 +377,6 @@ class PgScrubber : public ScrubPgIF,
   void handle_scrub_reserve_release(OpRequestRef op) final;
   void discard_replica_reservations() final;
   void clear_scrub_reservations() final;  // PG::clear... fwds to here
-  void unreserve_replicas() final;
-  void on_replica_reservation_timeout() final;
-
 
   // managing scrub op registration
 
@@ -453,10 +479,14 @@ class PgScrubber : public ScrubPgIF,
   // the I/F used by the state-machine (i.e. the implementation of
   // ScrubMachineListener)
 
-  CephContext* get_cct() const final { return m_pg->cct; }
   LogChannelRef &get_clog() const final;
   int get_whoami() const final;
   spg_t get_spgid() const final { return m_pg->get_pgid(); }
+  PG* get_pg() const final { return m_pg; }
+
+  // temporary interface (to be discarded in a follow-up PR)
+  /// set the 'resources_failure' flag in the scrub-job object
+  void flag_reservations_failure();
 
   scrubber_callback_cancel_token_t schedule_callback_after(
     ceph::timespan duration, scrubber_callback_t &&cb);
@@ -871,14 +901,6 @@ class PgScrubber : public ScrubPgIF,
 
   std::list<Context*> m_callbacks;
 
-  /**
-   * send a replica (un)reservation request to the acting set
-   *
-   * @param opcode - one of MOSDScrubReserve::REQUEST
-   *                  or MOSDScrubReserve::RELEASE
-   */
-  void message_all_replicas(int32_t opcode, std::string_view op_text);
-
   hobject_t m_max_end; ///< Largest end that may have been sent to replicas
   ScrubMapBuilder m_primary_scrubmap_pos;
 
index efb0917882067afdc2b5dd50eb4b898de5aa72cc..33924d134298f1f1f48fff9d50e9e31c9d813e93 100644 (file)
@@ -140,6 +140,16 @@ Session::~Session()
   scrbr->clear_pgscrub_state();
 }
 
+sc::result Session::react(const IntervalChanged&)
+{
+  DECLARE_LOCALS;  // 'scrbr' & 'pg_id' aliases
+  dout(10) << "Session::react(const IntervalChanged&)" << dendl;
+
+  /// \todo (future commit): the reservations will be local to this state
+  scrbr->discard_replica_reservations();
+  return transit<NotActive>();
+}
+
 
 // ----------------------- ReservingReplicas ---------------------------------
 
@@ -152,7 +162,7 @@ ReservingReplicas::ReservingReplicas(my_context ctx)
 
   scrbr->reserve_replicas();
 
-  auto timeout = scrbr->get_cct()->_conf.get_val<
+  auto timeout = scrbr->get_pg_cct()->_conf.get_val<
     std::chrono::milliseconds>("osd_scrub_reservation_timeout");
   if (timeout.count() > 0) {
     // Start a timer to handle case where the replicas take a long time to
@@ -180,8 +190,11 @@ sc::result ReservingReplicas::react(const ReservationTimeout&)
       scrbr->get_spgid(), entered_at);
   dout(5) << msg << dendl;
   scrbr->get_clog()->warn() << "osd." << scrbr->get_whoami() << " " << msg;
-  scrbr->on_replica_reservation_timeout();
-  return discard_event();
+
+  // cause the scrubber to stop the scrub session, marking 'reservation
+  // failure' as the cause (affecting future scheduling)
+  scrbr->flag_reservations_failure();
+  return transit<NotActive>();
 }
 
 sc::result ReservingReplicas::react(const ReservationFailure&)
index 071a464ce130e18b1b3053e4bda1b70414038e7b..f5a3bb677ba9bcfbce80532128de614175e58f58 100644 (file)
@@ -127,6 +127,11 @@ MEV(SchedReplica)
 /// that is in-flight to the local ObjectStore
 MEV(ReplicaPushesUpd)
 
+/// a new interval has dawned.
+/// For a Primary: Discards replica reservations, so that the FullReset that would
+/// follow it would not attempt to release them.
+MEV(IntervalChanged)
+
 /// guarantee that the FSM is in the quiescent state (i.e. NotActive)
 MEV(FullReset)
 
@@ -202,7 +207,7 @@ public:
    * from being delivered.  The intended usage is to invoke
    * schedule_timer_event_after in the constructor of the state machine state
    * intended to handle the event and assign the returned timer_event_token_t
-   * to a member of that state. That way, exiting the state will implicitely
+   * to a member of that state. That way, exiting the state will implicitly
    * cancel the event.  See RangedBlocked::m_timeout_token and
    * RangeBlockedAlarm for an example usage.
    */
@@ -338,12 +343,15 @@ struct NotActive : sc::state<NotActive, ScrubMachine>, NamedSimply {
  *  reservations are released. This is because we know that the replicas are
  *  also resetting their reservations.
  */
-struct Session : sc::state<Session, ScrubMachine, ReservingReplicas>, NamedSimply {
+struct Session : sc::state<Session, ScrubMachine, ReservingReplicas>,
+                 NamedSimply {
   explicit Session(my_context ctx);
   ~Session();
 
-  using reactions = mpl::list<sc::transition<FullReset, NotActive>>;
-  /// \todo handle interval change
+  using reactions = mpl::list<sc::transition<FullReset, NotActive>,
+                              sc::custom_reaction<IntervalChanged>>;
+
+  sc::result react(const IntervalChanged&);
 };
 
 struct ReservingReplicas : sc::state<ReservingReplicas, Session>,
index cfef666e1b118594a73cf7bb8e4e9b433bc04b93..e9db62e2a2d882f5061363241f1412ba365e609f 100644 (file)
@@ -48,10 +48,11 @@ struct preemption_t {
 }  // namespace Scrub
 
 struct ScrubMachineListener {
-  virtual CephContext *get_cct() const = 0;
+  virtual CephContext *get_pg_cct() const = 0;
   virtual LogChannelRef &get_clog() const = 0;
   virtual int get_whoami() const = 0;
   virtual spg_t get_spgid() const = 0;
+  virtual PG* get_pg() const = 0;
 
   using scrubber_callback_t = std::function<void(void)>;
   using scrubber_callback_cancel_token_t = Context*;
@@ -72,9 +73,9 @@ struct ScrubMachineListener {
   /**
    * cancel_callback
    *
-   * Attempts to cancel the callback to whcih the passed token is associated.
+   * Attempts to cancel the callback to which the passed token is associated.
    * cancel_callback is best effort, the callback may still fire.
-   * cancel_callback guarrantees that exactly one of the two things will happen:
+   * cancel_callback guarantees that exactly one of the two things will happen:
    * - the callback is destroyed and will not be invoked
    * - the callback will be invoked
    */
@@ -177,9 +178,8 @@ struct ScrubMachineListener {
    */
   virtual void reserve_replicas() = 0;
 
-  virtual void unreserve_replicas() = 0;
-
-  virtual void on_replica_reservation_timeout() = 0;
+  /// discard replica reservations without sending a message (on interval)
+  virtual void discard_replica_reservations() = 0;
 
   virtual void set_scrub_begin_time() = 0;
 
@@ -238,4 +238,8 @@ struct ScrubMachineListener {
 
   /// sending cluster-log warnings
   virtual void log_cluster_warning(const std::string& msg) const = 0;
+
+  // temporary interface (to be discarded in a follow-up PR)
+  /// set the 'resources_failure' flag in the scrub-job object
+  virtual void flag_reservations_failure() = 0;
 };
index d5d4a8c278cf27cd2552713e24add7f86a32697f..929bec06e163f35fc00f87db29921bd8088adb89 100644 (file)
@@ -382,11 +382,6 @@ struct ScrubPgIF {
 
   // --------------- reservations -----------------------------------
 
-  /**
-   *  message all replicas with a request to "unreserve" scrub
-   */
-  virtual void unreserve_replicas() = 0;
-
   /**
    *  "forget" all replica reservations. No messages are sent to the
    *  previously-reserved.