ceph orch apply prometheus
- or
+ or
.. prompt:: bash #
-
+
ceph orch apply prometheus --placement 'count:2'
#. Deploy grafana:
Internally, cephadm already uses `Jinja2
<https://jinja.palletsprojects.com/en/2.11.x/>`_ templates to generate the
-configuration files for all monitoring components. To be able to customize the
-configuration of Prometheus, Grafana or the Alertmanager it is possible to store
-a Jinja2 template for each service that will be used for configuration
-generation instead. This template will be evaluated every time a service of that
-kind is deployed or reconfigured. That way, the custom configuration is
-preserved and automatically applied on future deployments of these services.
+configuration files for all monitoring components. Starting from version x.x.x,
+cephadm supports Prometheus HTTP service discovery via `http_sd_config
+<https://prometheus.io/docs/prometheus/2.28/configuration/configuration/#http_sd_config>`_
+in order to get the currently configured targets from Ceph. Internally, `ceph-mgr`
+provides a service discovery endpoint at ``https://<mgr-ip>:8765/sd/`` (the port is
+configurable through the variable `service_discovery_port`) which is used by
+Prometheus to get the needed targets.
+
+Customers with an external monitoring stack can use the `ceph-mgr` service discovery
+endpoint to get scraping configuration. The root certificate of the server can be
+obtained with the following command:
+
+ .. prompt:: bash #
+
+ ceph orch sd dump cert
+
+The configuration of Prometheus, Grafana, or Alertmanager may be customized by storing
+a Jinja2 template for each service. This template will be evaluated every time a service
+of that kind is deployed or reconfigured. That way, the custom configuration is preserved
+and automatically applied on future deployments of these services.
.. note::
By default, ceph-mgr presents prometheus metrics on port 9283 on each host
running a ceph-mgr daemon. Configure prometheus to scrape these.
+To make this integration easier, Ceph provides by means of `ceph-mgr` a service
+discovery endpoint at ``https://<mgr-ip>:8765/sd/`` which can be used by an external
+Prometheus server to retrieve target information. Information reported by this
+endpoint uses the format specified by `http_sd_config
+<https://prometheus.io/docs/prometheus/2.28/configuration/configuration/#http_sd_config>`_
+
* To enable the dashboard's prometheus-based alerting, see :ref:`dashboard-alerting`.
* To enable dashboard integration with Grafana, see :ref:`dashboard-grafana`.
'port': self.listener_port})
data = data.encode('ascii')
- url = f'https://{self.target_ip}:{self.target_port}/data'
+ url = f'https://{self.target_ip}:{self.target_port}/data/'
try:
req = Request(url, data, {'Content-Type': 'application/json'})
send_time = time.monotonic()
# Note:the default port used by the Prometheus node exporter is opened in fw
ctx.meta_json = json.dumps({'service_name': 'mgr'})
deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
- config=config, keyring=mgr_keyring, ports=[9283])
+ config=config, keyring=mgr_keyring, ports=[9283, 8765])
# wait for the service to become available
logger.info('Waiting for mgr to start...')
-import cherrypy
-import ipaddress
+try:
+ import cherrypy
+ from cherrypy._cpserver import Server
+except ImportError:
+ # to avoid sphinx build crash
+ class Server: # type: ignore
+ pass
+
import json
import logging
import socket
import threading
import time
-from mgr_module import ServiceInfoT
-from mgr_util import verify_tls_files, build_url
+
+from mgr_util import verify_tls_files
from orchestrator import DaemonDescriptionStatus, OrchestratorError
from orchestrator._interface import daemon_type_to_service
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.services.ingress import IngressSpec
-
-from datetime import datetime, timedelta
-from cryptography import x509
-from cryptography.x509.oid import NameOID
-from cryptography.hazmat.primitives.asymmetric import rsa
-from cryptography.hazmat.primitives import hashes, serialization
-from cryptography.hazmat.backends import default_backend
+from cephadm.ssl_cert_utils import SSLCerts
-from typing import Any, Dict, List, Set, Tuple, \
- TYPE_CHECKING, Optional, cast, Collection
+from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
cherrypy.log.access_log.propagate = False
-class CherryPyThread(threading.Thread):
+class AgentEndpoint:
+
+ KV_STORE_AGENT_ROOT_CERT = 'cephadm_agent/root/cert'
+ KV_STORE_AGENT_ROOT_KEY = 'cephadm_agent/root/key'
+
def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
- self.cherrypy_shutdown_event = threading.Event()
- self.ssl_certs = SSLCerts(self.mgr)
+ self.ssl_certs = SSLCerts()
self.server_port = 7150
self.server_addr = self.mgr.get_mgr_ip()
- super(CherryPyThread, self).__init__(target=self.run)
-
- def configure_cherrypy(self) -> None:
- cherrypy.config.update({
- 'environment': 'production',
- 'server.socket_host': self.server_addr,
- 'server.socket_port': self.server_port,
- 'engine.autoreload.on': False,
- 'server.ssl_module': 'builtin',
- 'server.ssl_certificate': self.cert_tmp.name,
- 'server.ssl_private_key': self.key_tmp.name,
- })
+ self.host_data: Server = None
+
+ def configure_routes(self) -> None:
+
+ self.host_data = HostData(self.mgr,
+ self.server_port,
+ self.server_addr,
+ self.cert_file.name,
+ self.key_file.name)
# configure routes
- root = Root(self.mgr)
- host_data = HostData(self.mgr)
d = cherrypy.dispatch.RoutesDispatcher()
- d.connect(name='index', route='/', controller=root.index)
- d.connect(name='sd-config', route='/prometheus/sd-config', controller=root.get_sd_config)
- d.connect(name='rules', route='/prometheus/rules', controller=root.get_prometheus_rules)
- d.connect(name='host-data', route='/data', controller=host_data.POST,
+ d.connect(name='host-data', route='/',
+ controller=self.host_data.POST,
conditions=dict(method=['POST']))
- conf = {'/': {'request.dispatch': d}}
- cherrypy.tree.mount(None, "/", config=conf)
+ cherrypy.tree.mount(None, '/data', config={'/': {'request.dispatch': d}})
- def run(self) -> None:
+ def configure_tls(self) -> None:
try:
+ old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
+ old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
+ if not old_key or not old_cert:
+ raise OrchestratorError('No old credentials for agent found')
+ self.ssl_certs.load_root_credentials(old_cert, old_key)
+ except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
+ self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
+ self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert())
+ self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key())
+
+ cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
+        self.key_file = tempfile.NamedTemporaryFile()
+        self.key_file.write(key.encode('utf-8'))
+        self.key_file.flush()  # key_file must not be gc'ed
+        self.cert_file = tempfile.NamedTemporaryFile()
+        self.cert_file.write(cert.encode('utf-8'))
+        self.cert_file.flush()  # cert_file must not be gc'ed
+        verify_tls_files(self.cert_file.name, self.key_file.name)
+
+ def find_free_port(self) -> None:
+ max_port = self.server_port + 150
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ while self.server_port <= max_port:
try:
- old_creds = self.mgr.get_store('cephadm_endpoint_credentials')
- if not old_creds:
- raise OrchestratorError('No old credentials for cephadm endpoint found')
- old_creds_dict = json.loads(old_creds)
- old_key = old_creds_dict['key']
- old_cert = old_creds_dict['cert']
- self.ssl_certs.load_root_credentials(old_cert, old_key)
- except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
- self.ssl_certs.generate_root_cert()
-
- cert, key = self.ssl_certs.generate_cert()
-
- self.key_tmp = tempfile.NamedTemporaryFile()
- self.key_tmp.write(key.encode('utf-8'))
- self.key_tmp.flush() # pkey_tmp must not be gc'ed
- key_fname = self.key_tmp.name
-
- self.cert_tmp = tempfile.NamedTemporaryFile()
- self.cert_tmp.write(cert.encode('utf-8'))
- self.cert_tmp.flush() # cert_tmp must not be gc'ed
- cert_fname = self.cert_tmp.name
-
- verify_tls_files(cert_fname, key_fname)
- self.configure_cherrypy()
-
- self.mgr.log.debug('Starting cherrypy engine...')
- self.start_engine()
- self.mgr.log.debug('Cherrypy engine started.')
- cephadm_endpoint_creds = {
- 'cert': self.ssl_certs.get_root_cert(),
- 'key': self.ssl_certs.get_root_key()
- }
- self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds))
- self.mgr._kick_serve_loop()
- # wait for the shutdown event
- self.cherrypy_shutdown_event.wait()
- self.cherrypy_shutdown_event.clear()
- cherrypy.engine.stop()
- self.mgr.log.debug('Cherrypy engine stopped.')
- except Exception as e:
- self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}')
-
- def start_engine(self) -> None:
- port_connect_attempts = 0
- while port_connect_attempts < 150:
- try:
- cherrypy.engine.start()
- self.mgr.log.debug(f'Cephadm endpoint connected to port {self.server_port}')
+ sock.bind((self.server_addr, self.server_port))
+ sock.close()
+ self.host_data.socket_port = self.server_port
+ self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}')
return
- except cherrypy.process.wspbus.ChannelFailures as e:
- self.mgr.log.debug(
- f'{e}. Trying next port.')
+ except OSError:
self.server_port += 1
- cherrypy.server.httpserver = None
- cherrypy.config.update({
- 'server.socket_port': self.server_port
- })
- port_connect_attempts += 1
self.mgr.log.error(
- 'Cephadm Endpoint could not find free port in range 7150-7300 and failed to start')
+ 'Cephadm agent endpoint could not find free port in range 7150-7300 and failed to start')
- def shutdown(self) -> None:
- self.mgr.log.debug('Stopping cherrypy engine...')
- self.cherrypy_shutdown_event.set()
+ def configure(self) -> None:
+ self.configure_tls()
+ self.configure_routes()
+ self.find_free_port()
-class Root(object):
-
- # collapse everything to '/'
- def _cp_dispatch(self, vpath: str) -> 'Root':
- cherrypy.request.path = ''
- return self
-
- def __init__(self, mgr: "CephadmOrchestrator"):
- self.mgr = mgr
-
- @cherrypy.expose
- def index(self) -> str:
- return '''<!DOCTYPE html>
-<html>
-<head><title>Cephadm HTTP Endpoint</title></head>
-<body>
-<h2>Cephadm Service Discovery Endpoints</h2>
-<p><a href='prometheus/sd-config?service=mgr-prometheus'>mgr/Prometheus http sd-config</a></p>
-<p><a href='prometheus/sd-config?service=alertmanager'>Alertmanager http sd-config</a></p>
-<p><a href='prometheus/sd-config?service=node-exporter'>Node exporter http sd-config</a></p>
-<p><a href='prometheus/sd-config?service=haproxy'>HAProxy http sd-config</a></p>
-<p><a href='prometheus/rules'>Prometheus rules</a></p>
-</body>
-</html>'''
-
- @cherrypy.expose
- @cherrypy.tools.json_out()
- def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]:
- """Return <http_sd_config> compatible prometheus config for the specified service."""
- if service == 'mgr-prometheus':
- return self.prometheus_sd_config()
- elif service == 'alertmanager':
- return self.alertmgr_sd_config()
- elif service == 'node-exporter':
- return self.node_exporter_sd_config()
- elif service == 'haproxy':
- return self.haproxy_sd_config()
- else:
- return []
-
- def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]:
- """Return <http_sd_config> compatible prometheus config for prometheus service."""
- servers = self.mgr.list_servers()
- targets = []
- for server in servers:
- hostname = server.get('hostname', '')
- for service in cast(List[ServiceInfoT], server.get('services', [])):
- if service['type'] != 'mgr':
- continue
- port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
- targets.append(f'{hostname}:{port}')
- return [{"targets": targets, "labels": {}}]
-
- def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]:
- """Return <http_sd_config> compatible prometheus config for mgr alertmanager service."""
- srv_entries = []
- for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
- assert dd.hostname is not None
- addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
- port = dd.ports[0] if dd.ports else 9093
- srv_entries.append('{}'.format(build_url(host=addr, port=port).lstrip('/')))
- return [{"targets": srv_entries, "labels": {}}]
-
- def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
- """Return <http_sd_config> compatible prometheus config for node-exporter service."""
- srv_entries = []
- for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
- assert dd.hostname is not None
- addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
- port = dd.ports[0] if dd.ports else 9100
- srv_entries.append({
- 'targets': [build_url(host=addr, port=port).lstrip('/')],
- 'labels': {'instance': dd.hostname}
- })
- return srv_entries
-
- def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]:
- """Return <http_sd_config> compatible prometheus config for haproxy service."""
- srv_entries = []
- for dd in self.mgr.cache.get_daemons_by_type('ingress'):
- if dd.service_name() in self.mgr.spec_store:
- spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec)
- assert dd.hostname is not None
- if dd.daemon_type == 'haproxy':
- addr = self.mgr.inventory.get_addr(dd.hostname)
- srv_entries.append({
- 'targets': [f"{build_url(host=addr, port=spec.monitor_port).lstrip('/')}"],
- 'labels': {'instance': dd.service_name()}
- })
- return srv_entries
-
- @cherrypy.expose(alias='prometheus/rules')
- def get_prometheus_rules(self) -> str:
- """Return currently configured prometheus rules as Yaml."""
- cherrypy.response.headers['Content-Type'] = 'text/plain'
- with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f:
- return f.read()
-
-
-class HostData:
+class HostData(Server):
exposed = True
- def __init__(self, mgr: "CephadmOrchestrator"):
+ def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str, ssl_ca_cert: str, ssl_priv_key: str):
self.mgr = mgr
+ super().__init__()
+ self.socket_port = port
+ self.ssl_certificate = ssl_ca_cert
+ self.ssl_private_key = ssl_priv_key
+ self._socket_host = host
+ self.subscribe()
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
class AgentMessageThread(threading.Thread):
def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
self.mgr = mgr
+ self.agent = mgr.http_server.agent
self.host = host
self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
self.port = port
self.data: str = json.dumps(data)
self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
- super(AgentMessageThread, self).__init__(target=self.run)
+ super().__init__(target=self.run)
def run(self) -> None:
self.mgr.log.debug(f'Sending message to agent on host {self.host}')
self.mgr.agent_cache.sending_agent_message[self.host] = True
try:
- assert self.mgr.cherrypy_thread
- root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ assert self.agent
+ root_cert = self.agent.ssl_certs.get_root_cert()
root_cert_tmp = tempfile.NamedTemporaryFile()
root_cert_tmp.write(root_cert.encode('utf-8'))
root_cert_tmp.flush()
root_cert_fname = root_cert_tmp.name
- cert, key = self.mgr.cherrypy_thread.ssl_certs.generate_cert()
+ cert, key = self.agent.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
cert_tmp = tempfile.NamedTemporaryFile()
cert_tmp.write(cert.encode('utf-8'))
class CephadmAgentHelpers:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
+ self.agent = mgr.http_server.agent
def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
for host in hosts:
def _check_agent(self, host: str) -> bool:
down = False
try:
- assert self.mgr.cherrypy_thread
- assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ assert self.agent
+ assert self.agent.ssl_certs.get_root_cert()
except Exception:
self.mgr.log.debug(
f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
# so it's necessary to check this one specifically
root_cert_match = False
try:
- root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ root_cert = self.agent.ssl_certs.get_root_cert()
if last_deps and root_cert in last_deps:
root_cert_match = True
except Exception:
self.mgr.log.debug(
f'Agent on host {host} not ready to {action}: {e}')
return down
-
-
-class SSLCerts:
- def __init__(self, mgr: "CephadmOrchestrator") -> None:
- self.mgr = mgr
- self.root_cert: Any
- self.root_key: Any
-
- def generate_root_cert(self) -> Tuple[str, str]:
- self.root_key = rsa.generate_private_key(
- public_exponent=65537, key_size=4096, backend=default_backend())
- root_public_key = self.root_key.public_key()
-
- root_builder = x509.CertificateBuilder()
-
- root_builder = root_builder.subject_name(x509.Name([
- x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
- ]))
-
- root_builder = root_builder.issuer_name(x509.Name([
- x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
- ]))
-
- root_builder = root_builder.not_valid_before(datetime.now())
- root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
- root_builder = root_builder.serial_number(x509.random_serial_number())
- root_builder = root_builder.public_key(root_public_key)
- root_builder = root_builder.add_extension(
- x509.SubjectAlternativeName(
- [x509.IPAddress(ipaddress.IPv4Address(str(self.mgr.get_mgr_ip())))]
- ),
- critical=False
- )
- root_builder = root_builder.add_extension(
- x509.BasicConstraints(ca=True, path_length=None), critical=True,
- )
-
- self.root_cert = root_builder.sign(
- private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
- )
-
- cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
- key_str = self.root_key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption()
- ).decode('utf-8')
-
- return (cert_str, key_str)
-
- def generate_cert(self, addr: str = '') -> Tuple[str, str]:
- have_ip = True
- if addr:
- try:
- ip = x509.IPAddress(ipaddress.IPv4Address(addr))
- except Exception:
- try:
- ip = x509.IPAddress(ipaddress.IPv6Address(addr))
- except Exception:
- have_ip = False
- pass
- else:
- ip = x509.IPAddress(ipaddress.IPv4Address(self.mgr.get_mgr_ip()))
-
- private_key = rsa.generate_private_key(
- public_exponent=65537, key_size=4096, backend=default_backend())
- public_key = private_key.public_key()
-
- builder = x509.CertificateBuilder()
-
- builder = builder.subject_name(x509.Name([
- x509.NameAttribute(NameOID.COMMON_NAME, addr if addr else str(self.mgr.get_mgr_ip())),
- ]))
-
- builder = builder.issuer_name(x509.Name([
- x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
- ]))
-
- builder = builder.not_valid_before(datetime.now())
- builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
- builder = builder.serial_number(x509.random_serial_number())
- builder = builder.public_key(public_key)
- if have_ip:
- builder = builder.add_extension(
- x509.SubjectAlternativeName(
- [ip]
- ),
- critical=False
- )
- builder = builder.add_extension(
- x509.BasicConstraints(ca=False, path_length=None), critical=True,
- )
-
- cert = builder.sign(
- private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
- )
-
- cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
- key_str = private_key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption()
- ).decode('utf-8')
-
- return (cert_str, key_str)
-
- def get_root_cert(self) -> str:
- try:
- return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
- except AttributeError:
- return ''
-
- def get_root_key(self) -> str:
- try:
- return self.root_key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption(),
- ).decode('utf-8')
- except AttributeError:
- return ''
-
- def load_root_credentials(self, cert: str, priv_key: str) -> None:
- given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())
- tz = given_cert.not_valid_after.tzinfo
- if datetime.now(tz) >= given_cert.not_valid_after:
- raise OrchestratorError('Given cert is expired')
- self.root_cert = given_cert
- self.root_key = serialization.load_pem_private_key(
- data=priv_key.encode('utf-8'), backend=default_backend(), password=None)
--- /dev/null
+import cherrypy
+import threading
+import logging
+from typing import TYPE_CHECKING
+
+from cephadm.agent import AgentEndpoint
+from cephadm.service_discovery import ServiceDiscovery
+
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+
+def cherrypy_filter(record: logging.LogRecord) -> int:
+ blocked = [
+ 'TLSV1_ALERT_DECRYPT_ERROR'
+ ]
+ msg = record.getMessage()
+ return not any([m for m in blocked if m in msg])
+
+
+logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
+cherrypy.log.access_log.propagate = False
+
+
+class CephadmHttpServer(threading.Thread):
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+ self.agent = AgentEndpoint(mgr)
+ self.service_discovery = ServiceDiscovery(mgr)
+ self.cherrypy_shutdown_event = threading.Event()
+ super().__init__(target=self.run)
+
+ def configure_cherrypy(self) -> None:
+ cherrypy.config.update({
+ 'environment': 'production',
+ 'engine.autoreload.on': False,
+ })
+
+ def run(self) -> None:
+ try:
+ self.configure_cherrypy()
+ self.agent.configure()
+ self.service_discovery.configure()
+
+ self.mgr.log.debug('Starting cherrypy engine...')
+ cherrypy.server.unsubscribe() # disable default server
+ cherrypy.engine.start()
+ self.mgr.log.debug('Cherrypy engine started.')
+
+ self.mgr._kick_serve_loop()
+ # wait for the shutdown event
+ self.cherrypy_shutdown_event.wait()
+ self.cherrypy_shutdown_event.clear()
+ cherrypy.engine.stop()
+ cherrypy.server.httpserver = None
+ self.mgr.log.debug('Cherrypy engine stopped.')
+ except Exception as e:
+ self.mgr.log.error(f'Failed to run cephadm http server: {e}')
+
+ def shutdown(self) -> None:
+ self.mgr.log.debug('Stopping cherrypy engine...')
+ self.cherrypy_shutdown_event.set()
from tempfile import TemporaryDirectory, NamedTemporaryFile
from threading import Event
+from cephadm.service_discovery import ServiceDiscovery
+
import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.agent import CherryPyThread, CephadmAgentHelpers
+from cephadm.http_server import CephadmHttpServer
+from cephadm.agent import CephadmAgentHelpers
from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
default=10,
desc='max number of osds that will be drained simultaneously when osds are removed'
),
+ Option(
+ 'service_discovery_port',
+ type='int',
+ default=8765,
+ desc='cephadm service discovery port'
+ ),
]
def __init__(self, *args: Any, **kwargs: Any):
self.agent_refresh_rate = 0
self.agent_down_multiplier = 0.0
self.agent_starting_port = 0
+ self.service_discovery_port = 0
self.apply_spec_fails: List[Tuple[str, str]] = []
self.max_osd_draining_count = 10
self.device_enhanced_scan = False
self.config_checker = CephadmConfigChecks(self)
- self.cherrypy_thread = CherryPyThread(self)
- self.cherrypy_thread.start()
+ self.http_server = CephadmHttpServer(self)
+ self.http_server.start()
self.agent_helpers = CephadmAgentHelpers(self)
if self.use_agent:
self.agent_helpers._apply_agent()
self.log.debug('shutdown')
self._worker_pool.close()
self._worker_pool.join()
- self.cherrypy_thread.shutdown()
+ self.http_server.shutdown()
self.offline_watcher.shutdown()
self.run = False
self.event.set()
def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
+ def get_active_mgr(self) -> DaemonDescription:
+ return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr'))
+
def get_active_mgr_digests(self) -> List[str]:
digests = self.mgr_service.get_active_daemon(
self.cache.get_daemons_by_type('mgr')).container_image_digests
root_cert = ''
server_port = ''
try:
- server_port = str(self.cherrypy_thread.server_port)
- root_cert = self.cherrypy_thread.ssl_certs.get_root_cert()
+ server_port = str(self.http_server.agent.server_port)
+ root_cert = self.http_server.agent.ssl_certs.get_root_cert()
except Exception:
pass
deps = sorted([self.get_mgr_ip(), server_port, root_cert,
str(self.device_enhanced_scan)])
elif daemon_type == 'iscsi':
deps = [self.get_mgr_ip()]
+
+ elif daemon_type == 'prometheus':
+ # for prometheus we add the active mgr as an explicit dependency,
+ # this way we force a redeploy after a mgr failover
+ deps.append(self.get_active_mgr().name())
+ deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
+ deps += [s for s in ['node-exporter', 'alertmanager', 'ingress']
+ if self.cache.get_daemons_by_service(s)]
else:
need = {
- 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
'grafana': ['prometheus', 'loki'],
'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
'promtail': ['loki'],
for dep_type in need.get(daemon_type, []):
for dd in self.cache.get_daemons_by_type(dep_type):
deps.append(dd.name())
- if daemon_type == 'prometheus':
- deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
return sorted(deps)
@forall_hosts
self._kick_serve_loop()
return f'Removed setting {setting} from tuned profile {profile_name}'
+ @handle_orch_error
+ def service_discovery_dump_cert(self) -> str:
+ root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT)
+ if not root_cert:
+ raise OrchestratorError('No certificate found for service discovery')
+ return root_cert
+
def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
self.health_checks[name] = {
'severity': 'warning',
if service_type == 'agent':
try:
- assert self.mgr.cherrypy_thread
- assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ assert self.mgr.http_server.agent
+ assert self.mgr.http_server.agent.ssl_certs.get_root_cert()
except Exception:
self.log.info(
'Delaying applying agent spec until cephadm endpoint root cert created')
--- /dev/null
+try:
+ import cherrypy
+ from cherrypy._cpserver import Server
+except ImportError:
+ # to avoid sphinx build crash
+ class Server: # type: ignore
+ pass
+
+import logging
+import tempfile
+from typing import Dict, List, TYPE_CHECKING, cast, Collection
+
+from orchestrator import OrchestratorError
+from mgr_module import ServiceInfoT
+from mgr_util import verify_tls_files, build_url
+from cephadm.services.ingress import IngressSpec
+from cephadm.ssl_cert_utils import SSLCerts
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+
+def cherrypy_filter(record: logging.LogRecord) -> int:
+ blocked = [
+ 'TLSV1_ALERT_DECRYPT_ERROR'
+ ]
+ msg = record.getMessage()
+ return not any([m for m in blocked if m in msg])
+
+
+logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
+cherrypy.log.access_log.propagate = False
+
+
+class ServiceDiscovery:
+
+ KV_STORE_SD_ROOT_CERT = 'service_discovery/root/cert'
+ KV_STORE_SD_ROOT_KEY = 'service_discovery/root/key'
+
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+ self.ssl_certs = SSLCerts()
+ self.server_port = self.mgr.service_discovery_port
+ self.server_addr = '::'
+
+ def configure_routes(self) -> None:
+
+ root_server = Root(self.mgr,
+ self.server_port,
+ self.server_addr,
+ self.cert_file.name,
+ self.key_file.name)
+
+ # configure routes
+ d = cherrypy.dispatch.RoutesDispatcher()
+ d.connect(name='index', route='/', controller=root_server.index)
+ d.connect(name='index', route='/sd', controller=root_server.index)
+ d.connect(name='index', route='/sd/', controller=root_server.index)
+ d.connect(name='sd-config', route='/sd/prometheus/sd-config',
+ controller=root_server.get_sd_config)
+ d.connect(name='rules', route='/sd/prometheus/rules',
+ controller=root_server.get_prometheus_rules)
+ cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': d}})
+
+ def configure_tls(self) -> None:
+ try:
+ old_cert = self.mgr.get_store(self.KV_STORE_SD_ROOT_CERT)
+ old_key = self.mgr.get_store(self.KV_STORE_SD_ROOT_KEY)
+ if not old_key or not old_cert:
+ raise OrchestratorError('No old credentials for service discovery found')
+ self.ssl_certs.load_root_credentials(old_cert, old_key)
+ except (OrchestratorError, KeyError, ValueError):
+ self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
+ self.mgr.set_store(self.KV_STORE_SD_ROOT_CERT, self.ssl_certs.get_root_cert())
+ self.mgr.set_store(self.KV_STORE_SD_ROOT_KEY, self.ssl_certs.get_root_key())
+
+        cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
+        self.key_file = tempfile.NamedTemporaryFile()
+        self.key_file.write(key.encode('utf-8'))
+        self.key_file.flush()  # key_file must not be gc'ed
+        self.cert_file = tempfile.NamedTemporaryFile()
+        self.cert_file.write(cert.encode('utf-8'))
+        self.cert_file.flush()  # cert_file must not be gc'ed
+        verify_tls_files(self.cert_file.name, self.key_file.name)
+
+ def configure(self) -> None:
+ self.configure_tls()
+ self.configure_routes()
+
+
+class Root(Server):
+
+ # collapse everything to '/'
+ def _cp_dispatch(self, vpath: str) -> 'Root':
+ cherrypy.request.path = ''
+ return self
+
+ def __init__(self, mgr: "CephadmOrchestrator",
+ port: int = 0,
+ host: str = '',
+ ssl_ca_cert: str = '',
+ ssl_priv_key: str = ''):
+ self.mgr = mgr
+ super().__init__()
+ self.socket_port = port
+ self._socket_host = host
+ self.ssl_certificate = ssl_ca_cert
+ self.ssl_private_key = ssl_priv_key
+ self.subscribe()
+
+ @cherrypy.expose
+ def index(self) -> str:
+ return '''<!DOCTYPE html>
+<html>
+<head><title>Cephadm HTTP Endpoint</title></head>
+<body>
+<h2>Cephadm Service Discovery Endpoints</h2>
+<p><a href='prometheus/sd-config?service=mgr-prometheus'>mgr/Prometheus http sd-config</a></p>
+<p><a href='prometheus/sd-config?service=alertmanager'>Alertmanager http sd-config</a></p>
+<p><a href='prometheus/sd-config?service=node-exporter'>Node exporter http sd-config</a></p>
+<p><a href='prometheus/sd-config?service=haproxy'>HAProxy http sd-config</a></p>
+<p><a href='prometheus/rules'>Prometheus rules</a></p>
+</body>
+</html>'''
+
+ @cherrypy.expose
+ @cherrypy.tools.json_out()
+ def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]:
+ """Return <http_sd_config> compatible prometheus config for the specified service."""
+ if service == 'mgr-prometheus':
+ return self.prometheus_sd_config()
+ elif service == 'alertmanager':
+ return self.alertmgr_sd_config()
+ elif service == 'node-exporter':
+ return self.node_exporter_sd_config()
+ elif service == 'haproxy':
+ return self.haproxy_sd_config()
+ else:
+ return []
+
+ def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]:
+ """Return <http_sd_config> compatible prometheus config for prometheus service."""
+ servers = self.mgr.list_servers()
+ targets = []
+ for server in servers:
+ hostname = server.get('hostname', '')
+ for service in cast(List[ServiceInfoT], server.get('services', [])):
+ if service['type'] != 'mgr':
+ continue
+ port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
+ targets.append(f'{hostname}:{port}')
+ return [{"targets": targets, "labels": {}}]
+
+ def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]:
+ """Return <http_sd_config> compatible prometheus config for mgr alertmanager service."""
+ srv_entries = []
+ for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
+ assert dd.hostname is not None
+ addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
+ port = dd.ports[0] if dd.ports else 9093
+ srv_entries.append('{}'.format(build_url(host=addr, port=port).lstrip('/')))
+ return [{"targets": srv_entries, "labels": {}}]
+
+ def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
+ """Return <http_sd_config> compatible prometheus config for node-exporter service."""
+ srv_entries = []
+ for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
+ assert dd.hostname is not None
+ addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
+ port = dd.ports[0] if dd.ports else 9100
+ srv_entries.append({
+ 'targets': [build_url(host=addr, port=port).lstrip('/')],
+ 'labels': {'instance': dd.hostname}
+ })
+ return srv_entries
+
+ def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]:
+ """Return <http_sd_config> compatible prometheus config for haproxy service."""
+ srv_entries = []
+ for dd in self.mgr.cache.get_daemons_by_type('ingress'):
+ if dd.service_name() in self.mgr.spec_store:
+ spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec)
+ assert dd.hostname is not None
+ if dd.daemon_type == 'haproxy':
+ addr = self.mgr.inventory.get_addr(dd.hostname)
+ srv_entries.append({
+ 'targets': [f"{build_url(host=addr, port=spec.monitor_port).lstrip('/')}"],
+ 'labels': {'instance': dd.service_name()}
+ })
+ return srv_entries
+
+ @cherrypy.expose(alias='prometheus/rules')
+ def get_prometheus_rules(self) -> str:
+ """Return currently configured prometheus rules as Yaml."""
+ cherrypy.response.headers['Content-Type'] = 'text/plain'
+ with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f:
+ return f.read()
assert self.TYPE == daemon_spec.daemon_type
daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
- if not self.mgr.cherrypy_thread:
+ if not self.mgr.http_server.agent:
raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
return daemon_spec
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ agent = self.mgr.http_server.agent
try:
- assert self.mgr.cherrypy_thread
- assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
- assert self.mgr.cherrypy_thread.server_port
+ assert agent
+ assert agent.ssl_certs.get_root_cert()
+ assert agent.server_port
except Exception:
raise OrchestratorError(
'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
cfg = {'target_ip': self.mgr.get_mgr_ip(),
- 'target_port': self.mgr.cherrypy_thread.server_port,
+ 'target_port': agent.server_port,
'refresh_period': self.mgr.agent_refresh_rate,
'listener_port': self.mgr.agent_starting_port,
'host': daemon_spec.host,
'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
- listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
- self.mgr.inventory.get_addr(daemon_spec.host))
+ listener_cert, listener_key = agent.ssl_certs.generate_cert(self.mgr.inventory.get_addr(daemon_spec.host))
config = {
'agent.json': json.dumps(cfg),
'keyring': daemon_spec.keyring,
- 'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+ 'root_cert.pem': agent.ssl_certs.get_root_cert(),
'listener.crt': listener_cert,
'listener.key': listener_key,
}
- return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
- self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+ return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
+ agent.ssl_certs.get_root_cert(),
str(self.mgr.get_module_option('device_enhanced_scan'))])
from orchestrator import DaemonDescription
from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, SNMPGatewaySpec
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
-from cephadm.services.ingress import IngressSpec
from mgr_util import verify_tls, ServerConfigException, create_self_signed_cert, build_url
logger = logging.getLogger(__name__)
self,
daemon_spec: CephadmDaemonDeploySpec,
) -> Tuple[Dict[str, Any], List[str]]:
+
assert self.TYPE == daemon_spec.daemon_type
- deps = [] # type: List[str]
- # scrape mgrs
- mgr_scrape_list = []
- mgr_map = self.mgr.get('mgr_map')
- port = cast(int, self.mgr.get_module_option_ex(
- 'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT))
- deps.append(str(port))
- t = mgr_map.get('services', {}).get('prometheus', None)
+ t = self.mgr.get('mgr_map').get('services', {}).get('prometheus', None)
+ sd_port = self.mgr.service_discovery_port
+ srv_end_point = ''
if t:
p_result = urlparse(t)
# urlparse .hostname removes '[]' from the hostname in case
# of ipv6 addresses so if this is the case then we just
# append the brackets when building the final scrape endpoint
if '[' in p_result.netloc and ']' in p_result.netloc:
- mgr_scrape_list.append(f"[{p_result.hostname}]:{port}")
+ srv_end_point = f'https://[{p_result.hostname}]:{sd_port}/sd/prometheus/sd-config?'
else:
- mgr_scrape_list.append(f"{p_result.hostname}:{port}")
- # scan all mgrs to generate deps and to get standbys too.
- # assume that they are all on the same port as the active mgr.
- for dd in self.mgr.cache.get_daemons_by_service('mgr'):
- # we consider the mgr a dep even if the prometheus module is
- # disabled in order to be consistent with _calc_daemon_deps().
- deps.append(dd.name())
- if not port:
- continue
- if dd.daemon_id == self.mgr.get_mgr_id():
- continue
- assert dd.hostname is not None
- addr = self._inventory_get_fqdn(dd.hostname)
- mgr_scrape_list.append(build_url(host=addr, port=port).lstrip('/'))
+ srv_end_point = f'https://{p_result.hostname}:{sd_port}/sd/prometheus/sd-config?'
- # scrape node exporters
- nodes = []
- for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
- assert dd.hostname is not None
- deps.append(dd.name())
- addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
- port = dd.ports[0] if dd.ports else 9100
- nodes.append({
- 'hostname': dd.hostname,
- 'url': build_url(host=addr, port=port).lstrip('/')
- })
-
- # scrape alert managers
- alertmgr_targets = []
- for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
- assert dd.hostname is not None
- deps.append(dd.name())
- addr = dd.ip if dd.ip else self._inventory_get_fqdn(dd.hostname)
- port = dd.ports[0] if dd.ports else 9093
- alertmgr_targets.append("'{}'".format(build_url(host=addr, port=port).lstrip('/')))
-
- # scrape haproxies
- haproxy_targets = []
- for dd in self.mgr.cache.get_daemons_by_type('ingress'):
- if dd.service_name() in self.mgr.spec_store:
- spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec)
- assert dd.hostname is not None
- deps.append(dd.name())
- if dd.daemon_type == 'haproxy':
- addr = self._inventory_get_fqdn(dd.hostname)
- haproxy_targets.append({
- "url": f"'{build_url(host=addr, port=spec.monitor_port).lstrip('/')}'",
- "service": dd.service_name(),
- })
+ node_exporter_cnt = len(self.mgr.cache.get_daemons_by_service('node-exporter'))
+ alertmgr_cnt = len(self.mgr.cache.get_daemons_by_service('alertmanager'))
+ haproxy_cnt = len(self.mgr.cache.get_daemons_by_type('ingress'))
+ node_exporter_sd_url = f'{srv_end_point}service=node-exporter' if node_exporter_cnt > 0 else None
+ alertmanager_sd_url = f'{srv_end_point}service=alertmanager' if alertmgr_cnt > 0 else None
+ haproxy_sd_url = f'{srv_end_point}service=haproxy' if haproxy_cnt > 0 else None
+ mgr_prometheus_sd_url = f'{srv_end_point}service=mgr-prometheus' # always included
# generate the prometheus configuration
context = {
- 'alertmgr_targets': alertmgr_targets,
- 'mgr_scrape_list': mgr_scrape_list,
- 'haproxy_targets': haproxy_targets,
- 'nodes': nodes,
+ 'mgr_prometheus_sd_url': mgr_prometheus_sd_url,
+ 'node_exporter_sd_url': node_exporter_sd_url,
+ 'alertmanager_sd_url': alertmanager_sd_url,
+ 'haproxy_sd_url': haproxy_sd_url,
}
+
r = {
'files': {
- 'prometheus.yml':
- self.mgr.template.render(
- 'services/prometheus/prometheus.yml.j2', context)
+ 'prometheus.yml': self.mgr.template.render('services/prometheus/prometheus.yml.j2', context),
+ 'root_cert.pem': self.mgr.http_server.service_discovery.ssl_certs.get_root_cert()
}
}
alerts = f.read()
r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
- return r, sorted(deps)
+ return r, sorted(self.calculate_deps())
+
+ def calculate_deps(self) -> List[str]:
+ deps = [] # type: List[str]
+ port = cast(int, self.mgr.get_module_option_ex(
+ 'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT))
+ deps.append(str(port))
+ # add an explicit dependency on the active manager. This will force to
+ # re-deploy prometheus if the mgr has changed (due to a fail-over i.e).
+ deps.append(self.mgr.get_active_mgr().name())
+        deps += [s for s in ['node-exporter', 'alertmanager']
+                 if self.mgr.cache.get_daemons_by_service(s)]
+        # ingress daemons are registered under per-spec service names
+        # ('ingress.<name>'), so they must be looked up by daemon type.
+        if self.mgr.cache.get_daemons_by_type('ingress'):
+            deps.append('ingress')
+ return deps
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
# TODO: if there are multiple daemons, who is the active one?
--- /dev/null
+
+from typing import Any, Tuple
+import ipaddress
+
+from datetime import datetime, timedelta
+from cryptography import x509
+from cryptography.x509.oid import NameOID
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.backends import default_backend
+
+from orchestrator import OrchestratorError
+
+
+class SSLCerts:
+ def __init__(self) -> None:
+ self.root_cert: Any
+ self.root_key: Any
+
+ def generate_root_cert(self, host: str) -> Tuple[str, str]:
+ self.root_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=4096, backend=default_backend())
+ root_public_key = self.root_key.public_key()
+ root_builder = x509.CertificateBuilder()
+ root_builder = root_builder.subject_name(x509.Name([
+ x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
+ ]))
+ root_builder = root_builder.issuer_name(x509.Name([
+ x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
+ ]))
+ root_builder = root_builder.not_valid_before(datetime.now())
+ root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
+ root_builder = root_builder.serial_number(x509.random_serial_number())
+ root_builder = root_builder.public_key(root_public_key)
+ root_builder = root_builder.add_extension(
+ x509.SubjectAlternativeName(
+ [x509.IPAddress(ipaddress.IPv4Address(host))]
+ ),
+ critical=False
+ )
+ root_builder = root_builder.add_extension(
+ x509.BasicConstraints(ca=True, path_length=None), critical=True,
+ )
+
+ self.root_cert = root_builder.sign(
+ private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
+ )
+
+ cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
+ key_str = self.root_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()
+ ).decode('utf-8')
+
+ return (cert_str, key_str)
+
+ def generate_cert(self, addr: str) -> Tuple[str, str]:
+ have_ip = True
+ try:
+ ip = x509.IPAddress(ipaddress.IPv4Address(addr))
+ except Exception:
+ try:
+ ip = x509.IPAddress(ipaddress.IPv6Address(addr))
+ except Exception:
+ have_ip = False
+
+ private_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=4096, backend=default_backend())
+ public_key = private_key.public_key()
+
+ builder = x509.CertificateBuilder()
+ builder = builder.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, addr), ]))
+ builder = builder.issuer_name(
+ x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'), ]))
+ builder = builder.not_valid_before(datetime.now())
+ builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
+ builder = builder.serial_number(x509.random_serial_number())
+ builder = builder.public_key(public_key)
+ if have_ip:
+ builder = builder.add_extension(
+ x509.SubjectAlternativeName(
+ [ip]
+ ),
+ critical=False
+ )
+ builder = builder.add_extension(x509.BasicConstraints(
+ ca=False, path_length=None), critical=True,)
+
+ cert = builder.sign(private_key=self.root_key,
+ algorithm=hashes.SHA256(), backend=default_backend())
+ cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
+ key_str = private_key.private_bytes(encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()
+ ).decode('utf-8')
+
+ return (cert_str, key_str)
+
+ def get_root_cert(self) -> str:
+ try:
+ return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
+ except AttributeError:
+ return ''
+
+ def get_root_key(self) -> str:
+ try:
+ return self.root_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption(),
+ ).decode('utf-8')
+ except AttributeError:
+ return ''
+
+ def load_root_credentials(self, cert: str, priv_key: str) -> None:
+ given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())
+ tz = given_cert.not_valid_after.tzinfo
+ if datetime.now(tz) >= given_cert.not_valid_after:
+ raise OrchestratorError('Given cert is expired')
+ self.root_cert = given_cert
+ self.root_key = serialization.load_pem_private_key(
+ data=priv_key.encode('utf-8'), backend=default_backend(), password=None)
evaluation_interval: 10s
rule_files:
- /etc/prometheus/alerting/*
-{% if alertmgr_targets %}
+
+{% if alertmanager_sd_url %}
alerting:
alertmanagers:
- scheme: http
- static_configs:
- - targets: [{{ alertmgr_targets|join(', ') }}]
+ http_sd_configs:
+ - url: {{ alertmanager_sd_url }}
+ tls_config:
+ ca_file: root_cert.pem
{% endif %}
+
scrape_configs:
- job_name: 'ceph'
honor_labels: true
- static_configs:
- - targets:
-{% for mgr in mgr_scrape_list %}
- - '{{ mgr }}'
-{% endfor %}
+ http_sd_configs:
+ - url: {{ mgr_prometheus_sd_url }}
+ tls_config:
+ ca_file: root_cert.pem
-{% if nodes %}
+{% if node_exporter_sd_url %}
- job_name: 'node'
- static_configs:
-{% for node in nodes %}
- - targets: ['{{ node.url }}']
- labels:
- instance: '{{ node.hostname }}'
-{% endfor %}
+ http_sd_configs:
+ - url: {{ node_exporter_sd_url }}
+ tls_config:
+ ca_file: root_cert.pem
{% endif %}
-{% if haproxy_targets %}
+{% if haproxy_sd_url %}
- job_name: 'haproxy'
- static_configs:
-{% for haproxy in haproxy_targets %}
- - targets: [{{ haproxy.url }}]
- labels:
- instance: '{{ haproxy.service }}'
-{% endfor %}
+ http_sd_configs:
+ - url: {{ haproxy_sd_url }}
+ tls_config:
+ ca_file: root_cert.pem
{% endif %}
mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
- mock.patch('cephadm.agent.CherryPyThread.run'), \
mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \
- mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'):
+            mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'):
m = CephadmOrchestrator.__new__(CephadmOrchestrator)
if module_options is not None:
+++ /dev/null
-from unittest.mock import MagicMock
-from cephadm.agent import Root
-
-
-class FakeDaemonDescription:
- def __init__(self, ip, ports, hostname, service_name='', daemon_type=''):
- self.ip = ip
- self.ports = ports
- self.hostname = hostname
- self._service_name = service_name
- self.daemon_type = daemon_type
-
- def service_name(self):
- return self._service_name
-
-
-class FakeCache:
- def get_daemons_by_service(self, service_type):
- return [FakeDaemonDescription('1.2.3.4', [9100], 'node0'),
- FakeDaemonDescription('1.2.3.5', [9200], 'node1')]
-
- def get_daemons_by_type(self, daemon_type):
- return [FakeDaemonDescription('1.2.3.4', [9100], 'node0', 'ingress', 'haproxy'),
- FakeDaemonDescription('1.2.3.5', [9200], 'node1', 'ingress', 'haproxy')]
-
-
-class FakeInventory:
- def get_addr(self, name: str):
- return '1.2.3.4'
-
-
-class FakeServiceSpec:
- def __init__(self, port):
- self.monitor_port = port
-
-
-class FakeSpecDescription:
- def __init__(self, port):
- self.spec = FakeServiceSpec(port)
-
-
-class FakeSpecStore():
- def __init__(self, mgr):
- self.mgr = mgr
- self._specs = {'ingress': FakeSpecDescription(9049)}
-
- def __contains__(self, name):
- return name in self._specs
-
- def __getitem__(self, name):
- return self._specs['ingress']
-
-
-class FakeMgr:
- def __init__(self):
- self.config = ''
- self.check_mon_command = MagicMock(side_effect=self._check_mon_command)
- self.mon_command = MagicMock(side_effect=self._check_mon_command)
- self.template = MagicMock()
- self.log = MagicMock()
- self.inventory = FakeInventory()
- self.cache = FakeCache()
- self.spec_store = FakeSpecStore(self)
-
- def list_servers(self):
-
- servers = [
- {'hostname': 'node0',
- 'ceph_version': '16.2',
- 'services': [{'type': 'mgr'}, {'type': 'mon'}]},
- {'hostname': 'node1',
- 'ceph_version': '16.2',
- 'services': [{'type': 'mgr'}, {'type': 'mon'}]}
- ]
-
- return servers
-
- def _check_mon_command(self, cmd_dict, inbuf=None):
- prefix = cmd_dict.get('prefix')
- if prefix == 'get-cmd':
- return 0, self.config, ''
- if prefix == 'set-cmd':
- self.config = cmd_dict.get('value')
- return 0, 'value set', ''
- return -1, '', 'error'
-
- def get_module_option_ex(self, module, option, default_value):
- return "9283"
-
-
-class TestCephadmService:
-
- def test_get_sd_config_prometheus(self):
- mgr = FakeMgr()
- root = Root(mgr)
- cfg = root.get_sd_config('mgr-prometheus')
-
- # check response structure
- assert cfg
- for entry in cfg:
- assert 'labels' in entry
- assert 'targets' in entry
-
- # check content
- assert cfg[0]['targets'] == ['node0:9283', 'node1:9283']
-
- def test_get_sd_config_node_exporter(self):
- mgr = FakeMgr()
- root = Root(mgr)
- cfg = root.get_sd_config('node-exporter')
-
- # check response structure
- assert cfg
- for entry in cfg:
- assert 'labels' in entry
- assert 'targets' in entry
-
- # check content
- assert cfg[0]['targets'] == ['1.2.3.4:9100']
- assert cfg[0]['labels'] == {'instance': 'node0'}
- assert cfg[1]['targets'] == ['1.2.3.5:9200']
- assert cfg[1]['labels'] == {'instance': 'node1'}
-
- def test_get_sd_config_alertmgr(self):
- mgr = FakeMgr()
- root = Root(mgr)
- cfg = root.get_sd_config('alertmanager')
-
- # check response structure
- assert cfg
- for entry in cfg:
- assert 'labels' in entry
- assert 'targets' in entry
-
- # check content
- assert cfg[0]['targets'] == ['1.2.3.4:9100', '1.2.3.5:9200']
-
- def test_get_sd_config_haproxy(self):
- mgr = FakeMgr()
- root = Root(mgr)
- cfg = root.get_sd_config('haproxy')
-
- # check response structure
- assert cfg
- for entry in cfg:
- assert 'labels' in entry
- assert 'targets' in entry
-
- # check content
- assert cfg[0]['targets'] == ['1.2.3.4:9049']
- assert cfg[0]['labels'] == {'instance': 'ingress'}
-
- def test_get_sd_config_invalid_service(self):
- mgr = FakeMgr()
- root = Root(mgr)
- cfg = root.get_sd_config('invalid-service')
- assert cfg == []
--- /dev/null
+from unittest.mock import MagicMock
+from cephadm.service_discovery import Root
+
+
+class FakeDaemonDescription:
+ def __init__(self, ip, ports, hostname, service_name='', daemon_type=''):
+ self.ip = ip
+ self.ports = ports
+ self.hostname = hostname
+ self._service_name = service_name
+ self.daemon_type = daemon_type
+
+ def service_name(self):
+ return self._service_name
+
+
+class FakeCache:
+ def get_daemons_by_service(self, service_type):
+ return [FakeDaemonDescription('1.2.3.4', [9100], 'node0'),
+ FakeDaemonDescription('1.2.3.5', [9200], 'node1')]
+
+ def get_daemons_by_type(self, daemon_type):
+ return [FakeDaemonDescription('1.2.3.4', [9100], 'node0', 'ingress', 'haproxy'),
+ FakeDaemonDescription('1.2.3.5', [9200], 'node1', 'ingress', 'haproxy')]
+
+
+class FakeInventory:
+ def get_addr(self, name: str):
+ return '1.2.3.4'
+
+
+class FakeServiceSpec:
+ def __init__(self, port):
+ self.monitor_port = port
+
+
+class FakeSpecDescription:
+ def __init__(self, port):
+ self.spec = FakeServiceSpec(port)
+
+
+class FakeSpecStore():
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self._specs = {'ingress': FakeSpecDescription(9049)}
+
+ def __contains__(self, name):
+ return name in self._specs
+
+ def __getitem__(self, name):
+ return self._specs['ingress']
+
+
+class FakeMgr:
+ def __init__(self):
+ self.config = ''
+ self.check_mon_command = MagicMock(side_effect=self._check_mon_command)
+ self.mon_command = MagicMock(side_effect=self._check_mon_command)
+ self.template = MagicMock()
+ self.log = MagicMock()
+ self.inventory = FakeInventory()
+ self.cache = FakeCache()
+ self.spec_store = FakeSpecStore(self)
+
+ def list_servers(self):
+
+ servers = [
+ {'hostname': 'node0',
+ 'ceph_version': '16.2',
+ 'services': [{'type': 'mgr'}, {'type': 'mon'}]},
+ {'hostname': 'node1',
+ 'ceph_version': '16.2',
+ 'services': [{'type': 'mgr'}, {'type': 'mon'}]}
+ ]
+
+ return servers
+
+ def _check_mon_command(self, cmd_dict, inbuf=None):
+ prefix = cmd_dict.get('prefix')
+ if prefix == 'get-cmd':
+ return 0, self.config, ''
+ if prefix == 'set-cmd':
+ self.config = cmd_dict.get('value')
+ return 0, 'value set', ''
+ return -1, '', 'error'
+
+ def get_module_option_ex(self, module, option, default_value):
+ return "9283"
+
+
+class TestServiceDiscovery:
+
+ def test_get_sd_config_prometheus(self):
+ mgr = FakeMgr()
+ root = Root(mgr)
+ cfg = root.get_sd_config('mgr-prometheus')
+
+ # check response structure
+ assert cfg
+ for entry in cfg:
+ assert 'labels' in entry
+ assert 'targets' in entry
+
+ # check content
+ assert cfg[0]['targets'] == ['node0:9283', 'node1:9283']
+
+ def test_get_sd_config_node_exporter(self):
+ mgr = FakeMgr()
+ root = Root(mgr)
+ cfg = root.get_sd_config('node-exporter')
+
+ # check response structure
+ assert cfg
+ for entry in cfg:
+ assert 'labels' in entry
+ assert 'targets' in entry
+
+ # check content
+ assert cfg[0]['targets'] == ['1.2.3.4:9100']
+ assert cfg[0]['labels'] == {'instance': 'node0'}
+ assert cfg[1]['targets'] == ['1.2.3.5:9200']
+ assert cfg[1]['labels'] == {'instance': 'node1'}
+
+ def test_get_sd_config_alertmgr(self):
+ mgr = FakeMgr()
+ root = Root(mgr)
+ cfg = root.get_sd_config('alertmanager')
+
+ # check response structure
+ assert cfg
+ for entry in cfg:
+ assert 'labels' in entry
+ assert 'targets' in entry
+
+ # check content
+ assert cfg[0]['targets'] == ['1.2.3.4:9100', '1.2.3.5:9200']
+
+ def test_get_sd_config_haproxy(self):
+ mgr = FakeMgr()
+ root = Root(mgr)
+ cfg = root.get_sd_config('haproxy')
+
+ # check response structure
+ assert cfg
+ for entry in cfg:
+ assert 'labels' in entry
+ assert 'targets' in entry
+
+ # check content
+ assert cfg[0]['targets'] == ['1.2.3.4:9049']
+ assert cfg[0]['labels'] == {'instance': 'ingress'}
+
+ def test_get_sd_config_invalid_service(self):
+ mgr = FakeMgr()
+ root = Root(mgr)
+ cfg = root.get_sd_config('invalid-service')
+ assert cfg == []
evaluation_interval: 10s
rule_files:
- /etc/prometheus/alerting/*
+
+
scrape_configs:
- job_name: 'ceph'
honor_labels: true
- static_configs:
- - targets:
- - '[::1]:9283'
+ http_sd_configs:
+ - url: https://[::1]:8765/sd/prometheus/sd-config?service=mgr-prometheus
+ tls_config:
+ ca_file: root_cert.pem
- job_name: 'node'
- static_configs:
- - targets: ['[1::4]:9100']
- labels:
- instance: 'test'
+ http_sd_configs:
+ - url: https://[::1]:8765/sd/prometheus/sd-config?service=node-exporter
+ tls_config:
+ ca_file: root_cert.pem
""").lstrip()
'--config-json', '-',
'--tcp-ports', '9095'
],
- stdin=json.dumps({"files": {"prometheus.yml": y}}),
+ stdin=json.dumps({"files": {"prometheus.yml": y, "root_cert.pem": ''}}),
image='')
@patch("cephadm.serve.CephadmServe._run_cephadm")
"""
raise NotImplementedError()
+ def service_discovery_dump_cert(self) -> OrchResult:
+ """
+ Returns service discovery server root certificate
+
+ :return: service discovery root certificate
+ """
+ raise NotImplementedError()
+
def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
"""
Describe a service (of any kind) that is already configured in
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
+ @_cli_write_command('orch sd dump cert')
+ def _service_discovery_dump_cert(self) -> HandleCommandResult:
+ """
+ Returns service discovery server root certificate
+ """
+ completion = self.service_discovery_dump_cert()
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
@_cli_read_command('orch ls')
def _list_services(self,
service_type: Optional[str] = None,