]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
qa: Add mgr snapshot mirror status tests 68827/head
authorKotresh HR <khiremat@redhat.com>
Tue, 23 Jun 2026 15:13:18 +0000 (20:43 +0530)
committerKotresh HR <khiremat@redhat.com>
Tue, 23 Jun 2026 12:42:26 +0000 (18:12 +0530)
Add teuthology coverage for `ceph fs snapshot mirror status`:
parity with asok peer_status, default idle metrics, stale omap
handling, error paths, filter scopes, daemon restart, and cache TTL.

Reuse existing peer_dir_status and metrics assertion helpers.
Adjust stale test wait for InstanceWatcher timeout and set a
short cache TTL in the cache test. Pass peer_uuid via --peer_uuid=
in the test helper for peer-only scope queries.

Fixes: https://tracker.ceph.com/issues/76686
Signed-off-by: Kotresh HR <khiremat@redhat.com>
qa/tasks/cephfs/test_mirroring.py

index ad4642c4e7dd9ff0afb58825b21997fce593682c..0dd1285b178d15e63ae4c123629c8251c8418573 100644 (file)
@@ -5,6 +5,7 @@ import base64
 import errno
 import logging
 import random
+import signal
 import time
 import functools
 
@@ -14,6 +15,7 @@ from collections import deque
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
 from teuthology.exceptions import CommandFailedError
 from teuthology.contextutil import safe_while
+from teuthology.orchestra import run
 
 log = logging.getLogger(__name__)
 
@@ -70,6 +72,8 @@ class TestMirroring(CephFSTestCase):
     PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS = "cephfs_mirror_mirrored_filesystems"
     PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER = "cephfs_mirror_peers"
     PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_DIRECTORY = "cephfs_mirror_directory"
+    MGR_METRICS_CACHE_TTL = 3
+    MIRROR_TICK_INTERVAL = 1
 
     def setUp(self):
         super(TestMirroring, self).setUp()
@@ -79,6 +83,60 @@ class TestMirroring(CephFSTestCase):
         self.secondary_fs_id = self.backup_fs.id
         self.enable_mirroring_module()
         self.config_set('client.mirror', 'cephfs_mirror_directory_scan_interval', 1)
+        self.config_set('client.mirror', 'cephfs_mirror_tick_interval',
+                          self.MIRROR_TICK_INTERVAL)
+        self.config_set('mgr', 'mgr/mirroring/snapshot_mirror_metrics_cache_ttl',
+                          self.MGR_METRICS_CACHE_TTL)
+        self.enable_mgr_metrics_cache()
+
+    def disable_mgr_metrics_cache(self):
+        self.config_set('mgr', 'mgr/mirroring/snapshot_mirror_metrics_cache_enabled',
+                        False)
+
+    def enable_mgr_metrics_cache(self):
+        self.config_set('mgr', 'mgr/mirroring/snapshot_mirror_metrics_cache_enabled',
+                        True)
+
+    def assert_mgr_mirror_status_scopes(self, fs_name, dir_path, peer_uuid,
+                                        expected_dirs, asok_res):
+        full_res = self.mgr_mirror_status(fs_name)
+        self.assertEqual(set(full_res['metrics'].keys()), set(expected_dirs))
+
+        peer_res = self.mgr_mirror_status(fs_name, peer_uuid=peer_uuid)
+        self.assertEqual(set(peer_res['metrics'].keys()), set(expected_dirs))
+        for path in expected_dirs:
+            self.assertEqual(
+                set(peer_res['metrics'][path]['peer'].keys()), {peer_uuid})
+
+        dir_res = self.mgr_mirror_status(fs_name, dir_path)
+        self.assertEqual(set(dir_res['metrics'].keys()), {dir_path})
+
+        dir_peer_res = self.mgr_mirror_status(fs_name, dir_path, peer_uuid)
+        self.assertEqual(set(dir_peer_res['metrics'].keys()), {dir_path})
+        self.assertEqual(
+            set(dir_peer_res['metrics'][dir_path]['peer'].keys()), {peer_uuid})
+
+        for path in expected_dirs:
+            for res in (full_res, peer_res):
+                mgr_stat = self.peer_dir_status(res, path, peer_uuid)
+                asok_stat = self.peer_dir_status(asok_res, path, peer_uuid)
+                self.assert_mgr_dir_stat_matches_asok(mgr_stat, asok_stat)
+
+        for res in (dir_res, dir_peer_res):
+            mgr_stat = self.peer_dir_status(res, dir_path, peer_uuid)
+            asok_stat = self.peer_dir_status(asok_res, dir_path, peer_uuid)
+            self.assert_mgr_dir_stat_matches_asok(mgr_stat, asok_stat)
+
+    @retry_assert(timeout=120, interval=1)
+    def check_mgr_snapshot_mirror_status_scopes_match_asok(
+            self, fs_name, fs_id, dir_path, peer_uuid, expected_dirs,
+            expected_state=None):
+        asok_res = self.peer_status(fs_name, fs_id, peer_uuid)
+        asok_stat = self.peer_dir_status(asok_res, dir_path, peer_uuid)
+        if expected_state is not None:
+            self.assertEqual(asok_stat['state'], expected_state)
+        self.assert_mgr_mirror_status_scopes(
+            fs_name, dir_path, peer_uuid, expected_dirs, asok_res)
 
     def tearDown(self):
         self.disable_mirroring_module()
@@ -362,6 +420,172 @@ class TestMirroring(CephFSTestCase):
         self.assertTrue(
             snap['eta'] == 'calculating...' or bool(re.search(r'\d', snap['eta'])))
 
+    def mgr_mirror_status(self, fs_name, mirrored_dir_path=None, peer_uuid=None):
+        args = ["fs", "snapshot", "mirror", "status", fs_name]
+        if mirrored_dir_path is not None:
+            args.append(mirrored_dir_path)
+        if peer_uuid is not None:
+            args.append(f'--peer_uuid={peer_uuid}')
+        return json.loads(self.get_ceph_cmd_stdout(*args))
+
+    def peer_status(self, fs_name, fs_id, peer_uuid):
+        return self.mirror_daemon_command(
+            f'peer status for fs: {fs_name}',
+            'fs', 'mirror', 'peer', 'status',
+            f'{fs_name}@{fs_id}', peer_uuid)
+
+    def dir_status_from_mgr(self, fs_name, dir_name, peer_uuid,
+                            mirrored_dir_path=None):
+        res = self.mgr_mirror_status(
+            fs_name, mirrored_dir_path or dir_name, peer_uuid)
+        return self.peer_dir_status(res, dir_name, peer_uuid)
+
+    def dir_status_from_asok(self, fs_name, fs_id, dir_name, peer_uuid):
+        res = self.peer_status(fs_name, fs_id, peer_uuid)
+        return self.peer_dir_status(res, dir_name, peer_uuid)
+
+    def assert_default_idle_dir_stat(self, dir_stat):
+        self.assertEqual(dir_stat['state'], 'idle')
+        self.assertEqual(dir_stat['snaps_synced'], 0)
+        self.assertEqual(dir_stat['snaps_deleted'], 0)
+        self.assertEqual(dir_stat['snaps_renamed'], 0)
+        self.assertNotIn('last_synced_snap', dir_stat)
+
+    def assert_mgr_dir_stat_matches_asok(self, mgr_stat, asok_stat):
+        self.assertEqual(mgr_stat['state'], asok_stat['state'])
+        for key in ('snaps_synced', 'snaps_deleted', 'snaps_renamed'):
+            self.assertEqual(mgr_stat.get(key), asok_stat.get(key))
+        if 'failure_reason' in asok_stat:
+            self.assertEqual(mgr_stat.get('failure_reason'), asok_stat['failure_reason'])
+        if 'last_synced_snap' in asok_stat:
+            self.assertIn('last_synced_snap', mgr_stat)
+            self.assertEqual(mgr_stat['last_synced_snap']['name'],
+                             asok_stat['last_synced_snap']['name'])
+            self.assert_last_synced_snap_metrics(mgr_stat['last_synced_snap'])
+        if 'current_syncing_snap' in asok_stat:
+            self.assertIn('current_syncing_snap', mgr_stat)
+            mgr_snap = mgr_stat['current_syncing_snap']
+            asok_snap = asok_stat['current_syncing_snap']
+            self.assertEqual(mgr_snap['name'], asok_snap['name'])
+            self.assert_syncing_snap_metrics(
+                mgr_snap, sync_mode=asok_snap.get('sync-mode'))
+
+    @retry_assert(timeout=120, interval=1)
+    def check_mgr_dir_stat_matches_asok(self, fs_name, fs_id, dir_name, peer_uuid,
+                                      mirrored_dir_path=None):
+        mgr_stat = self.dir_status_from_mgr(
+            fs_name, dir_name, peer_uuid, mirrored_dir_path)
+        asok_stat = self.dir_status_from_asok(fs_name, fs_id, dir_name, peer_uuid)
+        try:
+            self.assert_mgr_dir_stat_matches_asok(mgr_stat, asok_stat)
+        except RETRY_EXCEPTIONS as e:
+            e.mgr_stat = mgr_stat
+            e.asok_stat = asok_stat
+            raise
+
+    def wait_for_mirror_daemon_stop(self, pid):
+        with safe_while(sleep=1, tries=60,
+                        action='wait for mirror daemon stop') as proceed:
+            while proceed():
+                try:
+                    cur_pid = self.get_mirror_daemon_pid()
+                except CommandFailedError:
+                    return
+                if cur_pid != pid:
+                    return
+                p = self.mount_a.run_shell(['kill', '-0', cur_pid],
+                                           check_status=False)
+                if p.returncode != 0:
+                    return
+
+    def restart_mirror_daemon(self):
+        # daemon.start() always calls restart(), which skips stop() once proc
+        # is cleared.  Stop the real cephfs-mirror via its pid file first so
+        # the new instance can take the pidfile lock.
+        daemons = list(self.ctx.daemons.iter_daemons_of_role('cephfs-mirror'))
+        self.assertEqual(len(daemons), 1,
+                         'expected a single cephfs-mirror daemon')
+        daemon = daemons[0]
+        rados_inst_before = self.get_mirror_rados_addr(
+            self.primary_fs_name, self.primary_fs_id)
+        pid = self.get_mirror_daemon_pid()
+
+        log.debug(f'SIGTERM to cephfs-mirror pid {pid}')
+        if daemon.running():
+            try:
+                daemon.signal(signal.SIGTERM, silent=True)
+            except Exception as e:
+                log.debug(f'failed to signal cephfs-mirror via teuthology: {e}')
+        self.mount_a.run_shell(['kill', '-TERM', pid])
+        self.wait_for_mirror_daemon_stop(pid)
+
+        if daemon.running():
+            try:
+                run.wait([daemon.proc], timeout=10)
+            except CommandFailedError:
+                pass
+        daemon.reset()
+
+        log.debug('starting cephfs-mirror')
+        daemon.start()
+
+        time.sleep(60)
+        with safe_while(sleep=2, tries=30,
+                        action='wait for mirror daemon restart') as proceed:
+            while proceed():
+                try:
+                    rados_inst = self.get_mirror_rados_addr(
+                        self.primary_fs_name, self.primary_fs_id)
+                    if rados_inst and rados_inst != rados_inst_before:
+                        break
+                except CommandFailedError:
+                    pass
+
+    def wait_for_mirror_daemon_recovery(self, fs_name, fs_id, dir_name, peer_uuid):
+        # A new rados_inst alone does not mean mirroring is ready: wait until the
+        # peer is configured, the snap dir is registered, and asok reports it.
+        with safe_while(sleep=2, tries=60,
+                        action='wait for mirror daemon recovery') as proceed:
+            while proceed():
+                if not self.get_mirror_rados_addr(fs_name, fs_id):
+                    continue
+                try:
+                    mirror_res = self.mirror_daemon_command(
+                        f'mirror status for fs: {fs_name}',
+                        'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
+                except CommandFailedError:
+                    continue
+                if peer_uuid not in mirror_res.get('peers', {}):
+                    continue
+                if mirror_res.get('snap_dirs', {}).get('dir_count', 0) < 1:
+                    continue
+                try:
+                    peer_res = self.peer_status(fs_name, fs_id, peer_uuid)
+                    self.peer_dir_status(peer_res, dir_name, peer_uuid)
+                except RETRY_EXCEPTIONS:
+                    continue
+                return
+
+    @retry_assert(timeout=120, interval=1)
+    def check_mgr_and_asok_session_counters_zero(self, fs_name, fs_id, dir_name,
+                                                 peer_uuid):
+        mgr_stat = self.dir_status_from_mgr(fs_name, dir_name, peer_uuid)
+        asok_stat = self.dir_status_from_asok(fs_name, fs_id, dir_name, peer_uuid)
+        self.assertEqual(mgr_stat['state'], 'idle')
+        self.assertEqual(asok_stat['state'], 'idle')
+        for key in ('snaps_synced', 'snaps_deleted', 'snaps_renamed'):
+            self.assertEqual(mgr_stat.get(key), 0, msg=f'mgr {key}')
+            self.assertEqual(asok_stat.get(key), 0, msg=f'asok {key}')
+        self.assertEqual(mgr_stat['last_synced_snap']['name'],
+                         asok_stat['last_synced_snap']['name'])
+
+    @retry_assert(timeout=90, interval=5)
+    def check_mgr_dir_stat_stale(self, fs_name, dir_name, peer_uuid):
+        mgr_stat = self.dir_status_from_mgr(fs_name, dir_name, peer_uuid)
+        self.assertEqual(mgr_stat['state'], 'stale',
+                         msg=f'unexpected mgr stat: {mgr_stat}')
+        self.assertNotIn('current_syncing_snap', mgr_stat)
+
     @retry_assert(timeout=120, interval=1)
     def check_peer_syncing_progress_metrics(self, fs_name, fs_id, peer_spec, dir_name,
                                             snap_name, sync_mode=None):
@@ -2175,6 +2399,459 @@ class TestMirroring(CephFSTestCase):
         self.config_set('client.mirror', 'cephfs_mirror_distribute_datasync_threads', 'true')
         self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
 
+    def test_mgr_snapshot_mirror_status_matches_asok_idle(self):
+        """Mgr snapshot mirror status matches asok peer status after sync."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'mgr_status_dir'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        snap_name = 'snap0'
+        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+        self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                    peer_spec, f'/{dir_name}', snap_name, 1)
+
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        self.check_mgr_dir_stat_matches_asok(
+            self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_matches_asok_syncing(self):
+        """Mgr snapshot mirror status matches asok peer status during sync."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'mgr_sync_dir'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.mount_a.create_n_files(f'{dir_name}/file', 8000, sync=True)
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        snap_name = 'snap0'
+        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+        self.check_peer_syncing_progress_metrics(
+            self.primary_fs_name, self.primary_fs_id, peer_spec, f'/{dir_name}',
+            snap_name, sync_mode='full')
+
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        self.check_mgr_dir_stat_matches_asok(
+            self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_default_idle_new_dir(self):
+        """Mgr status reports default idle metrics for a newly added directory."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'mgr_default_dir'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        mgr_stat = self.dir_status_from_mgr(
+            self.primary_fs_name, f'/{dir_name}', peer_uuid)
+        self.assert_default_idle_dir_stat(mgr_stat)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_stale_after_daemon_stop(self):
+        """Mgr status marks frozen omap syncing progress as stale."""
+        self.setup_mount_b(mds_perm='rw')
+        self.mount_a.run_shell(["mkdir", "d0"])
+        for i in range(8):
+            self.mount_a.write_n_mb(os.path.join('d0', f'file.{i}'), 1024)
+
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
+        self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
+                                         peer_spec, '/d0', 'snap0')
+
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        # Wait for live metrics to reach omap before freezing the daemon.
+        # Stale detection requires a persisted _instance_id; without an omap
+        # write the mgr reports default idle metrics instead of stale.
+        mgr_syncing = False
+        with safe_while(sleep=2, tries=30,
+                        action='wait for omap syncing metrics') as proceed:
+            while proceed():
+                mgr_stat = self.dir_status_from_mgr(
+                    self.primary_fs_name, '/d0', peer_uuid)
+                if mgr_stat.get('state') == 'syncing':
+                    mgr_syncing = True
+                    break
+        self.assertTrue(
+            mgr_syncing,
+            'mgr never reported syncing before SIGSTOP; '
+            f'last mgr stat: {mgr_stat}')
+
+        pid = self.get_mirror_daemon_pid()
+        log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
+        self.mount_a.run_shell(['kill', '-SIGSTOP', pid])
+        try:
+            # InstanceWatcher INSTANCE_TIMEOUT is 30s; allow extra time for
+            # the mgr notify loop to age out the frozen instance.
+            time.sleep(40)
+            self.check_mgr_dir_stat_stale(
+                self.primary_fs_name, '/d0', peer_uuid)
+        finally:
+            log.debug('SIGCONT to cephfs-mirror')
+            self.mount_a.run_shell(['kill', '-SIGCONT', pid])
+
+        # wait for restart mirror on blocklist
+        time.sleep(60)
+        with safe_while(sleep=2, tries=20,
+                        action='wait for mirror daemon recovery after SIGSTOP') as proceed:
+            while proceed():
+                if not self.get_mirror_rados_addr(self.primary_fs_name,
+                                                   self.primary_fs_id):
+                    continue
+                res = self.mirror_daemon_command(
+                    f'mirror status for fs: {self.primary_fs_name}',
+                    'fs', 'mirror', 'status',
+                    f'{self.primary_fs_name}@{self.primary_fs_id}')
+                if 'snap_dirs' in res:
+                    break
+
+        self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_no_peers(self):
+        """Mgr status returns empty metrics when no peers are configured."""
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        res = self.mgr_mirror_status(self.primary_fs_name)
+        self.assertEqual(res, {'metrics': {}})
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_errors(self):
+        """Mgr status returns expected errors for invalid inputs."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'mgr_err_dir'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        try:
+            self.get_ceph_cmd_stdout("fs", "snapshot", "mirror", "status",
+                                     "nonexistent_fs_name")
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOENT:
+                raise RuntimeError(-errno.ENOENT,
+                                   'incorrect error for unknown filesystem')
+        else:
+            raise RuntimeError(-errno.ENOENT, 'expected unknown filesystem to fail')
+
+        self.remove_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+        self.peer_remove(self.primary_fs_name, self.primary_fs_id, peer_spec)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        try:
+            self.get_ceph_cmd_stdout("fs", "snapshot", "mirror", "status",
+                                     self.primary_fs_name)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.EINVAL:
+                raise RuntimeError(-errno.EINVAL,
+                                   'incorrect error for non-mirrored filesystem')
+        else:
+            raise RuntimeError(-errno.EINVAL, 'expected non-mirrored fs to fail')
+
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        try:
+            self.get_ceph_cmd_stdout("fs", "snapshot", "mirror", "status",
+                                     self.primary_fs_name, '/not_mirrored')
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOENT:
+                raise RuntimeError(-errno.ENOENT,
+                                   'incorrect error for unknown directory')
+        else:
+            raise RuntimeError(-errno.ENOENT, 'expected unknown directory to fail')
+
+        try:
+            self.get_ceph_cmd_stdout(
+                "fs", "snapshot", "mirror", "status",
+                self.primary_fs_name, f'/{dir_name}',
+                '--peer_uuid=00000000-0000-0000-0000-000000000000')
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOENT:
+                raise RuntimeError(-errno.ENOENT,
+                                   'incorrect error for unknown peer')
+        else:
+            raise RuntimeError(-errno.ENOENT, 'expected unknown peer to fail')
+
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_filter_scopes(self):
+        """Mgr status filters by filesystem, directory, and peer scope."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        for dir_name in ('mgr_f0', 'mgr_f1'):
+            self.mount_a.run_shell(['mkdir', dir_name])
+            self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+            self.add_directory(self.primary_fs_name, self.primary_fs_id,
+                              f'/{dir_name}')
+            self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/snap0'])
+
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, '/mgr_f0', 'snap0', 1)
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, '/mgr_f1', 'snap0', 1)
+
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        full_res = self.mgr_mirror_status(self.primary_fs_name)
+        self.assertIn('/mgr_f0', full_res['metrics'])
+        self.assertIn('/mgr_f1', full_res['metrics'])
+
+        dir_res = self.mgr_mirror_status(self.primary_fs_name, '/mgr_f0')
+        self.assertEqual(set(dir_res['metrics'].keys()), {'/mgr_f0'})
+
+        peer_res = self.mgr_mirror_status(
+            self.primary_fs_name, '/mgr_f0', peer_uuid)
+        self.assertEqual(set(peer_res['metrics'].keys()), {'/mgr_f0'})
+        self.assertIn(peer_uuid, peer_res['metrics']['/mgr_f0']['peer'])
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_survives_daemon_restart(self):
+        """Mgr status keeps persisted last_synced_snap and resets session counters after restart."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'mgr_restart_dir'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.mount_a.create_n_files(f'{dir_name}/file', 3000, sync=True)
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        snap0 = 'snap0'
+        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap0}'])
+        self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                    peer_spec, f'/{dir_name}', snap0, 1)
+
+        for i in range(5):
+            self.mount_a.write_n_mb(os.path.join(dir_name, f'more_file.{i}'), 1)
+
+        snap1 = 'snap1'
+        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap1}'])
+        self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                    peer_spec, f'/{dir_name}', snap1, 2)
+
+        self.mount_a.run_shell(['rmdir', f'{dir_name}/.snap/{snap0}'])
+        self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
+                                            peer_spec, f'/{dir_name}', 1)
+        snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
+        self.assertNotIn(snap0, snap_list)
+
+        snap2 = 'snap2'
+        self.mount_a.run_shell(['mv', f'{dir_name}/.snap/{snap1}',
+                                f'{dir_name}/.snap/{snap2}'])
+        self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id,
+                                            peer_spec, f'/{dir_name}', 1)
+        snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
+        self.assertNotIn(snap1, snap_list)
+        self.assertIn(snap2, snap_list)
+        self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                    peer_spec, f'/{dir_name}', snap2, 2)
+
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        before = self.dir_status_from_mgr(
+            self.primary_fs_name, f'/{dir_name}', peer_uuid)
+        self.assertEqual(before['snaps_synced'], 2)
+        self.assertEqual(before['snaps_deleted'], 1)
+        self.assertEqual(before['snaps_renamed'], 1)
+        self.assertEqual(before['last_synced_snap']['name'], snap2)
+
+        self.restart_mirror_daemon()
+        self.wait_for_mirror_daemon_recovery(
+            self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+        self.check_mgr_and_asok_session_counters_zero(
+            self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+
+        after = self.dir_status_from_mgr(
+            self.primary_fs_name, f'/{dir_name}', peer_uuid)
+        self.assertEqual(after['last_synced_snap']['name'],
+                         before['last_synced_snap']['name'])
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_after_directory_remove(self):
+        """Mgr status for a removed directory returns ENOENT."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'mgr_rm_dir'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+        self.remove_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        try:
+            self.mgr_mirror_status(self.primary_fs_name, f'/{dir_name}')
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOENT:
+                raise RuntimeError(-errno.ENOENT,
+                                   'incorrect error for removed directory')
+        else:
+            raise RuntimeError(-errno.ENOENT, 'expected removed directory to fail')
+
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_failed_state(self):
+        """Mgr status reports failed state like asok peer status."""
+        self.setup_mount_b(mds_perm='rwps')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'd0'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        snap_name = "snap_a"
+        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+        self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                    peer_spec, f'/{dir_name}', snap_name, 1)
+
+        remote_snap_path = f'{dir_name}/.snap/snap_b'
+        self.mount_b.run_shell(['sudo', 'mkdir', remote_snap_path], omit_sudo=False)
+
+        self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id,
+                                     peer_spec, f'/{dir_name}')
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        self.check_mgr_dir_stat_matches_asok(
+            self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+
+        self.mount_b.run_shell(['sudo', 'rmdir', remote_snap_path], omit_sudo=False)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_cache(self):
+        """Repeated mgr status calls hit the metrics cache within TTL."""
+        self.setup_mount_b(mds_perm='rw')
+        self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        peer_spec = "client.mirror_remote@ceph"
+        self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                      self.secondary_fs_name)
+
+        dir_name = 'mgr_cache_dir'
+        self.mount_a.run_shell(['mkdir', dir_name])
+        self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+        self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+        snap0 = 'snap0'
+        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap0}'])
+        self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                    peer_spec, f'/{dir_name}', snap0, 1)
+
+        res1 = self.mgr_mirror_status(self.primary_fs_name)
+        res2 = self.mgr_mirror_status(self.primary_fs_name)
+        self.assertEqual(res1, res2)
+
+        snap1 = 'snap1'
+        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap1}'])
+        self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+                               peer_spec, f'/{dir_name}', snap1, 2)
+
+        time.sleep(self.MGR_METRICS_CACHE_TTL + 1)
+        res3 = self.mgr_mirror_status(self.primary_fs_name)
+        peer_uuid = self.get_peer_uuid(peer_spec)
+        self.assertEqual(
+            res3['metrics'][f'/{dir_name}']['peer'][peer_uuid]['snaps_synced'], 2)
+        self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+    def test_mgr_snapshot_mirror_status_cache_disabled(self):
+        """Mgr snapshot mirror status reads omap for all scopes when cache is disabled."""
+        self.disable_mgr_metrics_cache()
+        try:
+            self.setup_mount_b(mds_perm='rw')
+            self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+            peer_spec = "client.mirror_remote@ceph"
+            self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                          self.secondary_fs_name)
+
+            dirs = []
+            for dir_name in ('mgr_nc0', 'mgr_nc1'):
+                self.mount_a.run_shell(['mkdir', dir_name])
+                self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+                self.add_directory(self.primary_fs_name, self.primary_fs_id,
+                                   f'/{dir_name}')
+                self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/snap0'])
+                dirs.append(f'/{dir_name}')
+
+            peer_uuid = self.get_peer_uuid(peer_spec)
+            for dir_path in dirs:
+                self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                            peer_spec, dir_path, 'snap0', 1)
+
+            self.check_mgr_snapshot_mirror_status_scopes_match_asok(
+                self.primary_fs_name, self.primary_fs_id, dirs[0], peer_uuid,
+                dirs)
+
+            self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        finally:
+            self.enable_mgr_metrics_cache()
+
+    def test_mgr_snapshot_mirror_status_cache_disabled_syncing(self):
+        """Mgr snapshot mirror status reads omap during sync when cache is disabled."""
+        self.disable_mgr_metrics_cache()
+        try:
+            self.setup_mount_b(mds_perm='rw')
+            self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+            peer_spec = "client.mirror_remote@ceph"
+            self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+                          self.secondary_fs_name)
+
+            idle_dir = 'mgr_nc_idle'
+            self.mount_a.run_shell(['mkdir', idle_dir])
+            self.mount_a.create_n_files(f'{idle_dir}/file', 50, sync=True)
+            self.add_directory(self.primary_fs_name, self.primary_fs_id,
+                               f'/{idle_dir}')
+            self.mount_a.run_shell(['mkdir', f'{idle_dir}/.snap/snap0'])
+            self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+                                        peer_spec, f'/{idle_dir}', 'snap0', 1)
+
+            sync_dir = 'mgr_nc_sync'
+            self.mount_a.run_shell(['mkdir', sync_dir])
+            self.mount_a.create_n_files(f'{sync_dir}/file', 10000, sync=True)
+            self.add_directory(self.primary_fs_name, self.primary_fs_id,
+                               f'/{sync_dir}')
+            snap_name = 'snap0'
+            self.mount_a.run_shell(['mkdir', f'{sync_dir}/.snap/{snap_name}'])
+            self.check_peer_syncing_progress_metrics(
+                self.primary_fs_name, self.primary_fs_id, peer_spec,
+                f'/{sync_dir}', snap_name, sync_mode='full')
+
+            peer_uuid = self.get_peer_uuid(peer_spec)
+            dirs = [f'/{idle_dir}', f'/{sync_dir}']
+            self.check_mgr_snapshot_mirror_status_scopes_match_asok(
+                self.primary_fs_name, self.primary_fs_id, f'/{sync_dir}',
+                peer_uuid, dirs, expected_state='syncing')
+            self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+        finally:
+            self.enable_mgr_metrics_cache()
+
     def test_cephfs_mirror_duplicate_acquire_notify(self):
         """Duplicate acquire notifies must not leave a ghost replayer directory.