From d3d6bd6ca1f5eaec6cd1f8ff83e4c2aba51ac0ee Mon Sep 17 00:00:00 2001 From: Redouane Kachach Date: Tue, 14 Jan 2025 10:38:13 +0100 Subject: [PATCH] mgr/cephadm: using service registry pattern for cephadm services This change includes mainly the following enhancements: - Introduced a centralized `CephadmServiceRegistry` to manage service registration and initialization. - Added dynamic discovery of service modules in the same directory using `pkgutil` and `importlib`. - Implemented a decorator `@service_registry_decorator` for automatic registration of service classes. Fixes: https://tracker.ceph.com/issues/69021 Signed-off-by: Redouane Kachach --- src/pybind/mgr/cephadm/agent.py | 5 +- src/pybind/mgr/cephadm/module.py | 97 ++++---------- src/pybind/mgr/cephadm/serve.py | 23 ++-- .../mgr/cephadm/services/cephadmservice.py | 10 ++ src/pybind/mgr/cephadm/services/container.py | 2 + src/pybind/mgr/cephadm/services/ingress.py | 2 + src/pybind/mgr/cephadm/services/iscsi.py | 2 + src/pybind/mgr/cephadm/services/jaeger.py | 5 + .../mgr/cephadm/services/mgmt_gateway.py | 2 + src/pybind/mgr/cephadm/services/monitoring.py | 8 ++ src/pybind/mgr/cephadm/services/nfs.py | 2 + src/pybind/mgr/cephadm/services/node_proxy.py | 2 + src/pybind/mgr/cephadm/services/nvmeof.py | 2 + .../mgr/cephadm/services/oauth2_proxy.py | 2 + src/pybind/mgr/cephadm/services/osd.py | 2 + .../mgr/cephadm/services/service_registry.py | 79 ++++++++++++ src/pybind/mgr/cephadm/services/smb.py | 2 + src/pybind/mgr/cephadm/tests/test_services.py | 121 ++++++------------ src/pybind/mgr/cephadm/upgrade.py | 3 +- 19 files changed, 203 insertions(+), 168 deletions(-) create mode 100644 src/pybind/mgr/cephadm/services/service_registry.py diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py index d972e5bbde2c9..11b33427ec747 100644 --- a/src/pybind/mgr/cephadm/agent.py +++ b/src/pybind/mgr/cephadm/agent.py @@ -22,6 +22,7 @@ from cephadm.services.cephadmservice import CephadmDaemonDeploySpec from mgr_util import test_port_allocation, PortAlreadyInUse from mgr_util import verify_tls_files import tempfile +from cephadm.services.service_registry import service_registry from urllib.error import HTTPError, URLError from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping, IO @@ -983,8 +984,8 @@ class CephadmAgentHelpers: # we need to know the agent port to try to reconfig w/ http # otherwise there is no choice but a full ssh reconfig if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down: - daemon_spec = self.mgr.cephadm_services[daemon_type_to_service( - daemon_spec.daemon_type)].prepare_create(daemon_spec) + daemon_spec = service_registry.get_service(daemon_type_to_service( + daemon_spec.daemon_type)).prepare_create(daemon_spec) self.mgr.agent_helpers._request_agent_acks( hosts={daemon_spec.host}, increment=True, diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index afa715bb9125f..61c6282ddaac2 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -20,7 +20,7 @@ from cephadm.cert_mgr import CertMgr import string from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ - Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \ + Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, \ Awaitable, Iterator import datetime @@ -43,6 +43,7 @@ from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonDeploySpec from cephadm.http_server import CephadmHttpServer from cephadm.agent import CephadmAgentHelpers +from cephadm.services.service_registry import service_registry from mgr_module import ( @@ -65,20 +66,12 @@ from orchestrator._interface import daemon_type_to_service from . import utils from . import ssh from .migrations import Migrations -from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \ - RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \ - CephExporterService -from .services.ingress import IngressService +from .services.cephadmservice import MgrService, RgwService from .services.container import CustomContainerService from .services.iscsi import IscsiService -from .services.nvmeof import NvmeofService from .services.mgmt_gateway import MgmtGatewayService -from .services.oauth2_proxy import OAuth2ProxyService -from .services.nfs import NFSService from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError -from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ - NodeExporterService, SNMPGatewayService, LokiService, PromtailService -from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService +from .services.monitoring import AlertmanagerService, PrometheusService from .services.node_proxy import NodeProxy from .services.smb import SMBService from .schedule import HostAssignment @@ -615,49 +608,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.migration = Migrations(self) - _service_classes: Sequence[Type[CephadmService]] = [ - AlertmanagerService, - CephExporterService, - CephadmAgent, - CephfsMirrorService, - CrashService, - CustomContainerService, - ElasticSearchService, - GrafanaService, - IngressService, - IscsiService, - JaegerAgentService, - JaegerCollectorService, - JaegerQueryService, - LokiService, - MdsService, - MgrService, - MonService, - NFSService, - NodeExporterService, - NodeProxy, - NvmeofService, - OSDService, - PrometheusService, - PromtailService, - RbdMirrorService, - RgwService, - SMBService, - SNMPGatewayService, - MgmtGatewayService, - OAuth2ProxyService, - ] - - # https://github.com/python/mypy/issues/8993 - self.cephadm_services: Dict[str, CephadmService] = { - cls.TYPE: cls(self) for cls in _service_classes} # type: ignore + service_registry.init_services(self) - self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr']) - self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd']) - self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi']) - self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof']) - self.node_proxy_service: NodeProxy = cast(NodeProxy, self.cephadm_services['node-proxy']) - self.rgw_service: RgwService = cast(RgwService, self.cephadm_services['rgw']) + self.mgr_service: MgrService = cast(MgrService, service_registry.get_service('mgr')) + self.osd_service: OSDService = cast(OSDService, service_registry.get_service('osd')) + self.rgw_service: RgwService = cast(RgwService, service_registry.get_service('rgw')) + self.node_proxy_service: NodeProxy = cast(NodeProxy, service_registry.get_service('node-proxy')) + self.iscsi_service: IscsiService = cast(IscsiService, service_registry.get_service('iscsi')) # used for UT only self.scheduled_async_actions: List[Callable] = [] @@ -696,10 +653,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.run = False self.event.set() - def _get_cephadm_service(self, service_type: str) -> CephadmService: - assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES - return self.cephadm_services[service_type] - def get_fqdn(self, hostname: str) -> str: """Get a host's FQDN with its hostname. @@ -1933,9 +1886,9 @@ Then run the following: self.log.info(f"removing: {d.name()}") if d.daemon_type != 'osd': - self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d) - self.cephadm_services[daemon_type_to_service( - str(d.daemon_type))].post_remove(d, is_failed_deploy=False) + service_registry.get_service(daemon_type_to_service(str(d.daemon_type))).pre_remove(d) + service_registry.get_service(daemon_type_to_service( + str(d.daemon_type))).post_remove(d, is_failed_deploy=False) else: cmd_args = { 'prefix': 'osd purge-actual', @@ -2049,8 +2002,8 @@ Then run the following: error_notifications: List[str] = [] okay: bool = True for daemon_type, daemon_ids in daemon_map.items(): - r = self.cephadm_services[daemon_type_to_service( - daemon_type)].ok_to_stop(daemon_ids, force=force) + r = service_registry.get_service(daemon_type_to_service( + daemon_type)).ok_to_stop(daemon_ids, force=force) if r.retval: okay = False # collect error notifications so user can see every daemon causing host @@ -2487,8 +2440,8 @@ Then run the following: # deploy a new keyring file if daemon_spec.daemon_type != 'osd': - daemon_spec = self.cephadm_services[daemon_type_to_service( - daemon_spec.daemon_type)].prepare_create(daemon_spec) + daemon_spec = service_registry.get_service(daemon_type_to_service( + daemon_spec.daemon_type)).prepare_create(daemon_spec) with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True)) @@ -2539,8 +2492,8 @@ Then run the following: if action == 'redeploy' or action == 'reconfig': if daemon_spec.daemon_type != 'osd': - daemon_spec = self.cephadm_services[daemon_type_to_service( - daemon_spec.daemon_type)].prepare_create(daemon_spec) + daemon_spec = service_registry.get_service(daemon_type_to_service( + daemon_spec.daemon_type)).prepare_create(daemon_spec) else: # for OSDs, we still need to update config, just not carry out the full # prepare_create function @@ -2932,7 +2885,7 @@ Then run the following: daemon_type: str, daemon_id: str) -> List[str]: svc_type = daemon_type_to_service(daemon_type) - svc_cls = self.cephadm_services.get(svc_type, None) + svc_cls = service_registry.get_service(svc_type) deps = svc_cls.get_dependencies(self, spec, daemon_type) if svc_cls else [] return sorted(deps) @@ -2986,10 +2939,10 @@ Then run the following: forcename=name) if not did_config: - self.cephadm_services[service_type].config(spec) + service_registry.get_service(service_type).config(spec) did_config = True - daemon_spec = self.cephadm_services[service_type].make_daemon_spec( + daemon_spec = service_registry.get_service(service_type).make_daemon_spec( host, daemon_id, network, spec, # NOTE: this does not consider port conflicts! ports=spec.get_port_start()) @@ -3007,7 +2960,7 @@ Then run the following: @forall_hosts def create_func_map(*args: Any) -> str: - daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args) + daemon_spec = service_registry.get_service(daemon_type).prepare_create(*args) with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'): return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec)) @@ -3351,7 +3304,7 @@ Then run the following: 'service_type': spec.service_type, 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])} - svc = self.cephadm_services[spec.service_type] + svc = service_registry.get_service(spec.service_type) rank_map = None if svc.ranked(spec): rank_map = self.spec_store[spec.service_name()].rank_map @@ -3455,7 +3408,7 @@ Then run the following: draining_hosts=self.cache.get_draining_hosts(), networks=self.cache.networks, daemons=self.cache.get_daemons_by_service(spec.service_name()), - allow_colo=self.cephadm_services[spec.service_type].allow_colo(), + allow_colo=service_registry.get_service(spec.service_type).allow_colo(), ).validate() self.log.info('Saving service %s spec with placement %s' % ( diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 87b3a1df85155..f2bccb104e490 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -31,6 +31,7 @@ from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \ CephadmNoImage, CEPH_TYPES, ContainerInspectInfo, SpecialHostLabels from mgr_module import MonCommandFailed from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException +from cephadm.services.service_registry import service_registry from . import utils from . import exchange @@ -552,7 +553,7 @@ class CephadmServe: daemon_type_to_service(cast(str, dd.daemon_type)) for dd in managed } - _services = [self.mgr.cephadm_services[dt] for dt in svcs] + _services = [service_registry.get_service(dt) for dt in svcs] def _filter( service_type: str, daemon_id: str, name: str @@ -752,7 +753,7 @@ class CephadmServe: # return a solid indication return False - svc = self.mgr.cephadm_services[service_type] + svc = service_registry.get_service(service_type) daemons = self.mgr.cache.get_daemons_by_service(service_name) related_service_daemons = self.mgr.cache.get_related_service_daemons(spec) @@ -1112,7 +1113,7 @@ class CephadmServe: if dd.daemon_type in REQUIRES_POST_ACTIONS: daemons_post[dd.daemon_type].append(dd) - if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon( + if service_registry.get_service(daemon_type_to_service(dd.daemon_type)).get_active_daemon( self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id: dd.is_active = True else: @@ -1196,7 +1197,7 @@ class CephadmServe: self.mgr.requires_post_actions.remove(d.name()) run_post = True if run_post: - self.mgr._get_cephadm_service(daemon_type_to_service( + service_registry.get_service(daemon_type_to_service( daemon_type)).daemon_check_post(daemon_descs) def _purge_deleted_services(self) -> None: @@ -1212,7 +1213,7 @@ class CephadmServe: logger.info(f'Purge service {service_name}') - self.mgr.cephadm_services[spec.service_type].purge(service_name) + service_registry.get_service(spec.service_type).purge(service_name) self.mgr.spec_store.finally_rm(service_name) def convert_tags_to_repo_digest(self) -> None: @@ -1494,7 +1495,7 @@ class CephadmServe: # we have to clean up the daemon. E.g. keyrings. servict_type = daemon_type_to_service(daemon_spec.daemon_type) dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed') - self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True) + service_registry.get_service(servict_type).post_remove(dd, is_failed_deploy=True) raise def _setup_extra_deployment_args( @@ -1559,7 +1560,7 @@ class CephadmServe: with set_exception_subject('service', daemon.service_id(), overwrite=True): - self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon) + service_registry.get_service(daemon_type_to_service(daemon_type)).pre_remove(daemon) # NOTE: we are passing the 'force' flag here, which means # we can delete a mon instances data. dd = self.mgr.cache.get_daemon(daemon.daemon_name) @@ -1579,11 +1580,11 @@ class CephadmServe: if not no_post_remove: if daemon_type not in ['iscsi']: - self.mgr.cephadm_services[daemon_type_to_service( - daemon_type)].post_remove(daemon, is_failed_deploy=False) + service_registry.get_service(daemon_type_to_service( + daemon_type)).post_remove(daemon, is_failed_deploy=False) else: - self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service( - daemon_type)].post_remove(daemon, is_failed_deploy=False)) + self.mgr.scheduled_async_actions.append(lambda: service_registry.get_service(daemon_type_to_service( + daemon_type)).post_remove(daemon, is_failed_deploy=False)) self.mgr._kick_serve_loop() self.mgr.recently_altered_daemons[name] = datetime_now() diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 607e1bd9019e6..19ecc79b414b3 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -26,6 +26,7 @@ from mgr_util import build_url, merge_dicts from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus from orchestrator._interface import daemon_type_to_service from cephadm import utils +from .service_registry import register_cephadm_service if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator @@ -645,6 +646,7 @@ class CephService(CephadmService): }) +@register_cephadm_service class MonService(CephService): TYPE = 'mon' @@ -808,6 +810,7 @@ class MonService(CephService): logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}') +@register_cephadm_service class MgrService(CephService): TYPE = 'mgr' @@ -929,6 +932,7 @@ class MgrService(CephService): return HandleCommandResult(0, warn_message, '') +@register_cephadm_service class MdsService(CephService): TYPE = 'mds' @@ -985,6 +989,7 @@ class MdsService(CephService): }) +@register_cephadm_service class RgwService(CephService): TYPE = 'rgw' @@ -1252,6 +1257,7 @@ class RgwService(CephService): self.mgr.trigger_connect_dashboard_rgw() +@register_cephadm_service class RbdMirrorService(CephService): TYPE = 'rbd-mirror' @@ -1286,6 +1292,7 @@ class RbdMirrorService(CephService): return HandleCommandResult(0, warn_message, '') +@register_cephadm_service class CrashService(CephService): TYPE = 'crash' @@ -1304,6 +1311,7 @@ class CrashService(CephService): return daemon_spec +@register_cephadm_service class CephExporterService(CephService): TYPE = 'ceph-exporter' DEFAULT_SERVICE_PORT = 9926 @@ -1356,6 +1364,7 @@ class CephExporterService(CephService): return self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip) +@register_cephadm_service class CephfsMirrorService(CephService): TYPE = 'cephfs-mirror' @@ -1388,6 +1397,7 @@ class CephfsMirrorService(CephService): return daemon_spec +@register_cephadm_service class CephadmAgent(CephService): TYPE = 'agent' diff --git a/src/pybind/mgr/cephadm/services/container.py b/src/pybind/mgr/cephadm/services/container.py index b9cdfad5e760e..9451952415e39 100644 --- a/src/pybind/mgr/cephadm/services/container.py +++ b/src/pybind/mgr/cephadm/services/container.py @@ -2,12 +2,14 @@ import logging from typing import List, Any, Tuple, Dict, cast from ceph.deployment.service_spec import CustomContainerSpec +from .service_registry import register_cephadm_service from .cephadmservice import CephadmService, CephadmDaemonDeploySpec logger = logging.getLogger(__name__) +@register_cephadm_service class CustomContainerService(CephadmService): TYPE = 'container' diff --git a/src/pybind/mgr/cephadm/services/ingress.py b/src/pybind/mgr/cephadm/services/ingress.py index 442458f2711d3..60fc586da85c6 100644 --- a/src/pybind/mgr/cephadm/services/ingress.py +++ b/src/pybind/mgr/cephadm/services/ingress.py @@ -9,6 +9,7 @@ from mgr_util import build_url from cephadm import utils from orchestrator import OrchestratorError, DaemonDescription from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService +from .service_registry import register_cephadm_service if TYPE_CHECKING: from ..module import CephadmOrchestrator @@ -16,6 +17,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +@register_cephadm_service class IngressService(CephService): TYPE = 'ingress' MAX_KEEPALIVED_PASS_LEN = 8 diff --git a/src/pybind/mgr/cephadm/services/iscsi.py b/src/pybind/mgr/cephadm/services/iscsi.py index 963a845ec74da..e3e924493c406 100644 --- a/src/pybind/mgr/cephadm/services/iscsi.py +++ b/src/pybind/mgr/cephadm/services/iscsi.py @@ -10,6 +10,7 @@ from ceph.deployment.service_spec import IscsiServiceSpec, ServiceSpec from orchestrator import DaemonDescription, DaemonDescriptionStatus from .cephadmservice import CephadmDaemonDeploySpec, CephService +from .service_registry import register_cephadm_service from .. import utils if TYPE_CHECKING: @@ -29,6 +30,7 @@ def get_trusted_ips(mgr: "CephadmOrchestrator", spec: IscsiServiceSpec) -> str: return trusted_ip_list +@register_cephadm_service class IscsiService(CephService): TYPE = 'iscsi' diff --git a/src/pybind/mgr/cephadm/services/jaeger.py b/src/pybind/mgr/cephadm/services/jaeger.py index 6c415512eefae..cde78769aa7cd 100644 --- a/src/pybind/mgr/cephadm/services/jaeger.py +++ b/src/pybind/mgr/cephadm/services/jaeger.py @@ -1,12 +1,14 @@ from typing import List, cast, Optional, TYPE_CHECKING from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec from ceph.deployment.service_spec import TracingSpec, ServiceSpec +from .service_registry import register_cephadm_service from mgr_util import build_url if TYPE_CHECKING: from ..module import CephadmOrchestrator +@register_cephadm_service class ElasticSearchService(CephadmService): TYPE = 'elasticsearch' DEFAULT_SERVICE_PORT = 9200 @@ -16,6 +18,7 @@ class ElasticSearchService(CephadmService): return daemon_spec +@register_cephadm_service class JaegerAgentService(CephadmService): TYPE = 'jaeger-agent' DEFAULT_SERVICE_PORT = 6799 @@ -47,6 +50,7 @@ class JaegerAgentService(CephadmService): return daemon_spec +@register_cephadm_service class JaegerCollectorService(CephadmService): TYPE = 'jaeger-collector' DEFAULT_SERVICE_PORT = 14250 @@ -58,6 +62,7 @@ class JaegerCollectorService(CephadmService): return daemon_spec +@register_cephadm_service class JaegerQueryService(CephadmService): TYPE = 'jaeger-query' DEFAULT_SERVICE_PORT = 16686 diff --git a/src/pybind/mgr/cephadm/services/mgmt_gateway.py b/src/pybind/mgr/cephadm/services/mgmt_gateway.py index 0706a9ad17752..fa47428e5dccc 100644 --- a/src/pybind/mgr/cephadm/services/mgmt_gateway.py +++ b/src/pybind/mgr/cephadm/services/mgmt_gateway.py @@ -4,6 +4,7 @@ from typing import List, Any, Tuple, Dict, cast, Optional, TYPE_CHECKING from orchestrator import DaemonDescription from ceph.deployment.service_spec import MgmtGatewaySpec, GrafanaSpec, ServiceSpec from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec, get_dashboard_endpoints +from .service_registry import register_cephadm_service if TYPE_CHECKING: from ..module import CephadmOrchestrator @@ -11,6 +12,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +@register_cephadm_service class MgmtGatewayService(CephadmService): TYPE = 'mgmt-gateway' SVC_TEMPLATE_PATH = 'services/mgmt-gateway/nginx.conf.j2' diff --git a/src/pybind/mgr/cephadm/services/monitoring.py b/src/pybind/mgr/cephadm/services/monitoring.py index bd0620f595f5d..ced3c5c2d42ba 100644 --- a/src/pybind/mgr/cephadm/services/monitoring.py +++ b/src/pybind/mgr/cephadm/services/monitoring.py @@ -6,6 +6,7 @@ from typing import List, Any, Tuple, Dict, Optional, cast, TYPE_CHECKING import ipaddress from mgr_module import HandleCommandResult +from .service_registry import register_cephadm_service from orchestrator import DaemonDescription from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, \ @@ -21,6 +22,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +@register_cephadm_service class GrafanaService(CephadmService): TYPE = 'grafana' DEFAULT_SERVICE_PORT = 3000 @@ -286,6 +288,7 @@ class GrafanaService(CephadmService): return HandleCommandResult(0, warn_message, '') +@register_cephadm_service class AlertmanagerService(CephadmService): TYPE = 'alertmanager' DEFAULT_SERVICE_PORT = 9093 @@ -452,6 +455,7 @@ class AlertmanagerService(CephadmService): return HandleCommandResult(0, warn_message, '') +@register_cephadm_service class PrometheusService(CephadmService): TYPE = 'prometheus' DEFAULT_SERVICE_PORT = 9095 @@ -732,6 +736,7 @@ class PrometheusService(CephadmService): return '/prometheus/federate' +@register_cephadm_service class NodeExporterService(CephadmService): TYPE = 'node-exporter' DEFAULT_SERVICE_PORT = 9100 @@ -789,6 +794,7 @@ class NodeExporterService(CephadmService): return HandleCommandResult(0, out, '') +@register_cephadm_service class LokiService(CephadmService): TYPE = 'loki' DEFAULT_SERVICE_PORT = 3100 @@ -810,6 +816,7 @@ class LokiService(CephadmService): }, sorted(deps) +@register_cephadm_service class PromtailService(CephadmService): TYPE = 'promtail' DEFAULT_SERVICE_PORT = 9080 @@ -847,6 +854,7 @@ class PromtailService(CephadmService): }, deps +@register_cephadm_service class SNMPGatewayService(CephadmService): TYPE = 'snmp-gateway' diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py index 89a977c4624df..8b6ea752d5484 100644 --- a/src/pybind/mgr/cephadm/services/nfs.py +++ b/src/pybind/mgr/cephadm/services/nfs.py @@ -12,6 +12,7 @@ from mgr_module import HandleCommandResult from mgr_module import NFS_POOL_NAME as POOL_NAME from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec +from .service_registry import register_cephadm_service from orchestrator import DaemonDescription @@ -20,6 +21,7 @@ from cephadm.services.cephadmservice import AuthEntity, CephadmDaemonDeploySpec, logger = logging.getLogger(__name__) +@register_cephadm_service class NFSService(CephService): TYPE = 'nfs' DEFAULT_EXPORTER_PORT = 9587 diff --git a/src/pybind/mgr/cephadm/services/node_proxy.py b/src/pybind/mgr/cephadm/services/node_proxy.py index 8ad230b6342b2..3547b14bdb4ea 100644 --- a/src/pybind/mgr/cephadm/services/node_proxy.py +++ b/src/pybind/mgr/cephadm/services/node_proxy.py @@ -5,6 +5,7 @@ import base64 from urllib.error import HTTPError, URLError from typing import List, Any, Dict, Tuple, Optional, MutableMapping, TYPE_CHECKING +from .service_registry import register_cephadm_service from .cephadmservice import CephadmDaemonDeploySpec, CephService from ceph.deployment.service_spec import ServiceSpec, PlacementSpec from ceph.utils import http_req @@ -14,6 +15,7 @@ if TYPE_CHECKING: from ..module import CephadmOrchestrator +@register_cephadm_service class NodeProxy(CephService): TYPE = 'node-proxy' diff --git a/src/pybind/mgr/cephadm/services/nvmeof.py b/src/pybind/mgr/cephadm/services/nvmeof.py index 8acec94f3829c..b522a76203263 100644 --- a/src/pybind/mgr/cephadm/services/nvmeof.py +++ b/src/pybind/mgr/cephadm/services/nvmeof.py @@ -9,11 +9,13 @@ from ceph.deployment.service_spec import NvmeofServiceSpec from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus from .cephadmservice import CephadmDaemonDeploySpec, CephService +from .service_registry import register_cephadm_service from .. import utils logger = logging.getLogger(__name__) +@register_cephadm_service class NvmeofService(CephService): TYPE = 'nvmeof' PROMETHEUS_PORT = 10008 diff --git a/src/pybind/mgr/cephadm/services/oauth2_proxy.py b/src/pybind/mgr/cephadm/services/oauth2_proxy.py index cabb21bce139e..078de78e5d3c6 100644 --- a/src/pybind/mgr/cephadm/services/oauth2_proxy.py +++ b/src/pybind/mgr/cephadm/services/oauth2_proxy.py @@ -6,10 +6,12 @@ import base64 from orchestrator import DaemonDescription from ceph.deployment.service_spec import OAuth2ProxySpec from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec +from .service_registry import register_cephadm_service logger = logging.getLogger(__name__) +@register_cephadm_service class OAuth2ProxyService(CephadmService): TYPE = 'oauth2-proxy' SVC_TEMPLATE_PATH = 'services/oauth2-proxy/oauth2-proxy.conf.j2' diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 80bf92772c49b..ff93f857bd344 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -19,6 +19,7 @@ from orchestrator import OrchestratorError, DaemonDescription from mgr_module import MonCommandFailed from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService +from .service_registry import register_cephadm_service if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator @@ -26,6 +27,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +@register_cephadm_service class OSDService(CephService): TYPE = 'osd' diff --git a/src/pybind/mgr/cephadm/services/service_registry.py b/src/pybind/mgr/cephadm/services/service_registry.py new file mode 100644 index 0000000000000..1efb5e49b3d7d --- /dev/null +++ b/src/pybind/mgr/cephadm/services/service_registry.py @@ -0,0 +1,79 @@ +""" +Cephadm Service Registry + +This module provides a centralized service registry for managing and initializing Cephadm services. +It dynamically discovers, imports, and registers service classes in the services directory, ensuring +modularity and scalability. The `@register_cephadm_service` decorator relies on an automatic discovery +mechanism based on `pkgutil` and `importlib` which dynamically import modules, triggering the decorator +for service registration and eliminating the need for manual imports. + +Key Features: +- Automatically discovers and imports all service modules during initialization. +- Registers service classes using the `@register_cephadm_service`. +- Manages the lifecycle of service instances, initialized via `init_services`. +- Provides a singleton `service_registry` for global access. + +Usage: +1. Define a service class by deriving from the CephadmService base class. +2. Place @register_cephadm_service decorator above your class to register it automatically. +3. Call `service_registry.init_services(mgr)` to initialize all registered services. +4. Access services using `service_registry.get_service(service_type)`. +""" + +import os +import logging +from typing import Type, Dict, TYPE_CHECKING +import importlib +import pkgutil + +if TYPE_CHECKING: + from cephadm.module import CephadmOrchestrator + from .cephadmservice import CephadmService + +logger = logging.getLogger(__name__) + + +class CephadmServiceRegistry: + """Centralized registry for Cephadm services.""" + + def __init__(self) -> None: + self._services: Dict[str, "CephadmService"] = {} # Initialized service instances + self._registered_services: Dict[str, Type["CephadmService"]] = {} # Mapping of service types to classes + + def register_service(self, service_type: str, service_cls: Type["CephadmService"]) -> None: + """Registers a service class to the registry.""" + if service_type in self._registered_services: + logger.warning(f"Service type '{service_type}' is already registered.") + logger.debug(f"Registering service: {service_type}") + self._registered_services[service_type] = service_cls + + def init_services(self, mgr: "CephadmOrchestrator") -> None: + """Initializes and stores service instances using the provided orchestrator manager.""" + self._discover_services() + for service_type, service_cls in self._registered_services.items(): + logger.debug(f"Initializing service: {service_type}") + self._services[service_type] = service_cls(mgr) + + def _discover_services(self) -> None: + """Dynamically discovers and imports all modules in the current directory.""" + current_package = __name__.rsplit(".", 1)[0] # Get the package name + package_path = os.path.dirname(__file__) # Directory of this file + for _, module_name, _ in pkgutil.iter_modules([package_path]): + logger.debug(f"Importing service module: {current_package}.{module_name}") + importlib.import_module(f"{current_package}.{module_name}") + + def get_service(self, service_type: str) -> "CephadmService": + """Retrieves an initialized service instance by type.""" + return self._services[service_type] + + +def register_cephadm_service(cls: Type["CephadmService"]) -> Type["CephadmService"]: + """ + Decorator to register a service class with the global service registry. + """ + service_registry.register_service(str(cls.TYPE), cls) + return cls + + +# Singleton instance of the registry +service_registry = CephadmServiceRegistry() diff --git a/src/pybind/mgr/cephadm/services/smb.py b/src/pybind/mgr/cephadm/services/smb.py index e322acb0e3e73..4382736042f06 100644 --- a/src/pybind/mgr/cephadm/services/smb.py +++ b/src/pybind/mgr/cephadm/services/smb.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Tuple, cast, Optional from mgr_module import HandleCommandResult from ceph.deployment.service_spec import ServiceSpec, SMBSpec +from .service_registry import register_cephadm_service from orchestrator import DaemonDescription from .cephadmservice import ( @@ -17,6 +18,7 @@ from .cephadmservice import ( logger = logging.getLogger(__name__) +@register_cephadm_service class SMBService(CephService): TYPE = 'smb' DEFAULT_EXPORTER_PORT = 9922 diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py index edb0f88718880..a321ab75d0db6 100644 --- a/src/pybind/mgr/cephadm/tests/test_services.py +++ b/src/pybind/mgr/cephadm/tests/test_services.py @@ -9,14 +9,11 @@ import pytest from unittest.mock import Mock, MagicMock, call, patch, ANY from cephadm.serve import CephadmServe -from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \ - RbdMirrorService, CrashService, CephadmDaemonDeploySpec +from cephadm.services.service_registry import service_registry +from cephadm.services.cephadmservice import MonService, CephadmDaemonDeploySpec from cephadm.services.iscsi import IscsiService -from cephadm.services.nfs import NFSService from cephadm.services.nvmeof import NvmeofService -from cephadm.services.osd import OSDService -from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ - NodeExporterService, LokiService, PromtailService +from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService from cephadm.services.smb import SMBSpec from cephadm.module import CephadmOrchestrator from ceph.deployment.service_spec import ( @@ -108,84 +105,46 @@ class TestCephadmService: service._set_value_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url) mgr.check_mon_command.assert_called_once_with({'prefix': 'get-cmd'}) - def _get_services(self, mgr): - # services: - osd_service = OSDService(mgr) - nfs_service = NFSService(mgr) - mon_service = MonService(mgr) - mgr_service = MgrService(mgr) - mds_service = MdsService(mgr) - rgw_service = RgwService(mgr) - rbd_mirror_service = RbdMirrorService(mgr) - grafana_service = GrafanaService(mgr) - alertmanager_service = AlertmanagerService(mgr) - prometheus_service = PrometheusService(mgr) - node_exporter_service = NodeExporterService(mgr) - loki_service = LokiService(mgr) - promtail_service = PromtailService(mgr) - crash_service = CrashService(mgr) - iscsi_service = IscsiService(mgr) - nvmeof_service = NvmeofService(mgr) - cephadm_services = { - 'mon': mon_service, - 'mgr': mgr_service, - 'osd': osd_service, - 'mds': mds_service, - 'rgw': rgw_service, - 'rbd-mirror': rbd_mirror_service, - 'nfs': nfs_service, - 'grafana': grafana_service, - 'alertmanager': alertmanager_service, - 'prometheus': prometheus_service, - 'node-exporter': node_exporter_service, - 'loki': loki_service, - 'promtail': promtail_service, - 'crash': crash_service, - 'iscsi': iscsi_service, - 'nvmeof': nvmeof_service, - } - return cephadm_services - def test_get_auth_entity(self): mgr = FakeMgr() - cephadm_services = self._get_services(mgr) + service_registry.init_services(mgr) for daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]: assert "client.%s.id1" % (daemon_type) == \ - cephadm_services[daemon_type].get_auth_entity("id1", "host") + service_registry.get_service(daemon_type).get_auth_entity("id1", "host") assert "client.%s.id1" % (daemon_type) == \ - cephadm_services[daemon_type].get_auth_entity("id1", "") + service_registry.get_service(daemon_type).get_auth_entity("id1", "") assert "client.%s.id1" % (daemon_type) == \ - cephadm_services[daemon_type].get_auth_entity("id1") + service_registry.get_service(daemon_type).get_auth_entity("id1") assert "client.crash.host" == \ - cephadm_services["crash"].get_auth_entity("id1", "host") + service_registry.get_service('crash').get_auth_entity("id1", "host") with pytest.raises(OrchestratorError): - cephadm_services["crash"].get_auth_entity("id1", "") - cephadm_services["crash"].get_auth_entity("id1") + service_registry.get_service('crash').get_auth_entity("id1", "") + service_registry.get_service('crash').get_auth_entity("id1") - assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "host") - assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "") - assert "mon." == cephadm_services["mon"].get_auth_entity("id1") + assert "mon." == service_registry.get_service('mon').get_auth_entity("id1", "host") + assert "mon." == service_registry.get_service('mon').get_auth_entity("id1", "") + assert "mon." == service_registry.get_service('mon').get_auth_entity("id1") - assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "host") - assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "") - assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1") + assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1", "host") + assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1", "") + assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1") for daemon_type in ["osd", "mds"]: assert "%s.id1" % daemon_type == \ - cephadm_services[daemon_type].get_auth_entity("id1", "host") + service_registry.get_service(daemon_type).get_auth_entity("id1", "host") assert "%s.id1" % daemon_type == \ - cephadm_services[daemon_type].get_auth_entity("id1", "") + service_registry.get_service(daemon_type).get_auth_entity("id1", "") assert "%s.id1" % daemon_type == \ - cephadm_services[daemon_type].get_auth_entity("id1") + service_registry.get_service(daemon_type).get_auth_entity("id1") # services based on CephadmService shouldn't have get_auth_entity with pytest.raises(AttributeError): for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter', 'loki', 'promtail']: - cephadm_services[daemon_type].get_auth_entity("id1", "host") - cephadm_services[daemon_type].get_auth_entity("id1", "") - cephadm_services[daemon_type].get_auth_entity("id1") + service_registry.get_service(daemon_type).get_auth_entity("id1", "host") + service_registry.get_service(daemon_type).get_auth_entity("id1", "") + service_registry.get_service(daemon_type).get_auth_entity("id1") class TestISCSIService: @@ -1905,7 +1864,7 @@ class TestMonitoring: with with_host(cephadm_module, 'test'): with with_service(cephadm_module, ServiceSpec('mgr')) as _, \ with_service(cephadm_module, GrafanaSpec(initial_admin_password='secure')): - out = cephadm_module.cephadm_services['grafana'].generate_config( + out = service_registry.get_service('grafana').generate_config( CephadmDaemonDeploySpec('test', 'daemon', 'grafana')) assert out == ( { @@ -1971,7 +1930,7 @@ class TestMonitoring: with with_host(cephadm_module, 'test'): with with_service(cephadm_module, ServiceSpec('mgr')) as _, \ with_service(cephadm_module, GrafanaSpec(anonymous_access=False, initial_admin_password='secure')): - out = cephadm_module.cephadm_services['grafana'].generate_config( + out = service_registry.get_service('grafana').generate_config( CephadmDaemonDeploySpec('test', 'daemon', 'grafana')) assert out == ( { @@ -2484,7 +2443,7 @@ class TestIngressService: ] _get_daemons_by_service.return_value = nfs_daemons - haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( + haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config( CephadmDaemonDeploySpec(host='host1', daemon_id='ingress', service_name=ispec.service_name())) assert haproxy_generated_conf[0] == haproxy_expected_conf @@ -2498,7 +2457,7 @@ class TestIngressService: ] _get_daemons_by_service.return_value = nfs_daemons - haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( + haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config( CephadmDaemonDeploySpec(host='host1', daemon_id='ingress', service_name=ispec.service_name())) assert haproxy_generated_conf[0] == haproxy_expected_conf @@ -2533,7 +2492,7 @@ class TestIngressService: virtual_ip="1.2.3.4/32") with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: # generate the keepalived conf based on the specified spec - keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config( + keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) keepalived_expected_conf = { @@ -2577,7 +2536,7 @@ class TestIngressService: assert keepalived_generated_conf[0] == keepalived_expected_conf # generate the haproxy conf based on the specified spec - haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( + haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) haproxy_expected_conf = { @@ -2659,7 +2618,7 @@ class TestIngressService: virtual_ip="1.2.3.4/32") with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: # generate the keepalived conf based on the specified spec - keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config( + keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) keepalived_expected_conf = { @@ -2703,7 +2662,7 @@ class TestIngressService: assert keepalived_generated_conf[0] == keepalived_expected_conf # generate the haproxy conf based on the specified spec - haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( + haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) haproxy_expected_conf = { @@ -2788,7 +2747,7 @@ class TestIngressService: with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: # generate the keepalived conf based on the specified spec # Test with only 1 IP on the list, as it will fail with more VIPS but only one host. - keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config( + keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) keepalived_expected_conf = { @@ -2832,7 +2791,7 @@ class TestIngressService: assert keepalived_generated_conf[0] == keepalived_expected_conf # generate the haproxy conf based on the specified spec - haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config( + haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) haproxy_expected_conf = { @@ -2925,7 +2884,7 @@ class TestIngressService: keepalived_password='12345', virtual_ips_list=["1.2.3.100/24", "100.100.100.100/24"]) with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: - keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config( + keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) keepalived_expected_conf = { @@ -3077,7 +3036,7 @@ class TestIngressService: virtual_ip=f"{ip}/24") with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: # generate the haproxy conf based on the specified spec - haproxy_daemon_spec = cephadm_module.cephadm_services['ingress'].prepare_create( + haproxy_daemon_spec = service_registry.get_service('ingress').prepare_create( CephadmDaemonDeploySpec( host='test', daemon_type='haproxy', @@ -3115,12 +3074,12 @@ class TestIngressService: virtual_ip='1.2.3.0/24', keepalive_only=True) with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _: - nfs_generated_conf, _ = cephadm_module.cephadm_services['nfs'].generate_config( + nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='foo.test.0.0', service_name=s.service_name())) ganesha_conf = nfs_generated_conf['files']['ganesha.conf'] assert "Bind_addr = 1.2.3.0/24" in ganesha_conf - keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config( + keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config( CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name())) keepalived_expected_conf = { @@ -3356,8 +3315,8 @@ class TestIngressService: ] _get_daemons_by_service.return_value = nfs_daemons - ingress_svc = cephadm_module.cephadm_services['ingress'] - nfs_svc = cephadm_module.cephadm_services['nfs'] + ingress_svc = service_registry.get_service('ingress') + nfs_svc = service_registry.get_service('nfs') # add host network info to one host to test the behavior of # adding all known-good addresses of the host to the list. @@ -3420,9 +3379,7 @@ class TestJaeger: def test_jaeger_query(self, _run_cephadm, cephadm_module: CephadmOrchestrator): _run_cephadm.side_effect = async_side_effect(('{}', '', 0)) - spec = TracingSpec(es_nodes="192.168.0.1:9200", - service_type="jaeger-query") - + spec = TracingSpec(es_nodes="192.168.0.1:9200", service_type="jaeger-query") config = {"elasticsearch_nodes": "http://192.168.0.1:9200"} with with_host(cephadm_module, 'test'): diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index ed3d26807e5ce..303d63eaa8247 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -3,6 +3,7 @@ import logging import time import uuid from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast +from cephadm.services.service_registry import service_registry import orchestrator from cephadm.registry import Registry @@ -535,7 +536,7 @@ class CephadmUpgrade: # setting force flag to retain old functionality. # note that known is an output argument for ok_to_stop() - r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([ + r = service_registry.get_service(daemon_type_to_service(s.daemon_type)).ok_to_stop([ s.daemon_id], known=known, force=True) if not r.retval: -- 2.39.5