git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: Fix for aggregation of userStats
author Harsimran Singh <hsthukral51@gmail.com>
Wed, 26 Nov 2025 02:19:21 +0000 (07:49 +0530)
committer Harsimran Singh <hsthukral51@gmail.com>
Fri, 12 Dec 2025 06:17:24 +0000 (11:47 +0530)
Signed-off-by: Harsimran Singh <hsthukral51@gmail.com>
src/rgw/rgw_op.cc
src/rgw/rgw_usage_cache.cc
src/rgw/rgw_usage_cache.h
src/rgw/rgw_usage_perf.cc
src/rgw/rgw_usage_perf.h

index 7af8a1cb5830746a209fb729da1bb1b84444378e..f19a0b1df5587ef650170f1564d9c77a09ee0f24 100644 (file)
@@ -4073,7 +4073,7 @@ void RGWCreateBucket::execute(optional_yield y)
     auto* usage_counters = rgw::get_usage_perf_counters();
     if (usage_counters && s->bucket) {
       // For new buckets, initialize with 0 bytes and 0 objects
-      usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0);
+      usage_counters->update_bucket_stats(s->bucket->get_name(), 0, 0, s->user->get_id().id);
       
       // Update user stats - use sync_owner_stats to get current info
       if (s->user) {
@@ -4084,7 +4084,8 @@ void RGWCreateBucket::execute(optional_yield y)
           usage_counters->update_user_stats(
             s->user->get_id().id,
             ent.size,
-            ent.count
+            ent.count,
+            false
           );
         }
       }
@@ -5049,41 +5050,41 @@ void RGWPutObj::execute(optional_yield y)
       
       ldpp_dout(this, 20) << "PUT completed: updating usage for bucket=" 
                           << bucket_key << " size=" << s->obj_size << dendl;
+    auto usage_counters = rgw::get_usage_perf_counters();
+    if (usage_counters && s->bucket && s->user) {
+      // Get actual bucket stats from RGW metadata (includes this PUT)
+      RGWBucketEnt stats;
+      int ret = s->bucket->sync_owner_stats(this, y, &stats);
       
-      // Increment bucket stats in cache (adds to existing)
-      auto existing = usage_counters->get_bucket_stats(bucket_key);
-      uint64_t new_bytes = s->obj_size;
-      uint64_t new_objects = 1;
-      
-      if (existing) {
-        new_bytes += existing->bytes_used;
-        new_objects += existing->num_objects;
-      }
-      
-      // Update cache with new totals
-      usage_counters->update_bucket_stats(s->bucket->get_name(), 
-                                         new_bytes, new_objects, true);
-      
-      // Mark as active to trigger perf counter update
-      usage_counters->mark_bucket_active(s->bucket->get_name(),
-                                        s->bucket->get_tenant());
-      
-      // Update user stats
-      if (s->user) {
-        auto user_existing = usage_counters->get_user_stats(s->user->get_id().to_str());
-        uint64_t user_bytes = s->obj_size;
-        uint64_t user_objects = 1;
+      if (ret >= 0) {
+        ldpp_dout(this, 20) << "PUT completed: updating usage for bucket="
+                            << s->bucket->get_name() 
+                            << " bytes=" << stats.size
+                            << " objects=" << stats.count << dendl;
         
-        if (user_existing) {
-          user_bytes += user_existing->bytes_used;
-          user_objects += user_existing->num_objects;
-        }
+        // Update with ACTUAL bucket totals (not calculated)
+        usage_counters->update_bucket_stats(s->bucket->get_name(),
+                                           stats.size,
+                                           stats.count,
+                                           s->user->get_id().id,
+                                           true);
         
-        usage_counters->update_user_stats(s->user->get_id().to_str(),
-                                         user_bytes, user_objects, true);
-        usage_counters->mark_user_active(s->user->get_id().to_str());
+        // Mark as active
+        usage_counters->mark_bucket_active(s->bucket->get_name(),
+                                          s->bucket->get_tenant());
+        
+        // User stats are aggregated in cache, just update perf counter
+        auto user_stats = usage_counters->get_user_stats(s->user->get_id().to_str());
+        if (user_stats) {
+          usage_counters->update_user_stats(s->user->get_id().to_str(),
+                                           user_stats->bytes_used,
+                                           user_stats->num_objects,
+                                           false);
+          usage_counters->mark_user_active(s->user->get_id().to_str());
+        }
       }
     }
+    }
   }
 
 } /* RGWPutObj::execute() */
@@ -5890,34 +5891,36 @@ void RGWDeleteObj::execute(optional_yield y)
     op_ret = -EINVAL;
   }
   
-  // Update usage statistics after successful delete
-  if (op_ret == 0 && s->bucket) {
-    auto usage_counters = rgw::get_usage_perf_counters();
-    if (usage_counters) {
-      std::string bucket_key = s->bucket->get_tenant().empty() ? 
-                               s->bucket->get_name() : 
-                               s->bucket->get_tenant() + "/" + s->bucket->get_name();
+  auto usage_counters = rgw::get_usage_perf_counters();
+  if (usage_counters && s->bucket && s->user) {
+    // Get actual bucket stats from RGW metadata (after deletion)
+    RGWBucketEnt stats;
+    int ret = s->bucket->sync_owner_stats(this, y, &stats);
+    
+    if (ret >= 0) {
+      ldpp_dout(this, 20) << "DELETE completed: updating usage for bucket="
+                          << s->bucket->get_name() 
+                          << " bytes=" << stats.size
+                          << " objects=" << stats.count << dendl;
       
-      ldpp_dout(this, 20) << "DELETE completed: updating usage for bucket=" 
-                          << bucket_key << dendl;
+      // Update with ACTUAL bucket totals (not calculated)
+      usage_counters->update_bucket_stats(s->bucket->get_name(),
+                                          stats.size,
+                                          stats.count,
+                                          s->user->get_id().id,
+                                          true);
       
-      // Decrement bucket stats in cache
-      auto existing = usage_counters->get_bucket_stats(bucket_key);
-      if (existing && existing->num_objects > 0) {
-        uint64_t obj_size = existing->bytes_used / existing->num_objects; // approximate
-        uint64_t new_bytes = existing->bytes_used > obj_size ? 
-                            existing->bytes_used - obj_size : 0;
-        uint64_t new_objects = existing->num_objects - 1;
-        
-        usage_counters->update_bucket_stats(s->bucket->get_name(),
-                                           new_bytes, new_objects, true);
-      }
-      
-      // Mark as active to trigger perf counter update
+      // Mark as active
       usage_counters->mark_bucket_active(s->bucket->get_name(),
                                         s->bucket->get_tenant());
       
-      if (s->user) {
+      // User stats are aggregated in cache, just update perf counter
+      auto user_stats = usage_counters->get_user_stats(s->user->get_id().to_str());
+      if (user_stats) {
+        usage_counters->update_user_stats(s->user->get_id().to_str(),
+                                          user_stats->bytes_used,
+                                          user_stats->num_objects,
+                                          false);
         usage_counters->mark_user_active(s->user->get_id().to_str());
       }
     }
index bd96836b672525fecc0e79526fd76497c2b761ab..90af67ce1252cca5ee7f188e9342ef0417e125d8 100644 (file)
@@ -510,15 +510,56 @@ int UsageCache::remove_user_stats(const std::string& user_id) {
 
 int UsageCache::update_bucket_stats(const std::string& bucket_name,
                                     uint64_t bytes_used,
-                                    uint64_t num_objects) {
+                                    uint64_t num_objects,
+                                    const std::string& user_id) {
+
+  if (!initialized || user_id.empty()) {
+    return -EINVAL;
+  }
+
   std::unique_lock lock(db_mutex);
   
+  // Get old bucket stats to calculate delta
+  auto old_bucket_stats = get_stats<UsageStats>(bucket_dbi, bucket_name);
+
   UsageStats stats;
   stats.bytes_used = bytes_used;
   stats.num_objects = num_objects;
   stats.last_updated = ceph::real_clock::now();
   
   int ret = put_stats(bucket_dbi, bucket_name, stats);
+  if (ret != 0) {
+    return ret;
+  }
+  
+  // Get current user stats
+  auto current_user_stats = get_stats<UsageStats>(user_dbi, user_id);
+  
+  UsageStats new_user_stats;
+  if (current_user_stats.has_value()) {
+    new_user_stats.bytes_used = current_user_stats->bytes_used;
+    new_user_stats.num_objects = current_user_stats->num_objects;
+  } else {
+    new_user_stats.bytes_used = 0;
+    new_user_stats.num_objects = 0;
+  }
+  
+  // Calculate delta (what changed for this bucket)
+  int64_t delta_bytes = (int64_t)bytes_used;
+  int64_t delta_objects = (int64_t)num_objects;
+  
+  if (old_bucket_stats.has_value()) {
+    delta_bytes -= (int64_t)old_bucket_stats->bytes_used;
+    delta_objects -= (int64_t)old_bucket_stats->num_objects;
+  }
+  
+  // Apply delta to user stats
+  new_user_stats.bytes_used = (uint64_t)((int64_t)new_user_stats.bytes_used + delta_bytes);
+  new_user_stats.num_objects = (uint64_t)((int64_t)new_user_stats.num_objects + delta_objects);
+  new_user_stats.last_updated = ceph::real_clock::now();
+  
+  // Update user stats in cache
+  ret = put_stats(user_dbi, user_id, new_user_stats);
   if (ret == 0) {
     inc_counter(PERF_CACHE_UPDATE);
     set_counter(PERF_CACHE_SIZE, get_cache_size_internal());
@@ -733,6 +774,112 @@ double UsageCache::get_hit_rate() const {
   return (total > 0) ? (double)hits / total * 100.0 : 0.0;
 }
 
+std::vector<std::pair<std::string, UsageStats>> UsageCache::get_all_users() {
+  std::vector<std::pair<std::string, UsageStats>> result;
+  // Returns every (user id, decoded stats) record found in user_dbi; empty if setup fails.
+  if (!env) {
+    ldout(cct, 5) << "get_all_users: database not initialized" << dendl;
+    return result;
+  }
+
+  MDB_txn* txn = nullptr;
+  int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn);
+  if (rc != 0) {
+    ldout(cct, 5) << "LMDB txn_begin failed in get_all_users: "
+                  << mdb_strerror(rc) << dendl;
+    return result;
+  }
+
+  MDB_cursor* cursor = nullptr;
+  rc = mdb_cursor_open(txn, user_dbi, &cursor);
+  if (rc != 0) {
+    ldout(cct, 5) << "LMDB cursor_open failed in get_all_users: "
+                  << mdb_strerror(rc) << dendl;
+    mdb_txn_abort(txn);
+    return result;
+  }
+
+  MDB_val key, data;
+  while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
+    std::string user_id(static_cast<const char*>(key.mv_data), key.mv_size);
+    UsageStats stats;
+    try {
+      bufferlist bl;
+      bl.append(static_cast<const char*>(data.mv_data), data.mv_size);
+      auto iter = bl.cbegin();
+      stats.decode(iter);
+      result.emplace_back(user_id, stats);
+      ldout(cct, 20) << "get_all_users: loaded user=" << user_id
+                     << " bytes=" << stats.bytes_used
+                     << " objects=" << stats.num_objects << dendl;
+    } catch (const std::exception& e) {  // skip the undecodable record, keep scanning
+      ldout(cct, 1) << "Failed to decode user stats for " << user_id
+                    << ": " << e.what() << dendl;
+    }
+  }
+  // A complete scan ends with MDB_NOTFOUND; any other rc means the result is partial.
+  if (rc != MDB_NOTFOUND) {
+    ldout(cct, 1) << "LMDB cursor_get failed in get_all_users: " << mdb_strerror(rc) << dendl;
+  }
+  mdb_cursor_close(cursor);
+  mdb_txn_abort(txn);  // read-only transaction: nothing to commit
+  ldout(cct, 10) << "get_all_users: loaded " << result.size() << " users" << dendl;
+  return result;
+}
+
+std::vector<std::pair<std::string, UsageStats>> UsageCache::get_all_buckets() {
+  std::vector<std::pair<std::string, UsageStats>> result;
+  // Returns every (bucket key, decoded stats) record found in bucket_dbi; empty if setup fails.
+  if (!env) {
+    ldout(cct, 5) << "get_all_buckets: database not initialized" << dendl;
+    return result;
+  }
+
+  MDB_txn* txn = nullptr;
+  int rc = mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn);
+  if (rc != 0) {
+    ldout(cct, 5) << "LMDB txn_begin failed in get_all_buckets: "
+                  << mdb_strerror(rc) << dendl;
+    return result;
+  }
+
+  MDB_cursor* cursor = nullptr;
+  rc = mdb_cursor_open(txn, bucket_dbi, &cursor);
+  if (rc != 0) {
+    ldout(cct, 5) << "LMDB cursor_open failed in get_all_buckets: "
+                  << mdb_strerror(rc) << dendl;
+    mdb_txn_abort(txn);
+    return result;
+  }
+
+  MDB_val key, data;
+  while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
+    std::string bucket_key(static_cast<const char*>(key.mv_data), key.mv_size);
+    UsageStats stats;
+    try {
+      bufferlist bl;
+      bl.append(static_cast<const char*>(data.mv_data), data.mv_size);
+      auto iter = bl.cbegin();
+      stats.decode(iter);
+      result.emplace_back(bucket_key, stats);
+      ldout(cct, 20) << "get_all_buckets: loaded bucket=" << bucket_key
+                     << " bytes=" << stats.bytes_used
+                     << " objects=" << stats.num_objects << dendl;
+    } catch (const std::exception& e) {  // skip the undecodable record, keep scanning
+      ldout(cct, 1) << "Failed to decode bucket stats for " << bucket_key
+                    << ": " << e.what() << dendl;
+    }
+  }
+  // A complete scan ends with MDB_NOTFOUND; any other rc means the result is partial.
+  if (rc != MDB_NOTFOUND) {
+    ldout(cct, 1) << "LMDB cursor_get failed in get_all_buckets: " << mdb_strerror(rc) << dendl;
+  }
+  mdb_cursor_close(cursor);
+  mdb_txn_abort(txn);  // read-only transaction: nothing to commit
+  ldout(cct, 10) << "get_all_buckets: loaded " << result.size() << " buckets" << dendl;
+  return result;
+}
+
 // Explicit template instantiations
 template int UsageCache::put_stats(MDB_dbi, const std::string&, const UsageStats&);
 template std::optional<UsageStats> UsageCache::get_stats(MDB_dbi, const std::string&);
index 34701044f78ac0918364dcad92947a1b844a8034..6cf951be985d586fb4e7c2465917f74ec8dacfd7 100644 (file)
@@ -70,7 +70,8 @@ public:
   // Bucket stats operations (non-const to update counters)
   int update_bucket_stats(const std::string& bucket_name,
                          uint64_t bytes_used,
-                         uint64_t num_objects);
+                         uint64_t num_objects,
+                         const std::string& user_id);
   std::optional<UsageStats> get_bucket_stats(const std::string& bucket_name);
   int remove_bucket_stats(const std::string& bucket_name);
   
@@ -85,6 +86,10 @@ public:
   uint64_t get_cache_misses() const;
   double get_hit_rate() const;
 
+  // Iterator methods for initial load
+  std::vector<std::pair<std::string, UsageStats>> get_all_users();
+  std::vector<std::pair<std::string, UsageStats>> get_all_buckets();
+  
 private:
   // Database operations
   int open_database();
index bd70553b85377c2ca25bb1df4547984589beaf18..77184a5555753663438f17af13341ab6a2fda3f3 100644 (file)
@@ -170,7 +170,82 @@ int UsagePerfCounters::init() {
 void UsagePerfCounters::start() {
   ldout(cct, 10) << "Starting usage perf counters" << dendl;
   shutdown_flag = false;
-  
+
+  if (cache) {
+    ldout(cct, 10) << "Loading all stats from cache on startup" << dendl;
+    
+    // Load users - directly populate perf counters without touching cache
+    auto all_users = cache->get_all_users();
+    for (const auto& [user_id, stats] : all_users) {
+      // Create perf counter if needed
+      {
+        std::unique_lock lock(counters_mutex);
+        auto it = user_perf_counters.find(user_id);
+        if (it == user_perf_counters.end()) {
+          PerfCounters* counters = create_user_counters(user_id);
+          if (counters) {
+            user_perf_counters[user_id] = counters;
+            it = user_perf_counters.find(user_id);
+            ldout(cct, 15) << "Created perf counter for user " << user_id << dendl;
+          }
+        }
+        
+        // Set values directly on perf counter
+        if (it != user_perf_counters.end() && it->second) {
+          it->second->set(930001, stats.bytes_used);   // l_rgw_user_bytes
+          it->second->set(930002, stats.num_objects);  // l_rgw_user_objects
+          ldout(cct, 15) << "Set perf counter for user " << user_id 
+                         << " bytes=" << stats.bytes_used 
+                         << " objects=" << stats.num_objects << dendl;
+        }
+      }
+      
+      // Mark as active for refresh worker
+      mark_user_active(user_id);
+    }
+    
+    // Load buckets - directly populate perf counters without touching cache
+    auto all_buckets = cache->get_all_buckets();
+    for (const auto& [bucket_key, stats] : all_buckets) {
+      std::string bucket_name = bucket_key;
+      std::string tenant;
+      size_t pos = bucket_key.find('/');
+      if (pos != std::string::npos) {
+        tenant = bucket_key.substr(0, pos);
+        bucket_name = bucket_key.substr(pos + 1);
+      }
+      
+      // Create perf counter if needed
+      {
+        std::unique_lock lock(counters_mutex);
+        auto it = bucket_perf_counters.find(bucket_name);
+        if (it == bucket_perf_counters.end()) {
+          PerfCounters* counters = create_bucket_counters(bucket_name);
+          if (counters) {
+            bucket_perf_counters[bucket_name] = counters;
+            it = bucket_perf_counters.find(bucket_name);
+            ldout(cct, 15) << "Created perf counter for bucket " << bucket_name << dendl;
+          }
+        }
+        
+        // Set values directly on perf counter
+        if (it != bucket_perf_counters.end() && it->second) {
+          it->second->set(940001, stats.bytes_used);   // l_rgw_bucket_bytes
+          it->second->set(940002, stats.num_objects);  // l_rgw_bucket_objects
+          ldout(cct, 15) << "Set perf counter for bucket " << bucket_name 
+                         << " bytes=" << stats.bytes_used 
+                         << " objects=" << stats.num_objects << dendl;
+        }
+      }
+      
+      // Mark as active for refresh worker
+      mark_bucket_active(bucket_name, tenant);
+    }
+    
+    ldout(cct, 10) << "Initial load complete: " << all_users.size() 
+                   << " users, " << all_buckets.size() << " buckets" << dendl;
+  }
+
   // Start cleanup thread
   cleanup_thread = std::thread(&UsagePerfCounters::cleanup_worker, this);
   
@@ -246,15 +321,17 @@ void UsagePerfCounters::shutdown() {
 void UsagePerfCounters::update_bucket_stats(const std::string& bucket_name,
                                             uint64_t bytes_used,
                                             uint64_t num_objects,
+                                            const std::string& user_id,
                                             bool update_cache) {
   ldout(cct, 20) << "update_bucket_stats: bucket=" << bucket_name
                  << " bytes=" << bytes_used 
-                 << " objects=" << num_objects 
+                 << " objects=" << num_objects
+                 << " user=" << user_id 
                  << " update_cache=" << update_cache << dendl;
   
-  // Update cache if requested - ALWAYS write the total values passed in
-  if (update_cache && cache) {
-    int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects);
+  // Update cache if requested - cache will aggregate user stats
+  if (update_cache && cache && !user_id.empty()) {
+    int ret = cache->update_bucket_stats(bucket_name, bytes_used, num_objects, user_id);
     if (ret == 0) {
       global_counters->inc(l_rgw_usage_cache_update);
       ldout(cct, 15) << "Cache updated for bucket " << bucket_name 
@@ -388,7 +465,7 @@ void UsagePerfCounters::mark_bucket_active(const std::string& bucket_name,
     }
     
     update_bucket_stats(bucket_only, cached_stats->bytes_used, 
-                       cached_stats->num_objects, false);
+                       cached_stats->num_objects,"" ,false);
   }
 }
 
@@ -485,7 +562,7 @@ void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) {
                    << " objects=" << cached_stats->num_objects << dendl;
     
     update_bucket_stats(bucket_name, cached_stats->bytes_used, 
-                       cached_stats->num_objects, false);
+                       cached_stats->num_objects, "", false);
   }
 }
 
@@ -527,7 +604,7 @@ void UsagePerfCounters::refresh_from_cache(const std::string& user_id,
     if (bucket_stats) {
       global_counters->inc(l_rgw_usage_cache_hit);
       update_bucket_stats(bucket_name, bucket_stats->bytes_used,
-                         bucket_stats->num_objects, false);
+                         bucket_stats->num_objects, "", false);
     } else {
       global_counters->inc(l_rgw_usage_cache_miss);
     }
index f1d4cdfa82b74882c59c214de30777e77f4bdfcd..90a3a9f9df38600502a490f9090152b67565a746 100644 (file)
@@ -94,6 +94,7 @@ public:
   void update_bucket_stats(const std::string& bucket_name,
                           uint64_t bytes_used,
                           uint64_t num_objects,
+                          const std::string& user_id = "",
                           bool update_cache = true);
 
 void mark_bucket_active(const std::string& bucket_name,