]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: add a Cert/Key tracking/storage class
authorAdam King <adking@redhat.com>
Mon, 22 Jan 2024 17:37:19 +0000 (12:37 -0500)
committerAdam King <adking@redhat.com>
Fri, 12 Jul 2024 13:05:38 +0000 (09:05 -0400)
The idea is to move storage/handling of certs from any misc.
spot in the mgr/cephadm codebase into a single class. This will
make it much easier to do things with the certs we have in
the future.

Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit 67814772458eb2cb3311ae81bd3a111ee9069a6c)

src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

index cbaff8a5b000f651f7aad04db7d8b6c8c2c435c2..e1645f8e75dbeb2cb26cd926e6606a45aca2e95b 100644 (file)
@@ -8,7 +8,7 @@ import logging
 import math
 import socket
 from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
-    NamedTuple, Type, ValuesView
+    NamedTuple, Type, ValuesView, Union
 
 import orchestrator
 from ceph.deployment import inventory
@@ -30,6 +30,8 @@ HOST_CACHE_PREFIX = "host."
 SPEC_STORE_PREFIX = "spec."
 AGENT_CACHE_PREFIX = 'agent.'
 NODE_PROXY_CACHE_PREFIX = 'node_proxy'
+CERT_STORE_CERT_PREFIX = 'cert_store.cert.'
+CERT_STORE_KEY_PREFIX = 'cert_store.key.'
 
 
 class HostCacheStatus(enum.Enum):
@@ -1688,6 +1690,282 @@ class AgentCache():
         self.save_agent(daemon_spec.host)
 
 
+class Cert():
+    def __init__(self, cert: str = '', user_made: bool = False) -> None:
+        self.cert = cert
+        self.user_made = user_made
+
+    def __bool__(self) -> bool:
+        return bool(self.cert)
+
+    def __eq__(self, other: Any) -> bool:
+        if isinstance(other, Cert):
+            return self.cert == other.cert and self.user_made == other.user_made
+        return NotImplemented
+
+    def to_json(self) -> Dict[str, Union[str, bool]]:
+        return {
+            'cert': self.cert,
+            'user_made': self.user_made
+        }
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Union[str, bool]]) -> 'Cert':
+        if 'cert' not in data:
+            return cls()
+        cert = data['cert']
+        if not isinstance(cert, str):
+            raise OrchestratorError('Tried to make Cert object with non-string cert')
+        if any(k not in ['cert', 'user_made'] for k in data.keys()):
+            raise OrchestratorError(f'Got unknown field for Cert object. Fields: {data.keys()}')
+        user_made: Union[str, bool] = data.get('user_made', False)
+        if not isinstance(user_made, bool):
+            if isinstance(user_made, str):
+                if user_made.lower() == 'true':
+                    user_made = True
+                elif user_made.lower() == 'false':
+                    user_made = False
+            try:
+                user_made = bool(user_made)
+            except Exception:
+                raise OrchestratorError(f'Expected user_made field in Cert object to be bool but got {type(user_made)}')
+        return cls(cert=cert, user_made=user_made)
+
+
+class PrivKey():
+    def __init__(self, key: str = '', user_made: bool = False) -> None:
+        self.key = key
+        self.user_made = user_made
+
+    def __bool__(self) -> bool:
+        return bool(self.key)
+
+    def __eq__(self, other: Any) -> bool:
+        if isinstance(other, PrivKey):
+            return self.key == other.key and self.user_made == other.user_made
+        return NotImplemented
+
+    def to_json(self) -> Dict[str, Union[str, bool]]:
+        return {
+            'key': self.key,
+            'user_made': self.user_made
+        }
+
+    @classmethod
+    def from_json(cls, data: Dict[str, str]) -> 'PrivKey':
+        if 'key' not in data:
+            return cls()
+        key = data['key']
+        if not isinstance(key, str):
+            raise OrchestratorError('Tried to make PrivKey object with non-string key')
+        if any(k not in ['key', 'user_made'] for k in data.keys()):
+            raise OrchestratorError(f'Got unknown field for PrivKey object. Fields: {data.keys()}')
+        user_made: Union[str, bool] = data.get('user_made', False)
+        if not isinstance(user_made, bool):
+            if isinstance(user_made, str):
+                if user_made.lower() == 'true':
+                    user_made = True
+                elif user_made.lower() == 'false':
+                    user_made = False
+            try:
+                user_made = bool(user_made)
+            except Exception:
+                raise OrchestratorError(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}')
+        return cls(key=key, user_made=user_made)
+
+
+class CertKeyStore():
+    service_name_cert = [
+        'rgw_frontend_ssl_cert',
+        'iscsi_ssl_cert',
+        'ingress_ssl_cert',
+    ]
+
+    host_cert = [
+        'grafana_cert',
+        'alertmanager_cert',
+        'prometheus_cert',
+        'node_exporter_cert',
+    ]
+
+    host_key = [
+        'grafana_key',
+        'alertmanager_key',
+        'prometheus_key',
+        'node_exporter_key',
+    ]
+
+    service_name_key = [
+        'iscsi_ssl_key',
+        'ingress_ssl_key',
+    ]
+
+    known_certs: Dict[str, Any] = {}
+    known_keys: Dict[str, Any] = {}
+
+    def __init__(self, mgr: 'CephadmOrchestrator') -> None:
+        self.mgr: CephadmOrchestrator = mgr
+        self._init_known_cert_key_dicts()
+
+    def _init_known_cert_key_dicts(self) -> None:
+        # In an effort to try and track all the certs we manage in cephadm
+        # we're being explicit here and listing them out.
+        self.known_certs = {
+            'rgw_frontend_ssl_cert': {},  # service-name -> cert
+            'iscsi_ssl_cert': {},  # service-name -> cert
+            'ingress_ssl_cert': {},  # service-name -> cert
+            'agent_endpoint_root_cert': Cert(),  # cert
+            'service_discovery_root_cert': Cert(),  # cert
+            'grafana_cert': {},  # host -> cert
+            'alertmanager_cert': {},  # host -> cert
+            'prometheus_cert': {},  # host -> cert
+            'node_exporter_cert': {},  # host -> cert
+        }
+        # Similar to certs but for priv keys. Entries in known_certs
+        # that don't have a key here are probably certs in PEM format
+        # so there is no need to store a separate key
+        self.known_keys = {
+            'agent_endpoint_key': PrivKey(),  # key
+            'service_discovery_key': PrivKey(),  # key
+            'grafana_key': {},  # host -> key
+            'alertmanager_key': {},  # host -> key
+            'prometheus_key': {},  # host -> key
+            'node_exporter_key': {},  # host -> key
+            'iscsi_ssl_key': {},  # service-name -> key
+            'ingress_ssl_key': {},  # service-name -> key
+        }
+
+    def get_cert(self, entity: str, service_name: str = '', host: str = '') -> str:
+        self._validate_cert_entity(entity, service_name, host)
+
+        cert = Cert()
+        if entity in self.service_name_cert or entity in self.host_cert:
+            var = service_name if entity in self.service_name_cert else host
+            if var not in self.known_certs[entity]:
+                return ''
+            cert = self.known_certs[entity][var]
+        else:
+            cert = self.known_certs[entity]
+        if not cert or not isinstance(cert, Cert):
+            return ''
+        return cert.cert
+
+    def save_cert(self, entity: str, cert: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
+        self._validate_cert_entity(entity, service_name, host)
+
+        cert_obj = Cert(cert, user_made)
+
+        j: Union[str, Dict[Any, Any], None] = None
+        if entity in self.service_name_cert or entity in self.host_cert:
+            var = service_name if entity in self.service_name_cert else host
+            j = {}
+            self.known_certs[entity][var] = cert_obj
+            for service_name in self.known_certs[entity].keys():
+                j[var] = Cert.to_json(self.known_certs[entity][var])
+        else:
+            self.known_certs[entity] = cert_obj
+            j = Cert.to_json(cert_obj)
+        self.mgr.set_store(CERT_STORE_CERT_PREFIX + entity, json.dumps(j))
+
+    def rm_cert(self, entity: str, service_name: str = '', host: str = '') -> None:
+        self.save_cert(entity, cert='', service_name=service_name, host=host)
+
+    def _validate_cert_entity(self, entity: str, service_name: str = '', host: str = '') -> None:
+        if entity not in self.known_certs.keys():
+            raise OrchestratorError(f'Attempted to access cert for unknown entity {entity}')
+
+        if entity in self.host_cert and not host:
+            raise OrchestratorError(f'Need host to access cert for entity {entity}')
+
+        if entity in self.service_name_cert and not service_name:
+            raise OrchestratorError(f'Need service name to access cert for entity {entity}')
+
+    def cert_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
+        ls: Dict[str, Any] = {}
+        for k, v in self.known_certs.items():
+            if k in self.service_name_cert or k in self.host_cert:
+                tmp: Dict[str, Any] = {key: True for key in v if v[key]}
+                ls[k] = tmp if tmp else False
+            else:
+                ls[k] = bool(v)
+        return ls
+
+    def get_key(self, entity: str, service_name: str = '', host: str = '') -> str:
+        self._validate_key_entity(entity, host)
+
+        key = PrivKey()
+        if entity in self.host_key or entity in self.service_name_key:
+            var = service_name if entity in self.service_name_key else host
+            if var not in self.known_keys[entity]:
+                return ''
+            key = self.known_keys[entity][var]
+        else:
+            key = self.known_keys[entity]
+        if not key or not isinstance(key, PrivKey):
+            return ''
+        return key.key
+
+    def save_key(self, entity: str, key: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
+        self._validate_key_entity(entity, host)
+
+        pkey = PrivKey(key, user_made)
+
+        j: Union[str, Dict[Any, Any], None] = None
+        if entity in self.host_key or entity in self.service_name_key:
+            var = service_name if entity in self.service_name_key else host
+            j = {}
+            self.known_keys[entity][var] = pkey
+            for k in self.known_keys[entity]:
+                j[k] = PrivKey.to_json(self.known_keys[entity][k])
+        else:
+            self.known_keys[entity] = pkey
+            j = PrivKey.to_json(pkey)
+        self.mgr.set_store(CERT_STORE_KEY_PREFIX + entity, json.dumps(j))
+
+    def rm_key(self, entity: str, service_name: str = '', host: str = '') -> None:
+        self.save_key(entity, key='', service_name=service_name, host=host)
+
+    def _validate_key_entity(self, entity: str, host: str = '') -> None:
+        if entity not in self.known_keys.keys():
+            raise OrchestratorError(f'Attempted to access priv key for unknown entity {entity}')
+
+        if entity in self.host_key and not host:
+            raise OrchestratorError(f'Need host to access priv key for entity {entity}')
+
+    def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
+        ls: Dict[str, Any] = {}
+        for k, v in self.known_keys.items():
+            if k in self.host_key or k in self.service_name_key:
+                tmp: Dict[str, Any] = {key: True for key in v if v[key]}
+                ls[k] = tmp if tmp else False
+            else:
+                ls[k] = bool(v)
+        return ls
+
+    def load(self) -> None:
+        for k, v in self.mgr.get_store_prefix(CERT_STORE_CERT_PREFIX).items():
+            entity = k[len(CERT_STORE_CERT_PREFIX):]
+            self.known_certs[entity] = json.loads(v)
+            if entity in self.service_name_cert or entity in self.host_cert:
+                for k in self.known_certs[entity]:
+                    cert_obj = Cert.from_json(self.known_certs[entity][k])
+                    self.known_certs[entity][k] = cert_obj
+            else:
+                cert_obj = Cert.from_json(self.known_certs[entity])
+                self.known_certs[entity] = cert_obj
+
+        for k, v in self.mgr.get_store_prefix(CERT_STORE_KEY_PREFIX).items():
+            entity = k[len(CERT_STORE_KEY_PREFIX):]
+            self.known_keys[entity] = json.loads(v)
+            if entity in self.host_key or entity in self.service_name_key:
+                for k in self.known_keys[entity]:
+                    priv_key_obj = PrivKey.from_json(self.known_keys[entity][k])
+                    self.known_keys[entity][k] = priv_key_obj
+            else:
+                priv_key_obj = PrivKey.from_json(self.known_keys[entity])
+                self.known_keys[entity] = priv_key_obj
+
+
 class EventStore():
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None
index b48a5deba89754e4843d5e7e46846f2fd48a55be..374c58ab2a0c7871c9f2847020a8dba744764561 100644 (file)
@@ -78,8 +78,18 @@ from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCol
 from .services.node_proxy import NodeProxy
 from .services.smb import SMBService
 from .schedule import HostAssignment
-from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
-    ClientKeyringStore, ClientKeyringSpec, TunedProfileStore, NodeProxyCache
+from .inventory import (
+    Inventory,
+    SpecStore,
+    HostCache,
+    AgentCache,
+    EventStore,
+    ClientKeyringStore,
+    ClientKeyringSpec,
+    TunedProfileStore,
+    NodeProxyCache,
+    CertKeyStore,
+)
 from .upgrade import CephadmUpgrade
 from .template import TemplateMgr
 from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
@@ -654,6 +664,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.tuned_profile_utils = TunedProfileUtils(self)
 
+        self.cert_key_store = CertKeyStore(self)
+        self.cert_key_store.load()
+
         # ensure the host lists are in sync
         for h in self.inventory.keys():
             if h not in self.cache.daemons:
index 80cda47df527b17de7c51bbf8d772e2002b7891c..d9b1f5f8e784c3f9e72f04347217f0c061d3cb64 100644 (file)
@@ -8,7 +8,14 @@ import pytest
 
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
 from cephadm.serve import CephadmServe
-from cephadm.inventory import HostCacheStatus, ClientKeyringSpec
+from cephadm.inventory import (
+    HostCacheStatus,
+    ClientKeyringSpec,
+    Cert,
+    PrivKey,
+    CERT_STORE_CERT_PREFIX,
+    CERT_STORE_KEY_PREFIX,
+)
 from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
 from cephadm.utils import SpecialHostLabels
 
@@ -1690,6 +1697,173 @@ class TestCephadm(object):
         assert cephadm_module.cache._get_host_cache_entry_status(
             'host.nothing.com') == HostCacheStatus.stray
 
+    @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+    def test_cert_store_save_cert(self, _set_store, cephadm_module: CephadmOrchestrator):
+        cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+        agent_endpoint_root_cert = 'fake-agent-cert'
+        alertmanager_host1_cert = 'fake-alertm-host1-cert'
+        rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
+        cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_root_cert)
+        cephadm_module.cert_key_store.save_cert('alertmanager_cert', alertmanager_host1_cert, host='host1')
+        cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
+
+        expected_calls = [
+            mock.call(f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert', json.dumps(Cert(agent_endpoint_root_cert).to_json())),
+            mock.call(f'{CERT_STORE_CERT_PREFIX}alertmanager_cert', json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()})),
+            mock.call(f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})),
+        ]
+        _set_store.assert_has_calls(expected_calls)
+
+    @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+    def test_cert_store_cert_ls(self, _set_store, cephadm_module: CephadmOrchestrator):
+        cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+        expected_ls = {
+            'rgw_frontend_ssl_cert': False,
+            'iscsi_ssl_cert': False,
+            'ingress_ssl_cert': False,
+            'agent_endpoint_root_cert': False,
+            'service_discovery_root_cert': False,
+            'grafana_cert': False,
+            'alertmanager_cert': False,
+            'prometheus_cert': False,
+            'node_exporter_cert': False,
+        }
+        assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+        cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', 'xxx')
+        expected_ls['agent_endpoint_root_cert'] = True
+        assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+        cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host1')
+        cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host2')
+        expected_ls['alertmanager_cert'] = {}
+        expected_ls['alertmanager_cert']['host1'] = True
+        expected_ls['alertmanager_cert']['host2'] = True
+        assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+        cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.foo', user_made=True)
+        cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.bar', user_made=True)
+        expected_ls['rgw_frontend_ssl_cert'] = {}
+        expected_ls['rgw_frontend_ssl_cert']['rgw.foo'] = True
+        expected_ls['rgw_frontend_ssl_cert']['rgw.bar'] = True
+        assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+    @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+    def test_cert_store_save_key(self, _set_store, cephadm_module: CephadmOrchestrator):
+        cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+        agent_endpoint_key = 'fake-agent-key'
+        grafana_host1_key = 'fake-grafana-host1-cert'
+        cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
+        cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
+
+        expected_calls = [
+            mock.call(f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key', json.dumps(PrivKey(agent_endpoint_key).to_json())),
+            mock.call(f'{CERT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
+        ]
+        _set_store.assert_has_calls(expected_calls)
+
+    @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+    def test_cert_store_key_ls(self, _set_store, cephadm_module: CephadmOrchestrator):
+        cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+        expected_ls = {
+            'agent_endpoint_key': False,
+            'service_discovery_key': False,
+            'grafana_key': False,
+            'alertmanager_key': False,
+            'prometheus_key': False,
+            'node_exporter_key': False,
+            'iscsi_ssl_key': False,
+            'ingress_ssl_key': False,
+        }
+        assert cephadm_module.cert_key_store.key_ls() == expected_ls
+
+        cephadm_module.cert_key_store.save_key('agent_endpoint_key', 'xxx')
+        expected_ls['agent_endpoint_key'] = True
+        assert cephadm_module.cert_key_store.key_ls() == expected_ls
+
+        cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host1')
+        cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host2')
+        expected_ls['alertmanager_key'] = {}
+        expected_ls['alertmanager_key']['host1'] = True
+        expected_ls['alertmanager_key']['host2'] = True
+        assert cephadm_module.cert_key_store.key_ls() == expected_ls
+
+    @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix")
+    def test_cert_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator):
+        cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+        agent_endpoint_root_cert = 'fake-agent-cert'
+        alertmanager_host1_cert = 'fake-alertm-host1-cert'
+        rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
+        agent_endpoint_key = 'fake-agent-key'
+        grafana_host1_key = 'fake-grafana-host1-cert'
+
+        def _fake_prefix_store(key):
+            if key == 'cert_store.cert.':
+                return {
+                    f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert': json.dumps(Cert(agent_endpoint_root_cert).to_json()),
+                    f'{CERT_STORE_CERT_PREFIX}alertmanager_cert': json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()}),
+                    f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})
+                }
+            elif key == 'cert_store.key.':
+                return {
+                    f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key': json.dumps(PrivKey(agent_endpoint_key).to_json()),
+                    f'{CERT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}),
+                }
+            else:
+                raise Exception(f'Get store with unexpected value {key}')
+
+        _get_store_prefix.side_effect = _fake_prefix_store
+        cephadm_module.cert_key_store.load()
+        assert cephadm_module.cert_key_store.known_certs['agent_endpoint_root_cert'] == Cert(agent_endpoint_root_cert)
+        assert cephadm_module.cert_key_store.known_certs['alertmanager_cert']['host1'] == Cert(alertmanager_host1_cert)
+        assert cephadm_module.cert_key_store.known_certs['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True)
+        assert cephadm_module.cert_key_store.known_keys['agent_endpoint_key'] == PrivKey(agent_endpoint_key)
+        assert cephadm_module.cert_key_store.known_keys['grafana_key']['host1'] == PrivKey(grafana_host1_key)
+
+    def test_cert_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):
+        cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+        agent_endpoint_root_cert = 'fake-agent-cert'
+        alertmanager_host1_cert = 'fake-alertm-host1-cert'
+        rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
+        cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_root_cert)
+        cephadm_module.cert_key_store.save_cert('alertmanager_cert', alertmanager_host1_cert, host='host1')
+        cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
+
+        assert cephadm_module.cert_key_store.get_cert('agent_endpoint_root_cert') == agent_endpoint_root_cert
+        assert cephadm_module.cert_key_store.get_cert('alertmanager_cert', host='host1') == alertmanager_host1_cert
+        assert cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert
+        assert cephadm_module.cert_key_store.get_cert('service_discovery_root_cert') == ''
+        assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host1') == ''
+        assert cephadm_module.cert_key_store.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') == ''
+
+        with pytest.raises(OrchestratorError, match='Attempted to access cert for unknown entity'):
+            cephadm_module.cert_key_store.get_cert('unknown_entity')
+        with pytest.raises(OrchestratorError, match='Need host to access cert for entity'):
+            cephadm_module.cert_key_store.get_cert('grafana_cert')
+        with pytest.raises(OrchestratorError, match='Need service name to access cert for entity'):
+            cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', host='foo')
+
+        agent_endpoint_key = 'fake-agent-key'
+        grafana_host1_key = 'fake-grafana-host1-cert'
+        cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
+        cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
+
+        assert cephadm_module.cert_key_store.get_key('agent_endpoint_key') == agent_endpoint_key
+        assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1') == grafana_host1_key
+        assert cephadm_module.cert_key_store.get_key('service_discovery_key') == ''
+        assert cephadm_module.cert_key_store.get_key('alertmanager_key', host='host1') == ''
+
+        with pytest.raises(OrchestratorError, match='Attempted to access priv key for unknown entity'):
+            cephadm_module.cert_key_store.get_key('unknown_entity')
+        with pytest.raises(OrchestratorError, match='Need host to access priv key for entity'):
+            cephadm_module.cert_key_store.get_key('grafana_key')
+
     @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
     @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())