clear_temp_objects();
// initialize osdmap references in sharded wq
- op_shardedwq.prune_pg_waiters(osdmap, whoami);
+ op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
// load up pgs (as they previously existed)
load_pgs();
pg_map_size = pg_map.size();
pg->get("PGMap"); // because it's in pg_map
service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
+
+ // make sure we register any splits that happened between when the pg
+ // was created and our latest map.
service.init_splits_between(pgid, createmap, servicemap);
}
return pg;
case MSG_OSD_RECOVERY_RESERVE:
{
MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m);
- return enqueue_peering_evt(
- pm->get_spg(),
- PGPeeringEventRef(pm->get_event()));
+ if (require_osd_peer(pm)) {
+ enqueue_peering_evt(
+ pm->get_spg(),
+ PGPeeringEventRef(pm->get_event()));
+ }
+ pm->put();
+ return;
}
}
// remove any PGs which we no longer host from the session waiting_for_pg lists
dout(20) << __func__ << " checking waiting_for_pg" << dendl;
- op_shardedwq.prune_pg_waiters(osdmap, whoami);
+ op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
service.maybe_inject_dispatch_delay();
{
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
if (!require_mon_peer(m)) {
+ m->put();
return;
}
for (auto& p : m->pgs) {
true)
)));
}
+ m->put();
}
void OSD::handle_fast_pg_query(MOSDPGQuery *m)
{
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+ if (!require_osd_peer(m)) {
+ m->put();
+ return;
+ }
int from = m->get_source().num();
for (auto& p : m->pg_list) {
enqueue_peering_evt(
false))
);
}
+ m->put();
}
void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
{
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+ if (!require_osd_peer(m)) {
+ m->put();
+ return;
+ }
int from = m->get_source().num();
for (auto& p : m->get_pg_list()) {
spg_t pgid(p.first.info.pgid.pgid, p.first.to);
false)
)));
}
+ m->put();
}
void OSD::handle_fast_pg_info(MOSDPGInfo* m)
{
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+ if (!require_osd_peer(m)) {
+ m->put();
+ return;
+ }
int from = m->get_source().num();
for (auto& p : m->pg_list) {
enqueue_peering_evt(
p.first.epoch_sent)))
);
}
+ m->put();
}
void OSD::handle_fast_pg_remove(MOSDPGRemove *m)
{
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+ if (!require_osd_peer(m)) {
+ m->put();
+ return;
+ }
for (auto& pgid : m->pg_list) {
enqueue_peering_evt(
pgid,
m->get_epoch(), m->get_epoch(),
PG::DeleteStart())));
}
+ m->put();
}
void OSD::handle_fast_force_recovery(MOSDForceRecovery *m)
{
dout(10) << __func__ << " " << *m << dendl;
+ if (!require_mon_or_mgr_peer(m)) {
+ m->put();
+ return;
+ }
epoch_t epoch = get_osdmap()->get_epoch();
for (auto pgid : m->forced_pgs) {
if (m->options & OFR_BACKFILL) {
}
}
-void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
+void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami)
{
unsigned pushes_to_free = 0;
bool queued = false;
auto p = sdata->pg_slots.begin();
while (p != sdata->pg_slots.end()) {
ShardData::pg_slot& slot = p->second;
+ if (slot.pending_nopg_epoch &&
+ slot.pending_nopg_epoch <= osdmap->get_epoch()) {
+ dout(20) << __func__ << " " << p->first
+ << " pending_nopg_epoch " << slot.pending_nopg_epoch
+ << " < " << osdmap->get_epoch() << ", requeueing" << dendl;
+ assert(slot.waiting_for_pg);
+ assert(!slot.to_process.empty());
+ for (auto& q : slot.to_process) {
+ pushes_to_free += q.get_reserved_pushes();
+ }
+ for (auto i = slot.to_process.rbegin();
+ i != slot.to_process.rend();
+ ++i) {
+ sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+ }
+ slot.to_process.clear();
+ slot.waiting_for_pg = false;
+ slot.pending_nopg_epoch = 0;
+ ++slot.requeue_seq;
+ queued = true;
+ ++p;
+ continue;
+ }
if (!slot.to_process.empty() && slot.num_running == 0) {
if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
- if (slot.pending_nopg) {
- dout(20) << __func__ << " " << p->first << " maps to us, pending create,"
- << " requeuing" << dendl;
- for (auto& q : slot.to_process) {
- pushes_to_free += q.get_reserved_pushes();
- }
- for (auto i = slot.to_process.rbegin();
- i != slot.to_process.rend();
- ++i) {
- sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
- }
- slot.to_process.clear();
- slot.waiting_for_pg = false;
- slot.pending_nopg = false;
- ++slot.requeue_seq;
- queued = true;
- } else {
- dout(20) << __func__ << " " << p->first << " maps to us, keeping"
- << dendl;
- }
+ dout(20) << __func__ << " " << p->first << " maps to us, keeping"
+ << dendl;
++p;
continue;
}
dout(30) << __func__ << " " << token
<< " to_process " << slot.to_process
<< " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
+ bool can_wait = item.requires_pg() && !item.creates_pg();
slot.to_process.push_back(std::move(item));
// note the requeue seq now...
requeue_seq = slot.requeue_seq;
- if (slot.waiting_for_pg) {
- // save ourselves a bit of effort
+ if (slot.waiting_for_pg && can_wait) {
dout(20) << __func__ << slot.to_process.back()
- << " queued, waiting_for_pg" << dendl;
+ << " queued, already waiting_for_pg" << dendl;
sdata->sdata_op_ordering_lock.Unlock();
return;
}
--slot.num_running;
if (slot.to_process.empty()) {
- // raced with wake_pg_waiters or prune_pg_waiters
+ // raced with wake_pg_waiters or prune_or_wake_pg_waiters
dout(20) << __func__ << " " << token
<< " nothing queued" << dendl;
if (pg) {
OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
const PGCreateInfo *create_info = qi.creates_pg();
if (qi.get_map_epoch() > osdmap->get_epoch()) {
- dout(20) << __func__ << " " << token
- << " no pg, item epoch is "
- << qi.get_map_epoch() << " > " << osdmap->get_epoch()
- << ", will wait on " << qi << dendl;
if (!!create_info || !qi.requires_pg()) {
- slot.pending_nopg = true;
+ if (!slot.pending_nopg_epoch ||
+ slot.pending_nopg_epoch > qi.get_map_epoch()) {
+ slot.pending_nopg_epoch = qi.get_map_epoch();
+ }
+ dout(20) << __func__ << " " << token
+ << " no pg, item epoch is "
+ << qi.get_map_epoch() << " > " << osdmap->get_epoch()
+ << ", will wait on " << qi
+ << ", pending_nopg_epoch now "
+ << slot.pending_nopg_epoch << dendl;
+ } else {
+ dout(20) << __func__ << " " << token
+ << " no pg, item epoch is "
+ << qi.get_map_epoch() << " > " << osdmap->get_epoch()
+ << ", will wait on " << qi << dendl;
}
slot.to_process.push_front(std::move(qi));
slot.waiting_for_pg = true;
+ } else if (!qi.requires_pg()) {
+ // for pg-less events, we run them under the ordering lock, since
+ // we don't have the pg lock to keep them ordered.
+ qi.run(osd, pg, tp_handle);
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
} else if (osdmap->is_up_acting_osd_shard(token, osd->whoami)) {
if (osd->service.splitting(token)) {
dout(20) << __func__ << " " << token
}
dout(20) << __func__ << " ignored create on " << qi << dendl;
}
- } else if (!qi.requires_pg()) {
- // for pg-less events, we run them under the ordering lock, since
- // we don't have the pg lock to keep them ordered.
- qi.run(osd, pg, tp_handle);
- sdata->sdata_op_ordering_lock.Unlock();
- return;
} else {
dout(20) << __func__ << " " << token
<< " no pg, should exist, will wait on " << qi << dendl;