.set_default(40)
.set_description(""),
+ Option("osd_pg_epoch_max_lag_factor", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
+ .set_default(2.0)
+ .set_description("Max multiple of the map cache that PGs can lag before we throttle map ingest")
+ .add_see_also("osd_map_cache_size"),
+
Option("osd_inject_bad_map_crc_probability", Option::TYPE_FLOAT, Option::LEVEL_DEV)
.set_default(0)
.set_description(""),
store_is_rotational(store->is_rotational()),
trace_endpoint("0.0.0.0", 0, "osd"),
asok_hook(NULL),
+ m_osd_pg_epoch_max_lag_factor(cct->_conf->get_val<double>(
+ "osd_pg_epoch_max_lag_factor")),
osd_compat(get_osd_compat_set()),
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
get_num_op_threads()),
skip_maps = true;
}
+ // wait for pgs to catch up
+ {
+ // we extend the map cache pins to accommodate pgs slow to consume maps
+ // for some period, until we hit the max_lag_factor bound, at which point
+ // we block here to stop ingesting more maps than they are able to keep
+ // up with.
+ epoch_t max_lag = cct->_conf->osd_map_cache_size *
+ m_osd_pg_epoch_max_lag_factor;
+ assert(max_lag > 0);
+ if (osdmap->get_epoch() > max_lag) {
+ epoch_t min = service.get_min_pg_epoch();
+ epoch_t need = osdmap->get_epoch() - max_lag;
+ if (need > min) {
+ dout(10) << __func__ << " waiting for pgs to consume " << need
+ << " (current min " << min
+ << ", map cache is " << cct->_conf->osd_map_cache_size
+ << ", max_lag_factor " << m_osd_pg_epoch_max_lag_factor
+ << ")" << dendl;
+ service.wait_min_pg_epoch(need);
+ }
+ }
+ }
+
ObjectStore::Transaction t;
uint64_t txn_size = 0;
"osd_op_history_slow_op_threshold",
"osd_enable_op_tracker",
"osd_map_cache_size",
+ "osd_pg_epoch_max_lag_factor",
"osd_pg_epoch_persisted_max_stale",
"osd_disk_thread_ioprio_class",
"osd_disk_thread_ioprio_priority",
changed.count("fsid")) {
update_log_config();
}
+ if (changed.count("osd_pg_epoch_max_lag_factor")) {
+ m_osd_pg_epoch_max_lag_factor = conf->get_val<double>(
+ "osd_pg_epoch_max_lag_factor");
+ }
#ifdef HAVE_LIBFUSE
if (changed.count("osd_objectstore_fuse")) {
private:
// -- map epoch lower bound --
Mutex pg_epoch_lock;
+ Cond pg_cond;
multiset<epoch_t> pg_epochs;
map<spg_t,epoch_t> pg_epoch;
assert(t == pg_epoch.end());
pg_epoch[pgid] = epoch;
pg_epochs.insert(epoch);
+ if (*pg_epochs.begin() == epoch) {
+ // we are the (new?) blocking epoch
+ pg_cond.Signal();
+ }
}
void pg_update_epoch(spg_t pgid, epoch_t epoch) {
Mutex::Locker l(pg_epoch_lock);
map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
assert(t != pg_epoch.end());
+ if (*pg_epochs.begin() == t->second) {
+ // we were on the blocking epoch
+ pg_cond.Signal();
+ }
pg_epochs.erase(pg_epochs.find(t->second));
t->second = epoch;
pg_epochs.insert(epoch);
Mutex::Locker l(pg_epoch_lock);
map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
if (t != pg_epoch.end()) {
+ if (*pg_epochs.begin() == t->second) {
+ // we were on the blocking epoch
+ pg_cond.Signal();
+ }
pg_epochs.erase(pg_epochs.find(t->second));
pg_epoch.erase(t);
}
return *pg_epochs.begin();
}
+ // Block the caller until every registered PG has consumed OSD maps at
+ // least up to epoch e, i.e. until the minimum epoch across pg_epochs is
+ // >= e.  Returns immediately if no PGs are registered.  We are woken via
+ // pg_cond, which is signalled whenever the blocking (minimum) epoch
+ // changes, so re-check the predicate in a loop on each wakeup.
+ void wait_min_pg_epoch(epoch_t e) {
+ Mutex::Locker l(pg_epoch_lock);
+ while (!pg_epochs.empty() &&
+ *pg_epochs.begin() < e) {
+ pg_cond.Wait(pg_epoch_lock);
+ }
+ }
+
private:
// -- superblock --
Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
class C_Tick;
class C_Tick_WithoutOSDLock;
+ // -- config settings --
+ float m_osd_pg_epoch_max_lag_factor;
+
// -- superblock --
OSDSuperblock superblock;