]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test: add tests for subvolume utilization metrics
authorIgor Golikov <igolikov@redhat.com>
Mon, 8 Dec 2025 11:40:05 +0000 (11:40 +0000)
committerIgor Golikov <igolikov@redhat.com>
Tue, 30 Dec 2025 17:45:44 +0000 (17:45 +0000)
Add comperehensive tests to validate correct quota and current size
metrics for subvolumes

Signed-off-by: Igor Golikov <igolikov@redhat.com>
Fixes: https://tracker.ceph.com/issues/74135
qa/tasks/cephfs/test_subvolume.py

index 6d2f62d3d3f646873d108164b73418fc312c3871..08680615af153b23e87f12e653695d98da86ddee 100644 (file)
@@ -292,26 +292,30 @@ class TestSubvolumeMetrics(CephFSTestCase):
         subvol_name = "metrics_subv"
         subv_path = "/volumes/_nogroup/metrics_subv"
 
-        # no metrics initially
-        subvol_metrics = self.get_subvolume_metrics()
-        self.assertFalse(subvol_metrics, "Subvolume metrics should not be present before I/O")
+        # ensure metrics absent and quota not set yet
+        self.assertFalse(self.get_subvolume_metrics(),
+                         "Subvolume metrics should not be present before subvolume creation")
 
-        # create subvolume
-        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name)
+        # create subvolume with quota (5 GiB)
+        quota_bytes = 5 * 1024 * 1024 * 1024
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name,
+                             "--size", str(quota_bytes))
 
         # generate some I/O
         mount_point = self.mount_a.get_mount_point()
         suvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip()
         suvolume_fs_path = os.path.join(mount_point, suvolume_fs_path.strip('/'))
+        self.mount_a.run_shell(['ls', '-ld', suvolume_fs_path])
 
         # do some writes
         filename = os.path.join(suvolume_fs_path, "file0")
         self.mount_a.run_shell_payload("sudo fio "
                                        "--name test -rw=write "
                                        "--bs=4k --numjobs=1 --time_based "
-                                       "--runtime=20s --verify=0 --size=1G "
+                                       "--runtime=10s --verify=0 --size=50M "
                                        f"--filename={filename}", wait=True)
 
+        baseline_used = 0
         subvol_metrics = None
         with safe_while(sleep=1, tries=30, action='wait for subvolume write counters') as proceed:
             while proceed():
@@ -338,17 +342,25 @@ class TestSubvolumeMetrics(CephFSTestCase):
         self.assertIn("avg_write_iops", counters)
         self.assertIn("avg_write_tp_Bps", counters)
         self.assertIn("avg_write_lat_msec", counters)
+        self.assertIn("quota_bytes", counters)
+        self.assertIn("used_bytes", counters)
 
         # check write metrics
         self.assertGreater(counters["avg_write_iops"], 0, "Expected avg_write_iops to be > 0")
         self.assertGreater(counters["avg_write_tp_Bps"], 0, "Expected avg_write_tp_Bps to be > 0")
         self.assertGreaterEqual(counters["avg_write_lat_msec"], 0, "Expected avg_write_lat_msec to be > 0")
+        self.assertEqual(counters["quota_bytes"], quota_bytes,
+                         f"Expected quota_bytes to reflect provisioned quota ({quota_bytes})")
+        self.assertGreater(counters["used_bytes"], baseline_used,
+                           "Expected used_bytes to grow after writes")
+
+        baseline_used = counters["used_bytes"]
 
         # do some reads
         self.mount_a.run_shell_payload("sudo fio "
                                        "--name test -rw=read "
                                        "--bs=4k --numjobs=1 --time_based "
-                                       "--runtime=20s --verify=0 --size=1G "
+                                       "--runtime=5s --verify=0 --size=50M "
                                        f"--filename={filename}", wait=True)
 
         subvol_metrics = None
@@ -368,10 +380,618 @@ class TestSubvolumeMetrics(CephFSTestCase):
         self.assertGreater(counters["avg_read_iops"], 0, "Expected avg_read_iops to be >= 0")
         self.assertGreater(counters["avg_read_tp_Bps"], 0, "Expected avg_read_tp_Bps to be >= 0")
         self.assertGreaterEqual(counters["avg_read_lat_msec"], 0, "Expected avg_read_lat_msec to be >= 0")
+        self.assertEqual(counters["quota_bytes"], quota_bytes,
+                         "Quota should remain unchanged during workload")
+        self.assertGreaterEqual(counters["used_bytes"], baseline_used,
+                                "Used bytes should not shrink during reads alone")
+
+        # delete part of the data and ensure used_bytes drops
+        self.mount_a.run_shell_payload(f"sudo rm -f {filename}")
+        self.mount_a.run_shell_payload(f"sudo truncate -s 0 {filename}")
+        self.mount_a.run_shell_payload("sudo sync")
+
+        # Trigger I/O to generate metrics (metrics only sent during I/O)
+        trigger_file = os.path.join(suvolume_fs_path, "trigger_metrics")
+        self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={trigger_file} bs=4k count=10", wait=True)
+
+        reduced_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for reduced usage') as proceed:
+            while proceed():
+                # Keep triggering I/O to generate metrics
+                self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={trigger_file} bs=4k count=1 conv=notrunc", wait=True)
+                reduced_metrics = self.get_subvolume_metrics()
+                if reduced_metrics:
+                    counters_after_delete = reduced_metrics[0]["counters"]
+                    if counters_after_delete["used_bytes"] < baseline_used:
+                        break
+        self.assertIsNotNone(reduced_metrics, "Expected subvolume metrics after deletion")
+        counters_after_delete = reduced_metrics[0]["counters"]
+        self.assertLess(counters_after_delete["used_bytes"], baseline_used,
+                        "Used bytes should drop after deleting data")
+        self.assertEqual(counters_after_delete["quota_bytes"], quota_bytes,
+                         "Quota should remain unchanged after deletions")
 
         # wait for metrics to expire after inactivity
-        sleep(60)
+        sleep(30)
 
         # verify that metrics are not present anymore
         subvolume_metrics = self.get_subvolume_metrics()
         self.assertFalse(subvolume_metrics, "Subvolume metrics should be gone after inactivity window")
+
+    def test_subvolume_quota_resize_update(self):
+        """
+        Verify that subvolume quota changes (via resize) are reflected in metrics.
+        This tests that maybe_update_subvolume_quota() is called when quota is
+        broadcast to clients.
+        """
+        subvol_name = "resize_test_subv"
+        subv_path = f"/volumes/_nogroup/{subvol_name}"
+
+        # create subvolume with initial quota (1 GiB)
+        initial_quota = 1 * 1024 * 1024 * 1024
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name,
+                             "--size", str(initial_quota))
+
+        # access subvolume to trigger registration
+        mount_point = self.mount_a.get_mount_point()
+        subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip()
+        subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/'))
+        self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path])
+
+        # do some writes to generate metrics
+        filename = os.path.join(subvolume_fs_path, "testfile")
+        self.mount_a.run_shell_payload("sudo fio "
+                                       "--name test -rw=write "
+                                       "--bs=4k --numjobs=1 --time_based "
+                                       "--runtime=3s --verify=0 --size=5M "
+                                       f"--filename={filename}", wait=True)
+
+        # verify initial quota in metrics
+        subvol_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for subvolume metrics') as proceed:
+            while proceed():
+                subvol_metrics = self.get_subvolume_metrics()
+                if subvol_metrics:
+                    break
+
+        self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics to appear")
+        counters = subvol_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], initial_quota,
+                         f"Expected initial quota_bytes={initial_quota}")
+
+        # resize subvolume to new quota (2 GiB)
+        new_quota = 2 * 1024 * 1024 * 1024
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(new_quota))
+
+        # trigger quota broadcast by accessing the subvolume
+        self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path])
+        # small I/O to ensure metrics update
+        self.mount_a.run_shell_payload(f"echo 'test' | sudo tee {filename}.trigger > /dev/null")
+
+        # verify new quota in metrics
+        updated_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for updated quota') as proceed:
+            while proceed():
+                updated_metrics = self.get_subvolume_metrics()
+                if updated_metrics:
+                    counters = updated_metrics[0]["counters"]
+                    if counters["quota_bytes"] == new_quota:
+                        break
+
+        self.assertIsNotNone(updated_metrics, "Expected updated subvolume metrics")
+        counters = updated_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], new_quota,
+                         f"Expected quota_bytes to update to {new_quota} after resize")
+        labels = updated_metrics[0]["labels"]
+        self.assertEqual(labels["subvolume_path"], subv_path,
+                         "Unexpected subvolume_path in metrics")
+
+        # cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name)
+
+    def test_subvolume_quota_activity_prevents_eviction(self):
+        """
+        Verify that quota broadcasts keep subvolume_quota entries alive
+        (by updating last_activity), preventing eviction even without client I/O.
+        
+        This tests the last_activity update in maybe_update_subvolume_quota().
+        """
+        subvol_name = "evict_test_subv"
+
+        # create subvolume with quota
+        quota_bytes = 1 * 1024 * 1024 * 1024
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name,
+                             "--size", str(quota_bytes))
+
+        # access subvolume to trigger registration
+        mount_point = self.mount_a.get_mount_point()
+        subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip()
+        subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/'))
+
+        # do initial write to generate metrics
+        filename = os.path.join(subvolume_fs_path, "testfile")
+        self.mount_a.run_shell_payload("sudo fio "
+                                       "--name test -rw=write "
+                                       "--bs=4k --numjobs=1 --time_based "
+                                       "--runtime=3s --verify=0 --size=5M "
+                                       f"--filename={filename}", wait=True)
+
+        # verify metrics appear
+        subvol_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for subvolume metrics') as proceed:
+            while proceed():
+                subvol_metrics = self.get_subvolume_metrics()
+                if subvol_metrics:
+                    break
+
+        self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics to appear")
+        initial_counters = subvol_metrics[0]["counters"]
+        self.assertEqual(initial_counters["quota_bytes"], quota_bytes)
+
+        # wait briefly and do another read to keep the entry alive through quota broadcast
+        sleep(2)
+
+        # access subvolume to trigger quota broadcast (not full I/O)
+        self.mount_a.run_shell(['ls', '-la', subvolume_fs_path])
+        self.mount_a.run_shell(['stat', filename])
+
+        # verify metrics still present
+        still_present = self.get_subvolume_metrics()
+        self.assertIsNotNone(still_present, "Metrics should still be present after directory access")
+        self.assertEqual(still_present[0]["counters"]["quota_bytes"], quota_bytes,
+                         "Quota should remain unchanged")
+
+        # cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name)
+
+    def test_multiple_subvolume_quotas(self):
+        """
+        Verify that multiple subvolumes each have independent quota tracking.
+        This tests that the subvolume_quota map correctly handles multiple entries.
+        """
+        subvol1_name = "multi_subv1"
+        subvol2_name = "multi_subv2"
+        subv1_path = f"/volumes/_nogroup/{subvol1_name}"
+        subv2_path = f"/volumes/_nogroup/{subvol2_name}"
+
+        # create two subvolumes with different quotas
+        quota1 = 1 * 1024 * 1024 * 1024  # 1 GiB
+        quota2 = 2 * 1024 * 1024 * 1024  # 2 GiB
+
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol1_name,
+                             "--size", str(quota1))
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol2_name,
+                             "--size", str(quota2))
+
+        mount_point = self.mount_a.get_mount_point()
+
+        # access and do I/O on both subvolumes
+        for subvol_name in [subvol1_name, subvol2_name]:
+            subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip()
+            subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/'))
+            self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path])
+
+            filename = os.path.join(subvolume_fs_path, "testfile")
+            self.mount_a.run_shell_payload("sudo fio "
+                                           "--name test -rw=write "
+                                           "--bs=4k --numjobs=1 --time_based "
+                                           "--runtime=3s --verify=0 --size=5M "
+                                           f"--filename={filename}", wait=True)
+
+        # wait for metrics to be collected
+        subvol_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for multiple subvolume metrics') as proceed:
+            while proceed():
+                subvol_metrics = self.get_subvolume_metrics()
+                if subvol_metrics and len(subvol_metrics) >= 2:
+                    break
+
+        self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics for both subvolumes")
+        self.assertGreaterEqual(len(subvol_metrics), 2,
+                                "Expected at least 2 subvolume metrics entries")
+
+        # build a map of path -> quota from metrics
+        metrics_by_path = {}
+        for m in subvol_metrics:
+            path = m["labels"]["subvolume_path"]
+            quota = m["counters"]["quota_bytes"]
+            metrics_by_path[path] = quota
+
+        # verify each subvolume has correct quota
+        self.assertIn(subv1_path, metrics_by_path,
+                      f"Expected metrics for {subv1_path}")
+        self.assertIn(subv2_path, metrics_by_path,
+                      f"Expected metrics for {subv2_path}")
+        self.assertEqual(metrics_by_path[subv1_path], quota1,
+                         f"Expected quota {quota1} for {subv1_path}")
+        self.assertEqual(metrics_by_path[subv2_path], quota2,
+                         f"Expected quota {quota2} for {subv2_path}")
+
+        # cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol1_name)
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol2_name)
+
+    def test_subvolume_unlimited_quota(self):
+        """
+        Verify that subvolumes without quota (unlimited) report quota_bytes=0.
+        """
+        subvol_name = "unlimited_subv"
+        subv_path = f"/volumes/_nogroup/{subvol_name}"
+
+        # create subvolume without quota
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name)
+
+        # access subvolume and do I/O
+        mount_point = self.mount_a.get_mount_point()
+        subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip()
+        subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/'))
+        self.mount_a.run_shell(['ls', '-ld', subvolume_fs_path])
+
+        filename = os.path.join(subvolume_fs_path, "testfile")
+        self.mount_a.run_shell_payload("sudo fio "
+                                       "--name test -rw=write "
+                                       "--bs=4k --numjobs=1 --time_based "
+                                       "--runtime=3s --verify=0 --size=5M "
+                                       f"--filename={filename}", wait=True)
+
+        # wait for metrics with quota_bytes=0 (unlimited subvolume)
+        subvol_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for subvolume metrics') as proceed:
+            while proceed():
+                subvol_metrics = self.get_subvolume_metrics()
+                if subvol_metrics:
+                    # for unlimited quota, quota_bytes should be 0
+                    if subvol_metrics[0]["counters"]["quota_bytes"] == 0:
+                        break
+
+        self.assertIsNotNone(subvol_metrics, "Expected subvolume metrics")
+        counters = subvol_metrics[0]["counters"]
+        labels = subvol_metrics[0]["labels"]
+
+        self.assertEqual(labels["subvolume_path"], subv_path)
+        self.assertEqual(counters["quota_bytes"], 0,
+                         "Expected quota_bytes=0 for unlimited subvolume")
+        # used_bytes is fetched dynamically from the inode's rstat, so it should
+        # reflect actual usage even for unlimited quota subvolumes
+        self.assertGreater(counters["used_bytes"], 0,
+                           "Expected used_bytes > 0 after writes")
+
+        # cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name)
+
+    def test_subvolume_metrics_stress(self):
+        """
+        Comprehensive stress test for subvolume metrics covering:
+        - Multiple quota resizes (up and down)
+        - Data writes, partial deletes, overwrites
+        - Concurrent operations on multiple subvolumes
+        - Verifying metrics accurately track all changes
+        """
+        subvol_name = "stress_subv"
+        subv_path = f"/volumes/_nogroup/{subvol_name}"
+
+        # Phase 1: Create subvolume with initial small quota
+        initial_quota = 100 * 1024 * 1024  # 100 MiB
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name,
+                             "--size", str(initial_quota))
+
+        mount_point = self.mount_a.get_mount_point()
+        subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath', 'cephfs', subvol_name).strip()
+        subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/'))
+
+        log.info(f"Phase 1: Initial quota {initial_quota}, writing data...")
+
+        # Write multiple files
+        for i in range(3):
+            filename = os.path.join(subvolume_fs_path, f"file{i}")
+            self.mount_a.run_shell_payload("sudo fio "
+                                           "--name test -rw=write "
+                                           "--bs=4k --numjobs=1 --time_based "
+                                           f"--runtime=2s --verify=0 --size=5M "
+                                           f"--filename={filename}", wait=True)
+
+        # Verify initial metrics
+        subvol_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for initial metrics') as proceed:
+            while proceed():
+                subvol_metrics = self.get_subvolume_metrics()
+                if subvol_metrics:
+                    counters = subvol_metrics[0]["counters"]
+                    if counters["quota_bytes"] == initial_quota and counters["used_bytes"] > 0:
+                        break
+
+        self.assertIsNotNone(subvol_metrics, "Expected initial metrics")
+        counters = subvol_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], initial_quota)
+        initial_used = counters["used_bytes"]
+        self.assertGreater(initial_used, 0, "Expected some data written")
+        log.info(f"Phase 1 complete: quota={counters['quota_bytes']}, used={initial_used}")
+
+        # Phase 2: Resize quota UP
+        larger_quota = 500 * 1024 * 1024  # 500 MiB
+        log.info(f"Phase 2: Resizing quota UP to {larger_quota}...")
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(larger_quota))
+
+        # Trigger quota broadcast
+        self.mount_a.run_shell(['ls', '-la', subvolume_fs_path])
+        self.mount_a.run_shell_payload(f"echo 'trigger' | sudo tee {subvolume_fs_path}/trigger > /dev/null")
+
+        # Verify quota increased, used unchanged
+        updated_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for quota increase') as proceed:
+            while proceed():
+                updated_metrics = self.get_subvolume_metrics()
+                if updated_metrics:
+                    counters = updated_metrics[0]["counters"]
+                    if counters["quota_bytes"] == larger_quota:
+                        break
+
+        self.assertIsNotNone(updated_metrics)
+        counters = updated_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], larger_quota)
+        # used_bytes should be roughly the same (small trigger file added)
+        self.assertGreater(counters["used_bytes"], initial_used - 1024)  # allow some variance
+        log.info(f"Phase 2 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Phase 3: Write more data to increase usage
+        log.info("Phase 3: Writing more data...")
+        for i in range(3, 6):
+            filename = os.path.join(subvolume_fs_path, f"file{i}")
+            self.mount_a.run_shell_payload("sudo fio "
+                                           "--name test -rw=write "
+                                           "--bs=4k --numjobs=1 --time_based "
+                                           f"--runtime=2s --verify=0 --size=5M "
+                                           f"--filename={filename}", wait=True)
+
+        # Verify used_bytes increased
+        more_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for increased usage') as proceed:
+            while proceed():
+                more_metrics = self.get_subvolume_metrics()
+                if more_metrics:
+                    counters = more_metrics[0]["counters"]
+                    if counters["used_bytes"] > initial_used * 1.5:  # expect significant increase
+                        break
+
+        self.assertIsNotNone(more_metrics)
+        counters = more_metrics[0]["counters"]
+        phase3_used = counters["used_bytes"]
+        self.assertGreater(phase3_used, initial_used * 1.5,
+                           "Expected used_bytes to increase significantly after more writes")
+        log.info(f"Phase 3 complete: quota={counters['quota_bytes']}, used={phase3_used}")
+
+        # Phase 4: Delete some files, verify used_bytes decreases
+        log.info("Phase 4: Deleting files 0-2...")
+        for i in range(3):
+            self.mount_a.run_shell_payload(f"sudo rm -f {subvolume_fs_path}/file{i}")
+        self.mount_a.run_shell_payload("sudo sync")
+
+        # Trigger I/O on remaining files to generate metrics (metrics only sent during I/O)
+        self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_delete bs=4k count=10", wait=True)
+
+        # Verify used_bytes decreased
+        deleted_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for decreased usage after delete') as proceed:
+            while proceed():
+                # Keep triggering writes to ensure metrics are generated
+                self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_delete bs=4k count=1 conv=notrunc", wait=True)
+                deleted_metrics = self.get_subvolume_metrics()
+                if deleted_metrics:
+                    counters = deleted_metrics[0]["counters"]
+                    if counters["used_bytes"] < phase3_used * 0.8:  # expect ~50% drop
+                        break
+
+        self.assertIsNotNone(deleted_metrics)
+        counters = deleted_metrics[0]["counters"]
+        phase4_used = counters["used_bytes"]
+        self.assertLess(phase4_used, phase3_used,
+                        "Expected used_bytes to decrease after file deletion")
+        log.info(f"Phase 4 complete: quota={counters['quota_bytes']}, used={phase4_used}")
+
+        # Phase 5: Resize quota DOWN (but still above current usage)
+        smaller_quota = 200 * 1024 * 1024  # 200 MiB
+        log.info(f"Phase 5: Resizing quota DOWN to {smaller_quota}...")
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(smaller_quota))
+
+        # Trigger quota broadcast
+        self.mount_a.run_shell(['stat', subvolume_fs_path])
+
+        # Verify quota decreased
+        smaller_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for quota decrease') as proceed:
+            while proceed():
+                smaller_metrics = self.get_subvolume_metrics()
+                if smaller_metrics:
+                    counters = smaller_metrics[0]["counters"]
+                    if counters["quota_bytes"] == smaller_quota:
+                        break
+
+        self.assertIsNotNone(smaller_metrics)
+        counters = smaller_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], smaller_quota)
+        log.info(f"Phase 5 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Phase 6: Overwrite existing files (usage should stay roughly same)
+        log.info("Phase 6: Overwriting existing files...")
+        for i in range(3, 6):
+            filename = os.path.join(subvolume_fs_path, f"file{i}")
+            self.mount_a.run_shell_payload("sudo fio "
+                                           "--name test -rw=write "
+                                           "--bs=4k --numjobs=1 --time_based "
+                                           f"--runtime=1s --verify=0 --size=5M "
+                                           f"--filename={filename}", wait=True)
+
+        overwrite_metrics = self.get_subvolume_metrics()
+        self.assertIsNotNone(overwrite_metrics)
+        counters = overwrite_metrics[0]["counters"]
+        # used_bytes should be similar (overwriting same files)
+        self.assertGreater(counters["used_bytes"], 0)
+        log.info(f"Phase 6 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Phase 7: Mixed read/write workload
+        log.info("Phase 7: Mixed read/write workload...")
+        filename = os.path.join(subvolume_fs_path, "mixed_file")
+        self.mount_a.run_shell_payload("sudo fio "
+                                       "--name test -rw=randrw --rwmixread=50 "
+                                       "--bs=4k --numjobs=2 --time_based "
+                                       "--runtime=3s --verify=0 --size=5M "
+                                       f"--filename={filename}", wait=True)
+
+        # Verify metrics still valid after mixed workload
+        mixed_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for mixed workload metrics') as proceed:
+            while proceed():
+                mixed_metrics = self.get_subvolume_metrics()
+                if mixed_metrics:
+                    counters = mixed_metrics[0]["counters"]
+                    # Should have both read and write ops
+                    if counters["avg_read_iops"] > 0 and counters["avg_write_iops"] > 0:
+                        break
+
+        self.assertIsNotNone(mixed_metrics)
+        counters = mixed_metrics[0]["counters"]
+        self.assertGreater(counters["avg_read_iops"], 0, "Expected read I/O from mixed workload")
+        self.assertGreater(counters["avg_write_iops"], 0, "Expected write I/O from mixed workload")
+        self.assertEqual(counters["quota_bytes"], smaller_quota, "Quota should remain unchanged")
+        log.info(f"Phase 7 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}, "
+                 f"read_iops={counters['avg_read_iops']}, write_iops={counters['avg_write_iops']}")
+
+        # Phase 8: Remove quota entirely (set to unlimited)
+        log.info("Phase 8: Removing quota (setting to unlimited)...")
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name, 'inf')
+
+        # Do a small write to generate metrics after quota change
+        self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_unlimited bs=4k count=10", wait=True)
+
+        # Verify quota_bytes becomes 0 (unlimited)
+        unlimited_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for unlimited quota') as proceed:
+            while proceed():
+                # Keep triggering I/O to generate fresh metrics
+                self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_unlimited bs=4k count=1 conv=notrunc", wait=True)
+                unlimited_metrics = self.get_subvolume_metrics()
+                if unlimited_metrics:
+                    counters = unlimited_metrics[0]["counters"]
+                    # quota_bytes=0 means unlimited
+                    if counters["quota_bytes"] == 0:
+                        break
+
+        self.assertIsNotNone(unlimited_metrics)
+        counters = unlimited_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], 0, "Expected quota_bytes=0 for unlimited")
+        self.assertGreater(counters["used_bytes"], 0, "used_bytes should still be tracked")
+        log.info(f"Phase 8 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Phase 9: Re-apply quota after being unlimited
+        log.info("Phase 9: Re-applying quota after unlimited...")
+        final_quota = 300 * 1024 * 1024  # 300 MiB
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(final_quota))
+
+        # Trigger broadcast
+        self.mount_a.run_shell_payload(f"echo 'final' | sudo tee {subvolume_fs_path}/final > /dev/null")
+
+        # Verify quota restored
+        final_metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for restored quota') as proceed:
+            while proceed():
+                # Trigger I/O to generate metrics
+                self.mount_a.run_shell_payload(f"sudo dd if=/dev/zero of={subvolume_fs_path}/trigger_final bs=4k count=1 conv=notrunc", wait=True)
+                final_metrics = self.get_subvolume_metrics()
+                if final_metrics:
+                    counters = final_metrics[0]["counters"]
+                    if counters["quota_bytes"] == final_quota:
+                        break
+
+        self.assertIsNotNone(final_metrics)
+        counters = final_metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], final_quota,
+                         "Expected quota to be restored after being unlimited")
+        self.assertGreater(counters["used_bytes"], 0)
+        log.info(f"Phase 9 complete: quota={counters['quota_bytes']}, used={counters['used_bytes']}")
+
+        # Final verification: all metrics fields present and sane
+        labels = final_metrics[0]["labels"]
+        self.assertEqual(labels["fs_name"], "cephfs")
+        self.assertEqual(labels["subvolume_path"], subv_path)
+        self.assertIn("avg_read_iops", counters)
+        self.assertIn("avg_write_iops", counters)
+        self.assertIn("avg_read_tp_Bps", counters)
+        self.assertIn("avg_write_tp_Bps", counters)
+        self.assertIn("avg_read_lat_msec", counters)
+        self.assertIn("avg_write_lat_msec", counters)
+
+        log.info("Stress test completed successfully!")
+
+        # cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name)
+
+    def test_subvolume_metrics_eviction_reactivation(self):
+        """
+        Ensure subvolume quota cache entries are evicted after inactivity and
+        re-populated on the next quota broadcast.
+        """
+        # Shrink window to speed up eviction
+        # Use 'mds' service (not 'mds.*') for config set/get in vstart.
+        # The minimal allowed window is 30s; set to 30 to keep the test shorter.
+        original_window = self.fs.get_ceph_cmd_stdout('config', 'get', 'mds',
+                                                      'subv_metrics_window_interval').strip()
+        self.fs.run_ceph_cmd('config', 'set', 'mds', 'subv_metrics_window_interval', '30')
+        self.addCleanup(self.fs.run_ceph_cmd, 'config', 'set', 'mds',
+                        'subv_metrics_window_interval', original_window)
+
+        subvol_name = "evict_subv"
+        initial_quota = 50 * 1024 * 1024  # 50 MiB
+        updated_quota = 60 * 1024 * 1024  # 60 MiB
+
+        # Create subvolume with initial quota and write a small file
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'create', 'cephfs', subvol_name,
+                             "--size", str(initial_quota))
+        mount_point = self.mount_a.get_mount_point()
+        subvolume_fs_path = self.fs.get_ceph_cmd_stdout('fs', 'subvolume', 'getpath',
+                                                        'cephfs', subvol_name).strip()
+        subvolume_fs_path = os.path.join(mount_point, subvolume_fs_path.strip('/'))
+        filename = os.path.join(subvolume_fs_path, "seed")
+        self.mount_a.run_shell_payload("sudo fio --name test -rw=write --bs=4k --numjobs=1 "
+                                       "--time_based --runtime=2s --verify=0 --size=4M "
+                                       f"--filename={filename}", wait=True)
+
+        # Wait for metrics to reflect initial quota
+        with safe_while(sleep=1, tries=30, action='wait for initial quota metrics') as proceed:
+            while proceed():
+                metrics = self.get_subvolume_metrics()
+                if metrics:
+                    counters = metrics[0]["counters"]
+                    if counters["quota_bytes"] == initial_quota and counters["used_bytes"] > 0:
+                        break
+
+        # Let the window expire (2 * window = 60s)
+        sleep(65)
+
+        # Change quota to force a new broadcast and repopulate cache
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'resize', 'cephfs', subvol_name,
+                             str(updated_quota))
+        # Trigger broadcast/metrics generation
+        self.mount_a.run_shell_payload(f"echo 'poke' | sudo tee {subvolume_fs_path}/poke > /dev/null")
+
+        # Verify quota is updated after eviction and reactivation
+        metrics = None
+        with safe_while(sleep=1, tries=30, action='wait for quota after eviction') as proceed:
+            while proceed():
+                metrics = self.get_subvolume_metrics()
+                if metrics:
+                    counters = metrics[0]["counters"]
+                    if counters["quota_bytes"] == updated_quota:
+                        break
+
+        self.assertIsNotNone(metrics, "Expected metrics after reactivation")
+        counters = metrics[0]["counters"]
+        self.assertEqual(counters["quota_bytes"], updated_quota,
+                         "Expected quota to refresh after eviction")
+        self.assertGreater(counters["used_bytes"], 0, "Expected used_bytes to remain tracked")
+
+        # Cleanup
+        self.fs.run_ceph_cmd('fs', 'subvolume', 'rm', 'cephfs', subvol_name)