// ---
void OSDService::_queue_for_recovery(
- std::pair<epoch_t, PGRef> p,
+ pg_awaiting_throttle_t p,
uint64_t reserved_pushes)
{
ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
+
+ uint64_t cost_for_queue = [this, &reserved_pushes, &p] {
+ if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+ return p.cost_per_object * reserved_pushes;
+ } else {
+ /* We retain this legacy behavior for WeightedPriorityQueue. It seems to
+ * require very large costs for several messages in order to do any
+ * meaningful amount of throttling. This branch should be removed after
+ * Reef.
+ */
+ return cct->_conf->osd_recovery_cost;
+ }
+ }();
+
enqueue_back(
OpSchedulerItem(
unique_ptr<OpSchedulerItem::OpQueueable>(
new PGRecovery(
- p.second->get_pgid(), p.first, reserved_pushes)),
- cct->_conf->osd_recovery_cost,
+ p.pg->get_pgid(),
+ p.epoch_queued,
+ reserved_pushes)),
+ cost_for_queue,
cct->_conf->osd_recovery_priority,
ceph_clock_now(),
0,
- p.first));
+ p.epoch_queued));
}
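//
// Illustrative sketch (not part of the change): how the cost selection above
// works out for the two schedulers. The helper name, the example sizes, and
// passing the scheduler as a plain string are assumptions made for the
// example only.
#include <cstdint>
#include <iostream>
#include <string>

static uint64_t recovery_queue_cost(const std::string& op_queue,
                                    uint64_t cost_per_object,
                                    uint64_t reserved_pushes,
                                    uint64_t legacy_recovery_cost)
{
  if (op_queue == "mclock_scheduler") {
    // mClock: cost scales with the amount of data this item is expected to push.
    return cost_per_object * reserved_pushes;
  }
  // WeightedPriorityQueue keeps the flat, config-driven legacy cost and
  // ignores cost_per_object entirely.
  return legacy_recovery_cost;
}

int main() {
  // e.g. 4 MiB average object size, 5 reserved pushes -> 20 MiB cost under mClock
  std::cout << recovery_queue_cost("mclock_scheduler", 4ull << 20, 5, 20ull << 20) << "\n";
  // WPQ: flat legacy cost, regardless of object size or number of pushes
  std::cout << recovery_queue_cost("wpq", 4ull << 20, 5, 20ull << 20) << "\n";
}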
// ====================================================================
private:
// -- pg recovery and associated throttling --
ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
- std::list<std::pair<epoch_t, PGRef> > awaiting_throttle;
+
+ struct pg_awaiting_throttle_t {
+ const epoch_t epoch_queued;
+ PGRef pg;
+ const uint64_t cost_per_object;
+ };
+ std::list<pg_awaiting_throttle_t> awaiting_throttle;
/// queue a scrub-related message for a PG
template <class MSG_TYPE>
#endif
bool _recover_now(uint64_t *available_pushes);
void _maybe_queue_recovery();
- void _queue_for_recovery(
- std::pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
+ void _queue_for_recovery(pg_awaiting_throttle_t p, uint64_t reserved_pushes);
public:
void start_recovery_op(PG *pg, const hobject_t& soid);
void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
std::lock_guard l(recovery_lock);
awaiting_throttle.remove_if(
[pg](decltype(awaiting_throttle)::const_reference awaiting ) {
- return awaiting.second.get() == pg;
+ return awaiting.pg.get() == pg;
});
}
unsigned get_target_pg_log_entries() const;
// delayed pg activation
- void queue_for_recovery(PG *pg) {
+ void queue_for_recovery(PG *pg, uint64_t cost_per_object) {
std::lock_guard l(recovery_lock);
if (pg->is_forced_recovery_or_backfill()) {
- awaiting_throttle.push_front(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
+ awaiting_throttle.emplace_front(
+ pg_awaiting_throttle_t{
+ pg->get_osdmap()->get_epoch(), pg, cost_per_object});
} else {
- awaiting_throttle.push_back(std::make_pair(pg->get_osdmap()->get_epoch(), pg));
+ awaiting_throttle.emplace_back(
+ pg_awaiting_throttle_t{
+ pg->get_osdmap()->get_epoch(), pg, cost_per_object});
}
_maybe_queue_recovery();
}
void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
std::lock_guard l(recovery_lock);
- _queue_for_recovery(std::make_pair(queued, pg), reserved_pushes);
+ // Send cost as 1 in pg_awaiting_throttle_t below. The cost is ignored
+ // as this path is only applicable to the WeightedPriorityQueue scheduler.
+ _queue_for_recovery(pg_awaiting_throttle_t{queued, pg, 1}, reserved_pushes);
}
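// (Note, not part of the change: with any scheduler other than
// mclock_scheduler, the cost_for_queue lambda in _queue_for_recovery above
// returns cct->_conf->osd_recovery_cost, so the placeholder cost of 1 passed
// here never reaches the scheduler.)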
void queue_check_readable(spg_t spgid,
// ====================================================================
} else {
dout(10) << "queue_recovery -- queuing" << dendl;
recovery_queued = true;
- osd->queue_for_recovery(this);
+ // Let cost per object be the average object size
+ auto num_bytes = static_cast<uint64_t>(
+ std::max<int64_t>(
+ 0, // ensure bytes is non-negative
+ info.stats.stats.sum.num_bytes));
+ auto num_objects = static_cast<uint64_t>(
+ std::max<int64_t>(
+ 1, // ensure objects is non-negative and non-zero
+ info.stats.stats.sum.num_objects));
+ uint64_t cost_per_object = std::max<uint64_t>(num_bytes / num_objects, 1);
+ osd->queue_for_recovery(this, cost_per_object);
}
}
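//
// Illustrative sketch (not part of the change): the average-object-size
// calculation above with the clamping made explicit. The function name and
// the example figures are assumptions for illustration only.
#include <algorithm>
#include <cstdint>
#include <iostream>

static uint64_t average_object_cost(int64_t sum_num_bytes, int64_t sum_num_objects)
{
  // The stat sums are signed; clamp them exactly as the code above does
  // before dividing.
  auto num_bytes = static_cast<uint64_t>(std::max<int64_t>(0, sum_num_bytes));
  auto num_objects = static_cast<uint64_t>(std::max<int64_t>(1, sum_num_objects));
  // Never hand the scheduler a zero cost.
  return std::max<uint64_t>(num_bytes / num_objects, 1);
}

int main() {
  // e.g. 4 GiB of data across 1024 objects -> 4 MiB per object
  std::cout << average_object_cost(4ll << 30, 1024) << "\n";
  // empty or freshly created PG -> cost of 1
  std::cout << average_object_cost(0, 0) << "\n";
}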