* look up a pg. if we have it, great. if not, consider creating it IF the pg mapping
* hasn't changed since the given epoch and we are the primary.
*/
-PG *OSD::get_or_create_pg(
- const pg_info_t& info, pg_interval_map_t& pi,
- epoch_t epoch, int from, bool primary)
+void OSD::handle_pg_peering_evt(
+ const pg_info_t& info,
+ pg_interval_map_t& pi,
+ epoch_t epoch,
+ int from,
+ bool primary,
+ PG::CephPeeringEvtRef evt)
{
+ if (service.splitting(info.pgid)) {
+ peering_wait_for_split[info.pgid].push_back(evt);
+ return;
+ }
+
PG *pg;
if (!_have_pg(info.pgid)) {
// same primary?
if (!osdmap->have_pg_pool(info.pgid.pool()))
- return 0;
+ return;
vector<int> up, acting;
osdmap->pg_to_up_acting_osds(info.pgid, up, acting);
int role = osdmap->calc_pg_role(whoami, acting, acting.size());
if (epoch < history.same_interval_since) {
dout(10) << "get_or_create_pg " << info.pgid << " acting changed in "
<< history.same_interval_since << " (msg from " << epoch << ")" << dendl;
- return NULL;
+ return;
}
if (service.splitting(info.pgid)) {
if (creating_pgs.count(info.pgid)) {
creating_pgs[info.pgid].prior.erase(from);
if (!can_create_pg(info.pgid))
- return NULL;
+ return;
history = creating_pgs[info.pgid].history;
create = true;
} else {
dout(10) << "get_or_create_pg " << info.pgid
<< " DNE on source, but creation probe, ignoring" << dendl;
- return NULL;
+ return;
}
}
creating_pgs.erase(info.pgid);
<< pg->info.history.same_interval_since
<< " (msg from " << epoch << ")" << dendl;
pg->unlock();
- return NULL;
+ return;
}
}
- return pg;
+
+ pg->queue_peering_event(evt);
+ pg->unlock();
}
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator it = m->get_pg_list().begin();
it != m->get_pg_list().end();
++it) {
- PG *pg = 0;
if (it->first.info.pgid.preferred() >= 0) {
dout(20) << "ignoring localized pg " << it->first.info.pgid << dendl;
continue;
}
- if (service.splitting(it->first.info.pgid)) {
- peering_wait_for_split[it->first.info.pgid].push_back(
- PG::CephPeeringEvtRef(
- new PG::CephPeeringEvt(
- it->first.epoch_sent, it->first.query_epoch,
- PG::MNotifyRec(from, it->first))));
- continue;
- }
-
- pg = get_or_create_pg(it->first.info, it->second,
- it->first.query_epoch, from, true);
- if (!pg)
- continue;
- pg->queue_notify(it->first.epoch_sent, it->first.query_epoch, from, it->first);
- pg->unlock();
+ handle_pg_peering_evt(
+ it->first.info, it->second,
+ it->first.query_epoch, from, true,
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ it->first.epoch_sent, it->first.query_epoch,
+ PG::MNotifyRec(from, it->first)))
+ );
}
}
return;
}
- if (service.splitting(m->info.pgid)) {
- peering_wait_for_split[m->info.pgid].push_back(
- PG::CephPeeringEvtRef(
- new PG::CephPeeringEvt(
- m->get_epoch(), m->get_query_epoch(),
- PG::MLogRec(from, m))));
- return;
- }
-
- PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(),
- from, false);
- if (!pg)
- return;
op->mark_started();
- pg->queue_log(m->get_epoch(), m->get_query_epoch(), from, m);
- pg->unlock();
+ handle_pg_peering_evt(
+ m->info, m->past_intervals, m->get_epoch(),
+ from, false,
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ m->get_epoch(), m->get_query_epoch(),
+ PG::MLogRec(from, m)))
+ );
}
void OSD::handle_pg_info(OpRequestRef op)
continue;
}
- if (service.splitting(p->first.info.pgid)) {
- peering_wait_for_split[p->first.info.pgid].push_back(
- PG::CephPeeringEvtRef(
- new PG::CephPeeringEvt(
- p->first.epoch_sent, p->first.query_epoch,
- PG::MInfoRec(from, p->first.info, p->first.epoch_sent))));
- continue;
- }
- PG *pg = get_or_create_pg(p->first.info, p->second, p->first.epoch_sent,
- from, false);
- if (!pg)
- continue;
- pg->queue_info(p->first.epoch_sent, p->first.query_epoch, from,
- p->first.info);
- pg->unlock();
+ handle_pg_peering_evt(
+ p->first.info, p->second, p->first.epoch_sent,
+ from, false,
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ p->first.epoch_sent, p->first.query_epoch,
+ PG::MInfoRec(from, p->first.info, p->first.epoch_sent)))
+ );
}
}