import math
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
- NamedTuple, Type, ValuesView
+ NamedTuple, Type, ValuesView, Union
import orchestrator
from ceph.deployment import inventory
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'
NODE_PROXY_CACHE_PREFIX = 'node_proxy'
+CERT_STORE_CERT_PREFIX = 'cert_store.cert.'
+CERT_STORE_KEY_PREFIX = 'cert_store.key.'
class HostCacheStatus(enum.Enum):
self.save_agent(daemon_spec.host)
+class Cert():
+ def __init__(self, cert: str = '', user_made: bool = False) -> None:
+ self.cert = cert
+ self.user_made = user_made
+
+ def __bool__(self) -> bool:
+ return bool(self.cert)
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, Cert):
+ return self.cert == other.cert and self.user_made == other.user_made
+ return NotImplemented
+
+ def to_json(self) -> Dict[str, Union[str, bool]]:
+ return {
+ 'cert': self.cert,
+ 'user_made': self.user_made
+ }
+
+ @classmethod
+ def from_json(cls, data: Dict[str, Union[str, bool]]) -> 'Cert':
+ if 'cert' not in data:
+ return cls()
+ cert = data['cert']
+ if not isinstance(cert, str):
+ raise OrchestratorError('Tried to make Cert object with non-string cert')
+ if any(k not in ['cert', 'user_made'] for k in data.keys()):
+ raise OrchestratorError(f'Got unknown field for Cert object. Fields: {data.keys()}')
+ user_made: Union[str, bool] = data.get('user_made', False)
+ if not isinstance(user_made, bool):
+ if isinstance(user_made, str):
+ if user_made.lower() == 'true':
+ user_made = True
+ elif user_made.lower() == 'false':
+ user_made = False
+ try:
+ user_made = bool(user_made)
+ except Exception:
+ raise OrchestratorError(f'Expected user_made field in Cert object to be bool but got {type(user_made)}')
+ return cls(cert=cert, user_made=user_made)
+
+
+class PrivKey():
+ def __init__(self, key: str = '', user_made: bool = False) -> None:
+ self.key = key
+ self.user_made = user_made
+
+ def __bool__(self) -> bool:
+ return bool(self.key)
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, PrivKey):
+ return self.key == other.key and self.user_made == other.user_made
+ return NotImplemented
+
+ def to_json(self) -> Dict[str, Union[str, bool]]:
+ return {
+ 'key': self.key,
+ 'user_made': self.user_made
+ }
+
+ @classmethod
+ def from_json(cls, data: Dict[str, str]) -> 'PrivKey':
+ if 'key' not in data:
+ return cls()
+ key = data['key']
+ if not isinstance(key, str):
+ raise OrchestratorError('Tried to make PrivKey object with non-string key')
+ if any(k not in ['key', 'user_made'] for k in data.keys()):
+ raise OrchestratorError(f'Got unknown field for PrivKey object. Fields: {data.keys()}')
+ user_made: Union[str, bool] = data.get('user_made', False)
+ if not isinstance(user_made, bool):
+ if isinstance(user_made, str):
+ if user_made.lower() == 'true':
+ user_made = True
+ elif user_made.lower() == 'false':
+ user_made = False
+ try:
+ user_made = bool(user_made)
+ except Exception:
+ raise OrchestratorError(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}')
+ return cls(key=key, user_made=user_made)
+
+
+class CertKeyStore():
+ service_name_cert = [
+ 'rgw_frontend_ssl_cert',
+ 'iscsi_ssl_cert',
+ 'ingress_ssl_cert',
+ ]
+
+ host_cert = [
+ 'grafana_cert',
+ 'alertmanager_cert',
+ 'prometheus_cert',
+ 'node_exporter_cert',
+ ]
+
+ host_key = [
+ 'grafana_key',
+ 'alertmanager_key',
+ 'prometheus_key',
+ 'node_exporter_key',
+ ]
+
+ service_name_key = [
+ 'iscsi_ssl_key',
+ 'ingress_ssl_key',
+ ]
+
+ known_certs: Dict[str, Any] = {}
+ known_keys: Dict[str, Any] = {}
+
+ def __init__(self, mgr: 'CephadmOrchestrator') -> None:
+ self.mgr: CephadmOrchestrator = mgr
+ self._init_known_cert_key_dicts()
+
+ def _init_known_cert_key_dicts(self) -> None:
+ # In an effort to try and track all the certs we manage in cephadm
+ # we're being explicit here and listing them out.
+ self.known_certs = {
+ 'rgw_frontend_ssl_cert': {}, # service-name -> cert
+ 'iscsi_ssl_cert': {}, # service-name -> cert
+ 'ingress_ssl_cert': {}, # service-name -> cert
+ 'agent_endpoint_root_cert': Cert(), # cert
+ 'service_discovery_root_cert': Cert(), # cert
+ 'grafana_cert': {}, # host -> cert
+ 'alertmanager_cert': {}, # host -> cert
+ 'prometheus_cert': {}, # host -> cert
+ 'node_exporter_cert': {}, # host -> cert
+ }
+ # Similar to certs but for priv keys. Entries in known_certs
+ # that don't have a key here are probably certs in PEM format
+ # so there is no need to store a separate key
+ self.known_keys = {
+ 'agent_endpoint_key': PrivKey(), # key
+ 'service_discovery_key': PrivKey(), # key
+ 'grafana_key': {}, # host -> key
+ 'alertmanager_key': {}, # host -> key
+ 'prometheus_key': {}, # host -> key
+ 'node_exporter_key': {}, # host -> key
+ 'iscsi_ssl_key': {}, # service-name -> key
+ 'ingress_ssl_key': {}, # service-name -> key
+ }
+
+ def get_cert(self, entity: str, service_name: str = '', host: str = '') -> str:
+ self._validate_cert_entity(entity, service_name, host)
+
+ cert = Cert()
+ if entity in self.service_name_cert or entity in self.host_cert:
+ var = service_name if entity in self.service_name_cert else host
+ if var not in self.known_certs[entity]:
+ return ''
+ cert = self.known_certs[entity][var]
+ else:
+ cert = self.known_certs[entity]
+ if not cert or not isinstance(cert, Cert):
+ return ''
+ return cert.cert
+
+ def save_cert(self, entity: str, cert: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
+ self._validate_cert_entity(entity, service_name, host)
+
+ cert_obj = Cert(cert, user_made)
+
+ j: Union[str, Dict[Any, Any], None] = None
+ if entity in self.service_name_cert or entity in self.host_cert:
+ var = service_name if entity in self.service_name_cert else host
+ j = {}
+ self.known_certs[entity][var] = cert_obj
+ for service_name in self.known_certs[entity].keys():
+ j[var] = Cert.to_json(self.known_certs[entity][var])
+ else:
+ self.known_certs[entity] = cert_obj
+ j = Cert.to_json(cert_obj)
+ self.mgr.set_store(CERT_STORE_CERT_PREFIX + entity, json.dumps(j))
+
+ def rm_cert(self, entity: str, service_name: str = '', host: str = '') -> None:
+ self.save_cert(entity, cert='', service_name=service_name, host=host)
+
+ def _validate_cert_entity(self, entity: str, service_name: str = '', host: str = '') -> None:
+ if entity not in self.known_certs.keys():
+ raise OrchestratorError(f'Attempted to access cert for unknown entity {entity}')
+
+ if entity in self.host_cert and not host:
+ raise OrchestratorError(f'Need host to access cert for entity {entity}')
+
+ if entity in self.service_name_cert and not service_name:
+ raise OrchestratorError(f'Need service name to access cert for entity {entity}')
+
+ def cert_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
+ ls: Dict[str, Any] = {}
+ for k, v in self.known_certs.items():
+ if k in self.service_name_cert or k in self.host_cert:
+ tmp: Dict[str, Any] = {key: True for key in v if v[key]}
+ ls[k] = tmp if tmp else False
+ else:
+ ls[k] = bool(v)
+ return ls
+
+ def get_key(self, entity: str, service_name: str = '', host: str = '') -> str:
+ self._validate_key_entity(entity, host)
+
+ key = PrivKey()
+ if entity in self.host_key or entity in self.service_name_key:
+ var = service_name if entity in self.service_name_key else host
+ if var not in self.known_keys[entity]:
+ return ''
+ key = self.known_keys[entity][var]
+ else:
+ key = self.known_keys[entity]
+ if not key or not isinstance(key, PrivKey):
+ return ''
+ return key.key
+
+ def save_key(self, entity: str, key: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
+ self._validate_key_entity(entity, host)
+
+ pkey = PrivKey(key, user_made)
+
+ j: Union[str, Dict[Any, Any], None] = None
+ if entity in self.host_key or entity in self.service_name_key:
+ var = service_name if entity in self.service_name_key else host
+ j = {}
+ self.known_keys[entity][var] = pkey
+ for k in self.known_keys[entity]:
+ j[k] = PrivKey.to_json(self.known_keys[entity][k])
+ else:
+ self.known_keys[entity] = pkey
+ j = PrivKey.to_json(pkey)
+ self.mgr.set_store(CERT_STORE_KEY_PREFIX + entity, json.dumps(j))
+
+ def rm_key(self, entity: str, service_name: str = '', host: str = '') -> None:
+ self.save_key(entity, key='', service_name=service_name, host=host)
+
+ def _validate_key_entity(self, entity: str, host: str = '') -> None:
+ if entity not in self.known_keys.keys():
+ raise OrchestratorError(f'Attempted to access priv key for unknown entity {entity}')
+
+ if entity in self.host_key and not host:
+ raise OrchestratorError(f'Need host to access priv key for entity {entity}')
+
+ def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
+ ls: Dict[str, Any] = {}
+ for k, v in self.known_keys.items():
+ if k in self.host_key or k in self.service_name_key:
+ tmp: Dict[str, Any] = {key: True for key in v if v[key]}
+ ls[k] = tmp if tmp else False
+ else:
+ ls[k] = bool(v)
+ return ls
+
+ def load(self) -> None:
+ for k, v in self.mgr.get_store_prefix(CERT_STORE_CERT_PREFIX).items():
+ entity = k[len(CERT_STORE_CERT_PREFIX):]
+ self.known_certs[entity] = json.loads(v)
+ if entity in self.service_name_cert or entity in self.host_cert:
+ for k in self.known_certs[entity]:
+ cert_obj = Cert.from_json(self.known_certs[entity][k])
+ self.known_certs[entity][k] = cert_obj
+ else:
+ cert_obj = Cert.from_json(self.known_certs[entity])
+ self.known_certs[entity] = cert_obj
+
+ for k, v in self.mgr.get_store_prefix(CERT_STORE_KEY_PREFIX).items():
+ entity = k[len(CERT_STORE_KEY_PREFIX):]
+ self.known_keys[entity] = json.loads(v)
+ if entity in self.host_key or entity in self.service_name_key:
+ for k in self.known_keys[entity]:
+ priv_key_obj = PrivKey.from_json(self.known_keys[entity][k])
+ self.known_keys[entity][k] = priv_key_obj
+ else:
+ priv_key_obj = PrivKey.from_json(self.known_keys[entity])
+ self.known_keys[entity] = priv_key_obj
+
+
class EventStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
-from cephadm.inventory import HostCacheStatus, ClientKeyringSpec
+from cephadm.inventory import (
+ HostCacheStatus,
+ ClientKeyringSpec,
+ Cert,
+ PrivKey,
+ CERT_STORE_CERT_PREFIX,
+ CERT_STORE_KEY_PREFIX,
+)
from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
from cephadm.utils import SpecialHostLabels
assert cephadm_module.cache._get_host_cache_entry_status(
'host.nothing.com') == HostCacheStatus.stray
+ @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+ def test_cert_store_save_cert(self, _set_store, cephadm_module: CephadmOrchestrator):
+ cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+ agent_endpoint_root_cert = 'fake-agent-cert'
+ alertmanager_host1_cert = 'fake-alertm-host1-cert'
+ rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
+ cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_root_cert)
+ cephadm_module.cert_key_store.save_cert('alertmanager_cert', alertmanager_host1_cert, host='host1')
+ cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
+
+ expected_calls = [
+ mock.call(f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert', json.dumps(Cert(agent_endpoint_root_cert).to_json())),
+ mock.call(f'{CERT_STORE_CERT_PREFIX}alertmanager_cert', json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()})),
+ mock.call(f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+ def test_cert_store_cert_ls(self, _set_store, cephadm_module: CephadmOrchestrator):
+ cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+ expected_ls = {
+ 'rgw_frontend_ssl_cert': False,
+ 'iscsi_ssl_cert': False,
+ 'ingress_ssl_cert': False,
+ 'agent_endpoint_root_cert': False,
+ 'service_discovery_root_cert': False,
+ 'grafana_cert': False,
+ 'alertmanager_cert': False,
+ 'prometheus_cert': False,
+ 'node_exporter_cert': False,
+ }
+ assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+ cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', 'xxx')
+ expected_ls['agent_endpoint_root_cert'] = True
+ assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+ cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host1')
+ cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host2')
+ expected_ls['alertmanager_cert'] = {}
+ expected_ls['alertmanager_cert']['host1'] = True
+ expected_ls['alertmanager_cert']['host2'] = True
+ assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+ cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.foo', user_made=True)
+ cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.bar', user_made=True)
+ expected_ls['rgw_frontend_ssl_cert'] = {}
+ expected_ls['rgw_frontend_ssl_cert']['rgw.foo'] = True
+ expected_ls['rgw_frontend_ssl_cert']['rgw.bar'] = True
+ assert cephadm_module.cert_key_store.cert_ls() == expected_ls
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+ def test_cert_store_save_key(self, _set_store, cephadm_module: CephadmOrchestrator):
+ cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+ agent_endpoint_key = 'fake-agent-key'
+ grafana_host1_key = 'fake-grafana-host1-cert'
+ cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
+ cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
+
+ expected_calls = [
+ mock.call(f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key', json.dumps(PrivKey(agent_endpoint_key).to_json())),
+ mock.call(f'{CERT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+ def test_cert_store_key_ls(self, _set_store, cephadm_module: CephadmOrchestrator):
+ cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+ expected_ls = {
+ 'agent_endpoint_key': False,
+ 'service_discovery_key': False,
+ 'grafana_key': False,
+ 'alertmanager_key': False,
+ 'prometheus_key': False,
+ 'node_exporter_key': False,
+ 'iscsi_ssl_key': False,
+ 'ingress_ssl_key': False,
+ }
+ assert cephadm_module.cert_key_store.key_ls() == expected_ls
+
+ cephadm_module.cert_key_store.save_key('agent_endpoint_key', 'xxx')
+ expected_ls['agent_endpoint_key'] = True
+ assert cephadm_module.cert_key_store.key_ls() == expected_ls
+
+ cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host1')
+ cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host2')
+ expected_ls['alertmanager_key'] = {}
+ expected_ls['alertmanager_key']['host1'] = True
+ expected_ls['alertmanager_key']['host2'] = True
+ assert cephadm_module.cert_key_store.key_ls() == expected_ls
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix")
+ def test_cert_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator):
+ cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+ agent_endpoint_root_cert = 'fake-agent-cert'
+ alertmanager_host1_cert = 'fake-alertm-host1-cert'
+ rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
+ agent_endpoint_key = 'fake-agent-key'
+ grafana_host1_key = 'fake-grafana-host1-cert'
+
+ def _fake_prefix_store(key):
+ if key == 'cert_store.cert.':
+ return {
+ f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert': json.dumps(Cert(agent_endpoint_root_cert).to_json()),
+ f'{CERT_STORE_CERT_PREFIX}alertmanager_cert': json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()}),
+ f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})
+ }
+ elif key == 'cert_store.key.':
+ return {
+ f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key': json.dumps(PrivKey(agent_endpoint_key).to_json()),
+ f'{CERT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}),
+ }
+ else:
+ raise Exception(f'Get store with unexpected value {key}')
+
+ _get_store_prefix.side_effect = _fake_prefix_store
+ cephadm_module.cert_key_store.load()
+ assert cephadm_module.cert_key_store.known_certs['agent_endpoint_root_cert'] == Cert(agent_endpoint_root_cert)
+ assert cephadm_module.cert_key_store.known_certs['alertmanager_cert']['host1'] == Cert(alertmanager_host1_cert)
+ assert cephadm_module.cert_key_store.known_certs['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True)
+ assert cephadm_module.cert_key_store.known_keys['agent_endpoint_key'] == PrivKey(agent_endpoint_key)
+ assert cephadm_module.cert_key_store.known_keys['grafana_key']['host1'] == PrivKey(grafana_host1_key)
+
+ def test_cert_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.cert_key_store._init_known_cert_key_dicts()
+
+ agent_endpoint_root_cert = 'fake-agent-cert'
+ alertmanager_host1_cert = 'fake-alertm-host1-cert'
+ rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
+ cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_root_cert)
+ cephadm_module.cert_key_store.save_cert('alertmanager_cert', alertmanager_host1_cert, host='host1')
+ cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
+
+ assert cephadm_module.cert_key_store.get_cert('agent_endpoint_root_cert') == agent_endpoint_root_cert
+ assert cephadm_module.cert_key_store.get_cert('alertmanager_cert', host='host1') == alertmanager_host1_cert
+ assert cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert
+ assert cephadm_module.cert_key_store.get_cert('service_discovery_root_cert') == ''
+ assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host1') == ''
+ assert cephadm_module.cert_key_store.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') == ''
+
+ with pytest.raises(OrchestratorError, match='Attempted to access cert for unknown entity'):
+ cephadm_module.cert_key_store.get_cert('unknown_entity')
+ with pytest.raises(OrchestratorError, match='Need host to access cert for entity'):
+ cephadm_module.cert_key_store.get_cert('grafana_cert')
+ with pytest.raises(OrchestratorError, match='Need service name to access cert for entity'):
+ cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', host='foo')
+
+ agent_endpoint_key = 'fake-agent-key'
+ grafana_host1_key = 'fake-grafana-host1-cert'
+ cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
+ cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
+
+ assert cephadm_module.cert_key_store.get_key('agent_endpoint_key') == agent_endpoint_key
+ assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1') == grafana_host1_key
+ assert cephadm_module.cert_key_store.get_key('service_discovery_key') == ''
+ assert cephadm_module.cert_key_store.get_key('alertmanager_key', host='host1') == ''
+
+ with pytest.raises(OrchestratorError, match='Attempted to access priv key for unknown entity'):
+ cephadm_module.cert_key_store.get_key('unknown_entity')
+ with pytest.raises(OrchestratorError, match='Need host to access priv key for entity'):
+ cephadm_module.cert_key_store.get_key('grafana_key')
+
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
@mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())