git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: reconfig agents over http
author Adam King <adking@redhat.com>
Thu, 9 Dec 2021 18:58:14 +0000 (13:58 -0500)
committer Adam King <adking@redhat.com>
Thu, 16 Dec 2021 12:33:50 +0000 (07:33 -0500)
Fixes: https://tracker.ceph.com/issues/53570
Signed-off-by: Adam King <adking@redhat.com>
src/cephadm/cephadm
src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/inventory.py

index bfc7fd64624f5568325b4ee40223143b44332afd..5e2d958d667483c7886a2034b3a6651a3026ae47 100755 (executable)
@@ -3532,6 +3532,8 @@ class MgrListener(Thread):
                         logger.error(err_str)
                     else:
                         conn.send(b'ACK')
+                        if 'config' in data:
+                            self.agent.wakeup()
                         self.agent.ls_gatherer.wakeup()
                         self.agent.volume_gatherer.wakeup()
                         logger.debug(f'Got mgr message {data}')
@@ -3543,6 +3545,15 @@ class MgrListener(Thread):
 
     def handle_json_payload(self, data: Dict[Any, Any]) -> None:
         self.agent.ack = int(data['counter'])
+        if 'config' in data:
+            logger.info('Received new config from mgr')
+            config = data['config']
+            for filename in config:
+                if filename in self.agent.required_files:
+                    with open(os.path.join(self.agent.daemon_dir, filename), 'w') as f:
+                        f.write(config[filename])
+            self.agent.pull_conf_settings()
+            self.agent.wakeup()
 
 
 class CephadmAgent():
@@ -3552,12 +3563,19 @@ class CephadmAgent():
     loop_interval = 30
     stop = False
 
-    required_files = ['agent.json', 'keyring']
+    required_files = [
+        'agent.json',
+        'keyring',
+        'root_cert.pem',
+        'listener.crt',
+        'listener.key',
+    ]
 
     def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''):
         self.ctx = ctx
         self.fsid = fsid
         self.daemon_id = daemon_id
+        self.starting_port = 14873
         self.target_ip = ''
         self.target_port = ''
         self.host = ''
@@ -3578,15 +3596,23 @@ class CephadmAgent():
         self.recent_iteration_index: int = 0
         self.cached_ls_values: Dict[str, Dict[str, str]] = {}
 
+    def validate(self, config: Dict[str, str] = {}) -> None:
+        # check for the required files
+        for fname in self.required_files:
+            if fname not in config:
+                raise Error('required file missing from config: %s' % fname)
+
     def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
         if not config:
             raise Error('Agent needs a config')
         assert isinstance(config, dict)
+        self.validate(config)
 
         # Create the required config files in the daemons dir, with restricted permissions
         for filename in config:
-            with open(os.path.join(self.daemon_dir, filename), 'w') as f:
-                f.write(config[filename])
+            if filename in self.required_files:
+                with open(os.path.join(self.daemon_dir, filename), 'w') as f:
+                    f.write(config[filename])
 
         with open(os.path.join(self.daemon_dir, 'unit.run'), 'w') as f:
             f.write(self.unit_run())
@@ -3640,14 +3666,14 @@ WantedBy=ceph-{fsid}.target
     def wakeup(self) -> None:
         self.event.set()
 
-    def run(self) -> None:
+    def pull_conf_settings(self) -> None:
         try:
             with open(self.config_path, 'r') as f:
                 config = json.load(f)
                 self.target_ip = config['target_ip']
                 self.target_port = config['target_port']
                 self.loop_interval = int(config['refresh_period'])
-                starting_port = int(config['listener_port'])
+                self.starting_port = int(config['listener_port'])
                 self.host = config['host']
                 use_lsm = config['device_enhanced_scan']
         except Exception as e:
@@ -3667,14 +3693,17 @@ WantedBy=ceph-{fsid}.target
         if use_lsm.lower() == 'true':
             self.device_enhanced_scan = True
 
+    def run(self) -> None:
+        self.pull_conf_settings()
+
         try:
             for _ in range(1001):
-                if not port_in_use(self.ctx, starting_port):
-                    self.listener_port = str(starting_port)
+                if not port_in_use(self.ctx, self.starting_port):
+                    self.listener_port = str(self.starting_port)
                     break
-                starting_port += 1
+                self.starting_port += 1
             if not self.listener_port:
-                raise Error(f'All 1000 ports starting at {str(starting_port - 1001)} taken.')
+                raise Error(f'All 1000 ports starting at {str(self.starting_port - 1001)} taken.')
         except Exception as e:
             raise Error(f'Failed to pick port for agent to listen on: {e}')
 
index 64ab95cc4ee19fe617c9ecb6b6fe42d9cfaf975c..728e59c25ed846ede6d43e28d0f7d72727effae2 100644 (file)
@@ -8,23 +8,23 @@ import tempfile
 import threading
 import time
 
-# from orchestrator import OrchestratorError
 from mgr_util import verify_tls_files
-from orchestrator import DaemonDescriptionStatus
+from orchestrator import DaemonDescriptionStatus, OrchestratorError
+from orchestrator._interface import daemon_type_to_service
 from ceph.utils import datetime_now
 from ceph.deployment.inventory import Devices
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 
 from datetime import datetime, timedelta
-from OpenSSL import crypto
 from cryptography import x509
 from cryptography.x509.oid import NameOID
 from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.primitives import hashes, serialization
 from cryptography.hazmat.backends import default_backend
 
-from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING
+from typing import Any, Dict, List, Set, Tuple, \
+    TYPE_CHECKING, Optional
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -53,7 +53,17 @@ class CherryPyThread(threading.Thread):
 
     def run(self) -> None:
         try:
-            self.ssl_certs.generate_root_cert()
+            try:
+                old_creds = self.mgr.get_store('cephadm_endpoint_credentials')
+                if not old_creds:
+                    raise OrchestratorError('No old credentials for cephadm endpoint found')
+                old_creds_dict = json.loads(old_creds)
+                old_key = old_creds_dict['key']
+                old_cert = old_creds_dict['cert']
+                self.ssl_certs.load_root_credentials(old_cert, old_key)
+            except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
+                self.ssl_certs.generate_root_cert()
+
             cert, key = self.ssl_certs.generate_cert()
 
             self.key_tmp = tempfile.NamedTemporaryFile()
@@ -82,6 +92,12 @@ class CherryPyThread(threading.Thread):
             self.mgr.log.debug('Starting cherrypy engine...')
             self.start_engine()
             self.mgr.log.debug('Cherrypy engine started.')
+            cephadm_endpoint_creds = {
+                'cert': self.ssl_certs.get_root_cert(),
+                'key': self.ssl_certs.get_root_key()
+            }
+            self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds))
+            self.mgr._kick_serve_loop()
             # wait for the shutdown event
             self.cherrypy_shutdown_event.wait()
             self.cherrypy_shutdown_event.clear()
@@ -338,7 +354,7 @@ class CephadmAgentHelpers:
     def __init__(self, mgr: "CephadmOrchestrator"):
         self.mgr: "CephadmOrchestrator" = mgr
 
-    def _request_agent_acks(self, hosts: Set[str], increment: bool = False) -> None:
+    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, new_config: Optional[Dict[str, str]] = None) -> None:
         for host in hosts:
             if increment:
                 self.mgr.cache.metadata_up_to_date[host] = False
@@ -346,8 +362,11 @@ class CephadmAgentHelpers:
                 self.mgr.cache.agent_counter[host] = 1
             elif increment:
                 self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
+            payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]}
+            if new_config:
+                payload['config'] = new_config
             message_thread = AgentMessageThread(
-                host, self.mgr.cache.agent_ports[host], {'counter': self.mgr.cache.agent_counter[host]}, self.mgr)
+                host, self.mgr.cache.agent_ports[host], payload, self.mgr)
             message_thread.start()
 
     def _agent_down(self, host: str) -> bool:
@@ -450,8 +469,33 @@ class CephadmAgentHelpers:
                 last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                     host, agent.name())
                 if not last_config or last_deps != deps:
+                    # if root cert is the dep that changed, we must use ssh to reconfig
+                    # so it's necessary to check this one specifically
+                    root_cert_match = False
+                    try:
+                        root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+                        if last_deps and root_cert in last_deps:
+                            root_cert_match = True
+                    except Exception:
+                        pass
                     daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
-                    self.mgr._daemon_action(daemon_spec, action='reconfig')
+                    # we need to know the agent port to try to reconfig w/ http
+                    # otherwise there is no choice but a full ssh reconfig
+                    if host in self.mgr.cache.agent_ports and root_cert_match:
+                        daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
+                            daemon_spec.daemon_type)].prepare_create(daemon_spec)
+                        self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
+                        self.mgr.cache.agent_counter[daemon_spec.host] = 1
+                        self.mgr.agent_helpers._request_agent_acks(
+                            hosts={daemon_spec.host},
+                            increment=True,
+                            new_config=daemon_spec.final_config
+                        )
+                        self.mgr.cache.update_daemon_config_deps(
+                            daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
+                        self.mgr.cache.save_host(daemon_spec.host)
+                    else:
+                        self.mgr._daemon_action(daemon_spec, action='reconfig')
                     return False
             except Exception as e:
                 self.mgr.log.debug(
@@ -473,7 +517,6 @@ class SSLCerts:
         self.mgr = mgr
         self.root_cert: Any
         self.root_key: Any
-        self.root_subj: Any
 
     def generate_root_cert(self) -> Tuple[str, str]:
         self.root_key = rsa.generate_private_key(
@@ -508,7 +551,7 @@ class SSLCerts:
             private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
         )
 
-        cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+        cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
         key_str = self.root_key.private_bytes(
             encoding=serialization.Encoding.PEM,
             format=serialization.PrivateFormat.TraditionalOpenSSL,
@@ -564,7 +607,7 @@ class SSLCerts:
             private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
         )
 
-        cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')
+        cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
         key_str = private_key.private_bytes(
             encoding=serialization.Encoding.PEM,
             format=serialization.PrivateFormat.TraditionalOpenSSL,
@@ -575,12 +618,25 @@ class SSLCerts:
 
     def get_root_cert(self) -> str:
         try:
-            return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+            return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
         except AttributeError:
             return ''
 
     def get_root_key(self) -> str:
         try:
-            return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
+            return self.root_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.TraditionalOpenSSL,
+                encryption_algorithm=serialization.NoEncryption(),
+            ).decode('utf-8')
         except AttributeError:
             return ''
+
+    def load_root_credentials(self, cert: str, priv_key: str) -> None:
+        given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())
+        tz = given_cert.not_valid_after.tzinfo
+        if datetime.now(tz) >= given_cert.not_valid_after:
+            raise OrchestratorError('Given cert is expired')
+        self.root_cert = given_cert
+        self.root_key = serialization.load_pem_private_key(
+            data=priv_key.encode('utf-8'), backend=default_backend(), password=None)
index 48e087997a77dde8895c3cba8b3fdc3473dbd511..805346bbe6665c6cb6bebd36384c5e7313410053 100644 (file)
@@ -512,6 +512,9 @@ class HostCache():
                 self.agent_counter[host] = int(j.get('agent_counter', 1))
                 self.metadata_up_to_date[host] = False
                 self.agent_keys[host] = str(j.get('agent_keys', ''))
+                agent_port = int(j.get('agent_ports', 0))
+                if agent_port:
+                    self.agent_ports[host] = agent_port
 
                 self.mgr.log.debug(
                     'HostCache.load: host %s has %d daemons, '
@@ -706,6 +709,8 @@ class HostCache():
             j['agent_counter'] = self.agent_counter[host]
         if host in self.agent_keys:
             j['agent_keys'] = self.agent_keys[host]
+        if host in self.agent_ports:
+            j['agent_ports'] = self.agent_ports[host]
 
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))