return getattr(exception_obj, 'errno', -1), "", str(exception_obj)
+def check_fs(mgr, fs_name):
+ fs_map = mgr.get('fs_map')
+ return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]
+
+
class NFSRados:
def __init__(self, mgr, namespace):
self.mgr = mgr
return False
+class ValidateExport:
+ @staticmethod
+ def pseudo_path(path):
+ if not isabs(path) or path == "/":
+ raise NFSInvalidOperation(f"pseudo path {path} is invalid. It should be an absolute "
+ "path and it cannot be just '/'.")
+
+ @staticmethod
+ def squash(squash):
+ valid_squash_ls = ["root", "root_squash", "rootsquash", "rootid", "root_id_squash",
+ "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous",
+ "allanonymous", "no_root_squash", "none", "noidsquash"]
+ if squash not in valid_squash_ls:
+ raise NFSInvalidOperation(f"squash {squash} not in valid list {valid_squash_ls}")
+
+ @staticmethod
+ def security_label(label):
+ if not isinstance(label, bool):
+ raise NFSInvalidOperation('Only boolean values allowed')
+
+ @staticmethod
+ def protocols(proto):
+ for p in proto:
+ if p not in [3, 4]:
+ raise NFSInvalidOperation(f"Invalid protocol {p}")
+ if 3 in proto:
+ log.warning("NFS V3 is an old version, it might not work")
+
+ @staticmethod
+ def transport(transport):
+ valid_transport = ["UDP", "TCP"]
+ for trans in transport:
+ if trans.upper() not in valid_transport:
+ raise NFSInvalidOperation(f'{trans} is not a valid transport protocol')
+
+ @staticmethod
+ def access_type(access_type):
+ valid_ones = ['RW', 'RO']
+ if access_type not in valid_ones:
+ raise NFSInvalidOperation(f'{access_type} is invalid, valid access type are'
+ f'{valid_ones}')
+
+ @staticmethod
+ def fsal(mgr, old, new):
+ if old.name != new['name']:
+ raise NFSInvalidOperation('FSAL name change not allowed')
+ if old.user_id != new['user_id']:
+ raise NFSInvalidOperation('User ID modification is not allowed')
+ if new['sec_label_xattr']:
+ raise NFSInvalidOperation('Security label xattr cannot be changed')
+ if old.fs_name != new['fs_name']:
+ if not check_fs(mgr, new['fs_name']):
+ raise FSNotFound(new['fs_name'])
+ return 1
+
+ @staticmethod
+ def _client(client):
+ ValidateExport.access_type(client['access_type'])
+ ValidateExport.squash(client['squash'])
+
+ @staticmethod
+ def clients(clients_ls):
+ for client in clients_ls:
+ ValidateExport._client(client)
+
+
class FSExport(object):
def __init__(self, mgr, namespace=None):
self.mgr = mgr
path = path[1:]
return path
- def check_fs(self, fs_name):
- fs_map = self.mgr.get('fs_map')
- return fs_name in [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]
-
@export_cluster_checker
def create_export(self, fs_name, cluster_id, pseudo_path, read_only, path):
try:
- if not self.check_fs(fs_name):
+ if not check_fs(self.mgr, fs_name):
raise FSNotFound(fs_name)
pseudo_path = self.format_path(pseudo_path)
- self._validate_pseudo_path(pseudo_path)
+ ValidateExport.pseudo_path(pseudo_path)
if cluster_id not in self.exports:
self.exports[cluster_id] = []
except Exception as e:
return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
- def _validate_pseudo_path(self, path):
- if not isabs(path) or path == "/":
- raise NFSInvalidOperation(f"pseudo path {path} is invalid. It should be an absolute "
- "path and it cannot be just '/'.")
-
- def _validate_squash(self, squash):
- valid_squash_ls = ["root", "root_squash", "rootsquash", "rootid", "root_id_squash",
- "rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous", "allanonymous",
- "no_root_squash", "none", "noidsquash"]
- if squash not in valid_squash_ls:
- raise NFSInvalidOperation(f"squash {squash} not in valid list {valid_squash_ls}")
-
- def _validate_security_label(self, label):
- if not isinstance(label, bool):
- raise NFSInvalidOperation('Only boolean values allowed')
-
- def _validate_protocols(self, proto):
- for p in proto:
- if p not in [3, 4]:
- raise NFSInvalidOperation(f"Invalid protocol {p}")
- if 3 in proto:
- log.warning("NFS V3 is an old version, it might not work")
-
- def _validate_transport(self, transport):
- valid_transport = ["UDP", "TCP"]
- for trans in transport:
- if trans.upper() not in valid_transport:
- raise NFSInvalidOperation(f'{trans} is not a valid transport protocol')
-
- def _validate_access_type(self, access_type):
- valid_ones = ['RW', 'RO']
- if access_type not in valid_ones:
- raise NFSInvalidOperation(f'{access_type} is invalid, valid access type are'
- f'{valid_ones}')
-
- def _validate_fsal(self, old, new):
- if old.name != new['name']:
- raise NFSInvalidOperation('FSAL name change not allowed')
- if old.user_id != new['user_id']:
- raise NFSInvalidOperation('User ID modification is not allowed')
- if new['sec_label_xattr']:
- raise NFSInvalidOperation('Security label xattr cannot be changed')
- if old.fs_name != new['fs_name']:
- if not self.check_fs(new['fs_name']):
- raise FSNotFound(new['fs_name'])
- return 1
-
- def _validate_client(self, client):
- self._validate_access_type(client['access_type'])
- self._validate_squash(client['squash'])
-
- def _validate_clients(self, clients_ls):
- for client in clients_ls:
- self._validate_client(client)
-
def _fetch_export_obj(self, ex_id):
try:
with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
raise NFSObjectNotFound('Export does not exist')
else:
new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo'])
- self._validate_pseudo_path(new_export_dict['pseudo'])
+ ValidateExport.pseudo_path(new_export_dict['pseudo'])
log.debug(f"Pseudo path has changed from {export.pseudo} to "\
f"{new_export_dict['pseudo']}")
# Check if squash changed
if export.squash != new_export_dict['squash']:
if new_export_dict['squash']:
new_export_dict['squash'] = new_export_dict['squash'].lower()
- self._validate_squash(new_export_dict['squash'])
+ ValidateExport.squash(new_export_dict['squash'])
log.debug(f"squash has changed from {export.squash} to {new_export_dict['squash']}")
# Security label check
if export.security_label != new_export_dict['security_label']:
- self._validate_security_label(new_export_dict['security_label'])
+ ValidateExport.security_label(new_export_dict['security_label'])
# Protocol Checking
if export.protocols != new_export_dict['protocols']:
- self._validate_protocols(new_export_dict['protocols'])
+ ValidateExport.protocols(new_export_dict['protocols'])
# Transport checking
if export.transports != new_export_dict['transports']:
- self._validate_transport(new_export_dict['transports'])
+ ValidateExport.transport(new_export_dict['transports'])
# Path check
if export.path != new_export_dict['path']:
new_export_dict['path'] = self.format_path(new_export_dict['path'])
out_msg = 'update caps'
# Check Access Type
if export.access_type != new_export_dict['access_type']:
- self._validate_access_type(new_export_dict['access_type'])
+ ValidateExport.access_type(new_export_dict['access_type'])
# Fsal block check
if export.fsal != new_export_dict['fsal']:
- ret = self._validate_fsal(export.fsal, new_export_dict['fsal'])
+ ret = ValidateExport.fsal(self.mgr, export.fsal, new_export_dict['fsal'])
if ret == 1 and not out_msg:
out_msg = 'update caps'
# Check client block
if export.clients != new_export_dict['clients']:
- self._validate_clients(new_export_dict['clients'])
+ ValidateExport.clients(new_export_dict['clients'])
log.debug(f'Validation succeeded for Export {export.pseudo}')
return export, out_msg