osd->op_shardedwq.queue_front(std::move(qi));
}
+void OSDService::queue_recovery_context(
+ PG *pg,
+ GenContext<ThreadPool::TPHandle&> *c)
+{
+ epoch_t e = get_osdmap()->get_epoch();
+ enqueue_back(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(
+ new PGRecoveryContext(pg->get_pgid(), c, e)),
+ cct->_conf->osd_recovery_cost,
+ cct->_conf->osd_recovery_priority,
+ ceph_clock_now(),
+ 0,
+ e));
+}
+
void OSDService::queue_for_snap_trim(PG *pg)
{
dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
SafeTimer scrub_sleep_timer;
AsyncReserver<spg_t> snap_reserver;
+ void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c);
void queue_for_snap_trim(PG *pg);
void queue_for_scrub(PG *pg, bool with_high_priority);
void queue_for_pg_delete(spg_t pgid, epoch_t e);
pg->unlock();
}
+void PGRecoveryContext::run(
+ OSD *osd,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ c.release()->complete(handle);
+ pg->unlock();
+}
+
void PGDelete::run(
OSD *osd,
PGRef& pg,
{
osd->dequeue_delete(pg.get(), epoch_queued, handle);
}
-
OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
};
+class PGRecoveryContext : public PGOpQueueable {
+ unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+ epoch_t epoch;
+public:
+ PGRecoveryContext(spg_t pgid,
+ GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch)
+ : PGOpQueueable(pgid),
+ c(c), epoch(epoch) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_recovery;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGRecoveryContext(pgid=" << get_pgid()
+ << " c=" << c.get() << " epoch=" << epoch
+ << ")";
+ }
+ void run(
+ OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
+
class PGDelete : public PGOpQueueable {
epoch_t epoch_queued;
public:
void PrimaryLogPG::schedule_recovery_work(
GenContext<ThreadPool::TPHandle&> *c)
{
- osd->recovery_gen_wq.queue(c);
+ osd->queue_recovery_context(this, c);
}
void PrimaryLogPG::send_message_osd_cluster(