From 13713b5caaca151f7b45e5ef27eccd028778efd7 Mon Sep 17 00:00:00 2001 From: Paul Cuzner Date: Wed, 17 Feb 2021 12:49:41 +1300 Subject: [PATCH] mgr/cephadm:Updates to CephadmConfigChecks class Multiple updates to ensure - mgr health checks are raised correctly - checks are independent and may be disabled Signed-off-by: Paul Cuzner --- src/pybind/mgr/cephadm/configchecks.py | 421 +++++++++++++++++++++---- 1 file changed, 354 insertions(+), 67 deletions(-) diff --git a/src/pybind/mgr/cephadm/configchecks.py b/src/pybind/mgr/cephadm/configchecks.py index a2b3a71e72d04..2d52bd7024052 100644 --- a/src/pybind/mgr/cephadm/configchecks.py +++ b/src/pybind/mgr/cephadm/configchecks.py @@ -2,7 +2,7 @@ import json import ipaddress import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast, Tuple, Callable if TYPE_CHECKING: from cephadm.module import CephadmOrchestrator @@ -30,10 +30,10 @@ class HostFacts: self.hdd_count: Optional[int] = None self.hdd_list: Optional[List[Dict[str, Any]]] = None self.hostname: Optional[str] = None - self.interfaces: Dict[str, Dict[str, Any]] = {} + self.interfaces: Optional[Dict[str, Dict[str, Any]]] = None self.kernel: Optional[str] = None self.kernel_parameters: Optional[Dict[str, Any]] = None - self.kernel_security: Dict[str, str] = {} + self.kernel_security: Optional[Dict[str, str]] = None self.memory_available_kb: Optional[int] = None self.memory_free_kb: Optional[int] = None self.memory_total_kb: Optional[int] = None @@ -60,6 +60,24 @@ class HostFacts: else: self._valid = False + def subnet_to_nic(self, subnet: str) -> Optional[str]: + ip_version = ipaddress.ip_network(subnet).version + logger.debug(f"subnet {subnet} is IP version {ip_version}") + interfaces = cast(Dict[str, Dict[str, Any]], self.interfaces) + nic = None + for iface in interfaces.keys(): + addr = '' + if ip_version == 4: + addr = interfaces[iface].get('ipv4_address', '') + else: + addr = interfaces[iface].get('ipv6_address', '') + if addr: + a = addr.split('/')[0] + if ipaddress.ip_address(a) in ipaddress.ip_network(subnet): + nic = iface + break + return nic + class SubnetLookup: def __init__(self, subnet: str, hostname: str, mtu: str, speed: str): @@ -73,7 +91,10 @@ class SubnetLookup: @ property def host_list(self) -> List[str]: - return self.mtu_map[next(iter(self.mtu_map))] + hosts = [] + for mtu in self.mtu_map: + hosts.extend(self.mtu_map.get(mtu, [])) + return hosts def update(self, hostname: str, mtu: str, speed: str) -> None: if mtu in self.mtu_map and hostname not in self.mtu_map[mtu]: @@ -94,22 +115,53 @@ class SubnetLookup: }) +class CephadmCheckDefinition: + def __init__(self, name: str, description: str, parameter: str, func: Callable) -> None: + self.name = name + self.description = description + self.parameter = parameter + self.func = func + + class CephadmConfigChecks: def __init__(self, mgr: "CephadmOrchestrator"): self.mgr: "CephadmOrchestrator" = mgr + self.health_checks: List[CephadmCheckDefinition] = [ + CephadmCheckDefinition("CEPHADM_CHECK_KERNEL_LSM", + "checks SELINUX/Apparmor profiles are consistent across cluster hosts", + "kernel_security", + self._check_kernel_lsm), + CephadmCheckDefinition("CEPHADM_CHECK_SUBSCRIPTION", + "checks subscription states are consistent for all cluster hosts", + "os_subscription", + self._check_subscription), + CephadmCheckDefinition("CEPHADM_CHECK_PUBLIC_MEMBERSHIP", + "check that all hosts have a NIC on the Ceph public_netork", + "public_network", + self._check_public_network), + CephadmCheckDefinition("CEPHADM_CHECK_MTU", + "check that OSD hosts share a common MTU setting", + "osd_mtu_size", + self._check_osd_mtu), + CephadmCheckDefinition("CEPHADM_CHECK_LINKSPEED", + "check that OSD hosts share a common linkspeed", + "osd_linkspeed", + self._check_osd_linkspeed), + CephadmCheckDefinition("CEPHADM_CHECK_NETWORK_MISSING", + "checks that the cluster/public networks defined exist on the Ceph hosts", + "network_missing", + self._check_network_missing), + CephadmCheckDefinition("CEPHADM_CHECK_CEPH_RELEASE", + "check for Ceph version consistency - ceph daemons should be on the same release (unless upgrade is active)", + "ceph_release", + self._check_release_parity), + CephadmCheckDefinition("CEPHADM_CHECK_KERNEL_VERSION", + "checks that the MAJ.MIN of the kernel on Ceph hosts is consistent", + "kernel_version", + self._check_kernel_version), + ] self.log = logger - self.healthchecks: Dict[str, List[str]] = { - "CEPHADM_CHECK_OS": [], - "CEPHADM_CHECK_SUBSCRIPTION": [], - "CEPHADM_CHECK_KERNEL_LSM": [], - "CEPHADM_CHECK_MTU": [], - "CEPHADM_CHECK_LINKSPEED": [], - "CEPHADM_CHECK_PUBLIC_MEMBERSHIP": [], - "CEPHADM_CHECK_NETWORK_MISSING": [], - "CEPHADM_CHECK_DAEMON_DISABLED": [], - "CEPHADM_CHECK_CEPH_VERSION": [], - "CEPHADM_CHECK_KERNEL_VERSION": [], - } + self.host_facts: Dict[str, HostFacts] = {} self.subnet_lookup: Dict[str, SubnetLookup] = {} # subnet CIDR -> SubnetLookup Object self.lsm_to_host: Dict[str, List[str]] = {} self.subscribed: Dict[str, List[str]] = { @@ -121,6 +173,21 @@ class CephadmConfigChecks: self.public_network_list: List[str] = [] self.cluster_network_list: List[str] = [] + self.health_check_raised = False + self.active_checks: List[str] = [] # checks enabled and executed + self.skipped_checks: List[str] = [] # checks enabled, but skipped due to a pre-req + + @property + def defined_checks(self) -> int: + return len(self.health_checks) + + @property + def active_checks_count(self) -> int: + return len(self.active_checks) + + @property + def skipped_checks_count(self) -> int: + return len(self.skipped_checks) def load_network_config(self) -> None: ret, out, _err = self.mgr.mon_command({ @@ -135,8 +202,8 @@ class CephadmConfigChecks: if item['name'] == "public_network": self.public_network_list = item['value'].strip().split(',') - self.log.error(f"public networks {self.public_network_list}") - self.log.error(f"cluster networks {self.cluster_network_list}") + self.log.debug(f"public networks {self.public_network_list}") + self.log.debug(f"cluster networks {self.cluster_network_list}") def _update_subnet_lookups(self, hostname: str, devname: str, nic: Dict[str, Any]) -> None: if nic['ipv4_address']: @@ -160,6 +227,7 @@ class CephadmConfigChecks: self.subnet_lookup[subnet] = SubnetLookup(subnet, hostname, mtu, speed) if nic['ipv6_address']: + # TODO pass def hosts_with_role(self, role: str) -> List[str]: @@ -177,6 +245,17 @@ class CephadmConfigChecks: self.host_to_role.clear() self.kernel_to_hosts.clear() + def _get_majority(self, data: Dict[str, List[str]]) -> Tuple[str, int]: + assert isinstance(data, dict) + + majority_key = '' + majority_count = 0 + for key in data: + if len(data[key]) > majority_count: + majority_count = len(data[key]) + majority_key = key + return majority_key, majority_count + def get_ceph_metadata(self) -> Dict[str, Optional[Dict[str, str]]]: """Build a map of service -> service metadata""" service_map: Dict[str, Optional[Dict[str, str]]] = {} @@ -187,98 +266,281 @@ class CephadmConfigChecks: for service in server.get('services', []): service_map.update( { - f"{service['type']}.{service['id']}": self.mgr.get_metadata(service['type'], service['id']) + f"{service['type']}.{service['id']}": + self.mgr.get_metadata(service['type'], service['id']) } ) return service_map def _check_kernel_lsm(self) -> None: if len(self.lsm_to_host.keys()) > 1: - # selinux security policy issue - CEPHADM_CHECK_KERNEL_LSM - pass + + majority_hosts_ptr, majority_hosts_count = self._get_majority(self.lsm_to_host) + lsm_copy = self.lsm_to_host.copy() + del lsm_copy[majority_hosts_ptr] + details = [] + for lsm_key in lsm_copy.keys(): + for host in lsm_copy[lsm_key]: + details.append( + f"{host} has inconsistent KSM settings compared to the " + f"majority of hosts({majority_hosts_count}) in the cluster") + host_sfx = 's' if len(details) > 1 else '' + self.mgr.health_checks['CEPHADM_CHECK_KERNEL_LSM'] = { + 'severity': 'warning', + 'summary': f"Kernel Security Module (SELinux/AppArmor) is inconsistent for " + f"{len(details)} host{host_sfx}", + 'count': len(details), + 'detail': details, + } + self.health_check_raised = True + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_KERNEL_LSM', None) def _check_subscription(self) -> None: if len(self.subscribed['yes']) > 0 and len(self.subscribed['no']) > 0: # inconsistent subscription states - CEPHADM_CHECK_SUBSCRIPTION - pass + details = [] + for host in self.subscribed['no']: + details.append(f"{host} does not have an active subscription") + self.mgr.health_checks['CEPHADM_CHECK_SUBSCRIPTION'] = { + 'severity': 'warning', + 'summary': f"Support subscriptions inactive on {len(details)} host(s)" + f"({len(self.subscribed['yes'])} subscriptions active)", + 'count': len(details), + 'detail': details, + } + self.health_check_raised = True + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_SUBSCRIPTION', None) def _check_public_network(self) -> None: all_hosts = set(self.mgr.cache.facts.keys()) + self.log.debug(f"checking public network membership for: {all_hosts}") + + public_errors = [] for c_net in self.public_network_list: + self.log.debug(f"checking network {c_net}") subnet_data = self.subnet_lookup.get(c_net, None) + self.log.debug(f"subnet data - {subnet_data}") + if subnet_data: hosts_in_subnet = set(subnet_data.host_list) errors = all_hosts.difference(hosts_in_subnet) if errors: - # CEPHADM_CHECK_PUBLIC_MEMBERSHIP - self.log.error(f"subnet check for {c_net} found {len(errors)} host(s) missing") + public_errors.append(errors) + + if public_errors: + self.mgr.health_checks['CEPHADM_CHECK_PUBLIC_MEMBERSHIP'] = { + 'severity': 'warning', + 'summary': 'Public network is not accessible from cluster hosts', + 'count': 1, + 'detail': ["wah"], + } + self.health_check_raised = True + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_PUBLIC_MEMBERSHIP', None) - def _check_osd_network(self) -> None: + def _check_osd_mtu(self) -> None: osd_hosts = set(self.hosts_with_role('osd')) osd_network_list = self.cluster_network_list or self.public_network_list + mtu_errors: List[str] = [] + for osd_net in osd_network_list: subnet_data = self.subnet_lookup.get(osd_net, None) + if subnet_data: + self.log.debug(f"processing mtu map : {json.dumps(subnet_data.mtu_map)}") + mtu_count = {} + max_hosts = 0 + mtu_ptr = '' + diffs = {} for mtu, host_list in subnet_data.mtu_map.items(): mtu_hosts = set(host_list) + mtu_count[mtu] = len(mtu_hosts) errors = osd_hosts.difference(mtu_hosts) if errors: - self.log.error(f"MTU problem {mtu}: {errors}") - # CEPHADM_CHECK_MTU - pass + diffs[mtu] = errors + if len(errors) > max_hosts: + mtu_ptr = mtu + + if diffs: + self.log.debug("MTU problems detected") + self.log.debug(f"most hosts using {mtu_ptr}") + mtu_copy = subnet_data.mtu_map.copy() + del mtu_copy[mtu_ptr] + for bad_mtu in mtu_copy: + for h in mtu_copy[bad_mtu]: + host = HostFacts() + host.load_facts(self.mgr.cache.facts[h]) + mtu_errors.append( + f"host {h}({host.subnet_to_nic(osd_net)}) is using MTU " + f"{bad_mtu} on {osd_net}, NICs on other hosts use {mtu_ptr}") + + if mtu_errors: + self.mgr.health_checks['CEPHADM_CHECK_MTU'] = { + 'severity': 'warning', + 'summary': f"MTU setting inconsistent on osd network NICs on {len(mtu_errors)} host(s)", + 'count': len(mtu_errors), + 'detail': mtu_errors, + } + self.health_check_raised = True + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_MTU', None) + + def _check_osd_linkspeed(self) -> None: + osd_hosts = set(self.hosts_with_role('osd')) + osd_network_list = self.cluster_network_list or self.public_network_list + + linkspeed_errors = [] + + for osd_net in osd_network_list: + subnet_data = self.subnet_lookup.get(osd_net, None) + + if subnet_data: + + self.log.debug(f"processing subnet : {subnet_data}") + + speed_count = {} + max_hosts = 0 + speed_ptr = '' + diffs = {} for speed, host_list in subnet_data.speed_map.items(): speed_hosts = set(host_list) + speed_count[speed] = len(speed_hosts) errors = osd_hosts.difference(speed_hosts) if errors: - self.log.error(f"Linkspeed problem {speed} : {errors}") - # CEPHADM_CHECK_LINKSPEED - pass + diffs[speed] = errors + if len(errors) > max_hosts: + speed_ptr = speed + + if diffs: + self.log.debug("linkspeed issue(s) detected") + self.log.debug(f"most hosts using {speed_ptr}") + speed_copy = subnet_data.speed_map.copy() + del speed_copy[speed_ptr] + for bad_speed in speed_copy: + for h in speed_copy[bad_speed]: + host = HostFacts() + host.load_facts(self.mgr.cache.facts[h]) + linkspeed_errors.append( + f"host {h}({host.subnet_to_nic(osd_net)}) has linkspeed of " + f"{bad_speed} on {osd_net}, NICs on other hosts use {speed_ptr}") + + if linkspeed_errors: + self.mgr.health_checks['CEPHADM_CHECK_LINKSPEED'] = { + 'severity': 'warning', + 'summary': "Link speed is inconsistent on osd network NICs for " + f"{len(linkspeed_errors)} host(s)", + 'count': len(linkspeed_errors), + 'detail': linkspeed_errors, + } + self.health_check_raised = True + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_LINKSPEED', None) - else: - # set CEPHADM_CHECK_NETWORK_MISSING healthcheck for this subnet + def _check_network_missing(self) -> None: + all_networks = self.public_network_list.copy() + all_networks.extend(self.cluster_network_list) + + missing_networks = [] + for osd_net in all_networks: + subnet_data = self.subnet_lookup.get(osd_net, None) + + if not subnet_data: + missing_networks.append(f"{osd_net} not found on any host in the cluster") self.log.warning( f"Network {osd_net} has been defined, but is not present on any host") - def _check_version_parity(self) -> None: + if missing_networks: + net_sfx = 's' if len(missing_networks) > 1 else '' + self.mgr.health_checks['CEPHADM_CHECK_NETWORK_MISSING'] = { + 'severity': 'warning', + 'summary': f"Public/cluster network{net_sfx} defined, but can not be found on " + "any host", + 'count': len(missing_networks), + 'detail': missing_networks, + } + self.health_check_raised = True + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_NETWORK_MISSING', None) + + def _check_release_parity(self) -> None: upgrade_status = self.mgr.upgrade.upgrade_status() if upgrade_status.in_progress: # skip version consistency checks during an upgrade cycle + self.skipped_checks.append('ceph_release') return services = self.get_ceph_metadata() - self.log.error(json.dumps(services)) - versions: Set[str] = set() + self.log.debug(json.dumps(services)) + version_to_svcs: Dict[str, List[str]] = {} + for svc in services: if services[svc]: metadata = cast(Dict[str, str], services[svc]) - versions.add(metadata.get('ceph_release', '')) - self.log.error(f"versions detected are: {versions}") - if len(versions) > 1: - # CEPHADM_CHECK_CEPH_VERSION + v = metadata.get('ceph_release', '') + if v in version_to_svcs: + version_to_svcs[v].append(svc) + else: + version_to_svcs[v] = [svc] + + if len(version_to_svcs) > 1: + majority_ptr, _majority_count = self._get_majority(version_to_svcs) + ver_copy = version_to_svcs.copy() + del ver_copy[majority_ptr] + details = [] + for v in ver_copy: + for svc in ver_copy[v]: + details.append( + f"{svc} is running {v} (majority of cluster is using {majority_ptr})") + + self.mgr.health_checks['CEPHADM_CHECK_CEPH_RELEASE'] = { + 'severity': 'warning', + 'summary': 'Ceph cluster running mixed ceph releases', + 'count': len(details), + 'detail': details, + } + self.health_check_raised = True self.log.warning( - f"running with {len(versions)} different ceph releases within this cluster") + f"running with {len(version_to_svcs)} different ceph releases within this cluster") + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_CEPH_RELEASE', None) def _check_kernel_version(self) -> None: if len(self.kernel_to_hosts.keys()) > 1: - self.log.warning("mixed kernel versions") - # CEPHADM_CHECK_KERNEL_VERSION - - def run_checks(self) -> None: - - # os_to_host = {} # TODO needs better support in gather-facts - self.reset() + majority_hosts_ptr, majority_hosts_count = self._get_majority(self.kernel_to_hosts) + kver_copy = self.kernel_to_hosts.copy() + del kver_copy[majority_hosts_ptr] + details = [] + for k in kver_copy: + for h in kver_copy[k]: + details.append( + f"host {h} running kernel {k}, majority of hosts({majority_hosts_count}) " + f"running {majority_hosts_ptr}") + + self.log.warning("mixed kernel versions detected") + self.mgr.health_checks['CEPHADM_CHECK_KERNEL_VERSION'] = { + 'severity': 'warning', + 'summary': f"{len(details)} host(s) running different kernel versions", + 'count': len(details), + 'detail': details, + } + self.health_check_raised = True + else: + self.mgr.health_checks.pop('CEPHADM_CHECK_KERNEL_VERSION', None) - # build lookup "maps" by walking the host facts + def _process_hosts(self) -> None: + self.log.debug(f"processing data from {len(self.mgr.cache.facts)} hosts") for hostname in self.mgr.cache.facts: - self.log.error(json.dumps(self.mgr.cache.facts[hostname])) host = HostFacts() host.load_facts(self.mgr.cache.facts[hostname]) if not host._valid: self.log.warning(f"skipping {hostname} - incompatible host facts") continue - lsm_desc = host.kernel_security.get('description', '') + kernel_lsm = cast(Dict[str, str], host.kernel_security) + lsm_desc = kernel_lsm.get('description', '') if lsm_desc: if lsm_desc in self.lsm_to_host: self.lsm_to_host[lsm_desc].append(hostname) @@ -289,7 +551,7 @@ class CephadmConfigChecks: if subscription_state: self.subscribed[subscription_state].append(hostname) - interfaces: Dict[str, Dict[str, Any]] = host.interfaces + interfaces = cast(Dict[str, Dict[str, Any]], host.interfaces) for name in interfaces.keys(): if name in ['lo']: continue @@ -310,18 +572,43 @@ class CephadmConfigChecks: self.host_to_role[hostname] = list({host_daemons[name].daemon_type for name in host_daemons}) - # checks - # TODO only call the check if the check is enabled - self._check_kernel_lsm() - self._check_subscription() - self._check_public_network() - self._check_osd_network() - self._check_version_parity() - self._check_kernel_version() - - self.log.error(f"lsm status {json.dumps(self.lsm_to_host)}") - self.log.error(f"subscription states : {json.dumps(self.subscribed)}") - - self.log.error(f"mtu data : {str(self.subnet_lookup)}") - self.log.error(f"roles : {json.dumps(self.host_to_role)}") - self.log.error(f"kernel versions : {json.dumps(self.kernel_to_hosts)}") + def run_checks(self) -> None: + + checks_enabled = self.mgr.get_module_option('config_checks_enabled') + if checks_enabled != 'true': + self.log.info("ALL cephadm checks are disabled, use 'ceph config set " + "mgr/cephadm/config_checks_enabled true' to enable") + return + + self.reset() + + check_config: Dict[str, str] = {} + checks_raw: Optional[str] = self.mgr.get_store('config_checks') + if checks_raw: + try: + check_config.update(json.loads(checks_raw)) + except json.JSONDecodeError: + self.log.error( + "mgr/cephadm/config_checks is not JSON serializable - all checks will run") + + # build lookup "maps" by walking the host facts, once + self._process_hosts() + + self.health_check_raised = False + self.active_checks = [] + self.skipped_checks = [] + + # process all healthchecks that are not explcitly disabled + for health_check in self.health_checks: + if check_config.get(health_check.parameter, '') != 'disabled': + self.active_checks.append(health_check.parameter) + health_check.func() + + if self.health_check_raised: + self.log.warning("CEPHADM checks have detected configuration anomalies") + else: + self.log.info( + f"CEPHADM {self.active_checks_count}/{self.defined_checks} checks enabled " + f"and executed ({self.skipped_checks_count} bypassed). No issues detected") + + self.mgr.set_health_checks(self.mgr.health_checks) -- 2.39.5