]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
qa/test_mirroring: Add tests for mirroring checkpoints 69081/head
authorKarthik U S <karthik.u.s1@ibm.com>
Sat, 16 May 2026 03:15:50 +0000 (08:45 +0530)
committerKarthik U S <karthik.u.s1@ibm.com>
Thu, 2 Jul 2026 09:19:04 +0000 (14:49 +0530)
Adding integration tests for validating the cephfs mirroring
checkpoints feature

Fixes: https://tracker.ceph.com/issues/73454
Signed-off-by: Karthik U S <karthik.u.s1@ibm.com>
qa/tasks/cephfs/test_mirroring.py

index 0dd1285b178d15e63ae4c123629c8251c8418573..38522206b7cf5811a250d145e864ddae4ab6312b 100644 (file)
@@ -370,6 +370,22 @@ class TestMirroring(CephFSTestCase):
                                  f'for {dir_path}')
         return entry['counters']
 
+    def get_peer_perf_counters(self):
+        res = self.mirror_daemon_command(
+            f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
+        self.assertIn(self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER, res)
+        self.assertGreater(len(res[self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER]), 0)
+        return res[self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]['counters']
+
+    @retry_assert(timeout=60, interval=1)
+    def wait_peer_add_directory_counter(self, expected):
+        counters = self.get_peer_perf_counters()
+        self.assertIn('add_directory', counters)
+        actual = counters['add_directory']
+        if actual != expected:
+            raise AssertionError(
+                f'expected add_directory={expected}, got {actual}')
+
     def assert_directory_perf_labels(self, labels, dir_path, peer_uuid):
         self.assertEqual(labels['source_fscid'], str(self.primary_fs_id))
         self.assertEqual(labels['source_filesystem'], self.primary_fs_name)
@@ -717,6 +733,27 @@ class TestMirroring(CephFSTestCase):
             e.res = res
             raise
 
+    @retry_assert(timeout=120, interval=1)
+    def stop_daemon_when_peer_snap_in_progress(self, fs_name, fs_id,
+                                               peer_spec, dir_name, snap_name):
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        res = self.mirror_daemon_command(f'peer status for fs: {fs_name}',
+                                         'fs', 'mirror', 'peer', 'status',
+                                         f'{fs_name}@{fs_id}', peer_uuid)
+        try:
+            dir_stat = self.peer_dir_status(res, dir_name, peer_uuid)
+            if dir_stat.get('last_synced_snap', {}).get('name') == snap_name:
+                raise RuntimeError(
+                    f'snapshot {snap_name!r} synced before caps could be restricted')
+            self.assertTrue('syncing' == dir_stat['state'])
+            self.assertTrue(dir_stat['current_syncing_snap']['name'] == snap_name)
+        except RETRY_EXCEPTIONS as e:
+            e.res = res
+            raise
+
+        log.debug('peer syncing %s, stopping mirror daemon', snap_name)
+        self.stop_mirror_daemon()
+
     def verify_snapshot(self, dir_name, snap_name):
         snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
         self.assertTrue(snap_name in snap_list)
@@ -802,6 +839,117 @@ class TestMirroring(CephFSTestCase):
         log.debug(f'status: {status}')
         return status
 
+    def checkpoint_list(self, fs_name, dir_path):
+        return json.loads(self.get_ceph_cmd_stdout(
+            "fs", "snapshot", "mirror", "checkpoint", "ls",
+            fs_name, dir_path))
+
+    def checkpoint_add(self, fs_name, dir_path, snap_name):
+        out = json.loads(self.get_ceph_cmd_stdout(
+            "fs", "snapshot", "mirror", "checkpoint", "add",
+            fs_name, dir_path, snap_name))
+        self.assertEqual(out['status'], 'success')
+        return out
+
+    def checkpoint_remove(self, fs_name, dir_path, snap_name):
+        out = json.loads(self.get_ceph_cmd_stdout(
+            "fs", "snapshot", "mirror", "checkpoint", "remove",
+            fs_name, dir_path, snap_name))
+        self.assertEqual(out['status'], 'success')
+        return out
+
+    def checkpoint_now(self, fs_name, dir_path):
+        out = json.loads(self.get_ceph_cmd_stdout(
+            "fs", "snapshot", "mirror", "checkpoint", "now",
+            fs_name, dir_path))
+        self.assertEqual(out['status'], 'success')
+        return out
+
+    @staticmethod
+    def find_checkpoint(checkpoints, snap_name):
+        for cp in checkpoints:
+            if cp['snap_name'] == snap_name:
+                return cp
+        return None
+
+    def assert_checkpoint_listed(self, fs_name, dir_path, snap_name):
+        res = self.checkpoint_list(fs_name, dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap_name)
+        self.assertIsNotNone(cp, f'checkpoint {snap_name} should be listed')
+
+    def assert_checkpoint_not_listed(self, fs_name, dir_path, snap_name):
+        res = self.checkpoint_list(fs_name, dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap_name)
+        self.assertIsNone(cp, f'checkpoint {snap_name} should not be listed')
+
+    def start_mirror_daemon(self):
+        self.mount_a.run_shell_payload(
+            'nohup cephfs-mirror --id mirror </dev/null >/dev/null 2>&1 &')
+
+        @retry_assert(timeout=60, interval=2)
+        def wait_ready():
+            res = self.mirror_daemon_command(
+                f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
+            self.assertIn(TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS, res)
+
+        wait_ready()
+
+    def stop_mirror_daemon(self, wait=5):
+        pid = self.get_mirror_daemon_pid()
+        log.debug(f'stopping cephfs-mirror (pid={pid})')
+        self.mount_a.run_shell_payload(f'kill -TERM {pid} || true')
+        time.sleep(wait)
+
+    def restart_mirroring_module(self, wait=10):
+        log.debug('restarting mirroring mgr module')
+        self.run_ceph_cmd("mgr", "module", "disable", self.MODULE_NAME)
+        time.sleep(2)
+        self.run_ceph_cmd("mgr", "module", "enable", self.MODULE_NAME)
+        time.sleep(wait)
+
+    def _setup_mirrored_directory(self, dir_path, peer_spec=None, mount_b=False):
+        """Enable mirroring and track a directory; optionally mount backup FS and add peer."""
+        if mount_b:
+            self.setup_mount_b(mds_perm='rw')
+        self.mount_a.run_shell(["mkdir", "-p", dir_path.lstrip('/')])
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
+        if peer_spec:
+            self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                          self.secondary_fs_name)
+
+    def _add_checkpoint_snapshot(self, dir_path, snap_name, verify_listed=False):
+        """Create a snapshot and add a checkpoint on it."""
+        self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"])
+        self.checkpoint_add(self.primary_fs_name, dir_path, snap_name)
+        if verify_listed:
+            self.assert_checkpoint_listed(self.primary_fs_name, dir_path, snap_name)
+
+    def _setup_checkpoint_dir(self, dir_path, snap_name, peer_spec=None, mount_b=True):
+        """Create a tracked directory with a checkpointed snapshot."""
+        self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=mount_b)
+        self._add_checkpoint_snapshot(dir_path, snap_name, verify_listed=True)
+
+    def _teardown_mirroring(self, dir_path, peer_spec=None):
+        """Untrack directory, remove peer, and disable mirroring on the filesystem."""
+        self.remove_directory(self.primary_fs_name, self.primary_fs_id, dir_path)
+        if peer_spec:
+            self.peer_remove(self.primary_fs_name, self.primary_fs_id, peer_spec)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    @retry_assert(timeout=600, interval=10)
+    def check_checkpoint_status(self, fs_name, dir_path, snap_name, expected_status):
+        res = self.checkpoint_list(fs_name, dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap_name)
+        if cp is None or cp['status'] != expected_status:
+            raise AssertionError(
+                f'checkpoint {snap_name}: expected status {expected_status}, '
+                f'got {None if cp is None else cp["status"]!r}')
+
+    def check_checkpoint_statuses(self, fs_name, dir_path, snap_names, expected_status):
+        for snap_name in snap_names:
+            self.check_checkpoint_status(fs_name, dir_path, snap_name, expected_status)
+
     def setup_mount_b(self, mds_perm):
         log.debug('reconfigure client auth caps')
         self.get_ceph_cmd_result(
@@ -813,6 +961,29 @@ class TestMirroring(CephFSTestCase):
         log.debug(f'mounting filesystem {self.secondary_fs_name}')
         self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
 
+    def _restrict_mirror_remote_caps(self):
+        """Restrict mirror remote client OSD write to force snapshot sync failure.
+
+        The remote mirrored directory must already exist (created while the
+        client still has OSD write).  OSD read-only blocks file data transfer
+        during snapshot sync, where checkpoint_sync_failed() is recorded.
+        """
+        self.get_ceph_cmd_result(
+            'auth', 'caps', 'client.mirror_remote',
+            'mds', 'allow rwps',
+            'mon', 'allow r',
+            'osd', "allow r tag cephfs *=*",
+            'mgr', 'allow r')
+
+    def _restore_mirror_remote_caps(self):
+        """Restore mirror remote client caps used by the mirror test suite."""
+        self.get_ceph_cmd_result(
+            'auth', 'caps', 'client.mirror_remote',
+            'mds', 'allow rwps',
+            'mon', 'allow r',
+            'osd', "allow rw tag cephfs *=*",
+            'mgr', 'allow r')
+
     def test_basic_mirror_commands(self):
         self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
         self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
@@ -960,6 +1131,350 @@ class TestMirroring(CephFSTestCase):
         self.mount_a.run_shell(["rmdir", dir1])
         self.mount_a.run_shell(["rmdir",  dir2])
 
+    def test_checkpoint_cli_add_list_remove_now(self):
+        """Test mgr checkpoint add/list/remove/now on snapshot metadata."""
+        dir_path = '/cp_cli'
+        snap0 = 'snap0'
+        snap1 = 'snap1'
+
+        self._setup_mirrored_directory(dir_path)
+        self._add_checkpoint_snapshot(dir_path, snap0)
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(res['dir_root'], dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap0)
+        self.assertIsNotNone(cp)
+        self.assertEqual(cp['status'], 'created')
+        self.assertTrue(cp['created_at'])
+        self.assertTrue(cp['updated_at'])
+
+        self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap1}"])
+        out = self.checkpoint_now(self.primary_fs_name, dir_path)
+        self.assertEqual(out['snap_name'], snap1)
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 2)
+
+        self.checkpoint_remove(self.primary_fs_name, dir_path, snap0)
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 1)
+        self.assertEqual(res['checkpoints'][0]['snap_name'], snap1)
+
+        self.checkpoint_remove(self.primary_fs_name, dir_path, snap1)
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(res['checkpoints'], [])
+
+        self._teardown_mirroring(dir_path)
+
+    def test_checkpoint_cli_errors(self):
+        """Test checkpoint CLI validation errors."""
+        dir_path = '/cp_err'
+        tracked = '/cp_err/tracked'
+        snap = 'snap0'
+
+        self._setup_mirrored_directory(tracked)
+        self.mount_a.run_shell(["mkdir", f"{tracked.lstrip('/')}/.snap/{snap}"])
+
+        try:
+            self.checkpoint_add(self.primary_fs_name, dir_path, snap)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOENT:
+                raise RuntimeError(
+                    f'expected ENOENT for untracked directory, got {ce.exitstatus}')
+        else:
+            raise RuntimeError('expected checkpoint add to fail for untracked directory')
+
+        try:
+            self.checkpoint_remove(self.primary_fs_name, tracked, snap)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOENT:
+                raise RuntimeError(
+                    f'expected ENOENT when removing non-checkpoint snap, got {ce.exitstatus}')
+        else:
+            raise RuntimeError('expected checkpoint remove to fail')
+
+        self.checkpoint_add(self.primary_fs_name, tracked, snap)
+        try:
+            self.checkpoint_add(self.primary_fs_name, tracked, snap)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EEXIST:
+                raise RuntimeError(
+                    f'expected EEXIST when re-adding checkpoint, got {ce.exitstatus}')
+        else:
+            raise RuntimeError('expected checkpoint add to fail for duplicate')
+        self.checkpoint_remove(self.primary_fs_name, tracked, snap)
+
+        self.remove_directory(self.primary_fs_name, self.primary_fs_id, tracked)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        try:
+            self.checkpoint_list(self.primary_fs_name, tracked)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EINVAL:
+                raise RuntimeError(
+                    f'expected EINVAL with mirroring disabled, got {ce.exitstatus}')
+        else:
+            raise RuntimeError('expected checkpoint list to fail')
+
+    def test_checkpoint_sync_status_reaches_complete(self):
+        """Test mirror daemon updates checkpoint status after snapshot sync."""
+        dir_path = '/cp_sync'
+        snap_name = 'snap0'
+        peer_spec = "client.mirror_remote@ceph"
+
+        self._setup_mirrored_directory(dir_path, mount_b=True)
+        self._add_checkpoint_snapshot(dir_path, snap_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created')
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap_name)
+        created_at = cp['created_at']
+        updated_at = cp['updated_at']
+
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, dir_path, snap_name, 1)
+        self.verify_snapshot(dir_path.lstrip('/'), snap_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete')
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap_name)
+        self.assertEqual(cp['created_at'], created_at)
+        self.assertTrue(cp['updated_at'])
+        self.assertNotEqual(cp['updated_at'], updated_at)
+
+        self._teardown_mirroring(dir_path, peer_spec)
+
+    def test_checkpoint_deleted_snapshot_not_listed(self):
+        """Deleted checkpointed snapshots must not appear in checkpoint ls."""
+        dir_path = '/cp_del'
+        snap_name = 'snap0'
+
+        self._setup_mirrored_directory(dir_path)
+        self._add_checkpoint_snapshot(dir_path, snap_name)
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 1)
+
+        self.mount_a.run_shell(["rmdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"])
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(res['checkpoints'], [])
+
+        self._teardown_mirroring(dir_path)
+
+    def test_checkpoint_renamed_snapshot_shows_new_name(self):
+        """Renamed checkpointed snapshots must appear under the new name in ls."""
+        dir_path = '/cp_rename'
+        old_name = 'snap0'
+        new_name = 'snap1'
+
+        self._setup_mirrored_directory(dir_path)
+        self._add_checkpoint_snapshot(dir_path, old_name)
+
+        self.mount_a.run_shell([
+            "mv",
+            f"{dir_path.lstrip('/')}/.snap/{old_name}",
+            f"{dir_path.lstrip('/')}/.snap/{new_name}",
+        ])
+
+        self.assert_checkpoint_not_listed(self.primary_fs_name, dir_path, old_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, new_name, 'created')
+
+        self._teardown_mirroring(dir_path)
+
+    def test_checkpoint_add_after_rename_allows_same_snap_name(self):
+        """A new snapshot can reuse a name after the checkpointed one is renamed."""
+        dir_path = '/cp_reuse_name'
+        old_name = 'snap0'
+        new_name = 'snap1'
+
+        self._setup_mirrored_directory(dir_path)
+        self._add_checkpoint_snapshot(dir_path, old_name)
+
+        self.mount_a.run_shell([
+            "mv",
+            f"{dir_path.lstrip('/')}/.snap/{old_name}",
+            f"{dir_path.lstrip('/')}/.snap/{new_name}",
+        ])
+        self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{old_name}"])
+        self.checkpoint_add(self.primary_fs_name, dir_path, old_name)
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 2)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, old_name, 'created')
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, new_name, 'created')
+
+        self._teardown_mirroring(dir_path)
+
+    def test_checkpoint_persisted_across_mirror_daemon_restart(self):
+        """Checkpoints survive mirror daemon restart and reach complete after sync."""
+        dir_path = '/cp_mirror_restart'
+        snap_name = 'snap0'
+        peer_spec = "client.mirror_remote@ceph"
+
+        self._setup_checkpoint_dir(dir_path, snap_name)
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 1)
+
+        self.stop_mirror_daemon()
+        self.start_mirror_daemon()
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 1)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created')
+
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, dir_path, snap_name, 1)
+        self.verify_snapshot(dir_path.lstrip('/'), snap_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete')
+
+        self._teardown_mirroring(dir_path, peer_spec)
+
+    def test_checkpoint_persisted_across_mgr_module_restart(self):
+        """Checkpoints survive mirroring mgr module restart and reach complete after sync."""
+        dir_path = '/cp_mgr_restart'
+        snap_name = 'snap0'
+        peer_spec = "client.mirror_remote@ceph"
+
+        self._setup_checkpoint_dir(dir_path, snap_name)
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 1)
+
+        self.restart_mirroring_module()
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), 1)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created')
+
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec, self.secondary_fs_name)
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, dir_path, snap_name, 1)
+        self.verify_snapshot(dir_path.lstrip('/'), snap_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete')
+
+        self._teardown_mirroring(dir_path, peer_spec)
+
+    def test_checkpoint_add_on_already_synced_snapshot_is_complete(self):
+        """Checkpoint added after sync should be marked complete by the daemon."""
+        dir_path = '/cp_already_synced'
+        snap_name = 'snap0'
+        peer_spec = "client.mirror_remote@ceph"
+
+        self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=True)
+        self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"])
+
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, dir_path, snap_name, 1)
+        self.verify_snapshot(dir_path.lstrip('/'), snap_name)
+
+        self.checkpoint_add(self.primary_fs_name, dir_path, snap_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete')
+
+        self._teardown_mirroring(dir_path, peer_spec)
+
+    def test_checkpoint_on_synced_snaps_complete_after_daemon_restart(self):
+        """Checkpoints added on synced snaps while daemon is down reach complete after restart."""
+        dir_path = '/cp_daemon_down'
+        snap_names = [f'snap{i}' for i in range(5)]
+        checkpoint_snaps = ['snap0', 'snap2', 'snap4']
+        peer_spec = "client.mirror_remote@ceph"
+
+        self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=True)
+
+        for snap_name in snap_names:
+            self.mount_a.run_shell(["mkdir", f"{dir_path.lstrip('/')}/.snap/{snap_name}"])
+
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, dir_path, snap_names[-1], len(snap_names))
+        self.verify_snapshot(dir_path.lstrip('/'), snap_names[-1])
+
+        self.stop_mirror_daemon()
+        for snap_name in checkpoint_snaps:
+            self.checkpoint_add(self.primary_fs_name, dir_path, snap_name)
+        self.check_checkpoint_statuses(
+            self.primary_fs_name, dir_path, checkpoint_snaps, 'created')
+
+        self.start_mirror_daemon()
+        self.check_checkpoint_statuses(
+            self.primary_fs_name, dir_path, checkpoint_snaps, 'complete')
+
+        self._teardown_mirroring(dir_path, peer_spec)
+
+    def test_checkpoint_add_directory_notify_perf_counter(self):
+        """Each checkpoint add must notify the mirror daemon via add_directory()."""
+        snap_count = 100
+        dir_path = '/cp_add_dir_notify'
+        dir_name = dir_path.lstrip('/')
+        peer_spec = "client.mirror_remote@ceph"
+
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+        add_dir_baseline = self.get_peer_perf_counters()['add_directory']
+
+        self.mount_a.run_shell(['mkdir', '-p', dir_name])
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path,
+                           check_perf_counter=False)
+
+        for i in range(snap_count):
+            self.mount_a.run_shell(['touch', f'{dir_name}/file.{i}'])
+            self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/snap{i}'])
+            self.checkpoint_add(self.primary_fs_name, dir_path, f'snap{i}')
+
+        # Expect baseline + 101 add_directory calls: one for the initial
+        # mirror add directory and one per checkpoint add (each sends an
+        # acquire notify).
+        self.wait_peer_add_directory_counter(add_dir_baseline + 1 + snap_count)
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        self.assertEqual(len(res['checkpoints']), snap_count)
+
+        self._teardown_mirroring(dir_path, peer_spec)
+
+    def test_checkpoint_failed_then_complete(self):
+        """Checkpoint is marked failed on sync error and complete after recovery."""
+        dir_path = '/cp_failed'
+        snap_name = 'snap0'
+        peer_spec = "client.mirror_remote@ceph"
+
+        self._setup_mirrored_directory(dir_path, mount_b=True)
+        dir_name = dir_path.lstrip('/')
+        self.mount_a.create_n_files(f'{dir_name}/file', 10000, sync=True)
+        for i in range(20):
+            self.mount_a.write_n_mb(os.path.join(dir_name, f'large_file.{i}'), 100)
+        self._add_checkpoint_snapshot(dir_path, snap_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created')
+
+        # peer_add needs write access on the remote MDS (setxattr on ceph.mirror.info).
+        # Let the daemon create the remote dir with full caps, then stop it as soon as
+        # sync starts, restrict OSD write, and start again (checkpoint_sync_failed() path).
+        self.run_ceph_cmd("fs", "snapshot", "mirror", "peer_add",
+                          self.primary_fs_name, peer_spec, self.secondary_fs_name)
+        self.stop_daemon_when_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
+                                                    peer_spec, dir_path, snap_name)
+        self._restrict_mirror_remote_caps()
+        self.start_mirror_daemon()
+
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'failed')
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap_name)
+        self.assertIn('error_msg', cp)
+        self.assertTrue(cp['error_msg'])
+
+        self._restore_mirror_remote_caps()
+        self.restart_mirror_daemon()
+
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, dir_path, snap_name, 1)
+        self.verify_snapshot(dir_path.lstrip('/'), snap_name)
+        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete')
+
+        res = self.checkpoint_list(self.primary_fs_name, dir_path)
+        cp = self.find_checkpoint(res['checkpoints'], snap_name)
+        self.assertNotIn('error_msg', cp)
+
+        self._teardown_mirroring(dir_path, peer_spec)
+
     def test_add_relative_directory_path(self):
         self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
         try:
@@ -2835,6 +3350,8 @@ class TestMirroring(CephFSTestCase):
             sync_dir = 'mgr_nc_sync'
             self.mount_a.run_shell(['mkdir', sync_dir])
             self.mount_a.create_n_files(f'{sync_dir}/file', 10000, sync=True)
+            for i in range(20):
+                self.mount_a.write_n_mb(os.path.join(sync_dir, f'large_file.{i}'), 100)
             self.add_directory(self.primary_fs_name, self.primary_fs_id,
                                f'/{sync_dir}')
             snap_name = 'snap0'