From: Radoslaw Zarzynski <rzarzyns@redhat.com>
Date: Tue, 28 Apr 2020 14:45:52 +0000 (+0200)
Subject: crimson/osd: bring the boost::statechart-based FSM for backfill.
X-Git-Tag: v16.1.0~1720^2~17

crimson/osd: bring the boost::statechart-based FSM for backfill.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
---

diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt
index b740e45b0fab..0e85a23f37c4 100644
--- a/src/crimson/osd/CMakeLists.txt
+++ b/src/crimson/osd/CMakeLists.txt
@@ -1,4 +1,5 @@
 add_executable(crimson-osd
+  backfill_state.cc
   ec_backend.cc
   heartbeat.cc
   main.cc
diff --git a/src/crimson/osd/backfill_facades.h b/src/crimson/osd/backfill_facades.h
new file mode 100644
index 000000000000..41faca5b9201
--- /dev/null
+++ b/src/crimson/osd/backfill_facades.h
@@ -0,0 +1,57 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/osd/backfill_state.h"
+#include "crimson/osd/pg.h"
+#include "osd/PeeringState.h"
+
+namespace crimson::osd {
+
+// PeeringFacade -- a facade (in the GoF-defined meaning) simplifying
+// the interface of PeeringState. The motivation is to have an inventory
+// of behaviour that must be provided by a unit test's mock.
+struct BackfillState::PeeringFacade {
+  PeeringState& peering_state;
+
+  decltype(auto) earliest_backfill() const {
+    return peering_state.earliest_backfill();
+  }
+
+  decltype(auto) get_backfill_targets() const {
+    return peering_state.get_backfill_targets();
+  }
+
+  decltype(auto) get_peer_info(pg_shard_t peer) const {
+    return peering_state.get_peer_info(peer);
+  }
+
+  decltype(auto) get_info() const {
+    return peering_state.get_info();
+  }
+
+  decltype(auto) get_pg_log() const {
+    return peering_state.get_pg_log();
+  }
+
+  bool is_backfill_target(pg_shard_t peer) const {
+    return peering_state.is_backfill_target(peer);
+  }
+
+  void update_complete_backfill_object_stats(const hobject_t &hoid,
+                                             const pg_stat_t &stats) {
+    return peering_state.update_complete_backfill_object_stats(hoid, stats);
+  }
+};
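+
+// For illustration only (not part of this change): a unit test could
+// substitute a mock that provides the same inventory, e.g.:
+//
+//   struct MockPeeringFacade {
+//     hobject_t earliest_backfill() const;
+//     std::set<pg_shard_t> get_backfill_targets() const;
+//     // ... and so on, mirroring the members of PeeringFacade above
+//   };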
+
+// PGFacade -- a facade (in the GoF-defined meaning) simplifying the huge
+// interface of crimson's PG class. The motivation is to have an inventory
+// of behaviour that must be provided by a unit test's mock.
+struct BackfillState::PGFacade {
+  PG& pg;
+
+  decltype(auto) get_projected_last_update() const {
+    return pg.projected_last_update;
+  }
+};
+
+} // namespace crimson::osd
diff --git a/src/crimson/osd/backfill_state.cc b/src/crimson/osd/backfill_state.cc
new file mode 100644
index 000000000000..7181e31a3a63
--- /dev/null
+++ b/src/crimson/osd/backfill_state.cc
@@ -0,0 +1,533 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+
+#include "crimson/osd/backfill_state.h"
+#include "crimson/osd/backfill_facades.h"
+#include "crimson/osd/pg.h"
+#include "osd/PeeringState.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace crimson::osd {
+
+BackfillState::BackfillState(
+  BackfillState::BackfillListener& backfill_listener,
+  std::unique_ptr<PeeringFacade> peering_state,
+  std::unique_ptr<PGFacade> pg)
+  : backfill_machine(*this,
+                     backfill_listener,
+                     std::move(peering_state),
+                     std::move(pg)),
+    progress_tracker(
+      std::make_unique<ProgressTracker>(backfill_machine))
+{
+  logger().debug("{}:{}", __func__, __LINE__);
+  backfill_machine.initiate();
+}
+
+BackfillState::~BackfillState() = default;
+
+BackfillState::BackfillMachine::BackfillMachine(
+  BackfillState& backfill_state,
+  BackfillState::BackfillListener& backfill_listener,
+  std::unique_ptr<PeeringFacade> peering_state,
+  std::unique_ptr<PGFacade> pg)
+  : backfill_state(backfill_state),
+    backfill_listener(backfill_listener),
+    peering_state(std::move(peering_state)),
+    pg(std::move(pg))
+{}
+
+BackfillState::BackfillMachine::~BackfillMachine() = default;
+
+BackfillState::Initial::Initial(my_context ctx)
+  : my_base(ctx)
+{
+  backfill_state().last_backfill_started = peering_state().earliest_backfill();
+  logger().debug("{}: bft={} from {}",
+                 __func__, peering_state().get_backfill_targets(),
+                 backfill_state().last_backfill_started);
+  for (const auto& bt : peering_state().get_backfill_targets()) {
+    logger().debug("{}: target shard {} from {}",
+                   __func__, bt, peering_state().get_peer_info(bt).last_backfill);
+  }
+  ceph_assert(peering_state().get_backfill_targets().size());
+  ceph_assert(!backfill_state().last_backfill_started.is_max());
+}
+
+boost::statechart::result
+BackfillState::Initial::react(const BackfillState::Triggered& evt)
+{
+  logger().debug("{}: backfill triggered", __func__);
+  ceph_assert(backfill_state().last_backfill_started == \
+              peering_state().earliest_backfill());
+  // initialize BackfillIntervals
+  for (const auto& bt : peering_state().get_backfill_targets()) {
+    backfill_state().peer_backfill_info[bt].reset(
+      peering_state().get_peer_info(bt).last_backfill);
+  }
+  backfill_state().backfill_info.reset(backfill_state().last_backfill_started);
+  if (Enqueuing::all_enqueued(peering_state(),
+                              backfill_state().backfill_info,
+                              backfill_state().peer_backfill_info)) {
+    logger().debug("{}: switching to Done state", __func__);
+    return transit<Done>();
+  } else {
+    logger().debug("{}: switching to Enqueuing state", __func__);
+    return transit<Enqueuing>();
+  }
+}
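+
+// Illustration (not part of this change): the machine is purely
+// event-driven. The glue code owning BackfillState reacts to external
+// completions by posting events into backfill_machine, roughly:
+//
+//   backfill_machine.process_event(Triggered{});
+//   // ... and once a requested scan finishes:
+//   backfill_machine.process_event(PrimaryScanned{ /* interval */ });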
+
+// -- Enqueuing
+void BackfillState::Enqueuing::maybe_update_range()
+{
+  if (auto& primary_bi = backfill_state().backfill_info;
+      primary_bi.version >= pg().get_projected_last_update()) {
+    logger().info("{}: bi is current", __func__);
+    ceph_assert(primary_bi.version == pg().get_projected_last_update());
+  } else if (primary_bi.version >= peering_state().get_info().log_tail) {
+#if 0
+    if (peering_state().get_pg_log().get_log().empty() &&
+        pg().get_projected_log().empty()) {
+      /* Because we don't move log_tail on split, the log might be
+       * empty even if log_tail != last_update. However, the only
+       * way to get here with an empty log is if log_tail is actually
+       * eversion_t(), because otherwise the entry which changed
+       * last_update since the last scan would have to be present.
+       */
+      ceph_assert(primary_bi.version == eversion_t());
+      return;
+    }
+#endif
+    logger().debug("{}: bi is old, ({}) can be updated with log to {}",
+                   __func__,
+                   primary_bi.version,
+                   pg().get_projected_last_update());
+    logger().debug("{}: scanning pg log first", __func__);
+    peering_state().get_pg_log().get_log().scan_log_after(primary_bi.version,
+      [&, this](const pg_log_entry_t& e) {
+        logger().debug("maybe_update_range(lambda): updating from version {}",
+                       e.version);
+        if (e.soid >= primary_bi.begin && e.soid < primary_bi.end) {
+          if (e.is_update()) {
+            logger().debug("maybe_update_range(lambda): {} updated to ver {}",
+                           e.soid, e.version);
+            primary_bi.objects.erase(e.soid);
+            primary_bi.objects.insert(std::make_pair(e.soid,
+                                                     e.version));
+          } else if (e.is_delete()) {
+            logger().debug("maybe_update_range(lambda): {} removed",
+                           e.soid);
+            primary_bi.objects.erase(e.soid);
+          }
+        }
+      });
+    primary_bi.version = pg().get_projected_last_update();
+  } else {
+    ceph_abort_msg(
+      "scan_range should have raised primary_bi.version past log_tail");
+  }
+}
+
+void BackfillState::Enqueuing::trim_backfill_infos()
+{
+  for (const auto& bt : peering_state().get_backfill_targets()) {
+    backfill_state().peer_backfill_info[bt].trim_to(
+      std::max(peering_state().get_peer_info(bt).last_backfill,
+               backfill_state().last_backfill_started));
+  }
+  backfill_state().backfill_info.trim_to(
+    backfill_state().last_backfill_started);
+}
+
+/* static */ bool BackfillState::Enqueuing::all_enqueued(
+  const PeeringFacade& peering_state,
+  const BackfillInterval& backfill_info,
+  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info)
+{
+  const bool all_local_enqueued = \
+    backfill_info.extends_to_end() && backfill_info.empty();
+  const bool all_peer_enqueued = std::all_of(
+    std::begin(peer_backfill_info),
+    std::end(peer_backfill_info),
+    [] (const auto& kv) {
+      [[maybe_unused]] const auto& [ shard, peer_backfill_info ] = kv;
+      return peer_backfill_info.extends_to_end() && peer_backfill_info.empty();
+    });
+  return all_local_enqueued && all_peer_enqueued;
+}
+
+hobject_t BackfillState::Enqueuing::earliest_peer_backfill(
+  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info) const
+{
+  hobject_t e = hobject_t::get_max();
+  for (const pg_shard_t& bt : peering_state().get_backfill_targets()) {
+    const auto iter = peer_backfill_info.find(bt);
+    ceph_assert(iter != peer_backfill_info.end());
+    e = std::min(e, iter->second.begin);
+  }
+  return e;
+}
+
+bool BackfillState::Enqueuing::should_rescan_replicas(
+  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+  const BackfillInterval& backfill_info) const
+{
+  const auto& targets = peering_state().get_backfill_targets();
+  return std::any_of(std::begin(targets), std::end(targets),
+    [&, this] (const auto& bt) {
+      return ReplicasScanning::replica_needs_scan(peer_backfill_info.at(bt),
+                                                  backfill_info);
+    });
+}
+
+bool BackfillState::Enqueuing::should_rescan_primary(
+  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+  const BackfillInterval& backfill_info) const
+{
+  return backfill_info.begin <= earliest_peer_backfill(peer_backfill_info) &&
+         !backfill_info.extends_to_end();
+}
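+
+// Worked example for the two predicates above (illustration, not part of
+// this change): let the primary interval and one replica interval both
+// begin at object A. If that replica's interval is exhausted -- empty,
+// begin <= A and end != MAX -- then replica_needs_scan() holds and
+// Enqueuing posts RequestReplicasScanning. Conversely, when the primary
+// interval is the laggard -- its begin is <= every peer's begin and it
+// does not extend to MAX -- should_rescan_primary() requests the next
+// primary chunk.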
+
+void BackfillState::Enqueuing::trim_backfilled_object_from_intervals(
+  BackfillState::Enqueuing::result_t&& result,
+  hobject_t& last_backfill_started,
+  std::map<pg_shard_t, BackfillInterval>& peer_backfill_info)
+{
+  std::for_each(std::begin(result.pbi_targets), std::end(result.pbi_targets),
+    [this, &peer_backfill_info] (const auto& bt) {
+      peer_backfill_info.at(bt).pop_front();
+    });
+  last_backfill_started = std::move(result.new_last_backfill_started);
+}
+
+BackfillState::Enqueuing::result_t
+BackfillState::Enqueuing::remove_on_peers(const hobject_t& check)
+{
+  // set `new_last_backfill_started` to `check`
+  result_t result { {}, check };
+  for (const auto& bt : peering_state().get_backfill_targets()) {
+    const auto& pbi = backfill_state().peer_backfill_info.at(bt);
+    if (pbi.begin == check) {
+      result.pbi_targets.insert(bt);
+      const auto& version = pbi.objects.begin()->second;
+      backfill_state().progress_tracker->enqueue_drop(pbi.begin);
+      backfill_listener().enqueue_drop(bt, pbi.begin, version);
+    }
+  }
+  logger().debug("{}: BACKFILL removing {} from peers {}",
+                 __func__, check, result.pbi_targets);
+  ceph_assert(!result.pbi_targets.empty());
+  return result;
+}
+
+BackfillState::Enqueuing::result_t
+BackfillState::Enqueuing::update_on_peers(const hobject_t& check)
+{
+  const auto& primary_bi = backfill_state().backfill_info;
+  result_t result { {}, primary_bi.begin };
+
+  for (const auto& bt : peering_state().get_backfill_targets()) {
+    const auto& peer_bi = backfill_state().peer_backfill_info.at(bt);
+
+    // find the peers at `check` that have the wrong version
+    if (const eversion_t& obj_v = primary_bi.objects.begin()->second;
+        check == primary_bi.begin && check == peer_bi.begin) {
+      if (peer_bi.objects.begin()->second != obj_v) {
+        backfill_state().progress_tracker->enqueue_push(primary_bi.begin);
+        backfill_listener().enqueue_push(bt, primary_bi.begin, obj_v);
+      } else {
+        // it's fine, keep it!
+      }
+      result.pbi_targets.insert(bt);
+    } else {
+      const pg_info_t& pinfo = peering_state().get_peer_info(bt);
+      // Only include peers that we've caught up to their backfill line;
+      // otherwise they only appear to be missing this object because
+      // their peer_bi.begin > backfill_info.begin.
+      if (primary_bi.begin > pinfo.last_backfill) {
+        backfill_state().progress_tracker->enqueue_push(primary_bi.begin);
+        backfill_listener().enqueue_push(bt, primary_bi.begin, obj_v);
+      }
+    }
+  }
+  return result;
+}
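+
+// Decision rule used by the main loop below (illustration, not part of
+// this change): let `check` be the smallest `begin` among the replica
+// intervals. If check < primary_bi.begin, the object exists on some
+// replicas but not on the primary any more, so a drop is enqueued for
+// them; if check == primary_bi.begin, versions are compared and a push
+// is enqueued only for the peers that are stale or lack the object.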
+
+BackfillState::Enqueuing::Enqueuing(my_context ctx)
+  : my_base(ctx)
+{
+  logger().debug("{}", __func__);
+  auto& primary_bi = backfill_state().backfill_info;
+
+  // update our local interval to cope with recent changes
+  primary_bi.begin = backfill_state().last_backfill_started;
+  if (primary_bi.version < peering_state().get_info().log_tail) {
+    // it might be that the OSD is so flooded with modifying operations
+    // that backfill will be spinning here over and over. For the sake
+    // of performance and complexity we don't synchronize with the entire
+    // PG. Something similar can happen in the classical OSD.
+    logger().warn("{}: bi is old, rescanning local backfill_info",
+                  __func__);
+    post_event(RequestPrimaryScanning{});
+    return;
+  } else {
+    maybe_update_range();
+  }
+  trim_backfill_infos();
+
+  while (!primary_bi.empty()) {
+    if (!backfill_listener().budget_available()) {
+      post_event(RequestWaiting{});
+      return;
+    } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
+                                      primary_bi)) {
+      // Count simultaneous scans as a single op and let those complete
+      post_event(RequestReplicasScanning{});
+      return;
+    }
+    // Get the object within the set of peers to operate on and the set
+    // of targets for which that object applies.
+    if (const hobject_t check = \
+          earliest_peer_backfill(backfill_state().peer_backfill_info);
+        check < primary_bi.begin) {
+      // Don't increment ops here because deletions
+      // are cheap and not replied to unlike real recovery_ops,
+      // and we can't increment ops without requeueing ourselves
+      // for recovery.
+      auto result = remove_on_peers(check);
+      trim_backfilled_object_from_intervals(std::move(result),
+                                            backfill_state().last_backfill_started,
+                                            backfill_state().peer_backfill_info);
+    } else {
+      auto result = update_on_peers(check);
+      trim_backfilled_object_from_intervals(std::move(result),
+                                            backfill_state().last_backfill_started,
+                                            backfill_state().peer_backfill_info);
+      backfill_state().backfill_info.pop_front();
+    }
+  }
+
+  if (should_rescan_primary(backfill_state().peer_backfill_info,
+                            primary_bi)) {
+    // need to grab another chunk of the object namespace and restart
+    // the queueing.
+    logger().debug("{}: reached end for current local chunk",
+                   __func__);
+    post_event(RequestPrimaryScanning{});
+  } else if (backfill_state().progress_tracker->tracked_objects_completed()) {
+    post_event(RequestDone{});
+  } else {
+    logger().debug("{}: reached end for both local and all peers "
+                   "but still has in-flight operations", __func__);
+    post_event(RequestWaiting{});
+  }
+}
+
+// -- PrimaryScanning
+BackfillState::PrimaryScanning::PrimaryScanning(my_context ctx)
+  : my_base(ctx)
+{
+  logger().debug("{}", __func__);
+  backfill_state().backfill_info.version = \
+    peering_state().get_info().last_update;
+  backfill_listener().request_primary_scan(
+    backfill_state().backfill_info.begin);
+}
+
+boost::statechart::result
+BackfillState::PrimaryScanning::react(PrimaryScanned evt)
+{
+  logger().debug("{}", __func__);
+  backfill_state().backfill_info = std::move(evt.result);
+  return transit<Enqueuing>();
+}
+
+boost::statechart::result
+BackfillState::PrimaryScanning::react(ObjectPushed evt)
+{
+  logger().debug("PrimaryScanning::react() on ObjectPushed; evt.object={}",
+                 evt.object);
+  backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
+  return discard_event();
+}
+
+// -- ReplicasScanning
+bool BackfillState::ReplicasScanning::replica_needs_scan(
+  const BackfillInterval& replica_backfill_info,
+  const BackfillInterval& local_backfill_info)
+{
+  return replica_backfill_info.empty() && \
+         replica_backfill_info.begin <= local_backfill_info.begin && \
+         !replica_backfill_info.extends_to_end();
+}
+
+BackfillState::ReplicasScanning::ReplicasScanning(my_context ctx)
+  : my_base(ctx)
+{
+  logger().debug("{}", __func__);
+  for (const auto& bt : peering_state().get_backfill_targets()) {
+    if (const auto& pbi = backfill_state().peer_backfill_info.at(bt);
+        replica_needs_scan(pbi, backfill_state().backfill_info)) {
+      logger().debug("{}: scanning peer osd.{} from {}",
+                     __func__, bt, pbi.end);
+      backfill_listener().request_replica_scan(bt, pbi.end, hobject_t{});
+
+      ceph_assert(waiting_on_backfill.find(bt) == \
+                  waiting_on_backfill.end());
+      waiting_on_backfill.insert(bt);
+    }
+  }
+  ceph_assert(!waiting_on_backfill.empty());
+  // TODO: start_recovery_op(hobject_t::get_max()); // XXX: was pbi.end
+}
+
+#if 0
+BackfillState::ReplicasScanning::~ReplicasScanning()
+{
+  // TODO: finish_recovery_op(hobject_t::get_max());
+}
+#endif
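+
+// Note (illustration, not part of this change): the scan requests above
+// fan out to all lagging replicas at once and are tracked in
+// waiting_on_backfill; the machine stays in ReplicasScanning until every
+// requested peer has answered with ReplicaScanned, which is why the whole
+// fan-out is accounted as a single recovery op (see the TODO above).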
+
+boost::statechart::result
+BackfillState::ReplicasScanning::react(ReplicaScanned evt)
+{
+  logger().debug("{}: got scan result from osd={}, result={}",
+                 __func__, evt.from, evt.result);
+  // TODO: maybe we'll be able to move waiting_on_backfill from the machine
+  // to the state.
+  ceph_assert(peering_state().is_backfill_target(evt.from));
+  if (waiting_on_backfill.erase(evt.from)) {
+    backfill_state().peer_backfill_info[evt.from] = std::move(evt.result);
+    if (waiting_on_backfill.empty()) {
+      ceph_assert(backfill_state().peer_backfill_info.size() == \
+                  peering_state().get_backfill_targets().size());
+      return transit<Enqueuing>();
+    }
+  } else {
+    // we cancelled backfill for a while because a target was too full,
+    // and this is an extra response from a non-too-full peer
+    logger().debug("{}: canceled backfill (too full?)", __func__);
+  }
+  return discard_event();
+}
+
+boost::statechart::result
+BackfillState::ReplicasScanning::react(ObjectPushed evt)
+{
+  logger().debug("ReplicasScanning::react() on ObjectPushed; evt.object={}",
+                 evt.object);
+  backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
+  return discard_event();
+}
+
+
+// -- Waiting
+BackfillState::Waiting::Waiting(my_context ctx)
+  : my_base(ctx)
+{
+  logger().debug("{}: entered Waiting", __func__);
+}
+
+boost::statechart::result
+BackfillState::Waiting::react(ObjectPushed evt)
+{
+  logger().debug("Waiting::react() on ObjectPushed; evt.object={}",
+                 evt.object);
+  backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
+  if (!Enqueuing::all_enqueued(peering_state(),
+                               backfill_state().backfill_info,
+                               backfill_state().peer_backfill_info)) {
+    return transit<Enqueuing>();
+  } else if (backfill_state().progress_tracker->tracked_objects_completed()) {
+    return transit<Done>();
+  } else {
+    // we still have something to wait on
+    logger().debug("Waiting::react() on ObjectPushed; still waiting");
+    return discard_event();
+  }
+}
+
+// -- Done
+BackfillState::Done::Done(my_context ctx)
+  : my_base(ctx)
+{
+  logger().info("{}: backfill is done", __func__);
+  backfill_listener().backfilled();
+}
+
+// -- Crashed
+BackfillState::Crashed::Crashed()
+{
+  ceph_abort_msg("this should not happen");
+}
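+
+// The transition graph implemented above, informally:
+//
+//   Initial --Triggered--> Enqueuing (or straight to Done if there is
+//                                     nothing left to backfill)
+//   Enqueuing --RequestPrimaryScanning--> PrimaryScanning
+//     --PrimaryScanned--> Enqueuing
+//   Enqueuing --RequestReplicasScanning--> ReplicasScanning
+//     --ReplicaScanned (the last one)--> Enqueuing
+//   Enqueuing --RequestWaiting--> Waiting --ObjectPushed--> Enqueuing/Done
+//   Enqueuing --RequestDone--> Done
+//   <any state> --unexpected event--> Crashed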
+
+// ProgressTracker is an intermediary between the BackfillListener and
+// BackfillMachine + its states. All requests to push or drop an object
+// are directed through it. The same happens with notifications about
+// completing given operations, which are generated by BackfillListener
+// and dispatched as events, e.g. ObjectPushed.
+// This allows ProgressTracker to track the list of in-flight operations,
+// which is essential to decide whether the entire machine should switch
+// from Waiting to Done or stay in Waiting.
+// ProgressTracker also coordinates .last_backfill_started and stats
+// updates.
+bool BackfillState::ProgressTracker::tracked_objects_completed() const
+{
+  return registry.empty();
+}
+
+void BackfillState::ProgressTracker::enqueue_push(const hobject_t& obj)
+{
+  ceph_assert(registry.count(obj) == 0);
+  registry[obj] = registry_item_t{ op_stage_t::enqueued_push, std::nullopt };
+}
+
+void BackfillState::ProgressTracker::enqueue_drop(const hobject_t& obj)
+{
+  ceph_assert(registry.count(obj) == 0);
+  registry[obj] = registry_item_t{ op_stage_t::enqueued_drop, pg_stat_t{} };
+}
+
+void BackfillState::ProgressTracker::complete_to(
+  const hobject_t& obj,
+  const pg_stat_t& stats)
+{
+  logger().debug("{}: obj={}",
+                 __func__, obj);
+  if (auto completion_iter = registry.find(obj);
+      completion_iter != std::end(registry)) {
+    completion_iter->second = \
+      registry_item_t{ op_stage_t::completed_push, stats };
+  } else {
+    ceph_abort_msg("completing untracked object shall not happen");
+  }
+  for (auto it = std::begin(registry);
+       it != std::end(registry) &&
+         it->second.stage != op_stage_t::enqueued_push;
+       it = registry.erase(it)) {
+    auto& [soid, item] = *it;
+    assert(item.stats);
+    peering_state().update_complete_backfill_object_stats(
+      soid,
+      *item.stats);
+  }
+  if (Enqueuing::all_enqueued(peering_state(),
+                              backfill_state().backfill_info,
+                              backfill_state().peer_backfill_info) &&
+      tracked_objects_completed()) {
+    backfill_state().last_backfill_started = hobject_t::get_max();
+    backfill_listener().update_peers_last_backfill(hobject_t::get_max());
+  } else {
+    backfill_listener().update_peers_last_backfill(obj);
+  }
+}
+
+} // namespace crimson::osd
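ProgressTracker's contract in a nutshell (an illustration, not part of the
patch): an operation is registered with the tracker before the listener
issues the actual request, and a completion flushes only the ordered prefix
of the registry up to the first still-in-flight push:

    progress_tracker->enqueue_push(hoid);          // register in-flight op first
    backfill_listener.enqueue_push(bt, hoid, v);   // then issue the request
    // ... later, once ObjectPushed{hoid, stats} is dispatched:
    progress_tracker->complete_to(hoid, stats);    // flush completed prefix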
diff --git a/src/crimson/osd/backfill_state.h b/src/crimson/osd/backfill_state.h
index 21015a46cb15..a608a317bea4 100644
--- a/src/crimson/osd/backfill_state.h
+++ b/src/crimson/osd/backfill_state.h
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <optional>
+
 #include <boost/statechart/custom_reaction.hpp>
 #include <boost/statechart/event.hpp>
 #include <boost/statechart/event_base.hpp>
@@ -11,6 +13,7 @@
 #include <boost/statechart/state_machine.hpp>
 #include <boost/statechart/transition.hpp>
 
+#include "osd/PeeringState.h"
 #include "osd/recovery_types.h"
 
 namespace crimson::osd {
@@ -19,6 +22,8 @@ namespace sc = boost::statechart;
 
 struct BackfillState {
   struct BackfillListener;
+  struct PeeringFacade;
+  struct PGFacade;
 
   // events come first
   struct PrimaryScanned : sc::event<PrimaryScanned> {
@@ -30,63 +35,228 @@
     BackfillInterval result;
   };
 
-  struct Flushed : sc::event<Flushed> {
+  struct ObjectPushed : sc::event<ObjectPushed> {
+    // TODO: implement replica management; I don't want to follow
+    // the current convention where the backend layer is responsible
+    // for tracking replicas.
+    hobject_t object;
+    pg_stat_t stat;
   };
 
   struct Triggered : sc::event<Triggered> {
   };
 
+private:
+  // internal events
+  struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
+  };
+
+  struct RequestReplicasScanning : sc::event<RequestReplicasScanning> {
+  };
+
+  struct RequestWaiting : sc::event<RequestWaiting> {
+  };
+
+  struct RequestDone : sc::event<RequestDone> {
+  };
+
+  class ProgressTracker;
+
+public:
+
   struct Initial;
   struct Enqueuing;
+  struct PrimaryScanning;
+  struct ReplicasScanning;
+  struct Waiting;
+  struct Done;
 
-  class BackfillMachine : public sc::state_machine<BackfillMachine, Initial> {
+  struct BackfillMachine : sc::state_machine<BackfillMachine, Initial> {
+    BackfillMachine(BackfillState& backfill_state,
+                    BackfillListener& backfill_listener,
+                    std::unique_ptr<PeeringFacade> peering_state,
+                    std::unique_ptr<PGFacade> pg);
+    ~BackfillMachine();
+    BackfillState& backfill_state;
+    BackfillListener& backfill_listener;
+    std::unique_ptr<PeeringFacade> peering_state;
+    std::unique_ptr<PGFacade> pg;
   };
 
+private:
+  template <class S>
+  struct StateHelper {
+    BackfillState& backfill_state() {
+      return static_cast<S*>(this) \
+        ->template context<BackfillMachine>().backfill_state;
+    }
+    BackfillListener& backfill_listener() {
+      return static_cast<S*>(this) \
+        ->template context<BackfillMachine>().backfill_listener;
+    }
+    PeeringFacade& peering_state() {
+      return *static_cast<S*>(this) \
+        ->template context<BackfillMachine>().peering_state;
+    }
+    PGFacade& pg() {
+      return *static_cast<S*>(this)->template context<BackfillMachine>().pg;
+    }
+
+    const PeeringFacade& peering_state() const {
+      return *static_cast<const S*>(this) \
+        ->template context<BackfillMachine>().peering_state;
+    }
+    const BackfillState& backfill_state() const {
+      return static_cast<const S*>(this) \
+        ->template context<BackfillMachine>().backfill_state;
+    }
+  };
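+
+  // Illustration (not part of this change): thanks to StateHelper, any
+  // state can reach the shared context tersely, e.g. from a reaction in
+  // Enqueuing:
+  //
+  //   backfill_state().last_backfill_started = ...;
+  //   backfill_listener().budget_available();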
+
+public:
+
   // states
-  struct Crashed : sc::state<Crashed, BackfillMachine>, NamedState {
+  struct Crashed : sc::simple_state<Crashed, BackfillMachine>,
+                   StateHelper<Crashed> {
+    explicit Crashed();
   };
 
-  struct Initial : sc::state<Initial, BackfillMachine>, NamedState {
+  struct Initial : sc::state<Initial, BackfillMachine>,
+                   StateHelper<Initial> {
     using reactions = boost::mpl::list<
       sc::custom_reaction<Triggered>,
       sc::transition<sc::event_base, Crashed>>;
+    explicit Initial(my_context);
     // initialize after triggering backfill by on_activate_complete().
     // transit to Enqueuing.
     sc::result react(const Triggered&);
   };
 
-  struct Enqueuing : sc::state<Enqueuing, BackfillMachine>, NamedState {
+  struct Enqueuing : sc::state<Enqueuing, BackfillMachine>,
+                     StateHelper<Enqueuing> {
     using reactions = boost::mpl::list<
+      sc::transition<RequestPrimaryScanning, PrimaryScanning>,
+      sc::transition<RequestReplicasScanning, ReplicasScanning>,
+      sc::transition<RequestWaiting, Waiting>,
+      sc::transition<RequestDone, Done>,
       sc::transition<sc::event_base, Crashed>>;
+    explicit Enqueuing(my_context);
+
+    // indicate whether there is any remaining work to do when it comes
+    // to comparing the hobject_t namespace between primary and replicas.
+    // true doesn't necessarily mean backfill is done -- there could be
+    // in-flight pushes or drops which had been enqueued but aren't
+    // completed yet.
+    static bool all_enqueued(
+      const PeeringFacade& peering_state,
+      const BackfillInterval& backfill_info,
+      const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info);
+
+  private:
+    void maybe_update_range();
+    void trim_backfill_infos();
+
+    // these methods take BackfillIntervals instead of extracting them from
+    // the state to emphasize the relationships across the main loop.
+    hobject_t earliest_peer_backfill(
+      const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info) const;
+    bool should_rescan_replicas(
+      const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+      const BackfillInterval& backfill_info) const;
+    // indicate whether a particular acting primary needs to be scanned
+    // again to process the next piece of the hobject_t namespace.
+    // the logic is analogous to replica_needs_scan(). See comments there.
+    bool should_rescan_primary(
+      const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+      const BackfillInterval& backfill_info) const;
+
+    // result_t is an intermediary between {remove,update}_on_peers() and
+    // updating BackfillIntervals in trim_backfilled_object_from_intervals.
+    // This step is important because it affects the main loop's condition,
+    // and thus deserves to be exposed instead of being called deeply from
+    // {remove,update}_on_peers().
+    struct [[nodiscard]] result_t {
+      std::set<pg_shard_t> pbi_targets;
+      hobject_t new_last_backfill_started;
+    };
+    void trim_backfilled_object_from_intervals(
+      result_t&&,
+      hobject_t& last_backfill_started,
+      std::map<pg_shard_t, BackfillInterval>& peer_backfill_info);
+    result_t remove_on_peers(const hobject_t& check);
+    result_t update_on_peers(const hobject_t& check);
   };
 
   struct PrimaryScanning : sc::state<PrimaryScanning, BackfillMachine>,
-                           NamedState {
+                           StateHelper<PrimaryScanning> {
     using reactions = boost::mpl::list<
+      sc::custom_reaction<ObjectPushed>,
       sc::custom_reaction<PrimaryScanned>,
       sc::transition<sc::event_base, Crashed>>;
+    explicit PrimaryScanning(my_context);
+    sc::result react(ObjectPushed);
     // collect scanning result and transit to Enqueuing.
-    sc::result react(const PrimaryScanned&);
+    sc::result react(PrimaryScanned);
   };
 
   struct ReplicasScanning : sc::state<ReplicasScanning, BackfillMachine>,
-                            NamedState {
+                            StateHelper<ReplicasScanning> {
     using reactions = boost::mpl::list<
+      sc::custom_reaction<ObjectPushed>,
       sc::custom_reaction<ReplicaScanned>,
       sc::transition<sc::event_base, Crashed>>;
+    explicit ReplicasScanning(my_context);
     // collect scanning result; if all results are collected, transition
     // to Enqueuing will happen.
-    sc::result react(const ReplicaScanned&);
+    sc::result react(ObjectPushed);
+    sc::result react(ReplicaScanned);
+
+    // indicate whether a particular peer should be scanned to retrieve
+    // BackfillInterval for a new range of the hobject_t namespace.
+    // true when the replica's bi.objects is exhausted, its end is not
+    // MAX, and the primary's bi.begin is further ahead than the replica's.
+    static bool replica_needs_scan(
+      const BackfillInterval& replica_backfill_info,
+      const BackfillInterval& local_backfill_info);
+
+  private:
+    std::set<pg_shard_t> waiting_on_backfill;
   };
 
-  struct Flushing : sc::simple_state<Flushing, BackfillMachine>,
-                    NamedState {
+  struct Waiting : sc::state<Waiting, BackfillMachine>,
+                   StateHelper<Waiting> {
     using reactions = boost::mpl::list<
-      sc::transition<Flushed, Done>,
+      sc::custom_reaction<ObjectPushed>,
       sc::transition<sc::event_base, Crashed>>;
+    explicit Waiting(my_context);
+    sc::result react(ObjectPushed);
   };
+
+  struct Done : sc::state<Done, BackfillMachine>,
+                StateHelper<Done> {
+    using reactions = boost::mpl::list<
+      sc::transition<sc::event_base, Crashed>>;
+    explicit Done(my_context);
+  };
+
+  BackfillState(BackfillListener& backfill_listener,
+                std::unique_ptr<PeeringFacade> peering_state,
+                std::unique_ptr<PGFacade> pg);
+  ~BackfillState();
+
+private:
+  hobject_t last_backfill_started;
+  BackfillInterval backfill_info;
+  std::map<pg_shard_t, BackfillInterval> peer_backfill_info;
+  BackfillMachine backfill_machine;
+  std::unique_ptr<ProgressTracker> progress_tracker;
 };
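+
+// Illustration (not part of this change): the owner wires everything up
+// roughly as follows; `listener`, `ps` and `pg` are hypothetical objects
+// implementing or backing the interfaces involved.
+//
+//   BackfillState backfill_state{
+//     listener,
+//     std::unique_ptr<BackfillState::PeeringFacade>{
+//       new BackfillState::PeeringFacade{ps}},
+//     std::unique_ptr<BackfillState::PGFacade>{
+//       new BackfillState::PGFacade{pg}}};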
+
+// BackfillListener -- an interface used by the backfill FSM to request
+// low-level services like issuing `MOSDPGPush` or `MOSDPGBackfillRemove`.
+// The goals behind the interface are: 1) unittestability; 2) the
+// possibility to retrofit the classical OSD with BackfillState. For the
+// second reason we never use `seastar::future` -- instead, responses to
+// the requests are conveyed as events; see ObjectPushed as an example.
 struct BackfillState::BackfillListener {
   virtual void request_replica_scan(
     const pg_shard_t& target,
@@ -116,4 +286,42 @@ struct BackfillState::BackfillListener {
   virtual ~BackfillListener() = default;
 };
 
+class BackfillState::ProgressTracker {
+  // TODO: apply_stat,
+  enum class op_stage_t {
+    enqueued_push,
+    enqueued_drop,
+    completed_push,
+  };
+
+  struct registry_item_t {
+    op_stage_t stage;
+    std::optional<pg_stat_t> stats;
+  };
+
+  BackfillMachine& backfill_machine;
+  std::map<hobject_t, registry_item_t> registry;
+
+  BackfillState& backfill_state() {
+    return backfill_machine.backfill_state;
+  }
+  PeeringFacade& peering_state() {
+    return *backfill_machine.peering_state;
+  }
+  BackfillListener& backfill_listener() {
+    return backfill_machine.backfill_listener;
+  }
+
+public:
+  ProgressTracker(BackfillMachine& backfill_machine)
+    : backfill_machine(backfill_machine) {
+  }
+
+  bool tracked_objects_completed() const;
+
+  void enqueue_push(const hobject_t&);
+  void enqueue_drop(const hobject_t&);
+  void complete_to(const hobject_t&, const pg_stat_t&);
+};
+
 } // namespace crimson::osd
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 8c94590ec1ba..1768c0a52eb8 100644
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -23,6 +23,7 @@
 
 #include "crimson/common/type_helpers.h"
 #include "crimson/os/futurized_collection.h"
+#include "crimson/osd/backfill_state.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
@@ -657,6 +658,7 @@ private:
   friend class PGAdvanceMap;
   friend class PeeringEvent;
   friend class RepRequest;
+  friend struct BackfillState::PGFacade;
 private:
   seastar::future<bool> find_unfound() {
     return seastar::make_ready_future<bool>(true);
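Given goal 1) above (unittestability), a test can satisfy BackfillListener
with a stub and drive the machine purely through events. A rough sketch --
the name is hypothetical and the parameter list is inferred from the call
sites in backfill_state.cc; the remaining virtuals would be stubbed out
analogously:

    struct RecordingListener : crimson::osd::BackfillState::BackfillListener {
      std::vector<hobject_t> pushed;
      // record which objects the FSM asked to push instead of sending
      // an actual MOSDPGPush
      void enqueue_push(const pg_shard_t& target,
                        const hobject_t& obj,
                        const eversion_t& v) override {
        pushed.push_back(obj);
      }
      // ... other request_*/enqueue_*/budget_available()/backfilled()
      // overrides elided
    };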