peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
+ pg_map_lock("OSD::pg_map_lock"),
debug_drop_pg_create_probability(cct->_conf->osd_debug_drop_pg_create_probability),
debug_drop_pg_create_duration(cct->_conf->osd_debug_drop_pg_create_duration),
debug_drop_pg_create_left(-1),
f->dump_string("state", get_state_name(state));
f->dump_unsigned("oldest_map", superblock.oldest_map);
f->dump_unsigned("newest_map", superblock.newest_map);
- osd_lock.Lock();
- f->dump_unsigned("num_pgs", pg_map.size());
- osd_lock.Unlock();
+ {
+ RWLock::RLocker l(pg_map_lock);
+ f->dump_unsigned("num_pgs", pg_map.size());
+ }
f->close_section();
} else if (command == "flush_journal") {
store->sync_and_flush();
f->close_section(); //blacklist
} else if (command == "dump_watchers") {
list<obj_watch_item_t> watchers;
- osd_lock.Lock();
// scan pg's
- for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
- it != pg_map.end();
- ++it) {
-
- list<obj_watch_item_t> pg_watchers;
- PG *pg = it->second;
- pg->lock();
- pg->get_watchers(pg_watchers);
- pg->unlock();
- watchers.splice(watchers.end(), pg_watchers);
+ {
+ Mutex::Locker l(osd_lock);
+ RWLock::RLocker l2(pg_map_lock);
+ for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
+ it != pg_map.end();
+ ++it) {
+
+ list<obj_watch_item_t> pg_watchers;
+ PG *pg = it->second;
+ pg->lock();
+ pg->get_watchers(pg_watchers);
+ pg->unlock();
+ watchers.splice(watchers.end(), pg_watchers);
+ }
}
- osd_lock.Unlock();
f->open_array_section("watchers");
for (list<obj_watch_item_t>::iterator it = watchers.begin();
cct->_conf->apply_changes(NULL);
// Shutdown PGs
- for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
- p != pg_map.end();
- ++p) {
- dout(20) << " kicking pg " << p->first << dendl;
- p->second->lock();
- p->second->on_shutdown();
- p->second->unlock();
- p->second->osr->flush();
+ {
+ RWLock::RLocker l(pg_map_lock);
+ for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
+ p != pg_map.end();
+ ++p) {
+ dout(20) << " kicking pg " << p->first << dendl;
+ p->second->lock();
+ p->second->on_shutdown();
+ p->second->unlock();
+ p->second->osr->flush();
+ }
}
// finish ops
#ifdef PG_DEBUG_REFS
service.dump_live_pgids();
#endif
- for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
- p != pg_map.end();
- ++p) {
- dout(20) << " kicking pg " << p->first << dendl;
- p->second->lock();
- if (p->second->ref.read() != 1) {
- derr << "pgid " << p->first << " has ref count of "
- << p->second->ref.read() << dendl;
- assert(0);
+ {
+ RWLock::RLocker l(pg_map_lock);
+ for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
+ p != pg_map.end();
+ ++p) {
+ dout(20) << " kicking pg " << p->first << dendl;
+ p->second->lock();
+ if (p->second->ref.read() != 1) {
+ derr << "pgid " << p->first << " has ref count of "
+ << p->second->ref.read() << dendl;
+ assert(0);
+ }
+ p->second->unlock();
+ p->second->put("PGMap");
}
- p->second->unlock();
- p->second->put("PGMap");
+ pg_map.clear();
}
- pg_map.clear();
#ifdef PG_DEBUG_REFS
service.dump_live_pgids();
#endif
PG* pg = _make_pg(createmap, pgid);
- pg_map[pgid] = pg;
+ {
+ RWLock::WLocker l(pg_map_lock);
+ pg_map[pgid] = pg;
+ }
pg->lock(no_lockdep_check);
pg->get("PGMap"); // because it's in pg_map
{
epoch_t e(service.get_osdmap()->get_epoch());
pg->get("PGMap"); // For pg_map
- pg_map[pg->info.pgid] = pg;
+ {
+ RWLock::WLocker l(pg_map_lock);
+ pg_map[pg->info.pgid] = pg;
+ }
dout(10) << "Adding newly split pg " << *pg << dendl;
vector<int> up, acting;
pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid.pgid, up, acting);
bool OSD::_have_pg(spg_t pgid)
{
assert(osd_lock.is_locked());
+ RWLock::RLocker l(pg_map_lock);
return pg_map.count(pgid);
}
PG *OSD::_lookup_lock_pg(spg_t pgid)
{
assert(osd_lock.is_locked());
+ RWLock::RLocker l(pg_map_lock);
if (!pg_map.count(pgid))
return NULL;
PG *pg = pg_map[pgid];
PG *OSD::_lookup_pg(spg_t pgid)
{
assert(osd_lock.is_locked());
+ RWLock::RLocker l(pg_map_lock);
if (!pg_map.count(pgid))
return NULL;
PG *pg = pg_map[pgid];
{
assert(osd_lock.is_locked());
dout(0) << "load_pgs" << dendl;
- assert(pg_map.empty());
+ {
+ RWLock::RLocker l(pg_map_lock);
+ assert(pg_map.empty());
+ }
vector<coll_t> ls;
int r = store->list_collections(ls);
dout(10) << "load_pgs loaded " << *pg << " " << pg->pg_log.get_log() << dendl;
pg->unlock();
}
- dout(0) << "load_pgs opened " << pg_map.size() << " pgs" << dendl;
+ {
+ RWLock::RLocker l(pg_map_lock);
+ dout(0) << "load_pgs opened " << pg_map.size() << " pgs" << dendl;
+ }
build_past_intervals_parallel();
}
// calculate untion of map range
epoch_t end_epoch = superblock.oldest_map;
epoch_t cur_epoch = superblock.newest_map;
- for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
- i != pg_map.end();
- ++i) {
- PG *pg = i->second;
+ {
+ RWLock::RLocker l(pg_map_lock);
+ for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
+ i != pg_map.end();
+ ++i) {
+ PG *pg = i->second;
- epoch_t start, end;
- if (!pg->_calc_past_interval_range(&start, &end))
- continue;
+ epoch_t start, end;
+ if (!pg->_calc_past_interval_range(&start, &end))
+ continue;
- dout(10) << pg->info.pgid << " needs " << start << "-" << end << dendl;
- pistate& p = pis[pg];
- p.start = start;
- p.end = end;
- p.same_interval_since = 0;
+ dout(10) << pg->info.pgid << " needs " << start << "-" << end << dendl;
+ pistate& p = pis[pg];
+ p.start = start;
+ p.end = end;
+ p.same_interval_since = 0;
- if (start < cur_epoch)
- cur_epoch = start;
- if (end > end_epoch)
- end_epoch = end;
+ if (start < cur_epoch)
+ cur_epoch = start;
+ if (end > end_epoch)
+ end_epoch = end;
+ }
}
if (pis.empty()) {
dout(10) << __func__ << " nothing to build" << dendl;
// build heartbeat from set
if (is_active()) {
+ RWLock::RLocker l(pg_map_lock);
for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
i != pg_map.end();
++i) {
}
std::set <spg_t> keys;
+ RWLock::RLocker l(pg_map_lock);
for (ceph::unordered_map<spg_t, PG*>::const_iterator pg_map_e = pg_map.begin();
pg_map_e != pg_map.end(); ++pg_map_e) {
keys.insert(pg_map_e->first);
return;
}
+ RWLock::RLocker l(pg_map_lock);
if (m->scrub_pgs.empty()) {
for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
p != pg_map.end();
assert(osd_lock.is_locked());
dout(7) << "advance_map epoch " << osdmap->get_epoch()
- << " " << pg_map.size() << " pgs"
<< dendl;
if (!up_epoch &&
list<PGRef> to_remove;
// scan pg's
- for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
- it != pg_map.end();
- ++it) {
- PG *pg = it->second;
- pg->lock();
- if (pg->is_primary())
- num_pg_primary++;
- else if (pg->is_replica())
- num_pg_replica++;
- else
- num_pg_stray++;
-
- if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
- //pool is deleted!
- to_remove.push_back(PGRef(pg));
- } else {
- service.init_splits_between(it->first, service.get_osdmap(), osdmap);
- }
+ {
+ RWLock::RLocker l(pg_map_lock);
+ for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
+ it != pg_map.end();
+ ++it) {
+ PG *pg = it->second;
+ pg->lock();
+ if (pg->is_primary())
+ num_pg_primary++;
+ else if (pg->is_replica())
+ num_pg_replica++;
+ else
+ num_pg_stray++;
- pg->unlock();
+ if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
+ //pool is deleted!
+ to_remove.push_back(PGRef(pg));
+ } else {
+ service.init_splits_between(it->first, service.get_osdmap(), osdmap);
+ }
+
+ pg->unlock();
+ }
}
for (list<PGRef>::iterator i = to_remove.begin();
service.publish_map(osdmap);
// scan pg's
- for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
- it != pg_map.end();
- ++it) {
- PG *pg = it->second;
- pg->lock();
- pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
- pg->unlock();
+ {
+ RWLock::RLocker l(pg_map_lock);
+ for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
+ it != pg_map.end();
+ ++it) {
+ PG *pg = it->second;
+ pg->lock();
+ pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
+ pg->unlock();
+ }
+
+ logger->set(l_osd_pg, pg_map.size());
}
-
- logger->set(l_osd_pg, pg_map.size());
logger->set(l_osd_pg_primary, num_pg_primary);
logger->set(l_osd_pg_replica, num_pg_replica);
logger->set(l_osd_pg_stray, num_pg_stray);
continue;
}
- if (pg_map.count(pgid)) {
- PG *pg = 0;
- pg = _lookup_lock_pg(pgid);
- pg->queue_query(
- it->second.epoch_sent, it->second.epoch_sent,
- pg_shard_t(from, it->second.from), it->second);
- pg->unlock();
- continue;
+ {
+ RWLock::RLocker l(pg_map_lock);
+ if (pg_map.count(pgid)) {
+ PG *pg = 0;
+ pg = _lookup_lock_pg_with_map_lock_held(pgid);
+ pg->queue_query(
+ it->second.epoch_sent, it->second.epoch_sent,
+ pg_shard_t(from, it->second.from), it->second);
+ pg->unlock();
+ continue;
+ }
}
if (!osdmap->have_pg_pool(pgid.pool()))
continue;
}
- if (pg_map.count(pgid) == 0) {
- dout(10) << " don't have pg " << pgid << dendl;
- continue;
+ {
+ RWLock::RLocker l(pg_map_lock);
+ if (pg_map.count(pgid) == 0) {
+ dout(10) << " don't have pg " << pgid << dendl;
+ continue;
+ }
}
dout(5) << "queue_pg_for_deletion: " << pgid << dendl;
- PG *pg = _lookup_lock_pg(pgid);
+ PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
pg_history_t history = pg->info.history;
int up_primary, acting_primary;
vector<int> up, acting;
remove_wq.queue(make_pair(PGRef(pg), deleting));
// remove from map
- pg_map.erase(pg->info.pgid);
+ {
+ RWLock::WLocker l(pg_map_lock);
+ pg_map.erase(pg->info.pgid);
+ }
pg->put("PGMap"); // since we've taken it out of map
}
for (list< pair<spg_t,utime_t> >::iterator p = pgids.begin(); p != pgids.end(); ++p) {
spg_t pgid = p->first;
+ pg_map_lock.get_read();
if (pg_map.count(pgid)) {
PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
+ pg_map_lock.unlock();
dout(10) << "check_replay_queue " << *pg << dendl;
if (pg->is_active() &&
- pg->is_replay() &&
- pg->is_primary() &&
- pg->replay_until == p->second) {
- pg->replay_queued_ops();
+ pg->is_replay() &&
+ pg->is_primary() &&
+ pg->replay_until == p->second) {
+ pg->replay_queued_ops();
}
pg->unlock();
} else {
+ pg_map_lock.unlock();
dout(10) << "check_replay_queue pgid " << pgid << " (not found)" << dendl;
}
}