op_wq(this, &op_tp),
osdmap(NULL),
map_lock("OSD::map_lock"),
+ peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
map_cache_lock("OSD::map_cache_lock"), map_cache_keep_from(0),
up_thru_wanted(0), up_thru_pending(0),
pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
<< " dropped stat " << m->peer_stat << dendl;
}
+ note_peer_epoch(from, m->map_epoch);
+
heartbeat_from_stamp[from] = g_clock.now(); // don't let _my_ lag interfere.
// remove from failure lists if needed
// --------------------------------------
// dispatch
+epoch_t OSD::get_peer_epoch(int peer)
+{
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p == peer_map_epoch.end())
+ return 0;
+ return p->second;
+}
+
+epoch_t OSD::note_peer_epoch(int peer, epoch_t e)
+{
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p != peer_map_epoch.end()) {
+ if (p->second < e) {
+ dout(10) << "note_peer_epoch osd" << peer << " has " << e << dendl;
+ p->second = e;
+ } else {
+ dout(30) << "note_peer_epoch osd" << peer << " has " << p->second << " >= " << e << dendl;
+ }
+ return p->second;
+ } else {
+ dout(10) << "note_peer_epoch osd" << peer << " now has " << e << dendl;
+ peer_map_epoch[peer] = e;
+ return e;
+ }
+}
+
+void OSD::forget_peer_epoch(int peer, epoch_t as_of)
+{
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p != peer_map_epoch.end()) {
+ if (p->second <= as_of) {
+ dout(10) << "forget_peer_epoch osd" << peer << " as_of " << as_of
+ << " had " << p->second << dendl;
+ peer_map_epoch.erase(p);
+ } else {
+ dout(10) << "forget_peer_epoch osd" << peer << " as_of " << as_of
+ << " has " << p->second << " - not forgetting" << dendl;
+ }
+ }
+}
+
+/*
+ * share incremental maps with peers. it's possible we could send
+ * incremental 3->4 on cluster msgr
+ * incremental 4->5 on heartbeat msgr
+ * and the second incremental would arrive first. who cares. the peer
+ * will figure it out sooner or later.
+ */
+
bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
Session* session)
{
(osdmap->get_cluster_inst(inst.name.num()) == inst ||
osdmap->get_hb_inst(inst.name.num()) == inst)) {
// remember
- if (peer_map_epoch[inst.name] < epoch) {
- dout(20) << "peer " << inst.name << " has " << epoch << dendl;
- peer_map_epoch[inst.name] = epoch;
- }
-
- // older?
- if (peer_map_epoch[inst.name] < osdmap->get_epoch()) {
+ epoch_t has = note_peer_epoch(inst.name.num(), epoch);
+
+ // share?
+ if (has < osdmap->get_epoch()) {
dout(10) << inst.name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
- peer_map_epoch[inst.name] = osdmap->get_epoch(); // so we don't send it again.
+ note_peer_epoch(inst.name.num(), osdmap->get_epoch());
send_incremental_map(epoch, osdmap->get_cluster_inst(inst.name.num()));
shared = true;
}
{
assert(inst.name.is_osd());
+ int peer = inst.name.num();
+
// send map?
- if (peer_map_epoch.count(inst.name)) {
- epoch_t pe = peer_map_epoch[inst.name];
+ epoch_t pe = get_peer_epoch(peer);
+ if (pe) {
if (pe < osdmap->get_epoch()) {
send_incremental_map(pe, inst);
- peer_map_epoch[inst.name] = osdmap->get_epoch();
+ note_peer_epoch(peer, osdmap->get_epoch());
}
} else {
// no idea about peer's epoch.
* assimilate new OSDMap(s). scan pgs, etc.
*/
-void OSD::note_down_osd(int osd)
+void OSD::note_down_osd(int peer)
{
- cluster_messenger->mark_down(osdmap->get_cluster_addr(osd));
+ cluster_messenger->mark_down(osdmap->get_cluster_addr(peer));
heartbeat_lock.Lock();
// note: update_heartbeat_peers will mark down the heartbeat connection.
- peer_map_epoch.erase(entity_name_t::OSD(osd));
- failure_queue.erase(osd);
- failure_pending.erase(osd);
+ forget_peer_epoch(peer, osdmap->get_epoch());
+
+ failure_queue.erase(peer);
+ failure_pending.erase(peer);
heartbeat_lock.Unlock();
}
-void OSD::note_up_osd(int osd)
+
+void OSD::note_up_osd(int peer)
{
- peer_map_epoch.erase(entity_name_t::OSD(osd));
+ forget_peer_epoch(peer, osdmap->get_epoch() - 1);
}
void OSD::handle_osd_map(MOSDMap *m)