return;
}
- auto new_stat = set_osd_stat(stbuf, hb_peers, osd->get_num_pgs());
+ auto new_stat = set_osd_stat(stbuf, hb_peers, osd->num_pgs);
dout(20) << "update_osd_stat " << new_stat << dendl;
assert(new_stat.kb);
float ratio = ((float)new_stat.kb_used) / ((float)new_stat.kb);
cct->_conf->osd_op_thread_suicide_timeout,
&osd_op_tp),
map_lock("OSD::map_lock"),
- pg_map_lock("OSD::pg_map_lock"),
last_pg_create_epoch(0),
mon_report_lock("OSD::mon_report_lock"),
up_thru_wanted(0),
f->dump_string("state", get_state_name(get_state()));
f->dump_unsigned("oldest_map", superblock.oldest_map);
f->dump_unsigned("newest_map", superblock.newest_map);
- f->dump_unsigned("num_pgs", pg_map_size);
+ f->dump_unsigned("num_pgs", num_pgs);
f->close_section();
} else if (admin_command == "flush_journal") {
store->flush_journal();
clear_temp_objects();
// initialize osdmap references in sharded wq
- op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
+#warning fixme initialization
+ //op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
+ for (auto& shard : shards) {
+ shard->osdmap = osdmap;
+ }
// load up pgs (as they previously existed)
load_pgs();
service.dump_live_pgids();
#endif
while (true) {
- set<PGRef> pgs;
- {
- RWLock::WLocker l(pg_map_lock);
- for (auto& i : pg_map) {
- pgs.insert(i.second);
- i.second->put("PGMap");
- }
- pg_map.clear();
- }
+ vector<PGRef> pgs;
+ _get_pgs(&pgs, true);
if (pgs.empty()) {
break;
}
spg_t pgid)
{
PGRef pg = _make_pg(createmap, pgid);
- {
- RWLock::WLocker l(pg_map_lock);
- assert(pg_map.count(pgid) == 0);
- pg_map[pgid] = pg.get();
- pg_map_size = pg_map.size();
- pg->get("PGMap"); // because it's in pg_map
- service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
- }
+ service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
return pg;
}
return pg;
}
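+// Gather refs to all non-deleted PGs from every shard's pg_slots; with
+// clear_too, also detach each PG from its slot and drop num_pgs (shutdown path).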
-void OSD::_get_pgs(vector<PGRef> *v)
+void OSD::_get_pgs(vector<PGRef> *v, bool clear_too)
{
v->clear();
- RWLock::RLocker l(pg_map_lock);
- for (auto& i : pg_map) {
- if (!i.second->is_deleted()) {
- v->push_back(i.second);
+ for (auto& s : shards) {
+ Mutex::Locker l(s->sdata_op_ordering_lock);
+ for (auto& j : s->pg_slots) {
+ if (j.second.pg &&
+ !j.second.pg->is_deleted()) {
+ v->push_back(j.second.pg);
+ if (clear_too) {
+ j.second.pg.reset();
+ --num_pgs;
+ }
+ }
}
}
}
void OSD::_get_pgids(vector<spg_t> *v)
{
v->clear();
- RWLock::RLocker l(pg_map_lock);
- for (auto& i : pg_map) {
- if (!i.second->is_deleted()) {
- v->push_back(i.first);
+ for (auto& s : shards) {
+ Mutex::Locker l(s->sdata_op_ordering_lock);
+ for (auto& j : s->pg_slots) {
+ if (j.second.pg &&
+ !j.second.pg->is_deleted()) {
+ v->push_back(j.first);
+ }
}
}
}
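+// Install a PG in its shard's pg_slot and account for it in num_pgs.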
+void OSD::_register_pg(PGRef pg)
+{
+ spg_t pgid = pg->get_pgid();
+ uint32_t shard_index = pgid.hash_to_shard(num_shards);
+ auto sdata = shards[shard_index];
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ auto& slot = sdata->pg_slots[pgid];
+ slot.pg = pg;
+ ++num_pgs;
+}
+
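+// Look up the PG attached to pgid's shard slot; returns null if there is
+// no slot or no PG attached.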
PGRef OSD::_lookup_pg(spg_t pgid)
{
- while (true) {
- {
- RWLock::RLocker l(pg_map_lock);
- auto p = pg_map.find(pgid);
- if (p == pg_map.end()) {
- return nullptr;
- }
- PGRef pg = p->second;
- if (!pg->is_deleted()) {
- return pg;
- }
- }
- // try again, this time with a write lock
- {
- RWLock::WLocker l(pg_map_lock);
- auto p = pg_map.find(pgid);
- if (p == pg_map.end()) {
- return nullptr;
- }
- PGRef pg = p->second;
- if (!pg->is_deleted()) {
- return pg;
- }
- pg_map.erase(p);
- pg_map_size = pg_map.size();
- pg->put("PGMap");
- }
+ uint32_t shard_index = pgid.hash_to_shard(num_shards);
+ auto sdata = shards[shard_index];
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ auto p = sdata->pg_slots.find(pgid);
+ if (p == sdata->pg_slots.end()) {
return nullptr;
}
+ return p->second.pg;
}
PG *OSD::_lookup_lock_pg(spg_t pgid)
{
assert(osd_lock.is_locked());
dout(0) << "load_pgs" << dendl;
- {
- RWLock::RLocker l(pg_map_lock);
- assert(pg_map.empty());
- }
vector<coll_t> ls;
int r = store->list_collections(ls);
dout(10) << __func__ << " loaded " << *pg << dendl;
pg->unlock();
+
+ _register_pg(pg);
++num;
}
dout(0) << __func__ << " opened " << num << " pgs" << dendl;
(cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
- if (pg_map_size < max_pgs_per_osd) {
+ if (num_pgs < max_pgs_per_osd) {
return false;
}
pending_creates_from_osd.emplace(pgid.pgid, is_primary);
}
dout(1) << __func__ << " withhold creation of pg " << pgid
- << ": " << pg_map_size << " >= "<< max_pgs_per_osd << dendl;
+ << ": " << num_pgs << " >= "<< max_pgs_per_osd << dendl;
return true;
}
const auto max_pgs_per_osd =
(cct->_conf->get_val<uint64_t>("mon_max_pg_per_osd") *
cct->_conf->get_val<double>("osd_max_pg_per_osd_hard_ratio"));
- RWLock::RLocker l(pg_map_lock);
- if (max_pgs_per_osd <= pg_map.size()) {
+ if (max_pgs_per_osd <= num_pgs) {
// this could happen if admin decreases this setting before a PG is removed
return;
}
- unsigned spare_pgs = max_pgs_per_osd - pg_map.size();
+ unsigned spare_pgs = max_pgs_per_osd - num_pgs;
[[gnu::unused]] auto&& locker = guardedly_lock(pending_creates_lock);
if (pending_creates_from_mon > 0) {
do_sub_pg_creates = true;
Mutex::Locker lec{min_last_epoch_clean_lock};
min_last_epoch_clean = osdmap->get_epoch();
min_last_epoch_clean_pgs.clear();
- RWLock::RLocker lpg(pg_map_lock);
- for (const auto &i : pg_map) {
- PG *pg = i.second;
+ vector<PGRef> pgs;
+ _get_pgs(&pgs);
+ for (auto& pg : pgs) {
if (!pg->is_primary()) {
continue;
}
-
pg->get_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
m->pg_stat[pg->pg_id.pgid] = s;
min_last_epoch_clean = min(min_last_epoch_clean, lec);
epoch_t e = pg->get_osdmap_epoch();
pg->unlock();
- pg_map_lock.get_write();
- assert(pg_map.count(pg->get_pgid()) == 0);
- pg->get("PGMap"); // For pg_map
- pg_map[pg->get_pgid()] = pg;
- pg_map_size = pg_map.size();
service.complete_split(pg->get_pgid());
service.pg_add_epoch(pg->pg_id, e);
- pg_map_lock.put_write();
pg->lock();
pg->handle_initialize(&rctx);
}
{
// FIXME: move to OSDShard
- RWLock::RLocker l(pg_map_lock);
[[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock);
for (auto pg = pending_creates_from_osd.cbegin();
pg != pending_creates_from_osd.cend();) {
pg->unlock();
if (deleted) {
- RWLock::WLocker l(pg_map_lock);
- auto p = pg_map.find(pg->get_pgid());
- if (p != pg_map.end() &&
- p->second == pg) {
- dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
- pg_map.erase(p);
- pg_map_size = pg_map.size();
- pg->put("PGMap");
- } else {
- dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl;
- }
+#warning fixme: pg_map is gone; the deleted pg is now detached by clearing its shard pg_slot (and decrementing num_pgs)
}
}
!slot.pg) {
dout(20) << __func__ << " " << p->first << " empty, pruning" << dendl;
p = sdata->pg_slots.erase(p);
continue;
}
}
assert(!slot.pg || slot.pg == pg);
dout(20) << __func__ << " " << pgid << " pg " << pg << " cleared" << dendl;
+ if (slot.pg) {
+ --osd->num_pgs;
+ }
slot.pg = nullptr;
}
}
osd->service.maybe_inject_dispatch_delay();
- // [lookup +] lock pg (if we have it)
- while (true) {
- if (!pg) {
- pg = osd->_lookup_lock_pg(token);
- } else {
- pg->lock();
- if (pg->is_deleted()) {
- dout(20) << __func__ << " got deleted pg " << pg << ", retrying" << dendl;
- pg->unlock();
- pg = nullptr;
- continue;
- }
- }
- break;
+ // lock pg (if we have it)
+ if (pg) {
+ pg->lock();
}
osd->service.maybe_inject_dispatch_delay();
sdata->sdata_op_ordering_lock.Unlock();
return;
}
- if (pg && !slot.pg) {
- dout(20) << __func__ << " " << token << " set pg to " << pg << dendl;
- slot.pg = pg;
- }
dout(30) << __func__ << " " << token
<< " to_process " << slot.to_process
<< " waiting " << slot.waiting
<< " waiting_peering " << slot.waiting_peering << dendl;
- // make sure we're not already waiting for this pg
- /*if (!slot.waiting.empty()) {
- dout(20) << __func__ << " " << token
- << " slot is waiting" << dendl;
- if (pg) {
- pg->unlock();
- }
- sdata->sdata_op_ordering_lock.Unlock();
- return;
- }*/
-
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
suicide_interval);
if (pg) {
// we created the pg! drop out and continue "normally"!
slot.pg = pg; // install in shard slot
+ ++osd->num_pgs;
_wake_pg_slot(token, sdata, slot);
break;
}