return ex
return None
except KeyError:
- log.info(f'unable to fetch f{cluster_id}')
+ log.info(f'no exports for cluster {cluster_id}')
return None
def _delete_export_user(self, export: Export) -> None:
for c in export.clients:
if c.access_type.lower() == 'rw':
rw = True
+ break
fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}"
fsal.cephx_key = self._create_user_key(
kwargs['squash'] = 'none'
kwargs['clients'] = clients
+ if clients:
+ kwargs['access_type'] = "none"
+ elif kwargs['read_only']:
+ kwargs['access_type'] = "RO"
+ else:
+ kwargs['access_type'] = "RW"
+
+ if kwargs['cluster_id'] not in self.exports:
+ self.exports[kwargs['cluster_id']] = []
+
try:
fsal_type = kwargs.pop('fsal_type')
if fsal_type == 'cephfs':
pseudo_path = ex_dict.get("pseudo")
if not pseudo_path:
raise NFSInvalidOperation("export must specify pseudo path")
- pseudo_path = self.format_path(pseudo_path)
path = ex_dict.get("path")
if not path:
read_only: bool,
path: str,
squash: str,
+ access_type: str,
clients: list = []) -> Tuple[int, str, str]:
- if not check_fs(self.mgr, fs_name):
- raise FSNotFound(fs_name)
-
pseudo_path = self.format_path(pseudo_path)
- if cluster_id not in self.exports:
- self.exports[cluster_id] = []
-
if not self._fetch_export(cluster_id, pseudo_path):
- if clients:
- access_type = "none"
- elif read_only:
- access_type = "RO"
- else:
- access_type = "RW"
export = self.create_export_from_dict(
cluster_id,
self._gen_export_id(cluster_id),
bucket: str,
cluster_id: str,
pseudo_path: str,
+ access_type: str,
read_only: bool,
squash: str,
realm: Optional[str] = None,
clients: list = []) -> Tuple[int, str, str]:
- if '/' in bucket:
- raise NFSInvalidOperation('"/" is not allowed in bucket name')
pseudo_path = self.format_path(pseudo_path)
if not self._fetch_export(cluster_id, pseudo_path):
- if clients:
- access_type = "none"
- elif read_only:
- access_type = "RO"
- else:
- access_type = "RW"
export = self.create_export_from_dict(
cluster_id,
self._gen_export_id(cluster_id),
cluster_id: str,
new_export_dict: Dict,
) -> Tuple[int, str, str]:
- for k in ['cluster_id', 'path', 'pseudo']:
+ for k in ['path', 'pseudo']:
if k not in new_export_dict:
raise NFSInvalidOperation(f'Export missing required field {k}')
- if new_export_dict['cluster_id'] not in available_clusters(self.mgr):
+ if cluster_id not in available_clusters(self.mgr):
raise ClusterNotFound()
+ if cluster_id not in self.exports:
+ self.exports[cluster_id] = []
new_export_dict['path'] = self.format_path(new_export_dict['path'])
new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo'])
- old_export = self._fetch_export(new_export_dict['cluster_id'],
- new_export_dict['pseudo'])
+ old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
if old_export:
# Check if export id matches
if new_export_dict.get('export_id'):