raise NFSException(e.args[1], -e.args[0])
-def _validate_cmount_path(mgr: 'Module', fs_name: str, cmount_path: str, path: str) -> None:
- validate_cephfs_path(mgr, fs_name, cmount_path)
+def _validate_cmount_path(cmount_path: str, path: str) -> None:
if cmount_path not in path:
raise ValueError(
f"Invalid cmount_path: '{cmount_path}'. The path '{path}' is not within the mount path. "
validate_cephfs_path(self.mgr, fs_name, path)
if fsal["cmount_path"] != "/":
- _validate_cmount_path(self.mgr, fsal["cmount_path"], path) # type: ignore
+ _validate_cmount_path(fsal["cmount_path"], path) # type: ignore
user_id = f"nfs.{get_user_id(cluster_id, fs_name, fsal['cmount_path'])}"
if "user_id" in fsal and fsal["user_id"] != user_id:
ex_dict["fsal"] = fsal
ex_dict["cluster_id"] = cluster_id
export = Export.from_dict(ex_id, ex_dict)
+ if export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
+ self._ensure_cephfs_export_user(export)
export.validate(self.mgr)
log.debug("Successfully created %s export-%s from dict for cluster %s",
fsal_type, ex_id, cluster_id)
validate_cephfs_path(self.mgr, fs_name, path)
if cmount_path != "/":
- _validate_cmount_path(self.mgr, cmount_path, path) # type: ignore
+ _validate_cmount_path(cmount_path, path) # type: ignore
pseudo_path = normalize_path(pseudo_path)