from mgr_module import NFS_POOL_NAME as POOL_NAME
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
+from object_format import ErrorResponse
import orchestrator
except Exception as e:
return exception_handler(e, f"Failed to delete NFS Cluster {cluster_id}")
- def list_nfs_cluster(self) -> Tuple[int, str, str]:
+ def list_nfs_cluster(self) -> List[str]:
try:
- return 0, '\n'.join(available_clusters(self.mgr)), ""
+ return available_clusters(self.mgr)
except Exception as e:
- return exception_handler(e, "Failed to list NFS Cluster")
+ log.exception("Failed to list NFS Cluster")
+ raise ErrorResponse.wrap(e)
def _show_nfs_cluster_info(self, cluster_id: str) -> Dict[str, Any]:
completion = self.mgr.list_daemons(daemon_type='nfs')
return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
@CLICommand('nfs cluster ls', perm='r')
- def _cmd_nfs_cluster_ls(self) -> Tuple[int, str, str]:
+ @object_format.Responder()
+ def _cmd_nfs_cluster_ls(self) -> List[str]:
"""List NFS Clusters"""
return self.nfs.list_nfs_cluster()
nfs_mod = Module('nfs', '', '')
cluster = NFSCluster(nfs_mod)
- rc, out, err = cluster.list_nfs_cluster()
- assert rc == 0
- assert out == self.cluster_id
+ out = cluster.list_nfs_cluster()
+ assert out[0] == self.cluster_id
def test_cluster_ls(self):
self._do_mock_test(self._do_test_cluster_ls)