map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
if (i == heartbeat_peers.end()) {
- ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
- if (!con)
+ pair<ConnectionRef,ConnectionRef> cons = service.get_con_osd_hb(p, osdmap->get_epoch());
+ if (!cons.first)
return;
hi = &heartbeat_peers[p];
- hi->con = con.get();
- hi->con->get();
hi->peer = p;
- hi->con->set_priv(new HeartbeatSession(p));
+ HeartbeatSession *s = new HeartbeatSession(p);
+ hi->con_back = cons.first.get();
+ hi->con_back->get();
+ hi->con_back->set_priv(s);
+ if (cons.second) {
+ hi->con_front = cons.second.get();
+ hi->con_front->get();
+ hi->con_front->set_priv(s->get());
+ }
dout(10) << "_add_heartbeat_peer: new peer osd." << p
- << " " << hi->con->get_peer_addr() << dendl;
+ << " " << hi->con_back->get_peer_addr()
+ << " " << (hi->con_front ? hi->con_front->get_peer_addr() : entity_addr_t())
+ << dendl;
} else {
hi = &i->second;
}
while (p != heartbeat_peers.end()) {
if (p->second.epoch < osdmap->get_epoch()) {
dout(20) << " removing heartbeat peer osd." << p->first
- << " " << p->second.con->get_peer_addr()
+ << " " << p->second.con_back->get_peer_addr()
+ << " " << (p->second.con_front ? p->second.con_front->get_peer_addr() : entity_addr_t())
<< dendl;
- hbclient_messenger->mark_down(p->second.con);
- p->second.con->put();
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back->put();
+ if (p->second.con_front) {
+ hbclient_messenger->mark_down(p->second.con_front);
+ p->second.con_front->put();
+ }
heartbeat_peers.erase(p++);
} else {
++p;
dout(10) << "reset_heartbeat_peers" << dendl;
Mutex::Locker l(heartbeat_lock);
while (!heartbeat_peers.empty()) {
- hbclient_messenger->mark_down(heartbeat_peers.begin()->second.con);
- heartbeat_peers.begin()->second.con->put();
+ HeartbeatInfo& hi = heartbeat_peers.begin()->second;
+ hbclient_messenger->mark_down(hi.con_back);
+ hi.con_back->put();
+ if (hi.con_front) {
+ hbclient_messenger->mark_down(hi.con_front);
+ hi.con_front->put();
+ }
heartbeat_peers.erase(heartbeat_peers.begin());
}
failure_queue.clear();
curmap->get_epoch(),
MOSDPing::PING_REPLY,
m->stamp);
- hb_back_server_messenger->send_message(r, m->get_connection());
+ m->get_connection()->get_messenger()->send_message(r, m->get_connection());
if (curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
{
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
if (i != heartbeat_peers.end()) {
- dout(25) << "handle_osd_ping got reply from osd." << from
- << " first_rx " << i->second.first_tx
- << " last_tx " << i->second.last_tx
- << " last_rx " << i->second.last_rx << " -> " << m->stamp
- << dendl;
- i->second.last_rx = m->stamp;
+ if (m->get_connection() == i->second.con_back) {  // reply arrived on the back connection
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_tx " << i->second.first_tx  // label names the field actually printed
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back << " -> " << m->stamp
+ << " last_rx_front " << i->second.last_rx_front
+ << dendl;
+ i->second.last_rx_back = m->stamp;
+ // if there is no front con, set both stamps.
+ if (i->second.con_front == NULL)
+ i->second.last_rx_front = m->stamp;
+ } else if (m->get_connection() == i->second.con_front) {  // reply arrived on the front connection
+ dout(25) << "handle_osd_ping got reply from osd." << from
+ << " first_tx " << i->second.first_tx  // label names the field actually printed
+ << " last_tx " << i->second.last_tx
+ << " last_rx_back " << i->second.last_rx_back
+ << " last_rx_front " << i->second.last_rx_front << " -> " << m->stamp
+ << dendl;
+ i->second.last_rx_front = m->stamp;
+ }
}
if (m->map_epoch &&
}
}
- // Cancel false reports
- if (failure_queue.count(from))
- failure_queue.erase(from);
- if (failure_pending.count(from)) {
- send_still_alive(curmap->get_epoch(), failure_pending[from]);
- failure_pending.erase(from);
+ utime_t cutoff = ceph_clock_now(g_ceph_context);
+ cutoff -= g_conf->osd_heartbeat_grace;
+ if (i->second.is_healthy(cutoff)) {
+ // Cancel false reports
+ if (failure_queue.count(from)) {
+ dout(10) << "handle_osd_ping canceling queued failure report for osd." << from<< dendl;
+ failure_queue.erase(from);
+ }
+ if (failure_pending.count(from)) {
+ dout(10) << "handle_osd_ping canceling in-flight failure report for osd." << from<< dendl;
+ send_still_alive(curmap->get_epoch(), failure_pending[from]);
+ failure_pending.erase(from);
+ }
}
}
break;
dout(25) << "heartbeat_check osd." << p->first
<< " first_tx " << p->second.first_tx
<< " last_tx " << p->second.last_tx
- << " last_rx " << p->second.last_rx
+ << " last_rx_back " << p->second.last_rx_back
+ << " last_rx_front " << p->second.last_rx_front
<< dendl;
- if (p->second.last_rx == utime_t()) {
- if (p->second.last_tx == utime_t() ||
- p->second.first_tx > cutoff)
- continue; // just started sending recently
- derr << "heartbeat_check: no reply from osd." << p->first
- << " ever, first ping sent " << p->second.first_tx
- << " (cutoff " << cutoff << ")" << dendl;
-
- // fail
- failure_queue[p->first] = p->second.last_tx;
- } else {
- if (p->second.last_rx > cutoff)
- continue; // got recent reply
- derr << "heartbeat_check: no reply from osd." << p->first
- << " since " << p->second.last_rx
- << " (cutoff " << cutoff << ")" << dendl;
-
- // fail
- failure_queue[p->first] = p->second.last_rx;
+ if (!p->second.is_healthy(cutoff)) {
+ if (p->second.last_rx_back == utime_t() ||
+ p->second.last_rx_front == utime_t()) {
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " ever on either front or back, first ping sent " << p->second.first_tx
+ << " (cutoff " << cutoff << ")" << dendl;
+ // fail
+ failure_queue[p->first] = p->second.last_tx;
+ } else {
+ derr << "heartbeat_check: no reply from osd." << p->first
+ << " since back " << p->second.last_rx_back
+ << " front " << p->second.last_rx_front
+ << " (cutoff " << cutoff << ")" << dendl;
+ // fail
+ failure_queue[p->first] = MIN(p->second.last_rx_back, p->second.last_rx_front);
+ }
}
}
}
i != heartbeat_peers.end();
++i) {
int peer = i->first;
- dout(30) << "heartbeat allocating ping for osd." << peer << dendl;
- Message *m = new MOSDPing(monc->get_fsid(),
- service.get_osdmap()->get_epoch(),
- MOSDPing::PING,
- now);
i->second.last_tx = now;
if (i->second.first_tx == utime_t())
i->second.first_tx = now;
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
- hbclient_messenger->send_message(m, i->second.con);
+ hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+ service.get_osdmap()->get_epoch(),
+ MOSDPing::PING,
+ now),
+ i->second.con_back);
+ if (i->second.con_front)
+ hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+ service.get_osdmap()->get_epoch(),
+ MOSDPing::PING,
+ now),
+ i->second.con_front);
}
dout(30) << "heartbeat check" << dendl;
}
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
- p->second.con == con) {
- ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
- if (!newcon) {
+ p->second.con_back == con) {
+ pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+ if (!newcon.first) {
dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
} else {
dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
- p->second.con = newcon.get();
- p->second.con->get();
- p->second.con->set_priv(s);
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back = newcon.first.get();
+ p->second.con_back->get();
+ p->second.con_back->set_priv(s);
+ if (p->second.con_front)
+ hbclient_messenger->mark_down(p->second.con_front);
+ if (newcon.second) {
+ p->second.con_front = newcon.second.get();
+ p->second.con_front->get();
+ p->second.con_front->set_priv(s->get());
+ } else {
+ p->second.con_front = NULL;
+ }
}
} else {
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
+ hbclient_messenger->mark_down(con);
}
- hbclient_messenger->mark_down(con);
heartbeat_lock.Unlock();
s->put();
}
return ret;
}
-ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
Mutex::Locker l(pre_publish_lock);
// service map is always newer/newest
assert(from_epoch <= next_osdmap->get_epoch());
+ pair<ConnectionRef,ConnectionRef> ret;  // first = back heartbeat connection, second = front heartbeat connection
if (next_osdmap->is_down(peer) ||
next_osdmap->get_info(peer).up_from > from_epoch) {
- return NULL;
+ return ret;  // next_osdmap reports peer down, or its up_from is newer than from_epoch: return the untouched pair
}
- ConnectionRef ret(
- osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer)));
- ret->put(); // Ref from get_connection
+ ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer));  // connect to the peer's hb back instance
+ ret.first->put(); // Ref from get_connection
+ ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer));  // connect to the peer's hb front instance
+ if (ret.second)  // NOTE(review): only the front connection is null-checked; ret.first is dereferenced unconditionally above
+ ret.second->put(); // Ref from get_connection
return ret;
}
failure_pending.erase(peer);
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
if (p != heartbeat_peers.end()) {
- hbclient_messenger->mark_down(p->second.con);
- p->second.con->put();
+ hbclient_messenger->mark_down(p->second.con_back);
+ p->second.con_back->put();
+ if (p->second.con_front) {
+ hbclient_messenger->mark_down(p->second.con_front);  // mark down the front connection itself before releasing it with put()
+ p->second.con_front->put();
+ }
heartbeat_peers.erase(p);
}
heartbeat_lock.Unlock();