};
};
+namespace fs = std::filesystem;
+seastar::future<> CyanStore::get_shard_nums()
+{
+ store_shard_nums = 0;
+ for (const auto& entry : fs::directory_iterator(path)) {
+ const std::string filename = entry.path().filename().string();
+ if (filename.rfind("collections", 0) == 0) {
+ store_shard_nums++;
+ }
+ }
+ if (store_shard_nums == 0) {
+ // If no collections files found, assume seastar::smp::count shards
+ store_shard_nums = seastar::smp::count;
+ }
+ return seastar::make_ready_future<>();
+}
+
+seastar::future<unsigned int> CyanStore::start()
+{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return get_shard_nums().then([this] {
+ auto num_shard_services = (store_shard_nums + seastar::smp::count - 1 ) / seastar::smp::count;
+ logger().info("store_shard_nums={} seastar::smp={}, num_shard_services={}", store_shard_nums, seastar::smp::count, num_shard_services);
+ return shard_stores.start(num_shard_services, path, store_shard_nums);
+ }).then([this] {
+ logger().debug("CyanStore started with {} shard stores", store_shard_nums);
+ return seastar::make_ready_future<unsigned int>(store_shard_nums);
+ });
+}
+
+seastar::future<> CyanStore::stop()
+{
+ logger().debug("stopping shard stores");
+ return shard_stores.stop();
+}
+
+CyanStore::mount_ertr::future<> CyanStore::mount()
+{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.invoke_on_all([](auto &local_store) {
+ return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+ return mshard_store->mount().handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in CyanStore::mount"
+ });
+ });
+ });
+}
+
+seastar::future<> CyanStore::umount()
+{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.invoke_on_all([](auto &local_store) {
+ return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+ return mshard_store->umount();
+ });
+ });
+}
+
seastar::future<store_statfs_t> CyanStore::stat() const
{
ceph_assert(seastar::this_shard_id() == primary_core);
logger().debug("{}", __func__);
+
return shard_stores.map_reduce0(
- [](const CyanStore::Shard &local_store) {
- return local_store.get_used_bytes();
+ [](const auto& local_store) {
+ return seastar::map_reduce(
+ local_store.mshard_stores.begin(),
+ local_store.mshard_stores.end(),
+ [](const auto& mshard_store) {
+ return seastar::make_ready_future<uint64_t>(
+ mshard_store->get_used_bytes()
+ );
+ },
+ uint64_t{0},
+ std::plus<uint64_t>()
+ );
},
- (uint64_t)0,
+ uint64_t{0},
std::plus<uint64_t>()
).then([](uint64_t used_bytes) {
store_statfs_t st;
- st.total = crimson::common::local_conf().get_val<Option::size_t>("memstore_device_bytes");
+ st.total = crimson::common::local_conf()
+ .get_val<Option::size_t>("memstore_device_bytes");
st.available = st.total - used_bytes;
return seastar::make_ready_future<store_statfs_t>(std::move(st));
});
}).safe_then([this]{
return write_meta("type", "memstore");
}).safe_then([this] {
- return shard_stores.invoke_on_all(
- [](auto &local_store) {
- return local_store.mkfs();
+ return shard_stores.invoke_on_all([](auto &local_store) {
+ return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+ return mshard_store->mkfs();
+ });
});
});
}
+CyanStore::Shard::Shard(
+ std::string path,
+ unsigned int store_shard_nums,
+ unsigned int store_index)
+ : path(path),
+ store_index(store_index)
+{
+ ceph_assert(store_index < store_shard_nums);
+ if(seastar::this_shard_id() + seastar::smp::count * store_index >= store_shard_nums) {
+ store_active = false;
+ }
+}
+
seastar::future<> CyanStore::Shard::mkfs()
{
+ if(!store_active) {
+ return seastar::now();
+ }
std::string fn =
path + "/collections" + std::to_string(seastar::this_shard_id());
ceph::bufferlist bl;
CyanStore::list_collections()
{
ceph_assert(seastar::this_shard_id() == primary_core);
- return seastar::do_with(std::vector<coll_core_t>{}, [this](auto &collections) {
- return shard_stores.map([](auto &local_store) {
- return local_store.list_collections();
- }).then([&collections](std::vector<std::vector<coll_core_t>> results) {
- for (auto& colls : results) {
- collections.insert(collections.end(), colls.begin(), colls.end());
+ return shard_stores.map_reduce0(
+ [this](auto& local_store) {
+ // For each local store, collect all collections from its mshard_stores
+ return seastar::map_reduce(
+ local_store.mshard_stores.begin(),
+ local_store.mshard_stores.end(),
+ [](auto& mshard_store) {
+ return mshard_store->list_collections();
+ },
+ std::vector<coll_core_t>(), // Initial empty vector
+ [](auto&& merged, auto&& result) { // Reduction function
+ merged.insert(merged.end(), result.begin(), result.end());
+ return std::move(merged);
}
- return seastar::make_ready_future<std::vector<coll_core_t>>(
- std::move(collections));
- });
+ );
+ },
+ std::vector<coll_core_t>(), // Initial empty vector for final reduction
+ [](auto&& total, auto&& shard_result) { // Final reduction function
+ total.insert(total.end(), shard_result.begin(), shard_result.end());
+ return std::move(total);
+ }
+ ).then([](auto all_collections) {
+ return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(all_collections));
});
}
CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
{
+ if (!store_active) {
+ return mount_ertr::now();
+ }
static const char read_file_errmsg[]{"read_file"};
ceph::bufferlist bl;
std::string fn =
- path + "/collections" + std::to_string(seastar::this_shard_id());
+ path + "/collections" + std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index);
std::string err;
if (int r = bl.read_file(fn.c_str(), &err); r < 0) {
return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
for (auto& coll : collections) {
std::string fn = fmt::format("{}/{}{}", path, coll,
- std::to_string(seastar::this_shard_id()));
+ std::to_string(seastar::this_shard_id() + seastar::smp::count * store_index));
ceph::bufferlist cbl;
if (int r = cbl.read_file(fn.c_str(), &err); r < 0) {
return crimson::stateful_ec{ singleton_ec<read_file_errmsg>() };
seastar::future<> CyanStore::Shard::umount()
{
+ if (!store_active) {
+ return seastar::now();
+ }
return seastar::do_with(std::set<coll_t>{}, [this](auto& collections) {
return seastar::do_for_each(coll_map, [&collections, this](auto& coll) {
auto& [col, ch] = coll;
ceph_assert(ch);
ch->encode(bl);
std::string fn = fmt::format("{}/{}{}", path, col,
- std::to_string(seastar::this_shard_id()));
+ std::to_string(seastar::this_shard_id()+ seastar::smp::count * store_index));
return crimson::write_file(std::move(bl), fn);
}).then([&collections, this] {
ceph::bufferlist bl;
ceph::encode(collections, bl);
std::string fn = fmt::format("{}/collections{}",
- path, std::to_string(seastar::this_shard_id()));
+ path, std::to_string(seastar::this_shard_id()+ seastar::smp::count * store_index));
return crimson::write_file(std::move(bl), fn);
});
});
uint64_t limit,
uint32_t op_flags) const
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {} {} {} {}",
__func__, c->get_cid(), start, end, limit);
seastar::future<CollectionRef>
CyanStore::Shard::create_new_collection(const coll_t& cid)
{
+ assert(store_active);
auto c = new Collection{cid};
new_coll_map[cid] = c;
return seastar::make_ready_future<CollectionRef>(c);
seastar::future<CollectionRef>
CyanStore::Shard::open_collection(const coll_t& cid)
{
+ assert(store_active);
return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
}
seastar::future<std::vector<coll_core_t>>
CyanStore::Shard::list_collections()
{
+ if (!store_active) {
+ return seastar::make_ready_future<std::vector<coll_core_t>>();
+ }
std::vector<coll_core_t> collections;
for (auto& coll : coll_map) {
- collections.push_back(std::make_pair(coll.first, seastar::this_shard_id()));
+ collections.push_back(std::make_pair(coll.first, std::make_pair(seastar::this_shard_id(), store_index)));
}
return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(collections));
}
const ghobject_t &oid,
uint32_t op_flags)
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
if (!c->exists) {
return base_errorator::make_ready_future<bool>(false);
CyanStore::Shard::set_collection_opts(CollectionRef ch,
const pool_opts_t& opts)
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {}", __func__, c->get_cid());
c->pool_opts = opts;
size_t len,
uint32_t op_flags)
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {} {} {}~{}",
__func__, c->get_cid(), oid, offset, len);
interval_set<uint64_t>& m,
uint32_t op_flags)
{
+ assert(store_active);
return seastar::do_with(ceph::bufferlist{},
[this, ch, oid, &m, op_flags](auto& bl) {
return crimson::do_for_each(m,
std::string_view name,
uint32_t op_flags) const
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {} {}",
__func__, c->get_cid(), oid);
const ghobject_t& oid,
uint32_t op_flags)
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {} {}",
__func__, c->get_cid(), oid);
uint32_t op_flags)
-> read_errorator::future<omap_values_t>
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {} {}", __func__, c->get_cid(), oid);
auto o = c->get_object(oid);
uint32_t op_flags)
-> CyanStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {} {}", __func__, c->get_cid(), oid);
auto o = c->get_object(oid);
uint32_t op_flags)
-> CyanStore::Shard::get_attr_errorator::future<ceph::bufferlist>
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
auto o = c->get_object(oid);
if (!o) {
CollectionRef ch,
ceph::os::Transaction&& t)
{
+ assert(store_active);
using ceph::os::Transaction;
int r = 0;
try {
uint64_t len,
uint32_t op_flags)
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
ObjectRef o = c->get_object(oid);
const ghobject_t& oid,
uint32_t op_flags)
{
+ assert(store_active);
auto c = static_cast<Collection*>(ch.get());
auto o = c->get_object(oid);
if (!o) {
#include <optional>
#include <seastar/core/future.hh>
#include <seastar/core/future-util.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_ptr.hh>
#include "osd/osd_types.h"
#include "include/uuid.h"
public:
class Shard : public FuturizedStore::Shard {
public:
- Shard(std::string path)
- :path(path){}
+ Shard(std::string path,
+ unsigned int store_shard_nums,
+ unsigned int store_index = 0);
+ ~Shard() = default;
seastar::future<struct stat> stat(
CollectionRef c,
using coll_core_t = FuturizedStore::coll_core_t;
seastar::future<std::vector<coll_core_t>> list_collections();
- uint64_t get_used_bytes() const { return used_bytes; }
+ uint64_t get_used_bytes() const {
+ if (!store_active) {
+ return 0;
+ }
+ return used_bytes;
+ }
+
+ unsigned int get_store_index() const {
+ return store_index;
+ }
+ bool get_status() const {
+ return store_active;
+ }
private:
int _remove(const coll_t& cid, const ghobject_t& oid);
const std::string path;
std::unordered_map<coll_t, boost::intrusive_ptr<Collection>> coll_map;
std::map<coll_t, boost::intrusive_ptr<Collection>> new_coll_map;
+ unsigned int store_index;
+ bool store_active = true;
};
CyanStore(const std::string& path);
~CyanStore() final;
- seastar::future<> start() final {
- ceph_assert(seastar::this_shard_id() == primary_core);
- return shard_stores.start(path);
- }
+ seastar::future<unsigned int> start() final;
- seastar::future<> stop() final {
- ceph_assert(seastar::this_shard_id() == primary_core);
- return shard_stores.stop();
- }
+ seastar::future<> stop() final;
- mount_ertr::future<> mount() final {
- ceph_assert(seastar::this_shard_id() == primary_core);
- return shard_stores.invoke_on_all(
- [](auto &local_store) {
- return local_store.mount().handle_error(
- crimson::stateful_ec::assert_failure(
- fmt::format("error mounting cyanstore").c_str()));
- });
- }
+ mount_ertr::future<> mount() final;
- seastar::future<> umount() final {
- ceph_assert(seastar::this_shard_id() == primary_core);
- return shard_stores.invoke_on_all(
- [](auto &local_store) {
- return local_store.umount();
- });
- }
+ seastar::future<> umount() final;
mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
seastar::future<> write_meta(const std::string& key,
const std::string& value) final;
- FuturizedStore::Shard& get_sharded_store() final{
- return shard_stores.local();
+ FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final {
+ assert(!shard_stores.local().mshard_stores.empty());
+ assert(store_index < shard_stores.local().mshard_stores.size());
+ assert(shard_stores.local().mshard_stores[store_index]->get_status() == true);
+ return make_local_shared_foreign(
+ seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(
+ shard_stores.local().mshard_stores[store_index])));
+ }
+ std::vector<FuturizedStore::StoreShardRef> get_sharded_stores() final{
+ std::vector<FuturizedStore::StoreShardRef> ret;
+ ret.reserve(shard_stores.local().mshard_stores.size());
+ for (auto& mshard_store : shard_stores.local().mshard_stores) {
+ if (mshard_store->get_status() == true) {
+ ret.emplace_back(make_local_shared_foreign(
+ seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(mshard_store))));
+ }
+ }
+ return ret;
}
seastar::future<std::tuple<int, std::string>>
seastar::future<std::string> get_default_device_class() final;
+ seastar::future<> get_shard_nums();
+
+
private:
- seastar::sharded<CyanStore::Shard> shard_stores;
+class MultiShardStores {
+ public:
+ std::vector<seastar::shared_ptr<CyanStore::Shard>> mshard_stores;
+
+ public:
+ MultiShardStores(size_t count,
+ const std::string path,
+ unsigned int store_shard_nums)
+ : mshard_stores() {
+ mshard_stores.reserve(count); // Reserve space for the shards
+ for (size_t store_index = 0; store_index < count; ++store_index) {
+ mshard_stores.emplace_back(seastar::make_shared<CyanStore::Shard>(
+ path, store_shard_nums, store_index));
+ }
+ }
+ ~MultiShardStores() {
+ mshard_stores.clear();
+ }
+ };
+ seastar::sharded<CyanStore::MultiShardStores> shard_stores;
+ unsigned int store_shard_nums = 0;
const std::string path;
uuid_d osd_fsid;
};