osd_stat.kb_avail = stbuf.f_bavail * stbuf.f_bsize / 1024;
osd_stat.hb_in.clear();
- for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
+ for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin(); p != heartbeat_peers.end(); p++)
osd_stat.hb_in.push_back(p->first);
osd_stat.hb_out.clear();
- for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
- osd_stat.hb_out.push_back(p->first);
dout(20) << "update_osd_stat " << osd_stat << dendl;
}
-void OSD::_add_heartbeat_source(int p, map<int, epoch_t>& old_from, map<int, utime_t>& old_from_stamp,
- map<int,Connection*>& old_con)
+void OSD::_add_heartbeat_peer(int p)  // ensure osd p has a heartbeat_peers entry and stamp it with the current osdmap epoch; p == whoami is ignored
{
if (p == whoami)
return;
- if (heartbeat_from.count(p))
- return;
-
- heartbeat_from[p] = osdmap->get_epoch();
- Connection *con = hbin_messenger->get_connection(osdmap->get_hb_inst(p));
- heartbeat_from_con[p] = con;
- if (old_from_stamp.count(p) && old_from.count(p) && old_con[p] == con) {
- // have a stamp _AND_ i'm not new to the set
- heartbeat_from_stamp[p] = old_from_stamp[p];
+ HeartbeatInfo *hi;  // points at the map entry for p, whether new or already present
+
+ map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);  // look up first so a known peer keeps its existing connection
+ if (i == heartbeat_peers.end()) {  // first time we see this peer
+ hi = &heartbeat_peers[p];  // operator[] inserts a default-constructed HeartbeatInfo
+ hi->con = hbin_messenger->get_connection(osdmap->get_hb_inst(p));  // connection to the peer's hb inst as listed in the osdmap
+ dout(10) << "_add_heartbeat_peer: new peer osd." << p
+ << " " << hi->con->get_peer_addr() << dendl;
} else {
- dout(10) << "update_heartbeat_peers: new _from osd." << p
- << " " << con->get_peer_addr() << dendl;
- heartbeat_from_stamp[p] = ceph_clock_now(g_ceph_context);
- MOSDPing *m = new MOSDPing(monc->get_fsid(), 0, heartbeat_epoch,
- MOSDPing::START_HEARTBEAT);
- hbin_messenger->send_message(m, con);
+ hi = &i->second;  // already tracked: reuse the entry, connection untouched
}
+ hi->epoch = osdmap->get_epoch();  // refreshed on every call; the peer-pruning loop elsewhere in this file drops entries whose epoch is older than the osdmap's
}
void OSD::need_heartbeat_peer_update()
return;
heartbeat_need_update = false;
- // filter heartbeat_from_stamp to only include osds that remain in
- // heartbeat_from.
- map<int, utime_t> old_from_stamp;
- old_from_stamp.swap(heartbeat_from_stamp);
-
- map<int, epoch_t> old_from;
- map<int, Connection*> old_con;
- old_from.swap(heartbeat_from);
- old_con.swap(heartbeat_from_con);
-
heartbeat_epoch = osdmap->get_epoch();
// build heartbeat from set
p != pg->heartbeat_peers.end();
++p)
if (osdmap->is_up(*p))
- _add_heartbeat_source(*p, old_from, old_from_stamp, old_con);
+ _add_heartbeat_peer(*p);
pg->heartbeat_peer_lock.Unlock();
}
- for (map<int,epoch_t>::iterator p = old_from.begin();
- p != old_from.end();
- p++) {
- assert(old_con.count(p->first));
- Connection *con = old_con[p->first];
-
- if (heartbeat_from.count(p->first) && heartbeat_from_con[p->first] == con) {
- con->put();
- continue;
- }
-
- // share latest map with this peer, just to be nice.
- if (osdmap->is_up(p->first) && is_active()) {
- dout(10) << "update_heartbeat_peers: sharing map with old _from peer osd." << p->first << dendl;
- _share_map_outgoing(osdmap->get_cluster_inst(p->first));
- }
-
- dout(10) << "update_heartbeat_peers: will mark down old _from peer osd." << p->first
- << " " << con->get_peer_addr()
- << " as of " << p->second << dendl;
-
- if (!osdmap->is_up(p->first) && is_active()) {
- dout(10) << "update_heartbeat_peers: telling old peer osd." << p->first
- << " " << old_con[p->first]->get_peer_addr()
- << " they are down" << dendl;
- hbin_messenger->send_message(new MOSDPing(monc->get_fsid(), heartbeat_epoch,
- heartbeat_epoch,
- MOSDPing::YOU_DIED), con);
- hbin_messenger->mark_disposable(con);
- hbin_messenger->mark_down_on_empty(con);
+ map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
+ while (p != heartbeat_peers.end()) {
+ if (p->second.epoch < osdmap->get_epoch()) {
+ dout(20) << " removing heartbeat peer osd." << p->first
+ << " " << p->second.con->get_peer_addr()
+ << dendl;
+ hbin_messenger->mark_down(p->second.con);
+ p->second.con->put();
+ heartbeat_peers.erase(p++);
} else {
- // tell them to stop sending heartbeats
- hbin_messenger->send_message(new MOSDPing(monc->get_fsid(), 0, heartbeat_epoch,
- MOSDPing::STOP_HEARTBEAT), con);
- }
- if (!osdmap->is_up(p->first)) {
- forget_peer_epoch(p->first, osdmap->get_epoch());
+ ++p;
}
- con->put();
}
-
- dout(10) << "update_heartbeat_peers: hb to: " << heartbeat_to << dendl;
- dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
+ dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers" << dendl;
}
void OSD::reset_heartbeat_peers()
{
dout(10) << "reset_heartbeat_peers" << dendl;
heartbeat_lock.Lock();
- heartbeat_to.clear();
- heartbeat_from.clear();
- heartbeat_from_stamp.clear();
- while (!heartbeat_to_con.empty()) {
- heartbeat_to_con.begin()->second->put();
- heartbeat_to_con.erase(heartbeat_to_con.begin());
- }
- while (!heartbeat_from_con.empty()) {
- heartbeat_from_con.begin()->second->put();
- heartbeat_from_con.erase(heartbeat_from_con.begin());
+ while (!heartbeat_peers.empty()) {  // forget every tracked heartbeat peer, one entry per pass, while heartbeat_lock is held
+ hbin_messenger->mark_down(heartbeat_peers.begin()->second.con);  // mark the peer's connection down on hbin_messenger
+ heartbeat_peers.begin()->second.con->put();  // presumably drops the reference this entry held on the connection -- TODO confirm
+ heartbeat_peers.erase(heartbeat_peers.begin());  // remove the entry so the loop advances to the next peer
}
failure_queue.clear();
heartbeat_lock.Unlock();
-
}
void OSD::handle_osd_ping(MOSDPing *m)
return;
}
- heartbeat_lock.Lock();
int from = m->get_source().num();
+
+ heartbeat_lock.Lock();
+
bool locked = map_lock.try_get_read();
switch (m->op) {
- case MOSDPing::START_HEARTBEAT:
- if (heartbeat_to.count(from)) {
- if (heartbeat_to_con[from] != m->get_connection()) {
- // different connection
- if (heartbeat_to[from] > m->peer_as_of_epoch) {
- dout(5) << "handle_osd_ping marking down peer " << m->get_source_inst()
- << " after old start message from epoch " << m->peer_as_of_epoch
- << " <= current " << heartbeat_to[from] << dendl;
- hbout_messenger->mark_down(m->get_connection());
- } else {
- dout(5) << "handle_osd_ping replacing old peer "
- << heartbeat_to_con[from]->get_peer_addr()
- << " epoch " << heartbeat_to[from]
- << " with new peer " << m->get_source_inst()
- << " epoch " << m->peer_as_of_epoch
- << dendl;
- hbout_messenger->mark_down(heartbeat_to_con[from]);
- heartbeat_to_con[from]->put();
-
- // remember new connection
- heartbeat_to[from] = m->peer_as_of_epoch;
- heartbeat_to_con[from] = m->get_connection();
- heartbeat_to_con[from]->get();
- }
- } else {
- // same connection
- dout(5) << "handle_osd_ping dup peer " << m->get_source_inst()
- << " request for heartbeats as_of " << m->peer_as_of_epoch
- << ", same connection" << dendl;
- heartbeat_to[from] = m->peer_as_of_epoch;
- }
- } else {
- // new
- dout(5) << "handle_osd_ping peer " << m->get_source_inst()
- << " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl;
- heartbeat_to[from] = m->peer_as_of_epoch;
- heartbeat_to_con[from] = m->get_connection();
- heartbeat_to_con[from]->get();
-
- if (locked && m->map_epoch && is_active()) {
- _share_map_incoming(m->get_source_inst(), m->map_epoch,
- (Session*) m->get_connection()->get_priv());
- }
- }
- break;
- case MOSDPing::STOP_HEARTBEAT:
+ case MOSDPing::PING:
{
- if (heartbeat_to.count(from) && heartbeat_to_con[from] == m->get_connection()) {
- dout(5) << "handle_osd_ping peer " << m->get_source_inst()
- << " stopping heartbeats as_of " << m->peer_as_of_epoch << dendl;
- heartbeat_to.erase(from);
- //hbout_messenger->mark_down(heartbeat_to_con[from]);
- heartbeat_to_con[from]->put();
- heartbeat_to_con.erase(from);
- } else {
- dout(5) << "handle_osd_ping peer " << m->get_source_inst()
- << " ignoring stop request as_of " << m->peer_as_of_epoch << dendl;
+ Message *r = new MOSDPing(monc->get_fsid(),
+ locked ? osdmap->get_epoch():0,
+ MOSDPing::PING_REPLY,
+ m->stamp);
+ hbout_messenger->send_message(r, m->get_connection());
+
+ if (osdmap->is_up(from)) {
+ note_peer_epoch(from, m->map_epoch);
+ if (locked && is_active())
+ _share_map_outgoing(osdmap->get_cluster_inst(from));
}
}
break;
- case MOSDPing::HEARTBEAT:
- if (heartbeat_from.count(from) && heartbeat_from_con[from] == m->get_connection()) {
- dout(20) << "handle_osd_ping " << m->get_source_inst() << dendl;
+ case MOSDPing::PING_REPLY:
+ {
+ map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
+ if (i != heartbeat_peers.end()) {
+ i->second.last_rx = m->stamp;
+ }
- note_peer_epoch(from, m->map_epoch);
- if (locked && is_active())
- _share_map_outgoing(osdmap->get_cluster_inst(from));
-
- heartbeat_from_stamp[from] = ceph_clock_now(g_ceph_context); // don't let _my_ lag interfere.
-
- // remove from failure lists if needed
- if (failure_pending.count(from)) {
- send_still_alive(failure_pending[from]);
- failure_pending.erase(from);
+ if (osdmap->is_up(from)) {
+ note_peer_epoch(from, m->map_epoch);
+ if (locked && is_active())
+ _share_map_outgoing(osdmap->get_cluster_inst(from));
}
- failure_queue.erase(from);
- } else {
- dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl;
}
break;
heartbeat_lock.Unlock();
m->put();
-
}
void OSD::heartbeat_entry()
// we should also have map_lock rdlocked.
// check for incoming heartbeats (move me elsewhere?)
- utime_t grace = ceph_clock_now(g_ceph_context);
- grace -= g_conf->osd_heartbeat_grace;
- for (map<int, epoch_t>::iterator p = heartbeat_from.begin();
- p != heartbeat_from.end();
+ utime_t cutoff = ceph_clock_now(g_ceph_context);
+ cutoff -= g_conf->osd_heartbeat_grace;
+ for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
+ p != heartbeat_peers.end();
p++) {
- if (heartbeat_from_stamp.count(p->first) &&
- heartbeat_from_stamp[p->first] < grace) {
- derr << "heartbeat_check: no heartbeat from osd." << p->first
- << " since " << heartbeat_from_stamp[p->first]
- << " (cutoff " << grace << ")" << dendl;
- queue_failure(p->first);
-
+ if (p->second.last_rx == utime_t()) {
+ if (p->second.last_tx > cutoff)
+ continue; // just started sending recently
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " ever, first ping sent " << p->second.first_tx
+ << " (cutoff " << cutoff << ")" << dendl;
+ } else {
+ if (p->second.last_rx > cutoff)
+ continue; // got recent reply
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " since " << p->second.last_rx
+ << " (cutoff " << cutoff << ")" << dendl;
}
+
+ // fail!
+ queue_failure(p->first);
}
}
void OSD::heartbeat()
{
- utime_t now = ceph_clock_now(g_ceph_context);
-
dout(30) << "heartbeat" << dendl;
// get CPU load avg
bool map_locked = map_lock.try_get_read();
dout(30) << "heartbeat map_locked=" << map_locked << dendl;
+ utime_t now = ceph_clock_now(g_ceph_context);
+
// send heartbeats
- for (map<int, epoch_t>::iterator i = heartbeat_to.begin();
- i != heartbeat_to.end();
+ for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
+ i != heartbeat_peers.end();
i++) {
int peer = i->first;
dout(30) << "heartbeat allocating ping for osd." << peer << dendl;
Message *m = new MOSDPing(monc->get_fsid(),
map_locked ? osdmap->get_epoch():0,
- i->second, MOSDPing::HEARTBEAT);
- m->set_priority(CEPH_MSG_PRIO_HIGH);
+ MOSDPing::PING,
+ now);
+ i->second.last_tx = now;
+ if (i->second.first_tx == utime_t())
+ i->second.first_tx = now;
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
- hbout_messenger->send_message(m, heartbeat_to_con[peer]);
+ hbout_messenger->send_message(m, i->second.con);
}
if (map_locked) {
}
if (logger) {
- logger->set(l_osd_hb_to, heartbeat_to.size());
- logger->set(l_osd_hb_from, heartbeat_from.size());
+ logger->set(l_osd_hb_to, heartbeat_peers.size());
+ logger->set(l_osd_hb_from, 0);
}
// hmm.. am i all alone?
dout(30) << "heartbeat lonely?" << dendl;
- if (heartbeat_from.empty() || heartbeat_to.empty()) {
+ if (heartbeat_peers.empty()) {
if (now - last_mon_heartbeat > g_conf->osd_mon_heartbeat_interval && is_active()) {
last_mon_heartbeat = now;
dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
cluster_messenger->mark_down(osdmap->get_cluster_addr(peer));
heartbeat_lock.Lock();
-
- // clean out down osds i am sending heartbeats _to_.
- // update_heartbeat_peers() will clean out peers i expect heartbeast _from_.
- if (heartbeat_to.count(peer) &&
- heartbeat_to[peer] < osdmap->get_epoch()) {
- dout(10) << "note_down_osd osd." << peer << " marking down hbout connection "
- << heartbeat_to_con[peer]->get_peer_addr() << dendl;
- hbout_messenger->mark_down(heartbeat_to_con[peer]);
- heartbeat_to_con[peer]->put();
- heartbeat_to_con.erase(peer);
- heartbeat_to.erase(peer);
- }
-
failure_queue.erase(peer);
failure_pending.erase(peer);
-
heartbeat_lock.Unlock();
}