[mypy-mds_autoscaler.*]
disallow_untyped_defs = True
+[mypy-nfs.*]
+disallow_untyped_defs = True
+
[mypy-orchestrator.*]
disallow_untyped_defs = True
import socket
import json
import re
-from typing import cast, Dict, List, Any, Union, Optional
+from typing import cast, Dict, List, Any, Union, Optional, TypeVar, Callable, TYPE_CHECKING, Tuple
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
from .utils import POOL_NAME, available_clusters, restart_nfs_service
from .export import NFSRados, exception_handler
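+# Imported only for type checking; importing nfs.module at runtime would
+# create a circular import.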
+if TYPE_CHECKING:
+ from nfs.module import Module
+ from mgr_module import MgrModule
+
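+# FuncT keeps decorators signature-preserving for mypy: a decorator takes
+# and returns FuncT (casting the wrapper back), so decorated methods are
+# not degraded to a bare Callable.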
+FuncT = TypeVar('FuncT', bound=Callable)
+
log = logging.getLogger(__name__)
raise NFSInvalidOperation(f"Cannot resolve IP for host {hostname}: {e}")
-def cluster_setter(func):
- def set_pool_ns_clusterid(nfs, *args, **kwargs):
+def cluster_setter(func: FuncT) -> FuncT:
+ def set_pool_ns_clusterid(nfs: 'NFSCluster', *args: Any, **kwargs: Any) -> Any:
nfs._set_pool_namespace(kwargs['cluster_id'])
nfs._set_cluster_id(kwargs['cluster_id'])
return func(nfs, *args, **kwargs)
- return set_pool_ns_clusterid
+ return cast(FuncT, set_pool_ns_clusterid)
-def create_ganesha_pool(mgr, pool):
+def create_ganesha_pool(mgr: 'MgrModule', pool: str) -> None:
pool_list = [p['pool_name'] for p in mgr.get_osdmap().dump().get('pools', [])]
if pool not in pool_list:
mgr.check_mon_command({'prefix': 'osd pool create', 'pool': pool})
class NFSCluster:
- def __init__(self, mgr):
+ def __init__(self, mgr: 'Module') -> None:
self.pool_name = POOL_NAME
self.pool_ns = ''
self.mgr = mgr
- def _set_cluster_id(self, cluster_id):
+ def _set_cluster_id(self, cluster_id: str) -> None:
self.cluster_id = cluster_id
- def _set_pool_namespace(self, cluster_id):
+ def _set_pool_namespace(self, cluster_id: str) -> None:
self.pool_ns = cluster_id
- def _get_common_conf_obj_name(self):
+ def _get_common_conf_obj_name(self) -> str:
return f'conf-nfs.{self.cluster_id}'
- def _get_user_conf_obj_name(self):
+ def _get_user_conf_obj_name(self) -> str:
return f'userconf-nfs.{self.cluster_id}'
- def _call_orch_apply_nfs(self, placement, virtual_ip=None):
+ def _call_orch_apply_nfs(self, placement: Optional[str], virtual_ip: Optional[str] = None) -> None:
if virtual_ip:
# nfs + ingress
# run NFS on non-standard port
completion = self.mgr.apply_nfs(spec)
orchestrator.raise_if_exception(completion)
- def create_empty_rados_obj(self):
+ def create_empty_rados_obj(self) -> None:
common_conf = self._get_common_conf_obj_name()
NFSRados(self.mgr, self.pool_ns).write_obj('', self._get_common_conf_obj_name())
log.info(f"Created empty object:{common_conf}")
- def delete_config_obj(self):
+ def delete_config_obj(self) -> None:
NFSRados(self.mgr, self.pool_ns).remove_all_obj()
log.info(f"Deleted {self._get_common_conf_obj_name()} object and all objects in "
f"{self.pool_ns}")
cluster_id: str,
placement: Optional[str],
virtual_ip: Optional[str],
- ingress: Optional[bool] = None):
+ ingress: Optional[bool] = None) -> Tuple[int, str, str]:
try:
if virtual_ip and not ingress:
raise NFSInvalidOperation('virtual_ip can only be provided with ingress enabled')
return exception_handler(e, f"NFS Cluster {cluster_id} could not be created")
@cluster_setter
- def delete_nfs_cluster(self, cluster_id):
+ def delete_nfs_cluster(self, cluster_id: str) -> Tuple[int, str, str]:
try:
cluster_list = available_clusters(self.mgr)
if cluster_id in cluster_list:
except Exception as e:
return exception_handler(e, f"Failed to delete NFS Cluster {cluster_id}")
- def list_nfs_cluster(self):
+ def list_nfs_cluster(self) -> Tuple[int, str, str]:
try:
return 0, '\n'.join(available_clusters(self.mgr)), ""
except Exception as e:
def _show_nfs_cluster_info(self, cluster_id: str) -> Dict[str, Any]:
self._set_cluster_id(cluster_id)
completion = self.mgr.list_daemons(daemon_type='nfs')
- orchestrator.raise_if_exception(completion)
- backends: List[Dict[str, Union[str, int]]] = []
# Here the result is a list of DaemonDescription objects
- for cluster in completion.result:
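+ # raise_if_exception() returns the completion's typed result, replacing
+ # direct access to completion.result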
+ clusters = orchestrator.raise_if_exception(completion)
+ backends: List[Dict[str, Any]] = []
+
+ for cluster in clusters:
if self.cluster_id == cluster.service_id():
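+ # DaemonDescription.hostname is Optional; assert narrows it for mypy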
+ assert cluster.hostname
try:
if cluster.ip:
ip = cluster.ip
# sigh
ip = resolve_ip(cluster.hostname)
backends.append({
- "hostname": cluster.hostname,
- "ip": ip,
- "port": cluster.ports[0]
- })
+ "hostname": cluster.hostname,
+ "ip": ip,
+ "port": cluster.ports[0] if cluster.ports else None
+ })
except orchestrator.OrchestratorError:
continue
'backend': backends,
}
sc = self.mgr.describe_service(service_type='ingress')
- orchestrator.raise_if_exception(sc)
- for i in sc.result:
+ services = orchestrator.raise_if_exception(sc)
+ for i in services:
spec = cast(IngressSpec, i.spec)
if spec.backend_service == f'nfs.{cluster_id}':
- r['virtual_ip'] = i.virtual_ip.split('/')[0]
+ r['virtual_ip'] = i.virtual_ip.split('/')[0] if i.virtual_ip else None
if i.ports:
r['port'] = i.ports[0]
if len(i.ports) > 1:
r['monitor_port'] = i.ports[1]
return r
- def show_nfs_cluster_info(self, cluster_id=None):
+ def show_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Tuple[int, str, str]:
try:
cluster_ls = []
info_res = {}
return exception_handler(e, "Failed to show info for cluster")
@cluster_setter
- def set_nfs_cluster_config(self, cluster_id, nfs_config):
+ def set_nfs_cluster_config(self, cluster_id: str, nfs_config: str) -> Tuple[int, str, str]:
try:
if cluster_id in available_clusters(self.mgr):
rados_obj = NFSRados(self.mgr, self.pool_ns)
return exception_handler(e, f"Setting NFS-Ganesha Config failed for {cluster_id}")
@cluster_setter
- def reset_nfs_cluster_config(self, cluster_id):
+ def reset_nfs_cluster_config(self, cluster_id: str) -> Tuple[int, str, str]:
try:
if cluster_id in available_clusters(self.mgr):
rados_obj = NFSRados(self.mgr, self.pool_ns)
import errno
+from typing import Optional
class NFSException(Exception):
- def __init__(self, errno, err_msg):
+ def __init__(self, errno: int, err_msg: str) -> None:
super(NFSException, self).__init__(errno, err_msg)
self.errno = errno
self.err_msg = err_msg
- def __str__(self):
+ def __str__(self) -> str:
return self.err_msg
class NFSInvalidOperation(NFSException):
- def __init__(self, err_msg):
+ def __init__(self, err_msg: str) -> None:
super(NFSInvalidOperation, self).__init__(-errno.EINVAL, err_msg)
class NFSObjectNotFound(NFSException):
- def __init__(self, err_msg):
+ def __init__(self, err_msg: str) -> None:
super(NFSObjectNotFound, self).__init__(-errno.ENOENT, err_msg)
class FSNotFound(NFSObjectNotFound):
- def __init__(self, fs_name):
+ def __init__(self, fs_name: Optional[str]) -> None:
super(FSNotFound, self).__init__(f'filesystem {fs_name} not found')
class ClusterNotFound(NFSObjectNotFound):
- def __init__(self):
+ def __init__(self) -> None:
super(ClusterNotFound, self).__init__('cluster does not exist')
import errno
import json
import logging
-from typing import List
+from typing import List, Any, Dict, Tuple, Optional, TYPE_CHECKING, TypeVar, Callable, cast
from os.path import isabs, normpath
from rados import TimedOut, ObjectNotFound
-from .export_utils import GaneshaConfParser, Export
+from .export_utils import GaneshaConfParser, Export, RawBlock
from .exception import NFSException, NFSInvalidOperation, NFSObjectNotFound, FSNotFound, \
ClusterNotFound
from .utils import POOL_NAME, available_clusters, restart_nfs_service, check_fs
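+# Imported only for type checking to avoid a runtime import cycle with
+# nfs.module.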
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+FuncT = TypeVar('FuncT', bound=Callable)
+
log = logging.getLogger(__name__)
-def export_cluster_checker(func):
- def cluster_check(fs_export, *args, **kwargs):
+def export_cluster_checker(func: FuncT) -> FuncT:
+ def cluster_check(fs_export: 'ExportMgr', *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
"""
This method checks if cluster exists and sets rados namespace.
"""
return -errno.ENOENT, "", "Cluster does not exist"
fs_export.rados_namespace = kwargs['cluster_id']
return func(fs_export, *args, **kwargs)
- return cluster_check
+ return cast(FuncT, cluster_check)
-def exception_handler(exception_obj, log_msg=""):
+def exception_handler(exception_obj: Exception, log_msg: str = "") -> Tuple[int, str, str]:
if log_msg:
log.exception(log_msg)
return getattr(exception_obj, 'errno', -1), "", str(exception_obj)
class NFSRados:
- def __init__(self, mgr, namespace):
+ def __init__(self, mgr: 'Module', namespace: str) -> None:
self.mgr = mgr
self.pool = POOL_NAME
self.namespace = namespace
- def _make_rados_url(self, obj):
+ def _make_rados_url(self, obj: str) -> str:
return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)
- def _create_url_block(self, obj_name):
- return {'block_name': '%url', 'value': self._make_rados_url(obj_name)}
+ def _create_url_block(self, obj_name: str) -> RawBlock:
+ return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
- def write_obj(self, conf_block, obj, config_obj=''):
+ def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
with self.mgr.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
ioctx.write_full(obj, conf_block.encode('utf-8'))
ExportMgr._check_rados_notify(ioctx, config_obj)
log.debug(f"Added {obj} url to {config_obj}")
- def update_obj(self, conf_block, obj, config_obj):
+ def update_obj(self, conf_block: str, obj: str, config_obj: str) -> None:
with self.mgr.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
ioctx.write_full(obj, conf_block.encode('utf-8'))
ExportMgr._check_rados_notify(ioctx, config_obj)
log.debug(f"Update export {obj} in {config_obj}")
- def remove_obj(self, obj, config_obj):
+ def remove_obj(self, obj: str, config_obj: str) -> None:
with self.mgr.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
export_urls = ioctx.read(config_obj)
ExportMgr._check_rados_notify(ioctx, config_obj)
log.debug("Object deleted: {}".format(url))
- def remove_all_obj(self):
+ def remove_all_obj(self) -> None:
with self.mgr.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
for obj in ioctx.list_objects():
obj.remove()
- def check_user_config(self):
+ def check_user_config(self) -> bool:
with self.mgr.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
for obj in ioctx.list_objects():
class ExportMgr:
- def __init__(self, mgr, namespace=None, export_ls=None):
+ def __init__(self, mgr: 'Module', namespace: Optional[str] = None, export_ls: Optional[Dict[str, List[Export]]] = None) -> None:
self.mgr = mgr
self.rados_pool = POOL_NAME
self.rados_namespace = namespace
- self._exports = export_ls
+ self._exports: Optional[Dict[str, List[Export]]] = export_ls
@staticmethod
- def _check_rados_notify(ioctx, obj):
+ def _check_rados_notify(ioctx: Any, obj: str) -> None:
try:
ioctx.notify(obj)
except TimedOut:
log.exception(f"Ganesha timed out")
@property
- def exports(self):
+ def exports(self) -> Dict[str, List[Export]]:
if self._exports is None:
self._exports = {}
log.info("Begin export parsing")
log.info(f"Exports parsed successfully {self.exports.items()}")
return self._exports
- def _fetch_export(self, cluster_id, pseudo_path):
+ def _fetch_export(self, cluster_id: str, pseudo_path: Optional[str]) -> Optional[Export]:
try:
for ex in self.exports[cluster_id]:
if ex.pseudo == pseudo_path:
return ex
+ return None
except KeyError:
- pass
+ log.info(f'unable to fetch exports for cluster {cluster_id}')
+ return None
- def _delete_user(self, entity):
+ def _delete_user(self, entity: str) -> None:
self.mgr.check_mon_command({
'prefix': 'auth rm',
'entity': 'client.{}'.format(entity),
})
log.info(f"Export user deleted is {entity}")
- def _gen_export_id(self):
+ def _gen_export_id(self) -> int:
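+ # rados_namespace is Optional[str]; the assert narrows it for mypy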
+ assert self.rados_namespace
exports = sorted([ex.export_id for ex in self.exports[self.rados_namespace]])
nid = 1
for e_id in exports:
break
return nid
- def _read_raw_config(self, rados_namespace):
+ def _read_raw_config(self, rados_namespace: str) -> None:
with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
ioctx.set_namespace(rados_namespace)
for obj in ioctx.list_objects():
self.export_conf_objs.append(Export.from_export_block(
GaneshaConfParser(raw_config).parse()[0], rados_namespace))
- def _save_export(self, export):
+ def _save_export(self, export: Export) -> None:
+ assert self.rados_namespace
self.exports[self.rados_namespace].append(export)
NFSRados(self.mgr, self.rados_namespace).write_obj(
GaneshaConfParser.write_block(export.to_export_block()),
f'conf-nfs.{export.cluster_id}'
)
- def _delete_export(self, cluster_id, pseudo_path, export_obj=None):
+ def _delete_export(self, cluster_id: str, pseudo_path: Optional[str], export_obj: Optional[Any] = None) -> Tuple[int, str, str]:
+ assert self.rados_namespace
try:
if export_obj:
export = export_obj
except Exception as e:
return exception_handler(e, f"Failed to delete {pseudo_path} export for {cluster_id}")
- def _fetch_export_obj(self, ex_id):
+ def _fetch_export_obj(self, ex_id: int) -> Optional[Export]:
+ assert self.rados_namespace
try:
with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
ioctx.set_namespace(self.rados_namespace)
return export
except ObjectNotFound:
log.exception(f"Export ID: {ex_id} not found")
+ return None
- def _update_export(self, export):
+ def _update_export(self, export: Export) -> None:
+ assert self.rados_namespace
self.exports[self.rados_namespace].append(export)
NFSRados(self.mgr, self.rados_namespace).update_obj(
GaneshaConfParser.write_block(export.to_export_block()),
f'export-{export.export_id}', f'conf-nfs.{export.cluster_id}')
- def format_path(self, path: str):
+ def format_path(self, path: str) -> str:
if path:
path = normpath(path.strip())
if path[:2] == "//":
return path
@export_cluster_checker
- def create_export(self, **kwargs):
+ def create_export(self, cluster_id: str, **kwargs: Any) -> Tuple[int, str, str]:
try:
fsal_type = kwargs.pop('fsal_type')
if fsal_type == 'cephfs':
return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
@export_cluster_checker
- def delete_export(self, cluster_id, pseudo_path):
+ def delete_export(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
return self._delete_export(cluster_id, pseudo_path)
- def delete_all_exports(self, cluster_id):
+ def delete_all_exports(self, cluster_id: str) -> None:
try:
export_list = list(self.exports[cluster_id])
except KeyError:
log.info(f"All exports successfully deleted for cluster id: {cluster_id}")
@export_cluster_checker
- def list_exports(self, cluster_id, detailed):
+ def list_exports(self, cluster_id: str, detailed: bool) -> Tuple[int, str, str]:
try:
if detailed:
- result = [export.to_dict() for export in self.exports[cluster_id]]
+ result_d = [export.to_dict() for export in self.exports[cluster_id]]
+ return 0, json.dumps(result_d, indent=2), ''
else:
- result = [export.pseudo for export in self.exports[cluster_id]]
- return 0, json.dumps(result, indent=2), ''
+ result_ps = [export.pseudo for export in self.exports[cluster_id]]
+ return 0, json.dumps(result_ps, indent=2), ''
+
except KeyError:
log.warning(f"No exports to list for {cluster_id}")
return 0, '', ''
return exception_handler(e, f"Failed to list exports for {cluster_id}")
@export_cluster_checker
- def get_export(self, cluster_id, pseudo_path):
+ def get_export(self, cluster_id: str, pseudo_path: Optional[str]) -> Tuple[int, str, str]:
try:
export = self._fetch_export(cluster_id, pseudo_path)
if export:
except Exception as e:
return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
- def update_export(self, export_config):
+ def update_export(self, export_config: str) -> Tuple[int, str, str]:
try:
new_export = json.loads(export_config)
# check export type
- return FSExport(self).update_export(new_export, False)
+ return FSExport(self).update_export_1(new_export, False)
except NotImplementedError:
return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
except Exception as e:
return exception_handler(e, f'Failed to update export: {e}')
- def import_export(self, export_config):
+ def import_export(self, export_config: str) -> Tuple[int, str, str]:
try:
if not export_config:
raise NFSInvalidOperation("Empty Config!!")
new_export = json.loads(export_config)
# check export type
- return FSExport(self).update_export(new_export, True)
+ return FSExport(self).update_export_1(new_export, True)
except NotImplementedError:
return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
except Exception as e:
class FSExport(ExportMgr):
- def __init__(self, export_mgr_obj):
+ def __init__(self, export_mgr_obj: 'ExportMgr') -> None:
super().__init__(export_mgr_obj.mgr, export_mgr_obj.rados_namespace,
export_mgr_obj._exports)
- def _update_user_id(self, path, access_type, fs_name, user_id):
+ def _update_user_id(self, path: str, access_type: str, fs_name: str, user_id: str) -> None:
osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
self.rados_pool, self.rados_namespace, fs_name)
access_type = 'r' if access_type == 'RO' else 'rw'
log.info(f"Export user updated {user_id}")
- def _create_user_key(self, entity, path, fs_name, fs_ro):
+ def _create_user_key(self, entity: str, path: str, fs_name: str, fs_ro: bool) -> Tuple[str, str]:
osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
self.rados_pool, self.rados_namespace, fs_name)
access_type = 'r' if fs_ro else 'rw'
log.info("Export user created is {}".format(json_res[0]['entity']))
return json_res[0]['entity'], json_res[0]['key']
- def create_cephfs_export(self, fs_name, cluster_id, pseudo_path,
- read_only, path, squash,
- clients=[]):
+ def create_cephfs_export(self,
+ fs_name: str,
+ cluster_id: str,
+ pseudo_path: str,
+ read_only: bool,
+ path: str,
+ squash: str,
+ clients: list = []) -> Tuple[int, str, str]:
if not check_fs(self.mgr, fs_name):
raise FSNotFound(fs_name)
return (0, json.dumps(result, indent=4), '')
return 0, "", "Export already exists"
- def create_rgw_export(self, bucket, cluster_id, pseudo_path, read_only, squash,
- client=[]):
+ def create_rgw_export(self,
+ bucket: str,
+ cluster_id: str,
+ path: str,
+ pseudo_path: str,
+ read_only: bool,
+ squash: str,
+ clients: list = []) -> Tuple[int, str, str]:
pseudo_path = self.format_path(pseudo_path)
if cluster_id not in self.exports:
'squash': squash,
'fsal': {
"name": "RGW",
- "user_id": user_id,
- "access_key_id": access_key_id,
- "secret_access_key": secret_access_key,
+ #"user_id": user_id,
+ #"access_key_id": access_key_id,
+ #"secret_access_key": secret_access_key,
},
'clients': clients
}
self._save_export(export)
result = {
"bind": pseudo_path,
- "fs": fs_name,
"path": path,
"cluster": cluster_id,
"mode": access_type,
return (0, json.dumps(result, indent=4), '')
return 0, "", "Export already exists"
- def update_export(self, new_export, can_create):
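+ # renamed from update_export: its signature differs from
+ # ExportMgr.update_export, which mypy would flag as an incompatible override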
+ def update_export_1(self, new_export: Dict, can_create: bool) -> Tuple[int, str, str]:
+
for k in ['cluster_id', 'path', 'pseudo']:
if k not in new_export:
raise NFSInvalidOperation(f'Export missing required field {k}')
if new_export['cluster_id'] not in available_clusters(self.mgr):
raise ClusterNotFound()
self.rados_namespace = new_export['cluster_id']
+ assert self.rados_namespace
new_export['path'] = self.format_path(new_export['path'])
new_export['pseudo'] = self.format_path(new_export['pseudo'])
if old_export.fsal.name != new_export.fsal.name:
raise NFSInvalidOperation('FSAL change not allowed')
+ """
if old_export.fsal.user_id != new_export.fsal.user_id:
raise NFSInvalidOperation('user_id change is not allowed')
self._update_user_id(new_export.path, new_export.access_type,
new_export.fsal.fs_name, new_export.fsal.user_id)
cast(new_export.fsal, CephFSFSAL).cephx_key = cast(old_export.fsal, CephFSFSAL).cephx_key
-
self._update_export(new_export)
export_ls = self.exports[self.rados_namespace]
if old_export not in export_ls:
old_export = self._fetch_export(old_export.cluster_id, old_export.pseudo)
export_ls.remove(old_export)
restart_nfs_service(self.mgr, new_export.cluster_id)
+ """
return 0, f"Updated export {new_export.pseudo}", ""
-from typing import cast, List, Dict, Any, Optional
+from typing import cast, List, Dict, Any, Optional, TYPE_CHECKING
from os.path import isabs
from .exception import NFSInvalidOperation, FSNotFound
from .utils import check_fs
+if TYPE_CHECKING:
+ from nfs.module import Module
+
+
+class RawBlock:
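+ """A parsed ganesha config block: a block name plus nested sub-blocks
+ and key/value stanzas. Replaces the earlier plain-dict representation."""
+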
+ def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
+ if not values: # workaround mutable default argument
+ values = {}
+ if not blocks: # workaround mutable default argument
+ blocks = []
+ self.block_name = block_name
+ self.blocks = blocks
+ self.values = values
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, RawBlock):
+ return False
+ return self.block_name == other.block_name and \
+ self.blocks == other.blocks and \
+ self.values == other.values
+
+ def __repr__(self) -> str:
+ return f'RawBlock({self.block_name!r}, {self.blocks!r}, {self.values!r})'
+
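+# For example, GaneshaConfParser('%url rados://a/b/c').parse() yields
+# [RawBlock('%url', values={'value': 'rados://a/b/c'})].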
class GaneshaConfParser:
def __init__(self, raw_config: str):
else:
self.text += "".join(line.split())
- def stream(self):
+ def stream(self) -> str:
return self.text[self.pos:]
def last_context(self) -> str:
return f'"...{self.text[max(0, self.pos - 30):self.pos]}<here>{self.stream()[:30]}"'
- def parse_block_name(self):
+ def parse_block_name(self) -> str:
idx = self.stream().find('{')
if idx == -1:
raise Exception(f"Cannot find block name at {self.last_context()}")
self.pos += idx+1
return block_name
- def parse_block_or_section(self):
+ def parse_block_or_section(self) -> RawBlock:
if self.stream().startswith("%url "):
# section line
self.pos += 5
else:
value = self.stream()[:idx]
self.pos += idx+1
- block_dict = {'block_name': '%url', 'value': value}
+ block_dict = RawBlock('%url', values={'value': value})
return block_dict
- block_dict = {'block_name': self.parse_block_name().upper()}
+ block_dict = RawBlock(self.parse_block_name().upper())
self.parse_block_body(block_dict)
if self.stream()[0] != '}':
raise Exception("No closing bracket '}' found at the end of block")
self.pos += 1
return block_dict
- def parse_parameter_value(self, raw_value):
+ def parse_parameter_value(self, raw_value: str) -> Any:
if raw_value.find(',') != -1:
return [self.parse_parameter_value(v.strip())
for v in raw_value.split(',')]
return raw_value[1:-1]
return raw_value
- def parse_stanza(self, block_dict):
+ def parse_stanza(self, block_dict: RawBlock) -> None:
equal_idx = self.stream().find('=')
if equal_idx == -1:
raise Exception("Malformed stanza: no equal symbol found.")
semicolon_idx = self.stream().find(';')
parameter_name = self.stream()[:equal_idx].lower()
parameter_value = self.stream()[equal_idx+1:semicolon_idx]
- block_dict[parameter_name] = self.parse_parameter_value(parameter_value)
+ block_dict.values[parameter_name] = self.parse_parameter_value(parameter_value)
self.pos += semicolon_idx+1
- def parse_block_body(self, block_dict):
+ def parse_block_body(self, block_dict: RawBlock) -> None:
while True:
if self.stream().find('}') == 0:
# block end
self.parse_stanza(block_dict)
elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket) or
(not is_semicolon)):
- if '_blocks_' not in block_dict:
- block_dict['_blocks_'] = []
- block_dict['_blocks_'].append(self.parse_block_or_section())
+ block_dict.blocks.append(self.parse_block_or_section())
else:
raise Exception("Malformed stanza: no semicolon found.")
if last_pos == self.pos:
raise Exception("Infinite loop while parsing block content")
- def parse(self):
+ def parse(self) -> List[RawBlock]:
blocks = []
while self.stream():
blocks.append(self.parse_block_or_section())
return blocks
@staticmethod
- def _indentation(depth, size=4):
+ def _indentation(depth: int, size: int = 4) -> str:
conf_str = ""
for _ in range(0, depth*size):
conf_str += " "
return conf_str
@staticmethod
- def write_block_body(block, depth=0):
- def format_val(key, val):
+ def write_block_body(block: RawBlock, depth: int = 0) -> str:
+ def format_val(key: str, val: Any) -> str:
if isinstance(val, list):
return ', '.join([format_val(key, v) for v in val])
if isinstance(val, bool):
return str(val).lower()
- if isinstance(val, int) or (block['block_name'] == 'CLIENT'
+ if isinstance(val, int) or (block.block_name == 'CLIENT'
and key == 'clients'):
return '{}'.format(val)
return '"{}"'.format(val)
conf_str = ""
- for key, val in block.items():
- if key == 'block_name':
- continue
- elif key == '_blocks_':
- for blo in val:
- conf_str += GaneshaConfParser.write_block(blo, depth)
- elif val is not None:
+ for blo in block.blocks:
+ conf_str += GaneshaConfParser.write_block(blo, depth)
+
+ for key, val in block.values.items():
+ if val is not None:
conf_str += GaneshaConfParser._indentation(depth)
conf_str += '{} = {};\n'.format(key, format_val(key, val))
return conf_str
@staticmethod
- def write_block(block, depth=0):
- if block['block_name'] == "%url":
- return '%url "{}"\n\n'.format(block['value'])
+ def write_block(block: RawBlock, depth: int = 0) -> str:
+ if block.block_name == "%url":
+ return '%url "{}"\n\n'.format(block.values['value'])
conf_str = ""
conf_str += GaneshaConfParser._indentation(depth)
- conf_str += format(block['block_name'])
+ conf_str += format(block.block_name)
conf_str += " {\n"
conf_str += GaneshaConfParser.write_block_body(block, depth+1)
conf_str += GaneshaConfParser._indentation(depth)
class FSAL(object):
- def __init__(self, name: str):
+ def __init__(self, name: str) -> None:
self.name = name
@classmethod
raise NFSInvalidOperation(f'Unknown FSAL {fsal_dict.get("name")}')
@classmethod
- def from_fsal_block(cls, fsal_block: Dict[str, Any]) -> 'FSAL':
- if fsal_block.get('name') == 'CEPH':
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'FSAL':
+ if fsal_block.values.get('name') == 'CEPH':
return CephFSFSAL.from_fsal_block(fsal_block)
- if fsal_block.get('name') == 'RGW':
+ if fsal_block.values.get('name') == 'RGW':
return RGWFSAL.from_fsal_block(fsal_block)
- raise NFSInvalidOperation(f'Unknown FSAL {fsal_block.get("name")}')
+ raise NFSInvalidOperation(f'Unknown FSAL {fsal_block.values.get("name")}')
- def to_fsal_block(self) -> Dict[str, Any]:
+ def to_fsal_block(self) -> RawBlock:
raise NotImplementedError
def to_dict(self) -> Dict[str, Any]:
user_id: Optional[str] = None,
fs_name: Optional[str] = None,
sec_label_xattr: Optional[str] = None,
- cephx_key: Optional[str] = None):
+ cephx_key: Optional[str] = None) -> None:
super().__init__(name)
assert name == 'CEPH'
self.fs_name = fs_name
self.cephx_key = cephx_key
@classmethod
- def from_fsal_block(cls, fsal_block: Dict[str, str]) -> 'CephFSFSAL':
- return cls(fsal_block['name'],
- fsal_block.get('user_id'),
- fsal_block.get('filesystem'),
- fsal_block.get('sec_label_xattr'),
- fsal_block.get('secret_access_key'))
-
- def to_fsal_block(self) -> Dict[str, str]:
- result = {
- 'block_name': 'FSAL',
- 'name': self.name,
- }
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'CephFSFSAL':
+ return cls(fsal_block.values['name'],
+ fsal_block.values.get('user_id'),
+ fsal_block.values.get('filesystem'),
+ fsal_block.values.get('sec_label_xattr'),
+ fsal_block.values.get('secret_access_key'))
+
+ def to_fsal_block(self) -> RawBlock:
+ result = RawBlock('FSAL', values={'name': self.name})
+
if self.user_id:
- result['user_id'] = self.user_id
+ result.values['user_id'] = self.user_id
if self.fs_name:
- result['filesystem'] = self.fs_name
+ result.values['filesystem'] = self.fs_name
if self.sec_label_xattr:
- result['sec_label_xattr'] = self.sec_label_xattr
+ result.values['sec_label_xattr'] = self.sec_label_xattr
if self.cephx_key:
- result['secret_access_key'] = self.cephx_key
+ result.values['secret_access_key'] = self.cephx_key
return result
@classmethod
- def from_dict(cls, fsal_dict: Dict[str, str]) -> 'CephFSFSAL':
+ def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'CephFSFSAL':
return cls(fsal_dict['name'],
fsal_dict.get('user_id'),
fsal_dict.get('fs_name'),
user_id: Optional[str] = None,
access_key_id: Optional[str] = None,
secret_access_key: Optional[str] = None
- ):
+ ) -> None:
super().__init__(name)
assert name == 'RGW'
self.user_id = user_id
self.secret_access_key = secret_access_key
@classmethod
- def from_fsal_block(cls, fsal_block: Dict[str, str]) -> 'RGWFSAL':
- return cls(fsal_block['name'],
- fsal_block.get('user_id'),
- fsal_block.get('access_key'),
- fsal_block.get('secret_access_key'))
-
- def to_fsal_block(self) -> Dict[str, str]:
- result = {
- 'block_name': 'FSAL',
- 'name': self.name,
- }
+ def from_fsal_block(cls, fsal_block: RawBlock) -> 'RGWFSAL':
+ return cls(fsal_block.values['name'],
+ fsal_block.values.get('user_id'),
+ fsal_block.values.get('access_key'),
+ fsal_block.values.get('secret_access_key'))
+
+ def to_fsal_block(self) -> RawBlock:
+ result = RawBlock('FSAL', values={'name': self.name})
+
if self.user_id:
- result['user_id'] = self.user_id
- if self.fs_name:
- result['access_key_id'] = self.access_key_id
+ result.values['user_id'] = self.user_id
+ if self.access_key_id:
+ result.values['access_key_id'] = self.access_key_id
if self.secret_access_key:
- result['secret_access_key'] = self.secret_access_key
+ result.values['secret_access_key'] = self.secret_access_key
return result
@classmethod
class Client:
def __init__(self,
addresses: List[str],
- access_type: Optional[str] = None,
- squash: Optional[str] = None):
+ access_type: str,
+ squash: str):
self.addresses = addresses
self.access_type = access_type
self.squash = squash
@classmethod
- def from_client_block(cls, client_block) -> 'Client':
- addresses = client_block.get('clients', [])
+ def from_client_block(cls, client_block: RawBlock) -> 'Client':
+ addresses = client_block.values.get('clients', [])
if isinstance(addresses, str):
addresses = [addresses]
return cls(addresses,
- client_block.get('access_type', None),
- client_block.get('squash', None))
+ client_block.values.get('access_type', None),
+ client_block.values.get('squash', None))
- def to_client_block(self):
- result = {
- 'block_name': 'CLIENT',
- 'clients': self.addresses,
- }
+ def to_client_block(self) -> RawBlock:
+ result = RawBlock('CLIENT', values={'clients': self.addresses})
if self.access_type:
- result['access_type'] = self.access_type
+ result.values['access_type'] = self.access_type
if self.squash:
- result['squash'] = self.squash
+ result.values['squash'] = self.squash
return result
@classmethod
- def from_dict(cls, client_dict) -> 'Client':
+ def from_dict(cls, client_dict: Dict[str, Any]) -> 'Client':
return cls(client_dict['addresses'], client_dict['access_type'],
client_dict['squash'])
- def to_dict(self):
+ def to_dict(self) -> Dict[str, Any]:
return {
'addresses': self.addresses,
'access_type': self.access_type,
protocols: List[int],
transports: List[str],
fsal: FSAL,
- clients=None):
+ clients: Optional[List[Client]] = None) -> None:
self.export_id = export_id
self.path = path
self.fsal = fsal
self.security_label = security_label
self.protocols = protocols
self.transports = transports
- self.clients = clients
+ self.clients: List[Client] = clients or []
@classmethod
- def from_export_block(cls, export_block, cluster_id):
- fsal_blocks = [b for b in export_block['_blocks_']
- if b['block_name'] == "FSAL"]
+ def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export':
+ fsal_blocks = [b for b in export_block.blocks
+ if b.block_name == "FSAL"]
- client_blocks = [b for b in export_block['_blocks_']
- if b['block_name'] == "CLIENT"]
+ client_blocks = [b for b in export_block.blocks
+ if b.block_name == "CLIENT"]
- protocols = export_block.get('protocols')
+ protocols = export_block.values.get('protocols')
if not isinstance(protocols, list):
protocols = [protocols]
- transports = export_block.get('transports')
+ transports = export_block.values.get('transports')
if not isinstance(transports, list):
transports = [transports]
- return cls(export_block['export_id'],
- export_block['path'],
+ return cls(export_block.values['export_id'],
+ export_block.values['path'],
cluster_id,
- export_block['pseudo'],
- export_block['access_type'],
- export_block.get('squash', 'no_root_squash'),
- export_block.get('security_label', True),
+ export_block.values['pseudo'],
+ export_block.values['access_type'],
+ export_block.values.get('squash', 'no_root_squash'),
+ export_block.values.get('security_label', True),
protocols,
transports,
FSAL.from_fsal_block(fsal_blocks[0]),
[Client.from_client_block(client)
for client in client_blocks])
- def to_export_block(self):
- result = {
- 'block_name': 'EXPORT',
+ def to_export_block(self) -> RawBlock:
+ result = RawBlock('EXPORT', values={
'export_id': self.export_id,
'path': self.path,
'pseudo': self.pseudo,
'security_label': self.security_label,
'protocols': self.protocols,
'transports': self.transports,
- }
- result['_blocks_'] = [
+ })
+ result.blocks = [
self.fsal.to_fsal_block()
] + [
client.to_client_block()
return result
@classmethod
- def from_dict(cls, export_id, ex_dict):
+ def from_dict(cls, export_id: int, ex_dict: Dict[str, Any]) -> 'Export':
return cls(export_id,
ex_dict.get('path', '/'),
ex_dict['cluster_id'],
FSAL.from_dict(ex_dict.get('fsal', {})),
[Client.from_dict(client) for client in ex_dict.get('clients', [])])
- def to_dict(self):
+ def to_dict(self) -> Dict[str, Any]:
return {
'export_id': self.export_id,
'path': self.path,
}
@staticmethod
- def validate_access_type(access_type):
+ def validate_access_type(access_type: str) -> None:
valid_access_types = ['rw', 'ro', 'none']
if access_type.lower() not in valid_access_types:
raise NFSInvalidOperation(
)
@staticmethod
- def validate_squash(squash):
+ def validate_squash(squash: str) -> None:
valid_squash_ls = [
"root", "root_squash", "rootsquash", "rootid", "root_id_squash",
"rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous",
f"squash {squash} not in valid list {valid_squash_ls}"
)
- def validate(self, mgr):
+ def validate(self, mgr: 'Module') -> None:
if not isabs(self.pseudo) or self.pseudo == "/":
raise NFSInvalidOperation(
f"pseudo path {self.pseudo} is invalid. It should be an absolute "
if self.fsal.name == 'CEPH':
fs = cast(CephFSFSAL, self.fsal)
- if not check_fs(mgr, fs.fs_name):
+ if not fs.fs_name or not check_fs(mgr, fs.fs_name):
raise FSNotFound(fs.fs_name)
elif self.fsal.name == 'RGW':
rgw = cast(RGWFSAL, self.fsal)
else:
raise NFSInvalidOperation(f'FSAL {self.fsal.name} not supported')
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if not isinstance(other, Export):
return False
return self.to_dict() == other.to_dict()
from .export import ExportMgr
from .cluster import NFSCluster
+from typing import Any
log = logging.getLogger(__name__)
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
MODULE_OPTIONS: List[Option] = []
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: str, **kwargs: Any) -> None:
self.inited = False
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
squash=squash, clients=clients)
@CLICommand('nfs export create rgw', perm='rw')
- def _cmd_nfs_export_create_cephfs(
+ def _cmd_nfs_export_create_rgw(
self,
bucket: str,
clusterid: str,
+from typing import Optional, Tuple, Iterator, List, Any, Dict
+
from contextlib import contextmanager
from unittest import mock
from unittest.mock import MagicMock
from ceph.deployment.service_spec import NFSServiceSpec
from nfs import Module
from nfs.export import ExportMgr
-from nfs.export_utils import GaneshaConfParser, Export
+from nfs.export_utils import GaneshaConfParser, Export, RawBlock
from orchestrator import ServiceDescription, DaemonDescription, OrchResult
-class TestNFS():
+class TestNFS:
daemon_raw_config = """
NFS_CORE_PARAM {
Enable_NLM = false;
class RObject(object):
- def __init__(self, key, raw):
+ def __init__(self, key: str, raw: str) -> None:
self.key = key
self.raw = raw
- def read(self, _):
+ def read(self, _: Optional[int]) -> bytes:
return self.raw.encode('utf-8')
- def stat(self):
+ def stat(self) -> Tuple[int, None]:
return len(self.raw), None
- def _ioctx_write_full_mock(self, key, content):
+ def _ioctx_write_full_mock(self, key: str, content: bytes) -> None:
if key not in self.temp_store[self.temp_store_namespace]:
self.temp_store[self.temp_store_namespace][key] = \
TestNFS.RObject(key, content.decode('utf-8'))
else:
self.temp_store[self.temp_store_namespace][key].raw = content.decode('utf-8')
- def _ioctx_remove_mock(self, key):
+ def _ioctx_remove_mock(self, key: str) -> None:
del self.temp_store[self.temp_store_namespace][key]
- def _ioctx_list_objects_mock(self):
+ def _ioctx_list_objects_mock(self) -> List['TestNFS.RObject']:
r = [obj for _, obj in self.temp_store[self.temp_store_namespace].items()]
return r
- def _ioctl_stat_mock(self, key):
+ def _ioctl_stat_mock(self, key: str) -> Tuple[int, None]:
return self.temp_store[self.temp_store_namespace][key].stat()
- def _ioctl_read_mock(self, key, size=None):
+ def _ioctl_read_mock(self, key: str, size: Optional[int] = None) -> bytes:
return self.temp_store[self.temp_store_namespace][key].read(size)
- def _ioctx_set_namespace_mock(self, namespace):
+ def _ioctx_set_namespace_mock(self, namespace: str) -> None:
self.temp_store_namespace = namespace
- def _reset_temp_store(self):
+ def _reset_temp_store(self) -> None:
self.temp_store_namespace = None
self.temp_store = {
'ns': {
}
@contextmanager
- def _mock_orchestrator(self, enable):
+ def _mock_orchestrator(self, enable: bool) -> Iterator:
self.io_mock = MagicMock()
self.io_mock.set_namespace.side_effect = self._ioctx_set_namespace_mock
yield
- def test_parse_daemon_raw_config(self):
+ def test_parse_daemon_raw_config(self) -> None:
expected_daemon_config = [
- {
- "block_name": "NFS_CORE_PARAM",
+ RawBlock('NFS_CORE_PARAM', values={
"enable_nlm": False,
"enable_rquota": False,
"protocols": 4,
"nfs_port": 14000
- },
- {
- "block_name": "MDCACHE",
+ }),
+ RawBlock('MDCACHE', values={
"dir_chunk": 0
- },
- {
- "block_name": "NFSV4",
+ }),
+ RawBlock('NFSV4', values={
"recoverybackend": "rados_cluster",
"minor_versions": [1, 2]
- },
- {
- "block_name": "RADOS_KV",
+ }),
+ RawBlock('RADOS_KV', values={
"pool": "nfs-ganesha",
"namespace": "vstart",
"userid": "vstart",
"nodeid": "a"
- },
- {
- "block_name": "RADOS_URLS",
+ }),
+ RawBlock('RADOS_URLS', values={
"userid": "vstart",
"watch_url": "'rados://nfs-ganesha/vstart/conf-nfs.vstart'"
- },
- {
- "block_name": "%url",
+ }),
+ RawBlock('%url', values={
"value": "rados://nfs-ganesha/vstart/conf-nfs.vstart"
- }
+ })
]
daemon_config = GaneshaConfParser(self.daemon_raw_config).parse()
assert daemon_config == expected_daemon_config
# assert export.security_label == False # probably correct value
assert export.security_label == True
- def test_export_parser_1(self):
+ def test_export_parser_1(self) -> None:
blocks = GaneshaConfParser(self.export_1).parse()
assert isinstance(blocks, list)
assert len(blocks) == 1
assert len(export.clients) == 0
assert export.cluster_id in ('_default_', 'foo')
- def test_export_parser_2(self):
+ def test_export_parser_2(self) -> None:
blocks = GaneshaConfParser(self.export_2).parse()
assert isinstance(blocks, list)
assert len(blocks) == 1
self._validate_export_2(export)
- def test_daemon_conf_parser_a(self):
+ def test_daemon_conf_parser_a(self) -> None:
blocks = GaneshaConfParser(self.conf_nodea).parse()
assert isinstance(blocks, list)
assert len(blocks) == 2
- assert blocks[0]['block_name'] == "%url"
- assert blocks[0]['value'] == "rados://ganesha/ns/export-2"
- assert blocks[1]['block_name'] == "%url"
- assert blocks[1]['value'] == "rados://ganesha/ns/export-1"
+ assert blocks[0].block_name == "%url"
+ assert blocks[0].values['value'] == "rados://ganesha/ns/export-2"
+ assert blocks[1].block_name == "%url"
+ assert blocks[1].values['value'] == "rados://ganesha/ns/export-1"
- def test_daemon_conf_parser_b(self):
+ def test_daemon_conf_parser_b(self) -> None:
blocks = GaneshaConfParser(self.conf_nodeb).parse()
assert isinstance(blocks, list)
assert len(blocks) == 1
- assert blocks[0]['block_name'] == "%url"
- assert blocks[0]['value'] == "rados://ganesha/ns/export-1"
+ assert blocks[0].block_name == "%url"
+ assert blocks[0].values['value'] == "rados://ganesha/ns/export-1"
- def test_ganesha_conf(self):
+ def test_ganesha_conf(self) -> None:
with self._mock_orchestrator(True):
for cluster_id, info in self.clusters.items():
self._do_test_ganesha_conf(cluster_id, info['exports'])
self._reset_temp_store()
- def _do_test_ganesha_conf(self, cluster, expected_exports):
+ def _do_test_ganesha_conf(self, cluster: str, expected_exports: Dict[int, List[str]]) -> None:
nfs_mod = Module('nfs', '', '')
ganesha_conf = ExportMgr(nfs_mod)
exports = ganesha_conf.exports['foo']
self._validate_export_2([e for e in exports if e.export_id == 2][0])
- def test_config_dict(self):
+ def test_config_dict(self) -> None:
with self._mock_orchestrator(True):
for cluster_id, info in self.clusters.items():
self._do_test_config_dict(cluster_id, info['exports'])
self._reset_temp_store()
- def _do_test_config_dict(self, cluster, expected_exports):
+ def _do_test_config_dict(self, cluster: str, expected_exports: Dict[int, List[str]]) -> None:
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
export = [e for e in conf.exports['foo'] if e.export_id == 1][0]
'squash': 'AllAnonymous',
'transports': ['TCP', 'UDP']}
- def test_config_from_dict(self):
+ def test_config_from_dict(self) -> None:
with self._mock_orchestrator(True):
for cluster_id, info in self.clusters.items():
self._do_test_config_from_dict(cluster_id, info['exports'])
self._reset_temp_store()
- def _do_test_config_from_dict(self, cluster_id, expected_exports):
+ def _do_test_config_from_dict(self, cluster_id: str, expected_exports: Dict[int, List[str]]) -> None:
export = Export.from_dict(1, {
'daemons': expected_exports[1],
'export_id': 1,
assert export.cluster_id == cluster_id
"""
- def test_remove_export(self):
+ def test_remove_export(self) -> None:
with self._mock_orchestrator(True):
for cluster_id, info in self.clusters.items():
self._do_test_remove_export(cluster_id, info['exports'])
self._reset_temp_store()
- def _do_test_remove_export(self, cluster_id, expected_exports):
+ def _do_test_remove_export(self, cluster_id: str, expected_exports: Dict[int, List[str]]) -> None:
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
assert len(conf.exports[cluster_id]) == 2
+from typing import List, TYPE_CHECKING
+
import orchestrator
+if TYPE_CHECKING:
+ from nfs.module import Module
+
POOL_NAME = 'nfs-ganesha'
-def available_clusters(mgr):
+def available_clusters(mgr: 'Module') -> List[str]:
'''
This method returns a list of available cluster ids.
Service name is service_type.service_id
# TODO check cephadm cluster list with rados pool conf objects
completion = mgr.describe_service(service_type='nfs')
orchestrator.raise_if_exception(completion)
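+ # completion.result is Optional; assert before iterating for mypy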
+ assert completion.result is not None
return [cluster.spec.service_id for cluster in completion.result
if cluster.spec.service_id]
-def restart_nfs_service(mgr, cluster_id):
+def restart_nfs_service(mgr: 'Module', cluster_id: str) -> None:
'''
This method restarts the nfs daemons
'''
orchestrator.raise_if_exception(completion)
-def check_fs(mgr, fs_name):
+def check_fs(mgr: 'Module', fs_name: str) -> bool:
'''
This method checks if the given fs is valid
'''
skipsdist = true
requires = cython
+[pytest]
+log_level=NOTSET
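+# NOTSET disables level filtering so test runs capture every log record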
+
[flake8]
max-line-length = 100
ignore =
-m mgr_module \
-m mgr_util \
-m mirroring \
+ -m nfs \
-m orchestrator \
-m progress \
-m prometheus \