import pytest
-import yaml
-
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
+ '"keyring": "", "files": {"config": "[mon.test]\\npublic network = 127.0.0.0/8\\n"}}',
image='')
- @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
- def test_monitoring_ports(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
- _run_cephadm.return_value = ('{}', '', 0)
-
- with with_host(cephadm_module, 'test'):
-
- yaml_str = """service_type: alertmanager
-service_name: alertmanager
-placement:
- count: 1
-spec:
- port: 4200
-"""
- yaml_file = yaml.safe_load(yaml_str)
- spec = ServiceSpec.from_json(yaml_file)
-
- with mock.patch("cephadm.services.monitoring.AlertmanagerService.generate_config", return_value=({}, [])):
- with with_service(cephadm_module, spec):
-
- CephadmServe(cephadm_module)._check_daemons()
-
- _run_cephadm.assert_called_with(
- 'test', 'alertmanager.test', 'deploy', [
- '--name', 'alertmanager.test',
- '--meta-json', '{"service_name": "alertmanager", "ports": [4200, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null}',
- '--config-json', '-',
- '--tcp-ports', '4200 9094',
- '--reconfig'
- ],
- stdin='{}',
- image='')
-
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
assert not r
assert cephadm_module.health_checks.get('CEPHADM_DAEMON_PLACE_FAIL') is not None
assert cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['count'] == 1
- assert 'Failed to place 1 daemon(s)' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['summary']
- assert 'Failed while placing mgr.a on test: fail' in cephadm_module.health_checks['CEPHADM_DAEMON_PLACE_FAIL']['detail']
+ assert 'Failed to place 1 daemon(s)' in cephadm_module.health_checks[
+ 'CEPHADM_DAEMON_PLACE_FAIL']['summary']
+ assert 'Failed while placing mgr.a on test: fail' in cephadm_module.health_checks[
+ 'CEPHADM_DAEMON_PLACE_FAIL']['detail']
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_apply_spec_fail_health_warning(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
assert cephadm_module.apply_spec_fails
assert cephadm_module.health_checks.get('CEPHADM_APPLY_SPEC_FAIL') is not None
assert cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['count'] == 1
- assert 'Failed to apply 1 service(s)' in cephadm_module.health_checks['CEPHADM_APPLY_SPEC_FAIL']['summary']
+ assert 'Failed to apply 1 service(s)' in cephadm_module.health_checks[
+ 'CEPHADM_APPLY_SPEC_FAIL']['summary']
@mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
get_foreign_ceph_option.side_effect = KeyError
- CephadmServe(cephadm_module)._apply_service_config(ServiceSpec('mgr', placement=ps, config={'test': 'foo'}))
+ CephadmServe(cephadm_module)._apply_service_config(
+ ServiceSpec('mgr', placement=ps, config={'test': 'foo'}))
assert cephadm_module.health_checks.get('CEPHADM_INVALID_CONFIG_OPTION') is not None
assert cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['count'] == 1
- assert 'Ignoring 1 invalid config option(s)' in cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['summary']
- assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks['CEPHADM_INVALID_CONFIG_OPTION']['detail']
+ assert 'Ignoring 1 invalid config option(s)' in cephadm_module.health_checks[
+ 'CEPHADM_INVALID_CONFIG_OPTION']['summary']
+ assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[
+ 'CEPHADM_INVALID_CONFIG_OPTION']['detail']
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
from textwrap import dedent
import json
+import yaml
import pytest
from unittest.mock import MagicMock, call, patch
+from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
RbdMirrorService, CrashService, CephadmDaemonDeploySpec
from cephadm.services.iscsi import IscsiService
from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
from cephadm.module import CephadmOrchestrator
-from ceph.deployment.service_spec import IscsiServiceSpec, MonitoringSpec, AlertManagerSpec
+from ceph.deployment.service_spec import IscsiServiceSpec, MonitoringSpec, AlertManagerSpec, ServiceSpec
from cephadm.tests.fixtures import with_host, with_service
from orchestrator import OrchestratorError
from orchestrator._interface import DaemonDescription
-
class FakeInventory:
def get_addr(self, name: str) -> str:
return '1.2.3.4'
'--config-json', '-', '--tcp-ports', '3000'],
stdin=json.dumps({"files": files}),
image='')
+
+ @patch("cephadm.serve.CephadmServe._run_cephadm")
+ def test_monitoring_ports(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ with with_host(cephadm_module, 'test'):
+
+ yaml_str = """service_type: alertmanager
+service_name: alertmanager
+placement:
+ count: 1
+spec:
+ port: 4200
+"""
+ yaml_file = yaml.safe_load(yaml_str)
+ spec = ServiceSpec.from_json(yaml_file)
+
+ with patch("cephadm.services.monitoring.AlertmanagerService.generate_config", return_value=({}, [])):
+ with with_service(cephadm_module, spec):
+
+ CephadmServe(cephadm_module)._check_daemons()
+
+ _run_cephadm.assert_called_with(
+ 'test', 'alertmanager.test', 'deploy', [
+ '--name', 'alertmanager.test',
+ '--meta-json', '{"service_name": "alertmanager", "ports": [4200, 9094], "ip": null, "deployed_by": [], "rank": null, "rank_generation": null}',
+ '--config-json', '-',
+ '--tcp-ports', '4200 9094',
+ '--reconfig'
+ ],
+ stdin='{}',
+ image='')