PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR = "cephfs_mirror"
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS = "cephfs_mirror_mirrored_filesystems"
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER = "cephfs_mirror_peers"
+ PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_DIRECTORY = "cephfs_mirror_directory"
def setUp(self):
super(TestMirroring, self).setUp()
self.assertIn('metrics', res)
return res['metrics'][dir_name]['peer'][peer_uuid]
+ def directory_perf_entry(self, res, dir_path, peer_uuid):
+ for entry in res.get(self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_DIRECTORY, []):
+ labels = entry.get('labels', {})
+ if labels.get('directory') == dir_path and labels.get('peer_uuid') == peer_uuid:
+ return entry
+ return None
+
+ def get_directory_perf_counters(self, dir_path, peer_uuid):
+ res = self.mirror_daemon_command(
+ f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
+ entry = self.directory_perf_entry(res, dir_path, peer_uuid)
+ self.assertIsNotNone(entry,
+ msg=f'missing {self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_DIRECTORY} '
+ f'for {dir_path}')
+ return entry['counters']
+
+ def assert_directory_perf_labels(self, labels, dir_path, peer_uuid):
+ self.assertEqual(labels['source_fscid'], str(self.primary_fs_id))
+ self.assertEqual(labels['source_filesystem'], self.primary_fs_name)
+ self.assertEqual(labels['peer_uuid'], peer_uuid)
+ self.assertEqual(labels['directory'], dir_path)
+ self.assertEqual(labels['peer_cluster_name'], 'ceph')
+ self.assertEqual(labels['peer_cluster_filesystem'], self.secondary_fs_name)
+
+ def assert_idle_directory_last_sync_perf(self, counters):
+ self.assertEqual(counters['dir_state'], 0)
+ self.assertEqual(counters['current_snap_id'], 0)
+ self.assertGreater(counters['last_snap_id'], 0)
+ self.assertGreater(counters['last_sync_bytes'], 0)
+ self.assertGreater(counters['last_sync_files'], 0)
+ self.assertGreaterEqual(counters['last_sync_duration_seconds'], 0)
+ self.assertGreaterEqual(counters['snaps_synced'], 1)
+
def assert_last_synced_snap_metrics(self, last_synced_snap):
for key in ('crawl_duration', 'datasync_queue_wait_duration', 'sync_duration',
'sync_time_stamp', 'sync_bytes', 'sync_files'):
e.res = res
raise
+ @retry_assert(timeout=120, interval=1)
+ def check_directory_perf_syncing(self, fs_name, fs_id, dir_path, peer_spec,
+ sync_mode=None):
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ res = self.mirror_daemon_command(f'counter dump for fs: {fs_name}',
+ 'counter', 'dump')
+ try:
+ entry = self.directory_perf_entry(res, dir_path, peer_uuid)
+ self.assertIsNotNone(entry)
+ counters = entry['counters']
+ self.assertEqual(counters['dir_state'], 1)
+ self.assertGreater(counters['current_snap_id'], 0)
+ if sync_mode == 'full':
+ self.assertEqual(counters['current_sync_mode'], 0)
+ elif sync_mode == 'delta':
+ self.assertEqual(counters['current_sync_mode'], 1)
+ self.assertIn(counters['crawl_state'], (1, 2))
+ self.assertGreater(counters['current_total_files'], 0)
+ if counters['current_total_bytes'] > 0:
+ self.assertLessEqual(counters['current_sync_bytes_percent'], 10000)
+ except RETRY_EXCEPTIONS as e:
+ e.res = res
+ raise
+
@retry_assert(timeout=60, interval=3)
def check_peer_status_empty(self, fs_name, fs_id, peer_spec):
peer_uuid = self.get_peer_uuid(peer_spec)
self.assertGreaterEqual(second["counters"]["last_synced_end"], second["counters"]["last_synced_start"])
self.assertGreater(second["counters"]["last_synced_duration"], 0)
self.assertEqual(second["counters"]["last_synced_bytes"], 1048576000) # last_synced_bytes = 10 files of 1024MB size each
+ peer_uuid = self.get_peer_uuid("client.mirror_remote@ceph")
+ dir_second = self.get_directory_perf_counters('/d0', peer_uuid)
+ self.assertEqual(dir_second['snaps_synced'], second["counters"]["snaps_synced"])
# some more IO
for i in range(15):
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
fourth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
self.assertGreater(fourth["counters"]["snaps_deleted"], third["counters"]["snaps_deleted"])
+ dir_fourth = self.get_directory_perf_counters('/d0', peer_uuid)
+ self.assertGreater(dir_fourth['snaps_deleted'], dir_second['snaps_deleted'])
# rename a snapshot
self.mount_a.run_shell(["mv", "d0/.snap/snap1", "d0/.snap/snap2"])
res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
fifth = res[TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
self.assertGreater(fifth["counters"]["snaps_renamed"], fourth["counters"]["snaps_renamed"])
+ dir_fifth = self.get_directory_perf_counters('/d0', peer_uuid)
+ self.assertGreater(dir_fifth['snaps_renamed'], dir_fourth['snaps_renamed'])
self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
snap_name == dir_stat['last_synced_snap']['name'] and \
expected_snap_count == dir_stat['snaps_synced']):
self.assert_last_synced_snap_metrics(dir_stat['last_synced_snap'])
+ self.assertEqual(
+ self.get_directory_perf_counters(dir_name, peer_uuid)['dir_state'], 2)
break
# remove the directory in the remote fs and check status restores to 'idle'
self.mount_b.run_shell(['sudo', 'rmdir', remote_snap_path], omit_sudo=False)
peer_spec, f'/{dir_name}', snap1, 2)
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ def test_cephfs_mirror_directory_perf_counters_registration(self):
+ """counter dump exposes cephfs_mirror_directory per mirrored path."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+ self.mount_a.run_shell(['mkdir', 'd0', 'd1'])
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d1', check_perf_counter=False)
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}',
+ 'counter', 'dump')
+ self.assertIn(self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_DIRECTORY, res)
+ dirs = {e['labels']['directory']
+ for e in res[self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_DIRECTORY]
+ if e['labels'].get('peer_uuid') == peer_uuid}
+ self.assertEqual(dirs, {'/d0', '/d1'})
+ self.assert_directory_perf_labels(
+ self.directory_perf_entry(res, '/d0', peer_uuid)['labels'], '/d0', peer_uuid)
+
+ self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
+ with safe_while(sleep=2, tries=30,
+ action='wait for directory perf counter removal') as proceed:
+ while proceed():
+ res = self.mirror_daemon_command(
+ f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
+ if self.directory_perf_entry(res, '/d0', peer_uuid) is None:
+ break
+ res = self.mirror_daemon_command(f'counter dump for fs: {self.primary_fs_name}',
+ 'counter', 'dump')
+ self.assertIsNotNone(self.directory_perf_entry(res, '/d1', peer_uuid))
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_cephfs_mirror_directory_perf_counters_syncing(self):
+ """counter dump current_* gauges update while a snapshot is syncing."""
+ self.setup_mount_b(mds_perm='rw')
+ self.config_set('client.mirror', 'cephfs_mirror_tick_interval', 1)
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+
+ dir_name = 'd0'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 3000, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap0 = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap0}'])
+ self.check_directory_perf_syncing(
+ self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_spec,
+ sync_mode='full')
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap0, 1)
+
+ self.mount_a.write_n_mb(os.path.join(dir_name, 'file.0'), 1)
+ self.mount_a.create_n_files(f'{dir_name}/snapdiff_file', 3000, sync=True)
+ snap1 = 'snap1'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap1}'])
+ self.check_directory_perf_syncing(
+ self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_spec,
+ sync_mode='delta')
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap1, 2)
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ idle = self.get_directory_perf_counters(f'/{dir_name}', peer_uuid)
+ self.assertEqual(idle['dir_state'], 0)
+ self.assertEqual(idle['current_snap_id'], 0)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_cephfs_mirror_directory_perf_counters_last_sync_and_summary(self):
+ """counter dump last_* and snaps_* gauges reflect completed directory sync."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+
+ dir_name = 'metrics_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap_name = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap_name, 1)
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ self.assert_idle_directory_last_sync_perf(
+ self.get_directory_perf_counters(f'/{dir_name}', peer_uuid))
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
def test_cephfs_mirror_sync_already_existing_snapshots(self):
"""
That mirroring syncs the already existing snapshot correctly.