do {
if (!backfill_listener().budget_available()) {
- DEBUGDPP("throttle failed, turning to Waiting", pg());
post_event(RequestWaiting{});
return;
} else if (should_rescan_replicas(backfill_state().peer_backfill_info,
struct SuspendBackfill : sc::event<SuspendBackfill> {
};
- struct ThrottleAcquired : sc::event<ThrottleAcquired> {
- };
private:
// internal events
struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
sc::transition<RequestDone, Done>,
sc::custom_reaction<SuspendBackfill>,
sc::custom_reaction<Triggered>,
- sc::transition<ThrottleAcquired, Enqueuing>,
sc::transition<sc::event_base, Crashed>>;
explicit Waiting(my_context);
sc::result react(ObjectPushed);
with_throttle_while(std::forward<Args>(args)...), *this);
}
- // Returns std::nullopt if the throttle is acquired immediately,
- // returns the future for the acquiring otherwise
- std::optional<seastar::future<>>
- try_acquire_throttle_now(crimson::osd::scheduler::params_t params) {
- if (!max_in_progress || in_progress < max_in_progress) {
- ++in_progress;
- --pending;
- return std::nullopt;
- }
- return acquire_throttle(params);
- }
-
private:
void dump_detail(Formatter *f) const final;
if (max_to_start > 0) {
max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
}
+ using interruptor =
+ crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
return interruptor::parallel_for_each(started,
[] (auto&& ifut) {
return std::move(ifut);
bool PGRecovery::budget_available() const
{
- crimson::osd::scheduler::params_t params =
- {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
- auto &ss = pg->get_shard_services();
- auto futopt = ss.try_acquire_throttle_now(std::move(params));
- if (!futopt) {
- return true;
- }
- std::ignore = interruptor::make_interruptible(std::move(*futopt)
- ).then_interruptible([this] {
- assert(!backfill_state->is_triggered());
- using BackfillState = crimson::osd::BackfillState;
- backfill_state->process_event(
- BackfillState::ThrottleAcquired{}.intrusive_from_this());
- });
- return false;
+ // TODO: the limits!
+ return true;
}
void PGRecovery::on_pg_clean()
class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
public:
- using interruptor =
- crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
template <typename T = void>
using interruptible_future = RecoveryBackend::interruptible_future<T>;
PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
FORWARD_TO_OSD_SINGLETON(get_pool_info)
FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
- FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler)
FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
FORWARD_TO_OSD_SINGLETON(send_incremental_map)