{
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
if (i != heartbeat_peers.end()) {
- if (m->get_connection() == i->second.con_back) {
- dout(25) << "handle_osd_ping got reply from osd." << from
- << " first_tx " << i->second.first_tx
- << " last_tx " << i->second.last_tx
- << " last_rx_back " << i->second.last_rx_back << " -> " << m->stamp
- << " last_rx_front " << i->second.last_rx_front
- << dendl;
- i->second.last_rx_back = m->stamp;
- // if there is no front con, set both stamps.
- if (i->second.con_front == NULL)
- i->second.last_rx_front = m->stamp;
- } else if (m->get_connection() == i->second.con_front) {
- dout(25) << "handle_osd_ping got reply from osd." << from
- << " first_tx " << i->second.first_tx
- << " last_tx " << i->second.last_tx
- << " last_rx_back " << i->second.last_rx_back
- << " last_rx_front " << i->second.last_rx_front << " -> " << m->stamp
- << dendl;
- i->second.last_rx_front = m->stamp;
- }
+ auto acked = i->second.ping_history.find(m->stamp);
+ if (acked != i->second.ping_history.end()) {
+ utime_t now = ceph_clock_now();
+ int &unacknowledged = acked->second.second;
+ if (m->get_connection() == i->second.con_back) {
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_tx " << i->second.first_tx
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back << " -> " << now
+ << " last_rx_front " << i->second.last_rx_front
+ << dendl;
+ i->second.last_rx_back = now;
+ assert(unacknowledged > 0);
+ --unacknowledged;
+ // if there is no front con, set both stamps.
+ if (i->second.con_front == NULL) {
+ i->second.last_rx_front = now;
+ assert(unacknowledged > 0);
+ --unacknowledged;
+ }
+ } else if (m->get_connection() == i->second.con_front) {
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_tx " << i->second.first_tx
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back
+ << " last_rx_front " << i->second.last_rx_front << " -> " << now
+ << dendl;
+ i->second.last_rx_front = now;
+ assert(unacknowledged > 0);
+ --unacknowledged;
+ }
- utime_t cutoff = ceph_clock_now();
- cutoff -= cct->_conf->osd_heartbeat_grace;
- if (i->second.is_healthy(cutoff)) {
- // Cancel false reports
- auto failure_queue_entry = failure_queue.find(from);
- if (failure_queue_entry != failure_queue.end()) {
- dout(10) << "handle_osd_ping canceling queued "
- << "failure report for osd." << from << dendl;
- failure_queue.erase(failure_queue_entry);
+ if (unacknowledged == 0) {
+ // succeeded in getting all replies
+ dout(25) << "handle_osd_ping got all replies from osd." << from
+ << " , erase pending ping(sent at " << m->stamp << ")"
+ << " and older pending ping(s)"
+ << dendl;
+ i->second.ping_history.erase(i->second.ping_history.begin(), ++acked);
}
- auto failure_pending_entry = failure_pending.find(from);
- if (failure_pending_entry != failure_pending.end()) {
- dout(10) << "handle_osd_ping canceling in-flight "
- << "failure report for osd." << from << dendl;
- send_still_alive(curmap->get_epoch(),
- failure_pending_entry->second.second);
- failure_pending.erase(failure_pending_entry);
+ if (i->second.is_healthy(now)) {
+ // Cancel false reports
+ auto failure_queue_entry = failure_queue.find(from);
+ if (failure_queue_entry != failure_queue.end()) {
+ dout(10) << "handle_osd_ping canceling queued "
+ << "failure report for osd." << from << dendl;
+ failure_queue.erase(failure_queue_entry);
+ }
+
+ auto failure_pending_entry = failure_pending.find(from);
+ if (failure_pending_entry != failure_pending.end()) {
+ dout(10) << "handle_osd_ping canceling in-flight "
+ << "failure report for osd." << from << dendl;
+ send_still_alive(curmap->get_epoch(),
+ failure_pending_entry->second.second);
+ failure_pending.erase(failure_pending_entry);
+ }
}
+ } else {
+ // old replies, deprecated by newly sent pings.
+ dout(10) << "handle_osd_ping no pending ping(sent at " << m->stamp
+ << ") is found, treat as covered by newly sent pings "
+ << "and ignore"
+ << dendl;
}
}
assert(heartbeat_lock.is_locked());
utime_t now = ceph_clock_now();
- // check for heartbeat replies (move me elsewhere?)
- utime_t cutoff = now;
- cutoff -= cct->_conf->osd_heartbeat_grace;
+ // check for incoming heartbeats (move me elsewhere?)
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
p != heartbeat_peers.end();
++p) {
<< " last_rx_back " << p->second.last_rx_back
<< " last_rx_front " << p->second.last_rx_front
<< dendl;
- if (p->second.is_unhealthy(cutoff)) {
+ if (p->second.is_unhealthy(now)) {
+ utime_t oldest_deadline = p->second.ping_history.begin()->second.first;
if (p->second.last_rx_back == utime_t() ||
p->second.last_rx_front == utime_t()) {
- derr << "heartbeat_check: no reply from " << p->second.con_front->get_peer_addr().get_sockaddr()
- << " osd." << p->first << " ever on either front or back, first ping sent "
- << p->second.first_tx << " (cutoff " << cutoff << ")" << dendl;
+ derr << "heartbeat_check: no reply from "
+ << p->second.con_front->get_peer_addr().get_sockaddr()
+ << " osd." << p->first
+ << " ever on either front or back, first ping sent "
+ << p->second.first_tx
+ << " (oldest deadline " << oldest_deadline << ")"
+ << dendl;
// fail
failure_queue[p->first] = p->second.last_tx;
} else {
- derr << "heartbeat_check: no reply from " << p->second.con_front->get_peer_addr().get_sockaddr()
+ derr << "heartbeat_check: no reply from "
+ << p->second.con_front->get_peer_addr().get_sockaddr()
<< " osd." << p->first << " since back " << p->second.last_rx_back
<< " front " << p->second.last_rx_front
- << " (cutoff " << cutoff << ")" << dendl;
+ << " (oldest deadline " << oldest_deadline << ")"
+ << dendl;
// fail
failure_queue[p->first] = std::min(p->second.last_rx_back, p->second.last_rx_front);
}
service.check_full_status(ratio);
utime_t now = ceph_clock_now();
+ utime_t deadline = now;
+ deadline += cct->_conf->osd_heartbeat_grace;
// send heartbeats
for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
i->second.last_tx = now;
if (i->second.first_tx == utime_t())
i->second.first_tx = now;
+ i->second.ping_history[now] = make_pair(deadline,
+ HeartbeatInfo::HEARTBEAT_MAX_CONN);
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap_epoch(),
if (is_waiting_for_healthy()) {
Mutex::Locker l(heartbeat_lock);
- utime_t cutoff = ceph_clock_now();
- cutoff -= cct->_conf->osd_heartbeat_grace;
+ utime_t now = ceph_clock_now();
int num = 0, up = 0;
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
p != heartbeat_peers.end();
++p) {
- if (p->second.is_healthy(cutoff))
+ if (p->second.is_healthy(now))
++up;
++num;
}
utime_t last_rx_front; ///< last time we got a ping reply on the front side
utime_t last_rx_back; ///< last time we got a ping reply on the back side
epoch_t epoch; ///< most recent epoch we wanted this peer
+ /// number of connections we send and receive heartbeat pings/replies
+ const static int HEARTBEAT_MAX_CONN = 2;
+ /// history of inflight pings, arranging by timestamp we sent
+ /// send time -> deadline -> remaining replies
+ map<utime_t, pair<utime_t, int>> ping_history;
+
+ bool is_unhealthy(utime_t now) {
+ if (ping_history.empty()) {
+ /// we haven't sent a ping yet or we have got all replies,
+ /// in either way we are safe and healthy for now
+ return false;
+ }
- bool is_unhealthy(utime_t cutoff) const {
- return
- ! ((last_rx_front > cutoff ||
- (last_rx_front == utime_t() && (last_tx == utime_t() ||
- first_tx > cutoff))) &&
- (last_rx_back > cutoff ||
- (last_rx_back == utime_t() && (last_tx == utime_t() ||
- first_tx > cutoff))));
- }
- bool is_healthy(utime_t cutoff) const {
- return last_rx_front > cutoff && last_rx_back > cutoff;
+ utime_t oldest_deadline = ping_history.begin()->second.first;
+ return now > oldest_deadline;
}
+ bool is_healthy(utime_t now) {
+ return !is_unhealthy(now);
+ }
};
/// state attached to outgoing heartbeat connections
struct HeartbeatSession : public RefCountedObject {