From: Karthik U S Date: Sat, 16 May 2026 03:15:50 +0000 (+0530) Subject: qa/test_mirroring: Add tests for mirroring checkpoints X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fe7fa7b0761720181cf8735ac7b5f17080c1d8a7;p=ceph.git qa/test_mirroring: Add tests for mirroring checkpoints Adding integration tests for validating the cephfs mirroring checkpoints feature Fixes: https://tracker.ceph.com/issues/73454 Signed-off-by: Karthik U S --- diff --git a/qa/tasks/cephfs/test_mirroring.py b/qa/tasks/cephfs/test_mirroring.py index 0dd1285b178..38522206b7c 100644 --- a/qa/tasks/cephfs/test_mirroring.py +++ b/qa/tasks/cephfs/test_mirroring.py @@ -370,6 +370,22 @@ class TestMirroring(CephFSTestCase): f'for {dir_path}') return entry['counters'] + def get_peer_perf_counters(self): + res = self.mirror_daemon_command( + f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') + self.assertIn(self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER, res) + self.assertGreater(len(res[self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER]), 0) + return res[self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]['counters'] + + @retry_assert(timeout=60, interval=1) + def wait_peer_add_directory_counter(self, expected): + counters = self.get_peer_perf_counters() + self.assertIn('add_directory', counters) + actual = counters['add_directory'] + if actual != expected: + raise AssertionError( + f'expected add_directory={expected}, got {actual}') + def assert_directory_perf_labels(self, labels, dir_path, peer_uuid): self.assertEqual(labels['source_fscid'], str(self.primary_fs_id)) self.assertEqual(labels['source_filesystem'], self.primary_fs_name) @@ -717,6 +733,27 @@ class TestMirroring(CephFSTestCase): e.res = res raise + @retry_assert(timeout=120, interval=1) + def stop_daemon_when_peer_snap_in_progress(self, fs_name, fs_id, + peer_spec, dir_name, snap_name): + peer_uuid = self.get_peer_uuid(peer_spec) + res = self.mirror_daemon_command(f'peer status for fs: {fs_name}', + 'fs', 'mirror', 'peer', 'status', + f'{fs_name}@{fs_id}', peer_uuid) + try: + dir_stat = self.peer_dir_status(res, dir_name, peer_uuid) + if dir_stat.get('last_synced_snap', {}).get('name') == snap_name: + raise RuntimeError( + f'snapshot {snap_name!r} synced before caps could be restricted') + self.assertTrue('syncing' == dir_stat['state']) + self.assertTrue(dir_stat['current_syncing_snap']['name'] == snap_name) + except RETRY_EXCEPTIONS as e: + e.res = res + raise + + log.debug('peer syncing %s, stopping mirror daemon', snap_name) + self.stop_mirror_daemon() + def verify_snapshot(self, dir_name, snap_name): snap_list = self.mount_b.ls(path=f'{dir_name}/.snap') self.assertTrue(snap_name in snap_list) @@ -802,6 +839,117 @@ class TestMirroring(CephFSTestCase): log.debug(f'status: {status}') return status + def checkpoint_list(self, fs_name, dir_path): + return json.loads(self.get_ceph_cmd_stdout( + "fs", "snapshot", "mirror", "checkpoint", "ls", + fs_name, dir_path)) + + def checkpoint_add(self, fs_name, dir_path, snap_name): + out = json.loads(self.get_ceph_cmd_stdout( + "fs", "snapshot", "mirror", "checkpoint", "add", + fs_name, dir_path, snap_name)) + self.assertEqual(out['status'], 'success') + return out + + def checkpoint_remove(self, fs_name, dir_path, snap_name): + out = json.loads(self.get_ceph_cmd_stdout( + "fs", "snapshot", "mirror", "checkpoint", "remove", + fs_name, dir_path, snap_name)) + self.assertEqual(out['status'], 'success') + return out + + def checkpoint_now(self, fs_name, dir_path): + out = json.loads(self.get_ceph_cmd_stdout( + "fs", "snapshot", "mirror", "checkpoint", "now", + fs_name, dir_path)) + self.assertEqual(out['status'], 'success') + return out + + @staticmethod + def find_checkpoint(checkpoints, snap_name): + for cp in checkpoints: + if cp['snap_name'] == snap_name: + return cp + return None + + def assert_checkpoint_listed(self, fs_name, dir_path, snap_name): + res = self.checkpoint_list(fs_name, dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap_name) + self.assertIsNotNone(cp, f'checkpoint {snap_name} should be listed') + + def assert_checkpoint_not_listed(self, fs_name, dir_path, snap_name): + res = self.checkpoint_list(fs_name, dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap_name) + self.assertIsNone(cp, f'checkpoint {snap_name} should not be listed') + + def start_mirror_daemon(self): + self.mount_a.run_shell_payload( + 'nohup cephfs-mirror --id mirror /dev/null 2>&1 &') + + @retry_assert(timeout=60, interval=2) + def wait_ready(): + res = self.mirror_daemon_command( + f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump') + self.assertIn(TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS, res) + + wait_ready() + + def stop_mirror_daemon(self, wait=5): + pid = self.get_mirror_daemon_pid() + log.debug(f'stopping cephfs-mirror (pid={pid})') + self.mount_a.run_shell_payload(f'kill -TERM {pid} || true') + time.sleep(wait) + + def restart_mirroring_module(self, wait=10): + log.debug('restarting mirroring mgr module') + self.run_ceph_cmd("mgr", "module", "disable", self.MODULE_NAME) + time.sleep(2) + self.run_ceph_cmd("mgr", "module", "enable", self.MODULE_NAME) + time.sleep(wait) + + def _setup_mirrored_directory(self, dir_path, peer_spec=None, mount_b=False): + """Enable mirroring and track a directory; optionally mount backup FS and add peer.""" + if mount_b: + self.setup_mount_b(mds_perm='rw') + self.mount_a.run_shell(["mkdir", "-p", dir_path.lstrip('/')]) + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path) + if peer_spec: + self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, + self.secondary_fs_name) + + def _add_checkpoint_snapshot(self, dir_path, snap_name, verify_listed=False): + """Create a snapshot and add a checkpoint on it.""" + self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"]) + self.checkpoint_add(self.primary_fs_name, dir_path, snap_name) + if verify_listed: + self.assert_checkpoint_listed(self.primary_fs_name, dir_path, snap_name) + + def _setup_checkpoint_dir(self, dir_path, snap_name, peer_spec=None, mount_b=True): + """Create a tracked directory with a checkpointed snapshot.""" + self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=mount_b) + self._add_checkpoint_snapshot(dir_path, snap_name, verify_listed=True) + + def _teardown_mirroring(self, dir_path, peer_spec=None): + """Untrack directory, remove peer, and disable mirroring on the filesystem.""" + self.remove_directory(self.primary_fs_name, self.primary_fs_id, dir_path) + if peer_spec: + self.peer_remove(self.primary_fs_name, self.primary_fs_id, peer_spec) + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + + @retry_assert(timeout=600, interval=10) + def check_checkpoint_status(self, fs_name, dir_path, snap_name, expected_status): + res = self.checkpoint_list(fs_name, dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap_name) + if cp is None or cp['status'] != expected_status: + raise AssertionError( + f'checkpoint {snap_name}: expected status {expected_status}, ' + f'got {None if cp is None else cp["status"]!r}') + + def check_checkpoint_statuses(self, fs_name, dir_path, snap_names, expected_status): + for snap_name in snap_names: + self.check_checkpoint_status(fs_name, dir_path, snap_name, expected_status) + def setup_mount_b(self, mds_perm): log.debug('reconfigure client auth caps') self.get_ceph_cmd_result( @@ -813,6 +961,29 @@ class TestMirroring(CephFSTestCase): log.debug(f'mounting filesystem {self.secondary_fs_name}') self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name) + def _restrict_mirror_remote_caps(self): + """Restrict mirror remote client OSD write to force snapshot sync failure. + + The remote mirrored directory must already exist (created while the + client still has OSD write). OSD read-only blocks file data transfer + during snapshot sync, where checkpoint_sync_failed() is recorded. + """ + self.get_ceph_cmd_result( + 'auth', 'caps', 'client.mirror_remote', + 'mds', 'allow rwps', + 'mon', 'allow r', + 'osd', "allow r tag cephfs *=*", + 'mgr', 'allow r') + + def _restore_mirror_remote_caps(self): + """Restore mirror remote client caps used by the mirror test suite.""" + self.get_ceph_cmd_result( + 'auth', 'caps', 'client.mirror_remote', + 'mds', 'allow rwps', + 'mon', 'allow r', + 'osd', "allow rw tag cephfs *=*", + 'mgr', 'allow r') + def test_basic_mirror_commands(self): self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) @@ -960,6 +1131,350 @@ class TestMirroring(CephFSTestCase): self.mount_a.run_shell(["rmdir", dir1]) self.mount_a.run_shell(["rmdir", dir2]) + def test_checkpoint_cli_add_list_remove_now(self): + """Test mgr checkpoint add/list/remove/now on snapshot metadata.""" + dir_path = '/cp_cli' + snap0 = 'snap0' + snap1 = 'snap1' + + self._setup_mirrored_directory(dir_path) + self._add_checkpoint_snapshot(dir_path, snap0) + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(res['dir_root'], dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap0) + self.assertIsNotNone(cp) + self.assertEqual(cp['status'], 'created') + self.assertTrue(cp['created_at']) + self.assertTrue(cp['updated_at']) + + self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap1}"]) + out = self.checkpoint_now(self.primary_fs_name, dir_path) + self.assertEqual(out['snap_name'], snap1) + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 2) + + self.checkpoint_remove(self.primary_fs_name, dir_path, snap0) + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 1) + self.assertEqual(res['checkpoints'][0]['snap_name'], snap1) + + self.checkpoint_remove(self.primary_fs_name, dir_path, snap1) + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(res['checkpoints'], []) + + self._teardown_mirroring(dir_path) + + def test_checkpoint_cli_errors(self): + """Test checkpoint CLI validation errors.""" + dir_path = '/cp_err' + tracked = '/cp_err/tracked' + snap = 'snap0' + + self._setup_mirrored_directory(tracked) + self.mount_a.run_shell(["mkdir", f"{tracked.lstrip('/')}/.snap/{snap}"]) + + try: + self.checkpoint_add(self.primary_fs_name, dir_path, snap) + except CommandFailedError as ce: + if ce.exitstatus != errno.ENOENT: + raise RuntimeError( + f'expected ENOENT for untracked directory, got {ce.exitstatus}') + else: + raise RuntimeError('expected checkpoint add to fail for untracked directory') + + try: + self.checkpoint_remove(self.primary_fs_name, tracked, snap) + except CommandFailedError as ce: + if ce.exitstatus != errno.ENOENT: + raise RuntimeError( + f'expected ENOENT when removing non-checkpoint snap, got {ce.exitstatus}') + else: + raise RuntimeError('expected checkpoint remove to fail') + + self.checkpoint_add(self.primary_fs_name, tracked, snap) + try: + self.checkpoint_add(self.primary_fs_name, tracked, snap) + except CommandFailedError as ce: + if ce.exitstatus != errno.EEXIST: + raise RuntimeError( + f'expected EEXIST when re-adding checkpoint, got {ce.exitstatus}') + else: + raise RuntimeError('expected checkpoint add to fail for duplicate') + self.checkpoint_remove(self.primary_fs_name, tracked, snap) + + self.remove_directory(self.primary_fs_name, self.primary_fs_id, tracked) + self.disable_mirroring(self.primary_fs_name, self.primary_fs_id) + try: + self.checkpoint_list(self.primary_fs_name, tracked) + except CommandFailedError as ce: + if ce.exitstatus != errno.EINVAL: + raise RuntimeError( + f'expected EINVAL with mirroring disabled, got {ce.exitstatus}') + else: + raise RuntimeError('expected checkpoint list to fail') + + def test_checkpoint_sync_status_reaches_complete(self): + """Test mirror daemon updates checkpoint status after snapshot sync.""" + dir_path = '/cp_sync' + snap_name = 'snap0' + peer_spec = "client.mirror_remote@ceph" + + self._setup_mirrored_directory(dir_path, mount_b=True) + self._add_checkpoint_snapshot(dir_path, snap_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created') + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap_name) + created_at = cp['created_at'] + updated_at = cp['updated_at'] + + self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + peer_spec, dir_path, snap_name, 1) + self.verify_snapshot(dir_path.lstrip('/'), snap_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete') + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap_name) + self.assertEqual(cp['created_at'], created_at) + self.assertTrue(cp['updated_at']) + self.assertNotEqual(cp['updated_at'], updated_at) + + self._teardown_mirroring(dir_path, peer_spec) + + def test_checkpoint_deleted_snapshot_not_listed(self): + """Deleted checkpointed snapshots must not appear in checkpoint ls.""" + dir_path = '/cp_del' + snap_name = 'snap0' + + self._setup_mirrored_directory(dir_path) + self._add_checkpoint_snapshot(dir_path, snap_name) + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 1) + + self.mount_a.run_shell(["rmdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"]) + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(res['checkpoints'], []) + + self._teardown_mirroring(dir_path) + + def test_checkpoint_renamed_snapshot_shows_new_name(self): + """Renamed checkpointed snapshots must appear under the new name in ls.""" + dir_path = '/cp_rename' + old_name = 'snap0' + new_name = 'snap1' + + self._setup_mirrored_directory(dir_path) + self._add_checkpoint_snapshot(dir_path, old_name) + + self.mount_a.run_shell([ + "mv", + f"{dir_path.lstrip('/')}/.snap/{old_name}", + f"{dir_path.lstrip('/')}/.snap/{new_name}", + ]) + + self.assert_checkpoint_not_listed(self.primary_fs_name, dir_path, old_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, new_name, 'created') + + self._teardown_mirroring(dir_path) + + def test_checkpoint_add_after_rename_allows_same_snap_name(self): + """A new snapshot can reuse a name after the checkpointed one is renamed.""" + dir_path = '/cp_reuse_name' + old_name = 'snap0' + new_name = 'snap1' + + self._setup_mirrored_directory(dir_path) + self._add_checkpoint_snapshot(dir_path, old_name) + + self.mount_a.run_shell([ + "mv", + f"{dir_path.lstrip('/')}/.snap/{old_name}", + f"{dir_path.lstrip('/')}/.snap/{new_name}", + ]) + self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{old_name}"]) + self.checkpoint_add(self.primary_fs_name, dir_path, old_name) + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 2) + self.check_checkpoint_status(self.primary_fs_name, dir_path, old_name, 'created') + self.check_checkpoint_status(self.primary_fs_name, dir_path, new_name, 'created') + + self._teardown_mirroring(dir_path) + + def test_checkpoint_persisted_across_mirror_daemon_restart(self): + """Checkpoints survive mirror daemon restart and reach complete after sync.""" + dir_path = '/cp_mirror_restart' + snap_name = 'snap0' + peer_spec = "client.mirror_remote@ceph" + + self._setup_checkpoint_dir(dir_path, snap_name) + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 1) + + self.stop_mirror_daemon() + self.start_mirror_daemon() + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 1) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created') + + self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + peer_spec, dir_path, snap_name, 1) + self.verify_snapshot(dir_path.lstrip('/'), snap_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete') + + self._teardown_mirroring(dir_path, peer_spec) + + def test_checkpoint_persisted_across_mgr_module_restart(self): + """Checkpoints survive mirroring mgr module restart and reach complete after sync.""" + dir_path = '/cp_mgr_restart' + snap_name = 'snap0' + peer_spec = "client.mirror_remote@ceph" + + self._setup_checkpoint_dir(dir_path, snap_name) + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 1) + + self.restart_mirroring_module() + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), 1) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created') + + self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name) + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + peer_spec, dir_path, snap_name, 1) + self.verify_snapshot(dir_path.lstrip('/'), snap_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete') + + self._teardown_mirroring(dir_path, peer_spec) + + def test_checkpoint_add_on_already_synced_snapshot_is_complete(self): + """Checkpoint added after sync should be marked complete by the daemon.""" + dir_path = '/cp_already_synced' + snap_name = 'snap0' + peer_spec = "client.mirror_remote@ceph" + + self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=True) + self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"]) + + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + peer_spec, dir_path, snap_name, 1) + self.verify_snapshot(dir_path.lstrip('/'), snap_name) + + self.checkpoint_add(self.primary_fs_name, dir_path, snap_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete') + + self._teardown_mirroring(dir_path, peer_spec) + + def test_checkpoint_on_synced_snaps_complete_after_daemon_restart(self): + """Checkpoints added on synced snaps while daemon is down reach complete after restart.""" + dir_path = '/cp_daemon_down' + snap_names = [f'snap{i}' for i in range(5)] + checkpoint_snaps = ['snap0', 'snap2', 'snap4'] + peer_spec = "client.mirror_remote@ceph" + + self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=True) + + for snap_name in snap_names: + self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"]) + + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + peer_spec, dir_path, snap_names[-1], len(snap_names)) + self.verify_snapshot(dir_path.lstrip('/'), snap_names[-1]) + + self.stop_mirror_daemon() + for snap_name in checkpoint_snaps: + self.checkpoint_add(self.primary_fs_name, dir_path, snap_name) + self.check_checkpoint_statuses( + self.primary_fs_name, dir_path, checkpoint_snaps, 'created') + + self.start_mirror_daemon() + self.check_checkpoint_statuses( + self.primary_fs_name, dir_path, checkpoint_snaps, 'complete') + + self._teardown_mirroring(dir_path, peer_spec) + + def test_checkpoint_add_directory_notify_perf_counter(self): + """Each checkpoint add must notify the mirror daemon via add_directory().""" + snap_count = 100 + dir_path = '/cp_add_dir_notify' + dir_name = dir_path.lstrip('/') + peer_spec = "client.mirror_remote@ceph" + + self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) + self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, + self.secondary_fs_name) + add_dir_baseline = self.get_peer_perf_counters()['add_directory'] + + self.mount_a.run_shell(['mkdir', '-p', dir_name]) + self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path, + check_perf_counter=False) + + for i in range(snap_count): + self.mount_a.run_shell(['touch', f'{dir_name}/file.{i}']) + self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/snap{i}']) + self.checkpoint_add(self.primary_fs_name, dir_path, f'snap{i}') + + # Expect baseline + 101 add_directory calls: one for the initial + # mirror add directory and one per checkpoint add (each sends an + # acquire notify). + self.wait_peer_add_directory_counter(add_dir_baseline + 1 + snap_count) + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + self.assertEqual(len(res['checkpoints']), snap_count) + + self._teardown_mirroring(dir_path, peer_spec) + + def test_checkpoint_failed_then_complete(self): + """Checkpoint is marked failed on sync error and complete after recovery.""" + dir_path = '/cp_failed' + snap_name = 'snap0' + peer_spec = "client.mirror_remote@ceph" + + self._setup_mirrored_directory(dir_path, mount_b=True) + dir_name = dir_path.lstrip('/') + self.mount_a.create_n_files(f'{dir_name}/file', 10000, sync=True) + for i in range(20): + self.mount_a.write_n_mb(os.path.join(dir_name, f'large_file.{i}'), 100) + self._add_checkpoint_snapshot(dir_path, snap_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created') + + # peer_add needs write access on the remote MDS (setxattr on ceph.mirror.info). + # Let the daemon create the remote dir with full caps, then stop it as soon as + # sync starts, restrict OSD write, and start again (checkpoint_sync_failed() path). + self.run_ceph_cmd("fs", "snapshot", "mirror", "peer_add", + self.primary_fs_name, peer_spec, self.secondary_fs_name) + self.stop_daemon_when_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id, + peer_spec, dir_path, snap_name) + self._restrict_mirror_remote_caps() + self.start_mirror_daemon() + + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'failed') + res = self.checkpoint_list(self.primary_fs_name, dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap_name) + self.assertIn('error_msg', cp) + self.assertTrue(cp['error_msg']) + + self._restore_mirror_remote_caps() + self.restart_mirror_daemon() + + self.check_peer_status(self.primary_fs_name, self.primary_fs_id, + peer_spec, dir_path, snap_name, 1) + self.verify_snapshot(dir_path.lstrip('/'), snap_name) + self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete') + + res = self.checkpoint_list(self.primary_fs_name, dir_path) + cp = self.find_checkpoint(res['checkpoints'], snap_name) + self.assertNotIn('error_msg', cp) + + self._teardown_mirroring(dir_path, peer_spec) + def test_add_relative_directory_path(self): self.enable_mirroring(self.primary_fs_name, self.primary_fs_id) try: @@ -2835,6 +3350,8 @@ class TestMirroring(CephFSTestCase): sync_dir = 'mgr_nc_sync' self.mount_a.run_shell(['mkdir', sync_dir]) self.mount_a.create_n_files(f'{sync_dir}/file', 10000, sync=True) + for i in range(20): + self.mount_a.write_n_mb(os.path.join(sync_dir, f'large_file.{i}'), 100) self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{sync_dir}') snap_name = 'snap0'