From: Sage Weil Date: Thu, 8 Feb 2018 22:23:04 +0000 (-0600) Subject: osd: restructure consume_map in terms of shards X-Git-Tag: v13.1.0~390^2~75 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=295dfe0372e8c70826e9429a72c61d120e2f19ac;p=ceph.git osd: restructure consume_map in terms of shards - new split primming machinery - new primed split cleanup on pg removal - cover the pg creation path The old split tracking is now totally unused; will be removed in the next patch. Signed-off-by: Sage Weil --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 8d37b5e1c5ab..26c88f98a1f2 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -512,6 +512,32 @@ void OSDService::complete_split(spg_t pgid) in_progress_splits.erase(pgid); } + +void OSDService::identify_split_children( + OSDMapRef old_map, + OSDMapRef new_map, + spg_t pgid, + set *new_children) +{ + if (!old_map->have_pg_pool(pgid.pool())) { + return; + } + int old_pgnum = old_map->get_pg_num(pgid.pool()); + int new_pgnum = get_possibly_deleted_pool_pg_num( + new_map, pgid.pool()); + if (pgid.ps() < static_cast(old_pgnum)) { + set children; + if (pgid.is_split(old_pgnum, new_pgnum, &children)) { + dout(20) << __func__ << " " << pgid << " children " << children << dendl; + new_children->insert(children.begin(), children.end()); + } + } else if (pgid.ps() >= static_cast(new_pgnum)) { + dout(20) << __func__ << " " << pgid << " is post-split, skipping" << dendl; + } +} + + + void OSDService::need_heartbeat_peer_update() { osd->need_heartbeat_peer_update(); @@ -1757,11 +1783,14 @@ void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e) e)); } -void OSDService::finish_pg_delete(PG *pg) +void OSDService::finish_pg_delete(PG *pg, unsigned old_pg_num) { - osd->op_shardedwq.clear_pg_pointer(pg); pg_remove_epoch(pg->get_pgid()); - cancel_pending_splits_for_parent(pg->get_pgid()); + + osd->unregister_pg(pg); + for (auto shard : osd->shards) { + shard->unprime_split_children(pg->pg_id, old_pg_num); + } } void OSDService::_queue_for_recovery( @@ -2074,9 +2103,13 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, char order_lock[128] = {0}; snprintf(order_lock, sizeof(order_lock), "OSDShard.%d::sdata_op_ordering_lock", i); OSDShard *one_shard = new OSDShard( + i, + cct, + this, lock_name, order_lock, cct->_conf->osd_op_pq_max_tokens_per_priority, - cct->_conf->osd_op_pq_min_cost, cct, op_queue); + cct->_conf->osd_op_pq_min_cost, + op_queue); shards.push_back(one_shard); } } @@ -2654,8 +2687,6 @@ int OSD::init() clear_temp_objects(); // initialize osdmap references in sharded wq -#warning fixme initialization - //op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami); for (auto& shard : shards) { shard->osdmap = osdmap; } @@ -3830,7 +3861,7 @@ void OSD::_get_pgs(vector *v, bool clear_too) { v->clear(); for (auto& s : shards) { - Mutex::Locker l(s->sdata_lock); + Mutex::Locker l(s->sdata_op_ordering_lock); for (auto& j : s->pg_slots) { if (j.second.pg && !j.second.pg->is_deleted()) { @@ -3848,7 +3879,7 @@ void OSD::_get_pgids(vector *v) { v->clear(); for (auto& s : shards) { - Mutex::Locker l(s->sdata_lock); + Mutex::Locker l(s->sdata_op_ordering_lock); for (auto& j : s->pg_slots) { if (j.second.pg && !j.second.pg->is_deleted()) { @@ -3858,22 +3889,41 @@ void OSD::_get_pgids(vector *v) } } -void OSD::_register_pg(PGRef pg) +void OSD::register_pg(PGRef pg) { spg_t pgid = pg->get_pgid(); uint32_t shard_index = pgid.hash_to_shard(num_shards); auto sdata = shards[shard_index]; - Mutex::Locker l(sdata->sdata_lock); + Mutex::Locker l(sdata->sdata_op_ordering_lock); auto& slot = sdata->pg_slots[pgid]; + assert(!slot.pg); + dout(20) << __func__ << " " << pgid << " " << pg << dendl; slot.pg = pg; ++num_pgs; } +void OSD::unregister_pg(PG *pg) +{ + spg_t pgid = pg->get_pgid(); + uint32_t shard_index = pgid.hash_to_shard(num_shards); + auto sdata = shards[shard_index]; + Mutex::Locker l(sdata->sdata_op_ordering_lock); + auto p = sdata->pg_slots.find(pg->pg_id); + if (p != sdata->pg_slots.end() && + p->second.pg) { + dout(20) << __func__ << " " << pg->pg_id << " cleared" << dendl; + p->second.pg.reset(); + --num_pgs; + } else { + dout(20) << __func__ << " " << pg->pg_id << " not found" << dendl; + } +} + PGRef OSD::_lookup_pg(spg_t pgid) { uint32_t shard_index = pgid.hash_to_shard(num_shards); auto sdata = shards[shard_index]; - Mutex::Locker l(sdata->sdata_lock); + Mutex::Locker l(sdata->sdata_op_ordering_lock); auto p = sdata->pg_slots.find(pgid); if (p == sdata->pg_slots.end()) { return nullptr; @@ -3970,31 +4020,28 @@ void OSD::load_pgs() if (pg->dne()) { dout(10) << "load_pgs " << *it << " deleting dne" << dendl; pg->ch = nullptr; - service.pg_remove_epoch(pg->pg_id); pg->unlock(); - { - // Delete pg - RWLock::WLocker l(pg_map_lock); - auto p = pg_map.find(pg->get_pgid()); - assert(p != pg_map.end() && p->second == pg); - dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl; - pg_map.erase(p); - pg->put("PGMap"); - } + unregister_pg(pg.get()); recursive_remove_collection(cct, store, pgid, *it); continue; } set new_children; - service.init_splits_between(pg->pg_id, pg->get_osdmap(), osdmap, &new_children); - op_shardedwq.prime_splits(new_children); + service.identify_split_children(pg->get_osdmap(), osdmap, pg->pg_id, + &new_children); + if (!new_children.empty()) { + for (auto shard : shards) { + shard->prime_splits(osdmap, &new_children); + } + assert(new_children.empty()); + } pg->reg_next_scrub(); dout(10) << __func__ << " loaded " << *pg << dendl; pg->unlock(); - _register_pg(pg); + register_pg(pg); ++num; } dout(0) << __func__ << " opened " << num << " pgs" << dendl; @@ -4036,18 +4083,6 @@ PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info) PGRef pg = _open_pg(createmap, pgid); - // We need to avoid racing with consume_map(). This should get - // redone as a structured waterfall of incoming osdmaps from osd -> - // shard, and something here that gets the to-be-split pg slots - // primed, even when they land on other shards. (I think it will be - // iterative to handle the case where it races with newer incoming - // maps.) For now, just cross our fingers. FIXME - { - set new_children; - service.init_splits_between(pg->pg_id, createmap, osdmap, &new_children); - op_shardedwq.prime_splits(new_children); - } - pg->lock(true); // we are holding the shard lock @@ -7823,15 +7858,16 @@ void OSD::_finish_splits(set& pgs) epoch_t e = pg->get_osdmap_epoch(); pg->unlock(); - service.complete_split(pg->get_pgid()); service.pg_add_epoch(pg->pg_id, e); pg->lock(); pg->handle_initialize(&rctx); pg->queue_null(e, e); dispatch_context_transaction(rctx, pg); - op_shardedwq.wake_pg_split_waiters(pg->get_pgid()); pg->unlock(); + + unsigned shard_index = pg->pg_id.hash_to_shard(num_shards); + shards[shard_index]->register_and_wake_split_child(pg); } dispatch_context(rctx, 0, service.get_osdmap()); @@ -7873,7 +7909,6 @@ void OSD::advance_pg( lastmap->get_pg_num(pg->pg_id.pool()), nextmap->get_pg_num(pg->pg_id.pool()), &children)) { - service.mark_split_in_progress(pg->pg_id, children); split_pgs( pg, children, &new_pgs, lastmap, nextmap, rctx); @@ -7906,17 +7941,25 @@ void OSD::consume_map() int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0; - // scan pg's - set new_children; + unsigned pushes_to_free = 0; + set newly_split; + for (auto& shard : shards) { + shard->consume_map(osdmap, &pushes_to_free, &newly_split); + } + if (!newly_split.empty()) { + for (auto& shard : shards) { + shard->prime_splits(osdmap, &newly_split); + } + assert(newly_split.empty()); + } + + vector pgids; + _get_pgids(&pgids); + + // count (FIXME) vector pgs; _get_pgs(&pgs); - vector pgids; - pgids.reserve(pgs.size()); for (auto& pg : pgs) { - pgids.push_back(pg->get_pgid()); - service.init_splits_between(pg->get_pgid(), service.get_osdmap(), osdmap, - &new_children); - // FIXME: this is lockless and racy, but we don't want to take pg lock // here. if (pg->is_primary()) @@ -7926,6 +7969,7 @@ void OSD::consume_map() else num_pg_stray++; } + { // FIXME: move to OSDShard [[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock); @@ -7939,9 +7983,6 @@ void OSD::consume_map() } } - service.expand_pg_num(service.get_osdmap(), osdmap, &new_children); - op_shardedwq.prime_splits(new_children); - service.pre_publish_map(osdmap); service.await_reserved_maps(); service.publish_map(osdmap); @@ -7952,12 +7993,7 @@ void OSD::consume_map() service.maybe_inject_dispatch_delay(); - // remove any PGs which we no longer host from the pg_slot wait lists - dout(20) << __func__ << " checking pg_slot waiters" << dendl; - op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami); - - service.maybe_inject_dispatch_delay(); - + // queue null events to push maps down to individual PGs for (auto pgid : pgids) { enqueue_peering_evt( pgid, @@ -8131,11 +8167,8 @@ void OSD::split_pgs( OSDMapRef nextmap, PG::RecoveryCtx *rctx) { - unsigned pg_num = nextmap->get_pg_num( - parent->pg_id.pool()); - parent->update_snap_mapper_bits( - parent->get_pgid().get_split_bits(pg_num) - ); + unsigned pg_num = nextmap->get_pg_num(parent->pg_id.pool()); + parent->update_snap_mapper_bits(parent->get_pgid().get_split_bits(pg_num)); vector updated_stats; parent->start_split_stats(childpgids, &updated_stats); @@ -8145,8 +8178,7 @@ void OSD::split_pgs( i != childpgids.end(); ++i, ++stat_iter) { assert(stat_iter != updated_stats.end()); - dout(10) << "Splitting " << *parent << " into " << *i << dendl; - assert(service.splitting(*i)); + dout(10) << __func__ << " splitting " << *parent << " into " << *i << dendl; PG* child = _make_pg(nextmap, *i); child->lock(true); out_pgs->insert(child); @@ -8988,7 +9020,7 @@ void OSD::dequeue_peering_evt( ThreadPool::TPHandle& handle) { PG::RecoveryCtx rctx = create_context(); - auto curmap = service.get_osdmap(); + auto curmap = sdata->osdmap; epoch_t need_up_thru = 0, same_interval_since = 0; if (!pg) { if (const MQuery *q = dynamic_cast(evt->evt.get())) { @@ -9005,12 +9037,7 @@ void OSD::dequeue_peering_evt( dispatch_context_transaction(rctx, pg, &handle); need_up_thru = pg->get_need_up_thru(); same_interval_since = pg->get_same_interval_since(); - bool deleted = pg->is_deleted(); pg->unlock(); - - if (deleted) { -#warning hmm? - } } if (need_up_thru) { @@ -9402,13 +9429,94 @@ int OSD::init_op_flags(OpRequestRef& op) // ============================================================= #undef dout_context -#define dout_context osd->cct +#define dout_context cct #undef dout_prefix -#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq " +#define dout_prefix *_dout << "osd." << osd->get_nodeid() << ":" << shard_id << "." << __func__ << " " + +void OSDShard::consume_map( + OSDMapRef& new_osdmap, + unsigned *pushes_to_free, + set *new_children) +{ + Mutex::Locker l(sdata_op_ordering_lock); + OSDMapRef old_osdmap = std::move(osdmap); + osdmap = new_osdmap; + dout(10) << new_osdmap->get_epoch() + << " (was " << (old_osdmap ? old_osdmap->get_epoch() : 0) << ")" + << dendl; + bool queued = false; + + // check slots + auto p = pg_slots.begin(); + while (p != pg_slots.end()) { + OSDShard::pg_slot& slot = p->second; + const spg_t& pgid = p->first; + dout(20) << __func__ << " " << pgid << dendl; + if (old_osdmap && + (slot->pg || slot->waiting_for_split)) { + // only prime children for parent slots that are attached to a + // pg or are waiting_for_split (because their ancestor is + // attached to a pg). + osd->service.identify_split_children(old_osdmap, new_osdmap, pgid, + new_children); + } + if (slot.waiting_for_split) { + dout(20) << __func__ << " " << pgid + << " waiting for split" << dendl; + ++p; + continue; + } + if (!slot.waiting_peering.empty()) { + epoch_t first = slot.waiting_peering.begin()->first; + if (first <= osdmap->get_epoch()) { + dout(20) << __func__ << " " << pgid + << " pending_peering first epoch " << first + << " <= " << osdmap->get_epoch() << ", requeueing" << dendl; + _wake_pg_slot(pgid, slot); + queued = true; + } + ++p; + continue; + } + if (!slot.waiting.empty()) { + if (osdmap->is_up_acting_osd_shard(pgid, osd->get_nodeid())) { + dout(20) << __func__ << " " << pgid << " maps to us, keeping" + << dendl; + ++p; + continue; + } + while (!slot.waiting.empty() && + slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) { + auto& qi = slot.waiting.front(); + dout(20) << __func__ << " " << pgid + << " waiting item " << qi + << " epoch " << qi.get_map_epoch() + << " <= " << osdmap->get_epoch() + << ", stale, dropping" << dendl; + *pushes_to_free += qi.get_reserved_pushes(); + slot.waiting.pop_front(); + } + if (slot.waiting.empty() && + slot.num_running == 0 && + !slot.pg) { + dout(20) << __func__ << " " << pgid << " empty, pruning" << dendl; + p = pg_slots.erase(p); + --osd->num_pgs; + continue; + } + } + ++p; + } + _prime_splits(new_children); + if (queued) { + sdata_lock.Lock(); + sdata_cond.SignalOne(); + sdata_lock.Unlock(); + } +} -void OSD::ShardedOpWQ::_wake_pg_slot( +void OSDShard::_wake_pg_slot( spg_t pgid, - OSDShard *sdata, OSDShard::pg_slot& slot) { dout(20) << __func__ << " " << pgid @@ -9418,13 +9526,13 @@ void OSD::ShardedOpWQ::_wake_pg_slot( for (auto i = slot.to_process.rbegin(); i != slot.to_process.rend(); ++i) { - sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff); + _enqueue_front(std::move(*i), osd->op_prio_cutoff); } slot.to_process.clear(); for (auto i = slot.waiting.rbegin(); i != slot.waiting.rend(); ++i) { - sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff); + _enqueue_front(std::move(*i), osd->op_prio_cutoff); } slot.waiting.clear(); for (auto i = slot.waiting_peering.rbegin(); @@ -9433,7 +9541,7 @@ void OSD::ShardedOpWQ::_wake_pg_slot( // this is overkill; we requeue everything, even if some of these items are // waiting for maps we don't have yet. FIXME. for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) { - sdata->_enqueue_front(std::move(*j), osd->op_prio_cutoff); + _enqueue_front(std::move(*j), osd->op_prio_cutoff); } } slot.waiting_peering.clear(); @@ -9441,119 +9549,121 @@ void OSD::ShardedOpWQ::_wake_pg_slot( ++slot.requeue_seq; } -void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid) +void OSDShard::prime_splits(OSDMapRef as_of_osdmap, set *pgids) { - uint32_t shard_index = pgid.hash_to_shard(osd->shards.size()); - auto sdata = osd->shards[shard_index]; - bool queued = false; - { - Mutex::Locker l(sdata->sdata_op_ordering_lock); - auto p = sdata->pg_slots.find(pgid); - if (p != sdata->pg_slots.end()) { - _wake_pg_slot(pgid, sdata, p->second); - queued = true; + Mutex::Locker l(sdata_op_ordering_lock); + _prime_splits(pgids); + if (osdmap->get_epoch() > as_of_osdmap->get_epoch()) { + set newer_children; + for (auto pgid : *pgids) { + osd->service.identify_split_children(as_of_osdmap, osdmap, pgid, + &newer_children); + } + newer_children.insert(pgids->begin(), pgids->end()); + dout(10) << "as_of_osdmap " << as_of_osdmap->get_epoch() << " < shard " + << osdmap->get_epoch() << ", new children " << newer_children + << dendl; + _prime_splits(&newer_children); + // note: we don't care what is left over here for other shards. + // if this shard is ahead of us and one isn't, e.g., one thread is + // calling into prime_splits via _process (due to a newly created + // pg) and this shard has a newer map due to a racing consume_map, + // then any grandchildren left here will be identified (or were + // identified) when the slower shard's osdmap is advanced. + // _prime_splits() will tolerate the case where the pgid is + // already primed. + } +} + +void OSDShard::_prime_splits(set *pgids) +{ + dout(10) << *pgids << dendl; + auto p = pgids->begin(); + while (p != pgids->end()) { + unsigned shard_index = p->hash_to_shard(osd->num_shards); + if (shard_index == shard_id) { + auto i = pg_slots.find(*p); + if (i == pg_slots.end()) { + dout(10) << "priming slot " << *p << dendl; + OSDShard::pg_slot& slot = pg_slots[*p]; + slot.waiting_for_split = true; + } else { + auto q = pg_slots.find(*p); + assert(q != pg_slots.end()); + if (q->second->waiting_for_split) { + dout(10) << "slot " << *p << " already primed" << dendl; + } else { + dout(10) << "priming (existing) slot " << *p << dendl; + q->second->waiting_for_split = true; + } + } + p = pgids->erase(p); + } else { + ++p; } - } - if (queued) { - sdata->sdata_lock.Lock(); - sdata->sdata_cond.SignalOne(); - sdata->sdata_lock.Unlock(); } } -void OSD::ShardedOpWQ::prime_splits(const set& pgs) +void OSDShard::register_and_wake_split_child(PG *pg) { - dout(20) << __func__ << " " << pgs << dendl; - for (auto pgid : pgs) { - unsigned shard_index = pgid.hash_to_shard(osd->shards.size()); - OSDShard* sdata = osd->shards[shard_index]; - Mutex::Locker l(sdata->sdata_op_ordering_lock); - OSDShard::pg_slot& slot = sdata->pg_slots[pgid]; - slot.waiting_for_split = true; + { + Mutex::Locker l(sdata_op_ordering_lock); + dout(10) << pg->pg_id << " " << pg << dendl; + auto p = pg_slots.find(pg->pg_id); + assert(p != pg_slots.end()); + auto& slot = p->second; + assert(!slot.pg); + assert(slot.waiting_for_split); + slot.pg = pg; + ++osd->num_pgs; + _wake_pg_slot(pg->pg_id, slot); } + sdata_lock.Lock(); + sdata_cond.SignalOne(); + sdata_lock.Unlock(); } -void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami) +void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num) { - unsigned pushes_to_free = 0; - bool queued = false; - for (auto sdata : osd->shards) { - Mutex::Locker l(sdata->sdata_op_ordering_lock); - sdata->osdmap = osdmap; - auto p = sdata->pg_slots.begin(); - while (p != sdata->pg_slots.end()) { - OSDShard::pg_slot& slot = p->second; - if (slot.waiting_for_split) { - dout(20) << __func__ << " " << p->first - << " waiting for split" << dendl; - ++p; - continue; - } - if (!slot.waiting_peering.empty()) { - epoch_t first = slot.waiting_peering.begin()->first; - if (first <= osdmap->get_epoch()) { - dout(20) << __func__ << " " << p->first - << " pending_peering first epoch " << first - << " <= " << osdmap->get_epoch() << ", requeueing" << dendl; - _wake_pg_slot(p->first, sdata, slot); - queued = true; - } - ++p; - continue; - } - if (!slot.waiting.empty()) { - if (osdmap->is_up_acting_osd_shard(p->first, whoami)) { - dout(20) << __func__ << " " << p->first << " maps to us, keeping" - << dendl; - ++p; - continue; - } - while (!slot.waiting.empty() && - slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) { - auto& qi = slot.waiting.front(); - dout(20) << __func__ << " " << p->first - << " waiting item " << qi - << " epoch " << qi.get_map_epoch() - << " <= " << osdmap->get_epoch() - << ", stale, dropping" << dendl; - pushes_to_free += qi.get_reserved_pushes(); - slot.waiting.pop_front(); - } - if (slot.waiting.empty() && - slot.num_running == 0 && - !slot.pg) { - dout(20) << __func__ << " " << p->first << " empty, pruning" << dendl; - p = sdata->pg_slots.erase(p); - --osd->num_pgs; - continue; - } - } - ++p; - } - if (queued) { - sdata->sdata_lock.Lock(); - sdata->sdata_cond.SignalOne(); - sdata->sdata_lock.Unlock(); + Mutex::Locker l(sdata_op_ordering_lock); + vector to_delete; + for (auto& i : pg_slots) { + if (i.first != parent && + i.first.get_ancestor(old_pg_num) == parent) { + dout(10) << __func__ << " parent " << parent << " clearing " << i.first + << dendl; + to_delete.push_back(i.first); } } - if (pushes_to_free > 0) { - osd->service.release_reserved_pushes(pushes_to_free); + for (auto pgid : to_delete) { + pg_slots.erase(pgid); } } -void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg) +// ============================================================= + +#undef dout_context +#define dout_context osd->cct +#undef dout_prefix +#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq " + +void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid) { - spg_t pgid = pg->get_pgid(); uint32_t shard_index = pgid.hash_to_shard(osd->shards.size()); auto sdata = osd->shards[shard_index]; - Mutex::Locker l(sdata->sdata_op_ordering_lock); - auto p = sdata->pg_slots.find(pgid); - if (p != sdata->pg_slots.end()) { - auto& slot = p->second; - assert(!slot.pg || slot.pg == pg); - dout(20) << __func__ << " " << pgid << " pg " << pg << " cleared" << dendl; - slot.pg = nullptr; - --osd->num_pgs; + bool queued = false; + { + Mutex::Locker l(sdata->sdata_op_ordering_lock); + auto p = sdata->pg_slots.find(pgid); + if (p != sdata->pg_slots.end()) { + sdata->_wake_pg_slot(pgid, p->second); + queued = true; + } + } + if (queued) { + sdata->sdata_lock.Lock(); + sdata->sdata_cond.SignalOne(); + sdata->sdata_lock.Unlock(); } } @@ -9672,7 +9782,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) --slot.num_running; if (slot.to_process.empty()) { - // raced with wake_pg_waiters or prune_or_wake_pg_waiters + // raced with wake_pg_waiters or consume_map dout(20) << __func__ << " " << token << " nothing queued" << dendl; if (pg) { @@ -9705,10 +9815,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) slot.to_process.pop_front(); dout(20) << __func__ << " " << qi << " pg " << pg << dendl; unsigned pushes_to_free = 0; + set new_children; + OSDMapRef osdmap; while (!pg) { // should this pg shard exist on this osd in this (or a later) epoch? - OSDMapRef osdmap = sdata->osdmap; + osdmap = sdata->osdmap; const PGCreateInfo *create_info = qi.creates_pg(); if (slot.waiting_for_split) { dout(20) << __func__ << " " << token @@ -9739,7 +9851,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) // we created the pg! drop out and continue "normally"! slot.pg = pg; // install in shard slot ++osd->num_pgs; - _wake_pg_slot(token, sdata, slot); + sdata->_wake_pg_slot(token, slot); + + // identify split children between create epoch and shard epoch. + osd->service.identify_split_children( + pg->get_osdmap(), osdmap, pg->pg_id, &new_children); + sdata->_prime_splits(&new_children); + // distribute remaining split children to other shards below! break; } dout(20) << __func__ << " ignored create on " << qi << dendl; @@ -9796,6 +9914,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) } sdata->sdata_op_ordering_lock.Unlock(); + if (!new_children.empty()) { + for (auto shard : osd->shards) { + shard->prime_splits(osdmap, &new_children); + } + assert(new_children.empty()); + } if (pushes_to_free) { osd->service.release_reserved_pushes(pushes_to_free); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index b1742dae1f72..6722910b410a 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -799,7 +799,7 @@ public: void queue_for_snap_trim(PG *pg); void queue_for_scrub(PG *pg, bool with_high_priority); void queue_for_pg_delete(spg_t pgid, epoch_t e); - void finish_pg_delete(PG *pg); + void finish_pg_delete(PG *pg, unsigned old_pg_num); private: // -- pg recovery and associated throttling -- @@ -922,6 +922,13 @@ public: return get_deleted_pool_pg_num(pool); } + /// identify split child pgids over a osdmap interval + void identify_split_children( + OSDMapRef old_map, + OSDMapRef new_map, + spg_t pgid, + set *new_children); + void need_heartbeat_peer_update(); void init(); @@ -1115,6 +1122,9 @@ enum class io_queue { }; struct OSDShard { + const unsigned shard_id; + CephContext *cct; + OSD *osd; Mutex sdata_lock; Cond sdata_cond; @@ -1142,7 +1152,7 @@ struct OSDShard { /// map of slots for each spg_t. maintains ordering of items dequeued /// from pqueue while _process thread drops shard lock to acquire the - /// pg lock. slots are removed only by prune_or_wake_pg_waiters. + /// pg lock. slots are removed by consume_map. unordered_map pg_slots; /// priority queue @@ -1163,11 +1173,30 @@ struct OSDShard { priority, cost, std::move(item)); } + /// push osdmap into shard + void consume_map( + OSDMapRef& osdmap, + unsigned *pushes_to_free, + set *new_children); + + void _wake_pg_slot(spg_t pgid, OSDShard::pg_slot& slot); + + void _prime_splits(set *pgids); + void prime_splits(OSDMapRef as_of_osdmap, set *pgids); + void register_and_wake_split_child(PG *pg); + void unprime_split_children(spg_t parent, unsigned old_pg_num); + OSDShard( + int id, + CephContext *cct, + OSD *osd, string lock_name, string ordering_lock, - uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct, + uint64_t max_tok_per_prio, uint64_t min_cost, io_queue opqueue) - : sdata_lock(lock_name.c_str(), false, true, false, cct), + : shard_id(id), + cct(cct), + osd(osd), + sdata_lock(lock_name.c_str(), false, true, false, cct), sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) { if (opqueue == io_queue::weightedpriority) { @@ -1610,7 +1639,9 @@ private: friend std::ostream& operator<<(std::ostream& out, const io_queue& q); const io_queue op_queue; +public: const unsigned int op_prio_cutoff; +protected: /* * The ordered op delivery chain is: @@ -1659,17 +1690,6 @@ private: /// wake any pg waiters after a PG is split void wake_pg_split_waiters(spg_t pgid); - void _wake_pg_slot(spg_t pgid, OSDShard *sdata, OSDShard::pg_slot& slot); - - /// prime slots for splitting pgs - void prime_splits(const set& pgs); - - /// prune ops (and possibly pg_slots) for pgs that shouldn't be here - void prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami); - - /// clear cached PGRef on pg deletion - void clear_pg_pointer(PG *pg); - /// clear pg_slots on shutdown void clear_pg_slots(); @@ -1839,10 +1859,10 @@ public: vector shards; uint32_t num_shards = 0; -protected: // -- placement groups -- std::atomic num_pgs = {0}; +protected: std::mutex pending_creates_lock; using create_from_osd_t = std::pair; std::set pending_creates_from_osd; @@ -1852,7 +1872,8 @@ protected: PGRef _lookup_pg(spg_t pgid); PG *_lookup_lock_pg(spg_t pgid); - void _register_pg(PGRef pg); + void register_pg(PGRef pg); + void unregister_pg(PG *pg); void _get_pgs(vector *v, bool clear_too=false); void _get_pgids(vector *v); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 26edecd66429..de88e9ef5882 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -6543,7 +6543,7 @@ void PG::_delete_some() osd->meta_ch, std::move(t)); assert(r == 0); - osd->finish_pg_delete(this); + osd->finish_pg_delete(this, pool.info.get_pg_num()); deleted = true; // cancel reserver here, since the PG is about to get deleted and the diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 49304b6600a2..0a41f5eb1b73 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -520,6 +520,11 @@ struct spg_t { bool parse(const std::string& s) { return parse(s.c_str()); } + + spg_t get_ancestor(unsigned old_pg_num) const { + return spg_t(pgid.get_ancestor(old_pg_num), shard); + } + bool is_split(unsigned old_pg_num, unsigned new_pg_num, set *pchildren) const { set _children;