]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/dashboard_v2: Notification queue
authorRicardo Dias <rdias@suse.com>
Wed, 31 Jan 2018 15:07:00 +0000 (15:07 +0000)
committerRicardo Dias <rdias@suse.com>
Mon, 5 Mar 2018 13:07:04 +0000 (13:07 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/pybind/mgr/dashboard_v2/module.py
src/pybind/mgr/dashboard_v2/tests/test_notification.py [new file with mode: 0644]
src/pybind/mgr/dashboard_v2/tools.py

index c1d443b087109f92074ba5dfe2da6017c2a8e334..2e4cb97389b9d8a6596475514950695daa21c22d 100644 (file)
@@ -11,7 +11,8 @@ import cherrypy
 from mgr_module import MgrModule
 
 from .controllers.auth import Auth
-from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool
+from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool, \
+                   NotificationQueue
 from . import logger
 
 
@@ -88,12 +89,15 @@ class Module(MgrModule):
         self.configure_cherrypy()
 
         cherrypy.engine.start()
+        NotificationQueue.start_queue()
         logger.info('Waiting for engine...')
+        self.log.info('Waiting for engine...')
         cherrypy.engine.block()
         logger.info('Engine done')
 
     def shutdown(self):
         logger.info('Stopping server...')
+        NotificationQueue.stop()
         cherrypy.engine.exit()
         logger.info('Stopped server')
 
@@ -105,6 +109,9 @@ class Module(MgrModule):
         return (-errno.EINVAL, '', 'Command not found \'{0}\''
                 .format(cmd['prefix']))
 
+    def notify(self, notify_type, notify_id):
+        NotificationQueue.new_notification(notify_type, notify_id)
+
     class ApiRoot(object):
 
         def __init__(self, mgrmod):
diff --git a/src/pybind/mgr/dashboard_v2/tests/test_notification.py b/src/pybind/mgr/dashboard_v2/tests/test_notification.py
new file mode 100644 (file)
index 0000000..bca27f9
--- /dev/null
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import random
+import time
+import unittest
+
+
+from ..tools import NotificationQueue
+
+
+class Listener(object):
+    def __init__(self):
+        NotificationQueue.register(self.log_type1, 'type1')
+        NotificationQueue.register(self.log_type2, 'type2')
+        NotificationQueue.register(self.log_type1_3, ['type1', 'type3'])
+        NotificationQueue.register(self.log_all)
+        self.type1 = []
+        self.type2 = []
+        self.type1_3 = []
+        self.all = []
+
+        # these should be ignored by the queue
+        NotificationQueue.register(self.log_type1, 'type1')
+        NotificationQueue.register(self.log_type1_3, ['type1', 'type3'])
+        NotificationQueue.register(self.log_all)
+
+    def log_type1(self, val):
+        self.type1.append(val)
+
+    def log_type2(self, val):
+        self.type2.append(val)
+
+    def log_type1_3(self, val):
+        self.type1_3.append(val)
+
+    def log_all(self, val):
+        self.all.append(val)
+
+    def clear(self):
+        self.type1 = []
+        self.type2 = []
+        self.type1_3 = []
+        self.all = []
+
+
+class NotificationQueueTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.listener = Listener()
+
+    def setUp(self):
+        self.listener.clear()
+
+    def test_invalid_register(self):
+        with self.assertRaises(Exception) as ctx:
+            NotificationQueue.register(None, 1)
+        self.assertEqual(str(ctx.exception),
+                         "types param is neither a string nor a list")
+
+    def test_notifications(self):
+        NotificationQueue.start_queue()
+        NotificationQueue.new_notification('type1', 1)
+        NotificationQueue.new_notification('type2', 2)
+        NotificationQueue.new_notification('type3', 3)
+        NotificationQueue.stop()
+        self.assertEqual(self.listener.type1, [1])
+        self.assertEqual(self.listener.type2, [2])
+        self.assertEqual(self.listener.type1_3, [1, 3])
+        self.assertEqual(self.listener.all, [1, 2, 3])
+
+    def test_notifications2(self):
+        NotificationQueue.start_queue()
+        for i in range(0, 600):
+            typ = "type{}".format(i % 3 + 1)
+            if random.random() < 0.5:
+                time.sleep(0.002)
+            NotificationQueue.new_notification(typ, i)
+        NotificationQueue.stop()
+        for i in range(0, 500):
+            typ = i % 3 + 1
+            if typ == 1:
+                self.assertIn(i, self.listener.type1)
+                self.assertIn(i, self.listener.type1_3)
+            elif typ == 2:
+                self.assertIn(i, self.listener.type2)
+            elif typ == 3:
+                self.assertIn(i, self.listener.type1_3)
+            self.assertIn(i, self.listener.all)
+
+        self.assertEqual(len(self.listener.type1), 200)
+        self.assertEqual(len(self.listener.type2), 200)
+        self.assertEqual(len(self.listener.type1_3), 400)
+        self.assertEqual(len(self.listener.all), 600)
index e803a68757e443e7f4a5c529d66943112a6fdd28..65c4e96eb3566071bc80acf0ef36dacf893d1105 100644 (file)
@@ -2,12 +2,14 @@
 # pylint: disable=W0212
 from __future__ import absolute_import
 
+import collections
 import importlib
 import inspect
 import json
 import os
 import pkgutil
 import sys
+import threading
 
 import six
 import cherrypy
@@ -298,3 +300,93 @@ class SessionExpireAtBrowserCloseTool(cherrypy.Tool):
             if name in cookie:
                 del cookie[name]['expires']
                 del cookie[name]['max-age']
+
+
+class NotificationQueue(threading.Thread):
+    _ALL_TYPES_ = '__ALL__'
+    _listeners = collections.defaultdict(set)
+    _lock = threading.Lock()
+    _cond = threading.Condition()
+    _queue = collections.deque()
+    _running = False
+    _instance = None
+
+    def __init__(self):
+        super(NotificationQueue, self).__init__()
+
+    @classmethod
+    def start_queue(cls):
+        with cls._lock:
+            if cls._instance:
+                # the queue thread is already running
+                return
+            cls._running = True
+            cls._instance = NotificationQueue()
+        cls._instance.start()
+
+    @classmethod
+    def stop(cls):
+        with cls._lock:
+            if not cls._instance:
+                # the queue thread was not started
+                return
+            instance = cls._instance
+            cls._instance = None
+            cls._running = False
+        with cls._cond:
+            cls._cond.notify()
+        instance.join()
+
+    @classmethod
+    def register(cls, func, types=None):
+        """Registers function to listen for notifications
+
+        If the second parameter `types` is omitted, the function in `func`
+        parameter will be called for any type of notifications.
+
+        Args:
+            func (function): python function ex: def foo(val)
+            types (str|list): the single type to listen, or a list of types
+        """
+        with cls._lock:
+            if not types:
+                cls._listeners[cls._ALL_TYPES_].add(func)
+                return
+            if isinstance(types, str):
+                cls._listeners[types].add(func)
+            elif isinstance(types, list):
+                for typ in types:
+                    cls._listeners[typ].add(func)
+            else:
+                raise Exception("types param is neither a string nor a list")
+
+    @classmethod
+    def new_notification(cls, notify_type, notify_value):
+        cls._queue.append((notify_type, notify_value))
+        with cls._cond:
+            cls._cond.notify()
+
+    @classmethod
+    def notify_listeners(cls, events):
+        for ev in events:
+            notify_type, notify_value = ev
+            with cls._lock:
+                listeners = list(cls._listeners[notify_type])
+                listeners.extend(cls._listeners[cls._ALL_TYPES_])
+            for listener in listeners:
+                listener(notify_value)
+
+    def run(self):
+        while self._running:
+            private_buffer = []
+            try:
+                while True:
+                    private_buffer.append(self._queue.popleft())
+            except IndexError:
+                pass
+            self.notify_listeners(private_buffer)
+            with self._cond:
+                self._cond.wait(1.0)
+        # flush remaining events
+        self.notify_listeners(self._queue)
+        self._queue.clear()