]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: refactor cert_store to use the same code for cert/key
authorRedouane Kachach <rkachach@ibm.com>
Mon, 27 Jan 2025 15:10:34 +0000 (16:10 +0100)
committerRedouane Kachach <rkachach@ibm.com>
Tue, 11 Mar 2025 09:33:04 +0000 (10:33 +0100)
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/cert_mgr.py
src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/migrations.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/mgmt_gateway.py
src/pybind/mgr/cephadm/services/monitoring.py
src/pybind/mgr/cephadm/services/oauth2_proxy.py
src/pybind/mgr/cephadm/tlsobject_store.py [new file with mode: 0644]
src/pybind/mgr/cephadm/tlsobject_types.py [new file with mode: 0644]

index 3744f7491a588509035bc57d1472dd484dcdf52f..6ba186aec71df12f0975389190ca5b6dabe761f2 100644 (file)
+from typing import TYPE_CHECKING, Tuple, Union, List, Dict, Optional, cast, Any
+import logging
+import copy
 
 from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
 from typing import TYPE_CHECKING, Tuple, Union, List, Optional
+from cephadm.tlsobject_types import Cert, PrivKey
+from cephadm.tlsobject_store import TLSObjectStore, TLSObjectScope
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
 
+logger = logging.getLogger(__name__)
+
 
 class CertMgr:
 
     CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
     CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
 
-    def __init__(self, mgr: "CephadmOrchestrator", ip: str) -> None:
-        self.ssl_certs: SSLCerts = SSLCerts(mgr._cluster_fsid)
-        old_cert = mgr.cert_key_store.get_cert(self.CEPHADM_ROOT_CA_CERT)
-        old_key = mgr.cert_key_store.get_key(self.CEPHADM_ROOT_CA_KEY)
+    # In an effort to try and track all the certs we manage in cephadm
+    # we're being explicit here and listing them out.
+
+    ####################################################
+    #  cephadm certmgr known Certificates section
+    known_certs = {
+        TLSObjectScope.SERVICE: [
+            'iscsi_ssl_cert',
+            'rgw_frontend_ssl_cert',
+            'ingress_ssl_cert',
+            'nvmeof_server_cert',
+            'nvmeof_client_cert',
+            'nvmeof_root_ca_cert',
+        ],
+        TLSObjectScope.HOST: [
+            'grafana_cert',
+        ],
+        TLSObjectScope.GLOBAL: [
+            'mgmt_gw_cert',
+            'oauth2_proxy_cert',
+            CEPHADM_ROOT_CA_CERT,
+        ],
+    }
+
+    ####################################################
+    #  cephadm certmgr known Keys section
+    known_keys = {
+        TLSObjectScope.SERVICE: [
+            'iscsi_ssl_key',
+            'ingress_ssl_key',
+            'nvmeof_server_key',
+            'nvmeof_client_key',
+            'nvmeof_encryption_key',
+        ],
+        TLSObjectScope.HOST: [
+            'grafana_key',
+        ],
+        TLSObjectScope.GLOBAL: [
+            'mgmt_gw_key',
+            'oauth2_proxy_key',
+            CEPHADM_ROOT_CA_KEY,
+        ],
+    }
+
+    cert_to_service = {
+        'rgw_frontend_ssl_cert': 'rgw',
+        'iscsi_ssl_cert': 'iscsi',
+        'ingress_ssl_cert': 'ingress',
+        'nvmeof_server_cert': 'nvmeof',
+        'nvmeof_client_cert': 'nvmeof',
+        'nvmeof_root_ca_cert': 'nvmeof',
+        'mgmt_gw_cert': 'mgmt-gateway',
+        'oauth2_proxy_cert': 'oauth2-proxy',
+        'grafana_cert': 'grafana',
+    }
+
+    def __init__(self,
+                 mgr: "CephadmOrchestrator",
+                 certificate_automated_rotation_enabled: bool,
+                 certificate_duration_days: int,
+                 renewal_threshold_days: int,
+                 mgr_ip: str) -> None:
+        self.mgr = mgr
+        self.mgr_ip = mgr_ip
+        self.certificate_automated_rotation_enabled = certificate_automated_rotation_enabled
+        self.certificate_duration_days = certificate_duration_days
+        self.renewal_threshold_days = renewal_threshold_days
+        self._init_tlsobject_store()
+        self._initialize_root_ca(mgr_ip)
+
+    def _init_tlsobject_store(self) -> None:
+        self.cert_store = TLSObjectStore(self.mgr, Cert, self.known_certs)
+        self.cert_store.load()
+        self.key_store = TLSObjectStore(self.mgr, PrivKey, self.known_keys)
+        self.key_store.load()
+
+    def load(self) -> None:
+        self.cert_store.load()
+        self.key_store.load()
+
+    def _initialize_root_ca(self, ip: str) -> None:
+        self.ssl_certs: SSLCerts = SSLCerts(self.certificate_duration_days)
+        old_cert = cast(Cert, self.cert_store.get_tlsobject(self.CEPHADM_ROOT_CA_CERT))
+        old_key = cast(PrivKey, self.key_store.get_tlsobject(self.CEPHADM_ROOT_CA_KEY))
         if old_key and old_cert:
             try:
-                self.ssl_certs.load_root_credentials(old_cert, old_key)
-            except SSLConfigException:
-                raise Exception("Cannot load cephadm root CA certificates.")
+                self.ssl_certs.load_root_credentials(old_cert.cert, old_key.key)
+            except SSLConfigException as e:
+                raise SSLConfigException("Cannot load cephadm root CA certificates.") from e
         else:
             self.ssl_certs.generate_root_cert(addr=ip)
-            mgr.cert_key_store.save_cert(self.CEPHADM_ROOT_CA_CERT, self.ssl_certs.get_root_cert())
-            mgr.cert_key_store.save_key(self.CEPHADM_ROOT_CA_KEY, self.ssl_certs.get_root_key())
+            self.cert_store.save_tlsobject(self.CEPHADM_ROOT_CA_CERT, self.ssl_certs.get_root_cert())
+            self.key_store.save_tlsobject(self.CEPHADM_ROOT_CA_KEY, self.ssl_certs.get_root_key())
 
     def get_root_ca(self) -> str:
         return self.ssl_certs.get_root_cert()
@@ -35,3 +122,65 @@ class CertMgr:
         custom_san_list: Optional[List[str]] = None,
     ) -> Tuple[str, str]:
         return self.ssl_certs.generate_cert(host_fqdn, node_ip, custom_san_list=custom_san_list)
+
+    def get_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[str]:
+        cert_obj = cast(Cert, self.cert_store.get_tlsobject(cert_name, service_name, host))
+        return cert_obj.cert if cert_obj else None
+
+    def get_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[str]:
+        key_obj = cast(PrivKey, self.key_store.get_tlsobject(key_name, service_name, host))
+        return key_obj.key if key_obj else None
+
+    def save_cert(self, cert_name: str, cert: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
+        self.cert_store.save_tlsobject(cert_name, cert, service_name, host, user_made)
+
+    def save_key(self, key_name: str, key: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
+        self.key_store.save_tlsobject(key_name, key, service_name, host, user_made)
+
+    def rm_cert(self, cert_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+        self.cert_store.rm_tlsobject(cert_name, service_name, host)
+
+    def rm_key(self, key_name: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+        self.key_store.rm_tlsobject(key_name, service_name, host)
+
+    def cert_ls(self) -> Dict[str, Union[bool, Dict[str, Dict[str, bool]]]]:
+        ls: Dict = copy.deepcopy(self.cert_store.get_tlsobjects())
+        for k, v in ls.items():
+            if isinstance(v, dict):
+                tmp: Dict[str, Any] = {key: get_certificate_info(cast(Cert, v[key]).cert) for key in v if isinstance(v[key], Cert)}
+                ls[k] = tmp if tmp else {}
+            elif isinstance(v, Cert):
+                ls[k] = get_certificate_info(cast(Cert, v).cert) if bool(v) else False
+        return ls
+
+    def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
+        ls: Dict = copy.deepcopy(self.key_store.get_tlsobjects())
+        if self.CEPHADM_ROOT_CA_KEY in ls:
+            del ls[self.CEPHADM_ROOT_CA_KEY]
+        for k, v in ls.items():
+            if isinstance(v, dict) and v:
+                tmp: Dict[str, Any] = {key: get_private_key_info(cast(PrivKey, v[key]).key) for key in v if v[key]}
+                ls[k] = tmp if tmp else {}
+            elif isinstance(v, PrivKey):
+                ls[k] = get_private_key_info(cast(PrivKey, v).key)
+        return ls
+
+    def list_entity_known_certificates(self, entity: str) -> List[str]:
+        return [cert_name for cert_name, service in self.cert_to_service.items() if service == entity]
+
+    def entity_ls(self, get_scope: bool = False) -> List[Union[str, Tuple[str, str]]]:
+        if get_scope:
+            return [(entity, self.determine_scope(entity)) for entity in set(self.cert_to_service.values())]
+        else:
+            return list(self.cert_to_service.values())
+
+    def determine_scope(self, entity: str) -> str:
+        for cert, service in self.cert_to_service.items():
+            if service == entity:
+                if cert in self.known_certs[TLSObjectScope.SERVICE]:
+                    return TLSObjectScope.SERVICE.value
+                elif cert in self.known_certs[TLSObjectScope.HOST]:
+                    return TLSObjectScope.HOST.value
+                elif cert in self.known_certs[TLSObjectScope.GLOBAL]:
+                    return TLSObjectScope.GLOBAL.value
+        return TLSObjectScope.UNKNOWN.value
index 7bd5704eb0bd6ad24baa518f5863cd581a4c9646..0e948c0cb905e0b2d48e34398a840e5ce1e92e09 100644 (file)
@@ -38,8 +38,6 @@ HOST_CACHE_PREFIX = "host."
 SPEC_STORE_PREFIX = "spec."
 AGENT_CACHE_PREFIX = 'agent.'
 NODE_PROXY_CACHE_PREFIX = 'node_proxy'
-CERT_STORE_CERT_PREFIX = 'cert_store.cert.'
-CERT_STORE_KEY_PREFIX = 'cert_store.key.'
 
 
 class HostCacheStatus(enum.Enum):
@@ -402,7 +400,7 @@ class SpecStore():
                 else:
                     cert_str = rgw_cert
                 assert isinstance(cert_str, str)
-                self.mgr.cert_key_store.save_cert(
+                self.mgr.cert_mgr.save_cert(
                     'rgw_frontend_ssl_cert',
                     cert_str,
                     service_name=rgw_spec.service_name(),
@@ -410,13 +408,13 @@ class SpecStore():
         elif spec.service_type == 'iscsi':
             iscsi_spec = cast(IscsiServiceSpec, spec)
             if iscsi_spec.ssl_cert:
-                self.mgr.cert_key_store.save_cert(
+                self.mgr.cert_mgr.save_cert(
                     'iscsi_ssl_cert',
                     iscsi_spec.ssl_cert,
                     service_name=iscsi_spec.service_name(),
                     user_made=True)
             if iscsi_spec.ssl_key:
-                self.mgr.cert_key_store.save_key(
+                self.mgr.cert_mgr.save_key(
                     'iscsi_ssl_key',
                     iscsi_spec.ssl_key,
                     service_name=iscsi_spec.service_name(),
@@ -424,13 +422,13 @@ class SpecStore():
         elif spec.service_type == 'ingress':
             ingress_spec = cast(IngressSpec, spec)
             if ingress_spec.ssl_cert:
-                self.mgr.cert_key_store.save_cert(
+                self.mgr.cert_mgr.save_cert(
                     'ingress_ssl_cert',
                     ingress_spec.ssl_cert,
                     service_name=ingress_spec.service_name(),
                     user_made=True)
             if ingress_spec.ssl_key:
-                self.mgr.cert_key_store.save_key(
+                self.mgr.cert_mgr.save_key(
                     'ingress_ssl_key',
                     ingress_spec.ssl_key,
                     service_name=ingress_spec.service_name(),
@@ -444,7 +442,7 @@ class SpecStore():
             ]:
                 cert = getattr(nvmeof_spec, cert_attr, None)
                 if cert:
-                    self.mgr.cert_key_store.save_cert(
+                    self.mgr.cert_mgr.save_cert(
                         f'nvmeof_{cert_attr}',
                         cert,
                         service_name=nvmeof_spec.service_name(),
@@ -456,7 +454,7 @@ class SpecStore():
             ]:
                 key = getattr(nvmeof_spec, key_attr, None)
                 if key:
-                    self.mgr.cert_key_store.save_key(
+                    self.mgr.cert_mgr.save_key(
                         f'nvmeof_{key_attr}',
                         key,
                         service_name=nvmeof_spec.service_name(),
@@ -492,20 +490,20 @@ class SpecStore():
 
     def _rm_certs_and_keys(self, spec: ServiceSpec) -> None:
         if spec.service_type == 'rgw':
-            self.mgr.cert_key_store.rm_cert('rgw_frontend_ssl_cert', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_cert('rgw_frontend_ssl_cert', service_name=spec.service_name())
         if spec.service_type == 'iscsi':
-            self.mgr.cert_key_store.rm_cert('iscsi_ssl_cert', service_name=spec.service_name())
-            self.mgr.cert_key_store.rm_key('iscsi_ssl_key', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_cert('iscsi_ssl_cert', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_key('iscsi_ssl_key', service_name=spec.service_name())
         if spec.service_type == 'ingress':
-            self.mgr.cert_key_store.rm_cert('ingress_ssl_cert', service_name=spec.service_name())
-            self.mgr.cert_key_store.rm_key('ingress_ssl_key', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_cert('ingress_ssl_cert', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_key('ingress_ssl_key', service_name=spec.service_name())
         if spec.service_type == 'nvmeof':
-            self.mgr.cert_key_store.rm_cert('nvmeof_server_cert', service_name=spec.service_name())
-            self.mgr.cert_key_store.rm_cert('nvmeof_client_cert', service_name=spec.service_name())
-            self.mgr.cert_key_store.rm_cert('nvmeof_root_ca_cert', service_name=spec.service_name())
-            self.mgr.cert_key_store.rm_key('nvmeof_server_key', service_name=spec.service_name())
-            self.mgr.cert_key_store.rm_key('nvmeof_client_key', service_name=spec.service_name())
-            self.mgr.cert_key_store.rm_key('nvmeof_encryption_key', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_cert('nvmeof_server_cert', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_cert('nvmeof_client_cert', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_cert('nvmeof_root_ca_cert', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_key('nvmeof_server_key', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_key('nvmeof_client_key', service_name=spec.service_name())
+            self.mgr.cert_mgr.rm_key('nvmeof_encryption_key', service_name=spec.service_name())
 
     def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
         return self.spec_created.get(spec.service_name())
@@ -1890,284 +1888,6 @@ class AgentCache():
         self.save_agent(daemon_spec.host)
 
 
-class Cert():
-    def __init__(self, cert: str = '', user_made: bool = False) -> None:
-        self.cert = cert
-        self.user_made = user_made
-
-    def __bool__(self) -> bool:
-        return bool(self.cert)
-
-    def __eq__(self, other: Any) -> bool:
-        if isinstance(other, Cert):
-            return self.cert == other.cert and self.user_made == other.user_made
-        return NotImplemented
-
-    def to_json(self) -> Dict[str, Union[str, bool]]:
-        return {
-            'cert': self.cert,
-            'user_made': self.user_made
-        }
-
-    @classmethod
-    def from_json(cls, data: Dict[str, Union[str, bool]]) -> 'Cert':
-        if 'cert' not in data:
-            return cls()
-        cert = data['cert']
-        if not isinstance(cert, str):
-            raise OrchestratorError('Tried to make Cert object with non-string cert')
-        if any(k not in ['cert', 'user_made'] for k in data.keys()):
-            raise OrchestratorError(f'Got unknown field for Cert object. Fields: {data.keys()}')
-        user_made: Union[str, bool] = data.get('user_made', False)
-        if not isinstance(user_made, bool):
-            if isinstance(user_made, str):
-                if user_made.lower() == 'true':
-                    user_made = True
-                elif user_made.lower() == 'false':
-                    user_made = False
-            try:
-                user_made = bool(user_made)
-            except Exception:
-                raise OrchestratorError(f'Expected user_made field in Cert object to be bool but got {type(user_made)}')
-        return cls(cert=cert, user_made=user_made)
-
-
-class PrivKey():
-    def __init__(self, key: str = '', user_made: bool = False) -> None:
-        self.key = key
-        self.user_made = user_made
-
-    def __bool__(self) -> bool:
-        return bool(self.key)
-
-    def __eq__(self, other: Any) -> bool:
-        if isinstance(other, PrivKey):
-            return self.key == other.key and self.user_made == other.user_made
-        return NotImplemented
-
-    def to_json(self) -> Dict[str, Union[str, bool]]:
-        return {
-            'key': self.key,
-            'user_made': self.user_made
-        }
-
-    @classmethod
-    def from_json(cls, data: Dict[str, str]) -> 'PrivKey':
-        if 'key' not in data:
-            return cls()
-        key = data['key']
-        if not isinstance(key, str):
-            raise OrchestratorError('Tried to make PrivKey object with non-string key')
-        if any(k not in ['key', 'user_made'] for k in data.keys()):
-            raise OrchestratorError(f'Got unknown field for PrivKey object. Fields: {data.keys()}')
-        user_made: Union[str, bool] = data.get('user_made', False)
-        if not isinstance(user_made, bool):
-            if isinstance(user_made, str):
-                if user_made.lower() == 'true':
-                    user_made = True
-                elif user_made.lower() == 'false':
-                    user_made = False
-            try:
-                user_made = bool(user_made)
-            except Exception:
-                raise OrchestratorError(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}')
-        return cls(key=key, user_made=user_made)
-
-
-class CertKeyStore():
-    service_name_cert = [
-        'rgw_frontend_ssl_cert',
-        'iscsi_ssl_cert',
-        'ingress_ssl_cert',
-        'nvmeof_server_cert',
-        'nvmeof_client_cert',
-        'nvmeof_root_ca_cert',
-    ]
-
-    host_cert = [
-        'grafana_cert',
-    ]
-
-    host_key = [
-        'grafana_key',
-    ]
-
-    service_name_key = [
-        'iscsi_ssl_key',
-        'ingress_ssl_key',
-        'nvmeof_server_key',
-        'nvmeof_client_key',
-        'nvmeof_encryption_key',
-    ]
-
-    known_certs: Dict[str, Any] = {}
-    known_keys: Dict[str, Any] = {}
-
-    def __init__(self, mgr: 'CephadmOrchestrator') -> None:
-        self.mgr: CephadmOrchestrator = mgr
-        self._init_known_cert_key_dicts()
-
-    def _init_known_cert_key_dicts(self) -> None:
-        # In an effort to try and track all the certs we manage in cephadm
-        # we're being explicit here and listing them out.
-        self.known_certs = {
-            'rgw_frontend_ssl_cert': {},  # service-name -> cert
-            'iscsi_ssl_cert': {},  # service-name -> cert
-            'ingress_ssl_cert': {},  # service-name -> cert
-            'nvmeof_server_cert': {},  # service-name -> cert
-            'nvmeof_client_cert': {},  # service-name -> cert
-            'nvmeof_root_ca_cert': {},  # service-name -> cert
-            'mgmt_gw_cert': Cert(),  # cert
-            'oauth2_proxy_cert': Cert(),  # cert
-            'cephadm_root_ca_cert': Cert(),  # cert
-            'grafana_cert': {},  # host -> cert
-        }
-        # Similar to certs but for priv keys. Entries in known_certs
-        # that don't have a key here are probably certs in PEM format
-        # so there is no need to store a separate key
-        self.known_keys = {
-            'mgmt_gw_key': PrivKey(),  # cert
-            'oauth2_proxy_key': PrivKey(),  # cert
-            'cephadm_root_ca_key': PrivKey(),  # cert
-            'grafana_key': {},  # host -> key
-            'iscsi_ssl_key': {},  # service-name -> key
-            'ingress_ssl_key': {},  # service-name -> key
-            'nvmeof_server_key': {},  # service-name -> key
-            'nvmeof_client_key': {},  # service-name -> key
-            'nvmeof_encryption_key': {},  # service-name -> key
-        }
-
-    def get_cert(self, entity: str, service_name: str = '', host: str = '') -> str:
-        self._validate_cert_entity(entity, service_name, host)
-
-        cert = Cert()
-        if entity in self.service_name_cert or entity in self.host_cert:
-            var = service_name if entity in self.service_name_cert else host
-            if var not in self.known_certs[entity]:
-                return ''
-            cert = self.known_certs[entity][var]
-        else:
-            cert = self.known_certs[entity]
-        if not cert or not isinstance(cert, Cert):
-            return ''
-        return cert.cert
-
-    def save_cert(self, entity: str, cert: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
-        self._validate_cert_entity(entity, service_name, host)
-
-        cert_obj = Cert(cert, user_made)
-
-        j: Union[str, Dict[Any, Any], None] = None
-        if entity in self.service_name_cert or entity in self.host_cert:
-            var = service_name if entity in self.service_name_cert else host
-            j = {}
-            self.known_certs[entity][var] = cert_obj
-            for cert_key in self.known_certs[entity]:
-                j[cert_key] = Cert.to_json(self.known_certs[entity][cert_key])
-        else:
-            self.known_certs[entity] = cert_obj
-            j = Cert.to_json(cert_obj)
-        self.mgr.set_store(CERT_STORE_CERT_PREFIX + entity, json.dumps(j))
-
-    def rm_cert(self, entity: str, service_name: str = '', host: str = '') -> None:
-        self.save_cert(entity, cert='', service_name=service_name, host=host)
-
-    def _validate_cert_entity(self, entity: str, service_name: str = '', host: str = '') -> None:
-        if entity not in self.known_certs.keys():
-            raise OrchestratorError(f'Attempted to access cert for unknown entity {entity}')
-
-        if entity in self.host_cert and not host:
-            raise OrchestratorError(f'Need host to access cert for entity {entity}')
-
-        if entity in self.service_name_cert and not service_name:
-            raise OrchestratorError(f'Need service name to access cert for entity {entity}')
-
-    def cert_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
-        ls: Dict[str, Any] = {}
-        for k, v in self.known_certs.items():
-            if k in self.service_name_cert or k in self.host_cert:
-                tmp: Dict[str, Any] = {key: True for key in v if v[key]}
-                ls[k] = tmp if tmp else False
-            else:
-                ls[k] = bool(v)
-        return ls
-
-    def get_key(self, entity: str, service_name: str = '', host: str = '') -> str:
-        self._validate_key_entity(entity, host)
-
-        key = PrivKey()
-        if entity in self.host_key or entity in self.service_name_key:
-            var = service_name if entity in self.service_name_key else host
-            if var not in self.known_keys[entity]:
-                return ''
-            key = self.known_keys[entity][var]
-        else:
-            key = self.known_keys[entity]
-        if not key or not isinstance(key, PrivKey):
-            return ''
-        return key.key
-
-    def save_key(self, entity: str, key: str, service_name: str = '', host: str = '', user_made: bool = False) -> None:
-        self._validate_key_entity(entity, host)
-
-        pkey = PrivKey(key, user_made)
-
-        j: Union[str, Dict[Any, Any], None] = None
-        if entity in self.host_key or entity in self.service_name_key:
-            var = service_name if entity in self.service_name_key else host
-            j = {}
-            self.known_keys[entity][var] = pkey
-            for k in self.known_keys[entity]:
-                j[k] = PrivKey.to_json(self.known_keys[entity][k])
-        else:
-            self.known_keys[entity] = pkey
-            j = PrivKey.to_json(pkey)
-        self.mgr.set_store(CERT_STORE_KEY_PREFIX + entity, json.dumps(j))
-
-    def rm_key(self, entity: str, service_name: str = '', host: str = '') -> None:
-        self.save_key(entity, key='', service_name=service_name, host=host)
-
-    def _validate_key_entity(self, entity: str, host: str = '') -> None:
-        if entity not in self.known_keys.keys():
-            raise OrchestratorError(f'Attempted to access priv key for unknown entity {entity}')
-
-        if entity in self.host_key and not host:
-            raise OrchestratorError(f'Need host to access priv key for entity {entity}')
-
-    def key_ls(self) -> Dict[str, Union[bool, Dict[str, bool]]]:
-        ls: Dict[str, Any] = {}
-        for k, v in self.known_keys.items():
-            if k in self.host_key or k in self.service_name_key:
-                tmp: Dict[str, Any] = {key: True for key in v if v[key]}
-                ls[k] = tmp if tmp else False
-            else:
-                ls[k] = bool(v)
-        return ls
-
-    def load(self) -> None:
-        for k, v in self.mgr.get_store_prefix(CERT_STORE_CERT_PREFIX).items():
-            entity = k[len(CERT_STORE_CERT_PREFIX):]
-            self.known_certs[entity] = json.loads(v)
-            if entity in self.service_name_cert or entity in self.host_cert:
-                for k in self.known_certs[entity]:
-                    cert_obj = Cert.from_json(self.known_certs[entity][k])
-                    self.known_certs[entity][k] = cert_obj
-            else:
-                cert_obj = Cert.from_json(self.known_certs[entity])
-                self.known_certs[entity] = cert_obj
-
-        for k, v in self.mgr.get_store_prefix(CERT_STORE_KEY_PREFIX).items():
-            entity = k[len(CERT_STORE_KEY_PREFIX):]
-            self.known_keys[entity] = json.loads(v)
-            if entity in self.host_key or entity in self.service_name_key:
-                for k in self.known_keys[entity]:
-                    priv_key_obj = PrivKey.from_json(self.known_keys[entity][k])
-                    self.known_keys[entity][k] = priv_key_obj
-            else:
-                priv_key_obj = PrivKey.from_json(self.known_keys[entity])
-                self.known_keys[entity] = priv_key_obj
-
-
 class EventStore():
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None
index 8ca07ea5242b434d8934e33943b71c847243434a..2683ed60e6a1c822ae96b7fc42a1920f13217d4f 100644 (file)
@@ -431,11 +431,11 @@ class Migrations:
             grafana_cert = self.mgr.get_store(grafana_cert_path)
             if grafana_cert:
                 logger.info(f'Migrating {grafana_daemon.name()} cert to cert store')
-                self.mgr.cert_key_store.save_cert('grafana_cert', grafana_cert, host=hostname)
+                self.mgr.cert_mgr.save_cert('grafana_cert', grafana_cert, host=hostname)
             grafana_key = self.mgr.get_store(grafana_key_path)
             if grafana_key:
                 logger.info(f'Migrating {grafana_daemon.name()} key to cert store')
-                self.mgr.cert_key_store.save_key('grafana_key', grafana_key, host=hostname)
+                self.mgr.cert_mgr.save_key('grafana_key', grafana_key, host=hostname)
 
         # NOTE: prometheus, alertmanager, and node-exporter certs were not stored
         # and appeared to just be generated at daemon deploy time if secure_monitoring_stack
index 731cd2f31587c966607eee90951b5a0372e1ebfb..dd308943e702f931575c2695ee245c7098e15af5 100644 (file)
@@ -89,7 +89,6 @@ from .inventory import (
     ClientKeyringSpec,
     TunedProfileStore,
     NodeProxyCache,
-    CertKeyStore,
     OrchSecretNotFound,
 )
 from .upgrade import CephadmUpgrade
@@ -593,9 +592,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.tuned_profile_utils = TunedProfileUtils(self)
 
-        self.cert_key_store = CertKeyStore(self)
-        self.cert_key_store.load()
-
         self.cert_mgr = CertMgr(self, self.get_mgr_ip())
 
         # ensure the host lists are in sync
@@ -3141,7 +3137,7 @@ Then run the following:
 
     @handle_orch_error
     def cert_store_key_ls(self) -> Dict[str, Any]:
-        return self.cert_key_store.key_ls()
+        return self.cert_mgr.key_ls()
 
     @handle_orch_error
     def cert_store_get_cert(
@@ -3151,7 +3147,7 @@ Then run the following:
         hostname: Optional[str] = None,
         no_exception_when_missing: bool = False
     ) -> str:
-        cert = self.cert_key_store.get_cert(entity, service_name or '', hostname or '')
+        cert = self.cert_mgr.get_cert(entity, service_name or '', hostname or '')
         if not cert:
             if no_exception_when_missing:
                 return ''
@@ -3166,7 +3162,7 @@ Then run the following:
         hostname: Optional[str] = None,
         no_exception_when_missing: bool = False
     ) -> str:
-        key = self.cert_key_store.get_key(entity, service_name or '', hostname or '')
+        key = self.cert_mgr.get_key(entity, service_name or '', hostname or '')
         if not key:
             if no_exception_when_missing:
                 return ''
@@ -3323,6 +3319,15 @@ Then run the following:
         }
         self.set_health_checks(self.health_checks)
 
+    def set_health_error(self, name: str, summary: str, count: int, detail: List[str]) -> None:
+        self.health_checks[name] = {
+            'severity': 'error',
+            'summary': summary,
+            'count': count,
+            'detail': detail,
+        }
+        self.set_health_checks(self.health_checks)
+
     def remove_health_warning(self, name: str) -> None:
         if name in self.health_checks:
             del self.health_checks[name]
index d0357f78afc1d2be3a3508f006d19a3561bf8e7c..a32ec6fbe0ec22eed3cae6a2a2647bc9de282295 100644 (file)
@@ -142,8 +142,8 @@ class CephadmServe:
         for d in self.mgr.cache.get_daemons_by_type('grafana'):
             host = d.hostname
             assert host is not None
-            cert = self.mgr.cert_key_store.get_cert('grafana_cert', host=host)
-            key = self.mgr.cert_key_store.get_key('grafana_key', host=host)
+            cert = self.mgr.cert_mgr.get_cert('grafana_cert', host=host)
+            key = self.mgr.cert_mgr.get_key('grafana_key', host=host)
             if (not cert or not cert.strip()) and (not key or not key.strip()):
                 # certificate/key are empty... nothing to check
                 return
index fa47428e5dcccb208337ba45a3b0ba972645f2d7..e506fa534d275f1dd035da12d4c902e28700e095 100644 (file)
@@ -52,8 +52,8 @@ class MgmtGatewayService(CephadmService):
         self.mgr.set_module_option_ex('dashboard', 'standby_behaviour', 'error')
 
     def get_external_certificates(self, svc_spec: MgmtGatewaySpec, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
-        cert = self.mgr.cert_key_store.get_cert('mgmt_gw_cert')
-        key = self.mgr.cert_key_store.get_key('mgmt_gw_key')
+        cert = self.mgr.cert_mgr.get_cert('mgmt_gw_cert')
+        key = self.mgr.cert_mgr.get_key('mgmt_gw_key')
         if not (cert and key):
             # not available on store, check if provided on the spec
             if svc_spec.ssl_certificate and svc_spec.ssl_certificate_key:
@@ -66,8 +66,8 @@ class MgmtGatewayService(CephadmService):
                 cert, key = self.mgr.cert_mgr.generate_cert(host_fqdn, ips)
             # save certificates
             if cert and key:
-                self.mgr.cert_key_store.save_cert('mgmt_gw_cert', cert)
-                self.mgr.cert_key_store.save_key('mgmt_gw_key', key)
+                self.mgr.cert_mgr.save_cert('mgmt_gw_cert', cert)
+                self.mgr.cert_mgr.save_key('mgmt_gw_key', key)
             else:
                 logger.error("Failed to obtain certificate and key from mgmt-gateway.")
         return cert, key
@@ -169,5 +169,5 @@ class MgmtGatewayService(CephadmService):
         self.mgr.set_module_option_ex('dashboard', 'standby_behaviour', 'redirect')
         if daemon.hostname is not None:
             # delete cert/key entires for this mgmt-gateway daemon
-            self.mgr.cert_key_store.rm_cert('mgmt_gw_cert')
-            self.mgr.cert_key_store.rm_key('mgmt_gw_key')
+            self.mgr.cert_mgr.rm_cert('mgmt_gw_cert')
+            self.mgr.cert_mgr.rm_key('mgmt_gw_key')
index dfd9317b5229581cb0f14ca79e85547883f8794d..5f1e09bfca367b20351ecf4e28255f5de304a407 100644 (file)
@@ -179,8 +179,8 @@ class GrafanaService(CephadmService):
         return config_file, self.get_dependencies(self.mgr)
 
     def prepare_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
-        cert = self.mgr.cert_key_store.get_cert('grafana_cert', host=daemon_spec.host)
-        pkey = self.mgr.cert_key_store.get_key('grafana_key', host=daemon_spec.host)
+        cert = self.mgr.cert_mgr.get_cert('grafana_cert', host=daemon_spec.host)
+        pkey = self.mgr.cert_mgr.get_key('grafana_key', host=daemon_spec.host)
         certs_present = (cert and pkey)
         is_valid_certificate = False
         (org, cn) = (None, None)
@@ -206,8 +206,8 @@ class GrafanaService(CephadmService):
             node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
             cert, pkey = self.mgr.cert_mgr.generate_cert([host_fqdn, "grafana_servers"], node_ip)
             # cert, pkey = create_self_signed_cert('Ceph', host_fqdn)
-            self.mgr.cert_key_store.save_cert('grafana_cert', cert, host=daemon_spec.host)
-            self.mgr.cert_key_store.save_key('grafana_key', pkey, host=daemon_spec.host)
+            self.mgr.cert_mgr.save_cert('grafana_cert', cert, host=daemon_spec.host)
+            self.mgr.cert_mgr.save_key('grafana_key', pkey, host=daemon_spec.host)
             if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                 self.mgr.check_mon_command({
                     'prefix': 'dashboard set-grafana-api-ssl-verify',
@@ -277,8 +277,8 @@ class GrafanaService(CephadmService):
         """
         if daemon.hostname is not None:
             # delete cert/key entires for this grafana daemon
-            self.mgr.cert_key_store.rm_cert('grafana_cert', host=daemon.hostname)
-            self.mgr.cert_key_store.rm_key('grafana_key', host=daemon.hostname)
+            self.mgr.cert_mgr.rm_cert('grafana_cert', host=daemon.hostname)
+            self.mgr.cert_mgr.rm_key('grafana_key', host=daemon.hostname)
 
     def ok_to_stop(self,
                    daemon_ids: List[str],
index 078de78e5d3c63377a6a8def56f939cb9b9f8047..89ee720620acb82fd1739b158eba8e4f1ecd9740 100644 (file)
@@ -41,8 +41,8 @@ class OAuth2ProxyService(CephadmService):
         return DaemonDescription()
 
     def get_certificates(self, svc_spec: OAuth2ProxySpec, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
-        cert = self.mgr.cert_key_store.get_cert('oauth2_proxy_cert')
-        key = self.mgr.cert_key_store.get_key('oauth2_proxy_key')
+        cert = self.mgr.cert_mgr.get_cert('oauth2_proxy_cert')
+        key = self.mgr.cert_mgr.get_key('oauth2_proxy_key')
         if not (cert and key):
             # not available on store, check if provided on the spec
             if svc_spec.ssl_certificate and svc_spec.ssl_certificate_key:
@@ -55,8 +55,8 @@ class OAuth2ProxyService(CephadmService):
                 cert, key = self.mgr.cert_mgr.generate_cert(host_fqdn, addr)
             # save certificates
             if cert and key:
-                self.mgr.cert_key_store.save_cert('oauth2_proxy_cert', cert)
-                self.mgr.cert_key_store.save_key('oauth2_proxy_key', key)
+                self.mgr.cert_mgr.save_cert('oauth2_proxy_cert', cert)
+                self.mgr.cert_mgr.save_key('oauth2_proxy_key', key)
             else:
                 logger.error("Failed to obtain certificate and key from mgmt-gateway.")
         return cert, key
diff --git a/src/pybind/mgr/cephadm/tlsobject_store.py b/src/pybind/mgr/cephadm/tlsobject_store.py
new file mode 100644 (file)
index 0000000..3f6203b
--- /dev/null
@@ -0,0 +1,143 @@
+from typing import Any, Dict, Union, List, Tuple, Optional, TYPE_CHECKING, Type
+from enum import Enum
+import json
+import logging
+
+from cephadm.tlsobject_types import TLSObjectProtocol, TLSObjectException
+
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+
+TLSOBJECT_STORE_PREFIX = 'cert_store.'
+
+
+logger = logging.getLogger(__name__)
+
+
+class TLSObjectScope(Enum):
+    HOST = "host"
+    SERVICE = "service"
+    GLOBAL = "global"
+    UNKNOWN = "unknown"
+
+
+class TLSObjectStore():
+
+    def __init__(self, mgr: 'CephadmOrchestrator',
+                 tlsobject_class: Type[TLSObjectProtocol],
+                 known_entities: Dict[TLSObjectScope, List[str]]) -> None:
+        self.mgr: CephadmOrchestrator = mgr
+        self.tlsobject_class = tlsobject_class
+        all_known_entities = [item for sublist in known_entities.values() for item in sublist]
+        self.known_entities: Dict[str, Any] = {key: {} for key in all_known_entities}
+        self.per_service_name_tlsobjects = known_entities[TLSObjectScope.SERVICE]
+        self.per_host_tlsobjects = known_entities[TLSObjectScope.HOST]
+        self.store_prefix = f'{TLSOBJECT_STORE_PREFIX}{tlsobject_class.STORAGE_PREFIX}.'
+
+    def determine_tlsobject_target(self, entity: str, target: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
+        if entity in self.per_service_name_tlsobjects:
+            return (target, None)
+        elif entity in self.per_host_tlsobjects:
+            return (None, target)
+        else:
+            return (None, None)
+
+    def get_tlsobject_scope_and_target(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Tuple[TLSObjectScope, Optional[Any]]:
+        if entity in self.per_service_name_tlsobjects:
+            return TLSObjectScope.SERVICE, service_name
+        elif entity in self.per_host_tlsobjects:
+            return TLSObjectScope.HOST, host
+        else:
+            return TLSObjectScope.GLOBAL, None
+
+    def get_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> Optional[TLSObjectProtocol]:
+        self._validate_tlsobject_entity(entity, service_name, host)
+        scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+        if scope == TLSObjectScope.GLOBAL:
+            return self.known_entities.get(entity)
+        else:
+            return self.known_entities.get(entity, {}).get(target)
+
+    def save_tlsobject(self, entity: str, tlsobject: str, service_name: Optional[str] = None, host: Optional[str] = None, user_made: bool = False) -> None:
+        self._validate_tlsobject_entity(entity, service_name, host)
+        tlsobject = self.tlsobject_class(tlsobject, user_made)
+        scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+        j: Union[str, Dict[Any, Any], None] = None
+        if scope in {TLSObjectScope.SERVICE, TLSObjectScope.HOST}:
+            self.known_entities[entity][target] = tlsobject
+            j = {
+                key: self.tlsobject_class.to_json(self.known_entities[entity][key])
+                for key in self.known_entities[entity]
+            }
+        else:
+            self.known_entities[entity] = tlsobject
+            j = self.tlsobject_class.to_json(tlsobject)
+
+        self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+
+    def rm_tlsobject(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+        """Remove a tlsobjectificate for a specific entity, service, or host."""
+        self._validate_tlsobject_entity(entity, service_name, host)
+        scope, target = self.get_tlsobject_scope_and_target(entity, service_name, host)
+        j: Union[str, Dict[Any, Any], None] = None
+        if scope in {TLSObjectScope.SERVICE, TLSObjectScope.HOST}:
+            if entity in self.known_entities and target in self.known_entities[entity]:
+                del self.known_entities[entity][target]
+                j = {
+                    key: self.tlsobject_class.to_json(self.known_entities[entity][key])
+                    for key in self.known_entities[entity]
+                }
+                self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+        elif scope == TLSObjectScope.GLOBAL:
+            self.known_entities[entity] = self.tlsobject_class()
+            j = self.tlsobject_class.to_json(self.known_entities[entity])
+            self.mgr.set_store(self.store_prefix + entity, json.dumps(j))
+        else:
+            raise TLSObjectException(f'Attempted to remove {self.tlsobject_class.__name__.lower()} for unknown entity {entity}')
+
+    def _validate_tlsobject_entity(self, entity: str, service_name: Optional[str] = None, host: Optional[str] = None) -> None:
+        cred_type = self.tlsobject_class.__name__.lower()
+        if entity not in self.known_entities.keys():
+            raise TLSObjectException(f'Attempted to access {cred_type} for unknown entity {entity}')
+        if entity in self.per_host_tlsobjects and not host:
+            raise TLSObjectException(f'Need host to access {cred_type} for entity {entity}')
+        if entity in self.per_service_name_tlsobjects and not service_name:
+            raise TLSObjectException(f'Need service name to access {cred_type} for entity {entity}')
+
+    def list_tlsobjects(self) -> List[Tuple[str, Type[TLSObjectProtocol], Optional[str]]]:
+        """
+        Returns a shallow list of all known TLS objects, including their targets.
+
+        Returns:
+            List of tuples: (entity, tlsobject, target)
+            - entity: The TLS object entity name.
+            - tlsobject: The TLS object itself.
+            - target: The associated target (service_name, host, or None for global).
+        """
+        tlsobjects = []
+        for known_entity, value in self.known_entities.items():
+            if isinstance(value, dict):  # Handle per-service or per-host TLS objects
+                for target, tlsobject in value.items():
+                    if tlsobject:
+                        tlsobjects.append((known_entity, tlsobject, target))
+            else:  # Handle Global TLS objects
+                tlsobjects.append((known_entity, value, None))
+
+        return tlsobjects
+
+    def get_tlsobjects(self) -> Dict[str, Union[Type[TLSObjectProtocol], Dict[str, Type[TLSObjectProtocol]]]]:
+        return self.known_entities
+
+    def load(self) -> None:
+        for k, v in self.mgr.get_store_prefix(self.store_prefix).items():
+            entity = k[len(self.store_prefix):]
+            self.known_entities[entity] = json.loads(v)
+            if entity in self.per_service_name_tlsobjects or entity in self.per_host_tlsobjects:
+                for k in self.known_entities[entity]:
+                    tlsobject = self.tlsobject_class.from_json(self.known_entities[entity][k])
+                    self.known_entities[entity][k] = tlsobject
+            else:
+                tlsobject = self.tlsobject_class.from_json(self.known_entities[entity])
+                self.known_entities[entity] = tlsobject
diff --git a/src/pybind/mgr/cephadm/tlsobject_types.py b/src/pybind/mgr/cephadm/tlsobject_types.py
new file mode 100644 (file)
index 0000000..fd46a65
--- /dev/null
@@ -0,0 +1,120 @@
+from typing import Any, Dict, Protocol, Union
+from orchestrator import OrchestratorError
+
+
+class TLSObjectException(OrchestratorError):
+    pass
+
+
+class TLSObjectProtocol(Protocol):
+    STORAGE_PREFIX: str
+
+    def __init__(self, cert: str = '', user_made: bool = False) -> None:
+        ...
+
+    def __bool__(self) -> bool:
+        ...
+
+    def __eq__(self, other: Any) -> bool:
+        ...
+
+    def to_json(self) -> Dict[str, Union[str, bool]]:
+        ...
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> 'TLSObjectProtocol':
+        ...
+
+
+class Cert(TLSObjectProtocol):
+    STORAGE_PREFIX = 'cert'
+
+    def __init__(self, cert: str = '', user_made: bool = False) -> None:
+        self.cert = cert
+        self.user_made = user_made
+
+    def __bool__(self) -> bool:
+        return bool(self.cert)
+
+    def __eq__(self, other: Any) -> bool:
+        if isinstance(other, Cert):
+            return self.cert == other.cert and self.user_made == other.user_made
+        return NotImplemented
+
+    def to_json(self) -> Dict[str, Union[str, bool]]:
+        if (self):
+            return {
+                'cert': self.cert,
+                'user_made': self.user_made
+            }
+        else:
+            return {}
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Union[str, bool]]) -> 'Cert':
+        if 'cert' not in data:
+            return cls()
+        cert = data['cert']
+        if not isinstance(cert, str):
+            raise TLSObjectException('Tried to make Cert object with non-string cert')
+        if any(k not in ['cert', 'user_made'] for k in data.keys()):
+            raise TLSObjectException(f'Got unknown field for Cert object. Fields: {data.keys()}')
+        user_made: Union[str, bool] = data.get('user_made', False)
+        if not isinstance(user_made, bool):
+            if isinstance(user_made, str):
+                if user_made.lower() == 'true':
+                    user_made = True
+                elif user_made.lower() == 'false':
+                    user_made = False
+            try:
+                user_made = bool(user_made)
+            except Exception:
+                raise TLSObjectException(f'Expected user_made field in Cert object to be bool but got {type(user_made)}')
+        return cls(cert=cert, user_made=user_made)
+
+
+class PrivKey(TLSObjectProtocol):
+    STORAGE_PREFIX = 'key'
+
+    def __init__(self, key: str = '', user_made: bool = False) -> None:
+        self.key = key
+        self.user_made = user_made
+
+    def __bool__(self) -> bool:
+        return bool(self.key)
+
+    def __eq__(self, other: Any) -> bool:
+        if isinstance(other, PrivKey):
+            return self.key == other.key and self.user_made == other.user_made
+        return NotImplemented
+
+    def to_json(self) -> Dict[str, Union[str, bool]]:
+        if bool(self):
+            return {
+                'key': self.key,
+                'user_made': self.user_made
+            }
+        else:
+            return {}
+
+    @classmethod
+    def from_json(cls, data: Dict[str, str]) -> 'PrivKey':
+        if 'key' not in data:
+            return cls()
+        key = data['key']
+        if not isinstance(key, str):
+            raise TLSObjectException('Tried to make PrivKey object with non-string key')
+        if any(k not in ['key', 'user_made'] for k in data.keys()):
+            raise TLSObjectException(f'Got unknown field for PrivKey object. Fields: {data.keys()}')
+        user_made: Union[str, bool] = data.get('user_made', False)
+        if not isinstance(user_made, bool):
+            if isinstance(user_made, str):
+                if user_made.lower() == 'true':
+                    user_made = True
+                elif user_made.lower() == 'false':
+                    user_made = False
+            try:
+                user_made = bool(user_made)
+            except Exception:
+                raise TLSObjectException(f'Expected user_made field in PrivKey object to be bool but got {type(user_made)}')
+        return cls(key=key, user_made=user_made)