]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: update_heartbeat_peers as needed
authorSage Weil <sage.weil@dreamhost.com>
Fri, 9 Mar 2012 20:26:22 +0000 (12:26 -0800)
committerSage Weil <sage.weil@dreamhost.com>
Fri, 9 Mar 2012 20:26:22 +0000 (12:26 -0800)
Before, we were being very careful about updating the heartbeat peers if
new PGs were created or when certain types of messages were received.
However, the PG can change it's peers in lots of cases (e.g., when
recovery completes), but the OSD doesn't re-aggregate.

Instead, set a flag when each PG updates it's set, and check that flag in
the OSD code periodically or in likely places.  A call in tick() acts as
a catch-all.

The num_created counts can probably be cleaned out now...

Signed-off-by: Sage Weil <sage@newdream.net>
Reviewed-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc

index 258e93251b427170913981399d583223a34d1c97..b64b25dc0fc87f2abd9b59e13fae3f2494d67af3 100644 (file)
@@ -546,7 +546,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   disk_tp(external_messenger->cct, "OSD::disk_tp", g_conf->osd_disk_threads),
   command_tp(external_messenger->cct, "OSD::command_tp", 1),
   heartbeat_lock("OSD::heartbeat_lock"),
-  heartbeat_stop(false), heartbeat_epoch(0),
+  heartbeat_stop(false), heartbeat_need_update(true), heartbeat_epoch(0),
   hbin_messenger(hbinm),
   hbout_messenger(hboutm),
   heartbeat_thread(this),
@@ -1464,10 +1464,22 @@ void OSD::_add_heartbeat_source(int p, map<int, epoch_t>& old_from, map<int, uti
   }
 }
 
-void OSD::update_heartbeat_peers()
+void OSD::need_heartbeat_peer_update()
 {
-  assert(osd_lock.is_locked());
   heartbeat_lock.Lock();
+  dout(20) << "need_heartbeat_peer_update" << dendl;
+  heartbeat_need_update = true;
+  heartbeat_lock.Unlock();
+}
+
+void OSD::maybe_update_heartbeat_peers()
+{
+  assert(osd_lock.is_locked());
+  Mutex::Locker l(heartbeat_lock);
+
+  if (!heartbeat_need_update)
+    return;
+  heartbeat_need_update = false;
 
   // filter heartbeat_from_stamp to only include osds that remain in
   // heartbeat_from.
@@ -1539,8 +1551,6 @@ void OSD::update_heartbeat_peers()
 
   dout(10) << "update_heartbeat_peers: hb   to: " << heartbeat_to << dendl;
   dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
-
-  heartbeat_lock.Unlock();
 }
 
 void OSD::reset_heartbeat_peers()
@@ -1801,6 +1811,8 @@ void OSD::tick()
 
   map_lock.get_read();
 
+  maybe_update_heartbeat_peers();
+
   heartbeat_lock.Lock();
   heartbeat_check();
   heartbeat_lock.Unlock();
@@ -3764,7 +3776,7 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
 
   wake_all_pg_waiters();   // the pg mapping may have shifted
   trim_map_cache(oldest_last_clean);
-  update_heartbeat_peers();
+  maybe_update_heartbeat_peers();
 
   send_pg_temp();
 
@@ -4327,8 +4339,8 @@ void OSD::handle_pg_create(OpRequest *op)
   do_queries(query_map);
   do_infos(info_map);
 
-  if (num_created)
-    update_heartbeat_peers();
+  maybe_update_heartbeat_peers();
+
   op->put();
 }
 
@@ -4451,8 +4463,7 @@ void OSD::handle_pg_notify(OpRequest *op)
   do_queries(query_map);
   do_infos(info_map);
   
-  if (created)
-    update_heartbeat_peers();
+  maybe_update_heartbeat_peers();
 
   op->put();
 }
@@ -4500,8 +4511,8 @@ void OSD::handle_pg_log(OpRequest *op)
   int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
   assert(!tr);
 
-  if (created)
-    update_heartbeat_peers();
+  maybe_update_heartbeat_peers();
+
   op->put();
 }
 
@@ -4552,8 +4563,8 @@ void OSD::handle_pg_info(OpRequest *op)
   }
 
   do_infos(info_map);
-  if (created)
-    update_heartbeat_peers();
+
+  maybe_update_heartbeat_peers();
 
   op->put();
 }
@@ -4716,8 +4727,8 @@ void OSD::handle_pg_missing(OpRequest *op)
   _pro-cess_pg_info(m->get_epoch(), from, m->info, //misspelling added to prevent erroneous finds
                   empty_log, &m->missing, query_map, NULL, created);
   do_queries(query_map);
-  if (created)
-    update_heartbeat_peers();
+
+  maybe_update_heartbeat_peers();
 
   op->put();
 #endif
index e8c24ddfa38ab21ccc6414c9882f6329477d61f1..3d8b583eadc7853ceff4cc2f47f853d0329ff212 100644 (file)
@@ -256,7 +256,7 @@ private:
   // -- heartbeat --
   Mutex heartbeat_lock;
   Cond heartbeat_cond;
-  bool heartbeat_stop;
+  bool heartbeat_stop, heartbeat_need_update;
   epoch_t heartbeat_epoch;
   map<int, epoch_t> heartbeat_to, heartbeat_from;
   map<int, utime_t> heartbeat_from_stamp;
@@ -266,11 +266,12 @@ private:
   
   void _add_heartbeat_source(int p, map<int, epoch_t>& old_from, map<int, utime_t>& old_from_stamp,
                             map<int,Connection*>& old_con);
-  void update_heartbeat_peers();
+  void maybe_update_heartbeat_peers();
   void reset_heartbeat_peers();
   void heartbeat();
   void heartbeat_check();
   void heartbeat_entry();
+  void need_heartbeat_peer_update();
 
   struct T_Heartbeat : public Thread {
     OSD *osd;
index 28277d0dcc0277ec0414a18d22f507deace3f082..b32ab11d0407614b71b6e680e8e4a6c6887117d6 100644 (file)
@@ -1686,18 +1686,30 @@ void PG::purge_strays()
 void PG::update_heartbeat_peers()
 {
   assert(is_locked());
-  heartbeat_peer_lock.Lock();
-  heartbeat_peers.clear();
+
+  set<int> new_peers;
   if (role == 0) {
     for (unsigned i=0; i<acting.size(); i++)
-      heartbeat_peers.insert(acting[i]);
+      new_peers.insert(acting[i]);
     for (unsigned i=0; i<up.size(); i++)
-      heartbeat_peers.insert(up[i]);
+      new_peers.insert(up[i]);
     for (map<int,pg_info_t>::iterator p = peer_info.begin(); p != peer_info.end(); ++p)
-      heartbeat_peers.insert(p->first);
+      new_peers.insert(p->first);
+  }
+
+  bool need_update = false;
+  heartbeat_peer_lock.Lock();
+  if (new_peers == heartbeat_peers) {
+    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
+  } else {
+    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
+    heartbeat_peers.swap(new_peers);
+    need_update = true;
   }
-  dout(10) << "update_heartbeat_peers " << heartbeat_peers << dendl;
   heartbeat_peer_lock.Unlock();
+
+  if (need_update)
+    osd->need_heartbeat_peer_update();
 }
 
 void PG::update_stats()