From 5258bcbd738e2a798ed95e1e7623eaa0fe7f3587 Mon Sep 17 00:00:00 2001 From: Daniel Gryniewicz Date: Tue, 24 Jan 2023 10:02:07 -0500 Subject: [PATCH] RGW - Add POSIX Driver This is the MVP for a driver for RGW that operates on top of a POSIX filesystem. It supports get, put, list, copy, multipart, external access via the filesystem itself, and ordered bucket listings via an LRU-based cache. Note that this is currently a Filter, intended to run on top of dbstore. This is because it currently doesn't have any User implementation, so it depends on dbstore's User. Everything else is implemented in POSIXDriver. Once there is a User implementation, this will become a Store, instead of a Filter. Commit messages from bucket listing cache: rgw/posixdriver: recycle lmdb database handles as required While LMDB workflows often do not close/return database handles, ours continually reuses them. This requires us to close each handle (atomically) when a cache entry is recycled. rgw/posixdriver: don't instantiate bucket cache entries from notify events rgw/posixdriver: incorporate lmdb-safe for now The current inclusion is based on https://github.com/Martchus/lmdb-safe, which is actively maintained but currently has some packaging issues the author has agreed to accept fixes for. For now, skip the submodule to save time and remove an external dependency. rgw/posixdriver: fix listing of cached, empty bucket * check lmdb enumeration result in all cases and w/better style * add unit test for enumeration of an empty cached directory rgw/posixdriver: nest lmdbs in a directory under the dbroot path to avoid cleanup issues rgw/posixdriver: refactor for posix integration * Derive BucketCache types as templates on a SAL driver and SAL bucket pair. 
* Integrate cache fills as callbacks into SAL layer (or mock, for tests) * Renaming and cleanups rgw/posixdriver: add bucket cache implementation and tests Adds free-standing cache of buckets and object names, with bucket names (and listing attributes, upcoming) managed in a hashed set of lmdb databases, which provides ordering and a high-performance listing cache. A framework for notification on new object creation (e.g., outside S3 workflow) is provided, and a Linux implementation using inotify. FindLMDB.cmake taken with attribution and license. rgw/posixdriver: add zpp_bits serialization (FAST) Signed-off-by: Daniel Gryniewicz Signed-off-by: Ali Maredia Signed-off-by: Matt Benjamin --- CMakeLists.txt | 1 + ceph.spec.in | 1 + cmake/modules/FindLMDB.cmake | 61 + debian/control | 1 + src/common/cohort_lru.h | 6 + src/common/options/rgw.yaml.in | 46 + src/include/config-h.in.cmake | 3 + src/rgw/CMakeLists.txt | 11 +- src/rgw/driver/posix/README.md | 37 + src/rgw/driver/posix/bucket_cache.cpp | 4 + src/rgw/driver/posix/bucket_cache.h | 549 +++ src/rgw/driver/posix/lmdb-safe-global.h | 36 + src/rgw/driver/posix/lmdb-safe.cc | 369 ++ src/rgw/driver/posix/lmdb-safe.hh | 638 +++ src/rgw/driver/posix/notify.cpp | 21 + src/rgw/driver/posix/notify.h | 255 + src/rgw/driver/posix/rgw_sal_posix.cc | 3096 ++++++++++++ src/rgw/driver/posix/rgw_sal_posix.h | 688 +++ src/rgw/driver/posix/unordered_dense.h | 1584 +++++++ src/rgw/driver/posix/zpp_bits.h | 5678 +++++++++++++++++++++++ src/rgw/driver/rados/rgw_sal_rados.cc | 6 - src/rgw/driver/rados/rgw_sal_rados.h | 1 - src/rgw/rgw_op.cc | 7 +- src/rgw/rgw_rest_s3.cc | 2 +- src/rgw/rgw_sal.cc | 19 + src/rgw/rgw_sal_dbstore.cc | 15 +- src/rgw/rgw_sal_dbstore.h | 6 +- src/rgw/rgw_sal_store.h | 4 + src/test/rgw/CMakeLists.txt | 11 + src/test/rgw/test_posix_bucket_cache.cc | 428 ++ 30 files changed, 13562 insertions(+), 22 deletions(-) create mode 100644 cmake/modules/FindLMDB.cmake create mode 100644 src/rgw/driver/posix/README.md 
create mode 100644 src/rgw/driver/posix/bucket_cache.cpp create mode 100644 src/rgw/driver/posix/bucket_cache.h create mode 100644 src/rgw/driver/posix/lmdb-safe-global.h create mode 100644 src/rgw/driver/posix/lmdb-safe.cc create mode 100644 src/rgw/driver/posix/lmdb-safe.hh create mode 100644 src/rgw/driver/posix/notify.cpp create mode 100644 src/rgw/driver/posix/notify.h create mode 100644 src/rgw/driver/posix/rgw_sal_posix.cc create mode 100644 src/rgw/driver/posix/rgw_sal_posix.h create mode 100644 src/rgw/driver/posix/unordered_dense.h create mode 100644 src/rgw/driver/posix/zpp_bits.h create mode 100644 src/test/rgw/test_posix_bucket_cache.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 78881cb0188..2903e872c9e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -450,6 +450,7 @@ option(WITH_RADOSGW_DBSTORE "DBStore backend for RADOS Gateway" ON) option(WITH_RADOSGW_MOTR "CORTX-Motr backend for RADOS Gateway" OFF) option(WITH_RADOSGW_DAOS "DAOS backend for RADOS Gateway" OFF) option(WITH_RADOSGW_D4N "D4N wrapper for RADOS Gateway" ON) +option(WITH_RADOSGW_POSIX "POSIX backend for Rados Gateway" ON) option(WITH_RADOSGW_SELECT_PARQUET "Support for s3 select on parquet objects" ON) option(WITH_RADOSGW_ARROW_FLIGHT "Build arrow flight when not using system-provided arrow" OFF) option(WITH_RADOSGW_BACKTRACE_LOGGING "Enable backtraces in rgw logs" OFF) diff --git a/ceph.spec.in b/ceph.spec.in index 1fc998d014d..57ad8c4fdc5 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -270,6 +270,7 @@ BuildRequires: xfsprogs-devel BuildRequires: xmlstarlet BuildRequires: nasm BuildRequires: lua-devel +BuildRequires: lmdb-devel %if 0%{with seastar} || 0%{with jaeger} BuildRequires: yaml-cpp-devel >= 0.6 %endif diff --git a/cmake/modules/FindLMDB.cmake b/cmake/modules/FindLMDB.cmake new file mode 100644 index 00000000000..743cc486079 --- /dev/null +++ b/cmake/modules/FindLMDB.cmake @@ -0,0 +1,61 @@ +# Copyright (c) 2014, The Monero Project +# All rights reserved. 
+# +# Redistribution and use in source and binary forms, with or without modification, are +# permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this list of +# conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, this list +# of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors may be +# used to endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +MESSAGE(STATUS "Looking for liblmdb") + +FIND_PATH(LMDB_INCLUDE_DIR + NAMES lmdb.h + PATH_SUFFIXES include/ include/lmdb/ + PATHS "${PROJECT_SOURCE_DIR}" + ${LMDB_ROOT} + $ENV{LMDB_ROOT} + /usr/local/ + /usr/ +) + +if(STATIC) + if(MINGW) + find_library(LMDB_LIBRARIES liblmdb.dll.a) + else() + find_library(LMDB_LIBRARIES liblmdb.a) + endif() +else() + find_library(LMDB_LIBRARIES lmdb) +endif() + +IF(LMDB_INCLUDE_DIR) + MESSAGE(STATUS "Found liblmdb include (lmdb.h) in ${LMDB_INCLUDE_DIR}") + IF(LMDB_LIBRARIES) + MESSAGE(STATUS "Found liblmdb library") + set(LMDB_INCLUDE ${LMDB_INCLUDE_DIR}) + set(LMDB_LIBRARY ${LMDB_LIBRARIES}) + ELSE() + MESSAGE(FATAL_ERROR "${BoldRed}Could not find liblmdb library, please make sure you have installed liblmdb or liblmdb-dev or the equivalent${ColourReset}") + ENDIF() +ELSE() + MESSAGE(FATAL_ERROR "${BoldRed}Could not find liblmdb library, please make sure you have installed liblmdb or liblmdb-dev or the equivalent${ColourReset}") +ENDIF() diff --git a/debian/control b/debian/control index b17eb347d9a..47655668535 100644 --- a/debian/control +++ b/debian/control @@ -65,6 +65,7 @@ Build-Depends: automake, libsqlite3-dev, libssl-dev, libtool, + liblmdb-dev, libudev-dev, libnl-genl-3-dev, libxml2-dev, diff --git a/src/common/cohort_lru.h b/src/common/cohort_lru.h index b105c80ccd5..af2baaa5c67 100644 --- a/src/common/cohort_lru.h +++ b/src/common/cohort_lru.h @@ -16,6 +16,12 @@ #include #include +#ifdef __CEPH__ +# include "include/ceph_assert.h" +#else +# include +#endif + #include "common/likely.h" #ifndef CACHE_LINE_SIZE diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 8928e853e64..dad1906a72c 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3618,6 +3618,7 @@ options: - none - base - d4n + - posix - name: dbstore_db_dir type: str level: advanced @@ -3705,6 +3706,51 @@ options: default: false services: - rgw +- name: rgw_posix_base_path + type: str + level: 
advanced + desc: experimental Option to set base path for POSIX Driver + long_desc: Base path for the POSIX driver. All operations are relative to this path. + Defaults to /tmp/rgw_posix_driver + default: /tmp/rgw_posix_driver + services: + - rgw +- name: rgw_posix_database_root + type: str + level: advanced + desc: experimental Path to parent of POSIX Driver LMDB bucket listing cache + long_desc: Parent directory of LMDB bucket listing cache databases. + default: /var/lib/ceph/radosgw + services: + - rgw +- name: rgw_posix_cache_max_buckets + type: int + level: advanced + desc: experimental Number of buckets to maintain in the ordered listing cache + default: 100 + services: + - rgw +- name: rgw_posix_cache_lanes + type: int + level: advanced + desc: experimental Number of lanes in cache LRU + default: 3 + services: + - rgw +- name: rgw_posix_cache_partitions + type: int + level: advanced + desc: experimental Number of partitions in cache AVL + default: 3 + services: + - rgw +- name: rgw_posix_cache_lmdb_count + type: int + level: advanced + desc: experimental Number of lmdb partitions in the ordered listing cache + default: 3 + services: + - rgw - name: rgw_luarocks_location type: str level: advanced diff --git a/src/include/config-h.in.cmake b/src/include/config-h.in.cmake index 6b2ac58a351..f14a1f43a60 100644 --- a/src/include/config-h.in.cmake +++ b/src/include/config-h.in.cmake @@ -363,6 +363,9 @@ /* Backend CORTX-DAOS for Rados Gateway */ #cmakedefine WITH_RADOSGW_DAOS +/* Backend POSIX for Rados Gateway */ +#cmakedefine WITH_RADOSGW_POSIX + /* Defined if std::map::merge() is supported */ #cmakedefine HAVE_STDLIB_MAP_SPLICING diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 79bc05a4df1..6481970b2a9 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -227,6 +227,15 @@ if(WITH_RADOSGW_D4N) list(APPEND librgw_common_srcs driver/d4n/d4n_datacache.cc) list(APPEND librgw_common_srcs driver/d4n/rgw_sal_d4n.cc) endif() 
+if(WITH_RADOSGW_POSIX) + #add_subdirectory(driver/posix) + find_package(LMDB REQUIRED) + add_compile_definitions(LMDB_SAFE_NO_CPP_UTILITIES) + list(APPEND librgw_common_srcs + driver/posix/rgw_sal_posix.cc + driver/posix/lmdb-safe.cc + driver/posix/notify.cpp) +endif() if(WITH_JAEGER) list(APPEND librgw_common_srcs rgw_tracer.cc) endif() @@ -236,7 +245,6 @@ if(WITH_RADOSGW_ARROW_FLIGHT) list(APPEND librgw_common_srcs rgw_flight.cc rgw_flight_frontend.cc) endif(WITH_RADOSGW_ARROW_FLIGHT) - add_library(rgw_common STATIC ${librgw_common_srcs}) include(CheckCXXCompilerFlag) @@ -270,6 +278,7 @@ target_link_libraries(rgw_common ${EXPAT_LIBRARIES} ${ARROW_LIBRARIES} ${ARROW_FLIGHT_LIBRARIES} + ${LMDB_LIBRARIES} ${ALLOC_LIBS} PUBLIC ${LUA_LIBRARIES} diff --git a/src/rgw/driver/posix/README.md b/src/rgw/driver/posix/README.md new file mode 100644 index 00000000000..02dc8dfbe85 --- /dev/null +++ b/src/rgw/driver/posix/README.md @@ -0,0 +1,37 @@ +# POSIX Driver +Standalone Rados Gateway (RGW) on a local POSIX filesystem (Experimental) + + +## CMake Option +Add below cmake option (enabled by default) + + -DWITH_RADOSGW_POSIX=ON + + +## Build + + cd build + ninja [vstart] + + +## Running Test cluster +Currently, POSIXDriver depends on DBStore for user storage. This will change, eventually, but for now, it's run as a filter on top of DBStore. Note that only users are stored in DBStore, the rest is in the POSIX filesystem. +Edit ceph.conf to add below option + + [client] + rgw backend store = dbstore + rgw config store = dbstore + rgw filter = posix + +Start vstart cluster + + MON=0 OSD=0 MDS=0 MGR=0 RGW=1 ../src/vstart.sh -o rgw_backend_store=dbstore -o rgw_config_store=dbstore -o rgw_filter=posix -n -d + +The above vstart command brings up RGW server on POSIXDriver. It creates default zonegroup, zone and few default users (eg., testid) to be used for s3 operations. + +`radosgw-admin` can be used to create and remove other users, zonegroups and zones. 
+ +By default, the directory exported is *'/tmp/rgw_posix_driver'*. This can be changed with the `rgw_posix_base_path` option, either in ceph.conf or on the vstart command line above. + +The POSIXDriver keeps a LMDB based cache of directories, so that it can provide ordered listings. This directory lives in `rgw_posix_database_root`, which by default is in *'/var/lib/ceph/radosgw'* + diff --git a/src/rgw/driver/posix/bucket_cache.cpp b/src/rgw/driver/posix/bucket_cache.cpp new file mode 100644 index 00000000000..9df08e40a13 --- /dev/null +++ b/src/rgw/driver/posix/bucket_cache.cpp @@ -0,0 +1,4 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp +#include "bucket_cache.h" + diff --git a/src/rgw/driver/posix/bucket_cache.h b/src/rgw/driver/posix/bucket_cache.h new file mode 100644 index 00000000000..3cbca7c58de --- /dev/null +++ b/src/rgw/driver/posix/bucket_cache.h @@ -0,0 +1,549 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "include/function2.hpp" +#include "common/cohort_lru.h" +#include "lmdb-safe.hh" +#include "zpp_bits.h" +#include "notify.h" +#include +#include // struct timespec +#include + +#include "rgw_common.h" +#include "rgw_sal.h" + +#include "fmt/format.h" + +namespace file::listing { + +namespace bi = boost::intrusive; +namespace sf = std::filesystem; + +typedef bi::link_mode link_mode; /* XXX normal */ +typedef bi::avl_set_member_hook member_hook_t; + +template +struct BucketCache; + +template +struct BucketCacheEntry : public cohort::lru::Object +{ + using lock_guard = std::lock_guard; + using unique_lock = std::unique_lock; + + static constexpr uint32_t FLAG_NONE = 0x0000; + static constexpr uint32_t FLAG_FILLED = 0x0001; + static constexpr uint32_t FLAG_DELETED = 0x0002; + + static 
constexpr uint64_t seed = 8675309; + + BucketCache* bc; + std::string name; + std::shared_ptr env; + LMDBSafe::MDBDbi dbi; + uint64_t hk; + member_hook_t name_hook; + + // XXX clean this up + std::mutex mtx; // XXX Adam's preferred shared mtx? + std::condition_variable cv; + uint32_t flags; + +public: + BucketCacheEntry(BucketCache* bc, const std::string& name, uint64_t hk) + : bc(bc), name(name), hk(hk), flags(FLAG_NONE) {} + + void set_env(std::shared_ptr& _env, LMDBSafe::MDBDbi& _dbi) { + env = _env; + dbi = _dbi; + } + + inline bool deleted() const { + return flags & FLAG_DELETED; + } + + class Factory : public cohort::lru::ObjectFactory + { + public: + BucketCache* bc; + const std::string& name; + uint64_t hk; + uint32_t flags; + + Factory() = delete; + Factory(BucketCache *bc, const std::string& name) + : bc(bc), name(name), flags(FLAG_NONE) { + hk = XXH64(name.c_str(), name.length(), BucketCacheEntry::seed); + } + + void recycle (cohort::lru::Object* o) override { + /* re-use an existing object */ + o->~Object(); // call lru::Object virtual dtor + // placement new! 
+ new (o) BucketCacheEntry(bc, name, hk); + } + + cohort::lru::Object* alloc() override { + return new BucketCacheEntry(bc, name, hk); + } + }; /* Factory */ + + struct BucketCacheEntryLT + { + // for internal ordering + bool operator()(const BucketCacheEntry& lhs, const BucketCacheEntry& rhs) const + { return (lhs.name < rhs.name); } + + // for external search by name + bool operator()(const std::string& k, const BucketCacheEntry& rhs) const + { return k < rhs.name; } + + bool operator()(const BucketCacheEntry& lhs, const std::string& k) const + { return lhs.name < k; } + }; + + struct BucketCacheEntryEQ + { + bool operator()(const BucketCacheEntry& lhs, const BucketCacheEntry& rhs) const + { return (lhs.name == rhs.name); } + + bool operator()(const std::string& k, const BucketCacheEntry& rhs) const + { return k == rhs.name; } + + bool operator()(const BucketCacheEntry& lhs, const std::string& k) const + { return lhs.name == k; } + }; + + typedef cohort::lru::LRU bucket_lru; + + typedef bi::member_hook name_hook_t; + typedef bi::avltree, name_hook_t> bucket_avl_t; + typedef cohort::lru::TreeX bucket_avl_cache; + + bool reclaim(const cohort::lru::ObjectFactory* newobj_fac) { + auto factory = dynamic_cast::Factory*>(newobj_fac); + if (factory == nullptr) { + return false; + } + { /* anon block */ + /* in this case, we are being called from a context which holds + * A partition lock, and this may be still in use */ + lock_guard{mtx}; + if (! deleted()) { + flags |= FLAG_DELETED; + bc->recycle_count++; + + //std::cout << fmt::format("reclaim {}!", name) << std::endl; + bc->un->remove_watch(name); +#if 1 + // depends on safe_link + if (! name_hook.is_linked()) { + // this should not happen! 
+ abort(); + } +#endif + bc->cache.remove(hk, this, bucket_avl_cache::FLAG_NONE); + + /* discard lmdb data associated with this bucket */ + auto txn = env->getRWTransaction(); + mdb_drop(*txn, dbi, 0); + txn->commit(); + /* LMDB applications don't "normally" close database handles, + * but doing so (atomically) is supported, and we must as + * we continually recycle them */ + mdb_dbi_close(*env, dbi); // return db handle + } /* ! deleted */ + } + return true; +} /* reclaim */ + +}; /* BucketCacheEntry */ + +using fill_cache_cb_t = + const fu2::unique_function; + +using list_bucket_each_t = + const fu2::unique_function; + +template +struct BucketCache : public Notifiable +{ + using lock_guard = std::lock_guard; + using unique_lock = std::unique_lock; + + D* driver; + std::string bucket_root; + uint32_t max_buckets; + std::atomic recycle_count; + std::mutex mtx; + + /* the bucket lru cache keeps track of the buckets whose listings are + * being cached in lmdb databases and updated from notify */ + typename BucketCacheEntry::bucket_lru lru; + typename BucketCacheEntry::bucket_avl_cache cache; + sf::path rp; + + /* the lmdb handle cache maintains a vector of lmdb environments, + * each supports 1 rw and unlimited ro transactions; the materialized + * listing for each bucket is stored as a database in one of these + * environments, selected by a hash of the bucket name; a bucket's database + * is dropped/cleared whenever its entry is reclaimed from cache; the entire + * complex is cleared on restart to preserve consistency */ + class Lmdbs + { + std::string database_root; + uint8_t lmdb_count; + std::vector> envs; + sf::path dbp; + + public: + Lmdbs(std::string& database_root, uint8_t lmdb_count) + : database_root(database_root), lmdb_count(lmdb_count), + dbp(database_root) { + + /* create a root for lmdb directory partitions (if it doesn't + * exist already) */ + sf::path safe_root_path{dbp / fmt::format("rgw_posix_lmdbs")}; + sf::create_directory(safe_root_path); + + 
/* purge cache completely */ + for (const auto& dir_entry : sf::directory_iterator{safe_root_path}) { + sf::remove_all(dir_entry); + } + + /* repopulate cache basis */ + for (int ix = 0; ix < lmdb_count; ++ix) { + sf::path env_path{safe_root_path / fmt::format("part_{}", ix)}; + sf::create_directory(env_path); + auto env = LMDBSafe::getMDBEnv(env_path.string().c_str(), 0 /* flags? */, 0600); + envs.push_back(env); + } + } + + inline std::shared_ptr& get_sp_env(BucketCacheEntry* bucket) { + return envs[(bucket->hk % lmdb_count)]; + } + + inline LMDBSafe::MDBEnv& get_env(BucketCacheEntry* bucket) { + return *(get_sp_env(bucket)); + } + + const std::string& get_root() const { return database_root; } + } lmdbs; + + std::unique_ptr un; + +public: + BucketCache(D* driver, std::string bucket_root, std::string database_root, + uint32_t max_buckets=100, uint8_t max_lanes=3, + uint8_t max_partitions=3, uint8_t lmdb_count=3) + : driver(driver), bucket_root(bucket_root), max_buckets(max_buckets), + lru(max_lanes, max_buckets/max_lanes), + cache(max_lanes, max_buckets/max_partitions), + rp(bucket_root), + lmdbs(database_root, lmdb_count), + un(Notify::factory(this, bucket_root)) + { + if (! (sf::exists(rp) && sf::is_directory(rp))) { + std::cerr << fmt::format("{} bucket root {} invalid", __func__, + bucket_root) << std::endl; + exit(1); + } + + sf::path dp{database_root}; + if (! 
(sf::exists(dp) && sf::is_directory(dp))) { + std::cerr << fmt::format("{} database root {} invalid", __func__, + database_root) << std::endl; + exit(1); + } + } + + static constexpr uint32_t FLAG_NONE = 0x0000; + static constexpr uint32_t FLAG_CREATE = 0x0001; + static constexpr uint32_t FLAG_LOCK = 0x0002; + + typedef std::tuple*, uint32_t> GetBucketResult; + + GetBucketResult get_bucket(const std::string& name, uint32_t flags) + { + /* this fn returns a bucket locked appropriately, having atomically + * found or inserted the required BucketCacheEntry in_avl*/ + BucketCacheEntry* b{nullptr}; + typename BucketCacheEntry::Factory fac(this, name); + typename BucketCacheEntry::bucket_avl_cache::Latch lat; + uint32_t iflags{cohort::lru::FLAG_INITIAL}; + GetBucketResult result{nullptr, 0}; + + retry: + b = cache.find_latch(fac.hk /* partition selector */, + name /* key */, lat /* serializer */, BucketCacheEntry::bucket_avl_cache::FLAG_LOCK); + /* LATCHED */ + if (b) { + b->mtx.lock(); + if (b->deleted() || + ! lru.ref(b, cohort::lru::FLAG_INITIAL)) { + // lru ref failed + lat.lock->unlock(); + b->mtx.unlock(); + goto retry; + } + lat.lock->unlock(); + /* LOCKED */ + } else { + /* BucketCacheEntry not in cache */ + if (! (flags & BucketCache::FLAG_CREATE)) { + /* the caller does not want to instantiate a new cache + * entry (i.e., only wants to notify on an existing one) */ + return result; + } + /* we need to create it */ + b = static_cast*>( + lru.insert(&fac, cohort::lru::Edge::MRU, iflags)); + if (b) [[likely]] { + b->mtx.lock(); + + /* attach bucket to an lmdb partition and prepare it for i/o */ + auto& env = lmdbs.get_sp_env(b); + auto dbi = env->openDB(b->name, MDB_CREATE); + b->set_env(env, dbi); + + if (! 
(iflags & cohort::lru::FLAG_RECYCLE)) [[likely]] { + /* inserts at cached insert iterator, releasing latch */ + cache.insert_latched(b, lat, BucketCacheEntry::bucket_avl_cache::FLAG_UNLOCK); + } else { + /* recycle step invalidates Latch */ + lat.lock->unlock(); /* !LATCHED */ + cache.insert(fac.hk, b, BucketCacheEntry::bucket_avl_cache::FLAG_NONE); + } + get<1>(result) |= BucketCache::FLAG_CREATE; + } else { + /* XXX lru allocate failed? seems impossible--that would mean that + * fallback to the allocator also failed, and maybe we should abend */ + lat.lock->unlock(); + goto retry; /* !LATCHED */ + } + } /* have BucketCacheEntry */ + + if (! (flags & BucketCache::FLAG_LOCK)) { + b->mtx.unlock(); + } + get<0>(result) = b; + return result; + } /* get_bucket */ + + static inline std::string concat_key(const rgw_obj_index_key& k) { + std::string k_str; + k_str.reserve(k.name.size() + k.instance.size()); + k_str += k.name; + k_str += k.instance; + return k_str; + } + + int fill(const DoutPrefixProvider* dpp, BucketCacheEntry* bucket, + B* sal_bucket, uint32_t flags, optional_yield y) /* assert: LOCKED */ + { + auto txn = bucket->env->getRWTransaction(); + + /* instruct the bucket provider to enumerate all entries, + * in any order */ + auto rc = sal_bucket->fill_cache(dpp, y, + [&](const DoutPrefixProvider* dpp, rgw_bucket_dir_entry& bde) -> int { + auto concat_k = concat_key(bde.key); + std::string ser_data; + zpp::bits::out out(ser_data); + struct timespec ts{ceph::real_clock::to_timespec(bde.meta.mtime)}; + auto errc = + out(bde.key.name, bde.key.instance, /* XXX bde.key.ns, */ + bde.ver.pool, bde.ver.epoch, bde.exists, + bde.meta.category, bde.meta.size, ts.tv_sec, ts.tv_nsec, + bde.meta.owner, bde.meta.owner_display_name, bde.meta.accounted_size, + bde.meta.storage_class, bde.meta.appendable, bde.meta.etag); + /*std::cout << fmt::format("fill: bde.key.name: {}", bde.key.name) + << std::endl;*/ + if (errc.code != std::errc{0}) { + abort(); + return 0; // XXX 
non-zero return? + } + txn->put(bucket->dbi, concat_k, ser_data); + //std::cout << fmt::format("{} {}", __func__, bde.key.name) << '\n'; + return 0; + }); + + txn->commit(); + bucket->flags |= BucketCacheEntry::FLAG_FILLED; + un->add_watch(bucket->name, bucket); + return rc; + } /* fill */ + + int list_bucket(const DoutPrefixProvider* dpp, optional_yield y, B* sal_bucket, + std::string marker, list_bucket_each_t each_func) { + + using namespace LMDBSafe; + + int rc __attribute__((unused)) = 0; + GetBucketResult gbr = + get_bucket(sal_bucket->get_name(), + BucketCache::FLAG_LOCK | BucketCache::FLAG_CREATE); + auto [b /* BucketCacheEntry */, flags] = gbr; + if (b /* XXX again, can this fail? */) { + if (! (b->flags & BucketCacheEntry::FLAG_FILLED)) { + /* bulk load into lmdb cache */ + rc = fill(dpp, b, sal_bucket, FLAG_NONE, y); + } + /* display them */ + b->mtx.unlock(); + /*! LOCKED */ + + auto txn = b->env->getROTransaction(); + auto cursor=txn->getCursor(b->dbi); + MDBOutVal key, data; + bool again{true}; + + const auto proc_result = [&]() { + zpp::bits::errc errc{}; + rgw_bucket_dir_entry bde{}; + /* XXX we may not need to recover the cache key */ + std::string_view svk __attribute__((unused)) = + key.get(); // {name, instance, [ns]} + std::string_view svv = data.get(); + std::string ser_v{svv}; + zpp::bits::in in_v(ser_v); + struct timespec ts; + errc = + in_v(bde.key.name, bde.key.instance, /* bde.key.ns, */ + bde.ver.pool, bde.ver.epoch, bde.exists, + bde.meta.category, bde.meta.size, ts.tv_sec, ts.tv_nsec, + bde.meta.owner, bde.meta.owner_display_name, bde.meta.accounted_size, + bde.meta.storage_class, bde.meta.appendable, bde.meta.etag); + if (errc.code != std::errc{0}) { + abort(); + } + bde.meta.mtime = ceph::real_clock::from_timespec(ts); + again = each_func(bde); + }; + + if (! 
marker.empty()) { + MDBInVal k(marker); + auto rc = cursor.lower_bound(k, key, data); + if (rc == MDB_NOTFOUND) { + /* no key sorts after k/marker, so there is nothing to do */ + return 0; + } + proc_result(); + } else { + /* position at start of index */ + auto rc = cursor.get(key, data, MDB_FIRST); + if (rc == MDB_SUCCESS) { + proc_result(); + } + } + while(cursor.get(key, data, MDB_NEXT) == MDB_SUCCESS) { + if (!again) { + return 0; + } + proc_result(); + } + lru.unref(b, cohort::lru::FLAG_NONE); + } /* b */ + + return 0; + } /* list_bucket */ + + int notify(const std::string& bname, void* opaque, + const std::vector& evec) override { + + using namespace LMDBSafe; + + int rc{0}; + GetBucketResult gbr = get_bucket(bname, BucketCache::FLAG_LOCK); + auto [b /* BucketCacheEntry */, flags] = gbr; + if (b) { + unique_lock ulk{b->mtx, std::adopt_lock}; + if ((b->name != bname) || + (b != opaque) || + (! (b->flags & BucketCacheEntry::FLAG_FILLED))) { + /* do nothing */ + return 0; + } + ulk.unlock(); + auto txn = b->env->getRWTransaction(); + for (const auto& ev : evec) { + using EventType = Notifiable::EventType; + /* + std::string_view nil{""}; + std::cout << fmt::format("notify {} {}!", + ev.name ? 
*ev.name : nil, + uint32_t(ev.type)) + << std::endl; */ + switch (ev.type) + { + case EventType::ADD: + { + rgw_bucket_dir_entry bde{}; + bde.key.name = *ev.name; + /* XXX will need work (if not straight up magic) to have + * side loading support instance and ns */ + auto concat_k = concat_key(bde.key); + rc = driver->mint_listing_entry(b->name, bde); + std::string ser_data; + zpp::bits::out out(ser_data); + struct timespec ts{ceph::real_clock::to_timespec(bde.meta.mtime)}; + auto errc = + out(bde.key.name, bde.key.instance, /* XXX bde.key.ns, */ + bde.ver.pool, bde.ver.epoch, bde.exists, + bde.meta.category, bde.meta.size, ts.tv_sec, ts.tv_nsec, + bde.meta.owner, bde.meta.owner_display_name, bde.meta.accounted_size, + bde.meta.storage_class, bde.meta.appendable, bde.meta.etag); + if (errc.code != std::errc{0}) { + abort(); + } + txn->put(b->dbi, concat_k, ser_data); + } + break; + case EventType::REMOVE: + { + auto& ev_name = *ev.name; + txn->del(b->dbi, ev_name); + } + break; + [[unlikely]] case EventType::INVALIDATE: + { + /* yikes, cache blown */ + ulk.lock(); + mdb_drop(*txn, b->dbi, 0); + txn->commit(); + b->flags &= ~BucketCacheEntry::FLAG_FILLED; + return 0; /* don't process any more events in this batch */ + } + break; + default: + /* unknown event */ + break; + } + } /* all events */ + txn->commit(); + lru.unref(b, cohort::lru::FLAG_NONE); + } /* b */ + return rc; + } /* notify */ + +}; /* BucketCache */ + +} // namespace file::listing diff --git a/src/rgw/driver/posix/lmdb-safe-global.h b/src/rgw/driver/posix/lmdb-safe-global.h new file mode 100644 index 00000000000..c5b2d3724b8 --- /dev/null +++ b/src/rgw/driver/posix/lmdb-safe-global.h @@ -0,0 +1,36 @@ +/* +MIT License + +Copyright (c) 2018 bert hubert + +Permission is hereby granted, free of charge, to any person obtaining a copy +*/ +#ifndef LMDB_SAFE_GLOBAL +#define LMDB_SAFE_GLOBAL + +#ifndef LMDB_SAFE_NO_CPP_UTILITIES +#include +#else +#undef LMDB_SAFE_STATIC +#define LMDB_SAFE_STATIC 1 +#endif + 
+#ifdef LMDB_SAFE_STATIC +#define LMDB_SAFE_EXPORT +#define LMDB_SAFE_IMPORT +#else +#define LMDB_SAFE_EXPORT CPP_UTILITIES_GENERIC_LIB_EXPORT +#define LMDB_SAFE_IMPORT CPP_UTILITIES_GENERIC_LIB_IMPORT +#endif + +/*! + * \def LMDB_SAFE_EXPORT + * \brief Marks the symbol to be exported by the lmdb-safe library. + */ + +/*! + * \def LMDB_SAFE_IMPORT + * \brief Marks the symbol to be imported from the lmdb-safe library. + */ + +#endif // LMDB_SAFE_GLOBAL diff --git a/src/rgw/driver/posix/lmdb-safe.cc b/src/rgw/driver/posix/lmdb-safe.cc new file mode 100644 index 00000000000..389d016d288 --- /dev/null +++ b/src/rgw/driver/posix/lmdb-safe.cc @@ -0,0 +1,369 @@ +/* +MIT License + +Copyright (c) 2018 bert hubert + +Permission is hereby granted, free of charge, to any person obtaining a copy +*/ +#include "lmdb-safe.hh" + +#include +#include + +#include +#include +#include +#include + +using namespace std; + +namespace LMDBSafe { + +MDBDbi::MDBDbi(MDB_env *env, MDB_txn *txn, const string_view dbname, unsigned int flags) +{ + (void)env; + // A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function. + + if (const auto rc = mdb_dbi_open(txn, dbname.empty() ? 0 : &dbname[0], flags, &d_dbi)) + throw LMDBError("Unable to open named database: ", rc); + + // Database names are keys in the unnamed database, and may be read but not written. +} + +MDBEnv::MDBEnv(const char *fname, unsigned int flags, mdb_mode_t mode, MDB_dbi maxDBs) +{ + mdb_env_create(&d_env); + if (const auto rc = mdb_env_set_mapsize(d_env, 16ULL * 4096 * 244140ULL)) { // 4GB + throw LMDBError("Setting map size: ", rc); + } + // Various other options may also need to be set before opening the handle, e.g. 
mdb_env_set_mapsize(), mdb_env_set_maxreaders(), mdb_env_set_maxdbs(), + if (const auto rc = mdb_env_set_maxdbs(d_env, maxDBs)) { + throw LMDBError("Setting maxdbs: ", rc); + } + + // we need MDB_NOTLS since we rely on its semantics + if (const auto rc = mdb_env_open(d_env, fname, flags | MDB_NOTLS, mode)) { + // If this function fails, mdb_env_close() must be called to discard the MDB_env handle. + mdb_env_close(d_env); + throw LMDBError("Unable to open database file " + std::string(fname) + ": ", rc); + } +} + +void MDBEnv::incROTX() +{ + std::lock_guard l(d_countmutex); + ++d_ROtransactionsOut[std::this_thread::get_id()]; +} + +void MDBEnv::decROTX() +{ + std::lock_guard l(d_countmutex); + --d_ROtransactionsOut[std::this_thread::get_id()]; +} + +void MDBEnv::incRWTX() +{ + std::lock_guard l(d_countmutex); + ++d_RWtransactionsOut[std::this_thread::get_id()]; +} + +void MDBEnv::decRWTX() +{ + std::lock_guard l(d_countmutex); + --d_RWtransactionsOut[std::this_thread::get_id()]; +} + +int MDBEnv::getRWTX() +{ + std::lock_guard l(d_countmutex); + return d_RWtransactionsOut[std::this_thread::get_id()]; +} +int MDBEnv::getROTX() +{ + std::lock_guard l(d_countmutex); + return d_ROtransactionsOut[std::this_thread::get_id()]; +} + +std::shared_ptr getMDBEnv(const char *fname, unsigned int flags, mdb_mode_t mode, MDB_dbi maxDBs) +{ + struct Value { + weak_ptr wp; + unsigned int flags; + }; + + static std::map, Value> s_envs; + static std::mutex mut; + + struct stat statbuf; + if (stat(fname, &statbuf)) { + if (errno != ENOENT) + throw LMDBError("Unable to stat prospective mdb database: " + string(strerror(errno))); + else { + std::lock_guard l(mut); + auto fresh = std::make_shared(fname, flags, mode, maxDBs); + if (stat(fname, &statbuf)) + throw LMDBError("Unable to stat prospective mdb database: " + string(strerror(errno))); + auto key = std::tie(statbuf.st_dev, statbuf.st_ino); + s_envs[key] = { fresh, flags }; + return fresh; + } + } + + std::lock_guard l(mut); + auto 
key = std::tie(statbuf.st_dev, statbuf.st_ino); + auto iter = s_envs.find(key); + if (iter != s_envs.end()) { + auto sp = iter->second.wp.lock(); + if (sp) { + if (iter->second.flags != flags) + throw LMDBError("Can't open mdb with differing flags"); + + return sp; + } else { + s_envs.erase(iter); // useful if make_shared fails + } + } + + auto fresh = std::make_shared(fname, flags, mode, maxDBs); + s_envs[key] = { fresh, flags }; + + return fresh; +} + +MDBDbi MDBEnv::openDB(const string_view dbname, unsigned int flags) +{ + unsigned int envflags; + mdb_env_get_flags(d_env, &envflags); + /* + This function must not be called from multiple concurrent transactions in the same process. A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function. + */ + std::lock_guard l(d_openmut); + + if (!(envflags & MDB_RDONLY)) { + auto rwt = getRWTransaction(); + MDBDbi ret = rwt->openDB(dbname, flags); + rwt->commit(); + return ret; + } + + MDBDbi ret; + { + auto rwt = getROTransaction(); + ret = rwt->openDB(dbname, flags); + } + return ret; +} + +MDBRWTransactionImpl::MDBRWTransactionImpl(MDBEnv *parent, MDB_txn *txn) + : MDBROTransactionImpl(parent, txn) + +{ +} + +MDB_txn *MDBRWTransactionImpl::openRWTransaction(MDBEnv *env, MDB_txn *parent, unsigned int flags) +{ + MDB_txn *result; + if (env->getRWTX()) + throw LMDBError("Duplicate RW transaction"); + + for (int tries = 0; tries < 3; ++tries) { // it might happen twice, who knows + if (int rc = mdb_txn_begin(env->d_env, parent, flags, &result)) { + if (rc == MDB_MAP_RESIZED && tries < 2) { + // "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED. + // call mdb_env_set_mapsize with a size of zero to adopt the new size." 
+ mdb_env_set_mapsize(env->d_env, 0); + continue; + } + throw LMDBError("Unable to start RW transaction: ", rc); + } + break; + } + env->incRWTX(); + return result; +} + +MDBRWTransactionImpl::MDBRWTransactionImpl(MDBEnv *parent, unsigned int flags) + : MDBRWTransactionImpl(parent, openRWTransaction(parent, nullptr, flags)) +{ +} + +MDBRWTransactionImpl::~MDBRWTransactionImpl() +{ + MDBRWTransactionImpl::abort(); +} + +void MDBRWTransactionImpl::commit() +{ + closeRORWCursors(); + if (!d_txn) { + return; + } + + if (const auto rc = mdb_txn_commit(d_txn)) { + throw LMDBError("Committing transaction: ", rc); + } + environment().decRWTX(); + d_txn = nullptr; +} + +void MDBRWTransactionImpl::abort() +{ + closeRORWCursors(); + if (!d_txn) { + return; + } + + mdb_txn_abort(d_txn); + // prevent the RO destructor from cleaning up the transaction itself + environment().decRWTX(); + d_txn = nullptr; +} + +MDBROTransactionImpl::MDBROTransactionImpl(MDBEnv *parent, MDB_txn *txn) + : d_parent(parent) + , d_cursors() + , d_txn(txn) +{ +} + +MDB_txn *MDBROTransactionImpl::openROTransaction(MDBEnv *env, MDB_txn *parent, unsigned int flags) +{ + if (env->getRWTX()) + throw LMDBError("Duplicate RO transaction"); + + /* + A transaction and its cursors must only be used by a single thread, and a thread may only have a single transaction at a time. If MDB_NOTLS is in use, this does not apply to read-only transactions. */ + MDB_txn *result = nullptr; + for (int tries = 0; tries < 3; ++tries) { // it might happen twice, who knows + if (const auto rc = mdb_txn_begin(env->d_env, parent, MDB_RDONLY | flags, &result)) { + if (rc == MDB_MAP_RESIZED && tries < 2) { + // "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED. + // call mdb_env_set_mapsize with a size of zero to adopt the new size." 
+ mdb_env_set_mapsize(env->d_env, 0); + continue; + } + throw LMDBError("Unable to start RO transaction: ", rc); + } + break; + } + env->incROTX(); + + return result; +} + +void MDBROTransactionImpl::closeROCursors() +{ + // we need to move the vector away to ensure that the cursors don’t mess with our iteration. + std::vector buf; + std::swap(d_cursors, buf); + for (auto &cursor : buf) { + cursor->close(); + } +} + +MDBROTransactionImpl::MDBROTransactionImpl(MDBEnv *parent, unsigned int flags) + : MDBROTransactionImpl(parent, openROTransaction(parent, nullptr, flags)) +{ +} + +MDBROTransactionImpl::~MDBROTransactionImpl() +{ + // this is safe because C++ will not call overrides of virtual methods in destructors. + MDBROTransactionImpl::commit(); +} + +void MDBROTransactionImpl::abort() +{ + closeROCursors(); + // if d_txn is non-nullptr here, either the transaction object was invalidated earlier (e.g. by moving from it), or it is an RW transaction which has already cleaned up the d_txn pointer (with an abort). + if (d_txn) { + d_parent->decROTX(); + mdb_txn_abort(d_txn); // this appears to work better than abort for r/o database opening + d_txn = nullptr; + } +} + +void MDBROTransactionImpl::commit() +{ + closeROCursors(); + // if d_txn is non-nullptr here, either the transaction object was invalidated earlier (e.g. by moving from it), or it is an RW transaction which has already cleaned up the d_txn pointer (with an abort). 
+ if (d_txn) { + d_parent->decROTX(); + if (const auto rc = mdb_txn_commit(d_txn)) { // this appears to work better than abort for r/o database opening + throw LMDBError("Error committing transaction: ", rc); + } + d_txn = nullptr; + } +} + +void MDBRWTransactionImpl::clear(MDB_dbi dbi) +{ + if (const auto rc = mdb_drop(d_txn, dbi, 0)) { + throw LMDBError("Error clearing database: ", rc); + } +} + +MDBRWCursor MDBRWTransactionImpl::getRWCursor(const MDBDbi &dbi) +{ + MDB_cursor *cursor; + ; + if (const auto rc = mdb_cursor_open(d_txn, dbi, &cursor)) { + throw LMDBError("Error creating RO cursor: ", rc); + } + return MDBRWCursor(d_rw_cursors, cursor); +} + +MDBRWCursor MDBRWTransactionImpl::getCursor(const MDBDbi &dbi) +{ + return getRWCursor(dbi); +} + +MDBRWTransaction MDBRWTransactionImpl::getRWTransaction() +{ + MDB_txn *txn; + if (const auto rc = mdb_txn_begin(environment(), *this, 0, &txn)) { + throw LMDBError("Failed to start child transaction: ", rc); + } + // we need to increase the counter here because commit/abort on the child transaction will decrease it + environment().incRWTX(); + return MDBRWTransaction(new MDBRWTransactionImpl(&environment(), txn)); +} + +MDBROTransaction MDBRWTransactionImpl::getROTransaction() +{ + return getRWTransaction(); +} + +MDBROTransaction MDBEnv::getROTransaction() +{ + return MDBROTransaction(new MDBROTransactionImpl(this)); +} +MDBRWTransaction MDBEnv::getRWTransaction() +{ + return MDBRWTransaction(new MDBRWTransactionImpl(this)); +} + +void MDBRWTransactionImpl::closeRWCursors() +{ + decltype(d_rw_cursors) buf; + std::swap(d_rw_cursors, buf); + for (auto &cursor : buf) { + cursor->close(); + } +} + +MDBROCursor MDBROTransactionImpl::getCursor(const MDBDbi &dbi) +{ + return getROCursor(dbi); +} + +MDBROCursor MDBROTransactionImpl::getROCursor(const MDBDbi &dbi) +{ + MDB_cursor *cursor; + if (const auto rc = mdb_cursor_open(d_txn, dbi, &cursor)) { + throw LMDBError("Error creating RO cursor: ", rc); + } + return 
MDBROCursor(d_cursors, cursor); +} + +} // namespace LMDBSafe diff --git a/src/rgw/driver/posix/lmdb-safe.hh b/src/rgw/driver/posix/lmdb-safe.hh new file mode 100644 index 00000000000..807ad212b49 --- /dev/null +++ b/src/rgw/driver/posix/lmdb-safe.hh @@ -0,0 +1,638 @@ +/* +MIT License + +Copyright (c) 2018 bert hubert + +Permission is hereby granted, free of charge, to any person obtaining a copy +*/ +#pragma once + +#include "lmdb-safe-global.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/*! + * \brief The LMDBSafe namespace contains all classes/types contained by the lmdb-safe and + * lmdb-typed libraries. + * \remarks + * - Error strategy: Anything that "should never happen" turns into an exception. But things + * like "duplicate entry" or "no such key" are for you to deal with. + * - Thread safety: We are as safe as LMDB. You can talk to MDBEnv from as many threads as you + * want. + */ +namespace LMDBSafe { + +// apple compiler somehow has string_view even in c++11! +#ifdef __cpp_lib_string_view +using std::string_view; +#else +#include +#if BOOST_VERSION > 105400 +#include +using boost::string_view; +#else +#include +using string_view = boost::string_ref; +#endif +#endif + +/*! + * \brief The LMDBError class is thrown when an error happens. + */ +class LMDB_SAFE_EXPORT LMDBError : public std::runtime_error { +public: + explicit LMDBError(const std::string &error) noexcept + : std::runtime_error(error) + , ec(0) + { + } + + explicit LMDBError(const std::string &context, int error) noexcept + : std::runtime_error(context + mdb_strerror(error)) + , ec(error) + { + } + + const int ec; +}; + +/*! + * \brief The MDBDbi class is our only 'value type' object as 1) a dbi is actually an integer + * and 2) per LMDB documentation, we never close it. 
+ */ +class LMDB_SAFE_EXPORT MDBDbi { +public: + MDBDbi() + { + d_dbi = std::numeric_limits::max(); + } + explicit MDBDbi(MDB_env *env, MDB_txn *txn, string_view dbname, unsigned int flags); + + operator const MDB_dbi &() const + { + return d_dbi; + } + + MDB_dbi d_dbi; +}; + +class MDBRWTransactionImpl; +class MDBROTransactionImpl; +using MDBROTransaction = std::unique_ptr; +using MDBRWTransaction = std::unique_ptr; + +/*! + * \brief The MDBEnv class is a handle to an MDB environment. + */ +class LMDB_SAFE_EXPORT MDBEnv { +public: + MDBEnv(const char *fname, unsigned int flags, mdb_mode_t mode, MDB_dbi maxDBs = 10); + + /*! + * \brief Closes the MDB environment. + * \remarks Only a single thread may call this function. All transactions, databases, and cursors must already be closed + * before calling this function. + */ + ~MDBEnv() + { + mdb_env_close(d_env); + } + + MDBDbi openDB(const string_view dbname, unsigned int flags); + + MDBRWTransaction getRWTransaction(); + MDBROTransaction getROTransaction(); + + operator MDB_env *&() + { + return d_env; + } + MDB_env *d_env; + + int getRWTX(); + void incRWTX(); + void decRWTX(); + int getROTX(); + void incROTX(); + void decROTX(); + +private: + std::mutex d_openmut; + std::mutex d_countmutex; + std::map d_RWtransactionsOut; + std::map d_ROtransactionsOut; +}; + +/*! + * \brief Opens an MDB environment for the specified database file. + */ +LMDB_SAFE_EXPORT std::shared_ptr getMDBEnv(const char *fname, unsigned int flags, mdb_mode_t mode, MDB_dbi maxDBs = 128); + +/*! + * \brief The MDBOutVal struct is the handle to an MDB value used as output. 
+ */ +struct LMDB_SAFE_EXPORT MDBOutVal { + operator MDB_val &() + { + return d_mdbval; + } + + template ::value, T>::type * = nullptr> const T get() + { + T ret; + if (d_mdbval.mv_size != sizeof(T)) + throw LMDBError("MDB data has wrong length for type"); + + memcpy(&ret, d_mdbval.mv_data, sizeof(T)); + return ret; + } + + template ::value, T>::type * = nullptr> T get() const; + + template T get_struct() const + { + T ret; + if (d_mdbval.mv_size != sizeof(T)) + throw LMDBError("MDB data has wrong length for type"); + + memcpy(&ret, d_mdbval.mv_data, sizeof(T)); + return ret; + } + + template const T *get_struct_ptr() const + { + if (d_mdbval.mv_size != sizeof(T)) + throw LMDBError("MDB data has wrong length for type"); + + return reinterpret_cast(d_mdbval.mv_data); + } + + MDB_val d_mdbval; +}; + +template <> inline std::string MDBOutVal::get() const +{ + return std::string(static_cast(d_mdbval.mv_data), d_mdbval.mv_size); +} + +template <> inline string_view MDBOutVal::get() const +{ + return string_view(static_cast(d_mdbval.mv_data), d_mdbval.mv_size); +} + +/*! + * \brief The MDBInVal struct is the handle to an MDB value used as input. 
+ */ +class LMDB_SAFE_EXPORT MDBInVal { +public: + MDBInVal(const MDBOutVal &rhs) + { + d_mdbval = rhs.d_mdbval; + } + + template ::value, T>::type * = nullptr> MDBInVal(T i) + { + memcpy(&d_memory[0], &i, sizeof(i)); + d_mdbval.mv_size = sizeof(T); + d_mdbval.mv_data = d_memory; + ; + } + + MDBInVal(const char *s) + { + d_mdbval.mv_size = strlen(s); + d_mdbval.mv_data = static_cast(const_cast(s)); + } + + MDBInVal(string_view v) + { + d_mdbval.mv_size = v.size(); + d_mdbval.mv_data = static_cast(const_cast(v.data())); + } + + MDBInVal(const std::string &v) + { + d_mdbval.mv_size = v.size(); + d_mdbval.mv_data = static_cast(const_cast(v.data())); + } + + template static MDBInVal fromStruct(const T &t) + { + MDBInVal ret; + ret.d_mdbval.mv_size = sizeof(T); + ret.d_mdbval.mv_data = static_cast(&const_cast(t)); + return ret; + } + + operator MDB_val &() + { + return d_mdbval; + } + MDB_val d_mdbval; + +private: + MDBInVal() + { + } + char d_memory[sizeof(double)]; +}; + +class MDBROCursor; + +/*! + * \brief The MDBROTransactionImpl class wraps read operations. + */ +class LMDB_SAFE_EXPORT MDBROTransactionImpl { +protected: + MDBROTransactionImpl(MDBEnv *parent, MDB_txn *txn); + +private: + static MDB_txn *openROTransaction(MDBEnv *env, MDB_txn *parent, unsigned int flags = 0); + + MDBEnv *d_parent; + std::vector d_cursors; + +protected: + MDB_txn *d_txn; + + void closeROCursors(); + +public: + explicit MDBROTransactionImpl(MDBEnv *parent, unsigned int flags = 0); + + MDBROTransactionImpl(const MDBROTransactionImpl &src) = delete; + MDBROTransactionImpl &operator=(const MDBROTransactionImpl &src) = delete; + + // The move constructor/operator cannot be made safe due to Object Slicing with MDBRWTransaction. 
+ MDBROTransactionImpl(MDBROTransactionImpl &&rhs) = delete; + MDBROTransactionImpl &operator=(MDBROTransactionImpl &&rhs) = delete; + + virtual ~MDBROTransactionImpl(); + + virtual void abort(); + virtual void commit(); + + int get(MDB_dbi dbi, const MDBInVal &key, MDBOutVal &val) + { + if (!d_txn) + throw LMDBError("Attempt to use a closed RO transaction for get"); + + const auto rc = mdb_get(d_txn, dbi, const_cast(&key.d_mdbval), &val.d_mdbval); + if (rc && rc != MDB_NOTFOUND) + throw LMDBError("Getting data: ", rc); + + return rc; + } + + int get(MDB_dbi dbi, const MDBInVal &key, string_view &val) + { + MDBOutVal out; + int rc = get(dbi, key, out); + if (!rc) + val = out.get(); + return rc; + } + + // this is something you can do, readonly + MDBDbi openDB(string_view dbname, unsigned int flags) + { + return MDBDbi(d_parent->d_env, d_txn, dbname, flags); + } + + MDBROCursor getCursor(const MDBDbi &); + MDBROCursor getROCursor(const MDBDbi &); + + operator MDB_txn *() + { + return d_txn; + } + + inline operator bool() const + { + return d_txn; + } + + inline MDBEnv &environment() + { + return *d_parent; + } +}; + +/*! + * \brief The MDBGenCursor class represents a MDB_cursor handle. + * \remarks + * - A cursor in a read-only transaction must be closed explicitly, before or after its transaction ends. + * It can be reused with mdb_cursor_renew() before finally closing it. + * - "If the parent transaction commits, the cursor must not be used again." 
+ */ +template class MDBGenCursor { +private: + std::vector *d_registry; + MDB_cursor *d_cursor; + +public: + MDBGenCursor() + : d_registry(nullptr) + , d_cursor(nullptr) + { + } + + MDBGenCursor(std::vector ®istry, MDB_cursor *cursor) + : d_registry(®istry) + , d_cursor(cursor) + { + registry.emplace_back(static_cast(this)); + } + +private: + void move_from(MDBGenCursor *src) + { + if (!d_registry) { + return; + } + + auto iter = std::find(d_registry->begin(), d_registry->end(), src); + if (iter != d_registry->end()) { + *iter = static_cast(this); + } else { + d_registry->emplace_back(static_cast(this)); + } + } + +public: + MDBGenCursor(const MDBGenCursor &src) = delete; + + MDBGenCursor(MDBGenCursor &&src) noexcept + : d_registry(src.d_registry) + , d_cursor(src.d_cursor) + { + move_from(&src); + src.d_registry = nullptr; + src.d_cursor = nullptr; + } + + MDBGenCursor &operator=(const MDBGenCursor &src) = delete; + + MDBGenCursor &operator=(MDBGenCursor &&src) noexcept + { + d_registry = src.d_registry; + d_cursor = src.d_cursor; + move_from(&src); + src.d_registry = nullptr; + src.d_cursor = nullptr; + return *this; + } + + ~MDBGenCursor() + { + close(); + } + +public: + int get(MDBOutVal &key, MDBOutVal &data, MDB_cursor_op op) + { + const auto rc = mdb_cursor_get(d_cursor, &key.d_mdbval, &data.d_mdbval, op); + if (rc && rc != MDB_NOTFOUND) + throw LMDBError("Unable to get from cursor: ", rc); + return rc; + } + + int find(const MDBInVal &in, MDBOutVal &key, MDBOutVal &data) + { + key.d_mdbval = in.d_mdbval; + const auto rc = mdb_cursor_get(d_cursor, const_cast(&key.d_mdbval), &data.d_mdbval, MDB_SET); + if (rc && rc != MDB_NOTFOUND) + throw LMDBError("Unable to find from cursor: ", rc); + return rc; + } + + int lower_bound(const MDBInVal &in, MDBOutVal &key, MDBOutVal &data) + { + key.d_mdbval = in.d_mdbval; + + const auto rc = mdb_cursor_get(d_cursor, const_cast(&key.d_mdbval), &data.d_mdbval, MDB_SET_RANGE); + if (rc && rc != MDB_NOTFOUND) + throw 
LMDBError("Unable to lower_bound from cursor: ", rc); + return rc; + } + + int nextprev(MDBOutVal &key, MDBOutVal &data, MDB_cursor_op op) + { + const auto rc = mdb_cursor_get(d_cursor, &key.d_mdbval, &data.d_mdbval, op); + if (rc && rc != MDB_NOTFOUND) + throw LMDBError("Unable to prevnext from cursor: ", rc); + return rc; + } + + int next(MDBOutVal &key, MDBOutVal &data) + { + return nextprev(key, data, MDB_NEXT); + } + + int prev(MDBOutVal &key, MDBOutVal &data) + { + return nextprev(key, data, MDB_PREV); + } + + int currentlast(MDBOutVal &key, MDBOutVal &data, MDB_cursor_op op) + { + const auto rc = mdb_cursor_get(d_cursor, &key.d_mdbval, &data.d_mdbval, op); + if (rc && rc != MDB_NOTFOUND) + throw LMDBError("Unable to next from cursor: ", rc); + return rc; + } + + int current(MDBOutVal &key, MDBOutVal &data) + { + return currentlast(key, data, MDB_GET_CURRENT); + } + int last(MDBOutVal &key, MDBOutVal &data) + { + return currentlast(key, data, MDB_LAST); + } + int first(MDBOutVal &key, MDBOutVal &data) + { + return currentlast(key, data, MDB_FIRST); + } + + operator MDB_cursor *() + { + return d_cursor; + } + + operator bool() const + { + return d_cursor; + } + + void close() + { + if (d_registry) { + auto iter = std::find(d_registry->begin(), d_registry->end(), static_cast(this)); + if (iter != d_registry->end()) { + d_registry->erase(iter); + } + d_registry = nullptr; + } + if (d_cursor) { + mdb_cursor_close(d_cursor); + d_cursor = nullptr; + } + } +}; + +/*! + * \brief The MDBROCursor class represents a read-only cursor. + */ +class LMDB_SAFE_EXPORT MDBROCursor : public MDBGenCursor { +public: + MDBROCursor() = default; + using MDBGenCursor::MDBGenCursor; + MDBROCursor(const MDBROCursor &src) = delete; + MDBROCursor(MDBROCursor &&src) = default; + MDBROCursor &operator=(const MDBROCursor &src) = delete; + MDBROCursor &operator=(MDBROCursor &&src) = default; + ~MDBROCursor() = default; +}; + +class MDBRWCursor; + +/*! 
+ * \brief The MDBRWTransactionImpl class wraps write operations. + */ +class LMDB_SAFE_EXPORT MDBRWTransactionImpl : public MDBROTransactionImpl { +protected: + MDBRWTransactionImpl(MDBEnv *parent, MDB_txn *txn); + +private: + static MDB_txn *openRWTransaction(MDBEnv *env, MDB_txn *parent, unsigned int flags); + +private: + std::vector d_rw_cursors; + + void closeRWCursors(); + inline void closeRORWCursors() + { + closeROCursors(); + closeRWCursors(); + } + +public: + explicit MDBRWTransactionImpl(MDBEnv *parent, unsigned int flags = 0); + + MDBRWTransactionImpl(const MDBRWTransactionImpl &rhs) = delete; + MDBRWTransactionImpl(MDBRWTransactionImpl &&rhs) = delete; + MDBRWTransactionImpl &operator=(const MDBRWTransactionImpl &rhs) = delete; + MDBRWTransactionImpl &operator=(MDBRWTransactionImpl &&rhs) = delete; + + ~MDBRWTransactionImpl() override; + + void commit() override; + void abort() override; + + void clear(MDB_dbi dbi); + + void put(MDB_dbi dbi, const MDBInVal &key, const MDBInVal &val, unsigned int flags = 0) + { + if (!d_txn) + throw LMDBError("Attempt to use a closed RW transaction for put"); + if (const auto rc = mdb_put(d_txn, dbi, const_cast(&key.d_mdbval), const_cast(&val.d_mdbval), flags)) + throw LMDBError("Putting data: ", rc); + } + + int del(MDBDbi &dbi, const MDBInVal &key, const MDBInVal &val) + { + const auto rc = mdb_del(d_txn, dbi, const_cast(&key.d_mdbval), const_cast(&val.d_mdbval)); + if (rc && rc != MDB_NOTFOUND) + throw LMDBError("Deleting data: ", rc); + return rc; + } + + int del(MDBDbi &dbi, const MDBInVal &key) + { + const auto rc = mdb_del(d_txn, dbi, const_cast(&key.d_mdbval), 0); + if (rc && rc != MDB_NOTFOUND) + throw LMDBError("Deleting data: ", rc); + return rc; + } + + int get(MDBDbi &dbi, const MDBInVal &key, MDBOutVal &val) + { + if (!d_txn) + throw LMDBError("Attempt to use a closed RW transaction for get"); + + const auto rc = mdb_get(d_txn, dbi, const_cast(&key.d_mdbval), &val.d_mdbval); + if (rc && rc != MDB_NOTFOUND) 
+ throw LMDBError("Getting data: ", rc); + return rc; + } + + int get(MDBDbi &dbi, const MDBInVal &key, string_view &val) + { + MDBOutVal out; + const auto rc = get(dbi, key, out); + if (!rc) + val = out.get(); + return rc; + } + + MDBDbi openDB(string_view dbname, unsigned int flags) + { + return MDBDbi(environment().d_env, d_txn, dbname, flags); + } + + MDBRWCursor getRWCursor(const MDBDbi &); + MDBRWCursor getCursor(const MDBDbi &); + + MDBRWTransaction getRWTransaction(); + MDBROTransaction getROTransaction(); +}; + +/*! + * \brief The MDBRWCursor class implements RW operations based on MDBGenCursor. + * \remarks + * - "A cursor in a write-transaction can be closed before its transaction ends, and will otherwise + * be closed when its transaction ends." This is a problem for us since it may means we are closing + * the cursor twice, which is bad. + */ +class LMDB_SAFE_EXPORT MDBRWCursor : public MDBGenCursor { +public: + MDBRWCursor() = default; + using MDBGenCursor::MDBGenCursor; + MDBRWCursor(const MDBRWCursor &src) = delete; + MDBRWCursor(MDBRWCursor &&src) = default; + MDBRWCursor &operator=(const MDBRWCursor &src) = delete; + MDBRWCursor &operator=(MDBRWCursor &&src) = default; + ~MDBRWCursor() = default; + + void put(const MDBOutVal &key, const MDBInVal &data) + { + if (const auto rc = mdb_cursor_put(*this, const_cast(&key.d_mdbval), const_cast(&data.d_mdbval), MDB_CURRENT)) + throw LMDBError("Putting data via mdb_cursor_put: ", rc); + } + + void put(const MDBOutVal &key, const MDBOutVal &data, unsigned int flags = 0) + { + if (const auto rc = mdb_cursor_put(*this, const_cast(&key.d_mdbval), const_cast(&data.d_mdbval), flags)) + throw LMDBError("Putting data via mdb_cursor_put: ", rc); + } + + void del(unsigned int flags = 0) + { + if (const auto rc = mdb_cursor_del(*this, flags)) + throw LMDBError("Deleting data via mdb_cursor_del: ", rc); + } +}; + +} // namespace LMDBSafe diff --git a/src/rgw/driver/posix/notify.cpp b/src/rgw/driver/posix/notify.cpp 
new file mode 100644 index 00000000000..8030caa93d9 --- /dev/null +++ b/src/rgw/driver/posix/notify.cpp @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "notify.h" +#ifdef linux +#include +#endif + +namespace file::listing { + + std::unique_ptr Notify::factory(Notifiable* n, const std::string& bucket_root) + { +#ifdef __linux__ + return std::unique_ptr(new Inotify(n, bucket_root)); +#else +#error currently, rgw posix driver requires inotify +#endif /* linux */ + return nullptr; + } /* Notify::factory */ + +} // namespace file::listing diff --git a/src/rgw/driver/posix/notify.h b/src/rgw/driver/posix/notify.h new file mode 100644 index 00000000000..9f6088a893a --- /dev/null +++ b/src/rgw/driver/posix/notify.h @@ -0,0 +1,255 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "unordered_dense.h" +#include +#include +#ifdef __linux__ +#include +#include +#endif +#include + +namespace file::listing { + + namespace sf = std::filesystem; + + class Notifiable + { + public: + enum class EventType : uint8_t + { + ADD = 0, + REMOVE, + INVALIDATE + }; + + struct Event + { + EventType type; + std::optional name; + + Event(EventType type, std::optional name) noexcept + : type(type), name(name) + {} + + Event(Event&& rhs) noexcept + : type(rhs.type), name(rhs.name) + {} + }; + + virtual ~Notifiable() {}; + + virtual int notify(const std::string&, void*, const std::vector&) = 0; + }; + + class Notify + { + Notifiable* n; + sf::path rp; + + Notify(Notifiable* n, const std::string& bucket_root) + : n(n), rp(bucket_root) + {} + + friend class Inotify; + public: + static std::unique_ptr factory(Notifiable* n, const std::string& bucket_root); + + virtual int add_watch(const std::string& dname, void* opaque) 
= 0; + virtual int remove_watch(const std::string& dname) = 0; + virtual ~Notify() + {} + }; /* Notify */ + +#ifdef __linux__ + class Inotify : public Notify + { + static constexpr uint32_t rd_size = 8192; + static constexpr uint32_t aw_mask = IN_ALL_EVENTS & + ~(IN_MOVE_SELF|IN_OPEN|IN_ACCESS|IN_ATTRIB|IN_CLOSE_WRITE|IN_CLOSE_NOWRITE|IN_MODIFY|IN_DELETE_SELF); + + static constexpr uint64_t sig_shutdown = std::numeric_limits::max() - 0xdeadbeef; + + class WatchRecord + { + public: + int wd; + std::string name; + void* opaque; + public: + WatchRecord(int wd, const std::string& name, void* opaque) noexcept + : wd(wd), name(name), opaque(opaque) + {} + + WatchRecord(WatchRecord&& wr) noexcept + : wd(wr.wd), name(wr.name), opaque(wr.opaque) + {} + + WatchRecord& operator=(WatchRecord&& wr) { + wd = wr.wd; + name = std::move(wr.name); + opaque = wr.opaque; + return *this; + } + }; /* WatchRecord */ + + using wd_callback_map_t = ankerl::unordered_dense::map; + using wd_remove_map_t = ankerl::unordered_dense::map; + + int wfd, efd; + std::thread thrd; + wd_callback_map_t wd_callback_map; + wd_remove_map_t wd_remove_map; + bool shutdown{false}; + + class AlignedBuf + { + char* m; + public: + AlignedBuf() { + m = static_cast(aligned_alloc(__alignof__(struct inotify_event), rd_size)); + if (! m) [[unlikely]] { + std::cerr << fmt::format("{} buffer allocation failure", __func__) << std::endl; + abort(); + } + } + ~AlignedBuf() { + std::free(m); + } + char* get() { + return m; + } + }; /* AlignedBuf */ + + void ev_loop() { + std::unique_ptr up_buf = std::make_unique(); + struct inotify_event* event; + char* buf = up_buf.get()->get(); + ssize_t len; + int npoll; + + nfds_t nfds{2}; + struct pollfd fds[2] = {{wfd, POLLIN}, {efd, POLLIN}}; + + restart: + while(! 
shutdown) { + npoll = poll(fds, nfds, -1); /* for up to 10 fds, poll is fast as epoll */ + if (shutdown) { + return; + } + if (npoll == -1) { + if (errno == EINTR) { + continue; + } + // XXX + } + if (npoll > 0) { + len = read(wfd, buf, rd_size); + if (len == -1) { + continue; // hopefully, was EAGAIN + } + std::vector evec; + for (char* ptr = buf; ptr < buf + len; + ptr += sizeof(struct inotify_event) + event->len) { + event = reinterpret_cast(ptr); + const auto& it = wd_callback_map.find(event->wd); + //std::cout << fmt::format("event! {}", event->name) << std::endl; + if (it == wd_callback_map.end()) [[unlikely]] { + /* non-destructive race, it happens */ + continue; + } + const auto& wr = it->second; + if (event->mask & IN_Q_OVERFLOW) [[unlikely]] { + /* cache blown, invalidate */ + evec.clear(); + evec.emplace_back(Notifiable::Event(Notifiable::EventType::INVALIDATE, std::nullopt)); + n->notify(wr.name, wr.opaque, evec); + goto restart; + } else { + if ((event->mask & IN_CREATE) || + (event->mask & IN_MOVED_TO)) { + /* new object in dir */ + evec.emplace_back(Notifiable::Event(Notifiable::EventType::ADD, event->name)); + } else if ((event->mask & IN_DELETE) || + (event->mask & IN_MOVED_FROM)) { + /* object removed from dir */ + evec.emplace_back(Notifiable::Event(Notifiable::EventType::REMOVE, event->name)); + } + } /* !overflow */ + if (evec.size() > 0) { + n->notify(wr.name, wr.opaque, evec); + } + } /* events */ + } /* n > 0 */ + } + } /* ev_loop */ + + Inotify(Notifiable* n, const std::string& bucket_root) + : Notify(n, bucket_root), + thrd(&Inotify::ev_loop, this) + { + wfd = inotify_init1(IN_NONBLOCK); + if (wfd == -1) { + std::cerr << fmt::format("{} inotify_init1 failed with {}", __func__, wfd) << std::endl; + exit(1); + } + efd = eventfd(0, EFD_NONBLOCK); + } + + void signal_shutdown() { + uint64_t msg{sig_shutdown}; + (void) write(efd, &msg, sizeof(uint64_t)); + } + + friend class Notify; + public: + virtual int add_watch(const std::string& dname, 
void* opaque) override { + sf::path wp{rp / dname}; + int wd = inotify_add_watch(wfd, wp.c_str(), aw_mask); + if (wd == -1) { + std::cerr << fmt::format("{} inotify_add_watch {} failed with {}", __func__, dname, wd) << std::endl; + } else { + wd_callback_map.insert(wd_callback_map_t::value_type(wd, WatchRecord(wd, dname, opaque))); + wd_remove_map.insert(wd_remove_map_t::value_type(dname, wd)); + } + return wd; + } + + virtual int remove_watch(const std::string& dname) override { + int r{0}; + const auto& elt = wd_remove_map.find(dname); + if (elt != wd_remove_map.end()) { + auto& wd = elt->second; + r = inotify_rm_watch(wfd, wd); + if (r == -1) { + std::cerr << fmt::format("{} inotify_rm_watch {} failed with {}", __func__, dname, wd) << std::endl; + } + wd_callback_map.erase(wd); + wd_remove_map.erase(std::string(dname)); + } + return r; + } + + virtual ~Inotify() { + shutdown = true; + signal_shutdown(); + thrd.join(); + } + }; +#endif /* linux */ + +} // namespace file::listing diff --git a/src/rgw/driver/posix/rgw_sal_posix.cc b/src/rgw/driver/posix/rgw_sal_posix.cc new file mode 100644 index 00000000000..036d02cf58d --- /dev/null +++ b/src/rgw/driver/posix/rgw_sal_posix.cc @@ -0,0 +1,3096 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "rgw_sal_posix.h" +#include +#include +#include +#include +#include "rgw_multi.h" +#include "rgw_acl_s3.h" +#include "include/scope_guard.h" + +#define dout_subsys ceph_subsys_rgw +#define dout_context g_ceph_context + +namespace rgw { namespace sal { + +const int64_t READ_SIZE = 8 * 1024; +const std::string ATTR_PREFIX = "user.X-RGW-"; +#define RGW_POSIX_ATTR_BUCKET_INFO "POSIX-Bucket-Info" +#define RGW_POSIX_ATTR_MPUPLOAD "POSIX-Multipart-Upload" +#define RGW_POSIX_ATTR_OWNER "POSIX-Owner" +const std::string mp_ns = "multipart"; +const std::string MP_OBJ_PART_PFX = "part-"; +const std::string MP_OBJ_PART_FMT = "{:0>5}"; +const std::string MP_OBJ_HEAD_NAME = MP_OBJ_PART_PFX + "00000"; + +static int decode_policy(CephContext* cct, + bufferlist& bl, + RGWAccessControlPolicy* policy) +{ + auto iter = bl.cbegin(); + try { + policy->decode(iter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl; + return -EIO; + } + if (cct->_conf->subsys.should_gather()) { + ldout(cct, 15) << __func__ << " POSIX Read AccessControlPolicy"; + RGWAccessControlPolicy_S3* s3policy = static_cast(policy); + s3policy->to_xml(*_dout); + *_dout << dendl; + } + return 0; +} + +static int rgw_op_get_bucket_policy_from_attr(const DoutPrefixProvider* dpp, + POSIXDriver* driver, + User* user, + Attrs& bucket_attrs, + RGWAccessControlPolicy* policy, + optional_yield y) +{ + auto aiter = bucket_attrs.find(RGW_ATTR_ACL); + + if (aiter != bucket_attrs.end()) { + int ret = decode_policy(driver->ctx(), aiter->second, policy); + if (ret < 0) + return ret; + } else { + ldout(driver->ctx(), 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl; + /* object exists, but policy is broken */ + int r = user->load_user(dpp, y); + if (r < 0) + return r; + + policy->create_default(user->get_id(), user->get_display_name()); + } + return 0; +} + +static inline bool get_attr(Attrs& attrs, const char* 
name, bufferlist& bl) +{ + auto iter = attrs.find(name); + if (iter == attrs.end()) { + return false; + } + + bl = iter->second; + return true; +} + +static inline rgw_obj_key decode_obj_key(const char* fname) +{ + std::string dname, oname, ns; + dname = url_decode(fname); + rgw_obj_key::parse_index_key(dname, &oname, &ns); + rgw_obj_key key(oname, std::string(), ns); + return key; +} + +static inline ceph::real_time from_statx_timestamp(const struct statx_timestamp& xts) +{ + struct timespec ts{xts.tv_sec, xts.tv_nsec}; + return ceph::real_clock::from_timespec(ts); +} + +static inline void bucket_statx_save(struct statx& stx, RGWBucketEnt& ent, ceph::real_time& mtime) +{ + mtime = ceph::real_clock::from_time_t(stx.stx_mtime.tv_sec); + ent.creation_time = ceph::real_clock::from_time_t(stx.stx_btime.tv_sec); + // TODO Calculate size of bucket (or save it somewhere?) + //ent.size = stx.stx_size; + //ent.size_rounded = stx.stx_blocks * 512; +} + +static inline int copy_dir_fd(int old_fd) +{ + return openat(old_fd, ".", O_RDONLY | O_DIRECTORY | O_NOFOLLOW); +} + +static int get_x_attrs(optional_yield y, const DoutPrefixProvider* dpp, int fd, + Attrs& attrs, const std::string& display) +{ + char namebuf[64 * 1024]; // Max list size supported on linux + ssize_t buflen; + int ret; + + buflen = flistxattr(fd, namebuf, sizeof(namebuf)); + if (buflen < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not list attributes for " << display << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + char *keyptr = namebuf; + while (buflen > 0) { + std::string value; + ssize_t vallen, keylen; + char* vp; + + keylen = strlen(keyptr) + 1; + std::string key(keyptr); + std::string::size_type prefixloc = key.find(ATTR_PREFIX); + + if (prefixloc == std::string::npos) { + /* Not one of our attributes */ + buflen -= keylen; + keyptr += keylen; + continue; + } + + /* Make a key that has just the attribute name */ + key.erase(prefixloc, ATTR_PREFIX.length()); + + vallen = 
fgetxattr(fd, keyptr, nullptr, 0); + if (vallen < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not get attribute " << keyptr << " for " << display << ": " << cpp_strerror(ret) << dendl; + return -ret; + } else if (vallen == 0) { + /* No attribute value for this name */ + buflen -= keylen; + keyptr += keylen; + continue; + } + + value.reserve(vallen + 1); + vp = &value[0]; + + vallen = fgetxattr(fd, keyptr, vp, vallen); + if (vallen < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not get attribute " << keyptr << " for " << display << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + + bufferlist bl; + bl.append(vp, vallen); + attrs.emplace(std::move(key), std::move(bl)); /* key and bl are r-value refs */ + + buflen -= keylen; + keyptr += keylen; + } + + return 0; +} + +static int write_x_attr(const DoutPrefixProvider* dpp, optional_yield y, int fd, + const std::string& key, bufferlist& value, + const std::string& display) +{ + int ret; + std::string attrname; + + attrname = ATTR_PREFIX + key; + + ret = fsetxattr(fd, attrname.c_str(), value.c_str(), value.length(), 0); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not write attribute " << attrname << " for " << display << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + + return 0; +} + +static int delete_directory(int parent_fd, const char* dname, bool delete_children, + const DoutPrefixProvider* dpp) +{ + int ret; + int dir_fd = -1; + DIR *dir; + struct dirent *entry; + + dir_fd = openat(parent_fd, dname, O_RDONLY | O_DIRECTORY | O_NOFOLLOW); + if (dir_fd < 0) { + dir_fd = errno; + ldpp_dout(dpp, 0) << "ERROR: could not open subdir " << dname << ": " + << cpp_strerror(dir_fd) << dendl; + return -dir_fd; + } + + dir = fdopendir(dir_fd); + if (dir == NULL) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not open bucket " << dname + << " for listing: " << cpp_strerror(ret) << dendl; + return -ret; + } + + errno = 0; + while ((entry = readdir(dir)) != 
NULL) { + struct statx stx; + + if ((entry->d_name[0] == '.' && entry->d_name[1] == '\0') || + (entry->d_name[0] == '.' && entry->d_name[1] == '.' && + entry->d_name[2] == '\0')) { + /* Skip . and .. */ + errno = 0; + continue; + } + + std::string_view d_name = entry->d_name; + bool is_mp = d_name.starts_with("." + mp_ns); + if (!is_mp && !delete_children) { + return -ENOTEMPTY; + } + + ret = statx(dir_fd, entry->d_name, AT_SYMLINK_NOFOLLOW, STATX_ALL, &stx); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << entry->d_name + << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + + if (S_ISDIR(stx.stx_mode)) { + /* Recurse */ + ret = delete_directory(dir_fd, entry->d_name, true, dpp); + if (ret < 0) { + return ret; + } + + continue; + } + + /* Otherwise, unlink */ + ret = unlinkat(dir_fd, entry->d_name, 0); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not remove file " << entry->d_name + << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + } + + ret = unlinkat(parent_fd, dname, AT_REMOVEDIR); + if (ret < 0) { + ret = errno; + if (errno != ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: could not remove bucket " << dname << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + } + + return 0; +} + +int POSIXDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp) +{ + FilterDriver::initialize(cct, dpp); + + base_path = g_conf().get_val("rgw_posix_base_path"); + + ldpp_dout(dpp, 20) << "Initializing POSIX driver: " << base_path << dendl; + + /* ordered listing cache */ + bucket_cache.reset( + new BucketCache( + this, base_path, + g_conf().get_val("rgw_posix_database_root"), + g_conf().get_val("rgw_posix_cache_max_buckets"), + g_conf().get_val("rgw_posix_cache_lanes"), + g_conf().get_val("rgw_posix_cache_partitions"), + g_conf().get_val("rgw_posix_cache_lmdb_count"))); + + root_fd = openat(-1, base_path.c_str(), O_RDONLY | O_DIRECTORY | O_NOFOLLOW); + if (root_fd == -1) { + int err = 
errno; + if (err == ENOTDIR) { + ldpp_dout(dpp, 0) << " ERROR: base path (" << base_path + << "): was not a directory." << dendl; + return -err; + } else if (err == ENOENT) { + err = mkdir(base_path.c_str(), S_IRWXU); + if (err < 0) { + err = errno; + ldpp_dout(dpp, 0) << " ERROR: could not create base path (" + << base_path << "): " << cpp_strerror(err) << dendl; + return -err; + } + root_fd = ::open(base_path.c_str(), O_RDONLY | O_DIRECTORY | O_NOFOLLOW); + } + } + if (root_fd == -1) { + int err = errno; + ldpp_dout(dpp, 0) << " ERROR: could not open base path (" + << base_path << "): " << cpp_strerror(err) << dendl; + return -err; + } + + ldpp_dout(dpp, 20) << "SUCCESS" << dendl; + return 0; +} + +std::unique_ptr POSIXDriver::get_user(const rgw_user &u) +{ + std::unique_ptr user = next->get_user(u); + + return std::make_unique(std::move(user), this); +} + +int POSIXDriver::get_user_by_access_key(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y, std::unique_ptr* user) +{ + std::unique_ptr nu; + int ret; + + ret = next->get_user_by_access_key(dpp, key, y, &nu); + if (ret != 0) + return ret; + + User* u = new POSIXUser(std::move(nu), this); + user->reset(u); + return 0; +} + +int POSIXDriver::get_user_by_email(const DoutPrefixProvider* dpp, const std::string& email, optional_yield y, std::unique_ptr* user) +{ + std::unique_ptr nu; + int ret; + + ret = next->get_user_by_email(dpp, email, y, &nu); + if (ret != 0) + return ret; + + User* u = new POSIXUser(std::move(nu), this); + user->reset(u); + return 0; +} + +int POSIXDriver::get_user_by_swift(const DoutPrefixProvider* dpp, const std::string& user_str, optional_yield y, std::unique_ptr* user) +{ + std::unique_ptr nu; + int ret; + + ret = next->get_user_by_swift(dpp, user_str, y, &nu); + if (ret != 0) + return ret; + + User* u = new POSIXUser(std::move(nu), this); + user->reset(u); + return 0; +} + +std::unique_ptr POSIXDriver::get_object(const rgw_obj_key& k) +{ + return 
std::make_unique(this, k); +} + +int POSIXDriver::get_bucket(const DoutPrefixProvider* dpp, User* u, const rgw_bucket& b, std::unique_ptr* bucket, optional_yield y) +{ + int ret; + Bucket* bp; + + bp = new POSIXBucket(this, root_fd, b, u); + ret = bp->load_bucket(dpp, y); + if (ret < 0) { + delete bp; + return ret; + } + + bucket->reset(bp); + return 0; +} + +int POSIXDriver::get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr* bucket) +{ + Bucket* bp; + + bp = new POSIXBucket(this, root_fd, i, u); + /* Don't need to fetch the bucket info, use the provided one */ + + bucket->reset(bp); + return 0; +} + +int POSIXDriver::get_bucket(const DoutPrefixProvider* dpp, User* u, const std::string& tenant, const std::string& name, std::unique_ptr* bucket, optional_yield y) +{ + rgw_bucket b; + + b.tenant = tenant; + b.name = name; + + return get_bucket(dpp, u, b, bucket, y); +} + +std::string POSIXDriver::zone_unique_trans_id(const uint64_t unique_num) +{ + char buf[41]; /* 2 + 21 + 1 + 16 (timestamp can consume up to 16) + 1 */ + time_t timestamp = time(NULL); + + snprintf(buf, sizeof(buf), "tx%021llx-%010llx", + (unsigned long long)unique_num, + (unsigned long long)timestamp); + + return std::string(buf); +} +std::unique_ptr POSIXDriver::get_append_writer(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* _head_obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + const std::string& unique_tag, + uint64_t position, + uint64_t *cur_accounted_size) +{ + std::unique_ptr writer = next->get_append_writer(dpp, y, _head_obj, + owner, ptail_placement_rule, + unique_tag, position, + cur_accounted_size); + + return std::make_unique(std::move(writer), std::move(_head_obj)); +} + +std::unique_ptr POSIXDriver::get_atomic_writer(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* _head_obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + uint64_t olh_epoch, + const std::string& 
unique_tag) +{ + + return std::make_unique(dpp, y, _head_obj, this, owner, ptail_placement_rule, olh_epoch, unique_tag); +} + +void POSIXDriver::finalize(void) +{ + next->finalize(); +} + +void POSIXDriver::register_admin_apis(RGWRESTMgr* mgr) +{ + return next->register_admin_apis(mgr); +} + +std::unique_ptr POSIXDriver::get_notification(rgw::sal::Object* obj, + rgw::sal::Object* src_obj, struct req_state* s, + rgw::notify::EventType event_type, optional_yield y, + const std::string* object_name) +{ + return next->get_notification(obj, src_obj, s, event_type, y, object_name); +} + +std::unique_ptr POSIXDriver::get_notification(const DoutPrefixProvider* dpp, + rgw::sal::Object* obj, rgw::sal::Object* src_obj, + rgw::notify::EventType event_type, + rgw::sal::Bucket* _bucket, + std::string& _user_id, std::string& _user_tenant, + std::string& _req_id, optional_yield y) +{ + return next->get_notification(dpp, obj, src_obj, event_type, _bucket, _user_id, _user_tenant, _req_id, y); +} + +int POSIXDriver::close() +{ + if (root_fd < 0) { + return 0; + } + + ::close(root_fd); + root_fd = -1; + + return 0; +} + +int POSIXUser::list_buckets(const DoutPrefixProvider* dpp, const std::string& marker, + const std::string& end_marker, uint64_t max, + bool need_stats, BucketList &buckets, optional_yield y) +{ + DIR* dir; + struct dirent* entry; + int dfd; + int ret; + + buckets.clear(); + + /* it's not sufficient to dup(root_fd), as as the new fd would share + * the file position of root_fd */ + dfd = copy_dir_fd(driver->get_root_fd()); + if (dfd == -1) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not open root to list buckets: " + << cpp_strerror(ret) << dendl; + return -errno; + } + + dir = fdopendir(dfd); + if (dir == NULL) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not open root to list buckets: " + << cpp_strerror(ret) << dendl; + close(dfd); + return -ret; + } + + auto cleanup_guard = make_scope_guard( + [&dir] + { + closedir(dir); + // dfd is also closed + 
} + ); + + errno = 0; + while ((entry = readdir(dir)) != NULL) { + struct statx stx; + + ret = statx(driver->get_root_fd(), entry->d_name, AT_SYMLINK_NOFOLLOW, STATX_ALL, &stx); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << entry->d_name << ": " + << cpp_strerror(ret) << dendl; + buckets.clear(); + return -ret; + } + + if (!S_ISDIR(stx.stx_mode)) { + /* Not a bucket, skip it */ + errno = 0; + continue; + } + if (entry->d_name[0] == '.') { + /* Skip dotfiles */ + errno = 0; + continue; + } + + /* TODO Use stat_to_ent */ + //RGWBucketEnt ent; + //ent.bucket.name = decode_name(entry->d_name); + //bucket_statx_save(stx, ent, mtime); + RGWBucketInfo info; + info.bucket.name = url_decode(entry->d_name); + info.owner.id = std::to_string(stx.stx_uid); // TODO convert to owner name + info.creation_time = from_statx_timestamp(stx.stx_btime); + + std::unique_ptr bucket; + ret = driver->get_bucket(this, info, &bucket); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not get bucket " << info.bucket << ": " + << cpp_strerror(ret) << dendl; + buckets.clear(); + return -ret; + } + + buckets.add(std::move(bucket)); + + errno = 0; + } + ret = errno; + if (ret != 0) { + ldpp_dout(dpp, 0) << "ERROR: could not list buckets for " << get_display_name() << ": " + << cpp_strerror(ret) << dendl; + buckets.clear(); + return -ret; + } + + return 0; +} + +int POSIXUser::create_bucket(const DoutPrefixProvider* dpp, + const rgw_bucket& b, + const std::string& zonegroup_id, + rgw_placement_rule& placement_rule, + std::string& swift_ver_location, + const RGWQuotaInfo * pquota_info, + const RGWAccessControlPolicy& policy, + Attrs& attrs, + RGWBucketInfo& binfo, + obj_version& ep_objv, + bool exclusive, + bool obj_lock_enabled, + bool* existed, + req_info& req_info, + std::unique_ptr* bucket_out, + optional_yield y) +{ + /* Check for existence */ + { + std::unique_ptr bucket; + + int ret = driver->get_bucket(dpp, this, b, &bucket, y); + if (ret >= 0) 
{ + *existed = true; + // Bucket exists. Check owner comparison + if (bucket->get_info().owner.compare(this->get_id()) != 0) { + return -EEXIST; + } + // Don't allow changes to ACL policy + RGWAccessControlPolicy old_policy(driver->ctx()); + ret = rgw_op_get_bucket_policy_from_attr( + dpp, driver, this, bucket->get_attrs(), &old_policy, y); + if (ret >= 0 && old_policy != policy) { + bucket_out->swap(bucket); + return -EEXIST; + } + } else { + *existed = false; + } + } + + binfo.bucket = b; + binfo.owner = get_id(); + binfo.zonegroup = zonegroup_id; + binfo.placement_rule = placement_rule; + binfo.swift_ver_location = swift_ver_location; + binfo.swift_versioning = (!swift_ver_location.empty()); + binfo.requester_pays = false; + binfo.creation_time = ceph::real_clock::now(); + if (pquota_info) { + binfo.quota = *pquota_info; + } + + POSIXBucket* fb = new POSIXBucket(driver, driver->get_root_fd(), binfo, this); + + int ret = fb->set_attrs(attrs); + if (ret < 0) { + delete fb; + return ret; + } + + ret = fb->create(dpp, y, existed); + if (ret < 0) { + delete fb; + return ret; + } + + bucket_out->reset(fb); + return 0; +} + +int POSIXUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y) +{ + return next->read_attrs(dpp, y); +} + +int POSIXUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, + Attrs& new_attrs, optional_yield y) +{ + return next->merge_and_store_attrs(dpp, new_attrs, y); +} + +int POSIXUser::load_user(const DoutPrefixProvider* dpp, optional_yield y) +{ + return next->load_user(dpp, y); +} + +int POSIXUser::store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info) +{ + return next->store_user(dpp, y, exclusive, old_info); +} + +int POSIXUser::remove_user(const DoutPrefixProvider* dpp, optional_yield y) +{ + return next->remove_user(dpp, y); +} + +std::unique_ptr POSIXBucket::get_object(const rgw_obj_key& k) +{ + return std::make_unique(driver, k, this); +} + +int POSIXObject::fill_bde(const 
DoutPrefixProvider *dpp, optional_yield y, rgw_bucket_dir_entry& bde) +{ + std::unique_ptr owner; + (void)get_owner(dpp, y, &owner); + + get_key().get_index_key(&bde.key); + bde.ver.pool = 1; + bde.ver.epoch = 1; + bde.exists = true; + bde.meta.category = RGWObjCategory::Main; + bde.meta.size = get_obj_size(); + bde.meta.mtime = get_mtime(); + if (owner) { + bde.meta.owner = owner->get_id().to_str(); + bde.meta.owner_display_name = owner->get_display_name(); + } else { + bde.meta.owner = "unknown"; + bde.meta.owner_display_name = "unknown"; + } + bde.meta.accounted_size = get_obj_size(); + bde.meta.storage_class = RGW_STORAGE_CLASS_STANDARD; + bde.meta.appendable = true; + bufferlist etag_bl; + if (rgw::sal::get_attr(get_attrs(), RGW_ATTR_ETAG, etag_bl)) { + bde.meta.etag = etag_bl.to_str(); + } + + return 0; +} + +int POSIXDriver::mint_listing_entry(const std::string &bname, + rgw_bucket_dir_entry &bde) { + std::unique_ptr b; + std::unique_ptr obj; + POSIXObject *pobj; + int ret; + + ret = get_bucket(nullptr, nullptr, std::string(), bname, &b, null_yield); + if (ret < 0) + return ret; + + obj = b->get_object(decode_obj_key(bde.key.name.c_str())); + pobj = static_cast(obj.get()); + + if (!pobj->exists(nullptr)) { + ret = errno; + return -ret; + } + + ret = pobj->get_obj_attrs(null_yield, nullptr); + if (ret < 0) + return ret; + + ret = pobj->fill_bde(nullptr, null_yield, bde); + if (ret < 0) + return ret; + + return 0; +} +int POSIXBucket::fill_cache(const DoutPrefixProvider* dpp, optional_yield y, + fill_cache_cb_t cb) +{ + int ret = for_each(dpp, [this, &cb, &dpp, &y](const char* name) { + int ret; + std::unique_ptr obj; + POSIXObject* pobj; + + if (name[0] == '.') { + /* Skip dotfiles */ + return 0; + } + + obj = get_object(decode_obj_key(name)); + pobj = static_cast(obj.get()); + + if (!pobj->exists(dpp)) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << name << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + ret = 
pobj->get_obj_attrs(y, dpp); + if (ret < 0) + return ret; + + rgw_bucket_dir_entry bde{}; + ret = pobj->fill_bde(dpp, y, bde); + if (ret < 0) + return ret; + + cb(dpp, bde); + + return 0; + }); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not list bucket " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return ret; + } + + return 0; +} + +// TODO marker and other params +int POSIXBucket::list(const DoutPrefixProvider* dpp, ListParams& params, + int max, ListResults& results, optional_yield y) +{ + int count{0}; + bool in_prefix{false}; + // Names in the cache are in OID format + { + rgw_obj_key key(params.marker); + params.marker = key.get_oid(); + key.set(params.prefix); + params.prefix = key.get_oid(); + } + // Names are url_encoded, so encode prefix and delimiter + // Names seem to not be url_encoded in cache + //params.prefix = url_encode(params.prefix); + //params.delim = url_encode(params.delim); + if (max <= 0) { + return 0; + } + + int ret = driver->get_bucket_cache()->list_bucket( + dpp, y, this, params.marker.name, [&](const rgw_bucket_dir_entry& bde) -> bool + { + std::string ns; + // bde.key can be encoded with the namespace. 
Decode it here + if (!params.marker.empty() && params.marker == bde.key.name) { + // Skip marker + return true; + } + if (!params.prefix.empty()) { + // We have a prefix, only match + if (!bde.key.name.starts_with(params.prefix)) { + // Prefix doesn't match; skip + if (in_prefix) { + return false; + } + return true; + } + // Prefix matches + if (params.delim.empty()) { + // No delimiter, add matches + results.next_marker.set(bde.key); + results.objs.push_back(bde); + count++; + if (count >= max) { + results.is_truncated = true; + return false; + } + return true; + } + auto delim_pos = bde.key.name.find(params.delim, params.prefix.size()); + if (delim_pos == std::string_view::npos) { + // Straight prefix match + results.next_marker.set(bde.key); + results.objs.push_back(bde); + count++; + if (count >= max) { + results.is_truncated = true; + return false; + } + return true; + } + std::string prefix_key = + bde.key.name.substr(0, delim_pos + params.delim.length()); + rgw_obj_key::parse_raw_oid(prefix_key, &results.next_marker); + // Use results.next_marker.name for prefix_key, since it's been decoded + if (!results.common_prefixes.contains(results.next_marker.name)) { + results.common_prefixes[results.next_marker.name] = true; + count++; // Count will be checked when we exit prefix + if (in_prefix) { + // We've hit the next prefix entry. 
Check count + if (count >= max) { + results.is_truncated = true; + // Time to stop + return false; + } + } + } + in_prefix = true; + return true; + } + if (!params.delim.empty()) { + // Delimiter, but no prefix + auto delim_pos = bde.key.name.find(params.delim) ; + if (delim_pos == std::string_view::npos) { + // Delimiter doesn't match, insert + results.next_marker.set(bde.key); + results.objs.push_back(bde); + count++; + if (count >= max) { + results.is_truncated = true; + return false; + } + return true; + } + std::string prefix_key = + bde.key.name.substr(0, delim_pos + params.delim.length()); + if (!params.marker.empty() && params.marker == prefix_key) { + // Skip marker + return true; + } + std::string decoded_key; + rgw_obj_key::parse_index_key(prefix_key, &decoded_key, &ns); + if (!results.common_prefixes.contains(decoded_key)) { + if (in_prefix) { + // New prefix, check the count + count++; + if (count >= max) { + results.is_truncated = true; + return false; + } + } + in_prefix = true; + results.common_prefixes[decoded_key] = true; + // Fallthrough + } + results.next_marker.name = decoded_key; + return true; + } + + results.next_marker.set(bde.key); + results.objs.push_back(bde); + count++; + if (count >= max) { + results.is_truncated = true; + return false; + } + return true; + }); + + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not list bucket " << get_name() << ": " + << cpp_strerror(ret) << dendl; + results.objs.clear(); + return ret; + } + + return 0; +} + +int POSIXBucket::merge_and_store_attrs(const DoutPrefixProvider* dpp, + Attrs& new_attrs, optional_yield y) +{ + for (auto& it : new_attrs) { + attrs[it.first] = it.second; + } + + return write_attrs(dpp, y); +} + +int POSIXBucket::remove_bucket(const DoutPrefixProvider* dpp, + bool delete_children, + bool forward_to_master, + req_info* req_info, + optional_yield y) +{ + return delete_directory(parent_fd, get_fname().c_str(), + delete_children, dpp); +} + +int 
POSIXBucket::remove_bucket_bypass_gc(int concurrent_max, + bool keep_index_consistent, + optional_yield y, + const DoutPrefixProvider *dpp) +{ + return remove_bucket(dpp, true, false, nullptr, y); +} + +int POSIXBucket::load_bucket(const DoutPrefixProvider* dpp, optional_yield y, + bool get_stats) +{ + int ret; + + if (get_name()[0] == '.') { + /* Skip dotfiles */ + return -ERR_INVALID_OBJECT_NAME; + } + ret = stat(dpp); + if (ret < 0) { + return ret; + } + + bucket_statx_save(stx, ent, mtime); + info.creation_time = ent.creation_time; + + if (owner) { + info.owner = owner->get_id(); + } + + ret = open(dpp); + if (ret < 0) { + return ret; + } + get_x_attrs(y, dpp, dir_fd, attrs, get_name()); + + bufferlist bl; + if (get_attr(attrs, RGW_POSIX_ATTR_BUCKET_INFO, bl)) { + // Proper bucket with saved info + try { + auto bufit = bl.cbegin(); + decode(info, bufit); + } catch (buffer::error &err) { + ldout(driver->ctx(), 0) << "ERROR: " << __func__ << ": failed to decode " RGW_POSIX_ATTR_BUCKET_INFO " attr" << dendl; + return -EINVAL; + } + // info isn't stored in attrs + attrs.erase(RGW_POSIX_ATTR_BUCKET_INFO); + } else { + // TODO dang: fake info up (UID to owner conversion?) 
+ } + + info.creation_time = ent.creation_time; + + return 0; +} + +int POSIXBucket::set_acl(const DoutPrefixProvider* dpp, + RGWAccessControlPolicy& acl, + optional_yield y) +{ + bufferlist aclbl; + + acls = acl; + acl.encode(aclbl); + + attrs[RGW_ATTR_ACL] = aclbl; + info.owner = acl.get_owner().get_id(); + + return write_attrs(dpp, y); +} + +int POSIXBucket::read_stats(const DoutPrefixProvider *dpp, + const bucket_index_layout_generation& idx_layout, + int shard_id, std::string* bucket_ver, std::string* master_ver, + std::map& stats, + std::string* max_marker, bool* syncstopped) +{ + return 0; +} + +int POSIXBucket::read_stats_async(const DoutPrefixProvider *dpp, + const bucket_index_layout_generation& idx_layout, + int shard_id, RGWGetBucketStats_CB* ctx) +{ + return 0; +} + +int POSIXBucket::sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) +{ + return 0; +} + +int POSIXBucket::update_container_stats(const DoutPrefixProvider* dpp, optional_yield y) +{ + /* Force re-stat */ + stat_done = false; + int ret = stat(dpp); + if (ret < 0) { + return ret; + } + + bucket_statx_save(stx, ent, mtime); + info.creation_time = ent.creation_time; + ent.count = 0; + ent.size = 0; + + // TODO dang: store size/count in attributes + ret = for_each(dpp, [this, &dpp](const char* name) { + int ret; + struct statx lstx; + + if (name[0] == '.') { + /* Skip dotfiles */ + return 0; + } + + ret = statx(dir_fd, name, AT_SYMLINK_NOFOLLOW, STATX_ALL, &lstx); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << name << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + if (S_ISREG(lstx.stx_mode) || S_ISDIR(lstx.stx_mode)) { + ent.count++; + ent.size += lstx.stx_size; + } + + return 0; + }); + + return 0; +} + +int POSIXBucket::check_bucket_shards(const DoutPrefixProvider* dpp, optional_yield y) +{ + return 0; +} + +int POSIXBucket::chown(const DoutPrefixProvider* dpp, User& new_user, optional_yield y) +{ + /* TODO map user to 
UID/GID, and change it */ + return 0; +} + +int POSIXBucket::put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time _mtime, optional_yield y) +{ + mtime = _mtime; + + struct timespec ts[2]; + ts[0].tv_nsec = UTIME_OMIT; + ts[1] = ceph::real_clock::to_timespec(mtime); + int ret = utimensat(parent_fd, get_fname().c_str(), ts, AT_SYMLINK_NOFOLLOW); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not set mtime on bucket " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + return write_attrs(dpp, y); +} + +int POSIXBucket::write_attrs(const DoutPrefixProvider* dpp, optional_yield y) +{ + int ret = open(dpp); + if (ret < 0) { + return ret; + } + + // Bucket info is stored as an attribute, but on in attrs[] + bufferlist bl; + encode(info, bl); + ret = write_x_attr(dpp, y, dir_fd, RGW_POSIX_ATTR_BUCKET_INFO, bl, get_name()); + if (ret < 0) { + return ret; + } + + for (auto& it : attrs) { + ret = write_x_attr(dpp, y, dir_fd, it.first, it.second, get_name()); + if (ret < 0) { + return ret; + } + } + return 0; +} + +int POSIXBucket::check_empty(const DoutPrefixProvider* dpp, optional_yield y) +{ + DIR* dir; + struct dirent* entry; + int ret; + + ret = open(dpp); + if (ret < 0) { + return ret; + } + + dir = fdopendir(dir_fd); + if (dir == NULL) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not open bucket " << get_name() << " for listing: " + << cpp_strerror(ret) << dendl; + return -ret; + } + + errno = 0; + while ((entry = readdir(dir)) != NULL) { + if (entry->d_name[0] != '.') { + return -ENOTEMPTY; + } + if (entry->d_name[1] == '.' 
|| entry->d_name[1] == '\0') { + continue; + } + } + return 0; +} + +int POSIXBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size, + optional_yield y, bool check_size_only) +{ + return 0; +} + +int POSIXBucket::try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y) +{ + int ret = update_container_stats(dpp, y); + if (ret < 0) { + return ret; + } + + *pmtime = mtime; + + ret = open(dpp); + if (ret < 0) { + return ret; + } + get_x_attrs(y, dpp, dir_fd, attrs, get_name()); + + return 0; +} + +int POSIXBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, + uint64_t end_epoch, uint32_t max_entries, + bool* is_truncated, RGWUsageIter& usage_iter, + std::map& usage) +{ + return 0; +} + +int POSIXBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) +{ + return 0; +} + +int POSIXBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list& objs_to_unlink) +{ + return 0; +} + +int POSIXBucket::check_index(const DoutPrefixProvider *dpp, std::map& existing_stats, std::map& calculated_stats) +{ + return 0; +} + +int POSIXBucket::rebuild_index(const DoutPrefixProvider *dpp) +{ + return 0; +} + +int POSIXBucket::set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout) +{ + return 0; +} + +int POSIXBucket::purge_instance(const DoutPrefixProvider* dpp, optional_yield y) +{ + return 0; +} + +std::unique_ptr POSIXBucket::get_multipart_upload( + const std::string& oid, + std::optional upload_id, + ACLOwner owner, ceph::real_time mtime) +{ + return std::make_unique(driver, this, oid, upload_id, owner, mtime); +} + +int POSIXBucket::list_multiparts(const DoutPrefixProvider *dpp, + const std::string& prefix, + std::string& marker, + const std::string& delim, + const int& max_uploads, + std::vector>& uploads, + std::map *common_prefixes, + bool *is_truncated, optional_yield y) +{ + //std::vector> nup; + //int ret; +// + //ret 
= next->list_multiparts(dpp, prefix, marker, delim, max_uploads, nup, + //common_prefixes, is_truncated); + //if (ret < 0) + //return ret; +// + //for (auto& ent : nup) { + //uploads.emplace_back(std::make_unique(std::move(ent), this, driver)); + //} + + return 0; +} + +int POSIXBucket::abort_multiparts(const DoutPrefixProvider* dpp, CephContext* cct, optional_yield y) +{ + return 0; +} + +int POSIXBucket::create(const DoutPrefixProvider* dpp, optional_yield y, bool* existed) +{ + int ret = mkdirat(parent_fd, get_fname().c_str(), S_IRWXU); + if (ret < 0) { + ret = errno; + if (ret != EEXIST) { + if (dpp) + ldpp_dout(dpp, 0) << "ERROR: could not create bucket " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } else if (existed != nullptr) { + *existed = true; + } + return ret; + } + + return write_attrs(dpp, y); +} + +std::string POSIXBucket::get_fname() +{ + std::string name; + + if (ns) + name = "." + *ns + "_" + url_encode(get_name(), true); + else + name = url_encode(get_name(), true); + + return name; +} + +int POSIXBucket::get_shadow_bucket(const DoutPrefixProvider* dpp, optional_yield y, + const std::string& ns, + const std::string& tenant, const std::string& name, + bool create, std::unique_ptr* shadow) +{ + std::optional ons{std::nullopt}; + int ret; + POSIXBucket* bp; + rgw_bucket b; + + b.tenant = tenant; + b.name = name; + + if (!ns.empty()) { + ons = ns; + } + + open(dpp); + + bp = new POSIXBucket(driver, dir_fd, b, owner, ons); + ret = bp->load_bucket(dpp, y); + if (ret == -ENOENT && create) { + /* Create it if it doesn't exist */ + ret = bp->create(dpp, y, nullptr); + } + if (ret < 0) { + delete bp; + return ret; + } + + shadow->reset(bp); + return 0; +} + +template +int POSIXBucket::for_each(const DoutPrefixProvider* dpp, const F& func) +{ + DIR* dir; + struct dirent* entry; + int ret; + + ret = open(dpp); + if (ret < 0) { + return ret; + } + + dir = fdopendir(dir_fd); + if (dir == NULL) { + ret = errno; + ldpp_dout(dpp, 0) << 
"ERROR: could not open bucket " << get_name() << " for listing: " + << cpp_strerror(ret) << dendl; + return -ret; + } + + rewinddir(dir); + + while ((entry = readdir(dir)) != NULL) { + int r = func(entry->d_name); + if (r < 0) { + ret = r; + } + } + + if (ret == -EAGAIN) { + /* Limit reached */ + ret = 0; + } + return ret; +} + +int POSIXBucket::open(const DoutPrefixProvider* dpp) +{ + if (dir_fd >= 0) { + return 0; + } + + int ret = openat(parent_fd, get_fname().c_str(), + O_RDONLY | O_DIRECTORY | O_NOFOLLOW); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not open bucket " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + dir_fd = ret; + + return 0; +} + +// This is for renaming a shadow bucket to a MP object. It won't work work for a normal bucket +int POSIXBucket::rename(const DoutPrefixProvider* dpp, optional_yield y, Object* target_obj) +{ + POSIXObject *to = static_cast(target_obj); + POSIXBucket *tb = static_cast(target_obj->get_bucket()); + std::string src_fname = get_fname(); + std::string dst_fname = to->get_fname(); + int flags = 0; + + if (to->exists(dpp)) { + flags = RENAME_EXCHANGE; + } + // swap + int ret = renameat2(tb->get_dir_fd(dpp), src_fname.c_str(), tb->get_dir_fd(dpp), dst_fname.c_str(), flags); + if(ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: renameat2 for shadow object could not finish: " + << cpp_strerror(ret) << dendl; + return -ret; + } + + // Update saved bucket info + info.bucket.name = to->get_name(); + bufferlist bl; + encode(info, bl); + ret = write_x_attr(dpp, y, dir_fd, RGW_POSIX_ATTR_BUCKET_INFO, bl, get_name()); + if (ret < 0) { + return ret; + } + + // Delete old one (could be file or directory) + struct statx stx; + ret = statx(parent_fd, src_fname.c_str(), AT_SYMLINK_NOFOLLOW, + STATX_ALL, &stx); + if (ret < 0) { + ret = errno; + if (ret == ENOENT) { + return 0; + } + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << get_name() << ": " + << 
cpp_strerror(ret) << dendl; + return -ret; + } + + if (S_ISREG(stx.stx_mode)) { + ret = unlinkat(parent_fd, src_fname.c_str(), 0); + } else if (S_ISDIR(stx.stx_mode)) { + ret = delete_directory(parent_fd, src_fname.c_str(), true, dpp); + } + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not remove old file " << get_name() + << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + + return 0; +} + +int POSIXBucket::close() +{ + if (dir_fd < 0) { + return 0; + } + + ::close(dir_fd); + dir_fd = -1; + + return 0; +} + +int POSIXBucket::stat(const DoutPrefixProvider* dpp) +{ + if (stat_done) { + return 0; + } + + int ret = statx(parent_fd, get_fname().c_str(), AT_SYMLINK_NOFOLLOW, + STATX_ALL, &stx); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat bucket " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + if (!S_ISDIR(stx.stx_mode)) { + /* Not a bucket */ + return -EINVAL; + } + + stat_done = true; + return 0; +} + +/* This is a shadow bucket. 
Copy it into a new shadow bucket in the destination + * bucket */ +int POSIXBucket::copy(const DoutPrefixProvider *dpp, optional_yield y, + POSIXBucket* db, POSIXObject* dest) +{ + std::unique_ptr dsb; + + // Delete the target, in case it's not a multipart + int ret = dest->delete_object(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not remove dest object " + << dest->get_name() << dendl; + return ret; + } + + ret = db->get_shadow_bucket(dpp, y, std::string(), std::string(), dest->get_fname(), true, &dsb); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not create shadow bucket " << dest->get_name() + << " in bucket " << db->get_name() << dendl; + return ret; + } + + ret = for_each(dpp, [this, &dsb, &dpp, &y](const char *name) { + int ret; + std::unique_ptr sobj; + POSIXObject* sop; + std::unique_ptr dobj; + POSIXObject* dop; + + if (name[0] == '.') { + /* Skip dotfiles */ + return 0; + } + + sobj = this->get_object(decode_obj_key(name)); + sop = static_cast(sobj.get()); + if (!sop->exists(dpp)) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << name << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + ret = sop->open(dpp, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not open source object " << get_name() + << dendl; + return ret; + } + + dobj = dsb->get_object(decode_obj_key(name)); + dop = static_cast(dobj.get()); + + return sop->copy(dpp, y, this, dsb.get(), dop); + }); + + return ret; +} + +int POSIXObject::delete_object(const DoutPrefixProvider* dpp, + optional_yield y, + bool prevent_versioning) +{ + POSIXBucket *b = static_cast(get_bucket()); + if (!b) { + ldpp_dout(dpp, 0) << "ERROR: could not get bucket for " << get_name() << dendl; + return -EINVAL; + } + + int ret = stat(dpp); + if (ret < 0) { + if (ret == -ENOENT) { + // Nothing to do + return 0; + } + return ret; + } + + if (!b->versioned()) { + if (shadow) { + ret = shadow->remove_bucket(dpp, true, false, nullptr, y); + if (ret < 
0) { + return ret; + } + shadow.reset(nullptr); + } + + int ret = unlinkat(b->get_dir_fd(dpp), get_fname().c_str(), 0); + if (ret < 0) { + ret = errno; + if (errno != ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: could not remove object " << get_name() + << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + } + return 0; + } + + // Versioned directory. Need to remove all objects matching + b->for_each(dpp, [this, &dpp, &b](const char* name) { + int ret; + std::string_view vname(name); + + if (vname.find(get_fname().c_str()) != std::string_view::npos) { + ret = unlinkat(b->get_dir_fd(dpp), name, 0); + if (ret < 0) { + ret = errno; + if (errno != ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: could not remove object " << name + << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + } + } + return 0; + }); + + return 0; +} + +int POSIXObject::copy_object(User* user, + req_info* info, + const rgw_zone_id& source_zone, + rgw::sal::Object* dest_object, + rgw::sal::Bucket* dest_bucket, + rgw::sal::Bucket* src_bucket, + const rgw_placement_rule& dest_placement, + ceph::real_time* src_mtime, + ceph::real_time* mtime, + const ceph::real_time* mod_ptr, + const ceph::real_time* unmod_ptr, + bool high_precision_time, + const char* if_match, + const char* if_nomatch, + AttrsMod attrs_mod, + bool copy_if_newer, + Attrs& attrs, + RGWObjCategory category, + uint64_t olh_epoch, + boost::optional delete_at, + std::string* version_id, + std::string* tag, + std::string* etag, + void (*progress_cb)(off_t, void *), + void* progress_data, + const DoutPrefixProvider* dpp, + optional_yield y) +{ + int ret; + POSIXBucket *db = static_cast(dest_bucket); + POSIXBucket *sb = static_cast(src_bucket); + POSIXObject *dobj = static_cast(dest_object); + + if (!db || !sb) { + ldpp_dout(dpp, 0) << "ERROR: could not get bucket to copy " << get_name() + << dendl; + return -EINVAL; + } + + // Source must exist, and we need to know if it's a shadow obj + if (!exists(dpp)) { + ret = errno; + ldpp_dout(dpp, 
0) << "ERROR: could not stat object " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + if (shadow) { + return shadow->copy(dpp, y, db, dobj); + } else { + return copy(dpp, y, sb, db, dobj); + } +} + +int POSIXObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pstate, optional_yield y, bool follow_olh) +{ + int ret = stat(dpp); + if (ret < 0) { + return ret; + } + *pstate = &state; + + return 0; +} + +int POSIXObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, + Attrs* delattrs, optional_yield y) +{ + if (delattrs) { + for (auto& it : *delattrs) { + state.attrset.erase(it.first); + } + } + if (setattrs) { + for (auto& it : *setattrs) { + state.attrset[it.first] = it.second; + } + } + + for (auto& it : state.attrset) { + int ret = write_attr(dpp, y, it.first, it.second); + if (ret < 0) { + return ret; + } + } + return 0; +} + +int POSIXObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, + rgw_obj* target_obj) +{ + int ret = open(dpp, false); + if (ret < 0) { + return ret; + } + + return get_x_attrs(y, dpp, obj_fd, state.attrset, get_name()); +} + +int POSIXObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, + optional_yield y, const DoutPrefixProvider* dpp) +{ + state.attrset[attr_name] = attr_val; + return write_attr(dpp, y, attr_name, attr_val); +} + +int POSIXObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, + optional_yield y) +{ + state.attrset.erase(attr_name); + + int ret = open(dpp, true); + if (ret < 0) { + return ret; + } + + ret = fremovexattr(obj_fd, attr_name); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not remover attribute " << attr_name << " for " << get_name() << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + + return 0; +} + +bool POSIXObject::is_expired() +{ + bufferlist bl; + if (get_attr(state.attrset, RGW_ATTR_DELETE_AT, bl)) { + utime_t delete_at; + try { + auto bufit = 
bl.cbegin(); + decode(delete_at, bufit); + } catch (buffer::error& err) { + ldout(driver->ctx(), 0) << "ERROR: " << __func__ << ": failed to decode " RGW_ATTR_DELETE_AT " attr" << dendl; + return false; + } + + if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) { + return true; + } + } + + return false; +} + +void POSIXObject::gen_rand_obj_instance_name() +{ + enum { OBJ_INSTANCE_LEN = 32 }; + char buf[OBJ_INSTANCE_LEN + 1]; + + gen_rand_alphanumeric_no_underscore(driver->ctx(), buf, OBJ_INSTANCE_LEN); + state.obj.key.set_instance(buf); +} + +std::unique_ptr POSIXObject::get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name) +{ + return std::make_unique(dpp, driver, this, lock_name); +} + +int MPPOSIXSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) +{ + if (!obj->exists(dpp)) { + return -ENOENT; + } + + return 0; +} + +int POSIXObject::transition(Bucket* bucket, + const rgw_placement_rule& placement_rule, + const real_time& mtime, + uint64_t olh_epoch, + const DoutPrefixProvider* dpp, + optional_yield y) +{ + return -ERR_NOT_IMPLEMENTED; +} + +int POSIXObject::transition_to_cloud(Bucket* bucket, + rgw::sal::PlacementTier* tier, + rgw_bucket_dir_entry& o, + std::set& cloud_targets, + CephContext* cct, + bool update_object, + const DoutPrefixProvider* dpp, + optional_yield y) +{ + return -ERR_NOT_IMPLEMENTED; +} + +bool POSIXObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) +{ + return (r1 == r2); +} + +int POSIXObject::dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) +{ + return 0; +} + +int POSIXObject::swift_versioning_restore(bool& restored, + const DoutPrefixProvider* dpp, optional_yield y) +{ + return 0; +} + +int POSIXObject::swift_versioning_copy(const DoutPrefixProvider* dpp, + optional_yield y) +{ + return 0; +} + +int POSIXObject::omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid, + const std::set& keys, + 
Attrs* vals) +{ + /* TODO Figure out omap */ + return 0; +} + +int POSIXObject::omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val, + bool must_exist, optional_yield y) +{ + /* TODO Figure out omap */ + return 0; +} + +int POSIXObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_yield y) +{ + POSIXBucket *b = static_cast(get_bucket()); + if (!b) { + ldpp_dout(dpp, 0) << "ERROR: could not get bucket for " << get_name() << dendl; + return -EINVAL; + } + /* TODO Get UID from user */ + int uid = 0; + int gid = 0; + + int ret = fchownat(b->get_dir_fd(dpp), get_fname().c_str(), uid, gid, AT_SYMLINK_NOFOLLOW); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not remove object " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + return 0; +} + +int POSIXObject::stat(const DoutPrefixProvider* dpp) +{ + if (stat_done) { + return 0; + } + + state.exists = false; + POSIXBucket *b = static_cast(get_bucket()); + if (!b) { + ldpp_dout(dpp, 0) << "ERROR: could not get bucket for " << get_name() << dendl; + return -EINVAL; + } + + int ret = statx(b->get_dir_fd(dpp), get_fname().c_str(), AT_SYMLINK_NOFOLLOW, + STATX_ALL, &stx); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + if (S_ISREG(stx.stx_mode)) { + /* Normal object */ + state.accounted_size = state.size = stx.stx_size; + state.mtime = from_statx_timestamp(stx.stx_mtime); + } else if (S_ISDIR(stx.stx_mode)) { + /* multipart object */ + /* Get the shadow bucket */ + POSIXBucket* pb = static_cast(bucket); + ret = pb->get_shadow_bucket(dpp, null_yield, std::string(), + std::string(), get_fname(), false, &shadow); + if (ret < 0) { + return ret; + } + + state.mtime = from_statx_timestamp(stx.stx_mtime); + /* Add up size of parts */ + uint64_t total_size{0}; + int fd = shadow->get_dir_fd(dpp); + shadow->for_each(dpp, 
[this, &total_size, fd, &dpp](const char* name) { + int ret; + struct statx stx; + std::string sname = name; + + if (sname.rfind(MP_OBJ_PART_PFX, 0) != 0) { + /* Skip non-parts */ + return 0; + } + + ret = statx(fd, name, AT_SYMLINK_NOFOLLOW, STATX_ALL, &stx); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not stat object " << name << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + + if (!S_ISREG(stx.stx_mode)) { + /* Skip non-files */ + return 0; + } + + parts[name] = stx.stx_size; + total_size += stx.stx_size; + return 0; + }); + state.accounted_size = state.size = total_size; + } else { + /* Not an object */ + return -EINVAL; + } + + stat_done = true; + state.exists = true; + + return 0; +} + +int POSIXObject::get_owner(const DoutPrefixProvider *dpp, optional_yield y, std::unique_ptr *owner) +{ + bufferlist bl; + rgw_user u; + if (!rgw::sal::get_attr(get_attrs(), RGW_POSIX_ATTR_OWNER, bl)) { + ldpp_dout(dpp, 0) << "ERROR: " << __func__ + << ": No " RGW_POSIX_ATTR_OWNER " attr" << dendl; + return -EINVAL; + } + + try { + auto bufit = bl.cbegin(); + decode(u, bufit); + } catch (buffer::error &err) { + ldpp_dout(dpp, 0) << "ERROR: " << __func__ + << ": failed to decode " RGW_POSIX_ATTR_OWNER " attr" << dendl; + return -EINVAL; + } + + *owner = driver->get_user(u); + (*owner)->load_user(dpp, y); + return 0; +} + +std::unique_ptr POSIXObject::get_read_op() +{ + return std::make_unique(this); +} + +std::unique_ptr POSIXObject::get_delete_op() +{ + return std::make_unique(this); +} + +int POSIXObject::open(const DoutPrefixProvider* dpp, bool create, bool temp_file) +{ + if (obj_fd >= 0) { + return 0; + } + + stat(dpp); + + if (shadow) { + obj_fd = shadow->get_dir_fd(dpp); + return obj_fd; + } + + POSIXBucket *b = static_cast(get_bucket()); + if (!b) { + ldpp_dout(dpp, 0) << "ERROR: could not get bucket for " << get_name() << dendl; + return -EINVAL; + } + + int ret, flags; + std::string path; + + if(temp_file) { + flags = O_TMPFILE | 
O_RDWR; + path = "."; + } else { + flags = O_RDWR | O_NOFOLLOW; + if (create) + flags |= O_CREAT; + path = get_fname(); + } + ret = openat(b->get_dir_fd(dpp), path.c_str(), flags, S_IRWXU); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not open object " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + obj_fd = ret; + + return 0; +} + +int POSIXObject::link_temp_file(const DoutPrefixProvider *dpp, optional_yield y) +{ + if (obj_fd < 0) { + return 0; + } + + char temp_file_path[PATH_MAX]; + // Only works on Linux - Non-portable + snprintf(temp_file_path, PATH_MAX, "/proc/self/fd/%d", obj_fd); + + POSIXBucket *b = static_cast(get_bucket()); + + if (!b) { + ldpp_dout(dpp, 0) << "ERROR: could not get bucket for " << get_name() << dendl; + return -EINVAL; + } + + int ret = linkat(AT_FDCWD, temp_file_path, b->get_dir_fd(dpp), get_temp_fname().c_str(), AT_SYMLINK_FOLLOW); + if(ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: linkat for temp file could not finish: " + << cpp_strerror(ret) << dendl; + return -ret; + } + + // Delete the target, in case it's a multipart + ret = delete_object(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not remove dest object " + << get_name() << dendl; + return ret; + } + + ret = renameat(b->get_dir_fd(dpp), get_temp_fname().c_str(), b->get_dir_fd(dpp), get_fname().c_str()); + if(ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: renameat for object could not finish: " + << cpp_strerror(ret) << dendl; + return -ret; + } + + return 0; +} + + +int POSIXObject::close() +{ + if (obj_fd < 0) { + return 0; + } + + int ret = ::fsync(obj_fd); + if(ret < 0) { + return ret; + } + + ret = ::close(obj_fd); + if(ret < 0) { + return ret; + } + obj_fd = -1; + + return 0; +} + +int POSIXObject::read(int64_t ofs, int64_t left, bufferlist& bl, + const DoutPrefixProvider* dpp, optional_yield y) +{ + if (!shadow) { + // Normal file, just read it + int64_t len = std::min(left + 1, 
READ_SIZE); + ssize_t ret; + + ret = lseek(obj_fd, ofs, SEEK_SET); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not seek object " << get_name() << " to " + << ofs << " :" << cpp_strerror(ret) << dendl; + return -ret; + } + + char read_buf[READ_SIZE]; + ret = ::read(obj_fd, read_buf, len); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not read object " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + bl.append(read_buf, ret); + + return ret; + } + + // It's a multipart object, find the correct file, open it, and read it + std::string pname; + for (auto part : parts) { + if (ofs < part.second) { + pname = part.first; + break; + } + + ofs -= part.second; + } + + if (pname.empty()) { + // ofs is past the end + return 0; + } + + POSIXObject* shadow_obj; + std::unique_ptr obj = shadow->get_object(rgw_obj_key(pname)); + shadow_obj = static_cast(obj.get()); + int ret = shadow_obj->open(dpp, false); + if (ret < 0) { + return ret; + } + + return shadow_obj->read(ofs, left, bl, dpp, y); +} + +int POSIXObject::write(int64_t ofs, bufferlist& bl, const DoutPrefixProvider* dpp, + optional_yield y) +{ + if (shadow) { + // Can't write to a MP file + return -EINVAL; + } + + int64_t left = bl.length(); + char* curp = bl.c_str(); + ssize_t ret; + + ret = fchmod(obj_fd, S_IRUSR|S_IWUSR); + if(ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not change permissions on object " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return ret; + } + + + ret = lseek(obj_fd, ofs, SEEK_SET); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not seek object " << get_name() << " to " + << ofs << " :" << cpp_strerror(ret) << dendl; + return -ret; + } + + while (left > 0) { + ret = ::write(obj_fd, curp, left); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not write object " << get_name() << ": " + << cpp_strerror(ret) << dendl; + return -ret; + } + + curp += ret; + left -= ret; 
+ } + + return 0; +} + +int POSIXObject::write_attr(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& value) +{ + int ret; + std::string attrname; + + ret = open(dpp, true); + if (ret < 0) { + return ret; + } + + return write_x_attr(dpp, y, obj_fd, key, value, get_name()); +} + +int POSIXObject::POSIXReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp) +{ + int ret = source->stat(dpp); + if (ret < 0) + return ret; + + ret = source->get_obj_attrs(y, dpp); + if (ret < 0) + return ret; + + bufferlist etag_bl; + if (!rgw::sal::get_attr(source->get_attrs(), RGW_ATTR_ETAG, etag_bl)) { + /* Sideloaded file. Generate necessary attributes. Only done once. */ + int ret = source->generate_attrs(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << " ERROR: could not generate attrs for " << source->get_name() << " error: " << cpp_strerror(ret) << dendl; + return ret; + } + } + + if (!rgw::sal::get_attr(source->get_attrs(), RGW_ATTR_ETAG, etag_bl)) { + return -EINVAL; + } + +#if 0 // WIP + if (params.mod_ptr || params.unmod_ptr) { + obj_time_weight src_weight; + src_weight.init(astate); + src_weight.high_precision = params.high_precision_time; + + obj_time_weight dest_weight; + dest_weight.high_precision = params.high_precision_time; + + if (params.mod_ptr && !params.if_nomatch) { + dest_weight.init(*params.mod_ptr, params.mod_zone_id, params.mod_pg_ver); + ldpp_dout(dpp, 10) << "If-Modified-Since: " << dest_weight << " Last-Modified: " << src_weight << dendl; + if (!(dest_weight < src_weight)) { + return -ERR_NOT_MODIFIED; + } + } + + if (params.unmod_ptr && !params.if_match) { + dest_weight.init(*params.unmod_ptr, params.mod_zone_id, params.mod_pg_ver); + ldpp_dout(dpp, 10) << "If-UnModified-Since: " << dest_weight << " Last-Modified: " << src_weight << dendl; + if (dest_weight < src_weight) { + return -ERR_PRECONDITION_FAILED; + } + } + } +#endif + + if (params.mod_ptr || params.unmod_ptr) { + if (params.mod_ptr && 
!params.if_nomatch) { + ldpp_dout(dpp, 10) << "If-Modified-Since: " << *params.mod_ptr << " Last-Modified: " << source->get_mtime() << dendl; + if (!(*params.mod_ptr < source->get_mtime())) { + return -ERR_NOT_MODIFIED; + } + } + + if (params.unmod_ptr && !params.if_match) { + ldpp_dout(dpp, 10) << "If-Modified-Since: " << *params.unmod_ptr << " Last-Modified: " << source->get_mtime() << dendl; + if (*params.unmod_ptr < source->get_mtime()) { + return -ERR_PRECONDITION_FAILED; + } + } + } + + if (params.if_match) { + std::string if_match_str = rgw_string_unquote(params.if_match); + ldpp_dout(dpp, 10) << "If-Match: " << if_match_str << " ETAG: " << etag_bl.c_str() << dendl; + + if (if_match_str.compare(0, etag_bl.length(), etag_bl.c_str(), etag_bl.length()) != 0) { + return -ERR_PRECONDITION_FAILED; + } + } + if (params.if_nomatch) { + std::string if_nomatch_str = rgw_string_unquote(params.if_nomatch); + ldpp_dout(dpp, 10) << "If-No-Match: " << if_nomatch_str << " ETAG: " << etag_bl.c_str() << dendl; + if (if_nomatch_str.compare(0, etag_bl.length(), etag_bl.c_str(), etag_bl.length()) == 0) { + return -ERR_NOT_MODIFIED; + } + } + + if (params.lastmod) { + *params.lastmod = source->get_mtime(); + } + + return 0; +} + +int POSIXObject::POSIXReadOp::read(int64_t ofs, int64_t end, bufferlist& bl, + optional_yield y, const DoutPrefixProvider* dpp) +{ + return source->read(ofs, end + 1, bl, dpp, y); +} + +int POSIXObject::generate_attrs(const DoutPrefixProvider* dpp, optional_yield y) +{ + int ret; + + /* Generate an ETAG */ + if (shadow) { + ret = generate_mp_etag(dpp, y); + } else { + ret = generate_etag(dpp, y); + } + + return ret; +} + +int POSIXObject::generate_mp_etag(const DoutPrefixProvider* dpp, optional_yield y) +{ + int64_t count = 0; + char etag_buf[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; + std::string etag; + bufferlist etag_bl; + MD5 hash; + // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes 
+ hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); + int ret; + rgw::sal::Bucket::ListParams params; + rgw::sal::Bucket::ListResults results; + + do { + static constexpr auto MAX_LIST_OBJS = 100u; + ret = shadow->list(dpp, params, MAX_LIST_OBJS, results, y); + if (ret < 0) { + return ret; + } + for (rgw_bucket_dir_entry& ent : results.objs) { + std::unique_ptr obj; + POSIXObject* shadow_obj; + + if (MP_OBJ_PART_PFX.compare(0, std::string::npos, ent.key.name, + MP_OBJ_PART_PFX.size() != 0)) { + // Skip non-parts + continue; + } + + obj = shadow->get_object(rgw_obj_key(ent.key)); + shadow_obj = static_cast(obj.get()); + ret = shadow_obj->get_obj_attrs(y, dpp); + if (ret < 0) { + return ret; + } + bufferlist etag_bl; + if (!get_attr(shadow_obj->get_attrs(), RGW_ATTR_ETAG, etag_bl)) { + // Generate part's etag + ret = shadow_obj->generate_etag(dpp, y); + if (ret < 0) + return ret; + } + if (!get_attr(shadow_obj->get_attrs(), RGW_ATTR_ETAG, etag_bl)) { + // Can't get etag. + return -EINVAL; + } + hex_to_buf(etag_bl.c_str(), etag_buf, CEPH_CRYPTO_MD5_DIGESTSIZE); + hash.Update((const unsigned char *)etag_buf, sizeof(etag_buf)); + count++; + } + } while (results.is_truncated); + + hash.Final((unsigned char *)etag_buf); + + buf_to_hex((unsigned char *)etag_buf, sizeof(etag_buf), final_etag_str); + snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], + sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, + "-%lld", (long long)count); + etag = final_etag_str; + ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl; + + etag_bl.append(etag); + (void)write_attr(dpp, y, RGW_ATTR_ETAG, etag_bl); + get_attrs().emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); + + return 0; +} + +int POSIXObject::generate_etag(const DoutPrefixProvider* dpp, optional_yield y) +{ + int64_t left = get_obj_size(); + int64_t cur_ofs = 0; + MD5 hash; + // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes + hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); + char 
calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; + unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; + + bufferlist etag_bl; + + while (left > 0) { + bufferlist bl; + int len = read(cur_ofs, left, bl, dpp, y); + if (len < 0) { + ldpp_dout(dpp, 0) << " ERROR: could not read " << get_name() << + " ofs: " << cur_ofs << " error: " << cpp_strerror(len) << dendl; + return len; + } else if (len == 0) { + /* Done */ + break; + } + hash.Update((const unsigned char *)bl.c_str(), bl.length()); + + left -= len; + cur_ofs += len; + } + + hash.Final(m); + buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); + etag_bl.append(calc_md5, sizeof(calc_md5)); + (void)write_attr(dpp, y, RGW_ATTR_ETAG, etag_bl); + get_attrs().emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); + + return 0; +} + +const std::string POSIXObject::get_fname() +{ + std::string fname = url_encode(get_obj().get_oid(), true); + + if (!get_obj().key.get_ns().empty()) { + /* Namespaced objects are hidden */ + fname.insert(0, 1, '.'); + } + + return fname; +} + +void POSIXObject::gen_temp_fname() +{ + enum { RAND_SUFFIX_SIZE = 8 }; + char buf[RAND_SUFFIX_SIZE + 1]; + + gen_rand_alphanumeric_no_underscore(driver->ctx(), buf, RAND_SUFFIX_SIZE); + temp_fname = "." 
+ get_fname() + "."; + temp_fname.append(buf); +} + +const std::string POSIXObject::get_temp_fname() +{ + return temp_fname; +} + +int POSIXObject::POSIXReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, + int64_t end, RGWGetDataCB* cb, optional_yield y) +{ + int64_t left; + int64_t cur_ofs = ofs; + + if (end < 0) + left = 0; + else + left = end - ofs + 1; + + while (left > 0) { + bufferlist bl; + int len = source->read(cur_ofs, left, bl, dpp, y); + if (len < 0) { + ldpp_dout(dpp, 0) << " ERROR: could not read " << source->get_name() << + " ofs: " << cur_ofs << " error: " << cpp_strerror(len) << dendl; + return len; + } else if (len == 0) { + /* Done */ + break; + } + + /* Read some */ + int ret = cb->handle_data(bl, 0, len); + if (ret < 0) { + ldpp_dout(dpp, 0) << " ERROR: callback failed on " << source->get_name() << dendl; + return ret; + } + + left -= len; + cur_ofs += len; + } + + /* Doesn't seem to be anything needed from params */ + return 0; +} + +int POSIXObject::POSIXReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y) +{ + if (!source->exists(dpp)) { + return -ENOENT; + } + if (source->get_obj_attrs(y, dpp) < 0) { + return -ENODATA; + } + if (!rgw::sal::get_attr(source->get_attrs(), name, dest)) { + return -ENODATA; + } + + return 0; +} + +int POSIXObject::POSIXDeleteOp::delete_obj(const DoutPrefixProvider* dpp, + optional_yield y) +{ + return source->delete_object(dpp, y, false); +} + +int POSIXObject::copy(const DoutPrefixProvider *dpp, optional_yield y, + POSIXBucket *sb, POSIXBucket *db, POSIXObject *dobj) +{ + off64_t scount = 0, dcount = 0; + + int ret = open(dpp, false); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not open source object " << get_name() + << dendl; + return ret; + } + + // Delete the target, in case it's a multipart + ret = dobj->delete_object(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not remove dest object " + << dobj->get_name() << dendl; + 
return ret; + } + + ret = dobj->open(dpp, true); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not open dest object " + << dobj->get_name() << dendl; + return ret; + } + + ret = copy_file_range(obj_fd, &scount, dobj->get_fd(), &dcount, stx.stx_size, 0); + if (ret < 0) { + ret = errno; + ldpp_dout(dpp, 0) << "ERROR: could not copy object " << dobj->get_name() + << ": " << cpp_strerror(ret) << dendl; + return -ret; + } + + ret = get_obj_attrs(y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not get attrs for source object " + << get_name() << dendl; + return ret; + } + + ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: could not write attrs to dest object " + << dobj->get_name() << dendl; + return ret; + } + + return 0; +} + +void POSIXMPObj::init_gen(POSIXDriver* driver, const std::string& _oid, ACLOwner& _owner) +{ + char buf[33]; + std::string new_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */ + /* Generate an upload ID */ + + gen_rand_alphanumeric(driver->ctx(), buf, sizeof(buf) - 1); + new_id.append(buf); + init(_oid, new_id, _owner); +} + +int POSIXMultipartPart::load(const DoutPrefixProvider* dpp, optional_yield y, + POSIXDriver* driver, rgw_obj_key& key) +{ + if (shadow) { + /* Already loaded */ + return 0; + } + + shadow = std::make_unique(driver, key, upload->get_shadow()); + + RGWObjState* pstate; + // Stat the shadow object to get things like size + int ret = shadow->get_obj_state(dpp, &pstate, y); + if (ret < 0) { + return ret; + } + + ret = shadow->get_obj_attrs(y, dpp); + if (ret < 0) { + return ret; + } + + auto ait = shadow->get_attrs().find(RGW_POSIX_ATTR_MPUPLOAD); + if (ait == shadow->get_attrs().end()) { + ldout(driver->ctx(), 0) << "ERROR: " << __func__ << ": Not a part: " << key << dendl; + return -EINVAL; + } + + try { + auto bit = ait->second.cbegin(); + decode(info, bit); + } catch (buffer::error& err) { + ldout(driver->ctx(), 0) << "ERROR: " << __func__ << ": 
failed to decode part info: " << key << dendl; + return -EINVAL; + } + + return 0; +} + +int POSIXMultipartUpload::load(bool create) +{ + if (!shadow) { + POSIXBucket* pb = static_cast(bucket); + return pb->get_shadow_bucket(nullptr, null_yield, mp_ns, + std::string(), get_meta(), create, &shadow); + } + + return 0; +} + +std::unique_ptr POSIXMultipartUpload::get_meta_obj() +{ + load(); + if (!shadow) { + // This upload doesn't exist, but the API doesn't check this until it calls + // on the *serializer*. So make a fake object in the parent bucket that + // doesn't exist. Put it in the MP namespace just in case. + return bucket->get_object(rgw_obj_key(get_meta(), std::string(), mp_ns)); + } + return shadow->get_object(rgw_obj_key(get_meta(), std::string())); +} + +int POSIXMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, + ACLOwner& owner, rgw_placement_rule& dest_placement, + rgw::sal::Attrs& attrs) +{ + int ret; + + /* Create the shadow bucket */ + ret = load(true); + if (ret < 0) { + ldpp_dout(dpp, 0) << " ERROR: could not get shadow bucket for mp upload " + << get_key() << dendl; + return ret; + } + + /* Now create the meta object */ + std::unique_ptr meta_obj; + + meta_obj = get_meta_obj(); + + mp_obj.upload_info.dest_placement = dest_placement; + + bufferlist bl; + encode(mp_obj, bl); + + attrs[RGW_POSIX_ATTR_MPUPLOAD] = bl; + + return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y); +} + +int POSIXMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct, + int num_parts, int marker, + int *next_marker, bool *truncated, optional_yield y, + bool assume_unsorted) +{ + int ret; + int last_num = 0; + + ret = load(); + if (ret < 0) { + return ret; + } + + rgw::sal::Bucket::ListParams params; + rgw::sal::Bucket::ListResults results; + + params.prefix = MP_OBJ_PART_PFX; + params.marker = MP_OBJ_PART_PFX + fmt::format("{:0>5}", marker); + + ret = shadow->list(dpp, params, num_parts + 1, results, y); + if (ret < 0) { + return 
ret; + } + for (rgw_bucket_dir_entry& ent : results.objs) { + std::unique_ptr part = std::make_unique(this); + POSIXMultipartPart* ppart = static_cast(part.get()); + + rgw_obj_key key(ent.key); + ret = ppart->load(dpp, y, driver, key); + if (ret == 0) { + /* Skip anything that's not a part */ + last_num = part->get_num(); + parts[part->get_num()] = std::move(part); + } + if (parts.size() == (ulong)num_parts) + break; + } + + if (truncated) + *truncated = results.is_truncated; + + if (next_marker) + *next_marker = last_num; + + return 0; +} + +int POSIXMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct, optional_yield y) +{ + int ret; + + ret = load(); + if (ret < 0) { + return ret; + } + + shadow->remove_bucket(dpp, true, false, nullptr, y); + + return 0; +} + +int POSIXMultipartUpload::complete(const DoutPrefixProvider *dpp, + optional_yield y, CephContext* cct, + std::map& part_etags, + std::list& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& ofs, + std::string& tag, ACLOwner& owner, + uint64_t olh_epoch, + rgw::sal::Object* target_obj) +{ + char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; + std::string etag; + bufferlist etag_bl; + MD5 hash; + // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes + hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); + bool truncated; + int ret; + + int total_parts = 0; + int handled_parts = 0; + int max_parts = 1000; + int marker = 0; + uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; + auto etags_iter = part_etags.begin(); + rgw::sal::Attrs attrs = target_obj->get_attrs(); + + do { + ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y); + if (ret == -ENOENT) { + ret = -ERR_NO_SUCH_UPLOAD; + } + if (ret < 0) + return ret; + + total_parts += parts.size(); + if (!truncated && total_parts != (int)part_etags.size()) { + ldpp_dout(dpp, 0) << "NOTICE: total parts 
mismatch: have: " << total_parts + << " expected: " << part_etags.size() << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) { + POSIXMultipartPart* part = static_cast(obj_iter->second.get()); + uint64_t part_size = part->get_size(); + if (handled_parts < (int)part_etags.size() - 1 && + part_size < min_part_size) { + ret = -ERR_TOO_SMALL; + return ret; + } + + char petag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + if (etags_iter->first != (int)obj_iter->first) { + ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: " + << etags_iter->first << " next uploaded: " + << obj_iter->first << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + std::string part_etag = rgw_string_unquote(etags_iter->second); + if (part_etag.compare(part->get_etag()) != 0) { + ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first + << " etag: " << etags_iter->second << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + hex_to_buf(part->get_etag().c_str(), petag, + CEPH_CRYPTO_MD5_DIGESTSIZE); + hash.Update((const unsigned char *)petag, sizeof(petag)); + + // Compression is not supported yet +#if 0 + RGWUploadPartInfo& obj_part = part->info; + + bool part_compressed = (obj_part.cs_info.compression_type != "none"); + if ((handled_parts > 0) && + ((part_compressed != compressed) || + (cs_info.compression_type != obj_part.cs_info.compression_type))) { + ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload (" + << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + if (part_compressed) { + int64_t new_ofs; // offset in compression data for new part + if (cs_info.blocks.size() > 0) + new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len; + else + new_ofs = 0; + for (const auto& block : 
obj_part.cs_info.blocks) { + compression_block cb; + cb.old_ofs = block.old_ofs + cs_info.orig_size; + cb.new_ofs = new_ofs; + cb.len = block.len; + cs_info.blocks.push_back(cb); + new_ofs = cb.new_ofs + cb.len; + } + if (!compressed) + cs_info.compression_type = obj_part.cs_info.compression_type; + cs_info.orig_size += obj_part.cs_info.orig_size; + compressed = true; + } +#endif + + ofs += part->get_size(); + accounted_size += part->get_size(); + } + } while (truncated); + hash.Final((unsigned char *)final_etag); + + buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); + snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], + sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, + "-%lld", (long long)part_etags.size()); + etag = final_etag_str; + ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl; + + etag_bl.append(etag); + + attrs[RGW_ATTR_ETAG] = etag_bl; + + if (compressed) { + // write compression attribute to full object + bufferlist tmp; + encode(cs_info, tmp); + attrs[RGW_ATTR_COMPRESSION] = tmp; + } + + ret = shadow->merge_and_store_attrs(dpp, attrs, y); + if (ret < 0) { + return ret; + } + + // Rename to target_obj + return shadow->rename(dpp, y, target_obj); +} + +int POSIXMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, + rgw_placement_rule** rule, rgw::sal::Attrs* attrs) +{ + std::unique_ptr meta_obj; + int ret; + + if (!rule && !attrs) { + return 0; + } + + if (attrs) { + meta_obj = get_meta_obj(); + int ret = meta_obj->get_obj_attrs(y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << " ERROR: could not get meta object for mp upload " + << get_key() << dendl; + return ret; + } + *attrs = meta_obj->get_attrs(); + } + + if (rule) { + if (mp_obj.oid.empty()) { + if (!meta_obj) { + meta_obj = get_meta_obj(); + ret = meta_obj->get_obj_attrs(y, dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << " ERROR: could not get meta object for mp upload " + << get_key() << dendl; + return ret; + } + } + 
bufferlist bl; + if (!get_attr(meta_obj->get_attrs(), RGW_POSIX_ATTR_MPUPLOAD, bl)) { + ldpp_dout(dpp, 0) << " ERROR: could not get meta object attrs for mp upload " + << get_key() << dendl; + return ret; + } + auto biter = bl.cbegin(); + decode(mp_obj, biter); + } + *rule = &mp_obj.upload_info.dest_placement; + } + + return 0; +} + +std::unique_ptr POSIXMultipartUpload::get_writer( + const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* _head_obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) +{ + std::string fname = MP_OBJ_PART_PFX + fmt::format("{:0>5}", part_num); + rgw_obj_key part_key(fname); + + load(); + + return std::make_unique(dpp, y, shadow->clone(), part_key, driver, + owner, ptail_placement_rule, part_num); +} + +int POSIXMultipartWriter::prepare(optional_yield y) +{ + return obj->open(dpp, true); +} + +int POSIXMultipartWriter::process(bufferlist&& data, uint64_t offset) +{ + return obj->write(offset, data, dpp, null_yield); +} + +int POSIXMultipartWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) +{ + int ret; + POSIXUploadPartInfo info; + + if (if_match) { + if (strcmp(if_match, "*") == 0) { + // test the object is existing + if (!obj->exists(dpp)) { + return -ERR_PRECONDITION_FAILED; + } + } else { + bufferlist bl; + if (!get_attr(obj->get_attrs(), RGW_ATTR_ETAG, bl)) { + return -ERR_PRECONDITION_FAILED; + } + if (strncmp(if_match, bl.c_str(), bl.length()) != 0) { + return -ERR_PRECONDITION_FAILED; + } + } + } + + info.num = part_num; + info.etag = etag; + info.mtime = set_mtime; + + bufferlist bl; + encode(info, bl); + attrs[RGW_POSIX_ATTR_MPUPLOAD] = bl; + + for (auto& attr : attrs) { + 
ret = obj->write_attr(dpp, y, attr.first, attr.second); + if (ret < 0) { + ldpp_dout(dpp, 20) << "ERROR: failed writing attr " << attr.first << dendl; + return ret; + } + } + + ret = obj->close(); + if (ret < 0) { + ldpp_dout(dpp, 20) << "ERROR: failed closing file" << dendl; + return ret; + } + + return 0; +} + +int POSIXAtomicWriter::prepare(optional_yield y) +{ + obj.get_obj_attrs(y, dpp); + obj.close(); + obj.gen_temp_fname(); + return obj.open(dpp, true, true); +} + +int POSIXAtomicWriter::process(bufferlist&& data, uint64_t offset) +{ + return obj.write(offset, data, dpp, null_yield); +} + +int POSIXAtomicWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) +{ + int ret; + + if (if_match) { + if (strcmp(if_match, "*") == 0) { + // test the object is existing + if (!obj.exists(dpp)) { + return -ERR_PRECONDITION_FAILED; + } + } else { + bufferlist bl; + if (!get_attr(obj.get_attrs(), RGW_ATTR_ETAG, bl)) { + return -ERR_PRECONDITION_FAILED; + } + if (strncmp(if_match, bl.c_str(), bl.length()) != 0) { + return -ERR_PRECONDITION_FAILED; + } + } + } + if (if_nomatch) { + if (strcmp(if_nomatch, "*") == 0) { + // test the object is not existing + if (obj.exists(dpp)) { + return -ERR_PRECONDITION_FAILED; + } + } else { + bufferlist bl; + if (!get_attr(obj.get_attrs(), RGW_ATTR_ETAG, bl)) { + return -ERR_PRECONDITION_FAILED; + } + if (strncmp(if_nomatch, bl.c_str(), bl.length()) == 0) { + return -ERR_PRECONDITION_FAILED; + } + } + } + + bufferlist bl; + encode(owner, bl); + attrs[RGW_POSIX_ATTR_OWNER] = bl; + + for (auto attr : attrs) { + ret = obj.write_attr(dpp, y, attr.first, attr.second); + if (ret < 0) { + ldpp_dout(dpp, 20) << "ERROR: POSIXAtomicWriter failed writing attr " << attr.first << dendl; + return 
ret; + } + } + + ret = obj.link_temp_file(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, 20) << "ERROR: POSIXAtomicWriter failed writing temp file" << dendl; + return ret; + } + + ret = obj.close(); + if (ret < 0) { + ldpp_dout(dpp, 20) << "ERROR: POSIXAtomicWriter failed closing file" << dendl; + return ret; + } + + return 0; +} + +} } // namespace rgw::sal + +extern "C" { + +rgw::sal::Driver* newPOSIXDriver(rgw::sal::Driver* next) +{ + rgw::sal::POSIXDriver* driver = new rgw::sal::POSIXDriver(next); + + return driver; +} + +} diff --git a/src/rgw/driver/posix/rgw_sal_posix.h b/src/rgw/driver/posix/rgw_sal_posix.h new file mode 100644 index 00000000000..bd1285c3551 --- /dev/null +++ b/src/rgw/driver/posix/rgw_sal_posix.h @@ -0,0 +1,688 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#pragma once + +#include "rgw_sal_filter.h" +#include "rgw_sal_store.h" +#include +#include "common/dout.h" +#include "bucket_cache.h" + +namespace rgw { namespace sal { + +class POSIXDriver; +class POSIXBucket; +class POSIXObject; + +using BucketCache = file::listing::BucketCache; + +class POSIXDriver : public FilterDriver { +private: + + std::unique_ptr bucket_cache; + std::string base_path; + int root_fd; + +public: + POSIXDriver(Driver* _next) : FilterDriver(_next) + { } + virtual ~POSIXDriver() { close(); } + virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp) override; + virtual std::unique_ptr get_user(const rgw_user& u) override; + virtual int get_user_by_access_key(const DoutPrefixProvider* dpp, const + std::string& key, optional_yield y, + std::unique_ptr* user) override; + virtual int get_user_by_email(const DoutPrefixProvider* dpp, const + std::string& email, optional_yield y, + std::unique_ptr* user) override; + virtual int get_user_by_swift(const DoutPrefixProvider* dpp, const + std::string& user_str, optional_yield y, + std::unique_ptr* user) override; + virtual std::unique_ptr get_object(const rgw_obj_key& k) override; + virtual int get_bucket(User* u, const RGWBucketInfo& i, + std::unique_ptr* bucket) override; + virtual int get_bucket(const DoutPrefixProvider* dpp, User* u, const + rgw_bucket& b, std::unique_ptr* bucket, + optional_yield y) override; + virtual int get_bucket(const DoutPrefixProvider* dpp, User* u, const + std::string& tenant, const std::string& name, + std::unique_ptr* bucket, optional_yield y) override; + virtual std::string zone_unique_trans_id(const uint64_t unique_num) override; + + virtual std::unique_ptr get_append_writer(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* _head_obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + const std::string& unique_tag, + uint64_t position, + uint64_t *cur_accounted_size) override; + virtual 
std::unique_ptr get_atomic_writer(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* _head_obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + uint64_t olh_epoch, + const std::string& unique_tag) override; + + virtual void finalize(void) override; + virtual void register_admin_apis(RGWRESTMgr* mgr) override; + + virtual std::unique_ptr get_notification(rgw::sal::Object* obj, + rgw::sal::Object* src_obj, struct req_state* s, + rgw::notify::EventType event_type, optional_yield y, + const std::string* object_name=nullptr) override; + + virtual std::unique_ptr get_notification( + const DoutPrefixProvider* dpp, + rgw::sal::Object* obj, + rgw::sal::Object* src_obj, + rgw::notify::EventType event_type, + rgw::sal::Bucket* _bucket, + std::string& _user_id, + std::string& _user_tenant, + std::string& _req_id, + optional_yield y) override; + + /* Internal APIs */ + int get_root_fd() { return root_fd; } + const std::string& get_base_path() const { return base_path; } + BucketCache* get_bucket_cache() { return bucket_cache.get(); } + + int close(); + + /* called by BucketCache layer when a new object is discovered + * by inotify or similar */ + int mint_listing_entry( + const std::string& bucket, rgw_bucket_dir_entry& bde /* OUT */); + +}; + +class POSIXUser : public FilterUser { +private: + POSIXDriver* driver; + +public: + POSIXUser(std::unique_ptr _next, POSIXDriver* _driver) : + FilterUser(std::move(_next)), + driver(_driver) {} + virtual ~POSIXUser() = default; + + virtual int list_buckets(const DoutPrefixProvider* dpp, + const std::string& marker, const std::string& end_marker, + uint64_t max, bool need_stats, BucketList& buckets, + optional_yield y) override; + virtual int create_bucket(const DoutPrefixProvider* dpp, + const rgw_bucket& b, + const std::string& zonegroup_id, + rgw_placement_rule& placement_rule, + std::string& swift_ver_location, + const RGWQuotaInfo* pquota_info, + const RGWAccessControlPolicy& policy, + 
Attrs& attrs, + RGWBucketInfo& info, + obj_version& ep_objv, + bool exclusive, + bool obj_lock_enabled, + bool* existed, + req_info& req_info, + std::unique_ptr* bucket, + optional_yield y) override; + virtual Attrs& get_attrs() override { return next->get_attrs(); } + virtual void set_attrs(Attrs& _attrs) override { next->set_attrs(_attrs); } + virtual int read_attrs(const DoutPrefixProvider* dpp, optional_yield y) override; + virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& + new_attrs, optional_yield y) override; + virtual int load_user(const DoutPrefixProvider* dpp, optional_yield y) override; + virtual int store_user(const DoutPrefixProvider* dpp, optional_yield y, bool + exclusive, RGWUserInfo* old_info = nullptr) override; + virtual int remove_user(const DoutPrefixProvider* dpp, optional_yield y) override; +}; + +class POSIXBucket : public StoreBucket { +private: + POSIXDriver* driver; + int parent_fd{-1}; + int dir_fd{-1}; + struct statx stx; + bool stat_done{false}; + RGWAccessControlPolicy acls; + std::optional ns; + +public: + POSIXBucket(POSIXDriver *_dr, int _p_fd, const rgw_bucket& _b, User* _u, std::optional _ns = std::nullopt) + : StoreBucket(_b, _u), + driver(_dr), + parent_fd(_p_fd), + acls(), + ns(_ns) + { } + + POSIXBucket(POSIXDriver *_dr, int _p_fd, const RGWBucketEnt& _e, User* _u) + : StoreBucket(_e, _u), + driver(_dr), + parent_fd(_p_fd), + acls() + { } + + POSIXBucket(POSIXDriver *_dr, int _p_fd, const RGWBucketInfo& _i, User* _u) + : StoreBucket(_i, _u), + driver(_dr), + parent_fd(_p_fd), + acls() + { } + + POSIXBucket(const POSIXBucket& _b) : + StoreBucket(_b), + driver(_b.driver), + parent_fd(_b.parent_fd), + /* Don't want to copy dir_fd */ + stx(_b.stx), + stat_done(_b.stat_done), + acls(_b.acls), + ns(_b.ns) {} + + virtual ~POSIXBucket() { close(); } + + virtual std::unique_ptr get_object(const rgw_obj_key& key) override; + virtual int list(const DoutPrefixProvider* dpp, ListParams&, int, + ListResults&, 
optional_yield y) override; + virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, + Attrs& new_attrs, optional_yield y) override; + virtual int remove_bucket(const DoutPrefixProvider* dpp, bool delete_children, + bool forward_to_master, req_info* req_info, + optional_yield y) override; + virtual int remove_bucket_bypass_gc(int concurrent_max, + bool keep_index_consistent, + optional_yield y, + const DoutPrefixProvider *dpp) override; + virtual int load_bucket(const DoutPrefixProvider* dpp, optional_yield y, + bool get_stats = false) override; + virtual RGWAccessControlPolicy& get_acl(void) override { return acls; } + virtual int set_acl(const DoutPrefixProvider* dpp, RGWAccessControlPolicy& acl, + optional_yield y) override; + virtual int read_stats(const DoutPrefixProvider *dpp, + const bucket_index_layout_generation& idx_layout, + int shard_id, std::string* bucket_ver, std::string* master_ver, + std::map& stats, + std::string* max_marker = nullptr, + bool* syncstopped = nullptr) override; + virtual int read_stats_async(const DoutPrefixProvider *dpp, + const bucket_index_layout_generation& idx_layout, + int shard_id, RGWGetBucketStats_CB* ctx) override; + virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) override; + virtual int update_container_stats(const DoutPrefixProvider* dpp, optional_yield y) override; + virtual int check_bucket_shards(const DoutPrefixProvider* dpp, optional_yield y) override; + virtual int chown(const DoutPrefixProvider* dpp, User& new_user, optional_yield y) override; + virtual int put_info(const DoutPrefixProvider* dpp, bool exclusive, + ceph::real_time mtime, optional_yield y) override; + virtual int check_empty(const DoutPrefixProvider* dpp, optional_yield y) override; + virtual int check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) override; + virtual int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* 
pmtime, optional_yield y) override; + virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, + uint64_t end_epoch, uint32_t max_entries, bool* is_truncated, + RGWUsageIter& usage_iter, std::map& usage) override; + virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) override; + virtual int remove_objs_from_index(const DoutPrefixProvider *dpp, std::list& objs_to_unlink) override; + virtual int check_index(const DoutPrefixProvider *dpp, std::map& existing_stats, std::map& calculated_stats) override; + virtual int rebuild_index(const DoutPrefixProvider *dpp) override; + virtual int set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout) override; + virtual int purge_instance(const DoutPrefixProvider* dpp, optional_yield y) override; + + virtual std::unique_ptr clone() override { + return std::make_unique(*this); + } + + virtual std::unique_ptr get_multipart_upload( + const std::string& oid, + std::optional upload_id=std::nullopt, + ACLOwner owner={}, ceph::real_time mtime=real_clock::now()) override; + virtual int list_multiparts(const DoutPrefixProvider *dpp, + const std::string& prefix, + std::string& marker, + const std::string& delim, + const int& max_uploads, + std::vector>& uploads, + std::map *common_prefixes, + bool *is_truncated, optional_yield y) override; + virtual int abort_multiparts(const DoutPrefixProvider* dpp, + CephContext* cct, optional_yield y) override; + + /* Internal APIs */ + int create(const DoutPrefixProvider *dpp, optional_yield y, bool* existed); + void set_stat(struct statx _stx) { stx = _stx; stat_done = true; } + int get_dir_fd(const DoutPrefixProvider *dpp) { open(dpp); return dir_fd; } + /* TODO dang Escape the bucket name for file use */ + std::string get_fname(); + int get_shadow_bucket(const DoutPrefixProvider* dpp, optional_yield y, + const std::string& ns, const std::string& tenant, + const std::string& name, bool create, + std::unique_ptr* 
shadow); + template + int for_each(const DoutPrefixProvider* dpp, const F& func); + int open(const DoutPrefixProvider *dpp); + int close(); + int rename(const DoutPrefixProvider* dpp, optional_yield y, Object* target_obj); + int copy(const DoutPrefixProvider *dpp, optional_yield y, POSIXBucket* db, POSIXObject* dobj); + + /* integration w/bucket listing cache */ + using fill_cache_cb_t = file::listing::fill_cache_cb_t; + + /* enumerate all entries by callback, in any order */ + int fill_cache(const DoutPrefixProvider* dpp, optional_yield y, fill_cache_cb_t cb); + +private: + int stat(const DoutPrefixProvider *dpp); + int write_attrs(const DoutPrefixProvider *dpp, optional_yield y); +}; /* POSIXBucket */ + +class POSIXObject : public StoreObject { +private: + POSIXDriver* driver; + RGWAccessControlPolicy acls; + int obj_fd{-1}; + struct statx stx; + bool stat_done{false}; + std::unique_ptr shadow; + std::string temp_fname; + std::map parts; + +public: + struct POSIXReadOp : ReadOp { + POSIXObject* source; + + POSIXReadOp(POSIXObject* _source) : + source(_source) {} + virtual ~POSIXReadOp() = default; + + virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp) override; + virtual int read(int64_t ofs, int64_t left, bufferlist& bl, optional_yield y, + const DoutPrefixProvider* dpp) override; + virtual int iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, + RGWGetDataCB* cb, optional_yield y) override; + virtual int get_attr(const DoutPrefixProvider* dpp, const char* name, + bufferlist& dest, optional_yield y) override; + }; + + struct POSIXDeleteOp : DeleteOp { + POSIXObject* source; + + POSIXDeleteOp(POSIXObject* _source) : + source(_source) {} + virtual ~POSIXDeleteOp() = default; + + virtual int delete_obj(const DoutPrefixProvider* dpp, optional_yield y) override; + }; + + POSIXObject(POSIXDriver *_dr, const rgw_obj_key& _k) + : StoreObject(_k), + driver(_dr), + acls() {} + + POSIXObject(POSIXDriver* _driver, const rgw_obj_key& _k, 
Bucket* _b) : + StoreObject(_k, _b), + driver(_driver), + acls() {} + + POSIXObject(const POSIXObject& _o) : + StoreObject(_o), + driver(_o.driver) {} + + virtual ~POSIXObject() { close(); } + + virtual int delete_object(const DoutPrefixProvider* dpp, + optional_yield y, + bool prevent_versioning = false) override; + virtual int copy_object(User* user, + req_info* info, const rgw_zone_id& source_zone, + rgw::sal::Object* dest_object, rgw::sal::Bucket* dest_bucket, + rgw::sal::Bucket* src_bucket, + const rgw_placement_rule& dest_placement, + ceph::real_time* src_mtime, ceph::real_time* mtime, + const ceph::real_time* mod_ptr, const ceph::real_time* unmod_ptr, + bool high_precision_time, + const char* if_match, const char* if_nomatch, + AttrsMod attrs_mod, bool copy_if_newer, Attrs& attrs, + RGWObjCategory category, uint64_t olh_epoch, + boost::optional delete_at, + std::string* version_id, std::string* tag, std::string* etag, + void (*progress_cb)(off_t, void *), void* progress_data, + const DoutPrefixProvider* dpp, optional_yield y) override; + virtual RGWAccessControlPolicy& get_acl(void) override { return acls; } + virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; } + virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override; + virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, + Attrs* delattrs, optional_yield y) override; + virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, + rgw_obj* target_obj = NULL) override; + virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, + optional_yield y, const DoutPrefixProvider* dpp) override; + virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, + optional_yield y) override; + virtual bool is_expired() override; + virtual void gen_rand_obj_instance_name() override; + virtual std::unique_ptr get_serializer(const 
DoutPrefixProvider *dpp, + const std::string& lock_name) override; + virtual int transition(Bucket* bucket, + const rgw_placement_rule& placement_rule, + const real_time& mtime, + uint64_t olh_epoch, + const DoutPrefixProvider* dpp, + optional_yield y) override; + virtual int transition_to_cloud(Bucket* bucket, + rgw::sal::PlacementTier* tier, + rgw_bucket_dir_entry& o, + std::set& cloud_targets, + CephContext* cct, + bool update_object, + const DoutPrefixProvider* dpp, + optional_yield y) override; + virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override; + virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override; + virtual int swift_versioning_restore(bool& restored, + const DoutPrefixProvider* dpp, optional_yield y) override; + virtual int swift_versioning_copy(const DoutPrefixProvider* dpp, + optional_yield y) override; + virtual std::unique_ptr get_read_op() override; + virtual std::unique_ptr get_delete_op() override; + virtual int omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid, + const std::set& keys, + Attrs* vals) override; + virtual int omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, + bufferlist& val, bool must_exist, optional_yield y) override; + virtual int chown(User& new_user, const DoutPrefixProvider* dpp, optional_yield y) override; + virtual std::unique_ptr clone() override { + return std::unique_ptr(new POSIXObject(*this)); + } + + int open(const DoutPrefixProvider *dpp, bool create, bool temp_file = false); + int close(); + int write(int64_t ofs, bufferlist& bl, const DoutPrefixProvider* dpp, optional_yield y); + int write_attr(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& value); + int link_temp_file(const DoutPrefixProvider* dpp, optional_yield y); + void gen_temp_fname(); + /* TODO dang Escape the object name for file use */ + const std::string get_fname(); + bool 
exists(const DoutPrefixProvider* dpp) { stat(dpp); return state.exists; } + int get_owner(const DoutPrefixProvider *dpp, optional_yield y, std::unique_ptr *owner); + int copy(const DoutPrefixProvider *dpp, optional_yield y, POSIXBucket *sb, + POSIXBucket *db, POSIXObject *dobj); + int fill_bde(const DoutPrefixProvider *dpp, optional_yield y, rgw_bucket_dir_entry &bde); + +protected: + int read(int64_t ofs, int64_t end, bufferlist& bl, const DoutPrefixProvider* dpp, optional_yield y); + int generate_attrs(const DoutPrefixProvider* dpp, optional_yield y); + int get_fd() { return obj_fd; }; +private: + const std::string get_temp_fname(); + int stat(const DoutPrefixProvider *dpp); + int generate_mp_etag(const DoutPrefixProvider* dpp, optional_yield y); + int generate_etag(const DoutPrefixProvider* dpp, optional_yield y); +}; + +struct POSIXMPObj { + std::string oid; + std::string upload_id; + ACLOwner owner; + multipart_upload_info upload_info; + std::string meta; + + POSIXMPObj(POSIXDriver* driver, const std::string& _oid, + std::optional _upload_id, ACLOwner& _owner) { + if (_upload_id && !_upload_id->empty()) { + init(_oid, *_upload_id, _owner); + } else if (!from_meta(_oid, _owner)) { + init_gen(driver, _oid, _owner); + } + } + void init(const std::string& _oid, const std::string& _upload_id, ACLOwner& _owner) { + if (_oid.empty()) { + clear(); + return; + } + oid = _oid; + upload_id = _upload_id; + owner = _owner; + meta = oid; + if (!upload_id.empty()) + meta += "." + upload_id; + } + void init_gen(POSIXDriver* driver, const std::string& _oid, ACLOwner& _owner); + bool from_meta(const std::string& meta, ACLOwner& _owner) { + int end_pos = meta.length(); + int mid_pos = meta.rfind('.', end_pos - 1); // . 
+ if (mid_pos < 0) + return false; + oid = meta.substr(0, mid_pos); + upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1); + init(oid, upload_id, _owner); + return true; + } + void clear() { + oid = ""; + meta = ""; + upload_id = ""; + } + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(oid, bl); + encode(upload_id, bl); + encode(owner, bl); + encode(upload_info, bl); + encode(meta, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(oid, bl); + decode(upload_id, bl); + decode(owner, bl); + decode(upload_info, bl); + decode(meta, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(POSIXMPObj) + +struct POSIXUploadPartInfo { + uint32_t num{0}; + std::string etag; + ceph::real_time mtime; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(num, bl); + encode(etag, bl); + encode(mtime, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(num, bl); + decode(etag, bl); + decode(mtime, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(POSIXUploadPartInfo) + +class POSIXMultipartUpload; + +class POSIXMultipartPart : public StoreMultipartPart { +protected: + POSIXUploadPartInfo info; + POSIXMultipartUpload* upload; + std::unique_ptr shadow; + +public: + POSIXMultipartPart(POSIXMultipartUpload* _upload) : + upload(_upload) {} + virtual ~POSIXMultipartPart() = default; + + virtual uint32_t get_num() { return info.num; } + virtual uint64_t get_size() { return shadow->get_obj_size(); } + virtual const std::string& get_etag() { return info.etag; } + virtual ceph::real_time& get_mtime() { return info.mtime; } + + int load(const DoutPrefixProvider* dpp, optional_yield y, POSIXDriver* driver, rgw_obj_key& key); + + friend class POSIXMultipartUpload; +}; + +class POSIXMultipartUpload : public StoreMultipartUpload { +protected: + POSIXDriver* driver; + POSIXMPObj mp_obj; + ceph::real_time mtime; + 
std::unique_ptr shadow; + +public: + POSIXMultipartUpload(POSIXDriver* _driver, Bucket* _bucket, const std::string& _oid, + std::optional _upload_id, ACLOwner _owner, + ceph::real_time _mtime) : + StoreMultipartUpload(_bucket), driver(_driver), + mp_obj(driver, _oid, _upload_id, _owner), mtime(_mtime) {} + virtual ~POSIXMultipartUpload() = default; + + virtual const std::string& get_meta() const override { return mp_obj.meta; } + virtual const std::string& get_key() const override { return mp_obj.oid; } + virtual const std::string& get_upload_id() const override { return mp_obj.upload_id; } + virtual const ACLOwner& get_owner() const override { return mp_obj.owner; } + virtual ceph::real_time& get_mtime() override { return mtime; } + virtual std::unique_ptr get_meta_obj() override; + + virtual int init(const DoutPrefixProvider* dpp, optional_yield y, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) override; + virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct, + int num_parts, int marker, + int* next_marker, bool* truncated, optional_yield y, + bool assume_unsorted = false) override; + virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, optional_yield y) override; + virtual int complete(const DoutPrefixProvider* dpp, + optional_yield y, CephContext* cct, + std::map& part_etags, + std::list& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& ofs, + std::string& tag, ACLOwner& owner, + uint64_t olh_epoch, + rgw::sal::Object* target_obj) override; + virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, + rgw_placement_rule** rule, rgw::sal::Attrs* attrs) override; + + virtual std::unique_ptr get_writer(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* _head_obj, + const rgw_user& owner, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) override; + + POSIXBucket* 
get_shadow() { return shadow.get(); } +private: + int load(bool create=false); +}; + +class POSIXAtomicWriter : public StoreWriter { +private: + POSIXDriver* driver; + const rgw_user& owner; + const rgw_placement_rule *ptail_placement_rule; + uint64_t olh_epoch; + const std::string& unique_tag; + POSIXObject obj; + +public: + POSIXAtomicWriter(const DoutPrefixProvider *dpp, + optional_yield y, + rgw::sal::Object* _head_obj, + POSIXDriver* _driver, + const rgw_user& _owner, + const rgw_placement_rule *_ptail_placement_rule, + uint64_t _olh_epoch, + const std::string& _unique_tag) : + StoreWriter(dpp, y), + driver(_driver), + owner(_owner), + ptail_placement_rule(_ptail_placement_rule), + olh_epoch(_olh_epoch), + unique_tag(_unique_tag), + obj(_driver, _head_obj->get_key(), _head_obj->get_bucket()) {} + virtual ~POSIXAtomicWriter() = default; + + virtual int prepare(optional_yield y) override; + virtual int process(bufferlist&& data, uint64_t offset) override; + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) override; +}; + +class POSIXMultipartWriter : public StoreWriter { +private: + POSIXDriver* driver; + const rgw_user& owner; + const rgw_placement_rule *ptail_placement_rule; + uint64_t part_num; + std::unique_ptr shadow_bucket; + std::unique_ptr obj; + +public: + POSIXMultipartWriter(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _shadow_bucket, + rgw_obj_key& _key, + POSIXDriver* _driver, + const rgw_user& _owner, + const rgw_placement_rule *_ptail_placement_rule, + uint64_t _part_num) : + StoreWriter(dpp, y), + driver(_driver), + owner(_owner), + ptail_placement_rule(_ptail_placement_rule), + part_num(_part_num), + shadow_bucket(std::move(_shadow_bucket)), + 
obj(std::make_unique(_driver, _key, shadow_bucket.get())) {} + virtual ~POSIXMultipartWriter() = default; + + virtual int prepare(optional_yield y) override; + virtual int process(bufferlist&& data, uint64_t offset) override; + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) override; + +}; + +class MPPOSIXSerializer : public StoreMPSerializer { + POSIXObject* obj; + +public: + MPPOSIXSerializer(const DoutPrefixProvider *dpp, POSIXDriver* driver, POSIXObject* _obj, const std::string& lock_name) : obj(_obj) {} + + virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override; + virtual int unlock() override { return 0; } +}; + +} } // namespace rgw::sal diff --git a/src/rgw/driver/posix/unordered_dense.h b/src/rgw/driver/posix/unordered_dense.h new file mode 100644 index 00000000000..faad051d18f --- /dev/null +++ b/src/rgw/driver/posix/unordered_dense.h @@ -0,0 +1,1584 @@ +///////////////////////// ankerl::unordered_dense::{map, set} ///////////////////////// + +// A fast & densely stored hashmap and hashset based on robin-hood backward shift deletion. +// Version 3.1.0 +// https://github.com/martinus/unordered_dense +// +// Licensed under the MIT License . 
+// SPDX-License-Identifier: MIT +// Copyright (c) 2022-2023 Martin Leitner-Ankerl +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +#ifndef ANKERL_UNORDERED_DENSE_H +#define ANKERL_UNORDERED_DENSE_H + +// see https://semver.org/spec/v2.0.0.html +#define ANKERL_UNORDERED_DENSE_VERSION_MAJOR 3 // NOLINT(cppcoreguidelines-macro-usage) incompatible API changes +#define ANKERL_UNORDERED_DENSE_VERSION_MINOR 1 // NOLINT(cppcoreguidelines-macro-usage) backwards compatible functionality +#define ANKERL_UNORDERED_DENSE_VERSION_PATCH 0 // NOLINT(cppcoreguidelines-macro-usage) backwards compatible bug fixes + +// API versioning with inline namespace, see https://www.foonathan.net/2018/11/inline-namespaces/ +#define ANKERL_UNORDERED_DENSE_VERSION_CONCAT1(major, minor, patch) v##major##_##minor##_##patch +#define ANKERL_UNORDERED_DENSE_VERSION_CONCAT(major, minor, patch) ANKERL_UNORDERED_DENSE_VERSION_CONCAT1(major, minor, patch) +#define ANKERL_UNORDERED_DENSE_NAMESPACE \ + ANKERL_UNORDERED_DENSE_VERSION_CONCAT( \ + ANKERL_UNORDERED_DENSE_VERSION_MAJOR, ANKERL_UNORDERED_DENSE_VERSION_MINOR, ANKERL_UNORDERED_DENSE_VERSION_PATCH) + +#if defined(_MSVC_LANG) +# define ANKERL_UNORDERED_DENSE_CPP_VERSION _MSVC_LANG +#else +# define ANKERL_UNORDERED_DENSE_CPP_VERSION __cplusplus +#endif + +#if defined(__GNUC__) +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +# define ANKERL_UNORDERED_DENSE_PACK(decl) decl __attribute__((__packed__)) +#elif defined(_MSC_VER) +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +# define ANKERL_UNORDERED_DENSE_PACK(decl) __pragma(pack(push, 1)) decl __pragma(pack(pop)) +#endif + +// exceptions +#if defined(__cpp_exceptions) || defined(__EXCEPTIONS) || defined(_CPPUNWIND) +# define ANKERL_UNORDERED_DENSE_HAS_EXCEPTIONS() 1 +#else +# define ANKERL_UNORDERED_DENSE_HAS_EXCEPTIONS() 0 +#endif +#ifdef _MSC_VER +# define ANKERL_UNORDERED_DENSE_NOINLINE __declspec(noinline) +#else +# define ANKERL_UNORDERED_DENSE_NOINLINE __attribute__((noinline)) +#endif + +#if ANKERL_UNORDERED_DENSE_CPP_VERSION < 201703L +# error ankerl::unordered_dense requires C++17 or higher +#else +# include // for 
array +# include // for uint64_t, uint32_t, uint8_t, UINT64_C +# include // for size_t, memcpy, memset +# include // for equal_to, hash +# include // for initializer_list +# include // for pair, distance +# include // for numeric_limits +# include // for allocator, allocator_traits, shared_ptr +# include // for out_of_range +# include // for basic_string +# include // for basic_string_view, hash +# include // for forward_as_tuple +# include // for enable_if_t, declval, conditional_t, ena... +# include // for forward, exchange, pair, as_const, piece... +# include // for vector +# if ANKERL_UNORDERED_DENSE_HAS_EXCEPTIONS() == 0 +# include // for abort +# endif + +# define ANKERL_UNORDERED_DENSE_PMR 0 // NOLINT(cppcoreguidelines-macro-usage) +# if defined(__has_include) +# if __has_include() +# undef ANKERL_UNORDERED_DENSE_PMR +# define ANKERL_UNORDERED_DENSE_PMR 1 // NOLINT(cppcoreguidelines-macro-usage) +# define ANKERL_UNORDERED_DENSE_PMR_ALLOCATOR \ + std::pmr::polymorphic_allocator // NOLINT(cppcoreguidelines-macro-usage) +# include // for polymorphic_allocator +# elif __has_include() +# undef ANKERL_UNORDERED_DENSE_PMR +# define ANKERL_UNORDERED_DENSE_PMR 1 // NOLINT(cppcoreguidelines-macro-usage) +# define ANKERL_UNORDERED_DENSE_PMR_ALLOCATOR \ + std::experimental::pmr::polymorphic_allocator // NOLINT(cppcoreguidelines-macro-usage) +# include // for polymorphic_allocator +# endif +# endif + +# if defined(_MSC_VER) && defined(_M_X64) +# include +# pragma intrinsic(_umul128) +# endif + +# if defined(__GNUC__) || defined(__INTEL_COMPILER) || defined(__clang__) +# define ANKERL_UNORDERED_DENSE_LIKELY(x) __builtin_expect(x, 1) // NOLINT(cppcoreguidelines-macro-usage) +# define ANKERL_UNORDERED_DENSE_UNLIKELY(x) __builtin_expect(x, 0) // NOLINT(cppcoreguidelines-macro-usage) +# else +# define ANKERL_UNORDERED_DENSE_LIKELY(x) (x) // NOLINT(cppcoreguidelines-macro-usage) +# define ANKERL_UNORDERED_DENSE_UNLIKELY(x) (x) // NOLINT(cppcoreguidelines-macro-usage) +# endif 
+ +namespace ankerl::unordered_dense { +inline namespace ANKERL_UNORDERED_DENSE_NAMESPACE { + +namespace detail { + +# if ANKERL_UNORDERED_DENSE_HAS_EXCEPTIONS() + +// make sure this is not inlined as it is slow and dramatically enlarges code, thus making other +// inlinings more difficult. Throws are also generally the slow path. +[[noreturn]] inline ANKERL_UNORDERED_DENSE_NOINLINE void on_error_key_not_found() { + throw std::out_of_range("ankerl::unordered_dense::map::at(): key not found"); +} +[[noreturn]] inline ANKERL_UNORDERED_DENSE_NOINLINE void on_error_bucket_overflow() { + throw std::overflow_error("ankerl::unordered_dense: reached max bucket size, cannot increase size"); +} +[[noreturn]] inline ANKERL_UNORDERED_DENSE_NOINLINE void on_error_too_many_elements() { + throw std::out_of_range("ankerl::unordered_dense::map::replace(): too many elements"); +} + +# else + +[[noreturn]] inline void on_error_key_not_found() { + abort(); +} +[[noreturn]] inline void on_error_bucket_overflow() { + abort(); +} +[[noreturn]] inline void on_error_too_many_elements() { + abort(); +} + +# endif + +} // namespace detail + +// hash /////////////////////////////////////////////////////////////////////// + +// This is a stripped-down implementation of wyhash: https://github.com/wangyi-fudan/wyhash +// No big-endian support (because different values on different machines don't matter), +// hardcodes seed and the secret, reformattes the code, and clang-tidy fixes. 
+namespace detail::wyhash { + +static inline void mum(uint64_t* a, uint64_t* b) { +# if defined(__SIZEOF_INT128__) + __uint128_t r = *a; + r *= *b; + *a = static_cast(r); + *b = static_cast(r >> 64U); +# elif defined(_MSC_VER) && defined(_M_X64) + *a = _umul128(*a, *b, b); +# else + uint64_t ha = *a >> 32U; + uint64_t hb = *b >> 32U; + uint64_t la = static_cast(*a); + uint64_t lb = static_cast(*b); + uint64_t hi{}; + uint64_t lo{}; + uint64_t rh = ha * hb; + uint64_t rm0 = ha * lb; + uint64_t rm1 = hb * la; + uint64_t rl = la * lb; + uint64_t t = rl + (rm0 << 32U); + auto c = static_cast(t < rl); + lo = t + (rm1 << 32U); + c += static_cast(lo < t); + hi = rh + (rm0 >> 32U) + (rm1 >> 32U) + c; + *a = lo; + *b = hi; +# endif +} + +// multiply and xor mix function, aka MUM +[[nodiscard]] static inline auto mix(uint64_t a, uint64_t b) -> uint64_t { + mum(&a, &b); + return a ^ b; +} + +// read functions. WARNING: we don't care about endianness, so results are different on big endian! +[[nodiscard]] static inline auto r8(const uint8_t* p) -> uint64_t { + uint64_t v{}; + std::memcpy(&v, p, 8U); + return v; +} + +[[nodiscard]] static inline auto r4(const uint8_t* p) -> uint64_t { + uint32_t v{}; + std::memcpy(&v, p, 4); + return v; +} + +// reads 1, 2, or 3 bytes +[[nodiscard]] static inline auto r3(const uint8_t* p, size_t k) -> uint64_t { + return (static_cast(p[0]) << 16U) | (static_cast(p[k >> 1U]) << 8U) | p[k - 1]; +} + +[[maybe_unused]] [[nodiscard]] static inline auto hash(void const* key, size_t len) -> uint64_t { + static constexpr auto secret = std::array{UINT64_C(0xa0761d6478bd642f), + UINT64_C(0xe7037ed1a0b428db), + UINT64_C(0x8ebc6af09c88c6e3), + UINT64_C(0x589965cc75374cc3)}; + + auto const* p = static_cast(key); + uint64_t seed = secret[0]; + uint64_t a{}; + uint64_t b{}; + if (ANKERL_UNORDERED_DENSE_LIKELY(len <= 16)) { + if (ANKERL_UNORDERED_DENSE_LIKELY(len >= 4)) { + a = (r4(p) << 32U) | r4(p + ((len >> 3U) << 2U)); + b = (r4(p + len - 4) << 32U) | r4(p 
+ len - 4 - ((len >> 3U) << 2U)); + } else if (ANKERL_UNORDERED_DENSE_LIKELY(len > 0)) { + a = r3(p, len); + b = 0; + } else { + a = 0; + b = 0; + } + } else { + size_t i = len; + if (ANKERL_UNORDERED_DENSE_UNLIKELY(i > 48)) { + uint64_t see1 = seed; + uint64_t see2 = seed; + do { + seed = mix(r8(p) ^ secret[1], r8(p + 8) ^ seed); + see1 = mix(r8(p + 16) ^ secret[2], r8(p + 24) ^ see1); + see2 = mix(r8(p + 32) ^ secret[3], r8(p + 40) ^ see2); + p += 48; + i -= 48; + } while (ANKERL_UNORDERED_DENSE_LIKELY(i > 48)); + seed ^= see1 ^ see2; + } + while (ANKERL_UNORDERED_DENSE_UNLIKELY(i > 16)) { + seed = mix(r8(p) ^ secret[1], r8(p + 8) ^ seed); + i -= 16; + p += 16; + } + a = r8(p + i - 16); + b = r8(p + i - 8); + } + + return mix(secret[1] ^ len, mix(a ^ secret[1], b ^ seed)); +} + +[[nodiscard]] static inline auto hash(uint64_t x) -> uint64_t { + return detail::wyhash::mix(x, UINT64_C(0x9E3779B97F4A7C15)); +} + +} // namespace detail::wyhash + +template +struct hash { + auto operator()(T const& obj) const noexcept(noexcept(std::declval>().operator()(std::declval()))) + -> uint64_t { + return std::hash{}(obj); + } +}; + +template +struct hash> { + using is_avalanching = void; + auto operator()(std::basic_string const& str) const noexcept -> uint64_t { + return detail::wyhash::hash(str.data(), sizeof(CharT) * str.size()); + } +}; + +template +struct hash> { + using is_avalanching = void; + auto operator()(std::basic_string_view const& sv) const noexcept -> uint64_t { + return detail::wyhash::hash(sv.data(), sizeof(CharT) * sv.size()); + } +}; + +template +struct hash { + using is_avalanching = void; + auto operator()(T* ptr) const noexcept -> uint64_t { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + return detail::wyhash::hash(reinterpret_cast(ptr)); + } +}; + +template +struct hash> { + using is_avalanching = void; + auto operator()(std::unique_ptr const& ptr) const noexcept -> uint64_t { + // 
NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + return detail::wyhash::hash(reinterpret_cast(ptr.get())); + } +}; + +template +struct hash> { + using is_avalanching = void; + auto operator()(std::shared_ptr const& ptr) const noexcept -> uint64_t { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + return detail::wyhash::hash(reinterpret_cast(ptr.get())); + } +}; + +template +struct hash::value>::type> { + using is_avalanching = void; + auto operator()(Enum e) const noexcept -> uint64_t { + using underlying = typename std::underlying_type_t; + return detail::wyhash::hash(static_cast(e)); + } +}; + +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +# define ANKERL_UNORDERED_DENSE_HASH_STATICCAST(T) \ + template <> \ + struct hash { \ + using is_avalanching = void; \ + auto operator()(T const& obj) const noexcept -> uint64_t { \ + return detail::wyhash::hash(static_cast(obj)); \ + } \ + } + +# if defined(__GNUC__) && !defined(__clang__) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wuseless-cast" +# endif +// see https://en.cppreference.com/w/cpp/utility/hash +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(bool); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(char); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(signed char); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(unsigned char); +# if ANKERL_UNORDERED_DENSE_CPP_VERSION >= 202002L +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(char8_t); +# endif +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(char16_t); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(char32_t); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(wchar_t); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(short); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(unsigned short); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(int); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(unsigned int); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(long); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(long long); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(unsigned long); +ANKERL_UNORDERED_DENSE_HASH_STATICCAST(unsigned 
long long); + +# if defined(__GNUC__) && !defined(__clang__) +# pragma GCC diagnostic pop +# endif + +// bucket_type ////////////////////////////////////////////////////////// + +namespace bucket_type { + +struct standard { + static constexpr uint32_t dist_inc = 1U << 8U; // skip 1 byte fingerprint + static constexpr uint32_t fingerprint_mask = dist_inc - 1; // mask for 1 byte of fingerprint + + uint32_t m_dist_and_fingerprint; // upper 3 byte: distance to original bucket. lower byte: fingerprint from hash + uint32_t m_value_idx; // index into the m_values vector. +}; + +ANKERL_UNORDERED_DENSE_PACK(struct big { + static constexpr uint32_t dist_inc = 1U << 8U; // skip 1 byte fingerprint + static constexpr uint32_t fingerprint_mask = dist_inc - 1; // mask for 1 byte of fingerprint + + uint32_t m_dist_and_fingerprint; // upper 3 byte: distance to original bucket. lower byte: fingerprint from hash + size_t m_value_idx; // index into the m_values vector. +}); + +} // namespace bucket_type + +namespace detail { + +struct nonesuch {}; + +template class Op, class... Args> +struct detector { + using value_t = std::false_type; + using type = Default; +}; + +template class Op, class... Args> +struct detector>, Op, Args...> { + using value_t = std::true_type; + using type = Op; +}; + +template