}
-void OSDService::queue_want_pg_temp(pg_t pgid, const vector<int>& want)
+void OSDService::queue_want_pg_temp(pg_t pgid,
+ const vector<int>& want,
+ bool forced)
{
Mutex::Locker l(pg_temp_lock);
- map<pg_t,vector<int> >::iterator p = pg_temp_pending.find(pgid);
+ auto p = pg_temp_pending.find(pgid);
if (p == pg_temp_pending.end() ||
- p->second != want) {
- pg_temp_wanted[pgid] = want;
+ p->second.acting != want ||
+ forced) {
+ pg_temp_wanted[pgid] = {want, forced};
}
}
<< pg_temp_wanted.size() << dendl;
}
+std::ostream& operator<<(std::ostream& out,
+ const OSDService::pg_temp_t& pg_temp)
+{
+ out << pg_temp.acting;
+ if (pg_temp.forced) {
+ out << " (forced)";
+ }
+ return out;
+}
+
void OSDService::send_pg_temp()
{
Mutex::Locker l(pg_temp_lock);
if (pg_temp_wanted.empty())
return;
dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
- MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
- m->pg_temp = pg_temp_wanted;
- monc->send_mon_message(m);
+ MOSDPGTemp *ms[2] = {nullptr, nullptr};
+ for (auto& [pgid, pg_temp] : pg_temp_wanted) {
+ auto& m = ms[pg_temp.forced];
+ if (!m) {
+ m = new MOSDPGTemp(osdmap->get_epoch());
+ m->forced = pg_temp.forced;
+ }
+ m->pg_temp.emplace(pgid, pg_temp.acting);
+ }
+ for (auto m : ms) {
+ if (m) {
+ monc->send_mon_message(m);
+ }
+ }
_sent_pg_temp();
}
{
bool do_sub_pg_creates = false;
bool have_pending_creates = false;
- MOSDPGTemp *pgtemp = nullptr;
{
const auto max_pgs_per_osd =
(cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
auto pg = pending_creates_from_osd.cbegin();
while (spare_pgs > 0 && pg != pending_creates_from_osd.cend()) {
dout(20) << __func__ << " pg " << pg->first << dendl;
- if (!pgtemp) {
- pgtemp = new MOSDPGTemp{osdmap->get_epoch()};
- }
vector<int> acting;
osdmap->pg_to_up_acting_osds(pg->first, nullptr, nullptr, &acting, nullptr);
- pgtemp->pg_temp[pg->first] = twiddle(acting);
+ service.queue_want_pg_temp(pg->first, twiddle(acting), true);
pg = pending_creates_from_osd.erase(pg);
+ do_sub_pg_creates = true;
spare_pgs--;
}
have_pending_creates = (pending_creates_from_mon > 0 ||
<< start << dendl;
do_renew_subs = true;
}
- } else if (pgtemp || do_sub_pg_creates) {
+ } else if (do_sub_pg_creates) {
// no need to subscribe the osdmap continuously anymore
// once the pgtemp and/or mon_subscribe(pg_creates) is sent
if (monc->sub_want_increment("osdmap", start, CEPH_SUBSCRIBE_ONETIME)) {
monc->renew_subs();
}
- if (pgtemp) {
- pgtemp->forced = true;
- monc->send_mon_message(pgtemp);
- }
+ service.send_pg_temp();
}
void OSD::build_initial_pg_history(
// -- pg_temp --
private:
Mutex pg_temp_lock;
- map<pg_t, vector<int> > pg_temp_wanted;
- map<pg_t, vector<int> > pg_temp_pending;
+ struct pg_temp_t {
+ vector<int> acting;
+ bool forced = false;
+ };
+ map<pg_t, pg_temp_t> pg_temp_wanted;
+ map<pg_t, pg_temp_t> pg_temp_pending;
void _sent_pg_temp();
+ friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
public:
- void queue_want_pg_temp(pg_t pgid, const vector<int>& want);
+ void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
+ bool forced = false);
void remove_want_pg_temp(pg_t pgid);
void requeue_pg_temp();
void send_pg_temp();
pg_shard_t old_acting_primary = get_primary();
pg_shard_t old_up_primary = up_primary;
bool was_old_primary = is_primary();
+ bool was_old_replica = is_replica();
acting.swap(oldacting);
up.swap(oldup);
acting_recovery_backfill.clear();
scrub_queued = false;
- // reset primary state?
+ // reset primary/replica state?
if (was_old_primary || is_primary()) {
osd->remove_want_pg_temp(info.pgid.pgid);
+ } else if (was_old_replica || is_replica()) {
+ osd->remove_want_pg_temp(info.pgid.pgid);
}
clear_primary_state();