from contextlib import contextmanager
from pyfakefs import fake_filesystem
-from typing import Dict, List, Optional
+
+from typing import Callable, Dict, List, Optional
with mock.patch('builtins.open', create=True):
cd = SourceFileLoader('cephadm', 'cephadm').load_module()
+def mock_docker():
+ docker = mock.Mock(cd.Docker)
+ docker.path = '/usr/bin/docker'
+ return docker
+
+
+def mock_podman():
+ podman = mock.Mock(cd.Podman)
+ podman.path = '/usr/bin/podman'
+ podman.version = (2, 1, 0)
+ return podman
+
+
def _daemon_path():
return os.getcwd()
@contextmanager
def with_cephadm_ctx(
cmd: List[str],
+ container_engine: Callable = mock_podman(),
list_networks: Optional[Dict[str,Dict[str,List[str]]]] = None
):
"""
:param cmd: cephadm command argv
+ :param container_engine: container engine mock (podman or docker)
:param list_networks: mock 'list-networks' return
"""
if not list_networks:
mock.patch('cephadm.json_loads_retry', return_value={'epoch' : 1}), \
mock.patch('cephadm.list_networks', return_value=list_networks):
ctx: cd.CephadmContext = cd.cephadm_init_ctx(cmd)
+ ctx.container_engine = container_engine
yield ctx
from urllib.error import HTTPError
from typing import List, Optional
-from .fixtures import exporter
+from .fixtures import exporter, mock_docker, mock_podman
with mock.patch('builtins.open', create=True):
class TestCephAdm(object):
- @staticmethod
- def mock_docker():
- docker = mock.Mock(cd.Docker)
- docker.path = '/usr/bin/docker'
- return docker
-
- @staticmethod
- def mock_podman():
- podman = mock.Mock(cd.Podman)
- podman.path = '/usr/bin/podman'
- podman.version = (2, 1, 0)
- return podman
-
def test_docker_unit_file(self):
ctx = mock.Mock()
- ctx.container_engine = self.mock_docker()
+ ctx.container_engine = mock_docker()
r = cd.get_unit_file(ctx, '9b9d7609-f4d5-4aba-94c8-effa764d96c9')
assert 'Requires=docker.service' in r
- ctx.container_engine = self.mock_podman()
+ ctx.container_engine = mock_podman()
r = cd.get_unit_file(ctx, '9b9d7609-f4d5-4aba-94c8-effa764d96c9')
assert 'Requires=docker.service' not in r
['registry-login', '--registry-url', 'sample-url',
'--registry-username', 'sample-user', '--registry-password',
'sample-pass'])
- ctx.container_engine = self.mock_docker()
+ ctx.container_engine = mock_docker()
retval = cd.command_registry_login(ctx)
assert retval == 0
get_parm.return_value = {"url": "sample-url", "username": "sample-username", "password": "sample-password"}
ctx: cd.CephadmContext = cd.cephadm_init_ctx(
['registry-login', '--registry-json', 'sample-json'])
- ctx.container_engine = self.mock_docker()
+ ctx.container_engine = mock_docker()
retval = cd.command_registry_login(ctx)
assert retval == 0