]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/futurized_store: create ShardedStoreProxy
authorSamuel Just <sjust@redhat.com>
Wed, 24 Aug 2022 20:16:28 +0000 (13:16 -0700)
committerSamuel Just <sjust@redhat.com>
Tue, 27 Sep 2022 02:35:38 +0000 (19:35 -0700)
For now, FuturizedStore implementations assume that methods are invoked
on core 0.  Later, we'll adapt each implementation to intelligently
support invocation on any pg core.  Until then, this wrapper converts
the existing implementations to a safe, if not particuarly performant,
proxy behavior.

AlienStore should be safe as is.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/os/futurized_store.cc
src/crimson/os/futurized_store.h

index e072c0d262bd70154ee5871c943450afef426252..24f632b9cb7907f026bce27df3ae6f80c3d2d054 100644 (file)
@@ -16,20 +16,23 @@ FuturizedStore::create(const std::string& type,
                        const ConfigValues& values)
 {
   if (type == "cyanstore") {
+    using crimson::os::CyanStore;
     return seastar::make_ready_future<std::unique_ptr<FuturizedStore>>(
-      std::make_unique<crimson::os::CyanStore>(data));
+      std::make_unique<ShardedStoreProxy<CyanStore>>(data));
   } else if (type == "seastore") {
     return crimson::os::seastore::make_seastore(
       data, values
     ).then([] (auto seastore) {
       return seastar::make_ready_future<std::unique_ptr<FuturizedStore>>(
-       seastore.release());
+       std::make_unique<ShardedStoreProxy<seastore::SeaStore>>(
+         seastore.release()));
     });
   } else {
+    using crimson::os::AlienStore;
 #ifdef WITH_BLUESTORE
     // use AlienStore as a fallback. It adapts e.g. BlueStore.
     return seastar::make_ready_future<std::unique_ptr<FuturizedStore>>(
-      std::make_unique<crimson::os::AlienStore>(type, data, values));
+      std::make_unique<AlienStore>(type, data, values));
 #else
     ceph_abort_msgf("unsupported objectstore type: %s", type.c_str());
     return {};
index 20f3a81f7dbdf39f761146d8253b408014072f38..0122625fba055d2151a87818bdd55daca711ddad 100644 (file)
@@ -11,6 +11,7 @@
 #include <seastar/core/future.hh>
 
 #include "os/Transaction.h"
+#include "crimson/common/smp_helpers.h"
 #include "crimson/osd/exceptions.h"
 #include "include/buffer_fwd.h"
 #include "include/uuid.h"
@@ -185,4 +186,251 @@ inline void intrusive_ptr_release(FuturizedStore::OmapIterator* iter) {
   }
 }
 
+/**
+ * ShardedStoreProxy
+ *
+ * Simple helper to proxy FuturizedStore operations to the core on which
+ * the store was initialized for implementations without support for multiple
+ * reactors.
+ */
+template <typename T>
+class ShardedStoreProxy : public FuturizedStore {
+  const core_id_t core;
+  std::unique_ptr<T> impl;
+  uuid_d fsid;
+  unsigned max_attr = 0;
+
+  template <typename Method, typename... Args>
+  decltype(auto) proxy(Method method, Args&&... args) const {
+    return proxy_method_on_core(
+      core, *impl, method, std::forward<Args>(args)...);
+  }
+
+  template <typename Method, typename... Args>
+  decltype(auto) proxy(Method method, Args&&... args) {
+    return proxy_method_on_core(
+      core, *impl, method, std::forward<Args>(args)...);
+  }
+
+  /**
+   * _OmapIterator
+   *
+   * Proxies OmapIterator operations to store's core.  Assumes that
+   * syncronous methods are safe to call directly from calling core
+   * since remote store should only be touching that memory during
+   * a method invocation.
+   *
+   * TODO: We don't really need OmapIterator at all, replace it with
+   * an appropriately paged omap_get_values variant.
+   */
+  class _OmapIterator : public OmapIterator {
+    using fref_t = seastar::foreign_ptr<OmapIteratorRef>;
+    const core_id_t core;
+    fref_t impl;
+
+    template <typename Method, typename... Args>
+    decltype(auto) proxy(Method method, Args&&... args) {
+      return proxy_method_on_core(
+       core, *impl, method, std::forward<Args>(args)...);
+    }
+
+  public:
+    _OmapIterator(core_id_t core, fref_t &&impl)
+      : core(core), impl(std::move(impl)) {}
+
+    seastar::future<> seek_to_first() final {
+      return proxy(&OmapIterator::seek_to_first);
+    }
+    seastar::future<> upper_bound(const std::string &after) final {
+      return proxy(&OmapIterator::upper_bound, after);
+    }
+    seastar::future<> lower_bound(const std::string &to) final {
+      return proxy(&OmapIterator::lower_bound, to);
+    }
+    bool valid() const final {
+      return impl->valid();
+    }
+    seastar::future<> next() final {
+      return proxy(&OmapIterator::next);
+    }
+    std::string key() final {
+      return impl->key();
+    }
+    ceph::buffer::list value() final {
+      return impl->value();
+    }
+    int status() const final {
+      return impl->status();
+    }
+    ~_OmapIterator() = default;
+  };
+
+
+public:
+  ShardedStoreProxy(T *t)
+    : core(seastar::this_shard_id()),
+      impl(t) {}
+  template <typename... Args>
+  ShardedStoreProxy(Args&&... args)
+    : core(seastar::this_shard_id()),
+      impl(std::make_unique<T>(std::forward<Args>(args)...)) {}
+  ~ShardedStoreProxy() = default;
+
+  // no copying
+  explicit ShardedStoreProxy(const ShardedStoreProxy &o) = delete;
+  const ShardedStoreProxy &operator=(const ShardedStoreProxy &o) = delete;
+
+  seastar::future<> start() final { return proxy(&T::start); }
+  seastar::future<> stop() final { return proxy(&T::stop); }
+  mount_ertr::future<> mount() final {
+    auto ret = seastar::smp::submit_to(
+      core,
+      [this] {
+       auto ret = impl->mount(
+       ).safe_then([this] {
+         fsid = impl->get_fsid();
+         max_attr = impl->get_max_attr_name_length();
+         return seastar::now();
+       });
+       return std::move(ret).to_base();
+      });
+    return mount_ertr::future<>(std::move(ret));
+  }
+  seastar::future<> umount() final { return proxy(&T::umount); }
+  mkfs_ertr::future<> mkfs(uuid_d new_osd_fsid) final {
+    return proxy(&T::mkfs, new_osd_fsid);
+  }
+  seastar::future<store_statfs_t> stat() const final {
+    return crimson::submit_to(core, [this] { return impl->stat(); });
+  }
+  read_errorator::future<ceph::bufferlist> read(
+    CollectionRef c,
+    const ghobject_t &oid,
+    uint64_t offset,
+    size_t len,
+    uint32_t op_flags = 0) final {
+    return proxy(&T::read, std::move(c), oid, offset, len, op_flags);
+  }
+  read_errorator::future<ceph::bufferlist> readv(
+    CollectionRef c,
+    const ghobject_t &oid,
+    interval_set<uint64_t> &m,
+    uint32_t op_flags = 0) final {
+    return crimson::submit_to(core, [this, c, oid, m, op_flags]() mutable {
+      return impl->readv(c, oid, m, op_flags);
+    });
+  }
+  get_attr_errorator::future<ceph::bufferlist> get_attr(
+    CollectionRef c,
+    const ghobject_t &oid,
+    std::string_view name) const final {
+    return proxy(&T::get_attr, std::move(c), oid, std::string(name));
+  }
+  get_attrs_ertr::future<attrs_t> get_attrs(
+    CollectionRef c,
+    const ghobject_t &oid) final {
+    return proxy(&T::get_attrs, std::move(c), oid);
+  }
+  seastar::future<struct stat> stat(
+    CollectionRef c,
+    const ghobject_t &oid) final {
+    return crimson::submit_to(
+      core,
+      [this, c, oid] {
+       return impl->stat(c, oid);
+      });
+  }
+  read_errorator::future<omap_values_t> omap_get_values(
+    CollectionRef c,
+    const ghobject_t &oid,
+    const omap_keys_t &keys) final {
+    return crimson::submit_to(core, [this, c, oid, keys] {
+      return impl->omap_get_values(c, oid, keys);
+    });
+  }
+  seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
+  list_objects(
+    CollectionRef c,
+    const ghobject_t &start,
+    const ghobject_t &end,
+    uint64_t limit) const final {
+    return proxy(&T::list_objects, std::move(c), start, end, limit);
+  }
+  read_errorator::future<std::tuple<bool, omap_values_t>> omap_get_values(
+    CollectionRef c,
+    const ghobject_t &oid,
+    const std::optional<std::string> &start) final {
+    return crimson::submit_to(core, [this, c, oid, start] {
+      return impl->omap_get_values(c, oid, start);
+    });
+  }
+  get_attr_errorator::future<bufferlist> omap_get_header(
+    CollectionRef c,
+    const ghobject_t &oid) final {
+    return proxy(&T::omap_get_header, std::move(c), oid);
+  }
+  seastar::future<CollectionRef> create_new_collection(const coll_t &cid) final {
+    return proxy(&T::create_new_collection, cid);
+  }
+  seastar::future<CollectionRef> open_collection(const coll_t &cid) final {
+    return proxy(&T::open_collection, cid);
+  }
+  seastar::future<std::vector<coll_t>> list_collections() final {
+    return proxy(&T::list_collections);
+  }
+  seastar::future<> do_transaction(
+    CollectionRef ch,
+    ceph::os::Transaction &&txn) final {
+    return proxy(&T::do_transaction, std::move(ch), std::move(txn));
+  }
+  seastar::future<> flush(CollectionRef ch) final {
+    return proxy(&T::flush, std::move(ch));
+  }
+  seastar::future<> inject_data_error(const ghobject_t &o) final {
+    return proxy(&T::inject_data_error, o);
+  }
+  seastar::future<> inject_mdata_error(const ghobject_t &o) final {
+    return proxy(&T::inject_mdata_error, o);
+  }
+
+  seastar::future<OmapIteratorRef> get_omap_iterator(
+    CollectionRef ch,
+    const ghobject_t &oid) final {
+    return crimson::submit_to(
+      core,
+      [this, ch=std::move(ch), oid]() mutable {
+       return impl->get_omap_iterator(
+         std::move(ch), oid
+       ).then([](auto iref) {
+         return seastar::foreign_ptr(iref);
+       });
+      }).then([this](auto iref) {
+       return OmapIteratorRef(new _OmapIterator(core, std::move(iref)));
+      });
+  }
+  read_errorator::future<std::map<uint64_t, uint64_t>> fiemap(
+    CollectionRef ch,
+    const ghobject_t &oid,
+    uint64_t off,
+    uint64_t len) final {
+    return proxy(&T::fiemap, std::move(ch), oid, off, len);
+  }
+
+  seastar::future<> write_meta(
+    const std::string &key,
+    const std::string &value) final {
+    return proxy(&T::write_meta, key, value);
+  }
+  seastar::future<std::tuple<int, std::string>> read_meta(
+    const std::string &key) final {
+    return proxy(&T::read_meta, key);
+  }
+  uuid_d get_fsid() const final {
+    return fsid;
+  }
+  unsigned get_max_attr_name_length() const final {
+    return max_attr;
+  }
+};
+
 }