log.warning(f"No {pseudo_path} export to show for {cluster_id}")
return None
- @export_cluster_checker
def get_export(
self,
cluster_id: str,
pseudo_path: str,
- ) -> Tuple[int, str, str]:
+ ) -> Dict[str, Any]:
+ self._validate_cluster_id(cluster_id)
try:
export_dict = self._get_export_dict(cluster_id, pseudo_path)
- if export_dict:
- return 0, json.dumps(export_dict, indent=2), ''
- log.warning("No %s export to show for %s", pseudo_path, cluster_id)
- return 0, '', ''
+ log.info(f"Fetched {export_dict!r} for {cluster_id!r}, {pseudo_path!r}")
+ return export_dict if export_dict else {}
except Exception as e:
- return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
+ log.exception(f"Failed to get {pseudo_path} export for {cluster_id}")
+ raise ErrorResponse.wrap(e)
def get_export_by_id(
self,
return self.export_mgr.list_exports(cluster_id=cluster_id, detailed=detailed)
@CLICommand('nfs export info', perm='r')
- def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
+ @object_format.Responder()
+ def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
"""Fetch a export of a NFS cluster given the pseudo path/binding"""
return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
@CLICommand('nfs export get', perm='r')
- def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
+ @object_format.Responder()
+ def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
"""Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)"""
return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)