From: Adam King Date: Mon, 22 Jan 2024 17:37:19 +0000 (-0500) Subject: mgr/cephadm: add a Cert/Key tracking/storage class X-Git-Tag: v19.1.1~104^2~19 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=300a8a3f07bad0d3dc3422ecd8e0bee6a8102d55;p=ceph.git mgr/cephadm: add a Cert/Key tracking/storage class The idea is to move storage/handling of certs from any misc. spot in the mgr/cephadm codebase into a single class. This will make it much easier to do things with the certs we have in the future. Signed-off-by: Adam King (cherry picked from commit 67814772458eb2cb3311ae81bd3a111ee9069a6c) --- diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index cbaff8a5b000f..e1645f8e75dbe 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -8,7 +8,7 @@ import logging import math import socket from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \ - NamedTuple, Type, ValuesView + NamedTuple, Type, ValuesView, Union import orchestrator from ceph.deployment import inventory @@ -30,6 +30,8 @@ HOST_CACHE_PREFIX = "host." SPEC_STORE_PREFIX = "spec." AGENT_CACHE_PREFIX = 'agent.' NODE_PROXY_CACHE_PREFIX = 'node_proxy' +CERT_STORE_CERT_PREFIX = 'cert_store.cert.' +CERT_STORE_KEY_PREFIX = 'cert_store.key.' class HostCacheStatus(enum.Enum): @@ -1688,6 +1690,282 @@ class AgentCache(): self.save_agent(daemon_spec.host) +class Cert(): + def __init__(self, cert: str = '', user_made: bool = False) -> None: + self.cert = cert + self.user_made = user_made + + def __bool__(self) -> bool: + return bool(self.cert) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, Cert): + return self.cert == other.cert and self.user_made == other.user_made + return NotImplemented + + def to_json(self) -> Dict[str, Union[str, bool]]: + return { + 'cert': self.cert, + 'user_made': self.user_made + } + + @classmethod + def from_json(cls, data: Dict[str, Union[str, bool]]) -> 'Cert': + if 'cert' not in data: + return cls() + cert = data['cert'] + if not isinstance(cert, str): + raise OrchestratorError('Tried to make Cert object with non-string cert') + if any(k not in ['cert', 'user_made'] for k in data.keys()): + raise OrchestratorError(f'Got unknown field for Cert object. Fields: {data.keys()}') + user_made: Union[str, bool] = data.get('user_made', False) + if not isinstance(user_made, bool): + if isinstance(user_made, str): + if user_made.lower() == 'true': + user_made = True + elif user_made.lower() == 'false': + user_made = False + try: + user_made = bool(user_made) + except Exception: + raise OrchestratorError(f'Expected user_made field in Cert object to be bool but got {type(user_made)}') + return cls(cert=cert, user_made=user_made) + + +class PrivKey(): + def __init__(self, key: str = '', user_made: bool = False) -> None: + self.key = key + self.user_made = user_made + + def __bool__(self) -> bool: + return bool(self.key) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, PrivKey): + return self.key == other.key and self.user_made == other.user_made + return NotImplemented + + def to_json(self) -> Dict[str, Union[str, bool]]: + return { + 'key': self.key, + 'user_made': self.user_made + } + + @classmethod + def from_json(cls, data: Dict[str, str]) -> 'PrivKey': + if 'key' not in data: + return cls() + key = data['key'] + if not isinstance(key, str): + raise OrchestratorError('Tried to make PrivKey object with non-string key') + if any(k not in ['key', 'user_made'] for k in data.keys()): + raise OrchestratorError(f'Got unknown field for PrivKey object. Fields: {data.keys()}') + user_made: Union[str, bool] = data.get('user_made', False) + if not isinstance(user_made, bool): + if isinstance(user_made, str): + if user_made.lower() == 'true': + user_made = True + elif user_made.lower() == 'false': + user_made = False + try: + user_made = bool(user_made) + except Exception: + raise OrchestratorError(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}') + return cls(key=key, user_made=user_made) + + +class CertKeyStore(): + service_name_cert = [ + 'rgw_frontend_ssl_cert', + 'iscsi_ssl_cert', + 'ingress_ssl_cert', + ] + + host_cert = [ + 'grafana_cert', + 'alertmanager_cert', + 'prometheus_cert', + 'node_exporter_cert', + ] + + host_key = [ + 'grafana_key', + 'alertmanager_key', + 'prometheus_key', + 'node_exporter_key', + ] + + service_name_key = [ + 'iscsi_ssl_key', + 'ingress_ssl_key', + ] + + known_certs: Dict[str, Any] = {} + known_keys: Dict[str, Any] = {} + + def __init__(self, mgr: 'CephadmOrchestrator') -> None: + self.mgr: CephadmOrchestrator = mgr + self._init_known_cert_key_dicts() + + def _init_known_cert_key_dicts(self) -> None: + # In an effort to try and track all the certs we manage in cephadm + # we're being explicit here and listing them out. + self.known_certs = { + 'rgw_frontend_ssl_cert': {}, # service-name -> cert + 'iscsi_ssl_cert': {}, # service-name -> cert + 'ingress_ssl_cert': {}, # service-name -> cert + 'agent_endpoint_root_cert': Cert(), # cert + 'service_discovery_root_cert': Cert(), # cert + 'grafana_cert': {}, # host -> cert + 'alertmanager_cert': {}, # host -> cert + 'prometheus_cert': {}, # host -> cert + 'node_exporter_cert': {}, # host -> cert + } + # Similar to certs but for priv keys. Entries in known_certs + # that don't have a key here are probably certs in PEM format + # so there is no need to store a separate key + self.known_keys = { + 'agent_endpoint_key': PrivKey(), # key + 'service_discovery_key': PrivKey(), # key + 'grafana_key': {}, # host -> key + 'alertmanager_key': {}, # host -> key + 'prometheus_key': {}, # host -> key + 'node_exporter_key': {}, # host -> key + 'iscsi_ssl_key': {}, # service-name -> key + 'ingress_ssl_key': {}, # service-name -> key + } + + def get_cert(self, entity: str, service_name: str = '', host: str = '') -> str: + self._validate_cert_entity(entity, service_name, host) + + cert = Cert() + if entity in self.service_name_cert or entity in self.host_cert: + var = service_name if entity in self.service_name_cert else host + if var not in self.known_certs[entity]: + return '' + cert = self.known_certs[entity][var] + else: + cert = self.known_certs[entity] + if not cert or not isinstance(cert, Cert): + return '' + return cert.cert + + def save_cert(self, entity: str, cert: str, service_name: str = '', host: str = '', user_made: bool = False) -> None: + self._validate_cert_entity(entity, service_name, host) + + cert_obj = Cert(cert, user_made) + + j: Union[str, Dict[Any, Any], None] = None + if entity in self.service_name_cert or entity in self.host_cert: + var = service_name if entity in self.service_name_cert else host + j = {} + self.known_certs[entity][var] = cert_obj + for service_name in self.known_certs[entity].keys(): + j[var] = Cert.to_json(self.known_certs[entity][var]) + else: + self.known_certs[entity] = cert_obj + j = Cert.to_json(cert_obj) + self.mgr.set_store(CERT_STORE_CERT_PREFIX + entity, json.dumps(j)) + + def rm_cert(self, entity: str, service_name: str = '', host: str = '') -> None: + self.save_cert(entity, cert='', service_name=service_name, host=host) + + def _validate_cert_entity(self, entity: str, service_name: str = '', host: str = '') -> None: + if entity not in self.known_certs.keys(): + raise OrchestratorError(f'Attempted to access cert for unknown entity {entity}') + + if entity in self.host_cert and not host: + raise OrchestratorError(f'Need host to access cert for entity {entity}') + + if entity in self.service_name_cert and not service_name: + raise OrchestratorError(f'Need service name to access cert for entity {entity}') + + def cert_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]: + ls: Dict[str, Any] = {} + for k, v in self.known_certs.items(): + if k in self.service_name_cert or k in self.host_cert: + tmp: Dict[str, Any] = {key: True for key in v if v[key]} + ls[k] = tmp if tmp else False + else: + ls[k] = bool(v) + return ls + + def get_key(self, entity: str, service_name: str = '', host: str = '') -> str: + self._validate_key_entity(entity, host) + + key = PrivKey() + if entity in self.host_key or entity in self.service_name_key: + var = service_name if entity in self.service_name_key else host + if var not in self.known_keys[entity]: + return '' + key = self.known_keys[entity][var] + else: + key = self.known_keys[entity] + if not key or not isinstance(key, PrivKey): + return '' + return key.key + + def save_key(self, entity: str, key: str, service_name: str = '', host: str = '', user_made: bool = False) -> None: + self._validate_key_entity(entity, host) + + pkey = PrivKey(key, user_made) + + j: Union[str, Dict[Any, Any], None] = None + if entity in self.host_key or entity in self.service_name_key: + var = service_name if entity in self.service_name_key else host + j = {} + self.known_keys[entity][var] = pkey + for k in self.known_keys[entity]: + j[k] = PrivKey.to_json(self.known_keys[entity][k]) + else: + self.known_keys[entity] = pkey + j = PrivKey.to_json(pkey) + self.mgr.set_store(CERT_STORE_KEY_PREFIX + entity, json.dumps(j)) + + def rm_key(self, entity: str, service_name: str = '', host: str = '') -> None: + self.save_key(entity, key='', service_name=service_name, host=host) + + def _validate_key_entity(self, entity: str, host: str = '') -> None: + if entity not in self.known_keys.keys(): + raise OrchestratorError(f'Attempted to access priv key for unknown entity {entity}') + + if entity in self.host_key and not host: + raise OrchestratorError(f'Need host to access priv key for entity {entity}') + + def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]: + ls: Dict[str, Any] = {} + for k, v in self.known_keys.items(): + if k in self.host_key or k in self.service_name_key: + tmp: Dict[str, Any] = {key: True for key in v if v[key]} + ls[k] = tmp if tmp else False + else: + ls[k] = bool(v) + return ls + + def load(self) -> None: + for k, v in self.mgr.get_store_prefix(CERT_STORE_CERT_PREFIX).items(): + entity = k[len(CERT_STORE_CERT_PREFIX):] + self.known_certs[entity] = json.loads(v) + if entity in self.service_name_cert or entity in self.host_cert: + for k in self.known_certs[entity]: + cert_obj = Cert.from_json(self.known_certs[entity][k]) + self.known_certs[entity][k] = cert_obj + else: + cert_obj = Cert.from_json(self.known_certs[entity]) + self.known_certs[entity] = cert_obj + + for k, v in self.mgr.get_store_prefix(CERT_STORE_KEY_PREFIX).items(): + entity = k[len(CERT_STORE_KEY_PREFIX):] + self.known_keys[entity] = json.loads(v) + if entity in self.host_key or entity in self.service_name_key: + for k in self.known_keys[entity]: + priv_key_obj = PrivKey.from_json(self.known_keys[entity][k]) + self.known_keys[entity][k] = priv_key_obj + else: + priv_key_obj = PrivKey.from_json(self.known_keys[entity]) + self.known_keys[entity] = priv_key_obj + + class EventStore(): def __init__(self, mgr): # type: (CephadmOrchestrator) -> None diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index b48a5deba8975..374c58ab2a0c7 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -78,8 +78,18 @@ from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCol from .services.node_proxy import NodeProxy from .services.smb import SMBService from .schedule import HostAssignment -from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \ - ClientKeyringStore, ClientKeyringSpec, TunedProfileStore, NodeProxyCache +from .inventory import ( + Inventory, + SpecStore, + HostCache, + AgentCache, + EventStore, + ClientKeyringStore, + ClientKeyringSpec, + TunedProfileStore, + NodeProxyCache, + CertKeyStore, +) from .upgrade import CephadmUpgrade from .template import TemplateMgr from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \ @@ -654,6 +664,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.tuned_profile_utils = TunedProfileUtils(self) + self.cert_key_store = CertKeyStore(self) + self.cert_key_store.load() + # ensure the host lists are in sync for h in self.inventory.keys(): if h not in self.cache.daemons: diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 80cda47df527b..d9b1f5f8e784c 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -8,7 +8,14 @@ import pytest from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection from cephadm.serve import CephadmServe -from cephadm.inventory import HostCacheStatus, ClientKeyringSpec +from cephadm.inventory import ( + HostCacheStatus, + ClientKeyringSpec, + Cert, + PrivKey, + CERT_STORE_CERT_PREFIX, + CERT_STORE_KEY_PREFIX, +) from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims from cephadm.utils import SpecialHostLabels @@ -1690,6 +1697,173 @@ class TestCephadm(object): assert cephadm_module.cache._get_host_cache_entry_status( 'host.nothing.com') == HostCacheStatus.stray + @mock.patch("cephadm.module.CephadmOrchestrator.set_store") + def test_cert_store_save_cert(self, _set_store, cephadm_module: CephadmOrchestrator): + cephadm_module.cert_key_store._init_known_cert_key_dicts() + + agent_endpoint_root_cert = 'fake-agent-cert' + alertmanager_host1_cert = 'fake-alertm-host1-cert' + rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert' + cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_root_cert) + cephadm_module.cert_key_store.save_cert('alertmanager_cert', alertmanager_host1_cert, host='host1') + cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True) + + expected_calls = [ + mock.call(f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert', json.dumps(Cert(agent_endpoint_root_cert).to_json())), + mock.call(f'{CERT_STORE_CERT_PREFIX}alertmanager_cert', json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()})), + mock.call(f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})), + ] + _set_store.assert_has_calls(expected_calls) + + @mock.patch("cephadm.module.CephadmOrchestrator.set_store") + def test_cert_store_cert_ls(self, _set_store, cephadm_module: CephadmOrchestrator): + cephadm_module.cert_key_store._init_known_cert_key_dicts() + + expected_ls = { + 'rgw_frontend_ssl_cert': False, + 'iscsi_ssl_cert': False, + 'ingress_ssl_cert': False, + 'agent_endpoint_root_cert': False, + 'service_discovery_root_cert': False, + 'grafana_cert': False, + 'alertmanager_cert': False, + 'prometheus_cert': False, + 'node_exporter_cert': False, + } + assert cephadm_module.cert_key_store.cert_ls() == expected_ls + + cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', 'xxx') + expected_ls['agent_endpoint_root_cert'] = True + assert cephadm_module.cert_key_store.cert_ls() == expected_ls + + cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host1') + cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host2') + expected_ls['alertmanager_cert'] = {} + expected_ls['alertmanager_cert']['host1'] = True + expected_ls['alertmanager_cert']['host2'] = True + assert cephadm_module.cert_key_store.cert_ls() == expected_ls + + cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.foo', user_made=True) + cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.bar', user_made=True) + expected_ls['rgw_frontend_ssl_cert'] = {} + expected_ls['rgw_frontend_ssl_cert']['rgw.foo'] = True + expected_ls['rgw_frontend_ssl_cert']['rgw.bar'] = True + assert cephadm_module.cert_key_store.cert_ls() == expected_ls + + @mock.patch("cephadm.module.CephadmOrchestrator.set_store") + def test_cert_store_save_key(self, _set_store, cephadm_module: CephadmOrchestrator): + cephadm_module.cert_key_store._init_known_cert_key_dicts() + + agent_endpoint_key = 'fake-agent-key' + grafana_host1_key = 'fake-grafana-host1-cert' + cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key) + cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1') + + expected_calls = [ + mock.call(f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key', json.dumps(PrivKey(agent_endpoint_key).to_json())), + mock.call(f'{CERT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})), + ] + _set_store.assert_has_calls(expected_calls) + + @mock.patch("cephadm.module.CephadmOrchestrator.set_store") + def test_cert_store_key_ls(self, _set_store, cephadm_module: CephadmOrchestrator): + cephadm_module.cert_key_store._init_known_cert_key_dicts() + + expected_ls = { + 'agent_endpoint_key': False, + 'service_discovery_key': False, + 'grafana_key': False, + 'alertmanager_key': False, + 'prometheus_key': False, + 'node_exporter_key': False, + 'iscsi_ssl_key': False, + 'ingress_ssl_key': False, + } + assert cephadm_module.cert_key_store.key_ls() == expected_ls + + cephadm_module.cert_key_store.save_key('agent_endpoint_key', 'xxx') + expected_ls['agent_endpoint_key'] = True + assert cephadm_module.cert_key_store.key_ls() == expected_ls + + cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host1') + cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host2') + expected_ls['alertmanager_key'] = {} + expected_ls['alertmanager_key']['host1'] = True + expected_ls['alertmanager_key']['host2'] = True + assert cephadm_module.cert_key_store.key_ls() == expected_ls + + @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix") + def test_cert_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator): + cephadm_module.cert_key_store._init_known_cert_key_dicts() + + agent_endpoint_root_cert = 'fake-agent-cert' + alertmanager_host1_cert = 'fake-alertm-host1-cert' + rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert' + agent_endpoint_key = 'fake-agent-key' + grafana_host1_key = 'fake-grafana-host1-cert' + + def _fake_prefix_store(key): + if key == 'cert_store.cert.': + return { + f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert': json.dumps(Cert(agent_endpoint_root_cert).to_json()), + f'{CERT_STORE_CERT_PREFIX}alertmanager_cert': json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()}), + f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}) + } + elif key == 'cert_store.key.': + return { + f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key': json.dumps(PrivKey(agent_endpoint_key).to_json()), + f'{CERT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}), + } + else: + raise Exception(f'Get store with unexpected value {key}') + + _get_store_prefix.side_effect = _fake_prefix_store + cephadm_module.cert_key_store.load() + assert cephadm_module.cert_key_store.known_certs['agent_endpoint_root_cert'] == Cert(agent_endpoint_root_cert) + assert cephadm_module.cert_key_store.known_certs['alertmanager_cert']['host1'] == Cert(alertmanager_host1_cert) + assert cephadm_module.cert_key_store.known_certs['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True) + assert cephadm_module.cert_key_store.known_keys['agent_endpoint_key'] == PrivKey(agent_endpoint_key) + assert cephadm_module.cert_key_store.known_keys['grafana_key']['host1'] == PrivKey(grafana_host1_key) + + def test_cert_store_get_cert_key(self, cephadm_module: CephadmOrchestrator): + cephadm_module.cert_key_store._init_known_cert_key_dicts() + + agent_endpoint_root_cert = 'fake-agent-cert' + alertmanager_host1_cert = 'fake-alertm-host1-cert' + rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert' + cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_root_cert) + cephadm_module.cert_key_store.save_cert('alertmanager_cert', alertmanager_host1_cert, host='host1') + cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True) + + assert cephadm_module.cert_key_store.get_cert('agent_endpoint_root_cert') == agent_endpoint_root_cert + assert cephadm_module.cert_key_store.get_cert('alertmanager_cert', host='host1') == alertmanager_host1_cert + assert cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert + assert cephadm_module.cert_key_store.get_cert('service_discovery_root_cert') == '' + assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host1') == '' + assert cephadm_module.cert_key_store.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') == '' + + with pytest.raises(OrchestratorError, match='Attempted to access cert for unknown entity'): + cephadm_module.cert_key_store.get_cert('unknown_entity') + with pytest.raises(OrchestratorError, match='Need host to access cert for entity'): + cephadm_module.cert_key_store.get_cert('grafana_cert') + with pytest.raises(OrchestratorError, match='Need service name to access cert for entity'): + cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', host='foo') + + agent_endpoint_key = 'fake-agent-key' + grafana_host1_key = 'fake-grafana-host1-cert' + cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key) + cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1') + + assert cephadm_module.cert_key_store.get_key('agent_endpoint_key') == agent_endpoint_key + assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1') == grafana_host1_key + assert cephadm_module.cert_key_store.get_key('service_discovery_key') == '' + assert cephadm_module.cert_key_store.get_key('alertmanager_key', host='host1') == '' + + with pytest.raises(OrchestratorError, match='Attempted to access priv key for unknown entity'): + cephadm_module.cert_key_store.get_key('unknown_entity') + with pytest.raises(OrchestratorError, match='Need host to access priv key for entity'): + cephadm_module.cert_key_store.get_key('grafana_key') + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())