msgr->start();
msgr->add_dispatcher_tail(this);
+ started_at = ceph_clock_now();
+
return 0;
}
switch (m->get_type()) {
case MSG_PGSTATS:
cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
+ maybe_ready(m->get_source().num());
m->put();
return true;
case MSG_MGR_REPORT:
};
}
+void DaemonServer::maybe_ready(int32_t osd_id)
+{
+ if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
+ dout(4) << "initial report from osd " << osd_id << dendl;
+ reported_osds.insert(osd_id);
+ std::set<int32_t> up_osds;
+
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ osdmap.get_up_osds(up_osds);
+ });
+
+ std::set<int32_t> unreported_osds;
+ std::set_difference(up_osds.begin(), up_osds.end(),
+ reported_osds.begin(), reported_osds.end(),
+ std::inserter(unreported_osds, unreported_osds.begin()));
+
+ if (unreported_osds.size() == 0) {
+ dout(4) << "all osds have reported, sending PG state to mon" << dendl;
+ pgmap_ready = true;
+ reported_osds.clear();
+ // Avoid waiting for next tick
+ send_report();
+ } else {
+ dout(4) << "still waiting for " << unreported_osds.size() << " osds"
+ " to report in before PGMap is ready" << dendl;
+ }
+ }
+}
+
void DaemonServer::shutdown()
{
dout(10) << "begin" << dendl;
void DaemonServer::send_report()
{
+ if (!pgmap_ready) {
+ if (ceph_clock_now() - started_at > g_conf->mgr_stats_period * 4.0) {
+ pgmap_ready = true;
+ reported_osds.clear();
+ dout(1) << "Giving up on OSDs that haven't reported yet, sending "
+ << "potentially incomplete PG state to mon" << dendl;
+ } else {
+ dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
+ << dendl;
+ return;
+ }
+ }
+
auto m = new MMonMgrReport();
cluster_state.with_pgmap([&](const PGMap& pg_map) {
cluster_state.update_delta_stats();