@RESTController.MethodMap(version=APIVersion(2, 0))  # type: ignore
def create(self, path, cluster_id, pseudo, access_type,
           squash, security_label, protocols, transports, fsal, clients) -> Dict[str, Any]:
    """Create a new NFS-Ganesha export through the mgr/nfs module.

    Delegates validation (e.g. duplicate pseudo detection) to
    ``nfs.export_apply`` and surfaces its error status to the caller.

    :param cluster_id: NFS cluster the export belongs to.
    :param fsal: FSAL config dict; any caller-supplied ``user_id`` is
        discarded because mgr/nfs does not let you customize it.
    :return: the schema-formatted export that was applied.
    :raises NFSException: if the mgr/nfs module reports an error applying
        the export config.
    """
    # `fsal` is a dict (we call .pop on it), so membership must be tested
    # with `in` / pop-with-default — hasattr() checks attributes and is
    # always False for plain dicts, which made the old pop dead code.
    fsal.pop('user_id', None)  # mgr/nfs does not let you customize user_id
    # NOTE(review): only 'fsal' and 'clients' are forwarded here; confirm
    # against mgr/nfs whether path/pseudo/etc. must also be in the payload.
    raw_ex = {
        'fsal': fsal,
        'clients': clients
    }
    result = mgr.remote('nfs', 'export_apply', cluster_id, json.dumps(raw_ex))
    if result.has_error:
        # Prefer the mgr module's own status message when it provides one.
        raise NFSException(
            result.mgr_status_value() or 'Failed to create export'
        )

    return self._get_schema_export(raw_ex)
@EndpointDoc("Get an NFS-Ganesha export",
parameters={
}
existing_export = mgr.remote('nfs', 'export_get', cluster_id, export_id)
- export_mgr = mgr.remote('nfs', 'fetch_nfs_export_obj')
if existing_export and raw_ex:
ss_export_fsal = existing_export.get('fsal', {})
for key, value in ss_export_fsal.items():
raw_ex['fsal'][key] = value
- applied_exports = export_mgr.apply_export(cluster_id, json.dumps(raw_ex))
- if not applied_exports.has_error:
- return self._get_schema_export(
- export_mgr.get_export_by_pseudo(cluster_id, pseudo))
- raise NFSException(f"Export creation failed {applied_exports.changes[0].msg}")
+
+ result = mgr.remote('nfs', 'export_apply', cluster_id, json.dumps(raw_ex))
+ if result.has_error:
+ raise NFSException(
+ result.mgr_status_value() or 'Failed to update export'
+ )
+ return self._get_schema_export(raw_ex)
@NfsTask('delete', {'cluster_id': '{cluster_id}',
'export_id': '{export_id}'}, 2.0)
self.assertJsonBody(self._expected_export)
def test_create_export(self):
    """A successful POST returns 201 and echoes the created export."""
    export_create_body = deepcopy(self._expected_export)
    del export_create_body['export_id']  # server assigns the id
    export_create_body['pseudo'] = 'new-pseudo'

    # Simulate mgr.remote('nfs', 'export_apply', ...) succeeding.
    applied_nfs_export = deepcopy(self._applied_export)
    applied_nfs_export.has_error = False
    mgr.remote.return_value = applied_nfs_export

    self._post('/api/nfs-ganesha/export',
               export_create_body,
               version=APIVersion(2, 0))
    self.assertStatus(201)
    self.assertJsonBody(export_create_body)
def test_create_export_with_existing_pseudo_fails(self):
    """A duplicate pseudo path surfaces the mgr/nfs error as HTTP 400."""
    export_create_body = deepcopy(self._expected_export)
    del export_create_body['export_id']
    pseudo = export_create_body["pseudo"]

    # Simulate mgr.remote('nfs', 'export_apply', ...) reporting a conflict.
    applied_nfs_export = deepcopy(self._applied_export)
    applied_nfs_export.has_error = True
    expected_error_string = f"Pseudo {pseudo} is already in use for export block at index 1"
    applied_nfs_export.mgr_status_value = Mock(return_value=expected_error_string)
    mgr.remote.return_value = applied_nfs_export

    self._post('/api/nfs-ganesha/export',
               export_create_body,
               version=APIVersion(2, 0))
    self.assertStatus(400)
    response = self.json_body()
    self.assertIn(expected_error_string, response['detail'])
def test_set_export(self):
    """Updating an export merges the existing FSAL config and returns 200."""
    existing_export = deepcopy(self._nfs_module_export)
    updated_nfs_export = deepcopy(self._nfs_module_export)
    applied_nfs_export = deepcopy(self._applied_export)

    # The controller must preserve the server-side user_id on update.
    existing_export['fsal']['user_id'] = 'dashboard'
    updated_nfs_export['pseudo'] = 'updated-pseudo'
    applied_nfs_export.append(updated_nfs_export)
    applied_nfs_export.has_error = False

    # First remote call fetches the existing export ('export_get'),
    # second applies the update ('export_apply').
    mgr.remote.side_effect = [existing_export, applied_nfs_export]

    self._put('/api/nfs-ganesha/export/myc/2',
              updated_nfs_export,
              version=APIVersion(2, 0))
    self.assertStatus(200)
    self.assertJsonBody(applied_nfs_export.changes[0])
"""Reset NFS-Ganesha Config to default"""
return self.nfs.reset_nfs_cluster_config(cluster_id=cluster_id)
def export_apply(self, cluster_id: str, export_config: str) -> AppliedExportResults:
    """Create or update export(s) from an export config.

    :param cluster_id: NFS cluster to apply the config to.
    :param export_config: a JSON string or Ganesha export specification
        describing one or more exports.
    :return: the results of applying the config, including any errors.
    """
    # The earmark resolver lets mgr/nfs validate CephFS subvolume earmarks
    # while applying the export.
    earmark_resolver = CephFSEarmarkResolver(self)
    return self.export_mgr.apply_export(cluster_id, export_config=export_config,
                                        earmark_resolver=earmark_resolver)
def export_ls(self, cluster_id: Optional[str] = None, detailed: bool = False) -> List[Dict[Any, Any]]:
if not (cluster_id):