import datetime
+import enum
from copy import copy
import ipaddress
import json
import logging
+import math
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
NamedTuple, Type
AGENT_CACHE_PREFIX = 'agent.'
+class HostCacheStatus(enum.Enum):
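+ # classification of a HostCache entry in the config-key store: a host
+ # record, a host's device list, or a stray (unrecognized) entry
+ # (see _get_host_cache_entry_status below)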
+ stray = 'stray'
+ host = 'host'
+ devices = 'devices'
+
+
class Inventory:
"""
The inventory stores a HostSpec for all hosts persistently.
# type: () -> None
for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
host = k[len(HOST_CACHE_PREFIX):]
- if host not in self.mgr.inventory:
+ if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
+ if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
+ continue
self.mgr.log.warning('removing stray HostCache host record %s' % (
host))
self.mgr.set_store(k, None)
self.daemons[host] = {}
self.osdspec_previews[host] = []
self.osdspec_last_applied[host] = {}
- self.devices[host] = []
self.networks[host] = {}
self.daemon_config_deps[host] = {}
for name, d in j.get('daemons', {}).items():
self.daemons[host][name] = \
orchestrator.DaemonDescription.from_json(d)
+ self.devices[host] = []
+ # still check the old device location as well, for upgrade scenarios
for d in j.get('devices', []):
self.devices[host].append(inventory.Device.from_json(d))
+ self.devices[host] += self.load_host_devices(host)
self.networks[host] = j.get('networks_and_interfaces', {})
self.osdspec_previews[host] = j.get('osdspec_previews', {})
self.last_client_files[host] = j.get('last_client_files', {})
host, e))
pass
+ def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
+ # Return whether a host cache entry in the config-key
+ # store is for a host, a set of devices, or is stray.
+ # For a host, the entry name will match a hostname in our
+ # inventory. For devices, it will be formatted
+ # <hostname>.devices.<integer> where <hostname> is
+ # in our inventory. If neither case applies, the entry is stray.
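+ # e.g. 'host1' -> host, 'node02.devices.37' -> devices, 'hostXXX' -> stray
+ # (example names taken from test_check_stray_host_cache_entry)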
+ if host in self.mgr.inventory:
+ return HostCacheStatus.host
+ try:
+ # try stripping off the ".devices.<integer>" and see if we get
+ # a host name that matches our inventory
+ actual_host = '.'.join(host.split('.')[:-2])
+ return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
+ except Exception:
+ return HostCacheStatus.stray
+
def update_host_daemons(self, host, dm):
# type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
self.daemons[host] = dm
if host in self.daemons:
for name, dd in self.daemons[host].items():
j['daemons'][name] = dd.to_json()
- if host in self.devices:
- for d in self.devices[host]:
- j['devices'].append(d.to_json())
if host in self.networks:
j['networks_and_interfaces'] = self.networks[host]
if host in self.daemon_config_deps:
j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
if host in self.metadata_up_to_date:
j['metadata_up_to_date'] = self.metadata_up_to_date[host]
+ if host in self.devices:
+ self.save_host_devices(host)
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
+ def save_host_devices(self, host: str) -> None:
+ if host not in self.devices or not self.devices[host]:
+ logger.debug(f'Host {host} has no devices to save')
+ return
+
+ devs: List[Dict[str, Any]] = []
+ for d in self.devices[host]:
+ devs.append(d.to_json())
+
+ def byte_len(s: str) -> int:
+ return len(s.encode('utf-8'))
+
+ dev_cache_counter: int = 0
+ cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
+ if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
+ # there is no guarantee all device entries take up the same amount of space,
+ # so splitting into one more entry than strictly needed should be fairly
+ # safe and saves a lot of extra size-checking logic
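+ # worked example (matching test_save_devices): ~100K of serialized devices
+ # with the default 64K mon_config_key_max_entry_size gives
+ # ceil(~100K / 64K) + 1 = 3 entries, so 100 devices are split
+ # into sublists of 34, 34 and 32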
+ cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
+ dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
+ dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
+ for i in range(0, len(devs), dev_sublist_size)]
+ for dev_list in dev_lists:
+ dev_dict: Dict[str, Any] = {'devices': dev_list}
+ if dev_cache_counter == 0:
+ dev_dict.update({'entries': len(dev_lists)})
+ self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
+ + str(dev_cache_counter), json.dumps(dev_dict))
+ dev_cache_counter += 1
+ else:
+ self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
+ + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))
+
+ def load_host_devices(self, host: str) -> List[inventory.Device]:
+ dev_cache_counter: int = 0
+ devs: List[Dict[str, Any]] = []
+ dev_entries: int = 0
+ try:
+ # number of entries for the host's devices should be in
+ # the "entries" field of the first entry
+ dev_entries = json.loads(self.mgr.get_store(
+ HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
+ except Exception:
+ logger.debug(f'No device entries found for host {host}')
+ for i in range(dev_entries):
+ try:
+ new_devs = json.loads(self.mgr.get_store(
+ HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
+ if len(new_devs) > 0:
+ # verify list contains actual device objects by trying to load one from json
+ inventory.Device.from_json(new_devs[0])
+ # if the above line didn't raise an Exception, we can add the devices
+ devs = devs + new_devs
+ dev_cache_counter += 1
+ except Exception as e:
+ logger.error(('Hit exception trying to load devices from '
+ + f'{HOST_CACHE_PREFIX + host + ".devices." + str(i)} in key store: {e}'))
+ return []
+ return [inventory.Device.from_json(d) for d in devs]
+
def rm_host(self, host):
# type: (str) -> None
if host in self.daemons:
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
+from cephadm.inventory import HostCacheStatus
from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
try:
@mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
- def test_invalid_config_option_health_warning(self, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+ @mock.patch("cephadm.module.HostCache.save_host_devices")
+ def test_invalid_config_option_health_warning(self, _save_devs, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+ _save_devs.return_value = None
_run_cephadm.side_effect = async_side_effect(('{}', '', 0))
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[
'CEPHADM_INVALID_CONFIG_OPTION']['detail']
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+ @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+ def test_save_devices(self, _set_store, _run_cephadm, _get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+ entry_size = 65536 # default 64k size
+ _get_foreign_ceph_option.return_value = entry_size
+
+ class FakeDev():
+ def __init__(self, c: str = 'a'):
+ # using 1015 here makes the serialized string exactly 1024 bytes if c is one char
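+ # (json.dumps adds 9 bytes of punctuation around the 1015-byte value)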
+ self.content = {c: c * 1015}
+
+ def to_json(self):
+ return self.content
+
+ def from_json(self, stuff):
+ return json.loads(stuff)
+
+ def byte_len(s):
+ return len(s.encode('utf-8'))
+
+ with with_host(cephadm_module, 'test'):
+ fake_devices = [FakeDev()] * 100 # should be ~100k
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 34], 'entries': 3})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 34]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 32]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ fake_devices = [FakeDev()] * 300 # should be ~300k
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size * 4
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 5
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50], 'entries': 6})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.3', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.4', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ mock.call('host.test.devices.5', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ fake_devices = [FakeDev()] * 62 # should be ~62k, just under cache size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 62], 'entries': 1})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ # should be ~64k but just over so it requires more entries
+ fake_devices = [FakeDev()] * 64
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 22], 'entries': 3})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 22]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev()] * 20]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ # test for actual content being correct using differing devices
+ entry_size = 3072
+ _get_foreign_ceph_option.return_value = entry_size
+ fake_devices = [FakeDev('a'), FakeDev('b'), FakeDev('c'), FakeDev('d'), FakeDev('e')]
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+ assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+ cephadm_module.cache.update_host_devices('test', fake_devices)
+ cephadm_module.cache.save_host_devices('test')
+ expected_calls = [
+ mock.call('host.test.devices.0', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev('a'), FakeDev('b')]], 'entries': 3})),
+ mock.call('host.test.devices.1', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev('c'), FakeDev('d')]]})),
+ mock.call('host.test.devices.2', json.dumps(
+ {'devices': [d.to_json() for d in [FakeDev('e')]]})),
+ ]
+ _set_store.assert_has_calls(expected_calls)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator.get_store")
+ def test_load_devices(self, _get_store, cephadm_module: CephadmOrchestrator):
+ def _fake_store(key):
+ if key == 'host.test.devices.0':
+ return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 9], 'entries': 3})
+ elif key == 'host.test.devices.1':
+ return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 7]})
+ elif key == 'host.test.devices.2':
+ return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 4]})
+ else:
+ raise Exception(f'Get store with unexpected value {key}')
+
+ _get_store.side_effect = _fake_store
+ devs = cephadm_module.cache.load_host_devices('test')
+ assert devs == [Device('/path')] * 20
+
+ @mock.patch("cephadm.module.Inventory.__contains__")
+ def test_check_stray_host_cache_entry(self, _contains, cephadm_module: CephadmOrchestrator):
+ def _fake_inv(key):
+ if key in ['host1', 'node02', 'host.something.com']:
+ return True
+ return False
+
+ _contains.side_effect = _fake_inv
+ assert cephadm_module.cache._get_host_cache_entry_status('host1') == HostCacheStatus.host
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'host.something.com') == HostCacheStatus.host
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'node02.devices.37') == HostCacheStatus.devices
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'host.something.com.devices.0') == HostCacheStatus.devices
+ assert cephadm_module.cache._get_host_cache_entry_status('hostXXX') == HostCacheStatus.stray
+ assert cephadm_module.cache._get_host_cache_entry_status(
+ 'host.nothing.com') == HostCacheStatus.stray
+
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
@mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())