-void OSDService::identify_split_children(
+void OSDService::identify_splits_and_merges(
OSDMapRef old_map,
OSDMapRef new_map,
spg_t pgid,
- set<spg_t> *new_children)
+ set<pair<spg_t,epoch_t>> *split_children,
+ set<pair<spg_t,epoch_t>> *merge_pgs)
{
if (!old_map->have_pg_pool(pgid.pool())) {
return;
}
int old_pgnum = old_map->get_pg_num(pgid.pool());
- int new_pgnum = get_possibly_deleted_pool_pg_num(
- new_map, pgid.pool());
- dout(20) << __func__ << " old " << old_pgnum << " e" << old_map->get_epoch()
- << " new " << new_pgnum << " e" << new_map->get_epoch()
- << dendl;
- if (pgid.ps() < static_cast<unsigned>(old_pgnum)) {
- set<spg_t> children;
- if (pgid.is_split(old_pgnum, new_pgnum, &children)) {
- dout(20) << __func__ << " " << pgid << " children " << children << dendl;
- new_children->insert(children.begin(), children.end());
+ auto p = osd->pg_num_history.pg_nums.find(pgid.pool());
+ if (p == osd->pg_num_history.pg_nums.end()) {
+ return;
+ }
+ dout(20) << __func__ << " " << pgid << " e" << old_map->get_epoch()
+ << " to e" << new_map->get_epoch()
+ << " pg_nums " << p->second << dendl;
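+  // Replay the recorded pg_num changes between the two maps.  Splits can
+  // cascade (a child may split again in a later epoch), so walk PGs
+  // breadth-first via a queue.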
+ deque<spg_t> queue;
+ queue.push_back(pgid);
+ while (!queue.empty()) {
+ auto cur = queue.front();
+ queue.pop_front();
+ unsigned pgnum = old_pgnum;
+ for (auto q = p->second.lower_bound(old_map->get_epoch());
+ q != p->second.end() &&
+ q->first <= new_map->get_epoch();
+ ++q) {
+      dout(20) << __func__ << " q " << q->first
+               << " pgnum " << pgnum << " -> " << q->second << dendl;
+ if (pgnum < q->second) {
+ // split?
+ if (cur.ps() < pgnum) {
+ set<spg_t> children;
+ if (cur.is_split(pgnum, q->second, &children)) {
+ dout(20) << __func__ << " " << cur << " e" << q->first
+ << " pg_num " << pgnum << " -> " << q->second
+ << " children " << children << dendl;
+ for (auto i : children) {
+ split_children->insert(make_pair(i, q->first));
+ queue.push_back(i);
+ }
+ }
+ } else if (cur.ps() < q->second) {
+ dout(20) << __func__ << " " << cur << " e" << q->first
+ << " pg_num " << pgnum << " -> " << q->second
+ << " is a child" << dendl;
+ // normally we'd capture this from the parent, but it's
+ // possible the parent doesn't exist yet (it will be
+ // fabricated to allow an intervening merge). note this PG
+ // as a split child here to be sure we catch it.
+ split_children->insert(make_pair(cur, q->first));
+ } else {
+ dout(20) << __func__ << " " << cur << " e" << q->first
+ << " pg_num " << pgnum << " -> " << q->second
+ << " is post-split, skipping" << dendl;
+ }
+ } else if (merge_pgs) {
+ // merge?
+ if (cur.ps() >= q->second) {
+ if (cur.ps() < pgnum) {
+ spg_t parent;
+ if (cur.is_merge_source(pgnum, q->second, &parent)) {
+ set<spg_t> children;
+ parent.is_split(q->second, pgnum, &children);
+ dout(20) << __func__ << " " << cur << " e" << q->first
+ << " pg_num " << pgnum << " -> " << q->second
+ << " is merge source, target " << parent
+ << ", source(s) " << children << dendl;
+ merge_pgs->insert(make_pair(cur, q->first));
+ merge_pgs->insert(make_pair(parent, q->first));
+ for (auto c : children) {
+ merge_pgs->insert(make_pair(c, q->first));
+ }
+ }
+ } else {
+ dout(20) << __func__ << " " << cur << " e" << q->first
+ << " pg_num " << pgnum << " -> " << q->second
+ << " is beyond old pgnum, skipping" << dendl;
+ }
+ } else {
+ set<spg_t> children;
+ if (cur.is_split(q->second, pgnum, &children)) {
+ dout(20) << __func__ << " " << cur << " e" << q->first
+ << " pg_num " << pgnum << " -> " << q->second
+ << " is merge target, source " << children << dendl;
+ for (auto c : children) {
+ merge_pgs->insert(make_pair(c, q->first));
+ }
+ merge_pgs->insert(make_pair(cur, q->first));
+ }
+ }
+ }
+ pgnum = q->second;
}
- } else if (pgid.ps() >= static_cast<unsigned>(new_pgnum)) {
- dout(20) << __func__ << " " << pgid << " is post-split, skipping" << dendl;
}
}
-
-
void OSDService::need_heartbeat_peer_update()
{
osd->need_heartbeat_peer_update();
service.publish_superblock(superblock);
service.max_oldest_map = superblock.oldest_map;
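+  // For each PG we already have, identify any splits or merges between the
+  // PG's map epoch and the current osdmap, and prime the owning shards so
+  // the affected slots exist before work is queued against them.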
+ for (auto& shard : shards) {
+ for (auto& i : shard->pg_slots) {
+ PGRef pg = i.second->pg;
+
+ pg->lock();
+ set<pair<spg_t,epoch_t>> new_children;
+ set<pair<spg_t,epoch_t>> merge_pgs;
+ service.identify_splits_and_merges(pg->get_osdmap(), osdmap, pg->pg_id,
+ &new_children, &merge_pgs);
+ if (!new_children.empty()) {
+ for (auto shard : shards) {
+ shard->prime_splits(osdmap, &new_children);
+ }
+	ceph_assert(new_children.empty());
+ }
+ if (!merge_pgs.empty()) {
+ for (auto shard : shards) {
+ shard->prime_merges(osdmap, &merge_pgs);
+ }
+	ceph_assert(merge_pgs.empty());
+ }
+ pg->unlock();
+ }
+ }
+
osd_op_tp.start();
command_tp.start();
continue;
}
- set<spg_t> new_children;
- service.identify_split_children(pg->get_osdmap(), osdmap, pg->pg_id,
- &new_children);
- if (!new_children.empty()) {
- for (auto shard : shards) {
- shard->prime_splits(osdmap, &new_children);
- }
- ceph_assert(new_children.empty());
- }
-
pg->reg_next_scrub();
dout(10) << __func__ << " loaded " << *pg << dendl;
dispatch_context(rctx, 0, service.get_osdmap());
};
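+// Stash a merge source PG until every source for the target has checked in;
+// returns true once the full set is present so the caller can wake the
+// target.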
+bool OSD::add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef src,
+ unsigned need)
+{
+ Mutex::Locker l(merge_lock);
+ auto& p = merge_waiters[nextmap->get_epoch()][target];
+ p[src->pg_id] = src;
+ dout(10) << __func__ << " added merge_waiter " << src->pg_id
+ << " for " << target << ", have " << p.size() << "/" << need
+ << dendl;
+ return p.size() == need;
+}
+
bool OSD::advance_pg(
- epoch_t osd_epoch, PG *pg,
+ epoch_t osd_epoch,
+ PG *pg,
ThreadPool::TPHandle &handle,
PG::RecoveryCtx *rctx)
{
OSDMapRef lastmap = pg->get_osdmap();
ceph_assert(lastmap->get_epoch() < osd_epoch);
set<PGRef> new_pgs; // any split children
+ bool ret = true;
+
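+  // Track the pool's pg_num across each map increment so splits and merges
+  // are detected at the epoch where they actually occur.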
+ unsigned old_pg_num = lastmap->have_pg_pool(pg->pg_id.pool()) ?
+ lastmap->get_pg_num(pg->pg_id.pool()) : 0;
for (epoch_t next_epoch = pg->get_osdmap_epoch() + 1;
next_epoch <= osd_epoch;
++next_epoch) {
continue;
}
+ unsigned new_pg_num =
+ (old_pg_num && nextmap->have_pg_pool(pg->pg_id.pool())) ?
+ nextmap->get_pg_num(pg->pg_id.pool()) : 0;
+ if (old_pg_num && new_pg_num && old_pg_num != new_pg_num) {
+ // check for merge
+ if (nextmap->have_pg_pool(pg->pg_id.pool())) {
+ spg_t parent;
+ if (pg->pg_id.is_merge_source(
+ old_pg_num,
+ new_pg_num,
+ &parent)) {
+ // we are merge source
+ PGRef spg = pg; // carry a ref
+ dout(1) << __func__ << " " << pg->pg_id
+ << " is merge source, target is " << parent
+ << dendl;
+ pg->write_if_dirty(rctx);
+ dispatch_context_transaction(*rctx, pg, &handle);
+ pg->ch->flush();
+ pg->on_shutdown();
+ OSDShard *sdata = pg->osd_shard;
+ {
+ Mutex::Locker l(sdata->shard_lock);
+ if (pg->pg_slot) {
+ sdata->_detach_pg(pg->pg_slot);
+ // update pg count now since we might not get an osdmap
+ // any time soon.
+ if (pg->is_primary())
+ logger->dec(l_osd_pg_primary);
+ else if (pg->is_replica())
+ logger->dec(l_osd_pg_replica);
+ else
+ logger->dec(l_osd_pg_stray);
+ }
+ }
+ pg->unlock();
+
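+	// Work out how many sources the merge target expects.  If we are the
+	// last source to arrive, wake the target with a null peering event so
+	// it can perform the merge.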
+ set<spg_t> children;
+ parent.is_split(new_pg_num, old_pg_num, &children);
+ if (add_merge_waiter(nextmap, parent, pg, children.size())) {
+ enqueue_peering_evt(
+ parent,
+ PGPeeringEventRef(
+ std::make_shared<PGPeeringEvent>(
+ nextmap->get_epoch(),
+ nextmap->get_epoch(),
+ NullEvt())));
+ }
+ ret = false;
+ goto out;
+ } else if (pg->pg_id.is_merge_target(old_pg_num, new_pg_num)) {
+ // we are merge target
+ set<spg_t> children;
+ pg->pg_id.is_split(new_pg_num, old_pg_num, &children);
+ dout(20) << __func__ << " " << pg->pg_id
+ << " is merge target, sources are " << children
+ << dendl;
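+	// Only merge once every source PG has parked itself in merge_waiters
+	// for this epoch; until then, kick the sources and bail out so this
+	// target is retried later.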
+ map<spg_t,PGRef> sources;
+ {
+ Mutex::Locker l(merge_lock);
+ auto& s = merge_waiters[nextmap->get_epoch()][pg->pg_id];
+ unsigned need = children.size();
+ dout(20) << __func__ << " have " << s.size() << "/"
+ << need << dendl;
+ if (s.size() == need) {
+ sources.swap(s);
+ merge_waiters[nextmap->get_epoch()].erase(pg->pg_id);
+ if (merge_waiters[nextmap->get_epoch()].empty()) {
+ merge_waiters.erase(nextmap->get_epoch());
+ }
+ }
+ }
+ if (!sources.empty()) {
+ unsigned new_pg_num = nextmap->get_pg_num(pg->pg_id.pool());
+ unsigned split_bits = pg->pg_id.get_split_bits(new_pg_num);
+ dout(1) << __func__ << " merging " << pg->pg_id << dendl;
+ pg->merge_from(sources, rctx, split_bits);
+ } else {
+ dout(20) << __func__ << " not ready to merge yet" << dendl;
+ pg->write_if_dirty(rctx);
+ pg->unlock();
+ // kick source(s) to get them ready
+	    for (auto& i : children) {
+	      dout(20) << __func__ << " kicking source " << i << dendl;
+	      enqueue_peering_evt(
+		i,
+ PGPeeringEventRef(
+ std::make_shared<PGPeeringEvent>(
+ nextmap->get_epoch(),
+ nextmap->get_epoch(),
+ NullEvt())));
+ }
+ ret = false;
+ goto out;
+ }
+ }
+ }
+ }
+
vector<int> newup, newacting;
int up_primary, acting_primary;
nextmap->pg_to_up_acting_osds(
nextmap, lastmap, newup, up_primary,
newacting, acting_primary, rctx);
- // Check for split!
- set<spg_t> children;
- spg_t parent(pg->pg_id);
- if (nextmap->have_pg_pool(pg->pg_id.pool()) &&
- parent.is_split(
- lastmap->get_pg_num(pg->pg_id.pool()),
- nextmap->get_pg_num(pg->pg_id.pool()),
- &children)) {
- split_pgs(
- pg, children, &new_pgs, lastmap, nextmap,
- rctx);
+ if (new_pg_num && old_pg_num != new_pg_num) {
+ // check for split
+ set<spg_t> children;
+ if (pg->pg_id.is_split(
+ old_pg_num,
+ new_pg_num,
+ &children)) {
+ split_pgs(
+ pg, children, &new_pgs, lastmap, nextmap,
+ rctx);
+ }
}
lastmap = nextmap;
+ old_pg_num = new_pg_num;
handle.reset_tp_timeout();
}
pg->handle_activate_map(rctx);
+ ret = true;
+ out:
if (!new_pgs.empty()) {
rctx->transaction->register_on_applied(new C_FinishSplits(this, new_pgs));
}
-
- return true;
+ return ret;
}
void OSD::consume_map()
service.await_reserved_maps();
service.publish_map(osdmap);
- // prime splits
- set<spg_t> newly_split;
+ // prime splits and merges
+ set<pair<spg_t,epoch_t>> newly_split; // splits, and when
+ set<pair<spg_t,epoch_t>> merge_pgs; // merge participants, and when
for (auto& shard : shards) {
- shard->identify_splits(osdmap, &newly_split);
+ shard->identify_splits_and_merges(osdmap, &newly_split, &merge_pgs);
}
if (!newly_split.empty()) {
for (auto& shard : shards) {
ceph_assert(newly_split.empty());
}
+ // prune sent_ready_to_merge
+ service.prune_sent_ready_to_merge(osdmap);
+
+  // FIXME, maybe: We could race against an incoming peering message
+  // that instantiates a merge PG after the identify_splits_and_merges()
+  // calls above, and never set up its peer to complete the merge.  An
+  // OSD restart would clear it up.  This is a hard race to resolve,
+  // and extraordinarily rare (we only merge PGs that are stable and
+  // clean, so it'd have to be an imported PG on an OSD with a
+  // slightly stale OSDMap), so I'm ignoring it for now.  We plan to
+  // replace all of this with seastar-based code soon anyway.
+ if (!merge_pgs.empty()) {
+ // mark the pgs we already have, or create new and empty merge
+ // participants for those we are missing. do this all under the
+ // shard lock so we don't have to worry about racing pg creates
+ // via _process.
+ for (auto& shard : shards) {
+ shard->prime_merges(osdmap, &merge_pgs);
+ }
+ ceph_assert(merge_pgs.empty());
+ }
+
unsigned pushes_to_free = 0;
for (auto& shard : shards) {
shard->consume_map(osdmap, &pushes_to_free);
OSDShardPGSlot *slot = p->second.get();
const spg_t& pgid = p->first;
dout(20) << __func__ << " " << pgid << dendl;
- if (slot->waiting_for_split) {
+ if (!slot->waiting_for_split.empty()) {
dout(20) << __func__ << " " << pgid
- << " waiting for split" << dendl;
+ << " waiting for split " << slot->waiting_for_split << dendl;
+ ++p;
+ continue;
+ }
+ if (slot->waiting_for_merge_epoch > new_osdmap->get_epoch()) {
+ dout(20) << __func__ << " " << pgid
+ << " waiting for merge by epoch " << slot->waiting_for_merge_epoch
+ << dendl;
++p;
continue;
}
}
if (slot->waiting.empty() &&
slot->num_running == 0 &&
+ slot->waiting_for_split.empty() &&
!slot->pg) {
dout(20) << __func__ << " " << pgid << " empty, pruning" << dendl;
p = pg_slots.erase(p);
}
}
slot->waiting_peering.clear();
- slot->waiting_for_split = false;
++slot->requeue_seq;
}
-void OSDShard::identify_splits(const OSDMapRef& as_of_osdmap, set<spg_t> *pgids)
+void OSDShard::identify_splits_and_merges(
+ const OSDMapRef& as_of_osdmap,
+ set<pair<spg_t,epoch_t>> *split_pgs,
+ set<pair<spg_t,epoch_t>> *merge_pgs)
{
Mutex::Locker l(shard_lock);
if (shard_osdmap) {
for (auto& i : pg_slots) {
const spg_t& pgid = i.first;
auto *slot = i.second.get();
- if (slot->pg || slot->waiting_for_split) {
- osd->service.identify_split_children(shard_osdmap, as_of_osdmap, pgid,
- pgids);
+ if (slot->pg) {
+ osd->service.identify_splits_and_merges(
+ shard_osdmap, as_of_osdmap, pgid,
+ split_pgs, merge_pgs);
+ } else if (!slot->waiting_for_split.empty()) {
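+      // no PG is instantiated for this slot yet, so it cannot participate
+      // in a merge; only chase further splits of the pending child.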
+ osd->service.identify_splits_and_merges(
+ shard_osdmap, as_of_osdmap, pgid,
+ split_pgs, nullptr);
} else {
dout(20) << __func__ << " slot " << pgid
- << " has no pg and !waiting_for_split" << dendl;
+ << " has no pg and waiting_for_split "
+ << slot->waiting_for_split << dendl;
}
}
}
}
-void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap, set<spg_t> *pgids)
+void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap,
+ set<pair<spg_t,epoch_t>> *pgids)
{
Mutex::Locker l(shard_lock);
_prime_splits(pgids);
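+  // If this shard's map is already ahead of as_of_osdmap, the children we
+  // are priming may themselves have split again; identify those descendants
+  // as well.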
if (shard_osdmap->get_epoch() > as_of_osdmap->get_epoch()) {
- set<spg_t> newer_children;
- for (auto pgid : *pgids) {
- osd->service.identify_split_children(as_of_osdmap, shard_osdmap, pgid,
- &newer_children);
+ set<pair<spg_t,epoch_t>> newer_children;
+ for (auto i : *pgids) {
+ osd->service.identify_splits_and_merges(
+ as_of_osdmap, shard_osdmap, i.first,
+ &newer_children, nullptr);
}
newer_children.insert(pgids->begin(), pgids->end());
dout(10) << "as_of_osdmap " << as_of_osdmap->get_epoch() << " < shard "
}
}
-void OSDShard::_prime_splits(set<spg_t> *pgids)
+void OSDShard::_prime_splits(set<pair<spg_t,epoch_t>> *pgids)
{
dout(10) << *pgids << dendl;
auto p = pgids->begin();
while (p != pgids->end()) {
- unsigned shard_index = p->hash_to_shard(osd->num_shards);
+ unsigned shard_index = p->first.hash_to_shard(osd->num_shards);
if (shard_index == shard_id) {
- auto r = pg_slots.emplace(*p, nullptr);
+ auto r = pg_slots.emplace(p->first, nullptr);
if (r.second) {
- dout(10) << "priming slot " << *p << dendl;
+ dout(10) << "priming slot " << p->first << " e" << p->second << dendl;
r.first->second = make_unique<OSDShardPGSlot>();
- r.first->second->waiting_for_split = true;
+ r.first->second->waiting_for_split.insert(p->second);
} else {
auto q = r.first;
ceph_assert(q != pg_slots.end());
- if (q->second->waiting_for_split) {
- dout(10) << "slot " << *p << " already primed" << dendl;
- } else {
- dout(10) << "priming (existing) slot " << *p << dendl;
- q->second->waiting_for_split = true;
- }
+ dout(10) << "priming (existing) slot " << p->first << " e" << p->second
+ << dendl;
+ q->second->waiting_for_split.insert(p->second);
}
p = pgids->erase(p);
} else {
}
}
+void OSDShard::prime_merges(const OSDMapRef& as_of_osdmap,
+ set<pair<spg_t,epoch_t>> *merge_pgs)
+{
+ Mutex::Locker l(shard_lock);
+ dout(20) << __func__ << " checking shard " << shard_id
+	   << " for remaining merge pgs " << *merge_pgs << dendl;
+ auto p = merge_pgs->begin();
+ while (p != merge_pgs->end()) {
+ spg_t pgid = p->first;
+ epoch_t epoch = p->second;
+ unsigned shard_index = pgid.hash_to_shard(osd->num_shards);
+ if (shard_index != shard_id) {
+ ++p;
+ continue;
+ }
+ OSDShardPGSlot *slot;
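+    // Ensure a slot exists for this participant even if we do not have the
+    // PG yet, so it can be marked with waiting_for_merge_epoch below.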
+ auto r = pg_slots.emplace(pgid, nullptr);
+ if (r.second) {
+ r.first->second = make_unique<OSDShardPGSlot>();
+ }
+ slot = r.first->second.get();
+ if (slot->pg) {
+ // already have pg
+ dout(20) << __func__ << " have merge target pg " << pgid
+ << " " << slot->pg << dendl;
+ } else if (!slot->waiting_for_split.empty() &&
+ *slot->waiting_for_split.begin() < epoch) {
+ dout(20) << __func__ << " pending split on merge target pg " << pgid
+ << " " << slot->waiting_for_split << dendl;
+ } else {
+ dout(20) << __func__ << " creating empty merge participant " << pgid
+ << " for merge in " << epoch << dendl;
+ // Construct a history with a single previous interval,
+ // going back to the epoch *before* pg_num_pending was
+ // adjusted (since we are creating the PG as it would have
+ // existed just before the merge). We know that the PG was
+ // clean as of that epoch or else pg_num_pending wouldn't
+ // have been adjusted.
+ pg_history_t history;
+ history.same_interval_since = epoch - 1;
+      // leave these zeroed since we do not know the precise
+ // last_epoch_started value that the real PG instances have. If
+ // we are greater than they are, we will trigger an 'incomplete'
+ // state (see choose_acting).
+ history.last_epoch_started = 0;
+ history.last_epoch_clean = 0;
+ PGCreateInfo cinfo(pgid, epoch - 1,
+ history, PastIntervals(), false);
+ PGRef pg = osd->handle_pg_create_info(shard_osdmap, &cinfo);
+ _attach_pg(r.first->second.get(), pg.get());
+ _wake_pg_slot(pgid, slot);
+ pg->unlock();
+ }
+ // mark slot for merge
+ dout(20) << __func__ << " marking merge participant " << pgid << dendl;
+ slot->waiting_for_merge_epoch = epoch;
+ p = merge_pgs->erase(p);
+ }
+}
+
void OSDShard::register_and_wake_split_child(PG *pg)
{
{
auto p = pg_slots.find(pg->pg_id);
ceph_assert(p != pg_slots.end());
auto *slot = p->second.get();
+ dout(20) << pg->pg_id << " waiting_for_split " << slot->waiting_for_split
+ << dendl;
ceph_assert(!slot->pg);
- ceph_assert(slot->waiting_for_split);
+ ceph_assert(!slot->waiting_for_split.empty());
_attach_pg(slot, pg);
- _wake_pg_slot(pg->pg_id, slot);
+
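+  // A slot may be waiting on splits from several epochs; only requeue its
+  // work once the last outstanding split child has arrived.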
+ epoch_t epoch = pg->get_osdmap_epoch();
+ ceph_assert(slot->waiting_for_split.count(epoch));
+ slot->waiting_for_split.erase(epoch);
+ if (slot->waiting_for_split.empty()) {
+ _wake_pg_slot(pg->pg_id, slot);
+ } else {
+ dout(10) << __func__ << " still waiting for split on "
+ << slot->waiting_for_split << dendl;
+ }
}
sdata_wait_lock.Lock();
sdata_cond.SignalOne();
}
}
+
// =============================================================
#undef dout_context
auto qi = std::move(slot->to_process.front());
slot->to_process.pop_front();
dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
- set<spg_t> new_children;
+ set<pair<spg_t,epoch_t>> new_children;
OSDMapRef osdmap;
while (!pg) {
// should this pg shard exist on this osd in this (or a later) epoch?
osdmap = sdata->shard_osdmap;
const PGCreateInfo *create_info = qi.creates_pg();
- if (slot->waiting_for_split) {
+ if (!slot->waiting_for_split.empty()) {
dout(20) << __func__ << " " << token
- << " splitting" << dendl;
+ << " splitting " << slot->waiting_for_split << dendl;
_add_slot_waiter(token, slot, std::move(qi));
} else if (qi.get_map_epoch() > osdmap->get_epoch()) {
dout(20) << __func__ << " " << token
sdata->_wake_pg_slot(token, slot);
// identify split children between create epoch and shard epoch.
- osd->service.identify_split_children(
- pg->get_osdmap(), osdmap, pg->pg_id, &new_children);
+ osd->service.identify_splits_and_merges(
+ pg->get_osdmap(), osdmap, pg->pg_id, &new_children, nullptr);
sdata->_prime_splits(&new_children);
// distribute remaining split children to other shards below!
break;