]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/cyanstore: create multiple store shards on each reactor
authorchunmei liu <chunmei.liu@ibm.com>
Tue, 15 Jul 2025 10:27:16 +0000 (03:27 -0700)
committerChunmei Liu <chunmei.liu@ibm.com>
Tue, 27 Jan 2026 23:42:31 +0000 (23:42 +0000)
note: src/stop.sh should wait enought time before kill the crimson-osd
in case cyanstore can't write meta data to disk.

Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
src/crimson/os/cyanstore/cyan_store.cc
src/crimson/os/cyanstore/cyan_store.h

index 4f576fc98a53a2522a94f92a920851acda84ada8..ecb82afc9a8571dd041c263e48851be1a8dc686e 100644 (file)
@@ -55,19 +55,90 @@ private:
   };
 };
 
+namespace fs = std::filesystem;
+seastar::future<> CyanStore::get_shard_nums()
+{
+  store_shard_nums = 0;
+  for (const auto& entry : fs::directory_iterator(path)) {
+    const std::string filename = entry.path().filename().string();
+    if (filename.rfind("collections", 0) == 0) {
+      store_shard_nums++;
+    }
+  }
+  if (store_shard_nums == 0) {
+    // If no collections files found, assume seastar::smp::count shards
+    store_shard_nums = seastar::smp::count;
+  }
+  return seastar::make_ready_future<>();
+}
+
+seastar::future<unsigned int> CyanStore::start()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return get_shard_nums().then([this] {
+    auto num_shard_services = (store_shard_nums + seastar::smp::count - 1 ) / seastar::smp::count;
+    logger().info("store_shard_nums={} seastar::smp={}, num_shard_services={}", store_shard_nums, seastar::smp::count, num_shard_services);
+    return shard_stores.start(num_shard_services, path, store_shard_nums);
+  }).then([this] {
+    logger().debug("CyanStore started with {} shard stores", store_shard_nums);
+    return seastar::make_ready_future<unsigned int>(store_shard_nums);
+  });
+}
+
+seastar::future<> CyanStore::stop()
+{
+  logger().debug("stopping shard stores");
+  return shard_stores.stop();
+}
+
+CyanStore::mount_ertr::future<> CyanStore::mount()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return shard_stores.invoke_on_all([](auto &local_store) {
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->mount().handle_error(
+        crimson::ct_error::assert_all{
+          "Invalid error in CyanStore::mount"
+        });
+    });
+  });
+}
+
+seastar::future<> CyanStore::umount()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return shard_stores.invoke_on_all([](auto &local_store) {
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->umount();
+    });
+  });
+}
+
 seastar::future<store_statfs_t> CyanStore::stat() const
 {
   ceph_assert(seastar::this_shard_id() == primary_core);
   logger().debug("{}", __func__);
+
   return shard_stores.map_reduce0(
-    [](const CyanStore::Shard &local_store) {
-      return local_store.get_used_bytes();
+    [](const auto& local_store) {
+      return seastar::map_reduce(
+        local_store.mshard_stores.begin(),
+        local_store.mshard_stores.end(),
+        [](const auto& mshard_store) {
+          return seastar::make_ready_future<uint64_t>(
+            mshard_store->get_used_bytes()
+          );
+        },
+        uint64_t{0},
+        std::plus<uint64_t>()
+      );
     },
-    (uint64_t)0,
+    uint64_t{0},
     std::plus<uint64_t>()
   ).then([](uint64_t used_bytes) {
     store_statfs_t st;
-    st.total = crimson::common::local_conf().get_val<Option::size_t>("memstore_device_bytes");
+    st.total = crimson::common::local_conf()
+      .get_val<Option::size_t>("memstore_device_bytes");
     st.available = st.total - used_bytes;
     return seastar::make_ready_future<store_statfs_t>(std::move(st));
   });
@@ -110,15 +181,33 @@ CyanStore::mkfs_ertr::future<> CyanStore::mkfs(uuid_d new_osd_fsid)
   }).safe_then([this]{
     return write_meta("type", "memstore");
   }).safe_then([this] {
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.mkfs();
+    return shard_stores.invoke_on_all([](auto &local_store) {
+      return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+        return mshard_store->mkfs();
+      });
     });
   });
 }
 
+CyanStore::Shard::Shard(
+  std::string path,
+  unsigned int store_shard_nums,
+  unsigned int store_index)
+  : path(path),
+    store_index(store_index)
+{
+  ceph_assert(store_index < store_shard_nums);
+  if (store_active = is_shard_store_active(store_index, store_shard_nums); !store_active) {
+    logger().info("store_index {} is out of range - inactivating this store shard, store_shard_nums {}", store_index, store_shard_nums);
+  }
+
+}
+
 seastar::future<> CyanStore::Shard::mkfs()
 {
+  if (!store_active) {
+    return seastar::now();
+  }
   std::string fn =
     path + "/collections" + std::to_string(seastar::this_shard_id());
   ceph::bufferlist bl;
@@ -132,16 +221,29 @@ seastar::future<std::vector<coll_core_t>>
 CyanStore::list_collections()
 {
   ceph_assert(seastar::this_shard_id() == primary_core);
-  return seastar::do_with(std::vector<coll_core_t>{}, [this](auto &collections) {
-    return shard_stores.map([](auto &local_store) {
-      return local_store.list_collections();
-    }).then([&collections](std::vector<std::vector<coll_core_t>> results) {
-      for (auto& colls : results) {
-        collections.insert(collections.end(), colls.begin(), colls.end());
+  return shard_stores.map_reduce0(
+    [](auto& local_store) {
+    // For each local store, collect all collections from its mshard_stores
+    return seastar::map_reduce(
+      local_store.mshard_stores.begin(),
+      local_store.mshard_stores.end(),
+      [](auto& mshard_store) {
+        return mshard_store->list_collections();
+      },
+      std::vector<coll_core_t>(),  // Initial empty vector
+      [](auto&& merged, auto&& result) {  // Reduction function
+        merged.insert(merged.end(), result.begin(), result.end());
+        return std::move(merged);
       }
-      return seastar::make_ready_future<std::vector<coll_core_t>>(
-        std::move(collections));
-    });
+    );
+    },
+    std::vector<coll_core_t>(),  // Initial empty vector for final reduction
+    [](auto&& total, auto&& shard_result) {  // Final reduction function
+      total.insert(total.end(), shard_result.begin(), shard_result.end());
+      return std::move(total);
+    }
+  ).then([](auto all_collections) {
+    return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(all_collections));
   });
 }
 
@@ -153,10 +255,13 @@ CyanStore::get_default_device_class()
 
 CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 {
+  if (!store_active) {
+    return mount_ertr::now();
+  }
   static const char read_file_errmsg[]{"read_file"};
   ceph::bufferlist bl;
   std::string fn =
-    path + "/collections" + std::to_string(seastar::this_shard_id());
+    path + "/collections" + std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index);
   std::string err;
   if (int r = bl.read_file(fn.c_str(), &err); r < 0) {
     return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
@@ -168,7 +273,7 @@ CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 
   for (auto& coll : collections) {
     std::string fn = fmt::format("{}/{}{}", path, coll,
-      std::to_string(seastar::this_shard_id()));
+      std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index));
     ceph::bufferlist cbl;
     if (int r = cbl.read_file(fn.c_str(), &err); r < 0) {
       return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
@@ -184,6 +289,9 @@ CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 
 seastar::future<> CyanStore::Shard::umount()
 {
+  if (!store_active) {
+    return seastar::now();
+  }
   return seastar::do_with(std::set<coll_t>{}, [this](auto& collections) {
     return seastar::do_for_each(coll_map, [&collections, this](auto& coll) {
       auto& [col, ch] = coll;
@@ -192,13 +300,13 @@ seastar::future<> CyanStore::Shard::umount()
       ceph_assert(ch);
       ch->encode(bl);
       std::string fn = fmt::format("{}/{}{}", path, col,
-        std::to_string(seastar::this_shard_id()));
+        std::to_string(seastar::this_shard_id()+ seastar::smp::count * store_index));
       return crimson::write_file(std::move(bl), fn);
     }).then([&collections, this] {
       ceph::bufferlist bl;
       ceph::encode(collections, bl);
       std::string fn = fmt::format("{}/collections{}",
-        path, std::to_string(seastar::this_shard_id()));
+        path, std::to_string(seastar::this_shard_id()+ seastar::smp::count * store_index));
       return crimson::write_file(std::move(bl), fn);
     });
   });
@@ -212,6 +320,7 @@ CyanStore::Shard::list_objects(
   uint64_t limit,
   uint32_t op_flags) const
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {} {} {}",
                  __func__, c->get_cid(), start, end, limit);
@@ -235,6 +344,7 @@ CyanStore::Shard::list_objects(
 seastar::future<CollectionRef>
 CyanStore::Shard::create_new_collection(const coll_t& cid)
 {
+  assert(store_active);
   auto c = new Collection{cid};
   new_coll_map[cid] = c;
   return seastar::make_ready_future<CollectionRef>(c);
@@ -243,15 +353,19 @@ CyanStore::Shard::create_new_collection(const coll_t& cid)
 seastar::future<CollectionRef>
 CyanStore::Shard::open_collection(const coll_t& cid)
 {
+  assert(store_active);
   return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
 }
 
 seastar::future<std::vector<coll_core_t>>
 CyanStore::Shard::list_collections()
 {
+  if (!store_active) {
+    return seastar::make_ready_future<std::vector<coll_core_t>>();
+  }
   std::vector<coll_core_t> collections;
   for (auto& coll : coll_map) {
-    collections.push_back(std::make_pair(coll.first, seastar::this_shard_id()));
+    collections.push_back(std::make_pair(coll.first, std::make_pair(seastar::this_shard_id(), store_index)));
   }
   return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(collections));
 }
@@ -262,6 +376,7 @@ CyanStore::Shard::exists(
   const ghobject_t &oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   if (!c->exists) {
     return base_errorator::make_ready_future<bool>(false);
@@ -277,6 +392,7 @@ seastar::future<>
 CyanStore::Shard::set_collection_opts(CollectionRef ch,
                                       const pool_opts_t& opts)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {}", __func__, c->get_cid());
   c->pool_opts = opts;
@@ -291,6 +407,7 @@ CyanStore::Shard::read(
   size_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {} {}~{}",
                 __func__, c->get_cid(), oid, offset, len);
@@ -318,6 +435,7 @@ CyanStore::Shard::readv(
   interval_set<uint64_t>& m,
   uint32_t op_flags)
 {
+  assert(store_active);
   return seastar::do_with(ceph::bufferlist{},
     [this, ch, oid, &m, op_flags](auto& bl) {
     return crimson::do_for_each(m,
@@ -339,6 +457,7 @@ CyanStore::Shard::get_attr(
   std::string_view name,
   uint32_t op_flags) const
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}",
                 __func__, c->get_cid(), oid);
@@ -359,6 +478,7 @@ CyanStore::Shard::get_attrs(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}",
                 __func__, c->get_cid(), oid);
@@ -376,6 +496,7 @@ auto CyanStore::Shard::omap_get_values(
   uint32_t op_flags)
   -> read_errorator::future<omap_values_t>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}", __func__, c->get_cid(), oid);
   auto o = c->get_object(oid);
@@ -399,6 +520,7 @@ auto CyanStore::Shard::omap_iterate(
   uint32_t op_flags)
   -> CyanStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}", __func__, c->get_cid(), oid);
   auto o = c->get_object(oid);
@@ -424,6 +546,7 @@ auto CyanStore::Shard::omap_get_header(
   uint32_t op_flags)
   -> CyanStore::Shard::get_attr_errorator::future<ceph::bufferlist>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   auto o = c->get_object(oid);
   if (!o) {
@@ -438,6 +561,7 @@ seastar::future<> CyanStore::Shard::do_transaction_no_callbacks(
   CollectionRef ch,
   ceph::os::Transaction&& t)
 {
+  assert(store_active);
   using ceph::os::Transaction;
   int r = 0;
   try {
@@ -992,6 +1116,7 @@ CyanStore::Shard::fiemap(
   uint64_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
 
   ObjectRef o = c->get_object(oid);
@@ -1008,6 +1133,7 @@ CyanStore::Shard::stat(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   auto o = c->get_object(oid);
   if (!o) {
index 44f6db3c08ee78cd4342ef9ac6ef6d3269f2e899..954bb309570519473b8972f42c55445425b646ad 100644 (file)
@@ -12,6 +12,8 @@
 #include <optional>
 #include <seastar/core/future.hh>
 #include <seastar/core/future-util.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_ptr.hh>
 
 #include "osd/osd_types.h"
 #include "include/uuid.h"
@@ -29,8 +31,10 @@ class CyanStore final : public FuturizedStore {
 public:
   class Shard : public FuturizedStore::Shard {
   public:
-    Shard(std::string path)
-      :path(path){}
+    Shard(std::string path,
+      unsigned int store_shard_nums,
+      unsigned int store_index = 0);
+    ~Shard() = default;
 
     seastar::future<struct stat> stat(
       CollectionRef c,
@@ -128,7 +132,19 @@ public:
     using coll_core_t = FuturizedStore::coll_core_t;
     seastar::future<std::vector<coll_core_t>> list_collections();
 
-    uint64_t get_used_bytes() const { return used_bytes; }
+    uint64_t get_used_bytes() const {
+      if (!store_active) {
+        return 0;
+      }
+      return used_bytes;
+    }
+
+    unsigned int get_store_index() const {
+      return store_index;
+    }
+    bool get_status() const {
+      return store_active;
+    }
 
   private:
     int _remove(const coll_t& cid, const ghobject_t& oid);
@@ -175,38 +191,20 @@ public:
     const std::string path;
     std::unordered_map<coll_t, boost::intrusive_ptr<Collection>> coll_map;
     std::map<coll_t, boost::intrusive_ptr<Collection>> new_coll_map;
+    unsigned int store_index;
+    bool store_active = true;
   };
 
   CyanStore(const std::string& path);
   ~CyanStore() final;
 
-  seastar::future<> start() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.start(path);
-  }
+  seastar::future<unsigned int> start() final;
 
-  seastar::future<> stop() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.stop();
-  }
+  seastar::future<> stop() final;
 
-  mount_ertr::future<> mount() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.mount().handle_error(
-      crimson::stateful_ec::assert_failure(
-        fmt::format("error mounting cyanstore").c_str()));
-    });
-  }
+  mount_ertr::future<> mount() final;
 
-  seastar::future<> umount() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.umount();
-    });
-  }
+  seastar::future<> umount() final;
 
   mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
 
@@ -219,8 +217,24 @@ public:
   seastar::future<> write_meta(const std::string& key,
                  const std::string& value) final;
 
-  FuturizedStore::Shard& get_sharded_store() final{
-    return shard_stores.local();
+  FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final {
+    assert(!shard_stores.local().mshard_stores.empty());
+    assert(store_index < shard_stores.local().mshard_stores.size());
+    assert(shard_stores.local().mshard_stores[store_index]->get_status() == true);
+    return make_local_shared_foreign(
+      seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(
+        shard_stores.local().mshard_stores[store_index])));
+  }
+  std::vector<FuturizedStore::StoreShardRef> get_sharded_stores() final{
+    std::vector<FuturizedStore::StoreShardRef> ret;
+    ret.reserve(shard_stores.local().mshard_stores.size());
+    for (auto& mshard_store : shard_stores.local().mshard_stores) {
+      if (mshard_store->get_status() == true) {
+        ret.emplace_back(make_local_shared_foreign(
+          seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(mshard_store))));
+      }
+    }
+    return ret;
   }
 
   seastar::future<std::tuple<int, std::string>>
@@ -230,8 +244,31 @@ public:
 
   seastar::future<std::string> get_default_device_class() final;
 
+  seastar::future<> get_shard_nums();
+
+
 private:
-  seastar::sharded<CyanStore::Shard> shard_stores;
+class MultiShardStores {
+  public:
+    std::vector<seastar::shared_ptr<CyanStore::Shard>> mshard_stores;
+
+  public:
+    MultiShardStores(size_t count,
+                     const std::string path,
+                     unsigned int store_shard_nums)
+    : mshard_stores() {
+      mshard_stores.reserve(count); // Reserve space for the shards
+      for (size_t store_index = 0; store_index < count; ++store_index) {
+        mshard_stores.emplace_back(seastar::make_shared<CyanStore::Shard>(
+          path, store_shard_nums, store_index));
+      }
+    }
+    ~MultiShardStores() {
+      mshard_stores.clear();
+    }
+  };
+  seastar::sharded<CyanStore::MultiShardStores> shard_stores;
+  unsigned int store_shard_nums = 0;
   const std::string path;
   uuid_d osd_fsid;
 };