m_osd_pg_epoch_max_lag_factor;
assert(max_lag > 0);
if (osdmap->get_epoch() > max_lag) {
- epoch_t min = service.get_min_pg_epoch();
epoch_t need = osdmap->get_epoch() - max_lag;
- if (need > min) {
- dout(10) << __func__ << " waiting for pgs to consume " << need
- << " (current min " << min
- << ", map cache is " << cct->_conf->osd_map_cache_size
- << ", max_lag_factor " << m_osd_pg_epoch_max_lag_factor
- << ")" << dendl;
- service.wait_min_pg_epoch(need);
+ for (auto shard : shards) {
+ epoch_t min = shard->get_min_pg_epoch();
+ if (need > min) {
+ dout(10) << __func__ << " waiting for pgs to consume " << need
+ << " (shard " << shard->shard_id << " min " << min
+ << ", map cache is " << cct->_conf->osd_map_cache_size
+ << ", max_lag_factor " << m_osd_pg_epoch_max_lag_factor
+ << ")" << dendl;
+ shard->wait_min_pg_epoch(need);
+ }
}
}
}
if (!is_active()) {
dout(10) << " not yet active; waiting for peering work to drain" << dendl;
- service.wait_min_pg_epoch(last);
+ for (auto shard : shards) {
+ shard->wait_min_pg_epoch(last);
+ }
} else {
activate_map();
}
pg_slots_by_epoch.erase(pg_slots_by_epoch.iterator_to(*slot));
slot->epoch = 0;
+ if (waiting_for_min_pg_epoch) {
+ min_pg_epoch_cond.Signal();
+ }
}
void OSDShard::update_pg_epoch(OSDShardPGSlot *slot, epoch_t e)
pg_slots_by_epoch.insert(*slot);
dout(20) << "min is now " << pg_slots_by_epoch.begin()->epoch
<< " on " << pg_slots_by_epoch.begin()->pg->pg_id << dendl;
+ if (waiting_for_min_pg_epoch) {
+ min_pg_epoch_cond.Signal();
+ }
}
epoch_t OSDShard::get_min_pg_epoch()
return p->epoch;
}
+void OSDShard::wait_min_pg_epoch(epoch_t need)
+{
+ Mutex::Locker l(sdata_op_ordering_lock);
+ waiting_for_min_pg_epoch = true;
+ while (!pg_slots_by_epoch.empty() &&
+ pg_slots_by_epoch.begin()->epoch < need) {
+ dout(10) << need << " waiting on "
+ << pg_slots_by_epoch.begin()->epoch << dendl;
+ min_pg_epoch_cond.Wait(sdata_op_ordering_lock);
+ }
+ waiting_for_min_pg_epoch = false;
+}
+
void OSDShard::consume_map(
OSDMapRef& new_osdmap,
unsigned *pushes_to_free,
boost::intrusive::set_member_hook<>,
&OSDShardPGSlot::pg_epoch_item>,
boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
+ bool waiting_for_min_pg_epoch = false;
+ Cond min_pg_epoch_cond;
/// priority queue
std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
epoch_t get_min_pg_epoch();
+ void wait_min_pg_epoch(epoch_t need);
/// push osdmap into shard
void consume_map(