void OSDService::queue_for_scrub_resched(PG* pg, Scrub::scrub_prio_t with_priority)
{
+ // Resulting scrub event: 'InternalSchedScrub'
queue_scrub_event_msg<PGScrubResched>(pg, with_priority);
}
return pgid < rhs.pgid;
}
-// this one is only moved here (from the header) temporarily, for debugging:
-void OSDService::unreg_pg_scrub(spg_t pgid, utime_t t)
-{
- std::lock_guard l{OSDService::sched_scrub_lock};
- size_t removed = sched_scrub_pg.erase(ScrubJob{cct, pgid, t});
- ceph_assert(removed);
- dout(10) << __func__ << " scrub-set removed: " << pgid << " T(" << t << ")" << dendl;
-}
-
-// this one is only moved here (from the header) temporarily, for debugging:
-utime_t OSDService::reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
- double pool_scrub_max_interval, bool must)
-{
- ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
- must);
- std::lock_guard l(OSDService::sched_scrub_lock);
- auto [x, inserted] = sched_scrub_pg.insert(scrub_job);
- dout(10) << __func__ << " scrub-set inserted: " << pgid << " T(" << t << ")" << " must: " << must << " inserted "
- << inserted << dendl;
- return scrub_job.sched_time;
-}
-
void OSDService::dumps_scrub(ceph::Formatter *f)
{
ceph_assert(f != nullptr);
};
std::set<ScrubJob> sched_scrub_pg;
- /// @returns the scrub_reg_stamp used for unregister'ing the scrub job
- utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
- double pool_scrub_max_interval, bool must);
- void unreg_pg_scrub(spg_t pgid, utime_t t);
+ /// @returns the scrub_reg_stamp used for unregistering the scrub job
+ utime_t reg_pg_scrub(spg_t pgid,
+ utime_t t,
+ double pool_scrub_min_interval,
+ double pool_scrub_max_interval,
+ bool must) {
+ ScrubJob scrub_job(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
+ must);
+ std::lock_guard l(OSDService::sched_scrub_lock);
+ sched_scrub_pg.insert(scrub_job);
+ return scrub_job.sched_time;
+ }
+
+ void unreg_pg_scrub(spg_t pgid, utime_t t) {
+ std::lock_guard l(sched_scrub_lock);
+ size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
+ ceph_assert(removed);
+ }
+
bool first_scrub_stamp(ScrubJob *out) {
std::lock_guard l(sched_scrub_lock);
if (sched_scrub_pg.empty())
m_scrubber->replica_scrub_op(op);
}
-void PG::scrub(epoch_t queued, ThreadPool::TPHandle& handle)
+void PG::scrub(epoch_t epoch_queued, ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << (is_primary() ? " (primary)" : " (replica)") << dendl;
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
scrub_queued = false;
+ ceph_assert(is_primary());
+ ceph_assert(!m_scrubber->is_scrub_active());
- if (pg_has_reset_since(queued)) {
- dout(10) << " pg::scrub reset_since " << __func__ << " " << queued << dendl;
- dout(10) << " pg::scrub reset_since " << __func__ << " "
- << recovery_state.get_last_peering_reset() << dendl;
- m_scrubber->scrub_clear_state(false);
- return;
- }
+ // a new scrub
- ceph_assert(
- is_primary()); // as the replica request should have reached PG::replica_scrub()
+ m_scrubber->reset_epoch(epoch_queued);
- ceph_assert(!m_scrubber->is_scrub_active());
- // a new scrub
- m_scrubber->reset_epoch(queued);
- m_scrubber->send_start_scrub();
+ // note: send_start_scrub() will verify 'epoch queued' against our current interval
+ m_scrubber->send_start_scrub(epoch_queued);
}
// note: no need to secure OSD resources for a recovery scrub
void PG::recovery_scrub(epoch_t epoch_queued, ThreadPool::TPHandle& handle)
{
- dout(10) << "pg::" << __func__ << " queued at: " << epoch_queued << dendl;
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
scrub_queued = false;
-
- if (pg_has_reset_since(epoch_queued)) {
- dout(10) << " reset_since " << __func__ << " " << epoch_queued << dendl;
- dout(10) << " reset_since " << __func__ << " "
- << recovery_state.get_last_peering_reset() << dendl;
- return;
- }
-
ceph_assert(is_primary());
ceph_assert(!m_scrubber->is_scrub_active());
// a new scrub
m_scrubber->reset_epoch(epoch_queued);
- m_scrubber->send_start_after_repair();
+ m_scrubber->send_start_after_repair(epoch_queued);
}
void PG::replica_scrub(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << "pg::" << __func__ << " queued at: " << epoch_queued
+ dout(10) << __func__ << " queued at: " << epoch_queued
<< (is_primary() ? " (primary)" : " (replica)") << dendl;
scrub_queued = false;
- m_scrubber->replica_scrub(epoch_queued);
+ m_scrubber->send_start_replica(epoch_queued);
}
void PG::scrub_send_scrub_resched(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << (is_primary() ? " (primary)" : " (replica)") << dendl;
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
scrub_queued = false;
- m_scrubber->send_scrub_resched();
+ m_scrubber->send_scrub_resched(epoch_queued);
}
void PG::scrub_send_resources_granted(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- m_scrubber->send_remotes_reserved();
+ m_scrubber->send_remotes_reserved(epoch_queued);
}
void PG::scrub_send_resources_denied(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- m_scrubber->send_reservation_failure();
+ m_scrubber->send_reservation_failure(epoch_queued);
}
void PG::replica_scrub_resched(epoch_t epoch_queued,
{
dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
scrub_queued = false;
- m_scrubber->replica_scrub_resched(epoch_queued);
+ m_scrubber->send_sched_replica(epoch_queued);
}
void PG::scrub_send_pushes_update(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- if (pg_has_reset_since(epoch_queued)) {
- dout(10) << __func__ << " been reset at "
- << recovery_state.get_last_peering_reset() << dendl;
- return;
- }
- m_scrubber->active_pushes_notification();
+ m_scrubber->active_pushes_notification(epoch_queued);
}
void PG::scrub_send_replica_pushes(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- m_scrubber->send_replica_pushes_upd();
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ m_scrubber->send_replica_pushes_upd(epoch_queued);
}
void PG::scrub_send_applied_update(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- if (pg_has_reset_since(epoch_queued)) {
- dout(10) << __func__ << " been reset at "
- << recovery_state.get_last_peering_reset() << dendl;
- return;
- }
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
m_scrubber->update_applied_notification(epoch_queued);
}
void PG::scrub_send_unblocking(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- if (pg_has_reset_since(epoch_queued)) {
- dout(10) << __func__ << " been reset at "
- << recovery_state.get_last_peering_reset() << dendl;
- return;
- }
- m_scrubber->send_scrub_unblock();
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ m_scrubber->send_scrub_unblock(epoch_queued);
}
void PG::scrub_send_digest_update(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- m_scrubber->digest_update_notification();
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ m_scrubber->digest_update_notification(epoch_queued);
}
void PG::scrub_send_replmaps_ready(epoch_t epoch_queued,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
- m_scrubber->send_replica_maps_ready();
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ m_scrubber->send_replica_maps_ready(epoch_queued);
}
bool PG::ops_blocked_by_scrub() const
r = -EAGAIN;
} else {
bool store_queried = m_scrubber->get_store_errors(arg, result);
- if (!store_queried) {
+ if (store_queried) {
+ encode(result, osd_op->outdata);
+ } else {
// the scrubber's store is not initialized
r = -ENOENT;
}
}
- encode(result, osd_op->outdata); // RRR really? even if no store?
return r;
}
}
m_pl_pg->finish_ctx(ctx.get(), pg_log_entry_t::MODIFY);
+ ++num_digest_updates_pending;
ctx->register_on_success([this]() {
dout(20) << "updating scrub digest " << num_digest_updates_pending << dendl;
if (--num_digest_updates_pending <= 0) {
}
});
- ++num_digest_updates_pending;
m_pl_pg->simple_opc_submit(std::move(ctx));
}
dout(10) << __func__ << " (" << mode << ") finish" << dendl;
}
-PrimaryLogScrub::PrimaryLogScrub(PrimaryLogPG* pg)
- : PgScrubber{pg}, m_pl_pg{pg}
-{}
+PrimaryLogScrub::PrimaryLogScrub(PrimaryLogPG* pg) : PgScrubber{pg}, m_pl_pg{pg} {}
void PrimaryLogScrub::_scrub_clear_state()
{
- dout(7) << __func__ << " - pg(" << m_pl_pg->pg_id << dendl;
+ dout(15) << __func__ << dendl;
m_scrub_cstat = object_stat_collection_t();
}
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=2 sw=2 smarttab
#include "pg_scrubber.h"
return out;
}
-bool PgScrubber::is_event_relevant(epoch_t queued) const
+// returns false if the message should be discarded. Handles the notification of interval
+// change, if not done already. called only for active scrub? not sure.
+
+// let's first make this a Primary-only function
+bool PgScrubber::check_interval(epoch_t epoch_to_verify)
{
- return is_primary() && m_pg->is_active() && m_pg->is_clean() && is_scrub_active() &&
- !was_epoch_changed() && (!queued || !m_pg->pg_has_reset_since(queued));
+ const auto current_interval = m_pg->get_same_interval_since();
+
+ if (epoch_to_verify < current_interval) {
+ // the event will not be delivered. If we have already noticed and handled
+ // the change of seasons, it will be silently discarded. Otherwise - we
+ // reset the scrubber and its FSM.
+ dout(10) << __func__ << " stale message. epoch: " << epoch_to_verify << " vs. "
+ << current_interval << " (handled: " << m_last_dead_interval << ")" << dendl;
+
+ if (epoch_to_verify > m_last_dead_interval) {
+
+ // we have not seen this interval change yet.
+ // The remote reservations are no longer relevant.
+
+ m_last_dead_interval = current_interval;
+
+ // clear the remote reservations. No need to send messages.
+ if (m_reservations) {
+ m_reservations->discard_all();
+ }
+
+ // stop the scrub and send a reset message to the FSM
+ scrub_clear_state();
+ }
+ return false;
+ }
+
+ return true;
}
-bool PgScrubber::should_abort_scrub(epoch_t queued) const
+bool PgScrubber::check_interval_replica(epoch_t epoch_to_verify)
{
- dout(10) << __func__ << "(): queued:" << queued << " required: " << m_flags.required
- << " noscrub: " << get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) << " / "
- << m_pg->pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB) << dendl;
+ const auto current_interval = m_pg->get_same_interval_since();
+
+ if (epoch_to_verify < current_interval) {
+ // the event will not be delivered. If we have already noticed and handled
+ // the change of seasons, it will be silently discarded. Otherwise - we
+ // reset the scrubber and its FSM.
+ dout(10) << __func__ << " stale message. epoch: " << epoch_to_verify << " vs. "
+ << current_interval << " (handled: " << m_last_dead_interval << ")" << dendl;
- if (!is_primary() || !m_pg->is_active() ||
- (queued && m_pg->pg_has_reset_since(queued))) {
+ if (epoch_to_verify > m_last_dead_interval) {
+
+ // we have not seen this interval change yet.
+ // The remote reservations are no longer relevant.
+
+ m_last_dead_interval = current_interval;
+
+ // clear the remote reservations. No need to send messages
+ m_remote_osd_resource.reset();
+
+ // stop the scrub and send a reset message to the FSM
+ // replica_handling_done();
+ send_interval_changed();
+ }
+ return false;
+ }
+
+ // verify that we are reserved by the primary
+ // not true anymore (see rapair scrubs) ceph_assert(m_remote_osd_resource &&
+ // m_remote_osd_resource->is_reserved());
+
+ return true;
+}
+
+bool PgScrubber::is_message_relevant(epoch_t epoch_to_verify)
+{
+ if (!m_active) {
+ // not scrubbing. We can assume that the scrub was already terminated, and we
+ // can silently discard the incoming event.
+ return false;
+ }
+
+ // is this a message from before we started this scrub?
+ if (epoch_to_verify < m_epoch_start) {
+ return false;
+ }
+
+ // check for reasons to abort this scrub
+
+ // has a new interval started?
+ if (!check_interval(epoch_to_verify)) {
+ // if this is a new interval, check_interval() just discarded
+ // remote resources and then killed the scrub
+ return false;
+ }
+
+ ceph_assert(is_primary());
+
+ // were we instructed to abort?
+ return verify_against_abort(epoch_to_verify);
+}
+
+
+// false if the message was discarded because of an abort flag.
+// Reset everything if the abort was not handled before.
+bool PgScrubber::verify_against_abort(epoch_t epoch_to_verify)
+{
+ if (!should_abort()) {
return true;
}
+ dout(15) << __func__ << " aborting. incoming epoch: " << epoch_to_verify
+ << "vs last-aborted: " << m_last_aborted << dendl;
+
+ // if we were not aware of the abort before - kill the scrub.
+ if (epoch_to_verify > m_last_aborted) {
+ scrub_clear_state();
+ m_last_aborted = std::max(epoch_to_verify, m_epoch_start);
+ }
+ return false;
+}
+
+bool PgScrubber::should_abort() const
+{
if (m_flags.required) {
return false; // not stopping 'required' scrubs for configuration changes
}
- if (state_test(PG_STATE_DEEP_SCRUB)) {
+ if (m_is_deep) {
if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
m_pg->pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
dout(10) << "nodeep_scrub set, aborting" << dendl;
return true;
}
- } else if (state_test(PG_STATE_SCRUBBING)) {
- if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
- m_pg->pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
- dout(10) << "noscrub set, aborting" << dendl;
- return true;
- }
+ }
+
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
+ m_pg->pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
+ dout(10) << "noscrub set, aborting" << dendl;
+ return true;
}
return false;
}
-void PgScrubber::send_start_scrub()
+// sending (processing) state-machine events --------------------------------
+
+/*
+ * a note re the checks performed before sending scrub-initiating messages:
+ *
+ * For those ('StartScrub', 'AfterRepairScrub') scrub-initiation messages that
+ * possibly were in the queue while the PG changed state and became unavailable for
+ * scrubbing:
+ *
+ * The check_interval() catches all major changes to the PG. As for the other conditions
+ * we may check (and see is_message_relevant() above):
+ *
+ * - we are not 'active' yet, so must check against is_active(), andL
+ *
+ * - the 'abort' flags were just verified (when the triggering message was queued). As
+ * those are only modified in human speeds - they need not be queried again.
+ *
+ * Some of the considerations above are also relevant to the replica-side initiation
+ * ('StartReplica' & 'StartReplicaNoWait').
+ */
+
+
+void PgScrubber::send_start_scrub(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- if (should_abort_scrub(epoch_t(0))) {
- dout(10) << __func__ << " aborting!" << dendl;
- scrub_clear_state(false);
- } else {
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (check_interval(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(StartScrub{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_start_after_repair()
+void PgScrubber::send_start_after_repair(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- m_fsm->process_event(AfterRepairScrub{});
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (check_interval(epoch_queued)) {
+ m_fsm->my_states();
+ m_fsm->process_event(AfterRepairScrub{});
+ }
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_scrub_unblock()
+void PgScrubber::send_scrub_unblock(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- if (should_abort_scrub(epoch_t(0))) {
-
- dout(10) << __func__ << " aborting!" << dendl;
- scrub_clear_state(false);
-
- } else if (is_scrub_active()) {
-
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (is_message_relevant(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(Unblocked{});
-
- } else {
- dout(10) << __func__ << " ignored as scrub not active" << dendl;
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_scrub_resched()
+void PgScrubber::send_scrub_resched(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- if (should_abort_scrub(epoch_t(0))) {
- dout(10) << __func__ << " aborting!" << dendl;
- scrub_clear_state(false);
- } else if (is_scrub_active()) {
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (is_message_relevant(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(InternalSchedScrub{});
- } else {
- // no need to send anything
- dout(10) << __func__ << " event no longer relevant" << dendl;
}
- dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_start_replica()
+void PgScrubber::send_start_replica(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- m_fsm->process_event(StartReplica{});
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (is_primary()) {
+ // shouldn't happen. Ignore
+ dout(1) << "got a replica scrub request while Primary!" << dendl;
+ return;
+ }
+ if (check_interval_replica(epoch_queued)) {
+ m_fsm->my_states();
+ // buy us some time by not waiting for updates if there are none
+ // to wait for. Affects the transition from NotActive into either
+ // ReplicaWaitUpdates or ActiveReplica.
+ if (pending_active_pushes())
+ m_fsm->process_event(StartReplica{});
+ else
+ m_fsm->process_event(StartReplicaNoWait{});
+ }
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_sched_replica()
+void PgScrubber::send_sched_replica(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- m_fsm->process_event(SchedReplica{}); // retest for map availability
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (check_interval_replica(epoch_queued)) {
+ m_fsm->my_states();
+ m_fsm->process_event(SchedReplica{}); // retest for map availability
+ }
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::active_pushes_notification()
+void PgScrubber::active_pushes_notification(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- if (should_abort_scrub(epoch_t(0))) {
- dout(10) << __func__ << " aborting!" << dendl;
- scrub_clear_state(false);
- } else {
+ // note: Primary only
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (is_message_relevant(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(ActivePushesUpd{});
}
void PgScrubber::update_applied_notification(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << "() epoch: " << epoch_queued << dendl;
- if (should_abort_scrub(epoch_queued)) {
- dout(10) << __func__ << " aborting!" << dendl;
- scrub_clear_state(false);
- } else {
+ // note: Primary only
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (is_message_relevant(epoch_queued)) {
m_fsm->my_states();
m_fsm->process_event(UpdatesApplied{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::digest_update_notification()
+void PgScrubber::digest_update_notification(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- if (is_event_relevant(epoch_t(0))) {
+ // note: Primary only
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (is_message_relevant(epoch_queued)) {
+ m_fsm->my_states();
m_fsm->process_event(DigestUpdate{});
- } else {
- // no need to send anything
- dout(10) << __func__ << " event no longer relevant" << dendl;
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_epoch_changed()
+// no checks should be performed here
+void PgScrubber::send_interval_changed()
{
dout(10) << "scrubber event -->> " << __func__ << dendl;
- if (is_scrub_active()) {
- m_fsm->my_states();
- m_fsm->process_event(EpochChanged{});
- }
+ m_fsm->my_states();
+ m_fsm->process_event(IntervalChanged{});
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_replica_maps_ready()
+void PgScrubber::send_replica_maps_ready(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- if (is_scrub_active()) {
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (is_message_relevant(epoch_queued)) {
+ m_fsm->my_states();
m_fsm->process_event(GotReplicas{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_replica_pushes_upd()
+void PgScrubber::send_replica_pushes_upd(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- if (is_scrub_active()) {
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (check_interval_replica(epoch_queued)) {
+ m_fsm->my_states();
m_fsm->process_event(ReplicaPushesUpd{});
}
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_remotes_reserved()
+void PgScrubber::send_remotes_reserved(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- m_fsm->process_event(RemotesReserved{}); // note: too early to check for 'active'!
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ // note: scrub is not active yet
+ if (check_interval(epoch_queued)) {
+ m_fsm->my_states();
+ m_fsm->process_event(RemotesReserved{});
+ }
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-void PgScrubber::send_reservation_failure()
+void PgScrubber::send_reservation_failure(epoch_t epoch_queued)
{
- dout(10) << "scrubber event -->> " << __func__ << dendl;
- m_fsm->my_states();
- m_fsm->process_event(ReservationFailure{}); // do not check for 'active'!
+ dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
+ if (check_interval(epoch_queued)) { // do not check for 'active'!
+ m_fsm->my_states();
+ m_fsm->process_event(ReservationFailure{});
+ }
dout(10) << "scrubber event --<< " << __func__ << dendl;
}
-bool PgScrubber::is_scrub_active() const
-{
- dout(10) << " " << __func__ << " actv? " << m_active << "pg:" << m_pg->pg_id << dendl;
- return m_active;
-}
-
bool PgScrubber::is_reserving() const
{
return m_fsm->is_reserving();
void PgScrubber::reset_epoch(epoch_t epoch_queued)
{
- dout(10) << __func__ << " PG( " << m_pg->pg_id
- << (m_pg->is_primary() ? ") prm" : ") rpl") << " epoch: " << epoch_queued
- << " state deep? " << state_test(PG_STATE_DEEP_SCRUB) << dendl;
-
- dout(10) << __func__ << " STATE_SCRUBBING? " << state_test(PG_STATE_SCRUBBING) << dendl;
- m_epoch_queued = epoch_queued;
- m_needs_sleep = true;
-
+ dout(10) << __func__ << " state deep? " << state_test(PG_STATE_DEEP_SCRUB) << dendl;
m_fsm->assert_not_active();
+ m_epoch_start = epoch_queued;
+ m_needs_sleep = true;
m_is_deep = state_test(PG_STATE_DEEP_SCRUB);
}
}
// ///////////////////////////////////////////////////////////////////// //
-// scrub op registration handling
+// scrub-op registration handling
bool PgScrubber::is_scrub_registered() const
{
reg_stamp = m_pg->info.history.last_scrub_stamp;
}
- dout(9) << __func__ << " pg(" << m_pg_id << ") must: " << must
- << " required:" << m_flags.required << " flags: " << request_flags
- << " stamp: " << reg_stamp << dendl;
+ dout(15) << __func__ << " pg(" << m_pg_id << ") must: " << must
+ << " required:" << m_flags.required << " flags: " << request_flags
+ << " stamp: " << reg_stamp << dendl;
// note down the sched_time, so we can locate this scrub, and remove it
// later on.
void PgScrubber::request_rescrubbing(requested_scrub_t& req_flags)
{
- dout(10) << __func__ << " existing-" << m_scrub_reg_stamp << " ## "
+ dout(10) << __func__ << " existing-" << m_scrub_reg_stamp << ". was registered? "
<< is_scrub_registered() << dendl;
unreg_next_scrub();
void PgScrubber::set_subset_last_update(eversion_t e)
{
m_subset_last_update = e;
+ dout(15) << __func__ << " last-update: " << e << dendl;
}
/*
}
}
-/**
- * walk the log to find the latest update that affects our chunk
- */
eversion_t PgScrubber::search_log_for_updates() const
{
auto& projected = m_pg->projected_log.log;
bool PgScrubber::get_replicas_maps(bool replica_can_preempt)
{
- dout(10) << __func__ << " epoch_start: " << m_epoch_start
+ dout(10) << __func__ << " started in epoch/interval: " << m_epoch_start << "/"
+ << m_interval_start
<< " pg same_interval_since: " << m_pg->info.history.same_interval_since
<< dendl;
bool PgScrubber::was_epoch_changed() const
{
// for crimson we have m_pg->get_info().history.same_interval_since
- dout(10) << __func__ << " epoch_start: " << m_epoch_start
+ dout(10) << __func__ << " epoch_start: " << m_interval_start
<< " from pg: " << m_pg->get_history().same_interval_since << dendl;
- return m_epoch_start < m_pg->get_history().same_interval_since;
+ return m_interval_start < m_pg->get_history().same_interval_since;
}
void PgScrubber::mark_local_map_ready()
dout(10) << __func__ << " scrubmap from osd." << replica
<< (deep ? " deep" : " shallow") << dendl;
- auto repscrubop = new MOSDRepScrub(
- spg_t(m_pg->info.pgid.pgid, replica.shard), version, m_pg->get_osdmap_epoch(),
- m_pg->get_last_peering_reset(), start, end, deep, allow_preemption, m_flags.priority,
- m_pg->ops_blocked_by_scrub());
+ auto repscrubop =
+ new MOSDRepScrub(spg_t(m_pg->info.pgid.pgid, replica.shard), version,
+ get_osdmap_epoch(), m_pg->get_last_peering_reset(), start, end, deep,
+ allow_preemption, m_flags.priority, m_pg->ops_blocked_by_scrub());
// default priority. We want the replica-scrub processed prior to any recovery
// or client io messages (we are holding a lock!)
preemption_data.reset();
m_pg->publish_stats_to_osd();
- m_epoch_start = m_pg->get_history().same_interval_since;
+ m_interval_start = m_pg->get_history().same_interval_since;
+ // m_epoch_started = m_pg->get_osdmap_epoch();
- dout(10) << __func__ << " start same_interval:" << m_epoch_start << dendl;
+ dout(10) << __func__ << " start same_interval:" << m_interval_start << dendl;
// create a new store
{
}
m_start = m_pg->info.pgid.pgid.get_hobj_start();
+ m_last_dead_interval = get_osdmap_epoch();
m_active = true;
}
{
ceph_assert(!m_active);
m_active = true;
+ m_last_dead_interval = get_osdmap_epoch(); // so that check_interval_replica() won't
+ // kill a scrub for stale messages
}
void PgScrubber::_scan_snaps(ScrubMap& smap)
hobject_t head;
SnapSet snapset;
- // Test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify
- // caller using clean_meta_map(), and it works properly.
- dout(15) << __func__ << " starts" << dendl;
+ // Test qa/standalone/scrub/osd-scrub-snaps.sh greps for the strings
+ // in this function
+ dout(15) << "_scan_snaps starts" << dendl;
for (auto i = smap.objects.rbegin(); i != smap.objects.rend(); ++i) {
m_end, m_is_deep);
if (ret == -EINPROGRESS)
- m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::high_priority);
+ m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::low_priority);
return ret;
}
int PgScrubber::build_replica_map_chunk()
{
- dout(10) << __func__ << " epoch start: " << m_epoch_start << " ep q: " << m_epoch_queued
- << dendl;
- dout(10) << __func__ << " deep: " << m_is_deep << dendl;
+ dout(10) << __func__ << " interval start: " << m_interval_start
+ << " epoch: " << m_epoch_start << " deep: " << m_is_deep << dendl;
auto ret = build_scrub_map_chunk(replica_scrubmap, replica_scrubmap_pos, m_start, m_end,
m_is_deep);
// finished!
// In case we restarted smaller chunk, clear old data
- ScrubMap for_meta_scrub;
m_cleaned_meta_map.clear_from(m_start);
m_cleaned_meta_map.insert(replica_scrubmap);
- clean_meta_map(for_meta_scrub);
+ auto for_meta_scrub = clean_meta_map();
_scan_snaps(for_meta_scrub);
}
return 0;
}
-/**
- * \todo describe what we are doing here
+/*
+ * Process:
+ * Building a map of objects suitable for snapshot validation.
+ * The data in m_cleaned_meta_map is the left over partial items that need to
+ * be completed before they can be processed.
*
- * @param for_meta_scrub
+ * Snapshots in maps precede the head object, which is why we are scanning backwards.
*/
-void PgScrubber::clean_meta_map(ScrubMap& for_meta_scrub)
+ScrubMap PgScrubber::clean_meta_map()
{
+ ScrubMap for_meta_scrub;
+
if (m_end.is_max() || m_cleaned_meta_map.objects.empty()) {
m_cleaned_meta_map.swap(for_meta_scrub);
} else {
for_meta_scrub.objects.insert(begin, iter);
m_cleaned_meta_map.objects.erase(begin, iter);
}
+
+ return for_meta_scrub;
}
void PgScrubber::run_callbacks()
requeue_waiting();
}
-Scrub::preemption_t* PgScrubber::get_preemptor()
+Scrub::preemption_t& PgScrubber::get_preemptor()
{
- return &preemption_data;
+ return preemption_data;
}
void PgScrubber::requeue_replica(Scrub::scrub_prio_t is_high_priority)
dout(10) << __func__ << " pg:" << m_pg->pg_id << " Msg: map_epoch:" << msg->map_epoch
<< " min_epoch:" << msg->min_epoch << " deep?" << msg->deep << dendl;
+ // are we still processing a previous scrub-map request without noticing that the
+ // interval changed? won't see it here, but rather at the reservation stage.
+
+
if (msg->map_epoch < m_pg->info.history.same_interval_since) {
dout(10) << "replica_scrub_op discarding old replica_scrub from " << msg->map_epoch
<< " < " << m_pg->info.history.same_interval_since << dendl;
+
+ // is there a general sync issue? are we holding a stale reservation?
+ // not checking now - assuming we will actively react to interval change.
+
return;
}
replica_scrubmap = ScrubMap{};
replica_scrubmap_pos = ScrubMapBuilder{};
- // m_replica_epoch_start is overwritten if requeued waiting for active pushes
- m_replica_epoch_start = m_pg->info.history.same_interval_since;
m_replica_min_epoch = msg->min_epoch;
m_start = msg->start;
m_end = msg->end;
m_max_end = msg->end;
m_is_deep = msg->deep;
- m_epoch_start = m_pg->info.history.same_interval_since;
+ m_interval_start = m_pg->info.history.same_interval_since;
m_replica_request_priority = msg->high_priority ? Scrub::scrub_prio_t::high_priority
: Scrub::scrub_prio_t::low_priority;
m_flags.priority = msg->priority ? msg->priority : m_pg->get_scrub_priority();
m_osds->queue_for_rep_scrub(m_pg, m_replica_request_priority, m_flags.priority);
}
-void PgScrubber::replica_scrub(epoch_t epoch_queued)
-{
- dout(10) << __func__ << ": " << m_pg->pg_id << " epoch queued: " << epoch_queued
- << dendl;
- dout(20) << __func__ << " m_epoch_start: " << m_epoch_start
- << " better be >= " << m_pg->info.history.same_interval_since << dendl;
- dout(20) << __func__ << " m_is_deep: " << m_is_deep << dendl;
-
- if (m_pg->pg_has_reset_since(epoch_queued)) {
- dout(10) << "replica_scrub(epoch,) - reset!" << dendl;
- send_epoch_changed();
- return;
- }
-
- if (was_epoch_changed()) {
- dout(10) << "replica_scrub(epoch,) - epoch!" << dendl;
- send_epoch_changed();
- return;
- }
- ceph_assert(!is_primary()); // as should have been caught by the epoch-changed check
-
- send_start_replica();
-}
-
-void PgScrubber::replica_scrub_resched(epoch_t epoch_queued)
-{
- dout(10) << __func__ << ": " << m_pg->pg_id << " epoch queued: " << epoch_queued
- << dendl;
-
- if (m_pg->pg_has_reset_since(epoch_queued)) {
- dout(10) << "replica_scrub(epoch,) - reset!" << dendl;
- send_epoch_changed();
- return;
- }
-
- if (was_epoch_changed()) {
- dout(10) << __func__ << " epoch changed!" << dendl;
- send_epoch_changed();
- return;
- }
- ceph_assert(!is_primary()); // as should have been caught by the epoch-changed check
-
- send_sched_replica();
-}
-
void PgScrubber::set_op_parameters(requested_scrub_t& request)
{
dout(10) << __func__ << " input: " << request << dendl;
+ // write down the epoch of starting a new scrub. Will be used
+ // to discard stale messages from previous aborted scrubs.
+ m_epoch_start = m_pg->get_osdmap_epoch();
+
m_flags.check_repair = request.check_repair;
m_flags.auto_repair = request.auto_repair || request.need_auto;
m_flags.required = request.req_scrub || request.must_scrub;
request = requested_scrub_t{};
}
-/**
- * RRR \todo ask why we collect from acting+recovery+backfill, but use the size of
- * only the acting set
- */
void PgScrubber::scrub_compare_maps()
{
dout(10) << __func__ << " has maps, analyzing" << dendl;
m_osds->clog->warn(ss);
}
- if (m_pg->recovery_state.get_acting().size() > 1) {
+ if (m_pg->recovery_state.get_acting_recovery_backfill().size() > 1) {
dout(10) << __func__ << " comparing replica scrub maps" << dendl;
}
}
- ScrubMap for_meta_scrub;
- clean_meta_map(for_meta_scrub);
+ auto for_meta_scrub = clean_meta_map();
// ok, do the pg-type specific scrubbing
}
}
-void PgScrubber::replica_update_start_epoch()
-{
- dout(10) << __func__ << " start:" << m_pg->info.history.same_interval_since << dendl;
- m_replica_epoch_start = m_pg->info.history.same_interval_since;
-}
-
/**
* Send the requested map back to the primary (or - if we
* were preempted - let the primary know).
*/
-void PgScrubber::send_replica_map(bool was_preempted)
+void PgScrubber::send_replica_map(PreemptionNoted was_preempted)
{
- dout(10) << __func__ << " min epoch:" << m_replica_min_epoch
- << " epoch_start:" << m_replica_epoch_start << dendl;
+ dout(10) << __func__ << " min epoch:" << m_replica_min_epoch << dendl;
auto reply = new MOSDRepScrubMap(spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard),
m_replica_min_epoch, m_pg_whoami);
- reply->preempted = was_preempted;
+ reply->preempted = (was_preempted == PreemptionNoted::preempted);
::encode(replica_scrubmap, reply->get_data());
m_osds->send_message_osd_cluster(m_pg->get_primary().osd, reply, m_replica_min_epoch);
}
-/**
+/*
* - if the replica lets us know it was interrupted, we mark the chunk as interrupted.
* The state-machine will react to that when all replica maps are received.
* - when all maps are received, we signal the FSM with the GotReplicas event (see
* scrub_send_replmaps_ready()). Note that due to the no-reentrancy limitations of the
* FSM, we do not 'process' the event directly. Instead - it is queued for the OSD to
- * handle (well - the incoming message is marked for fast dispatching, which is an
- * even better reason for handling it via the queue).
+ * handle.
*/
void PgScrubber::map_from_replica(OpRequestRef op)
{
m_received_maps[m->from].decode(p, m_pg->info.pgid.pool());
dout(15) << "map version is " << m_received_maps[m->from].valid_through << dendl;
- [[maybe_unused]] auto [is_ok, err_txt] = m_maps_status.mark_arriving_map(m->from);
- ceph_assert(is_ok); // and not an error message, following the original code
+ auto [is_ok, err_txt] = m_maps_status.mark_arriving_map(m->from);
+ if (!is_ok) {
+ // previously an unexpected map was triggering an assert. Now, as scrubs can be
+ // aborted at any time, the chances of this happening have increased, and aborting is
+ // not justified
+ dout(1) << __func__ << err_txt << " from OSD " << m->from << dendl;
+ return;
+ }
if (m->preempted) {
dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
}
if (m_maps_status.are_all_maps_available()) {
- dout(10) << __func__ << " osd-queuing GotReplicas" << dendl;
+ dout(15) << __func__ << " all repl-maps available" << dendl;
m_osds->queue_scrub_got_repl_maps(m_pg, m_pg->is_scrub_blocking_ops());
}
}
-/**
- * we are a replica being asked by the Primary to reserve OSD resources for
- * scrubbing
- */
void PgScrubber::handle_scrub_reserve_request(OpRequestRef op)
{
dout(10) << __func__ << " " << *op->get_req() << dendl;
op->mark_started();
+ auto request_ep = op->get_req<MOSDScrubReserve>()->get_map_epoch();
+
+ /*
+ * if we are currently holding a reservation, then:
+ * either (1) we, the scrubber, did not yet notice an interval change. The remembered
+ * reservation epoch is from before our interval, and we can silently discard the
+ * reservation (no message is required).
+ * or:
+ * (2) the interval hasn't changed, but the same Primary that (we think) holds the
+ * lock just sent us a new request. Note that we know it's the same Primary, as
+ * otherwise the interval would have changed.
+ * Ostensibly we can discard & redo the reservation. But then we
+ * will be temporarily releasing the OSD resource - and might not be able to grab it
+ * again. Thus we simple treat this as a successful new request.
+ */
+
+ if (m_remote_osd_resource.has_value() && m_remote_osd_resource->is_stale()) {
+ // we are holding a stale reservation from a past epoch
+ m_remote_osd_resource.reset();
+ }
- if (m_remote_osd_resource.has_value() && m_remote_osd_resource->is_reserved()) {
- dout(10) << __func__ << " ignoring reserve request: Already reserved" << dendl;
+ if (request_ep < m_pg->get_same_interval_since()) {
+ // will not ack stale requests
return;
}
bool granted{false};
+ if (m_remote_osd_resource.has_value()) {
- if (m_pg->cct->_conf->osd_scrub_during_recovery || !m_osds->is_recovery_active()) {
+ dout(10) << __func__ << " already reserved." << dendl;
+ granted = true;
- m_remote_osd_resource.emplace(m_pg, m_osds);
+ } else if (m_pg->cct->_conf->osd_scrub_during_recovery ||
+ !m_osds->is_recovery_active()) {
+ m_remote_osd_resource.emplace(m_pg, m_osds, request_ep);
// OSD resources allocated?
granted = m_remote_osd_resource->is_reserved();
if (!granted) {
dout(10) << __func__ << " reserved? " << (granted ? "yes" : "no") << dendl;
- auto m = op->get_req<MOSDScrubReserve>();
Message* reply = new MOSDScrubReserve(
- spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard), m->map_epoch,
+ spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard), request_ep,
granted ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT, m_pg_whoami);
m_osds->send_message_osd_cluster(reply, op->get_req()->get_connection());
if (m_reservations.has_value()) {
m_reservations->handle_reserve_grant(op, from);
} else {
- derr << __func__ << ": replica scrub reservations that will be leaked!" << dendl;
+ derr << __func__ << ": received unsolicited reservation grant from osd " << from
+ << " (" << op << ")" << dendl;
}
}
void PgScrubber::dump(ceph::Formatter* f) const
{
f->open_object_section("scrubber");
- f->dump_stream("epoch_start") << m_epoch_start;
+ f->dump_stream("epoch_start") << m_interval_start;
f->dump_bool("active", m_active);
if (m_active) {
f->dump_stream("start") << m_start;
dout(10) << __func__ << dendl;
f->open_object_section("scrub");
- f->dump_stream("scrubber.epoch_start") << m_epoch_start;
+ f->dump_stream("scrubber.epoch_start") << m_interval_start;
f->dump_bool("scrubber.active", m_active);
f->dump_stream("scrubber.start") << m_start;
f->dump_stream("scrubber.end") << m_end;
, m_pg_id{pg->pg_id}
, m_osds{m_pg->osd}
, m_pg_whoami{pg->pg_whoami}
- , m_epoch_queued{0}
, preemption_data{pg}
{
dout(20) << " creating PgScrubber for " << pg->pg_id << " / " << m_pg_whoami << dendl;
m_reservations.emplace(m_pg, m_pg_whoami);
}
-// called only for normal end-of-scrub, and only for a Primary
void PgScrubber::cleanup_on_finish()
{
dout(10) << __func__ << dendl;
m_reservations.reset();
m_local_osd_resource.reset();
- m_pg->requeue_ops(m_pg->waiting_for_scrub);
+ requeue_waiting();
reset_internal_state();
// type-specific state clear
}
// uses process_event(), so must be invoked externally
-void PgScrubber::scrub_clear_state(bool keep_repair_state)
+void PgScrubber::scrub_clear_state()
{
dout(10) << __func__ << dendl;
- clear_pgscrub_state(keep_repair_state);
+ clear_pgscrub_state();
m_fsm->process_event(FullReset{});
}
/*
* note: does not access the state-machine
*/
-void PgScrubber::clear_pgscrub_state(bool keep_repair_state)
+void PgScrubber::clear_pgscrub_state()
{
dout(10) << __func__ << dendl;
ceph_assert(m_pg->is_locked());
state_clear(PG_STATE_SCRUBBING);
state_clear(PG_STATE_DEEP_SCRUB);
- if (!keep_repair_state)
- state_clear(PG_STATE_REPAIR);
+
+ state_clear(PG_STATE_REPAIR);
clear_scrub_reservations();
m_pg->publish_stats_to_osd();
- m_pg->requeue_ops(m_pg->waiting_for_scrub);
+ requeue_waiting();
reset_internal_state();
state_clear(PG_STATE_SCRUBBING);
state_clear(PG_STATE_DEEP_SCRUB);
- // make sure we cleared the reservations!
-
preemption_data.reset();
m_maps_status.reset();
m_received_maps.clear();
ostream& PgScrubber::show(ostream& out) const
{
- return out << " [ " << m_pg_id << ": " << /*for now*/ m_flags << " ] ";
+ return out << " [ " << m_pg_id << ": " << m_flags << " ] ";
}
// ///////////////////// preemption_data_t //////////////////////////////////
m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority);
}
-void ReplicaReservations::release_all()
+void ReplicaReservations::discard_all()
{
dout(10) << __func__ << " " << m_reserved_peers << dendl;
m_had_rejections = true; // preventing late-coming responses from triggering events
- epoch_t epoch = m_pg->get_osdmap_epoch();
-
- for (auto p : m_reserved_peers) {
- release_replica(p, epoch);
- }
m_reserved_peers.clear();
-
- // note: the release will follow on the heels of the request. When tried otherwise,
- // grants that followed a reject arrived after the whole scrub machine-state was
- // reset, causing leaked reservations.
- if (m_pending) {
- for (auto p : m_waited_for_peers) {
- release_replica(p, epoch);
- }
- }
m_waited_for_peers.clear();
}
// send un-reserve messages to all reserved replicas. We do not wait for answer (there
// wouldn't be one). Other incoming messages will be discarded on the way, by our
// owner.
- release_all();
+ dout(10) << __func__ << " " << m_reserved_peers << dendl;
+
+ epoch_t epoch = m_pg->get_osdmap_epoch();
+
+ for (auto& p : m_reserved_peers) {
+ release_replica(p, epoch);
+ }
+ m_reserved_peers.clear();
+
+ // note: the release will follow on the heels of the request. When tried otherwise,
+ // grants that followed a reject arrived after the whole scrub machine-state was
+ // reset, causing leaked reservations.
+ for (auto& p : m_waited_for_peers) {
+ release_replica(p, epoch);
+ }
+ m_waited_for_peers.clear();
}
/**
} else if (std::find(m_reserved_peers.begin(), m_reserved_peers.end(), from) !=
m_reserved_peers.end()) {
- dout(15) << " already had osd." << from << " reserved" << dendl;
+ dout(10) << " already had osd." << from << " reserved" << dendl;
} else {
dout(10) << " osd." << from << " scrub reserve = fail" << dendl;
m_had_rejections = true; // preventing any additional notifications
- --m_pending; // not sure we need this bookkeeping anymore
send_reject();
}
}
+
// ///////////////////// LocalReservation //////////////////////////////////
LocalReservation::LocalReservation(PG* pg, OSDService* osds)
m_holding_local_reservation = true;
}
-void LocalReservation::early_release()
+LocalReservation::~LocalReservation()
{
if (m_holding_local_reservation) {
m_holding_local_reservation = false;
}
}
-LocalReservation::~LocalReservation()
-{
- early_release();
-}
-
// ///////////////////// ReservedByRemotePrimary ///////////////////////////////
-ReservedByRemotePrimary::ReservedByRemotePrimary(PG* pg, OSDService* osds)
- : m_pg{pg} // holding the "whole PG" for dout() sake
- , m_osds{osds}
+ReservedByRemotePrimary::ReservedByRemotePrimary(PG* pg, OSDService* osds, epoch_t epoch)
+ : m_pg{pg}, m_osds{osds}, m_reserved_at{epoch}
{
if (!m_osds->inc_scrubs_remote()) {
dout(10) << __func__ << ": failed to reserve at Primary request" << dendl;
m_reserved_by_remote_primary = true;
}
-void ReservedByRemotePrimary::early_release()
+bool ReservedByRemotePrimary::is_stale() const
+{
+ return m_reserved_at < m_pg->get_same_interval_since();
+}
+
+ReservedByRemotePrimary::~ReservedByRemotePrimary()
{
- dout(20) << "ReservedByRemotePrimary::" << __func__ << ": "
- << m_reserved_by_remote_primary << dendl;
if (m_reserved_by_remote_primary) {
m_reserved_by_remote_primary = false;
m_osds->dec_scrubs_remote();
}
}
-ReservedByRemotePrimary::~ReservedByRemotePrimary()
-{
- early_release();
-}
-
// ///////////////////// MapsCollectionStatus ////////////////////////////////
auto MapsCollectionStatus::mark_arriving_map(pg_shard_t from)
m_maps_awaited_for.erase(fe);
return std::tuple{true, ""sv};
} else {
- return std::tuple{false, "unsolicited scrub-map"sv};
+ return std::tuple{false, " unsolicited scrub-map"sv};
}
}
return out << " ] ";
}
-} // namespace Scrub
+} // namespace Scrub
\ No newline at end of file
/// notify the scrubber that we have failed to reserve replicas' resources
void send_reject();
- void release_all();
-
public:
+ /**
+ * quietly discard all knowledge about existing reservations. No messages
+ * are sent to peers.
+ * To be used upon interval change, as we know the the running scrub is no longer
+ * relevant, and that the replicas had reset the reservations on their side.
+ */
+ void discard_all();
+
ReplicaReservations(PG* pg, pg_shard_t whoami);
~ReplicaReservations();
LocalReservation(PG* pg, OSDService* osds);
~LocalReservation();
bool is_reserved() const { return m_holding_local_reservation; }
- void early_release();
};
/**
PG* m_pg;
OSDService* m_osds;
bool m_reserved_by_remote_primary{false};
+ const epoch_t m_reserved_at;
public:
- ReservedByRemotePrimary(PG* pg, OSDService* osds);
+ ReservedByRemotePrimary(PG* pg, OSDService* osds, epoch_t epoch);
~ReservedByRemotePrimary();
[[nodiscard]] bool is_reserved() const { return m_reserved_by_remote_primary; }
- void early_release();
+
+ /// compare the remembered reserved-at epoch to the current interval
+ [[nodiscard]] bool is_stale() const;
};
/**
/// are we waiting for resource reservation grants form our replicas?
[[nodiscard]] bool is_reserving() const final;
- void send_start_scrub() final;
+ void send_start_scrub(epoch_t epoch_queued) final;
- void send_start_after_repair() final;
+ void send_start_after_repair(epoch_t epoch_queued) final;
- void send_scrub_resched() final;
+ void send_scrub_resched(epoch_t epoch_queued) final;
- void active_pushes_notification() final;
+ void active_pushes_notification(epoch_t epoch_queued) final;
void update_applied_notification(epoch_t epoch_queued) final;
- void send_scrub_unblock() final;
+ void send_scrub_unblock(epoch_t epoch_queued) final;
+
+ void digest_update_notification(epoch_t epoch_queued) final;
- void digest_update_notification() final;
+ void send_replica_maps_ready(epoch_t epoch_queued) final;
- void send_replica_maps_ready() final;
+ void send_start_replica(epoch_t epoch_queued) final;
- void send_replica_pushes_upd() final;
+ void send_sched_replica(epoch_t epoch_queued) final;
+
+ void send_replica_pushes_upd(epoch_t epoch_queued) final;
void reset_epoch(epoch_t epoch_queued) final;
/// true if the given range intersects the scrub interval in any way
bool range_intersects_scrub(const hobject_t& start, const hobject_t& end) final;
+ /**
+ * we are a replica being asked by the Primary to reserve OSD resources for
+ * scrubbing
+ */
void handle_scrub_reserve_request(OpRequestRef op) final;
+
void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from) final;
void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from) final;
void handle_scrub_reserve_release(OpRequestRef op) final;
// used if we are a replica
void replica_scrub_op(OpRequestRef op) final;
- void replica_scrub(epoch_t epoch_queued) final;
- void replica_scrub_resched(epoch_t epoch_queued) final;
/// the op priority, taken from the primary's request message
Scrub::scrub_prio_t replica_op_priority() const final
return false;
}
- void scrub_clear_state(bool keep_repair_state = false) final;
+ void scrub_clear_state() final;
/**
* add to scrub statistics, but only if the soid is below the scrub start
return m_pg->recovery_state.get_last_update_applied();
}
- void requeue_waiting() const final { m_pg->requeue_ops(m_pg->waiting_for_scrub); }
-
int pending_active_pushes() const final { return m_pg->active_pushes; }
void scrub_compare_maps() final;
/// the version of 'scrub_clear_state()' that does not try to invoke FSM services
/// (thus can be called from FSM reactions)
- void clear_pgscrub_state(bool keep_repair_state) final;
+ void clear_pgscrub_state() final;
void add_delayed_scheduling() final;
Scrub::FsmNext on_digest_updates() final;
- void send_replica_map(bool was_preempted) final;
+ void send_replica_map(Scrub::PreemptionNoted was_preempted) final;
- void send_remotes_reserved() final;
- void send_reservation_failure() final;
+ void send_remotes_reserved(epoch_t epoch_queued) final;
+ void send_reservation_failure(epoch_t epoch_queued) final;
/**
* does the PG have newer updates than what we (the scrubber) know?
void set_subset_last_update(eversion_t e) final;
- void replica_update_start_epoch() final;
-
void maps_compare_n_cleanup() final;
- Scrub::preemption_t* get_preemptor() final;
+ Scrub::preemption_t& get_preemptor() final;
int build_primary_map_chunk() final;
virtual ~PgScrubber(); // must be defined separately, in the .cc file
- [[nodiscard]] bool is_scrub_active() const final;
+ [[nodiscard]] bool is_scrub_active() const final { return m_active; }
private:
void reset_internal_state();
- void _scan_snaps(ScrubMap& smap); // note that the (non-standard for a
- // non-virtual) name of the function is searched
- // for by the QA standalone tests. Do not modify.
+ void requeue_waiting() const { m_pg->requeue_ops(m_pg->waiting_for_scrub); }
- void clean_meta_map(ScrubMap& for_meta_scrub);
+ void _scan_snaps(ScrubMap& smap);
+
+ ScrubMap clean_meta_map();
void run_callbacks();
+ void send_interval_changed();
+
+ // ----- methods used to verify the relevance of incoming events:
+
/**
- * are we still a clean & healthy scrubbing primary?
+ * is the incoming event still relevant, and should be processed?
*
- * relevant only after the initial sched_scrub
+ * It isn't if:
+ * - (1) we are no longer 'actively scrubbing'; or
+ * - (2) the message is from an epoch prior to when we started the current scrub
+ * session; or
+ * - (3) the message epoch is from a previous interval; or
+ * - (4) the 'abort' configuration flags were set.
+ *
+ * For (1) & (2) - teh incoming message is discarded, w/o further action.
+ *
+ * For (3): (see check_interval() for a full description) if we have not reacted yet
+ * to this specific new interval, we do now:
+ * - replica reservations are silently discarded (we count on the replicas to notice
+ * the interval change and un-reserve themselves);
+ * - the scrubbing is halted.
+ *
+ * For (4): the message will be discarded, but also:
+ * if this is the first time we've noticed the 'abort' request, we perform the abort.
+ *
+ * \returns should the incoming event be processed?
*/
- [[nodiscard]] bool is_event_relevant(epoch_t queued) const;
+ bool is_message_relevant(epoch_t epoch_to_verify);
/**
* check the 'no scrub' configuration options.
*/
- [[nodiscard]] bool should_abort_scrub(epoch_t queued) const;
+ [[nodiscard]] bool should_abort() const;
+ [[nodiscard]] bool verify_against_abort(epoch_t epoch_to_verify);
+
+ bool check_interval(epoch_t epoch_to_verify);
+ bool check_interval_replica(epoch_t epoch_to_verify);
+
+ epoch_t m_last_dead_interval{};
+ epoch_t m_last_aborted{}; // last time we've noticed a request to abort
+
- void send_epoch_changed();
/**
* return true if any inconsistency/missing is repaired, false otherwise
bool m_needs_sleep{true}; ///< should we sleep before being rescheduled? always
///< 'true', unless we just got out of a sleep period
+ utime_t m_sleep_started_at;
+
// 'optional', as 'ReplicaReservations' & 'LocalReservation' are 'RAII-designed'
// to guarantee un-reserving when deleted.
/// the part that actually finalizes a scrub
void scrub_finish();
- utime_t m_sleep_started_at;
-
protected:
PG* const m_pg;
OSDService* const m_osds;
const pg_shard_t m_pg_whoami; ///< a local copy of m_pg->pg_whoami;
- epoch_t m_epoch_start; ///< epoch when scrubbing was first scheduled
- epoch_t m_epoch_queued;
+ epoch_t m_interval_start; ///< interval's 'from' of when scrubbing was first scheduled
+ /*
+ * the exact epoch when the scrubbing actually started (started here - cleared checks
+ * for no-scrub conf). Incoming events are verified against this, with stale events
+ * discarded.
+ */
+ epoch_t m_epoch_start{0}; ///< the actual epoch when scrubbing started
scrub_flags_t m_flags;
bool m_active{false};
- eversion_t m_subset_last_update;
+ eversion_t m_subset_last_update{};
std::unique_ptr<Scrub::Store> m_store;
CephContext* get_pg_cct() const { return m_pg->cct; }
- void send_start_replica();
-
- void send_sched_replica();
-
// collected statistics
int m_shallow_errors{0};
int m_deep_errors{0};
*/
bool m_is_deep{false};
- inline static int fake_count{2}; // unit-tests. To be removed
-
/**
* initiate a deep-scrub after the current scrub ended with errors.
*/
// ------------ members used if we are a replica
- epoch_t m_replica_epoch_start;
epoch_t m_replica_min_epoch; ///< the min epoch needed to handle this message
- ScrubMapBuilder replica_scrubmap_pos; /// \todo document
- ScrubMap replica_scrubmap; /// \todo document
+ ScrubMapBuilder replica_scrubmap_pos;
+ ScrubMap replica_scrubmap;
/**
* we mark the request priority as it arrived. It influences the queuing priority
* when we wait for local updates
// --------- trace/debug auxiliaries -------------------------------
-// development code. To be removed
void on_event_creation(std::string_view nm)
{
dout(20) << " scrubberFSM event: --vvvv---- " << nm << dendl;
}
-// development code. To be removed
void on_event_discard(std::string_view nm)
{
dout(20) << " scrubberFSM event: --^^^^---- " << nm << dendl;
NotActive::NotActive(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> NotActive" << dendl;
+ dout(10) << "-- state -->> NotActive" << dendl;
}
-sc::result NotActive::react(const EpochChanged&)
+sc::result NotActive::react(const IntervalChanged&)
{
- dout(15) << "NotActive::react(const EpochChanged&)" << dendl;
+ dout(15) << "NotActive::react(const IntervalChanged&)" << dendl;
return discard_event();
}
ReservingReplicas::ReservingReplicas(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> ReservingReplicas" << dendl;
+ dout(10) << "-- state -->> ReservingReplicas" << dendl;
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
scrbr->reserve_replicas();
}
-/**
- * at least one replica denied us the scrub resources we've requested
- */
sc::result ReservingReplicas::react(const ReservationFailure&)
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
dout(10) << "ReservingReplicas::react(const ReservationFailure&)" << dendl;
// the Scrubber must release all resources and abort the scrubbing
- scrbr->clear_pgscrub_state(false);
- return transit<NotActive>();
-}
-
-sc::result ReservingReplicas::react(const EpochChanged&)
-{
- DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
- dout(10) << "ReservingReplicas::react(const EpochChanged&)" << dendl;
-
- // the Scrubber must release all resources and abort the scrubbing
- scrbr->clear_pgscrub_state(false);
+ scrbr->clear_pgscrub_state();
return transit<NotActive>();
}
ActiveScrubbing::ActiveScrubbing(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> ActiveScrubbing" << dendl;
+ dout(10) << "-- state -->> ActiveScrubbing" << dendl;
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
scrbr->on_init();
}
scrbr->unreserve_replicas();
}
-void ScrubMachine::down_on_epoch_change(const EpochChanged&)
-{
- DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
- dout(10) << __func__ << dendl;
- scrbr->unreserve_replicas();
-}
-
-void ScrubMachine::on_epoch_changed(const EpochChanged&)
+/*
+ * The only source of an InternalError event as of now is the BuildMap state,
+ * when encountering a backend error.
+ * We kill the scrub and reset the FSM.
+ */
+sc::result ActiveScrubbing::react(const InternalError&)
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
dout(10) << __func__ << dendl;
- // the Scrubber must release all resources and abort the scrubbing
- scrbr->clear_pgscrub_state(false);
+ scrbr->clear_pgscrub_state();
+ return transit<NotActive>();
}
sc::result ActiveScrubbing::react(const FullReset&)
{
dout(10) << "ActiveScrubbing::react(const FullReset&)" << dendl;
- // caller takes care of this: scrbr->clear_pgscrub_state(false);
+ // caller takes care of clearing the scrubber & FSM states
return transit<NotActive>();
}
*/
RangeBlocked::RangeBlocked(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> Act/RangeBlocked" << dendl;
+ dout(10) << "-- state -->> Act/RangeBlocked" << dendl;
}
// ----------------------- PendingTimer -----------------------------------
*/
PendingTimer::PendingTimer(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> Act/PendingTimer" << dendl;
+ dout(10) << "-- state -->> Act/PendingTimer" << dendl;
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
scrbr->add_delayed_scheduling();
*/
NewChunk::NewChunk(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> Act/NewChunk" << dendl;
+ dout(10) << "-- state -->> Act/NewChunk" << dendl;
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
- scrbr->get_preemptor()->adjust_parameters();
+ scrbr->get_preemptor().adjust_parameters();
// choose range to work on
bool got_a_chunk = scrbr->select_range();
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
dout(10) << "WaitLastUpdate::react(const InternalAllUpdates&)" << dendl;
- if (scrbr->was_epoch_changed()) {
- dout(10) << "WaitLastUpdate: epoch!" << dendl;
- post_event(boost::intrusive_ptr<EpochChanged>(new EpochChanged{}));
- return discard_event();
- }
-
- dout(10) << "WaitLastUpdate::react(const InternalAllUpdates&) "
- << scrbr->get_preemptor()->is_preemptable() << dendl;
- scrbr->get_replicas_maps(scrbr->get_preemptor()->is_preemptable());
+ scrbr->get_replicas_maps(scrbr->get_preemptor().is_preemptable());
return transit<BuildMap>();
}
{
dout(10) << " -- state -->> Act/BuildMap" << dendl;
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
- dout(15) << __func__ << " same epoch? " << (scrbr->was_epoch_changed() ? "no" : "yes")
- << dendl;
-
- if (scrbr->was_epoch_changed()) {
- post_event(boost::intrusive_ptr<EpochChanged>(new EpochChanged{}));
+ // no need to check for an epoch change, as all possible flows that brought us here have
+ // an check_interval() verification of their final event.
- } else if (scrbr->get_preemptor()->was_preempted()) {
+ if (scrbr->get_preemptor().was_preempted()) {
// we were preempted, either directly or by a replica
dout(10) << __func__ << " preempted!!!" << dendl;
} else if (ret < 0) {
dout(10) << "BuildMap::BuildMap() Error! Aborting. Ret: " << ret << dendl;
- scrbr->mark_local_map_ready();
+ // scrbr->mark_local_map_ready();
post_event(boost::intrusive_ptr<InternalError>(new InternalError{}));
} else {
DrainReplMaps::DrainReplMaps(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> Act/DrainReplMaps" << dendl;
+ dout(10) << "-- state -->> Act/DrainReplMaps" << dendl;
// we may have received all maps already. Send the event that will make us check.
post_event(boost::intrusive_ptr<GotReplicas>(new GotReplicas{}));
}
return transit<PendingTimer>();
}
- dout(10) << "DrainReplMaps::react(const GotReplicas&): still draining incoming maps: "
+ dout(15) << "DrainReplMaps::react(const GotReplicas&): still draining incoming maps: "
<< scrbr->dump_awaited_maps() << dendl;
return discard_event();
}
WaitReplicas::WaitReplicas(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> Act/WaitReplicas" << dendl;
+ dout(10) << "-- state -->> Act/WaitReplicas" << dendl;
post_event(boost::intrusive_ptr<GotReplicas>(new GotReplicas{}));
}
dout(10) << "WaitReplicas::react(const GotReplicas&) got all" << dendl;
// were we preempted?
- if (scrbr->get_preemptor()->disable_and_test()) { // a test&set
+ if (scrbr->get_preemptor().disable_and_test()) { // a test&set
dout(10) << "WaitReplicas::react(const GotReplicas&) PREEMPTED!" << dendl;
} else {
- dout(8) << "got the replicas!" << dendl;
scrbr->maps_compare_n_cleanup();
return transit<WaitDigestUpdate>();
}
WaitDigestUpdate::WaitDigestUpdate(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> Act/WaitDigestUpdate" << dendl;
+ dout(10) << "-- state -->> Act/WaitDigestUpdate" << dendl;
// perform an initial check: maybe we already
// have all the updates we need:
// (note that DigestUpdate is usually an external event)
ReplicaWaitUpdates::ReplicaWaitUpdates(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> ReplicaWaitUpdates" << dendl;
+ dout(10) << "-- state -->> ReplicaWaitUpdates" << dendl;
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
-
scrbr->on_replica_init();
- post_event(boost::intrusive_ptr<ReplicaPushesUpd>(new ReplicaPushesUpd{}));
}
-sc::result ReplicaWaitUpdates::react(const EpochChanged&)
+sc::result ReplicaWaitUpdates::react(const IntervalChanged&)
{
- dout(10) << "ReplicaWaitUpdates::react(const EpochChanged&)" << dendl;
+ DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
+ dout(10) << "ReplicaWaitUpdates::react(const IntervalChanged&)" << dendl;
+
+ // note: the master's reservation of us was just discarded by our caller
+ scrbr->replica_handling_done();
return transit<NotActive>();
}
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
dout(10) << "ReplicaWaitUpdates::react(const ReplicaPushesUpd&): "
<< scrbr->pending_active_pushes() << dendl;
- dout(8) << "same epoch? " << !scrbr->was_epoch_changed() << dendl;
- if (scrbr->was_epoch_changed()) {
-
- post_event(boost::intrusive_ptr<EpochChanged>(new EpochChanged{}));
-
- } else if (scrbr->pending_active_pushes() == 0) {
+ if (scrbr->pending_active_pushes() == 0) {
// done waiting
- scrbr->replica_update_start_epoch();
return transit<ActiveReplica>();
}
return discard_event();
}
+/**
+ * the event poster is handling the scrubber reset
+ */
+sc::result ReplicaWaitUpdates::react(const FullReset&)
+{
+ dout(10) << "ReplicaWaitUpdates::react(const FullReset&)" << dendl;
+ return transit<NotActive>();
+}
+
// ----------------------- ActiveReplica -----------------------------------
ActiveReplica::ActiveReplica(my_context ctx) : my_base(ctx)
{
- dout(10) << " -- state -->> ActiveReplica" << dendl;
+ DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
+ dout(10) << "-- state -->> ActiveReplica" << dendl;
+ scrbr->on_replica_init(); // as we might have skipped ReplicaWaitUpdates
post_event(boost::intrusive_ptr<SchedReplica>(new SchedReplica{}));
}
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
dout(10) << "ActiveReplica::react(const SchedReplica&). is_preemptable? "
- << scrbr->get_preemptor()->is_preemptable() << dendl;
-
- if (scrbr->was_epoch_changed()) {
-
- dout(10) << "epoch changed" << dendl;
- post_event(boost::intrusive_ptr<EpochChanged>(new EpochChanged{}));
-
- } else if (scrbr->get_preemptor()->was_preempted()) {
+ << scrbr->get_preemptor().is_preemptable() << dendl;
+ if (scrbr->get_preemptor().was_preempted()) {
dout(10) << "replica scrub job preempted" << dendl;
- scrbr->send_replica_map(true);
- post_event(boost::intrusive_ptr<IntLocalMapDone>(new IntLocalMapDone{}));
-
- } else {
-
- // start or check progress of build_replica_map_chunk()
-
- auto ret = scrbr->build_replica_map_chunk();
- dout(15) << "ActiveReplica::react(const SchedReplica&) Ret: " << ret << dendl;
-
- if (ret == -EINPROGRESS) {
-
- // must wait for the backend to finish. No external event source.
- // build_replica_map_chunk() has already requeued a SchedReplica
- // event.
-
- dout(20) << "waiting for the backend..." << dendl;
-
- } else if (ret < 0) {
-
- // the existing code ignores this option, treating an error
- // report as a success.
- /// \todo what should we do here?
+ scrbr->send_replica_map(PreemptionNoted::preempted);
+ scrbr->replica_handling_done();
+ return transit<NotActive>();
+ }
- dout(1) << "Error! Aborting. ActiveReplica::react(const "
- "SchedReplica&) Ret: "
- << ret << dendl;
- post_event(boost::intrusive_ptr<IntLocalMapDone>(new IntLocalMapDone{}));
+ // start or check progress of build_replica_map_chunk()
- } else {
+ auto ret = scrbr->build_replica_map_chunk();
+ dout(15) << "ActiveReplica::react(const SchedReplica&) Ret: " << ret << dendl;
- // the local map was created. Send it to the primary.
+ if (ret == -EINPROGRESS) {
+ // must wait for the backend to finish. No external event source.
+ // build_replica_map_chunk() has already requeued a SchedReplica
+ // event.
- scrbr->send_replica_map(false); // 'false' == not preempted
- post_event(boost::intrusive_ptr<IntLocalMapDone>(new IntLocalMapDone{}));
- }
+ dout(20) << "waiting for the backend..." << dendl;
+ return discard_event();
}
- return discard_event();
-}
-sc::result ActiveReplica::react(const IntLocalMapDone&)
-{
- dout(10) << "ActiveReplica::react(const IntLocalMapDone&)" << dendl;
- DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
-
- scrbr->replica_handling_done();
- return transit<NotActive>();
-}
+ if (ret < 0) {
+ // the existing code ignores this option, treating an error
+ // report as a success.
+ dout(1) << "Error! Aborting. ActiveReplica::react(SchedReplica) Ret: " << ret
+ << dendl;
+ scrbr->replica_handling_done();
+ return transit<NotActive>();
+ }
-sc::result ActiveReplica::react(const InternalError&)
-{
- DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
- dout(1) << "Error! Aborting."
- << " ActiveReplica::react(const InternalError&) " << dendl;
+ // the local map was created. Send it to the primary.
+ scrbr->send_replica_map(PreemptionNoted::no_preemption);
scrbr->replica_handling_done();
return transit<NotActive>();
}
-sc::result ActiveReplica::react(const EpochChanged&)
+sc::result ActiveReplica::react(const IntervalChanged&)
{
DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases
- dout(10) << "ActiveReplica::react(const EpochChanged&) " << dendl;
+ dout(10) << "ActiveReplica::react(const IntervalChanged&) " << dendl;
- scrbr->send_replica_map(false);
+ scrbr->send_replica_map(PreemptionNoted::no_preemption);
scrbr->replica_handling_done();
return transit<NotActive>();
}
sc::result ActiveReplica::react(const FullReset&)
{
dout(10) << "ActiveReplica::react(const FullReset&)" << dendl;
- // caller takes care of this: scrbr->clear_pgscrub_state(false);
return transit<NotActive>();
}
MEV(RemotesReserved) ///< all replicas have granted our reserve request
MEV(ReservationFailure) ///< a reservation request has failed
-MEV(EpochChanged) ///< ... from what it was when this chunk started
+
+MEV(IntervalChanged) ///< ... from what it was when this chunk started
+
MEV(StartScrub) ///< initiate a new scrubbing session (relevant if we are a Primary)
MEV(AfterRepairScrub) ///< initiate a new scrubbing session. Only triggered at Recovery
///< completion.
///< scrub_snapshot_metadata()
MEV(AllChunksDone)
-MEV(StartReplica) ///< initiating replica scrub. replica_scrub_op() -> OSD Q ->
- ///< replica_scrub()
+MEV(StartReplica) ///< initiating replica scrub. replica_scrub_op() -> OSD Q ->
+ ///< replica_scrub()
+MEV(StartReplicaNoWait) ///< 'start replica' when there are no pending updates
+
MEV(SchedReplica)
MEV(ReplicaPushesUpd) ///< Update to active_pushes. 'active_pushes' represents recovery
///< that is in-flight to the local ObjectStore
PG* m_pg; // only used for dout messages
spg_t m_pg_id;
-
ScrubMachineListener* m_scrbr;
- void down_on_epoch_change(const EpochChanged&);
-
- void on_epoch_changed(const EpochChanged&);
-
void my_states() const;
-
void assert_not_active() const;
-
[[nodiscard]] bool is_reserving() const;
};
* queued "PGScrub" op.
* - a special end-of-recovery Primary scrub event ('AfterRepairScrub') that is
* not required to reserve resources.
- * - (for a replica) 'StartReplica', triggered by an incoming MOSDRepScrub msg.
+ * - (for a replica) 'StartReplica' or 'StartReplicaNoWait', triggered by an incoming
+ * MOSDRepScrub message.
*/
struct NotActive : sc::state<NotActive, ScrubMachine> {
explicit NotActive(my_context ctx);
- using reactions = mpl::list<sc::custom_reaction<EpochChanged>,
+ using reactions = mpl::list<sc::custom_reaction<IntervalChanged>,
sc::transition<StartScrub, ReservingReplicas>,
// a scrubbing that was initiated at recovery completion,
// and requires no resource reservations:
sc::transition<AfterRepairScrub, ActiveScrubbing>,
sc::transition<StartReplica, ReplicaWaitUpdates>,
- sc::custom_reaction<sc::event_base>>;
+ sc::transition<StartReplicaNoWait, ActiveReplica>>;
- sc::result react(const EpochChanged&);
- sc::result react(const sc::event_base&) // in the future: assert here
- {
- return discard_event();
- }
+ sc::result react(const IntervalChanged&);
};
struct ReservingReplicas : sc::state<ReservingReplicas, ScrubMachine> {
explicit ReservingReplicas(my_context ctx);
- using reactions = mpl::list<sc::custom_reaction<EpochChanged>,
+ using reactions = mpl::list<sc::custom_reaction<FullReset>,
// all replicas granted our resources request
sc::transition<RemotesReserved, ActiveScrubbing>,
- sc::custom_reaction<FullReset>,
sc::custom_reaction<ReservationFailure>>;
- sc::result react(const EpochChanged&);
sc::result react(const FullReset&);
+
+ /// at least one replica denied us the scrub resources we've requested
sc::result react(const ReservationFailure&);
};
// done scrubbing
sc::transition<AllChunksDone, NotActive>,
- sc::transition<EpochChanged,
- NotActive,
- ScrubMachine,
- &ScrubMachine::down_on_epoch_change>,
+ sc::custom_reaction<InternalError>,
sc::custom_reaction<FullReset>>;
sc::result react(const AllChunksDone&);
sc::result react(const FullReset&);
+ sc::result react(const InternalError&);
};
struct RangeBlocked : sc::state<RangeBlocked, ActiveScrubbing> {
struct BuildMap : sc::state<BuildMap, ActiveScrubbing> {
explicit BuildMap(my_context ctx);
+ // possible error scenarios:
+ // - an error reported by the backend will trigger an 'InternalError' event,
+ // handled by our parent state;
+ // - if preempted, we switch to DrainReplMaps, where we will wait for all
+ // replicas to send their maps before acknowledging the preemption;
+ // - an interval change will be handled by the relevant 'send-event' functions,
+ // and will translated into a 'FullReset' event.
using reactions =
mpl::list<sc::transition<IntBmPreempted, DrainReplMaps>,
sc::transition<InternalSchedScrub, BuildMap>, // looping, waiting
// for the backend to
// finish
- sc::custom_reaction<IntLocalMapDone>,
- sc::transition<InternalError, NotActive>>; // to discuss RRR
+ sc::custom_reaction<IntLocalMapDone>>;
sc::result react(const IntLocalMapDone&);
};
*/
struct ReplicaWaitUpdates : sc::state<ReplicaWaitUpdates, ScrubMachine> {
explicit ReplicaWaitUpdates(my_context ctx);
- using reactions =
- mpl::list<sc::custom_reaction<ReplicaPushesUpd>, sc::custom_reaction<EpochChanged>>;
+ using reactions = mpl::list<sc::custom_reaction<ReplicaPushesUpd>,
+ sc::custom_reaction<FullReset>,
+ sc::custom_reaction<IntervalChanged>>;
sc::result react(const ReplicaPushesUpd&);
- sc::result react(const EpochChanged&);
+ sc::result react(const IntervalChanged&);
+ sc::result react(const FullReset&);
};
struct ActiveReplica : sc::state<ActiveReplica, ScrubMachine> {
explicit ActiveReplica(my_context ctx);
- using reactions = mpl::list<sc::custom_reaction<EpochChanged>,
+ using reactions = mpl::list<sc::custom_reaction<IntervalChanged>,
sc::custom_reaction<SchedReplica>,
- sc::custom_reaction<IntLocalMapDone>,
- sc::custom_reaction<FullReset>,
- sc::custom_reaction<InternalError>>;
+ sc::custom_reaction<FullReset>>;
sc::result react(const SchedReplica&);
- sc::result react(const EpochChanged&);
- sc::result react(const IntLocalMapDone&);
+ sc::result react(const IntervalChanged&);
sc::result react(const FullReset&);
- sc::result react(const InternalError&);
};
} // namespace Scrub
/// used when PgScrubber is called by the scrub-machine, to tell the FSM
/// how to continue
enum class FsmNext { do_discard, next_chunk, goto_notactive };
+enum class PreemptionNoted { no_preemption, preempted };
/// the interface exposed by the PgScrubber into its internal
/// preemption_data object
virtual eversion_t get_last_update_applied() const = 0;
- virtual void requeue_waiting() const = 0;
-
virtual int pending_active_pushes() const = 0;
virtual int build_primary_map_chunk() = 0;
virtual void replica_handling_done() = 0;
+ // no virtual void discard_reservation_by_primary() = 0;
+
/// the version of 'scrub_clear_state()' that does not try to invoke FSM services
/// (thus can be called from FSM reactions)
- virtual void clear_pgscrub_state(bool keep_repair_state) = 0;
+ virtual void clear_pgscrub_state() = 0;
virtual void add_delayed_scheduling() = 0;
virtual Scrub::FsmNext on_digest_updates() = 0;
- virtual void send_replica_map(bool was_preempted) = 0;
-
- virtual void replica_update_start_epoch() = 0;
+ virtual void send_replica_map(Scrub::PreemptionNoted was_preempted) = 0;
[[nodiscard]] virtual bool has_pg_marked_new_updates() const = 0;
[[nodiscard]] virtual bool was_epoch_changed() const = 0;
- virtual Scrub::preemption_t* get_preemptor() = 0;
+ virtual Scrub::preemption_t& get_preemptor() = 0;
/**
* a "technical" collection of the steps performed once all
// --------------- triggering state-machine events:
- virtual void send_start_scrub() = 0;
+ virtual void send_start_scrub(epoch_t epoch_queued) = 0;
- virtual void send_start_after_repair() = 0;
+ virtual void send_start_after_repair(epoch_t epoch_queued) = 0;
- virtual void send_scrub_resched() = 0;
+ virtual void send_scrub_resched(epoch_t epoch_queued) = 0;
- virtual void replica_scrub_resched(epoch_t epoch_queued) = 0;
-
- virtual void active_pushes_notification() = 0;
+ virtual void active_pushes_notification(epoch_t epoch_queued) = 0;
virtual void update_applied_notification(epoch_t epoch_queued) = 0;
- virtual void digest_update_notification() = 0;
+ virtual void digest_update_notification(epoch_t epoch_queued) = 0;
+
+ virtual void send_scrub_unblock(epoch_t epoch_queued) = 0;
+
+ virtual void send_replica_maps_ready(epoch_t epoch_queued) = 0;
- virtual void send_scrub_unblock() = 0;
+ virtual void send_replica_pushes_upd(epoch_t epoch_queued) = 0;
- virtual void send_replica_maps_ready() = 0;
+ virtual void send_start_replica(epoch_t epoch_queued) = 0;
- virtual void send_replica_pushes_upd() = 0;
+ virtual void send_sched_replica(epoch_t epoch_queued) = 0;
// --------------------------------------------------
virtual void replica_scrub_op(OpRequestRef op) = 0;
- virtual void replica_scrub(epoch_t epoch_queued) = 0;
-
virtual void set_op_parameters(requested_scrub_t&) = 0;
- virtual void scrub_clear_state(bool keep_repair_state = false) = 0;
+ virtual void scrub_clear_state() = 0;
virtual void handle_query_state(ceph::Formatter* f) = 0;
virtual void dump(ceph::Formatter* f) const = 0;
/**
- * we allow some number of preemptions of the scrub, which mean we do
- * not block. Then we start to block. Once we start blocking, we do
- * not stop until the scrub range is completed.
+ * Return true if soid is currently being scrubbed and pending IOs should block.
+ * May have a side effect of preempting an in-progress scrub -- will return false
+ * in that case.
+ *
+ * @param soid object to check for ongoing scrub
+ * @return boolean whether a request on soid should block until scrub completion
*/
virtual bool write_blocked_by_scrub(const hobject_t& soid) = 0;
- /// true if the given range intersects the scrub interval in any way
+ /// Returns whether any objects in the range [begin, end] are being scrubbed
virtual bool range_intersects_scrub(const hobject_t& start, const hobject_t& end) = 0;
/// the op priority, taken from the primary's request message
* the version of 'scrub_clear_state()' that does not try to invoke FSM services
* (thus can be called from FSM reactions)
*/
- virtual void clear_pgscrub_state(bool keep_repair_state) = 0;
+ virtual void clear_pgscrub_state() = 0;
/**
* triggers the 'RemotesReserved' (all replicas granted scrub resources)
* state-machine event
*/
- virtual void send_remotes_reserved() = 0;
+ virtual void send_remotes_reserved(epoch_t epoch_queued) = 0;
/**
* triggers the 'ReservationFailure' (at least one replica denied us the requested
* resources) state-machine event
*/
- virtual void send_reservation_failure() = 0;
+ virtual void send_reservation_failure(epoch_t epoch_queued) = 0;
virtual void cleanup_store(ObjectStore::Transaction* t) = 0;