git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: move osdmap service to CoreState
author Samuel Just <sjust@redhat.com>
Fri, 1 Jul 2022 07:22:32 +0000 (00:22 -0700)
committer Samuel Just <sjust@redhat.com>
Thu, 14 Jul 2022 00:58:29 +0000 (00:58 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/pg_shard_manager.cc
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

index 85a4741775c8b74f1bb072a872db6462b3ee8ed6..bf122c017d5834fe7eeb5d69745119b134f42e0c 100644 (file)
@@ -7,7 +7,6 @@
 
 #include <boost/iterator/counting_iterator.hpp>
 #include <boost/range/join.hpp>
-#include <boost/smart_ptr/make_local_shared.hpp>
 #include <fmt/format.h>
 #include <fmt/ostream.h>
 #include <seastar/core/timer.hh>
@@ -91,7 +90,7 @@ OSD::OSD(int id, uint32_t nonce,
     mgrc{new crimson::mgr::Client{*public_msgr, *this}},
     store{store},
     pg_shard_manager{
-      static_cast<OSDMapService&>(*this), whoami, *cluster_msgr,
+      whoami, *cluster_msgr,
       *public_msgr, *monc, *mgrc, store},
     shard_services{pg_shard_manager.get_shard_services()},
     heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
@@ -104,7 +103,6 @@ OSD::OSD(int id, uint32_t nonce,
     log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
     clog(log_client.create_channel())
 {
-  osdmaps[0] = boost::make_local_shared<OSDMap>();
   for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
                     std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
     msgr.get()->set_auth_server(monc.get());
@@ -159,7 +157,7 @@ seastar::future<> OSD::open_meta_coll()
   return store.open_collection(
     coll_t::meta()
   ).then([this](auto ch) {
-    meta_coll = make_unique<OSDMeta>(ch, store);
+    pg_shard_manager.init_meta_coll(ch, store);
     return seastar::now();
   });
 }
@@ -169,11 +167,11 @@ seastar::future<> OSD::open_or_create_meta_coll()
   return store.open_collection(coll_t::meta()).then([this] (auto ch) {
     if (!ch) {
       return store.create_new_collection(coll_t::meta()).then([this] (auto ch) {
-       meta_coll = make_unique<OSDMeta>(ch, store);
+       pg_shard_manager.init_meta_coll(ch, store);
        return seastar::now();
       });
     } else {
-      meta_coll = make_unique<OSDMeta>(ch, store);
+      pg_shard_manager.init_meta_coll(ch, store);
       return seastar::now();
     }
   });
@@ -230,7 +228,8 @@ seastar::future<> OSD::mkfs(
 
 seastar::future<> OSD::_write_superblock()
 {
-  return meta_coll->load_superblock().safe_then([this](OSDSuperblock&& sb) {
+  return pg_shard_manager.get_meta_coll().load_superblock(
+  ).safe_then([this](OSDSuperblock&& sb) {
     if (sb.cluster_fsid != superblock.cluster_fsid) {
       logger().error("provided cluster fsid {} != superblock's {}",
                     sb.cluster_fsid, superblock.cluster_fsid);
@@ -250,10 +249,12 @@ seastar::future<> OSD::_write_superblock()
         superblock.cluster_fsid,
         superblock.osd_fsid);
       ceph::os::Transaction t;
-      meta_coll->create(t);
-      meta_coll->store_superblock(t, superblock);
+      pg_shard_manager.get_meta_coll().create(t);
+      pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
       logger().debug("OSD::_write_superblock: do_transaction...");
-      return store.do_transaction(meta_coll->collection(), std::move(t));
+      return store.do_transaction(
+       pg_shard_manager.get_meta_coll().collection(),
+       std::move(t));
     }),
     crimson::ct_error::assert_all("_write_superbock error")
   );
@@ -346,14 +347,14 @@ seastar::future<> OSD::start()
   }).then([this] {
     return open_meta_coll();
   }).then([this] {
-    return meta_coll->load_superblock(
+    return pg_shard_manager.get_meta_coll().load_superblock(
     ).handle_error(
       crimson::ct_error::assert_all("open_meta_coll error")
     );
   }).then([this](OSDSuperblock&& sb) {
     superblock = std::move(sb);
-    return get_map(superblock.current_epoch);
-  }).then([this](cached_map_t&& map) {
+    return pg_shard_manager.get_map(superblock.current_epoch);
+  }).then([this](OSDMapService::cached_map_t&& map) {
     pg_shard_manager.update_map(map);
     pg_shard_manager.got_map(map->get_epoch());
     osdmap = std::move(map);
@@ -688,7 +689,7 @@ seastar::future<> OSD::load_pgs()
   });
 }
 
-seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
+seastar::future<Ref<PG>> OSD::make_pg(OSDMapService::cached_map_t create_map,
                                      spg_t pgid,
                                      bool do_create)
 {
@@ -707,7 +708,7 @@ seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
                        std::move(ec_profile)));
     } else {
       // pool was deleted; grab final pg_pool_t off disk.
-      return meta_coll->load_final_pool_info(pgid.pool());
+      return pg_shard_manager.get_meta_coll().load_final_pool_info(pgid.pool());
     }
   };
   auto get_collection = [pgid, do_create, this] {
@@ -743,7 +744,7 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
   return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
     return pg_meta.get_epoch();
   }).then([this](epoch_t e) {
-    return get_map(e);
+    return pg_shard_manager.get_map(e);
   }).then([pgid, this] (auto&& create_map) {
     return make_pg(std::move(create_map), pgid, false);
   }).then([this](Ref<PG> pg) {
@@ -872,9 +873,11 @@ void OSD::handle_authentication(const EntityName& name,
 void OSD::update_stats()
 {
   osd_stat_seq++;
-  osd_stat.up_from = get_up_epoch();
+  osd_stat.up_from = pg_shard_manager.get_up_epoch();
   osd_stat.hb_peers = heartbeat->get_peers();
-  osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
+  osd_stat.seq = (
+    static_cast<uint64_t>(pg_shard_manager.get_up_epoch()) << 32
+  ) | osd_stat_seq;
   gate.dispatch_in_background("statfs", *this, [this] {
     (void) store.stat().then([this](store_statfs_t&& st) {
       osd_stat.statfs = st;
@@ -906,105 +909,6 @@ uint64_t OSD::send_pg_stats()
   return osd_stat.seq;
 }
 
-OSD::cached_map_t OSD::get_map() const
-{
-  return osdmap;
-}
-
-seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e)
-{
-  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
-  if (auto found = osdmaps.find(e); found) {
-    return seastar::make_ready_future<cached_map_t>(std::move(found));
-  } else {
-    return load_map(e).then([e, this](unique_ptr<OSDMap> osdmap) {
-      return seastar::make_ready_future<cached_map_t>(
-        osdmaps.insert(e, std::move(osdmap)));
-    });
-  }
-}
-
-void OSD::store_map_bl(ceph::os::Transaction& t,
-                       epoch_t e, bufferlist&& bl)
-{
-  meta_coll->store_map(t, e, bl);
-  map_bl_cache.insert(e, std::move(bl));
-}
-
-seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
-{
-  if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
-    return seastar::make_ready_future<bufferlist>(*found);
-  } else {
-    return meta_coll->load_map(e);
-  }
-}
-
-seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
-  epoch_t first,
-  epoch_t last)
-{
-  return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
-                            boost::make_counting_iterator<epoch_t>(last + 1),
-                            [this](epoch_t e) {
-    return load_map_bl(e).then([e](auto&& bl) {
-       return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
-           std::make_pair(e, std::move(bl)));
-    });
-  },
-  std::map<epoch_t, bufferlist>{},
-  [](auto&& bls, auto&& epoch_bl) {
-    bls.emplace(std::move(epoch_bl));
-    return std::move(bls);
-  });
-}
-
-seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
-{
-  auto o = std::make_unique<OSDMap>();
-  if (e > 0) {
-    return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
-      o->decode(bl);
-      return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
-    });
-  } else {
-    return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
-  }
-}
-
-seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
-                                  epoch_t start, Ref<MOSDMap> m)
-{
-  return seastar::do_for_each(boost::make_counting_iterator(start),
-                              boost::make_counting_iterator(m->get_last() + 1),
-                              [&t, m, this](epoch_t e) {
-    if (auto p = m->maps.find(e); p != m->maps.end()) {
-      auto o = std::make_unique<OSDMap>();
-      o->decode(p->second);
-      logger().info("store_maps osdmap.{}", e);
-      store_map_bl(t, e, std::move(std::move(p->second)));
-      osdmaps.insert(e, std::move(o));
-      return seastar::now();
-    } else if (auto p = m->incremental_maps.find(e);
-               p != m->incremental_maps.end()) {
-      return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
-        OSDMap::Incremental inc;
-        auto i = bl.cbegin();
-        inc.decode(i);
-        o->apply_incremental(inc);
-        bufferlist fbl;
-        o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
-        store_map_bl(t, e, std::move(fbl));
-        osdmaps.insert(e, std::move(o));
-        return seastar::now();
-      });
-    } else {
-      logger().error("MOSDMap lied about what maps it had?");
-      return seastar::now();
-    }
-  });
-}
-
 bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
 {
   if (!conn->peer_is_mon()) {
@@ -1022,9 +926,9 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
   return seastar::do_with(
     std::move(info),
     [this](auto &info) -> seastar::future<Ref<PG>> {
-      return get_map(info->epoch).then(
-       [&info, this](cached_map_t startmap) ->
-       seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
+      return pg_shard_manager.get_map(info->epoch).then(
+       [&info, this](OSDMapService::cached_map_t startmap) ->
+       seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
@@ -1034,7 +938,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
                "{} ignoring pgid {}, pool dne",
                __func__,
                pgid);
-             return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+             return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
                 std::make_tuple(Ref<PG>(), startmap));
            }
            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
@@ -1046,13 +950,13 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
                "{} dropping {} create, pool does not have CREATING flag set",
                __func__,
                pgid);
-             return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+             return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
                 std::make_tuple(Ref<PG>(), startmap));
            }
          }
          return make_pg(startmap, pgid, true).then(
            [startmap=std::move(startmap)](auto pg) mutable {
-             return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+             return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
                 std::make_tuple(std::move(pg), std::move(startmap)));
            });
       }).then([this, &info](auto&& ret) ->
@@ -1143,7 +1047,7 @@ seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
 
   return seastar::do_with(ceph::os::Transaction{},
                           [=](auto& t) {
-    return store_maps(t, start, m).then([=, &t] {
+    return pg_shard_manager.store_maps(t, start, m).then([=, &t] {
       // even if this map isn't from a mon, we may have satisfied our subscription
       monc->sub_got("osdmap", last);
       if (!superblock.oldest_map || skip_maps) {
@@ -1157,9 +1061,11 @@ seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
         superblock.mounted = boot_epoch;
         superblock.clean_thru = last;
       }
-      meta_coll->store_superblock(t, superblock);
+      pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
       logger().debug("OSD::handle_osd_map: do_transaction...");
-      return store.do_transaction(meta_coll->collection(), std::move(t));
+      return store.do_transaction(
+       pg_shard_manager.get_meta_coll().collection(),
+       std::move(t));
     });
   }).then([=] {
     // TODO: write to superblock and commit the transaction
@@ -1176,13 +1082,13 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
   return seastar::do_for_each(boost::make_counting_iterator(first),
                               boost::make_counting_iterator(last + 1),
                               [this](epoch_t cur) {
-    return get_map(cur).then([this](cached_map_t&& o) {
+    return pg_shard_manager.get_map(cur).then([this](OSDMapService::cached_map_t&& o) {
       osdmap = std::move(o);
       pg_shard_manager.update_map(osdmap);
-      if (up_epoch == 0 &&
+      if (pg_shard_manager.get_up_epoch() == 0 &&
           osdmap->is_up(whoami) &&
           osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
-        up_epoch = osdmap->get_epoch();
+        pg_shard_manager.set_up_epoch(osdmap->get_epoch());
         if (!boot_epoch) {
           boot_epoch = osdmap->get_epoch();
         }
@@ -1280,7 +1186,7 @@ seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
                                            epoch_t first)
 {
   if (first >= superblock.oldest_map) {
-    return load_map_bls(first, superblock.newest_map)
+    return pg_shard_manager.load_map_bls(first, superblock.newest_map)
     .then([this, conn, first](auto&& bls) {
       auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
          osdmap->get_encoding_features());
@@ -1290,7 +1196,7 @@ seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
       return conn->send(std::move(m));
     });
   } else {
-    return load_map_bl(osdmap->get_epoch())
+    return pg_shard_manager.load_map_bl(osdmap->get_epoch())
     .then([this, conn](auto&& bl) mutable {
       auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
          osdmap->get_encoding_features());
@@ -1388,7 +1294,7 @@ seastar::future<> OSD::restart()
 {
   beacon_timer.cancel();
   tick_timer.cancel();
-  up_epoch = 0;
+  pg_shard_manager.set_up_epoch(0);
   bind_epoch = osdmap->get_epoch();
   // TODO: promote to shutdown if being marked down for multiple times
   // rebind messengers
index 96398ff7daadebf003a9a0e6f54643682e5d11a6..b70f2ccdc9cbb13dccf875480c5ebb78229fa164 100644 (file)
@@ -16,7 +16,6 @@
 #include "crimson/common/gated.h"
 #include "crimson/admin/admin_socket.h"
 #include "crimson/common/simple_lru.h"
-#include "crimson/common/shared_lru.h"
 #include "crimson/mgr/client.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/osd/osdmap_service.h"
@@ -59,7 +58,6 @@ namespace crimson::osd {
 class PG;
 
 class OSD final : public crimson::net::Dispatcher,
-                 private OSDMapService,
                  private crimson::common::AuthHandler,
                  private crimson::mgr::WithStats {
   const int whoami;
@@ -72,17 +70,12 @@ class OSD final : public crimson::net::Dispatcher,
   std::unique_ptr<crimson::mon::Client> monc;
   std::unique_ptr<crimson::mgr::Client> mgrc;
 
-  SharedLRU<epoch_t, OSDMap> osdmaps;
-  SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
-  cached_map_t osdmap;
   // TODO: use a wrapper for ObjectStore
+  OSDMapService::cached_map_t osdmap;
   crimson::os::FuturizedStore& store;
-  std::unique_ptr<OSDMeta> meta_coll;
 
   /// _first_ epoch we were marked up (after this process started)
   epoch_t boot_epoch = 0;
-  /// _most_recent_ epoch we were marked up
-  epoch_t up_epoch = 0;
   //< epoch we last did a bind to new ip:ports
   epoch_t bind_epoch = 0;
   //< since when there is no more pending pg creates from mon
@@ -153,26 +146,12 @@ private:
   seastar::future<> _send_boot();
   seastar::future<> _add_me_to_crush();
 
-  seastar::future<Ref<PG>> make_pg(cached_map_t create_map,
+  seastar::future<Ref<PG>> make_pg(OSDMapService::cached_map_t create_map,
                                   spg_t pgid,
                                   bool do_create);
   seastar::future<Ref<PG>> load_pg(spg_t pgid);
   seastar::future<> load_pgs();
 
-  // OSDMapService methods
-  epoch_t get_up_epoch() const final {
-    return up_epoch;
-  }
-  seastar::future<cached_map_t> get_map(epoch_t e) final;
-  cached_map_t get_map() const final;
-  seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e);
-  seastar::future<bufferlist> load_map_bl(epoch_t e);
-  seastar::future<std::map<epoch_t, bufferlist>>
-  load_map_bls(epoch_t first, epoch_t last);
-  void store_map_bl(ceph::os::Transaction& t,
-                    epoch_t e, bufferlist&& bl);
-  seastar::future<> store_maps(ceph::os::Transaction& t,
-                               epoch_t start, Ref<MOSDMap> m);
   seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 
   void write_superblock(ceph::os::Transaction& t);
index 285c4ff16ce07ffa6c78a8f581dce7fe3a5c178b..d435bc0fe248232187fb4dc986a376134d53a3e7 100644 (file)
@@ -68,7 +68,7 @@ seastar::future<> PGAdvanceMap::start()
       boost::make_counting_iterator(from + 1),
       boost::make_counting_iterator(to + 1),
       [this](epoch_t next_epoch) {
-        return osd.get_map(next_epoch).then(
+        return osd.pg_shard_manager.get_map(next_epoch).then(
           [this] (cached_map_t&& next_map) {
             pg->handle_advance_map(next_map, rctx);
           });
index 5c00651dbac291a26a7b81b8fb001a4411ef808c..f204fe295095fe7ba6ed9e52de9f850e98ecf5f7 100644 (file)
@@ -12,14 +12,13 @@ namespace {
 namespace crimson::osd {
 
 PGShardManager::PGShardManager(
-  OSDMapService &osdmap_service,
   const int whoami,
   crimson::net::Messenger &cluster_msgr,
   crimson::net::Messenger &public_msgr,
   crimson::mon::Client &monc,
   crimson::mgr::Client &mgrc,
   crimson::os::FuturizedStore &store)
-  : core_state(whoami, osdmap_service, cluster_msgr, public_msgr,
+  : core_state(whoami, cluster_msgr, public_msgr,
               monc, mgrc, store),
     local_state(whoami),
     shard_services(core_state, local_state)
index 2da189c97ff68de465a5cc0b11e66d604fb72f0d..6d54c6b4f3ba989f5189630ad92cf3b058879ea0 100644 (file)
@@ -27,7 +27,6 @@ class PGShardManager {
 
 public:
   PGShardManager(
-    OSDMapService &osdmap_service,
     const int whoami,
     crimson::net::Messenger &cluster_msgr,
     crimson::net::Messenger &public_msgr,
@@ -65,6 +64,18 @@ public:
 
   FORWARD(got_map, got_map, core_state.osdmap_gate)
   FORWARD(wait_for_map, wait_for_map, core_state.osdmap_gate)
+
+  // Metacoll
+  FORWARD_TO_CORE(init_meta_coll)
+  FORWARD_TO_CORE(get_meta_coll)
+
+  // Core OSDMap methods
+  FORWARD_TO_CORE(get_map)
+  FORWARD_TO_CORE(load_map_bl)
+  FORWARD_TO_CORE(load_map_bls)
+  FORWARD_TO_CORE(store_maps)
+  FORWARD_TO_CORE(get_up_epoch)
+  FORWARD_TO_CORE(set_up_epoch)
 };
 
 }
index 26a758464d6f09e964976d5f903d3d44537d440a..7fcbc2472971916d57f0ac07bd613e6fec215695 100644 (file)
@@ -1,9 +1,12 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include <boost/smart_ptr/make_local_shared.hpp>
+
 #include "crimson/osd/shard_services.h"
 
 #include "messages/MOSDAlive.h"
+#include "messages/MOSDMap.h"
 #include "messages/MOSDPGCreated.h"
 #include "messages/MOSDPGTemp.h"
 
@@ -42,14 +45,12 @@ PerShardState::PerShardState(
 
 CoreState::CoreState(
   int whoami,
-  OSDMapService &osdmap_service,
   crimson::net::Messenger &cluster_msgr,
   crimson::net::Messenger &public_msgr,
   crimson::mon::Client &monc,
   crimson::mgr::Client &mgrc,
   crimson::os::FuturizedStore &store)
   : whoami(whoami),
-    osdmap_service(osdmap_service),
     osdmap_gate("CoreState::osdmap_gate"),
     cluster_msgr(cluster_msgr),
     public_msgr(public_msgr),
@@ -68,6 +69,7 @@ CoreState::CoreState(
       crimson::common::local_conf()->osd_min_recovery_priority)
 {
   crimson::common::local_conf().add_observer(this);
+  osdmaps[0] = boost::make_local_shared<OSDMap>();
 }
 
 seastar::future<> CoreState::send_to_osd(
@@ -265,6 +267,105 @@ void CoreState::handle_conf_change(const ConfigProxy& conf,
   }
 }
 
+CoreState::cached_map_t CoreState::get_map() const
+{
+  return osdmap;
+}
+
+seastar::future<CoreState::cached_map_t> CoreState::get_map(epoch_t e)
+{
+  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
+  if (auto found = osdmaps.find(e); found) {
+    return seastar::make_ready_future<cached_map_t>(std::move(found));
+  } else {
+    return load_map(e).then([e, this](std::unique_ptr<OSDMap> osdmap) {
+      return seastar::make_ready_future<cached_map_t>(
+        osdmaps.insert(e, std::move(osdmap)));
+    });
+  }
+}
+
+void CoreState::store_map_bl(ceph::os::Transaction& t,
+                       epoch_t e, bufferlist&& bl)
+{
+  meta_coll->store_map(t, e, bl);
+  map_bl_cache.insert(e, std::move(bl));
+}
+
+seastar::future<bufferlist> CoreState::load_map_bl(epoch_t e)
+{
+  if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
+    return seastar::make_ready_future<bufferlist>(*found);
+  } else {
+    return meta_coll->load_map(e);
+  }
+}
+
+seastar::future<std::map<epoch_t, bufferlist>> CoreState::load_map_bls(
+  epoch_t first,
+  epoch_t last)
+{
+  return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
+                            boost::make_counting_iterator<epoch_t>(last + 1),
+                            [this](epoch_t e) {
+    return load_map_bl(e).then([e](auto&& bl) {
+      return seastar::make_ready_future<std::pair<epoch_t, bufferlist>>(
+       std::make_pair(e, std::move(bl)));
+    });
+  },
+  std::map<epoch_t, bufferlist>{},
+  [](auto&& bls, auto&& epoch_bl) {
+    bls.emplace(std::move(epoch_bl));
+    return std::move(bls);
+  });
+}
+
+seastar::future<std::unique_ptr<OSDMap>> CoreState::load_map(epoch_t e)
+{
+  auto o = std::make_unique<OSDMap>();
+  if (e > 0) {
+    return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
+      o->decode(bl);
+      return seastar::make_ready_future<std::unique_ptr<OSDMap>>(std::move(o));
+    });
+  } else {
+    return seastar::make_ready_future<std::unique_ptr<OSDMap>>(std::move(o));
+  }
+}
+
+seastar::future<> CoreState::store_maps(ceph::os::Transaction& t,
+                                  epoch_t start, Ref<MOSDMap> m)
+{
+  return seastar::do_for_each(boost::make_counting_iterator(start),
+                              boost::make_counting_iterator(m->get_last() + 1),
+                              [&t, m, this](epoch_t e) {
+    if (auto p = m->maps.find(e); p != m->maps.end()) {
+      auto o = std::make_unique<OSDMap>();
+      o->decode(p->second);
+      logger().info("store_maps osdmap.{}", e);
+      store_map_bl(t, e, std::move(std::move(p->second)));
+      osdmaps.insert(e, std::move(o));
+      return seastar::now();
+    } else if (auto p = m->incremental_maps.find(e);
+               p != m->incremental_maps.end()) {
+      return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
+        OSDMap::Incremental inc;
+        auto i = bl.cbegin();
+        inc.decode(i);
+        o->apply_incremental(inc);
+        bufferlist fbl;
+        o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
+        store_map_bl(t, e, std::move(fbl));
+        osdmaps.insert(e, std::move(o));
+        return seastar::now();
+      });
+    } else {
+      logger().error("MOSDMap lied about what maps it had?");
+      return seastar::now();
+    }
+  });
+}
+
 seastar::future<> ShardServices::dispatch_context_transaction(
   crimson::os::CollectionRef col, PeeringCtx &ctx) {
   if (ctx.transaction.empty()) {
index 1dcaf64cd2fdb01796846a4eab0122c1afc52f1a..a46b8bffaa2b59a1e95692871ba27626dfe24d6a 100644 (file)
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <memory>
+
 #include <boost/intrusive_ptr.hpp>
 #include <seastar/core/future.hh>
 
 #include "osd_operation.h"
 #include "msg/MessageRef.h"
 #include "crimson/common/exception.h"
+#include "crimson/common/shared_lru.h"
 #include "crimson/os/futurized_collection.h"
 #include "osd/PeeringState.h"
 #include "crimson/osd/osdmap_service.h"
 #include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_meta.h"
 #include "crimson/osd/object_context.h"
 #include "crimson/osd/state.h"
 #include "common/AsyncReserver.h"
@@ -103,12 +107,11 @@ class PerShardState {
  * OSD-wide singleton holding instances that need to be accessible
  * from all PGs.
  */
-class CoreState : public md_config_obs_t {
+class CoreState : public md_config_obs_t, public OSDMapService {
   friend class ShardServices;
   friend class PGShardManager;
   CoreState(
     int whoami,
-    OSDMapService &osdmap_service,
     crimson::net::Messenger &cluster_msgr,
     crimson::net::Messenger &public_msgr,
     crimson::mon::Client &monc,
@@ -121,10 +124,9 @@ class CoreState : public md_config_obs_t {
 
   OSDState osd_state;
 
-  OSDMapService &osdmap_service;
-  OSDMapService::cached_map_t osdmap;
-  OSDMapService::cached_map_t &get_osdmap() { return osdmap; }
-  void update_map(OSDMapService::cached_map_t new_osdmap) {
+  cached_map_t osdmap;
+  cached_map_t &get_osdmap() { return osdmap; }
+  void update_map(cached_map_t new_osdmap) {
     osdmap = std::move(new_osdmap);
   }
   OSD_OSDMapGate osdmap_gate;
@@ -147,6 +149,16 @@ class CoreState : public md_config_obs_t {
     return (ceph_tid_t)next_tid++;
   }
 
+  std::unique_ptr<OSDMeta> meta_coll;
+  template <typename... Args>
+  void init_meta_coll(Args&&... args) {
+    meta_coll = std::make_unique<OSDMeta>(std::forward<Args>(args)...);
+  }
+  OSDMeta &get_meta_coll() {
+    assert(meta_coll);
+    return *meta_coll;
+  }
+
   // global pg temp state
   struct pg_temp_t {
     std::vector<int> acting;
@@ -202,6 +214,29 @@ class CoreState : public md_config_obs_t {
   void handle_conf_change(
     const ConfigProxy& conf,
     const std::set <std::string> &changed) final;
+
+  // OSDMapService
+  epoch_t up_epoch = 0;
+  epoch_t get_up_epoch() const final {
+    return up_epoch;
+  }
+  void set_up_epoch(epoch_t e) {
+    up_epoch = e;
+  }
+
+  SharedLRU<epoch_t, OSDMap> osdmaps;
+  SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
+
+  seastar::future<cached_map_t> get_map(epoch_t e) final;
+  cached_map_t get_map() const final;
+  seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e);
+  seastar::future<bufferlist> load_map_bl(epoch_t e);
+  seastar::future<std::map<epoch_t, bufferlist>>
+  load_map_bls(epoch_t first, epoch_t last);
+  void store_map_bl(ceph::os::Transaction& t,
+                    epoch_t e, bufferlist&& bl);
+  seastar::future<> store_maps(ceph::os::Transaction& t,
+                               epoch_t start, Ref<MOSDMap> m);
 };
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
@@ -245,7 +280,7 @@ public:
 
   // OSDMapService
   const OSDMapService &get_osdmap_service() const {
-    return core_state.osdmap_service;
+    return core_state;
   }
 
   template <typename T, typename... Args>