auto* usage_counters = rgw::get_usage_perf_counters();
if (usage_counters && s->bucket) {
// For new buckets, initialize with 0 bytes and 0 objects
- usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0);
+ usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0, s->user->get_id().id);
// Update user stats - use sync_owner_stats to get current info
if (s->user) {
usage_counters->update_user_stats(
s->user->get_id().id,
ent.size,
- ent.count
+ ent.count,
+ false
);
}
}
ldpp_dout(this, 20) << "PUT completed: updating usage for bucket="
<< bucket_key << " size=" << s->obj_size << dendl;
+ auto usage_counters = rgw::get_usage_perf_counters();
+ if (usage_counters && s->bucket && s->user) {
+ // Get actual bucket stats from RGW metadata (includes this PUT)
+ RGWBucketEnt stats;
+ int ret = s->bucket->sync_owner_stats(this, y, &stats);
- // Increment bucket stats in cache (adds to existing)
- auto existing = usage_counters->get_bucket_stats(bucket_key);
- uint64_t new_bytes = s->obj_size;
- uint64_t new_objects = 1;
-
- if (existing) {
- new_bytes += existing->bytes_used;
- new_objects += existing->num_objects;
- }
-
- // Update cache with new totals
- usage_counters->update_bucket_stats(s->bucket->get_name(),
- new_bytes, new_objects, true);
-
- // Mark as active to trigger perf counter update
- usage_counters->mark_bucket_active(s->bucket->get_name(),
- s->bucket->get_tenant());
-
- // Update user stats
- if (s->user) {
- auto user_existing = usage_counters->get_user_stats(s->user->get_id().to_str());
- uint64_t user_bytes = s->obj_size;
- uint64_t user_objects = 1;
+ if (ret >= 0) {
+ ldpp_dout(this, 20) << "PUT completed: updating usage for bucket="
+ << s->bucket->get_name()
+ << " bytes=" << stats.size
+ << " objects=" << stats.count << dendl;
- if (user_existing) {
- user_bytes += user_existing->bytes_used;
- user_objects += user_existing->num_objects;
- }
+ // Update with ACTUAL bucket totals (not calculated)
+ usage_counters->update_bucket_stats(s->bucket->get_name(),
+ stats.size,
+ stats.count,
+ s->user->get_id().id,
+ true);
- usage_counters->update_user_stats(s->user->get_id().to_str(),
- user_bytes, user_objects, true);
- usage_counters->mark_user_active(s->user->get_id().to_str());
+ // Mark as active
+ usage_counters->mark_bucket_active(s->bucket->get_name(),
+ s->bucket->get_tenant());
+
+ // User stats are aggregated in cache, just update perf counter
+ auto user_stats = usage_counters->get_user_stats(s->user->get_id().to_str());
+ if (user_stats) {
+ usage_counters->update_user_stats(s->user->get_id().to_str(),
+ user_stats->bytes_used,
+ user_stats->num_objects,
+ false);
+ usage_counters->mark_user_active(s->user->get_id().to_str());
+ }
}
}
+ }
}
} /* RGWPutObj::execute() */
op_ret = -EINVAL;
}
- // Update usage statistics after successful delete
- if (op_ret == 0 && s->bucket) {
- auto usage_counters = rgw::get_usage_perf_counters();
- if (usage_counters) {
- std::string bucket_key = s->bucket->get_tenant().empty() ?
- s->bucket->get_name() :
- s->bucket->get_tenant() + "/" + s->bucket->get_name();
+ auto usage_counters = rgw::get_usage_perf_counters();
+ if (usage_counters && s->bucket && s->user) {
+ // Get actual bucket stats from RGW metadata (after deletion)
+ RGWBucketEnt stats;
+ int ret = s->bucket->sync_owner_stats(this, y, &stats);
+
+ if (ret >= 0) {
+ ldpp_dout(this, 20) << "DELETE completed: updating usage for bucket="
+ << s->bucket->get_name()
+ << " bytes=" << stats.size
+ << " objects=" << stats.count << dendl;
- ldpp_dout(this, 20) << "DELETE completed: updating usage for bucket="
- << bucket_key << dendl;
+ // Update with ACTUAL bucket totals (not calculated)
+ usage_counters->update_bucket_stats(s->bucket->get_name(),
+ stats.size,
+ stats.count,
+ s->user->get_id().id,
+ true);
- // Decrement bucket stats in cache
- auto existing = usage_counters->get_bucket_stats(bucket_key);
- if (existing && existing->num_objects > 0) {
- uint64_t obj_size = existing->bytes_used / existing->num_objects; // approximate
- uint64_t new_bytes = existing->bytes_used > obj_size ?
- existing->bytes_used - obj_size : 0;
- uint64_t new_objects = existing->num_objects - 1;
-
- usage_counters->update_bucket_stats(s->bucket->get_name(),
- new_bytes, new_objects, true);
- }
-
- // Mark as active to trigger perf counter update
+ // Mark as active
usage_counters->mark_bucket_active(s->bucket->get_name(),
s->bucket->get_tenant());
- if (s->user) {
+ // User stats are aggregated in cache, just update perf counter
+ auto user_stats = usage_counters->get_user_stats(s->user->get_id().to_str());
+ if (user_stats) {
+ usage_counters->update_user_stats(s->user->get_id().to_str(),
+ user_stats->bytes_used,
+ user_stats->num_objects,
+ false);
usage_counters->mark_user_active(s->user->get_id().to_str());
}
}
int UsageCache::update_bucket_stats(const std::string& bucket_name,
uint64_t bytes_used,
- uint64_t num_objects) {
+ uint64_t num_objects,
+ const std::string& user_id) {
+
+ if (!initialized || user_id.empty()) {
+ return -EINVAL;
+ }
+
std::unique_lock lock(db_mutex);
+ // Get old bucket stats to calculate delta
+ auto old_bucket_stats = get_stats<UsageStats>(bucket_dbi, bucket_name);
+
UsageStats stats;
stats.bytes_used = bytes_used;
stats.num_objects = num_objects;
stats.last_updated = ceph::real_clock::now();
int ret = put_stats(bucket_dbi, bucket_name, stats);
+ if (ret != 0) {
+ return ret;
+ }
+
+ // Get current user stats
+ auto current_user_stats = get_stats<UsageStats>(user_dbi, user_id);
+
+ UsageStats new_user_stats;
+ if (current_user_stats.has_value()) {
+ new_user_stats.bytes_used = current_user_stats->bytes_used;
+ new_user_stats.num_objects = current_user_stats->num_objects;
+ } else {
+ new_user_stats.bytes_used = 0;
+ new_user_stats.num_objects = 0;
+ }
+
+ // Calculate delta (what changed for this bucket)
+ int64_t delta_bytes = (int64_t)bytes_used;
+ int64_t delta_objects = (int64_t)num_objects;
+
+ if (old_bucket_stats.has_value()) {
+ delta_bytes -= (int64_t)old_bucket_stats->bytes_used;
+ delta_objects -= (int64_t)old_bucket_stats->num_objects;
+ }
+
+ // Apply delta to user stats
+ new_user_stats.bytes_used = (uint64_t)((int64_t)new_user_stats.bytes_used + delta_bytes);
+ new_user_stats.num_objects = (uint64_t)((int64_t)new_user_stats.num_objects + delta_objects);
+ new_user_stats.last_updated = ceph::real_clock::now();
+
+ // Update user stats in cache
+ ret = put_stats(user_dbi, user_id, new_user_stats);
if (ret == 0) {
inc_counter(PERF_CACHE_UPDATE);
set_counter(PERF_CACHE_SIZE, get_cache_size_internal());
return (total > 0) ? (double)hits / total * 100.0 : 0.0;
}
+std::vector<std::pair<std::string, UsageStats>> UsageCache::get_all_users() {
+ std::vector<std::pair<std::string, UsageStats>> result;
+
+ if (!env) {
+ ldout(cct, 5) << "get_all_users: database not initialized" << dendl;
+ return result;
+ }
+
+ MDB_txn* txn = nullptr;
+ int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn);
+ if (rc != 0) {
+ ldout(cct, 5) << "LMDB txn_begin failed in get_all_users: "
+ << mdb_strerror(rc) << dendl;
+ return result;
+ }
+
+ MDB_cursor* cursor = nullptr;
+ rc = mdb_cursor_open(txn, user_dbi, &cursor);
+ if (rc != 0) {
+ ldout(cct, 5) << "LMDB cursor_open failed in get_all_users: "
+ << mdb_strerror(rc) << dendl;
+ mdb_txn_abort(txn);
+ return result;
+ }
+
+ MDB_val key, data;
+ while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
+ std::string user_id((char*)key.mv_data, key.mv_size);
+
+ UsageStats stats;
+ try {
+ bufferlist bl;
+ bl.append((char*)data.mv_data, data.mv_size);
+ auto iter = bl.cbegin();
+ stats.decode(iter);
+
+ result.push_back({user_id, stats});
+ ldout(cct, 20) << "get_all_users: loaded user=" << user_id
+ << " bytes=" << stats.bytes_used
+ << " objects=" << stats.num_objects << dendl;
+ } catch (const std::exception& e) {
+ ldout(cct, 1) << "Failed to decode user stats for " << user_id
+ << ": " << e.what() << dendl;
+ }
+ }
+
+ mdb_cursor_close(cursor);
+ mdb_txn_abort(txn);
+
+ ldout(cct, 10) << "get_all_users: loaded " << result.size() << " users" << dendl;
+ return result;
+}
+
+std::vector<std::pair<std::string, UsageStats>> UsageCache::get_all_buckets() {
+ std::vector<std::pair<std::string, UsageStats>> result;
+
+ if (!env) {
+ ldout(cct, 5) << "get_all_buckets: database not initialized" << dendl;
+ return result;
+ }
+
+ MDB_txn* txn = nullptr;
+ int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn);
+ if (rc != 0) {
+ ldout(cct, 5) << "LMDB txn_begin failed in get_all_buckets: "
+ << mdb_strerror(rc) << dendl;
+ return result;
+ }
+
+ MDB_cursor* cursor = nullptr;
+ rc = mdb_cursor_open(txn, bucket_dbi, &cursor);
+ if (rc != 0) {
+ ldout(cct, 5) << "LMDB cursor_open failed in get_all_buckets: "
+ << mdb_strerror(rc) << dendl;
+ mdb_txn_abort(txn);
+ return result;
+ }
+
+ MDB_val key, data;
+ while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
+ std::string bucket_key((char*)key.mv_data, key.mv_size);
+
+ UsageStats stats;
+ try {
+ bufferlist bl;
+ bl.append((char*)data.mv_data, data.mv_size);
+ auto iter = bl.cbegin();
+ stats.decode(iter);
+
+ result.push_back({bucket_key, stats});
+ ldout(cct, 20) << "get_all_buckets: loaded bucket=" << bucket_key
+ << " bytes=" << stats.bytes_used
+ << " objects=" << stats.num_objects << dendl;
+ } catch (const std::exception& e) {
+ ldout(cct, 1) << "Failed to decode bucket stats for " << bucket_key
+ << ": " << e.what() << dendl;
+ }
+ }
+
+ mdb_cursor_close(cursor);
+ mdb_txn_abort(txn);
+
+ ldout(cct, 10) << "get_all_buckets: loaded " << result.size() << " buckets" << dendl;
+ return result;
+}
+
// Explicit template instantiations
template int UsageCache::put_stats(MDB_dbi, const std::string&, const UsageStats&);
template std::optional<UsageStats> UsageCache::get_stats(MDB_dbi, const std::string&);
// Bucket stats operations (non-const to update counters)
int update_bucket_stats(const std::string& bucket_name,
uint64_t bytes_used,
- uint64_t num_objects);
+ uint64_t num_objects,
+ const std::string& user_id);
std::optional<UsageStats> get_bucket_stats(const std::string& bucket_name);
int remove_bucket_stats(const std::string& bucket_name);
uint64_t get_cache_misses() const;
double get_hit_rate() const;
+ // Iterator methods for initial load
+ std::vector<std::pair<std::string, UsageStats>> get_all_users();
+ std::vector<std::pair<std::string, UsageStats>> get_all_buckets();
+
private:
// Database operations
int open_database();
void UsagePerfCounters::start() {
ldout(cct, 10) << "Starting usage perf counters" << dendl;
shutdown_flag = false;
-
+
+ if (cache) {
+ ldout(cct, 10) << "Loading all stats from cache on startup" << dendl;
+
+ // Load users - directly populate perf counters without touching cache
+ auto all_users = cache->get_all_users();
+ for (const auto& [user_id, stats] : all_users) {
+ // Create perf counter if needed
+ {
+ std::unique_lock lock(counters_mutex);
+ auto it = user_perf_counters.find(user_id);
+ if (it == user_perf_counters.end()) {
+ PerfCounters* counters = create_user_counters(user_id);
+ if (counters) {
+ user_perf_counters[user_id] = counters;
+ it = user_perf_counters.find(user_id);
+ ldout(cct, 15) << "Created perf counter for user " << user_id << dendl;
+ }
+ }
+
+ // Set values directly on perf counter
+ if (it != user_perf_counters.end() && it->second) {
+ it->second->set(930001, stats.bytes_used); // l_rgw_user_bytes
+ it->second->set(930002, stats.num_objects); // l_rgw_user_objects
+ ldout(cct, 15) << "Set perf counter for user " << user_id
+ << " bytes=" << stats.bytes_used
+ << " objects=" << stats.num_objects << dendl;
+ }
+ }
+
+ // Mark as active for refresh worker
+ mark_user_active(user_id);
+ }
+
+ // Load buckets - directly populate perf counters without touching cache
+ auto all_buckets = cache->get_all_buckets();
+ for (const auto& [bucket_key, stats] : all_buckets) {
+ std::string bucket_name = bucket_key;
+ std::string tenant;
+ size_t pos = bucket_key.find('/');
+ if (pos != std::string::npos) {
+ tenant = bucket_key.substr(0, pos);
+ bucket_name = bucket_key.substr(pos + 1);
+ }
+
+ // Create perf counter if needed
+ {
+ std::unique_lock lock(counters_mutex);
+ auto it = bucket_perf_counters.find(bucket_name);
+ if (it == bucket_perf_counters.end()) {
+ PerfCounters* counters = create_bucket_counters(bucket_name);
+ if (counters) {
+ bucket_perf_counters[bucket_name] = counters;
+ it = bucket_perf_counters.find(bucket_name);
+ ldout(cct, 15) << "Created perf counter for bucket " << bucket_name << dendl;
+ }
+ }
+
+ // Set values directly on perf counter
+ if (it != bucket_perf_counters.end() && it->second) {
+ it->second->set(940001, stats.bytes_used); // l_rgw_bucket_bytes
+ it->second->set(940002, stats.num_objects); // l_rgw_bucket_objects
+ ldout(cct, 15) << "Set perf counter for bucket " << bucket_name
+ << " bytes=" << stats.bytes_used
+ << " objects=" << stats.num_objects << dendl;
+ }
+ }
+
+ // Mark as active for refresh worker
+ mark_bucket_active(bucket_name, tenant);
+ }
+
+ ldout(cct, 10) << "Initial load complete: " << all_users.size()
+ << " users, " << all_buckets.size() << " buckets" << dendl;
+ }
+
// Start cleanup thread
cleanup_thread = std::thread(&UsagePerfCounters::cleanup_worker, this);
void UsagePerfCounters::update_bucket_stats(const std::string& bucket_name,
uint64_t bytes_used,
uint64_t num_objects,
+ const std::string& user_id,
bool update_cache) {
ldout(cct, 20) << "update_bucket_stats: bucket=" << bucket_name
<< " bytes=" << bytes_used
- << " objects=" << num_objects
+ << " objects=" << num_objects
+ << " user=" << user_id
<< " update_cache=" << update_cache << dendl;
- // Update cache if requested - ALWAYS write the total values passed in
- if (update_cache && cache) {
- int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects);
+ // Update cache if requested - cache will aggregate user stats
+ if (update_cache && cache && !user_id.empty()) {
+ int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects, user_id);
if (ret == 0) {
global_counters->inc(l_rgw_usage_cache_update);
ldout(cct, 15) << "Cache updated for bucket " << bucket_name
}
update_bucket_stats(bucket_only, cached_stats->bytes_used,
- cached_stats->num_objects, false);
+ cached_stats->num_objects,"" ,false);
}
}
<< " objects=" << cached_stats->num_objects << dendl;
update_bucket_stats(bucket_name, cached_stats->bytes_used,
- cached_stats->num_objects, false);
+ cached_stats->num_objects, "", false);
}
}
if (bucket_stats) {
global_counters->inc(l_rgw_usage_cache_hit);
update_bucket_stats(bucket_name, bucket_stats->bytes_used,
- bucket_stats->num_objects, false);
+ bucket_stats->num_objects, "", false);
} else {
global_counters->inc(l_rgw_usage_cache_miss);
}
void update_bucket_stats(const std::string& bucket_name,
uint64_t bytes_used,
uint64_t num_objects,
+ const std::string& user_id = "",
bool update_cache = true);
void mark_bucket_active(const std::string& bucket_name,