completion = self.describe_service(service_type='mds',
service_name=service,
refresh=True)
- self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
if completion.result:
return completion.result[0]
self.log.info(f"fs {fs_name}: adjusting daemon count from {svc.spec.placement.count} to {want}")
newspec = self.update_daemon_count(svc.spec, fs_name, want)
completion = self.apply_mds(newspec)
- self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except orchestrator.OrchestratorError as e:
self.log.exception(f"fs {fs_name}: exception while updating service: {e}")
from unittest import mock
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
-from orchestrator import DaemonDescription, Completion, ServiceDescription
+from orchestrator import DaemonDescription, OrchResult, ServiceDescription
try:
from typing import Any, List
@pytest.yield_fixture()
def mds_autoscaler_module():
- with mock.patch("mds_autoscaler.module.MDSAutoscaler._orchestrator_wait"):
- m = MDSAutoscaler('cephadm', 0, 0)
-
- yield m
+ yield MDSAutoscaler('cephadm', 0, 0)
class TestCephadm(object):
@mock.patch("mds_autoscaler.module.MDSAutoscaler.describe_service")
@mock.patch("mds_autoscaler.module.MDSAutoscaler.apply_mds")
def test_scale_up(self, _apply_mds, _describe_service, _list_daemons, _get, mds_autoscaler_module: MDSAutoscaler):
- daemons = Completion(value=[
+ daemons = OrchResult(result=[
DaemonDescription(
hostname='myhost',
daemon_type='mds',
daemon_id='fs_name.myhost.b'
),
])
- daemons.finalize()
_list_daemons.return_value = daemons
- services = Completion(value=[
+ services = OrchResult(result=[
ServiceDescription(
spec=ServiceSpec(
service_type='mds',
)
)
])
- services.finalize()
_describe_service.return_value = services
- apply = Completion(value='')
- apply.finalize()
+ apply = OrchResult(result='')
_apply_mds.return_value = apply