from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
- RbdMirrorService, CrashService, CephadmService
+ RbdMirrorService, CrashService, CephadmService, AuthEntity
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
# type: (str, str, str, Optional[str], Optional[str]) -> Dict[str, Any]
# keyring
if not keyring:
- ename = utils.name_to_auth_entity(daemon_type, daemon_id, host=host)
+ entity: AuthEntity = \
+ self.cephadm_services[daemon_type].get_auth_entity(daemon_id, host=host)
ret, keyring, err = self.check_mon_command({
'prefix': 'auth get',
- 'entity': ename,
+ 'entity': entity,
})
# generate config
import logging
import subprocess
from abc import ABCMeta, abstractmethod
-from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic, Optional, Dict, Any, Tuple
+from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic, \
+ Optional, Dict, Any, Tuple, NewType
from mgr_module import HandleCommandResult, MonCommandFailed
logger = logging.getLogger(__name__)
ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
+AuthEntity = NewType('AuthEntity', str)
class CephadmDaemonSpec(Generic[ServiceSpecs]):
"""
pass
+ def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
+ """
+ Map the daemon id to a cephx keyring entity name
+ """
+ if self.TYPE in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
+ return AuthEntity('client.' + self.TYPE + "." + daemon_id)
+ elif self.TYPE == 'crash':
+ if host == "":
+ raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+ return AuthEntity('client.' + self.TYPE + "." + host)
+ elif self.TYPE == 'mon':
+ return AuthEntity('mon.')
+ elif self.TYPE == 'mgr':
+ return AuthEntity(self.TYPE + "." + daemon_id)
+ elif self.TYPE in ['osd', 'mds', 'client']:
+ return AuthEntity(self.TYPE + "." + daemon_id)
+ else:
+ raise OrchestratorError("unknown daemon type")
+
class MonService(CephadmService):
TYPE = 'mon'
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
- 'entity': utils.name_to_auth_entity('iscsi', igw_id),
+ 'entity': self.get_auth_entity(igw_id),
'caps': ['mon', 'profile rbd, '
'allow command "osd blacklist", '
'allow command "config-key get" with "key" prefix "iscsi/"',
+import pytest
+
from unittest.mock import MagicMock
-from cephadm.services.monitoring import GrafanaService
+from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
+ RbdMirrorService, CrashService, CephadmService, AuthEntity
+from cephadm.services.iscsi import IscsiService
+from cephadm.services.nfs import NFSService
+from cephadm.services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
+from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
+ NodeExporterService
+
+from orchestrator import OrchestratorError
class FakeMgr:
mgr.check_mon_command.reset_mock()
service._set_service_url_on_dashboard('svc', 'get-cmd', 'set-cmd', service_url)
mgr.check_mon_command.assert_called_once_with({'prefix': 'get-cmd'})
+
+ def _get_services(self, mgr):
+ # services:
+ osd_service = OSDService(mgr)
+ nfs_service = NFSService(mgr)
+ mon_service = MonService(mgr)
+ mgr_service = MgrService(mgr)
+ mds_service = MdsService(mgr)
+ rgw_service = RgwService(mgr)
+ rbd_mirror_service = RbdMirrorService(mgr)
+ grafana_service = GrafanaService(mgr)
+ alertmanager_service = AlertmanagerService(mgr)
+ prometheus_service = PrometheusService(mgr)
+ node_exporter_service = NodeExporterService(mgr)
+ crash_service = CrashService(mgr)
+ iscsi_service = IscsiService(mgr)
+ cephadm_services = {
+ 'mon': mon_service,
+ 'mgr': mgr_service,
+ 'osd': osd_service,
+ 'mds': mds_service,
+ 'rgw': rgw_service,
+ 'rbd-mirror': rbd_mirror_service,
+ 'nfs': nfs_service,
+ 'grafana': grafana_service,
+ 'alertmanager': alertmanager_service,
+ 'prometheus': prometheus_service,
+ 'node-exporter': node_exporter_service,
+ 'crash': crash_service,
+ 'iscsi': iscsi_service,
+ }
+ return cephadm_services
+
+ def test_get_auth_entity(self):
+ mgr = FakeMgr()
+ cephadm_services = self._get_services(mgr)
+
+ for daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
+ assert "client.%s.id1" % (daemon_type) == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ assert "client.%s.id1" % (daemon_type) == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "")
+ assert "client.%s.id1" % (daemon_type) == \
+ cephadm_services[daemon_type].get_auth_entity("id1")
+
+ assert "client.crash.host" == \
+ cephadm_services["crash"].get_auth_entity("id1", "host")
+ with pytest.raises(OrchestratorError):
+ t = cephadm_services["crash"].get_auth_entity("id1", "")
+ t = cephadm_services["crash"].get_auth_entity("id1")
+
+ assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "host")
+ assert "mon." == cephadm_services["mon"].get_auth_entity("id1", "")
+ assert "mon." == cephadm_services["mon"].get_auth_entity("id1")
+
+ assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "host")
+ assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1", "")
+ assert "mgr.id1" == cephadm_services["mgr"].get_auth_entity("id1")
+
+ for daemon_type in ["osd", "mds"]:
+ assert "%s.id1" % daemon_type == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ assert "%s.id1" % daemon_type == \
+ cephadm_services[daemon_type].get_auth_entity("id1", "")
+ assert "%s.id1" % daemon_type == \
+ cephadm_services[daemon_type].get_auth_entity("id1")
+
+ with pytest.raises(OrchestratorError):
+ for daemon_type in ['grafana', 'alertmanager', 'prometheus', 'node-exporter']:
+ cephadm_services[daemon_type].get_auth_entity("id1", "host")
+ cephadm_services[daemon_type].get_auth_entity("id1", "")
+ cephadm_services[daemon_type].get_auth_entity("id1")
+++ /dev/null
-import pytest
-
-from orchestrator import OrchestratorError
-from cephadm.utils import name_to_auth_entity
-
-
-def test_name_to_auth_entity(fs):
-
- for daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
- assert "client.%s.id1" % (daemon_type) == name_to_auth_entity(daemon_type, "id1", "host")
- assert "client.%s.id1" % (daemon_type) == name_to_auth_entity(daemon_type, "id1", "")
- assert "client.%s.id1" % (daemon_type) == name_to_auth_entity(daemon_type, "id1")
-
- assert "client.crash.host" == name_to_auth_entity("crash", "id1", "host")
- with pytest.raises(OrchestratorError):
- t = name_to_auth_entity("crash", "id1", "")
- t = name_to_auth_entity("crash", "id1")
-
- assert "mon." == name_to_auth_entity("mon", "id1", "host")
- assert "mon." == name_to_auth_entity("mon", "id1", "")
- assert "mon." == name_to_auth_entity("mon", "id1")
-
- assert "mgr.id1" == name_to_auth_entity("mgr", "id1", "host")
- assert "mgr.id1" == name_to_auth_entity("mgr", "id1", "")
- assert "mgr.id1" == name_to_auth_entity("mgr", "id1")
-
- for daemon_type in ["osd", "mds", "client"]:
- assert "%s.id1" % daemon_type == name_to_auth_entity(daemon_type, "id1", "host")
- assert "%s.id1" % daemon_type == name_to_auth_entity(daemon_type, "id1", "")
- assert "%s.id1" % daemon_type == name_to_auth_entity(daemon_type, "id1")
-
- with pytest.raises(OrchestratorError):
- name_to_auth_entity("whatever", "id1", "host")
- name_to_auth_entity("whatever", "id1", "")
- name_to_auth_entity("whatever", "id1")
logger = logging.getLogger(__name__)
ConfEntity = NewType('ConfEntity', str)
-AuthEntity = NewType('AuthEntity', str)
class CephadmNoImage(Enum):
return ConfEntity('mon')
-def name_to_auth_entity(daemon_type: str,
- daemon_id: str,
- host: str = "",
- ) -> AuthEntity:
- """
- Map from daemon names/host to ceph entity names (as seen in config)
- """
- if daemon_type in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
- return AuthEntity('client.' + daemon_type + "." + daemon_id)
- elif daemon_type == 'crash':
- if host == "":
- raise OrchestratorError("Host not provided to generate <crash> auth entity name")
- return AuthEntity('client.' + daemon_type + "." + host)
- elif daemon_type == 'mon':
- return AuthEntity('mon.')
- elif daemon_type == 'mgr':
- return AuthEntity(daemon_type + "." + daemon_id)
- elif daemon_type in ['osd', 'mds', 'client']:
- return AuthEntity(daemon_type + "." + daemon_id)
- else:
- raise OrchestratorError("unknown auth entity name")
-
-
def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
@wraps(f)
def forall_hosts_wrapper(*args) -> List[T]: