From: Igor Golikov
Date: Mon, 8 Dec 2025 11:40:05 +0000 (+0000)
Subject: test: add tests for subvolume utilization metrics
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=251e97b9f9e9a732b38ef4774a1a33b331b8df4b;p=ceph-ci.git

test: add tests for subvolume utilization metrics

Add comprehensive tests to validate correct quota and current size
metrics for subvolumes.

Signed-off-by: Igor Golikov
Fixes: https://tracker.ceph.com/issues/74135
---

diff --git a/qa/tasks/cephfs/test_subvolume.py b/qa/tasks/cephfs/test_subvolume.py
index 6d2f62d3d3f..08680615af1 100644
--- a/qa/tasks/cephfs/test_subvolume.py
+++ b/qa/tasks/cephfs/test_subvolume.py
@@ -292,26 +292,30 @@ class TestSubvolumeMetrics(CephFSTestCase):
         subvol_name = "metrics_subv"
         subv_path = "/volumes/_nogroup/metrics_subv"
 
-        # no metrics initially
-        subvol_metrics = self.get_subvolume_metrics()
-        self.assertFalse(subvol_metrics, "Subvolume metrics should not be present before I/O")
+        # ensure metrics absent and quota not set yet
+        self.assertFalse(self.get_subvolume_metrics(),
+                         "Subvolume metrics should not be present before subvolume creation")
 
-        # create subvolume
-        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name)
+        # create subvolume with quota (5 GiB)
+        quota_bytes = 5 * 1024 * 1024 * 1024
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name,
+                             "--size", str(quota_bytes))
 
         # generate some I/O
         mount_point = self.mount_a.get_mount_point()
         suvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip()
         suvolume_fs_path = os.path.join(mount_point, suvolume_fs_path.strip('/'))
+        self.mount_a.run_shell(['ls', '-ld', suvolume_fs_path])
 
         # do some writes
         filename = os.path.join(suvolume_fs_path, "file0")
         self.mount_a.run_shell_payload("sudo fio "
                                        "--name test -rw=write "
                                        "--bs=4k --numjobs=1 --time_based "
-                                       "--runtime=20s --verify=0 --size=1G "
+                                       "--runtime=10s --verify=0 --size=50M "
                                        f"--filename={filename}", wait=True)
 
+        baseline_used = 0
         subvol_metrics = None
         with safe_while(sleep=1, tries=30, action='wait for subvolume write counters') as proceed:
             while proceed():
@@ -338,17 +342,25 @@
         self.assertIn("avg_write_iops", counters)
         self.assertIn("avg_write_tp_Bps", counters)
         self.assertIn("avg_write_lat_msec", counters)
+        self.assertIn("quota_bytes", counters)
+        self.assertIn("used_bytes", counters)
 
         # check write metrics
         self.assertGreater(counters["avg_write_iops"], 0, "Expected avg_write_iops to be > 0")
         self.assertGreater(counters["avg_write_tp_Bps"], 0, "Expected avg_write_tp_Bps to be > 0")
         self.assertGreaterEqual(counters["avg_write_lat_msec"], 0, "Expected avg_write_lat_msec to be > 0")
+        self.assertEqual(counters["quota_bytes"], quota_bytes,
+                         f"Expected quota_bytes to reflect provisioned quota ({quota_bytes})")
+        self.assertGreater(counters["used_bytes"], baseline_used,
+                           "Expected used_bytes to grow after writes")
+
+        baseline_used = counters["used_bytes"]
 
         # do some reads
         self.mount_a.run_shell_payload("sudo fio "
                                        "--name test -rw=read "
                                        "--bs=4k --numjobs=1 --time_based "
-                                       "--runtime=20s --verify=0 --size=1G "
+                                       "--runtime=5s --verify=0 --size=50M "
                                        f"--filename={filename}", wait=True)
 
         subvol_metrics = None
@@ -368,10 +380,618 @@
         self.assertGreater(counters["avg_read_iops"], 0, "Expected avg_read_iops to be >= 0")
         self.assertGreater(counters["avg_read_tp_Bps"], 0, "Expected avg_read_tp_Bps to be >= 0")
self.assertGreaterEqual(counters["avg_read_lat_msec"], 0, "Expected avg_read_lat_msec to be >= 0") + self.assertEqual(counters["quota_bytes"], quota_bytes, + "Quota should remain unchanged during workload") + self.assertGreaterEqual(counters["used_bytes"], baseline_used, + "Used bytes should not shrink during reads alone") + + # delete part of the data and ensure used_bytes drops + self.mount_a.run_shell_payload(f"sudo rm -f {filename}") + self.mount_a.run_shell_payload(f"sudo truncate -s 0 {filename}") + self.mount_a.run_shell_payload("sudo sync") + + # Trigger I/O to generate metrics (metrics only sent during I/O) + trigger_file = os.path.join(suvolume_fs_path, "trigger_metrics") + self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={trigger_file} bs=4k count=10", wait=True) + + reduced_metrics = None + with safe_while(sleep=1, tries=30, action='wait for reduced usage') as proceed: + while proceed(): + # Keep triggering I/O to generate metrics + self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={trigger_file} bs=4k count=1 conv=notrunc", wait=True) + reduced_metrics = self.get_subvolume_metrics() + if reduced_metrics: + counters_after_delete = reduced_metrics[0]["counters"] + if counters_after_delete["used_bytes"] < baseline_used: + break + self.assertIsNotNone(reduced_metrics, "Expected subvolume metrics after deletion") + counters_after_delete = reduced_metrics[0]["counters"] + self.assertLess(counters_after_delete["used_bytes"], baseline_used, + "Used bytes should drop after deleting data") + self.assertEqual(counters_after_delete["quota_bytes"], quota_bytes, + "Quota should remain unchanged after deletions") # wait for metrics to expire after inactivity - sleep(60) + sleep(30) # verify that metrics are not present anymore subvolume_metrics = self.get_subvolume_metrics() self.assertFalse(subvolume_metrics, "Subvolume metrics should be gone after inactivity window") + + def test_subvolume_quota_resize_update(self): + """ + Verify that subvolume quota changes (via resize) are reflected in metrics. + This tests that maybe_update_subvolume_quota() is called when quota is + broadcast to clients. 
+ """ + subvol_name = "resize_test_subv" + subv_path = f"/volumes/_nogroup/{subvol_name}" + + # create subvolume with initial quota (1 GiB) + initial_quota = 1 * 1024 * 1024 * 1024 + self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name, + "--size", str(initial_quota)) + + # access subvolume to trigger registration + mount_point = self.mount_a.get_mount_point() + subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip() + subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/')) + self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path]) + + # do some writes to generate metrics + filename = os.path.join(subvolume_fs_path, "testfile") + self.mount_a.run_shell_payload("sudo fio " + "--name test -rw=write " + "--bs=4k --numjobs=1 --time_based " + "--runtime=3s --verify=0 --size=5M " + f"--filename={filename}", wait=True) + + # verify initial quota in metrics + subvol_metrics = None + with safe_while(sleep=1, tries=30, action='wait for subvolume metrics') as proceed: + while proceed(): + subvol_metrics = self.get_subvolume_metrics() + if subvol_metrics: + break + + self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics to appear") + counters = subvol_metrics[0]["counters"] + self.assertEqual(counters["quota_bytes"], initial_quota, + f"Expected initial quota_bytes={initial_quota}") + + # resize subvolume to new quota (2 GiB) + new_quota = 2 * 1024 * 1024 * 1024 + self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name, + str(new_quota)) + + # trigger quota broadcast by accessing the subvolume + self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path]) + # small I/O to ensure metrics update + self.mount_a.run_shell_payload(f"echo 'test' | sudo tee {filename}.trigger > /dev/null") + + # verify new quota in metrics + updated_metrics = None + with safe_while(sleep=1, tries=30, action='wait for updated quota') as proceed: + while proceed(): + updated_metrics = self.get_subvolume_metrics() + if updated_metrics: + counters = updated_metrics[0]["counters"] + if counters["quota_bytes"] == new_quota: + break + + self.assertIsNotNone(updated_metrics, "Expected updated subvolume metrics") + counters = updated_metrics[0]["counters"] + self.assertEqual(counters["quota_bytes"], new_quota, + f"Expected quota_bytes to update to {new_quota} after resize") + labels = updated_metrics[0]["labels"] + self.assertEqual(labels["subvolume_path"], subv_path, + "Unexpected subvolume_path in metrics") + + # cleanup + self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name) + + def test_subvolume_quota_activity_prevents_eviction(self): + """ + Verify that quota broadcasts keep subvolume_quota entries alive + (by updating last_activity), preventing eviction even without client I/O. + + This tests the last_activity update in maybe_update_subvolume_quota(). 
+ """ + subvol_name = "evict_test_subv" + + # create subvolume with quota + quota_bytes = 1 * 1024 * 1024 * 1024 + self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name, + "--size", str(quota_bytes)) + + # access subvolume to trigger registration + mount_point = self.mount_a.get_mount_point() + subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip() + subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/')) + + # do initial write to generate metrics + filename = os.path.join(subvolume_fs_path, "testfile") + self.mount_a.run_shell_payload("sudo fio " + "--name test -rw=write " + "--bs=4k --numjobs=1 --time_based " + "--runtime=3s --verify=0 --size=5M " + f"--filename={filename}", wait=True) + + # verify metrics appear + subvol_metrics = None + with safe_while(sleep=1, tries=30, action='wait for subvolume metrics') as proceed: + while proceed(): + subvol_metrics = self.get_subvolume_metrics() + if subvol_metrics: + break + + self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics to appear") + initial_counters = subvol_metrics[0]["counters"] + self.assertEqual(initial_counters["quota_bytes"], quota_bytes) + + # wait briefly and do another read to keep the entry alive through quota broadcast + sleep(2) + + # access subvolume to trigger quota broadcast (not full I/O) + self.mount_a.run_shell(['ls', '-la', subvolume_fs_path]) + self.mount_a.run_shell(['stat', filename]) + + # verify metrics still present + still_present = self.get_subvolume_metrics() + self.assertIsNotNone(still_present, "Metrics should still be present after directory access") + self.assertEqual(still_present[0]["counters"]["quota_bytes"], quota_bytes, + "Quota should remain unchanged") + + # cleanup + self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name) + + def test_multiple_subvolume_quotas(self): + """ + Verify that multiple subvolumes each have independent quota tracking. + This tests that the subvolume_quota map correctly handles multiple entries. 
+ """ + subvol1_name = "multi_subv1" + subvol2_name = "multi_subv2" + subv1_path = f"/volumes/_nogroup/{subvol1_name}" + subv2_path = f"/volumes/_nogroup/{subvol2_name}" + + # create two subvolumes with different quotas + quota1 = 1 * 1024 * 1024 * 1024 # 1 GiB + quota2 = 2 * 1024 * 1024 * 1024 # 2 GiB + + self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol1_name, + "--size", str(quota1)) + self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol2_name, + "--size", str(quota2)) + + mount_point = self.mount_a.get_mount_point() + + # access and do I/O on both subvolumes + for subvol_name in [subvol1_name, subvol2_name]: + subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip() + subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/')) + self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path]) + + filename = os.path.join(subvolume_fs_path, "testfile") + self.mount_a.run_shell_payload("sudo fio " + "--name test -rw=write " + "--bs=4k --numjobs=1 --time_based " + "--runtime=3s --verify=0 --size=5M " + f"--filename={filename}", wait=True) + + # wait for metrics to be collected + subvol_metrics = None + with safe_while(sleep=1, tries=30, action='wait for multiple subvolume metrics') as proceed: + while proceed(): + subvol_metrics = self.get_subvolume_metrics() + if subvol_metrics and len(subvol_metrics) >= 2: + break + + self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics for both subvolumes") + self.assertGreaterEqual(len(subvol_metrics), 2, + "Expected at least 2 subvolume metrics entries") + + # build a map of path -> quota from metrics + metrics_by_path = {} + for m in subvol_metrics: + path = m["labels"]["subvolume_path"] + quota = m["counters"]["quota_bytes"] + metrics_by_path[path] = quota + + # verify each subvolume has correct quota + self.assertIn(subv1_path, metrics_by_path, + f"Expected metrics for {subv1_path}") + self.assertIn(subv2_path, metrics_by_path, + f"Expected metrics for {subv2_path}") + self.assertEqual(metrics_by_path[subv1_path], quota1, + f"Expected quota {quota1} for {subv1_path}") + self.assertEqual(metrics_by_path[subv2_path], quota2, + f"Expected quota {quota2} for {subv2_path}") + + # cleanup + self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol1_name) + self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol2_name) + + def test_subvolume_unlimited_quota(self): + """ + Verify that subvolumes without quota (unlimited) report quota_bytes=0. 
+ """ + subvol_name = "unlimited_subv" + subv_path = f"/volumes/_nogroup/{subvol_name}" + + # create subvolume without quota + self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name) + + # access subvolume and do I/O + mount_point = self.mount_a.get_mount_point() + subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip() + subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/')) + self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path]) + + filename = os.path.join(subvolume_fs_path, "testfile") + self.mount_a.run_shell_payload("sudo fio " + "--name test -rw=write " + "--bs=4k --numjobs=1 --time_based " + "--runtime=3s --verify=0 --size=5M " + f"--filename={filename}", wait=True) + + # wait for metrics with quota_bytes=0 (unlimited subvolume) + subvol_metrics = None + with safe_while(sleep=1, tries=30, action='wait for subvolume metrics') as proceed: + while proceed(): + subvol_metrics = self.get_subvolume_metrics() + if subvol_metrics: + # for unlimited quota, quota_bytes should be 0 + if subvol_metrics[0]["counters"]["quota_bytes"] == 0: + break + + self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics") + counters = subvol_metrics[0]["counters"] + labels = subvol_metrics[0]["labels"] + + self.assertEqual(labels["subvolume_path"], subv_path) + self.assertEqual(counters["quota_bytes"], 0, + "Expected quota_bytes=0 for unlimited subvolume") + # used_bytes is fetched dynamically from the inode's rstat, so it should + # reflect actual usage even for unlimited quota subvolumes + self.assertGreater(counters["used_bytes"], 0, + "Expected used_bytes > 0 after writes") + + # cleanup + self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name) + + def test_subvolume_metrics_stress(self): + """ + Comprehensive stress test for subvolume metrics covering: + - Multiple quota resizes (up and down) + - Data writes, partial deletes, overwrites + - Concurrent operations on multiple subvolumes + - Verifying metrics accurately track all changes + """ + subvol_name = "stress_subv" + subv_path = f"/volumes/_nogroup/{subvol_name}" + + # Phase 1: Create subvolume with initial small quota + initial_quota = 100 * 1024 * 1024 # 100 MiB + self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name, + "--size", str(initial_quota)) + + mount_point = self.mount_a.get_mount_point() + subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip() + subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/')) + + log.info(f"Phase 1: Initial quota {initial_quota}, writing data...") + + # Write multiple files + for i in range(3): + filename = os.path.join(subvolume_fs_path, f"file{i}") + self.mount_a.run_shell_payload("sudo fio " + "--name test -rw=write " + "--bs=4k --numjobs=1 --time_based " + f"--runtime=2s --verify=0 --size=5M " + f"--filename={filename}", wait=True) + + # Verify initial metrics + subvol_metrics = None + with safe_while(sleep=1, tries=30, action='wait for initial metrics') as proceed: + while proceed(): + subvol_metrics = self.get_subvolume_metrics() + if subvol_metrics: + counters = subvol_metrics[0]["counters"] + if counters["quota_bytes"] == initial_quota and counters["used_bytes"] > 0: + break + + self.assertIsNotNone(subvol_metrics, "Expected initial metrics") + counters = subvol_metrics[0]["counters"] + self.assertEqual(counters["quota_bytes"], initial_quota) + initial_used = counters["used_bytes"] + 
+        self.assertGreater(initial_used, 0, "Expected some data written")
+        log.info(f"Phase 1 complete: quota={counters['quota_bytes']}, used={initial_used}")
+
+        # Phase 2: Resize quota UP
+        larger_quota = 500 * 1024 * 1024  # 500 MiB
+        log.info(f"Phase 2: Resizing quota UP to {larger_quota}...")
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(larger_quota))
+
+        # Trigger quota broadcast
+        self.mount_a.run_shell(['ls', '-la', subvolume_fs_path])
+        self.mount_a.run_shell_payload(f"echo 'trigger' | sudo tee {subvolume_fs_path}/trigger > /dev/null")
+
+        # Verify quota increased, used unchanged
+        updated_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for quota increase') as proceed:
+            while proceed():
+                updated_metrics = self.get_subvolume_metrics()
+                if updated_metrics:
+                    counters = updated_metrics[0]["counters"]
+                    if counters["quota_bytes"] == larger_quota:
+                        break
+
+        self.assertIsNotNone(updated_metrics)
+        counters = updated_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], larger_quota)
+        # used_bytes should be roughly the same (small trigger file added)
+        self.assertGreater(counters["used_bytes"], initial_used - 1024)  # allow some variance
+        log.info(f"Phase 2 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Phase 3: Write more data to increase usage
+        log.info("Phase 3: Writing more data...")
+        for i in range(3, 6):
+            filename = os.path.join(subvolume_fs_path, f"file{i}")
+            self.mount_a.run_shell_payload("sudo fio "
+                                           "--name test -rw=write "
+                                           "--bs=4k --numjobs=1 --time_based "
+                                           "--runtime=2s --verify=0 --size=5M "
+                                           f"--filename={filename}", wait=True)
+
+        # Verify used_bytes increased
+        more_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for increased usage') as proceed:
+            while proceed():
+                more_metrics = self.get_subvolume_metrics()
+                if more_metrics:
+                    counters = more_metrics[0]["counters"]
+                    if counters["used_bytes"] > initial_used * 1.5:  # expect significant increase
+                        break
+
+        self.assertIsNotNone(more_metrics)
+        counters = more_metrics[0]["counters"]
+        phase3_used = counters["used_bytes"]
+        self.assertGreater(phase3_used, initial_used * 1.5,
+                           "Expected used_bytes to increase significantly after more writes")
+        log.info(f"Phase 3 complete: quota={counters['quota_bytes']}, used={phase3_used}")
+
+        # Phase 4: Delete some files, verify used_bytes decreases
+        log.info("Phase 4: Deleting files 0-2...")
+        for i in range(3):
+            self.mount_a.run_shell_payload(f"sudo rm -f {subvolume_fs_path}/file{i}")
+        self.mount_a.run_shell_payload("sudo sync")
+
+        # Trigger I/O on remaining files to generate metrics (metrics only sent during I/O)
+        self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_delete bs=4k count=10", wait=True)
+
+        # Verify used_bytes decreased
+        deleted_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for decreased usage after delete') as proceed:
+            while proceed():
+                # Keep triggering writes to ensure metrics are generated
+                self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_delete bs=4k count=1 conv=notrunc", wait=True)
+                deleted_metrics = self.get_subvolume_metrics()
+                if deleted_metrics:
+                    counters = deleted_metrics[0]["counters"]
+                    if counters["used_bytes"] < phase3_used * 0.8:  # ~50% of the data was deleted; 0.8 is a loose threshold
+                        break
+
+        self.assertIsNotNone(deleted_metrics)
+        counters = deleted_metrics[0]["counters"]
+        phase4_used = counters["used_bytes"]
+        self.assertLess(phase4_used, phase3_used,
"Expected used_bytes to decrease after file deletion") + log.info(f"Phase 4 complete: quota={counters['quota_bytes']}, used={phase4_used}") + + # Phase 5: Resize quota DOWN (but still above current usage) + smaller_quota = 200 * 1024 * 1024 # 200 MiB + log.info(f"Phase 5: Resizing quota DOWN to {smaller_quota}...") + self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name, + str(smaller_quota)) + + # Trigger quota broadcast + self.mount_a.run_shell(['stat', subvolume_fs_path]) + + # Verify quota decreased + smaller_metrics = None + with safe_while(sleep=1, tries=30, action='wait for quota decrease') as proceed: + while proceed(): + smaller_metrics = self.get_subvolume_metrics() + if smaller_metrics: + counters = smaller_metrics[0]["counters"] + if counters["quota_bytes"] == smaller_quota: + break + + self.assertIsNotNone(smaller_metrics) + counters = smaller_metrics[0]["counters"] + self.assertEqual(counters["quota_bytes"], smaller_quota) + log.info(f"Phase 5 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}") + + # Phase 6: Overwrite existing files (usage should stay roughly same) + log.info("Phase 6: Overwriting existing files...") + for i in range(3, 6): + filename = os.path.join(subvolume_fs_path, f"file{i}") + self.mount_a.run_shell_payload("sudo fio " + "--name test -rw=write " + "--bs=4k --numjobs=1 --time_based " + f"--runtime=1s --verify=0 --size=5M " + f"--filename={filename}", wait=True) + + overwrite_metrics = self.get_subvolume_metrics() + self.assertIsNotNone(overwrite_metrics) + counters = overwrite_metrics[0]["counters"] + # used_bytes should be similar (overwriting same files) + self.assertGreater(counters["used_bytes"], 0) + log.info(f"Phase 6 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}") + + # Phase 7: Mixed read/write workload + log.info("Phase 7: Mixed read/write workload...") + filename = os.path.join(subvolume_fs_path, "mixed_file") + self.mount_a.run_shell_payload("sudo fio " + "--name test -rw=randrw --rwmixread=50 " + "--bs=4k --numjobs=2 --time_based " + "--runtime=3s --verify=0 --size=5M " + f"--filename={filename}", wait=True) + + # Verify metrics still valid after mixed workload + mixed_metrics = None + with safe_while(sleep=1, tries=30, action='wait for mixed workload metrics') as proceed: + while proceed(): + mixed_metrics = self.get_subvolume_metrics() + if mixed_metrics: + counters = mixed_metrics[0]["counters"] + # Should have both read and write ops + if counters["avg_read_iops"] > 0 and counters["avg_write_iops"] > 0: + break + + self.assertIsNotNone(mixed_metrics) + counters = mixed_metrics[0]["counters"] + self.assertGreater(counters["avg_read_iops"], 0, "Expected read I/O from mixed workload") + self.assertGreater(counters["avg_write_iops"], 0, "Expected write I/O from mixed workload") + self.assertEqual(counters["quota_bytes"], smaller_quota, "Quota should remain unchanged") + log.info(f"Phase 7 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}, " + f"read_iops={counters['avg_read_iops']}, write_iops={counters['avg_write_iops']}") + + # Phase 8: Remove quota entirely (set to unlimited) + log.info("Phase 8: Removing quota (setting to unlimited)...") + self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name, 'inf') + + # Do a small write to generate metrics after quota change + self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_unlimited bs=4k count=10", wait=True) + + # Verify quota_bytes becomes 0 (unlimited) + 
+        unlimited_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for unlimited quota') as proceed:
+            while proceed():
+                # Keep triggering I/O to generate fresh metrics
+                self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_unlimited bs=4k count=1 conv=notrunc", wait=True)
+                unlimited_metrics = self.get_subvolume_metrics()
+                if unlimited_metrics:
+                    counters = unlimited_metrics[0]["counters"]
+                    # quota_bytes=0 means unlimited
+                    if counters["quota_bytes"] == 0:
+                        break
+
+        self.assertIsNotNone(unlimited_metrics)
+        counters = unlimited_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], 0, "Expected quota_bytes=0 for unlimited")
+        self.assertGreater(counters["used_bytes"], 0, "used_bytes should still be tracked")
+        log.info(f"Phase 8 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Phase 9: Re-apply quota after being unlimited
+        log.info("Phase 9: Re-applying quota after unlimited...")
+        final_quota = 300 * 1024 * 1024  # 300 MiB
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(final_quota))
+
+        # Trigger broadcast
+        self.mount_a.run_shell_payload(f"echo 'final' | sudo tee {subvolume_fs_path}/final > /dev/null")
+
+        # Verify quota restored
+        final_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for restored quota') as proceed:
+            while proceed():
+                # Trigger I/O to generate metrics
+                self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_final bs=4k count=1 conv=notrunc", wait=True)
+                final_metrics = self.get_subvolume_metrics()
+                if final_metrics:
+                    counters = final_metrics[0]["counters"]
+                    if counters["quota_bytes"] == final_quota:
+                        break
+
+        self.assertIsNotNone(final_metrics)
+        counters = final_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], final_quota,
+                         "Expected quota to be restored after being unlimited")
+        self.assertGreater(counters["used_bytes"], 0)
+        log.info(f"Phase 9 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Final verification: all metrics fields present and sane
+        labels = final_metrics[0]["labels"]
+        self.assertEqual(labels["fs_name"], "cephfs")
+        self.assertEqual(labels["subvolume_path"], subv_path)
+        self.assertIn("avg_read_iops", counters)
+        self.assertIn("avg_write_iops", counters)
+        self.assertIn("avg_read_tp_Bps", counters)
+        self.assertIn("avg_write_tp_Bps", counters)
+        self.assertIn("avg_read_lat_msec", counters)
+        self.assertIn("avg_write_lat_msec", counters)
+
+        log.info("Stress test completed successfully!")
+
+        # cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name)
+
+    def test_subvolume_metrics_eviction_reactivation(self):
+        """
+        Ensure subvolume quota cache entries are evicted after inactivity and
+        re-populated on the next quota broadcast.
+        """
+        # Shrink window to speed up eviction
+        # Use 'mds' service (not 'mds.*') for config set/get in vstart.
+        # The minimal allowed window is 30s; set to 30 to keep the test shorter.
+        original_window = self.fs.get_ceph_cmd_stdout('config', 'get', 'mds',
+                                                      'subv_metrics_window_interval').strip()
+        self.fs.run_ceph_cmd('config', 'set', 'mds', 'subv_metrics_window_interval', '30')
+        self.addCleanup(self.fs.run_ceph_cmd, 'config', 'set', 'mds',
+                        'subv_metrics_window_interval', original_window)
+
+        subvol_name = "evict_subv"
+        initial_quota = 50 * 1024 * 1024  # 50 MiB
+        updated_quota = 60 * 1024 * 1024  # 60 MiB
+
+        # Create subvolume with initial quota and write a small file
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name,
+                             "--size", str(initial_quota))
+        mount_point = self.mount_a.get_mount_point()
+        subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath',
+                                                        'cephfs', subvol_name).strip()
+        subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/'))
+        filename = os.path.join(subvolume_fs_path, "seed")
+        self.mount_a.run_shell_payload("sudo fio --name test -rw=write --bs=4k --numjobs=1 "
+                                       "--time_based --runtime=2s --verify=0 --size=4M "
+                                       f"--filename={filename}", wait=True)
+
+        # Wait for metrics to reflect initial quota
+        with safe_while(sleep=1, tries=30, action='wait for initial quota metrics') as proceed:
+            while proceed():
+                metrics = self.get_subvolume_metrics()
+                if metrics:
+                    counters = metrics[0]["counters"]
+                    if counters["quota_bytes"] == initial_quota and counters["used_bytes"] > 0:
+                        break
+
+        # Let the window expire (2 * window = 60s)
+        sleep(65)
+
+        # Change quota to force a new broadcast and repopulate cache
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(updated_quota))
+        # Trigger broadcast/metrics generation
+        self.mount_a.run_shell_payload(f"echo 'poke' | sudo tee {subvolume_fs_path}/poke > /dev/null")
+
+        # Verify quota is updated after eviction and reactivation
+        metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for quota after eviction') as proceed:
+            while proceed():
+                metrics = self.get_subvolume_metrics()
+                if metrics:
+                    counters = metrics[0]["counters"]
+                    if counters["quota_bytes"] == updated_quota:
+                        break
+
+        self.assertIsNotNone(metrics, "Expected metrics after reactivation")
+        counters = metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], updated_quota,
+                         "Expected quota to refresh after eviction")
+        self.assertGreater(counters["used_bytes"], 0, "Expected used_bytes to remain tracked")
+
+        # Cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name)
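
Note for readers of this patch: every assertion above goes through
self.get_subvolume_metrics(), a TestSubvolumeMetrics helper defined elsewhere
in test_subvolume.py and not shown in this diff. The sketch below only
illustrates the entry shape those assertions imply; the label and counter
names are taken verbatim from the test code, while the TypedDict names and
the quota_for_path() helper are hypothetical, added purely for illustration.

    # Sketch (assumed, not part of the patch): the shape of one entry in the
    # list returned by get_subvolume_metrics(), as implied by the assertions.
    from typing import List, TypedDict

    class SubvolumeLabels(TypedDict):
        fs_name: str           # e.g. "cephfs"
        subvolume_path: str    # e.g. "/volumes/_nogroup/metrics_subv"

    class SubvolumeCounters(TypedDict):
        avg_read_iops: float
        avg_write_iops: float
        avg_read_tp_Bps: float
        avg_write_tp_Bps: float
        avg_read_lat_msec: float
        avg_write_lat_msec: float
        quota_bytes: int       # 0 means no quota (unlimited)
        used_bytes: int        # from the subvolume inode's rstat

    class SubvolumeMetricsEntry(TypedDict):
        labels: SubvolumeLabels
        counters: SubvolumeCounters

    def quota_for_path(entries: List[SubvolumeMetricsEntry], path: str) -> int:
        """Hypothetical helper: look up quota_bytes for one subvolume path,
        mirroring the metrics_by_path pattern in test_multiple_subvolume_quotas."""
        by_path = {e["labels"]["subvolume_path"]: e["counters"] for e in entries}
        return by_path[path]["quota_bytes"]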