from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
+from object_format import ErrorResponse
from orchestrator import NoOrchestrator
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
if need_nfs_service_restart:
restart_nfs_service(self.mgr, export.cluster_id)
+ def _validate_cluster_id(self, cluster_id: str) -> None:
+ """Raise an exception if cluster_id is not valid."""
+ clusters = known_cluster_ids(self.mgr)
+ log.debug("checking for %r in known nfs clusters: %r",
+ cluster_id, clusters)
+ if cluster_id not in clusters:
+ raise ErrorResponse(f"Cluster {cluster_id!r} does not exist",
+ return_value=-errno.ENOENT)
+
@export_cluster_checker
def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
# if addr(s) are provided, construct client list and adjust outer block
r.extend([e.to_dict() for e in ls])
return r
- @export_cluster_checker
def list_exports(self,
cluster_id: str,
- detailed: bool = False) -> Tuple[int, str, str]:
+ detailed: bool = False) -> List[Any]:
+ self._validate_cluster_id(cluster_id)
try:
if detailed:
result_d = [export.to_dict() for export in self.exports[cluster_id]]
- return 0, json.dumps(result_d, indent=2), ''
+ return result_d
else:
result_ps = [export.pseudo for export in self.exports[cluster_id]]
- return 0, json.dumps(result_ps, indent=2), ''
+ return result_ps
except KeyError:
log.warning("No exports to list for %s", cluster_id)
- return 0, '', ''
+ return []
except Exception as e:
- return exception_handler(e, f"Failed to list exports for {cluster_id}")
+ log.exception(f"Failed to list exports for {cluster_id}")
+ raise ErrorResponse.wrap(e)
def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]:
export = self._fetch_export(cluster_id, pseudo_path)
from typing import Tuple, Optional, List, Dict, Any
from mgr_module import MgrModule, CLICommand, Option, CLICheckNonemptyFileInput
+import object_format
import orchestrator
from .export import ExportMgr
return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
@CLICommand('nfs export ls', perm='r')
- def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> Tuple[int, str, str]:
+ @object_format.Responder()
+ def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> List[Any]:
"""List exports of a NFS cluster"""
return self.export_mgr.list_exports(cluster_id=cluster_id, detailed=detailed)
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 2
r = conf.create_export(
)
assert r[0] == 0
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 3
export = conf._fetch_export('foo', '/mybucket')
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 2
r = conf.create_export(
)
assert r[0] == 0
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 3
export = conf._fetch_export('foo', '/mybucket')
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 2
r = conf.create_export(
)
assert r[0] == 0
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 3
export = conf._fetch_export('foo', '/mybucket')
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 2
r = conf.create_export(
)
assert r[0] == 0
- exports = conf.list_exports(cluster_id=self.cluster_id)
- ls = json.loads(exports[1])
+ ls = conf.list_exports(cluster_id=self.cluster_id)
assert len(ls) == 3
export = conf._fetch_export('foo', '/cephfs2')