from .utils import (
CONF_PREFIX,
EXPORT_PREFIX,
+ NonFatalError,
USER_CONF_PREFIX,
export_obj_name,
conf_obj_name,
raise ErrorResponse(f"Cluster {cluster_id!r} does not exist",
return_value=-errno.ENOENT)
- @export_cluster_checker
- def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
+ def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Dict[str, Any]:
+ self._validate_cluster_id(kwargs['cluster_id'])
# if addr(s) are provided, construct client list and adjust outer block
clients = []
if addr:
return self.create_rgw_export(**kwargs)
raise NotImplementedError()
except Exception as e:
- return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
+ log.exception(f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
+ raise ErrorResponse.wrap(e)
@export_cluster_checker
def delete_export(self,
squash: str,
access_type: str,
clients: list = [],
- sectype: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ sectype: Optional[List[str]] = None) -> Dict[str, Any]:
pseudo_path = normalize_path(pseudo_path)
if not self._fetch_export(cluster_id, pseudo_path):
"cluster": cluster_id,
"mode": export.access_type,
}
- return (0, json.dumps(result, indent=4), '')
- return 0, "", "Export already exists"
+ return result
+ raise NonFatalError("Export already exists")
def create_rgw_export(self,
cluster_id: str,
bucket: Optional[str] = None,
user_id: Optional[str] = None,
clients: list = [],
- sectype: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ sectype: Optional[List[str]] = None) -> Dict[str, Any]:
pseudo_path = normalize_path(pseudo_path)
if not bucket and not user_id:
- return -errno.EINVAL, "", "Must specify either bucket or user_id"
+ raise ErrorResponse("Must specify either bucket or user_id")
if not self._fetch_export(cluster_id, pseudo_path):
export = self.create_export_from_dict(
"mode": export.access_type,
"squash": export.squash,
}
- return (0, json.dumps(result, indent=4), '')
- return 0, "", "Export already exists"
+ return result
+ raise NonFatalError("Export already exists")
def _apply_export(
self,
self.inited = True
@CLICommand('nfs export create cephfs', perm='rw')
+ @object_format.Responder()
def _cmd_nfs_export_create_cephfs(
self,
cluster_id: str,
client_addr: Optional[List[str]] = None,
squash: str = 'none',
sectype: Optional[List[str]] = None,
- ) -> Tuple[int, str, str]:
+ ) -> Dict[str, Any]:
"""Create a CephFS export"""
return self.export_mgr.create_export(
fsal_type='cephfs',
)
@CLICommand('nfs export create rgw', perm='rw')
+ @object_format.Responder()
def _cmd_nfs_export_create_rgw(
self,
cluster_id: str,
client_addr: Optional[List[str]] = None,
squash: str = 'none',
sectype: Optional[List[str]] = None,
- ) -> Tuple[int, str, str]:
+ ) -> Dict[str, Any]:
"""Create an RGW export"""
return self.export_mgr.create_export(
fsal_type='rgw',
-from typing import List, TYPE_CHECKING
+from typing import List, Tuple, TYPE_CHECKING
+from object_format import ErrorResponseBase
import orchestrator
if TYPE_CHECKING:
USER_CONF_PREFIX: str = "userconf-nfs."
+class NonFatalError(ErrorResponseBase):
+ """Raise this exception when you want to interrupt the flow of a function
+ and return an informative message to the user. In certain situations the
+ NFS MGR module wants to indicate an action was or was not taken but still
+ return a success code so that non-interactive scripts continue as if the
+ overall action was completed.
+ """
+ def __init__(self, msg: str) -> None:
+ super().__init__(msg)
+ self.msg = msg
+
+ def format_response(self) -> Tuple[int, str, str]:
+ return 0, "", self.msg
+
+
def export_obj_name(export_id: int) -> str:
"""Return a rados object name for the export."""
return f"{EXPORT_PREFIX}{export_id}"