}
if (op.in_progress.empty()) {
+ /* This case is odd. filter_read_op gets called while processing
+ * an OSDMap. Normal, non-recovery reads only happen from acting
+ * set osds. For this op to have had a read source go down and
+ * there not be an interval change, it must be part of a pull during
+ * log-based recovery.
+ *
+ * This callback delays calling complete_read_op until later to avoid
+ * dealing with recovery while handling an OSDMap. We assign a
+ * cost here of 1 because:
+ * 1) This should be very rare, and the operation itself was already
+ * throttled.
+ * 2) It shouldn't result in IO, rather it should result in restarting
+ * the pull on the affected objects and pushes from in-memory buffers
+ * on any now complete unaffected objects.
+ */
get_parent()->schedule_recovery_work(
get_parent()->bless_unlocked_gencontext(
- new FinishReadOp(this, op.tid)));
+ new FinishReadOp(this, op.tid)),
+ 1);
}
}
+/* Queue a (blessed) recovery GenContext on the OSD op scheduler.
+ *
+ * `cost` is the caller's estimate of the work the context represents
+ * (the visible call sites pass bytes-to-read, floored at 1).  It is
+ * only honored when the mClock scheduler is active; see the lambda
+ * below for the WeightedPriorityQueue fallback.
+ */
void OSDService::queue_recovery_context(
  PG *pg,
-  GenContext<ThreadPool::TPHandle&> *c)
+  GenContext<ThreadPool::TPHandle&> *c,
+  uint64_t cost)
{
  epoch_t e = get_osdmap_epoch();
+
+  // Pick the cost actually handed to the scheduler, depending on which
+  // op queue is configured.
+  uint64_t cost_for_queue = [this, cost] {
+    if (cct->_conf->osd_op_queue == "mclock_scheduler") {
+      return cost;
+    } else {
+      /* We retain this legacy behavior for WeightedPriorityQueue. It seems to
+       * require very large costs for several messages in order to do any
+       * meaningful amount of throttling. This branch should be removed after
+       * Reef.
+       */
+      return cct->_conf->osd_recovery_cost;
+    }
+  }();
+
  enqueue_back(
    OpSchedulerItem(
      unique_ptr<OpSchedulerItem::OpQueueable>(
        new PGRecoveryContext(pg->get_pgid(), c, e)),
-      cct->_conf->osd_recovery_cost,
+      cost_for_queue,
      cct->_conf->osd_recovery_priority,
      ceph_clock_now(),
      0,
void send_pg_created();
AsyncReserver<spg_t, Finisher> snap_reserver;
- void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
+ void queue_recovery_context(PG *pg,
+ GenContext<ThreadPool::TPHandle&> *c,
+ uint64_t cost);
void queue_for_snap_trim(PG *pg);
void queue_for_scrub(PG* pg, Scrub::scrub_prio_t with_priority);
const pg_stat_t &stat) = 0;
virtual void schedule_recovery_work(
- GenContext<ThreadPool::TPHandle&> *c) = 0;
+ GenContext<ThreadPool::TPHandle&> *c,
+ uint64_t cost) = 0;
virtual pg_shard_t whoami_shard() const = 0;
int whoami() const {
}
+/* PGBackend::Listener hook: hand continuation work back to the OSD's
+ * recovery queue.  `cost` is forwarded unchanged to
+ * OSDService::queue_recovery_context so the op scheduler can account
+ * for the queued item.
+ */
void PrimaryLogPG::schedule_recovery_work(
-  GenContext<ThreadPool::TPHandle&> *c)
+  GenContext<ThreadPool::TPHandle&> *c,
+  uint64_t cost)
{
-  osd->queue_recovery_context(this, c);
+  osd->queue_recovery_context(this, c, cost);
}
void PrimaryLogPG::replica_clear_repop_obc(
}
void schedule_recovery_work(
- GenContext<ThreadPool::TPHandle&> *c) override;
+ GenContext<ThreadPool::TPHandle&> *c,
+ uint64_t cost) override;
pg_shard_t whoami_shard() const override {
return pg_whoami;
+/* Context adapter: when this fires (e.g. from a transaction's
+ * on-complete list) it re-queues the wrapped GenContext through the
+ * PG's recovery queue instead of running it inline, carrying the
+ * stored scheduling cost along with it.
+ */
class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+  uint64_t cost;  // scheduling cost forwarded to schedule_recovery_work()
public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
-    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
+    GenContext<ThreadPool::TPHandle&> *c,
+    uint64_t cost) : pg(pg), c(c), cost(cost) {}
+  // Transfers ownership of `c` into the recovery queue (release) and
+  // forwards the stored cost.
  void finish(int) override {
-    pg->schedule_recovery_work(c.release());
+    pg->schedule_recovery_work(c.release(), cost);
  }
};
}
ReplicatedBackend *bc;
+// Pulls that completed locally and still need pushes/continuation;
+// now moved in at construction time instead of swapped in afterwards.
list<ReplicatedBackend::pull_complete_info> to_continue;
int priority;
-C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
-  : bc(bc), priority(priority) {}
+/* Takes the completed-pull list by rvalue so ownership is established
+ * in the constructor and no post-construction swap is required. */
+C_ReplicatedBackend_OnPullComplete(
+  ReplicatedBackend *bc,
+  int priority,
+  list<ReplicatedBackend::pull_complete_info> &&to_continue)
+  : bc(bc), to_continue(std::move(to_continue)), priority(priority) {}
void finish(ThreadPool::TPHandle &handle) override {
  ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
}
  bc->run_recovery_op(h, priority);
}
+
+/// Estimate total data reads required to perform pushes
+/// (sum of num_bytes_recovered over all pending entries; used as the
+/// scheduler cost when this callback is re-queued).
+uint64_t estimate_push_costs() const {
+  uint64_t cost = 0;
+  for (const auto &i: to_continue) {
+    cost += i.stat.num_bytes_recovered;
+  }
+  return cost;
+}
};
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
C_ReplicatedBackend_OnPullComplete *c =
  new C_ReplicatedBackend_OnPullComplete(
    this,
-    m->get_priority());
-c->to_continue.swap(to_continue);
+    m->get_priority(),
+    std::move(to_continue));
+// Defer the resulting pushes until the transaction completes; the
+// queued item's cost is the estimated bytes to read for those pushes,
+// floored at 1 so the scheduler never sees a zero-cost item.
t.register_on_complete(
  new PG_RecoveryQueueAsync(
    get_parent(),
-    get_parent()->bless_unlocked_gencontext(c)));
+    get_parent()->bless_unlocked_gencontext(c),
+    std::max<uint64_t>(1, c->estimate_push_costs())));
}
replies.erase(replies.end() - 1);