NFS_POOL_NAME = '.nfs' # should match mgr_module.py
# TODO Add test for cluster update when ganesha can be deployed on multiple ports.
+
+
class TestNFS(MgrTestCase):
def _cmd(self, *args):
return self.get_ceph_cmd_stdout(args)
return
self.fail(fail_msg)
- def _check_auth_ls(self, fs_name, check_in=False):
+ def _check_auth_ls(self, fs_name, check_in=False, user_id=None):
'''
Tests export user id creation or deletion.
:param export_id: Denotes export number
'''
output = self._cmd('auth', 'ls')
client_id = f'client.nfs.{self.cluster_id}'
+ search_id = f'client.{user_id}' if user_id else f'{client_id}.{fs_name}'
+
if check_in:
- self.assertIn(f'{client_id}.{fs_name}', output)
+ self.assertIn(search_id, output)
else:
- self.assertNotIn(f'{client_id}.{fs_name}', output)
+ self.assertNotIn(search_id, output)
def _test_idempotency(self, cmd_func, cmd_args):
'''
self._test_create_cluster()
self._create_export(export_id='1', create_fs=True)
- def _delete_export(self):
+ def _delete_export(self, pseduo_path=None):
'''
Delete an export.
'''
- self._nfs_cmd('export', 'rm', self.cluster_id, self.pseudo_path)
+ self._nfs_cmd('export', 'rm', self.cluster_id, pseduo_path if pseduo_path else self.pseudo_path)
self._check_auth_ls(self.fs_name)
def _test_list_export(self):
self.sample_export['fsal']['user_id'] = f'{self.expected_name}.{self.fs_name}.3746f603'
self.assertDictEqual(self.sample_export, nfs_output[3])
- def _get_export(self):
+ def _get_export(self, pseudo_path=None):
'''
Returns export block in json format
'''
- return json.loads(self._nfs_cmd('export', 'info', self.cluster_id, self.pseudo_path))
+ return json.loads(self._nfs_cmd('export', 'info', self.cluster_id,
+ pseudo_path if pseudo_path else self.pseudo_path))
def _test_get_export(self):
'''
finally:
self._delete_cluster_with_fs(self.fs_name, mnt_pt)
self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}'])
+
+ def test_cephfs_export_update_with_nonexistent_dir(self):
+ """
+ Test that invalid path is not allowed while updating a CephFS
+ export.
+ """
+ self._create_cluster_with_fs(self.fs_name)
+ self._create_export(export_id=1)
+
+ try:
+ self.update_export(self.cluster_id, "/not_existent_dir",
+ self.pseudo_path, self.fs_name)
+ except CommandFailedError as e:
+ if e.exitstatus != errno.ENOENT:
+ raise
+
+ self._delete_export()
+ self._delete_cluster_with_fs(self.fs_name)
+
+ def test_cephfs_export_update_at_non_dir_path(self):
+ """
+ Test that non-directory path are not allowed while updating a CephFS
+ export.
+ """
+ mnt_pt = '/mnt'
+ preserve_mode = self._sys_cmd(['stat', '-c', '%a', mnt_pt])
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ try:
+ self.ctx.cluster.run(args=['touch', f'{mnt_pt}/testfile'])
+ self._create_export(export_id=1)
+
+ # test at a file path
+ try:
+ self.update_export(self.cluster_id, "/testfile",
+ self.pseudo_path, self.fs_name)
+ except CommandFailedError as e:
+ if e.exitstatus != errno.ENOTDIR:
+ raise
+
+ # test at a symlink path
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir'])
+ self.ctx.cluster.run(args=['ln', '-s', f'{mnt_pt}/testdir',
+ f'{mnt_pt}/testdir_symlink'])
+ try:
+ self.update_export(self.cluster_id, "/testdir_symlink",
+ self.pseudo_path, self.fs_name)
+ except CommandFailedError as e:
+ if e.exitstatus != errno.ENOTDIR:
+ raise
+
+ # verify the path wasn't changed
+ export = json.loads(self._nfs_cmd("export", "ls",
+ self.cluster_id, "--detailed"))
+ self.assertEqual(export[0]["pseudo"], "/cephfs")
+
+ finally:
+ self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}/*'])
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt, preserve_mode)
+
+ def test_nfs_export_creation_without_cmount_path(self):
+ """
+ Test that ensure cmount_path is present in FSAL block
+ """
+ pseudo_path = '/test_without_cmount'
+ self._create_export(export_id='1234',
+ extra_cmd=['--pseudo-path', pseudo_path])
+ nfs_output = self._get_export(pseudo_path)
+ self.assertIn('cmount_path', nfs_output['fsal'])
+
+ self._delete_export(pseudo_path)
+
+ def test_nfs_exports_with_same_and_diff_user_id(self):
+ """
+ Test that exports with same FSAL share same user_id
+ """
+ pseudo_path_1 = '/test1'
+ pseudo_path_2 = '/test2'
+ pseudo_path_3 = '/test3'
+
+ # Create subvolumes
+ self._cmd('fs', 'subvolume', 'create', self.fs_name, 'sub_vol_1')
+ self._cmd('fs', 'subvolume', 'create', self.fs_name, 'sub_vol_2')
+
+ fs_path_1 = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol_1').strip()
+ fs_path_2 = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol_2').strip()
+ # Both exports should have same user_id(since cmount_path=/ & fs_name is same)
+ self._create_export(export_id='21',
+ extra_cmd=['--pseudo-path', pseudo_path_1,
+ '--path', fs_path_1])
+ self._create_export(export_id='22',
+ extra_cmd=['--pseudo-path', pseudo_path_2,
+ '--path', fs_path_2])
+
+ nfs_output_1 = self._get_export(pseudo_path_1)
+ nfs_output_2 = self._get_export(pseudo_path_2)
+ # Check if both exports have same user_id
+ self.assertEqual(nfs_output_2['fsal']['user_id'], nfs_output_1['fsal']['user_id'])
+ self.assertEqual(nfs_output_1['fsal']['user_id'], 'nfs.test.nfs-cephfs.3746f603')
+
+ cmount_path = '/volumes'
+ self._create_export(export_id='23',
+ extra_cmd=['--pseudo-path', pseudo_path_3,
+ '--path', fs_path_1,
+ '--cmount-path', cmount_path])
+
+ nfs_output_3 = self._get_export(pseudo_path_3)
+ self.assertNotEqual(nfs_output_3['fsal']['user_id'], nfs_output_1['fsal']['user_id'])
+ self.assertEqual(nfs_output_3['fsal']['user_id'], 'nfs.test.nfs-cephfs.32cd8545')
+
+ self._delete_export(pseudo_path_1)
+ # Deleting export with same user_id should not delete the user_id
+ self._check_auth_ls(self.fs_name, True, nfs_output_2['fsal']['user_id'])
+ self._delete_export(pseudo_path_2)
+ # Deleting export 22 should delete the user_id since it's only export left with that user_id
+ self._check_auth_ls(self.fs_name, False, nfs_output_2['fsal']['user_id'])
+
+ self._delete_export(pseudo_path_3)
+ # Deleting export 23 should delete the user_id since it's only export with that user_id
+ self._check_auth_ls(self.fs_name, False, nfs_output_3['fsal']['user_id'])