old_from_stamp.swap(heartbeat_from_stamp);
map<int, epoch_t> old_to, old_from;
- map<int, entity_inst_t> old_inst;
+ map<int, Connection*> old_con;
old_to.swap(heartbeat_to);
old_from.swap(heartbeat_from);
- old_inst.swap(heartbeat_inst);
+ old_con.swap(heartbeat_con);
utime_t now = g_clock.now();
p != old_to.end();
p++) {
if (p->second > osdmap->get_epoch()) {
- dout(10) << "update_heartbeat_peers: keeping newer _to peer " << old_inst[p->first]
+ dout(10) << "update_heartbeat_peers: keeping newer _to peer osd" << p->first
+ << " " << old_con[p->first]->get_peer_addr()
<< " as of " << p->second << dendl;
heartbeat_to[p->first] = p->second;
- heartbeat_inst[p->first] = old_inst[p->first];
+ heartbeat_con[p->first] = old_con[p->first];
}
}
if (heartbeat_to.count(p))
continue;
heartbeat_to[p] = osdmap->get_epoch();
- heartbeat_inst[p] = osdmap->get_hb_inst(p);
- if (old_to.count(p) == 0 || old_inst[p] != heartbeat_inst[p])
- dout(10) << "update_heartbeat_peers: new _to osd" << p << " " << heartbeat_inst[p] << dendl;
+ heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
+ if (old_to.count(p) == 0 || old_con[p] != heartbeat_con[p])
+ dout(10) << "update_heartbeat_peers: new _to osd" << p
+ << " " << heartbeat_con[p]->get_peer_addr() << dendl;
}
else if (pg->get_role() == 0) {
assert(pg->acting[0] == whoami);
if (heartbeat_from.count(p))
continue;
heartbeat_from[p] = osdmap->get_epoch();
- heartbeat_inst[p] = osdmap->get_hb_inst(p);
+ heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
if (old_from_stamp.count(p) && old_from.count(p) &&
- old_inst[p] == heartbeat_inst[p]) {
+ old_con[p] == heartbeat_con[p]) {
// have a stamp _AND_ i'm not new to the set
heartbeat_from_stamp[p] = old_from_stamp[p];
} else {
- dout(10) << "update_heartbeat_peers: new _from osd" << p << " " << heartbeat_inst[p] << dendl;
+ dout(10) << "update_heartbeat_peers: new _from osd" << p
+ << " " << heartbeat_con[p]->get_peer_addr() << dendl;
heartbeat_from_stamp[p] = now;
- MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat, true); // request hb
- heartbeat_messenger->send_message(m, heartbeat_inst[p]);
+ MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat,
+ MOSDPing::REQUEST_HEARTBEAT);
+ heartbeat_messenger->send_message(m, heartbeat_con[p]);
}
}
}
}
+
+ map<int, Connection*> down;
+
for (map<int,epoch_t>::iterator p = old_to.begin();
p != old_to.end();
p++) {
- assert(old_inst.count(p->first));
- if (heartbeat_to.count(p->first) && heartbeat_inst[p->first] == old_inst[p->first])
+ assert(old_con.count(p->first));
+ if (heartbeat_to.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
continue;
assert(p->second <= osdmap->get_epoch());
// share latest map with this peer so they know not to expect
// heartbeats from us. otherwise they may mark us down!
- dout(10) << "update_heartbeat_peers: sharing map with old _to peer " << old_inst[p->first]
- << " as of " << p->second << dendl;
- _share_map_outgoing(old_inst[p->first]);
+ if (osdmap->is_up(p->first)) {
+ dout(10) << "update_heartbeat_peers: sharing map with old _to peer osd" << p->first << dendl;
+ _share_map_outgoing(osdmap->get_cluster_inst(p->first));
+ }
- if (heartbeat_from.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) {
- dout(10) << "update_heartbeat_peers: old _to peer " << old_inst[p->first]
+ if (heartbeat_from.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
+ dout(10) << "update_heartbeat_peers: old _to peer osd" << p->first
+ << " " << old_con[p->first]->get_peer_addr()
<< " is still a _from peer, not marking down" << dendl;
} else {
- dout(10) << "update_heartbeat_peers: marking down old _to peer " << old_inst[p->first]
+ dout(10) << "update_heartbeat_peers: will mark down old _to peer osd" << p->first
+ << " " << old_con[p->first]->get_peer_addr()
<< " as of " << p->second << dendl;
- Connection *con = heartbeat_messenger->get_connection(old_inst[p->first]);
- heartbeat_messenger->mark_disposable(con);
- heartbeat_messenger->mark_down_on_empty(con);
- con->put();
- if (!osdmap->is_up(p->first))
- forget_peer_epoch(p->first, osdmap->get_epoch());
+ down[p->first] = old_con[p->first];
}
}
for (map<int,epoch_t>::iterator p = old_from.begin();
p != old_from.end();
p++) {
- assert(old_inst.count(p->first));
- if (heartbeat_from.count(p->first) && heartbeat_inst[p->first] == old_inst[p->first])
+ assert(old_con.count(p->first));
+ if (heartbeat_from.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
continue;
// share latest map with this peer, just to be nice.
- dout(10) << "update_heartbeat_peers: sharing map with old _from peer " << old_inst[p->first]
- << dendl;
- _share_map_outgoing(old_inst[p->first]);
+ if (osdmap->is_up(p->first)) {
+ dout(10) << "update_heartbeat_peers: sharing map with old _from peer osd" << p->first << dendl;
+ _share_map_outgoing(osdmap->get_cluster_inst(p->first));
+ }
- if (heartbeat_to.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) {
- dout(10) << "update_heartbeat_peers: old _from peer " << old_inst[p->first]
+ if (heartbeat_to.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
+ dout(10) << "update_heartbeat_peers: old _from peer osd" << p->first
+ << " " << old_con[p->first]->get_peer_addr()
<< " is still a _to peer, not marking down" << dendl;
} else {
- dout(10) << "update_heartbeat_peers: marking down old _from peer " << old_inst[p->first]
+ dout(10) << "update_heartbeat_peers: will mark down old _from peer osd" << p->first
+ << " " << old_con[p->first]->get_peer_addr()
<< " as of " << p->second << dendl;
- Connection *con = heartbeat_messenger->get_connection(old_inst[p->first]);
- heartbeat_messenger->mark_disposable(con);
- heartbeat_messenger->mark_down_on_empty(con);
- con->put();
- if (!osdmap->is_up(p->first))
- forget_peer_epoch(p->first, osdmap->get_epoch());
+ down[p->first] = old_con[p->first];
}
}
+ for (map<int, Connection*>::iterator p = down.begin(); p != down.end(); ++p) {
+ Connection *con = p->second;
+ heartbeat_messenger->mark_disposable(con);
+ if (!osdmap->is_up(p->first)) {
+ dout(10) << "update_heartbeat_peers: telling old peer osd" << p->first
+ << " " << old_con[p->first]->get_peer_addr()
+ << " they are down" << dendl;
+ heartbeat_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch,
+ heartbeat_epoch, my_stat,
+ MOSDPing::YOU_DIED), con);
+ }
+ heartbeat_messenger->mark_down_on_empty(con);
+ con->put();
+ if (!osdmap->is_up(p->first))
+ forget_peer_epoch(p->first, osdmap->get_epoch());
+ }
dout(10) << "update_heartbeat_peers: hb to: " << heartbeat_to << dendl;
dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
heartbeat_to.clear();
heartbeat_from.clear();
heartbeat_from_stamp.clear();
- heartbeat_inst.clear();
+ while (!heartbeat_con.empty()) {
+ heartbeat_con.begin()->second->put();
+ heartbeat_con.erase(heartbeat_con.begin());
+ }
failure_queue.clear();
heartbeat_lock.Unlock();
bool locked = map_lock.try_get_read();
- if (m->ack) {
+ switch (m->op) {
+ case MOSDPing::REQUEST_HEARTBEAT:
if (heartbeat_to.count(from) && m->peer_as_of_epoch <= heartbeat_to[from]) {
dout(5) << "handle_osd_ping ignoring peer " << m->get_source_inst()
<< " request for heartbeats as_of " << m->peer_as_of_epoch
dout(5) << "handle_osd_ping peer " << m->get_source_inst()
<< " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl;
heartbeat_to[from] = m->peer_as_of_epoch;
- heartbeat_inst[from] = m->get_source_inst();
+ heartbeat_con[from] = m->get_connection();
+ heartbeat_con[from]->get();
if (locked && m->map_epoch && !is_booting())
_share_map_incoming(m->get_source_inst(), m->map_epoch,
(Session*) m->get_connection()->get_priv());
}
- }
+ break;
- if (heartbeat_from.count(from) &&
- heartbeat_inst[from] == m->get_source_inst()) {
- // only take peer stat or share map now if map_lock is uncontended
- if (locked) {
- dout(20) << "handle_osd_ping " << m->get_source_inst()
- << " took stat " << m->peer_stat << dendl;
- if (m->map_epoch && !is_booting())
- _share_map_incoming(m->get_source_inst(), m->map_epoch,
- (Session*) m->get_connection()->get_priv());
- take_peer_stat(from, m->peer_stat); // only with map_lock held!
+ case MOSDPing::HEARTBEAT:
+ if (heartbeat_from.count(from) &&
+ heartbeat_con[from] == m->get_connection()) {
+ // only take peer stat or share map now if map_lock is uncontended
+ if (locked) {
+ dout(20) << "handle_osd_ping " << m->get_source_inst()
+ << " took stat " << m->peer_stat << dendl;
+ if (m->map_epoch && !is_booting())
+ _share_map_incoming(m->get_source_inst(), m->map_epoch,
+ (Session*) m->get_connection()->get_priv());
+ take_peer_stat(from, m->peer_stat); // only with map_lock held!
+ } else {
+ dout(20) << "handle_osd_ping " << m->get_source_inst()
+ << " dropped stat " << m->peer_stat << dendl;
+ }
+
+ note_peer_epoch(from, m->map_epoch);
+
+ heartbeat_from_stamp[from] = g_clock.now(); // don't let _my_ lag interfere.
+
+ // remove from failure lists if needed
+ if (failure_pending.count(from)) {
+ send_still_alive(from);
+ failure_pending.erase(from);
+ }
+ failure_queue.erase(from);
} else {
- dout(20) << "handle_osd_ping " << m->get_source_inst()
- << " dropped stat " << m->peer_stat << dendl;
+ dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl;
}
+ break;
- note_peer_epoch(from, m->map_epoch);
-
- heartbeat_from_stamp[from] = g_clock.now(); // don't let _my_ lag interfere.
-
- // remove from failure lists if needed
- if (failure_pending.count(from)) {
- send_still_alive(from);
- failure_pending.erase(from);
- }
- failure_queue.erase(from);
- } else {
- dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl;
+ case MOSDPing::YOU_DIED:
+ dout(10) << "handle_osd_ping " << m->get_source_inst() << " says i am down in " << m->map_epoch
+ << dendl;
+ monc->sub_want("osdmap", m->map_epoch, CEPH_SUBSCRIBE_ONETIME);
+ monc->renew_subs();
+ break;
}
if (locked)
heartbeat_lock.Unlock();
m->put();
+
}
void OSD::heartbeat_entry()
i != heartbeat_to.end();
i++) {
int peer = i->first;
- if (heartbeat_inst.count(peer)) {
+ if (heartbeat_con.count(peer)) {
my_stat_on_peer[peer] = my_stat;
dout(30) << "heartbeat allocating ping for osd" << peer << dendl;
Message *m = new MOSDPing(osdmap->get_fsid(),
my_stat);
m->set_priority(CEPH_MSG_PRIO_HIGH);
dout(30) << "heartbeat sending ping to osd" << peer << dendl;
- heartbeat_messenger->send_message(m, heartbeat_inst[peer]);
+ heartbeat_messenger->send_message(m, heartbeat_con[peer]);
}
}
}
}
-/*
- * share incremental maps with peers. it's possible we could send
- * incremental 3->4 on cluster msgr
- * incremental 4->5 on heartbeat msgr
- * and the second incremental would arrive first. who cares. the peer
- * will figure it out sooner or later.
- */
bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
Session* session)
if (pe < osdmap->get_epoch()) {
send_incremental_map(pe, inst);
note_peer_epoch(peer, osdmap->get_epoch());
- }
+ } else
+ dout(20) << "_share_map_outgoing " << inst << " already has epoch " << pe << dendl;
} else {
+ dout(20) << "_share_map_outgoing " << inst << " don't know epoch, doing nothing" << dendl;
// no idea about peer's epoch.
// ??? send recent ???
// do nothing.
}
-void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy)
+MOSDMap *OSD::build_incremental_map_msg(epoch_t since)
{
- dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
- << " to " << inst << dendl;
-
MOSDMap *m = new MOSDMap(monc->get_fsid());
for (epoch_t e = osdmap->get_epoch();
assert(0); // we should have all maps.
}
}
+ return m;
+}
+
+void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy)
+{
+ dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
+ << " to " << inst << dendl;
+
+ MOSDMap *m = build_incremental_map_msg(since);
Messenger *msgr = client_messenger;
if (entity_name_t::TYPE_OSD == inst.name._type)
msgr = cluster_messenger;