log.exception("Ganesha timed out")
+def normalize_path(path: str) -> str:
+ if path:
+ path = normpath(path.strip())
+ if path[:2] == "//":
+ path = path[1:]
+ return path
+
+
class NFSRados:
def __init__(self, rados: 'Rados', namespace: str) -> None:
self.rados = rados
GaneshaConfParser.write_block(export.to_export_block()),
export_obj_name(export.export_id), conf_obj_name(export.cluster_id))
- def format_path(self, path: str) -> str:
- if path:
- path = normpath(path.strip())
- if path[:2] == "//":
- path = path[1:]
- return path
-
@export_cluster_checker
def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
# if addr(s) are provided, construct client list and adjust outer block
path = ex_dict.get("path")
if path is None:
raise NFSInvalidOperation("export must specify path")
- path = self.format_path(path)
+ path = normalize_path(path)
fsal = ex_dict.get("fsal", {})
fsal_type = fsal.get("name")
squash: str,
access_type: str,
clients: list = []) -> Tuple[int, str, str]:
- pseudo_path = self.format_path(pseudo_path)
+ pseudo_path = normalize_path(pseudo_path)
if not self._fetch_export(cluster_id, pseudo_path):
export = self.create_export_from_dict(
bucket: Optional[str] = None,
user_id: Optional[str] = None,
clients: list = []) -> Tuple[int, str, str]:
- pseudo_path = self.format_path(pseudo_path)
+ pseudo_path = normalize_path(pseudo_path)
if not bucket and not user_id:
return -errno.EINVAL, "", "Must specify either bucket or user_id"
if cluster_id not in self.exports:
self.exports[cluster_id] = []
- new_export_dict['path'] = self.format_path(new_export_dict['path'])
- new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo'])
+ new_export_dict['path'] = normalize_path(new_export_dict['path'])
+ new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo'])
old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
if old_export: