]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: rework peer map epoch caching
authorSage Weil <sage.weil@dreamhost.com>
Fri, 20 May 2011 19:55:29 +0000 (12:55 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Fri, 20 May 2011 22:15:12 +0000 (15:15 -0700)
We try to keep track of which epochs our peers have so that we can be
semi-intelligent about which map incrementals we send preceeding any
messages.  Since this is useful from the heartbeat and cluster channels/
threads, protect the data with an inner lock and clean up the callers.

Be smarter about when we forget.

Make note of peer epoch when we receive a ping.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h

index 0840ddf9e76ce699cd599179f4b509331c68f285..992afba9bf298a5162a33049383f41732e4fd3f3 100644 (file)
@@ -434,6 +434,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   op_wq(this, &op_tp),
   osdmap(NULL),
   map_lock("OSD::map_lock"),
+  peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
   map_cache_lock("OSD::map_cache_lock"), map_cache_keep_from(0),
   up_thru_wanted(0), up_thru_pending(0),
   pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
@@ -1592,6 +1593,8 @@ void OSD::handle_osd_ping(MOSDPing *m)
               << " dropped stat " << m->peer_stat << dendl;
     }
 
+    note_peer_epoch(from, m->map_epoch);
+
     heartbeat_from_stamp[from] = g_clock.now();  // don't let _my_ lag interfere.
 
     // remove from failure lists if needed
@@ -2310,6 +2313,58 @@ done:
 // --------------------------------------
 // dispatch
 
+epoch_t OSD::get_peer_epoch(int peer)
+{
+  Mutex::Locker l(peer_map_epoch_lock);
+  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+  if (p == peer_map_epoch.end())
+    return 0;
+  return p->second;
+}
+
+epoch_t OSD::note_peer_epoch(int peer, epoch_t e)
+{
+  Mutex::Locker l(peer_map_epoch_lock);
+  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+  if (p != peer_map_epoch.end()) {
+    if (p->second < e) {
+      dout(10) << "note_peer_epoch osd" << peer << " has " << e << dendl;
+      p->second = e;
+    } else {
+      dout(30) << "note_peer_epoch osd" << peer << " has " << p->second << " >= " << e << dendl;
+    }
+    return p->second;
+  } else {
+    dout(10) << "note_peer_epoch osd" << peer << " now has " << e << dendl;
+    peer_map_epoch[peer] = e;
+    return e;
+  }
+}
+void OSD::forget_peer_epoch(int peer, epoch_t as_of) 
+{
+  Mutex::Locker l(peer_map_epoch_lock);
+  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+  if (p != peer_map_epoch.end()) {
+    if (p->second <= as_of) {
+      dout(10) << "forget_peer_epoch osd" << peer << " as_of " << as_of
+              << " had " << p->second << dendl;
+      peer_map_epoch.erase(p);
+    } else {
+      dout(10) << "forget_peer_epoch osd" << peer << " as_of " << as_of
+              << " has " << p->second << " - not forgetting" << dendl;
+    }
+  }
+}
+
+/*
+ * share incremental maps with peers. it's possible we could send
+ *   incremental 3->4 on cluster msgr
+ *   incremental 4->5 on heartbeat msgr
+ * and the second incremental would arrive first. who cares.  the peer
+ * will figure it out sooner or later.
+ */
+
 bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
                              Session* session)
 {
@@ -2344,15 +2399,12 @@ bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
       (osdmap->get_cluster_inst(inst.name.num()) == inst ||
        osdmap->get_hb_inst(inst.name.num()) == inst)) {
     // remember
-    if (peer_map_epoch[inst.name] < epoch) {
-      dout(20) << "peer " << inst.name << " has " << epoch << dendl;
-      peer_map_epoch[inst.name] = epoch;
-    }
-    
-    // older?
-    if (peer_map_epoch[inst.name] < osdmap->get_epoch()) {
+    epoch_t has = note_peer_epoch(inst.name.num(), epoch);
+
+    // share?
+    if (has < osdmap->get_epoch()) {
       dout(10) << inst.name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
-      peer_map_epoch[inst.name] = osdmap->get_epoch();  // so we don't send it again.
+      note_peer_epoch(inst.name.num(), osdmap->get_epoch());
       send_incremental_map(epoch, osdmap->get_cluster_inst(inst.name.num()));
       shared = true;
     }
@@ -2368,12 +2420,14 @@ void OSD::_share_map_outgoing(const entity_inst_t& inst)
 {
   assert(inst.name.is_osd());
 
+  int peer = inst.name.num();
+
   // send map?
-  if (peer_map_epoch.count(inst.name)) {
-    epoch_t pe = peer_map_epoch[inst.name];
+  epoch_t pe = get_peer_epoch(peer);
+  if (pe) {
     if (pe < osdmap->get_epoch()) {
       send_incremental_map(pe, inst);
-      peer_map_epoch[inst.name] = osdmap->get_epoch();
+      note_peer_epoch(peer, osdmap->get_epoch());
     }
   } else {
     // no idea about peer's epoch.
@@ -2824,23 +2878,25 @@ void OSD::wait_for_new_map(Message *m)
  * assimilate new OSDMap(s).  scan pgs, etc.
  */
 
-void OSD::note_down_osd(int osd)
+void OSD::note_down_osd(int peer)
 {
-  cluster_messenger->mark_down(osdmap->get_cluster_addr(osd));
+  cluster_messenger->mark_down(osdmap->get_cluster_addr(peer));
 
   heartbeat_lock.Lock();
 
   // note: update_heartbeat_peers will mark down the heartbeat connection.
 
-  peer_map_epoch.erase(entity_name_t::OSD(osd));
-  failure_queue.erase(osd);
-  failure_pending.erase(osd);
+  forget_peer_epoch(peer, osdmap->get_epoch());
+
+  failure_queue.erase(peer);
+  failure_pending.erase(peer);
 
   heartbeat_lock.Unlock();
 }
-void OSD::note_up_osd(int osd)
+
+void OSD::note_up_osd(int peer)
 {
-  peer_map_epoch.erase(entity_name_t::OSD(osd));
+  forget_peer_epoch(peer, osdmap->get_epoch() - 1);
 }
 
 void OSD::handle_osd_map(MOSDMap *m)
index b1d25736bb0d9a22a98d30c0b392032c8d3f8261..5fdd78e35a3de190ae3387fa81ce1e3613a8eda1 100644 (file)
@@ -452,7 +452,12 @@ private:
   RWLock          map_lock;
   list<Message*>  waiting_for_osdmap;
 
-  hash_map<entity_name_t, epoch_t>  peer_map_epoch;  // FIXME types
+  Mutex peer_map_epoch_lock;
+  map<int, epoch_t> peer_map_epoch;
+  
+  epoch_t get_peer_epoch(int p);
+  epoch_t note_peer_epoch(int p, epoch_t e);
+  void forget_peer_epoch(int p, epoch_t e);
 
   bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
                           Session *session = 0);