**kwargs: Any
) -> Tuple[int, str, str]:
"""
- This method checks if cluster exists and sets rados namespace.
+ This method checks if cluster exists
"""
if kwargs['cluster_id'] not in available_clusters(fs_export.mgr):
return -errno.ENOENT, "", "Cluster does not exists"
return path
@export_cluster_checker
- def create_export(self, cluster_id: str, **kwargs: Any) -> Tuple[int, str, str]:
+ def create_export(self, **kwargs: Any) -> Tuple[int, str, str]:
try:
fsal_type = kwargs.pop('fsal_type')
if fsal_type == 'cephfs':
return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
@export_cluster_checker
- def delete_export(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
+ def delete_export(self,
+ cluster_id: str = '',
+ pseudo_path: Optional[str] = None) -> Tuple[int, str, str]:
+ assert pseudo_path is not None
return self._delete_export(cluster_id, pseudo_path)
def delete_all_exports(self, cluster_id: str) -> None:
log.info(f"All exports successfully deleted for cluster id: {cluster_id}")
@export_cluster_checker
- def list_exports(self, cluster_id: str, detailed: bool) -> Tuple[int, str, str]:
+ def list_exports(self,
+ cluster_id: str = '',
+ detailed: bool = False) -> Tuple[int, str, str]:
try:
if detailed:
result_d = [export.to_dict() for export in self.exports[cluster_id]]
@export_cluster_checker
def get_export(
self,
- cluster_id: str,
- pseudo_path: Optional[str]
+ cluster_id: str = '',
+ pseudo_path: Optional[str] = None
) -> Tuple[int, str, str]:
try:
export = self._fetch_export(cluster_id, pseudo_path)
def _cmd_nfs_export_create_cephfs(
self,
fsname: str,
- clusterid: str,
+ cluster_id: str,
binding: str,
path: Optional[str] = '/',
readonly: Optional[bool] = False,
}]
squash = 'none'
return self.export_mgr.create_export(fsal_type='cephfs', fs_name=fsname,
- cluster_id=clusterid, pseudo_path=binding,
+ cluster_id=cluster_id, pseudo_path=binding,
read_only=readonly, path=path,
squash=squash, clients=clients)
def _cmd_rgw_export_create_cephfs(
self,
bucket: str,
- clusterid: str,
+ cluster_id: str,
binding: str,
readonly: Optional[bool] = False,
addr: Optional[str] = None,
squash = 'none'
return self.export_mgr.create_export(fsal_type='rgw', bucket=bucket,
realm=realm,
- cluster_id=clusterid, pseudo_path=binding,
+ cluster_id=cluster_id, pseudo_path=binding,
read_only=readonly, squash=squash,
clients=clients)
@CLICommand('nfs export import', perm='rw')
def _cmd_nfs_export_import(self,
- clusterid: str,
+ cluster_id: str,
inbuf: str) -> Tuple[int, str, str]:
"""Create one or more exports from JSON specification"""
- return self.export_mgr.import_export(clusterid, inbuf)
+ return self.export_mgr.import_export(cluster_id, inbuf)
@CLICommand('nfs export rm', perm='rw')
- def _cmd_nfs_export_rm(self, clusterid: str, binding: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_export_rm(self, cluster_id: str, binding: str) -> Tuple[int, str, str]:
"""Remove a cephfs export"""
- return self.export_mgr.delete_export(cluster_id=clusterid, pseudo_path=binding)
+ return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=binding)
@CLICommand('nfs export delete', perm='rw')
- def _cmd_nfs_export_delete(self, clusterid: str, binding: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_export_delete(self, cluster_id: str, binding: str) -> Tuple[int, str, str]:
"""Delete a cephfs export (DEPRECATED)"""
- return self.export_mgr.delete_export(cluster_id=clusterid, pseudo_path=binding)
+ return self.export_mgr.delete_export(cluster_id=cluster_id, pseudo_path=binding)
@CLICommand('nfs export ls', perm='r')
- def _cmd_nfs_export_ls(self, clusterid: str, detailed: bool = False) -> Tuple[int, str, str]:
+ def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> Tuple[int, str, str]:
"""List exports of a NFS cluster"""
- return self.export_mgr.list_exports(cluster_id=clusterid, detailed=detailed)
+ return self.export_mgr.list_exports(cluster_id=cluster_id, detailed=detailed)
@CLICommand('nfs export get', perm='r')
- def _cmd_nfs_export_get(self, clusterid: str, binding: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_export_get(self, cluster_id: str, binding: str) -> Tuple[int, str, str]:
"""Fetch a export of a NFS cluster given the pseudo path/binding"""
- return self.export_mgr.get_export(cluster_id=clusterid, pseudo_path=binding)
+ return self.export_mgr.get_export(cluster_id=cluster_id, pseudo_path=binding)
@CLICommand('nfs export update', perm='rw')
@CLICheckNonemptyFileInput(desc='CephFS Export configuration')
- def _cmd_nfs_export_update(self, clusterid: str, inbuf: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_export_update(self, cluster_id: str, inbuf: str) -> Tuple[int, str, str]:
"""Update an export of a NFS cluster by `-i <json_file>`"""
# The export <json_file> is passed to -i and it's processing is handled by the Ceph CLI.
- return self.export_mgr.update_export(clusterid, export_config=inbuf)
+ return self.export_mgr.update_export(cluster_id, export_config=inbuf)
@CLICommand('nfs cluster create', perm='rw')
def _cmd_nfs_cluster_create(self,
- clusterid: str,
+ cluster_id: str,
placement: Optional[str] = None,
ingress: Optional[bool] = None,
virtual_ip: Optional[str] = None) -> Tuple[int, str, str]:
"""Create an NFS Cluster"""
- return self.nfs.create_nfs_cluster(cluster_id=clusterid, placement=placement,
+ return self.nfs.create_nfs_cluster(cluster_id=cluster_id, placement=placement,
virtual_ip=virtual_ip, ingress=ingress)
@CLICommand('nfs cluster rm', perm='rw')
- def _cmd_nfs_cluster_rm(self, clusterid: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_cluster_rm(self, cluster_id: str) -> Tuple[int, str, str]:
"""Removes an NFS Cluster"""
- return self.nfs.delete_nfs_cluster(cluster_id=clusterid)
+ return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
@CLICommand('nfs cluster delete', perm='rw')
- def _cmd_nfs_cluster_delete(self, clusterid: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_cluster_delete(self, cluster_id: str) -> Tuple[int, str, str]:
"""Removes an NFS Cluster (DEPRECATED)"""
- return self.nfs.delete_nfs_cluster(cluster_id=clusterid)
+ return self.nfs.delete_nfs_cluster(cluster_id=cluster_id)
@CLICommand('nfs cluster ls', perm='r')
def _cmd_nfs_cluster_ls(self) -> Tuple[int, str, str]:
return self.nfs.list_nfs_cluster()
@CLICommand('nfs cluster info', perm='r')
- def _cmd_nfs_cluster_info(self, clusterid: Optional[str] = None) -> Tuple[int, str, str]:
+ def _cmd_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Tuple[int, str, str]:
"""Displays NFS Cluster info"""
- return self.nfs.show_nfs_cluster_info(cluster_id=clusterid)
+ return self.nfs.show_nfs_cluster_info(cluster_id=cluster_id)
@CLICommand('nfs cluster config set', perm='rw')
@CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration')
- def _cmd_nfs_cluster_config_set(self, clusterid: str, inbuf: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_cluster_config_set(self, cluster_id: str, inbuf: str) -> Tuple[int, str, str]:
"""Set NFS-Ganesha config by `-i <config_file>`"""
- return self.nfs.set_nfs_cluster_config(cluster_id=clusterid, nfs_config=inbuf)
+ return self.nfs.set_nfs_cluster_config(cluster_id=cluster_id, nfs_config=inbuf)
@CLICommand('nfs cluster config reset', perm='rw')
- def _cmd_nfs_cluster_config_reset(self, clusterid: str) -> Tuple[int, str, str]:
+ def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> Tuple[int, str, str]:
"""Reset NFS-Ganesha Config to default"""
- return self.nfs.reset_nfs_cluster_config(cluster_id=clusterid)
+ return self.nfs.reset_nfs_cluster_config(cluster_id=cluster_id)