from copy import copy
import json
import logging
-from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast
+from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
+ NamedTuple
import orchestrator
from ceph.deployment import inventory
self.mgr.set_store('inventory', json.dumps(self._inventory))
+class SpecDescription(NamedTuple):
+ spec: ServiceSpec
+ created: datetime.datetime
+ deleted: Optional[datetime.datetime]
+
+
class SpecStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
"""
return self._specs
+ def __contains__(self, name: str) -> bool:
+ return name in self._specs
+
+ def __getitem__(self, name: str) -> SpecDescription:
+ if name not in self._specs:
+ raise OrchestratorError(f'Service {name} not found.')
+ return SpecDescription(self._specs[name],
+ self.spec_created[name],
+ self.spec_deleted.get(name, None))
+
@property
def active_specs(self) -> Mapping[str, ServiceSpec]:
return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}
wait(m, m.remove_host(name))
-def assert_rm_service(cephadm, srv_name):
+def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
+ mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr')
+ if mon_or_mgr:
+ assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name))
+ return
assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}'
+ assert cephadm.spec_store[srv_name].deleted is not None
+ CephadmServe(cephadm)._check_daemons()
CephadmServe(cephadm)._apply_all_services()
+ assert cephadm.spec_store[srv_name].deleted
+ unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
+ CephadmServe(cephadm)._purge_deleted_services()
+ if not unmanaged: # cause then we're not deleting daemons
+ assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'
@contextmanager