]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/cyanstore: create multiple store shards on each reactor
authorchunmei liu <chunmei.liu@ibm.com>
Tue, 15 Jul 2025 10:27:16 +0000 (03:27 -0700)
committerChunmei Liu <chunmei.liu@ibm.com>
Thu, 21 Aug 2025 02:30:37 +0000 (02:30 +0000)
note: src/stop.sh should wait enough time before killing the crimson-osd,
in case cyanstore hasn't finished writing its metadata to disk.

Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
src/crimson/os/cyanstore/cyan_store.cc
src/crimson/os/cyanstore/cyan_store.h

index 9c3f5d6551cd606618b5a17c972bc3d38f7306b5..a8a6ba43d86eaa320d0fea09f79eb40952cf97d4 100644 (file)
@@ -54,19 +54,90 @@ private:
   };
 };
 
+namespace fs = std::filesystem;
+seastar::future<> CyanStore::get_shard_nums()
+{
+  store_shard_nums = 0;
+  for (const auto& entry : fs::directory_iterator(path)) {
+    const std::string filename = entry.path().filename().string();
+    if (filename.rfind("collections", 0) == 0) {
+      store_shard_nums++;
+    }
+  }
+  if (store_shard_nums == 0) {
+    // If no collections files found, assume seastar::smp::count shards
+    store_shard_nums = seastar::smp::count;
+  }
+  return seastar::make_ready_future<>();
+}
+
+seastar::future<unsigned int> CyanStore::start()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return get_shard_nums().then([this] {
+    auto num_shard_services = (store_shard_nums + seastar::smp::count - 1 ) / seastar::smp::count;
+    logger().info("store_shard_nums={} seastar::smp={}, num_shard_services={}", store_shard_nums, seastar::smp::count, num_shard_services);
+    return shard_stores.start(num_shard_services, path, store_shard_nums);
+  }).then([this] {
+    logger().debug("CyanStore started with {} shard stores", store_shard_nums);
+    return seastar::make_ready_future<unsigned int>(store_shard_nums);
+  });
+}
+
+seastar::future<> CyanStore::stop()
+{
+  logger().debug("stopping shard stores");
+  return shard_stores.stop();
+}
+
+CyanStore::mount_ertr::future<> CyanStore::mount()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return shard_stores.invoke_on_all([](auto &local_store) {
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->mount().handle_error(
+        crimson::ct_error::assert_all{
+          "Invalid error in CyanStore::mount"
+        });
+    });
+  });
+}
+
+seastar::future<> CyanStore::umount()
+{
+  ceph_assert(seastar::this_shard_id() == primary_core);
+  return shard_stores.invoke_on_all([](auto &local_store) {
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->umount();
+    });
+  });
+}
+
 seastar::future<store_statfs_t> CyanStore::stat() const
 {
   ceph_assert(seastar::this_shard_id() == primary_core);
   logger().debug("{}", __func__);
+
   return shard_stores.map_reduce0(
-    [](const CyanStore::Shard &local_store) {
-      return local_store.get_used_bytes();
+    [](const auto& local_store) {
+      return seastar::map_reduce(
+        local_store.mshard_stores.begin(),
+        local_store.mshard_stores.end(),
+        [](const auto& mshard_store) {
+          return seastar::make_ready_future<uint64_t>(
+            mshard_store->get_used_bytes()
+          );
+        },
+        uint64_t{0},
+        std::plus<uint64_t>()
+      );
     },
-    (uint64_t)0,
+    uint64_t{0},
     std::plus<uint64_t>()
   ).then([](uint64_t used_bytes) {
     store_statfs_t st;
-    st.total = crimson::common::local_conf().get_val<Option::size_t>("memstore_device_bytes");
+    st.total = crimson::common::local_conf()
+      .get_val<Option::size_t>("memstore_device_bytes");
     st.available = st.total - used_bytes;
     return seastar::make_ready_future<store_statfs_t>(std::move(st));
   });
@@ -109,15 +180,32 @@ CyanStore::mkfs_ertr::future<> CyanStore::mkfs(uuid_d new_osd_fsid)
   }).safe_then([this]{
     return write_meta("type", "memstore");
   }).safe_then([this] {
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.mkfs();
+    return shard_stores.invoke_on_all([](auto &local_store) {
+      return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+        return mshard_store->mkfs();
+      });
     });
   });
 }
 
+CyanStore::Shard::Shard(
+  std::string path,
+  unsigned int store_shard_nums,
+  unsigned int store_index)
+  : path(path),
+    store_index(store_index)
+{
+  ceph_assert(store_index < store_shard_nums);
+  if(seastar::this_shard_id() + seastar::smp::count * store_index >= store_shard_nums) {
+    store_active = false;
+  }
+}
+
 seastar::future<> CyanStore::Shard::mkfs()
 {
+  if(!store_active) {
+    return seastar::now();
+  }
   std::string fn =
     path + "/collections" + std::to_string(seastar::this_shard_id());
   ceph::bufferlist bl;
@@ -131,16 +219,29 @@ seastar::future<std::vector<coll_core_t>>
 CyanStore::list_collections()
 {
   ceph_assert(seastar::this_shard_id() == primary_core);
-  return seastar::do_with(std::vector<coll_core_t>{}, [this](auto &collections) {
-    return shard_stores.map([](auto &local_store) {
-      return local_store.list_collections();
-    }).then([&collections](std::vector<std::vector<coll_core_t>> results) {
-      for (auto& colls : results) {
-        collections.insert(collections.end(), colls.begin(), colls.end());
+  return shard_stores.map_reduce0(
+    [this](auto& local_store) {
+    // For each local store, collect all collections from its mshard_stores
+    return seastar::map_reduce(
+      local_store.mshard_stores.begin(),
+      local_store.mshard_stores.end(),
+      [](auto& mshard_store) {
+        return mshard_store->list_collections();
+      },
+      std::vector<coll_core_t>(),  // Initial empty vector
+      [](auto&& merged, auto&& result) {  // Reduction function
+        merged.insert(merged.end(), result.begin(), result.end());
+        return std::move(merged);
       }
-      return seastar::make_ready_future<std::vector<coll_core_t>>(
-        std::move(collections));
-    });
+    );
+    },
+    std::vector<coll_core_t>(),  // Initial empty vector for final reduction
+    [](auto&& total, auto&& shard_result) {  // Final reduction function
+      total.insert(total.end(), shard_result.begin(), shard_result.end());
+      return std::move(total);
+    }
+  ).then([](auto all_collections) {
+    return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(all_collections));
   });
 }
 
@@ -152,10 +253,13 @@ CyanStore::get_default_device_class()
 
 CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 {
+  if (!store_active) {
+    return mount_ertr::now();
+  }
   static const char read_file_errmsg[]{"read_file"};
   ceph::bufferlist bl;
   std::string fn =
-    path + "/collections" + std::to_string(seastar::this_shard_id());
+    path + "/collections" + std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index);
   std::string err;
   if (int r = bl.read_file(fn.c_str(), &err); r < 0) {
     return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
@@ -167,7 +271,7 @@ CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 
   for (auto& coll : collections) {
     std::string fn = fmt::format("{}/{}{}", path, coll,
-      std::to_string(seastar::this_shard_id()));
+      std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index));
     ceph::bufferlist cbl;
     if (int r = cbl.read_file(fn.c_str(), &err); r < 0) {
       return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
@@ -183,6 +287,9 @@ CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
 
 seastar::future<> CyanStore::Shard::umount()
 {
+  if (!store_active) {
+    return seastar::now();
+  }
   return seastar::do_with(std::set<coll_t>{}, [this](auto& collections) {
     return seastar::do_for_each(coll_map, [&collections, this](auto& coll) {
       auto& [col, ch] = coll;
@@ -191,13 +298,13 @@ seastar::future<> CyanStore::Shard::umount()
       ceph_assert(ch);
       ch->encode(bl);
       std::string fn = fmt::format("{}/{}{}", path, col,
-        std::to_string(seastar::this_shard_id()));
+        std::to_string(seastar::this_shard_id()+ seastar::smp::count * store_index));
       return crimson::write_file(std::move(bl), fn);
     }).then([&collections, this] {
       ceph::bufferlist bl;
       ceph::encode(collections, bl);
       std::string fn = fmt::format("{}/collections{}",
-        path, std::to_string(seastar::this_shard_id()));
+        path, std::to_string(seastar::this_shard_id()+ seastar::smp::count * store_index));
       return crimson::write_file(std::move(bl), fn);
     });
   });
@@ -211,6 +318,7 @@ CyanStore::Shard::list_objects(
   uint64_t limit,
   uint32_t op_flags) const
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {} {} {}",
                  __func__, c->get_cid(), start, end, limit);
@@ -234,6 +342,7 @@ CyanStore::Shard::list_objects(
 seastar::future<CollectionRef>
 CyanStore::Shard::create_new_collection(const coll_t& cid)
 {
+  assert(store_active);
   auto c = new Collection{cid};
   new_coll_map[cid] = c;
   return seastar::make_ready_future<CollectionRef>(c);
@@ -242,15 +351,19 @@ CyanStore::Shard::create_new_collection(const coll_t& cid)
 seastar::future<CollectionRef>
 CyanStore::Shard::open_collection(const coll_t& cid)
 {
+  assert(store_active);
   return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
 }
 
 seastar::future<std::vector<coll_core_t>>
 CyanStore::Shard::list_collections()
 {
+  if (!store_active) {
+    return seastar::make_ready_future<std::vector<coll_core_t>>();
+  }
   std::vector<coll_core_t> collections;
   for (auto& coll : coll_map) {
-    collections.push_back(std::make_pair(coll.first, seastar::this_shard_id()));
+    collections.push_back(std::make_pair(coll.first, std::make_pair(seastar::this_shard_id(), store_index)));
   }
   return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(collections));
 }
@@ -261,6 +374,7 @@ CyanStore::Shard::exists(
   const ghobject_t &oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   if (!c->exists) {
     return base_errorator::make_ready_future<bool>(false);
@@ -276,6 +390,7 @@ seastar::future<>
 CyanStore::Shard::set_collection_opts(CollectionRef ch,
                                       const pool_opts_t& opts)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {}", __func__, c->get_cid());
   c->pool_opts = opts;
@@ -290,6 +405,7 @@ CyanStore::Shard::read(
   size_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {} {}~{}",
                 __func__, c->get_cid(), oid, offset, len);
@@ -317,6 +433,7 @@ CyanStore::Shard::readv(
   interval_set<uint64_t>& m,
   uint32_t op_flags)
 {
+  assert(store_active);
   return seastar::do_with(ceph::bufferlist{},
     [this, ch, oid, &m, op_flags](auto& bl) {
     return crimson::do_for_each(m,
@@ -338,6 +455,7 @@ CyanStore::Shard::get_attr(
   std::string_view name,
   uint32_t op_flags) const
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}",
                 __func__, c->get_cid(), oid);
@@ -358,6 +476,7 @@ CyanStore::Shard::get_attrs(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}",
                 __func__, c->get_cid(), oid);
@@ -375,6 +494,7 @@ auto CyanStore::Shard::omap_get_values(
   uint32_t op_flags)
   -> read_errorator::future<omap_values_t>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}", __func__, c->get_cid(), oid);
   auto o = c->get_object(oid);
@@ -398,6 +518,7 @@ auto CyanStore::Shard::omap_iterate(
   uint32_t op_flags)
   -> CyanStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   logger().debug("{} {} {}", __func__, c->get_cid(), oid);
   auto o = c->get_object(oid);
@@ -423,6 +544,7 @@ auto CyanStore::Shard::omap_get_header(
   uint32_t op_flags)
   -> CyanStore::Shard::get_attr_errorator::future<ceph::bufferlist>
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   auto o = c->get_object(oid);
   if (!o) {
@@ -437,6 +559,7 @@ seastar::future<> CyanStore::Shard::do_transaction_no_callbacks(
   CollectionRef ch,
   ceph::os::Transaction&& t)
 {
+  assert(store_active);
   using ceph::os::Transaction;
   int r = 0;
   try {
@@ -991,6 +1114,7 @@ CyanStore::Shard::fiemap(
   uint64_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
 
   ObjectRef o = c->get_object(oid);
@@ -1007,6 +1131,7 @@ CyanStore::Shard::stat(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   auto c = static_cast<Collection*>(ch.get());
   auto o = c->get_object(oid);
   if (!o) {
index f42aa63a4fe5a4aafae76ec485774b7f2d5b6b92..fd68da3ff7428849978183d0c078b573a3f078a2 100644 (file)
@@ -12,6 +12,8 @@
 #include <optional>
 #include <seastar/core/future.hh>
 #include <seastar/core/future-util.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_ptr.hh>
 
 #include "osd/osd_types.h"
 #include "include/uuid.h"
@@ -29,8 +31,10 @@ class CyanStore final : public FuturizedStore {
 public:
   class Shard : public FuturizedStore::Shard {
   public:
-    Shard(std::string path)
-      :path(path){}
+    Shard(std::string path,
+      unsigned int store_shard_nums,
+      unsigned int store_index = 0);
+    ~Shard() = default;
 
     seastar::future<struct stat> stat(
       CollectionRef c,
@@ -128,7 +132,19 @@ public:
     using coll_core_t = FuturizedStore::coll_core_t;
     seastar::future<std::vector<coll_core_t>> list_collections();
 
-    uint64_t get_used_bytes() const { return used_bytes; }
+    uint64_t get_used_bytes() const {
+      if (!store_active) {
+        return 0;
+      }
+      return used_bytes;
+    }
+
+    unsigned int get_store_index() const {
+      return store_index;
+    }
+    bool get_status() const {
+      return store_active;
+    }
 
   private:
     int _remove(const coll_t& cid, const ghobject_t& oid);
@@ -175,38 +191,20 @@ public:
     const std::string path;
     std::unordered_map<coll_t, boost::intrusive_ptr<Collection>> coll_map;
     std::map<coll_t, boost::intrusive_ptr<Collection>> new_coll_map;
+    unsigned int store_index;
+    bool store_active = true;
   };
 
   CyanStore(const std::string& path);
   ~CyanStore() final;
 
-  seastar::future<> start() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.start(path);
-  }
+  seastar::future<unsigned int> start() final;
 
-  seastar::future<> stop() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.stop();
-  }
+  seastar::future<> stop() final;
 
-  mount_ertr::future<> mount() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.mount().handle_error(
-      crimson::stateful_ec::assert_failure(
-        fmt::format("error mounting cyanstore").c_str()));
-    });
-  }
+  mount_ertr::future<> mount() final;
 
-  seastar::future<> umount() final {
-    ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.invoke_on_all(
-      [](auto &local_store) {
-      return local_store.umount();
-    });
-  }
+  seastar::future<> umount() final;
 
   mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
 
@@ -219,8 +217,24 @@ public:
   seastar::future<> write_meta(const std::string& key,
                  const std::string& value) final;
 
-  FuturizedStore::Shard& get_sharded_store() final{
-    return shard_stores.local();
+  FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final {
+    assert(!shard_stores.local().mshard_stores.empty());
+    assert(store_index < shard_stores.local().mshard_stores.size());
+    assert(shard_stores.local().mshard_stores[store_index]->get_status() == true);
+    return make_local_shared_foreign(
+      seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(
+        shard_stores.local().mshard_stores[store_index])));
+  }
+  std::vector<FuturizedStore::StoreShardRef> get_sharded_stores() final{
+    std::vector<FuturizedStore::StoreShardRef> ret;
+    ret.reserve(shard_stores.local().mshard_stores.size());
+    for (auto& mshard_store : shard_stores.local().mshard_stores) {
+      if (mshard_store->get_status() == true) {
+        ret.emplace_back(make_local_shared_foreign(
+          seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(mshard_store))));
+      }
+    }
+    return ret;
   }
 
   seastar::future<std::tuple<int, std::string>>
@@ -230,8 +244,31 @@ public:
 
   seastar::future<std::string> get_default_device_class() final;
 
+  seastar::future<> get_shard_nums();
+
+
 private:
-  seastar::sharded<CyanStore::Shard> shard_stores;
+class MultiShardStores {
+  public:
+    std::vector<seastar::shared_ptr<CyanStore::Shard>> mshard_stores;
+
+  public:
+    MultiShardStores(size_t count,
+                     const std::string path,
+                     unsigned int store_shard_nums)
+    : mshard_stores() {
+      mshard_stores.reserve(count); // Reserve space for the shards
+      for (size_t store_index = 0; store_index < count; ++store_index) {
+        mshard_stores.emplace_back(seastar::make_shared<CyanStore::Shard>(
+          path, store_shard_nums, store_index));
+      }
+    }
+    ~MultiShardStores() {
+      mshard_stores.clear();
+    }
+  };
+  seastar::sharded<CyanStore::MultiShardStores> shard_stores;
+  unsigned int store_shard_nums = 0;
   const std::string path;
   uuid_d osd_fsid;
 };