f'for {dir_path}')
return entry['counters']
def get_peer_perf_counters(self):
    """Return the perf counters of the first peer in the daemon's counter dump.

    Runs `counter dump` through mirror_daemon_command, asserts that the
    peer section (PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER) is present and
    non-empty, and returns the 'counters' value of its first entry.
    """
    dump = self.mirror_daemon_command(
        f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
    peer_key = self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER
    self.assertIn(peer_key, dump)
    self.assertGreater(len(dump[self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER]), 0)
    first_peer = dump[self.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_PEER][0]
    return first_peer['counters']
+
@retry_assert(timeout=60, interval=1)
def wait_peer_add_directory_counter(self, expected):
    """Check that the peer 'add_directory' perf counter equals `expected`.

    Raises AssertionError when the counter is missing or has another value.
    The retry_assert decorator (timeout=60, interval=1) presumably re-runs
    this check until it passes or the timeout expires -- confirm against
    retry_assert.
    """
    peer_counters = self.get_peer_perf_counters()
    self.assertIn('add_directory', peer_counters)
    observed = peer_counters['add_directory']
    if observed == expected:
        return
    raise AssertionError(
        f'expected add_directory={expected}, got {observed}')
+
def assert_directory_perf_labels(self, labels, dir_path, peer_uuid):
self.assertEqual(labels['source_fscid'], str(self.primary_fs_id))
self.assertEqual(labels['source_filesystem'], self.primary_fs_name)
e.res = res
raise
@retry_assert(timeout=120, interval=1)
def stop_daemon_when_peer_snap_in_progress(self, fs_name, fs_id,
                                           peer_spec, dir_name, snap_name):
    """Stop the mirror daemon once the peer reports `snap_name` as syncing.

    Queries `fs mirror peer status` for the peer and inspects the status of
    `dir_name`. Raises RuntimeError if `snap_name` is already the last
    synced snapshot; otherwise asserts that the directory state is
    'syncing' and that the snapshot currently being synced is `snap_name`,
    then stops the mirror daemon.

    Any exception in RETRY_EXCEPTIONS gets the raw peer status attached as
    `.res` before being re-raised (presumably consumed by retry_assert --
    confirm against its implementation).
    """
    uuid = self.get_peer_uuid(peer_spec)
    status = self.mirror_daemon_command(
        f'peer status for fs: {fs_name}',
        'fs', 'mirror', 'peer', 'status', f'{fs_name}@{fs_id}', uuid)
    try:
        dir_status = self.peer_dir_status(status, dir_name, uuid)
        already_synced = dir_status.get('last_synced_snap', {}).get('name')
        if already_synced == snap_name:
            raise RuntimeError(
                f'snapshot {snap_name!r} synced before caps could be restricted')
        self.assertTrue('syncing' == dir_status['state'])
        self.assertTrue(dir_status['current_syncing_snap']['name'] == snap_name)
    except RETRY_EXCEPTIONS as err:
        # Keep the status that led to the failure on the exception object.
        err.res = status
        raise

    log.debug('peer syncing %s, stopping mirror daemon', snap_name)
    self.stop_mirror_daemon()
+
def verify_snapshot(self, dir_name, snap_name):
snap_list = self.mount_b.ls(path=f'{dir_name}/.snap')
self.assertTrue(snap_name in snap_list)
log.debug(f'status: {status}')
return status
def checkpoint_list(self, fs_name, dir_path):
    """Run `fs snapshot mirror checkpoint ls` and return its parsed JSON output."""
    raw_output = self.get_ceph_cmd_stdout(
        "fs", "snapshot", "mirror", "checkpoint", "ls", fs_name, dir_path)
    return json.loads(raw_output)
+
def checkpoint_add(self, fs_name, dir_path, snap_name):
    """Run `fs snapshot mirror checkpoint add` and return the parsed JSON reply.

    Asserts that the reply's 'status' field is 'success'.
    """
    raw_output = self.get_ceph_cmd_stdout(
        "fs", "snapshot", "mirror", "checkpoint", "add",
        fs_name, dir_path, snap_name)
    reply = json.loads(raw_output)
    self.assertEqual(reply['status'], 'success')
    return reply
+
def checkpoint_remove(self, fs_name, dir_path, snap_name):
    """Run `fs snapshot mirror checkpoint remove` and return the parsed JSON reply.

    Asserts that the reply's 'status' field is 'success'.
    """
    raw_output = self.get_ceph_cmd_stdout(
        "fs", "snapshot", "mirror", "checkpoint", "remove",
        fs_name, dir_path, snap_name)
    reply = json.loads(raw_output)
    self.assertEqual(reply['status'], 'success')
    return reply
+
def checkpoint_now(self, fs_name, dir_path):
    """Run `fs snapshot mirror checkpoint now` and return the parsed JSON reply.

    Asserts that the reply's 'status' field is 'success'.
    """
    raw_output = self.get_ceph_cmd_stdout(
        "fs", "snapshot", "mirror", "checkpoint", "now", fs_name, dir_path)
    reply = json.loads(raw_output)
    self.assertEqual(reply['status'], 'success')
    return reply
+
+ @staticmethod
+ def find_checkpoint(checkpoints, snap_name):
+ for cp in checkpoints:
+ if cp['snap_name'] == snap_name:
+ return cp
+ return None
+
def assert_checkpoint_listed(self, fs_name, dir_path, snap_name):
    """Assert that `checkpoint ls` for `dir_path` contains `snap_name`."""
    listing = self.checkpoint_list(fs_name, dir_path)
    match = self.find_checkpoint(listing['checkpoints'], snap_name)
    self.assertIsNotNone(match, f'checkpoint {snap_name} should be listed')
+
def assert_checkpoint_not_listed(self, fs_name, dir_path, snap_name):
    """Assert that `checkpoint ls` for `dir_path` does not contain `snap_name`."""
    listing = self.checkpoint_list(fs_name, dir_path)
    match = self.find_checkpoint(listing['checkpoints'], snap_name)
    self.assertIsNone(match, f'checkpoint {snap_name} should not be listed')
+
def start_mirror_daemon(self):
    """Launch cephfs-mirror in the background and wait until it answers.

    The daemon is started detached via mount_a's shell. Readiness is
    checked by issuing `counter dump` and asserting that the filesystem
    perf-counter section is present; the check is wrapped in
    retry_assert(timeout=60, interval=2).
    """
    launch_cmd = 'nohup cephfs-mirror --id mirror </dev/null >/dev/null 2>&1 &'
    self.mount_a.run_shell_payload(launch_cmd)

    @retry_assert(timeout=60, interval=2)
    def wait_ready():
        dump = self.mirror_daemon_command(
            f'counter dump for fs: {self.primary_fs_name}', 'counter', 'dump')
        self.assertIn(TestMirroring.PERF_COUNTER_KEY_NAME_CEPHFS_MIRROR_FS, dump)

    wait_ready()
+
def stop_mirror_daemon(self, wait=5):
    """Send SIGTERM to the cephfs-mirror daemon, then sleep `wait` seconds.

    A failing `kill` is ignored (`|| true`); the method does not verify
    that the process actually exited.
    """
    daemon_pid = self.get_mirror_daemon_pid()
    log.debug(f'stopping cephfs-mirror (pid={daemon_pid})')
    term_cmd = f'kill -TERM {daemon_pid} || true'
    self.mount_a.run_shell_payload(term_cmd)
    time.sleep(wait)
+
def restart_mirroring_module(self, wait=10):
    """Disable and re-enable the mirroring mgr module.

    Sleeps 2 seconds after the disable and `wait` seconds after the enable.
    """
    log.debug('restarting mirroring mgr module')
    for action, pause in (("disable", 2), ("enable", wait)):
        self.run_ceph_cmd("mgr", "module", action, self.MODULE_NAME)
        time.sleep(pause)
+
def _setup_mirrored_directory(self, dir_path, peer_spec=None, mount_b=False):
    """Create `dir_path` on the primary FS, enable mirroring and track it.

    When `mount_b` is true the backup filesystem is mounted first (with
    mds_perm='rw'); when `peer_spec` is given the peer is added last.
    """
    if mount_b:
        self.setup_mount_b(mds_perm='rw')

    relative_dir = dir_path.lstrip('/')
    self.mount_a.run_shell(["mkdir", "-p", relative_dir])
    self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
    self.add_directory(self.primary_fs_name, self.primary_fs_id, dir_path)

    if not peer_spec:
        return
    self.peer_add(self.primary_fs_name, self.primary_fs_id, peer_spec,
                  self.secondary_fs_name)
+
def _add_checkpoint_snapshot(self, dir_path, snap_name, verify_listed=False):
    """Take snapshot `snap_name` of `dir_path` and add a checkpoint on it.

    With `verify_listed` set, also assert that the checkpoint shows up in
    the checkpoint listing.
    """
    snapshot_dir = f"{dir_path.lstrip('/')}/.snap/{snap_name}"
    self.mount_a.run_shell(["mkdir", snapshot_dir])
    self.checkpoint_add(self.primary_fs_name, dir_path, snap_name)
    if not verify_listed:
        return
    self.assert_checkpoint_listed(self.primary_fs_name, dir_path, snap_name)
+
def _setup_checkpoint_dir(self, dir_path, snap_name, peer_spec=None, mount_b=True):
    """Set up a mirrored directory and add a verified checkpoint snapshot to it."""
    self._setup_mirrored_directory(
        dir_path,
        peer_spec=peer_spec,
        mount_b=mount_b,
    )
    self._add_checkpoint_snapshot(
        dir_path,
        snap_name,
        verify_listed=True,
    )
+
def _teardown_mirroring(self, dir_path, peer_spec=None):
    """Untrack `dir_path`, remove the peer if one was given, then disable mirroring."""
    fs_args = (self.primary_fs_name, self.primary_fs_id)
    self.remove_directory(*fs_args, dir_path)
    if peer_spec:
        self.peer_remove(self.primary_fs_name, self.primary_fs_id, peer_spec)
    self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+
@retry_assert(timeout=600, interval=10)
def check_checkpoint_status(self, fs_name, dir_path, snap_name, expected_status):
    """Check that the checkpoint on `snap_name` has status `expected_status`.

    Raises AssertionError when the checkpoint is not listed or its status
    differs. The retry_assert decorator (timeout=600, interval=10)
    presumably repeats the check until it passes -- confirm against
    retry_assert.
    """
    listing = self.checkpoint_list(fs_name, dir_path)
    entry = self.find_checkpoint(listing['checkpoints'], snap_name)
    if entry is not None and entry['status'] == expected_status:
        return
    observed = None if entry is None else entry["status"]
    raise AssertionError(
        f'checkpoint {snap_name}: expected status {expected_status}, '
        f'got {observed!r}')
+
def check_checkpoint_statuses(self, fs_name, dir_path, snap_names, expected_status):
    """Run check_checkpoint_status for every name in `snap_names`, in order."""
    remaining = iter(snap_names)
    for current_snap in remaining:
        self.check_checkpoint_status(
            fs_name, dir_path, current_snap, expected_status)
+
def setup_mount_b(self, mds_perm):
log.debug('reconfigure client auth caps')
self.get_ceph_cmd_result(
log.debug(f'mounting filesystem {self.secondary_fs_name}')
self.mount_b.mount_wait(cephfs_name=self.secondary_fs_name)
def _restrict_mirror_remote_caps(self):
    """Drop OSD write access for client.mirror_remote to make snapshot sync fail.

    The remote mirrored directory has to exist already (it must be created
    while the client still has OSD write). With read-only OSD caps the
    file data transfer during snapshot sync is blocked, which is where
    checkpoint_sync_failed() is recorded.
    """
    read_only_caps = (
        'mds', 'allow rwps',
        'mon', 'allow r',
        'osd', "allow r tag cephfs *=*",
        'mgr', 'allow r',
    )
    self.get_ceph_cmd_result(
        'auth', 'caps', 'client.mirror_remote', *read_only_caps)
+
def _restore_mirror_remote_caps(self):
    """Give client.mirror_remote back the caps the mirror test suite uses."""
    full_caps = (
        'mds', 'allow rwps',
        'mon', 'allow r',
        'osd', "allow rw tag cephfs *=*",
        'mgr', 'allow r',
    )
    self.get_ceph_cmd_result(
        'auth', 'caps', 'client.mirror_remote', *full_caps)
+
def test_basic_mirror_commands(self):
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
self.mount_a.run_shell(["rmdir", dir1])
self.mount_a.run_shell(["rmdir", dir2])
def test_checkpoint_cli_add_list_remove_now(self):
    """Exercise the mgr checkpoint add/list/remove/now commands on snapshot metadata."""
    dir_path = '/cp_cli'
    relative_dir = dir_path.lstrip('/')
    first_snap, second_snap = 'snap0', 'snap1'
    fs_name = self.primary_fs_name

    self._setup_mirrored_directory(dir_path)
    self._add_checkpoint_snapshot(dir_path, first_snap)

    # A freshly added checkpoint is listed as 'created' with both timestamps set.
    listing = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(listing['dir_root'], dir_path)
    entry = self.find_checkpoint(listing['checkpoints'], first_snap)
    self.assertIsNotNone(entry)
    self.assertEqual(entry['status'], 'created')
    for stamp_key in ('created_at', 'updated_at'):
        self.assertTrue(entry[stamp_key])

    # 'checkpoint now' must report the second snapshot.
    self.mount_a.run_shell(["mkdir", f"{relative_dir}/.snap/{second_snap}"])
    now_reply = self.checkpoint_now(fs_name, dir_path)
    self.assertEqual(now_reply['snap_name'], second_snap)

    listing = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(listing['checkpoints']), 2)

    # Removing the first checkpoint leaves only the second one listed.
    self.checkpoint_remove(fs_name, dir_path, first_snap)
    listing = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(listing['checkpoints']), 1)
    self.assertEqual(listing['checkpoints'][0]['snap_name'], second_snap)

    # Removing the second one empties the listing.
    self.checkpoint_remove(fs_name, dir_path, second_snap)
    listing = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(listing['checkpoints'], [])

    self._teardown_mirroring(dir_path)
+
def test_checkpoint_cli_errors(self):
    """Verify the exit codes of checkpoint CLI commands that must fail.

    Covers: add on an untracked directory (ENOENT), remove of a snapshot
    that has no checkpoint (ENOENT), duplicate add (EEXIST), and list with
    mirroring disabled (EINVAL).
    """
    dir_path = '/cp_err'
    tracked = '/cp_err/tracked'
    snap = 'snap0'
    fs_name = self.primary_fs_name

    def expect_failure(command, args, expected_errno, mismatch_text, no_failure_text):
        # Run command(*args); it must raise CommandFailedError with the
        # expected exit status, anything else is reported as RuntimeError.
        try:
            command(*args)
        except CommandFailedError as ce:
            if ce.exitstatus != expected_errno:
                raise RuntimeError(
                    f'expected {mismatch_text}, got {ce.exitstatus}')
        else:
            raise RuntimeError(no_failure_text)

    self._setup_mirrored_directory(tracked)
    self.mount_a.run_shell(["mkdir", f"{tracked.lstrip('/')}/.snap/{snap}"])

    expect_failure(
        self.checkpoint_add, (fs_name, dir_path, snap), errno.ENOENT,
        'ENOENT for untracked directory',
        'expected checkpoint add to fail for untracked directory')

    expect_failure(
        self.checkpoint_remove, (fs_name, tracked, snap), errno.ENOENT,
        'ENOENT when removing non-checkpoint snap',
        'expected checkpoint remove to fail')

    self.checkpoint_add(fs_name, tracked, snap)
    expect_failure(
        self.checkpoint_add, (fs_name, tracked, snap), errno.EEXIST,
        'EEXIST when re-adding checkpoint',
        'expected checkpoint add to fail for duplicate')
    self.checkpoint_remove(fs_name, tracked, snap)

    self.remove_directory(fs_name, self.primary_fs_id, tracked)
    self.disable_mirroring(fs_name, self.primary_fs_id)
    expect_failure(
        self.checkpoint_list, (fs_name, tracked), errno.EINVAL,
        'EINVAL with mirroring disabled',
        'expected checkpoint list to fail')
+
def test_checkpoint_sync_status_reaches_complete(self):
    """A checkpoint goes from 'created' to 'complete' once its snapshot is synced.

    Also checks that 'created_at' is unchanged by the sync while
    'updated_at' changes.
    """
    dir_path = '/cp_sync'
    snap_name = 'snap0'
    peer_spec = "client.mirror_remote@ceph"
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id

    self._setup_mirrored_directory(dir_path, mount_b=True)
    self._add_checkpoint_snapshot(dir_path, snap_name)
    self.check_checkpoint_status(fs_name, dir_path, snap_name, 'created')

    # Remember the timestamps recorded before any peer exists.
    before = self.find_checkpoint(
        self.checkpoint_list(fs_name, dir_path)['checkpoints'], snap_name)
    created_at = before['created_at']
    updated_at = before['updated_at']

    self.peer_add(fs_name, fs_id, peer_spec, self.secondary_fs_name)
    self.check_peer_status(fs_name, fs_id, peer_spec, dir_path, snap_name, 1)
    self.verify_snapshot(dir_path.lstrip('/'), snap_name)
    self.check_checkpoint_status(fs_name, dir_path, snap_name, 'complete')

    after = self.find_checkpoint(
        self.checkpoint_list(fs_name, dir_path)['checkpoints'], snap_name)
    self.assertEqual(after['created_at'], created_at)
    self.assertTrue(after['updated_at'])
    self.assertNotEqual(after['updated_at'], updated_at)

    self._teardown_mirroring(dir_path, peer_spec)
+
def test_checkpoint_deleted_snapshot_not_listed(self):
    """After its snapshot is deleted, a checkpoint no longer appears in checkpoint ls."""
    dir_path = '/cp_del'
    snap_name = 'snap0'
    fs_name = self.primary_fs_name
    snapshot_dir = f"{dir_path.lstrip('/')}/.snap/{snap_name}"

    self._setup_mirrored_directory(dir_path)
    self._add_checkpoint_snapshot(dir_path, snap_name)

    before = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(before['checkpoints']), 1)

    self.mount_a.run_shell(["rmdir", snapshot_dir])
    after = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(after['checkpoints'], [])

    self._teardown_mirroring(dir_path)
+
def test_checkpoint_renamed_snapshot_shows_new_name(self):
    """After a snapshot rename, its checkpoint is listed under the new name only."""
    dir_path = '/cp_rename'
    old_name = 'snap0'
    new_name = 'snap1'
    fs_name = self.primary_fs_name
    snap_root = f"{dir_path.lstrip('/')}/.snap"

    self._setup_mirrored_directory(dir_path)
    self._add_checkpoint_snapshot(dir_path, old_name)

    rename_cmd = ["mv", f"{snap_root}/{old_name}", f"{snap_root}/{new_name}"]
    self.mount_a.run_shell(rename_cmd)

    self.assert_checkpoint_not_listed(fs_name, dir_path, old_name)
    self.check_checkpoint_status(fs_name, dir_path, new_name, 'created')

    self._teardown_mirroring(dir_path)
+
def test_checkpoint_add_after_rename_allows_same_snap_name(self):
    """A new snapshot may reuse a name freed by renaming a checkpointed snapshot.

    Both the renamed snapshot and the new snapshot with the old name must
    end up with a 'created' checkpoint.
    """
    dir_path = '/cp_reuse_name'
    old_name = 'snap0'
    new_name = 'snap1'
    fs_name = self.primary_fs_name
    snap_root = f"{dir_path.lstrip('/')}/.snap"

    self._setup_mirrored_directory(dir_path)
    self._add_checkpoint_snapshot(dir_path, old_name)

    rename_cmd = ["mv", f"{snap_root}/{old_name}", f"{snap_root}/{new_name}"]
    self.mount_a.run_shell(rename_cmd)
    # Recreate a snapshot under the now-free old name and checkpoint it.
    self.mount_a.run_shell(["mkdir", f"{snap_root}/{old_name}"])
    self.checkpoint_add(fs_name, dir_path, old_name)

    listing = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(listing['checkpoints']), 2)
    for listed_name in (old_name, new_name):
        self.check_checkpoint_status(fs_name, dir_path, listed_name, 'created')

    self._teardown_mirroring(dir_path)
+
def test_checkpoint_persisted_across_mirror_daemon_restart(self):
    """A checkpoint survives a mirror daemon restart and completes after sync."""
    dir_path = '/cp_mirror_restart'
    snap_name = 'snap0'
    peer_spec = "client.mirror_remote@ceph"
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id

    self._setup_checkpoint_dir(dir_path, snap_name)
    before = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(before['checkpoints']), 1)

    self.stop_mirror_daemon()
    self.start_mirror_daemon()

    # The checkpoint must still be there, untouched, after the restart.
    after = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(after['checkpoints']), 1)
    self.check_checkpoint_status(fs_name, dir_path, snap_name, 'created')

    self.peer_add(fs_name, fs_id, peer_spec, self.secondary_fs_name)
    self.check_peer_status(fs_name, fs_id, peer_spec, dir_path, snap_name, 1)
    self.verify_snapshot(dir_path.lstrip('/'), snap_name)
    self.check_checkpoint_status(fs_name, dir_path, snap_name, 'complete')

    self._teardown_mirroring(dir_path, peer_spec)
+
def test_checkpoint_persisted_across_mgr_module_restart(self):
    """A checkpoint survives a mirroring mgr module restart and completes after sync."""
    dir_path = '/cp_mgr_restart'
    snap_name = 'snap0'
    peer_spec = "client.mirror_remote@ceph"
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id

    self._setup_checkpoint_dir(dir_path, snap_name)
    before = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(before['checkpoints']), 1)

    self.restart_mirroring_module()

    # The checkpoint must still be there, untouched, after the restart.
    after = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(after['checkpoints']), 1)
    self.check_checkpoint_status(fs_name, dir_path, snap_name, 'created')

    self.peer_add(fs_name, fs_id, peer_spec, self.secondary_fs_name)
    self.check_peer_status(fs_name, fs_id, peer_spec, dir_path, snap_name, 1)
    self.verify_snapshot(dir_path.lstrip('/'), snap_name)
    self.check_checkpoint_status(fs_name, dir_path, snap_name, 'complete')

    self._teardown_mirroring(dir_path, peer_spec)
+
def test_checkpoint_add_on_already_synced_snapshot_is_complete(self):
    """A checkpoint added on an already-synced snapshot must reach 'complete'."""
    dir_path = '/cp_already_synced'
    snap_name = 'snap0'
    peer_spec = "client.mirror_remote@ceph"
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id
    relative_dir = dir_path.lstrip('/')

    self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=True)
    self.mount_a.run_shell(["mkdir", f"{relative_dir}/.snap/{snap_name}"])

    # Wait for the snapshot to be synced before any checkpoint exists.
    self.check_peer_status(fs_name, fs_id, peer_spec, dir_path, snap_name, 1)
    self.verify_snapshot(relative_dir, snap_name)

    self.checkpoint_add(fs_name, dir_path, snap_name)
    self.check_checkpoint_status(fs_name, dir_path, snap_name, 'complete')

    self._teardown_mirroring(dir_path, peer_spec)
+
def test_checkpoint_on_synced_snaps_complete_after_daemon_restart(self):
    """Checkpoints added while the daemon is down complete once it is restarted.

    Five snapshots are synced first; checkpoints are then added on snap0,
    snap2 and snap4 with the daemon stopped, and must be 'created' until
    the daemon is started again, after which they must become 'complete'.
    """
    dir_path = '/cp_daemon_down'
    snap_names = [f'snap{idx}' for idx in range(5)]
    checkpoint_snaps = snap_names[::2]  # snap0, snap2, snap4
    peer_spec = "client.mirror_remote@ceph"
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id
    relative_dir = dir_path.lstrip('/')
    newest_snap = snap_names[-1]

    self._setup_mirrored_directory(dir_path, peer_spec=peer_spec, mount_b=True)

    for name in snap_names:
        self.mount_a.run_shell(["mkdir", f"{relative_dir}/.snap/{name}"])

    self.check_peer_status(fs_name, fs_id, peer_spec, dir_path,
                           newest_snap, len(snap_names))
    self.verify_snapshot(relative_dir, newest_snap)

    self.stop_mirror_daemon()
    for name in checkpoint_snaps:
        self.checkpoint_add(fs_name, dir_path, name)
    self.check_checkpoint_statuses(fs_name, dir_path, checkpoint_snaps, 'created')

    self.start_mirror_daemon()
    self.check_checkpoint_statuses(fs_name, dir_path, checkpoint_snaps, 'complete')

    self._teardown_mirroring(dir_path, peer_spec)
+
def test_checkpoint_add_directory_notify_perf_counter(self):
    """Every checkpoint add must bump the peer's add_directory perf counter."""
    snap_count = 100
    dir_path = '/cp_add_dir_notify'
    dir_name = dir_path.lstrip('/')
    peer_spec = "client.mirror_remote@ceph"
    fs_name, fs_id = self.primary_fs_name, self.primary_fs_id

    self.enable_mirroring(fs_name, fs_id)
    self.peer_add(fs_name, fs_id, peer_spec, self.secondary_fs_name)
    add_dir_baseline = self.get_peer_perf_counters()['add_directory']

    self.mount_a.run_shell(['mkdir', '-p', dir_name])
    self.add_directory(fs_name, fs_id, dir_path, check_perf_counter=False)

    for idx in range(snap_count):
        snap = f'snap{idx}'
        self.mount_a.run_shell(['touch', f'{dir_name}/file.{idx}'])
        self.mount_a.run_shell(['mkdir', f'{dir_name}/.snap/{snap}'])
        self.checkpoint_add(fs_name, dir_path, snap)

    # Expected total: the baseline, plus one add_directory call for the
    # initial mirror add directory, plus one per checkpoint add (each
    # sends an acquire notify).
    expected_total = add_dir_baseline + 1 + snap_count
    self.wait_peer_add_directory_counter(expected_total)

    listing = self.checkpoint_list(fs_name, dir_path)
    self.assertEqual(len(listing['checkpoints']), snap_count)

    self._teardown_mirroring(dir_path, peer_spec)
+
def test_checkpoint_failed_then_complete(self):
    """Checkpoint is marked failed on sync error and complete after recovery.

    Flow: populate the directory, checkpoint a snapshot, add the peer, stop
    the daemon as soon as the snapshot sync is in progress, restrict the
    remote client's OSD caps to read-only and start the daemon again so the
    sync fails. After the caps are restored and the daemon restarted, the
    snapshot must sync and the checkpoint must become 'complete' without an
    'error_msg'.
    """
    dir_path = '/cp_failed'
    snap_name = 'snap0'
    peer_spec = "client.mirror_remote@ceph"

    self._setup_mirrored_directory(dir_path, mount_b=True)
    dir_name = dir_path.lstrip('/')
    # Many small files plus 20 x 100 MB files; presumably sized so the sync
    # is still running when the daemon gets stopped below.
    self.mount_a.create_n_files(f'{dir_name}/file', 10000, sync=True)
    for i in range(20):
        self.mount_a.write_n_mb(os.path.join(dir_name, f'large_file.{i}'), 100)
    self._add_checkpoint_snapshot(dir_path, snap_name)
    self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'created')

    # peer_add needs write access on the remote MDS (setxattr on ceph.mirror.info).
    # Let the daemon create the remote dir with full caps, then stop it as soon as
    # sync starts, restrict OSD write, and start again (checkpoint_sync_failed() path).
    self.run_ceph_cmd("fs", "snapshot", "mirror", "peer_add",
                      self.primary_fs_name, peer_spec, self.secondary_fs_name)
    self.stop_daemon_when_peer_snap_in_progress(self.primary_fs_name, self.primary_fs_id,
                                                peer_spec, dir_path, snap_name)
    self._restrict_mirror_remote_caps()
    try:
        self.start_mirror_daemon()

        self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'failed')
        res = self.checkpoint_list(self.primary_fs_name, dir_path)
        cp = self.find_checkpoint(res['checkpoints'], snap_name)
        self.assertIn('error_msg', cp)
        self.assertTrue(cp['error_msg'])
    finally:
        # Restore the caps even when an assertion above fails, so the
        # read-only OSD cap on client.mirror_remote cannot leak into
        # whatever runs after this test.
        self._restore_mirror_remote_caps()

    # Restart the daemon with the same stop/start helpers used by the other
    # restart tests in this file.
    self.stop_mirror_daemon()
    self.start_mirror_daemon()

    self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                           peer_spec, dir_path, snap_name, 1)
    self.verify_snapshot(dir_path.lstrip('/'), snap_name)
    self.check_checkpoint_status(self.primary_fs_name, dir_path, snap_name, 'complete')

    res = self.checkpoint_list(self.primary_fs_name, dir_path)
    cp = self.find_checkpoint(res['checkpoints'], snap_name)
    self.assertNotIn('error_msg', cp)

    self._teardown_mirroring(dir_path, peer_spec)
+
def test_add_relative_directory_path(self):
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
try:
sync_dir = 'mgr_nc_sync'
self.mount_a.run_shell(['mkdir', sync_dir])
self.mount_a.create_n_files(f'{sync_dir}/file', 10000, sync=True)
+ for i in range(20):
+ self.mount_a.write_n_mb(os.path.join(sync_dir, f'large_file.{i}'), 100)
self.add_directory(self.primary_fs_name, self.primary_fs_id,
f'/{sync_dir}')
snap_name = 'snap0'