git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/OSDMonitor: use parallel work queue for prime_pg_temp (all)
author Sage Weil <sage@redhat.com>
Tue, 31 Jan 2017 20:00:28 +0000 (15:00 -0500)
committer Sage Weil <sage@redhat.com>
Thu, 16 Feb 2017 17:04:50 +0000 (12:04 -0500)
Do the prime_pg_temp work in parallel using the mapper.

Signed-off-by: Sage Weil <sage@redhat.com>
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h

index 63664f04d0bf7069101fec5ca36cd4f8d45e0663..51f7b2e861b6e6cfb336708936454c5bbec49eef 100644 (file)
@@ -1035,39 +1035,23 @@ void OSDMonitor::maybe_prime_pg_temp()
   next.deepish_copy_from(osdmap);
   next.apply_incremental(pending_inc);
 
-  PGMap *pg_map = &mon->pgmon()->pg_map;  // FIMXE: use new creating_pgs map
-
-  utime_t stop = ceph_clock_now();
-  stop += g_conf->mon_osd_prime_pg_temp_max_time;
-  int chunk = 1000;
-  int n = chunk;
-  bool abort = false;
-
   if (all) {
-    for (auto& pi : osdmap.get_pools()) {
-      for (unsigned ps = 0; ps < pi.second.get_pg_num(); ++ps) {
-       pg_t pgid(ps, pi.first);
-       if (!pg_map->creating_pgs.count(pgid)) {
-         prime_pg_temp(next, pgid);
-       }
-       if (--n <= 0) {
-         n = chunk;
-         if (ceph_clock_now() > stop) {
-           dout(10) << __func__ << " consumed more than "
-                    << g_conf->mon_osd_prime_pg_temp_max_time
-                    << " seconds, stopping"
-                    << dendl;
-           abort = true;
-           break;
-         }
-       }
-      }
-      if (abort) {
-       break;
-      }
+    PrimeTempJob job(next, this);
+    mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk);
+    if (job.wait_for(g_conf->mon_osd_prime_pg_temp_max_time)) {
+      dout(10) << __func__ << " done in " << job.get_duration() << dendl;
+    } else {
+      dout(10) << __func__ << " did not finish in "
+              << g_conf->mon_osd_prime_pg_temp_max_time
+              << ", stopping" << dendl;
+      job.abort();
     }
   } else {
     dout(10) << __func__ << " " << osds.size() << " interesting osds" << dendl;
+    utime_t stop = ceph_clock_now();
+    stop += g_conf->mon_osd_prime_pg_temp_max_time;
+    const int chunk = 1000;
+    int n = chunk;
     std::unordered_set<pg_t> did_pgs;
     for (auto osd : osds) {
       const vector<pg_t>& pgs = mapping->get_osd_acting_pgs(osd);
@@ -1076,9 +1060,7 @@ void OSDMonitor::maybe_prime_pg_temp()
        if (!did_pgs.insert(pgid).second) {
          continue;
        }
-       if (!pg_map->creating_pgs.count(pgid)) {
-         prime_pg_temp(next, pgid);
-       }
+       prime_pg_temp(next, pgid);
        if (--n <= 0) {
          n = chunk;
          if (ceph_clock_now() > stop) {
@@ -1086,25 +1068,22 @@ void OSDMonitor::maybe_prime_pg_temp()
                     << g_conf->mon_osd_prime_pg_temp_max_time
                     << " seconds, stopping"
                     << dendl;
-           abort = true;
-           break;
+           return;
          }
        }
       }
-      if (abort) {
-       break;
-      }
     }
   }
 }
 
 void OSDMonitor::prime_pg_temp(
-  OSDMap& next,
+  const OSDMap& next,
   pg_t pgid)
 {
-  // do not touch a mapping if a change is pending
-  if (pending_inc.new_pg_temp.count(pgid))
+  PGMap *pg_map = &mon->pgmon()->pg_map;  // FIMXE: use new creating_pgs map
+  if (!pg_map->creating_pgs.count(pgid)) {
     return;
+  }
 
   vector<int> up, acting;
   mapping->get(pgid, &up, nullptr, &acting, nullptr);
@@ -1126,7 +1105,11 @@ void OSDMonitor::prime_pg_temp(
           << " -> " << next_up << "/" << next_acting
           << ", priming " << acting
           << dendl;
-  pending_inc.new_pg_temp[pgid] = acting;
+  {
+    Mutex::Locker l(prime_pg_temp_lock);
+    // do not touch a mapping if a change is pending
+    pending_inc.new_pg_temp.emplace(pgid, acting);
+  }
 }
 
 /**
index 289b4a35e97296b3a093226f2ad7f571efdfc582..34d74bd6aedc043e53ffe545b3279d225cc2c2a0 100644 (file)
@@ -194,8 +194,21 @@ private:
 
   void share_map_with_random_osd();
 
+  Mutex prime_pg_temp_lock = {"OSDMonitor::prime_pg_temp_lock"};
+  struct PrimeTempJob : public ParallelPGMapper::Job {
+    OSDMonitor *osdmon;
+    PrimeTempJob(const OSDMap& om, OSDMonitor *m)
+      : ParallelPGMapper::Job(&om), osdmon(m) {}
+    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
+      for (unsigned ps = ps_begin; ps < ps_end; ++ps) {
+       pg_t pgid(ps, pool);
+       osdmon->prime_pg_temp(*osdmap, pgid);
+      }
+    }
+    void complete() override {}
+  };
   void maybe_prime_pg_temp();
-  void prime_pg_temp(OSDMap& next, pg_t pgid);
+  void prime_pg_temp(const OSDMap& next, pg_t pgid);
 
   void update_logger();