git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: store device info separately from rest of host cache 46791/head
author Adam King <adking@redhat.com>
Mon, 23 May 2022 19:57:14 +0000 (15:57 -0400)
committer Adam King <adking@redhat.com>
Wed, 22 Jun 2022 00:07:40 +0000 (20:07 -0400)
Device info tends to take up the most space out of
everything, so the hope is that by giving it its own
location in the config-key store we can avoid hitting
issues where the host cache value we attempt to
place in the config-key store exceeds the size limit.

Fixes: https://tracker.ceph.com/issues/54251
Fixes: https://tracker.ceph.com/issues/53624
Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit e35d4144d380cef190a04517b4d7b30d520d5b4f)
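
With this change the per-host cache JSON stays under the config-key name host.<hostname>, while device info moves to host.<hostname>.devices.<n> keys, with entry 0 recording how many device chunks exist. A minimal sketch of that layout and chunking, using a hypothetical helper chunk_device_entries() that mirrors the save_host_devices() logic added in the diff below (the helper itself is not part of the commit):

import json
import math
from typing import Any, Dict, List

HOST_CACHE_PREFIX = 'host.'  # same prefix the cephadm HostCache uses


def chunk_device_entries(host: str, devs: List[Dict[str, Any]],
                         max_entry_size: int) -> Dict[str, str]:
    # Illustrative only: returns a mapping of config-key name -> JSON payload,
    # mirroring save_host_devices() from the diff below.
    def byte_len(s: str) -> int:
        return len(s.encode('utf-8'))

    total = byte_len(json.dumps(devs))
    if max_entry_size and total > max_entry_size - 1024:
        # split into one more chunk than strictly needed, since device
        # entries are not guaranteed to all be the same size
        chunks_needed = math.ceil(total / max_entry_size) + 1
        per_chunk = math.ceil(len(devs) / chunks_needed)
        chunks = [devs[i:i + per_chunk] for i in range(0, len(devs), per_chunk)]
    else:
        chunks = [devs]
    payloads: Dict[str, str] = {}
    for n, chunk in enumerate(chunks):
        body: Dict[str, Any] = {'devices': chunk}
        if n == 0:
            body['entries'] = len(chunks)  # chunk count lives in entry 0 only
        payloads[f'{HOST_CACHE_PREFIX}{host}.devices.{n}'] = json.dumps(body)
    return payloads

On load, the "entries" value stored in host.<hostname>.devices.0 tells load_host_devices() how many keys to read back.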

src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/tests/test_cephadm.py

index dbdedd477780c5a607c06ae2ab2a0c73fe68d368..b3a76eaca70d14f1bc39c079c40fd1788e4889c2 100644 (file)
@@ -1,8 +1,10 @@
 import datetime
+import enum
 from copy import copy
 import ipaddress
 import json
 import logging
+import math
 import socket
 from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
     NamedTuple, Type
@@ -28,6 +30,12 @@ SPEC_STORE_PREFIX = "spec."
 AGENT_CACHE_PREFIX = 'agent.'
 
 
+class HostCacheStatus(enum.Enum):
+    stray = 'stray'
+    host = 'host'
+    devices = 'devices'
+
+
 class Inventory:
     """
     The inventory stores a HostSpec for all hosts persistently.
@@ -463,7 +471,9 @@ class HostCache():
         # type: () -> None
         for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
             host = k[len(HOST_CACHE_PREFIX):]
-            if host not in self.mgr.inventory:
+            if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
+                if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
+                    continue
                 self.mgr.log.warning('removing stray HostCache host record %s' % (
                     host))
                 self.mgr.set_store(k, None)
@@ -482,14 +492,16 @@ class HostCache():
                 self.daemons[host] = {}
                 self.osdspec_previews[host] = []
                 self.osdspec_last_applied[host] = {}
-                self.devices[host] = []
                 self.networks[host] = {}
                 self.daemon_config_deps[host] = {}
                 for name, d in j.get('daemons', {}).items():
                     self.daemons[host][name] = \
                         orchestrator.DaemonDescription.from_json(d)
+                self.devices[host] = []
+                # still want to check old device location for upgrade scenarios
                 for d in j.get('devices', []):
                     self.devices[host].append(inventory.Device.from_json(d))
+                self.devices[host] += self.load_host_devices(host)
                 self.networks[host] = j.get('networks_and_interfaces', {})
                 self.osdspec_previews[host] = j.get('osdspec_previews', {})
                 self.last_client_files[host] = j.get('last_client_files', {})
@@ -517,6 +529,23 @@ class HostCache():
                     host, e))
                 pass
 
+    def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
+        # return whether a host cache entry in the config-key
+        # store is for a host, a set of devices or is stray.
+        # for a host, the entry name will match a hostname in our
+        # inventory. For devices, it will be formatted
+        # <hostname>.devices.<integer> where <hostname> is
+        # in our inventory. If neither case applies, it is stray
+        if host in self.mgr.inventory:
+            return HostCacheStatus.host
+        try:
+            # try stripping off the ".devices.<integer>" and see if we get
+            # a host name that matches our inventory
+            actual_host = '.'.join(host.split('.')[:-2])
+            return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
+        except Exception:
+            return HostCacheStatus.stray
+
     def update_host_daemons(self, host, dm):
         # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
         self.daemons[host] = dm
@@ -671,9 +700,6 @@ class HostCache():
         if host in self.daemons:
             for name, dd in self.daemons[host].items():
                 j['daemons'][name] = dd.to_json()
-        if host in self.devices:
-            for d in self.devices[host]:
-                j['devices'].append(d.to_json())
         if host in self.networks:
             j['networks_and_interfaces'] = self.networks[host]
         if host in self.daemon_config_deps:
@@ -697,9 +723,71 @@ class HostCache():
             j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
         if host in self.metadata_up_to_date:
             j['metadata_up_to_date'] = self.metadata_up_to_date[host]
+        if host in self.devices:
+            self.save_host_devices(host)
 
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
+    def save_host_devices(self, host: str) -> None:
+        if host not in self.devices or not self.devices[host]:
+            logger.debug(f'Host {host} has no devices to save')
+            return
+
+        devs: List[Dict[str, Any]] = []
+        for d in self.devices[host]:
+            devs.append(d.to_json())
+
+        def byte_len(s: str) -> int:
+            return len(s.encode('utf-8'))
+
+        dev_cache_counter: int = 0
+        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
+        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
+            # no guarantee all device entries take up the same amount of space,
+            # so splitting into one more entry than we strictly need should be
+            # fairly safe and saves a lot of extra size-checking logic
+            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
+            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
+            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
+                                                     for i in range(0, len(devs), dev_sublist_size)]
+            for dev_list in dev_lists:
+                dev_dict: Dict[str, Any] = {'devices': dev_list}
+                if dev_cache_counter == 0:
+                    dev_dict.update({'entries': len(dev_lists)})
+                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
+                                   + str(dev_cache_counter), json.dumps(dev_dict))
+                dev_cache_counter += 1
+        else:
+            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
+                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))
+
+    def load_host_devices(self, host: str) -> List[inventory.Device]:
+        dev_cache_counter: int = 0
+        devs: List[Dict[str, Any]] = []
+        dev_entries: int = 0
+        try:
+            # number of entries for the host's devices should be in
+            # the "entries" field of the first entry
+            dev_entries = json.loads(self.mgr.get_store(
+                HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
+        except Exception:
+            logger.debug(f'No device entries found for host {host}')
+        for i in range(dev_entries):
+            try:
+                new_devs = json.loads(self.mgr.get_store(
+                    HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
+                if len(new_devs) > 0:
+                    # verify list contains actual device objects by trying to load one from json
+                    inventory.Device.from_json(new_devs[0])
+                    # if we didn't throw an Exception on above line, we can add the devices
+                    devs = devs + new_devs
+                    dev_cache_counter += 1
+            except Exception as e:
+                logger.error(('Hit exception trying to load devices from '
+                             + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
+                return []
+        return [inventory.Device.from_json(d) for d in devs]
+
     def rm_host(self, host):
         # type: (str) -> None
         if host in self.daemons:
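
As a sanity check on the splitting arithmetic in save_host_devices() above: the first case in test_save_devices below uses 100 fake devices that serialize to roughly 100 KiB against the default 64 KiB mon_config_key_max_entry_size, which works out to three chunks of 34, 34 and 32 devices. The expected sizes come from the test; the small calculation here only reproduces the arithmetic:

import math

max_entry_size = 65536   # default mon_config_key_max_entry_size
num_devices = 100
total_bytes = 102600     # approx. json.dumps() length for 100 ~1 KiB devices

chunks_needed = math.ceil(total_bytes / max_entry_size) + 1   # 3
per_chunk = math.ceil(num_devices / chunks_needed)            # 34
sizes = [min(per_chunk, num_devices - i)
         for i in range(0, num_devices, per_chunk)]
print(chunks_needed, per_chunk, sizes)                        # 3 34 [34, 34, 32]
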
index d8eb76b43b0f8f25944af8dd97961d1fe0888f8b..2cce71aae2208c7983cbba9101922e0a48832ff8 100644 (file)
@@ -7,6 +7,7 @@ import pytest
 
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
 from cephadm.serve import CephadmServe
+from cephadm.inventory import HostCacheStatus
 from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims
 
 try:
@@ -1153,7 +1154,9 @@ class TestCephadm(object):
 
     @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
     @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
-    def test_invalid_config_option_health_warning(self, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+    @mock.patch("cephadm.module.HostCache.save_host_devices")
+    def test_invalid_config_option_health_warning(self, _save_devs, _run_cephadm, get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+        _save_devs.return_value = None
         _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
         with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -1167,6 +1170,144 @@ class TestCephadm(object):
             assert 'Ignoring invalid mgr config option test' in cephadm_module.health_checks[
                 'CEPHADM_INVALID_CONFIG_OPTION']['detail']
 
+    @mock.patch("cephadm.module.CephadmOrchestrator.get_foreign_ceph_option")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
+    def test_save_devices(self, _set_store, _run_cephadm, _get_foreign_ceph_option, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.side_effect = async_side_effect(('{}', '', 0))
+        entry_size = 65536  # default 64k size
+        _get_foreign_ceph_option.return_value = entry_size
+
+        class FakeDev():
+            def __init__(self, c: str = 'a'):
+                # using 1015 here makes the serialized string exactly 1024 bytes if c is one char
+                self.content = {c: c * 1015}
+
+            def to_json(self):
+                return self.content
+
+            def from_json(self, stuff):
+                return json.loads(stuff)
+
+        def byte_len(s):
+            return len(s.encode('utf-8'))
+
+        with with_host(cephadm_module, 'test'):
+            fake_devices = [FakeDev()] * 100  # should be ~100k
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 34], 'entries': 3})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 34]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 32]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            fake_devices = [FakeDev()] * 300  # should be ~300k
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size * 4
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 5
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50], 'entries': 6})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.3', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.4', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+                mock.call('host.test.devices.5', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 50]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            fake_devices = [FakeDev()] * 62  # should be ~62k, just under cache size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 62], 'entries': 1})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            # should be ~64k but just over so it requires more entries
+            fake_devices = [FakeDev()] * 64
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 22], 'entries': 3})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 22]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev()] * 20]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+            # test for actual content being correct using differing devices
+            entry_size = 3072
+            _get_foreign_ceph_option.return_value = entry_size
+            fake_devices = [FakeDev('a'), FakeDev('b'), FakeDev('c'), FakeDev('d'), FakeDev('e')]
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) > entry_size
+            assert byte_len(json.dumps([d.to_json() for d in fake_devices])) < entry_size * 2
+            cephadm_module.cache.update_host_devices('test', fake_devices)
+            cephadm_module.cache.save_host_devices('test')
+            expected_calls = [
+                mock.call('host.test.devices.0', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev('a'), FakeDev('b')]], 'entries': 3})),
+                mock.call('host.test.devices.1', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev('c'), FakeDev('d')]]})),
+                mock.call('host.test.devices.2', json.dumps(
+                    {'devices': [d.to_json() for d in [FakeDev('e')]]})),
+            ]
+            _set_store.assert_has_calls(expected_calls)
+
+    @mock.patch("cephadm.module.CephadmOrchestrator.get_store")
+    def test_load_devices(self, _get_store, cephadm_module: CephadmOrchestrator):
+        def _fake_store(key):
+            if key == 'host.test.devices.0':
+                return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 9], 'entries': 3})
+            elif key == 'host.test.devices.1':
+                return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 7]})
+            elif key == 'host.test.devices.2':
+                return json.dumps({'devices': [d.to_json() for d in [Device('/path')] * 4]})
+            else:
+                raise Exception(f'Get store with unexpected value {key}')
+
+        _get_store.side_effect = _fake_store
+        devs = cephadm_module.cache.load_host_devices('test')
+        assert devs == [Device('/path')] * 20
+
+    @mock.patch("cephadm.module.Inventory.__contains__")
+    def test_check_stray_host_cache_entry(self, _contains, cephadm_module: CephadmOrchestrator):
+        def _fake_inv(key):
+            if key in ['host1', 'node02', 'host.something.com']:
+                return True
+            return False
+
+        _contains.side_effect = _fake_inv
+        assert cephadm_module.cache._get_host_cache_entry_status('host1') == HostCacheStatus.host
+        assert cephadm_module.cache._get_host_cache_entry_status(
+            'host.something.com') == HostCacheStatus.host
+        assert cephadm_module.cache._get_host_cache_entry_status(
+            'node02.devices.37') == HostCacheStatus.devices
+        assert cephadm_module.cache._get_host_cache_entry_status(
+            'host.something.com.devices.0') == HostCacheStatus.devices
+        assert cephadm_module.cache._get_host_cache_entry_status('hostXXX') == HostCacheStatus.stray
+        assert cephadm_module.cache._get_host_cache_entry_status(
+            'host.nothing.com') == HostCacheStatus.stray
+
     @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
     @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock())