From: xie xingguo Date: Wed, 5 Jun 2019 02:41:52 +0000 (+0800) Subject: osd/OSDMapMapping: make ParallelPGMapper can accept input pgs X-Git-Tag: v14.2.3~119^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=28bbebbb48e76fa68a28326f001c1e465ed0d726;p=ceph.git osd/OSDMapMapping: make ParallelPGMapper can accept input pgs The existing "prime temp" machinism is a good inspiration for cluster with a large number of pgs that need to do various calculations quickly. I am planning to do the upmap tidy-up work the same way, hence the need for an alternate way of specifying pgs to process other than taking directly from the map. Signed-off-by: xie xingguo (cherry picked from commit f6fd4a312e0dda260f2c150334f06b531678ce47) --- diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 0b4c81fe86cb..e7f347750173 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -929,7 +929,7 @@ void OSDMonitor::maybe_prime_pg_temp() dout(10) << __func__ << " no pools, no pg_temp priming" << dendl; } else if (all) { PrimeTempJob job(next, this); - mapper.queue(&job, g_conf()->mon_osd_mapping_pgs_per_chunk); + mapper.queue(&job, g_conf()->mon_osd_mapping_pgs_per_chunk, {}); if (job.wait_for(g_conf()->mon_osd_prime_pg_temp_max_time)) { dout(10) << __func__ << " done in " << job.get_duration() << dendl; } else { diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 4968a2d62705..1cb9f5262c28 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -321,6 +321,7 @@ private: osdmon->prime_pg_temp(*osdmap, pgid); } } + void process(const vector& pgs) override {} void complete() override {} }; void maybe_prime_pg_temp(); diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc index 6640d83dd8ac..285b2d8f5619 100644 --- a/src/osd/OSDMapMapping.cc +++ b/src/osd/OSDMapMapping.cc @@ -147,18 +147,50 @@ void ParallelPGMapper::Job::finish_one() void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h) { - ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool - << " [" << i->begin << "," << i->end << ")" << dendl; - i->job->process(i->pool, i->begin, i->end); + ldout(m->cct, 20) << __func__ << " " << i->job << " pool " << i->pool + << " [" << i->begin << "," << i->end << ")" + << " pgs " << i->pgs + << dendl; + if (!i->pgs.empty()) + i->job->process(i->pgs); + else + i->job->process(i->pool, i->begin, i->end); i->job->finish_one(); delete i; } void ParallelPGMapper::queue( Job *job, - unsigned pgs_per_item) + unsigned pgs_per_item, + const vector& input_pgs) { bool any = false; + if (!input_pgs.empty()) { + unsigned i = 0; + vector item_pgs; + item_pgs.reserve(pgs_per_item); + for (auto& pg : input_pgs) { + if (i < pgs_per_item) { + ++i; + item_pgs.push_back(pg); + } + if (i >= pgs_per_item) { + job->start_one(); + wq.queue(new Item(job, item_pgs)); + i = 0; + item_pgs.clear(); + any = true; + } + } + if (!item_pgs.empty()) { + job->start_one(); + wq.queue(new Item(job, item_pgs)); + any = true; + } + ceph_assert(any); + return; + } + // no input pgs, load all from map for (auto& p : job->osdmap->get_pools()) { for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) { unsigned ps_end = std::min(ps + pgs_per_item, p.second.get_pg_num()); diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h index 9ceef9ff0664..b0965fc12a71 100644 --- a/src/osd/OSDMapMapping.h +++ b/src/osd/OSDMapMapping.h @@ -32,7 +32,8 @@ public: ceph_assert(shards == 0); } - // child must implement this + // child must implement either form of process + virtual void process(const vector& pgs) = 0; virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0; virtual void complete() = 0; @@ -103,7 +104,9 @@ protected: Job *job; int64_t pool; unsigned begin, end; + vector pgs; + Item(Job *j, vector pgs) : job(j), pgs(pgs) {} Item(Job *j, int64_t p, unsigned b, unsigned e) : job(j), pool(p), @@ -158,7 +161,8 @@ public: void queue( Job *job, - unsigned pgs_per_item); + unsigned pgs_per_item, + const vector& input_pgs); void drain() { wq.drain(); @@ -275,6 +279,7 @@ private: : Job(osdmap), mapping(m) { mapping->_start(*osdmap); } + void process(const vector& pgs) override {} void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override { mapping->_update_range(*osdmap, pool, ps_begin, ps_end); } @@ -330,7 +335,7 @@ public: ParallelPGMapper& mapper, unsigned pgs_per_item) { std::unique_ptr job(new MappingJob(&map, this)); - mapper.queue(job.get(), pgs_per_item); + mapper.queue(job.get(), pgs_per_item, {}); return job; }