]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: using service registry pattern for cephadm services 61135/head
authorRedouane Kachach <rkachach@ibm.com>
Tue, 14 Jan 2025 09:38:13 +0000 (10:38 +0100)
committerRedouane Kachach <rkachach@ibm.com>
Tue, 28 Jan 2025 12:06:45 +0000 (13:06 +0100)
This change includes mainly the following enhancements:
- Introduced a centralized `CephadmServiceRegistry` to manage service registration and initialization.
- Added dynamic discovery of service modules in the same directory using `pkgutil` and `importlib`.
- Implemented a decorator `@service_registry_decorator` for automatic registration of service classes.

Fixes: https://tracker.ceph.com/issues/69021
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
19 files changed:
src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/container.py
src/pybind/mgr/cephadm/services/ingress.py
src/pybind/mgr/cephadm/services/iscsi.py
src/pybind/mgr/cephadm/services/jaeger.py
src/pybind/mgr/cephadm/services/mgmt_gateway.py
src/pybind/mgr/cephadm/services/monitoring.py
src/pybind/mgr/cephadm/services/nfs.py
src/pybind/mgr/cephadm/services/node_proxy.py
src/pybind/mgr/cephadm/services/nvmeof.py
src/pybind/mgr/cephadm/services/oauth2_proxy.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/services/service_registry.py [new file with mode: 0644]
src/pybind/mgr/cephadm/services/smb.py
src/pybind/mgr/cephadm/tests/test_services.py
src/pybind/mgr/cephadm/upgrade.py

index d972e5bbde2c94bc118f9165fc1d75514612011a..11b33427ec7475fe14160866f9624245fcec3c64 100644 (file)
@@ -22,6 +22,7 @@ from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from mgr_util import test_port_allocation, PortAlreadyInUse
 from mgr_util import verify_tls_files
 import tempfile
+from cephadm.services.service_registry import service_registry
 
 from urllib.error import HTTPError, URLError
 from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping, IO
@@ -983,8 +984,8 @@ class CephadmAgentHelpers:
                 # we need to know the agent port to try to reconfig w/ http
                 # otherwise there is no choice but a full ssh reconfig
                 if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
-                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
-                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
+                    daemon_spec = service_registry.get_service(daemon_type_to_service(
+                        daemon_spec.daemon_type)).prepare_create(daemon_spec)
                     self.mgr.agent_helpers._request_agent_acks(
                         hosts={daemon_spec.host},
                         increment=True,
index afa715bb9125f86af9e897d300dad496ae722fde..61c6282ddaac2f0056b4dfbb8c8488b4f19d2363 100644 (file)
@@ -20,7 +20,7 @@ from cephadm.cert_mgr import CertMgr
 
 import string
 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
-    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
+    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, \
     Awaitable, Iterator
 
 import datetime
@@ -43,6 +43,7 @@ from cephadm.serve import CephadmServe
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from cephadm.http_server import CephadmHttpServer
 from cephadm.agent import CephadmAgentHelpers
+from cephadm.services.service_registry import service_registry
 
 
 from mgr_module import (
@@ -65,20 +66,12 @@ from orchestrator._interface import daemon_type_to_service
 from . import utils
 from . import ssh
 from .migrations import Migrations
-from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
-    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \
-    CephExporterService
-from .services.ingress import IngressService
+from .services.cephadmservice import MgrService, RgwService
 from .services.container import CustomContainerService
 from .services.iscsi import IscsiService
-from .services.nvmeof import NvmeofService
 from .services.mgmt_gateway import MgmtGatewayService
-from .services.oauth2_proxy import OAuth2ProxyService
-from .services.nfs import NFSService
 from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
-from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
-    NodeExporterService, SNMPGatewayService, LokiService, PromtailService
-from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
+from .services.monitoring import AlertmanagerService, PrometheusService
 from .services.node_proxy import NodeProxy
 from .services.smb import SMBService
 from .schedule import HostAssignment
@@ -615,49 +608,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.migration = Migrations(self)
 
-        _service_classes: Sequence[Type[CephadmService]] = [
-            AlertmanagerService,
-            CephExporterService,
-            CephadmAgent,
-            CephfsMirrorService,
-            CrashService,
-            CustomContainerService,
-            ElasticSearchService,
-            GrafanaService,
-            IngressService,
-            IscsiService,
-            JaegerAgentService,
-            JaegerCollectorService,
-            JaegerQueryService,
-            LokiService,
-            MdsService,
-            MgrService,
-            MonService,
-            NFSService,
-            NodeExporterService,
-            NodeProxy,
-            NvmeofService,
-            OSDService,
-            PrometheusService,
-            PromtailService,
-            RbdMirrorService,
-            RgwService,
-            SMBService,
-            SNMPGatewayService,
-            MgmtGatewayService,
-            OAuth2ProxyService,
-        ]
-
-        # https://github.com/python/mypy/issues/8993
-        self.cephadm_services: Dict[str, CephadmService] = {
-            cls.TYPE: cls(self) for cls in _service_classes}  # type: ignore
+        service_registry.init_services(self)
 
-        self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
-        self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
-        self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
-        self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
-        self.node_proxy_service: NodeProxy = cast(NodeProxy, self.cephadm_services['node-proxy'])
-        self.rgw_service: RgwService = cast(RgwService, self.cephadm_services['rgw'])
+        self.mgr_service: MgrService = cast(MgrService, service_registry.get_service('mgr'))
+        self.osd_service: OSDService = cast(OSDService, service_registry.get_service('osd'))
+        self.rgw_service: RgwService = cast(RgwService, service_registry.get_service('rgw'))
+        self.node_proxy_service: NodeProxy = cast(NodeProxy, service_registry.get_service('node-proxy'))
+        self.iscsi_service: IscsiService = cast(IscsiService, service_registry.get_service('iscsi'))  # used for UT only
 
         self.scheduled_async_actions: List[Callable] = []
 
@@ -696,10 +653,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.run = False
         self.event.set()
 
-    def _get_cephadm_service(self, service_type: str) -> CephadmService:
-        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
-        return self.cephadm_services[service_type]
-
     def get_fqdn(self, hostname: str) -> str:
         """Get a host's FQDN with its hostname.
 
@@ -1933,9 +1886,9 @@ Then run the following:
                 self.log.info(f"removing: {d.name()}")
 
                 if d.daemon_type != 'osd':
-                    self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
-                    self.cephadm_services[daemon_type_to_service(
-                        str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
+                    service_registry.get_service(daemon_type_to_service(str(d.daemon_type))).pre_remove(d)
+                    service_registry.get_service(daemon_type_to_service(
+                        str(d.daemon_type))).post_remove(d, is_failed_deploy=False)
                 else:
                     cmd_args = {
                         'prefix': 'osd purge-actual',
@@ -2049,8 +2002,8 @@ Then run the following:
         error_notifications: List[str] = []
         okay: bool = True
         for daemon_type, daemon_ids in daemon_map.items():
-            r = self.cephadm_services[daemon_type_to_service(
-                daemon_type)].ok_to_stop(daemon_ids, force=force)
+            r = service_registry.get_service(daemon_type_to_service(
+                daemon_type)).ok_to_stop(daemon_ids, force=force)
             if r.retval:
                 okay = False
                 # collect error notifications so user can see every daemon causing host
@@ -2487,8 +2440,8 @@ Then run the following:
 
         # deploy a new keyring file
         if daemon_spec.daemon_type != 'osd':
-            daemon_spec = self.cephadm_services[daemon_type_to_service(
-                daemon_spec.daemon_type)].prepare_create(daemon_spec)
+            daemon_spec = service_registry.get_service(daemon_type_to_service(
+                daemon_spec.daemon_type)).prepare_create(daemon_spec)
         with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
             self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
 
@@ -2539,8 +2492,8 @@ Then run the following:
 
         if action == 'redeploy' or action == 'reconfig':
             if daemon_spec.daemon_type != 'osd':
-                daemon_spec = self.cephadm_services[daemon_type_to_service(
-                    daemon_spec.daemon_type)].prepare_create(daemon_spec)
+                daemon_spec = service_registry.get_service(daemon_type_to_service(
+                    daemon_spec.daemon_type)).prepare_create(daemon_spec)
             else:
                 # for OSDs, we still need to update config, just not carry out the full
                 # prepare_create function
@@ -2932,7 +2885,7 @@ Then run the following:
                           daemon_type: str,
                           daemon_id: str) -> List[str]:
         svc_type = daemon_type_to_service(daemon_type)
-        svc_cls = self.cephadm_services.get(svc_type, None)
+        svc_cls = service_registry.get_service(svc_type)
         deps = svc_cls.get_dependencies(self, spec, daemon_type) if svc_cls else []
         return sorted(deps)
 
@@ -2986,10 +2939,10 @@ Then run the following:
                                              forcename=name)
 
             if not did_config:
-                self.cephadm_services[service_type].config(spec)
+                service_registry.get_service(service_type).config(spec)
                 did_config = True
 
-            daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
+            daemon_spec = service_registry.get_service(service_type).make_daemon_spec(
                 host, daemon_id, network, spec,
                 # NOTE: this does not consider port conflicts!
                 ports=spec.get_port_start())
@@ -3007,7 +2960,7 @@ Then run the following:
 
         @forall_hosts
         def create_func_map(*args: Any) -> str:
-            daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
+            daemon_spec = service_registry.get_service(daemon_type).prepare_create(*args)
             with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
                 return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
 
@@ -3351,7 +3304,7 @@ Then run the following:
                     'service_type': spec.service_type,
                     'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
 
-        svc = self.cephadm_services[spec.service_type]
+        svc = service_registry.get_service(spec.service_type)
         rank_map = None
         if svc.ranked(spec):
             rank_map = self.spec_store[spec.service_name()].rank_map
@@ -3455,7 +3408,7 @@ Then run the following:
             draining_hosts=self.cache.get_draining_hosts(),
             networks=self.cache.networks,
             daemons=self.cache.get_daemons_by_service(spec.service_name()),
-            allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
+            allow_colo=service_registry.get_service(spec.service_type).allow_colo(),
         ).validate()
 
         self.log.info('Saving service %s spec with placement %s' % (
index 87b3a1df85155b6551082354b31ac97dc1d0bbef..f2bccb104e490418ea672a36840a9eb21520b885 100644 (file)
@@ -31,6 +31,7 @@ from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
     CephadmNoImage, CEPH_TYPES, ContainerInspectInfo, SpecialHostLabels
 from mgr_module import MonCommandFailed
 from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException
+from cephadm.services.service_registry import service_registry
 
 from . import utils
 from . import exchange
@@ -552,7 +553,7 @@ class CephadmServe:
             daemon_type_to_service(cast(str, dd.daemon_type))
             for dd in managed
         }
-        _services = [self.mgr.cephadm_services[dt] for dt in svcs]
+        _services = [service_registry.get_service(dt) for dt in svcs]
 
         def _filter(
             service_type: str, daemon_id: str, name: str
@@ -752,7 +753,7 @@ class CephadmServe:
             # return a solid indication
             return False
 
-        svc = self.mgr.cephadm_services[service_type]
+        svc = service_registry.get_service(service_type)
         daemons = self.mgr.cache.get_daemons_by_service(service_name)
         related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
 
@@ -1112,7 +1113,7 @@ class CephadmServe:
             if dd.daemon_type in REQUIRES_POST_ACTIONS:
                 daemons_post[dd.daemon_type].append(dd)
 
-            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
+            if service_registry.get_service(daemon_type_to_service(dd.daemon_type)).get_active_daemon(
                self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                 dd.is_active = True
             else:
@@ -1196,7 +1197,7 @@ class CephadmServe:
                     self.mgr.requires_post_actions.remove(d.name())
                     run_post = True
             if run_post:
-                self.mgr._get_cephadm_service(daemon_type_to_service(
+                service_registry.get_service(daemon_type_to_service(
                     daemon_type)).daemon_check_post(daemon_descs)
 
     def _purge_deleted_services(self) -> None:
@@ -1212,7 +1213,7 @@ class CephadmServe:
 
             logger.info(f'Purge service {service_name}')
 
-            self.mgr.cephadm_services[spec.service_type].purge(service_name)
+            service_registry.get_service(spec.service_type).purge(service_name)
             self.mgr.spec_store.finally_rm(service_name)
 
     def convert_tags_to_repo_digest(self) -> None:
@@ -1494,7 +1495,7 @@ class CephadmServe:
                     # we have to clean up the daemon. E.g. keyrings.
                     servict_type = daemon_type_to_service(daemon_spec.daemon_type)
                     dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
-                    self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
+                    service_registry.get_service(servict_type).post_remove(dd, is_failed_deploy=True)
                 raise
 
     def _setup_extra_deployment_args(
@@ -1559,7 +1560,7 @@ class CephadmServe:
 
         with set_exception_subject('service', daemon.service_id(), overwrite=True):
 
-            self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
+            service_registry.get_service(daemon_type_to_service(daemon_type)).pre_remove(daemon)
             # NOTE: we are passing the 'force' flag here, which means
             # we can delete a mon instances data.
             dd = self.mgr.cache.get_daemon(daemon.daemon_name)
@@ -1579,11 +1580,11 @@ class CephadmServe:
 
             if not no_post_remove:
                 if daemon_type not in ['iscsi']:
-                    self.mgr.cephadm_services[daemon_type_to_service(
-                        daemon_type)].post_remove(daemon, is_failed_deploy=False)
+                    service_registry.get_service(daemon_type_to_service(
+                        daemon_type)).post_remove(daemon, is_failed_deploy=False)
                 else:
-                    self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service(
-                                                            daemon_type)].post_remove(daemon, is_failed_deploy=False))
+                    self.mgr.scheduled_async_actions.append(lambda: service_registry.get_service(daemon_type_to_service(
+                                                            daemon_type)).post_remove(daemon, is_failed_deploy=False))
                     self.mgr._kick_serve_loop()
 
             self.mgr.recently_altered_daemons[name] = datetime_now()
index 607e1bd9019e6d770b2ec728fce73d238eb9cc99..19ecc79b414b36114a2a3f2a93c846a8a4002bdc 100644 (file)
@@ -26,6 +26,7 @@ from mgr_util import build_url, merge_dicts
 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
+from .service_registry import register_cephadm_service
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -645,6 +646,7 @@ class CephService(CephadmService):
         })
 
 
+@register_cephadm_service
 class MonService(CephService):
     TYPE = 'mon'
 
@@ -808,6 +810,7 @@ class MonService(CephService):
                     logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')
 
 
+@register_cephadm_service
 class MgrService(CephService):
     TYPE = 'mgr'
 
@@ -929,6 +932,7 @@ class MgrService(CephService):
         return HandleCommandResult(0, warn_message, '')
 
 
+@register_cephadm_service
 class MdsService(CephService):
     TYPE = 'mds'
 
@@ -985,6 +989,7 @@ class MdsService(CephService):
         })
 
 
+@register_cephadm_service
 class RgwService(CephService):
     TYPE = 'rgw'
 
@@ -1252,6 +1257,7 @@ class RgwService(CephService):
         self.mgr.trigger_connect_dashboard_rgw()
 
 
+@register_cephadm_service
 class RbdMirrorService(CephService):
     TYPE = 'rbd-mirror'
 
@@ -1286,6 +1292,7 @@ class RbdMirrorService(CephService):
         return HandleCommandResult(0, warn_message, '')
 
 
+@register_cephadm_service
 class CrashService(CephService):
     TYPE = 'crash'
 
@@ -1304,6 +1311,7 @@ class CrashService(CephService):
         return daemon_spec
 
 
+@register_cephadm_service
 class CephExporterService(CephService):
     TYPE = 'ceph-exporter'
     DEFAULT_SERVICE_PORT = 9926
@@ -1356,6 +1364,7 @@ class CephExporterService(CephService):
         return self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip)
 
 
+@register_cephadm_service
 class CephfsMirrorService(CephService):
     TYPE = 'cephfs-mirror'
 
@@ -1388,6 +1397,7 @@ class CephfsMirrorService(CephService):
         return daemon_spec
 
 
+@register_cephadm_service
 class CephadmAgent(CephService):
     TYPE = 'agent'
 
index b9cdfad5e760e73f7e35d189ad778d900c80acb4..9451952415e398cb16c179740f9308cb31a1e803 100644 (file)
@@ -2,12 +2,14 @@ import logging
 from typing import List, Any, Tuple, Dict, cast
 
 from ceph.deployment.service_spec import CustomContainerSpec
+from .service_registry import register_cephadm_service
 
 from .cephadmservice import CephadmService, CephadmDaemonDeploySpec
 
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class CustomContainerService(CephadmService):
     TYPE = 'container'
 
index 442458f2711d35397d5a9692a5fb94651cec0c4a..60fc586da85c65582b3330e7f9c3f49492ee4805 100644 (file)
@@ -9,6 +9,7 @@ from mgr_util import build_url
 from cephadm import utils
 from orchestrator import OrchestratorError, DaemonDescription
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
 
 if TYPE_CHECKING:
     from ..module import CephadmOrchestrator
@@ -16,6 +17,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class IngressService(CephService):
     TYPE = 'ingress'
     MAX_KEEPALIVED_PASS_LEN = 8
index 963a845ec74da7fa5ceacde13f6a4d341b57327e..e3e924493c406f3b2bbee6a95d9a90602e3b3991 100644 (file)
@@ -10,6 +10,7 @@ from ceph.deployment.service_spec import IscsiServiceSpec, ServiceSpec
 
 from orchestrator import DaemonDescription, DaemonDescriptionStatus
 from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
 from .. import utils
 
 if TYPE_CHECKING:
@@ -29,6 +30,7 @@ def get_trusted_ips(mgr: "CephadmOrchestrator", spec: IscsiServiceSpec) -> str:
     return trusted_ip_list
 
 
+@register_cephadm_service
 class IscsiService(CephService):
     TYPE = 'iscsi'
 
index 6c415512eefae8254b2f59b812d0ebd0233ea8fb..cde78769aa7cd45d2555674f4be1abd40932e0c3 100644 (file)
@@ -1,12 +1,14 @@
 from typing import List, cast, Optional, TYPE_CHECKING
 from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
 from ceph.deployment.service_spec import TracingSpec, ServiceSpec
+from .service_registry import register_cephadm_service
 from mgr_util import build_url
 
 if TYPE_CHECKING:
     from ..module import CephadmOrchestrator
 
 
+@register_cephadm_service
 class ElasticSearchService(CephadmService):
     TYPE = 'elasticsearch'
     DEFAULT_SERVICE_PORT = 9200
@@ -16,6 +18,7 @@ class ElasticSearchService(CephadmService):
         return daemon_spec
 
 
+@register_cephadm_service
 class JaegerAgentService(CephadmService):
     TYPE = 'jaeger-agent'
     DEFAULT_SERVICE_PORT = 6799
@@ -47,6 +50,7 @@ class JaegerAgentService(CephadmService):
         return daemon_spec
 
 
+@register_cephadm_service
 class JaegerCollectorService(CephadmService):
     TYPE = 'jaeger-collector'
     DEFAULT_SERVICE_PORT = 14250
@@ -58,6 +62,7 @@ class JaegerCollectorService(CephadmService):
         return daemon_spec
 
 
+@register_cephadm_service
 class JaegerQueryService(CephadmService):
     TYPE = 'jaeger-query'
     DEFAULT_SERVICE_PORT = 16686
index 0706a9ad17752f168b21d73a90b7a09cb7e3530e..fa47428e5dcccb208337ba45a3b0ba972645f2d7 100644 (file)
@@ -4,6 +4,7 @@ from typing import List, Any, Tuple, Dict, cast, Optional, TYPE_CHECKING
 from orchestrator import DaemonDescription
 from ceph.deployment.service_spec import MgmtGatewaySpec, GrafanaSpec, ServiceSpec
 from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec, get_dashboard_endpoints
+from .service_registry import register_cephadm_service
 
 if TYPE_CHECKING:
     from ..module import CephadmOrchestrator
@@ -11,6 +12,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class MgmtGatewayService(CephadmService):
     TYPE = 'mgmt-gateway'
     SVC_TEMPLATE_PATH = 'services/mgmt-gateway/nginx.conf.j2'
index bd0620f595f5dc21a7874acc8adefe16e3a0ba38..ced3c5c2d42ba210c923b5043f2e6b46f737dac5 100644 (file)
@@ -6,6 +6,7 @@ from typing import List, Any, Tuple, Dict, Optional, cast, TYPE_CHECKING
 import ipaddress
 
 from mgr_module import HandleCommandResult
+from .service_registry import register_cephadm_service
 
 from orchestrator import DaemonDescription
 from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, \
@@ -21,6 +22,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class GrafanaService(CephadmService):
     TYPE = 'grafana'
     DEFAULT_SERVICE_PORT = 3000
@@ -286,6 +288,7 @@ class GrafanaService(CephadmService):
         return HandleCommandResult(0, warn_message, '')
 
 
+@register_cephadm_service
 class AlertmanagerService(CephadmService):
     TYPE = 'alertmanager'
     DEFAULT_SERVICE_PORT = 9093
@@ -452,6 +455,7 @@ class AlertmanagerService(CephadmService):
         return HandleCommandResult(0, warn_message, '')
 
 
+@register_cephadm_service
 class PrometheusService(CephadmService):
     TYPE = 'prometheus'
     DEFAULT_SERVICE_PORT = 9095
@@ -732,6 +736,7 @@ class PrometheusService(CephadmService):
         return '/prometheus/federate'
 
 
+@register_cephadm_service
 class NodeExporterService(CephadmService):
     TYPE = 'node-exporter'
     DEFAULT_SERVICE_PORT = 9100
@@ -789,6 +794,7 @@ class NodeExporterService(CephadmService):
         return HandleCommandResult(0, out, '')
 
 
+@register_cephadm_service
 class LokiService(CephadmService):
     TYPE = 'loki'
     DEFAULT_SERVICE_PORT = 3100
@@ -810,6 +816,7 @@ class LokiService(CephadmService):
         }, sorted(deps)
 
 
+@register_cephadm_service
 class PromtailService(CephadmService):
     TYPE = 'promtail'
     DEFAULT_SERVICE_PORT = 9080
@@ -847,6 +854,7 @@ class PromtailService(CephadmService):
         }, deps
 
 
+@register_cephadm_service
 class SNMPGatewayService(CephadmService):
     TYPE = 'snmp-gateway'
 
index 89a977c4624df3acf2a2d05c9af2f3a90a0474b6..8b6ea752d5484e0793b2f9cdded63108b37687ba 100644 (file)
@@ -12,6 +12,7 @@ from mgr_module import HandleCommandResult
 from mgr_module import NFS_POOL_NAME as POOL_NAME
 
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec
+from .service_registry import register_cephadm_service
 
 from orchestrator import DaemonDescription
 
@@ -20,6 +21,7 @@ from cephadm.services.cephadmservice import AuthEntity, CephadmDaemonDeploySpec,
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class NFSService(CephService):
     TYPE = 'nfs'
     DEFAULT_EXPORTER_PORT = 9587
index 8ad230b6342b27db33ebd2085cc942957e816a38..3547b14bdb4ead13e47f03493c50f75ab33e8e8f 100644 (file)
@@ -5,6 +5,7 @@ import base64
 from urllib.error import HTTPError, URLError
 from typing import List, Any, Dict, Tuple, Optional, MutableMapping, TYPE_CHECKING
 
+from .service_registry import register_cephadm_service
 from .cephadmservice import CephadmDaemonDeploySpec, CephService
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
 from ceph.utils import http_req
@@ -14,6 +15,7 @@ if TYPE_CHECKING:
     from ..module import CephadmOrchestrator
 
 
+@register_cephadm_service
 class NodeProxy(CephService):
     TYPE = 'node-proxy'
 
index 8acec94f3829c05b4b5e88f268f57f97e8d357ba..b522a762032636e0c477f3610896f57eebd84272 100644 (file)
@@ -9,11 +9,13 @@ from ceph.deployment.service_spec import NvmeofServiceSpec
 
 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
 from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
 from .. import utils
 
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class NvmeofService(CephService):
     TYPE = 'nvmeof'
     PROMETHEUS_PORT = 10008
index cabb21bce139e0f0f20ee21875d8938731a25f3a..078de78e5d3c63377a6a8def56f939cb9b9f8047 100644 (file)
@@ -6,10 +6,12 @@ import base64
 from orchestrator import DaemonDescription
 from ceph.deployment.service_spec import OAuth2ProxySpec
 from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
+from .service_registry import register_cephadm_service
 
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class OAuth2ProxyService(CephadmService):
     TYPE = 'oauth2-proxy'
     SVC_TEMPLATE_PATH = 'services/oauth2-proxy/oauth2-proxy.conf.j2'
index 80bf92772c49b0740747af1a31583cf365fd34fe..ff93f857bd3448887bf49f956eb9290867c86e5d 100644 (file)
@@ -19,6 +19,7 @@ from orchestrator import OrchestratorError, DaemonDescription
 from mgr_module import MonCommandFailed
 
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
+from .service_registry import register_cephadm_service
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -26,6 +27,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class OSDService(CephService):
     TYPE = 'osd'
 
diff --git a/src/pybind/mgr/cephadm/services/service_registry.py b/src/pybind/mgr/cephadm/services/service_registry.py
new file mode 100644 (file)
index 0000000..1efb5e4
--- /dev/null
@@ -0,0 +1,79 @@
+"""
+Cephadm Service Registry
+
+This module provides a centralized service registry for managing and initializing Cephadm services.
+It dynamically discovers, imports, and registers service classes in the services directory, ensuring
+modularity and scalability. The `@register_cephadm_service` decorator relies on an automatic discovery
+mechanism based on `pkgutil` and `importlib` which dynamically import modules, triggering the decorator
+for service registration and eliminating the need for manual imports.
+
+Key Features:
+- Automatically discovers and imports all service modules during initialization.
+- Registers service classes using the `@register_cephadm_service`.
+- Manages the lifecycle of service instances, initialized via `init_services`.
+- Provides a singleton `service_registry` for global access.
+
+Usage:
+1. Define a service class by deriving from the CephadmService base class.
+2. Place @register_cephadm_service decorator above your class to register it automatically.
+3. Call `service_registry.init_services(mgr)` to initialize all registered services.
+4. Access services using `service_registry.get_service(service_type)`.
+"""
+
+import os
+import logging
+from typing import Type, Dict, TYPE_CHECKING
+import importlib
+import pkgutil
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+    from .cephadmservice import CephadmService
+
+logger = logging.getLogger(__name__)
+
+
+class CephadmServiceRegistry:
+    """Centralized registry for Cephadm services."""
+
+    def __init__(self) -> None:
+        self._services: Dict[str, "CephadmService"] = {}  # Initialized service instances
+        self._registered_services: Dict[str, Type["CephadmService"]] = {}  # Mapping of service types to classes
+
+    def register_service(self, service_type: str, service_cls: Type["CephadmService"]) -> None:
+        """Registers a service class to the registry."""
+        if service_type in self._registered_services:
+            logger.warning(f"Service type '{service_type}' is already registered.")
+        logger.debug(f"Registering service: {service_type}")
+        self._registered_services[service_type] = service_cls
+
+    def init_services(self, mgr: "CephadmOrchestrator") -> None:
+        """Initializes and stores service instances using the provided orchestrator manager."""
+        self._discover_services()
+        for service_type, service_cls in self._registered_services.items():
+            logger.debug(f"Initializing service: {service_type}")
+            self._services[service_type] = service_cls(mgr)
+
+    def _discover_services(self) -> None:
+        """Dynamically discovers and imports all modules in the current directory."""
+        current_package = __name__.rsplit(".", 1)[0]  # Get the package name
+        package_path = os.path.dirname(__file__)  # Directory of this file
+        for _, module_name, _ in pkgutil.iter_modules([package_path]):
+            logger.debug(f"Importing service module: {current_package}.{module_name}")
+            importlib.import_module(f"{current_package}.{module_name}")
+
+    def get_service(self, service_type: str) -> "CephadmService":
+        """Retrieves an initialized service instance by type."""
+        return self._services[service_type]
+
+
+def register_cephadm_service(cls: Type["CephadmService"]) -> Type["CephadmService"]:
+    """
+    Decorator to register a service class with the global service registry.
+    """
+    service_registry.register_service(str(cls.TYPE), cls)
+    return cls
+
+
+# Singleton instance of the registry
+service_registry = CephadmServiceRegistry()
index e322acb0e3e7383c81effcd40e9982e73f238389..4382736042f066043dac3109b60d77e4276a5b52 100644 (file)
@@ -5,6 +5,7 @@ from typing import Any, Dict, List, Tuple, cast, Optional
 from mgr_module import HandleCommandResult
 
 from ceph.deployment.service_spec import ServiceSpec, SMBSpec
+from .service_registry import register_cephadm_service
 
 from orchestrator import DaemonDescription
 from .cephadmservice import (
@@ -17,6 +18,7 @@ from .cephadmservice import (
 logger = logging.getLogger(__name__)
 
 
+@register_cephadm_service
 class SMBService(CephService):
     TYPE = 'smb'
     DEFAULT_EXPORTER_PORT = 9922
index edb0f887188802d60053457f80773d545395daf6..a321ab75d0db6518d52ccd142af636e4819acf24 100644 (file)
@@ -9,14 +9,11 @@ import pytest
 from unittest.mock import Mock, MagicMock, call, patch, ANY
 
 from cephadm.serve import CephadmServe
-from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
-    RbdMirrorService, CrashService, CephadmDaemonDeploySpec
+from cephadm.services.service_registry import service_registry
+from cephadm.services.cephadmservice import MonService, CephadmDaemonDeploySpec
 from cephadm.services.iscsi import IscsiService
-from cephadm.services.nfs import NFSService
 from cephadm.services.nvmeof import NvmeofService
-from cephadm.services.osd import OSDService
-from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
-    NodeExporterService, LokiService, PromtailService
+from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService
 from cephadm.services.smb import SMBSpec
 from cephadm.module import CephadmOrchestrator
 from ceph.deployment.service_spec import (
@@ -108,84 +105,46 @@ class TestCephadmService:
         service._set_value_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
         mgr.check_mon_command.assert_called_once_with({'prefix': 'get-cmd'})
 
-    def _get_services(self, mgr):
-        # services:
-        osd_service = OSDService(mgr)
-        nfs_service = NFSService(mgr)
-        mon_service = MonService(mgr)
-        mgr_service = MgrService(mgr)
-        mds_service = MdsService(mgr)
-        rgw_service = RgwService(mgr)
-        rbd_mirror_service = RbdMirrorService(mgr)
-        grafana_service = GrafanaService(mgr)
-        alertmanager_service = AlertmanagerService(mgr)
-        prometheus_service = PrometheusService(mgr)
-        node_exporter_service = NodeExporterService(mgr)
-        loki_service = LokiService(mgr)
-        promtail_service = PromtailService(mgr)
-        crash_service = CrashService(mgr)
-        iscsi_service = IscsiService(mgr)
-        nvmeof_service = NvmeofService(mgr)
-        cephadm_services = {
-            'mon': mon_service,
-            'mgr': mgr_service,
-            'osd': osd_service,
-            'mds': mds_service,
-            'rgw': rgw_service,
-            'rbd-mirror': rbd_mirror_service,
-            'nfs': nfs_service,
-            'grafana': grafana_service,
-            'alertmanager': alertmanager_service,
-            'prometheus': prometheus_service,
-            'node-exporter': node_exporter_service,
-            'loki': loki_service,
-            'promtail': promtail_service,
-            'crash': crash_service,
-            'iscsi': iscsi_service,
-            'nvmeof': nvmeof_service,
-        }
-        return cephadm_services
-
     def test_get_auth_entity(self):
         mgr = FakeMgr()
-        cephadm_services = self._get_services(mgr)
+        service_registry.init_services(mgr)
 
         for daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
             assert "client.%s.id1" % (daemon_type) == \
-                cephadm_services[daemon_type].get_auth_entity("id1", "host")
+                service_registry.get_service(daemon_type).get_auth_entity("id1", "host")
             assert "client.%s.id1" % (daemon_type) == \
-                cephadm_services[daemon_type].get_auth_entity("id1", "")
+                service_registry.get_service(daemon_type).get_auth_entity("id1", "")
             assert "client.%s.id1" % (daemon_type) == \
-                cephadm_services[daemon_type].get_auth_entity("id1")
+                service_registry.get_service(daemon_type).get_auth_entity("id1")
 
         assert "client.crash.host" == \
-            cephadm_services["crash"].get_auth_entity("id1", "host")
+            service_registry.get_service('crash').get_auth_entity("id1", "host")
         with pytest.raises(OrchestratorError):
-            cephadm_services["crash"].get_auth_entity("id1", "")
-            cephadm_services["crash"].get_auth_entity("id1")
+            service_registry.get_service('crash').get_auth_entity("id1", "")
+            service_registry.get_service('crash').get_auth_entity("id1")
 
-        assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "host")
-        assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "")
-        assert "mon." == cephadm_services["mon"].get_auth_entity("id1")
+        assert "mon." == service_registry.get_service('mon').get_auth_entity("id1", "host")
+        assert "mon." == service_registry.get_service('mon').get_auth_entity("id1", "")
+        assert "mon." == service_registry.get_service('mon').get_auth_entity("id1")
 
-        assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "host")
-        assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "")
-        assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1")
+        assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1", "host")
+        assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1", "")
+        assert "mgr.id1" == service_registry.get_service('mgr').get_auth_entity("id1")
 
         for daemon_type in ["osd", "mds"]:
             assert "%s.id1" % daemon_type == \
-                cephadm_services[daemon_type].get_auth_entity("id1", "host")
+                service_registry.get_service(daemon_type).get_auth_entity("id1", "host")
             assert "%s.id1" % daemon_type == \
-                cephadm_services[daemon_type].get_auth_entity("id1", "")
+                service_registry.get_service(daemon_type).get_auth_entity("id1", "")
             assert "%s.id1" % daemon_type == \
-                cephadm_services[daemon_type].get_auth_entity("id1")
+                service_registry.get_service(daemon_type).get_auth_entity("id1")
 
         # services based on CephadmService shouldn't have get_auth_entity
         with pytest.raises(AttributeError):
             for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter', 'loki', 'promtail']:
-                cephadm_services[daemon_type].get_auth_entity("id1", "host")
-                cephadm_services[daemon_type].get_auth_entity("id1", "")
-                cephadm_services[daemon_type].get_auth_entity("id1")
+                service_registry.get_service(daemon_type).get_auth_entity("id1", "host")
+                service_registry.get_service(daemon_type).get_auth_entity("id1", "")
+                service_registry.get_service(daemon_type).get_auth_entity("id1")
 
 
 class TestISCSIService:
@@ -1905,7 +1864,7 @@ class TestMonitoring:
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, ServiceSpec('mgr')) as _, \
                     with_service(cephadm_module, GrafanaSpec(initial_admin_password='secure')):
-                out = cephadm_module.cephadm_services['grafana'].generate_config(
+                out = service_registry.get_service('grafana').generate_config(
                     CephadmDaemonDeploySpec('test', 'daemon', 'grafana'))
                 assert out == (
                     {
@@ -1971,7 +1930,7 @@ class TestMonitoring:
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, ServiceSpec('mgr')) as _, \
                     with_service(cephadm_module, GrafanaSpec(anonymous_access=False, initial_admin_password='secure')):
-                out = cephadm_module.cephadm_services['grafana'].generate_config(
+                out = service_registry.get_service('grafana').generate_config(
                     CephadmDaemonDeploySpec('test', 'daemon', 'grafana'))
                 assert out == (
                     {
@@ -2484,7 +2443,7 @@ class TestIngressService:
         ]
         _get_daemons_by_service.return_value = nfs_daemons
 
-        haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+        haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
             CephadmDaemonDeploySpec(host='host1', daemon_id='ingress', service_name=ispec.service_name()))
 
         assert haproxy_generated_conf[0] == haproxy_expected_conf
@@ -2498,7 +2457,7 @@ class TestIngressService:
         ]
         _get_daemons_by_service.return_value = nfs_daemons
 
-        haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+        haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
             CephadmDaemonDeploySpec(host='host1', daemon_id='ingress', service_name=ispec.service_name()))
 
         assert haproxy_generated_conf[0] == haproxy_expected_conf
@@ -2533,7 +2492,7 @@ class TestIngressService:
                                 virtual_ip="1.2.3.4/32")
             with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
                 # generate the keepalived conf based on the specified spec
-                keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+                keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                 keepalived_expected_conf = {
@@ -2577,7 +2536,7 @@ class TestIngressService:
                 assert keepalived_generated_conf[0] == keepalived_expected_conf
 
                 # generate the haproxy conf based on the specified spec
-                haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+                haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                 haproxy_expected_conf = {
@@ -2659,7 +2618,7 @@ class TestIngressService:
                                 virtual_ip="1.2.3.4/32")
             with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
                 # generate the keepalived conf based on the specified spec
-                keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+                keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                 keepalived_expected_conf = {
@@ -2703,7 +2662,7 @@ class TestIngressService:
                 assert keepalived_generated_conf[0] == keepalived_expected_conf
 
                 # generate the haproxy conf based on the specified spec
-                haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+                haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                 haproxy_expected_conf = {
@@ -2788,7 +2747,7 @@ class TestIngressService:
             with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
                 # generate the keepalived conf based on the specified spec
                 # Test with only 1 IP on the list, as it will fail with more VIPS but only one host.
-                keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+                keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                 keepalived_expected_conf = {
@@ -2832,7 +2791,7 @@ class TestIngressService:
                 assert keepalived_generated_conf[0] == keepalived_expected_conf
 
                 # generate the haproxy conf based on the specified spec
-                haproxy_generated_conf = cephadm_module.cephadm_services['ingress'].haproxy_generate_config(
+                haproxy_generated_conf = service_registry.get_service('ingress').haproxy_generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                 haproxy_expected_conf = {
@@ -2925,7 +2884,7 @@ class TestIngressService:
                                     keepalived_password='12345',
                                     virtual_ips_list=["1.2.3.100/24", "100.100.100.100/24"])
                 with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
-                    keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+                    keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
                         CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                     keepalived_expected_conf = {
@@ -3077,7 +3036,7 @@ class TestIngressService:
                                 virtual_ip=f"{ip}/24")
             with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
                 # generate the haproxy conf based on the specified spec
-                haproxy_daemon_spec = cephadm_module.cephadm_services['ingress'].prepare_create(
+                haproxy_daemon_spec = service_registry.get_service('ingress').prepare_create(
                     CephadmDaemonDeploySpec(
                         host='test',
                         daemon_type='haproxy',
@@ -3115,12 +3074,12 @@ class TestIngressService:
                                 virtual_ip='1.2.3.0/24',
                                 keepalive_only=True)
             with with_service(cephadm_module, s) as _, with_service(cephadm_module, ispec) as _:
-                nfs_generated_conf, _ = cephadm_module.cephadm_services['nfs'].generate_config(
+                nfs_generated_conf, _ = service_registry.get_service('nfs').generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='foo.test.0.0', service_name=s.service_name()))
                 ganesha_conf = nfs_generated_conf['files']['ganesha.conf']
                 assert "Bind_addr = 1.2.3.0/24" in ganesha_conf
 
-                keepalived_generated_conf = cephadm_module.cephadm_services['ingress'].keepalived_generate_config(
+                keepalived_generated_conf = service_registry.get_service('ingress').keepalived_generate_config(
                     CephadmDaemonDeploySpec(host='test', daemon_id='ingress', service_name=ispec.service_name()))
 
                 keepalived_expected_conf = {
@@ -3356,8 +3315,8 @@ class TestIngressService:
         ]
         _get_daemons_by_service.return_value = nfs_daemons
 
-        ingress_svc = cephadm_module.cephadm_services['ingress']
-        nfs_svc = cephadm_module.cephadm_services['nfs']
+        ingress_svc = service_registry.get_service('ingress')
+        nfs_svc = service_registry.get_service('nfs')
 
         # add host network info to one host to test the behavior of
         # adding all known-good addresses of the host to the list.
@@ -3420,9 +3379,7 @@ class TestJaeger:
     def test_jaeger_query(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
 
-        spec = TracingSpec(es_nodes="192.168.0.1:9200",
-                           service_type="jaeger-query")
-
+        spec = TracingSpec(es_nodes="192.168.0.1:9200", service_type="jaeger-query")
         config = {"elasticsearch_nodes": "http://192.168.0.1:9200"}
 
         with with_host(cephadm_module, 'test'):
index ed3d26807e5ceec3da92d165ea50b1beed5e9c10..303d63eaa8247acaaa6e6a9e925a208f61d7e052 100644 (file)
@@ -3,6 +3,7 @@ import logging
 import time
 import uuid
 from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast
+from cephadm.services.service_registry import service_registry
 
 import orchestrator
 from cephadm.registry import Registry
@@ -535,7 +536,7 @@ class CephadmUpgrade:
 
             # setting force flag to retain old functionality.
             # note that known is an output argument for ok_to_stop()
-            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
+            r = service_registry.get_service(daemon_type_to_service(s.daemon_type)).ok_to_stop([
                 s.daemon_id], known=known, force=True)
 
             if not r.retval: