import logging
import socket
import ssl
-import tempfile
import threading
import time
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.ssl_cert_utils import SSLCerts
from mgr_util import test_port_allocation, PortAlreadyInUse
+from mgr_util import verify_tls_files
+import tempfile
from urllib.error import HTTPError, URLError
-from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping
+from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping, IO
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
- self.ssl_certs = SSLCerts()
self.server_port = 7150
self.server_addr = self.mgr.get_mgr_ip()
+ self.key_file: IO[bytes]
+ self.cert_file: IO[bytes]
def configure_routes(self) -> None:
conf = {'/': {'tools.trailing_slash.on': False}}
cherrypy.tree.mount(self.node_proxy_endpoint, '/node-proxy', config=conf)
def configure_tls(self, server: Server) -> None:
- old_cert = self.mgr.cert_key_store.get_cert('agent_endpoint_root_cert')
- old_key = self.mgr.cert_key_store.get_key('agent_endpoint_key')
+ addr = self.mgr.get_mgr_ip()
+ host = self.mgr.get_hostname()
+ cert, key = self.mgr.cert_mgr.generate_cert(host, addr)
+ self.cert_file = tempfile.NamedTemporaryFile()
+ self.cert_file.write(cert.encode('utf-8'))
+ self.cert_file.flush() # cert_tmp must not be gc'ed
- if old_cert and old_key:
- self.ssl_certs.load_root_credentials(old_cert, old_key)
- else:
- self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
- self.mgr.cert_key_store.save_cert('agent_endpoint_root_cert', self.ssl_certs.get_root_cert())
- self.mgr.cert_key_store.save_key('agent_endpoint_key', self.ssl_certs.get_root_key())
+ self.key_file = tempfile.NamedTemporaryFile()
+ self.key_file.write(key.encode('utf-8'))
+ self.key_file.flush() # pkey_tmp must not be gc'ed
- host = self.mgr.get_hostname()
- addr = self.mgr.get_mgr_ip()
- server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(host, addr)
+ verify_tls_files(self.cert_file.name, self.key_file.name)
+ server.ssl_certificate, server.ssl_private_key = self.cert_file.name, self.key_file.name
def find_free_port(self) -> None:
max_port = self.server_port + 150
class NodeProxyEndpoint:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr = mgr
- self.ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
+ self.ssl_root_crt = self.mgr.cert_mgr.get_root_ca()
self.ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self.ssl_ctx.check_hostname = False
self.ssl_ctx.verify_mode = ssl.CERT_NONE
endpoint: List[Any] = ['led', led_type]
device: str = id_drive if id_drive else ''
- ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
+ ssl_root_crt = self.mgr.cert_mgr.get_root_ca()
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = True
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
self.mgr.agent_cache.sending_agent_message[self.host] = True
try:
assert self.agent
- root_cert = self.agent.ssl_certs.get_root_cert()
+ root_cert = self.mgr.cert_mgr.get_root_ca()
root_cert_tmp = tempfile.NamedTemporaryFile()
root_cert_tmp.write(root_cert.encode('utf-8'))
root_cert_tmp.flush()
root_cert_fname = root_cert_tmp.name
- cert, key = self.agent.ssl_certs.generate_cert(
- self.mgr.get_hostname(), self.mgr.get_mgr_ip())
+ cert, key = self.mgr.cert_mgr.generate_cert(self.mgr.get_hostname(), self.mgr.get_mgr_ip())
cert_tmp = tempfile.NamedTemporaryFile()
cert_tmp.write(cert.encode('utf-8'))
down = False
try:
assert self.agent
- assert self.agent.ssl_certs.get_root_cert()
+ assert self.mgr.cert_mgr.get_root_ca()
except Exception:
self.mgr.log.debug(
f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
# so it's necessary to check this one specifically
root_cert_match = False
try:
- root_cert = self.agent.ssl_certs.get_root_cert()
+ root_cert = self.mgr.cert_mgr.get_root_ca()
if last_deps and root_cert in last_deps:
root_cert_match = True
except Exception:
--- /dev/null
+
+from cephadm.ssl_cert_utils import SSLCerts
+from threading import Lock
+from typing import TYPE_CHECKING, Tuple, Union, List
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+
+class CertMgr:
+
+ CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
+ CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
+
+ def __init__(self, mgr: "CephadmOrchestrator", ip: str) -> None:
+ self.lock = Lock()
+ self.initialized = False
+ with self.lock:
+ if self.initialized:
+ return
+ self.initialized = True
+ self.mgr = mgr
+ self.ssl_certs: SSLCerts = SSLCerts()
+ old_cert = self.mgr.cert_key_store.get_cert(self.CEPHADM_ROOT_CA_CERT)
+ old_key = self.mgr.cert_key_store.get_key(self.CEPHADM_ROOT_CA_KEY)
+ if old_key and old_cert:
+ self.ssl_certs.load_root_credentials(old_cert, old_key)
+ else:
+ self.ssl_certs.generate_root_cert(ip)
+ self.mgr.cert_key_store.save_cert(self.CEPHADM_ROOT_CA_CERT, self.ssl_certs.get_root_cert())
+ self.mgr.cert_key_store.save_key(self.CEPHADM_ROOT_CA_KEY, self.ssl_certs.get_root_key())
+
+ def get_root_ca(self) -> str:
+ with self.lock:
+ if self.initialized:
+ return self.ssl_certs.get_root_cert()
+ raise Exception("Not initialized")
+
+ def generate_cert(self, host_fqdn: Union[str, List[str]], node_ip: str) -> Tuple[str, str]:
+ with self.lock:
+ if self.initialized:
+ return self.ssl_certs.generate_cert(host_fqdn, node_ip)
+ raise Exception("Not initialized")
host_cert = [
'grafana_cert',
- 'alertmanager_cert',
- 'prometheus_cert',
- 'node_exporter_cert',
]
host_key = [
'grafana_key',
- 'alertmanager_key',
- 'prometheus_key',
- 'node_exporter_key',
]
service_name_key = [
'agent_endpoint_root_cert': Cert(), # cert
'mgmt_gw_root_cert': Cert(), # cert
'service_discovery_root_cert': Cert(), # cert
+ 'cephadm_root_ca_cert': Cert(), # cert
'grafana_cert': {}, # host -> cert
- 'alertmanager_cert': {}, # host -> cert
- 'prometheus_cert': {}, # host -> cert
- 'node_exporter_cert': {}, # host -> cert
}
# Similar to certs but for priv keys. Entries in known_certs
# that don't have a key here are probably certs in PEM format
# so there is no need to store a separate key
self.known_keys = {
- 'agent_endpoint_key': PrivKey(), # key
- 'service_discovery_key': PrivKey(), # key
- 'mgmt_gw_root_key': PrivKey(), # cert
+ 'cephadm_root_ca_key': PrivKey(), # cert
'grafana_key': {}, # host -> key
- 'alertmanager_key': {}, # host -> key
- 'prometheus_key': {}, # host -> key
- 'node_exporter_key': {}, # host -> key
'iscsi_ssl_key': {}, # service-name -> key
'ingress_ssl_key': {}, # service-name -> key
'nvmeof_server_key': {}, # service-name -> key
logger.info(f'Migrating certs/keys for {spec.service_name()} spec to cert store')
self.mgr.spec_store._save_certs_and_keys(spec)
- # Migrate service discovery and agent endpoint certs
- # These constants were taken from where these certs were
- # originally generated and should be the location they
- # were store at prior to the cert store
- KV_STORE_AGENT_ROOT_CERT = 'cephadm_agent/root/cert'
- KV_STORE_AGENT_ROOT_KEY = 'cephadm_agent/root/key'
- KV_STORE_SD_ROOT_CERT = 'service_discovery/root/cert'
- KV_STORE_SD_ROOT_KEY = 'service_discovery/root/key'
-
- agent_endpoint_cert = self.mgr.get_store(KV_STORE_AGENT_ROOT_CERT)
- if agent_endpoint_cert:
- logger.info('Migrating agent root cert to cert store')
- self.mgr.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_cert)
- agent_endpoint_key = self.mgr.get_store(KV_STORE_AGENT_ROOT_KEY)
- if agent_endpoint_key:
- logger.info('Migrating agent root key to cert store')
- self.mgr.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
- service_discovery_cert = self.mgr.get_store(KV_STORE_SD_ROOT_CERT)
- if service_discovery_cert:
- logger.info('Migrating service discovery cert to cert store')
- self.mgr.cert_key_store.save_cert('service_discovery_root_cert', service_discovery_cert)
- service_discovery_key = self.mgr.get_store(KV_STORE_SD_ROOT_KEY)
- if service_discovery_key:
- logger.info('Migrating service discovery key to cert store')
- self.mgr.cert_key_store.save_key('service_discovery_key', service_discovery_key)
-
# grafana certs are stored based on the host they are placed on
for grafana_daemon in self.mgr.cache.get_daemons_by_type('grafana'):
logger.info(f'Checking for cert/key for {grafana_daemon.name()}')
from threading import Event
from ceph.deployment.service_spec import PrometheusSpec
+from cephadm.cert_mgr import CertMgr
import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
super(CephadmOrchestrator, self).__init__(*args, **kwargs)
self._cluster_fsid: str = self.get('mon_map')['fsid']
self.last_monmap: Optional[datetime.datetime] = None
+ self.cert_mgr = CertMgr(self, self.get_mgr_ip())
# for serve()
self.run = True
self.event = Event()
-
self.ssh = ssh.SSHManager(self)
if self.get_store('pause'):
raise OrchestratorError(
f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')
+ if service_name == 'mgmt-gateway':
+ self.set_module_option('secure_monitoring_stack', False)
+
found = self.spec_store.rm(service_name)
if found and service_name.startswith('osd.'):
self.spec_store.finally_rm(service_name)
server_port = ''
try:
server_port = str(self.http_server.agent.server_port)
- root_cert = self.http_server.agent.ssl_certs.get_root_cert()
+ root_cert = self.cert_mgr.get_root_ca()
except Exception:
pass
deps = sorted([self.get_mgr_ip(), server_port, root_cert,
server_port = ''
try:
server_port = str(self.http_server.agent.server_port)
- root_cert = self.http_server.agent.ssl_certs.get_root_cert()
+ root_cert = self.cert_mgr.get_root_ca()
except Exception:
pass
deps = sorted([self.get_mgr_ip(), server_port, root_cert])
user, password = self._get_prometheus_credentials()
return {'user': user,
'password': password,
- 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
+ 'certificate': self.cert_mgr.get_root_ca()}
@handle_orch_error
def get_alertmanager_access_info(self) -> Dict[str, str]:
user, password = self._get_alertmanager_credentials()
return {'user': user,
'password': password,
- 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
+ 'certificate': self.cert_mgr.get_root_ca()}
@handle_orch_error
def cert_store_cert_ls(self) -> Dict[str, Any]:
host_count = len(self.inventory.keys())
max_count = self.max_count_per_host
+ if spec.service_type == 'mgmt-gateway':
+ self.set_module_option('secure_monitoring_stack', True)
+
if spec.placement.count is not None:
if spec.service_type in ['mon', 'mgr']:
if spec.placement.count > max(5, host_count):
if service_type == 'agent':
try:
assert self.mgr.http_server.agent
- assert self.mgr.http_server.agent.ssl_certs.get_root_cert()
+ assert self.mgr.cert_mgr.get_root_ca()
except Exception:
self.log.info(
'Delaying applying agent spec until cephadm endpoint root cert created')
pass
import logging
-import socket
import orchestrator # noqa
from mgr_module import ServiceInfoT
from mgr_util import build_url
-from typing import Dict, List, TYPE_CHECKING, cast, Collection, Callable, NamedTuple, Optional
+from typing import Dict, List, TYPE_CHECKING, cast, Collection, Callable, NamedTuple, Optional, IO
from cephadm.services.monitoring import AlertmanagerService, NodeExporterService, PrometheusService
import secrets
+from mgr_util import verify_tls_files
+import tempfile
from cephadm.services.ingress import IngressSpec
-from cephadm.ssl_cert_utils import SSLCerts
from cephadm.services.cephadmservice import CephExporterService
from cephadm.services.nvmeof import NvmeofService
def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
- self.ssl_certs = SSLCerts()
self.username: Optional[str] = None
self.password: Optional[str] = None
+ self.key_file: IO[bytes]
+ self.cert_file: IO[bytes]
def validate_password(self, realm: str, username: str, password: str) -> bool:
return (password == self.password and username == self.username)
self.mgr.set_store('service_discovery/root/username', self.username)
def configure_tls(self, server: Server) -> None:
- old_cert = self.mgr.cert_key_store.get_cert('service_discovery_root_cert')
- old_key = self.mgr.cert_key_store.get_key('service_discovery_key')
- if old_key and old_cert:
- self.ssl_certs.load_root_credentials(old_cert, old_key)
- else:
- self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
- self.mgr.cert_key_store.save_cert('service_discovery_root_cert', self.ssl_certs.get_root_cert())
- self.mgr.cert_key_store.save_key('service_discovery_key', self.ssl_certs.get_root_key())
addr = self.mgr.get_mgr_ip()
- host_fqdn = socket.getfqdn(addr)
- server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(
- host_fqdn, addr)
+ host = self.mgr.get_hostname()
+ cert, key = self.mgr.cert_mgr.generate_cert(host, addr)
+ self.cert_file = tempfile.NamedTemporaryFile()
+ self.cert_file.write(cert.encode('utf-8'))
+ self.cert_file.flush() # cert_tmp must not be gc'ed
+
+ self.key_file = tempfile.NamedTemporaryFile()
+ self.key_file.write(key.encode('utf-8'))
+ self.key_file.flush() # pkey_tmp must not be gc'ed
+
+ verify_tls_files(self.cert_file.name, self.key_file.name)
+
+ server.ssl_certificate, server.ssl_private_key = self.cert_file.name, self.key_file.name
def configure(self, port: int, addr: str, enable_security: bool) -> None:
# we create a new server to enforce TLS/SSL config refresh
agent = self.mgr.http_server.agent
try:
assert agent
- assert agent.ssl_certs.get_root_cert()
assert agent.server_port
except Exception:
raise OrchestratorError(
'host': daemon_spec.host,
'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
- listener_cert, listener_key = agent.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
+ listener_cert, listener_key = self.mgr.cert_mgr.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
config = {
'agent.json': json.dumps(cfg),
'keyring': daemon_spec.keyring,
- 'root_cert.pem': agent.ssl_certs.get_root_cert(),
+ 'root_cert.pem': self.mgr.cert_mgr.get_root_ca(),
'listener.crt': listener_cert,
'listener.key': listener_key,
}
return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
- agent.ssl_certs.get_root_cert(),
+ self.mgr.cert_mgr.get_root_ca(),
str(self.mgr.get_module_option('device_enhanced_scan'))])
from orchestrator import DaemonDescription
from ceph.deployment.service_spec import MgmtGatewaySpec, GrafanaSpec
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec, get_dashboard_endpoints
-from cephadm.ssl_cert_utils import SSLCerts
logger = logging.getLogger(__name__)
self.mgr.set_module_option_ex('dashboard', 'standby_behaviour', 'error')
def get_certificates(self, svc_spec: MgmtGatewaySpec, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str, str, str]:
- self.ssl_certs = SSLCerts()
- old_cert = self.mgr.cert_key_store.get_cert('mgmt_gw_root_cert')
- old_key = self.mgr.cert_key_store.get_key('mgmt_gw_root_key')
- if old_cert and old_key:
- self.ssl_certs.load_root_credentials(old_cert, old_key)
- else:
- self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
- self.mgr.cert_key_store.save_cert('mgmt_gw_root_cert', self.ssl_certs.get_root_cert())
- self.mgr.cert_key_store.save_key('mgmt_gw_root_key', self.ssl_certs.get_root_key())
-
node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
host_fqdn = self._inventory_get_fqdn(daemon_spec.host)
- internal_cert, internal_pkey = self.ssl_certs.generate_cert(host_fqdn, node_ip)
+ internal_cert, internal_pkey = self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip)
cert = svc_spec.ssl_certificate
pkey = svc_spec.ssl_certificate_key
if not (cert and pkey):
# In case the user has not provided certificates then we generate self-signed ones
- cert, pkey = self.ssl_certs.generate_cert(host_fqdn, node_ip)
+ cert, pkey = self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip)
return internal_cert, internal_pkey, cert, pkey
if not certs_present or (org == 'Ceph' and cn == 'cephadm'):
logger.info('Regenerating cephadm self-signed grafana TLS certificates')
host_fqdn = socket.getfqdn(daemon_spec.host)
- cert, pkey = create_self_signed_cert('Ceph', host_fqdn)
+ node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
+ cert, pkey = self.mgr.cert_mgr.generate_cert([host_fqdn, "grafana_servers"], node_ip)
+ # cert, pkey = create_self_signed_cert('Ceph', host_fqdn)
self.mgr.cert_key_store.save_cert('grafana_cert', cert, host=daemon_spec.host)
self.mgr.cert_key_store.save_key('grafana_key', pkey, host=daemon_spec.host)
if 'dashboard' in self.mgr.get('mgr_map')['modules']:
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
return daemon_spec
+ def get_alertmanager_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
+ node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
+ host_fqdn = self._inventory_get_fqdn(daemon_spec.host)
+ cert, key = self.mgr.cert_mgr.generate_cert([host_fqdn, "alertmanager_servers"], node_ip)
+ return cert, key
+
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
assert self.TYPE == daemon_spec.daemon_type
deps: List[str] = []
alertmanager_user, alertmanager_password = self.mgr._get_alertmanager_credentials()
if alertmanager_user and alertmanager_password:
deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
- node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
- host_fqdn = self._inventory_get_fqdn(daemon_spec.host)
- cert = self.mgr.cert_key_store.get_cert('alertmanager_cert', host=daemon_spec.host)
- key = self.mgr.cert_key_store.get_key('alertmanager_key', host=daemon_spec.host)
- if not (cert and key):
- cert, key = self.mgr.http_server.service_discovery.ssl_certs.generate_cert(
- host_fqdn, node_ip)
- self.mgr.cert_key_store.save_cert('alertmanager_cert', cert, host=daemon_spec.host)
- self.mgr.cert_key_store.save_key('alertmanager_key', key, host=daemon_spec.host)
+ cert, key = self.get_alertmanager_certificates(daemon_spec)
context = {
'alertmanager_web_user': alertmanager_user,
'alertmanager_web_password': password_hash(alertmanager_password),
service_url
)
- def pre_remove(self, daemon: DaemonDescription) -> None:
- """
- Called before alertmanager daemon is removed.
- """
- if daemon.hostname is not None:
- # delete cert/key entires for this grafana daemon
- self.mgr.cert_key_store.rm_cert('alertmanager_cert', host=daemon.hostname)
- self.mgr.cert_key_store.rm_key('alertmanager_key', host=daemon.hostname)
-
def ok_to_stop(self,
daemon_ids: List[str],
force: bool = False,
# we shouldn't get here (mon will tell the mgr to respawn), but no
# harm done if we do.
+ def get_mgr_prometheus_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
+ node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
+ host_fqdn = self._inventory_get_fqdn(daemon_spec.host)
+ cert, key = self.mgr.cert_mgr.generate_cert([host_fqdn, "prometheus_servers"], node_ip)
+ return cert, key
+
def prepare_create(
self,
daemon_spec: CephadmDaemonDeploySpec,
mgmt_gw_enabled = len(self.mgr.cache.get_daemons_by_service('mgmt-gateway')) > 0
if self.mgr.secure_monitoring_stack:
- # NOTE: this prometheus root cert is managed by the prometheus module
- # we are using it in a read only fashion in the cephadm module
- cfg_key = 'mgr/prometheus/root/cert'
- cmd = {'prefix': 'config-key get', 'key': cfg_key}
- ret, mgr_prometheus_rootca, err = self.mgr.mon_command(cmd)
- if ret != 0:
- logger.error(f'mon command to get config-key {cfg_key} failed: {err}')
- else:
- node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
- host_fqdn = self._inventory_get_fqdn(daemon_spec.host)
- cert = self.mgr.cert_key_store.get_cert('prometheus_cert', host=daemon_spec.host)
- key = self.mgr.cert_key_store.get_key('prometheus_key', host=daemon_spec.host)
- if not (cert and key):
- cert, key = self.mgr.http_server.service_discovery.ssl_certs.generate_cert(host_fqdn, node_ip)
- self.mgr.cert_key_store.save_cert('prometheus_cert', cert, host=daemon_spec.host)
- self.mgr.cert_key_store.save_key('prometheus_key', key, host=daemon_spec.host)
- r: Dict[str, Any] = {
- 'files': {
- 'prometheus.yml': self.mgr.template.render('services/prometheus/prometheus.yml.j2', context),
- 'root_cert.pem': self.mgr.http_server.service_discovery.ssl_certs.get_root_cert(),
- 'mgr_prometheus_cert.pem': mgr_prometheus_rootca,
- 'web.yml': self.mgr.template.render('services/prometheus/web.yml.j2', web_context),
- 'prometheus.crt': cert,
- 'prometheus.key': key,
- },
- 'retention_time': retention_time,
- 'retention_size': retention_size,
- 'ip_to_bind_to': ip_to_bind_to,
- 'web_config': '/etc/prometheus/web.yml',
- 'use_url_prefix': mgmt_gw_enabled
- }
+ cert, key = self.get_mgr_prometheus_certificates(daemon_spec)
+ r: Dict[str, Any] = {
+ 'files': {
+ 'prometheus.yml': self.mgr.template.render('services/prometheus/prometheus.yml.j2', context),
+ 'root_cert.pem': self.mgr.cert_mgr.get_root_ca(),
+ 'web.yml': self.mgr.template.render('services/prometheus/web.yml.j2', web_context),
+ 'prometheus.crt': cert,
+ 'prometheus.key': key,
+ },
+ 'retention_time': retention_time,
+ 'retention_size': retention_size,
+ 'ip_to_bind_to': ip_to_bind_to,
+ 'web_config': '/etc/prometheus/web.yml',
+ 'use_url_prefix': mgmt_gw_enabled
+ }
else:
r = {
'files': {
service_url
)
- def pre_remove(self, daemon: DaemonDescription) -> None:
- """
- Called before prometheus daemon is removed.
- """
- if daemon.hostname is not None:
- # delete cert/key entires for this prometheus daemon
- self.mgr.cert_key_store.rm_cert('prometheus_cert', host=daemon.hostname)
- self.mgr.cert_key_store.rm_key('prometheus_key', host=daemon.hostname)
-
def ok_to_stop(self,
daemon_ids: List[str],
force: bool = False,
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
return daemon_spec
+ def get_node_exporter_certificates(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
+ node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
+ host_fqdn = self._inventory_get_fqdn(daemon_spec.host)
+ cert, key = self.mgr.cert_mgr.generate_cert(host_fqdn, node_ip)
+ return cert, key
+
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
assert self.TYPE == daemon_spec.daemon_type
deps = [f'secure_monitoring_stack:{self.mgr.secure_monitoring_stack}']
if self.mgr.secure_monitoring_stack:
- node_ip = self.mgr.inventory.get_addr(daemon_spec.host)
- host_fqdn = self._inventory_get_fqdn(daemon_spec.host)
- cert = self.mgr.cert_key_store.get_cert('node_exporter_cert', host=daemon_spec.host)
- key = self.mgr.cert_key_store.get_key('node_exporter_key', host=daemon_spec.host)
- if not (cert and key):
- cert, key = self.mgr.http_server.service_discovery.ssl_certs.generate_cert(
- host_fqdn, node_ip)
- self.mgr.cert_key_store.save_cert('node_exporter_cert', cert, host=daemon_spec.host)
- self.mgr.cert_key_store.save_key('node_exporter_key', key, host=daemon_spec.host)
+ cert, key = self.get_node_exporter_certificates(daemon_spec)
+ mgmt_gw_enabled = len(self.mgr.cache.get_daemons_by_service('mgmt-gateway')) > 0
r = {
'files': {
'web.yml': self.mgr.template.render('services/node-exporter/web.yml.j2', {}),
return r, deps
- def pre_remove(self, daemon: DaemonDescription) -> None:
- """
- Called before node-exporter daemon is removed.
- """
- if daemon.hostname is not None:
- # delete cert/key entires for this node-exporter daemon
- self.mgr.cert_key_store.rm_cert('node_exporter_cert', host=daemon.hostname)
- self.mgr.cert_key_store.rm_key('node_exporter_key', host=daemon.hostname)
-
def ok_to_stop(self,
daemon_ids: List[str],
force: bool = False,
self.agent_endpoint = self.mgr.http_server.agent
try:
assert self.agent_endpoint
- assert self.agent_endpoint.ssl_certs.get_root_cert()
assert self.agent_endpoint.server_port
except Exception:
raise OrchestratorError(
'Cannot deploy node-proxy daemons until cephadm endpoint has finished generating certs')
- listener_cert, listener_key = self.agent_endpoint.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
+ listener_cert, listener_key = self.mgr.cert_mgr.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
cfg = {
'target_ip': self.mgr.get_mgr_ip(),
'target_port': self.agent_endpoint.server_port,
'name': f'node-proxy.{daemon_spec.host}',
'keyring': daemon_spec.keyring,
- 'root_cert.pem': self.agent_endpoint.ssl_certs.get_root_cert(),
+ 'root_cert.pem': self.mgr.cert_mgr.get_root_ca(),
'listener.crt': listener_cert,
'listener.key': listener_key,
}
config = {'node-proxy.json': json.dumps(cfg)}
return config, sorted([str(self.mgr.get_mgr_ip()), str(self.agent_endpoint.server_port),
- self.agent_endpoint.ssl_certs.get_root_cert()])
+ self.mgr.cert_mgr.get_root_ca()])
def handle_hw_monitoring_setting(self) -> bool:
# function to apply or remove node-proxy service spec depending
return False
def get_ssl_ctx(self) -> ssl.SSLContext:
- ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
+ ssl_root_crt = self.mgr.cert_mgr.get_root_ca()
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = True
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
-from typing import Any, Tuple, IO
+from typing import Any, Tuple, IO, List
import ipaddress
-import tempfile
-import logging
from datetime import datetime, timedelta
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
-from mgr_util import verify_tls_files
from orchestrator import OrchestratorError
return (cert_str, key_str)
- def generate_cert(self, host: str, addr: str) -> Tuple[str, str]:
+ def generate_cert(self, hosts: Any, addr: str) -> Tuple[str, str]:
have_ip = True
try:
ip = x509.IPAddress(ipaddress.ip_address(addr))
builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(public_key)
+
+ if isinstance(hosts, str):
+ hosts = [hosts]
+ san_list: List[x509.GeneralName] = [x509.DNSName(host) for host in hosts]
if have_ip:
- builder = builder.add_extension(
- x509.SubjectAlternativeName(
- [ip, x509.DNSName(host)]
- ),
- critical=False
- )
- else:
- builder = builder.add_extension(
- x509.SubjectAlternativeName(
- [x509.DNSName(host)]
- ),
- critical=False
- )
+ san_list.append(ip)
+
+ builder = builder.add_extension(
+ x509.SubjectAlternativeName(
+ san_list
+ ),
+ critical=False
+ )
builder = builder.add_extension(x509.BasicConstraints(
ca=False, path_length=None), critical=True,)
return (cert_str, key_str)
- def generate_cert_files(self, host: str, addr: str) -> Tuple[str, str]:
- cert, key = self.generate_cert(host, addr)
-
- self.cert_file = tempfile.NamedTemporaryFile()
- self.cert_file.write(cert.encode('utf-8'))
- self.cert_file.flush() # cert_tmp must not be gc'ed
-
- self.key_file = tempfile.NamedTemporaryFile()
- self.key_file.write(key.encode('utf-8'))
- self.key_file.flush() # pkey_tmp must not be gc'ed
-
- verify_tls_files(self.cert_file.name, self.key_file.name)
- return self.cert_file.name, self.key_file.name
-
def get_root_cert(self) -> str:
try:
return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex), \
mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+ mock.patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1'),\
mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
def test_cert_store_save_cert(self, _set_store, cephadm_module: CephadmOrchestrator):
cephadm_module.cert_key_store._init_known_cert_key_dicts()
- agent_endpoint_root_cert = 'fake-agent-cert'
- alertmanager_host1_cert = 'fake-alertm-host1-cert'
rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
nvmeof_client_cert = 'fake-nvmeof-client-cert'
nvmeof_server_cert = 'fake-nvmeof-server-cert'
cephadm_module.cert_key_store.save_cert('nvmeof_root_ca_cert', nvmeof_root_ca_cert, service_name='nvmeof.foo', user_made=True)
expected_calls = [
- mock.call(f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert', json.dumps(Cert(agent_endpoint_root_cert).to_json())),
- mock.call(f'{CERT_STORE_CERT_PREFIX}alertmanager_cert', json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()})),
mock.call(f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})),
mock.call(f'{CERT_STORE_CERT_PREFIX}nvmeof_server_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()})),
mock.call(f'{CERT_STORE_CERT_PREFIX}nvmeof_client_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()})),
'rgw_frontend_ssl_cert': False,
'iscsi_ssl_cert': False,
'ingress_ssl_cert': False,
- 'agent_endpoint_root_cert': False,
- 'service_discovery_root_cert': False,
'mgmt_gw_root_cert': False,
+ 'cephadm_root_ca_cert': False,
'grafana_cert': False,
'alertmanager_cert': False,
'prometheus_cert': False,
}
assert cephadm_module.cert_key_store.cert_ls() == expected_ls
- cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', 'xxx')
- expected_ls['agent_endpoint_root_cert'] = True
- assert cephadm_module.cert_key_store.cert_ls() == expected_ls
-
- cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host1')
- cephadm_module.cert_key_store.save_cert('alertmanager_cert', 'xxx', host='host2')
- expected_ls['alertmanager_cert'] = {}
- expected_ls['alertmanager_cert']['host1'] = True
- expected_ls['alertmanager_cert']['host2'] = True
- assert cephadm_module.cert_key_store.cert_ls() == expected_ls
-
cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.foo', user_made=True)
cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.bar', user_made=True)
expected_ls['rgw_frontend_ssl_cert'] = {}
def test_cert_store_save_key(self, _set_store, cephadm_module: CephadmOrchestrator):
cephadm_module.cert_key_store._init_known_cert_key_dicts()
- agent_endpoint_key = 'fake-agent-key'
grafana_host1_key = 'fake-grafana-host1-key'
nvmeof_client_key = 'nvmeof-client-key'
nvmeof_server_key = 'nvmeof-server-key'
- cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
+ grafana_host1_key = 'fake-grafana-host1-cert'
cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
cephadm_module.cert_key_store.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo')
cephadm_module.cert_key_store.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
expected_calls = [
- mock.call(f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key', json.dumps(PrivKey(agent_endpoint_key).to_json())),
mock.call(f'{CERT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_client_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()})),
mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_server_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()})),
cephadm_module.cert_key_store._init_known_cert_key_dicts()
expected_ls = {
- 'agent_endpoint_key': False,
- 'service_discovery_key': False,
'grafana_key': False,
- 'alertmanager_key': False,
'mgmt_gw_root_key': False,
- 'prometheus_key': False,
- 'node_exporter_key': False,
+ 'cephadm_root_ca_key': False,
'iscsi_ssl_key': False,
'ingress_ssl_key': False,
'nvmeof_client_key': False,
}
assert cephadm_module.cert_key_store.key_ls() == expected_ls
- cephadm_module.cert_key_store.save_key('agent_endpoint_key', 'xxx')
- expected_ls['agent_endpoint_key'] = True
- assert cephadm_module.cert_key_store.key_ls() == expected_ls
-
- cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host1')
- cephadm_module.cert_key_store.save_key('alertmanager_key', 'xxx', host='host2')
- expected_ls['alertmanager_key'] = {}
- expected_ls['alertmanager_key']['host1'] = True
- expected_ls['alertmanager_key']['host2'] = True
- assert cephadm_module.cert_key_store.key_ls() == expected_ls
-
cephadm_module.cert_key_store.save_key('nvmeof_client_key', 'xxx', service_name='nvmeof.foo')
cephadm_module.cert_key_store.save_key('nvmeof_server_key', 'xxx', service_name='nvmeof.foo')
expected_ls['nvmeof_server_key'] = {}
def test_cert_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator):
cephadm_module.cert_key_store._init_known_cert_key_dicts()
- agent_endpoint_root_cert = 'fake-agent-cert'
- alertmanager_host1_cert = 'fake-alertm-host1-cert'
rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
- agent_endpoint_key = 'fake-agent-key'
grafana_host1_key = 'fake-grafana-host1-cert'
nvmeof_server_cert = 'nvmeof-server-cert'
nvmeof_client_cert = 'nvmeof-client-cert'
def _fake_prefix_store(key):
if key == 'cert_store.cert.':
return {
- f'{CERT_STORE_CERT_PREFIX}agent_endpoint_root_cert': json.dumps(Cert(agent_endpoint_root_cert).to_json()),
- f'{CERT_STORE_CERT_PREFIX}alertmanager_cert': json.dumps({'host1': Cert(alertmanager_host1_cert).to_json()}),
f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}),
f'{CERT_STORE_CERT_PREFIX}nvmeof_server_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()}),
f'{CERT_STORE_CERT_PREFIX}nvmeof_client_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()}),
}
elif key == 'cert_store.key.':
return {
- f'{CERT_STORE_KEY_PREFIX}agent_endpoint_key': json.dumps(PrivKey(agent_endpoint_key).to_json()),
f'{CERT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}),
f'{CERT_STORE_KEY_PREFIX}nvmeof_server_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()}),
f'{CERT_STORE_KEY_PREFIX}nvmeof_client_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()}),
_get_store_prefix.side_effect = _fake_prefix_store
cephadm_module.cert_key_store.load()
- assert cephadm_module.cert_key_store.known_certs['agent_endpoint_root_cert'] == Cert(agent_endpoint_root_cert)
- assert cephadm_module.cert_key_store.known_certs['alertmanager_cert']['host1'] == Cert(alertmanager_host1_cert)
assert cephadm_module.cert_key_store.known_certs['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True)
assert cephadm_module.cert_key_store.known_certs['nvmeof_server_cert']['nvmeof.foo'] == Cert(nvmeof_server_cert, True)
assert cephadm_module.cert_key_store.known_certs['nvmeof_client_cert']['nvmeof.foo'] == Cert(nvmeof_client_cert, True)
assert cephadm_module.cert_key_store.known_certs['nvmeof_root_ca_cert']['nvmeof.foo'] == Cert(nvmeof_root_ca_cert, True)
- assert cephadm_module.cert_key_store.known_keys['agent_endpoint_key'] == PrivKey(agent_endpoint_key)
assert cephadm_module.cert_key_store.known_keys['grafana_key']['host1'] == PrivKey(grafana_host1_key)
assert cephadm_module.cert_key_store.known_keys['nvmeof_server_key']['nvmeof.foo'] == PrivKey(nvmeof_server_key)
assert cephadm_module.cert_key_store.known_keys['nvmeof_client_key']['nvmeof.foo'] == PrivKey(nvmeof_client_key)
def test_cert_store_get_cert_key(self, cephadm_module: CephadmOrchestrator):
cephadm_module.cert_key_store._init_known_cert_key_dicts()
- agent_endpoint_root_cert = 'fake-agent-cert'
- alertmanager_host1_cert = 'fake-alertm-host1-cert'
rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
nvmeof_client_cert = 'fake-nvmeof-client-cert'
nvmeof_server_cert = 'fake-nvmeof-server-cert'
- cephadm_module.cert_key_store.save_cert('agent_endpoint_root_cert', agent_endpoint_root_cert)
- cephadm_module.cert_key_store.save_cert('alertmanager_cert', alertmanager_host1_cert, host='host1')
cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
cephadm_module.cert_key_store.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True)
cephadm_module.cert_key_store.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True)
- assert cephadm_module.cert_key_store.get_cert('agent_endpoint_root_cert') == agent_endpoint_root_cert
- assert cephadm_module.cert_key_store.get_cert('alertmanager_cert', host='host1') == alertmanager_host1_cert
assert cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert
assert cephadm_module.cert_key_store.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == nvmeof_server_cert
assert cephadm_module.cert_key_store.get_cert('nvmeof_client_cert', service_name='nvmeof.foo') == nvmeof_client_cert
- assert cephadm_module.cert_key_store.get_cert('service_discovery_root_cert') == ''
assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host1') == ''
assert cephadm_module.cert_key_store.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') == ''
assert cephadm_module.cert_key_store.get_cert('nvmeof_root_ca_cert', service_name='nvmeof.foo') == ''
with pytest.raises(OrchestratorError, match='Need service name to access cert for entity'):
cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', host='foo')
- agent_endpoint_key = 'fake-agent-key'
grafana_host1_key = 'fake-grafana-host1-cert'
nvmeof_server_key = 'nvmeof-server-key'
- cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
- cephadm_module.cert_key_store.save_key('agent_endpoint_key', agent_endpoint_key)
cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1')
cephadm_module.cert_key_store.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
- assert cephadm_module.cert_key_store.get_key('agent_endpoint_key') == agent_endpoint_key
assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1') == grafana_host1_key
assert cephadm_module.cert_key_store.get_key('nvmeof_server_key', service_name='nvmeof.foo') == nvmeof_server_key
assert cephadm_module.cert_key_store.get_key('nvmeof_client_key', service_name='nvmeof.foo') == ''
- assert cephadm_module.cert_key_store.get_key('service_discovery_key') == ''
- assert cephadm_module.cert_key_store.get_key('alertmanager_key', host='host1') == ''
with pytest.raises(OrchestratorError, match='Attempted to access priv key for unknown entity'):
cephadm_module.cert_key_store.get_key('unknown_entity')
assert cephadm_module.cert_key_store.get_cert('ingress_ssl_cert', service_name='ingress.rgw.foo')
assert cephadm_module.cert_key_store.get_key('ingress_ssl_key', service_name='ingress.rgw.foo')
- assert cephadm_module.cert_key_store.get_cert('agent_endpoint_root_cert')
- assert cephadm_module.cert_key_store.get_key('agent_endpoint_key')
- assert cephadm_module.cert_key_store.get_cert('service_discovery_root_cert')
- assert cephadm_module.cert_key_store.get_key('service_discovery_key')
-
assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host1')
assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host2')
assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1')
from . import node_proxy_data
PORT = 58585
+fake_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQDIZSujNBlKaLJzmvntjukjMA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzEzMTE0NzA3WhcN\nMzIwNzEwMTE0NzA3WjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyyMe4DMA+MeYK7BHZMHB\nq7zjliEOcNgxomjU8qbf5USF7Mqrf6+/87XWqj4pCyAW8x0WXEr6A56a+cmBVmt+\nqtWDzl020aoId6lL5EgLLn6/kMDCCJLq++Lg9cEofMSvcZh+lY2f+1p+C+00xent\nrLXvXGOilAZWaQfojT2BpRnNWWIFbpFwlcKrlg2G0cFjV5c1m6a0wpsQ9JHOieq0\nSvwCixajwq3CwAYuuiU1wjI4oJO4Io1+g8yB3nH2Mo/25SApCxMXuXh4kHLQr/T4\n4hqisvG4uJYgKMcSIrWj5o25mclByGi1UI/kZkCUES94i7Z/3ihx4Bad0AMs/9tw\nFwIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQAf+pwz7Gd7mDwU2LY0TQXsK6/8KGzh\nHuX+ErOb8h5cOAbvCnHjyJFWf6gCITG98k9nxU9NToG0WYuNm/max1y/54f0dtxZ\npUo6KSNl3w6iYCfGOeUIj8isi06xMmeTgMNzv8DYhDt+P2igN6LenqWTVztogkiV\nxQ5ZJFFLEw4sN0CXnrZX3t5ruakxLXLTLKeE0I91YJvjClSBGkVJq26wOKQNHMhx\npWxeydQ5EgPZY+Aviz5Dnxe8aB7oSSovpXByzxURSabOuCK21awW5WJCGNpmqhWK\nZzACBDEstccj57c4OGV0eayHJRsluVr2e9NHRINZA3qdB37e6gsI1xHo\n-----END CERTIFICATE-----\n"""
+
+
+class FakeCertMgr:
+ def get_root_ca(self):
+ return fake_cert
+
+ def generate_cert(self, host_fqdn, node_ip):
+ return fake_cert
class FakeMgr:
self.http_server.agent = MagicMock()
self.http_server.agent.ssl_certs = SSLCerts()
self.http_server.agent.ssl_certs.generate_root_cert(self.get_mgr_ip())
+ self.cert_mgr = FakeCertMgr()
def get_mgr_ip(self) -> str:
return '0.0.0.0'
cephadm_module.secure_monitoring_stack = True
cephadm_module.set_store(AlertmanagerService.USER_CFG_KEY, 'alertmanager_user')
cephadm_module.set_store(AlertmanagerService.PASS_CFG_KEY, 'alertmanager_plain_password')
- cephadm_module.http_server.service_discovery.ssl_certs.generate_cert = MagicMock(side_effect=gen_cert)
- cephadm_module.http_server.service_discovery.ssl_certs.get_root_cert = MagicMock(side_effect=get_root_cert)
with with_service(cephadm_module, AlertManagerSpec()):
y = dedent(f"""
use_current_daemon_image=False,
)
- assert cephadm_module.cert_key_store.get_cert('alertmanager_cert', host='test') == 'mycert'
- assert cephadm_module.cert_key_store.get_key('alertmanager_key', host='test') == 'mykey'
-
@patch("cephadm.serve.CephadmServe._run_cephadm")
@patch("cephadm.module.CephadmOrchestrator.get_mgr_ip", lambda _: '::1')
def test_prometheus_config_security_disabled(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
cephadm_module.set_store(AlertmanagerService.PASS_CFG_KEY, 'alertmanager_plain_password')
cephadm_module.http_server.service_discovery.username = 'sd_user'
cephadm_module.http_server.service_discovery.password = 'sd_password'
- cephadm_module.http_server.service_discovery.ssl_certs.generate_cert = MagicMock(
- side_effect=gen_cert)
# host "test" needs to have networks for keepalive to be placed
cephadm_module.cache.update_host_networks('test', {
'1.2.3.0/24': {
"""
raise NotImplementedError()
- def service_discovery_dump_cert(self) -> OrchResult:
- """
- Returns service discovery server root certificate
-
- :return: service discovery root certificate
- """
- raise NotImplementedError()
-
def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
"""
Describe a service (of any kind) that is already configured in
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch sd dump cert')
- def _service_discovery_dump_cert(self) -> HandleCommandResult:
- """
- Returns service discovery server root certificate
- """
- completion = self.service_discovery_dump_cert()
- raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
-
@_cli_read_command('orch ls')
def _list_services(self,
service_type: Optional[str] = None,