from unittest.mock import MagicMock
from mgr_module import NFS_POOL_NAME
+from rados import ObjectNotFound
+
from ceph.deployment.service_spec import NFSServiceSpec
from nfs import Module
from nfs.export import ExportMgr
from nfs.export_utils import GaneshaConfParser, Export, RawBlock
+from nfs.cluster import NFSCluster
from orchestrator import ServiceDescription, DaemonDescription, OrchResult
return self.temp_store[self.temp_store_namespace][key].stat()
def _ioctl_read_mock(self, key: str, size: Optional[Any] = None) -> bytes:
+ if key not in self.temp_store[self.temp_store_namespace]:
+ raise ObjectNotFound
return self.temp_store[self.temp_store_namespace][key].read(size)
def _ioctx_set_namespace_mock(self, namespace: str) -> None:
ServiceDescription(spec=NFSServiceSpec(service_id=self.cluster_id))
] if enable else []
+ orch_nfs_daemons = [
+ DaemonDescription('nfs', 'foo.mydaemon', 'myhostname')
+ ] if enable else []
+
def mock_exec(cls, args):
u = {
"user_id": "abc",
return 0, json.dumps([u]), ''
return 0, json.dumps(u), ''
- with mock.patch('nfs.module.Module.describe_service') as describe_service, \
+ def mock_describe_service(cls, *args, **kwargs):
+ if kwargs['service_type'] == 'nfs':
+ return OrchResult(orch_nfs_services)
+ return OrchResult([])
+
+ def mock_list_daemons(cls, *args, **kwargs):
+ if kwargs['daemon_type'] == 'nfs':
+ return OrchResult(orch_nfs_daemons)
+ return OrchResult([])
+
+ with mock.patch('nfs.module.Module.describe_service', mock_describe_service) as describe_service, \
+ mock.patch('nfs.module.Module.list_daemons', mock_list_daemons) as list_daemons, \
mock.patch('nfs.module.Module.rados') as rados, \
mock.patch('nfs.export.available_clusters',
return_value=[self.cluster_id]), \
mock.patch('nfs.export.restart_nfs_service'), \
+ mock.patch('nfs.cluster.restart_nfs_service'), \
mock.patch('nfs.export.ExportMgr._exec', mock_exec), \
mock.patch('nfs.export.check_fs', return_value=True), \
mock.patch('nfs.export_utils.check_fs', return_value=True), \
rados.open_ioctx.return_value.__enter__.return_value = self.io_mock
rados.open_ioctx.return_value.__exit__ = mock.Mock(return_value=None)
- describe_service.return_value = OrchResult(orch_nfs_services)
-
self._reset_temp_store()
yield
assert export.clients[0].access_type == 'rw'
assert export.clients[0].addresses == ["192.168.1.0/8"]
assert export.cluster_id == self.cluster_id
+
+ def _do_test_cluster_ls(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ rc, out, err = cluster.list_nfs_cluster()
+ assert rc == 0
+ assert out == self.cluster_id
+
+ def test_cluster_ls(self):
+ self._do_mock_test(self._do_test_cluster_ls)
+
+ def _do_test_cluster_info(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ rc, out, err = cluster.show_nfs_cluster_info(self.cluster_id)
+ assert rc == 0
+ assert json.loads(out) == {"foo": {"virtual_ip": None, "backend": []}}
+
+ def test_cluster_info(self):
+ self._do_mock_test(self._do_test_cluster_info)
+
+ def _do_test_cluster_config(self):
+ nfs_mod = Module('nfs', '', '')
+ cluster = NFSCluster(nfs_mod)
+
+ rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+ assert out == ""
+
+ rc, out, err = cluster.set_nfs_cluster_config(self.cluster_id, '# foo\n')
+ assert rc == 0
+
+ rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+ assert out == "# foo\n"
+
+ rc, out, err = cluster.reset_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+
+ rc, out, err = cluster.get_nfs_cluster_config(self.cluster_id)
+ assert rc == 0
+ assert out == ""
+
+ def test_cluster_config(self):
+ self._do_mock_test(self._do_test_cluster_config)