From: Ronen Friedman Date: Thu, 12 Nov 2020 11:12:48 +0000 (+0200) Subject: osd: refactoring scrub: the scrubber state-machine X-Git-Tag: v16.1.0~270^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a83e4c294ea40c72a400889811e911d409157c81;p=ceph.git osd: refactoring scrub: the scrubber state-machine The scrubber's state-machine Note: A diagram of the state-machine implemented: (to update) https://drive.google.com/file/d/1KWV7tAfq0rhAtj0Fv94EWt_QykM9-hka/view?usp=sharing Signed-off-by: Ronen Friedman --- diff --git a/src/osd/scrub_machine.cc b/src/osd/scrub_machine.cc new file mode 100644 index 000000000000..53c4427d489f --- /dev/null +++ b/src/osd/scrub_machine.cc @@ -0,0 +1,595 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "scrub_machine.h" + +#include + +#include "OSD.h" +#include "OpRequest.h" +#include "ScrubStore.h" +#include "scrub_machine_lstnr.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout << " scrubberFSM " + + +using namespace std::chrono; +using namespace std::chrono_literals; +namespace sc = boost::statechart; + +#define DECLARE_LOCALS \ + ScrubMachineListener* scrbr = context().m_scrbr; \ + std::ignore = scrbr; \ + auto pg_id = context().m_pg_id; \ + std::ignore = pg_id; + +namespace Scrub { + +// --------- trace/debug auxiliaries ------------------------------- + +// development code. To be removed +void on_event_creation(std::string_view nm) +{ + dout(20) << " scrubberFSM event: --vvvv---- " << nm << dendl; +} + +// development code. To be removed +void on_event_discard(std::string_view nm) +{ + dout(20) << " scrubberFSM event: --^^^^---- " << nm << dendl; +} + +void ScrubMachine::my_states() const +{ + for (auto si = state_begin(); si != state_end(); ++si) { + const auto& siw{*si}; // prevents a warning re side-effects + dout(20) << __func__ << " : scrub-states : " << typeid(siw).name() << dendl; + } +} + +void ScrubMachine::assert_not_active() const +{ + ceph_assert(state_cast()); +} + +bool ScrubMachine::is_reserving() const +{ + return state_cast(); +} + +// for the rest of the code in this file - we know what PG we are dealing with: +#undef dout_prefix +#define dout_prefix _prefix(_dout, this->context().m_pg) +template static ostream& _prefix(std::ostream* _dout, T* t) +{ + return t->gen_prefix(*_dout) << " scrubberFSM pg(" << t->pg_id << ") "; +} + +// ////////////// the actual actions + +// ----------------------- NotActive ----------------------------------------- + +NotActive::NotActive(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> NotActive" << dendl; +} + +sc::result NotActive::react(const EpochChanged&) +{ + dout(15) << "NotActive::react(const EpochChanged&)" << dendl; + return discard_event(); +} + +// ----------------------- ReservingReplicas --------------------------------- + +ReservingReplicas::ReservingReplicas(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> ReservingReplicas" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + scrbr->reserve_replicas(); +} + +/** + * at least one replica denied us the scrub resources we've requested + */ +sc::result ReservingReplicas::react(const ReservationFailure&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ReservingReplicas::react(const ReservationFailure&)" << dendl; + + // the Scrubber must release all resources and abort the scrubbing + scrbr->clear_pgscrub_state(false); + return transit(); +} + +sc::result ReservingReplicas::react(const EpochChanged&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ReservingReplicas::react(const EpochChanged&)" << dendl; + + // the Scrubber must release all resources and abort the scrubbing + scrbr->clear_pgscrub_state(false); + return transit(); +} + +/** + * note: the event poster is handling the scrubber reset + */ +sc::result ReservingReplicas::react(const FullReset&) +{ + dout(10) << "ReservingReplicas::react(const FullReset&)" << dendl; + return transit(); +} + +// ----------------------- ActiveScrubbing ----------------------------------- + +ActiveScrubbing::ActiveScrubbing(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> ActiveScrubbing" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + scrbr->on_init(); +} + +/** + * upon exiting the Active state + */ +ActiveScrubbing::~ActiveScrubbing() +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(15) << __func__ << dendl; + scrbr->unreserve_replicas(); +} + +void ScrubMachine::down_on_epoch_change(const EpochChanged&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << __func__ << dendl; + scrbr->unreserve_replicas(); +} + +void ScrubMachine::on_epoch_changed(const EpochChanged&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << __func__ << dendl; + // the Scrubber must release all resources and abort the scrubbing + scrbr->clear_pgscrub_state(false); +} + +sc::result ActiveScrubbing::react(const FullReset&) +{ + dout(10) << "ActiveScrubbing::react(const FullReset&)" << dendl; + // caller takes care of this: scrbr->clear_pgscrub_state(false); + return transit(); +} + +// ----------------------- RangeBlocked ----------------------------------- + +/* + * Blocked. Will be released by kick_object_context_blocked() (or upon + * an abort) + */ +RangeBlocked::RangeBlocked(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/RangeBlocked" << dendl; +} + +// ----------------------- PendingTimer ----------------------------------- + +/** + * Sleeping till timer reactivation - or just requeuing + */ +PendingTimer::PendingTimer(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/PendingTimer" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + scrbr->add_delayed_scheduling(); +} + +// ----------------------- NewChunk ----------------------------------- + +/** + * Preconditions: + * - preemption data was set + * - epoch start was updated + */ +NewChunk::NewChunk(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/NewChunk" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + scrbr->get_preemptor()->adjust_parameters(); + + // choose range to work on + bool got_a_chunk = scrbr->select_range(); + if (got_a_chunk) { + dout(15) << __func__ << " selection OK" << dendl; + post_event(boost::intrusive_ptr(new SelectedChunkFree{})); + } else { + dout(10) << __func__ << " selected chunk is busy" << dendl; + // wait until we are available (transitioning to Blocked) + post_event(boost::intrusive_ptr(new ChunkIsBusy{})); + } +} + +sc::result NewChunk::react(const SelectedChunkFree&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "NewChunk::react(const SelectedChunkFree&)" << dendl; + + scrbr->set_subset_last_update(scrbr->search_log_for_updates()); + return transit(); +} + +// ----------------------- WaitPushes ----------------------------------- + +WaitPushes::WaitPushes(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/WaitPushes" << dendl; + post_event(boost::intrusive_ptr(new ActivePushesUpd{})); +} + +/* + * Triggered externally, by the entity that had an update re pushes + */ +sc::result WaitPushes::react(const ActivePushesUpd&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitPushes::react(const ActivePushesUpd&) pending_active_pushes: " + << scrbr->pending_active_pushes() << dendl; + + if (!scrbr->pending_active_pushes()) { + // done waiting + return transit(); + } + + return discard_event(); +} + +// ----------------------- WaitLastUpdate ----------------------------------- + +WaitLastUpdate::WaitLastUpdate(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/WaitLastUpdate" << dendl; + post_event(boost::intrusive_ptr(new UpdatesApplied{})); +} + +void WaitLastUpdate::on_new_updates(const UpdatesApplied&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitLastUpdate::on_new_updates(const UpdatesApplied&)" << dendl; + + if (scrbr->has_pg_marked_new_updates()) { + post_event(boost::intrusive_ptr(new InternalAllUpdates{})); + } else { + // will be requeued by op_applied + dout(10) << "wait for EC read/modify/writes to queue" << dendl; + } +} + +/* + * request maps from the replicas in the acting set + */ +sc::result WaitLastUpdate::react(const InternalAllUpdates&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitLastUpdate::react(const InternalAllUpdates&)" << dendl; + + if (scrbr->was_epoch_changed()) { + dout(10) << "WaitLastUpdate: epoch!" << dendl; + post_event(boost::intrusive_ptr(new EpochChanged{})); + return discard_event(); + } + + dout(10) << "WaitLastUpdate::react(const InternalAllUpdates&) " + << scrbr->get_preemptor()->is_preemptable() << dendl; + scrbr->get_replicas_maps(scrbr->get_preemptor()->is_preemptable()); + return transit(); +} + +// ----------------------- BuildMap ----------------------------------- + +BuildMap::BuildMap(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/BuildMap" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(15) << __func__ << " same epoch? " << (scrbr->was_epoch_changed() ? "no" : "yes") + << dendl; + + if (scrbr->was_epoch_changed()) { + + post_event(boost::intrusive_ptr(new EpochChanged{})); + + } else if (scrbr->get_preemptor()->was_preempted()) { + + // we were preempted, either directly or by a replica + dout(10) << __func__ << " preempted!!!" << dendl; + scrbr->mark_local_map_ready(); + post_event(boost::intrusive_ptr(new IntBmPreempted{})); + + } else { + + auto ret = scrbr->build_primary_map_chunk(); + + if (ret == -EINPROGRESS) { + // must wait for the backend to finish. No specific event provided. + // build_primary_map_chunk() has already requeued us. + dout(20) << "waiting for the backend..." << dendl; + + } else if (ret < 0) { + + dout(10) << "BuildMap::BuildMap() Error! Aborting. Ret: " << ret << dendl; + scrbr->mark_local_map_ready(); + post_event(boost::intrusive_ptr(new InternalError{})); + + } else { + + // the local map was created + post_event(boost::intrusive_ptr(new IntLocalMapDone{})); + } + } +} + +sc::result BuildMap::react(const IntLocalMapDone&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "BuildMap::react(const IntLocalMapDone&)" << dendl; + + scrbr->mark_local_map_ready(); + return transit(); +} + +// ----------------------- DrainReplMaps ----------------------------------- + +DrainReplMaps::DrainReplMaps(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/DrainReplMaps" << dendl; + // we may have received all maps already. Send the event that will make us check. + post_event(boost::intrusive_ptr(new GotReplicas{})); +} + +sc::result DrainReplMaps::react(const GotReplicas&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "DrainReplMaps::react(const GotReplicas&)" << dendl; + + if (scrbr->are_all_maps_available()) { + // NewChunk will handle the preemption that brought us to this state + return transit(); + } + + dout(10) << "DrainReplMaps::react(const GotReplicas&): still draining incoming maps: " + << scrbr->dump_awaited_maps() << dendl; + return discard_event(); +} + +// ----------------------- WaitReplicas ----------------------------------- + +WaitReplicas::WaitReplicas(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/WaitReplicas" << dendl; + post_event(boost::intrusive_ptr(new GotReplicas{})); +} + +sc::result WaitReplicas::react(const GotReplicas&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitReplicas::react(const GotReplicas&)" << dendl; + + if (scrbr->are_all_maps_available()) { + dout(10) << "WaitReplicas::react(const GotReplicas&) got all" << dendl; + + // were we preempted? + if (scrbr->get_preemptor()->disable_and_test()) { // a test&set + + + dout(10) << "WaitReplicas::react(const GotReplicas&) PREEMPTED!" << dendl; + return transit(); + + } else { + + dout(8) << "got the replicas!" << dendl; + scrbr->maps_compare_n_cleanup(); + return transit(); + } + } else { + return discard_event(); + } +} + +// ----------------------- WaitDigestUpdate ----------------------------------- + +WaitDigestUpdate::WaitDigestUpdate(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> Act/WaitDigestUpdate" << dendl; + // perform an initial check: maybe we already + // have all the updates we need: + // (note that DigestUpdate is usually an external event) + post_event(boost::intrusive_ptr(new DigestUpdate{})); +} + +sc::result WaitDigestUpdate::react(const DigestUpdate&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "WaitDigestUpdate::react(const DigestUpdate&)" << dendl; + + switch (scrbr->on_digest_updates()) { + + case Scrub::FsmNext::goto_notactive: + // scrubbing is done + return transit(); + + case Scrub::FsmNext::next_chunk: + // go get the next chunk + return transit(); + + case Scrub::FsmNext::do_discard: + // still waiting for more updates + return discard_event(); + } + __builtin_unreachable(); // Prevent a gcc warning. + // Adding a phony 'default:' above is wrong: (a) prevents a + // warning if FsmNext is extended, and (b) elicits a correct + // warning from Clang +} + +ScrubMachine::ScrubMachine(PG* pg, ScrubMachineListener* pg_scrub) + : m_pg{pg}, m_pg_id{pg->pg_id}, m_scrbr{pg_scrub} +{ + dout(15) << "ScrubMachine created " << m_pg_id << dendl; +} + +ScrubMachine::~ScrubMachine() +{ + dout(20) << "~ScrubMachine " << m_pg_id << dendl; +} + +// -------- for replicas ----------------------------------------------------- + +// ----------------------- ReplicaWaitUpdates -------------------------------- + +ReplicaWaitUpdates::ReplicaWaitUpdates(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> ReplicaWaitUpdates" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + scrbr->on_replica_init(); + post_event(boost::intrusive_ptr(new ReplicaPushesUpd{})); +} + +sc::result ReplicaWaitUpdates::react(const EpochChanged&) +{ + dout(10) << "ReplicaWaitUpdates::react(const EpochChanged&)" << dendl; + return transit(); +} + +/* + * Triggered externally, by the entity that had an update re pushes + */ +sc::result ReplicaWaitUpdates::react(const ReplicaPushesUpd&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ReplicaWaitUpdates::react(const ReplicaPushesUpd&): " + << scrbr->pending_active_pushes() << dendl; + dout(8) << "same epoch? " << !scrbr->was_epoch_changed() << dendl; + + if (scrbr->was_epoch_changed()) { + + post_event(boost::intrusive_ptr(new EpochChanged{})); + + } else if (scrbr->pending_active_pushes() == 0) { + + // done waiting + scrbr->replica_update_start_epoch(); + return transit(); + } + + return discard_event(); +} + +// ----------------------- ActiveReplica ----------------------------------- + +ActiveReplica::ActiveReplica(my_context ctx) : my_base(ctx) +{ + dout(10) << " -- state -->> ActiveReplica" << dendl; + post_event(boost::intrusive_ptr(new SchedReplica{})); +} + +sc::result ActiveReplica::react(const SchedReplica&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ActiveReplica::react(const SchedReplica&). is_preemptable? " + << scrbr->get_preemptor()->is_preemptable() << dendl; + + if (scrbr->was_epoch_changed()) { + + dout(10) << "epoch changed" << dendl; + post_event(boost::intrusive_ptr(new EpochChanged{})); + + } else if (scrbr->get_preemptor()->was_preempted()) { + + dout(10) << "replica scrub job preempted" << dendl; + + scrbr->send_replica_map(true); + post_event(boost::intrusive_ptr(new IntLocalMapDone{})); + + } else { + + // start or check progress of build_replica_map_chunk() + + auto ret = scrbr->build_replica_map_chunk(); + dout(15) << "ActiveReplica::react(const SchedReplica&) Ret: " << ret << dendl; + + if (ret == -EINPROGRESS) { + + // must wait for the backend to finish. No external event source. + // build_replica_map_chunk() has already requeued a SchedReplica + // event. + + dout(20) << "waiting for the backend..." << dendl; + + } else if (ret < 0) { + + // the existing code ignores this option, treating an error + // report as a success. + /// \todo what should we do here? + + dout(1) << "Error! Aborting. ActiveReplica::react(const " + "SchedReplica&) Ret: " + << ret << dendl; + post_event(boost::intrusive_ptr(new IntLocalMapDone{})); + + } else { + + // the local map was created. Send it to the primary. + + scrbr->send_replica_map(false); // 'false' == not preempted + post_event(boost::intrusive_ptr(new IntLocalMapDone{})); + } + } + return discard_event(); +} + +sc::result ActiveReplica::react(const IntLocalMapDone&) +{ + dout(10) << "ActiveReplica::react(const IntLocalMapDone&)" << dendl; + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + + scrbr->replica_handling_done(); + return transit(); +} + +sc::result ActiveReplica::react(const InternalError&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(1) << "Error! Aborting." + << " ActiveReplica::react(const InternalError&) " << dendl; + + scrbr->replica_handling_done(); + return transit(); +} + +sc::result ActiveReplica::react(const EpochChanged&) +{ + DECLARE_LOCALS; // 'scrbr' & 'pg_id' aliases + dout(10) << "ActiveReplica::react(const EpochChanged&) " << dendl; + + scrbr->send_replica_map(false); + scrbr->replica_handling_done(); + return transit(); +} + +/** + * the event poster is handling the scrubber reset + */ +sc::result ActiveReplica::react(const FullReset&) +{ + dout(10) << "ActiveReplica::react(const FullReset&)" << dendl; + // caller takes care of this: scrbr->clear_pgscrub_state(false); + return transit(); +} + +} // namespace Scrub diff --git a/src/osd/scrub_machine.h b/src/osd/scrub_machine.h new file mode 100644 index 000000000000..ae1f7e812a1e --- /dev/null +++ b/src/osd/scrub_machine.h @@ -0,0 +1,327 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/version.h" +#include "include/Context.h" + +#include "scrub_machine_lstnr.h" +#include "scrubber_common.h" + +using namespace std::string_literals; + +class PG; // holding a pointer to that one - just for testing +class PgScrubber; +namespace Scrub { + +namespace sc = ::boost::statechart; +namespace mpl = ::boost::mpl; + +// +// EVENTS +// + +void on_event_creation(std::string_view nm); +void on_event_discard(std::string_view nm); + +#define MEV(E) \ + struct E : sc::event { \ + inline static int actv{0}; \ + E() \ + { \ + if (!actv++) \ + on_event_creation(#E); \ + } \ + ~E() \ + { \ + if (!--actv) \ + on_event_discard(#E); \ + } \ + }; + +MEV(RemotesReserved) ///< all replicas have granted our reserve request +MEV(ReservationFailure) ///< a reservation request has failed +MEV(EpochChanged) ///< ... from what it was when this chunk started +MEV(StartScrub) ///< initiate a new scrubbing session (relevant if we are a Primary) +MEV(AfterRepairScrub) ///< initiate a new scrubbing session. Only triggered at Recovery + ///< completion. +MEV(Unblocked) ///< triggered when the PG unblocked an object that was marked for + ///< scrubbing. Via the PGScrubUnblocked op +MEV(InternalSchedScrub) +MEV(SelectedChunkFree) +MEV(ChunkIsBusy) +MEV(ActivePushesUpd) ///< Update to active_pushes. 'active_pushes' represents recovery + ///< that is in-flight to the local ObjectStore +MEV(UpdatesApplied) // external +MEV(InternalAllUpdates) ///< the internal counterpart of UpdatesApplied +MEV(GotReplicas) ///< got a map from a replica + +MEV(IntBmPreempted) ///< internal - BuildMap preempted. Required, as detected within the + ///< ctor +MEV(InternalError) + +MEV(IntLocalMapDone) + +MEV(DigestUpdate) ///< external. called upon success of a MODIFY op. See + ///< scrub_snapshot_metadata() +MEV(AllChunksDone) + +MEV(StartReplica) ///< initiating replica scrub. replica_scrub_op() -> OSD Q -> + ///< replica_scrub() +MEV(SchedReplica) +MEV(ReplicaPushesUpd) ///< Update to active_pushes. 'active_pushes' represents recovery + ///< that is in-flight to the local ObjectStore + +MEV(FullReset) ///< guarantee that the FSM is in the quiescent state (i.e. NotActive) + + +struct NotActive; ///< the quiescent state. No active scrubbing. +struct ReservingReplicas; ///< securing scrub resources from replicas' OSDs +struct ActiveScrubbing; ///< the active state for a Primary. A sub-machine. +struct ReplicaWaitUpdates; ///< an active state for a replica. Waiting for all active + ///< operations to finish. +struct ActiveReplica; ///< an active state for a replica. + + +class ScrubMachine : public sc::state_machine { + public: + friend class PgScrubber; + + public: + explicit ScrubMachine(PG* pg, ScrubMachineListener* pg_scrub); + ~ScrubMachine(); + + PG* m_pg; // only used for dout messages + spg_t m_pg_id; + + ScrubMachineListener* m_scrbr; + + void down_on_epoch_change(const EpochChanged&); + + void on_epoch_changed(const EpochChanged&); + + void my_states() const; + + void assert_not_active() const; + + [[nodiscard]] bool is_reserving() const; +}; + +/** + * The Scrubber's base (quiescent) state. + * Scrubbing is triggered by one of the following events: + * - (standard scenario for a Primary): 'StartScrub'. Initiates the OSDs resources + * reservation process. Will be issued by PG::scrub(), following a + * queued "PGScrub" op. + * - a special end-of-recovery Primary scrub event ('AfterRepairScrub') that is + * not required to reserve resources. + * - (for a replica) 'StartReplica', triggered by an incoming MOSDRepScrub msg. + */ +struct NotActive : sc::state { + explicit NotActive(my_context ctx); + + using reactions = mpl::list, + sc::transition, + // a scrubbing that was initiated at recovery completion, + // and requires no resource reservations: + sc::transition, + sc::transition, + sc::custom_reaction>; + + sc::result react(const EpochChanged&); + sc::result react(const sc::event_base&) // in the future: assert here + { + return discard_event(); + } +}; + +struct ReservingReplicas : sc::state { + + explicit ReservingReplicas(my_context ctx); + using reactions = mpl::list, + // all replicas granted our resources request + sc::transition, + sc::custom_reaction, + sc::custom_reaction>; + + sc::result react(const EpochChanged&); + sc::result react(const FullReset&); + sc::result react(const ReservationFailure&); +}; + + +// the "active" sub-states + +struct RangeBlocked; ///< the objects range is blocked +struct PendingTimer; ///< either delaying the scrub by some time and requeuing, or just + ///< requeue +struct NewChunk; ///< select a chunk to scrub, and verify its availability +struct WaitPushes; +struct WaitLastUpdate; +struct BuildMap; +struct DrainReplMaps; ///< a problem during BuildMap. Wait for all replicas to report, + ///< then restart. +struct WaitReplicas; ///< wait for all replicas to report + +struct ActiveScrubbing : sc::state { + + explicit ActiveScrubbing(my_context ctx); + ~ActiveScrubbing(); + + using reactions = mpl::list< + // done scrubbing + sc::transition, + + sc::transition, + sc::custom_reaction>; + + sc::result react(const AllChunksDone&); + sc::result react(const FullReset&); +}; + +struct RangeBlocked : sc::state { + explicit RangeBlocked(my_context ctx); + using reactions = mpl::list>; +}; + +struct PendingTimer : sc::state { + + explicit PendingTimer(my_context ctx); + + using reactions = mpl::list>; +}; + +struct NewChunk : sc::state { + + explicit NewChunk(my_context ctx); + + using reactions = mpl::list, + sc::custom_reaction>; + + sc::result react(const SelectedChunkFree&); +}; + +/** + * initiate the update process for this chunk + * + * Wait fo 'active_pushes' to clear. + * 'active_pushes' represents recovery that is in-flight to the local Objectstore, hence + * scrub waits until the correct data is readable (in-flight data to the Objectstore is + * not readable until written to disk, termed 'applied' here) + */ +struct WaitPushes : sc::state { + + explicit WaitPushes(my_context ctx); + + using reactions = mpl::list>; + + sc::result react(const ActivePushesUpd&); +}; + +struct WaitLastUpdate : sc::state { + + explicit WaitLastUpdate(my_context ctx); + + void on_new_updates(const UpdatesApplied&); + + using reactions = mpl::list, + sc::in_state_reaction>; + + sc::result react(const InternalAllUpdates&); +}; + +struct BuildMap : sc::state { + explicit BuildMap(my_context ctx); + + using reactions = + mpl::list, + sc::transition, // looping, waiting + // for the backend to + // finish + sc::custom_reaction, + sc::transition>; // to discuss RRR + + sc::result react(const IntLocalMapDone&); +}; + +/* + * "drain" scrub-maps responses from replicas + */ +struct DrainReplMaps : sc::state { + explicit DrainReplMaps(my_context ctx); + + using reactions = + mpl::list // all replicas are accounted for + >; + + sc::result react(const GotReplicas&); +}; + +struct WaitReplicas : sc::state { + explicit WaitReplicas(my_context ctx); + + using reactions = + mpl::list, sc::deferral>; + + sc::result react(const GotReplicas&); +}; + +struct WaitDigestUpdate : sc::state { + explicit WaitDigestUpdate(my_context ctx); + + using reactions = mpl::list>; + sc::result react(const DigestUpdate&); +}; + +// ----------------------------- the "replica active" states ----------------------- + +/* + * Waiting for 'active_pushes' to complete + * + * When in this state: + * - the details of the Primary's request were internalized by PgScrubber; + * - 'active' scrubbing is set + */ +struct ReplicaWaitUpdates : sc::state { + explicit ReplicaWaitUpdates(my_context ctx); + using reactions = + mpl::list, sc::custom_reaction>; + + sc::result react(const ReplicaPushesUpd&); + sc::result react(const EpochChanged&); +}; + + +struct ActiveReplica : sc::state { + explicit ActiveReplica(my_context ctx); + using reactions = mpl::list, + sc::custom_reaction, + sc::custom_reaction, + sc::custom_reaction, + sc::custom_reaction>; + + sc::result react(const SchedReplica&); + sc::result react(const EpochChanged&); + sc::result react(const IntLocalMapDone&); + sc::result react(const FullReset&); + sc::result react(const InternalError&); +}; + +} // namespace Scrub