From e35d4144d380cef190a04517b4d7b30d520d5b4f Mon Sep 17 00:00:00 2001
From: Adam King
Date: Mon, 23 May 2022 15:57:14 -0400
Subject: [PATCH] mgr/cephadm: store device info separately from rest of host
 cache

Device info tends to take up the most space of anything in the host
cache, so the hope is that by giving it its own location in the
config-key store we can avoid hitting issues where the host cache value
we attempt to place in the config-key store exceeds the size limit.

Fixes: https://tracker.ceph.com/issues/54251
Fixes: https://tracker.ceph.com/issues/53624
Signed-off-by: Adam King
---
 src/pybind/mgr/cephadm/inventory.py          |  98 ++++++++++++-
 src/pybind/mgr/cephadm/tests/test_cephadm.py | 143 ++++++++++++++++++-
 2 files changed, 235 insertions(+), 6 deletions(-)

diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py
index dbdedd47778..b3a76eaca70 100644
--- a/src/pybind/mgr/cephadm/inventory.py
+++ b/src/pybind/mgr/cephadm/inventory.py
@@ -1,8 +1,10 @@
 import datetime
+import enum
 from copy import copy
 import ipaddress
 import json
 import logging
+import math
 import socket
 from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
     NamedTuple, Type
@@ -28,6 +30,12 @@ SPEC_STORE_PREFIX = "spec."
 AGENT_CACHE_PREFIX = 'agent.'
 
 
+class HostCacheStatus(enum.Enum):
+    stray = 'stray'
+    host = 'host'
+    devices = 'devices'
+
+
 class Inventory:
     """
     The inventory stores a HostSpec for all hosts persistently.
@@ -463,7 +471,9 @@ class HostCache():
         # type: () -> None
         for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
             host = k[len(HOST_CACHE_PREFIX):]
-            if host not in self.mgr.inventory:
+            if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
+                if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
+                    continue
                 self.mgr.log.warning('removing stray HostCache host record %s' % (
                     host))
                 self.mgr.set_store(k, None)
@@ -482,14 +492,16 @@ class HostCache():
                 self.daemons[host] = {}
                 self.osdspec_previews[host] = []
                 self.osdspec_last_applied[host] = {}
-                self.devices[host] = []
                 self.networks[host] = {}
                 self.daemon_config_deps[host] = {}
                 for name, d in j.get('daemons', {}).items():
                     self.daemons[host][name] = \
                         orchestrator.DaemonDescription.from_json(d)
+                self.devices[host] = []
+                # still want to check old device location for upgrade scenarios
                 for d in j.get('devices', []):
                     self.devices[host].append(inventory.Device.from_json(d))
+                self.devices[host] += self.load_host_devices(host)
                 self.networks[host] = j.get('networks_and_interfaces', {})
                 self.osdspec_previews[host] = j.get('osdspec_previews', {})
                 self.last_client_files[host] = j.get('last_client_files', {})
@@ -517,6 +529,23 @@ class HostCache():
                     host, e))
                 pass
 
+    def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
+        # return whether a host cache entry in the config-key
+        # store is for a host, a set of devices or is stray.
+        # for a host, the entry name will match a hostname in our
+        # inventory. For devices, it will be formatted
+        # <hostname>.devices.<index> where <hostname> is
+        # in our inventory. If neither case applies, it is stray
+        if host in self.mgr.inventory:
+            return HostCacheStatus.host
+        try:
+            # try stripping off the ".devices.<index>" and see if we get
+            # a host name that matches our inventory
+            actual_host = '.'.join(host.split('.')[:-2])
+            return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
+        except Exception:
+            return HostCacheStatus.stray
+
     def update_host_daemons(self, host, dm):
         # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
         self.daemons[host] = dm
@@ -671,9 +700,6 @@ class HostCache():
         if host in self.daemons:
             for name, dd in self.daemons[host].items():
                 j['daemons'][name] = dd.to_json()
-        if host in self.devices:
-            for d in self.devices[host]:
-                j['devices'].append(d.to_json())
         if host in self.networks:
             j['networks_and_interfaces'] = self.networks[host]
         if host in self.daemon_config_deps:
@@ -697,9 +723,71 @@ class HostCache():
             j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
         if host in self.metadata_up_to_date:
             j['metadata_up_to_date'] = self.metadata_up_to_date[host]
+        if host in self.devices:
+            self.save_host_devices(host)
 
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
+    def save_host_devices(self, host: str) -> None:
+        if host not in self.devices or not self.devices[host]:
+            logger.debug(f'Host {host} has no devices to save')
+            return
+
+        devs: List[Dict[str, Any]] = []
+        for d in self.devices[host]:
+            devs.append(d.to_json())
+
+        def byte_len(s: str) -> int:
+            return len(s.encode('utf-8'))
+
+        dev_cache_counter: int = 0
+        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
+        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
+            # no guarantee all device entries take up the same amount of space
+            # splitting it up so there's one more entry than we need should be fairly
+            # safe and save a lot of extra logic checking sizes
+            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
+            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
+            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
+                                                     for i in range(0, len(devs), dev_sublist_size)]
+            for dev_list in dev_lists:
+                dev_dict: Dict[str, Any] = {'devices': dev_list}
+                if dev_cache_counter == 0:
+                    dev_dict.update({'entries': len(dev_lists)})
+                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
+                                   + str(dev_cache_counter), json.dumps(dev_dict))
+                dev_cache_counter += 1
+        else:
+            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
+                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))
+
+    def load_host_devices(self, host: str) -> List[inventory.Device]:
+        dev_cache_counter: int = 0
+        devs: List[Dict[str, Any]] = []
+        dev_entries: int = 0
+        try:
+            # number of entries for the host's devices should be in
+            # the "entries" field of the first entry
+            dev_entries = json.loads(self.mgr.get_store(
+                HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
+        except Exception:
+            logger.debug(f'No device entries found for host {host}')
+        for i in range(dev_entries):
+            try:
+                new_devs = json.loads(self.mgr.get_store(
+                    HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
+                if len(new_devs) > 0:
+                    # verify list contains actual device objects by trying to load one from json
+                    inventory.Device.from_json(new_devs[0])
+                    # if we didn't throw an Exception on above line, we can add the devices
+                    devs = devs + new_devs
+                dev_cache_counter += 1
+            except Exception as e:
+                logger.error(('Hit exception trying to load devices from '
+                              f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
+                return []
+        return [inventory.Device.from_json(d) for d in devs]
+
     def rm_host(self, host):
         # type: (str) -> None
         if host in self.daemons:
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index d8eb76b43b0..2cce71aae22 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -7,6 +7,7 @@ import pytest
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
 
 from cephadm.serve import CephadmServe
+from cephadm.inventory import HostCacheStatus
 from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
 
 try:
@@ -1153,7 +1154,9 @@ class TestCephadm(object):
 
     @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
     @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
-    def test_invalid_config_option_health_warning(self, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+    @mock.patch("cephadm.module.HostCache.save_host_devices")
+    def test_invalid_config_option_health_warning(self, _save_devs, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+        _save_devs.return_value = None
         _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
         with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -1167,6 +1170,144 @@
             assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[
                 'CEPHADM_INVALID_CONFIG_OPTION']['detail']
 
+    @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+    def test_save_devices(self, _set_store, _run_cephadm, _get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+        entry_size = 65536  # default 64k size
+        _get_foreign_ceph_option.return_value = entry_size
+
+        class FakeDev():
+            def __init__(self, c: str = 'a'):
+                # using 1015 here makes the serialized string exactly 1024 bytes if c is one char
+                self.content = {c: c * 1015}
+
+            def to_json(self):
+                return self.content
+
+            def from_json(self, stuff):
+                return json.loads(stuff)
+
+        def byte_len(s):
+            return len(s.encode('utf-8'))
+
+        with with_host(cephadm_module, 'test'):
+            fake_devices = [FakeDev()] * 100  # should be ~100k
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 34], 'entries': 3})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 34]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 32]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            fake_devices = [FakeDev()] * 300  # should be ~300k
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size * 4
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 5
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50], 'entries': 6})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.3', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.4', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.5', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            fake_devices = [FakeDev()] * 62  # should be ~62k, just under cache size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 62], 'entries': 1})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            # should be ~64k but just over so it requires more entries
+            fake_devices = [FakeDev()] * 64
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 22], 'entries': 3})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 22]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 20]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            # test for actual content being correct using differing devices
+            entry_size = 3072
+            _get_foreign_ceph_option.return_value = entry_size
+            fake_devices = [FakeDev('a'), FakeDev('b'), FakeDev('c'), FakeDev('d'), FakeDev('e')]
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev('a'), FakeDev('b')]], 'entries': 3})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev('c'), FakeDev('d')]]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev('e')]]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+    @mock.patch("cephadm.module.CephadmOrchestrator.get_store")
+    def test_load_devices(self, _get_store, cephadm_module: CephadmOrchestrator):
+        def _fake_store(key):
+            if key == 'host.test.devices.0':
+                return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 9], 'entries': 3})
+            elif key == 'host.test.devices.1':
+                return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 7]})
+            elif key == 'host.test.devices.2':
+                return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 4]})
+            else:
+                raise Exception(f'Get store with unexpected value {key}')
+
+        _get_store.side_effect = _fake_store
+        devs = cephadm_module.cache.load_host_devices('test')
+        assert devs == [Device('/path')] * 20
+
@mock.patch("cephadm.module.Inventory.__contains__") + def test_check_stray_host_cache_entry(self, _contains, cephadm_module: CephadmOrchestrator): + def _fake_inv(key): + if key in ['host1', 'node02', 'host.something.com']: + return True + return False + + _contains.side_effect = _fake_inv + assert cephadm_module.cache._get_host_cache_entry_status('host1') == HostCacheStatus.host + assert cephadm_module.cache._get_host_cache_entry_status( + 'host.something.com') == HostCacheStatus.host + assert cephadm_module.cache._get_host_cache_entry_status( + 'node02.devices.37') == HostCacheStatus.devices + assert cephadm_module.cache._get_host_cache_entry_status( + 'host.something.com.devices.0') == HostCacheStatus.devices + assert cephadm_module.cache._get_host_cache_entry_status('hostXXX') == HostCacheStatus.stray + assert cephadm_module.cache._get_host_cache_entry_status( + 'host.nothing.com') == HostCacheStatus.stray + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) -- 2.39.5