From aa057f19b751bdb8894b4af38c05afe534299a90 Mon Sep 17 00:00:00 2001 From: Adam King Date: Thu, 9 Dec 2021 13:58:14 -0500 Subject: [PATCH] mgr/cephadm: reconfig agents over http Fixes: https://tracker.ceph.com/issues/53570 Signed-off-by: Adam King --- src/cephadm/cephadm | 47 +++++++++++++---- src/pybind/mgr/cephadm/agent.py | 82 ++++++++++++++++++++++++----- src/pybind/mgr/cephadm/inventory.py | 5 ++ 3 files changed, 112 insertions(+), 22 deletions(-) diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index bfc7fd64624..5e2d958d667 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -3532,6 +3532,8 @@ class MgrListener(Thread): logger.error(err_str) else: conn.send(b'ACK') + if 'config' in data: + self.agent.wakeup() self.agent.ls_gatherer.wakeup() self.agent.volume_gatherer.wakeup() logger.debug(f'Got mgr message {data}') @@ -3543,6 +3545,15 @@ class MgrListener(Thread): def handle_json_payload(self, data: Dict[Any, Any]) -> None: self.agent.ack = int(data['counter']) + if 'config' in data: + logger.info('Received new config from mgr') + config = data['config'] + for filename in config: + if filename in self.agent.required_files: + with open(os.path.join(self.agent.daemon_dir, filename), 'w') as f: + f.write(config[filename]) + self.agent.pull_conf_settings() + self.agent.wakeup() class CephadmAgent(): @@ -3552,12 +3563,19 @@ class CephadmAgent(): loop_interval = 30 stop = False - required_files = ['agent.json', 'keyring'] + required_files = [ + 'agent.json', + 'keyring', + 'root_cert.pem', + 'listener.crt', + 'listener.key', + ] def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''): self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id + self.starting_port = 14873 self.target_ip = '' self.target_port = '' self.host = '' @@ -3578,15 +3596,23 @@ class CephadmAgent(): self.recent_iteration_index: int = 0 self.cached_ls_values: Dict[str, Dict[str, str]] = {} + def validate(self, config: Dict[str, str] = {}) -> None: + # check for the required files + for fname in self.required_files: + if fname not in config: + raise Error('required file missing from config: %s' % fname) + def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None: if not config: raise Error('Agent needs a config') assert isinstance(config, dict) + self.validate(config) # Create the required config files in the daemons dir, with restricted permissions for filename in config: - with open(os.path.join(self.daemon_dir, filename), 'w') as f: - f.write(config[filename]) + if filename in self.required_files: + with open(os.path.join(self.daemon_dir, filename), 'w') as f: + f.write(config[filename]) with open(os.path.join(self.daemon_dir, 'unit.run'), 'w') as f: f.write(self.unit_run()) @@ -3640,14 +3666,14 @@ WantedBy=ceph-{fsid}.target def wakeup(self) -> None: self.event.set() - def run(self) -> None: + def pull_conf_settings(self) -> None: try: with open(self.config_path, 'r') as f: config = json.load(f) self.target_ip = config['target_ip'] self.target_port = config['target_port'] self.loop_interval = int(config['refresh_period']) - starting_port = int(config['listener_port']) + self.starting_port = int(config['listener_port']) self.host = config['host'] use_lsm = config['device_enhanced_scan'] except Exception as e: @@ -3667,14 +3693,17 @@ WantedBy=ceph-{fsid}.target if use_lsm.lower() == 'true': self.device_enhanced_scan = True + def run(self) -> None: + self.pull_conf_settings() + try: for _ in range(1001): - if not port_in_use(self.ctx, starting_port): - self.listener_port = str(starting_port) + if not port_in_use(self.ctx, self.starting_port): + self.listener_port = str(self.starting_port) break - starting_port += 1 + self.starting_port += 1 if not self.listener_port: - raise Error(f'All 1000 ports starting at {str(starting_port - 1001)} taken.') + raise Error(f'All 1000 ports starting at {str(self.starting_port - 1001)} taken.') except Exception as e: raise Error(f'Failed to pick port for agent to listen on: {e}') diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py index 64ab95cc4ee..728e59c25ed 100644 --- a/src/pybind/mgr/cephadm/agent.py +++ b/src/pybind/mgr/cephadm/agent.py @@ -8,23 +8,23 @@ import tempfile import threading import time -# from orchestrator import OrchestratorError from mgr_util import verify_tls_files -from orchestrator import DaemonDescriptionStatus +from orchestrator import DaemonDescriptionStatus, OrchestratorError +from orchestrator._interface import daemon_type_to_service from ceph.utils import datetime_now from ceph.deployment.inventory import Devices from ceph.deployment.service_spec import ServiceSpec, PlacementSpec from cephadm.services.cephadmservice import CephadmDaemonDeploySpec from datetime import datetime, timedelta -from OpenSSL import crypto from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.backends import default_backend -from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING +from typing import Any, Dict, List, Set, Tuple, \ + TYPE_CHECKING, Optional if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator @@ -53,7 +53,17 @@ class CherryPyThread(threading.Thread): def run(self) -> None: try: - self.ssl_certs.generate_root_cert() + try: + old_creds = self.mgr.get_store('cephadm_endpoint_credentials') + if not old_creds: + raise OrchestratorError('No old credentials for cephadm endpoint found') + old_creds_dict = json.loads(old_creds) + old_key = old_creds_dict['key'] + old_cert = old_creds_dict['cert'] + self.ssl_certs.load_root_credentials(old_cert, old_key) + except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError): + self.ssl_certs.generate_root_cert() + cert, key = self.ssl_certs.generate_cert() self.key_tmp = tempfile.NamedTemporaryFile() @@ -82,6 +92,12 @@ class CherryPyThread(threading.Thread): self.mgr.log.debug('Starting cherrypy engine...') self.start_engine() self.mgr.log.debug('Cherrypy engine started.') + cephadm_endpoint_creds = { + 'cert': self.ssl_certs.get_root_cert(), + 'key': self.ssl_certs.get_root_key() + } + self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds)) + self.mgr._kick_serve_loop() # wait for the shutdown event self.cherrypy_shutdown_event.wait() self.cherrypy_shutdown_event.clear() @@ -338,7 +354,7 @@ class CephadmAgentHelpers: def __init__(self, mgr: "CephadmOrchestrator"): self.mgr: "CephadmOrchestrator" = mgr - def _request_agent_acks(self, hosts: Set[str], increment: bool = False) -> None: + def _request_agent_acks(self, hosts: Set[str], increment: bool = False, new_config: Optional[Dict[str, str]] = None) -> None: for host in hosts: if increment: self.mgr.cache.metadata_up_to_date[host] = False @@ -346,8 +362,11 @@ class CephadmAgentHelpers: self.mgr.cache.agent_counter[host] = 1 elif increment: self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1 + payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]} + if new_config: + payload['config'] = new_config message_thread = AgentMessageThread( - host, self.mgr.cache.agent_ports[host], {'counter': self.mgr.cache.agent_counter[host]}, self.mgr) + host, self.mgr.cache.agent_ports[host], payload, self.mgr) message_thread.start() def _agent_down(self, host: str) -> bool: @@ -450,8 +469,33 @@ class CephadmAgentHelpers: last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps( host, agent.name()) if not last_config or last_deps != deps: + # if root cert is the dep that changed, we must use ssh to reconfig + # so it's necessary to check this one specifically + root_cert_match = False + try: + root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert() + if last_deps and root_cert in last_deps: + root_cert_match = True + except Exception: + pass daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent) - self.mgr._daemon_action(daemon_spec, action='reconfig') + # we need to know the agent port to try to reconfig w/ http + # otherwise there is no choice but a full ssh reconfig + if host in self.mgr.cache.agent_ports and root_cert_match: + daemon_spec = self.mgr.cephadm_services[daemon_type_to_service( + daemon_spec.daemon_type)].prepare_create(daemon_spec) + self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now() + self.mgr.cache.agent_counter[daemon_spec.host] = 1 + self.mgr.agent_helpers._request_agent_acks( + hosts={daemon_spec.host}, + increment=True, + new_config=daemon_spec.final_config + ) + self.mgr.cache.update_daemon_config_deps( + daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now()) + self.mgr.cache.save_host(daemon_spec.host) + else: + self.mgr._daemon_action(daemon_spec, action='reconfig') return False except Exception as e: self.mgr.log.debug( @@ -473,7 +517,6 @@ class SSLCerts: self.mgr = mgr self.root_cert: Any self.root_key: Any - self.root_subj: Any def generate_root_cert(self) -> Tuple[str, str]: self.root_key = rsa.generate_private_key( @@ -508,7 +551,7 @@ class SSLCerts: private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend() ) - cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8') + cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') key_str = self.root_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, @@ -564,7 +607,7 @@ class SSLCerts: private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend() ) - cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8') + cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') key_str = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, @@ -575,12 +618,25 @@ class SSLCerts: def get_root_cert(self) -> str: try: - return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8') + return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') except AttributeError: return '' def get_root_key(self) -> str: try: - return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_key).decode('utf-8') + return self.root_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ).decode('utf-8') except AttributeError: return '' + + def load_root_credentials(self, cert: str, priv_key: str) -> None: + given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend()) + tz = given_cert.not_valid_after.tzinfo + if datetime.now(tz) >= given_cert.not_valid_after: + raise OrchestratorError('Given cert is expired') + self.root_cert = given_cert + self.root_key = serialization.load_pem_private_key( + data=priv_key.encode('utf-8'), backend=default_backend(), password=None) diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index 48e087997a7..805346bbe66 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -512,6 +512,9 @@ class HostCache(): self.agent_counter[host] = int(j.get('agent_counter', 1)) self.metadata_up_to_date[host] = False self.agent_keys[host] = str(j.get('agent_keys', '')) + agent_port = int(j.get('agent_ports', 0)) + if agent_port: + self.agent_ports[host] = agent_port self.mgr.log.debug( 'HostCache.load: host %s has %d daemons, ' @@ -706,6 +709,8 @@ class HostCache(): j['agent_counter'] = self.agent_counter[host] if host in self.agent_keys: j['agent_keys'] = self.agent_keys[host] + if host in self.agent_ports: + j['agent_ports'] = self.agent_ports[host] self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) -- 2.39.5