From: Avan Thakkar Date: Thu, 29 Aug 2024 10:39:14 +0000 (+0530) Subject: mgr/nfs: add additional tests for cmount_path & user_id deletion X-Git-Tag: v19.2.1~24^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0e6c5212aafe5344bcd8925f588d65dd95177dd2;p=ceph.git mgr/nfs: add additional tests for cmount_path & user_id deletion Add unit tests for unique user ID generation, deletion and `cmount_path` handling in FSAL exports - Ensure unique user ID generation for different FSAL blocks when creating exports. - Test deletion behavior when multiple exports share the same user ID and one has a unique ID. - Test default behavior when no `cmount_path` is provided (defaults to `/`). - Add tests to validate error handling for invalid `cmount_path` values. Signed-off-by: Avan Thakkar (cherry picked from commit b8f3db5b4042b737a4146833269f82512421e0a5) --- diff --git a/qa/tasks/cephfs/test_nfs.py b/qa/tasks/cephfs/test_nfs.py index 8d4f70828439..12c28e35f657 100644 --- a/qa/tasks/cephfs/test_nfs.py +++ b/qa/tasks/cephfs/test_nfs.py @@ -14,6 +14,8 @@ log = logging.getLogger(__name__) NFS_POOL_NAME = '.nfs' # should match mgr_module.py # TODO Add test for cluster update when ganesha can be deployed on multiple ports. + + class TestNFS(MgrTestCase): def _cmd(self, *args): return self.get_ceph_cmd_stdout(args) @@ -119,7 +121,7 @@ class TestNFS(MgrTestCase): return self.fail(fail_msg) - def _check_auth_ls(self, fs_name, check_in=False): + def _check_auth_ls(self, fs_name, check_in=False, user_id=None): ''' Tests export user id creation or deletion. :param export_id: Denotes export number @@ -127,10 +129,12 @@ class TestNFS(MgrTestCase): ''' output = self._cmd('auth', 'ls') client_id = f'client.nfs.{self.cluster_id}' + search_id = f'client.{user_id}' if user_id else f'{client_id}.{fs_name}' + if check_in: - self.assertIn(f'{client_id}.{fs_name}', output) + self.assertIn(search_id, output) else: - self.assertNotIn(f'{client_id}.{fs_name}', output) + self.assertNotIn(search_id, output) def _test_idempotency(self, cmd_func, cmd_args): ''' @@ -231,11 +235,11 @@ class TestNFS(MgrTestCase): self._test_create_cluster() self._create_export(export_id='1', create_fs=True) - def _delete_export(self): + def _delete_export(self, pseduo_path=None): ''' Delete an export. ''' - self._nfs_cmd('export', 'rm', self.cluster_id, self.pseudo_path) + self._nfs_cmd('export', 'rm', self.cluster_id, pseduo_path if pseduo_path else self.pseudo_path) self._check_auth_ls(self.fs_name) def _test_list_export(self): @@ -272,11 +276,12 @@ class TestNFS(MgrTestCase): self.sample_export['fsal']['user_id'] = f'{self.expected_name}.{self.fs_name}.3746f603' self.assertDictEqual(self.sample_export, nfs_output[3]) - def _get_export(self): + def _get_export(self, pseudo_path=None): ''' Returns export block in json format ''' - return json.loads(self._nfs_cmd('export', 'info', self.cluster_id, self.pseudo_path)) + return json.loads(self._nfs_cmd('export', 'info', self.cluster_id, + pseudo_path if pseudo_path else self.pseudo_path)) def _test_get_export(self): ''' @@ -1139,3 +1144,122 @@ class TestNFS(MgrTestCase): finally: self._delete_cluster_with_fs(self.fs_name, mnt_pt) self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}']) + + def test_cephfs_export_update_with_nonexistent_dir(self): + """ + Test that invalid path is not allowed while updating a CephFS + export. + """ + self._create_cluster_with_fs(self.fs_name) + self._create_export(export_id=1) + + try: + self.update_export(self.cluster_id, "/not_existent_dir", + self.pseudo_path, self.fs_name) + except CommandFailedError as e: + if e.exitstatus != errno.ENOENT: + raise + + self._delete_export() + self._delete_cluster_with_fs(self.fs_name) + + def test_cephfs_export_update_at_non_dir_path(self): + """ + Test that non-directory path are not allowed while updating a CephFS + export. + """ + mnt_pt = '/mnt' + preserve_mode = self._sys_cmd(['stat', '-c', '%a', mnt_pt]) + self._create_cluster_with_fs(self.fs_name, mnt_pt) + try: + self.ctx.cluster.run(args=['touch', f'{mnt_pt}/testfile']) + self._create_export(export_id=1) + + # test at a file path + try: + self.update_export(self.cluster_id, "/testfile", + self.pseudo_path, self.fs_name) + except CommandFailedError as e: + if e.exitstatus != errno.ENOTDIR: + raise + + # test at a symlink path + self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir']) + self.ctx.cluster.run(args=['ln', '-s', f'{mnt_pt}/testdir', + f'{mnt_pt}/testdir_symlink']) + try: + self.update_export(self.cluster_id, "/testdir_symlink", + self.pseudo_path, self.fs_name) + except CommandFailedError as e: + if e.exitstatus != errno.ENOTDIR: + raise + + # verify the path wasn't changed + export = json.loads(self._nfs_cmd("export", "ls", + self.cluster_id, "--detailed")) + self.assertEqual(export[0]["pseudo"], "/cephfs") + + finally: + self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}/*']) + self._delete_cluster_with_fs(self.fs_name, mnt_pt, preserve_mode) + + def test_nfs_export_creation_without_cmount_path(self): + """ + Test that ensure cmount_path is present in FSAL block + """ + pseudo_path = '/test_without_cmount' + self._create_export(export_id='1234', + extra_cmd=['--pseudo-path', pseudo_path]) + nfs_output = self._get_export(pseudo_path) + self.assertIn('cmount_path', nfs_output['fsal']) + + self._delete_export(pseudo_path) + + def test_nfs_exports_with_same_and_diff_user_id(self): + """ + Test that exports with same FSAL share same user_id + """ + pseudo_path_1 = '/test1' + pseudo_path_2 = '/test2' + pseudo_path_3 = '/test3' + + # Create subvolumes + self._cmd('fs', 'subvolume', 'create', self.fs_name, 'sub_vol_1') + self._cmd('fs', 'subvolume', 'create', self.fs_name, 'sub_vol_2') + + fs_path_1 = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol_1').strip() + fs_path_2 = self._cmd('fs', 'subvolume', 'getpath', self.fs_name, 'sub_vol_2').strip() + # Both exports should have same user_id(since cmount_path=/ & fs_name is same) + self._create_export(export_id='21', + extra_cmd=['--pseudo-path', pseudo_path_1, + '--path', fs_path_1]) + self._create_export(export_id='22', + extra_cmd=['--pseudo-path', pseudo_path_2, + '--path', fs_path_2]) + + nfs_output_1 = self._get_export(pseudo_path_1) + nfs_output_2 = self._get_export(pseudo_path_2) + # Check if both exports have same user_id + self.assertEqual(nfs_output_2['fsal']['user_id'], nfs_output_1['fsal']['user_id']) + self.assertEqual(nfs_output_1['fsal']['user_id'], 'nfs.test.nfs-cephfs.3746f603') + + cmount_path = '/volumes' + self._create_export(export_id='23', + extra_cmd=['--pseudo-path', pseudo_path_3, + '--path', fs_path_1, + '--cmount-path', cmount_path]) + + nfs_output_3 = self._get_export(pseudo_path_3) + self.assertNotEqual(nfs_output_3['fsal']['user_id'], nfs_output_1['fsal']['user_id']) + self.assertEqual(nfs_output_3['fsal']['user_id'], 'nfs.test.nfs-cephfs.32cd8545') + + self._delete_export(pseudo_path_1) + # Deleting export with same user_id should not delete the user_id + self._check_auth_ls(self.fs_name, True, nfs_output_2['fsal']['user_id']) + self._delete_export(pseudo_path_2) + # Deleting export 22 should delete the user_id since it's only export left with that user_id + self._check_auth_ls(self.fs_name, False, nfs_output_2['fsal']['user_id']) + + self._delete_export(pseudo_path_3) + # Deleting export 23 should delete the user_id since it's only export with that user_id + self._check_auth_ls(self.fs_name, False, nfs_output_3['fsal']['user_id']) diff --git a/src/pybind/mgr/nfs/tests/test_nfs.py b/src/pybind/mgr/nfs/tests/test_nfs.py index 268e51e64c82..edf8bab37a16 100644 --- a/src/pybind/mgr/nfs/tests/test_nfs.py +++ b/src/pybind/mgr/nfs/tests/test_nfs.py @@ -1119,6 +1119,9 @@ NFS_CORE_PARAM { def test_create_export_cephfs_with_cmount_path(self): self._do_mock_test(self._do_test_create_export_cephfs_with_cmount_path) + + def test_create_export_cephfs_with_invalid_cmount_path(self): + self._do_mock_test(self._do_test_create_export_cephfs_with_invalid_cmount_path) def _do_test_create_export_cephfs(self): nfs_mod = Module('nfs', '', '') @@ -1193,6 +1196,25 @@ NFS_CORE_PARAM { assert export.fsal.cephx_key == "thekeyforclientabc" assert export.fsal.cmount_path == "/" assert export.cluster_id == self.cluster_id + + def _do_test_create_export_cephfs_with_invalid_cmount_path(self): + import object_format + + nfs_mod = Module('nfs', '', '') + conf = ExportMgr(nfs_mod) + + with pytest.raises(object_format.ErrorResponse) as e: + conf.create_export( + fsal_type='cephfs', + cluster_id=self.cluster_id, + fs_name='myfs', + path='/', + pseudo_path='/cephfs4', + read_only=False, + squash='root', + cmount_path='/invalid', + ) + assert "Invalid cmount_path: '/invalid'" in str(e.value) def _do_test_cluster_ls(self): nfs_mod = Module('nfs', '', '')