From: Kotresh HR Date: Sat, 7 Feb 2026 14:26:36 +0000 (+0530) Subject: qa: Add retry logic to remove most sleeps in mirroring tests X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c1e827247bd20e8a1851bc2d7a9861c12d033ef0;p=ceph.git qa: Add retry logic to remove most sleeps in mirroring tests The mirroring tests contain lot of sleeps adding it up to ~1hr. This patch adds a retry logic and removes most of them. This is cleaner and saves considerable time in test time for mirroring. Fixes: https://tracker.ceph.com/issues/74878 Signed-off-by: Kotresh HR --- diff --git a/qa/tasks/cephfs/test_mirroring.py b/qa/tasks/cephfs/test_mirroring.py index f408fdb64e7..322f40e6327 100644 --- a/qa/tasks/cephfs/test_mirroring.py +++ b/qa/tasks/cephfs/test_mirroring.py @@ -4,6 +4,7 @@ import errno import logging import random import time +import functools from io import StringIO from collections import deque @@ -14,6 +15,45 @@ from teuthology.contextutil import safe_while log = logging.getLogger(__name__) + +# retry decorator +def retry_assert(timeout=60, interval=1): + """ + Retry a test helper until assertions inside it pass or timeout expires. + Prints retry count on each failure. + """ + tries = int(timeout/interval) + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + last_exc = None + attempt = 1 + + with safe_while(sleep=interval, tries=tries, action=f"retry {func.__name__}") as proceed: + while proceed(): + try: + return func(*args, **kwargs) + except (AssertionError, KeyError, IndexError) as e: + last_exc = e + log.debug( + f"[retry_assert] {func.__name__}: " + f"attempt {attempt} failed ({type(e).__name__}), retrying..." + ) + attempt += 1 + # Final failure + if last_exc is not None and hasattr(last_exc, "res"): + log.error("\n--- Last peer status (res) ---") + log.error(last_exc.res) + + raise AssertionError( + f"{func.__name__} did not succeed within {timeout}s " + f"after {attempt - 1} attempts" + ) from last_exc + + return wrapper + return decorator + class TestMirroring(CephFSTestCase): MDSS_REQUIRED = 5 CLIENTS_REQUIRED = 2 @@ -194,53 +234,112 @@ class TestMirroring(CephFSTestCase): self.assertLess(vafter["counters"]["directory_count"], vbefore["counters"]["directory_count"]) + @retry_assert(timeout=140, interval=2) + def check_mirror_status_after_failure(self): + status = self.get_mirror_daemon_status() + fs = status['filesystems'][0] + peer = fs['peers'][0] + + self.assertEqual(fs['directory_count'], 1) + self.assertEqual(peer['stats']['failure_count'], 1) + self.assertEqual(peer['stats']['recovery_count'], 0) + + @retry_assert(timeout=140, interval=2) + def check_mirror_status_after_failure_recovery(self): + status = self.get_mirror_daemon_status() + fs = status['filesystems'][0] + peer = fs['peers'][0] + + self.assertEqual(fs['directory_count'], 1) + self.assertEqual(peer['stats']['failure_count'], 1) + self.assertEqual(peer['stats']['recovery_count'], 1) + + @retry_assert(timeout=60, interval=3) + def check_peer_status_empty(self, fs_name, fs_id, peer_spec): + peer_uuid = self.get_peer_uuid(peer_spec) + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + try: + self.assertFalse(res) + except (AssertionError, KeyError, IndexError) as e: + e.res = res + raise + + @retry_assert(timeout=600, interval=10) def check_peer_status(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name, expected_snap_count): peer_uuid = self.get_peer_uuid(peer_spec) res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', 'fs', 'mirror', 'peer', 'status', f'{fs_name}@{fs_id}', peer_uuid) - self.assertTrue(dir_name in res) - self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name) - self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count) + try: + self.assertTrue(dir_name in res) + self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name) + self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count) + except (AssertionError, KeyError, IndexError) as e: + e.res = res + raise + @retry_assert(timeout=60, interval=5) def check_peer_status_idle(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name, expected_snap_count): peer_uuid = self.get_peer_uuid(peer_spec) res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', 'fs', 'mirror', 'peer', 'status', f'{fs_name}@{fs_id}', peer_uuid) - self.assertTrue(dir_name in res) - self.assertTrue('idle' == res[dir_name]['state']) - self.assertTrue(expected_snap_name == res[dir_name]['last_synced_snap']['name']) - self.assertTrue(expected_snap_count == res[dir_name]['snaps_synced']) + try: + self.assertTrue(dir_name in res) + self.assertTrue('idle' == res[dir_name]['state']) + self.assertTrue(expected_snap_name == res[dir_name]['last_synced_snap']['name']) + self.assertTrue(expected_snap_count == res[dir_name]['snaps_synced']) + except (AssertionError, KeyError, IndexError) as e: + e.res = res + raise + @retry_assert(timeout=60, interval=2) def check_peer_status_deleted_snap(self, fs_name, fs_id, peer_spec, dir_name, expected_delete_count): peer_uuid = self.get_peer_uuid(peer_spec) res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', 'fs', 'mirror', 'peer', 'status', f'{fs_name}@{fs_id}', peer_uuid) - self.assertTrue(dir_name in res) - self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count) + try: + self.assertTrue(dir_name in res) + self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count) + except (AssertionError, KeyError, IndexError) as e: + e.res = res + raise + @retry_assert(timeout=60, interval=2) def check_peer_status_renamed_snap(self, fs_name, fs_id, peer_spec, dir_name, expected_rename_count): peer_uuid = self.get_peer_uuid(peer_spec) res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', 'fs', 'mirror', 'peer', 'status', f'{fs_name}@{fs_id}', peer_uuid) - self.assertTrue(dir_name in res) - self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count) + try: + self.assertTrue(dir_name in res) + self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count) + except (AssertionError, KeyError, IndexError) as e: + e.res = res + raise + @retry_assert(timeout=60, interval=1) def check_peer_snap_in_progress(self, fs_name, fs_id, - peer_spec, dir_name, snap_name): + peer_spec, dir_name, snap_name, timeout=60, interval=1): peer_uuid = self.get_peer_uuid(peer_spec) - res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', - 'fs', 'mirror', 'peer', 'status', - f'{fs_name}@{fs_id}', peer_uuid) - self.assertTrue('syncing' == res[dir_name]['state']) - self.assertTrue(res[dir_name]['current_syncing_snap']['name'] == snap_name) + deadline = time.time() + timeout + try: + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + + self.assertTrue('syncing' == res[dir_name]['state']) + self.assertTrue(res[dir_name]['current_syncing_snap']['name'] == snap_name) + except (AssertionError, KeyError, IndexError) as e: + e.res = res + raise def verify_snapshot(self, dir_name, snap_name): snap_list = self.mount_b.ls(path=f'{dir_name}/.snap') @@ -255,6 +354,7 @@ class TestMirroring(CephFSTestCase): log.debug(f'destination snapshot checksum {snap_name} {dest_res}') self.assertTrue(source_res == dest_res) + @retry_assert(timeout=150, interval=5) def verify_failed_directory(self, fs_name, fs_id, peer_spec, dir_name): peer_uuid = self.get_peer_uuid(peer_spec) res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', @@ -590,7 +690,6 @@ class TestMirroring(CephFSTestCase): for i in range(10): self.mount_a.write_n_mb(os.path.join('d0', f'file.{i}'), 100) - time.sleep(60) self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) @@ -602,7 +701,6 @@ class TestMirroring(CephFSTestCase): # take a snapshot self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) - time.sleep(120) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 'snap0', 1) self.verify_snapshot('d0', 'snap0') @@ -620,12 +718,9 @@ class TestMirroring(CephFSTestCase): for i in range(15): self.mount_a.write_n_mb(os.path.join('d0', f'more_file.{i}'), 100) - time.sleep(60) - # take another snapshot self.mount_a.run_shell(["mkdir", "d0/.snap/snap1"]) - time.sleep(240) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 'snap1', 2) self.verify_snapshot('d0', 'snap1') @@ -642,11 +737,11 @@ class TestMirroring(CephFSTestCase): # delete a snapshot self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"]) - time.sleep(10) - snap_list = self.mount_b.ls(path='d0/.snap') - self.assertTrue('snap0' not in snap_list) self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 1) + snap_list = self.mount_b.ls(path='d0/.snap') + self.assertTrue('snap0' not in snap_list) + # check snaps_deleted res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') fourth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0] @@ -655,12 +750,11 @@ class TestMirroring(CephFSTestCase): # rename a snapshot self.mount_a.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"]) - time.sleep(10) + self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id, + "client.mirror_remote@ceph", '/d0', 1) snap_list = self.mount_b.ls(path='d0/.snap') self.assertTrue('snap1' not in snap_list) self.assertTrue('snap2' in snap_list) - self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id, - "client.mirror_remote@ceph", '/d0', 1) # check snaps_renamed res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') fifth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0] @@ -684,7 +778,6 @@ class TestMirroring(CephFSTestCase): # take a snapshot self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) - time.sleep(10) self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 'snap0') @@ -722,7 +815,6 @@ class TestMirroring(CephFSTestCase): # take a snapshot self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) - time.sleep(10) self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 'snap0') @@ -743,7 +835,6 @@ class TestMirroring(CephFSTestCase): # check if the rados addr is blocklisted self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst)) - time.sleep(500) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count=1) self.verify_snapshot('d0', 'snap0') @@ -767,7 +858,6 @@ class TestMirroring(CephFSTestCase): self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') # wait for mirror daemon to mark it the directory as failed - time.sleep(120) self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0') @@ -776,7 +866,6 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) # wait for correction - time.sleep(120) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 'snap0', 1) # check snaps_synced @@ -805,25 +894,12 @@ class TestMirroring(CephFSTestCase): # in daemon stats self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0') - time.sleep(120) - status = self.get_mirror_daemon_status() - # we added one - peer = status['filesystems'][0]['peers'][0] - self.assertEqual(status['filesystems'][0]['directory_count'], 1) - # failure count should be reflected - self.assertEqual(peer['stats']['failure_count'], 1) - self.assertEqual(peer['stats']['recovery_count'], 0) + self.check_mirror_status_after_failure() # create the directory, mirror daemon would recover self.mount_a.run_shell(["mkdir", "d0"]) - time.sleep(120) - status = self.get_mirror_daemon_status() - peer = status['filesystems'][0]['peers'][0] - self.assertEqual(status['filesystems'][0]['directory_count'], 1) - # failure and recovery count should be reflected - self.assertEqual(peer['stats']['failure_count'], 1) - self.assertEqual(peer['stats']['recovery_count'], 1) + self.check_mirror_status_after_failure_recovery() self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) @@ -959,7 +1035,6 @@ class TestMirroring(CephFSTestCase): # take a snapshot self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"]) - time.sleep(30) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', 'snap0', 1) self.verify_snapshot('d0', 'snap0') @@ -986,7 +1061,6 @@ class TestMirroring(CephFSTestCase): # take a snapshot self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap0"]) - time.sleep(30) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap0', 1) # check snaps_synced @@ -1001,7 +1075,6 @@ class TestMirroring(CephFSTestCase): # try syncing more snapshots self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap1"]) - time.sleep(30) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap1', 2) # check snaps_synced @@ -1011,7 +1084,6 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap0"]) self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap1"]) - time.sleep(15) self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0/d1/d2/d3', 2) # check snaps_deleted @@ -1116,7 +1188,6 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a']) # full copy, takes time - time.sleep(500) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1) self.verify_snapshot(repo_path, 'snap_a') @@ -1124,21 +1195,23 @@ class TestMirroring(CephFSTestCase): res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') vsecond = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0] self.assertGreater(vsecond["counters"]["snaps_synced"], vfirst["counters"]["snaps_synced"]) + full_sync_duration = vsecond["counters"]["last_synced_duration"] # create some diff - num = random.randint(5, 20) + num = random.randint(5, 10) log.debug(f'resetting to HEAD~{num}') exec_git_cmd(["reset", "--hard", f'HEAD~{num}']) self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b']) # incremental copy, should be fast - time.sleep(180) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2) self.verify_snapshot(repo_path, 'snap_b') res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') vthird = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0] self.assertGreater(vthird["counters"]["snaps_synced"], vsecond["counters"]["snaps_synced"]) + inc_sync_duration1 = vthird["counters"]["last_synced_duration"] + self.assertGreaterEqual(float(full_sync_duration), float(inc_sync_duration1)) # diff again, this time back to HEAD log.debug('resetting to HEAD') @@ -1146,13 +1219,14 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c']) # incremental copy, should be fast - time.sleep(180) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3) self.verify_snapshot(repo_path, 'snap_c') res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') vfourth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0] self.assertGreater(vfourth["counters"]["snaps_synced"], vthird["counters"]["snaps_synced"]) + inc_sync_duration2 = vfourth["counters"]["last_synced_duration"] + self.assertGreaterEqual(float(full_sync_duration), float(inc_sync_duration2)) self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) @@ -1216,7 +1290,6 @@ class TestMirroring(CephFSTestCase): res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0] self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}']) - time.sleep(30) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', snapname, turns+1) verify_types('d0', fnames, snapname) @@ -1264,7 +1337,6 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a']) # full copy, takes time - time.sleep(500) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1) self.verify_snapshot(repo_path, 'snap_a') @@ -1283,7 +1355,6 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a']) # incremental copy but based on remote dir_root - time.sleep(300) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2) self.verify_snapshot(repo_path, 'snap_b') @@ -1367,7 +1438,7 @@ class TestMirroring(CephFSTestCase): self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d2') # Wait a while for the sync backoff - time.sleep(500) + self.check_peer_status_empty(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph") log.debug('removing snapshots') self.mount_a.run_shell(["rmdir", f"d0/.snap/{snap_name}"]) @@ -1397,8 +1468,6 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(["mkdir", f"d2/.snap/{snap_name}"]) # Wait for the threads to finish - time.sleep(500) - self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/d0', f'{snap_name}', 1) self.verify_snapshot('d0', f'{snap_name}') @@ -1426,7 +1495,6 @@ class TestMirroring(CephFSTestCase): self.add_directory(self.primary_fs_name, self.primary_fs_id, '/l1') self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name) - time.sleep(60) self.check_peer_status(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", '/l1', 'snap0', 1) # dump perf counters @@ -1475,7 +1543,6 @@ class TestMirroring(CephFSTestCase): expected_snap_count = 1 self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}']) - time.sleep(30) # confirm snapshot synced and status 'idle' self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id, peer_spec, f'/{dir_name}', snap_name, expected_snap_count) @@ -1539,8 +1606,6 @@ class TestMirroring(CephFSTestCase): # add the directory for mirroring self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}') - time.sleep(60) - # confirm snapshot synced and status 'idle' expected_snap_count = 2 self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,