]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: cephadm agent 2.0
Author: Adam King <adking@redhat.com>
Wed, 16 Jun 2021 12:02:15 +0000 (08:02 -0400)
Committer: Adam King <adking@redhat.com>
Fri, 24 Sep 2021 11:23:50 +0000 (07:23 -0400)
Creates an http endpoint in mgr/cephadm to receive
http requests and an agent that can be deployed on
each host that will gather metadata on the host and
send it to the mgr/cephadm http endpoint. Should save the
cephadm mgr module a lot of time it would have to spend
repeatedly ssh-ing into each host to gather the metadata
and help performance on larger clusters.

Fixes: https://tracker.ceph.com/issues/51004
Signed-off-by: Adam King <adking@redhat.com>
15 files changed:
ceph.spec.in
debian/control
src/cephadm/cephadm
src/pybind/mgr/cephadm/agent.py [new file with mode: 0644]
src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/tests/test_migration.py
src/pybind/mgr/cephadm/tests/test_upgrade.py
src/pybind/mgr/cephadm/upgrade.py
src/pybind/mgr/orchestrator/_interface.py
src/python-common/ceph/deployment/service_spec.py

index 3827d7bb094f93ad01019d42e32b30732164b27b..f102b185d2688633bcb52e36c588aa891c92c45d 100644 (file)
@@ -660,6 +660,7 @@ Group:          System/Filesystems
 %endif
 Requires:       ceph-mgr = %{_epoch_prefix}%{version}-%{release}
 Requires:       python%{python3_pkgversion}-asyncssh
+Requires:       python%{python3_pkgversion}-cherrypy
 Requires:       cephadm = %{_epoch_prefix}%{version}-%{release}
 %if 0%{?suse_version}
 Requires:       openssh
index 09011acb4fa0b9e14ae005b0894f432ddb35a003..d58da69aef0fa71191221bcaece483cbbe91934f 100644 (file)
@@ -342,7 +342,8 @@ Depends: ceph-mgr (= ${binary:Version}),
          ${misc:Depends},
          ${python:Depends},
          openssh-client,
-         python3-jinja2
+         python3-jinja2,
+         python3-cherrypy
 Description: cephadm orchestrator module for ceph-mgr
  Ceph is a massively scalable, open-source, distributed
  storage system that runs on commodity hardware and delivers object,
index 657124425e05a04aecbc9ba9e80305a53e83f2bf..4e938295d6d4eb51d336a8e32558373c33c5816a 100755 (executable)
@@ -42,7 +42,7 @@ from glob import glob
 from io import StringIO
 from threading import Thread, RLock
 from urllib.error import HTTPError
-from urllib.request import urlopen
+from urllib.request import urlopen, Request
 from pathlib import Path
 
 FuncT = TypeVar('FuncT', bound=Callable)
@@ -1005,6 +1005,7 @@ def get_supported_daemons():
     supported_daemons.append(CephadmDaemon.daemon_type)
     supported_daemons.append(HAproxy.daemon_type)
     supported_daemons.append(Keepalived.daemon_type)
+    supported_daemons.append(CephadmAgent.daemon_type)
     assert len(supported_daemons) == len(set(supported_daemons))
     return supported_daemons
 
@@ -2093,6 +2094,9 @@ def check_units(ctx, units, enabler=None):
 
 
 def is_container_running(ctx: CephadmContext, c: 'CephContainer') -> bool:
+    if ctx.name.split('.', 1)[0] in ['agent', 'cephadm-exporter']:
+        # these are non-containerized daemon types
+        return False
     return bool(get_running_container_name(ctx, c))
 
 
@@ -2697,6 +2701,15 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
 
             cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port)
             cephadm_exporter.deploy_daemon_unit(config_js)
+        elif daemon_type == CephadmAgent.daemon_type:
+            if ctx.config_json == '-':
+                config_js = get_parm('-')
+            else:
+                config_js = get_parm(ctx.config_json)
+            assert isinstance(config_js, dict)
+
+            cephadm_agent = CephadmAgent(ctx, fsid, daemon_id)
+            cephadm_agent.deploy_daemon_unit(config_js)
         else:
             if c:
                 deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
@@ -3447,6 +3460,205 @@ class CephContainer:
                                 desc=self.entrypoint, timeout=timeout)
         return out
 
+
+#####################################
+
+class MgrListener(Thread):
+    """Thread run by the cephadm agent that accepts TCP connections from
+    the mgr and records the integer counter value the mgr sends as the
+    agent's current ack, waking the agent so it re-sends fresh metadata.
+    """
+
+    def __init__(self, agent: 'CephadmAgent') -> None:
+        self.agent = agent
+        # loop-exit flag checked each time accept() times out
+        self.stop = False
+        super(MgrListener, self).__init__(target=self.run)
+
+    def run(self) -> None:
+        """Accept mgr connections and parse each message as an int counter."""
+        listenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        # bind on all interfaces so the mgr can reach us from another host
+        listenSocket.bind(('0.0.0.0', int(self.agent.listener_port)))
+        # 60s accept timeout so the stop flag is re-checked periodically
+        listenSocket.settimeout(60)
+        listenSocket.listen(1)
+        while not self.stop:
+            try:
+                conn, _ = listenSocket.accept()
+            except socket.timeout:
+                continue
+            # NOTE(review): conn is never closed explicitly and this inner
+            # loop only exits on an empty recv — confirm the mgr closes the
+            # connection after each message.
+            while True:
+                data = conn.recv(self.agent.socket_buffer_size).decode()
+                if not data:
+                    break
+                try:
+                    self.agent.ack = int(data)
+                except Exception as e:
+                    err_str = f'Failed to extract timestamp integer from message: {e}'
+                    conn.send(err_str.encode())
+                    # -1 marks that we received an unparseable message
+                    self.agent.ack = -1
+                else:
+                    conn.send(b'ACK')
+                # wake the agent main loop so it reports metadata promptly
+                self.agent.wakeup()
+                logger.debug(f'Got mgr message {data}')
+
+    def shutdown(self) -> None:
+        """Ask the listener loop to exit after its current accept() window."""
+        self.stop = True
+
+
+class CephadmAgent():
+    """Non-containerized cephadm daemon that periodically gathers host
+    metadata (daemon list, networks, host facts) and POSTs it over TLS to
+    the cephadm mgr module's HTTP endpoint."""
+
+    daemon_type = 'agent'
+    default_port = 8498
+    # seconds between metadata reports; overridden in run() by the
+    # 'refresh_period' value from agent.json
+    loop_interval = 30
+    socket_buffer_size = 256
+    stop = False
+
+    # config files the mgr is expected to ship in the deploy config blob
+    required_files = ['cephadm', 'agent.json', 'keyring']
+
+    def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''):
+        self.ctx = ctx
+        self.fsid = fsid
+        self.daemon_id = daemon_id
+        # ip/port of the mgr endpoint; filled in from agent.json in run()
+        self.target_ip = ''
+        self.target_port = ''
+        self.host = ''
+        self.daemon_dir = os.path.join(ctx.data_dir, self.fsid, f'{self.daemon_type}.{self.daemon_id}')
+        self.binary_path = os.path.join(self.daemon_dir, 'cephadm')
+        self.config_path = os.path.join(self.daemon_dir, 'agent.json')
+        self.keyring_path = os.path.join(self.daemon_dir, 'keyring')
+        self.ca_path = os.path.join(self.daemon_dir, 'root_cert.pem')
+        # port the MgrListener ends up bound to (chosen in run())
+        self.listener_port = ''
+        # last counter value received from the mgr (-1 = none/invalid)
+        self.ack = -1
+        # used both to sleep between reports and to be woken early by the mgr
+        self.event = threading.Event()
+        self.mgr_listener = MgrListener(self)
+
+    def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
+        """Write the agent's config files and systemd unit, then enable it.
+
+        :param config: mapping of filename -> file contents from the mgr
+        :raises Error: if no config was provided
+        """
+        if not config:
+            raise Error('Agent needs a config')
+        assert isinstance(config, dict)
+
+        # Create the required config files in the daemons dir, with restricted permissions
+        for filename in config:
+            with open(os.path.join(self.daemon_dir, filename), 'w') as f:
+                f.write(config[filename])
+
+        with open(os.path.join(self.daemon_dir, 'unit.run'), 'w') as f:
+            f.write(self.unit_run())
+
+        # write to a temp name then rename so the unit file appears atomically
+        unit_file_path = os.path.join(self.ctx.unit_dir, self.unit_name())
+        with open(unit_file_path + '.new', 'w') as f:
+            f.write(self.unit_file())
+            os.rename(unit_file_path + '.new', unit_file_path)
+
+        call_throws(self.ctx, ['systemctl', 'daemon-reload'])
+        call(self.ctx, ['systemctl', 'stop', self.unit_name()],
+             verbosity=CallVerbosity.DEBUG)
+        call(self.ctx, ['systemctl', 'reset-failed', self.unit_name()],
+             verbosity=CallVerbosity.DEBUG)
+        call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name()])
+
+    def unit_name(self) -> str:
+        """Return the systemd unit name for this agent daemon."""
+        return '{}.service'.format(get_unit_name(self.fsid, self.daemon_type, self.daemon_id))
+
+    def unit_run(self) -> str:
+        """Return the unit.run script: launch this cephadm binary in agent mode."""
+        py3 = shutil.which('python3')
+        return ('set -e\n' + f'{py3} {self.binary_path} agent --fsid {self.fsid} --daemon-id {self.daemon_id} &\n')
+
+    def unit_file(self) -> str:
+        """Return the systemd unit file contents for the agent."""
+        return """#generated by cephadm
+[Unit]
+Description=cephadm agent for cluster {fsid}
+
+PartOf=ceph-{fsid}.target
+Before=ceph-{fsid}.target
+
+[Service]
+Type=forking
+ExecStart=/bin/bash {data_dir}/unit.run
+Restart=on-failure
+RestartSec=10s
+
+[Install]
+WantedBy=ceph-{fsid}.target
+""".format(
+            fsid=self.fsid,
+            data_dir=self.daemon_dir
+        )
+
+    def shutdown(self) -> None:
+        """Stop the reporting loop and the mgr listener thread."""
+        self.stop = True
+        if self.mgr_listener.is_alive():
+            self.mgr_listener.shutdown()
+
+    def wakeup(self) -> None:
+        """Interrupt the inter-report sleep so metadata is sent immediately."""
+        self.event.set()
+
+    def run(self) -> None:
+        """Main agent loop: load config and keyring, pick a listener port,
+        start the MgrListener, then report metadata to the mgr endpoint
+        every loop_interval seconds (or sooner when woken by a mgr message).
+
+        :raises Error: on unreadable config/keyring or no free listener port
+        """
+        try:
+            with open(self.config_path, 'r') as f:
+                config = json.load(f)
+                self.target_ip = config['target_ip']
+                self.target_port = config['target_port']
+                self.loop_interval = int(config['refresh_period'])
+                starting_port = int(config['listener_port'])
+                self.host = config['host']
+        except Exception as e:
+            self.shutdown()
+            raise Error(f'Failed to get agent target ip and port from config: {e}')
+
+        try:
+            with open(self.keyring_path, 'r') as f:
+                self.keyring = f.read()
+        except Exception as e:
+            self.shutdown()
+            raise Error(f'Failed to get agent keyring: {e}')
+
+        assert self.target_ip and self.target_port
+
+        # pick the first free port at or above the configured listener_port
+        # NOTE(review): range(1001) probes 1001 ports while the error message
+        # says "All 1000 ports" — confirm which count is intended.
+        try:
+            for _ in range(1001):
+                if not port_in_use(self.ctx, starting_port):
+                    self.listener_port = str(starting_port)
+                    break
+                starting_port += 1
+            if not self.listener_port:
+                raise Error(f'All 1000 ports starting at {str(starting_port - 1001)} taken.')
+        except Exception as e:
+            raise Error(f'Failed to pick port for agent to listen on: {e}')
+
+        if not self.mgr_listener.is_alive():
+            self.mgr_listener.start()
+
+        # only trust the cluster's own root cert when talking to the mgr
+        ssl_ctx = ssl.create_default_context()
+        ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+        ssl_ctx.check_hostname = True
+        ssl_ctx.load_verify_locations(self.ca_path)
+
+        while not self.stop:
+            ack = self.ack
+
+            # the keyring is included so the mgr can authenticate this agent
+            data = json.dumps({'host': self.host,
+                              'ls': list_daemons(self.ctx),
+                               'networks': list_networks(self.ctx),
+                               'facts': HostFacts(self.ctx).dump(),
+                               'ack': str(ack),
+                               'keyring': self.keyring,
+                               'port': self.listener_port})
+            data = data.encode('ascii')
+
+            url = f'https://{self.target_ip}:{self.target_port}/data'
+            try:
+                req = Request(url, data, {'Content-Type': 'application/json'})
+                urlopen(req, context=ssl_ctx)
+            except Exception as e:
+                # best effort: log and retry on the next cycle
+                logger.error(f'Failed to send metadata to mgr: {e}')
+
+            # sleep until the next cycle, or until wakeup() is called
+            self.event.wait(self.loop_interval)
+            self.event.clear()
+
+
+def command_agent(ctx: CephadmContext) -> None:
+    """Entry point for `cephadm agent`: run the agent for an existing deployment.
+
+    :raises Error: if the agent's daemon directory does not exist
+    """
+    agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id)
+
+    if not os.path.isdir(agent.daemon_dir):
+        raise Error(f'Agent daemon directory {agent.daemon_dir} does not exist. Perhaps agent was never deployed?')
+
+    agent.run()
+
+
 ##################################
 
 
@@ -3542,7 +3754,7 @@ def get_image_info_from_inspect(out, image):
         'image_id': normalize_container_id(image_id)
     }  # type: Dict[str, Union[str,List[str]]]
     if digests:
-        r['repo_digests'] = list(map(normalize_image_digest, digests[1:-1].split(' ')))
+        r['repo_digests'] = list(map(normalize_image_digest, digests[1: -1].split(' ')))
     return r
 
 ##################################
@@ -3578,7 +3790,7 @@ def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
 def unwrap_ipv6(address):
     # type: (str) -> str
     if address.startswith('[') and address.endswith(']'):
-        return address[1:-1]
+        return address[1: -1]
     return address
 
 
@@ -3643,7 +3855,7 @@ def prepare_mon_addresses(
             raise Error('--mon-addrv value %s must use square backets' %
                         addr_arg)
         ipv6 = addr_arg.count('[') > 1
-        for addr in addr_arg[1:-1].split(','):
+        for addr in addr_arg[1: -1].split(','):
             hasport = r.findall(addr)
             if not hasport:
                 raise Error('--mon-addrv value %s must include port number' %
@@ -4233,13 +4445,11 @@ def command_bootstrap(ctx):
     (uid, gid) = extract_uid_gid(ctx)
 
     # create some initial keys
-    (mon_key, mgr_key, admin_key, bootstrap_keyring, admin_keyring) = \
-        create_initial_keys(ctx, uid, gid, mgr_id)
+    (mon_key, mgr_key, admin_key, bootstrap_keyring, admin_keyring) = create_initial_keys(ctx, uid, gid, mgr_id)
 
     monmap = create_initial_monmap(ctx, uid, gid, fsid, mon_id, addr_arg)
-    (mon_dir, log_dir) = \
-        prepare_create_mon(ctx, uid, gid, fsid, mon_id,
-                           bootstrap_keyring.name, monmap.name)
+    (mon_dir, log_dir) = prepare_create_mon(ctx, uid, gid, fsid, mon_id,
+                                            bootstrap_keyring.name, monmap.name)
 
     with open(mon_dir + '/config', 'w') as f:
         os.fchown(f.fileno(), uid, gid)
@@ -4600,6 +4810,13 @@ def command_deploy(ctx):
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
                       uid, gid, ports=daemon_ports)
 
+    elif daemon_type == CephadmAgent.daemon_type:
+        # get current user gid and uid
+        uid = os.getuid()
+        gid = os.getgid()
+        deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
+                      uid, gid, ports=daemon_ports)
+
     else:
         raise Error('daemon type {} not implemented in command_deploy function'
                     .format(daemon_type))
@@ -5002,8 +5219,7 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                         'systemd_unit': legacy_unit_name,
                     }
                     if detail:
-                        (val['enabled'], val['state'], _) = \
-                            check_unit(ctx, legacy_unit_name)
+                        (val['enabled'], val['state'], _) = check_unit(ctx, legacy_unit_name)
                         if not host_version:
                             try:
                                 out, err, code = call(ctx,
@@ -5034,8 +5250,7 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                     }
                     if detail:
                         # get container id
-                        (val['enabled'], val['state'], _) = \
-                            check_unit(ctx, unit_name)
+                        (val['enabled'], val['state'], _) = check_unit(ctx, unit_name)
                         container_id = None
                         image_name = None
                         image_id = None
@@ -8423,6 +8638,17 @@ def _get_parser():
         help='Maintenance action - enter maintenance, or exit maintenance')
     parser_maintenance.set_defaults(func=command_maintenance)
 
+    parser_agent = subparsers.add_parser(
+        'agent', help='start cephadm agent')
+    parser_agent.set_defaults(func=command_agent)
+    parser_agent.add_argument(
+        '--fsid',
+        required=True,
+        help='cluster FSID')
+    parser_agent.add_argument(
+        '--daemon-id',
+        help='daemon id for agent')
+
     return parser
 
 
diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py
new file mode 100644 (file)
index 0000000..3ed1c36
--- /dev/null
@@ -0,0 +1,301 @@
+import cherrypy
+import json
+import socket
+import tempfile
+import threading
+import time
+
+from mgr_util import verify_tls_files
+from ceph.utils import datetime_now
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+
+from OpenSSL import crypto
+from typing import Any, Dict, Set, Tuple, TYPE_CHECKING
+from uuid import uuid4
+
+if TYPE_CHECKING:
+    from cephadm.module import CephadmOrchestrator
+
+
+class CherryPyThread(threading.Thread):
+    """Thread that runs the mgr-side cephadm HTTPS endpoint (cherrypy),
+    which receives metadata POSTs from the per-host agents."""
+
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr = mgr
+        self.cherrypy_shutdown_event = threading.Event()
+        self.ssl_certs = SSLCerts(self.mgr)
+        super(CherryPyThread, self).__init__(target=self.run)
+
+    def run(self) -> None:
+        """Generate TLS material, configure cherrypy, and serve until shutdown."""
+        try:
+            server_addr = self.mgr.get_mgr_ip()
+            server_port = self.mgr.endpoint_port
+
+            self.ssl_certs.generate_root_cert()
+            cert, key = self.ssl_certs.generate_cert()
+
+            # key/cert go into NamedTemporaryFiles kept as attributes so
+            # they aren't garbage collected (and deleted) while serving
+            self.key_tmp = tempfile.NamedTemporaryFile()
+            self.key_tmp.write(key.encode('utf-8'))
+            self.key_tmp.flush()  # pkey_tmp must not be gc'ed
+            key_fname = self.key_tmp.name
+
+            self.cert_tmp = tempfile.NamedTemporaryFile()
+            self.cert_tmp.write(cert.encode('utf-8'))
+            self.cert_tmp.flush()  # cert_tmp must not be gc'ed
+            cert_fname = self.cert_tmp.name
+
+            verify_tls_files(cert_fname, key_fname)
+
+            self.mgr.set_uri('https://{0}:{1}/'.format(server_addr, server_port))
+
+            cherrypy.config.update({
+                'server.socket_host': server_addr,
+                'server.socket_port': server_port,
+                'engine.autoreload.on': False,
+                'server.ssl_module': 'builtin',
+                'server.ssl_certificate': cert_fname,
+                'server.ssl_private_key': key_fname,
+            })
+            # MethodDispatcher routes HTTP verbs to same-named methods
+            # (GET/POST) on the mounted handler objects
+            root_conf = {'/': {'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
+                               'tools.response_headers.on': True}}
+            cherrypy.tree.mount(Root(self.mgr), '/', root_conf)
+            self.mgr.log.info('Starting cherrypy engine...')
+            cherrypy.engine.start()
+            self.mgr.log.info('Cherrypy engine started.')
+            # wait for the shutdown event
+            self.cherrypy_shutdown_event.wait()
+            self.cherrypy_shutdown_event.clear()
+            cherrypy.engine.stop()
+            self.mgr.log.info('Cherrypy engine stopped.')
+        except Exception as e:
+            self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}')
+
+    def shutdown(self) -> None:
+        """Signal the serving thread to stop the cherrypy engine and exit."""
+        self.mgr.log.info('Stopping cherrypy engine...')
+        self.cherrypy_shutdown_event.set()
+
+
+class Root:
+    """Root handler of the cephadm endpoint; /data is served by HostData."""
+
+    # required by cherrypy's MethodDispatcher
+    exposed = True
+
+    def __init__(self, mgr: "CephadmOrchestrator"):
+        self.mgr = mgr
+        # exposed at /data via attribute lookup by the dispatcher
+        self.data = HostData(self.mgr)
+
+    def GET(self) -> str:
+        """Simple liveness page for GET /."""
+        return '''<!DOCTYPE html>
+<html>
+<head><title>Cephadm HTTP Endpoint</title></head>
+<body>
+<p>Cephadm HTTP Endpoint is up and running</p>
+</body>
+</html>'''
+
+
+class HostData:
+    """Handler for POST /data: validates and ingests agent metadata."""
+
+    # required by cherrypy's MethodDispatcher
+    exposed = True
+
+    def __init__(self, mgr: "CephadmOrchestrator"):
+        self.mgr = mgr
+
+    @cherrypy.tools.json_in()
+    def POST(self) -> None:
+        """Receive one JSON metadata payload from an agent; invalid payloads
+        are logged and dropped rather than surfaced to the client."""
+        data: Dict[str, Any] = cherrypy.request.json
+        try:
+            self.check_request_fields(data)
+        except Exception as e:
+            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
+        else:
+            self.handle_metadata(data)
+
+    def check_request_fields(self, data: Dict[str, Any]) -> None:
+        """Validate an agent payload: known host, matching keyring, and the
+        required 'port' and integer-convertible 'ack' fields.
+
+        :raises Exception: describing the first validation failure found
+        """
+        fields = '{' + ', '.join([key for key in data.keys()]) + '}'
+        if 'host' not in data:
+            raise Exception(
+                f'No host in metadata from agent ("host" field). Only received fields {fields}')
+        host = data['host']
+        if host not in self.mgr.cache.get_hosts():
+            raise Exception(f'Received metadata from agent on unknown hostname {host}')
+        if 'keyring' not in data:
+            raise Exception(
+                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
+        if host not in self.mgr.cache.agent_keys:
+            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
+        if data['keyring'] != self.mgr.cache.agent_keys[host]:
+            raise Exception(f'Got wrong keyring from agent on host {host}.')
+        if 'port' not in data:
+            raise Exception(
+                f'Agent on host {host} not reporting its listener port ("port" fields). Only received fields {fields}')
+        if 'ack' not in data:
+            raise Exception(
+                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
+        try:
+            int(data['ack'])
+        except Exception as e:
+            raise Exception(
+                f'Counter value from agent on host {host} could not be converted to an integer: {e}')
+        # missing metadata sections are only warned about, not rejected
+        metadata_types = ['ls', 'networks', 'facts']
+        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
+        if not all(item in data.keys() for item in metadata_types):
+            self.mgr.log.warning(
+                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')
+
+    def handle_metadata(self, data: Dict[str, Any]) -> None:
+        """Apply a validated agent payload to the mgr's host cache.
+
+        Payloads whose 'ack' does not match the host's current counter are
+        treated as stale: a fresh report is requested instead of ingesting.
+        """
+        try:
+            host = data['host']
+            if host not in self.mgr.cache.agent_counter:
+                self.mgr.log.debug(
+                    f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata')
+                self.mgr.cache.agent_counter[host] = 1
+                self.mgr.agent_helpers._request_agent_acks({host})
+                return
+
+            self.mgr.cache.agent_ports[host] = int(data['port'])
+            self.mgr.cache.agent_timestamp[host] = datetime_now()
+            up_to_date = False
+
+            int_ack = int(data['ack'])
+            if int_ack == self.mgr.cache.agent_counter[host]:
+                up_to_date = True
+            else:
+                # we got old counter value with message, inform agent of new timestamp
+                self.mgr.agent_helpers._request_agent_acks({host})
+                self.mgr.log.debug(
+                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')
+
+            if up_to_date:
+                if 'ls' in data:
+                    self.mgr._process_ls_output(host, data['ls'])
+                if 'networks' in data:
+                    self.mgr.cache.update_host_networks(host, data['networks'])
+                if 'facts' in data:
+                    # facts arrive as a JSON-encoded string (HostFacts.dump())
+                    self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
+
+                # update timestamp of most recent up-to-date agent update
+                self.mgr.cache.metadata_up_to_date[host] = True
+                self.mgr.log.debug(
+                    f'Received up-to-date metadata from agent on host {host}.')
+
+        except Exception as e:
+            # NOTE(review): 'host' would be unbound here if data['host']
+            # itself raised; callers validate it first, but confirm.
+            self.mgr.log.warning(
+                f'Failed to update metadata with metadata from agent on host {host}: {e}')
+
+
+class AgentMessageThread(threading.Thread):
+    """One-shot thread that pushes the current counter value to a single
+    agent's listener socket, retrying on connection failure."""
+
+    def __init__(self, host: str, port: int, counter: int, mgr: "CephadmOrchestrator") -> None:
+        self.mgr = mgr
+        self.host = host
+        self.port = port
+        self.counter = counter
+        super(AgentMessageThread, self).__init__(target=self.run)
+
+    def run(self) -> None:
+        """Send the counter; wait 3s then 5s between connection attempts."""
+        for retry_wait in [3, 5]:
+            try:
+                agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                agent_socket.connect((self.mgr.inventory.get_addr(self.host), self.port))
+                agent_socket.send(str(self.counter).encode())
+                agent_response = agent_socket.recv(1024).decode()
+                self.mgr.log.debug(f'Received {agent_response} from agent on host {self.host}')
+                return
+            except ConnectionError as e:
+                # if it's a connection error, possibly try to connect again.
+                # We could have just deployed agent and it might not be ready
+                self.mgr.log.debug(
+                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
+                time.sleep(retry_wait)
+            except Exception as e:
+                # if it's not a connection error, something has gone wrong. Give up.
+                self.mgr.log.debug(f'Failed to contact agent on host {self.host}: {e}')
+                return
+        return
+
+
+class CephadmAgentHelpers:
+    """Agent-related helper operations used by the cephadm mgr module."""
+
+    def __init__(self, mgr: "CephadmOrchestrator"):
+        self.mgr: "CephadmOrchestrator" = mgr
+
+    def _request_agent_acks(self, hosts: Set[str]) -> None:
+        """Bump each host's counter and message its agent so it re-reports.
+
+        NOTE(review): assumes cache.agent_ports has an entry for each host —
+        a missing port would raise KeyError here; confirm callers guarantee it.
+        """
+        for host in hosts:
+            self.mgr.cache.metadata_up_to_date[host] = False
+            if host not in self.mgr.cache.agent_counter:
+                self.mgr.cache.agent_counter[host] = 1
+            else:
+                self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
+            message_thread = AgentMessageThread(
+                host, self.mgr.cache.agent_ports[host], self.mgr.cache.agent_counter[host], self.mgr)
+            message_thread.start()
+
+    def _agent_down(self, host: str) -> bool:
+        """Return True if the host's agent has not reported within 2.5x the
+        configured refresh rate (missing timestamps are initialized to now,
+        e.g. after a mgr failover)."""
+        # if we don't have a timestamp, it's likely because of a mgr fail over.
+        # just set the timestamp to now
+        if host not in self.mgr.cache.agent_timestamp:
+            self.mgr.cache.agent_timestamp[host] = datetime_now()
+        # agent hasn't reported in 2.5 * it's refresh rate. Something is likely wrong with it.
+        time_diff = datetime_now() - self.mgr.cache.agent_timestamp[host]
+        if time_diff.total_seconds() > 2.5 * float(self.mgr.agent_refresh_rate):
+            return True
+        return False
+
+    # this function probably seems very unnecessary, but it makes it considerably easier
+    # to get the unit tests working. All unit tests that check which daemons were deployed
+    # or services setup would have to be individually changed to expect an agent service or
+    # daemons, OR we can put this in its own function then mock the function
+    def _apply_agent(self) -> None:
+        """Save a spec placing an agent on every host (host_pattern='*')."""
+        spec = ServiceSpec(
+            service_type='agent',
+            placement=PlacementSpec(host_pattern='*')
+        )
+        self.mgr.spec_store.save(spec)
+
+
+class SSLCerts:
+    """Generates the self-signed root CA and the mgr endpoint's server cert
+    used for TLS between the cephadm endpoint and its agents (pyOpenSSL)."""
+
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr = mgr
+        # populated by generate_root_cert(); annotations only, no values yet
+        self.root_cert: Any
+        self.root_key: Any
+        self.root_subj: Any
+
+    def generate_root_cert(self) -> Tuple[str, str]:
+        """Create a 2048-bit RSA self-signed root CA valid for 10 years.
+
+        :return: (PEM certificate, PEM private key) as strings
+        """
+        self.root_key = crypto.PKey()
+        self.root_key.generate_key(crypto.TYPE_RSA, 2048)
+
+        self.root_cert = crypto.X509()
+        # random 128-bit serial number
+        self.root_cert.set_serial_number(int(uuid4()))
+
+        self.root_subj = self.root_cert.get_subject()
+        self.root_subj.commonName = "cephadm-root"
+
+        # self-signed: issuer == subject, signed with its own key
+        self.root_cert.set_issuer(self.root_subj)
+        self.root_cert.set_pubkey(self.root_key)
+        self.root_cert.gmtime_adj_notBefore(0)
+        self.root_cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
+        self.root_cert.sign(self.root_key, 'sha256')
+
+        cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+        key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
+        return (cert_str, key_str)
+
+    def generate_cert(self) -> Tuple[str, str]:
+        """Create a server cert for the mgr IP, signed by the root CA.
+
+        Must be called after generate_root_cert() (uses root_subj/root_key).
+
+        :return: (PEM certificate, PEM private key) as strings
+        """
+        key = crypto.PKey()
+        key.generate_key(crypto.TYPE_RSA, 2048)
+
+        cert = crypto.X509()
+        cert.set_serial_number(int(uuid4()))
+
+        subj = cert.get_subject()
+        # CN is the mgr IP (the agent enables check_hostname against this)
+        subj.commonName = str(self.mgr.get_mgr_ip())
+
+        cert.set_issuer(self.root_subj)
+        cert.set_pubkey(key)
+        cert.gmtime_adj_notBefore(0)
+        cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
+        cert.sign(self.root_key, 'sha256')
+
+        cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')
+        key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode('utf-8')
+        return (cert_str, key_str)
+
+    def get_root_cert(self) -> str:
+        """Return the root CA certificate as PEM text."""
+        return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+
+    def get_root_key(self) -> str:
+        """Return the root CA private key as PEM text."""
+        return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
index cd398221123e549a581a17c2dcddf519893c374f..3ea558607aaa765a0a0a853e1796a5c60efa99c2 100644 (file)
@@ -438,10 +438,12 @@ class HostCache():
         self.osdspec_previews = {}     # type: Dict[str, List[Dict[str, Any]]]
         self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
         self.networks = {}             # type: Dict[str, Dict[str, Dict[str, List[str]]]]
+        self.last_network_update = {}   # type: Dict[str, datetime.datetime]
         self.last_device_update = {}   # type: Dict[str, datetime.datetime]
         self.last_device_change = {}   # type: Dict[str, datetime.datetime]
         self.daemon_refresh_queue = []  # type: List[str]
         self.device_refresh_queue = []  # type: List[str]
+        self.network_refresh_queue = []  # type: List[str]
         self.osdspec_previews_refresh_queue = []  # type: List[str]
 
         # host -> daemon name -> dict
@@ -453,6 +455,12 @@ class HostCache():
 
         self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
 
+        self.agent_counter = {}  # type: Dict[str, int]
+        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
+        self.metadata_up_to_date = {}  # type: Dict[str, bool]
+        self.agent_keys = {}  # type: Dict[str, str]
+        self.agent_ports = {}  # type: Dict[str, int]
+
     def load(self):
         # type: () -> None
         for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
@@ -472,6 +480,7 @@ class HostCache():
                 # for services, we ignore the persisted last_*_update
                 # and always trigger a new scrape on mgr restart.
                 self.daemon_refresh_queue.append(host)
+                self.network_refresh_queue.append(host)
                 self.daemons[host] = {}
                 self.osdspec_previews[host] = []
                 self.osdspec_last_applied[host] = {}
@@ -499,6 +508,10 @@ class HostCache():
                 self.registry_login_queue.add(host)
                 self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
 
+                self.agent_counter[host] = int(j.get('agent_counter', 1))
+                self.metadata_up_to_date[host] = False
+                self.agent_keys[host] = str(j.get('agent_keys', ''))
+
                 self.mgr.log.debug(
                     'HostCache.load: host %s has %d daemons, '
                     '%d devices, %d networks' % (
@@ -537,11 +550,10 @@ class HostCache():
             return True
         return False
 
-    def update_host_devices_networks(
+    def update_host_devices(
             self,
             host: str,
             dls: List[inventory.Device],
-            nets: Dict[str, Dict[str, List[str]]]
     ) -> None:
         if (
                 host not in self.devices
@@ -551,7 +563,14 @@ class HostCache():
             self.last_device_change[host] = datetime_now()
         self.last_device_update[host] = datetime_now()
         self.devices[host] = dls
+
+    def update_host_networks(
+            self,
+            host: str,
+            nets: Dict[str, Dict[str, List[str]]]
+    ) -> None:
         self.networks[host] = nets
+        self.last_network_update[host] = datetime_now()
 
     def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
         self.daemon_config_deps[host][name] = {
@@ -598,6 +617,7 @@ class HostCache():
         self.daemon_config_deps[host] = {}
         self.daemon_refresh_queue.append(host)
         self.device_refresh_queue.append(host)
+        self.network_refresh_queue.append(host)
         self.osdspec_previews_refresh_queue.append(host)
         self.registry_login_queue.add(host)
         self.last_client_files[host] = {}
@@ -616,6 +636,13 @@ class HostCache():
             del self.last_device_update[host]
         self.mgr.event.set()
 
+    def invalidate_host_networks(self, host):
+        # type: (str) -> None
+        self.network_refresh_queue.append(host)
+        if host in self.last_network_update:
+            del self.last_network_update[host]
+        self.mgr.event.set()
+
     def distribute_new_registry_login_info(self) -> None:
         self.registry_login_queue = set(self.mgr.inventory.keys())
 
@@ -631,6 +658,8 @@ class HostCache():
             j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
         if host in self.last_device_update:
             j['last_device_update'] = datetime_to_str(self.last_device_update[host])
+        if host in self.last_network_update:
+            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
         if host in self.last_device_change:
             j['last_device_change'] = datetime_to_str(self.last_device_change[host])
         if host in self.daemons:
@@ -661,6 +690,11 @@ class HostCache():
         if host in self.scheduled_daemon_actions:
             j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
 
+        if host in self.agent_counter:
+            j['agent_counter'] = self.agent_counter[host]
+        if host in self.agent_keys:
+            j['agent_keys'] = self.agent_keys[host]
+
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
     def rm_host(self, host):
@@ -687,6 +721,8 @@ class HostCache():
             del self.last_daemon_update[host]
         if host in self.last_device_update:
             del self.last_device_update[host]
+        if host in self.last_network_update:
+            del self.last_network_update[host]
         if host in self.last_device_change:
             del self.last_device_change[host]
         if host in self.daemon_config_deps:
@@ -852,6 +888,20 @@ class HostCache():
             return True
         return False
 
+    def host_needs_network_refresh(self, host):
+        # type: (str) -> bool
+        if host in self.mgr.offline_hosts:
+            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
+            return False
+        if host in self.network_refresh_queue:
+            self.network_refresh_queue.remove(host)
+            return True
+        cutoff = datetime_now() - datetime.timedelta(
+            seconds=self.mgr.device_cache_timeout)
+        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
+            return True
+        return False
+
     def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
         if host in self.mgr.offline_hosts:
             logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
@@ -891,6 +941,16 @@ class HostCache():
             return True
         return False
 
+    def host_metadata_up_to_date(self, host: str) -> bool:
+        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
+            return False
+        return True
+
+    def all_host_metadata_up_to_date(self) -> bool:
+        if [h for h in self.get_hosts() if not self.host_metadata_up_to_date(h)]:
+            return False
+        return True
+
     def add_daemon(self, host, dd):
         # type: (str, orchestrator.DaemonDescription) -> None
         assert host in self.daemons
index 7b5676ac5d52c3fdef0e6c2cad3e902d7412a491..8a5ff3de81d4b0f9f68230ffad4358b19e71b3e0 100644 (file)
@@ -28,6 +28,8 @@ from ceph.deployment.service_spec import \
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from cephadm.serve import CephadmServe
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
+from cephadm.agent import CherryPyThread, CephadmAgentHelpers
+
 
 from mgr_module import MgrModule, HandleCommandResult, Option
 from mgr_util import create_self_signed_cert
@@ -45,7 +47,7 @@ from . import utils
 from . import ssh
 from .migrations import Migrations
 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
-    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService
+    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent
 from .services.ingress import IngressService
 from .services.container import CustomContainerService
 from .services.iscsi import IscsiService
@@ -79,6 +81,16 @@ Host *
   ConnectTimeout=30
 """
 
+# cherrypy likes to sys.exit on error.  don't let it take us down too!
+
+
+def os_exit_noop(status: int) -> None:
+    pass
+
+
+os._exit = os_exit_noop   # type: ignore
+
+
 # Default container images -----------------------------------------------------
 DEFAULT_IMAGE = 'quay.io/ceph/ceph'
 DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.18.1'
@@ -334,6 +346,30 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             default=10 * 60,
             desc='how frequently to autotune daemon memory'
         ),
+        Option(
+            'use_agent',
+            type='bool',
+            default=True,
+            desc='Use cephadm agent on each host to gather and send metadata'
+        ),
+        Option(
+            'agent_refresh_rate',
+            type='secs',
+            default=20,
+            desc='How often agent on each host will try to gather and send metadata'
+        ),
+        Option(
+            'endpoint_port',
+            type='int',
+            default=8499,
+            desc='Which port cephadm http endpoint will listen on'
+        ),
+        Option(
+            'agent_starting_port',
+            type='int',
+            default=4721,
+            desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)'
+        ),
     ]
 
     def __init__(self, *args: Any, **kwargs: Any):
@@ -393,6 +429,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self._temp_files: List = []
             self.ssh_key: Optional[str] = None
             self.ssh_pub: Optional[str] = None
+            self.use_agent = False
+            self.agent_refresh_rate = 0
+            self.endpoint_port = 0
+            self.agent_starting_port = 0
 
         self.notify('mon_map', None)
         self.config_notify()
@@ -450,7 +490,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             OSDService, NFSService, MonService, MgrService, MdsService,
             RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
             PrometheusService, NodeExporterService, CrashService, IscsiService,
-            IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService
+            IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService,
+            CephadmAgent
         ]
 
         # https://github.com/python/mypy/issues/8993
@@ -467,10 +508,26 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.config_checker = CephadmConfigChecks(self)
 
+        self.cherrypy_thread = None
+        self.agent_helpers = CephadmAgentHelpers(self)
+
+        if self.use_agent:
+            try:
+                if not self.cherrypy_thread:
+                    self.cherrypy_thread = CherryPyThread(self)
+                    self.cherrypy_thread.start()
+                if 'agent' not in self.spec_store:
+                    self.agent_helpers._apply_agent()
+            except Exception as e:
+                self.log.error(f'Failed to initialize agent spec and cherrypy server: {e}')
+
     def shutdown(self) -> None:
         self.log.debug('shutdown')
         self._worker_pool.close()
         self._worker_pool.join()
+        if self.cherrypy_thread:
+            self.cherrypy_thread.shutdown()
+            self.cherrypy_thread = None
         self.run = False
         self.event.set()
 
@@ -587,7 +644,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         suffix = daemon_type not in [
             'mon', 'crash',
             'prometheus', 'node-exporter', 'grafana', 'alertmanager',
-            'container', 'cephadm-exporter',
+            'container', 'cephadm-exporter', 'agent'
         ]
         if forcename:
             if len([d for d in existing if d.daemon_id == forcename]):
@@ -632,6 +689,65 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                 ssh_config_fname))
 
+    def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
+        dm = {}
+        for d in ls:
+            if not d['style'].startswith('cephadm'):
+                continue
+            if d['fsid'] != self._cluster_fsid:
+                continue
+            if '.' not in d['name']:
+                continue
+            sd = orchestrator.DaemonDescription()
+            sd.last_refresh = datetime_now()
+            for k in ['created', 'started', 'last_configured', 'last_deployed']:
+                v = d.get(k, None)
+                if v:
+                    setattr(sd, k, str_to_datetime(d[k]))
+            sd.daemon_type = d['name'].split('.')[0]
+            if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
+                logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
+                continue
+
+            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
+            sd.hostname = host
+            sd.container_id = d.get('container_id')
+            if sd.container_id:
+                # shorten the hash
+                sd.container_id = sd.container_id[0:12]
+            sd.container_image_name = d.get('container_image_name')
+            sd.container_image_id = d.get('container_image_id')
+            sd.container_image_digests = d.get('container_image_digests')
+            sd.memory_usage = d.get('memory_usage')
+            sd.memory_request = d.get('memory_request')
+            sd.memory_limit = d.get('memory_limit')
+            sd._service_name = d.get('service_name')
+            sd.deployed_by = d.get('deployed_by')
+            sd.version = d.get('version')
+            sd.ports = d.get('ports')
+            sd.ip = d.get('ip')
+            sd.rank = int(d['rank']) if d.get('rank') is not None else None
+            sd.rank_generation = int(d['rank_generation']) if d.get(
+                'rank_generation') is not None else None
+            if sd.daemon_type == 'osd':
+                sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
+            if 'state' in d:
+                sd.status_desc = d['state']
+                sd.status = {
+                    'running': DaemonDescriptionStatus.running,
+                    'stopped': DaemonDescriptionStatus.stopped,
+                    'error': DaemonDescriptionStatus.error,
+                    'unknown': DaemonDescriptionStatus.error,
+                }[d['state']]
+            else:
+                sd.status_desc = 'unknown'
+                sd.status = None
+            dm[sd.name()] = sd
+        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
+        self.cache.update_host_daemons(host, dm)
+        self.cache.save_host(host)
+        return None
+
     def offline_hosts_remove(self, host: str) -> None:
         if host in self.offline_hosts:
             self.offline_hosts.remove(host)
@@ -1963,10 +2079,12 @@ Then run the following:
                 for h in host_filter.hosts:
                     self.log.debug(f'will refresh {h} devs')
                     self.cache.invalidate_host_devices(h)
+                    self.cache.invalidate_host_networks(h)
             else:
                 for h in self.cache.get_hosts():
                     self.log.debug(f'will refresh {h} devs')
                     self.cache.invalidate_host_devices(h)
+                    self.cache.invalidate_host_networks(h)
 
             self.event.set()
             self.log.debug('Kicked serve() loop to refresh devices')
@@ -2004,6 +2122,7 @@ Then run the following:
             ['--', 'lvm', 'zap', '--destroy', path],
             error_ok=True)
         self.cache.invalidate_host_devices(host)
+        self.cache.invalidate_host_networks(host)
         if code:
             raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
         return '\n'.join(out + err)
@@ -2152,6 +2271,11 @@ Then run the following:
                 return []
             daemons = self.cache.get_daemons_by_service(spec.service_name())
             deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
+        elif daemon_type == 'agent':
+            root_cert = ''
+            if self.cherrypy_thread and self.cherrypy_thread.ssl_certs.root_cert:
+                root_cert = self.cherrypy_thread.ssl_certs.get_root_cert()
+            deps = [self.get_mgr_ip(), str(self.endpoint_port), root_cert]
         else:
             need = {
                 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
index e20a445ef707f9a135e8465fb15aa75b426d4601..0840242ba8398ea209f39c05d791354ab52e5786 100644 (file)
@@ -3,18 +3,19 @@ import json
 import logging
 import uuid
 from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator, Set
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
-from ceph.utils import str_to_datetime, datetime_now
+from ceph.utils import datetime_now
 
 import orchestrator
 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
     DaemonDescriptionStatus, daemon_type_to_service
 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from cephadm.schedule import HostAssignment
+from cephadm.agent import CherryPyThread
 from cephadm.autotune import MemoryAutotuner
 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
     CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
@@ -91,6 +92,19 @@ class CephadmServe:
 
                     self._purge_deleted_services()
 
+                    if self.mgr.use_agent:
+                        if not self.mgr.cherrypy_thread:
+                            self.mgr.cherrypy_thread = CherryPyThread(self.mgr)
+                            self.mgr.cherrypy_thread.start()
+                        if 'agent' not in self.mgr.spec_store:
+                            self.mgr.agent_helpers._apply_agent()
+                    else:
+                        if self.mgr.cherrypy_thread:
+                            self.mgr.cherrypy_thread.shutdown()
+                            self.mgr.cherrypy_thread = None
+                        if 'agent' in self.mgr.spec_store:
+                            self.mgr.spec_store.rm('agent')
+
                     if self.mgr.upgrade.continue_upgrade():
                         continue
 
@@ -242,6 +256,8 @@ class CephadmServe:
                 self.log.warning(
                     f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
 
+        agents_down: List[str] = []
+
         @forall_hosts
         def refresh(host: str) -> None:
 
@@ -249,15 +265,48 @@ class CephadmServe:
             if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                 return
 
+            if self.mgr.use_agent and self.mgr.agent_helpers._agent_down(host):
+                if host in self.mgr.offline_hosts:
+                    return
+                self.mgr.offline_hosts.add(host)
+                self.mgr._reset_con(host)
+                agents_down.append(host)
+                # try to schedule redeploy of agent in case it is individually down
+                try:
+                    agent = [a for a in self.mgr.cache.get_daemons_by_host(
+                        host) if a.hostname == host][0]
+                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
+                except Exception as e:
+                    self.log.debug(
+                        f'Failed to find entry for agent deployed on host {host}. Agent possibly never deployed: {e}')
+                return
+            else:
+                self.mgr.offline_hosts_remove(host)
+
             if self.mgr.cache.host_needs_check(host):
                 r = self._check_host(host)
                 if r is not None:
                     bad_hosts.append(r)
-            if self.mgr.cache.host_needs_daemon_refresh(host):
-                self.log.debug('refreshing %s daemons' % host)
-                r = self._refresh_host_daemons(host)
-                if r:
-                    failures.append(r)
+
+            if not self.mgr.use_agent:
+                if self.mgr.cache.host_needs_daemon_refresh(host):
+                    self.log.debug('refreshing %s daemons' % host)
+                    r = self._refresh_host_daemons(host)
+                    if r:
+                        failures.append(r)
+
+                if self.mgr.cache.host_needs_facts_refresh(host):
+                    self.log.debug(('Refreshing %s facts' % host))
+                    r = self._refresh_facts(host)
+                    if r:
+                        failures.append(r)
+
+                if self.mgr.cache.host_needs_network_refresh(host):
+                    self.log.debug(('Refreshing %s networks' % host))
+                    r = self._refresh_host_networks(host)
+                    if r:
+                        failures.append(r)
+                self.mgr.cache.metadata_up_to_date[host] = True
 
             if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
                 self.log.debug(f"Logging `{host}` into custom registry")
@@ -272,12 +321,6 @@ class CephadmServe:
                 if r:
                     failures.append(r)
 
-            if self.mgr.cache.host_needs_facts_refresh(host):
-                self.log.debug(('Refreshing %s facts' % host))
-                r = self._refresh_facts(host)
-                if r:
-                    failures.append(r)
-
             if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                 self.log.debug(f"refreshing OSDSpec previews for {host}")
                 r = self._refresh_host_osdspec_previews(host)
@@ -316,6 +359,23 @@ class CephadmServe:
 
         refresh(self.mgr.cache.get_hosts())
 
+        if 'CEPHADM_AGENT_DOWN' in self.mgr.health_checks:
+            del self.mgr.health_checks['CEPHADM_AGENT_DOWN']
+        if agents_down:
+            detail: List[str] = []
+            for agent in agents_down:
+                detail.append((f'Cephadm agent on host {agent} has not reported in '
+                              f'{2.5 * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
+                               'down and host has been marked offline.'))
+            self.mgr.health_checks['CEPHADM_AGENT_DOWN'] = {
+                'severity': 'warning',
+                'summary': '%d Cephadm Agent(s) are not reporting. '
+                'Hosts marked offline' % (len(agents_down)),
+                'count': len(agents_down),
+                'detail': detail,
+            }
+            self.mgr.set_health_checks(self.mgr.health_checks)
+
         self.mgr.config_checker.run_checks()
 
         health_changed = False
@@ -387,62 +447,7 @@ class CephadmServe:
             ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
         except OrchestratorError as e:
             return str(e)
-        dm = {}
-        for d in ls:
-            if not d['style'].startswith('cephadm'):
-                continue
-            if d['fsid'] != self.mgr._cluster_fsid:
-                continue
-            if '.' not in d['name']:
-                continue
-            sd = orchestrator.DaemonDescription()
-            sd.last_refresh = datetime_now()
-            for k in ['created', 'started', 'last_configured', 'last_deployed']:
-                v = d.get(k, None)
-                if v:
-                    setattr(sd, k, str_to_datetime(d[k]))
-            sd.daemon_type = d['name'].split('.')[0]
-            if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
-                logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
-                continue
-
-            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
-            sd.hostname = host
-            sd.container_id = d.get('container_id')
-            if sd.container_id:
-                # shorten the hash
-                sd.container_id = sd.container_id[0:12]
-            sd.container_image_name = d.get('container_image_name')
-            sd.container_image_id = d.get('container_image_id')
-            sd.container_image_digests = d.get('container_image_digests')
-            sd.memory_usage = d.get('memory_usage')
-            sd.memory_request = d.get('memory_request')
-            sd.memory_limit = d.get('memory_limit')
-            sd._service_name = d.get('service_name')
-            sd.deployed_by = d.get('deployed_by')
-            sd.version = d.get('version')
-            sd.ports = d.get('ports')
-            sd.ip = d.get('ip')
-            sd.rank = int(d['rank']) if d.get('rank') is not None else None
-            sd.rank_generation = int(d['rank_generation']) if d.get(
-                'rank_generation') is not None else None
-            if sd.daemon_type == 'osd':
-                sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
-            if 'state' in d:
-                sd.status_desc = d['state']
-                sd.status = {
-                    'running': DaemonDescriptionStatus.running,
-                    'stopped': DaemonDescriptionStatus.stopped,
-                    'error': DaemonDescriptionStatus.error,
-                    'unknown': DaemonDescriptionStatus.error,
-                }[d['state']]
-            else:
-                sd.status_desc = 'unknown'
-                sd.status = None
-            dm[sd.name()] = sd
-        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
-        self.mgr.cache.update_host_daemons(host, dm)
-        self.mgr.cache.save_host(host)
+        self.mgr._process_ls_output(host, ls)
         return None
 
     def _refresh_facts(self, host: str) -> Optional[str]:
@@ -456,7 +461,6 @@ class CephadmServe:
         return None
 
     def _refresh_host_devices(self, host: str) -> Optional[str]:
-
         with_lsm = self.mgr.get_module_option('device_enhanced_scan')
         inventory_args = ['--', 'inventory',
                           '--format=json',
@@ -477,18 +481,29 @@ class CephadmServe:
                 else:
                     raise
 
-            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
         except OrchestratorError as e:
             return str(e)
 
-        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
-            host, len(devices), len(networks)))
+        self.log.debug('Refreshed host %s devices (%d)' % (
+            host, len(devices)))
         ret = inventory.Devices.from_json(devices)
-        self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
+        self.mgr.cache.update_host_devices(host, ret.devices)
         self.update_osdspec_previews(host)
         self.mgr.cache.save_host(host)
         return None
 
+    def _refresh_host_networks(self, host: str) -> Optional[str]:
+        try:
+            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
+        except OrchestratorError as e:
+            return str(e)
+
+        self.log.debug('Refreshed host %s networks (%s)' % (
+            host, len(networks)))
+        self.mgr.cache.update_host_networks(host, networks)
+        self.mgr.cache.save_host(host)
+        return None
+
     def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
         self.update_osdspec_previews(host)
         self.mgr.cache.save_host(host)
@@ -578,8 +593,20 @@ class CephadmServe:
     def _apply_all_services(self) -> bool:
         r = False
         specs = []  # type: List[ServiceSpec]
-        for sn, spec in self.mgr.spec_store.active_specs.items():
-            specs.append(spec)
+        # If metadata is not up to date, we still need to apply the agent spec,
+        # since the agent is the one who gathers the metadata. If we don't, we
+        # end up stuck between wanting metadata to be up to date before applying
+        # specs and needing to apply the agent spec to get up-to-date metadata.
+        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+            try:
+                specs.append(self.mgr.spec_store['agent'].spec)
+            except Exception as e:
+                self.log.debug(f'Failed to find agent spec: {e}')
+                self.mgr.agent_helpers._apply_agent()
+                return r
+        else:
+            for sn, spec in self.mgr.spec_store.active_specs.items():
+                specs.append(spec)
         for spec in specs:
             try:
                 if self._apply_service(spec):
@@ -673,7 +700,7 @@ class CephadmServe:
             rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
         ha = HostAssignment(
             spec=spec,
-            hosts=self.mgr._schedulable_hosts(),
+            hosts=self.mgr.inventory.all_specs() if spec.service_name() == 'agent' else self.mgr._schedulable_hosts(),
             unreachable_hosts=self.mgr._unreachable_hosts(),
             daemons=daemons,
             networks=self.mgr.cache.networks,
@@ -734,6 +761,8 @@ class CephadmServe:
         self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
         self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
 
+        hosts_altered: Set[str] = set()
+
         try:
             # assign names
             for i in range(len(slots_to_add)):
@@ -812,6 +841,7 @@ class CephadmServe:
                     r = True
                     progress_done += 1
                     update_progress()
+                    hosts_altered.add(daemon_spec.host)
                 except (RuntimeError, OrchestratorError) as e:
                     msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
                            f"on {slot.hostname}: {e}")
@@ -851,11 +881,17 @@ class CephadmServe:
 
                 progress_done += 1
                 update_progress()
+                hosts_altered.add(d.hostname)
 
             self.mgr.remote('progress', 'complete', progress_id)
         except Exception as e:
             self.mgr.remote('progress', 'fail', progress_id, str(e))
             raise
+        finally:
+            if self.mgr.use_agent:
+                # can only send an ack to agents if we know for sure which port they bound to
+                hosts_altered = set([h for h in hosts_altered if h in self.mgr.cache.agent_ports])
+                self.mgr.agent_helpers._request_agent_acks(hosts_altered)
 
         if r is None:
             r = False
@@ -871,7 +907,7 @@ class CephadmServe:
             assert dd.hostname is not None
             assert dd.daemon_type is not None
             assert dd.daemon_id is not None
-            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
+            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd', 'agent']:
                 # (mon and mgr specs should always exist; osds aren't matched
                 # to a service spec)
                 self.log.info('Removing orphan daemon %s...' % dd.name())
@@ -1055,6 +1091,9 @@ class CephadmServe:
                     stdin=json.dumps(daemon_spec.final_config),
                     image=image)
 
+                if daemon_spec.daemon_type == 'agent':
+                    self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
+
                 # refresh daemon state?  (ceph daemon reconfig does not need it)
                 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
                     if not code and daemon_spec.host in self.mgr.cache.daemons:
@@ -1165,7 +1204,7 @@ class CephadmServe:
         self.log.debug(f"_run_cephadm : command = {command}")
         self.log.debug(f"_run_cephadm : args = {args}")
 
-        bypass_image = ('cephadm-exporter',)
+        bypass_image = ('cephadm-exporter', 'agent')
 
         assert image or entity
         # Skip the image check for daemons deployed that are not ceph containers
@@ -1198,7 +1237,9 @@ class CephadmServe:
         # exec
         self.log.debug('args: %s' % (' '.join(final_args)))
         if self.mgr.mode == 'root':
-            if stdin:
+            # the agent has the cephadm binary as an extra file, which is
+            # passed over stdin; it is too large to include even in debug logs
+            if stdin and 'agent' not in str(entity):
                 self.log.debug('stdin: %s' % stdin)
 
             cmd = ['which', 'python3']
index 2453d5d3894060b462605a7a92ef0145446ce1f9..3559b9651674b16c99f31d1c75761194f03696f0 100644 (file)
@@ -441,9 +441,10 @@ class CephService(CephadmService):
         # the CephService class refers to service types, not daemon types
         if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
             return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
-        elif self.TYPE == 'crash':
+        elif self.TYPE in ['crash', 'agent']:
             if host == "":
-                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+                raise OrchestratorError(
+                    f'Host not provided to generate <{self.TYPE}> auth entity name')
             return AuthEntity(f'client.{self.TYPE}.{host}')
         elif self.TYPE == 'mon':
             return AuthEntity('mon.')
@@ -997,3 +998,39 @@ class CephfsMirrorService(CephService):
         daemon_spec.keyring = keyring
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
         return daemon_spec
+
+
+class CephadmAgent(CephService):
+    TYPE = 'agent'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+        if not self.mgr.cherrypy_thread:
+            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
+
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+        daemon_spec.keyring = keyring
+        self.mgr.cache.agent_keys[host] = keyring
+
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        cfg = {'target_ip': self.mgr.get_mgr_ip(),
+               'target_port': self.mgr.endpoint_port,
+               'refresh_period': self.mgr.agent_refresh_rate,
+               'listener_port': self.mgr.agent_starting_port,
+               'host': daemon_spec.host}
+
+        assert self.mgr.cherrypy_thread
+        config = {
+            'agent.json': json.dumps(cfg),
+            'cephadm': self.mgr._cephadm,
+            'keyring': daemon_spec.keyring,
+            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+        }
+
+        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.endpoint_port), self.mgr.cherrypy_thread.ssl_certs.get_root_cert()])
index aea533990f2bda38100eccbac2a4504e91488a06..bc0fdeeba244e76cf21768eb7f1db4d23dba105a 100644 (file)
@@ -6,10 +6,10 @@ from contextlib import contextmanager
 
 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
 from ceph.utils import datetime_to_str, datetime_now
-from cephadm.serve import CephadmServe
+from cephadm.serve import CephadmServe, cephadmNoImage
 
 try:
-    from typing import Any, Iterator, List
+    from typing import Any, Iterator, List, Callable, Dict, Optional
 except ImportError:
     pass
 
@@ -49,6 +49,25 @@ class MockEventLoopThread:
             loop.close()
             asyncio.set_event_loop(None)
 
+def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: Optional[List[str]] = None) -> None:
+    to_update: Dict[str, Callable[[str, Any], None]] = {
+        'ls': m._process_ls_output,
+        'gather-facts': m.cache.update_host_facts,
+        'list-networks': m.cache.update_host_networks,
+    }
+    if ops:
+        for op in ops:
+            out = CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, [])
+            to_update[op](host, out)
+    m.cache.last_daemon_update[host] = datetime_now()
+    m.cache.last_facts_update[host] = datetime_now()
+    m.cache.last_network_update[host] = datetime_now()
+    m.cache.metadata_up_to_date[host] = True
+
+
+def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
+    for host in m.cache.get_hosts():
+        receive_agent_metadata(m, host)
 
 @contextmanager
 def with_cephadm_module(module_options=None, store=None):
@@ -60,7 +79,11 @@ def with_cephadm_module(module_options=None, store=None):
             mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
             mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
             mock.patch("cephadm.services.osd.OSDService.get_osdspec_affinity", return_value='test_spec'), \
-            mock.patch("cephadm.module.CephadmOrchestrator.remote"):
+            mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+            mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
+            mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
+            mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
+            mock.patch('cephadm.agent.CherryPyThread.run'):
 
         m = CephadmOrchestrator.__new__(CephadmOrchestrator)
         if module_options is not None:
@@ -97,6 +120,7 @@ def with_host(m: CephadmOrchestrator, name, addr='1.2.3.4', refresh_hosts=True):
         wait(m, m.add_host(HostSpec(hostname=name)))
         if refresh_hosts:
             CephadmServe(m)._refresh_hosts_and_daemons()
+            receive_agent_metadata(m, name)
         yield
         wait(m, m.remove_host(name))
 
index 1620a9740a97147d9886e408b4a41dd9f48c183d..50628ca3f78b9fe819b39663beb94864dd5feeb0 100644 (file)
@@ -342,9 +342,8 @@ class TestCephadm(object):
                 'value': '127.0.0.0/8'
             })
 
-            cephadm_module.cache.update_host_devices_networks(
+            cephadm_module.cache.update_host_networks(
                 'test',
-                [],
                 {
                     "127.0.0.0/8": [
                         "127.0.0.1"
@@ -615,7 +614,7 @@ spec:
                 ),
             ])
 
-            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+            cephadm_module.cache.update_host_devices('test', inventory.devices)
 
             _run_cephadm.return_value = (['{}'], '', 0)
 
@@ -653,7 +652,7 @@ spec:
                 Device('/dev/sdd', available=True)
             ])
 
-            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+            cephadm_module.cache.update_host_devices('test', inventory.devices)
 
             _run_cephadm.return_value = (['{}'], '', 0)
 
index fa90ae70c08179df10c6c7b5d6235d73ef1499f2..ce35672e49d9dbe0131c8a8a899e46e043320f4e 100644 (file)
@@ -4,7 +4,7 @@ from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlaceme
 from ceph.utils import datetime_to_str, datetime_now
 from cephadm import CephadmOrchestrator
 from cephadm.inventory import SPEC_STORE_PREFIX
-from cephadm.tests.fixtures import _run_cephadm, wait, with_host
+from cephadm.tests.fixtures import _run_cephadm, wait, with_host, receive_agent_metadata_all_hosts
 from cephadm.serve import CephadmServe
 from tests import mock
 
@@ -29,6 +29,7 @@ def test_migrate_scheduler(cephadm_module: CephadmOrchestrator):
             assert cephadm_module.migration_current == 0
 
             CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+            receive_agent_metadata_all_hosts(cephadm_module)
             cephadm_module.migration.migrate()
 
             CephadmServe(cephadm_module)._apply_all_services()
index df1cd3b563e796d5a0476729491afca22400c52f..690385878bdf6923e5bb9de48b488508865027d0 100644 (file)
@@ -6,9 +6,9 @@ import pytest
 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
 from cephadm import CephadmOrchestrator
 from cephadm.upgrade import CephadmUpgrade
-from cephadm.serve import CephadmServe
 from orchestrator import OrchestratorError, DaemonDescription
-from .fixtures import _run_cephadm, wait, with_host, with_service
+from .fixtures import _run_cephadm, wait, with_host, with_service, \
+    receive_agent_metadata
 
 
 @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@@ -95,7 +95,8 @@ def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator):
                         )
                     ])
                 )):
-                    CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+                    receive_agent_metadata(cephadm_module, 'host1', ['ls'])
+                    receive_agent_metadata(cephadm_module, 'host2', ['ls'])
 
                 with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
                     'image_id': 'image_id',
index fad632687e2c388e47db3bd3c7c8e7d8842efc8e..7fc2596d2fc6135a5058857cbf2047e516483ad4 100644 (file)
@@ -469,11 +469,17 @@ class CephadmUpgrade:
         target_digests = self.upgrade_state.target_digests
         target_version = self.upgrade_state.target_version
 
+        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+            # need to wait for metadata to come in
+            self.mgr.agent_helpers._request_agent_acks(
+                set([h for h in self.mgr.cache.get_hosts() if not self.mgr.cache.host_metadata_up_to_date(h)]))
+            return
+
         first = False
         if not target_id or not target_version or not target_digests:
             # need to learn the container hash
             logger.info('Upgrade: First pull of %s' % target_image)
-            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
+            self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image)
             try:
                 target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
                     target_image)
index 34beb5e2a3fb640cfef8e1f3e2296a9f8011d40c..accd8dca0a929d5ab9f9cb0d49b62c22746feb4e 100644 (file)
@@ -709,6 +709,7 @@ def daemon_type_to_service(dtype: str) -> str:
         'crashcollector': 'crash',  # Specific Rook Daemon
         'container': 'container',
         'cephadm-exporter': 'cephadm-exporter',
+        'agent': 'agent'
     }
     return mapping[dtype]
 
@@ -732,6 +733,7 @@ def service_to_daemon_types(stype: str) -> List[str]:
         'crash': ['crash'],
         'container': ['container'],
         'cephadm-exporter': ['cephadm-exporter'],
+        'agent': ['agent']
     }
     return mapping[stype]
 
index e8c47b4e47b7fcc6fa23fd63c9ed7587ac2e4fc0..e8978a0c4e7d10fb217ee9d6d75100018caa73ae 100644 (file)
@@ -414,7 +414,7 @@ class ServiceSpec(object):
 
     """
     KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \
-                          'node-exporter osd prometheus rbd-mirror rgw ' \
+                          'node-exporter osd prometheus rbd-mirror rgw agent ' \
                           'container cephadm-exporter ingress cephfs-mirror'.split()
     REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container ingress '.split()
     MANAGED_CONFIG_OPTIONS = [