git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: implement pg merge
author Sage Weil <sage@redhat.com>
Fri, 6 Apr 2018 15:26:52 +0000 (10:26 -0500)
committer Sage Weil <sage@redhat.com>
Fri, 7 Sep 2018 17:09:05 +0000 (12:09 -0500)
- Revamps the split tracking infrastructure and adds new tracking for
upcoming merges in consume_map.  These are now unified into the same
identify_ method, and consume the new pg_num change tracking
infrastructure we just added in the prior commit (a simplified sketch
of this walk follows the list).
- PGs that are about to merge have a new wait infrastructure, since all
sources and the target have to reach the target epoch before the merge
can happen (a stand-in model of this gating follows the file list).
- If one of the sources for a merge does not exist, we create an empty
dummy PG to merge with.  This implies that the resulting merged PG will
be incomplete (and mostly useless), but it unifies the code paths.
- The actual merge (PG::merge_from) happens in advance_pg().
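
For illustration, a minimal standalone sketch of the history walk the
unified identify_ method performs (simplified, hypothetical types; the
real code walks OSDMap epochs and the per-pool pg_num_history, which
the stand-in map here only approximates):

    // pg_num increases over the interval imply splits; decreases imply merges.
    #include <cstdio>
    #include <map>

    int main() {
      // epoch -> pg_num recorded for one pool by the pg_num change tracking
      std::map<unsigned, unsigned> pg_nums = {{10, 8}, {20, 4}, {30, 16}};

      unsigned pgnum = 4;                      // pg_num as of the old map
      unsigned old_epoch = 5, new_epoch = 25;  // interval being advanced
      for (auto q = pg_nums.lower_bound(old_epoch);
           q != pg_nums.end() && q->first <= new_epoch; ++q) {
        if (pgnum < q->second)
          std::printf("e%u: split, pg_num %u -> %u\n", q->first, pgnum, q->second);
        else if (pgnum > q->second)
          std::printf("e%u: merge, pg_num %u -> %u\n", q->first, pgnum, q->second);
        pgnum = q->second;
      }
    }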

Fixes: http://tracker.ceph.com/issues/85
Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.h
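
For intuition on the wait infrastructure, a small stand-in model of the
waiter bookkeeping (MergeWaiters and the string pgids below are
illustrative, not the real merge_waiters/spg_t types): the merge may
proceed only once every source has reached the merge epoch, and the
last arrival is the one that reports readiness.

    #include <cstdio>
    #include <map>
    #include <string>

    struct MergeWaiters {
      // merge epoch -> target pgid -> source pgids that have arrived
      std::map<unsigned, std::map<std::string, std::map<std::string, bool>>> waiters;

      // returns true when the final expected source has checked in
      bool add(unsigned epoch, const std::string& target,
               const std::string& src, unsigned need) {
        auto& p = waiters[epoch][target];
        p[src] = true;
        return p.size() == need;
      }
    };

    int main() {
      MergeWaiters w;
      // pg_num 8 -> 4 folds source 1.4 into target 1.0; with a single
      // expected source, the first arrival makes the target ready.
      bool ready = w.add(20, "1.0", "1.4", 1);
      std::printf("target 1.0 ready to merge: %s\n", ready ? "yes" : "no");
    }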

index 360c1617c2914b6a2e86734697fd909156cecbd6..073c90c66e08a81fadd9a49330db241ff0435d21 100644 (file)
@@ -332,34 +332,104 @@ void OSDService::dump_live_pgids()
 
 
 
-void OSDService::identify_split_children(
+void OSDService::identify_splits_and_merges(
   OSDMapRef old_map,
   OSDMapRef new_map,
   spg_t pgid,
-  set<spg_t> *new_children)
+  set<pair<spg_t,epoch_t>> *split_children,
+  set<pair<spg_t,epoch_t>> *merge_pgs)
 {
   if (!old_map->have_pg_pool(pgid.pool())) {
     return;
   }
   int old_pgnum = old_map->get_pg_num(pgid.pool());
-  int new_pgnum = get_possibly_deleted_pool_pg_num(
-    new_map, pgid.pool());
-  dout(20) << __func__ << " old " << old_pgnum << " e" << old_map->get_epoch()
-          << " new " << new_pgnum << " e" << new_map->get_epoch()
-          << dendl;
-  if (pgid.ps() < static_cast<unsigned>(old_pgnum)) {
-    set<spg_t> children;
-    if (pgid.is_split(old_pgnum, new_pgnum, &children)) {
-      dout(20) << __func__ << " " << pgid << " children " << children << dendl;
-      new_children->insert(children.begin(), children.end());
+  auto p = osd->pg_num_history.pg_nums.find(pgid.pool());
+  if (p == osd->pg_num_history.pg_nums.end()) {
+    return;
+  }
+  dout(20) << __func__ << " " << pgid << " e" << old_map->get_epoch()
+          << " to e" << new_map->get_epoch()
+          << " pg_nums " << p->second << dendl;
+  deque<spg_t> queue;
+  queue.push_back(pgid);
+  while (!queue.empty()) {
+    auto cur = queue.front();
+    queue.pop_front();
+    unsigned pgnum = old_pgnum;
+    for (auto q = p->second.lower_bound(old_map->get_epoch());
+        q != p->second.end() &&
+          q->first <= new_map->get_epoch();
+        ++q) {
+      dout(20) << __func__ << " q " << q->first
+               << " pgnum " << pgnum << " -> " << q->second << dendl;
+      if (pgnum < q->second) {
+       // split?
+       if (cur.ps() < pgnum) {
+         set<spg_t> children;
+         if (cur.is_split(pgnum, q->second, &children)) {
+           dout(20) << __func__ << " " << cur << " e" << q->first
+                    << " pg_num " << pgnum << " -> " << q->second
+                    << " children " << children << dendl;
+           for (auto i : children) {
+             split_children->insert(make_pair(i, q->first));
+             queue.push_back(i);
+           }
+         }
+       } else if (cur.ps() < q->second) {
+         dout(20) << __func__ << " " << cur << " e" << q->first
+                  << " pg_num " << pgnum << " -> " << q->second
+                  << " is a child" << dendl;
+         // normally we'd capture this from the parent, but it's
+         // possible the parent doesn't exist yet (it will be
+         // fabricated to allow an intervening merge).  note this PG
+         // as a split child here to be sure we catch it.
+         split_children->insert(make_pair(cur, q->first));
+       } else {
+         dout(20) << __func__ << " " << cur << " e" << q->first
+                  << " pg_num " << pgnum << " -> " << q->second
+                  << " is post-split, skipping" << dendl;
+       }
+      } else if (merge_pgs) {
+       // merge?
+       if (cur.ps() >= q->second) {
+         if (cur.ps() < pgnum) {
+           spg_t parent;
+           if (cur.is_merge_source(pgnum, q->second, &parent)) {
+             set<spg_t> children;
+             parent.is_split(q->second, pgnum, &children);
+             dout(20) << __func__ << " " << cur << " e" << q->first
+                      << " pg_num " << pgnum << " -> " << q->second
+                      << " is merge source, target " << parent
+                      << ", source(s) " << children << dendl;
+             merge_pgs->insert(make_pair(cur, q->first));
+             merge_pgs->insert(make_pair(parent, q->first));
+             for (auto c : children) {
+               merge_pgs->insert(make_pair(c, q->first));
+             }
+           }
+         } else {
+           dout(20) << __func__ << " " << cur << " e" << q->first
+                    << " pg_num " << pgnum << " -> " << q->second
+                    << " is beyond old pgnum, skipping" << dendl;
+         }
+       } else {
+         set<spg_t> children;
+         if (cur.is_split(q->second, pgnum, &children)) {
+           dout(20) << __func__ << " " << cur << " e" << q->first
+                    << " pg_num " << pgnum << " -> " << q->second
+                    << " is merge target, source " << children << dendl;
+           for (auto c : children) {
+             merge_pgs->insert(make_pair(c, q->first));
+           }
+           merge_pgs->insert(make_pair(cur, q->first));
+         }
+       }
+      }
+      pgnum = q->second;
     }
-  } else if (pgid.ps() >= static_cast<unsigned>(new_pgnum)) {
-    dout(20) << __func__ << " " << pgid << " is post-split, skipping" << dendl;
   }
 }
 
-
-
 void OSDService::need_heartbeat_peer_update()
 {
   osd->need_heartbeat_peer_update();
@@ -2678,6 +2748,31 @@ int OSD::init()
   service.publish_superblock(superblock);
   service.max_oldest_map = superblock.oldest_map;
 
+  for (auto& shard : shards) {
+    for (auto& i : shard->pg_slots) {
+      PGRef pg = i.second->pg;
+
+      pg->lock();
+      set<pair<spg_t,epoch_t>> new_children;
+      set<pair<spg_t,epoch_t>> merge_pgs;
+      service.identify_splits_and_merges(pg->get_osdmap(), osdmap, pg->pg_id,
+                                        &new_children, &merge_pgs);
+      if (!new_children.empty()) {
+       for (auto shard : shards) {
+         shard->prime_splits(osdmap, &new_children);
+       }
+       ceph_assert(new_children.empty());
+      }
+      if (!merge_pgs.empty()) {
+       for (auto shard : shards) {
+         shard->prime_merges(osdmap, &merge_pgs);
+       }
+       ceph_assert(merge_pgs.empty());
+      }
+      pg->unlock();
+    }
+  }
+
   osd_op_tp.start();
   command_tp.start();
 
@@ -3971,16 +4066,6 @@ void OSD::load_pgs()
       continue;
     }
 
-    set<spg_t> new_children;
-    service.identify_split_children(pg->get_osdmap(), osdmap, pg->pg_id,
-                                   &new_children);
-    if (!new_children.empty()) {
-      for (auto shard : shards) {
-       shard->prime_splits(osdmap, &new_children);
-      }
-      ceph_assert(new_children.empty());
-    }
-
     pg->reg_next_scrub();
 
     dout(10) << __func__ << " loaded " << *pg << dendl;
@@ -7974,8 +8059,21 @@ void OSD::_finish_splits(set<PGRef>& pgs)
   dispatch_context(rctx, 0, service.get_osdmap());
 };
 
+bool OSD::add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef src,
+                          unsigned need)
+{
+  Mutex::Locker l(merge_lock);
+  auto& p = merge_waiters[nextmap->get_epoch()][target];
+  p[src->pg_id] = src;
+  dout(10) << __func__ << " added merge_waiter " << src->pg_id
+          << " for " << target  << ", have " << p.size() << "/" << need
+          << dendl;
+  return p.size() == need;
+}
+
 bool OSD::advance_pg(
-  epoch_t osd_epoch, PG *pg,
+  epoch_t osd_epoch,
+  PG *pg,
   ThreadPool::TPHandle &handle,
   PG::RecoveryCtx *rctx)
 {
@@ -7986,6 +8084,10 @@ bool OSD::advance_pg(
   OSDMapRef lastmap = pg->get_osdmap();
   ceph_assert(lastmap->get_epoch() < osd_epoch);
   set<PGRef> new_pgs;  // any split children
+  bool ret = true;
+
+  unsigned old_pg_num = lastmap->have_pg_pool(pg->pg_id.pool()) ?
+    lastmap->get_pg_num(pg->pg_id.pool()) : 0;
   for (epoch_t next_epoch = pg->get_osdmap_epoch() + 1;
        next_epoch <= osd_epoch;
        ++next_epoch) {
@@ -7995,6 +8097,105 @@ bool OSD::advance_pg(
       continue;
     }
 
+    unsigned new_pg_num =
+      (old_pg_num && nextmap->have_pg_pool(pg->pg_id.pool())) ?
+      nextmap->get_pg_num(pg->pg_id.pool()) : 0;
+    if (old_pg_num && new_pg_num && old_pg_num != new_pg_num) {
+      // check for merge
+      if (nextmap->have_pg_pool(pg->pg_id.pool())) {
+       spg_t parent;
+       if (pg->pg_id.is_merge_source(
+             old_pg_num,
+             new_pg_num,
+             &parent)) {
+         // we are merge source
+         PGRef spg = pg; // carry a ref
+         dout(1) << __func__ << " " << pg->pg_id
+                 << " is merge source, target is " << parent
+                  << dendl;
+         pg->write_if_dirty(rctx);
+         dispatch_context_transaction(*rctx, pg, &handle);
+         pg->ch->flush();
+         pg->on_shutdown();
+         OSDShard *sdata = pg->osd_shard;
+         {
+           Mutex::Locker l(sdata->shard_lock);
+           if (pg->pg_slot) {
+             sdata->_detach_pg(pg->pg_slot);
+             // update pg count now since we might not get an osdmap
+             // any time soon.
+             if (pg->is_primary())
+               logger->dec(l_osd_pg_primary);
+             else if (pg->is_replica())
+               logger->dec(l_osd_pg_replica);
+             else
+               logger->dec(l_osd_pg_stray);
+           }
+         }
+         pg->unlock();
+
+         set<spg_t> children;
+         parent.is_split(new_pg_num, old_pg_num, &children);
+         if (add_merge_waiter(nextmap, parent, pg, children.size())) {
+           enqueue_peering_evt(
+             parent,
+             PGPeeringEventRef(
+               std::make_shared<PGPeeringEvent>(
+                 nextmap->get_epoch(),
+                 nextmap->get_epoch(),
+                 NullEvt())));
+         }
+         ret = false;
+         goto out;
+       } else if (pg->pg_id.is_merge_target(old_pg_num, new_pg_num)) {
+         // we are merge target
+         set<spg_t> children;
+         pg->pg_id.is_split(new_pg_num, old_pg_num, &children);
+         dout(20) << __func__ << " " << pg->pg_id
+                  << " is merge target, sources are " << children
+                  << dendl;
+         map<spg_t,PGRef> sources;
+         {
+           Mutex::Locker l(merge_lock);
+           auto& s = merge_waiters[nextmap->get_epoch()][pg->pg_id];
+           unsigned need = children.size();
+           dout(20) << __func__ << " have " << s.size() << "/"
+                    << need << dendl;
+           if (s.size() == need) {
+             sources.swap(s);
+             merge_waiters[nextmap->get_epoch()].erase(pg->pg_id);
+             if (merge_waiters[nextmap->get_epoch()].empty()) {
+               merge_waiters.erase(nextmap->get_epoch());
+             }
+           }
+         }
+         if (!sources.empty()) {
+           unsigned new_pg_num = nextmap->get_pg_num(pg->pg_id.pool());
+           unsigned split_bits = pg->pg_id.get_split_bits(new_pg_num);
+           dout(1) << __func__ << " merging " << pg->pg_id << dendl;
+           pg->merge_from(sources, rctx, split_bits);
+         } else {
+           dout(20) << __func__ << " not ready to merge yet" << dendl;
+           pg->write_if_dirty(rctx);
+           pg->unlock();
+           // kick source(s) to get them ready
+           for (auto& i : children) {
+             dout(20) << __func__ << " kicking source " << i << dendl;
+             enqueue_peering_evt(
+               i,
+               PGPeeringEventRef(
+                 std::make_shared<PGPeeringEvent>(
+                   nextmap->get_epoch(),
+                   nextmap->get_epoch(),
+                   NullEvt())));
+           }
+           ret = false;
+           goto out;
+         }
+       }
+      }
+    }
+
     vector<int> newup, newacting;
     int up_primary, acting_primary;
     nextmap->pg_to_up_acting_osds(
@@ -8005,29 +8206,31 @@ bool OSD::advance_pg(
       nextmap, lastmap, newup, up_primary,
       newacting, acting_primary, rctx);
 
-    // Check for split!
-    set<spg_t> children;
-    spg_t parent(pg->pg_id);
-    if (nextmap->have_pg_pool(pg->pg_id.pool()) &&
-       parent.is_split(
-         lastmap->get_pg_num(pg->pg_id.pool()),
-         nextmap->get_pg_num(pg->pg_id.pool()),
-         &children)) {
-      split_pgs(
-       pg, children, &new_pgs, lastmap, nextmap,
-       rctx);
+    if (new_pg_num && old_pg_num != new_pg_num) {
+      // check for split
+      set<spg_t> children;
+      if (pg->pg_id.is_split(
+           old_pg_num,
+           new_pg_num,
+           &children)) {
+       split_pgs(
+         pg, children, &new_pgs, lastmap, nextmap,
+         rctx);
+      }
     }
 
     lastmap = nextmap;
+    old_pg_num = new_pg_num;
     handle.reset_tp_timeout();
   }
   pg->handle_activate_map(rctx);
 
+  ret = true;
+ out:
   if (!new_pgs.empty()) {
     rctx->transaction->register_on_applied(new C_FinishSplits(this, new_pgs));
   }
-
-  return true;
+  return ret;
 }
 
 void OSD::consume_map()
@@ -8048,10 +8251,11 @@ void OSD::consume_map()
   service.await_reserved_maps();
   service.publish_map(osdmap);
 
-  // prime splits
-  set<spg_t> newly_split;
+  // prime splits and merges
+  set<pair<spg_t,epoch_t>> newly_split;  // splits, and when
+  set<pair<spg_t,epoch_t>> merge_pgs;    // merge participants, and when
   for (auto& shard : shards) {
-    shard->identify_splits(osdmap, &newly_split);
+    shard->identify_splits_and_merges(osdmap, &newly_split, &merge_pgs);
   }
   if (!newly_split.empty()) {
     for (auto& shard : shards) {
@@ -8060,6 +8264,28 @@ void OSD::consume_map()
     ceph_assert(newly_split.empty());
   }
 
+  // prune sent_ready_to_merge
+  service.prune_sent_ready_to_merge(osdmap);
+
+  // FIXME, maybe: We could race against an incoming peering message
+  // that instantiates a merge PG after identify_merges() below and
+  // never set up its peer to complete the merge.  An OSD restart
+  // would clear it up.  This is a hard race to resolve,
+  // extraordinarily rare (we only merge PGs that are stable and
+  // clean, so it'd have to be an imported PG to an OSD with a
+  // slightly stale OSDMap...), so I'm ignoring it for now.  We plan to
+  // replace all of this with seastar-based code soon anyway.
+  if (!merge_pgs.empty()) {
+    // mark the pgs we already have, or create new and empty merge
+    // participants for those we are missing.  do this all under the
+    // shard lock so we don't have to worry about racing pg creates
+    // via _process.
+    for (auto& shard : shards) {
+      shard->prime_merges(osdmap, &merge_pgs);
+    }
+    ceph_assert(merge_pgs.empty());
+  }
+
   unsigned pushes_to_free = 0;
   for (auto& shard : shards) {
     shard->consume_map(osdmap, &pushes_to_free);
@@ -9626,9 +9852,16 @@ void OSDShard::consume_map(
     OSDShardPGSlot *slot = p->second.get();
     const spg_t& pgid = p->first;
     dout(20) << __func__ << " " << pgid << dendl;
-    if (slot->waiting_for_split) {
+    if (!slot->waiting_for_split.empty()) {
       dout(20) << __func__ << "  " << pgid
-              << " waiting for split" << dendl;
+              << " waiting for split " << slot->waiting_for_split << dendl;
+      ++p;
+      continue;
+    }
+    if (slot->waiting_for_merge_epoch > new_osdmap->get_epoch()) {
+      dout(20) << __func__ << "  " << pgid
+              << " waiting for merge by epoch " << slot->waiting_for_merge_epoch
+              << dendl;
       ++p;
       continue;
     }
@@ -9668,6 +9901,7 @@ void OSDShard::consume_map(
     }
     if (slot->waiting.empty() &&
        slot->num_running == 0 &&
+       slot->waiting_for_split.empty() &&
        !slot->pg) {
       dout(20) << __func__ << "  " << pgid << " empty, pruning" << dendl;
       p = pg_slots.erase(p);
@@ -9714,37 +9948,47 @@ void OSDShard::_wake_pg_slot(
     }
   }
   slot->waiting_peering.clear();
-  slot->waiting_for_split = false;
   ++slot->requeue_seq;
 }
 
-void OSDShard::identify_splits(const OSDMapRef& as_of_osdmap, set<spg_t> *pgids)
+void OSDShard::identify_splits_and_merges(
+  const OSDMapRef& as_of_osdmap,
+  set<pair<spg_t,epoch_t>> *split_pgs,
+  set<pair<spg_t,epoch_t>> *merge_pgs)
 {
   Mutex::Locker l(shard_lock);
   if (shard_osdmap) {
     for (auto& i : pg_slots) {
       const spg_t& pgid = i.first;
       auto *slot = i.second.get();
-      if (slot->pg || slot->waiting_for_split) {
-       osd->service.identify_split_children(shard_osdmap, as_of_osdmap, pgid,
-                                            pgids);
+      if (slot->pg) {
+       osd->service.identify_splits_and_merges(
+         shard_osdmap, as_of_osdmap, pgid,
+         split_pgs, merge_pgs);
+      } else if (!slot->waiting_for_split.empty()) {
+       osd->service.identify_splits_and_merges(
+         shard_osdmap, as_of_osdmap, pgid,
+         split_pgs, nullptr);
       } else {
        dout(20) << __func__ << " slot " << pgid
-                << " has no pg and !waiting_for_split" << dendl;
+                << " has no pg and waiting_for_split "
+                << slot->waiting_for_split << dendl;
       }
     }
   }
 }
 
-void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap, set<spg_t> *pgids)
+void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap,
+                           set<pair<spg_t,epoch_t>> *pgids)
 {
   Mutex::Locker l(shard_lock);
   _prime_splits(pgids);
   if (shard_osdmap->get_epoch() > as_of_osdmap->get_epoch()) {
-    set<spg_t> newer_children;
-    for (auto pgid : *pgids) {
-      osd->service.identify_split_children(as_of_osdmap, shard_osdmap, pgid,
-                                          &newer_children);
+    set<pair<spg_t,epoch_t>> newer_children;
+    for (auto i : *pgids) {
+      osd->service.identify_splits_and_merges(
+       as_of_osdmap, shard_osdmap, i.first,
+       &newer_children, nullptr);
     }
     newer_children.insert(pgids->begin(), pgids->end());
     dout(10) << "as_of_osdmap " << as_of_osdmap->get_epoch() << " < shard "
@@ -9762,27 +10006,24 @@ void OSDShard::prime_splits(const OSDMapRef& as_of_osdmap, set<spg_t> *pgids)
   }
 }
 
-void OSDShard::_prime_splits(set<spg_t> *pgids)
+void OSDShard::_prime_splits(set<pair<spg_t,epoch_t>> *pgids)
 {
   dout(10) << *pgids << dendl;
   auto p = pgids->begin();
   while (p != pgids->end()) {
-    unsigned shard_index = p->hash_to_shard(osd->num_shards);
+    unsigned shard_index = p->first.hash_to_shard(osd->num_shards);
     if (shard_index == shard_id) {
-      auto r = pg_slots.emplace(*p, nullptr);
+      auto r = pg_slots.emplace(p->first, nullptr);
       if (r.second) {
-       dout(10) << "priming slot " << *p << dendl;
+       dout(10) << "priming slot " << p->first << " e" << p->second << dendl;
        r.first->second = make_unique<OSDShardPGSlot>();
-       r.first->second->waiting_for_split = true;
+       r.first->second->waiting_for_split.insert(p->second);
       } else {
        auto q = r.first;
        ceph_assert(q != pg_slots.end());
-       if (q->second->waiting_for_split) {
-         dout(10) << "slot " << *p << " already primed" << dendl;
-       } else {
-         dout(10) << "priming (existing) slot " << *p << dendl;
-         q->second->waiting_for_split = true;
-       }
+       dout(10) << "priming (existing) slot " << p->first << " e" << p->second
+                << dendl;
+       q->second->waiting_for_split.insert(p->second);
       }
       p = pgids->erase(p);
     } else {
@@ -9791,6 +10032,66 @@ void OSDShard::_prime_splits(set<spg_t> *pgids)
   }
 }
 
+void OSDShard::prime_merges(const OSDMapRef& as_of_osdmap,
+                           set<pair<spg_t,epoch_t>> *merge_pgs)
+{
+  Mutex::Locker l(shard_lock);
+  dout(20) << __func__ << " checking shard " << shard_id
+          << " for remaining merge pgs " << *merge_pgs << dendl;
+  auto p = merge_pgs->begin();
+  while (p != merge_pgs->end()) {
+    spg_t pgid = p->first;
+    epoch_t epoch = p->second;
+    unsigned shard_index = pgid.hash_to_shard(osd->num_shards);
+    if (shard_index != shard_id) {
+      ++p;
+      continue;
+    }
+    OSDShardPGSlot *slot;
+    auto r = pg_slots.emplace(pgid, nullptr);
+    if (r.second) {
+      r.first->second = make_unique<OSDShardPGSlot>();
+    }
+    slot = r.first->second.get();
+    if (slot->pg) {
+      // already have pg
+      dout(20) << __func__ << "  have merge target pg " << pgid
+              << " " << slot->pg << dendl;
+    } else if (!slot->waiting_for_split.empty() &&
+              *slot->waiting_for_split.begin() < epoch) {
+      dout(20) << __func__ << "  pending split on merge target pg " << pgid
+              << " " << slot->waiting_for_split << dendl;
+    } else {
+      dout(20) << __func__ << "  creating empty merge participant " << pgid
+              << " for merge in " << epoch << dendl;
+      // Construct a history with a single previous interval,
+      // going back to the epoch *before* pg_num_pending was
+      // adjusted (since we are creating the PG as it would have
+      // existed just before the merge). We know that the PG was
+      // clean as of that epoch or else pg_num_pending wouldn't
+      // have been adjusted.
+      pg_history_t history;
+      history.same_interval_since = epoch - 1;
+      // leave these zeroed since we do not know the precise
+      // last_epoch_started value that the real PG instances have.  If
+      // we are greater than they are, we will trigger an 'incomplete'
+      // state (see choose_acting).
+      history.last_epoch_started = 0;
+      history.last_epoch_clean = 0;
+      PGCreateInfo cinfo(pgid, epoch - 1,
+                        history, PastIntervals(), false);
+      PGRef pg = osd->handle_pg_create_info(shard_osdmap, &cinfo);
+      _attach_pg(r.first->second.get(), pg.get());
+      _wake_pg_slot(pgid, slot);
+      pg->unlock();
+    }
+    // mark slot for merge
+    dout(20) << __func__ << "  marking merge participant " << pgid << dendl;
+    slot->waiting_for_merge_epoch = epoch;
+    p = merge_pgs->erase(p);
+  }
+}
+
 void OSDShard::register_and_wake_split_child(PG *pg)
 {
   {
@@ -9799,10 +10100,21 @@ void OSDShard::register_and_wake_split_child(PG *pg)
     auto p = pg_slots.find(pg->pg_id);
     ceph_assert(p != pg_slots.end());
     auto *slot = p->second.get();
+    dout(20) << pg->pg_id << " waiting_for_split " << slot->waiting_for_split
+            << dendl;
     ceph_assert(!slot->pg);
-    ceph_assert(slot->waiting_for_split);
+    ceph_assert(!slot->waiting_for_split.empty());
     _attach_pg(slot, pg);
-    _wake_pg_slot(pg->pg_id, slot);
+
+    epoch_t epoch = pg->get_osdmap_epoch();
+    ceph_assert(slot->waiting_for_split.count(epoch));
+    slot->waiting_for_split.erase(epoch);
+    if (slot->waiting_for_split.empty()) {
+      _wake_pg_slot(pg->pg_id, slot);
+    } else {
+      dout(10) << __func__ << " still waiting for split on "
+              << slot->waiting_for_split << dendl;
+    }
   }
   sdata_wait_lock.Lock();
   sdata_cond.SignalOne();
@@ -9826,6 +10138,7 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
   }
 }
 
+
 // =============================================================
 
 #undef dout_context
@@ -9970,16 +10283,16 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   auto qi = std::move(slot->to_process.front());
   slot->to_process.pop_front();
   dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
-  set<spg_t> new_children;
+  set<pair<spg_t,epoch_t>> new_children;
   OSDMapRef osdmap;
 
   while (!pg) {
     // should this pg shard exist on this osd in this (or a later) epoch?
     osdmap = sdata->shard_osdmap;
     const PGCreateInfo *create_info = qi.creates_pg();
-    if (slot->waiting_for_split) {
+    if (!slot->waiting_for_split.empty()) {
       dout(20) << __func__ << " " << token
-              << " splitting" << dendl;
+              << " splitting " << slot->waiting_for_split << dendl;
       _add_slot_waiter(token, slot, std::move(qi));
     } else if (qi.get_map_epoch() > osdmap->get_epoch()) {
       dout(20) << __func__ << " " << token
@@ -10008,8 +10321,8 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
              sdata->_wake_pg_slot(token, slot);
 
              // identify split children between create epoch and shard epoch.
-             osd->service.identify_split_children(
-               pg->get_osdmap(), osdmap, pg->pg_id, &new_children);
+             osd->service.identify_splits_and_merges(
+               pg->get_osdmap(), osdmap, pg->pg_id, &new_children, nullptr);
              sdata->_prime_splits(&new_children);
              // distribute remaining split children to other shards below!
              break;
index 596b77d08dc412a493821faecf376892efc6a343..463ff4e00bb0397d42cc93a77178089cc24f5451 100644 (file)
@@ -877,11 +877,12 @@ public:
   }
 
   /// identify split child pgids over a osdmap interval
-  void identify_split_children(
+  void identify_splits_and_merges(
     OSDMapRef old_map,
     OSDMapRef new_map,
     spg_t pgid,
-    set<spg_t> *new_children);
+    set<pair<spg_t,epoch_t>> *new_children,
+    set<pair<spg_t,epoch_t>> *merge_pgs);
 
   void need_heartbeat_peer_update();
 
@@ -1105,11 +1106,14 @@ struct OSDShardPGSlot {
   /// should bail out (their op has been requeued)
   uint64_t requeue_seq = 0;
 
-  /// waiting for split child to materialize
-  bool waiting_for_split = false;
+  /// waiting for split child to materialize in these epoch(s)
+  set<epoch_t> waiting_for_split;
 
   epoch_t epoch = 0;
   boost::intrusive::set_member_hook<> pg_epoch_item;
+
+  /// waiting for a merge (source or target) by this epoch
+  epoch_t waiting_for_merge_epoch = 0;
 };
 
 struct OSDShard {
@@ -1192,9 +1196,15 @@ struct OSDShard {
 
   void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
 
-  void identify_splits(const OSDMapRef& as_of_osdmap, set<spg_t> *pgids);
-  void _prime_splits(set<spg_t> *pgids);
-  void prime_splits(const OSDMapRef& as_of_osdmap, set<spg_t> *pgids);
+  void identify_splits_and_merges(
+    const OSDMapRef& as_of_osdmap,
+    set<pair<spg_t,epoch_t>> *split_children,
+    set<pair<spg_t,epoch_t>> *merge_pgs);
+  void _prime_splits(set<pair<spg_t,epoch_t>> *pgids);
+  void prime_splits(const OSDMapRef& as_of_osdmap,
+                   set<pair<spg_t,epoch_t>> *pgids);
+  void prime_merges(const OSDMapRef& as_of_osdmap,
+                   set<pair<spg_t,epoch_t>> *merge_pgs);
   void register_and_wake_split_child(PG *pg);
   void unprime_split_children(spg_t parent, unsigned old_pg_num);
 
@@ -1865,6 +1875,13 @@ public:
   }
 
 protected:
+  Mutex merge_lock = {"OSD::merge_lock"};
+  /// merge epoch -> target pgid -> source pgid -> pg
+  map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;
+
+  bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source,
+                       unsigned need);
+
   // -- placement groups --
   std::atomic<size_t> num_pgs = {0};
 
@@ -2026,7 +2043,10 @@ protected:
   void handle_fast_pg_info(MOSDPGInfo *m);
   void handle_fast_pg_remove(MOSDPGRemove *m);
 
+public:
+  // used by OSDShard
   PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info);
+protected:
 
   void handle_fast_force_recovery(MOSDForceRecovery *m);
 
index 259cf6fdca0723633b18a7faa1bee8e010ddd38a..a4a5142f1bdbf7d07b28c9efb42a88ddee49bc41 100644 (file)
@@ -2861,6 +2861,10 @@ public:
     bool dirty_epoch,
     bool try_fast_info,
     PerfCounters *logger = nullptr);
+
+  void write_if_dirty(RecoveryCtx *rctx) {
+    write_if_dirty(*rctx->transaction);
+  }
 protected:
   void write_if_dirty(ObjectStore::Transaction& t);
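
A closing note on the pgid arithmetic used by is_merge_source/is_split
above: in the simple power-of-two halving case, a source's merge target
is just its ps masked down to the new pg_num.  A hedged sketch of that
case only (the real helpers handle arbitrary pg_num values):

    #include <cassert>
    #include <cstdio>

    int main() {
      unsigned old_pg_num = 8, new_pg_num = 4;    // halving merge
      for (unsigned ps = new_pg_num; ps < old_pg_num; ++ps) {
        unsigned target = ps & (new_pg_num - 1);  // e.g. 1.4 -> 1.0, 1.7 -> 1.3
        assert(target < new_pg_num);
        std::printf("source ps %u merges into target ps %u\n", ps, target);
      }
    }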