]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: protect per-pg heartbeat peers with inner lock
authorSage Weil <sage.weil@dreamhost.com>
Mon, 13 Feb 2012 02:08:34 +0000 (18:08 -0800)
committerSage Weil <sage.weil@dreamhost.com>
Mon, 13 Feb 2012 02:08:34 +0000 (18:08 -0800)
Currently we update the overall heartbeat peers by looking directly at
per-pg state.  This is potentially problematic now (#2033), and definitely
so in the future when we push more peering operations into the work queues.

Create a per-pg set of peers, protected by an inner lock, and update it
using PG::update_heartbeat_peers() when appropriate under pg->lock.  Then
aggregate it into the osd peer list in OSD::update_heatbeat_peers() under
osd_lock and the inner lock.

We could probably have re-used osd->heartbeat_lock instead of adding a
new pg->heartbeat_peer_lock, but the finer locking can't hurt.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h

index d975aa84b6506b4bde762edfa3b0d022cf504fe2..70f3bd3ae31594b2df6060593c76a63227048838 100644 (file)
@@ -1463,18 +1463,14 @@ void OSD::update_heartbeat_peers()
        i != pg_map.end();
        i++) {
     PG *pg = i->second;
-
-    // replicas (new and old) ping primary.
-    if (pg->get_role() == 0) {
-      assert(pg->acting[0] == whoami);
-      for (unsigned i=0; i<pg->acting.size(); i++)
-       _add_heartbeat_source(pg->acting[i], old_from, old_from_stamp, old_con);
-      for (unsigned i=0; i<pg->up.size(); i++)
-       _add_heartbeat_source(pg->up[i], old_from, old_from_stamp, old_con);
-      for (map<int,pg_info_t>::iterator p = pg->peer_info.begin(); p != pg->peer_info.end(); ++p)
-       if (osdmap->is_up(p->first))
-         _add_heartbeat_source(p->first, old_from, old_from_stamp, old_con);
-    }
+    pg->heartbeat_peer_lock.Lock();
+    dout(20) << *pg << " heartbeat_peers " << pg->heartbeat_peers << dendl;
+    for (set<int>::iterator p = pg->heartbeat_peers.begin();
+        p != pg->heartbeat_peers.end();
+        ++p)
+      if (osdmap->is_up(*p))
+       _add_heartbeat_source(*p, old_from, old_from_stamp, old_con);
+    pg->heartbeat_peer_lock.Unlock();
   }
 
   for (map<int,epoch_t>::iterator p = old_from.begin();
index 146d57ebb7a3c6a17b024cb7a1e6702a568d834e..73e52d4663506c32eea317c76da89e2731bd4382 100644 (file)
@@ -248,6 +248,10 @@ bool PG::proc_replica_info(int from, pg_info_t &oinfo)
   }
   update_stats();
 
+  // was this a new info?  if so, update peers!
+  if (p == peer_info.end())
+    update_heartbeat_peers();
+
   return true;
 }
 
@@ -796,15 +800,21 @@ bool PG::adjust_need_up_thru(const OSDMapRef osdmap)
 void PG::remove_down_peer_info(const OSDMapRef osdmap)
 {
   // Remove any downed osds from peer_info
+  bool removed = false;
   map<int,pg_info_t>::iterator p = peer_info.begin();
   while (p != peer_info.end()) {
     if (!osdmap->is_up(p->first)) {
       dout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
       peer_missing.erase(p->first);
       peer_info.erase(p++);
+      removed = true;
     } else
       p++;
   }
+
+  // if we removed anyone, update peers (which include peer_info)
+  if (removed)
+    update_heartbeat_peers();
 }
 
 /*
@@ -1602,6 +1612,7 @@ void PG::purge_strays()
 {
   dout(10) << "purge_strays " << stray_set << dendl;
   
+  bool removed = false;
   for (set<int>::iterator p = stray_set.begin();
        p != stray_set.end();
        p++) {
@@ -1613,8 +1624,13 @@ void PG::purge_strays()
       dout(10) << "not sending PGRemove to down osd." << *p << dendl;
     }
     peer_info.erase(*p);
+    removed = true;
   }
 
+  // if we removed anyone, update peers (which include peer_info)
+  if (removed)
+    update_heartbeat_peers();
+
   stray_set.clear();
 
   // clear _requested maps; we may have to peer() again if we discover
@@ -1623,8 +1639,22 @@ void PG::purge_strays()
   peer_missing_requested.clear();
 }
 
-
-
+void PG::update_heartbeat_peers()
+{
+  assert(is_locked());
+  heartbeat_peer_lock.Lock();
+  heartbeat_peers.clear();
+  if (role == 0) {
+    for (unsigned i=0; i<acting.size(); i++)
+      heartbeat_peers.insert(acting[i]);
+    for (unsigned i=0; i<up.size(); i++)
+      heartbeat_peers.insert(up[i]);
+    for (map<int,pg_info_t>::iterator p = peer_info.begin(); p != peer_info.end(); ++p)
+      heartbeat_peers.insert(p->first);
+  }
+  dout(10) << "update_heartbeat_peers " << heartbeat_peers << dendl;
+  heartbeat_peer_lock.Unlock();
+}
 
 void PG::update_stats()
 {
@@ -3628,6 +3658,7 @@ boost::statechart::result PG::RecoveryState::Initial::react(const MNotifyRec& no
 {
   PG *pg = context< RecoveryMachine >().pg;
   pg->proc_replica_info(notify.from, notify.info);
+  pg->update_heartbeat_peers();
   return transit< Primary >();
 }
 
@@ -3708,6 +3739,9 @@ boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
     context< RecoveryMachine >().send_notify(pg->get_primary(),
                                             pg->info);
   }
+
+  pg->update_heartbeat_peers();
+
   return transit< Started >();
 }
 
@@ -3853,6 +3887,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
   dout(10) << "Active: handling ActMap" << dendl;
   assert(pg->is_active());
   assert(pg->is_primary());
+
   pg->check_recovery_op_pulls(pg->get_osdmap());
        
   if (g_conf->osd_check_for_log_corruption)
index 791cd6721ed1e5d218d1f5dba34ce97d86092878..d262859f0641752afe37635b529016d8e9d73484 100644 (file)
@@ -561,6 +561,13 @@ protected:
 
   epoch_t last_peering_reset;
 
+
+  /* heartbeat peers */
+public:
+  Mutex heartbeat_peer_lock;
+  set<int> heartbeat_peers;
+
+protected:
   /**
    * BackfillInterval
    *
@@ -740,6 +747,8 @@ public:
 
   void purge_strays();
 
+  void update_heartbeat_peers();
+
   Context *finish_sync_event;
 
   void finish_recovery(ObjectStore::Transaction& t, list<Context*>& tfin);
@@ -1237,6 +1246,7 @@ public:
     state(0),
     need_up_thru(false),
     last_peering_reset(0),
+    heartbeat_peer_lock("PG::heartbeat_peer_lock"),
     backfill_target(-1),
     pg_stats_lock("PG::pg_stats_lock"),
     pg_stats_valid(false),