git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: move _create_daemon to serve.py
author Sebastian Wagner <sebastian.wagner@suse.com>
Tue, 5 Jan 2021 15:19:27 +0000 (16:19 +0100)
committer Sebastian Wagner <sebastian.wagner@suse.com>
Mon, 11 Jan 2021 13:54:49 +0000 (14:54 +0100)
`_create_daemon` can potentially make the CLI unresponsive and should
only be called from the serve() thread.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/osd.py

index 47d8315b5ebedc67ea8c3c0e185ff737cd5ba7f1..9cb65d8b3bc8e927b992de134cab6198669d3cc7 100644 (file)
@@ -25,7 +25,7 @@ from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.service_spec import \
     NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
-    CustomContainerSpec, HostPlacementSpec, HA_RGWSpec
+    HostPlacementSpec, HA_RGWSpec
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from cephadm.serve import CephadmServe
 from cephadm.services.cephadmservice import CephadmDaemonSpec
@@ -1745,9 +1745,9 @@ To check that the host is reachable:
                 self.mgr_service.fail_over()
                 return ''  # unreachable
             # stop, recreate the container+unit, then restart
-            return self._create_daemon(daemon_spec)
+            return CephadmServe(self)._create_daemon(daemon_spec)
         elif action == 'reconfig':
-            return self._create_daemon(daemon_spec, reconfig=True)
+            return CephadmServe(self)._create_daemon(daemon_spec, reconfig=True)
 
         actions = {
             'start': ['reset-failed', 'start'],
@@ -2014,124 +2014,6 @@ To check that the host is reachable:
                 deps.append(dd.name())
         return sorted(deps)
 
-    def _create_daemon(self,
-                       daemon_spec: CephadmDaemonSpec,
-                       reconfig: bool = False,
-                       osd_uuid_map: Optional[Dict[str, Any]] = None,
-                       ) -> str:
-
-        with set_exception_subject('service', orchestrator.DaemonDescription(
-                daemon_type=daemon_spec.daemon_type,
-                daemon_id=daemon_spec.daemon_id,
-                hostname=daemon_spec.host,
-        ).service_id(), overwrite=True):
-
-            image = ''
-            start_time = datetime_now()
-            ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
-
-            if daemon_spec.daemon_type == 'container':
-                spec: Optional[CustomContainerSpec] = daemon_spec.spec
-                if spec is None:
-                    # Exit here immediately because the required service
-                    # spec to create a daemon is not provided. This is only
-                    # provided when a service is applied via 'orch apply'
-                    # command.
-                    msg = "Failed to {} daemon {} on {}: Required " \
-                          "service specification not provided".format(
-                              'reconfigure' if reconfig else 'deploy',
-                              daemon_spec.name(), daemon_spec.host)
-                    self.log.info(msg)
-                    return msg
-                image = spec.image
-                if spec.ports:
-                    ports.extend(spec.ports)
-
-            if daemon_spec.daemon_type == 'cephadm-exporter':
-                if not reconfig:
-                    assert daemon_spec.host
-                    deploy_ok = self._deploy_cephadm_binary(daemon_spec.host)
-                    if not deploy_ok:
-                        msg = f"Unable to deploy the cephadm binary to {daemon_spec.host}"
-                        self.log.warning(msg)
-                        return msg
-
-            if daemon_spec.daemon_type == 'haproxy':
-                haspec = cast(HA_RGWSpec, daemon_spec.spec)
-                if haspec.haproxy_container_image:
-                    image = haspec.haproxy_container_image
-
-            if daemon_spec.daemon_type == 'keepalived':
-                haspec = cast(HA_RGWSpec, daemon_spec.spec)
-                if haspec.keepalived_container_image:
-                    image = haspec.keepalived_container_image
-
-            cephadm_config, deps = self.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config(
-                daemon_spec)
-
-            # TCP port to open in the host firewall
-            if len(ports) > 0:
-                daemon_spec.extra_args.extend([
-                    '--tcp-ports', ' '.join(map(str, ports))
-                ])
-
-            # osd deployments needs an --osd-uuid arg
-            if daemon_spec.daemon_type == 'osd':
-                if not osd_uuid_map:
-                    osd_uuid_map = self.get_osd_uuid_map()
-                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
-                if not osd_uuid:
-                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
-                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
-
-            if reconfig:
-                daemon_spec.extra_args.append('--reconfig')
-            if self.allow_ptrace:
-                daemon_spec.extra_args.append('--allow-ptrace')
-
-            if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
-                self._registry_login(daemon_spec.host, self.registry_url,
-                                     self.registry_username, self.registry_password)
-
-            daemon_spec.extra_args.extend(['--config-json', '-'])
-
-            self.log.info('%s daemon %s on %s' % (
-                'Reconfiguring' if reconfig else 'Deploying',
-                daemon_spec.name(), daemon_spec.host))
-
-            out, err, code = self._run_cephadm(
-                daemon_spec.host, daemon_spec.name(), 'deploy',
-                [
-                    '--name', daemon_spec.name(),
-                ] + daemon_spec.extra_args,
-                stdin=json.dumps(cephadm_config),
-                image=image)
-            if not code and daemon_spec.host in self.cache.daemons:
-                # prime cached service state with what we (should have)
-                # just created
-                sd = orchestrator.DaemonDescription()
-                sd.daemon_type = daemon_spec.daemon_type
-                sd.daemon_id = daemon_spec.daemon_id
-                sd.hostname = daemon_spec.host
-                sd.status = 1
-                sd.status_desc = 'starting'
-                self.cache.add_daemon(daemon_spec.host, sd)
-                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
-                    self.requires_post_actions.add(daemon_spec.daemon_type)
-            self.cache.invalidate_host_daemons(daemon_spec.host)
-            self.cache.update_daemon_config_deps(
-                daemon_spec.host, daemon_spec.name(), deps, start_time)
-            self.cache.save_host(daemon_spec.host)
-            msg = "{} {} on host '{}'".format(
-                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
-            if not code:
-                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
-            else:
-                what = 'reconfigure' if reconfig else 'deploy'
-                self.events.for_daemon(
-                    daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
-            return msg
-
     def _deploy_cephadm_binary(self, host: str) -> bool:
         # Use tee (from coreutils) to create a copy of cephadm on the target machine
         self.log.info(f"Deploying cephadm binary to {host}")
@@ -2242,7 +2124,7 @@ To check that the host is reachable:
         @forall_hosts
         def create_func_map(*args: Any) -> str:
             daemon_spec = create_func(*args)
-            return self._create_daemon(daemon_spec)
+            return CephadmServe(self)._create_daemon(daemon_spec)
 
         return create_func_map(args)
 
index ffcc69d7197b8abd9d31d35b4d14b4810a63fe8e..a258feeedbce4a25c00be3cd007610cb9244bab5 100644 (file)
@@ -2,7 +2,7 @@ import datetime
 import json
 import logging
 from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any
 
 try:
     import remoto
@@ -11,14 +11,16 @@ except ImportError:
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, HA_RGWSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, \
+    HA_RGWSpec, CustomContainerSpec
 from ceph.utils import str_to_datetime, datetime_now
 
 import orchestrator
+from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent
+from cephadm.services.cephadmservice import CephadmDaemonSpec
 from cephadm.schedule import HostAssignment
 from cephadm.upgrade import CEPH_UPGRADE_ORDER
 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest
-from orchestrator import OrchestratorError
 from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
 
 if TYPE_CHECKING:
@@ -559,7 +561,7 @@ class CephadmServe:
                 try:
                     daemon_spec = self.mgr.cephadm_services[service_type].prepare_create(
                         daemon_spec)
-                    self.mgr._create_daemon(daemon_spec)
+                    self._create_daemon(daemon_spec)
                     r = True
                 except (RuntimeError, OrchestratorError) as e:
                     self.mgr.events.for_service(spec, 'ERROR',
@@ -713,3 +715,121 @@ class CephadmServe:
                     self.mgr.cache.schedule_daemon_action(
                         daemon.hostname, daemon.name(), 'reconfig')
         return spec
+
+    def _create_daemon(self,
+                       daemon_spec: CephadmDaemonSpec,
+                       reconfig: bool = False,
+                       osd_uuid_map: Optional[Dict[str, Any]] = None,
+                       ) -> str:
+
+        with set_exception_subject('service', orchestrator.DaemonDescription(
+                daemon_type=daemon_spec.daemon_type,
+                daemon_id=daemon_spec.daemon_id,
+                hostname=daemon_spec.host,
+        ).service_id(), overwrite=True):
+
+            image = ''
+            start_time = datetime_now()
+            ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
+
+            if daemon_spec.daemon_type == 'container':
+                spec: Optional[CustomContainerSpec] = daemon_spec.spec
+                if spec is None:
+                    # Exit here immediately because the required service
+                    # spec to create a daemon is not provided. This is only
+                    # provided when a service is applied via 'orch apply'
+                    # command.
+                    msg = "Failed to {} daemon {} on {}: Required " \
+                          "service specification not provided".format(
+                              'reconfigure' if reconfig else 'deploy',
+                              daemon_spec.name(), daemon_spec.host)
+                    self.log.info(msg)
+                    return msg
+                image = spec.image
+                if spec.ports:
+                    ports.extend(spec.ports)
+
+            if daemon_spec.daemon_type == 'cephadm-exporter':
+                if not reconfig:
+                    assert daemon_spec.host
+                    deploy_ok = self.mgr._deploy_cephadm_binary(daemon_spec.host)
+                    if not deploy_ok:
+                        msg = f"Unable to deploy the cephadm binary to {daemon_spec.host}"
+                        self.log.warning(msg)
+                        return msg
+
+            if daemon_spec.daemon_type == 'haproxy':
+                haspec = cast(HA_RGWSpec, daemon_spec.spec)
+                if haspec.haproxy_container_image:
+                    image = haspec.haproxy_container_image
+
+            if daemon_spec.daemon_type == 'keepalived':
+                haspec = cast(HA_RGWSpec, daemon_spec.spec)
+                if haspec.keepalived_container_image:
+                    image = haspec.keepalived_container_image
+
+            cephadm_config, deps = self.mgr.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config(
+                daemon_spec)
+
+            # TCP port to open in the host firewall
+            if len(ports) > 0:
+                daemon_spec.extra_args.extend([
+                    '--tcp-ports', ' '.join(map(str, ports))
+                ])
+
+            # osd deployments needs an --osd-uuid arg
+            if daemon_spec.daemon_type == 'osd':
+                if not osd_uuid_map:
+                    osd_uuid_map = self.mgr.get_osd_uuid_map()
+                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
+                if not osd_uuid:
+                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
+
+            if reconfig:
+                daemon_spec.extra_args.append('--reconfig')
+            if self.mgr.allow_ptrace:
+                daemon_spec.extra_args.append('--allow-ptrace')
+
+            if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
+                self.mgr._registry_login(daemon_spec.host, self.mgr.registry_url,
+                                         self.mgr.registry_username, self.mgr.registry_password)
+
+            daemon_spec.extra_args.extend(['--config-json', '-'])
+
+            self.log.info('%s daemon %s on %s' % (
+                'Reconfiguring' if reconfig else 'Deploying',
+                daemon_spec.name(), daemon_spec.host))
+
+            out, err, code = self.mgr._run_cephadm(
+                daemon_spec.host, daemon_spec.name(), 'deploy',
+                [
+                    '--name', daemon_spec.name(),
+                ] + daemon_spec.extra_args,
+                stdin=json.dumps(cephadm_config),
+                image=image)
+            if not code and daemon_spec.host in self.mgr.cache.daemons:
+                # prime cached service state with what we (should have)
+                # just created
+                sd = orchestrator.DaemonDescription()
+                sd.daemon_type = daemon_spec.daemon_type
+                sd.daemon_id = daemon_spec.daemon_id
+                sd.hostname = daemon_spec.host
+                sd.status = 1
+                sd.status_desc = 'starting'
+                self.mgr.cache.add_daemon(daemon_spec.host, sd)
+                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
+                    self.mgr.requires_post_actions.add(daemon_spec.daemon_type)
+            self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
+            self.mgr.cache.update_daemon_config_deps(
+                daemon_spec.host, daemon_spec.name(), deps, start_time)
+            self.mgr.cache.save_host(daemon_spec.host)
+            msg = "{} {} on host '{}'".format(
+                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
+            if not code:
+                self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
+            else:
+                what = 'reconfigure' if reconfig else 'deploy'
+                self.mgr.events.for_daemon(
+                    daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+            return msg
index 0929edf2f6a8d6f3697a532f639bf4f1e3e66c19..8bbec38d582f3969483864157cc69a28fc74d58b 100644 (file)
@@ -10,6 +10,7 @@ from ceph.utils import datetime_to_str, str_to_datetime
 
 from datetime import datetime
 import orchestrator
+from cephadm.serve import CephadmServe
 from cephadm.utils import forall_hosts
 from orchestrator import OrchestratorError
 from mgr_module import MonCommandFailed
@@ -106,7 +107,7 @@ class OSDService(CephService):
                     host=host,
                     daemon_type='osd',
                 )
-                self.mgr._create_daemon(
+                CephadmServe(self.mgr)._create_daemon(
                     daemon_spec,
                     osd_uuid_map=osd_uuid_map)