From 24a5e6e9851b66cb90a04fc9639362d2cad668c7 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Tue, 13 Aug 2019 20:36:49 -0400 Subject: [PATCH] neorados: Create new library This library is UNSTABLE and I reserve the right to change its interface at any time for any reason. Signed-off-by: Adam C. Emerson --- src/CMakeLists.txt | 1 + src/common/config.cc | 2 +- src/common/entity_name.cc | 22 +- src/common/entity_name.h | 16 +- src/common/snap_types.h | 2 +- src/include/neorados/RADOS.hpp | 1119 +++++++++++ .../{RADOS => neorados}/RADOS_Decodable.hpp | 10 +- src/include/neorados/buffer_fwd.h | 1 + src/include/neorados/completion.h | 1 + src/include/object.h | 7 +- src/mon/ConfigMap.cc | 2 +- src/mon/ConfigMap.h | 4 +- src/mount/conf.cc | 7 +- src/neorados/CMakeLists.txt | 37 + src/neorados/RADOS.cc | 1658 +++++++++++++++++ src/neorados/RADOSImpl.cc | 112 ++ src/neorados/RADOSImpl.h | 103 + src/osdc/Objecter.cc | 12 +- src/osdc/Objecter.h | 18 +- src/test/CMakeLists.txt | 1 + src/test/neorados/CMakeLists.txt | 2 + src/test/neorados/start_stop.cc | 175 ++ 22 files changed, 3265 insertions(+), 47 deletions(-) create mode 100644 src/include/neorados/RADOS.hpp rename src/include/{RADOS => neorados}/RADOS_Decodable.hpp (93%) create mode 120000 src/include/neorados/buffer_fwd.h create mode 120000 src/include/neorados/completion.h create mode 100644 src/neorados/CMakeLists.txt create mode 100644 src/neorados/RADOS.cc create mode 100644 src/neorados/RADOSImpl.cc create mode 100644 src/neorados/RADOSImpl.h create mode 100644 src/test/neorados/CMakeLists.txt create mode 100644 src/test/neorados/start_stop.cc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index afedbfdd9da2f..6d829e0542d13 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -490,6 +490,7 @@ option(WITH_LIBRADOSSTRIPER "build with libradosstriper support" ON) add_subdirectory(include) add_subdirectory(librados) +add_subdirectory(neorados) if(WITH_LIBRADOSSTRIPER) add_subdirectory(libradosstriper) diff --git a/src/common/config.cc b/src/common/config.cc index 792cf5fa68cb4..a8cba940384b8 100644 --- a/src/common/config.cc +++ b/src/common/config.cc @@ -1319,7 +1319,7 @@ void md_config_t::_get_my_sections(const ConfigValues& values, { sections.push_back(values.name.to_str()); - sections.push_back(values.name.get_type_name()); + sections.push_back(values.name.get_type_name().data()); sections.push_back("global"); } diff --git a/src/common/entity_name.cc b/src/common/entity_name.cc index 37d02bd94df46..2eb24829a1c86 100644 --- a/src/common/entity_name.cc +++ b/src/common/entity_name.cc @@ -42,22 +42,22 @@ to_cstr() const } bool EntityName:: -from_str(const string& s) +from_str(std::string_view s) { size_t pos = s.find('.'); if (pos == string::npos) return false; - - string type_ = s.substr(0, pos); - string id_ = s.substr(pos + 1); + + auto type_ = s.substr(0, pos); + auto id_ = s.substr(pos + 1); if (set(type_, id_)) return false; return true; } void EntityName:: -set(uint32_t type_, const std::string &id_) +set(uint32_t type_, std::string_view id_) { type = type_; id = id_; @@ -72,9 +72,9 @@ set(uint32_t type_, const std::string &id_) } int EntityName:: -set(const std::string &type_, const std::string &id_) +set(std::string_view type_, std::string_view id_) { - uint32_t t = str_to_ceph_entity_type(type_.c_str()); + uint32_t t = str_to_ceph_entity_type(type_); if (t == CEPH_ENTITY_TYPE_ANY) return -EINVAL; set(t, id_); @@ -88,13 +88,13 @@ set_type(uint32_t type_) } int EntityName:: -set_type(const char *type_) +set_type(std::string_view type_) { return set(type_, id); } void EntityName:: -set_id(const std::string &id_) +set_id(std::string_view id_) { set(type, id_); } @@ -106,13 +106,13 @@ void EntityName::set_name(entity_name_t n) set(n.type(), s); } -const char* EntityName:: +std::string_view EntityName:: get_type_str() const { return ceph_entity_type_name(type); } -const char *EntityName:: +std::string_view EntityName:: get_type_name() const { return ceph_entity_type_name(type); diff --git a/src/common/entity_name.h b/src/common/entity_name.h index 1dd56f66d740a..886c4b4946f8e 100644 --- a/src/common/entity_name.h +++ b/src/common/entity_name.h @@ -15,6 +15,8 @@ #ifndef CEPH_COMMON_ENTITY_NAME_H #define CEPH_COMMON_ENTITY_NAME_H +#include + #include #include "msg/msg_types.h" @@ -42,15 +44,15 @@ struct EntityName const std::string& to_str() const; const char *to_cstr() const; - bool from_str(const std::string& s); - void set(uint32_t type_, const std::string &id_); - int set(const std::string &type_, const std::string &id_); + bool from_str(std::string_view s); + void set(uint32_t type_, std::string_view id_); + int set(std::string_view type_, std::string_view id_); void set_type(uint32_t type_); - int set_type(const char *type); - void set_id(const std::string &id_); + int set_type(std::string_view type); + void set_id(std::string_view id_); void set_name(entity_name_t n); - const char* get_type_str() const; + std::string_view get_type_str() const; uint32_t get_type() const { return type; } bool is_osd() const { return get_type() == CEPH_ENTITY_TYPE_OSD; } @@ -59,7 +61,7 @@ struct EntityName bool is_client() const { return get_type() == CEPH_ENTITY_TYPE_CLIENT; } bool is_mon() const { return get_type() == CEPH_ENTITY_TYPE_MON; } - const char * get_type_name() const; + std::string_view get_type_name() const; const std::string &get_id() const; bool has_default_id() const; diff --git a/src/common/snap_types.h b/src/common/snap_types.h index 175183227d143..958aea339a183 100644 --- a/src/common/snap_types.h +++ b/src/common/snap_types.h @@ -53,7 +53,7 @@ struct SnapContext { seq = 0; snaps.clear(); } - bool empty() { return seq == 0; } + bool empty() const { return seq == 0; } void encode(ceph::buffer::list& bl) const { using ceph::encode; diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp new file mode 100644 index 0000000000000..288268b1505b9 --- /dev/null +++ b/src/include/neorados/RADOS.hpp @@ -0,0 +1,1119 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef NEORADOS_RADOS_HPP +#define NEORADOS_RADOS_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include + +// Will be in C++20! + +#include "include/expected.hpp" + +// Had better be in C++20. Why is this not in Boost? + +#include "include/function2.hpp" + +// Things broken out so we can decode them in Objecter. + +#include "include/neorados/RADOS_Decodable.hpp" + +// Needed for type erasure and template support. We can't really avoid +// it. + +#include "common/async/completion.h" + +// These are needed for RGW, but in general as a 'shiny new interface' +// we should try to use forward declarations and provide standard alternatives. + +#include "include/common_fwd.h" + +#include "include/buffer.h" + +#include "common/ceph_time.h" + +namespace neorados { +using namespace std::literals; + +class Object; +class IOContext; +} +namespace std { +template<> +struct hash; +template<> +struct hash; +} + +namespace neorados { +namespace detail { +class RADOS; +} + +class RADOS; + +// Exists mostly so that repeated operations on the same object don't +// have to pay for the string copy to construct an object_t. + +class Object final { + friend RADOS; + friend std::hash; + +public: + Object(); + Object(const char* s); + Object(std::string_view s); + Object(std::string&& s); + Object(const std::string& s); + ~Object(); + + Object(const Object& o); + Object& operator =(const Object& o); + + Object(Object&& o); + Object& operator =(Object&& o); + + operator std::string_view() const; + + friend std::ostream& operator <<(std::ostream& m, const Object& o); + friend bool operator <(const Object& lhs, const Object& rhs); + friend bool operator <=(const Object& lhs, const Object& rhs); + friend bool operator >=(const Object& lhs, const Object& rhs); + friend bool operator >(const Object& lhs, const Object& rhs); + + friend bool operator ==(const Object& lhs, const Object& rhs); + friend bool operator !=(const Object& lhs, const Object& rhs); + +private: + + static constexpr std::size_t impl_size = 4 * 8; + std::aligned_storage_t impl; +}; + +// Not the same as the librados::IoCtx, but it does gather together +// some of the same metadata. Since we're likely to do multiple +// operations in the same pool or namespace, it doesn't make sense to +// redo a bunch of lookups and string copies. + +class IOContext final { + friend RADOS; + friend std::hash; + +public: + + IOContext(); + explicit IOContext(std::int64_t pool); + IOContext(std::int64_t _pool, std::string_view _ns); + IOContext(std::int64_t _pool, std::string&& _ns); + ~IOContext(); + + IOContext(const IOContext& rhs); + IOContext& operator =(const IOContext& rhs); + + IOContext(IOContext&& rhs); + IOContext& operator =(IOContext&& rhs); + + std::int64_t pool() const; + void pool(std::int64_t _pool); + + std::string_view ns() const; + void ns(std::string_view _ns); + void ns(std::string&& _ns); + + std::optional key() const; + void key(std::string_view _key); + void key(std::string&& _key); + void clear_key(); + + std::optional hash() const; + void hash(std::int64_t _hash); + void clear_hash(); + + std::optional read_snap() const; + void read_snap(std::optional _snapid); + + // I can't actually move-construct here since snapid_t is its own + // separate class type, not an alias. + std::optional< + std::pair>> write_snap_context() const; + void write_snap_context(std::optional< + std::pair>> snapc); + + friend std::ostream& operator <<(std::ostream& m, const IOContext& o); + friend bool operator <(const IOContext& lhs, const IOContext& rhs); + friend bool operator <=(const IOContext& lhs, const IOContext& rhs); + friend bool operator >=(const IOContext& lhs, const IOContext& rhs); + friend bool operator >(const IOContext& lhs, const IOContext& rhs); + + friend bool operator ==(const IOContext& lhs, const IOContext& rhs); + friend bool operator !=(const IOContext& lhs, const IOContext& rhs); + +private: + + static constexpr std::size_t impl_size = 16 * 8; + std::aligned_storage_t impl; +}; + +inline constexpr std::string_view all_nspaces("\001"sv); + +enum class cmpxattr_op : std::uint8_t { + eq = 1, + ne = 2, + gt = 3, + gte = 4, + lt = 5, + lte = 6 +}; + +namespace alloc_hint { +enum alloc_hint_t { + sequential_write = 1, + random_write = 2, + sequential_read = 4, + random_read = 8, + append_only = 16, + immutable = 32, + shortlived = 64, + longlived = 128, + compressible = 256, + incompressible = 512 +}; +} + +class Op { + friend RADOS; + +public: + + Op(const Op&) = delete; + Op& operator =(const Op&) = delete; + Op(Op&&); + Op& operator =(Op&&); + ~Op(); + + void set_excl(); + void set_failok(); + void set_fadvise_random(); + void set_fadvise_sequential(); + void set_fadvise_willneed(); + void set_fadvise_dontneed(); + void set_fadvise_nocache(); + + void cmpext(uint64_t off, ceph::buffer::list&& cmp_bl, std::size_t* s); + void cmpxattr(std::string_view name, cmpxattr_op op, + const ceph::buffer::list& val); + void cmpxattr(std::string_view name, cmpxattr_op op, std::uint64_t val); + void assert_version(uint64_t ver); + void assert_exists(); + void cmp_omap(const boost::container::flat_map< + std::string, + std::pair>& assertions); + + void exec(std::string_view cls, std::string_view method, + const ceph::buffer::list& inbl, + ceph::buffer::list* out, + boost::system::error_code* ec = nullptr); + void exec(std::string_view cls, std::string_view method, + const ceph::buffer::list& inbl, + fu2::unique_function f); + void exec(std::string_view cls, std::string_view method, + const ceph::buffer::list& inbl, + fu2::unique_function f); + void exec(std::string_view cls, std::string_view method, + const ceph::buffer::list& inbl, + boost::system::error_code* ec = nullptr); + + + // Flags that apply to all ops in the operation vector + void balance_reads(); + void localize_reads(); + void order_reads_writes(); + void ignore_cache(); + void skiprwlocks(); + void ignore_overlay(); + void full_try(); + void full_force(); + void ignore_redirect(); + void ordersnap(); + void returnvec(); + + std::size_t size() const; + using Signature = void(boost::system::error_code); + using Completion = ceph::async::Completion; + + friend std::ostream& operator <<(std::ostream& m, const Op& o); +protected: + Op(); + static constexpr std::size_t impl_size = 85 * 8; + std::aligned_storage_t impl; +}; + +// This class is /not/ thread-safe. If you want you can wrap it in +// something that locks it. + +class ReadOp final : public Op { + friend RADOS; + +public: + + ReadOp() = default; + ReadOp(const ReadOp&) = delete; + ReadOp(ReadOp&&) = default; + + ReadOp& operator =(const ReadOp&) = delete; + ReadOp& operator =(ReadOp&&) = default; + + void read(size_t off, uint64_t len, ceph::buffer::list* out, + boost::system::error_code* ec = nullptr); + void get_xattr(std::string_view name, ceph::buffer::list* out, + boost::system::error_code* ec = nullptr); + void get_omap_header(ceph::buffer::list*, + boost::system::error_code* ec = nullptr); + + void sparse_read(uint64_t off, uint64_t len, + ceph::buffer::list* out, + std::vector>* extents, + boost::system::error_code* ec = nullptr); + + void stat(std::uint64_t* size, ceph::real_time* mtime, + boost::system::error_code* ec = nullptr); + + void get_omap_keys(std::optional start_after, + std::uint64_t max_return, + boost::container::flat_set* keys, + bool* truncated, + boost::system::error_code* ec = nullptr); + + + void get_xattrs(boost::container::flat_map* kv, + boost::system::error_code* ec = nullptr); + + void get_omap_vals(std::optional start_after, + std::optional filter_prefix, + uint64_t max_return, + boost::container::flat_map* kv, + bool* truncated, + boost::system::error_code* ec = nullptr); + + + void get_omap_vals_by_keys(const boost::container::flat_set& keys, + boost::container::flat_map* kv, + boost::system::error_code* ec = nullptr); + + void list_watchers(std::vector* watchers, + boost::system::error_code* ec = nullptr); + + void list_snaps(struct SnapSet* snaps, + boost::system::error_code* ec = nullptr); +}; + +class WriteOp final : public Op { + friend RADOS; +public: + + WriteOp() = default; + WriteOp(const WriteOp&) = delete; + WriteOp(WriteOp&&) = default; + + WriteOp& operator =(const WriteOp&) = delete; + WriteOp& operator =(WriteOp&&) = default; + + void set_mtime(ceph::real_time t); + void create(bool exclusive); + void write(uint64_t off, ceph::buffer::list&& bl); + void write_full(ceph::buffer::list&& bl); + void writesame(std::uint64_t off, std::uint64_t write_len, + ceph::buffer::list&& bl); + void append(ceph::buffer::list&& bl); + void remove(); + void truncate(uint64_t off); + void zero(uint64_t off, uint64_t len); + void rmxattr(std::string_view name); + void setxattr(std::string_view name, + ceph::buffer::list&& bl); + void rollback(uint64_t snapid); + void set_omap(const boost::container::flat_map& map); + void set_omap_header(ceph::buffer::list&& bl); + void clear_omap(); + void rm_omap_keys(const boost::container::flat_set& to_rm); + void set_alloc_hint(uint64_t expected_object_size, + uint64_t expected_write_size, + alloc_hint::alloc_hint_t flags); +}; + + +struct FSStats { + uint64_t kb; + uint64_t kb_used; + uint64_t kb_avail; + uint64_t num_objects; +}; + +// From librados.h, maybe move into a common file. But I want to see +// if we need/want to amend/add/remove anything first. +struct PoolStats { + /// space used in bytes + uint64_t num_bytes; + /// space used in KB + uint64_t num_kb; + /// number of objects in the pool + uint64_t num_objects; + /// number of clones of objects + uint64_t num_object_clones; + /// num_objects * num_replicas + uint64_t num_object_copies; + /// number of objects missing on primary + uint64_t num_objects_missing_on_primary; + /// number of objects found on no OSDs + uint64_t num_objects_unfound; + /// number of objects replicated fewer times than they should be + /// (but found on at least one OSD) + uint64_t num_objects_degraded; + /// number of objects read + uint64_t num_rd; + /// objects read in KB + uint64_t num_rd_kb; + /// number of objects written + uint64_t num_wr; + /// objects written in KB + uint64_t num_wr_kb; + /// bytes originally provided by user + uint64_t num_user_bytes; + /// bytes passed compression + uint64_t compressed_bytes_orig; + /// bytes resulted after compression + uint64_t compressed_bytes; + /// bytes allocated at storage + uint64_t compressed_bytes_alloc; +}; + +// Placement group, for PG commands +struct PG { + uint64_t pool; + uint32_t seed; +}; + +class Cursor final { +public: + static Cursor begin(); + static Cursor end(); + + Cursor(); + Cursor(const Cursor&); + Cursor& operator =(const Cursor&); + Cursor(Cursor&&); + Cursor& operator =(Cursor&&); + ~Cursor(); + + friend bool operator ==(const Cursor& lhs, + const Cursor& rhs); + friend bool operator !=(const Cursor& lhs, + const Cursor& rhs); + friend bool operator <(const Cursor& lhs, + const Cursor& rhs); + friend bool operator <=(const Cursor& lhs, + const Cursor& rhs); + friend bool operator >=(const Cursor& lhs, + const Cursor& rhs); + friend bool operator >(const Cursor& lhs, + const Cursor& rhs); + + std::string to_str() const; + static std::optional from_str(const std::string& s); + +private: + struct end_magic_t {}; + Cursor(end_magic_t); + Cursor(void*); + friend RADOS; + static constexpr std::size_t impl_size = 16 * 8; + std::aligned_storage_t impl; +}; + +class RADOS final +{ +public: + static constexpr std::tuple version() { + return {0, 0, 1}; + } + + using BuildSig = void(boost::system::error_code, RADOS); + using BuildComp = ceph::async::Completion; + class Builder { + std::optional conf_files; + std::optional cluster; + std::optional name; + std::vector> configs; + bool no_default_conf = false; + bool no_mon_conf = false; + + public: + Builder() = default; + Builder& add_conf_file(std::string_view v); + Builder& set_cluster(std::string_view c) { + cluster = std::string(c); + return *this; + } + Builder& set_name(std::string_view n) { + name = std::string(n); + return *this; + } + Builder& set_no_default_conf() { + no_default_conf = true; + return *this; + } + Builder& set_no_mon_conf() { + no_mon_conf = true; + return *this; + } + Builder& set_conf_option(std::string_view opt, std::string_view val) { + configs.emplace_back(std::string(opt), std::string(val)); + return *this; + } + + template + auto build(boost::asio::io_context& ioctx, CompletionToken&& token) { + boost::asio::async_completion init(token); + build(ioctx, + BuildComp::create(ioctx.get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + private: + void build(boost::asio::io_context& ioctx, + std::unique_ptr c); + }; + + + template + static auto make_with_cct(CephContext* cct, + boost::asio::io_context& ioctx, + CompletionToken&& token) { + boost::asio::async_completion init(token); + make_with_cct(cct, ioctx, + BuildComp::create(ioctx.get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + RADOS(const RADOS&) = delete; + RADOS& operator =(const RADOS&) = delete; + + RADOS(RADOS&&); + RADOS& operator =(RADOS&&); + + ~RADOS(); + + CephContext* cct(); + + using executor_type = boost::asio::io_context::executor_type; + executor_type get_executor() const; + boost::asio::io_context& get_io_context(); + + template + auto execute(const Object& o, const IOContext& ioc, ReadOp&& op, + ceph::buffer::list* bl, + CompletionToken&& token, uint64_t* objver = nullptr) { + boost::asio::async_completion init(token); + execute(o, ioc, std::move(op), bl, + ReadOp::Completion::create(get_executor(), + std::move(init.completion_handler)), + objver); + return init.result.get(); + } + + template + auto execute(const Object& o, const IOContext& ioc, WriteOp&& op, + CompletionToken&& token, uint64_t* objver = nullptr) { + boost::asio::async_completion init(token); + execute(o, ioc, std::move(op), + Op::Completion::create(get_executor(), + std::move(init.completion_handler)), + objver); + return init.result.get(); + } + + template + auto execute(const Object& o, std::int64_t pool, + ReadOp&& op, + ceph::buffer::list* bl, + CompletionToken&& token, + std::optional ns = {}, + std::optional key = {}, + uint64_t* objver = nullptr) { + boost::asio::async_completion init(token); + execute(o, pool, std::move(op), bl, + ReadOp::Completion::create(get_executor(), + std::move(init.completion_handler)), + ns, key, objver); + return init.result.get(); + } + + template + auto execute(const Object& o, std::int64_t pool, WriteOp&& op, + CompletionToken&& token, + std::optional ns = {}, + std::optional key = {}, + uint64_t* objver = nullptr) { + boost::asio::async_completion init(token); + execute(o, pool, std::move(op), + Op::Completion::create(get_executor(), + std::move(init.completion_handler)), + ns, key, objver); + return init.result.get(); + } + + boost::uuids::uuid get_fsid() const noexcept; + + using LookupPoolSig = void(boost::system::error_code, + std::int64_t); + using LookupPoolComp = ceph::async::Completion; + template + auto lookup_pool(std::string_view name, + CompletionToken&& token) { + boost::asio::async_completion init(token); + lookup_pool(name, + LookupPoolComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + std::optional get_pool_alignment(int64_t pool_id); + + using LSPoolsSig = void(std::vector>); + using LSPoolsComp = ceph::async::Completion; + template + auto list_pools(CompletionToken&& token) { + boost::asio::async_completion init(token); + list_pools(LSPoolsComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + + + using SimpleOpSig = void(boost::system::error_code); + using SimpleOpComp = ceph::async::Completion; + template + auto create_pool_snap(int64_t pool, std::string_view snapName, + CompletionToken&& token) { + boost::asio::async_completion init(token); + create_pool_snap(pool, snapName, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + using SMSnapSig = void(boost::system::error_code, std::uint64_t); + using SMSnapComp = ceph::async::Completion; + template + auto allocate_selfmanaged_snap(int64_t pool, + CompletionToken&& token) { + boost::asio::async_completion init(token); + allocate_selfmanaged_snap(pool, + SMSnapComp::create( + get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto delete_pool_snap(int64_t pool, std::string_view snapName, + CompletionToken&& token) { + boost::asio::async_completion init(token); + delete_pool_snap(pool, snapName, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto delete_selfmanaged_snap(int64_t pool, std::string_view snapName, + CompletionToken&& token) { + boost::asio::async_completion init(token); + delete_selfmanaged_snap(pool, snapName, + SimpleOpComp::create( + get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto create_pool(std::string_view name, std::optional crush_rule, + CompletionToken&& token) { + boost::asio::async_completion init(token); + create_pool(name, crush_rule, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto delete_pool(std::string_view name, + CompletionToken&& token) { + boost::asio::async_completion init(token); + delete_pool(name, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto delete_pool(int64_t pool, + CompletionToken&& token) { + boost::asio::async_completion init(token); + delete_pool(pool, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + using PoolStatSig = void(boost::system::error_code, + boost::container::flat_map, bool); + using PoolStatComp = ceph::async::Completion; + template + auto stat_pools(const std::vector& pools, + CompletionToken&& token) { + boost::asio::async_completion init(token); + stat_pools(pools, + PoolStatComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + using StatFSSig = void(boost::system::error_code, + FSStats); + using StatFSComp = ceph::async::Completion; + template + auto statfs(std::optional pool, + CompletionToken&& token) { + boost::asio::async_completion init(token); + ceph_statfs(pool, StatFSComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + using WatchCB = fu2::unique_function; + + using WatchSig = void(boost::system::error_code ec, + uint64_t cookie); + using WatchComp = ceph::async::Completion; + template + auto watch(const Object& o, const IOContext& ioc, + std::optional timeout, + WatchCB&& cb, CompletionToken&& token) { + boost::asio::async_completion init(token); + watch(o, ioc, timeout, std::move(cb), + WatchComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto watch(const Object& o, std::int64_t pool, + std::optional timeout, + WatchCB&& cb, CompletionToken&& token, + std::optional ns = {}, + std::optional key = {}) { + boost::asio::async_completion init(token); + watch(o, pool, timeout, std::move(cb), + WatchComp::create(get_executor(), + std::move(init.completion_handler)), + ns, key); + return init.result.get(); + } + + template + auto notify_ack(const Object& o, + const IOContext& ioc, + uint64_t notify_id, + uint64_t cookie, + ceph::buffer::list&& bl, + CompletionToken&& token) { + boost::asio::async_completion init(token); + notify_ack(o, ioc, notify_id, cookie, std::move(bl), + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto notify_ack(const Object& o, + std::int64_t pool, + uint64_t notify_id, + uint64_t cookie, + ceph::buffer::list&& bl, + CompletionToken&& token, + std::optional ns = {}, + std::optional key = {}) { + boost::asio::async_completion init(token); + notify_ack(o, pool, notify_id, cookie, std::move(bl), + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler)), + ns, key); + return init.result.get(); + } + + template + auto unwatch(uint64_t cookie, const IOContext& ioc, + CompletionToken&& token) { + boost::asio::async_completion init(token); + unwatch(cookie, ioc, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto unwatch(uint64_t cookie, std::int64_t pool, + CompletionToken&& token, + std::optional ns = {}, + std::optional key = {}) { + boost::asio::async_completion init(token); + unwatch(cookie, pool, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler)), + ns, key); + return init.result.get(); + } + + // This is one of those places where having to force everything into + // a .cc file is really infuriating. If we had modules, that would + // let us separate out the implementation details without + // sacrificing all the benefits of templates. + using VoidOpSig = void(); + using VoidOpComp = ceph::async::Completion; + template + auto flush_watch(CompletionToken&& token) { + boost::asio::async_completion init(token); + flush_watch(VoidOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + using NotifySig = void(boost::system::error_code, ceph::buffer::list); + using NotifyComp = ceph::async::Completion; + template + auto notify(const Object& oid, const IOContext& ioc, ceph::buffer::list&& bl, + std::optional timeout, + CompletionToken&& token) { + boost::asio::async_completion init(token); + notify(oid, ioc, std::move(bl), timeout, + NotifyComp::create(get_executor(), + std::move(init.completion_handler))); + + return init.result.get(); + } + + template + auto notify(const Object& oid, std::int64_t pool, ceph::buffer::list&& bl, + std::optional timeout, + CompletionToken&& token, + std::optional ns = {}, + std::optional key = {}) { + boost::asio::async_completion init(token); + notify(oid, pool, bl, timeout, + NotifyComp::create(get_executor(), + std::move(init.completion_handler)), + ns, key); + + return init.result.get(); + } + + // The versions with pointers are fine for coroutines, but + // extraordinarily unappealing for callback-oriented programming. + using EnumerateSig = void(boost::system::error_code, + std::vector, + Cursor); + using EnumerateComp = ceph::async::Completion; + template + auto enumerate_objects(const IOContext& ioc, const Cursor& begin, + const Cursor& end, const std::uint32_t max, + const ceph::buffer::list& filter, + CompletionToken&& token) { + boost::asio::async_completion init(token); + enumerate_objects(ioc, begin, end, max, filter, + EnumerateComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto enumerate_objects(std::int64_t pool, const Cursor& begin, + const Cursor& end, const std::uint32_t max, + const ceph::buffer::list& filter, + CompletionToken&& token, + std::optional ns = {}, + std::optional key = {}) { + boost::asio::async_completion init(token); + enumerate_objects(pool, begin, end, max, filter, + EnumerateComp::create(get_executor(), + std::move(init.completion_handler)), + ns, key); + return init.result.get(); + } + + using CommandSig = void(boost::system::error_code, + std::string, ceph::buffer::list); + using CommandComp = ceph::async::Completion; + template + auto osd_command(int osd, std::vector&& cmd, + ceph::buffer::list&& in, CompletionToken&& token) { + boost::asio::async_completion init(token); + osd_command(osd, std::move(cmd), std::move(in), + CommandComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + template + auto pg_command(PG pg, std::vector&& cmd, + ceph::buffer::list&& in, CompletionToken&& token) { + boost::asio::async_completion init(token); + pg_command(pg, std::move(cmd), std::move(in), + CommandComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto mon_command(std::vector command, + const ceph::buffer::list& bl, + std::string* outs, ceph::buffer::list* outbl, + CompletionToken&& token) { + boost::asio::async_completion init(token); + mon_command(command, bl, outs, outbl, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + + template + auto enable_application(std::string_view pool, std::string_view app_name, + bool force, CompletionToken&& token) { + boost::asio::async_completion init(token); + enable_application(pool, app_name, force, + SimpleOpComp::create(get_executor(), + std::move(init.completion_handler))); + return init.result.get(); + } + uint64_t instance_id() const; + +private: + + RADOS(); + + friend Builder; + + RADOS(std::unique_ptr impl); + static void make_with_cct(CephContext* cct, + boost::asio::io_context& ioctx, + std::unique_ptr c); + + void execute(const Object& o, const IOContext& ioc, ReadOp&& op, + ceph::buffer::list* bl, std::unique_ptr c, + uint64_t* objver); + + void execute(const Object& o, const IOContext& ioc, WriteOp&& op, + std::unique_ptr c, uint64_t* objver); + + void execute(const Object& o, std::int64_t pool, ReadOp&& op, + ceph::buffer::list* bl, std::unique_ptr c, + std::optional ns, + std::optional key, + uint64_t* objver); + + void execute(const Object& o, std::int64_t pool, WriteOp&& op, + std::unique_ptr c, + std::optional ns, + std::optional key, + uint64_t* objver); + + void lookup_pool(std::string_view name, std::unique_ptr c); + void list_pools(std::unique_ptr c); + void create_pool_snap(int64_t pool, std::string_view snapName, + std::unique_ptr c); + void allocate_selfmanaged_snap(int64_t pool, std::unique_ptr c); + void delete_pool_snap(int64_t pool, std::string_view snapName, + std::unique_ptr c); + void delete_selfmanaged_snap(int64_t pool, std::uint64_t snap, + std::unique_ptr c); + void create_pool(std::string_view name, std::optional crush_rule, + std::unique_ptr c); + void delete_pool(std::string_view name, + std::unique_ptr c); + void delete_pool(int64_t pool, + std::unique_ptr c); + void stat_pools(const std::vector& pools, + std::unique_ptr c); + void stat_fs(std::optional pool, + std::unique_ptr c); + + void watch(const Object& o, const IOContext& ioc, + std::optional timeout, + WatchCB&& cb, std::unique_ptr c); + void watch(const Object& o, std::int64_t pool, + std::optional timeout, + WatchCB&& cb, std::unique_ptr c, + std::optional ns, + std::optional key); + tl::expected + watch_check(uint64_t cookie); + void notify_ack(const Object& o, + const IOContext& _ioc, + uint64_t notify_id, + uint64_t cookie, + ceph::buffer::list&& bl, + std::unique_ptr); + void notify_ack(const Object& o, + std::int64_t pool, + uint64_t notify_id, + uint64_t cookie, + ceph::buffer::list&& bl, + std::unique_ptr, + std::optional ns, + std::optional key); + void unwatch(uint64_t cookie, const IOContext& ioc, + std::unique_ptr); + void unwatch(uint64_t cookie, std::int64_t pool, + std::unique_ptr, + std::optional ns, + std::optional key); + void notify(const Object& oid, const IOContext& ioctx, + ceph::buffer::list&& bl, + std::optional timeout, + std::unique_ptr c); + void notify(const Object& oid, std::int64_t pool, + ceph::buffer::list&& bl, + std::optional timeout, + std::unique_ptr c, + std::optional ns, + std::optional key); + void flush_watch(std::unique_ptr); + + void enumerate_objects(const IOContext& ioc, const Cursor& begin, + const Cursor& end, const std::uint32_t max, + const ceph::buffer::list& filter, + std::vector* ls, + Cursor* cursor, + std::unique_ptr c); + void enumerate_objects(std::int64_t pool, const Cursor& begin, + const Cursor& end, const std::uint32_t max, + const ceph::buffer::list& filter, + std::vector* ls, + Cursor* cursor, + std::unique_ptr c, + std::optional ns, + std::optional key); + void enumerate_objects(const IOContext& ioc, const Cursor& begin, + const Cursor& end, const std::uint32_t max, + const ceph::buffer::list& filter, + std::unique_ptr c); + void enumerate_objects(std::int64_t pool, const Cursor& begin, + const Cursor& end, const std::uint32_t max, + const ceph::buffer::list& filter, + std::unique_ptr c, + std::optional ns, + std::optional key); + void osd_command(int osd, std::vector&& cmd, + ceph::buffer::list&& in, std::unique_ptr c); + void pg_command(PG pg, std::vector&& cmd, + ceph::buffer::list&& in, std::unique_ptr c); + + void mon_command(std::vector command, + const ceph::buffer::list& bl, + std::string* outs, ceph::buffer::list* outbl, + std::unique_ptr c); + + void enable_application(std::string_view pool, std::string_view app_name, + bool force, std::unique_ptr c); + + + // Since detail::RADOS has immovable things inside it, hold a + // unique_ptr to it so we can be moved. + std::unique_ptr impl; +}; + +enum class errc { + pool_dne = 1, + invalid_snapcontext +}; + +const boost::system::error_category& error_category() noexcept; +} + +namespace boost::system { +template<> +struct is_error_code_enum<::neorados::errc> { + static const bool value = true; +}; + +template<> +struct is_error_condition_enum<::neorados::errc> { + static const bool value = false; +}; +} + +namespace neorados { +// explicit conversion: +inline boost::system::error_code make_error_code(errc e) noexcept { + return { static_cast(e), error_category() }; +} + +// implicit conversion: +inline boost::system::error_condition make_error_condition(errc e) noexcept { + return { static_cast(e), error_category() }; +} +} + +namespace std { +template<> +struct hash { + size_t operator ()(const neorados::Object& r) const; +}; +template<> +struct hash { + size_t operator ()(const neorados::IOContext& r) const; +}; +} // namespace std + +#endif // NEORADOS_RADOS_HPP diff --git a/src/include/RADOS/RADOS_Decodable.hpp b/src/include/neorados/RADOS_Decodable.hpp similarity index 93% rename from src/include/RADOS/RADOS_Decodable.hpp rename to src/include/neorados/RADOS_Decodable.hpp index 023ebb808e0cd..9654a84898a18 100644 --- a/src/include/RADOS/RADOS_Decodable.hpp +++ b/src/include/neorados/RADOS_Decodable.hpp @@ -13,8 +13,8 @@ * */ -#ifndef RADOS_DECODABLE_HPP -#define RADOS_DECODABLE_HPP +#ifndef NEORADOS_RADOS_DECODABLE_HPP +#define NEORADOS_RADOS_DECODABLE_HPP #include #include @@ -24,7 +24,7 @@ #include #include -namespace RADOS { +namespace neorados { struct Entry { std::string nspace; std::string oid; @@ -96,8 +96,8 @@ struct ObjWatcher { namespace std { template<> -struct hash<::RADOS::Entry> { - std::size_t operator ()(::RADOS::Entry e) const { +struct hash<::neorados::Entry> { + std::size_t operator ()(::neorados::Entry e) const { hash h; return (h(e.nspace) << 2) ^ (h(e.oid) << 1) ^ h(e.locator); } diff --git a/src/include/neorados/buffer_fwd.h b/src/include/neorados/buffer_fwd.h new file mode 120000 index 0000000000000..bd1f6f1b06413 --- /dev/null +++ b/src/include/neorados/buffer_fwd.h @@ -0,0 +1 @@ +../buffer_fwd.h \ No newline at end of file diff --git a/src/include/neorados/completion.h b/src/include/neorados/completion.h new file mode 120000 index 0000000000000..100678fc2a510 --- /dev/null +++ b/src/include/neorados/completion.h @@ -0,0 +1 @@ +../../common/async/completion.h \ No newline at end of file diff --git a/src/include/object.h b/src/include/object.h index b0f21bc455806..96951e74de79b 100644 --- a/src/include/object.h +++ b/src/include/object.h @@ -17,10 +17,11 @@ #include #include -#include #include +#include #include - +#include +#include #include "include/rados.h" #include "include/unordered_map.h" @@ -40,6 +41,8 @@ struct object_t { object_t(const char *s) : name(s) {} // cppcheck-suppress noExplicitConstructor object_t(const std::string& s) : name(s) {} + object_t(std::string&& s) : name(std::move(s)) {} + object_t(std::string_view s) : name(s) {} void swap(object_t& o) { name.swap(o.name); diff --git a/src/mon/ConfigMap.cc b/src/mon/ConfigMap.cc index bf823cc57a27c..7fb13841df423 100644 --- a/src/mon/ConfigMap.cc +++ b/src/mon/ConfigMap.cc @@ -144,7 +144,7 @@ ConfigMap::generate_entity_map( vector> sections = { make_pair("global", &global) }; auto p = by_type.find(name.get_type_name()); if (p != by_type.end()) { - sections.push_back(make_pair(name.get_type_name(), &p->second)); + sections.emplace_back(name.get_type_name(), &p->second); } vector name_bits; boost::split(name_bits, name.to_str(), [](char c){ return c == '.'; }); diff --git a/src/mon/ConfigMap.h b/src/mon/ConfigMap.h index bac00955f9a9a..71b725a40af04 100644 --- a/src/mon/ConfigMap.h +++ b/src/mon/ConfigMap.h @@ -97,8 +97,8 @@ struct Section { struct ConfigMap { Section global; - std::map by_type; - std::map by_id; + std::map> by_type; + std::map> by_id; Section *find_section(const std::string& name) { if (name == "global") { diff --git a/src/mount/conf.cc b/src/mount/conf.cc index ea7103f26096a..7957683175ee7 100644 --- a/src/mount/conf.cc +++ b/src/mount/conf.cc @@ -9,12 +9,15 @@ #include "common/async/context_pool.h" #include "common/ceph_context.h" #include "common/ceph_argparse.h" -#include "global/global_init.h" #include "common/config.h" +#include "global/global_init.h" + #include "auth/KeyRing.h" -#include "mount.ceph.h" #include "mon/MonClient.h" +#include "mount.ceph.h" + + extern "C" void mount_ceph_get_config_info(const char *config_file, const char *name, struct ceph_config_info *cci) diff --git a/src/neorados/CMakeLists.txt b/src/neorados/CMakeLists.txt new file mode 100644 index 0000000000000..3e56edf15a69a --- /dev/null +++ b/src/neorados/CMakeLists.txt @@ -0,0 +1,37 @@ +add_library(neorados_objs OBJECT + RADOSImpl.cc) +add_library(neorados_api_obj OBJECT + RADOS.cc) + +add_library(libneorados STATIC + $ + $) +target_link_libraries(libneorados PRIVATE + osdc ceph-common cls_lock_client fmt::fmt + ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS}) + +# if(ENABLE_SHARED) +# add_library(libneorados ${CEPH_SHARED} +# $ +# $ +# $) +# set_target_properties(libneorados PROPERTIES +# OUTPUT_NAME RADOS +# VERSION 0.0.1 +# SOVERSION 1 +# CXX_VISIBILITY_PRESET hidden +# VISIBILITY_INLINES_HIDDEN ON) +# if(NOT APPLE) +# set_property(TARGET libneorados APPEND_STRING PROPERTY +# LINK_FLAGS " -Wl,--exclude-libs,ALL") +# endif() +# else(ENABLE_SHARED) +# add_library(libneorados STATIC +# $ +# $) +# endif(ENABLE_SHARED) +# target_link_libraries(libneorados PRIVATE +# osdc ceph-common cls_lock_client +# ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS}) +# target_link_libraries(libneorados ${rados_libs}) +# install(TARGETS libneorados DESTINATION ${CMAKE_INSTALL_LIBDIR}) diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc new file mode 100644 index 0000000000000..bfc09ae693007 --- /dev/null +++ b/src/neorados/RADOS.cc @@ -0,0 +1,1658 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#define BOOST_BIND_NO_PLACEHOLDERS + +#include +#include + +#include + +#include + +#include "include/ceph_fs.h" + +#include "common/ceph_context.h" +#include "common/ceph_argparse.h" +#include "common/common_init.h" +#include "common/hobject.h" + +#include "global/global_init.h" + +#include "osd/osd_types.h" +#include "osdc/error_code.h" + +#include "neorados/RADOSImpl.h" +#include "include/neorados/RADOS.hpp" + +namespace bc = boost::container; +namespace bs = boost::system; +namespace ca = ceph::async; +namespace cb = ceph::buffer; + +namespace neorados { +// Object + +Object::Object() { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(); +} + +Object::Object(const char* s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(s); +} + +Object::Object(std::string_view s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(s); +} + +Object::Object(std::string&& s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(std::move(s)); +} + +Object::Object(const std::string& s) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(s); +} + +Object::~Object() { + reinterpret_cast(&impl)->~object_t(); +} + +Object::Object(const Object& o) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(*reinterpret_cast(&o.impl)); +} +Object& Object::operator =(const Object& o) { + *reinterpret_cast(&impl) = + *reinterpret_cast(&o.impl); + return *this; +} +Object::Object(Object&& o) { + static_assert(impl_size >= sizeof(object_t)); + new (&impl) object_t(std::move(*reinterpret_cast(&o.impl))); +} +Object& Object::operator =(Object&& o) { + *reinterpret_cast(&impl) = + std::move(*reinterpret_cast(&o.impl)); + return *this; +} + +Object::operator std::string_view() const { + return std::string_view(reinterpret_cast(&impl)->name); +} + +bool operator <(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast(&lhs.impl) < + *reinterpret_cast(&rhs.impl)); +} +bool operator <=(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast(&lhs.impl) <= + *reinterpret_cast(&rhs.impl)); +} +bool operator >=(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast(&lhs.impl) >= + *reinterpret_cast(&rhs.impl)); +} +bool operator >(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast(&lhs.impl) > + *reinterpret_cast(&rhs.impl)); +} + +bool operator ==(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast(&lhs.impl) == + *reinterpret_cast(&rhs.impl)); +} +bool operator !=(const Object& lhs, const Object& rhs) { + return (*reinterpret_cast(&lhs.impl) != + *reinterpret_cast(&rhs.impl)); +} + +std::ostream& operator <<(std::ostream& m, const Object& o) { + return (m << *reinterpret_cast(&o.impl)); +} + +// IOContext + +struct IOContextImpl { + object_locator_t oloc; + snapid_t snap_seq = CEPH_NOSNAP; + SnapContext snapc; +}; + +IOContext::IOContext() { + static_assert(impl_size >= sizeof(IOContextImpl)); + new (&impl) IOContextImpl(); +} + +IOContext::IOContext(std::int64_t _pool) : IOContext() { + pool(_pool); +} + +IOContext::IOContext(std::int64_t _pool, std::string_view _ns) + : IOContext() { + pool(_pool); + ns(_ns); +} + +IOContext::IOContext(std::int64_t _pool, std::string&& _ns) + : IOContext() { + pool(_pool); + ns(std::move(_ns)); +} + +IOContext::~IOContext() { + reinterpret_cast(&impl)->~IOContextImpl(); +} + +IOContext::IOContext(const IOContext& rhs) { + static_assert(impl_size >= sizeof(IOContextImpl)); + new (&impl) IOContextImpl(*reinterpret_cast(&rhs.impl)); +} + +IOContext& IOContext::operator =(const IOContext& rhs) { + *reinterpret_cast(&impl) = + *reinterpret_cast(&rhs.impl); + return *this; +} + +IOContext::IOContext(IOContext&& rhs) { + static_assert(impl_size >= sizeof(IOContextImpl)); + new (&impl) IOContextImpl( + std::move(*reinterpret_cast(&rhs.impl))); +} + +IOContext& IOContext::operator =(IOContext&& rhs) { + *reinterpret_cast(&impl) = + std::move(*reinterpret_cast(&rhs.impl)); + return *this; +} + +std::int64_t IOContext::pool() const { + return reinterpret_cast(&impl)->oloc.pool; +} + +void IOContext::pool(std::int64_t _pool) { + reinterpret_cast(&impl)->oloc.pool = _pool; +} + +std::string_view IOContext::ns() const { + return reinterpret_cast(&impl)->oloc.nspace; +} + +void IOContext::ns(std::string_view _ns) { + reinterpret_cast(&impl)->oloc.nspace = _ns; +} + +void IOContext::ns(std::string&& _ns) { + reinterpret_cast(&impl)->oloc.nspace = std::move(_ns); +} + +std::optional IOContext::key() const { + auto& oloc = reinterpret_cast(&impl)->oloc; + if (oloc.key.empty()) + return std::nullopt; + else + return std::string_view(oloc.key); +} + +void IOContext::key(std::string_view _key) { + auto& oloc = reinterpret_cast(&impl)->oloc; + oloc.hash = -1; + oloc.key = _key; +} + +void IOContext::key(std::string&&_key) { + auto& oloc = reinterpret_cast(&impl)->oloc; + oloc.hash = -1; + oloc.key = std::move(_key); +} + +void IOContext::clear_key() { + auto& oloc = reinterpret_cast(&impl)->oloc; + oloc.hash = -1; + oloc.key.clear(); +} + +std::optional IOContext::hash() const { + auto& oloc = reinterpret_cast(&impl)->oloc; + if (oloc.hash < 0) + return std::nullopt; + else + return oloc.hash; +} + +void IOContext::hash(std::int64_t _hash) { + auto& oloc = reinterpret_cast(&impl)->oloc; + oloc.hash = _hash; + oloc.key.clear(); +} + +void IOContext::clear_hash() { + auto& oloc = reinterpret_cast(&impl)->oloc; + oloc.hash = -1; + oloc.key.clear(); +} + + +std::optional IOContext::read_snap() const { + auto& snap_seq = reinterpret_cast(&impl)->snap_seq; + if (snap_seq == CEPH_NOSNAP) + return std::nullopt; + else + return snap_seq; +} +void IOContext::read_snap(std::optional _snapid) { + auto& snap_seq = reinterpret_cast(&impl)->snap_seq; + snap_seq = _snapid.value_or(CEPH_NOSNAP); +} + +std::optional< + std::pair>> IOContext::write_snap_context() const { + auto& snapc = reinterpret_cast(&impl)->snapc; + if (snapc.empty()) { + return std::nullopt; + } else { + std::vector v(snapc.snaps.begin(), snapc.snaps.end()); + return std::make_optional(std::make_pair(uint64_t(snapc.seq), v)); + } +} + +void IOContext::write_snap_context( + std::optional>> _snapc) { + auto& snapc = reinterpret_cast(&impl)->snapc; + if (!_snapc) { + snapc.clear(); + } else { + SnapContext n(_snapc->first, { _snapc->second.begin(), _snapc->second.end()}); + if (!n.is_valid()) { + throw bs::system_error(EINVAL, + bs::system_category(), + "Invalid snap context."); + + } else { + snapc = n; + } + } +} + +bool operator <(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast(&lhs.impl); + const auto r = reinterpret_cast(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) < + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator <=(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast(&lhs.impl); + const auto r = reinterpret_cast(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) <= + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator >=(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast(&lhs.impl); + const auto r = reinterpret_cast(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) >= + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator >(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast(&lhs.impl); + const auto r = reinterpret_cast(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) > + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator ==(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast(&lhs.impl); + const auto r = reinterpret_cast(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) == + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +bool operator !=(const IOContext& lhs, const IOContext& rhs) { + const auto l = reinterpret_cast(&lhs.impl); + const auto r = reinterpret_cast(&rhs.impl); + + return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) != + std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key)); +} + +std::ostream& operator <<(std::ostream& m, const IOContext& o) { + const auto l = reinterpret_cast(&o.impl); + return (m << l->oloc.pool << ":" << l->oloc.nspace << ":" << l->oloc.key); +} + + +// Op + +struct OpImpl { + ObjectOperation op; + std::optional mtime; + + OpImpl() = default; + + OpImpl(const OpImpl& rhs) = delete; + OpImpl(OpImpl&& rhs) = default; + + OpImpl& operator =(const OpImpl& rhs) = delete; + OpImpl& operator =(OpImpl&& rhs) = default; +}; + +Op::Op() { + static_assert(Op::impl_size >= sizeof(OpImpl)); + new (&impl) OpImpl; +} + +Op::Op(Op&& rhs) { + new (&impl) OpImpl(std::move(*reinterpret_cast(&rhs.impl))); +} +Op& Op::operator =(Op&& rhs) { + reinterpret_cast(&impl)->~OpImpl(); + new (&impl) OpImpl(std::move(*reinterpret_cast(&rhs.impl))); + return *this; +} +Op::~Op() { + reinterpret_cast(&impl)->~OpImpl(); +} + +void Op::set_excl() { + reinterpret_cast(&impl)->op.set_last_op_flags(CEPH_OSD_OP_FLAG_EXCL); +} +void Op::set_failok() { + reinterpret_cast(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FAILOK); +} +void Op::set_fadvise_random() { + reinterpret_cast(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_RANDOM); +} +void Op::set_fadvise_sequential() { + reinterpret_cast(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); +} +void Op::set_fadvise_willneed() { + reinterpret_cast(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); +} +void Op::set_fadvise_dontneed() { + reinterpret_cast(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); +} +void Op::set_fadvise_nocache() { + reinterpret_cast(&impl)->op.set_last_op_flags( + CEPH_OSD_OP_FLAG_FADVISE_NOCACHE); +} + +void Op::cmpext(uint64_t off, bufferlist&& cmp_bl, std::size_t* s) { + reinterpret_cast(&impl)->op.cmpext(off, std::move(cmp_bl), nullptr, + s); +} +void Op::cmpxattr(std::string_view name, cmpxattr_op op, const bufferlist& val) { + reinterpret_cast(&impl)-> + op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_STRING, val); +} +void Op::cmpxattr(std::string_view name, cmpxattr_op op, std::uint64_t val) { + bufferlist bl; + encode(val, bl); + reinterpret_cast(&impl)-> + op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_U64, bl); +} + +void Op::assert_version(uint64_t ver) { + reinterpret_cast(&impl)->op.assert_version(ver); +} +void Op::assert_exists() { + reinterpret_cast(&impl)->op.stat( + nullptr, + static_cast(nullptr), + static_cast(nullptr)); +} +void Op::cmp_omap(const bc::flat_map< + std::string, std::pair>& assertions) { + reinterpret_cast(&impl)->op.omap_cmp(assertions, nullptr); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, + cb::list* out, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.call(cls, method, inbl, ec, out); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, + fu2::unique_function f) { + reinterpret_cast(&impl)->op.call(cls, method, inbl, std::move(f)); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, + fu2::unique_function f) { + reinterpret_cast(&impl)->op.call(cls, method, inbl, std::move(f)); +} + +void Op::exec(std::string_view cls, std::string_view method, + const bufferlist& inbl, bs::error_code* ec) { + reinterpret_cast(&impl)->op.call(cls, method, inbl, ec); +} + +void Op::balance_reads() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_BALANCE_READS; +} +void Op::localize_reads() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_LOCALIZE_READS; +} +void Op::order_reads_writes() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_RWORDERED; +} +void Op::ignore_cache() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_CACHE; +} +void Op::skiprwlocks() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_SKIPRWLOCKS; +} +void Op::ignore_overlay() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY; +} +void Op::full_try() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_TRY; +} +void Op::full_force() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_FORCE; +} +void Op::ignore_redirect() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_REDIRECT; +} +void Op::ordersnap() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_ORDERSNAP; +} +void Op::returnvec() { + reinterpret_cast(&impl)->op.flags |= CEPH_OSD_FLAG_RETURNVEC; +} + +std::size_t Op::size() const { + return reinterpret_cast(&impl)->op.size(); +} + +std::ostream& operator <<(std::ostream& m, const Op& o) { + return m << reinterpret_cast(&o.impl)->op; +} + + +// --- + +// ReadOp / WriteOp + +void ReadOp::read(size_t off, uint64_t len, cb::list* out, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.read(off, len, ec, out); +} + +void ReadOp::get_xattr(std::string_view name, cb::list* out, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.getxattr(name, ec, out); +} + +void ReadOp::get_omap_header(cb::list* out, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.omap_get_header(ec, out); +} + +void ReadOp::sparse_read(uint64_t off, uint64_t len, cb::list* out, + std::vector>* extents, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.sparse_read(off, len, ec, extents, out); +} + +void ReadOp::stat(std::uint64_t* size, ceph::real_time* mtime, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.stat(size, mtime, ec); +} + +void ReadOp::get_omap_keys(std::optional start_after, + std::uint64_t max_return, + bc::flat_set* keys, + bool* done, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.omap_get_keys(start_after, max_return, + ec, keys, done); +} + +void ReadOp::get_xattrs(bc::flat_map* kv, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.getxattrs(ec, kv); +} + +void ReadOp::get_omap_vals(std::optional start_after, + std::optional filter_prefix, + uint64_t max_return, + bc::flat_map* kv, + bool* done, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.omap_get_vals(start_after, filter_prefix, + max_return, ec, kv, done); +} + +void ReadOp::get_omap_vals_by_keys( + const bc::flat_set& keys, + bc::flat_map* kv, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.omap_get_vals_by_keys(keys, ec, kv); +} + +void ReadOp::list_watchers(std::vector* watchers, + bs::error_code* ec) { + reinterpret_cast(&impl)-> op.list_watchers(watchers, ec); +} + +void ReadOp::list_snaps(SnapSet* snaps, + bs::error_code* ec) { + reinterpret_cast(&impl)->op.list_snaps(snaps, nullptr, ec); +} + +// WriteOp + +void WriteOp::set_mtime(ceph::real_time t) { + auto o = reinterpret_cast(&impl); + o->mtime = t; +} + +void WriteOp::create(bool exclusive) { + reinterpret_cast(&impl)->op.create(exclusive); +} + +void WriteOp::write(uint64_t off, bufferlist&& bl) { + reinterpret_cast(&impl)->op.write(off, bl); +} + +void WriteOp::write_full(bufferlist&& bl) { + reinterpret_cast(&impl)->op.write_full(bl); +} + +void WriteOp::writesame(uint64_t off, uint64_t write_len, bufferlist&& bl) { + reinterpret_cast(&impl)->op.writesame(off, write_len, bl); +} + +void WriteOp::append(bufferlist&& bl) { + reinterpret_cast(&impl)->op.append(bl); +} + +void WriteOp::remove() { + reinterpret_cast(&impl)->op.remove(); +} + +void WriteOp::truncate(uint64_t off) { + reinterpret_cast(&impl)->op.truncate(off); +} + +void WriteOp::zero(uint64_t off, uint64_t len) { + reinterpret_cast(&impl)->op.zero(off, len); +} + +void WriteOp::rmxattr(std::string_view name) { + reinterpret_cast(&impl)->op.rmxattr(name); +} + +void WriteOp::setxattr(std::string_view name, + bufferlist&& bl) { + reinterpret_cast(&impl)->op.setxattr(name, bl); +} + +void WriteOp::rollback(uint64_t snapid) { + reinterpret_cast(&impl)->op.rollback(snapid); +} + +void WriteOp::set_omap( + const bc::flat_map& map) { + reinterpret_cast(&impl)->op.omap_set(map); +} + +void WriteOp::set_omap_header(bufferlist&& bl) { + reinterpret_cast(&impl)->op.omap_set_header(bl); +} + +void WriteOp::clear_omap() { + reinterpret_cast(&impl)->op.omap_clear(); +} + +void WriteOp::rm_omap_keys( + const bc::flat_set& to_rm) { + reinterpret_cast(&impl)->op.omap_rm_keys(to_rm); +} + +void WriteOp::set_alloc_hint(uint64_t expected_object_size, + uint64_t expected_write_size, + alloc_hint::alloc_hint_t flags) { + using namespace alloc_hint; + static_assert(sequential_write == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE)); + static_assert(random_write == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE)); + static_assert(sequential_read == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ)); + static_assert(random_read == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ)); + static_assert(append_only == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)); + static_assert(immutable == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE)); + static_assert(shortlived == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_SHORTLIVED)); + static_assert(longlived == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_LONGLIVED)); + static_assert(compressible == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE)); + static_assert(incompressible == + static_cast(CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE)); + + reinterpret_cast(&impl)->op.set_alloc_hint(expected_object_size, + expected_write_size, + flags); +} + +// RADOS + +RADOS::Builder& RADOS::Builder::add_conf_file(std::string_view f) { + if (conf_files) + *conf_files += (", " + std::string(f)); + else + conf_files = std::string(f); + return *this; +} + +void RADOS::Builder::build(boost::asio::io_context& ioctx, + std::unique_ptr c) { + constexpr auto env = CODE_ENVIRONMENT_LIBRARY; + CephInitParameters ci(env); + if (name) + ci.name.set(CEPH_ENTITY_TYPE_CLIENT, *name); + else + ci.name.set(CEPH_ENTITY_TYPE_CLIENT, "admin"); + uint32_t flags = 0; + if (no_default_conf) + flags |= CINIT_FLAG_NO_DEFAULT_CONFIG_FILE; + if (no_mon_conf) + flags |= CINIT_FLAG_NO_MON_CONFIG; + + CephContext *cct = common_preinit(ci, env, flags); + if (cluster) + cct->_conf->cluster = *cluster; + + if (no_mon_conf) + cct->_conf->no_mon_config = true; + + // TODO: Come up with proper error codes here. Maybe augment the + // functions with a default bs::error_code* parameter to + // pass back. + { + std::ostringstream ss; + auto r = cct->_conf.parse_config_files(conf_files ? conf_files->data() : nullptr, + &ss, flags); + if (r < 0) + c->dispatch(std::move(c), ceph::to_error_code(r), RADOS{nullptr}); + } + + cct->_conf.parse_env(cct->get_module_type()); + + for (const auto& [n, v] : configs) { + std::stringstream ss; + auto r = cct->_conf.set_val(n, v, &ss); + if (r < 0) + c->dispatch(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr}); + } + + if (!no_mon_conf) { + MonClient mc_bootstrap(cct, ioctx); + // TODO This function should return an error code. + auto err = mc_bootstrap.get_monmap_and_config(); + if (err < 0) + c->dispatch(std::move(c), ceph::to_error_code(err), RADOS{nullptr}); + } + if (!cct->_log->is_started()) { + cct->_log->start(); + } + common_init_finish(cct); + + RADOS::make_with_cct(cct, ioctx, std::move(c)); +} + +void RADOS::make_with_cct(CephContext* cct, + boost::asio::io_context& ioctx, + std::unique_ptr c) { + try { + auto r = new detail::RADOS(ioctx, cct); + r->objecter->wait_for_osd_map( + [c = std::move(c), r = std::unique_ptr(r)]() mutable { + c->dispatch(std::move(c), bs::error_code{}, + RADOS{std::move(r)}); + }); + } catch (const bs::system_error& err) { + c->dispatch(std::move(c), err.code(), RADOS{nullptr}); + } +} + + +RADOS::RADOS() = default; + +RADOS::RADOS(std::unique_ptr impl) + : impl(std::move(impl)) {} + +RADOS::RADOS(RADOS&&) = default; +RADOS& RADOS::operator =(RADOS&&) = default; + +RADOS::~RADOS() = default; + +RADOS::executor_type RADOS::get_executor() const { + return impl->ioctx.get_executor(); +} + +boost::asio::io_context& RADOS::get_io_context() { + return impl->ioctx; +} + +void RADOS::execute(const Object& o, const IOContext& _ioc, ReadOp&& _op, + cb::list* bl, + std::unique_ptr c, version_t* objver) { + auto oid = reinterpret_cast(&o.impl); + auto ioc = reinterpret_cast(&_ioc.impl); + auto op = reinterpret_cast(&_op.impl); + auto flags = 0; // Should be in Op. + + impl->objecter->read( + *oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags, + std::move(c), objver); +} + +void RADOS::execute(const Object& o, const IOContext& _ioc, WriteOp&& _op, + std::unique_ptr c, version_t* objver) { + auto oid = reinterpret_cast(&o.impl); + auto ioc = reinterpret_cast(&_ioc.impl); + auto op = reinterpret_cast(&_op.impl); + auto flags = 0; // Should be in Op. + ceph::real_time mtime; + if (op->mtime) + mtime = *op->mtime; + else + mtime = ceph::real_clock::now(); + + impl->objecter->mutate( + *oid, ioc->oloc, std::move(op->op), ioc->snapc, + mtime, flags, + std::move(c), objver); +} + +void RADOS::execute(const Object& o, std::int64_t pool, ReadOp&& _op, + cb::list* bl, + std::unique_ptr c, + std::optional ns, + std::optional key, + version_t* objver) { + auto oid = reinterpret_cast(&o.impl); + auto op = reinterpret_cast(&_op.impl); + auto flags = 0; // Should be in Op. + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + impl->objecter->read( + *oid, oloc, std::move(op->op), CEPH_NOSNAP, bl, flags, + std::move(c), objver); +} + +void RADOS::execute(const Object& o, std::int64_t pool, WriteOp&& _op, + std::unique_ptr c, + std::optional ns, + std::optional key, + version_t* objver) { + auto oid = reinterpret_cast(&o.impl); + auto op = reinterpret_cast(&_op.impl); + auto flags = 0; // Should be in Op. + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + ceph::real_time mtime; + if (op->mtime) + mtime = *op->mtime; + else + mtime = ceph::real_clock::now(); + + impl->objecter->mutate( + *oid, oloc, std::move(op->op), {}, + mtime, flags, + std::move(c), objver); +} + +boost::uuids::uuid RADOS::get_fsid() const noexcept { + return impl->monclient.get_fsid().uuid; +} + + +void RADOS::lookup_pool(std::string_view name, + std::unique_ptr c) +{ + // I kind of want to make lookup_pg_pool return + // std::optional since it can only return one error code. + int64_t ret = impl->objecter->with_osdmap( + std::mem_fn(&OSDMap::lookup_pg_pool_name), + name); + if (ret < 0) { + impl->objecter->wait_for_latest_osdmap( + [name = std::string(name), c = std::move(c), + objecter = impl->objecter.get()] + (bs::error_code ec) mutable { + int64_t ret = + objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name), + name); + if (ret < 0) + ca::dispatch(std::move(c), osdc_errc::pool_dne, + std::int64_t(0)); + else + ca::dispatch(std::move(c), bs::error_code{}, ret); + }); + } else if (ret < 0) { + ca::dispatch(std::move(c), osdc_errc::pool_dne, + std::int64_t(0)); + } else { + ca::dispatch(std::move(c), bs::error_code{}, ret); + } +} + + +std::optional RADOS::get_pool_alignment(int64_t pool_id) +{ + return impl->objecter->with_osdmap( + [pool_id](const OSDMap &o) -> std::optional { + if (!o.have_pg_pool(pool_id)) { + throw bs::system_error( + ENOENT, bs::system_category(), + "Cannot find pool in OSDMap."); + } else if (o.get_pg_pool(pool_id)->requires_aligned_append()) { + return o.get_pg_pool(pool_id)->required_alignment(); + } else { + return std::nullopt; + } + }); +} + +void RADOS::list_pools(std::unique_ptr c) { + impl->objecter->with_osdmap( + [&](OSDMap& o) { + std::vector> v; + for (auto p : o.get_pools()) + v.push_back(std::make_pair(p.first, o.get_pool_name(p.first))); + ca::dispatch(std::move(c), std::move(v)); + }); +} + +void RADOS::create_pool_snap(std::int64_t pool, + std::string_view snapName, + std::unique_ptr c) +{ + impl->objecter->create_pool_snap( + pool, snapName, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::allocate_selfmanaged_snap(int64_t pool, + std::unique_ptr c) { + impl->objecter->allocate_selfmanaged_snap( + pool, + ca::Completion::create( + get_executor(), + [c = std::move(c)](bs::error_code e, snapid_t snap) mutable { + ca::dispatch(std::move(c), e, snap); + })); +} + +void RADOS::delete_pool_snap(std::int64_t pool, + std::string_view snapName, + std::unique_ptr c) +{ + impl->objecter->delete_pool_snap( + pool, snapName, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::delete_selfmanaged_snap(std::int64_t pool, + std::uint64_t snap, + std::unique_ptr c) +{ + impl->objecter->delete_selfmanaged_snap( + pool, snap, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::create_pool(std::string_view name, + std::optional crush_rule, + std::unique_ptr c) +{ + impl->objecter->create_pool( + name, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + }), + crush_rule.value_or(-1)); +} + +void RADOS::delete_pool(std::string_view name, + std::unique_ptr c) +{ + impl->objecter->delete_pool( + name, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::delete_pool(std::int64_t pool, + std::unique_ptr c) +{ + impl->objecter->delete_pool( + pool, + Objecter::PoolOp::OpComp::create( + get_executor(), + [c = std::move(c)](bs::error_code e, const bufferlist&) mutable { + ca::dispatch(std::move(c), e); + })); +} + +void RADOS::stat_pools(const std::vector& pools, + std::unique_ptr c) { + impl->objecter->get_pool_stats( + pools, + [c = std::move(c)] + (bs::error_code ec, + bc::flat_map rawresult, + bool per_pool) mutable { + bc::flat_map result; + for (auto p = rawresult.begin(); p != rawresult.end(); ++p) { + auto& pv = result[p->first]; + auto& pstat = p->second; + store_statfs_t &statfs = pstat.store_stats; + uint64_t allocated_bytes = pstat.get_allocated_data_bytes(per_pool) + + pstat.get_allocated_omap_bytes(per_pool); + // FIXME: raw_used_rate is unknown hence use 1.0 here + // meaning we keep net amount aggregated over all replicas + // Not a big deal so far since this field isn't exposed + uint64_t user_bytes = pstat.get_user_data_bytes(1.0, per_pool) + + pstat.get_user_omap_bytes(1.0, per_pool); + + object_stat_sum_t *sum = &p->second.stats.sum; + pv.num_kb = shift_round_up(allocated_bytes, 10); + pv.num_bytes = allocated_bytes; + pv.num_objects = sum->num_objects; + pv.num_object_clones = sum->num_object_clones; + pv.num_object_copies = sum->num_object_copies; + pv.num_objects_missing_on_primary = sum->num_objects_missing_on_primary; + pv.num_objects_unfound = sum->num_objects_unfound; + pv.num_objects_degraded = sum->num_objects_degraded; + pv.num_rd = sum->num_rd; + pv.num_rd_kb = sum->num_rd_kb; + pv.num_wr = sum->num_wr; + pv.num_wr_kb = sum->num_wr_kb; + pv.num_user_bytes = user_bytes; + pv.compressed_bytes_orig = statfs.data_compressed_original; + pv.compressed_bytes = statfs.data_compressed; + pv.compressed_bytes_alloc = statfs.data_compressed_allocated; + } + + ca::dispatch(std::move(c), ec, std::move(result), per_pool); + }); +} + +void RADOS::stat_fs(std::optional _pool, + std::unique_ptr c) { + boost::optional pool; + if (_pool) + pool = *pool; + impl->objecter->get_fs_stats( + pool, + [c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable { + FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects}; + c->dispatch(std::move(c), ec, std::move(fso)); + }); +} + +// --- Watch/Notify + +void RADOS::watch(const Object& o, const IOContext& _ioc, + std::optional timeout, WatchCB&& cb, + std::unique_ptr c) { + auto oid = reinterpret_cast(&o.impl); + auto ioc = reinterpret_cast(&_ioc.impl); + + ObjectOperation op; + + auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc, 0); + uint64_t cookie = linger_op->get_cookie(); + linger_op->handle = std::move(cb); + op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count()); + bufferlist bl; + impl->objecter->linger_watch( + linger_op, op, ioc->snapc, ceph::real_clock::now(), bl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [c = std::move(c), cookie](bs::error_code e, cb::list) mutable { + ca::dispatch(std::move(c), e, cookie); + }), nullptr); +} + +void RADOS::watch(const Object& o, std::int64_t pool, + std::optional timeout, WatchCB&& cb, + std::unique_ptr c, + std::optional ns, + std::optional key) { + auto oid = reinterpret_cast(&o.impl); + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + ObjectOperation op; + + Objecter::LingerOp *linger_op = impl->objecter->linger_register(*oid, oloc, 0); + uint64_t cookie = linger_op->get_cookie(); + linger_op->handle = std::move(cb); + op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count()); + bufferlist bl; + impl->objecter->linger_watch( + linger_op, op, {}, ceph::real_clock::now(), bl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [c = std::move(c), cookie](bs::error_code e, bufferlist) mutable { + ca::dispatch(std::move(c), e, cookie); + }), nullptr); +} + +void RADOS::notify_ack(const Object& o, + const IOContext& _ioc, + uint64_t notify_id, + uint64_t cookie, + bufferlist&& bl, + std::unique_ptr c) +{ + auto oid = reinterpret_cast(&o.impl); + auto ioc = reinterpret_cast(&_ioc.impl); + + ObjectOperation op; + op.notify_ack(notify_id, cookie, bl); + + impl->objecter->read(*oid, ioc->oloc, std::move(op), ioc->snap_seq, + nullptr, 0, std::move(c)); +} + +void RADOS::notify_ack(const Object& o, + std::int64_t pool, + uint64_t notify_id, + uint64_t cookie, + bufferlist&& bl, + std::unique_ptr c, + std::optional ns, + std::optional key) { + auto oid = reinterpret_cast(&o.impl); + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + ObjectOperation op; + op.notify_ack(notify_id, cookie, bl); + impl->objecter->read(*oid, oloc, std::move(op), CEPH_NOSNAP, nullptr, 0, + std::move(c)); +} + +tl::expected RADOS::watch_check(uint64_t cookie) +{ + Objecter::LingerOp *linger_op = reinterpret_cast(cookie); + return impl->objecter->linger_check(linger_op); +} + +void RADOS::unwatch(uint64_t cookie, const IOContext& _ioc, + std::unique_ptr c) +{ + auto ioc = reinterpret_cast(&_ioc.impl); + + Objecter::LingerOp *linger_op = reinterpret_cast(cookie); + + ObjectOperation op; + op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); + impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op), + ioc->snapc, ceph::real_clock::now(), 0, + Objecter::Op::OpComp::create( + get_executor(), + [objecter = impl->objecter.get(), + linger_op, c = std::move(c)] + (bs::error_code ec) mutable { + objecter->linger_cancel(linger_op); + ca::dispatch(std::move(c), ec); + })); +} + +void RADOS::unwatch(uint64_t cookie, std::int64_t pool, + std::unique_ptr c, + std::optional ns, + std::optional key) +{ + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + + Objecter::LingerOp *linger_op = reinterpret_cast(cookie); + + ObjectOperation op; + op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); + impl->objecter->mutate(linger_op->target.base_oid, oloc, std::move(op), + {}, ceph::real_clock::now(), 0, + Objecter::Op::OpComp::create( + get_executor(), + [objecter = impl->objecter.get(), + linger_op, c = std::move(c)] + (bs::error_code ec) mutable { + objecter->linger_cancel(linger_op); + ca::dispatch(std::move(c), ec); + })); +} + +void RADOS::flush_watch(std::unique_ptr c) +{ + impl->objecter->linger_callback_flush([c = std::move(c)]() mutable { + ca::post(std::move(c)); + }); +} + +struct NotifyHandler : std::enable_shared_from_this { + boost::asio::io_context& ioc; + boost::asio::io_context::strand strand; + Objecter* objecter; + Objecter::LingerOp* op; + std::unique_ptr c; + + bool acked = false; + bool finished = false; + bs::error_code res; + bufferlist rbl; + + NotifyHandler(boost::asio::io_context& ioc, + Objecter* objecter, + Objecter::LingerOp* op, + std::unique_ptr c) + : ioc(ioc), strand(ioc), objecter(objecter), op(op), c(std::move(c)) {} + + // Use bind or a lambda to pass this in. + void handle_ack(bs::error_code ec, + bufferlist&&) { + boost::asio::post( + strand, + [this, ec, p = shared_from_this()]() mutable { + acked = true; + maybe_cleanup(ec); + }); + } + + // Notify finish callback. It can actually own the object's storage. + + void operator()(bs::error_code ec, + bufferlist&& bl) { + boost::asio::post( + strand, + [this, ec, p = shared_from_this()]() mutable { + finished = true; + maybe_cleanup(ec); + }); + } + + // Should be called from strand. + void maybe_cleanup(bs::error_code ec) { + if (!res && ec) + res = ec; + if ((acked && finished) || res) { + objecter->linger_cancel(op); + ceph_assert(c); + ca::dispatch(std::move(c), res, std::move(rbl)); + } + } +}; + +void RADOS::notify(const Object& o, const IOContext& _ioc, bufferlist&& bl, + std::optional timeout, + std::unique_ptr c) +{ + auto oid = reinterpret_cast(&o.impl); + auto ioc = reinterpret_cast(&_ioc.impl); + auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc, 0); + + auto cb = std::make_shared(impl->ioctx, impl->objecter.get(), + linger_op, std::move(c)); + linger_op->on_notify_finish = + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, ceph::bufferlist bl) mutable { + (*cb)(ec, std::move(bl)); + }); + ObjectOperation rd; + bufferlist inbl; + rd.notify( + linger_op->get_cookie(), 1, + timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout, + bl, &inbl); + + impl->objecter->linger_notify( + linger_op, rd, ioc->snap_seq, inbl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, ceph::bufferlist bl) mutable { + cb->handle_ack(ec, std::move(bl)); + }), nullptr); +} + +void RADOS::notify(const Object& o, std::int64_t pool, bufferlist&& bl, + std::optional timeout, + std::unique_ptr c, + std::optional ns, + std::optional key) +{ + auto oid = reinterpret_cast(&o.impl); + object_locator_t oloc; + oloc.pool = pool; + if (ns) + oloc.nspace = *ns; + if (key) + oloc.key = *key; + auto linger_op = impl->objecter->linger_register(*oid, oloc, 0); + + auto cb = std::make_shared(impl->ioctx, impl->objecter.get(), + linger_op, std::move(c)); + linger_op->on_notify_finish = + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, ceph::bufferlist&& bl) mutable { + (*cb)(ec, std::move(bl)); + }); + ObjectOperation rd; + bufferlist inbl; + rd.notify( + linger_op->get_cookie(), 1, + timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout, + bl, &inbl); + + impl->objecter->linger_notify( + linger_op, rd, CEPH_NOSNAP, inbl, + Objecter::LingerOp::OpComp::create( + get_executor(), + [cb](bs::error_code ec, bufferlist&& bl) mutable { + cb->handle_ack(ec, std::move(bl)); + }), nullptr); +} + +// Enumeration + +Cursor::Cursor() { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(); +}; + +Cursor::Cursor(end_magic_t) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(hobject_t::get_max()); +} + +Cursor::Cursor(void* p) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(std::move(*reinterpret_cast(p))); +} + +Cursor Cursor::begin() { + Cursor e; + return e; +} + +Cursor Cursor::end() { + Cursor e(end_magic_t{}); + return e; +} + +Cursor::Cursor(const Cursor& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(*reinterpret_cast(&rhs.impl)); +} + +Cursor& Cursor::operator =(const Cursor& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + reinterpret_cast(&impl)->~hobject_t(); + new (&impl) hobject_t(*reinterpret_cast(&rhs.impl)); + return *this; +} + +Cursor::Cursor(Cursor&& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + new (&impl) hobject_t(std::move(*reinterpret_cast(&rhs.impl))); +} + +Cursor& Cursor::operator =(Cursor&& rhs) { + static_assert(impl_size >= sizeof(hobject_t)); + reinterpret_cast(&impl)->~hobject_t(); + new (&impl) hobject_t(std::move(*reinterpret_cast(&rhs.impl))); + return *this; +} +Cursor::~Cursor() { + reinterpret_cast(&impl)->~hobject_t(); +} + +bool operator ==(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast(&lhs.impl) == + *reinterpret_cast(&rhs.impl)); +} + +bool operator !=(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast(&lhs.impl) != + *reinterpret_cast(&rhs.impl)); +} + +bool operator <(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast(&lhs.impl) < + *reinterpret_cast(&rhs.impl)); +} + +bool operator <=(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast(&lhs.impl) <= + *reinterpret_cast(&rhs.impl)); +} + +bool operator >=(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast(&lhs.impl) >= + *reinterpret_cast(&rhs.impl)); +} + +bool operator >(const Cursor& lhs, const Cursor& rhs) { + return (*reinterpret_cast(&lhs.impl) > + *reinterpret_cast(&rhs.impl)); +} + +std::string Cursor::to_str() const { + using namespace std::literals; + auto& h = *reinterpret_cast(&impl); + + return h.is_max() ? "MAX"s : h.to_str(); +} + +std::optional +Cursor::from_str(const std::string& s) { + Cursor e; + auto& h = *reinterpret_cast(&e.impl); + if (!h.parse(s)) + return std::nullopt; + + return e; +} + +void RADOS::enumerate_objects(const IOContext& _ioc, + const Cursor& begin, + const Cursor& end, + const std::uint32_t max, + const bufferlist& filter, + std::unique_ptr c) { + auto ioc = reinterpret_cast(&_ioc.impl); + + impl->objecter->enumerate_objects( + ioc->oloc.pool, + ioc->oloc.nspace, + *reinterpret_cast(&begin.impl), + *reinterpret_cast(&end.impl), + max, + filter, + [c = std::move(c)] + (bs::error_code ec, std::vector&& v, + hobject_t&& n) mutable { + ca::dispatch(std::move(c), ec, std::move(v), + Cursor(static_cast(&n))); + }); +} + +void RADOS::enumerate_objects(std::int64_t pool, + const Cursor& begin, + const Cursor& end, + const std::uint32_t max, + const bufferlist& filter, + std::unique_ptr c, + std::optional ns, + std::optional key) { + impl->objecter->enumerate_objects( + pool, + ns ? *ns : std::string_view{}, + *reinterpret_cast(&begin.impl), + *reinterpret_cast(&end.impl), + max, + filter, + [c = std::move(c)] + (bs::error_code ec, std::vector&& v, + hobject_t&& n) mutable { + ca::dispatch(std::move(c), ec, std::move(v), + Cursor(static_cast(&n))); + }); +} + + +void RADOS::osd_command(int osd, std::vector&& cmd, + ceph::bufferlist&& in, std::unique_ptr c) { + impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, + std::string&& s, + ceph::bufferlist&& b) mutable { + ca::dispatch(std::move(c), ec, + std::move(s), + std::move(b)); + }); +} +void RADOS::pg_command(PG pg, std::vector&& cmd, + ceph::bufferlist&& in, std::unique_ptr c) { + impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr, + [c = std::move(c)] + (bs::error_code ec, + std::string&& s, + ceph::bufferlist&& b) mutable { + ca::dispatch(std::move(c), ec, + std::move(s), + std::move(b)); + }); +} + +void RADOS::enable_application(std::string_view pool, std::string_view app_name, + bool force, std::unique_ptr c) { + // pre-Luminous clusters will return -EINVAL and application won't be + // preserved until Luminous is configured as minimum version. + if (!impl->get_required_monitor_features().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)) { + ca::dispatch(std::move(c), ceph::to_error_code(-EOPNOTSUPP)); + } else { + impl->monclient.start_mon_command( + { fmt::format("{{ \"prefix\": \"osd pool application enable\"," + "\"pool\": \"{}\", \"app\": \"{}\"{}}}", + pool, app_name, + force ? " ,\"yes_i_really_mean_it\": true" : "")}, + {}, [c = std::move(c)](bs::error_code e, + std::string, cb::list) mutable { + ca::post(std::move(c), e); + }); + } +} + +void RADOS::mon_command(std::vector command, + const cb::list& bl, + std::string* outs, cb::list* outbl, + std::unique_ptr c) { + + impl->monclient.start_mon_command( + command, bl, + [c = std::move(c), outs, outbl](bs::error_code e, + std::string s, cb::list bl) mutable { + if (outs) + *outs = std::move(s); + if (outbl) + *outbl = std::move(bl); + ca::post(std::move(c), e); + }); +} + +uint64_t RADOS::instance_id() const { + return impl->get_instance_id(); +} + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wnon-virtual-dtor" +class category : public ceph::converting_category { +public: + category() {} + const char* name() const noexcept override; + const char* message(int ev, char*, std::size_t) const noexcept override; + std::string message(int ev) const override; + bs::error_condition default_error_condition(int ev) const noexcept + override; + bool equivalent(int ev, const bs::error_condition& c) const + noexcept override; + using ceph::converting_category::equivalent; + int from_code(int ev) const noexcept override; +}; +#pragma GCC diagnostic pop +#pragma clang diagnostic pop + +const char* category::name() const noexcept { + return "RADOS"; +} + +const char* category::message(int ev, char*, + std::size_t) const noexcept { + if (ev == 0) + return "No error"; + + switch (static_cast(ev)) { + case errc::pool_dne: + return "Pool does not exist"; + + case errc::invalid_snapcontext: + return "Invalid snapcontext"; + } + + return "Unknown error"; +} + +std::string category::message(int ev) const { + return message(ev, nullptr, 0); +} + +bs::error_condition category::default_error_condition(int ev) const noexcept { + switch (static_cast(ev)) { + case errc::pool_dne: + return ceph::errc::does_not_exist; + case errc::invalid_snapcontext: + return bs::errc::invalid_argument; + } + + return { ev, *this }; +} + +bool category::equivalent(int ev, const bs::error_condition& c) const noexcept { + if (static_cast(ev) == errc::pool_dne) { + if (c == bs::errc::no_such_file_or_directory) { + return true; + } + } + + return default_error_condition(ev) == c; +} + +int category::from_code(int ev) const noexcept { + switch (static_cast(ev)) { + case errc::pool_dne: + return -ENOENT; + case errc::invalid_snapcontext: + return -EINVAL; + } + return -EDOM; +} + +const bs::error_category& error_category() noexcept { + static const class category c; + return c; +} + +CephContext* RADOS::cct() { + return impl->cct; +} +} + +namespace std { +size_t hash::operator ()( + const neorados::Object& r) const { + static constexpr const hash H; + return H(*reinterpret_cast(&r.impl)); +} + +size_t hash::operator ()( + const neorados::IOContext& r) const { + static constexpr const hash H; + static constexpr const hash G; + const auto l = reinterpret_cast(&r.impl); + return H(l->oloc.pool) ^ (G(l->oloc.nspace) << 1) ^ (G(l->oloc.key) << 2); +} +} diff --git a/src/neorados/RADOSImpl.cc b/src/neorados/RADOSImpl.cc new file mode 100644 index 0000000000000..4dd1f37db058a --- /dev/null +++ b/src/neorados/RADOSImpl.cc @@ -0,0 +1,112 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include + +#include "common/common_init.h" + +#include "global/global_init.h" + +#include "RADOSImpl.h" + +namespace neorados { +namespace detail { + +RADOS::RADOS(boost::asio::io_context& ioctx, + boost::intrusive_ptr _cct) + : Dispatcher(_cct.detach()), + ioctx(ioctx), + monclient(cct, ioctx), + moncsd(monclient), + mgrclient(cct, nullptr, &monclient.monmap), + mgrcsd(mgrclient) { + auto err = monclient.build_initial_monmap(); + if (err < 0) + throw std::system_error(ceph::to_error_code(err)); + + messenger.reset(Messenger::create_client_messenger(cct, "radosclient")); + if (!messenger) + throw std::bad_alloc(); + + // Require OSDREPLYMUX feature. This means we will fail to talk to + // old servers. This is necessary because otherwise we won't know + // how to decompose the reply data into its constituent pieces. + messenger->set_default_policy( + Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX)); + + objecter.reset(new Objecter(cct, messenger.get(), &monclient, + ioctx, + cct->_conf->rados_mon_op_timeout, + cct->_conf->rados_osd_op_timeout)); + + objecter->set_balanced_budget(); + monclient.set_messenger(messenger.get()); + mgrclient.set_messenger(messenger.get()); + objecter->init(); + messenger->add_dispatcher_head(&mgrclient); + messenger->add_dispatcher_tail(objecter.get()); + messenger->start(); + monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR); + err = monclient.init(); + if (err) { + throw boost::system::system_error(ceph::to_error_code(err)); + } + err = monclient.authenticate(cct->_conf->client_mount_timeout); + if (err) { + throw boost::system::system_error(ceph::to_error_code(err)); + } + messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); + // Detect older cluster, put mgrclient into compatible mode + mgrclient.set_mgr_optional( + !get_required_monitor_features().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)); + + // MgrClient needs this (it doesn't have MonClient reference itself) + monclient.sub_want("mgrmap", 0, 0); + monclient.renew_subs(); + + mgrclient.init(); + objecter->set_client_incarnation(0); + objecter->start(); + + messenger->add_dispatcher_tail(this); + + std::unique_lock l(lock); + instance_id = monclient.get_global_id(); +} + +bool RADOS::ms_dispatch(Message *m) +{ + switch (m->get_type()) { + // OSD + case CEPH_MSG_OSD_MAP: + m->put(); + return true; + } + return false; +} + +void RADOS::ms_handle_connect(Connection *con) {} +bool RADOS::ms_handle_reset(Connection *con) { + return false; +} +void RADOS::ms_handle_remote_reset(Connection *con) {} +bool RADOS::ms_handle_refused(Connection *con) { + return false; +} + +RADOS::~RADOS() = default; +} +} diff --git a/src/neorados/RADOSImpl.h b/src/neorados/RADOSImpl.h new file mode 100644 index 0000000000000..bf9e75d09919b --- /dev/null +++ b/src/neorados/RADOSImpl.h @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_LIBRADOS_RADOSCLIENT_H +#define CEPH_LIBRADOS_RADOSCLIENT_H + +#include +#include +#include + +#include +#include + +#include "common/ceph_context.h" +#include "common/ceph_mutex.h" + +#include "mon/MonClient.h" + +#include "mgr/MgrClient.h" + +#include "osdc/Objecter.h" + +namespace neorados { + class RADOS; +namespace detail { + +class RADOS : public Dispatcher +{ + friend ::neorados::RADOS; + struct MsgDeleter { + void operator()(Messenger* p) const { + if (p) { + p->shutdown(); + p->wait(); + } + delete p; + } + }; + + struct ObjDeleter { + void operator()(Objecter* p) const { + if (p) { + p->shutdown(); + } + delete p; + } + }; + + template + struct scoped_shutdown { + T& m; + scoped_shutdown(T& m) : m(m) {} + + ~scoped_shutdown() { + m.shutdown(); + } + }; + + boost::asio::io_context& ioctx; + ceph::mutex lock = ceph::make_mutex("RADOS_unleashed::_::RADOSImpl"); + int instance_id = -1; + + std::unique_ptr messenger; + + MonClient monclient; + scoped_shutdown moncsd; + + MgrClient mgrclient; + scoped_shutdown mgrcsd; + + std::unique_ptr objecter; + + +public: + + RADOS(boost::asio::io_context& ioctx, boost::intrusive_ptr cct); + ~RADOS(); + bool ms_dispatch(Message *m) override; + void ms_handle_connect(Connection *con) override; + bool ms_handle_reset(Connection *con) override; + void ms_handle_remote_reset(Connection *con) override; + bool ms_handle_refused(Connection *con) override; + mon_feature_t get_required_monitor_features() const { + return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features)); + } + int get_instance_id() const { + return instance_id; + } +}; +} +} + +#endif diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 516fe63afd1c2..c59807ee4e389 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -5067,7 +5067,7 @@ void Objecter::enumerate_objects( hobject_t) &&> on_finish); template -void Objecter::enumerate_objects( +void Objecter::enumerate_objects( int64_t pool_id, std::string_view ns, hobject_t start, @@ -5075,7 +5075,7 @@ void Objecter::enumerate_objects( const uint32_t max, const cb::list& filter_bl, fu2::unique_function, + std::vector, hobject_t) &&> on_finish); @@ -5108,8 +5108,8 @@ void Objecter::_issue_enumerate( hobject_t start, std::unique_ptr> ctx); template -void Objecter::_issue_enumerate( - hobject_t start, std::unique_ptr> ctx); +void Objecter::_issue_enumerate( + hobject_t start, std::unique_ptr> ctx); template void Objecter::_enumerate_reply( @@ -5210,10 +5210,10 @@ void Objecter::_enumerate_reply( std::unique_ptr>&& ctx); template -void Objecter::_enumerate_reply( +void Objecter::_enumerate_reply( cb::list&& bl, bs::error_code ec, - std::unique_ptr>&& ctx); + std::unique_ptr>&& ctx); namespace { using namespace librados; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index a34e02fe018ad..321055b041b5e 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -39,7 +39,7 @@ #include "include/types.h" #include "include/rados/rados_types.hpp" #include "include/function2.hpp" -#include "include/RADOS/RADOS_Decodable.hpp" +#include "include/neorados/RADOS_Decodable.hpp" #include "common/admin_socket.h" #include "common/async/completion.h" @@ -733,10 +733,10 @@ struct ObjectOperation { }; struct CB_ObjectOperation_decodewatchersneo { - std::vector* pwatchers; + std::vector* pwatchers; int* prval; boost::system::error_code* pec; - CB_ObjectOperation_decodewatchersneo(std::vector* pw, + CB_ObjectOperation_decodewatchersneo(std::vector* pw, int* pr, boost::system::error_code* pec) : pwatchers(pw), prval(pr), pec(pec) {} @@ -749,7 +749,7 @@ struct ObjectOperation { decode(resp, p); if (pwatchers) { for (const auto& watch_item : resp.entries) { - RADOS::ObjWatcher ow; + neorados::ObjWatcher ow; ow.addr = watch_item.addr.get_legacy_str(); ow.watcher_id = watch_item.name.num(); ow.cookie = watch_item.cookie; @@ -770,11 +770,11 @@ struct ObjectOperation { struct CB_ObjectOperation_decodesnaps { librados::snap_set_t *psnaps; - RADOS::SnapSet *neosnaps; + neorados::SnapSet *neosnaps; int *prval; boost::system::error_code* pec; CB_ObjectOperation_decodesnaps(librados::snap_set_t* ps, - RADOS::SnapSet* ns, int* pr, + neorados::SnapSet* ns, int* pr, boost::system::error_code* pec) : psnaps(ps), neosnaps(ns), prval(pr), pec(pec) {} void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) { @@ -806,7 +806,7 @@ struct ObjectOperation { if (neosnaps) { neosnaps->clones.clear(); for (auto&& c : resp.clones) { - RADOS::CloneInfo clone; + neorados::CloneInfo clone; clone.cloneid = std::move(c.cloneid); clone.snaps.reserve(c.snaps.size()); @@ -1405,7 +1405,7 @@ struct ObjectOperation { out_rval.back() = prval; } } - void list_watchers(vector* out, + void list_watchers(vector* out, boost::system::error_code* ec) { add_op(CEPH_OSD_OP_LIST_WATCHERS); set_handler(CB_ObjectOperation_decodewatchersneo(out, nullptr, ec)); @@ -1422,7 +1422,7 @@ struct ObjectOperation { } } - void list_snaps(RADOS::SnapSet *out, int *prval, + void list_snaps(neorados::SnapSet *out, int *prval, boost::system::error_code* ec = nullptr) { add_op(CEPH_OSD_OP_LIST_SNAPS); if (prval || out || ec) { diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index cbd715661a34c..25a2f2436c41d 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -35,6 +35,7 @@ add_subdirectory(fs) add_subdirectory(journal) add_subdirectory(libcephfs) add_subdirectory(librados) +add_subdirectory(neorados) add_subdirectory(librados_test_stub) if(WITH_LIBRADOSSTRIPER) add_subdirectory(libradosstriper) diff --git a/src/test/neorados/CMakeLists.txt b/src/test/neorados/CMakeLists.txt new file mode 100644 index 0000000000000..38f7ae4b056a3 --- /dev/null +++ b/src/test/neorados/CMakeLists.txt @@ -0,0 +1,2 @@ +add_executable(ceph_test_neorados_start_stop start_stop.cc) +target_link_libraries(ceph_test_neorados_start_stop global libneorados ${unittest_libs}) diff --git a/src/test/neorados/start_stop.cc b/src/test/neorados/start_stop.cc new file mode 100644 index 0000000000000..a2475e3118008 --- /dev/null +++ b/src/test/neorados/start_stop.cc @@ -0,0 +1,175 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat + * Author: Adam C. Emerson + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include + +#include "include/neorados/RADOS.hpp" + +#include "common/async/context_pool.h" +#include "common/ceph_argparse.h" + +#include "global/global_init.h" + +namespace R = neorados; + + +int main(int argc, char** argv) +{ + using namespace std::literals; + + std::vector args; + argv_to_vec(argc, const_cast(argv), args); + env_to_vec(args); + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(cct.get()); + + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(30s); + } + std::this_thread::sleep_for(30s); + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(30s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(1s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(1s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(1s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(1s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(1s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(1s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(1s); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(500ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(500ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(50ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(50ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(50ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5ms); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5us); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5us); + } + { + ceph::async::io_context_pool p(1); + auto r = R::RADOS::make_with_cct(cct.get(), p, + boost::asio::use_future).get(); + std::this_thread::sleep_for(5us); + } + return 0; +} -- 2.39.5