import orchestrator
from .exception import NFSInvalidOperation, ClusterNotFound
-from .utils import (available_clusters, restart_nfs_service, conf_obj_name,
- user_conf_obj_name)
+from .utils import (
+ NonFatalError,
+ available_clusters,
+ conf_obj_name,
+ restart_nfs_service,
+ user_conf_obj_name)
from .export import NFSRados, exception_handler
if TYPE_CHECKING:
virtual_ip: Optional[str],
ingress: Optional[bool] = None,
port: Optional[int] = None,
- ) -> Tuple[int, str, str]:
-
+ ) -> None:
try:
if virtual_ip:
# validate virtual_ip value: ip_address throws a ValueError
if cluster_id not in available_clusters(self.mgr):
self._call_orch_apply_nfs(cluster_id, placement, virtual_ip, port)
- return 0, "", ""
- return 0, "", f"{cluster_id} cluster already exists"
+ return
+ raise NonFatalError(f"{cluster_id} cluster already exists")
except Exception as e:
- return exception_handler(e, f"NFS Cluster {cluster_id} could not be created")
+ log.exception(f"NFS Cluster {cluster_id} could not be created")
+ raise ErrorResponse.wrap(e)
def delete_nfs_cluster(self, cluster_id: str) -> Tuple[int, str, str]:
try:
return self.export_mgr.apply_export(cluster_id, export_config=inbuf)
@CLICommand('nfs cluster create', perm='rw')
+ @object_format.EmptyResponder()
def _cmd_nfs_cluster_create(self,
cluster_id: str,
placement: Optional[str] = None,
ingress: Optional[bool] = None,
virtual_ip: Optional[str] = None,
- port: Optional[int] = None) -> Tuple[int, str, str]:
+ port: Optional[int] = None) -> None:
"""Create an NFS Cluster"""
return self.nfs.create_nfs_cluster(cluster_id=cluster_id, placement=placement,
virtual_ip=virtual_ip, ingress=ingress,