From: Harsimran Singh Date: Wed, 26 Nov 2025 02:19:21 +0000 (+0530) Subject: rgw: Fix for aggregation of userStats X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=084b4d392fa9edb405dfc44b52e47e6a10b24b49;p=ceph-ci.git rgw: Fix for aggregation of userStats Signed-off-by: Harsimran Singh --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 7af8a1cb583..f19a0b1df55 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -4073,7 +4073,7 @@ void RGWCreateBucket::execute(optional_yield y) auto* usage_counters = rgw::get_usage_perf_counters(); if (usage_counters && s->bucket) { // For new buckets, initialize with 0 bytes and 0 objects - usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0); + usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0, s->user->get_id().id); // Update user stats - use sync_owner_stats to get current info if (s->user) { @@ -4084,7 +4084,8 @@ void RGWCreateBucket::execute(optional_yield y) usage_counters->update_user_stats( s->user->get_id().id, ent.size, - ent.count + ent.count, + false ); } } @@ -5049,41 +5050,41 @@ void RGWPutObj::execute(optional_yield y) ldpp_dout(this, 20) << "PUT completed: updating usage for bucket=" << bucket_key << " size=" << s->obj_size << dendl; + auto usage_counters = rgw::get_usage_perf_counters(); + if (usage_counters && s->bucket && s->user) { + // Get actual bucket stats from RGW metadata (includes this PUT) + RGWBucketEnt stats; + int ret = s->bucket->sync_owner_stats(this, y, &stats); - // Increment bucket stats in cache (adds to existing) - auto existing = usage_counters->get_bucket_stats(bucket_key); - uint64_t new_bytes = s->obj_size; - uint64_t new_objects = 1; - - if (existing) { - new_bytes += existing->bytes_used; - new_objects += existing->num_objects; - } - - // Update cache with new totals - usage_counters->update_bucket_stats(s->bucket->get_name(), - new_bytes, new_objects, true); - - // Mark as active to trigger perf counter update - usage_counters->mark_bucket_active(s->bucket->get_name(), - s->bucket->get_tenant()); - - // Update user stats - if (s->user) { - auto user_existing = usage_counters->get_user_stats(s->user->get_id().to_str()); - uint64_t user_bytes = s->obj_size; - uint64_t user_objects = 1; + if (ret >= 0) { + ldpp_dout(this, 20) << "PUT completed: updating usage for bucket=" + << s->bucket->get_name() + << " bytes=" << stats.size + << " objects=" << stats.count << dendl; - if (user_existing) { - user_bytes += user_existing->bytes_used; - user_objects += user_existing->num_objects; - } + // Update with ACTUAL bucket totals (not calculated) + usage_counters->update_bucket_stats(s->bucket->get_name(), + stats.size, + stats.count, + s->user->get_id().id, + true); - usage_counters->update_user_stats(s->user->get_id().to_str(), - user_bytes, user_objects, true); - usage_counters->mark_user_active(s->user->get_id().to_str()); + // Mark as active + usage_counters->mark_bucket_active(s->bucket->get_name(), + s->bucket->get_tenant()); + + // User stats are aggregated in cache, just update perf counter + auto user_stats = usage_counters->get_user_stats(s->user->get_id().to_str()); + if (user_stats) { + usage_counters->update_user_stats(s->user->get_id().to_str(), + user_stats->bytes_used, + user_stats->num_objects, + false); + usage_counters->mark_user_active(s->user->get_id().to_str()); + } } } + } } } /* RGWPutObj::execute() */ @@ -5890,34 +5891,36 @@ void RGWDeleteObj::execute(optional_yield y) op_ret = -EINVAL; } - // Update usage statistics after successful delete - if (op_ret == 0 && s->bucket) { - auto usage_counters = rgw::get_usage_perf_counters(); - if (usage_counters) { - std::string bucket_key = s->bucket->get_tenant().empty() ? - s->bucket->get_name() : - s->bucket->get_tenant() + "/" + s->bucket->get_name(); + auto usage_counters = rgw::get_usage_perf_counters(); + if (usage_counters && s->bucket && s->user) { + // Get actual bucket stats from RGW metadata (after deletion) + RGWBucketEnt stats; + int ret = s->bucket->sync_owner_stats(this, y, &stats); + + if (ret >= 0) { + ldpp_dout(this, 20) << "DELETE completed: updating usage for bucket=" + << s->bucket->get_name() + << " bytes=" << stats.size + << " objects=" << stats.count << dendl; - ldpp_dout(this, 20) << "DELETE completed: updating usage for bucket=" - << bucket_key << dendl; + // Update with ACTUAL bucket totals (not calculated) + usage_counters->update_bucket_stats(s->bucket->get_name(), + stats.size, + stats.count, + s->user->get_id().id, + true); - // Decrement bucket stats in cache - auto existing = usage_counters->get_bucket_stats(bucket_key); - if (existing && existing->num_objects > 0) { - uint64_t obj_size = existing->bytes_used / existing->num_objects; // approximate - uint64_t new_bytes = existing->bytes_used > obj_size ? - existing->bytes_used - obj_size : 0; - uint64_t new_objects = existing->num_objects - 1; - - usage_counters->update_bucket_stats(s->bucket->get_name(), - new_bytes, new_objects, true); - } - - // Mark as active to trigger perf counter update + // Mark as active usage_counters->mark_bucket_active(s->bucket->get_name(), s->bucket->get_tenant()); - if (s->user) { + // User stats are aggregated in cache, just update perf counter + auto user_stats = usage_counters->get_user_stats(s->user->get_id().to_str()); + if (user_stats) { + usage_counters->update_user_stats(s->user->get_id().to_str(), + user_stats->bytes_used, + user_stats->num_objects, + false); usage_counters->mark_user_active(s->user->get_id().to_str()); } } diff --git a/src/rgw/rgw_usage_cache.cc b/src/rgw/rgw_usage_cache.cc index bd96836b672..90af67ce125 100644 --- a/src/rgw/rgw_usage_cache.cc +++ b/src/rgw/rgw_usage_cache.cc @@ -510,15 +510,56 @@ int UsageCache::remove_user_stats(const std::string& user_id) { int UsageCache::update_bucket_stats(const std::string& bucket_name, uint64_t bytes_used, - uint64_t num_objects) { + uint64_t num_objects, + const std::string& user_id) { + + if (!initialized || user_id.empty()) { + return -EINVAL; + } + std::unique_lock lock(db_mutex); + // Get old bucket stats to calculate delta + auto old_bucket_stats = get_stats(bucket_dbi, bucket_name); + UsageStats stats; stats.bytes_used = bytes_used; stats.num_objects = num_objects; stats.last_updated = ceph::real_clock::now(); int ret = put_stats(bucket_dbi, bucket_name, stats); + if (ret != 0) { + return ret; + } + + // Get current user stats + auto current_user_stats = get_stats(user_dbi, user_id); + + UsageStats new_user_stats; + if (current_user_stats.has_value()) { + new_user_stats.bytes_used = current_user_stats->bytes_used; + new_user_stats.num_objects = current_user_stats->num_objects; + } else { + new_user_stats.bytes_used = 0; + new_user_stats.num_objects = 0; + } + + // Calculate delta (what changed for this bucket) + int64_t delta_bytes = (int64_t)bytes_used; + int64_t delta_objects = (int64_t)num_objects; + + if (old_bucket_stats.has_value()) { + delta_bytes -= (int64_t)old_bucket_stats->bytes_used; + delta_objects -= (int64_t)old_bucket_stats->num_objects; + } + + // Apply delta to user stats + new_user_stats.bytes_used = (uint64_t)((int64_t)new_user_stats.bytes_used + delta_bytes); + new_user_stats.num_objects = (uint64_t)((int64_t)new_user_stats.num_objects + delta_objects); + new_user_stats.last_updated = ceph::real_clock::now(); + + // Update user stats in cache + ret = put_stats(user_dbi, user_id, new_user_stats); if (ret == 0) { inc_counter(PERF_CACHE_UPDATE); set_counter(PERF_CACHE_SIZE, get_cache_size_internal()); @@ -733,6 +774,112 @@ double UsageCache::get_hit_rate() const { return (total > 0) ? (double)hits / total * 100.0 : 0.0; } +std::vector> UsageCache::get_all_users() { + std::vector> result; + + if (!env) { + ldout(cct, 5) << "get_all_users: database not initialized" << dendl; + return result; + } + + MDB_txn* txn = nullptr; + int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); + if (rc != 0) { + ldout(cct, 5) << "LMDB txn_begin failed in get_all_users: " + << mdb_strerror(rc) << dendl; + return result; + } + + MDB_cursor* cursor = nullptr; + rc = mdb_cursor_open(txn, user_dbi, &cursor); + if (rc != 0) { + ldout(cct, 5) << "LMDB cursor_open failed in get_all_users: " + << mdb_strerror(rc) << dendl; + mdb_txn_abort(txn); + return result; + } + + MDB_val key, data; + while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { + std::string user_id((char*)key.mv_data, key.mv_size); + + UsageStats stats; + try { + bufferlist bl; + bl.append((char*)data.mv_data, data.mv_size); + auto iter = bl.cbegin(); + stats.decode(iter); + + result.push_back({user_id, stats}); + ldout(cct, 20) << "get_all_users: loaded user=" << user_id + << " bytes=" << stats.bytes_used + << " objects=" << stats.num_objects << dendl; + } catch (const std::exception& e) { + ldout(cct, 1) << "Failed to decode user stats for " << user_id + << ": " << e.what() << dendl; + } + } + + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + + ldout(cct, 10) << "get_all_users: loaded " << result.size() << " users" << dendl; + return result; +} + +std::vector> UsageCache::get_all_buckets() { + std::vector> result; + + if (!env) { + ldout(cct, 5) << "get_all_buckets: database not initialized" << dendl; + return result; + } + + MDB_txn* txn = nullptr; + int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); + if (rc != 0) { + ldout(cct, 5) << "LMDB txn_begin failed in get_all_buckets: " + << mdb_strerror(rc) << dendl; + return result; + } + + MDB_cursor* cursor = nullptr; + rc = mdb_cursor_open(txn, bucket_dbi, &cursor); + if (rc != 0) { + ldout(cct, 5) << "LMDB cursor_open failed in get_all_buckets: " + << mdb_strerror(rc) << dendl; + mdb_txn_abort(txn); + return result; + } + + MDB_val key, data; + while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { + std::string bucket_key((char*)key.mv_data, key.mv_size); + + UsageStats stats; + try { + bufferlist bl; + bl.append((char*)data.mv_data, data.mv_size); + auto iter = bl.cbegin(); + stats.decode(iter); + + result.push_back({bucket_key, stats}); + ldout(cct, 20) << "get_all_buckets: loaded bucket=" << bucket_key + << " bytes=" << stats.bytes_used + << " objects=" << stats.num_objects << dendl; + } catch (const std::exception& e) { + ldout(cct, 1) << "Failed to decode bucket stats for " << bucket_key + << ": " << e.what() << dendl; + } + } + + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + + ldout(cct, 10) << "get_all_buckets: loaded " << result.size() << " buckets" << dendl; + return result; +} + // Explicit template instantiations template int UsageCache::put_stats(MDB_dbi, const std::string&, const UsageStats&); template std::optional UsageCache::get_stats(MDB_dbi, const std::string&); diff --git a/src/rgw/rgw_usage_cache.h b/src/rgw/rgw_usage_cache.h index 34701044f78..6cf951be985 100644 --- a/src/rgw/rgw_usage_cache.h +++ b/src/rgw/rgw_usage_cache.h @@ -70,7 +70,8 @@ public: // Bucket stats operations (non-const to update counters) int update_bucket_stats(const std::string& bucket_name, uint64_t bytes_used, - uint64_t num_objects); + uint64_t num_objects, + const std::string& user_id); std::optional get_bucket_stats(const std::string& bucket_name); int remove_bucket_stats(const std::string& bucket_name); @@ -85,6 +86,10 @@ public: uint64_t get_cache_misses() const; double get_hit_rate() const; + // Iterator methods for initial load + std::vector> get_all_users(); + std::vector> get_all_buckets(); + private: // Database operations int open_database(); diff --git a/src/rgw/rgw_usage_perf.cc b/src/rgw/rgw_usage_perf.cc index bd70553b853..77184a55557 100644 --- a/src/rgw/rgw_usage_perf.cc +++ b/src/rgw/rgw_usage_perf.cc @@ -170,7 +170,82 @@ int UsagePerfCounters::init() { void UsagePerfCounters::start() { ldout(cct, 10) << "Starting usage perf counters" << dendl; shutdown_flag = false; - + + if (cache) { + ldout(cct, 10) << "Loading all stats from cache on startup" << dendl; + + // Load users - directly populate perf counters without touching cache + auto all_users = cache->get_all_users(); + for (const auto& [user_id, stats] : all_users) { + // Create perf counter if needed + { + std::unique_lock lock(counters_mutex); + auto it = user_perf_counters.find(user_id); + if (it == user_perf_counters.end()) { + PerfCounters* counters = create_user_counters(user_id); + if (counters) { + user_perf_counters[user_id] = counters; + it = user_perf_counters.find(user_id); + ldout(cct, 15) << "Created perf counter for user " << user_id << dendl; + } + } + + // Set values directly on perf counter + if (it != user_perf_counters.end() && it->second) { + it->second->set(930001, stats.bytes_used); // l_rgw_user_bytes + it->second->set(930002, stats.num_objects); // l_rgw_user_objects + ldout(cct, 15) << "Set perf counter for user " << user_id + << " bytes=" << stats.bytes_used + << " objects=" << stats.num_objects << dendl; + } + } + + // Mark as active for refresh worker + mark_user_active(user_id); + } + + // Load buckets - directly populate perf counters without touching cache + auto all_buckets = cache->get_all_buckets(); + for (const auto& [bucket_key, stats] : all_buckets) { + std::string bucket_name = bucket_key; + std::string tenant; + size_t pos = bucket_key.find('/'); + if (pos != std::string::npos) { + tenant = bucket_key.substr(0, pos); + bucket_name = bucket_key.substr(pos + 1); + } + + // Create perf counter if needed + { + std::unique_lock lock(counters_mutex); + auto it = bucket_perf_counters.find(bucket_name); + if (it == bucket_perf_counters.end()) { + PerfCounters* counters = create_bucket_counters(bucket_name); + if (counters) { + bucket_perf_counters[bucket_name] = counters; + it = bucket_perf_counters.find(bucket_name); + ldout(cct, 15) << "Created perf counter for bucket " << bucket_name << dendl; + } + } + + // Set values directly on perf counter + if (it != bucket_perf_counters.end() && it->second) { + it->second->set(940001, stats.bytes_used); // l_rgw_bucket_bytes + it->second->set(940002, stats.num_objects); // l_rgw_bucket_objects + ldout(cct, 15) << "Set perf counter for bucket " << bucket_name + << " bytes=" << stats.bytes_used + << " objects=" << stats.num_objects << dendl; + } + } + + // Mark as active for refresh worker + mark_bucket_active(bucket_name, tenant); + } + + ldout(cct, 10) << "Initial load complete: " << all_users.size() + << " users, " << all_buckets.size() << " buckets" << dendl; + } + // Start cleanup thread cleanup_thread = std::thread(&UsagePerfCounters::cleanup_worker, this); @@ -246,15 +321,17 @@ void UsagePerfCounters::shutdown() { void UsagePerfCounters::update_bucket_stats(const std::string& bucket_name, uint64_t bytes_used, uint64_t num_objects, + const std::string& user_id, bool update_cache) { ldout(cct, 20) << "update_bucket_stats: bucket=" << bucket_name << " bytes=" << bytes_used - << " objects=" << num_objects + << " objects=" << num_objects + << " user=" << user_id << " update_cache=" << update_cache << dendl; - // Update cache if requested - ALWAYS write the total values passed in - if (update_cache && cache) { - int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects); + // Update cache if requested - cache will aggregate user stats + if (update_cache && cache && !user_id.empty()) { + int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects, user_id); if (ret == 0) { global_counters->inc(l_rgw_usage_cache_update); ldout(cct, 15) << "Cache updated for bucket " << bucket_name @@ -388,7 +465,7 @@ void UsagePerfCounters::mark_bucket_active(const std::string& bucket_name, } update_bucket_stats(bucket_only, cached_stats->bytes_used, - cached_stats->num_objects, false); + cached_stats->num_objects,"" ,false); } } @@ -485,7 +562,7 @@ void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) { << " objects=" << cached_stats->num_objects << dendl; update_bucket_stats(bucket_name, cached_stats->bytes_used, - cached_stats->num_objects, false); + cached_stats->num_objects, "", false); } } @@ -527,7 +604,7 @@ void UsagePerfCounters::refresh_from_cache(const std::string& user_id, if (bucket_stats) { global_counters->inc(l_rgw_usage_cache_hit); update_bucket_stats(bucket_name, bucket_stats->bytes_used, - bucket_stats->num_objects, false); + bucket_stats->num_objects, "", false); } else { global_counters->inc(l_rgw_usage_cache_miss); } diff --git a/src/rgw/rgw_usage_perf.h b/src/rgw/rgw_usage_perf.h index f1d4cdfa82b..90a3a9f9df3 100644 --- a/src/rgw/rgw_usage_perf.h +++ b/src/rgw/rgw_usage_perf.h @@ -94,6 +94,7 @@ public: void update_bucket_stats(const std::string& bucket_name, uint64_t bytes_used, uint64_t num_objects, + const std::string& user_id = "", bool update_cache = true); void mark_bucket_active(const std::string& bucket_name,