Set,
cast)
from os.path import normpath
+import cephfs
from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
from .export_utils import GaneshaConfParser, Export, RawBlock, CephFSFSAL, RGWFSAL
-from .exception import NFSException, NFSInvalidOperation, FSNotFound
+from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound
from .utils import (
CONF_PREFIX,
EXPORT_PREFIX,
conf_obj_name,
available_clusters,
check_fs,
- restart_nfs_service, check_cephfs_path)
+ restart_nfs_service, cephfs_path_is_dir)
if TYPE_CHECKING:
from nfs.module import Module
clients: list = [],
sectype: Optional[List[str]] = None) -> Tuple[int, str, str]:
- check_cephfs_path(self.mgr, fs_name, path)
+ try:
+ cephfs_path_is_dir(self.mgr, fs_name, path)
+ except NotADirectoryError:
+ raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR)
+ except cephfs.ObjectNotFound:
+ raise NFSObjectNotFound(f"path {path} does not exist")
+ except cephfs.Error as e:
+ raise NFSException(e.args[1], -e.args[0])
pseudo_path = normalize_path(pseudo_path)