return self._daemon_action(daemon_type, daemon_id, host, action)
def _daemon_action(self, daemon_type, daemon_id, host, action):
+ daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
+ host=host,
+ daemon_id=daemon_id,
+ daemon_type=daemon_type,
+ )
+
if action == 'redeploy':
# stop, recreate the container+unit, then restart
- return self._create_daemon(daemon_type, daemon_id, host)
+ return self._create_daemon(daemon_spec)
elif action == 'reconfig':
- return self._create_daemon(daemon_type, daemon_id, host,
- reconfig=True)
+ return self._create_daemon(daemon_spec, reconfig=True)
actions = {
'start': ['reset-failed', 'start'],
'stop': ['stop'],
'restart': ['reset-failed', 'restart'],
}
- name = '%s.%s' % (daemon_type, daemon_id)
+ name = daemon_spec.name()
for a in actions[action]:
try:
out, err, code = self._run_cephadm(
- host, name, 'unit',
+                    daemon_spec.host, name, 'unit',
['--name', name, a])
except Exception:
self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
- self.cache.invalidate_host_daemons(host)
- return "{} {} from host '{}'".format(action, name, host)
+ self.cache.invalidate_host_daemons(daemon_spec.host)
+ return "{} {} from host '{}'".format(action, name, daemon_spec.host)
@trivial_completion
def daemon_action(self, action, daemon_type, daemon_id):
}
def _create_daemon(self,
- daemon_type: str,
- daemon_id: str,
- host: str,
- keyring: Optional[str] = None,
- extra_args: Optional[List[str]] = None,
- extra_config: Optional[Dict[str, Any]] = None,
+ daemon_spec: CephadmDaemonSpec,
reconfig=False,
osd_uuid_map: Optional[Dict[str, Any]] = None,
redeploy=False,
) -> str:
- if not extra_args:
- extra_args = []
- if not extra_config:
- extra_config = {}
- name = '%s.%s' % (daemon_type, daemon_id)
start_time = datetime.datetime.utcnow()
- deps = [] # type: List[str]
- cephadm_config = {} # type: Dict[str, Any]
- if daemon_type == 'prometheus':
- cephadm_config, deps = self.prometheus_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'grafana':
- cephadm_config, deps = self.grafana_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'nfs':
- cephadm_config, deps = \
- self.nfs_service._generate_nfs_config(daemon_type, daemon_id, host)
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'alertmanager':
- cephadm_config, deps = self.alertmanager_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'node-exporter':
- cephadm_config, deps = self.node_exporter_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- else:
- # Ceph.daemons (mon, mgr, mds, osd, etc)
- cephadm_config = self._get_config_and_keyring(
- daemon_type, daemon_id, host,
- keyring=keyring,
- extra_ceph_config=extra_config.pop('config', ''))
- if extra_config:
- cephadm_config.update({'files': extra_config})
- extra_args.extend(['--config-json', '-'])
-
- # osd deployments needs an --osd-uuid arg
- if daemon_type == 'osd':
- if not osd_uuid_map:
- osd_uuid_map = self.get_osd_uuid_map()
- osd_uuid = osd_uuid_map.get(daemon_id)
- if not osd_uuid:
- raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
- extra_args.extend(['--osd-fsid', osd_uuid])
+ cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
+
+ daemon_spec.extra_args.extend(['--config-json', '-'])
+
+ # osd deployments need an --osd-uuid arg
+ if daemon_spec.daemon_type == 'osd':
+ if not osd_uuid_map:
+ osd_uuid_map = self.get_osd_uuid_map()
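+ # osd_uuid_map maps each OSD id known to the osdmap to its uuid (fsid), which cephadm expects as --osd-fsid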
+ osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
+ if not osd_uuid:
+ raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+ daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
if reconfig:
- extra_args.append('--reconfig')
+ daemon_spec.extra_args.append('--reconfig')
if self.allow_ptrace:
- extra_args.append('--allow-ptrace')
+ daemon_spec.extra_args.append('--allow-ptrace')
self.log.info('%s daemon %s on %s' % (
'Reconfiguring' if reconfig else 'Deploying',
- name, host))
+ daemon_spec.name(), daemon_spec.host))
out, err, code = self._run_cephadm(
- host, name, 'deploy',
+ daemon_spec.host, daemon_spec.name(), 'deploy',
[
- '--name', name,
- ] + extra_args,
+ '--name', daemon_spec.name(),
+ ] + daemon_spec.extra_args,
stdin=json.dumps(cephadm_config))
- if not code and host in self.cache.daemons:
+ if not code and daemon_spec.host in self.cache.daemons:
# prime cached service state with what we (should have)
# just created
sd = orchestrator.DaemonDescription()
- sd.daemon_type = daemon_type
- sd.daemon_id = daemon_id
- sd.hostname = host
+ sd.daemon_type = daemon_spec.daemon_type
+ sd.daemon_id = daemon_spec.daemon_id
+ sd.hostname = daemon_spec.host
sd.status = 1
sd.status_desc = 'starting'
- self.cache.add_daemon(host, sd)
- if daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
- self.requires_post_actions.add(daemon_type)
- self.cache.invalidate_host_daemons(host)
- self.cache.update_daemon_config_deps(host, name, deps, start_time)
- self.cache.save_host(host)
+ self.cache.add_daemon(daemon_spec.host, sd)
+ if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
+ self.requires_post_actions.add(daemon_spec.daemon_type)
+ self.cache.invalidate_host_daemons(daemon_spec.host)
+ self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
+ self.cache.save_host(daemon_spec.host)
return "{} {} on host '{}'".format(
- 'Reconfigured' if reconfig else 'Deployed', name, host)
+ 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
@forall_hosts
def _remove_daemons(self, name, host) -> str:
self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
reconfig = True
if reconfig:
- self._create_daemon(dd.daemon_type, dd.daemon_id,
- dd.hostname, reconfig=True)
+ self._create_daemon(
+ CephadmDaemonSpec(
+ host=dd.hostname,
+ daemon_id=dd.daemon_id,
+ daemon_type=dd.daemon_type),
+ reconfig=True)
# do daemon post actions
for daemon_type, daemon_descs in daemons_post.items():
import json
import logging
from abc import ABCMeta, abstractmethod
-from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic
+from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic, Optional, Dict, Tuple
from mgr_module import MonCommandFailed
class CephadmDaemonSpec(Generic[ServiceSpecs]):
# typing.NamedTuple + Generic is broken in py36
- def __init__(self, host, daemon_id, spec: ServiceSpecs, network):
+ def __init__(self, host, daemon_id,
+ spec: Optional[ServiceSpecs] = None,
+ network: Optional[str] = None,
+ keyring: Optional[str] = None,
+ extra_args: Optional[List[str]] = None,
+ extra_config: Optional[Dict[str, Any]] = None,
+ daemon_type: Optional[str] = None):
+ """
+ Used for:
+ * deploying new daemons: everything is set
+ * redeploying existing daemons: only host, daemon_id and daemon_type are set
+
+ It would be great to have a consistent usage where all properties are set.
+ """
self.host = host
self.daemon_id = daemon_id
- self.spec: ServiceSpecs = spec
- self.network = network # mons
+ daemon_type = daemon_type or (spec.service_type if spec else None)
+ assert daemon_type is not None
+ self.daemon_type: str = daemon_type
+
+ # would be great to have the spec always available:
+ self.spec: Optional[ServiceSpecs] = spec
+
+ # mons
+ self.network = network
+
+ # for run_cephadm.
+ self.keyring: Optional[str] = keyring
+
+ # For run_cephadm. Would be great to have more expressive names.
+ self.extra_args: List[str] = extra_args or []
+ self.extra_config: Dict[str, Any] = extra_config or {}
+
+ def name(self) -> str:
+ return '%s.%s' % (self.daemon_type, self.daemon_id)
+
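For illustration only (not part of the patch), the two construction modes described in the docstring might look like this; the host, daemon id and the iscsi_spec variable are hypothetical:

    # Deploying a new daemon: a service spec is at hand, so daemon_type is
    # derived from spec.service_type and keyring/extra_config can be set later.
    new_daemon = CephadmDaemonSpec(host='host1', daemon_id='a', spec=iscsi_spec)

    # Redeploying or acting on an existing daemon: no spec is available, so the
    # daemon_type has to be passed explicitly (as _daemon_action does above).
    existing_daemon = CephadmDaemonSpec(host='host1', daemon_id='a', daemon_type='mgr')
    assert existing_daemon.name() == 'mgr.a'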
class CephadmService(metaclass=ABCMeta):
"""
def create(self, daemon_spec: CephadmDaemonSpec):
raise NotImplementedError()
+ def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+ # Ceph.daemons (mon, mgr, mds, osd, etc)
+ cephadm_config = self.mgr._get_config_and_keyring(
+ daemon_spec.daemon_type,
+ daemon_spec.daemon_id,
+ host=daemon_spec.host,
+ keyring=daemon_spec.keyring,
+ extra_ceph_config=daemon_spec.extra_config.pop('config', ''))
+
+ if daemon_spec.extra_config:
+ cephadm_config.update({'files': daemon_spec.extra_config})
+
+ return cephadm_config, []
+
+
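As a hedged sketch (service name and file contents hypothetical), a subclass can override generate_config() instead of relying on the Ceph-daemon default above, mirroring what the monitoring services further down do:

    class MyExporterService(CephadmService):  # hypothetical service, for illustration only
        TYPE = 'my-exporter'

        def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
            deps: List[str] = []  # names of config dependencies, if any
            # the returned dict is JSON-encoded and fed to cephadm via --config-json;
            # 'files' carries extra config files, as the iscsi service does for iscsi-gateway.cfg
            config = {'files': {'exporter.yml': 'scrape_interval: 15s\n'}}
            return config, deps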
def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
"""The post actions needed to be done after daemons are checked"""
if self.mgr.config_dashboard:
raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
extra_config += 'public network = %s\n' % network
- return self.mgr._create_daemon('mon', name, host,
- keyring=keyring,
- extra_config={'config': extra_config})
+ daemon_spec.extra_config = {'config': extra_config}
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
def _check_safe_to_destroy(self, mon_id):
# type: (str) -> None
'mds', 'allow *'],
})
- return self.mgr._create_daemon('mgr', mgr_id, host, keyring=keyring)
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
class MdsService(CephadmService):
'osd', 'allow rw tag cephfs *=*',
'mds', 'allow'],
})
- return self.mgr._create_daemon('mds', mds_id, host, keyring=keyring)
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
class RgwService(CephadmService):
'mgr', 'allow rw',
'osd', 'allow rwx'],
})
- return self.mgr._create_daemon('rgw', rgw_id, host, keyring=keyring)
+
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
class RbdMirrorService(CephadmService):
'caps': ['mon', 'profile rbd-mirror',
'osd', 'profile rbd'],
})
- return self.mgr._create_daemon('rbd-mirror', daemon_id, host,
- keyring=keyring)
+
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
class CrashService(CephadmService):
'caps': ['mon', 'profile crash',
'mgr', 'profile crash'],
})
- return self.mgr._create_daemon('crash', daemon_id, host, keyring=keyring)
+
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
from mgr_module import MonCommandFailed
from ceph.deployment.service_spec import IscsiServiceSpec
-from orchestrator import DaemonDescription
+from orchestrator import DaemonDescription, OrchestratorError
from .cephadmservice import CephadmService, CephadmDaemonSpec
from .. import utils
def create(self, daemon_spec: CephadmDaemonSpec[IscsiServiceSpec]) -> str:
spec = daemon_spec.spec
+ if spec is None:
+ raise OrchestratorError(f'Unable to deploy {daemon_spec.name()}: Service not found.')
igw_id = daemon_spec.daemon_id
- host = daemon_spec.host
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': utils.name_to_auth_entity('iscsi', igw_id),
'spec': spec
}
igw_conf = self.mgr.template.render('services/iscsi/iscsi-gateway.cfg.j2', context)
- extra_config = {'iscsi-gateway.cfg': igw_conf}
- return self.mgr._create_daemon('iscsi', igw_id, host, keyring=keyring,
- extra_config=extra_config)
+
+ daemon_spec.keyring = keyring
+ daemon_spec.extra_config = {'iscsi-gateway.cfg': igw_conf}
+
+ return self.mgr._create_daemon(daemon_spec)
+
def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
def get_set_cmd_dicts(out: str) -> List[dict]:
DEFAULT_SERVICE_PORT = 3000
def create(self, daemon_spec: CephadmDaemonSpec):
- daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+ return self.mgr._create_daemon(daemon_spec)
- return self.mgr._create_daemon('grafana', daemon_id, host)
-
- def generate_config(self):
- # type: () -> Tuple[Dict[str, Any], List[str]]
+ def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
deps = [] # type: List[str]
prom_services = [] # type: List[str]
DEFAULT_SERVICE_PORT = 9093
def create(self, daemon_spec: CephadmDaemonSpec):
- daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
-
- return self.mgr._create_daemon('alertmanager', daemon_id, host)
+ return self.mgr._create_daemon(daemon_spec)
- def generate_config(self):
- # type: () -> Tuple[Dict[str, Any], List[str]]
+ def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
deps = [] # type: List[str]
# dashboard(s)
DEFAULT_SERVICE_PORT = 9095
def create(self, daemon_spec: CephadmDaemonSpec):
- daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+ return self.mgr._create_daemon(daemon_spec)
- return self.mgr._create_daemon('prometheus', daemon_id, host)
-
- def generate_config(self):
- # type: () -> Tuple[Dict[str, Any], List[str]]
+ def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
deps = [] # type: List[str]
# scrape mgrs
TYPE = 'node-exporter'
def create(self, daemon_spec: CephadmDaemonSpec):
- daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
-
- return self.mgr._create_daemon('node-exporter', daemon_id, host)
+ return self.mgr._create_daemon(daemon_spec)
- def generate_config(self) -> Tuple[Dict[str, Any], List[str]]:
+ def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
return {}, []
class NFSService(CephadmService):
TYPE = 'nfs'
- def _generate_nfs_config(self, daemon_type, daemon_id, host):
- # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
+ def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+ daemon_type = daemon_spec.daemon_type
+ daemon_id = daemon_spec.daemon_id
+ host = daemon_spec.host
+
deps = [] # type: List[str]
# find the matching NFSServiceSpec
self.mgr.spec_store.save(spec)
def create(self, daemon_spec: CephadmDaemonSpec[NFSServiceSpec]):
+ daemon_id = daemon_spec.daemon_id
+ host = daemon_spec.host
+ spec = daemon_spec.spec
logger.info('Create daemon %s on host %s with spec %s' % (
daemon_id, host, spec))
- return self.mgr._create_daemon('nfs', daemon_spec.daemon_id, daemon_spec.host)
+ return self.mgr._create_daemon(daemon_spec)
def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed
-from cephadm.services.cephadmservice import CephadmService
-
+from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec
logger = logging.getLogger(__name__)
continue
created.append(osd_id)
+ daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
+ daemon_id=osd_id,
+ host=host,
+ daemon_type='osd',
+ )
self.mgr._create_daemon(
- 'osd', osd_id, host,
+ daemon_spec,
osd_uuid_map=osd_uuid_map)
if created: