import threading
import time
-
-from mgr_util import verify_tls_files
-from orchestrator import DaemonDescriptionStatus, OrchestratorError
+from orchestrator import DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.ssl_cert_utils import SSLCerts
+from mgr_util import test_port_allocation, PortAlreadyInUse
from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
self.ssl_certs = SSLCerts()
self.server_port = 7150
self.server_addr = self.mgr.get_mgr_ip()
- self.host_data: Server = None
def configure_routes(self) -> None:
-
- self.host_data = HostData(self.mgr,
- self.server_port,
- self.server_addr,
- self.cert_file.name,
- self.key_file.name)
-
- # configure routes
d = cherrypy.dispatch.RoutesDispatcher()
- d.connect(name='host-data', route='/',
+ d.connect(name='host-data', route='/data',
controller=self.host_data.POST,
conditions=dict(method=['POST']))
+ cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': d}})
- cherrypy.tree.mount(None, '/data', config={'/': {'request.dispatch': d}})
-
- def configure_tls(self) -> None:
- try:
- old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
- old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
- if not old_key or not old_cert:
- raise OrchestratorError('No old credentials for agent found')
+ def configure_tls(self, server: Server) -> None:
+ old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
+ old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
+ if old_cert and old_key:
self.ssl_certs.load_root_credentials(old_cert, old_key)
- except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
+ else:
self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert())
self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key())
- cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
- self.key_file = tempfile.NamedTemporaryFile()
- self.key_file.write(key.encode('utf-8'))
- self.key_file.flush() # pkey_tmp must not be gc'ed
- self.cert_file = tempfile.NamedTemporaryFile()
- self.cert_file.write(cert.encode('utf-8'))
- self.cert_file.flush() # cert_tmp must not be gc'ed
- verify_tls_files(self.cert_file.name, self.key_file.name)
+ host = self.mgr.get_hostname()
+ addr = self.mgr.get_mgr_ip()
+ server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(host, addr)
def find_free_port(self) -> None:
max_port = self.server_port + 150
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while self.server_port <= max_port:
try:
- sock.bind((self.server_addr, self.server_port))
- sock.close()
+ test_port_allocation(self.server_addr, self.server_port)
self.host_data.socket_port = self.server_port
self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}')
return
- except OSError:
+ except PortAlreadyInUse:
self.server_port += 1
- self.mgr.log.error(
- 'Cephadm agent endpoint could not find free port in range 7150-7300 and failed to start')
+ self.mgr.log.error(f'Cephadm agent could not find free port in range {max_port - 150}-{max_port} and failed to start')
def configure(self) -> None:
- self.configure_tls()
+ self.host_data = HostData(self.mgr, self.server_port, self.server_addr)
+ self.configure_tls(self.host_data)
self.configure_routes()
self.find_free_port()
class HostData(Server):
exposed = True
- def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str, ssl_ca_cert: str, ssl_priv_key: str):
+ def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
self.mgr = mgr
super().__init__()
self.socket_port = port
- self.ssl_certificate = ssl_ca_cert
- self.ssl_private_key = ssl_priv_key
- self._socket_host = host
+ self.socket_host = host
self.subscribe()
+ def stop(self) -> None:
+ # we must call unsubscribe before stopping the server,
+ # otherwise the port is not released and we will get
+ # an exception when trying to restart it
+ self.unsubscribe()
+ super().stop()
+
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
def POST(self) -> Dict[str, Any]:
root_cert_tmp.flush()
root_cert_fname = root_cert_tmp.name
- cert, key = self.agent.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
+ cert, key = self.agent.ssl_certs.generate_cert(
+ self.mgr.get_hostname(), self.mgr.get_mgr_ip())
cert_tmp = tempfile.NamedTemporaryFile()
cert_tmp.write(cert.encode('utf-8'))
from cephadm.agent import AgentEndpoint
from cephadm.service_discovery import ServiceDiscovery
-
+from mgr_util import test_port_allocation, PortAlreadyInUse
+from orchestrator import OrchestratorError
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
self.agent = AgentEndpoint(mgr)
self.service_discovery = ServiceDiscovery(mgr)
self.cherrypy_shutdown_event = threading.Event()
+ self._service_discovery_port = self.mgr.service_discovery_port
super().__init__(target=self.run)
def configure_cherrypy(self) -> None:
'engine.autoreload.on': False,
})
- def run(self) -> None:
+ def configure(self) -> None:
+ self.configure_cherrypy()
+ self.agent.configure()
+ self.service_discovery.configure(self.mgr.service_discovery_port, self.mgr.get_mgr_ip())
+
+ def config_update(self) -> None:
+ self.service_discovery_port = self.mgr.service_discovery_port
+
+ @property
+ def service_discovery_port(self) -> int:
+ return self._service_discovery_port
+
+ @service_discovery_port.setter
+ def service_discovery_port(self, value: int) -> None:
+ if self._service_discovery_port == value:
+ return
+
try:
- self.configure_cherrypy()
- self.agent.configure()
- self.service_discovery.configure()
+ test_port_allocation(self.mgr.get_mgr_ip(), value)
+ except PortAlreadyInUse:
+ raise OrchestratorError(f'Service discovery port {value} is already in use. Listening on old port {self._service_discovery_port}.')
+ except Exception as e:
+ raise OrchestratorError(f'Cannot check service discovery port ip:{self.mgr.get_mgr_ip()} port:{value} error:{e}')
+
+ self.mgr.log.info(f'Changing service discovery port from {self._service_discovery_port} to {value}...')
+ self._service_discovery_port = value
+ self.restart()
+
+ def restart(self) -> None:
+ cherrypy.engine.stop()
+ cherrypy.server.httpserver = None
+ self.configure()
+ cherrypy.engine.start()
+ def run(self) -> None:
+ try:
self.mgr.log.debug('Starting cherrypy engine...')
+ self.configure()
cherrypy.server.unsubscribe() # disable default server
cherrypy.engine.start()
self.mgr.log.debug('Cherrypy engine started.')
-
self.mgr._kick_serve_loop()
# wait for the shutdown event
self.cherrypy_shutdown_event.wait()
# this way we force a redeploy after a mgr failover
deps.append(self.get_active_mgr().name())
deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
- deps += [s for s in ['node-exporter', 'alertmanager', 'ingress']
- if self.cache.get_daemons_by_service(s)]
+ deps.append(str(self.service_discovery_port))
+ deps += [s for s in ['node-exporter', 'alertmanager', 'ingress'] if self.cache.get_daemons_by_service(s)]
else:
need = {
'grafana': ['prometheus', 'loki'],
refresh(self.mgr.cache.get_hosts())
self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
+ self.mgr.http_server.config_update()
self.mgr.config_checker.run_checks()
pass
import logging
-import tempfile
-from typing import Dict, List, TYPE_CHECKING, cast, Collection
-
-from orchestrator import OrchestratorError
+import orchestrator # noqa
from mgr_module import ServiceInfoT
-from mgr_util import verify_tls_files, build_url
+from mgr_util import build_url
+from typing import Dict, List, TYPE_CHECKING, cast, Collection, Callable, NamedTuple
+from cephadm.services.monitoring import AlertmanagerService, NodeExporterService, PrometheusService
+
from cephadm.services.ingress import IngressSpec
from cephadm.ssl_cert_utils import SSLCerts
cherrypy.log.access_log.propagate = False
+class Route(NamedTuple):
+ name: str
+ route: str
+ controller: Callable
+
+
class ServiceDiscovery:
KV_STORE_SD_ROOT_CERT = 'service_discovery/root/cert'
def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
self.ssl_certs = SSLCerts()
- self.server_port = self.mgr.service_discovery_port
- self.server_addr = '::'
- def configure_routes(self) -> None:
-
- root_server = Root(self.mgr,
- self.server_port,
- self.server_addr,
- self.cert_file.name,
- self.key_file.name)
-
- # configure routes
+ def configure_routes(self, server: Server) -> None:
+ ROUTES = [
+ Route('index', '/', server.index),
+ Route('sd-config', '/prometheus/sd-config', server.get_sd_config),
+ Route('rules', '/prometheus/rules', server.get_prometheus_rules),
+ ]
d = cherrypy.dispatch.RoutesDispatcher()
- d.connect(name='index', route='/', controller=root_server.index)
- d.connect(name='index', route='/sd', controller=root_server.index)
- d.connect(name='index', route='/sd/', controller=root_server.index)
- d.connect(name='sd-config', route='/sd/prometheus/sd-config',
- controller=root_server.get_sd_config)
- d.connect(name='rules', route='/sd/prometheus/rules',
- controller=root_server.get_prometheus_rules)
- cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': d}})
-
- def configure_tls(self) -> None:
- try:
- old_cert = self.mgr.get_store(self.KV_STORE_SD_ROOT_CERT)
- old_key = self.mgr.get_store(self.KV_STORE_SD_ROOT_KEY)
- if not old_key or not old_cert:
- raise OrchestratorError('No old credentials for service discovery found')
+ for route in ROUTES:
+ d.connect(**route._asdict())
+ conf = {'/': {'request.dispatch': d}}
+ cherrypy.tree.mount(None, '/sd', config=conf)
+
+ def configure_tls(self, server: Server) -> None:
+ old_cert = self.mgr.get_store(self.KV_STORE_SD_ROOT_CERT)
+ old_key = self.mgr.get_store(self.KV_STORE_SD_ROOT_KEY)
+ if old_key and old_cert:
self.ssl_certs.load_root_credentials(old_cert, old_key)
- except (OrchestratorError, KeyError, ValueError):
+ else:
self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
self.mgr.set_store(self.KV_STORE_SD_ROOT_CERT, self.ssl_certs.get_root_cert())
self.mgr.set_store(self.KV_STORE_SD_ROOT_KEY, self.ssl_certs.get_root_key())
- cert, key = self.ssl_certs.generate_cert(self.mgr.get_mgr_ip())
- self.key_file = tempfile.NamedTemporaryFile()
- self.key_file.write(key.encode('utf-8'))
- self.key_file.flush() # pkey_tmp must not be gc'ed
- self.cert_file = tempfile.NamedTemporaryFile()
- self.cert_file.write(cert.encode('utf-8'))
- self.cert_file.flush() # cert_tmp must not be gc'ed
- verify_tls_files(self.cert_file.name, self.key_file.name)
+ host = self.mgr.get_hostname()
+ addr = self.mgr.get_mgr_ip()
+ server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(host, addr)
- def configure(self) -> None:
- self.configure_tls()
- self.configure_routes()
+ def configure(self, port: int, addr: str) -> None:
+ # we create a new server to enforce TLS/SSL config refresh
+ self.root_server = Root(self.mgr, port, addr)
+ self.configure_tls(self.root_server)
+ self.configure_routes(self.root_server)
class Root(Server):
cherrypy.request.path = ''
return self
- def __init__(self, mgr: "CephadmOrchestrator",
- port: int = 0,
- host: str = '',
- ssl_ca_cert: str = '',
- ssl_priv_key: str = ''):
+ def stop(self) -> None:
+ # we must call unsubscribe before stopping the server,
+ # otherwise the port is not released and we will get
+ # an exception when trying to restart it
+ self.unsubscribe()
+ super().stop()
+
+ def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
self.mgr = mgr
super().__init__()
self.socket_port = port
- self._socket_host = host
- self.ssl_certificate = ssl_ca_cert
- self.ssl_private_key = ssl_priv_key
+ self.socket_host = host
self.subscribe()
@cherrypy.expose
for service in cast(List[ServiceInfoT], server.get('services', [])):
if service['type'] != 'mgr':
continue
- port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
+ port = self.mgr.get_module_option_ex(
+ 'prometheus', 'server_port', PrometheusService.DEFAULT_MGR_PROMETHEUS_PORT)
targets.append(f'{hostname}:{port}')
return [{"targets": targets, "labels": {}}]
for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
assert dd.hostname is not None
addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
- port = dd.ports[0] if dd.ports else 9093
+ port = dd.ports[0] if dd.ports else AlertmanagerService.DEFAULT_SERVICE_PORT
srv_entries.append('{}'.format(build_url(host=addr, port=port).lstrip('/')))
return [{"targets": srv_entries, "labels": {}}]
for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
assert dd.hostname is not None
addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
- port = dd.ports[0] if dd.ports else 9100
+ port = dd.ports[0] if dd.ports else NodeExporterService.DEFAULT_SERVICE_PORT
srv_entries.append({
'targets': [build_url(host=addr, port=port).lstrip('/')],
'labels': {'instance': dd.hostname}
'host': daemon_spec.host,
'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
- listener_cert, listener_key = agent.ssl_certs.generate_cert(
- self.mgr.inventory.get_addr(daemon_spec.host))
+ listener_cert, listener_key = agent.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
config = {
'agent.json': json.dumps(cfg),
'keyring': daemon_spec.keyring,
port = cast(int, self.mgr.get_module_option_ex(
'prometheus', 'server_port', self.DEFAULT_MGR_PROMETHEUS_PORT))
deps.append(str(port))
+ deps.append(str(self.mgr.service_discovery_port))
# add an explicit dependency on the active manager. This will force to
# re-deploy prometheus if the mgr has changed (due to a fail-over i.e).
deps.append(self.mgr.get_active_mgr().name())
class NodeExporterService(CephadmService):
TYPE = 'node-exporter'
+ DEFAULT_SERVICE_PORT = 9100
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
-from typing import Any, Tuple
+from typing import Any, Tuple, IO
import ipaddress
+import tempfile
+import os
+import logging
from datetime import datetime, timedelta
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
+from OpenSSL import crypto, SSL
from orchestrator import OrchestratorError
+logger = logging.getLogger(__name__)
+
+
+class SSLConfigException(Exception):
+ pass
+
+
class SSLCerts:
def __init__(self) -> None:
self.root_cert: Any
self.root_key: Any
+ self.key_file: IO[bytes]
+ self.cert_file: IO[bytes]
- def generate_root_cert(self, host: str) -> Tuple[str, str]:
+ def generate_root_cert(self, addr: str) -> Tuple[str, str]:
self.root_key = rsa.generate_private_key(
public_exponent=65537, key_size=4096, backend=default_backend())
root_public_key = self.root_key.public_key()
root_builder = root_builder.public_key(root_public_key)
root_builder = root_builder.add_extension(
x509.SubjectAlternativeName(
- [x509.IPAddress(ipaddress.IPv4Address(host))]
+ [x509.IPAddress(ipaddress.IPv4Address(addr))]
),
critical=False
)
return (cert_str, key_str)
- def generate_cert(self, addr: str) -> Tuple[str, str]:
+ def generate_cert(self, host: str, addr: str) -> Tuple[str, str]:
have_ip = True
try:
ip = x509.IPAddress(ipaddress.IPv4Address(addr))
if have_ip:
builder = builder.add_extension(
x509.SubjectAlternativeName(
- [ip]
+ [ip, x509.DNSName(host)]
+ ),
+ critical=False
+ )
+ else:
+ builder = builder.add_extension(
+ x509.SubjectAlternativeName(
+ [x509.DNSName(host)]
),
critical=False
)
return (cert_str, key_str)
+ def generate_cert_files(self, host: str, addr: str) -> Tuple[str, str]:
+ cert, key = self.generate_cert(host, addr)
+
+ self.cert_file = tempfile.NamedTemporaryFile()
+ self.cert_file.write(cert.encode('utf-8'))
+ self.cert_file.flush() # cert_tmp must not be gc'ed
+
+ self.key_file = tempfile.NamedTemporaryFile()
+ self.key_file.write(key.encode('utf-8'))
+ self.key_file.flush() # pkey_tmp must not be gc'ed
+
+ verify_tls_files(self.cert_file.name, self.key_file.name)
+ return self.cert_file.name, self.key_file.name
+
def get_root_cert(self) -> str:
try:
return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
self.root_cert = given_cert
self.root_key = serialization.load_pem_private_key(
data=priv_key.encode('utf-8'), backend=default_backend(), password=None)
+
+
+def verify_tls(crt, key):
+ # type: (str, str) -> None
+ verify_cacrt_content(crt)
+
+ try:
+ _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
+ _key.check()
+ except (ValueError, crypto.Error) as e:
+ raise SSLConfigException('Invalid private key: {}'.format(str(e)))
+ try:
+ _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+ except ValueError as e:
+ raise SSLConfigException('Invalid certificate key: {}'.format(str(e))
+ )
+
+ try:
+ context = SSL.Context(SSL.TLSv1_METHOD)
+ context.use_certificate(_crt)
+ context.use_privatekey(_key)
+ context.check_privatekey()
+ except crypto.Error as e:
+ logger.warning(f'Private key and certificate do not match up: {e}')
+
+
+def verify_tls_files(cert_fname, pkey_fname):
+ # type: (str, str) -> None
+ """Basic checks for TLS certificate and key files
+
+ Do some validations to the private key and certificate:
+ - Check the type and format
+ - Check the certificate expiration date
+ - Check the consistency of the private key
+ - Check that the private key and certificate match up
+
+ :param cert_fname: Name of the certificate file
+ :param pkey_fname: name of the certificate public key file
+
+ :raises SSLConfigException: An error with a message
+
+ """
+ if not cert_fname or not pkey_fname:
+ raise SSLConfigException('no certificate configured')
+
+ verify_cacrt(cert_fname)
+
+ if not os.path.isfile(pkey_fname):
+ raise SSLConfigException('private key %s does not exist' % pkey_fname)
+
+ try:
+ with open(pkey_fname) as f:
+ pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
+ pkey.check()
+ except (ValueError, crypto.Error) as e:
+ raise SSLConfigException(
+ 'Invalid private key {}: {}'.format(pkey_fname, str(e)))
+ try:
+ context = SSL.Context(SSL.TLSv1_METHOD)
+ context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
+ context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
+ context.check_privatekey()
+ except crypto.Error as e:
+ logger.warning(
+ f'Private key {pkey_fname} and certificate {cert_fname} do not match up: {e}')
+
+
+def verify_cacrt(cert_fname):
+ # type: (str) -> None
+ """Basic validation of a ca cert"""
+
+ if not cert_fname:
+ raise SSLConfigException("CA cert not configured")
+ if not os.path.isfile(cert_fname):
+ raise SSLConfigException("Certificate {} does not exist".format(cert_fname))
+
+ try:
+ with open(cert_fname) as f:
+ verify_cacrt_content(f.read())
+ except ValueError as e:
+ raise SSLConfigException(
+ 'Invalid certificate {}: {}'.format(cert_fname, str(e)))
+
+
+def verify_cacrt_content(crt):
+ # type: (str) -> None
+ try:
+ x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+ if x509.has_expired():
+ logger.warning(f'Certificate has expired: {crt}')
+ except (ValueError, crypto.Error) as e:
+ raise SSLConfigException(f'Invalid certificate: {e}')
def test_get_sd_config_prometheus(self):
mgr = FakeMgr()
- root = Root(mgr)
+ root = Root(mgr, 5000, '0.0.0.0')
cfg = root.get_sd_config('mgr-prometheus')
# check response structure
def test_get_sd_config_node_exporter(self):
mgr = FakeMgr()
- root = Root(mgr)
+ root = Root(mgr, 5000, '0.0.0.0')
cfg = root.get_sd_config('node-exporter')
# check response structure
def test_get_sd_config_alertmgr(self):
mgr = FakeMgr()
- root = Root(mgr)
+ root = Root(mgr, 5000, '0.0.0.0')
cfg = root.get_sd_config('alertmanager')
# check response structure
def test_get_sd_config_haproxy(self):
mgr = FakeMgr()
- root = Root(mgr)
+ root = Root(mgr, 5000, '0.0.0.0')
cfg = root.get_sd_config('haproxy')
# check response structure
def test_get_sd_config_invalid_service(self):
mgr = FakeMgr()
- root = Root(mgr)
+ root = Root(mgr, 5000, '0.0.0.0')
cfg = root.get_sd_config('invalid-service')
assert cfg == []
return socket.gethostname()
return ips[0]
+ def get_hostname(self) -> str:
+ return socket.gethostname()
+
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
r = self._ceph_get_module_option(key, self.get_mgr_id())
if r is None:
assert self._mgr_ips is not None
return self._mgr_ips
+ @API.expose
+ def get_hostname(self) -> str:
+ return socket.gethostname()
+
@API.expose
def get_ceph_option(self, key: str) -> OptionValue:
return self._ceph_get_option(key)
logger = logging.getLogger(__name__)
+class PortAlreadyInUse(Exception):
+ pass
+
+
class CephfsConnectionException(Exception):
def __init__(self, error_code: int, error_message: str):
self.errno = error_code
return format_units(n, width, colored, decimal=False)
+def test_port_allocation(addr: str, port: int) -> None:
+ """Checks if the port is available
+ :raises PortAlreadyInUse: in case port is already in use
+ :raises Exception: any generic error other than port already in use
+ """
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind((addr, port))
+ sock.close()
+ except socket.error as e:
+ if e.errno == errno.EADDRINUSE:
+ raise PortAlreadyInUse
+ else:
+ raise e
+
+
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
"""
>>> merge_dicts({1:2}, {3:4})
common_name: str = 'mgr',
dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
"""Returns self-signed PEM certificates valid for 10 years.
-
+
The optional dname parameter provides complete control of the cert/key
creation by supporting all valid RDNs via a dictionary. However, if dname
is not provided the default O and CN settings will be applied.