wake();
}
-blocking_future<> OperationThrottler::acquire_throttle(
+seastar::future<> OperationThrottler::acquire_throttle(
crimson::osd::scheduler::params_t params)
{
crimson::osd::scheduler::item_t item{params, seastar::promise<>()};
auto fut = item.wake.get_future();
scheduler->enqueue(std::move(item));
- return make_blocking_future(std::move(fut));
+ return fut;
}
void OperationThrottler::dump_detail(Formatter *f) const
crimson::osd::scheduler::params_t params,
F &&f) {
if (!max_in_progress) return f();
- auto fut = acquire_throttle(params);
- // At any given moment a particular op can be blocked by a given
- // OperationThrottler instance no more than once. This means the
- // same throtter won't be on the op's blockers list more than one
- // time.
- return op->with_blocking_future(std::move(fut))
+ return acquire_throttle(params)
.then(std::forward<F>(f))
.then([this](auto x) {
release_throttle();
});
}
+ template <typename OperationT, typename F>
+ seastar::future<> with_throttle_while(
+ OperationT* op,
+ crimson::osd::scheduler::params_t params,
+ F &&f) {
+ return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
+ return cont ? with_throttle_while(op, params, f) : seastar::now();
+ });
+ }
+
+
public:
OperationThrottler(ConfigProxy &conf);
const std::set<std::string> &changed) final;
void update_from_config(const ConfigProxy &conf);
- template <typename OperationT, typename F>
+ template <class OpT, class... Args>
seastar::future<> with_throttle_while(
- OperationT* op,
- crimson::osd::scheduler::params_t params,
- F &&f) {
- return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
- return cont ? with_throttle_while(op, params, f) : seastar::now();
- });
+ BlockingEvent::Trigger<OpT>&& trigger,
+ Args&&... args) {
+ return trigger.maybe_record_blocking(
+ with_throttle_while(std::forward<Args>(args)...), *this);
}
private:
void wake();
- blocking_future<> acquire_throttle(
+ seastar::future<> acquire_throttle(
crimson::osd::scheduler::params_t params);
void release_throttle();
std::chrono::milliseconds(std::lround(delay * 1000)));
}
return maybe_delay.then([ref, this] {
- return ss.throttler.with_throttle_while(
- this, get_scheduler_params(), [this] {
- return T::interruptor::with_interruption([this] {
- return do_recovery();
- }, [](std::exception_ptr) {
- return seastar::make_ready_future<bool>(false);
- }, pg);
- }).handle_exception_type([ref, this](const std::system_error& err) {
- if (err.code() == std::make_error_code(std::errc::interrupted)) {
- logger().debug("{} recovery interruped: {}", *pg, err.what());
- return seastar::now();
- }
- return seastar::make_exception_future<>(err);
+ return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
+ [ref, this] (auto&& trigger) {
+ return ss.throttler.with_throttle_while(
+ std::move(trigger),
+ this, get_scheduler_params(), [this] {
+ return T::interruptor::with_interruption([this] {
+ return do_recovery();
+ }, [](std::exception_ptr) {
+ return seastar::make_ready_future<bool>(false);
+ }, pg);
+ }).handle_exception_type([ref, this](const std::system_error& err) {
+ if (err.code() == std::make_error_code(std::errc::interrupted)) {
+ logger().debug("{} recovery interruped: {}", *pg, err.what());
+ return seastar::now();
+ }
+ return seastar::make_exception_future<>(err);
+ });
});
});
}
void print(std::ostream&) const final;
std::tuple<
+ OperationThrottler::BlockingEvent,
RecoveryBackend::RecoveryBlockingEvent
> tracking_events;
float delay = 0);
std::tuple<
+ OperationThrottler::BlockingEvent,
RecoveryBackend::RecoveryBlockingEvent
> tracking_events;
static BackfillRecoveryPipeline &bp(PG &pg);
std::tuple<
+ OperationThrottler::BlockingEvent,
BackfillRecoveryPipeline::Process::BlockingEvent
> tracking_events;