from mgr_util import test_port_allocation, PortAlreadyInUse
from mgr_util import verify_tls_files
import tempfile
+from cephadm.services.service_registry import service_registry
from urllib.error import HTTPError, URLError
from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping, IO
# we need to know the agent port to try to reconfig w/ http
# otherwise there is no choice but a full ssh reconfig
if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
- daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
- daemon_spec.daemon_type)].prepare_create(daemon_spec)
+ daemon_spec = service_registry.get_service(daemon_type_to_service(
+ daemon_spec.daemon_type)).prepare_create(daemon_spec)
self.mgr.agent_helpers._request_agent_acks(
hosts={daemon_spec.host},
increment=True,
import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
- Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
+ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, \
Awaitable, Iterator
import datetime
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.http_server import CephadmHttpServer
from cephadm.agent import CephadmAgentHelpers
+from cephadm.services.service_registry import service_registry
from mgr_module import (
from . import utils
from . import ssh
from .migrations import Migrations
-from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
- RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \
- CephExporterService
-from .services.ingress import IngressService
+from .services.cephadmservice import MgrService, RgwService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
-from .services.nvmeof import NvmeofService
from .services.mgmt_gateway import MgmtGatewayService
-from .services.oauth2_proxy import OAuth2ProxyService
-from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
-from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
- NodeExporterService, SNMPGatewayService, LokiService, PromtailService
-from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
+from .services.monitoring import AlertmanagerService, PrometheusService
from .services.node_proxy import NodeProxy
from .services.smb import SMBService
from .schedule import HostAssignment
self.migration = Migrations(self)
- _service_classes: Sequence[Type[CephadmService]] = [
- AlertmanagerService,
- CephExporterService,
- CephadmAgent,
- CephfsMirrorService,
- CrashService,
- CustomContainerService,
- ElasticSearchService,
- GrafanaService,
- IngressService,
- IscsiService,
- JaegerAgentService,
- JaegerCollectorService,
- JaegerQueryService,
- LokiService,
- MdsService,
- MgrService,
- MonService,
- NFSService,
- NodeExporterService,
- NodeProxy,
- NvmeofService,
- OSDService,
- PrometheusService,
- PromtailService,
- RbdMirrorService,
- RgwService,
- SMBService,
- SNMPGatewayService,
- MgmtGatewayService,
- OAuth2ProxyService,
- ]
-
- # https://github.com/python/mypy/issues/8993
- self.cephadm_services: Dict[str, CephadmService] = {
- cls.TYPE: cls(self) for cls in _service_classes} # type: ignore
+ service_registry.init_services(self)
- self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
- self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
- self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
- self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
- self.node_proxy_service: NodeProxy = cast(NodeProxy, self.cephadm_services['node-proxy'])
- self.rgw_service: RgwService = cast(RgwService, self.cephadm_services['rgw'])
+ self.mgr_service: MgrService = cast(MgrService, service_registry.get_service('mgr'))
+ self.osd_service: OSDService = cast(OSDService, service_registry.get_service('osd'))
+ self.rgw_service: RgwService = cast(RgwService, service_registry.get_service('rgw'))
+ self.node_proxy_service: NodeProxy = cast(NodeProxy, service_registry.get_service('node-proxy'))
+ self.iscsi_service: IscsiService = cast(IscsiService, service_registry.get_service('iscsi')) # used for UT only
self.scheduled_async_actions: List[Callable] = []
self.run = False
self.event.set()
- def _get_cephadm_service(self, service_type: str) -> CephadmService:
- assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
- return self.cephadm_services[service_type]
-
def get_fqdn(self, hostname: str) -> str:
"""Get a host's FQDN with its hostname.
self.log.info(f"removing: {d.name()}")
if d.daemon_type != 'osd':
- self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
- self.cephadm_services[daemon_type_to_service(
- str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
+ service_registry.get_service(daemon_type_to_service(str(d.daemon_type))).pre_remove(d)
+ service_registry.get_service(daemon_type_to_service(
+ str(d.daemon_type))).post_remove(d, is_failed_deploy=False)
else:
cmd_args = {
'prefix': 'osd purge-actual',
error_notifications: List[str] = []
okay: bool = True
for daemon_type, daemon_ids in daemon_map.items():
- r = self.cephadm_services[daemon_type_to_service(
- daemon_type)].ok_to_stop(daemon_ids, force=force)
+ r = service_registry.get_service(daemon_type_to_service(
+ daemon_type)).ok_to_stop(daemon_ids, force=force)
if r.retval:
okay = False
# collect error notifications so user can see every daemon causing host
# deploy a new keyring file
if daemon_spec.daemon_type != 'osd':
- daemon_spec = self.cephadm_services[daemon_type_to_service(
- daemon_spec.daemon_type)].prepare_create(daemon_spec)
+ daemon_spec = service_registry.get_service(daemon_type_to_service(
+ daemon_spec.daemon_type)).prepare_create(daemon_spec)
with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
if action == 'redeploy' or action == 'reconfig':
if daemon_spec.daemon_type != 'osd':
- daemon_spec = self.cephadm_services[daemon_type_to_service(
- daemon_spec.daemon_type)].prepare_create(daemon_spec)
+ daemon_spec = service_registry.get_service(daemon_type_to_service(
+ daemon_spec.daemon_type)).prepare_create(daemon_spec)
else:
# for OSDs, we still need to update config, just not carry out the full
# prepare_create function
daemon_type: str,
daemon_id: str) -> List[str]:
svc_type = daemon_type_to_service(daemon_type)
- svc_cls = self.cephadm_services.get(svc_type, None)
+ svc_cls = service_registry.get_service(svc_type)
deps = svc_cls.get_dependencies(self, spec, daemon_type) if svc_cls else []
return sorted(deps)
forcename=name)
if not did_config:
- self.cephadm_services[service_type].config(spec)
+ service_registry.get_service(service_type).config(spec)
did_config = True
- daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
+ daemon_spec = service_registry.get_service(service_type).make_daemon_spec(
host, daemon_id, network, spec,
# NOTE: this does not consider port conflicts!
ports=spec.get_port_start())
@forall_hosts
def create_func_map(*args: Any) -> str:
- daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
+ daemon_spec = service_registry.get_service(daemon_type).prepare_create(*args)
with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
'service_type': spec.service_type,
'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
- svc = self.cephadm_services[spec.service_type]
+ svc = service_registry.get_service(spec.service_type)
rank_map = None
if svc.ranked(spec):
rank_map = self.spec_store[spec.service_name()].rank_map
draining_hosts=self.cache.get_draining_hosts(),
networks=self.cache.networks,
daemons=self.cache.get_daemons_by_service(spec.service_name()),
- allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
+ allow_colo=service_registry.get_service(spec.service_type).allow_colo(),
).validate()
self.log.info('Saving service %s spec with placement %s' % (
CephadmNoImage, CEPH_TYPES, ContainerInspectInfo, SpecialHostLabels
from mgr_module import MonCommandFailed
from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException
+from cephadm.services.service_registry import service_registry
from . import utils
from . import exchange
daemon_type_to_service(cast(str, dd.daemon_type))
for dd in managed
}
- _services = [self.mgr.cephadm_services[dt] for dt in svcs]
+ _services = [service_registry.get_service(dt) for dt in svcs]
def _filter(
service_type: str, daemon_id: str, name: str
# return a solid indication
return False
- svc = self.mgr.cephadm_services[service_type]
+ svc = service_registry.get_service(service_type)
daemons = self.mgr.cache.get_daemons_by_service(service_name)
related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
if dd.daemon_type in REQUIRES_POST_ACTIONS:
daemons_post[dd.daemon_type].append(dd)
- if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
+ if service_registry.get_service(daemon_type_to_service(dd.daemon_type)).get_active_daemon(
self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
dd.is_active = True
else:
self.mgr.requires_post_actions.remove(d.name())
run_post = True
if run_post:
- self.mgr._get_cephadm_service(daemon_type_to_service(
+ service_registry.get_service(daemon_type_to_service(
daemon_type)).daemon_check_post(daemon_descs)
def _purge_deleted_services(self) -> None:
logger.info(f'Purge service {service_name}')
- self.mgr.cephadm_services[spec.service_type].purge(service_name)
+ service_registry.get_service(spec.service_type).purge(service_name)
self.mgr.spec_store.finally_rm(service_name)
def convert_tags_to_repo_digest(self) -> None:
# we have to clean up the daemon. E.g. keyrings.
servict_type = daemon_type_to_service(daemon_spec.daemon_type)
dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
- self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
+ service_registry.get_service(servict_type).post_remove(dd, is_failed_deploy=True)
raise
def _setup_extra_deployment_args(
with set_exception_subject('service', daemon.service_id(), overwrite=True):
- self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
+ service_registry.get_service(daemon_type_to_service(daemon_type)).pre_remove(daemon)
# NOTE: we are passing the 'force' flag here, which means
# we can delete a mon instances data.
dd = self.mgr.cache.get_daemon(daemon.daemon_name)
if not no_post_remove:
if daemon_type not in ['iscsi']:
- self.mgr.cephadm_services[daemon_type_to_service(
- daemon_type)].post_remove(daemon, is_failed_deploy=False)
+ service_registry.get_service(daemon_type_to_service(
+ daemon_type)).post_remove(daemon, is_failed_deploy=False)
else:
- self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service(
- daemon_type)].post_remove(daemon, is_failed_deploy=False))
+ self.mgr.scheduled_async_actions.append(lambda: service_registry.get_service(daemon_type_to_service(
+ daemon_type)).post_remove(daemon, is_failed_deploy=False))
self.mgr._kick_serve_loop()
self.mgr.recently_altered_daemons[name] = datetime_now()
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils
+from .service_registry import register_cephadm_service
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
})
+@register_cephadm_service
class MonService(CephService):
TYPE = 'mon'
logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')
+@register_cephadm_service
class MgrService(CephService):
TYPE = 'mgr'
return HandleCommandResult(0, warn_message, '')
+@register_cephadm_service
class MdsService(CephService):
TYPE = 'mds'
})
+@register_cephadm_service
class RgwService(CephService):
TYPE = 'rgw'
self.mgr.trigger_connect_dashboard_rgw()
+@register_cephadm_service
class RbdMirrorService(CephService):
TYPE = 'rbd-mirror'
return HandleCommandResult(0, warn_message, '')
+@register_cephadm_service
class CrashService(CephService):
TYPE = 'crash'
return daemon_spec
+@register_cephadm_service
class CephExporterService(CephService):
TYPE = 'ceph-exporter'
DEFAULT_SERVICE_PORT = 9926
return self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip)
+@register_cephadm_service
class CephfsMirrorService(CephService):
TYPE = 'cephfs-mirror'
return daemon_spec
+@register_cephadm_service
class CephadmAgent(CephService):
TYPE = 'agent'
from typing import List, Any, Tuple, Dict, cast
from ceph.deployment.service_spec import CustomContainerSpec
+from .service_registry import register_cephadm_service
from .cephadmservice import CephadmService, CephadmDaemonDeploySpec
logger = logging.getLogger(__name__)
+@register_cephadm_service
class CustomContainerService(CephadmService):
TYPE = 'container'
from cephadm import utils
from orchestrator import OrchestratorError, DaemonDescription
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
if TYPE_CHECKING:
from ..module import CephadmOrchestrator
logger = logging.getLogger(__name__)
+@register_cephadm_service
class IngressService(CephService):
TYPE = 'ingress'
MAX_KEEPALIVED_PASS_LEN = 8
from orchestrator import DaemonDescription, DaemonDescriptionStatus
from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
from .. import utils
if TYPE_CHECKING:
return trusted_ip_list
+@register_cephadm_service
class IscsiService(CephService):
TYPE = 'iscsi'
from typing import List, cast, Optional, TYPE_CHECKING
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
from ceph.deployment.service_spec import TracingSpec, ServiceSpec
+from .service_registry import register_cephadm_service
from mgr_util import build_url
if TYPE_CHECKING:
from ..module import CephadmOrchestrator
+@register_cephadm_service
class ElasticSearchService(CephadmService):
TYPE = 'elasticsearch'
DEFAULT_SERVICE_PORT = 9200
return daemon_spec
+@register_cephadm_service
class JaegerAgentService(CephadmService):
TYPE = 'jaeger-agent'
DEFAULT_SERVICE_PORT = 6799
return daemon_spec
+@register_cephadm_service
class JaegerCollectorService(CephadmService):
TYPE = 'jaeger-collector'
DEFAULT_SERVICE_PORT = 14250
return daemon_spec
+@register_cephadm_service
class JaegerQueryService(CephadmService):
TYPE = 'jaeger-query'
DEFAULT_SERVICE_PORT = 16686
from orchestrator import DaemonDescription
from ceph.deployment.service_spec import MgmtGatewaySpec, GrafanaSpec, ServiceSpec
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec, get_dashboard_endpoints
+from .service_registry import register_cephadm_service
if TYPE_CHECKING:
from ..module import CephadmOrchestrator
logger = logging.getLogger(__name__)
+@register_cephadm_service
class MgmtGatewayService(CephadmService):
TYPE = 'mgmt-gateway'
SVC_TEMPLATE_PATH = 'services/mgmt-gateway/nginx.conf.j2'
import ipaddress
from mgr_module import HandleCommandResult
+from .service_registry import register_cephadm_service
from orchestrator import DaemonDescription
from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, \
logger = logging.getLogger(__name__)
+@register_cephadm_service
class GrafanaService(CephadmService):
TYPE = 'grafana'
DEFAULT_SERVICE_PORT = 3000
return HandleCommandResult(0, warn_message, '')
+@register_cephadm_service
class AlertmanagerService(CephadmService):
TYPE = 'alertmanager'
DEFAULT_SERVICE_PORT = 9093
return HandleCommandResult(0, warn_message, '')
+@register_cephadm_service
class PrometheusService(CephadmService):
TYPE = 'prometheus'
DEFAULT_SERVICE_PORT = 9095
return '/prometheus/federate'
+@register_cephadm_service
class NodeExporterService(CephadmService):
TYPE = 'node-exporter'
DEFAULT_SERVICE_PORT = 9100
return HandleCommandResult(0, out, '')
+@register_cephadm_service
class LokiService(CephadmService):
TYPE = 'loki'
DEFAULT_SERVICE_PORT = 3100
}, sorted(deps)
+@register_cephadm_service
class PromtailService(CephadmService):
TYPE = 'promtail'
DEFAULT_SERVICE_PORT = 9080
}, deps
+@register_cephadm_service
class SNMPGatewayService(CephadmService):
TYPE = 'snmp-gateway'
from mgr_module import NFS_POOL_NAME as POOL_NAME
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec
+from .service_registry import register_cephadm_service
from orchestrator import DaemonDescription
logger = logging.getLogger(__name__)
+@register_cephadm_service
class NFSService(CephService):
TYPE = 'nfs'
DEFAULT_EXPORTER_PORT = 9587
from urllib.error import HTTPError, URLError
from typing import List, Any, Dict, Tuple, Optional, MutableMapping, TYPE_CHECKING
+from .service_registry import register_cephadm_service
from .cephadmservice import CephadmDaemonDeploySpec, CephService
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from ceph.utils import http_req
from ..module import CephadmOrchestrator
+@register_cephadm_service
class NodeProxy(CephService):
TYPE = 'node-proxy'
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
from .. import utils
logger = logging.getLogger(__name__)
+@register_cephadm_service
class NvmeofService(CephService):
TYPE = 'nvmeof'
PROMETHEUS_PORT = 10008
from orchestrator import DaemonDescription
from ceph.deployment.service_spec import OAuth2ProxySpec
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
+from .service_registry import register_cephadm_service
logger = logging.getLogger(__name__)
+@register_cephadm_service
class OAuth2ProxyService(CephadmService):
TYPE = 'oauth2-proxy'
SVC_TEMPLATE_PATH = 'services/oauth2-proxy/oauth2-proxy.conf.j2'
from mgr_module import MonCommandFailed
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
logger = logging.getLogger(__name__)
+@register_cephadm_service
class OSDService(CephService):
TYPE = 'osd'
--- /dev/null
+"""
+Cephadm Service Registry
+
+This module provides a centralized service registry for managing and initializing Cephadm services.
+It dynamically discovers, imports, and registers service classes in the services directory, ensuring
+modularity and scalability. The `@register_cephadm_service` decorator relies on an automatic discovery
+mechanism based on `pkgutil` and `importlib` which dynamically import modules, triggering the decorator
+for service registration and eliminating the need for manual imports.
+
+Key Features:
+- Automatically discovers and imports all service modules during initialization.
+- Registers service classes using the `@register_cephadm_service`.
+- Manages the lifecycle of service instances, initialized via `init_services`.
+- Provides a singleton `service_registry` for global access.
+
+Usage:
+1. Define a service class by deriving from the CephadmService base class.
+2. Place @register_cephadm_service decorator above your class to register it automatically.
+3. Call `service_registry.init_services(mgr)` to initialize all registered services.
+4. Access services using `service_registry.get_service(service_type)`.
+"""
+
+import os
+import logging
+from typing import Type, Dict, TYPE_CHECKING
+import importlib
+import pkgutil
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+ from .cephadmservice import CephadmService
+
+logger = logging.getLogger(__name__)
+
+
+class CephadmServiceRegistry:
+ """Centralized registry for Cephadm services."""
+
+ def __init__(self) -> None:
+ self._services: Dict[str, "CephadmService"] = {} # Initialized service instances
+ self._registered_services: Dict[str, Type["CephadmService"]] = {} # Mapping of service types to classes
+
+ def register_service(self, service_type: str, service_cls: Type["CephadmService"]) -> None:
+ """Registers a service class to the registry."""
+ if service_type in self._registered_services:
+ logger.warning(f"Service type '{service_type}' is already registered.")
+ logger.debug(f"Registering service: {service_type}")
+ self._registered_services[service_type] = service_cls
+
+ def init_services(self, mgr: "CephadmOrchestrator") -> None:
+ """Initializes and stores service instances using the provided orchestrator manager."""
+ self._discover_services()
+ for service_type, service_cls in self._registered_services.items():
+ logger.debug(f"Initializing service: {service_type}")
+ self._services[service_type] = service_cls(mgr)
+
+ def _discover_services(self) -> None:
+ """Dynamically discovers and imports all modules in the current directory."""
+ current_package = __name__.rsplit(".", 1)[0] # Get the package name
+ package_path = os.path.dirname(__file__) # Directory of this file
+ for _, module_name, _ in pkgutil.iter_modules([package_path]):
+ logger.debug(f"Importing service module: {current_package}.{module_name}")
+ importlib.import_module(f"{current_package}.{module_name}")
+
+ def get_service(self, service_type: str) -> "CephadmService":
+ """Retrieves an initialized service instance by type."""
+ return self._services[service_type]
+
+
+def register_cephadm_service(cls: Type["CephadmService"]) -> Type["CephadmService"]:
+ """
+ Decorator to register a service class with the global service registry.
+ """
+ service_registry.register_service(str(cls.TYPE), cls)
+ return cls
+
+
+# Singleton instance of the registry
+service_registry = CephadmServiceRegistry()
from mgr_module import HandleCommandResult
from ceph.deployment.service_spec import ServiceSpec, SMBSpec
+from .service_registry import register_cephadm_service
from orchestrator import DaemonDescription
from .cephadmservice import (
logger = logging.getLogger(__name__)
+@register_cephadm_service
class SMBService(CephService):
TYPE = 'smb'
DEFAULT_EXPORTER_PORT = 9922
from unittest.mock import Mock, MagicMock, call, patch, ANY
from cephadm.serve import CephadmServe
-from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
- RbdMirrorService, CrashService, CephadmDaemonDeploySpec
+from cephadm.services.service_registry import service_registry
+from cephadm.services.cephadmservice import MonService, CephadmDaemonDeploySpec
from cephadm.services.iscsi import IscsiService
-from cephadm.services.nfs import NFSService
from cephadm.services.nvmeof import NvmeofService
-from cephadm.services.osd import OSDService
-from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
- NodeExporterService, LokiService, PromtailService
+from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService
from cephadm.services.smb import SMBSpec
from cephadm.module import CephadmOrchestrator
from ceph.deployment.service_spec import (
service._set_value_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
mgr.check_mon_command.assert_called_once_with({'prefix': 'get-cmd'})
- def _get_services(self, mgr):
- # services:
- osd_service = OSDService(mgr)
- nfs_service = NFSService(mgr)
- mon_service = MonService(mgr)
- mgr_service = MgrService(mgr)
- mds_service = MdsService(mgr)
- rgw_service = RgwService(mgr)
- rbd_mirror_service = RbdMirrorService(mgr)
- grafana_service = GrafanaService(mgr)
- alertmanager_service = AlertmanagerService(mgr)
- prometheus_service = PrometheusService(mgr)
- node_exporter_service = NodeExporterService(mgr)
- loki_service = LokiService(mgr)
- promtail_service = PromtailService(mgr)
- crash_service = CrashService(mgr)
- iscsi_service = IscsiService(mgr)
- nvmeof_service = NvmeofService(mgr)
- cephadm_services = {
- 'mon': mon_service,
- 'mgr': mgr_service,
- 'osd': osd_service,
- 'mds': mds_service,
- 'rgw': rgw_service,
- 'rbd-mirror': rbd_mirror_service,
- 'nfs': nfs_service,
- 'grafana': grafana_service,
- 'alertmanager': alertmanager_service,
- 'prometheus': prometheus_service,
- 'node-exporter': node_exporter_service,
- 'loki': loki_service,
- 'promtail': promtail_service,
- 'crash': crash_service,
- 'iscsi': iscsi_service,
- 'nvmeof': nvmeof_service,
- }
- return cephadm_services
-
def test_get_auth_entity(self):
mgr = FakeMgr()
- cephadm_services = self._get_services(mgr)
+ service_registry.init_services(mgr)
for daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
assert "client.%s.id1" % (daemon_type) == \
- cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ service_registry.get_service(daemon_type).get_auth_entity("id1", "host")
assert "client.%s.id1" % (daemon_type) == \
- cephadm_services[daemon_type].get_auth_entity("id1", "")
+ service_registry.get_service(daemon_type).get_auth_entity("id1", "")
assert "client.%s.id1" % (daemon_type) == \
- cephadm_services[daemon_type].get_auth_entity("id1")
+ service_registry.get_service(daemon_type).get_auth_entity("id1")
assert "client.crash.host" == \
- cephadm_services["crash"].get_auth_entity("id1", "host")
+ service_registry.get_service('crash').get_auth_entity("id1", "host")
with pytest.raises(OrchestratorError):
- cephadm_services["crash"].get_auth_entity("id1", "")
- cephadm_services["crash"].get_auth_entity("id1")
+ service_registry.get_service('crash').get_auth_entity("id1", "")
+ service_registry.get_service('crash').get_auth_entity("id1")
- assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "host")
- assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "")
- assert "mon." == cephadm_services["mon"].get_auth_entity("id1")
+ assert "mon." == service_registry.get_service('mon').get_auth_entity("id1", "host")
+ assert "mon." == service_registry.get_service('mon').get_auth_entity("id1", "")
+ assert "mon." == service_registry.get_service('mon').get_auth_entity("id1")
- assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "host")
- assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "")
- assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1")
+ assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1", "host")
+ assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1", "")
+ assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1")
for daemon_type in ["osd", "mds"]:
assert "%s.id1" % daemon_type == \
- cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ service_registry.get_service(daemon_type).get_auth_entity("id1", "host")
assert "%s.id1" % daemon_type == \
- cephadm_services[daemon_type].get_auth_entity("id1", "")
+ service_registry.get_service(daemon_type).get_auth_entity("id1", "")
assert "%s.id1" % daemon_type == \
- cephadm_services[daemon_type].get_auth_entity("id1")
+ service_registry.get_service(daemon_type).get_auth_entity("id1")
# services based on CephadmService shouldn't have get_auth_entity
with pytest.raises(AttributeError):
for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter', 'loki', 'promtail']:
- cephadm_services[daemon_type].get_auth_entity("id1", "host")
- cephadm_services[daemon_type].get_auth_entity("id1", "")
- cephadm_services[daemon_type].get_auth_entity("id1")
+ service_registry.get_service(daemon_type).get_auth_entity("id1", "host")
+ service_registry.get_service(daemon_type).get_auth_entity("id1", "")
+ service_registry.get_service(daemon_type).get_auth_entity("id1")
class TestISCSIService:
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, ServiceSpec('mgr')) as _, \
with_service(cephadm_module, GrafanaSpec(initial_admin_password='secure')):
- out = cephadm_module.cephadm_services['grafana'].generate_config(
+ out = service_registry.get_service('grafana').generate_config(
CephadmDaemonDeploySpec('test', 'daemon', 'grafana'))
assert out == (
{
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, ServiceSpec('mgr')) as _, \
with_service(cephadm_module, GrafanaSpec(anonymous_access=False, initial_admin_password='secure')):
- out = cephadm_module.cephadm_services['grafana'].generate_config(
+ out = service_registry.get_service('grafana').generate_config(
CephadmDaemonDeploySpec('test', 'daemon', 'grafana'))
assert out == (
{
]
_get_daemons_by_service.return_value = nfs_daemons
- haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+ haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
CephadmDaemonDeploySpec(host='host1', daemon_id='ingress', service_name=ispec.service_name()))
assert haproxy_generated_conf[0] == haproxy_expected_conf
]
_get_daemons_by_service.return_value = nfs_daemons
- haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+ haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
CephadmDaemonDeploySpec(host='host1', daemon_id='ingress', service_name=ispec.service_name()))
assert haproxy_generated_conf[0] == haproxy_expected_conf
virtual_ip="1.2.3.4/32")
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
# generate the keepalived conf based on the specified spec
- keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+ keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
keepalived_expected_conf = {
assert keepalived_generated_conf[0] == keepalived_expected_conf
# generate the haproxy conf based on the specified spec
- haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+ haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
haproxy_expected_conf = {
virtual_ip="1.2.3.4/32")
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
# generate the keepalived conf based on the specified spec
- keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+ keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
keepalived_expected_conf = {
assert keepalived_generated_conf[0] == keepalived_expected_conf
# generate the haproxy conf based on the specified spec
- haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+ haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
haproxy_expected_conf = {
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
# generate the keepalived conf based on the specified spec
# Test with only 1 IP on the list, as it will fail with more VIPS but only one host.
- keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+ keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
keepalived_expected_conf = {
assert keepalived_generated_conf[0] == keepalived_expected_conf
# generate the haproxy conf based on the specified spec
- haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+ haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
haproxy_expected_conf = {
keepalived_password='12345',
virtual_ips_list=["1.2.3.100/24", "100.100.100.100/24"])
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
- keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+ keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
keepalived_expected_conf = {
virtual_ip=f"{ip}/24")
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
# generate the haproxy conf based on the specified spec
- haproxy_daemon_spec = cephadm_module.cephadm_services['ingress'].prepare_create(
+ haproxy_daemon_spec = service_registry.get_service('ingress').prepare_create(
CephadmDaemonDeploySpec(
host='test',
daemon_type='haproxy',
virtual_ip='1.2.3.0/24',
keepalive_only=True)
with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
- nfs_generated_conf, _ = cephadm_module.cephadm_services['nfs'].generate_config(
+ nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='foo.test.0.0', service_name=s.service_name()))
ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
assert "Bind_addr = 1.2.3.0/24" in ganesha_conf
- keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+ keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
keepalived_expected_conf = {
]
_get_daemons_by_service.return_value = nfs_daemons
- ingress_svc = cephadm_module.cephadm_services['ingress']
- nfs_svc = cephadm_module.cephadm_services['nfs']
+ ingress_svc = service_registry.get_service('ingress')
+ nfs_svc = service_registry.get_service('nfs')
# add host network info to one host to test the behavior of
# adding all known-good addresses of the host to the list.
def test_jaeger_query(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
- spec = TracingSpec(es_nodes="192.168.0.1:9200",
- service_type="jaeger-query")
-
+ spec = TracingSpec(es_nodes="192.168.0.1:9200", service_type="jaeger-query")
config = {"elasticsearch_nodes": "http://192.168.0.1:9200"}
with with_host(cephadm_module, 'test'):
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast
+from cephadm.services.service_registry import service_registry
import orchestrator
from cephadm.registry import Registry
# setting force flag to retain old functionality.
# note that known is an output argument for ok_to_stop()
- r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
+ r = service_registry.get_service(daemon_type_to_service(s.daemon_type)).ok_to_stop([
s.daemon_id], known=known, force=True)
if not r.retval: