From aca0dc1caa5323a3f98050f00c3fdcf5f167582c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 6 Apr 2018 10:26:52 -0500 Subject: [PATCH] osd: implement pg merge - Vevamps the split tracking infrastructure, and adds new tracking for upcoming merges in consume_map. These are now unified into the same identify_ method. these consume the new pg_num change tracking instructure we just added in the prior commit. - PGs that are about to merge have a new wait infrastructure, since all sources and the target have to reach the target epoch before the merge can happen. - If one of the sources for a merge does not exist, we create an empty dummy PG to merge with. The implies that the resulting merged PG will be incomplete (and mostly useless), but it unifies the code paths. - The actual merge (PG::merge_from) happens in advance_pg(). Fixes: http://tracker.ceph.com/issues/85 Signed-off-by: Sage Weil --- src/osd/OSD.cc | 461 +++++++++++++++++++++++++++++++++++++++++-------- src/osd/OSD.h | 34 +++- src/osd/PG.h | 4 + 3 files changed, 418 insertions(+), 81 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 360c1617c2914..073c90c66e08a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -332,34 +332,104 @@ void OSDService::dump_live_pgids() -void OSDService::identify_split_children( +void OSDService::identify_splits_and_merges( OSDMapRef old_map, OSDMapRef new_map, spg_t pgid, - set *new_children) + set> *split_children, + set> *merge_pgs) { if (!old_map->have_pg_pool(pgid.pool())) { return; } int old_pgnum = old_map->get_pg_num(pgid.pool()); - int new_pgnum = get_possibly_deleted_pool_pg_num( - new_map, pgid.pool()); - dout(20) << __func__ << " old " << old_pgnum << " e" << old_map->get_epoch() - << " new " << new_pgnum << " e" << new_map->get_epoch() - << dendl; - if (pgid.ps() < static_cast(old_pgnum)) { - set children; - if (pgid.is_split(old_pgnum, new_pgnum, &children)) { - dout(20) << __func__ << " " << pgid << " children " << children << dendl; - new_children->insert(children.begin(), children.end()); + auto p = osd->pg_num_history.pg_nums.find(pgid.pool()); + if (p == osd->pg_num_history.pg_nums.end()) { + return; + } + dout(20) << __func__ << " " << pgid << " e" << old_map->get_epoch() + << " to e" << new_map->get_epoch() + << " pg_nums " << p->second << dendl; + deque queue; + queue.push_back(pgid); + while (!queue.empty()) { + auto cur = queue.front(); + queue.pop_front(); + unsigned pgnum = old_pgnum; + for (auto q = p->second.lower_bound(old_map->get_epoch()); + q != p->second.end() && + q->first <= new_map->get_epoch(); + ++q) { + derr << __func__ << " q " << q->first + << " pgnum " << pgnum << " -> " << q->second << dendl; + if (pgnum < q->second) { + // split? + if (cur.ps() < pgnum) { + set children; + if (cur.is_split(pgnum, q->second, &children)) { + dout(20) << __func__ << " " << cur << " e" << q->first + << " pg_num " << pgnum << " -> " << q->second + << " children " << children << dendl; + for (auto i : children) { + split_children->insert(make_pair(i, q->first)); + queue.push_back(i); + } + } + } else if (cur.ps() < q->second) { + dout(20) << __func__ << " " << cur << " e" << q->first + << " pg_num " << pgnum << " -> " << q->second + << " is a child" << dendl; + // normally we'd capture this from the parent, but it's + // possible the parent doesn't exist yet (it will be + // fabricated to allow an intervening merge). note this PG + // as a split child here to be sure we catch it. + split_children->insert(make_pair(cur, q->first)); + } else { + dout(20) << __func__ << " " << cur << " e" << q->first + << " pg_num " << pgnum << " -> " << q->second + << " is post-split, skipping" << dendl; + } + } else if (merge_pgs) { + // merge? + if (cur.ps() >= q->second) { + if (cur.ps() < pgnum) { + spg_t parent; + if (cur.is_merge_source(pgnum, q->second, &parent)) { + set children; + parent.is_split(q->second, pgnum, &children); + dout(20) << __func__ << " " << cur << " e" << q->first + << " pg_num " << pgnum << " -> " << q->second + << " is merge source, target " << parent + << ", source(s) " << children << dendl; + merge_pgs->insert(make_pair(cur, q->first)); + merge_pgs->insert(make_pair(parent, q->first)); + for (auto c : children) { + merge_pgs->insert(make_pair(c, q->first)); + } + } + } else { + dout(20) << __func__ << " " << cur << " e" << q->first + << " pg_num " << pgnum << " -> " << q->second + << " is beyond old pgnum, skipping" << dendl; + } + } else { + set children; + if (cur.is_split(q->second, pgnum, &children)) { + dout(20) << __func__ << " " << cur << " e" << q->first + << " pg_num " << pgnum << " -> " << q->second + << " is merge target, source " << children << dendl; + for (auto c : children) { + merge_pgs->insert(make_pair(c, q->first)); + } + merge_pgs->insert(make_pair(cur, q->first)); + } + } + } + pgnum = q->second; } - } else if (pgid.ps() >= static_cast(new_pgnum)) { - dout(20) << __func__ << " " << pgid << " is post-split, skipping" << dendl; } } - - void OSDService::need_heartbeat_peer_update() { osd->need_heartbeat_peer_update(); @@ -2678,6 +2748,31 @@ int OSD::init() service.publish_superblock(superblock); service.max_oldest_map = superblock.oldest_map; + for (auto& shard : shards) { + for (auto& i : shard->pg_slots) { + PGRef pg = i.second->pg; + + pg->lock(); + set> new_children; + set> merge_pgs; + service.identify_splits_and_merges(pg->get_osdmap(), osdmap, pg->pg_id, + &new_children, &merge_pgs); + if (!new_children.empty()) { + for (auto shard : shards) { + shard->prime_splits(osdmap, &new_children); + } + assert(new_children.empty()); + } + if (!merge_pgs.empty()) { + for (auto shard : shards) { + shard->prime_merges(osdmap, &merge_pgs); + } + assert(merge_pgs.empty()); + } + pg->unlock(); + } + } + osd_op_tp.start(); command_tp.start(); @@ -3971,16 +4066,6 @@ void OSD::load_pgs() continue; } - set new_children; - service.identify_split_children(pg->get_osdmap(), osdmap, pg->pg_id, - &new_children); - if (!new_children.empty()) { - for (auto shard : shards) { - shard->prime_splits(osdmap, &new_children); - } - ceph_assert(new_children.empty()); - } - pg->reg_next_scrub(); dout(10) << __func__ << " loaded " << *pg << dendl; @@ -7974,8 +8059,21 @@ void OSD::_finish_splits(set& pgs) dispatch_context(rctx, 0, service.get_osdmap()); }; +bool OSD::add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef src, + unsigned need) +{ + Mutex::Locker l(merge_lock); + auto& p = merge_waiters[nextmap->get_epoch()][target]; + p[src->pg_id] = src; + dout(10) << __func__ << " added merge_waiter " << src->pg_id + << " for " << target << ", have " << p.size() << "/" << need + << dendl; + return p.size() == need; +} + bool OSD::advance_pg( - epoch_t osd_epoch, PG *pg, + epoch_t osd_epoch, + PG *pg, ThreadPool::TPHandle &handle, PG::RecoveryCtx *rctx) { @@ -7986,6 +8084,10 @@ bool OSD::advance_pg( OSDMapRef lastmap = pg->get_osdmap(); ceph_assert(lastmap->get_epoch() < osd_epoch); set new_pgs; // any split children + bool ret = true; + + unsigned old_pg_num = lastmap->have_pg_pool(pg->pg_id.pool()) ? + lastmap->get_pg_num(pg->pg_id.pool()) : 0; for (epoch_t next_epoch = pg->get_osdmap_epoch() + 1; next_epoch <= osd_epoch; ++next_epoch) { @@ -7995,6 +8097,105 @@ bool OSD::advance_pg( continue; } + unsigned new_pg_num = + (old_pg_num && nextmap->have_pg_pool(pg->pg_id.pool())) ? + nextmap->get_pg_num(pg->pg_id.pool()) : 0; + if (old_pg_num && new_pg_num && old_pg_num != new_pg_num) { + // check for merge + if (nextmap->have_pg_pool(pg->pg_id.pool())) { + spg_t parent; + if (pg->pg_id.is_merge_source( + old_pg_num, + new_pg_num, + &parent)) { + // we are merge source + PGRef spg = pg; // carry a ref + dout(1) << __func__ << " " << pg->pg_id + << " is merge source, target is " << parent + << dendl; + pg->write_if_dirty(rctx); + dispatch_context_transaction(*rctx, pg, &handle); + pg->ch->flush(); + pg->on_shutdown(); + OSDShard *sdata = pg->osd_shard; + { + Mutex::Locker l(sdata->shard_lock); + if (pg->pg_slot) { + sdata->_detach_pg(pg->pg_slot); + // update pg count now since we might not get an osdmap + // any time soon. + if (pg->is_primary()) + logger->dec(l_osd_pg_primary); + else if (pg->is_replica()) + logger->dec(l_osd_pg_replica); + else + logger->dec(l_osd_pg_stray); + } + } + pg->unlock(); + + set children; + parent.is_split(new_pg_num, old_pg_num, &children); + if (add_merge_waiter(nextmap, parent, pg, children.size())) { + enqueue_peering_evt( + parent, + PGPeeringEventRef( + std::make_shared( + nextmap->get_epoch(), + nextmap->get_epoch(), + NullEvt()))); + } + ret = false; + goto out; + } else if (pg->pg_id.is_merge_target(old_pg_num, new_pg_num)) { + // we are merge target + set children; + pg->pg_id.is_split(new_pg_num, old_pg_num, &children); + dout(20) << __func__ << " " << pg->pg_id + << " is merge target, sources are " << children + << dendl; + map sources; + { + Mutex::Locker l(merge_lock); + auto& s = merge_waiters[nextmap->get_epoch()][pg->pg_id]; + unsigned need = children.size(); + dout(20) << __func__ << " have " << s.size() << "/" + << need << dendl; + if (s.size() == need) { + sources.swap(s); + merge_waiters[nextmap->get_epoch()].erase(pg->pg_id); + if (merge_waiters[nextmap->get_epoch()].empty()) { + merge_waiters.erase(nextmap->get_epoch()); + } + } + } + if (!sources.empty()) { + unsigned new_pg_num = nextmap->get_pg_num(pg->pg_id.pool()); + unsigned split_bits = pg->pg_id.get_split_bits(new_pg_num); + dout(1) << __func__ << " merging " << pg->pg_id << dendl; + pg->merge_from(sources, rctx, split_bits); + } else { + dout(20) << __func__ << " not ready to merge yet" << dendl; + pg->write_if_dirty(rctx); + pg->unlock(); + // kick source(s) to get them ready + for (auto& i : sources) { + dout(20) << __func__ << " kicking source " << i.first << dendl; + enqueue_peering_evt( + i.first, + PGPeeringEventRef( + std::make_shared( + nextmap->get_epoch(), + nextmap->get_epoch(), + NullEvt()))); + } + ret = false; + goto out; + } + } + } + } + vector newup, newacting; int up_primary, acting_primary; nextmap->pg_to_up_acting_osds( @@ -8005,29 +8206,31 @@ bool OSD::advance_pg( nextmap, lastmap, newup, up_primary, newacting, acting_primary, rctx); - // Check for split! - set children; - spg_t parent(pg->pg_id); - if (nextmap->have_pg_pool(pg->pg_id.pool()) && - parent.is_split( - lastmap->get_pg_num(pg->pg_id.pool()), - nextmap->get_pg_num(pg->pg_id.pool()), - &children)) { - split_pgs( - pg, children, &new_pgs, lastmap, nextmap, - rctx); + if (new_pg_num && old_pg_num != new_pg_num) { + // check for split + set children; + if (pg->pg_id.is_split( + old_pg_num, + new_pg_num, + &children)) { + split_pgs( + pg, children, &new_pgs, lastmap, nextmap, + rctx); + } } lastmap = nextmap; + old_pg_num = new_pg_num; handle.reset_tp_timeout(); } pg->handle_activate_map(rctx); + ret = true; + out: if (!new_pgs.empty()) { rctx->transaction->register_on_applied(new C_FinishSplits(this, new_pgs)); } - - return true; + return ret; } void OSD::consume_map() @@ -8048,10 +8251,11 @@ void OSD::consume_map() service.await_reserved_maps(); service.publish_map(osdmap); - // prime splits - set newly_split; + // prime splits and merges + set> newly_split; // splits, and when + set> merge_pgs; // merge participants, and when for (auto& shard : shards) { - shard->identify_splits(osdmap, &newly_split); + shard->identify_splits_and_merges(osdmap, &newly_split, &merge_pgs); } if (!newly_split.empty()) { for (auto& shard : shards) { @@ -8060,6 +8264,28 @@ void OSD::consume_map() ceph_assert(newly_split.empty()); } + // prune sent_ready_to_merge + service.prune_sent_ready_to_merge(osdmap); + + // FIXME, maybe: We could race against an incoming peering message + // that instantiates a merge PG after identify_merges() below and + // never set up its peer to complete the merge. An OSD restart + // would clear it up. This is a hard race to resolve, + // extraordinarily rare (we only merge PGs that are stable and + // clean, so it'd have to be an imported PG to an OSD with a + // slightly stale OSDMap...), so I'm ignoring it for now. We plan to + // replace all of this with a seastar-based code soon anyway. + if (!merge_pgs.empty()) { + // mark the pgs we already have, or create new and empty merge + // participants for those we are missing. do this all under the + // shard lock so we don't have to worry about racing pg creates + // via _process. + for (auto& shard : shards) { + shard->prime_merges(osdmap, &merge_pgs); + } + ceph_assert(merge_pgs.empty()); + } + unsigned pushes_to_free = 0; for (auto& shard : shards) { shard->consume_map(osdmap, &pushes_to_free); @@ -9626,9 +9852,16 @@ void OSDShard::consume_map( OSDShardPGSlot *slot = p->second.get(); const spg_t& pgid = p->first; dout(20) << __func__ << " " << pgid << dendl; - if (slot->waiting_for_split) { + if (!slot->waiting_for_split.empty()) { dout(20) << __func__ << " " << pgid - << " waiting for split" << dendl; + << " waiting for split " << slot->waiting_for_split << dendl; + ++p; + continue; + } + if (slot->waiting_for_merge_epoch > new_osdmap->get_epoch()) { + dout(20) << __func__ << " " << pgid + << " waiting for merge by epoch " << slot->waiting_for_merge_epoch + << dendl; ++p; continue; } @@ -9668,6 +9901,7 @@ void OSDShard::consume_map( } if (slot->waiting.empty() && slot->num_running == 0 && + slot->waiting_for_split.empty() && !slot->pg) { dout(20) << __func__ << " " << pgid << " empty, pruning" << dendl; p = pg_slots.erase(p); @@ -9714,37 +9948,47 @@ void OSDShard::_wake_pg_slot( } } slot->waiting_peering.clear(); - slot->waiting_for_split = false; ++slot->requeue_seq; } -void OSDShard::identify_splits(const OSDMapRef& as_of_osdmap, set *pgids) +void OSDShard::identify_splits_and_merges( + const OSDMapRef& as_of_osdmap, + set> *split_pgs, + set> *merge_pgs) { Mutex::Locker l(shard_lock); if (shard_osdmap) { for (auto& i : pg_slots) { const spg_t& pgid = i.first; auto *slot = i.second.get(); - if (slot->pg || slot->waiting_for_split) { - osd->service.identify_split_children(shard_osdmap, as_of_osdmap, pgid, - pgids); + if (slot->pg) { + osd->service.identify_splits_and_merges( + shard_osdmap, as_of_osdmap, pgid, + split_pgs, merge_pgs); + } else if (!slot->waiting_for_split.empty()) { + osd->service.identify_splits_and_merges( + shard_osdmap, as_of_osdmap, pgid, + split_pgs, nullptr); } else { dout(20) << __func__ << " slot " << pgid - << " has no pg and !waiting_for_split" << dendl; + << " has no pg and waiting_for_split " + << slot->waiting_for_split << dendl; } } } } -void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap, set *pgids) +void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap, + set> *pgids) { Mutex::Locker l(shard_lock); _prime_splits(pgids); if (shard_osdmap->get_epoch() > as_of_osdmap->get_epoch()) { - set newer_children; - for (auto pgid : *pgids) { - osd->service.identify_split_children(as_of_osdmap, shard_osdmap, pgid, - &newer_children); + set> newer_children; + for (auto i : *pgids) { + osd->service.identify_splits_and_merges( + as_of_osdmap, shard_osdmap, i.first, + &newer_children, nullptr); } newer_children.insert(pgids->begin(), pgids->end()); dout(10) << "as_of_osdmap " << as_of_osdmap->get_epoch() << " < shard " @@ -9762,27 +10006,24 @@ void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap, set *pgids) } } -void OSDShard::_prime_splits(set *pgids) +void OSDShard::_prime_splits(set> *pgids) { dout(10) << *pgids << dendl; auto p = pgids->begin(); while (p != pgids->end()) { - unsigned shard_index = p->hash_to_shard(osd->num_shards); + unsigned shard_index = p->first.hash_to_shard(osd->num_shards); if (shard_index == shard_id) { - auto r = pg_slots.emplace(*p, nullptr); + auto r = pg_slots.emplace(p->first, nullptr); if (r.second) { - dout(10) << "priming slot " << *p << dendl; + dout(10) << "priming slot " << p->first << " e" << p->second << dendl; r.first->second = make_unique(); - r.first->second->waiting_for_split = true; + r.first->second->waiting_for_split.insert(p->second); } else { auto q = r.first; ceph_assert(q != pg_slots.end()); - if (q->second->waiting_for_split) { - dout(10) << "slot " << *p << " already primed" << dendl; - } else { - dout(10) << "priming (existing) slot " << *p << dendl; - q->second->waiting_for_split = true; - } + dout(10) << "priming (existing) slot " << p->first << " e" << p->second + << dendl; + q->second->waiting_for_split.insert(p->second); } p = pgids->erase(p); } else { @@ -9791,6 +10032,66 @@ void OSDShard::_prime_splits(set *pgids) } } +void OSDShard::prime_merges(const OSDMapRef& as_of_osdmap, + set> *merge_pgs) +{ + Mutex::Locker l(shard_lock); + dout(20) << __func__ << " checking shard " << shard_id + << " for remaining merge pgs " << merge_pgs << dendl; + auto p = merge_pgs->begin(); + while (p != merge_pgs->end()) { + spg_t pgid = p->first; + epoch_t epoch = p->second; + unsigned shard_index = pgid.hash_to_shard(osd->num_shards); + if (shard_index != shard_id) { + ++p; + continue; + } + OSDShardPGSlot *slot; + auto r = pg_slots.emplace(pgid, nullptr); + if (r.second) { + r.first->second = make_unique(); + } + slot = r.first->second.get(); + if (slot->pg) { + // already have pg + dout(20) << __func__ << " have merge target pg " << pgid + << " " << slot->pg << dendl; + } else if (!slot->waiting_for_split.empty() && + *slot->waiting_for_split.begin() < epoch) { + dout(20) << __func__ << " pending split on merge target pg " << pgid + << " " << slot->waiting_for_split << dendl; + } else { + dout(20) << __func__ << " creating empty merge participant " << pgid + << " for merge in " << epoch << dendl; + // Construct a history with a single previous interval, + // going back to the epoch *before* pg_num_pending was + // adjusted (since we are creating the PG as it would have + // existed just before the merge). We know that the PG was + // clean as of that epoch or else pg_num_pending wouldn't + // have been adjusted. + pg_history_t history; + history.same_interval_since = epoch - 1; + // leave these zeroed since we do not know the precist + // last_epoch_started value that the real PG instances have. If + // we are greater than they are, we will trigger an 'incomplete' + // state (see choose_acting). + history.last_epoch_started = 0; + history.last_epoch_clean = 0; + PGCreateInfo cinfo(pgid, epoch - 1, + history, PastIntervals(), false); + PGRef pg = osd->handle_pg_create_info(shard_osdmap, &cinfo); + _attach_pg(r.first->second.get(), pg.get()); + _wake_pg_slot(pgid, slot); + pg->unlock(); + } + // mark slot for merge + dout(20) << __func__ << " marking merge participant " << pgid << dendl; + slot->waiting_for_merge_epoch = epoch; + p = merge_pgs->erase(p); + } +} + void OSDShard::register_and_wake_split_child(PG *pg) { { @@ -9799,10 +10100,21 @@ void OSDShard::register_and_wake_split_child(PG *pg) auto p = pg_slots.find(pg->pg_id); ceph_assert(p != pg_slots.end()); auto *slot = p->second.get(); + dout(20) << pg->pg_id << " waiting_for_split " << slot->waiting_for_split + << dendl; ceph_assert(!slot->pg); - ceph_assert(slot->waiting_for_split); + ceph_assert(!slot->waiting_for_split.empty()); _attach_pg(slot, pg); - _wake_pg_slot(pg->pg_id, slot); + + epoch_t epoch = pg->get_osdmap_epoch(); + ceph_assert(slot->waiting_for_split.count(epoch)); + slot->waiting_for_split.erase(epoch); + if (slot->waiting_for_split.empty()) { + _wake_pg_slot(pg->pg_id, slot); + } else { + dout(10) << __func__ << " still waiting for split on " + << slot->waiting_for_split << dendl; + } } sdata_wait_lock.Lock(); sdata_cond.SignalOne(); @@ -9826,6 +10138,7 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num) } } + // ============================================================= #undef dout_context @@ -9970,16 +10283,16 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) auto qi = std::move(slot->to_process.front()); slot->to_process.pop_front(); dout(20) << __func__ << " " << qi << " pg " << pg << dendl; - set new_children; + set> new_children; OSDMapRef osdmap; while (!pg) { // should this pg shard exist on this osd in this (or a later) epoch? osdmap = sdata->shard_osdmap; const PGCreateInfo *create_info = qi.creates_pg(); - if (slot->waiting_for_split) { + if (!slot->waiting_for_split.empty()) { dout(20) << __func__ << " " << token - << " splitting" << dendl; + << " splitting " << slot->waiting_for_split << dendl; _add_slot_waiter(token, slot, std::move(qi)); } else if (qi.get_map_epoch() > osdmap->get_epoch()) { dout(20) << __func__ << " " << token @@ -10008,8 +10321,8 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) sdata->_wake_pg_slot(token, slot); // identify split children between create epoch and shard epoch. - osd->service.identify_split_children( - pg->get_osdmap(), osdmap, pg->pg_id, &new_children); + osd->service.identify_splits_and_merges( + pg->get_osdmap(), osdmap, pg->pg_id, &new_children, nullptr); sdata->_prime_splits(&new_children); // distribute remaining split children to other shards below! break; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 596b77d08dc41..463ff4e00bb03 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -877,11 +877,12 @@ public: } /// identify split child pgids over a osdmap interval - void identify_split_children( + void identify_splits_and_merges( OSDMapRef old_map, OSDMapRef new_map, spg_t pgid, - set *new_children); + set> *new_children, + set> *merge_pgs); void need_heartbeat_peer_update(); @@ -1105,11 +1106,14 @@ struct OSDShardPGSlot { /// should bail out (their op has been requeued) uint64_t requeue_seq = 0; - /// waiting for split child to materialize - bool waiting_for_split = false; + /// waiting for split child to materialize in these epoch(s) + set waiting_for_split; epoch_t epoch = 0; boost::intrusive::set_member_hook<> pg_epoch_item; + + /// waiting for a merge (source or target) by this epoch + epoch_t waiting_for_merge_epoch = 0; }; struct OSDShard { @@ -1192,9 +1196,15 @@ struct OSDShard { void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot); - void identify_splits(const OSDMapRef& as_of_osdmap, set *pgids); - void _prime_splits(set *pgids); - void prime_splits(const OSDMapRef& as_of_osdmap, set *pgids); + void identify_splits_and_merges( + const OSDMapRef& as_of_osdmap, + set> *split_children, + set> *merge_pgs); + void _prime_splits(set> *pgids); + void prime_splits(const OSDMapRef& as_of_osdmap, + set> *pgids); + void prime_merges(const OSDMapRef& as_of_osdmap, + set> *merge_pgs); void register_and_wake_split_child(PG *pg); void unprime_split_children(spg_t parent, unsigned old_pg_num); @@ -1865,6 +1875,13 @@ public: } protected: + Mutex merge_lock = {"OSD::merge_lock"}; + /// merge epoch -> target pgid -> source pgid -> pg + map>> merge_waiters; + + bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source, + unsigned need); + // -- placement groups -- std::atomic num_pgs = {0}; @@ -2026,7 +2043,10 @@ protected: void handle_fast_pg_info(MOSDPGInfo *m); void handle_fast_pg_remove(MOSDPGRemove *m); +public: + // used by OSDShard PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info); +protected: void handle_fast_force_recovery(MOSDForceRecovery *m); diff --git a/src/osd/PG.h b/src/osd/PG.h index 259cf6fdca072..a4a5142f1bdbf 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -2861,6 +2861,10 @@ public: bool dirty_epoch, bool try_fast_info, PerfCounters *logger = nullptr); + + void write_if_dirty(RecoveryCtx *rctx) { + write_if_dirty(*rctx->transaction); + } protected: void write_if_dirty(ObjectStore::Transaction& t); -- 2.39.5