From b7c2ffaf2412d659dfabb1939128d868f8351113 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Wed, 31 Jan 2018 15:07:00 +0000 Subject: [PATCH] mgr/dashboard_v2: Notification queue Signed-off-by: Ricardo Dias --- src/pybind/mgr/dashboard_v2/module.py | 9 +- .../dashboard_v2/tests/test_notification.py | 94 +++++++++++++++++++ src/pybind/mgr/dashboard_v2/tools.py | 92 ++++++++++++++++++ 3 files changed, 194 insertions(+), 1 deletion(-) create mode 100644 src/pybind/mgr/dashboard_v2/tests/test_notification.py diff --git a/src/pybind/mgr/dashboard_v2/module.py b/src/pybind/mgr/dashboard_v2/module.py index c1d443b087109..2e4cb97389b9d 100644 --- a/src/pybind/mgr/dashboard_v2/module.py +++ b/src/pybind/mgr/dashboard_v2/module.py @@ -11,7 +11,8 @@ import cherrypy from mgr_module import MgrModule from .controllers.auth import Auth -from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool +from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool, \ + NotificationQueue from . import logger @@ -88,12 +89,15 @@ class Module(MgrModule): self.configure_cherrypy() cherrypy.engine.start() + NotificationQueue.start_queue() logger.info('Waiting for engine...') + self.log.info('Waiting for engine...') cherrypy.engine.block() logger.info('Engine done') def shutdown(self): logger.info('Stopping server...') + NotificationQueue.stop() cherrypy.engine.exit() logger.info('Stopped server') @@ -105,6 +109,9 @@ class Module(MgrModule): return (-errno.EINVAL, '', 'Command not found \'{0}\'' .format(cmd['prefix'])) + def notify(self, notify_type, notify_id): + NotificationQueue.new_notification(notify_type, notify_id) + class ApiRoot(object): def __init__(self, mgrmod): diff --git a/src/pybind/mgr/dashboard_v2/tests/test_notification.py b/src/pybind/mgr/dashboard_v2/tests/test_notification.py new file mode 100644 index 0000000000000..bca27f9e6f911 --- /dev/null +++ b/src/pybind/mgr/dashboard_v2/tests/test_notification.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import random +import time +import unittest + + +from ..tools import NotificationQueue + + +class Listener(object): + def __init__(self): + NotificationQueue.register(self.log_type1, 'type1') + NotificationQueue.register(self.log_type2, 'type2') + NotificationQueue.register(self.log_type1_3, ['type1', 'type3']) + NotificationQueue.register(self.log_all) + self.type1 = [] + self.type2 = [] + self.type1_3 = [] + self.all = [] + + # these should be ignored by the queue + NotificationQueue.register(self.log_type1, 'type1') + NotificationQueue.register(self.log_type1_3, ['type1', 'type3']) + NotificationQueue.register(self.log_all) + + def log_type1(self, val): + self.type1.append(val) + + def log_type2(self, val): + self.type2.append(val) + + def log_type1_3(self, val): + self.type1_3.append(val) + + def log_all(self, val): + self.all.append(val) + + def clear(self): + self.type1 = [] + self.type2 = [] + self.type1_3 = [] + self.all = [] + + +class NotificationQueueTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.listener = Listener() + + def setUp(self): + self.listener.clear() + + def test_invalid_register(self): + with self.assertRaises(Exception) as ctx: + NotificationQueue.register(None, 1) + self.assertEqual(str(ctx.exception), + "types param is neither a string nor a list") + + def test_notifications(self): + NotificationQueue.start_queue() + NotificationQueue.new_notification('type1', 1) + NotificationQueue.new_notification('type2', 2) + NotificationQueue.new_notification('type3', 3) + NotificationQueue.stop() + self.assertEqual(self.listener.type1, [1]) + self.assertEqual(self.listener.type2, [2]) + self.assertEqual(self.listener.type1_3, [1, 3]) + self.assertEqual(self.listener.all, [1, 2, 3]) + + def test_notifications2(self): + NotificationQueue.start_queue() + for i in range(0, 600): + typ = "type{}".format(i % 3 + 1) + if random.random() < 0.5: + time.sleep(0.002) + NotificationQueue.new_notification(typ, i) + NotificationQueue.stop() + for i in range(0, 500): + typ = i % 3 + 1 + if typ == 1: + self.assertIn(i, self.listener.type1) + self.assertIn(i, self.listener.type1_3) + elif typ == 2: + self.assertIn(i, self.listener.type2) + elif typ == 3: + self.assertIn(i, self.listener.type1_3) + self.assertIn(i, self.listener.all) + + self.assertEqual(len(self.listener.type1), 200) + self.assertEqual(len(self.listener.type2), 200) + self.assertEqual(len(self.listener.type1_3), 400) + self.assertEqual(len(self.listener.all), 600) diff --git a/src/pybind/mgr/dashboard_v2/tools.py b/src/pybind/mgr/dashboard_v2/tools.py index e803a68757e44..65c4e96eb3566 100644 --- a/src/pybind/mgr/dashboard_v2/tools.py +++ b/src/pybind/mgr/dashboard_v2/tools.py @@ -2,12 +2,14 @@ # pylint: disable=W0212 from __future__ import absolute_import +import collections import importlib import inspect import json import os import pkgutil import sys +import threading import six import cherrypy @@ -298,3 +300,93 @@ class SessionExpireAtBrowserCloseTool(cherrypy.Tool): if name in cookie: del cookie[name]['expires'] del cookie[name]['max-age'] + + +class NotificationQueue(threading.Thread): + _ALL_TYPES_ = '__ALL__' + _listeners = collections.defaultdict(set) + _lock = threading.Lock() + _cond = threading.Condition() + _queue = collections.deque() + _running = False + _instance = None + + def __init__(self): + super(NotificationQueue, self).__init__() + + @classmethod + def start_queue(cls): + with cls._lock: + if cls._instance: + # the queue thread is already running + return + cls._running = True + cls._instance = NotificationQueue() + cls._instance.start() + + @classmethod + def stop(cls): + with cls._lock: + if not cls._instance: + # the queue thread was not started + return + instance = cls._instance + cls._instance = None + cls._running = False + with cls._cond: + cls._cond.notify() + instance.join() + + @classmethod + def register(cls, func, types=None): + """Registers function to listen for notifications + + If the second parameter `types` is omitted, the function in `func` + parameter will be called for any type of notifications. + + Args: + func (function): python function ex: def foo(val) + types (str|list): the single type to listen, or a list of types + """ + with cls._lock: + if not types: + cls._listeners[cls._ALL_TYPES_].add(func) + return + if isinstance(types, str): + cls._listeners[types].add(func) + elif isinstance(types, list): + for typ in types: + cls._listeners[typ].add(func) + else: + raise Exception("types param is neither a string nor a list") + + @classmethod + def new_notification(cls, notify_type, notify_value): + cls._queue.append((notify_type, notify_value)) + with cls._cond: + cls._cond.notify() + + @classmethod + def notify_listeners(cls, events): + for ev in events: + notify_type, notify_value = ev + with cls._lock: + listeners = list(cls._listeners[notify_type]) + listeners.extend(cls._listeners[cls._ALL_TYPES_]) + for listener in listeners: + listener(notify_value) + + def run(self): + while self._running: + private_buffer = [] + try: + while True: + private_buffer.append(self._queue.popleft()) + except IndexError: + pass + self.notify_listeners(private_buffer) + with self._cond: + self._cond.wait(1.0) + # flush remaining events + self.notify_listeners(self._queue) + self._queue.clear() -- 2.39.5