From 95b105c0a52e67f3d76a5e9e3b74466126352bb1 Mon Sep 17 00:00:00 2001
From: Paul Cuzner
Date: Thu, 10 Sep 2020 09:13:01 +1200
Subject: [PATCH] cephadm: initial webservice implementation

Adds a cephadmd subcommand to start a web service that exposes the
gather-facts and list-daemons data over HTTP.

Multiple threads are used - one for the main http service, and two
others, one for each of the data gathering calls. Once data is
gathered it is held in cache and used to respond to the HTTP GET
requests.

Signed-off-by: Paul Cuzner
---
 src/cephadm/cephadm | 219 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 218 insertions(+), 1 deletion(-)

diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm
index b8e4f19f18214..8f2b411ed1575 100755
--- a/src/cephadm/cephadm
+++ b/src/cephadm/cephadm
@@ -59,6 +59,10 @@ import tempfile
 import time
 import errno
 import struct
+from socketserver import ThreadingMixIn
+from http.server import BaseHTTPRequestHandler, HTTPServer
+import signal
+
 try:
     from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
 except ImportError:
@@ -67,7 +71,7 @@ import uuid
 
 from functools import wraps
 from glob import glob
-from threading import Thread
+from threading import Thread, RLock
 
 if sys.version_info >= (3, 0):
     from io import StringIO
@@ -93,6 +97,12 @@ cached_stdin = None
 
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
 
+cephadm_cache = {
+    "health": {},
+    "host": {},
+    "daemons": {},
+}
+cephadm_cache_lock = RLock()
 
 class termcolor:
     yellow = '\033[93m'
@@ -5212,6 +5222,185 @@ def command_gather_facts():
 
 ##################################
 
+class CephadmDaemonHandler(BaseHTTPRequestHandler):
+    def do_GET(self):
+        # return the current state of the cache
+        self.send_response(200)
+        self.send_header('Content-type', 'application/json')
+        self.end_headers()
+        self.wfile.write(bytes(json.dumps(cephadm_cache), 'ascii'))
+
+    def log_message(self, format, *args):
+        rqst = " ".join(args)
+        logger.info(f"client:{self.address_string()} [{self.log_date_time_string()}] {rqst}")
+
+
+class CephadmHTTPServer(ThreadingMixIn, HTTPServer):
+    allow_reuse_address = True
+    daemon_threads = True
+
+
+def cephadmd_install():
+    # open firewall port
+    # copy self to /usr/bin somewhere
+    # create the systemd unit file
+    # enable the systemd unit file
+    # start the systemd unit
+    pass
+
+def cephadmd_uninstall():
+    # shutdown the systemd unit
+    # disable it and remove it
+    # close the firewall port
+    pass
+
+def cephadmd_upgrade():
+    # copy self to the /usr/bin
+    # restart the systemd unit
+    pass
+
+
+class CephadmDaemon():
+    def __init__(self, fsid, port=None):
+        self.fsid = fsid
+        self.port = port
+        self.workers = []
+        self.http_server = None
+        self.stop = False
+
+    @property
+    def port_active(self):
+        return port_in_use(self.port)
+
+    @property
+    def can_run(self):
+        return not self.port_active
+
+    def _scrape_host_facts(self, refresh_interval=10):
+        loop_delay = 0.1
+        ctr = 0
+        while True:
+
+            if self.stop:
+                break
+
+            if ctr >= refresh_interval:
+                ctr = 0
+                logger.debug("executing host_facts scrape")
+                s_time = time.time()
+                facts = HostFacts()
+                elapsed = time.time() - s_time
+                data = json.loads(facts.dump())
+                with cephadm_cache_lock:
+                    cephadm_cache['host'] = {
+                        "scrape_timestamp": s_time,
+                        "scrape_duration_secs": elapsed,
+                        "data": data,
+                    }
+                logger.debug(f"completed host-facts scrape - {elapsed}s")
+
+            time.sleep(loop_delay)
+            ctr += loop_delay
+        logger.info("host-facts thread stopped")
+
+    def _scrape_list_daemons(self, refresh_interval=20):
+        loop_delay = 0.1
+        ctr = 0
+        while True:
+            if self.stop:
+                break
+
+            if ctr >= refresh_interval:
+                ctr = 0
+                logger.debug("executing list_daemons scrape")
+                s_time = time.time()
+                ld = list_daemons()
+                elapsed = time.time() - s_time
+                with cephadm_cache_lock:
+                    cephadm_cache['daemons'] = {
+                        "scrape_timestamp": s_time,
+                        "scrape_duration_secs": elapsed,
+                        "data": ld,
+                    }
+                logger.debug(f"completed list_daemons scrape - {elapsed}s")
+
+            time.sleep(loop_delay)
+            ctr += loop_delay
+        logger.info("list-daemons thread stopped")
+
+    def _create_thread(self, target, name):
+        t = Thread(target=target)
+        t.daemon = True
+        t.name = name
+        with cephadm_cache_lock:
+            cephadm_cache['health'][name] = "active"
+        t.start()
+        logger.info(f"Started '{name}' thread")
+        return t
+
+    def create_daemon_dirs(self):
+        pass
+
+    def reload(self, *args):
+        logger.info("caught a reload")
+
+    def shutdown(self, *args):
+        logger.info("Shutting down")
+        self.stop = True
+        self.http_server.shutdown()
+
+    def run(self):
+        logger.info(f"cephadmd starting for FSID '{self.fsid}'")
+        if not self.can_run:
+            logger.error("Unable to start")
+            return
+
+        # register signal handlers for running under systemd control
+        signal.signal(signal.SIGTERM, self.shutdown)
+        signal.signal(signal.SIGINT, self.shutdown)
+        signal.signal(signal.SIGHUP, self.reload)
+        logger.debug("Signal handlers attached")
+
+        host_facts = self._create_thread(self._scrape_host_facts, 'host_facts')
+        self.workers.append(host_facts)
+
+        daemons = self._create_thread(self._scrape_list_daemons, 'list_daemons')
+        self.workers.append(daemons)
+
+        self.http_server = CephadmHTTPServer(('0.0.0.0', self.port), CephadmDaemonHandler)  # IPv4 only
+        server_thread = self._create_thread(self.http_server.serve_forever, 'http_server')
+        logger.info(f"http server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}")
+        # run an event driven loop to check the scrape thread is active
+        # if not, start it again
+        # if started >5 times in 10 seconds, abort with an error
+        while server_thread.is_alive():
+
+            for worker in self.workers:
+                if not worker.is_alive():
+                    logger.warning(f"{worker.name} thread not running")
+                    with cephadm_cache_lock:
+                        # update health in the cache
+                        cephadm_cache['health'][worker.name] = "inactive"
+                else:
+                    pass
+            time.sleep(10)
+
+
+def command_cephadmd():
+
+    if args.start:
+        data_dir = '/var/lib/ceph'
+        if args.fsid not in os.listdir(data_dir):
+            raise Error(f"cluster fsid '{args.fsid}' not found in '{data_dir}'")
+
+        cephadmd = CephadmDaemon(args.fsid, args.port)
+        cephadmd.run()
+
+
+##################################
+
+
 def _get_parser():
     # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(
@@ -5723,6 +5912,34 @@
         'gather-facts', help='gather and return host related information (JSON format)')
     parser_gather_facts.set_defaults(func=command_gather_facts)
 
+    parser_cephadmd = subparsers.add_parser(
+        'cephadmd', help='Manage cephadm running as a service')
+    parser_cephadmd.add_argument(
+        '--install',
+        action='store_true',
+        help='install the cephadmd exporter service')
+    parser_cephadmd.add_argument(
+        '--start',
+        default=False,
+        action='store_true',
+        help='start the cephadmd service')
+    parser_cephadmd.add_argument(
+        '--fsid',
+        required=True,
+        type=str,
+        default=5003,
+        help='fsid of the cephadmd to run against')
+    parser_cephadmd.add_argument(
+        '--port',
+        type=int,
+        default=5003,
+        help='port number for the cephadmd service')
+    parser_cephadmd.add_argument(
+        '--uninstall',
+        action='store_true',
+        help='uninstall the cephadmd service')
+    parser_cephadmd.set_defaults(func=command_cephadmd)
+
     return parser
-- 
2.39.5
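
For reference, a minimal sketch of how the new endpoint could be exercised once
the service has been started (e.g. "cephadm cephadmd --fsid <fsid> --start",
listening on the default --port of 5003). The handler answers any GET with the
whole cephadm_cache dict as JSON; the URL, timeout and print calls below are
illustrative assumptions and not part of the patch itself.

#!/usr/bin/python3
# Illustrative cephadmd client - assumes the service introduced by this patch
# is already running locally on the default port 5003.
import json
import urllib.request

URL = "http://localhost:5003/"  # the daemon binds 0.0.0.0 (IPv4 only) on --port

with urllib.request.urlopen(URL, timeout=5) as rsp:
    cache = json.loads(rsp.read().decode())

# do_GET dumps the whole cephadm_cache: thread health plus the two scrape
# sections, each carrying a timestamp, duration and data payload.
print("threads:", cache["health"])
for section in ("host", "daemons"):
    scrape = cache.get(section, {})
    print(f"{section}: scraped at {scrape.get('scrape_timestamp')} "
          f"in {scrape.get('scrape_duration_secs')}s")

Until the scrape threads have completed their first pass (refresh_interval of
10s for host facts, 20s for list_daemons) the host and daemons sections are
still the empty dicts the cache was initialised with.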