from typing import Dict
-from unittest import TestCase
+from unittest import TestCase, mock
-from prometheus.module import Metric, LabelValues, Number
+from prometheus.module import Metric, LabelValues, Number, HealthHistory, ThreadSafeLRUCacheDict
+import threading
class MetricGroupTest(TestCase):
with self.assertRaises(AssertionError) as cm:
m.group_by(["foo"], {"bar": "not callable str"})
self.assertEqual(str(cm.exception), "joins must be callable")
+
+
+class HealthHistoryTest(TestCase):
+ def setUp(self):
+ self.mgr = mock.MagicMock()
+ self.mgr.get_localized_module_option.return_value = 1000
+ self.mgr.get_store.return_value = "{}"
+ self.health_history = HealthHistory(self.mgr)
+
+ def test_check_save_no_deadlock(self):
+ """Verifies that check() can call save() without deadlocking."""
+ info = {'severity': 1}
+ health_data = {
+ 'checks': {
+ 'OSD_DOWN': info
+ }
+ }
+
+ def call_check():
+ self.health_history.check(health_data)
+
+ t = threading.Thread(target=call_check)
+ t.start()
+ t.join(timeout=2)
+
+ self.assertFalse(t.is_alive(), "Deadlock detected: Thread is still hung!")
+ self.mgr.set_store.assert_called()
+
+
+class ThreadSafeLRUCacheDictTest(TestCase):
+ def test_items_returns_snapshot(self):
+ """items() should return a list snapshot, not a live view."""
+ d = ThreadSafeLRUCacheDict(maxsize=10)
+ d['a'] = 1
+ d['b'] = 2
+ items = d.items()
+ self.assertIsInstance(items, list)
+ self.assertEqual(items, [('a', 1), ('b', 2)])
+
+ def test_keys_returns_snapshot(self):
+ d = ThreadSafeLRUCacheDict(maxsize=10)
+ d['a'] = 1
+ keys = d.keys()
+ self.assertIsInstance(keys, list)
+ self.assertEqual(keys, ['a'])
+
+ def test_values_returns_snapshot(self):
+ d = ThreadSafeLRUCacheDict(maxsize=10)
+ d['a'] = 1
+ values = d.values()
+ self.assertIsInstance(values, list)
+ self.assertEqual(values, [1])
+
+ def test_lru_eviction(self):
+ d = ThreadSafeLRUCacheDict(maxsize=2)
+ d['a'] = 1
+ d['b'] = 2
+ d['c'] = 3 # This should evict 'a'
+ self.assertNotIn('a', d)
+ self.assertIn('b', d)
+ self.assertIn('c', d)
+
+ def test_reentrancy(self):
+ cache = ThreadSafeLRUCacheDict(maxsize=10)
+
+ with cache._lock:
+ cache['key1'] = 'value1'
+ self.assertEqual(cache['key1'], 'value1')
+ self.assertEqual(len(cache), 1)
+
+ def test_concurrent_writes(self):
+ cache = ThreadSafeLRUCacheDict(maxsize=100)
+
+ def writer(start, end):
+ for i in range(start, end):
+ cache[f'key{i}'] = f'value{i}'
+
+ threads = []
+ for i in range(5):
+ t = threading.Thread(target=writer, args=(i * 20, (i + 1) * 20))
+ threads.append(t)
+ t.start()
+
+ for t in threads:
+ t.join()
+
+ self.assertEqual(len(cache), 100)
+ for i in range(100):
+ self.assertEqual(cache[f'key{i}'], f'value{i}')