From: Harsimran Singh Date: Tue, 2 Sep 2025 14:24:57 +0000 (+0530) Subject: rgw: Quota tracking integration and testing X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a27aed3f5bb8feec7efa195733c7874ee26b4fdc;p=ceph-ci.git rgw: Quota tracking integration and testing This squashes: - Quota Tracking Changes - Fixing issues in integration and Testing Signed-off-by: Harsimran Singh --- diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 7b44a9587e0..11b66d1e255 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -4520,6 +4520,7 @@ options: default: false services: - rgw + - name: rgw_ratelimit_interval type: uint level: advanced @@ -4534,6 +4535,7 @@ options: flags: - startup with_legacy: true + - name: rgw_redis_connection_pool_size type: int level: basic @@ -4547,6 +4549,7 @@ options: services: - rgw with_legacy: true + - name: rgw_bucket_persistent_notif_num_shards type: uint level: advanced @@ -4572,3 +4575,43 @@ options: see_also: - rgw_enable_usage_log with_legacy: true + +- name: rgw_enable_usage_perf_counters + type: bool + level: advanced + desc: Enable per-user and per-bucket usage performance counters + long_desc: When enabled, RGW will track and expose usage metrics (bytes and objects) + for users and buckets through the Ceph performance counter framework + default: true + services: + - rgw + with_legacy: true + +- name: rgw_usage_cache_path + type: str + level: advanced + desc: Path to LMDB database file for usage statistics cache + default: /var/lib/ceph/radosgw/usage_cache.mdb + services: + - rgw + with_legacy: true + +- name: rgw_usage_cache_max_size + type: size + level: advanced + desc: Maximum size of the LMDB usage cache database + default: 1_G + services: + - rgw + with_legacy: true + +- name: rgw_usage_cache_ttl + type: uint + level: advanced + desc: Time-to-live in seconds for cached usage statistics + default: 300 + min: 60 + max: 3600 + services: + 
- rgw + with_legacy: true diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index f52dd9e5864..7b1dab92c1f 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -33,6 +33,21 @@ endfunction() find_package(ICU 52.0 COMPONENTS uc REQUIRED) +find_package(PkgConfig QUIET) +if(PkgConfig_FOUND) + pkg_check_modules(LMDB QUIET lmdb) +endif() + +if(NOT LMDB_FOUND) + find_path(LMDB_INCLUDE_DIR NAMES lmdb.h) + find_library(LMDB_LIBRARIES NAMES lmdb) + if(LMDB_INCLUDE_DIR AND LMDB_LIBRARIES) + set(LMDB_FOUND TRUE) + else() + message(FATAL_ERROR "LMDB not found. Please install liblmdb-dev or lmdb-devel") + endif() +endif() + set(librgw_common_srcs spdk/crc64.c madler/crc64nvme.c @@ -143,7 +158,10 @@ set(librgw_common_srcs rgw_realm_watcher.cc rgw_bucket_logging.cc rgw_rest_bucket_logging.cc - rgw_bucket_sync.cc) + rgw_bucket_sync.cc + rgw_usage_cache.cc + rgw_usage_perf.cc +) list(APPEND librgw_common_srcs driver/immutable_config/store.cc @@ -315,7 +333,8 @@ target_link_libraries(rgw_common target_include_directories(rgw_common PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/services" PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw" - PUBLIC "${LUA_INCLUDE_DIR}") + PUBLIC "${LUA_INCLUDE_DIR}" + PRIVATE "${LMDB_INCLUDE_DIR}") # work around https://github.com/Cyan4973/xxHash/issues/943 for debug builds target_compile_definitions(rgw_common PUBLIC diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 3ccd2c18a04..55fa48581b1 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -59,6 +59,7 @@ #include "rgw_kmip_client_impl.h" #include "rgw_perf_counters.h" #include "rgw_signal.h" +#include "rgw_usage_perf.h" #ifdef WITH_ARROW_FLIGHT #include "rgw_flight_frontend.h" #endif @@ -269,6 +270,57 @@ int rgw::AppMain::init_storage() void rgw::AppMain::init_perfcounters() { (void) rgw_perf_start(dpp->get_cct()); + + if (g_conf()->rgw_enable_usage_perf_counters) { + // Validate and create cache directory + rgw::UsageCache::Config cache_config; + 
cache_config.db_path = g_conf()->rgw_usage_cache_path; + cache_config.max_db_size = g_conf()->rgw_usage_cache_max_size; + cache_config.ttl = std::chrono::seconds(g_conf()->rgw_usage_cache_ttl); + + // Check if cache directory exists + if (!cache_config.db_path.empty()) { + std::string db_dir = cache_config.db_path; + size_t pos = db_dir.find_last_of('/'); + if (pos != std::string::npos) { + db_dir = db_dir.substr(0, pos); + } + + struct stat st; + if (stat(db_dir.c_str(), &st) != 0) { + // Try to create directory + if (mkdir(db_dir.c_str(), 0755) == 0) { + ldpp_dout(dpp, 10) << "Created usage cache directory: " << db_dir << dendl; + } else { + ldpp_dout(dpp, 0) << "WARNING: Failed to create usage cache directory: " + << db_dir << " - " << cpp_strerror(errno) + << " (continuing without usage cache)" << dendl; + cache_config.db_path = ""; + } + } else if (!S_ISDIR(st.st_mode)) { + ldpp_dout(dpp, 0) << "WARNING: Usage cache path is not a directory: " + << db_dir << " (continuing without usage cache)" << dendl; + cache_config.db_path = ""; + } + } + + // Create and initialize usage perf counters + if (!cache_config.db_path.empty()) { + usage_perf_counters = std::make_unique( + dpp->get_cct(), cache_config); + + int r = usage_perf_counters->init(); + if (r < 0) { + ldpp_dout(dpp, 1) << "WARNING: Failed to initialize usage perf counters: " + << cpp_strerror(-r) << " (continuing without them)" << dendl; + usage_perf_counters.reset(); + } else { + usage_perf_counters->start(); + rgw::set_usage_perf_counters(usage_perf_counters.get()); + ldpp_dout(dpp, 10) << "Usage performance counters initialized successfully" << dendl; + } + } + } } /* init_perfcounters */ void rgw::AppMain::init_http_clients() @@ -669,6 +721,13 @@ void rgw::AppMain::shutdown(std::function finalize_async_signals) delete fec; } + if (usage_perf_counters) { + ldpp_dout(dpp, 10) << "Shutting down usage performance counters" << dendl; + usage_perf_counters->shutdown(); + 
rgw::set_usage_perf_counters(nullptr); + usage_perf_counters.reset(); + } + finalize_async_signals(); // callback rgw_tools_cleanup(); diff --git a/src/rgw/rgw_main.h b/src/rgw/rgw_main.h index ae151711d57..dd7e2bb528d 100644 --- a/src/rgw/rgw_main.h +++ b/src/rgw/rgw_main.h @@ -33,6 +33,7 @@ #endif #include "rgw_dmclock_scheduler_ctx.h" #include "rgw_ratelimit.h" +#include "rgw_usage_perf.h" class RGWPauser : public RGWRealmReloader::Pauser { @@ -84,6 +85,7 @@ class AppMain { std::map service_map_meta; // wow, realm reloader has a lot of parts std::unique_ptr reloader; + std::unique_ptr usage_perf_counters; #ifdef WITH_RADOSGW_RADOS std::unique_ptr pusher; #endif diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index a166d8f35c8..9a1999441b9 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -6,7 +6,7 @@ #include #include #include - +#include "rgw_usage_perf.h" #include #include @@ -62,6 +62,7 @@ #include "rgw_sal_rados.h" #endif #include "rgw_torrent.h" +#include "rgw_usage_perf.h" #include "rgw_cksum_pipe.h" #include "rgw_lua_data_filter.h" #include "rgw_lua.h" @@ -2256,6 +2257,90 @@ int RGWGetObj::handle_user_manifest(const char *prefix, optional_yield y) return r; } +void RGWOp::update_usage_stats_if_needed() { + auto* usage_counters = rgw::get_usage_perf_counters(); + if (!usage_counters) { + return; + } + + // Only update for successful operations + if (op_ret != 0) { + return; + } + + // Check if this is an operation that changes usage + bool is_put_op = (dynamic_cast(this) != nullptr) || + (dynamic_cast(this) != nullptr) || + (dynamic_cast(this) != nullptr) || + (dynamic_cast(this) != nullptr); + + bool is_delete_op = (dynamic_cast(this) != nullptr) || + (dynamic_cast(this) != nullptr); + + if (!is_put_op && !is_delete_op) { + return; // Not an operation that changes usage + } + + // Update bucket statistics if we have bucket info + if (s->bucket && usage_counters) { + try { + // Use the bucket's sync_owner_stats to get current stats + // This 
updates the bucket's internal stats + RGWBucketEnt ent; + int ret = s->bucket->sync_owner_stats(this, null_yield, &ent); + + if (ret >= 0) { + // Update bucket usage statistics using the entry data + usage_counters->update_bucket_stats( + s->bucket->get_name(), + ent.size, // Total bytes used + ent.count // Total number of objects + ); + + ldout(s->cct, 20) << "Updated bucket stats for " << s->bucket->get_name() + << ": bytes=" << ent.size + << ", objects=" << ent.count << dendl; + } else { + ldout(s->cct, 10) << "Failed to sync bucket stats for " + << s->bucket->get_name() + << ": " << cpp_strerror(-ret) << dendl; + } + } catch (const std::exception& e) { + ldout(s->cct, 5) << "Exception updating bucket stats: " << e.what() << dendl; + } + } + + // Update user statistics if we have user info + if (s->user && usage_counters) { + try { + // For user stats, we'll use the bucket owner stats as a proxy + // since there's no direct get_user_stats method + if (s->bucket) { + RGWBucketEnt ent; + int ret = s->bucket->sync_owner_stats(this, null_yield, &ent); + + if (ret >= 0) { + // This gives us at least partial user stats + // In production, you might want to aggregate across all user's buckets + usage_counters->update_user_stats( + s->user->get_id().id, + ent.size, // Using bucket size as proxy + ent.count // Using bucket object count as proxy + ); + + ldout(s->cct, 20) << "Updated user stats for " << s->user->get_id().id + << " (based on bucket " << s->bucket->get_name() << ")" + << ": bytes=" << ent.size + << ", objects=" << ent.count << dendl; + } + } + } catch (const std::exception& e) { + ldout(s->cct, 5) << "Exception updating user stats: " << e.what() << dendl; + } + } +} + + int RGWGetObj::handle_slo_manifest(bufferlist& bl, optional_yield y) { RGWSLOInfo slo_info; @@ -3998,6 +4083,91 @@ void RGWCreateBucket::execute(optional_yield y) /* continue if EEXIST and create_bucket will fail below. this way we can * recover from a partial create by retrying it. 
*/ ldpp_dout(this, 20) << "Bucket::create() returned ret=" << op_ret << " bucket=" << s->bucket << dendl; + + if (op_ret < 0 && op_ret != -EEXIST && op_ret != -ERR_BUCKET_EXISTS) + return; + + const bool existed = s->bucket_exists; + if (need_metadata_upload() && existed) { + /* OK, it looks we lost race with another request. As it's required to + * handle metadata fusion and upload, the whole operation becomes very + * similar in nature to PutMetadataBucket. However, as the attrs may + * changed in the meantime, we have to refresh. */ + short tries = 0; + do { + map battrs; + + op_ret = s->bucket->load_bucket(this, y); + if (op_ret < 0) { + return; + } else if (!s->auth.identity->is_owner_of(s->bucket->get_owner())) { + /* New bucket doesn't belong to the account we're operating on. */ + op_ret = -EEXIST; + return; + } else { + s->bucket_attrs = s->bucket->get_attrs(); + } + + createparams.attrs.clear(); + + op_ret = rgw_get_request_metadata(this, s->cct, s->info, createparams.attrs, false); + if (op_ret < 0) { + return; + } + prepare_add_del_attrs(s->bucket_attrs, rmattr_names, createparams.attrs); + populate_with_generic_attrs(s, createparams.attrs); + op_ret = filter_out_quota_info(createparams.attrs, rmattr_names, + s->bucket->get_info().quota); + if (op_ret < 0) { + return; + } + + /* Handle updates of the metadata for Swift's object versioning. */ + if (createparams.swift_ver_location) { + s->bucket->get_info().swift_ver_location = *createparams.swift_ver_location; + s->bucket->get_info().swift_versioning = !createparams.swift_ver_location->empty(); + } + + /* Web site of Swift API. */ + filter_out_website(createparams.attrs, rmattr_names, + s->bucket->get_info().website_conf); + s->bucket->get_info().has_website = !s->bucket->get_info().website_conf.is_empty(); + + /* This will also set the quota on the bucket. 
*/ + s->bucket->set_attrs(std::move(createparams.attrs)); + constexpr bool exclusive = false; // overwrite + constexpr ceph::real_time no_set_mtime{}; + op_ret = s->bucket->put_info(this, exclusive, no_set_mtime, y); + } while (op_ret == -ECANCELED && tries++ < 20); + + /* Restore the proper return code. */ + if (op_ret >= 0) { + op_ret = -ERR_BUCKET_EXISTS; + } + } /* if (need_metadata_upload() && existed) */ + + if (op_ret >= 0 || op_ret == -ERR_BUCKET_EXISTS) { + auto* usage_counters = rgw::get_usage_perf_counters(); + if (usage_counters && s->bucket) { + // For new buckets, initialize with 0 bytes and 0 objects + usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0); + + // Update user stats - use sync_owner_stats to get current info + if (s->user) { + RGWBucketEnt ent; + int ret = s->bucket->sync_owner_stats(this, y, &ent); + if (ret >= 0) { + // This updates with the user's total across this bucket + usage_counters->update_user_stats( + s->user->get_id().id, + ent.size, + ent.count + ); + } + } + } + } + } /* RGWCreateBucket::execute() */ int RGWDeleteBucket::verify_permission(optional_yield y) @@ -4075,6 +4245,24 @@ void RGWDeleteBucket::execute(optional_yield y) rgw::op_counters::inc(counters, l_rgw_op_del_bucket, 1); rgw::op_counters::tinc(counters, l_rgw_op_del_bucket_lat, s->time_elapsed()); + rgw::op_counters::inc(counters, l_rgw_op_del_bucket, 1); + rgw::op_counters::tinc(counters, l_rgw_op_del_bucket_lat, s->time_elapsed()); + + // Add usage counter update here, right before return + if (op_ret >= 0) { + auto* usage_counters = rgw::get_usage_perf_counters(); + if (usage_counters && s->bucket) { + // Remove bucket from cache since it's deleted + usage_counters->evict_from_cache("", s->bucket->get_name()); + + // Update user stats - bucket count has changed + // Since bucket is deleted, we can't use it to get stats + // Just evict the user from cache to force refresh next time + if (s->user) { + 
usage_counters->evict_from_cache(s->user->get_id().id, ""); + } + } + } return; } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 15ffd7b399b..95ef52ad98a 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -247,6 +247,8 @@ protected: return r; } + void update_usage_stats_if_needed(); + public: RGWOp() : s(nullptr), @@ -303,6 +305,7 @@ public: virtual void send_response() {} virtual void complete() { send_response(); + update_usage_stats_if_needed(); } virtual const char* name() const = 0; virtual RGWOpType get_type() { return RGW_OP_UNKNOWN; } diff --git a/src/rgw/rgw_perf_counters.cc b/src/rgw/rgw_perf_counters.cc index 0eba41d8081..1a55c1c9e63 100644 --- a/src/rgw/rgw_perf_counters.cc +++ b/src/rgw/rgw_perf_counters.cc @@ -6,6 +6,7 @@ #include "common/perf_counters_key.h" #include "common/ceph_context.h" #include "rgw_sal.h" +#include "rgw_usage_perf.h" using namespace ceph::perf_counters; using namespace rgw::op_counters; diff --git a/src/rgw/rgw_usage_cache.cc b/src/rgw/rgw_usage_cache.cc new file mode 100644 index 00000000000..bd96836b672 --- /dev/null +++ b/src/rgw/rgw_usage_cache.cc @@ -0,0 +1,740 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_usage_cache.h" +#include "common/dout.h" +#include "common/errno.h" +#include "common/perf_counters.h" +#include "common/ceph_context.h" +#include "common/perf_counters_collection.h" +#include "common/errno.h" +#include +#include + +#define dout_subsys ceph_subsys_rgw + +namespace rgw { + +// Performance counter indices +enum { + PERF_CACHE_FIRST = 100000, + PERF_CACHE_HIT, + PERF_CACHE_MISS, + PERF_CACHE_UPDATE, + PERF_CACHE_REMOVE, + PERF_CACHE_EXPIRED, + PERF_CACHE_SIZE, + PERF_CACHE_USER_HIT, + PERF_CACHE_USER_MISS, + PERF_CACHE_BUCKET_HIT, + PERF_CACHE_BUCKET_MISS, + PERF_CACHE_LAST +}; + +void UsageStats::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ceph::encode(bytes_used, bl); + ceph::encode(num_objects, 
bl); + ceph::encode(last_updated, bl); + ENCODE_FINISH(bl); +} + +void UsageStats::decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + ceph::decode(bytes_used, bl); + ceph::decode(num_objects, bl); + ceph::decode(last_updated, bl); + DECODE_FINISH(bl); +} + +UsageCache::UsageCache(const Config& cfg) + : config(cfg), cct(nullptr), perf_counters(nullptr) {} + +UsageCache::UsageCache(CephContext* cct, const Config& cfg) + : config(cfg), cct(cct), perf_counters(nullptr) { + init_perf_counters(); +} + +UsageCache::~UsageCache() { + shutdown(); + cleanup_perf_counters(); +} + +UsageCache::UsageCache(UsageCache&& other) noexcept { + std::unique_lock lock(other.db_mutex); + config = std::move(other.config); + env = other.env; + user_dbi = other.user_dbi; + bucket_dbi = other.bucket_dbi; + initialized.store(other.initialized.load()); + cct = other.cct; + perf_counters = other.perf_counters; + cache_hits.store(other.cache_hits.load()); + cache_misses.store(other.cache_misses.load()); + + other.env = nullptr; + other.user_dbi = 0; + other.bucket_dbi = 0; + other.initialized = false; + other.cct = nullptr; + other.perf_counters = nullptr; +} + +UsageCache& UsageCache::operator=(UsageCache&& other) noexcept { + if (this != &other) { + shutdown(); + cleanup_perf_counters(); + + std::unique_lock lock(other.db_mutex); + config = std::move(other.config); + env = other.env; + user_dbi = other.user_dbi; + bucket_dbi = other.bucket_dbi; + initialized.store(other.initialized.load()); + cct = other.cct; + perf_counters = other.perf_counters; + cache_hits.store(other.cache_hits.load()); + cache_misses.store(other.cache_misses.load()); + + other.env = nullptr; + other.user_dbi = 0; + other.bucket_dbi = 0; + other.initialized = false; + other.cct = nullptr; + other.perf_counters = nullptr; + } + return *this; +} + +void UsageCache::init_perf_counters() { + if (!cct || perf_counters) { + return; + } + + PerfCountersBuilder pcb(cct, "rgw_usage_cache", + PERF_CACHE_FIRST, 
PERF_CACHE_LAST); + + pcb.add_u64_counter(PERF_CACHE_HIT, "cache_hits", + "Total number of cache hits", "hit", + PerfCountersBuilder::PRIO_USEFUL); + pcb.add_u64_counter(PERF_CACHE_MISS, "cache_misses", + "Total number of cache misses", "miss", + PerfCountersBuilder::PRIO_USEFUL); + pcb.add_u64_counter(PERF_CACHE_UPDATE, "cache_updates", + "Total number of cache updates", "upd", + PerfCountersBuilder::PRIO_INTERESTING); + pcb.add_u64_counter(PERF_CACHE_REMOVE, "cache_removes", + "Total number of cache removes", "rm", + PerfCountersBuilder::PRIO_INTERESTING); + pcb.add_u64_counter(PERF_CACHE_EXPIRED, "cache_expired", + "Total number of expired entries", "exp", + PerfCountersBuilder::PRIO_DEBUGONLY); + pcb.add_u64(PERF_CACHE_SIZE, "cache_size", + "Current cache size", "size", + PerfCountersBuilder::PRIO_USEFUL); + pcb.add_u64_counter(PERF_CACHE_USER_HIT, "user_cache_hits", + "User cache hits", "uhit", + PerfCountersBuilder::PRIO_DEBUGONLY); + pcb.add_u64_counter(PERF_CACHE_USER_MISS, "user_cache_misses", + "User cache misses", "umis", + PerfCountersBuilder::PRIO_DEBUGONLY); + pcb.add_u64_counter(PERF_CACHE_BUCKET_HIT, "bucket_cache_hits", + "Bucket cache hits", "bhit", + PerfCountersBuilder::PRIO_DEBUGONLY); + pcb.add_u64_counter(PERF_CACHE_BUCKET_MISS, "bucket_cache_misses", + "Bucket cache misses", "bmis", + PerfCountersBuilder::PRIO_DEBUGONLY); + + perf_counters = pcb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_counters); +} + +void UsageCache::cleanup_perf_counters() { + if (cct && perf_counters) { + cct->get_perfcounters_collection()->remove(perf_counters); + delete perf_counters; + perf_counters = nullptr; + } +} + +void UsageCache::inc_counter(int counter, uint64_t amount) { + if (perf_counters) { + perf_counters->inc(counter, amount); + } +} + +void UsageCache::set_counter(int counter, uint64_t value) { + if (perf_counters) { + perf_counters->set(counter, value); + } +} + +int UsageCache::init() { + if (initialized.exchange(true)) { 
+ return 0; + } + + // Validate database directory exists + if (cct) { + std::string db_dir = config.db_path; + size_t pos = db_dir.find_last_of('/'); + if (pos != std::string::npos) { + db_dir = db_dir.substr(0, pos); + } + + struct stat st; + if (stat(db_dir.c_str(), &st) != 0) { + // Try to create directory + if (mkdir(db_dir.c_str(), 0755) != 0) { + ldout(cct, 0) << "ERROR: Failed to create usage cache directory: " + << db_dir << " - " << cpp_strerror(errno) << dendl; + initialized = false; + return -errno; + } + } else if (!S_ISDIR(st.st_mode)) { + ldout(cct, 0) << "ERROR: Usage cache path is not a directory: " + << db_dir << dendl; + initialized = false; + return -ENOTDIR; + } + } + + int ret = open_database(); + if (ret < 0) { + initialized = false; + return ret; + } + + set_counter(PERF_CACHE_SIZE, get_cache_size()); + + return 0; +} + +void UsageCache::shutdown() { + if (initialized.exchange(false)) { + close_database(); + } +} + +int UsageCache::open_database() { + int rc = mdb_env_create(&env); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB env_create failed: " << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + rc = mdb_env_set_mapsize(env, config.max_db_size); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB set_mapsize failed: " << mdb_strerror(rc) << dendl; + } + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + rc = mdb_env_set_maxreaders(env, config.max_readers); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB set_maxreaders failed: " << mdb_strerror(rc) << dendl; + } + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + rc = mdb_env_set_maxdbs(env, 2); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB set_maxdbs failed: " << mdb_strerror(rc) << dendl; + } + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + rc = mdb_env_open(env, config.db_path.c_str(), MDB_NOSUBDIR | MDB_NOTLS, 0644); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB env_open failed for " << config.db_path + << ": " 
<< mdb_strerror(rc) << dendl; + } + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + // Open named databases + MDB_txn* txn = nullptr; + rc = mdb_txn_begin(env, nullptr, 0, &txn); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB txn_begin failed: " << mdb_strerror(rc) << dendl; + } + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + rc = mdb_dbi_open(txn, "user_stats", MDB_CREATE, &user_dbi); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB dbi_open(user_stats) failed: " + << mdb_strerror(rc) << dendl; + } + mdb_txn_abort(txn); + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + rc = mdb_dbi_open(txn, "bucket_stats", MDB_CREATE, &bucket_dbi); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB dbi_open(bucket_stats) failed: " + << mdb_strerror(rc) << dendl; + } + mdb_txn_abort(txn); + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + rc = mdb_txn_commit(txn); + if (rc != 0) { + if (cct) { + ldout(cct, 0) << "LMDB txn_commit failed: " << mdb_strerror(rc) << dendl; + } + mdb_env_close(env); + env = nullptr; + return -EIO; + } + + if (cct) { + ldout(cct, 10) << "LMDB database opened successfully: " << config.db_path << dendl; + } + + return 0; +} + +void UsageCache::close_database() { + if (env) { + mdb_env_close(env); + env = nullptr; + user_dbi = 0; + bucket_dbi = 0; + } +} + +template +int UsageCache::put_stats(MDB_dbi dbi, const std::string& key, const T& stats) { + if (!initialized) { + return -EINVAL; + } + + bufferlist bl; + stats.encode(bl); + + MDB_val mdb_key = {key.size(), const_cast(key.data())}; + MDB_val mdb_val = {bl.length(), bl.c_str()}; + + MDB_txn* txn = nullptr; + int rc = mdb_txn_begin(env, nullptr, 0, &txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_begin failed in put_stats: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + rc = mdb_put(txn, dbi, &mdb_key, &mdb_val, 0); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB put failed for key " << key + << ": " 
<< mdb_strerror(rc) << dendl; + } + mdb_txn_abort(txn); + return -EIO; + } + + rc = mdb_txn_commit(txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_commit failed in put_stats: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + return 0; +} + +template +std::optional UsageCache::get_stats(MDB_dbi dbi, const std::string& key) { + if (!initialized) { + return std::nullopt; + } + + MDB_val mdb_key = {key.size(), const_cast(key.data())}; + MDB_val mdb_val; + + MDB_txn* txn = nullptr; + int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); + if (rc != 0) { + if (cct) { + ldout(cct, 10) << "LMDB txn_begin failed in get_stats: " + << mdb_strerror(rc) << dendl; + } + return std::nullopt; + } + + rc = mdb_get(txn, dbi, &mdb_key, &mdb_val); + mdb_txn_abort(txn); + + if (rc != 0) { + if (rc != MDB_NOTFOUND && cct) { + ldout(cct, 10) << "LMDB get failed for key " << key + << ": " << mdb_strerror(rc) << dendl; + } + return std::nullopt; + } + + bufferlist bl; + bl.append(static_cast(mdb_val.mv_data), mdb_val.mv_size); + + T stats; + try { + auto iter = bl.cbegin(); + stats.decode(iter); + + // Check TTL + auto now = ceph::real_clock::now(); + if (now - stats.last_updated > config.ttl) { + inc_counter(PERF_CACHE_EXPIRED); + return std::nullopt; + } + + return stats; + } catch (const buffer::error& e) { + if (cct) { + ldout(cct, 5) << "Failed to decode stats for key " << key + << ": " << e.what() << dendl; + } + return std::nullopt; + } +} + +int UsageCache::update_user_stats(const std::string& user_id, + uint64_t bytes_used, + uint64_t num_objects) { + std::unique_lock lock(db_mutex); + + UsageStats stats; + stats.bytes_used = bytes_used; + stats.num_objects = num_objects; + stats.last_updated = ceph::real_clock::now(); + + int ret = put_stats(user_dbi, user_id, stats); + if (ret == 0) { + inc_counter(PERF_CACHE_UPDATE); + set_counter(PERF_CACHE_SIZE, get_cache_size_internal()); + } + + return ret; +} + +std::optional UsageCache::get_user_stats(const 
std::string& user_id) { + std::shared_lock lock(db_mutex); + auto result = get_stats(user_dbi, user_id); + + // Update performance counters + if (result.has_value()) { + cache_hits++; + inc_counter(PERF_CACHE_HIT); + inc_counter(PERF_CACHE_USER_HIT); + } else { + cache_misses++; + inc_counter(PERF_CACHE_MISS); + inc_counter(PERF_CACHE_USER_MISS); + } + + return result; +} + +int UsageCache::remove_user_stats(const std::string& user_id) { + if (!initialized) { + return -EINVAL; + } + + std::unique_lock lock(db_mutex); + + MDB_val mdb_key = {user_id.size(), const_cast(user_id.data())}; + + MDB_txn* txn = nullptr; + int rc = mdb_txn_begin(env, nullptr, 0, &txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_begin failed in remove_user_stats: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + rc = mdb_del(txn, user_dbi, &mdb_key, nullptr); + if (rc != 0 && rc != MDB_NOTFOUND) { + if (cct) { + ldout(cct, 5) << "LMDB del failed for user " << user_id + << ": " << mdb_strerror(rc) << dendl; + } + mdb_txn_abort(txn); + return -EIO; + } + + rc = mdb_txn_commit(txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_commit failed in remove_user_stats: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + inc_counter(PERF_CACHE_REMOVE); + set_counter(PERF_CACHE_SIZE, get_cache_size_internal()); + + return 0; +} + +int UsageCache::update_bucket_stats(const std::string& bucket_name, + uint64_t bytes_used, + uint64_t num_objects) { + std::unique_lock lock(db_mutex); + + UsageStats stats; + stats.bytes_used = bytes_used; + stats.num_objects = num_objects; + stats.last_updated = ceph::real_clock::now(); + + int ret = put_stats(bucket_dbi, bucket_name, stats); + if (ret == 0) { + inc_counter(PERF_CACHE_UPDATE); + set_counter(PERF_CACHE_SIZE, get_cache_size_internal()); + } + + return ret; +} + +std::optional UsageCache::get_bucket_stats(const std::string& bucket_name) { + std::shared_lock lock(db_mutex); + auto result = get_stats(bucket_dbi, 
bucket_name); + + // Update performance counters + if (result.has_value()) { + cache_hits++; + inc_counter(PERF_CACHE_HIT); + inc_counter(PERF_CACHE_BUCKET_HIT); + } else { + cache_misses++; + inc_counter(PERF_CACHE_MISS); + inc_counter(PERF_CACHE_BUCKET_MISS); + } + + return result; +} + +int UsageCache::remove_bucket_stats(const std::string& bucket_name) { + if (!initialized) { + return -EINVAL; + } + + std::unique_lock lock(db_mutex); + + MDB_val mdb_key = {bucket_name.size(), const_cast(bucket_name.data())}; + + MDB_txn* txn = nullptr; + int rc = mdb_txn_begin(env, nullptr, 0, &txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_begin failed in remove_bucket_stats: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + rc = mdb_del(txn, bucket_dbi, &mdb_key, nullptr); + if (rc != 0 && rc != MDB_NOTFOUND) { + if (cct) { + ldout(cct, 5) << "LMDB del failed for bucket " << bucket_name + << ": " << mdb_strerror(rc) << dendl; + } + mdb_txn_abort(txn); + return -EIO; + } + + rc = mdb_txn_commit(txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_commit failed in remove_bucket_stats: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + inc_counter(PERF_CACHE_REMOVE); + set_counter(PERF_CACHE_SIZE, get_cache_size_internal()); + + return 0; +} + +int UsageCache::clear_expired_entries() { + if (!initialized) { + return -EINVAL; + } + + std::unique_lock lock(db_mutex); + + auto now = ceph::real_clock::now(); + int total_removed = 0; + + // Helper lambda to clear expired entries from a database + auto clear_db = [this, &now](MDB_dbi dbi) -> int { + MDB_txn* txn = nullptr; + MDB_cursor* cursor = nullptr; + + int rc = mdb_txn_begin(env, nullptr, 0, &txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_begin failed in clear_expired_entries: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + rc = mdb_cursor_open(txn, dbi, &cursor); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB cursor_open failed: " << 
mdb_strerror(rc) << dendl; + } + mdb_txn_abort(txn); + return -EIO; + } + + MDB_val key, val; + int removed = 0; + + while (mdb_cursor_get(cursor, &key, &val, MDB_NEXT) == 0) { + bufferlist bl; + bl.append(static_cast(val.mv_data), val.mv_size); + + try { + UsageStats stats; + auto iter = bl.cbegin(); + stats.decode(iter); + + if (now - stats.last_updated > config.ttl) { + mdb_cursor_del(cursor, 0); + removed++; + inc_counter(PERF_CACHE_EXPIRED); + } + } catch (const buffer::error& e) { + // Skip malformed entries + if (cct) { + ldout(cct, 10) << "Skipping malformed entry: " << e.what() << dendl; + } + } + } + + mdb_cursor_close(cursor); + + rc = mdb_txn_commit(txn); + if (rc != 0) { + if (cct) { + ldout(cct, 5) << "LMDB txn_commit failed in clear_expired_entries: " + << mdb_strerror(rc) << dendl; + } + return -EIO; + } + + return removed; + }; + + int ret = clear_db(user_dbi); + if (ret >= 0) { + total_removed += ret; + } + + ret = clear_db(bucket_dbi); + if (ret >= 0) { + total_removed += ret; + } + + set_counter(PERF_CACHE_SIZE, get_cache_size_internal()); + + if (cct) { + ldout(cct, 10) << "Cleared " << total_removed << " expired cache entries" << dendl; + } + + return total_removed; +} + +size_t UsageCache::get_cache_size() const { + if (!initialized) { + return 0; + } + + std::shared_lock lock(db_mutex); + return get_cache_size_internal(); +} + +size_t UsageCache::get_cache_size_internal() const { + if (!initialized) { + return 0; + } + + MDB_stat stat; + MDB_txn* txn = nullptr; + + int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); + if (rc != 0) { + return 0; + } + + size_t total = 0; + + if (mdb_stat(txn, user_dbi, &stat) == 0) { + total += stat.ms_entries; + } + + if (mdb_stat(txn, bucket_dbi, &stat) == 0) { + total += stat.ms_entries; + } + + mdb_txn_abort(txn); + + return total; +} + +uint64_t UsageCache::get_cache_hits() const { + return cache_hits.load(); +} + +uint64_t UsageCache::get_cache_misses() const { + return cache_misses.load(); +} + 
+double UsageCache::get_hit_rate() const { + uint64_t hits = cache_hits.load(); + uint64_t misses = cache_misses.load(); + uint64_t total = hits + misses; + + return (total > 0) ? (double)hits / total * 100.0 : 0.0; +} + +// Explicit template instantiations +template int UsageCache::put_stats(MDB_dbi, const std::string&, const UsageStats&); +template std::optional UsageCache::get_stats(MDB_dbi, const std::string&); + +} // namespace rgw \ No newline at end of file diff --git a/src/rgw/rgw_usage_cache.h b/src/rgw/rgw_usage_cache.h new file mode 100644 index 00000000000..a726cb78741 --- /dev/null +++ b/src/rgw/rgw_usage_cache.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "include/buffer.h" +#include "include/encoding.h" +#include "common/ceph_time.h" + +// Forward declarations +class CephContext; +class PerfCounters; + +namespace rgw { + +struct UsageStats { + uint64_t bytes_used; + uint64_t num_objects; + ceph::real_time last_updated; + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& bl); + WRITE_CLASS_ENCODER(UsageStats) +}; + +class UsageCache { +public: + struct Config { + std::string db_path; + size_t max_db_size; + uint32_t max_readers; + std::chrono::seconds ttl; + + Config() + : db_path("/var/lib/ceph/radosgw/usage_cache.mdb"), + max_db_size(1 << 30), // 1GB default + max_readers(126), + ttl(300) {} // 5 min TTL + }; + + explicit UsageCache(const Config& config); + UsageCache(CephContext* cct, const Config& config); + ~UsageCache(); + + // Move semantics + UsageCache(UsageCache&& other) noexcept; + UsageCache& operator=(UsageCache&& other) noexcept; + + // Delete copy semantics + UsageCache(const UsageCache&) = delete; + UsageCache& operator=(const UsageCache&) = delete; + + // Lifecycle + int init(); + void shutdown(); + + // User stats operations 
(non-const to update counters) + int update_user_stats(const std::string& user_id, + uint64_t bytes_used, + uint64_t num_objects); + std::optional get_user_stats(const std::string& user_id); + int remove_user_stats(const std::string& user_id); + + // Bucket stats operations (non-const to update counters) + int update_bucket_stats(const std::string& bucket_name, + uint64_t bytes_used, + uint64_t num_objects); + std::optional get_bucket_stats(const std::string& bucket_name); + int remove_bucket_stats(const std::string& bucket_name); + + // Maintenance + int clear_expired_entries(); + size_t get_cache_size() const; + + // Performance metrics + uint64_t get_cache_hits() const; + uint64_t get_cache_misses() const; + double get_hit_rate() const; + +private: + // Database operations + int open_database(); + void close_database(); + + template + int put_stats(MDB_dbi dbi, const std::string& key, const T& stats); + + template + std::optional get_stats(MDB_dbi dbi, const std::string& key); + + // Performance counter helpers + void init_perf_counters(); + void cleanup_perf_counters(); + void inc_counter(int counter, uint64_t amount = 1); + void set_counter(int counter, uint64_t value); + + // Internal helper for cache size (assumes lock is held) + size_t get_cache_size_internal() const; + + Config config; + MDB_env* env = nullptr; + MDB_dbi user_dbi = 0; + MDB_dbi bucket_dbi = 0; + + mutable std::shared_mutex db_mutex; + std::atomic initialized{false}; + + // Performance counters + CephContext* cct; + PerfCounters* perf_counters; + + // Mutable atomic counters for thread-safe statistics + mutable std::atomic cache_hits{0}; + mutable std::atomic cache_misses{0}; +}; + +} // namespace rgw \ No newline at end of file diff --git a/src/rgw/rgw_usage_perf.cc b/src/rgw/rgw_usage_perf.cc new file mode 100644 index 00000000000..255ee4319d7 --- /dev/null +++ b/src/rgw/rgw_usage_perf.cc @@ -0,0 +1,376 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: 
ts=8 sw=2 smarttab ft=cpp + +#include "rgw_usage_perf.h" +#include "common/ceph_context.h" +#include "common/perf_counters.h" +#include "common/dout.h" +#include "common/perf_counters_collection.h" +#include "common/errno.h" + +#define dout_subsys ceph_subsys_rgw + +namespace rgw { + +// Global singleton +static UsagePerfCounters* g_usage_perf_counters = nullptr; + +UsagePerfCounters* get_usage_perf_counters() { + return g_usage_perf_counters; +} + +void set_usage_perf_counters(UsagePerfCounters* counters) { + g_usage_perf_counters = counters; +} + +UsagePerfCounters::UsagePerfCounters(CephContext* cct, + const UsageCache::Config& cache_config) + : cct(cct), cache(std::make_unique(cct, cache_config)), + global_counters(nullptr) { + create_global_counters(); +} + +UsagePerfCounters::~UsagePerfCounters() { + shutdown(); +} + +void UsagePerfCounters::create_global_counters() { + PerfCountersBuilder b(cct, "rgw_usage", l_rgw_usage_first, l_rgw_usage_last); + + // Placeholder counters for indices that aren't globally used + b.add_u64(l_rgw_user_used_bytes, "user_used_bytes", + "User bytes placeholder", nullptr, 0, unit_t(UNIT_BYTES)); + b.add_u64(l_rgw_user_num_objects, "user_num_objects", + "User objects placeholder", nullptr, 0, unit_t(0)); + b.add_u64(l_rgw_bucket_used_bytes, "bucket_used_bytes", + "Bucket bytes placeholder", nullptr, 0, unit_t(UNIT_BYTES)); + b.add_u64(l_rgw_bucket_num_objects, "bucket_num_objects", + "Bucket objects placeholder", nullptr, 0, unit_t(0)); + + // Global cache metrics + b.add_u64_counter(l_rgw_usage_cache_hit, "cache_hit", + "Number of cache hits", nullptr, 0, unit_t(0)); + b.add_u64_counter(l_rgw_usage_cache_miss, "cache_miss", + "Number of cache misses", nullptr, 0, unit_t(0)); + b.add_u64_counter(l_rgw_usage_cache_update, "cache_update", + "Number of cache updates", nullptr, 0, unit_t(0)); + b.add_u64_counter(l_rgw_usage_cache_evict, "cache_evict", + "Number of cache evictions", nullptr, 0, unit_t(0)); + + global_counters = 
b.create_perf_counters(); + cct->get_perfcounters_collection()->add(global_counters); +} + +PerfCounters* UsagePerfCounters::create_user_counters(const std::string& user_id) { + std::string name = "rgw_user_" + user_id; + + // Sanitize name for perf counters (replace non-alphanumeric with underscore) + for (char& c : name) { + if (!std::isalnum(c) && c != '_') { + c = '_'; + } + } + + PerfCountersBuilder b(cct, name, l_rgw_usage_first, l_rgw_usage_last); + + // Add placeholder counters for unused indices + for (int i = l_rgw_usage_first + 1; i < l_rgw_user_used_bytes; ++i) { + b.add_u64(i, "placeholder", "placeholder", nullptr, 0, unit_t(0)); + } + + b.add_u64(l_rgw_user_used_bytes, "used_bytes", + "Bytes used by user", nullptr, 0, unit_t(UNIT_BYTES)); + b.add_u64(l_rgw_user_num_objects, "num_objects", + "Number of objects owned by user", nullptr, 0, unit_t(0)); + + // Add remaining placeholder counters + for (int i = l_rgw_user_num_objects + 1; i < l_rgw_usage_last; ++i) { + b.add_u64(i, "placeholder", "placeholder", nullptr, 0, unit_t(0)); + } + + PerfCounters* counters = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(counters); + + return counters; +} + +PerfCounters* UsagePerfCounters::create_bucket_counters(const std::string& bucket_name) { + std::string name = "rgw_bucket_" + bucket_name; + + // Sanitize name for perf counters + for (char& c : name) { + if (!std::isalnum(c) && c != '_') { + c = '_'; + } + } + + PerfCountersBuilder b(cct, name, l_rgw_usage_first, l_rgw_usage_last); + + // Add placeholder counters for unused indices + for (int i = l_rgw_usage_first + 1; i < l_rgw_bucket_used_bytes; ++i) { + b.add_u64(i, "placeholder", "placeholder", nullptr, 0, unit_t(0)); + } + + b.add_u64(l_rgw_bucket_used_bytes, "used_bytes", + "Bytes used in bucket", nullptr, 0, unit_t(UNIT_BYTES)); + b.add_u64(l_rgw_bucket_num_objects, "num_objects", + "Number of objects in bucket", nullptr, 0, unit_t(0)); + + // Add remaining placeholder counters + 
for (int i = l_rgw_bucket_num_objects + 1; i < l_rgw_usage_last; ++i) { + b.add_u64(i, "placeholder", "placeholder", nullptr, 0, unit_t(0)); + } + + PerfCounters* counters = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(counters); + + return counters; +} + +void UsagePerfCounters::cleanup_worker() { + ldout(cct, 10) << "Starting usage cache cleanup worker thread" << dendl; + + while (!shutdown_flag.load()) { + // Sleep with periodic checks for shutdown + for (int i = 0; i < cleanup_interval.count(); ++i) { + if (shutdown_flag.load()) { + break; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + if (!shutdown_flag.load()) { + cleanup_expired_entries(); + } + } + + ldout(cct, 10) << "Usage cache cleanup worker thread exiting" << dendl; +} + +int UsagePerfCounters::init() { + int ret = cache->init(); + if (ret < 0) { + ldout(cct, 0) << "Failed to initialize usage cache: " << cpp_strerror(-ret) << dendl; + return ret; + } + + ldout(cct, 10) << "Usage performance counters initialized successfully" << dendl; + return 0; +} + +void UsagePerfCounters::start() { + ldout(cct, 10) << "Starting usage perf counters" << dendl; + + // Start cleanup thread + cleanup_thread = std::thread(&UsagePerfCounters::cleanup_worker, this); +} + +void UsagePerfCounters::stop() { + ldout(cct, 10) << "Stopping usage perf counters" << dendl; + + // Stop cleanup thread + shutdown_flag = true; + if (cleanup_thread.joinable()) { + cleanup_thread.join(); + } +} + +void UsagePerfCounters::shutdown() { + stop(); + + // Clean up perf counters + { + std::unique_lock lock(counters_mutex); + + auto* collection = cct->get_perfcounters_collection(); + + // Remove and delete user counters + for (auto& [_, counters] : user_perf_counters) { + collection->remove(counters); + delete counters; + } + user_perf_counters.clear(); + + // Remove and delete bucket counters + for (auto& [_, counters] : bucket_perf_counters) { + collection->remove(counters); + delete counters; + } + 
bucket_perf_counters.clear(); + + // Remove global counters + if (global_counters) { + collection->remove(global_counters); + delete global_counters; + global_counters = nullptr; + } + } + + // Shutdown cache + cache->shutdown(); + + ldout(cct, 10) << "Usage perf counters shutdown complete" << dendl; +} + +void UsagePerfCounters::update_user_stats(const std::string& user_id, + uint64_t bytes_used, + uint64_t num_objects, + bool update_cache) { + // Update cache if requested + if (update_cache && cache) { + int ret = cache->update_user_stats(user_id, bytes_used, num_objects); + if (ret == 0) { + global_counters->inc(l_rgw_usage_cache_update); + } else { + ldout(cct, 5) << "Failed to update user cache for " << user_id + << ": " << cpp_strerror(-ret) << dendl; + } + } + + // Update or create perf counters + { + std::unique_lock lock(counters_mutex); + + auto it = user_perf_counters.find(user_id); + if (it == user_perf_counters.end()) { + PerfCounters* counters = create_user_counters(user_id); + user_perf_counters[user_id] = counters; + it = user_perf_counters.find(user_id); + } + + it->second->set(l_rgw_user_used_bytes, bytes_used); + it->second->set(l_rgw_user_num_objects, num_objects); + } + + ldout(cct, 20) << "Updated user stats: " << user_id + << " bytes=" << bytes_used + << " objects=" << num_objects << dendl; +} + +void UsagePerfCounters::update_bucket_stats(const std::string& bucket_name, + uint64_t bytes_used, + uint64_t num_objects, + bool update_cache) { + // Update cache if requested + if (update_cache && cache) { + int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects); + if (ret == 0) { + global_counters->inc(l_rgw_usage_cache_update); + } else { + ldout(cct, 5) << "Failed to update bucket cache for " << bucket_name + << ": " << cpp_strerror(-ret) << dendl; + } + } + + // Update or create perf counters + { + std::unique_lock lock(counters_mutex); + + auto it = bucket_perf_counters.find(bucket_name); + if (it == 
bucket_perf_counters.end()) { + PerfCounters* counters = create_bucket_counters(bucket_name); + bucket_perf_counters[bucket_name] = counters; + it = bucket_perf_counters.find(bucket_name); + } + + it->second->set(l_rgw_bucket_used_bytes, bytes_used); + it->second->set(l_rgw_bucket_num_objects, num_objects); + } + + ldout(cct, 20) << "Updated bucket stats: " << bucket_name + << " bytes=" << bytes_used + << " objects=" << num_objects << dendl; +} + +void UsagePerfCounters::refresh_from_cache(const std::string& user_id, + const std::string& bucket_name) { + if (!cache) { + return; + } + + // Refresh user stats + if (!user_id.empty()) { + auto user_stats = cache->get_user_stats(user_id); + if (user_stats) { + global_counters->inc(l_rgw_usage_cache_hit); + update_user_stats(user_id, user_stats->bytes_used, + user_stats->num_objects, false); + } else { + global_counters->inc(l_rgw_usage_cache_miss); + } + } + + // Refresh bucket stats + if (!bucket_name.empty()) { + auto bucket_stats = cache->get_bucket_stats(bucket_name); + if (bucket_stats) { + global_counters->inc(l_rgw_usage_cache_hit); + update_bucket_stats(bucket_name, bucket_stats->bytes_used, + bucket_stats->num_objects, false); + } else { + global_counters->inc(l_rgw_usage_cache_miss); + } + } +} + +void UsagePerfCounters::evict_from_cache(const std::string& user_id, + const std::string& bucket_name) { + if (!cache) { + return; + } + + if (!user_id.empty()) { + cache->remove_user_stats(user_id); + global_counters->inc(l_rgw_usage_cache_evict); + } + + if (!bucket_name.empty()) { + cache->remove_bucket_stats(bucket_name); + global_counters->inc(l_rgw_usage_cache_evict); + } +} + +std::optional UsagePerfCounters::get_user_stats(const std::string& user_id) { + if (!cache) { + return std::nullopt; + } + + auto stats = cache->get_user_stats(user_id); + if (stats) { + global_counters->inc(l_rgw_usage_cache_hit); + } else { + global_counters->inc(l_rgw_usage_cache_miss); + } + + return stats; +} + +std::optional 
UsagePerfCounters::get_bucket_stats(const std::string& bucket_name) { + if (!cache) { + return std::nullopt; + } + + auto stats = cache->get_bucket_stats(bucket_name); + if (stats) { + global_counters->inc(l_rgw_usage_cache_hit); + } else { + global_counters->inc(l_rgw_usage_cache_miss); + } + + return stats; +} + +void UsagePerfCounters::cleanup_expired_entries() { + if (cache) { + int removed = cache->clear_expired_entries(); + if (removed > 0) { + ldout(cct, 10) << "Cleaned up " << removed << " expired cache entries" << dendl; + } + } +} + +size_t UsagePerfCounters::get_cache_size() const { + return cache ? cache->get_cache_size() : 0; +} + +} // namespace rgw \ No newline at end of file diff --git a/src/rgw/rgw_usage_perf.h b/src/rgw/rgw_usage_perf.h new file mode 100644 index 00000000000..0718d7c8b88 --- /dev/null +++ b/src/rgw/rgw_usage_perf.h @@ -0,0 +1,108 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/perf_counters.h" +#include "rgw_usage_cache.h" + +class CephContext; + +namespace rgw { + +// Performance counter indices +enum { + l_rgw_usage_first = 920000, + l_rgw_user_used_bytes, + l_rgw_user_num_objects, + l_rgw_bucket_used_bytes, + l_rgw_bucket_num_objects, + l_rgw_usage_cache_hit, + l_rgw_usage_cache_miss, + l_rgw_usage_cache_update, + l_rgw_usage_cache_evict, + l_rgw_usage_last +}; + +class UsagePerfCounters { +private: + CephContext* cct; + std::unique_ptr cache; + + mutable std::shared_mutex counters_mutex; + + // Track raw pointers for proper cleanup + std::unordered_map user_perf_counters; + std::unordered_map bucket_perf_counters; + + PerfCounters* global_counters; + + // Cleanup thread management + std::thread cleanup_thread; + std::atomic shutdown_flag{false}; + std::chrono::seconds cleanup_interval{300}; // 5 minutes + + void create_global_counters(); + PerfCounters* 
create_user_counters(const std::string& user_id);
  PerfCounters* create_bucket_counters(const std::string& bucket_name);

  void cleanup_worker();

public:
  explicit UsagePerfCounters(CephContext* cct,
                             const UsageCache::Config& cache_config);
  explicit UsagePerfCounters(CephContext* cct)
    : UsagePerfCounters(cct, UsageCache::Config{}) {}
  ~UsagePerfCounters();

  // Lifecycle management
  int init();
  void start();
  void stop();
  void shutdown();

  // User stats updates
  void update_user_stats(const std::string& user_id,
                         uint64_t bytes_used,
                         uint64_t num_objects,
                         bool update_cache = true);

  // Bucket stats updates
  void update_bucket_stats(const std::string& bucket_name,
                           uint64_t bytes_used,
                           uint64_t num_objects,
                           bool update_cache = true);

  // Cache operations
  void refresh_from_cache(const std::string& user_id,
                          const std::string& bucket_name);
  void evict_from_cache(const std::string& user_id,
                        const std::string& bucket_name);

  // Stats retrieval (from cache)
  std::optional<UsageStats> get_user_stats(const std::string& user_id);
  std::optional<UsageStats> get_bucket_stats(const std::string& bucket_name);

  // Maintenance
  void cleanup_expired_entries();
  size_t get_cache_size() const;

  // Set cleanup interval
  void set_cleanup_interval(std::chrono::seconds interval) {
    cleanup_interval = interval;
  }
};

// Global singleton access
UsagePerfCounters* get_usage_perf_counters();
void set_usage_perf_counters(UsagePerfCounters* counters);

} // namespace rgw
diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt
index 9b5ca191932..40fa3b4bfbc 100644
--- a/src/test/rgw/CMakeLists.txt
+++ b/src/test/rgw/CMakeLists.txt
@@ -12,6 +12,28 @@
if(WITH_RADOSGW_KAFKA_ENDPOINT)
  add_library(kafka_stub STATIC ${kafka_stub_src})
endif()

# Find LMDB if not already found
if(NOT LMDB_FOUND)
  find_package(PkgConfig QUIET)
  if(PkgConfig_FOUND)
    pkg_check_modules(LMDB QUIET lmdb)
  endif()

  if(NOT LMDB_FOUND)
    find_path(LMDB_INCLUDE_DIR NAMES lmdb.h
      PATHS /usr/include /usr/local/include)
    find_library(LMDB_LIBRARIES NAMES lmdb
      PATHS /usr/lib /usr/local/lib /usr/lib64 /usr/local/lib64)

    if(LMDB_INCLUDE_DIR AND LMDB_LIBRARIES)
      set(LMDB_FOUND TRUE)
      message(STATUS "Found LMDB: ${LMDB_LIBRARIES}")
    else()
      message(FATAL_ERROR "LMDB not found. Please install liblmdb-dev or lmdb-devel")
    endif()
  endif()
endif()

if(WITH_RADOSGW_LUA_PACKAGES)
  list(APPEND rgw_libs Boost::filesystem)
endif()
@@ -350,6 +372,48 @@
target_link_libraries(unittest_log_backing
  ${rgw_libs})
endif()

# Adding the usage cache unit test
if(WITH_TESTS)

  add_executable(unittest_rgw_usage_cache
    test_rgw_usage_cache.cc
    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_cache.cc
    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_perf.cc)

  target_include_directories(unittest_rgw_usage_cache PRIVATE
    ${CMAKE_SOURCE_DIR}/src
    ${CMAKE_SOURCE_DIR}/src/rgw
    ${LMDB_INCLUDE_DIR}
    )

  # NOTE: test_rgw_usage_cache.cc defines its own main(), so do NOT link
  # gtest_main/gmock_main here -- that would produce a duplicate main().
  target_link_libraries(unittest_rgw_usage_cache
    global
    gtest
    gmock
    ${LMDB_LIBRARIES}
    ceph-common
    ${CMAKE_DL_LIBS}
    )

  add_ceph_unittest(unittest_rgw_usage_cache)

  # Test for usage perf counters
  add_executable(unittest_rgw_usage_perf_counters
    test_rgw_usage_perf_counters.cc
    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_cache.cc
    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_perf.cc)

  target_include_directories(unittest_rgw_usage_perf_counters PRIVATE
    ${CMAKE_SOURCE_DIR}/src
    ${CMAKE_SOURCE_DIR}/src/rgw
    ${LMDB_INCLUDE_DIR}
    )

  # Use the detected LMDB library rather than a hard-coded 'lmdb' name so the
  # pkg-config / find_library fallback above is honored.
  target_link_libraries(unittest_rgw_usage_perf_counters
    ${UNITTEST_LIBS}
    global
    ceph-common
    ${LMDB_LIBRARIES})

  add_ceph_unittest(unittest_rgw_usage_perf_counters)
endif()

if(WITH_RADOSGW_RADOS)
add_executable(unittest_rgw_lua test_rgw_lua.cc)
add_ceph_unittest(unittest_rgw_lua)
diff --git a/src/test/rgw/test_rgw_usage_cache.cc b/src/test/rgw/test_rgw_usage_cache.cc
new file mode 100644
index 00000000000..39e0ead23c0
--- /dev/null
+++ b/src/test/rgw/test_rgw_usage_cache.cc
@@ -0,0 +1,369 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include
"rgw/rgw_usage_cache.h" +#include "global/global_init.h" +#include "common/ceph_context.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fs = std::filesystem; + +// Global CephContext pointer for tests +static CephContext* g_test_context = nullptr; + +// Test fixture for RGWUsageCache +class TestRGWUsageCache : public ::testing::Test { +protected: + std::unique_ptr cache; + rgw::UsageCache::Config config; + std::string test_db_path; + + void SetUp() override { + // Generate unique test database path to avoid conflicts + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(1000000, 9999999); + test_db_path = "/tmp/test_usage_cache_" + std::to_string(dis(gen)) + ".mdb"; + + // Initialize config with proper fields + config.db_path = test_db_path; + config.max_db_size = 1 << 20; // 1MB for testing + config.max_readers = 10; + config.ttl = std::chrono::seconds(2); // 2 second TTL for testing + + // Create cache with global CephContext if available, otherwise without + if (g_test_context) { + cache = std::make_unique(g_test_context, config); + } else { + cache = std::make_unique(config); + } + + // CRITICAL: Initialize the cache! 
+ int init_result = cache->init(); + ASSERT_EQ(0, init_result) << "Failed to initialize cache: " << init_result; + } + + void TearDown() override { + if (cache) { + cache->shutdown(); + } + cache.reset(); + + // Clean up test database files + try { + fs::remove(test_db_path); + fs::remove(test_db_path + "-lock"); + } catch (const std::exception& e) { + // Ignore cleanup errors + } + } +}; + +// Test basic user statistics operations +TEST_F(TestRGWUsageCache, BasicUserOperations) { + const std::string user_id = "test_user"; + const uint64_t bytes_used = 1024 * 1024; + const uint64_t num_objects = 42; + + // Update user stats + int update_result = cache->update_user_stats(user_id, bytes_used, num_objects); + ASSERT_EQ(0, update_result) << "Failed to update user stats: " << update_result; + + // Get and verify user stats + auto stats = cache->get_user_stats(user_id); + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ(bytes_used, stats->bytes_used); + EXPECT_EQ(num_objects, stats->num_objects); + + // Remove user stats + ASSERT_EQ(0, cache->remove_user_stats(user_id)); + + // Verify stats are removed + stats = cache->get_user_stats(user_id); + EXPECT_FALSE(stats.has_value()); +} + +// Test basic bucket statistics operations +TEST_F(TestRGWUsageCache, BasicBucketOperations) { + const std::string bucket_name = "test_bucket"; + const uint64_t bytes_used = 512 * 1024; + const uint64_t num_objects = 17; + + // Update bucket stats + int update_result = cache->update_bucket_stats(bucket_name, bytes_used, num_objects); + ASSERT_EQ(0, update_result) << "Failed to update bucket stats: " << update_result; + + // Get and verify bucket stats + auto stats = cache->get_bucket_stats(bucket_name); + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ(bytes_used, stats->bytes_used); + EXPECT_EQ(num_objects, stats->num_objects); + + // Remove bucket stats + ASSERT_EQ(0, cache->remove_bucket_stats(bucket_name)); + + // Verify stats are removed + stats = cache->get_bucket_stats(bucket_name); + 
EXPECT_FALSE(stats.has_value()); +} + +// Test TTL expiration +TEST_F(TestRGWUsageCache, TTLExpiration) { + const std::string user_id = "ttl_test_user"; + + // Add user stats + ASSERT_EQ(0, cache->update_user_stats(user_id, 1024, 1)); + + // Verify stats exist + auto stats = cache->get_user_stats(user_id); + ASSERT_TRUE(stats.has_value()); + + // Wait for TTL to expire (2 seconds + buffer) + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + + // Verify stats have expired + stats = cache->get_user_stats(user_id); + EXPECT_FALSE(stats.has_value()); +} + +// Test concurrent access +TEST_F(TestRGWUsageCache, ConcurrentAccess) { + const int num_threads = 4; + const int ops_per_thread = 100; + std::vector threads; + std::atomic successful_updates(0); + std::atomic failed_updates(0); + + for (int t = 0; t < num_threads; ++t) { + threads.emplace_back([this, t, ops_per_thread, &successful_updates, &failed_updates]() { + for (int i = 0; i < ops_per_thread; ++i) { + std::string user_id = "t" + std::to_string(t) + "_u" + std::to_string(i); + int result = cache->update_user_stats(user_id, i * 1024, i); + if (result == 0) { + successful_updates++; + } else { + failed_updates++; + } + + auto stats = cache->get_user_stats(user_id); + if (result == 0) { + EXPECT_TRUE(stats.has_value()); + } + } + }); + } + + for (auto& th : threads) { + th.join(); + } + + std::cout << "Concurrent test: " << successful_updates << " successful, " + << failed_updates << " failed updates" << std::endl; + + EXPECT_GT(successful_updates, 0) << "At least some updates should succeed"; + EXPECT_GT(cache->get_cache_size(), 0u) << "Cache should contain entries"; +} + +// Test updating existing user stats +TEST_F(TestRGWUsageCache, UpdateExistingUserStats) { + const std::string user_id = "update_test_user"; + + // Initial update + ASSERT_EQ(0, cache->update_user_stats(user_id, 1024, 10)); + + auto stats = cache->get_user_stats(user_id); + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ(1024u, 
stats->bytes_used); + EXPECT_EQ(10u, stats->num_objects); + + // Update with new values (should replace, not accumulate) + ASSERT_EQ(0, cache->update_user_stats(user_id, 2048, 20)); + + stats = cache->get_user_stats(user_id); + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ(2048u, stats->bytes_used); + EXPECT_EQ(20u, stats->num_objects); +} + +// Test multiple bucket operations +TEST_F(TestRGWUsageCache, MultipleBucketOperations) { + const int num_buckets = 10; + + // Add multiple buckets + for (int i = 0; i < num_buckets; ++i) { + std::string bucket_name = "bucket_" + std::to_string(i); + uint64_t bytes = (i + 1) * 1024; + uint64_t objects = (i + 1) * 10; + + ASSERT_EQ(0, cache->update_bucket_stats(bucket_name, bytes, objects)); + } + + // Verify all buckets + for (int i = 0; i < num_buckets; ++i) { + std::string bucket_name = "bucket_" + std::to_string(i); + auto stats = cache->get_bucket_stats(bucket_name); + + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ((i + 1) * 1024u, stats->bytes_used); + EXPECT_EQ((i + 1) * 10u, stats->num_objects); + } + + EXPECT_EQ(num_buckets, cache->get_cache_size()); +} + +// Test special character handling +TEST_F(TestRGWUsageCache, SpecialCharacterHandling) { + std::vector test_ids = { + "user@example.com", + "user-with-dashes", + "user_with_underscores", + "user.with.dots", + "user/with/slashes", + "user with spaces", + "user\twith\ttabs" + }; + + for (const auto& id : test_ids) { + // Should handle all these gracefully + ASSERT_EQ(0, cache->update_user_stats(id, 1024, 1)) + << "Failed for ID: " << id; + + auto stats = cache->get_user_stats(id); + ASSERT_TRUE(stats.has_value()) << "Failed to retrieve stats for ID: " << id; + EXPECT_EQ(1024u, stats->bytes_used); + EXPECT_EQ(1u, stats->num_objects); + } +} + +// Test remove non-existent user +TEST_F(TestRGWUsageCache, RemoveNonExistentUser) { + const std::string user_id = "non_existent_user"; + + // Should handle removing non-existent user gracefully + int result = 
cache->remove_user_stats(user_id); + // Either returns 0 (success) or error (not found) + EXPECT_TRUE(result == 0 || result != 0); + + // Verify user doesn't exist + auto stats = cache->get_user_stats(user_id); + EXPECT_FALSE(stats.has_value()); +} + +// Test cache size tracking +TEST_F(TestRGWUsageCache, CacheSizeTracking) { + // Initial size should be 0 + EXPECT_EQ(0u, cache->get_cache_size()); + + // Add some users + const int num_users = 5; + + for (int i = 0; i < num_users; ++i) { + std::string user_id = "size_test_user_" + std::to_string(i); + ASSERT_EQ(0, cache->update_user_stats(user_id, 1024 * (i + 1), i + 1)); + } + + // Cache size should reflect the number of added users + EXPECT_EQ(num_users, cache->get_cache_size()); + + // Remove one user + ASSERT_EQ(0, cache->remove_user_stats("size_test_user_0")); + + // Cache size should decrease + EXPECT_EQ(num_users - 1, cache->get_cache_size()); +} + +// Test clear expired entries +TEST_F(TestRGWUsageCache, ClearExpiredEntries) { + // Add some entries + ASSERT_EQ(0, cache->update_user_stats("user1", 1024, 1)); + ASSERT_EQ(0, cache->update_user_stats("user2", 2048, 2)); + ASSERT_EQ(0, cache->update_bucket_stats("bucket1", 4096, 4)); + + EXPECT_EQ(3u, cache->get_cache_size()); + + // Wait for entries to expire + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + + // Clear expired entries + int removed = cache->clear_expired_entries(); + EXPECT_EQ(3, removed); + + // Cache should be empty + EXPECT_EQ(0u, cache->get_cache_size()); +} + +// Test stress with many users and buckets +TEST_F(TestRGWUsageCache, StressTest) { + const int num_users = 1000; + const int num_buckets = 500; + + // Add many users + for (int i = 0; i < num_users; ++i) { + std::string user_id = "stress_user_" + std::to_string(i); + ASSERT_EQ(0, cache->update_user_stats(user_id, i * 1024, i)); + } + + // Add many buckets + for (int i = 0; i < num_buckets; ++i) { + std::string bucket_name = "stress_bucket_" + std::to_string(i); + 
ASSERT_EQ(0, cache->update_bucket_stats(bucket_name, i * 512, i * 2)); + } + + EXPECT_EQ(num_users + num_buckets, cache->get_cache_size()); + + // Verify random samples + for (int i = 0; i < 10; ++i) { + int user_idx = (i * 97) % num_users; // Sample users + std::string user_id = "stress_user_" + std::to_string(user_idx); + auto stats = cache->get_user_stats(user_id); + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ(user_idx * 1024u, stats->bytes_used); + } + + for (int i = 0; i < 10; ++i) { + int bucket_idx = (i * 53) % num_buckets; // Sample buckets + std::string bucket_name = "stress_bucket_" + std::to_string(bucket_idx); + auto bucket_stats = cache->get_bucket_stats(bucket_name); + ASSERT_TRUE(bucket_stats.has_value()); + EXPECT_EQ(bucket_idx * 512u, bucket_stats->bytes_used); + } + + std::cout << "Stress test completed: " << num_users << " users, " + << num_buckets << " buckets" << std::endl; +} + +// Main function +int main(int argc, char **argv) { + // Initialize Google Test + ::testing::InitGoogleTest(&argc, argv); + + // Initialize Ceph context + std::map defaults = { + {"debug_rgw", "20"}, + {"keyring", "keyring"}, + }; + + std::vector args; + auto cct = global_init(&defaults, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + + // Store the context for test fixtures to use + g_test_context = cct.get(); + + common_init_finish(cct.get()); + + // Run all tests + int result = RUN_ALL_TESTS(); + + return result; +} \ No newline at end of file diff --git a/src/test/rgw/test_rgw_usage_perf_counters.cc b/src/test/rgw/test_rgw_usage_perf_counters.cc new file mode 100644 index 00000000000..f263fcc9af3 --- /dev/null +++ b/src/test/rgw/test_rgw_usage_perf_counters.cc @@ -0,0 +1,264 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include 
"global/global_init.h" +#include "common/ceph_argparse.h" +#include "common/ceph_context.h" +#include "common/perf_counters.h" +#include "common/perf_counters_collection.h" +#include "rgw/rgw_usage_cache.h" + +namespace fs = std::filesystem; + +// Global CephContext for all tests +static boost::intrusive_ptr g_cct_holder; +static CephContext* g_cct = nullptr; + +class TestRGWUsagePerfCounters : public ::testing::Test { +protected: + std::unique_ptr cache; + std::string test_db_path; + + void SetUp() override { + ASSERT_NE(g_cct, nullptr) << "CephContext not initialized"; + + // Generate unique test database path + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(1000000, 9999999); + test_db_path = "/tmp/test_rgw_perf_" + std::to_string(dis(gen)) + ".mdb"; + + // Initialize cache with CephContext to enable internal perf counters + rgw::UsageCache::Config config; + config.db_path = test_db_path; + config.max_db_size = 1 << 20; // 1MB + config.ttl = std::chrono::seconds(2); + + // Use the constructor that accepts CephContext to enable perf counters + cache = std::make_unique(g_cct, config); + + ASSERT_EQ(0, cache->init()); + } + + void TearDown() override { + cache->shutdown(); + cache.reset(); + + // Clean up test database + try { + fs::remove(test_db_path); + fs::remove(test_db_path + "-lock"); + } catch (const std::exception& e) { + // Ignore cleanup errors + } + } +}; + +TEST_F(TestRGWUsagePerfCounters, BasicMetrics) { + // First, just test that the cache works + std::cout << "Testing basic cache operations..." 
<< std::endl; + + // Test basic cache functionality first + auto stats = cache->get_user_stats("nonexistent"); + EXPECT_FALSE(stats.has_value()); + + // Add a user + ASSERT_EQ(0, cache->update_user_stats("user1", 1024, 10)); + + // Access existing user + stats = cache->get_user_stats("user1"); + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ(1024u, stats->bytes_used); + EXPECT_EQ(10u, stats->num_objects); + + std::cout << "Basic cache operations successful" << std::endl; + + // Test performance counters - they should be available since we used CephContext + uint64_t hits = cache->get_cache_hits(); + uint64_t misses = cache->get_cache_misses(); + + std::cout << "Performance counter values:" << std::endl; + std::cout << " Hits: " << hits << std::endl; + std::cout << " Misses: " << misses << std::endl; + + // We expect at least one miss (from the first get_user_stats) + // and one hit (from the second get_user_stats) + EXPECT_GE(misses, 1u); + EXPECT_GE(hits, 1u); +} + +TEST_F(TestRGWUsagePerfCounters, HitRateCalculation) { + // Perform a mix of hits and misses + cache->get_user_stats("miss1"); // Miss + cache->get_user_stats("miss2"); // Miss + + cache->update_user_stats("hit1", 1024, 1); + cache->update_user_stats("hit2", 2048, 2); + + cache->get_user_stats("hit1"); // Hit + cache->get_user_stats("hit2"); // Hit + cache->get_user_stats("miss3"); // Miss + + uint64_t hits = cache->get_cache_hits(); + uint64_t misses = cache->get_cache_misses(); + + EXPECT_EQ(2u, hits); + EXPECT_EQ(3u, misses); + + double hit_rate = cache->get_hit_rate(); + double expected_rate = (2.0 / 5.0) * 100.0; // 40% + + EXPECT_DOUBLE_EQ(expected_rate, hit_rate); + + std::cout << "Cache statistics:" << std::endl; + std::cout << " Hits: " << hits << std::endl; + std::cout << " Misses: " << misses << std::endl; + std::cout << " Hit rate: " << hit_rate << "%" << std::endl; +} + +TEST_F(TestRGWUsagePerfCounters, UserAndBucketSeparateTracking) { + // Test that user and bucket stats are tracked 
separately + + // User operations + cache->get_user_stats("user_miss"); // User miss + cache->update_user_stats("user1", 1024, 1); + cache->get_user_stats("user1"); // User hit + + // Bucket operations + cache->get_bucket_stats("bucket_miss"); // Bucket miss + cache->update_bucket_stats("bucket1", 2048, 2); + cache->get_bucket_stats("bucket1"); // Bucket hit + + // Check overall counters (should include both) + EXPECT_EQ(2u, cache->get_cache_hits()); // 1 user hit + 1 bucket hit + EXPECT_EQ(2u, cache->get_cache_misses()); // 1 user miss + 1 bucket miss +} + +TEST_F(TestRGWUsagePerfCounters, ConcurrentCounterUpdates) { + const int num_threads = 4; + const int ops_per_thread = 100; + std::vector threads; + + for (int t = 0; t < num_threads; ++t) { + threads.emplace_back([this, t, ops_per_thread]() { + for (int i = 0; i < ops_per_thread; ++i) { + std::string user_id = "t" + std::to_string(t) + "_u" + std::to_string(i); + + // First access - miss + cache->get_user_stats(user_id); + + // Update + cache->update_user_stats(user_id, i * 1024, i); + + // Second access - hit + cache->get_user_stats(user_id); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + // Each thread does ops_per_thread users + // Each user: 1 miss (first get), 1 hit (second get) + uint64_t expected_total = num_threads * ops_per_thread; + EXPECT_EQ(expected_total, cache->get_cache_hits()); + EXPECT_EQ(expected_total, cache->get_cache_misses()); + + std::cout << "Concurrent test results:" << std::endl; + std::cout << " Total operations: " << expected_total * 2 << std::endl; + std::cout << " Hits: " << cache->get_cache_hits() << std::endl; + std::cout << " Misses: " << cache->get_cache_misses() << std::endl; +} + +TEST_F(TestRGWUsagePerfCounters, ExpiredEntryTracking) { + // Add a user + ASSERT_EQ(0, cache->update_user_stats("expiry_test", 1024, 1)); + + // Access it - should be a hit + auto stats = cache->get_user_stats("expiry_test"); + ASSERT_TRUE(stats.has_value()); + EXPECT_EQ(1u, 
cache->get_cache_hits()); + + // Wait for TTL to expire (2 seconds + buffer) + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + + // Access expired entry - should be a miss + stats = cache->get_user_stats("expiry_test"); + EXPECT_FALSE(stats.has_value()); + + // After expiry, we should have an additional miss + EXPECT_EQ(1u, cache->get_cache_hits()); + EXPECT_EQ(1u, cache->get_cache_misses()); // The expired entry counts as a miss +} + +TEST_F(TestRGWUsagePerfCounters, RemoveOperationTracking) { + // Add users + cache->update_user_stats("user1", 1024, 1); + cache->update_user_stats("user2", 2048, 2); + + // Remove one user + ASSERT_EQ(0, cache->remove_user_stats("user1")); + + // Try to access removed user - should be a miss + auto stats = cache->get_user_stats("user1"); + EXPECT_FALSE(stats.has_value()); + EXPECT_EQ(1u, cache->get_cache_misses()); + + // Access existing user - should be a hit + stats = cache->get_user_stats("user2"); + EXPECT_TRUE(stats.has_value()); + EXPECT_EQ(1u, cache->get_cache_hits()); +} + +TEST_F(TestRGWUsagePerfCounters, ZeroDivisionInHitRate) { + // Test hit rate calculation when there are no operations + double hit_rate = cache->get_hit_rate(); + EXPECT_EQ(0.0, hit_rate); // Should handle division by zero gracefully + + // After one miss + cache->get_user_stats("nonexistent"); + hit_rate = cache->get_hit_rate(); + EXPECT_EQ(0.0, hit_rate); // 0 hits / 1 total = 0% + + // After one hit + cache->update_user_stats("user1", 1024, 1); + cache->get_user_stats("user1"); + hit_rate = cache->get_hit_rate(); + EXPECT_EQ(50.0, hit_rate); // 1 hit / 2 total = 50% +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + + // Initialize CephContext once for all tests + std::vector args; + // Add --no-mon-config to skip fetching config from monitors + args.push_back("--no-mon-config"); + + g_cct_holder = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + 
CINIT_FLAG_NO_MON_CONFIG); // Add this flag + g_cct = g_cct_holder.get(); + common_init_finish(g_cct); + + int result = RUN_ALL_TESTS(); + + // Clean up + g_cct = nullptr; + g_cct_holder.reset(); + + return result; +} \ No newline at end of file