osd/OSDMapMapping: make ParallelPGMapper accept input pgs
author     xie xingguo <xie.xingguo@zte.com.cn>
           Wed, 5 Jun 2019 02:41:52 +0000 (10:41 +0800)
committer  Nathan Cutler <ncutler@suse.com>
           Wed, 26 Jun 2019 15:25:12 +0000 (17:25 +0200)
The existing "prime temp" machinism is a good inspiration
for cluster with a large number of pgs that need to do various
calculations quickly.
I am planning to do the upmap tidy-up work the same way, hence
the need for an alternate way of specifying pgs to process other
than taking directly from the map.

Signed-off-by: xie xingguo <xie.xingguo@zte.com.cn>
(cherry picked from commit f6fd4a312e0dda260f2c150334f06b531678ce47)
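
For illustration, a minimal sketch (not part of this commit) of how the new
entry point might be consumed; UpmapTidyJob, candidate_pgs and pgs_per_chunk
are hypothetical names:

  #include "osd/OSDMapMapping.h"
  #include "osd/osd_types.h"   // pg_t

  // Hypothetical Job that consumes an explicit pg list via the new
  // process(vector<pg_t>) form; both process() overloads are pure
  // virtual, so the unused range form is stubbed out.
  struct UpmapTidyJob : public ParallelPGMapper::Job {
    explicit UpmapTidyJob(const OSDMap *om) : Job(om) {}
    void process(const std::vector<pg_t>& pgs) override {
      for (auto pg : pgs) {
        // per-pg work (e.g. inspecting upmap entries) would go here
        (void)pg;
      }
    }
    void process(int64_t, unsigned, unsigned) override {}
    void complete() override {}
  };

  // Hypothetical usage:
  //   UpmapTidyJob job(&osdmap);
  //   mapper.queue(&job, pgs_per_chunk, candidate_pgs);  // explicit pg list
  //   job.wait();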

src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/osd/OSDMapMapping.cc
src/osd/OSDMapMapping.h

diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index 0b4c81fe86cb54f052a43df7859c842c8bd15d1d..e7f347750173b9f6f03e96f866c8891306a14411 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -929,7 +929,7 @@ void OSDMonitor::maybe_prime_pg_temp()
     dout(10) << __func__ << " no pools, no pg_temp priming" << dendl;
   } else if (all) {
     PrimeTempJob job(next, this);
-    mapper.queue(&job, g_conf()->mon_osd_mapping_pgs_per_chunk);
+    mapper.queue(&job, g_conf()->mon_osd_mapping_pgs_per_chunk, {});
     if (job.wait_for(g_conf()->mon_osd_prime_pg_temp_max_time)) {
       dout(10) << __func__ << " done in " << job.get_duration() << dendl;
     } else {
diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h
index 4968a2d6270544b1f220c1fe92206eeebbe90eaa..1cb9f5262c2878275f2cddbbd84df4048d02e0d7 100644
--- a/src/mon/OSDMonitor.h
+++ b/src/mon/OSDMonitor.h
@@ -321,6 +321,7 @@ private:
        osdmon->prime_pg_temp(*osdmap, pgid);
       }
     }
+    void process(const vector<pg_t>& pgs) override {}
     void complete() override {}
   };
   void maybe_prime_pg_temp();
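
(The pg-list form is stubbed out here: pg_temp priming always walks the
whole map, and the {} passed at the OSDMonitor.cc call site above means
only the range form is ever invoked for this job.)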
diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc
index 6640d83dd8ac7480cb9e68f4e10de42b48ebae7c..285b2d8f561932a9be92a2bd4de0ee77638e8d59 100644
--- a/src/osd/OSDMapMapping.cc
+++ b/src/osd/OSDMapMapping.cc
@@ -147,18 +147,50 @@ void ParallelPGMapper::Job::finish_one()
 
 void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h)
 {
-  ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool
-                   << " [" << i->begin << "," << i->end << ")" << dendl;
-  i->job->process(i->pool, i->begin, i->end);
+  ldout(m->cct, 20) << __func__ << " " << i->job << " pool " << i->pool
+                    << " [" << i->begin << "," << i->end << ")"
+                    << " pgs " << i->pgs
+                    << dendl;
+  if (!i->pgs.empty())
+    i->job->process(i->pgs);
+  else
+    i->job->process(i->pool, i->begin, i->end);
   i->job->finish_one();
   delete i;
 }
 
 void ParallelPGMapper::queue(
   Job *job,
-  unsigned pgs_per_item)
+  unsigned pgs_per_item,
+  const vector<pg_t>& input_pgs)
 {
   bool any = false;
+  if (!input_pgs.empty()) {
+    unsigned i = 0;
+    vector<pg_t> item_pgs;
+    item_pgs.reserve(pgs_per_item);
+    for (auto& pg : input_pgs) {
+      if (i < pgs_per_item) {
+        ++i;
+        item_pgs.push_back(pg);
+      }
+      if (i >= pgs_per_item) {
+        job->start_one();
+        wq.queue(new Item(job, item_pgs));
+        i = 0;
+        item_pgs.clear();
+        any = true;
+      }
+    }
+    if (!item_pgs.empty()) {
+      job->start_one();
+      wq.queue(new Item(job, item_pgs));
+      any = true;
+    }
+    ceph_assert(any);
+    return;
+  }
+  // no input pgs, load all from map
   for (auto& p : job->osdmap->get_pools()) {
     for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
       unsigned ps_end = std::min(ps + pgs_per_item, p.second.get_pg_num());
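
Worked example of the chunking above (illustrative numbers): with
pgs_per_item = 3 and 7 input pgs, the loop queues two full Items of 3 pgs
each and the trailing block queues a final Item with the remaining pg; each
Item then invokes the pg-list form of process() exactly once from _process().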
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
index 9ceef9ff0664cae03172e6881af2ae14234767e8..b0965fc12a713d2540eed1be793ee962d540cd28 100644
--- a/src/osd/OSDMapMapping.h
+++ b/src/osd/OSDMapMapping.h
@@ -32,7 +32,8 @@ public:
       ceph_assert(shards == 0);
     }
 
-    // child must implement this
+    // child must implement either form of process
+    virtual void process(const vector<pg_t>& pgs) = 0;
     virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
     virtual void complete() = 0;
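
Making both overloads pure virtual, rather than giving the new one a default
no-op body, forces every Job subclass to acknowledge the new form explicitly;
in this commit that means an empty stub in PrimeTempJob (above) and
MappingJob (below).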
 
@@ -103,7 +104,9 @@ protected:
     Job *job;
     int64_t pool;
     unsigned begin, end;
+    vector<pg_t> pgs;
 
+    Item(Job *j, vector<pg_t> pgs) : job(j), pgs(pgs) {}
     Item(Job *j, int64_t p, unsigned b, unsigned e)
       : job(j),
        pool(p),
@@ -158,7 +161,8 @@ public:
 
   void queue(
     Job *job,
-    unsigned pgs_per_item);
+    unsigned pgs_per_item,
+    const vector<pg_t>& input_pgs);
 
   void drain() {
     wq.drain();
@@ -275,6 +279,7 @@ private:
       : Job(osdmap), mapping(m) {
       mapping->_start(*osdmap);
     }
+    void process(const vector<pg_t>& pgs) override {}
     void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
       mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
     }
@@ -330,7 +335,7 @@ public:
     ParallelPGMapper& mapper,
     unsigned pgs_per_item) {
     std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
-    mapper.queue(job.get(), pgs_per_item);
+    mapper.queue(job.get(), pgs_per_item, {});
     return job;
   }
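
Both pre-existing call sites, maybe_prime_pg_temp() and start_update(), pass
an empty vector and therefore keep the original per-pool range path; only
callers that supply input_pgs take the new list path.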