protocols = 4;
transports = "TCP";
}
+"""
+ export_4 = """
+EXPORT {
+ FSAL {
+ name = "CEPH";
+ user_id = "nfs.foo.1";
+ filesystem = "a";
+ secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
+ }
+ export_id = 1;
+ path = "/secure/me";
+ pseudo = "/secure1";
+ access_type = "RW";
+ squash = "no_root_squash";
+ SecType = "krb5p", "krb5i";
+ attr_expiration_time = 0;
+ security_label = true;
+ protocols = 4;
+ transports = "TCP";
+}
"""
conf_nfs_foo = f'''
assert blocks[1].block_name == "%url"
assert blocks[1].values['value'] == f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-2"
- def _do_mock_test(self, func) -> None:
+ def _do_mock_test(self, func, *args) -> None:
with self._mock_orchestrator(True):
- func()
+ func(*args)
self._reset_temp_store()
def test_ganesha_conf(self) -> None:
assert export.clients[0].access_type is None
assert export.cluster_id == self.cluster_id
+ def test_update_export_sectype(self):
+ self._do_mock_test(self._test_update_export_sectype)
+
+ def _test_update_export_sectype(self):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert r[0] == 0
+
+ # no sectype was given, key not present
+ info = conf._get_export_dict(self.cluster_id, "/rgw/bucket")
+ assert info["export_id"] == 2
+ assert info["path"] == "bucket"
+ assert "sectype" not in info
+
+ r = conf.apply_export(self.cluster_id, json.dumps({
+ 'export_id': 2,
+ 'path': 'bucket',
+ 'pseudo': '/rgw/bucket',
+ 'cluster_id': self.cluster_id,
+ 'access_type': 'RW',
+ 'squash': 'all_squash',
+ 'security_label': False,
+ 'protocols': [4, 3],
+ 'transports': ['TCP', 'UDP'],
+ 'clients': [{
+ 'addresses': ["192.168.0.0/16"],
+ 'access_type': None,
+ 'squash': None
+ }],
+ 'sectype': ["krb5p", "krb5i", "sys"],
+ 'fsal': {
+ 'name': 'RGW',
+ 'user_id': 'nfs.foo.bucket',
+ 'access_key_id': 'the_access_key',
+ 'secret_access_key': 'the_secret_key',
+ }
+ }))
+ assert r[0] == 0
+
+ # assert sectype matches new value(s)
+ info = conf._get_export_dict(self.cluster_id, "/rgw/bucket")
+ assert info["export_id"] == 2
+ assert info["path"] == "bucket"
+ assert info["sectype"] == ["krb5p", "krb5i", "sys"]
+
def test_update_export_with_ganesha_conf(self):
self._do_mock_test(self._do_test_update_export_with_ganesha_conf)
r = conf.apply_export(self.cluster_id, self.export_3)
assert r[0] == 0
+ def test_update_export_with_ganesha_conf_sectype(self):
+ self._do_mock_test(
+ self._do_test_update_export_with_ganesha_conf_sectype,
+ self.export_4, ["krb5p", "krb5i"])
+
+ def test_update_export_with_ganesha_conf_sectype_lcase(self):
+ export_conf = self.export_4.replace("SecType", "sectype").replace("krb5i", "sys")
+ self._do_mock_test(
+ self._do_test_update_export_with_ganesha_conf_sectype,
+ export_conf, ["krb5p", "sys"])
+
+ def _do_test_update_export_with_ganesha_conf_sectype(self, export_conf, expect_sectype):
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.apply_export(self.cluster_id, export_conf)
+ assert r[0] == 0
+
+ # assert sectype matches new value(s)
+ info = conf._get_export_dict(self.cluster_id, "/secure1")
+ assert info["export_id"] == 1
+ assert info["path"] == "/secure/me"
+ assert info["sectype"] == expect_sectype
+
def test_update_export_with_list(self):
self._do_mock_test(self._do_test_update_export_with_list)