Set,
cast)
from os.path import normpath
+import cephfs
from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
RGWFSAL,
RawBlock,
format_block)
-from .exception import NFSException, NFSInvalidOperation, FSNotFound
+from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound
from .utils import (
CONF_PREFIX,
EXPORT_PREFIX,
conf_obj_name,
available_clusters,
check_fs,
- restart_nfs_service, check_cephfs_path)
+ restart_nfs_service, cephfs_path_is_dir)
if TYPE_CHECKING:
from nfs.module import Module
clients: list = [],
sectype: Optional[List[str]] = None) -> Dict[str, Any]:
- check_cephfs_path(self.mgr, fs_name, path)
+ try:
+ cephfs_path_is_dir(self.mgr, fs_name, path)
+ except NotADirectoryError:
+ raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR)
+ except cephfs.ObjectNotFound:
+ raise NFSObjectNotFound(f"path {path} does not exist")
+ except cephfs.Error as e:
+ raise NFSException(e.args[1], -e.args[0])
pseudo_path = normalize_path(pseudo_path)