#undef dout_prefix
#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq "
+void OSD::ShardedOpWQ::_wake_pg_slot(
+ spg_t pgid,
+ ShardData *sdata,
+ ShardData::pg_slot& slot,
+ unsigned *pushes_to_free)
+{
+ dout(20) << __func__ << " " << pgid
+ << " to_process " << slot.to_process
+ << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
+ for (auto& q : slot.to_process) {
+ *pushes_to_free += q.get_reserved_pushes();
+ }
+ for (auto i = slot.to_process.rbegin();
+ i != slot.to_process.rend();
+ ++i) {
+ sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+ }
+ slot.to_process.clear();
+ slot.waiting_for_pg = false;
+ ++slot.requeue_seq;
+}
+
void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
{
uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pgid);
if (p != sdata->pg_slots.end()) {
- dout(20) << __func__ << " " << pgid
- << " to_process " << p->second.to_process
- << " waiting_for_pg=" << (int)p->second.waiting_for_pg << dendl;
- for (auto& q : p->second.to_process) {
- pushes_to_free += q.get_reserved_pushes();
- }
- for (auto i = p->second.to_process.rbegin();
- i != p->second.to_process.rend();
- ++i) {
- sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
- }
- p->second.to_process.clear();
- p->second.waiting_for_pg = false;
- ++p->second.requeue_seq;
+ _wake_pg_slot(pgid, sdata, p->second, &pushes_to_free);
queued = true;
}
}
/// wake any pg waiters after a PG is created/instantiated
void wake_pg_waiters(spg_t pgid);
+ void _wake_pg_slot(spg_t pgid, ShardData *sdata, ShardData::pg_slot& slot,
+ unsigned *pushes_to_free);
+
/// prune ops (and possibly pg_slots) for pgs that shouldn't be here
void prune_pg_waiters(OSDMapRef osdmap, int whoami);