From: NitzanMordhai Date: Mon, 2 Feb 2026 13:37:34 +0000 (+0000) Subject: mgr/prometheus/test_module: Adding unit-test for new classes X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b31df463d149707b966e72a739355ecc663283f8;p=ceph-ci.git mgr/prometheus/test_module: Adding unit-test for new classes Fixes: https://tracker.ceph.com/issues/74149 Signed-off-by: Nitzan Mordechai --- diff --git a/src/pybind/mgr/prometheus/test_module.py b/src/pybind/mgr/prometheus/test_module.py index 0647cb658c7..8a2a686de3e 100644 --- a/src/pybind/mgr/prometheus/test_module.py +++ b/src/pybind/mgr/prometheus/test_module.py @@ -1,7 +1,8 @@ from typing import Dict -from unittest import TestCase +from unittest import TestCase, mock -from prometheus.module import Metric, LabelValues, Number +from prometheus.module import Metric, LabelValues, Number, HealthHistory, ThreadSafeLRUCacheDict +import threading class MetricGroupTest(TestCase): @@ -91,3 +92,92 @@ ceph_disk_occupation_display{ceph_daemon="osd.5+osd.6",device="/dev/dm-1",instan with self.assertRaises(AssertionError) as cm: m.group_by(["foo"], {"bar": "not callable str"}) self.assertEqual(str(cm.exception), "joins must be callable") + + +class HealthHistoryTest(TestCase): + def setUp(self): + self.mgr = mock.MagicMock() + self.mgr.get_localized_module_option.return_value = 1000 + self.mgr.get_store.return_value = "{}" + self.health_history = HealthHistory(self.mgr) + + def test_check_save_no_deadlock(self): + """Verifies that check() can call save() without deadlocking.""" + info = {'severity': 1} + health_data = { + 'checks': { + 'OSD_DOWN': info + } + } + + def call_check(): + self.health_history.check(health_data) + + t = threading.Thread(target=call_check) + t.start() + t.join(timeout=2) + + self.assertFalse(t.is_alive(), "Deadlock detected: Thread is still hung!") + self.mgr.set_store.assert_called() + + +class ThreadSafeLRUCacheDictTest(TestCase): + def test_items_returns_snapshot(self): + """items() should return a list snapshot, not a live view.""" + d = ThreadSafeLRUCacheDict(maxsize=10) + d['a'] = 1 + d['b'] = 2 + items = d.items() + self.assertIsInstance(items, list) + self.assertEqual(items, [('a', 1), ('b', 2)]) + + def test_keys_returns_snapshot(self): + d = ThreadSafeLRUCacheDict(maxsize=10) + d['a'] = 1 + keys = d.keys() + self.assertIsInstance(keys, list) + self.assertEqual(keys, ['a']) + + def test_values_returns_snapshot(self): + d = ThreadSafeLRUCacheDict(maxsize=10) + d['a'] = 1 + values = d.values() + self.assertIsInstance(values, list) + self.assertEqual(values, [1]) + + def test_lru_eviction(self): + d = ThreadSafeLRUCacheDict(maxsize=2) + d['a'] = 1 + d['b'] = 2 + d['c'] = 3 # This should evict 'a' + self.assertNotIn('a', d) + self.assertIn('b', d) + self.assertIn('c', d) + + def test_reentrancy(self): + cache = ThreadSafeLRUCacheDict(maxsize=10) + + with cache._lock: + cache['key1'] = 'value1' + self.assertEqual(cache['key1'], 'value1') + self.assertEqual(len(cache), 1) + + def test_concurrent_writes(self): + cache = ThreadSafeLRUCacheDict(maxsize=100) + + def writer(start, end): + for i in range(start, end): + cache[f'key{i}'] = f'value{i}' + + threads = [] + for i in range(5): + t = threading.Thread(target=writer, args=(i * 20, (i + 1) * 20)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + self.assertEqual(len(cache), 100) + for i in range(100): + self.assertEqual(cache[f'key{i}'], f'value{i}')