else:
self.fail('expected write to a read-only export to fail')
+ def _create_cluster_with_fs(self, fs_name, mnt_pt=None):
+ """
+ create a cluster along with fs and mount it to the path supplied
+ :param fs_name: name of CephFS volume to be created
+ :param mnt_pt: mount fs to the path
+ """
+ self._test_create_cluster()
+ self._cmd('fs', 'volume', 'create', fs_name)
+ with contextutil.safe_while(sleep=5, tries=30) as proceed:
+ while proceed():
+ output = self._cmd(
+ 'orch', 'ls', '-f', 'json',
+ '--service-name', f'mds.{fs_name}'
+ )
+ j = json.loads(output)
+ if j[0]['status']['running']:
+ break
+ if mnt_pt:
+ with contextutil.safe_while(sleep=3, tries=3) as proceed:
+ while proceed():
+ try:
+ self.ctx.cluster.run(args=['sudo', 'ceph-fuse', mnt_pt])
+ break
+ except CommandFailedError as e:
+ log.warning(f'{e}, retrying')
+ self.ctx.cluster.run(args=['sudo', 'chmod', '1777', mnt_pt])
+
+ def _delete_cluster_with_fs(self, fs_name, mnt_pt=None, mode=None):
+ """
+ delete cluster along with fs and unmount it from the path supplied
+ :param fs_name: name of CephFS volume to be deleted
+ :param mnt_pt: unmount fs from the path
+ :param mode: revert to this mode
+ """
+ if mnt_pt:
+ self.ctx.cluster.run(args=['sudo', 'umount', mnt_pt])
+ if mode:
+ if isinstance(mode, bytes):
+ mode = mode.decode().strip()
+ self.ctx.cluster.run(args=['sudo', 'chmod', mode, mnt_pt])
+ self._cmd('fs', 'volume', 'rm', fs_name, '--yes-i-really-mean-it')
+ self._test_delete_cluster()
+
def test_create_and_delete_cluster(self):
'''
Test successful creation and deletion of the nfs cluster.
if e.exitstatus != errno.ENOENT:
raise
self._test_delete_cluster()
+
+ def test_nfs_export_creation_at_filepath(self):
+ """
+ Test that nfs exports can't be created at a filepath
+ """
+ mnt_pt = '/mnt'
+ preserve_mode = self._sys_cmd(['stat', '-c', '%a', mnt_pt])
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ self.ctx.cluster.run(args=['touch', f'{mnt_pt}/testfile'])
+ try:
+ self._create_export(export_id='123', extra_cmd=['--pseudo-path',
+ self.pseudo_path,
+ '--path',
+ '/testfile'])
+ except CommandFailedError as e:
+ # NotADirectoryError is raised as EINVAL
+ if e.exitstatus != errno.EINVAL:
+ raise
+ self.ctx.cluster.run(args=['rm', '-rf', '/mnt/testfile'])
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt, preserve_mode)
+
+ def test_nfs_export_creation_at_symlink(self):
+ """
+ Test that nfs exports can't be created at a symlink path
+ """
+ mnt_pt = '/mnt'
+ preserve_mode = self._sys_cmd(['stat', '-c', '%a', mnt_pt])
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir'])
+ self.ctx.cluster.run(args=['ln', '-s', 'testdir', 'testdir_symlink'],
+ cwd=f'{mnt_pt}')
+ try:
+ self._create_export(export_id='123',
+ extra_cmd=['--pseudo-path',
+ self.pseudo_path,
+ '--path',
+ f'{mnt_pt}/testdir_symlink'])
+ except CommandFailedError as e:
+ if e.exitstatus != errno.ENOENT:
+ raise
+ self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}/*'])
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt, preserve_mode)