git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: move pg_map and associated state to CoreState
author Samuel Just <sjust@redhat.com>
Fri, 8 Jul 2022 06:16:25 +0000 (06:16 +0000)
committer Samuel Just <sjust@redhat.com>
Thu, 14 Jul 2022 00:58:29 +0000 (00:58 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/admin/pg_commands.cc
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/osd_operations/pg_advance_map.h
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

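Summary of the change: the PGMap instance and the PG lifecycle helpers (make_pg, load_pg/load_pgs, handle_pg_create_info, get_or_create_pg, wait_for_pg, stop_pgs, get_pg_stats, and the map broadcast formerly implemented by OSD::consume_map) move out of OSD into CoreState. PGShardManager exposes them through forwarding helpers, so OSD and the admin-command path no longer touch pg_map directly.
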
index dacfd515db481ef50efa8440f75690125345fdb6..033382a0db88da217943bd361c7e32a359d2f8a6 100644 (file)
@@ -56,16 +56,23 @@ public:
       return seastar::make_ready_future<tell_result_t>(tell_result_t{
           -ENOENT, fmt::format("pgid '{}' does not exist", pgid_str)});
     }
-    Ref<PG> pg = osd.get_pg(spg_id);
-    if (!pg) {
-      return seastar::make_ready_future<tell_result_t>(tell_result_t{
-        -ENOENT, fmt::format("i don't have pgid '{}'", spg_id)});
-    }
-    if (!pg->is_primary()) {
-      return seastar::make_ready_future<tell_result_t>(tell_result_t{
-        -EAGAIN, fmt::format("not primary for pgid '{}'", spg_id)});
-    }
-    return this->do_command(pg, cmdmap, format, std::move(input));
+    return osd.get_pg_shard_manager().with_pg(
+      spg_id,
+      [this, spg_id,
+       cmdmap=std::move(cmdmap),
+       format=std::move(format),
+       input=std::move(input)
+      ](auto &&pg) mutable {
+       if (!pg) {
+         return seastar::make_ready_future<tell_result_t>(tell_result_t{
+             -ENOENT, fmt::format("i don't have pgid '{}'", spg_id)});
+       }
+       if (!pg->is_primary()) {
+         return seastar::make_ready_future<tell_result_t>(tell_result_t{
+             -EAGAIN, fmt::format("not primary for pgid '{}'", spg_id)});
+       }
+       return this->do_command(pg, cmdmap, format, std::move(input));
+      });
   }
 
 private:
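
The hunk above shows the new lookup idiom: with_pg takes a continuation instead of returning a Ref<PG>, so the caller moves its state into the lambda and handles the not-found case inside it. A minimal sketch of the resulting caller shape (tell_result_t and spg_id as in the hunk; handle_found is a hypothetical stand-in for the real work):

  return osd.get_pg_shard_manager().with_pg(
    spg_id,
    [spg_id](auto &&pg) {
      if (!pg) {
        // PG is not instantiated on this OSD
        return seastar::make_ready_future<tell_result_t>(tell_result_t{
            -ENOENT, fmt::format("i don't have pgid '{}'", spg_id)});
      }
      return handle_found(pg);  // hypothetical continuation
    });

Today with_pg resolves against the local core (see pg_shard_manager.h below), but the continuation signature leaves room to dispatch to the core owning the PG, which the m-to-n TODOs removed in osd.cc suggest is the eventual goal.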
index bf122c017d5834fe7eeb5d69745119b134f42e0c..28a939d24d6d8fb72d0b9f5f9d4f25d8733c7873 100644 (file)
@@ -359,7 +359,7 @@ seastar::future<> OSD::start()
     pg_shard_manager.got_map(map->get_epoch());
     osdmap = std::move(map);
     bind_epoch = osdmap->get_epoch();
-    return load_pgs();
+    return pg_shard_manager.load_pgs();
   }).then([this] {
 
     uint64_t osd_required =
@@ -613,10 +613,7 @@ seastar::future<> OSD::stop()
     }).then([this] {
       return store.stop();
     }).then([this] {
-      return seastar::parallel_for_each(pg_map.get_pgs(),
-       [](auto& p) {
-       return p.second->stop();
-      });
+      return pg_shard_manager.stop_pgs();
     }).then([this] {
       return monc->stop();
     }).then([this] {
@@ -641,121 +638,29 @@ void OSD::dump_status(Formatter* f) const
   f->dump_string("state", pg_shard_manager.get_osd_state_string());
   f->dump_unsigned("oldest_map", superblock.oldest_map);
   f->dump_unsigned("newest_map", superblock.newest_map);
-  f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
+  f->dump_unsigned("num_pgs", pg_shard_manager.get_num_pgs());
 }
 
 void OSD::dump_pg_state_history(Formatter* f) const
 {
   f->open_array_section("pgs");
-  for (auto [pgid, pg] : pg_map.get_pgs()) {
+  pg_shard_manager.for_each_pg([f](auto &pgid, auto &pg) {
     f->open_object_section("pg");
     f->dump_stream("pg") << pgid;
     const auto& peering_state = pg->get_peering_state();
     f->dump_string("currently", peering_state.get_current_state());
     peering_state.dump_history(f);
     f->close_section();
-  }
+  });
   f->close_section();
 }
 
 void OSD::print(std::ostream& out) const
 {
   out << "{osd." << superblock.whoami << " "
-    << superblock.osd_fsid << " [" << superblock.oldest_map
-    << "," << superblock.newest_map << "] " << pg_map.get_pgs().size()
-    << " pgs}";
-}
-
-seastar::future<> OSD::load_pgs()
-{
-  return store.list_collections().then([this](auto colls) {
-    return seastar::parallel_for_each(colls, [this](auto coll) {
-      spg_t pgid;
-      if (coll.is_pg(&pgid)) {
-        return load_pg(pgid).then([pgid, this](auto&& pg) {
-          logger().info("load_pgs: loaded {}", pgid);
-          pg_map.pg_loaded(pgid, std::move(pg));
-          shard_services.inc_pg_num();
-          return seastar::now();
-        });
-      } else if (coll.is_temp(&pgid)) {
-        // TODO: remove the collection
-        return seastar::now();
-      } else {
-        logger().warn("ignoring unrecognized collection: {}", coll);
-        return seastar::now();
-      }
-    });
-  });
-}
-
-seastar::future<Ref<PG>> OSD::make_pg(OSDMapService::cached_map_t create_map,
-                                     spg_t pgid,
-                                     bool do_create)
-{
-  using ec_profile_t = map<string,string>;
-  auto get_pool_info = [create_map, pgid, this] {
-    if (create_map->have_pg_pool(pgid.pool())) {
-      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
-      string name = create_map->get_pool_name(pgid.pool());
-      ec_profile_t ec_profile;
-      if (pi.is_erasure()) {
-       ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
-      }
-      return seastar::make_ready_future<std::tuple<pg_pool_t, string, ec_profile_t>>(
-        std::make_tuple(std::move(pi),
-                       std::move(name),
-                       std::move(ec_profile)));
-    } else {
-      // pool was deleted; grab final pg_pool_t off disk.
-      return pg_shard_manager.get_meta_coll().load_final_pool_info(pgid.pool());
-    }
-  };
-  auto get_collection = [pgid, do_create, this] {
-    const coll_t cid{pgid};
-    if (do_create) {
-      return store.create_new_collection(cid);
-    } else {
-      return store.open_collection(cid);
-    }
-  };
-  return seastar::when_all(
-    std::move(get_pool_info),
-    std::move(get_collection)
-  ).then([pgid, create_map, this] (auto&& ret) {
-    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
-    auto coll = std::move(std::get<1>(ret).get0());
-    return seastar::make_ready_future<Ref<PG>>(
-      new PG{pgid,
-            pg_shard_t{whoami, pgid.shard},
-            std::move(coll),
-            std::move(pool),
-            std::move(name),
-            create_map,
-            shard_services,
-            ec_profile});
-  });
-}
-
-seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
-{
-  logger().debug("{}: {}", __func__, pgid);
-
-  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
-    return pg_meta.get_epoch();
-  }).then([this](epoch_t e) {
-    return pg_shard_manager.get_map(e);
-  }).then([pgid, this] (auto&& create_map) {
-    return make_pg(std::move(create_map), pgid, false);
-  }).then([this](Ref<PG> pg) {
-    return pg->read_state(&store).then([pg] {
-       return seastar::make_ready_future<Ref<PG>>(std::move(pg));
-    });
-  }).handle_exception([pgid](auto ep) {
-    logger().info("pg {} saw exception on load {}", pgid, ep);
-    ceph_abort("Could not load pg" == 0);
-    return seastar::make_exception_future<Ref<PG>>(ep);
-  });
+      << superblock.osd_fsid << " [" << superblock.oldest_map
+      << "," << superblock.newest_map << "] " << pg_shard_manager.get_num_pgs()
+      << " pgs}";
 }
 
 std::optional<seastar::future<>>
@@ -887,18 +792,10 @@ void OSD::update_stats()
 
 MessageURef OSD::get_stats() const
 {
-  // todo: m-to-n: collect stats using map-reduce
   // MPGStats::had_map_for is not used since PGMonitor was removed
   auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
   m->osd_stat = osd_stat;
-  for (auto [pgid, pg] : pg_map.get_pgs()) {
-    if (pg->is_primary()) {
-      auto stats = pg->get_stats();
-      // todo: update reported_epoch,reported_seq,last_fresh
-      stats.reported_epoch = osdmap->get_epoch();
-      m->pg_stat.emplace(pgid.pgid, std::move(stats));
-    }
-  }
+  m->pg_stat = pg_shard_manager.get_pg_stats();
   return m;
 }
 
@@ -921,88 +818,6 @@ bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
   return true;
 }
 
-seastar::future<Ref<PG>> OSD::handle_pg_create_info(
-  std::unique_ptr<PGCreateInfo> info) {
-  return seastar::do_with(
-    std::move(info),
-    [this](auto &info) -> seastar::future<Ref<PG>> {
-      return pg_shard_manager.get_map(info->epoch).then(
-       [&info, this](OSDMapService::cached_map_t startmap) ->
-       seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
-         const spg_t &pgid = info->pgid;
-         if (info->by_mon) {
-           int64_t pool_id = pgid.pgid.pool();
-           const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
-           if (!pool) {
-             logger().debug(
-               "{} ignoring pgid {}, pool dne",
-               __func__,
-               pgid);
-             return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
-                std::make_tuple(Ref<PG>(), startmap));
-           }
-           ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
-           if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
-             // this ensures we do not process old creating messages after the
-             // pool's initial pgs have been created (and pg are subsequently
-             // allowed to split or merge).
-             logger().debug(
-               "{} dropping {} create, pool does not have CREATING flag set",
-               __func__,
-               pgid);
-             return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
-                std::make_tuple(Ref<PG>(), startmap));
-           }
-         }
-         return make_pg(startmap, pgid, true).then(
-           [startmap=std::move(startmap)](auto pg) mutable {
-             return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
-                std::make_tuple(std::move(pg), std::move(startmap)));
-           });
-      }).then([this, &info](auto&& ret) ->
-              seastar::future<Ref<PG>> {
-        auto [pg, startmap] = std::move(ret);
-        if (!pg)
-          return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
-        const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
-
-        int up_primary, acting_primary;
-        vector<int> up, acting;
-        startmap->pg_to_up_acting_osds(
-          info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
-
-        int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
-                                          acting);
-
-        PeeringCtx rctx;
-        create_pg_collection(
-          rctx.transaction,
-          info->pgid,
-          info->pgid.get_split_bits(pp->get_pg_num()));
-        init_pg_ondisk(
-          rctx.transaction,
-          info->pgid,
-          pp);
-
-        pg->init(
-          role,
-          up,
-          up_primary,
-          acting,
-          acting_primary,
-          info->history,
-          info->past_intervals,
-          rctx.transaction);
-
-        return shard_services.start_operation<PGAdvanceMap>(
-          *this, pg, pg->get_osdmap_epoch(),
-          osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
-            return seastar::make_ready_future<Ref<PG>>(pg);
-        });
-      });
-  });
-}
-
 seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
                                       Ref<MOSDMap> m)
 {
@@ -1118,7 +933,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
     }
     return check_osdmap_features().then([this] {
       // yay!
-      return consume_map(osdmap->get_epoch());
+      return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
     });
   }).then([m, this] {
     if (pg_shard_manager.is_active()) {
@@ -1221,14 +1036,17 @@ seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
 seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                                           Ref<MOSDRepOpReply> m)
 {
-  const auto& pgs = pg_map.get_pgs();
-  if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
-    m->finish_decode();
-    pg->second->handle_rep_op_reply(conn, *m);
-  } else {
-    logger().warn("stale reply: {}", *m);
-  }
-  return seastar::now();
+  return pg_shard_manager.with_pg(
+    m->get_spg(),
+    [conn=std::move(conn), m=std::move(m)](auto &&pg) {
+      if (pg) {
+       m->finish_decode();
+       pg->handle_rep_op_reply(conn, *m);
+      } else {
+       logger().warn("stale reply: {}", *m);
+      }
+      return seastar::now();
+    });
 }
 
 seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
@@ -1329,9 +1147,10 @@ void OSD::update_heartbeat_peers()
   if (!pg_shard_manager.is_active()) {
     return;
   }
-  for (auto& pg : pg_map.get_pgs()) {
+
+  pg_shard_manager.for_each_pg([this](auto &pgid, auto &pg) {
     vector<int> up, acting;
-    osdmap->pg_to_up_acting_osds(pg.first.pgid,
+    osdmap->pg_to_up_acting_osds(pgid.pgid,
                                  &up, nullptr,
                                  &acting, nullptr);
     for (int osd : boost::join(up, acting)) {
@@ -1341,7 +1160,7 @@ void OSD::update_heartbeat_peers()
         heartbeat->add_peer(osd, osdmap->get_epoch());
       }
     }
-  }
+  });
   heartbeat->update_peers(whoami);
 }
 
@@ -1367,51 +1186,6 @@ seastar::future<> OSD::check_osdmap_features()
                           stringify((int)osdmap->require_osd_release));
 }
 
-seastar::future<> OSD::consume_map(epoch_t epoch)
-{
-  // todo: m-to-n: broadcast this news to all shards
-  auto &pgs = pg_map.get_pgs();
-  return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
-    return shard_services.start_operation<PGAdvanceMap>(
-      *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
-      PeeringCtx{}, false).second;
-  }).then([epoch, this] {
-    pg_shard_manager.got_map(epoch);
-    return seastar::make_ready_future();
-  });
-}
-
-
-seastar::future<Ref<PG>>
-OSD::get_or_create_pg(
-  PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
-  spg_t pgid,
-  epoch_t epoch,
-  std::unique_ptr<PGCreateInfo> info)
-{
-  if (info) {
-    auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
-    if (!creating) {
-      pg_map.set_creating(pgid);
-      (void)handle_pg_create_info(std::move(info));
-    }
-    return std::move(fut);
-  } else {
-    return seastar::make_ready_future<Ref<PG>>(pg_map.get_pg(pgid));
-  }
-}
-
-seastar::future<Ref<PG>> OSD::wait_for_pg(
-  PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
-{
-  return pg_map.wait_for_pg(std::move(trigger), pgid).first;
-}
-
-Ref<PG> OSD::get_pg(spg_t pgid)
-{
-  return pg_map.get_pg(pgid);
-}
-
 seastar::future<> OSD::prepare_to_stop()
 {
   if (osdmap && osdmap->is_up(whoami)) {
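
The helpers deleted above reappear in shard_services.cc with their dependencies made explicit: state they used to reach through OSD members now arrives as parameters. For example (signatures as they appear in this diff):

  // before: implicit access to OSD::store, OSD::shard_services, ...
  seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid);

  // after: CoreState owns the state; collaborators are passed in
  seastar::future<Ref<PG>> CoreState::load_pg(
    ShardServices &shard_services,
    spg_t pgid);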
index b70f2ccdc9cbb13dccf875480c5ebb78229fa164..ca7e59a62afb80e4236ef8aae453f3e12aef93e6 100644 (file)
@@ -146,12 +146,6 @@ private:
   seastar::future<> _send_boot();
   seastar::future<> _add_me_to_crush();
 
-  seastar::future<Ref<PG>> make_pg(OSDMapService::cached_map_t create_map,
-                                  spg_t pgid,
-                                  bool do_create);
-  seastar::future<Ref<PG>> load_pg(spg_t pgid);
-  seastar::future<> load_pgs();
-
   seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 
   void write_superblock(ceph::os::Transaction& t);
@@ -159,9 +153,6 @@ private:
 
   bool require_mon_peer(crimson::net::Connection *conn, Ref<Message> m);
 
-  seastar::future<Ref<PG>> handle_pg_create_info(
-    std::unique_ptr<PGCreateInfo> info);
-
   seastar::future<> handle_osd_map(crimson::net::ConnectionRef conn,
                                    Ref<MOSDMap> m);
   seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
@@ -194,14 +185,14 @@ private:
     crimson::net::ConnectionRef conn,
     Ref<MOSDPGUpdateLogMissingReply> m);
 public:
+  auto &get_pg_shard_manager() {
+    return pg_shard_manager;
+  }
   ShardServices &get_shard_services() {
     return pg_shard_manager.get_shard_services();
   }
 
-  seastar::future<> consume_map(epoch_t epoch);
-
 private:
-  PGMap pg_map;
   crimson::common::Gated gate;
 
   seastar::promise<> stop_acked;
@@ -215,16 +206,7 @@ private:
   void update_heartbeat_peers();
   friend class PGAdvanceMap;
 
-  seastar::future<Ref<PG>> get_or_create_pg(
-    PGMap::PGCreationBlockingEvent::TriggerI&&,
-    spg_t pgid,
-    epoch_t epoch,
-    std::unique_ptr<PGCreateInfo> info);
-  seastar::future<Ref<PG>> wait_for_pg(
-    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
-
 public:
-  Ref<PG> get_pg(spg_t pgid);
   seastar::future<> send_beacon();
 
   template <typename T, typename... Args>
@@ -268,7 +250,9 @@ public:
          PGMap::PGCreationBlockingEvent
          >([this, &opref](auto &&trigger) {
            std::ignore = this; // avoid clang warning
-           return get_or_create_pg(
+           return pg_shard_manager.get_or_create_pg(
+             pg_shard_manager,
+             pg_shard_manager.get_shard_services(),
              std::move(trigger),
              opref.get_pgid(), opref.get_epoch(),
              std::move(opref.get_create_info()));
@@ -279,7 +263,8 @@ public:
          PGMap::PGCreationBlockingEvent
          >([this, &opref](auto &&trigger) {
            std::ignore = this; // avoid clang warning
-           return wait_for_pg(std::move(trigger), opref.get_pgid());
+           return pg_shard_manager.wait_for_pg(
+             std::move(trigger), opref.get_pgid());
          });
       }
     }).then([this, &logger, &opref](Ref<PG> pgref) {
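
get_or_create_pg keeps its wait-or-create logic; only its home changed. As it now reads in CoreState (see shard_services.cc below), wait_for_pg hands back a future plus a flag saying whether creation is already in flight, and only the first waiter kicks off handle_pg_create_info:

  auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
  if (!creating) {
    pg_map.set_creating(pgid);
    (void)handle_pg_create_info(
      shard_manager, shard_services, std::move(info));
  }
  return std::move(fut);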
index d435bc0fe248232187fb4dc986a376134d53a3e7..51b279c7edb8b7957d74f29b1ba1c9e2c89bec95 100644 (file)
@@ -7,7 +7,7 @@
 #include "include/types.h"
 #include "common/Formatter.h"
 #include "crimson/osd/pg.h"
-#include "crimson/osd/osd.h"
+#include "crimson/osd/pg_shard_manager.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operation_external_tracking.h"
 #include "osd/PeeringState.h"
@@ -21,9 +21,9 @@ namespace {
 namespace crimson::osd {
 
 PGAdvanceMap::PGAdvanceMap(
-  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+  PGShardManager &shard_manager, Ref<PG> pg, epoch_t from, epoch_t to,
   PeeringCtx &&rctx, bool do_init)
-  : osd(osd), pg(pg), from(from), to(to),
+  : shard_manager(shard_manager), pg(pg), from(from), to(to),
     rctx(std::move(rctx)), do_init(do_init) {}
 
 PGAdvanceMap::~PGAdvanceMap() {}
@@ -68,7 +68,7 @@ seastar::future<> PGAdvanceMap::start()
       boost::make_counting_iterator(from + 1),
       boost::make_counting_iterator(to + 1),
       [this](epoch_t next_epoch) {
-        return osd.pg_shard_manager.get_map(next_epoch).then(
+        return shard_manager.get_map(next_epoch).then(
           [this] (cached_map_t&& next_map) {
             pg->handle_advance_map(next_map, rctx);
           });
@@ -76,19 +76,20 @@ seastar::future<> PGAdvanceMap::start()
         pg->handle_activate_map(rctx);
         handle.exit();
         if (do_init) {
-          osd.pg_map.pg_created(pg->get_pgid(), pg);
-          osd.shard_services.inc_pg_num();
+          shard_manager.pg_created(pg->get_pgid(), pg);
+          shard_manager.get_shard_services().inc_pg_num();
           logger().info("PGAdvanceMap::start new pg {}", *pg);
         }
         return seastar::when_all_succeed(
-          pg->get_need_up_thru() \
-            ? osd.shard_services.send_alive(pg->get_same_interval_since())
-            : seastar::now(),
-          osd.shard_services.dispatch_context(
+          pg->get_need_up_thru()
+         ? shard_manager.get_shard_services().send_alive(
+           pg->get_same_interval_since())
+         : seastar::now(),
+          shard_manager.get_shard_services().dispatch_context(
             pg->get_collection_ref(),
             std::move(rctx)));
       }).then_unpack([this] {
-        return osd.shard_services.send_pg_temp();
+        return shard_manager.get_shard_services().send_pg_temp();
       });
   }).then([this, ref=std::move(ref)] {
     logger().debug("{}: complete", *this);
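
With the constructor retargeted, call sites swap the OSD for the manager, e.g. (as CoreState::broadcast_map_to_pgs does below):

  shard_services.start_operation<PGAdvanceMap>(
    shard_manager, pg, pg->get_osdmap_epoch(), epoch,
    PeeringCtx{}, false);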
index 1ec23029b2d976117a7cc17e337fac53638ac9dd..51789a3a80270f6bb353f227daecc5aa3357946e 100644 (file)
@@ -17,7 +17,7 @@ namespace ceph {
 
 namespace crimson::osd {
 
-class OSD;
+class PGShardManager;
 class PG;
 
 class PGAdvanceMap : public PhasedOperationT<PGAdvanceMap> {
@@ -25,7 +25,7 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
 
 protected:
-  OSD &osd;
+  PGShardManager &shard_manager;
   Ref<PG> pg;
   PipelineHandle handle;
 
@@ -37,7 +37,7 @@ protected:
 
 public:
   PGAdvanceMap(
-    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+    PGShardManager &shard_manager, Ref<PG> pg, epoch_t from, epoch_t to,
     PeeringCtx &&rctx, bool do_init);
   ~PGAdvanceMap();
 
index 6d54c6b4f3ba989f5189630ad92cf3b058879ea0..1f759ac97b7a4de31f42665b4271aae2a2165a38 100644 (file)
@@ -76,6 +76,28 @@ public:
   FORWARD_TO_CORE(store_maps)
   FORWARD_TO_CORE(get_up_epoch)
   FORWARD_TO_CORE(set_up_epoch)
+
+  FORWARD(pg_created, pg_created, core_state.pg_map)
+  auto load_pgs() {
+    return core_state.load_pgs(shard_services);
+  }
+  FORWARD_TO_CORE(stop_pgs)
+  FORWARD_CONST(get_pg_stats, get_pg_stats, core_state)
+
+  FORWARD_TO_CORE(get_or_create_pg)
+  FORWARD_TO_CORE(wait_for_pg)
+  FORWARD_CONST(for_each_pg, for_each_pg, core_state)
+  auto get_num_pgs() const { return core_state.pg_map.get_pgs().size(); }
+
+  auto broadcast_map_to_pgs(epoch_t epoch) {
+    return core_state.broadcast_map_to_pgs(
+      *this, shard_services, epoch);
+  }
+
+  template <typename F>
+  auto with_pg(spg_t pgid, F &&f) {
+    return std::invoke(std::forward<F>(f), core_state.get_pg(pgid));
+  }
 };
 
 }
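
FORWARD, FORWARD_TO_CORE, and FORWARD_CONST are perfect-forwarding shims; their exact definitions sit at the bottom of shard_services.h (the start of FORWARD_CONST is visible below). A self-contained sketch of the technique with hypothetical types, assuming the macros expand roughly like this:

  #include <utility>

  // Assumed expansion; the real macro bodies live in shard_services.h.
  #define FORWARD(FROM_METHOD, TO_METHOD, TARGET)           \
    template <typename... Args>                             \
    auto FROM_METHOD(Args&&... args) {                      \
      return TARGET.TO_METHOD(std::forward<Args>(args)...); \
    }

  struct Core {                  // hypothetical stand-in for CoreState
    unsigned epoch = 0;
    void got_map(unsigned e) { epoch = e; }
  };

  struct Manager {               // hypothetical stand-in for PGShardManager
    Core core_state;
    FORWARD(got_map, got_map, core_state)
  };

  int main() {
    Manager m;
    m.got_map(7);                // forwards to m.core_state.got_map(7)
    return m.core_state.epoch == 7 ? 0 : 1;
  }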
index 7fcbc2472971916d57f0ac07bd613e6fec215695..52dcda071b69ed8d4b0f43482a8aa87ab6d61014 100644 (file)
@@ -19,6 +19,9 @@
 #include "crimson/net/Connection.h"
 #include "crimson/os/cyanstore/cyan_store.h"
 #include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_meta.h"
 
 namespace {
   seastar::logger& logger() {
@@ -366,6 +369,280 @@ seastar::future<> CoreState::store_maps(ceph::os::Transaction& t,
   });
 }
 
+seastar::future<Ref<PG>> CoreState::make_pg(
+  ShardServices &shard_services,
+  OSDMapService::cached_map_t create_map,
+  spg_t pgid,
+  bool do_create)
+{
+  using ec_profile_t = std::map<std::string, std::string>;
+  auto get_pool_info = [create_map, pgid, this] {
+    if (create_map->have_pg_pool(pgid.pool())) {
+      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
+      std::string name = create_map->get_pool_name(pgid.pool());
+      ec_profile_t ec_profile;
+      if (pi.is_erasure()) {
+       ec_profile = create_map->get_erasure_code_profile(
+         pi.erasure_code_profile);
+      }
+      return seastar::make_ready_future<
+       std::tuple<pg_pool_t,std::string, ec_profile_t>
+       >(std::make_tuple(
+           std::move(pi),
+           std::move(name),
+           std::move(ec_profile)));
+    } else {
+      // pool was deleted; grab final pg_pool_t off disk.
+      return get_meta_coll().load_final_pool_info(pgid.pool());
+    }
+  };
+  auto get_collection = [pgid, do_create, this] {
+    const coll_t cid{pgid};
+    if (do_create) {
+      return store.create_new_collection(cid);
+    } else {
+      return store.open_collection(cid);
+    }
+  };
+  return seastar::when_all(
+    std::move(get_pool_info),
+    std::move(get_collection)
+  ).then([&shard_services, pgid, create_map, this] (auto&& ret) {
+    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
+    auto coll = std::move(std::get<1>(ret).get0());
+    return seastar::make_ready_future<Ref<PG>>(
+      new PG{
+       pgid,
+       pg_shard_t{whoami, pgid.shard},
+       std::move(coll),
+       std::move(pool),
+       std::move(name),
+       create_map,
+       shard_services,
+       ec_profile});
+  });
+}
+
+seastar::future<Ref<PG>> CoreState::handle_pg_create_info(
+  PGShardManager &shard_manager,
+  ShardServices &shard_services,
+  std::unique_ptr<PGCreateInfo> info) {
+  return seastar::do_with(
+    std::move(info),
+    [this, &shard_manager, &shard_services](auto &info) -> seastar::future<Ref<PG>> {
+      return get_map(info->epoch).then(
+       [&info, &shard_services, this](
+         OSDMapService::cached_map_t startmap) ->
+       seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
+         const spg_t &pgid = info->pgid;
+         if (info->by_mon) {
+           int64_t pool_id = pgid.pgid.pool();
+           const pg_pool_t *pool = get_osdmap()->get_pg_pool(pool_id);
+           if (!pool) {
+             logger().debug(
+               "{} ignoring pgid {}, pool dne",
+               __func__,
+               pgid);
+             return seastar::make_ready_future<
+               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+               >(std::make_tuple(Ref<PG>(), startmap));
+           }
+           ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
+           if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
+             // this ensures we do not process old creating messages after the
+             // pool's initial pgs have been created (and pg are subsequently
+             // allowed to split or merge).
+             logger().debug(
+               "{} dropping {} create, pool does not have CREATING flag set",
+               __func__,
+               pgid);
+             return seastar::make_ready_future<
+               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+               >(std::make_tuple(Ref<PG>(), startmap));
+           }
+         }
+         return make_pg(shard_services, startmap, pgid, true).then(
+           [startmap=std::move(startmap)](auto pg) mutable {
+             return seastar::make_ready_future<
+               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+               >(std::make_tuple(std::move(pg), std::move(startmap)));
+           });
+       }).then([this, &shard_manager, &shard_services, &info](auto&& ret) ->
+               seastar::future<Ref<PG>> {
+         auto [pg, startmap] = std::move(ret);
+         if (!pg)
+           return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+         const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
+
+         int up_primary, acting_primary;
+         vector<int> up, acting;
+         startmap->pg_to_up_acting_osds(
+           info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+         int role = startmap->calc_pg_role(
+           pg_shard_t(whoami, info->pgid.shard),
+           acting);
+
+         PeeringCtx rctx;
+         create_pg_collection(
+           rctx.transaction,
+           info->pgid,
+           info->pgid.get_split_bits(pp->get_pg_num()));
+         init_pg_ondisk(
+           rctx.transaction,
+           info->pgid,
+           pp);
+
+         pg->init(
+           role,
+           up,
+           up_primary,
+           acting,
+           acting_primary,
+           info->history,
+           info->past_intervals,
+           rctx.transaction);
+
+         return shard_services.start_operation<PGAdvanceMap>(
+           shard_manager, pg, pg->get_osdmap_epoch(),
+           osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
+             return seastar::make_ready_future<Ref<PG>>(pg);
+           });
+       });
+    });
+}
+
+
+seastar::future<Ref<PG>>
+CoreState::get_or_create_pg(
+  PGShardManager &shard_manager,
+  ShardServices &shard_services,
+  PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
+  spg_t pgid,
+  epoch_t epoch,
+  std::unique_ptr<PGCreateInfo> info)
+{
+  if (info) {
+    auto [fut, creating] = pg_map.wait_for_pg(std::move(trigger), pgid);
+    if (!creating) {
+      pg_map.set_creating(pgid);
+      (void)handle_pg_create_info(
+       shard_manager, shard_services, std::move(info));
+    }
+    return std::move(fut);
+  } else {
+    return seastar::make_ready_future<Ref<PG>>(pg_map.get_pg(pgid));
+  }
+}
+
+seastar::future<Ref<PG>> CoreState::wait_for_pg(
+  PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
+{
+  return pg_map.wait_for_pg(std::move(trigger), pgid).first;
+}
+
+Ref<PG> CoreState::get_pg(spg_t pgid)
+{
+  return pg_map.get_pg(pgid);
+}
+
+seastar::future<> CoreState::load_pgs(
+  ShardServices &shard_services)
+{
+  return store.list_collections(
+  ).then([this, &shard_services](auto colls) {
+    return seastar::parallel_for_each(
+      colls,
+      [this, &shard_services](auto coll) {
+       spg_t pgid;
+       if (coll.is_pg(&pgid)) {
+         return load_pg(
+           shard_services,
+           pgid
+         ).then([pgid, this, &shard_services](auto &&pg) {
+           logger().info("load_pgs: loaded {}", pgid);
+           pg_map.pg_loaded(pgid, std::move(pg));
+           shard_services.inc_pg_num();
+           return seastar::now();
+         });
+       } else if (coll.is_temp(&pgid)) {
+         logger().warn(
+           "found temp collection on crimson osd, should be impossible: {}",
+           coll);
+         ceph_assert(0 == "temp collection on crimson osd, should be impossible");
+         return seastar::now();
+       } else {
+         logger().warn("ignoring unrecognized collection: {}", coll);
+         return seastar::now();
+       }
+      });
+  });
+}
+
+seastar::future<Ref<PG>> CoreState::load_pg(
+  ShardServices &shard_services,
+  spg_t pgid)
+{
+  logger().debug("{}: {}", __func__, pgid);
+
+  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
+    return pg_meta.get_epoch();
+  }).then([this](epoch_t e) {
+    return get_map(e);
+  }).then([pgid, this, &shard_services] (auto&& create_map) {
+    return make_pg(shard_services, std::move(create_map), pgid, false);
+  }).then([this](Ref<PG> pg) {
+    return pg->read_state(&store).then([pg] {
+       return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+    });
+  }).handle_exception([pgid](auto ep) {
+    logger().info("pg {} saw exception on load {}", pgid, ep);
+    ceph_abort("Could not load pg" == 0);
+    return seastar::make_exception_future<Ref<PG>>(ep);
+  });
+}
+
+seastar::future<> CoreState::stop_pgs()
+{
+  return seastar::parallel_for_each(
+    pg_map.get_pgs(),
+    [](auto& p) {
+      return p.second->stop();
+    });
+}
+
+std::map<pg_t, pg_stat_t> CoreState::get_pg_stats() const
+{
+  std::map<pg_t, pg_stat_t> ret;
+  for (auto [pgid, pg] : pg_map.get_pgs()) {
+    if (pg->is_primary()) {
+      auto stats = pg->get_stats();
+      // todo: update reported_epoch,reported_seq,last_fresh
+      stats.reported_epoch = osdmap->get_epoch();
+      ret.emplace(pgid.pgid, std::move(stats));
+    }
+  }
+  return ret;
+}
+
+seastar::future<> CoreState::broadcast_map_to_pgs(
+  PGShardManager &shard_manager,
+  ShardServices &shard_services,
+  epoch_t epoch)
+{
+  auto &pgs = pg_map.get_pgs();
+  return seastar::parallel_for_each(
+    pgs.begin(), pgs.end(),
+    [=, &shard_manager, &shard_services](auto& pg) {
+      return shard_services.start_operation<PGAdvanceMap>(
+       shard_manager, pg.second, pg.second->get_osdmap_epoch(), epoch,
+       PeeringCtx{}, false).second;
+    }).then([epoch, this] {
+      osdmap_gate.got_map(epoch);
+      return seastar::make_ready_future();
+    });
+}
+
 seastar::future<> ShardServices::dispatch_context_transaction(
   crimson::os::CollectionRef col, PeeringCtx &ctx) {
   if (ctx.transaction.empty()) {
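
One behavioral change rides along with the move: CoreState::load_pgs now treats a leftover temp collection as impossible on a crimson OSD and asserts, where the old OSD::load_pgs silently skipped it behind a TODO. Note also that with_pg (pg_shard_manager.h above) currently just invokes the continuation on the locally resolved PG; the continuation form appears to be groundwork for eventually dispatching to the owning core.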
index a46b8bffaa2b59a1e95692871ba27626dfe24d6a..1860c94dbf5d4f438e918ee51081ea34c7020679 100644 (file)
@@ -19,6 +19,7 @@
 #include "crimson/osd/osdmap_gate.h"
 #include "crimson/osd/osd_meta.h"
 #include "crimson/osd/object_context.h"
+#include "crimson/osd/pg_map.h"
 #include "crimson/osd/state.h"
 #include "common/AsyncReserver.h"
 
@@ -44,6 +45,8 @@ class BufferedRecoveryMessages;
 
 namespace crimson::osd {
 
+class PGShardManager;
+
 /**
  * PerShardState
  *
@@ -237,6 +240,46 @@ class CoreState : public md_config_obs_t, public OSDMapService {
                     epoch_t e, bufferlist&& bl);
   seastar::future<> store_maps(ceph::os::Transaction& t,
                                epoch_t start, Ref<MOSDMap> m);
+
+  // PGMap state
+  PGMap pg_map;
+
+  seastar::future<Ref<PG>> make_pg(
+    ShardServices &shard_services,
+    cached_map_t create_map,
+    spg_t pgid,
+    bool do_create);
+  seastar::future<Ref<PG>> handle_pg_create_info(
+    PGShardManager &shard_manager,
+    ShardServices &shard_services,
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> get_or_create_pg(
+    PGShardManager &shard_manager,
+    ShardServices &shard_services,
+    PGMap::PGCreationBlockingEvent::TriggerI&&,
+    spg_t pgid,
+    epoch_t epoch,
+    std::unique_ptr<PGCreateInfo> info);
+  seastar::future<Ref<PG>> wait_for_pg(
+    PGMap::PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
+  Ref<PG> get_pg(spg_t pgid);
+  seastar::future<> load_pgs(ShardServices &shard_services);
+  seastar::future<Ref<PG>> load_pg(
+    ShardServices &shard_services,
+    spg_t pgid);
+  seastar::future<> stop_pgs();
+  std::map<pg_t, pg_stat_t> get_pg_stats() const;
+  seastar::future<> broadcast_map_to_pgs(
+    PGShardManager &shard_manager,
+    ShardServices &shard_services,
+    epoch_t epoch);
+
+  template <typename F>
+  void for_each_pg(F &&f) const {
+    for (auto &pg : pg_map.get_pgs()) {
+      std::invoke(f, pg.first, pg.second);
+    }
+  }
 };
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \