]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/nfs: move validate methods into new ValidateExport class
authorVarsha Rao <varao@redhat.com>
Fri, 9 Apr 2021 16:59:19 +0000 (22:29 +0530)
committerVarsha Rao <varao@redhat.com>
Thu, 22 Apr 2021 06:08:30 +0000 (11:38 +0530)
Signed-off-by: Varsha Rao <varao@redhat.com>
src/pybind/mgr/nfs/export/nfs.py

index 3ce77bdaee78a3d3adc1facc651e3dd9befe8612..b2b544a9fa180f313d0d517be291012ae51ac631 100644 (file)
@@ -69,6 +69,11 @@ def exception_handler(exception_obj, log_msg=""):
     return getattr(exception_obj, 'errno', -1), "", str(exception_obj)
 
 
+def check_fs(mgr, fs_name):
+    fs_map = mgr.get('fs_map')
+    return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]
+
+
 class NFSRados:
     def __init__(self, mgr, namespace):
         self.mgr = mgr
@@ -135,6 +140,72 @@ class NFSRados:
         return False
 
 
+class ValidateExport:
+    @staticmethod
+    def pseudo_path(path):
+        if not isabs(path) or path == "/":
+            raise NFSInvalidOperation(f"pseudo path {path} is invalid. It should be an absolute "
+                                      "path and it cannot be just '/'.")
+
+    @staticmethod
+    def squash(squash):
+        valid_squash_ls = ["root", "root_squash", "rootsquash", "rootid", "root_id_squash",
+                           "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous",
+                           "allanonymous", "no_root_squash", "none", "noidsquash"]
+        if squash not in valid_squash_ls:
+            raise NFSInvalidOperation(f"squash {squash} not in valid list {valid_squash_ls}")
+
+    @staticmethod
+    def security_label(label):
+        if not isinstance(label, bool):
+            raise NFSInvalidOperation('Only boolean values allowed')
+
+    @staticmethod
+    def protocols(proto):
+        for p in proto:
+            if p not in [3, 4]:
+                raise NFSInvalidOperation(f"Invalid protocol {p}")
+        if 3 in proto:
+            log.warning("NFS V3 is an old version, it might not work")
+
+    @staticmethod
+    def transport(transport):
+        valid_transport = ["UDP", "TCP"]
+        for trans in transport:
+            if trans.upper() not in valid_transport:
+                raise NFSInvalidOperation(f'{trans} is not a valid transport protocol')
+
+    @staticmethod
+    def access_type(access_type):
+        valid_ones = ['RW', 'RO']
+        if access_type not in valid_ones:
+            raise NFSInvalidOperation(f'{access_type} is invalid, valid access type are'
+                                      f'{valid_ones}')
+
+    @staticmethod
+    def fsal(mgr, old, new):
+        if old.name != new['name']:
+            raise NFSInvalidOperation('FSAL name change not allowed')
+        if old.user_id != new['user_id']:
+            raise NFSInvalidOperation('User ID modification is not allowed')
+        if new['sec_label_xattr']:
+            raise NFSInvalidOperation('Security label xattr cannot be changed')
+        if old.fs_name != new['fs_name']:
+            if not check_fs(mgr, new['fs_name']):
+                raise FSNotFound(new['fs_name'])
+            return 1
+
+    @staticmethod
+    def _client(client):
+        ValidateExport.access_type(client['access_type'])
+        ValidateExport.squash(client['squash'])
+
+    @staticmethod
+    def clients(clients_ls):
+        for client in clients_ls:
+            ValidateExport._client(client)
+
+
 class FSExport(object):
     def __init__(self, mgr, namespace=None):
         self.mgr = mgr
@@ -249,18 +320,14 @@ class FSExport(object):
                 path = path[1:]
         return path
 
-    def check_fs(self, fs_name):
-        fs_map = self.mgr.get('fs_map')
-        return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]
-
     @export_cluster_checker
     def create_export(self, fs_name, cluster_id, pseudo_path, read_only, path):
         try:
-            if not self.check_fs(fs_name):
+            if not check_fs(self.mgr, fs_name):
                 raise FSNotFound(fs_name)
 
             pseudo_path = self.format_path(pseudo_path)
-            self._validate_pseudo_path(pseudo_path)
+            ValidateExport.pseudo_path(pseudo_path)
 
             if cluster_id not in self.exports:
                 self.exports[cluster_id] = []
@@ -339,61 +406,6 @@ class FSExport(object):
         except Exception as e:
             return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
 
-    def _validate_pseudo_path(self, path):
-        if not isabs(path) or path == "/":
-            raise NFSInvalidOperation(f"pseudo path {path} is invalid. It should be an absolute "
-                                      "path and it cannot be just '/'.")
-
-    def _validate_squash(self, squash):
-        valid_squash_ls = ["root", "root_squash", "rootsquash", "rootid", "root_id_squash",
-                "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous", "allanonymous",
-                "no_root_squash", "none", "noidsquash"]
-        if squash not in valid_squash_ls:
-            raise NFSInvalidOperation(f"squash {squash} not in valid list {valid_squash_ls}")
-
-    def _validate_security_label(self, label):
-        if not isinstance(label, bool):
-            raise NFSInvalidOperation('Only boolean values allowed')
-
-    def _validate_protocols(self, proto):
-        for p in proto:
-            if p not in [3, 4]:
-                raise NFSInvalidOperation(f"Invalid protocol {p}")
-        if 3 in proto:
-            log.warning("NFS V3 is an old version, it might not work")
-
-    def _validate_transport(self, transport):
-        valid_transport = ["UDP", "TCP"]
-        for trans in transport:
-            if trans.upper() not in valid_transport:
-                raise NFSInvalidOperation(f'{trans} is not a valid transport protocol')
-
-    def _validate_access_type(self, access_type):
-        valid_ones = ['RW', 'RO']
-        if access_type not in valid_ones:
-            raise NFSInvalidOperation(f'{access_type} is invalid, valid access type are'
-                                      f'{valid_ones}')
-
-    def _validate_fsal(self, old, new):
-        if old.name != new['name']:
-            raise NFSInvalidOperation('FSAL name change not allowed')
-        if old.user_id != new['user_id']:
-            raise NFSInvalidOperation('User ID modification is not allowed')
-        if new['sec_label_xattr']:
-            raise NFSInvalidOperation('Security label xattr cannot be changed')
-        if old.fs_name != new['fs_name']:
-            if not self.check_fs(new['fs_name']):
-                raise FSNotFound(new['fs_name'])
-            return 1
-
-    def _validate_client(self, client):
-        self._validate_access_type(client['access_type'])
-        self._validate_squash(client['squash'])
-
-    def _validate_clients(self, clients_ls):
-        for client in clients_ls:
-            self._validate_client(client)
-
     def _fetch_export_obj(self, ex_id):
         try:
             with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
@@ -421,39 +433,39 @@ class FSExport(object):
                 raise NFSObjectNotFound('Export does not exist')
             else:
                 new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo'])
-                self._validate_pseudo_path(new_export_dict['pseudo'])
+                ValidateExport.pseudo_path(new_export_dict['pseudo'])
                 log.debug(f"Pseudo path has changed from {export.pseudo} to "\
                           f"{new_export_dict['pseudo']}")
         # Check if squash changed
         if export.squash != new_export_dict['squash']:
             if new_export_dict['squash']:
                 new_export_dict['squash'] = new_export_dict['squash'].lower()
-                self._validate_squash(new_export_dict['squash'])
+                ValidateExport.squash(new_export_dict['squash'])
             log.debug(f"squash has changed from {export.squash} to {new_export_dict['squash']}")
         # Security label check
         if export.security_label != new_export_dict['security_label']:
-            self._validate_security_label(new_export_dict['security_label'])
+            ValidateExport.security_label(new_export_dict['security_label'])
         # Protocol Checking
         if export.protocols != new_export_dict['protocols']:
-            self._validate_protocols(new_export_dict['protocols'])
+            ValidateExport.protocols(new_export_dict['protocols'])
         # Transport checking
         if export.transports != new_export_dict['transports']:
-            self._validate_transport(new_export_dict['transports'])
+            ValidateExport.transport(new_export_dict['transports'])
         # Path check
         if export.path != new_export_dict['path']:
             new_export_dict['path'] = self.format_path(new_export_dict['path'])
             out_msg = 'update caps'
         # Check Access Type
         if export.access_type != new_export_dict['access_type']:
-            self._validate_access_type(new_export_dict['access_type'])
+            ValidateExport.access_type(new_export_dict['access_type'])
         # Fsal block check
         if export.fsal != new_export_dict['fsal']:
-            ret = self._validate_fsal(export.fsal, new_export_dict['fsal'])
+            ret = ValidateExport.fsal(self.mgr, export.fsal, new_export_dict['fsal'])
             if ret == 1 and not out_msg:
                 out_msg = 'update caps'
         # Check client block
         if export.clients != new_export_dict['clients']:
-            self._validate_clients(new_export_dict['clients'])
+            ValidateExport.clients(new_export_dict['clients'])
         log.debug(f'Validation succeeded for Export {export.pseudo}')
         return export, out_msg