]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/prometheus/test_module: Adding unit-test for new classes
authorNitzanMordhai <nmordech@ibm.com>
Mon, 2 Feb 2026 13:37:34 +0000 (13:37 +0000)
committerNitzanMordhai <nmordech@ibm.com>
Thu, 5 Feb 2026 12:09:16 +0000 (12:09 +0000)
Fixes: https://tracker.ceph.com/issues/74149
Signed-off-by: Nitzan Mordechai <nmordech@ibm.com>
src/pybind/mgr/prometheus/test_module.py

index 0647cb658c75de174a9ced594f5bd9ab5fd794a1..8a2a686de3e9ac33eee79868141ab55b439d6dd4 100644 (file)
@@ -1,7 +1,8 @@
 from typing import Dict
-from unittest import TestCase
+from unittest import TestCase, mock
 
-from prometheus.module import Metric, LabelValues, Number
+from prometheus.module import Metric, LabelValues, Number, HealthHistory, ThreadSafeLRUCacheDict
+import threading
 
 
 class MetricGroupTest(TestCase):
@@ -91,3 +92,92 @@ ceph_disk_occupation_display{ceph_daemon="osd.5+osd.6",device="/dev/dm-1",instan
         with self.assertRaises(AssertionError) as cm:
             m.group_by(["foo"], {"bar": "not callable str"})
         self.assertEqual(str(cm.exception), "joins must be callable")
+
+
+class HealthHistoryTest(TestCase):
+    def setUp(self):
+        self.mgr = mock.MagicMock()
+        self.mgr.get_localized_module_option.return_value = 1000
+        self.mgr.get_store.return_value = "{}"
+        self.health_history = HealthHistory(self.mgr)
+
+    def test_check_save_no_deadlock(self):
+        """Verifies that check() can call save() without deadlocking."""
+        info = {'severity': 1}
+        health_data = {
+            'checks': {
+                'OSD_DOWN': info
+            }
+        }
+
+        def call_check():
+            self.health_history.check(health_data)
+
+        t = threading.Thread(target=call_check)
+        t.start()
+        t.join(timeout=2)
+
+        self.assertFalse(t.is_alive(), "Deadlock detected: Thread is still hung!")
+        self.mgr.set_store.assert_called()
+
+
+class ThreadSafeLRUCacheDictTest(TestCase):
+    def test_items_returns_snapshot(self):
+        """items() should return a list snapshot, not a live view."""
+        d = ThreadSafeLRUCacheDict(maxsize=10)
+        d['a'] = 1
+        d['b'] = 2
+        items = d.items()
+        self.assertIsInstance(items, list)
+        self.assertEqual(items, [('a', 1), ('b', 2)])
+
+    def test_keys_returns_snapshot(self):
+        d = ThreadSafeLRUCacheDict(maxsize=10)
+        d['a'] = 1
+        keys = d.keys()
+        self.assertIsInstance(keys, list)
+        self.assertEqual(keys, ['a'])
+
+    def test_values_returns_snapshot(self):
+        d = ThreadSafeLRUCacheDict(maxsize=10)
+        d['a'] = 1
+        values = d.values()
+        self.assertIsInstance(values, list)
+        self.assertEqual(values, [1])
+
+    def test_lru_eviction(self):
+        d = ThreadSafeLRUCacheDict(maxsize=2)
+        d['a'] = 1
+        d['b'] = 2
+        d['c'] = 3  # This should evict 'a'
+        self.assertNotIn('a', d)
+        self.assertIn('b', d)
+        self.assertIn('c', d)
+
+    def test_reentrancy(self):
+        cache = ThreadSafeLRUCacheDict(maxsize=10)
+
+        with cache._lock:
+            cache['key1'] = 'value1'
+            self.assertEqual(cache['key1'], 'value1')
+            self.assertEqual(len(cache), 1)
+
+    def test_concurrent_writes(self):
+        cache = ThreadSafeLRUCacheDict(maxsize=100)
+
+        def writer(start, end):
+            for i in range(start, end):
+                cache[f'key{i}'] = f'value{i}'
+
+        threads = []
+        for i in range(5):
+            t = threading.Thread(target=writer, args=(i * 20, (i + 1) * 20))
+            threads.append(t)
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        self.assertEqual(len(cache), 100)
+        for i in range(100):
+            self.assertEqual(cache[f'key{i}'], f'value{i}')