osd_logtype.add_inc("rlnum");
osd_logtype.add_set("numpg");
- osd_logtype.add_set("pingset");
+ osd_logtype.add_set("hbto");
+ osd_logtype.add_set("hbfrom");
osd_logtype.add_set("buf");
peer_stat[peer] = stat;
}
+void OSD::update_heartbeat_sets()
+{
+ // build heartbeat to/from set
+ heartbeat_to.clear();
+ heartbeat_from.clear();
+ for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
+ i != pg_map.end();
+ i++) {
+ PG *pg = i->second;
+
+ // replicas ping primary.
+ if (pg->get_role() > 0) {
+ assert(pg->acting.size() > 1);
+ heartbeat_to.insert(pg->acting[0]);
+ }
+ else if (pg->get_role() == 0) {
+ assert(pg->acting[0] == whoami);
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ assert(pg->acting[i] != whoami);
+ heartbeat_from.insert(pg->acting[i]);
+ }
+ }
+ }
+ dout(10) << "hb to: " << heartbeat_to << dendl;
+ dout(10) << "hb from: " << heartbeat_from << dendl;
+}
+
void OSD::heartbeat()
{
utime_t now = g_clock.now();
- utime_t since = now;
- since.sec_ref() -= g_conf.osd_heartbeat_interval;
// get CPU load avg
ifstream in("/proc/loadavg");
// calc my stats
Mutex::Locker lock(peer_stat_lock);
_refresh_my_stat(now);
+ my_stat_on_peer.clear();
dout(5) << "heartbeat: " << my_stat << dendl;
//load_calc.set_size(stat_ops);
- // send pings
- set<int> pingset;
- for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
- i != pg_map.end();
- i++) {
- PG *pg = i->second;
-
- // we want to ping the primary.
- if (pg->get_role() <= 0) continue;
- if (pg->acting.size() < 1) continue;
-
- if (pg->last_heartbeat < since) {
- pg->last_heartbeat = now;
- pingset.insert(pg->acting[0]);
- }
- }
- my_stat_on_peer.clear();
- for (set<int>::iterator i = pingset.begin();
- i != pingset.end();
+ // send heartbeats
+ for (set<int>::iterator i = heartbeat_to.begin();
+ i != heartbeat_to.end();
i++) {
_share_map_outgoing( osdmap->get_inst(*i) );
my_stat_on_peer[*i] = my_stat;
osdmap->get_inst(*i));
}
- if (logger) logger->set("pingset", pingset.size());
+ // check for incoming heartbeats (move me elsewhere?)
+ utime_t grace = now;
+ grace -= g_conf.osd_heartbeat_grace;
+ for (set<int>::iterator p = heartbeat_from.begin();
+ p != heartbeat_from.end();
+ p++) {
+ if (heartbeat_from_stamp.count(*p)) {
+ if (heartbeat_from_stamp[*p] < grace) {
+ dout(0) << "no heartbeat from osd" << *p << " since " << heartbeat_from_stamp[*p]
+ << " (cutoff " << grace << ")" << dendl;
+ int mon = monmap->pick_mon();
+ messenger->send_message(new MOSDFailure(messenger->get_myinst(), osdmap->get_inst(*p), osdmap->get_epoch()),
+ monmap->get_inst(mon));
+ }
+ } else
+ heartbeat_from_stamp[*p] = now; // fake initial
+ }
+
+
+ if (logger) logger->set("hbto", heartbeat_to.size());
+ if (logger) logger->set("hbfrom", heartbeat_from.size());
// hack: fake reorg?
if (osdmap && g_conf.fake_osdmap_updates) {
int from = m->get_source().num();
take_peer_stat(from, m->peer_stat);
+ heartbeat_from_stamp[from] = m->get_recv_stamp();
delete m;
}
do_activators(activator_map);
logger->set("numpg", pg_map.size());
+
+ update_heartbeat_sets();
}