: type(type),
path{path},
values(values),
- op_gates()
+ main_op_gates()
{
}
{
}
-seastar::future<> AlienStore::start()
+seastar::future<unsigned int> AlienStore::start()
{
cct = std::make_unique<CephContext>(
CEPH_ENTITY_TYPE_OSD,
cct->_conf.set_config_values(values);
cct->_log->start();
- store = ObjectStore::create(cct.get(), type, path);
- if (!store) {
+ main_store = ObjectStore::create(cct.get(), type, path);
+ if (!main_store) {
ceph_abort_msgf("unsupported objectstore type: %s", type.c_str());
}
/*
const auto num_threads =
get_conf<uint64_t>("crimson_bluestore_num_threads");
- tp = std::make_unique<crimson::os::ThreadPool>(num_threads, 128, alien_thread_cpu_cores);
- return tp->start();
+ main_tp = std::make_unique<crimson::os::ThreadPool>(num_threads, 128, alien_thread_cpu_cores);
+ return main_tp->start().then([this]() {
+ return shard_stores.start(main_tp.get(),
+ main_store.get(),
+ &main_op_gates,
+ &main_coll_map,
+ std::ref(main_coll_map_lock));
+ }).then([] () {
+ return seastar::make_ready_future<unsigned int>(seastar::smp::count);
+ });
}
// NOTE(review): this span is an unapplied unified-diff hunk ('-' = removed,
// '+' = added), not applied source. The new version of stop() first stops the
// seastar::sharded shard_stores service, and only then resets the main
// ObjectStore / CephContext on the alien thread pool and stops the pool —
// presumably so no shard can still reach main_store during teardown; confirm
// ordering against seastar::sharded::stop semantics.
seastar::future<> AlienStore::stop()
{
- if (!tp) {
+ if (!main_tp) {
// not really started yet
return seastar::now();
}
- return tp->submit([this] {
- store.reset();
- cct.reset();
- g_ceph_context = nullptr;
-
- }).then([this] {
- return tp->stop();
+ return shard_stores.stop()
+ .then([this]() {
+ return main_tp->submit([this] {
+ main_store.reset();
+ cct.reset();
+ g_ceph_context = nullptr;
+ }).then([this] {
+ return main_tp->stop();
+ });
});
}
-AlienStore::base_errorator::future<bool>
-AlienStore::exists(
+AlienStore::Shard::base_errorator::future<bool>
+AlienStore::Shard::exists(
CollectionRef ch,
const ghobject_t& oid,
uint32_t op_flags)
{
- return op_gates.simple_dispatch("exists", [=, this] {
+ return (*op_gates).simple_dispatch("exists", [=, this] {
return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, this] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->exists(c->collection, oid);
AlienStore::mount_ertr::future<> AlienStore::mount()
{
logger().debug("{}", __func__);
- assert(tp);
- return tp->submit([this] {
- return store->mount();
+ assert(main_tp);
+ return main_tp->submit([this] {
+ return main_store->mount();
}).then([] (const int r) -> mount_ertr::future<> {
if (r != 0) {
return crimson::stateful_ec{
seastar::future<> AlienStore::umount()
{
logger().info("{}", __func__);
- if (!tp) {
- // not really started yet
+ if (!main_tp) {
return seastar::now();
}
- return op_gates.close_all().then([this] {
- return tp->submit([this] {
+
+ return main_op_gates.close_all().then([this] {
+ return main_tp->submit([this] {
{
- std::lock_guard l(coll_map_lock);
- for (auto [cid, ch]: coll_map) {
- static_cast<AlienCollection*>(ch.get())->collection.reset();
- }
- coll_map.clear();
+ std::lock_guard l(main_coll_map_lock);
+ for (auto [cid, ch]: main_coll_map) {
+ static_cast<AlienCollection*>(ch.get())->collection.reset();
+ }
+ main_coll_map.clear();
}
- return store->umount();
- }).then([] (int r) {
+ return main_store->umount();
+ }).then([](int r) {
assert(r == 0);
return seastar::now();
});
AlienStore::mkfs_ertr::future<> AlienStore::mkfs(uuid_d osd_fsid)
{
logger().debug("{}", __func__);
- store->set_fsid(osd_fsid);
- assert(tp);
- return tp->submit([this] {
- return store->mkfs();
+ main_store->set_fsid(osd_fsid);
+ assert(main_tp);
+ return main_tp->submit([this] {
+ return main_store->mkfs();
}).then([] (int r) -> mkfs_ertr::future<> {
if (r != 0) {
return crimson::stateful_ec{
}
seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
-AlienStore::list_objects(CollectionRef ch,
+AlienStore::Shard::list_objects(CollectionRef ch,
const ghobject_t& start,
const ghobject_t& end,
uint64_t limit,
});
}
-seastar::future<CollectionRef> AlienStore::create_new_collection(const coll_t& cid)
+seastar::future<CollectionRef> AlienStore::Shard::create_new_collection(const coll_t& cid)
{
logger().debug("{}", __func__);
assert(tp);
});
}
-seastar::future<CollectionRef> AlienStore::open_collection(const coll_t& cid)
+seastar::future<CollectionRef> AlienStore::Shard::open_collection(const coll_t& cid)
{
logger().debug("{}", __func__);
assert(tp);
}
// NOTE(review): diff hunk ('+' = added lines). list_collections() becomes a
// thin forwarder to the local shard's shard_list_collections(); the real work
// (thread-pool submit + transform to coll_core_t) lives in the Shard hunk below.
seastar::future<std::vector<coll_core_t>> AlienStore::list_collections()
+{
+ return shard_stores.local().mshard_stores->shard_list_collections();
+}
+seastar::future<std::vector<coll_core_t>> AlienStore::Shard::shard_list_collections()
{
logger().debug("{}", __func__);
assert(tp);
ret.resize(ls.size());
std::transform(
ls.begin(), ls.end(), ret.begin(),
- [](auto p) { return std::make_pair(p, NULL_CORE); });
+ [](auto p) { return std::make_pair(p, std::make_pair(NULL_CORE, NULL_STORE_INDEX)); });
return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(ret));
});
});
}
-seastar::future<> AlienStore::set_collection_opts(CollectionRef ch,
+seastar::future<> AlienStore::Shard::set_collection_opts(CollectionRef ch,
const pool_opts_t& opts)
{
logger().debug("{}", __func__);
});
}
-AlienStore::read_errorator::future<ceph::bufferlist>
-AlienStore::read(CollectionRef ch,
+AlienStore::Shard::read_errorator::future<ceph::bufferlist>
+AlienStore::Shard::read(CollectionRef ch,
const ghobject_t& oid,
uint64_t offset,
size_t len,
});
}
-AlienStore::read_errorator::future<ceph::bufferlist>
-AlienStore::readv(CollectionRef ch,
+AlienStore::Shard::read_errorator::future<ceph::bufferlist>
+AlienStore::Shard::readv(CollectionRef ch,
const ghobject_t& oid,
interval_set<uint64_t>& m,
uint32_t op_flags)
});
}
-AlienStore::get_attr_errorator::future<ceph::bufferlist>
-AlienStore::get_attr(CollectionRef ch,
+AlienStore::Shard::get_attr_errorator::future<ceph::bufferlist>
+AlienStore::Shard::get_attr(CollectionRef ch,
const ghobject_t& oid,
std::string_view name,
uint32_t op_flags) const
});
}
-AlienStore::get_attrs_ertr::future<AlienStore::attrs_t>
-AlienStore::get_attrs(CollectionRef ch,
+AlienStore::Shard::get_attrs_ertr::future<AlienStore::Shard::attrs_t>
+AlienStore::Shard::get_attrs(CollectionRef ch,
const ghobject_t& oid,
uint32_t op_flags)
{
});
}
-auto AlienStore::omap_get_values(CollectionRef ch,
+auto AlienStore::Shard::omap_get_values(CollectionRef ch,
const ghobject_t& oid,
const set<string>& keys,
uint32_t op_flags)
});
}
-AlienStore::read_errorator::future<ObjectStore::omap_iter_ret_t>
-AlienStore::omap_iterate(CollectionRef ch,
+AlienStore::Shard::read_errorator::future<ObjectStore::omap_iter_ret_t>
+AlienStore::Shard::omap_iterate(CollectionRef ch,
const ghobject_t &oid,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback,
});
}
-seastar::future<> AlienStore::do_transaction_no_callbacks(
+seastar::future<> AlienStore::Shard::do_transaction_no_callbacks(
CollectionRef ch,
ceph::os::Transaction&& txn)
{
});
}
-seastar::future<> AlienStore::inject_data_error(const ghobject_t& o)
+seastar::future<> AlienStore::Shard::inject_data_error(const ghobject_t& o)
{
logger().debug("{}", __func__);
assert(tp);
- return op_gates.simple_dispatch("inject_data_error", [=, this] {
+ return (*op_gates).simple_dispatch("inject_data_error", [=, this] {
return tp->submit([o, this] {
return store->inject_data_error(o);
});
});
}
-seastar::future<> AlienStore::inject_mdata_error(const ghobject_t& o)
+seastar::future<> AlienStore::Shard::inject_mdata_error(const ghobject_t& o)
{
logger().debug("{}", __func__);
assert(tp);
- return op_gates.simple_dispatch("inject_mdata_error", [=, this] {
+ return (*op_gates).simple_dispatch("inject_mdata_error", [=, this] {
return tp->submit([o, this] {
return store->inject_mdata_error(o);
});
const std::string& value)
{
logger().debug("{}", __func__);
- assert(tp);
- return op_gates.simple_dispatch("write_meta", [=, this] {
- return tp->submit([=, this] {
- return store->write_meta(key, value);
+ assert(main_tp);
+ return main_op_gates.simple_dispatch("write_meta", [=, this] {
+ return main_tp->submit([=, this] {
+ return main_store->write_meta(key, value);
}).then([] (int r) {
assert(r == 0);
return seastar::make_ready_future<>();
AlienStore::read_meta(const std::string& key)
{
logger().debug("{}", __func__);
- assert(tp);
- return op_gates.simple_dispatch("read_meta", [this, key] {
- return tp->submit([key, this] {
+ assert(main_tp);
+ return main_op_gates.simple_dispatch("read_meta", [this, key] {
+ return main_tp->submit([key, this] {
std::string value;
- int r = store->read_meta(key, &value);
+ int r = main_store->read_meta(key, &value);
if (r > 0) {
value.resize(r);
boost::algorithm::trim_right_if(value,
// NOTE(review): diff hunk — only change is the store member rename
// (store -> main_store); behavior is unchanged: synchronously returns the
// fsid of the underlying classic ObjectStore.
uuid_d AlienStore::get_fsid() const
{
logger().debug("{}", __func__);
- return store->get_fsid();
+ return main_store->get_fsid();
}
// NOTE(review): diff hunk ('+' = added). stat() now forwards to the local
// shard's shard_stat(); the thread-pool dispatch is in Shard::shard_stat
// (truncated in this hunk view).
seastar::future<store_statfs_t> AlienStore::stat() const
+{
+ return shard_stores.local().mshard_stores->shard_stat();
+}
+seastar::future<store_statfs_t> AlienStore::Shard::shard_stat()
{
logger().info("{}", __func__);
assert(tp);
}
// NOTE(review): diff hunk ('+' = added). pool_statfs() now forwards to the
// local shard's shard_pool_statfs(pool_id), mirroring the stat() wrapper above.
seastar::future<store_statfs_t> AlienStore::pool_statfs(int64_t pool_id) const
+{
+ return shard_stores.local().mshard_stores->shard_pool_statfs(pool_id);
+}
+seastar::future<store_statfs_t> AlienStore::Shard::shard_pool_statfs(int64_t pool_id)
{
logger().info("{}", __func__);
assert(tp);
});
}
// NOTE(review): diff hunk — the method moves from AlienStore to the nested
// AlienStore::Shard; the hard-coded 256 limit is unchanged. The constant's
// provenance is not visible here — TODO confirm it matches the backing
// ObjectStore's attr-name limit.
-unsigned AlienStore::get_max_attr_name_length() const
+unsigned AlienStore::Shard::get_max_attr_name_length() const
{
logger().info("{}", __func__);
return 256;
}
-seastar::future<struct stat> AlienStore::stat(
+seastar::future<struct stat> AlienStore::Shard::stat(
CollectionRef ch,
const ghobject_t& oid,
uint32_t op_flags)
seastar::future<std::string> AlienStore::get_default_device_class()
{
logger().debug("{}", __func__);
- assert(tp);
- return op_gates.simple_dispatch("get_default_device_class", [=, this] {
- return tp->submit([=, this] {
- return store->get_default_device_class();
+ assert(main_tp);
+ return main_op_gates.simple_dispatch("get_default_device_class", [=, this] {
+ return main_tp->submit([=, this] {
+ return main_store->get_default_device_class();
}).then([] (std::string device_class) {
return seastar::make_ready_future<std::string>(device_class);
});
});
}
-auto AlienStore::omap_get_header(CollectionRef ch,
+auto AlienStore::Shard::omap_get_header(CollectionRef ch,
const ghobject_t& oid,
uint32_t op_flags)
-> get_attr_errorator::future<ceph::bufferlist>
});
}
-AlienStore::read_errorator::future<std::map<uint64_t, uint64_t>> AlienStore::fiemap(
+AlienStore::Shard::read_errorator::future<std::map<uint64_t, uint64_t>> AlienStore::Shard::fiemap(
CollectionRef ch,
const ghobject_t& oid,
uint64_t off,
});
}
-CollectionRef AlienStore::get_alien_coll_ref(ObjectStore::CollectionHandle c) {
+CollectionRef AlienStore::Shard::get_alien_coll_ref(ObjectStore::CollectionHandle c) {
std::lock_guard l(coll_map_lock);
CollectionRef ch;
- auto cp = coll_map.find(c->cid);
- if (cp == coll_map.end()) {
+ auto cp = coll_map->find(c->cid);
+ if (cp == coll_map->end()) {
ch = new AlienCollection(c);
- coll_map[c->cid] = ch;
+ (*coll_map)[c->cid] = ch;
} else {
ch = cp->second;
auto ach = static_cast<AlienCollection*>(ch.get());
#include <seastar/core/future.hh>
#include <seastar/core/shared_mutex.hh>
+#include <seastar/core/shared_ptr.hh>
#include "common/ceph_context.h"
#include "os/ObjectStore.h"
namespace crimson::os {
using coll_core_t = FuturizedStore::coll_core_t;
-class AlienStore final : public FuturizedStore,
- public FuturizedStore::Shard {
+class AlienStore final : public FuturizedStore {
+ class Shard : public FuturizedStore::Shard {
+ public:
+ Shard(crimson::os::ThreadPool* _tp,
+ ObjectStore* _store,
+ crimson::common::gate_per_shard* _op_gates,
+ std::unordered_map<coll_t, CollectionRef>* _coll_map,
+ std::mutex& _coll_map_lock)
+ : tp(_tp),
+ store(_store),
+ op_gates(_op_gates),
+ coll_map(_coll_map),
+ coll_map_lock(_coll_map_lock) {}
+
+ ~Shard() = default;
+
+ base_errorator::future<bool> exists(
+ CollectionRef c,
+ const ghobject_t& oid,
+ uint32_t op_flags = 0) final;
+
+ read_errorator::future<ceph::bufferlist> read(CollectionRef c,
+ const ghobject_t& oid,
+ uint64_t offset,
+ size_t len,
+ uint32_t op_flags = 0) final;
+ read_errorator::future<ceph::bufferlist> readv(CollectionRef c,
+ const ghobject_t& oid,
+ interval_set<uint64_t>& m,
+ uint32_t op_flags = 0) final;
+
+ get_attr_errorator::future<ceph::bufferlist> get_attr(
+ CollectionRef c,
+ const ghobject_t& oid,
+ std::string_view name,
+ uint32_t op_flags = 0) const final;
+ get_attrs_ertr::future<attrs_t> get_attrs(
+ CollectionRef c,
+ const ghobject_t& oid,
+ uint32_t op_flags = 0) final;
+
+ read_errorator::future<omap_values_t> omap_get_values(
+ CollectionRef c,
+ const ghobject_t& oid,
+ const omap_keys_t& keys,
+ uint32_t op_flags = 0) final;
+ get_attr_errorator::future<ceph::bufferlist> omap_get_header(
+ CollectionRef,
+ const ghobject_t&,
+ uint32_t) final;
+ read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
+ CollectionRef,
+ const ghobject_t&,
+ uint64_t off,
+ uint64_t len,
+ uint32_t op_flags) final;
+
+ seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
+ CollectionRef c,
+ const ghobject_t& start,
+ const ghobject_t& end,
+ uint64_t limit,
+ uint32_t op_flags = 0) const final;
+
+ read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
+ CollectionRef c,
+ const ghobject_t &oid,
+ ObjectStore::omap_iter_seek_t start_from,
+ omap_iterate_cb_t callback,
+ uint32_t op_flags = 0) final;
+
+ seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
+ seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
+ seastar::future<std::vector<coll_core_t>> shard_list_collections();
+ seastar::future<> set_collection_opts(CollectionRef c,
+ const pool_opts_t& opts) final;
+ seastar::future<store_statfs_t> shard_stat();
+ seastar::future<store_statfs_t> shard_pool_statfs(int64_t pool_id);
+ seastar::future<struct stat> stat(
+ CollectionRef,
+ const ghobject_t&,
+ uint32_t op_flags = 0) final;
+
+ seastar::future<> do_transaction_no_callbacks(
+ CollectionRef c,
+ ceph::os::Transaction&& txn) final;
+
+ // error injection
+ seastar::future<> inject_data_error(const ghobject_t& o) final;
+ seastar::future<> inject_mdata_error(const ghobject_t& o) final;
+ unsigned get_max_attr_name_length() const final;
+
+ private:
+ CollectionRef get_alien_coll_ref(ObjectStore::CollectionHandle c);
+
+ template <class... Args>
+ auto do_with_op_gate(Args&&... args) const {
+ return (*op_gates).simple_dispatch("AlienStore::do_with_op_gate",
+ // perfect forwarding in lambda's closure isn't available in C++17
+ // using tuple as workaround; see: https://stackoverflow.com/a/49902823
+ [args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
+ return std::apply([] (auto&&... args) {
+ return seastar::do_with(std::forward<decltype(args)>(args)...);
+ }, std::move(args));
+ });
+ }
+ crimson::os::ThreadPool* tp = nullptr; //for each shard
+ ObjectStore* store = nullptr; //for each shard
+ crimson::common::gate_per_shard* op_gates = nullptr; //for per shard
+ std::unordered_map<coll_t, CollectionRef>* coll_map = nullptr; // for per shard
+ std::mutex& coll_map_lock;
+ };
public:
AlienStore(const std::string& type,
const std::string& path,
const ConfigValues& values);
~AlienStore() final;
- seastar::future<> start() final;
+ seastar::future<unsigned int> start() final;
seastar::future<> stop() final;
- mount_ertr::future<> mount() final;
- seastar::future<> umount() final;
- base_errorator::future<bool> exists(
- CollectionRef c,
- const ghobject_t& oid,
- uint32_t op_flags = 0) final;
mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
- read_errorator::future<ceph::bufferlist> read(CollectionRef c,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- uint32_t op_flags = 0) final;
- read_errorator::future<ceph::bufferlist> readv(CollectionRef c,
- const ghobject_t& oid,
- interval_set<uint64_t>& m,
- uint32_t op_flags = 0) final;
-
-
- get_attr_errorator::future<ceph::bufferlist> get_attr(
- CollectionRef c,
- const ghobject_t& oid,
- std::string_view name,
- uint32_t op_flags = 0) const final;
- get_attrs_ertr::future<attrs_t> get_attrs(
- CollectionRef c,
- const ghobject_t& oid,
- uint32_t op_flags = 0) final;
-
- read_errorator::future<omap_values_t> omap_get_values(
- CollectionRef c,
- const ghobject_t& oid,
- const omap_keys_t& keys,
- uint32_t op_flags = 0) final;
-
- seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
- CollectionRef c,
- const ghobject_t& start,
- const ghobject_t& end,
- uint64_t limit,
- uint32_t op_flags = 0) const final;
-
- read_errorator::future<ObjectStore::omap_iter_ret_t> omap_iterate(
- CollectionRef c,
- const ghobject_t &oid,
- ObjectStore::omap_iter_seek_t start_from,
- omap_iterate_cb_t callback,
- uint32_t op_flags = 0) final;
-
- seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
- seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
- seastar::future<std::vector<coll_core_t>> list_collections() final;
- seastar::future<> set_collection_opts(CollectionRef c,
- const pool_opts_t& opts) final;
-
- seastar::future<> do_transaction_no_callbacks(
- CollectionRef c,
- ceph::os::Transaction&& txn) final;
-
- // error injection
- seastar::future<> inject_data_error(const ghobject_t& o) final;
- seastar::future<> inject_mdata_error(const ghobject_t& o) final;
+ mount_ertr::future<> mount() final;
+ seastar::future<> umount() final;
seastar::future<> write_meta(const std::string& key,
const std::string& value) final;
seastar::future<std::tuple<int, std::string>> read_meta(
const std::string& key) final;
uuid_d get_fsid() const final;
+
seastar::future<store_statfs_t> stat() const final;
seastar::future<store_statfs_t> pool_statfs(int64_t pool_id) const final;
- unsigned get_max_attr_name_length() const final;
- seastar::future<struct stat> stat(
- CollectionRef,
- const ghobject_t&,
- uint32_t op_flags = 0) final;
+
seastar::future<std::string> get_default_device_class() final;
- get_attr_errorator::future<ceph::bufferlist> omap_get_header(
- CollectionRef,
- const ghobject_t&,
- uint32_t) final;
- read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
- CollectionRef,
- const ghobject_t&,
- uint64_t off,
- uint64_t len,
- uint32_t op_flags) final;
-
- FuturizedStore::Shard& get_sharded_store() final {
- return *this;
- }
-private:
+ seastar::future<std::vector<coll_core_t>> list_collections() final;
+
+ FuturizedStore::StoreShardRef get_sharded_store(unsigned int store_index = 0) final {
+ return make_local_shared_foreign(
+ seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(
+ shard_stores.local().mshard_stores)));
+ }
- template <class... Args>
- auto do_with_op_gate(Args&&... args) const {
- return op_gates.simple_dispatch("AlienStore::do_with_op_gate",
- // perfect forwarding in lambda's closure isn't available in C++17
- // using tuple as workaround; see: https://stackoverflow.com/a/49902823
- [args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
- return std::apply([] (auto&&... args) {
- return seastar::do_with(std::forward<decltype(args)>(args)...);
- }, std::move(args));
- });
+ std::vector<FuturizedStore::StoreShardRef> get_sharded_stores() final {
+ std::vector<FuturizedStore::StoreShardRef> ret;
+ ret.emplace_back(make_local_shared_foreign(
+ seastar::make_foreign(seastar::static_pointer_cast<FuturizedStore::Shard>(
+ shard_stores.local().mshard_stores))));
+ return ret;
}
- mutable std::unique_ptr<crimson::os::ThreadPool> tp;
+private:
+ mutable std::unique_ptr<crimson::os::ThreadPool> main_tp;
const std::string type;
const std::string path;
const ConfigValues values;
uint64_t used_bytes = 0;
- std::unique_ptr<ObjectStore> store;
+ std::unique_ptr<ObjectStore> main_store;
std::unique_ptr<CephContext> cct;
- mutable crimson::common::gate_per_shard op_gates;
+ mutable crimson::common::gate_per_shard main_op_gates;
+
+ class MultiShardStores {
+ public:
+ seastar::shared_ptr<AlienStore::Shard> mshard_stores;
+
+ public:
+ MultiShardStores(crimson::os::ThreadPool* _tp,
+ ObjectStore* _store,
+ crimson::common::gate_per_shard* _op_gates,
+ std::unordered_map<coll_t, CollectionRef>* _coll_map,
+ std::mutex& _coll_map_lock)
+ {
+ mshard_stores = seastar::make_shared<AlienStore::Shard>(
+ _tp, _store, _op_gates, _coll_map, _coll_map_lock);
+ }
+
+ ~MultiShardStores() {}
+ };
+
+ seastar::sharded<AlienStore::MultiShardStores> shard_stores;
/**
* coll_map
* coll_map is accessed exclusively from alien threadpool threads under the
* coll_map_lock.
*/
- std::mutex coll_map_lock;
- std::unordered_map<coll_t, CollectionRef> coll_map;
- CollectionRef get_alien_coll_ref(ObjectStore::CollectionHandle c);
+ std::mutex main_coll_map_lock;
+ std::unordered_map<coll_t, CollectionRef> main_coll_map;
+
};
+
}