import logging
import random
import time
+import functools
from io import StringIO
from collections import deque
log = logging.getLogger(__name__)
+
+# retry decorator
+def retry_assert(timeout=60, interval=1):
+ """
+ Retry a test helper until assertions inside it pass or timeout expires.
+ Prints retry count on each failure.
+ """
+ tries = int(timeout/interval)
+
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ last_exc = None
+ attempt = 1
+
+ with safe_while(sleep=interval, tries=tries, action=f"retry {func.__name__}") as proceed:
+ while proceed():
+ try:
+ return func(*args, **kwargs)
+ except (AssertionError, KeyError, IndexError) as e:
+ last_exc = e
+ log.debug(
+ f"[retry_assert] {func.__name__}: "
+ f"attempt {attempt} failed ({type(e).__name__}), retrying..."
+ )
+ attempt += 1
+ # Final failure
+ if last_exc is not None and hasattr(last_exc, "res"):
+ log.error("\n--- Last peer status (res) ---")
+ log.error(last_exc.res)
+
+ raise AssertionError(
+ f"{func.__name__} did not succeed within {timeout}s "
+ f"after {attempt - 1} attempts"
+ ) from last_exc
+
+ return wrapper
+ return decorator
+
class TestMirroring(CephFSTestCase):
MDSS_REQUIRED = 5
CLIENTS_REQUIRED = 2
self.assertLess(vafter["counters"]["directory_count"], vbefore["counters"]["directory_count"])
+ @retry_assert(timeout=140, interval=2)
+ def check_mirror_status_after_failure(self):
+ status = self.get_mirror_daemon_status()
+ fs = status['filesystems'][0]
+ peer = fs['peers'][0]
+
+ self.assertEqual(fs['directory_count'], 1)
+ self.assertEqual(peer['stats']['failure_count'], 1)
+ self.assertEqual(peer['stats']['recovery_count'], 0)
+
+ @retry_assert(timeout=140, interval=2)
+ def check_mirror_status_after_failure_recovery(self):
+ status = self.get_mirror_daemon_status()
+ fs = status['filesystems'][0]
+ peer = fs['peers'][0]
+
+ self.assertEqual(fs['directory_count'], 1)
+ self.assertEqual(peer['stats']['failure_count'], 1)
+ self.assertEqual(peer['stats']['recovery_count'], 1)
+
+ @retry_assert(timeout=60, interval=3)
+ def check_peer_status_empty(self, fs_name, fs_id, peer_spec):
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
+ 'fs', 'mirror', 'peer', 'status',
+ f'{fs_name}@{fs_id}', peer_uuid)
+ try:
+ self.assertFalse(res)
+ except (AssertionError, KeyError, IndexError) as e:
+ e.res = res
+ raise
+
+ @retry_assert(timeout=600, interval=10)
def check_peer_status(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name,
expected_snap_count):
peer_uuid = self.get_peer_uuid(peer_spec)
res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
'fs', 'mirror', 'peer', 'status',
f'{fs_name}@{fs_id}', peer_uuid)
- self.assertTrue(dir_name in res)
- self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name)
- self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count)
+ try:
+ self.assertTrue(dir_name in res)
+ self.assertTrue(res[dir_name]['last_synced_snap']['name'] == expected_snap_name)
+ self.assertTrue(res[dir_name]['snaps_synced'] == expected_snap_count)
+ except (AssertionError, KeyError, IndexError) as e:
+ e.res = res
+ raise
+ @retry_assert(timeout=60, interval=5)
def check_peer_status_idle(self, fs_name, fs_id, peer_spec, dir_name, expected_snap_name,
expected_snap_count):
peer_uuid = self.get_peer_uuid(peer_spec)
res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
'fs', 'mirror', 'peer', 'status',
f'{fs_name}@{fs_id}', peer_uuid)
- self.assertTrue(dir_name in res)
- self.assertTrue('idle' == res[dir_name]['state'])
- self.assertTrue(expected_snap_name == res[dir_name]['last_synced_snap']['name'])
- self.assertTrue(expected_snap_count == res[dir_name]['snaps_synced'])
+ try:
+ self.assertTrue(dir_name in res)
+ self.assertTrue('idle' == res[dir_name]['state'])
+ self.assertTrue(expected_snap_name == res[dir_name]['last_synced_snap']['name'])
+ self.assertTrue(expected_snap_count == res[dir_name]['snaps_synced'])
+ except (AssertionError, KeyError, IndexError) as e:
+ e.res = res
+ raise
+ @retry_assert(timeout=60, interval=2)
def check_peer_status_deleted_snap(self, fs_name, fs_id, peer_spec, dir_name,
expected_delete_count):
peer_uuid = self.get_peer_uuid(peer_spec)
res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
'fs', 'mirror', 'peer', 'status',
f'{fs_name}@{fs_id}', peer_uuid)
- self.assertTrue(dir_name in res)
- self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count)
+ try:
+ self.assertTrue(dir_name in res)
+ self.assertTrue(res[dir_name]['snaps_deleted'] == expected_delete_count)
+ except (AssertionError, KeyError, IndexError) as e:
+ e.res = res
+ raise
+ @retry_assert(timeout=60, interval=2)
def check_peer_status_renamed_snap(self, fs_name, fs_id, peer_spec, dir_name,
expected_rename_count):
peer_uuid = self.get_peer_uuid(peer_spec)
res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
'fs', 'mirror', 'peer', 'status',
f'{fs_name}@{fs_id}', peer_uuid)
- self.assertTrue(dir_name in res)
- self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count)
+ try:
+ self.assertTrue(dir_name in res)
+ self.assertTrue(res[dir_name]['snaps_renamed'] == expected_rename_count)
+ except (AssertionError, KeyError, IndexError) as e:
+ e.res = res
+ raise
+ @retry_assert(timeout=60, interval=1)
def check_peer_snap_in_progress(self, fs_name, fs_id,
- peer_spec, dir_name, snap_name):
+ peer_spec, dir_name, snap_name, timeout=60, interval=1):
peer_uuid = self.get_peer_uuid(peer_spec)
- res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
- 'fs', 'mirror', 'peer', 'status',
- f'{fs_name}@{fs_id}', peer_uuid)
- self.assertTrue('syncing' == res[dir_name]['state'])
- self.assertTrue(res[dir_name]['current_syncing_snap']['name'] == snap_name)
+ deadline = time.time() + timeout
+ try:
+ res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
+ 'fs', 'mirror', 'peer', 'status',
+ f'{fs_name}@{fs_id}', peer_uuid)
+
+ self.assertTrue('syncing' == res[dir_name]['state'])
+ self.assertTrue(res[dir_name]['current_syncing_snap']['name'] == snap_name)
+ except (AssertionError, KeyError, IndexError) as e:
+ e.res = res
+ raise
def verify_snapshot(self, dir_name, snap_name):
snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
log.debug(f'destination snapshot checksum {snap_name} {dest_res}')
self.assertTrue(source_res == dest_res)
+ @retry_assert(timeout=150, interval=5)
def verify_failed_directory(self, fs_name, fs_id, peer_spec, dir_name):
peer_uuid = self.get_peer_uuid(peer_spec)
res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
for i in range(10):
self.mount_a.write_n_mb(os.path.join('d0', f'file.{i}'), 100)
- time.sleep(60)
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
# take a snapshot
self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
- time.sleep(120)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 'snap0', 1)
self.verify_snapshot('d0', 'snap0')
for i in range(15):
self.mount_a.write_n_mb(os.path.join('d0', f'more_file.{i}'), 100)
- time.sleep(60)
-
# take another snapshot
self.mount_a.run_shell(["mkdir", "d0/.snap/snap1"])
- time.sleep(240)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 'snap1', 2)
self.verify_snapshot('d0', 'snap1')
# delete a snapshot
self.mount_a.run_shell(["rmdir", "d0/.snap/snap0"])
- time.sleep(10)
- snap_list = self.mount_b.ls(path='d0/.snap')
- self.assertTrue('snap0' not in snap_list)
self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 1)
+ snap_list = self.mount_b.ls(path='d0/.snap')
+ self.assertTrue('snap0' not in snap_list)
+
# check snaps_deleted
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
fourth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
# rename a snapshot
self.mount_a.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"])
- time.sleep(10)
+ self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id,
+ "client.mirror_remote@ceph", '/d0', 1)
snap_list = self.mount_b.ls(path='d0/.snap')
self.assertTrue('snap1' not in snap_list)
self.assertTrue('snap2' in snap_list)
- self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id,
- "client.mirror_remote@ceph", '/d0', 1)
# check snaps_renamed
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
fifth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
# take a snapshot
self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
- time.sleep(10)
self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 'snap0')
# take a snapshot
self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
- time.sleep(10)
self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 'snap0')
# check if the rados addr is blocklisted
self.assertTrue(self.mds_cluster.is_addr_blocklisted(rados_inst))
- time.sleep(500)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count=1)
self.verify_snapshot('d0', 'snap0')
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
# wait for mirror daemon to mark it the directory as failed
- time.sleep(120)
self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0')
self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
# wait for correction
- time.sleep(120)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 'snap0', 1)
# check snaps_synced
# in daemon stats
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
- time.sleep(120)
- status = self.get_mirror_daemon_status()
- # we added one
- peer = status['filesystems'][0]['peers'][0]
- self.assertEqual(status['filesystems'][0]['directory_count'], 1)
- # failure count should be reflected
- self.assertEqual(peer['stats']['failure_count'], 1)
- self.assertEqual(peer['stats']['recovery_count'], 0)
+ self.check_mirror_status_after_failure()
# create the directory, mirror daemon would recover
self.mount_a.run_shell(["mkdir", "d0"])
- time.sleep(120)
- status = self.get_mirror_daemon_status()
- peer = status['filesystems'][0]['peers'][0]
- self.assertEqual(status['filesystems'][0]['directory_count'], 1)
- # failure and recovery count should be reflected
- self.assertEqual(peer['stats']['failure_count'], 1)
- self.assertEqual(peer['stats']['recovery_count'], 1)
+ self.check_mirror_status_after_failure_recovery()
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
# take a snapshot
self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
- time.sleep(30)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', 'snap0', 1)
self.verify_snapshot('d0', 'snap0')
# take a snapshot
self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap0"])
- time.sleep(30)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap0', 1)
# check snaps_synced
# try syncing more snapshots
self.mount_a.run_shell(["mkdir", "d0/d1/d2/d3/.snap/snap1"])
- time.sleep(30)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0/d1/d2/d3', 'snap1', 2)
# check snaps_synced
self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap0"])
self.mount_a.run_shell(["rmdir", "d0/d1/d2/d3/.snap/snap1"])
- time.sleep(15)
self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0/d1/d2/d3', 2)
# check snaps_deleted
self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
# full copy, takes time
- time.sleep(500)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
self.verify_snapshot(repo_path, 'snap_a')
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
vsecond = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
self.assertGreater(vsecond["counters"]["snaps_synced"], vfirst["counters"]["snaps_synced"])
+ full_sync_duration = vsecond["counters"]["last_synced_duration"]
# create some diff
- num = random.randint(5, 20)
+ num = random.randint(5, 10)
log.debug(f'resetting to HEAD~{num}')
exec_git_cmd(["reset", "--hard", f'HEAD~{num}'])
self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_b'])
# incremental copy, should be fast
- time.sleep(180)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
self.verify_snapshot(repo_path, 'snap_b')
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
vthird = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
self.assertGreater(vthird["counters"]["snaps_synced"], vsecond["counters"]["snaps_synced"])
+ inc_sync_duration1 = vthird["counters"]["last_synced_duration"]
+ self.assertGreaterEqual(float(full_sync_duration), float(inc_sync_duration1))
# diff again, this time back to HEAD
log.debug('resetting to HEAD')
self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_c'])
# incremental copy, should be fast
- time.sleep(180)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", f'/{repo_path}', 'snap_c', 3)
self.verify_snapshot(repo_path, 'snap_c')
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
vfourth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
self.assertGreater(vfourth["counters"]["snaps_synced"], vthird["counters"]["snaps_synced"])
+ inc_sync_duration2 = vfourth["counters"]["last_synced_duration"]
+ self.assertGreaterEqual(float(full_sync_duration), float(inc_sync_duration2))
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
vbefore = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
self.mount_a.run_shell(['mkdir', f'd0/.snap/{snapname}'])
- time.sleep(30)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', snapname, turns+1)
verify_types('d0', fnames, snapname)
self.mount_a.run_shell(['mkdir', f'{repo_path}/.snap/snap_a'])
# full copy, takes time
- time.sleep(500)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", f'/{repo_path}', 'snap_a', 1)
self.verify_snapshot(repo_path, 'snap_a')
self.mount_a.run_shell(['rmdir', f'{repo_path}/.snap/snap_a'])
# incremental copy but based on remote dir_root
- time.sleep(300)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", f'/{repo_path}', 'snap_b', 2)
self.verify_snapshot(repo_path, 'snap_b')
self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d2')
# Wait a while for the sync backoff
- time.sleep(500)
+ self.check_peer_status_empty(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph")
log.debug('removing snapshots')
self.mount_a.run_shell(["rmdir", f"d0/.snap/{snap_name}"])
self.mount_a.run_shell(["mkdir", f"d2/.snap/{snap_name}"])
# Wait for the threads to finish
- time.sleep(500)
-
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/d0', f'{snap_name}', 1)
self.verify_snapshot('d0', f'{snap_name}')
self.add_directory(self.primary_fs_name, self.primary_fs_id, '/l1')
self.peer_add(self.primary_fs_name, self.primary_fs_id, "client.mirror_remote@ceph", self.secondary_fs_name)
- time.sleep(60)
self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
"client.mirror_remote@ceph", '/l1', 'snap0', 1)
# dump perf counters
expected_snap_count = 1
self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
- time.sleep(30)
# confirm snapshot synced and status 'idle'
self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
peer_spec, f'/{dir_name}', snap_name, expected_snap_count)
# add the directory for mirroring
self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
- time.sleep(60)
-
# confirm snapshot synced and status 'idle'
expected_snap_count = 2
self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,