The existing "prime temp" mechanism is a good model for clusters
with a large number of pgs that need to do various calculations
quickly.
I am planning to do the upmap tidy-up work the same way, hence
the need for an alternate way of specifying the pgs to process
other than taking them directly from the map.
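Roughly, the intended call pattern looks like the sketch below
(UpmapCleanupJob, pgs_to_fix and job.wait() are only illustrative;
the three-argument queue() is what this change adds):

  struct UpmapCleanupJob : public ParallelPGMapper::Job {
    explicit UpmapCleanupJob(const OSDMap *m) : Job(m) {}
    void process(const vector<pg_t>& pgs) override {
      for (auto& pgid : pgs) {
        (void)pgid;             // per-pg tidy-up work would go here
      }
    }
    void process(int64_t, unsigned, unsigned) override {}  // pool-range form unused
    void complete() override {}
  };

  vector<pg_t> pgs_to_fix;      // collected by the tidy-up logic
  UpmapCleanupJob job(osdmap);
  mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk, pgs_to_fix);
  job.wait();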
Signed-off-by: xie xingguo <xie.xingguo@zte.com.cn>
(cherry picked from commit f6fd4a312e0dda260f2c150334f06b531678ce47)
Conflicts:
s/g_conf./g_conf->/
dout(10) << __func__ << " no pools, no pg_temp priming" << dendl;
} else if (all) {
PrimeTempJob job(next, this);
- mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk);
+ mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk, {});
if (job.wait_for(g_conf->mon_osd_prime_pg_temp_max_time)) {
dout(10) << __func__ << " done in " << job.get_duration() << dendl;
} else {
osdmon->prime_pg_temp(*osdmap, pgid);
}
}
+ void process(const vector<pg_t>& pgs) override {}  // pg-list form unused by this job
void complete() override {}
};
void maybe_prime_pg_temp();
void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h)
{
- ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool
- << " [" << i->begin << "," << i->end << ")" << dendl;
- i->job->process(i->pool, i->begin, i->end);
+ ldout(m->cct, 20) << __func__ << " " << i->job << " pool " << i->pool
+ << " [" << i->begin << "," << i->end << ")"
+ << " pgs " << i->pgs
+ << dendl;
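+ // an Item carries either an explicit pg list or a pool/ps range; dispatch accordingly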
+ if (!i->pgs.empty())
+ i->job->process(i->pgs);
+ else
+ i->job->process(i->pool, i->begin, i->end);
i->job->finish_one();
delete i;
}
void ParallelPGMapper::queue(
Job *job,
- unsigned pgs_per_item)
+ unsigned pgs_per_item,
+ const vector<pg_t>& input_pgs)
{
bool any = false;
+ if (!input_pgs.empty()) {
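+ // explicit pg list: chunk it into Items of at most pgs_per_item pgs each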
+ unsigned i = 0;
+ vector<pg_t> item_pgs;
+ item_pgs.reserve(pgs_per_item);
+ for (auto& pg : input_pgs) {
+ if (i < pgs_per_item) {
+ ++i;
+ item_pgs.push_back(pg);
+ }
+ if (i >= pgs_per_item) {
+ job->start_one();
+ wq.queue(new Item(job, item_pgs));
+ i = 0;
+ item_pgs.clear();
+ any = true;
+ }
+ }
+ if (!item_pgs.empty()) {
+ job->start_one();
+ wq.queue(new Item(job, item_pgs));
+ any = true;
+ }
+ ceph_assert(any);
+ return;
+ }
+ // no input pgs, load all from map
for (auto& p : job->osdmap->get_pools()) {
for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
unsigned ps_end = std::min(ps + pgs_per_item, p.second.get_pg_num());
assert(shards == 0);
}
- // child must implement this
+ // child must implement both forms of process; the unused one can be a no-op
+ virtual void process(const vector<pg_t>& pgs) = 0;
virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
virtual void complete() = 0;
Job *job;
int64_t pool;
unsigned begin, end;
+ vector<pg_t> pgs;
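+ // Item holding an explicit pg list; pool/begin/end are unused for these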
+ Item(Job *j, vector<pg_t> pgs) : job(j), pgs(pgs) {}
Item(Job *j, int64_t p, unsigned b, unsigned e)
: job(j),
pool(p),
void queue(
Job *job,
- unsigned pgs_per_item);
+ unsigned pgs_per_item,
+ const vector<pg_t>& input_pgs);
void drain() {
wq.drain();
: Job(osdmap), mapping(m) {
mapping->_start(*osdmap);
}
+ void process(const vector<pg_t>& pgs) override {}  // pg-list form unused by MappingJob
void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
}
ParallelPGMapper& mapper,
unsigned pgs_per_item) {
std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
- mapper.queue(job.get(), pgs_per_item);
+ mapper.queue(job.get(), pgs_per_item, {});
return job;
}