next.deepish_copy_from(osdmap);
next.apply_incremental(pending_inc);
- PGMap *pg_map = &mon->pgmon()->pg_map; // FIMXE: use new creating_pgs map
-
- utime_t stop = ceph_clock_now();
- stop += g_conf->mon_osd_prime_pg_temp_max_time;
- int chunk = 1000;
- int n = chunk;
- bool abort = false;
-
if (all) {
- for (auto& pi : osdmap.get_pools()) {
- for (unsigned ps = 0; ps < pi.second.get_pg_num(); ++ps) {
- pg_t pgid(ps, pi.first);
- if (!pg_map->creating_pgs.count(pgid)) {
- prime_pg_temp(next, pgid);
- }
- if (--n <= 0) {
- n = chunk;
- if (ceph_clock_now() > stop) {
- dout(10) << __func__ << " consumed more than "
- << g_conf->mon_osd_prime_pg_temp_max_time
- << " seconds, stopping"
- << dendl;
- abort = true;
- break;
- }
- }
- }
- if (abort) {
- break;
- }
+ PrimeTempJob job(next, this);
+ mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk);
+ if (job.wait_for(g_conf->mon_osd_prime_pg_temp_max_time)) {
+ dout(10) << __func__ << " done in " << job.get_duration() << dendl;
+ } else {
+ dout(10) << __func__ << " did not finish in "
+ << g_conf->mon_osd_prime_pg_temp_max_time
+ << ", stopping" << dendl;
+ job.abort();
}
} else {
dout(10) << __func__ << " " << osds.size() << " interesting osds" << dendl;
+ utime_t stop = ceph_clock_now();
+ stop += g_conf->mon_osd_prime_pg_temp_max_time;
+ const int chunk = 1000;
+ int n = chunk;
std::unordered_set<pg_t> did_pgs;
for (auto osd : osds) {
const vector<pg_t>& pgs = mapping->get_osd_acting_pgs(osd);
if (!did_pgs.insert(pgid).second) {
continue;
}
- if (!pg_map->creating_pgs.count(pgid)) {
- prime_pg_temp(next, pgid);
- }
+ prime_pg_temp(next, pgid);
if (--n <= 0) {
n = chunk;
if (ceph_clock_now() > stop) {
<< g_conf->mon_osd_prime_pg_temp_max_time
<< " seconds, stopping"
<< dendl;
- abort = true;
- break;
+ return;
}
}
}
- if (abort) {
- break;
- }
}
}
}
void OSDMonitor::prime_pg_temp(
- OSDMap& next,
+ const OSDMap& next,
pg_t pgid)
{
- // do not touch a mapping if a change is pending
- if (pending_inc.new_pg_temp.count(pgid))
+ PGMap *pg_map = &mon->pgmon()->pg_map; // FIMXE: use new creating_pgs map
+ if (!pg_map->creating_pgs.count(pgid)) {
return;
+ }
vector<int> up, acting;
mapping->get(pgid, &up, nullptr, &acting, nullptr);
<< " -> " << next_up << "/" << next_acting
<< ", priming " << acting
<< dendl;
- pending_inc.new_pg_temp[pgid] = acting;
+ {
+ Mutex::Locker l(prime_pg_temp_lock);
+ // do not touch a mapping if a change is pending
+ pending_inc.new_pg_temp.emplace(pgid, acting);
+ }
}
/**
void share_map_with_random_osd();
+ Mutex prime_pg_temp_lock = {"OSDMonitor::prime_pg_temp_lock"};
+ struct PrimeTempJob : public ParallelPGMapper::Job {
+ OSDMonitor *osdmon;
+ PrimeTempJob(const OSDMap& om, OSDMonitor *m)
+ : ParallelPGMapper::Job(&om), osdmon(m) {}
+ void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
+ for (unsigned ps = ps_begin; ps < ps_end; ++ps) {
+ pg_t pgid(ps, pool);
+ osdmon->prime_pg_temp(*osdmap, pgid);
+ }
+ }
+ void complete() override {}
+ };
void maybe_prime_pg_temp();
- void prime_pg_temp(OSDMap& next, pg_t pgid);
+ void prime_pg_temp(const OSDMap& next, pg_t pgid);
void update_logger();