import logging
import re
import socket
-from typing import cast, Dict, List, Any, Union, Optional, TYPE_CHECKING, Tuple
+from typing import cast, Dict, List, Any, Union, Optional, TYPE_CHECKING
from mgr_module import NFS_POOL_NAME as POOL_NAME
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
conf_obj_name,
restart_nfs_service,
user_conf_obj_name)
-from .export import NFSRados, exception_handler
+from .export import NFSRados
if TYPE_CHECKING:
from nfs.module import Module
log.exception("Failed to show info for cluster")
raise ErrorResponse.wrap(e)
- def get_nfs_cluster_config(self, cluster_id: str) -> Tuple[int, str, str]:
+ def get_nfs_cluster_config(self, cluster_id: str) -> str:
try:
if cluster_id in available_clusters(self.mgr):
rados_obj = self._rados(cluster_id)
conf = rados_obj.read_obj(user_conf_obj_name(cluster_id))
- return 0, conf or "", ""
+ return conf or ""
raise ClusterNotFound()
except Exception as e:
- return exception_handler(e, f"Fetching NFS-Ganesha Config failed for {cluster_id}")
+ log.exception(f"Fetching NFS-Ganesha Config failed for {cluster_id}")
+ raise ErrorResponse.wrap(e)
def set_nfs_cluster_config(self, cluster_id: str, nfs_config: str) -> None:
try:
return self.nfs.show_nfs_cluster_info(cluster_id=cluster_id)
@CLICommand('nfs cluster config get', perm='r')
+ @object_format.ErrorResponseHandler()
def _cmd_nfs_cluster_config_get(self, cluster_id: str) -> Tuple[int, str, str]:
"""Fetch NFS-Ganesha config"""
- return self.nfs.get_nfs_cluster_config(cluster_id=cluster_id)
+ conf = self.nfs.get_nfs_cluster_config(cluster_id=cluster_id)
+ return 0, conf, ""
@CLICommand('nfs cluster config set', perm='rw')
@CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration')
nfs_mod = Module('nfs', '', '')
cluster = NFSCluster(nfs_mod)
- rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
- assert rc == 0
+ out = cluster.get_nfs_cluster_config(self.cluster_id)
assert out == ""
cluster.set_nfs_cluster_config(self.cluster_id, '# foo\n')
- rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
- assert rc == 0
+ out = cluster.get_nfs_cluster_config(self.cluster_id)
assert out == "# foo\n"
cluster.reset_nfs_cluster_config(self.cluster_id)
- rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
- assert rc == 0
+ out = cluster.get_nfs_cluster_config(self.cluster_id)
assert out == ""
def test_cluster_config(self):