import errno
import logging
import random
+import signal
import time
import functools
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while
+from teuthology.orchestra import run
log = logging.getLogger(__name__)
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS = "cephfs_mirror_mirrored_filesystems"
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER = "cephfs_mirror_peers"
PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_DIRECTORY = "cephfs_mirror_directory"
+ MGR_METRICS_CACHE_TTL = 3
+ MIRROR_TICK_INTERVAL = 1
def setUp(self):
super(TestMirroring, self).setUp()
self.secondary_fs_id = self.backup_fs.id
self.enable_mirroring_module()
self.config_set('client.mirror', 'cephfs_mirror_directory_scan_interval', 1)
+ self.config_set('client.mirror', 'cephfs_mirror_tick_interval',
+ self.MIRROR_TICK_INTERVAL)
+ self.config_set('mgr', 'mgr/mirroring/snapshot_mirror_metrics_cache_ttl',
+ self.MGR_METRICS_CACHE_TTL)
+ self.enable_mgr_metrics_cache()
+
+ def disable_mgr_metrics_cache(self):
+ self.config_set('mgr', 'mgr/mirroring/snapshot_mirror_metrics_cache_enabled',
+ False)
+
+ def enable_mgr_metrics_cache(self):
+ self.config_set('mgr', 'mgr/mirroring/snapshot_mirror_metrics_cache_enabled',
+ True)
+
+ def assert_mgr_mirror_status_scopes(self, fs_name, dir_path, peer_uuid,
+ expected_dirs, asok_res):
+ full_res = self.mgr_mirror_status(fs_name)
+ self.assertEqual(set(full_res['metrics'].keys()), set(expected_dirs))
+
+ peer_res = self.mgr_mirror_status(fs_name, peer_uuid=peer_uuid)
+ self.assertEqual(set(peer_res['metrics'].keys()), set(expected_dirs))
+ for path in expected_dirs:
+ self.assertEqual(
+ set(peer_res['metrics'][path]['peer'].keys()), {peer_uuid})
+
+ dir_res = self.mgr_mirror_status(fs_name, dir_path)
+ self.assertEqual(set(dir_res['metrics'].keys()), {dir_path})
+
+ dir_peer_res = self.mgr_mirror_status(fs_name, dir_path, peer_uuid)
+ self.assertEqual(set(dir_peer_res['metrics'].keys()), {dir_path})
+ self.assertEqual(
+ set(dir_peer_res['metrics'][dir_path]['peer'].keys()), {peer_uuid})
+
+ for path in expected_dirs:
+ for res in (full_res, peer_res):
+ mgr_stat = self.peer_dir_status(res, path, peer_uuid)
+ asok_stat = self.peer_dir_status(asok_res, path, peer_uuid)
+ self.assert_mgr_dir_stat_matches_asok(mgr_stat, asok_stat)
+
+ for res in (dir_res, dir_peer_res):
+ mgr_stat = self.peer_dir_status(res, dir_path, peer_uuid)
+ asok_stat = self.peer_dir_status(asok_res, dir_path, peer_uuid)
+ self.assert_mgr_dir_stat_matches_asok(mgr_stat, asok_stat)
+
+ @retry_assert(timeout=120, interval=1)
+ def check_mgr_snapshot_mirror_status_scopes_match_asok(
+ self, fs_name, fs_id, dir_path, peer_uuid, expected_dirs,
+ expected_state=None):
+ asok_res = self.peer_status(fs_name, fs_id, peer_uuid)
+ asok_stat = self.peer_dir_status(asok_res, dir_path, peer_uuid)
+ if expected_state is not None:
+ self.assertEqual(asok_stat['state'], expected_state)
+ self.assert_mgr_mirror_status_scopes(
+ fs_name, dir_path, peer_uuid, expected_dirs, asok_res)
def tearDown(self):
self.disable_mirroring_module()
self.assertTrue(
snap['eta'] == 'calculating...' or bool(re.search(r'\d', snap['eta'])))
+ def mgr_mirror_status(self, fs_name, mirrored_dir_path=None, peer_uuid=None):
+ args = ["fs", "snapshot", "mirror", "status", fs_name]
+ if mirrored_dir_path is not None:
+ args.append(mirrored_dir_path)
+ if peer_uuid is not None:
+ args.append(f'--peer_uuid={peer_uuid}')
+ return json.loads(self.get_ceph_cmd_stdout(*args))
+
+ def peer_status(self, fs_name, fs_id, peer_uuid):
+ return self.mirror_daemon_command(
+ f'peer status for fs: {fs_name}',
+ 'fs', 'mirror', 'peer', 'status',
+ f'{fs_name}@{fs_id}', peer_uuid)
+
+ def dir_status_from_mgr(self, fs_name, dir_name, peer_uuid,
+ mirrored_dir_path=None):
+ res = self.mgr_mirror_status(
+ fs_name, mirrored_dir_path or dir_name, peer_uuid)
+ return self.peer_dir_status(res, dir_name, peer_uuid)
+
+ def dir_status_from_asok(self, fs_name, fs_id, dir_name, peer_uuid):
+ res = self.peer_status(fs_name, fs_id, peer_uuid)
+ return self.peer_dir_status(res, dir_name, peer_uuid)
+
+ def assert_default_idle_dir_stat(self, dir_stat):
+ self.assertEqual(dir_stat['state'], 'idle')
+ self.assertEqual(dir_stat['snaps_synced'], 0)
+ self.assertEqual(dir_stat['snaps_deleted'], 0)
+ self.assertEqual(dir_stat['snaps_renamed'], 0)
+ self.assertNotIn('last_synced_snap', dir_stat)
+
+ def assert_mgr_dir_stat_matches_asok(self, mgr_stat, asok_stat):
+ self.assertEqual(mgr_stat['state'], asok_stat['state'])
+ for key in ('snaps_synced', 'snaps_deleted', 'snaps_renamed'):
+ self.assertEqual(mgr_stat.get(key), asok_stat.get(key))
+ if 'failure_reason' in asok_stat:
+ self.assertEqual(mgr_stat.get('failure_reason'), asok_stat['failure_reason'])
+ if 'last_synced_snap' in asok_stat:
+ self.assertIn('last_synced_snap', mgr_stat)
+ self.assertEqual(mgr_stat['last_synced_snap']['name'],
+ asok_stat['last_synced_snap']['name'])
+ self.assert_last_synced_snap_metrics(mgr_stat['last_synced_snap'])
+ if 'current_syncing_snap' in asok_stat:
+ self.assertIn('current_syncing_snap', mgr_stat)
+ mgr_snap = mgr_stat['current_syncing_snap']
+ asok_snap = asok_stat['current_syncing_snap']
+ self.assertEqual(mgr_snap['name'], asok_snap['name'])
+ self.assert_syncing_snap_metrics(
+ mgr_snap, sync_mode=asok_snap.get('sync-mode'))
+
+ @retry_assert(timeout=120, interval=1)
+ def check_mgr_dir_stat_matches_asok(self, fs_name, fs_id, dir_name, peer_uuid,
+ mirrored_dir_path=None):
+ mgr_stat = self.dir_status_from_mgr(
+ fs_name, dir_name, peer_uuid, mirrored_dir_path)
+ asok_stat = self.dir_status_from_asok(fs_name, fs_id, dir_name, peer_uuid)
+ try:
+ self.assert_mgr_dir_stat_matches_asok(mgr_stat, asok_stat)
+ except RETRY_EXCEPTIONS as e:
+ e.mgr_stat = mgr_stat
+ e.asok_stat = asok_stat
+ raise
+
+ def wait_for_mirror_daemon_stop(self, pid):
+ with safe_while(sleep=1, tries=60,
+ action='wait for mirror daemon stop') as proceed:
+ while proceed():
+ try:
+ cur_pid = self.get_mirror_daemon_pid()
+ except CommandFailedError:
+ return
+ if cur_pid != pid:
+ return
+ p = self.mount_a.run_shell(['kill', '-0', cur_pid],
+ check_status=False)
+ if p.returncode != 0:
+ return
+
+ def restart_mirror_daemon(self):
+ # daemon.start() always calls restart(), which skips stop() once proc
+ # is cleared. Stop the real cephfs-mirror via its pid file first so
+ # the new instance can take the pidfile lock.
+ daemons = list(self.ctx.daemons.iter_daemons_of_role('cephfs-mirror'))
+ self.assertEqual(len(daemons), 1,
+ 'expected a single cephfs-mirror daemon')
+ daemon = daemons[0]
+ rados_inst_before = self.get_mirror_rados_addr(
+ self.primary_fs_name, self.primary_fs_id)
+ pid = self.get_mirror_daemon_pid()
+
+ log.debug(f'SIGTERM to cephfs-mirror pid {pid}')
+ if daemon.running():
+ try:
+ daemon.signal(signal.SIGTERM, silent=True)
+ except Exception as e:
+ log.debug(f'failed to signal cephfs-mirror via teuthology: {e}')
+ self.mount_a.run_shell(['kill', '-TERM', pid])
+ self.wait_for_mirror_daemon_stop(pid)
+
+ if daemon.running():
+ try:
+ run.wait([daemon.proc], timeout=10)
+ except CommandFailedError:
+ pass
+ daemon.reset()
+
+ log.debug('starting cephfs-mirror')
+ daemon.start()
+
+ time.sleep(60)
+ with safe_while(sleep=2, tries=30,
+ action='wait for mirror daemon restart') as proceed:
+ while proceed():
+ try:
+ rados_inst = self.get_mirror_rados_addr(
+ self.primary_fs_name, self.primary_fs_id)
+ if rados_inst and rados_inst != rados_inst_before:
+ break
+ except CommandFailedError:
+ pass
+
+ def wait_for_mirror_daemon_recovery(self, fs_name, fs_id, dir_name, peer_uuid):
+ # A new rados_inst alone does not mean mirroring is ready: wait until the
+ # peer is configured, the snap dir is registered, and asok reports it.
+ with safe_while(sleep=2, tries=60,
+ action='wait for mirror daemon recovery') as proceed:
+ while proceed():
+ if not self.get_mirror_rados_addr(fs_name, fs_id):
+ continue
+ try:
+ mirror_res = self.mirror_daemon_command(
+ f'mirror status for fs: {fs_name}',
+ 'fs', 'mirror', 'status', f'{fs_name}@{fs_id}')
+ except CommandFailedError:
+ continue
+ if peer_uuid not in mirror_res.get('peers', {}):
+ continue
+ if mirror_res.get('snap_dirs', {}).get('dir_count', 0) < 1:
+ continue
+ try:
+ peer_res = self.peer_status(fs_name, fs_id, peer_uuid)
+ self.peer_dir_status(peer_res, dir_name, peer_uuid)
+ except RETRY_EXCEPTIONS:
+ continue
+ return
+
+ @retry_assert(timeout=120, interval=1)
+ def check_mgr_and_asok_session_counters_zero(self, fs_name, fs_id, dir_name,
+ peer_uuid):
+ mgr_stat = self.dir_status_from_mgr(fs_name, dir_name, peer_uuid)
+ asok_stat = self.dir_status_from_asok(fs_name, fs_id, dir_name, peer_uuid)
+ self.assertEqual(mgr_stat['state'], 'idle')
+ self.assertEqual(asok_stat['state'], 'idle')
+ for key in ('snaps_synced', 'snaps_deleted', 'snaps_renamed'):
+ self.assertEqual(mgr_stat.get(key), 0, msg=f'mgr {key}')
+ self.assertEqual(asok_stat.get(key), 0, msg=f'asok {key}')
+ self.assertEqual(mgr_stat['last_synced_snap']['name'],
+ asok_stat['last_synced_snap']['name'])
+
+ @retry_assert(timeout=90, interval=5)
+ def check_mgr_dir_stat_stale(self, fs_name, dir_name, peer_uuid):
+ mgr_stat = self.dir_status_from_mgr(fs_name, dir_name, peer_uuid)
+ self.assertEqual(mgr_stat['state'], 'stale',
+ msg=f'unexpected mgr stat: {mgr_stat}')
+ self.assertNotIn('current_syncing_snap', mgr_stat)
+
@retry_assert(timeout=120, interval=1)
def check_peer_syncing_progress_metrics(self, fs_name, fs_id, peer_spec, dir_name,
snap_name, sync_mode=None):
self.config_set('client.mirror', 'cephfs_mirror_distribute_datasync_threads', 'true')
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ def test_mgr_snapshot_mirror_status_matches_asok_idle(self):
+ """Mgr snapshot mirror status matches asok peer status after sync."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'mgr_status_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap_name = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap_name, 1)
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ self.check_mgr_dir_stat_matches_asok(
+ self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_matches_asok_syncing(self):
+ """Mgr snapshot mirror status matches asok peer status during sync."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'mgr_sync_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 8000, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap_name = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+ self.check_peer_syncing_progress_metrics(
+ self.primary_fs_name, self.primary_fs_id, peer_spec, f'/{dir_name}',
+ snap_name, sync_mode='full')
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ self.check_mgr_dir_stat_matches_asok(
+ self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_default_idle_new_dir(self):
+ """Mgr status reports default idle metrics for a newly added directory."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'mgr_default_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ mgr_stat = self.dir_status_from_mgr(
+ self.primary_fs_name, f'/{dir_name}', peer_uuid)
+ self.assert_default_idle_dir_stat(mgr_stat)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_stale_after_daemon_stop(self):
+ """Mgr status marks frozen omap syncing progress as stale."""
+ self.setup_mount_b(mds_perm='rw')
+ self.mount_a.run_shell(["mkdir", "d0"])
+ for i in range(8):
+ self.mount_a.write_n_mb(os.path.join('d0', f'file.{i}'), 1024)
+
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ self.mount_a.run_shell(["mkdir", "d0/.snap/snap0"])
+ self.check_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, '/d0', 'snap0')
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ # Wait for live metrics to reach omap before freezing the daemon.
+ # Stale detection requires a persisted _instance_id; without an omap
+ # write the mgr reports default idle metrics instead of stale.
+ mgr_syncing = False
+ with safe_while(sleep=2, tries=30,
+ action='wait for omap syncing metrics') as proceed:
+ while proceed():
+ mgr_stat = self.dir_status_from_mgr(
+ self.primary_fs_name, '/d0', peer_uuid)
+ if mgr_stat.get('state') == 'syncing':
+ mgr_syncing = True
+ break
+ self.assertTrue(
+ mgr_syncing,
+ 'mgr never reported syncing before SIGSTOP; '
+ f'last mgr stat: {mgr_stat}')
+
+ pid = self.get_mirror_daemon_pid()
+ log.debug(f'SIGSTOP to cephfs-mirror pid {pid}')
+ self.mount_a.run_shell(['kill', '-SIGSTOP', pid])
+ try:
+ # InstanceWatcher INSTANCE_TIMEOUT is 30s; allow extra time for
+ # the mgr notify loop to age out the frozen instance.
+ time.sleep(40)
+ self.check_mgr_dir_stat_stale(
+ self.primary_fs_name, '/d0', peer_uuid)
+ finally:
+ log.debug('SIGCONT to cephfs-mirror')
+ self.mount_a.run_shell(['kill', '-SIGCONT', pid])
+
+ # wait for restart mirror on blocklist
+ time.sleep(60)
+ with safe_while(sleep=2, tries=20,
+ action='wait for mirror daemon recovery after SIGSTOP') as proceed:
+ while proceed():
+ if not self.get_mirror_rados_addr(self.primary_fs_name,
+ self.primary_fs_id):
+ continue
+ res = self.mirror_daemon_command(
+ f'mirror status for fs: {self.primary_fs_name}',
+ 'fs', 'mirror', 'status',
+ f'{self.primary_fs_name}@{self.primary_fs_id}')
+ if 'snap_dirs' in res:
+ break
+
+ self.remove_directory(self.primary_fs_name, self.primary_fs_id, '/d0')
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_no_peers(self):
+ """Mgr status returns empty metrics when no peers are configured."""
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ res = self.mgr_mirror_status(self.primary_fs_name)
+ self.assertEqual(res, {'metrics': {}})
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_errors(self):
+ """Mgr status returns expected errors for invalid inputs."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'mgr_err_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ try:
+ self.get_ceph_cmd_stdout("fs", "snapshot", "mirror", "status",
+ "nonexistent_fs_name")
+ except CommandFailedError as ce:
+ if ce.exitstatus != errno.ENOENT:
+ raise RuntimeError(-errno.ENOENT,
+ 'incorrect error for unknown filesystem')
+ else:
+ raise RuntimeError(-errno.ENOENT, 'expected unknown filesystem to fail')
+
+ self.remove_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+ self.peer_remove(self.primary_fs_name, self.primary_fs_id, peer_spec)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ try:
+ self.get_ceph_cmd_stdout("fs", "snapshot", "mirror", "status",
+ self.primary_fs_name)
+ except CommandFailedError as ce:
+ if ce.exitstatus != errno.EINVAL:
+ raise RuntimeError(-errno.EINVAL,
+ 'incorrect error for non-mirrored filesystem')
+ else:
+ raise RuntimeError(-errno.EINVAL, 'expected non-mirrored fs to fail')
+
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ try:
+ self.get_ceph_cmd_stdout("fs", "snapshot", "mirror", "status",
+ self.primary_fs_name, '/not_mirrored')
+ except CommandFailedError as ce:
+ if ce.exitstatus != errno.ENOENT:
+ raise RuntimeError(-errno.ENOENT,
+ 'incorrect error for unknown directory')
+ else:
+ raise RuntimeError(-errno.ENOENT, 'expected unknown directory to fail')
+
+ try:
+ self.get_ceph_cmd_stdout(
+ "fs", "snapshot", "mirror", "status",
+ self.primary_fs_name, f'/{dir_name}',
+ '--peer_uuid=00000000-0000-0000-0000-000000000000')
+ except CommandFailedError as ce:
+ if ce.exitstatus != errno.ENOENT:
+ raise RuntimeError(-errno.ENOENT,
+ 'incorrect error for unknown peer')
+ else:
+ raise RuntimeError(-errno.ENOENT, 'expected unknown peer to fail')
+
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_filter_scopes(self):
+ """Mgr status filters by filesystem, directory, and peer scope."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ for dir_name in ('mgr_f0', 'mgr_f1'):
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id,
+ f'/{dir_name}')
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/snap0'])
+
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, '/mgr_f0', 'snap0', 1)
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, '/mgr_f1', 'snap0', 1)
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ full_res = self.mgr_mirror_status(self.primary_fs_name)
+ self.assertIn('/mgr_f0', full_res['metrics'])
+ self.assertIn('/mgr_f1', full_res['metrics'])
+
+ dir_res = self.mgr_mirror_status(self.primary_fs_name, '/mgr_f0')
+ self.assertEqual(set(dir_res['metrics'].keys()), {'/mgr_f0'})
+
+ peer_res = self.mgr_mirror_status(
+ self.primary_fs_name, '/mgr_f0', peer_uuid)
+ self.assertEqual(set(peer_res['metrics'].keys()), {'/mgr_f0'})
+ self.assertIn(peer_uuid, peer_res['metrics']['/mgr_f0']['peer'])
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_survives_daemon_restart(self):
+ """Mgr status keeps persisted last_synced_snap and resets session counters after restart."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'mgr_restart_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 3000, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap0 = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap0}'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap0, 1)
+
+ for i in range(5):
+ self.mount_a.write_n_mb(os.path.join(dir_name, f'more_file.{i}'), 1)
+
+ snap1 = 'snap1'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap1}'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap1, 2)
+
+ self.mount_a.run_shell(['rmdir', f'{dir_name}/.snap/{snap0}'])
+ self.check_peer_status_deleted_snap(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', 1)
+ snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
+ self.assertNotIn(snap0, snap_list)
+
+ snap2 = 'snap2'
+ self.mount_a.run_shell(['mv', f'{dir_name}/.snap/{snap1}',
+ f'{dir_name}/.snap/{snap2}'])
+ self.check_peer_status_renamed_snap(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', 1)
+ snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
+ self.assertNotIn(snap1, snap_list)
+ self.assertIn(snap2, snap_list)
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap2, 2)
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ before = self.dir_status_from_mgr(
+ self.primary_fs_name, f'/{dir_name}', peer_uuid)
+ self.assertEqual(before['snaps_synced'], 2)
+ self.assertEqual(before['snaps_deleted'], 1)
+ self.assertEqual(before['snaps_renamed'], 1)
+ self.assertEqual(before['last_synced_snap']['name'], snap2)
+
+ self.restart_mirror_daemon()
+ self.wait_for_mirror_daemon_recovery(
+ self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+ self.check_mgr_and_asok_session_counters_zero(
+ self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+
+ after = self.dir_status_from_mgr(
+ self.primary_fs_name, f'/{dir_name}', peer_uuid)
+ self.assertEqual(after['last_synced_snap']['name'],
+ before['last_synced_snap']['name'])
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_after_directory_remove(self):
+ """Mgr status for a removed directory returns ENOENT."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'mgr_rm_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+ self.remove_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ try:
+ self.mgr_mirror_status(self.primary_fs_name, f'/{dir_name}')
+ except CommandFailedError as ce:
+ if ce.exitstatus != errno.ENOENT:
+ raise RuntimeError(-errno.ENOENT,
+ 'incorrect error for removed directory')
+ else:
+ raise RuntimeError(-errno.ENOENT, 'expected removed directory to fail')
+
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_failed_state(self):
+ """Mgr status reports failed state like asok peer status."""
+ self.setup_mount_b(mds_perm='rwps')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'd0'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap_name = "snap_a"
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap_name}'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap_name, 1)
+
+ remote_snap_path = f'{dir_name}/.snap/snap_b'
+ self.mount_b.run_shell(['sudo', 'mkdir', remote_snap_path], omit_sudo=False)
+
+ self.verify_failed_directory(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}')
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ self.check_mgr_dir_stat_matches_asok(
+ self.primary_fs_name, self.primary_fs_id, f'/{dir_name}', peer_uuid)
+
+ self.mount_b.run_shell(['sudo', 'rmdir', remote_snap_path], omit_sudo=False)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_cache(self):
+ """Repeated mgr status calls hit the metrics cache within TTL."""
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dir_name = 'mgr_cache_dir'
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir_name}')
+
+ snap0 = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap0}'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap0, 1)
+
+ res1 = self.mgr_mirror_status(self.primary_fs_name)
+ res2 = self.mgr_mirror_status(self.primary_fs_name)
+ self.assertEqual(res1, res2)
+
+ snap1 = 'snap1'
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap1}'])
+ self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{dir_name}', snap1, 2)
+
+ time.sleep(self.MGR_METRICS_CACHE_TTL + 1)
+ res3 = self.mgr_mirror_status(self.primary_fs_name)
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ self.assertEqual(
+ res3['metrics'][f'/{dir_name}']['peer'][peer_uuid]['snaps_synced'], 2)
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
+ def test_mgr_snapshot_mirror_status_cache_disabled(self):
+ """Mgr snapshot mirror status reads omap for all scopes when cache is disabled."""
+ self.disable_mgr_metrics_cache()
+ try:
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ dirs = []
+ for dir_name in ('mgr_nc0', 'mgr_nc1'):
+ self.mount_a.run_shell(['mkdir', dir_name])
+ self.mount_a.create_n_files(f'{dir_name}/file', 50, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id,
+ f'/{dir_name}')
+ self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/snap0'])
+ dirs.append(f'/{dir_name}')
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ for dir_path in dirs:
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, dir_path, 'snap0', 1)
+
+ self.check_mgr_snapshot_mirror_status_scopes_match_asok(
+ self.primary_fs_name, self.primary_fs_id, dirs[0], peer_uuid,
+ dirs)
+
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ finally:
+ self.enable_mgr_metrics_cache()
+
+ def test_mgr_snapshot_mirror_status_cache_disabled_syncing(self):
+ """Mgr snapshot mirror status reads omap during sync when cache is disabled."""
+ self.disable_mgr_metrics_cache()
+ try:
+ self.setup_mount_b(mds_perm='rw')
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ peer_spec = "client.mirror_remote@ceph"
+ self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
+ self.secondary_fs_name)
+
+ idle_dir = 'mgr_nc_idle'
+ self.mount_a.run_shell(['mkdir', idle_dir])
+ self.mount_a.create_n_files(f'{idle_dir}/file', 50, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id,
+ f'/{idle_dir}')
+ self.mount_a.run_shell(['mkdir', f'{idle_dir}/.snap/snap0'])
+ self.check_peer_status_idle(self.primary_fs_name, self.primary_fs_id,
+ peer_spec, f'/{idle_dir}', 'snap0', 1)
+
+ sync_dir = 'mgr_nc_sync'
+ self.mount_a.run_shell(['mkdir', sync_dir])
+ self.mount_a.create_n_files(f'{sync_dir}/file', 10000, sync=True)
+ self.add_directory(self.primary_fs_name, self.primary_fs_id,
+ f'/{sync_dir}')
+ snap_name = 'snap0'
+ self.mount_a.run_shell(['mkdir', f'{sync_dir}/.snap/{snap_name}'])
+ self.check_peer_syncing_progress_metrics(
+ self.primary_fs_name, self.primary_fs_id, peer_spec,
+ f'/{sync_dir}', snap_name, sync_mode='full')
+
+ peer_uuid = self.get_peer_uuid(peer_spec)
+ dirs = [f'/{idle_dir}', f'/{sync_dir}']
+ self.check_mgr_snapshot_mirror_status_scopes_match_asok(
+ self.primary_fs_name, self.primary_fs_id, f'/{sync_dir}',
+ peer_uuid, dirs, expected_state='syncing')
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ finally:
+ self.enable_mgr_metrics_cache()
+
def test_cephfs_mirror_duplicate_acquire_notify(self):
"""Duplicate acquire notifies must not leave a ghost replayer directory.