namespace crimson::os {
using coll_core_t = FuturizedStore::coll_core_t;
-class AlienStore final : public FuturizedStore {
+class AlienStore final : public FuturizedStore,
+ public FuturizedStore::Shard {
public:
AlienStore(const std::string& type,
const std::string& path,
             const ConfigValues& values);
+ FuturizedStore::Shard& get_sharded_store() final {
+ return *this;
+ }
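+  // Illustrative call pattern (a sketch; `store` and `ch` are assumed names):
+  //   FuturizedStore::Shard& shard = store.get_sharded_store();
+  //   shard.read(ch, oid, off, len);
+  // AlienStore proxies to alien threads, so a single instance can serve as
+  // the Shard facade for every reactor and simply returns *this.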
+
private:
template <class... Args>
auto do_with_op_gate(Args&&... args) const {
};
};
-seastar::future<> CyanStore::start()
-{
- return shard_stores.start(path);
-}
-
seastar::future<store_statfs_t> CyanStore::stat() const
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
logger().debug("{}", __func__);
return shard_stores.map_reduce0(
- [](const CyanStore::ShardStores &local_store) {
+ [](const CyanStore::Shard &local_store) {
return local_store.get_used_bytes();
},
(uint64_t)0,
CyanStore::mkfs_ertr::future<> CyanStore::mkfs(uuid_d new_osd_fsid)
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
static const char read_meta_errmsg[]{"read_meta"};
static const char parse_fsid_errmsg[]{"failed to parse fsid"};
static const char match_ofsid_errmsg[]{"unmatched osd_fsid"};
});
}
-seastar::future<> CyanStore::ShardStores::mkfs()
+seastar::future<> CyanStore::Shard::mkfs()
{
std::string fn =
path + "/collections" + std::to_string(seastar::this_shard_id());
return crimson::write_file(std::move(bl), fn);
}
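// Each shard writes its own collections file, e.g. "<path>/collections3" on
// shard 3, so mkfs never has two reactors touching the same file.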
+using coll_core_t = FuturizedStore::coll_core_t;
seastar::future<std::vector<coll_core_t>>
CyanStore::list_collections()
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
return seastar::do_with(std::vector<coll_core_t>{}, [this](auto &collections) {
return shard_stores.map([](auto &local_store) {
return local_store.list_collections();
});
}
-CyanStore::mount_ertr::future<> CyanStore::ShardStores::mount()
+CyanStore::mount_ertr::future<> CyanStore::Shard::mount()
{
static const char read_file_errmsg[]{"read_file"};
ceph::bufferlist bl;
return mount_ertr::now();
}
-seastar::future<> CyanStore::ShardStores::umount()
+seastar::future<> CyanStore::Shard::umount()
{
return seastar::do_with(std::set<coll_t>{}, [this](auto& collections) {
return seastar::do_for_each(coll_map, [&collections, this](auto& coll) {
}
seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
-CyanStore::ShardStores::list_objects(
+CyanStore::Shard::list_objects(
CollectionRef ch,
const ghobject_t& start,
const ghobject_t& end,
}
seastar::future<CollectionRef>
-CyanStore::ShardStores::create_new_collection(const coll_t& cid)
+CyanStore::Shard::create_new_collection(const coll_t& cid)
{
auto c = new Collection{cid};
new_coll_map[cid] = c;
}
seastar::future<CollectionRef>
-CyanStore::ShardStores::open_collection(const coll_t& cid)
+CyanStore::Shard::open_collection(const coll_t& cid)
{
return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
}
seastar::future<std::vector<coll_core_t>>
-CyanStore::ShardStores::list_collections()
+CyanStore::Shard::list_collections()
{
std::vector<coll_core_t> collections;
for (auto& coll : coll_map) {
return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(collections));
}
-CyanStore::read_errorator::future<ceph::bufferlist>
-CyanStore::ShardStores::read(
+CyanStore::Shard::read_errorator::future<ceph::bufferlist>
+CyanStore::Shard::read(
CollectionRef ch,
const ghobject_t& oid,
uint64_t offset,
return read_errorator::make_ready_future<ceph::bufferlist>(o->read(offset, l));
}
-CyanStore::read_errorator::future<ceph::bufferlist>
-CyanStore::ShardStores::readv(
+CyanStore::Shard::read_errorator::future<ceph::bufferlist>
+CyanStore::Shard::readv(
CollectionRef ch,
const ghobject_t& oid,
interval_set<uint64_t>& m,
});
}
-CyanStore::get_attr_errorator::future<ceph::bufferlist>
-CyanStore::ShardStores::get_attr(
+CyanStore::Shard::get_attr_errorator::future<ceph::bufferlist>
+CyanStore::Shard::get_attr(
CollectionRef ch,
const ghobject_t& oid,
std::string_view name) const
}
}
-CyanStore::get_attrs_ertr::future<CyanStore::attrs_t>
-CyanStore::ShardStores::get_attrs(
+CyanStore::Shard::get_attrs_ertr::future<CyanStore::Shard::attrs_t>
+CyanStore::Shard::get_attrs(
CollectionRef ch,
const ghobject_t& oid)
{
return get_attrs_ertr::make_ready_future<attrs_t>(o->xattr);
}
-auto CyanStore::ShardStores::omap_get_values(
+auto CyanStore::Shard::omap_get_values(
CollectionRef ch,
const ghobject_t& oid,
const omap_keys_t& keys)
return seastar::make_ready_future<omap_values_t>(std::move(values));
}
-auto CyanStore::ShardStores::omap_get_values(
+auto CyanStore::Shard::omap_get_values(
CollectionRef ch,
const ghobject_t &oid,
const std::optional<string> &start)
- -> read_errorator::future<std::tuple<bool, omap_values_t>>
+ -> CyanStore::Shard::read_errorator::future<std::tuple<bool, omap_values_t>>
{
auto c = static_cast<Collection*>(ch.get());
logger().debug("{} {} {}", __func__, c->get_cid(), oid);
std::make_tuple(true, std::move(values)));
}
-auto CyanStore::ShardStores::omap_get_header(
+auto CyanStore::Shard::omap_get_header(
CollectionRef ch,
const ghobject_t& oid)
- -> get_attr_errorator::future<ceph::bufferlist>
+ -> CyanStore::Shard::get_attr_errorator::future<ceph::bufferlist>
{
auto c = static_cast<Collection*>(ch.get());
auto o = c->get_object(oid);
o->omap_header);
}
-seastar::future<> CyanStore::ShardStores::do_transaction_no_callbacks(
+seastar::future<> CyanStore::Shard::do_transaction_no_callbacks(
CollectionRef ch,
ceph::os::Transaction&& t)
{
return seastar::now();
}
-int CyanStore::ShardStores::_remove(const coll_t& cid, const ghobject_t& oid)
+int CyanStore::Shard::_remove(const coll_t& cid, const ghobject_t& oid)
{
logger().debug("{} cid={} oid={}",
__func__, cid, oid);
return 0;
}
-int CyanStore::ShardStores::_touch(const coll_t& cid, const ghobject_t& oid)
+int CyanStore::Shard::_touch(const coll_t& cid, const ghobject_t& oid)
{
logger().debug("{} cid={} oid={}",
__func__, cid, oid);
return 0;
}
-int CyanStore::ShardStores::_write(
+int CyanStore::Shard::_write(
const coll_t& cid,
const ghobject_t& oid,
uint64_t offset,
return 0;
}
-int CyanStore::ShardStores::_zero(
+int CyanStore::Shard::_zero(
const coll_t& cid,
const ghobject_t& oid,
uint64_t offset,
return _write(cid, oid, offset, len, bl, 0);
}
-int CyanStore::ShardStores::_omap_clear(
+int CyanStore::Shard::_omap_clear(
const coll_t& cid,
const ghobject_t& oid)
{
return 0;
}
-int CyanStore::ShardStores::_omap_set_values(
+int CyanStore::Shard::_omap_set_values(
const coll_t& cid,
const ghobject_t& oid,
std::map<std::string, ceph::bufferlist> &&aset)
return 0;
}
-int CyanStore::ShardStores::_omap_set_header(
+int CyanStore::Shard::_omap_set_header(
const coll_t& cid,
const ghobject_t& oid,
const ceph::bufferlist &header)
return 0;
}
-int CyanStore::ShardStores::_omap_rmkeys(
+int CyanStore::Shard::_omap_rmkeys(
const coll_t& cid,
const ghobject_t& oid,
const omap_keys_t& aset)
return 0;
}
-int CyanStore::ShardStores::_omap_rmkeyrange(
+int CyanStore::Shard::_omap_rmkeyrange(
const coll_t& cid,
const ghobject_t& oid,
const std::string &first,
return 0;
}
-int CyanStore::ShardStores::_truncate(
+int CyanStore::Shard::_truncate(
const coll_t& cid,
const ghobject_t& oid,
uint64_t size)
return r;
}
-int CyanStore::ShardStores::_clone(
+int CyanStore::Shard::_clone(
const coll_t& cid,
const ghobject_t& oid,
const ghobject_t& noid)
return 0;
}
-int CyanStore::ShardStores::_setattrs(
+int CyanStore::Shard::_setattrs(
const coll_t& cid,
const ghobject_t& oid,
std::map<std::string,bufferlist>&& aset)
return 0;
}
-int CyanStore::ShardStores::_rm_attr(
+int CyanStore::Shard::_rm_attr(
const coll_t& cid,
const ghobject_t& oid,
std::string_view name)
return 0;
}
-int CyanStore::ShardStores::_rm_attrs(
+int CyanStore::Shard::_rm_attrs(
const coll_t& cid,
const ghobject_t& oid)
{
return 0;
}
-int CyanStore::ShardStores::_create_collection(const coll_t& cid, int bits)
+int CyanStore::Shard::_create_collection(const coll_t& cid, int bits)
{
auto result = coll_map.try_emplace(cid);
if (!result.second)
}
boost::intrusive_ptr<Collection>
-CyanStore::ShardStores::_get_collection(const coll_t& cid)
+CyanStore::Shard::_get_collection(const coll_t& cid)
{
auto cp = coll_map.find(cid);
if (cp == coll_map.end())
const std::string& key,
const std::string& value)
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
std::string v = value;
v += "\n";
if (int r = safe_write_file(path.c_str(), key.c_str(),
seastar::future<std::tuple<int, std::string>>
CyanStore::read_meta(const std::string& key)
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
std::string fsid(4096, '\0');
int r = safe_read_file(path.c_str(), key.c_str(), fsid.data(), fsid.size());
if (r > 0) {
uuid_d CyanStore::get_fsid() const
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
return osd_fsid;
}
-unsigned CyanStore::get_max_attr_name_length() const
+unsigned CyanStore::Shard::get_max_attr_name_length() const
{
  // arbitrary limit, same as in MemStore.
return 256;
}
-CyanStore::read_errorator::future<std::map<uint64_t, uint64_t>>
-CyanStore::ShardStores::fiemap(
+CyanStore::Shard::read_errorator::future<std::map<uint64_t, uint64_t>>
+CyanStore::Shard::fiemap(
CollectionRef ch,
const ghobject_t& oid,
uint64_t off,
}
seastar::future<struct stat>
-CyanStore::ShardStores::stat(
+CyanStore::Shard::stat(
CollectionRef ch,
const ghobject_t& oid)
{
}
namespace crimson::os {
-using coll_core_t = FuturizedStore::coll_core_t;
class CyanStore final : public FuturizedStore {
- class ShardStores {
+ class Shard : public FuturizedStore::Shard {
public:
- ShardStores(std::string path)
+ Shard(std::string path)
:path(path){}
- mount_ertr::future<> mount();
-
- seastar::future<> umount();
-
- seastar::future<> mkfs();
-
- mkfs_ertr::future<> mkcoll(uuid_d new_osd_fsid);
-
seastar::future<struct stat> stat(
CollectionRef c,
- const ghobject_t& oid);
+ const ghobject_t& oid) final;
read_errorator::future<ceph::bufferlist> read(
CollectionRef c,
const ghobject_t& oid,
uint64_t offset,
size_t len,
- uint32_t op_flags = 0);
+ uint32_t op_flags = 0) final;
read_errorator::future<ceph::bufferlist> readv(
CollectionRef c,
const ghobject_t& oid,
interval_set<uint64_t>& m,
- uint32_t op_flags = 0);
+ uint32_t op_flags = 0) final;
get_attr_errorator::future<ceph::bufferlist> get_attr(
CollectionRef c,
const ghobject_t& oid,
- std::string_view name) const;
+ std::string_view name) const final;
get_attrs_ertr::future<attrs_t> get_attrs(
CollectionRef c,
- const ghobject_t& oid);
+ const ghobject_t& oid) final;
read_errorator::future<omap_values_t> omap_get_values(
CollectionRef c,
const ghobject_t& oid,
- const omap_keys_t& keys);
+ const omap_keys_t& keys) final;
read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
CollectionRef c, ///< [in] collection
const ghobject_t &oid, ///< [in] oid
const std::optional<std::string> &start ///< [in] start, empty for begin
- );
+ ) final;
+
+ get_attr_errorator::future<ceph::bufferlist> omap_get_header(
+ CollectionRef c,
+ const ghobject_t& oid) final;
seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
list_objects(
CollectionRef c,
const ghobject_t& start,
const ghobject_t& end,
- uint64_t limit) const;
+ uint64_t limit) const final;
- get_attr_errorator::future<ceph::bufferlist> omap_get_header(
- CollectionRef c,
- const ghobject_t& oid);
-
- seastar::future<CollectionRef> create_new_collection(const coll_t& cid);
+ seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
- seastar::future<CollectionRef> open_collection(const coll_t& cid);
-
- seastar::future<std::vector<coll_core_t>> list_collections();
+ seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
seastar::future<> do_transaction_no_callbacks(
CollectionRef ch,
- ceph::os::Transaction&& txn);
+ ceph::os::Transaction&& txn) final;
read_errorator::future<std::map<uint64_t, uint64_t>>
fiemap(
CollectionRef c,
const ghobject_t& oid,
uint64_t off,
- uint64_t len);
+ uint64_t len) final;
+
+ unsigned get_max_attr_name_length() const final;
+
+ public:
+ // only exposed to CyanStore
+ mount_ertr::future<> mount();
+
+ seastar::future<> umount();
+
+ seastar::future<> mkfs();
+
+ mkfs_ertr::future<> mkcoll(uuid_d new_osd_fsid);
+
+ using coll_core_t = FuturizedStore::coll_core_t;
+ seastar::future<std::vector<coll_core_t>> list_collections();
uint64_t get_used_bytes() const { return used_bytes; }
const std::string &last);
int _truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size);
int _clone(const coll_t& cid, const ghobject_t& oid,
- const ghobject_t& noid);
+ const ghobject_t& noid);
int _setattrs(const coll_t& cid, const ghobject_t& oid,
- std::map<std::string,bufferlist>&& aset);
+ std::map<std::string,bufferlist>&& aset);
int _rm_attr(const coll_t& cid, const ghobject_t& oid,
- std::string_view name);
+ std::string_view name);
int _rm_attrs(const coll_t& cid, const ghobject_t& oid);
int _create_collection(const coll_t& cid, int bits);
boost::intrusive_ptr<Collection> _get_collection(const coll_t& cid);
std::map<coll_t, boost::intrusive_ptr<Collection>> new_coll_map;
};
-// public interfaces called by main OSD
public:
CyanStore(const std::string& path);
~CyanStore() final;
+ seastar::future<> start() final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.start(path);
+ }
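+  // seastar::sharded<Shard>::start() constructs one CyanStore::Shard per
+  // reactor, forwarding `path` to every constructor.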
+
seastar::future<> stop() final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
return shard_stores.stop();
}
- seastar::future<> start() final;
-
- mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
-
- seastar::future<store_statfs_t> stat() const final;
-
- seastar::future<> write_meta(const std::string& key,
- const std::string& value) final;
-
- seastar::future<std::tuple<int, std::string>>
- read_meta(const std::string& key) final;
-
- uuid_d get_fsid() const final;
mount_ertr::future<> mount() final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
return shard_stores.invoke_on_all(
[](auto &local_store) {
return local_store.mount().handle_error(
}
seastar::future<> umount() final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
return shard_stores.invoke_on_all(
[](auto &local_store) {
return local_store.umount();
});
}
- seastar::future<std::vector<coll_core_t>> list_collections() final;
-
-// public interfaces called by each shard osd
-public:
- unsigned get_max_attr_name_length() const final;
-
- seastar::future<struct stat> stat(
- CollectionRef c,
- const ghobject_t& oid) final {
- return shard_stores.local().stat(
- c, oid);
- }
-
- read_errorator::future<ceph::bufferlist> read(
- CollectionRef c,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- uint32_t op_flags = 0) final {
- return shard_stores.local().read(
- c, oid, offset, len, op_flags);
- }
-
- read_errorator::future<ceph::bufferlist> readv(
- CollectionRef c,
- const ghobject_t& oid,
- interval_set<uint64_t>& m,
- uint32_t op_flags = 0) final {
- return shard_stores.local().readv(
- c, oid, m, op_flags);
- }
-
- get_attr_errorator::future<ceph::bufferlist> get_attr(
- CollectionRef c,
- const ghobject_t& oid,
- std::string_view name) const final {
- return shard_stores.local().get_attr(
- c, oid, name);
- }
-
- get_attrs_ertr::future<attrs_t> get_attrs(
- CollectionRef c,
- const ghobject_t& oid) final {
- return shard_stores.local().get_attrs(
- c, oid);
- }
-
- read_errorator::future<omap_values_t> omap_get_values(
- CollectionRef c,
- const ghobject_t& oid,
- const omap_keys_t& keys) final {
- return shard_stores.local().omap_get_values(
- c, oid, keys);
- }
-
- /// Retrieves paged set of values > start (if present)
- read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
- CollectionRef c, ///< [in] collection
- const ghobject_t &oid, ///< [in] oid
- const std::optional<std::string> &start ///< [in] start, empty for begin
- ) final {
- return shard_stores.local().omap_get_values(
- c, oid, start);
- }
+ mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
- seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
- list_objects(
- CollectionRef c,
- const ghobject_t& start,
- const ghobject_t& end,
- uint64_t limit) const final {
- return shard_stores.local().list_objects(
- c, start, end, limit);
- }
+ seastar::future<store_statfs_t> stat() const final;
- get_attr_errorator::future<ceph::bufferlist> omap_get_header(
- CollectionRef c,
- const ghobject_t& oid) final {
- return shard_stores.local().omap_get_header(
- c, oid);
- }
+ uuid_d get_fsid() const final;
- seastar::future<CollectionRef>
- create_new_collection(const coll_t& cid) final {
- return shard_stores.local().create_new_collection(cid);
- }
+ seastar::future<> write_meta(const std::string& key,
+ const std::string& value) final;
- seastar::future<CollectionRef>
- open_collection(const coll_t& cid) final {
- return shard_stores.local().open_collection(cid);
+  FuturizedStore::Shard& get_sharded_store() final {
+ return shard_stores.local();
}
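// shard_stores.local() yields the reactor-local Shard, so all I/O issued
// through the returned reference stays on the calling core.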
- seastar::future<> do_transaction_no_callbacks(
- CollectionRef ch,
- ceph::os::Transaction&& txn) final {
- return shard_stores.local().do_transaction_no_callbacks(
- ch, std::move(txn));
- }
+ seastar::future<std::tuple<int, std::string>>
+ read_meta(const std::string& key) final;
- read_errorator::future<std::map<uint64_t, uint64_t>>
- fiemap(CollectionRef c,
- const ghobject_t& oid,
- uint64_t off,
- uint64_t len) final {
- return shard_stores.local().fiemap(
- c, oid, off, len);
- }
+ seastar::future<std::vector<coll_core_t>> list_collections() final;
private:
+ seastar::sharded<CyanStore::Shard> shard_stores;
const std::string path;
uuid_d osd_fsid;
- seastar::sharded<ShardStores> shard_stores;
};
}
namespace crimson::os {
-seastar::future<std::unique_ptr<FuturizedStore>>
+std::unique_ptr<FuturizedStore>
FuturizedStore::create(const std::string& type,
const std::string& data,
const ConfigValues& values)
{
if (type == "cyanstore") {
using crimson::os::CyanStore;
- return seastar::make_ready_future<std::unique_ptr<FuturizedStore>>(
- std::make_unique<CyanStore>(data));
+ return std::make_unique<CyanStore>(data);
} else if (type == "seastore") {
return crimson::os::seastore::make_seastore(
- data, values
- ).then([] (auto seastore) {
- return seastar::make_ready_future<std::unique_ptr<FuturizedStore>>(
- std::make_unique<ShardedStoreProxy<seastore::SeaStore>>(
- seastore.release()));
- });
+ data);
} else {
using crimson::os::AlienStore;
#ifdef WITH_BLUESTORE
// use AlienStore as a fallback. It adapts e.g. BlueStore.
- return seastar::make_ready_future<std::unique_ptr<FuturizedStore>>(
- std::make_unique<AlienStore>(type, data, values));
+ return std::make_unique<AlienStore>(type, data, values);
#else
ceph_abort_msgf("unsupported objectstore type: %s", type.c_str());
return {};
namespace crimson::os {
class FuturizedCollection;
+constexpr core_id_t PRIMARY_CORE = 0;
+
class FuturizedStore {
+public:
+ class Shard {
+ public:
+ Shard() = default;
+ virtual ~Shard() = default;
+ // no copying
+ explicit Shard(const Shard& o) = delete;
+ const Shard& operator=(const Shard& o) = delete;
+
+ using CollectionRef = boost::intrusive_ptr<FuturizedCollection>;
+ using read_errorator = crimson::errorator<crimson::ct_error::enoent,
+ crimson::ct_error::input_output_error>;
+ virtual read_errorator::future<ceph::bufferlist> read(
+ CollectionRef c,
+ const ghobject_t& oid,
+ uint64_t offset,
+ size_t len,
+ uint32_t op_flags = 0) = 0;
+
+ virtual read_errorator::future<ceph::bufferlist> readv(
+ CollectionRef c,
+ const ghobject_t& oid,
+ interval_set<uint64_t>& m,
+ uint32_t op_flags = 0) = 0;
+
+ using get_attr_errorator = crimson::errorator<
+ crimson::ct_error::enoent,
+ crimson::ct_error::enodata>;
+ virtual get_attr_errorator::future<ceph::bufferlist> get_attr(
+ CollectionRef c,
+ const ghobject_t& oid,
+ std::string_view name) const = 0;
+
+ using get_attrs_ertr = crimson::errorator<
+ crimson::ct_error::enoent>;
+ using attrs_t = std::map<std::string, ceph::bufferlist, std::less<>>;
+ virtual get_attrs_ertr::future<attrs_t> get_attrs(
+ CollectionRef c,
+ const ghobject_t& oid) = 0;
+
+ virtual seastar::future<struct stat> stat(
+ CollectionRef c,
+ const ghobject_t& oid) = 0;
+
+ using omap_values_t = std::map<std::string, ceph::bufferlist, std::less<>>;
+ using omap_keys_t = std::set<std::string>;
+ virtual read_errorator::future<omap_values_t> omap_get_values(
+ CollectionRef c,
+ const ghobject_t& oid,
+ const omap_keys_t& keys) = 0;
+
+ virtual read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
+ CollectionRef c, ///< [in] collection
+ const ghobject_t &oid, ///< [in] oid
+ const std::optional<std::string> &start ///< [in] start, empty for begin
+ ) = 0; ///< @return <done, values> values.empty() only if done
+
+ virtual get_attr_errorator::future<bufferlist> omap_get_header(
+ CollectionRef c,
+ const ghobject_t& oid) = 0;
+
+ virtual seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
+ CollectionRef c,
+ const ghobject_t& start,
+ const ghobject_t& end,
+ uint64_t limit) const = 0;
+
+ virtual seastar::future<CollectionRef> create_new_collection(const coll_t& cid) = 0;
+
+ virtual seastar::future<CollectionRef> open_collection(const coll_t& cid) = 0;
+
+ protected:
+ virtual seastar::future<> do_transaction_no_callbacks(
+ CollectionRef ch,
+ ceph::os::Transaction&& txn) = 0;
+
+ public:
+ seastar::future<> do_transaction(
+ CollectionRef ch,
+ ceph::os::Transaction&& txn) {
+ std::unique_ptr<Context> on_commit(
+ ceph::os::Transaction::collect_all_contexts(txn));
+ return do_transaction_no_callbacks(
+ std::move(ch), std::move(txn)
+ ).then([on_commit=std::move(on_commit)]() mutable {
+ auto c = on_commit.release();
+ if (c) c->complete(0);
+ return seastar::now();
+ });
+ }
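+    // Sketch of a call site (`store` and `ch` are assumed names):
+    //   store.get_sharded_store().do_transaction(ch, std::move(txn));
+    // Callbacks gathered by collect_all_contexts() above fire only after the
+    // submitted transaction's future resolves.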
+
+ /**
+ * flush
+ *
+ * Flushes outstanding transactions on ch, returned future resolves
+ * after any previously submitted transactions on ch have committed.
+ *
+ * @param ch [in] collection on which to flush
+ */
+ virtual seastar::future<> flush(CollectionRef ch) {
+ return do_transaction(ch, ceph::os::Transaction{});
+ }
+
+ // error injection
+ virtual seastar::future<> inject_data_error(const ghobject_t& o) {
+ return seastar::now();
+ }
+
+ virtual seastar::future<> inject_mdata_error(const ghobject_t& o) {
+ return seastar::now();
+ }
+
+ virtual read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
+ CollectionRef ch,
+ const ghobject_t& oid,
+ uint64_t off,
+ uint64_t len) = 0;
+
+ virtual unsigned get_max_attr_name_length() const = 0;
+ };
public:
- static seastar::future<std::unique_ptr<FuturizedStore>> create(const std::string& type,
+ static std::unique_ptr<FuturizedStore> create(const std::string& type,
const std::string& data,
const ConfigValues& values);
- FuturizedStore() = default;
+ FuturizedStore()
+ : primary_core(seastar::this_shard_id())
+ {}
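+  // primary_core remembers the reactor that constructed the store; the
+  // store-wide entry points (mount, mkfs, stat, metadata) are expected to run
+  // there and assert seastar::this_shard_id() == primary_core.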
+
virtual ~FuturizedStore() = default;
// no copying
explicit FuturizedStore(const FuturizedStore& o) = delete;
const FuturizedStore& operator=(const FuturizedStore& o) = delete;
- virtual seastar::future<> start() {
- return seastar::now();
- }
+ virtual seastar::future<> start() = 0;
+
virtual seastar::future<> stop() = 0;
using mount_ertr = crimson::errorator<crimson::stateful_ec>;
virtual mount_ertr::future<> mount() = 0;
+
virtual seastar::future<> umount() = 0;
using mkfs_ertr = crimson::errorator<crimson::stateful_ec>;
virtual mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) = 0;
- virtual seastar::future<store_statfs_t> stat() const = 0;
- using CollectionRef = boost::intrusive_ptr<FuturizedCollection>;
- using read_errorator = crimson::errorator<crimson::ct_error::enoent,
- crimson::ct_error::input_output_error>;
- virtual read_errorator::future<ceph::bufferlist> read(
- CollectionRef c,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- uint32_t op_flags = 0) = 0;
- virtual read_errorator::future<ceph::bufferlist> readv(
- CollectionRef c,
- const ghobject_t& oid,
- interval_set<uint64_t>& m,
- uint32_t op_flags = 0) = 0;
-
- using get_attr_errorator = crimson::errorator<
- crimson::ct_error::enoent,
- crimson::ct_error::enodata>;
- virtual get_attr_errorator::future<ceph::bufferlist> get_attr(
- CollectionRef c,
- const ghobject_t& oid,
- std::string_view name) const = 0;
-
- using get_attrs_ertr = crimson::errorator<
- crimson::ct_error::enoent>;
- using attrs_t = std::map<std::string, ceph::bufferlist, std::less<>>;
- virtual get_attrs_ertr::future<attrs_t> get_attrs(
- CollectionRef c,
- const ghobject_t& oid) = 0;
- virtual seastar::future<struct stat> stat(
- CollectionRef c,
- const ghobject_t& oid) = 0;
-
- using omap_values_t = std::map<std::string, ceph::bufferlist, std::less<>>;
- using omap_keys_t = std::set<std::string>;
- virtual read_errorator::future<omap_values_t> omap_get_values(
- CollectionRef c,
- const ghobject_t& oid,
- const omap_keys_t& keys) = 0;
- virtual seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
- CollectionRef c,
- const ghobject_t& start,
- const ghobject_t& end,
- uint64_t limit) const = 0;
- virtual read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
- CollectionRef c, ///< [in] collection
- const ghobject_t &oid, ///< [in] oid
- const std::optional<std::string> &start ///< [in] start, empty for begin
- ) = 0; ///< @return <done, values> values.empty() only if done
-
- virtual get_attr_errorator::future<bufferlist> omap_get_header(
- CollectionRef c,
- const ghobject_t& oid) = 0;
-
- virtual seastar::future<CollectionRef> create_new_collection(const coll_t& cid) = 0;
- virtual seastar::future<CollectionRef> open_collection(const coll_t& cid) = 0;
- using coll_core_t = std::pair<coll_t, core_id_t>;
- virtual seastar::future<std::vector<coll_core_t>> list_collections() = 0;
-
-protected:
- virtual seastar::future<> do_transaction_no_callbacks(
- CollectionRef ch,
- ceph::os::Transaction&& txn) = 0;
+ virtual seastar::future<store_statfs_t> stat() const = 0;
-public:
- seastar::future<> do_transaction(
- CollectionRef ch,
- ceph::os::Transaction&& txn) {
- std::unique_ptr<Context> on_commit(
- ceph::os::Transaction::collect_all_contexts(txn));
- return do_transaction_no_callbacks(
- std::move(ch), std::move(txn)
- ).then([on_commit=std::move(on_commit)]() mutable {
- auto c = on_commit.release();
- if (c) c->complete(0);
- return seastar::now();
- });
- }
-
-
- /**
- * flush
- *
- * Flushes outstanding transactions on ch, returned future resolves
- * after any previously submitted transactions on ch have committed.
- *
- * @param ch [in] collection on which to flush
- */
- virtual seastar::future<> flush(CollectionRef ch) {
- return do_transaction(ch, ceph::os::Transaction{});
- }
-
- // error injection
- virtual seastar::future<> inject_data_error(const ghobject_t& o) {
- return seastar::now();
- }
- virtual seastar::future<> inject_mdata_error(const ghobject_t& o) {
- return seastar::now();
- }
-
- virtual read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
- CollectionRef ch,
- const ghobject_t& oid,
- uint64_t off,
- uint64_t len) = 0;
+ virtual uuid_d get_fsid() const = 0;
virtual seastar::future<> write_meta(const std::string& key,
const std::string& value) = 0;
+  // called on each shard to obtain its FuturizedStore::Shard
+ virtual Shard& get_sharded_store() = 0;
+
virtual seastar::future<std::tuple<int, std::string>> read_meta(
const std::string& key) = 0;
- virtual uuid_d get_fsid() const = 0;
- virtual unsigned get_max_attr_name_length() const = 0;
-};
-
-/**
- * ShardedStoreProxy
- *
- * Simple helper to proxy FuturizedStore operations to the core on which
- * the store was initialized for implementations without support for multiple
- * reactors.
- */
-template <typename T>
-class ShardedStoreProxy : public FuturizedStore {
- const core_id_t core;
- std::unique_ptr<T> impl;
- uuid_d fsid;
- unsigned max_attr = 0;
-
- template <typename Method, typename... Args>
- decltype(auto) proxy(Method method, Args&&... args) const {
- return proxy_method_on_core(
- core, *impl, method, std::forward<Args>(args)...);
- }
-
- template <typename Method, typename... Args>
- decltype(auto) proxy(Method method, Args&&... args) {
- return proxy_method_on_core(
- core, *impl, method, std::forward<Args>(args)...);
- }
-public:
- ShardedStoreProxy(T *t)
- : core(seastar::this_shard_id()),
- impl(t) {}
- template <typename... Args>
- ShardedStoreProxy(Args&&... args)
- : core(seastar::this_shard_id()),
- impl(std::make_unique<T>(std::forward<Args>(args)...)) {}
- ~ShardedStoreProxy() = default;
+ using coll_core_t = std::pair<coll_t, core_id_t>;
+ virtual seastar::future<std::vector<coll_core_t>> list_collections() = 0;
- // no copying
- explicit ShardedStoreProxy(const ShardedStoreProxy &o) = delete;
- const ShardedStoreProxy &operator=(const ShardedStoreProxy &o) = delete;
-
- seastar::future<> start() final { return proxy(&T::start); }
- seastar::future<> stop() final { return proxy(&T::stop); }
- mount_ertr::future<> mount() final {
- auto ret = seastar::smp::submit_to(
- core,
- [this] {
- auto ret = impl->mount(
- ).safe_then([this] {
- fsid = impl->get_fsid();
- max_attr = impl->get_max_attr_name_length();
- return seastar::now();
- });
- return std::move(ret).to_base();
- });
- return mount_ertr::future<>(std::move(ret));
- }
- seastar::future<> umount() final { return proxy(&T::umount); }
- mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final {
- return proxy(&T::mkfs, new_osd_fsid);
- }
- seastar::future<store_statfs_t> stat() const final {
- return crimson::submit_to(core, [this] { return impl->stat(); });
- }
- read_errorator::future<ceph::bufferlist> read(
- CollectionRef c,
- const ghobject_t &oid,
- uint64_t offset,
- size_t len,
- uint32_t op_flags = 0) final {
- return proxy(&T::read, std::move(c), oid, offset, len, op_flags);
- }
- read_errorator::future<ceph::bufferlist> readv(
- CollectionRef c,
- const ghobject_t &oid,
- interval_set<uint64_t> &m,
- uint32_t op_flags = 0) final {
- return crimson::submit_to(core, [this, c, oid, m, op_flags]() mutable {
- return impl->readv(c, oid, m, op_flags);
- });
- }
- get_attr_errorator::future<ceph::bufferlist> get_attr(
- CollectionRef c,
- const ghobject_t &oid,
- std::string_view name) const final {
- return proxy(&T::get_attr, std::move(c), oid, std::string(name));
- }
- get_attrs_ertr::future<attrs_t> get_attrs(
- CollectionRef c,
- const ghobject_t &oid) final {
- return proxy(&T::get_attrs, std::move(c), oid);
- }
- seastar::future<struct stat> stat(
- CollectionRef c,
- const ghobject_t &oid) final {
- return crimson::submit_to(
- core,
- [this, c, oid] {
- return impl->stat(c, oid);
- });
- }
- read_errorator::future<omap_values_t> omap_get_values(
- CollectionRef c,
- const ghobject_t &oid,
- const omap_keys_t &keys) final {
- return crimson::submit_to(core, [this, c, oid, keys] {
- return impl->omap_get_values(c, oid, keys);
- });
- }
- seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
- list_objects(
- CollectionRef c,
- const ghobject_t &start,
- const ghobject_t &end,
- uint64_t limit) const final {
- return proxy(&T::list_objects, std::move(c), start, end, limit);
- }
- read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
- CollectionRef c,
- const ghobject_t &oid,
- const std::optional<std::string> &start) final {
- return crimson::submit_to(core, [this, c, oid, start] {
- return impl->omap_get_values(c, oid, start);
- });
- }
- get_attr_errorator::future<bufferlist> omap_get_header(
- CollectionRef c,
- const ghobject_t &oid) final {
- return proxy(&T::omap_get_header, std::move(c), oid);
- }
- seastar::future<CollectionRef> create_new_collection(const coll_t &cid) final {
- return proxy(&T::create_new_collection, cid);
- }
- seastar::future<CollectionRef> open_collection(const coll_t &cid) final {
- return proxy(&T::open_collection, cid);
- }
- seastar::future<std::vector<coll_core_t>> list_collections() final {
- return proxy(&T::list_collections);
- }
- seastar::future<> do_transaction_no_callbacks(
- CollectionRef ch,
- ceph::os::Transaction &&txn) final {
- return proxy(&T::do_transaction_no_callbacks, std::move(ch), std::move(txn));
- }
- seastar::future<> flush(CollectionRef ch) final {
- return proxy(&T::flush, std::move(ch));
- }
- seastar::future<> inject_data_error(const ghobject_t &o) final {
- return proxy(&T::inject_data_error, o);
- }
- seastar::future<> inject_mdata_error(const ghobject_t &o) final {
- return proxy(&T::inject_mdata_error, o);
- }
-
- read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
- CollectionRef ch,
- const ghobject_t &oid,
- uint64_t off,
- uint64_t len) final {
- return proxy(&T::fiemap, std::move(ch), oid, off, len);
- }
-
- seastar::future<> write_meta(
- const std::string &key,
- const std::string &value) final {
- return proxy(&T::write_meta, key, value);
- }
- seastar::future<std::tuple<int, std::string>> read_meta(
- const std::string &key) final {
- return proxy(&T::read_meta, key);
- }
- uuid_d get_fsid() const final {
- return fsid;
- }
- unsigned get_max_attr_name_length() const final {
- return max_attr;
- }
+protected:
+ const core_id_t primary_core;
};
-
}
using std::string;
using crimson::common::local_conf;
-template <> struct fmt::formatter<crimson::os::seastore::SeaStore::op_type_t>
+template <> struct fmt::formatter<crimson::os::seastore::op_type_t>
: fmt::formatter<std::string_view> {
- using op_type_t = crimson::os::seastore::SeaStore::op_type_t;
+ using op_type_t = crimson::os::seastore::op_type_t;
// parse is inherited from formatter<string_view>.
template <typename FormatContext>
auto format(op_type_t op, FormatContext& ctx) {
using crimson::common::get_conf;
-SeaStore::SeaStore(
- const std::string& root,
- MDStoreRef mdstore,
- DeviceRef dev,
+SeaStore::Shard::Shard(
+ std::string root,
+ Device* dev,
bool is_test)
- : root(root),
- mdstore(std::move(mdstore)),
- device(std::move(dev)),
- max_object_size(
- get_conf<uint64_t>("seastore_default_max_object_size")),
- is_test(is_test),
- throttler(
+  : root(root),
+    max_object_size(
+      get_conf<uint64_t>("seastore_default_max_object_size")),
+    is_test(is_test),
+    throttler(
get_conf<uint64_t>("seastore_max_concurrent_transactions"))
{
+ device.reset(dev);
register_metrics();
}
+SeaStore::SeaStore(
+ const std::string& root,
+ MDStoreRef mdstore)
+ : root(root),
+ mdstore(std::move(mdstore))
+{
+}
+
SeaStore::~SeaStore() = default;
-void SeaStore::register_metrics()
+void SeaStore::Shard::register_metrics()
{
namespace sm = seastar::metrics;
- using op_type_t = SeaStore::op_type_t;
+ using op_type_t = crimson::os::seastore::op_type_t;
std::pair<op_type_t, sm::label_instance> labels_by_op_type[] = {
{op_type_t::TRANSACTION, sm::label_instance("latency", "TRANSACTION")},
{op_type_t::READ, sm::label_instance("latency", "READ")},
);
}
+seastar::future<> SeaStore::start()
+{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+#ifndef NDEBUG
+ bool is_test = true;
+#else
+ bool is_test = false;
+#endif
+ return shard_stores.start(root, nullptr, is_test)
+ .then([this] {
+ return shard_stores.invoke_on_all([](auto& local_store) {
+ return local_store.make_shard_stores();
+ });
+ });
+}
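+// Startup is two-phase: shard_stores.start() constructs a Shard per reactor
+// with a null Device, and make_shard_stores() then opens the per-shard device.
+// Device::make_device() returns a future, so it cannot run inside the Shard
+// constructor.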
+
+seastar::future<> SeaStore::test_start(DeviceRef device)
+{
+ if (device) {
+ ceph_assert(root == "");
+ return shard_stores.start_single(root, device.release(), true);
+ } else {
+    ceph_assert(0 == "impossible: no device");
+ }
+}
+
seastar::future<> SeaStore::stop()
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.stop();
+}
+
+seastar::future<> SeaStore::Shard::make_shard_stores()
+{
+ if (root != "") {
+ using crimson::common::get_conf;
+ std::string type = get_conf<std::string>("seastore_main_device_type");
+ device_type_t d_type = string_to_device_type(type);
+ assert(d_type == device_type_t::SSD ||
+ d_type == device_type_t::RANDOM_BLOCK_SSD);
+
+ return Device::make_device(
+ root, d_type
+ ).then([this](DeviceRef device_obj) {
+ device = std::move(device_obj);
+ });
+ }
return seastar::now();
}
SeaStore::mount_ertr::future<> SeaStore::test_mount()
{
- init_managers();
- return transaction_manager->mount(
- ).handle_error(
- crimson::ct_error::assert_all{
- "Invalid error in SeaStore::test_mount"
- }
- );
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ shard_stores.local().init_managers();
+ return shard_stores.local().get_transaction_manager()->mount(
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in SeaStore::test_mount"
+ }
+ );
}
-SeaStore::mount_ertr::future<> SeaStore::mount()
+SeaStore::mount_ertr::future<> SeaStore::Shard::mount()
{
return device->mount(
).safe_then([this] {
device_id_t id = device_entry.first;
magic_t magic = device_entry.second.magic;
device_type_t dtype = device_entry.second.dtype;
- std::ostringstream oss;
- oss << root << "/block." << dtype << "." << std::to_string(id);
- return Device::make_device(oss.str(), dtype
+ std::string path =
+ fmt::format("{}/block.{}.{}", root, dtype, std::to_string(id));
+ return Device::make_device(path, dtype
).then([this, magic](DeviceRef sec_dev) {
return sec_dev->mount(
).safe_then([this, sec_dev=std::move(sec_dev), magic]() mutable {
return transaction_manager->mount();
}).handle_error(
crimson::ct_error::assert_all{
- "Invalid error in SeaStore::mount"
+ "Invalid error in Shard::mount"
}
);
}
-seastar::future<> SeaStore::umount()
+seastar::future<> SeaStore::Shard::umount()
{
return [this] {
if (transaction_manager) {
seastar::future<> SeaStore::write_fsid(uuid_d new_osd_fsid)
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
LOG_PREFIX(SeaStore::write_fsid);
return read_meta("fsid").then([this, FNAME, new_osd_fsid] (auto tuple) {
auto [ret, fsid] = tuple;
});
}
-seastar::future<> SeaStore::_mkfs(uuid_d new_osd_fsid)
+seastar::future<>
+SeaStore::Shard::mkfs_managers()
{
init_managers();
return transaction_manager->mkfs(
});
});
});
- }).safe_then([this, new_osd_fsid] {
- return write_fsid(new_osd_fsid);
- }).safe_then([this] {
- return read_meta("type").then([this] (auto tuple) {
- auto [ret, type] = tuple;
- if (ret == 0 && type == "seastore") {
- return seastar::now();
- } else if (ret == 0 && type != "seastore") {
- LOG_PREFIX(SeaStore::mkfs);
- ERROR("expected seastore, but type is {}", type);
- throw std::runtime_error("store type error");
- } else {
- return write_meta("type", "seastore");
- }
+ }).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in Shard::mkfs_managers"
+ }
+ );
+}
+
+seastar::future<>
+SeaStore::Shard::mkfs(
+ secondary_device_set_t &sds,
+ uuid_d new_osd_fsid)
+{
+ device_type_t d_type = device->get_device_type();
+ device_id_t id = (d_type == device_type_t::RANDOM_BLOCK_SSD) ?
+ static_cast<device_id_t>(DEVICE_ID_RANDOM_BLOCK_MIN) : 0;
+
+ return device->mkfs(
+ device_config_t{
+ true,
+ device_spec_t{
+ (magic_t)std::rand(),
+ d_type,
+ id},
+ seastore_meta_t{new_osd_fsid},
+ sds}
+ ).safe_then([this] {
+ return crimson::do_for_each(secondaries, [](auto& sec_dev) {
+ return sec_dev->mount();
});
}).safe_then([this] {
- return write_meta("mkfs_done", "yes");
+ return device->mount();
+ }).safe_then([this] {
+ return mkfs_managers();
}).handle_error(
crimson::ct_error::assert_all{
- "Invalid error in SeaStore::_mkfs"
+ "Invalid error in SeaStore::Shard::mkfs"
}
);
}
-SeaStore::mkfs_ertr::future<> SeaStore::test_mkfs(uuid_d new_osd_fsid)
+seastar::future<> SeaStore::Shard::sec_mkfs(
+ const std::string path,
+ device_type_t dtype,
+ device_id_t id,
+ secondary_device_set_t &sds,
+ uuid_d new_osd_fsid)
+{
+ return Device::make_device(path, dtype
+ ).then([this, &sds, id, dtype, new_osd_fsid](DeviceRef sec_dev) {
+ magic_t magic = (magic_t)std::rand();
+ sds.emplace(
+ (device_id_t)id,
+ device_spec_t{magic, dtype, (device_id_t)id});
+ return sec_dev->mkfs(
+ device_config_t{
+ false,
+ device_spec_t{
+ magic,
+ dtype,
+ (device_id_t)id},
+ seastore_meta_t{new_osd_fsid},
+ secondary_device_set_t()}
+ ).safe_then([this, sec_dev=std::move(sec_dev), id]() mutable {
+ LOG_PREFIX(SeaStore::Shard::sec_mkfs);
+ DEBUG("mkfs: finished for device {}", id);
+ secondaries.emplace_back(std::move(sec_dev));
+ }).handle_error(crimson::ct_error::assert_all{"not possible"});
+ });
+}
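+// sec_mkfs() runs on every shard via invoke_on_all() (see mkfs() below), so
+// each shard records the secondary device in its own secondary_device_set_t.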
+
+seastar::future<> SeaStore::_mkfs(uuid_d new_osd_fsid)
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.local().mkfs_managers(
+ ).then([this, new_osd_fsid] {
+ return prepare_meta(new_osd_fsid);
+ });
+}
+SeaStore::mkfs_ertr::future<> SeaStore::test_mkfs(uuid_d new_osd_fsid)
+{
+ ceph_assert(seastar::this_shard_id() == primary_core);
return read_meta("mkfs_done").then([this, new_osd_fsid] (auto tuple) {
auto [done, value] = tuple;
if (done == 0) {
});
}
+seastar::future<> SeaStore::prepare_meta(uuid_d new_osd_fsid)
+{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return write_fsid(new_osd_fsid).then([this] {
+ return read_meta("type").then([this] (auto tuple) {
+ auto [ret, type] = tuple;
+ if (ret == 0 && type == "seastore") {
+ return seastar::now();
+ } else if (ret == 0 && type != "seastore") {
+ LOG_PREFIX(SeaStore::prepare_meta);
+ ERROR("expected seastore, but type is {}", type);
+ throw std::runtime_error("store type error");
+ } else {
+ return write_meta("type", "seastore");
+ }
+ });
+ }).then([this] {
+ return write_meta("mkfs_done", "yes");
+ });
+}
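+// prepare_meta() centralizes the fsid/type/mkfs_done bookkeeping that the old
+// SeaStore::_mkfs() performed inline, so mkfs() and _mkfs() can share it.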
+
SeaStore::mkfs_ertr::future<> SeaStore::mkfs(uuid_d new_osd_fsid)
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
return read_meta("mkfs_done").then([this, new_osd_fsid] (auto tuple) {
auto [done, value] = tuple;
if (done == 0) {
return seastar::now();
} else {
return seastar::do_with(
- secondary_device_set_t(),
+ std::vector<secondary_device_set_t>(),
[this, new_osd_fsid](auto& sds) {
+ sds.resize(seastar::smp::count);
auto fut = seastar::now();
LOG_PREFIX(SeaStore::mkfs);
DEBUG("root: {}", root);
return seastar::now();
}
auto id = std::stoi(entry_name.substr(dtype_end + 1));
- std::ostringstream oss;
- oss << root << "/" << entry_name;
- return Device::make_device(oss.str(), dtype
- ).then([this, &sds, id, dtype, new_osd_fsid](DeviceRef sec_dev) {
- magic_t magic = (magic_t)std::rand();
- sds.emplace(
- (device_id_t)id,
- device_spec_t{magic, dtype, (device_id_t)id});
- return sec_dev->mkfs(device_config_t{
- false,
- device_spec_t{
- magic,
- dtype,
- (device_id_t)id},
- seastore_meta_t{new_osd_fsid},
- secondary_device_set_t()}
- ).safe_then([this, sec_dev=std::move(sec_dev), id]() mutable {
- LOG_PREFIX(SeaStore::mkfs);
- DEBUG("mkfs: finished for device {}", id);
- secondaries.emplace_back(std::move(sec_dev));
- }).handle_error(crimson::ct_error::assert_all{"not possible"});
+ std::string path = fmt::format("{}/{}", root, entry_name);
+ return shard_stores.invoke_on_all(
+ [&sds, id, path, dtype, new_osd_fsid]
+ (auto &local_store) {
+ return local_store.sec_mkfs(
+ path,
+ dtype,
+ id,
+ sds[seastar::this_shard_id()],
+ new_osd_fsid);
});
}
return seastar::now();
});
}
return fut.then([this, &sds, new_osd_fsid] {
- device_id_t id = 0;
- device_type_t d_type = device->get_device_type();
- assert(d_type == device_type_t::SSD ||
- d_type == device_type_t::RANDOM_BLOCK_SSD);
- if (d_type == device_type_t::RANDOM_BLOCK_SSD) {
- id = static_cast<device_id_t>(DEVICE_ID_RANDOM_BLOCK_MIN);
- }
-
- return device->mkfs(
- device_config_t{
- true,
- device_spec_t{
- (magic_t)std::rand(),
- d_type,
- id},
- seastore_meta_t{new_osd_fsid},
- sds}
- );
- }).safe_then([this] {
- return crimson::do_for_each(secondaries, [](auto& sec_dev) {
- return sec_dev->mount();
+ return shard_stores.invoke_on_all(
+ [&sds, new_osd_fsid](auto &local_store) {
+ return local_store.mkfs(
+ sds[seastar::this_shard_id()], new_osd_fsid);
});
});
- }).safe_then([this] {
- return device->mount();
- }).safe_then([this, new_osd_fsid] {
- return _mkfs(new_osd_fsid);
- }).safe_then([this] {
+ }).then([this, new_osd_fsid] {
+ return prepare_meta(new_osd_fsid);
+ }).then([this] {
return umount();
- }).handle_error(
- crimson::ct_error::assert_all{
- "Invalid error in SeaStore::mkfs"
- }
- );
+ });
+ }
+ });
+}
+
+using coll_core_t = FuturizedStore::coll_core_t;
+seastar::future<std::vector<coll_core_t>>
+SeaStore::list_collections()
+{
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.map([](auto &local_store) {
+ return local_store.list_collections();
+ }).then([](std::vector<std::vector<coll_core_t>> results) {
+ std::vector<coll_core_t> collections;
+ for (auto& colls : results) {
+ collections.insert(collections.end(), colls.begin(), colls.end());
}
+ return seastar::make_ready_future<std::vector<coll_core_t>>(
+ std::move(collections));
});
}
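// map() collects each shard's vector; the concatenation pairs every coll_t
// with the shard that owns it, replacing the old NULL_CORE placeholder.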
+store_statfs_t SeaStore::Shard::stat() const
+{
+ return transaction_manager->store_stat();
+}
+
seastar::future<store_statfs_t> SeaStore::stat() const
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
LOG_PREFIX(SeaStore::stat);
DEBUG("");
- return seastar::make_ready_future<store_statfs_t>(
- transaction_manager->store_stat()
- );
+ return shard_stores.map_reduce0(
+ [](const SeaStore::Shard &local_store) {
+ return local_store.stat();
+ },
+ store_statfs_t(),
+ [](auto &&ss, auto &&ret) {
+ ss.add(ret);
+ return std::move(ss);
+ }
+ ).then([](store_statfs_t ss) {
+ return seastar::make_ready_future<store_statfs_t>(std::move(ss));
+ });
}
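// map_reduce0 folds the per-shard statfs results into a single store_statfs_t
// via store_statfs_t::add(), mirroring CyanStore::stat() above.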
TransactionManager::read_extent_iertr::future<std::optional<unsigned>>
-SeaStore::get_coll_bits(CollectionRef ch, Transaction &t) const
+SeaStore::Shard::get_coll_bits(CollectionRef ch, Transaction &t) const
{
return transaction_manager->read_collection_root(t)
.si_then([this, ch, &t](auto coll_root) {
}
seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
-SeaStore::list_objects(CollectionRef ch,
+SeaStore::Shard::list_objects(CollectionRef ch,
const ghobject_t& start,
const ghobject_t& end,
uint64_t limit) const
std::vector<ghobject_t>(),
ghobject_t::get_max()));
} else {
- auto filter = get_objs_range(ch, *bits);
+ auto filter = SeaStore::get_objs_range(ch, *bits);
using list_iertr = OnodeManager::list_onodes_iertr;
using repeat_ret = list_iertr::future<seastar::stop_iteration>;
return trans_intr::repeat(
});
}
-seastar::future<CollectionRef> SeaStore::create_new_collection(const coll_t& cid)
+seastar::future<CollectionRef>
+SeaStore::Shard::create_new_collection(const coll_t& cid)
{
LOG_PREFIX(SeaStore::create_new_collection);
DEBUG("{}", cid);
return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
}
-seastar::future<CollectionRef> SeaStore::open_collection(const coll_t& cid)
+seastar::future<CollectionRef>
+SeaStore::Shard::open_collection(const coll_t& cid)
{
LOG_PREFIX(SeaStore::open_collection);
DEBUG("{}", cid);
return list_collections().then([cid, this] (auto colls_cores) {
- if (auto found = std::find(colls_cores.begin(), colls_cores.end(),
- std::make_pair(cid, NULL_CORE));
+ if (auto found = std::find(colls_cores.begin(),
+ colls_cores.end(),
+ std::make_pair(cid, seastar::this_shard_id()));
found != colls_cores.end()) {
return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
} else {
});
}
-seastar::future<std::vector<coll_core_t>> SeaStore::list_collections()
+seastar::future<std::vector<coll_core_t>>
+SeaStore::Shard::list_collections()
{
return seastar::do_with(
std::vector<coll_core_t>(),
ret.resize(colls.size());
std::transform(
colls.begin(), colls.end(), ret.begin(),
- [](auto p) { return std::make_pair(p.first, NULL_CORE); });
+ [](auto p) {
+ return std::make_pair(p.first, seastar::this_shard_id());
+ });
});
});
}).safe_then([&ret] {
);
}
-SeaStore::read_errorator::future<ceph::bufferlist> SeaStore::read(
+SeaStore::Shard::read_errorator::future<ceph::bufferlist>
+SeaStore::Shard::read(
CollectionRef ch,
const ghobject_t& oid,
uint64_t offset,
});
}
-SeaStore::read_errorator::future<ceph::bufferlist> SeaStore::readv(
+SeaStore::Shard::read_errorator::future<ceph::bufferlist>
+SeaStore::Shard::readv(
CollectionRef ch,
const ghobject_t& _oid,
interval_set<uint64_t>& m,
using crimson::os::seastore::omap_manager::BtreeOMapManager;
-SeaStore::get_attr_errorator::future<ceph::bufferlist> SeaStore::get_attr(
+SeaStore::Shard::get_attr_errorator::future<ceph::bufferlist>
+SeaStore::Shard::get_attr(
CollectionRef ch,
const ghobject_t& oid,
std::string_view name) const
}), crimson::ct_error::pass_further_all{});
}
-SeaStore::get_attrs_ertr::future<SeaStore::attrs_t> SeaStore::get_attrs(
+SeaStore::Shard::get_attrs_ertr::future<SeaStore::Shard::attrs_t>
+SeaStore::Shard::get_attrs(
CollectionRef ch,
const ghobject_t& oid)
{
}), crimson::ct_error::pass_further_all{});
}
-seastar::future<struct stat> SeaStore::stat(
+seastar::future<struct stat> SeaStore::Shard::stat(
CollectionRef c,
const ghobject_t& oid)
{
);
}
-SeaStore::get_attr_errorator::future<ceph::bufferlist>
-SeaStore::omap_get_header(
+SeaStore::Shard::get_attr_errorator::future<ceph::bufferlist>
+SeaStore::Shard::omap_get_header(
CollectionRef ch,
const ghobject_t& oid)
{
return get_attr(ch, oid, OMAP_HEADER_XATTR_KEY);
}
-SeaStore::read_errorator::future<SeaStore::omap_values_t>
-SeaStore::omap_get_values(
+SeaStore::Shard::read_errorator::future<SeaStore::Shard::omap_values_t>
+SeaStore::Shard::omap_get_values(
CollectionRef ch,
const ghobject_t &oid,
const omap_keys_t &keys)
});
}
-SeaStore::_omap_get_value_ret SeaStore::_omap_get_value(
+SeaStore::Shard::_omap_get_value_ret
+SeaStore::Shard::_omap_get_value(
Transaction &t,
omap_root_t &&root,
std::string_view key) const
);
}
-SeaStore::_omap_get_values_ret SeaStore::_omap_get_values(
+SeaStore::Shard::_omap_get_values_ret
+SeaStore::Shard::_omap_get_values(
Transaction &t,
omap_root_t &&omap_root,
const omap_keys_t &keys) const
);
}
-SeaStore::omap_list_ret SeaStore::omap_list(
+SeaStore::Shard::omap_list_ret
+SeaStore::Shard::omap_list(
Onode &onode,
const omap_root_le_t& omap_root,
Transaction& t,
});
}
-SeaStore::omap_get_values_ret_t SeaStore::omap_get_values(
+SeaStore::Shard::omap_get_values_ret_t
+SeaStore::Shard::omap_get_values(
CollectionRef ch,
const ghobject_t &oid,
const std::optional<string> &start)
auto c = static_cast<SeastoreCollection*>(ch.get());
LOG_PREFIX(SeaStore::omap_get_values);
DEBUG("{} {}", c->get_cid(), oid);
- using ret_bare_t = std::tuple<bool, SeaStore::omap_values_t>;
+ using ret_bare_t = std::tuple<bool, SeaStore::Shard::omap_values_t>;
return repeat_with_onode<ret_bare_t>(
c,
oid,
});
}
-SeaStore::_fiemap_ret SeaStore::_fiemap(
+SeaStore::Shard::_fiemap_ret SeaStore::Shard::_fiemap(
Transaction &t,
Onode &onode,
uint64_t off,
});
}
-SeaStore::read_errorator::future<std::map<uint64_t, uint64_t>> SeaStore::fiemap(
+SeaStore::Shard::read_errorator::future<std::map<uint64_t, uint64_t>>
+SeaStore::Shard::fiemap(
CollectionRef ch,
const ghobject_t& oid,
uint64_t off,
});
}
-void SeaStore::on_error(ceph::os::Transaction &t) {
+void SeaStore::Shard::on_error(ceph::os::Transaction &t) {
LOG_PREFIX(SeaStore::on_error);
ERROR(" transaction dump:\n");
JSONFormatter f(true);
abort();
}
-seastar::future<> SeaStore::do_transaction_no_callbacks(
+seastar::future<> SeaStore::Shard::do_transaction_no_callbacks(
CollectionRef _ch,
ceph::os::Transaction&& _t)
{
}
-seastar::future<> SeaStore::flush(CollectionRef ch)
+seastar::future<> SeaStore::Shard::flush(CollectionRef ch)
{
return seastar::do_with(
get_dummy_ordering_handle(),
});
}
-SeaStore::tm_ret SeaStore::_do_transaction_step(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_do_transaction_step(
internal_context_t &ctx,
CollectionRef &col,
std::vector<OnodeRef> &onodes,
);
}
-SeaStore::tm_ret SeaStore::_remove(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_remove(
internal_context_t &ctx,
OnodeRef &onode)
{
);
}
-SeaStore::tm_ret SeaStore::_touch(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_touch(
internal_context_t &ctx,
OnodeRef &onode)
{
return tm_iertr::now();
}
-SeaStore::tm_ret SeaStore::_write(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_write(
internal_context_t &ctx,
OnodeRef &onode,
uint64_t offset, size_t len,
});
}
-SeaStore::tm_ret SeaStore::_zero(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_zero(
internal_context_t &ctx,
OnodeRef &onode,
objaddr_t offset,
});
}
-SeaStore::omap_set_kvs_ret
-SeaStore::_omap_set_kvs(
+SeaStore::Shard::omap_set_kvs_ret
+SeaStore::Shard::_omap_set_kvs(
OnodeRef &onode,
const omap_root_le_t& omap_root,
Transaction& t,
);
}
-SeaStore::tm_ret SeaStore::_omap_set_values(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_omap_set_values(
internal_context_t &ctx,
OnodeRef &onode,
std::map<std::string, ceph::bufferlist> &&aset)
std::move(aset));
}
-SeaStore::tm_ret SeaStore::_omap_set_header(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_omap_set_header(
internal_context_t &ctx,
OnodeRef &onode,
ceph::bufferlist &&header)
return _setattrs(ctx, onode,std::move(to_set));
}
-SeaStore::tm_ret SeaStore::_omap_clear(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_omap_clear(
internal_context_t &ctx,
OnodeRef &onode)
{
});
}
-SeaStore::tm_ret SeaStore::_omap_rmkeys(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_omap_rmkeys(
internal_context_t &ctx,
OnodeRef &onode,
omap_keys_t &&keys)
}
}
-SeaStore::tm_ret SeaStore::_omap_rmkeyrange(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_omap_rmkeyrange(
internal_context_t &ctx,
OnodeRef &onode,
std::string first,
}
}
-SeaStore::tm_ret SeaStore::_truncate(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_truncate(
internal_context_t &ctx,
OnodeRef &onode,
uint64_t size)
});
}
-SeaStore::tm_ret SeaStore::_setattrs(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_setattrs(
internal_context_t &ctx,
OnodeRef &onode,
std::map<std::string, bufferlist>&& aset)
});
}
-SeaStore::tm_ret SeaStore::_rmattr(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_rmattr(
internal_context_t &ctx,
OnodeRef &onode,
std::string name)
}
}
-SeaStore::tm_ret SeaStore::_xattr_rmattr(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_xattr_rmattr(
internal_context_t &ctx,
OnodeRef &onode,
std::string &&name)
}
}
-SeaStore::tm_ret SeaStore::_rmattrs(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_rmattrs(
internal_context_t &ctx,
OnodeRef &onode)
{
return _xattr_clear(ctx, onode);
}
-SeaStore::tm_ret SeaStore::_xattr_clear(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_xattr_clear(
internal_context_t &ctx,
OnodeRef &onode)
{
}
}
-SeaStore::tm_ret SeaStore::_create_collection(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_create_collection(
internal_context_t &ctx,
const coll_t& cid, int bits)
{
);
}
-SeaStore::tm_ret SeaStore::_remove_collection(
+SeaStore::Shard::tm_ret
+SeaStore::Shard::_remove_collection(
internal_context_t &ctx,
const coll_t& cid)
{
);
}
-boost::intrusive_ptr<SeastoreCollection> SeaStore::_get_collection(const coll_t& cid)
+boost::intrusive_ptr<SeastoreCollection>
+SeaStore::Shard::_get_collection(const coll_t& cid)
{
return new SeastoreCollection{cid};
}
-seastar::future<> SeaStore::write_meta(const std::string& key,
- const std::string& value)
+seastar::future<> SeaStore::Shard::write_meta(
+ const std::string& key,
+ const std::string& value)
{
LOG_PREFIX(SeaStore::write_meta);
DEBUG("key: {}; value: {}", key, value);
return transaction_manager->submit_transaction(t);
});
});
- }).safe_then([this, &key, &value] {
- return mdstore->write_meta(key, value);
});
}).handle_error(
crimson::ct_error::assert_all{"Invalid error in SeaStore::write_meta"}
);
}
-seastar::future<std::tuple<int, std::string>> SeaStore::read_meta(const std::string& key)
+seastar::future<std::tuple<int, std::string>>
+SeaStore::read_meta(const std::string& key)
{
+ ceph_assert(seastar::this_shard_id() == primary_core);
LOG_PREFIX(SeaStore::read_meta);
DEBUG("key: {}", key);
return mdstore->read_meta(key).safe_then([](auto v) {
);
}
-uuid_d SeaStore::get_fsid() const
+uuid_d SeaStore::Shard::get_fsid() const
{
return device->get_meta().seastore_id;
}
-void SeaStore::init_managers()
+void SeaStore::Shard::init_managers()
{
transaction_manager.reset();
collection_manager.reset();
*transaction_manager);
}
-seastar::future<std::unique_ptr<SeaStore>> make_seastore(
- const std::string &device,
- const ConfigValues &config)
+std::unique_ptr<SeaStore> make_seastore(
+ const std::string &device)
{
- using crimson::common::get_conf;
- std::string type = get_conf<std::string>("seastore_main_device_type");
- device_type_t d_type = string_to_device_type(type);
- assert(d_type == device_type_t::SSD ||
- d_type == device_type_t::RANDOM_BLOCK_SSD);
-
- return Device::make_device(
- device, d_type
- ).then([&device](DeviceRef device_obj) {
-#ifndef NDEBUG
- bool is_test = true;
-#else
- bool is_test = false;
-#endif
- auto mdstore = std::make_unique<FileMDStore>(device);
- return std::make_unique<SeaStore>(
- device,
- std::move(mdstore),
- std::move(device_obj),
- is_test);
- });
+ auto mdstore = std::make_unique<FileMDStore>(device);
+ return std::make_unique<SeaStore>(
+ device,
+ std::move(mdstore));
}
std::unique_ptr<SeaStore> make_test_seastore(
- DeviceRef device,
SeaStore::MDStoreRef mdstore)
{
return std::make_unique<SeaStore>(
"",
- std::move(mdstore),
- std::move(device),
- true);
+ std::move(mdstore));
}
}
using OnodeRef = boost::intrusive_ptr<Onode>;
class TransactionManager;
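+// op_type_t is hoisted out of SeaStore to namespace scope so that the nested
+// SeaStore::Shard can use it for its per-op latency metrics.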
+enum class op_type_t : uint8_t {
+ TRANSACTION = 0,
+ READ,
+ WRITE,
+ GET_ATTR,
+ GET_ATTRS,
+ STAT,
+ OMAP_GET_VALUES,
+ OMAP_LIST,
+ MAX
+};
+
class SeastoreCollection final : public FuturizedCollection {
public:
template <typename... T>
ghobject_t obj_end;
};
-using coll_core_t = FuturizedStore::coll_core_t;
class SeaStore final : public FuturizedStore {
public:
class MDStore {
};
using MDStoreRef = std::unique_ptr<MDStore>;
- SeaStore(
- const std::string& root,
- MDStoreRef mdstore,
- DeviceRef device,
- bool is_test);
- ~SeaStore();
-
- seastar::future<> stop() final;
- mount_ertr::future<> mount() final;
- seastar::future<> umount() final;
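+  // Shard is the per-reactor-core store instance: it owns this core's device
+  // view, transaction/collection/onode managers, throttler, and metrics.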
+ class Shard : public FuturizedStore::Shard {
+ public:
+ Shard(
+ std::string root,
+ Device* device,
+ bool is_test);
+ ~Shard() = default;
+
+ seastar::future<struct stat> stat(
+ CollectionRef c,
+ const ghobject_t& oid) final;
+
+ read_errorator::future<ceph::bufferlist> read(
+ CollectionRef c,
+ const ghobject_t& oid,
+ uint64_t offset,
+ size_t len,
+ uint32_t op_flags = 0) final;
+
+ read_errorator::future<ceph::bufferlist> readv(
+ CollectionRef c,
+ const ghobject_t& oid,
+ interval_set<uint64_t>& m,
+ uint32_t op_flags = 0) final;
+
+ get_attr_errorator::future<ceph::bufferlist> get_attr(
+ CollectionRef c,
+ const ghobject_t& oid,
+ std::string_view name) const final;
+
+ get_attrs_ertr::future<attrs_t> get_attrs(
+ CollectionRef c,
+ const ghobject_t& oid) final;
+
+ read_errorator::future<omap_values_t> omap_get_values(
+ CollectionRef c,
+ const ghobject_t& oid,
+ const omap_keys_t& keys) final;
+
+ /// Retrieves paged set of values > start (if present)
+ using omap_get_values_ret_bare_t = std::tuple<bool, omap_values_t>;
+ using omap_get_values_ret_t = read_errorator::future<
+ omap_get_values_ret_bare_t>;
+ omap_get_values_ret_t omap_get_values(
+ CollectionRef c, ///< [in] collection
+ const ghobject_t &oid, ///< [in] oid
+ const std::optional<std::string> &start ///< [in] start, empty for begin
+ ) final; ///< @return <done, values> values.empty() iff done
+
+ get_attr_errorator::future<bufferlist> omap_get_header(
+ CollectionRef c,
+ const ghobject_t& oid) final;
+
+ seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
+ CollectionRef c,
+ const ghobject_t& start,
+ const ghobject_t& end,
+ uint64_t limit) const final;
+
+ seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
+ seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
+
+ seastar::future<> do_transaction_no_callbacks(
+ CollectionRef ch,
+ ceph::os::Transaction&& txn) final;
- mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
- seastar::future<store_statfs_t> stat() const final;
+ /* Note, flush() machinery must go through the same pipeline
+ * stages and locks as do_transaction. */
+ seastar::future<> flush(CollectionRef ch) final;
- read_errorator::future<ceph::bufferlist> read(
- CollectionRef c,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- uint32_t op_flags = 0) final;
- read_errorator::future<ceph::bufferlist> readv(
- CollectionRef c,
- const ghobject_t& oid,
- interval_set<uint64_t>& m,
- uint32_t op_flags = 0) final;
- get_attr_errorator::future<ceph::bufferlist> get_attr(
- CollectionRef c,
- const ghobject_t& oid,
- std::string_view name) const final;
- get_attrs_ertr::future<attrs_t> get_attrs(
- CollectionRef c,
- const ghobject_t& oid) final;
-
- seastar::future<struct stat> stat(
- CollectionRef c,
- const ghobject_t& oid) final;
-
- read_errorator::future<omap_values_t> omap_get_values(
- CollectionRef c,
- const ghobject_t& oid,
- const omap_keys_t& keys) final;
-
- /// Retrieves paged set of values > start (if present)
- using omap_get_values_ret_bare_t = std::tuple<bool, omap_values_t>;
- using omap_get_values_ret_t = read_errorator::future<
- omap_get_values_ret_bare_t>;
- omap_get_values_ret_t omap_get_values(
- CollectionRef c, ///< [in] collection
- const ghobject_t &oid, ///< [in] oid
- const std::optional<std::string> &start ///< [in] start, empty for begin
- ) final; ///< @return <done, values> values.empty() iff done
-
- get_attr_errorator::future<bufferlist> omap_get_header(
- CollectionRef c,
- const ghobject_t& oid) final;
+ read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
+ CollectionRef ch,
+ const ghobject_t& oid,
+ uint64_t off,
+ uint64_t len) final;
- static col_obj_ranges_t
- get_objs_range(CollectionRef ch, unsigned bits);
+ unsigned get_max_attr_name_length() const final {
+ return 256;
+ }
- seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>> list_objects(
- CollectionRef c,
- const ghobject_t& start,
- const ghobject_t& end,
- uint64_t limit) const final;
+ // only exposed to SeaStore
+ public:
+ mount_ertr::future<> mount();
- seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
- seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
- seastar::future<std::vector<coll_core_t>> list_collections() final;
+ seastar::future<> umount();
- seastar::future<> do_transaction_no_callbacks(
- CollectionRef ch,
- ceph::os::Transaction&& txn) final;
+ seastar::future<> mkfs(
+ secondary_device_set_t &sds,
+ uuid_d new_osd_fsid);
- /* Note, flush() machinery must go through the same pipeline
- * stages and locks as do_transaction. */
- seastar::future<> flush(CollectionRef ch) final;
+ using coll_core_t = FuturizedStore::coll_core_t;
+ seastar::future<std::vector<coll_core_t>> list_collections();
- read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
- CollectionRef ch,
- const ghobject_t& oid,
- uint64_t off,
- uint64_t len) final;
+ seastar::future<> write_meta(const std::string& key,
+ const std::string& value);
- seastar::future<> write_meta(const std::string& key,
- const std::string& value) final;
- seastar::future<std::tuple<int, std::string>> read_meta(const std::string& key) final;
- uuid_d get_fsid() const final;
+ store_statfs_t stat() const;
- unsigned get_max_attr_name_length() const final {
- return 256;
- }
- enum class op_type_t : uint8_t {
- TRANSACTION = 0,
- READ,
- WRITE,
- GET_ATTR,
- GET_ATTRS,
- STAT,
- OMAP_GET_VALUES,
- OMAP_LIST,
- MAX
- };
+ uuid_d get_fsid() const;
+  // create the backing device for each shard store
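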
+ seastar::future<> make_shard_stores();
- // for test
- mount_ertr::future<> test_mount();
- mkfs_ertr::future<> test_mkfs(uuid_d new_osd_fsid);
- DeviceRef get_primary_device_ref() {
- return std::move(device);
- }
+ seastar::future<> mkfs_managers();
-private:
- struct internal_context_t {
- CollectionRef ch;
- ceph::os::Transaction ext_transaction;
+ void init_managers();
- internal_context_t(
- CollectionRef ch,
- ceph::os::Transaction &&_ext_transaction,
- TransactionRef &&transaction)
- : ch(ch), ext_transaction(std::move(_ext_transaction)),
- transaction(std::move(transaction)),
- iter(ext_transaction.begin()) {}
+ TransactionManagerRef& get_transaction_manager() {
+ return transaction_manager;
+ }
+  // mkfs for secondary devices
+ seastar::future<> sec_mkfs(
+ const std::string path,
+ device_type_t dtype,
+ device_id_t id,
+ secondary_device_set_t &sds,
+ uuid_d new_osd_fsid);
+
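+  // hands ownership of this shard's primary device back to the caller
+  // (consumed by SeaStore's test helpers)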
+ DeviceRef get_primary_device_ref() {
+ return std::move(device);
+ }
- TransactionRef transaction;
+ private:
+ struct internal_context_t {
+ CollectionRef ch;
+ ceph::os::Transaction ext_transaction;
- ceph::os::Transaction::iterator iter;
- std::chrono::steady_clock::time_point begin_timestamp = std::chrono::steady_clock::now();
+ internal_context_t(
+ CollectionRef ch,
+ ceph::os::Transaction &&_ext_transaction,
+ TransactionRef &&transaction)
+ : ch(ch), ext_transaction(std::move(_ext_transaction)),
+ transaction(std::move(transaction)),
+ iter(ext_transaction.begin()) {}
- void reset_preserve_handle(TransactionManager &tm) {
- tm.reset_transaction_preserve_handle(*transaction);
- iter = ext_transaction.begin();
- }
- };
+ TransactionRef transaction;
+
+ ceph::os::Transaction::iterator iter;
+ std::chrono::steady_clock::time_point begin_timestamp = std::chrono::steady_clock::now();
- TransactionManager::read_extent_iertr::future<std::optional<unsigned>>
- get_coll_bits(CollectionRef ch, Transaction &t) const;
+ void reset_preserve_handle(TransactionManager &tm) {
+ tm.reset_transaction_preserve_handle(*transaction);
+ iter = ext_transaction.begin();
+ }
+ };
- static void on_error(ceph::os::Transaction &t);
+ TransactionManager::read_extent_iertr::future<std::optional<unsigned>>
+ get_coll_bits(CollectionRef ch, Transaction &t) const;
- template <typename F>
- auto repeat_with_internal_context(
- CollectionRef ch,
- ceph::os::Transaction &&t,
- Transaction::src_t src,
- const char* tname,
- op_type_t op_type,
- F &&f) {
- return seastar::do_with(
- internal_context_t(
- ch, std::move(t),
- transaction_manager->create_transaction(src, tname)),
- std::forward<F>(f),
- [this, op_type](auto &ctx, auto &f) {
+ static void on_error(ceph::os::Transaction &t);
+
+ template <typename F>
+ auto repeat_with_internal_context(
+ CollectionRef ch,
+ ceph::os::Transaction &&t,
+ Transaction::src_t src,
+ const char* tname,
+ op_type_t op_type,
+ F &&f) {
+ return seastar::do_with(
+ internal_context_t(
+ ch, std::move(t),
+ transaction_manager->create_transaction(src, tname)),
+ std::forward<F>(f),
+ [this, op_type](auto &ctx, auto &f) {
return ctx.transaction->get_handle().take_collection_lock(
static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
).then([this] {
}).finally([this] {
throttler.put();
});
- }
- );
- }
+ });
+ }
- template <typename Ret, typename F>
- auto repeat_with_onode(
- CollectionRef ch,
- const ghobject_t &oid,
- Transaction::src_t src,
- const char* tname,
- op_type_t op_type,
- F &&f) const {
- auto begin_time = std::chrono::steady_clock::now();
- return seastar::do_with(
- oid, Ret{}, std::forward<F>(f),
- [this, src, op_type, begin_time, tname
- ](auto &oid, auto &ret, auto &f)
- {
- return repeat_eagain([&, this, src, tname] {
- return transaction_manager->with_transaction_intr(
- src,
- tname,
- [&, this](auto& t)
- {
- return onode_manager->get_onode(t, oid
- ).si_then([&](auto onode) {
- return seastar::do_with(std::move(onode), [&](auto& onode) {
- return f(t, *onode);
+ template <typename Ret, typename F>
+ auto repeat_with_onode(
+ CollectionRef ch,
+ const ghobject_t &oid,
+ Transaction::src_t src,
+ const char* tname,
+ op_type_t op_type,
+ F &&f) const {
+ auto begin_time = std::chrono::steady_clock::now();
+ return seastar::do_with(
+ oid, Ret{}, std::forward<F>(f),
+ [this, src, op_type, begin_time, tname
+ ](auto &oid, auto &ret, auto &f)
+ {
+ return repeat_eagain([&, this, src, tname] {
+ return transaction_manager->with_transaction_intr(
+ src,
+ tname,
+ [&, this](auto& t)
+ {
+ return onode_manager->get_onode(t, oid
+ ).si_then([&](auto onode) {
+ return seastar::do_with(std::move(onode), [&](auto& onode) {
+ return f(t, *onode);
+ });
+ }).si_then([&ret](auto _ret) {
+ ret = _ret;
});
- }).si_then([&ret](auto _ret) {
- ret = _ret;
});
+ }).safe_then([&ret, op_type, begin_time, this] {
+ const_cast<Shard*>(this)->add_latency_sample(op_type,
+ std::chrono::steady_clock::now() - begin_time);
+ return seastar::make_ready_future<Ret>(ret);
});
- }).safe_then([&ret, op_type, begin_time, this] {
- const_cast<SeaStore*>(this)->add_latency_sample(op_type,
- std::chrono::steady_clock::now() - begin_time);
- return seastar::make_ready_future<Ret>(ret);
+ });
+ }
+
+ using _fiemap_ret = ObjectDataHandler::fiemap_ret;
+ _fiemap_ret _fiemap(
+ Transaction &t,
+ Onode &onode,
+ uint64_t off,
+ uint64_t len) const;
+
+ using _omap_get_value_iertr = OMapManager::base_iertr::extend<
+ crimson::ct_error::enodata
+ >;
+ using _omap_get_value_ret = _omap_get_value_iertr::future<ceph::bufferlist>;
+ _omap_get_value_ret _omap_get_value(
+ Transaction &t,
+ omap_root_t &&root,
+ std::string_view key) const;
+
+ using _omap_get_values_iertr = OMapManager::base_iertr;
+ using _omap_get_values_ret = _omap_get_values_iertr::future<omap_values_t>;
+ _omap_get_values_ret _omap_get_values(
+ Transaction &t,
+ omap_root_t &&root,
+ const omap_keys_t &keys) const;
+
+ friend class SeaStoreOmapIterator;
+
+ using omap_list_bare_ret = OMapManager::omap_list_bare_ret;
+ using omap_list_ret = OMapManager::omap_list_ret;
+ omap_list_ret omap_list(
+ Onode &onode,
+ const omap_root_le_t& omap_root,
+ Transaction& t,
+ const std::optional<std::string>& start,
+ OMapManager::omap_list_config_t config) const;
+
+ using tm_iertr = TransactionManager::base_iertr;
+ using tm_ret = tm_iertr::future<>;
+ tm_ret _do_transaction_step(
+ internal_context_t &ctx,
+ CollectionRef &col,
+ std::vector<OnodeRef> &onodes,
+ std::vector<OnodeRef> &d_onodes,
+ ceph::os::Transaction::iterator &i);
+
+ tm_ret _remove(
+ internal_context_t &ctx,
+ OnodeRef &onode);
+ tm_ret _touch(
+ internal_context_t &ctx,
+ OnodeRef &onode);
+ tm_ret _write(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ uint64_t offset, size_t len,
+ ceph::bufferlist &&bl,
+ uint32_t fadvise_flags);
+ tm_ret _zero(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ objaddr_t offset, extent_len_t len);
+ tm_ret _omap_set_values(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ std::map<std::string, ceph::bufferlist> &&aset);
+ tm_ret _omap_set_header(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ ceph::bufferlist &&header);
+ tm_ret _omap_clear(
+ internal_context_t &ctx,
+ OnodeRef &onode);
+ tm_ret _omap_rmkeys(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ omap_keys_t &&aset);
+ tm_ret _omap_rmkeyrange(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ std::string first,
+ std::string last);
+ tm_ret _truncate(
+ internal_context_t &ctx,
+ OnodeRef &onode, uint64_t size);
+ tm_ret _setattrs(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ std::map<std::string,bufferlist>&& aset);
+ tm_ret _rmattr(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ std::string name);
+ tm_ret _rmattrs(
+ internal_context_t &ctx,
+ OnodeRef &onode);
+ tm_ret _xattr_rmattr(
+ internal_context_t &ctx,
+ OnodeRef &onode,
+ std::string &&name);
+ tm_ret _xattr_clear(
+ internal_context_t &ctx,
+ OnodeRef &onode);
+ tm_ret _create_collection(
+ internal_context_t &ctx,
+ const coll_t& cid, int bits);
+ tm_ret _remove_collection(
+ internal_context_t &ctx,
+ const coll_t& cid);
+ using omap_set_kvs_ret = tm_iertr::future<>;
+ omap_set_kvs_ret _omap_set_kvs(
+ OnodeRef &onode,
+ const omap_root_le_t& omap_root,
+ Transaction& t,
+ omap_root_le_t& mutable_omap_root,
+ std::map<std::string, ceph::bufferlist>&& kvs);
+
+ boost::intrusive_ptr<SeastoreCollection> _get_collection(const coll_t& cid);
+
+ static constexpr auto LAT_MAX = static_cast<std::size_t>(op_type_t::MAX);
+
+ struct {
+ std::array<seastar::metrics::histogram, LAT_MAX> op_lat;
+ } stats;
+
+ seastar::metrics::histogram& get_latency(
+ op_type_t op_type) {
+ assert(static_cast<std::size_t>(op_type) < stats.op_lat.size());
+ return stats.op_lat[static_cast<std::size_t>(op_type)];
+ }
+
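+  // records one latency sample; durations are accumulated in milliseconds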
+ void add_latency_sample(op_type_t op_type,
+ std::chrono::steady_clock::duration dur) {
+ seastar::metrics::histogram& lat = get_latency(op_type);
+ lat.sample_count++;
+ lat.sample_sum += std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
+ }
+
+ private:
+ std::string root;
+ DeviceRef device;
+ const uint32_t max_object_size;
+ bool is_test;
+
+ std::vector<DeviceRef> secondaries;
+ TransactionManagerRef transaction_manager;
+ CollectionManagerRef collection_manager;
+ OnodeManagerRef onode_manager;
+
+ common::Throttle throttler;
+
+ seastar::metrics::metric_group metrics;
+ void register_metrics();
+ };
+
+public:
+ SeaStore(
+ const std::string& root,
+ MDStoreRef mdstore);
+ ~SeaStore();
+
+ seastar::future<> start() final;
+ seastar::future<> stop() final;
+
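+  // cross-shard entry points: these must be called on primary_core and fan
+  // the work out to every shard via shard_stores.invoke_on_all()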
+ mount_ertr::future<> mount() final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.invoke_on_all(
+ [](auto &local_store) {
+ return local_store.mount().handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in SeaStore::mount"
});
});
}
- using _fiemap_ret = ObjectDataHandler::fiemap_ret;
- _fiemap_ret _fiemap(
- Transaction &t,
- Onode &onode,
- uint64_t off,
- uint64_t len) const;
+ seastar::future<> umount() final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.invoke_on_all(
+ [](auto &local_store) {
+ return local_store.umount();
+ });
+ }
- using _omap_get_value_iertr = OMapManager::base_iertr::extend<
- crimson::ct_error::enodata
- >;
- using _omap_get_value_ret = _omap_get_value_iertr::future<ceph::bufferlist>;
- _omap_get_value_ret _omap_get_value(
- Transaction &t,
- omap_root_t &&root,
- std::string_view key) const;
-
- using _omap_get_values_iertr = OMapManager::base_iertr;
- using _omap_get_values_ret = _omap_get_values_iertr::future<omap_values_t>;
- _omap_get_values_ret _omap_get_values(
- Transaction &t,
- omap_root_t &&root,
- const omap_keys_t &keys) const;
-
- friend class SeaStoreOmapIterator;
-
- using omap_list_bare_ret = OMapManager::omap_list_bare_ret;
- using omap_list_ret = OMapManager::omap_list_ret;
- omap_list_ret omap_list(
- Onode &onode,
- const omap_root_le_t& omap_root,
- Transaction& t,
- const std::optional<std::string>& start,
- OMapManager::omap_list_config_t config) const;
-
- void init_managers();
+ mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final;
+ seastar::future<store_statfs_t> stat() const final;
- std::string root;
- MDStoreRef mdstore;
- DeviceRef device;
- const uint32_t max_object_size = 0;
- bool is_test;
-
- std::vector<DeviceRef> secondaries;
- TransactionManagerRef transaction_manager;
- CollectionManagerRef collection_manager;
- OnodeManagerRef onode_manager;
-
- common::Throttle throttler;
-
- using tm_iertr = TransactionManager::base_iertr;
- using tm_ret = tm_iertr::future<>;
- tm_ret _do_transaction_step(
- internal_context_t &ctx,
- CollectionRef &col,
- std::vector<OnodeRef> &onodes,
- std::vector<OnodeRef> &d_onodes,
- ceph::os::Transaction::iterator &i);
-
- tm_ret _remove(
- internal_context_t &ctx,
- OnodeRef &onode);
- tm_ret _touch(
- internal_context_t &ctx,
- OnodeRef &onode);
- tm_ret _write(
- internal_context_t &ctx,
- OnodeRef &onode,
- uint64_t offset, size_t len,
- ceph::bufferlist &&bl,
- uint32_t fadvise_flags);
- tm_ret _zero(
- internal_context_t &ctx,
- OnodeRef &onode,
- objaddr_t offset, extent_len_t len);
- tm_ret _omap_set_values(
- internal_context_t &ctx,
- OnodeRef &onode,
- std::map<std::string, ceph::bufferlist> &&aset);
- tm_ret _omap_set_header(
- internal_context_t &ctx,
- OnodeRef &onode,
- ceph::bufferlist &&header);
- tm_ret _omap_clear(
- internal_context_t &ctx,
- OnodeRef &onode);
- tm_ret _omap_rmkeys(
- internal_context_t &ctx,
- OnodeRef &onode,
- omap_keys_t &&aset);
- tm_ret _omap_rmkeyrange(
- internal_context_t &ctx,
- OnodeRef &onode,
- std::string first,
- std::string last);
- tm_ret _truncate(
- internal_context_t &ctx,
- OnodeRef &onode, uint64_t size);
- tm_ret _setattrs(
- internal_context_t &ctx,
- OnodeRef &onode,
- std::map<std::string,bufferlist>&& aset);
- tm_ret _rmattr(
- internal_context_t &ctx,
- OnodeRef &onode,
- std::string name);
- tm_ret _rmattrs(
- internal_context_t &ctx,
- OnodeRef &onode);
- tm_ret _xattr_rmattr(
- internal_context_t &ctx,
- OnodeRef &onode,
- std::string &&name);
- tm_ret _xattr_clear(
- internal_context_t &ctx,
- OnodeRef &onode);
- tm_ret _create_collection(
- internal_context_t &ctx,
- const coll_t& cid, int bits);
- tm_ret _remove_collection(
- internal_context_t &ctx,
- const coll_t& cid);
- using omap_set_kvs_ret = tm_iertr::future<>;
- omap_set_kvs_ret _omap_set_kvs(
- OnodeRef &onode,
- const omap_root_le_t& omap_root,
- Transaction& t,
- omap_root_le_t& mutable_omap_root,
- std::map<std::string, ceph::bufferlist>&& kvs);
-
- boost::intrusive_ptr<SeastoreCollection> _get_collection(const coll_t& cid);
-
- static constexpr auto LAT_MAX = static_cast<std::size_t>(op_type_t::MAX);
- struct {
- std::array<seastar::metrics::histogram, LAT_MAX> op_lat;
- } stats;
-
- seastar::metrics::histogram& get_latency(
- op_type_t op_type) {
- assert(static_cast<std::size_t>(op_type) < stats.op_lat.size());
- return stats.op_lat[static_cast<std::size_t>(op_type)];
+ uuid_d get_fsid() const final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.local().get_fsid();
+ }
+
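+  // write through the local shard's store first, then mirror the key/value
+  // into the file-backed mdstore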
+ seastar::future<> write_meta(
+ const std::string& key,
+ const std::string& value) final {
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.local().write_meta(
+ key, value).then([this, key, value] {
+ return mdstore->write_meta(key, value);
+ }).handle_error(
+ crimson::ct_error::assert_all{"Invalid error in SeaStore::write_meta"}
+ );
+ }
+
+ seastar::future<std::tuple<int, std::string>> read_meta(const std::string& key) final;
+
+ seastar::future<std::vector<coll_core_t>> list_collections() final;
+
+ FuturizedStore::Shard& get_sharded_store() final {
+ return shard_stores.local();
}
- void add_latency_sample(op_type_t op_type,
- std::chrono::steady_clock::duration dur) {
- seastar::metrics::histogram& lat = get_latency(op_type);
- lat.sample_count++;
- lat.sample_sum += std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
+ static col_obj_ranges_t
+ get_objs_range(CollectionRef ch, unsigned bits);
+
+// for test
+public:
+ mount_ertr::future<> test_mount();
+ mkfs_ertr::future<> test_mkfs(uuid_d new_osd_fsid);
+
+ DeviceRef get_primary_device_ref() {
+ ceph_assert(seastar::this_shard_id() == primary_core);
+ return shard_stores.local().get_primary_device_ref();
}
- seastar::metrics::metric_group metrics;
- void register_metrics();
+
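+  // starts the store for tests with an externally created device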
+ seastar::future<> test_start(DeviceRef dev);
+
+private:
seastar::future<> write_fsid(uuid_d new_osd_fsid);
+
+ seastar::future<> prepare_meta(uuid_d new_osd_fsid);
+
seastar::future<> _mkfs(uuid_d new_osd_fsid);
+
+private:
+ std::string root;
+ MDStoreRef mdstore;
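+  // one Shard instance lives on each reactor core, managed by seastar::sharded<>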
+ seastar::sharded<SeaStore::Shard> shard_stores;
};
-seastar::future<std::unique_ptr<SeaStore>> make_seastore(
- const std::string &device,
- const ConfigValues &config);
+std::unique_ptr<SeaStore> make_seastore(
+ const std::string &device);
std::unique_ptr<SeaStore> make_test_seastore(
- DeviceRef device,
SeaStore::MDStoreRef mdstore);
}
static_cast<size_t>(0),
[&](auto &nr_zones) {
return seastar::open_file_dma(
- device + "/block",
+ device + "/block" + std::to_string(seastar::this_shard_id()),
seastar::open_flags::rw
).then([&](auto file) {
return seastar::do_with(
if (nr_zones != 0) {
return std::make_unique<
segment_manager::zns::ZNSSegmentManager
- >(device + "/block");
+ >(device + "/block" + std::to_string(seastar::this_shard_id()));
} else {
return std::make_unique<
segment_manager::block::BlockSegmentManager
- >(device + "/block", dtype);
+ >(device + "/block" + std::to_string(seastar::this_shard_id()), dtype);
}
});
});
return seastar::make_ready_future<crimson::os::seastore::SegmentManagerRef>(
std::make_unique<
segment_manager::block::BlockSegmentManager
- >(device + "/block", dtype));
+ >(device + "/block" + std::to_string(seastar::this_shard_id()), dtype));
#endif
}
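+  // the configured seastore_device_size is now split evenly across all smp shards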
using crimson::common::get_conf;
auto config_size = get_conf<Option::size_t>(
- "seastore_device_size");
+ "seastore_device_size")/seastar::smp::count;
size_t size = (data.size == 0) ? config_size : data.size;
check_create_device_ret maybe_create = check_create_device_ertr::now();
using crimson::common::get_conf;
if (get_conf<bool>("seastore_block_create")) {
- auto size = get_conf<Option::size_t>("seastore_device_size");
+ auto size =
+ get_conf<Option::size_t>("seastore_device_size")/seastar::smp::count;
maybe_create = check_create_device(device_path, size);
}
epoch_t min_epoch, epoch_t max_epoch,
std::vector<pg_log_entry_t>&& log_entries) final;
CollectionRef coll;
- crimson::os::FuturizedStore* store;
+ crimson::os::FuturizedStore::Shard* store;
seastar::future<> request_committed(const osd_reqid_t& reqid,
const eversion_t& version) final {
return seastar::now();
auto store = crimson::os::FuturizedStore::create(
local_conf().get_val<std::string>("osd_objectstore"),
local_conf().get_val<std::string>("osd_data"),
- local_conf().get_config_values()).get();
+ local_conf().get_config_values());
crimson::osd::OSD osd(
whoami, nonce, std::ref(should_stop.abort_source()),
seastar::future<> OSD::open_meta_coll()
{
- return store.open_collection(
+ return store.get_sharded_store().open_collection(
coll_t::meta()
).then([this](auto ch) {
- pg_shard_manager.init_meta_coll(ch, store);
+ pg_shard_manager.init_meta_coll(ch, store.get_sharded_store());
return seastar::now();
});
}
seastar::future<OSDMeta> OSD::open_or_create_meta_coll(FuturizedStore &store)
{
- return store.open_collection(coll_t::meta()).then([&store](auto ch) {
+ return store.get_sharded_store().open_collection(coll_t::meta()).then([&store](auto ch) {
if (!ch) {
- return store.create_new_collection(
+ return store.get_sharded_store().create_new_collection(
coll_t::meta()
).then([&store](auto ch) {
- return OSDMeta(ch, store);
+ return OSDMeta(ch, store.get_sharded_store());
});
} else {
- return seastar::make_ready_future<OSDMeta>(ch, store);
+ return seastar::make_ready_future<OSDMeta>(ch, store.get_sharded_store());
}
});
}
meta_coll.create(t);
meta_coll.store_superblock(t, superblock);
logger().debug("OSD::_write_superblock: do_transaction...");
- return store.do_transaction(
+ return store.get_sharded_store().do_transaction(
meta_coll.collection(),
std::move(t));
}),
startup_time = ceph::mono_clock::now();
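+  // the sharded store must be started before pg_shard_manager: per-shard
+  // services capture store.get_sharded_store() during construction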
- return pg_shard_manager.start(
+ return store.start().then([this] {
+ return pg_shard_manager.start(
whoami, *cluster_msgr,
- *public_msgr, *monc, *mgrc, store
- ).then([this] {
+ *public_msgr, *monc, *mgrc, store);
+ }).then([this] {
heartbeat.reset(new Heartbeat{
whoami, get_shard_services(),
*monc, *hb_front_msgr, *hb_back_msgr});
- return store.start();
- }).then([this] {
return store.mount().handle_error(
crimson::stateful_ec::handle([] (const auto& ec) {
logger().error("error mounting object store in {}: ({}) {}",
}).then([this] {
pg_shard_manager.got_map(osdmap->get_epoch());
bind_epoch = osdmap->get_epoch();
- return pg_shard_manager.load_pgs();
+ return pg_shard_manager.load_pgs(store);
}).then([this] {
uint64_t osd_required =
pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
pg_shard_manager.set_superblock(superblock);
logger().debug("OSD::handle_osd_map: do_transaction...");
- return store.do_transaction(
+ return store.get_sharded_store().do_transaction(
pg_shard_manager.get_meta_coll().collection(),
std::move(t));
});
#include "os/Transaction.h"
using std::string;
-using read_errorator = crimson::os::FuturizedStore::read_errorator;
+using read_errorator = crimson::os::FuturizedStore::Shard::read_errorator;
void OSDMeta::create(ceph::os::Transaction& t)
{
class OSDMeta {
template<typename T> using Ref = boost::intrusive_ptr<T>;
- crimson::os::FuturizedStore& store;
+ crimson::os::FuturizedStore::Shard& store;
Ref<crimson::os::FuturizedCollection> coll;
public:
OSDMeta(Ref<crimson::os::FuturizedCollection> coll,
- crimson::os::FuturizedStore& store)
+ crimson::os::FuturizedStore::Shard& store)
: store{store}, coll{coll}
{}
void store_superblock(ceph::os::Transaction& t,
const OSDSuperblock& sb);
- using load_superblock_ertr = crimson::os::FuturizedStore::read_errorator;
+ using load_superblock_ertr = crimson::os::FuturizedStore::Shard::read_errorator;
using load_superblock_ret = load_superblock_ertr::future<OSDSuperblock>;
load_superblock_ret load_superblock();
new_acting_primary, history, pi, t);
}
-seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
+seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
const PastIntervals& pim,
ceph::os::Transaction &t);
- seastar::future<> read_state(crimson::os::FuturizedStore* store);
+ seastar::future<> read_state(crimson::os::FuturizedStore::Shard* store);
interruptible_future<> do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx);
}
using get_omap_ertr =
- crimson::os::FuturizedStore::read_errorator::extend<
+ crimson::os::FuturizedStore::Shard::read_errorator::extend<
crimson::ct_error::enodata>;
using get_omap_iertr =
::crimson::interruptible::interruptible_errorator<
get_omap_ertr>;
static
get_omap_iertr::future<
- crimson::os::FuturizedStore::omap_values_t>
+ crimson::os::FuturizedStore::Shard::omap_values_t>
maybe_get_omap_vals_by_keys(
- crimson::os::FuturizedStore* store,
+ crimson::os::FuturizedStore::Shard* store,
const crimson::os::CollectionRef& coll,
const object_info_t& oi,
const std::set<std::string>& keys_to_get)
static
get_omap_iertr::future<
- std::tuple<bool, crimson::os::FuturizedStore::omap_values_t>>
+ std::tuple<bool, crimson::os::FuturizedStore::Shard::omap_values_t>>
maybe_get_omap_vals(
- crimson::os::FuturizedStore* store,
+ crimson::os::FuturizedStore::Shard* store,
const crimson::os::CollectionRef& coll,
const object_info_t& oi,
const std::string& start_after)
delta_stats.num_rd++;
return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get)
.safe_then_interruptible(
- [&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) {
+ [&osd_op] (crimson::os::FuturizedStore::Shard::omap_values_t&& vals) {
encode(vals, osd_op.outdata);
return ll_read_errorator::now();
}).handle_error_interruptible(
using CollectionRef = crimson::os::CollectionRef;
using ec_profile_t = std::map<std::string, std::string>;
// low-level read errorator
- using ll_read_errorator = crimson::os::FuturizedStore::read_errorator;
+ using ll_read_errorator = crimson::os::FuturizedStore::Shard::read_errorator;
using ll_read_ierrorator =
::crimson::interruptible::interruptible_errorator<
::crimson::osd::IOInterruptCondition,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
object_stat_sum_t& delta_stats);
- using get_attr_errorator = crimson::os::FuturizedStore::get_attr_errorator;
+ using get_attr_errorator = crimson::os::FuturizedStore::Shard::get_attr_errorator;
using get_attr_ierrorator =
::crimson::interruptible::interruptible_errorator<
::crimson::osd::IOInterruptCondition,
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const;
using omap_cmp_ertr =
- crimson::os::FuturizedStore::read_errorator::extend<
+ crimson::os::FuturizedStore::Shard::read_errorator::extend<
crimson::ct_error::ecanceled,
crimson::ct_error::invarg>;
using omap_cmp_iertr =
CollectionRef coll;
crimson::osd::ShardServices &shard_services;
DoutPrefixProvider &dpp; ///< provides log prefix context
- crimson::os::FuturizedStore* store;
+ crimson::os::FuturizedStore::Shard* store;
virtual seastar::future<> request_committed(
const osd_reqid_t& reqid,
const eversion_t& at_version) = 0;
// easily skip them
using crimson::os::FuturizedStore;
-PGMeta::PGMeta(FuturizedStore& store, spg_t pgid)
+PGMeta::PGMeta(FuturizedStore::Shard& store, spg_t pgid)
: store{store},
pgid{pgid}
{}
namespace {
template<typename T>
- std::optional<T> find_value(const FuturizedStore::omap_values_t& values,
+ std::optional<T> find_value(const FuturizedStore::Shard::omap_values_t& values,
string_view key)
{
auto found = values.find(key);
return seastar::make_ready_future<epoch_t>(*epoch);
}
},
- FuturizedStore::read_errorator::assert_all{
+ FuturizedStore::Shard::read_errorator::assert_all{
"PGMeta::get_epoch: unable to read pgmeta"
});
});
return seastar::make_ready_future<std::tuple<pg_info_t, PastIntervals>>(
std::make_tuple(std::move(info), std::move(past_intervals)));
},
- FuturizedStore::read_errorator::assert_all{
+ FuturizedStore::Shard::read_errorator::assert_all{
"PGMeta::load: unable to read pgmeta"
});
}
#include <tuple>
#include <seastar/core/future.hh>
#include "osd/osd_types.h"
-
-namespace crimson::os {
- class FuturizedStore;
-}
+#include "crimson/os/futurized_store.h"
/// PG related metadata
class PGMeta
{
- crimson::os::FuturizedStore& store;
+ crimson::os::FuturizedStore::Shard& store;
const spg_t pgid;
public:
- PGMeta(crimson::os::FuturizedStore& store, spg_t pgid);
+ PGMeta(crimson::os::FuturizedStore::Shard& store, spg_t pgid);
seastar::future<epoch_t> get_epoch();
seastar::future<std::tuple<pg_info_t, PastIntervals>> load();
};
});
}
-seastar::future<> PGShardManager::load_pgs()
+seastar::future<> PGShardManager::load_pgs(crimson::os::FuturizedStore& store)
{
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
- return get_local_state().store.list_collections(
+ return store.list_collections(
).then([this](auto colls_cores) {
return seastar::parallel_for_each(
colls_cores,
#include "crimson/osd/shard_services.h"
#include "crimson/osd/pg_map.h"
-namespace crimson::osd {
+namespace crimson::os {
+ class FuturizedStore;
+}
+namespace crimson::osd {
/**
* PGShardManager
*
});
}
- seastar::future<> load_pgs();
+ seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
seastar::future<> stop_pgs();
seastar::future<std::map<pg_t, pg_stat_t>> get_pg_stats() const;
protected:
crimson::osd::PG& pg;
crimson::osd::ShardServices& shard_services;
- crimson::os::FuturizedStore* store;
+ crimson::os::FuturizedStore::Shard* store;
crimson::os::CollectionRef coll;
PGBackend* backend;
}
return interruptor::make_interruptible(interruptor::when_all_succeed(
backend->omap_get_header(coll, ghobject_t(oid)).handle_error_interruptible<false>(
- crimson::os::FuturizedStore::read_errorator::all_same_way(
+ crimson::os::FuturizedStore::Shard::read_errorator::all_same_way(
[oid] (const std::error_code& e) {
logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid);
return seastar::make_ready_future<bufferlist>();
})),
interruptor::make_interruptible(store->get_attrs(coll, ghobject_t(oid)))
.handle_error_interruptible<false>(
- crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
+ crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
[oid] (const std::error_code& e) {
logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid);
- return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
+ return seastar::make_ready_future<crimson::os::FuturizedStore::Shard::attrs_t>();
}))
)).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) {
if (bl.length() == 0) {
return seastar::make_ready_future<seastar::stop_iteration>(
stop ? seastar::stop_iteration::yes : seastar::stop_iteration::no
);
- }, crimson::os::FuturizedStore::read_errorator::assert_all{});
+ }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all{});
});
}
PerfCounters *recoverystate_perf,
crimson::os::FuturizedStore &store)
: whoami(whoami),
- store(store),
+ store(store.get_sharded_store()),
perf(perf), recoverystate_perf(recoverystate_perf),
throttler(crimson::common::local_conf()),
next_tid(
#define assert_core() ceph_assert(seastar::this_shard_id() == core);
const int whoami;
- crimson::os::FuturizedStore &store;
+ crimson::os::FuturizedStore::Shard &store;
crimson::common::CephContext cct;
PerfCounters *perf = nullptr;
FORWARD_TO_OSD_SINGLETON(send_to_osd)
- crimson::os::FuturizedStore &get_store() {
+ crimson::os::FuturizedStore::Shard &get_store() {
return local_state.store;
}
config.log_size);
}
- return fs->do_transaction(
+ return sharded_fs->do_transaction(
mapping.pg.collection,
std::move(t));
}
{
auto mapping = map_offset(offset);
ceph_assert((mapping.offset + size) <= config.object_size);
- return fs->read(
+ return sharded_fs->read(
mapping.pg.collection,
mapping.object,
mapping.offset,
seastar::future<> FSDriver::mkfs()
{
- return init(
+ return init(
).then([this] {
assert(fs);
- return fs->start();
- }).then([this] {
uuid_d uuid;
uuid.generate_random();
return fs->mkfs(uuid).handle_error(
}).then([this] {
return fs->stop();
}).then([this] {
- return init().then([this] {
- return fs->start();
- });
+ return init();
}).then([this] {
return fs->mount(
).handle_error(
boost::counting_iterator<unsigned>(0),
boost::counting_iterator<unsigned>(config.num_pgs),
[this](auto i) {
- return fs->create_new_collection(get_coll(i)
+ return sharded_fs->create_new_collection(get_coll(i)
).then([this, i](auto coll) {
ceph::os::Transaction t;
t.create_collection(get_coll(i), 0);
- return fs->do_transaction(coll, std::move(t));
+ return sharded_fs->do_transaction(coll, std::move(t));
});
});
}).then([this] {
return (
config.mkfs ? mkfs() : seastar::now()
).then([this] {
- return init().then([this] {
- return fs->start();
- });
+ return init();
}).then([this] {
return fs->mount(
).handle_error(
boost::counting_iterator<unsigned>(0),
boost::counting_iterator<unsigned>(config.num_pgs),
[this](auto i) {
- return fs->open_collection(get_coll(i)
+ return sharded_fs->open_collection(get_coll(i)
).then([this, i](auto ref) {
collections[i].collection = ref;
collections[i].log_object = get_log_object(i);
config.log_entry_size,
config.log_size);
}
- return fs->do_transaction(
+ return sharded_fs->do_transaction(
collections[i].collection,
std::move(t));
} else {
seastar::future<> FSDriver::init()
{
fs.reset();
- return FuturizedStore::create(
+ fs = FuturizedStore::create(
config.get_fs_type(),
*config.path,
crimson::common::local_conf().get_config_values()
- ).then([this] (auto store_ptr) {
- fs = std::move(store_ptr);
- return seastar::now();
+ );
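+  // start() brings up the per-shard stores; afterwards cache a pointer to
+  // this core's shard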
+ return fs->start().then([this] {
+ sharded_fs = &(fs->get_sharded_store());
});
}
size_t size = 0;
const config_t config;
std::unique_ptr<crimson::os::FuturizedStore> fs;
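+  // this core's shard of fs; set by init() once fs->start() completes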
+  crimson::os::FuturizedStore::Shard* sharded_fs = nullptr;
struct pg_analogue_t {
crimson::os::CollectionRef collection;
#ifdef WITH_SEASTAR
namespace {
- struct FuturizedStoreLogReader {
- crimson::os::FuturizedStore &store;
+ struct FuturizedShardStoreLogReader {
+ crimson::os::FuturizedStore::Shard &store;
const pg_info_t &info;
PGLog::IndexedLog &log;
std::set<std::string>* log_keys_debug = NULL;
return seastar::make_ready_future<seastar::stop_iteration>(
done ? seastar::stop_iteration::yes : seastar::stop_iteration::no
);
- }, crimson::os::FuturizedStore::read_errorator::assert_all{});
+ }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all{});
}).then([this] {
if (info.pgid.is_no_shard()) {
// replicated pool pg does not persist this key
}
seastar::future<> PGLog::read_log_and_missing_crimson(
- crimson::os::FuturizedStore &store,
+ crimson::os::FuturizedStore::Shard &store,
crimson::os::CollectionRef ch,
const pg_info_t &info,
IndexedLog &log,
ldpp_dout(dpp, 20) << "read_log_and_missing coll "
<< ch->get_cid()
<< " " << pgmeta_oid << dendl;
- return seastar::do_with(FuturizedStoreLogReader{
+ return seastar::do_with(FuturizedShardStoreLogReader{
store, info, log, log_keys_debug,
missing, dpp},
- [ch, pgmeta_oid](FuturizedStoreLogReader& reader) {
+ [ch, pgmeta_oid](FuturizedShardStoreLogReader& reader) {
return reader.read(ch, pgmeta_oid);
});
}
seastar::future<> PGLog::rebuild_missing_set_with_deletes_crimson(
- crimson::os::FuturizedStore &store,
+ crimson::os::FuturizedStore::Shard &store,
crimson::os::CollectionRef ch,
const pg_info_t &info)
{
#ifdef WITH_SEASTAR
seastar::future<> rebuild_missing_set_with_deletes_crimson(
- crimson::os::FuturizedStore &store,
+ crimson::os::FuturizedStore::Shard &store,
crimson::os::CollectionRef ch,
const pg_info_t &info);
#endif
#ifdef WITH_SEASTAR
seastar::future<> read_log_and_missing_crimson(
- crimson::os::FuturizedStore &store,
+ crimson::os::FuturizedStore::Shard &store,
crimson::os::CollectionRef ch,
const pg_info_t &info,
ghobject_t pgmeta_oid
}
static seastar::future<> read_log_and_missing_crimson(
- crimson::os::FuturizedStore &store,
+ crimson::os::FuturizedStore::Shard &store,
crimson::os::CollectionRef ch,
const pg_info_t &info,
IndexedLog &log,
using crimson::os::FuturizedStore;
return interruptor::green_get(os->omap_get_values(
ch, hoid, keys
- ).safe_then([out] (FuturizedStore::omap_values_t&& vals) {
+ ).safe_then([out] (FuturizedStore::Shard::omap_values_t&& vals) {
// just the difference in comparator (`std::less<>` in omap_values_t`)
- reinterpret_cast<FuturizedStore::omap_values_t&>(*out) = std::move(vals);
+ reinterpret_cast<FuturizedStore::Shard::omap_values_t&>(*out) = std::move(vals);
return 0;
- }, FuturizedStore::read_errorator::all_same_way([] (auto& e) {
+ }, FuturizedStore::Shard::read_errorator::all_same_way([] (auto& e) {
assert(e.value() > 0);
return -e.value();
}))); // this requires seastar::thread
using crimson::os::FuturizedStore;
return interruptor::green_get(os->omap_get_values(
ch, hoid, key
- ).safe_then_unpack([&key, next] (bool, FuturizedStore::omap_values_t&& vals) {
+ ).safe_then_unpack([&key, next] (bool, FuturizedStore::Shard::omap_values_t&& vals) {
CRIMSON_DEBUG("OSDriver::{}:{}", "get_next", __LINE__);
if (auto nit = std::begin(vals); nit == std::end(vals)) {
CRIMSON_DEBUG("OSDriver::{}:{}", "get_next", __LINE__);
*next = *nit;
return 0;
}
- }, FuturizedStore::read_errorator::all_same_way([] {
+ }, FuturizedStore::Shard::read_errorator::all_same_way([] {
CRIMSON_DEBUG("OSDriver::{}:{}", "get_next", __LINE__);
return -EINVAL;
}))); // this requires seastar::thread
using crimson::os::FuturizedStore;
// let's try to get current first
return interruptor::green_get(os->omap_get_values(
- ch, hoid, FuturizedStore::omap_keys_t{key}
- ).safe_then([&key, next_or_current] (FuturizedStore::omap_values_t&& vals) {
+ ch, hoid, FuturizedStore::Shard::omap_keys_t{key}
+ ).safe_then([&key, next_or_current] (FuturizedStore::Shard::omap_values_t&& vals) {
assert(vals.size() == 1);
+    *next_or_current = std::make_pair(key, std::move(vals.begin()->second));
return 0;
- }, FuturizedStore::read_errorator::all_same_way(
+ }, FuturizedStore::Shard::read_errorator::all_same_way(
[next_or_current, &key, this] {
// no current, try next
return get_next(key, next_or_current);
class OSDriver : public MapCacher::StoreDriver<std::string, ceph::buffer::list> {
#ifdef WITH_SEASTAR
- using ObjectStoreT = crimson::os::FuturizedStore;
+ using ObjectStoreT = crimson::os::FuturizedStore::Shard;
using CollectionHandleT = ObjectStoreT::CollectionRef;
#else
using ObjectStoreT = ObjectStore;
num_main_device_managers(num_main_device_managers),
num_cold_device_managers(num_cold_device_managers) {}
- virtual void _init() = 0;
+ virtual seastar::future<> _init() = 0;
virtual void _destroy() = 0;
virtual seastar::future<> _teardown() = 0;
SUBINFO(test, "begin ...");
return teardown().then([this] {
devices->remount();
- _init();
- return _mount().handle_error(crimson::ct_error::assert_all{});
+ return _init().then([this] {
+ return _mount().handle_error(crimson::ct_error::assert_all{});
+ });
}).then([FNAME] {
SUBINFO(test, "finish");
});
}
SUBINFO(test, "begin with {} devices ...", devices->get_num_devices());
return devices->setup(
- ).then([this, FNAME]() {
- _init();
- return _mkfs(
+ ).then([this]() {
+ return _init();
+ }).then([this, FNAME] {
+ return _mkfs(
).safe_then([this] {
- return restart_fut();
+ return restart_fut();
}).handle_error(
- crimson::ct_error::assert_all{}
+ crimson::ct_error::assert_all{}
).then([FNAME] {
- SUBINFO(test, "finish");
- });
+ SUBINFO(test, "finish");
+ });
});
}
TMTestState(std::size_t num_main_devices, std::size_t num_cold_devices)
: EphemeralTestState(num_main_devices, num_cold_devices) {}
- virtual void _init() override {
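+  // _init() now returns a future so subclasses can await asynchronous startup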
+ virtual seastar::future<> _init() override {
auto sec_devices = devices->get_secondary_devices();
auto p_dev = devices->get_primary_device();
tm = make_transaction_manager(p_dev, sec_devices, true);
epm = tm->get_epm();
lba_manager = tm->get_lba_manager();
cache = tm->get_cache();
+ return seastar::now();
}
virtual void _destroy() override {
SeaStoreTestState() : EphemeralTestState(1, 0) {}
- virtual void _init() final {
+ virtual seastar::future<> _init() final {
seastore = make_test_seastore(
- devices->get_primary_device_ref(),
std::make_unique<TestMDStoreState::Store>(mdstore_state.get_mdstore()));
+ return seastore->test_start(devices->get_primary_device_ref());
}
virtual void _destroy() final {