import json
import re
import socket
-from typing import cast, Dict, List, Any, Union, Optional, TypeVar, Callable, TYPE_CHECKING, Tuple
+from typing import cast, Dict, List, Any, Union, Optional, TYPE_CHECKING, Tuple
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
from nfs.module import Module
from mgr_module import MgrModule
-FuncT = TypeVar('FuncT', bound=Callable)
log = logging.getLogger(__name__)
raise NFSInvalidOperation(f"Cannot resolve IP for host {hostname}: {e}")
-def cluster_setter(func: FuncT) -> FuncT:
- def set_pool_ns_clusterid(nfs: 'NFSCluster', *args: Any, **kwargs: Any) -> Any:
- return func(nfs, *args, **kwargs)
- return cast(FuncT, set_pool_ns_clusterid)
-
-
def create_ganesha_pool(mgr: 'MgrModule', pool: str) -> None:
pool_list = [p['pool_name'] for p in mgr.get_osdmap().dump().get('pools', [])]
if pool not in pool_list:
log.info(f"Deleted {self._get_common_conf_obj_name(cluster_id)} object and all objects in "
f"{cluster_id}")
- @cluster_setter
def create_nfs_cluster(self,
cluster_id: str,
placement: Optional[str],
except Exception as e:
return exception_handler(e, f"NFS Cluster {cluster_id} could not be created")
- @cluster_setter
def delete_nfs_cluster(self, cluster_id: str) -> Tuple[int, str, str]:
try:
cluster_list = available_clusters(self.mgr)
except Exception as e:
return exception_handler(e, "Failed to show info for cluster")
- @cluster_setter
def set_nfs_cluster_config(self, cluster_id: str, nfs_config: str) -> Tuple[int, str, str]:
try:
if cluster_id in available_clusters(self.mgr):
except Exception as e:
return exception_handler(e, f"Setting NFS-Ganesha Config failed for {cluster_id}")
- @cluster_setter
def reset_nfs_cluster_config(self, cluster_id: str) -> Tuple[int, str, str]:
try:
if cluster_id in available_clusters(self.mgr):