osd: fast dispatch peering events (part 1)
author    Sage Weil <sage@redhat.com>
          Tue, 2 Jan 2018 21:35:44 +0000 (15:35 -0600)
committer Sage Weil <sage@redhat.com>
          Tue, 3 Apr 2018 15:12:35 +0000 (10:12 -0500)
This is a big commit that lays out the infrastructure changes to fast
dispatch the remaining peering events.  It's hard to separate it all out,
so this probably doesn't quite build; it's just easier to review as a
separate patch.

- lock ordering for pg_map has changed (a sketch of the new nesting follows):
  before:
    OSD::pg_map_lock
      PG::lock
        ShardData::lock

  after:
    PG::lock
      ShardData::lock
        OSD::pg_map_lock
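
  A minimal sketch of what the new nesting allows (illustrative only, not
  part of this patch; ShardData's lock is sdata_op_ordering_lock in the
  code below):

    // hypothetical: code already holding the pg lock may take the shard
    // lock and then pg_map_lock, but never in the reverse order
    pg->lock();                                        // PG::lock, outermost
    {
      Mutex::Locker s(sdata->sdata_op_ordering_lock);  // ShardData lock
      RWLock::RLocker m(osd->pg_map_lock);             // pg_map_lock, innermost
      // e.g., consult pg_map while holding the pg and shard locks
    }
    pg->unlock();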

- queue items are now annotated with whether they can proceed without a
pg at all (e.g., a query) or can instantiate a pg (e.g., notify, log, etc.);
a condensed view of the interface follows.
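
  Condensed from the OpQueueItem.h hunk below (comments added here):

    // PGOpQueueable base: by default an item requires an existing pg
    virtual bool requires_pg() const {
      return true;
    }
    // a non-null PGCreateInfo means the item can instantiate the pg
    virtual const PGCreateInfo *creates_pg() const {
      return nullptr;
    }

    // PGPeeringItem defers to the event it wraps
    bool requires_pg() const override {
      return evt->requires_pg;
    }
    const PGCreateInfo *creates_pg() const override {
      return evt->create_info.get();
    }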

- There is some wonkiness around getting the initial Initialize event to
a newly-created PG (the flow is sketched below).  I don't love it, but it
gets the job done for now.
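
  Condensed from handle_pg_create_info in the OSD.cc hunk below:

    PGRef pg = _open_pg(get_map(info->epoch), osdmap, pgid); // registers pg in pg_map
    pg->lock(true);
    pg->init(role, up, up_primary, acting, acting_primary,
             info->history, info->past_intervals, false, rctx.transaction);
    pg->handle_initialize(&rctx);    // hands the new pg its Initialize event
    pg->handle_activate_map(&rctx);  // then catches it up to the current map
    rctx.created_pgs.insert(pg);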

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpQueueItem.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PGPeeringEvent.h

index 2df4556ceb76231ce642b0225458d7a2a0a26748..cb346bff64a9372b9b25a9c2a1dd385243ede063 100644 (file)
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -3361,9 +3361,15 @@ int OSD::shutdown()
 
   // Shutdown PGs
   {
-    RWLock::RLocker l(pg_map_lock);
-    for (auto& p : pg_map) {
-      p.second->shutdown();
+    set<PGRef> pgs;
+    {
+      RWLock::RLocker l(pg_map_lock);
+      for (auto& p : pg_map) {
+       pgs.insert(p.second);
+      }
+    }
+    for (auto pg : pgs) {
+      pg->shutdown();
     }
   }
 
@@ -3470,31 +3476,38 @@ int OSD::shutdown()
 #ifdef PG_DEBUG_REFS
   service.dump_live_pgids();
 #endif
-  {
-    RWLock::RLocker l(pg_map_lock);
-    for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
-        p != pg_map.end();
-        ++p) {
-      if (p->second->is_deleted()) {
+  while (true) {
+    set<PGRef> pgs;
+    {
+      RWLock::WLocker l(pg_map_lock);
+      for (auto& i : pg_map) {
+       pgs.insert(i.second);
+      }
+      pg_map.clear();
+    }
+    if (pgs.empty()) {
+      break;
+    }
+    for (auto& pg : pgs) {
+      if (pg->is_deleted()) {
        continue;
       }
-      dout(20) << " kicking pg " << p->first << dendl;
-      p->second->lock();
-      if (p->second->get_num_ref() != 1) {
-        derr << "pgid " << p->first << " has ref count of "
-            << p->second->get_num_ref() << dendl;
+      dout(20) << " kicking pg " << pg << dendl;
+      pg->lock();
+      if (pg->get_num_ref() != 1) {
+       derr << "pgid " << pg->get_pgid() << " has ref count of "
+            << pg->get_num_ref() << dendl;
 #ifdef PG_DEBUG_REFS
-       p->second->dump_live_ids();
+       pg->dump_live_ids();
 #endif
        if (cct->_conf->osd_shutdown_pgref_assert) {
          ceph_abort();
        }
       }
-      p->second->ch.reset();
-      p->second->unlock();
-      p->second->put("PGMap");
+      pg->ch.reset();
+      pg->unlock();
+      pg->put("PGMap");
     }
-    pg_map.clear();
   }
 #ifdef PG_DEBUG_REFS
   service.dump_live_pgids();
@@ -3780,23 +3793,32 @@ void OSD::recursive_remove_collection(CephContext* cct,
 // ======================================================
 // PG's
 
-PG *OSD::_open_lock_pg(
+PGRef OSD::_open_pg(
   OSDMapRef createmap,
-  spg_t pgid, bool no_lockdep_check)
+  OSDMapRef servicemap,
+  spg_t pgid)
 {
-  assert(osd_lock.is_locked());
-
   PG* pg = _make_pg(createmap, pgid);
   {
     RWLock::WLocker l(pg_map_lock);
-    pg->lock(no_lockdep_check);
     pg_map[pgid] = pg;
     pg->get("PGMap");  // because it's in pg_map
     service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
+    service.init_splits_between(pgid, createmap, servicemap);
   }
   return pg;
 }
 
+PG *OSD::_open_lock_pg(
+  OSDMapRef createmap,
+  OSDMapRef servicemap,
+  spg_t pgid, bool no_lockdep_check)
+{
+  PGRef pg = _open_pg(createmap, servicemap, pgid);
+  pg->lock();
+  return pg.get();
+}
+
 PG* OSD::_make_pg(
   OSDMapRef createmap,
   spg_t pgid)
@@ -3828,29 +3850,6 @@ PG* OSD::_make_pg(
 }
 
 
-void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
-{
-  epoch_t e(service.get_osdmap()->get_epoch());
-  pg->get("PGMap");  // For pg_map
-  pg_map[pg->pg_id] = pg;
-  service.pg_add_epoch(pg->pg_id, pg->get_osdmap()->get_epoch());
-
-  dout(10) << "Adding newly split pg " << *pg << dendl;
-  pg->handle_loaded(rctx);
-  pg->queue_null(e, e);
-  map<spg_t, list<PGPeeringEventRef> >::iterator to_wake =
-    peering_wait_for_split.find(pg->pg_id);
-  if (to_wake != peering_wait_for_split.end()) {
-    for (list<PGPeeringEventRef>::iterator i =
-          to_wake->second.begin();
-        i != to_wake->second.end();
-        ++i) {
-      enqueue_peering_evt(pg->get_pgid(), *i);
-    }
-    peering_wait_for_split.erase(to_wake);
-  }
-}
-
 PG *OSD::_create_lock_pg(
   OSDMapRef createmap,
   spg_t pgid,
@@ -3863,12 +3862,10 @@ PG *OSD::_create_lock_pg(
   const PastIntervals& pi,
   ObjectStore::Transaction& t)
 {
-  assert(osd_lock.is_locked());
   dout(20) << "_create_lock_pg pgid " << pgid << dendl;
 
-  PG *pg = _open_lock_pg(createmap, pgid, true);
+  PG *pg = _open_lock_pg(createmap, service.get_osdmap(), pgid, true);
 
-  service.init_splits_between(pgid, pg->get_osdmap(), service.get_osdmap());
   pg->init(
     role,
     up,
@@ -3886,7 +3883,7 @@ PG *OSD::_create_lock_pg(
   return pg;
 }
 
-PG *OSD::_lookup_lock_pg(spg_t pgid)
+PGRef OSD::_lookup_pg(spg_t pgid)
 {
   while (true) {
     {
@@ -3895,12 +3892,10 @@ PG *OSD::_lookup_lock_pg(spg_t pgid)
       if (p == pg_map.end()) {
        return nullptr;
       }
-      PG *pg = p->second;
-      pg->lock();
+      PGRef pg = p->second;
       if (!pg->is_deleted()) {
        return pg;
       }
-      pg->unlock();
     }
     // try again, this time with a write lock
     {
@@ -3909,19 +3904,31 @@ PG *OSD::_lookup_lock_pg(spg_t pgid)
       if (p == pg_map.end()) {
        return nullptr;
       }
-      PG *pg = p->second;
-      pg->lock();
+      PGRef pg = p->second;
       if (!pg->is_deleted()) {
        return pg;
       }
       pg_map.erase(p);
       pg->put("PGMap");
-      pg->unlock();
     }
     return nullptr;
   }
 }
 
+PG *OSD::_lookup_lock_pg(spg_t pgid)
+{
+  PGRef pg = _lookup_pg(pgid);
+  if (!pg) {
+    return nullptr;
+  }
+  pg->lock();
+  if (!pg->is_deleted()) {
+    return pg.get();
+  }
+  pg->unlock();
+  return nullptr;
+}
+
 PG *OSD::lookup_lock_pg(spg_t pgid)
 {
   return _lookup_lock_pg(pgid);
@@ -3994,9 +4001,9 @@ void OSD::load_pgs()
          assert(0 == "Missing map in load_pgs");
        }
       }
-      pg = _open_lock_pg(pgosdmap, pgid);
+      pg = _open_lock_pg(pgosdmap, osdmap, pgid);
     } else {
-      pg = _open_lock_pg(osdmap, pgid);
+      pg = _open_lock_pg(osdmap, osdmap, pgid);
     }
     // there can be no waiters here, so we don't call wake_pg_waiters
 
@@ -4035,6 +4042,69 @@ void OSD::load_pgs()
 }
 
 
+PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info)
+{
+  spg_t pgid = info->pgid;
+
+  int up_primary, acting_primary;
+  vector<int> up, acting;
+  osdmap->pg_to_up_acting_osds(
+    pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+  /*
+  const bool is_mon_create =
+    evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+  if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+    return nullptr;
+  }
+  */
+
+  PG::RecoveryCtx rctx = create_context();
+
+  const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool());
+  if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) &&
+      store->get_type() != "bluestore") {
+    clog->warn() << "pg " << pgid
+                << " is at risk of silent data corruption: "
+                << "the pool allows ec overwrites but is not stored in "
+                << "bluestore, so deep scrubbing will not detect bitrot";
+  }
+  PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
+  PG::_init(*rctx.transaction, pgid, pp);
+
+  int role = osdmap->calc_pg_role(whoami, acting, acting.size());
+  if (!pp->is_replicated() && role != pgid.shard) {
+    role = -1;
+  }
+
+  PGRef pg = _open_pg(get_map(info->epoch), osdmap, pgid);
+
+  pg->lock(true);
+
+  // we are holding the shard lock
+  assert(!pg->is_deleted());
+
+  pg->init(
+    role,
+    up,
+    up_primary,
+    acting,
+    acting_primary,
+    info->history,
+    info->past_intervals,
+    false,
+    rctx.transaction);
+
+  pg->handle_initialize(&rctx);
+  pg->handle_activate_map(&rctx);
+  rctx.created_pgs.insert(pg);
+
+  dispatch_context(rctx, pg.get(), osdmap, nullptr);
+
+  dout(10) << *pg << " is new" << dendl;
+  return pg;
+}
+
 /*
  * look up a pg.  if we have it, great.  if not, consider creating it IF the pg mapping
  * hasn't changed since the given epoch and we are the primary.
@@ -7842,32 +7912,49 @@ void OSD::check_osdmap_features()
   }
 }
 
-struct C_CompleteSplits : public Context {
+struct C_FinishSplits : public Context {
   OSD *osd;
   set<PGRef> pgs;
-  C_CompleteSplits(OSD *osd, const set<PGRef> &in)
+  C_FinishSplits(OSD *osd, const set<PGRef> &in)
     : osd(osd), pgs(in) {}
   void finish(int r) override {
-    Mutex::Locker l(osd->osd_lock);
-    if (osd->is_stopping())
-      return;
-    PG::RecoveryCtx rctx = osd->create_context();
-    for (set<PGRef>::iterator i = pgs.begin();
-        i != pgs.end();
-        ++i) {
-      osd->pg_map_lock.get_write();
-      (*i)->lock();
-      PG *pg = i->get();
-      osd->add_newly_split_pg(pg, &rctx);
-      osd->service.complete_split((*i)->get_pgid());
-      osd->pg_map_lock.put_write();
-      osd->dispatch_context_transaction(rctx, pg);
-      osd->wake_pg_waiters(*i);
-      (*i)->unlock();
-    }
+    osd->_finish_splits(pgs);
+  }
+};
+
+void OSD::_finish_splits(set<PGRef>& pgs)
+{
+  dout(10) << __func__ << " " << pgs << dendl;
+  Mutex::Locker l(osd_lock);
+  if (is_stopping())
+    return;
+  PG::RecoveryCtx rctx = create_context();
+  for (set<PGRef>::iterator i = pgs.begin();
+       i != pgs.end();
+       ++i) {
+    PG *pg = i->get();
+
+    pg->lock();
+    dout(10) << __func__ << " " << *pg << dendl;
+    epoch_t e = pg->get_osdmap()->get_epoch();
+    pg->unlock();
 
-    osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
+    pg_map_lock.get_write();
+    pg->get("PGMap");  // For pg_map
+    pg_map[pg->get_pgid()] = pg;
+    service.complete_split(pg->get_pgid());
+    service.pg_add_epoch(pg->pg_id, e);
+    pg_map_lock.put_write();
+
+    pg->lock();
+    pg->handle_initialize(&rctx);
+    pg->queue_null(e, e);
+    dispatch_context_transaction(rctx, pg);
+    wake_pg_waiters(pg);
+    pg->unlock();
   }
+
+  dispatch_context(rctx, 0, service.get_osdmap());
 };
 
 void OSD::advance_pg(
@@ -7919,7 +8006,7 @@ void OSD::advance_pg(
   service.pg_update_epoch(pg->pg_id, lastmap->get_epoch());
 
   if (!new_pgs.empty()) {
-    rctx->on_applied->add(new C_CompleteSplits(this, new_pgs));
+    rctx->on_applied->add(new C_FinishSplits(this, new_pgs));
   }
 }
 
@@ -7940,26 +8027,27 @@ void OSD::consume_map()
   int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
 
   // scan pg's
+  vector<spg_t> pgids;
   {
     RWLock::RLocker l(pg_map_lock);
     for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
         it != pg_map.end();
         ++it) {
+      pgids.push_back(it->first);
       PG *pg = it->second;
       if (pg->is_deleted()) {
        continue;
       }
-      pg->lock();
+      service.init_splits_between(it->first, service.get_osdmap(), osdmap);
+
+      // FIXME: this is lockless and racy, but we don't want to take pg lock
+      // here.
       if (pg->is_primary())
         num_pg_primary++;
       else if (pg->is_replica())
         num_pg_replica++;
       else
         num_pg_stray++;
-
-      service.init_splits_between(it->first, service.get_osdmap(), osdmap);
-
-      pg->unlock();
     }
 
     [[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock);
@@ -7991,23 +8079,16 @@ void OSD::consume_map()
 
   service.maybe_inject_dispatch_delay();
 
-  // scan pg's
-  {
-    RWLock::RLocker l(pg_map_lock);
-    for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
-        it != pg_map.end();
-        ++it) {
-      enqueue_peering_evt(
-       it->first,
-       PGPeeringEventRef(
-         std::make_shared<PGPeeringEvent>(
-           osdmap->get_epoch(),
-           osdmap->get_epoch(),
-           NullEvt())));
-    }
-
-    logger->set(l_osd_pg, pg_map.size());
+  for (auto pgid : pgids) {
+    enqueue_peering_evt(
+      pgid,
+      PGPeeringEventRef(
+       std::make_shared<PGPeeringEvent>(
+         osdmap->get_epoch(),
+         osdmap->get_epoch(),
+         NullEvt())));
   }
+  logger->set(l_osd_pg, pgids.size());
   logger->set(l_osd_pg_primary, num_pg_primary);
   logger->set(l_osd_pg_replica, num_pg_replica);
   logger->set(l_osd_pg_stray, num_pg_stray);
@@ -8350,6 +8431,7 @@ void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
     ctx.transaction = new ObjectStore::Transaction;
     ctx.on_applied = new C_Contexts(cct);
     ctx.on_safe = new C_Contexts(cct);
+    ctx.created_pgs.clear();
   }
 }
 
@@ -8392,6 +8474,7 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
     ctx.created_pgs.clear();
     delete (ctx.transaction);
     assert(tr == 0);
+    ctx.created_pgs.clear();
   }
 }
 
@@ -9049,35 +9132,45 @@ void OSD::dequeue_peering_evt(
   PGPeeringEventRef evt,
   ThreadPool::TPHandle& handle)
 {
-  auto curmap = service.get_osdmap();
   PG::RecoveryCtx rctx = create_context();
-  if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
-    advance_pg(curmap->get_epoch(), pg, handle, &rctx);
-  }
-  pg->do_peering_event(evt, &rctx);
-  auto need_up_thru = pg->get_need_up_thru();
-  auto same_interval_since = pg->get_same_interval_since();
-  dispatch_context_transaction(rctx, pg, &handle);
-  bool deleted = pg->is_deleted();
-  pg->unlock();
-
-  if (deleted) {
-    RWLock::WLocker l(pg_map_lock);
-    auto p = pg_map.find(pg->get_pgid());
-    if (p != pg_map.end() &&
-       p->second == pg) {
-      dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
-      pg_map.erase(p);
-      pg->put("PGMap");
+  auto curmap = service.get_osdmap();
+  epoch_t need_up_thru = 0, same_interval_since = 0;
+  if (!pg) {
+    if (const MQuery *q = dynamic_cast<const MQuery*>(evt->evt.get())) {
+      handle_pg_query_nopg(*q);
     } else {
-      dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl;
+      derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
+      ceph_abort();
+    }
+  } else {
+    if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
+      advance_pg(curmap->get_epoch(), pg, handle, &rctx);
+    }
+    pg->do_peering_event(evt, &rctx);
+    dispatch_context_transaction(rctx, pg, &handle);
+    need_up_thru = pg->get_need_up_thru();
+    same_interval_since = pg->get_same_interval_since();
+    bool deleted = pg->is_deleted();
+    pg->unlock();
+
+    if (deleted) {
+      RWLock::WLocker l(pg_map_lock);
+      auto p = pg_map.find(pg->get_pgid());
+      if (p != pg_map.end() &&
+         p->second == pg) {
+       dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
+       pg_map.erase(p);
+       pg->put("PGMap");
+      } else {
+       dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl;
+      }
     }
   }
 
   if (need_up_thru) {
     queue_want_up_thru(same_interval_since);
   }
-  dispatch_context(rctx, 0, curmap, &handle);
+  dispatch_context(rctx, pg, curmap, &handle);
 
   service.send_pg_temp();
 }
@@ -9537,6 +9630,7 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
 void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
 {
   unsigned pushes_to_free = 0;
+  bool queued = false;
   for (auto sdata : shard_list) {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     sdata->waiting_for_pg_osdmap = osdmap;
@@ -9545,8 +9639,26 @@ void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
       ShardData::pg_slot& slot = p->second;
       if (!slot.to_process.empty() && slot.num_running == 0) {
        if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
-         dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
-                  << dendl;
+         if (slot.pending_nopg) {
+           dout(20) << __func__ << "  " << p->first << " maps to us, pending create,"
+                    << " requeuing" << dendl;
+           for (auto& q : slot.to_process) {
+             pushes_to_free += q.get_reserved_pushes();
+           }
+           for (auto i = slot.to_process.rbegin();
+                i != slot.to_process.rend();
+                ++i) {
+             sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+           }
+           slot.to_process.clear();
+           slot.waiting_for_pg = false;
+           slot.pending_nopg = false;
+           ++slot.requeue_seq;
+           queued = true;
+         } else {
+           dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
+                    << dendl;
+         }
          ++p;
          continue;
        }
@@ -9571,6 +9683,11 @@ void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
        ++p;
       }
     }
+    if (queued) {
+      sdata->sdata_lock.Lock();
+      sdata->sdata_cond.SignalOne();
+      sdata->sdata_lock.Unlock();
+    }
   }
   if (pushes_to_free > 0) {
     osd->service.release_reserved_pushes(pushes_to_free);
@@ -9733,27 +9850,57 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     return;
   }
 
+  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+                                suicide_interval);
+
   // take next item
   auto qi = std::move(slot.to_process.front());
   slot.to_process.pop_front();
   dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
+  unsigned pushes_to_free = 0;
 
-  if (!pg) {
+  while (!pg) {
     // should this pg shard exist on this osd in this (or a later) epoch?
     OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
-    if (osdmap->is_up_acting_osd_shard(token,
-                                      osd->whoami)) {
-      dout(20) << __func__ << " " << token
-              << " no pg, should exist, will wait" << " on " << qi << dendl;
-      slot.to_process.push_front(std::move(qi));
-      slot.waiting_for_pg = true;
-    } else if (qi.get_map_epoch() > osdmap->get_epoch()) {
+    const PGCreateInfo *create_info = qi.creates_pg();
+    if (qi.get_map_epoch() > osdmap->get_epoch()) {
       dout(20) << __func__ << " " << token
               << " no pg, item epoch is "
               << qi.get_map_epoch() << " > " << osdmap->get_epoch()
               << ", will wait on " << qi << dendl;
+      if (!!create_info || !qi.requires_pg()) {
+       slot.pending_nopg = true;
+      }
       slot.to_process.push_front(std::move(qi));
       slot.waiting_for_pg = true;
+    } else if (osdmap->is_up_acting_osd_shard(token, osd->whoami)) {
+      if (osd->service.splitting(token)) {
+       dout(20) << __func__ << " " << token
+                << " splitting, waiting on " << qi << dendl;
+       slot.to_process.push_front(std::move(qi));
+       slot.waiting_for_pg = true;
+      } else if (create_info) {
+       dout(20) << __func__ << " " << token
+                << " no pg, should create on " << qi << dendl;
+       pg = osd->handle_pg_create_info(osdmap, create_info);
+       if (pg) {
+         // we created the pg! drop out and continue "normally"!
+         _wake_pg_slot(token, sdata, slot, &pushes_to_free);
+         break;
+       }
+       dout(20) << __func__ << " ignored create on " << qi << dendl;
+      } else if (!qi.requires_pg()) {
+       // for pg-less events, we run them under the ordering lock, since
+       // we don't have the pg lock to keep them ordered.
+       qi.run(osd, pg, tp_handle);
+       sdata->sdata_op_ordering_lock.Unlock();
+       return;
+      } else {
+       dout(20) << __func__ << " " << token
+                << " no pg, should exist, will wait on " << qi << dendl;
+       slot.to_process.push_front(std::move(qi));
+       slot.waiting_for_pg = true;
+      }
     } else {
       dout(20) << __func__ << " " << token
               << " no pg, shouldn't exist,"
@@ -9779,6 +9926,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   }
   sdata->sdata_op_ordering_lock.Unlock();
 
+  if (pushes_to_free) {
+    osd->service.release_reserved_pushes(pushes_to_free);
+  }
 
   // osd_opwq_process marks the point at which an operation has been dequeued
   // and will begin to be handled by a worker thread.
@@ -9802,8 +9952,6 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   delete f;
   *_dout << dendl;
 
-  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
-                                suicide_interval);
   qi.run(osd, pg, tp_handle);
 
   {
index b13d70245aacaa8cefddd9d33e30b9ccc3dbb389..544a7eaecdddf83461e4ec5fa8a509878a9bc322 100644 (file)
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
 
 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
 
+/*
+
+  lock ordering for pg map
+
+    PG::lock
+      ShardData::lock
+        OSD::pg_map_lock
+
+  */
 
 enum {
   l_osd_first = 10000,
@@ -233,7 +242,7 @@ class PrimaryLogPG;
 class AuthAuthorizeHandlerRegistry;
 
 class TestOpsSocketHook;
-struct C_CompleteSplits;
+struct C_FinishSplits;
 struct C_OpenPGs;
 class LogChannel;
 class CephContext;
@@ -1513,7 +1522,7 @@ private:
   void test_ops(std::string command, std::string args, ostream& ss);
   friend class TestOpsSocketHook;
   TestOpsSocketHook *test_ops_hook;
-  friend struct C_CompleteSplits;
+  friend struct C_FinishSplits;
   friend struct C_OpenPGs;
 
   // -- op queue --
@@ -1572,6 +1581,9 @@ private:
        /// to_process.  cleared by prune_pg_waiters.
        bool waiting_for_pg = false;
 
+       /// one or more queued items doesn't need a pg
+       bool pending_nopg = false;
+
        /// incremented by wake_pg_waiters; indicates racing _process threads
        /// should bail out (their op has been requeued)
        uint64_t requeue_seq = 0;
@@ -1843,6 +1855,7 @@ protected:
   map<spg_t, list<PGPeeringEventRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;
 
+  PGRef _lookup_pg(spg_t pgid);
   PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
   PG   *_lookup_lock_pg(spg_t pgid);
 
@@ -1857,10 +1870,15 @@ public:
   std::set<int> get_mapped_pools();
 
 protected:
-  PG   *_open_lock_pg(OSDMapRef createmap,
-                     spg_t pg, bool no_lockdep_check=false);
-
-  PG   *_create_lock_pg(
+  PGRef _open_pg(
+    OSDMapRef createmap, OSDMapRef servicemap,
+    spg_t pg);
+  PG *_open_lock_pg(
+    OSDMapRef createmap,
+    OSDMapRef servicemap,
+    spg_t pg,
+    bool no_lockdep_check=false);
+  PG *_create_lock_pg(
     OSDMapRef createmap,
     spg_t pgid,
     bool hold_map_lock,
@@ -1873,8 +1891,6 @@ protected:
     ObjectStore::Transaction& t);
 
   PG* _make_pg(OSDMapRef createmap, spg_t pgid);
-  void add_newly_split_pg(PG *pg,
-                         PG::RecoveryCtx *rctx);
 
   int handle_pg_peering_evt(
     spg_t pgid,
@@ -1919,6 +1935,7 @@ protected:
     OSDMapRef curmap,
     OSDMapRef nextmap,
     PG::RecoveryCtx *rctx);
+  void _finish_splits(set<PGRef>& pgs);
 
   // == monitor interaction ==
   Mutex mon_report_lock;
@@ -2017,6 +2034,7 @@ protected:
   void handle_pg_log(OpRequestRef op);
   void handle_pg_info(OpRequestRef op);
 
+  PGRef handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info);
   void handle_force_recovery(Message *m);
 
   void handle_pg_remove(OpRequestRef op);
index ac7401334e420cd89c26993933ba6a9725b37b80..fe3f66296adfc21eeee51f9a154565eb0b7e4dc6 100644 (file)
--- a/src/osd/OpQueueItem.h
+++ b/src/osd/OpQueueItem.h
@@ -65,6 +65,13 @@ public:
       return 0;
     }
 
+    virtual bool requires_pg() const {
+      return true;
+    }
+    virtual const PGCreateInfo *creates_pg() const {
+      return nullptr;
+    }
+
     virtual ostream &print(ostream &rhs) const = 0;
 
     virtual void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
@@ -141,6 +148,12 @@ public:
   epoch_t get_map_epoch() const { return map_epoch; }
   dmc::ReqParams get_qos_params() const { return qos_params; }
   void set_qos_params(dmc::ReqParams qparams) { qos_params =  qparams; }
+  bool requires_pg() const {
+    return qitem->requires_pg();
+  }
+  const PGCreateInfo *creates_pg() const {
+    return qitem->creates_pg();
+  }
 
   friend ostream& operator<<(ostream& out, const OpQueueItem& item) {
     return out << "OpQueueItem("
@@ -212,6 +225,12 @@ public:
     return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
   }
   void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+  bool requires_pg() const override {
+    return evt->requires_pg;
+  }
+  const PGCreateInfo *creates_pg() const override {
+    return evt->create_info.get();
+  }
 };
 
 class PGSnapTrim : public PGOpQueueable {
index 7adab2ab4dfa292d490a7a3e5f18475a85dcdc78..daef636424e07298773b765c731d4d4bfdd5968f 100644 (file)
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -3772,7 +3772,9 @@ void PG::read_state(ObjectStore *store)
   }
 
   PG::RecoveryCtx rctx(0, 0, 0, 0, 0, new ObjectStore::Transaction);
-  handle_loaded(&rctx);
+  handle_initialize(&rctx);
+  // note: we don't activate here because we know the OSD will advance maps
+  // during boot.
   write_if_dirty(*rctx.transaction);
   store->queue_transaction(ch, std::move(*rctx.transaction));
   delete rctx.transaction;
@@ -6325,11 +6327,11 @@ void PG::do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx)
   if (!have_same_or_newer_map(evt->get_epoch_sent())) {
     dout(10) << "deferring event " << evt->get_desc() << dendl;
     peering_waiters.push_back(evt);
-    return;
+  } else if (old_peering_evt(evt)) {
+    dout(10) << "discard old " << evt->get_desc() << dendl;
+  } else {
+    recovery_state.handle_event(evt, rctx);
   }
-  if (old_peering_evt(evt))
-    return;
-  recovery_state.handle_event(evt, rctx);
   write_if_dirty(*rctx->transaction);
 }
 
@@ -6445,27 +6447,11 @@ void PG::handle_activate_map(RecoveryCtx *rctx)
   write_if_dirty(*rctx->transaction);
 }
 
-void PG::handle_loaded(RecoveryCtx *rctx)
+void PG::handle_initialize(RecoveryCtx *rctx)
 {
-  dout(10) << "handle_loaded" << dendl;
-  Load evt;
-  recovery_state.handle_event(evt, rctx);
-  write_if_dirty(*rctx->transaction);
-}
-
-void PG::handle_create(RecoveryCtx *rctx)
-{
-  dout(10) << "handle_create" << dendl;
-  rctx->created_pgs.insert(this);
+  dout(10) << __func__ << dendl;
   Initialize evt;
   recovery_state.handle_event(evt, rctx);
-  ActMap evt2;
-  recovery_state.handle_event(evt2, rctx);
-  write_if_dirty(*rctx->transaction);
-
-  rctx->on_applied->add(make_lambda_context([this]() {
-    update_store_with_options();
-  }));
 }
 
 void PG::handle_query_state(Formatter *f)
@@ -6603,18 +6589,6 @@ PG::RecoveryState::Initial::Initial(my_context ctx)
   context< RecoveryMachine >().log_enter(state_name);
 }
 
-boost::statechart::result PG::RecoveryState::Initial::react(const Load& l)
-{
-  PG *pg = context< RecoveryMachine >().pg;
-
-  // do we tell someone we're here?
-  pg->send_notify = (!pg->is_primary());
-
-  pg->update_store_with_options();
-
-  return transit< Reset >();
-}
-
 boost::statechart::result PG::RecoveryState::Initial::react(const MNotifyRec& notify)
 {
   PG *pg = context< RecoveryMachine >().pg;
index 7518342a5feb6d4f663f4c4c345123fa94b41668..e060f3a303fc9a00079df0acd2f48b7b4d407976 100644 (file)
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -418,8 +418,7 @@ public:
     vector<int>& newacting, int acting_primary,
     RecoveryCtx *rctx);
   void handle_activate_map(RecoveryCtx *rctx);
-  void handle_create(RecoveryCtx *rctx);
-  void handle_loaded(RecoveryCtx *rctx);
+  void handle_initialize(RecoveryCtx *rctx);
   void handle_query_state(Formatter *f);
 
   /**
@@ -565,7 +564,7 @@ protected:
 
 
   bool deleting;  // true while in removing or OSD is shutting down
-  bool deleted = false;
+  atomic<bool> deleted = {false};
 
   ZTracer::Endpoint trace_endpoint;
 
@@ -1851,7 +1850,6 @@ public:
   };
 protected:
   TrivialEvent(Initialize)
-  TrivialEvent(Load)
   TrivialEvent(GotInfo)
   TrivialEvent(NeedUpThru)
   TrivialEvent(Backfilled)
@@ -2018,12 +2016,10 @@ protected:
 
       typedef boost::mpl::list <
        boost::statechart::transition< Initialize, Reset >,
-       boost::statechart::custom_reaction< Load >,
        boost::statechart::custom_reaction< NullEvt >,
        boost::statechart::transition< boost::statechart::event_base, Crashed >
        > reactions;
 
-      boost::statechart::result react(const Load&);
       boost::statechart::result react(const MNotifyRec&);
       boost::statechart::result react(const MInfoRec&);
       boost::statechart::result react(const MLogRec&);
index 597da5b729d6511c8a3b539b91f0b482958dacf0..302e3028f2a1e2bf2716aae4a12f0c13f67e4b66 100644 (file)
--- a/src/osd/PGPeeringEvent.h
+++ b/src/osd/PGPeeringEvent.h
@@ -9,25 +9,46 @@
 
 class MOSDPGLog;
 
+/// what we need to instantiate a pg
+struct PGCreateInfo {
+  spg_t pgid;
+  epoch_t epoch = 0;
+  pg_history_t history;
+  PastIntervals past_intervals;
+  PGCreateInfo(spg_t p, epoch_t e,
+              const pg_history_t& h,
+              const PastIntervals& pi)
+    : pgid(p), epoch(e), history(h), past_intervals(pi) {}
+};
+
 class PGPeeringEvent {
   epoch_t epoch_sent;
   epoch_t epoch_requested;
-  boost::intrusive_ptr< const boost::statechart::event_base > evt;
   string desc;
 public:
+  boost::intrusive_ptr< const boost::statechart::event_base > evt;
+  bool requires_pg;
+  std::unique_ptr<PGCreateInfo> create_info;
   MEMPOOL_CLASS_HELPERS();
   template <class T>
   PGPeeringEvent(
     epoch_t epoch_sent,
     epoch_t epoch_requested,
-    const T &evt_)
+    const T &evt_,
+    bool req = true,
+    PGCreateInfo *ci = 0)
     : epoch_sent(epoch_sent),
       epoch_requested(epoch_requested),
-      evt(evt_.intrusive_from_this()) {
+      evt(evt_.intrusive_from_this()),
+      requires_pg(req),
+      create_info(ci) {
     stringstream out;
     out << "epoch_sent: " << epoch_sent
        << " epoch_requested: " << epoch_requested << " ";
     evt_.print(&out);
+    if (create_info) {
+      out << " +create_info";
+    }
     desc = out.str();
   }
   epoch_t get_epoch_sent() {