]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: restructure consume_map in terms of shards
authorSage Weil <sage@redhat.com>
Thu, 8 Feb 2018 22:23:04 +0000 (16:23 -0600)
committerSage Weil <sage@redhat.com>
Wed, 4 Apr 2018 13:26:53 +0000 (08:26 -0500)
- new split primming machinery
- new primed split cleanup on pg removal
- cover the pg creation path

The old split tracking is now totally unused; will be removed in the next
patch.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/osd_types.h

index 8d37b5e1c5ab9d08d949aa5cfe82d22755cac511..26c88f98a1f2acc5a6e1cd49219f0e763993d73a 100644 (file)
@@ -512,6 +512,32 @@ void OSDService::complete_split(spg_t pgid)
   in_progress_splits.erase(pgid);
 }
 
+
+void OSDService::identify_split_children(
+  OSDMapRef old_map,
+  OSDMapRef new_map,
+  spg_t pgid,
+  set<spg_t> *new_children)
+{
+  if (!old_map->have_pg_pool(pgid.pool())) {
+    return;
+  }
+  int old_pgnum = old_map->get_pg_num(pgid.pool());
+  int new_pgnum = get_possibly_deleted_pool_pg_num(
+    new_map, pgid.pool());
+  if (pgid.ps() < static_cast<unsigned>(old_pgnum)) {
+    set<spg_t> children;
+    if (pgid.is_split(old_pgnum, new_pgnum, &children)) {
+      dout(20) << __func__ << " " << pgid << " children " << children << dendl;
+      new_children->insert(children.begin(), children.end());
+    }
+  } else if (pgid.ps() >= static_cast<unsigned>(new_pgnum)) {
+    dout(20) << __func__ << " " << pgid << " is post-split, skipping" << dendl;
+  }
+}
+
+
+
 void OSDService::need_heartbeat_peer_update()
 {
   osd->need_heartbeat_peer_update();
@@ -1757,11 +1783,14 @@ void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
       e));
 }
 
-void OSDService::finish_pg_delete(PG *pg)
+void OSDService::finish_pg_delete(PG *pg, unsigned old_pg_num)
 {
-  osd->op_shardedwq.clear_pg_pointer(pg);
   pg_remove_epoch(pg->get_pgid());
-  cancel_pending_splits_for_parent(pg->get_pgid());
+
+  osd->unregister_pg(pg);
+  for (auto shard : osd->shards) {
+    shard->unprime_split_children(pg->pg_id, old_pg_num);
+  }
 }
 
 void OSDService::_queue_for_recovery(
@@ -2074,9 +2103,13 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     char order_lock[128] = {0};
     snprintf(order_lock, sizeof(order_lock), "OSDShard.%d::sdata_op_ordering_lock", i);
     OSDShard *one_shard = new OSDShard(
+      i,
+      cct,
+      this,
       lock_name, order_lock,
       cct->_conf->osd_op_pq_max_tokens_per_priority,
-      cct->_conf->osd_op_pq_min_cost, cct, op_queue);
+      cct->_conf->osd_op_pq_min_cost,
+      op_queue);
     shards.push_back(one_shard);
   }
 }
@@ -2654,8 +2687,6 @@ int OSD::init()
   clear_temp_objects();
 
   // initialize osdmap references in sharded wq
-#warning fixme initialization
-  //op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
   for (auto& shard : shards) {
     shard->osdmap = osdmap;
   }
@@ -3830,7 +3861,7 @@ void OSD::_get_pgs(vector<PGRef> *v, bool clear_too)
 {
   v->clear();
   for (auto& s : shards) {
-    Mutex::Locker l(s->sdata_lock);
+    Mutex::Locker l(s->sdata_op_ordering_lock);
     for (auto& j : s->pg_slots) {
       if (j.second.pg &&
          !j.second.pg->is_deleted()) {
@@ -3848,7 +3879,7 @@ void OSD::_get_pgids(vector<spg_t> *v)
 {
   v->clear();
   for (auto& s : shards) {
-    Mutex::Locker l(s->sdata_lock);
+    Mutex::Locker l(s->sdata_op_ordering_lock);
     for (auto& j : s->pg_slots) {
       if (j.second.pg &&
          !j.second.pg->is_deleted()) {
@@ -3858,22 +3889,41 @@ void OSD::_get_pgids(vector<spg_t> *v)
   }
 }
 
-void OSD::_register_pg(PGRef pg)
+void OSD::register_pg(PGRef pg)
 {
   spg_t pgid = pg->get_pgid();
   uint32_t shard_index = pgid.hash_to_shard(num_shards);
   auto sdata = shards[shard_index];
-  Mutex::Locker l(sdata->sdata_lock);
+  Mutex::Locker l(sdata->sdata_op_ordering_lock);
   auto& slot = sdata->pg_slots[pgid];
+  assert(!slot.pg);
+  dout(20) << __func__ << " " << pgid << " " << pg << dendl;
   slot.pg = pg;
   ++num_pgs;
 }
 
+void OSD::unregister_pg(PG *pg)
+{
+  spg_t pgid = pg->get_pgid();
+  uint32_t shard_index = pgid.hash_to_shard(num_shards);
+  auto sdata = shards[shard_index];
+  Mutex::Locker l(sdata->sdata_op_ordering_lock);
+  auto p = sdata->pg_slots.find(pg->pg_id);
+  if (p != sdata->pg_slots.end() &&
+      p->second.pg) {
+    dout(20) << __func__ << " " << pg->pg_id << " cleared" << dendl;
+    p->second.pg.reset();
+    --num_pgs;
+  } else {
+    dout(20) << __func__ << " " << pg->pg_id << " not found" << dendl;
+  }
+}
+
 PGRef OSD::_lookup_pg(spg_t pgid)
 {
   uint32_t shard_index = pgid.hash_to_shard(num_shards);
   auto sdata = shards[shard_index];
-  Mutex::Locker l(sdata->sdata_lock);
+  Mutex::Locker l(sdata->sdata_op_ordering_lock);
   auto p = sdata->pg_slots.find(pgid);
   if (p == sdata->pg_slots.end()) {
     return nullptr;
@@ -3970,31 +4020,28 @@ void OSD::load_pgs()
     if (pg->dne())  {
       dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
       pg->ch = nullptr;
-      service.pg_remove_epoch(pg->pg_id);
       pg->unlock();
-      {
-       // Delete pg
-       RWLock::WLocker l(pg_map_lock);
-       auto p = pg_map.find(pg->get_pgid());
-       assert(p != pg_map.end() && p->second == pg);
-       dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
-       pg_map.erase(p);
-       pg->put("PGMap");
-      }
+      unregister_pg(pg.get());
       recursive_remove_collection(cct, store, pgid, *it);
       continue;
     }
 
     set<spg_t> new_children;
-    service.init_splits_between(pg->pg_id, pg->get_osdmap(), osdmap, &new_children);
-    op_shardedwq.prime_splits(new_children);
+    service.identify_split_children(pg->get_osdmap(), osdmap, pg->pg_id,
+                                   &new_children);
+    if (!new_children.empty()) {
+      for (auto shard : shards) {
+       shard->prime_splits(osdmap, &new_children);
+      }
+      assert(new_children.empty());
+    }
 
     pg->reg_next_scrub();
 
     dout(10) << __func__ << " loaded " << *pg << dendl;
     pg->unlock();
 
-    _register_pg(pg);
+    register_pg(pg);
     ++num;
   }
   dout(0) << __func__ << " opened " << num << " pgs" << dendl;
@@ -4036,18 +4083,6 @@ PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info)
 
   PGRef pg = _open_pg(createmap, pgid);
 
-  // We need to avoid racing with consume_map().  This should get
-  // redone as a structured waterfall of incoming osdmaps from osd ->
-  // shard, and something here that gets the to-be-split pg slots
-  // primed, even when they land on other shards.  (I think it will be
-  // iterative to handle the case where it races with newer incoming
-  // maps.)  For now, just cross our fingers. FIXME
-  {
-    set<spg_t> new_children;
-    service.init_splits_between(pg->pg_id, createmap, osdmap, &new_children);
-    op_shardedwq.prime_splits(new_children);
-  }
-
   pg->lock(true);
 
   // we are holding the shard lock
@@ -7823,15 +7858,16 @@ void OSD::_finish_splits(set<PGRef>& pgs)
     epoch_t e = pg->get_osdmap_epoch();
     pg->unlock();
 
-    service.complete_split(pg->get_pgid());
     service.pg_add_epoch(pg->pg_id, e);
 
     pg->lock();
     pg->handle_initialize(&rctx);
     pg->queue_null(e, e);
     dispatch_context_transaction(rctx, pg);
-    op_shardedwq.wake_pg_split_waiters(pg->get_pgid());
     pg->unlock();
+
+    unsigned shard_index = pg->pg_id.hash_to_shard(num_shards);
+    shards[shard_index]->register_and_wake_split_child(pg);
   }
 
   dispatch_context(rctx, 0, service.get_osdmap());
@@ -7873,7 +7909,6 @@ void OSD::advance_pg(
          lastmap->get_pg_num(pg->pg_id.pool()),
          nextmap->get_pg_num(pg->pg_id.pool()),
          &children)) {
-      service.mark_split_in_progress(pg->pg_id, children);
       split_pgs(
        pg, children, &new_pgs, lastmap, nextmap,
        rctx);
@@ -7906,17 +7941,25 @@ void OSD::consume_map()
 
   int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
 
-  // scan pg's
-  set<spg_t> new_children;
+  unsigned pushes_to_free = 0;
+  set<spg_t> newly_split;
+  for (auto& shard : shards) {
+    shard->consume_map(osdmap, &pushes_to_free, &newly_split);
+  }
+  if (!newly_split.empty()) {
+    for (auto& shard : shards) {
+      shard->prime_splits(osdmap, &newly_split);
+    }
+    assert(newly_split.empty());
+  }
+
+  vector<spg_t> pgids;
+  _get_pgids(&pgids);
+
+  // count (FIXME)
   vector<PGRef> pgs;
   _get_pgs(&pgs);
-  vector<spg_t> pgids;
-  pgids.reserve(pgs.size());
   for (auto& pg : pgs) {
-    pgids.push_back(pg->get_pgid());
-    service.init_splits_between(pg->get_pgid(), service.get_osdmap(), osdmap,
-                               &new_children);
-
     // FIXME: this is lockless and racy, but we don't want to take pg lock
     // here.
     if (pg->is_primary())
@@ -7926,6 +7969,7 @@ void OSD::consume_map()
     else
       num_pg_stray++;
   }
+
   {
     // FIXME: move to OSDShard
     [[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock);
@@ -7939,9 +7983,6 @@ void OSD::consume_map()
     }
   }
 
-  service.expand_pg_num(service.get_osdmap(), osdmap, &new_children);
-  op_shardedwq.prime_splits(new_children);
-
   service.pre_publish_map(osdmap);
   service.await_reserved_maps();
   service.publish_map(osdmap);
@@ -7952,12 +7993,7 @@ void OSD::consume_map()
 
   service.maybe_inject_dispatch_delay();
 
-  // remove any PGs which we no longer host from the pg_slot wait lists
-  dout(20) << __func__ << " checking pg_slot waiters" << dendl;
-  op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
-
-  service.maybe_inject_dispatch_delay();
-
+  // queue null events to push maps down to individual PGs
   for (auto pgid : pgids) {
     enqueue_peering_evt(
       pgid,
@@ -8131,11 +8167,8 @@ void OSD::split_pgs(
   OSDMapRef nextmap,
   PG::RecoveryCtx *rctx)
 {
-  unsigned pg_num = nextmap->get_pg_num(
-    parent->pg_id.pool());
-  parent->update_snap_mapper_bits(
-    parent->get_pgid().get_split_bits(pg_num)
-    );
+  unsigned pg_num = nextmap->get_pg_num(parent->pg_id.pool());
+  parent->update_snap_mapper_bits(parent->get_pgid().get_split_bits(pg_num));
 
   vector<object_stat_sum_t> updated_stats;
   parent->start_split_stats(childpgids, &updated_stats);
@@ -8145,8 +8178,7 @@ void OSD::split_pgs(
        i != childpgids.end();
        ++i, ++stat_iter) {
     assert(stat_iter != updated_stats.end());
-    dout(10) << "Splitting " << *parent << " into " << *i << dendl;
-    assert(service.splitting(*i));
+    dout(10) << __func__ << " splitting " << *parent << " into " << *i << dendl;
     PG* child = _make_pg(nextmap, *i);
     child->lock(true);
     out_pgs->insert(child);
@@ -8988,7 +9020,7 @@ void OSD::dequeue_peering_evt(
   ThreadPool::TPHandle& handle)
 {
   PG::RecoveryCtx rctx = create_context();
-  auto curmap = service.get_osdmap();
+  auto curmap = sdata->osdmap;
   epoch_t need_up_thru = 0, same_interval_since = 0;
   if (!pg) {
     if (const MQuery *q = dynamic_cast<const MQuery*>(evt->evt.get())) {
@@ -9005,12 +9037,7 @@ void OSD::dequeue_peering_evt(
     dispatch_context_transaction(rctx, pg, &handle);
     need_up_thru = pg->get_need_up_thru();
     same_interval_since = pg->get_same_interval_since();
-    bool deleted = pg->is_deleted();
     pg->unlock();
-
-    if (deleted) {
-#warning hmm?
-    }
   }
 
   if (need_up_thru) {
@@ -9402,13 +9429,94 @@ int OSD::init_op_flags(OpRequestRef& op)
 // =============================================================
 
 #undef dout_context
-#define dout_context osd->cct
+#define dout_context cct
 #undef dout_prefix
-#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq "
+#define dout_prefix *_dout << "osd." << osd->get_nodeid() << ":" << shard_id << "." << __func__ << " "
+
+void OSDShard::consume_map(
+  OSDMapRef& new_osdmap,
+  unsigned *pushes_to_free,
+  set<spg_t> *new_children)
+{
+  Mutex::Locker l(sdata_op_ordering_lock);
+  OSDMapRef old_osdmap = std::move(osdmap);
+  osdmap = new_osdmap;
+  dout(10) << new_osdmap->get_epoch()
+           << " (was " << (old_osdmap ? old_osdmap->get_epoch() : 0) << ")"
+          << dendl;
+  bool queued = false;
+
+  // check slots
+  auto p = pg_slots.begin();
+  while (p != pg_slots.end()) {
+    OSDShard::pg_slot& slot = p->second;
+    const spg_t& pgid = p->first;
+    dout(20) << __func__ << " " << pgid << dendl;
+    if (old_osdmap &&
+       (slot->pg || slot->waiting_for_split)) {
+      // only prime children for parent slots that are attached to a
+      // pg or are waiting_for_split (because their ancestor is
+      // attached to a pg).
+      osd->service.identify_split_children(old_osdmap, new_osdmap, pgid,
+                                          new_children);
+    }
+    if (slot.waiting_for_split) {
+      dout(20) << __func__ << "  " << pgid
+              << " waiting for split" << dendl;
+      ++p;
+      continue;
+    }
+    if (!slot.waiting_peering.empty()) {
+      epoch_t first = slot.waiting_peering.begin()->first;
+      if (first <= osdmap->get_epoch()) {
+       dout(20) << __func__ << "  " << pgid
+                << " pending_peering first epoch " << first
+                << " <= " << osdmap->get_epoch() << ", requeueing" << dendl;
+       _wake_pg_slot(pgid, slot);
+       queued = true;
+      }
+      ++p;
+      continue;
+    }
+    if (!slot.waiting.empty()) {
+      if (osdmap->is_up_acting_osd_shard(pgid, osd->get_nodeid())) {
+       dout(20) << __func__ << "  " << pgid << " maps to us, keeping"
+                << dendl;
+       ++p;
+       continue;
+      }
+      while (!slot.waiting.empty() &&
+            slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
+       auto& qi = slot.waiting.front();
+       dout(20) << __func__ << "  " << pgid
+                << " waiting item " << qi
+                << " epoch " << qi.get_map_epoch()
+                << " <= " << osdmap->get_epoch()
+                << ", stale, dropping" << dendl;
+       *pushes_to_free += qi.get_reserved_pushes();
+       slot.waiting.pop_front();
+      }
+      if (slot.waiting.empty() &&
+         slot.num_running == 0 &&
+         !slot.pg) {
+       dout(20) << __func__ << "  " << pgid << " empty, pruning" << dendl;
+       p = pg_slots.erase(p);
+       --osd->num_pgs;
+       continue;
+      }
+    }
+    ++p;
+  }
+  _prime_splits(new_children);
+  if (queued) {
+    sdata_lock.Lock();
+    sdata_cond.SignalOne();
+    sdata_lock.Unlock();
+  }
+}
 
-void OSD::ShardedOpWQ::_wake_pg_slot(
+void OSDShard::_wake_pg_slot(
   spg_t pgid,
-  OSDShard *sdata,
   OSDShard::pg_slot& slot)
 {
   dout(20) << __func__ << " " << pgid
@@ -9418,13 +9526,13 @@ void OSD::ShardedOpWQ::_wake_pg_slot(
   for (auto i = slot.to_process.rbegin();
        i != slot.to_process.rend();
        ++i) {
-    sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+    _enqueue_front(std::move(*i), osd->op_prio_cutoff);
   }
   slot.to_process.clear();
   for (auto i = slot.waiting.rbegin();
        i != slot.waiting.rend();
        ++i) {
-    sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+    _enqueue_front(std::move(*i), osd->op_prio_cutoff);
   }
   slot.waiting.clear();
   for (auto i = slot.waiting_peering.rbegin();
@@ -9433,7 +9541,7 @@ void OSD::ShardedOpWQ::_wake_pg_slot(
     // this is overkill; we requeue everything, even if some of these items are
     // waiting for maps we don't have yet.  FIXME.
     for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
-      sdata->_enqueue_front(std::move(*j), osd->op_prio_cutoff);
+      _enqueue_front(std::move(*j), osd->op_prio_cutoff);
     }
   }
   slot.waiting_peering.clear();
@@ -9441,119 +9549,121 @@ void OSD::ShardedOpWQ::_wake_pg_slot(
   ++slot.requeue_seq;
 }
 
-void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid)
+void OSDShard::prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids)
 {
-  uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
-  auto sdata = osd->shards[shard_index];
-  bool queued = false;
-  {
-    Mutex::Locker l(sdata->sdata_op_ordering_lock);
-    auto p = sdata->pg_slots.find(pgid);
-    if (p != sdata->pg_slots.end()) {
-      _wake_pg_slot(pgid, sdata, p->second);
-      queued = true;
+  Mutex::Locker l(sdata_op_ordering_lock);
+  _prime_splits(pgids);
+  if (osdmap->get_epoch() > as_of_osdmap->get_epoch()) {
+    set<spg_t> newer_children;
+    for (auto pgid : *pgids) {
+      osd->service.identify_split_children(as_of_osdmap, osdmap, pgid,
+                                          &newer_children);
+    }
+    newer_children.insert(pgids->begin(), pgids->end());
+    dout(10) << "as_of_osdmap " << as_of_osdmap->get_epoch() << " < shard "
+            << osdmap->get_epoch() << ", new children " << newer_children
+            << dendl;
+    _prime_splits(&newer_children);
+    // note: we don't care what is left over here for other shards.
+    // if this shard is ahead of us and one isn't, e.g., one thread is
+    // calling into prime_splits via _process (due to a newly created
+    // pg) and this shard has a newer map due to a racing consume_map,
+    // then any grandchildren left here will be identified (or were
+    // identified) when the slower shard's osdmap is advanced.
+    // _prime_splits() will tolerate the case where the pgid is
+    // already primed.
+  }
+}
+
+void OSDShard::_prime_splits(set<spg_t> *pgids)
+{
+  dout(10) << *pgids << dendl;
+  auto p = pgids->begin();
+  while (p != pgids->end()) {
+    unsigned shard_index = p->hash_to_shard(osd->num_shards);
+    if (shard_index == shard_id) {
+      auto i = pg_slots.find(*p);
+      if (i == pg_slots.end()) {
+       dout(10) << "priming slot " << *p << dendl;
+       OSDShard::pg_slot& slot = pg_slots[*p];
+       slot.waiting_for_split = true;
+      } else {
+       auto q = pg_slots.find(*p);
+       assert(q != pg_slots.end());
+       if (q->second->waiting_for_split) {
+         dout(10) << "slot " << *p << " already primed" << dendl;
+       } else {
+         dout(10) << "priming (existing) slot " << *p << dendl;
+         q->second->waiting_for_split = true;
+       }
+      }
+      p = pgids->erase(p);
+    } else {
+      ++p;
     }
-  }
-  if (queued) {
-    sdata->sdata_lock.Lock();
-    sdata->sdata_cond.SignalOne();
-    sdata->sdata_lock.Unlock();
   }
 }
 
-void OSD::ShardedOpWQ::prime_splits(const set<spg_t>& pgs)
+void OSDShard::register_and_wake_split_child(PG *pg)
 {
-  dout(20) << __func__ << " " << pgs << dendl;
-  for (auto pgid : pgs) {
-    unsigned shard_index = pgid.hash_to_shard(osd->shards.size());
-    OSDShard* sdata = osd->shards[shard_index];
-    Mutex::Locker l(sdata->sdata_op_ordering_lock);
-    OSDShard::pg_slot& slot = sdata->pg_slots[pgid];
-    slot.waiting_for_split = true;
+  {
+    Mutex::Locker l(sdata_op_ordering_lock);
+    dout(10) << pg->pg_id << " " << pg << dendl;
+    auto p = pg_slots.find(pg->pg_id);
+    assert(p != pg_slots.end());
+    auto& slot = p->second;
+    assert(!slot.pg);
+    assert(slot.waiting_for_split);
+    slot.pg = pg;
+    ++osd->num_pgs;
+    _wake_pg_slot(pg->pg_id, slot);
   }
+  sdata_lock.Lock();
+  sdata_cond.SignalOne();
+  sdata_lock.Unlock();
 }
 
-void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami)
+void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
 {
-  unsigned pushes_to_free = 0;
-  bool queued = false;
-  for (auto sdata : osd->shards) {
-    Mutex::Locker l(sdata->sdata_op_ordering_lock);
-    sdata->osdmap = osdmap;
-    auto p = sdata->pg_slots.begin();
-    while (p != sdata->pg_slots.end()) {
-      OSDShard::pg_slot& slot = p->second;
-      if (slot.waiting_for_split) {
-       dout(20) << __func__ << "  " << p->first
-                << " waiting for split" << dendl;
-       ++p;
-       continue;
-      }
-      if (!slot.waiting_peering.empty()) {
-       epoch_t first = slot.waiting_peering.begin()->first;
-       if (first <= osdmap->get_epoch()) {
-         dout(20) << __func__ << "  " << p->first
-                  << " pending_peering first epoch " << first
-                  << " <= " << osdmap->get_epoch() << ", requeueing" << dendl;
-         _wake_pg_slot(p->first, sdata, slot);
-         queued = true;
-       }
-       ++p;
-       continue;
-      }
-      if (!slot.waiting.empty()) {
-       if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
-         dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
-                  << dendl;
-         ++p;
-         continue;
-       }
-       while (!slot.waiting.empty() &&
-              slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
-         auto& qi = slot.waiting.front();
-         dout(20) << __func__ << "  " << p->first
-                  << " waiting item " << qi
-                  << " epoch " << qi.get_map_epoch()
-                  << " <= " << osdmap->get_epoch()
-                  << ", stale, dropping" << dendl;
-         pushes_to_free += qi.get_reserved_pushes();
-         slot.waiting.pop_front();
-       }
-       if (slot.waiting.empty() &&
-           slot.num_running == 0 &&
-           !slot.pg) {
-         dout(20) << __func__ << "  " << p->first << " empty, pruning" << dendl;
-         p = sdata->pg_slots.erase(p);
-         --osd->num_pgs;
-         continue;
-       }
-      }
-      ++p;
-    }
-    if (queued) {
-      sdata->sdata_lock.Lock();
-      sdata->sdata_cond.SignalOne();
-      sdata->sdata_lock.Unlock();
+  Mutex::Locker l(sdata_op_ordering_lock);
+  vector<spg_t> to_delete;
+  for (auto& i : pg_slots) {
+    if (i.first != parent &&
+       i.first.get_ancestor(old_pg_num) == parent) {
+      dout(10) << __func__ << " parent " << parent << " clearing " << i.first
+              << dendl;
+      to_delete.push_back(i.first);
     }
   }
-  if (pushes_to_free > 0) {
-    osd->service.release_reserved_pushes(pushes_to_free);
+  for (auto pgid : to_delete) {
+    pg_slots.erase(pgid);
   }
 }
 
-void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg)
+// =============================================================
+
+#undef dout_context
+#define dout_context osd->cct
+#undef dout_prefix
+#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq "
+
+void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid)
 {
-  spg_t pgid = pg->get_pgid();
   uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
   auto sdata = osd->shards[shard_index];
-  Mutex::Locker l(sdata->sdata_op_ordering_lock);
-  auto p = sdata->pg_slots.find(pgid);
-  if (p != sdata->pg_slots.end()) {
-    auto& slot = p->second;
-    assert(!slot.pg || slot.pg == pg);
-    dout(20) << __func__ << " " << pgid << " pg " << pg << " cleared" << dendl;
-    slot.pg = nullptr;
-    --osd->num_pgs;
+  bool queued = false;
+  {
+    Mutex::Locker l(sdata->sdata_op_ordering_lock);
+    auto p = sdata->pg_slots.find(pgid);
+    if (p != sdata->pg_slots.end()) {
+      sdata->_wake_pg_slot(pgid, p->second);
+      queued = true;
+    }
+  }
+  if (queued) {
+    sdata->sdata_lock.Lock();
+    sdata->sdata_cond.SignalOne();
+    sdata->sdata_lock.Unlock();
   }
 }
 
@@ -9672,7 +9782,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   --slot.num_running;
 
   if (slot.to_process.empty()) {
-    // raced with wake_pg_waiters or prune_or_wake_pg_waiters
+    // raced with wake_pg_waiters or consume_map
     dout(20) << __func__ << " " << token
             << " nothing queued" << dendl;
     if (pg) {
@@ -9705,10 +9815,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   slot.to_process.pop_front();
   dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
   unsigned pushes_to_free = 0;
+  set<spg_t> new_children;
+  OSDMapRef osdmap;
 
   while (!pg) {
     // should this pg shard exist on this osd in this (or a later) epoch?
-    OSDMapRef osdmap = sdata->osdmap;
+    osdmap = sdata->osdmap;
     const PGCreateInfo *create_info = qi.creates_pg();
     if (slot.waiting_for_split) {
       dout(20) << __func__ << " " << token
@@ -9739,7 +9851,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
              // we created the pg! drop out and continue "normally"!
              slot.pg = pg;           // install in shard slot
              ++osd->num_pgs;
-             _wake_pg_slot(token, sdata, slot);
+             sdata->_wake_pg_slot(token, slot);
+
+             // identify split children between create epoch and shard epoch.
+             osd->service.identify_split_children(
+               pg->get_osdmap(), osdmap, pg->pg_id, &new_children);
+             sdata->_prime_splits(&new_children);
+             // distribute remaining split children to other shards below!
              break;
            }
            dout(20) << __func__ << " ignored create on " << qi << dendl;
@@ -9796,6 +9914,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   }
   sdata->sdata_op_ordering_lock.Unlock();
 
+  if (!new_children.empty()) {
+    for (auto shard : osd->shards) {
+      shard->prime_splits(osdmap, &new_children);
+    }
+    assert(new_children.empty());
+  }
   if (pushes_to_free) {
     osd->service.release_reserved_pushes(pushes_to_free);
   }
index b1742dae1f7269eff036edbe0a05242e2a24ce36..6722910b410af8e109a487d7d31987f880d11245 100644 (file)
@@ -799,7 +799,7 @@ public:
   void queue_for_snap_trim(PG *pg);
   void queue_for_scrub(PG *pg, bool with_high_priority);
   void queue_for_pg_delete(spg_t pgid, epoch_t e);
-  void finish_pg_delete(PG *pg);
+  void finish_pg_delete(PG *pg, unsigned old_pg_num);
 
 private:
   // -- pg recovery and associated throttling --
@@ -922,6 +922,13 @@ public:
     return get_deleted_pool_pg_num(pool);
   }
 
+  /// identify split child pgids over a osdmap interval
+  void identify_split_children(
+    OSDMapRef old_map,
+    OSDMapRef new_map,
+    spg_t pgid,
+    set<spg_t> *new_children);
+
   void need_heartbeat_peer_update();
 
   void init();
@@ -1115,6 +1122,9 @@ enum class io_queue {
 };
 
 struct OSDShard {
+  const unsigned shard_id;
+  CephContext *cct;
+  OSD *osd;
   Mutex sdata_lock;
   Cond sdata_cond;
 
@@ -1142,7 +1152,7 @@ struct OSDShard {
 
   /// map of slots for each spg_t.  maintains ordering of items dequeued
   /// from pqueue while _process thread drops shard lock to acquire the
-  /// pg lock.  slots are removed only by prune_or_wake_pg_waiters.
+  /// pg lock.  slots are removed by consume_map.
   unordered_map<spg_t,pg_slot> pg_slots;
 
   /// priority queue
@@ -1163,11 +1173,30 @@ struct OSDShard {
        priority, cost, std::move(item));
   }
 
+  /// push osdmap into shard
+  void consume_map(
+    OSDMapRef& osdmap,
+    unsigned *pushes_to_free,
+    set<spg_t> *new_children);
+
+  void _wake_pg_slot(spg_t pgid, OSDShard::pg_slot& slot);
+
+  void _prime_splits(set<spg_t> *pgids);
+  void prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids);
+  void register_and_wake_split_child(PG *pg);
+  void unprime_split_children(spg_t parent, unsigned old_pg_num);
+
   OSDShard(
+    int id,
+    CephContext *cct,
+    OSD *osd,
     string lock_name, string ordering_lock,
-    uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+    uint64_t max_tok_per_prio, uint64_t min_cost,
     io_queue opqueue)
-    : sdata_lock(lock_name.c_str(), false, true, false, cct),
+    : shard_id(id),
+      cct(cct),
+      osd(osd),
+      sdata_lock(lock_name.c_str(), false, true, false, cct),
       sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
                             false, cct) {
     if (opqueue == io_queue::weightedpriority) {
@@ -1610,7 +1639,9 @@ private:
   friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
 
   const io_queue op_queue;
+public:
   const unsigned int op_prio_cutoff;
+protected:
 
   /*
    * The ordered op delivery chain is:
@@ -1659,17 +1690,6 @@ private:
     /// wake any pg waiters after a PG is split
     void wake_pg_split_waiters(spg_t pgid);
 
-    void _wake_pg_slot(spg_t pgid, OSDShard *sdata, OSDShard::pg_slot& slot);
-
-    /// prime slots for splitting pgs
-    void prime_splits(const set<spg_t>& pgs);
-
-    /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
-    void prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami);
-
-    /// clear cached PGRef on pg deletion
-    void clear_pg_pointer(PG *pg);
-
     /// clear pg_slots on shutdown
     void clear_pg_slots();
 
@@ -1839,10 +1859,10 @@ public:
   vector<OSDShard*> shards;
   uint32_t num_shards = 0;
 
-protected:
   // -- placement groups --
   std::atomic<size_t> num_pgs = {0};
 
+protected:
   std::mutex pending_creates_lock;
   using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
   std::set<create_from_osd_t> pending_creates_from_osd;
@@ -1852,7 +1872,8 @@ protected:
 
   PGRef _lookup_pg(spg_t pgid);
   PG   *_lookup_lock_pg(spg_t pgid);
-  void _register_pg(PGRef pg);
+  void register_pg(PGRef pg);
+  void unregister_pg(PG *pg);
 
   void _get_pgs(vector<PGRef> *v, bool clear_too=false);
   void _get_pgids(vector<spg_t> *v);
index 26edecd6642971d76c570c002dfab5d372638999..de88e9ef58825e1646c0565019f59c19873d74ea 100644 (file)
@@ -6543,7 +6543,7 @@ void PG::_delete_some()
       osd->meta_ch, std::move(t));
     assert(r == 0);
 
-    osd->finish_pg_delete(this);
+    osd->finish_pg_delete(this, pool.info.get_pg_num());
     deleted = true;
 
     // cancel reserver here, since the PG is about to get deleted and the
index 49304b6600a2792ea44cc6702673a204a551213c..0a41f5eb1b73961baa583509728cf1041dbd7c3c 100644 (file)
@@ -520,6 +520,11 @@ struct spg_t {
   bool parse(const std::string& s) {
     return parse(s.c_str());
   }
+
+  spg_t get_ancestor(unsigned old_pg_num) const {
+    return spg_t(pgid.get_ancestor(old_pg_num), shard);
+  }
+
   bool is_split(unsigned old_pg_num, unsigned new_pg_num,
                set<spg_t> *pchildren) const {
     set<pg_t> _children;