+# flake8: noqa
+
import os
if 'UNITTEST' in os.environ:
import tests
import logging
-import socket
import json
import re
+import socket
from typing import cast, Dict, List, Any, Union, Optional, TypeVar, Callable, TYPE_CHECKING, Tuple
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec
raise ClusterNotFound()
except NotImplementedError:
return 0, "NFS-Ganesha Config Added Successfully "\
- "(Manual Restart of NFS PODS required)", ""
+ "(Manual Restart of NFS PODS required)", ""
except Exception as e:
return exception_handler(e, f"Setting NFS-Ganesha Config failed for {cluster_id}")
raise ClusterNotFound()
except NotImplementedError:
return 0, "NFS-Ganesha Config Removed Successfully "\
- "(Manual Restart of NFS PODS required)", ""
+ "(Manual Restart of NFS PODS required)", ""
except Exception as e:
return exception_handler(e, f"Resetting NFS-Ganesha Config failed for {cluster_id}")
import json
import logging
from typing import List, Any, Dict, Tuple, Optional, TYPE_CHECKING, TypeVar, Callable, cast
-from os.path import isabs, normpath
+from os.path import normpath
from rados import TimedOut, ObjectNotFound
from .export_utils import GaneshaConfParser, Export, RawBlock
from .exception import NFSException, NFSInvalidOperation, NFSObjectNotFound, FSNotFound, \
- ClusterNotFound
-from .utils import POOL_NAME, available_clusters, restart_nfs_service, check_fs
+ ClusterNotFound
+from .utils import POOL_NAME, available_clusters, check_fs
if TYPE_CHECKING:
from nfs.module import Module
try:
ioctx.notify(obj)
except TimedOut:
- log.exception(f"Ganesha timed out")
+ log.exception("Ganesha timed out")
@property
def exports(self) -> Dict[str, List[Export]]:
self.mgr.check_mon_command({
'prefix': 'auth rm',
'entity': 'client.{}'.format(entity),
- })
+ })
log.info(f"Export user deleted is {entity}")
def _gen_export_id(self) -> int:
if export:
if pseudo_path:
NFSRados(self.mgr, self.rados_namespace).remove_obj(
- f'export-{export.export_id}', f'conf-nfs.{cluster_id}')
+ f'export-{export.export_id}', f'conf-nfs.{cluster_id}')
self.exports[cluster_id].remove(export)
self._delete_user(export.fsal.user_id)
if not self.exports[cluster_id]:
with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
ioctx.set_namespace(self.rados_namespace)
export = Export.from_export_block(GaneshaConfParser(ioctx.read(f"export-{ex_id}"
- ).decode("utf-8")).parse()[0], self.rados_namespace)
+ ).decode("utf-8")).parse()[0], self.rados_namespace)
return export
except ObjectNotFound:
log.exception(f"Export ID: {ex_id} not found")
assert self.rados_namespace
self.exports[self.rados_namespace].append(export)
NFSRados(self.mgr, self.rados_namespace).update_obj(
- GaneshaConfParser.write_block(export.to_export_block()),
- f'export-{export.export_id}', f'conf-nfs.{export.cluster_id}')
+ GaneshaConfParser.write_block(export.to_export_block()),
+ f'export-{export.export_id}', f'conf-nfs.{export.cluster_id}')
def format_path(self, path: str) -> str:
if path:
def _update_user_id(self, path: str, access_type: str, fs_name: str, user_id: str) -> None:
osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
- self.rados_pool, self.rados_namespace, fs_name)
+ self.rados_pool, self.rados_namespace, fs_name)
access_type = 'r' if access_type == 'RO' else 'rw'
self.mgr.check_mon_command({
'entity': f'client.{user_id}',
'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow {} path={}'.format(
access_type, path)],
- })
+ })
log.info(f"Export user updated {user_id}")
def _create_user_key(self, entity: str, path: str, fs_name: str, fs_ro: bool) -> Tuple[str, str]:
osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
- self.rados_pool, self.rados_namespace, fs_name)
+ self.rados_pool, self.rados_namespace, fs_name)
access_type = 'r' if fs_ro else 'rw'
ret, out, err = self.mgr.check_mon_command({
'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow {} path={}'.format(
access_type, path)],
'format': 'json',
- })
+ })
json_res = json.loads(out)
log.info("Export user created is {}".format(json_res[0]['entity']))
pseudo_path: str,
read_only: bool,
squash: str,
- clients: list=[]) -> Tuple[int, str, str]:
+ clients: list = []) -> Tuple[int, str, str]:
pseudo_path = self.format_path(pseudo_path)
if cluster_id not in self.exports:
if not self._fetch_export(cluster_id, pseudo_path):
# generate access+secret keys
-
+
ex_id = self._gen_export_id()
if clients:
access_type = "none"
'squash': squash,
'fsal': {
"name": "RGW",
- #"user_id": user_id,
- #"access_key_id": access_key_id,
- #"secret_access_key": secret_access_key,
+ # "user_id": user_id,
+ # "access_key_id": access_key_id,
+ # "secret_access_key": secret_access_key,
},
'clients': clients
}
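One thing the `clients: list = []` spacing fix above leaves untouched is the mutable default itself, a classic Python pitfall: the list is created once at function definition time and shared by every call that omits the argument. A minimal sketch of the usual `None`-sentinel alternative (signature simplified from the one in the diff):

```python
from typing import Any, Dict, List, Optional

def create_export(clients: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
    # A fresh list per call; a literal [] default would be shared across calls.
    clients = clients if clients is not None else []
    return clients

assert create_export() is not create_export()
```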
if idx == -1:
raise Exception(f"Cannot find block name at {self.last_context()}")
block_name = self.stream()[:idx]
- self.pos += idx+1
+ self.pos += idx + 1
return block_name
def parse_block_or_section(self) -> RawBlock:
self.pos += len(value)
else:
value = self.stream()[:idx]
- self.pos += idx+1
+ self.pos += idx + 1
block_dict = RawBlock('%url', values={'value': value})
return block_dict
raise Exception("Malformed stanza: no equal symbol found.")
semicolon_idx = self.stream().find(';')
parameter_name = self.stream()[:equal_idx].lower()
- parameter_value = self.stream()[equal_idx+1:semicolon_idx]
+ parameter_value = self.stream()[equal_idx + 1:semicolon_idx]
block_dict.values[parameter_name] = self.parse_parameter_value(parameter_value)
- self.pos += semicolon_idx+1
+ self.pos += semicolon_idx + 1
def parse_block_body(self, block_dict: RawBlock) -> None:
while True:
if is_semicolon and ((is_lbracket and is_semicolon_lt_lbracket) or not is_lbracket):
self.parse_stanza(block_dict)
- elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket) or
- (not is_semicolon)):
+ elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket)
+ or (not is_semicolon)):
block_dict.blocks.append(self.parse_block_or_section())
else:
raise Exception("Malformed stanza: no semicolon found.")
@staticmethod
def _indentation(depth: int, size: int = 4) -> str:
conf_str = ""
- for _ in range(0, depth*size):
+ for _ in range(0, depth * size):
conf_str += " "
return conf_str
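As a side note, the character-by-character loop in `_indentation` could be written with string repetition; a behavior-equivalent sketch:

```python
def indentation(depth: int, size: int = 4) -> str:
    # Same output as the loop: depth * size spaces.
    return " " * (depth * size)

assert indentation(2) == " " * 8
```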
conf_str += GaneshaConfParser._indentation(depth)
conf_str += format(block.block_name)
conf_str += " {\n"
- conf_str += GaneshaConfParser.write_block_body(block, depth+1)
+ conf_str += GaneshaConfParser.write_block_body(block, depth + 1)
conf_str += GaneshaConfParser._indentation(depth)
conf_str += "}\n"
return conf_str
raise NFSInvalidOperation(f'Unknown FSAL {fsal_block.values.get("name")}')
def to_fsal_block(self) -> RawBlock:
- raise NotImplemented
+ raise NotImplementedError
def to_dict(self) -> Dict[str, Any]:
- raise NotImplemented
+ raise NotImplementedError
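The `NotImplemented` → `NotImplementedError` change is a real bug fix, not just style: `NotImplemented` is the sentinel constant that binary-operator methods return, not an exception, so raising it fails at runtime with a `TypeError`. A quick demonstration:

```python
try:
    raise NotImplemented  # the old code path: raising a non-exception
except TypeError as e:
    print(e)  # "exceptions must derive from BaseException"

try:
    raise NotImplementedError  # the corrected abstract-method idiom
except NotImplementedError:
    print("caught as intended")
```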
class CephFSFSAL(FSAL):
if not fs.fs_name or not check_fs(mgr, fs.fs_name):
raise FSNotFound(fs.fs_name)
elif self.fsal.name == 'RGW':
- rgw = cast(RGWFSAL, self.fsal)
+ rgw = cast(RGWFSAL, self.fsal) # noqa
pass
else:
- raise NFSInvalidOperation('FSAL {self.fsal.name} not supported')
+ raise NFSInvalidOperation(f'FSAL {self.fsal.name} not supported')
@CLICommand('nfs cluster create', perm='rw')
def _cmd_nfs_cluster_create(self,
clusterid: str,
- placement: Optional[str]=None,
- ingress: Optional[bool]=None,
- virtual_ip: Optional[str]=None) -> Tuple[int, str, str]:
+ placement: Optional[str] = None,
+ ingress: Optional[bool] = None,
+ virtual_ip: Optional[str] = None) -> Tuple[int, str, str]:
"""Create an NFS Cluster"""
return self.nfs.create_nfs_cluster(cluster_id=clusterid, placement=placement,
virtual_ip=virtual_ip, ingress=ingress)
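Most of the signature changes in this diff apply the same PEP 8 rule: when a parameter carries a type annotation, the `=` of its default takes surrounding spaces (flake8 E252); without an annotation it takes none. A compact sketch of both forms:

```python
from typing import Optional

def annotated(placement: Optional[str] = None) -> None:
    """Annotated default: spaces around '='."""

def bare(placement=None) -> None:
    """Bare keyword default: no spaces around '='."""
```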
+# flake8: noqa
+
from typing import Optional, Tuple, Iterator, List, Any, Dict
from contextlib import contextmanager
}
}
-
class RObject(object):
def __init__(self, key: str, raw: str) -> None:
self.key = key
"""
with mock.patch('nfs.module.Module.describe_service') as describe_service, \
- mock.patch('nfs.module.Module.rados') as rados:
-
+ mock.patch('nfs.module.Module.rados') as rados:
rados.open_ioctx.return_value.__enter__.return_value = self.io_mock
rados.open_ioctx.return_value.__exit__ = mock.Mock(return_value=None)
- assert export.fsal.sec_label_xattr == None
+ assert export.fsal.sec_label_xattr is None
assert len(export.clients) == 2
assert export.clients[0].addresses == \
- ["192.168.0.10", "192.168.1.0/8"]
+ ["192.168.0.10", "192.168.1.0/8"]
# assert export.clients[0].squash == "no_root_squash" # probably correct value
assert export.clients[0].squash == "None"
assert export.clients[0].access_type is None
assert export.protocols == [4, 3]
assert set(export.transports) == {"TCP", "UDP"}
assert export.fsal.name == "RGW"
- #assert export.fsal.rgw_user_id == "testuser" # probably correct value
- #assert export.fsal.access_key == "access_key" # probably correct value
- #assert export.fsal.secret_key == "secret_key" # probably correct value
+ # assert export.fsal.rgw_user_id == "testuser" # probably correct value
+ # assert export.fsal.access_key == "access_key" # probably correct value
+ # assert export.fsal.secret_key == "secret_key" # probably correct value
assert len(export.clients) == 0
assert export.cluster_id in ('_default_', 'foo')
export = Export.from_export_block(blocks[0], '_default_')
self._validate_export_2(export)
-
def test_daemon_conf_parser_a(self) -> None:
blocks = GaneshaConfParser(self.conf_nodea).parse()
assert isinstance(blocks, list)
self._validate_export_1([e for e in exports if e.export_id == 1][0])
self._validate_export_2([e for e in exports if e.export_id == 2][0])
-
def test_config_dict(self) -> None:
with self._mock_orchestrator(True):
for cluster_id, info in self.clusters.items():
ex_dict = export.to_dict()
assert ex_dict == {'access_type': 'RW',
- 'clients': [{'access_type': None,
- 'addresses': ['192.168.0.10', '192.168.1.0/8'],
- 'squash': 'None'},
- {'access_type': 'RO',
- 'addresses': ['192.168.0.0/16'],
- 'squash': 'All'}],
- 'cluster_id': 'foo',
- 'export_id': 1,
- 'fsal': {'fs_name': 'a', 'name': 'CEPH', 'user_id': 'ganesha'},
- 'path': '/',
- 'protocols': [4],
- 'pseudo': '/cephfs_a/',
- 'security_label': True,
- 'squash': 'no_root_squash',
- 'transports': [None]}
+ 'clients': [{'access_type': None,
+ 'addresses': ['192.168.0.10', '192.168.1.0/8'],
+ 'squash': 'None'},
+ {'access_type': 'RO',
+ 'addresses': ['192.168.0.0/16'],
+ 'squash': 'All'}],
+ 'cluster_id': 'foo',
+ 'export_id': 1,
+ 'fsal': {'fs_name': 'a', 'name': 'CEPH', 'user_id': 'ganesha'},
+ 'path': '/',
+ 'protocols': [4],
+ 'pseudo': '/cephfs_a/',
+ 'security_label': True,
+ 'squash': 'no_root_squash',
+ 'transports': [None]}
export = [e for e in conf.exports['foo'] if e.export_id == 2][0]
ex_dict = export.to_dict()
assert ex_dict == {'access_type': 'RW',
- 'clients': [],
- 'cluster_id': 'foo',
- 'export_id': 2,
- 'fsal': {'name': 'RGW',
- 'secret_access_key': 'secret_key',
- 'user_id': 'testuser'},
- 'path': '/',
- 'protocols': [3, 4],
- 'pseudo': '/rgw',
- 'security_label': True,
- 'squash': 'AllAnonymous',
- 'transports': ['TCP', 'UDP']}
+ 'clients': [],
+ 'cluster_id': 'foo',
+ 'export_id': 2,
+ 'fsal': {'name': 'RGW',
+ 'secret_access_key': 'secret_key',
+ 'user_id': 'testuser'},
+ 'path': '/',
+ 'protocols': [3, 4],
+ 'pseudo': '/rgw',
+ 'security_label': True,
+ 'squash': 'AllAnonymous',
+ 'transports': ['TCP', 'UDP']}
def test_config_from_dict(self) -> None:
with self._mock_orchestrator(True):
assert export.fsal.sec_label_xattr == 'security.selinux'
assert len(export.clients) == 2
assert export.clients[0].addresses == \
- ["192.168.0.10", "192.168.1.0/8"]
+ ["192.168.0.10", "192.168.1.0/8"]
assert export.clients[0].squash == "no_root_squash"
assert export.clients[0].access_type is None
assert export.clients[1].addresses == ["192.168.0.0/16"]
assert export.daemons == set(expected_exports[2])
assert export.cluster_id == cluster_id
"""
-
+
def test_remove_export(self) -> None:
with self._mock_orchestrator(True):
for cluster_id, info in self.clusters.items():
nfs_mod = Module('nfs', '', '')
conf = ExportMgr(nfs_mod)
assert len(conf.exports[cluster_id]) == 2
- assert conf.delete_export(cluster_id=cluster_id, pseudo_path="/rgw") == (0, "Successfully deleted export", "")
+ assert conf.delete_export(cluster_id=cluster_id,
+ pseudo_path="/rgw") == (0, "Successfully deleted export", "")
exports = conf.exports[cluster_id]
assert len(exports) == 1
assert exports[0].export_id == 1
conf.reload_daemons(['nodea', 'nodeb'])
self.io_mock.notify.assert_has_calls(calls)
"""
-
+
"""
def test_list_daemons(self):
for cluster_id, info in self.clusters.items():
- This methods restarts the nfs daemons
+ This method restarts the NFS daemons
'''
completion = mgr.service_action(action='restart',
- service_name='nfs.'+cluster_id)
+ service_name='nfs.' + cluster_id)
orchestrator.raise_if_exception(completion)
diskprediction_local
insights
iostat
+ nfs
orchestrator
prometheus
status
hello
iostat
localpool
+ nfs
orchestrator
prometheus
selftest