From: Sebastian Wagner Date: Tue, 5 Jan 2021 15:19:27 +0000 (+0100) Subject: mgr/cephadm: move _create_daemon to serve.py X-Git-Tag: v17.0.0~90^2~6 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a4be711d44d3ee79d7704435e6dd0c7fa3690fe2;p=ceph-ci.git mgr/cephadm: move _create_daemon to serve.py `_create_daemon` can potentially make the CLI unresponsive and should only be called from the serve() thread. Signed-off-by: Sebastian Wagner --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 47d8315b5eb..9cb65d8b3bc 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -25,7 +25,7 @@ from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.service_spec import \ NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \ - CustomContainerSpec, HostPlacementSpec, HA_RGWSpec + HostPlacementSpec, HA_RGWSpec from ceph.utils import str_to_datetime, datetime_to_str, datetime_now from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonSpec @@ -1745,9 +1745,9 @@ To check that the host is reachable: self.mgr_service.fail_over() return '' # unreachable # stop, recreate the container+unit, then restart - return self._create_daemon(daemon_spec) + return CephadmServe(self)._create_daemon(daemon_spec) elif action == 'reconfig': - return self._create_daemon(daemon_spec, reconfig=True) + return CephadmServe(self)._create_daemon(daemon_spec, reconfig=True) actions = { 'start': ['reset-failed', 'start'], @@ -2014,124 +2014,6 @@ To check that the host is reachable: deps.append(dd.name()) return sorted(deps) - def _create_daemon(self, - daemon_spec: CephadmDaemonSpec, - reconfig: bool = False, - osd_uuid_map: Optional[Dict[str, Any]] = None, - ) -> str: - - with set_exception_subject('service', orchestrator.DaemonDescription( - daemon_type=daemon_spec.daemon_type, - daemon_id=daemon_spec.daemon_id, - 
hostname=daemon_spec.host, - ).service_id(), overwrite=True): - - image = '' - start_time = datetime_now() - ports: List[int] = daemon_spec.ports if daemon_spec.ports else [] - - if daemon_spec.daemon_type == 'container': - spec: Optional[CustomContainerSpec] = daemon_spec.spec - if spec is None: - # Exit here immediately because the required service - # spec to create a daemon is not provided. This is only - # provided when a service is applied via 'orch apply' - # command. - msg = "Failed to {} daemon {} on {}: Required " \ - "service specification not provided".format( - 'reconfigure' if reconfig else 'deploy', - daemon_spec.name(), daemon_spec.host) - self.log.info(msg) - return msg - image = spec.image - if spec.ports: - ports.extend(spec.ports) - - if daemon_spec.daemon_type == 'cephadm-exporter': - if not reconfig: - assert daemon_spec.host - deploy_ok = self._deploy_cephadm_binary(daemon_spec.host) - if not deploy_ok: - msg = f"Unable to deploy the cephadm binary to {daemon_spec.host}" - self.log.warning(msg) - return msg - - if daemon_spec.daemon_type == 'haproxy': - haspec = cast(HA_RGWSpec, daemon_spec.spec) - if haspec.haproxy_container_image: - image = haspec.haproxy_container_image - - if daemon_spec.daemon_type == 'keepalived': - haspec = cast(HA_RGWSpec, daemon_spec.spec) - if haspec.keepalived_container_image: - image = haspec.keepalived_container_image - - cephadm_config, deps = self.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config( - daemon_spec) - - # TCP port to open in the host firewall - if len(ports) > 0: - daemon_spec.extra_args.extend([ - '--tcp-ports', ' '.join(map(str, ports)) - ]) - - # osd deployments needs an --osd-uuid arg - if daemon_spec.daemon_type == 'osd': - if not osd_uuid_map: - osd_uuid_map = self.get_osd_uuid_map() - osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) - if not osd_uuid: - raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) - 
daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) - - if reconfig: - daemon_spec.extra_args.append('--reconfig') - if self.allow_ptrace: - daemon_spec.extra_args.append('--allow-ptrace') - - if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url: - self._registry_login(daemon_spec.host, self.registry_url, - self.registry_username, self.registry_password) - - daemon_spec.extra_args.extend(['--config-json', '-']) - - self.log.info('%s daemon %s on %s' % ( - 'Reconfiguring' if reconfig else 'Deploying', - daemon_spec.name(), daemon_spec.host)) - - out, err, code = self._run_cephadm( - daemon_spec.host, daemon_spec.name(), 'deploy', - [ - '--name', daemon_spec.name(), - ] + daemon_spec.extra_args, - stdin=json.dumps(cephadm_config), - image=image) - if not code and daemon_spec.host in self.cache.daemons: - # prime cached service state with what we (should have) - # just created - sd = orchestrator.DaemonDescription() - sd.daemon_type = daemon_spec.daemon_type - sd.daemon_id = daemon_spec.daemon_id - sd.hostname = daemon_spec.host - sd.status = 1 - sd.status_desc = 'starting' - self.cache.add_daemon(daemon_spec.host, sd) - if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']: - self.requires_post_actions.add(daemon_spec.daemon_type) - self.cache.invalidate_host_daemons(daemon_spec.host) - self.cache.update_daemon_config_deps( - daemon_spec.host, daemon_spec.name(), deps, start_time) - self.cache.save_host(daemon_spec.host) - msg = "{} {} on host '{}'".format( - 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) - if not code: - self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) - else: - what = 'reconfigure' if reconfig else 'deploy' - self.events.for_daemon( - daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') - return msg - def _deploy_cephadm_binary(self, host: str) -> bool: # Use tee (from coreutils) to create a copy of cephadm on the 
target machine self.log.info(f"Deploying cephadm binary to {host}") @@ -2242,7 +2124,7 @@ To check that the host is reachable: @forall_hosts def create_func_map(*args: Any) -> str: daemon_spec = create_func(*args) - return self._create_daemon(daemon_spec) + return CephadmServe(self)._create_daemon(daemon_spec) return create_func_map(args) diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index ffcc69d7197..a258feeedbc 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -2,7 +2,7 @@ import datetime import json import logging from collections import defaultdict -from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict +from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any try: import remoto @@ -11,14 +11,16 @@ except ImportError: from ceph.deployment import inventory from ceph.deployment.drive_group import DriveGroupSpec -from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, HA_RGWSpec +from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, \ + HA_RGWSpec, CustomContainerSpec from ceph.utils import str_to_datetime, datetime_now import orchestrator +from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent +from cephadm.services.cephadmservice import CephadmDaemonSpec from cephadm.schedule import HostAssignment from cephadm.upgrade import CEPH_UPGRADE_ORDER from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest -from orchestrator import OrchestratorError from orchestrator._interface import daemon_type_to_service, service_to_daemon_types if TYPE_CHECKING: @@ -559,7 +561,7 @@ class CephadmServe: try: daemon_spec = self.mgr.cephadm_services[service_type].prepare_create( daemon_spec) - self.mgr._create_daemon(daemon_spec) + self._create_daemon(daemon_spec) r = True except (RuntimeError, OrchestratorError) as e: self.mgr.events.for_service(spec, 'ERROR', @@ -713,3 
+715,121 @@ class CephadmServe: self.mgr.cache.schedule_daemon_action( daemon.hostname, daemon.name(), 'reconfig') return spec + + def _create_daemon(self, + daemon_spec: CephadmDaemonSpec, + reconfig: bool = False, + osd_uuid_map: Optional[Dict[str, Any]] = None, + ) -> str: + + with set_exception_subject('service', orchestrator.DaemonDescription( + daemon_type=daemon_spec.daemon_type, + daemon_id=daemon_spec.daemon_id, + hostname=daemon_spec.host, + ).service_id(), overwrite=True): + + image = '' + start_time = datetime_now() + ports: List[int] = daemon_spec.ports if daemon_spec.ports else [] + + if daemon_spec.daemon_type == 'container': + spec: Optional[CustomContainerSpec] = daemon_spec.spec + if spec is None: + # Exit here immediately because the required service + # spec to create a daemon is not provided. This is only + # provided when a service is applied via 'orch apply' + # command. + msg = "Failed to {} daemon {} on {}: Required " \ + "service specification not provided".format( + 'reconfigure' if reconfig else 'deploy', + daemon_spec.name(), daemon_spec.host) + self.log.info(msg) + return msg + image = spec.image + if spec.ports: + ports.extend(spec.ports) + + if daemon_spec.daemon_type == 'cephadm-exporter': + if not reconfig: + assert daemon_spec.host + deploy_ok = self.mgr._deploy_cephadm_binary(daemon_spec.host) + if not deploy_ok: + msg = f"Unable to deploy the cephadm binary to {daemon_spec.host}" + self.log.warning(msg) + return msg + + if daemon_spec.daemon_type == 'haproxy': + haspec = cast(HA_RGWSpec, daemon_spec.spec) + if haspec.haproxy_container_image: + image = haspec.haproxy_container_image + + if daemon_spec.daemon_type == 'keepalived': + haspec = cast(HA_RGWSpec, daemon_spec.spec) + if haspec.keepalived_container_image: + image = haspec.keepalived_container_image + + cephadm_config, deps = self.mgr.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config( + daemon_spec) + + # TCP port to open in the host 
firewall + if len(ports) > 0: + daemon_spec.extra_args.extend([ + '--tcp-ports', ' '.join(map(str, ports)) + ]) + + # osd deployments need an --osd-fsid arg + if daemon_spec.daemon_type == 'osd': + if not osd_uuid_map: + osd_uuid_map = self.mgr.get_osd_uuid_map() + osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) + if not osd_uuid: + raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) + daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) + + if reconfig: + daemon_spec.extra_args.append('--reconfig') + if self.mgr.allow_ptrace: + daemon_spec.extra_args.append('--allow-ptrace') + + if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url: + self.mgr._registry_login(daemon_spec.host, self.mgr.registry_url, + self.mgr.registry_username, self.mgr.registry_password) + + daemon_spec.extra_args.extend(['--config-json', '-']) + + self.log.info('%s daemon %s on %s' % ( + 'Reconfiguring' if reconfig else 'Deploying', + daemon_spec.name(), daemon_spec.host)) + + out, err, code = self.mgr._run_cephadm( + daemon_spec.host, daemon_spec.name(), 'deploy', + [ + '--name', daemon_spec.name(), + ] + daemon_spec.extra_args, + stdin=json.dumps(cephadm_config), + image=image) + if not code and daemon_spec.host in self.mgr.cache.daemons: + # prime cached service state with what we (should have) + # just created + sd = orchestrator.DaemonDescription() + sd.daemon_type = daemon_spec.daemon_type + sd.daemon_id = daemon_spec.daemon_id + sd.hostname = daemon_spec.host + sd.status = 1 + sd.status_desc = 'starting' + self.mgr.cache.add_daemon(daemon_spec.host, sd) + if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']: + self.mgr.requires_post_actions.add(daemon_spec.daemon_type) + self.mgr.cache.invalidate_host_daemons(daemon_spec.host) + self.mgr.cache.update_daemon_config_deps( + daemon_spec.host, daemon_spec.name(), deps, start_time) + self.mgr.cache.save_host(daemon_spec.host) + msg = "{} {} on host 
'{}'".format( + 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) + if not code: + self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) + else: + what = 'reconfigure' if reconfig else 'deploy' + self.mgr.events.for_daemon( + daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') + return msg diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 0929edf2f6a..8bbec38d582 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -10,6 +10,7 @@ from ceph.utils import datetime_to_str, str_to_datetime from datetime import datetime import orchestrator +from cephadm.serve import CephadmServe from cephadm.utils import forall_hosts from orchestrator import OrchestratorError from mgr_module import MonCommandFailed @@ -106,7 +107,7 @@ class OSDService(CephService): host=host, daemon_type='osd', ) - self.mgr._create_daemon( + CephadmServe(self.mgr)._create_daemon( daemon_spec, osd_uuid_map=osd_uuid_map)