crimson/os/seastore: create multiple device shards and store shards on each reactor
author     Chunmei Liu <chunmei.liu@ibm.com>
           Fri, 17 Oct 2025 23:15:40 +0000 (23:15 +0000)
committer  Chunmei Liu <chunmei.liu@ibm.com>
           Fri, 17 Oct 2025 23:15:40 +0000 (23:15 +0000)
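Each reactor now hosts one or more store shards, addressed by the pair
(reactor shard, store_index). A minimal sketch of the index arithmetic
used throughout this patch (the example values are hypothetical):

    // global index of a store shard, as computed in SeaStore::Shard and
    // BlockSegmentManager::shard_mount():
    //   global = seastar::this_shard_id() + seastar::smp::count * store_index
    //
    // shards constructed per reactor (ceiling division):
    //   num_shard_services =
    //     (store_shard_nums + seastar::smp::count - 1) / seastar::smp::count
    //
    // e.g. store_shard_nums = 4, smp::count = 3 => num_shard_services = 2;
    // global indices 0..5 are constructed, and those >= 4 are marked
    // store_active = false and skipped.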

Signed-off-by: Chunmei Liu <chunmei.liu@ibm.com>
src/crimson/os/futurized_store.h
src/crimson/os/seastore/device.h
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/seastore.h
src/crimson/os/seastore/segment_manager.h
src/crimson/os/seastore/segment_manager/block.cc
src/crimson/os/seastore/segment_manager/block.h

diff --git a/src/crimson/os/futurized_store.h b/src/crimson/os/futurized_store.h
index 1ef8db7422e02ae014589d6a59469caa12baad22..b2fa51bea064d8f98cd27babc757969dfcfb319c 100644
@@ -9,8 +9,11 @@
 #include <vector>
 
 #include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/sharded.hh>
 
 #include "os/Transaction.h"
+#include "crimson/common/local_shared_foreign_ptr.h"
 #include "crimson/common/smp_helpers.h"
 #include "crimson/common/smp_helpers.h"
 #include "crimson/osd/exceptions.h"
@@ -205,7 +208,7 @@ public:
   explicit FuturizedStore(const FuturizedStore& o) = delete;
   const FuturizedStore& operator=(const FuturizedStore& o) = delete;
 
-  virtual seastar::future<> start() = 0;
+  virtual seastar::future<unsigned int> start() = 0;
 
   virtual seastar::future<> stop() = 0;
 
@@ -227,13 +230,21 @@ public:
 
   virtual seastar::future<> write_meta(const std::string& key,
                                       const std::string& value) = 0;
+
+  using StoreShardLRef = seastar::shared_ptr<FuturizedStore::Shard>;
+  using StoreShardFRef = seastar::foreign_ptr<StoreShardLRef>;
+  using StoreShardRef = ::crimson::local_shared_foreign_ptr<StoreShardLRef>;
+  using StoreShardFFRef = seastar::foreign_ptr<StoreShardRef>;
+  using StoreShardXcoreRef = ::crimson::local_shared_foreign_ptr<StoreShardRef>;
+
   // called on the shard and get this FuturizedStore::shard;
-  virtual Shard& get_sharded_store() = 0;
+  virtual StoreShardRef get_sharded_store(unsigned int store_index = 0) = 0;
+  virtual std::vector<StoreShardRef> get_sharded_stores() = 0;
 
   virtual seastar::future<std::tuple<int, std::string>> read_meta(
     const std::string& key) = 0;
 
-  using coll_core_t = std::pair<coll_t, core_id_t>;
+  using coll_core_t = std::pair<coll_t, std::pair<core_id_t, unsigned int>>;
   virtual seastar::future<std::vector<coll_core_t>> list_collections() = 0;
 
   virtual seastar::future<std::string> get_default_device_class() = 0;
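For reference, a hedged sketch of how a caller might drive the widened
interface above. This helper is illustrative and not part of the patch;
only start(), get_sharded_stores(), list_collections() and coll_core_t
come from the diff, and it assumes execution on the primary core:

    #include "crimson/os/futurized_store.h"

    // Illustrative only: assumes `store` is a constructed FuturizedStore.
    seastar::future<> demo(crimson::os::FuturizedStore& store) {
      // start() now reports the total number of store shards.
      return store.start().then([&store](unsigned int store_shard_nums) {
        // One local reference per store shard hosted on this reactor.
        auto local_shards = store.get_sharded_stores();
        (void)local_shards; (void)store_shard_nums;
        // coll_core_t is now (coll_t, (reactor core, store index)).
        return store.list_collections().then([](auto colls) {
          for (const auto& [cid, loc] : colls) {
            // route work for cid to (reactor core, store index) here ...
            (void)cid; (void)loc;
          }
        });
      });
    }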
diff --git a/src/crimson/os/seastore/device.h b/src/crimson/os/seastore/device.h
index 5ef1b64217110d4a7aed15aced56a674df2cbe8c..19c63404bbd04b1f5a99a8276e95d40abd9ef031 100644
@@ -90,15 +90,16 @@ class Device {
 public:
   virtual ~Device() {}
 
-  virtual seastar::future<> start() {
+  virtual seastar::future<> start(unsigned int shard_nums) {
     return seastar::now();
   }
 
   virtual seastar::future<> stop() {
     return seastar::now();
   }
+
   // called on the shard to get this shard device;
-  virtual Device& get_sharded_device() {
+  virtual Device& get_sharded_device(unsigned int store_index = 0) {
     return *this;
   }
 
@@ -166,6 +167,9 @@ public:
       return read_ertr::make_ready_future<bufferptr>(std::move(*ptrref));
     });
   }
+  virtual read_ertr::future<unsigned int> get_shard_nums() {
+    return read_ertr::make_ready_future<unsigned int>(seastar::smp::count);
+  }
 };
 
 using check_create_device_ertr = Device::access_ertr;
diff --git a/src/crimson/os/seastore/seastore.cc b/src/crimson/os/seastore/seastore.cc
index 89b0843438d9e8f7be5a236ca520bd10ba5de4fe..b8329b077c7642e44c97f4971f5389640551eafe 100644
@@ -127,16 +127,23 @@ using crimson::common::get_conf;
 SeaStore::Shard::Shard(
   std::string root,
   Device* dev,
-  bool is_test)
+  bool is_test,
+  unsigned int store_shard_nums,
+  unsigned int store_index)
   :root(root),
    max_object_size(
      get_conf<uint64_t>("seastore_default_max_object_size")),
    is_test(is_test),
    throttler(
-      get_conf<uint64_t>("seastore_max_concurrent_transactions"))
+      get_conf<uint64_t>("seastore_max_concurrent_transactions")),
+   store_index(store_index)
 {
-  device = &(dev->get_sharded_device());
-  register_metrics();
+  if (seastar::this_shard_id() + seastar::smp::count * store_index >= store_shard_nums) {
+    store_active = false;
+  }
+  device = &(dev->get_sharded_device(store_index));
+
+  register_metrics(store_index);
 }
 
 SeaStore::SeaStore(
@@ -145,12 +152,16 @@ SeaStore::SeaStore(
   : root(root),
     mdstore(std::move(mdstore))
 {
+  store_shard_nums = seastar::smp::count;
 }
 
 SeaStore::~SeaStore() = default;
 
-void SeaStore::Shard::register_metrics()
+void SeaStore::Shard::register_metrics(unsigned int store_index)
 {
+  if (!store_active) {
+    return;
+  }
   namespace sm = seastar::metrics;
   using op_type_t = crimson::os::seastore::op_type_t;
   std::pair<op_type_t, sm::label_instance> labels_by_op_type[] = {
@@ -174,7 +185,7 @@ void SeaStore::Shard::register_metrics()
             return get_latency(op_type);
           },
           sm::description(desc),
-          {label}
+          {label, sm::label_instance("shard_store_index", std::to_string(store_index))}
         ),
       }
     );
@@ -188,21 +199,60 @@ void SeaStore::Shard::register_metrics()
        [this] {
          return throttler.get_current();
        },
-       sm::description("transactions that are running inside seastore")
+  sm::description("transactions that are running inside seastore"),
+  {sm::label_instance("shard_store_index", std::to_string(store_index))}
       ),
       sm::make_gauge(
        "pending_transactions",
        [this] {
          return throttler.get_pending();
        },
-       sm::description("transactions waiting to get "
-                       "through seastore's throttler")
+  sm::description("transactions waiting to get "
+                       "through seastore's throttler"),
+  {sm::label_instance("shard_store_index", std::to_string(store_index))}
       )
     }
   );
 }
 
-seastar::future<> SeaStore::start()
+seastar::future<> SeaStore::get_shard_nums()
+{
+  LOG_PREFIX(SeaStore::get_shard_nums);
+  auto tuple = co_await read_meta("mkfs_done");
+  auto [done, value] = tuple;
+  if (done == -1) {
+    INFO("seastore not mkfs yet");
+    store_shard_nums = seastar::smp::count;
+    co_return;
+  } else {
+    INFO("seastore mkfs done");
+    auto shard_nums = co_await device->get_shard_nums(
+      ).handle_error(
+        crimson::ct_error::assert_all{
+          "Invalid error in device->get_shard_nums"
+      });
+    INFO("seastore shard nums {}", shard_nums);
+    store_shard_nums = shard_nums;
+    co_return;
+  }
+}
+
+seastar::future<> SeaStore::shard_stores_start(bool is_test)
+{
+  LOG_PREFIX(SeaStore::shard_stores_start);
+  auto num_shard_services = (store_shard_nums + seastar::smp::count - 1) / seastar::smp::count;
+  INFO("store_shard_nums={} seastar::smp={}, num_shard_services={}", store_shard_nums, seastar::smp::count, num_shard_services);
+  return shard_stores.start(num_shard_services, root, device.get(), is_test, store_shard_nums);
+}
+
+seastar::future<> SeaStore::shard_stores_stop()
+{
+  LOG_PREFIX(SeaStore::shard_stores_stop);
+  INFO("stopping shard stores");
+  return shard_stores.stop();
+}
+
+seastar::future<unsigned int> SeaStore::start()
 {
   LOG_PREFIX(SeaStore::start);
   INFO("...");
@@ -222,10 +272,12 @@ seastar::future<> SeaStore::start()
   ceph_assert(root != "");
   DeviceRef device_obj = co_await Device::make_device(root, d_type);
   device = std::move(device_obj);
-  co_await device->start();
+  co_await get_shard_nums();
+  co_await device->start(store_shard_nums);
   ceph_assert(device);
-  co_await shard_stores.start(root, device.get(), is_test);
+  co_await shard_stores_start(is_test);
   INFO("done");
+  co_return store_shard_nums;
 }
 
 seastar::future<> SeaStore::test_start(DeviceRef device_obj)
@@ -236,7 +288,7 @@ seastar::future<> SeaStore::test_start(DeviceRef device_obj)
   ceph_assert(device_obj);
   ceph_assert(root == "");
   device = std::move(device_obj);
-  co_await shard_stores.start_single(root, device.get(), true);
+  co_await shard_stores.start_single(1, root, device.get(), true, seastar::smp::count);
   INFO("done");
 }
 
@@ -253,7 +305,7 @@ seastar::future<> SeaStore::stop()
   if (device) {
     co_await device->stop();
   }
-  co_await shard_stores.stop();
+  co_await shard_stores_stop();
   INFO("done");
 }
 
@@ -263,7 +315,9 @@ SeaStore::mount_ertr::future<> SeaStore::test_mount()
   INFO("...");
 
   ceph_assert(seastar::this_shard_id() == primary_core);
-  co_await shard_stores.local().mount_managers();
+  co_await seastar::do_for_each(shard_stores.local().mshard_stores, [this](auto& mshard_store) {
+    return mshard_store->mount_managers();
+  });
   INFO("done");
 }
 
@@ -274,30 +328,35 @@ Device::access_ertr::future<> SeaStore::_mount()
 
   ceph_assert(seastar::this_shard_id() == primary_core);
   co_await device->mount();
-  ceph_assert(device->get_sharded_device().get_block_size() >= laddr_t::UNIT_SIZE);
+  ceph_assert(device->get_sharded_device(0).get_block_size() >= laddr_t::UNIT_SIZE);
 
-  auto &sec_devices = device->get_sharded_device().get_secondary_devices();
+  auto &sec_devices = device->get_sharded_device(0).get_secondary_devices();
   for (auto& device_entry : sec_devices) {
     device_id_t id = device_entry.first;
     [[maybe_unused]] magic_t magic = device_entry.second.magic;
     device_type_t dtype = device_entry.second.dtype;
     std::string path = fmt::format("{}/block.{}.{}", root, dtype, std::to_string(id));
     DeviceRef sec_dev = co_await Device::make_device(path, dtype);
-    co_await sec_dev->start();
+    co_await sec_dev->start(store_shard_nums);
     co_await sec_dev->mount();
-    ceph_assert(sec_dev->get_sharded_device().get_block_size() >= laddr_t::UNIT_SIZE);
-    assert(sec_dev->get_sharded_device().get_magic() == magic);
+    ceph_assert(sec_dev->get_sharded_device(0).get_block_size() >= laddr_t::UNIT_SIZE);
+    assert(sec_dev->get_sharded_device(0).get_magic() == magic);
     secondaries.emplace_back(std::move(sec_dev));
     co_await set_secondaries();
   }
   co_await shard_stores.invoke_on_all([](auto &local_store) {
-    return local_store.mount_managers();
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->mount_managers();
+    });
   });
   INFO("done");
 }
 
 seastar::future<> SeaStore::Shard::mount_managers()
 {
+  if (!store_active) {
+    return seastar::now();
+  }
   LOG_PREFIX(SeaStore::mount_managers);
   INFO("start");
   init_managers();
@@ -315,15 +374,21 @@ seastar::future<> SeaStore::umount()
 
   ceph_assert(seastar::this_shard_id() == primary_core);
   co_await shard_stores.invoke_on_all([](auto &local_store) {
-    return local_store.umount().handle_error(
-      crimson::ct_error::assert_all{"Invalid error in SeaStoreS::umount"}
-    );
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->umount().handle_error(
+        crimson::ct_error::assert_all{
+          "Invalid error in shard_store->umount"
+      });
+    });
   });
   INFO("done");
 }
 
 base_ertr::future<> SeaStore::Shard::umount()
 {
+  if (!store_active) {
+    co_return;
+  }
   if (transaction_manager) {
     co_await transaction_manager->close();
   }
@@ -357,6 +422,9 @@ SeaStore::Shard::mkfs_managers()
 {
   LOG_PREFIX(SeaStoreS::mkfs_managers);
   INFO("...");
+  if (!store_active) {
+    co_return;
+  }
   init_managers();
   co_await transaction_manager->mkfs();
   init_managers();
@@ -386,8 +454,12 @@ seastar::future<> SeaStore::set_secondaries()
 {
   auto sec_dev_ite = secondaries.rbegin();
   Device* sec_dev = sec_dev_ite->get();
+
   return shard_stores.invoke_on_all([sec_dev](auto &local_store) {
-    local_store.set_secondaries(sec_dev->get_sharded_device());
+    return seastar::do_for_each(local_store.mshard_stores, [sec_dev](auto& mshard_store) {
+      unsigned int index = mshard_store->get_store_index();
+      mshard_store->set_secondaries(sec_dev->get_sharded_device(index));
+    });
   });
 }
 
@@ -403,7 +475,7 @@ SeaStore::mkfs_ertr::future<> SeaStore::test_mkfs(uuid_d new_osd_fsid)
     ERROR("failed");
     co_return;
   }
-  co_await shard_stores.local().mkfs_managers().handle_error(
+  co_await shard_stores.local().mshard_stores[0]->mkfs_managers().handle_error(
     crimson::ct_error::assert_all{"Invalid error in SeaStore::mkfs"});
   co_await prepare_meta(new_osd_fsid);
   INFO("done");
@@ -462,7 +534,7 @@ Device::access_ertr::future<> SeaStore::_mkfs(uuid_d new_osd_fsid)
         DeviceRef sec_dev = co_await Device::make_device(path, dtype);
         auto p_sec_dev = sec_dev.get();
         secondaries.emplace_back(std::move(sec_dev));
-        co_await p_sec_dev->start();
+        co_await p_sec_dev->start(store_shard_nums);
         magic_t magic = (magic_t)std::rand();
         sds.emplace((device_id_t)id, device_spec_t{magic, dtype, (device_id_t)id});
         co_await p_sec_dev->mkfs(
@@ -491,8 +563,10 @@ Device::access_ertr::future<> SeaStore::_mkfs(uuid_d new_osd_fsid)
   co_await device->mount();
   DEBUG("mkfs managers");
   co_await shard_stores.invoke_on_all([] (auto &local_store) {
-      return local_store.mkfs_managers().handle_error(
-      crimson::ct_error::assert_all{"Invalid error in SeaStoreS::mkfs_managers"});
+    return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+      return mshard_store->mkfs_managers().handle_error(
+        crimson::ct_error::assert_all{"Invalid error in SeaStoreS::mkfs_managers"});
+    });
   });
   co_await prepare_meta(new_osd_fsid);
   co_await umount();
@@ -507,25 +581,42 @@ SeaStore::list_collections()
   DEBUG("...");
 
   ceph_assert(seastar::this_shard_id() == primary_core);
-  return shard_stores.map([](auto &local_store) {
-    return local_store.list_collections();
-  }).then([FNAME](std::vector<std::vector<coll_core_t>> results) {
-    std::vector<coll_core_t> collections;
-    for (auto& colls : results) {
-      collections.insert(collections.end(), colls.begin(), colls.end());
+  return shard_stores.map_reduce0(
+    [this](auto& local_store) {
+      // For each local store, collect all collections from its mshard_stores
+      return seastar::map_reduce(
+        local_store.mshard_stores.begin(),
+        local_store.mshard_stores.end(),
+        [](auto& mshard_store) {
+          return mshard_store->list_collections();
+        },
+        std::vector<coll_core_t>(),
+        [](auto&& merged, auto&& result) {
+          merged.insert(merged.end(), result.begin(), result.end());
+          return std::move(merged);
+        }
+      );
+    },
+    std::vector<coll_core_t>(),
+    [](auto&& total, auto&& shard_result) {
+      total.insert(total.end(), shard_result.begin(), shard_result.end());
+      return std::move(total);
     }
-    DEBUG("got {} collections", collections.size());
-    return seastar::make_ready_future<std::vector<coll_core_t>>(
-      std::move(collections));
+  ).then([FNAME](auto all_collections) {
+    DEBUG("got {} collections", all_collections.size());
+    return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(all_collections));
   });
 }
 
-store_statfs_t SeaStore::Shard::stat() const
+seastar::future<store_statfs_t> SeaStore::Shard::stat() const
 {
+  if (!store_active) {
+    return seastar::make_ready_future<store_statfs_t>(store_statfs_t());
+  }
   LOG_PREFIX(SeaStoreS::stat);
   auto ss = transaction_manager->store_stat();
   DEBUG("stat={}", ss);
-  return ss;
+  return seastar::make_ready_future<store_statfs_t>(ss);
 }
 
 seastar::future<store_statfs_t> SeaStore::stat() const
@@ -535,17 +626,26 @@ seastar::future<store_statfs_t> SeaStore::stat() const
 
   ceph_assert(seastar::this_shard_id() == primary_core);
   return shard_stores.map_reduce0(
-    [](const SeaStore::Shard &local_store) {
-      return local_store.stat();
+    [](auto& local_store) {
+      return seastar::map_reduce(
+        local_store.mshard_stores.begin(),
+        local_store.mshard_stores.end(),
+        [](auto& mshard_store) { return mshard_store->stat(); },
+        store_statfs_t(),
+        [](auto&& ss, auto&& ret) {
+          ss.add(ret);
+          return std::move(ss);
+        }
+      );
     },
     store_statfs_t(),
-    [](auto &&ss, auto &&ret) {
-      ss.add(ret);
-      return std::move(ss);
+    [](auto&& total_stats, auto&& shard_stats) {
+      total_stats.add(shard_stats);
+      return std::move(total_stats);
     }
-  ).then([FNAME](store_statfs_t ss) {
-    DEBUG("done, stat={}", ss);
-    return seastar::make_ready_future<store_statfs_t>(std::move(ss));
+  ).then([FNAME](auto final_stats) {
+    DEBUG("done, stat={}", final_stats);
+    return seastar::make_ready_future<store_statfs_t>(std::move(final_stats));
   });
 }
 
@@ -568,23 +668,25 @@ seastar::future<> SeaStore::report_stats()
   DEBUG("...");
 
   ceph_assert(seastar::this_shard_id() == primary_core);
-  shard_device_stats.resize(seastar::smp::count);
-  shard_io_stats.resize(seastar::smp::count);
-  shard_cache_stats.resize(seastar::smp::count);
-  return shard_stores.invoke_on_all([this](const Shard &local_store) {
-    bool report_detail = false;
-    double seconds = 0;
-    if (seastar::this_shard_id() == 0) {
-      // avoid too verbose logs, only report detail in a particular shard
-      report_detail = true;
-      seconds = local_store.reset_report_interval();
-    }
-    shard_device_stats[seastar::this_shard_id()] =
-      local_store.get_device_stats(report_detail, seconds);
-    shard_io_stats[seastar::this_shard_id()] =
-      local_store.get_io_stats(report_detail, seconds);
-    shard_cache_stats[seastar::this_shard_id()] =
-      local_store.get_cache_stats(report_detail, seconds);
+  shard_device_stats.resize(store_shard_nums);
+  shard_io_stats.resize(store_shard_nums);
+  shard_cache_stats.resize(store_shard_nums);
+  return shard_stores.invoke_on_all([this](auto& local_store) {
+    return seastar::do_for_each(local_store.mshard_stores, [this](auto& mshard_store) {
+      bool report_detail = false;
+      double seconds = 0;
+      if (seastar::this_shard_id() == 0 && mshard_store->get_store_index() == 0) {
+        // avoid too verbose logs, only report detail in a particular shard
+        report_detail = true;
+        seconds = mshard_store->reset_report_interval();
+      }
+      shard_device_stats[seastar::this_shard_id() + seastar::smp::count * mshard_store->get_store_index()] =
+        mshard_store->get_device_stats(report_detail, seconds);
+      shard_io_stats[seastar::this_shard_id() + seastar::smp::count * mshard_store->get_store_index()] =
+        mshard_store->get_io_stats(report_detail, seconds);
+      shard_cache_stats[seastar::this_shard_id() + seastar::smp::count * mshard_store->get_store_index()] =
+        mshard_store->get_cache_stats(report_detail, seconds);
+    });
   }).then([this, FNAME] {
     auto now = seastar::lowres_clock::now();
     if (last_tp == seastar::lowres_clock::time_point::min()) {
@@ -718,22 +820,19 @@ seastar::future<> SeaStore::report_stats()
 TransactionManager::read_extent_iertr::future<std::optional<unsigned>>
 SeaStore::Shard::get_coll_bits(CollectionRef ch, Transaction &t) const
 {
-  return transaction_manager->read_collection_root(t)
-    .si_then([this, ch, &t](auto coll_root) {
-      return collection_manager->list(coll_root, t);
-    }).si_then([ch](auto colls) {
-      auto it = std::find_if(colls.begin(), colls.end(),
-        [ch](const std::pair<coll_t, coll_info_t>& element) {
-          return element.first == ch->get_cid();
-      });
-      if (it != colls.end()) {
-        return TransactionManager::read_extent_iertr::make_ready_future<
-          std::optional<unsigned>>(it->second.split_bits);
-      } else {
-        return TransactionManager::read_extent_iertr::make_ready_future<
-         std::optional<unsigned>>(std::nullopt);
-      }
-    });
+  auto coll_root = co_await transaction_manager->read_collection_root(t);
+  auto colls = co_await collection_manager->list(coll_root, t);
+
+  auto it = std::find_if(colls.begin(), colls.end(),
+    [ch](const std::pair<coll_t, coll_info_t>& element) {
+      return element.first == ch->get_cid();
+  });
+
+  if (it != colls.end()) {
+    co_return it->second.split_bits;
+  } else {
+    co_return std::nullopt;
+  }
 }
 
 col_obj_ranges_t
@@ -815,6 +914,7 @@ SeaStore::Shard::list_objects(CollectionRef ch,
                              uint64_t limit,
                              uint32_t op_flags) const
 {
+  assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -919,6 +1019,7 @@ SeaStore::Shard::list_objects(CollectionRef ch,
 seastar::future<CollectionRef>
 SeaStore::Shard::create_new_collection(const coll_t& cid)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::create_new_collection);
   DEBUG("cid={}", cid);
   return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
@@ -927,13 +1028,14 @@ SeaStore::Shard::create_new_collection(const coll_t& cid)
 seastar::future<CollectionRef>
 SeaStore::Shard::open_collection(const coll_t& cid)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::open_collection);
   DEBUG("cid={} ...", cid);
   return list_collections(
   ).then([cid, this, FNAME] (auto colls_cores) {
     if (auto found = std::find(colls_cores.begin(),
                                colls_cores.end(),
-                               std::make_pair(cid, seastar::this_shard_id()));
+                               std::make_pair(cid, std::make_pair(seastar::this_shard_id(), store_index)));
       found != colls_cores.end()) {
       DEBUG("cid={} exists", cid);
       return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
@@ -948,6 +1050,7 @@ seastar::future<>
 SeaStore::Shard::set_collection_opts(CollectionRef c,
                                         const pool_opts_t& opts)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::set_collection_opts);
   DEBUG("cid={}, opts={} not implemented", c->get_cid(), opts);
   //TODO
@@ -957,6 +1060,9 @@ SeaStore::Shard::set_collection_opts(CollectionRef c,
 seastar::future<std::vector<coll_core_t>>
 SeaStore::Shard::list_collections()
 {
+  if (!store_active) {
+    return seastar::make_ready_future<std::vector<coll_core_t>>();
+  }
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -977,12 +1083,12 @@ SeaStore::Shard::list_collections()
           return transaction_manager->read_collection_root(t
           ).si_then([this, &t](auto coll_root) {
             return collection_manager->list(coll_root, t);
-          }).si_then([&ret](auto colls) {
+          }).si_then([this, &ret](auto colls) {
             ret.resize(colls.size());
             std::transform(
               colls.begin(), colls.end(), ret.begin(),
-              [](auto p) {
-              return std::make_pair(p.first, seastar::this_shard_id());
+              [this](auto p) {
+              return std::make_pair(p.first, std::make_pair(seastar::this_shard_id(), store_index));
             });
           });
         });
@@ -1008,6 +1114,7 @@ SeaStore::Shard::_read(
   std::size_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::_read);
   size_t size = onode.get_layout().size;
   if (offset >= size) {
@@ -1044,6 +1151,7 @@ SeaStore::Shard::read(
   size_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -1068,6 +1176,7 @@ SeaStore::Shard::exists(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::exists);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
@@ -1101,6 +1210,7 @@ SeaStore::Shard::readv(
   interval_set<uint64_t>& m,
   uint32_t op_flags)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::readv);
   DEBUG("cid={} oid={} op_flags=0x{:x} {} intervals",
         ch->get_cid(), _oid, op_flags, m.num_intervals());
@@ -1131,6 +1241,7 @@ SeaStore::Shard::_get_attr(
   Onode& onode,
   std::string_view name) const
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::_get_attr);
   auto& layout = onode.get_layout();
   if (name == OI_ATTR && layout.oi_size) {
@@ -1155,6 +1266,7 @@ SeaStore::Shard::get_attr(
   std::string_view name,
   uint32_t op_flags) const
 {
+  assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -1182,6 +1294,7 @@ SeaStore::Shard::_get_attrs(
   Transaction& t,
   Onode& onode)
 {
+  assert(store_active);
   auto& layout = onode.get_layout();
   return omaptree_get_values(
     t, get_omap_root(omap_type_t::XATTR, onode), std::nullopt
@@ -1210,6 +1323,7 @@ SeaStore::Shard::get_attrs(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -1254,6 +1368,9 @@ seastar::future<struct stat> SeaStore::Shard::stat(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  if (!store_active) {
+    return seastar::make_ready_future<struct stat>();
+  }
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -1282,11 +1399,13 @@ SeaStore::Shard::omap_get_header(
   const ghobject_t& oid,
   uint32_t op_flags)
 {
+  assert(store_active);
   return get_attr(ch, oid, OMAP_HEADER_XATTR_KEY, op_flags);
 }
 
 omap_root_t SeaStore::Shard::select_log_omap_root(Onode& onode) const
 {
+  assert(store_active);
   auto log_root = get_omap_root(omap_type_t::LOG, onode);
   if (log_root.is_null()) {
     return get_omap_root(omap_type_t::OMAP, onode);
@@ -1303,6 +1422,7 @@ SeaStore::Shard::omap_get_values(
   const omap_keys_t &keys,
   uint32_t op_flags)
 {
+  assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -1332,6 +1452,7 @@ SeaStore::Shard::omap_iterate(
   omap_iterate_cb_t callback,
   uint32_t op_flags)
 {
+  assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
   return seastar::do_with(
@@ -1401,6 +1522,7 @@ SeaStore::Shard::fiemap(
   uint64_t len,
   uint32_t op_flags)
 {
+  assert(store_active);
   ++(shard_stats.read_num);
   ++(shard_stats.pending_read_num);
 
@@ -1436,6 +1558,7 @@ seastar::future<> SeaStore::Shard::do_transaction_no_callbacks(
   CollectionRef _ch,
   ceph::os::Transaction&& _t)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks);
   ++(shard_stats.io_num);
   ++(shard_stats.pending_io_num);
@@ -1523,6 +1646,7 @@ seastar::future<> SeaStore::Shard::do_transaction_no_callbacks(
 
 seastar::future<> SeaStore::Shard::flush(CollectionRef ch)
 {
+  assert(store_active);
   ++(shard_stats.flush_num);
   ++(shard_stats.pending_flush_num);
 
@@ -2375,7 +2499,7 @@ seastar::future<> SeaStore::write_meta(
   ceph_assert(seastar::this_shard_id() == primary_core);
   return seastar::do_with(key, value,
     [this, FNAME](auto& key, auto& value) {
-    return shard_stores.local().write_meta(key, value
+    return shard_stores.local().mshard_stores[0]->write_meta(key, value
     ).then([this, &key, &value] {
       return mdstore->write_meta(key, value);
     }).safe_then([FNAME, &key, &value] {
@@ -2390,6 +2514,7 @@ seastar::future<> SeaStore::Shard::write_meta(
   const std::string& key,
   const std::string& value)
 {
+  assert(store_active);
   ++(shard_stats.io_num);
   ++(shard_stats.pending_io_num);
   // For TM::submit_transaction()
@@ -2460,6 +2585,7 @@ uuid_d SeaStore::Shard::get_fsid() const
 
 void SeaStore::Shard::init_managers()
 {
+  assert(store_active);
   LOG_PREFIX(SeaStore::init_managers);
   DEBUG("start");
   transaction_manager.reset();
@@ -2468,7 +2594,7 @@ void SeaStore::Shard::init_managers()
   shard_stats = {};
 
   transaction_manager = make_transaction_manager(
-      device, secondaries, shard_stats, is_test);
+      device, secondaries, shard_stats, store_index, is_test);
   collection_manager = std::make_unique<collection_manager::FlatCollectionManager>(
       *transaction_manager);
   onode_manager = std::make_unique<crimson::os::seastore::onode::FLTreeOnodeManager>(
@@ -2477,6 +2603,9 @@ void SeaStore::Shard::init_managers()
 
 double SeaStore::Shard::reset_report_interval() const
 {
+  if (!store_active) {
+    return 0;
+  }
   double seconds;
   auto now = seastar::lowres_clock::now();
   if (last_tp == seastar::lowres_clock::time_point::min()) {
@@ -2492,12 +2621,18 @@ double SeaStore::Shard::reset_report_interval() const
 device_stats_t SeaStore::Shard::get_device_stats(
     bool report_detail, double seconds) const
 {
+  if (!store_active) {
+    return device_stats_t();
+  }
   return transaction_manager->get_device_stats(report_detail, seconds);
 }
 
 shard_stats_t SeaStore::Shard::get_io_stats(
     bool report_detail, double seconds) const
 {
+  if (!store_active) {
+    return shard_stats_t();
+  }
   shard_stats_t ret = shard_stats;
   ret.minus(last_shard_stats);
 
@@ -2545,6 +2680,9 @@ shard_stats_t SeaStore::Shard::get_io_stats(
 cache_stats_t SeaStore::Shard::get_cache_stats(
     bool report_detail, double seconds) const
 {
+  if (!store_active) {
+    return cache_stats_t();
+  }
   return transaction_manager->get_cache_stats(
       report_detail, seconds);
 }
@@ -2576,6 +2714,7 @@ SeaStore::Shard::omaptree_get_value(
   omap_root_t&& root,
   std::string_view key) const
 {
+  assert(store_active);
   return seastar::do_with(
     BtreeOMapManager(*transaction_manager),
     std::move(root),
@@ -2607,6 +2746,7 @@ SeaStore::Shard::omaptree_get_values(
   omap_root_t&& root,
   const omap_keys_t& keys) const
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_get_values);
   auto type = root.get_type();
   if (root.is_null()) {
@@ -2676,6 +2816,7 @@ SeaStore::Shard::omaptree_list(
   const std::optional<std::string>& start,
   OMapManager::omap_list_config_t config) const
 {
+  assert(store_active);
   if (root.is_null()) {
     return seastar::make_ready_future<omaptree_list_bare_ret>(
       true, omap_values_t{}
@@ -2698,6 +2839,7 @@ SeaStore::Shard::omaptree_get_values(
   omap_root_t&& root,
   const std::optional<std::string>& start) const
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_get_values);
   auto type = root.get_type();
   DEBUGT("{} start={} ...", t, type, start.has_value() ? *start : "");
@@ -2717,6 +2859,7 @@ SeaStore::Shard::omaptree_do_clear(
   Transaction& t,
   omap_root_t&& root)
 {
+  assert(store_active);
   assert(!root.is_null());
   return seastar::do_with(
     BtreeOMapManager(*transaction_manager),
@@ -2736,6 +2879,7 @@ SeaStore::Shard::omaptree_clear_no_onode(
   Transaction& t,
   omap_root_t&& root)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_clear_no_onode);
   if (root.is_null()) {
     DEBUGT("{}, null root", t, root.get_type());
@@ -2771,6 +2915,7 @@ SeaStore::Shard::omaptree_clear(
   omap_root_t&& root,
   Onode& onode)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_clear);
   if (root.is_null()) {
     DEBUGT("{}, null root", t, root.get_type());
@@ -2793,6 +2938,7 @@ SeaStore::Shard::omaptree_clone(
   Onode& onode,
   Onode& d_onode)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_clone);
   DEBUGT("{} start, list ...", t, type);
   return trans_intr::repeat([&t, &onode, &d_onode, this, type, FNAME] {
@@ -2849,6 +2995,7 @@ SeaStore::Shard::omaptree_set_keys(
   Onode& onode,
   std::map<std::string, ceph::bufferlist>&& kvs)
 {
+  assert(store_active);
   return seastar::do_with(
     BtreeOMapManager(*transaction_manager),
     std::move(root),
@@ -2884,6 +3031,7 @@ SeaStore::Shard::omaptree_rm_keys(
   Onode& onode,
   omap_keys_t&& keys)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_rm_keys);
   auto type = root.get_type();
   if (root.is_null()) {
@@ -2921,6 +3069,7 @@ SeaStore::Shard::omaptree_rm_keyrange(
   std::string first,
   std::string last)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_rm_keyrange);
   auto type = root.get_type();
   if (first > last) {
@@ -2961,6 +3110,7 @@ SeaStore::Shard::omaptree_rm_key(
   Onode& onode,
   std::string&& name)
 {
+  assert(store_active);
   LOG_PREFIX(SeaStoreS::omaptree_rm_key);
   if (root.is_null()) {
     DEBUGT("{} key={}, null root", t, root.get_type(), name);
diff --git a/src/crimson/os/seastore/seastore.h b/src/crimson/os/seastore/seastore.h
index 1cd88cc95be133d3619511ac01aa3e62b8777d74..16a271049e0c9609a141d55715f54d93fa9f8340 100644
@@ -18,6 +18,7 @@
 
 #include "os/Transaction.h"
 #include "crimson/common/throttle.h"
+#include "crimson/common/smp_helpers.h"
 #include "crimson/os/futurized_collection.h"
 #include "crimson/os/futurized_store.h"
 
@@ -93,7 +94,9 @@ public:
     Shard(
       std::string root,
       Device* device,
-      bool is_test);
+      bool is_test,
+      unsigned int store_shard_nums,
+      unsigned int store_index = 0);
     ~Shard() = default;
 
     seastar::future<struct stat> stat(
@@ -200,7 +203,7 @@ public:
 
     seastar::future<std::string> get_default_device_class();
 
-    store_statfs_t stat() const;
+    seastar::future<store_statfs_t> stat() const;
 
     uuid_d get_fsid() const;
 
@@ -216,6 +219,13 @@ public:
 
     cache_stats_t get_cache_stats(bool report_detail, double seconds) const;
 
+    unsigned int get_store_index() const {
+      return store_index;
+    }
+
+    bool get_status() const {
+      return store_active;
+    }
   private:
     struct internal_context_t {
       CollectionRef ch;
@@ -522,9 +532,11 @@ public:
     OnodeManagerRef onode_manager;
 
     common::Throttle throttler;
+    unsigned int store_index;
+    bool store_active = true;
 
     seastar::metrics::metric_group metrics;
-    void register_metrics();
+    void register_metrics(unsigned int store_index);
 
     mutable shard_stats_t shard_stats;
     mutable seastar::lowres_clock::time_point last_tp =
@@ -538,7 +550,7 @@ public:
     MDStoreRef mdstore);
   ~SeaStore();
 
-  seastar::future<> start() final;
+  seastar::future<unsigned int> start() final;
   seastar::future<> stop() final;
 
   Device::access_ertr::future<> _mount();
@@ -569,7 +581,7 @@ public:
 
   uuid_d get_fsid() const final {
     ceph_assert(seastar::this_shard_id() == primary_core);
-    return shard_stores.local().get_fsid();
+    return shard_stores.local().mshard_stores[0]->get_fsid();
   }
 
   seastar::future<> write_meta(const std::string& key, const std::string& value) final;
@@ -580,8 +592,24 @@ public:
 
   seastar::future<std::string> get_default_device_class() final;
 
-  FuturizedStore::Shard& get_sharded_store() final {
-    return shard_stores.local();
+  FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final {
+    assert(!shard_stores.local().mshard_stores.empty());
+    assert(store_index < shard_stores.local().mshard_stores.size());
+    assert(shard_stores.local().mshard_stores[store_index]->get_status());
+    return make_local_shared_foreign(
+      seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(
+        shard_stores.local().mshard_stores[store_index])));
+  }
+  std::vector<FuturizedStore::StoreShardRef> get_sharded_stores() final {
+    std::vector<FuturizedStore::StoreShardRef> ret;
+    ret.reserve(shard_stores.local().mshard_stores.size());
+    for (auto& mshard_store : shard_stores.local().mshard_stores) {
+      if (mshard_store->get_status()) {
+        ret.emplace_back(make_local_shared_foreign(
+          seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(mshard_store))));
+      }
+    }
+    return ret;
   }
 
   static col_obj_ranges_t
@@ -605,12 +633,38 @@ private:
 
   seastar::future<> set_secondaries();
 
+  seastar::future<> get_shard_nums();
+  seastar::future<> shard_stores_start(bool is_test);
+  seastar::future<> shard_stores_stop();
+
 private:
+  class MultiShardStores {
+  public:
+    std::vector<seastar::shared_ptr<SeaStore::Shard>> mshard_stores;
+
+    MultiShardStores(size_t count,
+                     const std::string& root,
+                     Device* dev,
+                     bool is_test,
+                     unsigned int store_shard_nums)
+    : mshard_stores() {
+      mshard_stores.reserve(count); // reserve space for the per-store shards
+      for (size_t store_index = 0; store_index < count; ++store_index) {
+        mshard_stores.emplace_back(seastar::make_shared<SeaStore::Shard>(
+          root, dev, is_test, store_shard_nums, store_index));
+      }
+    }
+
+    ~MultiShardStores() {
+      mshard_stores.clear();
+    }
+  };
   std::string root;
   MDStoreRef mdstore;
   DeviceRef device;
   std::vector<DeviceRef> secondaries;
-  seastar::sharded<SeaStore::Shard> shard_stores;
+  seastar::sharded<SeaStore::MultiShardStores> shard_stores;
+  unsigned int store_shard_nums = 0;
 
   mutable seastar::lowres_clock::time_point last_tp =
     seastar::lowres_clock::time_point::min();
diff --git a/src/crimson/os/seastore/segment_manager.h b/src/crimson/os/seastore/segment_manager.h
index 1e037d320cb23bc766ef94ab3ec2271bb9a8f652..ba28c83677729c3b417f90323c92cf3218569938 100644
@@ -57,12 +57,11 @@ struct block_sm_superblock_t {
   }
 
   void validate() const {
-    ceph_assert(shard_num == seastar::smp::count);
     ceph_assert(block_size > 0);
     ceph_assert(segment_size > 0 &&
                 segment_size % block_size == 0);
     ceph_assert_always(segment_size <= SEGMENT_OFF_MAX);
-    for (unsigned int i = 0; i < seastar::smp::count; i ++) {
+    for (unsigned int i = 0; i < shard_num; i ++) {
       ceph_assert(shard_infos[i].size > segment_size &&
                   shard_infos[i].size % block_size == 0);
       ceph_assert_always(shard_infos[i].size <= DEVICE_OFF_MAX);
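The relaxed validate() above checks shard_infos against the persisted
shard_num instead of the current reactor count, so a device formatted
with N store shards can be mounted under a different smp::count. A
hedged worked example (values hypothetical):

    // superblock.shard_num = 4, seastar::smp::count = 3 at mount time:
    //   num_shard_services = (4 + 3 - 1) / 3 = 2  // device shards per reactor
    // Global indices this_shard_id() + 3 * store_index cover 0..5;
    // shard_mount() marks indices 4 and 5 (>= shard_num) inactive
    // (store_active = false) and skips loading shard_info for them.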
diff --git a/src/crimson/os/seastore/segment_manager/block.cc b/src/crimson/os/seastore/segment_manager/block.cc
index b68b8cc8609f7db878b40e0363f67ce788458daf..f1bd3cace596b6b34fed667c4d0fefe65cfed671 100644
@@ -4,6 +4,8 @@
 #include <sys/mman.h>
 #include <string.h>
 
+#include <boost/range/irange.hpp>
+
 #include <fmt/format.h>
 
 #include <seastar/core/metrics.hh>
@@ -442,13 +444,53 @@ BlockSegmentManager::~BlockSegmentManager()
 {
 }
 
+seastar::future<> BlockSegmentManager::start(unsigned int shard_nums)
+{
+  LOG_PREFIX(BlockSegmentManager::start);
+  device_shard_nums = shard_nums;
+  auto num_shard_services = (device_shard_nums + seastar::smp::count - 1) / seastar::smp::count;
+  INFO("device_shard_nums={} seastar::smp={}, num_shard_services={}", device_shard_nums, seastar::smp::count, num_shard_services);
+  return shard_devices.start(num_shard_services, device_path,
+                             superblock.config.spec.dtype);
+}
+
+seastar::future<> BlockSegmentManager::stop()
+{
+  return shard_devices.stop();
+}
+
+Device& BlockSegmentManager::get_sharded_device(unsigned int store_index)
+{
+  assert(store_index < shard_devices.local().mshard_devices.size());
+  return *shard_devices.local().mshard_devices[store_index];
+}
+
+SegmentManager::read_ertr::future<unsigned int> BlockSegmentManager::get_shard_nums()
+{
+  return open_device(
+    device_path
+  ).safe_then([this](auto p) {
+    device = std::move(p.first);
+    auto sd = p.second;
+    return read_superblock(device, sd);
+  }).safe_then([](auto sb) {
+    return read_ertr::make_ready_future<unsigned int>(sb.shard_num);
+  }).handle_error(
+    crimson::ct_error::assert_all{
+      "Invalid error in BlockSegmentManager::get_shard_nums"
+    }
+  );
+}
+
 BlockSegmentManager::mount_ret BlockSegmentManager::mount()
 {
   return shard_devices.invoke_on_all([](auto &local_device) {
-    return local_device.shard_mount(
-    ).handle_error(
-      crimson::ct_error::assert_all{
-        "Invalid error in BlockSegmentManager::mount"
+    return seastar::do_for_each(local_device.mshard_devices, [](auto& mshard_device) {
+      return mshard_device->shard_mount(
+      ).handle_error(
+        crimson::ct_error::assert_all{
+          "Invalid error in BlockSegmentManager::mount"
+      });
     });
   });
 }
@@ -462,9 +504,17 @@ BlockSegmentManager::mount_ret BlockSegmentManager::shard_mount()
     device = std::move(p.first);
     auto sd = p.second;
     return read_superblock(device, sd);
-  }).safe_then([=, this](auto sb) {
+  }).safe_then([=, this](auto sb) -> mount_ertr::future<> {
     set_device_id(sb.config.spec.id);
-    shard_info = sb.shard_infos[seastar::this_shard_id()];
+    if (seastar::this_shard_id() + seastar::smp::count * store_index >= sb.shard_num) {
+      INFO("{} shard_id {} out of range {}",
+        device_id_printer_t{get_device_id()},
+        seastar::this_shard_id() + seastar::smp::count * store_index,
+        sb.shard_num);
+      store_active = false;
+      return mount_ertr::now();
+    }
+    shard_info = sb.shard_infos[seastar::this_shard_id() + seastar::smp::count * store_index];
     INFO("{} read {}", device_id_printer_t{get_device_id()}, shard_info);
     sb.validate();
     superblock = sb;
@@ -491,20 +541,22 @@ BlockSegmentManager::mount_ret BlockSegmentManager::shard_mount()
     });
   }).safe_then([this, FNAME] {
     INFO("{} complete", device_id_printer_t{get_device_id()});
-    register_metrics();
+    register_metrics(store_index);
   });
 }
 
 BlockSegmentManager::mkfs_ret BlockSegmentManager::mkfs(
   device_config_t sm_config)
 {
-  return shard_devices.local().primary_mkfs(sm_config
+  return shard_devices.local().mshard_devices[0]->primary_mkfs(sm_config
   ).safe_then([this] {
     return shard_devices.invoke_on_all([](auto &local_device) {
-      return local_device.shard_mkfs(
-      ).handle_error(
-        crimson::ct_error::assert_all{
-          "Invalid error in BlockSegmentManager::mkfs"
+      return seastar::do_for_each(local_device.mshard_devices, [](auto& mshard_device) {
+        return mshard_device->shard_mkfs(
+        ).handle_error(
+          crimson::ct_error::assert_all{
+            "Invalid error in BlockSegmentManager::mkfs"
+        });
       });
     });
   });
@@ -687,14 +739,23 @@ SegmentManager::read_ertr::future<> BlockSegmentManager::read(
     out);
 }
 
-void BlockSegmentManager::register_metrics()
+void BlockSegmentManager::register_metrics(unsigned int store_index)
 {
   LOG_PREFIX(BlockSegmentManager::register_metrics);
+  if (!store_active) {
+    INFO("{} shard {} is not active, skip registering metrics",
+         device_id_printer_t{get_device_id()}, store_index);
+    return;
+  }
+
   DEBUG("{}", device_id_printer_t{get_device_id()});
   namespace sm = seastar::metrics;
   std::vector<sm::label_instance> label_instances;
   label_instances.push_back(sm::label_instance("device_id", get_device_id()));
+  label_instances.push_back(
+    sm::label_instance("shard_device_index", std::to_string(store_index)));
   stats.reset();
+
   metrics.add_group(
     "segment_manager",
     {
@@ -702,61 +763,61 @@ void BlockSegmentManager::register_metrics()
         "data_read_num",
         stats.data_read.num,
         sm::description("total number of data read"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "data_read_bytes",
         stats.data_read.bytes,
         sm::description("total bytes of data read"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "data_write_num",
         stats.data_write.num,
         sm::description("total number of data write"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "data_write_bytes",
         stats.data_write.bytes,
         sm::description("total bytes of data write"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "metadata_write_num",
         stats.metadata_write.num,
         sm::description("total number of metadata write"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "metadata_write_bytes",
         stats.metadata_write.bytes,
         sm::description("total bytes of metadata write"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "opened_segments",
         stats.opened_segments,
         sm::description("total segments opened"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "closed_segments",
         stats.closed_segments,
         sm::description("total segments closed"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "closed_segments_unused_bytes",
         stats.closed_segments_unused_bytes,
         sm::description("total unused bytes of closed segments"),
-       label_instances
+        label_instances
       ),
       sm::make_counter(
         "released_segments",
         stats.released_segments,
         sm::description("total segments released"),
-       label_instances
+        label_instances
       ),
     }
   );
diff --git a/src/crimson/os/seastore/segment_manager/block.h b/src/crimson/os/seastore/segment_manager/block.h
index a0445371016ccf9f213b15904d1f1b1e59d6ea22..05a253e3f6482174e07af358a6bd45f1e9c69946 100644
@@ -112,17 +112,12 @@ public:
 class BlockSegmentManager final : public SegmentManager {
 // interfaces used by Device
 public:
-  seastar::future<> start() {
-    return shard_devices.start(device_path, superblock.config.spec.dtype);
-  }
+  seastar::future<> start(unsigned int shard_nums) final;
 
-  seastar::future<> stop() {
-    return shard_devices.stop();
-  }
+  seastar::future<> stop() final;
+
+  Device& get_sharded_device(unsigned int store_index = 0) final;
 
-  Device& get_sharded_device() final {
-    return shard_devices.local();
-  }
   mount_ret mount() final;
 
   mkfs_ret mkfs(device_config_t) final;
@@ -132,8 +127,10 @@ public:
 
   BlockSegmentManager(
     const std::string &path,
-    device_type_t dtype)
-  : device_path(path) {
+    device_type_t dtype,
+    unsigned int store_index = 0)
+  : device_path(path),
+    store_index(store_index) {
     ceph_assert(get_device_type() == device_type_t::NONE);
     superblock.config.spec.dtype = dtype;
   }
@@ -149,6 +146,8 @@ public:
     size_t len,
     ceph::bufferptr &out) final;
 
+  read_ertr::future<unsigned int> get_shard_nums() final;
+
   device_type_t get_device_type() const final {
     return superblock.config.spec.dtype;
   }
@@ -213,7 +212,7 @@ private:
     }
   } stats;
 
-  void register_metrics();
+  void register_metrics(unsigned int store_index);
   seastar::metrics::metric_group metrics;
 
   std::string device_path;
@@ -256,7 +255,29 @@ private:
   // all shards mount
   mount_ret shard_mount();
 
-  seastar::sharded<BlockSegmentManager> shard_devices;
+  unsigned int device_shard_nums = 0;
+  unsigned int store_index = 0;
+  bool store_active = true;
+  class MultiShardDevices {
+  public:
+    // one BlockSegmentManager per store shard hosted on this reactor
+    std::vector<std::unique_ptr<BlockSegmentManager>> mshard_devices;
+
+    MultiShardDevices(size_t count,
+                      const std::string& path,
+                      device_type_t dtype)
+    : mshard_devices() {
+      mshard_devices.reserve(count);
+      for (size_t store_index = 0; store_index < count; ++store_index) {
+        mshard_devices.emplace_back(std::make_unique<BlockSegmentManager>(
+          path, dtype, store_index));
+      }
+    }
+
+    ~MultiShardDevices() {
+      mshard_devices.clear();
+    }
+  seastar::sharded<MultiShardDevices> shard_devices;
 };
 
 }