From 11a7f50a439e68eea43f724d4c8d50d827adcb17 Mon Sep 17 00:00:00 2001 From: xie xingguo Date: Wed, 5 Jun 2019 10:41:52 +0800 Subject: [PATCH] osd/OSDMapMapping: make ParallelPGMapper accept input pgs The existing "prime temp" mechanism is a good inspiration for clusters with a large number of pgs that need to do various calculations quickly. I am planning to do the upmap tidy-up work the same way, hence the need for an alternate way of specifying pgs to process other than taking directly from the map. Signed-off-by: xie xingguo (cherry picked from commit f6fd4a312e0dda260f2c150334f06b531678ce47) Conflicts: s/g_conf./g_conf->/ --- src/mon/OSDMonitor.cc | 2 +- src/mon/OSDMonitor.h | 1 + src/osd/OSDMapMapping.cc | 40 ++++++++++++++++++++++++++++++++++++---- src/osd/OSDMapMapping.h | 11 ++++++++--- 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index da9c7d992464d..f3dd6b5665e7f 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -916,7 +916,7 @@ void OSDMonitor::maybe_prime_pg_temp() dout(10) << __func__ << " no pools, no pg_temp priming" << dendl; } else if (all) { PrimeTempJob job(next, this); - mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk); + mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk, {}); if (job.wait_for(g_conf->mon_osd_prime_pg_temp_max_time)) { dout(10) << __func__ << " done in " << job.get_duration() << dendl; } else { diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index d789f33d0956e..117cf22174427 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -320,6 +320,7 @@ private: osdmon->prime_pg_temp(*osdmap, pgid); } } + void process(const vector& pgs) override {} void complete() override {} }; void maybe_prime_pg_temp(); diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc index 6566acaadb3ae..09c6478ca8a75 100644 --- a/src/osd/OSDMapMapping.cc +++ b/src/osd/OSDMapMapping.cc @@ -147,18 +147,50 @@ void 
ParallelPGMapper::Job::finish_one() void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h) { - ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool - << " [" << i->begin << "," << i->end << ")" << dendl; - i->job->process(i->pool, i->begin, i->end); + ldout(m->cct, 20) << __func__ << " " << i->job << " pool " << i->pool + << " [" << i->begin << "," << i->end << ")" + << " pgs " << i->pgs + << dendl; + if (!i->pgs.empty()) + i->job->process(i->pgs); + else + i->job->process(i->pool, i->begin, i->end); i->job->finish_one(); delete i; } void ParallelPGMapper::queue( Job *job, - unsigned pgs_per_item) + unsigned pgs_per_item, + const vector& input_pgs) { bool any = false; + if (!input_pgs.empty()) { + unsigned i = 0; + vector item_pgs; + item_pgs.reserve(pgs_per_item); + for (auto& pg : input_pgs) { + if (i < pgs_per_item) { + ++i; + item_pgs.push_back(pg); + } + if (i >= pgs_per_item) { + job->start_one(); + wq.queue(new Item(job, item_pgs)); + i = 0; + item_pgs.clear(); + any = true; + } + } + if (!item_pgs.empty()) { + job->start_one(); + wq.queue(new Item(job, item_pgs)); + any = true; + } + ceph_assert(any); + return; + } + // no input pgs, load all from map for (auto& p : job->osdmap->get_pools()) { for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) { unsigned ps_end = std::min(ps + pgs_per_item, p.second.get_pg_num()); diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h index 86ad9743b84c2..92b74d604af2f 100644 --- a/src/osd/OSDMapMapping.h +++ b/src/osd/OSDMapMapping.h @@ -31,7 +31,8 @@ public: assert(shards == 0); } - // child must implement this + // child must implement either form of process + virtual void process(const vector& pgs) = 0; virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0; virtual void complete() = 0; @@ -102,7 +103,9 @@ protected: Job *job; int64_t pool; unsigned begin, end; + vector pgs; + Item(Job *j, vector pgs) : job(j), pgs(pgs) {} Item(Job *j, int64_t 
p, unsigned b, unsigned e) : job(j), pool(p), @@ -157,7 +160,8 @@ public: void queue( Job *job, - unsigned pgs_per_item); + unsigned pgs_per_item, + const vector& input_pgs); void drain() { wq.drain(); @@ -271,6 +275,7 @@ private: : Job(osdmap), mapping(m) { mapping->_start(*osdmap); } + void process(const vector& pgs) override {} void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override { mapping->_update_range(*osdmap, pool, ps_begin, ps_end); } @@ -326,7 +331,7 @@ public: ParallelPGMapper& mapper, unsigned pgs_per_item) { std::unique_ptr job(new MappingJob(&map, this)); - mapper.queue(job.get(), pgs_per_item); + mapper.queue(job.get(), pgs_per_item, {}); return job; } -- 2.39.5