namespace crimson::osd {
+static constexpr crimson::osd::scheduler::params_t backfill_throttle_params{
+ 1, 0, 0, SchedulerClass::background_best_effort
+};
+
void PGRecovery::start_pglogbased_recovery()
{
auto [op, fut] = pg->get_shard_services().start_operation<PglogBasedRecovery>(
}
}
+PGRecovery::interruptible_future<>
+PGRecovery::do_request_replica_scan(
+ pg_shard_t target,
+ hobject_t begin,
+ hobject_t end)
+{
+ LOG_PREFIX(PGRecovery::do_request_replica_scan);
+ DEBUGDPP("target.osd={}", *pg->get_dpp(), target.osd);
+ try {
+ auto releaser = co_await get_backfill_throttle();
+ replica_scan_throttle_releasers.erase(target);
+ replica_scan_throttle_releasers.emplace(target, std::move(releaser));
+ std::ignore = pg->get_shard_services().send_to_osd(
+ target.osd,
+ crimson::make_message<MOSDPGScan>(
+ MOSDPGScan::OP_SCAN_GET_DIGEST,
+ pg->get_pg_whoami(),
+ pg->get_osdmap_epoch(),
+ pg->get_last_peering_reset(),
+ spg_t(pg->get_pgid().pgid, target.shard),
+ begin,
+ end),
+ pg->get_osdmap_epoch());
+ } catch (const crimson::common::interruption& e) {
+ DEBUGDPP("replica scan interrupted: {}", *pg->get_dpp(), e.what());
+ co_return;
+ } catch (...) {
+ ceph_abort_msg(fmt::format(
+ "got unexpected exception on backfill's replica scan for {}: {}",
+ pg->get_pgid(), std::current_exception()));
+ }
+}
+
void PGRecovery::request_replica_scan(
const pg_shard_t& target,
const hobject_t& begin,
const hobject_t& end)
{
- LOG_PREFIX(PGRecovery::request_replica_scan);
- DEBUGDPP("target.osd={}", *pg->get_dpp(), target.osd);
- auto msg = crimson::make_message<MOSDPGScan>(
- MOSDPGScan::OP_SCAN_GET_DIGEST,
- pg->get_pg_whoami(),
- pg->get_osdmap_epoch(),
- pg->get_last_peering_reset(),
- spg_t(pg->get_pgid().pgid, target.shard),
- begin,
- end);
- std::ignore = pg->get_shard_services().send_to_osd(
- target.osd,
- std::move(msg),
- pg->get_osdmap_epoch());
+ std::ignore = do_request_replica_scan(target, begin, end);
}
-void PGRecovery::request_primary_scan(
- const hobject_t& begin)
+PGRecovery::interruptible_future<>
+PGRecovery::do_request_primary_scan(hobject_t begin)
{
- LOG_PREFIX(PGRecovery::request_primary_scan);
+ LOG_PREFIX(PGRecovery::do_request_primary_scan);
DEBUGDPP("begin {}", *pg->get_dpp(), begin);
using crimson::common::local_conf;
- std::ignore = pg->get_recovery_backend()->scan_for_backfill_primary(
- begin,
- local_conf()->osd_backfill_scan_min,
- local_conf()->osd_backfill_scan_max,
- pg->get_peering_state().get_backfill_targets()
- ).then_interruptible([this] (PrimaryBackfillInterval bi) {
- using BackfillState = crimson::osd::BackfillState;
+ try {
+ auto releaser = co_await get_backfill_throttle();
+ auto bi = co_await pg->get_recovery_backend()->scan_for_backfill_primary(
+ begin,
+ local_conf()->osd_backfill_scan_min,
+ local_conf()->osd_backfill_scan_max,
+ pg->get_peering_state().get_backfill_targets());
+ using BackfillState = crimson::osd::BackfillState;
backfill_state->process_event(
BackfillState::PrimaryScanned{ std::move(bi) }.intrusive_from_this());
- });
+ // releaser destroyed here as the coroutine frame unwinds
+ } catch (const crimson::common::interruption& e) {
+ DEBUGDPP("primary scan interrupted: {}", *pg->get_dpp(), e.what());
+ co_return;
+ } catch (...) {
+ ceph_abort_msg(fmt::format(
+ "got unexpected exception on backfill's primary scan for {}: {}",
+ pg->get_pgid(), std::current_exception()));
+ }
+}
+
+void PGRecovery::request_primary_scan(
+ const hobject_t& begin)
+{
+ std::ignore = do_request_primary_scan(begin);
+}
+
+PGRecovery::interruptible_future<OperationThrottler::ThrottleReleaser>
+PGRecovery::get_backfill_throttle()
+{
+ return interruptor::make_interruptible(
+ pg->get_shard_services().get_throttle(backfill_throttle_params));
}
PGRecovery::interruptible_future<>
{
LOG_PREFIX(PGRecovery::recover_object_with_throttle);
DEBUGDPP("{} {}", *pg->get_dpp(), soid, need);
- auto releaser = co_await interruptor::make_interruptible(
- pg->get_shard_services().get_throttle(
- crimson::osd::scheduler::params_t{
- 1, 0, 0, SchedulerClass::background_best_effort
- }));
+ auto releaser = co_await get_backfill_throttle();
DEBUGDPP("got throttle: {} {}", *pg->get_dpp(), soid, need);
co_await pg->get_recovery_backend()->recover_object(soid, need);
- co_return;
}
void PGRecovery::enqueue_push(
void PGRecovery::on_pg_clean()
{
+ replica_scan_throttle_releasers.clear();
backfill_state.reset();
}
LOG_PREFIX(PGRecovery::dispatch_backfill_event);
DEBUGDPP("", *pg->get_dpp());
assert(backfill_state);
+ if (auto* scanned =
+ dynamic_cast<const BackfillState::ReplicaScanned*>(evt.get())) {
+ replica_scan_throttle_releasers.erase(scanned->from);
+ }
backfill_state->process_event(evt);
// TODO: Do we need to worry about cases in which the pg has
// been through both backfill cancellations and backfill
{
LOG_PREFIX(PGRecovery::on_activate_complete);
DEBUGDPP("backfill_state={}", *pg->get_dpp(), fmt::ptr(backfill_state.get()));
+ replica_scan_throttle_releasers.clear();
backfill_state.reset();
}