std::chrono::milliseconds(std::lround(delay * 1000)));
}
return maybe_delay.then([ref, this] {
- return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
- [ref, this] (auto&& trigger) {
- return ss.with_throttle_while(
- std::move(trigger),
- this, get_scheduler_params(), [this] {
- return interruptor::with_interruption([this] {
- return do_recovery();
- }, [](std::exception_ptr) {
- return seastar::make_ready_future<bool>(false);
- }, pg, epoch_started);
- });
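+    // Repeat do_recovery() until it reports completion; an interrupted
+    // iteration resolves to false and the loop continues.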
+ return seastar::repeat([ref, this] {
+ return interruptor::with_interruption([this] {
+ return do_recovery();
+ }, [](std::exception_ptr) {
+ return seastar::make_ready_future<bool>(false);
+ }, pg, epoch_started).then([](bool recovery_done) {
+ if (recovery_done) {
+ return seastar::stop_iteration::yes;
+ } else {
+ return seastar::stop_iteration::no;
+ }
});
+ });
});
}
).then_interruptible([this] {
return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
interruptor>([this] (auto&& trigger) {
- return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
+ return pg->get_recovery_handler()->recover_missing(
+ trigger, soid, need, false);
}).then_interruptible([] {
return seastar::make_ready_future<bool>(false);
});
}
pg->reset_pglog_based_recovery_op();
}
- return seastar::make_ready_future<bool>(!done);
+ return seastar::make_ready_future<bool>(done);
});
}
auto it = missing.get_items().find(head);
assert(it != missing.get_items().end());
auto head_need = it->second.need;
- out->emplace_back(recover_missing(trigger, head, head_need));
+ out->emplace_back(recover_missing(trigger, head, head_need, true));
++skipped;
} else {
- out->emplace_back(recover_missing(trigger, soid, item.need));
+ out->emplace_back(recover_missing(trigger, soid, item.need, true));
}
++started;
}
PGRecovery::interruptible_future<>
PGRecovery::recover_missing(
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
- const hobject_t &soid, eversion_t need)
+ const hobject_t &soid,
+ eversion_t need,
+ bool with_throttle)
{
logger().info("{} {} v {}", __func__, soid, need);
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
} else {
return recovering.wait_track_blocking(
trigger,
- pg->get_recovery_backend()->recover_object(soid, need)
+        (with_throttle
+         ? recover_object_with_throttle(soid, need)
+         : recover_object(soid, need))
.handle_exception_interruptible(
[=, this, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
logger().info("{} {} v {}, new recovery", __func__, soid, need);
return recovering.wait_track_blocking(
trigger,
- pg->get_recovery_backend()->recover_object(soid, need)
+ recover_object_with_throttle(soid, need)
.handle_exception_interruptible(
[=, this, soid = std::move(soid)] (auto e) {
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
});
}
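+
+// Throttled recovery of a single object: acquire a slot in the shard's
+// operation throttle (background_best_effort class) before asking the
+// recovery backend to recover the object.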
+PGRecovery::interruptible_future<>
+PGRecovery::recover_object_with_throttle(
+ const hobject_t &soid,
+ eversion_t need)
+{
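+  // Schedule this recovery work under the background_best_effort class.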
+ crimson::osd::scheduler::params_t params =
+ {1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
+ auto &ss = pg->get_shard_services();
+  logger().debug("{} {} {}", __func__, soid, need);
+ return ss.with_throttle(
+ std::move(params),
+ [this, soid, need] {
+ logger().debug("got throttle: {} {}", soid, need);
+ auto backend = pg->get_recovery_backend();
+ assert(backend);
+ return backend->recover_object(soid, need);
+ });
+}
+
void PGRecovery::enqueue_push(
const hobject_t& obj,
const eversion_t& v,
if (!added)
return;
peering_state.prepare_backfill_for_missing(obj, v, peers);
- std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
+ std::ignore = recover_object_with_throttle(obj, v).\
handle_exception_interruptible([] (auto) {
ceph_abort_msg("got exception on backfill's push");
return seastar::make_ready_future<>();
bool PGRecovery::budget_available() const
{
- // TODO: the limits!
- return true;
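+  // Recovery budget is bounded by the shard's operation throttle.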
+ auto &ss = pg->get_shard_services();
+ return ss.throttle_available();
}
void PGRecovery::on_pg_clean()
}
RecoveryBackend::interruptible_future<> recover_missing(
RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
- const hobject_t &soid, eversion_t need);
+ const hobject_t &soid,
+ eversion_t need,
+ bool with_throttle);
RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
friend class ReplicatedRecoveryBackend;
friend class crimson::osd::UrgentRecovery;
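+  // Recover an object after obtaining a slot from the shard's operation
+  // throttle (background_best_effort class).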
+ interruptible_future<> recover_object_with_throttle(
+ const hobject_t &soid,
+ eversion_t need);
+
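+  // Unthrottled path: forward the request directly to the recovery backend.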
+ interruptible_future<> recover_object(
+ const hobject_t &soid,
+ eversion_t need) {
+ auto backend = pg->get_recovery_backend();
+ assert(backend);
+ return backend->recover_object(soid, need);
+ }
+
// backfill begin
std::unique_ptr<crimson::osd::BackfillState> backfill_state;
std::map<pg_shard_t,