return True
return False
+ def host_had_daemon_refresh(self, host: str) -> bool:
+ """
+ Whether this host's daemons have been refreshed at least once.
+ """
+ if host in self.last_daemon_update:
+ return True
+ if host not in self.daemons:
+ return False
+ return bool(self.daemons[host])
+
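
To illustrate which hosts count as refreshed, here is a minimal standalone sketch; ``FakeCache`` and its contents are hypothetical, and only the two attribute names and the method body mirror the code above.

    from typing import Any, Dict

    class FakeCache:
        # Hypothetical stand-in; only the attribute names mirror HostCache.
        def __init__(self) -> None:
            self.last_daemon_update: Dict[str, Any] = {'a': 'a-timestamp'}
            self.daemons: Dict[str, Dict[str, Any]] = {'b': {'mon.b': object()}, 'c': {}}

        def host_had_daemon_refresh(self, host: str) -> bool:
            if host in self.last_daemon_update:
                return True
            if host not in self.daemons:
                return False
            return bool(self.daemons[host])

    c = FakeCache()
    assert c.host_had_daemon_refresh('a')      # refresh timestamp recorded
    assert c.host_had_daemon_refresh('b')      # daemons known even without a timestamp
    assert not c.host_had_daemon_refresh('c')  # known host, but empty daemon map
    assert not c.host_had_daemon_refresh('d')  # host entirely unknown
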
def host_needs_device_refresh(self, host):
# type: (str) -> bool
if host in self.mgr.offline_hosts:
- We're not checking for `host_needs_daemon_refresh`, as this might never be
- False for all hosts.
+ We deliberately don't check `host_needs_daemon_refresh` here, as it might
+ never be False for all hosts at once.
"""
- return all((h in self.last_daemon_update or h in self.mgr.offline_hosts)
+ return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
for h in self.get_hosts())
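
The predicate above treats offline hosts as exempt: metadata counts as up to date once every host has either had a daemon refresh or is known to be offline. A toy equivalent, with assumed standalone names:

    refreshed = {'host1'}
    offline = {'host2'}
    hosts = ['host1', 'host2']

    # 'host2' never got a refresh, but being offline it does not block the check.
    assert all((h in refreshed or h in offline) for h in hosts)
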
def schedule_daemon_action(self, host: str, daemon_name: str, action: str):
code, '\n'.join(err)))
return out, err, code
+ def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
+ """
+ Returns all hosts that went through ``_refresh_host_daemons()``.
+
+ This mitigates a potential race, where a new host was added *after*
+ ``_refresh_host_daemons()`` was called, but *before*
+ ``_apply_all_specs()`` was called. Otherwise we would end up with hosts
+ where daemons might be running, but we have not yet detected them.
+ """
+ return [
+ h for h in self.inventory.all_specs()
+ if self.cache.host_had_daemon_refresh(h.hostname)
+ ]
+
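
A runnable toy timeline of the race described in the docstring; all names here are hypothetical, and only the filtering idea mirrors the method above.

    from typing import List

    inventory: List[str] = ['host1']  # hosts known to the orchestrator
    refreshed = {'host1'}             # hosts that went through a daemon refresh

    def hosts_with_daemon_inventory() -> List[str]:
        return [h for h in inventory if h in refreshed]

    # t0: daemon inventory is complete for every current host.
    assert hosts_with_daemon_inventory() == ['host1']

    # t1: 'host2' joins *after* the refresh, *before* the next apply.
    inventory.append('host2')

    # t2: the scheduler only sees hosts with daemon inventory, so it cannot
    # place daemons on 'host2' while its existing daemons are undetected.
    assert hosts_with_daemon_inventory() == ['host1']
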
def _add_host(self, spec):
# type: (HostSpec) -> str
"""
Schedule a service. Deploy new daemons or remove old ones, depending
on the target label and count specified in the placement.
"""
+ self.migration.verify_no_migration()
+
daemon_type = spec.service_type
service_name = spec.service_name()
if spec.unmanaged:
ha = HostAssignment(
spec=spec,
- hosts=self.inventory.all_specs(),
+ hosts=self._hosts_with_daemon_inventory(),
get_daemons_func=self.cache.get_daemons_by_service,
filter_new_host=matches_network if daemon_type == 'mon' else None,
)
return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
def _apply(self, spec: GenericSpec) -> str:
- self.migration.verify_no_migration()
-
if spec.service_type == 'host':
return self._add_host(cast(HostSpec, spec))
ha = HostAssignment(
spec=spec,
- hosts=self.inventory.all_specs(),
+ hosts=self._hosts_with_daemon_inventory(),
get_daemons_func=self.cache.get_daemons_by_service,
)
ha.validate()
HostAssignment(
spec=spec,
- hosts=self.inventory.all_specs(),
+ hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
get_daemons_func=self.cache.get_daemons_by_service,
).validate()
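
Note the asymmetry: the scheduling paths above receive the filtered ``_hosts_with_daemon_inventory()`` list, while this validation-only ``HostAssignment`` deliberately keeps ``all_specs()``, so spec errors still surface for hosts that have not seen their first daemon refresh.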
@contextmanager
-def with_host(m: CephadmOrchestrator, name):
+def with_host(m: CephadmOrchestrator, name, refresh_hosts=True):
- # type: (CephadmOrchestrator, str) -> None
+ # type: (CephadmOrchestrator, str, bool) -> None
wait(m, m.add_host(HostSpec(hostname=name)))
+ if refresh_hosts:
+ m._refresh_hosts_and_daemons()
yield
wait(m, m.remove_host(name))
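
Hypothetical usage, assuming the fixture above: tests that need a host *without* daemon inventory can now opt out of the eager refresh.

    def test_something(cephadm_module: CephadmOrchestrator):
        # Default: the host is added and its daemons are refreshed right away.
        with with_host(cephadm_module, 'test'):
            ...
        # Opt out when the test exercises the pre-refresh state:
        with with_host(cephadm_module, 'test', refresh_hosts=False):
            ...
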
return '{}', None, 0
with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
with mock.patch("remoto.process.check", _check):
- with with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
code, out, err = cephadm_module.check_host('test')
# First should succeed.
assert err is None
True
])
def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
- with with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
cephadm_module.set_container_image('global', 'image')
if use_repo_digest:
cephadm_module.use_repo_digest = True
import json
from datetime import datetime
+import pytest
+
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
from cephadm import CephadmOrchestrator
from cephadm.inventory import SPEC_STORE_PREFIX, DATEFMT
from cephadm.tests.fixtures import _run_cephadm, cephadm_module, wait, with_host
+from orchestrator import OrchestratorError
from tests import mock
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
- with with_host(cephadm_module, 'host1'):
- with with_host(cephadm_module, 'host2'):
+ with with_host(cephadm_module, 'host1', refresh_hosts=False):
+ with with_host(cephadm_module, 'host2', refresh_hosts=False):
# emulate the old scheduler:
c = cephadm_module.apply_rgw(
)
assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
+ # with pytest.raises(OrchestratorError, match="cephadm migration still ongoing. Please wait, until the migration is complete."):
cephadm_module._apply_all_services()
+
+ cephadm_module.migration_current = 0
+ cephadm_module.migration.migrate()
+ # assert the migration is blocked until we have daemon info for all hosts.
+ assert cephadm_module.migration_current == 0
+
+ cephadm_module._refresh_hosts_and_daemons()
+ cephadm_module.migration.migrate()
+
+ cephadm_module._apply_all_services()
+
out = {o.hostname for o in wait(cephadm_module, cephadm_module.list_daemons())}
assert out == {'host1', 'host2'}
)
assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
- cephadm_module.migration_current = 0
- cephadm_module.migration.migrate()
-
- # assert we need all daemons.
- assert cephadm_module.migration_current == 0
-
- # Sorry, for this hack, but I need to make sure, Migration thinks,
- # we have updated all daemons already.
+ # Sorry for this hack, but we need to make sure Migration thinks
+ # we have already updated all daemons.
cephadm_module.cache.last_daemon_update['host1'] = datetime.now()
cephadm_module.cache.last_daemon_update['host2'] = datetime.now()
+ cephadm_module.migration_current = 0
cephadm_module.migration.migrate()
assert cephadm_module.migration_current == 2
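
As a standalone sketch of what the assertions exercise (the one-shot gate below is hypothetical shorthand for ``Migrations.migrate()`` bailing out while daemon info is incomplete):

    def migrate(current: int, all_hosts_refreshed: bool) -> int:
        # Toy gate: no migration step runs until every host has daemon info.
        if not all_hosts_refreshed:
            return current
        return 2  # all known migrations applied

    assert migrate(0, all_hosts_refreshed=False) == 0  # blocked before the refresh
    assert migrate(0, all_hosts_refreshed=True) == 2   # proceeds afterwards
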