From: Harsimran Singh Date: Fri, 12 Dec 2025 06:09:43 +0000 (+0530) Subject: rgw: fixing per bucket counters X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9742fbedb0e54df5aeade0f7b8aee0b44166a96e;p=ceph-ci.git rgw: fixing per bucket counters Signed-off-by: Harsimran Singh --- diff --git a/src/rgw/rgw_usage_cache.cc b/src/rgw/rgw_usage_cache.cc index ed22c9f7f14..d69d247e999 100644 --- a/src/rgw/rgw_usage_cache.cc +++ b/src/rgw/rgw_usage_cache.cc @@ -145,6 +145,31 @@ void UsageCache::set_counter(int counter, uint64_t value) { } } +int UsageCache::delete_corrupted_database() { + std::vector files_to_delete = { + config.db_path, // Main database file (e.g., /tmp/usage_cache.mdb) + config.db_path + "-lock" // Lock file (e.g., /tmp/usage_cache.mdb-lock) + }; + + bool any_deleted = false; + for (const auto& file : files_to_delete) { + if (unlink(file.c_str()) == 0) { + if (cct) { + ldout(cct, 1) << "UsageCache: Deleted corrupted file: " << file << dendl; + } + any_deleted = true; + } else if (errno != ENOENT) { + // ENOENT is okay (file doesn't exist), other errors are problems + if (cct) { + ldout(cct, 1) << "UsageCache: Warning - could not delete " << file + << ": " << cpp_strerror(errno) << dendl; + } + } + } + + return any_deleted ? 0 : -ENOENT; +} + int UsageCache::init() { if (initialized.exchange(true)) { return 0; @@ -175,14 +200,47 @@ int UsageCache::init() { } } + // Try to open database int ret = open_database(); - if (ret < 0) { + + // Handle corruption with auto-recovery + if (ret == -MDB_CORRUPTED || ret == -MDB_INVALID || ret == -MDB_BAD_TXN || ret == -MDB_PANIC) { + ldout(cct, 0) << "UsageCache: Corrupted cache detected (error=" << ret + << "), attempting recovery..." << dendl; + + // Delete corrupted database files + ret = delete_corrupted_database(); + if (ret < 0) { + ldout(cct, 0) << "UsageCache: Failed to delete corrupted cache files: " + << cpp_strerror(-ret) << dendl; + // Continue anyway - open_database will create fresh files + } + + // Try to open fresh database + ret = open_database(); + if (ret < 0) { + ldout(cct, 0) << "UsageCache: Failed to recreate cache after corruption: " + << cpp_strerror(-ret) << dendl; + initialized = false; + return ret; + } + + ldout(cct, 0) << "UsageCache: Successfully recovered from corruption. " + << "Cache will be repopulated on next refresh cycle." << dendl; + } else if (ret < 0) { + ldout(cct, 0) << "UsageCache: Failed to open database: " + << cpp_strerror(-ret) << dendl; initialized = false; return ret; } set_counter(PERF_CACHE_SIZE, get_cache_size()); + if (cct) { + ldout(cct, 1) << "UsageCache: Initialized successfully at " + << config.db_path << dendl; + } + return 0; } @@ -198,7 +256,7 @@ int UsageCache::open_database() { if (cct) { ldout(cct, 0) << "LMDB env_create failed: " << mdb_strerror(rc) << dendl; } - return -EIO; + return -rc; } rc = mdb_env_set_mapsize(env, config.max_db_size); @@ -208,7 +266,7 @@ int UsageCache::open_database() { } mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } rc = mdb_env_set_maxreaders(env, config.max_readers); @@ -218,7 +276,7 @@ int UsageCache::open_database() { } mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } rc = mdb_env_set_maxdbs(env, 2); @@ -228,7 +286,7 @@ int UsageCache::open_database() { } mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } rc = mdb_env_open(env, config.db_path.c_str(), MDB_NOSUBDIR | MDB_NOTLS, 0644); @@ -239,7 +297,7 @@ int UsageCache::open_database() { } mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } // Open named databases @@ -251,7 +309,7 @@ int UsageCache::open_database() { } mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } rc = mdb_dbi_open(txn, "user_stats", MDB_CREATE, &user_dbi); @@ -263,7 +321,7 @@ int UsageCache::open_database() { mdb_txn_abort(txn); mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } rc = mdb_dbi_open(txn, "bucket_stats", MDB_CREATE, &bucket_dbi); @@ -275,7 +333,7 @@ int UsageCache::open_database() { mdb_txn_abort(txn); mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } rc = mdb_txn_commit(txn); @@ -285,7 +343,7 @@ int UsageCache::open_database() { } mdb_env_close(env); env = nullptr; - return -EIO; + return -rc; } if (cct) { diff --git a/src/rgw/rgw_usage_cache.h b/src/rgw/rgw_usage_cache.h index aad811856db..69078efc681 100644 --- a/src/rgw/rgw_usage_cache.h +++ b/src/rgw/rgw_usage_cache.h @@ -91,6 +91,7 @@ private: // Database operations int open_database(); void close_database(); + int delete_corrupted_database(); template int put_stats(MDB_dbi dbi, const std::string& key, const T& stats); diff --git a/src/rgw/rgw_usage_perf.cc b/src/rgw/rgw_usage_perf.cc index 101066ddf2c..a0b3dd4714b 100644 --- a/src/rgw/rgw_usage_perf.cc +++ b/src/rgw/rgw_usage_perf.cc @@ -153,12 +153,18 @@ int UsagePerfCounters::init() { void UsagePerfCounters::start() { ldout(cct, 10) << "Starting usage perf counters" << dendl; shutdown_flag = false; - + bool cache_was_empty = false; if (cache) { ldout(cct, 10) << "Loading all stats from cache on startup" << dendl; // Load users - directly populate perf counters without touching cache auto all_users = cache->get_all_users(); + // Load buckets - directly populate perf counters without touching cache + auto all_buckets = cache->get_all_buckets(); + if (all_users.empty() && all_buckets.empty()) { + ldout(cct, 10) << "Cache is empty (likely after recovery or first start)" << dendl; + cache_was_empty = true; + } for (const auto& [user_id, stats] : all_users) { // Create perf counter if needed { @@ -187,8 +193,6 @@ void UsagePerfCounters::start() { mark_user_active(user_id); } - // Load buckets - directly populate perf counters without touching cache - auto all_buckets = cache->get_all_buckets(); for (const auto& [bucket_key, stats] : all_buckets) { std::string bucket_name = bucket_key; std::string tenant; @@ -228,7 +232,11 @@ void UsagePerfCounters::start() { ldout(cct, 10) << "Initial load complete: " << all_users.size() << " users, " << all_buckets.size() << " buckets" << dendl; } - + if (cache_was_empty) { + ldout(cct, 10) << "Cache was empty, enumerating all buckets from RADOS metadata" << dendl; + enumerate_all_buckets_from_metadata(); + enumerate_all_users_from_metadata(); + } // Start refresh thread refresh_thread = std::thread(&UsagePerfCounters::refresh_worker, this); @@ -275,6 +283,87 @@ void UsagePerfCounters::sync_user_from_rados(const std::string& user_id) { update_user_stats(user_id, stats.size, stats.num_objects, false); } +void UsagePerfCounters::sync_bucket_from_rados(const std::string& bucket_key) { + if (!driver) { + ldout(cct, 10) << "sync_bucket_from_rados: no driver available" << dendl; + return; + } + + ldout(cct, 15) << "sync_bucket_from_rados: bucket=" << bucket_key << dendl; + + // Parse tenant/bucket from bucket_key + std::string tenant; + std::string bucket_name = bucket_key; + size_t pos = bucket_key.find('/'); + if (pos != std::string::npos) { + tenant = bucket_key.substr(0, pos); + bucket_name = bucket_key.substr(pos + 1); + } + + UsagePerfDoutPrefix dpp(cct); + + // Load the bucket + std::unique_ptr bucket; + int ret = driver->load_bucket(&dpp, rgw_bucket(tenant, bucket_name), + &bucket, null_yield); + if (ret < 0) { + ldout(cct, 10) << "Failed to load bucket " << bucket_key + << ": " << cpp_strerror(-ret) << dendl; + + // If bucket doesn't exist anymore, clean up stale entries + if (ret == -ENOENT) { + ldout(cct, 10) << "Bucket " << bucket_key << " no longer exists, cleaning up" << dendl; + + // Remove from active_buckets + { + std::lock_guard lock(activity_mutex); + active_buckets.erase(bucket_key); + } + + // Remove from LMDB cache + if (cache) { + cache->remove_bucket_stats(bucket_key); + } + + // Remove perf counter + { + std::unique_lock lock(counters_mutex); + auto it = bucket_perf_counters.find(bucket_name); + if (it != bucket_perf_counters.end()) { + auto* coll = cct->get_perfcounters_collection(); + if (coll) { + coll->remove(it->second); + } + delete it->second; + bucket_perf_counters.erase(it); + } + } + } + return; + } + + // Get bucket stats + RGWBucketEnt ent; + ret = bucket->sync_owner_stats(&dpp, null_yield, &ent); + if (ret < 0) { + ldout(cct, 10) << "Failed to sync bucket stats for " << bucket_key + << ": " << cpp_strerror(-ret) << dendl; + return; + } + + ldout(cct, 15) << "Got stats from RADOS: bucket=" << bucket_key + << " bytes=" << ent.size + << " objects=" << ent.count << dendl; + + // Update local LMDB cache + if (cache) { + cache->update_bucket_stats(bucket_key, ent.size, ent.count); + } + + // Update perf counters + update_bucket_stats(bucket_name, ent.size, ent.count, tenant, false); +} + void UsagePerfCounters::stop() { ldout(cct, 10) << "Stopping usage perf counters" << dendl; @@ -560,24 +649,133 @@ void UsagePerfCounters::refresh_worker() { ldout(cct, 10) << "Stopped usage stats refresh worker thread" << dendl; } -void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) { - ldout(cct, 20) << "refresh_bucket_stats: key=" << bucket_key << dendl; +void UsagePerfCounters::enumerate_all_buckets_from_metadata() { + if (!driver) { + ldout(cct, 10) << "enumerate_all_buckets: no driver available" << dendl; + return; + } - auto cached_stats = cache->get_bucket_stats(bucket_key); - if (cached_stats) { - std::string bucket_name = bucket_key; - size_t pos = bucket_key.find('/'); - if (pos != std::string::npos) { - bucket_name = bucket_key.substr(pos + 1); + UsagePerfDoutPrefix dpp(cct); + ldout(cct, 10) << "Enumerating all buckets via metadata API" << dendl; + + void* handle = nullptr; + int ret = driver->meta_list_keys_init(&dpp, "bucket.instance", "", &handle); + if (ret < 0) { + ldout(cct, 1) << "Failed to init bucket metadata listing: " + << cpp_strerror(-ret) << dendl; + return; + } + + int total_buckets = 0; + bool truncated = true; + + while (truncated && !shutdown_flag) { + std::list keys; + ret = driver->meta_list_keys_next(&dpp, handle, 1000, keys, &truncated); + if (ret < 0) { + ldout(cct, 1) << "Failed to list bucket metadata: " + << cpp_strerror(-ret) << dendl; + break; } - ldout(cct, 15) << "Refreshing bucket " << bucket_key - << " bytes=" << cached_stats->bytes_used - << " objects=" << cached_stats->num_objects << dendl; + for (const auto& key : keys) { + if (shutdown_flag) break; + + // Parse bucket key format: "tenant:bucket_name:bucket_id" or "bucket_name:bucket_id" + std::string bucket_name; + std::string tenant; + + size_t first_colon = key.find(':'); + if (first_colon != std::string::npos) { + // Check if there's a second colon (indicates tenant is present) + size_t second_colon = key.find(':', first_colon + 1); + if (second_colon != std::string::npos) { + // Format: tenant:bucket_name:bucket_id + tenant = key.substr(0, first_colon); + bucket_name = key.substr(first_colon + 1, second_colon - first_colon - 1); + } else { + // Format: bucket_name:bucket_id (no tenant) + bucket_name = key.substr(0, first_colon); + } + } else { + // No colons, just bucket name + bucket_name = key; + } + + if (bucket_name.empty()) { + ldout(cct, 5) << "Skipping empty bucket name from key: " << key << dendl; + continue; + } + + std::string bucket_key = tenant.empty() ? bucket_name : tenant + "/" + bucket_name; + mark_bucket_active(bucket_key, tenant); + total_buckets++; + + ldout(cct, 20) << "Added bucket to monitoring: " << bucket_key + << " (from metadata key: " << key << ")" << dendl; + } + } + + driver->meta_list_keys_complete(handle); + ldout(cct, 10) << "Bucket enumeration complete: monitoring " + << total_buckets << " buckets from metadata" << dendl; +} + +void UsagePerfCounters::enumerate_all_users_from_metadata() { + if (!driver) { + ldout(cct, 10) << "enumerate_all_users: no driver available" << dendl; + return; + } + + UsagePerfDoutPrefix dpp(cct); + ldout(cct, 10) << "Enumerating all users via metadata API" << dendl; + + void* handle = nullptr; + // Use "user" section which contains all users + int ret = driver->meta_list_keys_init(&dpp, "user", "", &handle); + if (ret < 0) { + ldout(cct, 1) << "Failed to init user metadata listing: " + << cpp_strerror(-ret) << dendl; + return; + } + + int total_users = 0; + bool truncated = true; + + while (truncated && !shutdown_flag) { + std::list keys; + ret = driver->meta_list_keys_next(&dpp, handle, 1000, keys, &truncated); + if (ret < 0) { + ldout(cct, 1) << "Failed to list user metadata: " + << cpp_strerror(-ret) << dendl; + break; + } - update_bucket_stats(bucket_name, cached_stats->bytes_used, - cached_stats->num_objects, "", false); + for (const auto& user_id : keys) { + if (shutdown_flag) break; + + if (user_id.empty()) { + continue; + } + + mark_user_active(user_id); + total_users++; + + ldout(cct, 20) << "Added user to monitoring: " << user_id + << " (from metadata)" << dendl; + } } + + driver->meta_list_keys_complete(handle); + ldout(cct, 10) << "User enumeration complete: monitoring " + << total_users << " users from metadata" << dendl; +} + +void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) { + ldout(cct, 20) << "refresh_bucket_stats: key=" << bucket_key << dendl; + + // Fetch real stats from RADOS + sync_bucket_from_rados(bucket_key); } void UsagePerfCounters::refresh_user_stats(const std::string& user_id) { diff --git a/src/rgw/rgw_usage_perf.h b/src/rgw/rgw_usage_perf.h index 3ced9271d5b..c8c33da0498 100644 --- a/src/rgw/rgw_usage_perf.h +++ b/src/rgw/rgw_usage_perf.h @@ -75,6 +75,8 @@ private: void cleanup_worker(); void refresh_worker(); + void enumerate_all_buckets_from_metadata(); + void enumerate_all_users_from_metadata(); void refresh_bucket_stats(const std::string& bucket_key); void refresh_user_stats(const std::string& user_id); @@ -121,6 +123,7 @@ void evict_from_cache(const std::string& user_id, const std::string& bucket_name); void sync_user_from_rados(const std::string& user_id); +void sync_bucket_from_rados(const std::string& bucket_key); // Stats retrieval (from cache) std::optional get_user_stats(const std::string& user_id); diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 94c8bdcc1e4..0fd98dec879 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -392,6 +392,8 @@ target_include_directories(unittest_rgw_usage_perf_counters PRIVATE ) target_link_libraries(unittest_rgw_usage_perf_counters + rgw_common + ${rgw_libs} ${UNITTEST_LIBS} global ceph-common