secureListenSocket = ssl_ctx.wrap_socket(listenSocket, server_side=True)
while not self.stop:
try:
- conn, _ = secureListenSocket.accept()
- except socket.timeout:
- continue
- try:
- length: int = int(conn.recv(10).decode())
- except Exception as e:
- err_str = f'Failed to extract length of payload from message: {e}'
- conn.send(err_str.encode())
- logger.error(err_str)
- while True:
- payload = conn.recv(length).decode()
- if not payload:
- break
try:
- data: Dict[Any, Any] = json.loads(payload)
- self.handle_json_payload(data)
+ conn, _ = secureListenSocket.accept()
+ except socket.timeout:
+ continue
+ try:
+ length: int = int(conn.recv(10).decode())
except Exception as e:
- err_str = f'Failed to extract json payload from message: {e}'
+ err_str = f'Failed to extract length of payload from message: {e}'
conn.send(err_str.encode())
logger.error(err_str)
- else:
- conn.send(b'ACK')
- self.agent.wakeup()
- logger.debug(f'Got mgr message {data}')
+ while True:
+ payload = conn.recv(length).decode()
+ if not payload:
+ break
+ try:
+ data: Dict[Any, Any] = json.loads(payload)
+ self.handle_json_payload(data)
+ except Exception as e:
+ err_str = f'Failed to extract json payload from message: {e}'
+ conn.send(err_str.encode())
+ logger.error(err_str)
+ else:
+ conn.send(b'ACK')
+ self.agent.wakeup()
+ logger.debug(f'Got mgr message {data}')
+ except Exception as e:
+ logger.error(f'Mgr Listener encountered exception: {e}')
def shutdown(self) -> None:
self.stop = True
self.mgr_listener.start()
ssl_ctx = ssl.create_default_context()
- ssl_ctx.verify_mode = ssl.CERT_REQUIRED
ssl_ctx.check_hostname = True
+ ssl_ctx.verify_mode = ssl.CERT_REQUIRED
ssl_ctx.load_verify_locations(self.ca_path)
while not self.stop:
import cherrypy
+import ipaddress
import json
import socket
import ssl
import threading
import time
+from orchestrator import OrchestratorError
from mgr_util import verify_tls_files
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+from datetime import datetime, timedelta
from OpenSSL import crypto
+from cryptography import x509
+from cryptography.x509.oid import NameOID
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.backends import default_backend
+
from typing import Any, Dict, Set, Tuple, TYPE_CHECKING
-from uuid import uuid4
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
self.host = host
+ self.addr = self.mgr.inventory.get_addr(host)
self.port = port
self.data: str = json.dumps(data)
super(AgentMessageThread, self).__init__(target=self.run)
for retry_wait in [3, 5]:
try:
agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- secure_agent_socket = ssl_ctx.wrap_socket(agent_socket, server_hostname=self.host)
- secure_agent_socket.connect((self.mgr.inventory.get_addr(self.host), self.port))
+ secure_agent_socket = ssl_ctx.wrap_socket(agent_socket, server_hostname=self.addr)
+ secure_agent_socket.connect((self.addr, self.port))
msg = (bytes_len + self.data)
secure_agent_socket.sendall(msg.encode('utf-8'))
agent_response = secure_agent_socket.recv(1024).decode()
self.root_subj: Any
def generate_root_cert(self) -> Tuple[str, str]:
- self.root_key = crypto.PKey()
- self.root_key.generate_key(crypto.TYPE_RSA, 2048)
-
- self.root_cert = crypto.X509()
- self.root_cert.set_serial_number(int(uuid4()))
-
- self.root_subj = self.root_cert.get_subject()
- self.root_subj.commonName = "cephadm-root"
+ self.root_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=4096, backend=default_backend())
+ root_public_key = self.root_key.public_key()
+
+ root_builder = x509.CertificateBuilder()
+
+ root_builder = root_builder.subject_name(x509.Name([
+ x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
+ ]))
+
+ root_builder = root_builder.issuer_name(x509.Name([
+ x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
+ ]))
+
+ root_builder = root_builder.not_valid_before(datetime.now())
+ root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
+ root_builder = root_builder.serial_number(x509.random_serial_number())
+ root_builder = root_builder.public_key(root_public_key)
+ root_builder = root_builder.add_extension(
+ x509.SubjectAlternativeName(
+ [x509.IPAddress(ipaddress.IPv4Address(str(self.mgr.get_mgr_ip())))]
+ ),
+ critical=False
+ )
+ root_builder = root_builder.add_extension(
+ x509.BasicConstraints(ca=True, path_length=None), critical=True,
+ )
- self.root_cert.set_issuer(self.root_subj)
- self.root_cert.set_pubkey(self.root_key)
- self.root_cert.gmtime_adj_notBefore(0)
- self.root_cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
- self.root_cert.sign(self.root_key, 'sha256')
+ self.root_cert = root_builder.sign(
+ private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
+ )
cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
- key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
+ key_str = self.root_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()
+ ).decode('utf-8')
return (cert_str, key_str)
- def generate_cert(self, name: str = '') -> Tuple[str, str]:
- key = crypto.PKey()
- key.generate_key(crypto.TYPE_RSA, 2048)
-
- cert = crypto.X509()
- cert.set_serial_number(int(uuid4()))
-
- subj = cert.get_subject()
- if not name:
- subj.commonName = str(self.mgr.get_mgr_ip())
- else:
- subj.commonName = name
+ def generate_cert(self, addr: str = '') -> Tuple[str, str]:
+ if addr:
+ try:
+ ipaddress.IPv4Address(addr)
+ except Exception:
+ raise OrchestratorError(
+ f'Address supplied to build cert ({addr}) is not valid IPv4 address')
+
+ private_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=4096, backend=default_backend())
+ public_key = private_key.public_key()
+
+ builder = x509.CertificateBuilder()
+
+ builder = builder.subject_name(x509.Name([
+ x509.NameAttribute(NameOID.COMMON_NAME, addr if addr else str(self.mgr.get_mgr_ip())),
+ ]))
+
+ builder = builder.issuer_name(x509.Name([
+ x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
+ ]))
+
+ builder = builder.not_valid_before(datetime.now())
+ builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
+ builder = builder.serial_number(x509.random_serial_number())
+ builder = builder.public_key(public_key)
+ builder = builder.add_extension(
+ x509.SubjectAlternativeName(
+ [x509.IPAddress(ipaddress.IPv4Address(
+ addr if addr else str(self.mgr.get_mgr_ip())))]
+ ),
+ critical=False
+ )
+ builder = builder.add_extension(
+ x509.BasicConstraints(ca=False, path_length=None), critical=True,
+ )
- cert.set_issuer(self.root_subj)
- cert.set_pubkey(key)
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
- cert.sign(self.root_key, 'sha256')
+ cert = builder.sign(
+ private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
+ )
cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')
- key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode('utf-8')
+ key_str = private_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()
+ ).decode('utf-8')
+
return (cert_str, key_str)
def get_root_cert(self) -> str: