}
}
+int UsageCache::delete_corrupted_database() {
+ std::vector<std::string> files_to_delete = {
+ config.db_path, // Main database file (e.g., /tmp/usage_cache.mdb)
+ config.db_path + "-lock" // Lock file (e.g., /tmp/usage_cache.mdb-lock)
+ };
+
+ bool any_deleted = false;
+ for (const auto& file : files_to_delete) {
+ if (unlink(file.c_str()) == 0) {
+ if (cct) {
+ ldout(cct, 1) << "UsageCache: Deleted corrupted file: " << file << dendl;
+ }
+ any_deleted = true;
+ } else if (errno != ENOENT) {
+ // ENOENT is okay (file doesn't exist), other errors are problems
+ if (cct) {
+ ldout(cct, 1) << "UsageCache: Warning - could not delete " << file
+ << ": " << cpp_strerror(errno) << dendl;
+ }
+ }
+ }
+
+ return any_deleted ? 0 : -ENOENT;
+}
+
int UsageCache::init() {
if (initialized.exchange(true)) {
return 0;
}
}
+ // Try to open database
int ret = open_database();
- if (ret < 0) {
+
+ // Handle corruption with auto-recovery
+ if (ret == -MDB_CORRUPTED || ret == -MDB_INVALID || ret == -MDB_BAD_TXN || ret == -MDB_PANIC) {
+ ldout(cct, 0) << "UsageCache: Corrupted cache detected (error=" << ret
+ << "), attempting recovery..." << dendl;
+
+ // Delete corrupted database files
+ ret = delete_corrupted_database();
+ if (ret < 0) {
+ ldout(cct, 0) << "UsageCache: Failed to delete corrupted cache files: "
+ << cpp_strerror(-ret) << dendl;
+ // Continue anyway - open_database will create fresh files
+ }
+
+ // Try to open fresh database
+ ret = open_database();
+ if (ret < 0) {
+ ldout(cct, 0) << "UsageCache: Failed to recreate cache after corruption: "
+ << cpp_strerror(-ret) << dendl;
+ initialized = false;
+ return ret;
+ }
+
+ ldout(cct, 0) << "UsageCache: Successfully recovered from corruption. "
+ << "Cache will be repopulated on next refresh cycle." << dendl;
+ } else if (ret < 0) {
+ ldout(cct, 0) << "UsageCache: Failed to open database: "
+ << cpp_strerror(-ret) << dendl;
initialized = false;
return ret;
}
set_counter(PERF_CACHE_SIZE, get_cache_size());
+ if (cct) {
+ ldout(cct, 1) << "UsageCache: Initialized successfully at "
+ << config.db_path << dendl;
+ }
+
return 0;
}
if (cct) {
ldout(cct, 0) << "LMDB env_create failed: " << mdb_strerror(rc) << dendl;
}
- return -EIO;
+ return -rc;
}
rc = mdb_env_set_mapsize(env, config.max_db_size);
}
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
rc = mdb_env_set_maxreaders(env, config.max_readers);
}
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
rc = mdb_env_set_maxdbs(env, 2);
}
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
rc = mdb_env_open(env, config.db_path.c_str(), MDB_NOSUBDIR | MDB_NOTLS, 0644);
}
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
// Open named databases
}
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
rc = mdb_dbi_open(txn, "user_stats", MDB_CREATE, &user_dbi);
mdb_txn_abort(txn);
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
rc = mdb_dbi_open(txn, "bucket_stats", MDB_CREATE, &bucket_dbi);
mdb_txn_abort(txn);
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
rc = mdb_txn_commit(txn);
}
mdb_env_close(env);
env = nullptr;
- return -EIO;
+ return -rc;
}
if (cct) {
void UsagePerfCounters::start() {
ldout(cct, 10) << "Starting usage perf counters" << dendl;
shutdown_flag = false;
-
+ bool cache_was_empty = false;
if (cache) {
ldout(cct, 10) << "Loading all stats from cache on startup" << dendl;
// Load users - directly populate perf counters without touching cache
auto all_users = cache->get_all_users();
+ // Load buckets - directly populate perf counters without touching cache
+ auto all_buckets = cache->get_all_buckets();
+ if (all_users.empty() && all_buckets.empty()) {
+ ldout(cct, 10) << "Cache is empty (likely after recovery or first start)" << dendl;
+ cache_was_empty = true;
+ }
for (const auto& [user_id, stats] : all_users) {
// Create perf counter if needed
{
mark_user_active(user_id);
}
- // Load buckets - directly populate perf counters without touching cache
- auto all_buckets = cache->get_all_buckets();
for (const auto& [bucket_key, stats] : all_buckets) {
std::string bucket_name = bucket_key;
std::string tenant;
ldout(cct, 10) << "Initial load complete: " << all_users.size()
<< " users, " << all_buckets.size() << " buckets" << dendl;
}
-
+ if (cache_was_empty) {
+ ldout(cct, 10) << "Cache was empty, enumerating all buckets from RADOS metadata" << dendl;
+ enumerate_all_buckets_from_metadata();
+ enumerate_all_users_from_metadata();
+ }
// Start refresh thread
refresh_thread = std::thread(&UsagePerfCounters::refresh_worker, this);
update_user_stats(user_id, stats.size, stats.num_objects, false);
}
+void UsagePerfCounters::sync_bucket_from_rados(const std::string& bucket_key) {
+ if (!driver) {
+ ldout(cct, 10) << "sync_bucket_from_rados: no driver available" << dendl;
+ return;
+ }
+
+ ldout(cct, 15) << "sync_bucket_from_rados: bucket=" << bucket_key << dendl;
+
+ // Parse tenant/bucket from bucket_key
+ std::string tenant;
+ std::string bucket_name = bucket_key;
+ size_t pos = bucket_key.find('/');
+ if (pos != std::string::npos) {
+ tenant = bucket_key.substr(0, pos);
+ bucket_name = bucket_key.substr(pos + 1);
+ }
+
+ UsagePerfDoutPrefix dpp(cct);
+
+ // Load the bucket
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ int ret = driver->load_bucket(&dpp, rgw_bucket(tenant, bucket_name),
+ &bucket, null_yield);
+ if (ret < 0) {
+ ldout(cct, 10) << "Failed to load bucket " << bucket_key
+ << ": " << cpp_strerror(-ret) << dendl;
+
+ // If bucket doesn't exist anymore, clean up stale entries
+ if (ret == -ENOENT) {
+ ldout(cct, 10) << "Bucket " << bucket_key << " no longer exists, cleaning up" << dendl;
+
+ // Remove from active_buckets
+ {
+ std::lock_guard<std::mutex> lock(activity_mutex);
+ active_buckets.erase(bucket_key);
+ }
+
+ // Remove from LMDB cache
+ if (cache) {
+ cache->remove_bucket_stats(bucket_key);
+ }
+
+ // Remove perf counter
+ {
+ std::unique_lock<std::shared_mutex> lock(counters_mutex);
+ auto it = bucket_perf_counters.find(bucket_name);
+ if (it != bucket_perf_counters.end()) {
+ auto* coll = cct->get_perfcounters_collection();
+ if (coll) {
+ coll->remove(it->second);
+ }
+ delete it->second;
+ bucket_perf_counters.erase(it);
+ }
+ }
+ }
+ return;
+ }
+
+ // Get bucket stats
+ RGWBucketEnt ent;
+ ret = bucket->sync_owner_stats(&dpp, null_yield, &ent);
+ if (ret < 0) {
+ ldout(cct, 10) << "Failed to sync bucket stats for " << bucket_key
+ << ": " << cpp_strerror(-ret) << dendl;
+ return;
+ }
+
+ ldout(cct, 15) << "Got stats from RADOS: bucket=" << bucket_key
+ << " bytes=" << ent.size
+ << " objects=" << ent.count << dendl;
+
+ // Update local LMDB cache
+ if (cache) {
+ cache->update_bucket_stats(bucket_key, ent.size, ent.count);
+ }
+
+ // Update perf counters
+ update_bucket_stats(bucket_name, ent.size, ent.count, tenant, false);
+}
+
void UsagePerfCounters::stop() {
ldout(cct, 10) << "Stopping usage perf counters" << dendl;
ldout(cct, 10) << "Stopped usage stats refresh worker thread" << dendl;
}
-void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) {
- ldout(cct, 20) << "refresh_bucket_stats: key=" << bucket_key << dendl;
+void UsagePerfCounters::enumerate_all_buckets_from_metadata() {
+ if (!driver) {
+ ldout(cct, 10) << "enumerate_all_buckets: no driver available" << dendl;
+ return;
+ }
- auto cached_stats = cache->get_bucket_stats(bucket_key);
- if (cached_stats) {
- std::string bucket_name = bucket_key;
- size_t pos = bucket_key.find('/');
- if (pos != std::string::npos) {
- bucket_name = bucket_key.substr(pos + 1);
+ UsagePerfDoutPrefix dpp(cct);
+ ldout(cct, 10) << "Enumerating all buckets via metadata API" << dendl;
+
+ void* handle = nullptr;
+ int ret = driver->meta_list_keys_init(&dpp, "bucket.instance", "", &handle);
+ if (ret < 0) {
+ ldout(cct, 1) << "Failed to init bucket metadata listing: "
+ << cpp_strerror(-ret) << dendl;
+ return;
+ }
+
+ int total_buckets = 0;
+ bool truncated = true;
+
+ while (truncated && !shutdown_flag) {
+ std::list<std::string> keys;
+ ret = driver->meta_list_keys_next(&dpp, handle, 1000, keys, &truncated);
+ if (ret < 0) {
+ ldout(cct, 1) << "Failed to list bucket metadata: "
+ << cpp_strerror(-ret) << dendl;
+ break;
}
- ldout(cct, 15) << "Refreshing bucket " << bucket_key
- << " bytes=" << cached_stats->bytes_used
- << " objects=" << cached_stats->num_objects << dendl;
+ for (const auto& key : keys) {
+ if (shutdown_flag) break;
+
+ // Parse bucket key format: "tenant:bucket_name:bucket_id" or "bucket_name:bucket_id"
+ std::string bucket_name;
+ std::string tenant;
+
+ size_t first_colon = key.find(':');
+ if (first_colon != std::string::npos) {
+ // Check if there's a second colon (indicates tenant is present)
+ size_t second_colon = key.find(':', first_colon + 1);
+ if (second_colon != std::string::npos) {
+ // Format: tenant:bucket_name:bucket_id
+ tenant = key.substr(0, first_colon);
+ bucket_name = key.substr(first_colon + 1, second_colon - first_colon - 1);
+ } else {
+ // Format: bucket_name:bucket_id (no tenant)
+ bucket_name = key.substr(0, first_colon);
+ }
+ } else {
+ // No colons, just bucket name
+ bucket_name = key;
+ }
+
+ if (bucket_name.empty()) {
+ ldout(cct, 5) << "Skipping empty bucket name from key: " << key << dendl;
+ continue;
+ }
+
+ std::string bucket_key = tenant.empty() ? bucket_name : tenant + "/" + bucket_name;
+ mark_bucket_active(bucket_key, tenant);
+ total_buckets++;
+
+ ldout(cct, 20) << "Added bucket to monitoring: " << bucket_key
+ << " (from metadata key: " << key << ")" << dendl;
+ }
+ }
+
+ driver->meta_list_keys_complete(handle);
+ ldout(cct, 10) << "Bucket enumeration complete: monitoring "
+ << total_buckets << " buckets from metadata" << dendl;
+}
+
+void UsagePerfCounters::enumerate_all_users_from_metadata() {
+ if (!driver) {
+ ldout(cct, 10) << "enumerate_all_users: no driver available" << dendl;
+ return;
+ }
+
+ UsagePerfDoutPrefix dpp(cct);
+ ldout(cct, 10) << "Enumerating all users via metadata API" << dendl;
+
+ void* handle = nullptr;
+ // Use "user" section which contains all users
+ int ret = driver->meta_list_keys_init(&dpp, "user", "", &handle);
+ if (ret < 0) {
+ ldout(cct, 1) << "Failed to init user metadata listing: "
+ << cpp_strerror(-ret) << dendl;
+ return;
+ }
+
+ int total_users = 0;
+ bool truncated = true;
+
+ while (truncated && !shutdown_flag) {
+ std::list<std::string> keys;
+ ret = driver->meta_list_keys_next(&dpp, handle, 1000, keys, &truncated);
+ if (ret < 0) {
+ ldout(cct, 1) << "Failed to list user metadata: "
+ << cpp_strerror(-ret) << dendl;
+ break;
+ }
- update_bucket_stats(bucket_name, cached_stats->bytes_used,
- cached_stats->num_objects, "", false);
+ for (const auto& user_id : keys) {
+ if (shutdown_flag) break;
+
+ if (user_id.empty()) {
+ continue;
+ }
+
+ mark_user_active(user_id);
+ total_users++;
+
+ ldout(cct, 20) << "Added user to monitoring: " << user_id
+ << " (from metadata)" << dendl;
+ }
}
+
+ driver->meta_list_keys_complete(handle);
+ ldout(cct, 10) << "User enumeration complete: monitoring "
+ << total_users << " users from metadata" << dendl;
+}
+
+void UsagePerfCounters::refresh_bucket_stats(const std::string& bucket_key) {
+ ldout(cct, 20) << "refresh_bucket_stats: key=" << bucket_key << dendl;
+
+ // Fetch real stats from RADOS
+ sync_bucket_from_rados(bucket_key);
}
void UsagePerfCounters::refresh_user_stats(const std::string& user_id) {