From b014cf525b76187a277d7c3b37e022b80fb26d6f Mon Sep 17 00:00:00 2001 From: Paul Cuzner Date: Thu, 26 Nov 2020 13:02:54 +1300 Subject: [PATCH] cephadm: updates to cephadm configuration and runtime behaviour Multiple changes based on PR feedback - invocation of set-exporter-config now sets all paramters - customvalidation of exporter-config now throws argparse exceptions - config validation now checks for SSL crt/key format - failing threads are now caught and change the http response to the caller (500, and 503 may not be returned) - bootstrap options descriptions tidied up Signed-off-by: Paul Cuzner --- src/cephadm/cephadm | 263 ++++++++++++++++++++++++++++---------------- 1 file changed, 166 insertions(+), 97 deletions(-) diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index b63f3928d76f0..e6711f22fc5e0 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -3233,26 +3233,18 @@ def command_bootstrap(): if args.with_exporter: cli(['config-key', 'set', 'mgr/cephadm/exporter_enabled', 'true']) if args.exporter_config: - logger.info("Applying custom SSL/port settings for the cephadm exporter") + logger.info("Applying custom cephadm exporter settings") # validated within the parser, so we can just apply to the store - - # create a tmp file wwrite the config to itthen set it with tempfile.NamedTemporaryFile(buffering=0) as tmp: - tmp.write(json.dumps({ - CephadmDaemon.crt_name: args.exporter_config[CephadmDaemon.crt_name], - CephadmDaemon.key_name: args.exporter_config[CephadmDaemon.key_name] - })) + tmp.write(json.dumps(args.exporter_config).encode('utf-8')) mounts = { - tmp.name: '/tmp/exporter_tls.json:z' + tmp.name: "/tmp/exporter-config.json:z" } - cli(["cephadm", "set-exporter-tls", "-i", "/tmp/exporter_tls.json"], extra_mounts=mounts) - - cli(["cephadm", "set-exporter-config", f"token='{args.exporter_config[CephadmDaemon.token_name]}'"]) - port = args.exporter_config.get('port', CephadmDaemon.default_port) - cli(["cephadm", "set-exporter-config", f"port={str(port)}"]) + cli(["cephadm", "set-exporter-config", "-i", "/tmp/exporter-config.json"], extra_mounts=mounts) + logger.info("-> Use ceph orch apply cephadm-exporter to deploy") else: # generate a default SSL configuration for the exporter(s) - logger.info("Generating a default self-signed SSL cert/key for the cephadm exporter") + logger.info("Generating a default cephadm exporter configuration (self-signed)") cli(['cephadm', 'generate-exporter-config']) # # deploy the service (commented out until the cephadm changes are in the ceph container build) @@ -4584,7 +4576,13 @@ class CustomValidation(argparse.Action): setattr(namespace, self.dest, values) elif self.dest == 'exporter_config': cfg = get_parm(values) - CephadmDaemon.validate_config(cfg) + # run the class' validate method, and convert to an argparse error + # if problems are found + try: + CephadmDaemon.validate_config(cfg) + except Error as e: + raise argparse.ArgumentError(self, + str(e)) setattr(namespace, self.dest, cfg) ################################## @@ -5590,8 +5588,13 @@ class CephadmCache: def update_task(self, task_type, content): assert task_type in CephadmCache.task_types + assert isinstance(content, dict) with self.lock: - setattr(self, task_type, content) + current = getattr(self, task_type) + for k in content: + current[k] = content[k] + + setattr(self, task_type, current) class CephadmHTTPServer(ThreadingMixIn, HTTPServer): @@ -5679,18 +5682,41 @@ td,th {{ elif self.path in CephadmDaemonHandler.valid_routes: u = self.path.split('/')[-1] data = json.dumps({}) + status_code = 200 + + tasks = self.server.cephadm_cache.health.get('tasks', {}) + assert tasks + + # We're using the http status code to help indicate thread health + # - all threads inactive returns a 500 (Internal Server Error) + # - some threads inactive returns a 503 (Service Unavailable) if u == 'metadata': data = json.dumps(self.server.cephadm_cache.to_json()) + if all([tasks[task_name] == 'inactive' for task_name in tasks if task_name != 'http_server']): + # the subtasks are dead + status_code = 500 + elif any([tasks[task_name] == 'inactive' for task_name in tasks if task_name != 'http_server']): + status_code = 503 + + # Individual GETs against the a tasks endpoint will also return a 503 if the corresponding thread is inactive elif u == 'daemons': data = json.dumps(self.server.cephadm_cache.daemons) + if tasks['daemons'] == 'inactive': + status_code = 503 elif u == 'disks': data = json.dumps(self.server.cephadm_cache.disks) - elif u == 'health': - data = json.dumps(self.server.cephadm_cache.health) + if tasks['disks'] == 'inactive': + status_code = 503 elif u == 'host': data = json.dumps(self.server.cephadm_cache.host) - - self.send_response(200) + if tasks['host'] == 'inactive': + status_code = 503 + + # a GET against health will always return a 200, since the op is always successful + elif u == 'health': + data = json.dumps(self.server.cephadm_cache.health) + + self.send_response(status_code) self.send_header('Content-type','application/json') self.end_headers() self.wfile.write(data.encode('utf-8')) @@ -5742,15 +5768,22 @@ class CephadmDaemon(): reqs = ", ".join(CephadmDaemon.config_requirements) errors = [] - if not config or not all(k_name in config for k_name in CephadmDaemon.config_requirements): - raise Error("config must contain the following fields : {}".format(reqs)) + if not config or not all([k_name in config for k_name in CephadmDaemon.config_requirements]): + raise Error(f"config must contain the following fields : {reqs}") - if not isinstance(config[CephadmDaemon.crt_name], str) or not isinstance(config[CephadmDaemon.key_name], str): - errors.append("crt and key fields must be 'str' types") + if not all([isinstance(config[k_name], str) for k_name in CephadmDaemon.config_requirements]): + errors.append(f"the following fields must be strings: {reqs}") + crt = config[CephadmDaemon.crt_name] + key = config[CephadmDaemon.key_name] token = config[CephadmDaemon.token_name] - if len(token) < 8 or not isinstance(token, str): - errors.append("token must be of type 'str' and more than 8 characters long") + + if not crt.startswith('-----BEGIN CERTIFICATE-----') or not crt.endswith('-----END CERTIFICATE-----\n'): + errors.append("crt field is not a valid SSL certificate") + if not key.startswith('-----BEGIN PRIVATE KEY-----') or not key.endswith('-----END PRIVATE KEY-----\n'): + errors.append("key is not a valid SSL private key") + if len(token) < 8: + errors.append("'token' must be more than 8 characters long") if 'port' in config: try: @@ -5761,7 +5794,7 @@ class CephadmDaemon(): errors.append("port must be an integer > 1024") if errors: - raise Error("Exporter config error(s): {}".format(", ".join(errors))) + raise Error("Parameter errors : {}".format(", ".join(errors))) @property def port_active(self): @@ -5804,11 +5837,25 @@ class CephadmDaemon(): CephadmDaemon.bin_name ) + def _handle_thread_exception(self, exc, thread_type): + e_msg = f"{exc.__class__.__name__} exception: {str(exc)}" + errors = [e_msg] + logger.error(e_msg) + logger.exception(e) + self.cephadm_cache.update_task( + thread_type, + { + "scrape_errors": errors, + } + ) + def _scrape_host_facts(self, refresh_interval=10): ctr = 0 + exception_encountered = False + while True: - if self.stop: + if self.stop or exception_encountered: break if ctr >= refresh_interval: @@ -5816,74 +5863,90 @@ class CephadmDaemon(): logger.debug("executing host-facts scrape") errors = [] s_time = time.time() - facts = HostFacts() - elapsed = time.time() - s_time + try: - data = json.loads(facts.dump()) - except json.decoder.JSONDecodeError: - errors.append("host-facts provided invalid JSON") - logger.warning(errors[-1]) - data = {} - self.cephadm_cache.update_task( - 'host', - { - "scrape_timestamp": s_time, - "scrape_duration_secs": elapsed, - "scrape_errors": errors, - "data": data, - } - ) - logger.debug(f"completed host-facts scrape - {elapsed}s") + facts = HostFacts() + except Exception as e: + self._handle_thread_exception(e, 'host') + exception_encountered = True + else: + elapsed = time.time() - s_time + try: + data = json.loads(facts.dump()) + except json.decoder.JSONDecodeError: + errors.append("host-facts provided invalid JSON") + logger.warning(errors[-1]) + data = {} + self.cephadm_cache.update_task( + 'host', + { + "scrape_timestamp": s_time, + "scrape_duration_secs": elapsed, + "scrape_errors": errors, + "data": data, + } + ) + logger.debug(f"completed host-facts scrape - {elapsed}s") time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay logger.info("host-facts thread stopped") def _scrape_ceph_volume(self, refresh_interval=15): + # we're invoking the ceph_volume command, so we need to set the args that it + # expects to use args.command = "inventory --format=json".split() args.fsid = self.fsid args.log_output = False + ctr = 0 + exception_encountered = False + while True: - if self.stop: + if self.stop or exception_encountered: break + if ctr >= refresh_interval: ctr = 0 logger.debug("executing ceph-volume scrape") errors = [] s_time = time.time() stream = io.StringIO() - - with redirect_stdout(stream): - command_ceph_volume() - elapsed = time.time() - s_time - - # if the call to ceph-volume returns junk with the - # json, it won't parse - stdout = stream.getvalue() - - if stdout: - try: - data = json.loads(stdout) - except json.decoder.JSONDecodeError: - errors.append("ceph-volume thread provided bad json data") - logger.warning(errors[-1]) - data = [] + try: + with redirect_stdout(stream): + command_ceph_volume() + except Exception as e: + self._handle_thread_exception(e, 'disks') + exception_encountered = True else: - errors.append("ceph-volume didn't return any data") - logger.warning(errors[-1]) - - self.cephadm_cache.update_task( - 'disks', - { - "scrape_timestamp": s_time, - "scrape_duration_secs": elapsed, - "scrape_errors": errors, - "data": data, - } - ) - - logger.debug(f"completed ceph-volume scrape - {elapsed}s") + elapsed = time.time() - s_time + + # if the call to ceph-volume returns junk with the + # json, it won't parse + stdout = stream.getvalue() + + if stdout: + try: + data = json.loads(stdout) + except json.decoder.JSONDecodeError: + errors.append("ceph-volume thread provided bad json data") + logger.warning(errors[-1]) + data = [] + else: + errors.append("ceph-volume didn't return any data") + logger.warning(errors[-1]) + + self.cephadm_cache.update_task( + 'disks', + { + "scrape_timestamp": s_time, + "scrape_duration_secs": elapsed, + "scrape_errors": errors, + "data": data, + } + ) + + logger.debug(f"completed ceph-volume scrape - {elapsed}s") time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay @@ -5891,8 +5954,9 @@ class CephadmDaemon(): def _scrape_list_daemons(self, refresh_interval=20): ctr = 0 + exception_encountered = False while True: - if self.stop: + if self.stop or exception_encountered: break if ctr >= refresh_interval: @@ -5900,24 +5964,29 @@ class CephadmDaemon(): logger.debug("executing list-daemons scrape") errors = [] s_time = time.time() - - # list daemons should ideally be invoked with a fsid - data = list_daemons() - if not isinstance(data, list): - errors.append("list-daemons didn't supply a list?") - logger.warning(errors[-1]) - data = [] - elapsed = time.time() - s_time - self.cephadm_cache.update_task( - 'daemons', - { - "scrape_timestamp": s_time, - "scrape_duration_secs": elapsed, - "scrape_errors": errors, - "data": data, - } - ) - logger.debug(f"completed list-daemons scrape - {elapsed}s") + + try: + # list daemons should ideally be invoked with a fsid + data = list_daemons() + except Exception as e: + self._handle_thread_exception(e, 'daemons') + exception_encountered = True + else: + if not isinstance(data, list): + errors.append("list-daemons didn't supply a list?") + logger.warning(errors[-1]) + data = [] + elapsed = time.time() - s_time + self.cephadm_cache.update_task( + 'daemons', + { + "scrape_timestamp": s_time, + "scrape_duration_secs": elapsed, + "scrape_errors": errors, + "data": data, + } + ) + logger.debug(f"completed list-daemons scrape - {elapsed}s") time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay @@ -6548,11 +6617,11 @@ def _get_parser(): parser_bootstrap.add_argument( '--with-exporter', action='store_true', - help='Do not automatically deploy cephadm metadata exporter (https) to each node') + help='Automatically deploy cephadm metadata exporter to each node') parser_bootstrap.add_argument( '--exporter-config', action=CustomValidation, - help='Configuration information in JSON format, providing SSL configuration settings') + help=f'Exporter configuration information in JSON format (providing: {", ".join(CephadmDaemon.config_requirements)}, port information)') parser_deploy = subparsers.add_parser( 'deploy', help='deploy a daemon') -- 2.39.5