OSD: wake_pg_waiters atomically with pg_map update
Author:     Samuel Just <sam.just@inktank.com>
AuthorDate: Fri, 8 Nov 2013 23:20:49 +0000 (15:20 -0800)
Commit:     Greg Farnum <greg@inktank.com>
CommitDate: Mon, 5 May 2014 22:29:16 +0000 (15:29 -0700)
Also, call enqueue_op directly rather than going back
through the entire dispatch machinery.
Be sure to grab the pg lock under the pg_map_lock in _open_lock_pg() to
preserve necessary lock ordering.

Signed-off-by: Samuel Just <sam.just@inktank.com>
Signed-off-by: Greg Farnum <greg@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
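
For context before the patch itself, here is a minimal, self-contained sketch of
the locking pattern this commit adopts. The types below are simplified
stand-ins, not Ceph's real classes (PG here is a toy, OpRef stands in for
OpRequestRef, std::shared_mutex stands in for RWLock). The point is the
invariant the commit establishes: pg_map_lock orders *above* the per-PG lock,
and waiters are drained while pg_map_lock is still held for write.

// Sketch only: simplified stand-ins for Ceph's real types.
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>

struct PG {
  explicit PG(int i) : id(i) {}
  void lock()   { mtx.lock(); }     // stands in for PG::lock()
  void unlock() { mtx.unlock(); }
  std::mutex mtx;
  int id;
};

using OpRef = std::shared_ptr<int>;              // stand-in for OpRequestRef

std::shared_mutex pg_map_lock;                   // orders *above* PG locks
std::map<int, PG*> pg_map;                       // protected by pg_map_lock
std::map<int, std::list<OpRef>> waiting_for_pg;  // protected by pg_map_lock

void enqueue_op(PG*, OpRef) { /* hand the op straight to the op queue */ }

// Caller must hold pg_map_lock for write.
void wake_pg_waiters(PG* pg, int pgid) {
  auto i = waiting_for_pg.find(pgid);
  if (i == waiting_for_pg.end())
    return;
  for (auto& op : i->second)
    enqueue_op(pg, op);   // queue directly; no trip back through dispatch
  waiting_for_pg.erase(i);
}

// Publish the new PG and wake its waiters in one critical section, so no
// op can be parked in waiting_for_pg after the PG has become visible.
PG* open_lock_pg(int pgid) {
  PG* pg = new PG(pgid);
  {
    std::unique_lock<std::shared_mutex> wl(pg_map_lock);
    pg->lock();           // safe: pg_map_lock is always acquired first
    pg_map[pgid] = pg;
    wake_pg_waiters(pg, pgid);
  }
  return pg;              // returned locked, as in the real code
}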

diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 3e354a8aa3287fcd2e981795d05e98215c7364e8..e49b075e517c430790e51ac5506e8de430082a69 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1854,14 +1854,13 @@ PG *OSD::_open_lock_pg(
   assert(osd_lock.is_locked());
 
   PG* pg = _make_pg(createmap, pgid);
-
   {
     RWLock::WLocker l(pg_map_lock);
+    pg->lock(no_lockdep_check);
     pg_map[pgid] = pg;
+    pg->get("PGMap");  // because it's in pg_map
+    wake_pg_waiters(pg, pgid);
   }
-
-  pg->lock(no_lockdep_check);
-  pg->get("PGMap");  // because it's in pg_map
   return pg;
 }
 
@@ -1893,6 +1892,7 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
   {
     RWLock::WLocker l(pg_map_lock);
     pg_map[pg->info.pgid] = pg;
+    wake_pg_waiters(pg, pg->info.pgid);
   }
   dout(10) << "Adding newly split pg " << *pg << dendl;
   vector<int> up, acting;
@@ -1914,7 +1914,6 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
     }
     peering_wait_for_split.erase(to_wake);
   }
-  wake_pg_waiters(pg->info.pgid);
   if (!service.get_osdmap()->have_pg_pool(pg->info.pgid.pool()))
     _remove_pg(pg);
 }
@@ -2425,9 +2424,6 @@ void OSD::handle_pg_peering_evt(
 
       dout(10) << *pg << " is new" << dendl;
 
-      // kick any waiters
-      wake_pg_waiters(pg->info.pgid);
-      
       pg->queue_peering_event(evt);
       pg->unlock();
       return;
@@ -2455,9 +2451,6 @@ void OSD::handle_pg_peering_evt(
 
       dout(10) << *pg << " is new (resurrected)" << dendl;
 
-      // kick any waiters
-      wake_pg_waiters(pg->info.pgid);
-
       pg->queue_peering_event(evt);
       pg->unlock();
       return;
@@ -2487,9 +2480,6 @@ void OSD::handle_pg_peering_evt(
 
       dout(10) << *parent << " is new" << dendl;
 
-      // kick any waiters
-      wake_pg_waiters(parent->info.pgid);
-
       assert(service.splitting(pgid));
       peering_wait_for_split[pgid].push_back(evt);
 
@@ -5854,6 +5844,7 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
   }
 
   // scan pgs with waiters
+  RWLock::WLocker l(pg_map_lock);
   map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
   while (p != waiting_for_pg.end()) {
     spg_t pgid = p->first;
@@ -6414,7 +6405,6 @@ void OSD::handle_pg_create(OpRequestRef op)
        *rctx.transaction);
       pg->info.last_epoch_started = pg->info.history.last_epoch_started;
       creating_pgs.erase(pgid);
-      wake_pg_waiters(pg->info.pgid);
       pg->handle_create(&rctx);
       pg->write_if_dirty(*rctx.transaction);
       pg->publish_stats_to_osd();
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 0748af1785396a65cf88002c5b9c01973cb7b74d..f0cb41952605c6a67a3f1bc3362e4c61449be9e8 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1361,9 +1361,9 @@ private:
 
 protected:
   // -- placement groups --
-  RWLock pg_map_lock;
+  RWLock pg_map_lock; // this lock orders *above* individual PG _locks
   ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
-  map<spg_t, list<OpRequestRef> > waiting_for_pg;
+  map<spg_t, list<OpRequestRef> > waiting_for_pg; // protected by pg_map lock
   map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;
 
@@ -1425,10 +1425,16 @@ protected:
     int lastactingprimary
     ); ///< @return false if there was a map gap between from and now
 
-  void wake_pg_waiters(spg_t pgid) {
-    if (waiting_for_pg.count(pgid)) {
-      take_waiters_front(waiting_for_pg[pgid]);
-      waiting_for_pg.erase(pgid);
+  void wake_pg_waiters(PG* pg, spg_t pgid) {
+    // Need write lock on pg_map
+    map<spg_t, list<OpRequestRef> >::iterator i = waiting_for_pg.find(pgid);
+    if (i != waiting_for_pg.end()) {
+      for (list<OpRequestRef>::iterator j = i->second.begin();
+           j != i->second.end();
+           ++j) {
+        enqueue_op(pg, *j);
+      }
+      waiting_for_pg.erase(i);
     }
   }
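
Why waking under the same write lock matters: an incoming op that finds no PG
parks itself in waiting_for_pg under pg_map_lock. Because the creating thread
now publishes the PG and drains waiters inside one write-locked critical
section, every op either sees the PG in pg_map or is guaranteed to be drained
by wake_pg_waiters; under the old ordering an op could be parked after the
wake had already run and be stranded. Continuing the hypothetical sketch
above (dispatch_op is an illustrative name, not Ceph's actual dispatch path):

// Dispatch-side counterpart of the sketch above (illustrative only).
void dispatch_op(int pgid, OpRef op) {
  std::unique_lock<std::shared_mutex> wl(pg_map_lock);
  auto p = pg_map.find(pgid);
  if (p != pg_map.end())
    enqueue_op(p->second, op);           // PG exists: queue it directly
  else
    waiting_for_pg[pgid].push_back(op);  // park until the PG is created
}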