From 2c1f213bfa8ae54c0f8337b7471d0bfa47a08b0f Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 24 Aug 2022 13:16:28 -0700 Subject: [PATCH] crimson/os/futurized_store: create ShardedStoreProxy For now, FuturizedStore implementations assume that methods are invoked on core 0. Later, we'll adapt each implementation to intelligently support invocation on any pg core. Until then, this wrapper converts the existing implementations to a safe, if not particuarly performant, proxy behavior. AlienStore should be safe as is. Signed-off-by: Samuel Just --- src/crimson/os/futurized_store.cc | 9 +- src/crimson/os/futurized_store.h | 248 ++++++++++++++++++++++++++++++ 2 files changed, 254 insertions(+), 3 deletions(-) diff --git a/src/crimson/os/futurized_store.cc b/src/crimson/os/futurized_store.cc index e072c0d262b..24f632b9cb7 100644 --- a/src/crimson/os/futurized_store.cc +++ b/src/crimson/os/futurized_store.cc @@ -16,20 +16,23 @@ FuturizedStore::create(const std::string& type, const ConfigValues& values) { if (type == "cyanstore") { + using crimson::os::CyanStore; return seastar::make_ready_future>( - std::make_unique(data)); + std::make_unique>(data)); } else if (type == "seastore") { return crimson::os::seastore::make_seastore( data, values ).then([] (auto seastore) { return seastar::make_ready_future>( - seastore.release()); + std::make_unique>( + seastore.release())); }); } else { + using crimson::os::AlienStore; #ifdef WITH_BLUESTORE // use AlienStore as a fallback. It adapts e.g. BlueStore. return seastar::make_ready_future>( - std::make_unique(type, data, values)); + std::make_unique(type, data, values)); #else ceph_abort_msgf("unsupported objectstore type: %s", type.c_str()); return {}; diff --git a/src/crimson/os/futurized_store.h b/src/crimson/os/futurized_store.h index 20f3a81f7db..0122625fba0 100644 --- a/src/crimson/os/futurized_store.h +++ b/src/crimson/os/futurized_store.h @@ -11,6 +11,7 @@ #include #include "os/Transaction.h" +#include "crimson/common/smp_helpers.h" #include "crimson/osd/exceptions.h" #include "include/buffer_fwd.h" #include "include/uuid.h" @@ -185,4 +186,251 @@ inline void intrusive_ptr_release(FuturizedStore::OmapIterator* iter) { } } +/** + * ShardedStoreProxy + * + * Simple helper to proxy FuturizedStore operations to the core on which + * the store was initialized for implementations without support for multiple + * reactors. + */ +template +class ShardedStoreProxy : public FuturizedStore { + const core_id_t core; + std::unique_ptr impl; + uuid_d fsid; + unsigned max_attr = 0; + + template + decltype(auto) proxy(Method method, Args&&... args) const { + return proxy_method_on_core( + core, *impl, method, std::forward(args)...); + } + + template + decltype(auto) proxy(Method method, Args&&... args) { + return proxy_method_on_core( + core, *impl, method, std::forward(args)...); + } + + /** + * _OmapIterator + * + * Proxies OmapIterator operations to store's core. Assumes that + * syncronous methods are safe to call directly from calling core + * since remote store should only be touching that memory during + * a method invocation. + * + * TODO: We don't really need OmapIterator at all, replace it with + * an appropriately paged omap_get_values variant. + */ + class _OmapIterator : public OmapIterator { + using fref_t = seastar::foreign_ptr; + const core_id_t core; + fref_t impl; + + template + decltype(auto) proxy(Method method, Args&&... args) { + return proxy_method_on_core( + core, *impl, method, std::forward(args)...); + } + + public: + _OmapIterator(core_id_t core, fref_t &&impl) + : core(core), impl(std::move(impl)) {} + + seastar::future<> seek_to_first() final { + return proxy(&OmapIterator::seek_to_first); + } + seastar::future<> upper_bound(const std::string &after) final { + return proxy(&OmapIterator::upper_bound, after); + } + seastar::future<> lower_bound(const std::string &to) final { + return proxy(&OmapIterator::lower_bound, to); + } + bool valid() const final { + return impl->valid(); + } + seastar::future<> next() final { + return proxy(&OmapIterator::next); + } + std::string key() final { + return impl->key(); + } + ceph::buffer::list value() final { + return impl->value(); + } + int status() const final { + return impl->status(); + } + ~_OmapIterator() = default; + }; + + +public: + ShardedStoreProxy(T *t) + : core(seastar::this_shard_id()), + impl(t) {} + template + ShardedStoreProxy(Args&&... args) + : core(seastar::this_shard_id()), + impl(std::make_unique(std::forward(args)...)) {} + ~ShardedStoreProxy() = default; + + // no copying + explicit ShardedStoreProxy(const ShardedStoreProxy &o) = delete; + const ShardedStoreProxy &operator=(const ShardedStoreProxy &o) = delete; + + seastar::future<> start() final { return proxy(&T::start); } + seastar::future<> stop() final { return proxy(&T::stop); } + mount_ertr::future<> mount() final { + auto ret = seastar::smp::submit_to( + core, + [this] { + auto ret = impl->mount( + ).safe_then([this] { + fsid = impl->get_fsid(); + max_attr = impl->get_max_attr_name_length(); + return seastar::now(); + }); + return std::move(ret).to_base(); + }); + return mount_ertr::future<>(std::move(ret)); + } + seastar::future<> umount() final { return proxy(&T::umount); } + mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final { + return proxy(&T::mkfs, new_osd_fsid); + } + seastar::future stat() const final { + return crimson::submit_to(core, [this] { return impl->stat(); }); + } + read_errorator::future read( + CollectionRef c, + const ghobject_t &oid, + uint64_t offset, + size_t len, + uint32_t op_flags = 0) final { + return proxy(&T::read, std::move(c), oid, offset, len, op_flags); + } + read_errorator::future readv( + CollectionRef c, + const ghobject_t &oid, + interval_set &m, + uint32_t op_flags = 0) final { + return crimson::submit_to(core, [this, c, oid, m, op_flags]() mutable { + return impl->readv(c, oid, m, op_flags); + }); + } + get_attr_errorator::future get_attr( + CollectionRef c, + const ghobject_t &oid, + std::string_view name) const final { + return proxy(&T::get_attr, std::move(c), oid, std::string(name)); + } + get_attrs_ertr::future get_attrs( + CollectionRef c, + const ghobject_t &oid) final { + return proxy(&T::get_attrs, std::move(c), oid); + } + seastar::future stat( + CollectionRef c, + const ghobject_t &oid) final { + return crimson::submit_to( + core, + [this, c, oid] { + return impl->stat(c, oid); + }); + } + read_errorator::future omap_get_values( + CollectionRef c, + const ghobject_t &oid, + const omap_keys_t &keys) final { + return crimson::submit_to(core, [this, c, oid, keys] { + return impl->omap_get_values(c, oid, keys); + }); + } + seastar::future, ghobject_t>> + list_objects( + CollectionRef c, + const ghobject_t &start, + const ghobject_t &end, + uint64_t limit) const final { + return proxy(&T::list_objects, std::move(c), start, end, limit); + } + read_errorator::future> omap_get_values( + CollectionRef c, + const ghobject_t &oid, + const std::optional &start) final { + return crimson::submit_to(core, [this, c, oid, start] { + return impl->omap_get_values(c, oid, start); + }); + } + get_attr_errorator::future omap_get_header( + CollectionRef c, + const ghobject_t &oid) final { + return proxy(&T::omap_get_header, std::move(c), oid); + } + seastar::future create_new_collection(const coll_t &cid) final { + return proxy(&T::create_new_collection, cid); + } + seastar::future open_collection(const coll_t &cid) final { + return proxy(&T::open_collection, cid); + } + seastar::future> list_collections() final { + return proxy(&T::list_collections); + } + seastar::future<> do_transaction( + CollectionRef ch, + ceph::os::Transaction &&txn) final { + return proxy(&T::do_transaction, std::move(ch), std::move(txn)); + } + seastar::future<> flush(CollectionRef ch) final { + return proxy(&T::flush, std::move(ch)); + } + seastar::future<> inject_data_error(const ghobject_t &o) final { + return proxy(&T::inject_data_error, o); + } + seastar::future<> inject_mdata_error(const ghobject_t &o) final { + return proxy(&T::inject_mdata_error, o); + } + + seastar::future get_omap_iterator( + CollectionRef ch, + const ghobject_t &oid) final { + return crimson::submit_to( + core, + [this, ch=std::move(ch), oid]() mutable { + return impl->get_omap_iterator( + std::move(ch), oid + ).then([](auto iref) { + return seastar::foreign_ptr(iref); + }); + }).then([this](auto iref) { + return OmapIteratorRef(new _OmapIterator(core, std::move(iref))); + }); + } + read_errorator::future> fiemap( + CollectionRef ch, + const ghobject_t &oid, + uint64_t off, + uint64_t len) final { + return proxy(&T::fiemap, std::move(ch), oid, off, len); + } + + seastar::future<> write_meta( + const std::string &key, + const std::string &value) final { + return proxy(&T::write_meta, key, value); + } + seastar::future> read_meta( + const std::string &key) final { + return proxy(&T::read_meta, key); + } + uuid_d get_fsid() const final { + return fsid; + } + unsigned get_max_attr_name_length() const final { + return max_attr; + } +}; + } -- 2.39.5