from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
+from cephadm.services.cephadmservice import CephadmDaemonSpec
from mgr_module import MgrModule, HandleCommandResult
import orchestrator
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
- return self.osd_service.create(drive_group)
+ return self.osd_service.create_from_spec(drive_group)
@trivial_completion
def preview_osdspecs(self,
self.cache.invalidate_host_daemons(host)
return "Removed {} from host '{}'".format(name, host)
- def _create_fn(self, service_type: str) -> Callable[..., str]:
- try:
- d: Dict[str, function] = {
- 'mon': self.mon_service.create,
- 'mgr': self.mgr_service.create,
- 'osd': self.osd_service.create,
- 'mds': self.mds_service.create,
- 'rgw': self.rgw_service.create,
- 'rbd-mirror': self.rbd_mirror_service.create,
- 'nfs': self.nfs_service.create,
- 'grafana': self.grafana_service.create,
- 'alertmanager': self.alertmanager_service.create,
- 'prometheus': self.prometheus_service.create,
- 'node-exporter': self.node_exporter_service.create,
- 'crash': self.crash_service.create,
- 'iscsi': self.iscsi_service.create,
- }
- return d[service_type] # type: ignore
- except KeyError:
- self.log.exception(f'unknown service type {service_type}')
- raise OrchestratorError(f'unknown service type {service_type}') from e
-
def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
return {
'mds': self.mds_service.config,
return False
self.log.debug('Applying service %s spec' % service_name)
- create_func = self._create_fn(daemon_type)
config_func = self._config_fn(daemon_type)
if daemon_type == 'osd':
- create_func(spec)
+ self.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
# TODO: return True would result in a busy loop
return False
daemon_id = self.get_unique_name(daemon_type, host, daemons,
prefix=spec.service_id,
forcename=name)
+ daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
- if daemon_type == 'mon':
- create_func(daemon_id, host, network) # type: ignore
- elif daemon_type in ['nfs', 'iscsi']:
- create_func(daemon_id, host, spec) # type: ignore
- else:
- create_func(daemon_id, host) # type: ignore
+
+ self.cephadm_services[daemon_type].create(daemon_spec)
# add to daemon list so next name(s) will also be unique
sd = orchestrator.DaemonDescription(
if config_func:
config_func(spec)
- args = [] # type: List[tuple]
+ args = [] # type: List[CephadmDaemonSpec]
for host, network, name in hosts:
daemon_id = self.get_unique_name(daemon_type, host, daemons,
prefix=spec.service_id,
forcename=name)
+ daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
- if daemon_type == 'mon':
- args.append((daemon_id, host, network)) # type: ignore
- elif daemon_type in ['nfs', 'iscsi']:
- args.append((daemon_id, host, spec)) # type: ignore
- else:
- args.append((daemon_id, host)) # type: ignore
+ args.append(daemon_spec)
# add to daemon list so next name(s) will also be unique
sd = orchestrator.DaemonDescription(
import json
import logging
from abc import ABCMeta, abstractmethod
-from typing import TYPE_CHECKING, List, Callable, Any
+from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic
from mgr_module import MonCommandFailed
logger = logging.getLogger(__name__)
+ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
+
+
+class CephadmDaemonSpec(Generic[ServiceSpecs]):
+ # typing.NamedTuple + Generic is broken in py36
+ def __init__(self, host, daemon_id, spec: ServiceSpecs, network):
+ self.host = host
+ self.daemon_id = daemon_id
+ self.spec: ServiceSpecs = spec
+ self.network = network # mons
class CephadmService(metaclass=ABCMeta):
"""
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
+ def make_daemon_spec(self, host, daemon_id, netowrk, spec: ServiceSpecs) -> CephadmDaemonSpec:
+ return CephadmDaemonSpec(
+ host=host,
+ daemon_id=daemon_id,
+ spec=spec,
+ network=netowrk
+ )
+
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ raise NotImplementedError()
+
def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
"""The post actions needed to be done after daemons are checked"""
if self.mgr.config_dashboard:
class MonService(CephadmService):
TYPE = 'mon'
- def create(self, name, host, network):
+ def create(self, daemon_spec: CephadmDaemonSpec):
"""
Create a new monitor on the given host.
"""
+ name, host, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+
# get mon. key
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get',
class MgrService(CephadmService):
TYPE = 'mgr'
- def create(self, mgr_id, host):
+ def create(self, daemon_spec: CephadmDaemonSpec):
"""
Create a new manager instance on a host.
"""
+ mgr_id, host = daemon_spec.daemon_id, daemon_spec.host
# get mgr. key
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'value': spec.service_id,
})
- def create(self, mds_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ mds_id, host = daemon_spec.daemon_id, daemon_spec.host
+
# get mgr. key
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
- def create(self, rgw_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ rgw_id, host = daemon_spec.daemon_id, daemon_spec.host
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
class RbdMirrorService(CephadmService):
TYPE = 'rbd-mirror'
- def create(self, daemon_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': 'client.rbd-mirror.' + daemon_id,
class CrashService(CephadmService):
TYPE = 'crash'
- def create(self, daemon_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': 'client.crash.' + host,
from ceph.deployment.service_spec import IscsiServiceSpec
from orchestrator import DaemonDescription
-from .cephadmservice import CephadmService
+from .cephadmservice import CephadmService, CephadmDaemonSpec
from .. import utils
logger = logging.getLogger(__name__)
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
- def create(self, igw_id, host, spec) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec[IscsiServiceSpec]) -> str:
+ spec = daemon_spec.spec
+ igw_id = daemon_spec.daemon_id
+ host = daemon_spec.host
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': utils.name_to_auth_entity('iscsi', igw_id),
from typing import List, Any, Tuple, Dict
from orchestrator import DaemonDescription
-from cephadm.services.cephadmservice import CephadmService
+from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec
from mgr_util import verify_tls, ServerConfigException, create_self_signed_cert
logger = logging.getLogger(__name__)
TYPE = 'grafana'
DEFAULT_SERVICE_PORT = 3000
- def create(self, daemon_id, host):
- # type: (str, str) -> str
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
return self.mgr._create_daemon('grafana', daemon_id, host)
def generate_config(self):
TYPE = 'alertmanager'
DEFAULT_SERVICE_PORT = 9093
- def create(self, daemon_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
return self.mgr._create_daemon('alertmanager', daemon_id, host)
def generate_config(self):
TYPE = 'prometheus'
DEFAULT_SERVICE_PORT = 9095
- def create(self, daemon_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
return self.mgr._create_daemon('prometheus', daemon_id, host)
def generate_config(self):
class NodeExporterService(CephadmService):
TYPE = 'node-exporter'
- def create(self, daemon_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
return self.mgr._create_daemon('node-exporter', daemon_id, host)
def generate_config(self) -> Tuple[Dict[str, Any], List[str]]:
import cephadm
from .. import utils
-from .cephadmservice import CephadmService
+from .cephadmservice import CephadmService, CephadmDaemonSpec
+
logger = logging.getLogger(__name__)
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
- def create(self, daemon_id, host, spec):
+ def create(self, daemon_spec: CephadmDaemonSpec[NFSServiceSpec]):
logger.info('Create daemon %s on host %s with spec %s' % (
daemon_id, host, spec))
- return self.mgr._create_daemon('nfs', daemon_id, host)
+ return self.mgr._create_daemon('nfs', daemon_spec.daemon_id, daemon_spec.host)
def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
class OSDService(CephadmService):
TYPE = 'osd'
- def create(self, drive_group: DriveGroupSpec) -> str:
+ def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
logger.debug(f"Processing DriveGroup {drive_group}")
ret = []
osd_id_claims = self.find_destroyed_osds()