+from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any
+import logging
+import copy
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
from typing import TYPE_CHECKING, Tuple, Union, List, Optional
+from cephadm.tlsobject_types import Cert, PrivKey
+from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
+logger = logging.getLogger(__name__)
+
class CertMgr:
CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
- def __init__(self, mgr: "CephadmOrchestrator", ip: str) -> None:
- self.ssl_certs: SSLCerts = SSLCerts(mgr._cluster_fsid)
- old_cert = mgr.cert_key_store.get_cert(self.CEPHADM_ROOT_CA_CERT)
- old_key = mgr.cert_key_store.get_key(self.CEPHADM_ROOT_CA_KEY)
+ # In an effort to try and track all the certs we manage in cephadm
+ # we're being explicit here and listing them out.
+
+ ####################################################
+ # cephadm certmgr known Certificates section
+ known_certs = {
+ TLSObjectScope.SERVICE: [
+ 'iscsi_ssl_cert',
+ 'rgw_frontend_ssl_cert',
+ 'ingress_ssl_cert',
+ 'nvmeof_server_cert',
+ 'nvmeof_client_cert',
+ 'nvmeof_root_ca_cert',
+ ],
+ TLSObjectScope.HOST: [
+ 'grafana_cert',
+ ],
+ TLSObjectScope.GLOBAL: [
+ 'mgmt_gw_cert',
+ 'oauth2_proxy_cert',
+ CEPHADM_ROOT_CA_CERT,
+ ],
+ }
+
+ ####################################################
+ # cephadm certmgr known Keys section
+ known_keys = {
+ TLSObjectScope.SERVICE: [
+ 'iscsi_ssl_key',
+ 'ingress_ssl_key',
+ 'nvmeof_server_key',
+ 'nvmeof_client_key',
+ 'nvmeof_encryption_key',
+ ],
+ TLSObjectScope.HOST: [
+ 'grafana_key',
+ ],
+ TLSObjectScope.GLOBAL: [
+ 'mgmt_gw_key',
+ 'oauth2_proxy_key',
+ CEPHADM_ROOT_CA_KEY,
+ ],
+ }
+
+ cert_to_service = {
+ 'rgw_frontend_ssl_cert': 'rgw',
+ 'iscsi_ssl_cert': 'iscsi',
+ 'ingress_ssl_cert': 'ingress',
+ 'nvmeof_server_cert': 'nvmeof',
+ 'nvmeof_client_cert': 'nvmeof',
+ 'nvmeof_root_ca_cert': 'nvmeof',
+ 'mgmt_gw_cert': 'mgmt-gateway',
+ 'oauth2_proxy_cert': 'oauth2-proxy',
+ 'grafana_cert': 'grafana',
+ }
+
+ def __init__(self,
+ mgr: "CephadmOrchestrator",
+ certificate_automated_rotation_enabled: bool,
+ certificate_duration_days: int,
+ renewal_threshold_days: int,
+ mgr_ip: str) -> None:
+ self.mgr = mgr
+ self.mgr_ip = mgr_ip
+ self.certificate_automated_rotation_enabled = certificate_automated_rotation_enabled
+ self.certificate_duration_days = certificate_duration_days
+ self.renewal_threshold_days = renewal_threshold_days
+ self._init_tlsobject_store()
+ self._initialize_root_ca(mgr_ip)
+
+ def _init_tlsobject_store(self) -> None:
+ self.cert_store = TLSObjectStore(self.mgr, Cert, self.known_certs)
+ self.cert_store.load()
+ self.key_store = TLSObjectStore(self.mgr, PrivKey, self.known_keys)
+ self.key_store.load()
+
+ def load(self) -> None:
+ self.cert_store.load()
+ self.key_store.load()
+
+ def _initialize_root_ca(self, ip: str) -> None:
+ self.ssl_certs: SSLCerts = SSLCerts(self.certificate_duration_days)
+ old_cert = cast(Cert, self.cert_store.get_tlsobject(self.CEPHADM_ROOT_CA_CERT))
+ old_key = cast(PrivKey, self.key_store.get_tlsobject(self.CEPHADM_ROOT_CA_KEY))
if old_key and old_cert:
try:
- self.ssl_certs.load_root_credentials(old_cert, old_key)
- except SSLConfigException:
- raise Exception("Cannot load cephadm root CA certificates.")
+ self.ssl_certs.load_root_credentials(old_cert.cert, old_key.key)
+ except SSLConfigException as e:
+ raise SSLConfigException("Cannot load cephadm root CA certificates.") from e
else:
self.ssl_certs.generate_root_cert(addr=ip)
- mgr.cert_key_store.save_cert(self.CEPHADM_ROOT_CA_CERT, self.ssl_certs.get_root_cert())
- mgr.cert_key_store.save_key(self.CEPHADM_ROOT_CA_KEY, self.ssl_certs.get_root_key())
+ self.cert_store.save_tlsobject(self.CEPHADM_ROOT_CA_CERT, self.ssl_certs.get_root_cert())
+ self.key_store.save_tlsobject(self.CEPHADM_ROOT_CA_KEY, self.ssl_certs.get_root_key())
def get_root_ca(self) -> str:
return self.ssl_certs.get_root_cert()
custom_san_list: Optional[List[str]] = None,
) -> Tuple[str, str]:
return self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list)
+
+ def get_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[str]:
+ cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
+ return cert_obj.cert if cert_obj else None
+
+ def get_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[str]:
+ key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name, host))
+ return key_obj.key if key_obj else None
+
+ def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
+ self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made)
+
+ def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
+ self.key_store.save_tlsobject(key_name, key, service_name, host, user_made)
+
+ def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+ self.cert_store.rm_tlsobject(cert_name, service_name, host)
+
+ def rm_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+ self.key_store.rm_tlsobject(key_name, service_name, host)
+
+ def cert_ls(self) -> Dict[str, Union[bool, Dict[str, Dict[str, bool]]]]:
+ ls: Dict = copy.deepcopy(self.cert_store.get_tlsobjects())
+ for k, v in ls.items():
+ if isinstance(v, dict):
+ tmp: Dict[str, Any] = {key: get_certificate_info(cast(Cert, v[key]).cert) for key in v if isinstance(v[key], Cert)}
+ ls[k] = tmp if tmp else {}
+ elif isinstance(v, Cert):
+ ls[k] = get_certificate_info(cast(Cert, v).cert) if bool(v) else False
+ return ls
+
+ def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
+ ls: Dict = copy.deepcopy(self.key_store.get_tlsobjects())
+ if self.CEPHADM_ROOT_CA_KEY in ls:
+ del ls[self.CEPHADM_ROOT_CA_KEY]
+ for k, v in ls.items():
+ if isinstance(v, dict) and v:
+ tmp: Dict[str, Any] = {key: get_private_key_info(cast(PrivKey, v[key]).key) for key in v if v[key]}
+ ls[k] = tmp if tmp else {}
+ elif isinstance(v, PrivKey):
+ ls[k] = get_private_key_info(cast(PrivKey, v).key)
+ return ls
+
+ def list_entity_known_certificates(self, entity: str) -> List[str]:
+ return [cert_name for cert_name, service in self.cert_to_service.items() if service == entity]
+
+ def entity_ls(self, get_scope: bool = False) -> List[Union[str, Tuple[str, str]]]:
+ if get_scope:
+ return [(entity, self.determine_scope(entity)) for entity in set(self.cert_to_service.values())]
+ else:
+ return list(self.cert_to_service.values())
+
+ def determine_scope(self, entity: str) -> str:
+ for cert, service in self.cert_to_service.items():
+ if service == entity:
+ if cert in self.known_certs[TLSObjectScope.SERVICE]:
+ return TLSObjectScope.SERVICE.value
+ elif cert in self.known_certs[TLSObjectScope.HOST]:
+ return TLSObjectScope.HOST.value
+ elif cert in self.known_certs[TLSObjectScope.GLOBAL]:
+ return TLSObjectScope.GLOBAL.value
+ return TLSObjectScope.UNKNOWN.value
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'
NODE_PROXY_CACHE_PREFIX = 'node_proxy'
-CERT_STORE_CERT_PREFIX = 'cert_store.cert.'
-CERT_STORE_KEY_PREFIX = 'cert_store.key.'
class HostCacheStatus(enum.Enum):
else:
cert_str = rgw_cert
assert isinstance(cert_str, str)
- self.mgr.cert_key_store.save_cert(
+ self.mgr.cert_mgr.save_cert(
'rgw_frontend_ssl_cert',
cert_str,
service_name=rgw_spec.service_name(),
elif spec.service_type == 'iscsi':
iscsi_spec = cast(IscsiServiceSpec, spec)
if iscsi_spec.ssl_cert:
- self.mgr.cert_key_store.save_cert(
+ self.mgr.cert_mgr.save_cert(
'iscsi_ssl_cert',
iscsi_spec.ssl_cert,
service_name=iscsi_spec.service_name(),
user_made=True)
if iscsi_spec.ssl_key:
- self.mgr.cert_key_store.save_key(
+ self.mgr.cert_mgr.save_key(
'iscsi_ssl_key',
iscsi_spec.ssl_key,
service_name=iscsi_spec.service_name(),
elif spec.service_type == 'ingress':
ingress_spec = cast(IngressSpec, spec)
if ingress_spec.ssl_cert:
- self.mgr.cert_key_store.save_cert(
+ self.mgr.cert_mgr.save_cert(
'ingress_ssl_cert',
ingress_spec.ssl_cert,
service_name=ingress_spec.service_name(),
user_made=True)
if ingress_spec.ssl_key:
- self.mgr.cert_key_store.save_key(
+ self.mgr.cert_mgr.save_key(
'ingress_ssl_key',
ingress_spec.ssl_key,
service_name=ingress_spec.service_name(),
]:
cert = getattr(nvmeof_spec, cert_attr, None)
if cert:
- self.mgr.cert_key_store.save_cert(
+ self.mgr.cert_mgr.save_cert(
f'nvmeof_{cert_attr}',
cert,
service_name=nvmeof_spec.service_name(),
]:
key = getattr(nvmeof_spec, key_attr, None)
if key:
- self.mgr.cert_key_store.save_key(
+ self.mgr.cert_mgr.save_key(
f'nvmeof_{key_attr}',
key,
service_name=nvmeof_spec.service_name(),
def _rm_certs_and_keys(self, spec: ServiceSpec) -> None:
if spec.service_type == 'rgw':
- self.mgr.cert_key_store.rm_cert('rgw_frontend_ssl_cert', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_cert('rgw_frontend_ssl_cert', service_name=spec.service_name())
if spec.service_type == 'iscsi':
- self.mgr.cert_key_store.rm_cert('iscsi_ssl_cert', service_name=spec.service_name())
- self.mgr.cert_key_store.rm_key('iscsi_ssl_key', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_cert('iscsi_ssl_cert', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_key('iscsi_ssl_key', service_name=spec.service_name())
if spec.service_type == 'ingress':
- self.mgr.cert_key_store.rm_cert('ingress_ssl_cert', service_name=spec.service_name())
- self.mgr.cert_key_store.rm_key('ingress_ssl_key', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_cert('ingress_ssl_cert', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_key('ingress_ssl_key', service_name=spec.service_name())
if spec.service_type == 'nvmeof':
- self.mgr.cert_key_store.rm_cert('nvmeof_server_cert', service_name=spec.service_name())
- self.mgr.cert_key_store.rm_cert('nvmeof_client_cert', service_name=spec.service_name())
- self.mgr.cert_key_store.rm_cert('nvmeof_root_ca_cert', service_name=spec.service_name())
- self.mgr.cert_key_store.rm_key('nvmeof_server_key', service_name=spec.service_name())
- self.mgr.cert_key_store.rm_key('nvmeof_client_key', service_name=spec.service_name())
- self.mgr.cert_key_store.rm_key('nvmeof_encryption_key', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_cert('nvmeof_server_cert', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_cert('nvmeof_client_cert', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_cert('nvmeof_root_ca_cert', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_key('nvmeof_server_key', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_key('nvmeof_client_key', service_name=spec.service_name())
+ self.mgr.cert_mgr.rm_key('nvmeof_encryption_key', service_name=spec.service_name())
def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
return self.spec_created.get(spec.service_name())
self.save_agent(daemon_spec.host)
-class Cert():
- def __init__(self, cert: str = '', user_made: bool = False) -> None:
- self.cert = cert
- self.user_made = user_made
-
- def __bool__(self) -> bool:
- return bool(self.cert)
-
- def __eq__(self, other: Any) -> bool:
- if isinstance(other, Cert):
- return self.cert == other.cert and self.user_made == other.user_made
- return NotImplemented
-
- def to_json(self) -> Dict[str, Union[str, bool]]:
- return {
- 'cert': self.cert,
- 'user_made': self.user_made
- }
-
- @classmethod
- def from_json(cls, data: Dict[str, Union[str, bool]]) -> 'Cert':
- if 'cert' not in data:
- return cls()
- cert = data['cert']
- if not isinstance(cert, str):
- raise OrchestratorError('Tried to make Cert object with non-string cert')
- if any(k not in ['cert', 'user_made'] for k in data.keys()):
- raise OrchestratorError(f'Got unknown field for Cert object. Fields: {data.keys()}')
- user_made: Union[str, bool] = data.get('user_made', False)
- if not isinstance(user_made, bool):
- if isinstance(user_made, str):
- if user_made.lower() == 'true':
- user_made = True
- elif user_made.lower() == 'false':
- user_made = False
- try:
- user_made = bool(user_made)
- except Exception:
- raise OrchestratorError(f'Expected user_made field in Cert object to be bool but got {type(user_made)}')
- return cls(cert=cert, user_made=user_made)
-
-
-class PrivKey():
- def __init__(self, key: str = '', user_made: bool = False) -> None:
- self.key = key
- self.user_made = user_made
-
- def __bool__(self) -> bool:
- return bool(self.key)
-
- def __eq__(self, other: Any) -> bool:
- if isinstance(other, PrivKey):
- return self.key == other.key and self.user_made == other.user_made
- return NotImplemented
-
- def to_json(self) -> Dict[str, Union[str, bool]]:
- return {
- 'key': self.key,
- 'user_made': self.user_made
- }
-
- @classmethod
- def from_json(cls, data: Dict[str, str]) -> 'PrivKey':
- if 'key' not in data:
- return cls()
- key = data['key']
- if not isinstance(key, str):
- raise OrchestratorError('Tried to make PrivKey object with non-string key')
- if any(k not in ['key', 'user_made'] for k in data.keys()):
- raise OrchestratorError(f'Got unknown field for PrivKey object. Fields: {data.keys()}')
- user_made: Union[str, bool] = data.get('user_made', False)
- if not isinstance(user_made, bool):
- if isinstance(user_made, str):
- if user_made.lower() == 'true':
- user_made = True
- elif user_made.lower() == 'false':
- user_made = False
- try:
- user_made = bool(user_made)
- except Exception:
- raise OrchestratorError(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}')
- return cls(key=key, user_made=user_made)
-
-
-class CertKeyStore():
- service_name_cert = [
- 'rgw_frontend_ssl_cert',
- 'iscsi_ssl_cert',
- 'ingress_ssl_cert',
- 'nvmeof_server_cert',
- 'nvmeof_client_cert',
- 'nvmeof_root_ca_cert',
- ]
-
- host_cert = [
- 'grafana_cert',
- ]
-
- host_key = [
- 'grafana_key',
- ]
-
- service_name_key = [
- 'iscsi_ssl_key',
- 'ingress_ssl_key',
- 'nvmeof_server_key',
- 'nvmeof_client_key',
- 'nvmeof_encryption_key',
- ]
-
- known_certs: Dict[str, Any] = {}
- known_keys: Dict[str, Any] = {}
-
- def __init__(self, mgr: 'CephadmOrchestrator') -> None:
- self.mgr: CephadmOrchestrator = mgr
- self._init_known_cert_key_dicts()
-
- def _init_known_cert_key_dicts(self) -> None:
- # In an effort to try and track all the certs we manage in cephadm
- # we're being explicit here and listing them out.
- self.known_certs = {
- 'rgw_frontend_ssl_cert': {}, # service-name -> cert
- 'iscsi_ssl_cert': {}, # service-name -> cert
- 'ingress_ssl_cert': {}, # service-name -> cert
- 'nvmeof_server_cert': {}, # service-name -> cert
- 'nvmeof_client_cert': {}, # service-name -> cert
- 'nvmeof_root_ca_cert': {}, # service-name -> cert
- 'mgmt_gw_cert': Cert(), # cert
- 'oauth2_proxy_cert': Cert(), # cert
- 'cephadm_root_ca_cert': Cert(), # cert
- 'grafana_cert': {}, # host -> cert
- }
- # Similar to certs but for priv keys. Entries in known_certs
- # that don't have a key here are probably certs in PEM format
- # so there is no need to store a separate key
- self.known_keys = {
- 'mgmt_gw_key': PrivKey(), # cert
- 'oauth2_proxy_key': PrivKey(), # cert
- 'cephadm_root_ca_key': PrivKey(), # cert
- 'grafana_key': {}, # host -> key
- 'iscsi_ssl_key': {}, # service-name -> key
- 'ingress_ssl_key': {}, # service-name -> key
- 'nvmeof_server_key': {}, # service-name -> key
- 'nvmeof_client_key': {}, # service-name -> key
- 'nvmeof_encryption_key': {}, # service-name -> key
- }
-
- def get_cert(self, entity: str, service_name: str = '', host: str = '') -> str:
- self._validate_cert_entity(entity, service_name, host)
-
- cert = Cert()
- if entity in self.service_name_cert or entity in self.host_cert:
- var = service_name if entity in self.service_name_cert else host
- if var not in self.known_certs[entity]:
- return ''
- cert = self.known_certs[entity][var]
- else:
- cert = self.known_certs[entity]
- if not cert or not isinstance(cert, Cert):
- return ''
- return cert.cert
-
- def save_cert(self, entity: str, cert: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
- self._validate_cert_entity(entity, service_name, host)
-
- cert_obj = Cert(cert, user_made)
-
- j: Union[str, Dict[Any, Any], None] = None
- if entity in self.service_name_cert or entity in self.host_cert:
- var = service_name if entity in self.service_name_cert else host
- j = {}
- self.known_certs[entity][var] = cert_obj
- for cert_key in self.known_certs[entity]:
- j[cert_key] = Cert.to_json(self.known_certs[entity][cert_key])
- else:
- self.known_certs[entity] = cert_obj
- j = Cert.to_json(cert_obj)
- self.mgr.set_store(CERT_STORE_CERT_PREFIX + entity, json.dumps(j))
-
- def rm_cert(self, entity: str, service_name: str = '', host: str = '') -> None:
- self.save_cert(entity, cert='', service_name=service_name, host=host)
-
- def _validate_cert_entity(self, entity: str, service_name: str = '', host: str = '') -> None:
- if entity not in self.known_certs.keys():
- raise OrchestratorError(f'Attempted to access cert for unknown entity {entity}')
-
- if entity in self.host_cert and not host:
- raise OrchestratorError(f'Need host to access cert for entity {entity}')
-
- if entity in self.service_name_cert and not service_name:
- raise OrchestratorError(f'Need service name to access cert for entity {entity}')
-
- def cert_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
- ls: Dict[str, Any] = {}
- for k, v in self.known_certs.items():
- if k in self.service_name_cert or k in self.host_cert:
- tmp: Dict[str, Any] = {key: True for key in v if v[key]}
- ls[k] = tmp if tmp else False
- else:
- ls[k] = bool(v)
- return ls
-
- def get_key(self, entity: str, service_name: str = '', host: str = '') -> str:
- self._validate_key_entity(entity, host)
-
- key = PrivKey()
- if entity in self.host_key or entity in self.service_name_key:
- var = service_name if entity in self.service_name_key else host
- if var not in self.known_keys[entity]:
- return ''
- key = self.known_keys[entity][var]
- else:
- key = self.known_keys[entity]
- if not key or not isinstance(key, PrivKey):
- return ''
- return key.key
-
- def save_key(self, entity: str, key: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
- self._validate_key_entity(entity, host)
-
- pkey = PrivKey(key, user_made)
-
- j: Union[str, Dict[Any, Any], None] = None
- if entity in self.host_key or entity in self.service_name_key:
- var = service_name if entity in self.service_name_key else host
- j = {}
- self.known_keys[entity][var] = pkey
- for k in self.known_keys[entity]:
- j[k] = PrivKey.to_json(self.known_keys[entity][k])
- else:
- self.known_keys[entity] = pkey
- j = PrivKey.to_json(pkey)
- self.mgr.set_store(CERT_STORE_KEY_PREFIX + entity, json.dumps(j))
-
- def rm_key(self, entity: str, service_name: str = '', host: str = '') -> None:
- self.save_key(entity, key='', service_name=service_name, host=host)
-
- def _validate_key_entity(self, entity: str, host: str = '') -> None:
- if entity not in self.known_keys.keys():
- raise OrchestratorError(f'Attempted to access priv key for unknown entity {entity}')
-
- if entity in self.host_key and not host:
- raise OrchestratorError(f'Need host to access priv key for entity {entity}')
-
- def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
- ls: Dict[str, Any] = {}
- for k, v in self.known_keys.items():
- if k in self.host_key or k in self.service_name_key:
- tmp: Dict[str, Any] = {key: True for key in v if v[key]}
- ls[k] = tmp if tmp else False
- else:
- ls[k] = bool(v)
- return ls
-
- def load(self) -> None:
- for k, v in self.mgr.get_store_prefix(CERT_STORE_CERT_PREFIX).items():
- entity = k[len(CERT_STORE_CERT_PREFIX):]
- self.known_certs[entity] = json.loads(v)
- if entity in self.service_name_cert or entity in self.host_cert:
- for k in self.known_certs[entity]:
- cert_obj = Cert.from_json(self.known_certs[entity][k])
- self.known_certs[entity][k] = cert_obj
- else:
- cert_obj = Cert.from_json(self.known_certs[entity])
- self.known_certs[entity] = cert_obj
-
- for k, v in self.mgr.get_store_prefix(CERT_STORE_KEY_PREFIX).items():
- entity = k[len(CERT_STORE_KEY_PREFIX):]
- self.known_keys[entity] = json.loads(v)
- if entity in self.host_key or entity in self.service_name_key:
- for k in self.known_keys[entity]:
- priv_key_obj = PrivKey.from_json(self.known_keys[entity][k])
- self.known_keys[entity][k] = priv_key_obj
- else:
- priv_key_obj = PrivKey.from_json(self.known_keys[entity])
- self.known_keys[entity] = priv_key_obj
-
-
class EventStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
grafana_cert = self.mgr.get_store(grafana_cert_path)
if grafana_cert:
logger.info(f'Migrating {grafana_daemon.name()} cert to cert store')
- self.mgr.cert_key_store.save_cert('grafana_cert', grafana_cert, host=hostname)
+ self.mgr.cert_mgr.save_cert('grafana_cert', grafana_cert, host=hostname)
grafana_key = self.mgr.get_store(grafana_key_path)
if grafana_key:
logger.info(f'Migrating {grafana_daemon.name()} key to cert store')
- self.mgr.cert_key_store.save_key('grafana_key', grafana_key, host=hostname)
+ self.mgr.cert_mgr.save_key('grafana_key', grafana_key, host=hostname)
# NOTE: prometheus, alertmanager, and node-exporter certs were not stored
# and appeared to just be generated at daemon deploy time if secure_monitoring_stack
ClientKeyringSpec,
TunedProfileStore,
NodeProxyCache,
- CertKeyStore,
OrchSecretNotFound,
)
from .upgrade import CephadmUpgrade
self.tuned_profile_utils = TunedProfileUtils(self)
- self.cert_key_store = CertKeyStore(self)
- self.cert_key_store.load()
-
self.cert_mgr = CertMgr(self, self.get_mgr_ip())
# ensure the host lists are in sync
@handle_orch_error
def cert_store_key_ls(self) -> Dict[str, Any]:
- return self.cert_key_store.key_ls()
+ return self.cert_mgr.key_ls()
@handle_orch_error
def cert_store_get_cert(
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
) -> str:
- cert = self.cert_key_store.get_cert(entity, service_name or '', hostname or '')
+ cert = self.cert_mgr.get_cert(entity, service_name or '', hostname or '')
if not cert:
if no_exception_when_missing:
return ''
hostname: Optional[str] = None,
no_exception_when_missing: bool = False
) -> str:
- key = self.cert_key_store.get_key(entity, service_name or '', hostname or '')
+ key = self.cert_mgr.get_key(entity, service_name or '', hostname or '')
if not key:
if no_exception_when_missing:
return ''
}
self.set_health_checks(self.health_checks)
+ def set_health_error(self, name: str, summary: str, count: int, detail: List[str]) -> None:
+ self.health_checks[name] = {
+ 'severity': 'error',
+ 'summary': summary,
+ 'count': count,
+ 'detail': detail,
+ }
+ self.set_health_checks(self.health_checks)
+
def remove_health_warning(self, name: str) -> None:
if name in self.health_checks:
del self.health_checks[name]
for d in self.mgr.cache.get_daemons_by_type('grafana'):
host = d.hostname
assert host is not None
- cert = self.mgr.cert_key_store.get_cert('grafana_cert', host=host)
- key = self.mgr.cert_key_store.get_key('grafana_key', host=host)
+ cert = self.mgr.cert_mgr.get_cert('grafana_cert', host=host)
+ key = self.mgr.cert_mgr.get_key('grafana_key', host=host)
if (not cert or not cert.strip()) and (not key or not key.strip()):
# certificate/key are empty... nothing to check
return
self.mgr.set_module_option_ex('dashboard', 'standby_behaviour', 'error')
def get_external_certificates(self, svc_spec: MgmtGatewaySpec, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
- cert = self.mgr.cert_key_store.get_cert('mgmt_gw_cert')
- key = self.mgr.cert_key_store.get_key('mgmt_gw_key')
+ cert = self.mgr.cert_mgr.get_cert('mgmt_gw_cert')
+ key = self.mgr.cert_mgr.get_key('mgmt_gw_key')
if not (cert and key):
# not available on store, check if provided on the spec
if svc_spec.ssl_certificate and svc_spec.ssl_certificate_key:
cert, key = self.mgr.cert_mgr.generate_cert(host_fqdn, ips)
# save certificates
if cert and key:
- self.mgr.cert_key_store.save_cert('mgmt_gw_cert', cert)
- self.mgr.cert_key_store.save_key('mgmt_gw_key', key)
+ self.mgr.cert_mgr.save_cert('mgmt_gw_cert', cert)
+ self.mgr.cert_mgr.save_key('mgmt_gw_key', key)
else:
logger.error("Failed to obtain certificate and key from mgmt-gateway.")
return cert, key
self.mgr.set_module_option_ex('dashboard', 'standby_behaviour', 'redirect')
if daemon.hostname is not None:
# delete cert/key entires for this mgmt-gateway daemon
- self.mgr.cert_key_store.rm_cert('mgmt_gw_cert')
- self.mgr.cert_key_store.rm_key('mgmt_gw_key')
+ self.mgr.cert_mgr.rm_cert('mgmt_gw_cert')
+ self.mgr.cert_mgr.rm_key('mgmt_gw_key')
return config_file, self.get_dependencies(self.mgr)
def prepare_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
- cert = self.mgr.cert_key_store.get_cert('grafana_cert', host=daemon_spec.host)
- pkey = self.mgr.cert_key_store.get_key('grafana_key', host=daemon_spec.host)
+ cert = self.mgr.cert_mgr.get_cert('grafana_cert', host=daemon_spec.host)
+ pkey = self.mgr.cert_mgr.get_key('grafana_key', host=daemon_spec.host)
certs_present = (cert and pkey)
is_valid_certificate = False
(org, cn) = (None, None)
node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
cert, pkey = self.mgr.cert_mgr.generate_cert([host_fqdn, "grafana_servers"], node_ip)
# cert, pkey = create_self_signed_cert('Ceph', host_fqdn)
- self.mgr.cert_key_store.save_cert('grafana_cert', cert, host=daemon_spec.host)
- self.mgr.cert_key_store.save_key('grafana_key', pkey, host=daemon_spec.host)
+ self.mgr.cert_mgr.save_cert('grafana_cert', cert, host=daemon_spec.host)
+ self.mgr.cert_mgr.save_key('grafana_key', pkey, host=daemon_spec.host)
if 'dashboard' in self.mgr.get('mgr_map')['modules']:
self.mgr.check_mon_command({
'prefix': 'dashboard set-grafana-api-ssl-verify',
"""
if daemon.hostname is not None:
# delete cert/key entires for this grafana daemon
- self.mgr.cert_key_store.rm_cert('grafana_cert', host=daemon.hostname)
- self.mgr.cert_key_store.rm_key('grafana_key', host=daemon.hostname)
+ self.mgr.cert_mgr.rm_cert('grafana_cert', host=daemon.hostname)
+ self.mgr.cert_mgr.rm_key('grafana_key', host=daemon.hostname)
def ok_to_stop(self,
daemon_ids: List[str],
return DaemonDescription()
def get_certificates(self, svc_spec: OAuth2ProxySpec, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
- cert = self.mgr.cert_key_store.get_cert('oauth2_proxy_cert')
- key = self.mgr.cert_key_store.get_key('oauth2_proxy_key')
+ cert = self.mgr.cert_mgr.get_cert('oauth2_proxy_cert')
+ key = self.mgr.cert_mgr.get_key('oauth2_proxy_key')
if not (cert and key):
# not available on store, check if provided on the spec
if svc_spec.ssl_certificate and svc_spec.ssl_certificate_key:
cert, key = self.mgr.cert_mgr.generate_cert(host_fqdn, addr)
# save certificates
if cert and key:
- self.mgr.cert_key_store.save_cert('oauth2_proxy_cert', cert)
- self.mgr.cert_key_store.save_key('oauth2_proxy_key', key)
+ self.mgr.cert_mgr.save_cert('oauth2_proxy_cert', cert)
+ self.mgr.cert_mgr.save_key('oauth2_proxy_key', key)
else:
logger.error("Failed to obtain certificate and key from mgmt-gateway.")
return cert, key
--- /dev/null
+from typing import Any, Dict, Union, List, Tuple, Optional, TYPE_CHECKING, Type
+from enum import Enum
+import json
+import logging
+
+from cephadm.tlsobject_types import TLSObjectProtocol, TLSObjectException
+
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+
+TLSOBJECT_STORE_PREFIX = 'cert_store.'
+
+
+logger = logging.getLogger(__name__)
+
+
+class TLSObjectScope(Enum):
+ HOST = "host"
+ SERVICE = "service"
+ GLOBAL = "global"
+ UNKNOWN = "unknown"
+
+
+class TLSObjectStore():
+
+ def __init__(self, mgr: 'CephadmOrchestrator',
+ tlsobject_class: Type[TLSObjectProtocol],
+ known_entities: Dict[TLSObjectScope, List[str]]) -> None:
+ self.mgr: CephadmOrchestrator = mgr
+ self.tlsobject_class = tlsobject_class
+ all_known_entities = [item for sublist in known_entities.values() for item in sublist]
+ self.known_entities: Dict[str, Any] = {key: {} for key in all_known_entities}
+ self.per_service_name_tlsobjects = known_entities[TLSObjectScope.SERVICE]
+ self.per_host_tlsobjects = known_entities[TLSObjectScope.HOST]
+ self.store_prefix = f'{TLSOBJECT_STORE_PREFIX}{tlsobject_class.STORAGE_PREFIX}.'
+
+ def determine_tlsobject_target(self, entity: str, target: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
+ if entity in self.per_service_name_tlsobjects:
+ return (target, None)
+ elif entity in self.per_host_tlsobjects:
+ return (None, target)
+ else:
+ return (None, None)
+
+ def get_tlsobject_scope_and_target(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Tuple[TLSObjectScope, Optional[Any]]:
+ if entity in self.per_service_name_tlsobjects:
+ return TLSObjectScope.SERVICE, service_name
+ elif entity in self.per_host_tlsobjects:
+ return TLSObjectScope.HOST, host
+ else:
+ return TLSObjectScope.GLOBAL, None
+
+ def get_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
+ self._validate_tlsobject_entity(entity, service_name, host)
+ scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+ if scope == TLSObjectScope.GLOBAL:
+ return self.known_entities.get(entity)
+ else:
+ return self.known_entities.get(entity, {}).get(target)
+
+ def save_tlsobject(self, entity: str, tlsobject: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
+ self._validate_tlsobject_entity(entity, service_name, host)
+ tlsobject = self.tlsobject_class(tlsobject, user_made)
+ scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+ j: Union[str, Dict[Any, Any], None] = None
+ if scope in {TLSObjectScope.SERVICE, TLSObjectScope.HOST}:
+ self.known_entities[entity][target] = tlsobject
+ j = {
+ key: self.tlsobject_class.to_json(self.known_entities[entity][key])
+ for key in self.known_entities[entity]
+ }
+ else:
+ self.known_entities[entity] = tlsobject
+ j = self.tlsobject_class.to_json(tlsobject)
+
+ self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+
+ def rm_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+ """Remove a tlsobjectificate for a specific entity, service, or host."""
+ self._validate_tlsobject_entity(entity, service_name, host)
+ scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+ j: Union[str, Dict[Any, Any], None] = None
+ if scope in {TLSObjectScope.SERVICE, TLSObjectScope.HOST}:
+ if entity in self.known_entities and target in self.known_entities[entity]:
+ del self.known_entities[entity][target]
+ j = {
+ key: self.tlsobject_class.to_json(self.known_entities[entity][key])
+ for key in self.known_entities[entity]
+ }
+ self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ elif scope == TLSObjectScope.GLOBAL:
+ self.known_entities[entity] = self.tlsobject_class()
+ j = self.tlsobject_class.to_json(self.known_entities[entity])
+ self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+ else:
+ raise TLSObjectException(f'Attempted to remove {self.tlsobject_class.__name__.lower()} for unknown entity {entity}')
+
+ def _validate_tlsobject_entity(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+ cred_type = self.tlsobject_class.__name__.lower()
+ if entity not in self.known_entities.keys():
+ raise TLSObjectException(f'Attempted to access {cred_type} for unknown entity {entity}')
+ if entity in self.per_host_tlsobjects and not host:
+ raise TLSObjectException(f'Need host to access {cred_type} for entity {entity}')
+ if entity in self.per_service_name_tlsobjects and not service_name:
+ raise TLSObjectException(f'Need service name to access {cred_type} for entity {entity}')
+
+ def list_tlsobjects(self) -> List[Tuple[str, Type[TLSObjectProtocol], Optional[str]]]:
+ """
+ Returns a shallow list of all known TLS objects, including their targets.
+
+ Returns:
+ List of tuples: (entity, tlsobject, target)
+ - entity: The TLS object entity name.
+ - tlsobject: The TLS object itself.
+ - target: The associated target (service_name, host, or None for global).
+ """
+ tlsobjects = []
+ for known_entity, value in self.known_entities.items():
+ if isinstance(value, dict): # Handle per-service or per-host TLS objects
+ for target, tlsobject in value.items():
+ if tlsobject:
+ tlsobjects.append((known_entity, tlsobject, target))
+ else: # Handle Global TLS objects
+ tlsobjects.append((known_entity, value, None))
+
+ return tlsobjects
+
+ def get_tlsobjects(self) -> Dict[str, Union[Type[TLSObjectProtocol], Dict[str, Type[TLSObjectProtocol]]]]:
+ return self.known_entities
+
+ def load(self) -> None:
+ for k, v in self.mgr.get_store_prefix(self.store_prefix).items():
+ entity = k[len(self.store_prefix):]
+ self.known_entities[entity] = json.loads(v)
+ if entity in self.per_service_name_tlsobjects or entity in self.per_host_tlsobjects:
+ for k in self.known_entities[entity]:
+ tlsobject = self.tlsobject_class.from_json(self.known_entities[entity][k])
+ self.known_entities[entity][k] = tlsobject
+ else:
+ tlsobject = self.tlsobject_class.from_json(self.known_entities[entity])
+ self.known_entities[entity] = tlsobject
--- /dev/null
+from typing import Any, Dict, Protocol, Union
+from orchestrator import OrchestratorError
+
+
+class TLSObjectException(OrchestratorError):
+ pass
+
+
+class TLSObjectProtocol(Protocol):
+ STORAGE_PREFIX: str
+
+ def __init__(self, cert: str = '', user_made: bool = False) -> None:
+ ...
+
+ def __bool__(self) -> bool:
+ ...
+
+ def __eq__(self, other: Any) -> bool:
+ ...
+
+ def to_json(self) -> Dict[str, Union[str, bool]]:
+ ...
+
+ @classmethod
+ def from_json(cls, data: Dict[str, Any]) -> 'TLSObjectProtocol':
+ ...
+
+
+class Cert(TLSObjectProtocol):
+ STORAGE_PREFIX = 'cert'
+
+ def __init__(self, cert: str = '', user_made: bool = False) -> None:
+ self.cert = cert
+ self.user_made = user_made
+
+ def __bool__(self) -> bool:
+ return bool(self.cert)
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, Cert):
+ return self.cert == other.cert and self.user_made == other.user_made
+ return NotImplemented
+
+ def to_json(self) -> Dict[str, Union[str, bool]]:
+ if (self):
+ return {
+ 'cert': self.cert,
+ 'user_made': self.user_made
+ }
+ else:
+ return {}
+
+ @classmethod
+ def from_json(cls, data: Dict[str, Union[str, bool]]) -> 'Cert':
+ if 'cert' not in data:
+ return cls()
+ cert = data['cert']
+ if not isinstance(cert, str):
+ raise TLSObjectException('Tried to make Cert object with non-string cert')
+ if any(k not in ['cert', 'user_made'] for k in data.keys()):
+ raise TLSObjectException(f'Got unknown field for Cert object. Fields: {data.keys()}')
+ user_made: Union[str, bool] = data.get('user_made', False)
+ if not isinstance(user_made, bool):
+ if isinstance(user_made, str):
+ if user_made.lower() == 'true':
+ user_made = True
+ elif user_made.lower() == 'false':
+ user_made = False
+ try:
+ user_made = bool(user_made)
+ except Exception:
+ raise TLSObjectException(f'Expected user_made field in Cert object to be bool but got {type(user_made)}')
+ return cls(cert=cert, user_made=user_made)
+
+
+class PrivKey(TLSObjectProtocol):
+ STORAGE_PREFIX = 'key'
+
+ def __init__(self, key: str = '', user_made: bool = False) -> None:
+ self.key = key
+ self.user_made = user_made
+
+ def __bool__(self) -> bool:
+ return bool(self.key)
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, PrivKey):
+ return self.key == other.key and self.user_made == other.user_made
+ return NotImplemented
+
+ def to_json(self) -> Dict[str, Union[str, bool]]:
+ if bool(self):
+ return {
+ 'key': self.key,
+ 'user_made': self.user_made
+ }
+ else:
+ return {}
+
+ @classmethod
+ def from_json(cls, data: Dict[str, str]) -> 'PrivKey':
+ if 'key' not in data:
+ return cls()
+ key = data['key']
+ if not isinstance(key, str):
+ raise TLSObjectException('Tried to make PrivKey object with non-string key')
+ if any(k not in ['key', 'user_made'] for k in data.keys()):
+ raise TLSObjectException(f'Got unknown field for PrivKey object. Fields: {data.keys()}')
+ user_made: Union[str, bool] = data.get('user_made', False)
+ if not isinstance(user_made, bool):
+ if isinstance(user_made, str):
+ if user_made.lower() == 'true':
+ user_made = True
+ elif user_made.lower() == 'false':
+ user_made = False
+ try:
+ user_made = bool(user_made)
+ except Exception:
+ raise TLSObjectException(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}')
+ return cls(key=key, user_made=user_made)