import os
+import re
import json
import base64
import errno
self.assertEqual(peer['stats']['recovery_count'], 1)
def peer_dir_status(self, res, dir_name, peer_uuid):
+ self.assertIn('metrics', res)
return res['metrics'][dir_name]['peer'][peer_uuid]
+ def assert_last_synced_snap_metrics(self, last_synced_snap):
+ for key in ('crawl_duration', 'datasync_queue_wait_duration', 'sync_duration',
+ 'sync_time_stamp', 'sync_bytes', 'sync_files'):
+ self.assertIn(key, last_synced_snap, msg=f'missing last_synced_snap.{key}')
+ self.assertRegex(
+ last_synced_snap['sync_bytes'],
+ r'^\d+(\.\d+)?\s+(B|KiB|MiB|GiB|TiB|PiB)$')
+ self.assertIsInstance(last_synced_snap['sync_files'], int)
+ self.assertGreaterEqual(last_synced_snap['sync_files'], 0)
+
+ def assert_syncing_snap_metrics(self, snap, sync_mode=None):
+ for key in ('sync-mode', 'avg_read_throughput_bytes', 'avg_write_throughput_bytes',
+ 'crawl', 'bytes', 'files', 'eta'):
+ self.assertIn(key, snap, msg=f'missing current_syncing_snap.{key}')
+ if sync_mode is not None:
+ self.assertEqual(snap['sync-mode'], sync_mode)
+ self.assertTrue(snap['avg_read_throughput_bytes'].endswith('/s'))
+ self.assertTrue(snap['avg_write_throughput_bytes'].endswith('/s'))
+ self.assertIn(snap['crawl']['state'], ('in-progress', 'completed'))
+ self.assertTrue(snap['crawl']['duration'])
+ bytes_obj = snap['bytes']
+ self.assertIn('sync_bytes', bytes_obj)
+ self.assertIn('total_bytes', bytes_obj)
+ if bytes_obj.get('total_bytes') and bytes_obj['total_bytes'] != '0.00 B':
+ self.assertIn('sync_percent', bytes_obj)
+ files_obj = snap['files']
+ self.assertIn('sync_files', files_obj)
+ self.assertIn('total_files', files_obj)
+ if files_obj.get('total_files', 0) > 0:
+ self.assertIn('sync_percent', files_obj)
+ self.assertTrue(
+ snap['eta'] == 'calculating...' or bool(re.search(r'\d', snap['eta'])))
+
+ @retry_assert(timeout=120, interval=1)
+ def check_peer_syncing_progress_metrics(self, fs_name, fs_id, peer_spec, dir_name,
+ snap_name, sync_mode=None):
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
+ 'fs', 'mirror', 'peer', 'status',
+ f'{fs_name}@{fs_id}', peer_uuid)
+ try:
+ dir_stat = self.peer_dir_status(res, dir_name, peer_uuid)
+ self.assertEqual(dir_stat['state'], 'syncing')
+ snap = dir_stat['current_syncing_snap']
+ self.assertEqual(snap['name'], snap_name)
+ self.assert_syncing_snap_metrics(snap, sync_mode=sync_mode)
+ if 'datasync_queue_wait' in snap:
+ self.assertIn(snap['datasync_queue_wait']['state'],
+ ('waiting', 'completed'))
+ except RETRY_EXCEPTIONS as e:
+ e.res = res
+ raise
+
@retry_assert(timeout=60, interval=3)
def check_peer_status_empty(self, fs_name, fs_id, peer_spec):
peer_uuid = self.get_peer_uuid(peer_spec)
failure_reason == dir_stat.get('failure_reason', {}) and \
snap_name == dir_stat['last_synced_snap']['name'] and \
expected_snap_count == dir_stat['snaps_synced']):
+ self.assert_last_synced_snap_metrics(dir_stat['last_synced_snap'])
break
# remove the directory in the remote fs and check status restores to 'idle'
self.mount_b.run_shell(['sudo', 'rmdir', remote_snap_path], omit_sudo=False)
if('idle' == dir_stat['state'] and 'failure_reason' not in dir_stat and \
snap_name == dir_stat['last_synced_snap']['name'] and \
expected_snap_count == dir_stat['snaps_synced']):
+ self.assert_last_synced_snap_metrics(dir_stat['last_synced_snap'])
break
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ def test_cephfs_mirror_peer_status_last_synced_metrics(self):
+ """Peer status last_synced_snap reports new sync timing and size metrics."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+
+ dir_name = 'metrics_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap_name = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap_name, 1)
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ res = self.mirror_daemon_command(f'peer status for fs: {self.primary_fs_name}',
+ 'fs', 'mirror', 'peer', 'status',
+ f'{self.primary_fs_name}@{self.primary_fs_id}',
+ peer_uuid)
+ self.assert_last_synced_snap_metrics(
+ self.peer_dir_status(res, f'/{dir_name}', peer_uuid)['last_synced_snap'])
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_cephfs_mirror_peer_status_syncing_progress_metrics(self):
+ """Peer status current_syncing_snap reports sync progress metrics."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+
+ dir_name = 'd0'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 3000, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap0 = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap0}'])
+ self.check_peer_syncing_progress_metrics(
+ self.primary_fs_name, self.primary_fs_id, peer_spec, f'/{dir_name}',
+ snap0, sync_mode='full')
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap0, 1)
+
+ self.mount_a.write_n_mb(os.path.join(dir_name, 'file.0'), 1)
+ self.mount_a.create_n_files(f'{dir_name}/snapdiff_file', 3000, sync=True)
+ snap1 = 'snap1'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap1}'])
+ self.check_peer_syncing_progress_metrics(
+ self.primary_fs_name, self.primary_fs_id, peer_spec, f'/{dir_name}',
+ snap1, sync_mode='delta')
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap1, 2)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
def test_cephfs_mirror_sync_already_existing_snapshots(self):
"""
That mirroring syncs the already existing snapshot correctly.