class Listener(object):
+ # pylint: disable=too-many-instance-attributes
def __init__(self):
- NotificationQueue.register(self.log_type1, 'type1')
+ NotificationQueue.register(self.log_type1, 'type1', priority=90)
NotificationQueue.register(self.log_type2, 'type2')
NotificationQueue.register(self.log_type1_3, ['type1', 'type3'])
- NotificationQueue.register(self.log_all)
+ NotificationQueue.register(self.log_all, priority=50)
self.type1 = []
+ self.type1_ts = []
self.type2 = []
+ self.type2_ts = []
self.type1_3 = []
+ self.type1_3_ts = []
self.all = []
+ self.all_ts = []
# these should be ignored by the queue
NotificationQueue.register(self.log_type1, 'type1')
NotificationQueue.register(self.log_all)
def log_type1(self, val):
+ self.type1_ts.append(time.time())
self.type1.append(val)
def log_type2(self, val):
+ self.type2_ts.append(time.time())
self.type2.append(val)
def log_type1_3(self, val):
+ self.type1_3_ts.append(time.time())
self.type1_3.append(val)
def log_all(self, val):
+ self.all_ts.append(time.time())
self.all.append(val)
def clear(self):
self.type1 = []
+ self.type1_ts = []
self.type2 = []
+ self.type2_ts = []
self.type1_3 = []
+ self.type1_3_ts = []
self.all = []
+ self.all_ts = []
class NotificationQueueTest(unittest.TestCase):
self.assertEqual(self.listener.type1_3, [1, 3])
self.assertEqual(self.listener.all, [1, 2, 3])
+ # validate priorities
+ self.assertLess(self.listener.type1_3_ts[0], self.listener.all_ts[0])
+ self.assertLess(self.listener.all_ts[0], self.listener.type1_ts[0])
+ self.assertLess(self.listener.type2_ts[0], self.listener.all_ts[1])
+ self.assertLess(self.listener.type1_3_ts[1], self.listener.all_ts[2])
+
def test_notifications2(self):
NotificationQueue.start_queue()
for i in range(0, 600):
time.sleep(0.002)
NotificationQueue.new_notification(typ, i)
NotificationQueue.stop()
- for i in range(0, 500):
+ for i in range(0, 600):
typ = i % 3 + 1
if typ == 1:
self.assertIn(i, self.listener.type1)
logger.debug("notification queue stopped")
@classmethod
- def register(cls, func, n_types=None):
+ def _registered_handler(cls, func, n_types):
+ for _, reg_func in cls._listeners[n_types]:
+ if reg_func == func:
+ return True
+ return False
+
+ @classmethod
+ def register(cls, func, n_types=None, priority=1):
"""Registers function to listen for notifications
If the second parameter `n_types` is omitted, the function in `func`
Args:
func (function): python function ex: def foo(val)
- n_types (str|list): the single type to listen, or a list of notification types
+ n_types (str|list): the single type to listen, or a list of types
+ priority (int): the priority level (1=max, +inf=min)
"""
with cls._lock:
if not n_types:
- cls._listeners[cls._ALL_TYPES_].add(func)
+ if not cls._registered_handler(func, cls._ALL_TYPES_):
+ cls._listeners[cls._ALL_TYPES_].add((priority, func))
return
if isinstance(n_types, str):
- cls._listeners[n_types].add(func)
+ if not cls._registered_handler(func, n_types):
+ cls._listeners[n_types].add((priority, func))
elif isinstance(n_types, list):
for typ in n_types:
- cls._listeners[typ].add(func)
+ if not cls._registered_handler(func, typ):
+ cls._listeners[typ].add((priority, func))
else:
raise Exception("n_types param is neither a string nor a list")
with cls._lock:
listeners = list(cls._listeners[notify_type])
listeners.extend(cls._listeners[cls._ALL_TYPES_])
+ listeners.sort(key=lambda lis: lis[0])
for listener in listeners:
- listener(notify_value)
+ listener[1](notify_value)
def run(self):
logger.debug("notification queue started")