From a0fe3ffdaaddd661f115de78ddae89b07b98a9ae Mon Sep 17 00:00:00 2001 From: Redouane Kachach Date: Wed, 25 May 2022 12:27:23 +0200 Subject: [PATCH] mgr/cephadm: adding dynamic prometheus config based on http_sd Signed-off-by: Redouane Kachach --- doc/cephadm/services/monitoring.rst | 36 +- src/cephadm/cephadm | 4 +- src/pybind/mgr/cephadm/agent.py | 417 ++++-------------- src/pybind/mgr/cephadm/http_server.py | 63 +++ src/pybind/mgr/cephadm/module.py | 43 +- src/pybind/mgr/cephadm/serve.py | 4 +- src/pybind/mgr/cephadm/service_discovery.py | 197 +++++++++ .../mgr/cephadm/services/cephadmservice.py | 20 +- src/pybind/mgr/cephadm/services/monitoring.py | 99 ++--- src/pybind/mgr/cephadm/ssl_cert_utils.py | 123 ++++++ .../services/prometheus/prometheus.yml.j2 | 43 +- src/pybind/mgr/cephadm/tests/fixtures.py | 4 +- ...est_agent.py => test_service_discovery.py} | 4 +- src/pybind/mgr/cephadm/tests/test_services.py | 19 +- src/pybind/mgr/orchestrator/_interface.py | 8 + src/pybind/mgr/orchestrator/module.py | 9 + 16 files changed, 626 insertions(+), 467 deletions(-) create mode 100644 src/pybind/mgr/cephadm/http_server.py create mode 100644 src/pybind/mgr/cephadm/service_discovery.py create mode 100644 src/pybind/mgr/cephadm/ssl_cert_utils.py rename src/pybind/mgr/cephadm/tests/{test_agent.py => test_service_discovery.py} (98%) diff --git a/doc/cephadm/services/monitoring.rst b/doc/cephadm/services/monitoring.rst index 698cac0b8aa..7380560e7a6 100644 --- a/doc/cephadm/services/monitoring.rst +++ b/doc/cephadm/services/monitoring.rst @@ -71,10 +71,10 @@ steps below: ceph orch apply prometheus - or + or .. prompt:: bash # - + ceph orch apply prometheus --placement 'count:2' #. Deploy grafana: @@ -193,12 +193,26 @@ configuration files for monitoring services. Internally, cephadm already uses `Jinja2 `_ templates to generate the -configuration files for all monitoring components. 
To be able to customize the -configuration of Prometheus, Grafana or the Alertmanager it is possible to store -a Jinja2 template for each service that will be used for configuration -generation instead. This template will be evaluated every time a service of that -kind is deployed or reconfigured. That way, the custom configuration is -preserved and automatically applied on future deployments of these services. +configuration files for all monitoring components. Starting from version x.x.x, +cephadm uses Prometheus http service discovery support `http_sd_config +` +in order to get the currently configured targets from Ceph. Internally, `ceph-mgr` +provides a service discovery endpoint at `:8765/sd/` (port is +configurable through the variable `service_discovery_port`) which is used by +Prometheus to get the needed targets. + +Customers with an external monitoring stack can use the `ceph-mgr` service discovery endpoint +to get the scraping configuration. The root certificate of the server can be obtained with the +following command: + + .. prompt:: bash # + + ceph orch sd dump cert + +The configuration of Prometheus, Grafana, or Alertmanager may be customized by storing +a Jinja2 template for each service. This template will be evaluated every time a service +of that kind is deployed or reconfigured. That way, the custom configuration is preserved +and automatically applied on future deployments of these services. .. note:: @@ -282,6 +296,12 @@ cluster. By default, ceph-mgr presents prometheus metrics on port 9283 on each host running a ceph-mgr daemon. Configure prometheus to scrape these. +To make this integration easier, Ceph provides by means of `ceph-mgr` a service +discovery endpoint at `:8765/sd/` which can be used by an external +Prometheus to retrieve targets information. Information reported by this endpoint uses +the format specified by `http_sd_config +` + * To enable the dashboard's prometheus-based alerting, see :ref:`dashboard-alerting`. 
* To enable dashboard integration with Grafana, see :ref:`dashboard-grafana`. diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index ba342991e1b..82ccad9d32c 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -4403,7 +4403,7 @@ WantedBy=ceph-{fsid}.target 'port': self.listener_port}) data = data.encode('ascii') - url = f'https://{self.target_ip}:{self.target_port}/data' + url = f'https://{self.target_ip}:{self.target_port}/data/' try: req = Request(url, data, {'Content-Type': 'application/json'}) send_time = time.monotonic() @@ -5177,7 +5177,7 @@ def create_mgr( # Note:the default port used by the Prometheus node exporter is opened in fw ctx.meta_json = json.dumps({'service_name': 'mgr'}) deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid, - config=config, keyring=mgr_keyring, ports=[9283]) + config=config, keyring=mgr_keyring, ports=[9283, 8765]) # wait for the service to become available logger.info('Waiting for mgr to start...') diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py index fa75a8759bb..bb35166a155 100644 --- a/src/pybind/mgr/cephadm/agent.py +++ b/src/pybind/mgr/cephadm/agent.py @@ -1,5 +1,11 @@ -import cherrypy -import ipaddress +try: + import cherrypy + from cherrypy._cpserver import Server +except ImportError: + # to avoid sphinx build crash + class Server: # type: ignore + pass + import json import logging import socket @@ -8,25 +14,17 @@ import tempfile import threading import time -from mgr_module import ServiceInfoT -from mgr_util import verify_tls_files, build_url + +from mgr_util import verify_tls_files from orchestrator import DaemonDescriptionStatus, OrchestratorError from orchestrator._interface import daemon_type_to_service from ceph.utils import datetime_now from ceph.deployment.inventory import Devices from ceph.deployment.service_spec import ServiceSpec, PlacementSpec from cephadm.services.cephadmservice import CephadmDaemonDeploySpec -from cephadm.services.ingress import IngressSpec - 
-from datetime import datetime, timedelta -from cryptography import x509 -from cryptography.x509.oid import NameOID -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.backends import default_backend +from cephadm.ssl_cert_utils import SSLCerts -from typing import Any, Dict, List, Set, Tuple, \ - TYPE_CHECKING, Optional, cast, Collection +from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator @@ -44,212 +42,87 @@ logging.getLogger('cherrypy.error').addFilter(cherrypy_filter) cherrypy.log.access_log.propagate = False -class CherryPyThread(threading.Thread): +class AgentEndpoint: + + KV_STORE_AGENT_ROOT_CERT = 'cephadm_agent/root/cert' + KV_STORE_AGENT_ROOT_KEY = 'cephadm_agent/root/key' + def __init__(self, mgr: "CephadmOrchestrator") -> None: self.mgr = mgr - self.cherrypy_shutdown_event = threading.Event() - self.ssl_certs = SSLCerts(self.mgr) + self.ssl_certs = SSLCerts() self.server_port = 7150 self.server_addr = self.mgr.get_mgr_ip() - super(CherryPyThread, self).__init__(target=self.run) - - def configure_cherrypy(self) -> None: - cherrypy.config.update({ - 'environment': 'production', - 'server.socket_host': self.server_addr, - 'server.socket_port': self.server_port, - 'engine.autoreload.on': False, - 'server.ssl_module': 'builtin', - 'server.ssl_certificate': self.cert_tmp.name, - 'server.ssl_private_key': self.key_tmp.name, - }) + self.host_data: Server = None + + def configure_routes(self) -> None: + + self.host_data = HostData(self.mgr, + self.server_port, + self.server_addr, + self.cert_file.name, + self.key_file.name) # configure routes - root = Root(self.mgr) - host_data = HostData(self.mgr) d = cherrypy.dispatch.RoutesDispatcher() - d.connect(name='index', route='/', controller=root.index) - d.connect(name='sd-config', route='/prometheus/sd-config', controller=root.get_sd_config) 
- d.connect(name='rules', route='/prometheus/rules', controller=root.get_prometheus_rules) - d.connect(name='host-data', route='/data', controller=host_data.POST, + d.connect(name='host-data', route='/', + controller=self.host_data.POST, conditions=dict(method=['POST'])) - conf = {'/': {'request.dispatch': d}} - cherrypy.tree.mount(None, "/", config=conf) + cherrypy.tree.mount(None, '/data', config={'/': {'request.dispatch': d}}) - def run(self) -> None: + def configure_tls(self) -> None: try: + old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT) + old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY) + if not old_key or not old_cert: + raise OrchestratorError('No old credentials for agent found') + self.ssl_certs.load_root_credentials(old_cert, old_key) + except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError): + self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip()) + self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert()) + self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key()) + + cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip()) + self.key_file = tempfile.NamedTemporaryFile() + self.key_file.write(key.encode('utf-8')) + self.key_file.flush() # pkey_tmp must not be gc'ed + self.cert_file = tempfile.NamedTemporaryFile() + self.cert_file.write(cert.encode('utf-8')) + self.cert_file.flush() # cert_tmp must not be gc'ed + verify_tls_files(self.cert_file.name, self.key_file.name) + + def find_free_port(self) -> None: + max_port = self.server_port + 150 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + while self.server_port <= max_port: try: - old_creds = self.mgr.get_store('cephadm_endpoint_credentials') - if not old_creds: - raise OrchestratorError('No old credentials for cephadm endpoint found') - old_creds_dict = json.loads(old_creds) - old_key = old_creds_dict['key'] - old_cert = old_creds_dict['cert'] - 
self.ssl_certs.load_root_credentials(old_cert, old_key) - except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError): - self.ssl_certs.generate_root_cert() - - cert, key = self.ssl_certs.generate_cert() - - self.key_tmp = tempfile.NamedTemporaryFile() - self.key_tmp.write(key.encode('utf-8')) - self.key_tmp.flush() # pkey_tmp must not be gc'ed - key_fname = self.key_tmp.name - - self.cert_tmp = tempfile.NamedTemporaryFile() - self.cert_tmp.write(cert.encode('utf-8')) - self.cert_tmp.flush() # cert_tmp must not be gc'ed - cert_fname = self.cert_tmp.name - - verify_tls_files(cert_fname, key_fname) - self.configure_cherrypy() - - self.mgr.log.debug('Starting cherrypy engine...') - self.start_engine() - self.mgr.log.debug('Cherrypy engine started.') - cephadm_endpoint_creds = { - 'cert': self.ssl_certs.get_root_cert(), - 'key': self.ssl_certs.get_root_key() - } - self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds)) - self.mgr._kick_serve_loop() - # wait for the shutdown event - self.cherrypy_shutdown_event.wait() - self.cherrypy_shutdown_event.clear() - cherrypy.engine.stop() - self.mgr.log.debug('Cherrypy engine stopped.') - except Exception as e: - self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}') - - def start_engine(self) -> None: - port_connect_attempts = 0 - while port_connect_attempts < 150: - try: - cherrypy.engine.start() - self.mgr.log.debug(f'Cephadm endpoint connected to port {self.server_port}') + sock.bind((self.server_addr, self.server_port)) + sock.close() + self.host_data.socket_port = self.server_port + self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}') return - except cherrypy.process.wspbus.ChannelFailures as e: - self.mgr.log.debug( - f'{e}. 
Trying next port.') + except OSError: self.server_port += 1 - cherrypy.server.httpserver = None - cherrypy.config.update({ - 'server.socket_port': self.server_port - }) - port_connect_attempts += 1 self.mgr.log.error( - 'Cephadm Endpoint could not find free port in range 7150-7300 and failed to start') + 'Cephadm agent endpoint could not find free port in range 7150-7300 and failed to start') - def shutdown(self) -> None: - self.mgr.log.debug('Stopping cherrypy engine...') - self.cherrypy_shutdown_event.set() + def configure(self) -> None: + self.configure_tls() + self.configure_routes() + self.find_free_port() -class Root(object): - - # collapse everything to '/' - def _cp_dispatch(self, vpath: str) -> 'Root': - cherrypy.request.path = '' - return self - - def __init__(self, mgr: "CephadmOrchestrator"): - self.mgr = mgr - - @cherrypy.expose - def index(self) -> str: - return ''' - -Cephadm HTTP Endpoint - -

Cephadm Service Discovery Endpoints

-

mgr/Prometheus http sd-config

-

Alertmanager http sd-config

-

Node exporter http sd-config

-

HAProxy http sd-config

-

Prometheus rules

- -''' - - @cherrypy.expose - @cherrypy.tools.json_out() - def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]: - """Return compatible prometheus config for the specified service.""" - if service == 'mgr-prometheus': - return self.prometheus_sd_config() - elif service == 'alertmanager': - return self.alertmgr_sd_config() - elif service == 'node-exporter': - return self.node_exporter_sd_config() - elif service == 'haproxy': - return self.haproxy_sd_config() - else: - return [] - - def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]: - """Return compatible prometheus config for prometheus service.""" - servers = self.mgr.list_servers() - targets = [] - for server in servers: - hostname = server.get('hostname', '') - for service in cast(List[ServiceInfoT], server.get('services', [])): - if service['type'] != 'mgr': - continue - port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283) - targets.append(f'{hostname}:{port}') - return [{"targets": targets, "labels": {}}] - - def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]: - """Return compatible prometheus config for mgr alertmanager service.""" - srv_entries = [] - for dd in self.mgr.cache.get_daemons_by_service('alertmanager'): - assert dd.hostname is not None - addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname) - port = dd.ports[0] if dd.ports else 9093 - srv_entries.append('{}'.format(build_url(host=addr, port=port).lstrip('/'))) - return [{"targets": srv_entries, "labels": {}}] - - def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]: - """Return compatible prometheus config for node-exporter service.""" - srv_entries = [] - for dd in self.mgr.cache.get_daemons_by_service('node-exporter'): - assert dd.hostname is not None - addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname) - port = dd.ports[0] if dd.ports else 9100 - srv_entries.append({ - 'targets': [build_url(host=addr, port=port).lstrip('/')], - 
'labels': {'instance': dd.hostname} - }) - return srv_entries - - def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]: - """Return compatible prometheus config for haproxy service.""" - srv_entries = [] - for dd in self.mgr.cache.get_daemons_by_type('ingress'): - if dd.service_name() in self.mgr.spec_store: - spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec) - assert dd.hostname is not None - if dd.daemon_type == 'haproxy': - addr = self.mgr.inventory.get_addr(dd.hostname) - srv_entries.append({ - 'targets': [f"{build_url(host=addr, port=spec.monitor_port).lstrip('/')}"], - 'labels': {'instance': dd.service_name()} - }) - return srv_entries - - @cherrypy.expose(alias='prometheus/rules') - def get_prometheus_rules(self) -> str: - """Return currently configured prometheus rules as Yaml.""" - cherrypy.response.headers['Content-Type'] = 'text/plain' - with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f: - return f.read() - - -class HostData: +class HostData(Server): exposed = True - def __init__(self, mgr: "CephadmOrchestrator"): + def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str, ssl_ca_cert: str, ssl_priv_key: str): self.mgr = mgr + super().__init__() + self.socket_port = port + self.ssl_certificate = ssl_ca_cert + self.ssl_private_key = ssl_priv_key + self._socket_host = host + self.subscribe() @cherrypy.tools.json_in() @cherrypy.tools.json_out() @@ -370,25 +243,26 @@ class HostData: class AgentMessageThread(threading.Thread): def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None: self.mgr = mgr + self.agent = mgr.http_server.agent self.host = host self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host self.port = port self.data: str = json.dumps(data) self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec - super(AgentMessageThread, 
self).__init__(target=self.run) + super().__init__(target=self.run) def run(self) -> None: self.mgr.log.debug(f'Sending message to agent on host {self.host}') self.mgr.agent_cache.sending_agent_message[self.host] = True try: - assert self.mgr.cherrypy_thread - root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert() + assert self.agent + root_cert = self.agent.ssl_certs.get_root_cert() root_cert_tmp = tempfile.NamedTemporaryFile() root_cert_tmp.write(root_cert.encode('utf-8')) root_cert_tmp.flush() root_cert_fname = root_cert_tmp.name - cert, key = self.mgr.cherrypy_thread.ssl_certs.generate_cert() + cert, key = self.agent.ssl_certs.generate_cert(self.mgr.get_mgr_ip()) cert_tmp = tempfile.NamedTemporaryFile() cert_tmp.write(cert.encode('utf-8')) @@ -451,6 +325,7 @@ class AgentMessageThread(threading.Thread): class CephadmAgentHelpers: def __init__(self, mgr: "CephadmOrchestrator"): self.mgr: "CephadmOrchestrator" = mgr + self.agent = mgr.http_server.agent def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None: for host in hosts: @@ -551,8 +426,8 @@ class CephadmAgentHelpers: def _check_agent(self, host: str) -> bool: down = False try: - assert self.mgr.cherrypy_thread - assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert() + assert self.agent + assert self.agent.ssl_certs.get_root_cert() except Exception: self.mgr.log.debug( f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert') @@ -576,7 +451,7 @@ class CephadmAgentHelpers: # so it's necessary to check this one specifically root_cert_match = False try: - root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert() + root_cert = self.agent.ssl_certs.get_root_cert() if last_deps and root_cert in last_deps: root_cert_match = True except Exception: @@ -608,133 +483,3 @@ class CephadmAgentHelpers: self.mgr.log.debug( f'Agent on host {host} not ready to {action}: {e}') return down - - -class 
SSLCerts: - def __init__(self, mgr: "CephadmOrchestrator") -> None: - self.mgr = mgr - self.root_cert: Any - self.root_key: Any - - def generate_root_cert(self) -> Tuple[str, str]: - self.root_key = rsa.generate_private_key( - public_exponent=65537, key_size=4096, backend=default_backend()) - root_public_key = self.root_key.public_key() - - root_builder = x509.CertificateBuilder() - - root_builder = root_builder.subject_name(x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), - ])) - - root_builder = root_builder.issuer_name(x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), - ])) - - root_builder = root_builder.not_valid_before(datetime.now()) - root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3))) - root_builder = root_builder.serial_number(x509.random_serial_number()) - root_builder = root_builder.public_key(root_public_key) - root_builder = root_builder.add_extension( - x509.SubjectAlternativeName( - [x509.IPAddress(ipaddress.IPv4Address(str(self.mgr.get_mgr_ip())))] - ), - critical=False - ) - root_builder = root_builder.add_extension( - x509.BasicConstraints(ca=True, path_length=None), critical=True, - ) - - self.root_cert = root_builder.sign( - private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend() - ) - - cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') - key_str = self.root_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption() - ).decode('utf-8') - - return (cert_str, key_str) - - def generate_cert(self, addr: str = '') -> Tuple[str, str]: - have_ip = True - if addr: - try: - ip = x509.IPAddress(ipaddress.IPv4Address(addr)) - except Exception: - try: - ip = x509.IPAddress(ipaddress.IPv6Address(addr)) - except Exception: - have_ip = False - pass - else: - ip = 
x509.IPAddress(ipaddress.IPv4Address(self.mgr.get_mgr_ip())) - - private_key = rsa.generate_private_key( - public_exponent=65537, key_size=4096, backend=default_backend()) - public_key = private_key.public_key() - - builder = x509.CertificateBuilder() - - builder = builder.subject_name(x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, addr if addr else str(self.mgr.get_mgr_ip())), - ])) - - builder = builder.issuer_name(x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), - ])) - - builder = builder.not_valid_before(datetime.now()) - builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3))) - builder = builder.serial_number(x509.random_serial_number()) - builder = builder.public_key(public_key) - if have_ip: - builder = builder.add_extension( - x509.SubjectAlternativeName( - [ip] - ), - critical=False - ) - builder = builder.add_extension( - x509.BasicConstraints(ca=False, path_length=None), critical=True, - ) - - cert = builder.sign( - private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend() - ) - - cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') - key_str = private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption() - ).decode('utf-8') - - return (cert_str, key_str) - - def get_root_cert(self) -> str: - try: - return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') - except AttributeError: - return '' - - def get_root_key(self) -> str: - try: - return self.root_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ).decode('utf-8') - except AttributeError: - return '' - - def load_root_credentials(self, cert: str, priv_key: str) -> None: - given_cert = 
x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend()) - tz = given_cert.not_valid_after.tzinfo - if datetime.now(tz) >= given_cert.not_valid_after: - raise OrchestratorError('Given cert is expired') - self.root_cert = given_cert - self.root_key = serialization.load_pem_private_key( - data=priv_key.encode('utf-8'), backend=default_backend(), password=None) diff --git a/src/pybind/mgr/cephadm/http_server.py b/src/pybind/mgr/cephadm/http_server.py new file mode 100644 index 00000000000..7c63740db5c --- /dev/null +++ b/src/pybind/mgr/cephadm/http_server.py @@ -0,0 +1,63 @@ +import cherrypy +import threading +import logging +from typing import TYPE_CHECKING + +from cephadm.agent import AgentEndpoint +from cephadm.service_discovery import ServiceDiscovery + + +if TYPE_CHECKING: + from cephadm.module import CephadmOrchestrator + + +def cherrypy_filter(record: logging.LogRecord) -> int: + blocked = [ + 'TLSV1_ALERT_DECRYPT_ERROR' + ] + msg = record.getMessage() + return not any([m for m in blocked if m in msg]) + + +logging.getLogger('cherrypy.error').addFilter(cherrypy_filter) +cherrypy.log.access_log.propagate = False + + +class CephadmHttpServer(threading.Thread): + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr = mgr + self.agent = AgentEndpoint(mgr) + self.service_discovery = ServiceDiscovery(mgr) + self.cherrypy_shutdown_event = threading.Event() + super().__init__(target=self.run) + + def configure_cherrypy(self) -> None: + cherrypy.config.update({ + 'environment': 'production', + 'engine.autoreload.on': False, + }) + + def run(self) -> None: + try: + self.configure_cherrypy() + self.agent.configure() + self.service_discovery.configure() + + self.mgr.log.debug('Starting cherrypy engine...') + cherrypy.server.unsubscribe() # disable default server + cherrypy.engine.start() + self.mgr.log.debug('Cherrypy engine started.') + + self.mgr._kick_serve_loop() + # wait for the shutdown event + self.cherrypy_shutdown_event.wait() 
+ self.cherrypy_shutdown_event.clear() + cherrypy.engine.stop() + cherrypy.server.httpserver = None + self.mgr.log.debug('Cherrypy engine stopped.') + except Exception as e: + self.mgr.log.error(f'Failed to run cephadm http server: {e}') + + def shutdown(self) -> None: + self.mgr.log.debug('Stopping cherrypy engine...') + self.cherrypy_shutdown_event.set() diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 8a11b937323..3ee67f20938 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -10,6 +10,8 @@ from functools import wraps from tempfile import TemporaryDirectory, NamedTemporaryFile from threading import Event +from cephadm.service_discovery import ServiceDiscovery + import string from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable @@ -30,7 +32,8 @@ from ceph.deployment.service_spec import \ from ceph.utils import str_to_datetime, datetime_to_str, datetime_now from cephadm.serve import CephadmServe from cephadm.services.cephadmservice import CephadmDaemonDeploySpec -from cephadm.agent import CherryPyThread, CephadmAgentHelpers +from cephadm.http_server import CephadmHttpServer +from cephadm.agent import CephadmAgentHelpers from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType @@ -420,6 +423,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, default=10, desc='max number of osds that will be drained simultaneously when osds are removed' ), + Option( + 'service_discovery_port', + type='int', + default=8765, + desc='cephadm service discovery port' + ), ] def __init__(self, *args: Any, **kwargs: Any): @@ -491,6 +500,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.agent_refresh_rate = 0 self.agent_down_multiplier = 0.0 self.agent_starting_port = 0 + self.service_discovery_port = 0 self.apply_spec_fails: List[Tuple[str, str]] = [] 
self.max_osd_draining_count = 10 self.device_enhanced_scan = False @@ -578,8 +588,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.config_checker = CephadmConfigChecks(self) - self.cherrypy_thread = CherryPyThread(self) - self.cherrypy_thread.start() + self.http_server = CephadmHttpServer(self) + self.http_server.start() self.agent_helpers = CephadmAgentHelpers(self) if self.use_agent: self.agent_helpers._apply_agent() @@ -591,7 +601,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.log.debug('shutdown') self._worker_pool.close() self._worker_pool.join() - self.cherrypy_thread.shutdown() + self.http_server.shutdown() self.offline_watcher.shutdown() self.run = False self.event.set() @@ -2051,6 +2061,9 @@ Then run the following: def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool: return daemon_type == 'mgr' and daemon_id == self.get_mgr_id() + def get_active_mgr(self) -> DaemonDescription: + return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr')) + def get_active_mgr_digests(self) -> List[str]: digests = self.mgr_service.get_active_daemon( self.cache.get_daemons_by_type('mgr')).container_image_digests @@ -2388,17 +2401,24 @@ Then run the following: root_cert = '' server_port = '' try: - server_port = str(self.cherrypy_thread.server_port) - root_cert = self.cherrypy_thread.ssl_certs.get_root_cert() + server_port = str(self.http_server.agent.server_port) + root_cert = self.http_server.agent.ssl_certs.get_root_cert() except Exception: pass deps = sorted([self.get_mgr_ip(), server_port, root_cert, str(self.device_enhanced_scan)]) elif daemon_type == 'iscsi': deps = [self.get_mgr_ip()] + + elif daemon_type == 'prometheus': + # for prometheus we add the active mgr as an explicit dependency, + # this way we force a redeploy after a mgr failover + deps.append(self.get_active_mgr().name()) + deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283))) + deps += [s for s 
in ['node-exporter', 'alertmanager', 'ingress'] + if self.cache.get_daemons_by_service(s)] else: need = { - 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'], 'grafana': ['prometheus', 'loki'], 'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'], 'promtail': ['loki'], @@ -2406,8 +2426,6 @@ Then run the following: for dep_type in need.get(daemon_type, []): for dd in self.cache.get_daemons_by_type(dep_type): deps.append(dd.name()) - if daemon_type == 'prometheus': - deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283))) return sorted(deps) @forall_hosts @@ -2553,6 +2571,13 @@ Then run the following: self._kick_serve_loop() return f'Removed setting {setting} from tuned profile {profile_name}' + @handle_orch_error + def service_discovery_dump_cert(self) -> str: + root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT) + if not root_cert: + raise OrchestratorError('No certificate found for service discovery') + return root_cert + def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None: self.health_checks[name] = { 'severity': 'warning', diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 2f26ca70900..7596b3f5344 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -580,8 +580,8 @@ class CephadmServe: if service_type == 'agent': try: - assert self.mgr.cherrypy_thread - assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert() + assert self.mgr.http_server.agent + assert self.mgr.http_server.agent.ssl_certs.get_root_cert() except Exception: self.log.info( 'Delaying applying agent spec until cephadm endpoint root cert created') diff --git a/src/pybind/mgr/cephadm/service_discovery.py b/src/pybind/mgr/cephadm/service_discovery.py new file mode 100644 index 00000000000..50d83e44d18 --- /dev/null +++ b/src/pybind/mgr/cephadm/service_discovery.py @@ -0,0 +1,197 @@ +try: + import cherrypy + from cherrypy._cpserver import 
Server +except ImportError: + # to avoid sphinx build crash + class Server: # type: ignore + pass + +import logging +import tempfile +from typing import Dict, List, TYPE_CHECKING, cast, Collection + +from orchestrator import OrchestratorError +from mgr_module import ServiceInfoT +from mgr_util import verify_tls_files, build_url +from cephadm.services.ingress import IngressSpec +from cephadm.ssl_cert_utils import SSLCerts + +if TYPE_CHECKING: + from cephadm.module import CephadmOrchestrator + + +def cherrypy_filter(record: logging.LogRecord) -> int: + blocked = [ + 'TLSV1_ALERT_DECRYPT_ERROR' + ] + msg = record.getMessage() + return not any([m for m in blocked if m in msg]) + + +logging.getLogger('cherrypy.error').addFilter(cherrypy_filter) +cherrypy.log.access_log.propagate = False + + +class ServiceDiscovery: + + KV_STORE_SD_ROOT_CERT = 'service_discovery/root/cert' + KV_STORE_SD_ROOT_KEY = 'service_discovery/root/key' + + def __init__(self, mgr: "CephadmOrchestrator") -> None: + self.mgr = mgr + self.ssl_certs = SSLCerts() + self.server_port = self.mgr.service_discovery_port + self.server_addr = '::' + + def configure_routes(self) -> None: + + root_server = Root(self.mgr, + self.server_port, + self.server_addr, + self.cert_file.name, + self.key_file.name) + + # configure routes + d = cherrypy.dispatch.RoutesDispatcher() + d.connect(name='index', route='/', controller=root_server.index) + d.connect(name='index', route='/sd', controller=root_server.index) + d.connect(name='index', route='/sd/', controller=root_server.index) + d.connect(name='sd-config', route='/sd/prometheus/sd-config', + controller=root_server.get_sd_config) + d.connect(name='rules', route='/sd/prometheus/rules', + controller=root_server.get_prometheus_rules) + cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': d}}) + + def configure_tls(self) -> None: + try: + old_cert = self.mgr.get_store(self.KV_STORE_SD_ROOT_CERT) + old_key = self.mgr.get_store(self.KV_STORE_SD_ROOT_KEY) + if not 
old_key or not old_cert: + raise OrchestratorError('No old credentials for service discovery found') + self.ssl_certs.load_root_credentials(old_cert, old_key) + except (OrchestratorError, KeyError, ValueError): + self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip()) + self.mgr.set_store(self.KV_STORE_SD_ROOT_CERT, self.ssl_certs.get_root_cert()) + self.mgr.set_store(self.KV_STORE_SD_ROOT_KEY, self.ssl_certs.get_root_key()) + + cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip()) + self.key_file = tempfile.NamedTemporaryFile() + self.key_file.write(key.encode('utf-8')) + self.key_file.flush() # pkey_tmp must not be gc'ed + self.cert_file = tempfile.NamedTemporaryFile() + self.cert_file.write(cert.encode('utf-8')) + self.cert_file.flush() # cert_tmp must not be gc'ed + verify_tls_files(self.cert_file.name, self.key_file.name) + + def configure(self) -> None: + self.configure_tls() + self.configure_routes() + + +class Root(Server): + + # collapse everything to '/' + def _cp_dispatch(self, vpath: str) -> 'Root': + cherrypy.request.path = '' + return self + + def __init__(self, mgr: "CephadmOrchestrator", + port: int = 0, + host: str = '', + ssl_ca_cert: str = '', + ssl_priv_key: str = ''): + self.mgr = mgr + super().__init__() + self.socket_port = port + self._socket_host = host + self.ssl_certificate = ssl_ca_cert + self.ssl_private_key = ssl_priv_key + self.subscribe() + + @cherrypy.expose + def index(self) -> str: + return ''' + +Cephadm HTTP Endpoint + +

Cephadm Service Discovery Endpoints

+

mgr/Prometheus http sd-config

+

Alertmanager http sd-config

+

Node exporter http sd-config

+

HAProxy http sd-config

+

Prometheus rules

+ +''' + + @cherrypy.expose + @cherrypy.tools.json_out() + def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]: + """Return compatible prometheus config for the specified service.""" + if service == 'mgr-prometheus': + return self.prometheus_sd_config() + elif service == 'alertmanager': + return self.alertmgr_sd_config() + elif service == 'node-exporter': + return self.node_exporter_sd_config() + elif service == 'haproxy': + return self.haproxy_sd_config() + else: + return [] + + def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]: + """Return compatible prometheus config for prometheus service.""" + servers = self.mgr.list_servers() + targets = [] + for server in servers: + hostname = server.get('hostname', '') + for service in cast(List[ServiceInfoT], server.get('services', [])): + if service['type'] != 'mgr': + continue + port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283) + targets.append(f'{hostname}:{port}') + return [{"targets": targets, "labels": {}}] + + def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]: + """Return compatible prometheus config for mgr alertmanager service.""" + srv_entries = [] + for dd in self.mgr.cache.get_daemons_by_service('alertmanager'): + assert dd.hostname is not None + addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname) + port = dd.ports[0] if dd.ports else 9093 + srv_entries.append('{}'.format(build_url(host=addr, port=port).lstrip('/'))) + return [{"targets": srv_entries, "labels": {}}] + + def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]: + """Return compatible prometheus config for node-exporter service.""" + srv_entries = [] + for dd in self.mgr.cache.get_daemons_by_service('node-exporter'): + assert dd.hostname is not None + addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname) + port = dd.ports[0] if dd.ports else 9100 + srv_entries.append({ + 'targets': [build_url(host=addr, port=port).lstrip('/')], + 
'labels': {'instance': dd.hostname} + }) + return srv_entries + + def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]: + """Return compatible prometheus config for haproxy service.""" + srv_entries = [] + for dd in self.mgr.cache.get_daemons_by_type('ingress'): + if dd.service_name() in self.mgr.spec_store: + spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec) + assert dd.hostname is not None + if dd.daemon_type == 'haproxy': + addr = self.mgr.inventory.get_addr(dd.hostname) + srv_entries.append({ + 'targets': [f"{build_url(host=addr, port=spec.monitor_port).lstrip('/')}"], + 'labels': {'instance': dd.service_name()} + }) + return srv_entries + + @cherrypy.expose(alias='prometheus/rules') + def get_prometheus_rules(self) -> str: + """Return currently configured prometheus rules as Yaml.""" + cherrypy.response.headers['Content-Type'] = 'text/plain' + with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f: + return f.read() diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 8028b27c661..e2cd62aa7cb 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -1049,7 +1049,7 @@ class CephadmAgent(CephService): assert self.TYPE == daemon_spec.daemon_type daemon_id, host = daemon_spec.daemon_id, daemon_spec.host - if not self.mgr.cherrypy_thread: + if not self.mgr.http_server.agent: raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint') keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), []) @@ -1061,31 +1061,31 @@ class CephadmAgent(CephService): return daemon_spec def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]: + agent = self.mgr.http_server.agent try: - assert self.mgr.cherrypy_thread - assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert() - assert self.mgr.cherrypy_thread.server_port 
+ assert agent + assert agent.ssl_certs.get_root_cert() + assert agent.server_port except Exception: raise OrchestratorError( 'Cannot deploy agent daemons until cephadm endpoint has finished generating certs') cfg = {'target_ip': self.mgr.get_mgr_ip(), - 'target_port': self.mgr.cherrypy_thread.server_port, + 'target_port': agent.server_port, 'refresh_period': self.mgr.agent_refresh_rate, 'listener_port': self.mgr.agent_starting_port, 'host': daemon_spec.host, 'device_enhanced_scan': str(self.mgr.device_enhanced_scan)} - listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert( - self.mgr.inventory.get_addr(daemon_spec.host)) + listener_cert, listener_key = agent.ssl_certs.generate_cert(self.mgr.inventory.get_addr(daemon_spec.host)) config = { 'agent.json': json.dumps(cfg), 'keyring': daemon_spec.keyring, - 'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(), + 'root_cert.pem': agent.ssl_certs.get_root_cert(), 'listener.crt': listener_cert, 'listener.key': listener_key, } - return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port), - self.mgr.cherrypy_thread.ssl_certs.get_root_cert(), + return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port), + agent.ssl_certs.get_root_cert(), str(self.mgr.get_module_option('device_enhanced_scan'))]) diff --git a/src/pybind/mgr/cephadm/services/monitoring.py b/src/pybind/mgr/cephadm/services/monitoring.py index d07c67bd5b9..b5c2276d6c4 100644 --- a/src/pybind/mgr/cephadm/services/monitoring.py +++ b/src/pybind/mgr/cephadm/services/monitoring.py @@ -11,7 +11,6 @@ from mgr_module import HandleCommandResult from orchestrator import DaemonDescription from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, SNMPGatewaySpec from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec -from cephadm.services.ingress import IngressSpec from mgr_util import verify_tls, ServerConfigException, 
create_self_signed_cert, build_url logger = logging.getLogger(__name__) @@ -271,86 +270,42 @@ class PrometheusService(CephadmService): self, daemon_spec: CephadmDaemonDeploySpec, ) -> Tuple[Dict[str, Any], List[str]]: + assert self.TYPE == daemon_spec.daemon_type - deps = [] # type: List[str] - # scrape mgrs - mgr_scrape_list = [] - mgr_map = self.mgr.get('mgr_map') - port = cast(int, self.mgr.get_module_option_ex( - 'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT)) - deps.append(str(port)) - t = mgr_map.get('services', {}).get('prometheus', None) + t = self.mgr.get('mgr_map').get('services', {}).get('prometheus', None) + sd_port = self.mgr.service_discovery_port + srv_end_point = '' if t: p_result = urlparse(t) # urlparse .hostname removes '[]' from the hostname in case # of ipv6 addresses so if this is the case then we just # append the brackets when building the final scrape endpoint if '[' in p_result.netloc and ']' in p_result.netloc: - mgr_scrape_list.append(f"[{p_result.hostname}]:{port}") + srv_end_point = f'https://[{p_result.hostname}]:{sd_port}/sd/prometheus/sd-config?' else: - mgr_scrape_list.append(f"{p_result.hostname}:{port}") - # scan all mgrs to generate deps and to get standbys too. - # assume that they are all on the same port as the active mgr. - for dd in self.mgr.cache.get_daemons_by_service('mgr'): - # we consider the mgr a dep even if the prometheus module is - # disabled in order to be consistent with _calc_daemon_deps(). - deps.append(dd.name()) - if not port: - continue - if dd.daemon_id == self.mgr.get_mgr_id(): - continue - assert dd.hostname is not None - addr = self._inventory_get_fqdn(dd.hostname) - mgr_scrape_list.append(build_url(host=addr, port=port).lstrip('/')) + srv_end_point = f'https://{p_result.hostname}:{sd_port}/sd/prometheus/sd-config?' 
- # scrape node exporters - nodes = [] - for dd in self.mgr.cache.get_daemons_by_service('node-exporter'): - assert dd.hostname is not None - deps.append(dd.name()) - addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname) - port = dd.ports[0] if dd.ports else 9100 - nodes.append({ - 'hostname': dd.hostname, - 'url': build_url(host=addr, port=port).lstrip('/') - }) - - # scrape alert managers - alertmgr_targets = [] - for dd in self.mgr.cache.get_daemons_by_service('alertmanager'): - assert dd.hostname is not None - deps.append(dd.name()) - addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname) - port = dd.ports[0] if dd.ports else 9093 - alertmgr_targets.append("'{}'".format(build_url(host=addr, port=port).lstrip('/'))) - - # scrape haproxies - haproxy_targets = [] - for dd in self.mgr.cache.get_daemons_by_type('ingress'): - if dd.service_name() in self.mgr.spec_store: - spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec) - assert dd.hostname is not None - deps.append(dd.name()) - if dd.daemon_type == 'haproxy': - addr = self._inventory_get_fqdn(dd.hostname) - haproxy_targets.append({ - "url": f"'{build_url(host=addr, port=spec.monitor_port).lstrip('/')}'", - "service": dd.service_name(), - }) + node_exporter_cnt = len(self.mgr.cache.get_daemons_by_service('node-exporter')) + alertmgr_cnt = len(self.mgr.cache.get_daemons_by_service('alertmanager')) + haproxy_cnt = len(self.mgr.cache.get_daemons_by_type('ingress')) + node_exporter_sd_url = f'{srv_end_point}service=node-exporter' if node_exporter_cnt > 0 else None + alertmanager_sd_url = f'{srv_end_point}service=alertmanager' if alertmgr_cnt > 0 else None + haproxy_sd_url = f'{srv_end_point}service=haproxy' if haproxy_cnt > 0 else None + mgr_prometheus_sd_url = f'{srv_end_point}service=mgr-prometheus' # always included # generate the prometheus configuration context = { - 'alertmgr_targets': alertmgr_targets, - 'mgr_scrape_list': mgr_scrape_list, - 'haproxy_targets': 
haproxy_targets, - 'nodes': nodes, + 'mgr_prometheus_sd_url': mgr_prometheus_sd_url, + 'node_exporter_sd_url': node_exporter_sd_url, + 'alertmanager_sd_url': alertmanager_sd_url, + 'haproxy_sd_url': haproxy_sd_url, } + r = { 'files': { - 'prometheus.yml': - self.mgr.template.render( - 'services/prometheus/prometheus.yml.j2', context) + 'prometheus.yml': self.mgr.template.render('services/prometheus/prometheus.yml.j2', context), + 'root_cert.pem': self.mgr.http_server.service_discovery.ssl_certs.get_root_cert() } } @@ -360,7 +315,19 @@ class PrometheusService(CephadmService): alerts = f.read() r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts - return r, sorted(deps) + return r, sorted(self.calculate_deps()) + + def calculate_deps(self) -> List[str]: + deps = [] # type: List[str] + port = cast(int, self.mgr.get_module_option_ex( + 'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT)) + deps.append(str(port)) + # add an explicit dependency on the active manager. This will force to + # re-deploy prometheus if the mgr has changed (due to a fail-over i.e). + deps.append(self.mgr.get_active_mgr().name()) + deps += [s for s in ['node-exporter', 'alertmanager', 'ingress'] + if self.mgr.cache.get_daemons_by_service(s)] + return deps def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription: # TODO: if there are multiple daemons, who is the active one? 
diff --git a/src/pybind/mgr/cephadm/ssl_cert_utils.py b/src/pybind/mgr/cephadm/ssl_cert_utils.py new file mode 100644 index 00000000000..b18d185facd --- /dev/null +++ b/src/pybind/mgr/cephadm/ssl_cert_utils.py @@ -0,0 +1,123 @@ + +from typing import Any, Tuple +import ipaddress + +from datetime import datetime, timedelta +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.backends import default_backend + +from orchestrator import OrchestratorError + + +class SSLCerts: + def __init__(self) -> None: + self.root_cert: Any + self.root_key: Any + + def generate_root_cert(self, host: str) -> Tuple[str, str]: + self.root_key = rsa.generate_private_key( + public_exponent=65537, key_size=4096, backend=default_backend()) + root_public_key = self.root_key.public_key() + root_builder = x509.CertificateBuilder() + root_builder = root_builder.subject_name(x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), + ])) + root_builder = root_builder.issuer_name(x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), + ])) + root_builder = root_builder.not_valid_before(datetime.now()) + root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3))) + root_builder = root_builder.serial_number(x509.random_serial_number()) + root_builder = root_builder.public_key(root_public_key) + root_builder = root_builder.add_extension( + x509.SubjectAlternativeName( + [x509.IPAddress(ipaddress.IPv4Address(host))] + ), + critical=False + ) + root_builder = root_builder.add_extension( + x509.BasicConstraints(ca=True, path_length=None), critical=True, + ) + + self.root_cert = root_builder.sign( + private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend() + ) + + cert_str = 
self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') + key_str = self.root_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + ).decode('utf-8') + + return (cert_str, key_str) + + def generate_cert(self, addr: str) -> Tuple[str, str]: + have_ip = True + try: + ip = x509.IPAddress(ipaddress.IPv4Address(addr)) + except Exception: + try: + ip = x509.IPAddress(ipaddress.IPv6Address(addr)) + except Exception: + have_ip = False + + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=4096, backend=default_backend()) + public_key = private_key.public_key() + + builder = x509.CertificateBuilder() + builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, addr), ])) + builder = builder.issuer_name( + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), ])) + builder = builder.not_valid_before(datetime.now()) + builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3))) + builder = builder.serial_number(x509.random_serial_number()) + builder = builder.public_key(public_key) + if have_ip: + builder = builder.add_extension( + x509.SubjectAlternativeName( + [ip] + ), + critical=False + ) + builder = builder.add_extension(x509.BasicConstraints( + ca=False, path_length=None), critical=True,) + + cert = builder.sign(private_key=self.root_key, + algorithm=hashes.SHA256(), backend=default_backend()) + cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') + key_str = private_key.private_bytes(encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + ).decode('utf-8') + + return (cert_str, key_str) + + def get_root_cert(self) -> str: + try: + return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') + except 
AttributeError: + return '' + + def get_root_key(self) -> str: + try: + return self.root_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ).decode('utf-8') + except AttributeError: + return '' + + def load_root_credentials(self, cert: str, priv_key: str) -> None: + given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend()) + tz = given_cert.not_valid_after.tzinfo + if datetime.now(tz) >= given_cert.not_valid_after: + raise OrchestratorError('Given cert is expired') + self.root_cert = given_cert + self.root_key = serialization.load_pem_private_key( + data=priv_key.encode('utf-8'), backend=default_backend(), password=None) diff --git a/src/pybind/mgr/cephadm/templates/services/prometheus/prometheus.yml.j2 b/src/pybind/mgr/cephadm/templates/services/prometheus/prometheus.yml.j2 index bb0a8fcae51..d442c5c4e43 100644 --- a/src/pybind/mgr/cephadm/templates/services/prometheus/prometheus.yml.j2 +++ b/src/pybind/mgr/cephadm/templates/services/prometheus/prometheus.yml.j2 @@ -4,38 +4,37 @@ global: evaluation_interval: 10s rule_files: - /etc/prometheus/alerting/* -{% if alertmgr_targets %} + +{% if alertmanager_sd_url %} alerting: alertmanagers: - scheme: http - static_configs: - - targets: [{{ alertmgr_targets|join(', ') }}] + http_sd_configs: + - url: {{ alertmanager_sd_url }} + tls_config: + ca_file: root_cert.pem {% endif %} + scrape_configs: - job_name: 'ceph' honor_labels: true - static_configs: - - targets: -{% for mgr in mgr_scrape_list %} - - '{{ mgr }}' -{% endfor %} + http_sd_configs: + - url: {{ mgr_prometheus_sd_url }} + tls_config: + ca_file: root_cert.pem -{% if nodes %} +{% if node_exporter_sd_url %} - job_name: 'node' - static_configs: -{% for node in nodes %} - - targets: ['{{ node.url }}'] - labels: - instance: '{{ node.hostname }}' -{% endfor %} + http_sd_configs: + - url: {{ node_exporter_sd_url }} + 
tls_config: + ca_file: root_cert.pem {% endif %} -{% if haproxy_targets %} +{% if haproxy_sd_url %} - job_name: 'haproxy' - static_configs: -{% for haproxy in haproxy_targets %} - - targets: [{{ haproxy.url }}] - labels: - instance: '{{ haproxy.service }}' -{% endfor %} + http_sd_configs: + - url: {{ haproxy_sd_url }} + tls_config: + ca_file: root_cert.pem {% endif %} diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 0567f7f7e68..c2e7a6ed8d2 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -98,9 +98,9 @@ def with_cephadm_module(module_options=None, store=None): mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \ mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \ mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \ - mock.patch('cephadm.agent.CherryPyThread.run'), \ mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \ - mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'): + mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'),\ + mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'): m = CephadmOrchestrator.__new__(CephadmOrchestrator) if module_options is not None: diff --git a/src/pybind/mgr/cephadm/tests/test_agent.py b/src/pybind/mgr/cephadm/tests/test_service_discovery.py similarity index 98% rename from src/pybind/mgr/cephadm/tests/test_agent.py rename to src/pybind/mgr/cephadm/tests/test_service_discovery.py index a4b1dc1b243..870b4341f5a 100644 --- a/src/pybind/mgr/cephadm/tests/test_agent.py +++ b/src/pybind/mgr/cephadm/tests/test_service_discovery.py @@ -1,5 +1,5 @@ from unittest.mock import MagicMock -from cephadm.agent import Root +from cephadm.service_discovery import Root class FakeDaemonDescription: @@ -88,7 +88,7 @@ class FakeMgr: return "9283" -class TestCephadmService: +class 
TestServiceDiscovery: def test_get_sd_config_prometheus(self): mgr = FakeMgr() diff --git a/src/pybind/mgr/cephadm/tests/test_services.py b/src/pybind/mgr/cephadm/tests/test_services.py index 334afe167ec..b9c1ee9b606 100644 --- a/src/pybind/mgr/cephadm/tests/test_services.py +++ b/src/pybind/mgr/cephadm/tests/test_services.py @@ -401,18 +401,21 @@ class TestMonitoring: evaluation_interval: 10s rule_files: - /etc/prometheus/alerting/* + + scrape_configs: - job_name: 'ceph' honor_labels: true - static_configs: - - targets: - - '[::1]:9283' + http_sd_configs: + - url: https://[::1]:8765/sd/prometheus/sd-config?service=mgr-prometheus + tls_config: + ca_file: root_cert.pem - job_name: 'node' - static_configs: - - targets: ['[1::4]:9100'] - labels: - instance: 'test' + http_sd_configs: + - url: https://[::1]:8765/sd/prometheus/sd-config?service=node-exporter + tls_config: + ca_file: root_cert.pem """).lstrip() @@ -427,7 +430,7 @@ class TestMonitoring: '--config-json', '-', '--tcp-ports', '9095' ], - stdin=json.dumps({"files": {"prometheus.yml": y}}), + stdin=json.dumps({"files": {"prometheus.yml": y, "root_cert.pem": ''}}), image='') @patch("cephadm.serve.CephadmServe._run_cephadm") diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 2a3e9af124e..4f7a961af47 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -437,6 +437,14 @@ class Orchestrator(object): """ raise NotImplementedError() + def service_discovery_dump_cert(self) -> OrchResult: + """ + Returns service discovery server root certificate + + :return: service discovery root certificate + """ + raise NotImplementedError() + def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]: """ Describe a service (of any kind) that is already configured in diff --git a/src/pybind/mgr/orchestrator/module.py 
b/src/pybind/mgr/orchestrator/module.py index b3c2fcc7918..a11d87cb08c 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -573,6 +573,15 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule, raise_if_exception(completion) return HandleCommandResult(stdout=completion.result_str()) + @_cli_write_command('orch sd dump cert') + def _service_discovery_dump_cert(self) -> HandleCommandResult: + """ + Returns service discovery server root certificate + """ + completion = self.service_discovery_dump_cert() + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + @_cli_read_command('orch ls') def _list_services(self, service_type: Optional[str] = None, -- 2.39.5