float(0.001));
}
-PGRecovery::blocking_interruptible_future<bool>
-PGRecovery::start_recovery_ops(size_t max_to_start)
+PGRecovery::interruptible_future<bool>
+PGRecovery::start_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
+ size_t max_to_start)
{
assert(pg->is_primary());
assert(pg->is_peered());
assert(!pg->is_backfilling());
assert(!pg->get_peering_state().is_deleting());
- std::vector<blocking_interruptible_future<>> started;
+ std::vector<RecoveryBackend::interruptible_future<>> new_started;
+ std::vector<interruptible_future<>> started;
+ new_started.reserve(max_to_start);
started.reserve(max_to_start);
- max_to_start -= start_primary_recovery_ops(max_to_start, &started);
+ max_to_start -= start_primary_recovery_ops(trigger, max_to_start, &started);
if (max_to_start > 0) {
- max_to_start -= start_replica_recovery_ops(max_to_start, &started);
+ max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
}
- return crimson::join_blocking_interruptible_futures<
- ::crimson::osd::IOInterruptCondition>(std::move(started)).then_interruptible<
- ::crimson::osd::IOInterruptCondition>(
- [this] {
+ using interruptor =
+ crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
+ return interruptor::parallel_for_each(std::move(new_started),
+ [] (auto&& ifut) {
+ return std::move(ifut);
+ }).then_interruptible([this] {
bool done = !pg->get_peering_state().needs_recovery();
if (done) {
logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
}
size_t PGRecovery::start_primary_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
size_t max_to_start,
- std::vector<PGRecovery::blocking_interruptible_future<>> *out)
+ std::vector<PGRecovery::interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
// TODO: handle lost/unfound
if (pg->get_recovery_backend()->is_recovering(soid)) {
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking<
- ::crimson::osd::IOInterruptCondition>());
+ out->emplace_back(recovery_waiter.wait_for_recovered(
+ *trigger.create_part_trigger()));
++started;
} else if (pg->get_recovery_backend()->is_recovering(head)) {
++skipped;
} else {
- out->push_back(recover_missing(soid, item.need));
+ out->emplace_back(recover_missing(trigger, soid, item.need));
++started;
}
}
size_t PGRecovery::start_replica_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
size_t max_to_start,
- std::vector<PGRecovery::blocking_interruptible_future<>> *out)
+ std::vector<PGRecovery::interruptible_future<>> *out)
{
if (!pg->is_recovering()) {
return 0;
if (pg->get_recovery_backend()->is_recovering(soid)) {
logger().debug("{}: already recovering object {}", __func__, soid);
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
- out->push_back(recovery_waiter.wait_for_recovered_blocking<
- ::crimson::osd::IOInterruptCondition>());
+ out->emplace_back(recovery_waiter.wait_for_recovered(
+ *trigger.create_part_trigger()));
started++;
continue;
}
logger().debug("{}: soid {} is a delete, removing", __func__, soid);
map<hobject_t,pg_missing_item>::const_iterator r =
pm.get_items().find(soid);
- started += prep_object_replica_deletes(
- soid, r->second.need, out);
+ started++;
+ out->emplace_back(
+ prep_object_replica_deletes(trigger, soid, r->second.need));
continue;
}
logger().debug("{}: recover_object_replicas({})", __func__,soid);
map<hobject_t,pg_missing_item>::const_iterator r = pm.get_items().find(
soid);
- started += prep_object_replica_pushes(
- soid, r->second.need, out);
+ started++;
+ out->emplace_back(
+ prep_object_replica_pushes(trigger, soid, r->second.need));
}
}
return started;
}
-PGRecovery::blocking_interruptible_future<>
+PGRecovery::interruptible_future<>
PGRecovery::recover_missing(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t &soid, eversion_t need)
{
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
- return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_delete(soid, need));
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
+ pg->get_recovery_backend()->recover_delete(soid, need));
} else {
- return pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
pg->get_recovery_backend()->recover_object(soid, need)
.handle_exception_interruptible(
[=, soid = std::move(soid)] (auto e) {
}
}
-size_t PGRecovery::prep_object_replica_deletes(
+RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_deletes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
+ eversion_t need)
{
- in_progress->push_back(
- pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
- [=] {
- object_stat_sum_t stat_diff;
- stat_diff.num_objects_recovered = 1;
- on_global_recover(soid, stat_diff, true);
- return seastar::make_ready_future<>();
- })
- )
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
+ pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
+ [=] {
+ object_stat_sum_t stat_diff;
+ stat_diff.num_objects_recovered = 1;
+ on_global_recover(soid, stat_diff, true);
+ return seastar::make_ready_future<>();
+ })
);
- return 1;
}
-size_t PGRecovery::prep_object_replica_pushes(
+RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<PGRecovery::blocking_interruptible_future<>> *in_progress)
+ eversion_t need)
{
- in_progress->push_back(
- pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
- pg->get_recovery_backend()->recover_object(soid, need)
- .handle_exception_interruptible(
- [=, soid = std::move(soid)] (auto e) {
- on_failed_recover({ pg->get_pg_whoami() }, soid, need);
- return seastar::make_ready_future<>();
- })
- )
+ return pg->get_recovery_backend()->add_recovering(soid).track_blocking(
+ trigger,
+ pg->get_recovery_backend()->recover_object(soid, need)
+ .handle_exception_interruptible(
+ [=, soid = std::move(soid)] (auto e) {
+ on_failed_recover({ pg->get_pg_whoami() }, soid, need);
+ return seastar::make_ready_future<>();
+ })
);
- return 1;
}
void PGRecovery::on_local_recover(
class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
public:
template <typename T = void>
- using blocking_interruptible_future =
- ::crimson::blocking_interruptible_future<
- ::crimson::osd::IOInterruptCondition, T>;
+ using interruptible_future = RecoveryBackend::interruptible_future<T>;
PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
virtual ~PGRecovery() {}
void start_pglogbased_recovery();
- blocking_interruptible_future<bool> start_recovery_ops(size_t max_to_start);
+ interruptible_future<bool> start_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
+ size_t max_to_start);
void on_backfill_reserved();
void dispatch_backfill_event(
boost::intrusive_ptr<const boost::statechart::event_base> evt);
private:
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
size_t max_to_start,
- std::vector<blocking_interruptible_future<>> *out);
+ std::vector<interruptible_future<>> *out);
size_t start_replica_recovery_ops(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
size_t max_to_start,
- std::vector<blocking_interruptible_future<>> *out);
+ std::vector<interruptible_future<>> *out);
std::vector<pg_shard_t> get_replica_recovery_order() const {
return pg->get_replica_recovery_order();
}
- blocking_interruptible_future<> recover_missing(
+ RecoveryBackend::interruptible_future<> recover_missing(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
const hobject_t &soid, eversion_t need);
- size_t prep_object_replica_deletes(
+ RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<blocking_interruptible_future<>> *in_progress);
- size_t prep_object_replica_pushes(
+ eversion_t need);
+ RecoveryBackend::interruptible_future<> prep_object_replica_pushes(
+ RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
const hobject_t& soid,
- eversion_t need,
- std::vector<blocking_interruptible_future<>> *in_progress);
+ eversion_t need);
void on_local_recover(
const hobject_t& soid,