git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/OSDMapMapping: make ParallelPGMapper accept input pgs
author xie xingguo <xie.xingguo@zte.com.cn>
Wed, 5 Jun 2019 02:41:52 +0000 (10:41 +0800)
committer xie xingguo <xie.xingguo@zte.com.cn>
Tue, 18 Jun 2019 02:09:17 +0000 (10:09 +0800)
The existing "prime temp" mechanism is a good model for
clusters with a large number of pgs that need to run various
calculations quickly.
I plan to do the upmap tidy-up work the same way, hence
the need for an alternate way of specifying the pgs to process,
other than taking them directly from the map.

Signed-off-by: xie xingguo <xie.xingguo@zte.com.cn>
(cherry picked from commit f6fd4a312e0dda260f2c150334f06b531678ce47)

Conflicts:
s/g_conf./g_conf->/

src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/osd/OSDMapMapping.cc
src/osd/OSDMapMapping.h
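
For context, a sketch of the two calling conventions the new queue() signature allows. The job object and the pg list below are hypothetical stand-ins; only queue(), pg_t, and the config option appear in this commit:

  // explicit pg list: only these pgs are chunked into work items
  vector<pg_t> pgs = { pg_t(0, 1), pg_t(1, 1), pg_t(2, 1) };  // hypothetical picks
  mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk, pgs);

  // empty list: fall back to walking every pool in the osdmap,
  // which is what the existing callers now pass as {}
  mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk, {});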

diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index da9c7d992464dacf239215e3187a34e3e87207c3..f3dd6b5665e7ff643cae94187146f60b31cee244 100644
@@ -916,7 +916,7 @@ void OSDMonitor::maybe_prime_pg_temp()
     dout(10) << __func__ << " no pools, no pg_temp priming" << dendl;
   } else if (all) {
     PrimeTempJob job(next, this);
-    mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk);
+    mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk, {});
     if (job.wait_for(g_conf->mon_osd_prime_pg_temp_max_time)) {
       dout(10) << __func__ << " done in " << job.get_duration() << dendl;
     } else {
diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h
index d789f33d0956ea3b41ce88b9030e104cf7d9e4dd..117cf2217442792a283b14ec69da7bcc468a1b0c 100644
@@ -320,6 +320,7 @@ private:
        osdmon->prime_pg_temp(*osdmap, pgid);
       }
     }
+    void process(const vector<pg_t>& pgs) override {}
     void complete() override {}
   };
   void maybe_prime_pg_temp();
diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc
index 6566acaadb3ae20061272f2b7ee7df34bb2e7549..09c6478ca8a75739688d866ec145c38a2b78e86f 100644
@@ -147,18 +147,50 @@ void ParallelPGMapper::Job::finish_one()
 
 void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h)
 {
-  ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool
-                   << " [" << i->begin << "," << i->end << ")" << dendl;
-  i->job->process(i->pool, i->begin, i->end);
+  ldout(m->cct, 20) << __func__ << " " << i->job << " pool " << i->pool
+                    << " [" << i->begin << "," << i->end << ")"
+                    << " pgs " << i->pgs
+                    << dendl;
+  if (!i->pgs.empty())
+    i->job->process(i->pgs);
+  else
+    i->job->process(i->pool, i->begin, i->end);
   i->job->finish_one();
   delete i;
 }
 
 void ParallelPGMapper::queue(
   Job *job,
-  unsigned pgs_per_item)
+  unsigned pgs_per_item,
+  const vector<pg_t>& input_pgs)
 {
   bool any = false;
+  if (!input_pgs.empty()) {
+    unsigned i = 0;
+    vector<pg_t> item_pgs;
+    item_pgs.reserve(pgs_per_item);
+    for (auto& pg : input_pgs) {
+      if (i < pgs_per_item) {
+        ++i;
+        item_pgs.push_back(pg);
+      }
+      if (i >= pgs_per_item) {
+        job->start_one();
+        wq.queue(new Item(job, item_pgs));
+        i = 0;
+        item_pgs.clear();
+        any = true;
+      }
+    }
+    if (!item_pgs.empty()) {
+      job->start_one();
+      wq.queue(new Item(job, item_pgs));
+      any = true;
+    }
+    ceph_assert(any);
+    return;
+  }
+  // no input pgs, load all from map
   for (auto& p : job->osdmap->get_pools()) {
     for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
       unsigned ps_end = std::min(ps + pgs_per_item, p.second.get_pg_num());
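
The batching loop above fills each Item with at most pgs_per_item pgs and flushes any shorter tail as a final item, so every input pg is queued exactly once. The same pattern in isolation, as a standalone sketch with ints in place of pg_t (not ceph code):

  #include <cstdio>
  #include <vector>

  // split `in` into batches of at most `per_item` elements,
  // mirroring the Item-queueing loop above
  std::vector<std::vector<int>> chunk(const std::vector<int>& in,
                                      unsigned per_item) {
    std::vector<std::vector<int>> out;
    std::vector<int> cur;
    for (int v : in) {
      cur.push_back(v);
      if (cur.size() >= per_item) {   // batch full -> flush
        out.push_back(std::move(cur));
        cur.clear();
      }
    }
    if (!cur.empty())                 // flush the short tail
      out.push_back(std::move(cur));
    return out;
  }

  int main() {
    auto batches = chunk({1, 2, 3, 4, 5}, 2);
    printf("%zu batches\n", batches.size());  // 3: {1,2} {3,4} {5}
  }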
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
index 86ad9743b84c2254e0c872da9cf641ee28f57fa7..92b74d604af2f21530abac77fe3a5f0c102c8241 100644
@@ -31,7 +31,8 @@ public:
       assert(shards == 0);
     }
 
-    // child must implement this
+    // child must implement both forms of process (one may be a no-op)
+    virtual void process(const vector<pg_t>& pgs) = 0;
     virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
     virtual void complete() = 0;
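
Because both process() overloads are pure virtual, every Job subclass has to provide both and stub out the one its queueing mode never exercises, as PrimeTempJob does above and MappingJob does below. A minimal hypothetical subclass showing the shape of that contract:

  // hypothetical job that just counts the pgs handed to it; needs <atomic>
  struct CountJob : public ParallelPGMapper::Job {
    std::atomic<int> seen{0};
    explicit CountJob(const OSDMap *m) : Job(m) {}
    // pg-list form: runs when queue() was given explicit input_pgs
    void process(const vector<pg_t>& pgs) override {
      seen += (int)pgs.size();
    }
    // range form: runs when queue() walks the map; unused here
    void process(int64_t, unsigned, unsigned) override {}
    void complete() override {}
  };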
 
@@ -102,7 +103,9 @@ protected:
     Job *job;
     int64_t pool;
     unsigned begin, end;
+    vector<pg_t> pgs;
 
+    Item(Job *j, vector<pg_t> pgs) : job(j), pool(-1), begin(0), end(0), pgs(pgs) {}
     Item(Job *j, int64_t p, unsigned b, unsigned e)
       : job(j),
        pool(p),
@@ -157,7 +160,8 @@ public:
 
   void queue(
     Job *job,
-    unsigned pgs_per_item);
+    unsigned pgs_per_item,
+    const vector<pg_t>& input_pgs);
 
   void drain() {
     wq.drain();
@@ -271,6 +275,7 @@ private:
       : Job(osdmap), mapping(m) {
       mapping->_start(*osdmap);
     }
+    void process(const vector<pg_t>& pgs) override {}
     void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
       mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
     }
@@ -326,7 +331,7 @@ public:
     ParallelPGMapper& mapper,
     unsigned pgs_per_item) {
     std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
-    mapper.queue(job.get(), pgs_per_item);
+    mapper.queue(job.get(), pgs_per_item, {});
     return job;
   }
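
Passing {} at this call site keeps the mapping job on the legacy walk-every-pool path; per the commit message, the explicit pg-list path is reserved for callers like the planned upmap tidy-up work.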