self._cmd('fs', 'volume', 'rm', fs_name, '--yes-i-really-mean-it')
self._test_delete_cluster()
+ def _nfs_export_apply(self, cluster, exports, raise_on_error=False):
+ return self.ctx.cluster.run(args=['ceph', 'nfs', 'export', 'apply',
+ cluster, '-i', '-'],
+ check_status=raise_on_error,
+ stdin=json.dumps(exports),
+ stdout=StringIO(), stderr=StringIO())
+
def test_create_and_delete_cluster(self):
'''
Test successful creation and deletion of the nfs cluster.
raise
self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}/*'])
self._delete_cluster_with_fs(self.fs_name, mnt_pt, preserve_mode)
+
+ def test_nfs_export_apply_multiple_exports(self):
+ """
+ Test multiple export creation/update with multiple
+ export blocks provided in the json/conf file using:
+ ceph nfs export apply <nfs_cluster> -i <{conf/json}_file>, and check
+ 1) if there are multiple failure:
+ -> Return the EIO and error status to CLI (along with JSON output
+ containing status of every export).
+ 2) if there is single failure:
+ -> Return the respective errno and error status to CLI (along with
+ JSON output containing status of every export).
+ """
+
+ mnt_pt = self._sys_cmd(['mktemp', '-d']).decode().strip()
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ try:
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir1'])
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir2'])
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir3'])
+ self._create_export(export_id='1',
+ extra_cmd=['--pseudo-path', self.pseudo_path,
+ '--path', '/testdir1'])
+ self._create_export(export_id='2',
+ extra_cmd=['--pseudo-path',
+ self.pseudo_path+'2',
+ '--path', '/testdir2'])
+ exports = [
+ {
+ "export_id": 11, # export_id change not allowed
+ "path": "/testdir1",
+ "pseudo": self.pseudo_path,
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "nfs.test.1",
+ "fs_name": self.fs_name
+ }
+ },
+ {
+ "export_id": 2,
+ "path": "/testdir2",
+ "pseudo": self.pseudo_path+'2',
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "nfs.test.2",
+ "fs_name": "invalid_fs_name" # invalid fs
+ }
+ },
+ { # no error, export creation should succeed
+ "export_id": 3,
+ "path": "/testdir3",
+ "pseudo": self.pseudo_path+'3',
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "nfs.test.3",
+ "fs_name": self.fs_name
+ }
+ }
+ ]
+
+ # multiple failures
+ ret = self._nfs_export_apply(self.cluster_id, exports)
+ self.assertEqual(ret[0].returncode, errno.EIO)
+ self.assertIn("2 export blocks (at index 1, 2) failed to be "
+ "created/updated", ret[0].stderr.getvalue())
+
+ # single failure
+ exports[1]["fsal"]["fs_name"] = self.fs_name # correct the fs
+ ret = self._nfs_export_apply(self.cluster_id, exports)
+ self.assertEqual(ret[0].returncode, errno.EINVAL)
+ self.assertIn("Export ID changed, Cannot update export for "
+ "export block at index 1", ret[0].stderr.getvalue())
+ finally:
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt)
+ self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}'])
+
+ def test_nfs_export_apply_single_export(self):
+ """
+ Test that when single export creation/update fails with multiple
+ export blocks provided in the json/conf file using:
+ ceph nfs export apply <nfs_cluster> -i <{conf/json}_file>, it
+ returns the respective errno and error status to CLI (along with
+ JSON output containing status of every export).
+ """
+
+ mnt_pt = self._sys_cmd(['mktemp', '-d']).decode().strip()
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ try:
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir1'])
+ self._create_export(export_id='1',
+ extra_cmd=['--pseudo-path', self.pseudo_path,
+ '--path', '/testdir1'])
+ export = {
+ "export_id": 1,
+ "path": "/testdir1",
+ "pseudo": self.pseudo_path,
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "nfs.test.1",
+ "fs_name": "invalid_fs_name" # invalid fs
+ }
+ }
+ ret = self._nfs_export_apply(self.cluster_id, export)
+ self.assertEqual(ret[0].returncode, errno.ENOENT)
+ self.assertIn("filesystem invalid_fs_name not found for "
+ "export block at index 1", ret[0].stderr.getvalue())
+ finally:
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt)
+ self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}'])
+
+ def test_nfs_export_apply_json_output_states(self):
+ """
+ If export creation/update is done using:
+ ceph nfs export apply <nfs_cluster> -i <{conf/json}_file> then the
+ "status" field in the json output maybe added, updated, error or
+ warning. Test different scenarios to make sure these states are
+ in the json output as expected.
+ """
+
+ mnt_pt = self._sys_cmd(['mktemp', '-d']).decode().strip()
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ try:
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir1'])
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir2'])
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir3'])
+ self._create_export(export_id='1',
+ extra_cmd=['--pseudo-path', self.pseudo_path,
+ '--path', '/testdir1'])
+ exports = [
+ { # change pseudo, state should be "updated"
+ "export_id": 1,
+ "path": "/testdir1",
+ "pseudo": self.pseudo_path+'1',
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "nfs.test.1",
+ "fs_name": self.fs_name
+ }
+ },
+ { # a new export, state should be "added"
+ "export_id": 2,
+ "path": "/testdir2",
+ "pseudo": self.pseudo_path+'2',
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "nfs.test.2",
+ "fs_name": self.fs_name
+ }
+ },
+ { # error in export block, state should be "error" since the
+ # fs_name is invalid
+ "export_id": 3,
+ "path": "/testdir3",
+ "pseudo": self.pseudo_path+'3',
+ "squash": "none",
+ "access_type": "RW",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "user_id": "nfs.test.3",
+ "fs_name": "invalid_fs_name"
+ }
+ }
+ ]
+ ret = self._nfs_export_apply(self.cluster_id, exports)
+ json_output = json.loads(ret[0].stdout.getvalue().strip())
+ self.assertEqual(len(json_output), 3)
+ self.assertEqual(json_output[0]["state"], "updated")
+ self.assertEqual(json_output[1]["state"], "added")
+ self.assertEqual(json_output[2]["state"], "error")
+ finally:
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt)
+ self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}'])