class TestCephAdm(object):
def test_docker_unit_file(self):
- ctx = mock.Mock()
+ ctx = cd.CephadmContext()
ctx.container_engine = mock_docker()
r = cd.get_unit_file(ctx, '9b9d7609-f4d5-4aba-94c8-effa764d96c9')
assert 'Requires=docker.service' in r
@mock.patch('socket.socket')
@mock.patch('cephadm.logger')
def test_check_ip_port_success(self, logger, _socket):
- ctx = mock.Mock()
+ ctx = cd.CephadmContext()
ctx.skip_ping_check = False # enables executing port check with `check_ip_port`
for address, address_family in (
@mock.patch('socket.socket')
@mock.patch('cephadm.logger')
def test_check_ip_port_failure(self, logger, _socket):
- ctx = mock.Mock()
+ ctx = cd.CephadmContext()
ctx.skip_ping_check = False # enables executing port check with `check_ip_port`
def os_error(errno):
assert image == 'docker.io/ceph/ceph:v15.2.5'
def test_should_log_to_journald(self):
- ctx = mock.Mock()
+ ctx = cd.CephadmContext()
# explicit
ctx.log_to_journald = True
assert cd.should_log_to_journald(ctx)
class TestMonitoring(object):
@mock.patch('cephadm.call')
def test_get_version_alertmanager(self, _call):
- ctx = mock.Mock()
+ ctx = cd.CephadmContext()
+ ctx.container_engine = mock_podman()
daemon_type = 'alertmanager'
# binary `prometheus`
@mock.patch('cephadm.call')
def test_get_version_prometheus(self, _call):
- ctx = mock.Mock()
+ ctx = cd.CephadmContext()
+ ctx.container_engine = mock_podman()
daemon_type = 'prometheus'
_call.return_value = '', '{}, version 0.16.1'.format(daemon_type), 0
version = cd.Monitoring.get_version(ctx, 'container_id', daemon_type)
@mock.patch('cephadm.call')
def test_get_version_node_exporter(self, _call):
- ctx = mock.Mock()
+ ctx = cd.CephadmContext()
+ ctx.container_engine = mock_podman()
daemon_type = 'node-exporter'
_call.return_value = '', '{}, version 0.16.1'.format(daemon_type.replace('-', '_')), 0
version = cd.Monitoring.get_version(ctx, 'container_id', daemon_type)