import socket
import json
import re
-from typing import cast, Dict, List, Any, Union
+from typing import cast, Dict, List, Any, Union, Optional
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
from cephadm.utils import resolve_ip
f"{self.pool_ns}")
@cluster_setter
- def create_nfs_cluster(self, cluster_id, placement, virtual_ip):
+ def create_nfs_cluster(self,
+ cluster_id: str,
+ placement: Optional[str],
+ virtual_ip: Optional[str],
+ ingress: Optional[bool] = None):
try:
+ if virtual_ip and not ingress:
+ raise NFSInvalidOperation('virtual_ip can only be provided with ingress enabled')
+ if not virtual_ip and ingress:
+ raise NFSInvalidOperation('ingress currently requires a virtual_ip')
invalid_str = re.search('[^A-Za-z0-9-_.]', cluster_id)
if invalid_str:
raise NFSInvalidOperation(f"cluster id {cluster_id} is invalid. "
"ip": cluster.ip or resolve_ip(cluster.hostname),
"port": cluster.ports[0]
})
- except OrchestratorError:
+ except orchestrator.OrchestratorError:
continue
r: Dict[str, Any] = {
ingress: Optional[bool]=None,
virtual_ip: Optional[str]=None) -> Tuple[int, str, str]:
"""Create an NFS Cluster"""
- if virtual_ip and not ingress:
- return (-errno.EINVAL, '',
- '--virtual-ip can only be provided with --ingress')
- if ingress and not virtual_ip:
- return (-errno.EINVAL, '',
- '--ingress current requires --virtual-ip')
return self.nfs.create_nfs_cluster(cluster_id=clusterid, placement=placement,
- virtual_ip=virtual_ip)
+ virtual_ip=virtual_ip, ingress=ingress)
@CLICommand('nfs cluster rm', perm='rw')
def _cmd_nfs_cluster_rm(self, clusterid: str) -> Tuple[int, str, str]: