git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd/shard_services: get multiple store shards
author Chunmei Liu <chunmei.liu@ibm.com>
Wed, 20 Aug 2025 23:56:34 +0000 (23:56 +0000)
committer Chunmei Liu <chunmei.liu@ibm.com>
Thu, 21 Aug 2025 02:30:37 +0000 (02:30 +0000)
 for the per-shard local state (PerShardState), and use the store index when creating PG mappings.

Signed-off-by: Chunmei Liu <chunmei.liu@ibm.com>
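
Reader's note: the central idea in the diff below is that each reactor core's PerShardState now holds a vector of store-shard references instead of a single store reference, and call sites pass an explicit store_index to pick one. The following is a minimal stand-alone C++ sketch of that lookup pattern; StoreShard, StoreShardRef, the stub types, and main() are simplified stand-ins for the crimson types, not the actual headers.

    // Simplified sketch of the indexed store-shard lookup added in this commit.
    // The vector-of-stores layout and get_store(store_index) mirror the diff;
    // the stub types and main() are illustrative only.
    #include <cassert>
    #include <utility>
    #include <vector>

    struct StoreShard {};                  // stand-in for FuturizedStore::Shard
    using StoreShardRef = StoreShard*;     // stand-in for the ref type used in the diff

    struct PerShardState {
      // Was: a single `FuturizedStore::Shard &store`.
      // Now: one entry per store shard reachable from this reactor core.
      std::vector<StoreShardRef> stores;
    };

    class ShardServices {
      PerShardState local_state;
    public:
      explicit ShardServices(std::vector<StoreShardRef> stores)
        : local_state{std::move(stores)} {}

      // Callers now choose the store shard recorded in the PG mapping.
      StoreShardRef get_store(unsigned store_index) {
        assert(store_index < local_state.stores.size());
        return local_state.stores[store_index];
      }
    };

    int main() {
      StoreShard s0, s1;
      ShardServices svc({&s0, &s1});
      StoreShardRef store = svc.get_store(1);  // e.g. the PG was mapped to store shard 1
      (void)store;
    }
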
src/crimson/osd/osd.cc
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/pg_shard_manager.cc
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index 45a830fc1cb41849ca7469a9e0029846c919b807..9ccb1ad3c0061614d8d170bd3ca160f21cfaf75e 100644 (file)
@@ -183,7 +183,7 @@ CompatSet get_osd_initial_compat_set()
 seastar::future<> OSD::open_meta_coll()
 {
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return store.get_sharded_store().open_collection(
+  return store.get_sharded_store()->open_collection(
     coll_t::meta()
   ).then([this](auto ch) {
     pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
@@ -257,9 +257,9 @@ seastar::future<MetricPayload> OSD::get_perf_reports() {
 
 seastar::future<OSDMeta> OSD::open_or_create_meta_coll(FuturizedStore &store)
 {
-  return store.get_sharded_store().open_collection(coll_t::meta()).then([&store](auto ch) {
+  return store.get_sharded_store()->open_collection(coll_t::meta()).then([&store](auto ch) {
     if (!ch) {
-      return store.get_sharded_store().create_new_collection(
+      return store.get_sharded_store()->create_new_collection(
        coll_t::meta()
       ).then([&store](auto ch) {
        return OSDMeta(ch, store.get_sharded_store());
@@ -360,7 +360,7 @@ seastar::future<> OSD::_write_superblock(
          meta_coll.create(t);
          meta_coll.store_superblock(t, superblock);
          DEBUG("OSD::_write_superblock: do_transaction...");
-         return store.get_sharded_store().do_transaction(
+         return store.get_sharded_store()->do_transaction(
            meta_coll.collection(),
            std::move(t));
        }),
@@ -452,25 +452,36 @@ seastar::future<> OSD::start()
   }
   startup_time = ceph::mono_clock::now();
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
-  return store.start().then([this] {
-    return pg_to_shard_mappings.start(0, seastar::smp::count
+  return store.start().then([this] (auto store_shard_nums) {
+    return pg_to_shard_mappings.start(0, seastar::smp::count, store_shard_nums
     ).then([this] {
       return osd_singleton_state.start_single(
         whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
         std::ref(*monc), std::ref(*mgrc));
     }).then([this] {
       return osd_states.start();
-    }).then([this] {
+    }).then([this, store_shard_nums] {
       ceph::mono_time startup_time = ceph::mono_clock::now();
       return shard_services.start(
         std::ref(osd_singleton_state),
         std::ref(pg_to_shard_mappings),
+        store_shard_nums,
         whoami,
         startup_time,
         osd_singleton_state.local().perf,
         osd_singleton_state.local().recoverystate_perf,
         std::ref(store),
         std::ref(osd_states));
+    }).then([this, FNAME] {
+      return shard_services.invoke_on_all(
+      [this](auto& local_service) {
+        local_service.set_container(shard_services);
+      });
+    }).then([this, FNAME] {
+      return shard_services.invoke_on_all(
+      [this](auto& local_service) {
+        return local_service.get_remote_store();
+      });
     });
   }).then([this, FNAME] {
     heartbeat.reset(new Heartbeat{
@@ -1214,7 +1225,7 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
   co_await pg_shard_manager.set_superblock(superblock);
 
   DEBUG("submitting transaction");
-  co_await store.get_sharded_store().do_transaction(
+  co_await store.get_sharded_store()->do_transaction(
     pg_shard_manager.get_meta_coll().collection(), std::move(t));
 
   // TODO: write to superblock and commit the transaction
@@ -1587,7 +1598,7 @@ seastar::future<double> OSD::run_bench(int64_t count, int64_t bsize, int64_t osi
     std::vector<seastar::future<>> futures;
     std::vector<seastar::future<>> cleanup_futures;
     
-    auto collection_future = store.get_sharded_store().open_collection(
+    auto collection_future = store.get_sharded_store()->open_collection(
       coll_t::meta());
     auto collection_ref = co_await std::move(collection_future);
     ceph::os::Transaction cleanup_t;
@@ -1604,10 +1615,10 @@ seastar::future<double> OSD::run_bench(int64_t count, int64_t bsize, int64_t osi
                         ghobject_t::NO_GEN,
                         shard_id_t::NO_SHARD);
         t.write(coll_t::meta(), oid, 0, data.size(), bl);
-        futures.push_back(store.get_sharded_store().do_transaction(
+        futures.push_back(store.get_sharded_store()->do_transaction(
           collection_ref, std::move(t)));
         cleanup_t.remove(coll_t::meta(), oid);
-        cleanup_futures.push_back(store.get_sharded_store().do_transaction(
+        cleanup_futures.push_back(store.get_sharded_store()->do_transaction(
           collection_ref, std::move(cleanup_t)));
       }
     }
@@ -1641,12 +1652,12 @@ seastar::future<double> OSD::run_bench(int64_t count, int64_t bsize, int64_t osi
 
       t.write(coll_t::meta(), oid, offset, bsize, bl);
 
-      futures_bench.push_back(store.get_sharded_store().do_transaction(
+      futures_bench.push_back(store.get_sharded_store()->do_transaction(
         collection_ref, std::move(t)));
 
       if (!onum || !osize) {
         cleanup_t.remove(coll_t::meta(), oid);
-        cleanup_futures.push_back(store.get_sharded_store().do_transaction(
+        cleanup_futures.push_back(store.get_sharded_store()->do_transaction(
           collection_ref, std::move(cleanup_t)));
       }
     }
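
Reader's note: in the OSD::start() hunk above, store.start() now reports the number of store shards, and each ShardServices instance whose core has no store shard of its own borrows one from the core selected by this_shard_id() % store_shard_nums (see get_remote_store() in the shard_services.h hunk further down). The sketch below models only that assignment rule in plain C++, with example counts; the seastar foreign-pointer plumbing is deliberately omitted, and the assumption that the first store_shard_nums cores own the store shards is an inference from the modulo rule, not something stated in the diff.

    // Toy model of which core's store shard a given reactor core ends up using,
    // following the this_shard_id() % store_shard_nums rule from the diff.
    #include <cstdio>

    int main() {
      const unsigned reactor_cores = 8;      // seastar::smp::count (example value)
      const unsigned store_shard_nums = 3;   // reported by store.start() (example value)

      for (unsigned core = 0; core < reactor_cores; ++core) {
        // Assumption: cores 0..store_shard_nums-1 own a store shard; the rest borrow one.
        unsigned source_core = core % store_shard_nums;
        std::printf("reactor core %u uses the store shard owned by core %u%s\n",
                    core, source_core, core == source_core ? " (local)" : "");
      }
    }
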
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
index c3ea5560a6754bf6646710c207678884a277719e..dd2c417a73184f7ccb4a3ac0cad632aa987e1dc3 100644 (file)
@@ -165,11 +165,11 @@ seastar::future<> PGAdvanceMap::split_pg(
     children_pgids.insert(child_pgid);
 
     // Map each child pg ID to a core
-    auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id());
+    auto core = co_await shard_services.create_split_pg_mapping(child_pgid, seastar::this_shard_id(), 0);
     DEBUG(" PG {} mapped to {}", child_pgid.pgid, core);
     DEBUG(" {} map epoch: {}", child_pgid.pgid, pg_epoch);
     auto map = next_map;
-    auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, true);
+    auto child_pg = co_await shard_services.make_pg(std::move(map), child_pgid, 0, true);
 
     DEBUG(" Parent pgid: {}", pg->get_pgid());
     DEBUG(" Child pgid: {}", child_pg->get_pgid());
diff --git a/src/crimson/osd/pg_shard_manager.cc b/src/crimson/osd/pg_shard_manager.cc
index cb5e556ab9efc2037e75ec30adf08f4e062ec145..c24345a96ea1d8a9aa5bb8a558c6c507c97ded0f 100644 (file)
@@ -20,19 +20,20 @@ seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
     return seastar::parallel_for_each(
       colls_cores,
       [this](auto coll_core) {
-        auto[coll, shard_core] = coll_core;
+        auto[coll, shard_core_index] = coll_core;
+        auto[shard_core, store_index] = shard_core_index;
        spg_t pgid;
        if (coll.is_pg(&pgid)) {
           return get_pg_to_shard_mapping().get_or_create_pg_mapping(
-            pgid, shard_core
-          ).then([this, pgid] (auto core) {
+            pgid, shard_core, store_index
+          ).then([this, pgid] (auto core_store) {
             return this->with_remote_shard_state(
-              core,
-              [pgid](
+              core_store.first,
+              [pgid, core_store](
              PerShardState &per_shard_state,
              ShardServices &shard_services) {
              return shard_services.load_pg(
-               pgid
+               pgid, core_store.second
              ).then([pgid, &per_shard_state](auto &&pg) {
                logger().info("load_pgs: loaded {}", pgid);
                return pg->clear_temp_objects(
diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h
index 4e8ddb82502baa0cae7f45cbe8139ae547e73b20..c3d4d77d9a900a078b3446f1c66154c9624f1e54 100644 (file)
@@ -223,17 +223,19 @@ public:
   template <typename T>
   seastar::future<> run_with_pg_maybe_create(
     typename T::IRef op,
-    ShardServices &target_shard_services
+    ShardServices &target_shard_services,
+    unsigned int store_index
   ) {
     static_assert(T::can_create());
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     auto &opref = *op;
     return opref.template with_blocking_event<
       PGMap::PGCreationBlockingEvent
-    >([&target_shard_services, &opref](auto &&trigger) {
+    >([&target_shard_services, &opref, store_index](auto &&trigger) {
       return target_shard_services.get_or_create_pg(
         std::move(trigger),
         opref.get_pgid(),
+        store_index,
         opref.get_create_info()
       );
     }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
@@ -407,22 +409,22 @@ public:
         return seastar::make_exception_future<>(fut.get_exception());
       }
 
-      auto core = fut.get();
+      auto core_store = fut.get();
       logger.debug("{}: can_create={}, target-core={}",
-                   *op, T::can_create(), core);
+                   *op, T::can_create(), core_store.first);
       return this->template with_remote_shard_state_and_op<T>(
-        core, std::move(op),
-        [this](ShardServices &target_shard_services,
+        core_store.first, std::move(op),
+        [this, core_store](ShardServices &target_shard_services,
                typename T::IRef op) {
         auto &opref = *op;
         auto &logger = crimson::get_logger(ceph_subsys_osd);
         logger.debug("{}: entering create_or_wait_pg", opref);
         return opref.template enter_stage<>(
           opref.get_pershard_pipeline(target_shard_services).create_or_wait_pg
-        ).then([this, &target_shard_services, op=std::move(op)]() mutable {
+        ).then([this, &target_shard_services, op=std::move(op), core_store]() mutable {
           if constexpr (T::can_create()) {
             return this->template run_with_pg_maybe_create<T>(
-                std::move(op), target_shard_services);
+                std::move(op), target_shard_services, core_store.second);
           } else {
             return this->template run_with_pg_maybe_wait<T>(
                 std::move(op), target_shard_services);
@@ -469,6 +471,7 @@ public:
            opref, opref.get_pgid());
          return seastar::now();
        }
+       SUBDEBUG(osd, "{}: have_pg", opref);
        return op->with_pg(
          target_shard_services, pg
        ).finally([op] {});
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index 6a983648d04bc66919af6f533aeb8bb34d0f1f97..da8c0caa9d462be809fb924ac9f1090aee24818c 100644 (file)
@@ -40,7 +40,7 @@ PerShardState::PerShardState(
   crimson::os::FuturizedStore &store,
   OSDState &osd_state)
   : whoami(whoami),
-    store(store.get_sharded_store()),
+    stores(store.get_sharded_stores()),
     osd_state(osd_state),
     osdmap_gate("PerShardState::osdmap_gate"),
     perf(perf), recoverystate_perf(recoverystate_perf),
@@ -560,6 +560,7 @@ void OSDSingletonState::trim_maps(ceph::os::Transaction& t,
 seastar::future<Ref<PG>> ShardServices::make_pg(
   OSDMapService::cached_map_t create_map,
   spg_t pgid,
+  unsigned store_index,
   bool do_create)
 {
   using ec_profile_t = std::map<std::string, std::string>;
@@ -583,41 +584,45 @@ seastar::future<Ref<PG>> ShardServices::make_pg(
       return get_pool_info(pgid.pool());
     }
   };
-  auto get_collection = [pgid, do_create, this] {
+  auto get_collection = [pgid, do_create, store_index, this] {
     const coll_t cid{pgid};
     if (do_create) {
-      return get_store().create_new_collection(cid);
+      return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::create_new_collection>(
+        get_store(store_index), cid);
     } else {
-      return get_store().open_collection(cid);
+      return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+        get_store(store_index), cid);
     }
   };
   return seastar::when_all(
     std::move(get_pool_info_for_pg),
     std::move(get_collection)
-  ).then([pgid, create_map, this](auto &&ret) {
+  ).then([pgid, create_map, store_index, this](auto &&ret) {
     auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get());
     auto coll = std::move(std::get<1>(ret).get());
     return seastar::make_ready_future<Ref<PG>>(
       new PG{
-       pgid,
-       pg_shard_t{local_state.whoami, pgid.shard},
-       std::move(coll),
-       std::move(pool),
-       std::move(name),
-       create_map,
-       *this,
-       ec_profile});
+        pgid,
+        pg_shard_t{local_state.whoami, pgid.shard},
+        std::move(store_index),
+        std::move(coll),
+        std::move(pool),
+        std::move(name),
+        create_map,
+        *this,
+        ec_profile});
   });
 }
 
 seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
+  unsigned int store_index,
   std::unique_ptr<PGCreateInfo> info) {
   return seastar::do_with(
     std::move(info),
-    [this](auto &info)
+    [store_index, this](auto &info)
     -> seastar::future<Ref<PG>> {
       return get_map(info->epoch).then(
-       [&info, this](cached_map_t startmap)
+       [&info, store_index, this](cached_map_t startmap)
        -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          LOG_PREFIX(ShardServices::handle_pg_create_info);
          const spg_t &pgid = info->pgid;
@@ -659,7 +664,7 @@ seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
            }
          }
          return make_pg(
-           startmap, pgid, true
+           startmap, pgid, store_index, true
          ).then([startmap=std::move(startmap)](auto pg) mutable {
            return seastar::make_ready_future<
              std::tuple<Ref<PG>, OSDMapService::cached_map_t>
@@ -716,6 +721,7 @@ ShardServices::get_or_create_pg_ret
 ShardServices::get_or_create_pg(
   PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
   spg_t pgid,
+  unsigned int store_index,
   std::unique_ptr<PGCreateInfo> info)
 {
   if (info) {
@@ -724,7 +730,8 @@ ShardServices::get_or_create_pg(
     if (!existed) {
       local_state.pg_map.set_creating(pgid);
       (void)handle_pg_create_info(
-       std::move(info));
+        store_index,
+        std::move(info));
     }
     return std::move(fut);
   } else {
@@ -754,20 +761,19 @@ ShardServices::create_split_pg(
   return std::move(fut);
 }
 
-seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
-
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid, unsigned int store_index)
 {
   LOG_PREFIX(OSDSingletonState::load_pg);
   DEBUG("{}", pgid);
 
-  return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
+  return seastar::do_with(PGMeta(get_store(store_index), pgid), [](auto& pg_meta) {
     return pg_meta.get_epoch();
   }).then([this](epoch_t e) {
     return get_map(e);
-  }).then([pgid, this](auto&& create_map) {
-    return make_pg(std::move(create_map), pgid, false);
-  }).then([this](Ref<PG> pg) {
-    return pg->read_state(&get_store()).then([pg] {
+  }).then([pgid, store_index, this](auto&& create_map) {
+    return make_pg(std::move(create_map), pgid, store_index, false);
+  }).then([store_index, this](Ref<PG> pg) {
+    return pg->read_state(get_store(store_index)).then([pg] {
        return seastar::make_ready_future<Ref<PG>>(std::move(pg));
     });
   }).handle_exception([FNAME, pgid](auto ep) {
@@ -778,11 +784,13 @@ seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
 }
 
 seastar::future<> ShardServices::dispatch_context_transaction(
-  crimson::os::CollectionRef col, PeeringCtx &ctx) {
+  crimson::os::CollectionRef col, PeeringCtx &ctx, unsigned int store_index) {
   LOG_PREFIX(OSDSingletonState::dispatch_context_transaction);
   if (ctx.transaction.empty()) {
     DEBUG("empty transaction");
-    co_await get_store().flush(col);
+    co_await crimson::os::with_store_do_transaction(
+      get_store(store_index),
+      col, ceph::os::Transaction{});
     Context* on_commit(
       ceph::os::Transaction::collect_all_contexts(ctx.transaction));
     if (on_commit) {
@@ -792,7 +800,8 @@ seastar::future<> ShardServices::dispatch_context_transaction(
   }
 
   DEBUG("do_transaction ...");
-  co_await get_store().do_transaction(
+  co_await crimson::os::with_store_do_transaction(
+    get_store(store_index),
     col,
     ctx.transaction.claim_and_reset());
   co_return;
@@ -821,17 +830,18 @@ seastar::future<> ShardServices::dispatch_context_messages(
 }
 
 seastar::future<> ShardServices::dispatch_context(
+  unsigned int store_index,
   crimson::os::CollectionRef col,
   PeeringCtx &&pctx)
 {
   return seastar::do_with(
     std::move(pctx),
-    [this, col](auto &ctx) {
+    [this, col, store_index](auto &ctx) {
     ceph_assert(col || ctx.transaction.empty());
     return seastar::when_all_succeed(
       dispatch_context_messages(
        BufferedRecoveryMessages{ctx}),
-      col ? dispatch_context_transaction(col, ctx) : seastar::now()
+      col ? dispatch_context_transaction(col, ctx, store_index) : seastar::now()
     ).then_unpack([] {
       return seastar::now();
     });
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index f5a1f2c74ef303597b8338112fa20bf1bada2e07..5ccc87037786b12fe076ccee724ae20aaf65dfc7 100644 (file)
@@ -67,7 +67,7 @@ class PerShardState {
 #define assert_core() ceph_assert(seastar::this_shard_id() == core);
 
   const int whoami;
-  crimson::os::FuturizedStore::Shard &store;
+  std::vector<crimson::os::FuturizedStore::StoreShardRef> stores;
   crimson::common::CephContext cct;
 
   OSDState &osd_state;
@@ -353,6 +353,8 @@ class ShardServices : public OSDMapService {
   PerShardState local_state;
   seastar::sharded<OSDSingletonState> &osd_singleton_state;
   PGShardMapping& pg_to_shard_mapping;
+  seastar::sharded<ShardServices>* s_container = nullptr;
+  unsigned int store_shard_nums = 0;
 
   template <typename F, typename... Args>
   auto with_singleton(F &&f, Args&&... args) {
@@ -463,15 +465,37 @@ public:
   ShardServices(
     seastar::sharded<OSDSingletonState> &osd_singleton_state,
     PGShardMapping& pg_to_shard_mapping,
+    unsigned int store_shard_nums,
     PSSArgs&&... args)
     : local_state(std::forward<PSSArgs>(args)...),
       osd_singleton_state(osd_singleton_state),
-      pg_to_shard_mapping(pg_to_shard_mapping) {}
+      pg_to_shard_mapping(pg_to_shard_mapping),
+      store_shard_nums(store_shard_nums) {}
 
   FORWARD_TO_OSD_SINGLETON(send_to_osd)
 
-  crimson::os::FuturizedStore::Shard &get_store() {
-    return local_state.store;
+  void set_container(seastar::sharded<ShardServices>& ss) { s_container = &ss; }
+
+  seastar::future<> get_remote_store() {
+    if (local_state.stores.empty()) {
+      return s_container->invoke_on(
+        seastar::this_shard_id() % store_shard_nums,
+        [] (auto& remote_service) {
+        assert(remote_service.local_state.stores.size() == 1);
+        auto ret = remote_service.local_state.stores[0].get_foreign();
+        return std::move(ret);
+      }).then([this](auto&& remote_store) {
+        local_state.stores.emplace_back(make_local_shared_foreign(std::move(remote_store)));
+        return seastar::now();
+      });
+    } else {
+      return seastar::now();
+    }
+  }
+
+  crimson::os::FuturizedStore::StoreShardRef get_store(unsigned int store_index) {
+    assert(store_index < local_state.stores.size());
+    return local_state.stores[store_index];
   }
 
   struct shard_stats_t {
@@ -481,8 +505,8 @@ public:
     return {get_reactor_utilization()};
   }
 
-  auto create_split_pg_mapping(spg_t pgid, core_id_t core) {
-    return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core);
+  auto create_split_pg_mapping(spg_t pgid, core_id_t core, unsigned int store_index) {
+    return pg_to_shard_mapping.get_or_create_pg_mapping(pgid, core, store_index);
   }
 
   auto remove_pg(spg_t pgid) {
@@ -524,8 +548,10 @@ public:
   seastar::future<Ref<PG>> make_pg(
     cached_map_t create_map,
     spg_t pgid,
+    unsigned int store_index,
     bool do_create);
   seastar::future<Ref<PG>> handle_pg_create_info(
+    unsigned int store_index,
     std::unique_ptr<PGCreateInfo> info);
 
   using get_or_create_pg_ertr = PGMap::wait_for_pg_ertr;
@@ -533,6 +559,7 @@ public:
   get_or_create_pg_ret get_or_create_pg(
     PGMap::PGCreationBlockingEvent::TriggerI&&,
     spg_t pgid,
+    unsigned int store_index,
     std::unique_ptr<PGCreateInfo> info);
 
   using wait_for_pg_ertr = PGMap::wait_for_pg_ertr;
@@ -543,11 +570,11 @@ public:
     PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
     spg_t pgid);
 
-  seastar::future<Ref<PG>> load_pg(spg_t pgid);
+  seastar::future<Ref<PG>> load_pg(spg_t pgid, unsigned int store_index);
 
   /// Dispatch and reset ctx transaction
   seastar::future<> dispatch_context_transaction(
-    crimson::os::CollectionRef col, PeeringCtx &ctx);
+    crimson::os::CollectionRef col, PeeringCtx &ctx, unsigned int store_index);
 
   /// Dispatch and reset ctx messages
   seastar::future<> dispatch_context_messages(
@@ -555,13 +582,15 @@ public:
 
   /// Dispatch ctx and dispose of context
   seastar::future<> dispatch_context(
+    unsigned int store_index,
     crimson::os::CollectionRef col,
     PeeringCtx &&ctx);
 
   /// Dispatch ctx and dispose of ctx, transaction must be empty
   seastar::future<> dispatch_context(
+    unsigned int store_index,
     PeeringCtx &&ctx) {
-    return dispatch_context({}, std::move(ctx));
+    return dispatch_context(store_index, {}, std::move(ctx));
   }
 
   PerShardPipeline &get_client_request_pipeline() {