from threading import Event
from functools import wraps
-from uuid import uuid4
-from OpenSSL import crypto
+from mgr_util import create_self_signed_cert
import string
try:
data_sources=data_sources,
)
- def create_self_signed_cert() -> Tuple[str, str]:
- # create a key pair
- pkey = crypto.PKey()
- pkey.generate_key(crypto.TYPE_RSA, 2048)
-
- # create a self-signed cert
- cert = crypto.X509()
- cert.get_subject().O = "Ceph"
- cert.get_subject().CN = "cephadm"
- cert.set_serial_number(int(uuid4()))
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
- cert.set_issuer(cert.get_subject())
- cert.set_pubkey(pkey)
- cert.sign(pkey, 'sha512')
-
- cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
- pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
-
- return str(cert.decode('utf-8')), str(pkey.decode('utf-8'))
-
prom_services = [ps.hostname for ps in self.cache.get_daemons_by_service('prometheus')]
- cert, pkey = create_self_signed_cert()
+ cert, pkey = create_self_signed_cert('Ceph', 'cephadm')
config_file = json.dumps({
'files': {
"grafana.ini": """# generated by cephadm
import tempfile
import threading
import time
-from uuid import uuid4
-from OpenSSL import crypto
from mgr_module import MgrModule, MgrStandbyModule, Option, CLIWriteCommand
-from mgr_util import get_default_addr, ServerConfigException, verify_tls_files
+from mgr_util import get_default_addr, ServerConfigException, verify_tls_files, \
+ create_self_signed_cert
try:
import cherrypy
.format(cmd['prefix']))
def create_self_signed_cert(self):
- # create a key pair
- pkey = crypto.PKey()
- pkey.generate_key(crypto.TYPE_RSA, 2048)
-
- # create a self-signed cert
- cert = crypto.X509()
- cert.get_subject().O = "IT"
- cert.get_subject().CN = "ceph-dashboard"
- cert.set_serial_number(int(uuid4()))
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(10*365*24*60*60)
- cert.set_issuer(cert.get_subject())
- cert.set_pubkey(pkey)
- cert.sign(pkey, 'sha512')
-
- cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
- self.set_store('crt', cert.decode('utf-8'))
-
- pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
- self.set_store('key', pkey.decode('utf-8'))
+ cert, pkey = create_self_signed_cert('IT', 'ceph-dashboard')
+ self.set_store('crt', cert)
+ self.set_store('key', pkey)
def notify(self, notify_type, notify_id):
NotificationQueue.new_notification(notify_type, notify_id)
import socket
import logging
+try:
+ from typing import Tuple
+except ImportError:
+ TYPE_CHECKING = False # just for type checking
+
(
BLACK,
RED,
class ServerConfigException(Exception):
pass
+
+def create_self_signed_cert(organisation='Ceph', common_name='mgr') -> Tuple[str, str]:
+ """Returns self-signed PEM certificates valid for 10 years.
+ :return cert, pkey
+ """
+
+ from OpenSSL import crypto
+ from uuid import uuid4
+
+ # create a key pair
+ pkey = crypto.PKey()
+ pkey.generate_key(crypto.TYPE_RSA, 2048)
+
+ # create a self-signed cert
+ cert = crypto.X509()
+ cert.get_subject().O = organisation
+ cert.get_subject().CN = common_name
+ cert.set_serial_number(int(uuid4()))
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(pkey)
+ cert.sign(pkey, 'sha512')
+
+ cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+ pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
+
+ return cert.decode('utf-8'), pkey.decode('utf-8')
+
+
def verify_cacrt(cert_fname):
# type: (str) -> None
"""Basic validation of a ca cert"""