]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: add a RWLock pg_map_lock
authorGreg Farnum <greg@inktank.com>
Fri, 14 Mar 2014 22:46:46 +0000 (15:46 -0700)
committerGreg Farnum <greg@inktank.com>
Mon, 5 May 2014 22:29:16 +0000 (15:29 -0700)
If we're going to dispatch ops without grabbing the osd lock, we need
something else to protect the pg map (and it'll be a little
contended, so use a read-write lock).

We repurpose the (previously oddly-named) _lookup_lock_pg_with_map_lock_held()
function to refer to the pg_map_lock. handle_pg_query and handle_pg_remove
switch to use that version, because they're holding pg_map_lock already and
we know the PG they're referring to exists.

Signed-off-by: Samuel Just <sam.just@inktank.com>
Signed-off-by: Greg Farnum <greg@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h

index 5f186acd56cd209b7b69eabcf7d0cb4ecbef00ff..75d863edfd0938baae4c2a6353dc2078d9f0fe4b 100644 (file)
@@ -936,6 +936,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
   peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
+  pg_map_lock("OSD::pg_map_lock"),
   debug_drop_pg_create_probability(cct->_conf->osd_debug_drop_pg_create_probability),
   debug_drop_pg_create_duration(cct->_conf->osd_debug_drop_pg_create_duration),
   debug_drop_pg_create_left(-1),
@@ -1031,9 +1032,10 @@ bool OSD::asok_command(string command, cmdmap_t& cmdmap, string format,
     f->dump_string("state", get_state_name(state));
     f->dump_unsigned("oldest_map", superblock.oldest_map);
     f->dump_unsigned("newest_map", superblock.newest_map);
-    osd_lock.Lock();
-    f->dump_unsigned("num_pgs", pg_map.size());
-    osd_lock.Unlock();
+    {
+      RWLock::RLocker l(pg_map_lock);
+      f->dump_unsigned("num_pgs", pg_map.size());
+    }
     f->close_section();
   } else if (command == "flush_journal") {
     store->sync_and_flush();
@@ -1063,20 +1065,22 @@ bool OSD::asok_command(string command, cmdmap_t& cmdmap, string format,
     f->close_section(); //blacklist
   } else if (command == "dump_watchers") {
     list<obj_watch_item_t> watchers;
-    osd_lock.Lock();
     // scan pg's
-    for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
-        it != pg_map.end();
-        ++it) {
-
-      list<obj_watch_item_t> pg_watchers;
-      PG *pg = it->second;
-      pg->lock();
-      pg->get_watchers(pg_watchers);
-      pg->unlock();
-      watchers.splice(watchers.end(), pg_watchers);
+    {
+      Mutex::Locker l(osd_lock);
+      RWLock::RLocker l2(pg_map_lock);
+      for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
+          it != pg_map.end();
+          ++it) {
+
+        list<obj_watch_item_t> pg_watchers;
+        PG *pg = it->second;
+        pg->lock();
+        pg->get_watchers(pg_watchers);
+        pg->unlock();
+        watchers.splice(watchers.end(), pg_watchers);
+      }
     }
-    osd_lock.Unlock();
 
     f->open_array_section("watchers");
     for (list<obj_watch_item_t>::iterator it = watchers.begin();
@@ -1604,14 +1608,17 @@ int OSD::shutdown()
   cct->_conf->apply_changes(NULL);
   
   // Shutdown PGs
-  for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
-       p != pg_map.end();
-       ++p) {
-    dout(20) << " kicking pg " << p->first << dendl;
-    p->second->lock();
-    p->second->on_shutdown();
-    p->second->unlock();
-    p->second->osr->flush();
+  {
+    RWLock::RLocker l(pg_map_lock);
+    for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
+        p != pg_map.end();
+        ++p) {
+      dout(20) << " kicking pg " << p->first << dendl;
+      p->second->lock();
+      p->second->on_shutdown();
+      p->second->unlock();
+      p->second->osr->flush();
+    }
   }
   
   // finish ops
@@ -1705,20 +1712,23 @@ int OSD::shutdown()
 #ifdef PG_DEBUG_REFS
   service.dump_live_pgids();
 #endif
-  for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
-       p != pg_map.end();
-       ++p) {
-    dout(20) << " kicking pg " << p->first << dendl;
-    p->second->lock();
-    if (p->second->ref.read() != 1) {
-      derr << "pgid " << p->first << " has ref count of "
-          << p->second->ref.read() << dendl;
-      assert(0);
+  {
+    RWLock::RLocker l(pg_map_lock);
+    for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
+        p != pg_map.end();
+        ++p) {
+      dout(20) << " kicking pg " << p->first << dendl;
+      p->second->lock();
+      if (p->second->ref.read() != 1) {
+        derr << "pgid " << p->first << " has ref count of "
+            << p->second->ref.read() << dendl;
+        assert(0);
+      }
+      p->second->unlock();
+      p->second->put("PGMap");
     }
-    p->second->unlock();
-    p->second->put("PGMap");
+    pg_map.clear();
   }
-  pg_map.clear();
 #ifdef PG_DEBUG_REFS
   service.dump_live_pgids();
 #endif
@@ -1844,7 +1854,10 @@ PG *OSD::_open_lock_pg(
 
   PG* pg = _make_pg(createmap, pgid);
 
-  pg_map[pgid] = pg;
+  {
+    RWLock::WLocker l(pg_map_lock);
+    pg_map[pgid] = pg;
+  }
 
   pg->lock(no_lockdep_check);
   pg->get("PGMap");  // because it's in pg_map
@@ -1876,7 +1889,10 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
 {
   epoch_t e(service.get_osdmap()->get_epoch());
   pg->get("PGMap");  // For pg_map
-  pg_map[pg->info.pgid] = pg;
+  {
+    RWLock::WLocker l(pg_map_lock);
+    pg_map[pg->info.pgid] = pg;
+  }
   dout(10) << "Adding newly split pg " << *pg << dendl;
   vector<int> up, acting;
   pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid.pgid, up, acting);
@@ -1999,12 +2015,14 @@ PG *OSD::_create_lock_pg(
 bool OSD::_have_pg(spg_t pgid)
 {
   assert(osd_lock.is_locked());
+  RWLock::RLocker l(pg_map_lock);
   return pg_map.count(pgid);
 }
 
 PG *OSD::_lookup_lock_pg(spg_t pgid)
 {
   assert(osd_lock.is_locked());
+  RWLock::RLocker l(pg_map_lock);
   if (!pg_map.count(pgid))
     return NULL;
   PG *pg = pg_map[pgid];
@@ -2016,6 +2034,7 @@ PG *OSD::_lookup_lock_pg(spg_t pgid)
 PG *OSD::_lookup_pg(spg_t pgid)
 {
   assert(osd_lock.is_locked());
+  RWLock::RLocker l(pg_map_lock);
   if (!pg_map.count(pgid))
     return NULL;
   PG *pg = pg_map[pgid];
@@ -2035,7 +2054,10 @@ void OSD::load_pgs()
 {
   assert(osd_lock.is_locked());
   dout(0) << "load_pgs" << dendl;
-  assert(pg_map.empty());
+  {
+    RWLock::RLocker l(pg_map_lock);
+    assert(pg_map.empty());
+  }
 
   vector<coll_t> ls;
   int r = store->list_collections(ls);
@@ -2163,7 +2185,10 @@ void OSD::load_pgs()
     dout(10) << "load_pgs loaded " << *pg << " " << pg->pg_log.get_log() << dendl;
     pg->unlock();
   }
-  dout(0) << "load_pgs opened " << pg_map.size() << " pgs" << dendl;
+  {
+    RWLock::RLocker l(pg_map_lock);
+    dout(0) << "load_pgs opened " << pg_map.size() << " pgs" << dendl;
+  }
   
   build_past_intervals_parallel();
 }
@@ -2193,25 +2218,28 @@ void OSD::build_past_intervals_parallel()
   // calculate untion of map range
   epoch_t end_epoch = superblock.oldest_map;
   epoch_t cur_epoch = superblock.newest_map;
-  for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
-       i != pg_map.end();
-       ++i) {
-    PG *pg = i->second;
+  {
+    RWLock::RLocker l(pg_map_lock);
+    for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
+        i != pg_map.end();
+        ++i) {
+      PG *pg = i->second;
 
-    epoch_t start, end;
-    if (!pg->_calc_past_interval_range(&start, &end))
-      continue;
+      epoch_t start, end;
+      if (!pg->_calc_past_interval_range(&start, &end))
+        continue;
 
-    dout(10) << pg->info.pgid << " needs " << start << "-" << end << dendl;
-    pistate& p = pis[pg];
-    p.start = start;
-    p.end = end;
-    p.same_interval_since = 0;
+      dout(10) << pg->info.pgid << " needs " << start << "-" << end << dendl;
+      pistate& p = pis[pg];
+      p.start = start;
+      p.end = end;
+      p.same_interval_since = 0;
 
-    if (start < cur_epoch)
-      cur_epoch = start;
-    if (end > end_epoch)
-      end_epoch = end;
+      if (start < cur_epoch)
+        cur_epoch = start;
+      if (end > end_epoch)
+        end_epoch = end;
+    }
   }
   if (pis.empty()) {
     dout(10) << __func__ << " nothing to build" << dendl;
@@ -2819,6 +2847,7 @@ void OSD::maybe_update_heartbeat_peers()
 
   // build heartbeat from set
   if (is_active()) {
+    RWLock::RLocker l(pg_map_lock);
     for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
         i != pg_map.end();
         ++i) {
@@ -4465,6 +4494,7 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     }
 
     std::set <spg_t> keys;
+    RWLock::RLocker l(pg_map_lock);
     for (ceph::unordered_map<spg_t, PG*>::const_iterator pg_map_e = pg_map.begin();
         pg_map_e != pg_map.end(); ++pg_map_e) {
       keys.insert(pg_map_e->first);
@@ -5061,6 +5091,7 @@ void OSD::handle_scrub(MOSDScrub *m)
     return;
   }
 
+  RWLock::RLocker l(pg_map_lock);
   if (m->scrub_pgs.empty()) {
     for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
         p != pg_map.end();
@@ -5786,7 +5817,6 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
   assert(osd_lock.is_locked());
 
   dout(7) << "advance_map epoch " << osdmap->get_epoch()
-          << "  " << pg_map.size() << " pgs"
           << dendl;
 
   if (!up_epoch &&
@@ -5851,26 +5881,29 @@ void OSD::consume_map()
   list<PGRef> to_remove;
 
   // scan pg's
-  for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
-       it != pg_map.end();
-       ++it) {
-    PG *pg = it->second;
-    pg->lock();
-    if (pg->is_primary())
-      num_pg_primary++;
-    else if (pg->is_replica())
-      num_pg_replica++;
-    else
-      num_pg_stray++;
-
-    if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
-      //pool is deleted!
-      to_remove.push_back(PGRef(pg));
-    } else {
-      service.init_splits_between(it->first, service.get_osdmap(), osdmap);
-    }
+  {
+    RWLock::RLocker l(pg_map_lock);
+    for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
+        it != pg_map.end();
+        ++it) {
+      PG *pg = it->second;
+      pg->lock();
+      if (pg->is_primary())
+        num_pg_primary++;
+      else if (pg->is_replica())
+        num_pg_replica++;
+      else
+        num_pg_stray++;
 
-    pg->unlock();
+      if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
+        //pool is deleted!
+        to_remove.push_back(PGRef(pg));
+      } else {
+        service.init_splits_between(it->first, service.get_osdmap(), osdmap);
+      }
+
+      pg->unlock();
+    }
   }
 
   for (list<PGRef>::iterator i = to_remove.begin();
@@ -5889,16 +5922,19 @@ void OSD::consume_map()
   service.publish_map(osdmap);
 
   // scan pg's
-  for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
-       it != pg_map.end();
-       ++it) {
-    PG *pg = it->second;
-    pg->lock();
-    pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
-    pg->unlock();
+  {
+    RWLock::RLocker l(pg_map_lock);
+    for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
+        it != pg_map.end();
+        ++it) {
+      PG *pg = it->second;
+      pg->lock();
+      pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
+      pg->unlock();
+    }
+
+    logger->set(l_osd_pg, pg_map.size());
   }
-  
-  logger->set(l_osd_pg, pg_map.size());
   logger->set(l_osd_pg_primary, num_pg_primary);
   logger->set(l_osd_pg_replica, num_pg_replica);
   logger->set(l_osd_pg_stray, num_pg_stray);
@@ -6916,14 +6952,17 @@ void OSD::handle_pg_query(OpRequestRef op)
       continue;
     }
 
-    if (pg_map.count(pgid)) {
-      PG *pg = 0;
-      pg = _lookup_lock_pg(pgid);
-      pg->queue_query(
-       it->second.epoch_sent, it->second.epoch_sent,
-       pg_shard_t(from, it->second.from), it->second);
-      pg->unlock();
-      continue;
+    {
+      RWLock::RLocker l(pg_map_lock);
+      if (pg_map.count(pgid)) {
+        PG *pg = 0;
+        pg = _lookup_lock_pg_with_map_lock_held(pgid);
+        pg->queue_query(
+            it->second.epoch_sent, it->second.epoch_sent,
+            pg_shard_t(from, it->second.from), it->second);
+        pg->unlock();
+        continue;
+      }
     }
 
     if (!osdmap->have_pg_pool(pgid.pool()))
@@ -7007,12 +7046,15 @@ void OSD::handle_pg_remove(OpRequestRef op)
       continue;
     }
     
-    if (pg_map.count(pgid) == 0) {
-      dout(10) << " don't have pg " << pgid << dendl;
-      continue;
+    {
+      RWLock::RLocker l(pg_map_lock);
+      if (pg_map.count(pgid) == 0) {
+        dout(10) << " don't have pg " << pgid << dendl;
+        continue;
+      }
     }
     dout(5) << "queue_pg_for_deletion: " << pgid << dendl;
-    PG *pg = _lookup_lock_pg(pgid);
+    PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
     pg_history_t history = pg->info.history;
     int up_primary, acting_primary;
     vector<int> up, acting;
@@ -7064,7 +7106,10 @@ void OSD::_remove_pg(PG *pg)
   remove_wq.queue(make_pair(PGRef(pg), deleting));
 
   // remove from map
-  pg_map.erase(pg->info.pgid);
+  {
+    RWLock::WLocker l(pg_map_lock);
+    pg_map.erase(pg->info.pgid);
+  }
   pg->put("PGMap"); // since we've taken it out of map
 }
 
@@ -7091,17 +7136,20 @@ void OSD::check_replay_queue()
 
   for (list< pair<spg_t,utime_t> >::iterator p = pgids.begin(); p != pgids.end(); ++p) {
     spg_t pgid = p->first;
+    pg_map_lock.get_read();
     if (pg_map.count(pgid)) {
       PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
+      pg_map_lock.unlock();
       dout(10) << "check_replay_queue " << *pg << dendl;
       if (pg->is_active() &&
-         pg->is_replay() &&
-         pg->is_primary() &&
-         pg->replay_until == p->second) {
-       pg->replay_queued_ops();
+          pg->is_replay() &&
+          pg->is_primary() &&
+          pg->replay_until == p->second) {
+        pg->replay_queued_ops();
       }
       pg->unlock();
     } else {
+      pg_map_lock.unlock();
       dout(10) << "check_replay_queue pgid " << pgid << " (not found)" << dendl;
     }
   }
index 10762dc664c4e17e08902ebf3d1486e5d200a495..8702df16b20eb322f775ff545942caeb4260c298 100644 (file)
@@ -1335,7 +1335,8 @@ private:
 
 protected:
   // -- placement groups --
-  ceph::unordered_map<spg_t, PG*> pg_map;
+  RWLock pg_map_lock;
+  ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
   map<spg_t, list<OpRequestRef> > waiting_for_pg;
   map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;