protocols = [protocols]
transports = export_block.values.get('transports')
- if not isinstance(transports, list):
+ if isinstance(transports, str):
transports = [transports]
+ elif not transports:
+ transports = []
return cls(export_block.values['export_id'],
export_block.values['path'],
cluster_id,
export_block.values['pseudo'],
- export_block.values['access_type'],
+ export_block.values.get('access_type', 'none'),
export_block.values.get('squash', 'no_root_squash'),
export_block.values.get('security_label', True),
protocols,
@staticmethod
def validate_access_type(access_type: str) -> None:
valid_access_types = ['rw', 'ro', 'none']
- if access_type.lower() not in valid_access_types:
+ if not isinstance(access_type, str) or access_type.lower() not in valid_access_types:
raise NFSInvalidOperation(
f'{access_type} is invalid, valid access type are'
f'{valid_access_types}'
"rootidsquash", "all", "all_squash", "allsquash", "all_anomnymous",
"allanonymous", "no_root_squash", "none", "noidsquash",
]
- if squash not in valid_squash_ls:
+ if squash.lower() not in valid_squash_ls:
raise NFSInvalidOperation(
f"squash {squash} not in valid list {valid_squash_ls}"
)
raise NFSInvalidOperation(f'{trans} is not a valid transport protocol')
for client in self.clients:
- self.validate_squash(client.squash)
- self.validate_access_type(client.access_type)
+ if client.squash:
+ self.validate_squash(client.squash)
+ if client.access_type:
+ self.validate_access_type(client.access_type)
if self.fsal.name == 'CEPH':
fs = cast(CephFSFSAL, self.fsal)
'pseudo': '/cephfs_a/',
'security_label': True,
'squash': 'no_root_squash',
- 'transports': [None]}
+ 'transports': []}
export = [e for e in conf.exports['foo'] if e.export_id == 2][0]
ex_dict = export.to_dict()
export2 = Export.from_dict(j['export_id'], j)
j2 = export2.to_dict()
assert j == j2
- """
+
+ @pytest.mark.parametrize(
+ "block",
+ [
+ export_1,
+ export_2,
+ ]
+ )
+ def test_export_validate(self, block):
+ cluster_id = 'foo'
+ blocks = GaneshaConfParser(block).parse()
+ print(block)
+ export = Export.from_export_block(blocks[0], cluster_id)
+ print(export.__dict__)
+ nfs_mod = Module('nfs', '', '')
+ with mock.patch('nfs.export_utils.check_fs', return_value=True):
+ export.validate(nfs_mod)
+
+ """
def test_update_export(self):
for cluster_id, info in self.clusters.items():
self._do_test_update_export(cluster_id, info['exports'])