]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: implement a webservice to offer host metadata to mgr
authorPaul Cuzner <pcuzner@redhat.com>
Fri, 11 Sep 2020 05:42:55 +0000 (17:42 +1200)
committerPaul Cuzner <pcuzner@redhat.com>
Fri, 11 Sep 2020 05:42:55 +0000 (17:42 +1200)
Gathering the data on a host is time consuming, so this patch give
cephadm a daemon mode that handles the data gathering tasks
itself, so mgr can just ping an API endpoint. This provides better
scale and improves the data currency to orchestrator.

This patch introduces a couple of new classes to cephadm to handle
daemon mode supporting by some compatibility patches to allow
it to integrate with existing function flows.

Signed-off-by: Paul Cuzner <pcuzner@redhat.com>
src/cephadm/cephadm

index 8f2b411ed1575d54913c2a335e518f5827fd84ab..bde62848c240cd473234ba401f6cd02f2f3ff625 100755 (executable)
@@ -62,6 +62,9 @@ import struct
 from socketserver import ThreadingMixIn
 from http.server import BaseHTTPRequestHandler, HTTPServer
 import signal
+import io
+from contextlib import redirect_stdout
+
 
 try:
     from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
@@ -101,6 +104,7 @@ cephadm_cache = {
     "health": {},
     "host": {},
     "daemons": {},
+    "disks": {},
 }
 cephadm_cache_lock = RLock()
 
@@ -502,6 +506,7 @@ def get_supported_daemons():
     supported_daemons.extend(Monitoring.components)
     supported_daemons.append(NFSGanesha.daemon_type)
     supported_daemons.append(CephIscsi.daemon_type)
+    supported_daemons.append(CephadmDaemon.daemon_type)
     assert len(supported_daemons) == len(set(supported_daemons))
     return supported_daemons
 
@@ -1941,8 +1946,17 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
             config, keyring)
 
     if not reconfig:
-        deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
-                            osd_fsid=osd_fsid)
+        if daemon_type == CephadmDaemon.daemon_type:
+            if ports:
+                port = ports[0]
+            else:
+                port = CephadmDaemon.default_port
+                ports = [port]
+            cephadmd = CephadmDaemon(fsid, daemon_id, port)
+            cephadmd.deploy_daemon_unit()
+        else:
+            deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
+                                osd_fsid=osd_fsid)
 
     if not os.path.exists(data_dir + '/unit.created'):
         with open(data_dir + '/unit.created', 'w') as f:
@@ -2073,11 +2087,12 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
         os.rename(data_dir + '/unit.poststop.new',
                   data_dir + '/unit.poststop')
 
-    with open(data_dir + '/unit.image.new', 'w') as f:
-        f.write(c.image + '\n')
-        os.fchmod(f.fileno(), 0o600)
-        os.rename(data_dir + '/unit.image.new',
-                  data_dir + '/unit.image')
+    if c:
+        with open(data_dir + '/unit.image.new', 'w') as f:
+            f.write(c.image + '\n')
+            os.fchmod(f.fileno(), 0o600)
+            os.rename(data_dir + '/unit.image.new',
+                    data_dir + '/unit.image')
 
     # systemd
     install_base_units(fsid)
@@ -2166,6 +2181,26 @@ class Firewalld(object):
             else:
                 logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
 
+    def close_ports(self, fw_ports):
+        # type: (List[int]) -> None
+        if not self.available:
+            logger.debug('Not possible to close ports <%s>. firewalld.service is not available' % fw_ports)
+            return
+
+        for port in fw_ports:
+            tcp_port = str(port) + '/tcp'
+            out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbose_on_failure=False)
+            if not ret:
+                logger.info('Disabling port %s in current zone...' % tcp_port)
+                out, err, ret = call([self.cmd, '--permanent', '--remove-port', tcp_port])
+                if ret:
+                    raise RuntimeError('unable to remove port %s from current zone: %s' %
+                                    (tcp_port, err))
+                else:
+                    logger.info(f"Port {tcp_port} disabled")
+            else:
+                logger.info(f"firewalld port {tcp_port} already closed")
+
     def apply_rules(self):
         # type: () -> None
         if not self.available:
@@ -3205,6 +3240,14 @@ def command_deploy():
                       config=config, keyring=keyring,
                       reconfig=args.reconfig,
                       ports=daemon_ports)
+
+    elif daemon_type == CephadmDaemon.daemon_type:
+        # get current user gid and uid
+        uid = os.getuid()
+        gid = os.getgid()
+        deploy_daemon(args.fsid, daemon_type, daemon_id, None,
+                      uid, gid, ports=daemon_ports)
+
     else:
         raise Error("{} not implemented in command_deploy function".format(daemon_type))
 
@@ -3361,7 +3404,7 @@ def command_ceph_volume():
         privileged=True,
         volume_mounts=mounts,
     )
-    out, err, code = call_throws(c.run_cmd(), verbose=True)
+    out, err, code = call_throws(c.run_cmd(), verbose=not args.no_log_output)
     if not code:
         print(out)
 
@@ -4043,10 +4086,13 @@ def command_rm_daemon():
 
     l = FileLock(args.fsid)
     l.acquire()
+    (daemon_type, daemon_id) = args.name.split('.', 1)
+    if daemon_type == CephadmDaemon.daemon_type:
+        unit_name = f'ceph-{args.fsid}-cephadm.{daemon_id}.service'
+    else:
+        unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
 
-    unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
 
-    (daemon_type, daemon_id) = args.name.split('.', 1)
     if daemon_type in ['mon', 'osd'] and not args.force:
         raise Error('must pass --force to proceed: '
                       'this command may destroy precious data!')
@@ -4070,6 +4116,8 @@ def command_rm_daemon():
                   os.path.join(backup_dir, dirname))
     else:
         call_throws(['rm', '-rf', data_dir])
+        if daemon_type == CephadmDaemon.daemon_type:
+            CephadmDaemon.uninstall(args.fsid, daemon_type, daemon_id)
 
 ##################################
 
@@ -5240,29 +5288,16 @@ class CephadmHTTPServer(ThreadingMixIn, HTTPServer):
     daemon_threads = True
 
 
-def cephadmd_install():
-    # open firewall port
-    # copy self to /usr/bin somewhere
-    # create the systemd unit file
-    # enable the systemd unit file
-    # start the systemd unit
-    pass
-
-def cephadmd_uninstall():
-    # shutdown the systemd unit
-    # disable it and remove it
-    # close the firewall port
-    pass
-
-def cephadmd_upgrade():
-    # copy self to the /usr/bin
-    # restart the systemd unit
-    pass
-
-
 class CephadmDaemon():
-    def __init__(self, fsid, port=None):
+    daemon_type = "cephadm"
+    default_port = 5003
+    bin_name = 'cephadm'
+    loop_delay = 0.1
+    thread_check_interval = 5
+
+    def __init__(self, fsid, daemon_id, port=5003):
         self.fsid = fsid
+        self.daemon_id = daemon_id 
         self.port = port
         self.workers = []
         self.http_server = None
@@ -5275,9 +5310,26 @@ class CephadmDaemon():
     @property
     def can_run(self):
         return not self.port_active
-    
+
+    @staticmethod
+    def _unit_name(fsid, daemon_id):
+        return f"ceph-{fsid}-cephadm.{daemon_id}.service"
+
+    @property
+    def unit_name(self):
+        # Our name here will cause a problem with the the old list_daemons code, 
+        # since that only expects daemons to be containers and cephadmd is not!
+        return CephadmDaemon._unit_name(self.fsid, self.daemon_id)
+
+    @property
+    def binary_path(self):
+        return os.path.join(
+                    args.data_dir,
+                    self.fsid,
+                    f'cephadm.{self.daemon_id}',
+                    CephadmDaemon.bin_name)
+
     def _scrape_host_facts(self, refresh_interval=10):
-        loop_delay = 0.1
         ctr = 0
         while True:
             
@@ -5286,25 +5338,79 @@ class CephadmDaemon():
 
             if ctr >= refresh_interval:
                 ctr = 0
-                logger.debug("executing host_facts scrape")
+                logger.debug("executing host-facts scrape")
+                errors = []
                 s_time = time.time()
                 facts = HostFacts()
                 elapsed = time.time() - s_time
-                data = json.loads(facts.dump())
+                try:
+                    data = json.loads(facts.dump())
+                except json.decoder.JSONDecodeError:
+                    errors.append("host-facts received invalid JSON")
+                    logger.warning(errors[-1])
+                    data = {}
                 with cephadm_cache_lock:
                     cephadm_cache['host'] = {
                         "scrape_timestamp": s_time,
                         "scrape_duration_secs": elapsed,
+                        "scrape_errors": errors,
                         "data": data,                    
                     }
                 logger.debug(f"completed host-facts scrape - {elapsed}s")
             
-            time.sleep(loop_delay)
-            ctr += loop_delay
+            time.sleep(CephadmDaemon.loop_delay)
+            ctr += CephadmDaemon.loop_delay
         logger.info("host-facts thread stopped")
 
+    def _scrape_ceph_volume(self, refresh_interval=15):
+        args.command = "inventory --format=json".split()
+        args.fsid = self.fsid
+        args.no_log_output = True
+        ctr = 0
+        while True:
+            if self.stop:
+                break
+            if ctr >= refresh_interval:
+                ctr = 0
+                logger.debug("executing ceph-volume scrape")
+                errors = []
+                s_time = time.time()
+                stream = io.StringIO()
+
+                with redirect_stdout(stream):
+                    command_ceph_volume()
+                elapsed = time.time() - s_time
+
+                # if the call to ceph-volume returns junk with the 
+                # json, it won't parse
+                stdout = stream.getvalue()
+
+                if stdout:
+                    try:
+                        data = json.loads(stdout)
+                    except json.decoder.JSONDecodeError:
+                        errors.append("ceph-volume thread received bad json data")
+                        logger.warning(errors[-1])
+                        data = []
+                else:
+                    errors.append("ceph-volume didn't return any data")
+                    logger.warning(errors[-1])
+
+                with cephadm_cache_lock:
+                    cephadm_cache['disks'] = {
+                        "scrape_timestamp": s_time,
+                        "scrape_duration_secs": elapsed,
+                        "scrape_errors": errors,
+                        "data": data, 
+                    }
+                
+                logger.debug(f"completed ceph-volume scrape - {elapsed}s")
+            time.sleep(CephadmDaemon.loop_delay)
+            ctr += CephadmDaemon.loop_delay
+        
+        logger.info("ceph-volume thread stopped")
+
     def _scrape_list_daemons(self, refresh_interval=20):
-        loop_delay = 0.1
         ctr = 0
         while True:
             if self.stop:
@@ -5312,7 +5418,8 @@ class CephadmDaemon():
             
             if ctr >= refresh_interval:
                 ctr = 0
-                logger.debug("executing list_daemons scrape")
+                logger.debug("executing list-daemons scrape")
+                errors = []
                 s_time = time.time()
                 ld = list_daemons()
                 elapsed = time.time() - s_time
@@ -5320,32 +5427,38 @@ class CephadmDaemon():
                     cephadm_cache['daemons'] = {
                         "scrape_timestamp": s_time,
                         "scrape_duration_secs": elapsed,
+                        "scrape_errors": errors,
                         "data": ld,
                     }
-                logger.debug(f"completed list_daemons scrape - {elapsed}s")
+                logger.debug(f"completed list-daemons scrape - {elapsed}s")
             
-            time.sleep(loop_delay)
-            ctr += loop_delay
+            time.sleep(CephadmDaemon.loop_delay)
+            ctr += CephadmDaemon.loop_delay
         logger.info("list-daemons thread stopped")
 
-    def _create_thread(self, target, name):
-        t = Thread(target=target)
+    def _create_thread(self, target, name, refresh_interval=None):
+        if refresh_interval:
+            t = Thread(target=target, args=(refresh_interval,))
+        else:
+            t = Thread(target=target)
         t.daemon = True
         t.name = name
         with cephadm_cache_lock:
             cephadm_cache['health'][name] = "active"
         t.start()
-        logger.info(f"Started '{name}' thread")
-        return t
 
-    def create_daemon_dirs(self):
-        pass
+        start_msg = f"Started {name} thread"
+        if refresh_interval:
+            logger.info(f"{start_msg}, with a refresh interval of {refresh_interval}s")
+        else:
+            logger.info(f"{start_msg}")
+        return t
 
     def reload(self, *args):
-        logger.info("caught a reload")
+        logger.info("Reload request received")
 
     def shutdown(self, *args):
-        logger.info("Shutting down")
+        logger.info("Shutdown request received")
         self.stop = True
         self.http_server.shutdown()
 
@@ -5361,40 +5474,131 @@ class CephadmDaemon():
         signal.signal(signal.SIGHUP, self.reload)
         logger.debug("Signal handlers attached")
 
-        host_facts = self._create_thread(self._scrape_host_facts, 'host_facts')
+        host_facts = self._create_thread(self._scrape_host_facts, 'host_facts', 5)
         self.workers.append(host_facts)
         
-        daemons = self._create_thread(self._scrape_list_daemons, 'list_daemons')
+        daemons = self._create_thread(self._scrape_list_daemons, 'list_daemons', 20)
         self.workers.append(daemons)
 
+        disks = self._create_thread(self._scrape_ceph_volume, 'ceph_volume', 20)
+        self.workers.append(disks)
+
         self.http_server = CephadmHTTPServer(('0.0.0.0', self.port), CephadmDaemonHandler)  # IPv4 only
         server_thread = self._create_thread(self.http_server.serve_forever, 'http_server')
         logger.info(f"http server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}")
-        # run an event driven loop to check the scrape thread is active
-        # if not, start it again
-        # if started >5 times in 10 seconds, abort with an error
+        
+        ctr = 0
         while server_thread.is_alive():
+            if self.stop:
+                break
 
-            for worker in self.workers:
-                if not worker.is_alive():
-                    logger.warning(f"{worker.name} thread not running")
-                    with cephadm_cache_lock:
-                        # update health in the cache
-                        cephadm_cache['health'][worker.name] = "inactive"
-                else:
-                    pass
-            time.sleep(10)
+            if ctr >= CephadmDaemon.thread_check_interval:
+                ctr = 0
+                for worker in self.workers:
+                    if not worker.is_alive():
+                        logger.warning(f"{worker.name} thread not running")
+                        with cephadm_cache_lock:
+                            # update health in the cache
+                            cephadm_cache['health'][worker.name] = "inactive"
+        
+            time.sleep(CephadmDaemon.loop_delay)
+            ctr += CephadmDaemon.loop_delay
+
+        logger.info("Main http server thread stopped")
+    
+    @property
+    def unit_file(self):
+        py3 = shutil.which('python3')
+        return """#generated by cephadm
+        [Unit]
+Description=cephadm data service (web interface) for cluster {fsid}
+After=network-online.target
+Wants=network-online.target
+
+PartOf=ceph-{fsid}.target
+Before=ceph-{fsid}.target
+
+[Service]
+Type=simple
+ExecStart={py3} {bin_path} cephadmd --fsid {fsid} --port {port}
+Restart=on-failure
+RestartSec=10s
+
+[Install]
+WantedBy=ceph-{fsid}.target
+""".format(
+        py3=py3,
+        fsid=self.fsid,
+        bin_path=self.binary_path,
+        port=self.port)
+
+    def deploy_daemon_unit(self):
+        """deploy a specific unit file for cephadm
+
+        the normal deploy_daemon_units doesn't apply for this
+        daemon since it's not a container, so we just create a
+        simple service definition and add it to the fsid's target
+        """
+        shutil.copy(__file__,
+                    self.binary_path)
+
+        with open(os.path.join(args.unit_dir, f"{self.unit_name}.new"), "w") as f:
+            f.write(self.unit_file)
+            os.rename(
+                os.path.join(args.unit_dir, f"{self.unit_name}.new"),
+                os.path.join(args.unit_dir, self.unit_name))
+        
+        call_throws(['systemctl', 'daemon-reload'])
+        call(['systemctl', 'stop', self.unit_name],
+            verbose_on_failure=False)
+        call(['systemctl', 'reset-failed', self.unit_name],
+            verbose_on_failure=False)
+        call_throws(['systemctl', 'enable', '--now', self.unit_name])
+
+    @classmethod
+    def uninstall(cls, fsid, daemon_type, daemon_id):
+        unit_name = CephadmDaemon._unit_name(fsid, daemon_id)
+        unit_path = os.path.join(args.unit_dir, unit_name)
+        try: 
+            with open(unit_path, "r") as u:
+                contents = u.read().strip()
+        except OSError:
+            logger.warning(f"Unable to access the systemd file @ {unit_path}")
+            return
+
+        for line in contents.split('\n'):
+            if '--port ' in line:
+                try:
+                    port = int(line.split('--port ')[-1])
+                except ValueError:
+                    logger.warning("Unexpected format in unit file: port is not numeric")
+                    logger.warning("Unable to remove the systemd file and close the port")
+                    return
+                break
+
+        if port:
+            fw = Firewalld()
+            try:
+                fw.close_ports([port])
+            except RuntimeError:
+                logger.error(f"Unable to close port {port}")
+
+        stdout, stderr, rc = call(["rm", "-f", unit_path])
+        if rc:
+            logger.error(f"Unable to remove the systemd file @ {unit_path}")
+        else:
+            logger.info(f"removed systemd unit file @ {unit_path}")
+            stdout, stderr, rc = call(["systemctl", "daemon-reload"])
 
 
 def command_cephadmd():
     
-    if args.start:
-        data_dir = '/var/lib/ceph'
-        if args.fsid not in os.listdir(data_dir):
-            raise Error(f"cluster fsid '{args.fsid}' not found in '{data_dir}'")
-    
-        cephadmd = CephadmDaemon(args.fsid, args.port)
-        cephadmd.run()
+    cephadmd = CephadmDaemon(args.fsid, args.port)
+
+    if args.fsid not in os.listdir(args.data_dir):
+        raise Error(f"cluster fsid '{args.fsid}' not found in '{args.data_dir}'")
+            
+    cephadmd.run()
         
 
 
@@ -5616,6 +5820,10 @@ def _get_parser():
     parser_ceph_volume.add_argument(
         '--keyring', '-k',
         help='ceph.keyring to pass through to the container')
+    parser_ceph_volume.add_argument(
+        '--no-log-output',
+        action='store_true',
+        help='suppress ceph volume output from the log')
     parser_ceph_volume.add_argument(
         'command', nargs=argparse.REMAINDER,
         help='command')
@@ -5913,31 +6121,17 @@ def _get_parser():
     parser_gather_facts.set_defaults(func=command_gather_facts)
 
     parser_cephadmd = subparsers.add_parser(
-        'cephadmd', help='Manage cephadm running as a service')
-    parser_cephadmd.add_argument(
-        '--install',
-        action='store_true',
-        help='install the cephadmd exporter service')
-    parser_cephadmd.add_argument(
-        '--start',
-        default=False,
-        action='store_true',
-        help='start the cephadmd service')
+        'cephadmd', help='Start cephadm as a web service, providing host/daemon metadata')
     parser_cephadmd.add_argument(
         '--fsid',
         required=True,
         type=str,
-        default=5003,
         help='fsid of the cephadmd to run against')
     parser_cephadmd.add_argument(
         '--port',
         type=int,
         default=5003,
         help='port number for the cephadmd service')
-    parser_cephadmd.add_argument(
-        '--uninstall',
-        action='store_true',        
-        help='uninstall the cephadmd service')
     parser_cephadmd.set_defaults(func=command_cephadmd)
 
     return parser