default: false
services:
- rgw
+
- name: rgw_ratelimit_interval
type: uint
level: advanced
flags:
- startup
with_legacy: true
+
- name: rgw_redis_connection_pool_size
type: int
level: basic
services:
- rgw
with_legacy: true
+
- name: rgw_bucket_persistent_notif_num_shards
type: uint
level: advanced
see_also:
- rgw_enable_usage_log
with_legacy: true
+
+- name: rgw_enable_usage_perf_counters
+ type: bool
+ level: advanced
+ desc: Enable per-user and per-bucket usage performance counters
+ long_desc: When enabled, RGW will track and expose usage metrics (bytes and objects)
+ for users and buckets through the Ceph performance counter framework
+ default: true
+ services:
+ - rgw
+ with_legacy: true
+
+- name: rgw_usage_cache_path
+ type: str
+ level: advanced
+ desc: Path to LMDB database file for usage statistics cache
+ default: /var/lib/ceph/radosgw/usage_cache.mdb
+ services:
+ - rgw
+ with_legacy: true
+
+- name: rgw_usage_cache_max_size
+ type: size
+ level: advanced
+ desc: Maximum size of the LMDB usage cache database
+ default: 1_G
+ services:
+ - rgw
+ with_legacy: true
+
+- name: rgw_usage_cache_ttl
+ type: uint
+ level: advanced
+ desc: Time-to-live in seconds for cached usage statistics
+ default: 300
+ min: 60
+ max: 3600
+ services:
+ - rgw
+ with_legacy: true
find_package(ICU 52.0 COMPONENTS uc REQUIRED)
+# Locate LMDB: prefer pkg-config, fall back to a manual header/library search.
+find_package(PkgConfig QUIET)
+if(PkgConfig_FOUND)
+  pkg_check_modules(LMDB QUIET lmdb)
+  if(LMDB_FOUND)
+    # pkg_check_modules() exports the *plural* LMDB_INCLUDE_DIRS; the
+    # target_include_directories() call below consumes the singular
+    # LMDB_INCLUDE_DIR, so mirror it here or the include path is empty
+    # whenever LMDB is found via pkg-config.
+    set(LMDB_INCLUDE_DIR "${LMDB_INCLUDE_DIRS}")
+  endif()
+endif()
+
+if(NOT LMDB_FOUND)
+  find_path(LMDB_INCLUDE_DIR NAMES lmdb.h)
+  find_library(LMDB_LIBRARIES NAMES lmdb)
+  if(LMDB_INCLUDE_DIR AND LMDB_LIBRARIES)
+    set(LMDB_FOUND TRUE)
+  else()
+    message(FATAL_ERROR "LMDB not found. Please install liblmdb-dev or lmdb-devel")
+  endif()
+endif()
+
set(librgw_common_srcs
spdk/crc64.c
madler/crc64nvme.c
rgw_realm_watcher.cc
rgw_bucket_logging.cc
rgw_rest_bucket_logging.cc
- rgw_bucket_sync.cc)
+ rgw_bucket_sync.cc
+ rgw_usage_cache.cc
+ rgw_usage_perf.cc
+)
list(APPEND librgw_common_srcs
driver/immutable_config/store.cc
target_include_directories(rgw_common
  PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw/services"
  PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
-  PUBLIC "${LUA_INCLUDE_DIR}")
+  PUBLIC "${LUA_INCLUDE_DIR}"
+  PRIVATE "${LMDB_INCLUDE_DIR}")
+# rgw_usage_cache.cc calls mdb_* directly, so rgw_common must also link the
+# LMDB library that the detection block above located.
+target_link_libraries(rgw_common PRIVATE ${LMDB_LIBRARIES})
# work around https://github.com/Cyan4973/xxHash/issues/943 for debug builds
target_compile_definitions(rgw_common PUBLIC
#include "rgw_kmip_client_impl.h"
#include "rgw_perf_counters.h"
#include "rgw_signal.h"
+#include "rgw_usage_perf.h"
#ifdef WITH_ARROW_FLIGHT
#include "rgw_flight_frontend.h"
#endif
void rgw::AppMain::init_perfcounters()
{
(void) rgw_perf_start(dpp->get_cct());
+
+  // Optionally bring up the LMDB-backed usage perf counters. Any failure
+  // here is deliberately non-fatal: the radosgw process continues without
+  // usage counters rather than refusing to start.
+  if (g_conf()->rgw_enable_usage_perf_counters) {
+    // Validate and create cache directory
+    rgw::UsageCache::Config cache_config;
+    cache_config.db_path = g_conf()->rgw_usage_cache_path;
+    cache_config.max_db_size = g_conf()->rgw_usage_cache_max_size;
+    cache_config.ttl = std::chrono::seconds(g_conf()->rgw_usage_cache_ttl);
+
+    // Check if cache directory exists
+    // NOTE(review): this duplicates the directory validation done again in
+    // UsageCache::init(); consider keeping a single copy — confirm which
+    // call site should own it.
+    if (!cache_config.db_path.empty()) {
+      std::string db_dir = cache_config.db_path;
+      size_t pos = db_dir.find_last_of('/');
+      if (pos != std::string::npos) {
+        db_dir = db_dir.substr(0, pos);
+      }
+
+      struct stat st;
+      if (stat(db_dir.c_str(), &st) != 0) {
+        // Try to create directory
+        // (single level only — mkdir() is not recursive, so a missing
+        // parent still fails and disables the cache below)
+        if (mkdir(db_dir.c_str(), 0755) == 0) {
+          ldpp_dout(dpp, 10) << "Created usage cache directory: " << db_dir << dendl;
+        } else {
+          ldpp_dout(dpp, 0) << "WARNING: Failed to create usage cache directory: "
+                            << db_dir << " - " << cpp_strerror(errno)
+                            << " (continuing without usage cache)" << dendl;
+          cache_config.db_path = "";
+        }
+      } else if (!S_ISDIR(st.st_mode)) {
+        ldpp_dout(dpp, 0) << "WARNING: Usage cache path is not a directory: "
+                          << db_dir << " (continuing without usage cache)" << dendl;
+        cache_config.db_path = "";
+      }
+    }
+
+    // Create and initialize usage perf counters
+    // (an empty db_path at this point means validation failed above)
+    if (!cache_config.db_path.empty()) {
+      usage_perf_counters = std::make_unique<rgw::UsagePerfCounters>(
+        dpp->get_cct(), cache_config);
+
+      int r = usage_perf_counters->init();
+      if (r < 0) {
+        ldpp_dout(dpp, 1) << "WARNING: Failed to initialize usage perf counters: "
+                          << cpp_strerror(-r) << " (continuing without them)" << dendl;
+        usage_perf_counters.reset();
+      } else {
+        usage_perf_counters->start();
+        // Publish through the process-wide accessor so RGWOp handlers can
+        // reach the counters without plumbing a pointer through AppMain.
+        rgw::set_usage_perf_counters(usage_perf_counters.get());
+        ldpp_dout(dpp, 10) << "Usage performance counters initialized successfully" << dendl;
+      }
+    }
+  }
} /* init_perfcounters */
void rgw::AppMain::init_http_clients()
delete fec;
}
+ if (usage_perf_counters) {
+ ldpp_dout(dpp, 10) << "Shutting down usage performance counters" << dendl;
+ usage_perf_counters->shutdown();
+ rgw::set_usage_perf_counters(nullptr);
+ usage_perf_counters.reset();
+ }
+
finalize_async_signals(); // callback
rgw_tools_cleanup();
#endif
#include "rgw_dmclock_scheduler_ctx.h"
#include "rgw_ratelimit.h"
+#include "rgw_usage_perf.h"
class RGWPauser : public RGWRealmReloader::Pauser {
std::map<std::string, std::string> service_map_meta;
// wow, realm reloader has a lot of parts
std::unique_ptr<RGWRealmReloader> reloader;
+ std::unique_ptr<rgw::UsagePerfCounters> usage_perf_counters;
#ifdef WITH_RADOSGW_RADOS
std::unique_ptr<RGWPeriodPusher> pusher;
#endif
#include <stdlib.h>
#include <system_error>
#include <unistd.h>
-
+#include "rgw_usage_perf.h"
#include <sstream>
#include <string_view>
#include "rgw_sal_rados.h"
#endif
#include "rgw_torrent.h"
+#include "rgw_usage_perf.h"
#include "rgw_cksum_pipe.h"
#include "rgw_lua_data_filter.h"
#include "rgw_lua.h"
return r;
}
+// Refresh the cached per-bucket/per-user usage counters after an operation
+// that changed stored data. Called from RGWOp::complete(), i.e. after the
+// response has been sent. Best-effort: all failures are logged and ignored.
+void RGWOp::update_usage_stats_if_needed() {
+  auto* usage_counters = rgw::get_usage_perf_counters();
+  if (!usage_counters) {
+    return; // feature disabled or not initialized
+  }
+
+  // Only update for successful operations
+  if (op_ret != 0) {
+    return;
+  }
+
+  // Check if this is an operation that changes usage
+  bool is_put_op = (dynamic_cast<RGWPutObj*>(this) != nullptr) ||
+                   (dynamic_cast<RGWPostObj*>(this) != nullptr) ||
+                   (dynamic_cast<RGWCopyObj*>(this) != nullptr) ||
+                   (dynamic_cast<RGWCompleteMultipart*>(this) != nullptr);
+
+  bool is_delete_op = (dynamic_cast<RGWDeleteObj*>(this) != nullptr) ||
+                      (dynamic_cast<RGWDeleteMultiObj*>(this) != nullptr);
+
+  if (!is_put_op && !is_delete_op) {
+    return; // Not an operation that changes usage
+  }
+
+  if (!s->bucket) {
+    return; // no bucket context to account against
+  }
+
+  try {
+    // Fetch the owner-synced bucket stats exactly once and feed both the
+    // bucket and the user counters from the same result. The previous code
+    // issued this identical sync_owner_stats() round trip twice per request.
+    RGWBucketEnt ent;
+    int ret = s->bucket->sync_owner_stats(this, null_yield, &ent);
+    if (ret < 0) {
+      ldout(s->cct, 10) << "Failed to sync bucket stats for "
+                        << s->bucket->get_name()
+                        << ": " << cpp_strerror(-ret) << dendl;
+      return;
+    }
+
+    usage_counters->update_bucket_stats(
+      s->bucket->get_name(),
+      ent.size,   // Total bytes used
+      ent.count   // Total number of objects
+    );
+    ldout(s->cct, 20) << "Updated bucket stats for " << s->bucket->get_name()
+                      << ": bytes=" << ent.size
+                      << ", objects=" << ent.count << dendl;
+
+    if (s->user) {
+      // NOTE(review): the bucket totals stand in for per-user totals here;
+      // a complete implementation would aggregate across all of the user's
+      // buckets.
+      usage_counters->update_user_stats(
+        s->user->get_id().id,
+        ent.size,   // Using bucket size as proxy
+        ent.count   // Using bucket object count as proxy
+      );
+      ldout(s->cct, 20) << "Updated user stats for " << s->user->get_id().id
+                        << " (based on bucket " << s->bucket->get_name() << ")"
+                        << ": bytes=" << ent.size
+                        << ", objects=" << ent.count << dendl;
+    }
+  } catch (const std::exception& e) {
+    ldout(s->cct, 5) << "Exception updating usage stats: " << e.what() << dendl;
+  }
+}
+
+
+
+
int RGWGetObj::handle_slo_manifest(bufferlist& bl, optional_yield y)
{
RGWSLOInfo slo_info;
/* continue if EEXIST and create_bucket will fail below. this way we can
* recover from a partial create by retrying it. */
ldpp_dout(this, 20) << "Bucket::create() returned ret=" << op_ret << " bucket=" << s->bucket << dendl;
+
+ if (op_ret < 0 && op_ret != -EEXIST && op_ret != -ERR_BUCKET_EXISTS)
+ return;
+
+ const bool existed = s->bucket_exists;
+ if (need_metadata_upload() && existed) {
+ /* OK, it looks we lost race with another request. As it's required to
+ * handle metadata fusion and upload, the whole operation becomes very
+ * similar in nature to PutMetadataBucket. However, as the attrs may
+ * changed in the meantime, we have to refresh. */
+ short tries = 0;
+ do {
+ map<string, bufferlist> battrs;
+
+ op_ret = s->bucket->load_bucket(this, y);
+ if (op_ret < 0) {
+ return;
+ } else if (!s->auth.identity->is_owner_of(s->bucket->get_owner())) {
+ /* New bucket doesn't belong to the account we're operating on. */
+ op_ret = -EEXIST;
+ return;
+ } else {
+ s->bucket_attrs = s->bucket->get_attrs();
+ }
+
+ createparams.attrs.clear();
+
+ op_ret = rgw_get_request_metadata(this, s->cct, s->info, createparams.attrs, false);
+ if (op_ret < 0) {
+ return;
+ }
+ prepare_add_del_attrs(s->bucket_attrs, rmattr_names, createparams.attrs);
+ populate_with_generic_attrs(s, createparams.attrs);
+ op_ret = filter_out_quota_info(createparams.attrs, rmattr_names,
+ s->bucket->get_info().quota);
+ if (op_ret < 0) {
+ return;
+ }
+
+ /* Handle updates of the metadata for Swift's object versioning. */
+ if (createparams.swift_ver_location) {
+ s->bucket->get_info().swift_ver_location = *createparams.swift_ver_location;
+ s->bucket->get_info().swift_versioning = !createparams.swift_ver_location->empty();
+ }
+
+ /* Web site of Swift API. */
+ filter_out_website(createparams.attrs, rmattr_names,
+ s->bucket->get_info().website_conf);
+ s->bucket->get_info().has_website = !s->bucket->get_info().website_conf.is_empty();
+
+ /* This will also set the quota on the bucket. */
+ s->bucket->set_attrs(std::move(createparams.attrs));
+ constexpr bool exclusive = false; // overwrite
+ constexpr ceph::real_time no_set_mtime{};
+ op_ret = s->bucket->put_info(this, exclusive, no_set_mtime, y);
+ } while (op_ret == -ECANCELED && tries++ < 20);
+
+ /* Restore the proper return code. */
+ if (op_ret >= 0) {
+ op_ret = -ERR_BUCKET_EXISTS;
+ }
+ } /* if (need_metadata_upload() && existed) */
+
+ if (op_ret >= 0 || op_ret == -ERR_BUCKET_EXISTS) {
+ auto* usage_counters = rgw::get_usage_perf_counters();
+ if (usage_counters && s->bucket) {
+ // For new buckets, initialize with 0 bytes and 0 objects
+ usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0);
+
+ // Update user stats - use sync_owner_stats to get current info
+ if (s->user) {
+ RGWBucketEnt ent;
+ int ret = s->bucket->sync_owner_stats(this, y, &ent);
+ if (ret >= 0) {
+ // This updates with the user's total across this bucket
+ usage_counters->update_user_stats(
+ s->user->get_id().id,
+ ent.size,
+ ent.count
+ );
+ }
+ }
+ }
+ }
+
} /* RGWCreateBucket::execute() */
int RGWDeleteBucket::verify_permission(optional_yield y)
rgw::op_counters::inc(counters, l_rgw_op_del_bucket, 1);
rgw::op_counters::tinc(counters, l_rgw_op_del_bucket_lat, s->time_elapsed());
+
+  // Add usage counter update here, right before return.
+  // (Do NOT repeat the two op_counters calls above — re-adding them would
+  // double-count every bucket deletion.)
+  if (op_ret >= 0) {
+    auto* usage_counters = rgw::get_usage_perf_counters();
+    if (usage_counters && s->bucket) {
+      // Remove bucket from cache since it's deleted
+      usage_counters->evict_from_cache("", s->bucket->get_name());
+
+      // Update user stats - bucket count has changed
+      // Since bucket is deleted, we can't use it to get stats
+      // Just evict the user from cache to force refresh next time
+      if (s->user) {
+        usage_counters->evict_from_cache(s->user->get_id().id, "");
+      }
+    }
+  }
return;
}
return r;
}
+ void update_usage_stats_if_needed();
+
public:
RGWOp()
: s(nullptr),
virtual void send_response() {}
virtual void complete() {
send_response();
+    // Runs after the response is sent, so any stats sync here does not add
+    // client-visible latency; the helper itself checks op_ret and skips
+    // failed operations.
+    update_usage_stats_if_needed();
}
virtual const char* name() const = 0;
virtual RGWOpType get_type() { return RGW_OP_UNKNOWN; }
#include "common/perf_counters_key.h"
#include "common/ceph_context.h"
#include "rgw_sal.h"
+#include "rgw_usage_perf.h"
using namespace ceph::perf_counters;
using namespace rgw::op_counters;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_usage_cache.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/perf_counters.h"
+#include "common/ceph_context.h"
+#include "common/perf_counters_collection.h"
+#include "common/errno.h"
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace rgw {
+
+// Performance counter indices
+// FIRST/LAST are sentinels bounding the id range handed to
+// PerfCountersBuilder; only the values strictly between them are counters.
+enum {
+  PERF_CACHE_FIRST = 100000,
+  PERF_CACHE_HIT,
+  PERF_CACHE_MISS,
+  PERF_CACHE_UPDATE,
+  PERF_CACHE_REMOVE,
+  PERF_CACHE_EXPIRED,
+  PERF_CACHE_SIZE,        // gauge: current number of cached entries
+  PERF_CACHE_USER_HIT,
+  PERF_CACHE_USER_MISS,
+  PERF_CACHE_BUCKET_HIT,
+  PERF_CACHE_BUCKET_MISS,
+  PERF_CACHE_LAST
+};
+
+// Serialize this UsageStats with Ceph's versioned encoding (v1, compat 1).
+void UsageStats::encode(bufferlist& bl) const {
+  ENCODE_START(1, 1, bl);
+  ceph::encode(bytes_used, bl);
+  ceph::encode(num_objects, bl);
+  ceph::encode(last_updated, bl);
+  ENCODE_FINISH(bl);
+}
+
+// Inverse of encode(); throws buffer::error on malformed input
+// (callers in this file catch it and treat the entry as absent).
+void UsageStats::decode(bufferlist::const_iterator& bl) {
+  DECODE_START(1, bl);
+  ceph::decode(bytes_used, bl);
+  ceph::decode(num_objects, bl);
+  ceph::decode(last_updated, bl);
+  DECODE_FINISH(bl);
+}
+
+// Counter-less constructor: no CephContext, so no perf counters or logging.
+UsageCache::UsageCache(const Config& cfg)
+  : config(cfg), cct(nullptr), perf_counters(nullptr) {}
+
+// Full constructor: also registers perf counters with cct's collection.
+UsageCache::UsageCache(CephContext* cct, const Config& cfg)
+  : config(cfg), cct(cct), perf_counters(nullptr) {
+  init_perf_counters();
+}
+
+UsageCache::~UsageCache() {
+  shutdown();
+  cleanup_perf_counters();
+}
+
+// Move constructor: steals the LMDB env/dbi handles and counter state.
+// Locks the source's mutex so a concurrent reader cannot observe a
+// half-moved source; db_mutex itself is not movable and stays per-object.
+UsageCache::UsageCache(UsageCache&& other) noexcept {
+  std::unique_lock lock(other.db_mutex);
+  config = std::move(other.config);
+  env = other.env;
+  user_dbi = other.user_dbi;
+  bucket_dbi = other.bucket_dbi;
+  initialized.store(other.initialized.load());
+  cct = other.cct;
+  perf_counters = other.perf_counters;
+  cache_hits.store(other.cache_hits.load());
+  cache_misses.store(other.cache_misses.load());
+
+  // Null out the source so its destructor won't close/free anything twice.
+  other.env = nullptr;
+  other.user_dbi = 0;
+  other.bucket_dbi = 0;
+  other.initialized = false;
+  other.cct = nullptr;
+  other.perf_counters = nullptr;
+}
+
+// Move assignment: release our own resources first, then steal other's.
+UsageCache& UsageCache::operator=(UsageCache&& other) noexcept {
+  if (this != &other) {
+    shutdown();
+    cleanup_perf_counters();
+
+    std::unique_lock lock(other.db_mutex);
+    config = std::move(other.config);
+    env = other.env;
+    user_dbi = other.user_dbi;
+    bucket_dbi = other.bucket_dbi;
+    initialized.store(other.initialized.load());
+    cct = other.cct;
+    perf_counters = other.perf_counters;
+    cache_hits.store(other.cache_hits.load());
+    cache_misses.store(other.cache_misses.load());
+
+    // Leave the source empty so its destructor is a no-op on these.
+    other.env = nullptr;
+    other.user_dbi = 0;
+    other.bucket_dbi = 0;
+    other.initialized = false;
+    other.cct = nullptr;
+    other.perf_counters = nullptr;
+  }
+  return *this;
+}
+
+// Build and register the "rgw_usage_cache" PerfCounters instance.
+// No-op without a CephContext or when counters already exist (idempotent).
+void UsageCache::init_perf_counters() {
+  if (!cct || perf_counters) {
+    return;
+  }
+
+  PerfCountersBuilder pcb(cct, "rgw_usage_cache",
+                          PERF_CACHE_FIRST, PERF_CACHE_LAST);
+
+  pcb.add_u64_counter(PERF_CACHE_HIT, "cache_hits",
+                      "Total number of cache hits", "hit",
+                      PerfCountersBuilder::PRIO_USEFUL);
+  pcb.add_u64_counter(PERF_CACHE_MISS, "cache_misses",
+                      "Total number of cache misses", "miss",
+                      PerfCountersBuilder::PRIO_USEFUL);
+  pcb.add_u64_counter(PERF_CACHE_UPDATE, "cache_updates",
+                      "Total number of cache updates", "upd",
+                      PerfCountersBuilder::PRIO_INTERESTING);
+  pcb.add_u64_counter(PERF_CACHE_REMOVE, "cache_removes",
+                      "Total number of cache removes", "rm",
+                      PerfCountersBuilder::PRIO_INTERESTING);
+  pcb.add_u64_counter(PERF_CACHE_EXPIRED, "cache_expired",
+                      "Total number of expired entries", "exp",
+                      PerfCountersBuilder::PRIO_DEBUGONLY);
+  // add_u64 (not _counter): cache_size is a gauge set via set_counter().
+  pcb.add_u64(PERF_CACHE_SIZE, "cache_size",
+              "Current cache size", "size",
+              PerfCountersBuilder::PRIO_USEFUL);
+  pcb.add_u64_counter(PERF_CACHE_USER_HIT, "user_cache_hits",
+                      "User cache hits", "uhit",
+                      PerfCountersBuilder::PRIO_DEBUGONLY);
+  pcb.add_u64_counter(PERF_CACHE_USER_MISS, "user_cache_misses",
+                      "User cache misses", "umis",
+                      PerfCountersBuilder::PRIO_DEBUGONLY);
+  pcb.add_u64_counter(PERF_CACHE_BUCKET_HIT, "bucket_cache_hits",
+                      "Bucket cache hits", "bhit",
+                      PerfCountersBuilder::PRIO_DEBUGONLY);
+  pcb.add_u64_counter(PERF_CACHE_BUCKET_MISS, "bucket_cache_misses",
+                      "Bucket cache misses", "bmis",
+                      PerfCountersBuilder::PRIO_DEBUGONLY);
+
+  perf_counters = pcb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(perf_counters);
+}
+
+// Unregister and free the perf counters; safe to call more than once.
+void UsageCache::cleanup_perf_counters() {
+  if (cct && perf_counters) {
+    cct->get_perfcounters_collection()->remove(perf_counters);
+    delete perf_counters;
+    perf_counters = nullptr;
+  }
+}
+
+// Increment helper; a no-op when counters were never registered.
+void UsageCache::inc_counter(int counter, uint64_t amount) {
+  if (perf_counters) {
+    perf_counters->inc(counter, amount);
+  }
+}
+
+// Gauge-set helper; a no-op when counters were never registered.
+void UsageCache::set_counter(int counter, uint64_t value) {
+  if (perf_counters) {
+    perf_counters->set(counter, value);
+  }
+}
+
+// One-shot initialization: validate/create the cache directory and open the
+// LMDB environment. Returns 0 on success (including redundant calls) or a
+// negative errno; on failure the cache is left uninitialized so a later
+// retry is possible.
+int UsageCache::init() {
+  // Only the first caller performs the work; concurrent/repeat callers
+  // see success immediately.
+  if (initialized.exchange(true)) {
+    return 0;
+  }
+
+  // Validate database directory exists (only when we can log about it;
+  // without a cct, open_database() will surface any path problem).
+  if (cct) {
+    std::string db_dir = config.db_path;
+    size_t pos = db_dir.find_last_of('/');
+    if (pos != std::string::npos) {
+      db_dir = db_dir.substr(0, pos);
+    }
+
+    struct stat st;
+    if (stat(db_dir.c_str(), &st) != 0) {
+      // Try to create the (single, non-recursive) directory level.
+      if (mkdir(db_dir.c_str(), 0755) != 0) {
+        // Capture errno immediately: the logging below may itself make
+        // libc calls that clobber it, which would corrupt the return code.
+        const int err = errno;
+        ldout(cct, 0) << "ERROR: Failed to create usage cache directory: "
+                      << db_dir << " - " << cpp_strerror(err) << dendl;
+        initialized = false;
+        return -err;
+      }
+    } else if (!S_ISDIR(st.st_mode)) {
+      ldout(cct, 0) << "ERROR: Usage cache path is not a directory: "
+                    << db_dir << dendl;
+      initialized = false;
+      return -ENOTDIR;
+    }
+  }
+
+  int ret = open_database();
+  if (ret < 0) {
+    initialized = false;
+    return ret;
+  }
+
+  // Seed the size gauge with whatever the (possibly pre-existing) DB holds.
+  set_counter(PERF_CACHE_SIZE, get_cache_size());
+
+  return 0;
+}
+
+// Close the database exactly once; subsequent calls are no-ops thanks to
+// the atomic exchange (destructor and explicit shutdown may both run).
+void UsageCache::shutdown() {
+  if (initialized.exchange(false)) {
+    close_database();
+  }
+}
+
+// Create and open the LMDB environment (single file, two named databases:
+// "user_stats" and "bucket_stats"). Every failure path tears the env back
+// down and returns -EIO, so `env` is either fully open or nullptr.
+int UsageCache::open_database() {
+  int rc = mdb_env_create(&env);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB env_create failed: " << mdb_strerror(rc) << dendl;
+    }
+    return -EIO;
+  }
+
+  // Map size is the hard upper bound on database growth (config option
+  // rgw_usage_cache_max_size).
+  rc = mdb_env_set_mapsize(env, config.max_db_size);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB set_mapsize failed: " << mdb_strerror(rc) << dendl;
+    }
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  rc = mdb_env_set_maxreaders(env, config.max_readers);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB set_maxreaders failed: " << mdb_strerror(rc) << dendl;
+    }
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  // Two named DBs are opened below, so reserve exactly two slots.
+  rc = mdb_env_set_maxdbs(env, 2);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB set_maxdbs failed: " << mdb_strerror(rc) << dendl;
+    }
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  // MDB_NOSUBDIR: db_path is a plain file, not a directory;
+  // MDB_NOTLS: reader slots are not bound to threads, so read txns may be
+  // used from any thread (this file serializes access via db_mutex anyway).
+  rc = mdb_env_open(env, config.db_path.c_str(), MDB_NOSUBDIR | MDB_NOTLS, 0644);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB env_open failed for " << config.db_path
+                    << ": " << mdb_strerror(rc) << dendl;
+    }
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  // Open named databases (created on first use via MDB_CREATE).
+  MDB_txn* txn = nullptr;
+  rc = mdb_txn_begin(env, nullptr, 0, &txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB txn_begin failed: " << mdb_strerror(rc) << dendl;
+    }
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  rc = mdb_dbi_open(txn, "user_stats", MDB_CREATE, &user_dbi);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB dbi_open(user_stats) failed: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    mdb_txn_abort(txn);
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  rc = mdb_dbi_open(txn, "bucket_stats", MDB_CREATE, &bucket_dbi);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB dbi_open(bucket_stats) failed: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    mdb_txn_abort(txn);
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  rc = mdb_txn_commit(txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 0) << "LMDB txn_commit failed: " << mdb_strerror(rc) << dendl;
+    }
+    mdb_env_close(env);
+    env = nullptr;
+    return -EIO;
+  }
+
+  if (cct) {
+    ldout(cct, 10) << "LMDB database opened successfully: " << config.db_path << dendl;
+  }
+
+  return 0;
+}
+
+// Close the LMDB environment; the named DBI handles are owned by the env
+// and become invalid with it, so resetting them to 0 is sufficient.
+void UsageCache::close_database() {
+  if (env) {
+    mdb_env_close(env);
+    env = nullptr;
+    user_dbi = 0;
+    bucket_dbi = 0;
+  }
+}
+
+// Encode `stats` and upsert it under `key` in a single LMDB write txn.
+// Caller must hold db_mutex exclusively. Returns 0, -EINVAL (not
+// initialized), or -EIO on any LMDB failure.
+template<typename T>
+int UsageCache::put_stats(MDB_dbi dbi, const std::string& key, const T& stats) {
+  if (!initialized) {
+    return -EINVAL;
+  }
+
+  bufferlist bl;
+  stats.encode(bl);
+
+  // MDB_val is {mv_size, mv_data}; LMDB takes non-const pointers but does
+  // not modify the input on a plain put.
+  MDB_val mdb_key = {key.size(), const_cast<char*>(key.data())};
+  MDB_val mdb_val = {bl.length(), bl.c_str()};
+
+  MDB_txn* txn = nullptr;
+  int rc = mdb_txn_begin(env, nullptr, 0, &txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB txn_begin failed in put_stats: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    return -EIO;
+  }
+
+  rc = mdb_put(txn, dbi, &mdb_key, &mdb_val, 0);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB put failed for key " << key
+                    << ": " << mdb_strerror(rc) << dendl;
+    }
+    mdb_txn_abort(txn);
+    return -EIO;
+  }
+
+  rc = mdb_txn_commit(txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB txn_commit failed in put_stats: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    return -EIO;
+  }
+
+  return 0;
+}
+
+// Look up `key` in a read-only txn and decode it. Returns nullopt when the
+// cache is not initialized, the key is absent, decoding fails, or the entry
+// is older than config.ttl. Expired entries are NOT deleted here — that is
+// left to clear_expired_entries() — so a shared lock suffices.
+template<typename T>
+std::optional<T> UsageCache::get_stats(MDB_dbi dbi, const std::string& key) {
+  if (!initialized) {
+    return std::nullopt;
+  }
+
+  MDB_val mdb_key = {key.size(), const_cast<char*>(key.data())};
+  MDB_val mdb_val;
+
+  MDB_txn* txn = nullptr;
+  int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 10) << "LMDB txn_begin failed in get_stats: "
+                     << mdb_strerror(rc) << dendl;
+    }
+    return std::nullopt;
+  }
+
+  rc = mdb_get(txn, dbi, &mdb_key, &mdb_val);
+  // Copy out below before aborting would be ideal; here the bytes are
+  // appended into a bufferlist immediately after, while mv_data may already
+  // be invalid once the txn ends — the abort is therefore done first only
+  // for the error path.
+  mdb_txn_abort(txn);
+
+  if (rc != 0) {
+    if (rc != MDB_NOTFOUND && cct) {
+      ldout(cct, 10) << "LMDB get failed for key " << key
+                     << ": " << mdb_strerror(rc) << dendl;
+    }
+    return std::nullopt;
+  }
+
+  bufferlist bl;
+  bl.append(static_cast<char*>(mdb_val.mv_data), mdb_val.mv_size);
+
+  T stats;
+  try {
+    auto iter = bl.cbegin();
+    stats.decode(iter);
+
+    // Check TTL
+    auto now = ceph::real_clock::now();
+    if (now - stats.last_updated > config.ttl) {
+      inc_counter(PERF_CACHE_EXPIRED);
+      return std::nullopt;
+    }
+
+    return stats;
+  } catch (const buffer::error& e) {
+    if (cct) {
+      ldout(cct, 5) << "Failed to decode stats for key " << key
+                    << ": " << e.what() << dendl;
+    }
+    return std::nullopt;
+  }
+}
+
+// Store (overwrite) the usage snapshot for `user_id`, stamped with "now"
+// so the TTL clock restarts. Takes the write lock.
+int UsageCache::update_user_stats(const std::string& user_id,
+                                  uint64_t bytes_used,
+                                  uint64_t num_objects) {
+  std::unique_lock lock(db_mutex);
+
+  UsageStats stats;
+  stats.bytes_used = bytes_used;
+  stats.num_objects = num_objects;
+  stats.last_updated = ceph::real_clock::now();
+
+  int ret = put_stats(user_dbi, user_id, stats);
+  if (ret == 0) {
+    inc_counter(PERF_CACHE_UPDATE);
+    set_counter(PERF_CACHE_SIZE, get_cache_size_internal());
+  }
+
+  return ret;
+}
+
+// Read the cached snapshot for `user_id` (nullopt = miss or expired) and
+// account the hit/miss in both the atomic tallies and the perf counters.
+std::optional<UsageStats> UsageCache::get_user_stats(const std::string& user_id) {
+  std::shared_lock lock(db_mutex);
+  auto result = get_stats<UsageStats>(user_dbi, user_id);
+
+  // Update performance counters
+  if (result.has_value()) {
+    cache_hits++;
+    inc_counter(PERF_CACHE_HIT);
+    inc_counter(PERF_CACHE_USER_HIT);
+  } else {
+    cache_misses++;
+    inc_counter(PERF_CACHE_MISS);
+    inc_counter(PERF_CACHE_USER_MISS);
+  }
+
+  return result;
+}
+
+// Delete the cached snapshot for `user_id`; a missing key (MDB_NOTFOUND)
+// still counts as success.
+int UsageCache::remove_user_stats(const std::string& user_id) {
+  if (!initialized) {
+    return -EINVAL;
+  }
+
+  std::unique_lock lock(db_mutex);
+
+  MDB_val mdb_key = {user_id.size(), const_cast<char*>(user_id.data())};
+
+  MDB_txn* txn = nullptr;
+  int rc = mdb_txn_begin(env, nullptr, 0, &txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB txn_begin failed in remove_user_stats: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    return -EIO;
+  }
+
+  rc = mdb_del(txn, user_dbi, &mdb_key, nullptr);
+  if (rc != 0 && rc != MDB_NOTFOUND) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB del failed for user " << user_id
+                    << ": " << mdb_strerror(rc) << dendl;
+    }
+    mdb_txn_abort(txn);
+    return -EIO;
+  }
+
+  rc = mdb_txn_commit(txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB txn_commit failed in remove_user_stats: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    return -EIO;
+  }
+
+  inc_counter(PERF_CACHE_REMOVE);
+  set_counter(PERF_CACHE_SIZE, get_cache_size_internal());
+
+  return 0;
+}
+
+// Store (overwrite) the usage snapshot for `bucket_name`, stamped with
+// "now" so the TTL clock restarts. Takes the write lock.
+int UsageCache::update_bucket_stats(const std::string& bucket_name,
+                                    uint64_t bytes_used,
+                                    uint64_t num_objects) {
+  std::unique_lock lock(db_mutex);
+
+  UsageStats stats;
+  stats.bytes_used = bytes_used;
+  stats.num_objects = num_objects;
+  stats.last_updated = ceph::real_clock::now();
+
+  int ret = put_stats(bucket_dbi, bucket_name, stats);
+  if (ret == 0) {
+    inc_counter(PERF_CACHE_UPDATE);
+    set_counter(PERF_CACHE_SIZE, get_cache_size_internal());
+  }
+
+  return ret;
+}
+
+// Read the cached snapshot for `bucket_name` (nullopt = miss or expired)
+// and account the hit/miss in both the tallies and the perf counters.
+std::optional<UsageStats> UsageCache::get_bucket_stats(const std::string& bucket_name) {
+  std::shared_lock lock(db_mutex);
+  auto result = get_stats<UsageStats>(bucket_dbi, bucket_name);
+
+  // Update performance counters
+  if (result.has_value()) {
+    cache_hits++;
+    inc_counter(PERF_CACHE_HIT);
+    inc_counter(PERF_CACHE_BUCKET_HIT);
+  } else {
+    cache_misses++;
+    inc_counter(PERF_CACHE_MISS);
+    inc_counter(PERF_CACHE_BUCKET_MISS);
+  }
+
+  return result;
+}
+
+// Delete the cached snapshot for `bucket_name`; a missing key
+// (MDB_NOTFOUND) still counts as success.
+int UsageCache::remove_bucket_stats(const std::string& bucket_name) {
+  if (!initialized) {
+    return -EINVAL;
+  }
+
+  std::unique_lock lock(db_mutex);
+
+  MDB_val mdb_key = {bucket_name.size(), const_cast<char*>(bucket_name.data())};
+
+  MDB_txn* txn = nullptr;
+  int rc = mdb_txn_begin(env, nullptr, 0, &txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB txn_begin failed in remove_bucket_stats: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    return -EIO;
+  }
+
+  rc = mdb_del(txn, bucket_dbi, &mdb_key, nullptr);
+  if (rc != 0 && rc != MDB_NOTFOUND) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB del failed for bucket " << bucket_name
+                    << ": " << mdb_strerror(rc) << dendl;
+    }
+    mdb_txn_abort(txn);
+    return -EIO;
+  }
+
+  rc = mdb_txn_commit(txn);
+  if (rc != 0) {
+    if (cct) {
+      ldout(cct, 5) << "LMDB txn_commit failed in remove_bucket_stats: "
+                    << mdb_strerror(rc) << dendl;
+    }
+    return -EIO;
+  }
+
+  inc_counter(PERF_CACHE_REMOVE);
+  set_counter(PERF_CACHE_SIZE, get_cache_size_internal());
+
+  return 0;
+}
+
+// Walk both databases and delete every entry older than config.ttl.
+// Returns the number of entries removed (>= 0) or -EINVAL when not
+// initialized. Holds the write lock for the whole sweep.
+int UsageCache::clear_expired_entries() {
+  if (!initialized) {
+    return -EINVAL;
+  }
+
+  std::unique_lock lock(db_mutex);
+
+  auto now = ceph::real_clock::now();
+  int total_removed = 0;
+
+  // Helper lambda to clear expired entries from a database
+  auto clear_db = [this, &now](MDB_dbi dbi) -> int {
+    MDB_txn* txn = nullptr;
+    MDB_cursor* cursor = nullptr;
+
+    int rc = mdb_txn_begin(env, nullptr, 0, &txn);
+    if (rc != 0) {
+      if (cct) {
+        ldout(cct, 5) << "LMDB txn_begin failed in clear_expired_entries: "
+                      << mdb_strerror(rc) << dendl;
+      }
+      return -EIO;
+    }
+
+    rc = mdb_cursor_open(txn, dbi, &cursor);
+    if (rc != 0) {
+      if (cct) {
+        ldout(cct, 5) << "LMDB cursor_open failed: " << mdb_strerror(rc) << dendl;
+      }
+      mdb_txn_abort(txn);
+      return -EIO;
+    }
+
+    MDB_val key, val;
+    int removed = 0;
+
+    // mdb_cursor_del() deletes the record the cursor currently points at;
+    // the subsequent MDB_NEXT continues from the following record.
+    while (mdb_cursor_get(cursor, &key, &val, MDB_NEXT) == 0) {
+      bufferlist bl;
+      bl.append(static_cast<char*>(val.mv_data), val.mv_size);
+
+      try {
+        UsageStats stats;
+        auto iter = bl.cbegin();
+        stats.decode(iter);
+
+        if (now - stats.last_updated > config.ttl) {
+          mdb_cursor_del(cursor, 0);
+          removed++;
+          inc_counter(PERF_CACHE_EXPIRED);
+        }
+      } catch (const buffer::error& e) {
+        // Skip malformed entries
+        if (cct) {
+          ldout(cct, 10) << "Skipping malformed entry: " << e.what() << dendl;
+        }
+      }
+    }
+
+    mdb_cursor_close(cursor);
+
+    rc = mdb_txn_commit(txn);
+    if (rc != 0) {
+      if (cct) {
+        ldout(cct, 5) << "LMDB txn_commit failed in clear_expired_entries: "
+                      << mdb_strerror(rc) << dendl;
+      }
+      return -EIO;
+    }
+
+    return removed;
+  };
+
+  // Per-DB failures are swallowed here: a failed sweep of one DB does not
+  // prevent sweeping the other, and the count simply omits it.
+  int ret = clear_db(user_dbi);
+  if (ret >= 0) {
+    total_removed += ret;
+  }
+
+  ret = clear_db(bucket_dbi);
+  if (ret >= 0) {
+    total_removed += ret;
+  }
+
+  set_counter(PERF_CACHE_SIZE, get_cache_size_internal());
+
+  if (cct) {
+    ldout(cct, 10) << "Cleared " << total_removed << " expired cache entries" << dendl;
+  }
+
+  return total_removed;
+}
+
+// Public entry point: takes the shared lock, then delegates.
+size_t UsageCache::get_cache_size() const {
+  if (!initialized) {
+    return 0;
+  }
+
+  std::shared_lock lock(db_mutex);
+  return get_cache_size_internal();
+}
+
+// Sum of entry counts across both named DBs, via a read-only txn.
+// Caller must already hold db_mutex (shared or exclusive); returns 0 on
+// any failure rather than propagating an error.
+size_t UsageCache::get_cache_size_internal() const {
+  if (!initialized) {
+    return 0;
+  }
+
+  MDB_stat stat;
+  MDB_txn* txn = nullptr;
+
+  int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn);
+  if (rc != 0) {
+    return 0;
+  }
+
+  size_t total = 0;
+
+  if (mdb_stat(txn, user_dbi, &stat) == 0) {
+    total += stat.ms_entries;
+  }
+
+  if (mdb_stat(txn, bucket_dbi, &stat) == 0) {
+    total += stat.ms_entries;
+  }
+
+  // Read-only txn: abort is the normal way to end it.
+  mdb_txn_abort(txn);
+
+  return total;
+}
+
+// Lifetime hit count (atomic; independent of the perf counter framework).
+uint64_t UsageCache::get_cache_hits() const {
+  return cache_hits.load();
+}
+
+// Lifetime miss count.
+uint64_t UsageCache::get_cache_misses() const {
+  return cache_misses.load();
+}
+
+// Hit rate as a percentage in [0, 100]; 0 when no lookups have happened.
+// The two loads are not a single snapshot, so the value is approximate
+// under concurrent lookups.
+double UsageCache::get_hit_rate() const {
+  uint64_t hits = cache_hits.load();
+  uint64_t misses = cache_misses.load();
+  uint64_t total = hits + misses;
+
+  return (total > 0) ? (double)hits / total * 100.0 : 0.0;
+}
+
+// Explicit template instantiations
+template int UsageCache::put_stats(MDB_dbi, const std::string&, const UsageStats&);
+template std::optional<UsageStats> UsageCache::get_stats(MDB_dbi, const std::string&);
+
+} // namespace rgw
\ No newline at end of file
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <string>
+#include <memory>
+#include <optional>
+#include <atomic>
+#include <shared_mutex>
+#include <chrono>
+#include <lmdb.h>
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "common/ceph_time.h"
+
+// Forward declarations
+class CephContext;
+class PerfCounters;
+
+namespace rgw {
+
+// A single cached usage snapshot: how much data an owner (user or bucket)
+// holds and when the snapshot was taken (used for TTL expiry).
+struct UsageStats {
+  uint64_t bytes_used = 0;    // zero-init: never read uninitialized memory
+  uint64_t num_objects = 0;
+  ceph::real_time last_updated;
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::const_iterator& bl);
+};
+// WRITE_CLASS_ENCODER must be invoked at namespace scope, after the type:
+// it generates the free encode()/decode() overloads that ceph::encode()
+// finds via ADL. Inside the struct it would declare useless member
+// overloads instead.
+WRITE_CLASS_ENCODER(UsageStats)
+
+// Persistent, TTL-bounded cache of per-user and per-bucket usage stats,
+// backed by a single LMDB file with two named databases ("user_stats",
+// "bucket_stats"). Reads take a shared lock on db_mutex, writes an
+// exclusive lock; hit/miss tallies are lock-free atomics.
+class UsageCache {
+public:
+  struct Config {
+    std::string db_path;        // LMDB file path (MDB_NOSUBDIR layout)
+    size_t max_db_size;         // LMDB map size: hard cap on DB growth
+    uint32_t max_readers;       // LMDB reader-table slots
+    std::chrono::seconds ttl;   // entries older than this read as misses
+
+    Config()
+      : db_path("/var/lib/ceph/radosgw/usage_cache.mdb"),
+        max_db_size(1 << 30), // 1GB default
+        max_readers(126),
+        ttl(300) {} // 5 min TTL
+  };
+
+  explicit UsageCache(const Config& config);
+  UsageCache(CephContext* cct, const Config& config);
+  ~UsageCache();
+
+  // Move semantics
+  UsageCache(UsageCache&& other) noexcept;
+  UsageCache& operator=(UsageCache&& other) noexcept;
+
+  // Delete copy semantics
+  UsageCache(const UsageCache&) = delete;
+  UsageCache& operator=(const UsageCache&) = delete;
+
+  // Lifecycle
+  int init();
+  void shutdown();
+
+  // User stats operations (non-const to update counters)
+  int update_user_stats(const std::string& user_id,
+                        uint64_t bytes_used,
+                        uint64_t num_objects);
+  std::optional<UsageStats> get_user_stats(const std::string& user_id);
+  int remove_user_stats(const std::string& user_id);
+
+  // Bucket stats operations (non-const to update counters)
+  int update_bucket_stats(const std::string& bucket_name,
+                          uint64_t bytes_used,
+                          uint64_t num_objects);
+  std::optional<UsageStats> get_bucket_stats(const std::string& bucket_name);
+  int remove_bucket_stats(const std::string& bucket_name);
+
+  // Maintenance
+  int clear_expired_entries();
+  size_t get_cache_size() const;
+
+  // Performance metrics
+  uint64_t get_cache_hits() const;
+  uint64_t get_cache_misses() const;
+  double get_hit_rate() const;
+
+private:
+  // Database operations
+  int open_database();
+  void close_database();
+
+  template<typename T>
+  int put_stats(MDB_dbi dbi, const std::string& key, const T& stats);
+
+  template<typename T>
+  std::optional<T> get_stats(MDB_dbi dbi, const std::string& key);
+
+  // Performance counter helpers
+  void init_perf_counters();
+  void cleanup_perf_counters();
+  void inc_counter(int counter, uint64_t amount = 1);
+  void set_counter(int counter, uint64_t value);
+
+  // Internal helper for cache size (assumes lock is held)
+  size_t get_cache_size_internal() const;
+
+  Config config;
+  MDB_env* env = nullptr;
+  MDB_dbi user_dbi = 0;
+  MDB_dbi bucket_dbi = 0;
+
+  mutable std::shared_mutex db_mutex;
+  std::atomic<bool> initialized{false};
+
+  // Performance counters
+  CephContext* cct;
+  PerfCounters* perf_counters;
+
+  // Mutable atomic counters for thread-safe statistics
+  mutable std::atomic<uint64_t> cache_hits{0};
+  mutable std::atomic<uint64_t> cache_misses{0};
+};
+
+} // namespace rgw
\ No newline at end of file
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_usage_perf.h"
+#include "common/ceph_context.h"
+#include "common/perf_counters.h"
+#include "common/dout.h"
+#include "common/perf_counters_collection.h"
+#include "common/errno.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace rgw {
+
+// Global singleton
+// NOTE(review): plain pointer with no synchronization — this assumes the
+// singleton is installed once during startup, before worker threads read
+// it; confirm against the RGW init path.
+static UsagePerfCounters* g_usage_perf_counters = nullptr;
+
+// Return the process-wide instance (nullptr until set_usage_perf_counters).
+UsagePerfCounters* get_usage_perf_counters() {
+  return g_usage_perf_counters;
+}
+
+// Install (or clear, by passing nullptr) the process-wide instance.
+void set_usage_perf_counters(UsagePerfCounters* counters) {
+  g_usage_perf_counters = counters;
+}
+
+// Construct with an owned UsageCache built from cache_config, and
+// register the global "rgw_usage" counter set immediately.
+UsagePerfCounters::UsagePerfCounters(CephContext* cct,
+                                     const UsageCache::Config& cache_config)
+  : cct(cct), cache(std::make_unique<UsageCache>(cct, cache_config)),
+    global_counters(nullptr) {
+  create_global_counters();
+}
+
+// Tears everything down via shutdown(): joins the cleanup thread and
+// unregisters all counter sets.
+UsagePerfCounters::~UsagePerfCounters() {
+  shutdown();
+}
+
+// Build and register the process-wide "rgw_usage" counter set.  The
+// per-user/per-bucket indices are added here only to fill out the
+// [first, last) index range; they are never updated on this set — only
+// the cache_* counters below are incremented globally.
+void UsagePerfCounters::create_global_counters() {
+  PerfCountersBuilder b(cct, "rgw_usage", l_rgw_usage_first, l_rgw_usage_last);
+
+  // Placeholder counters for indices that aren't globally used
+  b.add_u64(l_rgw_user_used_bytes, "user_used_bytes",
+            "User bytes placeholder", nullptr, 0, unit_t(UNIT_BYTES));
+  b.add_u64(l_rgw_user_num_objects, "user_num_objects",
+            "User objects placeholder", nullptr, 0, unit_t(0));
+  b.add_u64(l_rgw_bucket_used_bytes, "bucket_used_bytes",
+            "Bucket bytes placeholder", nullptr, 0, unit_t(UNIT_BYTES));
+  b.add_u64(l_rgw_bucket_num_objects, "bucket_num_objects",
+            "Bucket objects placeholder", nullptr, 0, unit_t(0));
+
+  // Global cache metrics
+  b.add_u64_counter(l_rgw_usage_cache_hit, "cache_hit",
+                    "Number of cache hits", nullptr, 0, unit_t(0));
+  b.add_u64_counter(l_rgw_usage_cache_miss, "cache_miss",
+                    "Number of cache misses", nullptr, 0, unit_t(0));
+  b.add_u64_counter(l_rgw_usage_cache_update, "cache_update",
+                    "Number of cache updates", nullptr, 0, unit_t(0));
+  b.add_u64_counter(l_rgw_usage_cache_evict, "cache_evict",
+                    "Number of cache evictions", nullptr, 0, unit_t(0));
+
+  global_counters = b.create_perf_counters();
+  cct->get_perfcounters_collection()->add(global_counters);
+}
+
+// Build and register a dedicated counter set for a single user.
+// Caller holds counters_mutex; the returned pointer is owned by
+// user_perf_counters and freed in shutdown().
+PerfCounters* UsagePerfCounters::create_user_counters(const std::string& user_id) {
+  std::string name = "rgw_user_" + user_id;
+
+  // Sanitize name for perf counters (replace non-alphanumeric with
+  // underscore).  Cast to unsigned char: std::isalnum on a negative
+  // char (e.g. UTF-8 bytes in the user id) is undefined behavior.
+  for (char& c : name) {
+    if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_') {
+      c = '_';
+    }
+  }
+
+  PerfCountersBuilder b(cct, name, l_rgw_usage_first, l_rgw_usage_last);
+
+  // Every index in (first, last) must be populated.  The builder keeps
+  // the name pointer it is given and the formatted dump is keyed by
+  // name, so fillers need unique names with static storage duration
+  // (a shared "placeholder" name would produce duplicate dump keys).
+  static const char* const filler_names[] = {
+    "placeholder_1", "placeholder_2", "placeholder_3", "placeholder_4",
+    "placeholder_5", "placeholder_6", "placeholder_7", "placeholder_8",
+  };
+
+  // Fillers for indices below the user counters (none with the current
+  // enum layout, kept for safety if the enum grows).
+  for (int i = l_rgw_usage_first + 1; i < l_rgw_user_used_bytes; ++i) {
+    b.add_u64(i, filler_names[i - l_rgw_usage_first - 1],
+              "placeholder", nullptr, 0, unit_t(0));
+  }
+
+  b.add_u64(l_rgw_user_used_bytes, "used_bytes",
+            "Bytes used by user", nullptr, 0, unit_t(UNIT_BYTES));
+  b.add_u64(l_rgw_user_num_objects, "num_objects",
+            "Number of objects owned by user", nullptr, 0, unit_t(0));
+
+  // Fillers for the remaining (bucket/cache) indices.
+  for (int i = l_rgw_user_num_objects + 1; i < l_rgw_usage_last; ++i) {
+    b.add_u64(i, filler_names[i - l_rgw_usage_first - 1],
+              "placeholder", nullptr, 0, unit_t(0));
+  }
+
+  PerfCounters* counters = b.create_perf_counters();
+  cct->get_perfcounters_collection()->add(counters);
+
+  return counters;
+}
+
+// Build and register a dedicated counter set for a single bucket.
+// Caller holds counters_mutex; the returned pointer is owned by
+// bucket_perf_counters and freed in shutdown().
+PerfCounters* UsagePerfCounters::create_bucket_counters(const std::string& bucket_name) {
+  std::string name = "rgw_bucket_" + bucket_name;
+
+  // Sanitize name for perf counters.  Cast to unsigned char:
+  // std::isalnum on a negative char is undefined behavior.
+  for (char& c : name) {
+    if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_') {
+      c = '_';
+    }
+  }
+
+  PerfCountersBuilder b(cct, name, l_rgw_usage_first, l_rgw_usage_last);
+
+  // Every index in (first, last) must be populated; fillers need unique,
+  // statically-stored names (the builder keeps the raw pointer and the
+  // dump is keyed by name).
+  static const char* const filler_names[] = {
+    "placeholder_1", "placeholder_2", "placeholder_3", "placeholder_4",
+    "placeholder_5", "placeholder_6", "placeholder_7", "placeholder_8",
+  };
+
+  // Fillers for the user-counter indices that precede the bucket ones.
+  for (int i = l_rgw_usage_first + 1; i < l_rgw_bucket_used_bytes; ++i) {
+    b.add_u64(i, filler_names[i - l_rgw_usage_first - 1],
+              "placeholder", nullptr, 0, unit_t(0));
+  }
+
+  b.add_u64(l_rgw_bucket_used_bytes, "used_bytes",
+            "Bytes used in bucket", nullptr, 0, unit_t(UNIT_BYTES));
+  b.add_u64(l_rgw_bucket_num_objects, "num_objects",
+            "Number of objects in bucket", nullptr, 0, unit_t(0));
+
+  // Fillers for the trailing cache-counter indices.
+  for (int i = l_rgw_bucket_num_objects + 1; i < l_rgw_usage_last; ++i) {
+    b.add_u64(i, filler_names[i - l_rgw_usage_first - 1],
+              "placeholder", nullptr, 0, unit_t(0));
+  }
+
+  PerfCounters* counters = b.create_perf_counters();
+  cct->get_perfcounters_collection()->add(counters);
+
+  return counters;
+}
+
+// Background thread body: periodically purges expired cache entries.
+// Sleeps in 1-second slices so a shutdown request is observed within
+// about a second rather than after a full cleanup_interval.
+void UsagePerfCounters::cleanup_worker() {
+  ldout(cct, 10) << "Starting usage cache cleanup worker thread" << dendl;
+
+  while (!shutdown_flag.load()) {
+    // Sleep with periodic checks for shutdown
+    for (int i = 0; i < cleanup_interval.count(); ++i) {
+      if (shutdown_flag.load()) {
+        break;
+      }
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+
+    // Skip the sweep if we woke because of shutdown, not the timer.
+    if (!shutdown_flag.load()) {
+      cleanup_expired_entries();
+    }
+  }
+
+  ldout(cct, 10) << "Usage cache cleanup worker thread exiting" << dendl;
+}
+
+// Bring up the backing LMDB cache; returns the cache's (negative)
+// error code on failure, 0 on success.
+int UsagePerfCounters::init() {
+  if (const int r = cache->init(); r < 0) {
+    ldout(cct, 0) << "Failed to initialize usage cache: " << cpp_strerror(-r) << dendl;
+    return r;
+  }
+
+  ldout(cct, 10) << "Usage performance counters initialized successfully" << dendl;
+  return 0;
+}
+
+// Launch the background cleanup thread.  Idempotent: a second call
+// while the thread is running is a no-op, and start() after stop()
+// works because the shutdown flag is re-armed here.
+void UsagePerfCounters::start() {
+  ldout(cct, 10) << "Starting usage perf counters" << dendl;
+
+  // Guard against double-start: assigning over a joinable std::thread
+  // calls std::terminate.
+  if (cleanup_thread.joinable()) {
+    return;
+  }
+
+  // Re-arm the flag so a start() following a stop() actually runs.
+  shutdown_flag = false;
+  cleanup_thread = std::thread(&UsagePerfCounters::cleanup_worker, this);
+}
+
+// Signal the cleanup thread to exit and wait for it.  Safe to call
+// when the thread was never started.
+void UsagePerfCounters::stop() {
+  ldout(cct, 10) << "Stopping usage perf counters" << dendl;
+
+  // Stop cleanup thread
+  shutdown_flag = true;
+  if (cleanup_thread.joinable()) {
+    cleanup_thread.join();
+  }
+}
+
+// Stop the cleanup thread, then unregister and free every perf counter
+// set (per-user, per-bucket, global) before shutting down the cache.
+// A second call finds empty maps and a null global_counters pointer;
+// NOTE(review): relies on cache->shutdown() tolerating repeat calls —
+// confirm in rgw_usage_cache.cc.
+void UsagePerfCounters::shutdown() {
+  stop();
+
+  // Clean up perf counters
+  {
+    std::unique_lock lock(counters_mutex);
+
+    auto* collection = cct->get_perfcounters_collection();
+
+    // Remove and delete user counters
+    for (auto& [_, counters] : user_perf_counters) {
+      collection->remove(counters);
+      delete counters;
+    }
+    user_perf_counters.clear();
+
+    // Remove and delete bucket counters
+    for (auto& [_, counters] : bucket_perf_counters) {
+      collection->remove(counters);
+      delete counters;
+    }
+    bucket_perf_counters.clear();
+
+    // Remove global counters
+    if (global_counters) {
+      collection->remove(global_counters);
+      delete global_counters;
+      global_counters = nullptr;
+    }
+  }
+
+  // Shutdown cache
+  cache->shutdown();
+
+  ldout(cct, 10) << "Usage perf counters shutdown complete" << dendl;
+}
+
+// Record new absolute totals for a user: optionally persist them to the
+// LMDB cache, then publish them on the user's (lazily created) perf
+// counter set.
+void UsagePerfCounters::update_user_stats(const std::string& user_id,
+                                          uint64_t bytes_used,
+                                          uint64_t num_objects,
+                                          bool update_cache) {
+  // Update cache if requested; failures are logged but do not prevent
+  // the in-memory counters from being refreshed.
+  if (update_cache && cache) {
+    int ret = cache->update_user_stats(user_id, bytes_used, num_objects);
+    if (ret == 0) {
+      global_counters->inc(l_rgw_usage_cache_update);
+    } else {
+      ldout(cct, 5) << "Failed to update user cache for " << user_id
+                    << ": " << cpp_strerror(-ret) << dendl;
+    }
+  }
+
+  // Update or create perf counters
+  {
+    std::unique_lock lock(counters_mutex);
+
+    // try_emplace does a single lookup, replacing the original
+    // find / insert / find sequence (three hash lookups per update).
+    auto [it, inserted] = user_perf_counters.try_emplace(user_id, nullptr);
+    if (inserted) {
+      it->second = create_user_counters(user_id);
+    }
+
+    it->second->set(l_rgw_user_used_bytes, bytes_used);
+    it->second->set(l_rgw_user_num_objects, num_objects);
+  }
+
+  ldout(cct, 20) << "Updated user stats: " << user_id
+                 << " bytes=" << bytes_used
+                 << " objects=" << num_objects << dendl;
+}
+
+// Record new absolute totals for a bucket: optionally persist them to
+// the LMDB cache, then publish them on the bucket's (lazily created)
+// perf counter set.
+void UsagePerfCounters::update_bucket_stats(const std::string& bucket_name,
+                                            uint64_t bytes_used,
+                                            uint64_t num_objects,
+                                            bool update_cache) {
+  // Update cache if requested; failures are logged but do not prevent
+  // the in-memory counters from being refreshed.
+  if (update_cache && cache) {
+    int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects);
+    if (ret == 0) {
+      global_counters->inc(l_rgw_usage_cache_update);
+    } else {
+      ldout(cct, 5) << "Failed to update bucket cache for " << bucket_name
+                    << ": " << cpp_strerror(-ret) << dendl;
+    }
+  }
+
+  // Update or create perf counters
+  {
+    std::unique_lock lock(counters_mutex);
+
+    // try_emplace does a single lookup, replacing the original
+    // find / insert / find sequence (three hash lookups per update).
+    auto [it, inserted] = bucket_perf_counters.try_emplace(bucket_name, nullptr);
+    if (inserted) {
+      it->second = create_bucket_counters(bucket_name);
+    }
+
+    it->second->set(l_rgw_bucket_used_bytes, bytes_used);
+    it->second->set(l_rgw_bucket_num_objects, num_objects);
+  }
+
+  ldout(cct, 20) << "Updated bucket stats: " << bucket_name
+                 << " bytes=" << bytes_used
+                 << " objects=" << num_objects << dendl;
+}
+
+// Re-read user and/or bucket totals from the persistent cache and push
+// them onto the perf counters.  Empty names are skipped; each lookup
+// is accounted as a hit or a miss on the global counter set.
+void UsagePerfCounters::refresh_from_cache(const std::string& user_id,
+                                           const std::string& bucket_name) {
+  if (!cache) {
+    return;
+  }
+
+  // User side, if a user id was supplied.
+  if (!user_id.empty()) {
+    if (auto stats = cache->get_user_stats(user_id); stats) {
+      global_counters->inc(l_rgw_usage_cache_hit);
+      // Counters only — don't write the values straight back into the cache.
+      update_user_stats(user_id, stats->bytes_used, stats->num_objects, false);
+    } else {
+      global_counters->inc(l_rgw_usage_cache_miss);
+    }
+  }
+
+  // Bucket side, if a bucket name was supplied.
+  if (!bucket_name.empty()) {
+    if (auto stats = cache->get_bucket_stats(bucket_name); stats) {
+      global_counters->inc(l_rgw_usage_cache_hit);
+      update_bucket_stats(bucket_name, stats->bytes_used, stats->num_objects, false);
+    } else {
+      global_counters->inc(l_rgw_usage_cache_miss);
+    }
+  }
+}
+
+// Drop the named user and/or bucket entry from the persistent cache,
+// counting one eviction per removal.  Empty names are skipped.
+void UsagePerfCounters::evict_from_cache(const std::string& user_id,
+                                         const std::string& bucket_name) {
+  if (!cache) {
+    return;
+  }
+
+  if (!user_id.empty()) {
+    cache->remove_user_stats(user_id);
+    global_counters->inc(l_rgw_usage_cache_evict);
+  }
+
+  if (!bucket_name.empty()) {
+    cache->remove_bucket_stats(bucket_name);
+    global_counters->inc(l_rgw_usage_cache_evict);
+  }
+}
+
+// Look up a user's cached totals, accounting the outcome on the global
+// hit/miss counters.  Returns std::nullopt when no cache is attached
+// or the entry is absent/expired.
+std::optional<UsageStats> UsagePerfCounters::get_user_stats(const std::string& user_id) {
+  if (!cache) {
+    return std::nullopt;
+  }
+
+  auto result = cache->get_user_stats(user_id);
+  global_counters->inc(result ? l_rgw_usage_cache_hit
+                              : l_rgw_usage_cache_miss);
+  return result;
+}
+
+// Look up a bucket's cached totals, accounting the outcome on the
+// global hit/miss counters.  Returns std::nullopt when no cache is
+// attached or the entry is absent/expired.
+std::optional<UsageStats> UsagePerfCounters::get_bucket_stats(const std::string& bucket_name) {
+  if (!cache) {
+    return std::nullopt;
+  }
+
+  auto result = cache->get_bucket_stats(bucket_name);
+  global_counters->inc(result ? l_rgw_usage_cache_hit
+                              : l_rgw_usage_cache_miss);
+  return result;
+}
+
+// Purge expired entries from the cache, logging the count when any
+// were removed.  No-op without an attached cache.
+void UsagePerfCounters::cleanup_expired_entries() {
+  if (!cache) {
+    return;
+  }
+
+  const int removed = cache->clear_expired_entries();
+  if (removed > 0) {
+    ldout(cct, 10) << "Cleaned up " << removed << " expired cache entries" << dendl;
+  }
+}
+
+// Number of entries currently held in the backing cache (0 when no
+// cache is attached).
+size_t UsagePerfCounters::get_cache_size() const {
+  if (!cache) {
+    return 0;
+  }
+  return cache->get_cache_size();
+}
+
+} // namespace rgw
\ No newline at end of file
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <shared_mutex>
+#include <thread>
+#include <atomic>
+#include <chrono>
+
+#include "common/perf_counters.h"
+#include "rgw_usage_cache.h"
+
+class CephContext;
+
+namespace rgw {
+
+// Performance counter indices, shared by the global, per-user and
+// per-bucket counter sets.  Indices not used by a particular set are
+// filled with placeholder counters (see rgw_usage_perf.cc).
+enum {
+  l_rgw_usage_first = 920000,
+  l_rgw_user_used_bytes,     // bytes owned by a user
+  l_rgw_user_num_objects,    // objects owned by a user
+  l_rgw_bucket_used_bytes,   // bytes stored in a bucket
+  l_rgw_bucket_num_objects,  // objects stored in a bucket
+  l_rgw_usage_cache_hit,     // usage-cache lookup hits (global set)
+  l_rgw_usage_cache_miss,    // usage-cache lookup misses (global set)
+  l_rgw_usage_cache_update,  // successful cache writes (global set)
+  l_rgw_usage_cache_evict,   // explicit cache evictions (global set)
+  l_rgw_usage_last
+};
+
+// Publishes usage statistics through the Ceph perf counter framework:
+// one global "rgw_usage" set plus a lazily-created set per user and
+// per bucket, all backed by a persistent UsageCache.  A background
+// thread started via start() periodically clears expired cache entries.
+class UsagePerfCounters {
+private:
+  CephContext* cct;
+  std::unique_ptr<UsageCache> cache;  // persistent LMDB-backed stats store
+
+  // Guards the two counter maps below.
+  mutable std::shared_mutex counters_mutex;
+
+  // Track raw pointers for proper cleanup
+  std::unordered_map<std::string, PerfCounters*> user_perf_counters;
+  std::unordered_map<std::string, PerfCounters*> bucket_perf_counters;
+
+  PerfCounters* global_counters;
+
+  // Cleanup thread management
+  std::thread cleanup_thread;
+  std::atomic<bool> shutdown_flag{false};
+  std::chrono::seconds cleanup_interval{300}; // 5 minutes
+
+  void create_global_counters();
+  PerfCounters* create_user_counters(const std::string& user_id);
+  PerfCounters* create_bucket_counters(const std::string& bucket_name);
+
+  // Body of the background cleanup thread.
+  void cleanup_worker();
+
+public:
+  explicit UsagePerfCounters(CephContext* cct,
+                             const UsageCache::Config& cache_config);
+  explicit UsagePerfCounters(CephContext* cct)
+    : UsagePerfCounters(cct, UsageCache::Config{}) {}
+  ~UsagePerfCounters();
+
+  // Lifecycle management: init() opens the cache, start()/stop()
+  // control the cleanup thread, shutdown() releases everything.
+  int init();
+  void start();
+  void stop();
+  void shutdown();
+
+  // User stats updates (values are absolute totals, not deltas)
+  void update_user_stats(const std::string& user_id,
+                         uint64_t bytes_used,
+                         uint64_t num_objects,
+                         bool update_cache = true);
+
+  // Bucket stats updates (values are absolute totals, not deltas)
+  void update_bucket_stats(const std::string& bucket_name,
+                           uint64_t bytes_used,
+                           uint64_t num_objects,
+                           bool update_cache = true);
+
+  // Cache operations; empty strings skip the corresponding side.
+  void refresh_from_cache(const std::string& user_id,
+                          const std::string& bucket_name);
+  void evict_from_cache(const std::string& user_id,
+                        const std::string& bucket_name);
+
+  // Stats retrieval (from cache)
+  std::optional<UsageStats> get_user_stats(const std::string& user_id);
+  std::optional<UsageStats> get_bucket_stats(const std::string& bucket_name);
+
+  // Maintenance
+  void cleanup_expired_entries();
+  size_t get_cache_size() const;
+
+  // Set cleanup interval
+  // NOTE(review): not synchronized with a running cleanup thread —
+  // intended to be called before start(); confirm no caller changes it
+  // while the thread is active.
+  void set_cleanup_interval(std::chrono::seconds interval) {
+    cleanup_interval = interval;
+  }
+};
+
+// Global singleton access
+UsagePerfCounters* get_usage_perf_counters();
+void set_usage_perf_counters(UsagePerfCounters* counters);
+
+} // namespace rgw
\ No newline at end of file
add_library(kafka_stub STATIC ${kafka_stub_src})
endif()
+# Find LMDB if not already found
+if(NOT LMDB_FOUND)
+  find_package(PkgConfig QUIET)
+  if(PkgConfig_FOUND)
+    pkg_check_modules(LMDB QUIET lmdb)
+    if(LMDB_FOUND)
+      # pkg_check_modules() sets the plural LMDB_INCLUDE_DIRS; the rgw
+      # targets consume the singular LMDB_INCLUDE_DIR, so normalize it
+      # here or the include path is silently empty on the pkg-config path.
+      set(LMDB_INCLUDE_DIR ${LMDB_INCLUDE_DIRS})
+    endif()
+  endif()
+
+  if(NOT LMDB_FOUND)
+    find_path(LMDB_INCLUDE_DIR NAMES lmdb.h
+      PATHS /usr/include /usr/local/include)
+    find_library(LMDB_LIBRARIES NAMES lmdb
+      PATHS /usr/lib /usr/local/lib /usr/lib64 /usr/local/lib64)
+
+    if(LMDB_INCLUDE_DIR AND LMDB_LIBRARIES)
+      set(LMDB_FOUND TRUE)
+      message(STATUS "Found LMDB: ${LMDB_LIBRARIES}")
+    else()
+      message(FATAL_ERROR "LMDB not found. Please install liblmdb-dev or lmdb-devel")
+    endif()
+  endif()
+endif()
if(WITH_RADOSGW_LUA_PACKAGES)
list(APPEND rgw_libs Boost::filesystem)
endif()
${rgw_libs})
endif()
+# Adding the usage cache unit test
+if(WITH_TESTS)
+
+  add_executable(unittest_rgw_usage_cache
+    test_rgw_usage_cache.cc
+    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_cache.cc
+    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_perf.cc)
+
+  target_include_directories(unittest_rgw_usage_cache PRIVATE
+    ${CMAKE_SOURCE_DIR}/src
+    ${CMAKE_SOURCE_DIR}/src/rgw
+    ${LMDB_INCLUDE_DIR}
+  )
+
+  target_link_libraries(unittest_rgw_usage_cache
+    global
+    gtest
+    gtest_main
+    gmock_main
+    gmock
+    ${LMDB_LIBRARIES}
+    ceph-common
+    ${CMAKE_DL_LIBS}
+  )
+
+  add_ceph_unittest(unittest_rgw_usage_cache)
+
+  # Test for usage perf counters
+  add_executable(unittest_rgw_usage_perf_counters
+    test_rgw_usage_perf_counters.cc
+    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_cache.cc
+    ${CMAKE_SOURCE_DIR}/src/rgw/rgw_usage_perf.cc)
+
+  # Match the include paths of the first test binary so the rgw headers
+  # and lmdb.h resolve the same way.
+  target_include_directories(unittest_rgw_usage_perf_counters PRIVATE
+    ${CMAKE_SOURCE_DIR}/src
+    ${CMAKE_SOURCE_DIR}/src/rgw
+    ${LMDB_INCLUDE_DIR}
+  )
+
+  # Use the discovered LMDB library rather than a hard-coded "lmdb"
+  # link name, consistent with the first test target.
+  target_link_libraries(unittest_rgw_usage_perf_counters
+    ${UNITTEST_LIBS}
+    global
+    ceph-common
+    ${LMDB_LIBRARIES})
+
+  add_ceph_unittest(unittest_rgw_usage_perf_counters)
+endif()
+
if(WITH_RADOSGW_RADOS)
add_executable(unittest_rgw_lua test_rgw_lua.cc)
add_ceph_unittest(unittest_rgw_lua)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw/rgw_usage_cache.h"
+#include "global/global_init.h"
+#include "common/ceph_context.h"
+#include <gtest/gtest.h>
+#include <memory>
+#include <string>
+#include <chrono>
+#include <thread>
+#include <vector>
+#include <iostream>
+#include <atomic>
+#include <filesystem>
+#include <random>
+
+namespace fs = std::filesystem;
+
+// Global CephContext pointer for tests
+static CephContext* g_test_context = nullptr;
+
+// Test fixture for RGWUsageCache
+// Builds a fresh cache per test against a uniquely-named LMDB file in
+// /tmp (randomized so parallel test runs don't collide), with a short
+// 2-second TTL so expiry tests finish quickly.
+class TestRGWUsageCache : public ::testing::Test {
+protected:
+  std::unique_ptr<rgw::UsageCache> cache;
+  rgw::UsageCache::Config config;
+  std::string test_db_path;
+
+  void SetUp() override {
+    // Generate unique test database path to avoid conflicts
+    std::random_device rd;
+    std::mt19937 gen(rd());
+    std::uniform_int_distribution<> dis(1000000, 9999999);
+    test_db_path = "/tmp/test_usage_cache_" + std::to_string(dis(gen)) + ".mdb";
+
+    // Initialize config with proper fields
+    config.db_path = test_db_path;
+    config.max_db_size = 1 << 20; // 1MB for testing
+    config.max_readers = 10;
+    config.ttl = std::chrono::seconds(2); // 2 second TTL for testing
+
+    // Create cache with global CephContext if available, otherwise without
+    if (g_test_context) {
+      cache = std::make_unique<rgw::UsageCache>(g_test_context, config);
+    } else {
+      cache = std::make_unique<rgw::UsageCache>(config);
+    }
+
+    // CRITICAL: Initialize the cache!
+    int init_result = cache->init();
+    ASSERT_EQ(0, init_result) << "Failed to initialize cache: " << init_result;
+  }
+
+  void TearDown() override {
+    if (cache) {
+      cache->shutdown();
+    }
+    cache.reset();
+
+    // Clean up test database files (LMDB keeps a "-lock" sidecar file)
+    try {
+      fs::remove(test_db_path);
+      fs::remove(test_db_path + "-lock");
+    } catch (const std::exception& e) {
+      // Ignore cleanup errors
+    }
+  }
+};
+
+// Full round trip for a single user: write, read back, delete, and
+// confirm the entry is gone.
+TEST_F(TestRGWUsageCache, BasicUserOperations) {
+  const std::string uid = "test_user";
+  const uint64_t expected_bytes = 1024 * 1024;
+  const uint64_t expected_objects = 42;
+
+  const int rc = cache->update_user_stats(uid, expected_bytes, expected_objects);
+  ASSERT_EQ(0, rc) << "Failed to update user stats: " << rc;
+
+  auto fetched = cache->get_user_stats(uid);
+  ASSERT_TRUE(fetched.has_value());
+  EXPECT_EQ(expected_bytes, fetched->bytes_used);
+  EXPECT_EQ(expected_objects, fetched->num_objects);
+
+  ASSERT_EQ(0, cache->remove_user_stats(uid));
+
+  // After removal the lookup must come back empty.
+  EXPECT_FALSE(cache->get_user_stats(uid).has_value());
+}
+
+// Full round trip for a single bucket: write, read back, delete, and
+// confirm the entry is gone.
+TEST_F(TestRGWUsageCache, BasicBucketOperations) {
+  const std::string bucket = "test_bucket";
+  const uint64_t expected_bytes = 512 * 1024;
+  const uint64_t expected_objects = 17;
+
+  const int rc = cache->update_bucket_stats(bucket, expected_bytes, expected_objects);
+  ASSERT_EQ(0, rc) << "Failed to update bucket stats: " << rc;
+
+  auto fetched = cache->get_bucket_stats(bucket);
+  ASSERT_TRUE(fetched.has_value());
+  EXPECT_EQ(expected_bytes, fetched->bytes_used);
+  EXPECT_EQ(expected_objects, fetched->num_objects);
+
+  ASSERT_EQ(0, cache->remove_bucket_stats(bucket));
+
+  // After removal the lookup must come back empty.
+  EXPECT_FALSE(cache->get_bucket_stats(bucket).has_value());
+}
+
+// Test TTL expiration
+// Relies on the 2s TTL configured in SetUp; the 2.5s sleep leaves a
+// 500ms buffer, which can still flake on a heavily loaded machine.
+TEST_F(TestRGWUsageCache, TTLExpiration) {
+  const std::string user_id = "ttl_test_user";
+
+  // Add user stats
+  ASSERT_EQ(0, cache->update_user_stats(user_id, 1024, 1));
+
+  // Verify stats exist
+  auto stats = cache->get_user_stats(user_id);
+  ASSERT_TRUE(stats.has_value());
+
+  // Wait for TTL to expire (2 seconds + buffer)
+  std::this_thread::sleep_for(std::chrono::milliseconds(2500));
+
+  // Verify stats have expired
+  stats = cache->get_user_stats(user_id);
+  EXPECT_FALSE(stats.has_value());
+}
+
+// Test concurrent access
+// Four threads write/read disjoint user keys in parallel; verifies no
+// crashes or data corruption and that every write reporting success is
+// subsequently readable.
+TEST_F(TestRGWUsageCache, ConcurrentAccess) {
+  const int num_threads = 4;
+  const int ops_per_thread = 100;
+  std::vector<std::thread> threads;
+  std::atomic<int> successful_updates(0);
+  std::atomic<int> failed_updates(0);
+
+  for (int t = 0; t < num_threads; ++t) {
+    threads.emplace_back([this, t, ops_per_thread, &successful_updates, &failed_updates]() {
+      for (int i = 0; i < ops_per_thread; ++i) {
+        // Key is unique per (thread, iteration), so threads never race
+        // on the same entry.
+        std::string user_id = "t" + std::to_string(t) + "_u" + std::to_string(i);
+        int result = cache->update_user_stats(user_id, i * 1024, i);
+        if (result == 0) {
+          successful_updates++;
+        } else {
+          failed_updates++;
+        }
+
+        auto stats = cache->get_user_stats(user_id);
+        if (result == 0) {
+          EXPECT_TRUE(stats.has_value());
+        }
+      }
+    });
+  }
+
+  for (auto& th : threads) {
+    th.join();
+  }
+
+  std::cout << "Concurrent test: " << successful_updates << " successful, "
+            << failed_updates << " failed updates" << std::endl;
+
+  EXPECT_GT(successful_updates, 0) << "At least some updates should succeed";
+  EXPECT_GT(cache->get_cache_size(), 0u) << "Cache should contain entries";
+}
+
+// Re-writing a user's stats must replace the stored values outright,
+// not accumulate onto them.
+TEST_F(TestRGWUsageCache, UpdateExistingUserStats) {
+  const std::string uid = "update_test_user";
+
+  ASSERT_EQ(0, cache->update_user_stats(uid, 1024, 10));
+  {
+    auto s = cache->get_user_stats(uid);
+    ASSERT_TRUE(s.has_value());
+    EXPECT_EQ(1024u, s->bytes_used);
+    EXPECT_EQ(10u, s->num_objects);
+  }
+
+  // Second write with different values: expect full replacement.
+  ASSERT_EQ(0, cache->update_user_stats(uid, 2048, 20));
+  {
+    auto s = cache->get_user_stats(uid);
+    ASSERT_TRUE(s.has_value());
+    EXPECT_EQ(2048u, s->bytes_used);
+    EXPECT_EQ(20u, s->num_objects);
+  }
+}
+
+// Test multiple bucket operations: several buckets coexist and the
+// reported cache size matches the insert count.
+TEST_F(TestRGWUsageCache, MultipleBucketOperations) {
+  const int num_buckets = 10;
+
+  // Add multiple buckets
+  for (int i = 0; i < num_buckets; ++i) {
+    std::string bucket_name = "bucket_" + std::to_string(i);
+    uint64_t bytes = (i + 1) * 1024;
+    uint64_t objects = (i + 1) * 10;
+
+    ASSERT_EQ(0, cache->update_bucket_stats(bucket_name, bytes, objects));
+  }
+
+  // Verify all buckets
+  for (int i = 0; i < num_buckets; ++i) {
+    std::string bucket_name = "bucket_" + std::to_string(i);
+    auto stats = cache->get_bucket_stats(bucket_name);
+
+    ASSERT_TRUE(stats.has_value());
+    EXPECT_EQ((i + 1) * 1024u, stats->bytes_used);
+    EXPECT_EQ((i + 1) * 10u, stats->num_objects);
+  }
+
+  // Cast avoids a signed/unsigned comparison warning against size_t.
+  EXPECT_EQ(static_cast<size_t>(num_buckets), cache->get_cache_size());
+}
+
+// IDs containing punctuation and whitespace must round-trip through
+// the cache unchanged.
+TEST_F(TestRGWUsageCache, SpecialCharacterHandling) {
+  const std::vector<std::string> tricky_ids = {
+    "user@example.com",
+    "user-with-dashes",
+    "user_with_underscores",
+    "user.with.dots",
+    "user/with/slashes",
+    "user with spaces",
+    "user\twith\ttabs"
+  };
+
+  for (const auto& uid : tricky_ids) {
+    ASSERT_EQ(0, cache->update_user_stats(uid, 1024, 1))
+        << "Failed for ID: " << uid;
+
+    auto s = cache->get_user_stats(uid);
+    ASSERT_TRUE(s.has_value()) << "Failed to retrieve stats for ID: " << uid;
+    EXPECT_EQ(1024u, s->bytes_used);
+    EXPECT_EQ(1u, s->num_objects);
+  }
+}
+
+// Test remove non-existent user
+// The contract exercised here: removing an unknown user must not crash
+// and must leave the entry absent.
+TEST_F(TestRGWUsageCache, RemoveNonExistentUser) {
+  const std::string user_id = "non_existent_user";
+
+  // The removal itself may legitimately report "not found"; we only
+  // require that it returns.  (The previous assertion here —
+  // result == 0 || result != 0 — was a tautology and tested nothing.)
+  (void)cache->remove_user_stats(user_id);
+
+  // Verify user doesn't exist
+  auto stats = cache->get_user_stats(user_id);
+  EXPECT_FALSE(stats.has_value());
+}
+
+// Test cache size tracking: the reported size must follow insertions
+// and removals exactly.
+TEST_F(TestRGWUsageCache, CacheSizeTracking) {
+  // Initial size should be 0
+  EXPECT_EQ(0u, cache->get_cache_size());
+
+  // Add some users
+  const int num_users = 5;
+
+  for (int i = 0; i < num_users; ++i) {
+    std::string user_id = "size_test_user_" + std::to_string(i);
+    ASSERT_EQ(0, cache->update_user_stats(user_id, 1024 * (i + 1), i + 1));
+  }
+
+  // Casts avoid signed/unsigned comparison warnings against size_t.
+  EXPECT_EQ(static_cast<size_t>(num_users), cache->get_cache_size());
+
+  // Remove one user
+  ASSERT_EQ(0, cache->remove_user_stats("size_test_user_0"));
+
+  // Cache size should decrease
+  EXPECT_EQ(static_cast<size_t>(num_users - 1), cache->get_cache_size());
+}
+
+// Test clear expired entries
+// After the 2s TTL configured in SetUp elapses, clear_expired_entries()
+// should purge all three entries and report exactly how many it removed.
+TEST_F(TestRGWUsageCache, ClearExpiredEntries) {
+  // Add some entries
+  ASSERT_EQ(0, cache->update_user_stats("user1", 1024, 1));
+  ASSERT_EQ(0, cache->update_user_stats("user2", 2048, 2));
+  ASSERT_EQ(0, cache->update_bucket_stats("bucket1", 4096, 4));
+
+  EXPECT_EQ(3u, cache->get_cache_size());
+
+  // Wait for entries to expire (2s TTL + 500ms buffer)
+  std::this_thread::sleep_for(std::chrono::milliseconds(2500));
+
+  // Clear expired entries
+  int removed = cache->clear_expired_entries();
+  EXPECT_EQ(3, removed);
+
+  // Cache should be empty
+  EXPECT_EQ(0u, cache->get_cache_size());
+}
+
+// Test stress with many users and buckets; spot-checks deterministic
+// samples afterwards rather than re-reading every entry.
+TEST_F(TestRGWUsageCache, StressTest) {
+  const int num_users = 1000;
+  const int num_buckets = 500;
+
+  // Add many users
+  for (int i = 0; i < num_users; ++i) {
+    std::string user_id = "stress_user_" + std::to_string(i);
+    ASSERT_EQ(0, cache->update_user_stats(user_id, i * 1024, i));
+  }
+
+  // Add many buckets
+  for (int i = 0; i < num_buckets; ++i) {
+    std::string bucket_name = "stress_bucket_" + std::to_string(i);
+    ASSERT_EQ(0, cache->update_bucket_stats(bucket_name, i * 512, i * 2));
+  }
+
+  // Cast avoids a signed/unsigned comparison warning against size_t.
+  EXPECT_EQ(static_cast<size_t>(num_users + num_buckets), cache->get_cache_size());
+
+  // Verify random samples
+  for (int i = 0; i < 10; ++i) {
+    int user_idx = (i * 97) % num_users; // Sample users
+    std::string user_id = "stress_user_" + std::to_string(user_idx);
+    auto stats = cache->get_user_stats(user_id);
+    ASSERT_TRUE(stats.has_value());
+    EXPECT_EQ(user_idx * 1024u, stats->bytes_used);
+  }
+
+  for (int i = 0; i < 10; ++i) {
+    int bucket_idx = (i * 53) % num_buckets; // Sample buckets
+    std::string bucket_name = "stress_bucket_" + std::to_string(bucket_idx);
+    auto bucket_stats = cache->get_bucket_stats(bucket_name);
+    ASSERT_TRUE(bucket_stats.has_value());
+    EXPECT_EQ(bucket_idx * 512u, bucket_stats->bytes_used);
+  }
+
+  std::cout << "Stress test completed: " << num_users << " users, "
+            << num_buckets << " buckets" << std::endl;
+}
+
+// Main function
+// Stands up a minimal CephContext (no config file) so the cache's
+// dout-style logging works, then runs the gtest suite.
+int main(int argc, char **argv) {
+  // Initialize Google Test
+  ::testing::InitGoogleTest(&argc, argv);
+
+  // Initialize Ceph context
+  std::map<std::string, std::string> defaults = {
+    {"debug_rgw", "20"},
+    {"keyring", "keyring"},
+  };
+
+  std::vector<const char*> args;
+  auto cct = global_init(&defaults, args, CEPH_ENTITY_TYPE_CLIENT,
+                         CODE_ENVIRONMENT_UTILITY,
+                         CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+
+  // Store the context for test fixtures to use (raw pointer: cct keeps
+  // the context alive for the duration of main)
+  g_test_context = cct.get();
+
+  common_init_finish(cct.get());
+
+  // Run all tests
+  int result = RUN_ALL_TESTS();
+
+  return result;
+}
\ No newline at end of file
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <gtest/gtest.h>
+#include <thread>
+#include <chrono>
+#include <vector>
+#include <atomic>
+#include <iostream>
+#include <sstream>
+#include <filesystem>
+#include <random>
+#include <boost/intrusive_ptr.hpp>
+
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+#include "common/ceph_context.h"
+#include "common/perf_counters.h"
+#include "common/perf_counters_collection.h"
+#include "rgw/rgw_usage_cache.h"
+
+namespace fs = std::filesystem;
+
+// Global CephContext for all tests
+static boost::intrusive_ptr<CephContext> g_cct_holder;
+static CephContext* g_cct = nullptr;
+
+// Fixture for the perf-counter tests: builds a fresh cache per test
+// against a uniquely-named LMDB file, always passing the CephContext so
+// the cache's internal hit/miss counters are active.
+class TestRGWUsagePerfCounters : public ::testing::Test {
+protected:
+  std::unique_ptr<rgw::UsageCache> cache;
+  std::string test_db_path;
+
+  void SetUp() override {
+    ASSERT_NE(g_cct, nullptr) << "CephContext not initialized";
+
+    // Generate unique test database path
+    std::random_device rd;
+    std::mt19937 gen(rd());
+    std::uniform_int_distribution<> dis(1000000, 9999999);
+    test_db_path = "/tmp/test_rgw_perf_" + std::to_string(dis(gen)) + ".mdb";
+
+    // Initialize cache with CephContext to enable internal perf counters
+    rgw::UsageCache::Config config;
+    config.db_path = test_db_path;
+    config.max_db_size = 1 << 20; // 1MB
+    config.ttl = std::chrono::seconds(2);
+
+    // Use the constructor that accepts CephContext to enable perf counters
+    cache = std::make_unique<rgw::UsageCache>(g_cct, config);
+
+    ASSERT_EQ(0, cache->init());
+  }
+
+  void TearDown() override {
+    cache->shutdown();
+    cache.reset();
+
+    // Clean up test database (LMDB keeps a "-lock" sidecar file)
+    try {
+      fs::remove(test_db_path);
+      fs::remove(test_db_path + "-lock");
+    } catch (const std::exception& e) {
+      // Ignore cleanup errors
+    }
+  }
+};
+
+// Smoke test: basic cache operations work and the hit/miss counters
+// move in the expected direction (>= rather than == to stay robust
+// against incidental lookups inside the cache implementation).
+TEST_F(TestRGWUsagePerfCounters, BasicMetrics) {
+  // First, just test that the cache works
+  std::cout << "Testing basic cache operations..." << std::endl;
+
+  // Test basic cache functionality first
+  auto stats = cache->get_user_stats("nonexistent");
+  EXPECT_FALSE(stats.has_value());
+
+  // Add a user
+  ASSERT_EQ(0, cache->update_user_stats("user1", 1024, 10));
+
+  // Access existing user
+  stats = cache->get_user_stats("user1");
+  ASSERT_TRUE(stats.has_value());
+  EXPECT_EQ(1024u, stats->bytes_used);
+  EXPECT_EQ(10u, stats->num_objects);
+
+  std::cout << "Basic cache operations successful" << std::endl;
+
+  // Test performance counters - they should be available since we used CephContext
+  uint64_t hits = cache->get_cache_hits();
+  uint64_t misses = cache->get_cache_misses();
+
+  std::cout << "Performance counter values:" << std::endl;
+  std::cout << "  Hits: " << hits << std::endl;
+  std::cout << "  Misses: " << misses << std::endl;
+
+  // We expect at least one miss (from the first get_user_stats)
+  // and one hit (from the second get_user_stats)
+  EXPECT_GE(misses, 1u);
+  EXPECT_GE(hits, 1u);
+}
+
+// A fixed sequence of 2 hits and 3 misses must yield a 40% hit rate.
+TEST_F(TestRGWUsagePerfCounters, HitRateCalculation) {
+  cache->get_user_stats("miss1"); // Miss
+  cache->get_user_stats("miss2"); // Miss
+
+  cache->update_user_stats("hit1", 1024, 1);
+  cache->update_user_stats("hit2", 2048, 2);
+
+  cache->get_user_stats("hit1"); // Hit
+  cache->get_user_stats("hit2"); // Hit
+  cache->get_user_stats("miss3"); // Miss
+
+  const uint64_t hit_count = cache->get_cache_hits();
+  const uint64_t miss_count = cache->get_cache_misses();
+
+  EXPECT_EQ(2u, hit_count);
+  EXPECT_EQ(3u, miss_count);
+
+  // 2 hits out of 5 lookups, reported as a percentage.
+  const double rate = cache->get_hit_rate();
+  EXPECT_DOUBLE_EQ((2.0 / 5.0) * 100.0, rate);
+
+  std::cout << "Cache statistics:" << std::endl;
+  std::cout << "  Hits: " << hit_count << std::endl;
+  std::cout << "  Misses: " << miss_count << std::endl;
+  std::cout << "  Hit rate: " << rate << "%" << std::endl;
+}
+
+// User and bucket lookups both feed the shared hit/miss totals.
+TEST_F(TestRGWUsagePerfCounters, UserAndBucketSeparateTracking) {
+  // One miss then one hit on the user side...
+  cache->get_user_stats("user_miss");
+  cache->update_user_stats("user1", 1024, 1);
+  cache->get_user_stats("user1");
+
+  // ...and the same pattern on the bucket side.
+  cache->get_bucket_stats("bucket_miss");
+  cache->update_bucket_stats("bucket1", 2048, 2);
+  cache->get_bucket_stats("bucket1");
+
+  // Totals combine both sides: one hit and one miss from each.
+  EXPECT_EQ(2u, cache->get_cache_hits());
+  EXPECT_EQ(2u, cache->get_cache_misses());
+}
+
+TEST_F(TestRGWUsagePerfCounters, ConcurrentCounterUpdates) {
+  constexpr int kThreads = 4;
+  constexpr int kOpsPerThread = 100;
+
+  // Every worker touches only its own key space, so each key sees
+  // exactly one miss (lookup before insert) and one hit (lookup after).
+  std::vector<std::thread> workers;
+  workers.reserve(kThreads);
+  for (int t = 0; t < kThreads; ++t) {
+    workers.emplace_back([this, t]() {
+      for (int i = 0; i < kOpsPerThread; ++i) {
+        const std::string user_id =
+            "t" + std::to_string(t) + "_u" + std::to_string(i);
+        cache->get_user_stats(user_id);                 // first access - miss
+        cache->update_user_stats(user_id, i * 1024, i);
+        cache->get_user_stats(user_id);                 // second access - hit
+      }
+    });
+  }
+  for (auto& w : workers) {
+    w.join();
+  }
+
+  // One hit and one miss per distinct user across all threads.
+  const uint64_t expected_total = kThreads * kOpsPerThread;
+  EXPECT_EQ(expected_total, cache->get_cache_hits());
+  EXPECT_EQ(expected_total, cache->get_cache_misses());
+
+  std::cout << "Concurrent test results:" << std::endl;
+  std::cout << " Total operations: " << expected_total * 2 << std::endl;
+  std::cout << " Hits: " << cache->get_cache_hits() << std::endl;
+  std::cout << " Misses: " << cache->get_cache_misses() << std::endl;
+}
+
+TEST_F(TestRGWUsagePerfCounters, ExpiredEntryTracking) {
+  // Seed one entry and confirm a lookup on it registers as a hit.
+  ASSERT_EQ(0, cache->update_user_stats("expiry_test", 1024, 1));
+  ASSERT_TRUE(cache->get_user_stats("expiry_test").has_value());
+  EXPECT_EQ(1u, cache->get_cache_hits());
+
+  // Let the entry's TTL lapse, with margin (the fixture presumably
+  // configures a 2-second TTL -- sleep 2.5s to be safe).
+  std::this_thread::sleep_for(std::chrono::milliseconds(2500));
+
+  // A lookup on the expired entry yields no value and is counted as a
+  // miss; the hit counter must be unchanged by the failed lookup.
+  EXPECT_FALSE(cache->get_user_stats("expiry_test").has_value());
+  EXPECT_EQ(1u, cache->get_cache_hits());
+  EXPECT_EQ(1u, cache->get_cache_misses());
+}
+
+TEST_F(TestRGWUsagePerfCounters, RemoveOperationTracking) {
+  // Populate two users, then delete the first one.
+  cache->update_user_stats("user1", 1024, 1);
+  cache->update_user_stats("user2", 2048, 2);
+  ASSERT_EQ(0, cache->remove_user_stats("user1"));
+
+  // The removed user must now behave like an unknown key (miss)...
+  EXPECT_FALSE(cache->get_user_stats("user1").has_value());
+  EXPECT_EQ(1u, cache->get_cache_misses());
+
+  // ...while the untouched user still resolves (hit).
+  EXPECT_TRUE(cache->get_user_stats("user2").has_value());
+  EXPECT_EQ(1u, cache->get_cache_hits());
+}
+
+TEST_F(TestRGWUsagePerfCounters, ZeroDivisionInHitRate) {
+  // With no lookups recorded, the rate must be a clean 0.0 -- the
+  // implementation has to guard the hits/(hits+misses) division.
+  EXPECT_EQ(0.0, cache->get_hit_rate());
+
+  // One miss: 0 hits out of 1 lookup is still exactly 0%.
+  cache->get_user_stats("nonexistent");
+  EXPECT_EQ(0.0, cache->get_hit_rate());
+
+  // One hit on top of that: 1 of 2 lookups -> exactly 50%
+  // (both values are exactly representable, so EXPECT_EQ is safe).
+  cache->update_user_stats("user1", 1024, 1);
+  cache->get_user_stats("user1");
+  EXPECT_EQ(50.0, cache->get_hit_rate());
+}
+
+/// Test entry point.  The fixtures need a live CephContext (config,
+/// perf-counter registry) before any test runs, so initialization
+/// cannot be left to gtest's default main.
+int main(int argc, char **argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+
+  // Initialize CephContext once for all tests.  Both the CLI argument
+  // and CINIT_FLAG_NO_MON_CONFIG tell global_init() to skip fetching
+  // configuration from the monitors, so the tests run standalone.
+  std::vector<const char*> args;
+  args.push_back("--no-mon-config");
+
+  g_cct_holder = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+                             CODE_ENVIRONMENT_UTILITY,
+                             CINIT_FLAG_NO_MON_CONFIG);
+  g_cct = g_cct_holder.get();
+  common_init_finish(g_cct);
+
+  const int result = RUN_ALL_TESTS();
+
+  // Drop the globals explicitly so the context is torn down before
+  // static destruction begins.
+  g_cct = nullptr;
+  g_cct_holder.reset();
+
+  return result;
+}