From 973836c70d63fdb2c907c5d34c52ea2fb013652b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 9 Feb 2018 14:39:34 -0600 Subject: [PATCH] osd: change pg_slots unordered_map to use unique_ptr<> This avoids moving slots around in memory in the unordered_map... they can be big! Signed-off-by: Sage Weil --- src/osd/OSD.cc | 167 +++++++++++++++++++++++++------------------------ src/osd/OSD.h | 10 +-- 2 files changed, 90 insertions(+), 87 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 675997842f0c7..1a7d8a549ed00 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3694,11 +3694,11 @@ void OSD::_get_pgs(vector<PGRef> *v, bool clear_too) for (auto& s : shards) { Mutex::Locker l(s->sdata_op_ordering_lock); for (auto& j : s->pg_slots) { - if (j.second.pg && - !j.second.pg->is_deleted()) { - v->push_back(j.second.pg); + if (j.second->pg && + !j.second->pg->is_deleted()) { + v->push_back(j.second->pg); if (clear_too) { - s->_detach_pg(j.second); + s->_detach_pg(j.second.get()); } } } @@ -3711,8 +3711,8 @@ void OSD::_get_pgids(vector<spg_t> *v) for (auto& s : shards) { Mutex::Locker l(s->sdata_op_ordering_lock); for (auto& j : s->pg_slots) { - if (j.second.pg && - !j.second.pg->is_deleted()) { + if (j.second->pg && + !j.second->pg->is_deleted()) { v->push_back(j.first); } } @@ -3725,8 +3725,9 @@ void OSD::register_pg(PGRef pg) uint32_t shard_index = pgid.hash_to_shard(num_shards); auto sdata = shards[shard_index]; Mutex::Locker l(sdata->sdata_op_ordering_lock); - auto& slot = sdata->pg_slots[pgid]; - assert(!slot.pg); + auto r = sdata->pg_slots.emplace(pgid, make_unique<OSDShard::pg_slot>()); + assert(r.second); + auto *slot = r.first->second.get(); dout(20) << __func__ << " " << pgid << " " << pg << dendl; sdata->_attach_pg(slot, pg.get()); } @@ -3738,9 +3739,9 @@ void OSD::unregister_pg(PG *pg) Mutex::Locker l(sdata->sdata_op_ordering_lock); auto p = sdata->pg_slots.find(pg->pg_id); if (p != sdata->pg_slots.end() && - p->second.pg) { + p->second->pg) { dout(20) << __func__ << " " <<
pg->pg_id << " " << pg << dendl; - sdata->_detach_pg(p->second); + sdata->_detach_pg(p->second.get()); } else { dout(20) << __func__ << " " << pg->pg_id << " not found" << dendl; } @@ -3755,7 +3756,7 @@ PGRef OSD::_lookup_pg(spg_t pgid) if (p == sdata->pg_slots.end()) { return nullptr; } - return p->second.pg; + return p->second->pg; } PG *OSD::_lookup_lock_pg(spg_t pgid) @@ -9244,19 +9245,19 @@ int OSD::init_op_flags(OpRequestRef& op) #undef dout_prefix #define dout_prefix *_dout << "osd." << osd->get_nodeid() << ":" << shard_id << "." << __func__ << " " -void OSDShard::_attach_pg(pg_slot& slot, PG *pg) +void OSDShard::_attach_pg(pg_slot *slot, PG *pg) { dout(10) << pg->pg_id << " " << pg << dendl; - slot.pg = pg; + slot->pg = pg; pg->osd_shard = this; ++osd->num_pgs; } -void OSDShard::_detach_pg(pg_slot& slot) +void OSDShard::_detach_pg(pg_slot *slot) { - dout(10) << slot.pg->pg_id << " " << slot.pg << dendl; - slot.pg->osd_shard = nullptr; - slot.pg = nullptr; + dout(10) << slot->pg->pg_id << " " << slot->pg << dendl; + slot->pg->osd_shard = nullptr; + slot->pg = nullptr; --osd->num_pgs; } @@ -9276,7 +9277,7 @@ void OSDShard::consume_map( // check slots auto p = pg_slots.begin(); while (p != pg_slots.end()) { - OSDShard::pg_slot& slot = p->second; + OSDShard::pg_slot *slot = p->second.get(); const spg_t& pgid = p->first; dout(20) << __func__ << " " << pgid << dendl; if (old_osdmap && @@ -9287,14 +9288,14 @@ void OSDShard::consume_map( osd->service.identify_split_children(old_osdmap, new_osdmap, pgid, new_children); } - if (slot.waiting_for_split) { + if (slot->waiting_for_split) { dout(20) << __func__ << " " << pgid << " waiting for split" << dendl; ++p; continue; } - if (!slot.waiting_peering.empty()) { - epoch_t first = slot.waiting_peering.begin()->first; + if (!slot->waiting_peering.empty()) { + epoch_t first = slot->waiting_peering.begin()->first; if (first <= osdmap->get_epoch()) { dout(20) << __func__ << " " << pgid << " pending_peering first epoch " << 
first @@ -9305,27 +9306,27 @@ void OSDShard::consume_map( ++p; continue; } - if (!slot.waiting.empty()) { + if (!slot->waiting.empty()) { if (osdmap->is_up_acting_osd_shard(pgid, osd->get_nodeid())) { dout(20) << __func__ << " " << pgid << " maps to us, keeping" << dendl; ++p; continue; } - while (!slot.waiting.empty() && - slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) { - auto& qi = slot.waiting.front(); + while (!slot->waiting.empty() && + slot->waiting.front().get_map_epoch() <= osdmap->get_epoch()) { + auto& qi = slot->waiting.front(); dout(20) << __func__ << " " << pgid << " waiting item " << qi << " epoch " << qi.get_map_epoch() << " <= " << osdmap->get_epoch() << ", stale, dropping" << dendl; *pushes_to_free += qi.get_reserved_pushes(); - slot.waiting.pop_front(); + slot->waiting.pop_front(); } - if (slot.waiting.empty() && - slot.num_running == 0 && - !slot.pg) { + if (slot->waiting.empty() && + slot->num_running == 0 && + !slot->pg) { dout(20) << __func__ << " " << pgid << " empty, pruning" << dendl; p = pg_slots.erase(p); continue; @@ -9343,26 +9344,26 @@ void OSDShard::consume_map( void OSDShard::_wake_pg_slot( spg_t pgid, - OSDShard::pg_slot& slot) + OSDShard::pg_slot *slot) { dout(20) << __func__ << " " << pgid - << " to_process " << slot.to_process - << " waiting " << slot.waiting - << " waiting_peering " << slot.waiting_peering << dendl; - for (auto i = slot.to_process.rbegin(); - i != slot.to_process.rend(); + << " to_process " << slot->to_process + << " waiting " << slot->waiting + << " waiting_peering " << slot->waiting_peering << dendl; + for (auto i = slot->to_process.rbegin(); + i != slot->to_process.rend(); ++i) { _enqueue_front(std::move(*i), osd->op_prio_cutoff); } - slot.to_process.clear(); - for (auto i = slot.waiting.rbegin(); - i != slot.waiting.rend(); + slot->to_process.clear(); + for (auto i = slot->waiting.rbegin(); + i != slot->waiting.rend(); ++i) { _enqueue_front(std::move(*i), osd->op_prio_cutoff); } - 
slot.waiting.clear(); - for (auto i = slot.waiting_peering.rbegin(); - i != slot.waiting_peering.rend(); + slot->waiting.clear(); + for (auto i = slot->waiting_peering.rbegin(); + i != slot->waiting_peering.rend(); ++i) { // this is overkill; we requeue everything, even if some of these items are // waiting for maps we don't have yet. FIXME. @@ -9370,9 +9371,9 @@ void OSDShard::_wake_pg_slot( _enqueue_front(std::move(*j), osd->op_prio_cutoff); } } - slot.waiting_peering.clear(); - slot.waiting_for_split = false; - ++slot.requeue_seq; + slot->waiting_peering.clear(); + slot->waiting_for_split = false; + ++slot->requeue_seq; } void OSDShard::prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids) @@ -9408,11 +9409,10 @@ void OSDShard::_prime_splits(set<spg_t> *pgids) while (p != pgids->end()) { unsigned shard_index = p->hash_to_shard(osd->num_shards); if (shard_index == shard_id) { - auto i = pg_slots.find(*p); - if (i == pg_slots.end()) { + auto r = pg_slots.emplace(*p, make_unique<OSDShard::pg_slot>()); + if (r.second) { dout(10) << "priming slot " << *p << dendl; - OSDShard::pg_slot& slot = pg_slots[*p]; - slot.waiting_for_split = true; + r.first->second->waiting_for_split = true; } else { auto q = pg_slots.find(*p); assert(q != pg_slots.end()); @@ -9437,9 +9437,9 @@ void OSDShard::register_and_wake_split_child(PG *pg) dout(10) << pg->pg_id << " " << pg << dendl; auto p = pg_slots.find(pg->pg_id); assert(p != pg_slots.end()); - auto& slot = p->second; - assert(!slot.pg); - assert(slot.waiting_for_split); + auto *slot = p->second.get(); + assert(!slot->pg); + assert(slot->waiting_for_split); _attach_pg(slot, pg); _wake_pg_slot(pg->pg_id, slot); } @@ -9474,7 +9474,7 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num) void OSD::ShardedOpWQ::_add_slot_waiter( spg_t pgid, - OSDShard::pg_slot& slot, + OSDShard::pg_slot *slot, OpQueueItem&& qi) { if (qi.is_peering()) { @@ -9482,13 +9482,13 @@ void OSD::ShardedOpWQ::_add_slot_waiter( << " peering, item epoch is " <<
qi.get_map_epoch() << ", will wait on " << qi << dendl; - slot.waiting_peering[qi.get_map_epoch()].push_back(std::move(qi)); + slot->waiting_peering[qi.get_map_epoch()].push_back(std::move(qi)); } else { dout(20) << __func__ << " " << pgid << " item epoch is " << qi.get_map_epoch() << ", will wait on " << qi << dendl; - slot.waiting.push_back(std::move(qi)); + slot->waiting.push_back(std::move(qi)); } } @@ -9533,15 +9533,17 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) uint64_t requeue_seq; const auto token = item.get_ordering_token(); { - auto& slot = sdata->pg_slots[token]; + auto r = sdata->pg_slots.emplace(token, make_unique<OSDShard::pg_slot>()); + auto *slot = r.first->second.get(); dout(30) << __func__ << " " << token - << " to_process " << slot.to_process - << " waiting " << slot.waiting - << " waiting_peering " << slot.waiting_peering + << (r.second ? " (new)" : "") + << " to_process " << slot->to_process + << " waiting " << slot->waiting + << " waiting_peering " << slot->waiting_peering << dendl; - if (slot.waiting_for_split - || (item.is_peering() && !slot.waiting_peering.empty()) - || (!item.is_peering() && !slot.waiting.empty())) { + if (slot->waiting_for_split + || (item.is_peering() && !slot->waiting_peering.empty()) + || (!item.is_peering() && !slot->waiting.empty())) { dout(20) << __func__ << " " << token << " already waiting, adding " << item << dendl; _add_slot_waiter(token, slot, std::move(item)); @@ -9549,12 +9551,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) return; } // note the requeue seq now...
- requeue_seq = slot.requeue_seq; - pg = slot.pg; - slot.to_process.push_back(std::move(item)); - dout(20) << __func__ << " " << slot.to_process.back() + requeue_seq = slot->requeue_seq; + pg = slot->pg; + slot->to_process.push_back(std::move(item)); + dout(20) << __func__ << " " << slot->to_process.back() << " queued" << dendl; - ++slot.num_running; + ++slot->num_running; } sdata->sdata_op_ordering_lock.Unlock(); @@ -9573,10 +9575,10 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) auto q = sdata->pg_slots.find(token); assert(q != sdata->pg_slots.end()); - auto& slot = q->second; - --slot.num_running; + auto *slot = q->second.get(); + --slot->num_running; - if (slot.to_process.empty()) { + if (slot->to_process.empty()) { // raced with wake_pg_waiters or consume_map dout(20) << __func__ << " " << token << " nothing queued" << dendl; @@ -9586,9 +9588,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) sdata->sdata_op_ordering_lock.Unlock(); return; } - if (requeue_seq != slot.requeue_seq) { + if (requeue_seq != slot->requeue_seq) { dout(20) << __func__ << " " << token - << " requeue_seq " << slot.requeue_seq << " > our " + << " requeue_seq " << slot->requeue_seq << " > our " << requeue_seq << ", we raced with wake_pg_waiters" << dendl; if (pg) { @@ -9598,16 +9600,16 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) return; } dout(30) << __func__ << " " << token - << " to_process " << slot.to_process - << " waiting " << slot.waiting - << " waiting_peering " << slot.waiting_peering << dendl; + << " to_process " << slot->to_process + << " waiting " << slot->waiting + << " waiting_peering " << slot->waiting_peering << dendl; ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval); // take next item - auto qi = std::move(slot.to_process.front()); - slot.to_process.pop_front(); + auto qi = std::move(slot->to_process.front()); + 
slot->to_process.pop_front(); dout(20) << __func__ << " " << qi << " pg " << pg << dendl; unsigned pushes_to_free = 0; set<spg_t> new_children; @@ -9617,7 +9619,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) // should this pg shard exist on this osd in this (or a later) epoch? osdmap = sdata->osdmap; const PGCreateInfo *create_info = qi.creates_pg(); - if (slot.waiting_for_split) { + if (slot->waiting_for_split) { dout(20) << __func__ << " " << token << " splitting" << dendl; _add_slot_waiter(token, slot, std::move(qi)); @@ -9786,16 +9788,17 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) assert(sdata); sdata->sdata_op_ordering_lock.Lock(); auto p = sdata->pg_slots.find(item.get_ordering_token()); - if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) { + if (p != sdata->pg_slots.end() && + !p->second->to_process.empty()) { // we may be racing with _process, which has dequeued a new item // from pqueue, put it on to_process, and is now busy taking the // pg lock. ensure this old requeued item is ordered before any // such newer item in to_process. - p->second.to_process.push_front(std::move(item)); - item = std::move(p->second.to_process.back()); - p->second.to_process.pop_back(); + p->second->to_process.push_front(std::move(item)); + item = std::move(p->second->to_process.back()); + p->second->to_process.pop_back(); dout(20) << __func__ - << " " << p->second.to_process.front() + << " " << p->second->to_process.front() << " shuffled w/ " << item << dendl; } else { dout(20) << __func__ << " " << item << dendl; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index c0ab3e8e576d8..8a4bf386cc168 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1120,7 +1120,7 @@ struct OSDShard { /// map of slots for each spg_t. maintains ordering of items dequeued /// from pqueue while _process thread drops shard lock to acquire the /// pg lock. slots are removed by consume_map.
- unordered_map<spg_t,pg_slot> pg_slots; + unordered_map<spg_t,unique_ptr<pg_slot>> pg_slots; /// priority queue std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue; @@ -1140,8 +1140,8 @@ struct OSDShard { priority, cost, std::move(item)); } - void _attach_pg(pg_slot& slot, PG *pg); - void _detach_pg(pg_slot& slot); + void _attach_pg(pg_slot *slot, PG *pg); + void _detach_pg(pg_slot *slot); /// push osdmap into shard void consume_map( @@ -1149,7 +1149,7 @@ struct OSDShard { unsigned *pushes_to_free, set<spg_t> *new_children); - void _wake_pg_slot(spg_t pgid, OSDShard::pg_slot& slot); + void _wake_pg_slot(spg_t pgid, OSDShard::pg_slot *slot); void _prime_splits(set<spg_t> *pgids); void prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids); @@ -1654,7 +1654,7 @@ protected: void _add_slot_waiter( spg_t token, - OSDShard::pg_slot& slot, + OSDShard::pg_slot *slot, OpQueueItem&& qi); /// try to do some work -- 2.39.5