for k_s in unknowns:
del self.events[k_s]
- def get_for_service(self, name):
+ def get_for_service(self, name) -> List[OrchestratorEvent]:
return self.events.get('service:' + name, [])
- def get_for_daemon(self, name):
+ def get_for_daemon(self, name) -> List[OrchestratorEvent]:
return self.events.get('daemon:' + name, [])
assert pat in val
-def mon_command(*args, **kwargs):
- return 0, '', ''
-
@contextmanager
def with_cephadm_module(module_options=None, store=None):
"""
:param store: Set the store before module.__init__ is called
"""
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
- mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
- mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
- mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
- mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
- mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):
+ mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
+ mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
+ mock.patch("cephadm.module.CephadmOrchestrator.remote"):
m = CephadmOrchestrator.__new__ (CephadmOrchestrator)
if module_options is not None:
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
-from .fixtures import cephadm_module, wait, _run_cephadm, mon_command, match_glob, with_host, \
+from .fixtures import cephadm_module, wait, _run_cephadm, match_glob, with_host, \
with_cephadm_module
from cephadm.module import CephadmOrchestrator, CEPH_DATEFMT
cephadm_module._check_daemons()
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+ with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
+
+ _ceph_send_command.side_effect = Exception("myerror")
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module._store['_ceph_get/mon_map'] = {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ }
+ cephadm_module.notify('mon_map', None)
+
+ cephadm_module._check_daemons()
+
+ evs = [e.message for e in cephadm_module.events.get_for_daemon(f'rgw.{daemon_id}')]
+
+ assert 'myerror' in ''.join(evs)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):