finally:
self._delete_cluster_with_fs(self.fs_name, mnt_pt)
self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}'])
+
+ def test_pseudo_path_in_json_response_when_updating_exports_failed(self):
+ """
+ Test that on export update/creation failure while using
+ ceph nfs export apply <nfs_cluster> -i <json/conf>, the failed
+ exports pseudo paths are visible in the JSON response to CLI and the
+ return code is set to EIO.
+ """
+ mnt_pt = self._sys_cmd(['mktemp', '-d']).decode().strip()
+ self._create_cluster_with_fs(self.fs_name, mnt_pt)
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir1'])
+ self.ctx.cluster.run(args=['mkdir', f'{mnt_pt}/testdir2'])
+ self._create_export(export_id='1',
+ extra_cmd=['--pseudo-path', self.pseudo_path])
+
+ ret = self.ctx.cluster.run(args=['ceph', 'nfs', 'export', 'apply',
+ self.cluster_id, '-i', '-'],
+ check_status=False,
+ stdin=json.dumps([
+ {
+ "export_id": 11, # change not allowed
+ "path": "/testdir1",
+ "pseudo": self.pseudo_path,
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "fs_name": self.fs_name
+ }
+ },
+ {
+ "path": "/testdir2",
+ "pseudo": self.pseudo_path+'1',
+ "squash": "none",
+ "access_type": "rw",
+ "protocols": [4],
+ "fsal": {
+ "name": "CEPH",
+ "fs_name": "foo" # invalid fs
+ }
+ }]),
+ stdout=StringIO(), stderr=StringIO())
+
+ try:
+ # EIO since multiple exports failure (first export failed to be
+ # modified while the second one failed to be created)
+ self.assertEqual(ret[0].returncode, errno.EIO)
+ err_info = ret[0].stdout
+ if err_info:
+ update_details = json.loads(err_info.getvalue())
+ self.assertEqual(update_details[0]["pseudo"], self.pseudo_path)
+ self.assertEqual(update_details[1]["pseudo"], self.pseudo_path+'1')
+ else:
+ self.fail("Could not retrieve any export update data")
+
+ # verify second export wasn't created
+ exports = json.loads(self._nfs_cmd('export', 'ls',
+ self.cluster_id, '--detailed'))
+ self.assertEqual(len(exports), 1)
+
+ finally:
+ self._delete_cluster_with_fs(self.fs_name, mnt_pt)
+ self.ctx.cluster.run(args=['rm', '-rf', f'{mnt_pt}'])