import logging
from typing import List
import socket
+from os.path import isabs, normpath
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec
from rados import TimedOut
def format_path(self, path):
if path:
- path = path.strip()
- if len(path) > 1 and path[-1] == '/':
- path = path[:-1]
+ path = normpath(path.strip())
+ if path[:2] == "//":
+ path = path[1:]
return path
def check_fs(self, fs_name):
if not self.check_fs(fs_name):
return -errno.ENOENT, "", f"filesystem {fs_name} not found"
+ pseudo_path = self.format_path(pseudo_path)
+ if not isabs(pseudo_path) or pseudo_path == "/":
+ return -errno.EINVAL, "", f"pseudo path {pseudo_path} is invalid. "\
+ "It should not be absolute path or just '/'."
+
if cluster_id not in self.exports:
self.exports[cluster_id] = []
access_type = "RO"
ex_dict = {
'path': self.format_path(path),
- 'pseudo': self.format_path(pseudo_path),
+ 'pseudo': pseudo_path,
'cluster_id': cluster_id,
'access_type': access_type,
'fsal': {"name": "CEPH", "user_id": user_id,