# flake8: noqa
-
+import json
import pytest
from typing import Optional, Tuple, Iterator, List, Any, Dict
"""
with mock.patch('nfs.module.Module.describe_service') as describe_service, \
- mock.patch('nfs.module.Module.rados') as rados:
+ mock.patch('nfs.module.Module.rados') as rados, \
+ mock.patch('nfs.export.available_clusters',
+ return_value=self.clusters.keys()), \
+ mock.patch('nfs.export.restart_nfs_service'):
rados.open_ioctx.return_value.__enter__.return_value = self.io_mock
rados.open_ioctx.return_value.__exit__ = mock.Mock(return_value=None)
'cluster_id': 'foo',
'export_id': 2,
'fsal': {'name': 'RGW',
+ 'access_key_id': 'access_key',
'secret_access_key': 'secret_key',
'user_id': 'testuser'},
'path': '/',
def test_export_validate(self, block):
cluster_id = 'foo'
blocks = GaneshaConfParser(block).parse()
- print(block)
export = Export.from_export_block(blocks[0], cluster_id)
- print(export.__dict__)
nfs_mod = Module('nfs', '', '')
with mock.patch('nfs.export_utils.check_fs', return_value=True):
export.validate(nfs_mod)
- """
def test_update_export(self):
- for cluster_id, info in self.clusters.items():
- self._do_test_update_export(cluster_id, info['exports'])
- self._reset_temp_store()
+ with self._mock_orchestrator(True):
+ for cluster_id, info in self.clusters.items():
+ self._do_test_update_export(cluster_id, info['exports'])
+ self._reset_temp_store()
def _do_test_update_export(self, cluster_id, expected_exports):
- ganesha.RgwClient = MagicMock()
- admin_inst_mock = MagicMock()
- admin_inst_mock.get_user_keys.return_value = {
- 'access_key': 'access_key',
- 'secret_key': 'secret_key'
- }
- ganesha.RgwClient.admin_instance.return_value = admin_inst_mock
-
- conf = ExportMgr.instance(cluster_id)
- conf.update_export({
+ nfs_mod = Module('nfs', '', '')
+ conf = ExportMgr(nfs_mod)
+ r = conf.update_export(cluster_id, json.dumps({
'export_id': 2,
'daemons': expected_exports[2],
'path': 'bucket',
}],
'fsal': {
'name': 'RGW',
- 'rgw_user_id': 'testuser'
+ 'user_id': 'testuser',
+ 'access_key_id': 'access_key',
+ 'secret_access_key': 'secret_key',
}
- })
+ }))
+ assert r[0] == 0
- conf = ExportMgr.instance(cluster_id)
- export = conf.get_export(2)
+ export = conf._fetch_export('foo', '/rgw/bucket')
assert export.export_id == 2
- assert export.path, "bucket")
- assert export.pseudo, "/rgw/bucket")
- assert export.tag, "bucket_tag")
+ assert export.path == "bucket"
+ assert export.pseudo == "/rgw/bucket"
assert export.access_type == "RW"
assert export.squash == "all_squash"
- assert export.protocols == {4, 3}
- assert export.transports == {"TCP", "UDP"}
+ assert export.protocols == [4, 3]
+ assert export.transports == ["TCP", "UDP"]
assert export.fsal.name == "RGW"
- assert export.fsal.rgw_user_id == "testuser"
- assert export.fsal.access_key == "access_key"
- assert export.fsal.secret_key == "secret_key"
- assert len(export.clients), 1)
- assert export.clients[0].user_id == "ganesha"
- self.assertIsNone(export.clients[0].squash)
+ assert export.fsal.user_id == "testuser"
+ assert export.fsal.access_key_id == "access_key"
+ assert export.fsal.secret_access_key == "secret_key"
+ assert len(export.clients) == 1
+ assert export.clients[0].squash is None
assert export.clients[0].access_type is None
- assert export.daemons == set(expected_exports[2])
assert export.cluster_id == cluster_id
- """
def test_remove_export(self) -> None:
with self._mock_orchestrator(True), mock.patch('nfs.module.ExportMgr._exec'):