from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
- CustomContainerSpec, HostPlacementSpec, HA_RGWSpec
+ HostPlacementSpec, HA_RGWSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec
self.mgr_service.fail_over()
return '' # unreachable
# stop, recreate the container+unit, then restart
- return self._create_daemon(daemon_spec)
+ return CephadmServe(self)._create_daemon(daemon_spec)
elif action == 'reconfig':
- return self._create_daemon(daemon_spec, reconfig=True)
+ return CephadmServe(self)._create_daemon(daemon_spec, reconfig=True)
actions = {
'start': ['reset-failed', 'start'],
deps.append(dd.name())
return sorted(deps)
- def _create_daemon(self,
- daemon_spec: CephadmDaemonSpec,
- reconfig: bool = False,
- osd_uuid_map: Optional[Dict[str, Any]] = None,
- ) -> str:
-
- with set_exception_subject('service', orchestrator.DaemonDescription(
- daemon_type=daemon_spec.daemon_type,
- daemon_id=daemon_spec.daemon_id,
- hostname=daemon_spec.host,
- ).service_id(), overwrite=True):
-
- image = ''
- start_time = datetime_now()
- ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
-
- if daemon_spec.daemon_type == 'container':
- spec: Optional[CustomContainerSpec] = daemon_spec.spec
- if spec is None:
- # Exit here immediately because the required service
- # spec to create a daemon is not provided. This is only
- # provided when a service is applied via 'orch apply'
- # command.
- msg = "Failed to {} daemon {} on {}: Required " \
- "service specification not provided".format(
- 'reconfigure' if reconfig else 'deploy',
- daemon_spec.name(), daemon_spec.host)
- self.log.info(msg)
- return msg
- image = spec.image
- if spec.ports:
- ports.extend(spec.ports)
-
- if daemon_spec.daemon_type == 'cephadm-exporter':
- if not reconfig:
- assert daemon_spec.host
- deploy_ok = self._deploy_cephadm_binary(daemon_spec.host)
- if not deploy_ok:
- msg = f"Unable to deploy the cephadm binary to {daemon_spec.host}"
- self.log.warning(msg)
- return msg
-
- if daemon_spec.daemon_type == 'haproxy':
- haspec = cast(HA_RGWSpec, daemon_spec.spec)
- if haspec.haproxy_container_image:
- image = haspec.haproxy_container_image
-
- if daemon_spec.daemon_type == 'keepalived':
- haspec = cast(HA_RGWSpec, daemon_spec.spec)
- if haspec.keepalived_container_image:
- image = haspec.keepalived_container_image
-
- cephadm_config, deps = self.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config(
- daemon_spec)
-
- # TCP port to open in the host firewall
- if len(ports) > 0:
- daemon_spec.extra_args.extend([
- '--tcp-ports', ' '.join(map(str, ports))
- ])
-
- # osd deployments needs an --osd-uuid arg
- if daemon_spec.daemon_type == 'osd':
- if not osd_uuid_map:
- osd_uuid_map = self.get_osd_uuid_map()
- osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
- if not osd_uuid:
- raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
- daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
-
- if reconfig:
- daemon_spec.extra_args.append('--reconfig')
- if self.allow_ptrace:
- daemon_spec.extra_args.append('--allow-ptrace')
-
- if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
- self._registry_login(daemon_spec.host, self.registry_url,
- self.registry_username, self.registry_password)
-
- daemon_spec.extra_args.extend(['--config-json', '-'])
-
- self.log.info('%s daemon %s on %s' % (
- 'Reconfiguring' if reconfig else 'Deploying',
- daemon_spec.name(), daemon_spec.host))
-
- out, err, code = self._run_cephadm(
- daemon_spec.host, daemon_spec.name(), 'deploy',
- [
- '--name', daemon_spec.name(),
- ] + daemon_spec.extra_args,
- stdin=json.dumps(cephadm_config),
- image=image)
- if not code and daemon_spec.host in self.cache.daemons:
- # prime cached service state with what we (should have)
- # just created
- sd = orchestrator.DaemonDescription()
- sd.daemon_type = daemon_spec.daemon_type
- sd.daemon_id = daemon_spec.daemon_id
- sd.hostname = daemon_spec.host
- sd.status = 1
- sd.status_desc = 'starting'
- self.cache.add_daemon(daemon_spec.host, sd)
- if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
- self.requires_post_actions.add(daemon_spec.daemon_type)
- self.cache.invalidate_host_daemons(daemon_spec.host)
- self.cache.update_daemon_config_deps(
- daemon_spec.host, daemon_spec.name(), deps, start_time)
- self.cache.save_host(daemon_spec.host)
- msg = "{} {} on host '{}'".format(
- 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
- if not code:
- self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
- else:
- what = 'reconfigure' if reconfig else 'deploy'
- self.events.for_daemon(
- daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
- return msg
-
def _deploy_cephadm_binary(self, host: str) -> bool:
# Use tee (from coreutils) to create a copy of cephadm on the target machine
self.log.info(f"Deploying cephadm binary to {host}")
@forall_hosts
def create_func_map(*args: Any) -> str:
daemon_spec = create_func(*args)
- return self._create_daemon(daemon_spec)
+ return CephadmServe(self)._create_daemon(daemon_spec)
return create_func_map(args)
import json
import logging
from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict
+from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Any
try:
import remoto
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, HA_RGWSpec
+from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec, \
+ HA_RGWSpec, CustomContainerSpec
from ceph.utils import str_to_datetime, datetime_now
import orchestrator
+from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent
+from cephadm.services.cephadmservice import CephadmDaemonSpec
from cephadm.schedule import HostAssignment
from cephadm.upgrade import CEPH_UPGRADE_ORDER
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest
-from orchestrator import OrchestratorError
from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
if TYPE_CHECKING:
try:
daemon_spec = self.mgr.cephadm_services[service_type].prepare_create(
daemon_spec)
- self.mgr._create_daemon(daemon_spec)
+ self._create_daemon(daemon_spec)
r = True
except (RuntimeError, OrchestratorError) as e:
self.mgr.events.for_service(spec, 'ERROR',
self.mgr.cache.schedule_daemon_action(
daemon.hostname, daemon.name(), 'reconfig')
return spec
+
+ def _create_daemon(self,
+ daemon_spec: CephadmDaemonSpec,
+ reconfig: bool = False,
+ osd_uuid_map: Optional[Dict[str, Any]] = None,
+ ) -> str:
+
+ with set_exception_subject('service', orchestrator.DaemonDescription(
+ daemon_type=daemon_spec.daemon_type,
+ daemon_id=daemon_spec.daemon_id,
+ hostname=daemon_spec.host,
+ ).service_id(), overwrite=True):
+
+ image = ''
+ start_time = datetime_now()
+ ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
+
+ if daemon_spec.daemon_type == 'container':
+ spec: Optional[CustomContainerSpec] = daemon_spec.spec
+ if spec is None:
+ # Exit here immediately because the required service
+ # spec to create a daemon is not provided. This is only
+ # provided when a service is applied via 'orch apply'
+ # command.
+ msg = "Failed to {} daemon {} on {}: Required " \
+ "service specification not provided".format(
+ 'reconfigure' if reconfig else 'deploy',
+ daemon_spec.name(), daemon_spec.host)
+ self.log.info(msg)
+ return msg
+ image = spec.image
+ if spec.ports:
+ ports.extend(spec.ports)
+
+ if daemon_spec.daemon_type == 'cephadm-exporter':
+ if not reconfig:
+ assert daemon_spec.host
+ deploy_ok = self.mgr._deploy_cephadm_binary(daemon_spec.host)
+ if not deploy_ok:
+ msg = f"Unable to deploy the cephadm binary to {daemon_spec.host}"
+ self.log.warning(msg)
+ return msg
+
+ if daemon_spec.daemon_type == 'haproxy':
+ haspec = cast(HA_RGWSpec, daemon_spec.spec)
+ if haspec.haproxy_container_image:
+ image = haspec.haproxy_container_image
+
+ if daemon_spec.daemon_type == 'keepalived':
+ haspec = cast(HA_RGWSpec, daemon_spec.spec)
+ if haspec.keepalived_container_image:
+ image = haspec.keepalived_container_image
+
+ cephadm_config, deps = self.mgr.cephadm_services[daemon_type_to_service(daemon_spec.daemon_type)].generate_config(
+ daemon_spec)
+
+ # TCP port to open in the host firewall
+ if len(ports) > 0:
+ daemon_spec.extra_args.extend([
+ '--tcp-ports', ' '.join(map(str, ports))
+ ])
+
+ # osd deployments needs an --osd-uuid arg
+ if daemon_spec.daemon_type == 'osd':
+ if not osd_uuid_map:
+ osd_uuid_map = self.mgr.get_osd_uuid_map()
+ osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
+ if not osd_uuid:
+ raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+ daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
+
+ if reconfig:
+ daemon_spec.extra_args.append('--reconfig')
+ if self.mgr.allow_ptrace:
+ daemon_spec.extra_args.append('--allow-ptrace')
+
+ if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
+ self.mgr._registry_login(daemon_spec.host, self.mgr.registry_url,
+ self.mgr.registry_username, self.mgr.registry_password)
+
+ daemon_spec.extra_args.extend(['--config-json', '-'])
+
+ self.log.info('%s daemon %s on %s' % (
+ 'Reconfiguring' if reconfig else 'Deploying',
+ daemon_spec.name(), daemon_spec.host))
+
+ out, err, code = self.mgr._run_cephadm(
+ daemon_spec.host, daemon_spec.name(), 'deploy',
+ [
+ '--name', daemon_spec.name(),
+ ] + daemon_spec.extra_args,
+ stdin=json.dumps(cephadm_config),
+ image=image)
+ if not code and daemon_spec.host in self.mgr.cache.daemons:
+ # prime cached service state with what we (should have)
+ # just created
+ sd = orchestrator.DaemonDescription()
+ sd.daemon_type = daemon_spec.daemon_type
+ sd.daemon_id = daemon_spec.daemon_id
+ sd.hostname = daemon_spec.host
+ sd.status = 1
+ sd.status_desc = 'starting'
+ self.mgr.cache.add_daemon(daemon_spec.host, sd)
+ if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
+ self.mgr.requires_post_actions.add(daemon_spec.daemon_type)
+ self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
+ self.mgr.cache.update_daemon_config_deps(
+ daemon_spec.host, daemon_spec.name(), deps, start_time)
+ self.mgr.cache.save_host(daemon_spec.host)
+ msg = "{} {} on host '{}'".format(
+ 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
+ if not code:
+ self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
+ else:
+ what = 'reconfigure' if reconfig else 'deploy'
+ self.mgr.events.for_daemon(
+ daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+ return msg