]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: fixing per bucket counters
authorHarsimran Singh <hsthukral51@gmail.com>
Fri, 12 Dec 2025 06:09:43 +0000 (11:39 +0530)
committerHarsimran Singh <hsthukral51@gmail.com>
Fri, 12 Dec 2025 10:43:39 +0000 (16:13 +0530)
Signed-off-by: Harsimran Singh <hsthukral51@gmail.com>
src/rgw/rgw_usage_cache.cc
src/rgw/rgw_usage_cache.h
src/rgw/rgw_usage_perf.cc
src/rgw/rgw_usage_perf.h
src/test/rgw/CMakeLists.txt

index ed22c9f7f14b5019507ec63fadeb3fc991cab619..d69d247e999e8c861c5ab23e4258ebe434a5a217 100644 (file)
@@ -145,6 +145,31 @@ void UsageCache::set_counter(int counter, uint64_t value) {
   }
 }
 
+int UsageCache::delete_corrupted_database() {
+  std::vector<std::string> files_to_delete = {
+    config.db_path,           // Main database file (e.g., /tmp/usage_cache.mdb)
+    config.db_path + "-lock"  // Lock file (e.g., /tmp/usage_cache.mdb-lock)
+  };
+  
+  bool any_deleted = false;
+  for (const auto& file : files_to_delete) {
+    if (unlink(file.c_str()) == 0) {
+      if (cct) {
+        ldout(cct, 1) << "UsageCache: Deleted corrupted file: " << file << dendl;
+      }
+      any_deleted = true;
+    } else if (errno != ENOENT) {
+      // ENOENT is okay (file doesn't exist), other errors are problems
+      if (cct) {
+        ldout(cct, 1) << "UsageCache: Warning - could not delete " << file 
+                      << ": " << cpp_strerror(errno) << dendl;
+      }
+    }
+  }
+  
+  return any_deleted ? 0 : -ENOENT;
+}
+
 int UsageCache::init() {
   if (initialized.exchange(true)) {
     return 0;
@@ -175,14 +200,47 @@ int UsageCache::init() {
     }
   }
   
+  // Try to open database
   int ret = open_database();
-  if (ret < 0) {
+  
+  // Handle corruption with auto-recovery
+  if (ret == -MDB_CORRUPTED || ret == -MDB_INVALID || ret == -MDB_BAD_TXN || ret == -MDB_PANIC) {
+    ldout(cct, 0) << "UsageCache: Corrupted cache detected (error=" << ret 
+                  << "), attempting recovery..." << dendl;
+    
+    // Delete corrupted database files
+    ret = delete_corrupted_database();
+    if (ret < 0) {
+      ldout(cct, 0) << "UsageCache: Failed to delete corrupted cache files: " 
+                    << cpp_strerror(-ret) << dendl;
+      // Continue anyway - open_database will create fresh files
+    }
+    
+    // Try to open fresh database
+    ret = open_database();
+    if (ret < 0) {
+      ldout(cct, 0) << "UsageCache: Failed to recreate cache after corruption: " 
+                    << cpp_strerror(-ret) << dendl;
+      initialized = false;
+      return ret;
+    }
+    
+    ldout(cct, 0) << "UsageCache: Successfully recovered from corruption. "
+                  << "Cache will be repopulated on next refresh cycle." << dendl;
+  } else if (ret < 0) {
+    ldout(cct, 0) << "UsageCache: Failed to open database: " 
+                  << cpp_strerror(-ret) << dendl;
     initialized = false;
     return ret;
   }
   
   set_counter(PERF_CACHE_SIZE, get_cache_size());
   
+  if (cct) {
+    ldout(cct, 1) << "UsageCache: Initialized successfully at " 
+                  << config.db_path << dendl;
+  }
+  
   return 0;
 }
 
@@ -198,7 +256,7 @@ int UsageCache::open_database() {
     if (cct) {
       ldout(cct, 0) << "LMDB env_create failed: " << mdb_strerror(rc) << dendl;
     }
-    return -EIO;
+    return -rc;
   }
 
   rc = mdb_env_set_mapsize(env, config.max_db_size);
@@ -208,7 +266,7 @@ int UsageCache::open_database() {
     }
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   rc = mdb_env_set_maxreaders(env, config.max_readers);
@@ -218,7 +276,7 @@ int UsageCache::open_database() {
     }
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   rc = mdb_env_set_maxdbs(env, 2);
@@ -228,7 +286,7 @@ int UsageCache::open_database() {
     }
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   rc = mdb_env_open(env, config.db_path.c_str(), MDB_NOSUBDIR | MDB_NOTLS, 0644);
@@ -239,7 +297,7 @@ int UsageCache::open_database() {
     }
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   // Open named databases
@@ -251,7 +309,7 @@ int UsageCache::open_database() {
     }
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   rc = mdb_dbi_open(txn, "user_stats", MDB_CREATE, &user_dbi);
@@ -263,7 +321,7 @@ int UsageCache::open_database() {
     mdb_txn_abort(txn);
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   rc = mdb_dbi_open(txn, "bucket_stats", MDB_CREATE, &bucket_dbi);
@@ -275,7 +333,7 @@ int UsageCache::open_database() {
     mdb_txn_abort(txn);
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   rc = mdb_txn_commit(txn);
@@ -285,7 +343,7 @@ int UsageCache::open_database() {
     }
     mdb_env_close(env);
     env = nullptr;
-    return -EIO;
+    return -rc;
   }
 
   if (cct) {
index aad811856db34bdb27ab696c878922167c81be4f..69078efc681a3fd57b8090f5fece57f971a1df41 100644 (file)
@@ -91,6 +91,7 @@ private:
   // Database operations
   int open_database();
   void close_database();
+  int delete_corrupted_database();
   
   template<typename T>
   int put_stats(MDB_dbi dbi, const std::string& key, const T& stats);
index 101066ddf2c79b5c5d018f3a05a2e5cf5adbdbf6..a0b3dd4714be83305297429743482de8831d90a2 100644 (file)
@@ -153,12 +153,18 @@ int UsagePerfCounters::init() {
 void UsagePerfCounters::start() {
   ldout(cct, 10) << "Starting usage perf counters" << dendl;
   shutdown_flag = false;
-
+  bool cache_was_empty = false;
   if (cache) {
     ldout(cct, 10) << "Loading all stats from cache on startup" << dendl;
     
     // Load users - directly populate perf counters without touching cache
     auto all_users = cache->get_all_users();
+    // Load buckets - directly populate perf counters without touching cache
+    auto all_buckets = cache->get_all_buckets();
+    if (all_users.empty() && all_buckets.empty()) {
+      ldout(cct, 10) << "Cache is empty (likely after recovery or first start)" << dendl;
+      cache_was_empty = true;
+    }
     for (const auto& [user_id, stats] : all_users) {
       // Create perf counter if needed
       {
@@ -187,8 +193,6 @@ void UsagePerfCounters::start() {
       mark_user_active(user_id);
     }
     
-    // Load buckets - directly populate perf counters without touching cache
-    auto all_buckets = cache->get_all_buckets();
     for (const auto& [bucket_key, stats] : all_buckets) {
       std::string bucket_name = bucket_key;
       std::string tenant;
@@ -228,7 +232,11 @@ void UsagePerfCounters::start() {
     ldout(cct, 10) << "Initial load complete: " << all_users.size() 
                    << " users, " << all_buckets.size() << " buckets" << dendl;
   }
-  
+  if (cache_was_empty) {
+    ldout(cct, 10) << "Cache was empty, enumerating all buckets from RADOS metadata" << dendl;
+    enumerate_all_buckets_from_metadata();
+    enumerate_all_users_from_metadata();
+  }
   // Start refresh thread
   refresh_thread = std::thread(&UsagePerfCounters::refresh_worker, this);
   
@@ -275,6 +283,87 @@ void UsagePerfCounters::sync_user_from_rados(const std::string& user_id) {
   update_user_stats(user_id, stats.size, stats.num_objects, false);
 }
 
+void UsagePerfCounters::sync_bucket_from_rados(const std::string& bucket_key) {
+  if (!driver) {
+    ldout(cct, 10) << "sync_bucket_from_rados: no driver available" << dendl;
+    return;
+  }
+  
+  ldout(cct, 15) << "sync_bucket_from_rados: bucket=" << bucket_key << dendl;
+  
+  // Parse tenant/bucket from bucket_key
+  std::string tenant;
+  std::string bucket_name = bucket_key;
+  size_t pos = bucket_key.find('/');
+  if (pos != std::string::npos) {
+    tenant = bucket_key.substr(0, pos);
+    bucket_name = bucket_key.substr(pos + 1);
+  }
+  
+  UsagePerfDoutPrefix dpp(cct);
+  
+  // Load the bucket
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  int ret = driver->load_bucket(&dpp, rgw_bucket(tenant, bucket_name),
+                                 &bucket, null_yield);
+  if (ret < 0) {
+    ldout(cct, 10) << "Failed to load bucket " << bucket_key
+                   << ": " << cpp_strerror(-ret) << dendl;
+    
+    // If bucket doesn't exist anymore, clean up stale entries
+    if (ret == -ENOENT) {
+      ldout(cct, 10) << "Bucket " << bucket_key << " no longer exists, cleaning up" << dendl;
+      
+      // Remove from active_buckets
+      {
+        std::lock_guard<std::mutex> lock(activity_mutex);
+        active_buckets.erase(bucket_key);
+      }
+      
+      // Remove from LMDB cache
+      if (cache) {
+        cache->remove_bucket_stats(bucket_key);
+      }
+      
+      // Remove perf counter
+      {
+        std::unique_lock<std::shared_mutex> lock(counters_mutex);
+        auto it = bucket_perf_counters.find(bucket_name);
+        if (it != bucket_perf_counters.end()) {
+          auto* coll = cct->get_perfcounters_collection();
+          if (coll) {
+            coll->remove(it->second);
+          }
+          delete it->second;
+          bucket_perf_counters.erase(it);
+        }
+      }
+    }
+    return;
+  }
+  
+  // Get bucket stats
+  RGWBucketEnt ent;
+  ret = bucket->sync_owner_stats(&dpp, null_yield, &ent);
+  if (ret < 0) {
+    ldout(cct, 10) << "Failed to sync bucket stats for " << bucket_key 
+                   << ": " << cpp_strerror(-ret) << dendl;
+    return;
+  }
+  
+  ldout(cct, 15) << "Got stats from RADOS: bucket=" << bucket_key
+                 << " bytes=" << ent.size
+                 << " objects=" << ent.count << dendl;
+  
+  // Update local LMDB cache
+  if (cache) {
+    cache->update_bucket_stats(bucket_key, ent.size, ent.count);
+  }
+  
+  // Update perf counters
+  update_bucket_stats(bucket_name, ent.size, ent.count, tenant, false);
+}
+
 void UsagePerfCounters::stop() {
   ldout(cct, 10) << "Stopping usage perf counters" << dendl;
   
@@ -560,24 +649,133 @@ void UsagePerfCounters::refresh_worker() {
   ldout(cct, 10) << "Stopped usage stats refresh worker thread" << dendl;
 }
 
-void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) {
-  ldout(cct, 20) << "refresh_bucket_stats: key=" << bucket_key << dendl;
+void UsagePerfCounters::enumerate_all_buckets_from_metadata() {
+  if (!driver) {
+    ldout(cct, 10) << "enumerate_all_buckets: no driver available" << dendl;
+    return;
+  }
   
-  auto cached_stats = cache->get_bucket_stats(bucket_key);
-  if (cached_stats) {
-    std::string bucket_name = bucket_key;
-    size_t pos = bucket_key.find('/');
-    if (pos != std::string::npos) {
-      bucket_name = bucket_key.substr(pos + 1);
+  UsagePerfDoutPrefix dpp(cct);
+  ldout(cct, 10) << "Enumerating all buckets via metadata API" << dendl;
+  
+  void* handle = nullptr;
+  int ret = driver->meta_list_keys_init(&dpp, "bucket.instance", "", &handle);
+  if (ret < 0) {
+    ldout(cct, 1) << "Failed to init bucket metadata listing: " 
+                  << cpp_strerror(-ret) << dendl;
+    return;
+  }
+  
+  int total_buckets = 0;
+  bool truncated = true;
+  
+  while (truncated && !shutdown_flag) {
+    std::list<std::string> keys;
+    ret = driver->meta_list_keys_next(&dpp, handle, 1000, keys, &truncated);
+    if (ret < 0) {
+      ldout(cct, 1) << "Failed to list bucket metadata: " 
+                    << cpp_strerror(-ret) << dendl;
+      break;
     }
     
-    ldout(cct, 15) << "Refreshing bucket " << bucket_key 
-                   << " bytes=" << cached_stats->bytes_used 
-                   << " objects=" << cached_stats->num_objects << dendl;
+    for (const auto& key : keys) {
+      if (shutdown_flag) break;
+      
+      // Parse bucket key format: "tenant:bucket_name:bucket_id" or "bucket_name:bucket_id"
+      std::string bucket_name;
+      std::string tenant;
+      
+      size_t first_colon = key.find(':');
+      if (first_colon != std::string::npos) {
+        // Check if there's a second colon (indicates tenant is present)
+        size_t second_colon = key.find(':', first_colon + 1);
+        if (second_colon != std::string::npos) {
+          // Format: tenant:bucket_name:bucket_id
+          tenant = key.substr(0, first_colon);
+          bucket_name = key.substr(first_colon + 1, second_colon - first_colon - 1);
+        } else {
+          // Format: bucket_name:bucket_id (no tenant)
+          bucket_name = key.substr(0, first_colon);
+        }
+      } else {
+        // No colons, just bucket name
+        bucket_name = key;
+      }
+      
+      if (bucket_name.empty()) {
+        ldout(cct, 5) << "Skipping empty bucket name from key: " << key << dendl;
+        continue;
+      }
+      
+      std::string bucket_key = tenant.empty() ? bucket_name : tenant + "/" + bucket_name;
+      mark_bucket_active(bucket_key, tenant);
+      total_buckets++;
+      
+      ldout(cct, 20) << "Added bucket to monitoring: " << bucket_key 
+                     << " (from metadata key: " << key << ")" << dendl;
+    }
+  }
+  
+  driver->meta_list_keys_complete(handle);
+  ldout(cct, 10) << "Bucket enumeration complete: monitoring " 
+                 << total_buckets << " buckets from metadata" << dendl;
+}
+
+void UsagePerfCounters::enumerate_all_users_from_metadata() {
+  if (!driver) {
+    ldout(cct, 10) << "enumerate_all_users: no driver available" << dendl;
+    return;
+  }
+  
+  UsagePerfDoutPrefix dpp(cct);
+  ldout(cct, 10) << "Enumerating all users via metadata API" << dendl;
+  
+  void* handle = nullptr;
+  // Use "user" section which contains all users
+  int ret = driver->meta_list_keys_init(&dpp, "user", "", &handle);
+  if (ret < 0) {
+    ldout(cct, 1) << "Failed to init user metadata listing: " 
+                  << cpp_strerror(-ret) << dendl;
+    return;
+  }
+  
+  int total_users = 0;
+  bool truncated = true;
+  
+  while (truncated && !shutdown_flag) {
+    std::list<std::string> keys;
+    ret = driver->meta_list_keys_next(&dpp, handle, 1000, keys, &truncated);
+    if (ret < 0) {
+      ldout(cct, 1) << "Failed to list user metadata: " 
+                    << cpp_strerror(-ret) << dendl;
+      break;
+    }
     
-    update_bucket_stats(bucket_name, cached_stats->bytes_used, 
-                       cached_stats->num_objects, "", false);
+    for (const auto& user_id : keys) {
+      if (shutdown_flag) break;
+      
+      if (user_id.empty()) {
+        continue;
+      }
+      
+      mark_user_active(user_id);
+      total_users++;
+      
+      ldout(cct, 20) << "Added user to monitoring: " << user_id 
+                     << " (from metadata)" << dendl;
+    }
   }
+  
+  driver->meta_list_keys_complete(handle);
+  ldout(cct, 10) << "User enumeration complete: monitoring " 
+                 << total_users << " users from metadata" << dendl;
+}
+
+void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) {
+  ldout(cct, 20) << "refresh_bucket_stats: key=" << bucket_key << dendl;
+  
+  // Fetch real stats from RADOS
+  sync_bucket_from_rados(bucket_key);
 }
 
 void UsagePerfCounters::refresh_user_stats(const std::string& user_id) {
index 3ced9271d5b0d08312b15d2135a3434079eca807..c8c33da0498506124e11f63695bb22ff3b8e57b5 100644 (file)
@@ -75,6 +75,8 @@ private:
   
   void cleanup_worker();
   void refresh_worker();
+  void enumerate_all_buckets_from_metadata();
+  void enumerate_all_users_from_metadata();
   
   void refresh_bucket_stats(const std::string& bucket_key);
   void refresh_user_stats(const std::string& user_id);
@@ -121,6 +123,7 @@ void evict_from_cache(const std::string& user_id,
                       const std::string& bucket_name);
   
 void sync_user_from_rados(const std::string& user_id);
+void sync_bucket_from_rados(const std::string& bucket_key);
 
 // Stats retrieval (from cache)
 std::optional<UsageStats> get_user_stats(const std::string& user_id);
index 94c8bdcc1e4854614a565858b14fe7de1025ac02..0fd98dec879b9fdb88022292d9c47a72b7f6e229 100644 (file)
@@ -392,6 +392,8 @@ target_include_directories(unittest_rgw_usage_perf_counters PRIVATE
 )
 
 target_link_libraries(unittest_rgw_usage_perf_counters
+                      rgw_common
+                      ${rgw_libs}
                       ${UNITTEST_LIBS}
                       global
                       ceph-common