From: Varsha Rao Date: Fri, 9 Apr 2021 16:59:19 +0000 (+0530) Subject: mgr/nfs: move validate methods into new ValidateExport class X-Git-Tag: v16.2.5~105^2~14 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0c1fe1396ea62c99fbf6449b2aa5d70fddcbb4d7;p=ceph.git mgr/nfs: move validate methods into new ValidateExport class Signed-off-by: Varsha Rao (cherry picked from commit be22f0592e3123bf121b572cbb22fc8d5d329d4f) --- diff --git a/src/pybind/mgr/nfs/export/nfs.py b/src/pybind/mgr/nfs/export/nfs.py index 3ce77bdaee78..b2b544a9fa18 100644 --- a/src/pybind/mgr/nfs/export/nfs.py +++ b/src/pybind/mgr/nfs/export/nfs.py @@ -69,6 +69,11 @@ def exception_handler(exception_obj, log_msg=""): return getattr(exception_obj, 'errno', -1), "", str(exception_obj) +def check_fs(mgr, fs_name): + fs_map = mgr.get('fs_map') + return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']] + + class NFSRados: def __init__(self, mgr, namespace): self.mgr = mgr @@ -135,6 +140,72 @@ class NFSRados: return False +class ValidateExport: + @staticmethod + def pseudo_path(path): + if not isabs(path) or path == "/": + raise NFSInvalidOperation(f"pseudo path {path} is invalid. It should be an absolute " + "path and it cannot be just '/'.") + + @staticmethod + def squash(squash): + valid_squash_ls = ["root", "root_squash", "rootsquash", "rootid", "root_id_squash", + "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous", + "allanonymous", "no_root_squash", "none", "noidsquash"] + if squash not in valid_squash_ls: + raise NFSInvalidOperation(f"squash {squash} not in valid list {valid_squash_ls}") + + @staticmethod + def security_label(label): + if not isinstance(label, bool): + raise NFSInvalidOperation('Only boolean values allowed') + + @staticmethod + def protocols(proto): + for p in proto: + if p not in [3, 4]: + raise NFSInvalidOperation(f"Invalid protocol {p}") + if 3 in proto: + log.warning("NFS V3 is an old version, it might not work") + + @staticmethod + def transport(transport): + valid_transport = ["UDP", "TCP"] + for trans in transport: + if trans.upper() not in valid_transport: + raise NFSInvalidOperation(f'{trans} is not a valid transport protocol') + + @staticmethod + def access_type(access_type): + valid_ones = ['RW', 'RO'] + if access_type not in valid_ones: + raise NFSInvalidOperation(f'{access_type} is invalid, valid access type are' + f'{valid_ones}') + + @staticmethod + def fsal(mgr, old, new): + if old.name != new['name']: + raise NFSInvalidOperation('FSAL name change not allowed') + if old.user_id != new['user_id']: + raise NFSInvalidOperation('User ID modification is not allowed') + if new['sec_label_xattr']: + raise NFSInvalidOperation('Security label xattr cannot be changed') + if old.fs_name != new['fs_name']: + if not check_fs(mgr, new['fs_name']): + raise FSNotFound(new['fs_name']) + return 1 + + @staticmethod + def _client(client): + ValidateExport.access_type(client['access_type']) + ValidateExport.squash(client['squash']) + + @staticmethod + def clients(clients_ls): + for client in clients_ls: + ValidateExport._client(client) + + class FSExport(object): def __init__(self, mgr, namespace=None): self.mgr = mgr @@ -249,18 +320,14 @@ class FSExport(object): path = path[1:] return path - def check_fs(self, fs_name): - fs_map = self.mgr.get('fs_map') - return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']] - @export_cluster_checker def create_export(self, fs_name, cluster_id, pseudo_path, read_only, path): try: - if not self.check_fs(fs_name): + if not check_fs(self.mgr, fs_name): raise FSNotFound(fs_name) pseudo_path = self.format_path(pseudo_path) - self._validate_pseudo_path(pseudo_path) + ValidateExport.pseudo_path(pseudo_path) if cluster_id not in self.exports: self.exports[cluster_id] = [] @@ -339,61 +406,6 @@ class FSExport(object): except Exception as e: return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}") - def _validate_pseudo_path(self, path): - if not isabs(path) or path == "/": - raise NFSInvalidOperation(f"pseudo path {path} is invalid. It should be an absolute " - "path and it cannot be just '/'.") - - def _validate_squash(self, squash): - valid_squash_ls = ["root", "root_squash", "rootsquash", "rootid", "root_id_squash", - "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous", "allanonymous", - "no_root_squash", "none", "noidsquash"] - if squash not in valid_squash_ls: - raise NFSInvalidOperation(f"squash {squash} not in valid list {valid_squash_ls}") - - def _validate_security_label(self, label): - if not isinstance(label, bool): - raise NFSInvalidOperation('Only boolean values allowed') - - def _validate_protocols(self, proto): - for p in proto: - if p not in [3, 4]: - raise NFSInvalidOperation(f"Invalid protocol {p}") - if 3 in proto: - log.warning("NFS V3 is an old version, it might not work") - - def _validate_transport(self, transport): - valid_transport = ["UDP", "TCP"] - for trans in transport: - if trans.upper() not in valid_transport: - raise NFSInvalidOperation(f'{trans} is not a valid transport protocol') - - def _validate_access_type(self, access_type): - valid_ones = ['RW', 'RO'] - if access_type not in valid_ones: - raise NFSInvalidOperation(f'{access_type} is invalid, valid access type are' - f'{valid_ones}') - - def _validate_fsal(self, old, new): - if old.name != new['name']: - raise NFSInvalidOperation('FSAL name change not allowed') - if old.user_id != new['user_id']: - raise NFSInvalidOperation('User ID modification is not allowed') - if new['sec_label_xattr']: - raise NFSInvalidOperation('Security label xattr cannot be changed') - if old.fs_name != new['fs_name']: - if not self.check_fs(new['fs_name']): - raise FSNotFound(new['fs_name']) - return 1 - - def _validate_client(self, client): - self._validate_access_type(client['access_type']) - self._validate_squash(client['squash']) - - def _validate_clients(self, clients_ls): - for client in clients_ls: - self._validate_client(client) - def _fetch_export_obj(self, ex_id): try: with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx: @@ -421,39 +433,39 @@ class FSExport(object): raise NFSObjectNotFound('Export does not exist') else: new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo']) - self._validate_pseudo_path(new_export_dict['pseudo']) + ValidateExport.pseudo_path(new_export_dict['pseudo']) log.debug(f"Pseudo path has changed from {export.pseudo} to "\ f"{new_export_dict['pseudo']}") # Check if squash changed if export.squash != new_export_dict['squash']: if new_export_dict['squash']: new_export_dict['squash'] = new_export_dict['squash'].lower() - self._validate_squash(new_export_dict['squash']) + ValidateExport.squash(new_export_dict['squash']) log.debug(f"squash has changed from {export.squash} to {new_export_dict['squash']}") # Security label check if export.security_label != new_export_dict['security_label']: - self._validate_security_label(new_export_dict['security_label']) + ValidateExport.security_label(new_export_dict['security_label']) # Protocol Checking if export.protocols != new_export_dict['protocols']: - self._validate_protocols(new_export_dict['protocols']) + ValidateExport.protocols(new_export_dict['protocols']) # Transport checking if export.transports != new_export_dict['transports']: - self._validate_transport(new_export_dict['transports']) + ValidateExport.transport(new_export_dict['transports']) # Path check if export.path != new_export_dict['path']: new_export_dict['path'] = self.format_path(new_export_dict['path']) out_msg = 'update caps' # Check Access Type if export.access_type != new_export_dict['access_type']: - self._validate_access_type(new_export_dict['access_type']) + ValidateExport.access_type(new_export_dict['access_type']) # Fsal block check if export.fsal != new_export_dict['fsal']: - ret = self._validate_fsal(export.fsal, new_export_dict['fsal']) + ret = ValidateExport.fsal(self.mgr, export.fsal, new_export_dict['fsal']) if ret == 1 and not out_msg: out_msg = 'update caps' # Check client block if export.clients != new_export_dict['clients']: - self._validate_clients(new_export_dict['clients']) + ValidateExport.clients(new_export_dict['clients']) log.debug(f'Validation succeeded for Export {export.pseudo}') return export, out_msg