debug_drop_pg_create_duration(cct->_conf->osd_debug_drop_pg_create_duration),
debug_drop_pg_create_left(-1),
stats_ack_timeout(cct->_conf->osd_mon_ack_timeout),
- outstanding_pg_stats(0),
up_thru_wanted(0), up_thru_pending(0),
requested_full_first(0),
requested_full_last(0),
bool report = false;
utime_t now = ceph_clock_now(cct);
pg_stat_queue_lock.Lock();
- if (outstanding_pg_stats &&
+ if (!outstanding_pg_stats.empty() &&
(now - stats_ack_timeout) > last_pg_stats_ack) {
dout(1) << __func__ << " mon hasn't acked PGStats in "
<< now - last_pg_stats_ack
stats_ack_timeout =
MAX(g_conf->osd_mon_ack_timeout,
stats_ack_timeout * g_conf->osd_stats_ack_timeout_factor);
+ outstanding_pg_stats.clear();
}
if (now - last_pg_stats_sent > cct->_conf->osd_mon_report_interval_max) {
osd_stat_updated = true;
report = true;
- } else if (outstanding_pg_stats >= cct->_conf->osd_mon_report_max_in_flight) {
+ } else if ((int)outstanding_pg_stats.size() >=
+ cct->_conf->osd_mon_report_max_in_flight) {
dout(20) << __func__ << " have max " << outstanding_pg_stats
<< " stats updates in flight" << dendl;
} else {
}
pg_stat_queue_lock.Unlock();
- if (reset)
+ if (reset) {
monc->reopen_session();
- else if (report) {
+ } else if (report) {
last_mon_report = now;
// do any pending reports
return;
dout(10) << "ms_handle_connect on mon" << dendl;
- // reset pg stats state
- pg_stat_queue_lock.Lock();
- outstanding_pg_stats = 0;
- pg_stat_queue_lock.Unlock();
-
if (is_booting()) {
start_boot();
} else {
had_for -= had_map_since;
MPGStats *m = new MPGStats(monc->get_fsid(), osdmap->get_epoch(), had_for);
- m->set_tid(++pg_stat_tid);
+ uint64_t tid = ++pg_stat_tid;
+ m->set_tid(tid);
m->osd_stat = cur_stat;
xlist<PG*>::iterator p = pg_stat_queue.begin();
pg->pg_stats_publish_lock.Unlock();
}
- if (!outstanding_pg_stats) {
+ if (!outstanding_pg_stats.empty()) {
last_pg_stats_ack = ceph_clock_now(cct);
}
- ++outstanding_pg_stats;
- dout(20) << __func__ << " " << outstanding_pg_stats << " updates pending"
- << dendl;
+ outstanding_pg_stats.insert(tid);
+ dout(20) << __func__ << " updates pending: " << outstanding_pg_stats << dendl;
monc->send_mon_message(m);
}
return;
}
+ // NOTE: we may get replies from a previous mon even while
+ // outstanding_pg_stats is empty if reconnecting races with replies
+ // in flight.
+
pg_stat_queue_lock.Lock();
last_pg_stats_ack = ceph_clock_now(cct);
}
}
- assert(outstanding_pg_stats > 0);
- --outstanding_pg_stats;
- if (!pg_stat_queue.size()) {
- assert(outstanding_pg_stats == 0);
- }
- dout(20) << __func__ << " " << outstanding_pg_stats << " updates pending"
- << dendl;
+ outstanding_pg_stats.erase(ack->get_tid());
+ dout(20) << __func__ << " still pending: " << outstanding_pg_stats << dendl;
pg_stat_queue_lock.Unlock();