CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
def __init__(self, mgr: "CephadmOrchestrator", ip: str) -> None:
- self.ssl_certs: SSLCerts = SSLCerts()
+ self.ssl_certs: SSLCerts = SSLCerts(mgr._cluster_fsid)
old_cert = mgr.cert_key_store.get_cert(self.CEPHADM_ROOT_CA_CERT)
old_key = mgr.cert_key_store.get_key(self.CEPHADM_ROOT_CA_KEY)
if old_key and old_cert:
class SSLCerts:
- def __init__(self) -> None:
+ def __init__(self, fsid: str) -> None:
self.root_cert: Any
self.root_key: Any
self.key_file: IO[bytes]
self.cert_file: IO[bytes]
+ self.cluster_fsid: str = fsid
def generate_root_cert(
self,
root_builder = root_builder.public_key(root_public_key)
san_list: List[x509.GeneralName] = []
+ san_list.append(x509.DNSName(f'fsid-{self.cluster_fsid}'))
if addr:
san_list.extend([x509.IPAddress(ipaddress.ip_address(addr))])
if custom_san_list:
self.node_proxy = MagicMock()
self.http_server = MagicMock()
self.http_server.agent = MagicMock()
- self.http_server.agent.ssl_certs = SSLCerts()
+ self.http_server.agent.ssl_certs = SSLCerts("59d1b32e-xxxx-11ef-xxxx-52540060267a")
self.http_server.agent.ssl_certs.generate_root_cert(addr=self.get_mgr_ip())
self.cert_mgr = FakeCertMgr()