   // Shutdown PGs
   {
-    RWLock::RLocker l(pg_map_lock);
-    for (auto& p : pg_map) {
-      p.second->shutdown();
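+    // snapshot refs under pg_map_lock, then call shutdown() with the lock
+    // dropped so PG::shutdown() can take the PG lock safely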
+    set<PGRef> pgs;
+    {
+      RWLock::RLocker l(pg_map_lock);
+      for (auto& p : pg_map) {
+       pgs.insert(p.second);
+      }
+    }
+    for (auto pg : pgs) {
+      pg->shutdown();
     }
   }
 
 #ifdef PG_DEBUG_REFS
   service.dump_live_pgids();
 #endif
-  {
-    RWLock::RLocker l(pg_map_lock);
-    for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
-        p != pg_map.end();
-        ++p) {
-      if (p->second->is_deleted()) {
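+  // drain pg_map in passes: anything registered while we were processing
+  // the previous batch is picked up on the next iteration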
+  while (true) {
+    set<PGRef> pgs;
+    {
+      RWLock::WLocker l(pg_map_lock);
+      for (auto& i : pg_map) {
+       pgs.insert(i.second);
+      }
+      pg_map.clear();
+    }
+    if (pgs.empty()) {
+      break;
+    }
+    for (auto& pg : pgs) {
+      if (pg->is_deleted()) {
        continue;
       }
-      dout(20) << " kicking pg " << p->first << dendl;
-      p->second->lock();
-      if (p->second->get_num_ref() != 1) {
-        derr << "pgid " << p->first << " has ref count of "
-            << p->second->get_num_ref() << dendl;
+      dout(20) << " kicking pg " << pg << dendl;
+      pg->lock();
+      if (pg->get_num_ref() != 1) {
+       derr << "pgid " << pg->get_pgid() << " has ref count of "
+            << pg->get_num_ref() << dendl;
 #ifdef PG_DEBUG_REFS
-       p->second->dump_live_ids();
+       pg->dump_live_ids();
 #endif
        if (cct->_conf->osd_shutdown_pgref_assert) {
          ceph_abort();
        }
       }
-      p->second->ch.reset();
-      p->second->unlock();
-      p->second->put("PGMap");
+      pg->ch.reset();
+      pg->unlock();
+      pg->put("PGMap");
     }
-    pg_map.clear();
   }
 #ifdef PG_DEBUG_REFS
   service.dump_live_pgids();
 // ======================================================
 // PG's
 
-PG *OSD::_open_lock_pg(
+PGRef OSD::_open_pg(
   OSDMapRef createmap,
-  spg_t pgid, bool no_lockdep_check)
+  OSDMapRef servicemap,
+  spg_t pgid)
 {
-  assert(osd_lock.is_locked());
-
   PG* pg = _make_pg(createmap, pgid);
   {
     RWLock::WLocker l(pg_map_lock);
-    pg->lock(no_lockdep_check);
     pg_map[pgid] = pg;
     pg->get("PGMap");  // because it's in pg_map
     service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
+    service.init_splits_between(pgid, createmap, servicemap);
   }
   return pg;
 }
 
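+// compatibility wrapper: preserve the old "return a locked bare PG*"
+// contract on top of _open_pg()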
+PG *OSD::_open_lock_pg(
+  OSDMapRef createmap,
+  OSDMapRef servicemap,
+  spg_t pgid, bool no_lockdep_check)
+{
+  PGRef pg = _open_pg(createmap, servicemap, pgid);
+  pg->lock(no_lockdep_check);
+  return pg.get();
+}
+
 PG* OSD::_make_pg(
   OSDMapRef createmap,
   spg_t pgid)
 }
 
 
-void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
-{
-  epoch_t e(service.get_osdmap()->get_epoch());
-  pg->get("PGMap");  // For pg_map
-  pg_map[pg->pg_id] = pg;
-  service.pg_add_epoch(pg->pg_id, pg->get_osdmap()->get_epoch());
-
-  dout(10) << "Adding newly split pg " << *pg << dendl;
-  pg->handle_loaded(rctx);
-  pg->queue_null(e, e);
-  map<spg_t, list<PGPeeringEventRef> >::iterator to_wake =
-    peering_wait_for_split.find(pg->pg_id);
-  if (to_wake != peering_wait_for_split.end()) {
-    for (list<PGPeeringEventRef>::iterator i =
-          to_wake->second.begin();
-        i != to_wake->second.end();
-        ++i) {
-      enqueue_peering_evt(pg->get_pgid(), *i);
-    }
-    peering_wait_for_split.erase(to_wake);
-  }
-}
-
 PG *OSD::_create_lock_pg(
   OSDMapRef createmap,
   spg_t pgid,
   const PastIntervals& pi,
   ObjectStore::Transaction& t)
 {
-  assert(osd_lock.is_locked());
   dout(20) << "_create_lock_pg pgid " << pgid << dendl;
 
-  PG *pg = _open_lock_pg(createmap, pgid, true);
+  PG *pg = _open_lock_pg(createmap, service.get_osdmap(), pgid, true);
 
-  service.init_splits_between(pgid, pg->get_osdmap(), service.get_osdmap());
   pg->init(
     role,
     up,
   return pg;
 }
 
-PG *OSD::_lookup_lock_pg(spg_t pgid)
+PGRef OSD::_lookup_pg(spg_t pgid)
 {
   while (true) {
     {
+      RWLock::RLocker l(pg_map_lock);
+      auto p = pg_map.find(pgid);
       if (p == pg_map.end()) {
        return nullptr;
       }
-      PG *pg = p->second;
-      pg->lock();
+      PGRef pg = p->second;
       if (!pg->is_deleted()) {
        return pg;
       }
-      pg->unlock();
     }
     // try again, this time with a write lock
     {
+      RWLock::WLocker l(pg_map_lock);
+      auto p = pg_map.find(pgid);
       if (p == pg_map.end()) {
        return nullptr;
       }
-      PG *pg = p->second;
-      pg->lock();
+      PGRef pg = p->second;
       if (!pg->is_deleted()) {
        return pg;
       }
       pg_map.erase(p);
       pg->put("PGMap");
-      pg->unlock();
     }
     return nullptr;
   }
 }
 
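+// as _lookup_pg(), but return the PG locked; is_deleted() must be rechecked
+// after taking the PG lock because deletion can race with the lookup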
+PG *OSD::_lookup_lock_pg(spg_t pgid)
+{
+  PGRef pg = _lookup_pg(pgid);
+  if (!pg) {
+    return nullptr;
+  }
+  pg->lock();
+  if (!pg->is_deleted()) {
+    return pg.get();
+  }
+  pg->unlock();
+  return nullptr;
+}
+
 PG *OSD::lookup_lock_pg(spg_t pgid)
 {
   return _lookup_lock_pg(pgid);
          assert(0 == "Missing map in load_pgs");
        }
       }
-      pg = _open_lock_pg(pgosdmap, pgid);
+      pg = _open_lock_pg(pgosdmap, osdmap, pgid);
     } else {
-      pg = _open_lock_pg(osdmap, pgid);
+      pg = _open_lock_pg(osdmap, osdmap, pgid);
     }
     // there can be no waiters here, so we don't call wake_pg_waiters
 
 }
 
 
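+// create, register, and initialize a pg for a PGCreateInfo coming off the op
+// queue (see the ShardedOpWQ changes below); the pg is returned locked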
+PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info)
+{
+  spg_t pgid = info->pgid;
+
+  int up_primary, acting_primary;
+  vector<int> up, acting;
+  osdmap->pg_to_up_acting_osds(
+    pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+  /*
+  const bool is_mon_create =
+    evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+  if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+    return nullptr;
+  }
+  */
+
+  PG::RecoveryCtx rctx = create_context();
+
+  const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool());
+  if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) &&
+      store->get_type() != "bluestore") {
+    clog->warn() << "pg " << pgid
+                << " is at risk of silent data corruption: "
+                << "the pool allows ec overwrites but is not stored in "
+                << "bluestore, so deep scrubbing will not detect bitrot";
+  }
+  PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
+  PG::_init(*rctx.transaction, pgid, pp);
+
+  int role = osdmap->calc_pg_role(whoami, acting, acting.size());
+  if (!pp->is_replicated() && role != pgid.shard) {
+    role = -1;
+  }
+
+  PGRef pg = _open_pg(get_map(info->epoch), osdmap, pgid);
+
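+  // PG::lock(bool no_lockdep): skip the lockdep check for this acquisition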
+  pg->lock(true);
+
+  // we are holding the shard lock
+  assert(!pg->is_deleted());
+
+  pg->init(
+    role,
+    up,
+    up_primary,
+    acting,
+    acting_primary,
+    info->history,
+    info->past_intervals,
+    false,
+    rctx.transaction);
+
+  pg->handle_initialize(&rctx);
+  pg->handle_activate_map(&rctx);
+  rctx.created_pgs.insert(pg);
+
+  dispatch_context(rctx, pg.get(), osdmap, nullptr);
+
+  dout(10) << *pg << " is new" << dendl;
+  return pg;
+}
+
 /*
  * look up a pg.  if we have it, great.  if not, consider creating it IF the pg mapping
  * hasn't changed since the given epoch and we are the primary.
   }
 }
 
-struct C_CompleteSplits : public Context {
+struct C_FinishSplits : public Context {
   OSD *osd;
   set<PGRef> pgs;
-  C_CompleteSplits(OSD *osd, const set<PGRef> &in)
+  C_FinishSplits(OSD *osd, const set<PGRef> &in)
     : osd(osd), pgs(in) {}
   void finish(int r) override {
-    Mutex::Locker l(osd->osd_lock);
-    if (osd->is_stopping())
-      return;
-    PG::RecoveryCtx rctx = osd->create_context();
-    for (set<PGRef>::iterator i = pgs.begin();
-        i != pgs.end();
-        ++i) {
-      osd->pg_map_lock.get_write();
-      (*i)->lock();
-      PG *pg = i->get();
-      osd->add_newly_split_pg(pg, &rctx);
-      osd->service.complete_split((*i)->get_pgid());
-      osd->pg_map_lock.put_write();
-      osd->dispatch_context_transaction(rctx, pg);
-      osd->wake_pg_waiters(*i);
-      (*i)->unlock();
-    }
+    osd->_finish_splits(pgs);
+  }
+};
+
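+// run from C_FinishSplits (on_applied): register the split children in
+// pg_map and kick their peering via a null event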
+void OSD::_finish_splits(set<PGRef>& pgs)
+{
+  dout(10) << __func__ << " " << pgs << dendl;
+  Mutex::Locker l(osd_lock);
+  if (is_stopping())
+    return;
+  PG::RecoveryCtx rctx = create_context();
+  for (set<PGRef>::iterator i = pgs.begin();
+       i != pgs.end();
+       ++i) {
+    PG *pg = i->get();
+
+    pg->lock();
+    dout(10) << __func__ << " " << *pg << dendl;
+    epoch_t e = pg->get_osdmap()->get_epoch();
+    pg->unlock();
 
-    osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
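+    // the PG lock is dropped above before taking pg_map_lock, then re-taken
+    // below, so the two locks are never held together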
+    pg_map_lock.get_write();
+    pg->get("PGMap");  // For pg_map
+    pg_map[pg->get_pgid()] = pg;
+    service.complete_split(pg->get_pgid());
+    service.pg_add_epoch(pg->pg_id, e);
+    pg_map_lock.put_write();
+
+    pg->lock();
+    pg->handle_initialize(&rctx);
+    pg->queue_null(e, e);
+    dispatch_context_transaction(rctx, pg);
+    wake_pg_waiters(pg);
+    pg->unlock();
   }
+
+  dispatch_context(rctx, 0, service.get_osdmap());
 };
 
 void OSD::advance_pg(
   service.pg_update_epoch(pg->pg_id, lastmap->get_epoch());
 
   if (!new_pgs.empty()) {
-    rctx->on_applied->add(new C_CompleteSplits(this, new_pgs));
+    rctx->on_applied->add(new C_FinishSplits(this, new_pgs));
   }
 }
 
   int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
 
   // scan pg's
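+  // collect the pgids here; the matching null peering events are queued
+  // further down, once pg_map_lock has been released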
+  vector<spg_t> pgids;
   {
     RWLock::RLocker l(pg_map_lock);
     for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
         it != pg_map.end();
         ++it) {
+      pgids.push_back(it->first);
       PG *pg = it->second;
       if (pg->is_deleted()) {
        continue;
       }
-      pg->lock();
+      service.init_splits_between(it->first, service.get_osdmap(), osdmap);
+
+      // FIXME: this is lockless and racy, but we don't want to take pg lock
+      // here.
       if (pg->is_primary())
         num_pg_primary++;
       else if (pg->is_replica())
         num_pg_replica++;
       else
         num_pg_stray++;
-
-      service.init_splits_between(it->first, service.get_osdmap(), osdmap);
-
-      pg->unlock();
     }
 
     [[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock);
 
   service.maybe_inject_dispatch_delay();
 
-  // scan pg's
-  {
-    RWLock::RLocker l(pg_map_lock);
-    for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
-        it != pg_map.end();
-        ++it) {
-      enqueue_peering_evt(
-       it->first,
-       PGPeeringEventRef(
-         std::make_shared<PGPeeringEvent>(
-           osdmap->get_epoch(),
-           osdmap->get_epoch(),
-           NullEvt())));
-    }
-
-    logger->set(l_osd_pg, pg_map.size());
+  for (auto pgid : pgids) {
+    enqueue_peering_evt(
+      pgid,
+      PGPeeringEventRef(
+       std::make_shared<PGPeeringEvent>(
+         osdmap->get_epoch(),
+         osdmap->get_epoch(),
+         NullEvt())));
   }
+  logger->set(l_osd_pg, pgids.size());
   logger->set(l_osd_pg_primary, num_pg_primary);
   logger->set(l_osd_pg_replica, num_pg_replica);
   logger->set(l_osd_pg_stray, num_pg_stray);
     ctx.transaction = new ObjectStore::Transaction;
     ctx.on_applied = new C_Contexts(cct);
     ctx.on_safe = new C_Contexts(cct);
+    ctx.created_pgs.clear();
   }
 }
 
     delete (ctx.transaction);
     assert(tr == 0);
+    ctx.created_pgs.clear();
   }
 }
 
   PGPeeringEventRef evt,
   ThreadPool::TPHandle& handle)
 {
-  auto curmap = service.get_osdmap();
   PG::RecoveryCtx rctx = create_context();
-  if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
-    advance_pg(curmap->get_epoch(), pg, handle, &rctx);
-  }
-  pg->do_peering_event(evt, &rctx);
-  auto need_up_thru = pg->get_need_up_thru();
-  auto same_interval_since = pg->get_same_interval_since();
-  dispatch_context_transaction(rctx, pg, &handle);
-  bool deleted = pg->is_deleted();
-  pg->unlock();
-
-  if (deleted) {
-    RWLock::WLocker l(pg_map_lock);
-    auto p = pg_map.find(pg->get_pgid());
-    if (p != pg_map.end() &&
-       p->second == pg) {
-      dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
-      pg_map.erase(p);
-      pg->put("PGMap");
+  auto curmap = service.get_osdmap();
+  epoch_t need_up_thru = 0, same_interval_since = 0;
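+  // pg is null for pg-less events; the only kind we expect here is a query
+  // for a pg we don't have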
+  if (!pg) {
+    if (const MQuery *q = dynamic_cast<const MQuery*>(evt->evt.get())) {
+      handle_pg_query_nopg(*q);
     } else {
-      dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl;
+      derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
+      ceph_abort();
+    }
+  } else {
+    if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
+      advance_pg(curmap->get_epoch(), pg, handle, &rctx);
+    }
+    pg->do_peering_event(evt, &rctx);
+    dispatch_context_transaction(rctx, pg, &handle);
+    need_up_thru = pg->get_need_up_thru();
+    same_interval_since = pg->get_same_interval_since();
+    bool deleted = pg->is_deleted();
+    pg->unlock();
+
+    if (deleted) {
+      RWLock::WLocker l(pg_map_lock);
+      auto p = pg_map.find(pg->get_pgid());
+      if (p != pg_map.end() &&
+         p->second == pg) {
+       dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
+       pg_map.erase(p);
+       pg->put("PGMap");
+      } else {
+       dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl;
+      }
     }
   }
 
   if (need_up_thru) {
     queue_want_up_thru(same_interval_since);
   }
-  dispatch_context(rctx, 0, curmap, &handle);
+  dispatch_context(rctx, pg, curmap, &handle);
 
   service.send_pg_temp();
 }
 void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
 {
   unsigned pushes_to_free = 0;
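+  // set when we requeue work below so the shard's workers get poked; note it
+  // is never cleared, so once set, later shards also get a (harmless) wakeup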
+  bool queued = false;
   for (auto sdata : shard_list) {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     sdata->waiting_for_pg_osdmap = osdmap;
       ShardData::pg_slot& slot = p->second;
       if (!slot.to_process.empty() && slot.num_running == 0) {
        if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
-         dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
-                  << dendl;
+         if (slot.pending_nopg) {
+           dout(20) << __func__ << "  " << p->first << " maps to us, pending create,"
+                    << " requeuing" << dendl;
+           for (auto& q : slot.to_process) {
+             pushes_to_free += q.get_reserved_pushes();
+           }
+           for (auto i = slot.to_process.rbegin();
+                i != slot.to_process.rend();
+                ++i) {
+             sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+           }
+           slot.to_process.clear();
+           slot.waiting_for_pg = false;
+           slot.pending_nopg = false;
+           ++slot.requeue_seq;
+           queued = true;
+         } else {
+           dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
+                    << dendl;
+         }
          ++p;
          continue;
        }
        ++p;
       }
     }
+    if (queued) {
+      sdata->sdata_lock.Lock();
+      sdata->sdata_cond.SignalOne();
+      sdata->sdata_lock.Unlock();
+    }
   }
   if (pushes_to_free > 0) {
     osd->service.release_reserved_pushes(pushes_to_free);
     return;
   }
 
+  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+                                suicide_interval);
+
   // take next item
   auto qi = std::move(slot.to_process.front());
   slot.to_process.pop_front();
   dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
+  unsigned pushes_to_free = 0;
 
-  if (!pg) {
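+  // a loop rather than an if: the create path below may hand us the pg, in
+  // which case we break out and proceed as if it had been there all along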
+  while (!pg) {
     // should this pg shard exist on this osd in this (or a later) epoch?
     OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
-    if (osdmap->is_up_acting_osd_shard(token,
-                                      osd->whoami)) {
-      dout(20) << __func__ << " " << token
-              << " no pg, should exist, will wait" << " on " << qi << dendl;
-      slot.to_process.push_front(std::move(qi));
-      slot.waiting_for_pg = true;
-    } else if (qi.get_map_epoch() > osdmap->get_epoch()) {
+    const PGCreateInfo *create_info = qi.creates_pg();
+    if (qi.get_map_epoch() > osdmap->get_epoch()) {
       dout(20) << __func__ << " " << token
               << " no pg, item epoch is "
               << qi.get_map_epoch() << " > " << osdmap->get_epoch()
               << ", will wait on " << qi << dendl;
+      if (!!create_info || !qi.requires_pg()) {
+       slot.pending_nopg = true;
+      }
       slot.to_process.push_front(std::move(qi));
       slot.waiting_for_pg = true;
+    } else if (osdmap->is_up_acting_osd_shard(token, osd->whoami)) {
+      if (osd->service.splitting(token)) {
+       dout(20) << __func__ << " " << token
+                << " splitting, waiting on " << qi << dendl;
+       slot.to_process.push_front(std::move(qi));
+       slot.waiting_for_pg = true;
+      } else if (create_info) {
+       dout(20) << __func__ << " " << token
+                << " no pg, should create on " << qi << dendl;
+       pg = osd->handle_pg_create_info(osdmap, create_info);
+       if (pg) {
+         // we created the pg! drop out and continue "normally"!
+         _wake_pg_slot(token, sdata, slot, &pushes_to_free);
+         break;
+       }
+       dout(20) << __func__ << " ignored create on " << qi << dendl;
+      } else if (!qi.requires_pg()) {
+       // for pg-less events, we run them under the ordering lock, since
+       // we don't have the pg lock to keep them ordered.
+       qi.run(osd, pg, tp_handle);
+       sdata->sdata_op_ordering_lock.Unlock();
+       return;
+      } else {
+       dout(20) << __func__ << " " << token
+                << " no pg, should exist, will wait on " << qi << dendl;
+       slot.to_process.push_front(std::move(qi));
+       slot.waiting_for_pg = true;
+      }
     } else {
       dout(20) << __func__ << " " << token
               << " no pg, shouldn't exist,"
   }
   sdata->sdata_op_ordering_lock.Unlock();
 
+  if (pushes_to_free) {
+    osd->service.release_reserved_pushes(pushes_to_free);
+  }
 
   // osd_opwq_process marks the point at which an operation has been dequeued
   // and will begin to be handled by a worker thread.
   delete f;
   *_dout << dendl;
 
-  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
-                                suicide_interval);
   qi.run(osd, pg, tp_handle);
 
   {