--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+
+#include "crimson/osd/backfill_state.h"
+#include "crimson/osd/backfill_facades.h"
+#include "crimson/osd/pg.h"
+#include "osd/PeeringState.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+BackfillState::BackfillState(
+ BackfillState::BackfillListener& backfill_listener,
+ std::unique_ptr<BackfillState::PeeringFacade> peering_state,
+ std::unique_ptr<BackfillState::PGFacade> pg)
+ : backfill_machine(*this,
+ backfill_listener,
+ std::move(peering_state),
+ std::move(pg)),
+ progress_tracker(
+ std::make_unique<BackfillState::ProgressTracker>(backfill_machine))
+{
+ logger().debug("{}:{}", __func__, __LINE__);
+ backfill_machine.initiate();
+}
+
+BackfillState::~BackfillState() = default;
+
+BackfillState::BackfillMachine::BackfillMachine(
+ BackfillState& backfill_state,
+ BackfillState::BackfillListener& backfill_listener,
+ std::unique_ptr<BackfillState::PeeringFacade> peering_state,
+ std::unique_ptr<BackfillState::PGFacade> pg)
+ : backfill_state(backfill_state),
+ backfill_listener(backfill_listener),
+ peering_state(std::move(peering_state)),
+ pg(std::move(pg))
+{}
+
+BackfillState::BackfillMachine::~BackfillMachine() = default;
+
+BackfillState::Initial::Initial(my_context ctx)
+ : my_base(ctx)
+{
+ backfill_state().last_backfill_started = peering_state().earliest_backfill();
+ logger().debug("{}: bft={} from {}",
+ __func__, peering_state().get_backfill_targets(),
+ backfill_state().last_backfill_started);
+ for (const auto& bt : peering_state().get_backfill_targets()) {
+ logger().debug("{}: target shard {} from {}",
+ __func__, bt, peering_state().get_peer_info(bt).last_backfill);
+ }
+ ceph_assert(peering_state().get_backfill_targets().size());
+ ceph_assert(!backfill_state().last_backfill_started.is_max());
+}
+
+boost::statechart::result
+BackfillState::Initial::react(const BackfillState::Triggered& evt)
+{
+ logger().debug("{}: backfill triggered", __func__);
+ ceph_assert(backfill_state().last_backfill_started == \
+ peering_state().earliest_backfill());
+ // initialize BackfillIntervals
+ for (const auto& bt : peering_state().get_backfill_targets()) {
+ backfill_state().peer_backfill_info[bt].reset(
+ peering_state().get_peer_info(bt).last_backfill);
+ }
+ backfill_state().backfill_info.reset(backfill_state().last_backfill_started);
+ if (Enqueuing::all_enqueued(peering_state(),
+ backfill_state().backfill_info,
+ backfill_state().peer_backfill_info)) {
+ logger().debug("{}: switching to Done state", __func__);
+ return transit<BackfillState::Done>();
+ } else {
+ logger().debug("{}: switching to Enqueuing state", __func__);
+ return transit<BackfillState::Enqueuing>();
+ }
+}
+
+
+// -- Enqueuing
+void BackfillState::Enqueuing::maybe_update_range()
+{
+ if (auto& primary_bi = backfill_state().backfill_info;
+ primary_bi.version >= pg().get_projected_last_update()) {
+ logger().info("{}: bi is current", __func__);
+ ceph_assert(primary_bi.version == pg().get_projected_last_update());
+ } else if (primary_bi.version >= peering_state().get_info().log_tail) {
+#if 0
+ if (peering_state().get_pg_log().get_log().empty() &&
+ pg().get_projected_log().empty()) {
+ /* Because we don't move log_tail on split, the log might be
+ * empty even if log_tail != last_update. However, the only
+ * way to get here with an empty log is if log_tail is actually
+ * eversion_t(), because otherwise the entry which changed
+ * last_update since the last scan would have to be present.
+ */
+ ceph_assert(primary_bi.version == eversion_t());
+ return;
+ }
+#endif
+ logger().debug("{}: bi is old, ({}) can be updated with log to {}",
+ __func__,
+ primary_bi.version,
+ pg().get_projected_last_update());
+ logger().debug("{}: scanning pg log first", __func__);
+ peering_state().get_pg_log().get_log().scan_log_after(primary_bi.version,
+ [&, this](const pg_log_entry_t& e) {
+ logger().debug("maybe_update_range(lambda): updating from version {}",
+ e.version);
+ if (e.soid >= primary_bi.begin && e.soid < primary_bi.end) {
+ if (e.is_update()) {
+ logger().debug("maybe_update_range(lambda): {} updated to ver {}",
+ e.soid, e.version);
+ primary_bi.objects.erase(e.soid);
+ primary_bi.objects.insert(std::make_pair(e.soid,
+ e.version));
+ } else if (e.is_delete()) {
+ logger().debug("maybe_update_range(lambda): {} removed",
+ e.soid);
+ primary_bi.objects.erase(e.soid);
+ }
+ }
+ });
+ primary_bi.version = pg().get_projected_last_update();
+ } else {
+ ceph_abort_msg(
+ "scan_range should have raised primary_bi.version past log_tail");
+ }
+}
+
+void BackfillState::Enqueuing::trim_backfill_infos()
+{
+ for (const auto& bt : peering_state().get_backfill_targets()) {
+ backfill_state().peer_backfill_info[bt].trim_to(
+ std::max(peering_state().get_peer_info(bt).last_backfill,
+ backfill_state().last_backfill_started));
+ }
+ backfill_state().backfill_info.trim_to(
+ backfill_state().last_backfill_started);
+}
+
+/* static */ bool BackfillState::Enqueuing::all_enqueued(
+ const PeeringFacade& peering_state,
+ const BackfillInterval& backfill_info,
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info)
+{
+ const bool all_local_enqueued = \
+ backfill_info.extends_to_end() && backfill_info.empty();
+ const bool all_peer_enqueued = std::all_of(
+ std::begin(peer_backfill_info),
+ std::end(peer_backfill_info),
+ [] (const auto& kv) {
+ [[maybe_unused]] const auto& [ shard, peer_backfill_info ] = kv;
+ return peer_backfill_info.extends_to_end() && peer_backfill_info.empty();
+ });
+ return all_local_enqueued && all_peer_enqueued;
+}
+
+hobject_t BackfillState::Enqueuing::earliest_peer_backfill(
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info) const
+{
+ hobject_t e = hobject_t::get_max();
+ for (const pg_shard_t& bt : peering_state().get_backfill_targets()) {
+ const auto iter = peer_backfill_info.find(bt);
+ ceph_assert(iter != peer_backfill_info.end());
+ e = std::min(e, iter->second.begin);
+ }
+ return e;
+}
+
+bool BackfillState::Enqueuing::should_rescan_replicas(
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+ const BackfillInterval& backfill_info) const
+{
+ const auto& targets = peering_state().get_backfill_targets();
+ return std::any_of(std::begin(targets), std::end(targets),
+ [&, this] (const auto& bt) {
+ return ReplicasScanning::replica_needs_scan(peer_backfill_info.at(bt),
+ backfill_info);
+ });
+}
+
+bool BackfillState::Enqueuing::should_rescan_primary(
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+ const BackfillInterval& backfill_info) const
+{
+ return backfill_info.begin <= earliest_peer_backfill(peer_backfill_info) &&
+ !backfill_info.extends_to_end();
+}
+
+void BackfillState::Enqueuing::trim_backfilled_object_from_intervals(
+ BackfillState::Enqueuing::result_t&& result,
+ hobject_t& last_backfill_started,
+ std::map<pg_shard_t, BackfillInterval>& peer_backfill_info)
+{
+ std::for_each(std::begin(result.pbi_targets), std::end(result.pbi_targets),
+ [this, &peer_backfill_info] (const auto& bt) {
+ peer_backfill_info.at(bt).pop_front();
+ });
+ last_backfill_started = std::move(result.new_last_backfill_started);
+}
+
+BackfillState::Enqueuing::result_t
+BackfillState::Enqueuing::remove_on_peers(const hobject_t& check)
+{
+ // set `new_last_backfill_started` to `check`
+ result_t result { {}, check };
+ for (const auto& bt : peering_state().get_backfill_targets()) {
+ const auto& pbi = backfill_state().peer_backfill_info.at(bt);
+ if (pbi.begin == check) {
+ result.pbi_targets.insert(bt);
+ const auto& version = pbi.objects.begin()->second;
+ backfill_state().progress_tracker->enqueue_drop(pbi.begin);
+ backfill_listener().enqueue_drop(bt, pbi.begin, version);
+ }
+ }
+ logger().debug("{}: BACKFILL removing {} from peers {}",
+ __func__, check, result.pbi_targets);
+ ceph_assert(!result.pbi_targets.empty());
+ return result;
+}
+
+BackfillState::Enqueuing::result_t
+BackfillState::Enqueuing::update_on_peers(const hobject_t& check)
+{
+ const auto& primary_bi = backfill_state().backfill_info;
+ result_t result { {}, primary_bi.begin };
+
+ for (const auto& bt : peering_state().get_backfill_targets()) {
+ const auto& peer_bi = backfill_state().peer_backfill_info.at(bt);
+
+ // Find all check peers that have the wrong version
+ if (const eversion_t& obj_v = primary_bi.objects.begin()->second;
+ check == primary_bi.begin && check == peer_bi.begin) {
+ if(peer_bi.objects.begin()->second != obj_v) {
+ backfill_state().progress_tracker->enqueue_push(primary_bi.begin);
+ backfill_listener().enqueue_push(bt, primary_bi.begin, obj_v);
+ } else {
+ // it's fine, keep it!
+ }
+ result.pbi_targets.insert(bt);
+ } else {
+ const pg_info_t& pinfo = peering_state().get_peer_info(bt);
+ // Only include peers that we've caught up to their backfill line
+ // otherwise, they only appear to be missing this object
+ // because their peer_bi.begin > backfill_info.begin.
+ if (primary_bi.begin > pinfo.last_backfill) {
+ backfill_state().progress_tracker->enqueue_push(primary_bi.begin);
+ backfill_listener().enqueue_push(bt, primary_bi.begin, obj_v);
+ }
+ }
+ }
+ return result;
+}
+
+BackfillState::Enqueuing::Enqueuing(my_context ctx)
+ : my_base(ctx)
+{
+ logger().debug("{}", __func__);
+ auto& primary_bi = backfill_state().backfill_info;
+
+ // update our local interval to cope with recent changes
+ primary_bi.begin = backfill_state().last_backfill_started;
+ if (primary_bi.version < peering_state().get_info().log_tail) {
+ // it might be that the OSD is so flooded with modifying operations
+ // that backfill will be spinning here over and over. For the sake
+ // of performance and complexity we don't synchronize with entire PG.
+ // similar can happen in classical OSD.
+ logger().warn("{}: bi is old, rescanning of local backfill_info",
+ __func__);
+ post_event(RequestPrimaryScanning{});
+ return;
+ } else {
+ maybe_update_range();
+ }
+ trim_backfill_infos();
+
+ while (!primary_bi.empty()) {
+ if (!backfill_listener().budget_available()) {
+ post_event(RequestWaiting{});
+ return;
+ } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
+ primary_bi)) {
+ // Count simultaneous scans as a single op and let those complete
+ post_event(RequestReplicasScanning{});
+ return;
+ }
+ // Get object within set of peers to operate on and the set of targets
+ // for which that object applies.
+ if (const hobject_t check = \
+ earliest_peer_backfill(backfill_state().peer_backfill_info);
+ check < primary_bi.begin) {
+ // Don't increment ops here because deletions
+ // are cheap and not replied to unlike real recovery_ops,
+ // and we can't increment ops without requeueing ourself
+ // for recovery.
+ auto result = remove_on_peers(check);
+ trim_backfilled_object_from_intervals(std::move(result),
+ backfill_state().last_backfill_started,
+ backfill_state().peer_backfill_info);
+ } else {
+ auto result = update_on_peers(check);
+ trim_backfilled_object_from_intervals(std::move(result),
+ backfill_state().last_backfill_started,
+ backfill_state().peer_backfill_info);
+ backfill_state().backfill_info.pop_front();
+ }
+ }
+
+ if (should_rescan_primary(backfill_state().peer_backfill_info,
+ primary_bi)) {
+ // need to grab one another chunk of the object namespace and restart
+ // the queueing.
+ logger().debug("{}: reached end for current local chunk",
+ __func__);
+ post_event(RequestPrimaryScanning{});
+ } else if (backfill_state().progress_tracker->tracked_objects_completed()) {
+ post_event(RequestDone{});
+ } else {
+ logger().debug("{}: reached end for both local and all peers ",
+ "but still has in-flight operations", __func__);
+ post_event(RequestWaiting{});
+ }
+}
+
+// -- PrimaryScanning
+BackfillState::PrimaryScanning::PrimaryScanning(my_context ctx)
+ : my_base(ctx)
+{
+ logger().debug("{}", __func__);
+ backfill_state().backfill_info.version = \
+ peering_state().get_info().last_update;
+ backfill_listener().request_primary_scan(
+ backfill_state().backfill_info.begin);
+}
+
+boost::statechart::result
+BackfillState::PrimaryScanning::react(PrimaryScanned evt)
+{
+ logger().debug("{}", __func__);
+ backfill_state().backfill_info = std::move(evt.result);
+ return transit<Enqueuing>();
+}
+
+boost::statechart::result
+BackfillState::PrimaryScanning::react(ObjectPushed evt)
+{
+ logger().debug("PrimaryScanning::react() on ObjectPushed; evt.object={}",
+ evt.object);
+ backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
+ return discard_event();
+}
+
+// -- ReplicasScanning
+bool BackfillState::ReplicasScanning::replica_needs_scan(
+ const BackfillInterval& replica_backfill_info,
+ const BackfillInterval& local_backfill_info)
+{
+ return replica_backfill_info.empty() && \
+ replica_backfill_info.begin <= local_backfill_info.begin && \
+ !replica_backfill_info.extends_to_end();
+}
+
+BackfillState::ReplicasScanning::ReplicasScanning(my_context ctx)
+ : my_base(ctx)
+{
+ logger().debug("{}", __func__);
+ for (const auto& bt : peering_state().get_backfill_targets()) {
+ if (const auto& pbi = backfill_state().peer_backfill_info.at(bt);
+ replica_needs_scan(pbi, backfill_state().backfill_info)) {
+ logger().debug("{}: scanning peer osd.{} from {}",
+ __func__, bt, pbi.end);
+ backfill_listener().request_replica_scan(bt, pbi.end, hobject_t{});
+
+ ceph_assert(waiting_on_backfill.find(bt) == \
+ waiting_on_backfill.end());
+ waiting_on_backfill.insert(bt);
+ }
+ }
+ ceph_assert(!waiting_on_backfill.empty());
+ // TODO: start_recovery_op(hobject_t::get_max()); // XXX: was pbi.end
+}
+
+#if 0
+BackfillState::ReplicasScanning::~ReplicasScanning()
+{
+ // TODO: finish_recovery_op(hobject_t::get_max());
+}
+#endif
+
+boost::statechart::result
+BackfillState::ReplicasScanning::react(ReplicaScanned evt)
+{
+ logger().debug("{}: got scan result from osd={}, result={}",
+ __func__, evt.from, evt.result);
+ // TODO: maybe we'll be able to move waiting_on_backfill from
+ // the machine to the state.
+ ceph_assert(peering_state().is_backfill_target(evt.from));
+ if (waiting_on_backfill.erase(evt.from)) {
+ backfill_state().peer_backfill_info[evt.from] = std::move(evt.result);
+ if (waiting_on_backfill.empty()) {
+ ceph_assert(backfill_state().peer_backfill_info.size() == \
+ peering_state().get_backfill_targets().size());
+ return transit<Enqueuing>();
+ }
+ } else {
+ // we canceled backfill for a while due to a too full, and this
+ // is an extra response from a non-too-full peer
+ logger().debug("{}: canceled backfill (too full?)", __func__);
+ }
+ return discard_event();
+}
+
+boost::statechart::result
+BackfillState::ReplicasScanning::react(ObjectPushed evt)
+{
+ logger().debug("ReplicasScanning::react() on ObjectPushed; evt.object={}",
+ evt.object);
+ backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
+ return discard_event();
+}
+
+
+// -- Waiting
+BackfillState::Waiting::Waiting(my_context ctx)
+ : my_base(ctx)
+{
+ logger().debug("{}: entered Waiting", __func__);
+}
+
+boost::statechart::result
+BackfillState::Waiting::react(ObjectPushed evt)
+{
+ logger().debug("Waiting::react() on ObjectPushed; evt.object={}",
+ evt.object);
+ backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
+ if (!Enqueuing::all_enqueued(peering_state(),
+ backfill_state().backfill_info,
+ backfill_state().peer_backfill_info)) {
+ return transit<Enqueuing>();
+ } else if (backfill_state().progress_tracker->tracked_objects_completed()) {
+ return transit<Done>();
+ } else {
+ // we still have something to wait on
+ logger().debug("Waiting::react() on ObjectPushed; still waiting");
+ return discard_event();
+ }
+}
+
+// -- Done
+BackfillState::Done::Done(my_context ctx)
+ : my_base(ctx)
+{
+ logger().info("{}: backfill is done", __func__);
+ backfill_listener().backfilled();
+}
+
+// -- Crashed
+BackfillState::Crashed::Crashed()
+{
+ ceph_abort_msg("{}: this should not happen");
+}
+
+// ProgressTracker is an intermediary between the BackfillListener and
+// BackfillMachine + its states. All requests to push or drop an object
+// are directed through it. The same happens with notifications about
+// completing given operations which are generated by BackfillListener
+// and dispatched as i.e. ObjectPushed events.
+// This allows ProgressTacker to track the list of in-flight operations
+// which is essential to make the decision whether the entire machine
+// should switch from Waiting to Done keep in Waiting.
+// ProgressTracker also coordinates .last_backfill_started and stats
+// updates.
+bool BackfillState::ProgressTracker::tracked_objects_completed() const
+{
+ return registry.empty();
+}
+
+void BackfillState::ProgressTracker::enqueue_push(const hobject_t& obj)
+{
+ ceph_assert(registry.count(obj) == 0);
+ registry[obj] = registry_item_t{ op_stage_t::enqueued_push, std::nullopt };
+}
+
+void BackfillState::ProgressTracker::enqueue_drop(const hobject_t& obj)
+{
+ ceph_assert(registry.count(obj) == 0);
+ registry[obj] = registry_item_t{ op_stage_t::enqueued_drop, pg_stat_t{} };
+}
+
+void BackfillState::ProgressTracker::complete_to(
+ const hobject_t& obj,
+ const pg_stat_t& stats)
+{
+ logger().debug("{}: obj={}",
+ __func__, obj);
+ if (auto completion_iter = registry.find(obj);
+ completion_iter != std::end(registry)) {
+ completion_iter->second = \
+ registry_item_t{ op_stage_t::completed_push, stats };
+ } else {
+ ceph_abort_msg("completing untracked object shall not happen");
+ }
+ for (auto it = std::begin(registry);
+ it != std::end(registry) &&
+ it->second.stage != op_stage_t::enqueued_push;
+ it = registry.erase(it)) {
+ auto& [soid, item] = *it;
+ assert(item.stats);
+ peering_state().update_complete_backfill_object_stats(
+ soid,
+ *item.stats);
+ }
+ if (Enqueuing::all_enqueued(peering_state(),
+ backfill_state().backfill_info,
+ backfill_state().peer_backfill_info) &&
+ tracked_objects_completed()) {
+ backfill_state().last_backfill_started = hobject_t::get_max();
+ backfill_listener().update_peers_last_backfill(hobject_t::get_max());
+ } else {
+ backfill_listener().update_peers_last_backfill(obj);
+ }
+}
+
+} // namespace crimson::osd
#pragma once
+#include <optional>
+
#include <boost/statechart/custom_reaction.hpp>
#include <boost/statechart/event.hpp>
#include <boost/statechart/event_base.hpp>
#include <boost/statechart/state_machine.hpp>
#include <boost/statechart/transition.hpp>
+#include "osd/PeeringState.h"
#include "osd/recovery_types.h"
namespace crimson::osd {
struct BackfillState {
struct BackfillListener;
+ struct PeeringFacade;
+ struct PGFacade;
// events comes first
struct PrimaryScanned : sc::event<PrimaryScanned> {
BackfillInterval result;
};
- struct Flushed : sc::event<Flushed> {
+ struct ObjectPushed : sc::event<ObjectPushed> {
+ // TODO: implement replica management; I don't want to follow
+ // current convention where the backend layer is responsible
+ // for tracking replicas.
+ hobject_t object;
+ pg_stat_t stat;
};
struct Triggered : sc::event<Triggered> {
};
+private:
+ // internal events
+ struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
+ };
+
+ struct RequestReplicasScanning : sc::event<RequestReplicasScanning> {
+ };
+
+ struct RequestWaiting : sc::event<RequestWaiting> {
+ };
+
+ struct RequestDone : sc::event<RequestDone> {
+ };
+
+ class ProgressTracker;
+
+public:
+
struct Initial;
struct Enqueuing;
+ struct PrimaryScanning;
+ struct ReplicasScanning;
+ struct Waiting;
+ struct Done;
- class BackfillMachine : public sc::state_machine<BackfillMachine, Initial> {
+ struct BackfillMachine : sc::state_machine<BackfillMachine, Initial> {
+ BackfillMachine(BackfillState& backfill_state,
+ BackfillListener& backfill_listener,
+ std::unique_ptr<PeeringFacade> peering_state,
+ std::unique_ptr<PGFacade> pg);
+ ~BackfillMachine();
+ BackfillState& backfill_state;
+ BackfillListener& backfill_listener;
+ std::unique_ptr<PeeringFacade> peering_state;
+ std::unique_ptr<PGFacade> pg;
};
+private:
+ template <class S>
+ struct StateHelper {
+ BackfillState& backfill_state() {
+ return static_cast<S*>(this) \
+ ->template context<BackfillMachine>().backfill_state;
+ }
+ BackfillListener& backfill_listener() {
+ return static_cast<S*>(this) \
+ ->template context<BackfillMachine>().backfill_listener;
+ }
+ PeeringFacade& peering_state() {
+ return *static_cast<S*>(this) \
+ ->template context<BackfillMachine>().peering_state;
+ }
+ PGFacade& pg() {
+ return *static_cast<S*>(this)->template context<BackfillMachine>().pg;
+ }
+
+ const PeeringFacade& peering_state() const {
+ return *static_cast<const S*>(this) \
+ ->template context<BackfillMachine>().peering_state;
+ }
+ const BackfillState& backfill_state() const {
+ return static_cast<const S*>(this) \
+ ->template context<BackfillMachine>().backfill_state;
+ }
+ };
+
+public:
+
// states
- struct Crashed : sc::state<Crashed, BackfillMachine>, NamedState {
+ struct Crashed : sc::simple_state<Crashed, BackfillMachine>,
+ StateHelper<Crashed> {
+ explicit Crashed();
};
- struct Initial : sc::state<Initial, BackfillMachine>, NamedState {
+ struct Initial : sc::state<Initial, BackfillMachine>,
+ StateHelper<Initial> {
using reactions = boost::mpl::list<
sc::custom_reaction<Triggered>,
sc::transition<sc::event_base, Crashed>>;
+ explicit Initial(my_context);
// initialize after triggering backfill by on_activate_complete().
// transit to Enqueuing.
sc::result react(const Triggered&);
};
- struct Enqueuing : sc::state<Enqueuing, BackfillMachine>, NamedState {
+ struct Enqueuing : sc::state<Enqueuing, BackfillMachine>,
+ StateHelper<Enqueuing> {
using reactions = boost::mpl::list<
+ sc::transition<RequestPrimaryScanning, PrimaryScanning>,
+ sc::transition<RequestReplicasScanning, ReplicasScanning>,
+ sc::transition<RequestWaiting, Waiting>,
+ sc::transition<RequestDone, Done>,
sc::transition<sc::event_base, Crashed>>;
+ explicit Enqueuing(my_context);
+
+ // indicate whether there is any remaining work to do when it comes
+ // to comparing the hobject_t namespace between primary and replicas.
+ // true doesn't necessarily mean backfill is done -- there could be
+ // in-flight pushes or drops which had been enqueued but aren't
+ // completed yet.
+ static bool all_enqueued(
+ const PeeringFacade& peering_state,
+ const BackfillInterval& backfill_info,
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info);
+
+ private:
+ void maybe_update_range();
+ void trim_backfill_infos();
+
+ // these methods take BackfillIntervals instead of extracting them from
+ // the state to emphasize the relationships across the main loop.
+ hobject_t earliest_peer_backfill(
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info) const;
+ bool should_rescan_replicas(
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+ const BackfillInterval& backfill_info) const;
+ // indicate whether a particular acting primary needs to scanned again
+ // to process next piece of the hobject_t's namespace.
+ // the logic is per analogy to replica_needs_scan(). See comments there.
+ bool should_rescan_primary(
+ const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
+ const BackfillInterval& backfill_info) const;
+
+ // the result_t is intermediary between {remove,update}_on_peers() and
+ // updating BackfillIntervals in trim_backfilled_object_from_intervals.
+ // This step is important because it affects the main loop's condition,
+ // and thus deserves to be exposed instead of being called deeply from
+ // {remove,update}_on_peers().
+ struct [[nodiscard]] result_t {
+ std::set<pg_shard_t> pbi_targets;
+ hobject_t new_last_backfill_started;
+ };
+ void trim_backfilled_object_from_intervals(
+ result_t&&,
+ hobject_t& last_backfill_started,
+ std::map<pg_shard_t, BackfillInterval>& peer_backfill_info);
+ result_t remove_on_peers(const hobject_t& check);
+ result_t update_on_peers(const hobject_t& check);
};
struct PrimaryScanning : sc::state<PrimaryScanning, BackfillMachine>,
- NamedState {
+ StateHelper<PrimaryScanning> {
using reactions = boost::mpl::list<
+ sc::custom_reaction<ObjectPushed>,
sc::custom_reaction<PrimaryScanned>,
sc::transition<sc::event_base, Crashed>>;
+ explicit PrimaryScanning(my_context);
+ sc::result react(ObjectPushed);
// collect scanning result and transit to Enqueuing.
- sc::result react(const PrimaryScanned&);
+ sc::result react(PrimaryScanned);
};
struct ReplicasScanning : sc::state<ReplicasScanning, BackfillMachine>,
- NamedState {
+ StateHelper<ReplicasScanning> {
using reactions = boost::mpl::list<
+ sc::custom_reaction<ObjectPushed>,
sc::custom_reaction<ReplicaScanned>,
sc::transition<sc::event_base, Crashed>>;
+ explicit ReplicasScanning(my_context);
// collect scanning result; if all results are collected, transition
// to Enqueuing will happen.
- sc::result react(const ReplicaScanned&);
+ sc::result react(ObjectPushed);
+ sc::result react(ReplicaScanned);
+
+ // indicate whether a particular peer should be scanned to retrieve
+ // BackfillInterval for new range of hobject_t namespace.
+ // true when bi.objects is exhausted, replica bi's end is not MAX,
+ // and primary bi'begin is further than the replica's one.
+ static bool replica_needs_scan(
+ const BackfillInterval& replica_backfill_info,
+ const BackfillInterval& local_backfill_info);
+
+ private:
+ std::set<pg_shard_t> waiting_on_backfill;
};
- struct Flushing : sc::simple_state<Flushing, BackfillMachine>,
- NamedState {
+ struct Waiting : sc::state<Waiting, BackfillMachine>,
+ StateHelper<Waiting> {
using reactions = boost::mpl::list<
- sc::transition<Flushed, Enqueuing>,
+ sc::custom_reaction<ObjectPushed>,
sc::transition<sc::event_base, Crashed>>;
+ explicit Waiting(my_context);
+ sc::result react(ObjectPushed);
};
+
+ struct Done : sc::state<Done, BackfillMachine>,
+ StateHelper<Done> {
+ using reactions = boost::mpl::list<
+ sc::transition<sc::event_base, Crashed>>;
+ explicit Done(my_context);
+ };
+
+ BackfillState(BackfillListener& backfill_listener,
+ std::unique_ptr<PeeringFacade> peering_state,
+ std::unique_ptr<PGFacade> pg);
+ ~BackfillState();
+
+private:
+ hobject_t last_backfill_started;
+ BackfillInterval backfill_info;
+ std::map<pg_shard_t, BackfillInterval> peer_backfill_info;
+ BackfillMachine backfill_machine;
+ std::unique_ptr<ProgressTracker> progress_tracker;
};
+// BackfillListener -- an interface used by the backfill FSM to request
+// low-level services like issueing `MOSDPGPush` or `MOSDPGBackfillRemove`.
+// The goals behind the interface are: 1) unittestability; 2) possibility
+// to retrofit classical OSD with BackfillState. For the second reason we
+// never use `seastar::future` -- instead responses to the requests are
+// conveyed as events; see ObjectPushed as an example.
struct BackfillState::BackfillListener {
virtual void request_replica_scan(
const pg_shard_t& target,
virtual ~BackfillListener() = default;
};
+class BackfillState::ProgressTracker {
+ // TODO: apply_stat,
+ enum class op_stage_t {
+ enqueued_push,
+ enqueued_drop,
+ completed_push,
+ };
+
+ struct registry_item_t {
+ op_stage_t stage;
+ std::optional<pg_stat_t> stats;
+ };
+
+ BackfillMachine& backfill_machine;
+ std::map<hobject_t, registry_item_t> registry;
+
+ BackfillState& backfill_state() {
+ return backfill_machine.backfill_state;
+ }
+ PeeringFacade& peering_state() {
+ return *backfill_machine.peering_state;
+ }
+ BackfillListener& backfill_listener() {
+ return backfill_machine.backfill_listener;
+ }
+
+public:
+ ProgressTracker(BackfillMachine& backfill_machine)
+ : backfill_machine(backfill_machine) {
+ }
+
+ bool tracked_objects_completed() const;
+
+ void enqueue_push(const hobject_t&);
+ void enqueue_drop(const hobject_t&);
+ void complete_to(const hobject_t&, const pg_stat_t&);
+};
+
} // namespace crimson::osd