]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: reconfig prometheus when service-discovery port changes
authorRedouane Kachach <rkachach@redhat.com>
Thu, 1 Sep 2022 12:43:12 +0000 (14:43 +0200)
committerRedouane Kachach <rkachach@redhat.com>
Wed, 14 Sep 2022 10:17:35 +0000 (12:17 +0200)
fixes: https://tracker.ceph.com/issues/57366

Signed-off-by: Redouane Kachach <rkachach@redhat.com>
src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/http_server.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/service_discovery.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/services/monitoring.py
src/pybind/mgr/cephadm/ssl_cert_utils.py
src/pybind/mgr/cephadm/tests/test_service_discovery.py
src/pybind/mgr/mgr_module.py
src/pybind/mgr/mgr_util.py

index bb35166a155f213df55e39ad95c3e676732f9023..a968c74c612b42fefd23b55739fa2fc8a5b3a117 100644 (file)
@@ -14,15 +14,14 @@ import tempfile
 import threading
 import time
 
-
-from mgr_util import verify_tls_files
-from orchestrator import DaemonDescriptionStatus, OrchestratorError
+from orchestrator import DaemonDescriptionStatus
 from orchestrator._interface import daemon_type_to_service
 from ceph.utils import datetime_now
 from ceph.deployment.inventory import Devices
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from cephadm.ssl_cert_utils import SSLCerts
+from mgr_util import test_port_allocation, PortAlreadyInUse
 
 from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
 
@@ -52,62 +51,43 @@ class AgentEndpoint:
         self.ssl_certs = SSLCerts()
         self.server_port = 7150
         self.server_addr = self.mgr.get_mgr_ip()
-        self.host_data: Server = None
 
     def configure_routes(self) -> None:
-
-        self.host_data = HostData(self.mgr,
-                                  self.server_port,
-                                  self.server_addr,
-                                  self.cert_file.name,
-                                  self.key_file.name)
-
-        # configure routes
         d = cherrypy.dispatch.RoutesDispatcher()
-        d.connect(name='host-data', route='/',
+        d.connect(name='host-data', route='/data',
                   controller=self.host_data.POST,
                   conditions=dict(method=['POST']))
+        cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': d}})
 
-        cherrypy.tree.mount(None, '/data', config={'/': {'request.dispatch': d}})
-
-    def configure_tls(self) -> None:
-        try:
-            old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
-            old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
-            if not old_key or not old_cert:
-                raise OrchestratorError('No old credentials for agent found')
+    def configure_tls(self, server: Server) -> None:
+        old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
+        old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
+        if old_cert and old_key:
             self.ssl_certs.load_root_credentials(old_cert, old_key)
-        except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
+        else:
             self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
             self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert())
             self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key())
 
-        cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
-        self.key_file = tempfile.NamedTemporaryFile()
-        self.key_file.write(key.encode('utf-8'))
-        self.key_file.flush()  # pkey_tmp must not be gc'ed
-        self.cert_file = tempfile.NamedTemporaryFile()
-        self.cert_file.write(cert.encode('utf-8'))
-        self.cert_file.flush()  # cert_tmp must not be gc'ed
-        verify_tls_files(self.cert_file.name, self.key_file.name)
+        host = self.mgr.get_hostname()
+        addr = self.mgr.get_mgr_ip()
+        server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(host, addr)
 
     def find_free_port(self) -> None:
         max_port = self.server_port + 150
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         while self.server_port <= max_port:
             try:
-                sock.bind((self.server_addr, self.server_port))
-                sock.close()
+                test_port_allocation(self.server_addr, self.server_port)
                 self.host_data.socket_port = self.server_port
                 self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}')
                 return
-            except OSError:
+            except PortAlreadyInUse:
                 self.server_port += 1
-        self.mgr.log.error(
-            'Cephadm agent endpoint could not find free port in range 7150-7300 and failed to start')
+        self.mgr.log.error(f'Cephadm agent could not find free port in range {max_port - 150}-{max_port} and failed to start')
 
     def configure(self) -> None:
-        self.configure_tls()
+        self.host_data = HostData(self.mgr, self.server_port, self.server_addr)
+        self.configure_tls(self.host_data)
         self.configure_routes()
         self.find_free_port()
 
@@ -115,15 +95,20 @@ class AgentEndpoint:
 class HostData(Server):
     exposed = True
 
-    def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str, ssl_ca_cert: str, ssl_priv_key: str):
+    def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
         self.mgr = mgr
         super().__init__()
         self.socket_port = port
-        self.ssl_certificate = ssl_ca_cert
-        self.ssl_private_key = ssl_priv_key
-        self._socket_host = host
+        self.socket_host = host
         self.subscribe()
 
+    def stop(self) -> None:
+        # we must call unsubscribe before stopping the server,
+        # otherwise the port is not released and we will get
+        # an exception when trying to restart it
+        self.unsubscribe()
+        super().stop()
+
     @cherrypy.tools.json_in()
     @cherrypy.tools.json_out()
     def POST(self) -> Dict[str, Any]:
@@ -262,7 +247,8 @@ class AgentMessageThread(threading.Thread):
             root_cert_tmp.flush()
             root_cert_fname = root_cert_tmp.name
 
-            cert, key = self.agent.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
+            cert, key = self.agent.ssl_certs.generate_cert(
+                self.mgr.get_hostname(), self.mgr.get_mgr_ip())
 
             cert_tmp = tempfile.NamedTemporaryFile()
             cert_tmp.write(cert.encode('utf-8'))
index 7c63740db5ce510f889617fc8da01f19c8b1c32a..0c0b940aa94d9c18600511abd23f8866794e944e 100644 (file)
@@ -5,7 +5,8 @@ from typing import TYPE_CHECKING
 
 from cephadm.agent import AgentEndpoint
 from cephadm.service_discovery import ServiceDiscovery
-
+from mgr_util import test_port_allocation, PortAlreadyInUse
+from orchestrator import OrchestratorError
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -29,6 +30,7 @@ class CephadmHttpServer(threading.Thread):
         self.agent = AgentEndpoint(mgr)
         self.service_discovery = ServiceDiscovery(mgr)
         self.cherrypy_shutdown_event = threading.Event()
+        self._service_discovery_port = self.mgr.service_discovery_port
         super().__init__(target=self.run)
 
     def configure_cherrypy(self) -> None:
@@ -37,17 +39,47 @@ class CephadmHttpServer(threading.Thread):
             'engine.autoreload.on': False,
         })
 
-    def run(self) -> None:
+    def configure(self) -> None:
+        self.configure_cherrypy()
+        self.agent.configure()
+        self.service_discovery.configure(self.mgr.service_discovery_port, self.mgr.get_mgr_ip())
+
+    def config_update(self) -> None:
+        self.service_discovery_port = self.mgr.service_discovery_port
+
+    @property
+    def service_discovery_port(self) -> int:
+        return self._service_discovery_port
+
+    @service_discovery_port.setter
+    def service_discovery_port(self, value: int) -> None:
+        if self._service_discovery_port == value:
+            return
+
         try:
-            self.configure_cherrypy()
-            self.agent.configure()
-            self.service_discovery.configure()
+            test_port_allocation(self.mgr.get_mgr_ip(), value)
+        except PortAlreadyInUse:
+            raise OrchestratorError(f'Service discovery port {value} is already in use. Listening on old port {self._service_discovery_port}.')
+        except Exception as e:
+            raise OrchestratorError(f'Cannot check service discovery port ip:{self.mgr.get_mgr_ip()} port:{value} error:{e}')
+
+        self.mgr.log.info(f'Changing service discovery port from {self._service_discovery_port} to {value}...')
+        self._service_discovery_port = value
+        self.restart()
+
+    def restart(self) -> None:
+        cherrypy.engine.stop()
+        cherrypy.server.httpserver = None
+        self.configure()
+        cherrypy.engine.start()
 
+    def run(self) -> None:
+        try:
             self.mgr.log.debug('Starting cherrypy engine...')
+            self.configure()
             cherrypy.server.unsubscribe()  # disable default server
             cherrypy.engine.start()
             self.mgr.log.debug('Cherrypy engine started.')
-
             self.mgr._kick_serve_loop()
             # wait for the shutdown event
             self.cherrypy_shutdown_event.wait()
index e08996088dc816d149a7383f8d674894a9ab85a8..c720f058c2401ab93bbf2c71eb46309d0609cdeb 100644 (file)
@@ -2431,8 +2431,8 @@ Then run the following:
             # this way we force a redeploy after a mgr failover
             deps.append(self.get_active_mgr().name())
             deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
-            deps += [s for s in ['node-exporter', 'alertmanager', 'ingress']
-                     if self.cache.get_daemons_by_service(s)]
+            deps.append(str(self.service_discovery_port))
+            deps += [s for s in ['node-exporter', 'alertmanager', 'ingress'] if self.cache.get_daemons_by_service(s)]
         else:
             need = {
                 'grafana': ['prometheus', 'loki'],
index 487578288548c1d63655c21d50aecf40b497789a..14143ca5e74957fb39c1b60987c271f25aad3745 100644 (file)
@@ -273,6 +273,7 @@ class CephadmServe:
         refresh(self.mgr.cache.get_hosts())
 
         self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
+        self.mgr.http_server.config_update()
 
         self.mgr.config_checker.run_checks()
 
index 50d83e44d18929c692740cb264254f5d567dfdbc..b35aa142df6e37faa7f1ef179ba0b24c03614f0b 100644 (file)
@@ -7,12 +7,12 @@ except ImportError:
         pass
 
 import logging
-import tempfile
-from typing import Dict, List, TYPE_CHECKING, cast, Collection
-
-from orchestrator import OrchestratorError
+import orchestrator  # noqa
 from mgr_module import ServiceInfoT
-from mgr_util import verify_tls_files, build_url
+from mgr_util import build_url
+from typing import Dict, List, TYPE_CHECKING, cast, Collection, Callable, NamedTuple
+from cephadm.services.monitoring import AlertmanagerService, NodeExporterService, PrometheusService
+
 from cephadm.services.ingress import IngressSpec
 from cephadm.ssl_cert_utils import SSLCerts
 
@@ -32,6 +32,12 @@ logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
 cherrypy.log.access_log.propagate = False
 
 
+class Route(NamedTuple):
+    name: str
+    route: str
+    controller: Callable
+
+
 class ServiceDiscovery:
 
     KV_STORE_SD_ROOT_CERT = 'service_discovery/root/cert'
@@ -40,52 +46,38 @@ class ServiceDiscovery:
     def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.mgr = mgr
         self.ssl_certs = SSLCerts()
-        self.server_port = self.mgr.service_discovery_port
-        self.server_addr = '::'
 
-    def configure_routes(self) -> None:
-
-        root_server = Root(self.mgr,
-                           self.server_port,
-                           self.server_addr,
-                           self.cert_file.name,
-                           self.key_file.name)
-
-        # configure routes
+    def configure_routes(self, server: Server) -> None:
+        ROUTES = [
+            Route('index', '/', server.index),
+            Route('sd-config', '/prometheus/sd-config', server.get_sd_config),
+            Route('rules', '/prometheus/rules', server.get_prometheus_rules),
+        ]
         d = cherrypy.dispatch.RoutesDispatcher()
-        d.connect(name='index', route='/', controller=root_server.index)
-        d.connect(name='index', route='/sd', controller=root_server.index)
-        d.connect(name='index', route='/sd/', controller=root_server.index)
-        d.connect(name='sd-config', route='/sd/prometheus/sd-config',
-                  controller=root_server.get_sd_config)
-        d.connect(name='rules', route='/sd/prometheus/rules',
-                  controller=root_server.get_prometheus_rules)
-        cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': d}})
-
-    def configure_tls(self) -> None:
-        try:
-            old_cert = self.mgr.get_store(self.KV_STORE_SD_ROOT_CERT)
-            old_key = self.mgr.get_store(self.KV_STORE_SD_ROOT_KEY)
-            if not old_key or not old_cert:
-                raise OrchestratorError('No old credentials for service discovery found')
+        for route in ROUTES:
+            d.connect(**route._asdict())
+        conf = {'/': {'request.dispatch': d}}
+        cherrypy.tree.mount(None, '/sd', config=conf)
+
+    def configure_tls(self, server: Server) -> None:
+        old_cert = self.mgr.get_store(self.KV_STORE_SD_ROOT_CERT)
+        old_key = self.mgr.get_store(self.KV_STORE_SD_ROOT_KEY)
+        if old_key and old_cert:
             self.ssl_certs.load_root_credentials(old_cert, old_key)
-        except (OrchestratorError, KeyError, ValueError):
+        else:
             self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
             self.mgr.set_store(self.KV_STORE_SD_ROOT_CERT, self.ssl_certs.get_root_cert())
             self.mgr.set_store(self.KV_STORE_SD_ROOT_KEY, self.ssl_certs.get_root_key())
 
-        cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
-        self.key_file = tempfile.NamedTemporaryFile()
-        self.key_file.write(key.encode('utf-8'))
-        self.key_file.flush()  # pkey_tmp must not be gc'ed
-        self.cert_file = tempfile.NamedTemporaryFile()
-        self.cert_file.write(cert.encode('utf-8'))
-        self.cert_file.flush()  # cert_tmp must not be gc'ed
-        verify_tls_files(self.cert_file.name, self.key_file.name)
+        host = self.mgr.get_hostname()
+        addr = self.mgr.get_mgr_ip()
+        server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(host, addr)
 
-    def configure(self) -> None:
-        self.configure_tls()
-        self.configure_routes()
+    def configure(self, port: int, addr: str) -> None:
+        # we create a new server to enforce TLS/SSL config refresh
+        self.root_server = Root(self.mgr, port, addr)
+        self.configure_tls(self.root_server)
+        self.configure_routes(self.root_server)
 
 
 class Root(Server):
@@ -95,17 +87,18 @@ class Root(Server):
         cherrypy.request.path = ''
         return self
 
-    def __init__(self, mgr: "CephadmOrchestrator",
-                 port: int = 0,
-                 host: str = '',
-                 ssl_ca_cert: str = '',
-                 ssl_priv_key: str = ''):
+    def stop(self) -> None:
+        # we must call unsubscribe before stopping the server,
+        # otherwise the port is not released and we will get
+        # an exception when trying to restart it
+        self.unsubscribe()
+        super().stop()
+
+    def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
         self.mgr = mgr
         super().__init__()
         self.socket_port = port
-        self._socket_host = host
-        self.ssl_certificate = ssl_ca_cert
-        self.ssl_private_key = ssl_priv_key
+        self.socket_host = host
         self.subscribe()
 
     @cherrypy.expose
@@ -147,7 +140,8 @@ class Root(Server):
             for service in cast(List[ServiceInfoT], server.get('services', [])):
                 if service['type'] != 'mgr':
                     continue
-                port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
+                port = self.mgr.get_module_option_ex(
+                    'prometheus', 'server_port', PrometheusService.DEFAULT_MGR_PROMETHEUS_PORT)
                 targets.append(f'{hostname}:{port}')
         return [{"targets": targets, "labels": {}}]
 
@@ -157,7 +151,7 @@ class Root(Server):
         for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
             assert dd.hostname is not None
             addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
-            port = dd.ports[0] if dd.ports else 9093
+            port = dd.ports[0] if dd.ports else AlertmanagerService.DEFAULT_SERVICE_PORT
             srv_entries.append('{}'.format(build_url(host=addr, port=port).lstrip('/')))
         return [{"targets": srv_entries, "labels": {}}]
 
@@ -167,7 +161,7 @@ class Root(Server):
         for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
             assert dd.hostname is not None
             addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
-            port = dd.ports[0] if dd.ports else 9100
+            port = dd.ports[0] if dd.ports else NodeExporterService.DEFAULT_SERVICE_PORT
             srv_entries.append({
                 'targets': [build_url(host=addr, port=port).lstrip('/')],
                 'labels': {'instance': dd.hostname}
index 9a7c7e40e8e6cb55cd36ec589b2eb9f0cd5b2dd0..2621b1369f9a6d85d602d961f602bc31ea16edd2 100644 (file)
@@ -1077,8 +1077,7 @@ class CephadmAgent(CephService):
                'host': daemon_spec.host,
                'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
 
-        listener_cert, listener_key = agent.ssl_certs.generate_cert(
-            self.mgr.inventory.get_addr(daemon_spec.host))
+        listener_cert, listener_key = agent.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
         config = {
             'agent.json': json.dumps(cfg),
             'keyring': daemon_spec.keyring,
index 5942a92597bfa378ce50758f8a7016b9c733b9fc..f55dcda233aeb42b75916330805d12f7237f8abe 100644 (file)
@@ -372,6 +372,7 @@ class PrometheusService(CephadmService):
         port = cast(int, self.mgr.get_module_option_ex(
             'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT))
         deps.append(str(port))
+        deps.append(str(self.mgr.service_discovery_port))
         # add an explicit dependency on the active manager. This will force to
         # re-deploy prometheus if the mgr has changed (due to a fail-over i.e).
         deps.append(self.mgr.get_active_mgr().name())
@@ -411,6 +412,7 @@ class PrometheusService(CephadmService):
 
 class NodeExporterService(CephadmService):
     TYPE = 'node-exporter'
+    DEFAULT_SERVICE_PORT = 9100
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
index b18d185facd789a3667f2f90040154a7960a7cc9..8063cd64c4d2e2fb6bf88a6b1a04fba86d99e8da 100644 (file)
@@ -1,6 +1,9 @@
 
-from typing import Any, Tuple
+from typing import Any, Tuple, IO
 import ipaddress
+import tempfile
+import os
+import logging
 
 from datetime import datetime, timedelta
 from cryptography import x509
@@ -8,16 +11,26 @@ from cryptography.x509.oid import NameOID
 from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.primitives import hashes, serialization
 from cryptography.hazmat.backends import default_backend
+from OpenSSL import crypto, SSL
 
 from orchestrator import OrchestratorError
 
 
+logger = logging.getLogger(__name__)
+
+
+class SSLConfigException(Exception):
+    pass
+
+
 class SSLCerts:
     def __init__(self) -> None:
         self.root_cert: Any
         self.root_key: Any
+        self.key_file: IO[bytes]
+        self.cert_file: IO[bytes]
 
-    def generate_root_cert(self, host: str) -> Tuple[str, str]:
+    def generate_root_cert(self, addr: str) -> Tuple[str, str]:
         self.root_key = rsa.generate_private_key(
             public_exponent=65537, key_size=4096, backend=default_backend())
         root_public_key = self.root_key.public_key()
@@ -34,7 +47,7 @@ class SSLCerts:
         root_builder = root_builder.public_key(root_public_key)
         root_builder = root_builder.add_extension(
             x509.SubjectAlternativeName(
-                [x509.IPAddress(ipaddress.IPv4Address(host))]
+                [x509.IPAddress(ipaddress.IPv4Address(addr))]
             ),
             critical=False
         )
@@ -55,7 +68,7 @@ class SSLCerts:
 
         return (cert_str, key_str)
 
-    def generate_cert(self, addr: str) -> Tuple[str, str]:
+    def generate_cert(self, host: str, addr: str) -> Tuple[str, str]:
         have_ip = True
         try:
             ip = x509.IPAddress(ipaddress.IPv4Address(addr))
@@ -80,7 +93,14 @@ class SSLCerts:
         if have_ip:
             builder = builder.add_extension(
                 x509.SubjectAlternativeName(
-                    [ip]
+                    [ip, x509.DNSName(host)]
+                ),
+                critical=False
+            )
+        else:
+            builder = builder.add_extension(
+                x509.SubjectAlternativeName(
+                    [x509.DNSName(host)]
                 ),
                 critical=False
             )
@@ -97,6 +117,20 @@ class SSLCerts:
 
         return (cert_str, key_str)
 
+    def generate_cert_files(self, host: str, addr: str) -> Tuple[str, str]:
+        cert, key = self.generate_cert(host, addr)
+
+        self.cert_file = tempfile.NamedTemporaryFile()
+        self.cert_file.write(cert.encode('utf-8'))
+        self.cert_file.flush()  # cert_tmp must not be gc'ed
+
+        self.key_file = tempfile.NamedTemporaryFile()
+        self.key_file.write(key.encode('utf-8'))
+        self.key_file.flush()  # pkey_tmp must not be gc'ed
+
+        verify_tls_files(self.cert_file.name, self.key_file.name)
+        return self.cert_file.name, self.key_file.name
+
     def get_root_cert(self) -> str:
         try:
             return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
@@ -121,3 +155,95 @@ class SSLCerts:
         self.root_cert = given_cert
         self.root_key = serialization.load_pem_private_key(
             data=priv_key.encode('utf-8'), backend=default_backend(), password=None)
+
+
+def verify_tls(crt, key):
+    # type: (str, str) -> None
+    verify_cacrt_content(crt)
+
+    try:
+        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
+        _key.check()
+    except (ValueError, crypto.Error) as e:
+        raise SSLConfigException('Invalid private key: {}'.format(str(e)))
+    try:
+        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+    except ValueError as e:
+        raise SSLConfigException('Invalid certificate key: {}'.format(str(e))
+                                 )
+
+    try:
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.use_certificate(_crt)
+        context.use_privatekey(_key)
+        context.check_privatekey()
+    except crypto.Error as e:
+        logger.warning(f'Private key and certificate do not match up: {e}')
+
+
+def verify_tls_files(cert_fname, pkey_fname):
+    # type: (str, str) -> None
+    """Basic checks for TLS certificate and key files
+
+    Do some validations to the private key and certificate:
+    - Check the type and format
+    - Check the certificate expiration date
+    - Check the consistency of the private key
+    - Check that the private key and certificate match up
+
+    :param cert_fname: Name of the certificate file
+    :param pkey_fname: name of the certificate public key file
+
+    :raises SSLConfigException: An error with a message
+
+    """
+    if not cert_fname or not pkey_fname:
+        raise SSLConfigException('no certificate configured')
+
+    verify_cacrt(cert_fname)
+
+    if not os.path.isfile(pkey_fname):
+        raise SSLConfigException('private key %s does not exist' % pkey_fname)
+
+    try:
+        with open(pkey_fname) as f:
+            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
+            pkey.check()
+    except (ValueError, crypto.Error) as e:
+        raise SSLConfigException(
+            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
+    try:
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
+        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
+        context.check_privatekey()
+    except crypto.Error as e:
+        logger.warning(
+            f'Private key {pkey_fname} and certificate {cert_fname} do not match up: {e}')
+
+
+def verify_cacrt(cert_fname):
+    # type: (str) -> None
+    """Basic validation of a ca cert"""
+
+    if not cert_fname:
+        raise SSLConfigException("CA cert not configured")
+    if not os.path.isfile(cert_fname):
+        raise SSLConfigException("Certificate {} does not exist".format(cert_fname))
+
+    try:
+        with open(cert_fname) as f:
+            verify_cacrt_content(f.read())
+    except ValueError as e:
+        raise SSLConfigException(
+            'Invalid certificate {}: {}'.format(cert_fname, str(e)))
+
+
+def verify_cacrt_content(crt):
+    # type: (str) -> None
+    try:
+        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+        if x509.has_expired():
+            logger.warning(f'Certificate has expired: {crt}')
+    except (ValueError, crypto.Error) as e:
+        raise SSLConfigException(f'Invalid certificate: {e}')
index 870b4341f5ae706e263e702919959eb4a599573c..f770c857e6557ee9c610ccf455820a63f8251479 100644 (file)
@@ -92,7 +92,7 @@ class TestServiceDiscovery:
 
     def test_get_sd_config_prometheus(self):
         mgr = FakeMgr()
-        root = Root(mgr)
+        root = Root(mgr, 5000, '0.0.0.0')
         cfg = root.get_sd_config('mgr-prometheus')
 
         # check response structure
@@ -106,7 +106,7 @@ class TestServiceDiscovery:
 
     def test_get_sd_config_node_exporter(self):
         mgr = FakeMgr()
-        root = Root(mgr)
+        root = Root(mgr, 5000, '0.0.0.0')
         cfg = root.get_sd_config('node-exporter')
 
         # check response structure
@@ -123,7 +123,7 @@ class TestServiceDiscovery:
 
     def test_get_sd_config_alertmgr(self):
         mgr = FakeMgr()
-        root = Root(mgr)
+        root = Root(mgr, 5000, '0.0.0.0')
         cfg = root.get_sd_config('alertmanager')
 
         # check response structure
@@ -137,7 +137,7 @@ class TestServiceDiscovery:
 
     def test_get_sd_config_haproxy(self):
         mgr = FakeMgr()
-        root = Root(mgr)
+        root = Root(mgr, 5000, '0.0.0.0')
         cfg = root.get_sd_config('haproxy')
 
         # check response structure
@@ -152,6 +152,6 @@ class TestServiceDiscovery:
 
     def test_get_sd_config_invalid_service(self):
         mgr = FakeMgr()
-        root = Root(mgr)
+        root = Root(mgr, 5000, '0.0.0.0')
         cfg = root.get_sd_config('invalid-service')
         assert cfg == []
index 72de7820621a6add9adc521f86bc644562c53411..0e69b0a12c190d146e841e886a5842efe779eaff 100644 (file)
@@ -885,6 +885,9 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
             return socket.gethostname()
         return ips[0]
 
+    def get_hostname(self) -> str:
+        return socket.gethostname()
+
     def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
         r = self._ceph_get_module_option(key, self.get_mgr_id())
         if r is None:
@@ -1805,6 +1808,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         assert self._mgr_ips is not None
         return self._mgr_ips
 
+    @API.expose
+    def get_hostname(self) -> str:
+        return socket.gethostname()
+
     @API.expose
     def get_ceph_option(self, key: str) -> OptionValue:
         return self._ceph_get_option(key)
index 3eedb985a7d8ad6d6fd6ad6b30116d0777e73cfc..ba938401175aab17c3220df0a19ee822a89dd82e 100644 (file)
@@ -51,6 +51,10 @@ UNDERLINE_SEQ = "\033[4m"
 logger = logging.getLogger(__name__)
 
 
+class PortAlreadyInUse(Exception):
+    pass
+
+
 class CephfsConnectionException(Exception):
     def __init__(self, error_code: int, error_message: str):
         self.errno = error_code
@@ -409,6 +413,22 @@ def format_bytes(n: int, width: int, colored: bool = False) -> str:
     return format_units(n, width, colored, decimal=False)
 
 
+def test_port_allocation(addr: str, port: int) -> None:
+    """Checks if the port is available
+    :raises PortAlreadyInUse: in case port is already in use
+    :raises Exception: any generic error other than port already in use
+    """
+    try:
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.bind((addr, port))
+        sock.close()
+    except socket.error as e:
+        if e.errno == errno.EADDRINUSE:
+            raise PortAlreadyInUse
+        else:
+            raise e
+
+
 def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
     """
     >>> merge_dicts({1:2}, {3:4})
@@ -492,7 +512,7 @@ def create_self_signed_cert(organisation: str = 'Ceph',
                             common_name: str = 'mgr',
                             dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
     """Returns self-signed PEM certificates valid for 10 years.
-    
+
     The optional dname parameter provides complete control of the cert/key
     creation by supporting all valid RDNs via a dictionary. However, if dname
     is not provided the default O and CN settings will be applied.