import ipaddress
import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, cast
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast, Tuple, Callable
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
self.hdd_count: Optional[int] = None
self.hdd_list: Optional[List[Dict[str, Any]]] = None
self.hostname: Optional[str] = None
- self.interfaces: Dict[str, Dict[str, Any]] = {}
+ self.interfaces: Optional[Dict[str, Dict[str, Any]]] = None
self.kernel: Optional[str] = None
self.kernel_parameters: Optional[Dict[str, Any]] = None
- self.kernel_security: Dict[str, str] = {}
+ self.kernel_security: Optional[Dict[str, str]] = None
self.memory_available_kb: Optional[int] = None
self.memory_free_kb: Optional[int] = None
self.memory_total_kb: Optional[int] = None
else:
self._valid = False
+ def subnet_to_nic(self, subnet: str) -> Optional[str]:
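+ """Return the name of the NIC on this host that falls within the given subnet, or None."""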
+ ip_version = ipaddress.ip_network(subnet).version
+ logger.debug(f"subnet {subnet} is IP version {ip_version}")
+ interfaces = cast(Dict[str, Dict[str, Any]], self.interfaces)
+ nic = None
+ for iface in interfaces.keys():
+ addr = ''
+ if ip_version == 4:
+ addr = interfaces[iface].get('ipv4_address', '')
+ else:
+ addr = interfaces[iface].get('ipv6_address', '')
+ if addr:
+ a = addr.split('/')[0]
+ if ipaddress.ip_address(a) in ipaddress.ip_network(subnet):
+ nic = iface
+ break
+ return nic
+
class SubnetLookup:
def __init__(self, subnet: str, hostname: str, mtu: str, speed: str):
@property
def host_list(self) -> List[str]:
- return self.mtu_map[next(iter(self.mtu_map))]
+ hosts = []
+ for mtu in self.mtu_map:
+ hosts.extend(self.mtu_map.get(mtu, []))
+ return hosts
def update(self, hostname: str, mtu: str, speed: str) -> None:
if mtu in self.mtu_map and hostname not in self.mtu_map[mtu]:
})
+class CephadmCheckDefinition:
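+ """Bind a health check code to its description, its config parameter name, and its handler."""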
+ def __init__(self, name: str, description: str, parameter: str, func: Callable) -> None:
+ self.name = name
+ self.description = description
+ self.parameter = parameter
+ self.func = func
+
+
class CephadmConfigChecks:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
+ self.health_checks: List[CephadmCheckDefinition] = [
+ CephadmCheckDefinition("CEPHADM_CHECK_KERNEL_LSM",
+ "checks SELinux/AppArmor profiles are consistent across cluster hosts",
+ "kernel_security",
+ self._check_kernel_lsm),
+ CephadmCheckDefinition("CEPHADM_CHECK_SUBSCRIPTION",
+ "checks subscription states are consistent for all cluster hosts",
+ "os_subscription",
+ self._check_subscription),
+ CephadmCheckDefinition("CEPHADM_CHECK_PUBLIC_MEMBERSHIP",
+ "check that all hosts have a NIC on the Ceph public_network",
+ "public_network",
+ self._check_public_network),
+ CephadmCheckDefinition("CEPHADM_CHECK_MTU",
+ "check that OSD hosts share a common MTU setting",
+ "osd_mtu_size",
+ self._check_osd_mtu),
+ CephadmCheckDefinition("CEPHADM_CHECK_LINKSPEED",
+ "check that OSD hosts share a common linkspeed",
+ "osd_linkspeed",
+ self._check_osd_linkspeed),
+ CephadmCheckDefinition("CEPHADM_CHECK_NETWORK_MISSING",
+ "checks that the cluster/public networks defined exist on the Ceph hosts",
+ "network_missing",
+ self._check_network_missing),
+ CephadmCheckDefinition("CEPHADM_CHECK_CEPH_RELEASE",
+ "check for Ceph version consistency - ceph daemons should be on the same release (unless upgrade is active)",
+ "ceph_release",
+ self._check_release_parity),
+ CephadmCheckDefinition("CEPHADM_CHECK_KERNEL_VERSION",
+ "checks that the MAJ.MIN of the kernel on Ceph hosts is consistent",
+ "kernel_version",
+ self._check_kernel_version),
+ ]
self.log = logger
- self.healthchecks: Dict[str, List[str]] = {
- "CEPHADM_CHECK_OS": [],
- "CEPHADM_CHECK_SUBSCRIPTION": [],
- "CEPHADM_CHECK_KERNEL_LSM": [],
- "CEPHADM_CHECK_MTU": [],
- "CEPHADM_CHECK_LINKSPEED": [],
- "CEPHADM_CHECK_PUBLIC_MEMBERSHIP": [],
- "CEPHADM_CHECK_NETWORK_MISSING": [],
- "CEPHADM_CHECK_DAEMON_DISABLED": [],
- "CEPHADM_CHECK_CEPH_VERSION": [],
- "CEPHADM_CHECK_KERNEL_VERSION": [],
- }
+ self.host_facts: Dict[str, HostFacts] = {}
self.subnet_lookup: Dict[str, SubnetLookup] = {} # subnet CIDR -> SubnetLookup Object
self.lsm_to_host: Dict[str, List[str]] = {}
self.subscribed: Dict[str, List[str]] = {
self.public_network_list: List[str] = []
self.cluster_network_list: List[str] = []
+ self.health_check_raised = False
+ self.active_checks: List[str] = [] # checks enabled and executed
+ self.skipped_checks: List[str] = [] # checks enabled, but skipped due to a pre-req
+
+ @property
+ def defined_checks(self) -> int:
+ return len(self.health_checks)
+
+ @property
+ def active_checks_count(self) -> int:
+ return len(self.active_checks)
+
+ @property
+ def skipped_checks_count(self) -> int:
+ return len(self.skipped_checks)
def load_network_config(self) -> None:
ret, out, _err = self.mgr.mon_command({
if item['name'] == "public_network":
self.public_network_list = item['value'].strip().split(',')
- self.log.error(f"public networks {self.public_network_list}")
- self.log.error(f"cluster networks {self.cluster_network_list}")
+ self.log.debug(f"public networks {self.public_network_list}")
+ self.log.debug(f"cluster networks {self.cluster_network_list}")
def _update_subnet_lookups(self, hostname: str, devname: str, nic: Dict[str, Any]) -> None:
if nic['ipv4_address']:
self.subnet_lookup[subnet] = SubnetLookup(subnet, hostname, mtu, speed)
if nic['ipv6_address']:
+ # TODO
pass
def hosts_with_role(self, role: str) -> List[str]:
self.host_to_role.clear()
self.kernel_to_hosts.clear()
+ def _get_majority(self, data: Dict[str, List[str]]) -> Tuple[str, int]:
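+ """Return the key of the longest list in the map, together with that list's length."""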
+ assert isinstance(data, dict)
+
+ majority_key = ''
+ majority_count = 0
+ for key in data:
+ if len(data[key]) > majority_count:
+ majority_count = len(data[key])
+ majority_key = key
+ return majority_key, majority_count
+
def get_ceph_metadata(self) -> Dict[str, Optional[Dict[str, str]]]:
"""Build a map of service -> service metadata"""
service_map: Dict[str, Optional[Dict[str, str]]] = {}
for service in server.get('services', []):
service_map.update(
{
- f"{service['type']}.{service['id']}": self.mgr.get_metadata(service['type'], service['id'])
+ f"{service['type']}.{service['id']}":
+ self.mgr.get_metadata(service['type'], service['id'])
}
)
return service_map
def _check_kernel_lsm(self) -> None:
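+ """Raise/clear CEPHADM_CHECK_KERNEL_LSM when SELinux/AppArmor state differs across hosts."""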
if len(self.lsm_to_host.keys()) > 1:
- # selinux security policy issue - CEPHADM_CHECK_KERNEL_LSM
- pass
+
+ majority_hosts_ptr, majority_hosts_count = self._get_majority(self.lsm_to_host)
+ lsm_copy = self.lsm_to_host.copy()
+ del lsm_copy[majority_hosts_ptr]
+ details = []
+ for lsm_key in lsm_copy.keys():
+ for host in lsm_copy[lsm_key]:
+ details.append(
+ f"{host} has inconsistent LSM settings compared to the "
+ f"majority of hosts ({majority_hosts_count}) in the cluster")
+ host_sfx = 's' if len(details) > 1 else ''
+ self.mgr.health_checks['CEPHADM_CHECK_KERNEL_LSM'] = {
+ 'severity': 'warning',
+ 'summary': f"Kernel Security Module (SELinux/AppArmor) is inconsistent for "
+ f"{len(details)} host{host_sfx}",
+ 'count': len(details),
+ 'detail': details,
+ }
+ self.health_check_raised = True
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_KERNEL_LSM', None)
def _check_subscription(self) -> None:
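+ """Raise/clear CEPHADM_CHECK_SUBSCRIPTION when subscription state is mixed across hosts."""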
if len(self.subscribed['yes']) > 0 and len(self.subscribed['no']) > 0:
# inconsistent subscription states - CEPHADM_CHECK_SUBSCRIPTION
- pass
+ details = []
+ for host in self.subscribed['no']:
+ details.append(f"{host} does not have an active subscription")
+ self.mgr.health_checks['CEPHADM_CHECK_SUBSCRIPTION'] = {
+ 'severity': 'warning',
+ 'summary': f"Support subscriptions inactive on {len(details)} host(s) "
+ f"({len(self.subscribed['yes'])} subscriptions active)",
+ 'count': len(details),
+ 'detail': details,
+ }
+ self.health_check_raised = True
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_SUBSCRIPTION', None)
def _check_public_network(self) -> None:
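+ """Raise/clear CEPHADM_CHECK_PUBLIC_MEMBERSHIP for hosts lacking a NIC on a public network."""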
all_hosts = set(self.mgr.cache.facts.keys())
+ self.log.debug(f"checking public network membership for: {all_hosts}")
+
+ public_errors: List[str] = []
for c_net in self.public_network_list:
+ self.log.debug(f"checking network {c_net}")
subnet_data = self.subnet_lookup.get(c_net, None)
+ self.log.debug(f"subnet data - {subnet_data}")
+
if subnet_data:
hosts_in_subnet = set(subnet_data.host_list)
errors = all_hosts.difference(hosts_in_subnet)
if errors:
- # CEPHADM_CHECK_PUBLIC_MEMBERSHIP
- self.log.error(f"subnet check for {c_net} found {len(errors)} host(s) missing")
+ public_errors.extend(sorted(errors))
+
+ if public_errors:
+ self.mgr.health_checks['CEPHADM_CHECK_PUBLIC_MEMBERSHIP'] = {
+ 'severity': 'warning',
+ 'summary': f"Public network(s) not accessible from {len(public_errors)} cluster host(s)",
+ 'count': len(public_errors),
+ 'detail': [f"{host} does not have an interface on a public network"
+ for host in public_errors],
+ }
+ self.health_check_raised = True
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_PUBLIC_MEMBERSHIP', None)
- def _check_osd_network(self) -> None:
+ def _check_osd_mtu(self) -> None:
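+ """Raise/clear CEPHADM_CHECK_MTU when OSD hosts use different MTUs on the OSD network."""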
osd_hosts = set(self.hosts_with_role('osd'))
osd_network_list = self.cluster_network_list or self.public_network_list
+ mtu_errors: List[str] = []
+
for osd_net in osd_network_list:
subnet_data = self.subnet_lookup.get(osd_net, None)
+
if subnet_data:
+ self.log.debug(f"processing mtu map : {json.dumps(subnet_data.mtu_map)}")
+ mtu_count = {}
+ max_hosts = 0
+ mtu_ptr = ''
+ diffs = {}
for mtu, host_list in subnet_data.mtu_map.items():
mtu_hosts = set(host_list)
+ mtu_count[mtu] = len(mtu_hosts)
errors = osd_hosts.difference(mtu_hosts)
if errors:
- self.log.error(f"MTU problem {mtu}: {errors}")
- # CEPHADM_CHECK_MTU
- pass
+ diffs[mtu] = errors
+ if len(mtu_hosts) > max_hosts:
+ max_hosts = len(mtu_hosts)
+ mtu_ptr = mtu
+
+ if diffs:
+ self.log.debug("MTU problems detected")
+ self.log.debug(f"most hosts using {mtu_ptr}")
+ mtu_copy = subnet_data.mtu_map.copy()
+ del mtu_copy[mtu_ptr]
+ for bad_mtu in mtu_copy:
+ for h in mtu_copy[bad_mtu]:
+ host = HostFacts()
+ host.load_facts(self.mgr.cache.facts[h])
+ mtu_errors.append(
+ f"host {h}({host.subnet_to_nic(osd_net)}) is using MTU "
+ f"{bad_mtu} on {osd_net}, NICs on other hosts use {mtu_ptr}")
+
+ if mtu_errors:
+ self.mgr.health_checks['CEPHADM_CHECK_MTU'] = {
+ 'severity': 'warning',
+ 'summary': "MTU setting inconsistent on osd network NICs on "
+ f"{len(mtu_errors)} host(s)",
+ 'count': len(mtu_errors),
+ 'detail': mtu_errors,
+ }
+ self.health_check_raised = True
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_MTU', None)
+
+ def _check_osd_linkspeed(self) -> None:
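+ """Raise/clear CEPHADM_CHECK_LINKSPEED when OSD hosts report different NIC link speeds."""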
+ osd_hosts = set(self.hosts_with_role('osd'))
+ osd_network_list = self.cluster_network_list or self.public_network_list
+
+ linkspeed_errors = []
+
+ for osd_net in osd_network_list:
+ subnet_data = self.subnet_lookup.get(osd_net, None)
+
+ if subnet_data:
+
+ self.log.debug(f"processing subnet : {subnet_data}")
+
+ speed_count = {}
+ max_hosts = 0
+ speed_ptr = ''
+ diffs = {}
for speed, host_list in subnet_data.speed_map.items():
speed_hosts = set(host_list)
+ speed_count[speed] = len(speed_hosts)
errors = osd_hosts.difference(speed_hosts)
if errors:
- self.log.error(f"Linkspeed problem {speed} : {errors}")
- # CEPHADM_CHECK_LINKSPEED
- pass
+ diffs[speed] = errors
+ if len(speed_hosts) > max_hosts:
+ max_hosts = len(speed_hosts)
+ speed_ptr = speed
+
+ if diffs:
+ self.log.debug("linkspeed issue(s) detected")
+ self.log.debug(f"most hosts using {speed_ptr}")
+ speed_copy = subnet_data.speed_map.copy()
+ del speed_copy[speed_ptr]
+ for bad_speed in speed_copy:
+ for h in speed_copy[bad_speed]:
+ host = HostFacts()
+ host.load_facts(self.mgr.cache.facts[h])
+ linkspeed_errors.append(
+ f"host {h}({host.subnet_to_nic(osd_net)}) has linkspeed of "
+ f"{bad_speed} on {osd_net}, NICs on other hosts use {speed_ptr}")
+
+ if linkspeed_errors:
+ self.mgr.health_checks['CEPHADM_CHECK_LINKSPEED'] = {
+ 'severity': 'warning',
+ 'summary': "Link speed is inconsistent on osd network NICs for "
+ f"{len(linkspeed_errors)} host(s)",
+ 'count': len(linkspeed_errors),
+ 'detail': linkspeed_errors,
+ }
+ self.health_check_raised = True
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_LINKSPEED', None)
- else:
- # set CEPHADM_CHECK_NETWORK_MISSING healthcheck for this subnet
+ def _check_network_missing(self) -> None:
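+ """Raise/clear CEPHADM_CHECK_NETWORK_MISSING for defined networks absent from every host."""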
+ all_networks = self.public_network_list.copy()
+ all_networks.extend(self.cluster_network_list)
+
+ missing_networks = []
+ for osd_net in all_networks:
+ subnet_data = self.subnet_lookup.get(osd_net, None)
+
+ if not subnet_data:
+ missing_networks.append(f"{osd_net} not found on any host in the cluster")
self.log.warning(
f"Network {osd_net} has been defined, but is not present on any host")
- def _check_version_parity(self) -> None:
+ if missing_networks:
+ net_sfx = 's' if len(missing_networks) > 1 else ''
+ self.mgr.health_checks['CEPHADM_CHECK_NETWORK_MISSING'] = {
+ 'severity': 'warning',
+ 'summary': f"Public/cluster network{net_sfx} defined, but cannot be found on "
+ "any host",
+ 'count': len(missing_networks),
+ 'detail': missing_networks,
+ }
+ self.health_check_raised = True
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_NETWORK_MISSING', None)
+
+ def _check_release_parity(self) -> None:
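+ """Raise/clear CEPHADM_CHECK_CEPH_RELEASE for mixed daemon releases (skipped during upgrade)."""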
upgrade_status = self.mgr.upgrade.upgrade_status()
if upgrade_status.in_progress:
# skip version consistency checks during an upgrade cycle
+ self.skipped_checks.append('ceph_release')
return
services = self.get_ceph_metadata()
- self.log.error(json.dumps(services))
- versions: Set[str] = set()
+ self.log.debug(json.dumps(services))
+ version_to_svcs: Dict[str, List[str]] = {}
+
for svc in services:
if services[svc]:
metadata = cast(Dict[str, str], services[svc])
- versions.add(metadata.get('ceph_release', ''))
- self.log.error(f"versions detected are: {versions}")
- if len(versions) > 1:
- # CEPHADM_CHECK_CEPH_VERSION
+ v = metadata.get('ceph_release', '')
+ if v in version_to_svcs:
+ version_to_svcs[v].append(svc)
+ else:
+ version_to_svcs[v] = [svc]
+
+ if len(version_to_svcs) > 1:
+ majority_ptr, _majority_count = self._get_majority(version_to_svcs)
+ ver_copy = version_to_svcs.copy()
+ del ver_copy[majority_ptr]
+ details = []
+ for v in ver_copy:
+ for svc in ver_copy[v]:
+ details.append(
+ f"{svc} is running {v} (majority of cluster is using {majority_ptr})")
+
+ self.mgr.health_checks['CEPHADM_CHECK_CEPH_RELEASE'] = {
+ 'severity': 'warning',
+ 'summary': 'Ceph cluster running mixed ceph releases',
+ 'count': len(details),
+ 'detail': details,
+ }
+ self.health_check_raised = True
self.log.warning(
- f"running with {len(versions)} different ceph releases within this cluster")
+ f"running with {len(version_to_svcs)} different ceph releases within this cluster")
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_CEPH_RELEASE', None)
def _check_kernel_version(self) -> None:
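+ """Raise/clear CEPHADM_CHECK_KERNEL_VERSION when hosts run different kernel versions."""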
if len(self.kernel_to_hosts.keys()) > 1:
- self.log.warning("mixed kernel versions")
- # CEPHADM_CHECK_KERNEL_VERSION
-
- def run_checks(self) -> None:
-
- # os_to_host = {} # TODO needs better support in gather-facts
- self.reset()
+ majority_hosts_ptr, majority_hosts_count = self._get_majority(self.kernel_to_hosts)
+ kver_copy = self.kernel_to_hosts.copy()
+ del kver_copy[majority_hosts_ptr]
+ details = []
+ for k in kver_copy:
+ for h in kver_copy[k]:
+ details.append(
+ f"host {h} running kernel {k}, majority of hosts ({majority_hosts_count}) "
+ f"running {majority_hosts_ptr}")
+
+ self.log.warning("mixed kernel versions detected")
+ self.mgr.health_checks['CEPHADM_CHECK_KERNEL_VERSION'] = {
+ 'severity': 'warning',
+ 'summary': f"{len(details)} host(s) running different kernel versions",
+ 'count': len(details),
+ 'detail': details,
+ }
+ self.health_check_raised = True
+ else:
+ self.mgr.health_checks.pop('CEPHADM_CHECK_KERNEL_VERSION', None)
- # build lookup "maps" by walking the host facts
+ def _process_hosts(self) -> None:
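+ """Walk the gathered host facts once, building the lookup maps the checks consume."""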
+ self.log.debug(f"processing data from {len(self.mgr.cache.facts)} hosts")
for hostname in self.mgr.cache.facts:
- self.log.error(json.dumps(self.mgr.cache.facts[hostname]))
host = HostFacts()
host.load_facts(self.mgr.cache.facts[hostname])
if not host._valid:
self.log.warning(f"skipping {hostname} - incompatible host facts")
continue
- lsm_desc = host.kernel_security.get('description', '')
+ kernel_lsm = cast(Dict[str, str], host.kernel_security)
+ lsm_desc = kernel_lsm.get('description', '')
if lsm_desc:
if lsm_desc in self.lsm_to_host:
self.lsm_to_host[lsm_desc].append(hostname)
if subscription_state:
self.subscribed[subscription_state].append(hostname)
- interfaces: Dict[str, Dict[str, Any]] = host.interfaces
+ interfaces = cast(Dict[str, Dict[str, Any]], host.interfaces)
for name in interfaces.keys():
if name in ['lo']:
continue
self.host_to_role[hostname] = list({host_daemons[name].daemon_type
for name in host_daemons})
- # checks
- # TODO only call the check if the check is enabled
- self._check_kernel_lsm()
- self._check_subscription()
- self._check_public_network()
- self._check_osd_network()
- self._check_version_parity()
- self._check_kernel_version()
-
- self.log.error(f"lsm status {json.dumps(self.lsm_to_host)}")
- self.log.error(f"subscription states : {json.dumps(self.subscribed)}")
-
- self.log.error(f"mtu data : {str(self.subnet_lookup)}")
- self.log.error(f"roles : {json.dumps(self.host_to_role)}")
- self.log.error(f"kernel versions : {json.dumps(self.kernel_to_hosts)}")
+ def run_checks(self) -> None:
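+ """Run all checks that are not explicitly disabled and update the mgr's health checks."""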
+
+ checks_enabled = self.mgr.get_module_option('config_checks_enabled')
+ if checks_enabled != 'true':
+ self.log.info("ALL cephadm checks are disabled, use 'ceph config set mgr "
+ "mgr/cephadm/config_checks_enabled true' to enable")
+ return
+
+ self.reset()
+
+ check_config: Dict[str, str] = {}
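+ # per-check overrides are stored as JSON under the 'config_checks' key (parameter -> state)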
+ checks_raw: Optional[str] = self.mgr.get_store('config_checks')
+ if checks_raw:
+ try:
+ check_config.update(json.loads(checks_raw))
+ except json.JSONDecodeError:
+ self.log.error(
+ "mgr/cephadm/config_checks is not valid JSON - all checks will run")
+
+ # build lookup "maps" by walking the host facts, once
+ self._process_hosts()
+
+ self.health_check_raised = False
+ self.active_checks = []
+ self.skipped_checks = []
+
+ # process all health checks that are not explicitly disabled
+ for health_check in self.health_checks:
+ if check_config.get(health_check.parameter, '') != 'disabled':
+ self.active_checks.append(health_check.parameter)
+ health_check.func()
+
+ if self.health_check_raised:
+ self.log.warning("CEPHADM checks have detected configuration anomalies")
+ else:
+ self.log.info(
+ f"CEPHADM {self.active_checks_count}/{self.defined_checks} checks enabled "
+ f"and executed ({self.skipped_checks_count} bypassed). No issues detected")
+
+ self.mgr.set_health_checks(self.mgr.health_checks)